diff --git a/DOCUMENTATION.md b/DOCUMENTATION.md index d70c2389..24220131 100755 --- a/DOCUMENTATION.md +++ b/DOCUMENTATION.md @@ -407,6 +407,95 @@ NOTE batching should only be used where there is a direct benefit (the batch han - Store in the derived data - Add at the end, just in time +# Handler expectations / assumptions + +All processing within the Indexer adheres to the following rules and/or conventions: +- in addition to normal at least once delivery handling, handlers are expected to efficiently handle complete re-traversal of all events as standard function +- processing logic works on a streams basis: while the behavior is in some cases predicated on specific Event Types, all behaviors work in a batches fashion at the stream level. Each time the handler is invoked, the entire set of buffered events is presented. Within the handler, the processing batches the work to the maximum degree possible. +- there are no cross-stream ordering requirements/constraints (any handler that has a dependency on data outside of a given stream is expected to internally manage the correlation in an appropriate way, e.g., the handler may implement states within the derived information such that it buffers child information until such time as related parent information is required) +- checkpoints are maintained in the Main Container's `-aux` Container (by default, there's a container adjacent any source ('Monitored') Container that maintains the checkpoints, this is referred to as the 'Leases Container') per the normal Cosmos ChangeFeed processor convention. (The Leases Container is referred to as that due to the fact that each consumer group's checkpoint ) +- processing stops if an error is encountered; i.e. there is no dead-lettering or similar facility - progress is not marked against the source for any given batch of input events until each and every event has been successfully processed. This philosophy is often referred to as an [Andon cord](https://thinkinsights.net/strategy/andon-cord/) policy. + + +# Stream Positions, Buffering and Deduplication + +The `Propulsion.Streams` processing system receives events from the ChangeFeed as a chain of batches per physical partition of the CosmosDB store (approx a continuous `select * from C`). Internally events are grouped by stream. Handlers are fed all the buffered items for a single stream as an array. + +There should never be a gap in the events as they arrive from the feed; where this does happen (e.g. if an Item was manually deleted), the Propulsion Scheduler will refuse to dispatch the events to the handler (TODO document behavior and/or handling). + +As well as buffering events that have yet to be processed, `Propulsion.Streams` also maintains a 'write position' per stream, to manage: +1. (low frequency) de-duplicating redelivered events where a lease is lost and then reassigned, without the checkpoint having been advanced +2. (high frequency when [Equinox configured to store Events In Tip](#events-in-tip)) discarding events from tip (or calved items) that have already been processed +3. (high frequency when re-traversing events and using `StreamResult.OverrideNextIndex`) immediate discarding of events that are known to already have been ingested into the indexed form. In addition to the fact that the event can immediately be dropped from the buffer, this also avoids the handler potentially doing duplicate work. + +The write position is `0`-based, i.e. the following rules apply: +- if you load an Equinox stream with two events, they should be numbered with `Index` values of `0` and `1`, and the `ISyncContext.Version` in such a state would be `2`. The `Version` value thus coincides with the notion of the _write position_ that Propulsion maintains: if the write position is `3`, then events `0`, `1` and `2` can be discarded immediately on read. +- when the position is unknown, it's effectively `0` + +Given a batch of two `events` numbered `0` and `1`, the write position after they have been processed can be derived by any of the following means using helpers in `Propulsion.Streams.Sinks`: +- `Index` value of the first event, plus the length, `event[0].Index + events.Length` (a handler is only ever supplied a contiguous span of events) +- `Index` value of the last event, plus 1: `event[^1].Index + 1` (the 'next' index after any given event is the value plus 1) +- `Events.nextIndex events` (built in helper in `Propulsion.Streams.Sinks`) + +## `StreamResult` implications + +Every handler invocation (that does not fail with an Exception), triggers an update to the write position based on the `StreamResult`. The net effect will be one of the following: +1. Full/partial progress has been achieved: where at least one event has been declared handled, the position advances, and the events are removed from the buffer. The stream is removed from the `busy`, `active` and `failing` classifications. +2. the Handler identifies that the indexed state is actually ahead of the current event delivery. This can occur for multiple reasons, e.g.: + - If the handler loads the source data being monitored, it may reach a position beyond the Change Feed's read position. For example: if the handler (prompted by event 0) loads the state of the source stream and notes that the version of the state implies that it has also taken the effect of event 1 into account, it can indicate that the write position should now move to event 2 (which means events 0 and 1 will be discarded immediately on read). + - If the handler records the attained event index as part of the derived state, it can use that information to avoid processing some of the events it has been supplied. For example, if events 0-3 are presented to the Handler, it's first action may be to load the current derived state, and skip all events whose `Index` is less than the position that had previously been attained for that stream (by comparing the `ITimelineEvent.Index` value per event). Furthermore, if it discovers that it has actually previously ingested all information up to position 10, then events 4-9 can be dropped. + - a target may be able to safely and cheaply ignore events that have already been ingested, yielding the actual attained position as a by-product (this is termed 'idempotent write' semantics). For example, `Propulsion.CosmosStore.CosmosStoreSink` uses this technique; if presented with events with `Index` values ranging `0-3`, it will prepare a `Sync` batch consisting of the 4 events, without touching the target store. + - If the Sync Stored Procedure determines that two events are already present, it will only append the two that are new from it's perspective. The handler can report the progress via any of the following: + - `StreamResult.AllProcessed`: as the handler was supplied events `0-4`, the write position thus logically becomes `5`. + - If it determines that it has all events up to number `4`, and more, then it will respond that the write was successful, but the next index / write position is something other than `5`. For instance if the sync attempt reveals that there already 10 events, then the Handler will end up yielding a `StreamResult.OverrideNextIndex 5` + + +## Position storage / purging + +A stream that does not have events buffered is represented by a 16 byte `struct` within a .NET `Dictionary` keyed by the stream name (a `string`). There's an optional purge frequency that defines how frequently positions for streams that don't have events waiting are jettisoned. In general, one should not set the purge interval too aggressively for the following reasons: +- Gaps (malformed store states) cannot be detected (Equinox's behavior is such that there are no APIs that can programmatically induce gaps as all writes are to the Tip, and Prune operations are always deletes of entire Items in order from the start of the stream) +- The at least once delivery de-duplication facility will lose the state upon which it depends, leading to potential avoidable processing cost increases. Example: if there are 8 events in the Tip Item of a Stream, and a ninth is added, an array with all 9 will be supplied to the handler, which may result in redundant processing. + +# Checkpoints + +The ChangeFeedProcessor mechanism guarantees that all events written to the store will be observed at least once per Consumer Group Name (referred to as a Processor Name in the MS documentation). + +Note that there is no relationship between the checkpoint position and any derived state; the sole control is that checkpoints can only ever advance when all events across all streams within a given batch of items from the feed have been (successfully) processed by the handler. + + +## Resetting checkpoints + +The MS CFP implementation [does not presently implement a facility to rewind/retraverse a consumer group](https://github.com/Azure/azure-cosmos-dotnet-v3/issues/510). Workarounds: +- delete the lease/position documents from the `-aux` container; the naming of the documents is relatively intuitive, but it's obviously not ideal to have to do such a maneuver. +- use a new Consumer Group Name; each processor name is expected to be unique in the context of a given leases container. As a result, using a new name will establish an independent set of positions and/or leases. + + +# Malformed streams handling + +Production has a number of streams that have had mutations applied that result in the ChangeFeed presenting the events out of order. The Propulsion scheduler component will refuse to process streams that have gaps within them, which results in processing stalling (TODO include screenshot of how this manifests) + +For the data presently in the store, it's possible to circumvent this hurdle by supplying arguments that have the reader obtain the missing events 'from the future' in order to ensure that processing can complete despite the inconsistency: +- `-r 10` (the 'batches to read ahead count) +- cosmos -b 1000 (the 'batch size count') + +# Notes + + +## Equinox Events in Tip mode + +In CosmosDB, each item is subject to two general overheads: there's a set of headers (approx 512B), and each item consumes index space. Additionally, point reads are significantly more efficient in terms of latency and RU consumption. For these reasons, in the general case it's best to configure streams to maintain events in the tip. + +From the write perspective, a Sync operation that's writing events becomes one of: +1. _append_: pushing an event to the tail of the `e` field in the Tip that holds the buffered events. The cost of such an operation involves a read and an overwrite, and is largely a function of the size of the Tip document. +2. _calve_: when the events in the Tip cause the maximum JSON size (configurable, but think 32K) to be breached, the events held in the Tip, together with the pending one being appended are 'calved off' into a separate item/document in the Container. Instead of a point write, there's a transactional batch consisting of: +- inserting the calved item/document - the events being added as part of this Sync operation, combined with the ones in the Tip become a fresh document +- updating the tip - the events in tip buffer is reset to being empty, and the unfolds (snapshots) and position are updated + +In both the calve and append cases above, the events that were being held in the Tip will be re-observed by the ChangeFeed reader (if we assume it's reading from the tail; in the scenario where we are re-traversing all the events, there are no such repeats; in fact the reading is actually more efficient as there's a minimal amount of per item overhead to the read cost). + +The most significant effect of storing event in tip on reactors (and projection systems based off the ChangeFeed in general) is the fact that the consumer needs to de-duplicate the events each time the document is received, ideally shedding the existing events and forwarding only the newly appended events for processing. + + ### Testing #### Unit testing projections