diff --git a/modules/kafka/src/main/scala/com/snowplowanalytics/snowplow/sinks/kafka/KafkaSink.scala b/modules/kafka/src/main/scala/com/snowplowanalytics/snowplow/sinks/kafka/KafkaSink.scala index 512689b..9ac2fdf 100644 --- a/modules/kafka/src/main/scala/com/snowplowanalytics/snowplow/sinks/kafka/KafkaSink.scala +++ b/modules/kafka/src/main/scala/com/snowplowanalytics/snowplow/sinks/kafka/KafkaSink.scala @@ -28,7 +28,6 @@ object KafkaSink { for { producer <- KafkaProducer[F].resource(producerSettings) } yield fromFs2Producer(config, producer) - } private def fromFs2Producer[F[_]: Monad](config: KafkaSinkConfig, producer: KafkaProducer[F, String, Array[Byte]]): Sink[F] = diff --git a/modules/kinesis-it/src/test/scala/com/snowplowanalytics/snowplow/sources/kinesis/KinesisSinkSpec.scala b/modules/kinesis-it/src/test/scala/com/snowplowanalytics/snowplow/sources/kinesis/KinesisSinkSpec.scala new file mode 100644 index 0000000..ede57c1 --- /dev/null +++ b/modules/kinesis-it/src/test/scala/com/snowplowanalytics/snowplow/sources/kinesis/KinesisSinkSpec.scala @@ -0,0 +1,65 @@ +/* + * Copyright (c) 2023-present Snowplow Analytics Ltd. All rights reserved. + * + * This program is licensed to you under the Snowplow Community License Version 1.0, + * and you may not use this file except in compliance with the Snowplow Community License Version 1.0. + * You may obtain a copy of the Snowplow Community License Version 1.0 at https://docs.snowplow.io/community-license-1.0 + */ +package com.snowplowanalytics.snowplow.sinks.kinesis + +import cats.effect.{IO, Resource} +import cats.effect.testing.specs2.CatsResource + +import scala.concurrent.duration.{DurationInt, FiniteDuration} + +import org.specs2.mutable.SpecificationLike + +import org.testcontainers.containers.localstack.LocalStackContainer + +import software.amazon.awssdk.regions.providers.DefaultAwsRegionProviderChain +import software.amazon.awssdk.regions.Region + +import com.snowplowanalytics.snowplow.it.kinesis._ +import com.snowplowanalytics.snowplow.sinks.{Sink, Sinkable} + +import Utils._ + +class KinesisSinkSpec extends CatsResource[IO, (Region, LocalStackContainer, Sink[IO])] with SpecificationLike { + import KinesisSinkSpec._ + + override val Timeout: FiniteDuration = 3.minutes + + /** Resources which are shared across tests */ + override val resource: Resource[IO, (Region, LocalStackContainer, Sink[IO])] = + for { + region <- Resource.eval(IO.blocking((new DefaultAwsRegionProviderChain).getRegion)) + localstack <- Localstack.resource(region, KINESIS_INITIALIZE_STREAMS, KinesisSinkSpec.getClass.getSimpleName) + testSink <- KinesisSink.resource[IO](getKinesisSinkConfig(localstack.getEndpoint)(testStream1Name)) + } yield (region, localstack, testSink) + + override def is = s2""" + KinesisSinkSpec should + write to output stream $e1 + """ + + def e1 = withResource { case (region, localstack, testSink) => + val testPayload = "test-payload" + val testInput = List(Sinkable(testPayload.getBytes(), Some("myPk"), Map(("", "")))) + + for { + kinesisClient <- getKinesisClient(localstack.getEndpoint, region) + _ <- testSink.sink(testInput) + _ <- IO.sleep(3.seconds) + result = getDataFromKinesis(kinesisClient, testStream1Name) + } yield List( + result.events must haveSize(1), + result.events must beEqualTo(List(testPayload)) + ) + } +} + +object KinesisSinkSpec { + val testStream1Name = "test-sink-stream-1" + val KINESIS_INITIALIZE_STREAMS: String = + List(s"$testStream1Name:1").mkString(",") +} diff --git a/modules/kinesis-it/src/test/scala/com/snowplowanalytics/snowplow/sources/kinesis/KinesisSourceSpec.scala b/modules/kinesis-it/src/test/scala/com/snowplowanalytics/snowplow/sources/kinesis/KinesisSourceSpec.scala index 47fdf55..2b09e3a 100644 --- a/modules/kinesis-it/src/test/scala/com/snowplowanalytics/snowplow/sources/kinesis/KinesisSourceSpec.scala +++ b/modules/kinesis-it/src/test/scala/com/snowplowanalytics/snowplow/sources/kinesis/KinesisSourceSpec.scala @@ -17,9 +17,11 @@ import org.specs2.mutable.SpecificationLike import org.testcontainers.containers.localstack.LocalStackContainer import software.amazon.awssdk.services.kinesis.KinesisAsyncClient +import software.amazon.awssdk.regions.providers.DefaultAwsRegionProviderChain import com.snowplowanalytics.snowplow.sources.EventProcessingConfig import com.snowplowanalytics.snowplow.sources.EventProcessingConfig.NoWindowing +import com.snowplowanalytics.snowplow.it.kinesis._ import java.time.Instant @@ -35,17 +37,17 @@ class KinesisSourceSpec /** Resources which are shared across tests */ override val resource: Resource[IO, (LocalStackContainer, KinesisAsyncClient, String => KinesisSourceConfig)] = for { - region <- Resource.eval(KinesisSourceConfig.getRuntimeRegion[IO]) - localstack <- Localstack.resource(region, KINESIS_INITIALIZE_STREAMS) + region <- Resource.eval(IO.blocking((new DefaultAwsRegionProviderChain).getRegion)) + localstack <- Localstack.resource(region, KINESIS_INITIALIZE_STREAMS, KinesisSourceSpec.getClass.getSimpleName) kinesisClient <- Resource.eval(getKinesisClient(localstack.getEndpoint, region)) - } yield (localstack, kinesisClient, getKinesisConfig(localstack.getEndpoint)(_)) + } yield (localstack, kinesisClient, getKinesisSourceConfig(localstack.getEndpoint)(_)) override def is = s2""" KinesisSourceSpec should read from input stream $e1 """ - def e1 = withResource { case (_, kinesisClient, getKinesisConfig) => + def e1 = withResource { case (_, kinesisClient, getKinesisSourceConfig) => val testPayload = "test-payload" for { @@ -54,7 +56,7 @@ class KinesisSourceSpec _ <- putDataToKinesis(kinesisClient, testStream1Name, testPayload) t2 <- IO.realTimeInstant processingConfig = new EventProcessingConfig(NoWindowing) - kinesisConfig = getKinesisConfig(testStream1Name) + kinesisConfig = getKinesisSourceConfig(testStream1Name) sourceAndAck <- KinesisSource.build[IO](kinesisConfig) stream = sourceAndAck.stream(processingConfig, testProcessor(refProcessed)) fiber <- stream.compile.drain.start @@ -72,7 +74,7 @@ class KinesisSourceSpec } object KinesisSourceSpec { - val testStream1Name = "test-stream-1" + val testStream1Name = "test-source-stream-1" val KINESIS_INITIALIZE_STREAMS: String = List(s"$testStream1Name:1").mkString(",") } diff --git a/modules/kinesis-it/src/test/scala/com/snowplowanalytics/snowplow/sources/kinesis/Localstack.scala b/modules/kinesis-it/src/test/scala/com/snowplowanalytics/snowplow/sources/kinesis/Localstack.scala index 54aa0e2..8eff350 100644 --- a/modules/kinesis-it/src/test/scala/com/snowplowanalytics/snowplow/sources/kinesis/Localstack.scala +++ b/modules/kinesis-it/src/test/scala/com/snowplowanalytics/snowplow/sources/kinesis/Localstack.scala @@ -5,7 +5,7 @@ * and you may not use this file except in compliance with the Snowplow Community License Version 1.0. * You may obtain a copy of the Snowplow Community License Version 1.0 at https://docs.snowplow.io/community-license-1.0 */ -package com.snowplowanalytics.snowplow.sources.kinesis +package com.snowplowanalytics.snowplow.it.kinesis import cats.effect.{IO, Resource} import org.slf4j.LoggerFactory @@ -17,19 +17,23 @@ import software.amazon.awssdk.regions.Region object Localstack { - def resource(region: Region, kinesisInitializeStreams: String): Resource[IO, LocalStackContainer] = + def resource( + region: Region, + kinesisInitializeStreams: String, + loggerName: String + ): Resource[IO, LocalStackContainer] = Resource.make { val localstack = new LocalStackContainer(DockerImageName.parse("localstack/localstack:2.2.0")) localstack.addEnv("AWS_DEFAULT_REGION", region.id) localstack.addEnv("KINESIS_INITIALIZE_STREAMS", kinesisInitializeStreams) localstack.addExposedPort(4566) localstack.setWaitStrategy(Wait.forLogMessage(".*Ready.*", 1)) - IO(startLocalstack(localstack)) + IO(startLocalstack(localstack, loggerName)) }(ls => IO.blocking(ls.stop())) - private def startLocalstack(localstack: LocalStackContainer): LocalStackContainer = { + private def startLocalstack(localstack: LocalStackContainer, loggerName: String): LocalStackContainer = { localstack.start() - val logger = LoggerFactory.getLogger(KinesisSourceSpec.getClass.getSimpleName) + val logger = LoggerFactory.getLogger(loggerName) val logs = new Slf4jLogConsumer(logger) localstack.followOutput(logs) localstack diff --git a/modules/kinesis-it/src/test/scala/com/snowplowanalytics/snowplow/sources/kinesis/Utils.scala b/modules/kinesis-it/src/test/scala/com/snowplowanalytics/snowplow/sources/kinesis/Utils.scala index 6d5dca6..35b12ec 100644 --- a/modules/kinesis-it/src/test/scala/com/snowplowanalytics/snowplow/sources/kinesis/Utils.scala +++ b/modules/kinesis-it/src/test/scala/com/snowplowanalytics/snowplow/sources/kinesis/Utils.scala @@ -5,23 +5,31 @@ * and you may not use this file except in compliance with the Snowplow Community License Version 1.0. * You may obtain a copy of the Snowplow Community License Version 1.0 at https://docs.snowplow.io/community-license-1.0 */ -package com.snowplowanalytics.snowplow.sources.kinesis +package com.snowplowanalytics.snowplow.it.kinesis import cats.effect.{IO, Ref} +import scala.concurrent.duration.FiniteDuration +import scala.jdk.CollectionConverters._ + import eu.timepit.refined.types.numeric.PosInt import software.amazon.awssdk.core.SdkBytes import software.amazon.awssdk.regions.Region import software.amazon.awssdk.services.kinesis.KinesisAsyncClient -import software.amazon.awssdk.services.kinesis.model.{PutRecordRequest, PutRecordResponse} +import software.amazon.awssdk.services.kinesis.model.{GetRecordsRequest, GetShardIteratorRequest, PutRecordRequest, PutRecordResponse} + +import com.snowplowanalytics.snowplow.sources.{EventProcessor, TokenedEvents} +import com.snowplowanalytics.snowplow.sources.kinesis.KinesisSourceConfig +import com.snowplowanalytics.snowplow.sinks.kinesis.{BackoffPolicy, KinesisSinkConfig} import java.net.URI import java.nio.charset.StandardCharsets import java.util.UUID import java.time.Instant +import java.util.concurrent.TimeUnit -import com.snowplowanalytics.snowplow.sources.{EventProcessor, TokenedEvents} +import software.amazon.awssdk.services.kinesis.model.DescribeStreamRequest object Utils { @@ -42,7 +50,44 @@ object Utils { IO.blocking(client.putRecord(record).get()) } - def getKinesisConfig(endpoint: URI)(streamName: String): KinesisSourceConfig = KinesisSourceConfig( + /** + * getDataFromKinesis gets the last 1000 records from kinesis, stringifies the datta it found, and + * returns a ReceivedEvents It can be called at the end of simple tests to return data from a + * Kinesis stream. + * + * If required in future, where more data is used we might amend it to poll the stream for data + * and return everything it finds after a period without any data. + */ + def getDataFromKinesis( + client: KinesisAsyncClient, + streamName: String + ): ReceivedEvents = { + + val descStreamResp = client.describeStream(DescribeStreamRequest.builder().streamName(streamName).build()).get + + // We're assuming only one shard here. + // Any future test with multiple shards requires us to create one iterator per shard + val shIterRequest = GetShardIteratorRequest + .builder() + .streamName(streamName) + .shardIteratorType("TRIM_HORIZON") + .shardId(descStreamResp.streamDescription.shards.get(0).shardId) + .build(); + + val shIter = client.getShardIterator(shIterRequest).get.shardIterator + + val request = GetRecordsRequest + .builder() + .streamARN(descStreamResp.streamDescription().streamARN()) + .shardIterator(shIter) + .build() + + val out = + ReceivedEvents(client.getRecords(request).get().records().asScala.toList.map(record => new String(record.data.asByteArray())), None) + out + } + + def getKinesisSourceConfig(endpoint: URI)(streamName: String): KinesisSourceConfig = KinesisSourceConfig( UUID.randomUUID().toString, streamName, KinesisSourceConfig.InitialPosition.TrimHorizon, @@ -53,6 +98,14 @@ object Utils { Some(endpoint) ) + def getKinesisSinkConfig(endpoint: URI)(streamName: String): KinesisSinkConfig = KinesisSinkConfig( + streamName, + BackoffPolicy(FiniteDuration(1, TimeUnit.SECONDS), FiniteDuration(1, TimeUnit.SECONDS), None), + 1000, + 1000000, + Some(endpoint) + ) + def testProcessor(ref: Ref[IO, List[ReceivedEvents]]): EventProcessor[IO] = _.evalMap { case TokenedEvents(events, token, tstamp) => val parsed = events.map(byteBuffer => StandardCharsets.UTF_8.decode(byteBuffer).toString) diff --git a/modules/kinesis/src/main/scala/com/snowplowanalytics/snowplow/sinks/kinesis/KinesisSink.scala b/modules/kinesis/src/main/scala/com/snowplowanalytics/snowplow/sinks/kinesis/KinesisSink.scala new file mode 100644 index 0000000..146ba90 --- /dev/null +++ b/modules/kinesis/src/main/scala/com/snowplowanalytics/snowplow/sinks/kinesis/KinesisSink.scala @@ -0,0 +1,275 @@ +/* + * Copyright (c) 2023-present Snowplow Analytics Ltd. All rights reserved. + * + * This program is licensed to you under the Snowplow Community License Version 1.0, + * and you may not use this file except in compliance with the Snowplow Community License Version 1.0. + * You may obtain a copy of the Snowplow Community License Version 1.0 at https://docs.snowplow.io/community-license-1.0 + */ +package com.snowplowanalytics.snowplow.sinks.kinesis + +import cats.implicits._ +import cats.{Applicative, Monoid, Parallel} +import cats.effect.{Async, Resource, Sync} +import cats.effect.kernel.Ref + +import org.typelevel.log4cats.{Logger, SelfAwareStructuredLogger} +import org.typelevel.log4cats.slf4j.Slf4jLogger +import retry.syntax.all._ +import retry.{RetryPolicies, RetryPolicy} + +import software.amazon.awssdk.core.SdkBytes +import software.amazon.awssdk.services.kinesis.KinesisClient +import software.amazon.awssdk.regions.Region +import software.amazon.awssdk.awscore.defaultsmode.DefaultsMode +import software.amazon.awssdk.regions.providers.DefaultAwsRegionProviderChain +import software.amazon.awssdk.services.kinesis.model.{PutRecordsRequest, PutRecordsRequestEntry, PutRecordsResponse} + +import java.net.URI +import java.util.UUID +import java.nio.charset.StandardCharsets.UTF_8 + +import scala.jdk.CollectionConverters._ + +import com.snowplowanalytics.snowplow.sinks.{Sink, Sinkable} + +object KinesisSink { + + def resource[F[_]: Parallel: Async](config: KinesisSinkConfig): Resource[F, Sink[F]] = + mkProducer[F](config).map { p => + Sink( + writeToKinesis[F]( + config.throttledBackoffPolicy, + RequestLimits(config.recordLimit, config.byteLimit), + p, + config.streamName, + _ + ) + ) + } + + private implicit def logger[F[_]: Sync]: SelfAwareStructuredLogger[F] = Slf4jLogger.getLogger[F] + + private def buildKinesisClient(customEndpoint: Option[URI], region: Region): KinesisClient = { + val builder = KinesisClient.builder().region(region).defaultsMode(DefaultsMode.AUTO) + customEndpoint.foreach(e => builder.endpointOverride(e)) + builder.build() + } + + private def mkProducer[F[_]: Sync](config: KinesisSinkConfig): Resource[F, KinesisClient] = { + val make = Sync[F].delay(buildKinesisClient(config.customEndpoint, (new DefaultAwsRegionProviderChain).getRegion())) + + Resource.make(make) { producer => + Sync[F].blocking { + producer.close() + } + } + } + + /** + * This function takes a list of records and splits it into several lists, where each list is as + * big as possible with respecting the record limit and the size limit. + */ + private[kinesis] def group[A]( + records: List[A], + recordLimit: Int, + sizeLimit: Int, + getRecordSize: A => Int + ): List[List[A]] = { + case class Batch( + size: Int, + count: Int, + records: List[A] + ) + + records + .foldLeft(List.empty[Batch]) { case (acc, record) => + val recordSize = getRecordSize(record) + acc match { + case head :: tail => + if (head.count + 1 > recordLimit || head.size + recordSize > sizeLimit) + List(Batch(recordSize, 1, List(record))) ++ List(head) ++ tail + else + List(Batch(head.size + recordSize, head.count + 1, record :: head.records)) ++ tail + case Nil => + List(Batch(recordSize, 1, List(record))) + } + } + .map(_.records) + } + + private def putRecords( + kinesis: KinesisClient, + streamName: String, + records: List[PutRecordsRequestEntry] + ): PutRecordsResponse = { + val putRecordsRequest = { + val prr = PutRecordsRequest.builder() + prr + .streamName(streamName) + .records(records.asJava) + prr.build() + } + kinesis.putRecords(putRecordsRequest) + } + + private def toKinesisRecords(records: List[Sinkable]): List[PutRecordsRequestEntry] = + records.map { r => + val data = SdkBytes.fromByteArrayUnsafe(r.bytes) + val prre = PutRecordsRequestEntry + .builder() + .partitionKey(r.partitionKey.getOrElse(UUID.randomUUID.toString())) + .data(data) + .build() + prre + } + + /** + * The result of trying to write a batch to kinesis + * @param nextBatchAttempt + * Records to re-package into another batch, either because of throttling or an internal error + * @param hadNonThrottleErrors + * Whether at least one of failures is not because of throttling + * @param exampleInternalError + * A message to help with logging + */ + private case class TryBatchResult( + nextBatchAttempt: Vector[PutRecordsRequestEntry], + hadNonThrottleErrors: Boolean, + exampleInternalError: Option[String] + ) + + private object TryBatchResult { + + implicit private def tryBatchResultMonoid: Monoid[TryBatchResult] = + new Monoid[TryBatchResult] { + override val empty: TryBatchResult = TryBatchResult(Vector.empty, false, None) + override def combine(x: TryBatchResult, y: TryBatchResult): TryBatchResult = + TryBatchResult( + x.nextBatchAttempt ++ y.nextBatchAttempt, + x.hadNonThrottleErrors || y.hadNonThrottleErrors, + x.exampleInternalError.orElse(y.exampleInternalError) + ) + } + + /** + * The build method creates a TryBatchResult, which: + * + * - Returns an empty list and false for hadNonThrottleErrors if everything was successful + * - Returns the list of failed requests and true for hadNonThrottleErrors if we encountered + * any errors that weren't throttles + * - Returns the list of failed requests and false for hadNonThrottleErrors if we encountered + * only throttling + */ + def build(records: List[PutRecordsRequestEntry], prr: PutRecordsResponse): TryBatchResult = + if (prr.failedRecordCount().toInt =!= 0) + records + .zip(prr.records().asScala) + .foldMap { case (orig, recordResult) => + Option(recordResult.errorCode()) match { + // If the record had no error, treat as success + case None => + TryBatchResult(Vector.empty, false, None) + // If it had a throughput exception, mark that and provide the original + case Some("ProvisionedThroughputExceededException") => + TryBatchResult(Vector(orig), false, None) + // If any other error, mark success and throttled false for this record, and provide the original + case Some(_) => + TryBatchResult(Vector(orig), true, Option(recordResult.errorMessage())) + } + } + else + TryBatchResult(Vector.empty, false, None) + } + + /** + * Try writing a batch, and returns a list of the failures to be retried: + * + * If we are not throttled by kinesis, then the list is empty. If we are throttled by kinesis, the + * list contains throttled records and records that gave internal errors. If there is an + * exception, or if all records give internal errors, then we retry using the policy. + */ + private def tryWriteToKinesis[F[_]: Sync]( + streamName: String, + kinesis: KinesisClient, + records: List[PutRecordsRequestEntry] + ): F[Vector[PutRecordsRequestEntry]] = + Logger[F].debug(s"Writing ${records.size} records to ${streamName}") *> + Sync[F] + .blocking(putRecords(kinesis, streamName, records)) + .map(TryBatchResult.build(records, _)) + .flatMap { result => + // If we encountered non-throttle errors, raise an exception. Otherwise, return all the requests that should + // be manually retried due to throttling + if (result.hadNonThrottleErrors) + Sync[F].raiseError(new RuntimeException(failureMessageForInternalErrors(records, streamName, result))) + else + result.nextBatchAttempt.pure[F] + } + + private def writeToKinesis[F[_]: Parallel: Async]( + throttlingErrorsPolicy: BackoffPolicy, + requestLimits: RequestLimits, + kinesis: KinesisClient, + streamName: String, + records: List[Sinkable] + ): F[Unit] = { + val policyForThrottling = Retries.fibonacci[F](throttlingErrorsPolicy) + + // First, tryWriteToKinesis - the AWS SDK will handle retries. If there are still failures after that, it will: + // - return messages for retries if we only hit throttliing + // - raise an error if we still have non-throttle failures after the SDK has carried out retries + def runAndCaptureFailures(ref: Ref[F, List[PutRecordsRequestEntry]]): F[List[PutRecordsRequestEntry]] = + for { + records <- ref.get + failures <- group(records, requestLimits.recordLimit, requestLimits.bytesLimit, getRecordSize) + .parTraverse(g => tryWriteToKinesis(streamName, kinesis, g)) + flattened = failures.flatten + _ <- ref.set(flattened) + } yield flattened + for { + ref <- Ref.of[F, List[PutRecordsRequestEntry]](toKinesisRecords(records)) + failures <- runAndCaptureFailures(ref) + .retryingOnFailures( + policy = policyForThrottling, + wasSuccessful = entries => Sync[F].pure(entries.isEmpty), + onFailure = { case (result, retryDetails) => + val msg = failureMessageForThrottling(result, streamName) + Logger[F].warn(s"$msg (${retryDetails.retriesSoFar} retries from cats-retry)") + } + ) + _ <- if (failures.isEmpty) Sync[F].unit + else Sync[F].raiseError(new RuntimeException(failureMessageForThrottling(failures, streamName))) + } yield () + } + + private final case class RequestLimits(recordLimit: Int, bytesLimit: Int) + + private object Retries { + + def fibonacci[F[_]: Applicative](config: BackoffPolicy): RetryPolicy[F] = + capBackoffAndRetries(config, RetryPolicies.fibonacciBackoff[F](config.minBackoff)) + + private def capBackoffAndRetries[F[_]: Applicative](config: BackoffPolicy, policy: RetryPolicy[F]): RetryPolicy[F] = { + val capped = RetryPolicies.capDelay[F](config.maxBackoff, policy) + config.maxRetries.fold(capped)(max => capped.join(RetryPolicies.limitRetries(max))) + } + } + + private def getRecordSize(record: PutRecordsRequestEntry) = + record.data.asByteArrayUnsafe().length + record.partitionKey().getBytes(UTF_8).length + + private def failureMessageForInternalErrors( + records: List[PutRecordsRequestEntry], + streamName: String, + result: TryBatchResult + ): String = { + val exampleMessage = result.exampleInternalError.getOrElse("none") + s"Writing ${records.size} records to $streamName errored with internal failures. Example error message [$exampleMessage]" + } + + private def failureMessageForThrottling( + records: List[PutRecordsRequestEntry], + streamName: String + ): String = + s"Exceeded Kinesis provisioned throughput: ${records.size} records failed writing to $streamName." +} diff --git a/modules/kinesis/src/main/scala/com/snowplowanalytics/snowplow/sinks/kinesis/KinesisSinkConfig.scala b/modules/kinesis/src/main/scala/com/snowplowanalytics/snowplow/sinks/kinesis/KinesisSinkConfig.scala new file mode 100644 index 0000000..20209c9 --- /dev/null +++ b/modules/kinesis/src/main/scala/com/snowplowanalytics/snowplow/sinks/kinesis/KinesisSinkConfig.scala @@ -0,0 +1,40 @@ +/* + * Copyright (c) 2023-present Snowplow Analytics Ltd. All rights reserved. + * + * This program is licensed to you under the Snowplow Community License Version 1.0, + * and you may not use this file except in compliance with the Snowplow Community License Version 1.0. + * You may obtain a copy of the Snowplow Community License Version 1.0 at https://docs.snowplow.io/community-license-1.0 + */ +package com.snowplowanalytics.snowplow.sinks.kinesis + +import io.circe._ +import io.circe.generic.semiauto._ +import io.circe.config.syntax._ +import scala.concurrent.duration.FiniteDuration + +import java.net.URI + +case class BackoffPolicy( + minBackoff: FiniteDuration, + maxBackoff: FiniteDuration, + maxRetries: Option[Int] +) + +object BackoffPolicy { + + implicit def backoffPolicyDecoder: Decoder[BackoffPolicy] = + deriveDecoder[BackoffPolicy] +} + +case class KinesisSinkConfig( + streamName: String, + throttledBackoffPolicy: BackoffPolicy, + recordLimit: Int, + byteLimit: Int, + customEndpoint: Option[URI] +) + +object KinesisSinkConfig { + implicit def decoder: Decoder[KinesisSinkConfig] = + deriveDecoder[KinesisSinkConfig] +} diff --git a/modules/kinesis/src/main/scala/com/snowplowanalytics/snowplow/sources/kinesis/KinesisSource.scala b/modules/kinesis/src/main/scala/com/snowplowanalytics/snowplow/sources/kinesis/KinesisSource.scala index 662ce25..b94b8ff 100644 --- a/modules/kinesis/src/main/scala/com/snowplowanalytics/snowplow/sources/kinesis/KinesisSource.scala +++ b/modules/kinesis/src/main/scala/com/snowplowanalytics/snowplow/sources/kinesis/KinesisSource.scala @@ -16,6 +16,7 @@ import org.typelevel.log4cats.{Logger, SelfAwareStructuredLogger} import org.typelevel.log4cats.slf4j.Slf4jLogger import software.amazon.awssdk.awscore.defaultsmode.DefaultsMode import software.amazon.awssdk.regions.Region +import software.amazon.awssdk.regions.providers.DefaultAwsRegionProviderChain import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient import software.amazon.awssdk.services.kinesis.KinesisAsyncClient @@ -114,7 +115,7 @@ object KinesisSource { private def kinesisStream[F[_]: Async](config: KinesisSourceConfig): Stream[F, LowLevelEvents[Map[String, KinesisMetadata[F]]]] = { val resources = for { - region <- Resource.eval(KinesisSourceConfig.getRuntimeRegion) + region <- Resource.eval(Sync[F].delay((new DefaultAwsRegionProviderChain).getRegion)) consumerSettings <- Resource.pure[F, KinesisConsumerSettings]( KinesisConsumerSettings( config.streamName, diff --git a/modules/kinesis/src/main/scala/com/snowplowanalytics/snowplow/sources/kinesis/KinesisSourceConfig.scala b/modules/kinesis/src/main/scala/com/snowplowanalytics/snowplow/sources/kinesis/KinesisSourceConfig.scala index 8369504..43d4c59 100644 --- a/modules/kinesis/src/main/scala/com/snowplowanalytics/snowplow/sources/kinesis/KinesisSourceConfig.scala +++ b/modules/kinesis/src/main/scala/com/snowplowanalytics/snowplow/sources/kinesis/KinesisSourceConfig.scala @@ -7,17 +7,12 @@ */ package com.snowplowanalytics.snowplow.sources.kinesis -import cats.effect.Sync - import eu.timepit.refined.types.all.PosInt import io.circe._ import io.circe.generic.extras.semiauto.deriveConfiguredDecoder import io.circe.generic.extras.Configuration -import software.amazon.awssdk.regions.Region -import software.amazon.awssdk.regions.providers.DefaultAwsRegionProviderChain - import java.net.URI import java.time.Instant @@ -36,9 +31,6 @@ object KinesisSourceConfig { private implicit val posIntDecoder: Decoder[PosInt] = Decoder.decodeInt.emap(PosInt.from) - private[kinesis] def getRuntimeRegion[F[_]: Sync]: F[Region] = - Sync[F].blocking((new DefaultAwsRegionProviderChain).getRegion) - sealed trait InitialPosition object InitialPosition { diff --git a/modules/kinesis/src/test/scala/com/snowplowanalytics/snowplow/sinks/kinesis/KinesisSinkConfigSpec.scala b/modules/kinesis/src/test/scala/com/snowplowanalytics/snowplow/sinks/kinesis/KinesisSinkConfigSpec.scala new file mode 100644 index 0000000..ac3b506 --- /dev/null +++ b/modules/kinesis/src/test/scala/com/snowplowanalytics/snowplow/sinks/kinesis/KinesisSinkConfigSpec.scala @@ -0,0 +1,69 @@ +/* + * Copyright (c) 2023-present Snowplow Analytics Ltd. All rights reserved. + * + * This program is licensed to you under the Snowplow Community License Version 1.0, + * and you may not use this file except in compliance with the Snowplow Community License Version 1.0. + * You may obtain a copy of the Snowplow Community License Version 1.0 at https://docs.snowplow.io/community-license-1.0 + */ +package com.snowplowanalytics.snowplow.sinks.kinesis + +import io.circe.literal._ +import org.specs2.Specification +import scala.concurrent.duration.FiniteDuration + +import java.net.URI + +class KinesisSinkConfigSpec extends Specification { + + def is = s2""" + The KinesisSource decoder should: + Decode a valid JSON config $e1 + Fail to decode an invalid JSON config $e2 + """ + + def e1 = { + val json = json""" + { + "streamName": "my-stream", + "throttledBackoffPolicy": { + "minBackoff": "100ms", + "maxBackoff": "500ms", + "maxRetries": 3 + }, + "recordLimit": 1000, + "byteLimit": 1000, + "customEndpoint": "http://localhost:4040" + } + """ + + json.as[KinesisSinkConfig] must beRight.like { case c: KinesisSinkConfig => + List( + c.streamName must beEqualTo("my-stream"), + c.throttledBackoffPolicy.minBackoff must beEqualTo(FiniteDuration(100, "ms")), + c.throttledBackoffPolicy.maxBackoff must beEqualTo(FiniteDuration(500, "ms")), + c.throttledBackoffPolicy.maxRetries must beEqualTo(Some(3)), + c.recordLimit must beEqualTo(1000), + c.byteLimit must beEqualTo(1000), + c.customEndpoint must beEqualTo(Some(URI.create("http://localhost:4040"))) + ).reduce(_ and _) + } + } + + def e2 = { + val json = json""" + { + "throttledBackoffPolicy": { + "minBackoff": "100ms", + "maxBackoff": "500ms", + "maxRetries": 3 + }, + "recordLimit": 1000, + "byteLimit": 1000, + "customEndpoint": "http://localhost:4040" + } + """ + + json.as[KinesisSinkConfig] must beLeft + + } +} diff --git a/project/BuildSettings.scala b/project/BuildSettings.scala index 5d522a8..c0e4df6 100644 --- a/project/BuildSettings.scala +++ b/project/BuildSettings.scala @@ -85,7 +85,7 @@ object BuildSettings { ThisBuild / mimaFailOnNoPrevious := false, mimaBinaryIssueFilters ++= Seq(), Test / test := { - mimaReportBinaryIssues.value + val _ = mimaReportBinaryIssues.value (Test / test).value } ) diff --git a/project/Dependencies.scala b/project/Dependencies.scala index ea30710..206ef19 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -13,6 +13,7 @@ object Dependencies { // Scala val cats = "2.9.0" val catsEffect = "3.5.2" + val catsRetry = "3.1.0" val fs2 = "3.9.3" val log4cats = "2.6.0" val http4s = "0.23.15" @@ -55,6 +56,7 @@ object Dependencies { val cats = "org.typelevel" %% "cats-core" % V.cats val fs2 = "co.fs2" %% "fs2-core" % V.fs2 val log4cats = "org.typelevel" %% "log4cats-slf4j" % V.log4cats + val catsRetry = "com.github.cb372" %% "cats-retry" % V.catsRetry val emberServer = "org.http4s" %% "http4s-ember-server" % V.http4s val decline = "com.monovore" %% "decline-effect" % V.decline val circeConfig = "io.circe" %% "circe-config" % V.circeConfig @@ -110,6 +112,7 @@ object Dependencies { catsEffectKernel, fs2, log4cats, + catsRetry, specs2, catsEffectSpecs2, catsEffectTestkit,