From ae0e57e489606e0c32cc3fc53e88f74e0e21c6c2 Mon Sep 17 00:00:00 2001 From: Ian Streeter Date: Fri, 13 Sep 2024 09:48:34 +0100 Subject: [PATCH] Re-implement Kinesis source without fs2-kinesis (#84) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The common-streams Kinesis source suffers from a problem where we don't quite achieve at-least-once processing semantics near the end of a shard. The problem was in the 3rd-party fs2-kinesis library, and it is not easy to fix with any small code change to that library. Sorry I cannot provide a link here to back that up -- it is documented internally at Snowplow. This PR re-implements our Kinesis source from scratch, this time without a dependency on fs2-kinesis. The biggest difference is the way we block the `shardEnded` method of the KCL record processor until all records from the shard have been written to the destination. A simplified sketch of this handshake is appended after the diff. --------- Co-authored-by: Piotr Poniedziałek --- .../snowplow/sources/kinesis/Utils.scala | 3 - .../kinesis/src/main/resources/reference.conf | 1 - .../sources/kinesis/Checkpointable.scala | 36 +++ .../snowplow/sources/kinesis/KCLAction.scala | 26 ++ .../sources/kinesis/KCLScheduler.scala | 150 +++++++++ .../sources/kinesis/KinesisCheckpointer.scala | 66 ++++ .../sources/kinesis/KinesisSource.scala | 290 +++++------------- .../sources/kinesis/KinesisSourceConfig.scala | 5 - .../kinesis/ShardRecordProcessor.scala | 65 ++++ .../kinesis/KinesisSourceConfigSpec.scala | 6 - .../sources/internal/LowLevelSource.scala | 13 +- project/Dependencies.scala | 20 +- 12 files changed, 428 insertions(+), 253 deletions(-) create mode 100644 modules/kinesis/src/main/scala/com/snowplowanalytics/snowplow/sources/kinesis/Checkpointable.scala create mode 100644 modules/kinesis/src/main/scala/com/snowplowanalytics/snowplow/sources/kinesis/KCLAction.scala create mode 100644 modules/kinesis/src/main/scala/com/snowplowanalytics/snowplow/sources/kinesis/KCLScheduler.scala create mode 100644 modules/kinesis/src/main/scala/com/snowplowanalytics/snowplow/sources/kinesis/KinesisCheckpointer.scala create mode 100644 modules/kinesis/src/main/scala/com/snowplowanalytics/snowplow/sources/kinesis/ShardRecordProcessor.scala diff --git a/modules/it/src/test/scala/com/snowplowanalytics/snowplow/sources/kinesis/Utils.scala b/modules/it/src/test/scala/com/snowplowanalytics/snowplow/sources/kinesis/Utils.scala index f2ed3d0a..fdcd7ae9 100644 --- a/modules/it/src/test/scala/com/snowplowanalytics/snowplow/sources/kinesis/Utils.scala +++ b/modules/it/src/test/scala/com/snowplowanalytics/snowplow/sources/kinesis/Utils.scala @@ -12,8 +12,6 @@ import cats.effect.{IO, Ref} import scala.concurrent.duration.FiniteDuration import scala.jdk.CollectionConverters._ -import eu.timepit.refined.types.numeric.PosInt - import software.amazon.awssdk.core.SdkBytes import software.amazon.awssdk.regions.Region import software.amazon.awssdk.services.kinesis.KinesisAsyncClient @@ -94,7 +92,6 @@ object Utils { UUID.randomUUID.toString, KinesisSourceConfig.InitialPosition.TrimHorizon, KinesisSourceConfig.Retrieval.Polling(1), - PosInt.unsafeFrom(1), Some(endpoint), Some(endpoint), Some(endpoint), diff --git a/modules/kinesis/src/main/resources/reference.conf b/modules/kinesis/src/main/resources/reference.conf index adabb89a..ab3b6593 100644 --- a/modules/kinesis/src/main/resources/reference.conf +++ b/modules/kinesis/src/main/resources/reference.conf @@ -9,7 +9,6 @@ snowplow.defaults: { type: "Polling" maxRecords: 1000 } - bufferSize: 1 leaseDuration: 
"10 seconds" } } diff --git a/modules/kinesis/src/main/scala/com/snowplowanalytics/snowplow/sources/kinesis/Checkpointable.scala b/modules/kinesis/src/main/scala/com/snowplowanalytics/snowplow/sources/kinesis/Checkpointable.scala new file mode 100644 index 00000000..36091051 --- /dev/null +++ b/modules/kinesis/src/main/scala/com/snowplowanalytics/snowplow/sources/kinesis/Checkpointable.scala @@ -0,0 +1,36 @@ +/* + * Copyright (c) 2023-present Snowplow Analytics Ltd. All rights reserved. + * + * This program is licensed to you under the Snowplow Community License Version 1.0, + * and you may not use this file except in compliance with the Snowplow Community License Version 1.0. + * You may obtain a copy of the Snowplow Community License Version 1.0 at https://docs.snowplow.io/community-license-1.0 + */ +package com.snowplowanalytics.snowplow.sources.kinesis + +import cats.implicits._ +import cats.{Order, Semigroup} +import software.amazon.kinesis.processor.RecordProcessorCheckpointer +import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber + +import java.util.concurrent.CountDownLatch + +private sealed trait Checkpointable { + def extendedSequenceNumber: ExtendedSequenceNumber +} + +private object Checkpointable { + final case class Record(extendedSequenceNumber: ExtendedSequenceNumber, checkpointer: RecordProcessorCheckpointer) extends Checkpointable + + final case class ShardEnd(checkpointer: RecordProcessorCheckpointer, release: CountDownLatch) extends Checkpointable { + override def extendedSequenceNumber: ExtendedSequenceNumber = ExtendedSequenceNumber.SHARD_END + } + + implicit def checkpointableOrder: Order[Checkpointable] = Order.from { case (a, b) => + a.extendedSequenceNumber.compareTo(b.extendedSequenceNumber) + } + + implicit def checkpointableSemigroup: Semigroup[Checkpointable] = new Semigroup[Checkpointable] { + def combine(x: Checkpointable, y: Checkpointable): Checkpointable = + x.max(y) + } +} diff --git a/modules/kinesis/src/main/scala/com/snowplowanalytics/snowplow/sources/kinesis/KCLAction.scala b/modules/kinesis/src/main/scala/com/snowplowanalytics/snowplow/sources/kinesis/KCLAction.scala new file mode 100644 index 00000000..f3774d54 --- /dev/null +++ b/modules/kinesis/src/main/scala/com/snowplowanalytics/snowplow/sources/kinesis/KCLAction.scala @@ -0,0 +1,26 @@ +/* + * Copyright (c) 2023-present Snowplow Analytics Ltd. All rights reserved. + * + * This program is licensed to you under the Snowplow Community License Version 1.0, + * and you may not use this file except in compliance with the Snowplow Community License Version 1.0. 
+ * You may obtain a copy of the Snowplow Community License Version 1.0 at https://docs.snowplow.io/community-license-1.0 + */ +package com.snowplowanalytics.snowplow.sources.kinesis + +import software.amazon.kinesis.lifecycle.events.{ProcessRecordsInput, ShardEndedInput} + +import java.util.concurrent.CountDownLatch + +private sealed trait KCLAction + +private object KCLAction { + + final case class ProcessRecords(shardId: String, processRecordsInput: ProcessRecordsInput) extends KCLAction + final case class ShardEnd( + shardId: String, + await: CountDownLatch, + shardEndedInput: ShardEndedInput + ) extends KCLAction + final case class KCLError(t: Throwable) extends KCLAction + +} diff --git a/modules/kinesis/src/main/scala/com/snowplowanalytics/snowplow/sources/kinesis/KCLScheduler.scala b/modules/kinesis/src/main/scala/com/snowplowanalytics/snowplow/sources/kinesis/KCLScheduler.scala new file mode 100644 index 00000000..55ae8bd5 --- /dev/null +++ b/modules/kinesis/src/main/scala/com/snowplowanalytics/snowplow/sources/kinesis/KCLScheduler.scala @@ -0,0 +1,150 @@ +/* + * Copyright (c) 2023-present Snowplow Analytics Ltd. All rights reserved. + * + * This program is licensed to you under the Snowplow Community License Version 1.0, + * and you may not use this file except in compliance with the Snowplow Community License Version 1.0. + * You may obtain a copy of the Snowplow Community License Version 1.0 at https://docs.snowplow.io/community-license-1.0 + */ +package com.snowplowanalytics.snowplow.sources.kinesis + +import cats.effect.implicits._ +import cats.effect.{Async, Resource, Sync} +import cats.implicits._ +import com.snowplowanalytics.snowplow.sources.kinesis.KCLAction.KCLError +import software.amazon.awssdk.awscore.defaultsmode.DefaultsMode +import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient +import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient +import software.amazon.awssdk.services.kinesis.KinesisAsyncClient +import software.amazon.kinesis.common.{ConfigsBuilder, InitialPositionInStream, InitialPositionInStreamExtended} +import software.amazon.kinesis.coordinator.{Scheduler, WorkerStateChangeListener} +import software.amazon.kinesis.metrics.MetricsLevel +import software.amazon.kinesis.processor.SingleStreamTracker +import software.amazon.kinesis.retrieval.fanout.FanOutConfig +import software.amazon.kinesis.retrieval.polling.PollingConfig + +import java.net.URI +import java.util.Date +import java.util.concurrent.SynchronousQueue +import java.util.concurrent.atomic.AtomicReference + +private[kinesis] object KCLScheduler { + + def populateQueue[F[_]: Async]( + config: KinesisSourceConfig, + queue: SynchronousQueue[KCLAction] + ): Resource[F, Unit] = + for { + kinesis <- mkKinesisClient[F](config.customEndpoint) + dynamo <- mkDynamoDbClient[F](config.dynamodbCustomEndpoint) + cloudWatch <- mkCloudWatchClient[F](config.cloudwatchCustomEndpoint) + scheduler <- Resource.eval(mkScheduler(kinesis, dynamo, cloudWatch, config, queue)) + _ <- runInBackground(scheduler) + } yield () + + private def mkScheduler[F[_]: Sync]( + kinesisClient: KinesisAsyncClient, + dynamoDbClient: DynamoDbAsyncClient, + cloudWatchClient: CloudWatchAsyncClient, + kinesisConfig: KinesisSourceConfig, + queue: SynchronousQueue[KCLAction] + ): F[Scheduler] = + Sync[F].delay { + val configsBuilder = + new ConfigsBuilder( + kinesisConfig.streamName, + kinesisConfig.appName, + kinesisClient, + dynamoDbClient, + cloudWatchClient, + kinesisConfig.workerIdentifier, + () => 
ShardRecordProcessor(queue, new AtomicReference(Set.empty[String])) + ) + + val retrievalConfig = + configsBuilder.retrievalConfig + .streamTracker(new SingleStreamTracker(kinesisConfig.streamName, initialPositionOf(kinesisConfig.initialPosition))) + .retrievalSpecificConfig { + kinesisConfig.retrievalMode match { + case KinesisSourceConfig.Retrieval.FanOut => + new FanOutConfig(kinesisClient).streamName(kinesisConfig.streamName).applicationName(kinesisConfig.appName) + case KinesisSourceConfig.Retrieval.Polling(maxRecords) => + new PollingConfig(kinesisConfig.streamName, kinesisClient).maxRecords(maxRecords) + } + } + + val leaseManagementConfig = + configsBuilder.leaseManagementConfig + .failoverTimeMillis(kinesisConfig.leaseDuration.toMillis) + + // We ask to see empty batches, so that we can update the health check even when there are no records in the stream + val processorConfig = + configsBuilder.processorConfig + .callProcessRecordsEvenForEmptyRecordList(true) + + val coordinatorConfig = configsBuilder.coordinatorConfig + .workerStateChangeListener(new WorkerStateChangeListener { + def onWorkerStateChange(newState: WorkerStateChangeListener.WorkerState): Unit = () + override def onAllInitializationAttemptsFailed(e: Throwable): Unit = + queue.put(KCLError(e)) + }) + + new Scheduler( + configsBuilder.checkpointConfig, + coordinatorConfig, + leaseManagementConfig, + configsBuilder.lifecycleConfig, + configsBuilder.metricsConfig.metricsLevel(MetricsLevel.NONE), + processorConfig, + retrievalConfig + ) + } + + private def runInBackground[F[_]: Async](scheduler: Scheduler): Resource[F, Unit] = + Sync[F].blocking(scheduler.run()).background *> Resource.onFinalize(Sync[F].blocking(scheduler.shutdown())) + + private def initialPositionOf(config: KinesisSourceConfig.InitialPosition): InitialPositionInStreamExtended = + config match { + case KinesisSourceConfig.InitialPosition.Latest => InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.LATEST) + case KinesisSourceConfig.InitialPosition.TrimHorizon => + InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.TRIM_HORIZON) + case KinesisSourceConfig.InitialPosition.AtTimestamp(instant) => + InitialPositionInStreamExtended.newInitialPositionAtTimestamp(Date.from(instant)) + } + + private def mkKinesisClient[F[_]: Sync](customEndpoint: Option[URI]): Resource[F, KinesisAsyncClient] = + Resource.fromAutoCloseable { + Sync[F].blocking { // Blocking because this might dial the EC2 metadata endpoint + val builder = + KinesisAsyncClient + .builder() + .defaultsMode(DefaultsMode.AUTO) + val customized = customEndpoint.map(builder.endpointOverride).getOrElse(builder) + customized.build + } + } + + private def mkDynamoDbClient[F[_]: Sync](customEndpoint: Option[URI]): Resource[F, DynamoDbAsyncClient] = + Resource.fromAutoCloseable { + Sync[F].blocking { // Blocking because this might dial the EC2 metadata endpoint + val builder = + DynamoDbAsyncClient + .builder() + .defaultsMode(DefaultsMode.AUTO) + val customized = customEndpoint.map(builder.endpointOverride).getOrElse(builder) + customized.build + } + } + + private def mkCloudWatchClient[F[_]: Sync](customEndpoint: Option[URI]): Resource[F, CloudWatchAsyncClient] = + Resource.fromAutoCloseable { + Sync[F].blocking { // Blocking because this might dial the EC2 metadata endpoint + val builder = + CloudWatchAsyncClient + .builder() + .defaultsMode(DefaultsMode.AUTO) + val customized = customEndpoint.map(builder.endpointOverride).getOrElse(builder) + 
customized.build + } + } + +} diff --git a/modules/kinesis/src/main/scala/com/snowplowanalytics/snowplow/sources/kinesis/KinesisCheckpointer.scala b/modules/kinesis/src/main/scala/com/snowplowanalytics/snowplow/sources/kinesis/KinesisCheckpointer.scala new file mode 100644 index 00000000..7190e1fb --- /dev/null +++ b/modules/kinesis/src/main/scala/com/snowplowanalytics/snowplow/sources/kinesis/KinesisCheckpointer.scala @@ -0,0 +1,66 @@ +/* + * Copyright (c) 2023-present Snowplow Analytics Ltd. All rights reserved. + * + * This program is licensed to you under the Snowplow Community License Version 1.0, + * and you may not use this file except in compliance with the Snowplow Community License Version 1.0. + * You may obtain a copy of the Snowplow Community License Version 1.0 at https://docs.snowplow.io/community-license-1.0 + */ +package com.snowplowanalytics.snowplow.sources.kinesis + +import cats.effect.{Async, Sync} +import cats.implicits._ +import cats.effect.implicits._ +import com.snowplowanalytics.snowplow.sources.internal.Checkpointer +import org.typelevel.log4cats.Logger +import software.amazon.kinesis.exceptions.ShutdownException +import software.amazon.kinesis.processor.RecordProcessorCheckpointer +import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber + +import java.util.concurrent.CountDownLatch + +private class KinesisCheckpointer[F[_]: Async: Logger] extends Checkpointer[F, Map[String, Checkpointable]] { + + override val empty: Map[String, Checkpointable] = Map.empty + + override def combine(x: Map[String, Checkpointable], y: Map[String, Checkpointable]): Map[String, Checkpointable] = + x |+| y + + override def ack(c: Map[String, Checkpointable]): F[Unit] = + c.toList.parTraverse_ { + case (shardId, Checkpointable.Record(extendedSequenceNumber, checkpointer)) => + checkpointRecord(shardId, extendedSequenceNumber, checkpointer) + case (shardId, Checkpointable.ShardEnd(checkpointer, release)) => + checkpointShardEnd(shardId, checkpointer, release) + } + + override def nack(c: Map[String, Checkpointable]): F[Unit] = + Sync[F].unit + + private def checkpointShardEnd( + shardId: String, + checkpointer: RecordProcessorCheckpointer, + release: CountDownLatch + ) = + Logger[F].debug(s"Checkpointing shard $shardId at SHARD_END") *> + Sync[F].blocking(checkpointer.checkpoint()).recoverWith(ignoreShutdownExceptions(shardId)) *> + Sync[F].delay(release.countDown()) + + private def checkpointRecord( + shardId: String, + extendedSequenceNumber: ExtendedSequenceNumber, + checkpointer: RecordProcessorCheckpointer + ) = + Logger[F].debug(s"Checkpointing shard $shardId at $extendedSequenceNumber") *> + Sync[F] + .blocking( + checkpointer.checkpoint(extendedSequenceNumber.sequenceNumber, extendedSequenceNumber.subSequenceNumber) + ) + .recoverWith(ignoreShutdownExceptions(shardId)) + + private def ignoreShutdownExceptions(shardId: String): PartialFunction[Throwable, F[Unit]] = { case _: ShutdownException => + // The ShardRecordProcessor instance has been shutdown. This just means another KCL + // worker has stolen our lease. It is expected during autoscaling of instances, and is + // safe to ignore. 
+ Logger[F].warn(s"Skipping checkpointing of shard $shardId because this worker no longer owns the lease") + } +} diff --git a/modules/kinesis/src/main/scala/com/snowplowanalytics/snowplow/sources/kinesis/KinesisSource.scala b/modules/kinesis/src/main/scala/com/snowplowanalytics/snowplow/sources/kinesis/KinesisSource.scala index bded23a3..25d256c8 100644 --- a/modules/kinesis/src/main/scala/com/snowplowanalytics/snowplow/sources/kinesis/KinesisSource.scala +++ b/modules/kinesis/src/main/scala/com/snowplowanalytics/snowplow/sources/kinesis/KinesisSource.scala @@ -7,248 +7,98 @@ */ package com.snowplowanalytics.snowplow.sources.kinesis -import cats.{Applicative, Semigroup} -import cats.effect.{Async, Ref, Resource, Sync} -import cats.effect.implicits._ +import cats.effect.{Async, Ref, Sync} import cats.implicits._ -import fs2.Stream -import fs2.aws.kinesis.{CommittableRecord, Kinesis, KinesisConsumerSettings} -import org.typelevel.log4cats.{Logger, SelfAwareStructuredLogger} +import com.snowplowanalytics.snowplow.sources.SourceAndAck +import com.snowplowanalytics.snowplow.sources.internal.{LowLevelEvents, LowLevelSource} +import fs2.{Chunk, Pull, Stream} import org.typelevel.log4cats.slf4j.Slf4jLogger -import software.amazon.awssdk.awscore.defaultsmode.DefaultsMode -import software.amazon.awssdk.regions.Region -import software.amazon.awssdk.regions.providers.DefaultAwsRegionProviderChain -import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient -import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient -import software.amazon.awssdk.services.kinesis.KinesisAsyncClient -import software.amazon.kinesis.common.{InitialPositionInStream, InitialPositionInStreamExtended} +import org.typelevel.log4cats.{Logger, SelfAwareStructuredLogger} +import software.amazon.kinesis.lifecycle.events.{ProcessRecordsInput, ShardEndedInput} +import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber -import java.net.URI -import java.util.Date -import java.util.concurrent.Semaphore +import java.util.concurrent.{CountDownLatch, SynchronousQueue} import scala.concurrent.duration.FiniteDuration - -// kinesis -import software.amazon.kinesis.common.ConfigsBuilder -import software.amazon.kinesis.coordinator.Scheduler -import software.amazon.kinesis.exceptions.ShutdownException -import software.amazon.kinesis.metrics.MetricsLevel -import software.amazon.kinesis.processor.{ShardRecordProcessorFactory, SingleStreamTracker} -import software.amazon.kinesis.retrieval.fanout.FanOutConfig -import software.amazon.kinesis.retrieval.polling.PollingConfig - -// snowplow -import com.snowplowanalytics.snowplow.sources.internal.{Checkpointer, LowLevelEvents, LowLevelSource} -import com.snowplowanalytics.snowplow.sources.SourceAndAck +import scala.jdk.CollectionConverters._ object KinesisSource { private implicit def logger[F[_]: Sync]: SelfAwareStructuredLogger[F] = Slf4jLogger.getLogger[F] def build[F[_]: Async](config: KinesisSourceConfig): F[SourceAndAck[F]] = - Ref.ofEffect(Sync[F].realTime).flatMap { livenessRef => - LowLevelSource.toSourceAndAck(lowLevel(config, livenessRef)) - } - - private type KinesisCheckpointer[F[_]] = Checkpointer[F, Map[String, KinesisMetadata[F]]] - - private implicit class RichCommitableRecord(val cr: CommittableRecord) extends AnyVal { - def toMetadata[F[_]: Sync]: KinesisMetadata[F] = - KinesisMetadata(cr.shardId, cr.sequenceNumber, cr.isLastInShard, cr.lastRecordSemaphore, cr.checkpoint) - } - - private final case class KinesisMetadata[F[_]]( - shardId: String, - 
sequenceNumber: String, - isLastInShard: Boolean, - lastRecordSemaphore: Semaphore, - ack: F[Unit] - ) - - private def lowLevel[F[_]: Async]( - config: KinesisSourceConfig, - livenessRef: Ref[F, FiniteDuration] - ): LowLevelSource[F, Map[String, KinesisMetadata[F]]] = - new LowLevelSource[F, Map[String, KinesisMetadata[F]]] { - def checkpointer: KinesisCheckpointer[F] = kinesisCheckpointer[F] - - def stream: Stream[F, Stream[F, LowLevelEvents[Map[String, KinesisMetadata[F]]]]] = - Stream.emit(kinesisStream(config, livenessRef)) - - def lastLiveness: F[FiniteDuration] = - livenessRef.get - } - - private implicit def metadataSemigroup[F[_]]: Semigroup[KinesisMetadata[F]] = new Semigroup[KinesisMetadata[F]] { - override def combine(x: KinesisMetadata[F], y: KinesisMetadata[F]): KinesisMetadata[F] = - if (x.sequenceNumber > y.sequenceNumber) x else y - } - - private def kinesisCheckpointer[F[_]: Async]: KinesisCheckpointer[F] = new KinesisCheckpointer[F] { - def combine(x: Map[String, KinesisMetadata[F]], y: Map[String, KinesisMetadata[F]]): Map[String, KinesisMetadata[F]] = - x |+| y - - val empty: Map[String, KinesisMetadata[F]] = Map.empty - def ack(c: Map[String, KinesisMetadata[F]]): F[Unit] = - c.values.toList - .parTraverse_ { metadata => - metadata.ack - .recoverWith { - case _: ShutdownException => - // The ShardRecordProcessor instance has been shutdown. This just means another KCL - // worker has stolen our lease. It is expected during autoscaling of instances, and is - // safe to ignore. - Logger[F].warn(s"Skipping checkpointing of shard ${metadata.shardId} because this worker no longer owns the lease") + Ref.ofEffect(Sync[F].realTime).flatMap { liveness => + LowLevelSource.toSourceAndAck { + new LowLevelSource[F, Map[String, Checkpointable]] { + def stream: Stream[F, Stream[F, LowLevelEvents[Map[String, Checkpointable]]]] = + kinesisStream(config, liveness) - case _: IllegalArgumentException if metadata.isLastInShard => - // See https://github.com/snowplow/enrich/issues/657 - // This can happen at the shard end when KCL no longer allows checkpointing of the last record in the shard. - // We need to release the semaphore, so that fs2-aws handles checkpointing the end of the shard. - Logger[F].warn( - s"Checkpointing failed on last record in shard. Ignoring error and instead try checkpointing of the shard end" - ) *> - Sync[F].delay(metadata.lastRecordSemaphore.release()) + def checkpointer: KinesisCheckpointer[F] = + new KinesisCheckpointer[F]() - case _: IllegalArgumentException if metadata.lastRecordSemaphore.availablePermits === 0 => - // See https://github.com/snowplow/enrich/issues/657 and https://github.com/snowplow/enrich/pull/658 - // This can happen near the shard end, e.g. the penultimate batch in the shard, when KCL has already enqueued the final record in the shard to the fs2 queue. - // We must not release the semaphore yet, because we are not ready for fs2-aws to checkpoint the end of the shard. - // We can safely ignore the exception and move on. - Logger[F].warn( - s"Checkpointing failed on a record which was not the last in the shard. Meanwhile, KCL has already enqueued the final record in the shard to the fs2 queue. 
Ignoring error and instead continue processing towards the shard end" - ) - } + def lastLiveness: F[FiniteDuration] = + liveness.get } - def nack(c: Map[String, KinesisMetadata[F]]): F[Unit] = Applicative[F].unit - } + } + } private def kinesisStream[F[_]: Async]( config: KinesisSourceConfig, - livenessRef: Ref[F, FiniteDuration] - ): Stream[F, LowLevelEvents[Map[String, KinesisMetadata[F]]]] = + liveness: Ref[F, FiniteDuration] + ): Stream[F, Stream[F, LowLevelEvents[Map[String, Checkpointable]]]] = { + val actionQueue = new SynchronousQueue[KCLAction]() for { - region <- Stream.eval(Sync[F].delay((new DefaultAwsRegionProviderChain).getRegion)) - consumerSettings = KinesisConsumerSettings( - config.streamName, - config.appName, - region, - bufferSize = config.bufferSize - ) - kinesisClient <- Stream.resource(mkKinesisClient[F](region, config.customEndpoint)) - dynamoClient <- Stream.resource(mkDynamoDbClient[F](region, config.dynamodbCustomEndpoint)) - cloudWatchClient <- Stream.resource(mkCloudWatchClient[F](region, config.cloudwatchCustomEndpoint)) - kinesis = Kinesis.create(scheduler(kinesisClient, dynamoClient, cloudWatchClient, config, _)) - chunk <- kinesis.readChunkedFromKinesisStream(consumerSettings) - now <- Stream.eval(Sync[F].realTime) - _ <- Stream.eval(livenessRef.set(now)) - if chunk.nonEmpty - } yield { - val ack = chunk.asSeq - .groupBy(_.shardId) - .map { case (k, records) => - k -> records.maxBy(_.sequenceNumber).toMetadata[F] - } - .toMap - val earliestTstamp = chunk.iterator.map(_.record.approximateArrivalTimestamp).min - LowLevelEvents(chunk.map(_.record.data()), ack, Some(earliestTstamp)) - } + _ <- Stream.resource(KCLScheduler.populateQueue[F](config, actionQueue)) + events <- Stream.emit(pullFromQueue(actionQueue, liveness).stream).repeat + } yield events + } - private def initialPositionOf(config: KinesisSourceConfig.InitialPosition): InitialPositionInStreamExtended = - config match { - case KinesisSourceConfig.InitialPosition.Latest => InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.LATEST) - case KinesisSourceConfig.InitialPosition.TrimHorizon => - InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.TRIM_HORIZON) - case KinesisSourceConfig.InitialPosition.AtTimestamp(instant) => - InitialPositionInStreamExtended.newInitialPositionAtTimestamp(Date.from(instant)) + private def pullFromQueue[F[_]: Sync]( + queue: SynchronousQueue[KCLAction], + liveness: Ref[F, FiniteDuration] + ): Pull[F, LowLevelEvents[Map[String, Checkpointable]], Unit] = + Pull.eval(resolveNextAction(queue, liveness)).flatMap { + case KCLAction.ProcessRecords(_, processRecordsInput) if processRecordsInput.records.asScala.isEmpty => + pullFromQueue[F](queue, liveness) + case KCLAction.ProcessRecords(shardId, processRecordsInput) => + Pull.output1(provideNextChunk(shardId, processRecordsInput)).covary[F] *> pullFromQueue[F](queue, liveness) + case KCLAction.ShardEnd(shardId, await, shardEndedInput) => + handleShardEnd[F](shardId, await, shardEndedInput) *> Pull.done + case KCLAction.KCLError(t) => + Pull.eval(Logger[F].error(t)("Exception from Kinesis source")) *> Pull.raiseError[F](t) } - private def scheduler[F[_]: Sync]( - kinesisClient: KinesisAsyncClient, - dynamoDbClient: DynamoDbAsyncClient, - cloudWatchClient: CloudWatchAsyncClient, - kinesisConfig: KinesisSourceConfig, - recordProcessorFactory: ShardRecordProcessorFactory - ): F[Scheduler] = - Sync[F].delay { - val configsBuilder = - new ConfigsBuilder( - kinesisConfig.streamName, - 
kinesisConfig.appName, - kinesisClient, - dynamoDbClient, - cloudWatchClient, - kinesisConfig.workerIdentifier, - recordProcessorFactory - ) - - val retrievalConfig = - configsBuilder.retrievalConfig - .streamTracker(new SingleStreamTracker(kinesisConfig.streamName, initialPositionOf(kinesisConfig.initialPosition))) - .retrievalSpecificConfig { - kinesisConfig.retrievalMode match { - case KinesisSourceConfig.Retrieval.FanOut => - new FanOutConfig(kinesisClient).streamName(kinesisConfig.streamName).applicationName(kinesisConfig.appName) - case KinesisSourceConfig.Retrieval.Polling(maxRecords) => - new PollingConfig(kinesisConfig.streamName, kinesisClient).maxRecords(maxRecords) - } - } - - val leaseManagementConfig = - configsBuilder.leaseManagementConfig - .failoverTimeMillis(kinesisConfig.leaseDuration.toMillis) - - // We ask to see empty batches, so that we can update the health check even when there are no records in the stream - val processorConfig = - configsBuilder.processorConfig - .callProcessRecordsEvenForEmptyRecordList(true) - - new Scheduler( - configsBuilder.checkpointConfig, - configsBuilder.coordinatorConfig, - leaseManagementConfig, - configsBuilder.lifecycleConfig, - configsBuilder.metricsConfig.metricsLevel(MetricsLevel.NONE), - processorConfig, - retrievalConfig - ) + private def resolveNextAction[F[_]: Sync](queue: SynchronousQueue[KCLAction], liveness: Ref[F, FiniteDuration]): F[KCLAction] = { + val nextAction = Sync[F].delay(Option[KCLAction](queue.poll)).flatMap { + case Some(action) => Sync[F].pure(action) + case None => Sync[F].interruptible(queue.take) + } + nextAction <* updateLiveness(liveness) + } - private def mkKinesisClient[F[_]: Sync](region: Region, customEndpoint: Option[URI]): Resource[F, KinesisAsyncClient] = - Resource.fromAutoCloseable { - Sync[F].delay { - val builder = - KinesisAsyncClient - .builder() - .region(region) - .defaultsMode(DefaultsMode.AUTO) - val customized = customEndpoint.map(builder.endpointOverride).getOrElse(builder) - customized.build - } - } + private def updateLiveness[F[_]: Sync](liveness: Ref[F, FiniteDuration]): F[Unit] = + Sync[F].realTime.flatMap(now => liveness.set(now)) + + private def provideNextChunk(shardId: String, input: ProcessRecordsInput) = { + val chunk = Chunk.javaList(input.records()).map(_.data()) + val lastRecord = input.records.asScala.last // last is safe because we handled the empty case above + val checkpointable = Checkpointable.Record( + new ExtendedSequenceNumber(lastRecord.sequenceNumber, lastRecord.subSequenceNumber), + input.checkpointer + ) + LowLevelEvents(chunk, Map[String, Checkpointable](shardId -> checkpointable), Some(lastRecord.approximateArrivalTimestamp)) + } - private def mkDynamoDbClient[F[_]: Sync](region: Region, customEndpoint: Option[URI]): Resource[F, DynamoDbAsyncClient] = - Resource.fromAutoCloseable { - Sync[F].delay { - val builder = - DynamoDbAsyncClient - .builder() - .region(region) - .defaultsMode(DefaultsMode.AUTO) - val customized = customEndpoint.map(builder.endpointOverride).getOrElse(builder) - customized.build - } - } + private def handleShardEnd[F[_]: Sync]( + shardId: String, + await: CountDownLatch, + shardEndedInput: ShardEndedInput + ) = { + val checkpointable = Checkpointable.ShardEnd(shardEndedInput.checkpointer, await) + val last = LowLevelEvents(Chunk.empty, Map[String, Checkpointable](shardId -> checkpointable), None) + Pull + .eval(Logger[F].info(s"Ending this window of events early because we reached the end of Kinesis shard $shardId")) + .covaryOutput *> + 
Pull.output1(last).covary[F] + } - private def mkCloudWatchClient[F[_]: Sync](region: Region, customEndpoint: Option[URI]): Resource[F, CloudWatchAsyncClient] = - Resource.fromAutoCloseable { - Sync[F].delay { - val builder = - CloudWatchAsyncClient - .builder() - .region(region) - .defaultsMode(DefaultsMode.AUTO) - val customized = customEndpoint.map(builder.endpointOverride).getOrElse(builder) - customized.build - } - } } diff --git a/modules/kinesis/src/main/scala/com/snowplowanalytics/snowplow/sources/kinesis/KinesisSourceConfig.scala b/modules/kinesis/src/main/scala/com/snowplowanalytics/snowplow/sources/kinesis/KinesisSourceConfig.scala index 33161b57..5667e204 100644 --- a/modules/kinesis/src/main/scala/com/snowplowanalytics/snowplow/sources/kinesis/KinesisSourceConfig.scala +++ b/modules/kinesis/src/main/scala/com/snowplowanalytics/snowplow/sources/kinesis/KinesisSourceConfig.scala @@ -7,8 +7,6 @@ */ package com.snowplowanalytics.snowplow.sources.kinesis -import eu.timepit.refined.types.all.PosInt - import io.circe._ import io.circe.config.syntax._ import io.circe.generic.extras.semiauto.deriveConfiguredDecoder @@ -24,7 +22,6 @@ case class KinesisSourceConfig( workerIdentifier: String, initialPosition: KinesisSourceConfig.InitialPosition, retrievalMode: KinesisSourceConfig.Retrieval, - bufferSize: PosInt, customEndpoint: Option[URI], dynamodbCustomEndpoint: Option[URI], cloudwatchCustomEndpoint: Option[URI], @@ -33,8 +30,6 @@ case class KinesisSourceConfig( object KinesisSourceConfig { - private implicit val posIntDecoder: Decoder[PosInt] = Decoder.decodeInt.emap(PosInt.from) - sealed trait InitialPosition object InitialPosition { diff --git a/modules/kinesis/src/main/scala/com/snowplowanalytics/snowplow/sources/kinesis/ShardRecordProcessor.scala b/modules/kinesis/src/main/scala/com/snowplowanalytics/snowplow/sources/kinesis/ShardRecordProcessor.scala new file mode 100644 index 00000000..8b67637b --- /dev/null +++ b/modules/kinesis/src/main/scala/com/snowplowanalytics/snowplow/sources/kinesis/ShardRecordProcessor.scala @@ -0,0 +1,65 @@ +/* + * Copyright (c) 2023-present Snowplow Analytics Ltd. All rights reserved. + * + * This program is licensed to you under the Snowplow Community License Version 1.0, + * and you may not use this file except in compliance with the Snowplow Community License Version 1.0. + * You may obtain a copy of the Snowplow Community License Version 1.0 at https://docs.snowplow.io/community-license-1.0 + */ +package com.snowplowanalytics.snowplow.sources.kinesis + +import software.amazon.kinesis.lifecycle.events.{ + InitializationInput, + LeaseLostInput, + ProcessRecordsInput, + ShardEndedInput, + ShutdownRequestedInput +} +import software.amazon.kinesis.processor.{ShardRecordProcessor => KCLShardProcessor} + +import java.util.concurrent.{CountDownLatch, SynchronousQueue} +import java.util.concurrent.atomic.AtomicReference + +private[kinesis] object ShardRecordProcessor { + + def apply( + queue: SynchronousQueue[KCLAction], + currentShardIds: AtomicReference[Set[String]] + ): KCLShardProcessor = new KCLShardProcessor { + private var shardId: String = _ + + override def initialize(initializationInput: InitializationInput): Unit = { + shardId = initializationInput.shardId + val oldSet = currentShardIds.getAndUpdate(_ + shardId) + if (oldSet.contains(shardId)) { + // This is a rare edge-case scenario. Three things must all happen to hit this scenario: + // 1. KCL fails to renew a lease due to some transient runtime error + // 2. 
KCL re-acquires the lost lease for the same shard + // 3. The original ShardRecordProcessor is not terminated until after KCL re-acquires the lease + // This is a very unhealthy state, so we should kill the app. + val action = KCLAction.KCLError(new RuntimeException(s"Refusing to initialize a duplicate record processor for shard $shardId")) + queue.put(action) + } + } + + override def shardEnded(shardEndedInput: ShardEndedInput): Unit = { + val countDownLatch = new CountDownLatch(1) + queue.put(KCLAction.ShardEnd(shardId, countDownLatch, shardEndedInput)) + countDownLatch.await() + currentShardIds.updateAndGet(_ - shardId) + () + } + + override def processRecords(processRecordsInput: ProcessRecordsInput): Unit = { + val action = KCLAction.ProcessRecords(shardId, processRecordsInput) + queue.put(action) + } + + override def leaseLost(leaseLostInput: LeaseLostInput): Unit = { + currentShardIds.updateAndGet(_ - shardId) + () + } + + override def shutdownRequested(shutdownRequestedInput: ShutdownRequestedInput): Unit = () + + } +} diff --git a/modules/kinesis/src/test/scala/com/snowplowanalytics/snowplow/sources/kinesis/KinesisSourceConfigSpec.scala b/modules/kinesis/src/test/scala/com/snowplowanalytics/snowplow/sources/kinesis/KinesisSourceConfigSpec.scala index 892236f2..ab0c4d6c 100644 --- a/modules/kinesis/src/test/scala/com/snowplowanalytics/snowplow/sources/kinesis/KinesisSourceConfigSpec.scala +++ b/modules/kinesis/src/test/scala/com/snowplowanalytics/snowplow/sources/kinesis/KinesisSourceConfigSpec.scala @@ -9,7 +9,6 @@ package com.snowplowanalytics.snowplow.sources.kinesis import io.circe.literal._ import com.typesafe.config.ConfigFactory -import eu.timepit.refined.types.all.PosInt import io.circe.config.syntax.CirceConfigOps import io.circe.Decoder import io.circe.generic.semiauto._ @@ -40,7 +39,6 @@ class KinesisSourceConfigSpec extends Specification { "initialPosition": { "type": "TrimHorizon" }, - "bufferSize": 42, "leaseDuration": "20 seconds" } """ @@ -52,7 +50,6 @@ class KinesisSourceConfigSpec extends Specification { c.workerIdentifier must beEqualTo("my-identifier"), c.initialPosition must beEqualTo(KinesisSourceConfig.InitialPosition.TrimHorizon), c.retrievalMode must beEqualTo(KinesisSourceConfig.Retrieval.Polling(42)), - c.bufferSize.value must beEqualTo(42), c.leaseDuration must beEqualTo(20.seconds) ).reduce(_ and _) } @@ -71,7 +68,6 @@ class KinesisSourceConfigSpec extends Specification { "initialPosition": { "type": "TRIM_HORIZON" }, - "bufferSize": 42, "leaseDuration": "20 seconds" } """ @@ -83,7 +79,6 @@ class KinesisSourceConfigSpec extends Specification { c.workerIdentifier must beEqualTo("my-identifier"), c.initialPosition must beEqualTo(KinesisSourceConfig.InitialPosition.TrimHorizon), c.retrievalMode must beEqualTo(KinesisSourceConfig.Retrieval.Polling(42)), - c.bufferSize.value must beEqualTo(42), c.leaseDuration must beEqualTo(20.seconds) ).reduce(_ and _) } @@ -108,7 +103,6 @@ class KinesisSourceConfigSpec extends Specification { workerIdentifier = System.getenv("HOSTNAME"), initialPosition = KinesisSourceConfig.InitialPosition.Latest, retrievalMode = KinesisSourceConfig.Retrieval.Polling(1000), - bufferSize = PosInt.unsafeFrom(1), customEndpoint = None, dynamodbCustomEndpoint = None, cloudwatchCustomEndpoint = None, diff --git a/modules/streams-core/src/main/scala/com.snowplowanalytics.snowplow/sources/internal/LowLevelSource.scala b/modules/streams-core/src/main/scala/com.snowplowanalytics.snowplow/sources/internal/LowLevelSource.scala index 
dbdb5ae4..3ac01bb6 100644 --- a/modules/streams-core/src/main/scala/com.snowplowanalytics.snowplow/sources/internal/LowLevelSource.scala +++ b/modules/streams-core/src/main/scala/com.snowplowanalytics.snowplow/sources/internal/LowLevelSource.scala @@ -248,18 +248,18 @@ private[sources] object LowLevelSource { */ private def timedWindows[F[_]: Async, A](config: EventProcessingConfig.TimedWindows): Pipe[F, A, Stream[F, A]] = { def go(timedPull: Pull.Timed[F, A], current: Option[Queue[F, Option[A]]]): Pull[F, Stream[F, A], Unit] = - timedPull.uncons.flatMap { - case None => + timedPull.uncons.attempt.flatMap { + case Right(None) => current match { case None => Pull.done case Some(q) => Pull.eval(q.offer(None)) >> Pull.done } - case Some((Left(_), next)) => + case Right(Some((Left(_), next))) => current match { case None => go(next, None) case Some(q) => Pull.eval(q.offer(None)) >> go(next, None) } - case Some((Right(chunk), next)) => + case Right(Some((Right(chunk), next))) => current match { case None => val pull = for { @@ -272,6 +272,11 @@ private[sources] object LowLevelSource { case Some(q) => Pull.eval(chunk.traverse(a => q.offer(Some(a)))) >> go(next, Some(q)) } + case Left(throwable) => + current match { + case None => Pull.raiseError[F](throwable) + case Some(q) => Pull.eval(q.offer(None)) >> Pull.raiseError[F](throwable) + } } in => diff --git a/project/Dependencies.scala b/project/Dependencies.scala index f9233a97..c905f44f 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -24,12 +24,10 @@ object Dependencies { val betterMonadicFor = "0.3.1" val kindProjector = "0.13.2" val collectionCompat = "2.11.0" - val refined = "0.11.1" // Streams val fs2Kafka = "3.4.0" val pubsub = "1.127.3" - val fs2AwsKinesis = "4.1.0" val awsSdk2 = "2.25.16" val kinesisClient = "2.5.7" @@ -67,18 +65,14 @@ object Dependencies { val betterMonadicFor = "com.olegpy" %% "better-monadic-for" % V.betterMonadicFor val kindProjector = "org.typelevel" %% "kind-projector" % V.kindProjector cross CrossVersion.full val collectionCompat = "org.scala-lang.modules" %% "scala-collection-compat" % V.collectionCompat - val refined = "eu.timepit" %% "refined" % V.refined // streams - val fs2Kafka = "com.github.fd4s" %% "fs2-kafka" % V.fs2Kafka - val pubsub = "com.google.cloud" % "google-cloud-pubsub" % V.pubsub - val fs2AwsKinesis = ("io.laserdisc" %% "fs2-aws-kinesis" % V.fs2AwsKinesis) - .exclude("software.amazon.kinesis", "amazon-kinesis-client") - .exclude("com.amazonaws", "amazon-kinesis-producer") - val arnsSdk2 = "software.amazon.awssdk" % "arns" % V.awsSdk2 - val kinesisSdk2 = "software.amazon.awssdk" % "kinesis" % V.awsSdk2 - val dynamoDbSdk2 = "software.amazon.awssdk" % "dynamodb" % V.awsSdk2 - val cloudwatchSdk2 = "software.amazon.awssdk" % "cloudwatch" % V.awsSdk2 + val fs2Kafka = "com.github.fd4s" %% "fs2-kafka" % V.fs2Kafka + val pubsub = "com.google.cloud" % "google-cloud-pubsub" % V.pubsub + val arnsSdk2 = "software.amazon.awssdk" % "arns" % V.awsSdk2 + val kinesisSdk2 = "software.amazon.awssdk" % "kinesis" % V.awsSdk2 + val dynamoDbSdk2 = "software.amazon.awssdk" % "dynamodb" % V.awsSdk2 + val cloudwatchSdk2 = "software.amazon.awssdk" % "cloudwatch" % V.awsSdk2 val kinesisClient = ("software.amazon.kinesis" % "amazon-kinesis-client" % V.kinesisClient) .exclude("com.amazonaws", "amazon-kinesis-producer") .exclude("software.amazon.glue", "schema-registry-build-tools") @@ -124,12 +118,10 @@ object Dependencies { val kinesisDependencies = Seq( kinesisClient, - fs2AwsKinesis, arnsSdk2, 
kinesisSdk2, dynamoDbSdk2, cloudwatchSdk2, - refined, circeConfig, circeGeneric, circeGenericExtra,
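
The at-least-once fix in this patch hinges on the handshake between `ShardRecordProcessor.shardEnded` and `KinesisCheckpointer`: `shardEnded` enqueues a `ShardEnd` action carrying a `CountDownLatch` and then blocks on that latch, and the checkpointer counts the latch down only after the SHARD_END checkpoint has been written. The sketch below is a simplified illustration of that handshake, not code from the patch: it assumes plain threads in place of the KCL scheduler and the fs2 consumer, and the `Action` type and `ShardEndHandshakeSketch` object are illustrative stand-ins for the patch's `KCLAction` hierarchy.

import java.util.concurrent.{CountDownLatch, SynchronousQueue}

object ShardEndHandshakeSketch extends App {
  sealed trait Action
  final case class ProcessRecords(records: List[String]) extends Action
  final case class ShardEnd(release: CountDownLatch) extends Action

  // Zero-capacity queue: every put blocks until the consumer takes the action
  val queue = new SynchronousQueue[Action]()

  // Stands in for the KCL record-processor thread. KCL may only checkpoint
  // SHARD_END after shardEnded() returns, so the processor blocks here until
  // the consumer side has flushed everything and released the latch.
  val kcl = new Thread(() => {
    queue.put(ProcessRecords(List("a", "b", "c")))
    val latch = new CountDownLatch(1)
    queue.put(ShardEnd(latch))
    latch.await() // the blocking that preserves at-least-once at shard end
    println("shardEnded returns; KCL may now checkpoint SHARD_END")
  })

  // Stands in for the fs2 consumer: write records out, then release the latch.
  val consumer = new Thread(() => {
    var done = false
    while (!done)
      queue.take() match {
        case ProcessRecords(rs) => println(s"wrote ${rs.size} records to the destination")
        case ShardEnd(latch)    => latch.countDown(); done = true
      }
  })

  kcl.start(); consumer.start()
  kcl.join(); consumer.join()
}

Because a SynchronousQueue has no capacity, the handoff also gives the real source natural backpressure: the KCL thread cannot race ahead of the fs2 consumer, and the latch guarantees the shard-end checkpoint happens only after every record from the shard has been written.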