diff --git a/CHANGELOG.md b/CHANGELOG.md index 62e29ca31..354b41c07 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -14,7 +14,7 @@ The `Unreleased` section name is replaced by the expected version of next releas ### Changed -- Target `Equinox` v `4.0.0-rc.13`, `Propulsion` v `3.0.0-rc.8.10`, `FsCodec` v `3.0.0-rc.11.1` [#131](https://github.com/jet/dotnet-templates/pull/131) +- Target `Equinox` v `4.0.0-rc.14.5`, `Propulsion` v `3.0.0-rc.9.11`, `FsCodec` v `3.0.0-rc.14.1` [#131](https://github.com/jet/dotnet-templates/pull/131) ### Removed diff --git a/equinox-patterns/Domain/Domain.fsproj b/equinox-patterns/Domain/Domain.fsproj index 78f4b5464..ee67d35ba 100644 --- a/equinox-patterns/Domain/Domain.fsproj +++ b/equinox-patterns/Domain/Domain.fsproj @@ -17,10 +17,10 @@ - - - - + + + + diff --git a/equinox-patterns/Domain/Store.fs b/equinox-patterns/Domain/Store.fs index ee7af0a3d..b97652a9c 100644 --- a/equinox-patterns/Domain/Store.fs +++ b/equinox-patterns/Domain/Store.fs @@ -16,7 +16,7 @@ module Codec = module Memory = let create name codec initial fold store: Equinox.Category<_, _, _> = - Equinox.MemoryStore.MemoryStoreCategory(store, name, FsCodec.Deflate.EncodeUncompressed codec, fold, initial) + Equinox.MemoryStore.MemoryStoreCategory(store, name, FsCodec.Compression.EncodeUncompressed codec, fold, initial) let private defaultCacheDuration = System.TimeSpan.FromMinutes 20 let private cacheStrategy cache = Equinox.CachingStrategy.SlidingWindow (cache, defaultCacheDuration) diff --git a/equinox-shipping/Domain.Tests/ContainerTests.fs b/equinox-shipping/Domain.Tests/ContainerTests.fs index bd704fdf2..02c287d5d 100644 --- a/equinox-shipping/Domain.Tests/ContainerTests.fs +++ b/equinox-shipping/Domain.Tests/ContainerTests.fs @@ -8,5 +8,5 @@ open Swensen.Unquote let [] ``events roundtrip`` (x: Events.Event) = let ee = Events.codec.Encode((), x) let e = FsCodec.Core.TimelineEvent.Create(0L, ee.EventType, ee.Data) - let des = Events.codec.TryDecode e + let des = 
Events.codec.Decode e test <@ des = ValueSome x @> diff --git a/equinox-shipping/Domain.Tests/Domain.Tests.fsproj b/equinox-shipping/Domain.Tests/Domain.Tests.fsproj index ff3465d4b..6ad470aa5 100644 --- a/equinox-shipping/Domain.Tests/Domain.Tests.fsproj +++ b/equinox-shipping/Domain.Tests/Domain.Tests.fsproj @@ -20,7 +20,7 @@ - + diff --git a/equinox-shipping/Domain.Tests/FinalizationProcessTests.fs b/equinox-shipping/Domain.Tests/FinalizationProcessTests.fs index 8e499011f..c307baf41 100644 --- a/equinox-shipping/Domain.Tests/FinalizationProcessTests.fs +++ b/equinox-shipping/Domain.Tests/FinalizationProcessTests.fs @@ -43,7 +43,7 @@ type Properties(testOutput) = test <@ res1 && set eventTypes = set expectedEvents @> let containerEvents = buffer.Queue(Container.Reactions.streamName containerId1) - |> Seq.chooseV (FsCodec.Deflate.EncodeUncompressed Container.Events.codec).TryDecode + |> Seq.chooseV (FsCodec.Compression.EncodeUncompressed Container.Events.codec).Decode |> List.ofSeq test <@ match containerEvents with | [ Container.Events.Finalized e ] -> e.shipmentIds = requestedShipmentIds diff --git a/equinox-shipping/Domain.Tests/FinalizationTransactionTests.fs b/equinox-shipping/Domain.Tests/FinalizationTransactionTests.fs index c6b86fc35..17a095883 100644 --- a/equinox-shipping/Domain.Tests/FinalizationTransactionTests.fs +++ b/equinox-shipping/Domain.Tests/FinalizationTransactionTests.fs @@ -8,5 +8,5 @@ open Swensen.Unquote let [] ``events roundtrip`` (x: Events.Event) = let ee = Events.codec.Encode((), x) let e = FsCodec.Core.TimelineEvent.Create(0L, ee.EventType, ee.Data) - let des = Events.codec.TryDecode e + let des = Events.codec.Decode e test <@ des = ValueSome x @> diff --git a/equinox-shipping/Domain.Tests/ShipmentTests.fs b/equinox-shipping/Domain.Tests/ShipmentTests.fs index 26b2444e7..fbf68715d 100644 --- a/equinox-shipping/Domain.Tests/ShipmentTests.fs +++ b/equinox-shipping/Domain.Tests/ShipmentTests.fs @@ -8,5 +8,5 @@ open Swensen.Unquote let 
[] ``events roundtrip`` (x: Events.Event) = let ee = Events.codec.Encode((), x) let e = FsCodec.Core.TimelineEvent.Create(0L, ee.EventType, ee.Data) - let des = Events.codec.TryDecode e + let des = Events.codec.Decode e test <@ des = ValueSome x @> diff --git a/equinox-shipping/Domain/Container.fs b/equinox-shipping/Domain/Container.fs index 6607f529e..1b3637c62 100644 --- a/equinox-shipping/Domain/Container.fs +++ b/equinox-shipping/Domain/Container.fs @@ -1,12 +1,10 @@ module Shipping.Domain.Container -module private Stream = - let [] Category = "Container" - let id = FsCodec.StreamId.gen ContainerId.toString - let name = id >> FsCodec.StreamName.create Category +let [] private CategoryName = "Container" +let private streamId = FsCodec.StreamId.gen ContainerId.toString module Reactions = - let streamName = Stream.name + let streamName = streamId >> FsCodec.StreamName.create CategoryName // NB - these types and the union case names reflect the actual storage formats and hence need to be versioned with care module Events = @@ -44,8 +42,8 @@ type Service internal (resolve: ContainerId -> Equinox.Decider Store.Memory.create Stream.Category Events.codec Fold.initial Fold.fold store - | Store.Config.Cosmos (context, cache) -> Store.Cosmos.createSnapshotted Stream.Category Events.codecJe Fold.initial Fold.fold (Fold.isOrigin, Fold.toSnapshot) (context, cache) - | Store.Config.Dynamo (context, cache) -> Store.Dynamo.createSnapshotted Stream.Category Events.codec Fold.initial Fold.fold (Fold.isOrigin, Fold.toSnapshot) (context, cache) - | Store.Config.Esdb (context, cache) -> Store.Esdb.createUnoptimized Stream.Category Events.codec Fold.initial Fold.fold (context, cache) - let create (Category cat) = Service(Stream.id >> Store.createDecider cat) + | Store.Config.Memory store -> Store.Memory.create CategoryName Events.codec Fold.initial Fold.fold store + | Store.Config.Cosmos (context, cache) -> Store.Cosmos.createSnapshotted CategoryName Events.codecJe Fold.initial 
Fold.fold (Fold.isOrigin, Fold.toSnapshot) (context, cache) + | Store.Config.Dynamo (context, cache) -> Store.Dynamo.createSnapshotted CategoryName Events.codec Fold.initial Fold.fold (Fold.isOrigin, Fold.toSnapshot) (context, cache) + | Store.Config.Esdb (context, cache) -> Store.Esdb.createUnoptimized CategoryName Events.codec Fold.initial Fold.fold (context, cache) + let create (Category cat) = Service(streamId >> Store.createDecider cat) diff --git a/equinox-shipping/Domain/Domain.fsproj b/equinox-shipping/Domain/Domain.fsproj index 5dc90e596..5b679032c 100644 --- a/equinox-shipping/Domain/Domain.fsproj +++ b/equinox-shipping/Domain/Domain.fsproj @@ -20,11 +20,11 @@ - - - - - + + + + + diff --git a/equinox-shipping/Domain/FinalizationTransaction.fs b/equinox-shipping/Domain/FinalizationTransaction.fs index 9c3988937..34bf5dfaa 100644 --- a/equinox-shipping/Domain/FinalizationTransaction.fs +++ b/equinox-shipping/Domain/FinalizationTransaction.fs @@ -1,10 +1,8 @@ module Shipping.Domain.FinalizationTransaction -module private Stream = - let [] Category = "FinalizationTransaction" - let id = FsCodec.StreamId.gen TransactionId.toString - let decodeId = FsCodec.StreamId.dec TransactionId.parse - let tryDecode = FsCodec.StreamName.tryFind Category >> ValueOption.map decodeId +let [] private CategoryName = "FinalizationTransaction" +let private streamId = FsCodec.StreamId.gen TransactionId.toString +let private catId = CategoryId(CategoryName, streamId, FsCodec.StreamId.dec TransactionId.parse) // NB - these types and the union case names reflect the actual storage formats and hence need to be versioned with care module Events = @@ -36,8 +34,8 @@ module Reactions = /// Used by the Watchdog to infer whether a given event signifies that the processing has reached a terminal state let isTerminalEvent (encoded: FsCodec.ITimelineEvent<_>) = encoded.EventType = nameof(Events.Completed) - let [] categoryName = Stream.Category - let [] (|For|_|) = Stream.tryDecode + let [] 
categoryName = CategoryName + let [] (|For|_|) = catId.TryDecode module Fold = @@ -84,9 +82,9 @@ module Flow = match state, event with | Fold.State.Initial, Events.FinalizationRequested _ | Fold.State.Reserving _, Events.RevertCommenced _ - | Fold.State.Reserving _, Events.ReservationCompleted _ + | Fold.State.Reserving _, Events.ReservationCompleted | Fold.State.Reverting _, Events.Completed - | Fold.State.Assigning _, Events.AssignmentCompleted _ + | Fold.State.Assigning _, Events.AssignmentCompleted | Fold.State.Assigned _, Events.Completed -> true | _ -> false @@ -107,8 +105,8 @@ type Service internal (resolve: TransactionId -> Equinox.Decider Store.Memory.create Stream.Category Events.codec Fold.initial Fold.fold store - | Store.Config.Cosmos (context, cache) -> Store.Cosmos.createSnapshotted Stream.Category Events.codecJe Fold.initial Fold.fold (Fold.isOrigin, Fold.toSnapshot) (context, cache) - | Store.Config.Dynamo (context, cache) -> Store.Dynamo.createSnapshotted Stream.Category Events.codec Fold.initial Fold.fold (Fold.isOrigin, Fold.toSnapshot) (context, cache) - | Store.Config.Esdb (context, cache) -> Store.Esdb.createUnoptimized Stream.Category Events.codec Fold.initial Fold.fold (context, cache) - let create (Category cat) = Service(Stream.id >> Store.createDecider cat) + | Store.Config.Memory store -> Store.Memory.create CategoryName Events.codec Fold.initial Fold.fold store + | Store.Config.Cosmos (context, cache) -> Store.Cosmos.createSnapshotted CategoryName Events.codecJe Fold.initial Fold.fold (Fold.isOrigin, Fold.toSnapshot) (context, cache) + | Store.Config.Dynamo (context, cache) -> Store.Dynamo.createSnapshotted CategoryName Events.codec Fold.initial Fold.fold (Fold.isOrigin, Fold.toSnapshot) (context, cache) + | Store.Config.Esdb (context, cache) -> Store.Esdb.createUnoptimized CategoryName Events.codec Fold.initial Fold.fold (context, cache) + let create (Category cat) = Service(streamId >> Store.createDecider cat) diff --git 
a/equinox-shipping/Domain/Shipment.fs b/equinox-shipping/Domain/Shipment.fs index 3bc80b04b..2fc0a2420 100644 --- a/equinox-shipping/Domain/Shipment.fs +++ b/equinox-shipping/Domain/Shipment.fs @@ -1,8 +1,7 @@ module Shipping.Domain.Shipment -module private Stream = - let [] Category = "Shipment" - let id = FsCodec.StreamId.gen ShipmentId.toString +let [] private CategoryName = "Shipment" +let private streamId = FsCodec.StreamId.gen ShipmentId.toString // NB - these types and the union case names reflect the actual storage formats and hence need to be versioned with care module Events = @@ -30,40 +29,42 @@ module Fold = let isOrigin = function Events.Snapshotted _ -> true | _ -> false let toSnapshot (state: State) = Events.Snapshotted {| reservation = state.reservation; association = state.association |} -let decideReserve transactionId: Fold.State -> bool * Events.Event[] = function - | { reservation = Some r } when r = transactionId -> true, [||] - | { reservation = None } -> true, [| Events.Reserved {| transaction = transactionId |} |] - | _ -> false, [||] +module Decisions = + + let reserve transactionId: Fold.State -> bool * Events.Event[] = function + | { reservation = Some r } when r = transactionId -> true, [||] + | { reservation = None } -> true, [| Events.Reserved {| transaction = transactionId |} |] + | _ -> false, [||] -let interpretRevoke transactionId: Fold.State -> Events.Event[] = function - | { reservation = Some r; association = None } when r = transactionId -> - [| Events.Revoked |] - | _ -> [||] // Ignore if a) already revoked/never reserved b) not reserved for this transactionId + let revoke transactionId: Fold.State -> Events.Event[] = function + | { reservation = Some r; association = None } when r = transactionId -> + [| Events.Revoked |] + | _ -> [||] // Ignore if a) already revoked/never reserved b) not reserved for this transactionId -let interpretAssign transactionId containerId: Fold.State -> Events.Event[] = function - | { reservation 
= Some r; association = None } when r = transactionId -> - [| Events.Assigned {| container = containerId |} |] - | _ -> [||] // Ignore if a) this transaction was not the one reserving it or b) it's already been assigned + let assign transactionId containerId: Fold.State -> Events.Event[] = function + | { reservation = Some r; association = None } when r = transactionId -> + [| Events.Assigned {| container = containerId |} |] + | _ -> [||] // Ignore if a) this transaction was not the one reserving it or b) it's already been assigned type Service internal (resolve: ShipmentId -> Equinox.Decider) = member _.TryReserve(shipmentId, transactionId): Async = let decider = resolve shipmentId - decider.Transact(decideReserve transactionId) + decider.Transact(Decisions.reserve transactionId) member _.Revoke(shipmentId, transactionId): Async = let decider = resolve shipmentId - decider.Transact(interpretRevoke transactionId) + decider.Transact(Decisions.revoke transactionId) member _.Assign(shipmentId, containerId, transactionId): Async = let decider = resolve shipmentId - decider.Transact(interpretAssign transactionId containerId) + decider.Transact(Decisions.assign transactionId containerId) module Factory = let private (|Category|) = function - | Store.Config.Memory store -> Store.Memory.create Stream.Category Events.codec Fold.initial Fold.fold store - | Store.Config.Cosmos (context, cache) -> Store.Cosmos.createSnapshotted Stream.Category Events.codecJe Fold.initial Fold.fold (Fold.isOrigin, Fold.toSnapshot) (context, cache) - | Store.Config.Dynamo (context, cache) -> Store.Dynamo.createSnapshotted Stream.Category Events.codec Fold.initial Fold.fold (Fold.isOrigin, Fold.toSnapshot) (context, cache) - | Store.Config.Esdb (context, cache) -> Store.Esdb.createUnoptimized Stream.Category Events.codec Fold.initial Fold.fold (context, cache) - let create (Category cat) = Service(Stream.id >> Store.createDecider cat) + | Store.Config.Memory store -> Store.Memory.create 
CategoryName Events.codec Fold.initial Fold.fold store + | Store.Config.Cosmos (context, cache) -> Store.Cosmos.createSnapshotted CategoryName Events.codecJe Fold.initial Fold.fold (Fold.isOrigin, Fold.toSnapshot) (context, cache) + | Store.Config.Dynamo (context, cache) -> Store.Dynamo.createSnapshotted CategoryName Events.codec Fold.initial Fold.fold (Fold.isOrigin, Fold.toSnapshot) (context, cache) + | Store.Config.Esdb (context, cache) -> Store.Esdb.createUnoptimized CategoryName Events.codec Fold.initial Fold.fold (context, cache) + let create (Category cat) = Service(streamId >> Store.createDecider cat) diff --git a/equinox-shipping/Domain/Store.fs b/equinox-shipping/Domain/Store.fs index a1d7b265c..f4670e6e1 100644 --- a/equinox-shipping/Domain/Store.fs +++ b/equinox-shipping/Domain/Store.fs @@ -19,7 +19,7 @@ module Codec = module Memory = let create name codec initial fold store: Equinox.Category<_, _, _> = - Equinox.MemoryStore.MemoryStoreCategory(store, name, FsCodec.Deflate.EncodeUncompressed codec, fold, initial) + Equinox.MemoryStore.MemoryStoreCategory(store, name, FsCodec.Compression.EncodeUncompressed codec, fold, initial) let private defaultCacheDuration = System.TimeSpan.FromMinutes 20 let private cacheStrategy cache = Equinox.CachingStrategy.SlidingWindow (cache, defaultCacheDuration) @@ -40,7 +40,7 @@ module Dynamo = open Equinox.DynamoStore let private createCached name codec initial fold accessStrategy (context, cache) = - DynamoStoreCategory(context, name, FsCodec.Deflate.EncodeTryDeflate codec, fold, initial, accessStrategy, cacheStrategy cache) + DynamoStoreCategory(context, name, FsCodec.Compression.EncodeTryCompress codec, fold, initial, accessStrategy, cacheStrategy cache) let createSnapshotted name codec initial fold (isOrigin, toSnapshot) (context, cache) = let accessStrategy = AccessStrategy.Snapshot (isOrigin, toSnapshot) diff --git a/equinox-shipping/Domain/TransactionWatchdog.fs b/equinox-shipping/Domain/TransactionWatchdog.fs 
index 56890e2d5..cb81a3450 100644 --- a/equinox-shipping/Domain/TransactionWatchdog.fs +++ b/equinox-shipping/Domain/TransactionWatchdog.fs @@ -43,7 +43,7 @@ let fold: Events.Categorization seq -> Fold.State = let (|TransactionStatus|) (codec: #FsCodec.IEventCodec<_, _, _>) events: Fold.State = events - |> Seq.choose (codec.TryDecode >> function ValueSome x -> Some x | ValueNone -> None) + |> Seq.chooseV codec.Decode |> fold module Finalization = diff --git a/equinox-shipping/Domain/Types.fs b/equinox-shipping/Domain/Types.fs index beb7d39af..38c719b86 100644 --- a/equinox-shipping/Domain/Types.fs +++ b/equinox-shipping/Domain/Types.fs @@ -19,7 +19,16 @@ module TransactionId = let parse (x: string): TransactionId = %x let (|Parse|) = parse +module Seq = + + let inline chooseV f xs = seq { for x in xs do match f x with ValueSome v -> yield v | ValueNone -> () } + module Guid = let inline toStringN (x: System.Guid) = x.ToString "N" let generateStringN () = let g = System.Guid.NewGuid() in toStringN g + +/// Handles symmetric generation and decoding of StreamNames composed of a series of elements via the FsCodec.StreamId helpers +type internal CategoryId<'elements>(name, gen: 'elements -> FsCodec.StreamId, dec: FsCodec.StreamId -> 'elements) = + member _.StreamName = gen >> FsCodec.StreamName.create name + member _.TryDecode = FsCodec.StreamName.tryFind name >> ValueOption.map dec diff --git a/equinox-shipping/Watchdog.Integration/CosmosConnector.fs b/equinox-shipping/Watchdog.Integration/CosmosConnector.fs index 43d5820b2..f1becdc7e 100644 --- a/equinox-shipping/Watchdog.Integration/CosmosConnector.fs +++ b/equinox-shipping/Watchdog.Integration/CosmosConnector.fs @@ -1,7 +1,5 @@ namespace Shipping.Watchdog.Integration -open Shipping.Infrastructure - type CosmosConnector(connectionString, databaseId, containerId) = let discovery = connectionString |> Equinox.CosmosStore.Discovery.ConnectionString diff --git a/equinox-shipping/Watchdog.Integration/DynamoConnector.fs 
b/equinox-shipping/Watchdog.Integration/DynamoConnector.fs index f8cd1977b..26c596b75 100644 --- a/equinox-shipping/Watchdog.Integration/DynamoConnector.fs +++ b/equinox-shipping/Watchdog.Integration/DynamoConnector.fs @@ -1,7 +1,5 @@ namespace Shipping.Watchdog.Integration -open Shipping.Infrastructure - type DynamoConnector(connector: Equinox.DynamoStore.DynamoStoreConnector, table, indexTable) = let client = connector.CreateClient() diff --git a/equinox-shipping/Watchdog.Integration/EsdbConnector.fs b/equinox-shipping/Watchdog.Integration/EsdbConnector.fs index ff46c4ecb..57ae31931 100644 --- a/equinox-shipping/Watchdog.Integration/EsdbConnector.fs +++ b/equinox-shipping/Watchdog.Integration/EsdbConnector.fs @@ -1,6 +1,5 @@ namespace Shipping.Watchdog.Integration -open Shipping.Infrastructure open System type EsdbConnector(connection, credentials) = diff --git a/equinox-shipping/Watchdog.Integration/ReactorFixture.fs b/equinox-shipping/Watchdog.Integration/ReactorFixture.fs index 84ae702d3..721167abe 100644 --- a/equinox-shipping/Watchdog.Integration/ReactorFixture.fs +++ b/equinox-shipping/Watchdog.Integration/ReactorFixture.fs @@ -2,7 +2,6 @@ namespace Shipping.Watchdog.Integration open Propulsion.Internal // IntervalTimer etc open Shipping.Domain.Tests -open Shipping.Infrastructure open Shipping.Watchdog open System @@ -38,10 +37,7 @@ type FixtureBase(messageSink, store, dumpStats, createSourceConfig) = if stats.StatsInterval.RemainingMs > 3000 then stats.StatsInterval.Trigger() stats.StatsInterval.SleepUntilTriggerCleared() - member _.Await(propagationDelay) = - match awaitReactions with - | Some f -> f propagationDelay |> Async.ofTask - | None -> async { () } + member _.Await(propagationDelay) = awaitReactions propagationDelay |> Async.ofTask interface IDisposable with @@ -80,7 +76,7 @@ module CosmosReactor = let store, monitored, leases = conn.Connect() let createSourceConfig consumerGroupName = let checkpointConfig = CosmosFeedConfig.Ephemeral 
consumerGroupName - SourceConfig.Cosmos (monitored, leases, checkpointConfig, tailSleepInterval) + SourceConfig.Cosmos (monitored, leases, checkpointConfig, tailSleepInterval, TimeSpan.FromSeconds 60.) new Fixture(messageSink, store, createSourceConfig) member _.NullWait(_arguments) = async.Zero () // We could wire up a way to await all tranches having caught up, but not implemented yet member val private Timeout = if System.Diagnostics.Debugger.IsAttached then TimeSpan.FromHours 1. else TimeSpan.FromMinutes 1. diff --git a/equinox-shipping/Watchdog.Lambda.Cdk/Watchdog.Lambda.Cdk.fsproj b/equinox-shipping/Watchdog.Lambda.Cdk/Watchdog.Lambda.Cdk.fsproj index 469391cf9..41100b04a 100644 --- a/equinox-shipping/Watchdog.Lambda.Cdk/Watchdog.Lambda.Cdk.fsproj +++ b/equinox-shipping/Watchdog.Lambda.Cdk/Watchdog.Lambda.Cdk.fsproj @@ -20,7 +20,7 @@ - + diff --git a/equinox-shipping/Watchdog.Lambda/Function.fs b/equinox-shipping/Watchdog.Lambda/Function.fs index 4493792c9..616c6486d 100644 --- a/equinox-shipping/Watchdog.Lambda/Function.fs +++ b/equinox-shipping/Watchdog.Lambda/Function.fs @@ -5,7 +5,6 @@ open Amazon.Lambda.SQSEvents open Equinox.DynamoStore open Serilog open Shipping.Domain -open Shipping.Infrastructure open Shipping.Watchdog open System diff --git a/equinox-shipping/Watchdog.Lambda/Watchdog.Lambda.fsproj b/equinox-shipping/Watchdog.Lambda/Watchdog.Lambda.fsproj index 4b9f2d242..91527b6ab 100644 --- a/equinox-shipping/Watchdog.Lambda/Watchdog.Lambda.fsproj +++ b/equinox-shipping/Watchdog.Lambda/Watchdog.Lambda.fsproj @@ -16,7 +16,7 @@ - + diff --git a/equinox-shipping/Watchdog/Args.fs b/equinox-shipping/Watchdog/Args.fs index 76208be5f..7fcdb6214 100644 --- a/equinox-shipping/Watchdog/Args.fs +++ b/equinox-shipping/Watchdog/Args.fs @@ -1,5 +1,5 @@ /// Commandline arguments and/or secrets loading specifications -module Shipping.Infrastructure.Args +module Args open System open FSharp.Control diff --git a/equinox-shipping/Watchdog/Handler.fs 
b/equinox-shipping/Watchdog/Handler.fs index a29050fa3..e07cb8123 100644 --- a/equinox-shipping/Watchdog/Handler.fs +++ b/equinox-shipping/Watchdog/Handler.fs @@ -1,6 +1,5 @@ module Shipping.Watchdog.Handler -open Shipping.Infrastructure open System [] diff --git a/equinox-shipping/Watchdog/Infrastructure.fs b/equinox-shipping/Watchdog/Infrastructure.fs index 3d672ef22..4f1d4051a 100644 --- a/equinox-shipping/Watchdog/Infrastructure.fs +++ b/equinox-shipping/Watchdog/Infrastructure.fs @@ -70,7 +70,7 @@ type Equinox.DynamoStore.DynamoStoreConnector with member x.CreateClient() = x.LogConfiguration() - x.CreateDynamoDbClient() |> Equinox.DynamoStore.DynamoStoreClient + x.CreateDynamoStoreClient() type Equinox.DynamoStore.DynamoStoreClient with diff --git a/equinox-shipping/Watchdog/Program.fs b/equinox-shipping/Watchdog/Program.fs index 4395e2817..032a2dc08 100644 --- a/equinox-shipping/Watchdog/Program.fs +++ b/equinox-shipping/Watchdog/Program.fs @@ -1,7 +1,6 @@ module Shipping.Watchdog.Program open Serilog -open Shipping.Infrastructure open System module Args = @@ -69,7 +68,7 @@ module Args = let buildSourceConfig _log groupName = let startFromTail, maxItems, tailSleepInterval, lagFrequency = a.MonitoringParams let checkpointConfig = CosmosFeedConfig.Persistent (groupName, startFromTail, maxItems, lagFrequency) - SourceConfig.Cosmos (monitored, leases, checkpointConfig, tailSleepInterval) + SourceConfig.Cosmos (monitored, leases, checkpointConfig, tailSleepInterval, x.StatsInterval) let store = Store.Config.Cosmos (context, cache) store, buildSourceConfig, Equinox.CosmosStore.Core.Log.InternalMetrics.dump | Choice2Of3 a -> diff --git a/equinox-shipping/Watchdog/SourceArgs.fs b/equinox-shipping/Watchdog/SourceArgs.fs index c1f3e51d2..506bffb93 100644 --- a/equinox-shipping/Watchdog/SourceArgs.fs +++ b/equinox-shipping/Watchdog/SourceArgs.fs @@ -2,7 +2,6 @@ module Shipping.Watchdog.SourceArgs open Argu open Shipping.Domain // Config etc -open Shipping.Infrastructure 
// Args etc open Serilog open System @@ -38,7 +37,7 @@ module Cosmos = | RetriesWaitTime _ -> "specify max wait-time for retry when being throttled by Cosmos in seconds. Default: 30." | LeaseContainer _ -> "specify Container Name (in this [target] Database) for Leases container. Default: `SourceContainer` + `-aux`." - | FromTail _ -> "(iff the Consumer Name is fresh) - force skip to present Position. Default: Never skip an event." + | FromTail -> "(iff the Consumer Name is fresh) - force skip to present Position. Default: Never skip an event." | MaxItems _ -> "maximum item count to supply for the Change Feed query. Default: use response size limit" | LagFreqM _ -> "specify frequency (minutes) to dump lag stats. Default: 1" @@ -92,7 +91,7 @@ module Dynamo = | IndexTable _ -> "specify a table name for the index store. (optional if environment variable " + Args.INDEX_TABLE + " specified. default: `Table`+`IndexSuffix`)" | IndexSuffix _ -> "specify a suffix for the index store. (optional if environment variable " + Args.INDEX_TABLE + " specified. default: \"-index\")" | MaxItems _ -> "maximum events to load in a batch. Default: 100" - | FromTail _ -> "(iff the Consumer Name is fresh) - force skip to present Position. Default: Never skip an event." + | FromTail -> "(iff the Consumer Name is fresh) - force skip to present Position. Default: Never skip an event." | StreamsDop _ -> "parallelism when loading events from Store Feed Source. 
Default 4" type Arguments(c: Configuration, p: ParseResults) = diff --git a/equinox-shipping/Watchdog/SourceConfig.fs b/equinox-shipping/Watchdog/SourceConfig.fs index fdcd19954..96e62c11d 100644 --- a/equinox-shipping/Watchdog/SourceConfig.fs +++ b/equinox-shipping/Watchdog/SourceConfig.fs @@ -1,4 +1,4 @@ -namespace Shipping.Infrastructure +namespace global open System open System.Threading.Tasks @@ -10,6 +10,7 @@ type SourceConfig = * leasesContainer: Microsoft.Azure.Cosmos.Container * checkpoints: CosmosFeedConfig * tailSleepInterval: TimeSpan + * statsInterval: TimeSpan | Dynamo of indexContext: Equinox.DynamoStore.DynamoStoreContext * checkpoints: Propulsion.Feed.IFeedCheckpointStore * loading: Propulsion.DynamoStore.EventLoadMode @@ -31,33 +32,32 @@ and [] CosmosFeedConfig = module SourceConfig = module Memory = open Propulsion.MemoryStore - let start log (sink: Propulsion.Sinks.Sink) (categories: string[]) - (store: Equinox.MemoryStore.VolatileStore<_>): Propulsion.Pipeline * (TimeSpan -> Task) option = + let start log (sink: Propulsion.Sinks.SinkPipeline) (categories: string[]) + (store: Equinox.MemoryStore.VolatileStore<_>): Propulsion.Pipeline * (TimeSpan -> Task) = let source = MemoryStoreSource(log, store, categories, sink) - source.Start(), Some (fun _propagationDelay -> source.Monitor.AwaitCompletion(ignoreSubsequent = false)) + source.Start(), fun _propagationDelay -> source.Monitor.AwaitCompletion(ignoreSubsequent = false) module Cosmos = open Propulsion.CosmosStore - let start log (sink: Propulsion.Sinks.Sink) categories - (monitoredContainer, leasesContainer, checkpointConfig, tailSleepInterval): Propulsion.Pipeline * (TimeSpan -> Task) option = - let parseFeedDoc = EquinoxSystemTextJsonParser.enumCategoryEvents categories - let observer = CosmosStoreSource.CreateObserver(log, sink.StartIngester, Seq.collect parseFeedDoc) + let start log (sink: Propulsion.Sinks.SinkPipeline) categories + (monitoredContainer, leasesContainer, checkpointConfig, 
tailSleepInterval, statsInterval): Propulsion.Pipeline * (TimeSpan -> Task) = + let parseFeedDoc = EquinoxSystemTextJsonParser.ofCategories categories let source = match checkpointConfig with | Ephemeral processorName -> let withStartTime1sAgo (x: Microsoft.Azure.Cosmos.ChangeFeedProcessorBuilder) = x.WithStartTime(let t = DateTime.UtcNow in t.AddSeconds -1.) let lagFrequency = TimeSpan.FromMinutes 1. - CosmosStoreSource.Start(log, monitoredContainer, leasesContainer, processorName, observer, - startFromTail = true, customize = withStartTime1sAgo, tailSleepInterval = tailSleepInterval, - lagReportFreq = lagFrequency) + CosmosStoreSource(log, statsInterval, monitoredContainer, leasesContainer, processorName, parseFeedDoc, sink, + startFromTail = true, customize = withStartTime1sAgo, tailSleepInterval = tailSleepInterval, + lagEstimationInterval = lagFrequency).Start() | Persistent (processorName, startFromTail, maxItems, lagFrequency) -> - CosmosStoreSource.Start(log, monitoredContainer, leasesContainer, processorName, observer, - startFromTail = startFromTail, ?maxItems = maxItems, tailSleepInterval = tailSleepInterval, - lagReportFreq = lagFrequency) - source, None + CosmosStoreSource(log, statsInterval, monitoredContainer, leasesContainer, processorName, parseFeedDoc, sink, + startFromTail = startFromTail, ?maxItems = maxItems, tailSleepInterval = tailSleepInterval, + lagEstimationInterval = lagFrequency).Start() + source, fun propagationDelay -> source.Monitor.AwaitCompletion(propagationDelay, ignoreSubsequent = false) module Dynamo = open Propulsion.DynamoStore - let create (log, storeLog) (sink: Propulsion.Sinks.Sink) categories + let create (log, storeLog) (sink: Propulsion.Sinks.SinkPipeline) categories (indexContext, checkpoints, loadMode, startFromTail, batchSizeCutoff, tailSleepInterval, statsInterval) trancheIds = DynamoStoreSource( log, statsInterval, @@ -65,27 +65,27 @@ module SourceConfig = checkpoints, sink, loadMode, categories = categories, 
startFromTail = startFromTail, storeLog = storeLog, ?trancheIds = trancheIds) let start (log, storeLog) sink categories (indexContext, checkpoints, loadMode, startFromTail, batchSizeCutoff, tailSleepInterval, statsInterval) - : Propulsion.Pipeline * (TimeSpan -> Task) option = + : Propulsion.Pipeline * (TimeSpan -> Task) = let source = create (log, storeLog) sink categories (indexContext, checkpoints, loadMode, startFromTail, batchSizeCutoff, tailSleepInterval, statsInterval) None let source = source.Start() - source, Some (fun propagationDelay -> source.Monitor.AwaitCompletion(propagationDelay, ignoreSubsequent = false)) + source, fun propagationDelay -> source.Monitor.AwaitCompletion(propagationDelay, ignoreSubsequent = false) module Esdb = open Propulsion.EventStoreDb - let start log (sink: Propulsion.Sinks.Sink) categories - (client, checkpoints, withData, startFromTail, batchSize, tailSleepInterval, statsInterval): Propulsion.Pipeline * (TimeSpan -> Task) option = + let start log (sink: Propulsion.Sinks.SinkPipeline) categories + (client, checkpoints, withData, startFromTail, batchSize, tailSleepInterval, statsInterval): Propulsion.Pipeline * (TimeSpan -> Task) = let source = EventStoreSource( log, statsInterval, client, batchSize, tailSleepInterval, checkpoints, sink, categories, withData = withData, startFromTail = startFromTail) let source = source.Start() - source, Some (fun propagationDelay -> source.Monitor.AwaitCompletion(propagationDelay, ignoreSubsequent = false)) + source, fun propagationDelay -> source.Monitor.AwaitCompletion(propagationDelay, ignoreSubsequent = false) - let start (log, storeLog) sink categories: SourceConfig -> Propulsion.Pipeline * (TimeSpan -> Task) option = function + let start (log, storeLog) sink categories: SourceConfig -> Propulsion.Pipeline * (TimeSpan -> Task) = function | SourceConfig.Memory volatileStore -> Memory.start log sink categories volatileStore - | SourceConfig.Cosmos (monitored, leases, checkpointConfig, 
tailSleepInterval) -> - Cosmos.start log sink categories (monitored, leases, checkpointConfig, tailSleepInterval) + | SourceConfig.Cosmos (monitored, leases, checkpointConfig, tailSleepInterval, statsInterval) -> + Cosmos.start log sink categories (monitored, leases, checkpointConfig, tailSleepInterval, statsInterval) | SourceConfig.Dynamo (indexContext, checkpoints, loadMode, startFromTail, batchSizeCutoff, tailSleepInterval, statsInterval) -> Dynamo.start (log, storeLog) sink categories (indexContext, checkpoints, loadMode, startFromTail, batchSizeCutoff, tailSleepInterval, statsInterval) | SourceConfig.Esdb (client, checkpoints, withData, startFromTail, batchSize, tailSleepInterval, statsInterval) -> diff --git a/equinox-shipping/Watchdog/Watchdog.fsproj b/equinox-shipping/Watchdog/Watchdog.fsproj index 46dae6dd2..16f70c74f 100644 --- a/equinox-shipping/Watchdog/Watchdog.fsproj +++ b/equinox-shipping/Watchdog/Watchdog.fsproj @@ -18,10 +18,10 @@ - - - - + + + + diff --git a/equinox-testbed/Testbed.fsproj b/equinox-testbed/Testbed.fsproj index b93983171..41424f81a 100644 --- a/equinox-testbed/Testbed.fsproj +++ b/equinox-testbed/Testbed.fsproj @@ -17,11 +17,11 @@ - - - - - + + + + + diff --git a/equinox-web-csharp/Domain/Domain.csproj b/equinox-web-csharp/Domain/Domain.csproj index e9b2c86ea..47173e8f1 100755 --- a/equinox-web-csharp/Domain/Domain.csproj +++ b/equinox-web-csharp/Domain/Domain.csproj @@ -5,8 +5,8 @@ - - + + diff --git a/equinox-web-csharp/Web/Web.csproj b/equinox-web-csharp/Web/Web.csproj index 8068b5100..9be8b7486 100755 --- a/equinox-web-csharp/Web/Web.csproj +++ b/equinox-web-csharp/Web/Web.csproj @@ -5,10 +5,10 @@ - - - - + + + + diff --git a/equinox-web/Domain/Domain.fsproj b/equinox-web/Domain/Domain.fsproj index 7c92d8f33..bd6d1f9c3 100644 --- a/equinox-web/Domain/Domain.fsproj +++ b/equinox-web/Domain/Domain.fsproj @@ -17,11 +17,11 @@ - - - - - + + + + + diff --git a/equinox-web/Domain/Store.fs b/equinox-web/Domain/Store.fs index 
b7af1e772..3f8bf67fb 100644 --- a/equinox-web/Domain/Store.fs +++ b/equinox-web/Domain/Store.fs @@ -40,7 +40,7 @@ module Dynamo = let private createCached name codec initial fold accessStrategy (context, cache) = let cacheStrategy = Equinox.CachingStrategy.SlidingWindow (cache, System.TimeSpan.FromMinutes 20.) - Equinox.DynamoStore.DynamoStoreCategory(context, name, FsCodec.Deflate.EncodeUncompressed codec, fold, initial, accessStrategy, cacheStrategy) + Equinox.DynamoStore.DynamoStoreCategory(context, name, FsCodec.Compression.EncodeUncompressed codec, fold, initial, accessStrategy, cacheStrategy) let createSnapshotted name codec initial fold (isOrigin, toSnapshot) (context, cache) = let accessStrategy = Equinox.DynamoStore.AccessStrategy.Snapshot (isOrigin, toSnapshot) diff --git a/equinox-web/Web/Startup.fs b/equinox-web/Web/Startup.fs index 7c09f7c3a..e2d61c312 100644 --- a/equinox-web/Web/Startup.fs +++ b/equinox-web/Web/Startup.fs @@ -60,7 +60,7 @@ module Store = module private Dynamo = open Equinox.DynamoStore let connect (region, table) (timeout, retries) = - let c = DynamoStoreConnector(region, timeout, retries).CreateDynamoDbClient() |> DynamoStoreClient + let c = DynamoStoreConnector(region, timeout, retries).CreateDynamoStoreClient() DynamoStoreContext.Establish(c, table) |> Async.RunSynchronously //#endif diff --git a/equinox-web/Web/Web.fsproj b/equinox-web/Web/Web.fsproj index 05d9b6b5c..3619ad14b 100644 --- a/equinox-web/Web/Web.fsproj +++ b/equinox-web/Web/Web.fsproj @@ -11,12 +11,12 @@ - - + + - + diff --git a/feed-consumer/FeedConsumer.fsproj b/feed-consumer/FeedConsumer.fsproj index 496d1373e..d7eee791c 100644 --- a/feed-consumer/FeedConsumer.fsproj +++ b/feed-consumer/FeedConsumer.fsproj @@ -16,11 +16,11 @@ - - - - - + + + + + diff --git a/feed-source/Domain/Domain.fsproj b/feed-source/Domain/Domain.fsproj index f5727d45d..df4735163 100644 --- a/feed-source/Domain/Domain.fsproj +++ b/feed-source/Domain/Domain.fsproj @@ -14,11 +14,11 @@ - - - - 
- + + + + + diff --git a/feed-source/Domain/TicketsEpoch.fs b/feed-source/Domain/TicketsEpoch.fs index dab6be924..971c1a648 100644 --- a/feed-source/Domain/TicketsEpoch.fs +++ b/feed-source/Domain/TicketsEpoch.fs @@ -5,9 +5,8 @@ /// Each successive epoch is identified by an index, i.e. TicketsEpoch-FC001_0, then TicketsEpoch-FC001_1 module FeedSourceTemplate.Domain.TicketsEpoch -module private Stream = - let [] Category = "TicketsEpoch" - let id = FsCodec.StreamId.gen2 FcId.toString TicketsEpochId.toString +let [] CategoryName = "TicketsEpoch" +let private streamId = FsCodec.StreamId.gen2 FcId.toString TicketsEpochId.toString // NB - these types and the union case names reflect the actual storage formats and hence need to be versioned with care [] @@ -72,7 +71,7 @@ let decide capacity candidates (currentIds, closed as state) = { accepted = addedItemIds; residual = residualItems; content = currentIds; closed = closed }, events /// Service used for the write side; manages ingestion of items into the series of epochs -type IngestionService internal (capacity, resolve: FcId * TicketsEpochId -> Equinox.Decider) = +type IngestionService internal (capacity, resolve: struct (FcId * TicketsEpochId) -> Equinox.Decider) = /// Handles idempotent deduplicated insertion into the set of items held within the epoch member _.Ingest(fcId, epochId, ticketIds): Async = @@ -93,10 +92,10 @@ type IngestionService internal (capacity, resolve: FcId * TicketsEpochId -> Equi module Factory = let private create_ capacity resolve = - IngestionService(capacity, Stream.id >> resolve) + IngestionService(capacity, streamId >> resolve) let private (|Category|) = function - | Store.Config.Memory store -> Store.Memory.create Stream.Category Events.codec Fold.initial Fold.fold store - | Store.Config.Cosmos (context, cache) -> Store.Cosmos.createSnapshotted Stream.Category Events.codec Fold.initial Fold.fold Fold.Snapshot.config (context, cache) + | Store.Config.Memory store -> Store.Memory.create 
CategoryName Events.codec Fold.initial Fold.fold store + | Store.Config.Cosmos (context, cache) -> Store.Cosmos.createSnapshotted CategoryName Events.codec Fold.initial Fold.fold Fold.Snapshot.config (context, cache) let create capacity (Category cat) = Store.createDecider cat |> create_ capacity /// Custom Fold and caching logic compared to the IngesterService @@ -115,7 +114,7 @@ module Reader = type StateDto = { closed: bool; tickets: Events.Item[] } - type Service internal (resolve: FcId * TicketsEpochId -> Equinox.Decider) = + type Service internal (resolve: struct (FcId * TicketsEpochId) -> Equinox.Decider) = /// Returns all the items currently held in the stream member _.Read(fcId, epochId): Async = @@ -125,6 +124,6 @@ module Reader = module Factory = let private (|Category|) = function - | Store.Config.Memory store -> Store.Memory.create Stream.Category Events.codec initial fold store - | Store.Config.Cosmos (context, cache) -> Store.Cosmos.createUnoptimized Stream.Category Events.codec initial fold (context, cache) - let create (Category cat) = Service(Stream.id >> Store.createDecider cat) + | Store.Config.Memory store -> Store.Memory.create CategoryName Events.codec initial fold store + | Store.Config.Cosmos (context, cache) -> Store.Cosmos.createUnoptimized CategoryName Events.codec initial fold (context, cache) + let create (Category cat) = Service(streamId >> Store.createDecider cat) diff --git a/feed-source/Domain/TicketsIngester.fs b/feed-source/Domain/TicketsIngester.fs index be52d2bab..35ac96d40 100644 --- a/feed-source/Domain/TicketsIngester.fs +++ b/feed-source/Domain/TicketsIngester.fs @@ -46,11 +46,11 @@ type ServiceForFc internal (log: Serilog.ILogger, fcId, epochs: TicketsEpoch.Ing return! Async.Parallel(seq { for epochId in (max 0 (%startingId - lookBack)) .. 
(%startingId - 1) -> readEpoch %epochId }, loadDop) } // Tickets cache - used to maintain a list of tickets that have already been ingested in order to avoid db round-trips - let previousTickets: AsyncCacheCell = + let previousTickets: TaskCell = let aux = async { let! batches = loadPreviousEpochs 4 return IdsCache.Create(Seq.concat batches) } - AsyncCacheCell(fun ct -> Async.StartAsTask(aux, cancellationToken = ct)) + TaskCell(fun ct -> Async.StartAsTask(aux, cancellationToken = ct)) let tryIngest items = async { let! previousTickets = previousTickets.Await() diff --git a/feed-source/Domain/TicketsSeries.fs b/feed-source/Domain/TicketsSeries.fs index 961897c7d..0fe819dcd 100644 --- a/feed-source/Domain/TicketsSeries.fs +++ b/feed-source/Domain/TicketsSeries.fs @@ -4,9 +4,8 @@ /// Can also be used to walk back through time to visit every ticket there has ever been for correlation purposes module FeedSourceTemplate.Domain.TicketsSeries -module private Stream = - let [] Category = "Tickets" - let id = FsCodec.StreamId.gen TicketsSeriesId.toString +let [] private CategoryName = "Tickets" +let private streamId = FsCodec.StreamId.gen TicketsSeriesId.toString // NB - these types and the union case names reflect the actual storage formats and hence need to be versioned with care [] @@ -69,8 +68,8 @@ module Factory = // For now we have a single global sequence. 
This provides us an extension point should we ever need to reprocess // NOTE we use a custom id in order to isolate data for acceptance tests let seriesId = defaultArg seriesId TicketsSeriesId.wellKnownId - Service(seriesId, Stream.id >> resolve) + Service(seriesId, streamId >> resolve) let private (|Category|) = function - | Store.Config.Memory store -> Store.Memory.create Stream.Category Events.codec Fold.initial Fold.fold store - | Store.Config.Cosmos (context, cache) -> Store.Cosmos.createSnapshotted Stream.Category Events.codec Fold.initial Fold.fold (Fold.isOrigin, Fold.toSnapshot) (context, cache) + | Store.Config.Memory store -> Store.Memory.create CategoryName Events.codec Fold.initial Fold.fold store + | Store.Config.Cosmos (context, cache) -> Store.Cosmos.createSnapshotted CategoryName Events.codec Fold.initial Fold.fold (Fold.isOrigin, Fold.toSnapshot) (context, cache) let create seriesOverride (Category cat) = create_ seriesOverride (Store.createDecider cat) diff --git a/feed-source/Domain/Types.fs b/feed-source/Domain/Types.fs index c03f8bb49..cb5134be4 100644 --- a/feed-source/Domain/Types.fs +++ b/feed-source/Domain/Types.fs @@ -57,4 +57,4 @@ module TicketsCheckpoint = let toEpochAndOffset (value: TicketsCheckpoint): TicketsEpochId * int = let d, r = System.Math.DivRem(%value, 1_000_000L) - (%int %d: TicketsEpochId), int r + (%int %d: TicketsEpochId), int r \ No newline at end of file diff --git a/feed-source/FeedApi/FeedApi.fsproj b/feed-source/FeedApi/FeedApi.fsproj index 66c22718e..a95cf5cb1 100644 --- a/feed-source/FeedApi/FeedApi.fsproj +++ b/feed-source/FeedApi/FeedApi.fsproj @@ -19,7 +19,7 @@ - + diff --git a/periodic-ingester/Ingester.fs b/periodic-ingester/Ingester.fs index 74c56321a..d7fd60738 100644 --- a/periodic-ingester/Ingester.fs +++ b/periodic-ingester/Ingester.fs @@ -30,17 +30,16 @@ module PipelineEvent = (* Each item fed into the Sink has a StreamName associated with it, just as with a regular source based on a change feed *) - 
let [] Category = "Ticket" - let id = FsCodec.StreamId.gen TicketId.toString - let decodeId = FsCodec.StreamId.dec TicketId.parse - let name = id >> FsCodec.StreamName.create Category - let [] (|For|_|) = FsCodec.StreamName.tryFind Category >> ValueOption.map decodeId + let [] CategoryName = "Ticket" + let private streamId = FsCodec.StreamId.gen TicketId.toString + let private catId = CategoryId(CategoryName, streamId, FsCodec.StreamId.dec TicketId.parse) + let [] (|For|_|) = catId.TryDecode (* Each item per stream is represented as an event; if multiple events have been found for a given stream, they are delivered together *) let private dummyEventData = let dummyEventType, noBody = "eventType", Unchecked.defaultof<_> in FsCodec.Core.EventData.Create(dummyEventType, noBody) let sourceItemOfTicketIdAndData struct (id: TicketId, data: TicketData): Propulsion.Feed.SourceItem = - { streamName = name id; eventData = dummyEventData; context = box data } + { streamName = catId.StreamName id; eventData = dummyEventData; context = box data } let [] (|TicketEvents|_|) = function | For ticketId, (s: Propulsion.Sinks.Event[]) -> ValueSome (ticketId, s |> Seq.map (fun e -> Unchecked.unbox e.Context)) diff --git a/periodic-ingester/PeriodicIngester.fsproj b/periodic-ingester/PeriodicIngester.fsproj index 9fbaaa7eb..710f612e0 100644 --- a/periodic-ingester/PeriodicIngester.fsproj +++ b/periodic-ingester/PeriodicIngester.fsproj @@ -17,11 +17,11 @@ - - + + - - + + diff --git a/periodic-ingester/Types.fs b/periodic-ingester/Types.fs index 68588c3c3..16d517e54 100644 --- a/periodic-ingester/Types.fs +++ b/periodic-ingester/Types.fs @@ -9,5 +9,10 @@ module TicketId = let parse (value: string): TicketId = let raw = value in % raw let (|Parse|) = parse +/// Handles symmetric generation and decoding of StreamNames composed of a series of elements via the FsCodec.StreamId helpers +type internal CategoryId<'elements>(name, gen: 'elements -> FsCodec.StreamId, dec: FsCodec.StreamId -> 
'elements) = + member _.StreamName = gen >> FsCodec.StreamName.create name + member _.TryDecode = FsCodec.StreamName.tryFind name >> ValueOption.map dec + [] type IngestionOutcome = Changed | Unchanged | Stale diff --git a/propulsion-archiver/Archiver.fsproj b/propulsion-archiver/Archiver.fsproj index b85a11dc4..b80dd2445 100644 --- a/propulsion-archiver/Archiver.fsproj +++ b/propulsion-archiver/Archiver.fsproj @@ -14,8 +14,8 @@ - - + + diff --git a/propulsion-archiver/Handler.fs b/propulsion-archiver/Handler.fs index 2c0f8f549..ede0db445 100644 --- a/propulsion-archiver/Handler.fs +++ b/propulsion-archiver/Handler.fs @@ -19,9 +19,8 @@ let (|Archivable|NotArchivable|) = function NotArchivable let selectArchivable changeFeedDocument: Propulsion.Sinks.StreamEvent seq = seq { - for struct (s, _e) as batch in Propulsion.CosmosStore.EquinoxSystemTextJsonParser.enumStreamEvents categoryFilter changeFeedDocument do - let (FsCodec.StreamName.Category cat) = s - match cat with - | Archivable -> yield batch + for s, _e as se in Propulsion.CosmosStore.EquinoxSystemTextJsonParser.whereCategory categoryFilter changeFeedDocument do + match FsCodec.StreamName.Category.ofStreamName s with + | Archivable -> yield se | NotArchivable -> () } diff --git a/propulsion-archiver/Program.fs b/propulsion-archiver/Program.fs index ba7a867b8..667d74ee2 100644 --- a/propulsion-archiver/Program.fs +++ b/propulsion-archiver/Program.fs @@ -169,10 +169,9 @@ let build (args: Args.Arguments) = purgeInterval = TimeSpan.FromMinutes 10., maxBytes = maxBytes) let monitored, leases = args.Source.ConnectFeed() |> Async.RunSynchronously let source = - let observer = CosmosStoreSource.CreateObserver(log, archiverSink.StartIngester, Seq.collect Handler.selectArchivable) let startFromTail, maxItems, lagFrequency = args.Source.MonitoringParams - CosmosStoreSource.Start(log, monitored, leases, processorName, observer, - startFromTail = startFromTail, ?maxItems = maxItems, lagReportFreq = lagFrequency) + 
CosmosStoreSource(log, args.StatsInterval, monitored, leases, processorName, Handler.selectArchivable, archiverSink, + startFromTail = startFromTail, ?maxItems = maxItems, lagEstimationInterval = lagFrequency).Start() archiverSink, source // A typical app will likely have health checks etc, implying the wireup would be via `endpoints.MapMetrics()` and thus not use this ugly code directly diff --git a/propulsion-consumer/Consumer.fsproj b/propulsion-consumer/Consumer.fsproj index b0bb9bf2e..70acd13a5 100644 --- a/propulsion-consumer/Consumer.fsproj +++ b/propulsion-consumer/Consumer.fsproj @@ -15,8 +15,8 @@ - - + + diff --git a/propulsion-consumer/Examples.fs b/propulsion-consumer/Examples.fs index 4ca66a22b..896fa7d18 100644 --- a/propulsion-consumer/Examples.fs +++ b/propulsion-consumer/Examples.fs @@ -87,7 +87,8 @@ module MultiStreams = | SavedForLater.Reactions.Decode (id, events) -> let s = match saves.TryGetValue id with true, value -> value | false, _ -> [] SavedForLaterEvents (id, s, events) - | FsCodec.StreamName.Split (categoryName, _), events -> OtherCategory struct (categoryName, Array.length events) + | FsCodec.StreamName.Split (categoryName, _), events -> + OtherCategory struct (categoryName, Array.length events) // each event is guaranteed to only be supplied once by virtue of having been passed through the Streams Scheduler member _.Handle(streamName: FsCodec.StreamName, events: Propulsion.Sinks.Event[]) = async { @@ -124,7 +125,7 @@ module MultiStreams = inherit Propulsion.Streams.Stats(log, statsInterval, stateInterval) let mutable faves, saves = 0, 0 - let otherCats = Stats.CatStats() + let otherCats = Stats.Counters() override _.HandleOk res = res |> function | Faves count -> faves <- faves + count | Saves count -> saves <- saves + count @@ -162,7 +163,7 @@ module MultiMessages = type Processor() = let mutable favorited, unfavorited, saved, removed, cleared = 0, 0, 0, 0, 0 - let cats, keys = Stats.CatStats(), ConcurrentDictionary() + let cats, 
keys = Stats.Counters(), ConcurrentDictionary() // `BatchedConsumer` holds a `Processor` instance per in-flight batch (there will likely be a batch in flight per partition assigned to this consumer) // and waits for the work to complete before calling this @@ -182,7 +183,7 @@ match struct (streamName, Array.ofSeq raw) with | Favorites.Reactions.Decode (_, events) -> yield! events |> Seq.map Fave | SavedForLater.Reactions.Decode (_, events) -> yield! events |> Seq.map Save - | FsCodec.StreamName.Split (otherCategoryName, _), events -> yield OtherCat (otherCategoryName, events.Length) } + | FsCodec.StreamName.Split (otherCategoryName, _), events -> yield OtherCat (otherCategoryName, events.Length) } // NB can be called in parallel, so must be thread-safe member x.Handle(streamName: FsCodec.StreamName, spanJson: string) = diff --git a/propulsion-consumer/Infrastructure.fs b/propulsion-consumer/Infrastructure.fs index ff9312659..13edc297b 100644 --- a/propulsion-consumer/Infrastructure.fs +++ b/propulsion-consumer/Infrastructure.fs @@ -10,7 +10,7 @@ module Streams = let private renderBody (x: Propulsion.Sinks.EventBody) = System.Text.Encoding.UTF8.GetString(x.Span) // Uses the supplied codec to decode the supplied event record (iff at LogEventLevel.Debug, failures are logged, citing `stream` and `.Data`) let private tryDecode<'E> (codec: Propulsion.Sinks.Codec<'E>) (streamName: FsCodec.StreamName) event = - match codec.TryDecode event with + match codec.Decode event with | ValueNone when Log.IsEnabled Serilog.Events.LogEventLevel.Debug -> Log.ForContext("eventData", renderBody event.Data) .Debug("Codec {type} Could not decode {eventType} in {stream}", codec.GetType().FullName, event.EventType, streamName) diff --git a/propulsion-dynamostore-cdk/DynamoStore.Cdk.fsproj b/propulsion-dynamostore-cdk/DynamoStore.Cdk.fsproj index b6a3f75fd..86315b83b 100644 --- a/propulsion-dynamostore-cdk/DynamoStore.Cdk.fsproj +++ 
b/propulsion-dynamostore-cdk/DynamoStore.Cdk.fsproj @@ -21,9 +21,9 @@ - - - + + + diff --git a/propulsion-hotel/Domain/Domain.fsproj b/propulsion-hotel/Domain/Domain.fsproj index 4e209668a..a9f4c96bd 100644 --- a/propulsion-hotel/Domain/Domain.fsproj +++ b/propulsion-hotel/Domain/Domain.fsproj @@ -9,10 +9,10 @@ - - - - + + + + diff --git a/propulsion-hotel/Domain/GroupCheckout.fs b/propulsion-hotel/Domain/GroupCheckout.fs index 044dc416f..f0c473411 100644 --- a/propulsion-hotel/Domain/GroupCheckout.fs +++ b/propulsion-hotel/Domain/GroupCheckout.fs @@ -1,14 +1,12 @@ module Domain.GroupCheckout -module private Stream = - let [] Category = "GroupCheckout" - let id = FsCodec.StreamId.gen GroupCheckoutId.toString - let decodeId = FsCodec.StreamId.dec GroupCheckoutId.parse - let tryDecode = FsCodec.StreamName.tryFind Category >> ValueOption.map decodeId +let [] private CategoryName = "GroupCheckout" +let private streamId = FsCodec.StreamId.gen GroupCheckoutId.toString +let private catId = CategoryId(CategoryName, streamId, FsCodec.StreamId.dec GroupCheckoutId.parse) module Reactions = - let [] categoryName = Stream.Category - let [] (|For|_|) = Stream.tryDecode + let [] categoryName = CategoryName + let [] (|For|_|) = catId.TryDecode module Events = @@ -124,9 +122,9 @@ module Factory = let private (|Category|) = function | Store.Config.Memory store -> - Store.Memory.create Stream.Category Events.codec Fold.initial Fold.fold store + Store.Memory.create CategoryName Events.codec Fold.initial Fold.fold store | Store.Config.Dynamo (context, cache) -> - Store.Dynamo.createUnoptimized Stream.Category Events.codec Fold.initial Fold.fold (context, cache) + Store.Dynamo.createUnoptimized CategoryName Events.codec Fold.initial Fold.fold (context, cache) | Store.Config.Mdb (context, cache) -> - Store.Mdb.createUnoptimized Stream.Category Events.codec Fold.initial Fold.fold (context, cache) - let create (Category cat) = Stream.id >> Store.createDecider cat |> Service + 
Store.Mdb.createUnoptimized CategoryName Events.codec Fold.initial Fold.fold (context, cache) + let create (Category cat) = streamId >> Store.createDecider cat |> Service diff --git a/propulsion-hotel/Domain/GuestStay.fs b/propulsion-hotel/Domain/GuestStay.fs index a0ba03848..8adff6494 100644 --- a/propulsion-hotel/Domain/GuestStay.fs +++ b/propulsion-hotel/Domain/GuestStay.fs @@ -1,8 +1,7 @@ module Domain.GuestStay -module private Stream = - let [] Category = "GuestStay" - let id = FsCodec.StreamId.gen GuestStayId.toString +let [] private CategoryName = "GuestStay" +let private streamId = FsCodec.StreamId.gen GuestStayId.toString module Events = @@ -39,7 +38,7 @@ module Fold = | Events.Paid e -> Active { bal with balance = bal.balance - e.amount; payments = [| yield! bal.payments; e.paymentId |] } | Events.CheckedOut _ -> Closed | Events.TransferredToGroup e -> TransferredToGroup {| groupId = e.groupId; amount = e.residualBalance |} - | Closed _ | TransferredToGroup _ -> invalidOp "No events allowed after CheckedOut/TransferredToGroup" + | Closed | TransferredToGroup _ -> invalidOp "No events allowed after CheckedOut/TransferredToGroup" let fold: State -> Events.Event seq -> State = Seq.fold evolve module Decide = @@ -49,16 +48,16 @@ module Decide = let checkin at = function | Active { checkedInAt = None } -> [ Events.CheckedIn {| at = at |} ] | Active { checkedInAt = Some t } when t = at -> [] - | Active _ | Closed _ | TransferredToGroup _ -> invalidOp "Invalid checkin" + | Active _ | Closed | TransferredToGroup _ -> invalidOp "Invalid checkin" let charge at chargeId amount = function - | Closed _ | TransferredToGroup _ -> invalidOp "Cannot record charge for Closed account" + | Closed | TransferredToGroup _ -> invalidOp "Cannot record charge for Closed account" | Active bal -> if bal.charges |> Array.contains chargeId then [||] else [| Events.Charged {| at = at; chargeId = chargeId; amount = amount |} |] let payment at paymentId amount = function - | Closed _ | 
TransferredToGroup _ -> invalidOp "Cannot record payment for not opened account" // TODO fix message at source + | Closed | TransferredToGroup _ -> invalidOp "Cannot record payment for not opened account" // TODO fix message at source | Active bal -> if bal.payments |> Array.contains paymentId then [||] else [| Events.Paid {| at = at; paymentId = paymentId; amount = amount |} |] @@ -102,9 +101,9 @@ module Factory = let private (|Category|) = function | Store.Config.Memory store -> - Store.Memory.create Stream.Category Events.codec Fold.initial Fold.fold store + Store.Memory.create CategoryName Events.codec Fold.initial Fold.fold store | Store.Config.Dynamo (context, cache) -> - Store.Dynamo.createUnoptimized Stream.Category Events.codec Fold.initial Fold.fold (context, cache) + Store.Dynamo.createUnoptimized CategoryName Events.codec Fold.initial Fold.fold (context, cache) | Store.Config.Mdb (context, cache) -> - Store.Mdb.createUnoptimized Stream.Category Events.codec Fold.initial Fold.fold (context, cache) - let create (Category cat) = Service(Stream.id >> Store.createDecider cat) + Store.Mdb.createUnoptimized CategoryName Events.codec Fold.initial Fold.fold (context, cache) + let create (Category cat) = Service(streamId >> Store.createDecider cat) diff --git a/propulsion-hotel/Domain/Store.fs b/propulsion-hotel/Domain/Store.fs index 05cea16f2..09abb56e3 100644 --- a/propulsion-hotel/Domain/Store.fs +++ b/propulsion-hotel/Domain/Store.fs @@ -14,7 +14,7 @@ module Codec = module Memory = let create name codec initial fold store: Equinox.Category<_, _, _> = - Equinox.MemoryStore.MemoryStoreCategory(store, name, FsCodec.Deflate.EncodeUncompressed codec, fold, initial) + Equinox.MemoryStore.MemoryStoreCategory(store, name, FsCodec.Compression.EncodeUncompressed codec, fold, initial) let private defaultCacheDuration = System.TimeSpan.FromMinutes 20 let private cacheStrategy cache = Equinox.CachingStrategy.SlidingWindow (cache, defaultCacheDuration) @@ -24,7 +24,7 @@ 
module Dynamo = open Equinox.DynamoStore let private create name codec initial fold accessStrategy (context, cache) = - DynamoStoreCategory(context, name, FsCodec.Deflate.EncodeUncompressed codec, fold, initial, accessStrategy, cacheStrategy cache) + DynamoStoreCategory(context, name, FsCodec.Compression.EncodeUncompressed codec, fold, initial, accessStrategy, cacheStrategy cache) let createUnoptimized name codec initial fold (context, cache) = let accessStrategy = AccessStrategy.Unoptimized diff --git a/propulsion-hotel/Domain/Types.fs b/propulsion-hotel/Domain/Types.fs index 51ac41574..a19069661 100644 --- a/propulsion-hotel/Domain/Types.fs +++ b/propulsion-hotel/Domain/Types.fs @@ -26,6 +26,11 @@ type PaymentId = Guid type DateTimeOffset = System.DateTimeOffset type HashSet<'t> = System.Collections.Generic.HashSet<'t> +/// Handles symmetric generation and decoding of StreamNames composed of a series of elements via the FsCodec.StreamId helpers +type internal CategoryId<'elements>(name, gen: 'elements -> FsCodec.StreamId, dec: FsCodec.StreamId -> 'elements) = + member _.StreamName = gen >> FsCodec.StreamName.create name + member _.TryDecode = FsCodec.StreamName.tryFind name >> ValueOption.map dec + [] module DeciderExtensions = diff --git a/propulsion-hotel/README.md b/propulsion-hotel/README.md index 941a0a2c6..e27d1f63b 100644 --- a/propulsion-hotel/README.md +++ b/propulsion-hotel/README.md @@ -98,7 +98,7 @@ coordinate the transfer of the balance of the GuestStay balance onto the only the Stays that have not yet had either a success or failure outcome recorded for them. -## Reading Your Writes / `OverrideWritePosition` +## Reading Your Writes / `OverrideNextIndex` The process is not necessarily dependent on the Reactor being able to immediately see the events notified on the change feed. 
@@ -108,7 +108,7 @@ Examples: Handler attempts to read `GroupCheckout` state_ but that event has not yet propagated to the node from which the Handler reads_, the State will be `Flow.Action.Ready 0`, and the Handler will immediately yield a - `SpanResult.OverrideWritePosition 0` + `SpanResult.OverrideNextIndex 0` 2. if the read _does_ include the `StaysSelected`, then the State will be `Flow.Action.MergeStays { stays = (6 stayIds) }`. The @@ -117,7 +117,7 @@ Examples: the result is `StaysMerged { stays = [{stayId = ..., residual = ...], ...]}` and `MergesFailed { stays = [ (stayid) ]}`. After writing those two events to the stream, the version has moved from `1` to `3`, resulting in - `SpanResult.OverrideWritePosition 3`. + `SpanResult.OverrideNextIndex 3`. This implies one of the following two possible outcomes: @@ -125,7 +125,7 @@ Examples: discard events 1 and 2 on receipt from the change feed, without even entering the streams buffer (and no further handler invocations take place) - 2. At the point where Propulsion sees the `OverrideWritePosition`, events + 2. At the point where Propulsion sees the `OverrideNextIndex`, events and/or 2 have already been read and buffered ready for dispatch. 
In this case, the events are removed from the buffer immediately (and no further handler invocations take place) diff --git a/propulsion-hotel/Reactor.Integration/ReactorFixture.fs b/propulsion-hotel/Reactor.Integration/ReactorFixture.fs index 988b6e288..d24349870 100644 --- a/propulsion-hotel/Reactor.Integration/ReactorFixture.fs +++ b/propulsion-hotel/Reactor.Integration/ReactorFixture.fs @@ -35,10 +35,7 @@ type FixtureBase(messageSink, store, dumpStats, createSourceConfig) = if stats.StatsInterval.RemainingMs > 3000 then stats.StatsInterval.Trigger() stats.StatsInterval.SleepUntilTriggerCleared() - member _.Await(propagationDelay) = - match awaitReactions with - | Some f -> f propagationDelay |> Async.ofTask - | None -> async { () } + member _.Await(propagationDelay) = awaitReactions propagationDelay |> Async.ofTask interface IDisposable with diff --git a/propulsion-hotel/Reactor/Infrastructure.fs b/propulsion-hotel/Reactor/Infrastructure.fs index 568571989..7ad6c071e 100644 --- a/propulsion-hotel/Reactor/Infrastructure.fs +++ b/propulsion-hotel/Reactor/Infrastructure.fs @@ -35,7 +35,7 @@ type Equinox.DynamoStore.DynamoStoreConnector with member x.CreateClient() = x.LogConfiguration() - x.CreateDynamoDbClient() |> Equinox.DynamoStore.DynamoStoreClient + x.CreateDynamoStoreClient() type Equinox.DynamoStore.DynamoStoreClient with diff --git a/propulsion-hotel/Reactor/Reactor.fsproj b/propulsion-hotel/Reactor/Reactor.fsproj index fc41cdc4b..f1300e8a4 100644 --- a/propulsion-hotel/Reactor/Reactor.fsproj +++ b/propulsion-hotel/Reactor/Reactor.fsproj @@ -18,11 +18,11 @@ - + - - - + + + diff --git a/propulsion-hotel/Reactor/SourceArgs.fs b/propulsion-hotel/Reactor/SourceArgs.fs index fa3b20eff..b9ef56341 100644 --- a/propulsion-hotel/Reactor/SourceArgs.fs +++ b/propulsion-hotel/Reactor/SourceArgs.fs @@ -41,7 +41,7 @@ module Dynamo = | IndexTable _ -> "specify a table name for the index store. (optional if environment variable " + INDEX_TABLE + " specified. 
default: `Table`+`IndexSuffix`)" | IndexSuffix _ -> "specify a suffix for the index store. (optional if environment variable " + INDEX_TABLE + " specified. default: \"-index\")" | MaxItems _ -> "maximum events to load in a batch. Default: 100" - | FromTail _ -> "(iff the Consumer Name is fresh) - force skip to present Position. Default: Never skip an event." + | FromTail -> "(iff the Consumer Name is fresh) - force skip to present Position. Default: Never skip an event." type Arguments(c: Args.Configuration, p: ParseResults) = let conn = match p.TryGetResult RegionProfile |> Option.orElseWith (fun () -> c.DynamoRegion) with @@ -95,7 +95,7 @@ module Mdb = | CheckpointSchema _ -> $"Schema that should contain the checkpoints table. Optional if environment variable {SCHEMA} is defined" | BatchSize _ -> "maximum events to load in a batch. Default: 1000" | TailSleepIntervalMs _ -> "How long to sleep in ms once the consumer has hit the tail (default: 100ms)" - | FromTail _ -> "(iff the Consumer Name is fresh) - force skip to present Position. Default: Never skip an event." + | FromTail -> "(iff the Consumer Name is fresh) - force skip to present Position. Default: Never skip an event." 
type Arguments(c: Args.Configuration, p: ParseResults) = let writeConnStr = p.TryGetResult ConnectionString |> Option.defaultWith (fun () -> c.MdbConnectionString) let readConnStr = p.TryGetResult ReadConnectionString |> Option.orElseWith (fun () -> c.MdbReadConnectionString) |> Option.defaultValue writeConnStr diff --git a/propulsion-hotel/Reactor/SourceConfig.fs b/propulsion-hotel/Reactor/SourceConfig.fs index ec0009b36..71651d082 100644 --- a/propulsion-hotel/Reactor/SourceConfig.fs +++ b/propulsion-hotel/Reactor/SourceConfig.fs @@ -23,13 +23,13 @@ type SourceConfig = module SourceConfig = module Memory = open Propulsion.MemoryStore - let start log (sink: Propulsion.Sinks.Sink) (categories: string[]) - (store: Equinox.MemoryStore.VolatileStore<_>): Propulsion.Pipeline * (TimeSpan -> Task) option = + let start log (sink: Propulsion.Sinks.SinkPipeline) (categories: string[]) + (store: Equinox.MemoryStore.VolatileStore<_>): Propulsion.Pipeline * (TimeSpan -> Task) = let source = MemoryStoreSource(log, store, categories, sink) - source.Start(), Some (fun _propagationDelay -> source.Monitor.AwaitCompletion(ignoreSubsequent = false)) + source.Start(), fun _propagationDelay -> source.Monitor.AwaitCompletion(ignoreSubsequent = false) module Dynamo = open Propulsion.DynamoStore - let private create (log, storeLog) (sink: Propulsion.Sinks.Sink) categories + let private create (log, storeLog) (sink: Propulsion.Sinks.SinkPipeline) categories (indexContext, checkpoints, loadMode, startFromTail, batchSizeCutoff, tailSleepInterval, statsInterval) trancheIds = DynamoStoreSource( log, statsInterval, @@ -37,14 +37,14 @@ module SourceConfig = checkpoints, sink, loadMode, categories = categories, startFromTail = startFromTail, storeLog = storeLog, ?trancheIds = trancheIds) let start (log, storeLog) sink categories (indexContext, checkpoints, loadMode, startFromTail, batchSizeCutoff, tailSleepInterval, statsInterval) - : Propulsion.Pipeline * (TimeSpan -> Task) option = + : 
Propulsion.Pipeline * (TimeSpan -> Task) = let source = create (log, storeLog) sink categories (indexContext, checkpoints, loadMode, startFromTail, batchSizeCutoff, tailSleepInterval, statsInterval) None let source = source.Start() - source, Some (fun propagationDelay -> source.Monitor.AwaitCompletion(propagationDelay, ignoreSubsequent = false)) + source, fun propagationDelay -> source.Monitor.AwaitCompletion(propagationDelay, ignoreSubsequent = false) module Mdb = open Propulsion.MessageDb let start log sink categories (connectionString, checkpoints, startFromTail, batchSize, tailSleepInterval, statsInterval) - : Propulsion.Pipeline * (TimeSpan -> Task) option = + : Propulsion.Pipeline * (TimeSpan -> Task) = let source = MessageDbSource( log, statsInterval, @@ -52,9 +52,9 @@ module SourceConfig = checkpoints, sink, categories, startFromTail = startFromTail) let source = source.Start() - source, Some (fun propagationDelay -> source.Monitor.AwaitCompletion(propagationDelay, ignoreSubsequent = false)) + source, fun propagationDelay -> source.Monitor.AwaitCompletion(propagationDelay, ignoreSubsequent = false) - let start (log, storeLog) sink categories: SourceConfig -> Propulsion.Pipeline * (TimeSpan -> Task) option = function + let start (log, storeLog) sink categories: SourceConfig -> Propulsion.Pipeline * (TimeSpan -> Task) = function | SourceConfig.Memory volatileStore -> Memory.start log sink categories volatileStore | SourceConfig.Dynamo (indexContext, checkpoints, loading, startFromTail, batchSizeCutoff, tailSleepInterval, statsInterval) -> diff --git a/propulsion-indexer/App/App.fsproj b/propulsion-indexer/App/App.fsproj index 3ec78ee35..d58eee7bd 100644 --- a/propulsion-indexer/App/App.fsproj +++ b/propulsion-indexer/App/App.fsproj @@ -12,9 +12,9 @@ - + - + diff --git a/propulsion-indexer/Domain/Domain.fsproj b/propulsion-indexer/Domain/Domain.fsproj index b71ed65fe..77b64dfd3 100644 --- a/propulsion-indexer/Domain/Domain.fsproj +++ 
b/propulsion-indexer/Domain/Domain.fsproj @@ -7,9 +7,9 @@ - - - + + + diff --git a/propulsion-indexer/Domain/Streams.fs b/propulsion-indexer/Domain/Streams.fs index 87abfc2d6..811e73faf 100644 --- a/propulsion-indexer/Domain/Streams.fs +++ b/propulsion-indexer/Domain/Streams.fs @@ -11,7 +11,7 @@ module Codec = // Uses the supplied codec to decode the supplied event record (iff at LogEventLevel.Debug, failures are logged, citing `stream` and `.Data`) let internal tryDecode<'E> (codec: Propulsion.Sinks.Codec<'E>) (streamName: FsCodec.StreamName) event = - match codec.TryDecode event with + match codec.Decode event with | ValueNone when Log.IsEnabled Serilog.Events.LogEventLevel.Debug -> Log.ForContext("eventData", renderBody event.Data) .Debug("Codec {type} Could not decode {eventType} in {stream}", codec.GetType().FullName, event.EventType, streamName) diff --git a/propulsion-indexer/Indexer/Program.fs b/propulsion-indexer/Indexer/Program.fs index 17542de7f..8252e8433 100644 --- a/propulsion-indexer/Indexer/Program.fs +++ b/propulsion-indexer/Indexer/Program.fs @@ -117,7 +117,7 @@ module Args = | Verbose -> "request Verbose Logging from ChangeFeedProcessor and Store. Default: off" | LeaseContainer _ -> "specify Container Name (in this [target] Database) for Leases container. Default: `` + `-aux`." - | FromTail _ -> "(iff the Consumer Name is fresh) - force skip to present Position. Default: Never skip an event." + | FromTail -> "(iff the Consumer Name is fresh) - force skip to present Position. Default: Never skip an event." | MaxItems _ -> "maximum item count to request from the feed. Default: unlimited." | LagFreqM _ -> "specify frequency (minutes) to dump lag stats. Default: 1" and CosmosArguments(c: Args.Configuration, p: ParseResults) = @@ -156,11 +156,11 @@ let build (args: Args.Arguments) = async { let! 
contexts, monitored, leases = args.ConnectWithFeed(args.IsSnapshotting) let store = (contexts, Equinox.Cache(AppName, sizeMb = 10)) ||> Store.Cosmos.createConfig let parseFeedDoc, sink = - let mkParseAll () = Propulsion.CosmosStore.EquinoxSystemTextJsonParser.enumStreamEvents (fun _ -> true) + let mkParseAll () = Propulsion.CosmosStore.EquinoxSystemTextJsonParser.whereCategory (fun _ -> true) let mkSink stats handle = Factory.StartSink(Log.Logger, stats, maxConcurrentStreams, handle, maxReadAhead) match args.Action with | Args.Action.Index _ -> - let mkParseCats = Propulsion.CosmosStore.EquinoxSystemTextJsonParser.enumCategoryEvents + let mkParseCats = Propulsion.CosmosStore.EquinoxSystemTextJsonParser.ofCategories let stats = Indexer.Stats(Log.Logger, args.StatsInterval, args.StateInterval, args.Cosmos.Verbose) let handle = Indexer.Factory.createHandler store mkParseCats Indexer.sourceCategories, mkSink stats handle @@ -176,10 +176,9 @@ let build (args: Args.Arguments) = async { Log.Logger, maxReadAhead, eventsContext, maxConcurrentStreams, stats, purgeInterval = TimeSpan.FromHours 1, maxBytes = a.MaxBytes) let source = - let observer = Propulsion.CosmosStore.CosmosStoreSource.CreateObserver(Log.Logger, sink.StartIngester, Seq.collect parseFeedDoc) let startFromTail, maxItems, lagFrequency = args.Cosmos.MonitoringParams - Propulsion.CosmosStore.CosmosStoreSource.Start(Log.Logger, monitored, leases, processorName, observer, - startFromTail = startFromTail, ?maxItems = maxItems, lagReportFreq = lagFrequency) + Propulsion.CosmosStore.CosmosStoreSource(Log.Logger, args.StatsInterval, monitored, leases, processorName, parseFeedDoc, sink, + startFromTail = startFromTail, ?maxItems = maxItems, lagEstimationInterval = lagFrequency).Start() return sink, source } open Propulsion.Internal // AwaitKeyboardInterruptAsTaskCanceledException diff --git a/propulsion-projector/Handler.fs b/propulsion-projector/Handler.fs index b9bf3aa77..7813bef31 100644 --- 
a/propulsion-projector/Handler.fs +++ b/propulsion-projector/Handler.fs @@ -4,9 +4,7 @@ open Propulsion.Internal //#if cosmos #if parallelOnly // Here we pass the items directly through to the handler without parsing them -let mapToStreamItems (x: System.Collections.Generic.IReadOnlyCollection<'a>): seq<'a> = upcast x let categories = [||] // TODO add category names -#else // cosmos && !parallelOnly #endif // !parallelOnly //#endif // cosmos @@ -15,7 +13,8 @@ let categories = [||] // TODO add category names type ExampleOutput = { id: string } let serdes = FsCodec.SystemTextJson.Options.Default |> FsCodec.SystemTextJson.Serdes -let render (doc: System.Text.Json.JsonDocument) = +let render ((_s,e): Propulsion.Sinks.StreamEvent) = + let doc = unbox e.Context let r = doc.RootElement let gs (name: string) = let x = r.GetProperty name in x.GetString() let equinoxPartition, itemId = gs "p", gs "id" diff --git a/propulsion-projector/Infrastructure.fs b/propulsion-projector/Infrastructure.fs index 8e88e92dc..05ccdaab7 100644 --- a/propulsion-projector/Infrastructure.fs +++ b/propulsion-projector/Infrastructure.fs @@ -69,7 +69,7 @@ module Dynamo = let defaultCacheDuration = TimeSpan.FromMinutes 20. 
let private createCached name codec initial fold accessStrategy (context, cache) = let cacheStrategy = Equinox.CachingStrategy.SlidingWindow (cache, defaultCacheDuration) - DynamoStoreCategory(context, name, FsCodec.Deflate.EncodeTryDeflate codec, fold, initial, accessStrategy, cacheStrategy) + DynamoStoreCategory(context, name, FsCodec.Compression.EncodeTryCompress codec, fold, initial, accessStrategy, cacheStrategy) let createSnapshotted name codec initial fold (isOrigin, toSnapshot) (context, cache) = let accessStrategy = AccessStrategy.Snapshot (isOrigin, toSnapshot) @@ -82,7 +82,7 @@ type Equinox.DynamoStore.DynamoStoreConnector with member x.CreateClient() = x.LogConfiguration() - x.CreateDynamoDbClient() |> Equinox.DynamoStore.DynamoStoreClient + x.CreateDynamoStoreClient() type Equinox.DynamoStore.DynamoStoreClient with diff --git a/propulsion-projector/Program.fs b/propulsion-projector/Program.fs index e69291cf4..f8e9de2d2 100644 --- a/propulsion-projector/Program.fs +++ b/propulsion-projector/Program.fs @@ -18,12 +18,12 @@ module Args = | [] Broker of string | [] Topic of string #endif -#if cosmos +// #if cosmos | [] Cosmos of ParseResults -#endif -// #if dynamo - | [] Dynamo of ParseResults // #endif +#if dynamo + | [] Dynamo of ParseResults +#endif #if esdb | [] Esdb of ParseResults #endif @@ -40,12 +40,12 @@ module Args = | Broker _ -> "specify Kafka Broker, in host:port format. (optional if environment variable PROPULSION_KAFKA_BROKER specified)" | Topic _ -> "specify Kafka Topic Id. (optional if environment variable PROPULSION_KAFKA_TOPIC specified)" #endif -#if cosmos +// #if cosmos | Cosmos _ -> "specify CosmosDb input parameters" -#endif -// #if dynamo - | Dynamo _ -> "specify DynamoDb input parameters" // #endif +#if dynamo + | Dynamo _ -> "specify DynamoDb input parameters" +#endif #if esdb | Esdb _ -> "specify EventStore input parameters." 
#endif @@ -64,12 +64,12 @@ module Args = processorName, maxReadAhead, maxConcurrentProcessors) (processorName, maxReadAhead, maxConcurrentProcessors) member val Store = match p.GetSubCommand() with -#if cosmos +// #if cosmos | Cosmos p -> SourceArgs.Cosmos.Arguments(c, p) -#endif -// #if dynamo - | Dynamo p -> SourceArgs.Dynamo.Arguments(c, p) // #endif +#if dynamo + | Dynamo p -> SourceArgs.Dynamo.Arguments(c, p) +#endif #if esdb | Esdb p -> SourceArgs.Esdb.Arguments(c, p) #endif @@ -87,15 +87,15 @@ module Args = let cache = Equinox.Cache (appName, sizeMb = x.CacheSizeMb) match x.Store with | a -> -#if cosmos +// #if cosmos let monitored, leases = a.ConnectFeed() |> Async.RunSynchronously let buildSourceConfig log groupName = let startFromTail, maxItems, tailSleepInterval, lagFrequency = a.MonitoringParams let checkpointConfig = CosmosFeedConfig.Persistent (groupName, startFromTail, maxItems, lagFrequency) - SourceConfig.Cosmos (monitored, leases, checkpointConfig, tailSleepInterval) + SourceConfig.Cosmos (monitored, leases, checkpointConfig, tailSleepInterval, x.StatsInterval) buildSourceConfig, x.Sink, ignore -#endif -// #if dynamo +// #endif +#if dynamo let context = a.Connect() let buildSourceConfig log groupName = let indexContext, startFromTail, batchSizeCutoff, tailSleepInterval, streamsDop = a.MonitoringParams(log) @@ -103,7 +103,7 @@ module Args = let load = Propulsion.DynamoStore.WithData (streamsDop, context) SourceConfig.Dynamo (indexContext, checkpoints, load, startFromTail, batchSizeCutoff, tailSleepInterval, x.StatsInterval) buildSourceConfig, x.Sink, Equinox.DynamoStore.Core.Log.InternalMetrics.dump -// #endif +#endif #if esdb let connection = a.Connect(appName, EventStore.Client.NodePreference.Leader) let targetStore = a.ConnectTarget(cache) @@ -165,15 +165,14 @@ let build (args: Args.Arguments) = #if (cosmos && parallelOnly) // Custom logic for establishing the source, as we're not projecting StreamEvents - TODO could probably be generalized let 
source = - let mapToStreamItems (x: System.Collections.Generic.IReadOnlyCollection<'a>): seq<'a> = upcast x - let observer = Propulsion.CosmosStore.CosmosStoreSource.CreateObserver(Log.Logger, sink.StartIngester, Handler.mapToStreamItems) - match buildSourceConfig Log.Logger consumerGroupName with SourceConfig.Cosmos (monitoredContainer, leasesContainer, checkpoints, tailSleepInterval: TimeSpan) -> + match buildSourceConfig Log.Logger consumerGroupName with SourceConfig.Cosmos (monitoredContainer, leasesContainer, checkpoints, tailSleepInterval: TimeSpan, statsInterval) -> match checkpoints with | Ephemeral _ -> failwith "Unexpected" | Persistent (processorName, startFromTail, maxItems, lagFrequency) -> - Propulsion.CosmosStore.CosmosStoreSource.Start(Log.Logger, monitoredContainer, leasesContainer, consumerGroupName, observer, - startFromTail = startFromTail, ?maxItems=maxItems, lagReportFreq=lagFrequency) + let parseFeedDoc (x: System.Text.Json.JsonDocument) = seq { struct (FsCodec.StreamName.Internal.trust null, FsCodec.Core.TimelineEvent.Create(0L, "dummyEventType", data = ReadOnlyMemory.Empty, context = box x)) } + Propulsion.CosmosStore.CosmosStoreSource(Log.Logger, statsInterval, monitoredContainer, leasesContainer, processorName, parseFeedDoc, sink, + startFromTail = startFromTail, ?maxItems = maxItems, lagEstimationInterval = lagFrequency).Start() #else let source, _awaitReactions = let sourceConfig = buildSourceConfig Log.Logger consumerGroupName diff --git a/propulsion-projector/Projector.fsproj b/propulsion-projector/Projector.fsproj index c49562620..550027810 100644 --- a/propulsion-projector/Projector.fsproj +++ b/propulsion-projector/Projector.fsproj @@ -20,17 +20,17 @@ - - + + - - - - - - + + + + + + - + diff --git a/propulsion-projector/SourceArgs.fs b/propulsion-projector/SourceArgs.fs index c86f5362c..b54a0594e 100644 --- a/propulsion-projector/SourceArgs.fs +++ b/propulsion-projector/SourceArgs.fs @@ -54,7 +54,7 @@ module Cosmos = | 
RetriesWaitTime _ -> "specify max wait-time for retry when being throttled by Cosmos in seconds. Default: 30." | LeaseContainer _ -> "specify Container Name (in this [target] Database) for Leases container. Default: `SourceContainer` + `-aux`." - | FromTail _ -> "(iff the Consumer Name is fresh) - force skip to present Position. Default: Never skip an event." + | FromTail -> "(iff the Consumer Name is fresh) - force skip to present Position. Default: Never skip an event." | MaxItems _ -> "maximum item count to supply for the Change Feed query. Default: use response size limit" | LagFreqM _ -> "specify frequency (minutes) to dump lag stats. Default: 1" type Arguments(c: Args.Configuration, p: ParseResults) = @@ -109,7 +109,7 @@ module Dynamo = | IndexTable _ -> "specify a table name for the index store. (optional if environment variable " + Args.INDEX_TABLE + " specified. default: `Table`+`IndexSuffix`)" | IndexSuffix _ -> "specify a suffix for the index store. (optional if environment variable " + Args.INDEX_TABLE + " specified. default: \"-index\")" | MaxItems _ -> "maximum events to load in a batch. Default: 100" - | FromTail _ -> "(iff the Consumer Name is fresh) - force skip to present Position. Default: Never skip an event." + | FromTail -> "(iff the Consumer Name is fresh) - force skip to present Position. Default: Never skip an event." | StreamsDop _ -> "parallelism when loading events from Store Feed Source. 
Default 4" type Arguments(c: Configuration, p: ParseResults) = diff --git a/propulsion-projector/SourceConfig.fs b/propulsion-projector/SourceConfig.fs index 9f90c0bbb..0c3086663 100644 --- a/propulsion-projector/SourceConfig.fs +++ b/propulsion-projector/SourceConfig.fs @@ -5,13 +5,14 @@ open System.Threading.Tasks [] type SourceConfig = -#if (cosmos) +// #if cosmos | Cosmos of monitoredContainer: Microsoft.Azure.Cosmos.Container * leasesContainer: Microsoft.Azure.Cosmos.Container * checkpoints: CosmosFeedConfig * tailSleepInterval: TimeSpan -#endif -// #if dynamo + * statsInterval: TimeSpan +// #endif +#if dynamo | Dynamo of indexContext: Equinox.DynamoStore.DynamoStoreContext * checkpoints: Propulsion.Feed.IFeedCheckpointStore * loading: Propulsion.DynamoStore.EventLoadMode @@ -19,7 +20,7 @@ type SourceConfig = * batchSizeCutoff: int * tailSleepInterval: TimeSpan * statsInterval: TimeSpan -// #endif +#endif #if esdb | Esdb of client: EventStore.Client.EventStoreClient * checkpoints: Propulsion.Feed.IFeedCheckpointStore @@ -48,30 +49,29 @@ module SourceConfig = // #if cosmos module Cosmos = open Propulsion.CosmosStore - let start log (sink: Propulsion.Sinks.Sink) categories - (monitoredContainer, leasesContainer, checkpointConfig, tailSleepInterval): Propulsion.Pipeline * (TimeSpan -> Task) option = - let parseFeedDoc = EquinoxSystemTextJsonParser.enumCategoryEvents categories - let observer = CosmosStoreSource.CreateObserver(log, sink.StartIngester, Seq.collect parseFeedDoc) + let start log (sink: Propulsion.Sinks.SinkPipeline) categories + (monitoredContainer, leasesContainer, checkpointConfig, tailSleepInterval, statsInterval): Propulsion.Pipeline * (TimeSpan -> Task) = + let parseFeedDoc = EquinoxSystemTextJsonParser.ofCategories categories let source = match checkpointConfig with | Ephemeral processorName -> let withStartTime1sAgo (x: Microsoft.Azure.Cosmos.ChangeFeedProcessorBuilder) = x.WithStartTime(let t = DateTime.UtcNow in t.AddSeconds -1.) 
let lagFrequency = TimeSpan.FromMinutes 1. - CosmosStoreSource.Start(log, monitoredContainer, leasesContainer, processorName, observer, - startFromTail = true, customize = withStartTime1sAgo, tailSleepInterval = tailSleepInterval, - lagReportFreq = lagFrequency) + CosmosStoreSource(log, statsInterval, monitoredContainer, leasesContainer, processorName, parseFeedDoc, sink, + startFromTail = true, customize = withStartTime1sAgo, tailSleepInterval = tailSleepInterval, + lagEstimationInterval = lagFrequency).Start() | Persistent (processorName, startFromTail, maxItems, lagFrequency) -> - CosmosStoreSource.Start(log, monitoredContainer, leasesContainer, processorName, observer, - startFromTail = startFromTail, ?maxItems = maxItems, tailSleepInterval = tailSleepInterval, - lagReportFreq = lagFrequency) - source, None + CosmosStoreSource(log, statsInterval, monitoredContainer, leasesContainer, processorName, parseFeedDoc, sink, + startFromTail = startFromTail, ?maxItems = maxItems, tailSleepInterval = tailSleepInterval, + lagEstimationInterval = lagFrequency).Start() + source, fun propagationDelay -> source.Monitor.AwaitCompletion(propagationDelay, ignoreSubsequent = false) // #endif -// #if dynamo +#if dynamo module Dynamo = open Propulsion.DynamoStore - let start (log, storeLog) (sink: Propulsion.Sinks.Sink) categories - (indexContext, checkpoints, loadMode, startFromTail, batchSizeCutoff, tailSleepInterval, statsInterval): Propulsion.Pipeline * (TimeSpan -> Task) option = + let start (log, storeLog) (sink: Propulsion.Sinks.SinkPipeline) categories + (indexContext, checkpoints, loadMode, startFromTail, batchSizeCutoff, tailSleepInterval, statsInterval): Propulsion.Pipeline * (TimeSpan -> Task) = let source = DynamoStoreSource( log, statsInterval, @@ -79,43 +79,43 @@ module SourceConfig = checkpoints, sink, loadMode, categories = categories, startFromTail = startFromTail, storeLog = storeLog) let source = source.Start() - source, Some (fun propagationDelay -> 
source.Monitor.AwaitCompletion(propagationDelay, ignoreSubsequent = false)) -// #endif + source, fun propagationDelay -> source.Monitor.AwaitCompletion(propagationDelay, ignoreSubsequent = false) +#endif #if esdb module Esdb = open Propulsion.EventStoreDb - let start log (sink: Propulsion.Sinks.Sink) (categories: string[]) - (client, checkpoints, withData, startFromTail, batchSize, tailSleepInterval, statsInterval): Propulsion.Pipeline * (TimeSpan -> Task) option = + let start log (sink: Propulsion.Sinks.SinkPipeline) (categories: string[]) + (client, checkpoints, withData, startFromTail, batchSize, tailSleepInterval, statsInterval): Propulsion.Pipeline * (TimeSpan -> Task) = let source = EventStoreSource( log, statsInterval, client, batchSize, tailSleepInterval, checkpoints, sink, categories, withData = withData, startFromTail = startFromTail) let source = source.Start() - source, Some (fun propagationDelay -> source.Monitor.AwaitCompletion(propagationDelay, ignoreSubsequent = false)) + source, fun propagationDelay -> source.Monitor.AwaitCompletion(propagationDelay, ignoreSubsequent = false) #endif #if sss module Sss = open Propulsion.SqlStreamStore - let start log (sink: Propulsion.Sinks.Sink) (categories: string[]) - (client, checkpoints, withData, startFromTail, batchSize, tailSleepInterval, statsInterval): Propulsion.Pipeline * (TimeSpan -> Task) option = + let start log (sink: Propulsion.Sinks.SinkPipeline) (categories: string[]) + (client, checkpoints, withData, startFromTail, batchSize, tailSleepInterval, statsInterval): Propulsion.Pipeline * (TimeSpan -> Task) = let source = SqlStreamStoreSource( log, statsInterval, client, batchSize, tailSleepInterval, checkpoints, sink, categories, withData = withData, startFromTail = startFromTail) let source = source.Start() - source, Some (fun propagationDelay -> source.Monitor.AwaitCompletion(propagationDelay, ignoreSubsequent = false)) + source, fun propagationDelay -> 
source.Monitor.AwaitCompletion(propagationDelay, ignoreSubsequent = false) #endif - let start (log, storeLog) sink categories: SourceConfig -> Propulsion.Pipeline * (TimeSpan -> Task) option = function -#if cosmos - | SourceConfig.Cosmos (monitored, leases, checkpointConfig, tailSleepInterval) -> - Cosmos.start log sink categories (monitored, leases, checkpointConfig, tailSleepInterval) -#endif -// #if dynamo + let start (log, storeLog) sink categories: SourceConfig -> Propulsion.Pipeline * (TimeSpan -> Task) = function +// #if cosmos + | SourceConfig.Cosmos (monitored, leases, checkpointConfig, tailSleepInterval, statsInterval) -> + Cosmos.start log sink categories (monitored, leases, checkpointConfig, tailSleepInterval, statsInterval) +// #endif +#if dynamo | SourceConfig.Dynamo (indexContext, checkpoints, loadMode, startFromTail, batchSizeCutoff, tailSleepInterval, statsInterval) -> Dynamo.start (log, storeLog) sink categories (indexContext, checkpoints, loadMode, startFromTail, batchSizeCutoff, tailSleepInterval, statsInterval) -// #endif +#endif #if esdb | SourceConfig.Esdb (client, checkpoints, withData, startFromTail, batchSize, tailSleepInterval, statsInterval) -> Esdb.start log sink categories (client, checkpoints, withData, startFromTail, batchSize, tailSleepInterval, statsInterval) diff --git a/propulsion-projector/Store.fs b/propulsion-projector/Store.fs index 3f6cec893..e3cdc696e 100644 --- a/propulsion-projector/Store.fs +++ b/propulsion-projector/Store.fs @@ -25,7 +25,7 @@ module Cosmos = module Dynamo = let private createCached name codec initial fold accessStrategy (context, cache): Equinox.Category<_, _, _> = - Equinox.DynamoStore.DynamoStoreCategory(context, name, FsCodec.Deflate.EncodeUncompressed codec, fold, initial, accessStrategy, cacheStrategy cache) + Equinox.DynamoStore.DynamoStoreCategory(context, name, FsCodec.Compression.EncodeUncompressed codec, fold, initial, accessStrategy, cacheStrategy cache) let createSnapshotted name codec 
initial fold (isOrigin, toSnapshot) (context, cache) = let accessStrategy = Equinox.DynamoStore.AccessStrategy.Snapshot (isOrigin, toSnapshot) diff --git a/propulsion-pruner/Handler.fs b/propulsion-pruner/Handler.fs index a53d07b4f..f7c03af7b 100644 --- a/propulsion-pruner/Handler.fs +++ b/propulsion-pruner/Handler.fs @@ -26,8 +26,8 @@ let shouldPrune category (age: TimeSpan) = // NOTE - DANGEROUS - events submitted to the CosmosPruner get removed from the supplied Context! let selectPrunable changeFeedDocument: Propulsion.Streams.StreamEvent<_> seq = seq { let asOf = DateTimeOffset.UtcNow - for s, e in Propulsion.CosmosStore.EquinoxSystemTextJsonParser.enumStreamEvents categoryFilter changeFeedDocument do - let (FsCodec.StreamName.Category cat) = s + for s, e in Propulsion.CosmosStore.EquinoxSystemTextJsonParser.whereCategory categoryFilter changeFeedDocument do + let cat = FsCodec.StreamName.Category.ofStreamName s let age = asOf - e.Timestamp if shouldPrune cat age then yield s, removeDataAndMeta e diff --git a/propulsion-pruner/Program.fs b/propulsion-pruner/Program.fs index 0b838ace2..0da680d40 100644 --- a/propulsion-pruner/Program.fs +++ b/propulsion-pruner/Program.fs @@ -158,10 +158,9 @@ let build (args: Args.Arguments, log: ILogger) = CosmosStorePruner.Start(Log.Logger, args.MaxReadAhead, eventsContext, args.MaxWriters, stats) let monitored, leases = archive.ConnectFeed() |> Async.RunSynchronously let source = - let observer = CosmosStoreSource.CreateObserver(log.ForContext(), deletingEventsSink.StartIngester, Seq.collect Handler.selectPrunable) let startFromTail, maxItems, lagFrequency = args.Source.MonitoringParams - CosmosStoreSource.Start(log, monitored, leases, processorName, observer, - startFromTail = startFromTail, ?maxItems = maxItems, lagReportFreq = lagFrequency) + CosmosStoreSource(log, args.StatsInterval, monitored, leases, processorName, Handler.selectPrunable, deletingEventsSink, + startFromTail = startFromTail, ?maxItems = maxItems, 
lagEstimationInterval = lagFrequency).Start() deletingEventsSink, source // A typical app will likely have health checks etc, implying the wireup would be via `endpoints.MapMetrics()` and thus not use this ugly code directly diff --git a/propulsion-pruner/Pruner.fsproj b/propulsion-pruner/Pruner.fsproj index 7dd9c5ed1..1844d9eac 100644 --- a/propulsion-pruner/Pruner.fsproj +++ b/propulsion-pruner/Pruner.fsproj @@ -14,9 +14,9 @@ - + - + diff --git a/propulsion-reactor/Args.fs b/propulsion-reactor/Args.fs index f94b95b59..7c941a84a 100644 --- a/propulsion-reactor/Args.fs +++ b/propulsion-reactor/Args.fs @@ -55,7 +55,7 @@ module Cosmos = | [] RetriesWaitTime of float interface IArgParserTemplate with member p.Usage = p |> function - | Verbose _ -> "request verbose logging." + | Verbose -> "request verbose logging." | ConnectionMode _ -> "override the connection mode. Default: Direct." | Connection _ -> "specify a connection string for a Cosmos account. (optional if environment variable EQUINOX_COSMOS_CONNECTION specified)" | Database _ -> "specify a database name for Cosmos store. 
(optional if environment variable EQUINOX_COSMOS_DATABASE specified)" diff --git a/propulsion-reactor/Contract.fs b/propulsion-reactor/Contract.fs index 9827b7d9b..c6df63162 100644 --- a/propulsion-reactor/Contract.fs +++ b/propulsion-reactor/Contract.fs @@ -19,9 +19,9 @@ let ofState (state: Todo.Fold.State): SummaryInfo = #if blank module Input = - let [] Category = "CategoryName" + let [] CategoryName = "CategoryName" let decodeId = FsCodec.StreamId.dec ClientId.parse - let tryDecode = FsCodec.StreamName.tryFind Category >> ValueOption.map decodeId + let tryDecode = FsCodec.StreamName.tryFind CategoryName >> ValueOption.map decodeId type Value = { field: int } type Event = diff --git a/propulsion-reactor/Handler.fs b/propulsion-reactor/Handler.fs index aad5f8b20..280ef6cfa 100644 --- a/propulsion-reactor/Handler.fs +++ b/propulsion-reactor/Handler.fs @@ -42,7 +42,7 @@ let generate stream version summary = Propulsion.Codec.NewtonsoftJson.RenderedSummary.ofStreamEvent stream version event #if blank -let categories = [| Contract.Input.Category |] +let categories = [| Contract.Input.CategoryName |] let handle (produceSummary: Propulsion.Codec.NewtonsoftJson.RenderedSummary -> Async) diff --git a/propulsion-reactor/Infrastructure.fs b/propulsion-reactor/Infrastructure.fs index 2f97f54c8..3c47e6921 100644 --- a/propulsion-reactor/Infrastructure.fs +++ b/propulsion-reactor/Infrastructure.fs @@ -27,7 +27,7 @@ module Streams = let private renderBody (x: Propulsion.Sinks.EventBody) = System.Text.Encoding.UTF8.GetString(x.Span) // Uses the supplied codec to decode the supplied event record (iff at LogEventLevel.Debug, failures are logged, citing `stream` and `.Data`) let private tryDecode<'E> (codec: Propulsion.Sinks.Codec<'E>) (streamName: FsCodec.StreamName) event = - match codec.TryDecode event with + match codec.Decode event with | ValueNone when Log.IsEnabled Serilog.Events.LogEventLevel.Debug -> Log.ForContext("eventData", renderBody event.Data) .Debug("Codec {type} 
Could not decode {eventType} in {stream}", codec.GetType().FullName, event.EventType, streamName) @@ -100,7 +100,7 @@ module Dynamo = let defaultCacheDuration = TimeSpan.FromMinutes 20. let private createCached name codec initial fold accessStrategy (context, cache) = let cacheStrategy = Equinox.CachingStrategy.SlidingWindow (cache, defaultCacheDuration) - DynamoStoreCategory(context, name, FsCodec.Deflate.EncodeTryDeflate codec, fold, initial, accessStrategy, cacheStrategy) + DynamoStoreCategory(context, name, FsCodec.Compression.EncodeTryCompress codec, fold, initial, accessStrategy, cacheStrategy) let createSnapshotted name codec initial fold (isOrigin, toSnapshot) (context, cache) = let accessStrategy = AccessStrategy.Snapshot (isOrigin, toSnapshot) @@ -113,7 +113,7 @@ type Equinox.DynamoStore.DynamoStoreConnector with member x.CreateClient() = x.LogConfiguration() - x.CreateDynamoDbClient() |> Equinox.DynamoStore.DynamoStoreClient + x.CreateDynamoStoreClient() type Equinox.DynamoStore.DynamoStoreClient with diff --git a/propulsion-reactor/Program.fs b/propulsion-reactor/Program.fs index b0e06a73f..aeebbb5fd 100644 --- a/propulsion-reactor/Program.fs +++ b/propulsion-reactor/Program.fs @@ -106,7 +106,7 @@ module Args = let buildSourceConfig log groupName = let startFromTail, maxItems, tailSleepInterval, lagFrequency = a.MonitoringParams let checkpointConfig = CosmosFeedConfig.Persistent (groupName, startFromTail, maxItems, lagFrequency) - SourceConfig.Cosmos (monitored, leases, checkpointConfig, tailSleepInterval) + SourceConfig.Cosmos (monitored, leases, checkpointConfig, tailSleepInterval, x.StatsInterval) let store = Store.Config.Cosmos (context, cache) #if blank let targetStore = store diff --git a/propulsion-reactor/Reactor.fsproj b/propulsion-reactor/Reactor.fsproj index d7b8376db..93cb72244 100644 --- a/propulsion-reactor/Reactor.fsproj +++ b/propulsion-reactor/Reactor.fsproj @@ -35,14 +35,14 @@ - - - - - - + + + + + + - + diff --git 
a/propulsion-reactor/SourceArgs.fs b/propulsion-reactor/SourceArgs.fs index 5c67e0e62..5fb332df3 100644 --- a/propulsion-reactor/SourceArgs.fs +++ b/propulsion-reactor/SourceArgs.fs @@ -106,7 +106,7 @@ module Cosmos = | Retries _ -> "specify operation retries. Default: 9." | RetriesWaitTime _ -> "specify max wait-time for retry when being throttled by Cosmos in seconds. Default: 30." | LeaseContainer _ -> "specify Container Name (in this [target] Database) for Leases container. Default: `SourceContainer` + `-aux`." - | FromTail _ -> "(iff the Consumer Name is fresh) - force skip to present Position. Default: Never skip an event." + | FromTail -> "(iff the Consumer Name is fresh) - force skip to present Position. Default: Never skip an event." | MaxItems _ -> "maximum item count to supply for the Change Feed query. Default: use response size limit" | LagFreqM _ -> "specify frequency (minutes) to dump lag stats. Default: 1" #if !(kafka && blank) @@ -177,7 +177,7 @@ module Dynamo = | IndexTable _ -> "specify a table name for the index store. (optional if environment variable " + Args.INDEX_TABLE + " specified. default: `Table`+`IndexSuffix`)" | IndexSuffix _ -> "specify a suffix for the index store. (optional if environment variable " + Args.INDEX_TABLE + " specified. default: \"-index\")" | MaxItems _ -> "maximum events to load in a batch. Default: 100" - | FromTail _ -> "(iff the Consumer Name is fresh) - force skip to present Position. Default: Never skip an event." + | FromTail -> "(iff the Consumer Name is fresh) - force skip to present Position. Default: Never skip an event." | StreamsDop _ -> "parallelism when loading events from Store Feed Source. Default 4" #if !(kafka && blank) | Cosmos _ -> "CosmosDb Sink parameters." 
diff --git a/propulsion-reactor/SourceConfig.fs b/propulsion-reactor/SourceConfig.fs index 9b650e099..0069f275e 100644 --- a/propulsion-reactor/SourceConfig.fs +++ b/propulsion-reactor/SourceConfig.fs @@ -9,6 +9,7 @@ type SourceConfig = * leasesContainer: Microsoft.Azure.Cosmos.Container * checkpoints: CosmosFeedConfig * tailSleepInterval: TimeSpan + * statsInterval: TimeSpan | Dynamo of indexContext: Equinox.DynamoStore.DynamoStoreContext * checkpoints: Propulsion.Feed.IFeedCheckpointStore * loading: Propulsion.DynamoStore.EventLoadMode @@ -37,28 +38,27 @@ and [] CosmosFeedConfig = module SourceConfig = module Cosmos = open Propulsion.CosmosStore - let start log (sink: Propulsion.Sinks.Sink) categories - (monitoredContainer, leasesContainer, checkpointConfig, tailSleepInterval): Propulsion.Pipeline * (TimeSpan -> Task) option = - let parseFeedDoc = EquinoxSystemTextJsonParser.enumCategoryEvents categories - let observer = CosmosStoreSource.CreateObserver(log, sink.StartIngester, Seq.collect parseFeedDoc) + let start log (sink: Propulsion.Sinks.SinkPipeline) categories + (monitoredContainer, leasesContainer, checkpointConfig, tailSleepInterval, statsInterval): Propulsion.Pipeline * (TimeSpan -> Task) = + let parseFeedDoc = EquinoxSystemTextJsonParser.ofCategories categories let source = match checkpointConfig with | Ephemeral processorName -> let withStartTime1sAgo (x: Microsoft.Azure.Cosmos.ChangeFeedProcessorBuilder) = x.WithStartTime(let t = DateTime.UtcNow in t.AddSeconds -1.) let lagFrequency = TimeSpan.FromMinutes 1. 
- CosmosStoreSource.Start(log, monitoredContainer, leasesContainer, processorName, observer, - startFromTail = true, customize = withStartTime1sAgo, tailSleepInterval = tailSleepInterval, - lagReportFreq = lagFrequency) + CosmosStoreSource(log, statsInterval, monitoredContainer, leasesContainer, processorName, parseFeedDoc, sink, + startFromTail = true, customize = withStartTime1sAgo, tailSleepInterval = tailSleepInterval, + lagEstimationInterval = lagFrequency).Start() | Persistent (processorName, startFromTail, maxItems, lagFrequency) -> - CosmosStoreSource.Start(log, monitoredContainer, leasesContainer, processorName, observer, - startFromTail = startFromTail, ?maxItems = maxItems, tailSleepInterval = tailSleepInterval, - lagReportFreq = lagFrequency) - source, None + CosmosStoreSource(log, statsInterval, monitoredContainer, leasesContainer, processorName, parseFeedDoc, sink, + startFromTail = startFromTail, ?maxItems = maxItems, tailSleepInterval = tailSleepInterval, + lagEstimationInterval = lagFrequency).Start() + source, fun propagationDelay -> source.Monitor.AwaitCompletion(propagationDelay, ignoreSubsequent = false) module Dynamo = open Propulsion.DynamoStore - let start (log, storeLog) (sink: Propulsion.Sinks.Sink) categories - (indexContext, checkpoints, loadMode, startFromTail, batchSizeCutoff, tailSleepInterval, statsInterval): Propulsion.Pipeline * (TimeSpan -> Task) option = + let start (log, storeLog) (sink: Propulsion.Sinks.SinkPipeline) categories + (indexContext, checkpoints, loadMode, startFromTail, batchSizeCutoff, tailSleepInterval, statsInterval): Propulsion.Pipeline * (TimeSpan -> Task) = let source = DynamoStoreSource( log, statsInterval, @@ -66,33 +66,33 @@ module SourceConfig = checkpoints, sink, loadMode, categories = categories, startFromTail = startFromTail, storeLog = storeLog) let source = source.Start() - source, Some (fun propagationDelay -> source.Monitor.AwaitCompletion(propagationDelay, ignoreSubsequent = false)) + source, fun 
propagationDelay -> source.Monitor.AwaitCompletion(propagationDelay, ignoreSubsequent = false) module Esdb = open Propulsion.EventStoreDb - let start log (sink: Propulsion.Sinks.Sink) categories - (client, checkpoints, withData, startFromTail, batchSize, tailSleepInterval, statsInterval): Propulsion.Pipeline * (TimeSpan -> Task) option = + let start log (sink: Propulsion.Sinks.SinkPipeline) categories + (client, checkpoints, withData, startFromTail, batchSize, tailSleepInterval, statsInterval): Propulsion.Pipeline * (TimeSpan -> Task) = let source = EventStoreSource( log, statsInterval, client, batchSize, tailSleepInterval, checkpoints, sink, categories, withData = withData, startFromTail = startFromTail) let source = source.Start() - source, Some (fun propagationDelay -> source.Monitor.AwaitCompletion(propagationDelay, ignoreSubsequent = false)) + source, fun propagationDelay -> source.Monitor.AwaitCompletion(propagationDelay, ignoreSubsequent = false) module Sss = open Propulsion.SqlStreamStore - let start log (sink: Propulsion.Sinks.Sink) categories - (client, checkpoints, withData, startFromTail, batchSize, tailSleepInterval, statsInterval): Propulsion.Pipeline * (TimeSpan -> Task) option = + let start log (sink: Propulsion.Sinks.SinkPipeline) categories + (client, checkpoints, withData, startFromTail, batchSize, tailSleepInterval, statsInterval): Propulsion.Pipeline * (TimeSpan -> Task) = let source = SqlStreamStoreSource( log, statsInterval, client, batchSize, tailSleepInterval, checkpoints, sink, categories, withData = withData, startFromTail = startFromTail) let source = source.Start() - source, Some (fun propagationDelay -> source.Monitor.AwaitCompletion(propagationDelay, ignoreSubsequent = false)) + source, fun propagationDelay -> source.Monitor.AwaitCompletion(propagationDelay, ignoreSubsequent = false) - let start (log, storeLog) sink categories: SourceConfig -> Propulsion.Pipeline * (TimeSpan -> Task) option = function - | SourceConfig.Cosmos 
(monitored, leases, checkpointConfig, tailSleepInterval) -> - Cosmos.start log sink categories (monitored, leases, checkpointConfig, tailSleepInterval) + let start (log, storeLog) sink categories: SourceConfig -> Propulsion.Pipeline * (TimeSpan -> Task) = function + | SourceConfig.Cosmos (monitored, leases, checkpointConfig, tailSleepInterval, statsInterval) -> + Cosmos.start log sink categories (monitored, leases, checkpointConfig, tailSleepInterval, statsInterval) | SourceConfig.Dynamo (indexContext, checkpoints, loadMode, startFromTail, batchSizeCutoff, tailSleepInterval, statsInterval) -> Dynamo.start (log, storeLog) sink categories (indexContext, checkpoints, loadMode, startFromTail, batchSizeCutoff, tailSleepInterval, statsInterval) | SourceConfig.Esdb (client, checkpoints, withData, startFromTail, batchSize, tailSleepInterval, statsInterval) -> diff --git a/propulsion-reactor/Store.fs b/propulsion-reactor/Store.fs index 993b65f31..78d03548c 100644 --- a/propulsion-reactor/Store.fs +++ b/propulsion-reactor/Store.fs @@ -31,7 +31,7 @@ module Dynamo = let private createCached name codec initial fold accessStrategy (context, cache): Equinox.Category<_, _, _> = let cacheStrategy = Equinox.CachingStrategy.SlidingWindow (cache, System.TimeSpan.FromMinutes 20.) 
- Equinox.DynamoStore.DynamoStoreCategory(context, name, FsCodec.Deflate.EncodeUncompressed codec, fold, initial, accessStrategy, cacheStrategy) + Equinox.DynamoStore.DynamoStoreCategory(context, name, FsCodec.Compression.EncodeUncompressed codec, fold, initial, accessStrategy, cacheStrategy) let createSnapshotted name codec initial fold (isOrigin, toSnapshot) (context, cache) = let accessStrategy = Equinox.DynamoStore.AccessStrategy.Snapshot (isOrigin, toSnapshot) diff --git a/propulsion-summary-consumer/Infrastructure.fs b/propulsion-summary-consumer/Infrastructure.fs index 98ca74c5e..65fbe6769 100644 --- a/propulsion-summary-consumer/Infrastructure.fs +++ b/propulsion-summary-consumer/Infrastructure.fs @@ -26,7 +26,7 @@ module Streams = let private renderBody (x: Propulsion.Sinks.EventBody) = System.Text.Encoding.UTF8.GetString(x.Span) // Uses the supplied codec to decode the supplied event record (iff at LogEventLevel.Debug, failures are logged, citing `stream` and `.Data`) let private tryDecode<'E> (codec: Propulsion.Sinks.Codec<'E>) (streamName: FsCodec.StreamName) event = - match codec.TryDecode event with + match codec.Decode event with | ValueNone when Log.IsEnabled Serilog.Events.LogEventLevel.Debug -> Log.ForContext("eventData", renderBody event.Data) .Debug("Codec {type} Could not decode {eventType} in {stream}", codec.GetType().FullName, event.EventType, streamName) diff --git a/propulsion-summary-consumer/SummaryConsumer.fsproj b/propulsion-summary-consumer/SummaryConsumer.fsproj index 542a47f5c..8b69f1aa4 100644 --- a/propulsion-summary-consumer/SummaryConsumer.fsproj +++ b/propulsion-summary-consumer/SummaryConsumer.fsproj @@ -17,9 +17,9 @@ - - - + + + diff --git a/propulsion-sync/Program.fs b/propulsion-sync/Program.fs index 8d875519f..199fc6546 100644 --- a/propulsion-sync/Program.fs +++ b/propulsion-sync/Program.fs @@ -442,10 +442,10 @@ let transformV0 catFilter v0SchemaDocument: Propulsion.Streams.StreamEvent<_> se yield parsed } //#else let 
transformOrFilter catFilter changeFeedDocument: Propulsion.Sinks.StreamEvent seq = seq { - for FsCodec.StreamName.Category cat, _ as x in Propulsion.CosmosStore.EquinoxSystemTextJsonParser.enumStreamEvents catFilter changeFeedDocument do - // NB the `index` needs to be contiguous with existing events - IOW filtering needs to be at stream (and not event) level - if catFilter cat then - yield x } + for se in Propulsion.CosmosStore.EquinoxSystemTextJsonParser.whereCategory catFilter changeFeedDocument do + // NB the `index` needs to be contiguous with existing events, so filtering at stream or category level is OK + // However, individual events must feed through to the Sink or the Scheduler will be awaiting them + yield (*transform*) se } //#endif let [] AppName = "SyncTemplate" @@ -515,14 +515,14 @@ let build (args: Args.Arguments, log) = match args.SourceParams() with | Choice1Of2 (monitored, leases, processorName, startFromTail, maxItems, lagFrequency) -> #if marveleqx - let observer = Propulsion.CosmosStore.CosmosStoreSource.CreateObserver(Log.Logger, sink.StartIngester, Seq.collect (transformV0 streamFilter)) + let parseFeedDoc = transformV0 streamFilter #else - let observer = Propulsion.CosmosStore.CosmosStoreSource.CreateObserver(Log.Logger, sink.StartIngester, Seq.collect (transformOrFilter streamFilter)) + let parseFeedDoc = transformOrFilter streamFilter #endif let source = - Propulsion.CosmosStore.CosmosStoreSource.Start( - Log.Logger, monitored, leases, processorName, observer, startFromTail = startFromTail, - ?maxItems = maxItems, lagReportFreq = lagFrequency) + Propulsion.CosmosStore.CosmosStoreSource( + Log.Logger, args.StatsInterval, monitored, leases, processorName, parseFeedDoc, sink, startFromTail = startFromTail, + ?maxItems = maxItems, lagEstimationInterval = lagFrequency).Start() [ Async.AwaitKeyboardInterruptAsTaskCanceledException(); source.AwaitWithStopOnCancellation(); sink.AwaitWithStopOnCancellation() ] | Choice2Of2 (srcE, spec) -> match 
maybeDstCosmos with diff --git a/propulsion-sync/Sync.fsproj b/propulsion-sync/Sync.fsproj index ae3536fb6..aba94352d 100644 --- a/propulsion-sync/Sync.fsproj +++ b/propulsion-sync/Sync.fsproj @@ -14,10 +14,10 @@ - - + + - + diff --git a/propulsion-tracking-consumer/TrackingConsumer.fsproj b/propulsion-tracking-consumer/TrackingConsumer.fsproj index 5b9c31eef..b05e028e5 100644 --- a/propulsion-tracking-consumer/TrackingConsumer.fsproj +++ b/propulsion-tracking-consumer/TrackingConsumer.fsproj @@ -17,9 +17,9 @@ - - - + + +