Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement and use a Session data storage #721

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions broker/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,12 @@
<version>${h2.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.jetbrains</groupId>
<artifactId>annotations</artifactId>
<version>RELEASE</version>
<scope>test</scope>
</dependency>
</dependencies>

<build>
Expand Down
96 changes: 96 additions & 0 deletions broker/src/main/java/io/moquette/broker/ISessionsRepository.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
package io.moquette.broker;

import io.netty.handler.codec.mqtt.MqttVersion;

import java.time.Instant;
import java.util.Collection;
import java.util.Objects;
import java.util.Optional;

/**
* Used to store data about persisted sessions like MQTT version, session's properties.
* */
public interface ISessionsRepository {

// Data class
final class SessionData {
private final String clientId;
private Instant expireAt = null;
final MqttVersion version;
private final int expiryInterval;

/**
* Construct a new SessionData without expiration set yet.
* */
public SessionData(String clientId, MqttVersion version, int expiryInterval) {
this.clientId = clientId;
this.version = version;
this.expiryInterval = expiryInterval;
}

/**
* Construct SessionData with an expiration instant, created by loading from the storage.
* */
public SessionData(String clientId, Instant expireAt, MqttVersion version, int expiryInterval) {
this.expiryInterval = expiryInterval;
Objects.requireNonNull(expireAt, "An expiration time is requested");
this.clientId = clientId;
this.expireAt = expireAt;
this.version = version;
}

public String clientId() {
return clientId;
}

public MqttVersion protocolVersion() {
return version;
}

public Optional<Instant> expireAt() {
return Optional.ofNullable(expireAt);
}

public Optional<Long> expiryInstant() {
return expireAt()
.map(Instant::toEpochMilli);
}

public int expiryInterval() {
return expiryInterval;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
SessionData that = (SessionData) o;
return clientId.equals(that.clientId);
}

@Override
public int hashCode() {
return Objects.hash(clientId);
}

@Override
public String toString() {
return "SessionData{" +
"clientId='" + clientId + '\'' +
", expireAt=" + expireAt +
", version=" + version +
", expiryInterval=" + expiryInterval +
'}';
}
}

/**
* @return the full list of persisted sessions data.
* */
Collection<SessionData> list();

/**
* Save data composing a session, es MQTT version, creation date and properties but not queues or subscriptions.
* */
void saveSession(SessionData session);
}
6 changes: 5 additions & 1 deletion broker/src/main/java/io/moquette/broker/Server.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import io.moquette.broker.unsafequeues.QueueException;
import io.moquette.interception.InterceptHandler;
import io.moquette.persistence.H2Builder;
import io.moquette.persistence.MemorySessionsRepository;
import io.moquette.persistence.MemorySubscriptionsRepository;
import io.moquette.interception.BrokerInterceptor;
import io.moquette.broker.subscriptions.CTrieSubscriptionDirectory;
Expand Down Expand Up @@ -192,6 +193,7 @@ public void startServer(IConfig config, List<? extends InterceptHandler> handler
authenticator = initializeAuthenticator(authenticator, config);
authorizatorPolicy = initializeAuthorizatorPolicy(authorizatorPolicy, config);

final ISessionsRepository sessionsRepository;
final ISubscriptionsRepository subscriptionsRepository;
final IQueueRepository queueRepository;
final IRetainedRepository retainedRepository;
Expand Down Expand Up @@ -225,17 +227,19 @@ public void startServer(IConfig config, List<? extends InterceptHandler> handler
LOG.trace("Configuring H2 subscriptions repository");
subscriptionsRepository = h2Builder.subscriptionsRepository();
retainedRepository = h2Builder.retainedRepository();
sessionsRepository = h2Builder.sessionsRepository();
} else {
LOG.trace("Configuring in-memory subscriptions store");
subscriptionsRepository = new MemorySubscriptionsRepository();
queueRepository = new MemoryQueueRepository();
retainedRepository = new MemoryRetainedRepository();
sessionsRepository = new MemorySessionsRepository();
}

ISubscriptionsDirectory subscriptions = new CTrieSubscriptionDirectory();
subscriptions.init(subscriptionsRepository);
final Authorizator authorizator = new Authorizator(authorizatorPolicy);
sessions = new SessionRegistry(subscriptions, queueRepository, authorizator);
sessions = new SessionRegistry(subscriptions, sessionsRepository, queueRepository, authorizator);
final int sessionQueueSize = config.intProp(BrokerConstants.SESSION_QUEUE_SIZE, 1024);
dispatcher = new PostOffice(subscriptions, retainedRepository, sessions, interceptor, authorizator,
sessionQueueSize);
Expand Down
35 changes: 19 additions & 16 deletions broker/src/main/java/io/moquette/broker/Session.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,18 @@
import io.moquette.broker.subscriptions.Subscription;
import io.moquette.broker.subscriptions.Topic;
import io.netty.buffer.ByteBuf;
import io.netty.handler.codec.mqtt.*;
import io.netty.handler.codec.mqtt.MqttFixedHeader;
import io.netty.handler.codec.mqtt.MqttMessage;
import io.netty.handler.codec.mqtt.MqttMessageType;
import io.netty.handler.codec.mqtt.MqttPublishMessage;
import io.netty.handler.codec.mqtt.MqttPublishVariableHeader;
import io.netty.handler.codec.mqtt.MqttQoS;
import io.netty.util.ReferenceCountUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.net.InetSocketAddress;
import java.time.Instant;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
Expand All @@ -46,6 +51,9 @@
class Session {

private static final Logger LOG = LoggerFactory.getLogger(Session.class);
// By specification session expiry value of 0xFFFFFFFF (UINT_MAX) (seconds) means
// session that doesn't expire, it's ~136 years, we can set a cap at 100 year
static final int INFINITE_EXPIRY = (int) Duration.ofDays(80 * 365).toMillis() / 1000;

static class InFlightPacket implements Delayed {

Expand Down Expand Up @@ -95,7 +103,6 @@ static final class Will {
}
}

private final String clientId;
private boolean clean;
private Will will;
private final SessionMessageQueue<SessionRegistry.EnqueuedMessage> sessionQueue;
Expand All @@ -106,28 +113,24 @@ static final class Will {
private final DelayQueue<InFlightPacket> inflightTimeouts = new DelayQueue<>();
private final Map<Integer, MqttPublishMessage> qos2Receiving = new HashMap<>();
private final AtomicInteger inflightSlots = new AtomicInteger(INFLIGHT_WINDOW_SIZE); // this should be configurable
private final Instant created;
private final int expiryInterval;
private final ISessionsRepository.SessionData data;

Session(String clientId, boolean clean, Will will, SessionMessageQueue<SessionRegistry.EnqueuedMessage> sessionQueue) {
this(clientId, clean, sessionQueue);
Session(ISessionsRepository.SessionData data, boolean clean, Will will, SessionMessageQueue<SessionRegistry.EnqueuedMessage> sessionQueue) {
this(data, clean, sessionQueue);
this.will = will;
}

Session(String clientId, boolean clean, SessionMessageQueue<SessionRegistry.EnqueuedMessage> sessionQueue) {
Session(ISessionsRepository.SessionData data, boolean clean, SessionMessageQueue<SessionRegistry.EnqueuedMessage> sessionQueue) {
if (sessionQueue == null) {
throw new IllegalArgumentException("sessionQueue parameter can't be null");
}
this.clientId = clientId;
this.data = data;
this.clean = clean;
this.sessionQueue = sessionQueue;
this.created = Instant.now();
// in MQTT3 cleanSession = true means expiryInterval=0 else infinite
expiryInterval = clean ? 0 : 0xFFFFFFFF;
}

public boolean expireImmediately() {
return expiryInterval == 0;
return data.expiryInterval() == 0;
}

void update(boolean clean, Will will) {
Expand Down Expand Up @@ -160,7 +163,7 @@ public boolean connected() {
}

public String getClientID() {
return clientId;
return data.clientId();
}

public List<Subscription> getSubscriptions() {
Expand All @@ -172,7 +175,7 @@ public void addSubscriptions(List<Subscription> newSubscriptions) {
}

public void removeSubscription(Topic topic) {
subscriptions.remove(new Subscription(clientId, topic, MqttQoS.EXACTLY_ONCE));
subscriptions.remove(new Subscription(data.clientId(), topic, MqttQoS.EXACTLY_ONCE));
}

public boolean hasWill() {
Expand Down Expand Up @@ -502,7 +505,7 @@ public void cleanUp() {
@Override
public String toString() {
return "Session{" +
"clientId='" + clientId + '\'' +
"clientId='" + data.clientId() + '\'' +
", clean=" + clean +
", status=" + status +
", inflightSlots=" + inflightSlots +
Expand Down
27 changes: 19 additions & 8 deletions broker/src/main/java/io/moquette/broker/SessionRegistry.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import io.netty.buffer.Unpooled;
import io.netty.handler.codec.mqtt.MqttConnectMessage;
import io.netty.handler.codec.mqtt.MqttQoS;
import io.netty.handler.codec.mqtt.MqttVersion;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -35,6 +36,8 @@
import java.util.concurrent.ConcurrentMap;
import java.util.stream.Collectors;

import static io.moquette.broker.Session.INFINITE_EXPIRY;

public class SessionRegistry {

public abstract static class EnqueuedMessage {
Expand Down Expand Up @@ -114,27 +117,30 @@ public SessionCreationResult(Session session, CreationModeEnum mode, boolean alr

private final ConcurrentMap<String, Session> pool = new ConcurrentHashMap<>();
private final ISubscriptionsDirectory subscriptionsDirectory;
private final ISessionsRepository sessionsRepository;
private final IQueueRepository queueRepository;
private final Authorizator authorizator;

SessionRegistry(ISubscriptionsDirectory subscriptionsDirectory,
ISessionsRepository sessionsRepository,
IQueueRepository queueRepository,
Authorizator authorizator) {
this.subscriptionsDirectory = subscriptionsDirectory;
this.sessionsRepository = sessionsRepository;
this.queueRepository = queueRepository;
this.authorizator = authorizator;
recreateSessionPool();
}

private void recreateSessionPool() {
final Set<String> queues = queueRepository.listQueueNames();
for (String clientId : subscriptionsDirectory.listAllSessionIds()) {
for (ISessionsRepository.SessionData session : sessionsRepository.list()) {
// if the subscriptions are present is obviously false
if (queueRepository.containsQueue(clientId)) {
final SessionMessageQueue<EnqueuedMessage> persistentQueue = queueRepository.getOrCreateQueue(clientId);
queues.remove(clientId);
Session rehydrated = new Session(clientId, false, persistentQueue);
pool.put(clientId, rehydrated);
if (queueRepository.containsQueue(session.clientId())) {
final SessionMessageQueue<EnqueuedMessage> persistentQueue = queueRepository.getOrCreateQueue(session.clientId());
queues.remove(session.clientId());
Session rehydrated = new Session(session, false, persistentQueue);
pool.put(session.clientId(), rehydrated);
}
}
if (!queues.isEmpty()) {
Expand Down Expand Up @@ -228,14 +234,19 @@ private Session createNewSession(MqttConnectMessage msg, String clientId) {
} else {
queue = new InMemoryQueue();
}
// in MQTT3 cleanSession = true means expiryInterval=0 else infinite
final int expiryInterval = clean ? 0 : INFINITE_EXPIRY;
final ISessionsRepository.SessionData sessionData = new ISessionsRepository.SessionData(clientId,
MqttVersion.MQTT_3_1_1, expiryInterval);
if (msg.variableHeader().isWillFlag()) {
final Session.Will will = createWill(msg);
newSession = new Session(clientId, clean, will, queue);
newSession = new Session(sessionData, clean, will, queue);
} else {
newSession = new Session(clientId, clean, queue);
newSession = new Session(sessionData, clean, queue);
}

newSession.markConnecting();
sessionsRepository.saveSession(sessionData);
return newSession;
}

Expand Down
5 changes: 5 additions & 0 deletions broker/src/main/java/io/moquette/persistence/H2Builder.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import io.moquette.broker.IQueueRepository;
import io.moquette.broker.IRetainedRepository;
import io.moquette.broker.ISessionsRepository;
import io.moquette.broker.ISubscriptionsRepository;
import org.h2.mvstore.MVStore;
import org.slf4j.Logger;
Expand Down Expand Up @@ -72,4 +73,8 @@ public IQueueRepository queueRepository() {
public IRetainedRepository retainedRepository() {
return new H2RetainedRepository(mvStore);
}

public ISessionsRepository sessionsRepository() {
return new H2SessionsRepository(mvStore);
}
}
Loading