Skip to content

Commit

Permalink
Add Support for [After|Before]Disconnect Events (#89)
Browse files Browse the repository at this point in the history
  • Loading branch information
pglombardo authored Sep 26, 2023
1 parent f36e7f8 commit fe7d12f
Show file tree
Hide file tree
Showing 12 changed files with 356 additions and 23 deletions.
26 changes: 26 additions & 0 deletions Examples/Reconnect/.vscode/launch.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
{
"version": "0.2.0",
"configurations": [
{
// Use IntelliSense to find out which attributes exist for C# debugging
// Use hover for the description of the existing attributes
// For further information visit https://github.com/OmniSharp/omnisharp-vscode/blob/master/debugger-launchjson.md
"name": ".NET Core Launch (console)",
"type": "coreclr",
"request": "launch",
"preLaunchTask": "build",
// If you have changed target frameworks, make sure to update the program path.
"program": "${workspaceFolder}/bin/Debug/net7.0/Reconnect.dll",
"args": [],
"cwd": "${workspaceFolder}",
// For more information about the 'console' field, see https://aka.ms/VSCode-CS-LaunchJson-Console
"console": "integratedTerminal",
"stopAtEntry": false
},
{
"name": ".NET Core Attach",
"type": "coreclr",
"request": "attach"
}
]
}
41 changes: 41 additions & 0 deletions Examples/Reconnect/.vscode/tasks.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
{
"version": "2.0.0",
"tasks": [
{
"label": "build",
"command": "dotnet",
"type": "process",
"args": [
"build",
"${workspaceFolder}/Reconnect.csproj",
"/property:GenerateFullPaths=true",
"/consoleloggerparameters:NoSummary"
],
"problemMatcher": "$msCompile"
},
{
"label": "publish",
"command": "dotnet",
"type": "process",
"args": [
"publish",
"${workspaceFolder}/Reconnect.csproj",
"/property:GenerateFullPaths=true",
"/consoleloggerparameters:NoSummary"
],
"problemMatcher": "$msCompile"
},
{
"label": "watch",
"command": "dotnet",
"type": "process",
"args": [
"watch",
"run",
"--project",
"${workspaceFolder}/Reconnect.csproj"
],
"problemMatcher": "$msCompile"
}
]
}
139 changes: 139 additions & 0 deletions Examples/Reconnect/Program.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
using HiveMQtt.Client;
using HiveMQtt.Client.Options;
using System.Text.Json;

var topic = "hivemqtt/waiting/game";

var options = new HiveMQClientOptions();
options.Host = "127.0.0.1";
options.Port = 1883;

var client = new HiveMQClient(options);

// Add handlers
// Message handler
client.OnMessageReceived += (sender, args) =>
{
// Handle Message in args.PublishMessage
Console.WriteLine($"--> Message Received: {args.PublishMessage.PayloadAsString}");
};

// This handler is called when the client is disconnected
client.AfterDisconnect += async (sender, args) =>
{
var client = (HiveMQClient)sender;

Console.WriteLine($"AfterDisconnect Handler called with args.CleanDisconnect={args.CleanDisconnect}.");

// We've been disconnected
if (args.CleanDisconnect)
{
Console.WriteLine("--> AfterDisconnectEventArgs indicate a clean disconnect.");
Console.WriteLine("--> A clean disconnect was requested by either the client or the broker.");
}
else
{
Console.WriteLine("--> AfterDisconnectEventArgs indicate an unexpected disconnect.");
Console.WriteLine("--> This could be due to a network outage, broker outage, or other issue.");
Console.WriteLine("--> In this case we will attempt to reconnect periodically.");

// We could have been disconnected for any number of reasons: network outage, broker outage, etc.
// Here we loop with a backing off delay until we reconnect

// Start with a small delay and double it on each retry up to a maximum value
var delay = 5000;
var reconnectAttempts = 0;

while (true)
{
await Task.Delay(delay).ConfigureAwait(false);
reconnectAttempts++;

if (reconnectAttempts > 3)
{
Console.WriteLine("--> Maximum reconnect attempts exceeded. Exiting.");
break;
}

try
{
Console.WriteLine($"--> Attempting to reconnect to broker. This is attempt #{reconnectAttempts}.");
var connectResult = await client.ConnectAsync().ConfigureAwait(false);

if (connectResult.ReasonCode != HiveMQtt.MQTT5.ReasonCodes.ConnAckReasonCode.Success)
{
Console.WriteLine($"--> Failed to connect: {connectResult.ReasonString}");

// Double the delay with each failed retry to a maximum
delay = Math.Min(delay * 2, 30000);
Console.WriteLine($"--> Will delay for {delay / 1000} seconds until next try.");
}
else
{
Console.WriteLine("--> Reconnected successfully.");
break;
}
}
catch (Exception ex)
{
Console.WriteLine($"--> Failed to connect: {ex.Message}");

// Double the delay with each failed retry to a maximum
delay = Math.Min(delay * 2, 10000);
Console.WriteLine($"--> Will delay for {delay / 1000} seconds until next try.");
}
}
} // if (args.CleanDisconnect)

Console.WriteLine("--> Exiting AfterDisconnect handler.");
};

// Attempt to connect to the broker
try
{
var connectResult = await client.ConnectAsync().ConfigureAwait(false);
if (connectResult.ReasonCode != HiveMQtt.MQTT5.ReasonCodes.ConnAckReasonCode.Success)
{
throw new Exception($"Failed to connect to broker: {connectResult.ReasonString}");
}
}
catch (Exception ex)
{
Console.WriteLine($"Failed to connect to broker: {ex.Message}");
return;
}

// Subscribe to a topic
Console.WriteLine($"Subscribing to {topic}...");
await client.SubscribeAsync(topic).ConfigureAwait(false);

Console.WriteLine($"We are connected to the broker and will be waiting indefinitely for messages or a disconnect.");
Console.WriteLine($"--> Publish messages to {topic} and they will be printed.");
Console.WriteLine($"--> Shutdown/disconnect the broker and see the AfterDisconnect code execute.");

await Task.Delay(1000).ConfigureAwait(false);

// Publish a message
Console.WriteLine("Publishing a test message...");
var resultPublish = await client.PublishAsync(
topic,
JsonSerializer.Serialize(new
{
Command = "Hello",
})
).ConfigureAwait(false);


while (true)
{
await Task.Delay(2000).ConfigureAwait(false);
Console.WriteLine("Press q exit...");
if (Console.ReadKey().Key == ConsoleKey.Q)
{
Console.WriteLine("\n");
break;
}
}

Console.WriteLine("Disconnecting gracefully...");
await client.DisconnectAsync().ConfigureAwait(false);
20 changes: 20 additions & 0 deletions Examples/Reconnect/Reconnect.csproj
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>net7.0</TargetFramework>
<ImplicitUsings>enable</ImplicitUsings>
<Nullable>enable</Nullable>
</PropertyGroup>

<!-- Use the HiveMQtt project as a local source. Otherwise, fetch from nuget. -->
<PropertyGroup>
<RestoreSources>$(RestoreSources);../../Source/HiveMQtt/bin/Debug/;https://api.nuget.org/v3/index.json</RestoreSources>
</PropertyGroup>

<!-- Update the version to match -->
<ItemGroup>
<PackageReference Include="HiveMQtt" Version="0.4.0" />
</ItemGroup>

</Project>
16 changes: 12 additions & 4 deletions Source/HiveMQtt/Client/Events/AfterDisconnectEventArgs.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,20 @@ namespace HiveMQtt.Client.Events;

/// <summary>
/// Event arguments for the <see cref="HiveMQClient.AfterDisconnect"/> event.
/// <para>This event is called after a disconnect is sent to the broker.</para>
/// <para><see cref="AfterDisconnectEventArgs.DisconnectResult"/> contains the result of the disconnect operation.</para>
/// <para>
/// This event is called after a disconnect from the MQTT broker. This can be happen because
/// of a call to <see cref="HiveMQClient.DisconnectAsync"/> or because of a failure.
/// </para>
/// <para>
/// If the disconnect was caused by a call to <see cref="HiveMQClient.DisconnectAsync"/>, then
/// <see cref="AfterDisconnectEventArgs.CleanDisconnect"/> will be <c>true</c>. If the disconnect
/// was caused by a failure, then <see cref="AfterDisconnectEventArgs.CleanDisconnect"/> will be
/// <c>false</c>.
/// </para>
/// </summary>
public class AfterDisconnectEventArgs : EventArgs
{
public AfterDisconnectEventArgs(bool result) => this.DisconnectResult = result;
public AfterDisconnectEventArgs(bool clean = false) => this.CleanDisconnect = clean;

public bool DisconnectResult { get; set; }
public bool CleanDisconnect { get; set; }
}
4 changes: 3 additions & 1 deletion Source/HiveMQtt/Client/Events/BeforeDisconnectEventArgs.cs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,9 @@ namespace HiveMQtt.Client.Events;
/// </summary>
public class BeforeDisconnectEventArgs : EventArgs
{
public BeforeDisconnectEventArgs(HiveMQClientOptions options) => this.Options = options;
public BeforeDisconnectEventArgs()
{
}

public HiveMQClientOptions Options { get; set; }
}
3 changes: 3 additions & 0 deletions Source/HiveMQtt/Client/HiveMQClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,9 @@ public async Task<bool> DisconnectAsync(DisconnectOptions? options = null)

options ??= new DisconnectOptions();

// Fire the corresponding event
this.BeforeDisconnectEventLauncher();

var disconnectPacket = new DisconnectPacket
{
DisconnectReasonCode = options.ReasonCode,
Expand Down
13 changes: 7 additions & 6 deletions Source/HiveMQtt/Client/HiveMQClientEvents.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ namespace HiveMQtt.Client;

using System;
using System.Diagnostics;
using System.Security.Claims;
using HiveMQtt.Client.Events;
using HiveMQtt.Client.Options;
using HiveMQtt.Client.Results;
Expand Down Expand Up @@ -55,25 +56,25 @@ protected virtual void AfterConnectEventLauncher(ConnectResult results)
}

/// <summary>
/// Event that is fired before the client connects to the broker.
/// Event that is fired before the client disconnects from the broker.
/// </summary>
public event EventHandler<BeforeDisconnectEventArgs> BeforeDisconnect = new((client, e) => { });

protected virtual void BeforeDisconnectEventLauncher(HiveMQClientOptions options)
protected virtual void BeforeDisconnectEventLauncher()
{
var eventArgs = new BeforeDisconnectEventArgs(options);
var eventArgs = new BeforeDisconnectEventArgs();
Trace.WriteLine("BeforeDisconnectEventLauncher");
this.BeforeDisconnect?.Invoke(this, eventArgs);
}

/// <summary>
/// Event that is fired after the client connects to the broker.
/// Event that is fired after the client is disconnected from the broker.
/// </summary>
public event EventHandler<AfterDisconnectEventArgs> AfterDisconnect = new((client, e) => { });

protected virtual void AfterDisconnectEventLauncher(bool result)
protected virtual void AfterDisconnectEventLauncher(bool clean = false)
{
var eventArgs = new AfterDisconnectEventArgs(result);
var eventArgs = new AfterDisconnectEventArgs(clean);
Trace.WriteLine("AfterDisconnectEventLauncher");
this.AfterDisconnect?.Invoke(this, eventArgs);
}
Expand Down
17 changes: 15 additions & 2 deletions Source/HiveMQtt/Client/HiveMQClientSocket.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@ namespace HiveMQtt.Client;
using System.Net.Sockets;
using System.Security.Cryptography.X509Certificates;

using System.Threading;
using System.Threading.Tasks;

using HiveMQtt.Client.Exceptions;

/// <inheritdoc />
Expand All @@ -30,6 +33,9 @@ public partial class HiveMQClient : IDisposable, IHiveMQClient
private Stream? stream;
private PipeReader? reader;
private PipeWriter? writer;
private CancellationTokenSource cancellationSource;
private CancellationToken outFlowCancellationToken;
private CancellationToken infoFlowCancellationToken;

internal static bool ValidateServerCertificate(
object sender,
Expand Down Expand Up @@ -116,16 +122,23 @@ internal async Task<bool> ConnectSocketAsync()
this.reader = PipeReader.Create(this.stream);
this.writer = PipeWriter.Create(this.stream);

// Setup the cancellation tokens
this.cancellationSource = new CancellationTokenSource();
this.outFlowCancellationToken = this.cancellationSource.Token;
this.infoFlowCancellationToken = this.cancellationSource.Token;

// Start the traffic processors
_ = this.TrafficOutflowProcessorAsync();
_ = this.TrafficInflowProcessorAsync();
_ = this.TrafficOutflowProcessorAsync(this.outFlowCancellationToken);
_ = this.TrafficInflowProcessorAsync(this.infoFlowCancellationToken);

// Console.WriteLine($"Socket connected to {this.socket.RemoteEndPoint}");
return socketConnected;
}

internal bool CloseSocket(bool? shutdownPipeline = true)
{
this.cancellationSource.Cancel();

if (shutdownPipeline == true)
{
// Shutdown the pipeline
Expand Down
Loading

0 comments on commit fe7d12f

Please sign in to comment.