Skip to content

Commit

Permalink
Repeater: PubSub ack extensions should match backoff delay
Browse files Browse the repository at this point in the history
  • Loading branch information
istreeter committed Apr 12, 2024
1 parent b1ff197 commit 0fb67cf
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import com.snowplowanalytics.snowplow.badrows.Processor
import cats.effect._
import org.typelevel.log4cats.Logger
import org.typelevel.log4cats.slf4j.Slf4jLogger
import scala.concurrent.duration.DurationInt

object Repeater extends IOApp {

Expand All @@ -34,7 +35,8 @@ object Repeater extends IOApp {
resources.env.projectId,
resources.env.config.input.subscription,
resources.uninsertable,
resources.env.gcpUserAgent
resources.env.gcpUserAgent,
command.backoffPeriod.seconds
)
.interruptWhen(resources.stop)
.through[IO, Unit](Flow.sink(resources))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ import com.snowplowanalytics.snowplow.storage.bigquery.common.config.AllAppsConf
import com.snowplowanalytics.snowplow.storage.bigquery.common.createGcpUserAgentHeader
import fs2.Stream
import org.typelevel.log4cats.Logger
import org.threeten.bp.{Duration => ThreetenDuration}
import scala.concurrent.duration.{DurationInt, FiniteDuration}

/** Module responsible for reading Pub/Sub */
object PubSub {
Expand All @@ -33,18 +35,26 @@ object PubSub {
projectId: String,
subscription: String,
uninsertable: Queue[F, BadRow],
gcpUserAgent: GcpUserAgent
gcpUserAgent: GcpUserAgent,
backoffPeriod: FiniteDuration
): Stream[F, ConsumerRecord[F, EventContainer]] =
PubsubGoogleConsumer.subscribe[F, EventContainer](
Model.ProjectId(projectId),
Model.Subscription(subscription),
(msg, err, ack, _) => callback[F](msg, err, ack, uninsertable),
PubsubGoogleConsumerConfig[F](
onFailedTerminate = t => Logger[F].error(s"Terminating consumer due to $t"),
customizeSubscriber = Some(_.setHeaderProvider(createGcpUserAgentHeader(gcpUserAgent)))
onFailedTerminate = t => Logger[F].error(s"Terminating consumer due to $t"),
customizeSubscriber = Some {
_.setHeaderProvider(createGcpUserAgentHeader(gcpUserAgent))
.setMaxAckExtensionPeriod(convertDuration(backoffPeriod.min(1.hour)))
.setMinDurationPerAckExtension(convertDuration(backoffPeriod.min(600.seconds).minus(1.second)))
}
)
)

private def convertDuration(d: FiniteDuration): ThreetenDuration =
ThreetenDuration.ofMillis(d.toMillis)

private def callback[F[_]: Sync](msg: PubsubMessage, err: Throwable, ack: F[Unit], uninsertable: Queue[F, BadRow]) = {
val info = FailureDetails.LoaderRecoveryError.ParsingError(err.toString, Nil)
val failure = Failure.LoaderRecoveryFailure(info)
Expand Down

0 comments on commit 0fb67cf

Please sign in to comment.