diff --git a/Source/HiveMQtt/Client/HiveMQClientTrafficProcessor.cs b/Source/HiveMQtt/Client/HiveMQClientTrafficProcessor.cs index a3cbb6d5..6c9b96c0 100644 --- a/Source/HiveMQtt/Client/HiveMQClientTrafficProcessor.cs +++ b/Source/HiveMQtt/Client/HiveMQClientTrafficProcessor.cs @@ -85,46 +85,56 @@ private Task ConnectionMonitorAsync(CancellationToken cancellationToken) => Task while (true) { - if (cancellationToken.IsCancellationRequested) + try { - Logger.Trace($"{this.Options.ClientId}-(CM)- Cancelled"); - break; - } + if (cancellationToken.IsCancellationRequested) + { + Logger.Trace($"{this.Options.ClientId}-(CM)- Cancelled"); + break; + } - // If connected and no recent traffic, send a ping - if (this.ConnectState == ConnectState.Connected) - { - if (this.lastCommunicationTimer.Elapsed > TimeSpan.FromSeconds(keepAlivePeriod)) + // If connected and no recent traffic, send a ping + if (this.ConnectState == ConnectState.Connected) { - // Send PingReq - Logger.Trace($"{this.Options.ClientId}-(CM)- --> PingReq"); - this.SendQueue.Enqueue(new PingReqPacket()); + if (this.lastCommunicationTimer.Elapsed > TimeSpan.FromSeconds(keepAlivePeriod)) + { + // Send PingReq + Logger.Trace($"{this.Options.ClientId}-(CM)- --> PingReq"); + this.SendQueue.Enqueue(new PingReqPacket()); + } } - } - // Dumping Client State - Logger.Debug($"{this.Options.ClientId}-(CM)- {this.ConnectState} lastCommunicationTimer:{this.lastCommunicationTimer.Elapsed}"); - Logger.Debug($"{this.Options.ClientId}-(CM)- SendQueue:............{this.SendQueue.Count}"); - Logger.Debug($"{this.Options.ClientId}-(CM)- ReceivedQueue:........{this.ReceivedQueue.Count}"); - Logger.Debug($"{this.Options.ClientId}-(CM)- OutgoingPublishQueue:.{this.OutgoingPublishQueue.Count}"); - Logger.Debug($"{this.Options.ClientId}-(CM)- BrokerReceiveMaxSem...{this.BrokerReceiveSemaphore.CurrentCount}"); - Logger.Debug($"{this.Options.ClientId}-(CM)- OPubTransactionQueue:.{this.OPubTransactionQueue.Count}"); - Logger.Debug($"{this.Options.ClientId}-(CM)- IPubTransactionQueue:.{this.IPubTransactionQueue.Count}"); - Logger.Debug($"{this.Options.ClientId}-(CM)- # of Subscriptions:...{this.Subscriptions.Count}"); - - await this.RunTaskHealthCheckAsync(this.ConnectionWriterTask, "ConnectionWriter").ConfigureAwait(false); - await this.RunTaskHealthCheckAsync(this.ConnectionReaderTask, "ConnectionReader").ConfigureAwait(false); - await this.RunTaskHealthCheckAsync(this.ConnectionPublishWriterTask, "ConnectionPublishWriter").ConfigureAwait(false); - await this.RunTaskHealthCheckAsync(this.ReceivedPacketsHandlerTask, "ReceivedPacketsHandler").ConfigureAwait(false); + // Dumping Client State + Logger.Debug($"{this.Options.ClientId}-(CM)- {this.ConnectState} lastCommunicationTimer:{this.lastCommunicationTimer.Elapsed}"); + Logger.Debug($"{this.Options.ClientId}-(CM)- SendQueue:............{this.SendQueue.Count}"); + Logger.Debug($"{this.Options.ClientId}-(CM)- ReceivedQueue:........{this.ReceivedQueue.Count}"); + Logger.Debug($"{this.Options.ClientId}-(CM)- OutgoingPublishQueue:.{this.OutgoingPublishQueue.Count}"); + Logger.Debug($"{this.Options.ClientId}-(CM)- BrokerReceiveMaxSem...{this.BrokerReceiveSemaphore.CurrentCount}"); + Logger.Debug($"{this.Options.ClientId}-(CM)- OPubTransactionQueue:.{this.OPubTransactionQueue.Count}"); + Logger.Debug($"{this.Options.ClientId}-(CM)- IPubTransactionQueue:.{this.IPubTransactionQueue.Count}"); + Logger.Debug($"{this.Options.ClientId}-(CM)- # of Subscriptions:...{this.Subscriptions.Count}"); + + await this.RunTaskHealthCheckAsync(this.ConnectionWriterTask, "ConnectionWriter").ConfigureAwait(false); + await this.RunTaskHealthCheckAsync(this.ConnectionReaderTask, "ConnectionReader").ConfigureAwait(false); + await this.RunTaskHealthCheckAsync(this.ConnectionPublishWriterTask, "ConnectionPublishWriter").ConfigureAwait(false); + await this.RunTaskHealthCheckAsync(this.ReceivedPacketsHandlerTask, "ReceivedPacketsHandler").ConfigureAwait(false); - try - { await Task.Delay(2000, cancellationToken).ConfigureAwait(false); } catch (TaskCanceledException) { - Logger.Info($"{this.Options.ClientId}-(CM)- Cancelled"); - return; + Logger.Debug($"{this.Options.ClientId}-(CM)- Cancelled"); + break; + } + catch (Exception ex) + { + if (!cancellationToken.IsCancellationRequested) + { + Logger.Error($"{this.Options.ClientId}-(CM)- Exception: {ex}"); + throw; + } + + break; } } @@ -141,51 +151,69 @@ private Task ConnectionPublishWriterAsync(CancellationToken cancellationToken) = while (true) { - if (cancellationToken.IsCancellationRequested) - { - Logger.Trace($"{this.Options.ClientId}-(PW)- Cancelled with {this.OutgoingPublishQueue.Count} publish packets remaining."); - break; - } - - while (this.ConnectState != ConnectState.Connected) + try { - Logger.Trace($"{this.Options.ClientId}-(PW)- Not connected. Waiting for connect..."); - await Task.Delay(500).ConfigureAwait(false); - continue; - } + if (cancellationToken.IsCancellationRequested) + { + Logger.Trace($"{this.Options.ClientId}-(PW)- Cancelled with {this.OutgoingPublishQueue.Count} publish packets remaining."); + break; + } - FlushResult writeResult = default; - var publishPacket = await this.OutgoingPublishQueue.DequeueAsync(cancellationToken).ConfigureAwait(false); + while (this.ConnectState != ConnectState.Connected) + { + Logger.Trace($"{this.Options.ClientId}-(PW)- Not connected. Waiting for connect..."); + await Task.Delay(500).ConfigureAwait(false); + continue; + } - Logger.Trace($"{this.Options.ClientId}-(PW)- --> Sending PublishPacket id={publishPacket.PacketIdentifier}"); - if (publishPacket.Message.QoS is QualityOfService.AtLeastOnceDelivery || - publishPacket.Message.QoS is QualityOfService.ExactlyOnceDelivery) - { - // We have the next qos>0 publish packet to send - // Respect the broker's ReceiveMaximum - await this.BrokerReceiveSemaphore.WaitAsync(cancellationToken).ConfigureAwait(false); + FlushResult writeResult = default; + var publishPacket = await this.OutgoingPublishQueue.DequeueAsync(cancellationToken).ConfigureAwait(false); - // QoS > 0 - Add to transaction queue - if (!this.OPubTransactionQueue.TryAdd(publishPacket.PacketIdentifier, new List { publishPacket })) + Logger.Trace($"{this.Options.ClientId}-(PW)- --> Sending PublishPacket id={publishPacket.PacketIdentifier}"); + if (publishPacket.Message.QoS is QualityOfService.AtLeastOnceDelivery || + publishPacket.Message.QoS is QualityOfService.ExactlyOnceDelivery) { - Logger.Warn($"Duplicate packet ID detected {publishPacket.PacketIdentifier} while queueing to transaction queue for an outgoing QoS {publishPacket.Message.QoS} publish ."); - this.BrokerReceiveSemaphore.Release(); - continue; + // We have the next qos>0 publish packet to send + // Respect the broker's ReceiveMaximum + await this.BrokerReceiveSemaphore.WaitAsync(cancellationToken).ConfigureAwait(false); + + // QoS > 0 - Add to transaction queue + if (!this.OPubTransactionQueue.TryAdd(publishPacket.PacketIdentifier, new List { publishPacket })) + { + Logger.Warn($"Duplicate packet ID detected {publishPacket.PacketIdentifier} while queueing to transaction queue for an outgoing QoS {publishPacket.Message.QoS} publish ."); + this.BrokerReceiveSemaphore.Release(); + continue; + } } - } - writeResult = await this.WriteAsync(publishPacket.Encode()).ConfigureAwait(false); - this.OnPublishSentEventLauncher(publishPacket); + writeResult = await this.WriteAsync(publishPacket.Encode()).ConfigureAwait(false); + this.OnPublishSentEventLauncher(publishPacket); - if (writeResult.IsCanceled) + if (writeResult.IsCanceled) + { + Logger.Trace($"{this.Options.ClientId}-(PW)- ConnectionPublishWriter Write Cancelled"); + break; + } + + if (writeResult.IsCompleted) + { + Logger.Trace($"{this.Options.ClientId}-(PW)- ConnectionPublishWriter IsCompleted: end of the stream"); + break; + } + } + catch (TaskCanceledException) { - Logger.Trace($"{this.Options.ClientId}-(PW)- ConnectionPublishWriter Write Cancelled"); + Logger.Debug($"{this.Options.ClientId}-(PW)- Cancelled"); break; } - - if (writeResult.IsCompleted) + catch (Exception ex) { - Logger.Trace($"{this.Options.ClientId}-(PW)- ConnectionPublishWriter IsCompleted: end of the stream"); + if (!cancellationToken.IsCancellationRequested) + { + Logger.Error($"{this.Options.ClientId}-(PW)- Exception: {ex}"); + throw; + } + break; } } // while(true) @@ -203,88 +231,106 @@ private Task ConnectionWriterAsync(CancellationToken cancellationToken) => Task. while (true) { - if (cancellationToken.IsCancellationRequested) + try { - Logger.Trace($"{this.Options.ClientId}-(W)- Cancelled with {this.SendQueue.Count} packets remaining."); - break; - } + if (cancellationToken.IsCancellationRequested) + { + Logger.Trace($"{this.Options.ClientId}-(W)- Cancelled with {this.SendQueue.Count} packets remaining."); + break; + } - // We allow this task to run in Connecting, Connected, and Disconnecting states - // because it is the one that has to send the CONNECT and DISCONNECT packets. - while (this.ConnectState == ConnectState.Disconnected) - { - Logger.Trace($"{this.Options.ClientId}-(W)- Not connected. Waiting for connect..."); - await Task.Delay(2000).ConfigureAwait(false); - continue; - } + // We allow this task to run in Connecting, Connected, and Disconnecting states + // because it is the one that has to send the CONNECT and DISCONNECT packets. + while (this.ConnectState == ConnectState.Disconnected) + { + Logger.Trace($"{this.Options.ClientId}-(W)- Not connected. Waiting for connect..."); + await Task.Delay(2000).ConfigureAwait(false); + continue; + } - var packet = await this.SendQueue.DequeueAsync(cancellationToken).ConfigureAwait(false); - FlushResult writeResult = default; + var packet = await this.SendQueue.DequeueAsync(cancellationToken).ConfigureAwait(false); + FlushResult writeResult = default; - switch (packet) - { - // FIXME: Only one connect, subscribe or unsubscribe packet can be sent at a time. - case ConnectPacket connectPacket: - Logger.Trace($"{this.Options.ClientId}-(W)- --> Sending ConnectPacket id={connectPacket.PacketIdentifier}"); - writeResult = await this.WriteAsync(connectPacket.Encode()).ConfigureAwait(false); - this.OnConnectSentEventLauncher(connectPacket); - break; - case DisconnectPacket disconnectPacket: - Logger.Trace($"{this.Options.ClientId}-(W)- --> Sending DisconnectPacket id={disconnectPacket.PacketIdentifier}"); - writeResult = await this.WriteAsync(disconnectPacket.Encode()).ConfigureAwait(false); - this.OnDisconnectSentEventLauncher(disconnectPacket); - break; - case SubscribePacket subscribePacket: - Logger.Trace($"{this.Options.ClientId}-(W)- --> Sending SubscribePacket id={subscribePacket.PacketIdentifier}"); - writeResult = await this.WriteAsync(subscribePacket.Encode()).ConfigureAwait(false); - this.OnSubscribeSentEventLauncher(subscribePacket); - break; - case UnsubscribePacket unsubscribePacket: - Logger.Trace($"{this.Options.ClientId}-(W)- --> Sending UnsubscribePacket id={unsubscribePacket.PacketIdentifier}"); - writeResult = await this.WriteAsync(unsubscribePacket.Encode()).ConfigureAwait(false); - this.OnUnsubscribeSentEventLauncher(unsubscribePacket); - break; - case PublishPacket publishPacket: - throw new HiveMQttClientException("PublishPacket should be sent via ConnectionPublishWriterAsync."); - case PubAckPacket pubAckPacket: - Logger.Trace($"{this.Options.ClientId}-(W)- --> Sending PubAckPacket id={pubAckPacket.PacketIdentifier} reason={pubAckPacket.ReasonCode}"); - writeResult = await this.WriteAsync(pubAckPacket.Encode()).ConfigureAwait(false); - this.OnPubAckSentEventLauncher(pubAckPacket); - break; - case PubRecPacket pubRecPacket: - Logger.Trace($"{this.Options.ClientId}-(W)- --> Sending PubRecPacket id={pubRecPacket.PacketIdentifier} reason={pubRecPacket.ReasonCode}"); - writeResult = await this.WriteAsync(pubRecPacket.Encode()).ConfigureAwait(false); - this.OnPubRecSentEventLauncher(pubRecPacket); - break; - case PubRelPacket pubRelPacket: - Logger.Trace($"{this.Options.ClientId}-(W)- --> Sending PubRelPacket id={pubRelPacket.PacketIdentifier} reason={pubRelPacket.ReasonCode}"); - writeResult = await this.WriteAsync(pubRelPacket.Encode()).ConfigureAwait(false); - this.OnPubRelSentEventLauncher(pubRelPacket); - break; - case PubCompPacket pubCompPacket: - Logger.Trace($"{this.Options.ClientId}-(W)- --> Sending PubCompPacket id={pubCompPacket.PacketIdentifier} reason={pubCompPacket.ReasonCode}"); - writeResult = await this.WriteAsync(pubCompPacket.Encode()).ConfigureAwait(false); - this.OnPubCompSentEventLauncher(pubCompPacket); - break; - case PingReqPacket pingReqPacket: - Logger.Trace($"{this.Options.ClientId}-(W)- --> Sending PingReqPacket id={pingReqPacket.PacketIdentifier}"); - writeResult = await this.WriteAsync(PingReqPacket.Encode()).ConfigureAwait(false); - this.OnPingReqSentEventLauncher(pingReqPacket); - break; - default: - Logger.Trace($"{this.Options.ClientId}-(W)- --> Unknown packet type {packet}"); + switch (packet) + { + // FIXME: Only one connect, subscribe or unsubscribe packet can be sent at a time. + case ConnectPacket connectPacket: + Logger.Trace($"{this.Options.ClientId}-(W)- --> Sending ConnectPacket id={connectPacket.PacketIdentifier}"); + writeResult = await this.WriteAsync(connectPacket.Encode()).ConfigureAwait(false); + this.OnConnectSentEventLauncher(connectPacket); + break; + case DisconnectPacket disconnectPacket: + Logger.Trace($"{this.Options.ClientId}-(W)- --> Sending DisconnectPacket id={disconnectPacket.PacketIdentifier}"); + writeResult = await this.WriteAsync(disconnectPacket.Encode()).ConfigureAwait(false); + this.OnDisconnectSentEventLauncher(disconnectPacket); + break; + case SubscribePacket subscribePacket: + Logger.Trace($"{this.Options.ClientId}-(W)- --> Sending SubscribePacket id={subscribePacket.PacketIdentifier}"); + writeResult = await this.WriteAsync(subscribePacket.Encode()).ConfigureAwait(false); + this.OnSubscribeSentEventLauncher(subscribePacket); + break; + case UnsubscribePacket unsubscribePacket: + Logger.Trace($"{this.Options.ClientId}-(W)- --> Sending UnsubscribePacket id={unsubscribePacket.PacketIdentifier}"); + writeResult = await this.WriteAsync(unsubscribePacket.Encode()).ConfigureAwait(false); + this.OnUnsubscribeSentEventLauncher(unsubscribePacket); + break; + case PublishPacket publishPacket: + throw new HiveMQttClientException("PublishPacket should be sent via ConnectionPublishWriterAsync."); + case PubAckPacket pubAckPacket: + Logger.Trace($"{this.Options.ClientId}-(W)- --> Sending PubAckPacket id={pubAckPacket.PacketIdentifier} reason={pubAckPacket.ReasonCode}"); + writeResult = await this.WriteAsync(pubAckPacket.Encode()).ConfigureAwait(false); + this.OnPubAckSentEventLauncher(pubAckPacket); + break; + case PubRecPacket pubRecPacket: + Logger.Trace($"{this.Options.ClientId}-(W)- --> Sending PubRecPacket id={pubRecPacket.PacketIdentifier} reason={pubRecPacket.ReasonCode}"); + writeResult = await this.WriteAsync(pubRecPacket.Encode()).ConfigureAwait(false); + this.OnPubRecSentEventLauncher(pubRecPacket); + break; + case PubRelPacket pubRelPacket: + Logger.Trace($"{this.Options.ClientId}-(W)- --> Sending PubRelPacket id={pubRelPacket.PacketIdentifier} reason={pubRelPacket.ReasonCode}"); + writeResult = await this.WriteAsync(pubRelPacket.Encode()).ConfigureAwait(false); + this.OnPubRelSentEventLauncher(pubRelPacket); + break; + case PubCompPacket pubCompPacket: + Logger.Trace($"{this.Options.ClientId}-(W)- --> Sending PubCompPacket id={pubCompPacket.PacketIdentifier} reason={pubCompPacket.ReasonCode}"); + writeResult = await this.WriteAsync(pubCompPacket.Encode()).ConfigureAwait(false); + this.OnPubCompSentEventLauncher(pubCompPacket); + break; + case PingReqPacket pingReqPacket: + Logger.Trace($"{this.Options.ClientId}-(W)- --> Sending PingReqPacket id={pingReqPacket.PacketIdentifier}"); + writeResult = await this.WriteAsync(PingReqPacket.Encode()).ConfigureAwait(false); + this.OnPingReqSentEventLauncher(pingReqPacket); + break; + default: + Logger.Trace($"{this.Options.ClientId}-(W)- --> Unknown packet type {packet}"); + break; + } // switch + + if (writeResult.IsCanceled) + { + Logger.Trace($"{this.Options.ClientId}-(W)- ConnectionWriter Write Cancelled"); break; - } // switch + } - if (writeResult.IsCanceled) + if (writeResult.IsCompleted) + { + Logger.Trace($"{this.Options.ClientId}-(W)- ConnectionWriter IsCompleted: end of the stream"); + break; + } + } + catch (TaskCanceledException) { - Logger.Trace($"{this.Options.ClientId}-(W)- ConnectionWriter Write Cancelled"); + Logger.Debug($"{this.Options.ClientId}-(W)- Cancelled"); break; } - - if (writeResult.IsCompleted) + catch (Exception ex) { - Logger.Trace($"{this.Options.ClientId}-(W)- ConnectionWriter IsCompleted: end of the stream"); + if (!cancellationToken.IsCancellationRequested) + { + Logger.Error($"{this.Options.ClientId}-(W)- Exception: {ex}"); + throw; + } + break; } } // while(true) @@ -300,108 +346,126 @@ private Task ConnectionWriterAsync(CancellationToken cancellationToken) => Task. private Task ConnectionReaderAsync(CancellationToken cancellationToken) => Task.Run( async () => { - Logger.Trace($"{this.Options.ClientId}-(R)- ConnectionReader Starting...{this.ConnectState}"); - ReadResult readResult; + Logger.Trace($"{this.Options.ClientId}-(R)- ConnectionReader Starting...{this.ConnectState}"); while (this.ConnectState is ConnectState.Connecting or ConnectState.Connected) { - if (cancellationToken.IsCancellationRequested) - { - Logger.Trace($"{this.Options.ClientId}-(R)- Cancelled"); - break; - } - - readResult = await this.ReadAsync().ConfigureAwait(false); - - if (readResult.IsCanceled) - { - Logger.Trace($"{this.Options.ClientId}-(R)- Cancelled read result."); - break; - } - - if (readResult.IsCompleted) + try { - Logger.Trace($"{this.Options.ClientId}-(R)- ConnectionReader IsCompleted: end of the stream"); - if (this.ConnectState == ConnectState.Connected) + if (cancellationToken.IsCancellationRequested) { - // This is an unexpected exit and may be due to a network failure. - Logger.Trace($"{this.Options.ClientId}-(R)- ConnectionReader IsCompleted: this was unexpected"); - await this.HandleDisconnectionAsync(false).ConfigureAwait(false); + Logger.Trace($"{this.Options.ClientId}-(R)- Cancelled"); + break; } - Logger.Trace($"{this.Options.ClientId}-(R)- ConnectionReader Exiting...{this.ConnectState}"); - return true; - } + readResult = await this.ReadAsync().ConfigureAwait(false); - var buffer = readResult.Buffer; + if (readResult.IsCanceled) + { + Logger.Trace($"{this.Options.ClientId}-(R)- Cancelled read result."); + break; + } - while (buffer.Length > 0) - { - if (!PacketDecoder.TryDecode(buffer, out var decodedPacket, out var consumed)) + if (readResult.IsCompleted) { - if (decodedPacket is MalformedPacket) + Logger.Trace($"{this.Options.ClientId}-(R)- ConnectionReader IsCompleted: end of the stream"); + if (this.ConnectState == ConnectState.Connected) { - Logger.Warn($"Malformed packet received. Disconnecting."); - Logger.Debug($"{this.Options.ClientId}-(R)- Malformed packet received: {decodedPacket}"); - - var opts = new DisconnectOptions - { - ReasonCode = DisconnectReasonCode.MalformedPacket, - ReasonString = "Malformed Packet", - }; - return await this.DisconnectAsync(opts).ConfigureAwait(false); + // This is an unexpected exit and may be due to a network failure. + Logger.Trace($"{this.Options.ClientId}-(R)- ConnectionReader IsCompleted: this was unexpected"); + await this.HandleDisconnectionAsync(false).ConfigureAwait(false); } - // Not enough data in the buffer to decode a packet - // Advance the reader to the end of the consumed data - buffer = buffer.Slice(0, consumed); - this.Reader?.AdvanceTo(buffer.Start, readResult.Buffer.End); - Logger.Trace($"{this.Options.ClientId}-(R)- ConnectionReader: PacketDecoder.TryDecode returned false. Waiting for more data..."); - break; + Logger.Trace($"{this.Options.ClientId}-(R)- ConnectionReader Exiting...{this.ConnectState}"); + return true; } - // Advance the reader to indicate how much of the buffer has been consumed - buffer = buffer.Slice(consumed); - this.Reader?.AdvanceTo(buffer.Start); - - // We handle disconnects immediately - if (decodedPacket is DisconnectPacket disconnectPacket) - { - Logger.Warn($"-(R)- <-- Disconnect received: {disconnectPacket.DisconnectReasonCode} {disconnectPacket.Properties.ReasonString}"); - await this.HandleDisconnectionAsync(false).ConfigureAwait(false); - this.OnDisconnectReceivedEventLauncher(disconnectPacket); - break; - } + var buffer = readResult.Buffer; - if (decodedPacket is PublishPacket publishPacket) + while (buffer.Length > 0) { - // Limit the number of concurrent incoming QoS 1 and QoS 2 transactions - if (publishPacket.Message.QoS is QualityOfService.ExactlyOnceDelivery || - publishPacket.Message.QoS is QualityOfService.AtLeastOnceDelivery) + if (!PacketDecoder.TryDecode(buffer, out var decodedPacket, out var consumed)) { - while (true) + if (decodedPacket is MalformedPacket) { - if (this.IPubTransactionQueue.Count >= this.Options.ClientReceiveMaximum) + Logger.Warn($"Malformed packet received. Disconnecting."); + Logger.Debug($"{this.Options.ClientId}-(R)- Malformed packet received: {decodedPacket}"); + + var opts = new DisconnectOptions { - Logger.Trace($"-(R)- The Maximum number of concurrent publishes have been received from broker. Applying back-pressure and waiting for existing transactions to complete."); - await Task.Delay(500).ConfigureAwait(false); - } - else + ReasonCode = DisconnectReasonCode.MalformedPacket, + ReasonString = "Malformed Packet", + }; + return await this.DisconnectAsync(opts).ConfigureAwait(false); + } + + // Not enough data in the buffer to decode a packet + // Advance the reader to the end of the consumed data + buffer = buffer.Slice(0, consumed); + this.Reader?.AdvanceTo(buffer.Start, readResult.Buffer.End); + Logger.Trace($"{this.Options.ClientId}-(R)- ConnectionReader: PacketDecoder.TryDecode returned false. Waiting for more data..."); + break; + } + + // Advance the reader to indicate how much of the buffer has been consumed + buffer = buffer.Slice(consumed); + this.Reader?.AdvanceTo(buffer.Start); + + // We handle disconnects immediately + if (decodedPacket is DisconnectPacket disconnectPacket) + { + Logger.Warn($"-(R)- <-- Disconnect received: {disconnectPacket.DisconnectReasonCode} {disconnectPacket.Properties.ReasonString}"); + await this.HandleDisconnectionAsync(false).ConfigureAwait(false); + this.OnDisconnectReceivedEventLauncher(disconnectPacket); + break; + } + + if (decodedPacket is PublishPacket publishPacket) + { + // Limit the number of concurrent incoming QoS 1 and QoS 2 transactions + if (publishPacket.Message.QoS is QualityOfService.ExactlyOnceDelivery || + publishPacket.Message.QoS is QualityOfService.AtLeastOnceDelivery) + { + while (true) { - break; - } - } // while (true) + if (this.IPubTransactionQueue.Count >= this.Options.ClientReceiveMaximum) + { + Logger.Trace($"-(R)- The Maximum number of concurrent publishes have been received from broker. Applying back-pressure and waiting for existing transactions to complete."); + await Task.Delay(500).ConfigureAwait(false); + } + else + { + break; + } + } // while (true) + } } - } - Logger.Trace($"{this.Options.ClientId}-(R)- <-- Received {decodedPacket.GetType().Name} id: {decodedPacket.PacketIdentifier}. Adding to receivedQueue."); + Logger.Trace($"{this.Options.ClientId}-(R)- <-- Received {decodedPacket.GetType().Name} id: {decodedPacket.PacketIdentifier}. Adding to receivedQueue."); + + // Add the packet to the received queue for processing later by ReceivedPacketsHandlerAsync + this.ReceivedQueue.Enqueue(decodedPacket); + } // while (buffer.Length > 0 - // Add the packet to the received queue for processing later by ReceivedPacketsHandlerAsync - this.ReceivedQueue.Enqueue(decodedPacket); - } // while (buffer.Length > 0 + // FIXME + await Task.Yield(); + } + catch (TaskCanceledException) + { + Logger.Debug($"{this.Options.ClientId}-(R)- Cancelled"); + break; + } + catch (Exception ex) + { + if (!cancellationToken.IsCancellationRequested) + { + Logger.Error($"{this.Options.ClientId}-(R)- Exception: {ex}"); + throw; + } - await Task.Yield(); + break; + } } // while (this.ConnectState is ConnectState.Connecting or ConnectState.Connected) Logger.Trace($"{this.Options.ClientId}-(R)- ConnectionReader Exiting...{this.ConnectState}"); @@ -419,82 +483,100 @@ private Task ReceivedPacketsHandlerAsync(CancellationToken cancellationToken) => while (true) { - if (cancellationToken.IsCancellationRequested) - { - Logger.Trace($"{this.Options.ClientId}-(RPH)- Cancelled with {this.ReceivedQueue.Count} received packets remaining."); - break; - } - - var packet = await this.ReceivedQueue.DequeueAsync(cancellationToken).ConfigureAwait(false); - if (this.Options.ClientMaximumPacketSize != null) + try { - if (packet.PacketSize > this.Options.ClientMaximumPacketSize) + if (cancellationToken.IsCancellationRequested) { - Logger.Warn($"Received packet size {packet.PacketSize} exceeds maximum packet size {this.Options.ClientMaximumPacketSize}. Disconnecting."); - Logger.Debug($"{this.Options.ClientId}-(RPH)- Received packet size {packet.PacketSize} exceeds maximum packet size {this.Options.ClientMaximumPacketSize}. Disconnecting."); - - var opts = new DisconnectOptions - { - ReasonCode = DisconnectReasonCode.PacketTooLarge, - ReasonString = "Packet Too Large", - }; - await this.DisconnectAsync(opts).ConfigureAwait(false); - return; + Logger.Trace($"{this.Options.ClientId}-(RPH)- Cancelled with {this.ReceivedQueue.Count} received packets remaining."); + break; } - } - switch (packet) - { - case ConnAckPacket connAckPacket: - Logger.Trace($"{this.Options.ClientId}-(RPH)- <-- Received ConnAck id={connAckPacket.PacketIdentifier}"); - if (connAckPacket.ReasonCode == ConnAckReasonCode.Success && connAckPacket.Properties.ReceiveMaximum != null) + var packet = await this.ReceivedQueue.DequeueAsync(cancellationToken).ConfigureAwait(false); + if (this.Options.ClientMaximumPacketSize != null) + { + if (packet.PacketSize > this.Options.ClientMaximumPacketSize) { - Logger.Debug($"{this.Options.ClientId}-(RPH)- <-- Broker says limit concurrent incoming QoS 1 and QoS 2 publishes to {connAckPacket.Properties.ReceiveMaximum}."); + Logger.Warn($"Received packet size {packet.PacketSize} exceeds maximum packet size {this.Options.ClientMaximumPacketSize}. Disconnecting."); + Logger.Debug($"{this.Options.ClientId}-(RPH)- Received packet size {packet.PacketSize} exceeds maximum packet size {this.Options.ClientMaximumPacketSize}. Disconnecting."); - // Replace the BrokerReceiveSemaphore with a new one with the broker's ReceiveMaximum - this.BrokerReceiveSemaphore = new SemaphoreSlim((int)connAckPacket.Properties.ReceiveMaximum); + var opts = new DisconnectOptions + { + ReasonCode = DisconnectReasonCode.PacketTooLarge, + ReasonString = "Packet Too Large", + }; + await this.DisconnectAsync(opts).ConfigureAwait(false); + return; } + } - this.ConnectionProperties = connAckPacket.Properties; - this.OnConnAckReceivedEventLauncher(connAckPacket); - break; - case PingRespPacket pingRespPacket: - Logger.Trace($"{this.Options.ClientId}-(RPH)- <-- Received PingResp id={pingRespPacket.PacketIdentifier}"); - this.OnPingRespReceivedEventLauncher(pingRespPacket); - break; - case SubAckPacket subAckPacket: - Logger.Trace($"{this.Options.ClientId}-(RPH)- <-- Received SubAck id={subAckPacket.PacketIdentifier}"); - this.OnSubAckReceivedEventLauncher(subAckPacket); - break; - case UnsubAckPacket unsubAckPacket: - Logger.Trace($"{this.Options.ClientId}-(RPH)- <-- Received UnsubAck id={unsubAckPacket.PacketIdentifier}"); - this.OnUnsubAckReceivedEventLauncher(unsubAckPacket); - break; - case PublishPacket publishPacket: - this.HandleIncomingPublishPacket(publishPacket); - break; - case PubAckPacket pubAckPacket: - this.HandleIncomingPubAckPacket(pubAckPacket); - break; - case PubRecPacket pubRecPacket: - this.HandleIncomingPubRecPacket(pubRecPacket); - break; - case PubRelPacket pubRelPacket: - this.HandleIncomingPubRelPacket(pubRelPacket); - break; - case PubCompPacket pubCompPacket: - this.HandleIncomingPubCompPacket(pubCompPacket); - break; - case DisconnectPacket disconnectPacket: - // Disconnects are handled immediate and shouldn't be received here - // We leave this just as a sanity backup - Logger.Error($"{this.Options.ClientId}-(RPH)- Incorrectly received Disconnect packet in ReceivedPacketsHandlerAsync"); - throw new HiveMQttClientException("Received Disconnect packet in ReceivedPacketsHandlerAsync"); - default: - Logger.Trace($"{this.Options.ClientId}-(RPH)- <-- Received Unknown packet type. Will discard."); - Logger.Error($"Unrecognized packet received. Will discard. {packet}"); - break; - } // switch (packet) + switch (packet) + { + case ConnAckPacket connAckPacket: + Logger.Trace($"{this.Options.ClientId}-(RPH)- <-- Received ConnAck id={connAckPacket.PacketIdentifier}"); + if (connAckPacket.ReasonCode == ConnAckReasonCode.Success && connAckPacket.Properties.ReceiveMaximum != null) + { + Logger.Debug($"{this.Options.ClientId}-(RPH)- <-- Broker says limit concurrent incoming QoS 1 and QoS 2 publishes to {connAckPacket.Properties.ReceiveMaximum}."); + + // Replace the BrokerReceiveSemaphore with a new one with the broker's ReceiveMaximum + this.BrokerReceiveSemaphore = new SemaphoreSlim((int)connAckPacket.Properties.ReceiveMaximum); + } + + this.ConnectionProperties = connAckPacket.Properties; + this.OnConnAckReceivedEventLauncher(connAckPacket); + break; + case PingRespPacket pingRespPacket: + Logger.Trace($"{this.Options.ClientId}-(RPH)- <-- Received PingResp id={pingRespPacket.PacketIdentifier}"); + this.OnPingRespReceivedEventLauncher(pingRespPacket); + break; + case SubAckPacket subAckPacket: + Logger.Trace($"{this.Options.ClientId}-(RPH)- <-- Received SubAck id={subAckPacket.PacketIdentifier}"); + this.OnSubAckReceivedEventLauncher(subAckPacket); + break; + case UnsubAckPacket unsubAckPacket: + Logger.Trace($"{this.Options.ClientId}-(RPH)- <-- Received UnsubAck id={unsubAckPacket.PacketIdentifier}"); + this.OnUnsubAckReceivedEventLauncher(unsubAckPacket); + break; + case PublishPacket publishPacket: + this.HandleIncomingPublishPacket(publishPacket); + break; + case PubAckPacket pubAckPacket: + this.HandleIncomingPubAckPacket(pubAckPacket); + break; + case PubRecPacket pubRecPacket: + this.HandleIncomingPubRecPacket(pubRecPacket); + break; + case PubRelPacket pubRelPacket: + this.HandleIncomingPubRelPacket(pubRelPacket); + break; + case PubCompPacket pubCompPacket: + this.HandleIncomingPubCompPacket(pubCompPacket); + break; + case DisconnectPacket disconnectPacket: + // Disconnects are handled immediate and shouldn't be received here + // We leave this just as a sanity backup + Logger.Error($"{this.Options.ClientId}-(RPH)- Incorrectly received Disconnect packet in ReceivedPacketsHandlerAsync"); + throw new HiveMQttClientException("Received Disconnect packet in ReceivedPacketsHandlerAsync"); + default: + Logger.Trace($"{this.Options.ClientId}-(RPH)- <-- Received Unknown packet type. Will discard."); + Logger.Error($"Unrecognized packet received. Will discard. {packet}"); + break; + } // switch (packet) + } + catch (TaskCanceledException) + { + Logger.Debug($"{this.Options.ClientId}-(RPH)- Cancelled"); + break; + } + catch (Exception ex) + { + if (!cancellationToken.IsCancellationRequested) + { + Logger.Error($"{this.Options.ClientId}-(RPH)- Exception: {ex}"); + throw; + } + + break; + } } // while (true) Logger.Trace($"{this.Options.ClientId}-(RPH)- ReceivedPacketsHandler Exiting...{this.ConnectState}");