From d5f509b581a603055c1879799945868d80c5f21a Mon Sep 17 00:00:00 2001
From: Ian Streeter
Date: Fri, 3 Jan 2025 09:03:17 +0000
Subject: [PATCH] PubSub Source using Unary Pull (#101)

The previous implementation of the PubSub Source was a wrapper around `Subscriber` provided by the 3rd-party pubsub SDK. That `Subscriber` is a wrapper around a lower-level GRPC stub. It used the "Streaming Pull" GRPC method to fetch messages from PubSub.

This commit abandons the `Subscriber` and instead wraps the GRPC stub directly. It uses the "Unary Pull" GRPC method to fetch messages from PubSub. We found that "Unary Pull" alleviates a problem in which PubSub occasionally re-delivers the same messages, causing downstream duplicates. The problem happened especially in apps like Lake Loader, which builds up a very large number of un-acked messages and then acks them all in one go at the end of a timed window.

Compared with the previous Source implementation, it has these differences in behaviour:

- Previously, the 3rd-party `Subscriber` managed ack extensions (a.k.a. modifying ack deadlines), adjusting the extension period dynamically according to runtime heuristics of message processing times. In this new Source, the ack extension period is a fixed, configurable duration.

- Previously, the `Subscriber` periodically mod-acked all unacked messages currently held in memory. The new Source only mod-acks messages when they are approaching their ack deadline. This is an improvement for apps like Lake Loader, which might have a very large number of outstanding unacked messages.

- Unary Pull has _slightly_ worse latency compared to Streaming Pull. I consider this ok for common-streams apps, where the latency caused by the Source is negligible compared to other latencies in the app.

- Arguably, "Unary Pull" is a neater fit (less hacky) for a common-streams Source. Now, we simply pull a batch when a batch is needed.
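To illustrate, the essence of the new approach reduces to three unary RPCs on the GRPC stub. This is a sketch only, not code from this patch: the real Source wraps these calls in cats-effect/fs2 with retries and shared batch state, and the project, subscription, and deadline values below are placeholders.

    import com.google.cloud.pubsub.v1.stub.{GrpcSubscriberStub, SubscriberStubSettings}
    import com.google.pubsub.v1.{AcknowledgeRequest, ModifyAckDeadlineRequest, ProjectSubscriptionName, PullRequest}
    import scala.jdk.CollectionConverters._

    object UnaryPullSketch extends App {
      val subscription = ProjectSubscriptionName.of("my-project", "my-subscription").toString
      val stub         = GrpcSubscriberStub.create(SubscriberStubSettings.newBuilder().build())
      try {
        // Pull a batch only when the app wants one -- the "Unary Pull" GRPC method
        val response = stub.pullCallable.call(
          PullRequest.newBuilder.setSubscription(subscription).setMaxMessages(1000).build
        )
        val ackIds = response.getReceivedMessagesList.asScala.toVector.map(_.getAckId)

        // Extend the ack deadline by a fixed period ("modack"); the Source repeats this
        // only for messages approaching their deadline. A modack of zero seconds is
        // effectively a nack, telling PubSub to re-deliver immediately.
        stub.modifyAckDeadlineCallable.call(
          ModifyAckDeadlineRequest.newBuilder
            .setSubscription(subscription)
            .addAllAckIds(ackIds.asJava)
            .setAckDeadlineSeconds(60) // corresponds to `durationPerAckExtension`
            .build
        )

        // ... process the batch, then ack it all in one go at the end of the window
        stub.acknowledgeCallable.call(
          AcknowledgeRequest.newBuilder.setSubscription(subscription).addAllAckIds(ackIds.asJava).build
        )
      } finally stub.shutdownNow()
    }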
--- .../pubsub/src/main/resources/reference.conf | 10 +- .../snowplow/pubsub/FutureInterop.scala | 6 +- .../sources/pubsub/PubsubBatchState.scala | 24 + .../sources/pubsub/PubsubCheckpointer.scala | 103 ++++ .../sources/pubsub/PubsubRetryOps.scala | 62 +++ .../sources/pubsub/PubsubSource.scala | 474 ++++++++---------- .../sources/pubsub/PubsubSourceConfig.scala | 47 +- .../snowplow/sources/pubsub/Utils.scala | 49 ++ .../pubsub/PubsubSourceConfigSpec.scala | 16 +- 9 files changed, 491 insertions(+), 300 deletions(-) create mode 100644 modules/pubsub/src/main/scala/com/snowplowanalytics/snowplow/sources/pubsub/PubsubBatchState.scala create mode 100644 modules/pubsub/src/main/scala/com/snowplowanalytics/snowplow/sources/pubsub/PubsubCheckpointer.scala create mode 100644 modules/pubsub/src/main/scala/com/snowplowanalytics/snowplow/sources/pubsub/PubsubRetryOps.scala create mode 100644 modules/pubsub/src/main/scala/com/snowplowanalytics/snowplow/sources/pubsub/Utils.scala diff --git a/modules/pubsub/src/main/resources/reference.conf b/modules/pubsub/src/main/resources/reference.conf index dc03ea3c..bd3a0d18 100644 --- a/modules/pubsub/src/main/resources/reference.conf +++ b/modules/pubsub/src/main/resources/reference.conf @@ -2,15 +2,13 @@ snowplow.defaults: { sources: { pubsub: { parallelPullFactor: 0.5 - bufferMaxBytes: 10000000 - maxAckExtensionPeriod: "1 hour" - minDurationPerAckExtension: "60 seconds" - maxDurationPerAckExtension: "600 seconds" + durationPerAckExtension: "60 seconds" + minRemainingAckDeadline: 0.1 + maxMessagesPerPull: 1000 + debounceRequests: "100 millis" gcpUserAgent: { productName: "Snowplow OSS" } - shutdownTimeout: "30 seconds" - maxPullsPerTransportChannel: 16 } } diff --git a/modules/pubsub/src/main/scala/com/snowplowanalytics/snowplow/pubsub/FutureInterop.scala b/modules/pubsub/src/main/scala/com/snowplowanalytics/snowplow/pubsub/FutureInterop.scala index 434fe51d..3a9b0be9 100644 --- a/modules/pubsub/src/main/scala/com/snowplowanalytics/snowplow/pubsub/FutureInterop.scala +++ b/modules/pubsub/src/main/scala/com/snowplowanalytics/snowplow/pubsub/FutureInterop.scala @@ -13,7 +13,7 @@ import com.google.api.core.{ApiFuture, ApiFutureCallback, ApiFutures} import com.google.common.util.concurrent.MoreExecutors object FutureInterop { - def fromFuture[F[_]: Async, A](fut: ApiFuture[A]): F[Unit] = + def fromFuture[F[_]: Async, A](fut: ApiFuture[A]): F[A] = Async[F] .async[A] { cb => val cancel = Async[F].delay { @@ -24,7 +24,9 @@ object FutureInterop { Some(cancel) } } - .void + + def fromFuture_[F[_]: Async, A](fut: ApiFuture[A]): F[Unit] = + fromFuture(fut).void private def addCallback[A](fut: ApiFuture[A], cb: Either[Throwable, A] => Unit): Unit = { val apiFutureCallback = new ApiFutureCallback[A] { diff --git a/modules/pubsub/src/main/scala/com/snowplowanalytics/snowplow/sources/pubsub/PubsubBatchState.scala b/modules/pubsub/src/main/scala/com/snowplowanalytics/snowplow/sources/pubsub/PubsubBatchState.scala new file mode 100644 index 00000000..50af421f --- /dev/null +++ b/modules/pubsub/src/main/scala/com/snowplowanalytics/snowplow/sources/pubsub/PubsubBatchState.scala @@ -0,0 +1,24 @@ +/* + * Copyright (c) 2023-present Snowplow Analytics Ltd. All rights reserved. + * + * This program is licensed to you under the Snowplow Community License Version 1.0, + * and you may not use this file except in compliance with the Snowplow Community License Version 1.0. 
+ * You may obtain a copy of the Snowplow Community License Version 1.0 at https://docs.snowplow.io/community-license-1.0 + */ +package com.snowplowanalytics.snowplow.sources.pubsub + +import java.time.Instant + +/** + * Data held about a batch of messages pulled from a pubsub subscription + * + * @param currentDeadline + * The deadline before which we must either ack, nack, or extend the deadline to something further + * in the future. This is updated over time if we approach a deadline. + * @param ackIds + * The IDs which are needed to ack all messages in the batch + */ +private case class PubsubBatchState( + currentDeadline: Instant, + ackIds: Vector[String] +) diff --git a/modules/pubsub/src/main/scala/com/snowplowanalytics/snowplow/sources/pubsub/PubsubCheckpointer.scala b/modules/pubsub/src/main/scala/com/snowplowanalytics/snowplow/sources/pubsub/PubsubCheckpointer.scala new file mode 100644 index 00000000..b527e6fe --- /dev/null +++ b/modules/pubsub/src/main/scala/com/snowplowanalytics/snowplow/sources/pubsub/PubsubCheckpointer.scala @@ -0,0 +1,103 @@ +/* + * Copyright (c) 2023-present Snowplow Analytics Ltd. All rights reserved. + * + * This program is licensed to you under the Snowplow Community License Version 1.0, + * and you may not use this file except in compliance with the Snowplow Community License Version 1.0. + * You may obtain a copy of the Snowplow Community License Version 1.0 at https://docs.snowplow.io/community-license-1.0 + */ +package com.snowplowanalytics.snowplow.sources.pubsub + +import cats.implicits._ +import cats.effect.kernel.Unique +import cats.effect.{Async, Deferred, Ref, Sync} +import com.google.cloud.pubsub.v1.stub.SubscriberStub +import com.google.pubsub.v1.AcknowledgeRequest +import org.typelevel.log4cats.Logger +import org.typelevel.log4cats.slf4j.Slf4jLogger + +import scala.jdk.CollectionConverters._ +import scala.concurrent.duration.Duration + +import com.snowplowanalytics.snowplow.sources.internal.Checkpointer +import com.snowplowanalytics.snowplow.pubsub.FutureInterop +import com.snowplowanalytics.snowplow.sources.pubsub.PubsubRetryOps.implicits._ + +/** + * The Pubsub checkpointer + * + * @param subscription + * Pubsub subscription name + * @param deferredResources + * Resources needed so we can ack/nack messages. This is wrapped in `Deferred` because the + * resources are not available until the application calls `.stream` on the `LowLevelSource`. This + * is a limitation in the design of the common-streams Source interface. 
+ */ +class PubsubCheckpointer[F[_]: Async]( + subscription: PubsubSourceConfig.Subscription, + deferredResources: Deferred[F, PubsubCheckpointer.Resources[F]] +) extends Checkpointer[F, Vector[Unique.Token]] { + + import PubsubCheckpointer._ + + private implicit def logger: Logger[F] = Slf4jLogger.getLogger[F] + + override def combine(x: Vector[Unique.Token], y: Vector[Unique.Token]): Vector[Unique.Token] = + x |+| y + + override val empty: Vector[Unique.Token] = Vector.empty + + /** + * Ack some batches of messages received from pubsub + * + * @param c + * tokens which are keys to batch data held in the shared state + */ + override def ack(c: Vector[Unique.Token]): F[Unit] = + for { + Resources(stub, refAckIds) <- deferredResources.get + ackDatas <- refAckIds.modify(m => (m -- c, c.flatMap(m.get))) + _ <- ackDatas.flatMap(_.ackIds).grouped(1000).toVector.traverse_ { ackIds => + val request = AcknowledgeRequest.newBuilder.setSubscription(subscription.show).addAllAckIds(ackIds.asJava).build + val attempt = for { + apiFuture <- Sync[F].delay(stub.acknowledgeCallable.futureCall(request)) + _ <- FutureInterop.fromFuture[F, com.google.protobuf.Empty](apiFuture) + } yield () + attempt.retryingOnTransientGrpcFailures + .recoveringOnGrpcInvalidArgument { s => + // This can happen if ack IDs have expired before we acked + Logger[F].info(s"Ignoring error from GRPC when acking: ${s.getDescription}") + } + } + } yield () + + /** + * Nack some batches of messages received from pubsub + * + * @param c + * tokens which are keys to batch data held in the shared state + */ + override def nack(c: Vector[Unique.Token]): F[Unit] = + for { + Resources(stub, refAckIds) <- deferredResources.get + ackDatas <- refAckIds.modify(m => (m -- c, c.flatMap(m.get))) + ackIds = ackDatas.flatMap(_.ackIds) + // A nack is just a modack with zero duration + _ <- Utils.modAck[F](subscription, stub, ackIds, Duration.Zero) + } yield () +} + +private object PubsubCheckpointer { + + /** + * Resources needed by `PubsubCheckpointer` so it can ack/nack messages + * + * @param stub + * The GRPC stub needed to execute the ack/nack RPCs + * @param refState + * A map from tokens to the data held about a batch of messages received from pubsub. The map is + * wrapped in a `Ref` because it is concurrently modified by the source adding new batches to + * the state. + */ + case class Resources[F[_]](stub: SubscriberStub, refState: Ref[F, Map[Unique.Token, PubsubBatchState]]) + +} diff --git a/modules/pubsub/src/main/scala/com/snowplowanalytics/snowplow/sources/pubsub/PubsubRetryOps.scala b/modules/pubsub/src/main/scala/com/snowplowanalytics/snowplow/sources/pubsub/PubsubRetryOps.scala new file mode 100644 index 00000000..8a546cac --- /dev/null +++ b/modules/pubsub/src/main/scala/com/snowplowanalytics/snowplow/sources/pubsub/PubsubRetryOps.scala @@ -0,0 +1,62 @@ +/* + * Copyright (c) 2023-present Snowplow Analytics Ltd. All rights reserved. + * + * This program is licensed to you under the Snowplow Community License Version 1.0, + * and you may not use this file except in compliance with the Snowplow Community License Version 1.0. 
+ * You may obtain a copy of the Snowplow Community License Version 1.0 at https://docs.snowplow.io/community-license-1.0 + */ +package com.snowplowanalytics.snowplow.sources.pubsub + +import cats.implicits._ +import cats.effect.Async +import com.google.api.gax.rpc.{ApiException, StatusCode} +import io.grpc.Status +import org.typelevel.log4cats.Logger +import retry.RetryPolicies +import retry.implicits._ + +import scala.concurrent.duration.DurationDouble + +private[pubsub] object PubsubRetryOps { + + object implicits { + implicit class Ops[F[_], A](val f: F[A]) extends AnyVal { + + def retryingOnTransientGrpcFailures(implicit F: Async[F], L: Logger[F]): F[A] = + f.retryingOnSomeErrors( + isWorthRetrying = { e => isRetryableException(e).pure[F] }, + policy = RetryPolicies.fullJitter(1.second), + onError = { case (t, _) => + Logger[F].info(t)(s"Pubsub retryable GRPC error will be retried: ${t.getMessage}") + } + ) + + def recoveringOnGrpcInvalidArgument(f2: Status => F[A])(implicit F: Async[F]): F[A] = + f.recoverWith { + case StatusFromThrowable(s) if s.getCode.equals(Status.Code.INVALID_ARGUMENT) => + f2(s) + } + } + } + + private object StatusFromThrowable { + def unapply(t: Throwable): Option[Status] = + Some(Status.fromThrowable(t)) + } + + def isRetryableException: Throwable => Boolean = { + case apiException: ApiException => + apiException.getStatusCode.getCode match { + case StatusCode.Code.DEADLINE_EXCEEDED => true + case StatusCode.Code.INTERNAL => true + case StatusCode.Code.CANCELLED => true + case StatusCode.Code.RESOURCE_EXHAUSTED => true + case StatusCode.Code.ABORTED => true + case StatusCode.Code.UNKNOWN => true + case StatusCode.Code.UNAVAILABLE => !apiException.getMessage().contains("Server shutdownNow invoked") + case _ => false + } + case _ => + false + } +} diff --git a/modules/pubsub/src/main/scala/com/snowplowanalytics/snowplow/sources/pubsub/PubsubSource.scala b/modules/pubsub/src/main/scala/com/snowplowanalytics/snowplow/sources/pubsub/PubsubSource.scala index eaacea4e..743b891a 100644 --- a/modules/pubsub/src/main/scala/com/snowplowanalytics/snowplow/sources/pubsub/PubsubSource.scala +++ b/modules/pubsub/src/main/scala/com/snowplowanalytics/snowplow/sources/pubsub/PubsubSource.scala @@ -7,325 +7,257 @@ */ package com.snowplowanalytics.snowplow.sources.pubsub -import cats.effect.{Async, Resource, Sync} -import cats.effect.implicits._ +import cats.effect.{Async, Deferred, Ref, Resource, Sync} +import cats.effect.kernel.Unique import cats.implicits._ import fs2.{Chunk, Stream} -import org.typelevel.log4cats.{Logger, SelfAwareStructuredLogger} +import org.typelevel.log4cats.Logger import org.typelevel.log4cats.slf4j.Slf4jLogger -import java.nio.ByteBuffer import java.time.Instant // pubsub -import com.google.api.core.ApiService -import com.google.api.gax.batching.FlowControlSettings -import com.google.api.gax.core.FixedExecutorProvider +import com.google.api.gax.core.{ExecutorProvider, FixedExecutorProvider} import com.google.api.gax.grpc.ChannelPoolSettings -import com.google.cloud.pubsub.v1.{AckReplyConsumer, MessageReceiver, Subscriber, SubscriptionAdminSettings} -import com.google.common.util.concurrent.{ForwardingExecutorService, ListeningExecutorService, MoreExecutors} -import com.google.pubsub.v1.{ProjectSubscriptionName, PubsubMessage} +import com.google.cloud.pubsub.v1.SubscriptionAdminSettings +import com.google.cloud.pubsub.v1.stub.SubscriberStubSettings +import com.google.pubsub.v1.{PullRequest, PullResponse, ReceivedMessage} +import 
com.google.cloud.pubsub.v1.stub.{GrpcSubscriberStub, SubscriberStub} import org.threeten.bp.{Duration => ThreetenDuration} // snowplow -import com.snowplowanalytics.snowplow.pubsub.GcpUserAgent +import com.snowplowanalytics.snowplow.pubsub.{FutureInterop, GcpUserAgent} import com.snowplowanalytics.snowplow.sources.SourceAndAck import com.snowplowanalytics.snowplow.sources.internal.{Checkpointer, LowLevelEvents, LowLevelSource} +import com.snowplowanalytics.snowplow.sources.pubsub.PubsubRetryOps.implicits._ -import scala.concurrent.duration.FiniteDuration +import scala.concurrent.duration.{Duration, FiniteDuration} +import scala.jdk.CollectionConverters._ -import java.util.concurrent.{ - Callable, - ExecutorService, - Executors, - Phaser, - ScheduledExecutorService, - ScheduledFuture, - Semaphore, - TimeUnit, - TimeoutException -} -import java.util.concurrent.atomic.AtomicReference +import java.util.concurrent.{ExecutorService, Executors} +/** + * A common-streams `Source` that pulls messages from PubSub + * + * This Source is a wrapper around a GRPC stub. It uses the "Unary Pull" GRPC method to fetch + * events. + * + * Note that "Unary Pull" GRPC is different to the "Streaming Pull" GRPC used by the 3rd-party + * java-pubsub library. We use "Unary Pull" to avoid a problem in which PubSub occasionally + * re-delivers the same messages, causing downstream duplicates. The problem happened especially in + * apps like Lake Loader, which builds up a very large number of un-acked messages and then acks + * them all in one go at the end of a timed window. + */ object PubsubSource { - private implicit def logger[F[_]: Sync]: SelfAwareStructuredLogger[F] = Slf4jLogger.getLogger[F] + private implicit def logger[F[_]: Sync]: Logger[F] = Slf4jLogger.getLogger[F] def build[F[_]: Async](config: PubsubSourceConfig): F[SourceAndAck[F]] = - LowLevelSource.toSourceAndAck(lowLevel(config)) - - private type PubSubCheckpointer[F[_]] = Checkpointer[F, Chunk[AckReplyConsumer]] + Deferred[F, PubsubCheckpointer.Resources[F]].flatMap { deferred => + LowLevelSource.toSourceAndAck(lowLevel(config, deferred)) + } - private def lowLevel[F[_]: Async](config: PubsubSourceConfig): LowLevelSource[F, Chunk[AckReplyConsumer]] = - new LowLevelSource[F, Chunk[AckReplyConsumer]] { - def checkpointer: PubSubCheckpointer[F] = pubsubCheckpointer + private def lowLevel[F[_]: Async]( + config: PubsubSourceConfig, + deferredResources: Deferred[F, PubsubCheckpointer.Resources[F]] + ): LowLevelSource[F, Vector[Unique.Token]] = + new LowLevelSource[F, Vector[Unique.Token]] { + def checkpointer: Checkpointer[F, Vector[Unique.Token]] = new PubsubCheckpointer(config.subscription, deferredResources) - def stream: Stream[F, Stream[F, LowLevelEvents[Chunk[AckReplyConsumer]]]] = - pubsubStream(config) + def stream: Stream[F, Stream[F, LowLevelEvents[Vector[Unique.Token]]]] = + pubsubStream(config, deferredResources) def lastLiveness: F[FiniteDuration] = Sync[F].realTime } - private def pubsubCheckpointer[F[_]: Async]: PubSubCheckpointer[F] = new PubSubCheckpointer[F] { - def combine(x: Chunk[AckReplyConsumer], y: Chunk[AckReplyConsumer]): Chunk[AckReplyConsumer] = - Chunk.Queue(x, y) + private def pubsubStream[F[_]: Async]( + config: PubsubSourceConfig, + deferredResources: Deferred[F, PubsubCheckpointer.Resources[F]] + ): Stream[F, Stream[F, LowLevelEvents[Vector[Unique.Token]]]] = + for { + parallelPullCount <- Stream.eval(Sync[F].delay(chooseNumParallelPulls(config))) + stub <- Stream.resource(stubResource(config)) + refStates <- 
Stream.eval(Ref[F].of(Map.empty[Unique.Token, PubsubBatchState])) + _ <- Stream.eval(deferredResources.complete(PubsubCheckpointer.Resources(stub, refStates))) + } yield Stream + .fixedRateStartImmediately(config.debounceRequests, dampen = true) + .parEvalMapUnordered(parallelPullCount)(_ => pullAndManageState(config, stub, refStates)) + .unNone + .prefetchN(parallelPullCount) + .concurrently(extendDeadlines(config, stub, refStates)) + .onFinalize(nackRefStatesForShutdown(config, stub, refStates)) - val empty: Chunk[AckReplyConsumer] = Chunk.empty - def ack(c: Chunk[AckReplyConsumer]): F[Unit] = - Sync[F].delay { - c.foreach(_.ack()) + /** + * Pulls a batch of messages from pubsub and then manages the state of the batch + * + * Managing the state of the batch includes: + * + * - Extending the ack deadline, which gives us some time to process this batch. + * - Generating a unique token by which to identify this batch internally + * - Adding the batch to the local "State" so that we can re-extend the ack deadline if needed + */ + private def pullAndManageState[F[_]: Async]( + config: PubsubSourceConfig, + stub: SubscriberStub, + refStates: Ref[F, Map[Unique.Token, PubsubBatchState]] + ): F[Option[LowLevelEvents[Vector[Unique.Token]]]] = + pullFromSubscription(config, stub).flatMap { response => + if (response.getReceivedMessagesCount > 0) { + val records = response.getReceivedMessagesList.asScala.toVector + val chunk = Chunk.from(records.map(_.getMessage.getData.asReadOnlyByteBuffer())) + val ackIds = records.map(_.getAckId) + Sync[F].uncancelable { _ => + for { + _ <- Logger[F].trace { + records.map(_.getMessage.getMessageId).mkString("Pubsub message IDs: ", ",", "") + } + timeReceived <- Sync[F].realTimeInstant + _ <- Utils.modAck[F](config.subscription, stub, ackIds, config.durationPerAckExtension) + token <- Unique[F].unique + currentDeadline = timeReceived.plusMillis(config.durationPerAckExtension.toMillis) + _ <- refStates.update(_ + (token -> PubsubBatchState(currentDeadline, ackIds))) + } yield Some(LowLevelEvents(chunk, Vector(token), Some(earliestTimestampOfRecords(records)))) + } + } else { + none.pure[F] } + } - def nack(c: Chunk[AckReplyConsumer]): F[Unit] = - Sync[F].delay { - c.foreach(_.nack()) + private def earliestTimestampOfRecords(records: Vector[ReceivedMessage]): Instant = { + val (tstampSeconds, tstampNanos) = + records.map(r => (r.getMessage.getPublishTime.getSeconds, r.getMessage.getPublishTime.getNanos)).min + Instant.ofEpochSecond(tstampSeconds, tstampNanos.toLong) } - private case class SingleMessage( - message: ByteBuffer, - ackReply: AckReplyConsumer, - tstamp: Instant - ) - /** - * The toolkit for coordinating concurrent operations between the message receiver and the fs2 - * stream. This is the interface between FS2 and non-FS2 worlds. + * "Nacks" any message that was pulled from pubsub but never consumed by the app. * - * We use pure Java (non-cats-effect) classes so we can use them in the message receiver without - * needing to use a cats-effect Dispatcher. - * - * @param queue - * A List of messages that have been provided by the Pubsub Subscriber. The FS2 stream should - * periodically drain this queue. - * @param semaphore - * A Semaphore used to manage how many bytes are being held in memory. When the queue holds too - * many bytes, the semaphore will block on acquiring more permits. This is needed because we - * have turned off FlowControl in the subscriber. - * @param phaser - * A Phaser that advances each time the queue has new messages to be consumed. 
The FS2 stream - * can wait on the phaser instead of repeatedly pooling the queue. - * @param errorRef - * Any error provided to us by the pubsub Subscriber. If the FS2 stream sees an error here, then - * it should raise the error. + * This is called during graceful shutdown. It allows PubSub to immediately re-deliver the + * messages to a different pod, instead of waiting for the ack deadline to expire. */ - private case class Control( - queue: AtomicReference[List[SingleMessage]], - semaphore: Semaphore, - phaser: Phaser, - errorRef: AtomicReference[Option[Throwable]] - ) - - private object Control { - def build[F[_]: Sync](config: PubsubSourceConfig): F[Control] = Sync[F].delay { - Control( - new AtomicReference(Nil), - new Semaphore(config.bufferMaxBytes, false), - new Phaser(2), - new AtomicReference(None) - ) + private def nackRefStatesForShutdown[F[_]: Async]( + config: PubsubSourceConfig, + stub: SubscriberStub, + refStates: Ref[F, Map[Unique.Token, PubsubBatchState]] + ): F[Unit] = + refStates.getAndSet(Map.empty).flatMap { m => + Utils.modAck(config.subscription, stub, m.values.flatMap(_.ackIds.toVector).toVector, Duration.Zero) } - } - private def pubsubStream[F[_]: Async](config: PubsubSourceConfig): Stream[F, Stream[F, LowLevelEvents[Chunk[AckReplyConsumer]]]] = - for { - control <- Stream.eval(Control.build(config)) - _ <- Stream.resource(runSubscriber(config, control)) - } yield consumeFromQueue(config, control) + /** + * Wrapper around the "Pull" PubSub GRPC method. + * + * @return + * The PullResponse, comprising a batch of pubsub messages + */ + private def pullFromSubscription[F[_]: Async]( + config: PubsubSourceConfig, + stub: SubscriberStub + ): F[PullResponse] = { + val request = PullRequest.newBuilder + .setSubscription(config.subscription.show) + .setMaxMessages(config.maxMessagesPerPull) + .build + val io = for { + apiFuture <- Sync[F].delay(stub.pullCallable.futureCall(request)) + res <- FutureInterop.fromFuture[F, PullResponse](apiFuture) + } yield res + Logger[F].debug("Pulling from subscription") *> + io.retryingOnTransientGrpcFailures + .flatTap { response => + Logger[F].debug(s"Pulled ${response.getReceivedMessagesCount} messages") + } + } - private def consumeFromQueue[F[_]: Sync]( + /** + * Modify ack deadlines if we need more time to process the messages + * + * @param config + * The Source configuration + * @param stub + * The GRPC stub on which we can issue modack requests + * @param refStates + * A map from tokens to the data held about a batch of messages received from pubsub. This + * function must update the state if it extends a deadline. 
+ */ + private def extendDeadlines[F[_]: Async]( config: PubsubSourceConfig, - control: Control - ): Stream[F, LowLevelEvents[Chunk[AckReplyConsumer]]] = + stub: SubscriberStub, + refStates: Ref[F, Map[Unique.Token, PubsubBatchState]] + ): Stream[F, Nothing] = Stream - .repeatEval { - Sync[F].delay(control.errorRef.get).flatMap { - case None => Sync[F].unit - case Some(throwable) => Sync[F].raiseError[Unit](throwable) + .eval(Sync[F].realTimeInstant) + .evalMap { now => + val minAllowedDeadline = now.plusMillis((config.minRemainingAckDeadline.toDouble * config.durationPerAckExtension.toMillis).toLong) + val newDeadline = now.plusMillis(config.durationPerAckExtension.toMillis) + refStates.modify { m => + val toExtend = m.filter { case (_, batchState) => + batchState.currentDeadline.isBefore(minAllowedDeadline) + } + val fixed = toExtend.view.map { case (k, v) => + k -> v.copy(currentDeadline = newDeadline) + }.toMap + (m ++ fixed, toExtend.values.toVector) } } - .evalMap { _ => - // Semantically block until message receive has written at least one message - val waitForData = Sync[F].interruptible { - control.phaser.awaitAdvanceInterruptibly(control.phaser.arrive()) - } - Sync[F].uncancelable { poll => - poll(waitForData) *> Sync[F].delay(control.queue.getAndSet(Nil)) - } - } - .filter(_.nonEmpty) // Would happen if phaser was terminated - .map { list => - val events = Chunk.iterator(list.iterator.map(_.message)) - val acks = Chunk.iterator(list.iterator.map(_.ackReply)) - val earliestTstamp = list.iterator.map(_.tstamp).min - LowLevelEvents(events, acks, Some(earliestTstamp)) - } - .evalTap { case LowLevelEvents(events, _, _) => - val numPermits = events.foldLeft(0) { case (numPermits, e) => - numPermits + permitsFor(config, e.remaining()) - } - Sync[F].delay { - control.semaphore.release(numPermits) + .evalMap { toExtend => + if (toExtend.isEmpty) + // If no message had a deadline close to expiry, then sleep for an appropriate amount of time and check again + Sync[F].sleep(0.5 * config.minRemainingAckDeadline.toDouble * config.durationPerAckExtension) + else { + val ackIds = toExtend.sortBy(_.currentDeadline).flatMap(_.ackIds) + Utils.modAck[F](config.subscription, stub, ackIds, config.durationPerAckExtension) } } + .repeat + .drain /** - * Number of semaphore permits needed to write an event to the buffer. + * Builds the "Stub" which is the object from which we can call PubSub SDK methods * - * - For small/medium events, this equals the size of the event in bytes. - * - For large events, there are not enough permits available for the event in bytes, so return - * the number of available permits. + * This implementation has some hard-coded values, which have been copied over from the equivalent + * hard-coded values in the java-pubsub client library. 
*/ - private def permitsFor(config: PubsubSourceConfig, bytes: Int): Int = - Math.min(config.bufferMaxBytes, bytes) - - private def errorListener(phaser: Phaser, errorRef: AtomicReference[Option[Throwable]]): ApiService.Listener = - new ApiService.Listener { - override def failed(from: ApiService.State, failure: Throwable): Unit = { - errorRef.compareAndSet(None, Some(failure)) - phaser.forceTermination() - } - } - - private def runSubscriber[F[_]: Async](config: PubsubSourceConfig, control: Control): Resource[F, Unit] = - for { - direct <- executorResource(Sync[F].delay(MoreExecutors.newDirectExecutorService())) - parallelPullCount = chooseNumParallelPulls(config) - channelCount = chooseNumTransportChannels(config, parallelPullCount) - executor <- executorResource(Sync[F].delay(Executors.newScheduledThreadPool(2 * parallelPullCount))) - receiver = messageReceiver(config, control) - name = ProjectSubscriptionName.of(config.subscription.projectId, config.subscription.subscriptionId) - subscriber <- Resource.eval(Sync[F].delay { - Subscriber - .newBuilder(name, receiver) - .setMaxAckExtensionPeriod(convertDuration(config.maxAckExtensionPeriod)) - .setMaxDurationPerAckExtension(convertDuration(config.maxDurationPerAckExtension)) - .setMinDurationPerAckExtension(convertDuration(config.minDurationPerAckExtension)) - .setParallelPullCount(parallelPullCount) - .setExecutorProvider(FixedExecutorProvider.create(executorForEventCallbacks(direct, executor))) - .setSystemExecutorProvider(FixedExecutorProvider.create(executor)) - .setFlowControlSettings { - // Switch off any flow control, because we handle it ourselves with the semaphore - FlowControlSettings.getDefaultInstance - } - .setHeaderProvider(GcpUserAgent.headerProvider(config.gcpUserAgent)) - .setChannelProvider { - SubscriptionAdminSettings.defaultGrpcTransportProviderBuilder - .setMaxInboundMessageSize(20 << 20) // copies Subscriber hard-coded default - .setMaxInboundMetadataSize(20 << 20) // copies Subscriber hard-coded default - .setKeepAliveTime(ThreetenDuration.ofMinutes(5)) // copies Subscriber hard-coded default - .setChannelPoolSettings { - ChannelPoolSettings.staticallySized(channelCount) - } - .build - } - .build - }) - _ <- Resource.eval(Sync[F].delay { - subscriber.addListener(errorListener(control.phaser, control.errorRef), MoreExecutors.directExecutor) - }) - apiService <- Resource.make(Sync[F].delay(subscriber.startAsync())) { apiService => - for { - _ <- Logger[F].info("Stopping the PubSub Subscriber...") - _ <- Sync[F].delay(apiService.stopAsync()) - fiber <- drainQueue(control).start - _ <- Logger[F].info("Waiting for the PubSub Subscriber to finish cleanly...") - _ <- Sync[F] - .blocking(apiService.awaitTerminated(config.shutdownTimeout.toMillis, TimeUnit.MILLISECONDS)) - .attemptNarrow[TimeoutException] - _ <- Sync[F].delay(control.phaser.forceTermination()) - _ <- fiber.join - } yield () - } - _ <- Resource.eval(Sync[F].blocking(apiService.awaitRunning())) - } yield () - - private def drainQueue[F[_]: Async](control: Control): F[Unit] = - Async[F].untilDefinedM { - for { - _ <- Sync[F].delay(control.semaphore.release(Int.MaxValue - control.semaphore.availablePermits())) - phase <- Sync[F].blocking(control.phaser.arriveAndAwaitAdvance()) - messages <- Sync[F].delay(control.queue.getAndSet(Nil)) - _ <- pubsubCheckpointer.nack(Chunk.from(messages.map(_.ackReply))) - } yield if (phase < 0) None else Some(()) - } - - private def messageReceiver( + private def buildSubscriberStub[F[_]: Sync]( config: PubsubSourceConfig, - 
control: Control - ): MessageReceiver = - new MessageReceiver { - def receiveMessage(message: PubsubMessage, ackReply: AckReplyConsumer): Unit = { - val tstamp = Instant.ofEpochSecond(message.getPublishTime.getSeconds, message.getPublishTime.getNanos.toLong) - val singleMessage = SingleMessage(message.getData.asReadOnlyByteBuffer(), ackReply, tstamp) - val permitsRequired = permitsFor(config, singleMessage.message.remaining()) - control.semaphore.acquire(permitsRequired) - val previousQueue = control.queue.getAndUpdate(list => singleMessage :: list) - if (previousQueue.isEmpty) { - control.phaser.arrive() - } - () + executorProvider: ExecutorProvider + ): Resource[F, GrpcSubscriberStub] = { + val channelProvider = SubscriptionAdminSettings + .defaultGrpcTransportProviderBuilder() + .setMaxInboundMessageSize(20 << 20) + .setMaxInboundMetadataSize(20 << 20) + .setKeepAliveTime(ThreetenDuration.ofMinutes(5)) + .setChannelPoolSettings { + ChannelPoolSettings.staticallySized(1) } - } - - private def executorResource[F[_]: Sync, E <: ExecutorService](make: F[E]): Resource[F, E] = - Resource.make(make)(es => Sync[F].blocking(es.shutdown())) + .build + + val stubSettings = SubscriberStubSettings + .newBuilder() + .setBackgroundExecutorProvider(executorProvider) + .setCredentialsProvider(SubscriptionAdminSettings.defaultCredentialsProviderBuilder().build()) + .setTransportChannelProvider(channelProvider) + .setHeaderProvider(GcpUserAgent.headerProvider(config.gcpUserAgent)) + .setEndpoint(SubscriberStubSettings.getDefaultEndpoint()) + .build + + Resource.make(Sync[F].delay(GrpcSubscriberStub.create(stubSettings)))(stub => Sync[F].blocking(stub.shutdownNow)) + } /** - * The ScheduledExecutorService to be used for processing events. - * - * We execute the callback on a `DirectExecutor`, which means the underlying Subscriber runs it - * directly on its system executor. When the queue is full, this means we deliberately block the - * system exeuctor. We need to do this trick because we have disabled FlowControl. This trick is - * our own version of flow control. + * Wraps the Stub in a Resource, for managing lifecycle */ - private def executorForEventCallbacks( - directExecutor: ListeningExecutorService, - systemExecutor: ScheduledExecutorService - ): ScheduledExecutorService = - new ForwardingExecutorService with ScheduledExecutorService { - - /** - * Non-scheduled tasks (e.g. when a message is received), are run directly, without jumping to - * another thread pool - */ - override val delegate = directExecutor - - /** - * Scheduled tasks (if they exist) are scheduled on the same thread pool shared by the system - * executor. As far as I know, these schedule methods never get called. 
- */ - override def schedule[V]( - callable: Callable[V], - delay: Long, - unit: TimeUnit - ): ScheduledFuture[V] = - systemExecutor.schedule(callable, delay, unit) - override def schedule( - runnable: Runnable, - delay: Long, - unit: TimeUnit - ): ScheduledFuture[_] = - systemExecutor.schedule(runnable, delay, unit) - override def scheduleAtFixedRate( - runnable: Runnable, - initialDelay: Long, - period: Long, - unit: TimeUnit - ): ScheduledFuture[_] = - systemExecutor.scheduleAtFixedRate(runnable, initialDelay, period, unit) - override def scheduleWithFixedDelay( - runnable: Runnable, - initialDelay: Long, - delay: Long, - unit: TimeUnit - ): ScheduledFuture[_] = - systemExecutor.scheduleWithFixedDelay(runnable, initialDelay, delay, unit) - } + private def stubResource[F[_]: Async]( + config: PubsubSourceConfig + ): Resource[F, SubscriberStub] = + for { + executor <- executorResource(Sync[F].delay(Executors.newScheduledThreadPool(2))) + subStub <- buildSubscriberStub(config, FixedExecutorProvider.create(executor)) + } yield subStub - private def convertDuration(d: FiniteDuration): ThreetenDuration = - ThreetenDuration.ofMillis(d.toMillis) + private def executorResource[F[_]: Sync, E <: ExecutorService](make: F[E]): Resource[F, E] = + Resource.make(make)(es => Sync[F].blocking(es.shutdown())) /** * Converts `parallelPullFactor` to a suggested number of parallel pulls @@ -339,16 +271,4 @@ object PubsubSource { .setScale(0, BigDecimal.RoundingMode.UP) .toInt - /** - * Picks a sensible number of GRPC transport channels (roughly equivalent to a TCP connection) - * - * GRPC has a hard limit of 100 concurrent RPCs on a channel. And experience shows it is healthy - * to stay much under that limit. If we need to open a large number of streaming pulls then we - * might approach/exceed that limit. - */ - private def chooseNumTransportChannels(config: PubsubSourceConfig, parallelPullCount: Int): Int = - (BigDecimal(parallelPullCount) / config.maxPullsPerTransportChannel) - .setScale(0, BigDecimal.RoundingMode.UP) - .toInt - } diff --git a/modules/pubsub/src/main/scala/com/snowplowanalytics/snowplow/sources/pubsub/PubsubSourceConfig.scala b/modules/pubsub/src/main/scala/com/snowplowanalytics/snowplow/sources/pubsub/PubsubSourceConfig.scala index 0b0e8719..a993ed18 100644 --- a/modules/pubsub/src/main/scala/com/snowplowanalytics/snowplow/sources/pubsub/PubsubSourceConfig.scala +++ b/modules/pubsub/src/main/scala/com/snowplowanalytics/snowplow/sources/pubsub/PubsubSourceConfig.scala @@ -7,30 +7,65 @@ */ package com.snowplowanalytics.snowplow.sources.pubsub +import cats.Show import io.circe.Decoder import io.circe.generic.semiauto._ import io.circe.config.syntax._ +import com.google.pubsub.v1.ProjectSubscriptionName import scala.concurrent.duration.FiniteDuration import com.snowplowanalytics.snowplow.pubsub.GcpUserAgent +/** + * Configures the Pubsub Source + * + * @param subscription + * Identifier of the pubsub subscription + * @param parallelPullFactor + * Controls how many RPCs may be opened concurrently from the client to the PubSub server. The + * maximum number of RPCs is equal to this factor multiplied by the number of available CPU cores. + * Increasing this factor can increase the rate of events provided by the Source to the + * application. + * @param durationPerAckExtension + * Ack deadlines are extended for this duration. For common-streams apps this should be set + * slightly larger than the maximum time we expect between the app receiving a message and + * acking it. 
If a message is ever held by the app for longer than expected, then it's ok: the + * Source will re-extend the ack deadline. + * @param minRemainingAckDeadline + * Controls when ack deadlines are re-extended for a message that is close to exceeding its ack + * deadline. For example, if `durationPerAckExtension` is `60 seconds` and + * `minRemainingAckDeadline` is `0.1`, then the Source will wait until there is `6 seconds` left of + * the remaining deadline before re-extending the message deadline. + * @param gcpUserAgent + * Name by which to identify Snowplow in the GRPC headers + * @param maxMessagesPerPull + * How many pubsub messages to pull from the server in a single request. + * @param debounceRequests + * Adds an artificial delay between consecutive requests to pubsub for more messages. Under some + * circumstances, this was found to slightly alleviate a problem in which pubsub might re-deliver + * the same messages multiple times. + */ case class PubsubSourceConfig( subscription: PubsubSourceConfig.Subscription, parallelPullFactor: BigDecimal, - bufferMaxBytes: Int, - maxAckExtensionPeriod: FiniteDuration, - minDurationPerAckExtension: FiniteDuration, - maxDurationPerAckExtension: FiniteDuration, + durationPerAckExtension: FiniteDuration, + minRemainingAckDeadline: BigDecimal, gcpUserAgent: GcpUserAgent, - shutdownTimeout: FiniteDuration, - maxPullsPerTransportChannel: Int + maxMessagesPerPull: Int, + debounceRequests: FiniteDuration ) object PubsubSourceConfig { case class Subscription(projectId: String, subscriptionId: String) + object Subscription { + implicit def show: Show[Subscription] = Show[Subscription] { s => + ProjectSubscriptionName.of(s.projectId, s.subscriptionId).toString + } + } + private implicit def subscriptionDecoder: Decoder[Subscription] = Decoder.decodeString .map(_.split("/")) diff --git a/modules/pubsub/src/main/scala/com/snowplowanalytics/snowplow/sources/pubsub/Utils.scala b/modules/pubsub/src/main/scala/com/snowplowanalytics/snowplow/sources/pubsub/Utils.scala new file mode 100644 index 00000000..56d8e49d --- /dev/null +++ b/modules/pubsub/src/main/scala/com/snowplowanalytics/snowplow/sources/pubsub/Utils.scala @@ -0,0 +1,49 @@ +/* + * Copyright (c) 2023-present Snowplow Analytics Ltd. All rights reserved. + * + * This program is licensed to you under the Snowplow Community License Version 1.0, + * and you may not use this file except in compliance with the Snowplow Community License Version 1.0. 
* You may obtain a copy of the Snowplow Community License Version 1.0 at https://docs.snowplow.io/community-license-1.0 + */ +package com.snowplowanalytics.snowplow.sources.pubsub + +import cats.effect.{Async, Sync} +import cats.implicits._ +import org.typelevel.log4cats.Logger + +import com.google.cloud.pubsub.v1.stub.SubscriberStub +import com.google.pubsub.v1.ModifyAckDeadlineRequest +import com.snowplowanalytics.snowplow.sources.pubsub.PubsubRetryOps.implicits._ +import com.snowplowanalytics.snowplow.pubsub.FutureInterop + +import scala.concurrent.duration.FiniteDuration +import scala.jdk.CollectionConverters._ + +private[pubsub] object Utils { + + def modAck[F[_]: Async: Logger]( + subscription: PubsubSourceConfig.Subscription, + stub: SubscriberStub, + ackIds: Vector[String], + duration: FiniteDuration + ): F[Unit] = + ackIds.grouped(1000).toVector.traverse_ { group => + val request = ModifyAckDeadlineRequest.newBuilder + .setSubscription(subscription.show) + .addAllAckIds(group.asJava) + .setAckDeadlineSeconds(duration.toSeconds.toInt) + .build + val io = for { + _ <- Logger[F].debug(s"Modifying ack deadline for ${group.size} messages to ${duration.toSeconds} seconds") + apiFuture <- Sync[F].delay(stub.modifyAckDeadlineCallable.futureCall(request)) + _ <- FutureInterop.fromFuture_(apiFuture) + } yield () + + io.retryingOnTransientGrpcFailures + .recoveringOnGrpcInvalidArgument { s => + // This can happen if ack IDs were acked before we modAcked + Logger[F].info(s"Ignoring error from GRPC when modifying ack IDs: ${s.getDescription}") + } + } + +} diff --git a/modules/pubsub/src/test/scala/com/snowplowanalytics/snowplow/sources/pubsub/PubsubSourceConfigSpec.scala b/modules/pubsub/src/test/scala/com/snowplowanalytics/snowplow/sources/pubsub/PubsubSourceConfigSpec.scala index 94b9c3f2..4fab8b74 100644 --- a/modules/pubsub/src/test/scala/com/snowplowanalytics/snowplow/sources/pubsub/PubsubSourceConfigSpec.scala +++ b/modules/pubsub/src/test/scala/com/snowplowanalytics/snowplow/sources/pubsub/PubsubSourceConfigSpec.scala @@ -41,15 +41,13 @@ class PubsubSourceConfigSpec extends Specification { val result = ConfigFactory.load(ConfigFactory.parseString(input)) val expected = PubsubSourceConfig( - subscription = PubsubSourceConfig.Subscription("my-project", "my-subscription"), - parallelPullFactor = BigDecimal(0.5), - bufferMaxBytes = 10000000, - maxAckExtensionPeriod = 1.hour, - minDurationPerAckExtension = 1.minute, - maxDurationPerAckExtension = 10.minutes, - gcpUserAgent = GcpUserAgent("Snowplow OSS", "example-version"), - shutdownTimeout = 30.seconds, - maxPullsPerTransportChannel = 16 + subscription = PubsubSourceConfig.Subscription("my-project", "my-subscription"), + parallelPullFactor = BigDecimal(0.5), + durationPerAckExtension = 1.minute, + minRemainingAckDeadline = BigDecimal(0.1), + gcpUserAgent = GcpUserAgent("Snowplow OSS", "example-version"), + maxMessagesPerPull = 1000, + debounceRequests = 100.millis ) result.as[Wrapper] must beRight.like { case w: Wrapper =>