Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Timeout issue #174

Merged
merged 5 commits into from
Oct 7, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions src/main/asciidoc/dataobjects.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,11 @@ Do the same thing as link. Use it instead.
|[[keepAliveTimeSeconds]]`@keepAliveTimeSeconds`|`Number (int)`|+++
Set the keep alive timeout in seconds
+++
|[[keepAliveTimeout]]`@keepAliveTimeout`|`Number (int)`|+++
Set the time period in seconds after which unacknowledged pings will close the connection.
<p/>
Setting <code>0</code> will simply expire pings but won't close the connection.
+++
|[[keyStoreOptions]]`@keyStoreOptions`|`link:dataobjects.html#JksOptions[JksOptions]`|-
|[[localAddress]]`@localAddress`|`String`|-
|[[logActivity]]`@logActivity`|`Boolean`|-
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,11 @@ public static void fromJson(Iterable<java.util.Map.Entry<String, Object>> json,
obj.setKeepAliveTimeSeconds(((Number)member.getValue()).intValue());
}
break;
case "keepAliveTimeout":
if (member.getValue() instanceof Number) {
obj.setKeepAliveTimeout(((Number)member.getValue()).intValue());
}
break;
case "maxInflightQueue":
if (member.getValue() instanceof Number) {
obj.setMaxInflightQueue(((Number)member.getValue()).intValue());
Expand Down Expand Up @@ -106,6 +111,7 @@ public static void toJson(MqttClientOptions obj, java.util.Map<String, Object> j
json.put("clientId", obj.getClientId());
}
json.put("keepAliveTimeSeconds", obj.getKeepAliveTimeSeconds());
json.put("keepAliveTimeout", obj.getKeepAliveTimeout());
json.put("maxInflightQueue", obj.getMaxInflightQueue());
json.put("maxMessageSize", obj.getMaxMessageSize());
if (obj.getPassword() != null) {
Expand Down
27 changes: 27 additions & 0 deletions src/main/java/io/vertx/mqtt/MqttClientOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ public class MqttClientOptions extends NetClientOptions {
public static final boolean DEFAULT_WILL_RETAIN = false;
public static final int DEFAULT_MAX_MESSAGE_SIZE = -1;
public static final int DEFAULT_ACK_TIMEOUT = -1;
/** The default timeout value for an unacknowledge ping in seconds = 10 */
public static final int DEFAULT_PING_TIMEOUT_SECONDS = 10;

private String clientId;
private String username;
Expand All @@ -56,6 +58,7 @@ public class MqttClientOptions extends NetClientOptions {
private int willQoS = DEFAULT_WILL_QOS;
private boolean willRetain = DEFAULT_WILL_RETAIN;
private int keepAliveTimeSeconds = DEFAULT_KEEP_ALIVE_TIME_SECONDS;
private int keepAliveTimeout = DEFAULT_PING_TIMEOUT_SECONDS;
private boolean isAutoKeepAlive = true;
private boolean isAutoGeneratedClientId = true;
private int maxInflightQueue = DEFAULT_MAX_INFLIGHT_QUEUE;
Expand Down Expand Up @@ -95,6 +98,7 @@ public MqttClientOptions(MqttClientOptions other) {
this.willFlag = other.willFlag;
this.willQoS = other.willQoS;
this.willRetain = other.willRetain;
this.keepAliveTimeout = other.keepAliveTimeout;
this.keepAliveTimeSeconds = other.keepAliveTimeSeconds;
this.isAutoKeepAlive = other.isAutoKeepAlive;
this.isAutoGeneratedClientId = other.isAutoGeneratedClientId;
Expand Down Expand Up @@ -152,6 +156,13 @@ public int getKeepAliveTimeSeconds() {
return keepAliveTimeSeconds;
}

/**
* @return the ping timeout (in seconds)
*/
public int getKeepAliveTimeout() {
return keepAliveTimeout;
}

/**
* @return provided username
*/
Expand Down Expand Up @@ -335,6 +346,22 @@ public int getMaxInflightQueue() {
return maxInflightQueue;
}

/**
* Set the time period in seconds after which unacknowledged pings will close the connection.
* <p/>
* Setting {@code 0} will simply expire pings but won't close the connection.
*
* @param keepAliveTimeout the ping timeout
* @return current options instance
*/
public MqttClientOptions setKeepAliveTimeout(int keepAliveTimeout) {
if (keepAliveTimeout < 0) {
throw new IllegalArgumentException("The ping timeout value must be greater than 0");
}
this.keepAliveTimeout = keepAliveTimeout;
return this;
}

/**
* Set max count of unacknowledged messages
* @param maxInflightQueue max count of unacknowledged messages
Expand Down
33 changes: 22 additions & 11 deletions src/main/java/io/vertx/mqtt/impl/MqttClientImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -701,21 +701,32 @@ private void initChannel(ChannelPipeline pipeline) {
if (this.options.isAutoKeepAlive() &&
this.options.getKeepAliveTimeSeconds() != 0) {

pipeline.addBefore("handler", "idle",
new IdleStateHandler(0, this.options.getKeepAliveTimeSeconds(), 0));
pipeline.addBefore("handler", "keepAliveHandler", new ChannelDuplexHandler() {

@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
int keepAliveInterval = this.options.getKeepAliveTimeSeconds();

if (evt instanceof IdleStateEvent) {
IdleStateEvent e = (IdleStateEvent) evt;
if (e.state() == IdleState.WRITER_IDLE) {
// handler for sending PINGREQ (keepAlive) if reader- or writer-channel become idle
pipeline.addBefore("handler", "idle",
new IdleStateHandler(0, 0, keepAliveInterval) {
@Override
protected void channelIdle(ChannelHandlerContext ctx, IdleStateEvent evt) {
if (evt.state() == IdleState.ALL_IDLE) {
// verify that server is still connected (e.g. when using QoS-0)
ping();
}
}
}
});
});

if (this.options.getKeepAliveTimeout() > 0) {
int keepAliveTimeout = keepAliveInterval + this.options.getKeepAliveTimeout();
// handler for ping-response timeout. connection will be closed if broker READER_IDLE extends timeout
pipeline.addBefore("handler", "idleTimeout", new IdleStateHandler(keepAliveTimeout, 0, 0) {
@Override
protected void channelIdle(ChannelHandlerContext ctx, IdleStateEvent evt) {
if (evt.state() == IdleState.READER_IDLE) {
ctx.close();
}
}
});
}
}
}

Expand Down
5 changes: 2 additions & 3 deletions src/main/java/io/vertx/mqtt/impl/MqttServerConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -250,11 +250,10 @@ private void handleConnect(MqttConnectMessage msg) {
if (msg.variableHeader().keepAliveTimeSeconds() != 0) {

// the server waits for one and a half times the keep alive time period (MQTT spec)
int timeout = msg.variableHeader().keepAliveTimeSeconds() +
msg.variableHeader().keepAliveTimeSeconds() / 2;
int keepAliveTimeout = (int)(msg.variableHeader().keepAliveTimeSeconds() * 1.5);

// modifying the channel pipeline for adding the idle state handler with previous timeout
chctx.pipeline().addBefore("handler", "idle", new IdleStateHandler(timeout, 0, 0));
chctx.pipeline().addBefore("handler", "idle", new IdleStateHandler(keepAliveTimeout, 0, 0));
chctx.pipeline().addBefore("handler", "keepAliveHandler", new ChannelDuplexHandler() {

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
/*
* Copyright 2016 Red Hat Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.vertx.mqtt.test.client;

import io.vertx.core.Vertx;
import io.vertx.ext.unit.Async;
import io.vertx.ext.unit.TestContext;
import io.vertx.ext.unit.junit.VertxUnitRunner;
import io.vertx.mqtt.MqttClient;
import io.vertx.mqtt.MqttClientOptions;
import io.vertx.mqtt.MqttServer;
import io.vertx.mqtt.impl.MqttServerImpl;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;

import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;

import static org.junit.Assert.assertEquals;

/**
* MQTT client keep alive tests using a Vert.x MQTT server to accommodate testing.
*/
@RunWith(VertxUnitRunner.class)
public class MqttClientKeepAliveTest {

private Vertx vertx;
private MqttServer server;

private void startServer(TestContext ctx) {
Async async = ctx.async();
server.listen(ctx.asyncAssertSuccess(server -> async.complete()));
async.awaitSuccess(10000);
}

@Before
public void before(TestContext ctx) {
vertx = Vertx.vertx();
server = MqttServer.create(vertx);
}

@After
public void after(TestContext ctx) {
server.close(ctx.asyncAssertSuccess(v -> {
vertx.close(ctx.asyncAssertSuccess());
}));
}

@Test
public void clientWillDisconnectOnMissingPingResponse(TestContext ctx) {
AtomicInteger pings = new AtomicInteger();
server.endpointHandler(endpoint -> {
endpoint.autoKeepAlive(false); // Tell the server not to respond to PINGREQ
endpoint.accept(false);
endpoint.pingHandler(v -> pings.incrementAndGet());
});
startServer(ctx);
MqttClientOptions options = new MqttClientOptions();
options.setKeepAliveTimeSeconds(2);
options.setKeepAliveTimeout(1);
MqttClient client = MqttClient.create(vertx, options);
client.connect(MqttClientOptions.DEFAULT_PORT, MqttClientOptions.DEFAULT_HOST, ctx.asyncAssertSuccess(ack -> {
Async async = ctx.async();
client.closeHandler(v -> {
assertEquals(1, pings.get());
async.complete();
});
}));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -99,9 +99,10 @@ public void before(TestContext context) {
}

@After
public void after() {
this.server.close();
this.vertx.close();
public void after(TestContext ctx) {
this.server.close(ctx.asyncAssertSuccess(v -> {
this.vertx.close(ctx.asyncAssertSuccess());
}));
}

private static void serverLogic(MqttEndpoint mqttEndpoint) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
/*
* Copyright 2016 Red Hat Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.vertx.mqtt.test.server;

import io.vertx.core.Vertx;
import io.vertx.ext.unit.Async;
import io.vertx.ext.unit.TestContext;
import io.vertx.ext.unit.junit.VertxUnitRunner;
import io.vertx.mqtt.MqttClient;
import io.vertx.mqtt.MqttClientOptions;
import io.vertx.mqtt.MqttServer;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;

import java.util.concurrent.atomic.AtomicInteger;

import static org.junit.Assert.assertEquals;

/**
* MQTT server keep alive tests using a Vert.x MQTT server to accommodate testing.
*/
@RunWith(VertxUnitRunner.class)
public class MqttServerKeepAliveTest {

private Vertx vertx;
private MqttServer server;

private void startServer(TestContext ctx) {
Async async = ctx.async();
server.listen(ctx.asyncAssertSuccess(server -> async.complete()));
async.awaitSuccess(10000);
}

@Before
public void before(TestContext ctx) {
vertx = Vertx.vertx();
server = MqttServer.create(vertx);
}

@After
public void after(TestContext ctx) {
server.close(ctx.asyncAssertSuccess(v -> {
vertx.close(ctx.asyncAssertSuccess());
}));
}

@Test
public void serverWillDisconnectOnTimeout(TestContext ctx) {
server.endpointHandler(endpoint -> {
endpoint.accept(false);
endpoint.pingHandler(v -> ctx.fail());
});
startServer(ctx);
MqttClientOptions options = new MqttClientOptions();
options.setAutoKeepAlive(false); // The client will manage pings manually
options.setKeepAliveTimeSeconds(2); // Tell the server to disconnects the client after 3 seconds of inactivity
MqttClient client = MqttClient.create(vertx, options);
client.connect(MqttClientOptions.DEFAULT_PORT, MqttClientOptions.DEFAULT_HOST, ctx.asyncAssertSuccess(ack -> {
Async async = ctx.async();
client.closeHandler(v -> {
async.complete();
});
}));
}
}