Skip to content

Commit

Permalink
Use CreateDynamoStoreClient
Browse files Browse the repository at this point in the history
  • Loading branch information
bartelink committed Sep 13, 2023
1 parent 106c7c0 commit c5e890c
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 14 deletions.
24 changes: 12 additions & 12 deletions src/Propulsion.DynamoStore.Indexer/Function.fs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ open Serilog
type Configuration(?tryGet) =
let envVarTryGet = System.Environment.GetEnvironmentVariable >> Option.ofObj
let tryGet = defaultArg tryGet envVarTryGet
let get key = match tryGet key with Some value -> value | None -> failwithf "Missing Argument/Environment Variable %s" key
let get key = match tryGet key with Some value -> value | None -> failwith $"Missing Argument/Environment Variable %s{key}"

member _.DynamoRegion = tryGet Propulsion.DynamoStore.Lambda.Args.Dynamo.REGION
member _.DynamoServiceUrl = get Propulsion.DynamoStore.Lambda.Args.Dynamo.SERVICE_URL
Expand All @@ -20,35 +20,35 @@ type Configuration(?tryGet) =
member _.DynamoIndexTable = get Propulsion.DynamoStore.Lambda.Args.Dynamo.INDEX_TABLE
member _.OnlyWarnGap = tryGet Propulsion.DynamoStore.Lambda.Args.Dynamo.ONLY_WARN_GAP |> Option.map bool.Parse

type Store(connector: DynamoStoreConnector, table, dynamoItemSizeCutoffBytes) =
let queryMaxItems = 100
type Connector(client: DynamoStoreClient, table) =

let storeClient = connector.CreateDynamoDbClient() |> DynamoStoreClient
member val Context = DynamoStoreContext(storeClient, table, maxBytes = dynamoItemSizeCutoffBytes, queryMaxItems = queryMaxItems)
member _.CreateContext(dynamoItemSizeCutoffBytes, ?queryMaxItems) =
DynamoStoreContext(client, table, maxBytes = dynamoItemSizeCutoffBytes, queryMaxItems = defaultArg queryMaxItems 100)

new(c: Configuration, requestTimeout, retries, dynamoItemSizeCutoffBytes) =
let conn =
new(c: Configuration, requestTimeout, retries) =
let connector =
match c.DynamoRegion with
| Some r -> DynamoStoreConnector(r, requestTimeout, retries)
| None -> DynamoStoreConnector(c.DynamoServiceUrl, c.DynamoAccessKey, c.DynamoSecretKey, requestTimeout, retries)
Store(conn, c.DynamoIndexTable, dynamoItemSizeCutoffBytes)
Connector(connector.CreateDynamoStoreClient(), c.DynamoIndexTable)

type Function() =

// Larger optimizes for not needing to use TransactWriteItems as frequently
// Smaller will trigger more items and reduce read costs for Sources reading fro the tail
let itemCutoffKiB = 48
let config = Configuration()
let store = Store(config, requestTimeout = System.TimeSpan.FromSeconds 120., retries = 10, dynamoItemSizeCutoffBytes = itemCutoffKiB * 1024)
let connector = Connector(config, requestTimeout = System.TimeSpan.FromSeconds 120., retries = 10)
// TOCONSIDER surface metrics from write activities to prometheus by wiring up Metrics Sink (for now we log them instead)
let removeMetrics (e : Serilog.Events.LogEvent) = e.RemovePropertyIfPresent(Equinox.DynamoStore.Core.Log.PropertyTag)
let removeMetrics (e: Serilog.Events.LogEvent) = e.RemovePropertyIfPresent(Equinox.DynamoStore.Core.Log.PropertyTag)
let template = "{Level:u1} {Message} {Properties}{NewLine}{Exception}"
let log =
LoggerConfiguration()
.Enrich.With({ new Serilog.Core.ILogEventEnricher with member _.Enrich(evt, _) = removeMetrics evt })
.WriteTo.Console(outputTemplate = template)
.CreateLogger()
let ingester = DynamoStoreIngester(log, store.Context, ?onlyWarnOnGap = config.OnlyWarnGap)
let context = connector.CreateContext(dynamoItemSizeCutoffBytes = itemCutoffKiB * 1024)
let ingester = DynamoStoreIngester(log, context, ?onlyWarnOnGap = config.OnlyWarnGap)

member _.Handle(dynamoEvent : DynamoDBEvent, _context : ILambdaContext) : System.Threading.Tasks.Task =
member _.Handle(dynamoEvent: DynamoDBEvent, _context: ILambdaContext): System.Threading.Tasks.Task =
Handler.handle log ingester.Service dynamoEvent
3 changes: 1 addition & 2 deletions tools/Propulsion.Tool/Args.fs
Original file line number Diff line number Diff line change
Expand Up @@ -118,8 +118,7 @@ module Dynamo =
x.Endpoint, (let t = x.Timeout in t.TotalSeconds), x.Retries)
member x.CreateStoreClient() =
x.LogConfiguration()
x.CreateDynamoDbClient()
|> Equinox.DynamoStore.DynamoStoreClient
x.CreateDynamoStoreClient()

type Equinox.DynamoStore.DynamoStoreClient with

Expand Down

0 comments on commit c5e890c

Please sign in to comment.