diff --git a/modules/pubsub/src/main/resources/reference.conf b/modules/pubsub/src/main/resources/reference.conf index dc03ea3c..bd3a0d18 100644 --- a/modules/pubsub/src/main/resources/reference.conf +++ b/modules/pubsub/src/main/resources/reference.conf @@ -2,15 +2,13 @@ snowplow.defaults: { sources: { pubsub: { parallelPullFactor: 0.5 - bufferMaxBytes: 10000000 - maxAckExtensionPeriod: "1 hour" - minDurationPerAckExtension: "60 seconds" - maxDurationPerAckExtension: "600 seconds" + durationPerAckExtension: "60 seconds" + minRemainingAckDeadline: 0.1 + maxMessagesPerPull: 1000 + debounceRequests: "100 millis" gcpUserAgent: { productName: "Snowplow OSS" } - shutdownTimeout: "30 seconds" - maxPullsPerTransportChannel: 16 } } diff --git a/modules/pubsub/src/main/scala/com/snowplowanalytics/snowplow/pubsub/FutureInterop.scala b/modules/pubsub/src/main/scala/com/snowplowanalytics/snowplow/pubsub/FutureInterop.scala index 434fe51d..3a9b0be9 100644 --- a/modules/pubsub/src/main/scala/com/snowplowanalytics/snowplow/pubsub/FutureInterop.scala +++ b/modules/pubsub/src/main/scala/com/snowplowanalytics/snowplow/pubsub/FutureInterop.scala @@ -13,7 +13,7 @@ import com.google.api.core.{ApiFuture, ApiFutureCallback, ApiFutures} import com.google.common.util.concurrent.MoreExecutors object FutureInterop { - def fromFuture[F[_]: Async, A](fut: ApiFuture[A]): F[Unit] = + def fromFuture[F[_]: Async, A](fut: ApiFuture[A]): F[A] = Async[F] .async[A] { cb => val cancel = Async[F].delay { @@ -24,7 +24,9 @@ object FutureInterop { Some(cancel) } } - .void + + def fromFuture_[F[_]: Async, A](fut: ApiFuture[A]): F[Unit] = + fromFuture(fut).void private def addCallback[A](fut: ApiFuture[A], cb: Either[Throwable, A] => Unit): Unit = { val apiFutureCallback = new ApiFutureCallback[A] { diff --git a/modules/pubsub/src/main/scala/com/snowplowanalytics/snowplow/sources/pubsub/PubsubBatchState.scala b/modules/pubsub/src/main/scala/com/snowplowanalytics/snowplow/sources/pubsub/PubsubBatchState.scala new file mode 100644 index 00000000..50af421f --- /dev/null +++ b/modules/pubsub/src/main/scala/com/snowplowanalytics/snowplow/sources/pubsub/PubsubBatchState.scala @@ -0,0 +1,24 @@ +/* + * Copyright (c) 2023-present Snowplow Analytics Ltd. All rights reserved. + * + * This program is licensed to you under the Snowplow Community License Version 1.0, + * and you may not use this file except in compliance with the Snowplow Community License Version 1.0. + * You may obtain a copy of the Snowplow Community License Version 1.0 at https://docs.snowplow.io/community-license-1.0 + */ +package com.snowplowanalytics.snowplow.sources.pubsub + +import java.time.Instant + +/** + * Data held about a batch of messages pulled from a pubsub subscription + * + * @param currentDeadline + * The deadline before which we must either ack, nack, or extend the deadline to something further + * in the future. This is updated over time if we approach a deadline. 
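+ * It is refreshed by the deadline extender (see `extendDeadlines` in `PubsubSource`) whenever
+ * the remaining time drops below the configured `minRemainingAckDeadline` fraction of
+ * `durationPerAckExtension`.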
+ * @param ackIds + * The IDs which are needed to ack all messages in the batch + */ +private case class PubsubBatchState( + currentDeadline: Instant, + ackIds: Vector[String] +) diff --git a/modules/pubsub/src/main/scala/com/snowplowanalytics/snowplow/sources/pubsub/PubsubCheckpointer.scala b/modules/pubsub/src/main/scala/com/snowplowanalytics/snowplow/sources/pubsub/PubsubCheckpointer.scala new file mode 100644 index 00000000..b527e6fe --- /dev/null +++ b/modules/pubsub/src/main/scala/com/snowplowanalytics/snowplow/sources/pubsub/PubsubCheckpointer.scala @@ -0,0 +1,103 @@ +/* + * Copyright (c) 2023-present Snowplow Analytics Ltd. All rights reserved. + * + * This program is licensed to you under the Snowplow Community License Version 1.0, + * and you may not use this file except in compliance with the Snowplow Community License Version 1.0. + * You may obtain a copy of the Snowplow Community License Version 1.0 at https://docs.snowplow.io/community-license-1.0 + */ +package com.snowplowanalytics.snowplow.sources.pubsub + +import cats.implicits._ +import cats.effect.kernel.Unique +import cats.effect.{Async, Deferred, Ref, Sync} +import com.google.cloud.pubsub.v1.stub.SubscriberStub +import com.google.pubsub.v1.AcknowledgeRequest +import org.typelevel.log4cats.Logger +import org.typelevel.log4cats.slf4j.Slf4jLogger + +import scala.jdk.CollectionConverters._ +import scala.concurrent.duration.Duration + +import com.snowplowanalytics.snowplow.sources.internal.Checkpointer +import com.snowplowanalytics.snowplow.pubsub.FutureInterop +import com.snowplowanalytics.snowplow.sources.pubsub.PubsubRetryOps.implicits._ + +/** + * The Pubsub checkpointer + * + * @param subscription + * Pubsub subscription name + * @param deferredResources + * Resources needed so we can ack/nack messages. This is wrapped in `Deferred` because the + * resources are not available until the application calls `.stream` on the `LowLevelSource`. This + * is a limitation in the design of the common-streams Source interface. 
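+ *
+ * A sketch of the intended wiring, illustrative only, using the names from `PubsubSource` in
+ * this same patch:
+ * {{{
+ * for {
+ *   deferred    <- Deferred[F, PubsubCheckpointer.Resources[F]]
+ *   checkpointer = new PubsubCheckpointer[F](subscription, deferred)
+ *   // later, once the source has opened its GRPC stub:
+ *   _           <- deferred.complete(PubsubCheckpointer.Resources(stub, refStates))
+ * } yield ()
+ * }}}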
+ */ +class PubsubCheckpointer[F[_]: Async]( + subscription: PubsubSourceConfig.Subscription, + deferredResources: Deferred[F, PubsubCheckpointer.Resources[F]] +) extends Checkpointer[F, Vector[Unique.Token]] { + + import PubsubCheckpointer._ + + private implicit def logger: Logger[F] = Slf4jLogger.getLogger[F] + + override def combine(x: Vector[Unique.Token], y: Vector[Unique.Token]): Vector[Unique.Token] = + x |+| y + + override val empty: Vector[Unique.Token] = Vector.empty + + /** + * Ack some batches of messages received from pubsub + * + * @param c + * tokens which are keys to batch data held in the shared state + */ + override def ack(c: Vector[Unique.Token]): F[Unit] = + for { + Resources(stub, refAckIds) <- deferredResources.get + ackDatas <- refAckIds.modify(m => (m -- c, c.flatMap(m.get))) + _ <- ackDatas.flatMap(_.ackIds).grouped(1000).toVector.traverse_ { ackIds => + val request = AcknowledgeRequest.newBuilder.setSubscription(subscription.show).addAllAckIds(ackIds.asJava).build + val attempt = for { + apiFuture <- Sync[F].delay(stub.acknowledgeCallable.futureCall(request)) + _ <- FutureInterop.fromFuture[F, com.google.protobuf.Empty](apiFuture) + } yield () + attempt.retryingOnTransientGrpcFailures + .recoveringOnGrpcInvalidArgument { s => + // This can happen if ack IDs have expired before we acked + Logger[F].info(s"Ignoring error from GRPC when acking: ${s.getDescription}") + } + } + } yield () + + /** + * Nack some batches of messages received from pubsub + * + * @param c + * tokens which are keys to batch data held in the shared state + */ + override def nack(c: Vector[Unique.Token]): F[Unit] = + for { + Resources(stub, refAckIds) <- deferredResources.get + ackDatas <- refAckIds.modify(m => (m -- c, c.flatMap(m.get))) + ackIds = ackDatas.flatMap(_.ackIds) + // A nack is just a modack with zero duration + _ <- Utils.modAck[F](subscription, stub, ackIds, Duration.Zero) + } yield () +} + +private object PubsubCheckpointer { + + /** + * Resources needed by `PubsubCheckpointer` so it can ack/nack messages + * + * @param stub + * The GRPC stub needed to execute the ack/nack RPCs + * @param refState + * A map from tokens to the data held about a batch of messages received from pubsub. The map is + * wrapped in a `Ref` because it is concurrently modified by the source adding new batches to + * the state. + */ + case class Resources[F[_]](stub: SubscriberStub, refState: Ref[F, Map[Unique.Token, PubsubBatchState]]) + +} diff --git a/modules/pubsub/src/main/scala/com/snowplowanalytics/snowplow/sources/pubsub/PubsubRetryOps.scala b/modules/pubsub/src/main/scala/com/snowplowanalytics/snowplow/sources/pubsub/PubsubRetryOps.scala new file mode 100644 index 00000000..8a546cac --- /dev/null +++ b/modules/pubsub/src/main/scala/com/snowplowanalytics/snowplow/sources/pubsub/PubsubRetryOps.scala @@ -0,0 +1,62 @@ +/* + * Copyright (c) 2023-present Snowplow Analytics Ltd. All rights reserved. + * + * This program is licensed to you under the Snowplow Community License Version 1.0, + * and you may not use this file except in compliance with the Snowplow Community License Version 1.0. 
+ * You may obtain a copy of the Snowplow Community License Version 1.0 at https://docs.snowplow.io/community-license-1.0 + */ +package com.snowplowanalytics.snowplow.sources.pubsub + +import cats.implicits._ +import cats.effect.Async +import com.google.api.gax.rpc.{ApiException, StatusCode} +import io.grpc.Status +import org.typelevel.log4cats.Logger +import retry.RetryPolicies +import retry.implicits._ + +import scala.concurrent.duration.DurationDouble + +private[pubsub] object PubsubRetryOps { + + object implicits { + implicit class Ops[F[_], A](val f: F[A]) extends AnyVal { + + def retryingOnTransientGrpcFailures(implicit F: Async[F], L: Logger[F]): F[A] = + f.retryingOnSomeErrors( + isWorthRetrying = { e => isRetryableException(e).pure[F] }, + policy = RetryPolicies.fullJitter(1.second), + onError = { case (t, _) => + Logger[F].info(t)(s"Pubsub retryable GRPC error will be retried: ${t.getMessage}") + } + ) + + def recoveringOnGrpcInvalidArgument(f2: Status => F[A])(implicit F: Async[F]): F[A] = + f.recoverWith { + case StatusFromThrowable(s) if s.getCode.equals(Status.Code.INVALID_ARGUMENT) => + f2(s) + } + } + } + + private object StatusFromThrowable { + def unapply(t: Throwable): Option[Status] = + Some(Status.fromThrowable(t)) + } + + def isRetryableException: Throwable => Boolean = { + case apiException: ApiException => + apiException.getStatusCode.getCode match { + case StatusCode.Code.DEADLINE_EXCEEDED => true + case StatusCode.Code.INTERNAL => true + case StatusCode.Code.CANCELLED => true + case StatusCode.Code.RESOURCE_EXHAUSTED => true + case StatusCode.Code.ABORTED => true + case StatusCode.Code.UNKNOWN => true + case StatusCode.Code.UNAVAILABLE => !apiException.getMessage().contains("Server shutdownNow invoked") + case _ => false + } + case _ => + false + } +} diff --git a/modules/pubsub/src/main/scala/com/snowplowanalytics/snowplow/sources/pubsub/PubsubSource.scala b/modules/pubsub/src/main/scala/com/snowplowanalytics/snowplow/sources/pubsub/PubsubSource.scala index eaacea4e..743b891a 100644 --- a/modules/pubsub/src/main/scala/com/snowplowanalytics/snowplow/sources/pubsub/PubsubSource.scala +++ b/modules/pubsub/src/main/scala/com/snowplowanalytics/snowplow/sources/pubsub/PubsubSource.scala @@ -7,325 +7,257 @@ */ package com.snowplowanalytics.snowplow.sources.pubsub -import cats.effect.{Async, Resource, Sync} -import cats.effect.implicits._ +import cats.effect.{Async, Deferred, Ref, Resource, Sync} +import cats.effect.kernel.Unique import cats.implicits._ import fs2.{Chunk, Stream} -import org.typelevel.log4cats.{Logger, SelfAwareStructuredLogger} +import org.typelevel.log4cats.Logger import org.typelevel.log4cats.slf4j.Slf4jLogger -import java.nio.ByteBuffer import java.time.Instant // pubsub -import com.google.api.core.ApiService -import com.google.api.gax.batching.FlowControlSettings -import com.google.api.gax.core.FixedExecutorProvider +import com.google.api.gax.core.{ExecutorProvider, FixedExecutorProvider} import com.google.api.gax.grpc.ChannelPoolSettings -import com.google.cloud.pubsub.v1.{AckReplyConsumer, MessageReceiver, Subscriber, SubscriptionAdminSettings} -import com.google.common.util.concurrent.{ForwardingExecutorService, ListeningExecutorService, MoreExecutors} -import com.google.pubsub.v1.{ProjectSubscriptionName, PubsubMessage} +import com.google.cloud.pubsub.v1.SubscriptionAdminSettings +import com.google.cloud.pubsub.v1.stub.SubscriberStubSettings +import com.google.pubsub.v1.{PullRequest, PullResponse, ReceivedMessage} +import 
com.google.cloud.pubsub.v1.stub.{GrpcSubscriberStub, SubscriberStub} import org.threeten.bp.{Duration => ThreetenDuration} // snowplow -import com.snowplowanalytics.snowplow.pubsub.GcpUserAgent +import com.snowplowanalytics.snowplow.pubsub.{FutureInterop, GcpUserAgent} import com.snowplowanalytics.snowplow.sources.SourceAndAck import com.snowplowanalytics.snowplow.sources.internal.{Checkpointer, LowLevelEvents, LowLevelSource} +import com.snowplowanalytics.snowplow.sources.pubsub.PubsubRetryOps.implicits._ -import scala.concurrent.duration.FiniteDuration +import scala.concurrent.duration.{Duration, FiniteDuration} +import scala.jdk.CollectionConverters._ -import java.util.concurrent.{ - Callable, - ExecutorService, - Executors, - Phaser, - ScheduledExecutorService, - ScheduledFuture, - Semaphore, - TimeUnit, - TimeoutException -} -import java.util.concurrent.atomic.AtomicReference +import java.util.concurrent.{ExecutorService, Executors} +/** + * A common-streams `Source` that pulls messages from PubSub + * + * This Source is a wrapper around a GRPC stub. It uses the "Unary Pull" GRPC method to fetch + * events. + * + * Note that "Unary Pull" GRPC is different to the "Streaming Pull" GRPC used by the 3rd-party + * java-pubsub library. We use "Unary Pull" to avoid a problem in which PubSub occasionally + * re-delivers the same messages, causing downstream duplicates. The problem happened especially in + * apps like Lake Loader, which builds up a very large number of un-acked messages and then acks + * them all in one go at the end of a timed window. + */ object PubsubSource { - private implicit def logger[F[_]: Sync]: SelfAwareStructuredLogger[F] = Slf4jLogger.getLogger[F] + private implicit def logger[F[_]: Sync]: Logger[F] = Slf4jLogger.getLogger[F] def build[F[_]: Async](config: PubsubSourceConfig): F[SourceAndAck[F]] = - LowLevelSource.toSourceAndAck(lowLevel(config)) - - private type PubSubCheckpointer[F[_]] = Checkpointer[F, Chunk[AckReplyConsumer]] + Deferred[F, PubsubCheckpointer.Resources[F]].flatMap { deferred => + LowLevelSource.toSourceAndAck(lowLevel(config, deferred)) + } - private def lowLevel[F[_]: Async](config: PubsubSourceConfig): LowLevelSource[F, Chunk[AckReplyConsumer]] = - new LowLevelSource[F, Chunk[AckReplyConsumer]] { - def checkpointer: PubSubCheckpointer[F] = pubsubCheckpointer + private def lowLevel[F[_]: Async]( + config: PubsubSourceConfig, + deferredResources: Deferred[F, PubsubCheckpointer.Resources[F]] + ): LowLevelSource[F, Vector[Unique.Token]] = + new LowLevelSource[F, Vector[Unique.Token]] { + def checkpointer: Checkpointer[F, Vector[Unique.Token]] = new PubsubCheckpointer(config.subscription, deferredResources) - def stream: Stream[F, Stream[F, LowLevelEvents[Chunk[AckReplyConsumer]]]] = - pubsubStream(config) + def stream: Stream[F, Stream[F, LowLevelEvents[Vector[Unique.Token]]]] = + pubsubStream(config, deferredResources) def lastLiveness: F[FiniteDuration] = Sync[F].realTime } - private def pubsubCheckpointer[F[_]: Async]: PubSubCheckpointer[F] = new PubSubCheckpointer[F] { - def combine(x: Chunk[AckReplyConsumer], y: Chunk[AckReplyConsumer]): Chunk[AckReplyConsumer] = - Chunk.Queue(x, y) + private def pubsubStream[F[_]: Async]( + config: PubsubSourceConfig, + deferredResources: Deferred[F, PubsubCheckpointer.Resources[F]] + ): Stream[F, Stream[F, LowLevelEvents[Vector[Unique.Token]]]] = + for { + parallelPullCount <- Stream.eval(Sync[F].delay(chooseNumParallelPulls(config))) + stub <- Stream.resource(stubResource(config)) + refStates <- 
Stream.eval(Ref[F].of(Map.empty[Unique.Token, PubsubBatchState])) + _ <- Stream.eval(deferredResources.complete(PubsubCheckpointer.Resources(stub, refStates))) + } yield Stream + .fixedRateStartImmediately(config.debounceRequests, dampen = true) + .parEvalMapUnordered(parallelPullCount)(_ => pullAndManageState(config, stub, refStates)) + .unNone + .prefetchN(parallelPullCount) + .concurrently(extendDeadlines(config, stub, refStates)) + .onFinalize(nackRefStatesForShutdown(config, stub, refStates)) - val empty: Chunk[AckReplyConsumer] = Chunk.empty - def ack(c: Chunk[AckReplyConsumer]): F[Unit] = - Sync[F].delay { - c.foreach(_.ack()) + /** + * Pulls a batch of messages from pubsub and then manages the state of the batch + * + * Managing state of the batch includes: + * + * - Extend the ack deadline, which gives us some time to process this batch. + * - Generate a unique token by which to identify this batch internally + * - Add the batch to the local "State" so that we can re-extend the ack deadline if needed + */ + private def pullAndManageState[F[_]: Async]( + config: PubsubSourceConfig, + stub: SubscriberStub, + refStates: Ref[F, Map[Unique.Token, PubsubBatchState]] + ): F[Option[LowLevelEvents[Vector[Unique.Token]]]] = + pullFromSubscription(config, stub).flatMap { response => + if (response.getReceivedMessagesCount > 0) { + val records = response.getReceivedMessagesList.asScala.toVector + val chunk = Chunk.from(records.map(_.getMessage.getData.asReadOnlyByteBuffer())) + val ackIds = records.map(_.getAckId) + Sync[F].uncancelable { _ => + for { + _ <- Logger[F].trace { + records.map(_.getMessage.getMessageId).mkString("Pubsub message IDs: ", ",", "") + } + timeReceived <- Sync[F].realTimeInstant + _ <- Utils.modAck[F](config.subscription, stub, ackIds, config.durationPerAckExtension) + token <- Unique[F].unique + currentDeadline = timeReceived.plusMillis(config.durationPerAckExtension.toMillis) + _ <- refStates.update(_ + (token -> PubsubBatchState(currentDeadline, ackIds))) + } yield Some(LowLevelEvents(chunk, Vector(token), Some(earliestTimestampOfRecords(records)))) + } + } else { + none.pure[F] } + } - def nack(c: Chunk[AckReplyConsumer]): F[Unit] = - Sync[F].delay { - c.foreach(_.nack()) - } + private def earliestTimestampOfRecords(records: Vector[ReceivedMessage]): Instant = { + val (tstampSeconds, tstampNanos) = + records.map(r => (r.getMessage.getPublishTime.getSeconds, r.getMessage.getPublishTime.getNanos)).min + Instant.ofEpochSecond(tstampSeconds, tstampNanos.toLong) } - private case class SingleMessage( - message: ByteBuffer, - ackReply: AckReplyConsumer, - tstamp: Instant - ) - /** - * The toolkit for coordinating concurrent operations between the message receiver and the fs2 - * stream. This is the interface between FS2 and non-FS2 worlds. + * "Nacks" any message that was pulled from pubsub but never consumed by the app. * - * We use pure Java (non-cats-effect) classes so we can use them in the message receiver without - * needing to use a cats-effect Dispatcher. - * - * @param queue - * A List of messages that have been provided by the Pubsub Subscriber. The FS2 stream should - * periodically drain this queue. - * @param semaphore - * A Semaphore used to manage how many bytes are being held in memory. When the queue holds too - * many bytes, the semaphore will block on acquiring more permits. This is needed because we - * have turned off FlowControl in the subscriber. - * @param phaser - * A Phaser that advances each time the queue has new messages to be consumed. 
The FS2 stream - * can wait on the phaser instead of repeatedly pooling the queue. - * @param errorRef - * Any error provided to us by the pubsub Subscriber. If the FS2 stream sees an error here, then - * it should raise the error. + * This is called during graceful shutdown. It allows PubSub to immediately re-deliver the + * messages to a different pod; instead of waiting for the ack deadline to expire. */ - private case class Control( - queue: AtomicReference[List[SingleMessage]], - semaphore: Semaphore, - phaser: Phaser, - errorRef: AtomicReference[Option[Throwable]] - ) - - private object Control { - def build[F[_]: Sync](config: PubsubSourceConfig): F[Control] = Sync[F].delay { - Control( - new AtomicReference(Nil), - new Semaphore(config.bufferMaxBytes, false), - new Phaser(2), - new AtomicReference(None) - ) + private def nackRefStatesForShutdown[F[_]: Async]( + config: PubsubSourceConfig, + stub: SubscriberStub, + refStates: Ref[F, Map[Unique.Token, PubsubBatchState]] + ): F[Unit] = + refStates.getAndSet(Map.empty).flatMap { m => + Utils.modAck(config.subscription, stub, m.values.flatMap(_.ackIds.toVector).toVector, Duration.Zero) } - } - private def pubsubStream[F[_]: Async](config: PubsubSourceConfig): Stream[F, Stream[F, LowLevelEvents[Chunk[AckReplyConsumer]]]] = - for { - control <- Stream.eval(Control.build(config)) - _ <- Stream.resource(runSubscriber(config, control)) - } yield consumeFromQueue(config, control) + /** + * Wrapper around the "Pull" PubSub GRPC. + * + * @return + * The PullResponse, comprising a batch of pubsub messages + */ + private def pullFromSubscription[F[_]: Async]( + config: PubsubSourceConfig, + stub: SubscriberStub + ): F[PullResponse] = { + val request = PullRequest.newBuilder + .setSubscription(config.subscription.show) + .setMaxMessages(config.maxMessagesPerPull) + .build + val io = for { + apiFuture <- Sync[F].delay(stub.pullCallable.futureCall(request)) + res <- FutureInterop.fromFuture[F, PullResponse](apiFuture) + } yield res + Logger[F].debug("Pulling from subscription") *> + io.retryingOnTransientGrpcFailures + .flatTap { response => + Logger[F].debug(s"Pulled ${response.getReceivedMessagesCount} messages") + } + } - private def consumeFromQueue[F[_]: Sync]( + /** + * Modify ack deadlines if we need more time to process the messages + * + * @param config + * The Source configuration + * @param stub + * The GRPC stub on which we can issue modack requests + * @param refStates + * A map from tokens to the data held about a batch of messages received from pubsub. This + * function must update the state if it extends a deadline. 
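+ *
+ * A worked example of the thresholds computed below, assuming
+ * `durationPerAckExtension = 60 seconds` and `minRemainingAckDeadline = 0.1`:
+ * {{{
+ * minAllowedDeadline = now + 0.1 * 60s = now + 6s // batches expiring before this are re-extended
+ * newDeadline        = now + 60s                  // the deadline recorded after the modack
+ * }}}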
+ */ + private def extendDeadlines[F[_]: Async]( config: PubsubSourceConfig, - control: Control - ): Stream[F, LowLevelEvents[Chunk[AckReplyConsumer]]] = + stub: SubscriberStub, + refStates: Ref[F, Map[Unique.Token, PubsubBatchState]] + ): Stream[F, Nothing] = Stream - .repeatEval { - Sync[F].delay(control.errorRef.get).flatMap { - case None => Sync[F].unit - case Some(throwable) => Sync[F].raiseError[Unit](throwable) + .eval(Sync[F].realTimeInstant) + .evalMap { now => + val minAllowedDeadline = now.plusMillis((config.minRemainingAckDeadline.toDouble * config.durationPerAckExtension.toMillis).toLong) + val newDeadline = now.plusMillis(config.durationPerAckExtension.toMillis) + refStates.modify { m => + val toExtend = m.filter { case (_, batchState) => + batchState.currentDeadline.isBefore(minAllowedDeadline) + } + val fixed = toExtend.view.map { case (k, v) => + k -> v.copy(currentDeadline = newDeadline) + }.toMap + (m ++ fixed, toExtend.values.toVector) } } - .evalMap { _ => - // Semantically block until message receive has written at least one message - val waitForData = Sync[F].interruptible { - control.phaser.awaitAdvanceInterruptibly(control.phaser.arrive()) - } - Sync[F].uncancelable { poll => - poll(waitForData) *> Sync[F].delay(control.queue.getAndSet(Nil)) - } - } - .filter(_.nonEmpty) // Would happen if phaser was terminated - .map { list => - val events = Chunk.iterator(list.iterator.map(_.message)) - val acks = Chunk.iterator(list.iterator.map(_.ackReply)) - val earliestTstamp = list.iterator.map(_.tstamp).min - LowLevelEvents(events, acks, Some(earliestTstamp)) - } - .evalTap { case LowLevelEvents(events, _, _) => - val numPermits = events.foldLeft(0) { case (numPermits, e) => - numPermits + permitsFor(config, e.remaining()) - } - Sync[F].delay { - control.semaphore.release(numPermits) + .evalMap { toExtend => + if (toExtend.isEmpty) + // If no message had a deadline close to expiry, then sleep for an appropriate amount of time and check again + Sync[F].sleep(0.5 * config.minRemainingAckDeadline.toDouble * config.durationPerAckExtension) + else { + val ackIds = toExtend.sortBy(_.currentDeadline).flatMap(_.ackIds) + Utils.modAck[F](config.subscription, stub, ackIds, config.durationPerAckExtension) } } + .repeat + .drain /** - * Number of semaphore permits needed to write an event to the buffer. + * Builds the "Stub" which is the object from which we can call PubSub SDK methods * - * - For small/medium events, this equals the size of the event in bytes. - * - For large events, there are not enough permits available for the event in bytes, so return - * the number of available permits. + * This implementation has some hard-coded values, which have been copied over from the equivalent + * hard-coded values in the java-pubsub client library. 
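+ *
+ * Specifically: a 20 MB maximum inbound message and metadata size, a 5 minute keep-alive, and a
+ * single statically-sized transport channel per stub.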
*/ - private def permitsFor(config: PubsubSourceConfig, bytes: Int): Int = - Math.min(config.bufferMaxBytes, bytes) - - private def errorListener(phaser: Phaser, errorRef: AtomicReference[Option[Throwable]]): ApiService.Listener = - new ApiService.Listener { - override def failed(from: ApiService.State, failure: Throwable): Unit = { - errorRef.compareAndSet(None, Some(failure)) - phaser.forceTermination() - } - } - - private def runSubscriber[F[_]: Async](config: PubsubSourceConfig, control: Control): Resource[F, Unit] = - for { - direct <- executorResource(Sync[F].delay(MoreExecutors.newDirectExecutorService())) - parallelPullCount = chooseNumParallelPulls(config) - channelCount = chooseNumTransportChannels(config, parallelPullCount) - executor <- executorResource(Sync[F].delay(Executors.newScheduledThreadPool(2 * parallelPullCount))) - receiver = messageReceiver(config, control) - name = ProjectSubscriptionName.of(config.subscription.projectId, config.subscription.subscriptionId) - subscriber <- Resource.eval(Sync[F].delay { - Subscriber - .newBuilder(name, receiver) - .setMaxAckExtensionPeriod(convertDuration(config.maxAckExtensionPeriod)) - .setMaxDurationPerAckExtension(convertDuration(config.maxDurationPerAckExtension)) - .setMinDurationPerAckExtension(convertDuration(config.minDurationPerAckExtension)) - .setParallelPullCount(parallelPullCount) - .setExecutorProvider(FixedExecutorProvider.create(executorForEventCallbacks(direct, executor))) - .setSystemExecutorProvider(FixedExecutorProvider.create(executor)) - .setFlowControlSettings { - // Switch off any flow control, because we handle it ourselves with the semaphore - FlowControlSettings.getDefaultInstance - } - .setHeaderProvider(GcpUserAgent.headerProvider(config.gcpUserAgent)) - .setChannelProvider { - SubscriptionAdminSettings.defaultGrpcTransportProviderBuilder - .setMaxInboundMessageSize(20 << 20) // copies Subscriber hard-coded default - .setMaxInboundMetadataSize(20 << 20) // copies Subscriber hard-coded default - .setKeepAliveTime(ThreetenDuration.ofMinutes(5)) // copies Subscriber hard-coded default - .setChannelPoolSettings { - ChannelPoolSettings.staticallySized(channelCount) - } - .build - } - .build - }) - _ <- Resource.eval(Sync[F].delay { - subscriber.addListener(errorListener(control.phaser, control.errorRef), MoreExecutors.directExecutor) - }) - apiService <- Resource.make(Sync[F].delay(subscriber.startAsync())) { apiService => - for { - _ <- Logger[F].info("Stopping the PubSub Subscriber...") - _ <- Sync[F].delay(apiService.stopAsync()) - fiber <- drainQueue(control).start - _ <- Logger[F].info("Waiting for the PubSub Subscriber to finish cleanly...") - _ <- Sync[F] - .blocking(apiService.awaitTerminated(config.shutdownTimeout.toMillis, TimeUnit.MILLISECONDS)) - .attemptNarrow[TimeoutException] - _ <- Sync[F].delay(control.phaser.forceTermination()) - _ <- fiber.join - } yield () - } - _ <- Resource.eval(Sync[F].blocking(apiService.awaitRunning())) - } yield () - - private def drainQueue[F[_]: Async](control: Control): F[Unit] = - Async[F].untilDefinedM { - for { - _ <- Sync[F].delay(control.semaphore.release(Int.MaxValue - control.semaphore.availablePermits())) - phase <- Sync[F].blocking(control.phaser.arriveAndAwaitAdvance()) - messages <- Sync[F].delay(control.queue.getAndSet(Nil)) - _ <- pubsubCheckpointer.nack(Chunk.from(messages.map(_.ackReply))) - } yield if (phase < 0) None else Some(()) - } - - private def messageReceiver( + private def buildSubscriberStub[F[_]: Sync]( config: PubsubSourceConfig, - 
control: Control - ): MessageReceiver = - new MessageReceiver { - def receiveMessage(message: PubsubMessage, ackReply: AckReplyConsumer): Unit = { - val tstamp = Instant.ofEpochSecond(message.getPublishTime.getSeconds, message.getPublishTime.getNanos.toLong) - val singleMessage = SingleMessage(message.getData.asReadOnlyByteBuffer(), ackReply, tstamp) - val permitsRequired = permitsFor(config, singleMessage.message.remaining()) - control.semaphore.acquire(permitsRequired) - val previousQueue = control.queue.getAndUpdate(list => singleMessage :: list) - if (previousQueue.isEmpty) { - control.phaser.arrive() - } - () + executorProvider: ExecutorProvider + ): Resource[F, GrpcSubscriberStub] = { + val channelProvider = SubscriptionAdminSettings + .defaultGrpcTransportProviderBuilder() + .setMaxInboundMessageSize(20 << 20) + .setMaxInboundMetadataSize(20 << 20) + .setKeepAliveTime(ThreetenDuration.ofMinutes(5)) + .setChannelPoolSettings { + ChannelPoolSettings.staticallySized(1) } - } - - private def executorResource[F[_]: Sync, E <: ExecutorService](make: F[E]): Resource[F, E] = - Resource.make(make)(es => Sync[F].blocking(es.shutdown())) + .build + + val stubSettings = SubscriberStubSettings + .newBuilder() + .setBackgroundExecutorProvider(executorProvider) + .setCredentialsProvider(SubscriptionAdminSettings.defaultCredentialsProviderBuilder().build()) + .setTransportChannelProvider(channelProvider) + .setHeaderProvider(GcpUserAgent.headerProvider(config.gcpUserAgent)) + .setEndpoint(SubscriberStubSettings.getDefaultEndpoint()) + .build + + Resource.make(Sync[F].delay(GrpcSubscriberStub.create(stubSettings)))(stub => Sync[F].blocking(stub.shutdownNow)) + } /** - * The ScheduledExecutorService to be used for processing events. - * - * We execute the callback on a `DirectExecutor`, which means the underlying Subscriber runs it - * directly on its system executor. When the queue is full, this means we deliberately block the - * system exeuctor. We need to do this trick because we have disabled FlowControl. This trick is - * our own version of flow control. + * Wraps the Stub in a Resource, for managing lifecycle */ - private def executorForEventCallbacks( - directExecutor: ListeningExecutorService, - systemExecutor: ScheduledExecutorService - ): ScheduledExecutorService = - new ForwardingExecutorService with ScheduledExecutorService { - - /** - * Non-scheduled tasks (e.g. when a message is received), are run directly, without jumping to - * another thread pool - */ - override val delegate = directExecutor - - /** - * Scheduled tasks (if they exist) are scheduled on the same thread pool shared by the system - * executor. As far as I know, these schedule methods never get called. 
- */ - override def schedule[V]( - callable: Callable[V], - delay: Long, - unit: TimeUnit - ): ScheduledFuture[V] = - systemExecutor.schedule(callable, delay, unit) - override def schedule( - runnable: Runnable, - delay: Long, - unit: TimeUnit - ): ScheduledFuture[_] = - systemExecutor.schedule(runnable, delay, unit) - override def scheduleAtFixedRate( - runnable: Runnable, - initialDelay: Long, - period: Long, - unit: TimeUnit - ): ScheduledFuture[_] = - systemExecutor.scheduleAtFixedRate(runnable, initialDelay, period, unit) - override def scheduleWithFixedDelay( - runnable: Runnable, - initialDelay: Long, - delay: Long, - unit: TimeUnit - ): ScheduledFuture[_] = - systemExecutor.scheduleWithFixedDelay(runnable, initialDelay, delay, unit) - } + private def stubResource[F[_]: Async]( + config: PubsubSourceConfig + ): Resource[F, SubscriberStub] = + for { + executor <- executorResource(Sync[F].delay(Executors.newScheduledThreadPool(2))) + subStub <- buildSubscriberStub(config, FixedExecutorProvider.create(executor)) + } yield subStub - private def convertDuration(d: FiniteDuration): ThreetenDuration = - ThreetenDuration.ofMillis(d.toMillis) + private def executorResource[F[_]: Sync, E <: ExecutorService](make: F[E]): Resource[F, E] = + Resource.make(make)(es => Sync[F].blocking(es.shutdown())) /** * Converts `parallelPullFactor` to a suggested number of parallel pulls @@ -339,16 +271,4 @@ object PubsubSource { .setScale(0, BigDecimal.RoundingMode.UP) .toInt - /** - * Picks a sensible number of GRPC transport channels (roughly equivalent to a TCP connection) - * - * GRPC has a hard limit of 100 concurrent RPCs on a channel. And experience shows it is healthy - * to stay much under that limit. If we need to open a large number of streaming pulls then we - * might approach/exceed that limit. - */ - private def chooseNumTransportChannels(config: PubsubSourceConfig, parallelPullCount: Int): Int = - (BigDecimal(parallelPullCount) / config.maxPullsPerTransportChannel) - .setScale(0, BigDecimal.RoundingMode.UP) - .toInt - } diff --git a/modules/pubsub/src/main/scala/com/snowplowanalytics/snowplow/sources/pubsub/PubsubSourceConfig.scala b/modules/pubsub/src/main/scala/com/snowplowanalytics/snowplow/sources/pubsub/PubsubSourceConfig.scala index 0b0e8719..a993ed18 100644 --- a/modules/pubsub/src/main/scala/com/snowplowanalytics/snowplow/sources/pubsub/PubsubSourceConfig.scala +++ b/modules/pubsub/src/main/scala/com/snowplowanalytics/snowplow/sources/pubsub/PubsubSourceConfig.scala @@ -7,30 +7,65 @@ */ package com.snowplowanalytics.snowplow.sources.pubsub +import cats.Show import io.circe.Decoder import io.circe.generic.semiauto._ import io.circe.config.syntax._ +import com.google.pubsub.v1.ProjectSubscriptionName import scala.concurrent.duration.FiniteDuration import com.snowplowanalytics.snowplow.pubsub.GcpUserAgent +/** + * Configures the Pubsub Source + * + * @param subscription + * Identifier of the pubsub subscription + * @param parallelPullFactor + * Controls how many RPCs may be opened concurrently from the client to the PubSub server. The + * maximum number of RPCs is equal to this factor multiplied by the number of available cpu cores. + * Increasing this factor can increase the rate of events provided by the Source to the + * application. + * @param durationPerAckExtension + * Ack deadlines are extended for this duration. For common-streams apps this should be set + * slightly larger than the maximum time we expect between app receiving the message and acking + * the message. 
If a message is ever held by the app for longer than expected, then it's ok: the
+ * Source will re-extend the ack deadline.
+ * @param minRemainingAckDeadline
+ * Controls when ack deadlines are re-extended for a message that is close to exceeding its ack
+ * deadline. For example, if `durationPerAckExtension` is `60 seconds` and
+ * `minRemainingAckDeadline` is `0.1` then the Source will wait until there is `6 seconds` left of
+ * the remaining deadline before re-extending the message deadline.
+ * @param gcpUserAgent
+ * Name by which to identify Snowplow in the GRPC headers
+ * @param maxMessagesPerPull
+ * How many pubsub messages to pull from the server in a single request.
+ * @param debounceRequests
+ * Adds an artificial delay between consecutive requests to pubsub for more messages. Under some
+ * circumstances, this was found to slightly alleviate a problem in which pubsub might re-deliver
+ * the same messages multiple times.
+ */
 case class PubsubSourceConfig(
   subscription: PubsubSourceConfig.Subscription,
   parallelPullFactor: BigDecimal,
-  bufferMaxBytes: Int,
-  maxAckExtensionPeriod: FiniteDuration,
-  minDurationPerAckExtension: FiniteDuration,
-  maxDurationPerAckExtension: FiniteDuration,
+  durationPerAckExtension: FiniteDuration,
+  minRemainingAckDeadline: BigDecimal,
   gcpUserAgent: GcpUserAgent,
-  shutdownTimeout: FiniteDuration,
-  maxPullsPerTransportChannel: Int
+  maxMessagesPerPull: Int,
+  debounceRequests: FiniteDuration
 )
 
 object PubsubSourceConfig {
 
   case class Subscription(projectId: String, subscriptionId: String)
 
+  object Subscription {
+    implicit def show: Show[Subscription] = Show.show[Subscription] { s =>
+      ProjectSubscriptionName.of(s.projectId, s.subscriptionId).toString
+    }
+  }
+
   private implicit def subscriptionDecoder: Decoder[Subscription] =
     Decoder.decodeString
       .map(_.split("/"))
diff --git a/modules/pubsub/src/main/scala/com/snowplowanalytics/snowplow/sources/pubsub/Utils.scala b/modules/pubsub/src/main/scala/com/snowplowanalytics/snowplow/sources/pubsub/Utils.scala
new file mode 100644
index 00000000..56d8e49d
--- /dev/null
+++ b/modules/pubsub/src/main/scala/com/snowplowanalytics/snowplow/sources/pubsub/Utils.scala
@@ -0,0 +1,49 @@
+/*
+ * Copyright (c) 2023-present Snowplow Analytics Ltd. All rights reserved.
+ *
+ * This program is licensed to you under the Snowplow Community License Version 1.0,
+ * and you may not use this file except in compliance with the Snowplow Community License Version 1.0.
+ * You may obtain a copy of the Snowplow Community License Version 1.0 at https://docs.snowplow.io/community-license-1.0
+ */
+package com.snowplowanalytics.snowplow.sources.pubsub
+
+import cats.effect.{Async, Sync}
+import cats.implicits._
+import org.typelevel.log4cats.Logger
+
+import com.google.cloud.pubsub.v1.stub.SubscriberStub
+import com.google.pubsub.v1.ModifyAckDeadlineRequest
+import com.snowplowanalytics.snowplow.sources.pubsub.PubsubRetryOps.implicits._
+import com.snowplowanalytics.snowplow.pubsub.FutureInterop
+
+import scala.concurrent.duration.FiniteDuration
+import scala.jdk.CollectionConverters._
+
+private[pubsub] object Utils {
+
+  def modAck[F[_]: Async: Logger](
+    subscription: PubsubSourceConfig.Subscription,
+    stub: SubscriberStub,
+    ackIds: Vector[String],
+    duration: FiniteDuration
+  ): F[Unit] =
+    ackIds.grouped(1000).toVector.traverse_ { group =>
+      val request = ModifyAckDeadlineRequest.newBuilder
+        .setSubscription(subscription.show)
+        .addAllAckIds(group.asJava)
+        .setAckDeadlineSeconds(duration.toSeconds.toInt)
+        .build
+      val io = for {
+        _ <- Logger[F].debug(s"Modifying ack deadline for ${group.size} messages to ${duration.toSeconds} seconds")
+        apiFuture <- Sync[F].delay(stub.modifyAckDeadlineCallable.futureCall(request))
+        _ <- FutureInterop.fromFuture_(apiFuture)
+      } yield ()
+
+      io.retryingOnTransientGrpcFailures
+        .recoveringOnGrpcInvalidArgument { s =>
+          // This can happen if ack IDs were acked before we modAcked
+          Logger[F].info(s"Ignoring error from GRPC when modifying ack IDs: ${s.getDescription}")
+        }
+    }
+
+}
diff --git a/modules/pubsub/src/test/scala/com/snowplowanalytics/snowplow/sources/pubsub/PubsubSourceConfigSpec.scala b/modules/pubsub/src/test/scala/com/snowplowanalytics/snowplow/sources/pubsub/PubsubSourceConfigSpec.scala
index 94b9c3f2..4fab8b74 100644
--- a/modules/pubsub/src/test/scala/com/snowplowanalytics/snowplow/sources/pubsub/PubsubSourceConfigSpec.scala
+++ b/modules/pubsub/src/test/scala/com/snowplowanalytics/snowplow/sources/pubsub/PubsubSourceConfigSpec.scala
@@ -41,15 +41,13 @@ class PubsubSourceConfigSpec extends Specification {
     val result = ConfigFactory.load(ConfigFactory.parseString(input))
 
     val expected = PubsubSourceConfig(
-      subscription = PubsubSourceConfig.Subscription("my-project", "my-subscription"),
-      parallelPullFactor = BigDecimal(0.5),
-      bufferMaxBytes = 10000000,
-      maxAckExtensionPeriod = 1.hour,
-      minDurationPerAckExtension = 1.minute,
-      maxDurationPerAckExtension = 10.minutes,
-      gcpUserAgent = GcpUserAgent("Snowplow OSS", "example-version"),
-      shutdownTimeout = 30.seconds,
-      maxPullsPerTransportChannel = 16
+      subscription = PubsubSourceConfig.Subscription("my-project", "my-subscription"),
+      parallelPullFactor = BigDecimal(0.5),
+      durationPerAckExtension = 1.minute,
+      minRemainingAckDeadline = BigDecimal(0.1),
+      gcpUserAgent = GcpUserAgent("Snowplow OSS", "example-version"),
+      maxMessagesPerPull = 1000,
+      debounceRequests = 100.millis
     )
 
     result.as[Wrapper] must beRight.like { case w: Wrapper =>