Skip to content

Commit

Permalink
Enable fairness on Kinesis Source's SynchronousQueue (#105)
Browse files Browse the repository at this point in the history
This fixes a possible problem in which some Kinesis shards progress more
quickly than others. Each KCL shard processor has its own thread, and
each calls `queue.put()` on the synchronous queue.  It is possible that
by luck some shard/threads were having their `put()` accepted by a
consumer more quickly.
  • Loading branch information
istreeter authored Jan 9, 2025
1 parent d5f509b commit aa35e08
Showing 1 changed file with 4 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,14 @@ object KinesisSource {
}
}

// We enable fairness on the `SynchronousQueue` to ensure all Kinesis shards are sourced at an equal rate.
private val synchronousQueueFairness: Boolean = true

private def kinesisStream[F[_]: Async](
config: KinesisSourceConfig,
liveness: Ref[F, FiniteDuration]
): Stream[F, Stream[F, LowLevelEvents[Map[String, Checkpointable]]]] = {
val actionQueue = new SynchronousQueue[KCLAction]()
val actionQueue = new SynchronousQueue[KCLAction](synchronousQueueFairness)
for {
_ <- Stream.resource(KCLScheduler.populateQueue[F](config, actionQueue))
events <- Stream.emit(pullFromQueueAndEmit(actionQueue, liveness).stream).repeat
Expand Down

0 comments on commit aa35e08

Please sign in to comment.