Skip to content

Commit

Permalink
feat(Indexer): Add upconverting export
Browse files Browse the repository at this point in the history
  • Loading branch information
bartelink committed Feb 4, 2024
1 parent cc37b92 commit d63f693
Show file tree
Hide file tree
Showing 16 changed files with 746 additions and 235 deletions.
5 changes: 3 additions & 2 deletions equinox-shipping/Watchdog/Infrastructure.fs
Original file line number Diff line number Diff line change
Expand Up @@ -95,9 +95,10 @@ module EventStoreContext =

module OutcomeKind =

let [<return: Struct>] (|StoreExceptions|_|) exn =
let [<return: Struct>] (|StoreExceptions|_|) (exn: exn) =
match exn with
| Equinox.DynamoStore.Exceptions.ProvisionedThroughputExceeded
| Equinox.CosmosStore.Exceptions.RateLimited -> Propulsion.Streams.OutcomeKind.RateLimited |> ValueSome
| Equinox.CosmosStore.Exceptions.RequestTimeout -> Propulsion.Streams.OutcomeKind.Timeout |> ValueSome
| Equinox.CosmosStore.Exceptions.RequestTimeout -> Propulsion.Streams.OutcomeKind.Tagged "cosmosTimeout" |> ValueSome
| :? System.Threading.Tasks.TaskCanceledException -> Propulsion.Streams.OutcomeKind.Tagged "taskCancelled" |> ValueSome
| _ -> ValueNone
6 changes: 5 additions & 1 deletion propulsion-indexer/App/App.fsproj
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,17 @@

<ItemGroup>
<Compile Include="Infrastructure.fs" />
<Compile Include="Configuration.fs" />
<Compile Include="Args.fs" />
<Compile Include="CosmosDumpSource.fs" />
</ItemGroup>

<ItemGroup>
<PackageReference Include="Argu" Version="6.1.4" />
<PackageReference Include="Equinox.CosmosStore.Prometheus" Version="4.0.0-rc.16" />
<PackageReference Include="prometheus-net.AspNetCore" Version="3.6.0" />
<PackageReference Include="Propulsion.Feed" Version="3.0.0-rc.10" />
<PackageReference Include="Propulsion.CosmosStore" Version="3.0.0-rc.10" />
<PackageReference Include="Propulsion.MemoryStore" Version="3.0.0-rc.10" />
<PackageReference Include="Serilog.Sinks.Async" Version="1.5.0" />
<PackageReference Include="Serilog.Sinks.Console" Version="4.0.1" />
</ItemGroup>
Expand Down
116 changes: 116 additions & 0 deletions propulsion-indexer/App/Args.fs
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
module App.Args

open Argu
open System

let [<Literal>] CONNECTION = "EQUINOX_COSMOS_CONNECTION"
let [<Literal>] DATABASE = "EQUINOX_COSMOS_DATABASE"
let [<Literal>] CONTAINER = "EQUINOX_COSMOS_CONTAINER"
let [<Literal>] VIEWS = "EQUINOX_COSMOS_VIEWS"

type Configuration(tryGet: string -> string option) =

let get key = match tryGet key with Some value -> value | None -> failwith $"Missing Argument/Environment Variable %s{key}"

member x.CosmosConnection = get CONNECTION
member x.CosmosDatabase = get DATABASE
member x.CosmosContainer = get CONTAINER
member x.CosmosViews = get VIEWS

