Skip to content

Commit

Permalink
1883 in a weak network environment the mqtt broker is unable to detec…
Browse files Browse the repository at this point in the history
…t disconnections (#1891)

* Add new setting

* Align keep alive handling with RFC

Only completely sent packets are treated as keep alive

* Update ReleaseNotes.md
  • Loading branch information
chkr1011 authored Dec 7, 2023
1 parent f349901 commit 4df0698
Show file tree
Hide file tree
Showing 7 changed files with 60 additions and 60 deletions.
1 change: 1 addition & 0 deletions .github/workflows/ReleaseNotes.md
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
* [Server] Added new events for delivered and dropped messages (#1866, thanks to @kallayj).
* [Server] The server will no longer treat a client which is receiving a large payload as alive. The packet must be received completely within the keep alive boundaries (BREAKING CHANGE!, #1883).
12 changes: 1 addition & 11 deletions Source/MQTTnet.AspnetCore/MqttConnectionContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -83,8 +83,6 @@ public string Endpoint
}
}

public bool IsReadingPacket { get; private set; }

public bool IsSecureConnection
{
get
Expand Down Expand Up @@ -164,11 +162,6 @@ public async Task<MqttPacket> ReceivePacketAsync(CancellationToken cancellationT
BytesReceived += received;
return packet;
}
else
{
// we did receive something but the message is not yet complete
IsReadingPacket = true;
}
}
else if (readResult.IsCompleted)
{
Expand All @@ -189,12 +182,9 @@ public async Task<MqttPacket> ReceivePacketAsync(CancellationToken cancellationT
// completing the channel makes sure that there is no more data read after a protocol error
_input?.Complete(exception);
_output?.Complete(exception);

throw;
}
finally
{
IsReadingPacket = false;
}

cancellationToken.ThrowIfCancellationRequested();
return null;
Expand Down
2 changes: 0 additions & 2 deletions Source/MQTTnet/Adapter/IMqttChannelAdapter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,6 @@ public interface IMqttChannelAdapter : IDisposable

long BytesReceived { get; }

bool IsReadingPacket { get; }

Task ConnectAsync(CancellationToken cancellationToken);

Task DisconnectAsync(CancellationToken cancellationToken);
Expand Down
66 changes: 28 additions & 38 deletions Source/MQTTnet/Adapter/MqttChannelAdapter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,6 @@ public MqttChannelAdapter(IMqttChannel channel, MqttPacketFormatterAdapter packe

public string Endpoint => _channel.Endpoint;

public bool IsReadingPacket { get; private set; }

public bool IsSecureConnection => _channel.IsSecureConnection;

public MqttPacketFormatterAdapter PacketFormatterAdapter { get; }
Expand Down Expand Up @@ -389,53 +387,45 @@ async Task<ReceivedMqttPacket> ReceiveAsync(CancellationToken cancellationToken)
return ReceivedMqttPacket.Empty;
}

IsReadingPacket = true;
try
var fixedHeader = readFixedHeaderResult.FixedHeader;
if (fixedHeader.RemainingLength == 0)
{
var fixedHeader = readFixedHeaderResult.FixedHeader;
if (fixedHeader.RemainingLength == 0)
{
return new ReceivedMqttPacket(fixedHeader.Flags, EmptyBuffer.ArraySegment, 2);
}
return new ReceivedMqttPacket(fixedHeader.Flags, EmptyBuffer.ArraySegment, 2);
}

var bodyLength = fixedHeader.RemainingLength;
var body = new byte[bodyLength];
var bodyLength = fixedHeader.RemainingLength;
var body = new byte[bodyLength];

var bodyOffset = 0;
var chunkSize = Math.Min(ReadBufferSize, bodyLength);
var bodyOffset = 0;
var chunkSize = Math.Min(ReadBufferSize, bodyLength);

do
do
{
var bytesLeft = body.Length - bodyOffset;
if (chunkSize > bytesLeft)
{
var bytesLeft = body.Length - bodyOffset;
if (chunkSize > bytesLeft)
{
chunkSize = bytesLeft;
}
chunkSize = bytesLeft;
}

var readBytes = await _channel.ReadAsync(body, bodyOffset, chunkSize, cancellationToken).ConfigureAwait(false);
var readBytes = await _channel.ReadAsync(body, bodyOffset, chunkSize, cancellationToken).ConfigureAwait(false);

if (cancellationToken.IsCancellationRequested)
{
return ReceivedMqttPacket.Empty;
}
if (cancellationToken.IsCancellationRequested)
{
return ReceivedMqttPacket.Empty;
}

if (readBytes == 0)
{
return ReceivedMqttPacket.Empty;
}
if (readBytes == 0)
{
return ReceivedMqttPacket.Empty;
}

bodyOffset += readBytes;
} while (bodyOffset < bodyLength);
bodyOffset += readBytes;
} while (bodyOffset < bodyLength);

PacketInspector?.FillReceiveBuffer(body);
PacketInspector?.FillReceiveBuffer(body);

var bodySegment = new ArraySegment<byte>(body, 0, bodyLength);
return new ReceivedMqttPacket(fixedHeader.Flags, bodySegment, fixedHeader.TotalLength);
}
finally
{
IsReadingPacket = false;
}
var bodySegment = new ArraySegment<byte>(body, 0, bodyLength);
return new ReceivedMqttPacket(fixedHeader.Flags, bodySegment, fixedHeader.TotalLength);
}

