Skip to content

Commit

Permalink
Kinesis checkpointer should retry on dynamodb provisioned throughput …
Browse files Browse the repository at this point in the history
…exception (#103)
  • Loading branch information
istreeter authored Jan 2, 2025
1 parent 86f0316 commit f2c9fac
Show file tree
Hide file tree
Showing 11 changed files with 117 additions and 53 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ package com.snowplowanalytics.snowplow.it.kinesis

import cats.effect.{IO, Ref}

import scala.concurrent.duration.FiniteDuration
import scala.jdk.CollectionConverters._

import software.amazon.awssdk.core.SdkBytes
Expand All @@ -19,13 +18,13 @@ import software.amazon.awssdk.services.kinesis.model.{GetRecordsRequest, GetShar

import com.snowplowanalytics.snowplow.sources.{EventProcessor, TokenedEvents}
import com.snowplowanalytics.snowplow.sources.kinesis.KinesisSourceConfig
import com.snowplowanalytics.snowplow.sinks.kinesis.{BackoffPolicy, KinesisSinkConfig}
import com.snowplowanalytics.snowplow.kinesis.BackoffPolicy
import com.snowplowanalytics.snowplow.sinks.kinesis.KinesisSinkConfig

import java.net.URI
import java.nio.charset.StandardCharsets
import java.util.UUID
import java.time.Instant
import java.util.concurrent.TimeUnit
import scala.concurrent.duration.DurationLong

import software.amazon.awssdk.services.kinesis.model.DescribeStreamRequest
Expand Down Expand Up @@ -96,12 +95,13 @@ object Utils {
Some(endpoint),
Some(endpoint),
10.seconds,
BigDecimal(1.0)
BigDecimal(1.0),
BackoffPolicy(100.millis, 1.second)
)

def getKinesisSinkConfig(endpoint: URI)(streamName: String): KinesisSinkConfig = KinesisSinkConfig(
streamName,
BackoffPolicy(FiniteDuration(1, TimeUnit.SECONDS), FiniteDuration(1, TimeUnit.SECONDS), None),
BackoffPolicy(1.second, 1.second),
1000,
1000000,
Some(endpoint)
Expand Down
4 changes: 4 additions & 0 deletions modules/kinesis/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,10 @@ snowplow.defaults: {
}
leaseDuration: "10 seconds"
maxLeasesToStealAtOneTimeFactor: 2.0
checkpointThrottledBackoffPolicy: {
minBackoff: "100 millis"
maxBackoff: "1 second"
}
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
/*
* Copyright (c) 2023-present Snowplow Analytics Ltd. All rights reserved.
*
* This program is licensed to you under the Snowplow Community License Version 1.0,
* and you may not use this file except in compliance with the Snowplow Community License Version 1.0.
* You may obtain a copy of the Snowplow Community License Version 1.0 at https://docs.snowplow.io/community-license-1.0
*/
package com.snowplowanalytics.snowplow.kinesis

import io.circe._
import io.circe.generic.semiauto._
import io.circe.config.syntax._
import scala.concurrent.duration.FiniteDuration

case class BackoffPolicy(
minBackoff: FiniteDuration,
maxBackoff: FiniteDuration
)

object BackoffPolicy {

implicit def backoffPolicyDecoder: Decoder[BackoffPolicy] =
deriveDecoder[BackoffPolicy]
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
/*
* Copyright (c) 2023-present Snowplow Analytics Ltd. All rights reserved.
*
* This program is licensed to you under the Snowplow Community License Version 1.0,
* and you may not use this file except in compliance with the Snowplow Community License Version 1.0.
* You may obtain a copy of the Snowplow Community License Version 1.0 at https://docs.snowplow.io/community-license-1.0
*/
package com.snowplowanalytics.snowplow.kinesis

import cats.Applicative
import retry.{RetryPolicies, RetryPolicy}

object Retries {

/**
* A retry policy appropriate for when we are throttled by a AWS rate limit.
*
* E.g. throttled by Kinesis when sinking records; or throttled by Dynamodb when checkpointing.
*/
def forThrottling[F[_]: Applicative](config: BackoffPolicy): RetryPolicy[F] =
RetryPolicies.capDelay[F](config.maxBackoff, RetryPolicies.fibonacciBackoff[F](config.minBackoff))
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,13 @@
package com.snowplowanalytics.snowplow.sinks.kinesis

import cats.implicits._
import cats.{Applicative, Parallel}
import cats.Parallel
import cats.effect.{Async, Resource, Sync}
import cats.effect.kernel.Ref

import org.typelevel.log4cats.{Logger, SelfAwareStructuredLogger}
import org.typelevel.log4cats.slf4j.Slf4jLogger
import retry.syntax.all._
import retry.{RetryPolicies, RetryPolicy}

import software.amazon.awssdk.core.SdkBytes
import software.amazon.awssdk.services.kinesis.KinesisClient
Expand All @@ -24,6 +23,8 @@ import software.amazon.awssdk.awscore.defaultsmode.DefaultsMode
import software.amazon.awssdk.regions.providers.DefaultAwsRegionProviderChain
import software.amazon.awssdk.services.kinesis.model.{PutRecordsRequest, PutRecordsRequestEntry, PutRecordsResponse}

import com.snowplowanalytics.snowplow.kinesis.{BackoffPolicy, Retries}

import java.net.URI
import java.util.UUID
import java.nio.charset.StandardCharsets.UTF_8
Expand Down Expand Up @@ -202,7 +203,7 @@ object KinesisSink {
streamName: String,
records: ListOfList[Sinkable]
): F[Unit] = {
val policyForThrottling = Retries.fibonacci[F](throttlingErrorsPolicy)
val policyForThrottling = Retries.forThrottling[F](throttlingErrorsPolicy)

// First, tryWriteToKinesis - the AWS SDK will handle retries. If there are still failures after that, it will:
// - return messages for retries if we only hit throttliing
Expand Down Expand Up @@ -233,17 +234,6 @@ object KinesisSink {

private final case class RequestLimits(recordLimit: Int, bytesLimit: Int)

private object Retries {

def fibonacci[F[_]: Applicative](config: BackoffPolicy): RetryPolicy[F] =
capBackoffAndRetries(config, RetryPolicies.fibonacciBackoff[F](config.minBackoff))

private def capBackoffAndRetries[F[_]: Applicative](config: BackoffPolicy, policy: RetryPolicy[F]): RetryPolicy[F] = {
val capped = RetryPolicies.capDelay[F](config.maxBackoff, policy)
config.maxRetries.fold(capped)(max => capped.join(RetryPolicies.limitRetries(max)))
}
}

private def getRecordSize(record: PutRecordsRequestEntry) =
record.data.asByteArrayUnsafe().length + record.partitionKey().getBytes(UTF_8).length

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,22 +9,10 @@ package com.snowplowanalytics.snowplow.sinks.kinesis

import io.circe._
import io.circe.generic.semiauto._
import io.circe.config.syntax._
import scala.concurrent.duration.FiniteDuration

import java.net.URI

case class BackoffPolicy(
minBackoff: FiniteDuration,
maxBackoff: FiniteDuration,
maxRetries: Option[Int]
)
import com.snowplowanalytics.snowplow.kinesis.BackoffPolicy

object BackoffPolicy {

implicit def backoffPolicyDecoder: Decoder[BackoffPolicy] =
deriveDecoder[BackoffPolicy]
}
import java.net.URI

case class KinesisSinkConfig(
streamName: String,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,21 @@ package com.snowplowanalytics.snowplow.sources.kinesis
import cats.effect.{Async, Sync}
import cats.implicits._
import cats.effect.implicits._
import com.snowplowanalytics.snowplow.sources.internal.Checkpointer
import org.typelevel.log4cats.Logger
import software.amazon.kinesis.exceptions.ShutdownException
import retry.syntax.all._
import software.amazon.kinesis.exceptions.{ShutdownException, ThrottlingException}
import software.amazon.kinesis.processor.RecordProcessorCheckpointer
import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber

import com.snowplowanalytics.snowplow.sources.internal.Checkpointer
import com.snowplowanalytics.snowplow.kinesis.{BackoffPolicy, Retries}

import java.util.concurrent.CountDownLatch

private class KinesisCheckpointer[F[_]: Async: Logger] extends Checkpointer[F, Map[String, Checkpointable]] {
private class KinesisCheckpointer[F[_]: Async: Logger](throttledBackoffPolicy: BackoffPolicy)
extends Checkpointer[F, Map[String, Checkpointable]] {

private val retryPolicy = Retries.forThrottling[F](throttledBackoffPolicy)

override val empty: Map[String, Checkpointable] = Map.empty

Expand Down Expand Up @@ -56,6 +62,18 @@ private class KinesisCheckpointer[F[_]: Async: Logger] extends Checkpointer[F, M
checkpointer.checkpoint(extendedSequenceNumber.sequenceNumber, extendedSequenceNumber.subSequenceNumber)
)
.recoverWith(ignoreShutdownExceptions(shardId))
.retryingOnSomeErrors(
policy = retryPolicy,
isWorthRetrying = {
case _: ThrottlingException => true.pure[F]
case _ => false.pure[F]
},
onError = { case (_, retryDetails) =>
Logger[F].warn(
s"Exceeded DynamoDB provisioned throughput. Checkpointing will be retried. (${retryDetails.retriesSoFar} retries so far)"
)
}
)

private def ignoreShutdownExceptions(shardId: String): PartialFunction[Throwable, F[Unit]] = { case _: ShutdownException =>
// The ShardRecordProcessor instance has been shutdown. This just means another KCL
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ object KinesisSource {
kinesisStream(config, liveness)

def checkpointer: KinesisCheckpointer[F] =
new KinesisCheckpointer[F]()
new KinesisCheckpointer[F](config.checkpointThrottledBackoffPolicy)

def lastLiveness: F[FiniteDuration] =
liveness.get
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ import java.net.URI
import java.time.Instant
import scala.concurrent.duration.FiniteDuration

import com.snowplowanalytics.snowplow.kinesis.BackoffPolicy

/**
* Config to be supplied from the app's hocon
*
Expand Down Expand Up @@ -46,7 +48,8 @@ case class KinesisSourceConfig(
dynamodbCustomEndpoint: Option[URI],
cloudwatchCustomEndpoint: Option[URI],
leaseDuration: FiniteDuration,
maxLeasesToStealAtOneTimeFactor: BigDecimal
maxLeasesToStealAtOneTimeFactor: BigDecimal,
checkpointThrottledBackoffPolicy: BackoffPolicy
)

object KinesisSourceConfig {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ import org.specs2.Specification

import scala.concurrent.duration.DurationLong

import com.snowplowanalytics.snowplow.kinesis.BackoffPolicy

class KinesisSinkConfigSpec extends Specification {
import KinesisSinkConfigSpec._

Expand All @@ -38,7 +40,7 @@ class KinesisSinkConfigSpec extends Specification {

val expected = KinesisSinkConfig(
streamName = "my-stream",
throttledBackoffPolicy = BackoffPolicy(minBackoff = 100.millis, maxBackoff = 1.second, maxRetries = None),
throttledBackoffPolicy = BackoffPolicy(minBackoff = 100.millis, maxBackoff = 1.second),
recordLimit = 500,
byteLimit = 5242880,
customEndpoint = None
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ import io.circe.generic.semiauto._
import org.specs2.Specification
import scala.concurrent.duration.DurationLong

import com.snowplowanalytics.snowplow.kinesis.BackoffPolicy

class KinesisSourceConfigSpec extends Specification {
import KinesisSourceConfigSpec._

Expand All @@ -40,7 +42,11 @@ class KinesisSourceConfigSpec extends Specification {
"type": "TrimHorizon"
},
"leaseDuration": "20 seconds",
"maxLeasesToStealAtOneTimeFactor": 0.42
"maxLeasesToStealAtOneTimeFactor": 0.42,
"checkpointThrottledBackoffPolicy": {
"minBackoff": "100 millis",
"maxBackoff": "1second"
}
}
"""

Expand All @@ -52,7 +58,8 @@ class KinesisSourceConfigSpec extends Specification {
c.initialPosition must beEqualTo(KinesisSourceConfig.InitialPosition.TrimHorizon),
c.retrievalMode must beEqualTo(KinesisSourceConfig.Retrieval.Polling(42)),
c.leaseDuration must beEqualTo(20.seconds),
c.maxLeasesToStealAtOneTimeFactor must beEqualTo(BigDecimal(0.42))
c.maxLeasesToStealAtOneTimeFactor must beEqualTo(BigDecimal(0.42)),
c.checkpointThrottledBackoffPolicy must beEqualTo(BackoffPolicy(minBackoff = 100.millis, maxBackoff = 1.second))
).reduce(_ and _)
}
}
Expand All @@ -71,7 +78,11 @@ class KinesisSourceConfigSpec extends Specification {
"type": "TRIM_HORIZON"
},
"leaseDuration": "20 seconds",
"maxLeasesToStealAtOneTimeFactor": 0.42
"maxLeasesToStealAtOneTimeFactor": 0.42,
"checkpointThrottledBackoffPolicy": {
"minBackoff": "100 millis",
"maxBackoff": "1second"
}
}
"""

Expand All @@ -83,7 +94,8 @@ class KinesisSourceConfigSpec extends Specification {
c.initialPosition must beEqualTo(KinesisSourceConfig.InitialPosition.TrimHorizon),
c.retrievalMode must beEqualTo(KinesisSourceConfig.Retrieval.Polling(42)),
c.leaseDuration must beEqualTo(20.seconds),
c.maxLeasesToStealAtOneTimeFactor must beEqualTo(BigDecimal(0.42))
c.maxLeasesToStealAtOneTimeFactor must beEqualTo(BigDecimal(0.42)),
c.checkpointThrottledBackoffPolicy must beEqualTo(BackoffPolicy(minBackoff = 100.millis, maxBackoff = 1.second))
).reduce(_ and _)
}
}
Expand All @@ -102,16 +114,17 @@ class KinesisSourceConfigSpec extends Specification {
val result = ConfigFactory.load(ConfigFactory.parseString(input))

val expected = KinesisSourceConfig(
appName = "my-app",
streamName = "my-stream",
workerIdentifier = System.getenv("HOSTNAME"),
initialPosition = KinesisSourceConfig.InitialPosition.Latest,
retrievalMode = KinesisSourceConfig.Retrieval.Polling(1000),
customEndpoint = None,
dynamodbCustomEndpoint = None,
cloudwatchCustomEndpoint = None,
leaseDuration = 10.seconds,
maxLeasesToStealAtOneTimeFactor = BigDecimal(2.0)
appName = "my-app",
streamName = "my-stream",
workerIdentifier = System.getenv("HOSTNAME"),
initialPosition = KinesisSourceConfig.InitialPosition.Latest,
retrievalMode = KinesisSourceConfig.Retrieval.Polling(1000),
customEndpoint = None,
dynamodbCustomEndpoint = None,
cloudwatchCustomEndpoint = None,
leaseDuration = 10.seconds,
maxLeasesToStealAtOneTimeFactor = BigDecimal(2.0),
checkpointThrottledBackoffPolicy = BackoffPolicy(minBackoff = 100.millis, maxBackoff = 1.second)
)

result.as[Wrapper] must beRight.like { case w: Wrapper =>
Expand Down

0 comments on commit f2c9fac

Please sign in to comment.