From 42e5c578153c3c026337d20aae052fbf5490c2cb Mon Sep 17 00:00:00 2001 From: Ruben Bartelink Date: Sat, 3 Feb 2024 23:16:01 +0000 Subject: [PATCH] Minor cleanups --- equinox-patterns/Domain/Types.fs | 5 ++-- equinox-shipping/Domain/Types.fs | 4 ++- .../Watchdog.Integration/Generators.fs | 2 +- .../Watchdog.Integration/ReactorFixture.fs | 2 +- equinox-web/Domain/Todo.fs | 2 +- feed-source/FeedApi/Program.fs | 3 +-- propulsion-consumer/Examples.fs | 4 +-- propulsion-hotel/Domain/Domain.fsproj | 1 + propulsion-hotel/Domain/GroupCheckout.fs | 2 +- propulsion-hotel/Domain/Infrastructure.fs | 23 ++++++++++++++++ propulsion-hotel/Domain/Types.fs | 27 +++---------------- .../Reactor.Integration/ReactorFixture.fs | 7 +---- propulsion-indexer/Domain/Infrastructure.fs | 16 +++++------ propulsion-indexer/Domain/Store.fs | 5 ++-- propulsion-indexer/Domain/Todo.fs | 2 +- propulsion-reactor/Todo.fs | 25 +++++++++-------- propulsion-sync/Program.fs | 3 +-- 17 files changed, 66 insertions(+), 67 deletions(-) create mode 100644 propulsion-hotel/Domain/Infrastructure.fs diff --git a/equinox-patterns/Domain/Types.fs b/equinox-patterns/Domain/Types.fs index f73c2c57c..2bc5f8a60 100644 --- a/equinox-patterns/Domain/Types.fs +++ b/equinox-patterns/Domain/Types.fs @@ -1,7 +1,6 @@ namespace Patterns.Domain open FSharp.UMX -open System /// Identifies a single period within a temporally linked chain of periods /// Each Period commences with a Balance `BroughtForward` based on what the predecessor Period @@ -44,6 +43,8 @@ module ListSeriesId = let wellKnownId: ListSeriesId = UMX.tag "0" let toString: ListSeriesId -> string = UMX.untag +namespace global + module Guid = - let toStringN (g: Guid) = g.ToString "N" + let toStringN (g: System.Guid) = g.ToString "N" diff --git a/equinox-shipping/Domain/Types.fs b/equinox-shipping/Domain/Types.fs index 38c719b86..52ff745a2 100644 --- a/equinox-shipping/Domain/Types.fs +++ b/equinox-shipping/Domain/Types.fs @@ -19,14 +19,16 @@ module TransactionId = let parse (x: string): TransactionId = %x let (|Parse|) = parse +namespace global + module Seq = let inline chooseV f xs = seq { for x in xs do match f x with ValueSome v -> yield v | ValueNone -> () } module Guid = + let inline gen () = System.Guid.NewGuid() let inline toStringN (x: System.Guid) = x.ToString "N" - let generateStringN () = let g = System.Guid.NewGuid() in toStringN g /// Handles symmetric generation and decoding of StreamNames composed of a series of elements via the FsCodec.StreamId helpers type internal CategoryId<'elements>(name, gen: 'elements -> FsCodec.StreamId, dec: FsCodec.StreamId -> 'elements) = diff --git a/equinox-shipping/Watchdog.Integration/Generators.fs b/equinox-shipping/Watchdog.Integration/Generators.fs index 0f8e730c0..a317fdc2c 100644 --- a/equinox-shipping/Watchdog.Integration/Generators.fs +++ b/equinox-shipping/Watchdog.Integration/Generators.fs @@ -12,6 +12,6 @@ let genDefault<'t> = ArbMap.defaults |> ArbMap.generate<'t> type Custom = - static member GuidStringN() = genDefault |> Gen.map (Shipping.Domain.Guid.toStringN >> GuidStringN) |> Arb.fromGen + static member GuidStringN() = genDefault |> Gen.map (Guid.toStringN >> GuidStringN) |> Arb.fromGen [ |] )>] do() diff --git a/equinox-shipping/Watchdog.Integration/ReactorFixture.fs b/equinox-shipping/Watchdog.Integration/ReactorFixture.fs index 721167abe..8ca5ddc83 100644 --- a/equinox-shipping/Watchdog.Integration/ReactorFixture.fs +++ b/equinox-shipping/Watchdog.Integration/ReactorFixture.fs @@ -9,7 +9,7 @@ open System /// See SerilogLogFixture for details of how to expose complete diagnostic messages type FixtureBase(messageSink, store, dumpStats, createSourceConfig) = let serilogLog = new SerilogLogFixture(messageSink) // create directly to ensure correct sequencing and no loss of messages - let contextId = Shipping.Domain.Guid.generateStringN () + let contextId = Guid.gen () |> Guid.toStringN let manager = let maxDop = 4 Shipping.Domain.FinalizationProcess.Factory.create maxDop store diff --git a/equinox-web/Domain/Todo.fs b/equinox-web/Domain/Todo.fs index 300a7fd44..6f049d5d5 100644 --- a/equinox-web/Domain/Todo.fs +++ b/equinox-web/Domain/Todo.fs @@ -34,7 +34,7 @@ module Fold = let initial = { items = []; nextId = 0 } /// Compute State change implied by a given Event let evolve state = function - | Events.Added item -> { state with items = item :: state.items; nextId = state.nextId + 1 } + | Events.Added item -> { items = item :: state.items; nextId = state.nextId + 1 } | Events.Updated value -> { state with items = state.items |> List.map (function { id = id } when id = value.id -> value | item -> item) } | Events.Deleted e -> { state with items = state.items |> List.filter (fun x -> x.id <> e.id) } | Events.Cleared e -> { nextId = e.nextId; items = [] } diff --git a/feed-source/FeedApi/Program.fs b/feed-source/FeedApi/Program.fs index 173e2fa1c..79703febd 100644 --- a/feed-source/FeedApi/Program.fs +++ b/feed-source/FeedApi/Program.fs @@ -19,8 +19,7 @@ module Args = | [] Verbose | [] Cosmos of ParseResults interface IArgParserTemplate with - member a.Usage = - match a with + member a.Usage = a |> function | Verbose -> "request Verbose Logging. Default: off." | Cosmos _ -> "specify CosmosDB input parameters." and Arguments(config: Configuration, p: ParseResults) = diff --git a/propulsion-consumer/Examples.fs b/propulsion-consumer/Examples.fs index 896fa7d18..d6359cfac 100644 --- a/propulsion-consumer/Examples.fs +++ b/propulsion-consumer/Examples.fs @@ -118,8 +118,8 @@ module MultiStreams = // Dump stats relating to how much information is being held - note it's likely for requests to be in flighht during the call member _.DumpState(log: ILogger) = - log.Information(" Favorited {total}/{users}", faves.Values |> Seq.sumBy (fun x -> x.Count), faves.Count) - log.Information(" SavedForLater {total}/{users}", saves.Values |> Seq.sumBy (fun x -> x.Length), saves.Count) + log.Information(" Favorited {total}/{users}", faves.Values |> Seq.sumBy _.Count, faves.Count) + log.Information(" SavedForLater {total}/{users}", saves.Values |> Seq.sumBy _.Length, saves.Count) type Stats(log, statsInterval, stateInterval) = inherit Propulsion.Streams.Stats(log, statsInterval, stateInterval) diff --git a/propulsion-hotel/Domain/Domain.fsproj b/propulsion-hotel/Domain/Domain.fsproj index a9f4c96bd..2dce5221b 100644 --- a/propulsion-hotel/Domain/Domain.fsproj +++ b/propulsion-hotel/Domain/Domain.fsproj @@ -16,6 +16,7 @@ + diff --git a/propulsion-hotel/Domain/GroupCheckout.fs b/propulsion-hotel/Domain/GroupCheckout.fs index f0c473411..2d6358e5d 100644 --- a/propulsion-hotel/Domain/GroupCheckout.fs +++ b/propulsion-hotel/Domain/GroupCheckout.fs @@ -44,7 +44,7 @@ module Fold = | StaysMerged e -> { removePending (seq { for s in e.residuals -> s.stay }) state with checkedOut = Array.append state.checkedOut e.residuals - balance = state.balance + (e.residuals |> Seq.sumBy (fun x -> x.residual)) } + balance = state.balance + (e.residuals |> Seq.sumBy _.residual) } | MergesFailed e -> { removePending e.stays state with failed = Array.append state.failed e.stays } diff --git a/propulsion-hotel/Domain/Infrastructure.fs b/propulsion-hotel/Domain/Infrastructure.fs new file mode 100644 index 000000000..de3f9c0d8 --- /dev/null +++ b/propulsion-hotel/Domain/Infrastructure.fs @@ -0,0 +1,23 @@ +namespace global + +module Guid = + let gen () = System.Guid.NewGuid() + let parse: string -> System.Guid = System.Guid.Parse + let toStringN (x: System.Guid): string = x.ToString "N" + +type DateTimeOffset = System.DateTimeOffset +type HashSet<'t> = System.Collections.Generic.HashSet<'t> + +/// Handles symmetric generation and decoding of StreamNames composed of a series of elements via the FsCodec.StreamId helpers +type internal CategoryId<'elements>(name, gen: 'elements -> FsCodec.StreamId, dec: FsCodec.StreamId -> 'elements) = + member _.StreamName = gen >> FsCodec.StreamName.create name + member _.TryDecode = FsCodec.StreamName.tryFind name >> ValueOption.map dec + +[] +module DeciderExtensions = + + type Equinox.Decider<'E, 'S> with + + member x.TransactWithPostVersion(decide: 'S -> Async<'R * 'E[]>): Async<'R * int64> = + x.TransactEx((fun c -> decide c.State), + (fun r (c: Equinox.ISyncContext<'S>) -> (r, c.Version))) diff --git a/propulsion-hotel/Domain/Types.fs b/propulsion-hotel/Domain/Types.fs index a19069661..cd319e82b 100644 --- a/propulsion-hotel/Domain/Types.fs +++ b/propulsion-hotel/Domain/Types.fs @@ -1,41 +1,20 @@ namespace Domain open FSharp.UMX -open System - -module Guid = - let toString (x: Guid): string = x.ToString "N" type GroupCheckoutId = Guid and [] groupCheckoutId module GroupCheckoutId = - let toString: GroupCheckoutId -> string = UMX.untag >> Guid.toString - let parse: string -> GroupCheckoutId = Guid.Parse >> UMX.tag + let toString: GroupCheckoutId -> string = UMX.untag >> Guid.toStringN + let parse: string -> GroupCheckoutId = Guid.parse >> UMX.tag type GuestStayId = Guid and [] guestStayId module GuestStayId = - let toString: GuestStayId -> string = UMX.untag >> Guid.toString + let toString: GuestStayId -> string = UMX.untag >> Guid.toStringN type ChargeId = Guid and [] chargeId type PaymentId = Guid and [] paymentId - -type DateTimeOffset = System.DateTimeOffset -type HashSet<'t> = System.Collections.Generic.HashSet<'t> - -/// Handles symmetric generation and decoding of StreamNames composed of a series of elements via the FsCodec.StreamId helpers -type internal CategoryId<'elements>(name, gen: 'elements -> FsCodec.StreamId, dec: FsCodec.StreamId -> 'elements) = - member _.StreamName = gen >> FsCodec.StreamName.create name - member _.TryDecode = FsCodec.StreamName.tryFind name >> ValueOption.map dec - -[] -module DeciderExtensions = - - type Equinox.Decider<'E, 'S> with - - member x.TransactWithPostVersion(decide: 'S -> Async<'R * 'E[]>): Async<'R * int64> = - x.TransactEx((fun c -> decide c.State), - (fun r (c: Equinox.ISyncContext<'S>) -> (r, c.Version))) diff --git a/propulsion-hotel/Reactor.Integration/ReactorFixture.fs b/propulsion-hotel/Reactor.Integration/ReactorFixture.fs index d24349870..48921ac21 100644 --- a/propulsion-hotel/Reactor.Integration/ReactorFixture.fs +++ b/propulsion-hotel/Reactor.Integration/ReactorFixture.fs @@ -1,19 +1,14 @@ namespace Reactor.Integration -open Infrastructure open Propulsion.Internal open Reactor open System -module Guid = - - let generateStringN () = Guid.NewGuid() |> Domain.Guid.toString - /// XUnit Collection Fixture managing setup and disposal of Serilog.Log.Logger, a Reactor instance and the source passed from the concrete fixture /// See SerilogLogFixture for details of how to expose complete diagnostic messages type FixtureBase(messageSink, store, dumpStats, createSourceConfig) = let serilogLog = new SerilogLogFixture(messageSink) // create directly to ensure correct sequencing and no loss of messages - let contextId = Guid.generateStringN () + let contextId = Guid.gen () |> Guid.toStringN let handler = Handler.create store let log = Serilog.Log.Logger let stats = Handler.Stats(log, statsInterval = TimeSpan.FromMinutes 1, stateInterval = TimeSpan.FromMinutes 2, diff --git a/propulsion-indexer/Domain/Infrastructure.fs b/propulsion-indexer/Domain/Infrastructure.fs index 3e11ec691..677d06bc3 100644 --- a/propulsion-indexer/Domain/Infrastructure.fs +++ b/propulsion-indexer/Domain/Infrastructure.fs @@ -12,14 +12,6 @@ module TimeSpan = let seconds value = TimeSpan.FromSeconds value -/// ClientId strongly typed id; represented internally as a Guid; not used for storage so rendering is not significant -type ClientId = Guid -and [] clientId -module ClientId = - let toString (value: ClientId): string = Guid.toStringN %value - let parse (value: string): ClientId = let raw = Guid.Parse value in % raw - let (|Parse|) = parse - type Equinox.Decider<'e, 's> with member x.TransactWithPostVersion(decide: 's -> 'r * 'e[]) = @@ -27,3 +19,11 @@ type Equinox.Decider<'e, 's> with (fun res (c: Equinox.ISyncContext<_>) -> res, c.Version)) type DataMemberAttribute = System.Runtime.Serialization.DataMemberAttribute + +/// ClientId strongly typed id; represented internally as a Guid; not used for storage so rendering is not significant +type ClientId = Guid +and [] clientId +module ClientId = + let toString (value: ClientId): string = Guid.toStringN %value + let parse (value: string): ClientId = let raw = Guid.Parse value in % raw + let (|Parse|) = parse diff --git a/propulsion-indexer/Domain/Store.fs b/propulsion-indexer/Domain/Store.fs index 2240fa018..40369fe84 100644 --- a/propulsion-indexer/Domain/Store.fs +++ b/propulsion-indexer/Domain/Store.fs @@ -2,14 +2,15 @@ module Store module Metrics = - let log = Serilog.Log.ForContext("isMetric", true) + let [] PropertyTag = "isMetric" + let log = Serilog.Log.ForContext(PropertyTag, true) let createDecider cat = Equinox.Decider.forStream Metrics.log cat module Codec = let genJsonElement<'t when 't :> TypeShape.UnionContract.IUnionContract> = - FsCodec.SystemTextJson.CodecJsonElement.Create<'t>() // options = Options.Default + FsCodec.SystemTextJson.CodecJsonElement.Create<'t>() // options = Options.Default /// Implements a Service with a single method that visits the identified stream, with the following possible outcomes: /// 1) stream has a 'current' snapshot (per the `isCurrentSnapshot` predicate supplied to `Snapshot.create` and/or `fold'`:- diff --git a/propulsion-indexer/Domain/Todo.fs b/propulsion-indexer/Domain/Todo.fs index 54598f789..a5980db95 100644 --- a/propulsion-indexer/Domain/Todo.fs +++ b/propulsion-indexer/Domain/Todo.fs @@ -54,7 +54,7 @@ module Fold = /// Compute State change implied by a given Event let evolve s = function - | Events.Added item -> { s with items = item :: s.items; nextId = s.nextId + 1 } + | Events.Added item -> { items = item :: s.items; nextId = s.nextId + 1 } | Events.Updated value -> { s with items = s.items |> List.map (function { id = id } when id = value.id -> value | item -> item) } | Events.Deleted e -> { s with items = s.items |> List.filter (fun x -> x.id <> e.id) } | Events.Cleared e -> { nextId = e.nextId; items = [] } diff --git a/propulsion-reactor/Todo.fs b/propulsion-reactor/Todo.fs index 62437696b..8de3491e5 100644 --- a/propulsion-reactor/Todo.fs +++ b/propulsion-reactor/Todo.fs @@ -2,11 +2,10 @@ module ReactorTemplate.Todo open Propulsion.Internal -module private Stream = - let [] Category = "Todos" - let id = FsCodec.StreamId.gen ClientId.toString - let decodeId = FsCodec.StreamId.dec ClientId.parse - let tryDecode = FsCodec.StreamName.tryFind Category >> ValueOption.map decodeId +let [] CategoryName = "Todos" +let private streamId = FsCodec.StreamId.gen ClientId.toString +let private decodeId = FsCodec.StreamId.dec ClientId.parse +let private tryDecode = FsCodec.StreamName.tryFind CategoryName >> ValueOption.map decodeId // NB - these types and the union case names reflect the actual storage formats and hence need to be versioned with care module Events = @@ -26,13 +25,13 @@ module Events = module Reactions = - let categories = [| Stream.Category |] + let categories = [| CategoryName |] /// Allows us to skip producing summaries for events that we know won't result in an externally discernable change to the summary output let private impliesStateChange = function Events.Snapshotted _ -> false | _ -> true let private dec = Streams.Codec.gen - let [] (|For|_|) = Stream.tryDecode + let [] (|For|_|) = tryDecode let [] private (|Parse|_|) = function | struct (For clientId, _) & Streams.Decode dec events -> ValueSome struct (clientId, events) | _ -> ValueNone @@ -51,7 +50,7 @@ module Fold = let initial = { items = []; nextId = 0 } /// Compute State change implied by a given Event let evolve s = function - | Events.Added item -> { s with items = item :: s.items; nextId = s.nextId + 1 } + | Events.Added item -> { items = item :: s.items; nextId = s.nextId + 1 } | Events.Updated value -> { s with items = s.items |> List.map (function { id = id } when id = value.id -> value | item -> item) } | Events.Deleted e -> { s with items = s.items |> List.filter (fun x -> x.id <> e.id) } | Events.Cleared e -> { nextId = e.nextId; items = [] } @@ -75,10 +74,10 @@ type Service internal (resolve: ClientId -> Equinox.Decider Store.Dynamo.createSnapshotted Stream.Category Events.codec Fold.initial Fold.fold (Fold.isOrigin, Fold.toSnapshot) (context, cache) - | Store.Config.Cosmos (context, cache) -> Store.Cosmos.createSnapshotted Stream.Category Events.codecJe Fold.initial Fold.fold (Fold.isOrigin, Fold.toSnapshot) (context, cache) + | Store.Config.Dynamo (context, cache) -> Store.Dynamo.createSnapshotted CategoryName Events.codec Fold.initial Fold.fold (Fold.isOrigin, Fold.toSnapshot) (context, cache) + | Store.Config.Cosmos (context, cache) -> Store.Cosmos.createSnapshotted CategoryName Events.codecJe Fold.initial Fold.fold (Fold.isOrigin, Fold.toSnapshot) (context, cache) #if !(sourceKafka && kafka) - | Store.Config.Esdb (context, cache) -> Store.Esdb.create Stream.Category Events.codec Fold.initial Fold.fold (context, cache) - | Store.Config.Sss (context, cache) -> Store.Sss.create Stream.Category Events.codec Fold.initial Fold.fold (context, cache) + | Store.Config.Esdb (context, cache) -> Store.Esdb.create CategoryName Events.codec Fold.initial Fold.fold (context, cache) + | Store.Config.Sss (context, cache) -> Store.Sss.create CategoryName Events.codec Fold.initial Fold.fold (context, cache) #endif - let create (Category cat) = Service(Stream.id >> Store.createDecider cat) + let create (Category cat) = Service(streamId >> Store.createDecider cat) diff --git a/propulsion-sync/Program.fs b/propulsion-sync/Program.fs index 1e9183134..872bb89aa 100644 --- a/propulsion-sync/Program.fs +++ b/propulsion-sync/Program.fs @@ -7,7 +7,6 @@ open Propulsion.Kafka #endif open Serilog open System -open System.Threading type Configuration(tryGet) = @@ -210,7 +209,7 @@ module Args = | Gorge _ -> "Request Parallel readers phase during initial catchup, running one chunk (256MB) apart. Default: off" | StreamReaders _ -> "number of concurrent readers that will fetch a missing stream when in tailing mode. Default: 1. TODO: IMPLEMENT!" | Tail _ -> "attempt to read from tail at specified interval in Seconds. Default: 1" - | ForceRestart _ -> "Forget the current committed position; start from (and commit) specified position. Default: start from specified position or resume from committed." + | ForceRestart -> "Forget the current committed position; start from (and commit) specified position. Default: start from specified position or resume from committed." | BatchSize _ -> "maximum item count to request from feed. Default: 4096" | MinBatchSize _ -> "minimum item count to drop down to in reaction to read failures. Default: 512" | Position _ -> "EventStore $all Stream Position to commence from"