From 58e491bcc4d19ebe3b8b4bd96a8a30a3b977c3af Mon Sep 17 00:00:00 2001 From: Ruben Bartelink Date: Thu, 14 Dec 2023 03:18:05 +0000 Subject: [PATCH] refactor: Tidy Argu processing (#135) --- CHANGELOG.md | 1 + README.md | 5 +- equinox-shipping/Watchdog.Lambda/Function.fs | 2 +- equinox-shipping/Watchdog/Args.fs | 39 +++++------ equinox-shipping/Watchdog/Program.fs | 19 +++--- equinox-shipping/Watchdog/SourceArgs.fs | 28 ++++---- equinox-shipping/Watchdog/Watchdog.fsproj | 2 +- equinox-testbed/Program.fs | 21 +++--- equinox-testbed/Storage.fs | 11 ++- equinox-testbed/Testbed.fsproj | 2 +- feed-consumer/FeedConsumer.fsproj | 2 +- feed-consumer/Program.fs | 42 ++++++------ feed-source/FeedApi/FeedApi.fsproj | 2 +- feed-source/FeedApi/Program.fs | 29 ++++---- periodic-ingester/PeriodicIngester.fsproj | 2 +- periodic-ingester/Program.fs | 38 +++++------ propulsion-archiver/Archiver.fsproj | 2 +- propulsion-archiver/Program.fs | 34 ++++------ propulsion-consumer/Consumer.fsproj | 2 +- propulsion-consumer/Program.fs | 20 +++--- propulsion-hotel/Reactor/Args.fs | 5 +- propulsion-hotel/Reactor/Program.fs | 17 +++-- propulsion-hotel/Reactor/Reactor.fsproj | 2 +- propulsion-hotel/Reactor/SourceArgs.fs | 16 ++--- propulsion-indexer/App/Configuration.fs | 5 +- propulsion-indexer/Indexer/Indexer.fsproj | 2 +- propulsion-indexer/Indexer/Program.fs | 29 ++++---- propulsion-projector/Args.fs | 21 +++--- propulsion-projector/Program.fs | 21 +++--- propulsion-projector/Projector.fsproj | 2 +- propulsion-projector/SourceArgs.fs | 28 ++++---- propulsion-pruner/Program.fs | 32 ++++----- propulsion-pruner/Pruner.fsproj | 2 +- propulsion-reactor/Args.fs | 19 +++--- propulsion-reactor/Program.fs | 45 ++++++------ propulsion-reactor/Reactor.fsproj | 2 +- propulsion-reactor/SourceArgs.fs | 64 ++++++++--------- propulsion-summary-consumer/Program.fs | 28 ++++---- .../SummaryConsumer.fsproj | 2 +- propulsion-sync/Program.fs | 68 +++++++++---------- propulsion-sync/Sync.fsproj | 2 +- propulsion-tracking-consumer/Program.fs | 28 ++++---- .../TrackingConsumer.fsproj | 2 +- 43 files changed, 342 insertions(+), 403 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 354b41c07..22adc6ef3 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -15,6 +15,7 @@ The `Unreleased` section name is replaced by the expected version of next releas ### Changed - Target `Equinox` v `4.0.0-rc.14.5`, `Propulsion` v `3.0.0-rc.9.11`, `FsCodec` v `3.0.0-rc.14.1` [#131](https://github.com/jet/dotnet-templates/pull/131) +- Target `Argu` v `6.0.14` [#135](https://github.com/jet/dotnet-templates/pull/135) ### Removed diff --git a/README.md b/README.md index c12677b88..46af67062 100644 --- a/README.md +++ b/README.md @@ -353,10 +353,9 @@ let main argv = try let args = Args.parse EnvVar.tryGet argv try Log.Logger <- LoggerConfiguration().Configure(verbose=args.Verbose).CreateLogger() try run args |> Async.RunSynchronously; 0 - with e when not (e :? MissingArg) -> Log.Fatal(e, "Exiting"); 2 + with e when not (e :? System.Threading.Tasks.TaskCanceledException) -> Log.Fatal(e, "Exiting"); 2 finally Log.CloseAndFlush() - with MissingArg msg -> eprintfn "%s" msg; 1 - | :? Argu.ArguParseException as e -> eprintfn "%s" e.Message; 1 + with :? Argu.ArguParseException as e -> eprintfn "%s" e.Message; 1 | e -> eprintf "Exception %s" e.Message; 1 ``` diff --git a/equinox-shipping/Watchdog.Lambda/Function.fs b/equinox-shipping/Watchdog.Lambda/Function.fs index 616c6486d..827362b30 100644 --- a/equinox-shipping/Watchdog.Lambda/Function.fs +++ b/equinox-shipping/Watchdog.Lambda/Function.fs @@ -13,7 +13,7 @@ open System type Configuration(appName, ?tryGet) = let envVarTryGet = Environment.GetEnvironmentVariable >> Option.ofObj let tryGet = defaultArg tryGet envVarTryGet - let get key = match tryGet key with Some value -> value | None -> failwithf "Missing Argument/Environment Variable %s" key + let get key = match tryGet key with Some value -> value | None -> failwithf $"Missing Argument/Environment Variable %s{key}" member _.DynamoRegion = tryGet Propulsion.DynamoStore.Lambda.Args.Dynamo.REGION member _.DynamoServiceUrl = get Propulsion.DynamoStore.Lambda.Args.Dynamo.SERVICE_URL diff --git a/equinox-shipping/Watchdog/Args.fs b/equinox-shipping/Watchdog/Args.fs index 7fcdb6214..cdf1dc58d 100644 --- a/equinox-shipping/Watchdog/Args.fs +++ b/equinox-shipping/Watchdog/Args.fs @@ -4,9 +4,6 @@ module Args open System open FSharp.Control -exception MissingArg of message: string with override this.Message = this.message -let missingArg msg = raise (MissingArg msg) - let [] REGION = "EQUINOX_DYNAMO_REGION" let [] SERVICE_URL = "EQUINOX_DYNAMO_SERVICE_URL" let [] ACCESS_KEY = "EQUINOX_DYNAMO_ACCESS_KEY_ID" @@ -16,20 +13,20 @@ let [] INDEX_TABLE = "EQUINOX_DYNAMO_TABLE_INDEX" type Configuration(tryGet: string -> string option) = + let get key = match tryGet key with Some value -> value | None -> failwith $"Missing Argument/Environment Variable %s{key}" member val tryGet = tryGet - member _.get key = match tryGet key with Some value -> value | None -> missingArg $"Missing Argument/Environment Variable %s{key}" - member x.CosmosConnection = x.get "EQUINOX_COSMOS_CONNECTION" - member x.CosmosDatabase = x.get "EQUINOX_COSMOS_DATABASE" - member x.CosmosContainer = x.get "EQUINOX_COSMOS_CONTAINER" + member _.CosmosConnection = get "EQUINOX_COSMOS_CONNECTION" + member _.CosmosDatabase = get "EQUINOX_COSMOS_DATABASE" + member _.CosmosContainer = get "EQUINOX_COSMOS_CONTAINER" - member x.DynamoServiceUrl = x.get SERVICE_URL - member x.DynamoAccessKey = x.get ACCESS_KEY - member x.DynamoSecretKey = x.get SECRET_KEY - member x.DynamoTable = x.get TABLE - member x.DynamoRegion = x.tryGet REGION + member _.DynamoServiceUrl = get SERVICE_URL + member _.DynamoAccessKey = get ACCESS_KEY + member _.DynamoSecretKey = get SECRET_KEY + member _.DynamoTable = get TABLE + member _.DynamoRegion = tryGet REGION - member x.EventStoreConnection = x.get "EQUINOX_ES_CONNECTION" + member _.EventStoreConnection = get "EQUINOX_ES_CONNECTION" member _.MaybeEventStoreConnection = tryGet "EQUINOX_ES_CONNECTION" member _.MaybeEventStoreCredentials = tryGet "EQUINOX_ES_CREDENTIALS" @@ -50,7 +47,7 @@ module Cosmos = | [] RetriesWaitTime of float interface IArgParserTemplate with member p.Usage = p |> function - | Verbose _ -> "request verbose logging." + | Verbose -> "request verbose logging." | ConnectionMode _ -> "override the connection mode. Default: Direct." | Connection _ -> "specify a connection string for a Cosmos account. (optional if environment variable EQUINOX_COSMOS_CONNECTION specified)" | Database _ -> "specify a database name for Cosmos store. (optional if environment variable EQUINOX_COSMOS_DATABASE specified)" @@ -60,15 +57,15 @@ module Cosmos = | RetriesWaitTime _ -> "specify max wait-time for retry when being throttled by Cosmos in seconds (default: 5)" type Arguments(c: Configuration, p: ParseResults) = - let connection = p.TryGetResult Connection |> Option.defaultWith (fun () -> c.CosmosConnection) + let connection = p.GetResult(Connection, fun () -> c.CosmosConnection) let discovery = Equinox.CosmosStore.Discovery.ConnectionString connection let mode = p.TryGetResult ConnectionMode let timeout = p.GetResult(Timeout, 5.) |> TimeSpan.FromSeconds let retries = p.GetResult(Retries, 1) let maxRetryWaitTime = p.GetResult(RetriesWaitTime, 5.) |> TimeSpan.FromSeconds let connector = Equinox.CosmosStore.CosmosStoreConnector(discovery, timeout, retries, maxRetryWaitTime, ?mode = mode) - let database = p.TryGetResult Database |> Option.defaultWith (fun () -> c.CosmosDatabase) - let container = p.TryGetResult Container |> Option.defaultWith (fun () -> c.CosmosContainer) + let database = p.GetResult(Database, fun () -> c.CosmosDatabase) + let container = p.GetResult(Container, fun () -> c.CosmosContainer) member val Verbose = p.Contains Verbose member _.Connect() = connector.ConnectContext(database, container) @@ -102,9 +99,9 @@ module Dynamo = | Some systemName -> Choice1Of2 systemName | None -> - let serviceUrl = p.TryGetResult ServiceUrl |> Option.defaultWith (fun () -> c.DynamoServiceUrl) - let accessKey = p.TryGetResult AccessKey |> Option.defaultWith (fun () -> c.DynamoAccessKey) - let secretKey = p.TryGetResult SecretKey |> Option.defaultWith (fun () -> c.DynamoSecretKey) + let serviceUrl = p.GetResult(ServiceUrl, fun () -> c.DynamoServiceUrl) + let accessKey = p.GetResult(AccessKey, fun () -> c.DynamoAccessKey) + let secretKey = p.GetResult(SecretKey, fun () -> c.DynamoSecretKey) Choice2Of2 (serviceUrl, accessKey, secretKey) let connector = let timeout = p.GetResult(RetriesTimeoutS, 5.) |> TimeSpan.FromSeconds let retries = p.GetResult(Retries, 1) @@ -113,7 +110,7 @@ module Dynamo = Equinox.DynamoStore.DynamoStoreConnector(systemName, timeout, retries) | Choice2Of2 (serviceUrl, accessKey, secretKey) -> Equinox.DynamoStore.DynamoStoreConnector(serviceUrl, accessKey, secretKey, timeout, retries) - let table = p.TryGetResult Table |> Option.defaultWith (fun () -> c.DynamoTable) + let table = p.GetResult(Table, fun () -> c.DynamoTable) member _.Connect() = connector.CreateClient().CreateContext("Main", table) type [] diff --git a/equinox-shipping/Watchdog/Program.fs b/equinox-shipping/Watchdog/Program.fs index 032a2dc08..557722eef 100644 --- a/equinox-shipping/Watchdog/Program.fs +++ b/equinox-shipping/Watchdog/Program.fs @@ -6,7 +6,7 @@ open System module Args = open Argu - [] + [] type Parameters = | [] Verbose | [] ProcessorName of string @@ -17,9 +17,9 @@ module Args = | [] IdleDelayMs of int | [] WakeForResults - | [] Cosmos of ParseResults - | [] Dynamo of ParseResults - | [] Esdb of ParseResults + | [] Cosmos of ParseResults + | [] Dynamo of ParseResults + | [] Esdb of ParseResults interface IArgParserTemplate with member p.Usage = p |> function | Verbose -> "request Verbose Logging. Default: off." @@ -29,7 +29,7 @@ module Args = | TimeoutS _ -> "Timeout (in seconds) before Watchdog should step in to process transactions. Default: 10." | IdleDelayMs _ -> "Idle delay for scheduler. Default 1000ms" - | WakeForResults _ -> "Wake for all results to provide optimal throughput" + | WakeForResults -> "Wake for all results to provide optimal throughput" | Cosmos _ -> "specify CosmosDB parameters." | Dynamo _ -> "specify DynamoDB input parameters" @@ -55,7 +55,7 @@ module Args = | Cosmos a -> Choice1Of3 <| SourceArgs.Cosmos.Arguments(c, a) | Dynamo a -> Choice2Of3 <| SourceArgs.Dynamo.Arguments(c, a) | Esdb a -> Choice3Of3 <| SourceArgs.Esdb.Arguments(c, a) - | a -> Args.missingArg $"Unexpected Store subcommand %A{a}" + | a -> failwith $"Unexpected Store subcommand %A{a}" member x.VerboseStore = match x.Store with | Choice1Of3 s -> s.Verbose | Choice2Of3 s -> s.Verbose @@ -128,8 +128,7 @@ let main argv = try let args = Args.parse EnvVar.tryGet argv try Log.Logger <- LoggerConfiguration().Configure(verbose = args.Verbose).CreateLogger() try run args |> Async.RunSynchronously; 0 - with e when not (e :? Args.MissingArg) && not (e :? System.Threading.Tasks.TaskCanceledException) -> Log.Fatal(e, "Exiting"); 2 + with e when not (e :? System.Threading.Tasks.TaskCanceledException) -> Log.Fatal(e, "Exiting"); 2 finally Log.CloseAndFlush() - with Args.MissingArg msg -> eprintfn "%s" msg; 1 - | :? Argu.ArguParseException as e -> eprintfn "%s" e.Message; 1 - | e -> eprintf "Exception %s" e.Message; 1 + with :? Argu.ArguParseException as e -> eprintfn $"%s{e.Message}"; 1 + | e -> eprintf $"Exception %s{e.Message}"; 1 diff --git a/equinox-shipping/Watchdog/SourceArgs.fs b/equinox-shipping/Watchdog/SourceArgs.fs index 506bffb93..9490363dc 100644 --- a/equinox-shipping/Watchdog/SourceArgs.fs +++ b/equinox-shipping/Watchdog/SourceArgs.fs @@ -42,14 +42,14 @@ module Cosmos = | LagFreqM _ -> "specify frequency (minutes) to dump lag stats. Default: 1" type Arguments(c: Args.Configuration, p: ParseResults) = - let discovery = p.TryGetResult Connection |> Option.defaultWith (fun () -> c.CosmosConnection) |> Equinox.CosmosStore.Discovery.ConnectionString + let discovery = p.GetResult(Connection, fun () -> c.CosmosConnection) |> Equinox.CosmosStore.Discovery.ConnectionString let mode = p.TryGetResult ConnectionMode let timeout = p.GetResult(Timeout, 5.) |> TimeSpan.FromSeconds let retries = p.GetResult(Retries, 9) let maxRetryWaitTime = p.GetResult(RetriesWaitTime, 30.) |> TimeSpan.FromSeconds let connector = Equinox.CosmosStore.CosmosStoreConnector(discovery, timeout, retries, maxRetryWaitTime, ?mode = mode) - let database = p.TryGetResult Database |> Option.defaultWith (fun () -> c.CosmosDatabase) - let containerId = p.TryGetResult Container |> Option.defaultWith (fun () -> c.CosmosContainer) + let database = p.GetResult(Database, fun () -> c.CosmosDatabase) + let containerId = p.GetResult(Container, fun () -> c.CosmosContainer) let leaseContainerId = p.GetResult(LeaseContainer, containerId + "-aux") let fromTail = p.Contains FromTail let maxItems = p.TryGetResult MaxItems @@ -99,9 +99,9 @@ module Dynamo = | Some systemName -> Choice1Of2 systemName | None -> - let serviceUrl = p.TryGetResult ServiceUrl |> Option.defaultWith (fun () -> c.DynamoServiceUrl) - let accessKey = p.TryGetResult AccessKey |> Option.defaultWith (fun () -> c.DynamoAccessKey) - let secretKey = p.TryGetResult SecretKey |> Option.defaultWith (fun () -> c.DynamoSecretKey) + let serviceUrl = p.GetResult(ServiceUrl, fun () -> c.DynamoServiceUrl) + let accessKey = p.GetResult(AccessKey, fun () -> c.DynamoAccessKey) + let secretKey = p.GetResult(SecretKey, fun () -> c.DynamoSecretKey) Choice2Of2 (serviceUrl, accessKey, secretKey) let connector = let timeout = p.GetResult(RetriesTimeoutS, 60.) |> TimeSpan.FromSeconds let retries = p.GetResult(Retries, 9) @@ -110,9 +110,9 @@ module Dynamo = Equinox.DynamoStore.DynamoStoreConnector(systemName, timeout, retries) | Choice2Of2 (serviceUrl, accessKey, secretKey) -> Equinox.DynamoStore.DynamoStoreConnector(serviceUrl, accessKey, secretKey, timeout, retries) - let table = p.TryGetResult Table |> Option.defaultWith (fun () -> c.DynamoTable) + let table = p.GetResult(Table, fun () -> c.DynamoTable) let indexSuffix = p.GetResult(IndexSuffix, "-index") - let indexTable = p.TryGetResult IndexTable |> Option.orElseWith (fun () -> c.DynamoIndexTable) |> Option.defaultWith (fun () -> table + indexSuffix) + let indexTable = p.GetResult(IndexTable, fun () -> defaultArg c.DynamoIndexTable (table + indexSuffix)) let fromTail = p.Contains FromTail let tailSleepInterval = TimeSpan.FromMilliseconds 500. let batchSizeCutoff = p.GetResult(MaxItems, 100) @@ -142,9 +142,9 @@ module Esdb = Propulsion.Feed.ReaderCheckpoint.CosmosStore.create Store.Metrics.log (consumerGroup, checkpointInterval) (context, cache) | Store.Config.Dynamo (context, cache) -> Propulsion.Feed.ReaderCheckpoint.DynamoStore.create Store.Metrics.log (consumerGroup, checkpointInterval) (context, cache) - | Store.Config.Memory _ | Store.Config.Esdb _ -> Args.missingArg "Unexpected store type" + | Store.Config.Memory _ | Store.Config.Esdb _ -> failwith "Unexpected store type" - type [] Parameters = + type [] Parameters = | [] Verbose | [] Connection of string | [] Credentials of string @@ -154,8 +154,8 @@ module Esdb = | [] MaxItems of int | [] FromTail - | [] Cosmos of ParseResults - | [] Dynamo of ParseResults + | [] Cosmos of ParseResults + | [] Dynamo of ParseResults interface IArgParserTemplate with member p.Usage = p |> function | Verbose -> "Include low level Store logging." @@ -174,7 +174,7 @@ module Esdb = let startFromTail = p.Contains FromTail let maxItems = p.GetResult(MaxItems, 100) let tailSleepInterval = TimeSpan.FromSeconds 0.5 - let connectionStringLoggable = p.TryGetResult Connection |> Option.defaultWith (fun () -> c.EventStoreConnection) + let connectionStringLoggable = p.GetResult(Connection, fun () -> c.EventStoreConnection) let credentials = p.TryGetResult Credentials |> Option.orElseWith (fun () -> c.MaybeEventStoreCredentials) let discovery = match credentials with Some x -> String.Join(";", connectionStringLoggable, x) | None -> connectionStringLoggable |> Equinox.EventStoreDb.Discovery.ConnectionString @@ -193,7 +193,7 @@ module Esdb = match p.GetSubCommand() with | Cosmos cosmos -> Args.TargetStoreArgs.Cosmos (Args.Cosmos.Arguments(c, cosmos)) | Dynamo dynamo -> Args.TargetStoreArgs.Dynamo (Args.Dynamo.Arguments(c, dynamo)) - | _ -> Args.missingArg "Must specify `cosmos` or `dynamo` target store when source is `esdb`" + | _ -> p.Raise "Must specify `cosmos` or `dynamo` target store when source is `esdb`" member _.MonitoringParams(log: ILogger) = log.Information("EventStoreSource MaxItems {maxItems} ", maxItems) diff --git a/equinox-shipping/Watchdog/Watchdog.fsproj b/equinox-shipping/Watchdog/Watchdog.fsproj index 16f70c74f..9183ddae6 100644 --- a/equinox-shipping/Watchdog/Watchdog.fsproj +++ b/equinox-shipping/Watchdog/Watchdog.fsproj @@ -17,7 +17,7 @@ - + diff --git a/equinox-testbed/Program.fs b/equinox-testbed/Program.fs index e21f812bb..165302f2b 100644 --- a/equinox-testbed/Program.fs +++ b/equinox-testbed/Program.fs @@ -10,13 +10,13 @@ open System.Threading [] module Args = - type [] + type [] Parameters = | [] Verbose | [] VerboseConsole | [] LocalSeq | [] LogFile of string - | [] Run of ParseResults + | [] Run of ParseResults interface IArgParserTemplate with member p.Usage = p |> function | Verbose -> "Include low level logging regarding specific test runs." @@ -24,7 +24,7 @@ module Args = | LocalSeq -> "Configures writing to a local Seq endpoint at http://localhost:5341, see https://getseq.net" | LogFile _ -> "specify a log file to write the result breakdown into. Default: TestbedTemplate.log." | Run _ -> "Run a load test" - and [] + and [] TestParameters = | [] Name of Tests.Test | [] Size of int @@ -36,13 +36,13 @@ module Args = | [] ErrorCutoff of int64 | [] ReportIntervalS of int //#if (memoryStore || (!cosmos && !eventStore)) - | [] Memory of ParseResults + | [] Memory of ParseResults //#endif //#if eventStore - | [] Es of ParseResults + | [] Es of ParseResults //#endif //#if cosmos - | [] Cosmos of ParseResults + | [] Cosmos of ParseResults //#endif interface IArgParserTemplate with member p.Usage = p |> function @@ -99,13 +99,13 @@ module Args = storeLog, Storage.Cosmos.config (x.Cache, x.Unfolds, x.BatchSize) (Storage.Cosmos.Arguments(c, sargs)) //#endif #if ((!cosmos && !eventStore) || (cosmos && eventStore)) - | _ -> Storage.missingArg "Please identify a valid store: memory, es, cosmos" + | _ -> p.Raise "Please identify a valid store: memory, es, cosmos" #endif #if eventStore - | _ -> Storage.missingArg "Please identify a valid store: memory, es" + | _ -> p.Raise "Please identify a valid store: memory, es" #endif #if cosmos - | _ -> Storage.missingArg "Please identify a valid store: memory, cosmos" + | _ -> p.Raise "Please identify a valid store: memory, cosmos" #endif let createStoreLog verbose verboseConsole maybeSeqEndpoint = @@ -203,9 +203,8 @@ let main argv = let log = createDomainLog verbose verboseConsole maybeSeq let reportFilename = args.GetResult(LogFile, programName+".log") |> fun n -> System.IO.FileInfo(n).FullName LoadTest.run log (verbose, verboseConsole, maybeSeq) reportFilename (TestArguments(Storage.Configuration EnvVar.tryGet, rargs)) - | _ -> failwith "Please specify a valid subcommand :- run" + | _ -> args.Raise "Please specify a valid subcommand :- run" 0 with :? Argu.ArguParseException as e -> eprintfn "%s" e.Message; 1 | :? Argu.ArguException as e -> eprintf "Argument parsing exception %s" e.Message; 1 - | Storage.MissingArg msg -> eprintfn "%s" msg; 1 | e -> eprintfn "%s" e.Message; 1 diff --git a/equinox-testbed/Storage.fs b/equinox-testbed/Storage.fs index 8a20f730b..07091835a 100644 --- a/equinox-testbed/Storage.fs +++ b/equinox-testbed/Storage.fs @@ -3,12 +3,9 @@ open Argu open System -exception MissingArg of message: string with override this.Message = this.message -let missingArg msg = raise (MissingArg msg) - type Configuration(tryGet: string -> string option) = - let get key = match tryGet key with Some value -> value | None -> missingArg $"Missing Argument/Environment Variable %s{key}" + let get key = match tryGet key with Some value -> value | None -> failwith $"Missing Argument/Environment Variable %s{key}" member _.CosmosConnection = get "EQUINOX_COSMOS_CONNECTION" member _.CosmosDatabase = get "EQUINOX_COSMOS_DATABASE" @@ -48,14 +45,14 @@ module Cosmos = | Database _ -> "specify a database name for store. (optional if environment variable EQUINOX_COSMOS_DATABASE specified)" | Container _ -> "specify a container name for store. (optional if environment variable EQUINOX_COSMOS_CONTAINER specified)" type Arguments(c: Configuration, p: ParseResults) = - let discovery = p.TryGetResult Connection |> Option.defaultWith (fun () -> c.CosmosConnection) |> Equinox.CosmosStore.Discovery.ConnectionString + let discovery = p.GetResult(Connection, fun () -> c.CosmosConnection) |> Equinox.CosmosStore.Discovery.ConnectionString let mode = p.TryGetResult ConnectionMode let timeout = p.GetResult(Timeout, 5.) |> TimeSpan.FromSeconds let retries = p.GetResult(Retries, 1) let maxRetryWaitTime = p.GetResult(RetriesWaitTime, 5.) |> TimeSpan.FromSeconds let connector = Equinox.CosmosStore.CosmosStoreConnector(discovery, timeout, retries, maxRetryWaitTime, ?mode = mode) - let database = p.TryGetResult Database |> Option.defaultWith (fun () -> c.CosmosDatabase) - let container = p.TryGetResult Container |> Option.defaultWith (fun () -> c.CosmosContainer) + let database = p.GetResult(Database, fun () -> c.CosmosDatabase) + let container = p.GetResult(Container, fun () -> c.CosmosContainer) member _.Connect(tipMaxEvents, queryMaxItems) = connector.ConnectContext("Main", database, container, tipMaxEvents, queryMaxItems) diff --git a/equinox-testbed/Testbed.fsproj b/equinox-testbed/Testbed.fsproj index 41424f81a..5833c989f 100644 --- a/equinox-testbed/Testbed.fsproj +++ b/equinox-testbed/Testbed.fsproj @@ -16,7 +16,7 @@ - + diff --git a/feed-consumer/FeedConsumer.fsproj b/feed-consumer/FeedConsumer.fsproj index d7eee791c..1d1a079ee 100644 --- a/feed-consumer/FeedConsumer.fsproj +++ b/feed-consumer/FeedConsumer.fsproj @@ -15,7 +15,7 @@ - + diff --git a/feed-consumer/Program.fs b/feed-consumer/Program.fs index 410f738d6..f050f161f 100644 --- a/feed-consumer/Program.fs +++ b/feed-consumer/Program.fs @@ -3,12 +3,9 @@ open Serilog open System -exception MissingArg of message: string with override this.Message = this.message -let missingArg msg = raise (MissingArg msg) - type Configuration(tryGet) = - let get key = match tryGet key with Some value -> value | None -> missingArg $"Missing Argument/Environment Variable %s{key}" + let get key = match tryGet key with Some value -> value | None -> failwith $"Missing Argument/Environment Variable %s{key}" member _.CosmosConnection = get "EQUINOX_COSMOS_CONNECTION" member _.CosmosDatabase = get "EQUINOX_COSMOS_DATABASE" @@ -20,7 +17,7 @@ type Configuration(tryGet) = module Args = open Argu - [] + [] type Parameters = | [] Verbose @@ -32,10 +29,10 @@ module Args = | [] FcsDop of int | [] TicketsDop of int - | [] Cosmos of ParseResults + | [] Cosmos of ParseResults interface IArgParserTemplate with member p.Usage = p |> function - | Verbose _ -> "request verbose logging." + | Verbose -> "request verbose logging." | Group _ -> "specify Api Consumer Group Id. (optional if environment variable API_CONSUMER_GROUP specified)" | SourceId _ -> "specify Api SourceId. Default: 'default'" | BaseUri _ -> "specify Api endpoint. (optional if environment variable API_BASE_URI specified)" @@ -45,12 +42,12 @@ module Args = | Cosmos _ -> "Cosmos Store parameters." and Arguments(c: Configuration, p: ParseResults) = member val Verbose = p.Contains Parameters.Verbose - member val GroupId = p.TryGetResult Group |> Option.defaultWith (fun () -> c.Group) - member val SourceId = p.GetResult(SourceId,"default") |> Propulsion.Feed.SourceId.parse - member val BaseUri = p.TryGetResult BaseUri |> Option.defaultWith (fun () -> c.BaseUri) |> Uri - member val MaxReadAhead = p.GetResult(MaxReadAhead,8) - member val FcsDop = p.TryGetResult FcsDop |> Option.defaultValue 4 - member val TicketsDop = p.TryGetResult TicketsDop |> Option.defaultValue 4 + member val GroupId = p.GetResult(Group, fun () -> c.Group) + member val SourceId = p.GetResult(SourceId, "default") |> Propulsion.Feed.SourceId.parse + member val BaseUri = p.GetResult(BaseUri, fun () -> c.BaseUri) |> Uri + member val MaxReadAhead = p.GetResult(MaxReadAhead, 8) + member val FcsDop = p.GetResult(FcsDop, 4) + member val TicketsDop = p.GetResult(TicketsDop, 4) member val StatsInterval = TimeSpan.FromMinutes 1. member val StateInterval = TimeSpan.FromMinutes 5. member val CheckpointInterval = TimeSpan.FromHours 1. @@ -58,8 +55,8 @@ module Args = member val Cosmos: CosmosArguments = match p.GetSubCommand() with | Cosmos cosmos -> CosmosArguments(c, cosmos) - | _ -> missingArg "Must specify cosmos" - and [] CosmosParameters = + | _ -> p.Raise "Must specify cosmos" + and [] CosmosParameters = | [] Verbose | [] ConnectionMode of Microsoft.Azure.Cosmos.ConnectionMode | [] Connection of string @@ -70,7 +67,7 @@ module Args = | [] RetriesWaitTime of float interface IArgParserTemplate with member p.Usage = p |> function - | Verbose _ -> "request verbose logging." + | Verbose -> "request verbose logging." | ConnectionMode _ -> "override the connection mode. Default: Direct." | Connection _ -> "specify a connection string for a Cosmos account. (optional if environment variable EQUINOX_COSMOS_CONNECTION specified)" | Database _ -> "specify a database name for Cosmos store. (optional if environment variable EQUINOX_COSMOS_DATABASE specified)" @@ -79,14 +76,14 @@ module Args = | Retries _ -> "specify operation retries (default: 9)." | RetriesWaitTime _ -> "specify max wait-time for retry when being throttled by Cosmos in seconds (default: 30)" and CosmosArguments(c: Configuration, p: ParseResults) = - let discovery = p.TryGetResult Connection |> Option.defaultWith (fun () -> c.CosmosConnection) |> Equinox.CosmosStore.Discovery.ConnectionString + let discovery = p.GetResult(Connection, fun () -> c.CosmosConnection) |> Equinox.CosmosStore.Discovery.ConnectionString let mode = p.TryGetResult ConnectionMode let timeout = p.GetResult(Timeout, 30.) |> TimeSpan.FromSeconds let retries = p.GetResult(Retries, 9) let maxRetryWaitTime = p.GetResult(RetriesWaitTime, 30.) |> TimeSpan.FromSeconds let connector = Equinox.CosmosStore.CosmosStoreConnector(discovery, timeout, retries, maxRetryWaitTime, ?mode=mode) - let database = p.TryGetResult Database |> Option.defaultWith (fun () -> c.CosmosDatabase) - let container = p.TryGetResult Container |> Option.defaultWith (fun () -> c.CosmosContainer) + let database = p.GetResult(Database, fun () -> c.CosmosDatabase) + let container = p.GetResult(Container, fun () -> c.CosmosContainer) member val Verbose = p.Contains Verbose member _.Connect(maxEvents) = connector.ConnectContext("Main", database, container, maxEvents) @@ -130,8 +127,7 @@ let main argv = try let metrics = Sinks.equinoxAndPropulsionFeedConsumerMetrics (Sinks.tags AppName) args.SourceId Log.Logger <- LoggerConfiguration().Configure(args.Verbose).Sinks(metrics, args.Cosmos.Verbose).CreateLogger() try run args |> Async.RunSynchronously - with e when not (e :? MissingArg) -> Log.Fatal(e, "Exiting"); 2 + with e when not (e :? System.Threading.Tasks.TaskCanceledException) -> Log.Fatal(e, "Exiting"); 2 finally Log.CloseAndFlush() - with MissingArg msg -> eprintfn "%s" msg; 1 - | :? Argu.ArguParseException as e -> eprintfn "%s" e.Message; 1 - | e -> eprintf "Exception %s" e.Message; 1 + with :? Argu.ArguParseException as e -> eprintfn $"%s{e.Message}"; 1 + | e -> eprintf $"Exception %s{e.Message}"; 1 diff --git a/feed-source/FeedApi/FeedApi.fsproj b/feed-source/FeedApi/FeedApi.fsproj index a95cf5cb1..357b2f78f 100644 --- a/feed-source/FeedApi/FeedApi.fsproj +++ b/feed-source/FeedApi/FeedApi.fsproj @@ -18,7 +18,7 @@ - + diff --git a/feed-source/FeedApi/Program.fs b/feed-source/FeedApi/Program.fs index fa2148771..173e2fa1c 100644 --- a/feed-source/FeedApi/Program.fs +++ b/feed-source/FeedApi/Program.fs @@ -1,15 +1,11 @@ module FeedSourceTemplate.Program -open Equinox.CosmosStore open Serilog open System -exception MissingArg of message: string with override this.Message = this.message -let missingArg msg = raise (MissingArg msg) - type Configuration(tryGet) = - let get key = match tryGet key with Some value -> value | None -> missingArg $"Missing Argument/Environment Variable %s{key}" + let get key = match tryGet key with Some value -> value | None -> failwith $"Missing Argument/Environment Variable %s{key}" member _.CosmosConnection = get "EQUINOX_COSMOS_CONNECTION" member _.CosmosDatabase = get "EQUINOX_COSMOS_DATABASE" @@ -18,10 +14,10 @@ type Configuration(tryGet) = module Args = open Argu - [] + [] type Parameters = | [] Verbose - | [] Cosmos of ParseResults + | [] Cosmos of ParseResults interface IArgParserTemplate with member a.Usage = match a with @@ -32,7 +28,7 @@ module Args = member val Cosmos: CosmosArguments = match p.GetSubCommand() with | Parameters.Cosmos cosmos -> CosmosArguments(config, cosmos) - | _ -> missingArg "Must specify cosmos" + | _ -> p.Raise "unexpected" and [] CosmosParameters = | [] Verbose | [] Connection of string @@ -44,7 +40,7 @@ module Args = | [] RetriesWaitTime of float interface IArgParserTemplate with member p.Usage = p |> function - | Verbose _ -> "request verbose logging." + | Verbose -> "request verbose logging." | ConnectionMode _ -> "override the connection mode. Default: Direct." | Connection _ -> "specify a connection string for a Cosmos account. (optional if environment variable EQUINOX_COSMOS_CONNECTION specified)" | Database _ -> "specify a database name for Cosmos store. (optional if environment variable EQUINOX_COSMOS_DATABASE specified)" @@ -53,18 +49,18 @@ module Args = | Retries _ -> "specify operation retries. Default: 9." | RetriesWaitTime _ -> "specify max wait-time for retry when being throttled by Cosmos in seconds. Default: 30." and CosmosArguments(c: Configuration, p: ParseResults) = - let discovery = p.TryGetResult Connection |> Option.defaultWith (fun () -> c.CosmosConnection) |> Equinox.CosmosStore.Discovery.ConnectionString + let discovery = p.GetResult(Connection, fun () -> c.CosmosConnection) |> Equinox.CosmosStore.Discovery.ConnectionString let mode = p.TryGetResult ConnectionMode let timeout = p.GetResult(Timeout, 5.) |> TimeSpan.FromSeconds let retries = p.GetResult(Retries, 9) let maxRetryWaitTime = p.GetResult(RetriesWaitTime, 30.) |> TimeSpan.FromSeconds let connector = Equinox.CosmosStore.CosmosStoreConnector(discovery, timeout, retries, maxRetryWaitTime, ?mode=mode) - let database = p.TryGetResult Database |> Option.defaultWith (fun () -> c.CosmosDatabase) - let container = p.TryGetResult Container |> Option.defaultWith (fun () -> c.CosmosContainer) + let database = p.GetResult(Database, fun () -> c.CosmosDatabase) + let container = p.GetResult(Container, fun () -> c.CosmosContainer) member val Verbose = p.Contains Verbose member _.Connect() = connector.ConnectContext("Main", database, container) - /// Parse the commandline; can throw MissingArg or Argu.ArguParseException in response to missing arguments and/or `-h`/`--help` args + /// Parse the commandline; can throw Argu.ArguParseException in response to missing arguments and/or `-h`/`--help` args let parse tryGetConfigValue argv = let programName = System.Reflection.Assembly.GetEntryAssembly().GetName().Name let parser = ArgumentParser.Create(programName=programName) @@ -112,8 +108,7 @@ let main argv = try let metrics = Sinks.equinoxMetricsOnly (Sinks.tags AppName) Log.Logger <- LoggerConfiguration().Configure(args.Verbose).Sinks(metrics, args.Cosmos.Verbose).CreateLogger() try run args; 0 - with e when not (e :? MissingArg) -> Log.Fatal(e, "Exiting"); 2 + with e -> Log.Fatal(e, "Exiting"); 2 finally Log.CloseAndFlush() - with MissingArg msg -> eprintfn "%s" msg; 1 - | :? Argu.ArguParseException as e -> eprintfn "%s" e.Message; 1 - | e -> eprintf "Exception %s" e.Message; 1 + with :? Argu.ArguParseException as e -> eprintfn $"%s{e.Message}"; 1 + | e -> eprintf $"Exception %s{e.Message}"; 1 diff --git a/periodic-ingester/PeriodicIngester.fsproj b/periodic-ingester/PeriodicIngester.fsproj index 710f612e0..78e467fd7 100644 --- a/periodic-ingester/PeriodicIngester.fsproj +++ b/periodic-ingester/PeriodicIngester.fsproj @@ -16,7 +16,7 @@ - + diff --git a/periodic-ingester/Program.fs b/periodic-ingester/Program.fs index d3e5083fa..407922c4f 100644 --- a/periodic-ingester/Program.fs +++ b/periodic-ingester/Program.fs @@ -3,12 +3,9 @@ open Serilog open System -exception MissingArg of message: string with override this.Message = this.message -let missingArg msg = raise (MissingArg msg) - type Configuration(tryGet) = - let get key = match tryGet key with Some value -> value | None -> missingArg $"Missing Argument/Environment Variable %s{key}" + let get key = match tryGet key with Some value -> value | None -> failwith $"Missing Argument/Environment Variable %s{key}" member _.CosmosConnection = get "EQUINOX_COSMOS_CONNECTION" member _.CosmosDatabase = get "EQUINOX_COSMOS_DATABASE" @@ -23,7 +20,7 @@ module Args = open Argu - type [] Parameters = + type [] Parameters = | [] Verbose | [] PrometheusPort of int | [] GroupId of string @@ -31,10 +28,10 @@ module Args = | [] MaxReadAhead of int | [] TicketsDop of int - | [] Feed of ParseResults + | [] Feed of ParseResults interface IArgParserTemplate with member p.Usage = p |> function - | Verbose _ -> "request verbose logging." + | Verbose -> "request verbose logging." | GroupId _ -> "consumer group name. Default: 'default'" | PrometheusPort _ -> "port from which to expose a Prometheus /metrics endpoint. Default: off (optional if environment variable PROMETHEUS_PORT specified)" | MaxReadAhead _ -> "maximum number of batches to let processing get ahead of completion. Default: 8." @@ -53,24 +50,24 @@ module Args = member val Feed: FeedArguments = match p.GetSubCommand() with | Feed feed -> FeedArguments(c, feed) - | _ -> missingArg "Must specify feed" - and [] FeedParameters = + | _ -> p.Raise "Must specify feed" + and [] FeedParameters = | [] Group of string | [] BaseUri of string - | [] Cosmos of ParseResults + | [] Cosmos of ParseResults interface IArgParserTemplate with member p.Usage = p |> function | Group _ -> "specify Api Consumer Group Id. (optional if environment variable API_CONSUMER_GROUP specified)" | BaseUri _ -> "specify Api endpoint. (optional if environment variable API_BASE_URI specified)" | Cosmos _ -> "Cosmos Store parameters." and FeedArguments(c: Configuration, p: ParseResults) = - member val SourceId = p.TryGetResult Group |> Option.defaultWith (fun () -> c.Group) |> Propulsion.Feed.SourceId.parse - member val BaseUri = p.TryGetResult BaseUri |> Option.defaultWith (fun () -> c.BaseUri) |> Uri + member val SourceId = p.GetResult(Group, fun () -> c.Group) |> Propulsion.Feed.SourceId.parse + member val BaseUri = p.GetResult(BaseUri, fun () -> c.BaseUri) |> Uri member val RefreshInterval = TimeSpan.FromHours 1. member val Cosmos: CosmosArguments = match p.GetSubCommand() with | Cosmos cosmos -> CosmosArguments(c, cosmos) - | _ -> missingArg "Must specify cosmos" + | _ -> p.Raise "unexpected" and [] CosmosParameters = | [] Verbose | [] ConnectionMode of Microsoft.Azure.Cosmos.ConnectionMode @@ -82,7 +79,7 @@ module Args = | [] RetriesWaitTime of float interface IArgParserTemplate with member p.Usage = p |> function - | Verbose _ -> "request verbose logging." + | Verbose -> "request verbose logging." | ConnectionMode _ -> "override the connection mode. Default: Direct." | Connection _ -> "specify a connection string for a Cosmos account. (optional if environment variable EQUINOX_COSMOS_CONNECTION specified)" | Database _ -> "specify a database name for Cosmos store. (optional if environment variable EQUINOX_COSMOS_DATABASE specified)" @@ -91,14 +88,14 @@ module Args = | Retries _ -> "specify operation retries (default: 9)." | RetriesWaitTime _ -> "specify max wait-time for retry when being throttled by Cosmos in seconds (default: 30)" and CosmosArguments(c: Configuration, p: ParseResults) = - let discovery = p.TryGetResult Connection |> Option.defaultWith (fun () -> c.CosmosConnection) |> Equinox.CosmosStore.Discovery.ConnectionString + let discovery = p.GetResult(Connection, fun () -> c.CosmosConnection) |> Equinox.CosmosStore.Discovery.ConnectionString let mode = p.TryGetResult ConnectionMode let timeout = p.GetResult(Timeout, 30.) |> TimeSpan.FromSeconds let retries = p.GetResult(Retries, 9) let maxRetryWaitTime = p.GetResult(RetriesWaitTime, 30.) |> TimeSpan.FromSeconds let connector = Equinox.CosmosStore.CosmosStoreConnector(discovery, timeout, retries, maxRetryWaitTime, ?mode=mode) - let database = p.TryGetResult Database |> Option.defaultWith (fun () -> c.CosmosDatabase) - let container = p.TryGetResult Container |> Option.defaultWith (fun () -> c.CosmosContainer) + let database = p.GetResult(Database, fun () -> c.CosmosDatabase) + let container = p.GetResult(Container, fun () -> c.CosmosContainer) member val Verbose = p.Contains Verbose member _.Connect() = connector.ConnectContext("Main", database, container) @@ -148,8 +145,7 @@ let main argv = try let metrics = Sinks.equinoxAndPropulsionFeedConsumerMetrics (Sinks.tags AppName) args.Feed.SourceId Log.Logger <- LoggerConfiguration().Configure(args.Verbose).Sinks(metrics, args.Feed.Cosmos.Verbose).CreateLogger() try run args |> Async.RunSynchronously; 0 - with e when not (e :? MissingArg) -> Log.Fatal(e, "Exiting"); 2 + with e when not (e :? System.Threading.Tasks.TaskCanceledException) -> Log.Fatal(e, "Exiting"); 2 finally Log.CloseAndFlush() - with MissingArg msg -> eprintfn "%s" msg; 1 - | :? Argu.ArguParseException as e -> eprintfn "%s" e.Message; 1 - | e -> eprintf "Exception %s" e.Message; 1 + with :? Argu.ArguParseException as e -> eprintfn $"%s{e.Message}"; 1 + | e -> eprintf $"Exception %s{e.Message}"; 1 diff --git a/propulsion-archiver/Archiver.fsproj b/propulsion-archiver/Archiver.fsproj index b80dd2445..4967e9a88 100644 --- a/propulsion-archiver/Archiver.fsproj +++ b/propulsion-archiver/Archiver.fsproj @@ -13,7 +13,7 @@ - + diff --git a/propulsion-archiver/Program.fs b/propulsion-archiver/Program.fs index 667d74ee2..54d3c8898 100644 --- a/propulsion-archiver/Program.fs +++ b/propulsion-archiver/Program.fs @@ -4,12 +4,9 @@ open Propulsion.CosmosStore open Serilog open System -exception MissingArg of message: string with override this.Message = this.message -let missingArg msg = raise (MissingArg msg) - type Configuration(tryGet) = - let get key = match tryGet key with Some value -> value | None -> missingArg $"Missing Argument/Environment Variable %s{key}" + let get key = match tryGet key with Some value -> value | None -> failwith $"Missing Argument/Environment Variable %s{key}" member _.CosmosConnection = get "EQUINOX_COSMOS_CONNECTION" member _.CosmosDatabase = get "EQUINOX_COSMOS_DATABASE" @@ -18,7 +15,7 @@ type Configuration(tryGet) = module Args = open Argu - [] + [] type Parameters = | [] Verbose | [] SyncVerbose @@ -28,7 +25,7 @@ module Args = | [] MaxWriters of int | [] RuThreshold of float | [] MaxKib of int - | [] SrcCosmos of ParseResults + | [] SrcCosmos of ParseResults interface IArgParserTemplate with member p.Usage = p |> function | Verbose -> "request Verbose Logging. Default: off" @@ -53,14 +50,14 @@ module Args = member val Source: CosmosSourceArguments = match p.GetSubCommand() with | SrcCosmos cosmos -> CosmosSourceArguments(c, cosmos) - | _ -> missingArg "Must specify cosmos for SrcCosmos" + | _ -> p.Raise "Must specify cosmos for SrcCosmos" member x.DestinationArchive = x.Source.Archive member x.Parameters() = Log.Information("Archiving... {dop} writers, max {maxReadAhead} batches read ahead, max write batch {maxKib} KiB", x.MaxWriters, x.MaxReadAhead, x.MaxBytes / 1024) x.ProcessorName, x.MaxReadAhead, x.MaxWriters, x.MaxBytes - and [] CosmosSourceParameters = + and [] CosmosSourceParameters = | [] Verbose | [] FromTail | [] MaxItems of int @@ -75,7 +72,7 @@ module Args = | [] Retries of int | [] RetriesWaitTime of float - | [] DstCosmos of ParseResults + | [] DstCosmos of ParseResults interface IArgParserTemplate with member p.Usage = p |> function | Verbose -> "request Verbose Change Feed Processor Logging. Default: off" @@ -94,13 +91,13 @@ module Args = | DstCosmos _ -> "CosmosDb Sink parameters." and CosmosSourceArguments(c: Configuration, p: ParseResults) = - let discovery = p.TryGetResult CosmosSourceParameters.Connection |> Option.defaultWith (fun () -> c.CosmosConnection) |> Equinox.CosmosStore.Discovery.ConnectionString + let discovery = p.GetResult(CosmosSourceParameters.Connection, fun () -> c.CosmosConnection) |> Equinox.CosmosStore.Discovery.ConnectionString let mode = p.TryGetResult CosmosSourceParameters.ConnectionMode let timeout = p.GetResult(CosmosSourceParameters.Timeout, 5.) |> TimeSpan.FromSeconds let retries = p.GetResult(CosmosSourceParameters.Retries, 5) let maxRetryWaitTime = p.GetResult(CosmosSourceParameters.RetriesWaitTime, 30.) |> TimeSpan.FromSeconds let connector = Equinox.CosmosStore.CosmosStoreConnector(discovery, timeout, retries, maxRetryWaitTime, ?mode = mode) - let database = p.TryGetResult CosmosSourceParameters.Database |> Option.defaultWith (fun () -> c.CosmosDatabase) + let database = p.GetResult(CosmosSourceParameters.Database, fun () -> c.CosmosDatabase) let fromTail = p.Contains CosmosSourceParameters.FromTail let maxItems = p.TryGetResult MaxItems let lagFrequency: TimeSpan = p.GetResult(LagFreqM, 1.) |> TimeSpan.FromMinutes @@ -114,7 +111,7 @@ module Args = member val Archive: CosmosSinkArguments = match p.GetSubCommand() with | DstCosmos cosmos -> CosmosSinkArguments(c, cosmos) - | _ -> missingArg "Must specify cosmos for Sink" + | _ -> p.Raise "Must specify cosmos for Sink" and [] CosmosSinkParameters = | [] ConnectionMode of Microsoft.Azure.Cosmos.ConnectionMode | [] Connection of string @@ -135,14 +132,14 @@ module Args = | Retries _ -> "specify operation retries. Default: 0." | RetriesWaitTime _ -> "specify max wait-time for retry when being throttled by Cosmos in seconds. Default: 5." and CosmosSinkArguments(c: Configuration, p: ParseResults) = - let discovery = p.TryGetResult Connection |> Option.defaultWith (fun () -> c.CosmosConnection) |> Equinox.CosmosStore.Discovery.ConnectionString + let discovery = p.GetResult(Connection, fun () -> c.CosmosConnection) |> Equinox.CosmosStore.Discovery.ConnectionString let mode = p.TryGetResult ConnectionMode let timeout = p.GetResult(Timeout, 5.) |> TimeSpan.FromSeconds let retries = p.GetResult(Retries, 0) let maxRetryWaitTime = p.GetResult(RetriesWaitTime, 5.) |> TimeSpan.FromSeconds let connector = Equinox.CosmosStore.CosmosStoreConnector(discovery, timeout, retries, maxRetryWaitTime, ?mode = mode) - let container = p.TryGetResult Container |> Option.defaultWith (fun () -> c.CosmosContainer) - let databaseId = p.TryGetResult Database |> Option.defaultWith (fun () -> c.CosmosDatabase) + let container = p.GetResult(Container, fun () -> c.CosmosContainer) + let databaseId = p.GetResult(Database, fun () -> c.CosmosDatabase) let leaseContainerId = p.TryGetResult LeaseContainer member val MaybeLeasesContainer: Microsoft.Azure.Cosmos.Container option = leaseContainerId |> Option.map (fun id -> connector.LeasesContainer(databaseId, id)) member x.Connect() = async { // while the default maxJsonBytes is 30000 - we are prepared to incur significant extra write RU charges in order to maximize packing @@ -196,8 +193,7 @@ let main argv = try let args = Args.parse EnvVar.tryGet argv try Log.Logger <- LoggerConfiguration().Configure(AppName, args.Verbose, args.SyncLogging).CreateLogger() try run args |> Async.RunSynchronously; 0 - with e when not (e :? MissingArg) && not (e :? System.Threading.Tasks.TaskCanceledException) -> Log.Fatal(e, "Exiting"); 2 + with e when not (e :? System.Threading.Tasks.TaskCanceledException) -> Log.Fatal(e, "Exiting"); 2 finally Log.CloseAndFlush() - with MissingArg msg -> eprintfn "%s" msg; 1 - | :? Argu.ArguParseException as e -> eprintfn "%s" e.Message; 1 - | e -> eprintf "Exception %s" e.Message; 1 + with :? Argu.ArguParseException as e -> eprintfn $"%s{e.Message}"; 1 + | e -> eprintf $"Exception %s{e.Message}"; 1 diff --git a/propulsion-consumer/Consumer.fsproj b/propulsion-consumer/Consumer.fsproj index 70acd13a5..3a4f9fb59 100644 --- a/propulsion-consumer/Consumer.fsproj +++ b/propulsion-consumer/Consumer.fsproj @@ -14,7 +14,7 @@ - + diff --git a/propulsion-consumer/Program.fs b/propulsion-consumer/Program.fs index 66562c5c9..fca74bb74 100644 --- a/propulsion-consumer/Program.fs +++ b/propulsion-consumer/Program.fs @@ -3,12 +3,9 @@ open Serilog open System -exception MissingArg of message: string with override this.Message = this.message -let missingArg msg = raise (MissingArg msg) - type Configuration(tryGet) = - let get key = match tryGet key with Some value -> value | None -> missingArg $"Missing Argument/Environment Variable %s{key}" + let get key = match tryGet key with Some value -> value | None -> failwith $"Missing Argument/Environment Variable %s{key}" member _.CosmosConnection = get "EQUINOX_COSMOS_CONNECTION" member _.CosmosDatabase = get "EQUINOX_COSMOS_DATABASE" @@ -40,11 +37,11 @@ module Args = | LagFreqM _ -> "specify frequency (minutes) to dump lag stats. Default: off." | MaxDop _ -> "maximum number of items to process in parallel. Default: 8" - | Verbose _ -> "request verbose logging." + | Verbose -> "request verbose logging." type Arguments(c: Configuration, p: ParseResults) = - member val Broker = p.TryGetResult Broker |> Option.defaultWith (fun () -> c.Broker) - member val Topic = p.TryGetResult Topic |> Option.defaultWith (fun () -> c.Topic) - member val Group = p.TryGetResult Group |> Option.defaultWith (fun () -> c.Group) + member val Broker = p.GetResult(Broker, fun () -> c.Broker) + member val Topic = p.GetResult(Topic, fun () -> c.Topic) + member val Group = p.GetResult(Group, fun () -> c.Group) member val MaxInFlightBytes = p.GetResult(MaxInflightMb, 10.) * 1024. * 1024. |> int64 member val LagFrequency = p.TryGetResult LagFreqM |> Option.map TimeSpan.FromMinutes @@ -80,8 +77,7 @@ let main argv = try let args = Args.parse EnvVar.tryGet argv try Log.Logger <- LoggerConfiguration().Configure(verbose=args.Verbose).CreateLogger() try run args |> Async.RunSynchronously; 0 - with e when not (e :? MissingArg) -> Log.Fatal(e, "Exiting"); 2 + with e -> Log.Fatal(e, "Exiting"); 2 finally Log.CloseAndFlush() - with MissingArg msg -> eprintfn "%s" msg; 1 - | :? Argu.ArguParseException as e -> eprintfn "%s" e.Message; 1 - | e -> eprintf "Exception %s" e.Message; 1 + with :? Argu.ArguParseException as e -> eprintfn $"%s{e.Message}"; 1 + | e -> eprintf $"Exception %s{e.Message}"; 1 diff --git a/propulsion-hotel/Reactor/Args.fs b/propulsion-hotel/Reactor/Args.fs index d740faa07..99d63d7c6 100644 --- a/propulsion-hotel/Reactor/Args.fs +++ b/propulsion-hotel/Reactor/Args.fs @@ -1,9 +1,6 @@ /// Commandline arguments and/or secrets loading specifications module Args -exception MissingArg of message: string with override this.Message = this.message -let missingArg msg = raise (MissingArg msg) - module Configuration = module Dynamo = @@ -24,7 +21,7 @@ module Configuration = type Configuration(tryGet: string -> string option) = member val tryGet = tryGet - member _.get key = match tryGet key with Some value -> value | None -> missingArg $"Missing Argument/Environment Variable %s{key}" + member _.get key = match tryGet key with Some value -> value | None -> failwith $"Missing Argument/Environment Variable %s{key}" member x.DynamoRegion = tryGet Configuration.Dynamo.REGION diff --git a/propulsion-hotel/Reactor/Program.fs b/propulsion-hotel/Reactor/Program.fs index bb8614e2a..ec50d9734 100644 --- a/propulsion-hotel/Reactor/Program.fs +++ b/propulsion-hotel/Reactor/Program.fs @@ -9,7 +9,7 @@ module Store = Domain.Store module Args = open Argu - [] + [] type Parameters = | [] Verbose | [] ProcessorName of string @@ -21,8 +21,8 @@ module Args = | [] IdleDelayMs of int | [] WakeForResults - | [] Dynamo of ParseResults - | [] Mdb of ParseResults + | [] Dynamo of ParseResults + | [] Mdb of ParseResults interface IArgParserTemplate with member p.Usage = p |> function | Verbose -> "request Verbose Logging. Default: off." @@ -33,7 +33,7 @@ module Args = | TimeoutS _ -> "Timeout (in seconds) before Watchdog should step in to process transactions. Default: 10." | IdleDelayMs _ -> "Idle delay for scheduler. Default 1000ms" - | WakeForResults _ -> "Wake for all results to provide optimal throughput" + | WakeForResults -> "Wake for all results to provide optimal throughput" | Dynamo _ -> "specify DynamoDB input parameters" | Mdb _ -> "specify MessageDb input parameters" @@ -56,7 +56,7 @@ module Args = match p.GetSubCommand() with | Dynamo a -> Choice1Of2 <| SourceArgs.Dynamo.Arguments(c, a) | Mdb a -> Choice2Of2 <| SourceArgs.Mdb.Arguments(c, a) - | a -> Args.missingArg $"Unexpected Store subcommand %A{a}" + | a -> failwith $"Unexpected Store subcommand %A{a}" member x.ConnectStoreAndSource(appName): Store.Config * (ILogger -> string -> SourceConfig) * (ILogger -> unit) = let cache = Equinox.Cache (appName, sizeMb = x.CacheSizeMb) match x.Store with @@ -124,8 +124,7 @@ let main argv = try let metrics = Sinks.equinoxAndPropulsionFeedMetrics (Sinks.tags AppName) args.ProcessorName Log.Logger <- LoggerConfiguration().Configure(args.Verbose).Sinks(metrics, args.Verbose).CreateLogger() try run args |> Async.RunSynchronously; 0 - with e when not (e :? Args.MissingArg) && not (e :? System.Threading.Tasks.TaskCanceledException) -> Log.Fatal(e, "Exiting"); 2 + with e when not (e :? System.Threading.Tasks.TaskCanceledException) -> Log.Fatal(e, "Exiting"); 2 finally Log.CloseAndFlush() - with Args.MissingArg msg -> eprintfn "%s" msg; 1 - | :? Argu.ArguParseException as e -> eprintfn "%s" e.Message; 1 - | e -> eprintf "Exception %s" e.Message; 1 + with :? Argu.ArguParseException as e -> eprintfn $"%s{e.Message}"; 1 + | e -> eprintf $"Exception %s{e.Message}"; 1 diff --git a/propulsion-hotel/Reactor/Reactor.fsproj b/propulsion-hotel/Reactor/Reactor.fsproj index f1300e8a4..2c0389c57 100644 --- a/propulsion-hotel/Reactor/Reactor.fsproj +++ b/propulsion-hotel/Reactor/Reactor.fsproj @@ -17,7 +17,7 @@ - + diff --git a/propulsion-hotel/Reactor/SourceArgs.fs b/propulsion-hotel/Reactor/SourceArgs.fs index b9ef56341..c78b5d67e 100644 --- a/propulsion-hotel/Reactor/SourceArgs.fs +++ b/propulsion-hotel/Reactor/SourceArgs.fs @@ -48,9 +48,9 @@ module Dynamo = | Some systemName -> Choice1Of2 systemName | None -> - let serviceUrl = p.TryGetResult ServiceUrl |> Option.defaultWith (fun () -> c.DynamoServiceUrl) - let accessKey = p.TryGetResult AccessKey |> Option.defaultWith (fun () -> c.DynamoAccessKey) - let secretKey = p.TryGetResult SecretKey |> Option.defaultWith (fun () -> c.DynamoSecretKey) + let serviceUrl = p.GetResult(ServiceUrl, fun () -> c.DynamoServiceUrl) + let accessKey = p.GetResult(AccessKey, fun () -> c.DynamoAccessKey) + let secretKey = p.GetResult(SecretKey, fun () -> c.DynamoSecretKey) Choice2Of2 (serviceUrl, accessKey, secretKey) let connector = let timeout = p.GetResult(RetriesTimeoutS, 60.) |> TimeSpan.FromSeconds let retries = p.GetResult(Retries, 9) @@ -59,9 +59,9 @@ module Dynamo = Equinox.DynamoStore.DynamoStoreConnector(systemName, timeout, retries) | Choice2Of2 (serviceUrl, accessKey, secretKey) -> Equinox.DynamoStore.DynamoStoreConnector(serviceUrl, accessKey, secretKey, timeout, retries) - let table = p.TryGetResult Table |> Option.defaultWith (fun () -> c.DynamoTable) + let table = p.GetResult(Table, fun () -> c.DynamoTable) let indexSuffix = p.GetResult(IndexSuffix, "-index") - let indexTable = p.TryGetResult IndexTable |> Option.orElseWith (fun () -> c.DynamoIndexTable) |> Option.defaultWith (fun () -> table + indexSuffix) + let indexTable = p.GetResult(IndexTable, fun () -> defaultArg c.DynamoIndexTable (table + indexSuffix)) let client = lazy connector.CreateClient() let indexContext = lazy client.Value.CreateContext("Index", indexTable) let fromTail = p.Contains FromTail @@ -97,10 +97,10 @@ module Mdb = | TailSleepIntervalMs _ -> "How long to sleep in ms once the consumer has hit the tail (default: 100ms)" | FromTail -> "(iff the Consumer Name is fresh) - force skip to present Position. Default: Never skip an event." type Arguments(c: Args.Configuration, p: ParseResults) = - let writeConnStr = p.TryGetResult ConnectionString |> Option.defaultWith (fun () -> c.MdbConnectionString) + let writeConnStr = p.GetResult(ConnectionString, fun () -> c.MdbConnectionString) let readConnStr = p.TryGetResult ReadConnectionString |> Option.orElseWith (fun () -> c.MdbReadConnectionString) |> Option.defaultValue writeConnStr - let checkpointConnStr = p.TryGetResult CheckpointConnectionString |> Option.defaultValue writeConnStr - let schema = p.TryGetResult CheckpointSchema |> Option.defaultWith (fun () -> c.MdbSchema) + let checkpointConnStr = p.GetResult(CheckpointConnectionString, writeConnStr) + let schema = p.GetResult(CheckpointSchema, fun () -> c.MdbSchema) let fromTail = p.Contains FromTail let batchSize = p.GetResult(BatchSize, 1000) let tailSleepInterval = p.GetResult(TailSleepIntervalMs, 100) |> TimeSpan.FromMilliseconds diff --git a/propulsion-indexer/App/Configuration.fs b/propulsion-indexer/App/Configuration.fs index 57c072722..568a54671 100644 --- a/propulsion-indexer/App/Configuration.fs +++ b/propulsion-indexer/App/Configuration.fs @@ -1,8 +1,5 @@ module App.Args -exception MissingArg of message: string with override this.Message = this.message -let missingArg msg = raise (MissingArg msg) - let [] CONNECTION = "EQUINOX_COSMOS_CONNECTION" let [] DATABASE = "EQUINOX_COSMOS_DATABASE" let [] CONTAINER = "EQUINOX_COSMOS_CONTAINER" @@ -10,7 +7,7 @@ let [] VIEWS = "EQUINOX_COSMOS_VIEWS" type Configuration(tryGet: string -> string option) = - let get key = match tryGet key with Some value -> value | None -> missingArg $"Missing Argument/Environment Variable %s{key}" + let get key = match tryGet key with Some value -> value | None -> failwith $"Missing Argument/Environment Variable %s{key}" member _.CosmosConnection = get CONNECTION member _.CosmosDatabase = get DATABASE diff --git a/propulsion-indexer/Indexer/Indexer.fsproj b/propulsion-indexer/Indexer/Indexer.fsproj index 78dc99355..e25a0afb1 100644 --- a/propulsion-indexer/Indexer/Indexer.fsproj +++ b/propulsion-indexer/Indexer/Indexer.fsproj @@ -16,7 +16,7 @@ - + diff --git a/propulsion-indexer/Indexer/Program.fs b/propulsion-indexer/Indexer/Program.fs index 8252e8433..f5b2b59a2 100644 --- a/propulsion-indexer/Indexer/Program.fs +++ b/propulsion-indexer/Indexer/Program.fs @@ -7,16 +7,16 @@ open System module Args = open Argu - [] + [] type Parameters = | [] Verbose | [] PrometheusPort of int | [] ProcessorName of string | [] MaxReadAhead of int | [] MaxWriters of int - | [] Index of ParseResults - | [] Snapshot of ParseResults - | [] Sync of ParseResults + | [] Index of ParseResults + | [] Snapshot of ParseResults + | [] Sync of ParseResults interface IArgParserTemplate with member p.Usage = p |> function | Verbose -> "request Verbose Logging. Default: off." @@ -50,7 +50,7 @@ module Args = x.ActionLabel, x.ProcessorName, maxReadAhead, maxConcurrentProcessors) (x.ProcessorName, maxReadAhead, maxConcurrentProcessors) and [] Action = Index of CosmosArguments | Snapshot of CosmosArguments | Sync of SyncArguments - and [] SyncParameters = + and [] SyncParameters = | [] Connection of string | [] Database of string | [] Container of string @@ -59,7 +59,7 @@ module Args = | [] Retries of int | [] RetriesWaitTime of float | [] MaxKiB of int - | [] Source of ParseResults + | [] Source of ParseResults interface IArgParserTemplate with member p.Usage = p |> function | Connection _ -> "specify a connection string for the destination Cosmos account. Default: Same as Source" @@ -80,7 +80,7 @@ module Args = let retries = p.GetResult(SyncParameters.Retries, 1) let maxRetryWaitTime = p.GetResult(SyncParameters.RetriesWaitTime, 5) |> TimeSpan.FromSeconds let connector = Equinox.CosmosStore.CosmosStoreConnector(discovery, timeout, retries, maxRetryWaitTime) - let database = p.TryGetResult SyncParameters.Database |> Option.defaultWith (fun () -> source.Database) + let database = p.GetResult(SyncParameters.Database, fun () -> source.Database) let container = p.GetResult SyncParameters.Container member val MaxBytes = p.GetResult(MaxKiB, 128) * 1024 member val Source = source @@ -121,15 +121,15 @@ module Args = | MaxItems _ -> "maximum item count to request from the feed. Default: unlimited." | LagFreqM _ -> "specify frequency (minutes) to dump lag stats. Default: 1" and CosmosArguments(c: Args.Configuration, p: ParseResults) = - let discovery = p.TryGetResult CosmosParameters.Connection |> Option.defaultWith (fun () -> c.CosmosConnection) |> Equinox.CosmosStore.Discovery.ConnectionString + let discovery = p.GetResult(CosmosParameters.Connection, fun () -> c.CosmosConnection) |> Equinox.CosmosStore.Discovery.ConnectionString let mode = p.TryGetResult ConnectionMode let timeout = p.GetResult(Timeout, 5) |> TimeSpan.FromSeconds let retries = p.GetResult(Retries, 1) let maxRetryWaitTime = p.GetResult(RetriesWaitTime, 5) |> TimeSpan.FromSeconds let connector = Equinox.CosmosStore.CosmosStoreConnector(discovery, timeout, retries, maxRetryWaitTime, ?mode = mode) - let database = p.TryGetResult Database |> Option.defaultWith (fun () -> c.CosmosDatabase) - let containerId = p.TryGetResult Container |> Option.defaultWith (fun () -> c.CosmosContainer) - let viewsContainerId = p.TryGetResult Views |> Option.defaultWith (fun () -> c.CosmosViews) + let database = p.GetResult(Database, fun () -> c.CosmosDatabase) + let containerId = p.GetResult(Container, fun () -> c.CosmosContainer) + let viewsContainerId = p.GetResult(Views, fun () -> c.CosmosViews) let leaseContainerId = p.GetResult(LeaseContainer, containerId + "-aux") let fromTail = p.Contains FromTail @@ -197,8 +197,7 @@ let main argv = try let metrics = Sinks.equinoxAndPropulsionCosmosConsumerMetrics (Sinks.tags AppName) args.ProcessorName Log.Logger <- LoggerConfiguration().Configure(args.Verbose).Sinks(metrics, args.Cosmos.Verbose).CreateLogger() try run args |> Async.RunSynchronously; 0 - with e when not (e :? Args.MissingArg) && not (e :? System.Threading.Tasks.TaskCanceledException) -> Log.Fatal(e, "Exiting"); 2 + with e when not (e :? System.Threading.Tasks.TaskCanceledException) -> Log.Fatal(e, "Exiting"); 2 finally Log.CloseAndFlush() - with Args.MissingArg msg -> eprintfn $"%s{msg}"; 1 - | :? Argu.ArguParseException as e -> eprintfn $"%s{e.Message}"; 1 - | e -> eprintfn $"Exception %s{e.Message}"; 1 + with :? Argu.ArguParseException as e -> eprintfn $"%s{e.Message}"; 1 + | e -> eprintf $"Exception %s{e.Message}"; 1 diff --git a/propulsion-projector/Args.fs b/propulsion-projector/Args.fs index f92232898..a6a20978b 100644 --- a/propulsion-projector/Args.fs +++ b/propulsion-projector/Args.fs @@ -3,9 +3,6 @@ module ProjectorTemplate.Args open System -exception MissingArg of message: string with override this.Message = this.message -let missingArg msg = raise (MissingArg msg) - let [] REGION = "EQUINOX_DYNAMO_REGION" let [] SERVICE_URL = "EQUINOX_DYNAMO_SERVICE_URL" let [] ACCESS_KEY = "EQUINOX_DYNAMO_ACCESS_KEY_ID" @@ -16,7 +13,7 @@ let [] INDEX_TABLE = "EQUINOX_DYNAMO_TABLE_INDEX" type Configuration(tryGet: string -> string option) = member val tryGet = tryGet - member _.get key = match tryGet key with Some value -> value | None -> missingArg $"Missing Argument/Environment Variable %s{key}" + member _.get key = match tryGet key with Some value -> value | None -> failwith $"Missing Argument/Environment Variable %s{key}" member x.CosmosConnection = x.get "EQUINOX_COSMOS_CONNECTION" member x.CosmosDatabase = x.get "EQUINOX_COSMOS_DATABASE" @@ -44,7 +41,7 @@ module Cosmos = | [] RetriesWaitTime of float interface IArgParserTemplate with member p.Usage = p |> function - | Verbose _ -> "request verbose logging." + | Verbose -> "request verbose logging." | ConnectionMode _ -> "override the connection mode. Default: Direct." | Connection _ -> "specify a connection string for a Cosmos account. (optional if environment variable EQUINOX_COSMOS_CONNECTION specified)" | Database _ -> "specify a database name for Cosmos store. (optional if environment variable EQUINOX_COSMOS_DATABASE specified)" @@ -54,15 +51,15 @@ module Cosmos = | RetriesWaitTime _ -> "specify max wait-time for retry when being throttled by Cosmos in seconds (default: 5)" type Arguments(c: Configuration, p: ParseResults) = - let connection = p.TryGetResult Connection |> Option.defaultWith (fun () -> c.CosmosConnection) + let connection = p.GetResult(Connection, fun () -> c.CosmosConnection) let discovery = Equinox.CosmosStore.Discovery.ConnectionString connection let mode = p.TryGetResult ConnectionMode let timeout = p.GetResult(Timeout, 5.) |> TimeSpan.FromSeconds let retries = p.GetResult(Retries, 1) let maxRetryWaitTime = p.GetResult(RetriesWaitTime, 5.) |> TimeSpan.FromSeconds let connector = Equinox.CosmosStore.CosmosStoreConnector(discovery, timeout, retries, maxRetryWaitTime, ?mode = mode) - let database = p.TryGetResult Database |> Option.defaultWith (fun () -> c.CosmosDatabase) - let container = p.TryGetResult Container |> Option.defaultWith (fun () -> c.CosmosContainer) + let database = p.GetResult(Database, fun () -> c.CosmosDatabase) + let container = p.GetResult(Container, fun () -> c.CosmosContainer) member val Verbose = p.Contains Verbose member _.Connect() = connector.ConnectContext("Target", database, container) @@ -96,9 +93,9 @@ module Dynamo = | Some systemName -> Choice1Of2 systemName | None -> - let serviceUrl = p.TryGetResult ServiceUrl |> Option.defaultWith (fun () -> c.DynamoServiceUrl) - let accessKey = p.TryGetResult AccessKey |> Option.defaultWith (fun () -> c.DynamoAccessKey) - let secretKey = p.TryGetResult SecretKey |> Option.defaultWith (fun () -> c.DynamoSecretKey) + let serviceUrl = p.GetResult(ServiceUrl, fun () -> c.DynamoServiceUrl) + let accessKey = p.GetResult(AccessKey, fun () -> c.DynamoAccessKey) + let secretKey = p.GetResult(SecretKey, fun () -> c.DynamoSecretKey) Choice2Of2 (serviceUrl, accessKey, secretKey) let connector = let timeout = p.GetResult(RetriesTimeoutS, 5.) |> TimeSpan.FromSeconds let retries = p.GetResult(Retries, 1) @@ -107,7 +104,7 @@ module Dynamo = Equinox.DynamoStore.DynamoStoreConnector(systemName, timeout, retries) | Choice2Of2 (serviceUrl, accessKey, secretKey) -> Equinox.DynamoStore.DynamoStoreConnector(serviceUrl, accessKey, secretKey, timeout, retries) - let table = p.TryGetResult Table |> Option.defaultWith (fun () -> c.DynamoTable) + let table = p.GetResult(Table, fun () -> c.DynamoTable) member val Verbose = p.Contains Verbose member _.CreateContext() = connector.CreateClient().CreateContext("Main", table) // #endif diff --git a/propulsion-projector/Program.fs b/propulsion-projector/Program.fs index f8e9de2d2..057bd6b57 100644 --- a/propulsion-projector/Program.fs +++ b/propulsion-projector/Program.fs @@ -7,7 +7,7 @@ module Args = open Argu - [] + [] type Parameters = | [] Verbose | [] ProcessorName of string @@ -19,16 +19,16 @@ module Args = | [] Topic of string #endif // #if cosmos - | [] Cosmos of ParseResults + | [] Cosmos of ParseResults // #endif #if dynamo - | [] Dynamo of ParseResults + | [] Dynamo of ParseResults #endif #if esdb - | [] Esdb of ParseResults + | [] Esdb of ParseResults #endif #if sss - | [] SqlMs of ParseResults + | [] SqlMs of ParseResults #endif interface IArgParserTemplate with member p.Usage = p |> function @@ -76,7 +76,7 @@ module Args = #if sss | SqlMs p -> SourceArgs.Sss.Arguments(c, p) #endif - | p -> Args.missingArg $"Unexpected Store subcommand %A{p}" + | p -> failwith $"Unexpected Store subcommand %A{p}" member x.VerboseStore = x.Store.Verbose #if kafka member val Sink = KafkaSinkArguments(c, p) @@ -131,8 +131,8 @@ module Args = #if kafka and KafkaSinkArguments(c: SourceArgs.Configuration, p: ParseResults) = - member val Broker = p.TryGetResult Broker |> Option.defaultWith (fun () -> c.Broker) - member val Topic = p.TryGetResult Topic |> Option.defaultWith (fun () -> c.Topic) + member val Broker = p.GetResult(Broker, fun () -> c.Broker) + member val Topic = p.GetResult(Topic, fun () -> c.Topic) member x.BuildTargetParams() = x.Broker, x.Topic #endif @@ -190,8 +190,7 @@ let main argv = try let args = Args.parse EnvVar.tryGet argv try Log.Logger <- LoggerConfiguration().Configure(verbose=args.Verbose).CreateLogger() try run args |> Async.RunSynchronously; 0 - with e when not (e :? Args.MissingArg) && not (e :? System.Threading.Tasks.TaskCanceledException) -> Log.Fatal(e, "Exiting"); 2 + with e when not (e :? System.Threading.Tasks.TaskCanceledException) -> Log.Fatal(e, "Exiting"); 2 finally Log.CloseAndFlush() - with Args.MissingArg msg -> eprintfn "%s" msg; 1 - | :? Argu.ArguParseException as e -> eprintfn "%s" e.Message; 1 + with :? Argu.ArguParseException as e -> eprintfn "%s" e.Message; 1 | e -> eprintf "Exception %s" e.Message; 1 diff --git a/propulsion-projector/Projector.fsproj b/propulsion-projector/Projector.fsproj index 550027810..d05749b13 100644 --- a/propulsion-projector/Projector.fsproj +++ b/propulsion-projector/Projector.fsproj @@ -18,7 +18,7 @@ - + diff --git a/propulsion-projector/SourceArgs.fs b/propulsion-projector/SourceArgs.fs index b54a0594e..c33a33289 100644 --- a/propulsion-projector/SourceArgs.fs +++ b/propulsion-projector/SourceArgs.fs @@ -58,14 +58,14 @@ module Cosmos = | MaxItems _ -> "maximum item count to supply for the Change Feed query. Default: use response size limit" | LagFreqM _ -> "specify frequency (minutes) to dump lag stats. Default: 1" type Arguments(c: Args.Configuration, p: ParseResults) = - let discovery = p.TryGetResult Connection |> Option.defaultWith (fun () -> c.CosmosConnection) |> Equinox.CosmosStore.Discovery.ConnectionString + let discovery = p.GetResult(Connection, fun () -> c.CosmosConnection) |> Equinox.CosmosStore.Discovery.ConnectionString let mode = p.TryGetResult ConnectionMode let timeout = p.GetResult(Timeout, 5.) |> TimeSpan.FromSeconds let retries = p.GetResult(Retries, 9) let maxRetryWaitTime = p.GetResult(RetriesWaitTime, 30.) |> TimeSpan.FromSeconds let connector = Equinox.CosmosStore.CosmosStoreConnector(discovery, timeout, retries, maxRetryWaitTime, ?mode = mode) - let database = p.TryGetResult Database |> Option.defaultWith (fun () -> c.CosmosDatabase) - let containerId = p.TryGetResult Container |> Option.defaultWith (fun () -> c.CosmosContainer) + let database = p.GetResult(Database, fun () -> c.CosmosDatabase) + let containerId = p.GetResult(Container, fun () -> c.CosmosContainer) let leaseContainerId = p.GetResult(LeaseContainer, containerId + "-aux") let fromTail = p.Contains FromTail let maxItems = p.TryGetResult MaxItems @@ -117,9 +117,9 @@ module Dynamo = | Some systemName -> Choice1Of2 systemName | None -> - let serviceUrl = p.TryGetResult ServiceUrl |> Option.defaultWith (fun () -> c.DynamoServiceUrl) - let accessKey = p.TryGetResult AccessKey |> Option.defaultWith (fun () -> c.DynamoAccessKey) - let secretKey = p.TryGetResult SecretKey |> Option.defaultWith (fun () -> c.DynamoSecretKey) + let serviceUrl = p.GetResult(ServiceUrl, fun () -> c.DynamoServiceUrl) + let accessKey = p.GetResult(AccessKey, fun () -> c.DynamoAccessKey) + let secretKey = p.GetResult(SecretKey, fun () -> c.DynamoSecretKey) Choice2Of2 (serviceUrl, accessKey, secretKey) let connector = let timeout = p.GetResult(RetriesTimeoutS, 60.) |> TimeSpan.FromSeconds let retries = p.GetResult(Retries, 9) @@ -128,9 +128,9 @@ module Dynamo = Equinox.DynamoStore.DynamoStoreConnector(systemName, timeout, retries) | Choice2Of2 (serviceUrl, accessKey, secretKey) -> Equinox.DynamoStore.DynamoStoreConnector(serviceUrl, accessKey, secretKey, timeout, retries) - let table = p.TryGetResult Table |> Option.defaultWith (fun () -> c.DynamoTable) + let table = p.GetResult(Table, fun () -> c.DynamoTable) let indexSuffix = p.GetResult(IndexSuffix, "-index") - let indexTable = p.TryGetResult IndexTable |> Option.orElseWith (fun () -> c.DynamoIndexTable) |> Option.defaultWith (fun () -> table + indexSuffix) + let indexTable = p.GetResult(IndexTable, fun () -> defaultArg c.DynamoIndexTable (table + indexSuffix)) let fromTail = p.Contains FromTail let tailSleepInterval = TimeSpan.FromMilliseconds 500. let batchSizeCutoff = p.GetResult(MaxItems, 100) @@ -163,7 +163,7 @@ module Esdb = | Store.Config.Dynamo (context, cache) -> Propulsion.Feed.ReaderCheckpoint.DynamoStore.create Store.Metrics.log (consumerGroup, checkpointInterval) (context, cache) - type [] Parameters = + type [] Parameters = | [] Verbose | [] BatchSize of int | [] Connection of string @@ -173,8 +173,8 @@ module Esdb = | [] FromTail - | [] Cosmos of ParseResults - | [] Dynamo of ParseResults + | [] Cosmos of ParseResults + | [] Dynamo of ParseResults interface IArgParserTemplate with member p.Usage = p |> function | Verbose -> "Include low level Store logging." @@ -193,7 +193,7 @@ module Esdb = let startFromTail = p.Contains FromTail let batchSize = p.GetResult(BatchSize, 100) let tailSleepInterval = TimeSpan.FromSeconds 0.5 - let connectionStringLoggable = p.TryGetResult Connection |> Option.defaultWith (fun () -> c.EventStoreConnection) + let connectionStringLoggable = p.GetResult(Connection, fun () -> c.EventStoreConnection) let credentials = p.TryGetResult Credentials |> Option.orElseWith (fun () -> c.MaybeEventStoreCredentials) let discovery = match credentials with Some x -> String.Join(";", connectionStringLoggable, x) | None -> connectionStringLoggable |> Equinox.EventStoreDb.Discovery.ConnectionString @@ -220,7 +220,7 @@ module Esdb = | Dynamo a -> let context = Args.Dynamo.Arguments(c, a).CreateContext() Store.Config.Dynamo (context, cache) - | _ -> Args.missingArg "Must specify `cosmos` or `dynamo` checkpoint store when source is `esdb`" + | _ -> p.Raise "Must specify `cosmos` or `dynamo` checkpoint store when source is `esdb`" #endif // esdb #if sss @@ -253,7 +253,7 @@ module Sss = let startFromTail = p.Contains FromTail let tailSleepInterval = p.GetResult(Tail, 1.) |> TimeSpan.FromSeconds let batchSize = p.GetResult(BatchSize, 512) - let connection = p.TryGetResult Connection |> Option.defaultWith (fun () -> c.SqlStreamStoreConnection) + let connection = p.GetResult(Connection, fun () -> c.SqlStreamStoreConnection) let credentials = p.TryGetResult Credentials |> Option.orElseWith (fun () -> c.SqlStreamStoreCredentials) |> Option.toObj let schema = p.GetResult(Schema, null) member val Verbose = false diff --git a/propulsion-pruner/Program.fs b/propulsion-pruner/Program.fs index 0da680d40..ce635d85f 100644 --- a/propulsion-pruner/Program.fs +++ b/propulsion-pruner/Program.fs @@ -4,12 +4,9 @@ open Propulsion.CosmosStore open Serilog open System -exception MissingArg of message: string with override this.Message = this.message -let missingArg msg = raise (MissingArg msg) - type Configuration(tryGet) = - let get key = match tryGet key with Some value -> value | None -> missingArg $"Missing Argument/Environment Variable %s{key}" + let get key = match tryGet key with Some value -> value | None -> failwith $"Missing Argument/Environment Variable %s{key}" member _.CosmosConnection = get "EQUINOX_COSMOS_CONNECTION" member _.CosmosDatabase = get "EQUINOX_COSMOS_DATABASE" @@ -18,14 +15,14 @@ type Configuration(tryGet) = module Args = open Argu - [] + [] type Parameters = | [] Verbose | [] PrometheusPort of int | [] ProcessorName of string | [] MaxReadAhead of int | [] MaxWriters of int - | [] SrcCosmos of ParseResults + | [] SrcCosmos of ParseResults interface IArgParserTemplate with member p.Usage = p |> function | Verbose -> "request Verbose Logging. Default: off" @@ -45,7 +42,7 @@ module Args = member val Source: CosmosSourceArguments = match p.GetSubCommand() with | SrcCosmos cosmos -> CosmosSourceArguments(c, cosmos) - | _ -> missingArg "Must specify cosmos for Source" + | _ -> p.Raise "Must specify cosmos for Source" member x.DeletionTarget = x.Source.Target and [] CosmosSourceParameters = | [] Verbose @@ -62,7 +59,7 @@ module Args = | [] Retries of int | [] RetriesWaitTime of float - | [] DstCosmos of ParseResults + | [] DstCosmos of ParseResults interface IArgParserTemplate with member p.Usage = p |> function | Verbose -> "request Verbose Change Feed Processor Logging. Default: off" @@ -81,13 +78,13 @@ module Args = | DstCosmos _ -> "CosmosDb Pruning Target parameters." and CosmosSourceArguments(c: Configuration, p: ParseResults) = - let discovery = p.TryGetResult CosmosSourceParameters.Connection |> Option.defaultWith (fun () -> c.CosmosConnection) |> Equinox.CosmosStore.Discovery.ConnectionString + let discovery = p.GetResult(CosmosSourceParameters.Connection, fun () -> c.CosmosConnection) |> Equinox.CosmosStore.Discovery.ConnectionString let mode = p.TryGetResult CosmosSourceParameters.ConnectionMode let timeout = p.GetResult(CosmosSourceParameters.Timeout, 5.) |> TimeSpan.FromSeconds let retries = p.GetResult(CosmosSourceParameters.Retries, 5) let maxRetryWaitTime = p.GetResult(CosmosSourceParameters.RetriesWaitTime, 30.) |> TimeSpan.FromSeconds let connector = Equinox.CosmosStore.CosmosStoreConnector(discovery, timeout, retries, maxRetryWaitTime, ?mode = mode) - let databaseId = p.TryGetResult CosmosSourceParameters.Database |> Option.defaultWith (fun () -> c.CosmosDatabase) + let databaseId = p.GetResult(CosmosSourceParameters.Database, fun () -> c.CosmosDatabase) let containerId = p.GetResult CosmosSourceParameters.Container let fromTail = p.Contains CosmosSourceParameters.FromTail @@ -95,7 +92,7 @@ module Args = let lagFrequency: TimeSpan = p.GetResult(LagFreqM, 1.) |> TimeSpan.FromMinutes member val Verbose = p.Contains CosmosSourceParameters.Verbose member val MonitoringParams = fromTail, maxItems, lagFrequency - member x.ConnectFeed() = if x.Target.Is(databaseId, containerId) then missingArg "Danger! Can not prune a target based on itself" + member x.ConnectFeed() = if x.Target.Is(databaseId, containerId) then p.Raise "Danger! Can not prune a target based on itself" match x.Target.MaybeLeasesContainer with | Some leasesContainerInTarget -> connector.ConnectFeed(databaseId, containerId, leasesContainerInTarget) | None -> @@ -104,7 +101,7 @@ module Args = member val Target: CosmosSinkArguments = match p.GetSubCommand() with | DstCosmos cosmos -> CosmosSinkArguments(c, cosmos) - | _ -> missingArg "Must specify cosmos for Target" + | _ -> p.Raise "Must specify cosmos for Target" and [] CosmosSinkParameters = | [] ConnectionMode of Microsoft.Azure.Cosmos.ConnectionMode | [] Connection of string @@ -125,14 +122,14 @@ module Args = | Retries _ -> "specify operation retries. Default: 0." | RetriesWaitTime _ -> "specify max wait-time for retry when being throttled by Cosmos in seconds. Default: 5." and CosmosSinkArguments(c: Configuration, p: ParseResults) = - let discovery = p.TryGetResult Connection |> Option.defaultWith (fun () -> c.CosmosConnection) |> Equinox.CosmosStore.Discovery.ConnectionString + let discovery = p.GetResult(Connection, fun () -> c.CosmosConnection) |> Equinox.CosmosStore.Discovery.ConnectionString let mode = p.TryGetResult ConnectionMode let timeout = p.GetResult(CosmosSinkParameters.Timeout, 5.) |> TimeSpan.FromSeconds let retries = p.GetResult(CosmosSinkParameters.Retries, 0) let maxRetryWaitTime = p.GetResult(RetriesWaitTime, 5.) |> TimeSpan.FromSeconds let connector = Equinox.CosmosStore.CosmosStoreConnector(discovery, timeout, retries, maxRetryWaitTime, ?mode = mode) - let databaseId = p.TryGetResult Database |> Option.defaultWith (fun () -> c.CosmosDatabase) - let containerId = p.TryGetResult Container |> Option.defaultWith (fun () -> c.CosmosContainer) + let databaseId = p.GetResult(Database, fun () -> c.CosmosDatabase) + let containerId = p.GetResult(Container, fun () -> c.CosmosContainer) let leaseContainerId = p.TryGetResult LeaseContainer member _.Is(d, c) = databaseId = d && containerId = c member _.Connect() = async { let! context = connector.ConnectContext("DELETION Target", databaseId, containerId, tipMaxEvents = 256, ?auxContainerId = leaseContainerId) @@ -187,8 +184,7 @@ let main argv = try let args = Args.parse EnvVar.tryGet argv try Log.Logger <- LoggerConfiguration().Configure(AppName, args.Verbose, args.Source.Verbose).CreateLogger() try run args |> Async.RunSynchronously; 0 - with e when not (e :? MissingArg) -> Log.Fatal(e, "Exiting"); 2 + with e when not (e :? System.Threading.Tasks.TaskCanceledException) -> Log.Fatal(e, "Exiting"); 2 finally Log.CloseAndFlush() - with MissingArg msg -> eprintfn "%s" msg; 1 - | :? Argu.ArguParseException as e -> eprintfn "%s" e.Message; 1 + with :? Argu.ArguParseException as e -> eprintfn "%s" e.Message; 1 | e -> eprintf "Exception %s" e.Message; 1 diff --git a/propulsion-pruner/Pruner.fsproj b/propulsion-pruner/Pruner.fsproj index 1844d9eac..974405320 100644 --- a/propulsion-pruner/Pruner.fsproj +++ b/propulsion-pruner/Pruner.fsproj @@ -13,7 +13,7 @@ - + diff --git a/propulsion-reactor/Args.fs b/propulsion-reactor/Args.fs index 7c941a84a..73588fa33 100644 --- a/propulsion-reactor/Args.fs +++ b/propulsion-reactor/Args.fs @@ -3,9 +3,6 @@ module ReactorTemplate.Args open System -exception MissingArg of message: string with override this.Message = this.message -let missingArg msg = raise (MissingArg msg) - #if !(sourceKafka && blank && kafka) let [] REGION = "EQUINOX_DYNAMO_REGION" let [] SERVICE_URL = "EQUINOX_DYNAMO_SERVICE_URL" @@ -18,7 +15,7 @@ let [] INDEX_TABLE = "EQUINOX_DYNAMO_TABLE_INDEX" type Configuration(tryGet: string -> string option) = member val tryGet = tryGet - member _.get key = match tryGet key with Some value -> value | None -> missingArg $"Missing Argument/Environment Variable %s{key}" + member _.get key = match tryGet key with Some value -> value | None -> failwith $"Missing Argument/Environment Variable %s{key}" #if !(sourceKafka && blank && kafka) member x.CosmosConnection = x.get "EQUINOX_COSMOS_CONNECTION" @@ -65,15 +62,15 @@ module Cosmos = | RetriesWaitTime _ -> "specify max wait-time for retry when being throttled by Cosmos in seconds (default: 5)" type Arguments(c: Configuration, p: ParseResults) = - let connection = p.TryGetResult Connection |> Option.defaultWith (fun () -> c.CosmosConnection) + let connection = p.GetResult(Connection, fun () -> c.CosmosConnection) let discovery = Equinox.CosmosStore.Discovery.ConnectionString connection let mode = p.TryGetResult ConnectionMode let timeout = p.GetResult(Timeout, 5.) |> TimeSpan.FromSeconds let retries = p.GetResult(Retries, 1) let maxRetryWaitTime = p.GetResult(RetriesWaitTime, 5.) |> TimeSpan.FromSeconds let connector = Equinox.CosmosStore.CosmosStoreConnector(discovery, timeout, retries, maxRetryWaitTime, ?mode = mode) - let database = p.TryGetResult Database |> Option.defaultWith (fun () -> c.CosmosDatabase) - let container = p.TryGetResult Container |> Option.defaultWith (fun () -> c.CosmosContainer) + let database = p.GetResult(Database, fun () -> c.CosmosDatabase) + let container = p.GetResult(Container, fun () -> c.CosmosContainer) member _.Connect() = connector.ConnectContext("Target", database, container) module Dynamo = @@ -106,9 +103,9 @@ module Dynamo = | Some systemName -> Choice1Of2 systemName | None -> - let serviceUrl = p.TryGetResult ServiceUrl |> Option.defaultWith (fun () -> c.DynamoServiceUrl) - let accessKey = p.TryGetResult AccessKey |> Option.defaultWith (fun () -> c.DynamoAccessKey) - let secretKey = p.TryGetResult SecretKey |> Option.defaultWith (fun () -> c.DynamoSecretKey) + let serviceUrl = p.GetResult(ServiceUrl, fun () -> c.DynamoServiceUrl) + let accessKey = p.GetResult(AccessKey, fun () -> c.DynamoAccessKey) + let secretKey = p.GetResult(SecretKey, fun () -> c.DynamoSecretKey) Choice2Of2 (serviceUrl, accessKey, secretKey) let connector = let timeout = p.GetResult(RetriesTimeoutS, 5.) |> TimeSpan.FromSeconds let retries = p.GetResult(Retries, 1) @@ -117,6 +114,6 @@ module Dynamo = Equinox.DynamoStore.DynamoStoreConnector(systemName, timeout, retries) | Choice2Of2 (serviceUrl, accessKey, secretKey) -> Equinox.DynamoStore.DynamoStoreConnector(serviceUrl, accessKey, secretKey, timeout, retries) - let table = p.TryGetResult Table |> Option.defaultWith (fun () -> c.DynamoTable) + let table = p.GetResult(Table, fun () -> c.DynamoTable) member _.Connect() = connector.CreateClient().CreateContext("Main", table) #endif diff --git a/propulsion-reactor/Program.fs b/propulsion-reactor/Program.fs index aeebbb5fd..23fb8cfb9 100644 --- a/propulsion-reactor/Program.fs +++ b/propulsion-reactor/Program.fs @@ -31,22 +31,22 @@ module Args = #endif open Argu - [] + [] type Parameters = | [] Verbose | [] ProcessorName of string | [] MaxReadAhead of int | [] MaxWriters of int #if kafka - | [] Kafka of ParseResults + | [] Kafka of ParseResults #else #if sourceKafka // && kafka - | [] Kafka of ParseResults + | [] Kafka of ParseResults #else // kafka && !sourceKafka - | [] Cosmos of ParseResults - | [] Dynamo of ParseResults - | [] Esdb of ParseResults - | [] SqlMs of ParseResults + | [] Cosmos of ParseResults + | [] Dynamo of ParseResults + | [] Esdb of ParseResults + | [] SqlMs of ParseResults #endif #endif interface IArgParserTemplate with @@ -170,31 +170,31 @@ module Args = #if sourceKafka member val Source: Source = match p.GetSubCommand() with | Kafka p -> Source.Kafka <| SourceArgs.Kafka.Arguments(c, p) - | p -> Args.missingArg $"Unexpected Source subcommand %A{p}" + | x -> p.Raise $"Unexpected Source subcommand %A{x}" #else member val Source: Source = match p.GetSubCommand() with | Cosmos p -> Source.Cosmos <| SourceArgs.Cosmos.Arguments(c, p) | Dynamo p -> Source.Dynamo <| SourceArgs.Dynamo.Arguments(c, p) | Esdb p -> Source.Esdb <| SourceArgs.Esdb.Arguments(c, p) | SqlMs p -> Source.SqlMs <| SourceArgs.Sss.Arguments(c, p) - | p -> Args.missingArg $"Unexpected Source subcommand %A{p}" + | x -> p.Raise $"Unexpected Source subcommand %A{x}" #endif #else // kafka member val Sink = match p.GetSubCommand() with | Parameters.Kafka p -> KafkaSinkArguments(c, p) - | p -> Args.missingArg $"Unexpected Sink subcommand %A{p}" + | x -> p.Raise $"Unexpected Sink subcommand %A{x}" member x.Source: Source = x.Sink.Source - and [] KafkaSinkParameters = + and [] KafkaSinkParameters = | [] Broker of string | [] Topic of string #if sourceKafka - | [] Kafka of ParseResults + | [] Kafka of ParseResults #else - | [] Cosmos of ParseResults - | [] Dynamo of ParseResults - | [] Esdb of ParseResults - | [] SqlMs of ParseResults + | [] Cosmos of ParseResults + | [] Dynamo of ParseResults + | [] Esdb of ParseResults + | [] SqlMs of ParseResults #endif interface IArgParserTemplate with member p.Usage = p |> function @@ -210,20 +210,20 @@ module Args = #endif and KafkaSinkArguments(c: Configuration, p: ParseResults) = - member val Broker = p.TryGetResult Broker |> Option.defaultWith (fun () -> c.Broker) - member val Topic = p.TryGetResult Topic |> Option.defaultWith (fun () -> c.Topic) + member val Broker = p.GetResult(Broker, fun () -> c.Broker) + member val Topic = p.GetResult(Topic, fun () -> c.Topic) member x.BuildTargetParams() = x.Broker, x.Topic #if sourceKafka member val Source = match p.GetSubCommand() with | KafkaSinkParameters.Kafka p -> Source.Kafka <| SourceArgs.Kafka.Arguments(c, p) - | p -> Args.missingArg $"Unexpected Source subcommand %A{p}" + | x -> p.Raise $"Unexpected Source subcommand %A{x}" #else member val Source: Source = match p.GetSubCommand() with | Cosmos p -> Source.Cosmos <| SourceArgs.Cosmos.Arguments(c, p) | Dynamo p -> Source.Dynamo <| SourceArgs.Dynamo.Arguments(c, p) | Esdb p -> Source.Esdb <| SourceArgs.Esdb.Arguments(c, p) | SqlMs p -> Source.SqlMs <| SourceArgs.Sss.Arguments(c, p) - | p -> Args.missingArg $"Unexpected Source subcommand %A{p}" + | x -> p.Raise $"Unexpected Source subcommand %A{x}" #endif #endif @@ -312,8 +312,7 @@ let main argv = try let args = Args.parse EnvVar.tryGet argv try Log.Logger <- LoggerConfiguration().Configure(verbose=args.Verbose).CreateLogger() try build args |> Async.Parallel |> Async.Ignore |> Async.RunSynchronously; 0 - with e when not (e :? Args.MissingArg) && not (e :? System.Threading.Tasks.TaskCanceledException) -> Log.Fatal(e, "Exiting"); 2 + with e when not (e :? System.Threading.Tasks.TaskCanceledException) -> Log.Fatal(e, "Exiting"); 2 finally Log.CloseAndFlush() - with Args.MissingArg msg -> eprintfn "%s" msg; 1 - | :? Argu.ArguParseException as e -> eprintfn "%s" e.Message; 1 + with :? Argu.ArguParseException as e -> eprintfn "%s" e.Message; 1 | e -> eprintf "Exception %s" e.Message; 1 diff --git a/propulsion-reactor/Reactor.fsproj b/propulsion-reactor/Reactor.fsproj index 93cb72244..8af5aea06 100644 --- a/propulsion-reactor/Reactor.fsproj +++ b/propulsion-reactor/Reactor.fsproj @@ -34,7 +34,7 @@ - + diff --git a/propulsion-reactor/SourceArgs.fs b/propulsion-reactor/SourceArgs.fs index 5fb332df3..d9ed4ce2f 100644 --- a/propulsion-reactor/SourceArgs.fs +++ b/propulsion-reactor/SourceArgs.fs @@ -41,14 +41,14 @@ module TargetStoreArgs = #if sourceKafka module Kafka = - type [] Parameters = + type [] Parameters = | [] Broker of string | [] Topic of string | [] MaxInflightMb of float | [] LagFreqM of float #if !(kafka && blank) - | [] Cosmos of ParseResults - | [] Dynamo of ParseResults + | [] Cosmos of ParseResults + | [] Dynamo of ParseResults #endif interface IArgParserTemplate with member p.Usage = p |> function @@ -60,8 +60,8 @@ module Kafka = | Dynamo _ -> "CosmosDb Sink parameters." type Arguments(c: Configuration, p: ParseResults) = - member val Broker = p.TryGetResult Broker |> Option.defaultWith (fun () -> c.Broker) - member val Topic = p.TryGetResult Topic |> Option.defaultWith (fun () -> c.Topic) + member val Broker = p.GetResult(Broker, fun () -> c.Broker) + member val Topic = p.GetResult(Topic, fun () -> c.Topic) member val MaxInFlightBytes = p.GetResult(MaxInflightMb, 10.) * 1024. * 1024. |> int64 member val LagFrequency = p.TryGetResult LagFreqM |> Option.map TimeSpan.FromMinutes member x.BuildSourceParams() = x.Broker, x.Topic @@ -71,14 +71,14 @@ module Kafka = match p.GetSubCommand() with | Cosmos cosmos -> TargetStoreArgs.Cosmos (Args.Cosmos.Arguments(c, cosmos)) | Dynamo dynamo -> TargetStoreArgs.Dynamo (Args.Dynamo.Arguments(c, dynamo)) - | _ -> Args.missingArg "Must specify `cosmos` or `dynamo` target store when source is `kafka`" + | _ -> p.Raise "Must specify `cosmos` or `dynamo` target store when source is `kafka`" member x.ConnectTarget(cache): Store.Config = TargetStoreArgs.connectTarget x.TargetStoreArgs cache #endif #else // !sourceKafka module Cosmos = - type [] Parameters = + type [] Parameters = | [] Verbose | [] ConnectionMode of Microsoft.Azure.Cosmos.ConnectionMode | [] Connection of string @@ -92,8 +92,8 @@ module Cosmos = | [] MaxItems of int | [] LagFreqM of float #if !(kafka && blank) - | [] Cosmos of ParseResults - | [] Dynamo of ParseResults + | [] Cosmos of ParseResults + | [] Dynamo of ParseResults #endif interface IArgParserTemplate with member p.Usage = p |> function @@ -115,14 +115,14 @@ module Cosmos = #endif type Arguments(c: Args.Configuration, p: ParseResults) = - let discovery = p.TryGetResult Connection |> Option.defaultWith (fun () -> c.CosmosConnection) |> Equinox.CosmosStore.Discovery.ConnectionString + let discovery = p.GetResult(Connection, fun () -> c.CosmosConnection) |> Equinox.CosmosStore.Discovery.ConnectionString let mode = p.TryGetResult ConnectionMode let timeout = p.GetResult(Timeout, 5.) |> TimeSpan.FromSeconds let retries = p.GetResult(Retries, 9) let maxRetryWaitTime = p.GetResult(RetriesWaitTime, 30.) |> TimeSpan.FromSeconds let connector = Equinox.CosmosStore.CosmosStoreConnector(discovery, timeout, retries, maxRetryWaitTime, ?mode = mode) - let database = p.TryGetResult Database |> Option.defaultWith (fun () -> c.CosmosDatabase) - let containerId = p.TryGetResult Container |> Option.defaultWith (fun () -> c.CosmosContainer) + let database = p.GetResult(Database, fun () -> c.CosmosDatabase) + let containerId = p.GetResult(Container, fun () -> c.CosmosContainer) let leaseContainerId = p.GetResult(LeaseContainer, containerId + "-aux") let fromTail = p.Contains FromTail let maxItems = p.TryGetResult MaxItems @@ -136,14 +136,14 @@ module Cosmos = match p.GetSubCommand() with | Cosmos cosmos -> TargetStoreArgs.Cosmos (Args.Cosmos.Arguments(c, cosmos)) | Dynamo dynamo -> TargetStoreArgs.Dynamo (Args.Dynamo.Arguments(c, dynamo)) - | _ -> Args.missingArg "Must specify `cosmos` or `dynamo` target store when source is `esdb`" + | _ -> p.Raise "Must specify `cosmos` or `dynamo` target store when source is `esdb`" member x.ConnectTarget(cache): Store.Config = TargetStoreArgs.connectTarget x.TargetStoreArgs cache #endif module Dynamo = - type [] Parameters = + type [] Parameters = | [] Verbose | [] RegionProfile of string | [] ServiceUrl of string @@ -158,8 +158,8 @@ module Dynamo = | [] FromTail | [] StreamsDop of int #if !(kafka && blank) - | [] Cosmos of ParseResults - | [] Dynamo of ParseResults + | [] Cosmos of ParseResults + | [] Dynamo of ParseResults #endif interface IArgParserTemplate with member p.Usage = p |> function @@ -189,9 +189,9 @@ module Dynamo = | Some systemName -> Choice1Of2 systemName | None -> - let serviceUrl = p.TryGetResult ServiceUrl |> Option.defaultWith (fun () -> c.DynamoServiceUrl) - let accessKey = p.TryGetResult AccessKey |> Option.defaultWith (fun () -> c.DynamoAccessKey) - let secretKey = p.TryGetResult SecretKey |> Option.defaultWith (fun () -> c.DynamoSecretKey) + let serviceUrl = p.GetResult(ServiceUrl, fun () -> c.DynamoServiceUrl) + let accessKey = p.GetResult(AccessKey, fun () -> c.DynamoAccessKey) + let secretKey = p.GetResult(SecretKey, fun () -> c.DynamoSecretKey) Choice2Of2 (serviceUrl, accessKey, secretKey) let connector = let timeout = p.GetResult(RetriesTimeoutS, 60.) |> TimeSpan.FromSeconds let retries = p.GetResult(Retries, 9) @@ -200,9 +200,9 @@ module Dynamo = Equinox.DynamoStore.DynamoStoreConnector(systemName, timeout, retries) | Choice2Of2 (serviceUrl, accessKey, secretKey) -> Equinox.DynamoStore.DynamoStoreConnector(serviceUrl, accessKey, secretKey, timeout, retries) - let table = p.TryGetResult Table |> Option.defaultWith (fun () -> c.DynamoTable) + let table = p.GetResult(Table, fun () -> c.DynamoTable) let indexSuffix = p.GetResult(IndexSuffix, "-index") - let indexTable = p.TryGetResult IndexTable |> Option.orElseWith (fun () -> c.DynamoIndexTable) |> Option.defaultWith (fun () -> table + indexSuffix) + let indexTable = p.GetResult(IndexTable, fun () -> defaultArg c.DynamoIndexTable (table + indexSuffix)) let fromTail = p.Contains FromTail let tailSleepInterval = TimeSpan.FromMilliseconds 500. let batchSizeCutoff = p.GetResult(MaxItems, 100) @@ -223,7 +223,7 @@ module Dynamo = match p.GetSubCommand() with | Cosmos cosmos -> TargetStoreArgs.Cosmos (Args.Cosmos.Arguments(c, cosmos)) | Dynamo dynamo -> TargetStoreArgs.Dynamo (Args.Dynamo.Arguments(c, dynamo)) - | _ -> Args.missingArg "Must specify `cosmos` or `dynamo` target store when source is `esdb`" + | _ -> p.Raise "Must specify `cosmos` or `dynamo` target store when source is `esdb`" member x.ConnectTarget(cache): Store.Config = TargetStoreArgs.connectTarget x.TargetStoreArgs cache #endif @@ -246,7 +246,7 @@ module Esdb = | Store.Config.Sss _ -> failwith "Unexpected store type" #endif - type [] Parameters = + type [] Parameters = | [] Verbose | [] Connection of string | [] Credentials of string @@ -256,8 +256,8 @@ module Esdb = | [] BatchSize of int | [] FromTail - | [] Cosmos of ParseResults - | [] Dynamo of ParseResults + | [] Cosmos of ParseResults + | [] Dynamo of ParseResults interface IArgParserTemplate with member p.Usage = p |> function | Verbose -> "Include low level Store logging." @@ -276,7 +276,7 @@ module Esdb = let startFromTail = p.Contains FromTail let batchSize = p.GetResult(BatchSize, 100) let tailSleepInterval = TimeSpan.FromSeconds 0.5 - let connectionStringLoggable = p.TryGetResult Connection |> Option.defaultWith (fun () -> c.EventStoreConnection) + let connectionStringLoggable = p.GetResult(Connection, fun () -> c.EventStoreConnection) let credentials = p.TryGetResult Credentials |> Option.orElseWith (fun () -> c.MaybeEventStoreCredentials) let discovery = match credentials with Some x -> String.Join(";", connectionStringLoggable, x) | None -> connectionStringLoggable |> Equinox.EventStoreDb.Discovery.ConnectionString @@ -301,7 +301,7 @@ module Esdb = match p.GetSubCommand() with | Cosmos cosmos -> TargetStoreArgs.Cosmos (Args.Cosmos.Arguments(c, cosmos)) | Dynamo dynamo -> TargetStoreArgs.Dynamo (Args.Dynamo.Arguments(c, dynamo)) - | _ -> Args.missingArg "Must specify `cosmos` or `dynamo` target store when source is `esdb`" + | _ -> p.Raise "Must specify `cosmos` or `dynamo` target store when source is `esdb`" member x.ConnectTarget(cache): Store.Config = TargetStoreArgs.connectTarget x.TargetStoreArgs cache #endif @@ -309,7 +309,7 @@ module Esdb = module Sss = // TOCONSIDER: add DB connectors other than MsSql - type [] Parameters = + type [] Parameters = | [] Tail of intervalS: float | [] Connection of string | [] Credentials of string @@ -319,8 +319,8 @@ module Sss = | [] CheckpointsConnection of string | [] CheckpointsCredentials of string #if !(kafka && blank) - | [] Cosmos of ParseResults - | [] Dynamo of ParseResults + | [] Cosmos of ParseResults + | [] Dynamo of ParseResults #endif interface IArgParserTemplate with member p.Usage = p |> function @@ -341,7 +341,7 @@ module Sss = let startFromTail = p.Contains FromTail let tailSleepInterval = p.GetResult(Tail, 1.) |> TimeSpan.FromSeconds let batchSize = p.GetResult(BatchSize, 512) - let connection = p.TryGetResult Connection |> Option.defaultWith (fun () -> c.SqlStreamStoreConnection) + let connection = p.GetResult(Connection, fun () -> c.SqlStreamStoreConnection) let credentials = p.TryGetResult Credentials |> Option.orElseWith (fun () -> c.SqlStreamStoreCredentials) |> Option.toObj let schema = p.GetResult(Schema, null) @@ -372,7 +372,7 @@ module Sss = match p.GetSubCommand() with | Cosmos cosmos -> TargetStoreArgs.Cosmos (Args.Cosmos.Arguments(c, cosmos)) | Dynamo dynamo -> TargetStoreArgs.Dynamo (Args.Dynamo.Arguments(c, dynamo)) - | _ -> Args.missingArg "Must specify `cosmos` or `dynamo` target store when source is `sss`" + | _ -> p.Raise "Must specify `cosmos` or `dynamo` target store when source is `sss`" member x.ConnectTarget(cache): Store.Config = TargetStoreArgs.connectTarget x.TargetStoreArgs cache #endif diff --git a/propulsion-summary-consumer/Program.fs b/propulsion-summary-consumer/Program.fs index fe9133a0c..c0e01c1d0 100644 --- a/propulsion-summary-consumer/Program.fs +++ b/propulsion-summary-consumer/Program.fs @@ -3,12 +3,9 @@ open Serilog open System -exception MissingArg of message: string with override this.Message = this.message -let missingArg msg = raise (MissingArg msg) - type Configuration(tryGet) = - let get key = match tryGet key with Some value -> value | None -> missingArg $"Missing Argument/Environment Variable %s{key}" + let get key = match tryGet key with Some value -> value | None -> failwith $"Missing Argument/Environment Variable %s{key}" member _.CosmosConnection = get "EQUINOX_COSMOS_CONNECTION" member _.CosmosDatabase = get "EQUINOX_COSMOS_DATABASE" @@ -20,7 +17,7 @@ type Configuration(tryGet) = module Args = open Argu - [] + [] type Parameters = | [] Broker of string | [] Topic of string @@ -30,7 +27,7 @@ module Args = | [] MaxWriters of int | [] Verbose - | [] Cosmos of ParseResults + | [] Cosmos of ParseResults interface IArgParserTemplate with member p.Usage = p |> function @@ -41,13 +38,13 @@ module Args = | LagFreqM _ -> "specify frequency (minutes) to dump lag stats. Default: off." | MaxWriters _ -> "maximum number of items to process in parallel. Default: 8" - | Verbose _ -> "request verbose logging." + | Verbose -> "request verbose logging." | Cosmos _ -> "specify CosmosDb input parameters" and Arguments(c: Configuration, p: ParseResults) = member val Cosmos = CosmosArguments(c, p.GetResult Cosmos) - member val Broker = p.TryGetResult Broker |> Option.defaultWith (fun () -> c.Broker) - member val Topic = p.TryGetResult Topic |> Option.defaultWith (fun () -> c.Topic) - member val Group = p.TryGetResult Group |> Option.defaultWith (fun () -> c.Group) + member val Broker = p.GetResult(Broker, fun () -> c.Broker) + member val Topic = p.GetResult(Topic, fun () -> c.Topic) + member val Group = p.GetResult(Group, fun () -> c.Group) member val MaxInFlightBytes = p.GetResult(MaxInflightMb, 10.) * 1024. * 1024. |> int64 member val LagFrequency = p.TryGetResult LagFreqM |> Option.map TimeSpan.FromMinutes @@ -74,13 +71,13 @@ module Args = | Retries _ -> "specify operation retries. Default: 1." | RetriesWaitTime _ -> "specify max wait-time for retry when being throttled by Cosmos in seconds. Default: 5." and CosmosArguments(c: Configuration, p: ParseResults) = - let discovery = p.TryGetResult Connection |> Option.defaultWith (fun () -> c.CosmosConnection) |> Equinox.CosmosStore.Discovery.ConnectionString + let discovery = p.GetResult(Connection, fun () -> c.CosmosConnection) |> Equinox.CosmosStore.Discovery.ConnectionString let mode = p.TryGetResult ConnectionMode let timeout = p.GetResult(Timeout, 5.) |> TimeSpan.FromSeconds let retries = p.GetResult(Retries, 1) let maxRetryWaitTime = p.GetResult(RetriesWaitTime, 5.) |> TimeSpan.FromSeconds let connector = Equinox.CosmosStore.CosmosStoreConnector(discovery, timeout, retries, maxRetryWaitTime, ?mode = mode) - let database = p.TryGetResult Database |> Option.defaultWith (fun () -> c.CosmosDatabase) + let database = p.GetResult(Database, fun () -> c.CosmosDatabase) let container = p.GetResult Container member _.Connect() = connector.ConnectContext("Main", database, container, 256) @@ -118,8 +115,7 @@ let main argv = try let args = Args.parse EnvVar.tryGet argv try Log.Logger <- LoggerConfiguration().Configure(verbose=args.Verbose).CreateLogger() try run args |> Async.RunSynchronously; 0 - with e when not (e :? MissingArg) -> Log.Fatal(e, "Exiting"); 2 + with e when not (e :? System.Threading.Tasks.TaskCanceledException) -> Log.Fatal(e, "Exiting"); 2 finally Log.CloseAndFlush() - with MissingArg msg -> eprintfn "%s" msg; 1 - | :? Argu.ArguParseException as e -> eprintfn "%s" e.Message; 1 - | e -> eprintf "Exception %s" e.Message; 1 + with :? Argu.ArguParseException as e -> eprintfn $"%s{e.Message}"; 1 + | e -> eprintf $"Exception %s{e.Message}"; 1 diff --git a/propulsion-summary-consumer/SummaryConsumer.fsproj b/propulsion-summary-consumer/SummaryConsumer.fsproj index 8b69f1aa4..fdf44e71e 100644 --- a/propulsion-summary-consumer/SummaryConsumer.fsproj +++ b/propulsion-summary-consumer/SummaryConsumer.fsproj @@ -16,7 +16,7 @@ - + diff --git a/propulsion-sync/Program.fs b/propulsion-sync/Program.fs index 199fc6546..1e9183134 100644 --- a/propulsion-sync/Program.fs +++ b/propulsion-sync/Program.fs @@ -9,12 +9,9 @@ open Serilog open System open System.Threading -exception MissingArg of message: string with override this.Message = this.message -let missingArg msg = raise (MissingArg msg) - type Configuration(tryGet) = - let get key = match tryGet key with Some value -> value | None -> missingArg $"Missing Argument/Environment Variable %s{key}" + let get key = match tryGet key with Some value -> value | None -> failwith $"Missing Argument/Environment Variable %s{key}" let isTrue varName = tryGet varName |> Option.exists (fun s -> String.Equals(s, bool.TrueString, StringComparison.OrdinalIgnoreCase)) member _.CosmosConnection = get "EQUINOX_COSMOS_CONNECTION" @@ -33,7 +30,7 @@ type Configuration(tryGet) = module Args = open Argu - [] + [] type Parameters = | [] Verbose | [] VerboseStore @@ -44,8 +41,8 @@ module Args = | [] MaxConnections of int | [] CategoryBlacklist of string | [] CategoryWhitelist of string - | [] SrcEs of ParseResults - | [] SrcCosmos of ParseResults + | [] SrcEs of ParseResults + | [] SrcCosmos of ParseResults interface IArgParserTemplate with member p.Usage = p |> function | Verbose -> "request Verbose Logging. Default: off" @@ -73,7 +70,7 @@ module Args = match p.GetSubCommand() with | SrcCosmos cosmos -> Choice1Of2 (CosmosSourceArguments(c, cosmos)) | SrcEs es -> Choice2Of2 (EsSourceArguments(c, es)) - | _ -> missingArg "Must specify one of cosmos or es for Src" + | _ -> p.Raise "Must specify one of cosmos or es for Src" member val StatsInterval = TimeSpan.FromMinutes 1. member val StateInterval = TimeSpan.FromMinutes 5. @@ -101,7 +98,7 @@ module Args = fun x -> not (black.Contains x) && (not << isCheckpoint) x && (not excludeLong || (not << isLong) x) | bad, [] -> let black = Set.ofList bad in Log.Warning("Excluding categories: {cats}", black); fun x -> not (black.Contains x) | [], good -> let white = Set.ofList good in Log.Warning("Only copying categories: {cats}", white); fun x -> white.Contains x - | _, _ -> missingArg "BlackList and Whitelist are mutually exclusive; inclusions and exclusions cannot be mixed" + | _, _ -> p.Raise "BlackList and Whitelist are mutually exclusive; inclusions and exclusions cannot be mixed" member x.Sink: Choice = match x.Source with @@ -124,7 +121,7 @@ module Args = { groupName = x.ProcessorName; start = startPos; checkpointInterval = srcE.CheckpointInterval; tailInterval = srcE.TailInterval forceRestart = srcE.ForceRestart batchSize = srcE.StartingBatchSize; minBatchSize = srcE.MinBatchSize; gorge = srcE.Gorge; streamReaders = srcE.StreamReaders }) - and [] CosmosSourceParameters = + and [] CosmosSourceParameters = | [] FromTail | [] MaxItems of int | [] LagFreqM of float @@ -138,8 +135,8 @@ module Args = | [] Retries of int | [] RetriesWaitTime of float - | [] DstEs of ParseResults - | [] DstCosmos of ParseResults + | [] DstEs of ParseResults + | [] DstCosmos of ParseResults interface IArgParserTemplate with member p.Usage = p |> function | FromTail -> "(iff the Consumer Name is fresh) - force skip to present Position. Default: Never skip an event." @@ -158,13 +155,13 @@ module Args = | DstEs _ -> "EventStore Sink parameters." | DstCosmos _ -> "CosmosDb Sink parameters." and CosmosSourceArguments(c: Configuration, p: ParseResults) = - let discovery = p.TryGetResult CosmosSourceParameters.Connection |> Option.defaultWith (fun () -> c.CosmosConnection) |> Equinox.CosmosStore.Discovery.ConnectionString + let discovery = p.GetResult(CosmosSourceParameters.Connection, fun () -> c.CosmosConnection) |> Equinox.CosmosStore.Discovery.ConnectionString let mode = p.TryGetResult CosmosSourceParameters.ConnectionMode let timeout = p.GetResult(CosmosSourceParameters.Timeout, 5.) |> TimeSpan.FromSeconds let retries = p.GetResult(CosmosSourceParameters.Retries, 1) let maxRetryWaitTime = p.GetResult(CosmosSourceParameters.RetriesWaitTime, 5.) |> TimeSpan.FromSeconds let connector = Equinox.CosmosStore.CosmosStoreConnector(discovery, timeout, retries, maxRetryWaitTime, ?mode = mode) - let database = p.TryGetResult CosmosSourceParameters.Database |> Option.defaultWith (fun () -> c.CosmosDatabase) + let database = p.GetResult(CosmosSourceParameters.Database, fun () -> c.CosmosDatabase) let containerId: string = p.GetResult CosmosSourceParameters.Container let leaseContainerId = p.GetResult(CosmosSourceParameters.LeaseContainer, containerId + "-aux") @@ -182,8 +179,8 @@ module Args = match p.GetSubCommand() with | DstCosmos cosmos -> Choice1Of2 (CosmosSinkArguments(c, cosmos)) | DstEs es -> Choice2Of2 (EsSinkArguments(c, es)) - | _ -> missingArg "Must specify one of cosmos or es for Sink" - and [] EsSourceParameters = + | _ -> p.Raise "Must specify one of cosmos or es for Sink" + and [] EsSourceParameters = | [] FromTail | [] Gorge of int | [] StreamReaders of int @@ -205,8 +202,8 @@ module Args = | [] Username of string | [] Password of string - | [] Es of ParseResults - | [] Cosmos of ParseResults + | [] Es of ParseResults + | [] Cosmos of ParseResults interface IArgParserTemplate with member p.Usage = p |> function | FromTail -> "Start the processing from the Tail" @@ -255,9 +252,9 @@ module Args = | true, Some p -> Discovery.Uri (UriBuilder("tcp", x.Host, p).Uri) member val Tcp = p.Contains EsSourceParameters.Tcp || c.EventStoreTcp member val Port = match p.TryGetResult EsSourceParameters.Port with Some x -> Some x | None -> c.EventStorePort - member val Host = p.TryGetResult EsSourceParameters.Host |> Option.defaultWith (fun () -> c.EventStoreHost) - member val User = p.TryGetResult EsSourceParameters.Username |> Option.defaultWith (fun () -> c.EventStoreUsername) - member val Password = p.TryGetResult EsSourceParameters.Password |> Option.defaultWith (fun () -> c.EventStorePassword) + member val Host = p.GetResult(EsSourceParameters.Host, fun () -> c.EventStoreHost) + member val User = p.GetResult(EsSourceParameters.Username, fun () -> c.EventStoreUsername) + member val Password = p.GetResult(EsSourceParameters.Password, fun () -> c.EventStorePassword) member val Retries = p.GetResult(EsSourceParameters.Retries, 3) member val Timeout = p.GetResult(EsSourceParameters.Timeout, 20.) |> TimeSpan.FromSeconds member val Heartbeat = p.GetResult(EsSourceParameters.HeartbeatTimeout, 1.5) |> TimeSpan.FromSeconds @@ -276,8 +273,8 @@ module Args = member val Sink = match p.GetSubCommand() with | Cosmos cosmos -> CosmosSinkArguments(c, cosmos) - | _ -> missingArg "Must specify cosmos for Sink if source is `es`" - and [] CosmosSinkParameters = + | _ -> p.Raise "Must specify cosmos for Sink if source is `es`" + and [] CosmosSinkParameters = | [] ConnectionMode of Microsoft.Azure.Cosmos.ConnectionMode | [] Connection of string | [] Database of string @@ -287,7 +284,7 @@ module Args = | [] Retries of int | [] RetriesWaitTime of float #if kafka - | [] Kafka of ParseResults + | [] Kafka of ParseResults #endif interface IArgParserTemplate with member p.Usage = p |> function @@ -303,14 +300,14 @@ module Args = | Kafka _ -> "specify Kafka target for non-Synced categories. Default: None." #endif and CosmosSinkArguments(c: Configuration, p: ParseResults) = - let discovery = p.TryGetResult Connection |> Option.defaultWith (fun () -> c.CosmosConnection) |> Equinox.CosmosStore.Discovery.ConnectionString + let discovery = p.GetResult(Connection, fun () -> c.CosmosConnection) |> Equinox.CosmosStore.Discovery.ConnectionString let mode = p.GetResult(ConnectionMode, Microsoft.Azure.Cosmos.ConnectionMode.Direct) let timeout = p.GetResult(CosmosSinkParameters.Timeout, 5.) |> TimeSpan.FromSeconds let retries = p.GetResult(CosmosSinkParameters.Retries, 0) let maxRetryWaitTime = p.GetResult(RetriesWaitTime, 5.) |> TimeSpan.FromSeconds let connector = Equinox.CosmosStore.CosmosStoreConnector(discovery, timeout, retries, maxRetryWaitTime, mode) - let database = p.TryGetResult Database |> Option.defaultWith (fun () -> c.CosmosDatabase) - let container = p.TryGetResult Container |> Option.defaultWith (fun () -> c.CosmosContainer) + let database = p.GetResult(Database, fun () -> c.CosmosDatabase) + let container = p.GetResult(Container, fun () -> c.CosmosContainer) let leaseContainerId = p.TryGetResult LeaseContainer member _.ConnectTarget(maxEvents) = connector.ConnectTarget("Destination", database, container, maxEvents) member _.MaybeLeasesContainer: Microsoft.Azure.Cosmos.Container option = leaseContainerId |> Option.map (fun id -> connector.LeasesContainer(database, id)) @@ -351,9 +348,9 @@ module Args = | true, Some p -> Discovery.Uri (UriBuilder("tcp", x.Host, p).Uri) member val Tcp = p.Contains EsSinkParameters.Tcp || c.EventStoreTcp member val Port = match p.TryGetResult Port with Some x -> Some x | None -> c.EventStorePort - member val Host = p.TryGetResult Host |> Option.defaultWith (fun () -> c.EventStoreHost) - member val User = p.TryGetResult Username |> Option.defaultWith (fun () -> c.EventStoreUsername) - member val Password = p.TryGetResult Password |> Option.defaultWith (fun () -> c.EventStorePassword) + member val Host = p.GetResult(Host, fun () -> c.EventStoreHost) + member val User = p.GetResult(Username, fun () -> c.EventStoreUsername) + member val Password = p.GetResult(Password, fun () -> c.EventStorePassword) member val Retries = p.GetResult(Retries, 3) member val Timeout = p.GetResult(Timeout, 20.) |> TimeSpan.FromSeconds member val Heartbeat = p.GetResult(HeartbeatTimeout, 1.5) |> TimeSpan.FromSeconds @@ -378,8 +375,8 @@ module Args = | Topic _ -> "specify Kafka Topic Id. (optional if environment variable PROPULSION_KAFKA_TOPIC specified)." | Producers _ -> "specify number of Kafka Producer instances to use. Default: 1." and KafkaSinkArguments(c: Configuration, p: ParseResults) = - member val Broker = p.TryGetResult Broker |> Option.defaultWith (fun () -> c.Broker) - member val Topic = p.TryGetResult Topic |> Option.defaultWith (fun () -> c.Topic) + member val Broker = p.GetResult(Broker, fun () -> c.Broker) + member val Topic = p.GetResult(Topic, fun () -> c.Topic) member val Producers = p.GetResult(Producers, 1) member x.BuildTargetParams() = x.Broker, x.Topic, x.Producers #endif @@ -563,8 +560,7 @@ let main argv = try let args = Args.parse EnvVar.tryGet argv try Log.Logger <- LoggerConfiguration().Configure(args.Verbose, args.VerboseStore, ?maybeSeqEndpoint = args.MaybeSeqEndpoint).CreateLogger() try run args |> Async.RunSynchronously; 0 - with e when not (e :? MissingArg) -> Log.Fatal(e, "Exiting"); 2 + with e when not (e :? System.Threading.Tasks.TaskCanceledException) -> Log.Fatal(e, "Exiting"); 2 finally Log.CloseAndFlush() - with MissingArg msg -> eprintfn "%s" msg; 1 - | :? Argu.ArguParseException as e -> eprintfn "%s" e.Message; 1 - | e -> eprintf "Exception %s" e.Message; 1 + with :? Argu.ArguParseException as e -> eprintfn $"%s{e.Message}"; 1 + | e -> eprintf $"Exception %s{e.Message}"; 1 diff --git a/propulsion-sync/Sync.fsproj b/propulsion-sync/Sync.fsproj index aba94352d..3c4fbfd9c 100644 --- a/propulsion-sync/Sync.fsproj +++ b/propulsion-sync/Sync.fsproj @@ -13,7 +13,7 @@ - + diff --git a/propulsion-tracking-consumer/Program.fs b/propulsion-tracking-consumer/Program.fs index 97caf4b54..333634fd4 100644 --- a/propulsion-tracking-consumer/Program.fs +++ b/propulsion-tracking-consumer/Program.fs @@ -3,12 +3,9 @@ open Serilog open System -exception MissingArg of message: string with override this.Message = this.message -let missingArg msg = raise (MissingArg msg) - type Configuration(tryGet) = - let get key = match tryGet key with Some value -> value | None -> missingArg $"Missing Argument/Environment Variable %s{key}" + let get key = match tryGet key with Some value -> value | None -> failwith $"Missing Argument/Environment Variable %s{key}" member _.CosmosConnection = get "EQUINOX_COSMOS_CONNECTION" member _.CosmosDatabase = get "EQUINOX_COSMOS_DATABASE" @@ -20,7 +17,7 @@ type Configuration(tryGet) = module Args = open Argu - type [] Parameters = + type [] Parameters = | [] Broker of string | [] Topic of string | [] Group of string @@ -29,11 +26,11 @@ module Args = | [] MaxWriters of int | [] Verbose - | [] Cosmos of ParseResults + | [] Cosmos of ParseResults interface IArgParserTemplate with member p.Usage = p |> function - | Verbose _ -> "request verbose logging." + | Verbose -> "request verbose logging." | Broker _ -> "specify Kafka Broker, in host:port format. (optional if environment variable PROPULSION_KAFKA_BROKER specified)" | Topic _ -> "specify Kafka Topic name. (optional if environment variable PROPULSION_KAFKA_TOPIC specified)" | Group _ -> "specify Kafka Consumer Group Id. (optional if environment variable PROPULSION_KAFKA_GROUP specified)" @@ -43,9 +40,9 @@ module Args = | Cosmos _ -> "specify CosmosDb input parameters" and Arguments(c: Configuration, p: ParseResults) = member val Verbose = p.Contains Verbose - member val Broker = p.TryGetResult Broker |> Option.defaultWith (fun () -> c.Broker) - member val Topic = p.TryGetResult Topic |> Option.defaultWith (fun () -> c.Topic) - member val Group = p.TryGetResult Group |> Option.defaultWith (fun () -> c.Group) + member val Broker = p.GetResult(Broker, fun () -> c.Broker) + member val Topic = p.GetResult(Topic, fun () -> c.Topic) + member val Group = p.GetResult(Group, fun () -> c.Group) member val MaxInFlightBytes = p.GetResult(MaxInflightMb, 10.) * 1024. * 1024. |> int64 member val LagFrequency = p.TryGetResult LagFreqM |> Option.map TimeSpan.FromMinutes member val MaxConcurrentStreams = p.GetResult(MaxWriters, 8) @@ -70,13 +67,13 @@ module Args = | Retries _ -> "specify operation retries (default: 1)." | RetriesWaitTime _ -> "specify max wait-time for retry when being throttled by Cosmos in seconds (default: 5)" and CosmosArguments(c: Configuration, p: ParseResults) = - let discovery = p.TryGetResult Connection |> Option.defaultWith (fun () -> c.CosmosConnection) |> Equinox.CosmosStore.Discovery.ConnectionString + let discovery = p.GetResult(Connection, fun () -> c.CosmosConnection) |> Equinox.CosmosStore.Discovery.ConnectionString let mode = p.TryGetResult ConnectionMode let timeout = p.GetResult(Timeout, 5.) |> TimeSpan.FromSeconds let retries = p.GetResult(Retries, 1) let maxRetryWaitTime = p.GetResult(RetriesWaitTime, 5.) |> TimeSpan.FromSeconds let connector = Equinox.CosmosStore.CosmosStoreConnector(discovery, timeout, retries, maxRetryWaitTime, ?mode = mode) - let database = p.TryGetResult Database |> Option.defaultWith (fun () -> c.CosmosDatabase) + let database = p.GetResult(Database, fun () -> c.CosmosDatabase) let container = p.GetResult Container member _.Connect() = connector.ConnectContext("Main", database, container) @@ -118,8 +115,7 @@ let main argv = try let args = Args.parse EnvVar.tryGet argv try Log.Logger <- LoggerConfiguration().Configure(verbose=args.Verbose).CreateLogger() try run args |> Async.RunSynchronously; 0 - with e when not (e :? MissingArg) -> Log.Fatal(e, "Exiting"); 2 + with e when not (e :? System.Threading.Tasks.TaskCanceledException) -> Log.Fatal(e, "Exiting"); 2 finally Log.CloseAndFlush() - with MissingArg msg -> eprintfn "%s" msg; 1 - | :? Argu.ArguParseException as e -> eprintfn "%s" e.Message; 1 - | e -> eprintf "Exception %s" e.Message; 1 + with :? Argu.ArguParseException as e -> eprintfn $"%s{e.Message}"; 1 + | e -> eprintf $"Exception %s{e.Message}"; 1 diff --git a/propulsion-tracking-consumer/TrackingConsumer.fsproj b/propulsion-tracking-consumer/TrackingConsumer.fsproj index b05e028e5..a81b68319 100644 --- a/propulsion-tracking-consumer/TrackingConsumer.fsproj +++ b/propulsion-tracking-consumer/TrackingConsumer.fsproj @@ -16,7 +16,7 @@ - +