Commit f09fb48

Debounce how often we checkpoint progress
In snowplow-incubator/snowflake-loader#57 we added code to the Snowflake
loader so that it checkpoints once every 10 seconds instead of once per
batch. This let us decrease the write-throughput requirements of the
DynamoDB table.

This commit moves the logic over here into common-streams so that all
loaders get the benefit of this improvement.
istreeter committed Jan 17, 2025
1 parent 17a9360 commit f09fb48
Showing 12 changed files with 271 additions and 19 deletions.
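The shape of the change, sketched in isolation before the diff: checkpoint tokens are merged with a Semigroup and flushed at most once per interval, so a single write to the checkpoint store covers many batches. This is a simplified analogue built on fs2's groupWithin, not the code added below; the commit's real implementation is the batchUpCheckpoints pipe in LowLevelSource further down.

import cats.Semigroup
import cats.effect.IO
import fs2.Pipe
import scala.concurrent.duration._

// Simplified analogue of debounced checkpointing: gather up to `interval`
// worth of checkpoints, merge them into one value, and ack that single value.
// `C` stands in for a source's checkpoint type (e.g. Kinesis sequence numbers).
def debouncedAck[C: Semigroup](interval: FiniteDuration)(ack: C => IO[Unit]): Pipe[IO, C, Unit] =
  _.groupWithin(Int.MaxValue, interval)
    .map(chunk => Semigroup[C].combineAllOption(chunk.iterator))
    .unNone
    .evalMap(ack) // one checkpoint write per interval instead of one per batch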
@@ -95,7 +95,8 @@ object Utils {
       Some(endpoint),
       10.seconds,
       BigDecimal(1.0),
-      BackoffPolicy(100.millis, 1.second)
+      BackoffPolicy(100.millis, 1.second),
+      10.seconds
     )
 
   def getKinesisSinkConfig(endpoint: URI)(streamName: String): KinesisSinkConfig = KinesisSinkConfig(
modules/kafka/src/main/resources/reference.conf (1 addition, 0 deletions)
@@ -9,6 +9,7 @@ snowplow.defaults {
       "sasl.mechanism": "OAUTHBEARER"
       "sasl.jaas.config": "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;"
     }
+    debounceCommitOffsets: "10 seconds"
   }
 }
@@ -19,7 +19,7 @@ import org.typelevel.log4cats.slf4j.Slf4jLogger
 import scala.reflect._
 
 import java.nio.ByteBuffer
-import scala.concurrent.duration.DurationLong
+import scala.concurrent.duration.{DurationLong, FiniteDuration}
 
 // kafka
 import fs2.kafka._
@@ -49,6 +49,8 @@ object KafkaSource {
 
     def stream: Stream[F, Stream[F, Option[LowLevelEvents[KafkaCheckpoints[F]]]]] =
       kafkaStream(config, authHandlerClass)
+
+    def debounceCheckpoints: FiniteDuration = config.debounceCommitOffsets
   }
 
 case class OffsetAndCommit[F[_]](offset: Long, commit: F[Unit])
@@ -9,11 +9,22 @@ package com.snowplowanalytics.snowplow.sources.kafka
 
 import io.circe.Decoder
 import io.circe.generic.semiauto._
+import io.circe.config.syntax._
+
+import scala.concurrent.duration.FiniteDuration
 
+/**
+ * Config to be supplied from the app's hocon
+ *
+ * @param debounceCommitOffsets
+ *   How frequently to commit our progress back to kafka. By increasing this value, we decrease the
+ *   number of requests made to the kafka broker.
+ */
 case class KafkaSourceConfig(
   topicName: String,
   bootstrapServers: String,
-  consumerConf: Map[String, String]
+  consumerConf: Map[String, String],
+  debounceCommitOffsets: FiniteDuration
 )
 
 object KafkaSourceConfig {
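As a quick illustration of how the new field is consumed, here is a hedged sketch of decoding this config from HOCON with circe-config; the direct `.as[KafkaSourceConfig]` call assumes an implicit Decoder derived in the companion object, which the diff only hints at via the semiauto import.

import com.typesafe.config.ConfigFactory
import io.circe.config.syntax._

// Sketch only: parse a KafkaSourceConfig from HOCON, including the new
// debounceCommitOffsets duration. Assumes an implicit io.circe.Decoder
// for KafkaSourceConfig is in scope (e.g. derived via deriveDecoder).
val config = ConfigFactory.parseString(
  """
  topicName: "events"
  bootstrapServers: "localhost:9092"
  consumerConf: { "group.id": "my-loader" }
  debounceCommitOffsets: "10 seconds"
  """
)

config.as[KafkaSourceConfig] match {
  case Right(c)  => println(s"Committing offsets every ${c.debounceCommitOffsets}")
  case Left(err) => println(s"Bad config: $err")
}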
@@ -15,6 +15,8 @@ import io.circe.Decoder
 import io.circe.generic.semiauto._
 import org.specs2.Specification
 
+import scala.concurrent.duration.DurationLong
+
 class KafkaSourceConfigSpec extends Specification {
   import KafkaSourceConfigSpec._
 
@@ -50,7 +52,8 @@ class KafkaSourceConfigSpec extends Specification {
           "security.protocol" -> "SASL_SSL",
           "sasl.mechanism" -> "OAUTHBEARER",
           "sasl.jaas.config" -> "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;"
-        )
+        ),
+        debounceCommitOffsets = 10.seconds
       )
 
       result.as[Wrapper] must beRight.like { case w: Wrapper =>
modules/kinesis/src/main/resources/reference.conf (1 addition, 0 deletions)
@@ -15,6 +15,7 @@ snowplow.defaults: {
       minBackoff: "100 millis"
       maxBackoff: "1 second"
     }
+    debounceCheckpoints: "10 seconds"
   }
 }
@@ -19,7 +19,7 @@ import software.amazon.kinesis.lifecycle.events.{ProcessRecordsInput, ShardEndedInput}
 import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber
 
 import java.util.concurrent.{CountDownLatch, SynchronousQueue}
-import scala.concurrent.duration.DurationLong
+import scala.concurrent.duration.{DurationLong, FiniteDuration}
 import scala.jdk.CollectionConverters._
 
 object KinesisSource {
@@ -34,6 +34,8 @@ object KinesisSource {
 
     def checkpointer: KinesisCheckpointer[F] =
       new KinesisCheckpointer[F](config.checkpointThrottledBackoffPolicy)
+
+    def debounceCheckpoints: FiniteDuration = config.debounceCheckpoints
   }
 }
@@ -35,6 +35,9 @@ import com.snowplowanalytics.snowplow.kinesis.BackoffPolicy
  *   up/down and pod-rotation, we want the app to be quick to acquire shard-leases to process. With
  *   bigger instances (more cores/processors) we tend to have more shard-leases per instance, so we
  *   increase how aggressively it acquires leases.
+ * @param debounceCheckpoints
+ *   How frequently to checkpoint our progress to the DynamoDB table. By increasing this value we
+ *   can decrease the write-throughput requirements of the DynamoDB table.
  *
  * Other params are self-explanatory
  */
@@ -49,7 +52,8 @@ case class KinesisSourceConfig(
   cloudwatchCustomEndpoint: Option[URI],
   leaseDuration: FiniteDuration,
   maxLeasesToStealAtOneTimeFactor: BigDecimal,
-  checkpointThrottledBackoffPolicy: BackoffPolicy
+  checkpointThrottledBackoffPolicy: BackoffPolicy,
+  debounceCheckpoints: FiniteDuration
 )
 
 object KinesisSourceConfig {
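To put a number on the @param note above, a back-of-the-envelope sketch; the workload figures are invented for illustration, not taken from this commit.

// Invented example workload: 5 checkpointable batches per second per shard.
val batchesPerSecond = 5.0

// Checkpointing per batch costs one DynamoDB write per batch:
val writesPerSecondPerBatch = batchesPerSecond // 5.0 writes/second/shard

// Debounced at the default 10 seconds, there is at most one write per window:
val writesPerSecondDebounced = 1.0 / 10.0 // 0.1 writes/second/shard

// Roughly a 50x reduction in required DynamoDB write throughput.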
@@ -46,7 +46,8 @@ class KinesisSourceConfigSpec extends Specification {
         "checkpointThrottledBackoffPolicy": {
           "minBackoff": "100 millis",
           "maxBackoff": "1second"
-        }
+        },
+        "debounceCheckpoints": "42 seconds"
       }
     """
 
@@ -59,7 +60,8 @@
         c.retrievalMode must beEqualTo(KinesisSourceConfig.Retrieval.Polling(42)),
         c.leaseDuration must beEqualTo(20.seconds),
         c.maxLeasesToStealAtOneTimeFactor must beEqualTo(BigDecimal(0.42)),
-        c.checkpointThrottledBackoffPolicy must beEqualTo(BackoffPolicy(minBackoff = 100.millis, maxBackoff = 1.second))
+        c.checkpointThrottledBackoffPolicy must beEqualTo(BackoffPolicy(minBackoff = 100.millis, maxBackoff = 1.second)),
+        c.debounceCheckpoints must beEqualTo(42.seconds)
       ).reduce(_ and _)
     }
   }
 
@@ -82,7 +84,8 @@ class KinesisSourceConfigSpec extends Specification {
         "checkpointThrottledBackoffPolicy": {
           "minBackoff": "100 millis",
           "maxBackoff": "1second"
-        }
+        },
+        "debounceCheckpoints": "42 seconds"
       }
     """
 
@@ -95,7 +98,8 @@
         c.retrievalMode must beEqualTo(KinesisSourceConfig.Retrieval.Polling(42)),
         c.leaseDuration must beEqualTo(20.seconds),
         c.maxLeasesToStealAtOneTimeFactor must beEqualTo(BigDecimal(0.42)),
-        c.checkpointThrottledBackoffPolicy must beEqualTo(BackoffPolicy(minBackoff = 100.millis, maxBackoff = 1.second))
+        c.checkpointThrottledBackoffPolicy must beEqualTo(BackoffPolicy(minBackoff = 100.millis, maxBackoff = 1.second)),
+        c.debounceCheckpoints must beEqualTo(42.seconds)
       ).reduce(_ and _)
     }
   }
 
@@ -124,7 +128,8 @@ class KinesisSourceConfigSpec extends Specification {
         cloudwatchCustomEndpoint = None,
         leaseDuration = 10.seconds,
         maxLeasesToStealAtOneTimeFactor = BigDecimal(2.0),
-        checkpointThrottledBackoffPolicy = BackoffPolicy(minBackoff = 100.millis, maxBackoff = 1.second)
+        checkpointThrottledBackoffPolicy = BackoffPolicy(minBackoff = 100.millis, maxBackoff = 1.second),
+        debounceCheckpoints = 10.seconds
       )
 
       result.as[Wrapper] must beRight.like { case w: Wrapper =>
@@ -64,6 +64,8 @@ object PubsubSource {
 
     def stream: Stream[F, Stream[F, Option[LowLevelEvents[Vector[Unique.Token]]]]] =
       pubsubStream(config, deferredResources)
+
+    def debounceCheckpoints: FiniteDuration = Duration.Zero
   }
 
   private def pubsubStream[F[_]: Async](
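Note that Duration.Zero effectively opts Pub/Sub out of debouncing: the batchUpCheckpoints pipe added below is a pass-through when the timeout is zero, so Pub/Sub acks keep flowing once per batch.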
@@ -7,7 +7,7 @@
  */
 package com.snowplowanalytics.snowplow.sources.internal
 
-import cats.Monad
+import cats.{Monad, Semigroup}
 import cats.implicits._
 import cats.effect.std.Queue
 import cats.effect.kernel.{Ref, Unique}
@@ -49,6 +49,14 @@ private[sources] trait LowLevelSource[F[_], C] {
    * `SourceAndAck` reporting itself as unhealthy.
    */
   def stream: Stream[F, Stream[F, Option[LowLevelEvents[C]]]]
+
+  /**
+   * How frequently we should checkpoint progress to this source
+   *
+   * E.g. for Kinesis we can increase this value to reduce how often we need to write to the
+   * DynamoDB table
+   */
+  def debounceCheckpoints: FiniteDuration
 }
 
 private[sources] object LowLevelSource {
@@ -118,7 +126,7 @@
         .through(windowed(config.windowing))
 
       val sinks = EagerWindows.pipes { control: EagerWindows.Control[F] =>
-        CleanCancellation(messageSink(processor, acksRef, source.checkpointer, control))
+        CleanCancellation(messageSink(processor, acksRef, source.checkpointer, control, source.debounceCheckpoints))
       }
 
       tokenedSources
@@ -223,20 +231,21 @@
    * @param control
    *   Controls the processing of eager windows. Prevents the next eager window from checkpointing
    *   any events before the previous window is fully finalized.
+   * @param debounceCheckpoints
+   *   Debounces how often we call the checkpointer.
    */
   private def messageSink[F[_]: Async, C](
     processor: EventProcessor[F],
     ref: Ref[F, AcksState[C]],
     checkpointer: Checkpointer[F, C],
-    control: EagerWindows.Control[F]
+    control: EagerWindows.Control[F],
+    debounceCheckpoints: FiniteDuration
   ): Pipe[F, TokenedEvents, Nothing] =
     _.evalTap { case TokenedEvents(events, _) =>
       Logger[F].debug(s"Batch of ${events.size} events received from the source stream")
     }
       .through(processor)
       .chunks
-      .prefetch // This prefetch means we can ack messages concurrently with processing the next batch
-      .evalTap(_ => control.waitForPreviousWindow)
      .evalMap { chunk =>
        chunk
          .traverse { token =>
@@ -249,10 +258,14 @@
              case None => Async[F].raiseError[C](new IllegalStateException("Missing checkpoint for token"))
            }
          }
-          .flatMap { cs =>
-            checkpointer.ack(checkpointer.combineAll(cs.toIterable))
-          }
+          .map { cs =>
+            checkpointer.combineAll(cs.toIterable)
+          }
      }
+      .prefetch // This prefetch means we can ack messages concurrently with processing the next batch
+      .through(batchUpCheckpoints(debounceCheckpoints, checkpointer))
+      .evalTap(_ => control.waitForPreviousWindow)
+      .evalMap(c => checkpointer.ack(c))
      .drain
      .onFinalizeCase {
        case ExitCase.Succeeded =>
@@ -344,4 +357,38 @@
    */
   private def timeoutForFirstWindow(config: EventProcessingConfig.TimedWindows): FiniteDuration =
     (config.duration.toMillis * config.firstWindowScaling).toLong.milliseconds
+
+  private def batchUpCheckpoints[F[_]: Async, C](timeout: FiniteDuration, semigroup: Semigroup[C]): Pipe[F, C, C] = {
+
+    def go(timedPull: Pull.Timed[F, C], output: Option[C]): Pull[F, C, Unit] =
+      timedPull.uncons.flatMap {
+        case None =>
+          // Upstream finished cleanly. Emit whatever is pending and we're done.
+          Pull.outputOption1(output)
+        case Some((Left(_), next)) =>
+          // Timer timed out. Emit whatever is pending.
+          Pull.outputOption1(output) >> go(next, None)
+        case Some((Right(chunk), next)) =>
+          // Upstream emitted tokens to us. We might already have pending tokens.
+          output match {
+            case Some(c) =>
+              go(next, Some(chunk.foldLeft(c)(semigroup.combine(_, _))))
+            case None =>
+              semigroup.combineAllOption(chunk.iterator) match {
+                case Some(c) =>
+                  next.timeout(timeout) >> go(next, Some(c))
+                case None =>
+                  go(next, None)
+              }
+          }
+      }
+
+    in =>
+      if (timeout > Duration.Zero)
+        in.pull.timed { timedPull =>
+          go(timedPull, None)
+        }.stream
+      else
+        in
+  }
 }
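To see the new pipe's behavior in isolation, a hypothetical REPL-style sketch; the Semigroup instance and the direct call to the (private) batchUpCheckpoints are illustrative only.

import cats.Semigroup
import cats.effect.IO
import cats.effect.unsafe.implicits.global
import fs2.Stream
import scala.concurrent.duration._

// Toy checkpoint type: the highest offset seen wins when combining.
val maxOffset: Semigroup[Long] = Semigroup.instance(_ max _)

// Five checkpoints arriving 100ms apart, debounced with a 10-second timeout:
// all five land inside one window, so a single combined value is emitted.
val emitted = Stream
  .emits(List(1L, 2L, 3L, 4L, 5L))
  .covary[IO]
  .metered(100.millis)
  .through(batchUpCheckpoints[IO, Long](10.seconds, maxOffset)) // assumes access to the private method
  .compile
  .toList
  .unsafeRunSync()
// emitted == List(5L): one checkpoint write instead of five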