Skip to content

Commit

Permalink
refactor!: Rename AsyncLazy/CacheCell -> LazyTask/TaskCell (#433)
Browse files Browse the repository at this point in the history
  • Loading branch information
bartelink authored Nov 25, 2023
1 parent 6ffcbac commit 30092d5
Show file tree
Hide file tree
Showing 12 changed files with 32 additions and 31 deletions.
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,8 @@ The `Unreleased` section name is replaced by the expected version of next releas
- `Equinox.Decider`: Replace `maxAttempts` with a default policy and an optional argument on `Transact*` APIs [#337](https://github.com/jet/equinox/pull/337)
- `Equinox.Decider`: rename `Decider.TransactAsync`, `Decider.TransactExAsync` to `Transact` [#314](https://github.com/jet/equinox/pull/314)
- `Equinox.Core.AsyncBatchingGate`: renamed to `Batching.Batcher` [#390](https://github.com/jet/equinox/pull/390)
- `Equinox.Core`: Now a free-standing library that a) does not depend on `Equinox` b) is not depended on by the Stores (though `CosmosStore` inlines `AsyncCacheCell`) [#420](https://github.com/jet/equinox/pull/420)
- `Equinox.Core.AsyncCacheCell`: renamed to `TaskCell` [#433](https://github.com/jet/equinox/pull/433)
- `Equinox.Core`: Now a free-standing library that a) does not depend on `Equinox` b) is not depended on by the Stores (though `CosmosStore` inlines `TaskCell`) [#420](https://github.com/jet/equinox/pull/420)
- Stores: Change Event Body types, requiring `FsCodec` v `3.0.0`, with [`EventBody` types switching from `byte[]` to `ReadOnlyMemory<byte>` and/or `JsonElement` see FsCodec#75](https://github.com/jet/FsCodec/pull/75) [#323](https://github.com/jet/equinox/pull/323)
- Stores: `*Category.Resolve`: Replace `Resolve(sn, ?ResolveOption, ?requestContext)` with `?load = LoadOption` parameter on all `Transact` and `Query` methods, and `Decider.forStream`/`Decider.forRequest` to convey request context [#308](https://github.com/jet/equinox/pull/308)
- Stores: `*Category` ctor: Add mandatory `name` argument, and `Name` property [#410](https://github.com/jet/equinox/pull/410)
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ The components within this repository are delivered as multi-targeted Nuget pack

## Data Store libraries

- `Equinox.Core` [![NuGet](https://img.shields.io/nuget/v/Equinox.Core.svg)](https://www.nuget.org/packages/Equinox.Core/): Hosts generic utility types frequently useful alongside Equinox: [`AsyncCacheCell`](https://github.com/jet/equinox/blob/master/src/Equinox.Core/AsyncCacheCell.fs#L36), [`Batcher`, `BatcherCache`, `BatcherDictionary`](https://github.com/jet/equinox/blob/master/src/Equinox.Core/Batching.fs#L44). ([depends](https://www.fuget.org/packages/Equinox.Core) on `System.Runtime.Caching`)
- `Equinox.Core` [![NuGet](https://img.shields.io/nuget/v/Equinox.Core.svg)](https://www.nuget.org/packages/Equinox.Core/): Hosts generic utility types frequently useful alongside Equinox: [`TaskCell`](https://github.com/jet/equinox/blob/master/src/Equinox.Core/TaskCell.fs#L36), [`Batcher`, `BatcherCache`, `BatcherDictionary`](https://github.com/jet/equinox/blob/master/src/Equinox.Core/Batching.fs#L44). ([depends](https://www.fuget.org/packages/Equinox.Core) on `System.Runtime.Caching`)
- `Equinox.MemoryStore` [![MemoryStore NuGet](https://img.shields.io/nuget/v/Equinox.MemoryStore.svg)](https://www.nuget.org/packages/Equinox.MemoryStore/): In-memory store for integration testing/performance base-lining/providing out-of-the-box zero dependency storage for examples. ([depends](https://www.fuget.org/packages/Equinox.MemoryStore) on `Equinox`)
- `Equinox.CosmosStore` [![CosmosStore NuGet](https://img.shields.io/nuget/v/Equinox.CosmosStore.svg)](https://www.nuget.org/packages/Equinox.CosmosStore/): Azure CosmosDB Adapter with integrated 'unfolds' feature, facilitating optimal read performance in terms of latency and RU costs, instrumented to meet Jet's production monitoring requirements. ([depends](https://www.fuget.org/packages/Equinox.CosmosStore) on `Equinox`, `Equinox`, `Microsoft.Azure.Cosmos` >= `3.27`, `System.Text.Json`, `FSharp.Control.TaskSeq`)
- `Equinox.CosmosStore.Prometheus` [![CosmosStore.Prometheus NuGet](https://img.shields.io/nuget/v/Equinox.CosmosStore.Prometheus.svg)](https://www.nuget.org/packages/Equinox.CosmosStore.Prometheus/): Integration package providing a `Serilog.Core.ILogEventSink` that extracts detailed metrics information attached to the `LogEvent`s and feeds them to the `prometheus-net`'s `Prometheus.Metrics` static instance. ([depends](https://www.fuget.org/packages/Equinox.CosmosStore.Prometheus) on `Equinox.CosmosStore`, `prometheus-net >= 3.6.0`)
Expand Down
4 changes: 2 additions & 2 deletions src/Equinox.Core/Equinox.Core.fsproj
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@

<ItemGroup>
<Compile Include="..\Equinox\Infrastructure.fs" />
<Compile Include="..\Equinox\AsyncLazy.fs" />
<Compile Include="AsyncCacheCell.fs" />
<Compile Include="..\Equinox\LazyTask.fs" />
<Compile Include="TaskCell.fs" />
<Compile Include="Retry.fs" />
<Compile Include="Batching.fs" />
</ItemGroup>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,10 @@ type
// PUBLIC in Equinox.Core (it can also be used independent of Equinox)
internal
#endif
AsyncCacheCell<'T>(startWorkflow : System.Func<CancellationToken, Task<'T>>, [<O; D null>]?isExpired: System.Func<'T, bool>) =
TaskCell<'T>(startWorkflow : System.Func<CancellationToken, Task<'T>>, [<O; D null>]?isExpired: System.Func<'T, bool>) =

let isValid = match isExpired with Some f -> not << f.Invoke | None -> fun _ -> true
let mutable cell = AsyncLazy<'T>.Empty
let mutable cell = LazyTask<'T>.Empty

/// Synchronously check the value remains valid (to enable short-circuiting an Await step where value not required)
member _.IsValid() =
Expand All @@ -26,7 +26,7 @@ type
| ValueSome res when isValid res -> return res
| _ ->
// Prepare a new instance, with cancellation under our control (it won't start until the first Await on the LazyTask triggers it though)
let newInstance = AsyncLazy<'T>(fun () -> startWorkflow.Invoke ct)
let newInstance = LazyTask<'T>(fun () -> startWorkflow.Invoke ct)
// If there are concurrent executions, the first through the gate wins; everybody else awaits the instance the winner wrote
let _ = Interlocked.CompareExchange(&cell, newInstance, current)
return! cell.Await() }
Expand Down
2 changes: 1 addition & 1 deletion src/Equinox.CosmosStore/CosmosStore.fs
Original file line number Diff line number Diff line change
Expand Up @@ -577,7 +577,7 @@ module Initialization =

/// Per Container, we need to ensure the stored procedure has been created exactly once (per process lifetime)
type internal ContainerInitializerGuard(container: Container, ?initContainer: Container -> CancellationToken -> Task<unit>) =
let initGuard = initContainer |> Option.map (fun init -> AsyncCacheCell<unit>(init container))
let initGuard = initContainer |> Option.map (fun init -> TaskCell<unit>(init container))
member val Container = container
/// Coordinates max of one in flight call to the init logic, retrying on next request if it fails. Calls after it has succeeded noop
member _.Initialize(ct): System.Threading.Tasks.ValueTask =
Expand Down
4 changes: 2 additions & 2 deletions src/Equinox.CosmosStore/Equinox.CosmosStore.fsproj
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@

<ItemGroup>
<Compile Include="..\Equinox\Infrastructure.fs" />
<Compile Include="..\Equinox\AsyncLazy.fs" />
<Compile Include="..\Equinox.Core\AsyncCacheCell.fs" />
<Compile Include="..\Equinox\LazyTask.fs" />
<Compile Include="..\Equinox.Core\TaskCell.fs" />
<Compile Include="..\Equinox.Core\Internal.fs" />
<Compile Include="CosmosStoreSerialization.fs" />
<Compile Include="CosmosStore.fs" />
Expand Down
6 changes: 3 additions & 3 deletions src/Equinox/Cache.fs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ type private CacheEntry<'state>(initialToken: StreamToken, initialState: 'state,
let tryGet () =
if verifiedTimestamp = 0L then ValueNone // 0 => Null cache entry
else ValueSome (struct (currentToken, currentState))
let mutable cell = AsyncLazy<struct(int64 * (struct (StreamToken * 'state)))>.Empty
let mutable cell = LazyTask<struct(int64 * (struct (StreamToken * 'state)))>.Empty
static member CreateEmpty() =
new CacheEntry<'state>(Unchecked.defaultof<StreamToken>, Unchecked.defaultof<'state>, 0L) // 0 => Null cache entry signifies token and state both invalid
/// Attempt to retrieve the cached state, and associated token (ValueNone if this is a placeholder entry that's yet to complete its first Load operation)
Expand All @@ -31,7 +31,7 @@ type private CacheEntry<'state>(initialToken: StreamToken, initialState: 'state,
if verifiedTimestamp < timestamp then // Don't count attempts to overwrite with stale state as verification
verifiedTimestamp <- timestamp
/// Coordinates having a max of one in-flight request across all staleness-tolerant loads at all times
// Follows high level flow of AsyncCacheCell.Await - read the comments there, and the AsyncCacheCell tests first!
// Follows high level flow of TaskCell.Await - read the comments there, and the TaskCell tests first!
member x.ReadThrough(maxAge: TimeSpan, isStale, load: Func<_, _>) = task {
let cacheEntryValidityCheckTimestamp = System.Diagnostics.Stopwatch.GetTimestamp()
let isWithinMaxAge cachedValueTimestamp = Stopwatch.TicksToSeconds(cacheEntryValidityCheckTimestamp - cachedValueTimestamp) <= maxAge.TotalSeconds
Expand All @@ -47,7 +47,7 @@ type private CacheEntry<'state>(initialToken: StreamToken, initialState: 'state,
| _ ->

// .. it wasn't; join the race to dispatch a request (others following us will share our fate via the TryAwaitValid)
let newInstance = AsyncLazy(fun () -> load.Invoke maybeBaseState)
let newInstance = LazyTask(fun () -> load.Invoke maybeBaseState)
let _ = Interlocked.CompareExchange(&cell, newInstance, ourInitialCellState)
let! timestamp, (token, state as res) = cell.Await()
x.MergeUpdates(isStale, timestamp, token, state) // merge observed result into the cache
Expand Down
2 changes: 1 addition & 1 deletion src/Equinox/Equinox.fsproj
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
<Compile Include="Stream.fs" />
<Compile Include="Decider.fs" />
<Compile Include="Category.fs" />
<Compile Include="AsyncLazy.fs" />
<Compile Include="LazyTask.fs" />
<Compile Include="Cache.fs" />
<Compile Include="Caching.fs" />
</ItemGroup>
Expand Down
8 changes: 4 additions & 4 deletions src/Equinox/AsyncLazy.fs → src/Equinox/LazyTask.fs
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,11 @@ namespace Equinox.Core
type
#if !EQUINOX_CORE
// NOT PUBLIC in Equinox library - used internally in the impl of CacheEntry
// PUBLIC in Equinox.Core (which also uses it in the impl of AsyncCacheCell)
// PUBLIC in Equinox.Core (which also uses it in the impl of TaskCell)
internal
#endif
AsyncLazy<'T>(startTask: System.Func<Task<'T>>) =
// NOTE due to `Lazy<T>` semantics, failed attempts will cache any exception; AsyncCacheCell compensates for this by rolling over to a new instance
LazyTask<'T>(startTask: System.Func<Task<'T>>) =
// NOTE due to `Lazy<T>` semantics, failed attempts will cache any exception; TaskCell compensates for this by rolling over to a new instance
let workflow = lazy startTask.Invoke()

/// Synchronously peek at what's been previously computed (if it's not Empty, or the last attempt Faulted).
Expand All @@ -33,4 +33,4 @@ type
member _.Await() = workflow.Value

/// Singleton Empty value
static member val Empty = AsyncLazy(fun () -> Task.FromException<'T>(System.InvalidOperationException "Uninitialized AsyncLazy"))
static member val Empty = LazyTask(fun () -> Task.FromException<'T>(System.InvalidOperationException "Uninitialized LazyTask"))
4 changes: 2 additions & 2 deletions tests/Equinox.Core.Tests/Equinox.Core.Tests.fsproj
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@

<ItemGroup>
<Compile Include="..\..\src\Equinox\Infrastructure.fs" />
<Compile Include="AsyncLazyTests.fs" />
<Compile Include="AsyncCacheCellTests.fs" />
<Compile Include="LazyTaskTests.fs" />
<Compile Include="TaskCellTests.fs" />
<Compile Include="BatchingTests.fs" />
</ItemGroup>

Expand Down
Original file line number Diff line number Diff line change
@@ -1,21 +1,21 @@
module Equinox.Tests.AsyncLazyTests
module Equinox.Tests.LazyTaskTests

open Equinox.Core
open Swensen.Unquote
open Xunit

[<Fact>]
let ``AsyncLazy correctness`` () = async {
let ``LazyTask correctness`` () = async {
// ensure that the encapsulated computation fires only once
let mutable count = 0
let cell = AsyncLazy(fun () -> task { return System.Threading.Interlocked.Increment &count })
let cell = LazyTask(fun () -> task { return System.Threading.Interlocked.Increment &count })
test <@ cell.TryCompleted() |> ValueOption.isNone @>
let! accessResult = [|1 .. 100|] |> Array.map (fun _ -> cell.Await() |> Async.AwaitTask) |> Async.Parallel
test <@ cell.TryCompleted() |> ValueOption.isSome @>
test <@ accessResult |> Array.forall ((=) 1) @> }

// Pinning the fact that the algorithm is not sensitive to the reuse of the initial value of a cache entry
let [<Fact>] ``AsyncLazy.Empty is a true singleton, does not allocate`` () =
let i1 = AsyncLazy<int>.Empty
let i2 = AsyncLazy<int>.Empty
let [<Fact>] ``LazyTask.Empty is a true singleton, does not allocate`` () =
let i1 = LazyTask<int>.Empty
let i2 = LazyTask<int>.Empty
test <@ obj.ReferenceEquals(i1, i2) @>
Original file line number Diff line number Diff line change
@@ -1,16 +1,16 @@
module Equinox.Core.Tests.AsyncCacheCellTests
module Equinox.Core.Tests.TaskCellTests

open Equinox.Core
open Swensen.Unquote
open System
open Xunit

[<Fact>]
let ``AsyncCacheCell correctness`` () = async {
let ``TaskCell correctness`` () = async {
// ensure that the encapsulated computation fires only once and that expiry functions as expected
let mutable state = 0
let mutable expectedValue = 1
let cell = AsyncCacheCell((fun _ct -> task { return Interlocked.Increment &state }), fun value -> value <> expectedValue)
let cell = TaskCell((fun _ct -> task { return Interlocked.Increment &state }), fun value -> value <> expectedValue)

false =! cell.IsValid()

Expand All @@ -26,7 +26,7 @@ let ``AsyncCacheCell correctness`` () = async {
}

[<Theory; InlineData false; InlineData true>]
let ``AsyncCacheCell correctness with throwing`` initiallyThrowing = async {
let ``TaskCell correctness with throwing`` initiallyThrowing = async {
// ensure that the encapsulated computation fires only once and that expiry functions as expected
let mutable state = 0
let mutable expectedValue = 1
Expand All @@ -39,7 +39,7 @@ let ``AsyncCacheCell correctness with throwing`` initiallyThrowing = async {
return r
}

let cell = AsyncCacheCell(update, fun value -> value <> expectedValue)
let cell = TaskCell(update, fun value -> value <> expectedValue)
false =! cell.IsValid()

// If the runner is throwing, we want to be sure it doesn't place us in a failed state forever, per the semantics of Lazy<T>
Expand Down

0 comments on commit 30092d5

Please sign in to comment.