Skip to content

Commit

Permalink
Prepare for MQTT5 changes
Browse files Browse the repository at this point in the history
  • Loading branch information
paul-lysak committed Sep 10, 2020
1 parent c21ed05 commit bd1d65a
Show file tree
Hide file tree
Showing 3 changed files with 9 additions and 5 deletions.
6 changes: 4 additions & 2 deletions src/main/java/io/vertx/mqtt/impl/MqttClientImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,10 @@
import io.netty.handler.codec.mqtt.MqttEncoder;
import io.netty.handler.codec.mqtt.MqttFixedHeader;
import io.netty.handler.codec.mqtt.MqttMessageFactory;
import io.netty.handler.codec.mqtt.MqttMessageIdAndPropertiesVariableHeader;
import io.netty.handler.codec.mqtt.MqttMessageIdVariableHeader;
import io.netty.handler.codec.mqtt.MqttMessageType;
import io.netty.handler.codec.mqtt.MqttProperties;
import io.netty.handler.codec.mqtt.MqttPublishVariableHeader;
import io.netty.handler.codec.mqtt.MqttQoS;
import io.netty.handler.codec.mqtt.MqttSubscribePayload;
Expand Down Expand Up @@ -434,7 +436,7 @@ public Future<Integer> subscribe(Map<String, Integer> topics) {
false,
0);

MqttMessageIdVariableHeader variableHeader = MqttMessageIdVariableHeader.from(nextMessageId());
MqttMessageIdVariableHeader variableHeader = new MqttMessageIdAndPropertiesVariableHeader(nextMessageId(), MqttProperties.NO_PROPERTIES);
List<MqttTopicSubscription> subscriptions = topics.entrySet()
.stream()
.map(e -> new MqttTopicSubscription(e.getKey(), valueOf(e.getValue())))
Expand Down Expand Up @@ -505,7 +507,7 @@ public Future<Integer> unsubscribe(String topic) {
false,
0);

MqttMessageIdVariableHeader variableHeader = MqttMessageIdVariableHeader.from(nextMessageId());
MqttMessageIdVariableHeader variableHeader = new MqttMessageIdAndPropertiesVariableHeader(nextMessageId(), MqttProperties.NO_PROPERTIES);

MqttUnsubscribePayload payload = new MqttUnsubscribePayload(Stream.of(topic).collect(Collectors.toList()));

Expand Down
4 changes: 3 additions & 1 deletion src/main/java/io/vertx/mqtt/impl/MqttEndpointImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import io.netty.handler.codec.mqtt.MqttPublishVariableHeader;
import io.netty.handler.codec.mqtt.MqttQoS;
import io.netty.handler.codec.mqtt.MqttSubAckPayload;
import io.netty.handler.codec.mqtt.MqttUnsubAckPayload;
import io.vertx.core.AsyncResult;
import io.vertx.core.Future;
import io.vertx.core.Handler;
Expand Down Expand Up @@ -367,8 +368,9 @@ public MqttEndpointImpl unsubscribeAcknowledge(int unsubscribeMessageId) {
new MqttFixedHeader(MqttMessageType.UNSUBACK, false, MqttQoS.AT_MOST_ONCE, false, 0);
MqttMessageIdVariableHeader variableHeader =
MqttMessageIdVariableHeader.from(unsubscribeMessageId);
MqttUnsubAckPayload payload = new MqttUnsubAckPayload();

io.netty.handler.codec.mqtt.MqttMessage unsuback = MqttMessageFactory.newMessage(fixedHeader, variableHeader, null);
io.netty.handler.codec.mqtt.MqttMessage unsuback = MqttMessageFactory.newMessage(fixedHeader, variableHeader, payload);

this.write(unsuback);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ private void subscribe(TestContext context, String topic, int expectedQos) {
}

@Test
public void subscribeUnsupportedQos(TestContext context) {
public void subscribeUnsupportedMqttVersion(TestContext context) {

Async async = context.async();

Expand All @@ -122,7 +122,7 @@ public void subscribeUnsupportedQos(TestContext context) {
0x11, // MSG LEN
0x00, 0x04, // PROTOCOL NAME LENGTH
0x4D, 0x51, 0x54, 0x54, // MQTT
0x05, // VERSION
0x06, // VERSION
0x02, // QOS
0x00, 0x3C, // KEEP ALIVE
0x00, 0x05, // CLIENT ID LENGTH
Expand Down

0 comments on commit bd1d65a

Please sign in to comment.