From 809ee832a16b1e3fc27faff0e0bd8b6b017454b8 Mon Sep 17 00:00:00 2001 From: Michael Konrad Date: Wed, 16 Jan 2019 09:07:54 +0100 Subject: [PATCH] close connection if keep-alive is exceeded by factor 1.5 --- .../io/vertx/mqtt/impl/MqttClientImpl.java | 28 +++++++++++++++---- 1 file changed, 23 insertions(+), 5 deletions(-) diff --git a/src/main/java/io/vertx/mqtt/impl/MqttClientImpl.java b/src/main/java/io/vertx/mqtt/impl/MqttClientImpl.java index 627ea177..bb460870 100644 --- a/src/main/java/io/vertx/mqtt/impl/MqttClientImpl.java +++ b/src/main/java/io/vertx/mqtt/impl/MqttClientImpl.java @@ -21,6 +21,7 @@ import io.netty.channel.ChannelDuplexHandler; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelPipeline; +import io.netty.channel.ChannelPromise; import io.netty.handler.codec.DecoderResult; import io.netty.handler.codec.mqtt.MqttConnectPayload; import io.netty.handler.codec.mqtt.MqttConnectReturnCode; @@ -36,6 +37,8 @@ import io.netty.handler.codec.mqtt.MqttSubscribePayload; import io.netty.handler.codec.mqtt.MqttTopicSubscription; import io.netty.handler.codec.mqtt.MqttUnsubscribePayload; +import io.netty.handler.logging.LogLevel; +import io.netty.handler.logging.LoggingHandler; import io.netty.handler.timeout.IdleState; import io.netty.handler.timeout.IdleStateEvent; import io.netty.handler.timeout.IdleStateHandler; @@ -58,11 +61,8 @@ import java.io.UnsupportedEncodingException; import java.nio.charset.StandardCharsets; -import java.util.Collections; -import java.util.LinkedHashMap; -import java.util.List; -import java.util.Map; -import java.util.UUID; +import java.util.*; +import java.util.concurrent.TimeUnit; import java.util.regex.Matcher; import java.util.regex.Pattern; import java.util.stream.Collectors; @@ -653,6 +653,7 @@ private void initChannel(ChannelPipeline pipeline) { if (this.options.isAutoKeepAlive() && this.options.getKeepAliveTimeSeconds() != 0) { + // Idle state handler for keep alive -> will result in pinging broker pipeline.addBefore("handler", "idle", new IdleStateHandler(0, this.options.getKeepAliveTimeSeconds(), 0)); pipeline.addBefore("handler", "keepAliveHandler", new ChannelDuplexHandler() { @@ -668,6 +669,23 @@ public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exc } } }); + + // Idle state handler for timeout -> will result in connection close if no traffic comes in (including ping request) + int timeout = (int) (this.options.getKeepAliveTimeSeconds() * 1.5); + pipeline.addBefore("handler","idleTimeout", + new IdleStateHandler(timeout, 0, 0)); + pipeline.addBefore("handler","idleTimeoutHandler", new ChannelDuplexHandler() { + + @Override + public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { + if (evt instanceof IdleStateEvent) { + IdleStateEvent e = (IdleStateEvent) evt; + if (e.state() == IdleState.READER_IDLE) { + ctx.close(); + } + } + } + }); } }