diff --git a/modules/kinesis/src/main/scala/com/snowplowanalytics/snowplow/sources/kinesis/KinesisSource.scala b/modules/kinesis/src/main/scala/com/snowplowanalytics/snowplow/sources/kinesis/KinesisSource.scala index 988b1dd..1b01614 100644 --- a/modules/kinesis/src/main/scala/com/snowplowanalytics/snowplow/sources/kinesis/KinesisSource.scala +++ b/modules/kinesis/src/main/scala/com/snowplowanalytics/snowplow/sources/kinesis/KinesisSource.scala @@ -42,11 +42,14 @@ object KinesisSource { } } + // We enable fairness on the `SynchronousQueue` to ensure all Kinesis shards are sourced at an equal rate. + private val synchronousQueueFairness: Boolean = true + private def kinesisStream[F[_]: Async]( config: KinesisSourceConfig, liveness: Ref[F, FiniteDuration] ): Stream[F, Stream[F, LowLevelEvents[Map[String, Checkpointable]]]] = { - val actionQueue = new SynchronousQueue[KCLAction]() + val actionQueue = new SynchronousQueue[KCLAction](synchronousQueueFairness) for { _ <- Stream.resource(KCLScheduler.populateQueue[F](config, actionQueue)) events <- Stream.emit(pullFromQueueAndEmit(actionQueue, liveness).stream).repeat