Skip to content

Commit

Permalink
CustomCommandManager + CustomCommandManagerSession to use auto-expand…
Browse files Browse the repository at this point in the history
…able maps (#848)

* wip

* wip

* fixes

* wip

* wip

* Comments + tests

* wip

* fixes

* fix

* format

* Adding a non-concurrent ExpandableMap

* small fix

* Switching to SingleWriterMultiReaderLock

* Some thread-safety related fixes

* Added some async tests for command registration

* small bugfixes

* format
  • Loading branch information
TalZaccai authored Dec 13, 2024
1 parent 0d4c744 commit f2c2261
Show file tree
Hide file tree
Showing 26 changed files with 919 additions and 278 deletions.
262 changes: 99 additions & 163 deletions libs/server/Custom/CustomCommandManager.cs

Large diffs are not rendered by default.

77 changes: 53 additions & 24 deletions libs/server/Custom/CustomCommandManagerSession.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.

using System.Diagnostics;
using Garnet.common;

namespace Garnet.server
Expand All @@ -13,53 +14,81 @@ internal sealed class CustomCommandManagerSession
readonly CustomCommandManager customCommandManager;

// These session specific arrays are indexed by the same ID as the arrays in CustomCommandManager
readonly (CustomTransactionProcedure, int)[] sessionTransactionProcMap;
readonly CustomProcedure[] sessionCustomProcMap;

ExpandableMap<CustomTransactionProcedureWithArity> sessionTransactionProcMap;
ExpandableMap<CustomProcedure> sessionCustomProcMap;

public CustomCommandManagerSession(CustomCommandManager customCommandManager)
{
this.customCommandManager = customCommandManager;
sessionTransactionProcMap = new (CustomTransactionProcedure, int)[CustomCommandManager.MaxRegistrations];
sessionCustomProcMap = new CustomProcedure[CustomCommandManager.MaxRegistrations];
sessionTransactionProcMap = new ExpandableMap<CustomTransactionProcedureWithArity>(CustomCommandManager.MinMapSize, 0, byte.MaxValue);
sessionCustomProcMap = new ExpandableMap<CustomProcedure>(CustomCommandManager.MinMapSize, 0, byte.MaxValue);
}

public CustomProcedure GetCustomProcedure(int id, RespServerSession respServerSession)
{
if (sessionCustomProcMap[id] == null)
if (!sessionCustomProcMap.TryGetValue(id, out var customProc))
{
var entry = customCommandManager.customProcedureMap[id] ?? throw new GarnetException($"Custom procedure {id} not found");
sessionCustomProcMap[id] = entry.CustomProcedureFactory();
sessionCustomProcMap[id].respServerSession = respServerSession;
if (!customCommandManager.TryGetCustomProcedure(id, out var entry))
throw new GarnetException($"Custom procedure {id} not found");

customProc = entry.CustomProcedureFactory();
customProc.respServerSession = respServerSession;
var setSuccessful = sessionCustomProcMap.TrySetValue(id, ref customProc);
Debug.Assert(setSuccessful);
}

return sessionCustomProcMap[id];
return customProc;
}

public (CustomTransactionProcedure, int) GetCustomTransactionProcedure(int id, RespServerSession respServerSession, TransactionManager txnManager, ScratchBufferManager scratchBufferManager)
public CustomTransactionProcedure GetCustomTransactionProcedure(int id, RespServerSession respServerSession, TransactionManager txnManager, ScratchBufferManager scratchBufferManager, out int arity)
{
if (sessionTransactionProcMap[id].Item1 == null)
if (sessionTransactionProcMap.Exists(id))
{
var entry = customCommandManager.transactionProcMap[id] ?? throw new GarnetException($"Transaction procedure {id} not found");
_ = customCommandManager.CustomCommandsInfo.TryGetValue(entry.NameStr, out var cmdInfo);
return GetCustomTransactionProcedure(entry, respServerSession, txnManager, scratchBufferManager, cmdInfo?.Arity ?? 0);
ref var customTranProc = ref sessionTransactionProcMap.GetValueByRef(id);
if (customTranProc.Procedure != null)
{
arity = customTranProc.Arity;
return customTranProc.Procedure;
}
}
return sessionTransactionProcMap[id];

if (!customCommandManager.TryGetCustomTransactionProcedure(id, out var entry))
throw new GarnetException($"Transaction procedure {id} not found");
_ = customCommandManager.customCommandsInfo.TryGetValue(entry.NameStr, out var cmdInfo);
arity = cmdInfo?.Arity ?? 0;
return GetCustomTransactionProcedureAndSetArity(entry, respServerSession, txnManager, scratchBufferManager, cmdInfo?.Arity ?? 0);
}

public (CustomTransactionProcedure, int) GetCustomTransactionProcedure(CustomTransaction entry, RespServerSession respServerSession, TransactionManager txnManager, ScratchBufferManager scratchBufferManager, int arity)
private CustomTransactionProcedure GetCustomTransactionProcedureAndSetArity(CustomTransaction entry, RespServerSession respServerSession, TransactionManager txnManager, ScratchBufferManager scratchBufferManager, int arity)
{
int id = entry.id;
if (sessionTransactionProcMap[id].Item1 == null)

var customTranProc = new CustomTransactionProcedureWithArity(entry.proc(), arity)
{
sessionTransactionProcMap[id].Item1 = entry.proc();
sessionTransactionProcMap[id].Item2 = arity;
Procedure =
{
txnManager = txnManager,
scratchBufferManager = scratchBufferManager,
respServerSession = respServerSession
}
};
var setSuccessful = sessionTransactionProcMap.TrySetValue(id, ref customTranProc);
Debug.Assert(setSuccessful);

return customTranProc.Procedure;
}

private struct CustomTransactionProcedureWithArity
{
public CustomTransactionProcedure Procedure { get; }

sessionTransactionProcMap[id].Item1.txnManager = txnManager;
sessionTransactionProcMap[id].Item1.scratchBufferManager = scratchBufferManager;
sessionTransactionProcMap[id].Item1.respServerSession = respServerSession;
public int Arity { get; }

public CustomTransactionProcedureWithArity(CustomTransactionProcedure procedure, int arity)
{
this.Procedure = procedure;
this.Arity = arity;
}
return sessionTransactionProcMap[id];
}
}
}
4 changes: 2 additions & 2 deletions libs/server/Custom/CustomCommandRegistration.cs
Original file line number Diff line number Diff line change
Expand Up @@ -233,9 +233,9 @@ public override void Register(CustomCommandManager customCommandManager)
RegisterArgs.Name,
RegisterArgs.CommandType,
factory,
RegisterArgs.ObjectCommand,
RegisterArgs.CommandInfo,
RegisterArgs.CommandDocs);
RegisterArgs.CommandDocs,
RegisterArgs.ObjectCommand);
}
}

