Skip to content

Commit

Permalink
Implement leaky bucket rate limiter and integrate into Matrix client
Browse files Browse the repository at this point in the history
  • Loading branch information
rschili committed Dec 23, 2024
1 parent e1f3663 commit 2718f53
Show file tree
Hide file tree
Showing 4 changed files with 139 additions and 0 deletions.
1 change: 1 addition & 0 deletions src/MatrixTextClient/Http/HttpClientHelper.cs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ public static class HttpClientHelper
{
public static async Task<TResponse> SendAsync<TResponse>(HttpClientParameters parameters, string path, HttpMethod? method = null, HttpContent? content = null)
{
//TODO: Rate limiter. System.Threading.RateLimiting considered, but we don't want timers and disposable objects. Task.Delay is fine.
var cancellationToken = parameters.CancellationToken;
ArgumentNullException.ThrowIfNull(parameters, nameof(parameters));
ArgumentException.ThrowIfNullOrEmpty(path, nameof(path));
Expand Down
68 changes: 68 additions & 0 deletions src/MatrixTextClient/Http/LeakyBucketRateLimiter.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
using System;
using System.Diagnostics;
using System.Threading.Tasks;

namespace MatrixTextClient.Http;
/// <summary>
/// Since matrix usually has a rate limit of 600 requests per hour, we need to implement a rate limiter to prevent
/// the client from making too many requests.
/// </summary>
/// <remarks>
/// This rate limiter is based on the leaky bucket metaphor. It allows for a burst of requests up to a certain
/// quota, and then limits the rate of requests to a certain number per hour. Imagine a leaky bucket with requests
/// leaking out at a constant rate. When it is empty, no more requests can be made until it is refilled.
/// </remarks>
public class LeakyBucketRateLimiter
{
private readonly int _capacity;
private readonly long _ticksPerHour;
private readonly long _ticksPerRestore; // ticks to restore one leak

private readonly int _maxLeaksPerHour;

private object _lock = new object();
private long _lastRestoreTicks;

private int _waterlevel;


public LeakyBucketRateLimiter(int capacity, int maxLeaksPerHour)
{
if(capacity > maxLeaksPerHour)
throw new ArgumentException("Capacity cannot be greater than maxLeaksPerHour.");
if(capacity < 0 || maxLeaksPerHour <= 5)
throw new ArgumentException("capacity must be greater than 0 and maxLeaksPerHour must be greater than 5.");

_capacity = capacity;
var ticksPerSecond = Stopwatch.Frequency; // Use stopwatch, it's more accurate than DateTime
_ticksPerHour = ticksPerSecond * 60 * 60; // Avoid using TimeSpan, its tick frequency may be different
var currentTicks = Stopwatch.GetTimestamp();
_lastRestoreTicks = currentTicks;
_ticksPerRestore = _ticksPerHour / (maxLeaksPerHour - capacity); // subtract capacity to make sure we never exceed maxRequestsPerHour even if the bucket is full
_waterlevel = capacity;
_maxLeaksPerHour = maxLeaksPerHour;
}

public bool Leak()
{
lock (_lock)
{
var currentTicks = Stopwatch.GetTimestamp();
var elapsedTicks = currentTicks - _lastRestoreTicks;
var restored = (int)(elapsedTicks / _ticksPerRestore);
if(restored > 0)
{
_lastRestoreTicks += restored * _ticksPerRestore;
_waterlevel = Math.Min(_waterlevel + restored, _capacity);
}

if(_waterlevel > 0)
{
_waterlevel--;
return true;
}

return false;
}
}
}
61 changes: 61 additions & 0 deletions src/MatrixTextClient/MatrixClient.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Abstractions;
using MatrixTextClient.Models;
using MatrixTextClient.Http;

namespace MatrixTextClient;

/// <summary>
/// The low level class for interacting with the Matrix server.
/// </summary>
public sealed class MatrixClient
{
public MatrixClientCore Core { get; private init; }
public ILogger Logger => Core.Logger;
public MatrixId User => Core.User;

private MatrixClient(MatrixClientCore core)
{
Core = core ?? throw new ArgumentNullException(nameof(core));
}

public static async Task<MatrixClient> ConnectAsync(string userId, string password, string deviceId, IHttpClientFactory httpClientFactory, CancellationToken cancellationToken, ILogger? logger = null)
{
var core = await MatrixClientCore.ConnectAsync(userId, password, deviceId, httpClientFactory, cancellationToken, logger).ConfigureAwait(false);
var client = new MatrixClient(core);
return client;
}

public async Task SyncAsync(int? millisecondsBetweenRequests = 1000, MatrixClientCore.SyncReceivedHandler? handler = null)
{
if (handler == null)
handler = DefaultSyncReceivedHandler;

var request = new SyncParameters
{
FullState = false,
SetPresence = Presence.Online,
Timeout = 60000
};

while (!Core.HttpClientParameters.CancellationToken.IsCancellationRequested)
{
var response = await MatrixHelper.GetSyncAsync(Core.HttpClientParameters, request).ConfigureAwait(false);
if (response != null)
{
await handler(Core, response);
request.Since = response.NextBatch;
}
//Throttle the requests
if (millisecondsBetweenRequests != null)
await Task.Delay(millisecondsBetweenRequests.Value, Core.HttpClientParameters.CancellationToken).ConfigureAwait(false);
}
}

public static Task DefaultSyncReceivedHandler(MatrixClientCore client, SyncResponse response)
{
client.Logger.LogInformation("Received sync response");
return Task.CompletedTask;
}
}

9 changes: 9 additions & 0 deletions src/MatrixTextClient/Models/Responses.cs
Original file line number Diff line number Diff line change
Expand Up @@ -83,10 +83,19 @@ public class Capabilities
[JsonPropertyName("m.room_versions")]
public RoomVersionsCapability? RoomVersions { get; set; }

[JsonPropertyName("com.example.custom.ratelimit")]
public RateLimit? RateLimit { get; set; }

[JsonExtensionData]
public Dictionary<string, JsonElement>? AdditionalProps { get; set; }
}

public class RateLimit
{
[JsonPropertyName("max_requests_per_hour")]
public required int MaxRequestsPerHour { get; set; }
}

public class BooleanCapability
{
[JsonPropertyName("enabled")]
Expand Down

0 comments on commit 2718f53

Please sign in to comment.