Skip to content

Commit

Permalink
Merge pull request #173 from vert-x3/konradmichael-issue-123
Browse files Browse the repository at this point in the history
Konradmichael issue 123
  • Loading branch information
vietj authored Oct 7, 2020
2 parents f3d109b + 338a6d2 commit 35189f0
Show file tree
Hide file tree
Showing 9 changed files with 237 additions and 18 deletions.
5 changes: 5 additions & 0 deletions src/main/asciidoc/dataobjects.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,11 @@ Do the same thing as link. Use it instead.
|[[keepAliveTimeSeconds]]`@keepAliveTimeSeconds`|`Number (int)`|+++
Set the keep alive timeout in seconds
+++
|[[keepAliveTimeout]]`@keepAliveTimeout`|`Number (int)`|+++
Set the time period in seconds after which unacknowledged pings will close the connection.
<p/>
Setting <code>0</code> will simply expire pings but won't close the connection.
+++
|[[localAddress]]`@localAddress`|`String`|-
|[[logActivity]]`@logActivity`|`Boolean`|-
|[[maxInflightQueue]]`@maxInflightQueue`|`Number (int)`|+++
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,11 @@ public static void fromJson(Iterable<java.util.Map.Entry<String, Object>> json,
obj.setKeepAliveTimeSeconds(((Number)member.getValue()).intValue());
}
break;
case "keepAliveTimeout":
if (member.getValue() instanceof Number) {
obj.setKeepAliveTimeout(((Number)member.getValue()).intValue());
}
break;
case "maxInflightQueue":
if (member.getValue() instanceof Number) {
obj.setMaxInflightQueue(((Number)member.getValue()).intValue());
Expand Down Expand Up @@ -108,6 +113,7 @@ public static void toJson(MqttClientOptions obj, java.util.Map<String, Object> j
json.put("clientId", obj.getClientId());
}
json.put("keepAliveTimeSeconds", obj.getKeepAliveTimeSeconds());
json.put("keepAliveTimeout", obj.getKeepAliveTimeout());
json.put("maxInflightQueue", obj.getMaxInflightQueue());
json.put("maxMessageSize", obj.getMaxMessageSize());
if (obj.getPassword() != null) {
Expand Down
27 changes: 27 additions & 0 deletions src/main/java/io/vertx/mqtt/MqttClientOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ public class MqttClientOptions extends NetClientOptions {
public static final boolean DEFAULT_WILL_RETAIN = false;
public static final int DEFAULT_MAX_MESSAGE_SIZE = -1;
public static final int DEFAULT_ACK_TIMEOUT = -1;
/** The default timeout value for an unacknowledge ping in seconds = 10 */
public static final int DEFAULT_PING_TIMEOUT_SECONDS = 10;

private String clientId;
private String username;
Expand All @@ -56,6 +58,7 @@ public class MqttClientOptions extends NetClientOptions {
private int willQoS = DEFAULT_WILL_QOS;
private boolean willRetain = DEFAULT_WILL_RETAIN;
private int keepAliveTimeSeconds = DEFAULT_KEEP_ALIVE_TIME_SECONDS;
private int keepAliveTimeout = DEFAULT_PING_TIMEOUT_SECONDS;
private boolean isAutoKeepAlive = true;
private boolean isAutoGeneratedClientId = true;
private int maxInflightQueue = DEFAULT_MAX_INFLIGHT_QUEUE;
Expand Down Expand Up @@ -95,6 +98,7 @@ public MqttClientOptions(MqttClientOptions other) {
this.willFlag = other.willFlag;
this.willQoS = other.willQoS;
this.willRetain = other.willRetain;
this.keepAliveTimeout = other.keepAliveTimeout;
this.keepAliveTimeSeconds = other.keepAliveTimeSeconds;
this.isAutoKeepAlive = other.isAutoKeepAlive;
this.isAutoGeneratedClientId = other.isAutoGeneratedClientId;
Expand Down Expand Up @@ -152,6 +156,13 @@ public int getKeepAliveTimeSeconds() {
return keepAliveTimeSeconds;
}

/**
* @return the ping timeout (in seconds)
*/
public int getKeepAliveTimeout() {
return keepAliveTimeout;
}

/**
* @return provided username
*/
Expand Down Expand Up @@ -335,6 +346,22 @@ public int getMaxInflightQueue() {
return maxInflightQueue;
}

/**
* Set the time period in seconds after which unacknowledged pings will close the connection.
* <p/>
* Setting {@code 0} will simply expire pings but won't close the connection.
*
* @param keepAliveTimeout the ping timeout
* @return current options instance
*/
public MqttClientOptions setKeepAliveTimeout(int keepAliveTimeout) {
if (keepAliveTimeout < 0) {
throw new IllegalArgumentException("The ping timeout value must be greater than 0");
}
this.keepAliveTimeout = keepAliveTimeout;
return this;
}

/**
* Set max count of unacknowledged messages
* @param maxInflightQueue max count of unacknowledged messages
Expand Down
33 changes: 22 additions & 11 deletions src/main/java/io/vertx/mqtt/impl/MqttClientImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -734,21 +734,32 @@ private void initChannel(ChannelPipeline pipeline) {
if (this.options.isAutoKeepAlive() &&
this.options.getKeepAliveTimeSeconds() != 0) {

pipeline.addBefore("handler", "idle",
new IdleStateHandler(0, this.options.getKeepAliveTimeSeconds(), 0));
pipeline.addBefore("handler", "keepAliveHandler", new ChannelDuplexHandler() {

@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
int keepAliveInterval = this.options.getKeepAliveTimeSeconds();

if (evt instanceof IdleStateEvent) {
IdleStateEvent e = (IdleStateEvent) evt;
if (e.state() == IdleState.WRITER_IDLE) {
// handler for sending PINGREQ (keepAlive) if reader- or writer-channel become idle
pipeline.addBefore("handler", "idle",
new IdleStateHandler(0, 0, keepAliveInterval) {
@Override
protected void channelIdle(ChannelHandlerContext ctx, IdleStateEvent evt) {
if (evt.state() == IdleState.ALL_IDLE) {
// verify that server is still connected (e.g. when using QoS-0)
ping();
}
}
}
});
});

if (this.options.getKeepAliveTimeout() > 0) {
int keepAliveTimeout = keepAliveInterval + this.options.getKeepAliveTimeout();
// handler for ping-response timeout. connection will be closed if broker READER_IDLE extends timeout
pipeline.addBefore("handler", "idleTimeout", new IdleStateHandler(keepAliveTimeout, 0, 0) {
@Override
protected void channelIdle(ChannelHandlerContext ctx, IdleStateEvent evt) {
if (evt.state() == IdleState.READER_IDLE) {
ctx.close();
}
}
});
}
}
}

Expand Down
5 changes: 2 additions & 3 deletions src/main/java/io/vertx/mqtt/impl/MqttServerConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -250,11 +250,10 @@ private void handleConnect(MqttConnectMessage msg) {
if (msg.variableHeader().keepAliveTimeSeconds() != 0) {

// the server waits for one and a half times the keep alive time period (MQTT spec)
int timeout = msg.variableHeader().keepAliveTimeSeconds() +
msg.variableHeader().keepAliveTimeSeconds() / 2;
int keepAliveTimeout = (int)(msg.variableHeader().keepAliveTimeSeconds() * 1.5);

// modifying the channel pipeline for adding the idle state handler with previous timeout
chctx.pipeline().addBefore("handler", "idle", new IdleStateHandler(timeout, 0, 0));
chctx.pipeline().addBefore("handler", "idle", new IdleStateHandler(keepAliveTimeout, 0, 0));
chctx.pipeline().addBefore("handler", "keepAliveHandler", new ChannelDuplexHandler() {

@Override
Expand Down
3 changes: 2 additions & 1 deletion src/test/java/io/vertx/mqtt/impl/MqttClientImplTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import io.vertx.core.impl.CloseFuture;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
Expand Down Expand Up @@ -91,7 +92,7 @@ public void setUp() {
return null;
}).when(context).runOnContext(any(Handler.class));
when(vertx.getOrCreateContext()).thenReturn(context);
when(vertx.createNetClient(any(NetClientOptions.class))).thenReturn(netClient);
when(vertx.createNetClient(any(NetClientOptions.class), any(CloseFuture.class))).thenReturn(netClient);
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
/*
* Copyright 2016 Red Hat Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.vertx.mqtt.test.client;

import io.vertx.core.Vertx;
import io.vertx.ext.unit.Async;
import io.vertx.ext.unit.TestContext;
import io.vertx.ext.unit.junit.VertxUnitRunner;
import io.vertx.mqtt.MqttClient;
import io.vertx.mqtt.MqttClientOptions;
import io.vertx.mqtt.MqttServer;
import io.vertx.mqtt.impl.MqttServerImpl;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;

import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;

import static org.junit.Assert.assertEquals;

/**
* MQTT client keep alive tests using a Vert.x MQTT server to accommodate testing.
*/
@RunWith(VertxUnitRunner.class)
public class MqttClientKeepAliveTest {

private Vertx vertx;
private MqttServer server;

private void startServer(TestContext ctx) {
Async async = ctx.async();
server.listen(ctx.asyncAssertSuccess(server -> async.complete()));
async.awaitSuccess(10000);
}

@Before
public void before(TestContext ctx) {
vertx = Vertx.vertx();
server = MqttServer.create(vertx);
}

@After
public void after(TestContext ctx) {
server.close(ctx.asyncAssertSuccess(v -> {
vertx.close(ctx.asyncAssertSuccess());
}));
}

@Test
public void clientWillDisconnectOnMissingPingResponse(TestContext ctx) {
AtomicInteger pings = new AtomicInteger();
server.endpointHandler(endpoint -> {
endpoint.autoKeepAlive(false); // Tell the server not to respond to PINGREQ
endpoint.accept(false);
endpoint.pingHandler(v -> pings.incrementAndGet());
});
startServer(ctx);
MqttClientOptions options = new MqttClientOptions();
options.setKeepAliveTimeSeconds(2);
options.setKeepAliveTimeout(1);
MqttClient client = MqttClient.create(vertx, options);
client.connect(MqttClientOptions.DEFAULT_PORT, MqttClientOptions.DEFAULT_HOST, ctx.asyncAssertSuccess(ack -> {
Async async = ctx.async();
client.closeHandler(v -> {
assertEquals(1, pings.get());
async.complete();
});
}));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -99,9 +99,10 @@ public void before(TestContext context) {
}

@After
public void after() {
this.server.close();
this.vertx.close();
public void after(TestContext ctx) {
this.server.close(ctx.asyncAssertSuccess(v -> {
this.vertx.close(ctx.asyncAssertSuccess());
}));
}

private static void serverLogic(MqttEndpoint mqttEndpoint) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
/*
* Copyright 2016 Red Hat Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.vertx.mqtt.test.server;

import io.vertx.core.Vertx;
import io.vertx.ext.unit.Async;
import io.vertx.ext.unit.TestContext;
import io.vertx.ext.unit.junit.VertxUnitRunner;
import io.vertx.mqtt.MqttClient;
import io.vertx.mqtt.MqttClientOptions;
import io.vertx.mqtt.MqttServer;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;

import java.util.concurrent.atomic.AtomicInteger;

import static org.junit.Assert.assertEquals;

/**
* MQTT server keep alive tests using a Vert.x MQTT server to accommodate testing.
*/
@RunWith(VertxUnitRunner.class)
public class MqttServerKeepAliveTest {

private Vertx vertx;
private MqttServer server;

private void startServer(TestContext ctx) {
Async async = ctx.async();
server.listen(ctx.asyncAssertSuccess(server -> async.complete()));
async.awaitSuccess(10000);
}

@Before
public void before(TestContext ctx) {
vertx = Vertx.vertx();
server = MqttServer.create(vertx);
}

@After
public void after(TestContext ctx) {
server.close(ctx.asyncAssertSuccess(v -> {
vertx.close(ctx.asyncAssertSuccess());
}));
}

@Test
public void serverWillDisconnectOnTimeout(TestContext ctx) {
server.endpointHandler(endpoint -> {
endpoint.accept(false);
endpoint.pingHandler(v -> ctx.fail());
});
startServer(ctx);
MqttClientOptions options = new MqttClientOptions();
options.setAutoKeepAlive(false); // The client will manage pings manually
options.setKeepAliveTimeSeconds(2); // Tell the server to disconnects the client after 3 seconds of inactivity
MqttClient client = MqttClient.create(vertx, options);
client.connect(MqttClientOptions.DEFAULT_PORT, MqttClientOptions.DEFAULT_HOST, ctx.asyncAssertSuccess(ack -> {
Async async = ctx.async();
client.closeHandler(v -> {
async.complete();
});
}));
}
}

0 comments on commit 35189f0

Please sign in to comment.