diff --git a/tools/Equinox.Tool/Program.fs b/tools/Equinox.Tool/Program.fs index c263108f6..38db999c5 100644 --- a/tools/Equinox.Tool/Program.fs +++ b/tools/Equinox.Tool/Program.fs @@ -241,11 +241,7 @@ and DestroyArguments(p: ParseResults) = member x.Connect() = match Store.Cosmos.config Log.Logger (None, true) x.CosmosArgs with | Store.Config.Cosmos (cc, _, _) -> cc.Container | _ -> failwith "Destroy requires Cosmos" - member x.Execute(sql) = let container = x.Connect() - let qd = Microsoft.Azure.Cosmos.QueryDefinition sql - let qo = Microsoft.Azure.Cosmos.QueryRequestOptions(MaxItemCount = x.CosmosArgs.QueryMaxItems) - container.GetItemQueryIterator(qd, requestOptions = qo) -and SnEventsUnfolds = { sn: string; events: int; unfolds: int } +and SnEventsUnfolds = { p: string; id: string; es: int; us: int } and [] DumpParameters = | [] Stream of FsCodec.StreamName | [] Correlation @@ -604,28 +600,44 @@ module CosmosDestroy = open FSharp.Control let run (a: DestroyArguments) = task { - let sw = System.Diagnostics.Stopwatch.StartNew() - let sql = $"SELECT c.p AS sn, ARRAYLENGTH(c.e) AS events, ARRAYLENGTH(c.u) AS unfolds FROM c WHERE {a.Criteria.Sql}" + let tsw = System.Diagnostics.Stopwatch.StartNew() + let sql = $"SELECT c.p, c.id, ARRAYLENGTH(c.e) AS es, ARRAYLENGTH(c.u) AS us FROM c WHERE {a.Criteria.Sql}" if a.DryRun then Log.Warning("Dry-run of deleting all Items matching {sql}", sql) else Log.Warning("DESTROYING all Items matching {sql}", sql) - + let container = a.Connect() + let query = + let qd = Microsoft.Azure.Cosmos.QueryDefinition sql + let qo = Microsoft.Azure.Cosmos.QueryRequestOptions(MaxItemCount = a.CosmosArgs.QueryMaxItems) + container.GetItemQueryIterator(qd, requestOptions = qo) let pageStreams, accStreams = System.Collections.Generic.HashSet(), System.Collections.Generic.HashSet() - let mutable accI, accE, accU, accRus, accRds, accOds = 0L, 0L, 0L, 0., 0L, 0L - try for rtt, rc, items, rdc, rds, ods in a.Execute sql |> Query.enum__ do - let mutable pageI, pageE, pageU, sw = 0, 0, 0, System.Diagnostics.Stopwatch.StartNew() + let mutable accI, accE, accU, accRus, accDelRu, accRds, accOds = 0L, 0L, 0L, 0., 0., 0L, 0L + try for rtt, rc, items, rdc, rds, ods in query |> Query.enum__ do + let mutable pageI, pageE, pageU, pdRu, idRu = 0, 0, 0, 0., 0. + let psw, isw = System.Diagnostics.Stopwatch.StartNew(), System.Diagnostics.Stopwatch.StartNew() for i in items do - if pageStreams.Add i.sn then accStreams.Add i.sn |> ignore - pageI <- pageI + 1; pageE <- pageE + i.events; pageU <- pageU + i.unfolds - Log.Information("Page{rdc,5}>{count,4}i{streams,5}s{es,5}e{us,5}u{rds,5:f2}>{ods,4:f2}{rc,7:f2}RU{s,5:N1}s {s,5:N1}s", - rdc, pageI, pageStreams.Count, pageE, pageU, miB rds, miB ods, rc, rtt.TotalSeconds, sw.Elapsed.TotalSeconds) + if pageStreams.Add i.p then accStreams.Add i.p |> ignore + pageI <- pageI + 1; pageE <- pageE + i.es; pageU <- pageU + i.us + if not a.DryRun then + let! res = container.DeleteItemStreamAsync(i.id, Microsoft.Azure.Cosmos.PartitionKey i.p) + let ru = res.Headers.RequestCharge in idRu <- idRu + ru; pdRu <- pdRu + ru + if not res.IsSuccessStatusCode then + failwith $"Deletion of {i.p}/{i.id} failed with Code: {res.StatusCode} Message: {res.ErrorMessage}\n{res.Diagnostics}" + if isw.Elapsed.TotalSeconds > 30 then + Log.Information(".. Deleted {count,5}i {streams,7}s{es,7}e{us,7}u {ru,6:N2}WRU/s {s,5:N1}s", + pageI, pageStreams.Count, pageE, pageU, idRu / isw.Elapsed.TotalSeconds, psw.Elapsed.TotalSeconds) + isw.Restart() + idRu <- 0 + let ps = psw.Elapsed.TotalSeconds + Log.Information("Page{rdc,6}>{count,5}i {streams,7}s{es,7}e{us,7}u{rds,8:f2}>{ods,4:f2} {rc,8:f2}RRU {s,5:N1}s {ru:N2}WRU/s {s,5:N1}s", + rdc, pageI, pageStreams.Count, pageE, pageU, miB rds, miB ods, rc, rtt.TotalSeconds, pdRu / ps, ps) pageStreams.Clear() accI <- accI + int64 pageI; accE <- accE + int64 pageE; accU <- accU + int64 pageU - accRus <- accRus + rc; accRds <- accRds + int64 rds; accOds <- accOds + int64 ods + accRus <- accRus + rc; accDelRu <- accDelRu + pdRu; accRds <- accRds + int64 rds; accOds <- accOds + int64 ods finally let accCats = accStreams |> Seq.map StreamName.categoryName |> System.Collections.Generic.HashSet |> _.Count - Log.Information("TOTALS {count:N0}i {cats:N0}c {streams:N0}s {es:N0}e {us:N0}u read {rmib:f1}MiB output {omib:f1}MiB {ru:N2}RU {s:N1}s", - accI, accCats, accStreams.Count, accE, accU, miB accRds, miB accOds, accRus, sw.Elapsed.TotalSeconds) } + Log.Information("TOTALS {count:N0}i {cats:N0}c {streams:N0}s {es:N0}e {us:N0}u read {rmib:f1}MiB output {omib:f1}MiB {rru:N2}RRU Avg {ru:N2}WRU/s Delete {ru:N2}WRU Total {s:N1}s", + accI, accCats, accStreams.Count, accE, accU, miB accRds, miB accOds, accRus, accDelRu / tsw.Elapsed.TotalSeconds, accDelRu, tsw.Elapsed.TotalSeconds) } module DynamoInit =