diff --git a/modules/it/src/test/scala/com/snowplowanalytics/snowplow/sources/kinesis/Utils.scala b/modules/it/src/test/scala/com/snowplowanalytics/snowplow/sources/kinesis/Utils.scala index df560b2..a4e93b1 100644 --- a/modules/it/src/test/scala/com/snowplowanalytics/snowplow/sources/kinesis/Utils.scala +++ b/modules/it/src/test/scala/com/snowplowanalytics/snowplow/sources/kinesis/Utils.scala @@ -95,7 +95,8 @@ object Utils { Some(endpoint), 10.seconds, BigDecimal(1.0), - BackoffPolicy(100.millis, 1.second) + BackoffPolicy(100.millis, 1.second), + 10.seconds ) def getKinesisSinkConfig(endpoint: URI)(streamName: String): KinesisSinkConfig = KinesisSinkConfig( diff --git a/modules/kafka/src/main/resources/reference.conf b/modules/kafka/src/main/resources/reference.conf index 81621fa..8cb016b 100644 --- a/modules/kafka/src/main/resources/reference.conf +++ b/modules/kafka/src/main/resources/reference.conf @@ -9,6 +9,7 @@ snowplow.defaults { "sasl.mechanism": "OAUTHBEARER" "sasl.jaas.config": "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;" } + debounceCommitOffsets: "10 seconds" } } diff --git a/modules/kafka/src/main/scala/com/snowplowanalytics/snowplow/sources/kafka/KafkaSource.scala b/modules/kafka/src/main/scala/com/snowplowanalytics/snowplow/sources/kafka/KafkaSource.scala index f80b7a0..9a7afbc 100644 --- a/modules/kafka/src/main/scala/com/snowplowanalytics/snowplow/sources/kafka/KafkaSource.scala +++ b/modules/kafka/src/main/scala/com/snowplowanalytics/snowplow/sources/kafka/KafkaSource.scala @@ -19,7 +19,7 @@ import org.typelevel.log4cats.slf4j.Slf4jLogger import scala.reflect._ import java.nio.ByteBuffer -import scala.concurrent.duration.DurationLong +import scala.concurrent.duration.{DurationLong, FiniteDuration} // kafka import fs2.kafka._ @@ -49,6 +49,8 @@ object KafkaSource { def stream: Stream[F, Stream[F, Option[LowLevelEvents[KafkaCheckpoints[F]]]]] = kafkaStream(config, authHandlerClass) + + def debounceCheckpoints: 
FiniteDuration = config.debounceCommitOffsets } case class OffsetAndCommit[F[_]](offset: Long, commit: F[Unit]) diff --git a/modules/kafka/src/main/scala/com/snowplowanalytics/snowplow/sources/kafka/KafkaSourceConfig.scala b/modules/kafka/src/main/scala/com/snowplowanalytics/snowplow/sources/kafka/KafkaSourceConfig.scala index a86332b..85944b5 100644 --- a/modules/kafka/src/main/scala/com/snowplowanalytics/snowplow/sources/kafka/KafkaSourceConfig.scala +++ b/modules/kafka/src/main/scala/com/snowplowanalytics/snowplow/sources/kafka/KafkaSourceConfig.scala @@ -9,11 +9,22 @@ package com.snowplowanalytics.snowplow.sources.kafka import io.circe.Decoder import io.circe.generic.semiauto._ +import io.circe.config.syntax._ +import scala.concurrent.duration.FiniteDuration + +/** + * Config to be supplied from the app's hocon + * + * @param debounceCommitOffsets + * How frequently to commit our progress back to kafka. By increasing this value, we decrease the + * number of requests made to the kafka broker. 
+ */ case class KafkaSourceConfig( topicName: String, bootstrapServers: String, - consumerConf: Map[String, String] + consumerConf: Map[String, String], + debounceCommitOffsets: FiniteDuration ) object KafkaSourceConfig { diff --git a/modules/kafka/src/test/scala/com/snowplowanalytics/snowplow/sources/kafka/KafkaSourceConfigSpec.scala b/modules/kafka/src/test/scala/com/snowplowanalytics/snowplow/sources/kafka/KafkaSourceConfigSpec.scala index c87a45e..5741c82 100644 --- a/modules/kafka/src/test/scala/com/snowplowanalytics/snowplow/sources/kafka/KafkaSourceConfigSpec.scala +++ b/modules/kafka/src/test/scala/com/snowplowanalytics/snowplow/sources/kafka/KafkaSourceConfigSpec.scala @@ -15,6 +15,8 @@ import io.circe.Decoder import io.circe.generic.semiauto._ import org.specs2.Specification +import scala.concurrent.duration.DurationLong + class KafkaSourceConfigSpec extends Specification { import KafkaSourceConfigSpec._ @@ -50,7 +52,8 @@ class KafkaSourceConfigSpec extends Specification { "security.protocol" -> "SASL_SSL", "sasl.mechanism" -> "OAUTHBEARER", "sasl.jaas.config" -> "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;" - ) + ), + debounceCommitOffsets = 10.seconds ) result.as[Wrapper] must beRight.like { case w: Wrapper => diff --git a/modules/kinesis/src/main/resources/reference.conf b/modules/kinesis/src/main/resources/reference.conf index d7d878c..617dd47 100644 --- a/modules/kinesis/src/main/resources/reference.conf +++ b/modules/kinesis/src/main/resources/reference.conf @@ -15,6 +15,7 @@ snowplow.defaults: { minBackoff: "100 millis" maxBackoff: "1 second" } + debounceCheckpoints: "10 seconds" } } diff --git a/modules/kinesis/src/main/scala/com/snowplowanalytics/snowplow/sources/kinesis/KinesisSource.scala b/modules/kinesis/src/main/scala/com/snowplowanalytics/snowplow/sources/kinesis/KinesisSource.scala index 9f87ddf..4ceb2ee 100644 --- 
a/modules/kinesis/src/main/scala/com/snowplowanalytics/snowplow/sources/kinesis/KinesisSource.scala +++ b/modules/kinesis/src/main/scala/com/snowplowanalytics/snowplow/sources/kinesis/KinesisSource.scala @@ -19,7 +19,7 @@ import software.amazon.kinesis.lifecycle.events.{ProcessRecordsInput, ShardEnded import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber import java.util.concurrent.{CountDownLatch, SynchronousQueue} -import scala.concurrent.duration.DurationLong +import scala.concurrent.duration.{DurationLong, FiniteDuration} import scala.jdk.CollectionConverters._ object KinesisSource { @@ -34,6 +34,8 @@ object KinesisSource { def checkpointer: KinesisCheckpointer[F] = new KinesisCheckpointer[F](config.checkpointThrottledBackoffPolicy) + + def debounceCheckpoints: FiniteDuration = config.debounceCheckpoints } } diff --git a/modules/kinesis/src/main/scala/com/snowplowanalytics/snowplow/sources/kinesis/KinesisSourceConfig.scala b/modules/kinesis/src/main/scala/com/snowplowanalytics/snowplow/sources/kinesis/KinesisSourceConfig.scala index 3d8c4c4..aa7ec31 100644 --- a/modules/kinesis/src/main/scala/com/snowplowanalytics/snowplow/sources/kinesis/KinesisSourceConfig.scala +++ b/modules/kinesis/src/main/scala/com/snowplowanalytics/snowplow/sources/kinesis/KinesisSourceConfig.scala @@ -35,6 +35,9 @@ import com.snowplowanalytics.snowplow.kinesis.BackoffPolicy * up/down and pod-rotation, we want the app to be quick to acquire shard-leases to process. With * bigger instances (more cores/processors) we tend to have more shard-leases per instance, so we * increase how aggressively it acquires leases. + * @param debounceCheckpoints + * How frequently to checkpoint our progress to the DynamoDB table. By increasing this value we + * can decrease the write-throughput requirements of the DynamoDB table. 
* * Other params are self-explanatory */ @@ -49,7 +52,8 @@ case class KinesisSourceConfig( cloudwatchCustomEndpoint: Option[URI], leaseDuration: FiniteDuration, maxLeasesToStealAtOneTimeFactor: BigDecimal, - checkpointThrottledBackoffPolicy: BackoffPolicy + checkpointThrottledBackoffPolicy: BackoffPolicy, + debounceCheckpoints: FiniteDuration ) object KinesisSourceConfig { diff --git a/modules/kinesis/src/test/scala/com/snowplowanalytics/snowplow/sources/kinesis/KinesisSourceConfigSpec.scala b/modules/kinesis/src/test/scala/com/snowplowanalytics/snowplow/sources/kinesis/KinesisSourceConfigSpec.scala index f613e44..cc64fc5 100644 --- a/modules/kinesis/src/test/scala/com/snowplowanalytics/snowplow/sources/kinesis/KinesisSourceConfigSpec.scala +++ b/modules/kinesis/src/test/scala/com/snowplowanalytics/snowplow/sources/kinesis/KinesisSourceConfigSpec.scala @@ -46,7 +46,8 @@ class KinesisSourceConfigSpec extends Specification { "checkpointThrottledBackoffPolicy": { "minBackoff": "100 millis", "maxBackoff": "1second" - } + }, + "debounceCheckpoints": "42 seconds" } """ @@ -59,7 +60,8 @@ class KinesisSourceConfigSpec extends Specification { c.retrievalMode must beEqualTo(KinesisSourceConfig.Retrieval.Polling(42)), c.leaseDuration must beEqualTo(20.seconds), c.maxLeasesToStealAtOneTimeFactor must beEqualTo(BigDecimal(0.42)), - c.checkpointThrottledBackoffPolicy must beEqualTo(BackoffPolicy(minBackoff = 100.millis, maxBackoff = 1.second)) + c.checkpointThrottledBackoffPolicy must beEqualTo(BackoffPolicy(minBackoff = 100.millis, maxBackoff = 1.second)), + c.debounceCheckpoints must beEqualTo(42.seconds) ).reduce(_ and _) } } @@ -82,7 +84,8 @@ class KinesisSourceConfigSpec extends Specification { "checkpointThrottledBackoffPolicy": { "minBackoff": "100 millis", "maxBackoff": "1second" - } + }, + "debounceCheckpoints": "42 seconds" } """ @@ -95,7 +98,8 @@ class KinesisSourceConfigSpec extends Specification { c.retrievalMode must 
beEqualTo(KinesisSourceConfig.Retrieval.Polling(42)), c.leaseDuration must beEqualTo(20.seconds), c.maxLeasesToStealAtOneTimeFactor must beEqualTo(BigDecimal(0.42)), - c.checkpointThrottledBackoffPolicy must beEqualTo(BackoffPolicy(minBackoff = 100.millis, maxBackoff = 1.second)) + c.checkpointThrottledBackoffPolicy must beEqualTo(BackoffPolicy(minBackoff = 100.millis, maxBackoff = 1.second)), + c.debounceCheckpoints must beEqualTo(42.seconds) ).reduce(_ and _) } } @@ -124,7 +128,8 @@ class KinesisSourceConfigSpec extends Specification { cloudwatchCustomEndpoint = None, leaseDuration = 10.seconds, maxLeasesToStealAtOneTimeFactor = BigDecimal(2.0), - checkpointThrottledBackoffPolicy = BackoffPolicy(minBackoff = 100.millis, maxBackoff = 1.second) + checkpointThrottledBackoffPolicy = BackoffPolicy(minBackoff = 100.millis, maxBackoff = 1.second), + debounceCheckpoints = 10.seconds ) result.as[Wrapper] must beRight.like { case w: Wrapper => diff --git a/modules/pubsub/src/main/scala/com/snowplowanalytics/snowplow/sources/pubsub/PubsubSource.scala b/modules/pubsub/src/main/scala/com/snowplowanalytics/snowplow/sources/pubsub/PubsubSource.scala index 59e2b6c..8c4d9ae 100644 --- a/modules/pubsub/src/main/scala/com/snowplowanalytics/snowplow/sources/pubsub/PubsubSource.scala +++ b/modules/pubsub/src/main/scala/com/snowplowanalytics/snowplow/sources/pubsub/PubsubSource.scala @@ -64,6 +64,8 @@ object PubsubSource { def stream: Stream[F, Stream[F, Option[LowLevelEvents[Vector[Unique.Token]]]]] = pubsubStream(config, deferredResources) + + def debounceCheckpoints: FiniteDuration = Duration.Zero } private def pubsubStream[F[_]: Async]( diff --git a/modules/streams-core/src/main/scala/com.snowplowanalytics.snowplow/sources/internal/LowLevelSource.scala b/modules/streams-core/src/main/scala/com.snowplowanalytics.snowplow/sources/internal/LowLevelSource.scala index 3ebc13a..495c520 100644 --- 
a/modules/streams-core/src/main/scala/com.snowplowanalytics.snowplow/sources/internal/LowLevelSource.scala +++ b/modules/streams-core/src/main/scala/com.snowplowanalytics.snowplow/sources/internal/LowLevelSource.scala @@ -7,7 +7,7 @@ */ package com.snowplowanalytics.snowplow.sources.internal -import cats.Monad +import cats.{Monad, Semigroup} import cats.implicits._ import cats.effect.std.Queue import cats.effect.kernel.{Ref, Unique} @@ -49,6 +49,14 @@ private[sources] trait LowLevelSource[F[_], C] { * `SourceAndAck` reporting itself as unhealthy. */ def stream: Stream[F, Stream[F, Option[LowLevelEvents[C]]]] + + /** + * How frequently we should checkpoint progress to this source + * + * E.g. for Kinesis we can increase this value to reduce how often we need to write to the DynamoDB + * table + */ + def debounceCheckpoints: FiniteDuration } private[sources] object LowLevelSource { @@ -118,7 +126,7 @@ private[sources] object LowLevelSource { .through(windowed(config.windowing)) val sinks = EagerWindows.pipes { control: EagerWindows.Control[F] => - CleanCancellation(messageSink(processor, acksRef, source.checkpointer, control)) + CleanCancellation(messageSink(processor, acksRef, source.checkpointer, control, source.debounceCheckpoints)) } tokenedSources @@ -223,20 +231,21 @@ private[sources] object LowLevelSource { * @param control * Controls the processing of eager windows. Prevents the next eager window from checkpointing * any events before the previous window is fully finalized. + * @param debounceCheckpoints + * Debounces how often we call the checkpointer.
*/ private def messageSink[F[_]: Async, C]( processor: EventProcessor[F], ref: Ref[F, AcksState[C]], checkpointer: Checkpointer[F, C], - control: EagerWindows.Control[F] + control: EagerWindows.Control[F], + debounceCheckpoints: FiniteDuration ): Pipe[F, TokenedEvents, Nothing] = _.evalTap { case TokenedEvents(events, _) => Logger[F].debug(s"Batch of ${events.size} events received from the source stream") } .through(processor) .chunks - .prefetch // This prefetch means we can ack messages concurrently with processing the next batch - .evalTap(_ => control.waitForPreviousWindow) .evalMap { chunk => chunk .traverse { token => @@ -249,10 +258,14 @@ private[sources] object LowLevelSource { case None => Async[F].raiseError[C](new IllegalStateException("Missing checkpoint for token")) } } - .flatMap { cs => - checkpointer.ack(checkpointer.combineAll(cs.toIterable)) + .map { cs => + checkpointer.combineAll(cs.toIterable) } } + .prefetch // This prefetch means we can ack messages concurrently with processing the next batch + .through(batchUpCheckpoints(debounceCheckpoints, checkpointer)) + .evalTap(_ => control.waitForPreviousWindow) + .evalMap(c => checkpointer.ack(c)) .drain .onFinalizeCase { case ExitCase.Succeeded => @@ -344,4 +357,38 @@ private[sources] object LowLevelSource { */ private def timeoutForFirstWindow(config: EventProcessingConfig.TimedWindows): FiniteDuration = (config.duration.toMillis * config.firstWindowScaling).toLong.milliseconds + + private def batchUpCheckpoints[F[_]: Async, C](timeout: FiniteDuration, semigroup: Semigroup[C]): Pipe[F, C, C] = { + + def go(timedPull: Pull.Timed[F, C], output: Option[C]): Pull[F, C, Unit] = + timedPull.uncons.flatMap { + case None => + // Upstream finished cleanly. Emit whatever is pending and we're done. + Pull.outputOption1(output) + case Some((Left(_), next)) => + // Timer timed-out. Emit whatever is pending. 
+ Pull.outputOption1(output) >> go(next, None) + case Some((Right(chunk), next)) => + // Upstream emitted tokens to us. We might already have pending tokens + output match { + case Some(c) => + go(next, Some(chunk.foldLeft(c)(semigroup.combine(_, _)))) + case None => + semigroup.combineAllOption(chunk.iterator) match { + case Some(c) => + next.timeout(timeout) >> go(next, Some(c)) + case None => + go(next, None) + } + } + } + + in => + if (timeout > Duration.Zero) + in.pull.timed { timedPull => + go(timedPull, None) + }.stream + else + in + } } diff --git a/modules/streams-core/src/test/scala/com.snowplowanalytics.snowplow/sources/internal/LowLevelSourceSpec.scala b/modules/streams-core/src/test/scala/com.snowplowanalytics.snowplow/sources/internal/LowLevelSourceSpec.scala index 4b6961c..5fec4e6 100644 --- a/modules/streams-core/src/test/scala/com.snowplowanalytics.snowplow/sources/internal/LowLevelSourceSpec.scala +++ b/modules/streams-core/src/test/scala/com.snowplowanalytics.snowplow/sources/internal/LowLevelSourceSpec.scala @@ -34,6 +34,7 @@ class LowLevelSourceSpec extends Specification with CatsEffect { - when time between batches is longer than the time to process a batch $e1 - when time between batches is shorter than the time to process a batch $e2 not checkpoint events if the event processor throws an exception $e3 + delay checkpoints according to the debounceCheckpoints config $e4 With a processor that operates on windows of events: process and checkpoint events in timed windows $windowed1 @@ -41,6 +42,7 @@ class LowLevelSourceSpec extends Specification with CatsEffect { not checkpoint events if the event processor throws an exception $windowed3 eagerly start windows when previous window is still finalizing $windowed4 use a short first window according to the configuration $windowed5 + not delay checkpoints at the end of a window when debounceCheckpoints is large $windowed6 When reporting healthy status report healthy when there are no events but the 
source emits periodic liveness pings $health1 @@ -203,6 +205,59 @@ class LowLevelSourceSpec extends Specification with CatsEffect { TestControl.executeEmbed(io) } + def e4 = { + + val config = EventProcessingConfig(EventProcessingConfig.NoWindowing, _ => IO.unit) + + val testConfig = TestSourceConfig( + batchesPerRebalance = 5, + eventsPerBatch = 8, + timeBetweenBatches = 20.seconds, + timeToProcessBatch = 1.second, + debounceCheckpoints = 30.seconds + ) + + val io = for { + refActions <- Ref[IO].of(Vector.empty[Action]) + sourceAndAck <- LowLevelSource.toSourceAndAck(testLowLevelSource(refActions, testConfig)) + processor = testProcessor(refActions, testConfig) + fiber <- sourceAndAck.stream(config, processor).compile.drain.start + _ <- IO.sleep(235.seconds) + _ <- fiber.cancel + result <- refActions.get + } yield result must beEqualTo( + Vector( + Action.ProcessorStartedWindow("1970-01-01T00:00:00Z"), + Action.ProcessorReceivedEvents("1970-01-01T00:00:00Z", List("1", "2", "3", "4", "5", "6", "7", "8")), + Action.ProcessorReceivedEvents("1970-01-01T00:00:20Z", List("9", "10", "11", "12", "13", "14", "15", "16")), + Action.Checkpointed(List("1", "2", "3", "4", "5", "6", "7", "8", "9", "10", "11", "12", "13", "14", "15", "16")), + Action.ProcessorReceivedEvents("1970-01-01T00:00:40Z", List("17", "18", "19", "20", "21", "22", "23", "24")), + Action.ProcessorReceivedEvents("1970-01-01T00:01:00Z", List("25", "26", "27", "28", "29", "30", "31", "32")), + Action.Checkpointed(List("17", "18", "19", "20", "21", "22", "23", "24", "25", "26", "27", "28", "29", "30", "31", "32")), + Action.ProcessorReceivedEvents("1970-01-01T00:01:20Z", List("33", "34", "35", "36", "37", "38", "39", "40")), + Action.ProcessorReachedEndOfWindow("1970-01-01T00:01:40Z"), + Action.Checkpointed(List("33", "34", "35", "36", "37", "38", "39", "40")), + Action.ProcessorStartedWindow("1970-01-01T00:01:40Z"), + Action.ProcessorReceivedEvents("1970-01-01T00:01:40Z", List("41", "42", "43", "44", "45", 
"46", "47", "48")), + Action.ProcessorReceivedEvents("1970-01-01T00:02:00Z", List("49", "50", "51", "52", "53", "54", "55", "56")), + Action.Checkpointed(List("41", "42", "43", "44", "45", "46", "47", "48", "49", "50", "51", "52", "53", "54", "55", "56")), + Action.ProcessorReceivedEvents("1970-01-01T00:02:20Z", List("57", "58", "59", "60", "61", "62", "63", "64")), + Action.ProcessorReceivedEvents("1970-01-01T00:02:40Z", List("65", "66", "67", "68", "69", "70", "71", "72")), + Action.Checkpointed(List("57", "58", "59", "60", "61", "62", "63", "64", "65", "66", "67", "68", "69", "70", "71", "72")), + Action.ProcessorReceivedEvents("1970-01-01T00:03:00Z", List("73", "74", "75", "76", "77", "78", "79", "80")), + Action.ProcessorReachedEndOfWindow("1970-01-01T00:03:20Z"), + Action.Checkpointed(List("73", "74", "75", "76", "77", "78", "79", "80")), + Action.ProcessorStartedWindow("1970-01-01T00:03:20Z"), + Action.ProcessorReceivedEvents("1970-01-01T00:03:20Z", List("81", "82", "83", "84", "85", "86", "87", "88")), + Action.ProcessorReceivedEvents("1970-01-01T00:03:40Z", List("89", "90", "91", "92", "93", "94", "95", "96")), + Action.Checkpointed(List("81", "82", "83", "84", "85", "86", "87", "88", "89", "90", "91", "92", "93", "94", "95", "96")), + Action.ProcessorReachedEndOfWindow("1970-01-01T00:03:55Z") + ) + ) + + TestControl.executeEmbed(io) + } + /** Specs for when the processor works with windows */ def windowed1 = { @@ -511,6 +566,115 @@ class LowLevelSourceSpec extends Specification with CatsEffect { TestControl.executeEmbed(io) } + def windowed6 = { + + val config = EventProcessingConfig(EventProcessingConfig.TimedWindows(45.seconds, 1.0, 2), _ => IO.unit) + + val testConfig = TestSourceConfig( + batchesPerRebalance = 5, + eventsPerBatch = 8, + timeBetweenBatches = 20.seconds, + timeToProcessBatch = 1.second, + debounceCheckpoints = 30.hours + ) + + val io = for { + refActions <- Ref[IO].of(Vector.empty[Action]) + sourceAndAck <- 
LowLevelSource.toSourceAndAck(testLowLevelSource(refActions, testConfig)) + processor = windowedProcessor(refActions, testConfig) + fiber <- sourceAndAck.stream(config, processor).compile.drain.start + _ <- IO.sleep(235.seconds) + _ <- fiber.cancel + result <- refActions.get + } yield result must beEqualTo( + Vector( + Action.ProcessorStartedWindow("1970-01-01T00:00:00Z"), + Action.ProcessorReceivedEvents("1970-01-01T00:00:00Z", List("1", "2", "3", "4", "5", "6", "7", "8")), + Action.ProcessorReceivedEvents("1970-01-01T00:00:20Z", List("9", "10", "11", "12", "13", "14", "15", "16")), + Action.ProcessorReceivedEvents("1970-01-01T00:00:40Z", List("17", "18", "19", "20", "21", "22", "23", "24")), + Action.ProcessorReachedEndOfWindow("1970-01-01T00:00:45Z"), + Action.Checkpointed( + List( + "1", + "2", + "3", + "4", + "5", + "6", + "7", + "8", + "9", + "10", + "11", + "12", + "13", + "14", + "15", + "16", + "17", + "18", + "19", + "20", + "21", + "22", + "23", + "24" + ) + ), + Action.ProcessorStartedWindow("1970-01-01T00:01:00Z"), + Action.ProcessorReceivedEvents("1970-01-01T00:01:00Z", List("25", "26", "27", "28", "29", "30", "31", "32")), + Action.ProcessorReceivedEvents("1970-01-01T00:01:20Z", List("33", "34", "35", "36", "37", "38", "39", "40")), + Action.ProcessorReachedEndOfWindow("1970-01-01T00:01:40Z"), + Action.Checkpointed(List("25", "26", "27", "28", "29", "30", "31", "32", "33", "34", "35", "36", "37", "38", "39", "40")), + Action.ProcessorStartedWindow("1970-01-01T00:01:40Z"), + Action.ProcessorReceivedEvents("1970-01-01T00:01:40Z", List("41", "42", "43", "44", "45", "46", "47", "48")), + Action.ProcessorReceivedEvents("1970-01-01T00:02:00Z", List("49", "50", "51", "52", "53", "54", "55", "56")), + Action.ProcessorReceivedEvents("1970-01-01T00:02:20Z", List("57", "58", "59", "60", "61", "62", "63", "64")), + Action.ProcessorReachedEndOfWindow("1970-01-01T00:02:25Z"), + Action.Checkpointed( + List( + "41", + "42", + "43", + "44", + "45", + "46", + "47", + 
"48", + "49", + "50", + "51", + "52", + "53", + "54", + "55", + "56", + "57", + "58", + "59", + "60", + "61", + "62", + "63", + "64" + ) + ), + Action.ProcessorStartedWindow("1970-01-01T00:02:40Z"), + Action.ProcessorReceivedEvents("1970-01-01T00:02:40Z", List("65", "66", "67", "68", "69", "70", "71", "72")), + Action.ProcessorReceivedEvents("1970-01-01T00:03:00Z", List("73", "74", "75", "76", "77", "78", "79", "80")), + Action.ProcessorReachedEndOfWindow("1970-01-01T00:03:20Z"), + Action.Checkpointed(List("65", "66", "67", "68", "69", "70", "71", "72", "73", "74", "75", "76", "77", "78", "79", "80")), + Action.ProcessorStartedWindow("1970-01-01T00:03:20Z"), + Action.ProcessorReceivedEvents("1970-01-01T00:03:20Z", List("81", "82", "83", "84", "85", "86", "87", "88")), + Action.ProcessorReceivedEvents("1970-01-01T00:03:40Z", List("89", "90", "91", "92", "93", "94", "95", "96")), + Action.ProcessorReachedEndOfWindow("1970-01-01T00:03:55Z"), + Action.Checkpointed(List("81", "82", "83", "84", "85", "86", "87", "88", "89", "90", "91", "92", "93", "94", "95", "96")) + ) + ) + + TestControl.executeEmbed(io) + } + /** Specs for health check */ def health1 = { @@ -521,6 +685,7 @@ class LowLevelSourceSpec extends Specification with CatsEffect { val lowLevelSource = new LowLevelSource[IO, Unit] { def checkpointer: Checkpointer[IO, Unit] = Checkpointer.acksOnly[IO, Unit](_ => IO.unit) def stream: Stream[IO, Stream[IO, Option[LowLevelEvents[Unit]]]] = Stream.emit(Stream.awakeDelay[IO](1.second).map(_ => None)) + def debounceCheckpoints: FiniteDuration = 42.seconds } val io = for { @@ -544,6 +709,7 @@ class LowLevelSourceSpec extends Specification with CatsEffect { val lowLevelSource = new LowLevelSource[IO, Unit] { def checkpointer: Checkpointer[IO, Unit] = Checkpointer.acksOnly[IO, Unit](_ => IO.unit) def stream: Stream[IO, Stream[IO, Option[LowLevelEvents[Unit]]]] = Stream.emit(Stream.never[IO]) + def debounceCheckpoints: FiniteDuration = 42.seconds } val io = for { @@ -642,6 
+808,7 @@ class LowLevelSourceSpec extends Specification with CatsEffect { Stream.fixedDelay[IO](5.minutes).map { _ => Stream.emit(Some(LowLevelEvents(Chunk.empty, (), None))) } + def debounceCheckpoints: FiniteDuration = 42.seconds } val io = for { @@ -664,6 +831,7 @@ class LowLevelSourceSpec extends Specification with CatsEffect { val lowLevelSource = new LowLevelSource[IO, Unit] { def checkpointer: Checkpointer[IO, Unit] = Checkpointer.acksOnly[IO, Unit](_ => IO.unit) def stream: Stream[IO, Stream[IO, Option[LowLevelEvents[Unit]]]] = Stream.emit(Stream.never[IO]) + def debounceCheckpoints: FiniteDuration = 42.seconds } val io = for { @@ -689,6 +857,7 @@ class LowLevelSourceSpec extends Specification with CatsEffect { val lowLevelSource = new LowLevelSource[IO, Unit] { def checkpointer: Checkpointer[IO, Unit] = Checkpointer.acksOnly[IO, Unit](_ => IO.unit) def stream: Stream[IO, Stream[IO, Option[LowLevelEvents[Unit]]]] = Stream.emit(Stream.never[IO]) + def debounceCheckpoints: FiniteDuration = 42.seconds } val io = for { @@ -763,6 +932,7 @@ class LowLevelSourceSpec extends Specification with CatsEffect { val lowLevelSource = new LowLevelSource[IO, Unit] { def checkpointer: Checkpointer[IO, Unit] = Checkpointer.acksOnly[IO, Unit](_ => IO.unit) def stream: Stream[IO, Stream[IO, Option[LowLevelEvents[Unit]]]] = Stream.emit(Stream.never[IO]) + def debounceCheckpoints: FiniteDuration = 42.seconds } val io = for { @@ -786,6 +956,7 @@ class LowLevelSourceSpec extends Specification with CatsEffect { val lowLevelSource = new LowLevelSource[IO, Unit] { def checkpointer: Checkpointer[IO, Unit] = Checkpointer.acksOnly[IO, Unit](_ => IO.unit) def stream: Stream[IO, Stream[IO, Option[LowLevelEvents[Unit]]]] = Stream.emit(Stream.awakeDelay[IO](1.second).map(_ => None)) + def debounceCheckpoints: FiniteDuration = 42.seconds } val io = for { @@ -849,7 +1020,8 @@ object LowLevelSourceSpec { timeBetweenBatches: FiniteDuration, timeToProcessBatch: FiniteDuration, 
timeToFinalizeWindow: FiniteDuration = 0.seconds, - streamTstamp: Instant = Instant.EPOCH + streamTstamp: Instant = Instant.EPOCH, + debounceCheckpoints: FiniteDuration = 1.millis ) /** @@ -946,6 +1118,7 @@ object LowLevelSourceSpec { .mergeHaltL(Stream.awakeDelay[IO](1.second).map(_ => None).repeat) } } + def debounceCheckpoints: FiniteDuration = config.debounceCheckpoints } }