Skip to content

Commit

Permalink
Introduce a few useful constant for message headers
Browse files Browse the repository at this point in the history
  • Loading branch information
vietj committed Mar 26, 2019
1 parent b998f07 commit e711a4e
Showing 1 changed file with 30 additions and 56 deletions.
86 changes: 30 additions & 56 deletions src/main/java/io/vertx/mqtt/impl/MqttClientImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@
import io.vertx.core.buffer.Buffer;
import io.vertx.core.impl.ContextInternal;
import io.vertx.core.impl.NetSocketInternal;
import io.vertx.core.impl.NoStackTraceThrowable;
import io.vertx.core.impl.logging.Logger;
import io.vertx.core.impl.logging.LoggerFactory;
import io.vertx.core.net.NetClient;
Expand Down Expand Up @@ -73,9 +72,30 @@
*/
public class MqttClientImpl implements MqttClient {

private static MqttFixedHeader createHeaders(MqttMessageType msgType, MqttQoS qos) {
return new MqttFixedHeader(
msgType,
false,
qos,
false,
0
);
}

public static final Future<Void> UNACKNOWLEDGE_PING = Future.failedFuture("Expired ping");
public static final Future<Void> CONNECTION_CLOSED = Future.failedFuture("Connection closed before receiving ping response");

// Bunch of constant MQTT message headers
private static final MqttFixedHeader DISCONNECT_MSG = createHeaders(MqttMessageType.DISCONNECT, AT_MOST_ONCE);
private static final MqttFixedHeader SUBSCRIBE_MSG = createHeaders(MqttMessageType.SUBSCRIBE, AT_LEAST_ONCE);
private static final MqttFixedHeader UNSUBSCRIBE_MSG = createHeaders(MqttMessageType.UNSUBSCRIBE, AT_LEAST_ONCE);
private static final MqttFixedHeader CONNECT_MSG = createHeaders(MqttMessageType.CONNECT, AT_MOST_ONCE);
private static final MqttFixedHeader PUBACK_MSG = createHeaders(MqttMessageType.PUBACK, AT_MOST_ONCE);
private static final MqttFixedHeader PUCREC_MSG = createHeaders(MqttMessageType.PUBREC, AT_MOST_ONCE);
private static final MqttFixedHeader PUBCOMP_MSG = createHeaders(MqttMessageType.PUBCOMP, AT_MOST_ONCE);
private static final MqttFixedHeader PUBREL_MSG = createHeaders(MqttMessageType.PUBREL, MqttQoS.AT_LEAST_ONCE);
private static final MqttFixedHeader PINGREQ_MSG = createHeaders(MqttMessageType.PINGREQ, MqttQoS.AT_MOST_ONCE);

// patterns for topics validation
private static final Pattern validTopicNamePattern = Pattern.compile("^[^#+\\u0000]+$");
private static final Pattern validTopicFilterPattern = Pattern.compile("^(#|((\\+(?![^/]))?([^#+]*(/\\+(?![^/]))?)*(/#)?))$");
Expand Down Expand Up @@ -201,12 +221,6 @@ private void doConnect(int port, String host, String serverName, Handler<AsyncRe
// an exception at connection level
soi.exceptionHandler(this::handleException);

MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.CONNECT,
false,
AT_MOST_ONCE,
false,
0);

MqttConnectVariableHeader variableHeader = new MqttConnectVariableHeader(
PROTOCOL_NAME,
PROTOCOL_VERSION,
Expand All @@ -227,7 +241,7 @@ private void doConnect(int port, String host, String serverName, Handler<AsyncRe
options.hasPassword() ? options.getPassword().getBytes() : null
);

io.netty.handler.codec.mqtt.MqttMessage connect = MqttMessageFactory.newMessage(fixedHeader, variableHeader, payload);
io.netty.handler.codec.mqtt.MqttMessage connect = MqttMessageFactory.newMessage(CONNECT_MSG, variableHeader, payload);

this.write(connect);

Expand Down Expand Up @@ -267,15 +281,7 @@ public MqttClient disconnect() {
@Override
public MqttClient disconnect(Handler<AsyncResult<Void>> disconnectHandler) {

MqttFixedHeader fixedHeader = new MqttFixedHeader(
MqttMessageType.DISCONNECT,
false,
AT_MOST_ONCE,
false,
0
);

io.netty.handler.codec.mqtt.MqttMessage disconnect = MqttMessageFactory.newMessage(fixedHeader, null, null);
io.netty.handler.codec.mqtt.MqttMessage disconnect = MqttMessageFactory.newMessage(DISCONNECT_MSG, null, null);

this.write(disconnect);

Expand Down Expand Up @@ -440,13 +446,6 @@ public MqttClient subscribe(Map<String, Integer> topics, Handler<AsyncResult<Int
return this;
}

MqttFixedHeader fixedHeader = new MqttFixedHeader(
MqttMessageType.SUBSCRIBE,
false,
AT_LEAST_ONCE,
false,
0);

MqttMessageIdVariableHeader variableHeader = MqttMessageIdVariableHeader.from(nextMessageId());
List<MqttTopicSubscription> subscriptions = topics.entrySet()
.stream()
Expand All @@ -455,7 +454,7 @@ public MqttClient subscribe(Map<String, Integer> topics, Handler<AsyncResult<Int

MqttSubscribePayload payload = new MqttSubscribePayload(subscriptions);

io.netty.handler.codec.mqtt.MqttMessage subscribe = MqttMessageFactory.newMessage(fixedHeader, variableHeader, payload);
io.netty.handler.codec.mqtt.MqttMessage subscribe = MqttMessageFactory.newMessage(SUBSCRIBE_MSG, variableHeader, payload);

this.write(subscribe);

Expand Down Expand Up @@ -486,18 +485,11 @@ private synchronized Handler<Integer> unsubscribeCompletionHandler() {
@Override
public MqttClient unsubscribe(String topic, Handler<AsyncResult<Integer>> unsubscribeSentHandler) {

MqttFixedHeader fixedHeader = new MqttFixedHeader(
MqttMessageType.UNSUBSCRIBE,
false,
AT_LEAST_ONCE,
false,
0);

MqttMessageIdVariableHeader variableHeader = MqttMessageIdVariableHeader.from(nextMessageId());

MqttUnsubscribePayload payload = new MqttUnsubscribePayload(Stream.of(topic).collect(Collectors.toList()));

io.netty.handler.codec.mqtt.MqttMessage unsubscribe = MqttMessageFactory.newMessage(fixedHeader, variableHeader, payload);
io.netty.handler.codec.mqtt.MqttMessage unsubscribe = MqttMessageFactory.newMessage(UNSUBSCRIBE_MSG, variableHeader, payload);

this.write(unsubscribe);

Expand Down Expand Up @@ -564,14 +556,8 @@ private synchronized Handler<Void> closeHandler() {
@Override
public MqttClient ping() {
ctx.runOnContext(v -> {
MqttFixedHeader fixedHeader = new MqttFixedHeader(
MqttMessageType.PINGREQ,
false,
MqttQoS.AT_MOST_ONCE,
false,
0);
io.netty.handler.codec.mqtt.MqttMessage pingreq = MqttMessageFactory.newMessage(
fixedHeader,
PINGREQ_MSG,
null,
null);
this.write(pingreq);
Expand All @@ -596,13 +582,10 @@ public synchronized boolean isConnected() {
*/
private void publishAcknowledge(int publishMessageId) {

MqttFixedHeader fixedHeader =
new MqttFixedHeader(MqttMessageType.PUBACK, false, AT_MOST_ONCE, false, 0);

MqttMessageIdVariableHeader variableHeader =
MqttMessageIdVariableHeader.from(publishMessageId);

io.netty.handler.codec.mqtt.MqttMessage puback = MqttMessageFactory.newMessage(fixedHeader, variableHeader, null);
io.netty.handler.codec.mqtt.MqttMessage puback = MqttMessageFactory.newMessage(PUBACK_MSG, variableHeader, null);

this.write(puback);
}
Expand All @@ -614,13 +597,10 @@ private void publishAcknowledge(int publishMessageId) {
*/
private void publishReceived(MqttPublishMessage publishMessage) {

MqttFixedHeader fixedHeader =
new MqttFixedHeader(MqttMessageType.PUBREC, false, AT_MOST_ONCE, false, 0);

MqttMessageIdVariableHeader variableHeader =
MqttMessageIdVariableHeader.from(publishMessage.messageId());

io.netty.handler.codec.mqtt.MqttMessage pubrec = MqttMessageFactory.newMessage(fixedHeader, variableHeader, null);
io.netty.handler.codec.mqtt.MqttMessage pubrec = MqttMessageFactory.newMessage(PUCREC_MSG, variableHeader, null);

synchronized (this) {
qos2inbound.put(publishMessage.messageId(), publishMessage);
Expand All @@ -635,13 +615,10 @@ private void publishReceived(MqttPublishMessage publishMessage) {
*/
private void publishComplete(int publishMessageId) {

MqttFixedHeader fixedHeader =
new MqttFixedHeader(MqttMessageType.PUBCOMP, false, AT_MOST_ONCE, false, 0);

MqttMessageIdVariableHeader variableHeader =
MqttMessageIdVariableHeader.from(publishMessageId);

io.netty.handler.codec.mqtt.MqttMessage pubcomp = MqttMessageFactory.newMessage(fixedHeader, variableHeader, null);
io.netty.handler.codec.mqtt.MqttMessage pubcomp = MqttMessageFactory.newMessage(PUBCOMP_MSG, variableHeader, null);

this.write(pubcomp);
}
Expand All @@ -653,13 +630,10 @@ private void publishComplete(int publishMessageId) {
*/
private void publishRelease(int publishMessageId) {

MqttFixedHeader fixedHeader =
new MqttFixedHeader(MqttMessageType.PUBREL, false, MqttQoS.AT_LEAST_ONCE, false, 0);

MqttMessageIdVariableHeader variableHeader =
MqttMessageIdVariableHeader.from(publishMessageId);

io.netty.handler.codec.mqtt.MqttMessage pubrel = MqttMessageFactory.newMessage(fixedHeader, variableHeader, null);
io.netty.handler.codec.mqtt.MqttMessage pubrel = MqttMessageFactory.newMessage(PUBREL_MSG, variableHeader, null);

synchronized (this) {
qos2outbound.put(publishMessageId, pubrel);
Expand Down

0 comments on commit e711a4e

Please sign in to comment.