Skip to content

Commit

Permalink
Add an MQTT client session, which allows to automatically re-connect.
Browse files Browse the repository at this point in the history
Signed-off-by: Jens Reimann <[email protected]>
  • Loading branch information
ctron committed May 17, 2021
1 parent 1d12436 commit 683b982
Show file tree
Hide file tree
Showing 8 changed files with 899 additions and 2 deletions.
6 changes: 6 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,12 @@
<organization>Red Hat</organization>
<organizationUrl>http://www.redhat.com</organizationUrl>
</developer>
<developer>
<name>Jens Reimann</name>
<email>[email protected]</email>
<organization>Red Hat</organization>
<organizationUrl>https://www.redhat.com</organizationUrl>
</developer>
</developers>

<properties>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package io.vertx.mqtt;

import io.vertx.core.json.JsonObject;
import io.vertx.core.json.JsonArray;
import io.vertx.core.json.impl.JsonUtil;
import java.time.Instant;
import java.time.format.DateTimeFormatter;

/**
* Converter and mapper for {@link io.vertx.mqtt.MqttClientSessionOptions}.
* NOTE: This class has been automatically generated from the {@link io.vertx.mqtt.MqttClientSessionOptions} original class using Vert.x codegen.
*/
public class MqttClientSessionOptionsConverter {


public static void fromJson(Iterable<java.util.Map.Entry<String, Object>> json, MqttClientSessionOptions obj) {
for (java.util.Map.Entry<String, Object> member : json) {
switch (member.getKey()) {
case "hostname":
if (member.getValue() instanceof String) {
obj.setHostname((String)member.getValue());
}
break;
case "port":
if (member.getValue() instanceof Number) {
obj.setPort(((Number)member.getValue()).intValue());
}
break;
}
}
}

public static void toJson(MqttClientSessionOptions obj, JsonObject json) {
toJson(obj, json.getMap());
}

public static void toJson(MqttClientSessionOptions obj, java.util.Map<String, Object> json) {
if (obj.getHostname() != null) {
json.put("hostname", obj.getHostname());
}
json.put("port", obj.getPort());
}
}
19 changes: 19 additions & 0 deletions src/main/java/io/vertx/mqtt/MqttClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import io.vertx.mqtt.messages.MqttPublishMessage;
import io.vertx.mqtt.messages.MqttSubAckMessage;

import java.util.List;
import java.util.Map;

/**
Expand Down Expand Up @@ -253,6 +254,14 @@ static MqttClient create(Vertx vertx) {
*/
Future<Integer> unsubscribe(String topic);

/**
* Unsubscribe from receiving messages on given topics
*
* @param topics Topics you want to unsubscribe from
* @return a {@code Future} completed after UNSUBSCRIBE packet sent with packetid
*/
Future<Integer> unsubscribe(List<String> topics);

/**
* Unsubscribe from receiving messages on given topic
*
Expand All @@ -263,6 +272,16 @@ static MqttClient create(Vertx vertx) {
@Fluent
MqttClient unsubscribe(String topic, Handler<AsyncResult<Integer>> unsubscribeSentHandler);

/**
* Unsubscribe from receiving messages on given topics
*
* @param topics Topics you want to unsubscribe from
* @param unsubscribeSentHandler handler called after UNSUBSCRIBE packet sent
* @return current MQTT client instance
*/
@Fluent
MqttClient unsubscribe(List<String> topics, Handler<AsyncResult<Integer>> unsubscribeSentHandler);

/**
* Sets handler which will be called after PINGRESP packet receiving
*
Expand Down
176 changes: 176 additions & 0 deletions src/main/java/io/vertx/mqtt/MqttClientSession.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,176 @@
/*
* Copyright 2021 Red Hat Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.vertx.mqtt;

import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

import io.netty.handler.codec.mqtt.MqttQoS;
import io.vertx.codegen.annotations.Fluent;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.mqtt.impl.MqttClientSessionImpl;
import io.vertx.mqtt.messages.MqttPublishMessage;

public interface MqttClientSession {

enum SessionState {
DISCONNECTED,
CONNECTING,
CONNECTED,
DISCONNECTING,
}

enum SubscriptionState {
UNSUBSCRIBED,
SUBSCRIBING,
SUBSCRIBED,
FAILED,
}

enum RequestedQoS {
QOS_0(0),
QOS_1(1);

private final int value;

RequestedQoS(int value) {
this.value = value;
}

public int toInteger() {
return this.value;
}
}

class SessionEvent {

private final SessionState sessionState;
private final Throwable cause;

public SessionEvent(final SessionState sessionState, final Throwable reason) {
this.sessionState = sessionState;
this.cause = reason;
}

public SessionState getSessionState() {
return this.sessionState;
}

public Throwable getCause() {
return this.cause;
}
}

class SubscriptionEvent {
private final String topic;
private final SubscriptionState subscriptionState;
private final Integer qos;

public SubscriptionEvent(final String topic, final SubscriptionState subscriptionState, final Integer qos) {
this.topic = topic;
this.subscriptionState = subscriptionState;
this.qos = qos;
}

public Integer getQos() {
return this.qos;
}

public SubscriptionState getSubscriptionState() {
return this.subscriptionState;
}

public String getTopic() {
return this.topic;
}
}

static MqttClientSession create(Vertx vertx, MqttClientSessionOptions options) {
return new MqttClientSessionImpl(vertx, options);
}

@Fluent
MqttClientSession sessionStateHandler(Handler<SessionEvent> sessionStateHandler);

@Fluent
MqttClientSession subscriptionStateHandler(Handler<SubscriptionEvent> subscriptionStateHandler);

void start();

void stop();

/**
* Subscribes to the topics with related QoS levels
*
* @param topics topics and related QoS levels to subscribe to
*/
@Fluent
MqttClientSession subscribe(Map<String, RequestedQoS> topics);

