From 1a42903a64c13c90fe50b78fb099e256c81e8e51 Mon Sep 17 00:00:00 2001 From: Ruben Bartelink Date: Wed, 12 Jun 2024 19:18:34 +0100 Subject: [PATCH] Destroy skeleton --- tools/Equinox.Tool/Program.fs | 110 +++++++++++++++++++++++++++------- 1 file changed, 90 insertions(+), 20 deletions(-) diff --git a/tools/Equinox.Tool/Program.fs b/tools/Equinox.Tool/Program.fs index 91f1ba684..c263108f6 100644 --- a/tools/Equinox.Tool/Program.fs +++ b/tools/Equinox.Tool/Program.fs @@ -26,12 +26,14 @@ type Parameters = | [] Stats of ParseResults | [] Query of ParseResults | [] Top of ParseResults + | [] Destroy of ParseResults interface IArgParserTemplate with member a.Usage = a |> function | Verbose -> "Include low level logging regarding specific test runs." | VerboseConsole -> "Include low level test and store actions logging in on-screen output to console." | LocalSeq -> "Configures writing to a local Seq endpoint at http://localhost:5341, see https://getseq.net" | LogFile _ -> "specify a log file to write the result breakdown into (default: eqx.log)." + | Dump _ -> "Load and show events in a specified stream (supports all stores)." | LoadTest _ -> "Run a load test" | Init _ -> "Initialize Store/Container (supports `cosmos` stores; also handles RU/s provisioning adjustment)." | InitAws _ -> "Initialize DynamoDB Table (supports `dynamo` stores; also handles RU/s provisioning adjustment)." @@ -39,7 +41,7 @@ type Parameters = | Stats _ -> "inspect store to determine numbers of streams/documents/events and/or config (supports `cosmos` and `dynamo` stores)." | Query _ -> "Load/Summarise streams based on Cosmos SQL Queries (supports `cosmos` only)." | Top _ -> "Scan to determine top categories and streams (supports `cosmos` only)." - | Dump _ -> "Load and show events in a specified stream (supports all stores)." + | Destroy _ -> "DELETE documents for a nominated category and/or stream (includes a dry-run mode). (supports `cosmos` only)." and [] InitParameters = | [] Rus of int | [] Autoscale @@ -156,7 +158,7 @@ and QueryArguments(p: ParseResults) = match p.TryGetResult QueryParameters.StreamName, p.TryGetResult QueryParameters.CategoryName, p.TryGetResult QueryParameters.CategoryLike with | Some sn, None, None -> Criteria.SingleStream sn | Some _, Some _, _ - | Some _, _, Some _ -> p.Raise "StreamName and CategoryLike/CategoryName mutually exclusive" + | Some _, _, Some _ -> p.Raise "StreamName and CategoryLike/CategoryName are mutually exclusive" | None, Some cn, None -> Criteria.CatName cn | None, None, Some cl -> Criteria.CatLike cl | None, None, None -> Criteria.Unfiltered @@ -190,10 +192,10 @@ and [] TopParameters = and Order = Name | Items | Events | Unfolds | Size | EventSize | UnfoldSize | InflateSize | CorrCauseSize and TopArguments(p: ParseResults) = member val Criteria = - match p.TryGetResult StreamName, p.TryGetResult CategoryName, p.TryGetResult CategoryLike with + match p.TryGetResult TopParameters.StreamName, p.TryGetResult TopParameters.CategoryName, p.TryGetResult TopParameters.CategoryLike with | Some sn, None, None -> Criteria.SingleStream sn | Some _, Some _, _ - | Some _, _, Some _ -> p.Raise "StreamName and CategoryLike/CategoryName mutually exclusive" + | Some _, _, Some _ -> p.Raise "StreamName and CategoryLike/CategoryName are mutually exclusive" | None, Some cn, None -> Criteria.CatName cn | None, None, Some cl -> Criteria.CatLike cl | None, None, None -> Criteria.Unfiltered @@ -211,7 +213,39 @@ and TopArguments(p: ParseResults) = let qd = Microsoft.Azure.Cosmos.QueryDefinition sql let qo = Microsoft.Azure.Cosmos.QueryRequestOptions(MaxItemCount = x.CosmosArgs.QueryMaxItems) container.GetItemQueryIterator(qd, requestOptions = qo) - +and [] DestroyParameters = + | [] StreamName of string + | [] CategoryName of string + | [] CategoryLike of string + | [] Force + | [] Cosmos of ParseResults + interface IArgParserTemplate with + member a.Usage = a |> function + | StreamName _ -> "Specify stream name to match against `p`, e.g. `$UserServices-f7c1ce63389a45bdbea1cccebb1b3c8a`." + | CategoryName _ -> "Specify category name to match against `p`, e.g. `$UserServices`." + | CategoryLike _ -> "Specify category name to match against `p` as a Cosmos LIKE expression (with `%` as wildcard, e.g. `$UserServices-%`." + | Force -> "Actually delete the documents (default is a dry run, reporting what would be deleted)" + | Cosmos _ -> "Parameters for CosmosDB." +and DestroyArguments(p: ParseResults) = + member val Criteria = + match p.TryGetResult StreamName, p.TryGetResult CategoryName, p.TryGetResult CategoryLike with + | Some sn, None, None -> Criteria.SingleStream sn + | Some _, Some _, _ + | Some _, _, Some _ -> p.Raise "StreamName and CategoryLike/CategoryName are mutually exclusive" + | None, Some cn, None -> Criteria.CatName cn + | None, None, Some cl -> Criteria.CatLike cl + | None, None, None -> failwith "Category or stream name criteria must be supplied" + | None, Some _, Some _ -> p.Raise "CategoryLike and CategoryName are mutually exclusive" + member val CosmosArgs = p.GetResult DestroyParameters.Cosmos |> Store.Cosmos.Arguments + member val DryRun = p.Contains Force |> not + member x.Connect() = match Store.Cosmos.config Log.Logger (None, true) x.CosmosArgs with + | Store.Config.Cosmos (cc, _, _) -> cc.Container + | _ -> failwith "Destroy requires Cosmos" + member x.Execute(sql) = let container = x.Connect() + let qd = Microsoft.Azure.Cosmos.QueryDefinition sql + let qo = Microsoft.Azure.Cosmos.QueryRequestOptions(MaxItemCount = x.CosmosArgs.QueryMaxItems) + container.GetItemQueryIterator(qd, requestOptions = qo) +and SnEventsUnfolds = { sn: string; events: int; unfolds: int } and [] DumpParameters = | [] Stream of FsCodec.StreamName | [] Correlation @@ -348,6 +382,7 @@ module CosmosStats = open Equinox.CosmosStore.Linq.Internal open FSharp.Control + let run (log : ILogger, _verboseConsole, _maybeSeq) (p : ParseResults) = match p.GetSubCommand() with | StatsParameters.Cosmos sp -> @@ -389,6 +424,7 @@ let prettySerdes = lazy FsCodec.SystemTextJson.Serdes(FsCodec.SystemTextJson.Opt type System.Text.Json.JsonElement with member x.Timestamp = x.GetProperty("_ts").GetDouble() |> DateTime.UnixEpoch.AddSeconds member x.TryProp(name: string) = let mutable p = Unchecked.defaultof<_> in if x.TryGetProperty(name, &p) then ValueSome p else ValueNone + module StreamName = let categoryName = FsCodec.StreamName.parse >> FsCodec.StreamName.split >> fun struct (cn, _sid) -> cn @@ -464,6 +500,17 @@ module CosmosTop = open FSharp.Control open System.Text.Json + let _t = Unchecked.defaultof + let inline tryEquinoxStreamName (x: JsonElement) = + match x.TryProp(nameof _t.p) with + | ValueSome (je: JsonElement) when je.ValueKind = JsonValueKind.String -> + je.GetString() |> FsCodec.StreamName.parse |> FsCodec.StreamName.toString |> ValueSome + | _ -> ValueNone + let inline parseEquinoxStreamName (x: JsonElement) = + match tryEquinoxStreamName x with + | ValueNone -> failwith $"Could not parse document:\n{prettySerdes.Value.Serialize x}" + | ValueSome sn -> sn + module private Parser = let scratch = new System.IO.MemoryStream() let utf8Size (x: JsonElement) = @@ -483,15 +530,10 @@ module CosmosTop = ||> Seq.fold (fun struct (c, i) x -> struct (c + (x.TryProp(nameof _e.correlationId) |> stringLen) + (x.TryProp(nameof _e.causationId) |> stringLen), i + (x.TryProp(nameof _e.d) |> infSize) + (x.TryProp(nameof _e.m) |> infSize))) - let _t = Unchecked.defaultof - let inline tryEquinoxStreamName (x: JsonElement) = - match x.TryProp(nameof _t.p) with - | ValueSome (je: JsonElement) when je.ValueKind = JsonValueKind.String -> - je.GetString() |> FsCodec.StreamName.parse |> FsCodec.StreamName.toString |> ValueSome - | _ -> ValueNone let private tryParseEventOrUnfold = function | ValueNone -> struct (0, 0L, struct (0, 0L)) | ValueSome (x: JsonElement) -> x.GetArrayLength(), utf8Size x, dmcSize x + let _t = Unchecked.defaultof [] type Stat = { key: string; count: int; events: int; unfolds: int; bytes: int64; eBytes: int64; uBytes: int64; cBytes: int64; iBytes: int64 } @@ -517,15 +559,13 @@ module CosmosTop = let mutable pageI, pageE, pageU, pageB, pageCc, pageDm, newestTs, sw = 0, 0, 0, 0L, 0L, 0L, DateTime.MinValue, System.Diagnostics.Stopwatch.StartNew() for x in items do newestTs <- max newestTs x.Timestamp - match Parser.tryEquinoxStreamName x with - | ValueNone -> failwith $"Could not parse document:\n{prettySerdes.Value.Serialize x}" - | ValueSome sn -> - if pageStreams.Add sn && not a.StreamLevel then accStreams.Add sn |> ignore - let x = Parser.Stat.Create(group sn, x) - let mutable v = Unchecked.defaultof<_> - s.Add(if s.TryGetValue(x, &v) then s.Remove x |> ignore; v.Merge x else x) |> ignore - pageI <- pageI + 1; pageE <- pageE + x.events; pageU <- pageU + x.unfolds - pageB <- pageB + x.bytes; pageCc <- pageCc + x.cBytes; pageDm <- pageDm + x.iBytes + let sn = parseEquinoxStreamName x + if pageStreams.Add sn && not a.StreamLevel then accStreams.Add sn |> ignore + let x = Parser.Stat.Create(group sn, x) + let mutable v = Unchecked.defaultof<_> + s.Add(if s.TryGetValue(x, &v) then s.Remove x |> ignore; v.Merge x else x) |> ignore + pageI <- pageI + 1; pageE <- pageE + x.events; pageU <- pageU + x.unfolds + pageB <- pageB + x.bytes; pageCc <- pageCc + x.cBytes; pageDm <- pageDm + x.iBytes Log.Information("Page{rdc,5}>{count,4}i{streams,5}s{es,5}e{us,5}u{rds,5:f2}>{ods,4:f2}<{jds,4:f2}MiB{rc,7:f2}RU{s,5:N1}s D+M{im,4:f1} C+C{cm,5:f2} {ms,3}ms age {age:dddd\.hh\:mm\:ss}", rdc, pageI, pageStreams.Count, pageE, pageU, miB rds, miB ods, miB pageB, rc, rtt.TotalSeconds, miB pageDm, miB pageCc, sw.ElapsedMilliseconds, DateTime.UtcNow - newestTs) pageStreams.Clear() @@ -558,6 +598,35 @@ module CosmosTop = sort collapsed |> Seq.truncate a.Count |> Seq.iter render sort s |> Seq.truncate (if a.StreamLevel then a.StreamCount else a.Count) |> Seq.iter render } +module CosmosDestroy = + + open Equinox.CosmosStore.Linq.Internal + open FSharp.Control + + let run (a: DestroyArguments) = task { + let sw = System.Diagnostics.Stopwatch.StartNew() + let sql = $"SELECT c.p AS sn, ARRAYLENGTH(c.e) AS events, ARRAYLENGTH(c.u) AS unfolds FROM c WHERE {a.Criteria.Sql}" + if a.DryRun then Log.Warning("Dry-run of deleting all Items matching {sql}", sql) + else Log.Warning("DESTROYING all Items matching {sql}", sql) + + let pageStreams, accStreams = System.Collections.Generic.HashSet(), System.Collections.Generic.HashSet() + let mutable accI, accE, accU, accRus, accRds, accOds = 0L, 0L, 0L, 0., 0L, 0L + try for rtt, rc, items, rdc, rds, ods in a.Execute sql |> Query.enum__ do + let mutable pageI, pageE, pageU, sw = 0, 0, 0, System.Diagnostics.Stopwatch.StartNew() + for i in items do + if pageStreams.Add i.sn then accStreams.Add i.sn |> ignore + pageI <- pageI + 1; pageE <- pageE + i.events; pageU <- pageU + i.unfolds + Log.Information("Page{rdc,5}>{count,4}i{streams,5}s{es,5}e{us,5}u{rds,5:f2}>{ods,4:f2}{rc,7:f2}RU{s,5:N1}s {s,5:N1}s", + rdc, pageI, pageStreams.Count, pageE, pageU, miB rds, miB ods, rc, rtt.TotalSeconds, sw.Elapsed.TotalSeconds) + pageStreams.Clear() + accI <- accI + int64 pageI; accE <- accE + int64 pageE; accU <- accU + int64 pageU + accRus <- accRus + rc; accRds <- accRds + int64 rds; accOds <- accOds + int64 ods + finally + + let accCats = accStreams |> Seq.map StreamName.categoryName |> System.Collections.Generic.HashSet |> _.Count + Log.Information("TOTALS {count:N0}i {cats:N0}c {streams:N0}s {es:N0}e {us:N0}u read {rmib:f1}MiB output {omib:f1}MiB {ru:N2}RU {s:N1}s", + accI, accCats, accStreams.Count, accE, accU, miB accRds, miB accOds, accRus, sw.Elapsed.TotalSeconds) } + module DynamoInit = open Equinox.DynamoStore @@ -674,6 +743,7 @@ type Arguments(p: ParseResults) = | Dump a -> do! Dump.run (Log.Logger, verboseConsole, maybeSeq) a | Query a -> do! CosmosQuery.run (QueryArguments a) |> Async.AwaitTaskCorrect | Top a -> do! CosmosTop.run (TopArguments a) |> Async.AwaitTaskCorrect + | Destroy a -> do! CosmosDestroy.run (DestroyArguments a) |> Async.AwaitTaskCorrect | Stats a -> do! CosmosStats.run (Log.Logger, verboseConsole, maybeSeq) a | LoadTest a -> let n = p.GetResult(LogFile, fun () -> p.ProgramName + ".log") let reportFilename = System.IO.FileInfo(n).FullName