From 191d9b3e31d3e89c5b44e85ec41ee6daafa663cd Mon Sep 17 00:00:00 2001 From: Ian Streeter Date: Fri, 12 Apr 2024 11:52:27 +0100 Subject: [PATCH] Repeater: PubSub ack extensions should match backoff delay --- .../Flow.scala | 7 +- .../Repeater.scala | 4 +- .../services/PubSub.scala | 76 +++++++++++++++++-- 3 files changed, 77 insertions(+), 10 deletions(-) diff --git a/modules/repeater/src/main/scala/com.snowplowanalytics.snowplow.storage.bigquery.repeater/Flow.scala b/modules/repeater/src/main/scala/com.snowplowanalytics.snowplow.storage.bigquery.repeater/Flow.scala index 9493b880..a7aba152 100644 --- a/modules/repeater/src/main/scala/com.snowplowanalytics.snowplow.storage.bigquery.repeater/Flow.scala +++ b/modules/repeater/src/main/scala/com.snowplowanalytics.snowplow.storage.bigquery.repeater/Flow.scala @@ -100,8 +100,11 @@ object Flow { .as(Inserted.asInstanceOf[InsertStatus]) .value } else { - Logger[F].debug(s"Event ${event.value.eventId}/${event.value.etlTstamp} is not ready yet. Nack") >> - event.nack.as(Retry.asInstanceOf[InsertStatus].asRight) + Logger[F] + .debug( + s"Event ${event.value.eventId}/${event.value.etlTstamp} is not ready yet. Ignoring it so PubSub re-sends it later." + ) + .as(Retry.asInstanceOf[InsertStatus].asRight) } } } yield result diff --git a/modules/repeater/src/main/scala/com.snowplowanalytics.snowplow.storage.bigquery.repeater/Repeater.scala b/modules/repeater/src/main/scala/com.snowplowanalytics.snowplow.storage.bigquery.repeater/Repeater.scala index 83a6a4b2..59297b35 100644 --- a/modules/repeater/src/main/scala/com.snowplowanalytics.snowplow.storage.bigquery.repeater/Repeater.scala +++ b/modules/repeater/src/main/scala/com.snowplowanalytics.snowplow.storage.bigquery.repeater/Repeater.scala @@ -17,6 +17,7 @@ import com.snowplowanalytics.snowplow.badrows.Processor import cats.effect._ import org.typelevel.log4cats.Logger import org.typelevel.log4cats.slf4j.Slf4jLogger +import scala.concurrent.duration.DurationInt object Repeater extends IOApp { @@ -34,7 +35,8 @@ object Repeater extends IOApp { resources.env.projectId, resources.env.config.input.subscription, resources.uninsertable, - resources.env.gcpUserAgent + resources.env.gcpUserAgent, + command.backoffPeriod.seconds ) .interruptWhen(resources.stop) .through[IO, Unit](Flow.sink(resources)) diff --git a/modules/repeater/src/main/scala/com.snowplowanalytics.snowplow.storage.bigquery.repeater/services/PubSub.scala b/modules/repeater/src/main/scala/com.snowplowanalytics.snowplow.storage.bigquery.repeater/services/PubSub.scala index f5a5f124..65f19d2a 100644 --- a/modules/repeater/src/main/scala/com.snowplowanalytics.snowplow.storage.bigquery.repeater/services/PubSub.scala +++ b/modules/repeater/src/main/scala/com.snowplowanalytics.snowplow.storage.bigquery.repeater/services/PubSub.scala @@ -12,18 +12,26 @@ */ package com.snowplowanalytics.snowplow.storage.bigquery.repeater.services -import com.snowplowanalytics.snowplow.badrows.{BadRow, Failure, FailureDetails, Payload} -import com.snowplowanalytics.snowplow.storage.bigquery.repeater.{EventContainer, Repeater} import cats.effect._ import cats.effect.std.Queue import cats.syntax.all._ +import fs2.Stream +import org.typelevel.log4cats.Logger +import org.threeten.bp.{Duration => ThreetenDuration} import com.google.pubsub.v1.PubsubMessage +import com.google.api.gax.core.ExecutorProvider +import com.google.api.gax.batching.FlowControlSettings +import com.google.common.util.concurrent.{ForwardingListeningExecutorService, MoreExecutors} + import com.permutive.pubsub.consumer.{ConsumerRecord, Model} import com.permutive.pubsub.consumer.grpc.{PubsubGoogleConsumer, PubsubGoogleConsumerConfig} +import com.snowplowanalytics.snowplow.badrows.{BadRow, Failure, FailureDetails, Payload} import com.snowplowanalytics.snowplow.storage.bigquery.common.config.AllAppsConfig.GcpUserAgent import com.snowplowanalytics.snowplow.storage.bigquery.common.createGcpUserAgentHeader -import fs2.Stream -import org.typelevel.log4cats.Logger +import com.snowplowanalytics.snowplow.storage.bigquery.repeater.{EventContainer, Repeater} + +import scala.concurrent.duration.{DurationInt, FiniteDuration} +import java.util.concurrent.{Callable, ScheduledExecutorService, ScheduledFuture, ScheduledThreadPoolExecutor, TimeUnit} /** Module responsible for reading Pub/Sub */ object PubSub { @@ -33,18 +41,72 @@ object PubSub { projectId: String, subscription: String, uninsertable: Queue[F, BadRow], - gcpUserAgent: GcpUserAgent + gcpUserAgent: GcpUserAgent, + backoffPeriod: FiniteDuration ): Stream[F, ConsumerRecord[F, EventContainer]] = PubsubGoogleConsumer.subscribe[F, EventContainer]( Model.ProjectId(projectId), Model.Subscription(subscription), (msg, err, ack, _) => callback[F](msg, err, ack, uninsertable), PubsubGoogleConsumerConfig[F]( - onFailedTerminate = t => Logger[F].error(s"Terminating consumer due to $t"), - customizeSubscriber = Some(_.setHeaderProvider(createGcpUserAgentHeader(gcpUserAgent))) + onFailedTerminate = t => Logger[F].error(s"Terminating consumer due to $t"), + customizeSubscriber = Some { + _.setHeaderProvider(createGcpUserAgentHeader(gcpUserAgent)) + .setMaxAckExtensionPeriod(convertDuration(backoffPeriod.min(1.hour))) + .setMinDurationPerAckExtension(convertDuration(backoffPeriod.min(600.seconds).minus(1.second))) + .setExecutorProvider { + new ExecutorProvider { + def shouldAutoClose: Boolean = true + def getExecutor: ScheduledExecutorService = scheduledExecutorService + } + } + .setFlowControlSettings { + // Switch off any flow control, because we handle it ourselves via fs2's backpressure + FlowControlSettings.getDefaultInstance + } + } ) ) + private def convertDuration(d: FiniteDuration): ThreetenDuration = + ThreetenDuration.ofMillis(d.toMillis) + + def scheduledExecutorService: ScheduledExecutorService = + new ForwardingListeningExecutorService with ScheduledExecutorService { + val delegate = MoreExecutors.newDirectExecutorService + lazy val scheduler = new ScheduledThreadPoolExecutor(1) // I think this scheduler is never used, but I implement it here for safety + override def schedule[V]( + callable: Callable[V], + delay: Long, + unit: TimeUnit + ): ScheduledFuture[V] = + scheduler.schedule(callable, delay, unit) + override def schedule( + runnable: Runnable, + delay: Long, + unit: TimeUnit + ): ScheduledFuture[_] = + scheduler.schedule(runnable, delay, unit) + override def scheduleAtFixedRate( + runnable: Runnable, + initialDelay: Long, + period: Long, + unit: TimeUnit + ): ScheduledFuture[_] = + scheduler.scheduleAtFixedRate(runnable, initialDelay, period, unit) + override def scheduleWithFixedDelay( + runnable: Runnable, + initialDelay: Long, + delay: Long, + unit: TimeUnit + ): ScheduledFuture[_] = + scheduler.scheduleWithFixedDelay(runnable, initialDelay, delay, unit) + override def shutdown(): Unit = { + delegate.shutdown() + scheduler.shutdown() + } + } + private def callback[F[_]: Sync](msg: PubsubMessage, err: Throwable, ack: F[Unit], uninsertable: Queue[F, BadRow]) = { val info = FailureDetails.LoaderRecoveryError.ParsingError(err.toString, Nil) val failure = Failure.LoaderRecoveryFailure(info)