diff --git a/broker/pom.xml b/broker/pom.xml index 64911facc..172100370 100644 --- a/broker/pom.xml +++ b/broker/pom.xml @@ -167,6 +167,12 @@ ${h2.version} test + + org.jetbrains + annotations + RELEASE + test + diff --git a/broker/src/main/java/io/moquette/broker/ISessionsRepository.java b/broker/src/main/java/io/moquette/broker/ISessionsRepository.java new file mode 100644 index 000000000..d172a2d28 --- /dev/null +++ b/broker/src/main/java/io/moquette/broker/ISessionsRepository.java @@ -0,0 +1,96 @@ +package io.moquette.broker; + +import io.netty.handler.codec.mqtt.MqttVersion; + +import java.time.Instant; +import java.util.Collection; +import java.util.Objects; +import java.util.Optional; + +/** + * Used to store data about persisted sessions like MQTT version, session's properties. + * */ +public interface ISessionsRepository { + + // Data class + final class SessionData { + private final String clientId; + private Instant expireAt = null; + final MqttVersion version; + private final int expiryInterval; + + /** + * Construct a new SessionData without expiration set yet. + * */ + public SessionData(String clientId, MqttVersion version, int expiryInterval) { + this.clientId = clientId; + this.version = version; + this.expiryInterval = expiryInterval; + } + + /** + * Construct SessionData with an expiration instant, created by loading from the storage. + * */ + public SessionData(String clientId, Instant expireAt, MqttVersion version, int expiryInterval) { + this.expiryInterval = expiryInterval; + Objects.requireNonNull(expireAt, "An expiration time is requested"); + this.clientId = clientId; + this.expireAt = expireAt; + this.version = version; + } + + public String clientId() { + return clientId; + } + + public MqttVersion protocolVersion() { + return version; + } + + public Optional expireAt() { + return Optional.ofNullable(expireAt); + } + + public Optional expiryInstant() { + return expireAt() + .map(Instant::toEpochMilli); + } + + public int expiryInterval() { + return expiryInterval; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + SessionData that = (SessionData) o; + return clientId.equals(that.clientId); + } + + @Override + public int hashCode() { + return Objects.hash(clientId); + } + + @Override + public String toString() { + return "SessionData{" + + "clientId='" + clientId + '\'' + + ", expireAt=" + expireAt + + ", version=" + version + + ", expiryInterval=" + expiryInterval + + '}'; + } + } + + /** + * @return the full list of persisted sessions data. + * */ + Collection list(); + + /** + * Save data composing a session, es MQTT version, creation date and properties but not queues or subscriptions. + * */ + void saveSession(SessionData session); +} diff --git a/broker/src/main/java/io/moquette/broker/Server.java b/broker/src/main/java/io/moquette/broker/Server.java index 3bbaf6f5d..d00ed8164 100644 --- a/broker/src/main/java/io/moquette/broker/Server.java +++ b/broker/src/main/java/io/moquette/broker/Server.java @@ -31,6 +31,7 @@ import io.moquette.broker.unsafequeues.QueueException; import io.moquette.interception.InterceptHandler; import io.moquette.persistence.H2Builder; +import io.moquette.persistence.MemorySessionsRepository; import io.moquette.persistence.MemorySubscriptionsRepository; import io.moquette.interception.BrokerInterceptor; import io.moquette.broker.subscriptions.CTrieSubscriptionDirectory; @@ -192,6 +193,7 @@ public void startServer(IConfig config, List handler authenticator = initializeAuthenticator(authenticator, config); authorizatorPolicy = initializeAuthorizatorPolicy(authorizatorPolicy, config); + final ISessionsRepository sessionsRepository; final ISubscriptionsRepository subscriptionsRepository; final IQueueRepository queueRepository; final IRetainedRepository retainedRepository; @@ -225,17 +227,19 @@ public void startServer(IConfig config, List handler LOG.trace("Configuring H2 subscriptions repository"); subscriptionsRepository = h2Builder.subscriptionsRepository(); retainedRepository = h2Builder.retainedRepository(); + sessionsRepository = h2Builder.sessionsRepository(); } else { LOG.trace("Configuring in-memory subscriptions store"); subscriptionsRepository = new MemorySubscriptionsRepository(); queueRepository = new MemoryQueueRepository(); retainedRepository = new MemoryRetainedRepository(); + sessionsRepository = new MemorySessionsRepository(); } ISubscriptionsDirectory subscriptions = new CTrieSubscriptionDirectory(); subscriptions.init(subscriptionsRepository); final Authorizator authorizator = new Authorizator(authorizatorPolicy); - sessions = new SessionRegistry(subscriptions, queueRepository, authorizator); + sessions = new SessionRegistry(subscriptions, sessionsRepository, queueRepository, authorizator); final int sessionQueueSize = config.intProp(BrokerConstants.SESSION_QUEUE_SIZE, 1024); dispatcher = new PostOffice(subscriptions, retainedRepository, sessions, interceptor, authorizator, sessionQueueSize); diff --git a/broker/src/main/java/io/moquette/broker/Session.java b/broker/src/main/java/io/moquette/broker/Session.java index f5b1561be..9046fc7a5 100644 --- a/broker/src/main/java/io/moquette/broker/Session.java +++ b/broker/src/main/java/io/moquette/broker/Session.java @@ -22,13 +22,18 @@ import io.moquette.broker.subscriptions.Subscription; import io.moquette.broker.subscriptions.Topic; import io.netty.buffer.ByteBuf; -import io.netty.handler.codec.mqtt.*; +import io.netty.handler.codec.mqtt.MqttFixedHeader; +import io.netty.handler.codec.mqtt.MqttMessage; +import io.netty.handler.codec.mqtt.MqttMessageType; +import io.netty.handler.codec.mqtt.MqttPublishMessage; +import io.netty.handler.codec.mqtt.MqttPublishVariableHeader; +import io.netty.handler.codec.mqtt.MqttQoS; import io.netty.util.ReferenceCountUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.net.InetSocketAddress; -import java.time.Instant; +import java.time.Duration; import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; @@ -46,6 +51,9 @@ class Session { private static final Logger LOG = LoggerFactory.getLogger(Session.class); + // By specification session expiry value of 0xFFFFFFFF (UINT_MAX) (seconds) means + // session that doesn't expire, it's ~136 years, we can set a cap at 100 year + static final int INFINITE_EXPIRY = (int) Duration.ofDays(80 * 365).toMillis() / 1000; static class InFlightPacket implements Delayed { @@ -95,7 +103,6 @@ static final class Will { } } - private final String clientId; private boolean clean; private Will will; private final SessionMessageQueue sessionQueue; @@ -106,28 +113,24 @@ static final class Will { private final DelayQueue inflightTimeouts = new DelayQueue<>(); private final Map qos2Receiving = new HashMap<>(); private final AtomicInteger inflightSlots = new AtomicInteger(INFLIGHT_WINDOW_SIZE); // this should be configurable - private final Instant created; - private final int expiryInterval; + private final ISessionsRepository.SessionData data; - Session(String clientId, boolean clean, Will will, SessionMessageQueue sessionQueue) { - this(clientId, clean, sessionQueue); + Session(ISessionsRepository.SessionData data, boolean clean, Will will, SessionMessageQueue sessionQueue) { + this(data, clean, sessionQueue); this.will = will; } - Session(String clientId, boolean clean, SessionMessageQueue sessionQueue) { + Session(ISessionsRepository.SessionData data, boolean clean, SessionMessageQueue sessionQueue) { if (sessionQueue == null) { throw new IllegalArgumentException("sessionQueue parameter can't be null"); } - this.clientId = clientId; + this.data = data; this.clean = clean; this.sessionQueue = sessionQueue; - this.created = Instant.now(); - // in MQTT3 cleanSession = true means expiryInterval=0 else infinite - expiryInterval = clean ? 0 : 0xFFFFFFFF; } public boolean expireImmediately() { - return expiryInterval == 0; + return data.expiryInterval() == 0; } void update(boolean clean, Will will) { @@ -160,7 +163,7 @@ public boolean connected() { } public String getClientID() { - return clientId; + return data.clientId(); } public List getSubscriptions() { @@ -172,7 +175,7 @@ public void addSubscriptions(List newSubscriptions) { } public void removeSubscription(Topic topic) { - subscriptions.remove(new Subscription(clientId, topic, MqttQoS.EXACTLY_ONCE)); + subscriptions.remove(new Subscription(data.clientId(), topic, MqttQoS.EXACTLY_ONCE)); } public boolean hasWill() { @@ -502,7 +505,7 @@ public void cleanUp() { @Override public String toString() { return "Session{" + - "clientId='" + clientId + '\'' + + "clientId='" + data.clientId() + '\'' + ", clean=" + clean + ", status=" + status + ", inflightSlots=" + inflightSlots + diff --git a/broker/src/main/java/io/moquette/broker/SessionRegistry.java b/broker/src/main/java/io/moquette/broker/SessionRegistry.java index aba4f8d83..435f938df 100644 --- a/broker/src/main/java/io/moquette/broker/SessionRegistry.java +++ b/broker/src/main/java/io/moquette/broker/SessionRegistry.java @@ -23,6 +23,7 @@ import io.netty.buffer.Unpooled; import io.netty.handler.codec.mqtt.MqttConnectMessage; import io.netty.handler.codec.mqtt.MqttQoS; +import io.netty.handler.codec.mqtt.MqttVersion; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -35,6 +36,8 @@ import java.util.concurrent.ConcurrentMap; import java.util.stream.Collectors; +import static io.moquette.broker.Session.INFINITE_EXPIRY; + public class SessionRegistry { public abstract static class EnqueuedMessage { @@ -114,13 +117,16 @@ public SessionCreationResult(Session session, CreationModeEnum mode, boolean alr private final ConcurrentMap pool = new ConcurrentHashMap<>(); private final ISubscriptionsDirectory subscriptionsDirectory; + private final ISessionsRepository sessionsRepository; private final IQueueRepository queueRepository; private final Authorizator authorizator; SessionRegistry(ISubscriptionsDirectory subscriptionsDirectory, + ISessionsRepository sessionsRepository, IQueueRepository queueRepository, Authorizator authorizator) { this.subscriptionsDirectory = subscriptionsDirectory; + this.sessionsRepository = sessionsRepository; this.queueRepository = queueRepository; this.authorizator = authorizator; recreateSessionPool(); @@ -128,13 +134,13 @@ public SessionCreationResult(Session session, CreationModeEnum mode, boolean alr private void recreateSessionPool() { final Set queues = queueRepository.listQueueNames(); - for (String clientId : subscriptionsDirectory.listAllSessionIds()) { + for (ISessionsRepository.SessionData session : sessionsRepository.list()) { // if the subscriptions are present is obviously false - if (queueRepository.containsQueue(clientId)) { - final SessionMessageQueue persistentQueue = queueRepository.getOrCreateQueue(clientId); - queues.remove(clientId); - Session rehydrated = new Session(clientId, false, persistentQueue); - pool.put(clientId, rehydrated); + if (queueRepository.containsQueue(session.clientId())) { + final SessionMessageQueue persistentQueue = queueRepository.getOrCreateQueue(session.clientId()); + queues.remove(session.clientId()); + Session rehydrated = new Session(session, false, persistentQueue); + pool.put(session.clientId(), rehydrated); } } if (!queues.isEmpty()) { @@ -228,14 +234,19 @@ private Session createNewSession(MqttConnectMessage msg, String clientId) { } else { queue = new InMemoryQueue(); } + // in MQTT3 cleanSession = true means expiryInterval=0 else infinite + final int expiryInterval = clean ? 0 : INFINITE_EXPIRY; + final ISessionsRepository.SessionData sessionData = new ISessionsRepository.SessionData(clientId, + MqttVersion.MQTT_3_1_1, expiryInterval); if (msg.variableHeader().isWillFlag()) { final Session.Will will = createWill(msg); - newSession = new Session(clientId, clean, will, queue); + newSession = new Session(sessionData, clean, will, queue); } else { - newSession = new Session(clientId, clean, queue); + newSession = new Session(sessionData, clean, queue); } newSession.markConnecting(); + sessionsRepository.saveSession(sessionData); return newSession; } diff --git a/broker/src/main/java/io/moquette/persistence/H2Builder.java b/broker/src/main/java/io/moquette/persistence/H2Builder.java index bb6c33bae..d2dd66d42 100644 --- a/broker/src/main/java/io/moquette/persistence/H2Builder.java +++ b/broker/src/main/java/io/moquette/persistence/H2Builder.java @@ -2,6 +2,7 @@ import io.moquette.broker.IQueueRepository; import io.moquette.broker.IRetainedRepository; +import io.moquette.broker.ISessionsRepository; import io.moquette.broker.ISubscriptionsRepository; import org.h2.mvstore.MVStore; import org.slf4j.Logger; @@ -72,4 +73,8 @@ public IQueueRepository queueRepository() { public IRetainedRepository retainedRepository() { return new H2RetainedRepository(mvStore); } + + public ISessionsRepository sessionsRepository() { + return new H2SessionsRepository(mvStore); + } } diff --git a/broker/src/main/java/io/moquette/persistence/H2SessionsRepository.java b/broker/src/main/java/io/moquette/persistence/H2SessionsRepository.java new file mode 100644 index 000000000..e28e551bf --- /dev/null +++ b/broker/src/main/java/io/moquette/persistence/H2SessionsRepository.java @@ -0,0 +1,96 @@ +package io.moquette.persistence; + +import io.moquette.broker.ISessionsRepository; +import io.netty.handler.codec.mqtt.MqttVersion; +import org.h2.mvstore.MVMap; +import org.h2.mvstore.MVStore; +import org.h2.mvstore.WriteBuffer; +import org.h2.mvstore.type.BasicDataType; +import org.h2.mvstore.type.StringDataType; + +import java.nio.ByteBuffer; +import java.time.Instant; +import java.util.Collection; + +class H2SessionsRepository implements ISessionsRepository { + + private static final byte SESSION_DATA_SERDES_V1 = 1; + private static final long UNDEFINED_INSTANT = -1; + + private final MVMap sessionMap; + + public H2SessionsRepository(MVStore mvStore) { + final MVMap.Builder sessionTypeBuilder = + new MVMap.Builder() + .valueType(new SessionDataValueType()); + + this.sessionMap = mvStore.openMap("sessions_store", sessionTypeBuilder); + } + + @Override + public Collection list() { + return sessionMap.values(); + } + + @Override + public void saveSession(SessionData session) { + sessionMap.put(session.clientId(), session); + } + + /** + * Codec data type to load and store SessionData instances + */ + private final class SessionDataValueType extends BasicDataType { + + private final StringDataType stringDataType = new StringDataType(); + + @Override + public int getMemory(SessionData obj) { + return stringDataType.getMemory(obj.clientId()) + 8 + 1 + 4; + } + + @Override + public void write(WriteBuffer buff, SessionData obj) { + buff.put(SESSION_DATA_SERDES_V1); + stringDataType.write(buff, obj.clientId()); + buff.putLong(obj.expiryInstant().orElse(UNDEFINED_INSTANT)); + buff.put(obj.protocolVersion().protocolLevel()); + buff.putInt(obj.expiryInterval()); + } + + @Override + public SessionData read(ByteBuffer buff) { + final byte serDesVersion = buff.get(); + if (serDesVersion != SESSION_DATA_SERDES_V1) { + throw new IllegalArgumentException("Unrecognized serialization version " + serDesVersion); + } + final String clientId = stringDataType.read(buff); + final long expiresAt = buff.getLong(); + final MqttVersion version = readMQTTVersion(buff.get()); + final int expiryInterval = buff.getInt(); + + if (expiresAt == UNDEFINED_INSTANT) { + return new SessionData(clientId, version, expiryInterval); + } else { + return new SessionData(clientId, Instant.ofEpochMilli(expiresAt), version, expiryInterval); + } + } + + @Override + public SessionData[] createStorage(int i) { + return new SessionData[i]; + } + } + + private MqttVersion readMQTTVersion(byte rawVersion) { + final MqttVersion version; + switch (rawVersion) { + case 3: version = MqttVersion.MQTT_3_1; break; + case 4: version = MqttVersion.MQTT_3_1_1; break; + case 5: version = MqttVersion.MQTT_5; break; + default: + throw new IllegalArgumentException("Unrecognized MQTT version value " + rawVersion); + } + return version; + } +} diff --git a/broker/src/main/java/io/moquette/persistence/MemorySessionsRepository.java b/broker/src/main/java/io/moquette/persistence/MemorySessionsRepository.java new file mode 100644 index 000000000..bb2c11c0c --- /dev/null +++ b/broker/src/main/java/io/moquette/persistence/MemorySessionsRepository.java @@ -0,0 +1,22 @@ +package io.moquette.persistence; + +import io.moquette.broker.ISessionsRepository; + +import java.util.Collection; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +public class MemorySessionsRepository implements ISessionsRepository { + + private final ConcurrentMap sessions = new ConcurrentHashMap<>(); + + @Override + public Collection list() { + return sessions.values(); + } + + @Override + public void saveSession(SessionData session) { + sessions.put(session.clientId(), session); + } +} diff --git a/broker/src/test/java/io/moquette/broker/MQTTConnectionConnectTest.java b/broker/src/test/java/io/moquette/broker/MQTTConnectionConnectTest.java index 08e2358d0..8f37c8cc6 100644 --- a/broker/src/test/java/io/moquette/broker/MQTTConnectionConnectTest.java +++ b/broker/src/test/java/io/moquette/broker/MQTTConnectionConnectTest.java @@ -31,6 +31,7 @@ import java.util.concurrent.ExecutionException; +import static io.moquette.broker.MQTTConnectionPublishTest.memorySessionsRepository; import static io.moquette.broker.NettyChannelAssertions.assertEqualsConnAck; import static io.netty.handler.codec.mqtt.MqttConnectReturnCode.*; import static java.util.Collections.singleton; @@ -70,7 +71,7 @@ public void setUp() { final PermitAllAuthorizatorPolicy authorizatorPolicy = new PermitAllAuthorizatorPolicy(); final Authorizator permitAll = new Authorizator(authorizatorPolicy); - sessionRegistry = new SessionRegistry(subscriptions, queueRepository, permitAll); + sessionRegistry = new SessionRegistry(subscriptions, memorySessionsRepository(), queueRepository, permitAll); postOffice = new PostOffice(subscriptions, new MemoryRetainedRepository(), sessionRegistry, ConnectionTestUtils.NO_OBSERVERS_INTERCEPTOR, permitAll, 1024); diff --git a/broker/src/test/java/io/moquette/broker/MQTTConnectionPublishTest.java b/broker/src/test/java/io/moquette/broker/MQTTConnectionPublishTest.java index 7d605de87..593f96dbc 100644 --- a/broker/src/test/java/io/moquette/broker/MQTTConnectionPublishTest.java +++ b/broker/src/test/java/io/moquette/broker/MQTTConnectionPublishTest.java @@ -19,6 +19,7 @@ import io.moquette.broker.subscriptions.CTrieSubscriptionDirectory; import io.moquette.broker.subscriptions.ISubscriptionsDirectory; import io.moquette.broker.security.IAuthenticator; +import io.moquette.persistence.MemorySessionsRepository; import io.moquette.persistence.MemorySubscriptionsRepository; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; @@ -28,6 +29,7 @@ import io.netty.handler.codec.mqtt.MqttPublishMessage; import io.netty.handler.codec.mqtt.MqttQoS; import io.netty.handler.codec.mqtt.MqttVersion; +import org.jetbrains.annotations.NotNull; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -76,12 +78,17 @@ private MQTTConnection createMQTTConnection(BrokerConfiguration config, Channel final PermitAllAuthorizatorPolicy authorizatorPolicy = new PermitAllAuthorizatorPolicy(); final Authorizator permitAll = new Authorizator(authorizatorPolicy); - sessionRegistry = new SessionRegistry(subscriptions, queueRepository, permitAll); + sessionRegistry = new SessionRegistry(subscriptions, memorySessionsRepository(), queueRepository, permitAll); final PostOffice postOffice = new PostOffice(subscriptions, new MemoryRetainedRepository(), sessionRegistry, ConnectionTestUtils.NO_OBSERVERS_INTERCEPTOR, permitAll, 1024); return new MQTTConnection(channel, config, mockAuthenticator, sessionRegistry, postOffice); } + @NotNull + static ISessionsRepository memorySessionsRepository() { + return new MemorySessionsRepository(); + } + @Test public void dropConnectionOnPublishWithInvalidTopicFormat() throws ExecutionException, InterruptedException { // Connect message with clean session set to true and client id is null. diff --git a/broker/src/test/java/io/moquette/broker/PostOfficeInternalPublishTest.java b/broker/src/test/java/io/moquette/broker/PostOfficeInternalPublishTest.java index 2f82d4925..181ada1d3 100644 --- a/broker/src/test/java/io/moquette/broker/PostOfficeInternalPublishTest.java +++ b/broker/src/test/java/io/moquette/broker/PostOfficeInternalPublishTest.java @@ -32,6 +32,7 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; +import static io.moquette.broker.MQTTConnectionPublishTest.memorySessionsRepository; import static io.moquette.broker.PostOfficeUnsubscribeTest.CONFIG; import static io.netty.handler.codec.mqtt.MqttQoS.*; import static java.nio.charset.StandardCharsets.UTF_8; @@ -89,7 +90,7 @@ private void initPostOfficeAndSubsystems() { final PermitAllAuthorizatorPolicy authorizatorPolicy = new PermitAllAuthorizatorPolicy(); final Authorizator permitAll = new Authorizator(authorizatorPolicy); - sessionRegistry = new SessionRegistry(subscriptions, queueRepository, permitAll); + sessionRegistry = new SessionRegistry(subscriptions, memorySessionsRepository(), queueRepository, permitAll); sut = new PostOffice(subscriptions, retainedRepository, sessionRegistry, ConnectionTestUtils.NO_OBSERVERS_INTERCEPTOR, permitAll, 1024); } diff --git a/broker/src/test/java/io/moquette/broker/PostOfficePublishTest.java b/broker/src/test/java/io/moquette/broker/PostOfficePublishTest.java index 6a8ed3bb6..a88e6a727 100644 --- a/broker/src/test/java/io/moquette/broker/PostOfficePublishTest.java +++ b/broker/src/test/java/io/moquette/broker/PostOfficePublishTest.java @@ -38,6 +38,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import static io.moquette.broker.MQTTConnectionPublishTest.memorySessionsRepository; import static io.moquette.broker.PostOfficeUnsubscribeTest.CONFIG; import static io.netty.handler.codec.mqtt.MqttQoS.AT_LEAST_ONCE; import static io.netty.handler.codec.mqtt.MqttQoS.AT_MOST_ONCE; @@ -103,7 +104,7 @@ private void initPostOfficeAndSubsystems() { final PermitAllAuthorizatorPolicy authorizatorPolicy = new PermitAllAuthorizatorPolicy(); final Authorizator permitAll = new Authorizator(authorizatorPolicy); - sessionRegistry = new SessionRegistry(subscriptions, queueRepository, permitAll); + sessionRegistry = new SessionRegistry(subscriptions, memorySessionsRepository(), queueRepository, permitAll); sut = new PostOffice(subscriptions, retainedRepository, sessionRegistry, ConnectionTestUtils.NO_OBSERVERS_INTERCEPTOR, permitAll, 1024); } diff --git a/broker/src/test/java/io/moquette/broker/PostOfficeSubscribeTest.java b/broker/src/test/java/io/moquette/broker/PostOfficeSubscribeTest.java index 5318d6ca3..2b8417161 100644 --- a/broker/src/test/java/io/moquette/broker/PostOfficeSubscribeTest.java +++ b/broker/src/test/java/io/moquette/broker/PostOfficeSubscribeTest.java @@ -38,6 +38,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import static io.moquette.broker.MQTTConnectionPublishTest.memorySessionsRepository; import static io.moquette.broker.PostOfficePublishTest.ALLOW_ANONYMOUS_AND_ZERO_BYTES_CLID; import static io.moquette.broker.PostOfficePublishTest.SUBSCRIBER_ID; import static io.netty.handler.codec.mqtt.MqttConnectReturnCode.CONNECTION_ACCEPTED; @@ -95,7 +96,7 @@ private void prepareSUT() { final PermitAllAuthorizatorPolicy authorizatorPolicy = new PermitAllAuthorizatorPolicy(); final Authorizator permitAll = new Authorizator(authorizatorPolicy); - sessionRegistry = new SessionRegistry(subscriptions, queueRepository, permitAll); + sessionRegistry = new SessionRegistry(subscriptions, memorySessionsRepository(), queueRepository, permitAll); sut = new PostOffice(subscriptions, new MemoryRetainedRepository(), sessionRegistry, ConnectionTestUtils.NO_OBSERVERS_INTERCEPTOR, permitAll, 1024); } diff --git a/broker/src/test/java/io/moquette/broker/PostOfficeUnsubscribeTest.java b/broker/src/test/java/io/moquette/broker/PostOfficeUnsubscribeTest.java index 579a64fa9..8a7d68a94 100644 --- a/broker/src/test/java/io/moquette/broker/PostOfficeUnsubscribeTest.java +++ b/broker/src/test/java/io/moquette/broker/PostOfficeUnsubscribeTest.java @@ -27,7 +27,6 @@ import io.netty.channel.Channel; import io.netty.channel.embedded.EmbeddedChannel; import io.netty.handler.codec.mqtt.*; -import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -38,6 +37,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import static io.moquette.broker.MQTTConnectionPublishTest.memorySessionsRepository; import static io.moquette.broker.PostOfficePublishTest.PUBLISHER_ID; import static io.netty.handler.codec.mqtt.MqttQoS.*; import static java.util.Collections.*; @@ -87,7 +87,7 @@ private void prepareSUT() { final PermitAllAuthorizatorPolicy authorizatorPolicy = new PermitAllAuthorizatorPolicy(); final Authorizator permitAll = new Authorizator(authorizatorPolicy); - sessionRegistry = new SessionRegistry(subscriptions, queueRepository, permitAll); + sessionRegistry = new SessionRegistry(subscriptions, memorySessionsRepository(), queueRepository, permitAll); sut = new PostOffice(subscriptions, new MemoryRetainedRepository(), sessionRegistry, ConnectionTestUtils.NO_OBSERVERS_INTERCEPTOR, permitAll, 1024); } diff --git a/broker/src/test/java/io/moquette/broker/SessionRegistryTest.java b/broker/src/test/java/io/moquette/broker/SessionRegistryTest.java index 78d1c9f8f..36bda39be 100644 --- a/broker/src/test/java/io/moquette/broker/SessionRegistryTest.java +++ b/broker/src/test/java/io/moquette/broker/SessionRegistryTest.java @@ -40,6 +40,7 @@ import java.nio.charset.StandardCharsets; import java.util.concurrent.ExecutionException; +import static io.moquette.broker.MQTTConnectionPublishTest.memorySessionsRepository; import static io.moquette.broker.NettyChannelAssertions.assertEqualsConnAck; import static io.netty.handler.codec.mqtt.MqttConnectReturnCode.CONNECTION_ACCEPTED; import static java.util.Collections.singleton; @@ -85,7 +86,7 @@ private MQTTConnection createMQTTConnection(BrokerConfiguration config, Channel final PermitAllAuthorizatorPolicy authorizatorPolicy = new PermitAllAuthorizatorPolicy(); final Authorizator permitAll = new Authorizator(authorizatorPolicy); - sut = new SessionRegistry(subscriptions, queueRepository, permitAll); + sut = new SessionRegistry(subscriptions, memorySessionsRepository(), queueRepository, permitAll); final PostOffice postOffice = new PostOffice(subscriptions, new MemoryRetainedRepository(), sut, ConnectionTestUtils.NO_OBSERVERS_INTERCEPTOR, permitAll, 1024); return new MQTTConnection(channel, config, mockAuthenticator, sut, postOffice); diff --git a/broker/src/test/java/io/moquette/broker/SessionTest.java b/broker/src/test/java/io/moquette/broker/SessionTest.java index e440f6cac..8d3b746fb 100644 --- a/broker/src/test/java/io/moquette/broker/SessionTest.java +++ b/broker/src/test/java/io/moquette/broker/SessionTest.java @@ -6,6 +6,7 @@ import io.netty.buffer.UnpooledByteBufAllocator; import io.netty.channel.embedded.EmbeddedChannel; import io.netty.handler.codec.mqtt.MqttQoS; +import io.netty.handler.codec.mqtt.MqttVersion; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -13,7 +14,10 @@ import io.moquette.broker.subscriptions.Subscription; import java.util.Arrays; import org.assertj.core.api.Assertions; -import static org.junit.jupiter.api.Assertions.*; + +import static io.moquette.broker.Session.INFINITE_EXPIRY; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; public class SessionTest { @@ -27,7 +31,8 @@ public class SessionTest { public void setUp() { testChannel = new EmbeddedChannel(); queuedMessages = new InMemoryQueue(); - client = new Session(CLIENT_ID, true, null, queuedMessages); + final ISessionsRepository.SessionData data = new ISessionsRepository.SessionData(CLIENT_ID, MqttVersion.MQTT_3_1_1, INFINITE_EXPIRY); + client = new Session(data, true, null, queuedMessages); createConnection(client); }