From b998f0786e34e42d5b639d48ae1a03cd95985b92 Mon Sep 17 00:00:00 2001 From: Julien Viet Date: Mon, 25 Mar 2019 09:57:18 +0100 Subject: [PATCH 1/2] Client keep alive improvements - see #124 --- src/main/asciidoc/dataobjects.adoc | 5 + .../mqtt/MqttClientOptionsConverter.java | 6 + .../java/io/vertx/mqtt/MqttClientOptions.java | 30 +++++ .../io/vertx/mqtt/impl/MqttClientImpl.java | 98 +++++++++------ .../vertx/mqtt/impl/MqttServerConnection.java | 6 +- .../io/vertx/mqtt/impl/MqttServerImpl.java | 13 +- .../test/client/MqttClientKeepAliveTest.java | 115 ++++++++++++++++++ 7 files changed, 234 insertions(+), 39 deletions(-) create mode 100644 src/test/java/io/vertx/mqtt/test/client/MqttClientKeepAliveTest.java diff --git a/src/main/asciidoc/dataobjects.adoc b/src/main/asciidoc/dataobjects.adoc index 7f42ff67..ad046f49 100644 --- a/src/main/asciidoc/dataobjects.adoc +++ b/src/main/asciidoc/dataobjects.adoc @@ -60,6 +60,11 @@ Do the same thing as link. Use it instead. |[[keepAliveTimeSeconds]]`@keepAliveTimeSeconds`|`Number (int)`|+++ Set the keep alive timeout in seconds +++ +|[[keepAliveTimeout]]`@keepAliveTimeout`|`Number (int)`|+++ +Set the time period in seconds after which unacknowledged pings will close the connection. +

+ Setting 0 will simply expire pings but won't close the connection. ++++ |[[keyStoreOptions]]`@keyStoreOptions`|`link:dataobjects.html#JksOptions[JksOptions]`|- |[[localAddress]]`@localAddress`|`String`|- |[[logActivity]]`@logActivity`|`Boolean`|- diff --git a/src/main/generated/io/vertx/mqtt/MqttClientOptionsConverter.java b/src/main/generated/io/vertx/mqtt/MqttClientOptionsConverter.java index 97f28037..f62513e5 100644 --- a/src/main/generated/io/vertx/mqtt/MqttClientOptionsConverter.java +++ b/src/main/generated/io/vertx/mqtt/MqttClientOptionsConverter.java @@ -39,6 +39,11 @@ public static void fromJson(Iterable> json, obj.setKeepAliveTimeSeconds(((Number)member.getValue()).intValue()); } break; + case "keepAliveTimeout": + if (member.getValue() instanceof Number) { + obj.setKeepAliveTimeout(((Number)member.getValue()).intValue()); + } + break; case "maxInflightQueue": if (member.getValue() instanceof Number) { obj.setMaxInflightQueue(((Number)member.getValue()).intValue()); @@ -100,6 +105,7 @@ public static void toJson(MqttClientOptions obj, java.util.Map j json.put("clientId", obj.getClientId()); } json.put("keepAliveTimeSeconds", obj.getKeepAliveTimeSeconds()); + json.put("keepAliveTimeout", obj.getKeepAliveTimeout()); json.put("maxInflightQueue", obj.getMaxInflightQueue()); json.put("maxMessageSize", obj.getMaxMessageSize()); if (obj.getPassword() != null) { diff --git a/src/main/java/io/vertx/mqtt/MqttClientOptions.java b/src/main/java/io/vertx/mqtt/MqttClientOptions.java index 12e761a2..fb4e8707 100644 --- a/src/main/java/io/vertx/mqtt/MqttClientOptions.java +++ b/src/main/java/io/vertx/mqtt/MqttClientOptions.java @@ -45,6 +45,11 @@ public class MqttClientOptions extends NetClientOptions { public static final boolean DEFAULT_WILL_RETAIN = false; public static final int DEFAULT_MAX_MESSAGE_SIZE = -1; + /** + * The default timeout value for an unacknowledge ping in seconds = 10 + */ + public static final int PING_TIMEOUT_SECONDS = 10; + private String clientId; private String username; private String password; @@ -55,6 +60,7 @@ public class MqttClientOptions extends NetClientOptions { private int willQoS = DEFAULT_WILL_QOS; private boolean willRetain = DEFAULT_WILL_RETAIN; private int keepAliveTimeSeconds = DEFAULT_KEEP_ALIVE_TIME_SECONDS; + private int keepAliveTimeout = PING_TIMEOUT_SECONDS; private boolean isAutoKeepAlive = true; private boolean isAutoGeneratedClientId = true; private int maxInflightQueue = DEFAULT_MAX_INFLIGHT_QUEUE; @@ -93,6 +99,7 @@ public MqttClientOptions(MqttClientOptions other) { this.willFlag = other.willFlag; this.willQoS = other.willQoS; this.willRetain = other.willRetain; + this.keepAliveTimeout = other.keepAliveTimeout; this.keepAliveTimeSeconds = other.keepAliveTimeSeconds; this.isAutoKeepAlive = other.isAutoKeepAlive; this.isAutoGeneratedClientId = other.isAutoGeneratedClientId; @@ -149,6 +156,13 @@ public int getKeepAliveTimeSeconds() { return keepAliveTimeSeconds; } + /** + * @return the ping timeout (in seconds) + */ + public int getKeepAliveTimeout() { + return keepAliveTimeout; + } + /** * @return provided username */ @@ -294,6 +308,22 @@ public MqttClientOptions setKeepAliveTimeSeconds(int keepAliveTimeSeconds) { return this; } + /** + * Set the time period in seconds after which unacknowledged pings will close the connection. + *

