diff --git a/src/Propulsion.Cosmos/CosmosPruner.fs b/src/Propulsion.Cosmos/CosmosPruner.fs index 6a6fad32..d448d0d2 100644 --- a/src/Propulsion.Cosmos/CosmosPruner.fs +++ b/src/Propulsion.Cosmos/CosmosPruner.fs @@ -44,7 +44,7 @@ module Pruner = totalDeferred <- totalDeferred + deferred /// Used to render exceptions that don't fall into the rate-limiting or timed-out categories - override _.HandleExn(log, exn) = + override _.HandleExn(log, _stream, exn) = match classify exn with | ExceptionKind.RateLimited | ExceptionKind.TimedOut -> () // Outcomes are already included in the statistics - no logging is warranted diff --git a/src/Propulsion.CosmosStore/CosmosStorePruner.fs b/src/Propulsion.CosmosStore/CosmosStorePruner.fs index 93743376..bd5d269d 100644 --- a/src/Propulsion.CosmosStore/CosmosStorePruner.fs +++ b/src/Propulsion.CosmosStore/CosmosStorePruner.fs @@ -42,7 +42,7 @@ module Pruner = totalDeferred <- totalDeferred + deferred /// Used to render exceptions that don't fall into the rate-limiting or timed-out categories - override _.HandleExn(log, exn) = + override _.HandleExn(log, _stream, exn) = match classify exn with | ExceptionKind.RateLimited | ExceptionKind.TimedOut -> () // Outcomes are already included in the statistics - no logging is warranted diff --git a/src/Propulsion/Streams.fs b/src/Propulsion/Streams.fs index 553fbbac..4e3be862 100755 --- a/src/Propulsion/Streams.fs +++ b/src/Propulsion/Streams.fs @@ -518,6 +518,10 @@ module Scheduling = let timeSpanFromStopwatchTicks = function | ticks when ticks > 0L -> TimeSpan.FromSeconds(double ticks / ticksPerSecond) | _ -> TimeSpan.Zero + let private dateTimeOffsetToTimeStamp (dto : DateTimeOffset) : int64 = + let now, nowTs = DateTimeOffset.UtcNow, System.Diagnostics.Stopwatch.GetTimestamp() + let totalWaitS = (dto - now).TotalSeconds + nowTs + int64 (totalWaitS * ticksPerSecond) type private StreamState = { ts : int64; mutable count : int } let private walkAges (state : Dictionary<_, _>) = let now = 
System.Diagnostics.Stopwatch.GetTimestamp() @@ -551,18 +555,39 @@ module Scheduling = | true, v -> v.count <- v.count + 1 | false, _ -> state.Add(sn, { ts = startTs; count = 1 }) member _.State = walkAges state |> renderState + /// Maintains the set of streams that have been marked as backing off, i.e. that should not be dispatched until their cutoff timestamp has passed + type private Waiting() = + let state = Dictionary() // NOTE ts is a cutoff time, not a start time here + let prune cutoff = + for sn in [| for kv in state do if kv.Value.ts <= cutoff then kv.Key |] do + state.Remove sn |> ignore + let walk now = if state.Count = 0 then Seq.empty else seq { for x in state.Values -> struct (x.ts - now, x.count) } + member _.HandleBackoff(sn, untilTs) = state.[sn] <- { ts = untilTs; count = 1 } // overwrite rather than Add, which throws if the stream is already registered as backing off + member _.CanDispatch(sn, ts) = + match state.TryGetValue sn with + | true, { ts = until } when until > ts -> false + | true, _ -> state.Remove sn |> ignore; true + | false, _ -> true + member _.State : (int * int) * (TimeSpan * TimeSpan) = + let now = System.Diagnostics.Stopwatch.GetTimestamp() + prune now + walk now |> renderState /// Collates all state and reactions to manage the list of busy streams based on callbacks/notifications from the Dispatcher type Monitor() = - let active, failing, stuck = Active(), Repeating(), Repeating() + let active, failing, stuck, waiting = Active(), Repeating(), Repeating(), Waiting() let emit (log : ILogger) state (streams, attempts) (oldest : TimeSpan, newest : TimeSpan) = log.Information(" {state} {streams} for {newest:n1}-{oldest:n1}s, {attempts} attempts", state, streams, newest.TotalSeconds, oldest.TotalSeconds, attempts) + member _.CanDispatch(sn, ts) = + waiting.CanDispatch(sn, ts) member _.HandleStarted(sn, ts) = active.HandleStarted(sn, ts) member _.HandleResult(sn, succeeded, progressed) = let startTs = active.TakeFinished(sn) failing.HandleResult(sn, not succeeded, startTs) stuck.HandleResult(sn, succeeded && not progressed, startTs) + member _.HandleBackoff(sn, untilTs) = 
+ waiting.HandleBackoff(sn, dateTimeOffsetToTimeStamp untilTs) member _.DumpState(log : ILogger) = let inline dump state (streams, attempts) ages = if streams <> 0 then @@ -570,6 +595,7 @@ module Scheduling = active.State ||> dump "active" failing.State ||> dump "failing" stuck.State ||> dump "stalled" + waiting.State ||> dump "waiting" member _.EmitMetrics(log : ILogger) = let inline report state (streams, attempts) (oldest : TimeSpan, newest : TimeSpan) = let m = Log.Metric.StreamsBusy (state, streams, oldest.TotalSeconds, newest.TotalSeconds) @@ -577,6 +603,7 @@ module Scheduling = active.State ||> report "active" failing.State ||> report "failing" stuck.State ||> report "stalled" + waiting.State ||> report "waiting" /// Gathers stats pertaining to the core projection/ingestion activity [] @@ -625,6 +652,14 @@ module Scheduling = if stucksDue () then mon.EmitMetrics metricsLog + abstract BackoffUntil : stream : FsCodec.StreamName * until : DateTimeOffset -> unit + default _.BackoffUntil(stream, until) = + mon.HandleBackoff(stream, until) + + /// Determines whether a stream may be dispatched at the given Stopwatch timestamp; by default a stream is held back until any cutoff registered via BackoffUntil has passed + abstract CanDispatch : stream : FsCodec.StreamName * stopwatchTicks : int64 -> bool + default _.CanDispatch(stream, stopwatchTicks) = mon.CanDispatch(stream, stopwatchTicks) + abstract MarkStarted : stream : FsCodec.StreamName * stopwatchTicks : int64 -> unit default _.MarkStarted(stream, stopwatchTicks) = mon.HandleStarted(stream, stopwatchTicks) @@ -685,7 +720,7 @@ module Scheduling = let inner = DopDispatcher(maxDop) // On each iteration, we try to fill the in-flight queue, taking the oldest and/or heaviest streams first - let tryFillDispatcher (pending, markStarted) project markBusy = + let tryFillDispatcher (pending, canDispatchAt, markStarted) project markBusy = let mutable hasCapacity, dispatched = inner.HasCapacity, false if hasCapacity then let potential : seq> = pending () @@ -693,19 +728,20 @@ module Scheduling = let ts = 
System.Diagnostics.Stopwatch.GetTimestamp() while xs.MoveNext() && hasCapacity do let item = xs.Current - let succeeded = inner.TryAdd(project ts item) - if succeeded then - markBusy item.stream - markStarted (item.stream, ts) - hasCapacity <- succeeded - dispatched <- dispatched || succeeded // if we added any request, we'll skip sleeping + if canDispatchAt (item.stream, ts) then + let succeeded = inner.TryAdd(project ts item) + if succeeded then + markBusy item.stream + markStarted (item.stream, ts) + hasCapacity <- succeeded + dispatched <- dispatched || succeeded // if we added any request, we'll skip sleeping hasCapacity, dispatched member _.Pump() = inner.Pump() [] member _.Result = inner.Result member _.State = inner.State - member _.TryReplenish (pending, markStarted) project markStreamBusy = - tryFillDispatcher (pending, markStarted) project markStreamBusy + member _.TryReplenish (pending, canDispatchAt, markStarted) project markStreamBusy = + tryFillDispatcher (pending, canDispatchAt, markStarted) project markStreamBusy /// Defines interface between Scheduler (which owns the pending work) and the Dispatcher which periodically selects work to commence based on a policy type IDispatcher<'P, 'R, 'E> = @@ -743,7 +779,7 @@ module Scheduling = interface IDispatcher<'P, 'R, 'E> with override _.TryReplenish pending markStreamBusy = - inner.TryReplenish (pending, stats.MarkStarted) project markStreamBusy + inner.TryReplenish (pending, stats.CanDispatch, stats.MarkStarted) project markStreamBusy [] override _.Result = inner.Result override _.InterpretProgress(streams : StreamStates<_>, stream : FsCodec.StreamName, res : Choice<'P, 'E>) = interpretProgress streams stream res @@ -998,9 +1034,9 @@ type Stats<'Outcome>(log : ILogger, statsInterval, statesInterval) = exnEvents <- exnEvents + es exnBytes <- exnBytes + int64 bs resultExnOther <- resultExnOther + 1 - this.HandleExn(log.ForContext("stream", stream).ForContext("events", es).ForContext("duration", duration), 
exn) + this.HandleExn(log.ForContext("stream", stream).ForContext("events", es).ForContext("duration", duration), stream, exn) abstract member HandleOk : outcome : 'Outcome -> unit - abstract member HandleExn : log : ILogger * exn : exn -> unit + abstract member HandleExn : log : ILogger * streamName : FsCodec.StreamName * exn : exn -> unit module Projector = diff --git a/tests/Propulsion.Kafka.Integration/ConsumersIntegration.fs b/tests/Propulsion.Kafka.Integration/ConsumersIntegration.fs index d0c384dd..6fce72d4 100644 --- a/tests/Propulsion.Kafka.Integration/ConsumersIntegration.fs +++ b/tests/Propulsion.Kafka.Integration/ConsumersIntegration.fs @@ -120,7 +120,7 @@ module Helpers = inherit Propulsion.Streams.Stats(log, statsInterval, stateInterval) override _.HandleOk(()) = () - override _.HandleExn(log, exn) = log.Information(exn, "Unhandled") + override _.HandleExn(log, _stream, exn) = log.Information(exn, "Unhandled") let runConsumersBatch log (config : KafkaConsumerConfig) (numConsumers : int) (timeout : TimeSpan option) (handler : ConsumerCallback) = async { let mkConsumer (consumerId : int) = async { diff --git a/tools/Propulsion.Tool/Program.fs b/tools/Propulsion.Tool/Program.fs index 71a6c70b..23521b7b 100644 --- a/tools/Propulsion.Tool/Program.fs +++ b/tools/Propulsion.Tool/Program.fs @@ -162,7 +162,7 @@ type Stats(log, statsInterval, statesInterval) = inherit Propulsion.Streams.Stats(log, statsInterval=statsInterval, statesInterval=statesInterval) member val StatsInterval = statsInterval override _.HandleOk(_log) = () - override _.HandleExn(_log, _stream, _exn) = () + override _.HandleExn(_log, _stream, _exn) = () [] let main argv =