Skip to content

Commit

Permalink
Equinox.Tool: Add support for Cosmos Autoscaling (#302)
Browse files Browse the repository at this point in the history
Adds an `--autoscale`/`-A` switch to the `eqx` tool to enable support
for the Autoscaling feature of Cosmos Containers and Databases.
Specifying this flag causes the `--rus` parameter to be interpreted
as "Maximum RU/s" and changes the default value to 4000 RU/s, the
minimum value allowed by Cosmos.
  • Loading branch information
belcher-rok authored Nov 17, 2021
1 parent 7d31312 commit abb3366
Show file tree
Hide file tree
Showing 3 changed files with 50 additions and 39 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ The `Unreleased` section name is replaced by the expected version of next releas
### Added

- Stores: Expose `.Log.PropertyTag` Literals to enable log filtering [#298](https://github.com/jet/equinox/pull/298~~~~)
- `Equinox.Tool`: Add support for [autoscaling throughput](https://docs.microsoft.com/en-us/azure/cosmos-db/provision-throughput-autoscale) of Cosmos containers and databases [#302](https://github.com/jet/equinox/pull/302) :pray: [@belcher-rok](https://github.com/belcher-rok)

### Changed
### Removed
Expand Down
60 changes: 33 additions & 27 deletions src/Equinox.CosmosStore/CosmosStore.fs
Original file line number Diff line number Diff line change
Expand Up @@ -565,43 +565,49 @@ module internal Sync =

module Initialization =

type [<RequireQualifiedAccess>] Provisioning = Container of rus: int | Database of rus: int | Serverless
let adjustOfferC (c:Container) (rus : int) = async {
// Note: the Cosmos SDK does not (currently) support changing the Throughput mode of an existing Database or Container.
type [<RequireQualifiedAccess>] Throughput = Manual of rus: int | Autoscale of maxRus: int
type [<RequireQualifiedAccess>] Provisioning = Container of Throughput | Database of Throughput | Serverless
let (|ThroughputProperties|) = function
| Throughput.Manual rus -> ThroughputProperties.CreateManualThroughput(rus)
| Throughput.Autoscale maxRus -> ThroughputProperties.CreateAutoscaleThroughput(maxRus)
let (|MaybeThroughputProperties|) = Option.map (function ThroughputProperties tp -> tp) >> Option.toObj
let adjustOfferC (c:Container) (ThroughputProperties tp) = async {
let! ct = Async.CancellationToken
let! _ = c.ReplaceThroughputAsync(rus, cancellationToken = ct) |> Async.AwaitTaskCorrect in () }
let adjustOfferD (d:Database) (rus : int) = async {
let! _ = c.ReplaceThroughputAsync(tp, cancellationToken = ct) |> Async.AwaitTaskCorrect in () }
let adjustOfferD (d : Database) (ThroughputProperties tp) = async {
let! ct = Async.CancellationToken
let! _ = d.ReplaceThroughputAsync(rus, cancellationToken = ct) |> Async.AwaitTaskCorrect in () }
let private createDatabaseIfNotExists (client:CosmosClient) dName maybeRus = async {
let! _ = d.ReplaceThroughputAsync(tp, cancellationToken = ct) |> Async.AwaitTaskCorrect in () }
let private createDatabaseIfNotExists (client : CosmosClient) dName (MaybeThroughputProperties tpOrNull) = async {
let! ct = Async.CancellationToken
let! dbr = client.CreateDatabaseIfNotExistsAsync(id = dName, throughput = Option.toNullable maybeRus, cancellationToken = ct) |> Async.AwaitTaskCorrect
let! dbr = client.CreateDatabaseIfNotExistsAsync(dName, throughputProperties = tpOrNull, cancellationToken = ct) |> Async.AwaitTaskCorrect
return dbr.Database }
let private createOrProvisionDatabase (client:CosmosClient) dName mode = async {
let private createOrProvisionDatabase (client : CosmosClient) dName mode = async {
match mode with
| Provisioning.Database rus ->
let! db = createDatabaseIfNotExists client dName (Some rus)
do! adjustOfferD db rus
| Provisioning.Database throughput ->
let! db = createDatabaseIfNotExists client dName (Some throughput)
do! adjustOfferD db throughput
| Provisioning.Container _ | Provisioning.Serverless ->
let! _ = createDatabaseIfNotExists client dName None in () }
let private createContainerIfNotExists (d:Database) (cp:ContainerProperties) maybeRus = async {
let private createContainerIfNotExists (d : Database) (cp : ContainerProperties) (MaybeThroughputProperties tpOrNull) = async {
let! ct = Async.CancellationToken
let! c = d.CreateContainerIfNotExistsAsync(cp, throughput = Option.toNullable maybeRus, cancellationToken = ct) |> Async.AwaitTaskCorrect
let! c = d.CreateContainerIfNotExistsAsync(cp, throughputProperties = tpOrNull, cancellationToken = ct) |> Async.AwaitTaskCorrect
return c.Container }
let private createOrProvisionContainer (d:Database) (cp:ContainerProperties) mode = async {
let private createOrProvisionContainer (d : Database) (cp : ContainerProperties) mode = async {
match mode with
| Provisioning.Container rus ->
let! c = createContainerIfNotExists d cp (Some rus)
do! adjustOfferC c rus
| Provisioning.Container throughput ->
let! c = createContainerIfNotExists d cp (Some throughput)
do! adjustOfferC c throughput
return c
| Provisioning.Database _ | Provisioning.Serverless ->
return! createContainerIfNotExists d cp None }
let private createStoredProcIfNotExists (c:Container) (name, body): Async<float> = async {
let private createStoredProcIfNotExists (c : Container) (name, body): Async<float> = async {
try let! r = c.Scripts.CreateStoredProcedureAsync(Scripts.StoredProcedureProperties(id = name, body = body)) |> Async.AwaitTaskCorrect
return r.RequestCharge
with (:? Microsoft.Azure.Cosmos.CosmosException as ce) when ce.StatusCode = System.Net.HttpStatusCode.Conflict -> return ce.RequestCharge }
let private mkContainerProperties containerName partitionKeyFieldName =
ContainerProperties(id = containerName, partitionKeyPath = sprintf "/%s" partitionKeyFieldName)
let private createBatchAndTipContainerIfNotExists (client: CosmosClient) (dName,cName) mode : Async<Container> =
let private createBatchAndTipContainerIfNotExists (client : CosmosClient) (dName, cName) mode : Async<Container> =
let def = mkContainerProperties cName Batch.PartitionKeyField
def.IndexingPolicy.IndexingMode <- IndexingMode.Consistent
def.IndexingPolicy.Automatic <- true
Expand All @@ -611,27 +617,27 @@ module Initialization =
// NB its critical to index the nominated PartitionKey field defined above or there will be runtime errors
for k in Batch.IndexedFields do def.IndexingPolicy.IncludedPaths.Add(IncludedPath(Path = sprintf "/%s/?" k))
createOrProvisionContainer (client.GetDatabase dName) def mode
let createSyncStoredProcIfNotExists (log: ILogger option) container = async {
let! t, ru = createStoredProcIfNotExists container (SyncStoredProc.name,SyncStoredProc.body) |> Stopwatch.Time
let createSyncStoredProcIfNotExists (log : ILogger option) container = async {
let! t, ru = createStoredProcIfNotExists container (SyncStoredProc.name, SyncStoredProc.body) |> Stopwatch.Time
match log with
| None -> ()
| Some log -> log.Information("Created stored procedure {procName} in {ms}ms {ru}RU", SyncStoredProc.name, (let e = t.Elapsed in e.TotalMilliseconds), ru) }
let private createAuxContainerIfNotExists (client: CosmosClient) (dName,cName) mode : Async<Container> =
let private createAuxContainerIfNotExists (client: CosmosClient) (dName, cName) mode : Async<Container> =
let def = mkContainerProperties cName "id" // as per Cosmos team, Partition Key must be "/id"
// TL;DR no indexing of any kind; see https://github.com/Azure/azure-documentdb-changefeedprocessor-dotnet/issues/142
def.IndexingPolicy.Automatic <- false
def.IndexingPolicy.IndexingMode <- IndexingMode.None
createOrProvisionContainer (client.GetDatabase dName) def mode
let init log (client: CosmosClient) (dName,cName) mode skipStoredProc = async {
let init log (client : CosmosClient) (dName, cName) mode skipStoredProc = async {
do! createOrProvisionDatabase client dName mode
let! container = createBatchAndTipContainerIfNotExists client (dName,cName) mode
let! container = createBatchAndTipContainerIfNotExists client (dName, cName) mode
if not skipStoredProc then
do! createSyncStoredProcIfNotExists (Some log) container }
let initAux (client: CosmosClient) (dName,cName) rus = async {
let initAux (client : CosmosClient) (dName, cName) rus = async {
// Hardwired for now (not sure if CFP can store in a Database-allocated as it would need to be supplying partition keys)
let mode = Provisioning.Container rus
let mode = Provisioning.Container (Throughput.Manual rus)
do! createOrProvisionDatabase client dName mode
return! createAuxContainerIfNotExists client (dName,cName) mode }
return! createAuxContainerIfNotExists client (dName, cName) mode }

/// Holds Container state, coordinating initialization activities
type internal ContainerInitializerGuard(container : Container, fallback : Container option, ?initContainer : Container -> Async<unit>) =
Expand Down
28 changes: 16 additions & 12 deletions tools/Equinox.Tool/Program.fs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ open System
open System.Net.Http
open System.Threading

type Provisioning = Equinox.CosmosStore.Core.Initialization.Provisioning
module CosmosInit = Equinox.CosmosStore.Core.Initialization

let [<Literal>] appName = "equinox-tool"

Expand Down Expand Up @@ -41,23 +41,29 @@ type Arguments =
| Dump _ -> "Load and show events in a specified stream (supports all stores)."
and [<NoComparison; NoEquality>]InitArguments =
| [<AltCommandLine "-ru">] Rus of int
| [<AltCommandLine "-A">] Autoscale
| [<AltCommandLine "-m">] Mode of CosmosModeType
| [<AltCommandLine "-P">] SkipStoredProc
| [<CliPrefix(CliPrefix.None)>] Cosmos of ParseResults<Storage.Cosmos.Arguments>
interface IArgParserTemplate with
member a.Usage = a |> function
| Rus _ -> "Specify RU/s level to provision for the Container (Default: 400 RU/s)."
| Rus _ -> "Specify RU/s level to provision for the Container (Default: 400 RU/s or 4000 RU/s if autoscaling)."
| Autoscale -> "Autoscale provisioned throughput. Use --rus to specify the maximum RU/s."
| Mode _ -> "Configure RU mode to use Container-level RU, Database-level RU, or Serverless allocations (Default: Use Container-level allocation)."
| SkipStoredProc -> "Inhibit creation of stored procedure in specified Container."
| Cosmos _ -> "Cosmos Connection parameters."
and CosmosInitInfo(args : ParseResults<InitArguments>) =
member __.ProvisioningMode =
let throughput () =
if args.Contains Autoscale
then CosmosInit.Throughput.Autoscale (args.GetResult(Rus, 4000))
else CosmosInit.Throughput.Manual (args.GetResult(Rus, 400))
match args.GetResult(Mode, CosmosModeType.Container) with
| CosmosModeType.Container -> Provisioning.Container (args.GetResult(Rus, 400))
| CosmosModeType.Db -> Provisioning.Database (args.GetResult(Rus, 400))
| CosmosModeType.Container -> CosmosInit.Provisioning.Container (throughput ())
| CosmosModeType.Db -> CosmosInit.Provisioning.Database (throughput ())
| CosmosModeType.Serverless ->
if args.Contains Rus then raise (Storage.MissingArg "Cannot specify RU/s in Serverless mode")
Provisioning.Serverless
if args.Contains Rus || args.Contains Autoscale then raise (Storage.MissingArg "Cannot specify RU/s or Autoscale in Serverless mode")
CosmosInit.Provisioning.Serverless
and [<NoComparison; NoEquality>]ConfigArguments =
| [<CliPrefix(CliPrefix.None); Last; AltCommandLine "ms">] MsSql of ParseResults<Storage.Sql.Ms.Arguments>
| [<CliPrefix(CliPrefix.None); Last; AltCommandLine "my">] MySql of ParseResults<Storage.Sql.My.Arguments>
Expand Down Expand Up @@ -312,8 +318,6 @@ let createDomainLog verbose verboseConsole maybeSeqEndpoint =

module CosmosInit =

open Equinox.CosmosStore.Core.Initialization

let connect log (sargs : ParseResults<Storage.Cosmos.Arguments>) =
Storage.Cosmos.connect log (Storage.Cosmos.Info sargs) |> fst

Expand All @@ -324,16 +328,16 @@ module CosmosInit =
let client,dName,cName = connect log sargs
let mode = (CosmosInitInfo iargs).ProvisioningMode
match mode with
| Provisioning.Container ru ->
| CosmosInit.Provisioning.Container ru ->
let modeStr = "Container"
log.Information("Provisioning `Equinox.CosmosStore` Store at {mode:l} level for {rus:n0} RU/s", modeStr, ru)
| Provisioning.Database ru ->
| CosmosInit.Provisioning.Database ru ->
let modeStr = "Database"
log.Information("Provisioning `Equinox.CosmosStore` Store at {mode:l} level for {rus:n0} RU/s", modeStr, ru)
| Provisioning.Serverless ->
| CosmosInit.Provisioning.Serverless ->
let modeStr = "Serverless"
log.Information("Provisioning `Equinox.CosmosStore` Store in {mode:l} mode with automatic RU/s as configured in account", modeStr)
return! init log client (dName,cName) mode skipStoredProc
return! CosmosInit.init log client (dName,cName) mode skipStoredProc
| _ -> failwith "please specify a `cosmos` endpoint" }

module SqlInit =
Expand Down

0 comments on commit abb3366

Please sign in to comment.