Expand Down
9 changes: 4 additions & 5 deletions libs/server/Custom/CustomObjectCommand.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,11 @@

namespace Garnet.server
{
public class CustomObjectCommand
public class CustomObjectCommand : ICustomCommand
{
public byte[] Name { get; }

public readonly string NameStr;
public readonly byte[] name;
public readonly byte id;
public readonly byte subid;
public readonly CommandType type;
Expand All @@ -16,14 +17,12 @@ public class CustomObjectCommand
internal CustomObjectCommand(string name, byte id, byte subid, CommandType type, CustomObjectFactory factory, CustomObjectFunctions functions = null)
{
NameStr = name.ToUpperInvariant();
this.name = System.Text.Encoding.ASCII.GetBytes(NameStr);
this.Name = System.Text.Encoding.ASCII.GetBytes(NameStr);
this.id = id;
this.subid = subid;
this.type = type;
this.factory = factory;
this.functions = functions;
}

internal GarnetObjectType GetObjectType() => (GarnetObjectType)(id + CustomCommandManager.TypeIdStartOffset);
}
}
8 changes: 5 additions & 3 deletions libs/server/Custom/CustomObjectCommandWrapper.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,18 @@ namespace Garnet.server
/// </summary>
class CustomObjectCommandWrapper
{
static readonly int MinMapSize = 8;
static readonly byte MaxSubId = 31; // RespInputHeader uses the 3 MSBs of SubId, so SubId must fit in the 5 LSBs

public readonly byte id;
public readonly CustomObjectFactory factory;
public int CommandId = 0;
public readonly CustomObjectCommand[] commandMap;
public ConcurrentExpandableMap<CustomObjectCommand> commandMap;

public CustomObjectCommandWrapper(byte id, CustomObjectFactory functions)
{
this.id = id;
this.factory = functions;
this.commandMap = new CustomObjectCommand[byte.MaxValue];
this.commandMap = new ConcurrentExpandableMap<CustomObjectCommand>(MinMapSize, 0, MaxSubId);
}
}
}
5 changes: 3 additions & 2 deletions libs/server/Custom/CustomProcedureWrapper.cs
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,11 @@ public abstract bool Execute<TGarnetApi>(TGarnetApi garnetApi, ref CustomProcedu
where TGarnetApi : IGarnetApi;
}

