From aa35e0888e977ff6ba539178c995c353c88afa5d Mon Sep 17 00:00:00 2001
From: Ian Streeter
Date: Thu, 9 Jan 2025 10:40:15 +0000
Subject: [PATCH] Enable fairness on Kinesis Source's SynchronousQueue (#105)

This fixes a possible problem in which some Kinesis shards progress more
quickly than others.

Each KCL shard processor has its own thread, and each calls `queue.put()` on
the synchronous queue. It is possible that by luck some shard/threads were
having their `put()` accepted by a consumer more quickly.
---
 .../snowplow/sources/kinesis/KinesisSource.scala | 5 ++++-
 1 file changed, 4 insertions(+), 1 deletion(-)

diff --git a/modules/kinesis/src/main/scala/com/snowplowanalytics/snowplow/sources/kinesis/KinesisSource.scala b/modules/kinesis/src/main/scala/com/snowplowanalytics/snowplow/sources/kinesis/KinesisSource.scala
index 988b1dd..1b01614 100644
--- a/modules/kinesis/src/main/scala/com/snowplowanalytics/snowplow/sources/kinesis/KinesisSource.scala
+++ b/modules/kinesis/src/main/scala/com/snowplowanalytics/snowplow/sources/kinesis/KinesisSource.scala
@@ -42,11 +42,14 @@ object KinesisSource {
     }
   }
 
+  // We enable fairness on the `SynchronousQueue` to ensure all Kinesis shards are sourced at an equal rate.
+  private val synchronousQueueFairness: Boolean = true
+
   private def kinesisStream[F[_]: Async](
     config: KinesisSourceConfig,
     liveness: Ref[F, FiniteDuration]
   ): Stream[F, Stream[F, LowLevelEvents[Map[String, Checkpointable]]]] = {
-    val actionQueue = new SynchronousQueue[KCLAction]()
+    val actionQueue = new SynchronousQueue[KCLAction](synchronousQueueFairness)
     for {
       _ <- Stream.resource(KCLScheduler.populateQueue[F](config, actionQueue))
       events <- Stream.emit(pullFromQueueAndEmit(actionQueue, liveness).stream).repeat