type [<NoEquality; NoComparison>] CosmosParameters =
| [<AltCommandLine "-V"; Unique>] Verbose
| [<AltCommandLine "-s">] Connection of string
| [<AltCommandLine "-m">] ConnectionMode of Microsoft.Azure.Cosmos.ConnectionMode
| [<AltCommandLine "-d">] Database of string
| [<AltCommandLine "-c">] Container of string
| [<AltCommandLine "-v">] Views of string
| [<AltCommandLine "-o">] Timeout of float
| [<AltCommandLine "-r">] Retries of int
| [<AltCommandLine "-rt">] RetriesWaitTime of float
interface IArgParserTemplate with
member p.Usage = p |> function
| Verbose -> "request Verbose Logging from Store. Default: off"
| ConnectionMode _ -> "override the connection mode. Default: Direct."
| Connection _ -> $"specify a connection string for a Cosmos account. (optional if environment variable ${CONNECTION} specified)"
| Database _ -> $"specify a database name for store. (optional if environment variable ${DATABASE} specified)"
| Container _ -> $"specify a container name for store. (optional if environment variable ${CONTAINER} specified)"
| Views _ -> $"specify a views Container name for Cosmos views. (optional if environment variable ${VIEWS} specified)"
| Timeout _ -> "specify operation timeout in seconds. Default: 5."
| Retries _ -> "specify operation retries. Default: 1."
| RetriesWaitTime _ -> "specify max wait-time for retry when being throttled by Cosmos in seconds. Default: 5."
and CosmosArguments(c: Configuration, p: ParseResults<CosmosParameters>) =
let connection = p.GetResult(Connection, fun () -> c.CosmosConnection)
let connector =
let timeout = p.GetResult(Timeout, 5.) |> TimeSpan.FromSeconds
let retries = p.GetResult(Retries, 1)
let maxRetryWaitTime = p.GetResult(RetriesWaitTime, 5.) |> TimeSpan.FromSeconds
let mode = p.TryGetResult ConnectionMode
Equinox.CosmosStore.CosmosStoreConnector(Equinox.CosmosStore.Discovery.ConnectionString connection, timeout, retries, maxRetryWaitTime, ?mode = mode)
member val Verbose = p.Contains Verbose
member val Connection = connection
member val Database = p.GetResult(Database, fun () -> c.CosmosDatabase)
member val Container = p.GetResult(Container, fun () -> c.CosmosContainer)
member val private Views = p.GetResult(Views, fun () -> c.CosmosViews)
member x.Connect() = connector.Connect(x.Database, x.Container, x.Views)

type [<NoEquality; NoComparison>] CosmosSourceParameters =
| [<AltCommandLine "-V"; Unique>] Verbose
| [<AltCommandLine "-m">] ConnectionMode of Microsoft.Azure.Cosmos.ConnectionMode
| [<AltCommandLine "-s">] Connection of string
| [<AltCommandLine "-d">] Database of string
| [<AltCommandLine "-c">] Container of string
| [<AltCommandLine "-v">] Views of string
| [<AltCommandLine "-o">] Timeout of float
| [<AltCommandLine "-r">] Retries of int
| [<AltCommandLine "-rt">] RetriesWaitTime of float

| [<AltCommandLine "-lcs"; Unique>] LeaseContainerSuffix of string
| [<AltCommandLine "-a"; Unique>] LeaseContainer of string
| [<AltCommandLine "-Z"; Unique>] FromTail
| [<AltCommandLine "-b"; Unique>] MaxItems of int
| [<AltCommandLine "-l"; Unique>] LagFreqM of float
interface IArgParserTemplate with
member p.Usage = p |> function
| Verbose -> "request Verbose Logging from ChangeFeedProcessor and Store. Default: off"
| ConnectionMode _ -> "override the connection mode. Default: Direct."
| Connection _ -> $"specify a connection string for a Cosmos account. (optional if environment variable {CONNECTION} specified)"
| Database _ -> $"specify a database name for store. (optional if environment variable {DATABASE} specified)"
| Container _ -> $"specify a container name for store. (optional if environment variable {CONTAINER} specified)"
| Views _ -> $"specify a container name for views container. (optional if environment variable {VIEWS} specified)"
| Timeout _ -> "specify operation timeout in seconds. Default: 5."
| Retries _ -> "specify operation retries. Default: 1."
| RetriesWaitTime _ -> "specify max wait-time for retry when being throttled by Cosmos in seconds. Default: 5."

| LeaseContainerSuffix _ -> "specify Container Name suffix for Leases container. Default: `-aux`."
| LeaseContainer _ -> "specify Container Name (in this [target] Database) for Leases container. Default: `<Container>` + `-aux`."
| FromTail -> "(iff the Consumer Name is fresh) - force skip to present Position. Default: Never skip an event."
| MaxItems _ -> "maximum item count to request from the feed. Default: unlimited."
| LagFreqM _ -> "specify frequency (minutes) to dump lag stats. Default: 1"
and CosmosSourceArguments(c: Configuration, p: ParseResults<CosmosSourceParameters>) =
let connection = p.GetResult(Connection, fun () -> c.CosmosConnection)
let connector =
let timeout = p.GetResult(Timeout, 5.) |> TimeSpan.FromSeconds
let retries = p.GetResult(Retries, 1)
let maxRetryWaitTime = p.GetResult(RetriesWaitTime, 5.) |> TimeSpan.FromSeconds
let mode = p.TryGetResult ConnectionMode
Equinox.CosmosStore.CosmosStoreConnector(Equinox.CosmosStore.Discovery.ConnectionString connection, timeout, retries, maxRetryWaitTime, ?mode = mode)
let database = p.GetResult(Database, fun () -> c.CosmosDatabase)
let containerId = p.GetResult(Container, fun () -> c.CosmosContainer)
let viewsContainerId = p.GetResult(Views, fun () -> c.CosmosViews)