/**
* Subscribes to a single topic with related QoS level.
*
* @param topic The topic to subscribe to.
* @param qos The QoS to request from the server.
*/
@Fluent
default MqttClientSession subscribe(String topic, RequestedQoS qos) {
return subscribe(Collections.singletonMap(topic, qos));
}

@Fluent
default MqttClientSession subscribe(RequestedQoS qos, String... topics) {
final Map<String, RequestedQoS> topicMap = new LinkedHashMap<>(topics.length);
for (String topic : topics) {
topicMap.put(topic, qos);
}
return subscribe(topicMap);
}

/**
* Unsubscribe from receiving messages on given topics
*
* @param topics Topics you want to unsubscribe from
*/
MqttClientSession unsubscribe(Set<String> topics);

/**
* Sets handler which will be called each time server publish something to client
*
* @param messageHandler handler to call
* @return current MQTT client session instance
*/
@Fluent
MqttClientSession messageHandler(Handler<MqttPublishMessage> messageHandler);

/**
* Sends the PUBLISH message to the remote MQTT server
*
* @param topic topic on which the message is published
* @param payload message payload
* @param qosLevel QoS level
* @param isDup if the message is a duplicate
* @param isRetain if the message needs to be retained
* @return a {@code Future} completed after PUBLISH packet sent with packetid (not when QoS 0)
*/
Future<Integer> publish(String topic, Buffer payload, MqttQoS qosLevel, boolean isDup, boolean isRetain);
}
73 changes: 73 additions & 0 deletions src/main/java/io/vertx/mqtt/MqttClientSessionOptions.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
/*
* Copyright 2021 Red Hat Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.vertx.mqtt;

import io.vertx.codegen.annotations.DataObject;
import io.vertx.core.json.JsonObject;

@DataObject(generateConverter = true)
public class MqttClientSessionOptions extends MqttClientOptions {

private String hostname = MqttClientOptions.DEFAULT_HOST;
private int port = MqttClientOptions.DEFAULT_PORT;

/**
* Default constructor
*/
public MqttClientSessionOptions() {
super();
}

/**
* Create an instance of MqttClientSessionOptions from JSON
*
* @param json the JSON
*/
public MqttClientSessionOptions(JsonObject json) {
super(json);
MqttClientSessionOptionsConverter.fromJson(json, this);
}

/**
* Copy constructor
*
* @param other the options to copy
*/
public MqttClientSessionOptions(MqttClientSessionOptions other) {
super(other);
this.hostname = other.hostname;
this.port = other.port;
}

public int getPort() {
return this.port;
}

public MqttClientSessionOptions setPort(int port) {
this.port = port;
return this;
}

public String getHostname() {
return this.hostname;
}

public MqttClientSessionOptions setHostname(String hostname) {
this.hostname = hostname;
return this;
}
}
21 changes: 19 additions & 2 deletions src/main/java/io/vertx/mqtt/impl/MqttClientImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -615,8 +615,16 @@ private synchronized Handler<Integer> unsubscribeCompletionHandler() {
*/
@Override
public MqttClient unsubscribe(String topic, Handler<AsyncResult<Integer>> unsubscribeSentHandler) {
return unsubscribe(Collections.singletonList(topic), unsubscribeSentHandler);
}

/**
* See {@link MqttClient#unsubscribe(List, Handler)} )} for more details
*/
@Override
public MqttClient unsubscribe(List<String> topics, Handler<AsyncResult<Integer>> unsubscribeSentHandler) {

Future<Integer> fut = unsubscribe(topic);
Future<Integer> fut = unsubscribe(topics);
if (unsubscribeSentHandler != null) {
fut.onComplete(unsubscribeSentHandler);
}
Expand All @@ -629,6 +637,15 @@ public MqttClient unsubscribe(String topic, Handler<AsyncResult<Integer>> unsubs
@Override
public Future<Integer> unsubscribe(String topic) {

return unsubscribe(Collections.singletonList(topic));
}

/**
* See {@link MqttClient#unsubscribe(List)} for more details
*/
@Override
public Future<Integer> unsubscribe(List<String> topics) {

MqttFixedHeader fixedHeader = new MqttFixedHeader(
MqttMessageType.UNSUBSCRIBE,
false,
Expand All @@ -638,7 +655,7 @@ public Future<Integer> unsubscribe(String topic) {

MqttMessageIdVariableHeader variableHeader = new MqttMessageIdAndPropertiesVariableHeader(nextMessageId(), MqttProperties.NO_PROPERTIES);

MqttUnsubscribePayload payload = new MqttUnsubscribePayload(Stream.of(topic).collect(Collectors.toList()));
MqttUnsubscribePayload payload = new MqttUnsubscribePayload(topics);

io.netty.handler.codec.mqtt.MqttMessage unsubscribe = MqttMessageFactory.newMessage(fixedHeader, variableHeader, payload);

Expand Down
Loading

0 comments on commit 683b982

Please sign in to comment.