static bool WrapAndThrowException(Exception exception)
Expand Down
7 changes: 0 additions & 7 deletions Source/MQTTnet/Server/Internal/MqttServerKeepAliveMonitor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -107,13 +107,6 @@ void TryProcessClient(MqttClient connection, DateTime now)
return;
}

if (connection.ChannelAdapter.IsReadingPacket)
{
// The connection is currently reading a (large) packet. So it is obviously
// doing something and thus "connected".
return;
}

// Values described here: [MQTT-3.1.2-24].
// If the client sends 5 sec. the server will allow up to 7.5 seconds.
// If the client sends 1 sec. the server will allow up to 1.5 seconds.
Expand Down
21 changes: 21 additions & 0 deletions Source/MQTTnet/Server/Options/MqttServerKeepAliveOptions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
// See the LICENSE file in the project root for more information.

using System;

namespace MQTTnet.Server
{
public sealed class MqttServerKeepAliveOptions
{
/// <summary>
/// When this mode is enabled the MQTT server will not close a connection when the
/// client is currently sending a (large) payload. This may lead to "dead" connections
/// When this mode is disabled the MQTT server will disconnect a client when the keep
/// alive timeout is reached even if the client is currently sending a (large) payload.
/// </summary>
public bool DisconnectClientWhenReadingPayload { get; set; }

public TimeSpan MonitorInterval { get; set; } = TimeSpan.FromMilliseconds(500);
}
}
11 changes: 9 additions & 2 deletions Source/MQTTnet/Server/Options/MqttServerOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,19 @@ namespace MQTTnet.Server
public sealed class MqttServerOptions
{
public TimeSpan DefaultCommunicationTimeout { get; set; } = TimeSpan.FromSeconds(100);

public MqttServerTcpEndpointOptions DefaultEndpointOptions { get; } = new MqttServerTcpEndpointOptions();

public bool EnablePersistentSessions { get; set; }

public TimeSpan KeepAliveMonitorInterval { get; set; } = TimeSpan.FromMilliseconds(500);
[Obsolete("Use KeepAliveOptions instead.")]
public TimeSpan KeepAliveMonitorInterval
{
get => KeepAliveOptions.MonitorInterval;
set => KeepAliveOptions.MonitorInterval = value;
}

public MqttServerKeepAliveOptions KeepAliveOptions { get; } = new MqttServerKeepAliveOptions();

public int MaxPendingMessagesPerClient { get; set; } = 250;

Expand Down

0 comments on commit 4df0698

Please sign in to comment.