From 04f58c43e12a638ce549f7027b8b5f637e341736 Mon Sep 17 00:00:00 2001 From: andsel Date: Mon, 13 Feb 2023 11:06:53 +0100 Subject: [PATCH 1/6] Implement and use a Session data storage --- .../moquette/broker/ISessionsRepository.java | 77 ++++++++++++++++++ .../main/java/io/moquette/broker/Server.java | 6 +- .../main/java/io/moquette/broker/Session.java | 3 +- .../io/moquette/broker/SessionRegistry.java | 20 +++-- .../io/moquette/persistence/H2Builder.java | 5 ++ .../persistence/H2SessionsRepository.java | 80 +++++++++++++++++++ .../persistence/MemorySessionsRepository.java | 22 +++++ .../broker/MQTTConnectionConnectTest.java | 3 +- .../broker/MQTTConnectionPublishTest.java | 9 ++- .../broker/PostOfficeInternalPublishTest.java | 3 +- .../broker/PostOfficePublishTest.java | 3 +- .../broker/PostOfficeSubscribeTest.java | 3 +- .../broker/PostOfficeUnsubscribeTest.java | 3 +- .../moquette/broker/SessionRegistryTest.java | 3 +- 14 files changed, 225 insertions(+), 15 deletions(-) create mode 100644 broker/src/main/java/io/moquette/broker/ISessionsRepository.java create mode 100644 broker/src/main/java/io/moquette/persistence/H2SessionsRepository.java create mode 100644 broker/src/main/java/io/moquette/persistence/MemorySessionsRepository.java diff --git a/broker/src/main/java/io/moquette/broker/ISessionsRepository.java b/broker/src/main/java/io/moquette/broker/ISessionsRepository.java new file mode 100644 index 000000000..c7a96b850 --- /dev/null +++ b/broker/src/main/java/io/moquette/broker/ISessionsRepository.java @@ -0,0 +1,77 @@ +package io.moquette.broker; + +import io.netty.handler.codec.mqtt.MqttVersion; + +import java.time.Instant; +import java.util.Collection; +import java.util.Objects; + +/** + * Used to store data about persisted sessions like MQTT version, session's properties. + * */ +public interface ISessionsRepository { + + // Data class + final class SessionData { + private final String clientId; + private final Instant created; + final MqttVersion version; + final long expiryInterval; + + public SessionData(String clientId, Instant created, MqttVersion version, long expiryInterval) { + this.clientId = clientId; + this.created = created; + this.version = version; + this.expiryInterval = expiryInterval; + } + + public String clientId() { + return clientId; + } + + public MqttVersion protocolVersion() { + return version; + } + + public Instant created() { + return created; + } + + public long expiryInterval() { + return expiryInterval; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + SessionData that = (SessionData) o; + return clientId.equals(that.clientId); + } + + @Override + public int hashCode() { + return Objects.hash(clientId); + } + + @Override + public String toString() { + return "SessionData{" + + "clientId='" + clientId + '\'' + + ", created=" + created + + ", version=" + version + + ", expiryInterval=" + expiryInterval + + '}'; + } + } + + /** + * @return the full list of persisted sessions data. + * */ + Collection list(); + + /** + * Save data composing a session, es MQTT version, creation date and properties but not queues or subscriptions. + * */ + void saveSession(SessionData session); +} diff --git a/broker/src/main/java/io/moquette/broker/Server.java b/broker/src/main/java/io/moquette/broker/Server.java index 3bbaf6f5d..d00ed8164 100644 --- a/broker/src/main/java/io/moquette/broker/Server.java +++ b/broker/src/main/java/io/moquette/broker/Server.java @@ -31,6 +31,7 @@ import io.moquette.broker.unsafequeues.QueueException; import io.moquette.interception.InterceptHandler; import io.moquette.persistence.H2Builder; +import io.moquette.persistence.MemorySessionsRepository; import io.moquette.persistence.MemorySubscriptionsRepository; import io.moquette.interception.BrokerInterceptor; import io.moquette.broker.subscriptions.CTrieSubscriptionDirectory; @@ -192,6 +193,7 @@ public void startServer(IConfig config, List handler authenticator = initializeAuthenticator(authenticator, config); authorizatorPolicy = initializeAuthorizatorPolicy(authorizatorPolicy, config); + final ISessionsRepository sessionsRepository; final ISubscriptionsRepository subscriptionsRepository; final IQueueRepository queueRepository; final IRetainedRepository retainedRepository; @@ -225,17 +227,19 @@ public void startServer(IConfig config, List handler LOG.trace("Configuring H2 subscriptions repository"); subscriptionsRepository = h2Builder.subscriptionsRepository(); retainedRepository = h2Builder.retainedRepository(); + sessionsRepository = h2Builder.sessionsRepository(); } else { LOG.trace("Configuring in-memory subscriptions store"); subscriptionsRepository = new MemorySubscriptionsRepository(); queueRepository = new MemoryQueueRepository(); retainedRepository = new MemoryRetainedRepository(); + sessionsRepository = new MemorySessionsRepository(); } ISubscriptionsDirectory subscriptions = new CTrieSubscriptionDirectory(); subscriptions.init(subscriptionsRepository); final Authorizator authorizator = new Authorizator(authorizatorPolicy); - sessions = new SessionRegistry(subscriptions, queueRepository, authorizator); + sessions = new SessionRegistry(subscriptions, sessionsRepository, queueRepository, authorizator); final int sessionQueueSize = config.intProp(BrokerConstants.SESSION_QUEUE_SIZE, 1024); dispatcher = new PostOffice(subscriptions, retainedRepository, sessions, interceptor, authorizator, sessionQueueSize); diff --git a/broker/src/main/java/io/moquette/broker/Session.java b/broker/src/main/java/io/moquette/broker/Session.java index f5b1561be..fda277958 100644 --- a/broker/src/main/java/io/moquette/broker/Session.java +++ b/broker/src/main/java/io/moquette/broker/Session.java @@ -46,6 +46,7 @@ class Session { private static final Logger LOG = LoggerFactory.getLogger(Session.class); + static final int INFINITE_EXPIRY = 0xFFFFFFFF; static class InFlightPacket implements Delayed { @@ -123,7 +124,7 @@ static final class Will { this.sessionQueue = sessionQueue; this.created = Instant.now(); // in MQTT3 cleanSession = true means expiryInterval=0 else infinite - expiryInterval = clean ? 0 : 0xFFFFFFFF; + expiryInterval = clean ? 0 : INFINITE_EXPIRY; } public boolean expireImmediately() { diff --git a/broker/src/main/java/io/moquette/broker/SessionRegistry.java b/broker/src/main/java/io/moquette/broker/SessionRegistry.java index aba4f8d83..670f360b4 100644 --- a/broker/src/main/java/io/moquette/broker/SessionRegistry.java +++ b/broker/src/main/java/io/moquette/broker/SessionRegistry.java @@ -23,10 +23,12 @@ import io.netty.buffer.Unpooled; import io.netty.handler.codec.mqtt.MqttConnectMessage; import io.netty.handler.codec.mqtt.MqttQoS; +import io.netty.handler.codec.mqtt.MqttVersion; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.net.InetSocketAddress; +import java.time.Instant; import java.util.Arrays; import java.util.Collection; import java.util.Optional; @@ -35,6 +37,8 @@ import java.util.concurrent.ConcurrentMap; import java.util.stream.Collectors; +import static io.moquette.broker.Session.INFINITE_EXPIRY; + public class SessionRegistry { public abstract static class EnqueuedMessage { @@ -114,13 +118,16 @@ public SessionCreationResult(Session session, CreationModeEnum mode, boolean alr private final ConcurrentMap pool = new ConcurrentHashMap<>(); private final ISubscriptionsDirectory subscriptionsDirectory; + private final ISessionsRepository sessionsRepository; private final IQueueRepository queueRepository; private final Authorizator authorizator; SessionRegistry(ISubscriptionsDirectory subscriptionsDirectory, + ISessionsRepository sessionsRepository, IQueueRepository queueRepository, Authorizator authorizator) { this.subscriptionsDirectory = subscriptionsDirectory; + this.sessionsRepository = sessionsRepository; this.queueRepository = queueRepository; this.authorizator = authorizator; recreateSessionPool(); @@ -128,13 +135,13 @@ public SessionCreationResult(Session session, CreationModeEnum mode, boolean alr private void recreateSessionPool() { final Set queues = queueRepository.listQueueNames(); - for (String clientId : subscriptionsDirectory.listAllSessionIds()) { + for (ISessionsRepository.SessionData session : sessionsRepository.list()) { // if the subscriptions are present is obviously false - if (queueRepository.containsQueue(clientId)) { - final SessionMessageQueue persistentQueue = queueRepository.getOrCreateQueue(clientId); - queues.remove(clientId); - Session rehydrated = new Session(clientId, false, persistentQueue); - pool.put(clientId, rehydrated); + if (queueRepository.containsQueue(session.clientId())) { + final SessionMessageQueue persistentQueue = queueRepository.getOrCreateQueue(session.clientId()); + queues.remove(session.clientId()); + Session rehydrated = new Session(session.clientId(), false, persistentQueue); + pool.put(session.clientId(), rehydrated); } } if (!queues.isEmpty()) { @@ -236,6 +243,7 @@ private Session createNewSession(MqttConnectMessage msg, String clientId) { } newSession.markConnecting(); + sessionsRepository.saveSession(new ISessionsRepository.SessionData(clientId, Instant.now(), MqttVersion.MQTT_3_1_1, INFINITE_EXPIRY)); return newSession; } diff --git a/broker/src/main/java/io/moquette/persistence/H2Builder.java b/broker/src/main/java/io/moquette/persistence/H2Builder.java index bb6c33bae..d2dd66d42 100644 --- a/broker/src/main/java/io/moquette/persistence/H2Builder.java +++ b/broker/src/main/java/io/moquette/persistence/H2Builder.java @@ -2,6 +2,7 @@ import io.moquette.broker.IQueueRepository; import io.moquette.broker.IRetainedRepository; +import io.moquette.broker.ISessionsRepository; import io.moquette.broker.ISubscriptionsRepository; import org.h2.mvstore.MVStore; import org.slf4j.Logger; @@ -72,4 +73,8 @@ public IQueueRepository queueRepository() { public IRetainedRepository retainedRepository() { return new H2RetainedRepository(mvStore); } + + public ISessionsRepository sessionsRepository() { + return new H2SessionsRepository(mvStore); + } } diff --git a/broker/src/main/java/io/moquette/persistence/H2SessionsRepository.java b/broker/src/main/java/io/moquette/persistence/H2SessionsRepository.java new file mode 100644 index 000000000..193c27e15 --- /dev/null +++ b/broker/src/main/java/io/moquette/persistence/H2SessionsRepository.java @@ -0,0 +1,80 @@ +package io.moquette.persistence; + +import io.moquette.broker.ISessionsRepository; +import io.netty.handler.codec.mqtt.MqttVersion; +import org.h2.mvstore.MVMap; +import org.h2.mvstore.MVStore; +import org.h2.mvstore.WriteBuffer; +import org.h2.mvstore.type.BasicDataType; +import org.h2.mvstore.type.StringDataType; + +import java.nio.ByteBuffer; +import java.time.Instant; +import java.util.Collection; + +class H2SessionsRepository implements ISessionsRepository { + + private final MVMap sessionMap; + + public H2SessionsRepository(MVStore mvStore) { + final MVMap.Builder sessionTypeBuilder = + new MVMap.Builder() + .valueType(new SessionDataValueType()); + + this.sessionMap = mvStore.openMap("sessions_store", sessionTypeBuilder); + } + + @Override + public Collection list() { + return sessionMap.values(); + } + + @Override + public void saveSession(SessionData session) { + sessionMap.put(session.clientId(), session); + } + + /** + * Codec data type to load and store SessionData instances + */ + private final class SessionDataValueType extends BasicDataType { + + private final StringDataType stringDataType = new StringDataType(); + + @Override + public int getMemory(SessionData obj) { + return stringDataType.getMemory(obj.clientId()) + 8 + 1 + 8; + } + + @Override + public void write(WriteBuffer buff, SessionData obj) { + stringDataType.write(buff, obj.clientId()); + buff.putLong(obj.created().toEpochMilli()); + buff.put(obj.protocolVersion().protocolLevel()); + buff.putLong(obj.expiryInterval()); + } + + @Override + public SessionData read(ByteBuffer buff) { + final String clientId = stringDataType.read(buff); + final long created = buff.getLong(); + final byte rawVersion = buff.get(); + final MqttVersion version; + switch (rawVersion) { + case 3: version = MqttVersion.MQTT_3_1; break; + case 4: version = MqttVersion.MQTT_3_1_1; break; + case 5: version = MqttVersion.MQTT_5; break; + default: + throw new IllegalArgumentException("Unrecognized MQTT version value " + rawVersion); + } + final long expiryInterval = buff.getLong(); + + return new SessionData(clientId, Instant.ofEpochMilli(created), version, expiryInterval); + } + + @Override + public SessionData[] createStorage(int i) { + return new SessionData[i]; + } + } +} diff --git a/broker/src/main/java/io/moquette/persistence/MemorySessionsRepository.java b/broker/src/main/java/io/moquette/persistence/MemorySessionsRepository.java new file mode 100644 index 000000000..bb2c11c0c --- /dev/null +++ b/broker/src/main/java/io/moquette/persistence/MemorySessionsRepository.java @@ -0,0 +1,22 @@ +package io.moquette.persistence; + +import io.moquette.broker.ISessionsRepository; + +import java.util.Collection; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +public class MemorySessionsRepository implements ISessionsRepository { + + private final ConcurrentMap sessions = new ConcurrentHashMap<>(); + + @Override + public Collection list() { + return sessions.values(); + } + + @Override + public void saveSession(SessionData session) { + sessions.put(session.clientId(), session); + } +} diff --git a/broker/src/test/java/io/moquette/broker/MQTTConnectionConnectTest.java b/broker/src/test/java/io/moquette/broker/MQTTConnectionConnectTest.java index 08e2358d0..3c97186aa 100644 --- a/broker/src/test/java/io/moquette/broker/MQTTConnectionConnectTest.java +++ b/broker/src/test/java/io/moquette/broker/MQTTConnectionConnectTest.java @@ -31,6 +31,7 @@ import java.util.concurrent.ExecutionException; +import static io.moquette.broker.MQTTConnectionPublishTest.inMemorySessionsRepository; import static io.moquette.broker.NettyChannelAssertions.assertEqualsConnAck; import static io.netty.handler.codec.mqtt.MqttConnectReturnCode.*; import static java.util.Collections.singleton; @@ -70,7 +71,7 @@ public void setUp() { final PermitAllAuthorizatorPolicy authorizatorPolicy = new PermitAllAuthorizatorPolicy(); final Authorizator permitAll = new Authorizator(authorizatorPolicy); - sessionRegistry = new SessionRegistry(subscriptions, queueRepository, permitAll); + sessionRegistry = new SessionRegistry(subscriptions, inMemorySessionsRepository(), queueRepository, permitAll); postOffice = new PostOffice(subscriptions, new MemoryRetainedRepository(), sessionRegistry, ConnectionTestUtils.NO_OBSERVERS_INTERCEPTOR, permitAll, 1024); diff --git a/broker/src/test/java/io/moquette/broker/MQTTConnectionPublishTest.java b/broker/src/test/java/io/moquette/broker/MQTTConnectionPublishTest.java index 7d605de87..718693cde 100644 --- a/broker/src/test/java/io/moquette/broker/MQTTConnectionPublishTest.java +++ b/broker/src/test/java/io/moquette/broker/MQTTConnectionPublishTest.java @@ -19,6 +19,7 @@ import io.moquette.broker.subscriptions.CTrieSubscriptionDirectory; import io.moquette.broker.subscriptions.ISubscriptionsDirectory; import io.moquette.broker.security.IAuthenticator; +import io.moquette.persistence.MemorySessionsRepository; import io.moquette.persistence.MemorySubscriptionsRepository; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; @@ -28,6 +29,7 @@ import io.netty.handler.codec.mqtt.MqttPublishMessage; import io.netty.handler.codec.mqtt.MqttQoS; import io.netty.handler.codec.mqtt.MqttVersion; +import org.jetbrains.annotations.NotNull; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -76,12 +78,17 @@ private MQTTConnection createMQTTConnection(BrokerConfiguration config, Channel final PermitAllAuthorizatorPolicy authorizatorPolicy = new PermitAllAuthorizatorPolicy(); final Authorizator permitAll = new Authorizator(authorizatorPolicy); - sessionRegistry = new SessionRegistry(subscriptions, queueRepository, permitAll); + sessionRegistry = new SessionRegistry(subscriptions, inMemorySessionsRepository(), queueRepository, permitAll); final PostOffice postOffice = new PostOffice(subscriptions, new MemoryRetainedRepository(), sessionRegistry, ConnectionTestUtils.NO_OBSERVERS_INTERCEPTOR, permitAll, 1024); return new MQTTConnection(channel, config, mockAuthenticator, sessionRegistry, postOffice); } + @NotNull + static ISessionsRepository inMemorySessionsRepository() { + return new MemorySessionsRepository(); + } + @Test public void dropConnectionOnPublishWithInvalidTopicFormat() throws ExecutionException, InterruptedException { // Connect message with clean session set to true and client id is null. diff --git a/broker/src/test/java/io/moquette/broker/PostOfficeInternalPublishTest.java b/broker/src/test/java/io/moquette/broker/PostOfficeInternalPublishTest.java index 2f82d4925..e22e1ad2d 100644 --- a/broker/src/test/java/io/moquette/broker/PostOfficeInternalPublishTest.java +++ b/broker/src/test/java/io/moquette/broker/PostOfficeInternalPublishTest.java @@ -32,6 +32,7 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; +import static io.moquette.broker.MQTTConnectionPublishTest.inMemorySessionsRepository; import static io.moquette.broker.PostOfficeUnsubscribeTest.CONFIG; import static io.netty.handler.codec.mqtt.MqttQoS.*; import static java.nio.charset.StandardCharsets.UTF_8; @@ -89,7 +90,7 @@ private void initPostOfficeAndSubsystems() { final PermitAllAuthorizatorPolicy authorizatorPolicy = new PermitAllAuthorizatorPolicy(); final Authorizator permitAll = new Authorizator(authorizatorPolicy); - sessionRegistry = new SessionRegistry(subscriptions, queueRepository, permitAll); + sessionRegistry = new SessionRegistry(subscriptions, inMemorySessionsRepository(), queueRepository, permitAll); sut = new PostOffice(subscriptions, retainedRepository, sessionRegistry, ConnectionTestUtils.NO_OBSERVERS_INTERCEPTOR, permitAll, 1024); } diff --git a/broker/src/test/java/io/moquette/broker/PostOfficePublishTest.java b/broker/src/test/java/io/moquette/broker/PostOfficePublishTest.java index 6a8ed3bb6..65d02a4a5 100644 --- a/broker/src/test/java/io/moquette/broker/PostOfficePublishTest.java +++ b/broker/src/test/java/io/moquette/broker/PostOfficePublishTest.java @@ -38,6 +38,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import static io.moquette.broker.MQTTConnectionPublishTest.inMemorySessionsRepository; import static io.moquette.broker.PostOfficeUnsubscribeTest.CONFIG; import static io.netty.handler.codec.mqtt.MqttQoS.AT_LEAST_ONCE; import static io.netty.handler.codec.mqtt.MqttQoS.AT_MOST_ONCE; @@ -103,7 +104,7 @@ private void initPostOfficeAndSubsystems() { final PermitAllAuthorizatorPolicy authorizatorPolicy = new PermitAllAuthorizatorPolicy(); final Authorizator permitAll = new Authorizator(authorizatorPolicy); - sessionRegistry = new SessionRegistry(subscriptions, queueRepository, permitAll); + sessionRegistry = new SessionRegistry(subscriptions, inMemorySessionsRepository(), queueRepository, permitAll); sut = new PostOffice(subscriptions, retainedRepository, sessionRegistry, ConnectionTestUtils.NO_OBSERVERS_INTERCEPTOR, permitAll, 1024); } diff --git a/broker/src/test/java/io/moquette/broker/PostOfficeSubscribeTest.java b/broker/src/test/java/io/moquette/broker/PostOfficeSubscribeTest.java index 5318d6ca3..fa98aee38 100644 --- a/broker/src/test/java/io/moquette/broker/PostOfficeSubscribeTest.java +++ b/broker/src/test/java/io/moquette/broker/PostOfficeSubscribeTest.java @@ -38,6 +38,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import static io.moquette.broker.MQTTConnectionPublishTest.inMemorySessionsRepository; import static io.moquette.broker.PostOfficePublishTest.ALLOW_ANONYMOUS_AND_ZERO_BYTES_CLID; import static io.moquette.broker.PostOfficePublishTest.SUBSCRIBER_ID; import static io.netty.handler.codec.mqtt.MqttConnectReturnCode.CONNECTION_ACCEPTED; @@ -95,7 +96,7 @@ private void prepareSUT() { final PermitAllAuthorizatorPolicy authorizatorPolicy = new PermitAllAuthorizatorPolicy(); final Authorizator permitAll = new Authorizator(authorizatorPolicy); - sessionRegistry = new SessionRegistry(subscriptions, queueRepository, permitAll); + sessionRegistry = new SessionRegistry(subscriptions, inMemorySessionsRepository(), queueRepository, permitAll); sut = new PostOffice(subscriptions, new MemoryRetainedRepository(), sessionRegistry, ConnectionTestUtils.NO_OBSERVERS_INTERCEPTOR, permitAll, 1024); } diff --git a/broker/src/test/java/io/moquette/broker/PostOfficeUnsubscribeTest.java b/broker/src/test/java/io/moquette/broker/PostOfficeUnsubscribeTest.java index 579a64fa9..5a46d7c14 100644 --- a/broker/src/test/java/io/moquette/broker/PostOfficeUnsubscribeTest.java +++ b/broker/src/test/java/io/moquette/broker/PostOfficeUnsubscribeTest.java @@ -38,6 +38,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import static io.moquette.broker.MQTTConnectionPublishTest.inMemorySessionsRepository; import static io.moquette.broker.PostOfficePublishTest.PUBLISHER_ID; import static io.netty.handler.codec.mqtt.MqttQoS.*; import static java.util.Collections.*; @@ -87,7 +88,7 @@ private void prepareSUT() { final PermitAllAuthorizatorPolicy authorizatorPolicy = new PermitAllAuthorizatorPolicy(); final Authorizator permitAll = new Authorizator(authorizatorPolicy); - sessionRegistry = new SessionRegistry(subscriptions, queueRepository, permitAll); + sessionRegistry = new SessionRegistry(subscriptions, inMemorySessionsRepository(), queueRepository, permitAll); sut = new PostOffice(subscriptions, new MemoryRetainedRepository(), sessionRegistry, ConnectionTestUtils.NO_OBSERVERS_INTERCEPTOR, permitAll, 1024); } diff --git a/broker/src/test/java/io/moquette/broker/SessionRegistryTest.java b/broker/src/test/java/io/moquette/broker/SessionRegistryTest.java index 78d1c9f8f..ac39f6002 100644 --- a/broker/src/test/java/io/moquette/broker/SessionRegistryTest.java +++ b/broker/src/test/java/io/moquette/broker/SessionRegistryTest.java @@ -40,6 +40,7 @@ import java.nio.charset.StandardCharsets; import java.util.concurrent.ExecutionException; +import static io.moquette.broker.MQTTConnectionPublishTest.inMemorySessionsRepository; import static io.moquette.broker.NettyChannelAssertions.assertEqualsConnAck; import static io.netty.handler.codec.mqtt.MqttConnectReturnCode.CONNECTION_ACCEPTED; import static java.util.Collections.singleton; @@ -85,7 +86,7 @@ private MQTTConnection createMQTTConnection(BrokerConfiguration config, Channel final PermitAllAuthorizatorPolicy authorizatorPolicy = new PermitAllAuthorizatorPolicy(); final Authorizator permitAll = new Authorizator(authorizatorPolicy); - sut = new SessionRegistry(subscriptions, queueRepository, permitAll); + sut = new SessionRegistry(subscriptions, inMemorySessionsRepository(), queueRepository, permitAll); final PostOffice postOffice = new PostOffice(subscriptions, new MemoryRetainedRepository(), sut, ConnectionTestUtils.NO_OBSERVERS_INTERCEPTOR, permitAll, 1024); return new MQTTConnection(channel, config, mockAuthenticator, sut, postOffice); From 86bd86241ccada9d0757c8e8ded8d50bb584b470 Mon Sep 17 00:00:00 2001 From: andsel Date: Fri, 17 Feb 2023 14:20:56 +0100 Subject: [PATCH 2/6] Minor rename inMemorySessionsRepository to memorySessionRepository and included JetBrains null/nullable annotations library --- broker/pom.xml | 6 ++++++ .../java/io/moquette/broker/MQTTConnectionConnectTest.java | 4 ++-- .../java/io/moquette/broker/MQTTConnectionPublishTest.java | 4 ++-- .../io/moquette/broker/PostOfficeInternalPublishTest.java | 4 ++-- .../test/java/io/moquette/broker/PostOfficePublishTest.java | 4 ++-- .../java/io/moquette/broker/PostOfficeSubscribeTest.java | 4 ++-- .../java/io/moquette/broker/PostOfficeUnsubscribeTest.java | 5 ++--- .../test/java/io/moquette/broker/SessionRegistryTest.java | 4 ++-- 8 files changed, 20 insertions(+), 15 deletions(-) diff --git a/broker/pom.xml b/broker/pom.xml index 64911facc..172100370 100644 --- a/broker/pom.xml +++ b/broker/pom.xml @@ -167,6 +167,12 @@ ${h2.version} test + + org.jetbrains + annotations + RELEASE + test + diff --git a/broker/src/test/java/io/moquette/broker/MQTTConnectionConnectTest.java b/broker/src/test/java/io/moquette/broker/MQTTConnectionConnectTest.java index 3c97186aa..8f37c8cc6 100644 --- a/broker/src/test/java/io/moquette/broker/MQTTConnectionConnectTest.java +++ b/broker/src/test/java/io/moquette/broker/MQTTConnectionConnectTest.java @@ -31,7 +31,7 @@ import java.util.concurrent.ExecutionException; -import static io.moquette.broker.MQTTConnectionPublishTest.inMemorySessionsRepository; +import static io.moquette.broker.MQTTConnectionPublishTest.memorySessionsRepository; import static io.moquette.broker.NettyChannelAssertions.assertEqualsConnAck; import static io.netty.handler.codec.mqtt.MqttConnectReturnCode.*; import static java.util.Collections.singleton; @@ -71,7 +71,7 @@ public void setUp() { final PermitAllAuthorizatorPolicy authorizatorPolicy = new PermitAllAuthorizatorPolicy(); final Authorizator permitAll = new Authorizator(authorizatorPolicy); - sessionRegistry = new SessionRegistry(subscriptions, inMemorySessionsRepository(), queueRepository, permitAll); + sessionRegistry = new SessionRegistry(subscriptions, memorySessionsRepository(), queueRepository, permitAll); postOffice = new PostOffice(subscriptions, new MemoryRetainedRepository(), sessionRegistry, ConnectionTestUtils.NO_OBSERVERS_INTERCEPTOR, permitAll, 1024); diff --git a/broker/src/test/java/io/moquette/broker/MQTTConnectionPublishTest.java b/broker/src/test/java/io/moquette/broker/MQTTConnectionPublishTest.java index 718693cde..593f96dbc 100644 --- a/broker/src/test/java/io/moquette/broker/MQTTConnectionPublishTest.java +++ b/broker/src/test/java/io/moquette/broker/MQTTConnectionPublishTest.java @@ -78,14 +78,14 @@ private MQTTConnection createMQTTConnection(BrokerConfiguration config, Channel final PermitAllAuthorizatorPolicy authorizatorPolicy = new PermitAllAuthorizatorPolicy(); final Authorizator permitAll = new Authorizator(authorizatorPolicy); - sessionRegistry = new SessionRegistry(subscriptions, inMemorySessionsRepository(), queueRepository, permitAll); + sessionRegistry = new SessionRegistry(subscriptions, memorySessionsRepository(), queueRepository, permitAll); final PostOffice postOffice = new PostOffice(subscriptions, new MemoryRetainedRepository(), sessionRegistry, ConnectionTestUtils.NO_OBSERVERS_INTERCEPTOR, permitAll, 1024); return new MQTTConnection(channel, config, mockAuthenticator, sessionRegistry, postOffice); } @NotNull - static ISessionsRepository inMemorySessionsRepository() { + static ISessionsRepository memorySessionsRepository() { return new MemorySessionsRepository(); } diff --git a/broker/src/test/java/io/moquette/broker/PostOfficeInternalPublishTest.java b/broker/src/test/java/io/moquette/broker/PostOfficeInternalPublishTest.java index e22e1ad2d..181ada1d3 100644 --- a/broker/src/test/java/io/moquette/broker/PostOfficeInternalPublishTest.java +++ b/broker/src/test/java/io/moquette/broker/PostOfficeInternalPublishTest.java @@ -32,7 +32,7 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; -import static io.moquette.broker.MQTTConnectionPublishTest.inMemorySessionsRepository; +import static io.moquette.broker.MQTTConnectionPublishTest.memorySessionsRepository; import static io.moquette.broker.PostOfficeUnsubscribeTest.CONFIG; import static io.netty.handler.codec.mqtt.MqttQoS.*; import static java.nio.charset.StandardCharsets.UTF_8; @@ -90,7 +90,7 @@ private void initPostOfficeAndSubsystems() { final PermitAllAuthorizatorPolicy authorizatorPolicy = new PermitAllAuthorizatorPolicy(); final Authorizator permitAll = new Authorizator(authorizatorPolicy); - sessionRegistry = new SessionRegistry(subscriptions, inMemorySessionsRepository(), queueRepository, permitAll); + sessionRegistry = new SessionRegistry(subscriptions, memorySessionsRepository(), queueRepository, permitAll); sut = new PostOffice(subscriptions, retainedRepository, sessionRegistry, ConnectionTestUtils.NO_OBSERVERS_INTERCEPTOR, permitAll, 1024); } diff --git a/broker/src/test/java/io/moquette/broker/PostOfficePublishTest.java b/broker/src/test/java/io/moquette/broker/PostOfficePublishTest.java index 65d02a4a5..a88e6a727 100644 --- a/broker/src/test/java/io/moquette/broker/PostOfficePublishTest.java +++ b/broker/src/test/java/io/moquette/broker/PostOfficePublishTest.java @@ -38,7 +38,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; -import static io.moquette.broker.MQTTConnectionPublishTest.inMemorySessionsRepository; +import static io.moquette.broker.MQTTConnectionPublishTest.memorySessionsRepository; import static io.moquette.broker.PostOfficeUnsubscribeTest.CONFIG; import static io.netty.handler.codec.mqtt.MqttQoS.AT_LEAST_ONCE; import static io.netty.handler.codec.mqtt.MqttQoS.AT_MOST_ONCE; @@ -104,7 +104,7 @@ private void initPostOfficeAndSubsystems() { final PermitAllAuthorizatorPolicy authorizatorPolicy = new PermitAllAuthorizatorPolicy(); final Authorizator permitAll = new Authorizator(authorizatorPolicy); - sessionRegistry = new SessionRegistry(subscriptions, inMemorySessionsRepository(), queueRepository, permitAll); + sessionRegistry = new SessionRegistry(subscriptions, memorySessionsRepository(), queueRepository, permitAll); sut = new PostOffice(subscriptions, retainedRepository, sessionRegistry, ConnectionTestUtils.NO_OBSERVERS_INTERCEPTOR, permitAll, 1024); } diff --git a/broker/src/test/java/io/moquette/broker/PostOfficeSubscribeTest.java b/broker/src/test/java/io/moquette/broker/PostOfficeSubscribeTest.java index fa98aee38..2b8417161 100644 --- a/broker/src/test/java/io/moquette/broker/PostOfficeSubscribeTest.java +++ b/broker/src/test/java/io/moquette/broker/PostOfficeSubscribeTest.java @@ -38,7 +38,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; -import static io.moquette.broker.MQTTConnectionPublishTest.inMemorySessionsRepository; +import static io.moquette.broker.MQTTConnectionPublishTest.memorySessionsRepository; import static io.moquette.broker.PostOfficePublishTest.ALLOW_ANONYMOUS_AND_ZERO_BYTES_CLID; import static io.moquette.broker.PostOfficePublishTest.SUBSCRIBER_ID; import static io.netty.handler.codec.mqtt.MqttConnectReturnCode.CONNECTION_ACCEPTED; @@ -96,7 +96,7 @@ private void prepareSUT() { final PermitAllAuthorizatorPolicy authorizatorPolicy = new PermitAllAuthorizatorPolicy(); final Authorizator permitAll = new Authorizator(authorizatorPolicy); - sessionRegistry = new SessionRegistry(subscriptions, inMemorySessionsRepository(), queueRepository, permitAll); + sessionRegistry = new SessionRegistry(subscriptions, memorySessionsRepository(), queueRepository, permitAll); sut = new PostOffice(subscriptions, new MemoryRetainedRepository(), sessionRegistry, ConnectionTestUtils.NO_OBSERVERS_INTERCEPTOR, permitAll, 1024); } diff --git a/broker/src/test/java/io/moquette/broker/PostOfficeUnsubscribeTest.java b/broker/src/test/java/io/moquette/broker/PostOfficeUnsubscribeTest.java index 5a46d7c14..8a7d68a94 100644 --- a/broker/src/test/java/io/moquette/broker/PostOfficeUnsubscribeTest.java +++ b/broker/src/test/java/io/moquette/broker/PostOfficeUnsubscribeTest.java @@ -27,7 +27,6 @@ import io.netty.channel.Channel; import io.netty.channel.embedded.EmbeddedChannel; import io.netty.handler.codec.mqtt.*; -import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -38,7 +37,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; -import static io.moquette.broker.MQTTConnectionPublishTest.inMemorySessionsRepository; +import static io.moquette.broker.MQTTConnectionPublishTest.memorySessionsRepository; import static io.moquette.broker.PostOfficePublishTest.PUBLISHER_ID; import static io.netty.handler.codec.mqtt.MqttQoS.*; import static java.util.Collections.*; @@ -88,7 +87,7 @@ private void prepareSUT() { final PermitAllAuthorizatorPolicy authorizatorPolicy = new PermitAllAuthorizatorPolicy(); final Authorizator permitAll = new Authorizator(authorizatorPolicy); - sessionRegistry = new SessionRegistry(subscriptions, inMemorySessionsRepository(), queueRepository, permitAll); + sessionRegistry = new SessionRegistry(subscriptions, memorySessionsRepository(), queueRepository, permitAll); sut = new PostOffice(subscriptions, new MemoryRetainedRepository(), sessionRegistry, ConnectionTestUtils.NO_OBSERVERS_INTERCEPTOR, permitAll, 1024); } diff --git a/broker/src/test/java/io/moquette/broker/SessionRegistryTest.java b/broker/src/test/java/io/moquette/broker/SessionRegistryTest.java index ac39f6002..36bda39be 100644 --- a/broker/src/test/java/io/moquette/broker/SessionRegistryTest.java +++ b/broker/src/test/java/io/moquette/broker/SessionRegistryTest.java @@ -40,7 +40,7 @@ import java.nio.charset.StandardCharsets; import java.util.concurrent.ExecutionException; -import static io.moquette.broker.MQTTConnectionPublishTest.inMemorySessionsRepository; +import static io.moquette.broker.MQTTConnectionPublishTest.memorySessionsRepository; import static io.moquette.broker.NettyChannelAssertions.assertEqualsConnAck; import static io.netty.handler.codec.mqtt.MqttConnectReturnCode.CONNECTION_ACCEPTED; import static java.util.Collections.singleton; @@ -86,7 +86,7 @@ private MQTTConnection createMQTTConnection(BrokerConfiguration config, Channel final PermitAllAuthorizatorPolicy authorizatorPolicy = new PermitAllAuthorizatorPolicy(); final Authorizator permitAll = new Authorizator(authorizatorPolicy); - sut = new SessionRegistry(subscriptions, inMemorySessionsRepository(), queueRepository, permitAll); + sut = new SessionRegistry(subscriptions, memorySessionsRepository(), queueRepository, permitAll); final PostOffice postOffice = new PostOffice(subscriptions, new MemoryRetainedRepository(), sut, ConnectionTestUtils.NO_OBSERVERS_INTERCEPTOR, permitAll, 1024); return new MQTTConnection(channel, config, mockAuthenticator, sut, postOffice); From 69ae603cfd33fff016ed33c601b8ab9811399f02 Mon Sep 17 00:00:00 2001 From: andsel Date: Fri, 17 Feb 2023 14:26:26 +0100 Subject: [PATCH 3/6] Added version to serialization/deserializion of H2 SessionData to b future proof --- .../persistence/H2SessionsRepository.java | 29 +++++++++++++------ 1 file changed, 20 insertions(+), 9 deletions(-) diff --git a/broker/src/main/java/io/moquette/persistence/H2SessionsRepository.java b/broker/src/main/java/io/moquette/persistence/H2SessionsRepository.java index 193c27e15..85e1bd3dc 100644 --- a/broker/src/main/java/io/moquette/persistence/H2SessionsRepository.java +++ b/broker/src/main/java/io/moquette/persistence/H2SessionsRepository.java @@ -14,6 +14,8 @@ class H2SessionsRepository implements ISessionsRepository { + private static final byte SESSION_DATA_SERDES_V1 = 1; + private final MVMap sessionMap; public H2SessionsRepository(MVStore mvStore) { @@ -48,6 +50,7 @@ public int getMemory(SessionData obj) { @Override public void write(WriteBuffer buff, SessionData obj) { + buff.put(SESSION_DATA_SERDES_V1); stringDataType.write(buff, obj.clientId()); buff.putLong(obj.created().toEpochMilli()); buff.put(obj.protocolVersion().protocolLevel()); @@ -56,17 +59,13 @@ public void write(WriteBuffer buff, SessionData obj) { @Override public SessionData read(ByteBuffer buff) { + final byte serDesVersion = buff.get(); + if (serDesVersion != SESSION_DATA_SERDES_V1) { + throw new IllegalArgumentException("Unrecognized serialization version " + serDesVersion); + } final String clientId = stringDataType.read(buff); final long created = buff.getLong(); - final byte rawVersion = buff.get(); - final MqttVersion version; - switch (rawVersion) { - case 3: version = MqttVersion.MQTT_3_1; break; - case 4: version = MqttVersion.MQTT_3_1_1; break; - case 5: version = MqttVersion.MQTT_5; break; - default: - throw new IllegalArgumentException("Unrecognized MQTT version value " + rawVersion); - } + final MqttVersion version = readMQTTVersion(buff.get()); final long expiryInterval = buff.getLong(); return new SessionData(clientId, Instant.ofEpochMilli(created), version, expiryInterval); @@ -77,4 +76,16 @@ public SessionData[] createStorage(int i) { return new SessionData[i]; } } + + private MqttVersion readMQTTVersion(byte rawVersion) { + final MqttVersion version; + switch (rawVersion) { + case 3: version = MqttVersion.MQTT_3_1; break; + case 4: version = MqttVersion.MQTT_3_1_1; break; + case 5: version = MqttVersion.MQTT_5; break; + default: + throw new IllegalArgumentException("Unrecognized MQTT version value " + rawVersion); + } + return version; + } } From 8fa896f83802ffae149bfd6b3396945feb912c77 Mon Sep 17 00:00:00 2001 From: andsel Date: Fri, 3 Mar 2023 09:59:52 +0100 Subject: [PATCH 4/6] Capped the infinite session expire interval to 100 years (as seconds instead of UINT max value) --- broker/src/main/java/io/moquette/broker/Session.java | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/broker/src/main/java/io/moquette/broker/Session.java b/broker/src/main/java/io/moquette/broker/Session.java index fda277958..d50e53078 100644 --- a/broker/src/main/java/io/moquette/broker/Session.java +++ b/broker/src/main/java/io/moquette/broker/Session.java @@ -28,7 +28,10 @@ import org.slf4j.LoggerFactory; import java.net.InetSocketAddress; +import java.time.Duration; import java.time.Instant; +import java.time.temporal.ChronoUnit; +import java.time.temporal.TemporalUnit; import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; @@ -46,7 +49,9 @@ class Session { private static final Logger LOG = LoggerFactory.getLogger(Session.class); - static final int INFINITE_EXPIRY = 0xFFFFFFFF; + // By specification session expiry value of 0xFFFFFFFF (UINT_MAX) (seconds) means + // session that doesn't expire, it's ~136 years, we can set a cap at 100 year + static final int INFINITE_EXPIRY = (int) Duration.of(100, ChronoUnit.YEARS).toMillis() / 1000; static class InFlightPacket implements Delayed { From e11fd3446ccd2b8a8259cf7dda08b675f54114fb Mon Sep 17 00:00:00 2001 From: andsel Date: Fri, 3 Mar 2023 11:28:38 +0100 Subject: [PATCH 5/6] Updated SessionData to store expiry instant and expiry interval --- .../moquette/broker/ISessionsRepository.java | 35 ++++++++++++++----- .../main/java/io/moquette/broker/Session.java | 2 +- .../io/moquette/broker/SessionRegistry.java | 2 +- .../persistence/H2SessionsRepository.java | 17 +++++---- 4 files changed, 40 insertions(+), 16 deletions(-) diff --git a/broker/src/main/java/io/moquette/broker/ISessionsRepository.java b/broker/src/main/java/io/moquette/broker/ISessionsRepository.java index c7a96b850..d172a2d28 100644 --- a/broker/src/main/java/io/moquette/broker/ISessionsRepository.java +++ b/broker/src/main/java/io/moquette/broker/ISessionsRepository.java @@ -5,6 +5,7 @@ import java.time.Instant; import java.util.Collection; import java.util.Objects; +import java.util.Optional; /** * Used to store data about persisted sessions like MQTT version, session's properties. @@ -14,17 +15,30 @@ public interface ISessionsRepository { // Data class final class SessionData { private final String clientId; - private final Instant created; + private Instant expireAt = null; final MqttVersion version; - final long expiryInterval; + private final int expiryInterval; - public SessionData(String clientId, Instant created, MqttVersion version, long expiryInterval) { + /** + * Construct a new SessionData without expiration set yet. + * */ + public SessionData(String clientId, MqttVersion version, int expiryInterval) { this.clientId = clientId; - this.created = created; this.version = version; this.expiryInterval = expiryInterval; } + /** + * Construct SessionData with an expiration instant, created by loading from the storage. + * */ + public SessionData(String clientId, Instant expireAt, MqttVersion version, int expiryInterval) { + this.expiryInterval = expiryInterval; + Objects.requireNonNull(expireAt, "An expiration time is requested"); + this.clientId = clientId; + this.expireAt = expireAt; + this.version = version; + } + public String clientId() { return clientId; } @@ -33,11 +47,16 @@ public MqttVersion protocolVersion() { return version; } - public Instant created() { - return created; + public Optional expireAt() { + return Optional.ofNullable(expireAt); + } + + public Optional expiryInstant() { + return expireAt() + .map(Instant::toEpochMilli); } - public long expiryInterval() { + public int expiryInterval() { return expiryInterval; } @@ -58,7 +77,7 @@ public int hashCode() { public String toString() { return "SessionData{" + "clientId='" + clientId + '\'' + - ", created=" + created + + ", expireAt=" + expireAt + ", version=" + version + ", expiryInterval=" + expiryInterval + '}'; diff --git a/broker/src/main/java/io/moquette/broker/Session.java b/broker/src/main/java/io/moquette/broker/Session.java index d50e53078..d0ccb1b8f 100644 --- a/broker/src/main/java/io/moquette/broker/Session.java +++ b/broker/src/main/java/io/moquette/broker/Session.java @@ -51,7 +51,7 @@ class Session { private static final Logger LOG = LoggerFactory.getLogger(Session.class); // By specification session expiry value of 0xFFFFFFFF (UINT_MAX) (seconds) means // session that doesn't expire, it's ~136 years, we can set a cap at 100 year - static final int INFINITE_EXPIRY = (int) Duration.of(100, ChronoUnit.YEARS).toMillis() / 1000; + static final int INFINITE_EXPIRY = (int) Duration.ofDays(80 * 365).toMillis() / 1000; static class InFlightPacket implements Delayed { diff --git a/broker/src/main/java/io/moquette/broker/SessionRegistry.java b/broker/src/main/java/io/moquette/broker/SessionRegistry.java index 670f360b4..a1629d685 100644 --- a/broker/src/main/java/io/moquette/broker/SessionRegistry.java +++ b/broker/src/main/java/io/moquette/broker/SessionRegistry.java @@ -243,7 +243,7 @@ private Session createNewSession(MqttConnectMessage msg, String clientId) { } newSession.markConnecting(); - sessionsRepository.saveSession(new ISessionsRepository.SessionData(clientId, Instant.now(), MqttVersion.MQTT_3_1_1, INFINITE_EXPIRY)); + sessionsRepository.saveSession(new ISessionsRepository.SessionData(clientId, MqttVersion.MQTT_3_1_1, INFINITE_EXPIRY)); return newSession; } diff --git a/broker/src/main/java/io/moquette/persistence/H2SessionsRepository.java b/broker/src/main/java/io/moquette/persistence/H2SessionsRepository.java index 85e1bd3dc..e28e551bf 100644 --- a/broker/src/main/java/io/moquette/persistence/H2SessionsRepository.java +++ b/broker/src/main/java/io/moquette/persistence/H2SessionsRepository.java @@ -15,6 +15,7 @@ class H2SessionsRepository implements ISessionsRepository { private static final byte SESSION_DATA_SERDES_V1 = 1; + private static final long UNDEFINED_INSTANT = -1; private final MVMap sessionMap; @@ -45,16 +46,16 @@ private final class SessionDataValueType extends BasicDataType { @Override public int getMemory(SessionData obj) { - return stringDataType.getMemory(obj.clientId()) + 8 + 1 + 8; + return stringDataType.getMemory(obj.clientId()) + 8 + 1 + 4; } @Override public void write(WriteBuffer buff, SessionData obj) { buff.put(SESSION_DATA_SERDES_V1); stringDataType.write(buff, obj.clientId()); - buff.putLong(obj.created().toEpochMilli()); + buff.putLong(obj.expiryInstant().orElse(UNDEFINED_INSTANT)); buff.put(obj.protocolVersion().protocolLevel()); - buff.putLong(obj.expiryInterval()); + buff.putInt(obj.expiryInterval()); } @Override @@ -64,11 +65,15 @@ public SessionData read(ByteBuffer buff) { throw new IllegalArgumentException("Unrecognized serialization version " + serDesVersion); } final String clientId = stringDataType.read(buff); - final long created = buff.getLong(); + final long expiresAt = buff.getLong(); final MqttVersion version = readMQTTVersion(buff.get()); - final long expiryInterval = buff.getLong(); + final int expiryInterval = buff.getInt(); - return new SessionData(clientId, Instant.ofEpochMilli(created), version, expiryInterval); + if (expiresAt == UNDEFINED_INSTANT) { + return new SessionData(clientId, version, expiryInterval); + } else { + return new SessionData(clientId, Instant.ofEpochMilli(expiresAt), version, expiryInterval); + } } @Override From 90b55426dd81ba43924641aa3e0bca3bc45c83d6 Mon Sep 17 00:00:00 2001 From: andsel Date: Fri, 3 Mar 2023 12:25:31 +0100 Subject: [PATCH 6/6] Replaces session semi-final fields in Session live object with SessionData --- .../main/java/io/moquette/broker/Session.java | 33 +++++++++---------- .../io/moquette/broker/SessionRegistry.java | 13 +++++--- .../java/io/moquette/broker/SessionTest.java | 9 +++-- 3 files changed, 30 insertions(+), 25 deletions(-) diff --git a/broker/src/main/java/io/moquette/broker/Session.java b/broker/src/main/java/io/moquette/broker/Session.java index d0ccb1b8f..9046fc7a5 100644 --- a/broker/src/main/java/io/moquette/broker/Session.java +++ b/broker/src/main/java/io/moquette/broker/Session.java @@ -22,16 +22,18 @@ import io.moquette.broker.subscriptions.Subscription; import io.moquette.broker.subscriptions.Topic; import io.netty.buffer.ByteBuf; -import io.netty.handler.codec.mqtt.*; +import io.netty.handler.codec.mqtt.MqttFixedHeader; +import io.netty.handler.codec.mqtt.MqttMessage; +import io.netty.handler.codec.mqtt.MqttMessageType; +import io.netty.handler.codec.mqtt.MqttPublishMessage; +import io.netty.handler.codec.mqtt.MqttPublishVariableHeader; +import io.netty.handler.codec.mqtt.MqttQoS; import io.netty.util.ReferenceCountUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.net.InetSocketAddress; import java.time.Duration; -import java.time.Instant; -import java.time.temporal.ChronoUnit; -import java.time.temporal.TemporalUnit; import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; @@ -101,7 +103,6 @@ static final class Will { } } - private final String clientId; private boolean clean; private Will will; private final SessionMessageQueue sessionQueue; @@ -112,28 +113,24 @@ static final class Will { private final DelayQueue inflightTimeouts = new DelayQueue<>(); private final Map qos2Receiving = new HashMap<>(); private final AtomicInteger inflightSlots = new AtomicInteger(INFLIGHT_WINDOW_SIZE); // this should be configurable - private final Instant created; - private final int expiryInterval; + private final ISessionsRepository.SessionData data; - Session(String clientId, boolean clean, Will will, SessionMessageQueue sessionQueue) { - this(clientId, clean, sessionQueue); + Session(ISessionsRepository.SessionData data, boolean clean, Will will, SessionMessageQueue sessionQueue) { + this(data, clean, sessionQueue); this.will = will; } - Session(String clientId, boolean clean, SessionMessageQueue sessionQueue) { + Session(ISessionsRepository.SessionData data, boolean clean, SessionMessageQueue sessionQueue) { if (sessionQueue == null) { throw new IllegalArgumentException("sessionQueue parameter can't be null"); } - this.clientId = clientId; + this.data = data; this.clean = clean; this.sessionQueue = sessionQueue; - this.created = Instant.now(); - // in MQTT3 cleanSession = true means expiryInterval=0 else infinite - expiryInterval = clean ? 0 : INFINITE_EXPIRY; } public boolean expireImmediately() { - return expiryInterval == 0; + return data.expiryInterval() == 0; } void update(boolean clean, Will will) { @@ -166,7 +163,7 @@ public boolean connected() { } public String getClientID() { - return clientId; + return data.clientId(); } public List getSubscriptions() { @@ -178,7 +175,7 @@ public void addSubscriptions(List newSubscriptions) { } public void removeSubscription(Topic topic) { - subscriptions.remove(new Subscription(clientId, topic, MqttQoS.EXACTLY_ONCE)); + subscriptions.remove(new Subscription(data.clientId(), topic, MqttQoS.EXACTLY_ONCE)); } public boolean hasWill() { @@ -508,7 +505,7 @@ public void cleanUp() { @Override public String toString() { return "Session{" + - "clientId='" + clientId + '\'' + + "clientId='" + data.clientId() + '\'' + ", clean=" + clean + ", status=" + status + ", inflightSlots=" + inflightSlots + diff --git a/broker/src/main/java/io/moquette/broker/SessionRegistry.java b/broker/src/main/java/io/moquette/broker/SessionRegistry.java index a1629d685..435f938df 100644 --- a/broker/src/main/java/io/moquette/broker/SessionRegistry.java +++ b/broker/src/main/java/io/moquette/broker/SessionRegistry.java @@ -28,7 +28,6 @@ import org.slf4j.LoggerFactory; import java.net.InetSocketAddress; -import java.time.Instant; import java.util.Arrays; import java.util.Collection; import java.util.Optional; @@ -140,7 +139,7 @@ private void recreateSessionPool() { if (queueRepository.containsQueue(session.clientId())) { final SessionMessageQueue persistentQueue = queueRepository.getOrCreateQueue(session.clientId()); queues.remove(session.clientId()); - Session rehydrated = new Session(session.clientId(), false, persistentQueue); + Session rehydrated = new Session(session, false, persistentQueue); pool.put(session.clientId(), rehydrated); } } @@ -235,15 +234,19 @@ private Session createNewSession(MqttConnectMessage msg, String clientId) { } else { queue = new InMemoryQueue(); } + // in MQTT3 cleanSession = true means expiryInterval=0 else infinite + final int expiryInterval = clean ? 0 : INFINITE_EXPIRY; + final ISessionsRepository.SessionData sessionData = new ISessionsRepository.SessionData(clientId, + MqttVersion.MQTT_3_1_1, expiryInterval); if (msg.variableHeader().isWillFlag()) { final Session.Will will = createWill(msg); - newSession = new Session(clientId, clean, will, queue); + newSession = new Session(sessionData, clean, will, queue); } else { - newSession = new Session(clientId, clean, queue); + newSession = new Session(sessionData, clean, queue); } newSession.markConnecting(); - sessionsRepository.saveSession(new ISessionsRepository.SessionData(clientId, MqttVersion.MQTT_3_1_1, INFINITE_EXPIRY)); + sessionsRepository.saveSession(sessionData); return newSession; } diff --git a/broker/src/test/java/io/moquette/broker/SessionTest.java b/broker/src/test/java/io/moquette/broker/SessionTest.java index e440f6cac..8d3b746fb 100644 --- a/broker/src/test/java/io/moquette/broker/SessionTest.java +++ b/broker/src/test/java/io/moquette/broker/SessionTest.java @@ -6,6 +6,7 @@ import io.netty.buffer.UnpooledByteBufAllocator; import io.netty.channel.embedded.EmbeddedChannel; import io.netty.handler.codec.mqtt.MqttQoS; +import io.netty.handler.codec.mqtt.MqttVersion; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -13,7 +14,10 @@ import io.moquette.broker.subscriptions.Subscription; import java.util.Arrays; import org.assertj.core.api.Assertions; -import static org.junit.jupiter.api.Assertions.*; + +import static io.moquette.broker.Session.INFINITE_EXPIRY; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; public class SessionTest { @@ -27,7 +31,8 @@ public class SessionTest { public void setUp() { testChannel = new EmbeddedChannel(); queuedMessages = new InMemoryQueue(); - client = new Session(CLIENT_ID, true, null, queuedMessages); + final ISessionsRepository.SessionData data = new ISessionsRepository.SessionData(CLIENT_ID, MqttVersion.MQTT_3_1_1, INFINITE_EXPIRY); + client = new Session(data, true, null, queuedMessages); createConnection(client); }