Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

integrated client side keep alive timeout #129

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions src/main/asciidoc/dataobjects.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,11 @@ Do the same thing as link. Use it instead.
|[[keepAliveTimeSeconds]]`@keepAliveTimeSeconds`|`Number (int)`|+++
Set the keep alive timeout in seconds
+++
|[[keepAliveTimeout]]`@keepAliveTimeout`|`Number (int)`|+++
Set the time period in seconds after which unacknowledged pings will close the connection.
<p/>
Setting <code>0</code> will simply expire pings but won't close the connection.
+++
|[[keyStoreOptions]]`@keyStoreOptions`|`link:dataobjects.html#JksOptions[JksOptions]`|-
|[[localAddress]]`@localAddress`|`String`|-
|[[logActivity]]`@logActivity`|`Boolean`|-
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,11 @@ public static void fromJson(Iterable<java.util.Map.Entry<String, Object>> json,
obj.setKeepAliveTimeSeconds(((Number)member.getValue()).intValue());
}
break;
case "keepAliveTimeout":
if (member.getValue() instanceof Number) {
obj.setKeepAliveTimeout(((Number)member.getValue()).intValue());
}
break;
case "maxInflightQueue":
if (member.getValue() instanceof Number) {
obj.setMaxInflightQueue(((Number)member.getValue()).intValue());
Expand Down Expand Up @@ -100,6 +105,7 @@ public static void toJson(MqttClientOptions obj, java.util.Map<String, Object> j
json.put("clientId", obj.getClientId());
}
json.put("keepAliveTimeSeconds", obj.getKeepAliveTimeSeconds());
json.put("keepAliveTimeout", obj.getKeepAliveTimeout());
json.put("maxInflightQueue", obj.getMaxInflightQueue());
json.put("maxMessageSize", obj.getMaxMessageSize());
if (obj.getPassword() != null) {
Expand Down
27 changes: 27 additions & 0 deletions src/main/java/io/vertx/mqtt/MqttClientOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ public class MqttClientOptions extends NetClientOptions {
public static final boolean DEFAULT_WILL_FLAG = false;
public static final boolean DEFAULT_WILL_RETAIN = false;
public static final int DEFAULT_MAX_MESSAGE_SIZE = -1;
/** The default timeout value for an unacknowledge ping in seconds = 10 */
public static final int DEFAULT_PING_TIMEOUT_SECONDS = 10;

private String clientId;
private String username;
Expand All @@ -55,6 +57,7 @@ public class MqttClientOptions extends NetClientOptions {
private int willQoS = DEFAULT_WILL_QOS;
private boolean willRetain = DEFAULT_WILL_RETAIN;
private int keepAliveTimeSeconds = DEFAULT_KEEP_ALIVE_TIME_SECONDS;
private int keepAliveTimeout = DEFAULT_PING_TIMEOUT_SECONDS;
private boolean isAutoKeepAlive = true;
private boolean isAutoGeneratedClientId = true;
private int maxInflightQueue = DEFAULT_MAX_INFLIGHT_QUEUE;
Expand Down Expand Up @@ -93,6 +96,7 @@ public MqttClientOptions(MqttClientOptions other) {
this.willFlag = other.willFlag;
this.willQoS = other.willQoS;
this.willRetain = other.willRetain;
this.keepAliveTimeout = other.keepAliveTimeout;
this.keepAliveTimeSeconds = other.keepAliveTimeSeconds;
this.isAutoKeepAlive = other.isAutoKeepAlive;
this.isAutoGeneratedClientId = other.isAutoGeneratedClientId;
Expand Down Expand Up @@ -149,6 +153,13 @@ public int getKeepAliveTimeSeconds() {
return keepAliveTimeSeconds;
}

/**
* @return the ping timeout (in seconds)
*/
public int getKeepAliveTimeout() {
return keepAliveTimeout;
}

/**
* @return provided username
*/
Expand Down Expand Up @@ -301,6 +312,22 @@ public int getMaxInflightQueue() {
return maxInflightQueue;
}

/**
* Set the time period in seconds after which unacknowledged pings will close the connection.
* <p/>
* Setting {@code 0} will simply expire pings but won't close the connection.
*
* @param keepAliveTimeout the ping timeout
* @return current options instance
*/
public MqttClientOptions setKeepAliveTimeout(int keepAliveTimeout) {
if (keepAliveTimeout < 0) {
throw new IllegalArgumentException("The ping timeout value must be greater than 0");
}
this.keepAliveTimeout = keepAliveTimeout;
return this;
}

/**
* Set max count of unacknowledged messages
* @param maxInflightQueue max count of unacknowledged messages
Expand Down
35 changes: 30 additions & 5 deletions src/main/java/io/vertx/mqtt/impl/MqttClientImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -653,21 +653,46 @@ private void initChannel(ChannelPipeline pipeline) {
if (this.options.isAutoKeepAlive() &&
this.options.getKeepAliveTimeSeconds() != 0) {

int keepAliveInterval = this.options.getKeepAliveTimeSeconds();

// handler for sending PINGREQ (keepAlive) if reader- or writer-channel become idle
pipeline.addBefore("handler", "idle",
new IdleStateHandler(0, this.options.getKeepAliveTimeSeconds(), 0));
new IdleStateHandler(keepAliveInterval, keepAliveInterval, 0));
pipeline.addBefore("handler", "keepAliveHandler", new ChannelDuplexHandler() {

@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {

@Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof IdleStateEvent) {
IdleStateEvent e = (IdleStateEvent) evt;
if (e.state() == IdleState.WRITER_IDLE) {
switch (e.state()) {
case READER_IDLE:
// verify that server is still connected (e.g. when using QoS-0)
ping();
break;
case WRITER_IDLE:
// send ping or broker will close connection
ping();
break;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we do "fall through" here? Both cases have the same ping() instruction

}
}
}
});

if(this.options.getKeepAliveTimeout() > 0) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Space after if

int keepAliveTimeout = keepAliveInterval + this.options.getKeepAliveTimeout();
// handler for ping-response timeout. connection will be closed if broker READER_IDLE extends timeout
pipeline.addBefore("handler", "idleTimeout", new IdleStateHandler(keepAliveTimeout, 0, 0));
pipeline.addBefore("handler", "idleTimeoutHandler", new ChannelDuplexHandler() {

@Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof IdleStateEvent) {
IdleStateEvent e = (IdleStateEvent) evt;
if (e.state() == IdleState.READER_IDLE) {
ctx.close();
}
}
}
});
}
}
}

Expand Down
5 changes: 2 additions & 3 deletions src/main/java/io/vertx/mqtt/impl/MqttServerConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -243,11 +243,10 @@ private void handleConnect(MqttConnectMessage msg) {
if (msg.variableHeader().keepAliveTimeSeconds() != 0) {

// the server waits for one and a half times the keep alive time period (MQTT spec)
int timeout = msg.variableHeader().keepAliveTimeSeconds() +
msg.variableHeader().keepAliveTimeSeconds() / 2;
int keepAliveTimeout =(int) (msg.variableHeader().keepAliveTimeSeconds() * 1.5);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

space after =


// modifying the channel pipeline for adding the idle state handler with previous timeout
chctx.pipeline().addBefore("handler", "idle", new IdleStateHandler(timeout, 0, 0));
chctx.pipeline().addBefore("handler", "idle", new IdleStateHandler(keepAliveTimeout, 0, 0));
chctx.pipeline().addBefore("handler", "keepAliveHandler", new ChannelDuplexHandler() {

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
/*
* Copyright 2016 Red Hat Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.vertx.mqtt.test.client;

import io.vertx.core.Vertx;
import io.vertx.ext.unit.Async;
import io.vertx.ext.unit.TestContext;
import io.vertx.ext.unit.junit.VertxUnitRunner;
import io.vertx.mqtt.MqttClient;
import io.vertx.mqtt.MqttClientOptions;
import io.vertx.mqtt.MqttServer;
import io.vertx.mqtt.impl.MqttServerImpl;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;

import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;

import static org.junit.Assert.assertEquals;

/**
* MQTT client keep alive tests using a Vert.x MQTT server to accomodate testing.
*/
@RunWith(VertxUnitRunner.class)
public class MqttClientKeepAliveTest {

private Vertx vertx;
private MqttServer server;

private void startServer(TestContext ctx) {
Async async = ctx.async();
server.listen(1884, "localhost", ctx.asyncAssertSuccess(server -> async.complete()));
async.awaitSuccess(10000);
}

@Before
public void before(TestContext ctx) {
vertx = Vertx.vertx();
server = MqttServer.create(vertx);
}

@After
public void after(TestContext ctx) {
vertx.close(ctx.asyncAssertSuccess());
}

@Test
public void clientWillDisconnectOnMissingPingResponse(TestContext ctx) {
AtomicInteger pings = new AtomicInteger();
server.endpointHandler(endpoint -> {
endpoint.autoKeepAlive(false); // Tell the server not to respond to PINGREQ
endpoint.accept(false);
endpoint.pingHandler(v -> pings.incrementAndGet());
});
startServer(ctx);
Async async = ctx.async();
MqttClientOptions options = new MqttClientOptions();
options.setKeepAliveTimeSeconds(2);
options.setKeepAliveTimeout(1);
MqttClient client = MqttClient.create(vertx, options);
client.connect(1884, "localhost", ctx.asyncAssertSuccess(ack -> {
client.closeHandler(v -> {
assertEquals(2, pings.get()); // READER & WRITER idle
async.complete();
});
}));
async.await();
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
/*
* Copyright 2016 Red Hat Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.vertx.mqtt.test.server;

import io.vertx.core.Vertx;
import io.vertx.ext.unit.Async;
import io.vertx.ext.unit.TestContext;
import io.vertx.ext.unit.junit.VertxUnitRunner;
import io.vertx.mqtt.MqttClient;
import io.vertx.mqtt.MqttClientOptions;
import io.vertx.mqtt.MqttServer;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;

import java.util.concurrent.atomic.AtomicInteger;

import static org.junit.Assert.assertEquals;

/**
* MQTT client keep alive tests using a Vert.x MQTT server to accomodate testing.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess this comment is wrong, the test is about MQTT server not client.

*/
@RunWith(VertxUnitRunner.class)
public class MqttServerKeepAliveTest {

private Vertx vertx;
private MqttServer server;

private void startServer(TestContext ctx) {
Async async = ctx.async();
server.listen(1884, "localhost", ctx.asyncAssertSuccess(server -> async.complete()));
async.awaitSuccess(10000);
}

@Before
public void before(TestContext ctx) {
vertx = Vertx.vertx();
server = MqttServer.create(vertx);
}

@After
public void after(TestContext ctx) {
vertx.close(ctx.asyncAssertSuccess());
}

@Test
public void serverWillDisconnectOnTimeout(TestContext ctx) {
Async async = ctx.async();
server.endpointHandler(endpoint -> {
endpoint.accept(false);
endpoint.pingHandler(v -> ctx.fail());
});
startServer(ctx);
MqttClientOptions options = new MqttClientOptions();
options.setAutoKeepAlive(false); // The client will manage pings manually
options.setKeepAliveTimeSeconds(2); // Tell the server to disconnects the client after 3 seconds of inactivity
MqttClient client = MqttClient.create(vertx, options);
client.connect(1884, "localhost", ctx.asyncAssertSuccess(ack -> {
client.closeHandler(v -> {
async.complete();
});
}));
async.await();
}
}