Skip to content

Commit

Permalink
refactor: Track Equinox, Propulsion updates (#131)
Browse files Browse the repository at this point in the history
* Category names
* Handle arrays
* module Stream
* Sink/Pruner Stats
* Match -> For
  • Loading branch information
bartelink authored Aug 26, 2023
1 parent b95da5a commit d965067
Show file tree
Hide file tree
Showing 148 changed files with 1,912 additions and 1,880 deletions.
5 changes: 4 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@ The `Unreleased` section name is replaced by the expected version of next releas

### Added
### Changed

- Target `Equinox` v `4.0.0-rc.13`, `Propulsion` v `3.0.0-rc.8.10`

### Removed
### Fixed

Expand All @@ -18,7 +21,7 @@ The `Unreleased` section name is replaced by the expected version of next releas

### Changed

- Target `Equinox` v `4.0.0-rc.11`, `Propulsion` v `3.0.0-rc.6~~~~`
- Target `Equinox` v `4.0.0-rc.11`, `Propulsion` v `3.0.0-rc.6`

- <a name="6.3.0"></a>
## [6.3.0] - 2023-06-08
Expand Down
9 changes: 9 additions & 0 deletions dotnet-templates.sln.DotSettings
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
<wpf:ResourceDictionary xml:space="preserve" xmlns:x="http://schemas.microsoft.com/winfx/2006/xaml" xmlns:s="clr-namespace:System;assembly=mscorlib" xmlns:ss="urn:shemas-jetbrains-com:settings-storage-xaml" xmlns:wpf="http://schemas.microsoft.com/winfx/2006/xaml/presentation">
<s:Boolean x:Key="/Default/UserDictionary/Words/=discernable/@EntryIndexedValue">True</s:Boolean>
<s:Boolean x:Key="/Default/UserDictionary/Words/=Esdb/@EntryIndexedValue">True</s:Boolean>
<s:Boolean x:Key="/Default/UserDictionary/Words/=Favorited/@EntryIndexedValue">True</s:Boolean>
<s:Boolean x:Key="/Default/UserDictionary/Words/=idempotently/@EntryIndexedValue">True</s:Boolean>
<s:Boolean x:Key="/Default/UserDictionary/Words/=roundtripping/@EntryIndexedValue">True</s:Boolean>
<s:Boolean x:Key="/Default/UserDictionary/Words/=Unfavorite/@EntryIndexedValue">True</s:Boolean>
<s:Boolean x:Key="/Default/UserDictionary/Words/=Unfavorited/@EntryIndexedValue">True</s:Boolean>
<s:Boolean x:Key="/Default/UserDictionary/Words/=wireup/@EntryIndexedValue">True</s:Boolean></wpf:ResourceDictionary>
2 changes: 1 addition & 1 deletion equinox-patterns/Domain.Tests/ExactlyOnceIngesterTests.fs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ type Custom =

let [<Property>] properties shouldUseSameSut (Gap gap) (initialEpochId, NonEmptyArray (Ids initialItems)) (NonEmptyArray (Ids items)) = async {

let store = Equinox.MemoryStore.VolatileStore() |> Store.Context.Memory
let store = Equinox.MemoryStore.VolatileStore() |> Store.Config.Memory

let mutable nextEpochId = initialEpochId
for _ in 1 .. gap do nextEpochId <- ExactlyOnceIngester.Internal.next nextEpochId
Expand Down
2 changes: 1 addition & 1 deletion equinox-patterns/Domain.Tests/PeriodsCarryingForward.fs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ open Xunit

[<Fact>]
let ``Happy path`` () =
let store = Equinox.MemoryStore.VolatileStore() |> Store.Context.Memory
let store = Equinox.MemoryStore.VolatileStore() |> Store.Config.Memory
let service = Factory.create store
let decide items _state =
let apply = Array.truncate 2 items
Expand Down
8 changes: 5 additions & 3 deletions equinox-patterns/Domain/Domain.fsproj
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,11 @@
</ItemGroup>

<ItemGroup>
<PackageReference Include="Equinox.MemoryStore" Version="4.0.0-rc.11" />
<PackageReference Include="Equinox.CosmosStore" Version="4.0.0-rc.11" />
<PackageReference Include="FsCodec.SystemTextJson" Version="3.0.0-rc.10" />
<!-- Equinox.Core.Batching -->
<PackageReference Include="Equinox.Core" Version="4.0.0-rc.13" />
<PackageReference Include="Equinox.MemoryStore" Version="4.0.0-rc.13" />
<PackageReference Include="Equinox.CosmosStore" Version="4.0.0-rc.13" />
<PackageReference Include="FsCodec.SystemTextJson" Version="3.0.0-rc.10.1" />
</ItemGroup>

</Project>
4 changes: 2 additions & 2 deletions equinox-patterns/Domain/Infrastructure.fs
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ type Accumulator<'e, 's>(originState: 's, fold: 's -> seq<'e> -> 's) =
let r, Apply = decide state in r

/// Accumulated events based on the Decisions applied to date
member _.Events: 'e list =
List.ofSeq pendingEvents
member _.Events: 'e[] =
pendingEvents.ToArray()

// /// Run a decision function that does not yield a result
// member x.Transact(interpret): unit =
Expand Down
28 changes: 16 additions & 12 deletions equinox-patterns/Domain/ListEpoch.fs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
module Patterns.Domain.ListEpoch

let [<Literal>] Category = "ListEpoch"
let streamId = Equinox.StreamId.gen ListEpochId.toString
module private Stream =
let [<Literal>] Category = "ListEpoch"
let id = FsCodec.StreamId.gen ListEpochId.toString

// NB - these types and the union case names reflect the actual storage formats and hence need to be versioned with care
[<RequireQualifiedAccess>]
Expand Down Expand Up @@ -35,16 +36,16 @@ let decide shouldClose candidateIds = function
let added, events =
// TOCONSIDER in general, one would expect the inputs to naturally be distinct
match candidateIds |> Array.except currentIds (*|> Array.distinct*) with
| [||] -> [||], []
| [||] -> [||], [||]
| news ->
let closing = shouldClose news currentIds
let ingestEvent = Events.Ingested {| ids = news |}
news, if closing then [ ingestEvent ; Events.Closed ] else [ ingestEvent ]
news, [| ingestEvent; if closing then Events.Closed |]
let _, closed = Fold.fold state events
let res: ExactlyOnceIngester.IngestResult<_, _> = { accepted = added; closed = closed; residual = [||] }
res, events
| currentIds, true ->
{ accepted = [||]; closed = true; residual = candidateIds |> Array.except currentIds (*|> Array.distinct*) }, []
{ accepted = [||]; closed = true; residual = candidateIds |> Array.except currentIds (*|> Array.distinct*) }, [||]

// NOTE see feedSource for example of separating Service logic into Ingestion and Read Services in order to vary the folding and/or state held
type Service internal
Expand All @@ -58,18 +59,21 @@ type Service internal
// NOTE decider which will initially transact against potentially stale cached state, which will trigger a
// resync if another writer has gotten in before us. This is a conscious decision in this instance; the bulk
// of writes are presumed to be coming from within this same process
decider.Transact(decide shouldClose items, load = Equinox.AnyCachedValue)
decider.Transact(decide shouldClose items, load = Equinox.LoadOption.AnyCachedValue)

/// Returns all the items currently held in the stream (Not using AnyCachedValue on the assumption this needs to see updates from other apps)
member _.Read epochId: Async<Fold.State> =
/// Returns all the items currently held in the stream
/// Accommodates for Ingest logic running in another process / on another machine
member _.Read epochId: Async<Fold.State> = async {
let decider = resolve epochId
decider.Query(id, Equinox.AllowStale (System.TimeSpan.FromSeconds 1))
let! _, closed as res = decider.Query(id, Equinox.LoadOption.AnyCachedValue)
if closed then return res // Once the Epoch is closed, no new tickets ca ever be entered so no re-reads needed
else return! decider.Query(id, Equinox.LoadOption.AllowStale (System.TimeSpan.FromSeconds 1)) }

module Factory =

let private (|Category|) = function
| Store.Context.Memory store -> Store.Memory.create Events.codec Fold.initial Fold.fold store
| Store.Context.Cosmos (context, cache) -> Store.Cosmos.createSnapshotted Events.codecJe Fold.initial Fold.fold (Fold.isOrigin, Fold.toSnapshot) (context, cache)
| Store.Config.Memory store -> Store.Memory.create Stream.Category Events.codec Fold.initial Fold.fold store
| Store.Config.Cosmos (context, cache) -> Store.Cosmos.createSnapshotted Stream.Category Events.codecJe Fold.initial Fold.fold (Fold.isOrigin, Fold.toSnapshot) (context, cache)
let create maxItemsPerEpoch (Category cat) =
let shouldClose candidateItems currentItems = Array.length currentItems + Array.length candidateItems >= maxItemsPerEpoch
Service(shouldClose, streamId >> Store.resolveDecider cat Category)
Service(shouldClose, Stream.id >> Store.createDecider cat)
25 changes: 13 additions & 12 deletions equinox-patterns/Domain/ListSeries.fs
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,11 @@
/// As an Epoch is marked `Closed`, the Ingester will mark a new Epoch `Started` on this aggregate via MarkIngestionEpochId
module Patterns.Domain.ListSeries

let [<Literal>] Category = "ListSeries"
// TOCONSIDER: if you need multiple lists series/epochs in a single system, the Series and Epoch streams should have a SeriesId in the stream name
// See also the implementation in the feedSource template, where the Series aggregate also functions as an index of series held in the system
let streamId () = Equinox.StreamId.gen ListSeriesId.toString ListSeriesId.wellKnownId
module private Stream =
let [<Literal>] Category = "ListSeries"
// TOCONSIDER: if you need multiple lists series/epochs in a single system, the Series and Epoch streams should have a SeriesId in the stream name
// See also the implementation in the feedSource template, where the Series aggregate also functions as an index of series held in the system
let id () = FsCodec.StreamId.gen ListSeriesId.toString ListSeriesId.wellKnownId

// NB - these types and the union case names reflect the actual storage formats and hence need to be versioned with care
[<RequireQualifiedAccess>]
Expand All @@ -30,9 +31,9 @@ module Fold =
let isOrigin = function Events.Snapshotted _ -> true | _ -> false
let toSnapshot s = Events.Snapshotted {| active = Option.get s |}

let interpret epochId (state: Fold.State) =
[if state |> Option.forall (fun cur -> cur < epochId) && epochId >= ListEpochId.initial then
yield Events.Started {| epochId = epochId |}]
let interpret epochId (state: Fold.State) = [|
if state |> Option.forall (fun cur -> cur < epochId) && epochId >= ListEpochId.initial then
Events.Started {| epochId = epochId |} |]

type Service internal (resolve: unit -> Equinox.Decider<Events.Event, Fold.State>) =

Expand All @@ -46,12 +47,12 @@ type Service internal (resolve: unit -> Equinox.Decider<Events.Event, Fold.State
/// Writers are expected to react to having writes to an epoch denied (due to it being Closed) by anointing a successor via this
member _.MarkIngestionEpochId epochId: Async<unit> =
let decider = resolve ()
decider.Transact(interpret epochId, load = Equinox.AnyCachedValue)
decider.Transact(interpret epochId, load = Equinox.LoadOption.AnyCachedValue)

module Factory =

let private (|Category|) = function
| Store.Context.Memory store -> Store.Memory.create Events.codec Fold.initial Fold.fold store
| Store.Context.Cosmos (context, cache) ->
Store.Cosmos.createSnapshotted Events.codecJe Fold.initial Fold.fold (Fold.isOrigin, Fold.toSnapshot) (context, cache)
let create (Category cat) = Service(streamId >> Store.resolveDecider cat Category)
| Store.Config.Memory store -> Store.Memory.create Stream.Category Events.codec Fold.initial Fold.fold store
| Store.Config.Cosmos (context, cache) ->
Store.Cosmos.createSnapshotted Stream.Category Events.codecJe Fold.initial Fold.fold (Fold.isOrigin, Fold.toSnapshot) (context, cache)
let create (Category cat) = Service(Stream.id >> Store.createDecider cat)
19 changes: 10 additions & 9 deletions equinox-patterns/Domain/Period.fs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,9 @@
/// c) if appropriate, the target period may be closed as part of the same decision flow if `decideCarryForward` yields Some
module Patterns.Domain.Period

let [<Literal>] Category = "Period"
let streamId = Equinox.StreamId.gen PeriodId.toString
module private Stream =
let [<Literal>] Category = "Period"
let id = FsCodec.StreamId.gen PeriodId.toString

// NOTE - these types and the union case names reflect the actual storage formats and hence need to be versioned with care
module Events =
Expand Down Expand Up @@ -80,7 +81,7 @@ type Result<'request, 'result> =
/// 1. Streams must open with a BroughtForward event (obtained via Rules.getIncomingBalance if this is an uninitialized Period)
/// 2. (If the Period has not closed) Rules.decide gets to map the request to events and a residual
/// 3. Rules.decideCarryForward may trigger the closing of the Period based on the residual and/or the State by emitting Some balance
let decideIngestWithCarryForward rules req s: Async<Result<'req, 'result> * Events.Event list> = async {
let decideIngestWithCarryForward rules req s: Async<Result<'req, 'result> * Events.Event[]> = async {
let acc = Accumulator(s, Fold.fold)
do! acc.Transact(Fold.maybeOpen rules.getIncomingBalance)
let residual, result = acc.Transact(Fold.tryIngest rules.decideIngestion req)
Expand All @@ -107,7 +108,7 @@ type Service internal (resolve: PeriodId -> Equinox.Decider<Events.Event, Fold.S
let decide' s = async {
let! r, es = decideIngestWithCarryForward rules () s
return Option.get r.carryForward, es }
decider.TransactAsync(decide', load = Equinox.AnyCachedValue)
decider.Transact(decide', load = Equinox.LoadOption.AnyCachedValue)

/// Runs the decision function on the specified Period, closing and bringing forward balances from preceding Periods if necessary
let tryTransact periodId getIncoming (decide: 'request -> Fold.State -> 'request * 'result * Events.Event list) request shouldClose: Async<Result<'request, 'result>> =
Expand All @@ -116,7 +117,7 @@ type Service internal (resolve: PeriodId -> Equinox.Decider<Events.Event, Fold.S
decideIngestion = fun request state -> let residual, result, events = decide request state in residual, result, events
decideCarryForward = fun res state -> async { if shouldClose res then return! genBalance state else return None } } // also close, if we should
let decider = resolve periodId
decider.TransactAsync(decideIngestWithCarryForward rules request, load = Equinox.AnyCachedValue)
decider.Transact(decideIngestWithCarryForward rules request, load = Equinox.LoadOption.AnyCachedValue)

/// Runs the decision function on the specified Period, closing and bringing forward balances from preceding Periods if necessary
/// Processing completes when `decide` yields None for the residual of the 'request
Expand All @@ -141,9 +142,9 @@ type Service internal (resolve: PeriodId -> Equinox.Decider<Events.Event, Fold.S
module Factory =

let private (|Category|) = function
| Store.Context.Memory store -> Store.Memory.create Events.codec Fold.initial Fold.fold store
| Store.Context.Cosmos (context, cache) ->
| Store.Config.Memory store -> Store.Memory.create Stream.Category Events.codec Fold.initial Fold.fold store
| Store.Config.Cosmos (context, cache) ->
// Not using snapshots, on the basis that the writes are all coming from this process, so the cache will be sufficient
// to make reads cheap enough, with the benefit of writes being cheaper as you're not paying to maintain the snapshot
Store.Cosmos.createUnoptimized Events.codecJe Fold.initial Fold.fold (context, cache)
let create (Category cat) = Service(streamId >> Store.resolveDecider cat Category)
Store.Cosmos.createUnoptimized Stream.Category Events.codecJe Fold.initial Fold.fold (context, cache)
let create (Category cat) = Service(Stream.id >> Store.createDecider cat)
35 changes: 19 additions & 16 deletions equinox-patterns/Domain/Store.fs
Original file line number Diff line number Diff line change
@@ -1,37 +1,40 @@
module Patterns.Domain.Store

let log = Serilog.Log.ForContext("isMetric", true)
let resolveDecider cat = Equinox.Decider.resolve log cat
module Metrics =

module Codec =
let log = Serilog.Log.ForContext("isMetric", true)

let createDecider cat = Equinox.Decider.forStream Metrics.log cat

open FsCodec.SystemTextJson
module Codec =

let gen<'t when 't :> TypeShape.UnionContract.IUnionContract> =
Codec.Create<'t>() // options = Options.Default
FsCodec.SystemTextJson.Codec.Create<'t>() // options = Options.Default
let genJsonElement<'t when 't :> TypeShape.UnionContract.IUnionContract> =
CodecJsonElement.Create<'t>() // options = Options.Default
FsCodec.SystemTextJson.CodecJsonElement.Create<'t>() // options = Options.Default

module Memory =

let create codec initial fold store: Equinox.Category<_, _, _> =
Equinox.MemoryStore.MemoryStoreCategory(store, FsCodec.Deflate.EncodeUncompressed codec, fold, initial)
let create name codec initial fold store: Equinox.Category<_, _, _> =
Equinox.MemoryStore.MemoryStoreCategory(store, name, FsCodec.Deflate.EncodeUncompressed codec, fold, initial)

let private defaultCacheDuration = System.TimeSpan.FromMinutes 20
let private cacheStrategy cache = Equinox.CachingStrategy.SlidingWindow (cache, defaultCacheDuration)

module Cosmos =

let private createCached codec initial fold accessStrategy (context, cache) =
let cacheStrategy = Equinox.CosmosStore.CachingStrategy.SlidingWindow (cache, System.TimeSpan.FromMinutes 20.)
Equinox.CosmosStore.CosmosStoreCategory(context, codec, fold, initial, cacheStrategy, accessStrategy)
let private createCached name codec initial fold accessStrategy (context, cache) =
Equinox.CosmosStore.CosmosStoreCategory(context, name, codec, fold, initial, accessStrategy, cacheStrategy cache)

let createUnoptimized codec initial fold (context, cache) =
let createUnoptimized name codec initial fold (context, cache) =
let accessStrategy = Equinox.CosmosStore.AccessStrategy.Unoptimized
createCached codec initial fold accessStrategy (context, cache)
createCached name codec initial fold accessStrategy (context, cache)

let createSnapshotted codec initial fold (isOrigin, toSnapshot) (context, cache) =
let createSnapshotted name codec initial fold (isOrigin, toSnapshot) (context, cache) =
let accessStrategy = Equinox.CosmosStore.AccessStrategy.Snapshot (isOrigin, toSnapshot)
createCached codec initial fold accessStrategy (context, cache)
createCached name codec initial fold accessStrategy (context, cache)

[<NoComparison; NoEquality; RequireQualifiedAccess>]
type Context<'t> =
type Config<'t> =
| Memory of Equinox.MemoryStore.VolatileStore<'t>
| Cosmos of Equinox.CosmosStore.CosmosStoreContext * Equinox.Cache
2 changes: 1 addition & 1 deletion equinox-shipping/Domain.Tests/Domain.Tests.fsproj
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="17.2.0" />

<PackageReference Include="FsCheck.Xunit" Version="3.0.0-beta2" />
<PackageReference Include="Propulsion.MemoryStore" Version="3.0.0-rc.6" />
<PackageReference Include="Propulsion.MemoryStore" Version="3.0.0-rc.8.10" />
<PackageReference Include="xunit" Version="2.4.2" />
<PackageReference Include="xunit.runner.visualstudio" Version="2.4.5" />
</ItemGroup>
Expand Down
4 changes: 2 additions & 2 deletions equinox-shipping/Domain.Tests/FinalizationProcessTests.fs
Original file line number Diff line number Diff line change
Expand Up @@ -42,12 +42,12 @@ type Properties(testOutput) =
nameof(Container.Events.Finalized)] // Container
test <@ res1 && set eventTypes = set expectedEvents @>
let containerEvents =
buffer.Queue(Container.Category, Container.streamId containerId1)
buffer.Queue(Container.Reactions.streamName containerId1)
|> Seq.chooseV (FsCodec.Deflate.EncodeUncompressed Container.Events.codec).TryDecode
|> List.ofSeq
test <@ match containerEvents with
| [ Container.Events.Finalized e ] -> e.shipmentIds = requestedShipmentIds
| xs -> failwithf "Unexpected %A" xs @>
| xs -> xs |> failwithf "Unexpected %A" @>
(* Next, we run an overlapping finalize - this should
a) yield a fail result
b) result in triggering of Revert flow with associated Shipment revoke events *)
Expand Down
1 change: 0 additions & 1 deletion equinox-shipping/Domain.Tests/Infrastructure.fs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ type EventAccumulator<'E>() =
match messages.TryGetValue stream with
| false, _ -> Seq.empty<'E>
| true, xs -> xs :> _
member x.Queue(cat, sid) = x.Queue(FsCodec.StreamName.create cat (Equinox.Core.StreamId.toString sid))

member _.All() = seq { for KeyValue (_, xs) in messages do yield! xs }

Expand Down
2 changes: 1 addition & 1 deletion equinox-shipping/Domain.Tests/MemoryStoreFixture.fs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ open System
type MemoryStoreFixture() =
let store = Equinox.MemoryStore.VolatileStore<struct (int * ReadOnlyMemory<byte>)>()
let mutable disconnectLog: (unit -> unit) option = None
member val Config = Shipping.Domain.Store.Context.Memory store
member val Config = Store.Config.Memory store
member _.Committed = store.Committed
member _.TestOutput with set testOutput =
if Option.isSome disconnectLog then invalidOp "Cannot connect more than one test output"
Expand Down
Loading

0 comments on commit d965067

Please sign in to comment.