From d63f6933928d2cac0ff37a59f459892fc3035038 Mon Sep 17 00:00:00 2001 From: Ruben Bartelink Date: Sun, 4 Feb 2024 08:33:13 +0000 Subject: [PATCH] feat(Indexer): Add upconverting export --- equinox-shipping/Watchdog/Infrastructure.fs | 5 +- propulsion-indexer/App/App.fsproj | 6 +- propulsion-indexer/App/Args.fs | 116 ++++++ propulsion-indexer/App/Configuration.fs | 16 - propulsion-indexer/App/CosmosDumpSource.fs | 33 ++ propulsion-indexer/App/Infrastructure.fs | 54 ++- propulsion-indexer/Domain/Domain.fsproj | 4 +- propulsion-indexer/Domain/Store.fs | 120 +++++- propulsion-indexer/Domain/Todo.fs | 20 +- propulsion-indexer/Indexer/Indexer.fs | 8 +- propulsion-indexer/Indexer/Indexer.fsproj | 6 +- propulsion-indexer/Indexer/Ingester.fs | 27 ++ propulsion-indexer/Indexer/Program.fs | 419 ++++++++++++++------ propulsion-indexer/Indexer/Snapshotter.fs | 60 +-- propulsion-indexer/Indexer/Visitor.fs | 82 ++++ propulsion-reactor/Infrastructure.fs | 5 +- 16 files changed, 746 insertions(+), 235 deletions(-) create mode 100644 propulsion-indexer/App/Args.fs delete mode 100644 propulsion-indexer/App/Configuration.fs create mode 100644 propulsion-indexer/App/CosmosDumpSource.fs create mode 100644 propulsion-indexer/Indexer/Ingester.fs create mode 100644 propulsion-indexer/Indexer/Visitor.fs diff --git a/equinox-shipping/Watchdog/Infrastructure.fs b/equinox-shipping/Watchdog/Infrastructure.fs index 30a176ed7..6cbfdb57c 100644 --- a/equinox-shipping/Watchdog/Infrastructure.fs +++ b/equinox-shipping/Watchdog/Infrastructure.fs @@ -95,9 +95,10 @@ module EventStoreContext = module OutcomeKind = - let [] (|StoreExceptions|_|) exn = + let [] (|StoreExceptions|_|) (exn: exn) = match exn with | Equinox.DynamoStore.Exceptions.ProvisionedThroughputExceeded | Equinox.CosmosStore.Exceptions.RateLimited -> Propulsion.Streams.OutcomeKind.RateLimited |> ValueSome - | Equinox.CosmosStore.Exceptions.RequestTimeout -> Propulsion.Streams.OutcomeKind.Timeout |> ValueSome + | Equinox.CosmosStore.Exceptions.RequestTimeout -> Propulsion.Streams.OutcomeKind.Tagged "cosmosTimeout" |> ValueSome + | :? 
System.Threading.Tasks.TaskCanceledException -> Propulsion.Streams.OutcomeKind.Tagged "taskCancelled" |> ValueSome | _ -> ValueNone diff --git a/propulsion-indexer/App/App.fsproj b/propulsion-indexer/App/App.fsproj index 1283a5d17..40eff004a 100644 --- a/propulsion-indexer/App/App.fsproj +++ b/propulsion-indexer/App/App.fsproj @@ -8,13 +8,17 @@ - + + + + + diff --git a/propulsion-indexer/App/Args.fs b/propulsion-indexer/App/Args.fs new file mode 100644 index 000000000..cb2d9956d --- /dev/null +++ b/propulsion-indexer/App/Args.fs @@ -0,0 +1,116 @@ +module App.Args + +open Argu +open System + +let [] CONNECTION = "EQUINOX_COSMOS_CONNECTION" +let [] DATABASE = "EQUINOX_COSMOS_DATABASE" +let [] CONTAINER = "EQUINOX_COSMOS_CONTAINER" +let [] VIEWS = "EQUINOX_COSMOS_VIEWS" + +type Configuration(tryGet: string -> string option) = + + let get key = match tryGet key with Some value -> value | None -> failwith $"Missing Argument/Environment Variable %s{key}" + + member x.CosmosConnection = get CONNECTION + member x.CosmosDatabase = get DATABASE + member x.CosmosContainer = get CONTAINER + member x.CosmosViews = get VIEWS + +type [] CosmosParameters = + | [] Verbose + | [] Connection of string + | [] ConnectionMode of Microsoft.Azure.Cosmos.ConnectionMode + | [] Database of string + | [] Container of string + | [] Views of string + | [] Timeout of float + | [] Retries of int + | [] RetriesWaitTime of float + interface IArgParserTemplate with + member p.Usage = p |> function + | Verbose -> "request Verbose Logging from Store. Default: off" + | ConnectionMode _ -> "override the connection mode. Default: Direct." + | Connection _ -> $"specify a connection string for a Cosmos account. (optional if environment variable ${CONNECTION} specified)" + | Database _ -> $"specify a database name for store. (optional if environment variable ${DATABASE} specified)" + | Container _ -> $"specify a container name for store. (optional if environment variable ${CONTAINER} specified)" + | Views _ -> $"specify a views Container name for Cosmos views. (optional if environment variable ${VIEWS} specified)" + | Timeout _ -> "specify operation timeout in seconds. Default: 5." + | Retries _ -> "specify operation retries. Default: 1." + | RetriesWaitTime _ -> "specify max wait-time for retry when being throttled by Cosmos in seconds. Default: 5." +and CosmosArguments(c: Configuration, p: ParseResults) = + let connection = p.GetResult(Connection, fun () -> c.CosmosConnection) + let connector = + let timeout = p.GetResult(Timeout, 5.) |> TimeSpan.FromSeconds + let retries = p.GetResult(Retries, 1) + let maxRetryWaitTime = p.GetResult(RetriesWaitTime, 5.) 
|> TimeSpan.FromSeconds + let mode = p.TryGetResult ConnectionMode + Equinox.CosmosStore.CosmosStoreConnector(Equinox.CosmosStore.Discovery.ConnectionString connection, timeout, retries, maxRetryWaitTime, ?mode = mode) + member val Verbose = p.Contains Verbose + member val Connection = connection + member val Database = p.GetResult(Database, fun () -> c.CosmosDatabase) + member val Container = p.GetResult(Container, fun () -> c.CosmosContainer) + member val private Views = p.GetResult(Views, fun () -> c.CosmosViews) + member x.Connect() = connector.Connect(x.Database, x.Container, x.Views) + +type [] CosmosSourceParameters = + | [] Verbose + | [] ConnectionMode of Microsoft.Azure.Cosmos.ConnectionMode + | [] Connection of string + | [] Database of string + | [] Container of string + | [] Views of string + | [] Timeout of float + | [] Retries of int + | [] RetriesWaitTime of float + + | [] LeaseContainerSuffix of string + | [] LeaseContainer of string + | [] FromTail + | [] MaxItems of int + | [] LagFreqM of float + interface IArgParserTemplate with + member p.Usage = p |> function + | Verbose -> "request Verbose Logging from ChangeFeedProcessor and Store. Default: off" + | ConnectionMode _ -> "override the connection mode. Default: Direct." + | Connection _ -> $"specify a connection string for a Cosmos account. (optional if environment variable {CONNECTION} specified)" + | Database _ -> $"specify a database name for store. (optional if environment variable {DATABASE} specified)" + | Container _ -> $"specify a container name for store. (optional if environment variable {CONTAINER} specified)" + | Views _ -> $"specify a container name for views container. (optional if environment variable {VIEWS} specified)" + | Timeout _ -> "specify operation timeout in seconds. Default: 5." + | Retries _ -> "specify operation retries. Default: 1." + | RetriesWaitTime _ -> "specify max wait-time for retry when being throttled by Cosmos in seconds. Default: 5." + + | LeaseContainerSuffix _ -> "specify Container Name suffix for Leases container. Default: `-aux`." + | LeaseContainer _ -> "specify Container Name (in this [target] Database) for Leases container. Default: `` + `-aux`." + | FromTail -> "(iff the Consumer Name is fresh) - force skip to present Position. Default: Never skip an event." + | MaxItems _ -> "maximum item count to request from the feed. Default: unlimited." + | LagFreqM _ -> "specify frequency (minutes) to dump lag stats. Default: 1" +and CosmosSourceArguments(c: Configuration, p: ParseResults) = + let connection = p.GetResult(Connection, fun () -> c.CosmosConnection) + let connector = + let timeout = p.GetResult(Timeout, 5.) |> TimeSpan.FromSeconds + let retries = p.GetResult(Retries, 1) + let maxRetryWaitTime = p.GetResult(RetriesWaitTime, 5.) |> TimeSpan.FromSeconds + let mode = p.TryGetResult ConnectionMode + Equinox.CosmosStore.CosmosStoreConnector(Equinox.CosmosStore.Discovery.ConnectionString connection, timeout, retries, maxRetryWaitTime, ?mode = mode) + let database = p.GetResult(Database, fun () -> c.CosmosDatabase) + let containerId = p.GetResult(Container, fun () -> c.CosmosContainer) + let viewsContainerId = p.GetResult(Views, fun () -> c.CosmosViews) + + let suffix = p.GetResult(LeaseContainerSuffix, "-aux") + let leaseContainerId = p.GetResult(LeaseContainer, containerId + suffix) + + let fromTail = p.Contains FromTail + let maxItems = p.TryGetResult MaxItems + let tailSleepInterval = TimeSpan.FromMilliseconds 500. + let lagFrequency = p.GetResult(LagFreqM, 1.) 
|> TimeSpan.FromMinutes + + member val IsLagFreqSpecified = p.Contains LagFreqM + member val Verbose = p.Contains Verbose + member val Connection = connection + member val Database = database + member _.ConnectWithFeed(?lsc) = connector.ConnectWithFeed(database, containerId, viewsContainerId, leaseContainerId, ?logSnapshotConfig = lsc) + member _.ConnectWithFeedReadOnly(auxClient, auxDatabase, auxContainerId) = + connector.ConnectWithFeedReadOnly(database, containerId, viewsContainerId, auxClient, auxDatabase, auxContainerId) + member val MonitoringParams = fromTail, maxItems, tailSleepInterval, lagFrequency diff --git a/propulsion-indexer/App/Configuration.fs b/propulsion-indexer/App/Configuration.fs deleted file mode 100644 index 568a54671..000000000 --- a/propulsion-indexer/App/Configuration.fs +++ /dev/null @@ -1,16 +0,0 @@ -module App.Args - -let [] CONNECTION = "EQUINOX_COSMOS_CONNECTION" -let [] DATABASE = "EQUINOX_COSMOS_DATABASE" -let [] CONTAINER = "EQUINOX_COSMOS_CONTAINER" -let [] VIEWS = "EQUINOX_COSMOS_VIEWS" - -type Configuration(tryGet: string -> string option) = - - let get key = match tryGet key with Some value -> value | None -> failwith $"Missing Argument/Environment Variable %s{key}" - - member _.CosmosConnection = get CONNECTION - member _.CosmosDatabase = get DATABASE - member _.CosmosContainer = get CONTAINER - member _.CosmosViews = get VIEWS - diff --git a/propulsion-indexer/App/CosmosDumpSource.fs b/propulsion-indexer/App/CosmosDumpSource.fs new file mode 100644 index 000000000..ab1f36f8e --- /dev/null +++ b/propulsion-indexer/App/CosmosDumpSource.fs @@ -0,0 +1,33 @@ +namespace App + +open FSharp.Control +open Propulsion.Feed +open System + +/// Parses CR separated file with items dumped from a Cosmos Container containing Equinox Items +/// One way to generate one of those is via the cosmic tool at https://github.com/creyke/Cosmic +/// dotnet tool install -g cosmic +/// # then connect/select db per https://github.com/creyke/Cosmic#basic-usage +/// cosmic query 'select * from c order by c._ts' > file.out +type [] CosmosDumpSource private () = + + static member Start(log, statsInterval, filePath, skip, parseFeedDoc, sink, ?truncateTo) = + let isNonCommentLine line = System.Text.RegularExpressions.Regex.IsMatch(line, "^\s*#") |> not + let truncate = match truncateTo with Some count -> Seq.truncate count | None -> id + let lines = Seq.append (System.IO.File.ReadLines filePath |> truncate) (Seq.singleton null) // Add a trailing EOF sentinel so checkpoint positions can be line numbers even when finished reading + let mapLine isEof lineNo (line: string) = // TODO inline in F#7 when seq exprs support try/withs + try let items = if isEof then Array.empty + else System.Text.Json.JsonDocument.Parse line |> parseFeedDoc |> Seq.toArray + struct (TimeSpan.Zero, ({ items = items; isTail = isEof; checkpoint = Position.parse lineNo }: Core.Batch<_>)) + with e -> exn($"File Parse error on L{lineNo}: '{line.Substring(0, 200)}'", e) |> raise + let crawl _ _ _ = TaskSeq.ofSeq <| seq { + for i, line in lines |> Seq.indexed do + let isEof = line = null + if isEof || (i >= skip && isNonCommentLine line) then + let lineNo = int64 i + 1L + mapLine isEof lineNo line } + let source = + let checkpointStore = Equinox.MemoryStore.VolatileStore() + let checkpoints = ReaderCheckpoint.MemoryStore.create log ("consumerGroup", TimeSpan.FromMinutes 1) checkpointStore + Propulsion.Feed.Core.SinglePassFeedSource(log, statsInterval, SourceId.parse filePath, crawl, checkpoints, sink, string) + 
source.Start(fun _ct -> task { return [| TrancheId.parse "0" |] }) diff --git a/propulsion-indexer/App/Infrastructure.fs b/propulsion-indexer/App/Infrastructure.fs index d27526389..7037b1b56 100644 --- a/propulsion-indexer/App/Infrastructure.fs +++ b/propulsion-indexer/App/Infrastructure.fs @@ -10,8 +10,9 @@ module EnvVar = module Log = + let [] PropertyTag = "isMetric" /// Allow logging to filter out emission of log messages whose information is also surfaced as metrics - let isStoreMetrics e = Filters.Matching.WithProperty("isMetric").Invoke e + let logEventIsMetric e = Serilog.Filters.Matching.WithProperty(PropertyTag).Invoke e /// Equinox and Propulsion provide metrics as properties in log emissions /// These helpers wire those to pass through virtual Log Sinks that expose them as Prometheus metrics. @@ -25,15 +26,22 @@ module Sinks = let equinoxAndPropulsionConsumerMetrics tags group (l: LoggerConfiguration) = l |> equinoxMetricsOnly tags - |> fun l -> l.WriteTo.Sink(Propulsion.Prometheus.LogSink(tags, group)) + |> _.WriteTo.Sink(Propulsion.Prometheus.LogSink(tags, group)) + |> _.WriteTo.Sink(Propulsion.CosmosStore.Prometheus.LogSink(tags)) - let equinoxAndPropulsionCosmosConsumerMetrics tags group (l: LoggerConfiguration) = - l |> equinoxAndPropulsionConsumerMetrics tags group - |> fun l -> l.WriteTo.Sink(Propulsion.CosmosStore.Prometheus.LogSink(tags)) + let private removeMetrics (e: Serilog.Events.LogEvent) = + e.RemovePropertyIfPresent Equinox.CosmosStore.Core.Log.PropertyTag + e.RemovePropertyIfPresent Propulsion.CosmosStore.Log.PropertyTag + e.RemovePropertyIfPresent Propulsion.Feed.Core.Log.PropertyTag + e.RemovePropertyIfPresent Propulsion.Streams.Log.PropertyTag + e.RemovePropertyIfPresent Log.PropertyTag let console (configuration: LoggerConfiguration) = - let t = "[{Timestamp:HH:mm:ss} {Level:u1}] {Message:lj} {Properties:j}{NewLine}{Exception}" - configuration.WriteTo.Console(theme=Sinks.SystemConsole.Themes.AnsiConsoleTheme.Code, outputTemplate=t) + let t = "{Timestamp:HH:mm:ss} {Level:u1} {Message:lj} {Properties:j}{NewLine}{Exception}" + configuration + .WriteTo.Logger(fun l -> + l.Enrich.With({ new Serilog.Core.ILogEventEnricher with member _.Enrich(evt, _) = removeMetrics evt }) + .WriteTo.Console(outputTemplate = t, theme = Sinks.SystemConsole.Themes.AnsiConsoleTheme.Code) |> ignore) [] type Logging() = @@ -43,6 +51,8 @@ type Logging() = configuration .Enrich.FromLogContext() |> fun c -> if verbose = Some true then c.MinimumLevel.Debug() else c + |> fun c -> let generalLevel = if verbose = Some true then Events.LogEventLevel.Information else Events.LogEventLevel.Warning + c.MinimumLevel.Override(typeof.FullName, generalLevel) [] static member private Sinks(configuration: LoggerConfiguration, configureMetricsSinks, configureConsoleSink, ?isMetric) = @@ -50,13 +60,14 @@ type Logging() = a.Logger(configureMetricsSinks >> ignore) |> ignore // unconditionally feed all log events to the metrics sinks a.Logger(fun l -> // but filter what gets emitted to the console sink let l = match isMetric with None -> l | Some predicate -> l.Filter.ByExcluding(Func predicate) + let l = l.Filter.ByExcluding(fun e -> match e.Properties.TryGetValue "SourceContext" with true, (:? 
Serilog.Events.ScalarValue as v) -> string v.Value = "LeMans.Common.CosmosRepository" | _ -> false) configureConsoleSink l |> ignore) |> ignore - configuration.WriteTo.Async(bufferSize=65536, blockWhenFull=true, configure=System.Action<_> configure) + configuration.WriteTo.Async(bufferSize = 65536, blockWhenFull = true, configure = System.Action<_> configure) [] static member Sinks(configuration: LoggerConfiguration, configureMetricsSinks, verboseStore) = - configuration.Sinks(configureMetricsSinks, Sinks.console, ?isMetric = if verboseStore then None else Some Log.isStoreMetrics) + configuration.Sinks(configureMetricsSinks, Sinks.console, ?isMetric = if verboseStore then None else Some Log.logEventIsMetric) module CosmosStoreConnector = @@ -99,19 +110,27 @@ type Equinox.CosmosStore.CosmosStoreConnector with let client = Equinox.CosmosStore.CosmosStoreClient(cosmosClient) let contexts = client.CreateContext(role, databaseId, containerId, tipMaxEvents = 256, queryMaxItems = 500), - client.CreateContext(role, databaseId, viewsContainerId, tipMaxEvents = 256, queryMaxItems = 500), + // In general, the views container won't write events. We also know we generally won't attach a CFP, so we keep events in tip + client.CreateContext($"{role}(Views)", databaseId, viewsContainerId, tipMaxEvents = 128), // NOTE the tip limits for this connection are set to be effectively infinite in order to ensure that writes never trigger calving from the tip client.CreateContext("snapshotUpdater", databaseId, containerId, tipMaxEvents = 1024, tipMaxJsonLength = 1024 * 1024, skipLog = not (logSnapshotConfig = Some true)) return cosmosClient, contexts } + + /// Connect to the database (including verifying and warming up relevant containers), establish relevant CosmosStoreContexts required by Domain + member x.Connect(databaseId, containerId, viewsContainerId) = async { + let! _client, contexts = x.Connect("Main", databaseId, containerId, viewsContainerId) + return contexts } + + /// Indexer: Connects to a Store as both a CosmosStoreClient and a ChangeFeedProcessor Monitored Container member x.ConnectWithFeed(databaseId, containerId, viewsContainerId, auxContainerId, ?logSnapshotConfig) = async { - let! cosmosClient, contexts = x.Connect("Main", databaseId, containerId, viewsContainerId, auxContainerId, ?logSnapshotConfig = logSnapshotConfig) - let source, leases = CosmosStoreConnector.getSourceAndLeases cosmosClient databaseId containerId auxContainerId + let! client, contexts = x.Connect("Main", databaseId, containerId, viewsContainerId, auxContainerId, ?logSnapshotConfig = logSnapshotConfig) + let source, leases = CosmosStoreConnector.getSourceAndLeases client databaseId containerId auxContainerId return contexts, source, leases } /// Indexer Sync mode: When using a ReadOnly connection string, the leases need to be maintained alongside the target - member x.ConnectWithFeedReadOnly(databaseId, containerId: string, viewsContainerId, auxClient, auxDatabaseId, auxContainerId) = async { - let! client, contexts = x.Connect("Main", databaseId, containerId, viewsContainerId = viewsContainerId) + member x.ConnectWithFeedReadOnly(databaseId, containerId, viewsContainerId, auxClient, auxDatabaseId, auxContainerId) = async { + let! 
client, contexts = x.Connect("Main", databaseId, containerId, viewsContainerId) let source = CosmosStoreConnector.getSource client databaseId containerId let leases = CosmosStoreConnector.getLeases auxClient auxDatabaseId auxContainerId return contexts, source, leases } @@ -123,7 +142,7 @@ type Equinox.CosmosStore.CosmosStoreConnector with type Factory private () = - static member StartSink(log, stats, maxConcurrentStreams, handle, maxReadAhead) = + static member StartStreamsSink(log, stats, maxConcurrentStreams, handle, maxReadAhead) = Propulsion.Sinks.Factory.StartConcurrent(log, maxReadAhead, maxConcurrentStreams, handle, stats) module OutcomeKind = @@ -131,8 +150,9 @@ module OutcomeKind = let [] (|StoreExceptions|_|) (exn: exn) = match exn with | Equinox.CosmosStore.Exceptions.RateLimited -> Propulsion.Streams.OutcomeKind.RateLimited |> ValueSome - | Equinox.CosmosStore.Exceptions.RequestTimeout -> Propulsion.Streams.OutcomeKind.Timeout |> ValueSome - | :? System.Threading.Tasks.TaskCanceledException -> Propulsion.Streams.OutcomeKind.Timeout |> ValueSome + | Equinox.CosmosStore.Exceptions.CosmosStatus System.Net.HttpStatusCode.RequestEntityTooLarge -> Propulsion.Streams.OutcomeKind.Tagged "cosmosTooLarge" |> ValueSome + | Equinox.CosmosStore.Exceptions.RequestTimeout -> Propulsion.Streams.OutcomeKind.Tagged "cosmosTimeout" |> ValueSome + | :? System.Threading.Tasks.TaskCanceledException -> Propulsion.Streams.OutcomeKind.Tagged "taskCancelled" |> ValueSome | _ -> ValueNone // A typical app will likely have health checks etc, implying the wireup would be via `endpoints.MapMetrics()` and thus not use this ugly code directly diff --git a/propulsion-indexer/Domain/Domain.fsproj b/propulsion-indexer/Domain/Domain.fsproj index f02d501b7..3eeacb638 100644 --- a/propulsion-indexer/Domain/Domain.fsproj +++ b/propulsion-indexer/Domain/Domain.fsproj @@ -7,8 +7,8 @@ - - + + diff --git a/propulsion-indexer/Domain/Store.fs b/propulsion-indexer/Domain/Store.fs index 40369fe84..bbaca8711 100644 --- a/propulsion-indexer/Domain/Store.fs +++ b/propulsion-indexer/Domain/Store.fs @@ -25,28 +25,68 @@ module Codec = /// - The Sync Stored procedure then processes the ensuing request, replacing the current (missing or outdated) `'u`nfolds with the fresh snapshot module Snapshotter = + type Result = + | Valid // No-op case: no update required as the stream already has a correct snapshot + | Invalid // Update skipped due to running in dryRun mode; we avoided running the update + | Updated // Update required: yield a tentative event (which transmuteAllEventsToUnfolds will flip to being an unfold) + let decide generate dryRun (hasSnapshot, state) = + if hasSnapshot then Valid, Array.empty + elif dryRun then Invalid, Array.empty + // Note Updated is a synthetic/tentative event, which transmuteAllEventsToUnfolds will use as a signal to a) update the unfolds b) drop the event + else Updated, generate state type private StateWithSnapshottedFlag<'s> = bool * 's - type Service<'id, 'e, 's> internal (resolve: 'id -> Equinox.Decider<'e, StateWithSnapshottedFlag<'s>>, generate: 's -> 'e) = - - member _.TryUpdate(id): Async = + type Service<'id, 'e, 's> internal (resolve: 'id -> Equinox.Decider<'e, StateWithSnapshottedFlag<'s>>, generate: 's -> 'e[]) = + member _.TryUpdate(id, dryRun): Async = let decider = resolve id - let decide (hasSnapshot, state) = - if hasSnapshot then false, Array.empty // case 1: no update required as the stream already has a correct snapshot - else true, generate state |> Array.singleton // case 2: 
yield a tentative event (which transmuteAllEventsToUnfolds will flip to being an unfold) - decider.TransactWithPostVersion(decide) - + decider.TransactWithPostVersion(decide generate dryRun) + module Service = + let tryUpdate dryRun (x: Service<_, _, _>) id = x.TryUpdate(id, dryRun) let internal createService streamId generate cat = let resolve = streamId >> createDecider cat Service(resolve, generate) let internal initial'<'s> initial: StateWithSnapshottedFlag<'s> = false, initial - let internal fold' isCurrentSnapshot fold (_wasOrigin, s) xs: StateWithSnapshottedFlag<'s> = + let internal fold' isValidUnfolds fold (_wasOrigin, s) xs: StateWithSnapshottedFlag<'s> = // NOTE ITimelineEvent.IsUnfold and/or a generic isOrigin event would be insufficient for our needs // The tail event encountered by the fold could either be: // - an 'out of date' snapshot (which the normal load process would be able to upconvert from, but is not what we desire) // - another event (if there is no snapshot of any kind) - isCurrentSnapshot (Array.last xs), fold s xs - + isValidUnfolds xs, fold s xs + +module Ingester = + + open FsCodec + type internal Event<'e, 'f> = (struct (ITimelineEvent<'f> * 'e)) + let internal createCodec<'e, 'f, 'c> (target: IEventCodec<'e, 'f, 'c>): IEventCodec, 'f, 'c> = + let encode (c: 'c) ((input, upped): Event<'e, 'f>) : struct (string * 'f * 'f * System.Guid * string * string * System.DateTimeOffset) = + let e = target.Encode(c, upped) + e.EventType, e.Data, input.Meta, input.EventId, input.CorrelationId, input.CausationId, input.Timestamp + let decode (e: ITimelineEvent<'f>): Event<'e, 'f> voption = match target.Decode e with ValueNone -> ValueNone | ValueSome d -> ValueSome (e, d) + Codec.Create, 'f, 'c>(encode, decode) + + type private State = unit + let internal initial: State = () + let internal fold () = ignore + + let private decide (inputCodec: IEventCodec<'e, 'f, unit>) (inputs: ITimelineEvent<'f>[]) (c: Equinox.ISyncContext): Event<'e, 'f>[] = [| + for x in inputs do + if x.Index >= c.Version then // NOTE source and target need to have 1:1 matching event indexes, or things would be much more complex + match inputCodec.Decode x with + | ValueNone -> failwith $"Unknown EventType {x.EventType} at index {x.Index}" + | ValueSome d -> struct (x, d) |] // So we require all source events to exactly one event in the target + + type Service<'id, 'e, 's, 'f> internal (codec: IEventCodec<'e, 'f, unit>, resolve: 'id -> Equinox.Decider, State>) = + member _.Ingest(id, sourceEvents: ITimelineEvent<'f>[]): Async = + let decider = resolve id + decider.TransactEx(decide codec sourceEvents, fun (x: Equinox.ISyncContext) -> x.Version) + let internal createService<'id, 'e, 'f> streamId inputCodec cat = + let resolve = streamId >> createDecider cat + Service<'id, 'e, unit, 'f>(inputCodec, resolve) + module Service = + let ingest (svc: Service<'id, 'e, 's, System.Text.Json.JsonElement>) id (events: ITimelineEvent>[]) = + let events = events |> Array.map (FsCodec.Core.TimelineEvent.Map (System.Func<_, _> FsCodec.SystemTextJson.Interop.InteropHelpers.Utf8ToJsonElement)) + svc.Ingest(id, events) + let private defaultCacheDuration = System.TimeSpan.FromMinutes 20 let private cacheStrategy cache = Equinox.CachingStrategy.SlidingWindow (cache, defaultCacheDuration) @@ -63,16 +103,16 @@ module Cosmos = open Equinox.CosmosStore - let private createCached name codec initial fold accessStrategy (context, cache) = - CosmosStoreCategory(context, name, codec, fold, initial, accessStrategy, cacheStrategy 
cache) + let private createCached name codec initial fold accessStrategy shouldCompress (context, cache) = + CosmosStoreCategory(context, name, codec, fold, initial, accessStrategy, cacheStrategy cache, ?shouldCompress = shouldCompress) let createSnapshotted name codec initial fold (isOrigin, toSnapshot) (context, cache) = let accessStrategy = AccessStrategy.Snapshot (isOrigin, toSnapshot) - createCached name codec initial fold accessStrategy (context.main, cache) + createCached name codec initial fold accessStrategy None (context.main, cache) let createRollingState name codec initial fold toSnapshot (context, cache) = let accessStrategy = AccessStrategy.RollingState toSnapshot - createCached name codec initial fold accessStrategy (context.views, cache) + createCached name codec initial fold accessStrategy None (context.views, cache) let createConfig (main, views, snapshotUpdate) cache = Config.Cosmos ({ main = main; views = views; snapshotUpdate = snapshotUpdate }, cache) @@ -82,10 +122,48 @@ module Cosmos = let private accessStrategy isOrigin = let transmuteAllEventsToUnfolds events _state = [||], events AccessStrategy.Custom (isOrigin, transmuteAllEventsToUnfolds) - let private createCategory name codec initial fold isCurrent (contexts, cache) = - createCached name codec (Snapshotter.initial' initial) (Snapshotter.fold' isCurrent fold) (accessStrategy isCurrent) (contexts.snapshotUpdate, cache) - - let create codec initial fold (isCurrentSnapshot, generate) streamId categoryName config = + let private createCategory name codec initial fold isValidUnfolds isOrigin (contexts, cache) = + createCached name codec (Snapshotter.initial' initial) (Snapshotter.fold' isValidUnfolds fold) (accessStrategy isOrigin) (Some isOrigin) (contexts.snapshotUpdate, cache) + /// Equinox allows any number of unfold events to be stored: + /// - the `isOrigin` predicate identifies the "main snapshot" - if it returns `true`, we don't need to load and fold based on events + /// - `isValidUnfolds` inspects the full set of unfolds in order to determine whether they are complete + /// - where Index Unfolds are required for application functionality, we can trigger regeneration where they are missing + let withIndexing codec initial fold (isOrigin, isValidUnfolds, generateUnfolds) streamId categoryName config = let cat = config |> function - | Config.Cosmos (context, cache) -> createCategory categoryName codec initial fold isCurrentSnapshot (context, cache) - Snapshotter.createService streamId generate cat + | Config.Cosmos (context, cache) -> createCategory categoryName codec initial fold isValidUnfolds isOrigin (context, cache) + Snapshotter.createService streamId generateUnfolds cat + /// For the common case where we don't use any indexing - we only have a single relevant unfold to detect, and a function to generate it + let single codec initial fold (isCurrentSnapshot, generate) streamId categoryName config = + withIndexing codec initial fold (isCurrentSnapshot, Array.tryExactlyOne >> Option.exists isCurrentSnapshot, generate >> Array.singleton) streamId categoryName config + + module Ingester = + + let private slice eventSize struct (maxEvents, maxBytes) span = + let mutable countBudget, bytesBudget = maxEvents, maxBytes + let withinLimits y = + countBudget <- countBudget - 1 + bytesBudget <- bytesBudget - eventSize y + // always send at least one event in order to surface the problem and have the stream marked malformed + countBudget = maxEvents - 1 || (countBudget >= 0 && bytesBudget >= 0) + span |> 
Array.takeWhile withinLimits + // We gauge the likely output size from the input size + // (to be 100% correct, we should encode it as the Sync in Equinox would do for the real converted data) + // (or, to completely cover/gold plate it, we could have an opt-in on the Category to do slicing internally) + let eventSize ((x, _e): Ingester.Event<_, _>) = x.Size + let private accessStrategy = + let isOriginIgnoreEvents _ = true // we only need to know the Version to manage the ingestion process + let transmuteTrimsToStoredProcInputLimitAndDoesNotGenerateUnfolds events () = + let maxEvents, maxBytes = 16384, 256 * 1024 + let trimmed = slice eventSize (maxEvents, maxBytes) events + trimmed, Array.empty + AccessStrategy.Custom (isOriginIgnoreEvents, transmuteTrimsToStoredProcInputLimitAndDoesNotGenerateUnfolds) + let private createCategory name codec (context, cache) = + createCached name codec Ingester.initial Ingester.fold accessStrategy None (context, cache) + + type TargetCodec<'e> = FsCodec.IEventCodec<'e, Core.EventBody, unit> + open FsCodec.SystemTextJson.Interop + let create<'id, 'e> struct (inputStreamCodec: FsCodec.IEventCodec<'e, System.ReadOnlyMemory, unit>, targetCodec: TargetCodec<'e>) streamId categoryName struct (context, cache) = + let rewriteEventBodiesCodec = Ingester.createCodec<'e, System.Text.Json.JsonElement, unit> targetCodec + let cat = createCategory categoryName rewriteEventBodiesCodec (context, cache) + let inputStreamToJsonElement = inputStreamCodec.ToJsonElementCodec() + Ingester.createService<'id, 'e, System.Text.Json.JsonElement> streamId inputStreamToJsonElement cat diff --git a/propulsion-indexer/Domain/Todo.fs b/propulsion-indexer/Domain/Todo.fs index a5980db95..a0d87fddc 100644 --- a/propulsion-indexer/Domain/Todo.fs +++ b/propulsion-indexer/Domain/Todo.fs @@ -1,10 +1,9 @@ module IndexerTemplate.Domain.Todo -module private Stream = - let [] Category = "Todos" - let id = FsCodec.StreamId.gen ClientId.toString - let decodeId = FsCodec.StreamId.dec ClientId.parse - let tryDecode = FsCodec.StreamName.tryFind Category >> ValueOption.map decodeId +let [] CategoryName = "Todos" +let private streamId = FsCodec.StreamId.gen ClientId.toString +let private decodeId = FsCodec.StreamId.dec ClientId.parse +let private tryDecode = FsCodec.StreamName.tryFind CategoryName >> ValueOption.map decodeId // NB - these types and the union case names reflect the actual storage formats and hence need to be versioned with care module Events = @@ -24,13 +23,13 @@ module Events = module Reactions = - let categories = [| Stream.Category |] + let categories = [| CategoryName |] /// Allows us to skip producing summaries for events that we know won't result in an externally discernable change to the summary output let private impliesStateChange = function Events.Snapshotted _ -> false | _ -> true let dec = Streams.Codec.gen - let [] (|For|_|) = Stream.tryDecode + let [] (|For|_|) = tryDecode let [] (|ImpliesStateChange|_|) = function | struct (For clientId, _) & Streams.Decode dec events when Array.exists impliesStateChange events -> ValueSome clientId | _ -> ValueNone @@ -73,7 +72,8 @@ type Service internal (resolve: ClientId -> Equinox.Decider Store.Cosmos.createSnapshotted Stream.Category Events.codec Fold.initial Fold.fold Fold.Snapshot.config (context, cache) - let create (Category cat) = Service(Stream.id >> Store.createDecider cat) + | Store.Config.Cosmos (context, cache) -> Store.Cosmos.createSnapshotted CategoryName Events.codec Fold.initial Fold.fold Fold.Snapshot.config (context, 
cache) + let create (Category cat) = Service(streamId >> Store.createDecider cat) diff --git a/propulsion-indexer/Indexer/Indexer.fs b/propulsion-indexer/Indexer/Indexer.fs index d82e97819..6b2724213 100644 --- a/propulsion-indexer/Indexer/Indexer.fs +++ b/propulsion-indexer/Indexer/Indexer.fs @@ -3,8 +3,8 @@ module IndexerTemplate.Indexer.Indexer type Outcome = Metrics.Outcome /// Gathers stats based on the Outcome of each Span as it's processed, for periodic emission via DumpStats() -type Stats(log, statsInterval, stateInterval, verboseStore) = - inherit Propulsion.Streams.Stats(log, statsInterval, stateInterval) +type Stats(log, statsInterval, stateInterval, verboseStore, abendThreshold) = + inherit Propulsion.Streams.Stats(log, statsInterval, stateInterval, abendThreshold = abendThreshold) let mutable ok, skipped, na = 0, 0, 0 override _.HandleOk res = @@ -48,7 +48,7 @@ let handle (sourceService: Todo.Service) (summaryService: TodoIndex.Service) str module Factory = - let createHandler store = + let create store = let srcService = Todo.Factory.create store let dstService = TodoIndex.Factory.create store - handle srcService dstService + Some sourceCategories, handle srcService dstService diff --git a/propulsion-indexer/Indexer/Indexer.fsproj b/propulsion-indexer/Indexer/Indexer.fsproj index e25a0afb1..ae12f24b2 100644 --- a/propulsion-indexer/Indexer/Indexer.fsproj +++ b/propulsion-indexer/Indexer/Indexer.fsproj @@ -10,15 +10,13 @@ + + - - - - diff --git a/propulsion-indexer/Indexer/Ingester.fs b/propulsion-indexer/Indexer/Ingester.fs new file mode 100644 index 000000000..008731578 --- /dev/null +++ b/propulsion-indexer/Indexer/Ingester.fs @@ -0,0 +1,27 @@ +module IndexerTemplate.Indexer.Ingester + +open IndexerTemplate.Domain +open Visitor + +type Stats(log, statsInterval, stateInterval, verboseStore, abendThreshold) = + inherit StatsBase(log, statsInterval, stateInterval, verboseStore, abendThreshold = abendThreshold) + override _.HandleOk(()) = () + +let handle todo + stream (events: Propulsion.Sinks.Event[]): Async<_ * unit> = async { + let handle = + match stream with + | Todo.Reactions.For id -> todo id + | sn -> failwith $"Unexpected category %A{sn}" + let! pos' = handle events + return Propulsion.Sinks.StreamResult.OverrideNextIndex pos', () } + +module Factory = + + let createHandler store = + + let todo = Todo.Factory.createIngester store + + let h svc = Store.Ingester.Service.ingest svc + handle + (h todo) diff --git a/propulsion-indexer/Indexer/Program.fs b/propulsion-indexer/Indexer/Program.fs index f5b2b59a2..32dc828d4 100644 --- a/propulsion-indexer/Indexer/Program.fs +++ b/propulsion-indexer/Indexer/Program.fs @@ -14,43 +14,153 @@ module Args = | [] ProcessorName of string | [] MaxReadAhead of int | [] MaxWriters of int - | [] Index of ParseResults - | [] Snapshot of ParseResults - | [] Sync of ParseResults + | [] AbendTimeoutM of float + + | [] DryRun + | [] Follow + | [] IncIdx + | [] IncCat of regex: string + | [] ExcCat of regex: string + | [] IncStream of regex: string + | [] ExcStream of regex: string + | [] IncEvent of regex: string + | [] ExcEvent of regex: string + + | [] Stats of ParseResults + | [] StatsFile of ParseResults + | [] Index of ParseResults + | [] Snapshot of ParseResults + | [] Sync of ParseResults + | [] Export of ParseResults interface IArgParserTemplate with member p.Usage = p |> function | Verbose -> "request Verbose Logging. Default: off." | PrometheusPort _ -> "port from which to expose a Prometheus /metrics endpoint. Default: off." 
| ProcessorName _ -> "Projector consumer group name." - | MaxReadAhead _ -> "maximum number of batches to let processing get ahead of completion. Default: 2." - | MaxWriters _ -> "maximum number of concurrent streams on which to process at any time. Default: 8." - | Index _ -> "Process indexing into the Views Container for the specified Cosmos feed" + | MaxReadAhead _ -> "maximum number of batches to let processing get ahead of completion. Default: File: 32768 Cosmos: 2." + | MaxWriters _ -> "maximum number of concurrent streams on which to process at any time. Default: 8 (Sync, Index: 16)." + | AbendTimeoutM _ -> "maximum number of minutes to wait before existing where processing enters a (non-transient) perpetual exception state. Default: 2" + + | DryRun -> "For Snapshot subcommand, skip actually updating" + | Follow -> "Continue waiting for more input when complete (like unix `tail -f`). Default: the Snapshot and Stats operations exit when the Tail of the feed has been reached" + + | IncIdx -> "Include Index streams. Default: Exclude Index Streams, identified by a $ prefix." + | IncCat _ -> "Allow Stream Category. Multiple values are combined with OR. Default: include all, subject to Category Deny and Stream Deny rules." + | ExcCat _ -> "Deny Stream Category. Specified values/regexes are applied after the Category Allow rule(s)." + | IncStream _ -> "Allow Stream Name. Multiple values are combined with OR. Default: Allow all streams that pass the category Allow test, Fail the Category and Stream deny tests." + | ExcStream _ -> "Deny Stream Name. Specified values/regexes are applied after the IncCat, ExcCat and IncStream filters." + + | IncEvent _ -> "Allow Event Type Name. Multiple values are combined with OR. Applied only after Category and Stream filters. Default: include all." + | ExcEvent _ -> "Deny Event Type Name. Specified values/regexes are applied after the Event Type Name Allow rule(s)." + + | Stats _ -> "Gather stats from the input data only; No indexing or writes performed." 
+ | StatsFile _ -> "Same as stats, but replacing normal input with a File source" + | Index _ -> "Process indexing into the Views Container for the specified feed" | Snapshot _ -> "Process updating of snapshots for all traversed streams in the specified Cosmos feed" - | Sync _ -> "Sync into a specified Store for the specified Cosmos feed" + | Sync _ -> "Sync into a specified Store from the specified Cosmos feed" + | Export _ -> "Sync into a specified Store from the application's store, rewriting the events" + and StreamFilterArguments(p: ParseResults) = + let allowCats, denyCats = p.GetResults IncCat, p.GetResults ExcCat + let allowSns, denySns = p.GetResults IncStream, p.GetResults ExcStream + let incIndexes = p.Contains IncIdx + let allowEts, denyEts = p.GetResults IncEvent, p.GetResults ExcEvent + let isPlain = Seq.forall (fun x -> Char.IsLetterOrDigit x || x = '_') + let asRe = Seq.map (fun x -> if isPlain x then $"^{x}$" else x) + let (|Filter|) exprs = + let values, pats = List.partition isPlain exprs + let valuesContains = let set = System.Collections.Generic.HashSet(values) in set.Contains + let aPatternMatches x = pats |> List.exists (fun p -> System.Text.RegularExpressions.Regex.IsMatch(x, p)) + fun cat -> valuesContains cat || aPatternMatches cat + let filter map (allow, deny) = + match allow, deny with + | [], [] -> fun _ -> true + | Filter includes, Filter excludes -> fun x -> let x = map x in (List.isEmpty allow || includes x) && not (excludes x) + let validStream = filter FsCodec.StreamName.toString (allowSns, denySns) + let isTransactionalStream (sn: FsCodec.StreamName) = let sn = FsCodec.StreamName.toString sn in not (sn.StartsWith('$')) + member _.CreateStreamFilter(maybeCategories) = + let handlerCats = match maybeCategories with Some xs -> List.ofArray xs | None -> List.empty + let allowCats = handlerCats @ allowCats + let validCat = filter FsCodec.StreamName.Category.ofStreamName (allowCats, denyCats) + let allowCats = match allowCats with [] -> [ ".*" ] | xs -> xs + let denyCats = denyCats @ [ if not incIndexes then "^\$" ] + let allowSns, denySns = match allowSns, denySns with [], [] -> [".*"], [] | x -> x + let allowEts, denyEts = match allowEts, denyEts with [], [] -> [".*"], [] | x -> x + Log.Information("Categories ☑️ {@allowCats} 🚫{@denyCats} Streams ☑️ {@allowStreams} 🚫{denyStreams} Events ☑️ {allowEts} 🚫{@denyEts}", + asRe allowCats, asRe denyCats, asRe allowSns, asRe denySns, asRe allowEts, asRe denyEts) + fun sn -> + validCat sn + && validStream sn + && (incIndexes || isTransactionalStream sn) + member val EventFilter = filter (fun (x: Propulsion.Sinks.Event) -> x.EventType) (allowEts, denyEts) + and [] Action = + | SummarizeFile of FileArguments + | Summarize of Args.CosmosSourceArguments + | Index of IndexArguments + | Snapshot of Args.CosmosSourceArguments + | Sync of SyncArguments + | Export of SyncArguments and Arguments(c: Args.Configuration, p: ParseResults) = - let maxReadAhead = p.GetResult(MaxReadAhead, 2) - let maxConcurrentProcessors = p.GetResult(MaxWriters, 8) - member val Verbose = p.Contains Parameters.Verbose + let action = match p.GetSubCommand() with + | Parameters.Stats p -> Summarize <| Args.CosmosSourceArguments(c, p) + | Parameters.StatsFile p -> SummarizeFile <| FileArguments(c, p) + | Parameters.Index p -> Index <| IndexArguments(c, p) + | Parameters.Snapshot p -> Snapshot <| Args.CosmosSourceArguments(c, p) + | Parameters.Sync p -> Sync <| SyncArguments(c, p) + | Parameters.Export p -> Export <| SyncArguments(c, p) + | _ -> p.Raise 
"Must specify a subcommand" + let source = match action with + | Summarize c | Snapshot c -> Choice1Of2 c + | Index a -> match a.Source with + | Choice1Of2 c -> Choice1Of2 c + | Choice2Of2 f -> Choice2Of2 f + | Sync s | Export s -> match s.Source with + | Choice1Of2 c -> Choice1Of2 c + | Choice2Of2 f -> Choice2Of2 f + | SummarizeFile f -> Choice2Of2 f + let dryRun = match action, p.Contains DryRun with + | Snapshot _, value -> value + | _, true -> p.Raise "dryRun is not applicable to any subcommand other than Snapshot" + | _, false -> false + let actionLabel = match action with + | Snapshot _ when dryRun -> "DryRun Snapshot inspect" + | Snapshot _ -> "Snapshot updat" + | Summarize _ | SummarizeFile _ -> "Summariz" + | Index _ -> "Index" + | Sync _ -> "Synchroniz" + | Export _ -> "Export" + let isFileSource = match source with Choice1Of2 _ -> false | Choice2Of2 _ -> true + let maxReadAhead = p.GetResult(MaxReadAhead, if isFileSource then 32768 else 2) + member val Action = action + member val DryRun = dryRun + member val Source = source + member val Verbose = p.Contains Verbose member val PrometheusPort = p.TryGetResult PrometheusPort member val ProcessorName = p.GetResult ProcessorName member val StatsInterval = TimeSpan.FromMinutes 1. member val StateInterval = TimeSpan.FromMinutes 5. - member x.Cosmos = match x.Action with Action.Index c | Action.Snapshot c -> c | Action.Sync s -> s.Source - member x.ConnectWithFeed(?lsc) = match x.Action with - | Action.Index c | Action.Snapshot c -> c.ConnectWithFeed(?lsc = lsc) - | Action.Sync s -> s.ConnectWithFeed() - member val Action = match p.GetSubCommand() with - | Parameters.Index p -> CosmosArguments(c, p) |> Index - | Parameters.Snapshot p -> CosmosArguments(c, p) |> Snapshot - | Parameters.Sync p -> SyncArguments(c, p) |> Sync - | _ -> p.Raise "Must specify a subcommand" - member x.ActionLabel = match x.Action with Action.Index _ -> "Indexing" | Action.Snapshot _ -> "Snapshotting" | Action.Sync _ -> "Exporting" - member x.IsSnapshotting = match x.Action with Action.Snapshot _ -> true | _ -> false - member x.ProcessorParams() = Log.Information("{action}... {processorName}, reading {maxReadAhead} ahead, {dop} writers", - x.ActionLabel, x.ProcessorName, maxReadAhead, maxConcurrentProcessors) - (x.ProcessorName, maxReadAhead, maxConcurrentProcessors) - and [] Action = Index of CosmosArguments | Snapshot of CosmosArguments | Sync of SyncArguments - and [] SyncParameters = + member val AbendTimeout = p.GetResult(AbendTimeoutM, 2.) |> TimeSpan.FromMinutes + member val Filters = StreamFilterArguments(p) + member val MaxConcurrentProcessors =p.GetResult(MaxWriters, match action with Sync _ | Index _ -> 16 | _ -> 8) + member val CosmosVerbose = match source with Choice1Of2 c -> c.Verbose | Choice2Of2 f -> f.CosmosVerbose + member x.WaitForTail = if isFileSource || p.Contains Follow then None + else Some (x.StatsInterval * 2.) + member x.LagEstimationInterval = x.WaitForTail |> Option.map (fun _ -> TimeSpan.seconds 5) + member x.ProcessorParams() = Log.Information("{action}ing... 
{processorName}, reading {maxReadAhead} ahead, {dop} writers", + actionLabel, x.ProcessorName, maxReadAhead, x.MaxConcurrentProcessors) + (x.ProcessorName, maxReadAhead, x.MaxConcurrentProcessors) + member _.Connect appName = async { let store contexts = (contexts, Equinox.Cache(appName, sizeMb = 10)) ||> Store.Cosmos.createConfig + match source, action with + | Choice2Of2 f, (SummarizeFile _ | Sync _ | Export _) -> + return Choice1Of3 (f.Filepath, (f.Skip, f.Trunc)) + | Choice2Of2 f, Index _ -> + let! contexts = f.Connect() + return Choice2Of3 (f.Filepath, (f.Skip, f.Trunc), store contexts) + | Choice2Of2 _, (Summarize _ | Snapshot _ as x) -> return x |> failwithf "unexpected %A" + | Choice1Of2 c, action -> + let lsc = match action with Snapshot _ -> true | _ -> false + let! contexts, monitored, leases = c.ConnectWithFeed(lsc = lsc) + return Choice3Of3 (monitored, leases, c.MonitoringParams, store contexts) } + and [] SyncParameters = | [] Connection of string | [] Database of string | [] Container of string @@ -59,11 +169,12 @@ module Args = | [] Retries of int | [] RetriesWaitTime of float | [] MaxKiB of int - | [] Source of ParseResults + | [] Source of ParseResults + | [] SourceFile of ParseResults interface IArgParserTemplate with member p.Usage = p |> function - | Connection _ -> "specify a connection string for the destination Cosmos account. Default: Same as Source" - | Database _ -> "specify a database name for store. Default: Same as Source" + | Connection _ -> "specify a connection string for the destination Cosmos account. Default (if Cosmos): Same as Source" + | Database _ -> "specify a database name for store. Default (if Cosmos): Same as Source" | Container _ -> "specify a container name for store." | LeaseContainerId _ -> "store leases in Sync target DB (default: use `-aux` adjacent to the Source Container). Enables the Source to be read via a ReadOnly connection string." | Timeout _ -> "specify operation timeout in seconds. Default: 5." @@ -71,133 +182,177 @@ module Args = | RetriesWaitTime _ -> "specify max wait-time for retry when being throttled by Cosmos in seconds. Default: 5." | MaxKiB _ -> "specify maximum size in KiB to pass to the Sync stored proc (reduce if Malformed Streams due to 413 RequestTooLarge responses). Default: 128." 
| Source _ -> "Source store from which events are to be consumed via the feed" + | SourceFile _ -> "Source File from which events are to be consumed" and SyncArguments(c: Args.Configuration, p: ParseResults) = - let source = CosmosArguments(c, p.GetResult Source) - let discovery = p.TryGetResult SyncParameters.Connection - |> Option.map Equinox.CosmosStore.Discovery.ConnectionString - |> Option.defaultWith (fun () -> source.Discovery) - let timeout = p.GetResult(SyncParameters.Timeout, 5) |> TimeSpan.FromSeconds - let retries = p.GetResult(SyncParameters.Retries, 1) - let maxRetryWaitTime = p.GetResult(SyncParameters.RetriesWaitTime, 5) |> TimeSpan.FromSeconds - let connector = Equinox.CosmosStore.CosmosStoreConnector(discovery, timeout, retries, maxRetryWaitTime) - let database = p.GetResult(SyncParameters.Database, fun () -> source.Database) - let container = p.GetResult SyncParameters.Container - member val MaxBytes = p.GetResult(MaxKiB, 128) * 1024 + let source = match p.GetSubCommand() with + | Source p -> Choice1Of2 (Args.CosmosSourceArguments(c, p)) + | SourceFile f -> Choice2Of2 (FileArguments(c, f)) + | x -> p.Raise $"Unexpected Subcommand %A{x}" + let connection = match source with + | Choice1Of2 c -> p.GetResult(Connection, fun () -> c.Connection) + | Choice2Of2 _ -> p.GetResult Connection + let connector = + let timeout = p.GetResult(Timeout, 5.) |> TimeSpan.FromSeconds + let retries = p.GetResult(Retries, 1) + let maxRetryWaitTime = p.GetResult(RetriesWaitTime, 5.) |> TimeSpan.FromSeconds + Equinox.CosmosStore.CosmosStoreConnector(Equinox.CosmosStore.Discovery.ConnectionString connection, timeout, retries, maxRetryWaitTime) + let database = match source with + | Choice1Of2 c -> p.GetResult(Database, fun () -> c.Database) + | Choice2Of2 _ -> p.GetResult Database + let container = p.GetResult Container member val Source = source - member _.ConnectWithFeed() = match p.TryGetResult LeaseContainerId with + member val MaxBytes = p.GetResult(MaxKiB, 128) * 1024 + member x.Connect() = connector.ConnectExternal("Destination", database, container) + member x.ConnectEvents() = async { let! context = x.Connect() + return Equinox.CosmosStore.Core.EventsContext(context, Store.Metrics.log) } + member x.ConnectWithFeed() = let source = match source with Choice1Of2 c -> c | Choice2Of2 _file -> p.Raise "unexpected" + match p.TryGetResult LeaseContainerId with | Some localAuxContainerId -> source.ConnectWithFeedReadOnly(connector.CreateUninitialized(), database, localAuxContainerId) | None -> source.ConnectWithFeed() - member _.Connect() = async { let! context = connector.ConnectExternal("Destination", database, container) - return Equinox.CosmosStore.Core.EventsContext(context, Store.Metrics.log) } - and [] CosmosParameters = - | [] ConnectionMode of Microsoft.Azure.Cosmos.ConnectionMode - | [] Connection of string - | [] Database of string - | [] Container of string - | [] Views of string - | [] Timeout of float - | [] Retries of int - | [] RetriesWaitTime of float - - | [] Verbose - | [] LeaseContainer of string - | [] FromTail - | [] MaxItems of int - | [] LagFreqM of float + and [] IndexParameters = + | [] Cosmos of ParseResults + | [] File of ParseResults interface IArgParserTemplate with member p.Usage = p |> function - | ConnectionMode _ -> "override the connection mode. Default: Direct." - | Connection _ -> $"specify a connection string for a Cosmos account. (optional if environment variable {Args.CONNECTION} specified)" - | Database _ -> $"specify a database name for store. 
(optional if environment variable {Args.DATABASE} specified)" - | Container _ -> $"specify a container name for store. (optional if environment variable {Args.CONTAINER} specified)" - | Views _ -> $"specify a container name for views. (optional if environment variable {Args.VIEWS} specified)" - | Timeout _ -> "specify operation timeout in seconds. Default: 5." - | Retries _ -> "specify operation retries. Default: 1." - | RetriesWaitTime _ -> "specify max wait-time for retry when being throttled by Cosmos in seconds. Default: 5." - - | Verbose -> "request Verbose Logging from ChangeFeedProcessor and Store. Default: off" - | LeaseContainer _ -> "specify Container Name (in this [target] Database) for Leases container. Default: `` + `-aux`." - | FromTail -> "(iff the Consumer Name is fresh) - force skip to present Position. Default: Never skip an event." - | MaxItems _ -> "maximum item count to request from the feed. Default: unlimited." - | LagFreqM _ -> "specify frequency (minutes) to dump lag stats. Default: 1" - and CosmosArguments(c: Args.Configuration, p: ParseResults) = - let discovery = p.GetResult(CosmosParameters.Connection, fun () -> c.CosmosConnection) |> Equinox.CosmosStore.Discovery.ConnectionString - let mode = p.TryGetResult ConnectionMode - let timeout = p.GetResult(Timeout, 5) |> TimeSpan.FromSeconds - let retries = p.GetResult(Retries, 1) - let maxRetryWaitTime = p.GetResult(RetriesWaitTime, 5) |> TimeSpan.FromSeconds - let connector = Equinox.CosmosStore.CosmosStoreConnector(discovery, timeout, retries, maxRetryWaitTime, ?mode = mode) - let database = p.GetResult(Database, fun () -> c.CosmosDatabase) - let containerId = p.GetResult(Container, fun () -> c.CosmosContainer) - let viewsContainerId = p.GetResult(Views, fun () -> c.CosmosViews) - - let leaseContainerId = p.GetResult(LeaseContainer, containerId + "-aux") - let fromTail = p.Contains FromTail - let maxItems = p.TryGetResult MaxItems - let lagFrequency = p.GetResult(LagFreqM, 1.) 
|> TimeSpan.FromMinutes - member val Verbose = p.Contains Verbose - member val MonitoringParams = fromTail, maxItems, lagFrequency - member _.Discovery = discovery - member _.Database = database - member _.ConnectWithFeed(?lsc) = connector.ConnectWithFeed(database, containerId, viewsContainerId, leaseContainerId, ?logSnapshotConfig = lsc) - member _.ConnectWithFeedReadOnly(auxClient, auxDatabase, auxContainerId) = - connector.ConnectWithFeedReadOnly(database, containerId, viewsContainerId, auxClient, auxDatabase, auxContainerId) + | Cosmos _ -> "CosmosDb source parameters" + | File _ -> "Replacing normal input with a File source" + and IndexArguments(c: Args.Configuration, p: ParseResults) = + member val Source = match p.GetSubCommand() with + | IndexParameters.Cosmos p -> Choice1Of2 (Args.CosmosSourceArguments(c, p)) + | IndexParameters.File f -> Choice2Of2 (FileArguments(c, f)) + // | _ -> p.Raise $"Unexpected Subcommand %A{x}" + and [] FileParameters = + | [] Path of filename: string + | [] Skip of lines: int + | [] Truncate of lines: int + | [] LineNo of int + | [] Cosmos of ParseResults + interface IArgParserTemplate with + member p.Usage = p |> function + | Path _ -> "specify file path" + | Skip _ -> "specify number of lines to skip" + | Truncate _ -> "specify line number to pretend is End of File" + | LineNo _ -> "specify line number to start (1-based)" + | Cosmos _ -> "CosmosDb parameters (required for Index, not applicable for StatsFile or SourceFile)" + and FileArguments(c: Args.Configuration, p: ParseResults) = + let cosmos = match p.TryGetSubCommand() with Some (Cosmos p) -> Args.CosmosArguments(c, p) |> Some | _ -> None + let cosmosMissing () = p.Raise "cosmos details must be specified" + member val CosmosVerbose = match cosmos with Some c -> c.Verbose | None -> false + member val Filepath = p.GetResult Path + member val Skip = p.TryPostProcessResult(LineNo, fun l -> l - 1) |> Option.defaultWith (fun () -> p.GetResult(Skip, 0)) + member val Trunc = p.TryGetResult Truncate + member _.Connect() = match cosmos with Some c -> c.Connect() | None -> async { return cosmosMissing () } /// Parse the commandline; can throw exceptions in response to missing arguments and/or `-h`/`--help` args let parse tryGetConfigValue argv: Arguments = let programName = System.Reflection.Assembly.GetEntryAssembly().GetName().Name - let parser = ArgumentParser.Create(programName=programName) + let parser = ArgumentParser.Create(programName = programName) Arguments(Args.Configuration tryGetConfigValue, parser.ParseCommandLine argv) let [] AppName = "IndexerTemplate" let build (args: Args.Arguments) = async { let processorName, maxReadAhead, maxConcurrentStreams = args.ProcessorParams() - let! 
contexts, monitored, leases = args.ConnectWithFeed(args.IsSnapshotting) - let store = (contexts, Equinox.Cache(AppName, sizeMb = 10)) ||> Store.Cosmos.createConfig - let parseFeedDoc, sink = - let mkParseAll () = Propulsion.CosmosStore.EquinoxSystemTextJsonParser.whereCategory (fun _ -> true) - let mkSink stats handle = Factory.StartSink(Log.Logger, stats, maxConcurrentStreams, handle, maxReadAhead) - match args.Action with - | Args.Action.Index _ -> - let mkParseCats = Propulsion.CosmosStore.EquinoxSystemTextJsonParser.ofCategories - let stats = Indexer.Stats(Log.Logger, args.StatsInterval, args.StateInterval, args.Cosmos.Verbose) - let handle = Indexer.Factory.createHandler store - mkParseCats Indexer.sourceCategories, mkSink stats handle - | Args.Action.Snapshot _ -> - let stats = Snapshotter.Stats(Log.Logger, args.StatsInterval, args.StateInterval, args.Cosmos.Verbose) - let handle = Snapshotter.Factory.createHandler store - mkParseAll (), mkSink stats handle - | Args.Action.Sync a -> - mkParseAll (), - let eventsContext = a.Connect() |> Async.RunSynchronously - let stats = Propulsion.CosmosStore.CosmosStoreSinkStats(Log.Logger, args.StatsInterval, args.StateInterval) - Propulsion.CosmosStore.CosmosStoreSink.Start( - Log.Logger, maxReadAhead, eventsContext, maxConcurrentStreams, stats, - purgeInterval = TimeSpan.FromHours 1, maxBytes = a.MaxBytes) - let source = - let startFromTail, maxItems, lagFrequency = args.Cosmos.MonitoringParams - Propulsion.CosmosStore.CosmosStoreSource(Log.Logger, args.StatsInterval, monitored, leases, processorName, parseFeedDoc, sink, - startFromTail = startFromTail, ?maxItems = maxItems, lagEstimationInterval = lagFrequency).Start() - return sink, source } + let parse = args.Filters.CreateStreamFilter >> Propulsion.CosmosStore.EquinoxSystemTextJsonParser.whereStream + let configureWithStreamsSink_ stats cats handle = + cats |> parse, Factory.StartStreamsSink(Log.Logger, stats, maxConcurrentStreams, handle, maxReadAhead) + let configureWithStreamsSink stats handle = configureWithStreamsSink_ stats None handle + let summarize () = + let stats = Visitor.Stats(Log.Logger, args.StatsInterval, args.StateInterval, args.CosmosVerbose, args.AbendTimeout) + let handle = Visitor.Factory.createHandler args.Filters.EventFilter + configureWithStreamsSink stats handle + let index store = + let stats = Indexer.Stats(Log.Logger, args.StatsInterval, args.StateInterval, args.CosmosVerbose, args.AbendTimeout) + let cats, handle = Indexer.Factory.create store // args.Filters.EventFilter + configureWithStreamsSink_ stats cats handle + let snapshot store = + let stats = Snapshotter.Stats(Log.Logger, args.StatsInterval, args.StateInterval, args.CosmosVerbose, args.AbendTimeout) + let handle = Snapshotter.Factory.createHandler args.DryRun store + configureWithStreamsSink stats handle + let sync (a: Args.SyncArguments) = + let eventsContext = a.ConnectEvents() |> Async.RunSynchronously + parse None, + let stats = Propulsion.CosmosStore.CosmosStoreSinkStats(Log.Logger, args.StatsInterval, args.StateInterval) + Propulsion.CosmosStore.CosmosStoreSink.Start(Log.Logger, maxReadAhead, eventsContext, maxConcurrentStreams, stats, + purgeInterval = TimeSpan.FromHours 1, maxBytes = a.MaxBytes) + let export (a: Args.SyncArguments) = + let context = a.Connect() |> Async.RunSynchronously + let cache = Equinox.Cache (AppName, sizeMb = 10) + let stats = Ingester.Stats(Log.Logger, args.StatsInterval, args.StateInterval, args.CosmosVerbose, args.AbendTimeout) + let handle = 
Ingester.Factory.createHandler (context, cache) + configureWithStreamsSink stats handle + let mkFileSource filePath (skip, truncate) parseFeedDoc sink = + sink, CosmosDumpSource.Start(Log.Logger, args.StatsInterval, filePath, skip, parseFeedDoc, sink, ?truncateTo = truncate) + match! args.Connect AppName with + | Choice1Of3 (filePath, skipTrunc) -> // Summarize or ingest from file (no application store or change feed processor involved) + return mkFileSource filePath skipTrunc <|| + match args.Action with + | Args.Action.SummarizeFile _ -> summarize () + | Args.Action.Sync a -> sync a + | Args.Action.Export a -> export a + | x -> x |> failwithf "unexpected %A" + | Choice2Of3 (filePath, skipTrunc, store) -> // Index from file to store (no change feed involved) + return mkFileSource filePath skipTrunc <|| + match args.Action with + | Args.Action.Index _ -> index store + | x -> x |> failwithf "unexpected %A" + | Choice3Of3 (monitored, leases, (startFromTail, maxItems, tailSleepInterval, _lagFrequency), store) -> // normal case - consume from change feed, write to store + let parseFeedDoc, sink = + match args.Action with + | Args.Action.Summarize _ -> summarize () + | Args.Action.Index _ -> index store + | Args.Action.Snapshot _ -> snapshot store + | Args.Action.Sync a -> sync a + | Args.Action.Export a -> export a + | Args.Action.SummarizeFile _ as x -> x |> failwithf "unexpected %A" + let source = + Propulsion.CosmosStore.CosmosStoreSource( + Log.Logger, args.StatsInterval, monitored, leases, processorName, parseFeedDoc, sink, + startFromTail = startFromTail, ?maxItems = maxItems, tailSleepInterval = tailSleepInterval, ?lagEstimationInterval = args.LagEstimationInterval + ).Start() + return sink, source } open Propulsion.Internal // AwaitKeyboardInterruptAsTaskCanceledException +(* +// A typical app will likely have health checks etc, implying the wireup would be via `endpoints.MapMetrics()` and thus not use this ugly code directly +let startMetricsServer port: IDisposable = + let metricsServer = new Prometheus.KestrelMetricServer(port = port) + let ms = metricsServer.Start() + Log.Information("Prometheus /metrics endpoint on port {port}", port) + { new IDisposable with member x.Dispose() = ms.Stop(); (metricsServer :> IDisposable).Dispose() } *) + +let eofSignalException = System.Threading.Tasks.TaskCanceledException "Stopping; FeedMonitor wait completed" +let isExpectedShutdownSignalException: exn -> bool = function + | :? Argu.ArguParseException // Via Arguments.Parse and/or Configuration.tryGet + | :? System.Threading.Tasks.TaskCanceledException -> true // via AwaitKeyboardInterruptAsTaskCanceledException + | _ -> false + let run args = async { let! sink, source = build args - use _metricsServer: IDisposable = args.PrometheusPort |> Option.map startMetricsServer |> Option.toObj - return! [| Async.AwaitKeyboardInterruptAsTaskCanceledException() - source.AwaitWithStopOnCancellation() - sink.AwaitWithStopOnCancellation() - |] |> Async.Parallel |> Async.Ignore } + // use _metricsServer: IDisposable = args.PrometheusPort |> Option.map startMetricsServer |> Option.toObj + try do! [| async { match args.WaitForTail with + | None -> () + | Some initialWait -> + do! source.Monitor.AwaitCompletion(initialWait, awaitFullyCaughtUp = true, logInterval = args.StatsInterval / 2.) |> Async.AwaitTask + source.Stop() + do! 
source.AwaitWithStopOnCancellation() // Wait until Source has emitted stats + return raise eofSignalException } // trigger tear down of sibling waits + sink.AwaitWithStopOnCancellation() + Async.AwaitKeyboardInterruptAsTaskCanceledException() |] |> Async.Parallel |> Async.Ignore + finally source.Flush() |> Async.Ignore |> Async.RunSynchronously } // flush checkpoints // TODO do! in F# 7 [] let main argv = try let args = Args.parse EnvVar.tryGet argv - try let metrics = Sinks.equinoxAndPropulsionCosmosConsumerMetrics (Sinks.tags AppName) args.ProcessorName - Log.Logger <- LoggerConfiguration().Configure(args.Verbose).Sinks(metrics, args.Cosmos.Verbose).CreateLogger() + try let metrics = Sinks.equinoxAndPropulsionConsumerMetrics (Sinks.tags AppName) args.ProcessorName + Log.Logger <- LoggerConfiguration().Configure(args.Verbose).Sinks(metrics, args.CosmosVerbose).CreateLogger() try run args |> Async.RunSynchronously; 0 - with e when not (e :? System.Threading.Tasks.TaskCanceledException) -> Log.Fatal(e, "Exiting"); 2 + with + | :? Propulsion.Streams.HealthCheckException as e -> + Log.Fatal(e, "Exiting due to Healthcheck; Stuck streams {stuck} Failing streams {failing}", e.StuckStreams, e.FailingStreams); 3 + | e when not (isExpectedShutdownSignalException e) -> Log.Fatal(e, "Exiting"); 2 finally Log.CloseAndFlush() - with :? Argu.ArguParseException as e -> eprintfn $"%s{e.Message}"; 1 - | e -> eprintf $"Exception %s{e.Message}"; 1 + with x when x = eofSignalException -> printfn "Processing COMPLETE"; 0 + | :? Argu.ArguParseException as e -> eprintfn $"%s{e.Message}"; 1 + | e -> eprintfn $"Exception %s{e.Message}"; 1 diff --git a/propulsion-indexer/Indexer/Snapshotter.fs b/propulsion-indexer/Indexer/Snapshotter.fs index feb1b6563..b858d6652 100644 --- a/propulsion-indexer/Indexer/Snapshotter.fs +++ b/propulsion-indexer/Indexer/Snapshotter.fs @@ -1,46 +1,58 @@ module IndexerTemplate.Indexer.Snapshotter -type Outcome = bool +open Visitor +open Propulsion.Internal + +type Outcome = (struct (string * System.TimeSpan * Store.Snapshotter.Result)) +module Outcome = let create sn ts res: Outcome = struct (FsCodec.StreamName.Category.ofStreamName sn, ts, res) + +/// Gathers counts of snapshots updated vs skipped +type Stats(log, statsInterval, stateInterval, verboseStore, abendThreshold) = + inherit StatsBase(log, statsInterval, stateInterval, verboseStore, abendThreshold = abendThreshold) + let lats, accLats = Stats.LatencyStatsSet(), Stats.LatencyStatsSet() + let counts, accCounts = CategoryCounters(), CategoryCounters() + override _.HandleOk((cat, ts, res)) = + lats.Record(cat, ts) + accLats.Record(cat, ts) + let count = [ FsCodec.Union.caseName res, 1 ] + counts.Ingest(cat, count) + accCounts.Ingest(cat, count) -type Stats(log, statsInterval, stateInterval, verboseStore) = - inherit Propulsion.Streams.Stats(log, statsInterval, stateInterval) - - let mutable handled, skipped = 0, 0 - override _.HandleOk(updated) = if updated then handled <- handled + 1 else skipped <- skipped + 1 override _.DumpStats() = + counts.DumpGrouped(log, "OUTCOMES") + counts.Clear() + lats.DumpGrouped(id, log, totalLabel = "CATEGORIES") + lats.Clear() base.DumpStats() - log.Information(" Snapshotted {handled}, skipped {skipped}", handled, skipped) - handled <- 0; skipped <- 0 - Equinox.CosmosStore.Core.Log.InternalMetrics.dump Serilog.Log.Logger - - override _.Classify(e) = - match e with - | OutcomeKind.StoreExceptions kind -> kind - | Equinox.CosmosStore.Exceptions.ServiceUnavailable when not verboseStore -> 
Propulsion.Streams.OutcomeKind.RateLimited - | x -> base.Classify x - override _.HandleExn(log, exn) = - log.Information(exn, "Unhandled") + override _.DumpState(prune) = + accCounts.DumpGrouped(log, "ΣOUTCOMES") + accLats.DumpGrouped(id, log, totalLabel = "ΣCATEGORIES") + if prune then accLats.Clear() + base.DumpState(prune) open IndexerTemplate.Domain -let handle - tryUpdateTodo +let handle todo stream _events: Async<_ * Outcome> = async { + let ts = Stopwatch.timestamp () let! res, pos' = match stream with - | Todo.Reactions.For id -> tryUpdateTodo id + | Todo.Reactions.For id -> todo id | sn -> failwith $"Unexpected category %A{sn}" // a) if the tryUpdate saw a version beyond what (Propulsion.Sinks.Events.nextIndex events) would suggest, then we pass that information out // in order to have the scheduler drop all events until we meet an event that signifies we may need to re-update // b) the fact that we use the same Microsoft.Azure.Cosmos.CosmosClient for the Change Feed and the Equinox-based Services means we are guaranteed // to always see all the _events we've been supplied. (Even if this were not the case, the scheduler would retain the excess events, and that // would result in an immediate re-triggering of the handler with those events) - return Propulsion.Sinks.StreamResult.OverrideNextIndex pos', res } + let elapsed = Stopwatch.elapsed ts + return Propulsion.Sinks.StreamResult.OverrideNextIndex pos', Outcome.create stream elapsed res } module Factory = - let createHandler context = + let createHandler dryRun store = + + let todo = Todo.Factory.createSnapshotter store - let todo = Todo.Factory.createSnapshotter context + let h svc = Store.Snapshotter.Service.tryUpdate dryRun svc handle - todo.TryUpdate + ( h todo) diff --git a/propulsion-indexer/Indexer/Visitor.fs b/propulsion-indexer/Indexer/Visitor.fs new file mode 100644 index 000000000..ea1d3cfe7 --- /dev/null +++ b/propulsion-indexer/Indexer/Visitor.fs @@ -0,0 +1,82 @@ +module IndexerTemplate.Indexer.Visitor + +type Outcome = (struct (string * (struct (string * System.TimeSpan))[] * (string * int)[])) +module Outcome = + let private eventType (x: Propulsion.Sinks.Event) = x.EventType + let private eventCounts = Array.countBy eventType + let private create sn ham spam: Outcome = struct (FsCodec.StreamName.Category.ofStreamName sn, ham, spam) + let render sn ham spam = create sn ham (eventCounts spam) + let render_ sn ham spam elapsedS = + let share = TimeSpan.seconds (match Array.length ham with 0 -> 0 | count -> elapsedS / float count) + create sn (ham |> Array.map (fun x -> struct (eventType x, share))) (eventCounts spam) + +[] +type StatsBase<'outcome>(log, statsInterval, stateInterval, verboseStore, ?abendThreshold) = + inherit Propulsion.Streams.Stats<'outcome>(log, statsInterval, stateInterval, failThreshold = TimeSpan.seconds 120, ?abendThreshold = abendThreshold) + + override _.DumpStats() = + base.DumpStats() + Equinox.CosmosStore.Core.Log.InternalMetrics.dump Serilog.Log.Logger + + override _.Classify(e) = + match e with + | OutcomeKind.StoreExceptions kind -> kind + // Cosmos Emulator overload manifests as 'Response status code does not indicate success: ServiceUnavailable (503); Substatus: 20002' + // (in verbose mode, we let the actual exception bubble up) + | Equinox.CosmosStore.Exceptions.ServiceUnavailable when not verboseStore -> Propulsion.Streams.OutcomeKind.Tagged "cosmos503" + | :? 
System.TimeoutException -> Propulsion.Streams.OutcomeKind.Tagged "timeoutEx" + // Emulator can emit this (normally only during laptop sleeps) + | Equinox.CosmosStore.Exceptions.CosmosStatus System.Net.HttpStatusCode.Forbidden as e + when e.Message.Contains "Authorization token is not valid at the current time. Please provide a valid token" -> + Propulsion.Streams.OutcomeKind.Timeout + | x -> + base.Classify x + override _.HandleExn(log, exn) = + log.Information(exn, "Unhandled") + +type CategoryCounters() = + let cats = System.Collections.Generic.Dictionary() + member _.Ingest(category, counts) = + let cat = + match cats.TryGetValue category with + | false, _ -> let acc = Propulsion.Internal.Stats.Counters() in cats.Add(category, acc); acc + | true, acc -> acc + for event, count : int in counts do cat.Ingest(event, count) + member _.Categories = cats.Keys + member _.StatsDescending cat = + match cats.TryGetValue cat with + | true, acc -> acc.StatsDescending + | false, _ -> Seq.empty + member _.DumpGrouped(log: Serilog.ILogger, totalLabel) = + if cats.Count <> 0 then + Propulsion.Internal.Stats.dumpCounterSet log totalLabel cats + member _.Clear() = cats.Clear() + +type Stats(log, statsInterval, stateInterval, verboseStore, abendThreshold) = + inherit StatsBase(log, statsInterval, stateInterval, verboseStore, abendThreshold = abendThreshold) + let mutable handled, ignored = 0, 0 + let accHam, accSpam = CategoryCounters(), CategoryCounters() + override _.HandleOk((category, ham, spam)) = + accHam.Ingest(category, ham |> Seq.countBy Propulsion.Internal.ValueTuple.fst) + accSpam.Ingest(category, spam) + handled <- handled + Array.length ham + ignored <- ignored + Array.sumBy snd spam + override _.DumpStats() = + if handled > 0 || ignored > 0 then + if ignored > 0 then log.Information(" Handled {count}, skipped {skipped}", handled, ignored) + handled <- 0; ignored <- 0 + override _.DumpState purge = + for cat in Seq.append accHam.Categories accSpam.Categories |> Seq.distinct |> Seq.sort do + let ham, spam = accHam.StatsDescending(cat) |> Array.ofSeq, accSpam.StatsDescending cat |> Array.ofSeq + if ham.Length > 00 then log.Information(" Category {cat} handled {@ham}", cat, ham) + if spam.Length <> 0 then log.Information(" Category {cat} ignored {@spam}", cat, spam) + if purge then + accHam.Clear(); accSpam.Clear() + +let private handle isValidEvent stream (events: Propulsion.Sinks.Event[]): Async<_ * Outcome> = async { + let ham, spam = events |> Array.partition isValidEvent + return Propulsion.Sinks.StreamResult.AllProcessed, Outcome.render_ stream ham spam 0 } + +module Factory = + + let createHandler = handle diff --git a/propulsion-reactor/Infrastructure.fs b/propulsion-reactor/Infrastructure.fs index 9489039f0..a0ddc9cb6 100644 --- a/propulsion-reactor/Infrastructure.fs +++ b/propulsion-reactor/Infrastructure.fs @@ -146,9 +146,10 @@ type Logging() = module OutcomeKind = - let [] (|StoreExceptions|_|) exn = + let [] (|StoreExceptions|_|) (exn: exn) = match exn with | Equinox.DynamoStore.Exceptions.ProvisionedThroughputExceeded | Equinox.CosmosStore.Exceptions.RateLimited -> Propulsion.Streams.OutcomeKind.RateLimited |> ValueSome - | Equinox.CosmosStore.Exceptions.RequestTimeout -> Propulsion.Streams.OutcomeKind.Timeout |> ValueSome + | Equinox.CosmosStore.Exceptions.RequestTimeout -> Propulsion.Streams.OutcomeKind.Tagged "cosmosTimeout" |> ValueSome + | :? System.Threading.Tasks.TaskCanceledException -> Propulsion.Streams.OutcomeKind.Tagged "taskCancelled" |> ValueSome | _ -> ValueNone
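
For reference, a minimal usage sketch of the CategoryCounters helper added in propulsion-indexer/Indexer/Visitor.fs: it is the per-category accumulator that the Visitor.Stats and Snapshotter.Stats HandleOk overrides feed, and that their DumpStats/DumpState overrides render via DumpGrouped. The "Todo" category name comes from the template itself, but the event names, counts and the use of the global Serilog logger below are assumptions made purely for illustration; this is a sketch of the helper's behaviour, not part of the patch.

    open IndexerTemplate.Indexer.Visitor

    // Hypothetical sample data: Ingest takes a category plus (eventType, count) pairs
    // and accumulates across calls for the same category
    let counters = CategoryCounters()
    counters.Ingest("Todo", [ "Added", 2; "Completed", 1 ])
    counters.Ingest("Todo", [ "Added", 3 ])
    // StatsDescending yields the accumulated per-event totals for one category, highest first
    counters.StatsDescending "Todo" |> Seq.iter (printfn "%A")
    // DumpGrouped renders all categories via the same grouped format the Stats types emit at each stats interval
    counters.DumpGrouped(Serilog.Log.Logger, "OUTCOMES")
    counters.Clear()

The same split seen in Snapshotter.Stats applies here: the per-interval counters are cleared in DumpStats after each dump, while the accumulating ones (the acc-prefixed instances) are only cleared in DumpState when a purge is requested, so running totals survive across stats intervals.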