diff --git a/src/Propulsion/Streams.fs b/src/Propulsion/Streams.fs index 1a09c490..4e3be862 100755 --- a/src/Propulsion/Streams.fs +++ b/src/Propulsion/Streams.fs @@ -518,6 +518,10 @@ module Scheduling = let timeSpanFromStopwatchTicks = function | ticks when ticks > 0L -> TimeSpan.FromSeconds(double ticks / ticksPerSecond) | _ -> TimeSpan.Zero + let private dateTimeOffsetToTimeStamp (dto : DateTimeOffset) : int64 = + let now, nowTs = DateTimeOffset.UtcNow, System.Diagnostics.Stopwatch.GetTimestamp() + let totalWaitS = (dto - now).TotalSeconds + nowTs + int64 (totalWaitS * ticksPerSecond) type private StreamState = { ts : int64; mutable count : int } let private walkAges (state : Dictionary<_, _>) = let now = System.Diagnostics.Stopwatch.GetTimestamp() @@ -551,18 +555,39 @@ module Scheduling = | true, v -> v.count <- v.count + 1 | false, _ -> state.Add(sn, { ts = startTs; count = 1 }) member _.State = walkAges state |> renderState + /// Maintains a list of streams that have been marked to backing off on processing + type private Waiting() = + let state = Dictionary() // NOTE ts is a cutoff time, not a start time here + let prune cutoff = + for sn in [| for kv in state do if kv.Value.ts <= cutoff then kv.Key |] do + state.Remove sn |> ignore + let walk now = if state.Count = 0 then Seq.empty else seq { for x in state.Values -> struct (x.ts - now, x.count) } + member _.HandleBackOff(sn, untilTs) = state.Add(sn, { ts = untilTs; count = 1 }) + member _.CanDispatch(sn, ts) = + match state.TryGetValue sn with + | true, { ts = until } when until > ts -> false + | true, _ -> state.Remove sn |> ignore; true + | false, _ -> true + member _.State : (int * int) * (TimeSpan * TimeSpan) = + let now = System.Diagnostics.Stopwatch.GetTimestamp() + prune now + walk now |> renderState /// Collates all state and reactions to manage the list of busy streams based on callbacks/notifications from the Dispatcher type Monitor() = - let active, failing, stuck = Active(), Repeating(), Repeating() + let active, failing, stuck, waiting = Active(), Repeating(), Repeating(), Waiting() let emit (log : ILogger) state (streams, attempts) (oldest : TimeSpan, newest : TimeSpan) = log.Information(" {state} {streams} for {newest:n1}-{oldest:n1}s, {attempts} attempts", state, streams, newest.TotalSeconds, oldest.TotalSeconds, attempts) + member _.CanDispatch(sn, ts) = + waiting.CanDispatch(sn, ts) member _.HandleStarted(sn, ts) = active.HandleStarted(sn, ts) member _.HandleResult(sn, succeeded, progressed) = let startTs = active.TakeFinished(sn) failing.HandleResult(sn, not succeeded, startTs) stuck.HandleResult(sn, succeeded && not progressed, startTs) + member _.HandleBackoff(sn, untilTs) = + waiting.HandleBackOff(sn, dateTimeOffsetToTimeStamp untilTs) member _.DumpState(log : ILogger) = let inline dump state (streams, attempts) ages = if streams <> 0 then @@ -570,6 +595,7 @@ module Scheduling = active.State ||> dump "active" failing.State ||> dump "failing" stuck.State ||> dump "stalled" + waiting.State ||> dump "waiting" member _.EmitMetrics(log : ILogger) = let inline report state (streams, attempts) (oldest : TimeSpan, newest : TimeSpan) = let m = Log.Metric.StreamsBusy (state, streams, oldest.TotalSeconds, newest.TotalSeconds) @@ -577,6 +603,7 @@ module Scheduling = active.State ||> report "active" failing.State ||> report "failing" stuck.State ||> report "stalled" + waiting.State ||> report "waiting" /// Gathers stats pertaining to the core projection/ingestion activity [] @@ -625,6 +652,14 @@ module Scheduling = if stucksDue () then mon.EmitMetrics metricsLog + abstract BackoffUntil : stream : FsCodec.StreamName * until : DateTimeOffset -> unit + default _.BackoffUntil(stream, until) = + mon.HandleBackoff(stream, until) + + /// Enables one to configure backoffs for streams that are failing + abstract CanDispatch : stream : FsCodec.StreamName * stopwatchTicks : int64 -> bool + default _.CanDispatch(stream, stopwatchTicks) = mon.CanDispatch(stream, stopwatchTicks) + abstract MarkStarted : stream : FsCodec.StreamName * stopwatchTicks : int64 -> unit default _.MarkStarted(stream, stopwatchTicks) = mon.HandleStarted(stream, stopwatchTicks) @@ -685,7 +720,7 @@ module Scheduling = let inner = DopDispatcher(maxDop) // On each iteration, we try to fill the in-flight queue, taking the oldest and/or heaviest streams first - let tryFillDispatcher (pending, markStarted) project markBusy = + let tryFillDispatcher (pending, canDispatchAt, markStarted) project markBusy = let mutable hasCapacity, dispatched = inner.HasCapacity, false if hasCapacity then let potential : seq> = pending () @@ -693,19 +728,20 @@ module Scheduling = let ts = System.Diagnostics.Stopwatch.GetTimestamp() while xs.MoveNext() && hasCapacity do let item = xs.Current - let succeeded = inner.TryAdd(project ts item) - if succeeded then - markBusy item.stream - markStarted (item.stream, ts) - hasCapacity <- succeeded - dispatched <- dispatched || succeeded // if we added any request, we'll skip sleeping + if canDispatchAt (item.stream, ts) then + let succeeded = inner.TryAdd(project ts item) + if succeeded then + markBusy item.stream + markStarted (item.stream, ts) + hasCapacity <- succeeded + dispatched <- dispatched || succeeded // if we added any request, we'll skip sleeping hasCapacity, dispatched member _.Pump() = inner.Pump() [] member _.Result = inner.Result member _.State = inner.State - member _.TryReplenish (pending, markStarted) project markStreamBusy = - tryFillDispatcher (pending, markStarted) project markStreamBusy + member _.TryReplenish (pending, canDispatchAt, markStarted) project markStreamBusy = + tryFillDispatcher (pending, canDispatchAt, markStarted) project markStreamBusy /// Defines interface between Scheduler (which owns the pending work) and the Dispatcher which periodically selects work to commence based on a policy type IDispatcher<'P, 'R, 'E> = @@ -743,7 +779,7 @@ module Scheduling = interface IDispatcher<'P, 'R, 'E> with override _.TryReplenish pending markStreamBusy = - inner.TryReplenish (pending, stats.MarkStarted) project markStreamBusy + inner.TryReplenish (pending, stats.CanDispatch, stats.MarkStarted) project markStreamBusy [] override _.Result = inner.Result override _.InterpretProgress(streams : StreamStates<_>, stream : FsCodec.StreamName, res : Choice<'P, 'E>) = interpretProgress streams stream res