Skip to content

Commit

Permalink
Address PR review concerns and fix tests
Browse files Browse the repository at this point in the history
  • Loading branch information
vietj committed Oct 7, 2020
1 parent ea9597f commit 53e0dd1
Show file tree
Hide file tree
Showing 7 changed files with 27 additions and 19 deletions.
5 changes: 5 additions & 0 deletions src/main/asciidoc/dataobjects.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,11 @@ Do the same thing as link. Use it instead.
|[[keepAliveTimeSeconds]]`@keepAliveTimeSeconds`|`Number (int)`|+++
Set the keep alive timeout in seconds
+++
|[[keepAliveTimeout]]`@keepAliveTimeout`|`Number (int)`|+++
Set the time period in seconds after which unacknowledged pings will close the connection.
<p/>
Setting <code>0</code> will simply expire pings but won't close the connection.
+++
|[[localAddress]]`@localAddress`|`String`|-
|[[logActivity]]`@logActivity`|`Boolean`|-
|[[maxInflightQueue]]`@maxInflightQueue`|`Number (int)`|+++
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/io/vertx/mqtt/impl/MqttClientImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -752,7 +752,7 @@ private void initChannel(ChannelPipeline pipeline) {
}
});

if(this.options.getKeepAliveTimeout() > 0) {
if (this.options.getKeepAliveTimeout() > 0) {
int keepAliveTimeout = keepAliveInterval + this.options.getKeepAliveTimeout();
// handler for ping-response timeout. connection will be closed if broker READER_IDLE extends timeout
pipeline.addBefore("handler", "idleTimeout", new IdleStateHandler(keepAliveTimeout, 0, 0));
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/io/vertx/mqtt/impl/MqttServerConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,7 @@ private void handleConnect(MqttConnectMessage msg) {
if (msg.variableHeader().keepAliveTimeSeconds() != 0) {

// the server waits for one and a half times the keep alive time period (MQTT spec)
int keepAliveTimeout =(int) (msg.variableHeader().keepAliveTimeSeconds() * 1.5);
int keepAliveTimeout = (int)(msg.variableHeader().keepAliveTimeSeconds() * 1.5);

// modifying the channel pipeline for adding the idle state handler with previous timeout
chctx.pipeline().addBefore("handler", "idle", new IdleStateHandler(keepAliveTimeout, 0, 0));
Expand Down
3 changes: 2 additions & 1 deletion src/test/java/io/vertx/mqtt/impl/MqttClientImplTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import io.vertx.core.impl.CloseFuture;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
Expand Down Expand Up @@ -91,7 +92,7 @@ public void setUp() {
return null;
}).when(context).runOnContext(any(Handler.class));
when(vertx.getOrCreateContext()).thenReturn(context);
when(vertx.createNetClient(any(NetClientOptions.class))).thenReturn(netClient);
when(vertx.createNetClient(any(NetClientOptions.class), any(CloseFuture.class))).thenReturn(netClient);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
import static org.junit.Assert.assertEquals;

/**
* MQTT client keep alive tests using a Vert.x MQTT server to accomodate testing.
* MQTT client keep alive tests using a Vert.x MQTT server to accommodate testing.
*/
@RunWith(VertxUnitRunner.class)
public class MqttClientKeepAliveTest {
Expand All @@ -47,7 +47,7 @@ public class MqttClientKeepAliveTest {

private void startServer(TestContext ctx) {
Async async = ctx.async();
server.listen(1884, "localhost", ctx.asyncAssertSuccess(server -> async.complete()));
server.listen(ctx.asyncAssertSuccess(server -> async.complete()));
async.awaitSuccess(10000);
}

Expand All @@ -59,7 +59,9 @@ public void before(TestContext ctx) {

@After
public void after(TestContext ctx) {
vertx.close(ctx.asyncAssertSuccess());
server.close(ctx.asyncAssertSuccess(v -> {
vertx.close(ctx.asyncAssertSuccess());
}));
}

@Test
Expand All @@ -71,18 +73,16 @@ public void clientWillDisconnectOnMissingPingResponse(TestContext ctx) {
endpoint.pingHandler(v -> pings.incrementAndGet());
});
startServer(ctx);
Async async = ctx.async();
MqttClientOptions options = new MqttClientOptions();
options.setKeepAliveTimeSeconds(2);
options.setKeepAliveTimeout(1);
MqttClient client = MqttClient.create(vertx, options);
client.connect(1884, "localhost", ctx.asyncAssertSuccess(ack -> {
client.connect(MqttClientOptions.DEFAULT_PORT, MqttClientOptions.DEFAULT_HOST, ctx.asyncAssertSuccess(ack -> {
Async async = ctx.async();
client.closeHandler(v -> {
assertEquals(1, pings.get());
async.complete();
});
}));
async.await();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -99,9 +99,10 @@ public void before(TestContext context) {
}

@After
public void after() {
this.server.close();
this.vertx.close();
public void after(TestContext ctx) {
this.server.close(ctx.asyncAssertSuccess(v -> {
this.vertx.close(ctx.asyncAssertSuccess());
}));
}

private static void serverLogic(MqttEndpoint mqttEndpoint) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
import static org.junit.Assert.assertEquals;

/**
* MQTT client keep alive tests using a Vert.x MQTT server to accomodate testing.
* MQTT server keep alive tests using a Vert.x MQTT server to accommodate testing.
*/
@RunWith(VertxUnitRunner.class)
public class MqttServerKeepAliveTest {
Expand All @@ -43,7 +43,7 @@ public class MqttServerKeepAliveTest {

private void startServer(TestContext ctx) {
Async async = ctx.async();
server.listen(1884, "localhost", ctx.asyncAssertSuccess(server -> async.complete()));
server.listen(ctx.asyncAssertSuccess(server -> async.complete()));
async.awaitSuccess(10000);
}

Expand All @@ -55,12 +55,13 @@ public void before(TestContext ctx) {

@After
public void after(TestContext ctx) {
vertx.close(ctx.asyncAssertSuccess());
server.close(ctx.asyncAssertSuccess(v -> {
vertx.close(ctx.asyncAssertSuccess());
}));
}

@Test
public void serverWillDisconnectOnTimeout(TestContext ctx) {
Async async = ctx.async();
server.endpointHandler(endpoint -> {
endpoint.accept(false);
endpoint.pingHandler(v -> ctx.fail());
Expand All @@ -70,11 +71,11 @@ public void serverWillDisconnectOnTimeout(TestContext ctx) {
options.setAutoKeepAlive(false); // The client will manage pings manually
options.setKeepAliveTimeSeconds(2); // Tell the server to disconnects the client after 3 seconds of inactivity
MqttClient client = MqttClient.create(vertx, options);
client.connect(1884, "localhost", ctx.asyncAssertSuccess(ack -> {
client.connect(MqttClientOptions.DEFAULT_PORT, MqttClientOptions.DEFAULT_HOST, ctx.asyncAssertSuccess(ack -> {
Async async = ctx.async();
client.closeHandler(v -> {
async.complete();
});
}));
async.await();
}
}

0 comments on commit 53e0dd1

Please sign in to comment.