class CustomProcedureWrapper
class CustomProcedureWrapper : ICustomCommand
{
public byte[] Name { get; }

public readonly string NameStr;
public readonly byte[] Name;
public readonly byte Id;
public readonly Func<CustomProcedure> CustomProcedureFactory;

Expand Down
9 changes: 4 additions & 5 deletions libs/server/Custom/CustomRawStringCommand.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,11 @@

namespace Garnet.server
{
public class CustomRawStringCommand
public class CustomRawStringCommand : ICustomCommand
{
public byte[] Name { get; }

public readonly string NameStr;
public readonly byte[] name;
public readonly ushort id;
public readonly CommandType type;
public readonly CustomRawStringFunctions functions;
Expand All @@ -15,13 +16,11 @@ public class CustomRawStringCommand
internal CustomRawStringCommand(string name, ushort id, CommandType type, CustomRawStringFunctions functions, long expirationTicks)
{
NameStr = name.ToUpperInvariant();
this.name = System.Text.Encoding.ASCII.GetBytes(NameStr);
this.Name = System.Text.Encoding.ASCII.GetBytes(NameStr);
this.id = id;
this.type = type;
this.functions = functions;
this.expirationTicks = expirationTicks;
}

internal RespCommand GetRespCommand() => (RespCommand)(id + CustomCommandManager.StartOffset);
}
}
6 changes: 3 additions & 3 deletions libs/server/Custom/CustomRespCommands.cs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ private bool TryTransactionProc(byte id, CustomTransactionProcedure proc, int st
public bool RunTransactionProc(byte id, ref CustomProcedureInput procInput, ref MemoryResult<byte> output)
{
var proc = customCommandManagerSession
.GetCustomTransactionProcedure(id, this, txnManager, scratchBufferManager).Item1;
.GetCustomTransactionProcedure(id, this, txnManager, scratchBufferManager, out _);
return txnManager.RunTransactionProc(id, ref procInput, proc, ref output);

}
Expand Down Expand Up @@ -226,7 +226,7 @@ public bool InvokeCustomRawStringCommand<TGarnetApi>(ref TGarnetApi storageApi,
var sbKey = key.SpanByte;
var inputArg = customCommand.expirationTicks > 0 ? DateTimeOffset.UtcNow.Ticks + customCommand.expirationTicks : customCommand.expirationTicks;
customCommandParseState.InitializeWithArguments(args);
var rawStringInput = new RawStringInput(customCommand.GetRespCommand(), ref customCommandParseState, arg1: inputArg);
var rawStringInput = new RawStringInput((RespCommand)customCommand.id, ref customCommandParseState, arg1: inputArg);

var _output = new SpanByteAndMemory(null);
GarnetStatus status;
Expand Down Expand Up @@ -290,7 +290,7 @@ public bool InvokeCustomObjectCommand<TGarnetApi>(ref TGarnetApi storageApi, Cus
var keyBytes = key.ToArray();

// Prepare input
var header = new RespInputHeader(customObjCommand.GetObjectType()) { SubId = customObjCommand.subid };
var header = new RespInputHeader((GarnetObjectType)customObjCommand.id) { SubId = customObjCommand.subid };
customCommandParseState.InitializeWithArguments(args);
var input = new ObjectInput(header, ref customCommandParseState);

Expand Down
7 changes: 4 additions & 3 deletions libs/server/Custom/CustomTransaction.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,11 @@

namespace Garnet.server
{
class CustomTransaction
class CustomTransaction : ICustomCommand
{
public byte[] Name { get; }

public readonly string NameStr;
public readonly byte[] name;
public readonly byte id;
public readonly Func<CustomTransactionProcedure> proc;

Expand All @@ -18,7 +19,7 @@ internal CustomTransaction(string name, byte id, Func<CustomTransactionProcedure
if (name == null)
throw new GarnetException("CustomTransaction name is null");
NameStr = name.ToUpperInvariant();
this.name = System.Text.Encoding.ASCII.GetBytes(NameStr);
this.Name = System.Text.Encoding.ASCII.GetBytes(NameStr);
this.id = id;
this.proc = proc ?? throw new GarnetException("CustomTransactionProcedure is null");
}
Expand Down
Loading

0 comments on commit f2c2261

Please sign in to comment.