diff --git a/src/Equinox.MemoryStore/MemoryStore.fs b/src/Equinox.MemoryStore/MemoryStore.fs index d108c205d..855a9bcca 100644 --- a/src/Equinox.MemoryStore/MemoryStore.fs +++ b/src/Equinox.MemoryStore/MemoryStore.fs @@ -37,7 +37,7 @@ type VolatileStore<'Format>() = events /// Attempts a synchronization operation - yields conflicting value if expectedCount does not match - member _.TrySync(streamName, _categoryName, _streamId, expectedCount, events): struct (bool * FsCodec.ITimelineEvent<'Format>[]) = + member _.TrySync(streamName, expectedCount, events): struct (bool * FsCodec.ITimelineEvent<'Format>[]) = // Where attempts overlap on the same stream, there's a race to raise the Committed event for each 'commit' // If we don't serialize the publishing of the events, its possible for handlers to observe the Events out of order // NOTE while a Channels based impl might offer better throughput at load, in practical terms serializing all Committed event notifications @@ -47,6 +47,8 @@ type VolatileStore<'Format>() = let struct (succeeded, _) as outcome = trySync streamName expectedCount events if succeeded then committed.Trigger(FsCodec.StreamName.Internal.trust streamName, events) outcome + [] + member x.TrySync(streamName, _categoryName, _streamId, expectedCount, events): struct (bool * FsCodec.ITimelineEvent<'Format>[]) = x.TrySync(streamName, expectedCount, events) type private StoreCategory<'event, 'state, 'req, 'Format>(store: VolatileStore<'Format>, codec, fold, initial) = let fold s xs = (fold : System.Func<'state, 'event[], 'state>).Invoke(s, xs) @@ -60,7 +62,7 @@ type private StoreCategory<'event, 'state, 'req, 'Format>(store: VolatileStore<' | xs -> return res xs.Length initial (decode xs) } member _.Sync(_log, categoryName, streamId, streamName, req, token, state, events, _ct) = task { let encoded = events |> Array.mapi (fun i e -> FsCodec.Core.TimelineEvent.Create(token.version + int64 i, codec.Encode(req, e))) - match store.TrySync(streamName, categoryName, streamId, int token.version, encoded) with + match store.TrySync(streamName, int token.version, encoded) with | true, streamEvents' -> return SyncResult.Written (res streamEvents'.Length state events) | false, conflictingEvents ->