Kinesis source: improve management of shard ends (#102)
In the Kinesis Source, we terminate the inner stream of events
whenever we reach the end of a Kinesis shard. Terminating the inner
stream is important because it forces the application to fully process
and checkpoint any outstanding events, and this unblocks our KCL record
processor from checkpointing the end of the shard.

Before this PR, we terminated the inner stream for _every_ shard end. But
terminating the stream is quite inefficient, and during a re-sharding we
are likely to reach many shard ends at around the same time. This PR
changes things so that we try to handle many shard ends together. During
a re-sharding, this should reduce the number of times we need to terminate
the inner stream.
istreeter authored Jan 3, 2025
1 parent f2c9fac commit 82d3b8a
Showing 1 changed file with 55 additions and 21 deletions.
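
As a rough illustration of the batching idea, here is a minimal sketch in plain Scala, without the cats-effect and fs2 machinery the real source uses. Action, Records, ShardEnd and nextBatch are hypothetical names for this sketch, not the real API:

import java.util.concurrent.SynchronousQueue
import scala.jdk.CollectionConverters._

object ShardEndBatching {
  sealed trait Action
  final case class Records(shardId: String) extends Action
  final case class ShardEnd(shardId: String) extends Action

  // Block until one action arrives; if it is a shard end, also drain any
  // actions already queued, so that shard ends arriving together are all
  // handled by a single early termination of the inner stream.
  def nextBatch(queue: SynchronousQueue[Action]): List[Action] =
    queue.take() match {
      case end: ShardEnd =>
        val more = new java.util.ArrayList[Action]()
        queue.drainTo(more) // returns immediately; may transfer nothing
        end :: more.asScala.toList
      case other =>
        List(other) // normal case: hand actions over one at a time
    }
}

In the real change below, the batch is emitted and then the Pull is completed with Pull.done, so the inner stream is terminated once for the whole batch rather than once per shard.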
@@ -8,6 +8,7 @@
 package com.snowplowanalytics.snowplow.sources.kinesis
 
 import cats.effect.{Async, Ref, Sync}
+import cats.data.NonEmptyList
 import cats.implicits._
 import com.snowplowanalytics.snowplow.sources.SourceAndAck
 import com.snowplowanalytics.snowplow.sources.internal.{LowLevelEvents, LowLevelSource}
@@ -48,32 +49,68 @@ object KinesisSource {
     val actionQueue = new SynchronousQueue[KCLAction]()
     for {
       _ <- Stream.resource(KCLScheduler.populateQueue[F](config, actionQueue))
-      events <- Stream.emit(pullFromQueue(actionQueue, liveness).stream).repeat
+      events <- Stream.emit(pullFromQueueAndEmit(actionQueue, liveness).stream).repeat
     } yield events
   }
 
-  private def pullFromQueue[F[_]: Sync](
+  private def pullFromQueueAndEmit[F[_]: Sync](
     queue: SynchronousQueue[KCLAction],
     liveness: Ref[F, FiniteDuration]
   ): Pull[F, LowLevelEvents[Map[String, Checkpointable]], Unit] =
-    Pull.eval(resolveNextAction(queue, liveness)).flatMap {
-      case KCLAction.ProcessRecords(_, processRecordsInput) if processRecordsInput.records.asScala.isEmpty =>
-        pullFromQueue[F](queue, liveness)
-      case KCLAction.ProcessRecords(shardId, processRecordsInput) =>
-        Pull.output1(provideNextChunk(shardId, processRecordsInput)).covary[F] *> pullFromQueue[F](queue, liveness)
-      case KCLAction.ShardEnd(shardId, await, shardEndedInput) =>
-        handleShardEnd[F](shardId, await, shardEndedInput) *> Pull.done
-      case KCLAction.KCLError(t) =>
-        Pull.eval(Logger[F].error(t)("Exception from Kinesis source")) *> Pull.raiseError[F](t)
+    Pull.eval(pullFromQueue(queue, liveness)).flatMap { case PullFromQueueResult(actions, hasShardEnd) =>
+      val toEmit = actions.traverse {
+        case KCLAction.ProcessRecords(_, processRecordsInput) if processRecordsInput.records.asScala.isEmpty =>
+          Pull.done
+        case KCLAction.ProcessRecords(shardId, processRecordsInput) =>
+          Pull.output1(provideNextChunk(shardId, processRecordsInput)).covary[F]
+        case KCLAction.ShardEnd(shardId, await, shardEndedInput) =>
+          handleShardEnd[F](shardId, await, shardEndedInput)
+        case KCLAction.KCLError(t) =>
+          Pull.eval(Logger[F].error(t)("Exception from Kinesis source")) *> Pull.raiseError[F](t)
+      }
+      if (hasShardEnd) {
+        val log = Logger[F].info {
+          actions
+            .collect { case KCLAction.ShardEnd(shardId, _, _) =>
+              shardId
+            }
+            .mkString("Ending this window of events early because reached the end of Kinesis shards: ", ",", "")
+        }
+        Pull.eval(log).covaryOutput *> toEmit *> Pull.done
+      } else
+        toEmit *> pullFromQueueAndEmit(queue, liveness)
     }
 
-  private def resolveNextAction[F[_]: Sync](queue: SynchronousQueue[KCLAction], liveness: Ref[F, FiniteDuration]): F[KCLAction] = {
-    val nextAction = Sync[F].delay(Option[KCLAction](queue.poll)).flatMap {
+  private case class PullFromQueueResult(actions: NonEmptyList[KCLAction], hasShardEnd: Boolean)
+
+  private def pullFromQueue[F[_]: Sync](queue: SynchronousQueue[KCLAction], liveness: Ref[F, FiniteDuration]): F[PullFromQueueResult] =
+    resolveNextAction(queue)
+      .productL(updateLiveness(liveness))
+      .flatMap {
+        case shardEnd: KCLAction.ShardEnd =>
+          // If we reached the end of one shard, it is likely we reached the end of other shards too.
+          // Therefore pull more actions from the queue, to minimize the number of times we need to do
+          // an early close of the inner stream.
+          resolveAllActions(queue).map { more =>
+            PullFromQueueResult(NonEmptyList(shardEnd, more), hasShardEnd = true)
+          }
+        case other =>
+          PullFromQueueResult(NonEmptyList.one(other), hasShardEnd = false).pure[F]
+      }
+
+  /** Always returns a `KCLAction`, possibly waiting until one is available */
+  private def resolveNextAction[F[_]: Sync](queue: SynchronousQueue[KCLAction]): F[KCLAction] =
+    Sync[F].delay(Option[KCLAction](queue.poll)).flatMap {
       case Some(action) => Sync[F].pure(action)
      case None => Sync[F].interruptible(queue.take)
     }
-    nextAction <* updateLiveness(liveness)
-  }
+
+  /** Returns immediately, but the `List[KCLAction]` might be empty */
+  private def resolveAllActions[F[_]: Sync](queue: SynchronousQueue[KCLAction]): F[List[KCLAction]] =
+    for {
+      ret <- Sync[F].delay(new java.util.ArrayList[KCLAction]())
+      _ <- Sync[F].delay(queue.drainTo(ret))
+    } yield ret.asScala.toList
 
   private def updateLiveness[F[_]: Sync](liveness: Ref[F, FiniteDuration]): F[Unit] =
     Sync[F].realTime.flatMap(now => liveness.set(now))
@@ -89,17 +126,14 @@ object KinesisSource {
     LowLevelEvents(chunk, Map[String, Checkpointable](shardId -> checkpointable), Some(firstRecord.approximateArrivalTimestamp))
   }
 
-  private def handleShardEnd[F[_]: Sync](
+  private def handleShardEnd[F[_]](
     shardId: String,
     await: CountDownLatch,
     shardEndedInput: ShardEndedInput
-  ) = {
+  ): Pull[F, LowLevelEvents[Map[String, Checkpointable]], Unit] = {
     val checkpointable = Checkpointable.ShardEnd(shardEndedInput.checkpointer, await)
     val last = LowLevelEvents(Chunk.empty, Map[String, Checkpointable](shardId -> checkpointable), None)
-    Pull
-      .eval(Logger[F].info(s"Ending this window of events early because reached the end of Kinesis shard $shardId"))
-      .covaryOutput *>
-      Pull.output1(last).covary[F]
+    Pull.output1(last)
   }
 
 }
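
A subtlety behind the "Returns immediately, but the `List[KCLAction]` might be empty" comment on resolveAllActions: a SynchronousQueue has no internal capacity, so drainTo only collects elements from producer threads that are currently blocked in put. A small hypothetical demo of that behaviour:

import java.util.concurrent.SynchronousQueue

object DrainToDemo extends App {
  val queue = new SynchronousQueue[String]()
  val sink  = new java.util.ArrayList[String]()

  // No producer is waiting to hand an element off, so nothing is drained.
  println(queue.drainTo(sink)) // prints 0

  val producer = new Thread(() => queue.put("shardId-000000000001"))
  producer.start()
  Thread.sleep(100) // give the producer time to block inside put()

  // The blocked producer's element is transferred to the sink.
  println(queue.drainTo(sink)) // prints 1
  producer.join()
}

In this commit, the KCL worker threads are the producers blocked on the queue, so during a re-sharding several shard-end actions are typically waiting at once and get drained together into one batch.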