+ * Setting {@code 0} will simply expire pings but won't close the connection. + * + * @param keepAliveTimeout the ping timeout + * @return current options instance + */ + public MqttClientOptions setKeepAliveTimeout(int keepAliveTimeout) { + if (keepAliveTimeout <= 0) { + throw new IllegalArgumentException("The ping timeout value must be greater than 0"); + } + this.keepAliveTimeout = keepAliveTimeout; + return this; + } + /** * @return max count of unacknowledged messages */ diff --git a/src/main/java/io/vertx/mqtt/impl/MqttClientImpl.java b/src/main/java/io/vertx/mqtt/impl/MqttClientImpl.java index 8938ead8..c6dcad58 100644 --- a/src/main/java/io/vertx/mqtt/impl/MqttClientImpl.java +++ b/src/main/java/io/vertx/mqtt/impl/MqttClientImpl.java @@ -18,7 +18,6 @@ import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; -import io.netty.channel.ChannelDuplexHandler; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelPipeline; import io.netty.handler.codec.DecoderResult; @@ -36,12 +35,11 @@ import io.netty.handler.codec.mqtt.MqttSubscribePayload; import io.netty.handler.codec.mqtt.MqttTopicSubscription; import io.netty.handler.codec.mqtt.MqttUnsubscribePayload; -import io.netty.handler.timeout.IdleState; -import io.netty.handler.timeout.IdleStateEvent; -import io.netty.handler.timeout.IdleStateHandler; import io.vertx.core.*; import io.vertx.core.buffer.Buffer; +import io.vertx.core.impl.ContextInternal; import io.vertx.core.impl.NetSocketInternal; +import io.vertx.core.impl.NoStackTraceThrowable; import io.vertx.core.impl.logging.Logger; import io.vertx.core.impl.logging.LoggerFactory; import io.vertx.core.net.NetClient; @@ -75,6 +73,9 @@ */ public class MqttClientImpl implements MqttClient { + public static final Future UNACKNOWLEDGE_PING = Future.failedFuture("Expired ping"); + public static final Future CONNECTION_CLOSED = Future.failedFuture("Connection closed before receiving ping response"); + // patterns for topics validation private static final Pattern validTopicNamePattern = Pattern.compile("^[^#+\\u0000]+$"); private static final Pattern validTopicFilterPattern = Pattern.compile("^(#|((\\+(?![^/]))?([^#+]*(/\\+(?![^/]))?)*(/#)?))$"); @@ -89,6 +90,7 @@ public class MqttClientImpl implements MqttClient { private final MqttClientOptions options; private final NetClient client; + private final long pingTimeout; private NetSocketInternal connection; private Context ctx; @@ -127,6 +129,9 @@ public class MqttClientImpl implements MqttClient { private boolean isConnected; + private long timerID = -1; + private boolean pingSent; + /** * Constructor * @@ -141,6 +146,7 @@ public MqttClientImpl(Vertx vertx, MqttClientOptions options) { this.client = vertx.createNetClient(netClientOptions); this.options = options; + this.pingTimeout = options.getKeepAliveTimeout() * 1000; } /** @@ -185,7 +191,7 @@ private void doConnect(int port, String host, String serverName, Handler 0) { + pingCheck(); + } + } }); } + private void pingCheck() { + if (this.timerID == -1) { + this.pingSent = false; + this.timerID = this.ctx.owner().setTimer(options.getKeepAliveTimeSeconds() * 1000, id -> pingCheck()); + } else if (pingSent) { + this.pingSent = false; + this.timerID = -1; + this.connection.close(); + } else { + this.pingSent = true; + this.timerID = this.ctx.owner().setTimer(pingTimeout, id -> pingCheck()); + this.ping(); + } + } + /** * See {@link MqttClient#disconnect()} for more details */ @@ -539,14 +563,19 @@ private synchronized Handler closeHandler() { */ @Override public MqttClient ping() { - - MqttFixedHeader fixedHeader = - new MqttFixedHeader(MqttMessageType.PINGREQ, false, MqttQoS.AT_MOST_ONCE, false, 0); - - io.netty.handler.codec.mqtt.MqttMessage pingreq = MqttMessageFactory.newMessage(fixedHeader, null, null); - - this.write(pingreq); - + ctx.runOnContext(v -> { + MqttFixedHeader fixedHeader = new MqttFixedHeader( + MqttMessageType.PINGREQ, + false, + MqttQoS.AT_MOST_ONCE, + false, + 0); + io.netty.handler.codec.mqtt.MqttMessage pingreq = MqttMessageFactory.newMessage( + fixedHeader, + null, + null); + this.write(pingreq); + }); return this; } @@ -638,7 +667,7 @@ private void publishRelease(int publishMessageId) { this.write(pubrel); } - private void initChannel(ChannelPipeline pipeline) { + private void initChannel(NetSocketInternal soi, ChannelPipeline pipeline) { // add into pipeline netty's (en/de)coder pipeline.addBefore("handler", "mqttEncoder", MqttEncoder.INSTANCE); @@ -649,26 +678,6 @@ private void initChannel(ChannelPipeline pipeline) { // max message size not set, so the default from Netty MQTT codec is used pipeline.addBefore("handler", "mqttDecoder", new MqttDecoder()); } - - if (this.options.isAutoKeepAlive() && - this.options.getKeepAliveTimeSeconds() != 0) { - - pipeline.addBefore("handler", "idle", - new IdleStateHandler(0, this.options.getKeepAliveTimeSeconds(), 0)); - pipeline.addBefore("handler", "keepAliveHandler", new ChannelDuplexHandler() { - - @Override - public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { - - if (evt instanceof IdleStateEvent) { - IdleStateEvent e = (IdleStateEvent) evt; - if (e.state() == IdleState.WRITER_IDLE) { - ping(); - } - } - } - }); - } } /** @@ -697,6 +706,11 @@ void write(io.netty.handler.codec.mqtt.MqttMessage mqttMessage) { */ private void handleClosed() { synchronized (this) { + // Cancel auto keep alive timer + if (this.timerID != -1L) { + ctx.owner().cancelTimer(this.timerID); + this.timerID = -1L; + } boolean isConnected = this.isConnected; this.isConnected = false; if (!isConnected) { @@ -705,7 +719,11 @@ private void handleClosed() { } Handler handler = closeHandler(); if (handler != null) { - handler.handle(null); + try { + handler.handle(null); + } catch (Exception e) { + ((ContextInternal)ctx).reportException(e); + } } } @@ -812,6 +830,14 @@ private void handleMessage(ChannelHandlerContext chctx, Object msg) { private void handlePingresp() { Handler handler = pingResponseHandler(); + + if (this.timerID != -1) { + ctx.owner().cancelTimer(this.timerID); + this.timerID = -1; + this.pingSent = false; + } + pingCheck(); + if (handler != null) { handler.handle(null); } diff --git a/src/main/java/io/vertx/mqtt/impl/MqttServerConnection.java b/src/main/java/io/vertx/mqtt/impl/MqttServerConnection.java index 38837377..afb9df49 100644 --- a/src/main/java/io/vertx/mqtt/impl/MqttServerConnection.java +++ b/src/main/java/io/vertx/mqtt/impl/MqttServerConnection.java @@ -60,16 +60,18 @@ public class MqttServerConnection { private MqttEndpointImpl endpoint; private final ChannelHandlerContext chctx; private final MqttServerOptions options; + private final boolean keepAliveCheck; void init(Handler endpointHandler, Handler rejectHandler) { this.endpointHandler = endpointHandler; this.exceptionHandler = rejectHandler; } - public MqttServerConnection(NetSocketInternal so, MqttServerOptions options) { + public MqttServerConnection(NetSocketInternal so, MqttServerOptions options, boolean keepAliveCheck) { this.so = so; this.chctx = so.channelHandlerContext(); this.options = options; + this.keepAliveCheck = keepAliveCheck; } /** @@ -247,7 +249,7 @@ private void handleConnect(MqttConnectMessage msg) { msg.variableHeader().keepAliveTimeSeconds() / 2; // modifying the channel pipeline for adding the idle state handler with previous timeout - chctx.pipeline().addBefore("handler", "idle", new IdleStateHandler(timeout, 0, 0)); + chctx.pipeline().addBefore("handler", "idle", new IdleStateHandler(keepAliveCheck ? timeout : 0, 0, 0)); chctx.pipeline().addBefore("handler", "keepAliveHandler", new ChannelDuplexHandler() { @Override diff --git a/src/main/java/io/vertx/mqtt/impl/MqttServerImpl.java b/src/main/java/io/vertx/mqtt/impl/MqttServerImpl.java index a6207b54..e42282c5 100644 --- a/src/main/java/io/vertx/mqtt/impl/MqttServerImpl.java +++ b/src/main/java/io/vertx/mqtt/impl/MqttServerImpl.java @@ -45,12 +45,23 @@ public class MqttServerImpl implements MqttServer { private final NetServer server; private Handler endpointHandler; private Handler exceptionHandler; + private volatile boolean keepAliveCheck; private MqttServerOptions options; public MqttServerImpl(Vertx vertx, MqttServerOptions options) { this.server = vertx.createNetServer(options); this.options = options; + this.keepAliveCheck = true; + } + + public MqttServerImpl keepAliveCheck(boolean val) { + this.keepAliveCheck = val; + return this; + } + + public boolean keepAliveCheck() { + return keepAliveCheck; } @Override @@ -87,7 +98,7 @@ public MqttServer listen(int port, String host, Handler> ChannelPipeline pipeline = soi.channelHandlerContext().pipeline(); initChannel(pipeline); - MqttServerConnection conn = new MqttServerConnection(soi, options); + MqttServerConnection conn = new MqttServerConnection(soi, options, keepAliveCheck); soi.messageHandler(msg -> { synchronized (conn) { diff --git a/src/test/java/io/vertx/mqtt/test/client/MqttClientKeepAliveTest.java b/src/test/java/io/vertx/mqtt/test/client/MqttClientKeepAliveTest.java new file mode 100644 index 00000000..c5c4ef8a --- /dev/null +++ b/src/test/java/io/vertx/mqtt/test/client/MqttClientKeepAliveTest.java @@ -0,0 +1,115 @@ +/* + * Copyright 2016 Red Hat Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.vertx.mqtt.test.client; + +import io.vertx.core.Vertx; +import io.vertx.ext.unit.Async; +import io.vertx.ext.unit.TestContext; +import io.vertx.ext.unit.junit.VertxUnitRunner; +import io.vertx.mqtt.MqttClient; +import io.vertx.mqtt.MqttClientOptions; +import io.vertx.mqtt.MqttServer; +import io.vertx.mqtt.impl.MqttServerImpl; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; + +import java.util.concurrent.atomic.AtomicInteger; + +import static org.junit.Assert.assertEquals; + +/** + * MQTT client keep alive tests using a Vert.x MQTT server to accomodate testing. + */ +@RunWith(VertxUnitRunner.class) +public class MqttClientKeepAliveTest { + + private Vertx vertx; + private MqttServer server; + + private void startServer(TestContext ctx) { + Async async = ctx.async(); + server.listen(1884, "localhost", ctx.asyncAssertSuccess(server -> async.complete())); + async.awaitSuccess(10000); + } + + @Before + public void before(TestContext ctx) { + vertx = Vertx.vertx(); + server = MqttServer.create(vertx); + } + + @After + public void after(TestContext ctx) { + vertx.close(ctx.asyncAssertSuccess()); + } + + @Test + public void expireClientPingOnMissingPingResponse(TestContext ctx) { + AtomicInteger pings = new AtomicInteger(); + ((MqttServerImpl)server).keepAliveCheck(false); // Tell the server to not evict us + server.endpointHandler(endpoint -> { + endpoint.autoKeepAlive(false); + endpoint.accept(false); + endpoint.pingHandler(v -> pings.incrementAndGet()); + }); + startServer(ctx); + Async async = ctx.async(); + MqttClientOptions options = new MqttClientOptions(); + options.setAutoKeepAlive(true); + options.setKeepAliveTimeSeconds(1); + options.setKeepAliveTimeout(4); + MqttClient client = MqttClient.create(vertx, options); + client.connect(1884, "localhost", ctx.asyncAssertSuccess(ack -> { + long now = System.currentTimeMillis(); + client.closeHandler(v -> { + long delta = System.currentTimeMillis() - now; + long val = Math.round(delta / 1000D); + assertEquals(5, val); + assertEquals(1, pings.get()); + async.complete(); + }); + })); + async.await(); + } + + @Test + public void serverWillDisconnectWhenKeepAliveSet(TestContext ctx) { + Async async = ctx.async(); + server.endpointHandler(endpoint -> { + endpoint.accept(false); + endpoint.pingHandler(v -> ctx.fail()); + }); + startServer(ctx); + MqttClientOptions options = new MqttClientOptions(); + options.setAutoKeepAlive(false); // The client will manage pings + options.setKeepAliveTimeSeconds(2); // Tell the server to disconnects the client after 3 seconds of inactivity + MqttClient client = MqttClient.create(vertx, options); + client.connect(1884, "localhost", ctx.asyncAssertSuccess(ack -> { + long now = System.currentTimeMillis(); + client.closeHandler(v -> { + long delta = System.currentTimeMillis() - now; + long val = Math.round(delta / 1000D); + // MQTT-3.1.2-24 - 3 = 2 + 1 + ctx.assertEquals(3L, val); + async.complete(); + }); + })); + async.await(); + } +} From e711a4e7280f8e6734776cc264d6d13c89bfa26c Mon Sep 17 00:00:00 2001 From: Julien Viet Date: Tue, 26 Mar 2019 10:12:44 +0100 Subject: [PATCH 2/2] Introduce a few useful constant for message headers --- .../io/vertx/mqtt/impl/MqttClientImpl.java | 86 +++++++------------ 1 file changed, 30 insertions(+), 56 deletions(-) diff --git a/src/main/java/io/vertx/mqtt/impl/MqttClientImpl.java b/src/main/java/io/vertx/mqtt/impl/MqttClientImpl.java index c6dcad58..f0ef227b 100644 --- a/src/main/java/io/vertx/mqtt/impl/MqttClientImpl.java +++ b/src/main/java/io/vertx/mqtt/impl/MqttClientImpl.java @@ -39,7 +39,6 @@ import io.vertx.core.buffer.Buffer; import io.vertx.core.impl.ContextInternal; import io.vertx.core.impl.NetSocketInternal; -import io.vertx.core.impl.NoStackTraceThrowable; import io.vertx.core.impl.logging.Logger; import io.vertx.core.impl.logging.LoggerFactory; import io.vertx.core.net.NetClient; @@ -73,9 +72,30 @@ */ public class MqttClientImpl implements MqttClient { + private static MqttFixedHeader createHeaders(MqttMessageType msgType, MqttQoS qos) { + return new MqttFixedHeader( + msgType, + false, + qos, + false, + 0 + ); + } + public static final Future UNACKNOWLEDGE_PING = Future.failedFuture("Expired ping"); public static final Future CONNECTION_CLOSED = Future.failedFuture("Connection closed before receiving ping response"); + // Bunch of constant MQTT message headers + private static final MqttFixedHeader DISCONNECT_MSG = createHeaders(MqttMessageType.DISCONNECT, AT_MOST_ONCE); + private static final MqttFixedHeader SUBSCRIBE_MSG = createHeaders(MqttMessageType.SUBSCRIBE, AT_LEAST_ONCE); + private static final MqttFixedHeader UNSUBSCRIBE_MSG = createHeaders(MqttMessageType.UNSUBSCRIBE, AT_LEAST_ONCE); + private static final MqttFixedHeader CONNECT_MSG = createHeaders(MqttMessageType.CONNECT, AT_MOST_ONCE); + private static final MqttFixedHeader PUBACK_MSG = createHeaders(MqttMessageType.PUBACK, AT_MOST_ONCE); + private static final MqttFixedHeader PUCREC_MSG = createHeaders(MqttMessageType.PUBREC, AT_MOST_ONCE); + private static final MqttFixedHeader PUBCOMP_MSG = createHeaders(MqttMessageType.PUBCOMP, AT_MOST_ONCE); + private static final MqttFixedHeader PUBREL_MSG = createHeaders(MqttMessageType.PUBREL, MqttQoS.AT_LEAST_ONCE); + private static final MqttFixedHeader PINGREQ_MSG = createHeaders(MqttMessageType.PINGREQ, MqttQoS.AT_MOST_ONCE); + // patterns for topics validation private static final Pattern validTopicNamePattern = Pattern.compile("^[^#+\\u0000]+$"); private static final Pattern validTopicFilterPattern = Pattern.compile("^(#|((\\+(?![^/]))?([^#+]*(/\\+(?![^/]))?)*(/#)?))$"); @@ -201,12 +221,6 @@ private void doConnect(int port, String host, String serverName, Handler> disconnectHandler) { - MqttFixedHeader fixedHeader = new MqttFixedHeader( - MqttMessageType.DISCONNECT, - false, - AT_MOST_ONCE, - false, - 0 - ); - - io.netty.handler.codec.mqtt.MqttMessage disconnect = MqttMessageFactory.newMessage(fixedHeader, null, null); + io.netty.handler.codec.mqtt.MqttMessage disconnect = MqttMessageFactory.newMessage(DISCONNECT_MSG, null, null); this.write(disconnect); @@ -440,13 +446,6 @@ public MqttClient subscribe(Map topics, Handler subscriptions = topics.entrySet() .stream() @@ -455,7 +454,7 @@ public MqttClient subscribe(Map topics, Handler unsubscribeCompletionHandler() { @Override public MqttClient unsubscribe(String topic, Handler> unsubscribeSentHandler) { - MqttFixedHeader fixedHeader = new MqttFixedHeader( - MqttMessageType.UNSUBSCRIBE, - false, - AT_LEAST_ONCE, - false, - 0); - MqttMessageIdVariableHeader variableHeader = MqttMessageIdVariableHeader.from(nextMessageId()); MqttUnsubscribePayload payload = new MqttUnsubscribePayload(Stream.of(topic).collect(Collectors.toList())); - io.netty.handler.codec.mqtt.MqttMessage unsubscribe = MqttMessageFactory.newMessage(fixedHeader, variableHeader, payload); + io.netty.handler.codec.mqtt.MqttMessage unsubscribe = MqttMessageFactory.newMessage(UNSUBSCRIBE_MSG, variableHeader, payload); this.write(unsubscribe); @@ -564,14 +556,8 @@ private synchronized Handler closeHandler() { @Override public MqttClient ping() { ctx.runOnContext(v -> { - MqttFixedHeader fixedHeader = new MqttFixedHeader( - MqttMessageType.PINGREQ, - false, - MqttQoS.AT_MOST_ONCE, - false, - 0); io.netty.handler.codec.mqtt.MqttMessage pingreq = MqttMessageFactory.newMessage( - fixedHeader, + PINGREQ_MSG, null, null); this.write(pingreq); @@ -596,13 +582,10 @@ public synchronized boolean isConnected() { */ private void publishAcknowledge(int publishMessageId) { - MqttFixedHeader fixedHeader = - new MqttFixedHeader(MqttMessageType.PUBACK, false, AT_MOST_ONCE, false, 0); - MqttMessageIdVariableHeader variableHeader = MqttMessageIdVariableHeader.from(publishMessageId); - io.netty.handler.codec.mqtt.MqttMessage puback = MqttMessageFactory.newMessage(fixedHeader, variableHeader, null); + io.netty.handler.codec.mqtt.MqttMessage puback = MqttMessageFactory.newMessage(PUBACK_MSG, variableHeader, null); this.write(puback); } @@ -614,13 +597,10 @@ private void publishAcknowledge(int publishMessageId) { */ private void publishReceived(MqttPublishMessage publishMessage) { - MqttFixedHeader fixedHeader = - new MqttFixedHeader(MqttMessageType.PUBREC, false, AT_MOST_ONCE, false, 0); - MqttMessageIdVariableHeader variableHeader = MqttMessageIdVariableHeader.from(publishMessage.messageId()); - io.netty.handler.codec.mqtt.MqttMessage pubrec = MqttMessageFactory.newMessage(fixedHeader, variableHeader, null); + io.netty.handler.codec.mqtt.MqttMessage pubrec = MqttMessageFactory.newMessage(PUCREC_MSG, variableHeader, null); synchronized (this) { qos2inbound.put(publishMessage.messageId(), publishMessage); @@ -635,13 +615,10 @@ private void publishReceived(MqttPublishMessage publishMessage) { */ private void publishComplete(int publishMessageId) { - MqttFixedHeader fixedHeader = - new MqttFixedHeader(MqttMessageType.PUBCOMP, false, AT_MOST_ONCE, false, 0); - MqttMessageIdVariableHeader variableHeader = MqttMessageIdVariableHeader.from(publishMessageId); - io.netty.handler.codec.mqtt.MqttMessage pubcomp = MqttMessageFactory.newMessage(fixedHeader, variableHeader, null); + io.netty.handler.codec.mqtt.MqttMessage pubcomp = MqttMessageFactory.newMessage(PUBCOMP_MSG, variableHeader, null); this.write(pubcomp); } @@ -653,13 +630,10 @@ private void publishComplete(int publishMessageId) { */ private void publishRelease(int publishMessageId) { - MqttFixedHeader fixedHeader = - new MqttFixedHeader(MqttMessageType.PUBREL, false, MqttQoS.AT_LEAST_ONCE, false, 0); - MqttMessageIdVariableHeader variableHeader = MqttMessageIdVariableHeader.from(publishMessageId); - io.netty.handler.codec.mqtt.MqttMessage pubrel = MqttMessageFactory.newMessage(fixedHeader, variableHeader, null); + io.netty.handler.codec.mqtt.MqttMessage pubrel = MqttMessageFactory.newMessage(PUBREL_MSG, variableHeader, null); synchronized (this) { qos2outbound.put(publishMessageId, pubrel);