diff --git a/modules/it/src/test/scala/com/snowplowanalytics/snowplow/sources/kinesis/Utils.scala b/modules/it/src/test/scala/com/snowplowanalytics/snowplow/sources/kinesis/Utils.scala index 61fc3384..ae661384 100644 --- a/modules/it/src/test/scala/com/snowplowanalytics/snowplow/sources/kinesis/Utils.scala +++ b/modules/it/src/test/scala/com/snowplowanalytics/snowplow/sources/kinesis/Utils.scala @@ -9,7 +9,6 @@ package com.snowplowanalytics.snowplow.it.kinesis import cats.effect.{IO, Ref} -import scala.concurrent.duration.FiniteDuration import scala.jdk.CollectionConverters._ import software.amazon.awssdk.core.SdkBytes @@ -19,13 +18,13 @@ import software.amazon.awssdk.services.kinesis.model.{GetRecordsRequest, GetShar import com.snowplowanalytics.snowplow.sources.{EventProcessor, TokenedEvents} import com.snowplowanalytics.snowplow.sources.kinesis.KinesisSourceConfig -import com.snowplowanalytics.snowplow.sinks.kinesis.{BackoffPolicy, KinesisSinkConfig} +import com.snowplowanalytics.snowplow.kinesis.BackoffPolicy +import com.snowplowanalytics.snowplow.sinks.kinesis.KinesisSinkConfig import java.net.URI import java.nio.charset.StandardCharsets import java.util.UUID import java.time.Instant -import java.util.concurrent.TimeUnit import scala.concurrent.duration.DurationLong import software.amazon.awssdk.services.kinesis.model.DescribeStreamRequest @@ -96,12 +95,13 @@ object Utils { Some(endpoint), Some(endpoint), 10.seconds, - BigDecimal(1.0) + BigDecimal(1.0), + BackoffPolicy(100.millis, 1.second) ) def getKinesisSinkConfig(endpoint: URI)(streamName: String): KinesisSinkConfig = KinesisSinkConfig( streamName, - BackoffPolicy(FiniteDuration(1, TimeUnit.SECONDS), FiniteDuration(1, TimeUnit.SECONDS), None), + BackoffPolicy(1.second, 1.second), 1000, 1000000, Some(endpoint) diff --git a/modules/kinesis/src/main/resources/reference.conf b/modules/kinesis/src/main/resources/reference.conf index 032d7e5c..d7d878cf 100644 --- a/modules/kinesis/src/main/resources/reference.conf +++ b/modules/kinesis/src/main/resources/reference.conf @@ -11,6 +11,10 @@ snowplow.defaults: { } leaseDuration: "10 seconds" maxLeasesToStealAtOneTimeFactor: 2.0 + checkpointThrottledBackoffPolicy: { + minBackoff: "100 millis" + maxBackoff: "1 second" + } } } diff --git a/modules/kinesis/src/main/scala/com/snowplowanalytics/snowplow/kinesis/BackoffPolicy.scala b/modules/kinesis/src/main/scala/com/snowplowanalytics/snowplow/kinesis/BackoffPolicy.scala new file mode 100644 index 00000000..6efcfb8a --- /dev/null +++ b/modules/kinesis/src/main/scala/com/snowplowanalytics/snowplow/kinesis/BackoffPolicy.scala @@ -0,0 +1,24 @@ +/* + * Copyright (c) 2023-present Snowplow Analytics Ltd. All rights reserved. + * + * This program is licensed to you under the Snowplow Community License Version 1.0, + * and you may not use this file except in compliance with the Snowplow Community License Version 1.0. + * You may obtain a copy of the Snowplow Community License Version 1.0 at https://docs.snowplow.io/community-license-1.0 + */ +package com.snowplowanalytics.snowplow.kinesis + +import io.circe._ +import io.circe.generic.semiauto._ +import io.circe.config.syntax._ +import scala.concurrent.duration.FiniteDuration + +case class BackoffPolicy( + minBackoff: FiniteDuration, + maxBackoff: FiniteDuration +) + +object BackoffPolicy { + + implicit def backoffPolicyDecoder: Decoder[BackoffPolicy] = + deriveDecoder[BackoffPolicy] +} diff --git a/modules/kinesis/src/main/scala/com/snowplowanalytics/snowplow/kinesis/Retries.scala b/modules/kinesis/src/main/scala/com/snowplowanalytics/snowplow/kinesis/Retries.scala new file mode 100644 index 00000000..25a4008c --- /dev/null +++ b/modules/kinesis/src/main/scala/com/snowplowanalytics/snowplow/kinesis/Retries.scala @@ -0,0 +1,22 @@ +/* + * Copyright (c) 2023-present Snowplow Analytics Ltd. All rights reserved. + * + * This program is licensed to you under the Snowplow Community License Version 1.0, + * and you may not use this file except in compliance with the Snowplow Community License Version 1.0. + * You may obtain a copy of the Snowplow Community License Version 1.0 at https://docs.snowplow.io/community-license-1.0 + */ +package com.snowplowanalytics.snowplow.kinesis + +import cats.Applicative +import retry.{RetryPolicies, RetryPolicy} + +object Retries { + + /** + * A retry policy appropriate for when we are throttled by a AWS rate limit. + * + * E.g. throttled by Kinesis when sinking records; or throttled by Dynamodb when checkpointing. + */ + def forThrottling[F[_]: Applicative](config: BackoffPolicy): RetryPolicy[F] = + RetryPolicies.capDelay[F](config.maxBackoff, RetryPolicies.fibonacciBackoff[F](config.minBackoff)) +} diff --git a/modules/kinesis/src/main/scala/com/snowplowanalytics/snowplow/sinks/kinesis/KinesisSink.scala b/modules/kinesis/src/main/scala/com/snowplowanalytics/snowplow/sinks/kinesis/KinesisSink.scala index f8cc119a..16823c53 100644 --- a/modules/kinesis/src/main/scala/com/snowplowanalytics/snowplow/sinks/kinesis/KinesisSink.scala +++ b/modules/kinesis/src/main/scala/com/snowplowanalytics/snowplow/sinks/kinesis/KinesisSink.scala @@ -8,14 +8,13 @@ package com.snowplowanalytics.snowplow.sinks.kinesis import cats.implicits._ -import cats.{Applicative, Parallel} +import cats.Parallel import cats.effect.{Async, Resource, Sync} import cats.effect.kernel.Ref import org.typelevel.log4cats.{Logger, SelfAwareStructuredLogger} import org.typelevel.log4cats.slf4j.Slf4jLogger import retry.syntax.all._ -import retry.{RetryPolicies, RetryPolicy} import software.amazon.awssdk.core.SdkBytes import software.amazon.awssdk.services.kinesis.KinesisClient @@ -24,6 +23,8 @@ import software.amazon.awssdk.awscore.defaultsmode.DefaultsMode import software.amazon.awssdk.regions.providers.DefaultAwsRegionProviderChain import software.amazon.awssdk.services.kinesis.model.{PutRecordsRequest, PutRecordsRequestEntry, PutRecordsResponse} +import com.snowplowanalytics.snowplow.kinesis.{BackoffPolicy, Retries} + import java.net.URI import java.util.UUID import java.nio.charset.StandardCharsets.UTF_8 @@ -202,7 +203,7 @@ object KinesisSink { streamName: String, records: ListOfList[Sinkable] ): F[Unit] = { - val policyForThrottling = Retries.fibonacci[F](throttlingErrorsPolicy) + val policyForThrottling = Retries.forThrottling[F](throttlingErrorsPolicy) // First, tryWriteToKinesis - the AWS SDK will handle retries. If there are still failures after that, it will: // - return messages for retries if we only hit throttliing @@ -233,17 +234,6 @@ object KinesisSink { private final case class RequestLimits(recordLimit: Int, bytesLimit: Int) - private object Retries { - - def fibonacci[F[_]: Applicative](config: BackoffPolicy): RetryPolicy[F] = - capBackoffAndRetries(config, RetryPolicies.fibonacciBackoff[F](config.minBackoff)) - - private def capBackoffAndRetries[F[_]: Applicative](config: BackoffPolicy, policy: RetryPolicy[F]): RetryPolicy[F] = { - val capped = RetryPolicies.capDelay[F](config.maxBackoff, policy) - config.maxRetries.fold(capped)(max => capped.join(RetryPolicies.limitRetries(max))) - } - } - private def getRecordSize(record: PutRecordsRequestEntry) = record.data.asByteArrayUnsafe().length + record.partitionKey().getBytes(UTF_8).length diff --git a/modules/kinesis/src/main/scala/com/snowplowanalytics/snowplow/sinks/kinesis/KinesisSinkConfig.scala b/modules/kinesis/src/main/scala/com/snowplowanalytics/snowplow/sinks/kinesis/KinesisSinkConfig.scala index 20209c9b..8d8f671d 100644 --- a/modules/kinesis/src/main/scala/com/snowplowanalytics/snowplow/sinks/kinesis/KinesisSinkConfig.scala +++ b/modules/kinesis/src/main/scala/com/snowplowanalytics/snowplow/sinks/kinesis/KinesisSinkConfig.scala @@ -9,22 +9,10 @@ package com.snowplowanalytics.snowplow.sinks.kinesis import io.circe._ import io.circe.generic.semiauto._ -import io.circe.config.syntax._ -import scala.concurrent.duration.FiniteDuration -import java.net.URI - -case class BackoffPolicy( - minBackoff: FiniteDuration, - maxBackoff: FiniteDuration, - maxRetries: Option[Int] -) +import com.snowplowanalytics.snowplow.kinesis.BackoffPolicy -object BackoffPolicy { - - implicit def backoffPolicyDecoder: Decoder[BackoffPolicy] = - deriveDecoder[BackoffPolicy] -} +import java.net.URI case class KinesisSinkConfig( streamName: String, diff --git a/modules/kinesis/src/main/scala/com/snowplowanalytics/snowplow/sources/kinesis/KinesisCheckpointer.scala b/modules/kinesis/src/main/scala/com/snowplowanalytics/snowplow/sources/kinesis/KinesisCheckpointer.scala index 7190e1fb..53a055c4 100644 --- a/modules/kinesis/src/main/scala/com/snowplowanalytics/snowplow/sources/kinesis/KinesisCheckpointer.scala +++ b/modules/kinesis/src/main/scala/com/snowplowanalytics/snowplow/sources/kinesis/KinesisCheckpointer.scala @@ -10,15 +10,21 @@ package com.snowplowanalytics.snowplow.sources.kinesis import cats.effect.{Async, Sync} import cats.implicits._ import cats.effect.implicits._ -import com.snowplowanalytics.snowplow.sources.internal.Checkpointer import org.typelevel.log4cats.Logger -import software.amazon.kinesis.exceptions.ShutdownException +import retry.syntax.all._ +import software.amazon.kinesis.exceptions.{ShutdownException, ThrottlingException} import software.amazon.kinesis.processor.RecordProcessorCheckpointer import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber +import com.snowplowanalytics.snowplow.sources.internal.Checkpointer +import com.snowplowanalytics.snowplow.kinesis.{BackoffPolicy, Retries} + import java.util.concurrent.CountDownLatch -private class KinesisCheckpointer[F[_]: Async: Logger] extends Checkpointer[F, Map[String, Checkpointable]] { +private class KinesisCheckpointer[F[_]: Async: Logger](throttledBackoffPolicy: BackoffPolicy) + extends Checkpointer[F, Map[String, Checkpointable]] { + + private val retryPolicy = Retries.forThrottling[F](throttledBackoffPolicy) override val empty: Map[String, Checkpointable] = Map.empty @@ -56,6 +62,18 @@ private class KinesisCheckpointer[F[_]: Async: Logger] extends Checkpointer[F, M checkpointer.checkpoint(extendedSequenceNumber.sequenceNumber, extendedSequenceNumber.subSequenceNumber) ) .recoverWith(ignoreShutdownExceptions(shardId)) + .retryingOnSomeErrors( + policy = retryPolicy, + isWorthRetrying = { + case _: ThrottlingException => true.pure[F] + case _ => false.pure[F] + }, + onError = { case (_, retryDetails) => + Logger[F].warn( + s"Exceeded DynamoDB provisioned throughput. Checkpointing will be retried. (${retryDetails.retriesSoFar} retries so far)" + ) + } + ) private def ignoreShutdownExceptions(shardId: String): PartialFunction[Throwable, F[Unit]] = { case _: ShutdownException => // The ShardRecordProcessor instance has been shutdown. This just means another KCL diff --git a/modules/kinesis/src/main/scala/com/snowplowanalytics/snowplow/sources/kinesis/KinesisSource.scala b/modules/kinesis/src/main/scala/com/snowplowanalytics/snowplow/sources/kinesis/KinesisSource.scala index 4ef34873..f3dbf0c2 100644 --- a/modules/kinesis/src/main/scala/com/snowplowanalytics/snowplow/sources/kinesis/KinesisSource.scala +++ b/modules/kinesis/src/main/scala/com/snowplowanalytics/snowplow/sources/kinesis/KinesisSource.scala @@ -33,7 +33,7 @@ object KinesisSource { kinesisStream(config, liveness) def checkpointer: KinesisCheckpointer[F] = - new KinesisCheckpointer[F]() + new KinesisCheckpointer[F](config.checkpointThrottledBackoffPolicy) def lastLiveness: F[FiniteDuration] = liveness.get diff --git a/modules/kinesis/src/main/scala/com/snowplowanalytics/snowplow/sources/kinesis/KinesisSourceConfig.scala b/modules/kinesis/src/main/scala/com/snowplowanalytics/snowplow/sources/kinesis/KinesisSourceConfig.scala index 403f35fe..3d8c4c4f 100644 --- a/modules/kinesis/src/main/scala/com/snowplowanalytics/snowplow/sources/kinesis/KinesisSourceConfig.scala +++ b/modules/kinesis/src/main/scala/com/snowplowanalytics/snowplow/sources/kinesis/KinesisSourceConfig.scala @@ -16,6 +16,8 @@ import java.net.URI import java.time.Instant import scala.concurrent.duration.FiniteDuration +import com.snowplowanalytics.snowplow.kinesis.BackoffPolicy + /** * Config to be supplied from the app's hocon * @@ -46,7 +48,8 @@ case class KinesisSourceConfig( dynamodbCustomEndpoint: Option[URI], cloudwatchCustomEndpoint: Option[URI], leaseDuration: FiniteDuration, - maxLeasesToStealAtOneTimeFactor: BigDecimal + maxLeasesToStealAtOneTimeFactor: BigDecimal, + checkpointThrottledBackoffPolicy: BackoffPolicy ) object KinesisSourceConfig { diff --git a/modules/kinesis/src/test/scala/com/snowplowanalytics/snowplow/sinks/kinesis/KinesisSinkConfigSpec.scala b/modules/kinesis/src/test/scala/com/snowplowanalytics/snowplow/sinks/kinesis/KinesisSinkConfigSpec.scala index d5e18701..4c483bf5 100644 --- a/modules/kinesis/src/test/scala/com/snowplowanalytics/snowplow/sinks/kinesis/KinesisSinkConfigSpec.scala +++ b/modules/kinesis/src/test/scala/com/snowplowanalytics/snowplow/sinks/kinesis/KinesisSinkConfigSpec.scala @@ -16,6 +16,8 @@ import org.specs2.Specification import scala.concurrent.duration.DurationLong +import com.snowplowanalytics.snowplow.kinesis.BackoffPolicy + class KinesisSinkConfigSpec extends Specification { import KinesisSinkConfigSpec._ @@ -38,7 +40,7 @@ class KinesisSinkConfigSpec extends Specification { val expected = KinesisSinkConfig( streamName = "my-stream", - throttledBackoffPolicy = BackoffPolicy(minBackoff = 100.millis, maxBackoff = 1.second, maxRetries = None), + throttledBackoffPolicy = BackoffPolicy(minBackoff = 100.millis, maxBackoff = 1.second), recordLimit = 500, byteLimit = 5242880, customEndpoint = None diff --git a/modules/kinesis/src/test/scala/com/snowplowanalytics/snowplow/sources/kinesis/KinesisSourceConfigSpec.scala b/modules/kinesis/src/test/scala/com/snowplowanalytics/snowplow/sources/kinesis/KinesisSourceConfigSpec.scala index e6e4c91b..f613e444 100644 --- a/modules/kinesis/src/test/scala/com/snowplowanalytics/snowplow/sources/kinesis/KinesisSourceConfigSpec.scala +++ b/modules/kinesis/src/test/scala/com/snowplowanalytics/snowplow/sources/kinesis/KinesisSourceConfigSpec.scala @@ -15,6 +15,8 @@ import io.circe.generic.semiauto._ import org.specs2.Specification import scala.concurrent.duration.DurationLong +import com.snowplowanalytics.snowplow.kinesis.BackoffPolicy + class KinesisSourceConfigSpec extends Specification { import KinesisSourceConfigSpec._ @@ -40,7 +42,11 @@ class KinesisSourceConfigSpec extends Specification { "type": "TrimHorizon" }, "leaseDuration": "20 seconds", - "maxLeasesToStealAtOneTimeFactor": 0.42 + "maxLeasesToStealAtOneTimeFactor": 0.42, + "checkpointThrottledBackoffPolicy": { + "minBackoff": "100 millis", + "maxBackoff": "1second" + } } """ @@ -52,7 +58,8 @@ class KinesisSourceConfigSpec extends Specification { c.initialPosition must beEqualTo(KinesisSourceConfig.InitialPosition.TrimHorizon), c.retrievalMode must beEqualTo(KinesisSourceConfig.Retrieval.Polling(42)), c.leaseDuration must beEqualTo(20.seconds), - c.maxLeasesToStealAtOneTimeFactor must beEqualTo(BigDecimal(0.42)) + c.maxLeasesToStealAtOneTimeFactor must beEqualTo(BigDecimal(0.42)), + c.checkpointThrottledBackoffPolicy must beEqualTo(BackoffPolicy(minBackoff = 100.millis, maxBackoff = 1.second)) ).reduce(_ and _) } } @@ -71,7 +78,11 @@ class KinesisSourceConfigSpec extends Specification { "type": "TRIM_HORIZON" }, "leaseDuration": "20 seconds", - "maxLeasesToStealAtOneTimeFactor": 0.42 + "maxLeasesToStealAtOneTimeFactor": 0.42, + "checkpointThrottledBackoffPolicy": { + "minBackoff": "100 millis", + "maxBackoff": "1second" + } } """ @@ -83,7 +94,8 @@ class KinesisSourceConfigSpec extends Specification { c.initialPosition must beEqualTo(KinesisSourceConfig.InitialPosition.TrimHorizon), c.retrievalMode must beEqualTo(KinesisSourceConfig.Retrieval.Polling(42)), c.leaseDuration must beEqualTo(20.seconds), - c.maxLeasesToStealAtOneTimeFactor must beEqualTo(BigDecimal(0.42)) + c.maxLeasesToStealAtOneTimeFactor must beEqualTo(BigDecimal(0.42)), + c.checkpointThrottledBackoffPolicy must beEqualTo(BackoffPolicy(minBackoff = 100.millis, maxBackoff = 1.second)) ).reduce(_ and _) } } @@ -102,16 +114,17 @@ class KinesisSourceConfigSpec extends Specification { val result = ConfigFactory.load(ConfigFactory.parseString(input)) val expected = KinesisSourceConfig( - appName = "my-app", - streamName = "my-stream", - workerIdentifier = System.getenv("HOSTNAME"), - initialPosition = KinesisSourceConfig.InitialPosition.Latest, - retrievalMode = KinesisSourceConfig.Retrieval.Polling(1000), - customEndpoint = None, - dynamodbCustomEndpoint = None, - cloudwatchCustomEndpoint = None, - leaseDuration = 10.seconds, - maxLeasesToStealAtOneTimeFactor = BigDecimal(2.0) + appName = "my-app", + streamName = "my-stream", + workerIdentifier = System.getenv("HOSTNAME"), + initialPosition = KinesisSourceConfig.InitialPosition.Latest, + retrievalMode = KinesisSourceConfig.Retrieval.Polling(1000), + customEndpoint = None, + dynamodbCustomEndpoint = None, + cloudwatchCustomEndpoint = None, + leaseDuration = 10.seconds, + maxLeasesToStealAtOneTimeFactor = BigDecimal(2.0), + checkpointThrottledBackoffPolicy = BackoffPolicy(minBackoff = 100.millis, maxBackoff = 1.second) ) result.as[Wrapper] must beRight.like { case w: Wrapper =>