let suffix = p.GetResult(LeaseContainerSuffix, "-aux")
let leaseContainerId = p.GetResult(LeaseContainer, containerId + suffix)

let fromTail = p.Contains FromTail
let maxItems = p.TryGetResult MaxItems
let tailSleepInterval = TimeSpan.FromMilliseconds 500.
let lagFrequency = p.GetResult(LagFreqM, 1.) |> TimeSpan.FromMinutes

member val IsLagFreqSpecified = p.Contains LagFreqM
member val Verbose = p.Contains Verbose
member val Connection = connection
member val Database = database
member _.ConnectWithFeed(?lsc) = connector.ConnectWithFeed(database, containerId, viewsContainerId, leaseContainerId, ?logSnapshotConfig = lsc)
member _.ConnectWithFeedReadOnly(auxClient, auxDatabase, auxContainerId) =
connector.ConnectWithFeedReadOnly(database, containerId, viewsContainerId, auxClient, auxDatabase, auxContainerId)
member val MonitoringParams = fromTail, maxItems, tailSleepInterval, lagFrequency
16 changes: 0 additions & 16 deletions propulsion-indexer/App/Configuration.fs

This file was deleted.

33 changes: 33 additions & 0 deletions propulsion-indexer/App/CosmosDumpSource.fs
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
namespace App

open FSharp.Control
open Propulsion.Feed
open System

/// <summary>Parses CR separated file with items dumped from a Cosmos Container containing Equinox Items</summary>
/// <remarks>One way to generate one of those is via the cosmic tool at https://github.com/creyke/Cosmic
/// dotnet tool install -g cosmic
/// # then connect/select db per https://github.com/creyke/Cosmic#basic-usage
/// cosmic query 'select * from c order by c._ts' > file.out </remarks>
type [<Sealed; AbstractClass>] CosmosDumpSource private () =

static member Start(log, statsInterval, filePath, skip, parseFeedDoc, sink, ?truncateTo) =
let isNonCommentLine line = System.Text.RegularExpressions.Regex.IsMatch(line, "^\s*#") |> not
let truncate = match truncateTo with Some count -> Seq.truncate count | None -> id
let lines = Seq.append (System.IO.File.ReadLines filePath |> truncate) (Seq.singleton null) // Add a trailing EOF sentinel so checkpoint positions can be line numbers even when finished reading
let mapLine isEof lineNo (line: string) = // TODO inline in F#7 when seq exprs support try/withs
try let items = if isEof then Array.empty
else System.Text.Json.JsonDocument.Parse line |> parseFeedDoc |> Seq.toArray
struct (TimeSpan.Zero, ({ items = items; isTail = isEof; checkpoint = Position.parse lineNo }: Core.Batch<_>))
with e -> exn($"File Parse error on L{lineNo}: '{line.Substring(0, 200)}'", e) |> raise
let crawl _ _ _ = TaskSeq.ofSeq <| seq {
for i, line in lines |> Seq.indexed do
let isEof = line = null
if isEof || (i >= skip && isNonCommentLine line) then
let lineNo = int64 i + 1L
mapLine isEof lineNo line }
let source =
let checkpointStore = Equinox.MemoryStore.VolatileStore()
let checkpoints = ReaderCheckpoint.MemoryStore.create log ("consumerGroup", TimeSpan.FromMinutes 1) checkpointStore
Propulsion.Feed.Core.SinglePassFeedSource(log, statsInterval, SourceId.parse filePath, crawl, checkpoints, sink, string)
source.Start(fun _ct -> task { return [| TrancheId.parse "0" |] })
Loading

0 comments on commit d63f693

Please sign in to comment.