Sources should report stream latency of stuck events - amendments 1
The `SourceAndAck` is now responsible for pushing the latency metric.  This
consolidates the metric logic, previously duplicated across multiple apps, into one place.

The `TokenedEvents` no longer contains a stream timestamp.  The downstream
app has no business knowing the stream timestamp, now that latency is
calculated inside the common-streams lib.

The `LowLevelSource` no longer provides a `lastLiveness`.  Instead, the
low-level source is expected to periodically emit a `None` whenever it is
healthy. The higher-level `SourceAndAck` then takes responsibility for
monitoring whether the low-level source is healthy.
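
A minimal sketch of that contract, assuming fs2 streams as used throughout the library (the `withKeepalives` helper and the 10-second period are illustrative, not the library's actual code):

```scala
import scala.concurrent.duration.DurationInt

import cats.effect.IO
import fs2.Stream

// Real events are wrapped in Some; a periodic None means "healthy but idle".
// mergeHaltL halts when the event stream halts, so the ticks never outlive it.
def withKeepalives[A](events: Stream[IO, A]): Stream[IO, Option[A]] =
  events
    .map(Option(_))
    .mergeHaltL(Stream.awakeDelay[IO](10.seconds).map(_ => Option.empty[A]))
```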

As a result of these changes, the `SourceAndAck` emits a latency metric
of zero whenever the source is genuinely healthy but there are no events
to be pulled. This means the app's latency metric is only zero if there
are no events.  This is better than the previous situation, where the
app might misleadingly emit a zero latency while stuck, e.g. when it
cannot connect to the warehouse.
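
For a downstream app, the practical difference is that the metric callback is supplied at configuration time instead of being computed from `TokenedEvents`. A hypothetical wiring, assuming a `Metrics` service with a `setLatency` method (the metrics API is app-specific and not part of this commit):

```scala
import scala.concurrent.duration.FiniteDuration

import cats.effect.IO

import com.snowplowanalytics.snowplow.sources.EventProcessingConfig
import com.snowplowanalytics.snowplow.sources.EventProcessingConfig.NoWindowing

trait Metrics {
  def setLatency(latency: FiniteDuration): IO[Unit] // hypothetical metrics API
}

// The library decides when to report latency; the app only supplies the callback.
def processingConfig(metrics: Metrics): EventProcessingConfig[IO] =
  EventProcessingConfig(NoWindowing, latency => metrics.setLatency(latency))
```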
istreeter committed Jan 6, 2025
1 parent 98a9d8d commit 9b1b094
Showing 11 changed files with 286 additions and 187 deletions.
@@ -24,8 +24,6 @@ import com.snowplowanalytics.snowplow.sources.EventProcessingConfig.NoWindowing
 import com.snowplowanalytics.snowplow.it.DockerPull
 import com.snowplowanalytics.snowplow.it.kinesis._
 
-import java.time.Instant
-
 import Utils._
 
 import org.specs2.specification.BeforeAll
@@ -60,23 +58,21 @@ class KinesisSourceSpec
 
     for {
       refProcessed <- Ref[IO].of[List[ReceivedEvents]](Nil)
-      t1 <- IO.realTimeInstant
       _ <- putDataToKinesis(kinesisClient, testStream1Name, testPayload)
-      t2 <- IO.realTimeInstant
-      processingConfig = new EventProcessingConfig(NoWindowing)
+      refReportedLatencies <- Ref[IO].of(Vector.empty[FiniteDuration])
+      processingConfig = EventProcessingConfig(NoWindowing, tstamp => refReportedLatencies.update(_ :+ tstamp))
       kinesisConfig = getKinesisSourceConfig(testStream1Name)
       sourceAndAck <- KinesisSource.build[IO](kinesisConfig)
       stream = sourceAndAck.stream(processingConfig, testProcessor(refProcessed))
       fiber <- stream.compile.drain.start
       _ <- IO.sleep(2.minutes)
       processed <- refProcessed.get
+      reportedLatencies <- refReportedLatencies.get
       _ <- fiber.cancel
     } yield List(
       processed must haveSize(1),
       processed.head.events must beEqualTo(List(testPayload)),
-      processed.head.tstamp must beSome { tstamp: Instant =>
-        tstamp.toEpochMilli must beBetween(t1.toEpochMilli, t2.toEpochMilli)
-      }
+      reportedLatencies.max must beBetween(1.second, 2.minutes)
     ).reduce(_ and _)
   }
 }
@@ -24,14 +24,13 @@ import com.snowplowanalytics.snowplow.sinks.kinesis.KinesisSinkConfig
 import java.net.URI
 import java.nio.charset.StandardCharsets
 import java.util.UUID
-import java.time.Instant
 import scala.concurrent.duration.DurationLong
 
 import software.amazon.awssdk.services.kinesis.model.DescribeStreamRequest
 
 object Utils {
 
-  case class ReceivedEvents(events: List[String], tstamp: Option[Instant])
+  case class ReceivedEvents(events: List[String])
 
   def putDataToKinesis(
     client: KinesisAsyncClient,
@@ -81,7 +80,7 @@ object Utils {
       .build()
 
     val out =
-      ReceivedEvents(client.getRecords(request).get().records().asScala.toList.map(record => new String(record.data.asByteArray())), None)
+      ReceivedEvents(client.getRecords(request).get().records().asScala.toList.map(record => new String(record.data.asByteArray())))
     out
   }
 
@@ -108,10 +107,10 @@
   )
 
   def testProcessor(ref: Ref[IO, List[ReceivedEvents]]): EventProcessor[IO] =
-    _.evalMap { case TokenedEvents(events, token, tstamp) =>
+    _.evalMap { case TokenedEvents(events, token) =>
       val parsed = events.map(byteBuffer => StandardCharsets.UTF_8.decode(byteBuffer).toString)
       for {
-        _ <- ref.update(_ :+ ReceivedEvents(parsed.toList, tstamp))
+        _ <- ref.update(_ :+ ReceivedEvents(parsed.toList))
       } yield token
     }
 
@@ -19,8 +19,7 @@ import org.typelevel.log4cats.slf4j.Slf4jLogger
 import scala.reflect._
 
 import java.nio.ByteBuffer
-import java.time.Instant
-import scala.concurrent.duration.FiniteDuration
+import scala.concurrent.duration.DurationLong
 
 // kafka
 import fs2.kafka._
@@ -48,11 +47,8 @@ object KafkaSource {
     new LowLevelSource[F, KafkaCheckpoints[F]] {
       def checkpointer: Checkpointer[F, KafkaCheckpoints[F]] = kafkaCheckpointer
 
-      def stream: Stream[F, Stream[F, LowLevelEvents[KafkaCheckpoints[F]]]] =
+      def stream: Stream[F, Stream[F, Option[LowLevelEvents[KafkaCheckpoints[F]]]]] =
         kafkaStream(config, authHandlerClass)
-
-      def lastLiveness: F[FiniteDuration] =
-        Sync[F].realTime
     }
 
   case class OffsetAndCommit[F[_]](offset: Long, commit: F[Unit])
@@ -75,7 +71,7 @@
   private def kafkaStream[F[_]: Async, T <: AzureAuthenticationCallbackHandler](
     config: KafkaSourceConfig,
     authHandlerClass: ClassTag[T]
-  ): Stream[F, Stream[F, LowLevelEvents[KafkaCheckpoints[F]]]] =
+  ): Stream[F, Stream[F, Option[LowLevelEvents[KafkaCheckpoints[F]]]]] =
     KafkaConsumer
       .stream(consumerSettings[F, T](config, authHandlerClass))
       .evalTap(_.subscribeTo(config.topicName))
@@ -89,7 +85,7 @@
 
   private def joinPartitions[F[_]: Async](
     partitioned: PartitionedStreams[F]
-  ): Stream[F, LowLevelEvents[KafkaCheckpoints[F]]] = {
+  ): Stream[F, Option[LowLevelEvents[KafkaCheckpoints[F]]]] = {
     val streams = partitioned.toSeq.map { case (topicPartition, stream) =>
       stream.chunks
         .flatMap { chunk =>
@@ -103,8 +99,8 @@
             val ts = ccr.record.timestamp
             ts.logAppendTime.orElse(ts.createTime).orElse(ts.unknownTime)
           }
-          val earliestTimestamp = if (timestamps.isEmpty) None else Some(Instant.ofEpochMilli(timestamps.min))
-          Stream.emit(LowLevelEvents(events, ack, earliestTimestamp))
+          val earliestTimestamp = if (timestamps.isEmpty) None else Some(timestamps.min.millis)
+          Stream.emit(Some(LowLevelEvents(events, ack, earliestTimestamp)))
         case None =>
           Stream.empty
       }
@@ -117,6 +113,7 @@
     Stream
       .emits(streams)
       .parJoinUnbounded
+      .mergeHaltL(Stream.awakeDelay(10.seconds).map(_ => None).repeat) // keepalives
       .onFinalize {
         Logger[F].info(s"Stopping processing of partitions: $formatted")
       }
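
A note on the design choice above: `mergeHaltL` halts as soon as the left-hand (event) stream halts, so the periodic `None` ticks keep the liveness signal flowing through quiet periods without holding the joined partition streams open once they finish.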
@@ -7,7 +7,7 @@
  */
 package com.snowplowanalytics.snowplow.sources.kinesis
 
-import cats.effect.{Async, Ref, Sync}
+import cats.effect.{Async, Sync}
 import cats.data.NonEmptyList
 import cats.implicits._
 import com.snowplowanalytics.snowplow.sources.SourceAndAck
@@ -19,50 +19,43 @@ import software.amazon.kinesis.lifecycle.events.{ProcessRecordsInput, ShardEndedInput}
 import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber
 
 import java.util.concurrent.{CountDownLatch, SynchronousQueue}
-import scala.concurrent.duration.FiniteDuration
+import scala.concurrent.duration.DurationLong
 import scala.jdk.CollectionConverters._
 
 object KinesisSource {
 
   private implicit def logger[F[_]: Sync]: SelfAwareStructuredLogger[F] = Slf4jLogger.getLogger[F]
 
   def build[F[_]: Async](config: KinesisSourceConfig): F[SourceAndAck[F]] =
-    Ref.ofEffect(Sync[F].realTime).flatMap { liveness =>
-      LowLevelSource.toSourceAndAck {
-        new LowLevelSource[F, Map[String, Checkpointable]] {
-          def stream: Stream[F, Stream[F, LowLevelEvents[Map[String, Checkpointable]]]] =
-            kinesisStream(config, liveness)
+    LowLevelSource.toSourceAndAck {
+      new LowLevelSource[F, Map[String, Checkpointable]] {
+        def stream: Stream[F, Stream[F, Option[LowLevelEvents[Map[String, Checkpointable]]]]] =
+          kinesisStream(config)
 
-          def checkpointer: KinesisCheckpointer[F] =
-            new KinesisCheckpointer[F](config.checkpointThrottledBackoffPolicy)
-
-          def lastLiveness: F[FiniteDuration] =
-            liveness.get
-        }
+        def checkpointer: KinesisCheckpointer[F] =
+          new KinesisCheckpointer[F](config.checkpointThrottledBackoffPolicy)
       }
     }
 
   private def kinesisStream[F[_]: Async](
-    config: KinesisSourceConfig,
-    liveness: Ref[F, FiniteDuration]
-  ): Stream[F, Stream[F, LowLevelEvents[Map[String, Checkpointable]]]] = {
+    config: KinesisSourceConfig
+  ): Stream[F, Stream[F, Option[LowLevelEvents[Map[String, Checkpointable]]]]] = {
     val actionQueue = new SynchronousQueue[KCLAction]()
     for {
       _ <- Stream.resource(KCLScheduler.populateQueue[F](config, actionQueue))
-      events <- Stream.emit(pullFromQueueAndEmit(actionQueue, liveness).stream).repeat
+      events <- Stream.emit(pullFromQueueAndEmit(actionQueue).stream).repeat
     } yield events
   }
 
   private def pullFromQueueAndEmit[F[_]: Sync](
-    queue: SynchronousQueue[KCLAction],
-    liveness: Ref[F, FiniteDuration]
-  ): Pull[F, LowLevelEvents[Map[String, Checkpointable]], Unit] =
-    Pull.eval(pullFromQueue(queue, liveness)).flatMap { case PullFromQueueResult(actions, hasShardEnd) =>
+    queue: SynchronousQueue[KCLAction]
+  ): Pull[F, Option[LowLevelEvents[Map[String, Checkpointable]]], Unit] =
+    Pull.eval(pullFromQueue(queue)).flatMap { case PullFromQueueResult(actions, hasShardEnd) =>
       val toEmit = actions.traverse {
         case KCLAction.ProcessRecords(_, processRecordsInput) if processRecordsInput.records.asScala.isEmpty =>
-          Pull.done
+          Pull.output1(None)
         case KCLAction.ProcessRecords(shardId, processRecordsInput) =>
-          Pull.output1(provideNextChunk(shardId, processRecordsInput)).covary[F]
+          Pull.output1(Some(provideNextChunk(shardId, processRecordsInput))).covary[F]
         case KCLAction.ShardEnd(shardId, await, shardEndedInput) =>
           handleShardEnd[F](shardId, await, shardEndedInput)
         case KCLAction.KCLError(t) =>
@@ -78,14 +71,13 @@
       }
       Pull.eval(log).covaryOutput *> toEmit *> Pull.done
     } else
-      toEmit *> pullFromQueueAndEmit(queue, liveness)
+      toEmit *> pullFromQueueAndEmit(queue)
   }
 
   private case class PullFromQueueResult(actions: NonEmptyList[KCLAction], hasShardEnd: Boolean)
 
-  private def pullFromQueue[F[_]: Sync](queue: SynchronousQueue[KCLAction], liveness: Ref[F, FiniteDuration]): F[PullFromQueueResult] =
+  private def pullFromQueue[F[_]: Sync](queue: SynchronousQueue[KCLAction]): F[PullFromQueueResult] =
     resolveNextAction(queue)
-      .productL(updateLiveness(liveness))
       .flatMap {
         case shardEnd: KCLAction.ShardEnd =>
           // If we reached the end of one shard, it is likely we reached the end of other shards too.
@@ -112,9 +104,6 @@
       _ <- Sync[F].delay(queue.drainTo(ret))
     } yield ret.asScala.toList
 
-  private def updateLiveness[F[_]: Sync](liveness: Ref[F, FiniteDuration]): F[Unit] =
-    Sync[F].realTime.flatMap(now => liveness.set(now))
-
   private def provideNextChunk(shardId: String, input: ProcessRecordsInput) = {
     val chunk = Chunk.javaList(input.records()).map(_.data())
     val lastRecord = input.records.asScala.last // last is safe because we handled the empty case above
@@ -123,17 +112,21 @@
       new ExtendedSequenceNumber(lastRecord.sequenceNumber, lastRecord.subSequenceNumber),
       input.checkpointer
     )
-    LowLevelEvents(chunk, Map[String, Checkpointable](shardId -> checkpointable), Some(firstRecord.approximateArrivalTimestamp))
+    LowLevelEvents(
+      chunk,
+      Map[String, Checkpointable](shardId -> checkpointable),
+      Some(firstRecord.approximateArrivalTimestamp.toEpochMilli.millis)
+    )
   }
 
   private def handleShardEnd[F[_]](
     shardId: String,
     await: CountDownLatch,
     shardEndedInput: ShardEndedInput
-  ): Pull[F, LowLevelEvents[Map[String, Checkpointable]], Unit] = {
+  ): Pull[F, Option[LowLevelEvents[Map[String, Checkpointable]]], Unit] = {
     val checkpointable = Checkpointable.ShardEnd(shardEndedInput.checkpointer, await)
     val last = LowLevelEvents(Chunk.empty, Map[String, Checkpointable](shardId -> checkpointable), None)
-    Pull.output1(last)
+    Pull.output1(Some(last))
   }
 
 }
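
On the Kinesis side no timer is needed, presumably because the KCL is configured to invoke `processRecords` even for empty record lists: where the old code swallowed an empty batch with `Pull.done`, the new code emits a `None`, which the higher-level `SourceAndAck` can treat as evidence of liveness.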
@@ -14,8 +14,6 @@ import fs2.{Chunk, Stream}
 import org.typelevel.log4cats.Logger
 import org.typelevel.log4cats.slf4j.Slf4jLogger
 
-import java.time.Instant
-
 // pubsub
 import com.google.api.gax.core.{ExecutorProvider, FixedExecutorProvider}
 import com.google.api.gax.grpc.ChannelPoolSettings
@@ -31,7 +29,7 @@ import com.snowplowanalytics.snowplow.sources.SourceAndAck
 import com.snowplowanalytics.snowplow.sources.internal.{Checkpointer, LowLevelEvents, LowLevelSource}
 import com.snowplowanalytics.snowplow.sources.pubsub.PubsubRetryOps.implicits._
 
-import scala.concurrent.duration.{Duration, FiniteDuration}
+import scala.concurrent.duration.{Duration, DurationLong, FiniteDuration}
 import scala.jdk.CollectionConverters._
 
 import java.util.concurrent.{ExecutorService, Executors}
@@ -64,17 +62,14 @@ object PubsubSource {
       new LowLevelSource[F, Vector[Unique.Token]] {
         def checkpointer: Checkpointer[F, Vector[Unique.Token]] = new PubsubCheckpointer(config.subscription, deferredResources)
 
-        def stream: Stream[F, Stream[F, LowLevelEvents[Vector[Unique.Token]]]] =
+        def stream: Stream[F, Stream[F, Option[LowLevelEvents[Vector[Unique.Token]]]]] =
           pubsubStream(config, deferredResources)
-
-        def lastLiveness: F[FiniteDuration] =
-          Sync[F].realTime
       }
 
   private def pubsubStream[F[_]: Async](
     config: PubsubSourceConfig,
     deferredResources: Deferred[F, PubsubCheckpointer.Resources[F]]
-  ): Stream[F, Stream[F, LowLevelEvents[Vector[Unique.Token]]]] =
+  ): Stream[F, Stream[F, Option[LowLevelEvents[Vector[Unique.Token]]]]] =
     for {
       parallelPullCount <- Stream.eval(Sync[F].delay(chooseNumParallelPulls(config)))
       stub <- Stream.resource(stubResource(config))
@@ -83,7 +78,6 @@
     } yield Stream
       .fixedRateStartImmediately(config.debounceRequests, dampen = true)
       .parEvalMapUnordered(parallelPullCount)(_ => pullAndManageState(config, stub, refStates))
-      .unNone
       .prefetchN(parallelPullCount)
       .concurrently(extendDeadlines(config, stub, refStates))
       .onFinalize(nackRefStatesForShutdown(config, stub, refStates))
@@ -124,10 +118,10 @@
     }
   }
 
-  private def earliestTimestampOfRecords(records: Vector[ReceivedMessage]): Instant = {
+  private def earliestTimestampOfRecords(records: Vector[ReceivedMessage]): FiniteDuration = {
     val (tstampSeconds, tstampNanos) =
       records.map(r => (r.getMessage.getPublishTime.getSeconds, r.getMessage.getPublishTime.getNanos)).min
-    Instant.ofEpochSecond(tstampSeconds, tstampNanos.toLong)
+    tstampSeconds.seconds + tstampNanos.toLong.nanos
   }
 
   /**
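
Removing `.unNone` plays the same role for Pub/Sub: `pullAndManageState` evidently yields `None` when a pull returns no messages, and rather than filtering those out, the stream now forwards them downstream as keepalives.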
@@ -25,8 +25,11 @@ import scala.concurrent.duration.FiniteDuration
  * Whether to open a new [[EventProcessor]] to handle a timed window of events (e.g. for the
  * transformer) or whether to feed events to a single [[EventProcessor]] in a continuous endless
  * stream (e.g. Enrich)
+ *
+ * @param latencyConsumer
+ *   common-streams apps should use the latencyConsumer to set the latency metric.
  */
-case class EventProcessingConfig(windowing: EventProcessingConfig.Windowing)
+case class EventProcessingConfig[F[_]](windowing: EventProcessingConfig.Windowing, latencyConsumer: FiniteDuration => F[Unit])
 
 object EventProcessingConfig {
 
@@ -33,7 +33,7 @@ trait SourceAndAck[F[_]] {
    * @return
    *   A stream which should be compiled and drained
    */
-  def stream(config: EventProcessingConfig, processor: EventProcessor[F]): Stream[F, Nothing]
+  def stream(config: EventProcessingConfig[F], processor: EventProcessor[F]): Stream[F, Nothing]
 
   /**
    * Reports on whether the source of events is healthy
@@ -11,7 +11,6 @@ import cats.effect.kernel.Unique
 import fs2.Chunk
 
 import java.nio.ByteBuffer
-import java.time.Instant
 
 /**
  * The events as they are fed into a [[EventProcessor]]
@@ -22,12 +21,8 @@ import java.time.Instant
  * The [[EventProcessor]] must emit this token after it has fully processed the batch of events.
  * When the [[EventProcessor]] emits the token, it is an instruction to the [[SourceAndAck]] to
  * ack/checkpoint the events.
- * @param earliestSourceTstamp
- *   The timestamp that an event was originally written to the source stream. Used for calculating
- *   the latency metric.
  */
 case class TokenedEvents(
   events: Chunk[ByteBuffer],
-  ack: Unique.Token,
-  earliestSourceTstamp: Option[Instant]
+  ack: Unique.Token
 )
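
For reference, an `EventProcessor` now destructures only two fields. A sketch, assuming `EventProcessor` lives alongside `TokenedEvents` in the sources package (the decoding logic is illustrative):

```scala
import java.nio.charset.StandardCharsets

import cats.effect.IO

import com.snowplowanalytics.snowplow.sources.{EventProcessor, TokenedEvents}

// The pattern match binds just the payloads and the ack token.
def processor: EventProcessor[IO] =
  _.evalMap { case TokenedEvents(events, token) =>
    val decoded = events.map(bytes => StandardCharsets.UTF_8.decode(bytes).toString)
    IO.println(s"Processing batch of ${decoded.size}").as(token)
  }
```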
@@ -10,16 +10,19 @@ package com.snowplowanalytics.snowplow.sources.internal
 import fs2.Chunk
 
 import java.nio.ByteBuffer
-import java.time.Instant
+import scala.concurrent.duration.FiniteDuration
 
 /**
  * The events and checkpointable item emitted by a LowLevelSource
  *
  * This library uses LowLevelEvents internally, but it is never exposed to the high level event
  * processor
+ *
+ * @param earliestSourceTstamp
+ *   A point in time, represented as a `FiniteDuration` since epoch
  */
 case class LowLevelEvents[C](
   events: Chunk[ByteBuffer],
   ack: C,
-  earliestSourceTstamp: Option[Instant]
+  earliestSourceTstamp: Option[FiniteDuration]
 )
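
Representing the timestamp as a `FiniteDuration` since the epoch pairs naturally with `Clock#realTime`, which also returns a `FiniteDuration`, so the latency calculation becomes a plain subtraction. A sketch of the idea, not the library's internal code:

```scala
import scala.concurrent.duration.{Duration, FiniteDuration}

import cats.effect.{Clock, IO}

// Zero latency when there is no timestamp, i.e. no events waiting in the stream.
def latencyOf(earliestSourceTstamp: Option[FiniteDuration]): IO[FiniteDuration] =
  Clock[IO].realTime.map { now =>
    earliestSourceTstamp.fold(Duration.Zero)(tstamp => now - tstamp)
  }
```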