Skip to content

Commit

Permalink
Blocking command support (BLPOP/BRPOP & BLMOVE) (#356)
Browse files Browse the repository at this point in the history
* wip - adding brpop

* wip

* wip

* small fix

* wip - adding updated broker

* fixes

* more fixes

* Removing unnecessary code

* Adding BLPOP

* fixes

* Adding comments

* formatting

* wip

* wip

* wip

* Added command info + website docs for BRPOP / BLPOP

* refactoring CollectionItemBroker

* Added cancellation token for observer

* Added timeout to tests

* format

* Limiting batch size in BasicListBlockingPopTest

* Fix

* Updating RespCommandsInfo.json

* Added BLMOVE

* format

* Fixing tests + bugfix in CommandInfoUpdater

* Added docs for collection broker

* wip

* wip

* format

* wip

* Switching CollectionItemObserver back to class

---------

Co-authored-by: Badrish Chandramouli <[email protected]>
  • Loading branch information
TalZaccai and badrishc authored Jun 21, 2024
1 parent b503e3a commit b322f29
Show file tree
Hide file tree
Showing 25 changed files with 1,771 additions and 294 deletions.
5 changes: 4 additions & 1 deletion libs/host/GarnetServer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ public class GarnetServer : IDisposable
private IDevice aofDevice;
private TsavoriteLog appendOnlyFile;
private SubscribeBroker<SpanByte, SpanByte, IKeySerializer<SpanByte>> broker;
private CollectionItemBroker itemBroker;
private LogSettings logSettings, objLogSettings;
private INamedDeviceFactory logFactory;
private MemoryLogger initLogger;
Expand Down Expand Up @@ -234,6 +235,8 @@ private void InitializeServer()
revivificationSettings: objRevivSettings, logger: this.loggerFactory?.CreateLogger("TsavoriteKV [obj]"));
if (objTotalMemorySize > 0)
objectStoreSizeTracker = new CacheSizeTracker(objectStore, objLogSettings, objTotalMemorySize, this.loggerFactory);

itemBroker = new CollectionItemBroker();
}

if (!opts.DisablePubSub)
Expand Down Expand Up @@ -279,7 +282,7 @@ private void InitializeServer()
storeWrapper = new StoreWrapper(version, redisProtocolVersion, server, store, objectStore, objectStoreSizeTracker, customCommandManager, appendOnlyFile, opts, clusterFactory: clusterFactory, loggerFactory: loggerFactory);

// Create session provider for Garnet
Provider = new GarnetProvider(storeWrapper, broker);
Provider = new GarnetProvider(storeWrapper, broker, itemBroker);

// Create user facing API endpoints
Metrics = new MetricsApi(Provider);
Expand Down
2 changes: 1 addition & 1 deletion libs/server/AOF/AofProcessor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ public AofProcessor(
accessControlList: storeWrapper.accessControlList,
loggerFactory: storeWrapper.loggerFactory);

this.respServerSession = new RespServerSession(null, replayAofStoreWrapper, null);
this.respServerSession = new RespServerSession(null, replayAofStoreWrapper, null, null);

var session = respServerSession.storageSession.basicContext.Session;
basicContext = session.BasicContext;
Expand Down
1 change: 1 addition & 0 deletions libs/server/API/IGarnetApi.cs
Original file line number Diff line number Diff line change
Expand Up @@ -712,6 +712,7 @@ public interface IGarnetApi : IGarnetReadApi, IGarnetAdvancedApi
/// <param name="count"></param>
/// <returns></returns>
GarnetStatus ListRightPop(ArgSlice key, int count, out ArgSlice[] elements);

#endregion

/// <summary>
Expand Down
Loading

0 comments on commit b322f29

Please sign in to comment.