PubSub Source using Unary Pull (#101)
The previous implementation of the PubSub Source was a wrapper around
`Subscriber`, provided by the third-party PubSub SDK. That `Subscriber` is
itself a wrapper around a lower-level GRPC stub, and it used the "Streaming
Pull" GRPC method to fetch messages from PubSub.

This commit abandons using the `Subscriber` and instead wraps the GRPC
stub directly. It uses the "Unary Pull" GRPC method to fetch messages
from PubSub.

We found that "Unary Pull" alleviates a problem in which PubSub
occasionally re-delivers the same messages, causing downstream
duplicates. The problem was especially pronounced in apps like Lake
Loader, which build up a very large number of un-acked messages and then
ack them all in one go at the end of a timed window.

Compared with the previous Source implementation, the new one differs in
behaviour as follows:

- Previously, the third-party `Subscriber` managed ack extensions (a.k.a.
  modifying ack deadlines). Ack extension periods were adjusted
  dynamically according to runtime heuristics of message processing
  times. In this new Source, by contrast, the ack extension period is a
  fixed, configurable period.
- Previously, the `Subscriber` periodically mod-acked all unacked
  messages currently held in memory. The new Source only mod-acks
  messages when they are approaching their ack deadline (see the sketch
  after this list). This is an improvement for apps like Lake Loader,
  which might have a very large number of outstanding unacked messages.
- Unary Pull has _slightly_ worse latency compared to Streaming Pull. I
  consider this ok for common-streams apps, where the latency caused
  by the Source is negligible compared to other latencies in the app.
- Arguably, "Unary Pull" is a neater fit (less hacky) for a
  common-streams Source. Now, we simply pull a batch when a batch is
  needed.
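
To make the mod-ack bullet concrete, here is a minimal sketch of how the fixed extension period and `minRemainingAckDeadline` (both visible in the `reference.conf` diff below) plausibly interact. The threshold arithmetic is my reading of the config, not code from this commit:

```scala
import scala.concurrent.duration._

object AckExtensionSketch extends App {
  // Values mirror the new defaults in reference.conf below.
  val durationPerAckExtension = 60.seconds
  val minRemainingAckDeadline = 0.1

  // Assumed semantics: a batch's deadline is refreshed only once its remaining
  // time falls below this fraction of the extension period, rather than by a
  // periodic sweep over every unacked message held in memory.
  val refreshThreshold = durationPerAckExtension * minRemainingAckDeadline
  println(refreshThreshold) // 6 seconds
}
```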
istreeter authored Jan 3, 2025
1 parent 82d3b8a commit d5f509b
Showing 9 changed files with 491 additions and 300 deletions.
10 changes: 4 additions & 6 deletions modules/pubsub/src/main/resources/reference.conf
@@ -2,15 +2,13 @@ snowplow.defaults: {
   sources: {
     pubsub: {
       parallelPullFactor: 0.5
-      bufferMaxBytes: 10000000
-      maxAckExtensionPeriod: "1 hour"
-      minDurationPerAckExtension: "60 seconds"
-      maxDurationPerAckExtension: "600 seconds"
+      durationPerAckExtension: "60 seconds"
+      minRemainingAckDeadline: 0.1
+      maxMessagesPerPull: 1000
+      debounceRequests: "100 millis"
       gcpUserAgent: {
         productName: "Snowplow OSS"
       }
-      shutdownTimeout: "30 seconds"
-      maxPullsPerTransportChannel: 16
     }
   }

@@ -13,7 +13,7 @@ import com.google.api.core.{ApiFuture, ApiFutureCallback, ApiFutures}
 import com.google.common.util.concurrent.MoreExecutors
 
 object FutureInterop {
-  def fromFuture[F[_]: Async, A](fut: ApiFuture[A]): F[Unit] =
+  def fromFuture[F[_]: Async, A](fut: ApiFuture[A]): F[A] =
     Async[F]
       .async[A] { cb =>
         val cancel = Async[F].delay {
@@ -24,7 +24,9 @@ object FutureInterop {
           Some(cancel)
         }
       }
-      .void
+
+  def fromFuture_[F[_]: Async, A](fut: ApiFuture[A]): F[Unit] =
+    fromFuture(fut).void
 
   private def addCallback[A](fut: ApiFuture[A], cb: Either[Throwable, A] => Unit): Unit = {
     val apiFutureCallback = new ApiFutureCallback[A] {
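The change above widens `fromFuture` to return the RPC result (`F[A]`) and adds `fromFuture_` for callers that only care about completion. A hedged usage sketch (the helper `ackOnce` is hypothetical; the pattern mirrors `PubsubCheckpointer.ack` later in this diff):

```scala
import cats.effect.{Async, Sync}
import cats.implicits._
import com.google.cloud.pubsub.v1.stub.SubscriberStub
import com.google.protobuf.Empty
import com.google.pubsub.v1.AcknowledgeRequest

import com.snowplowanalytics.snowplow.pubsub.FutureInterop

// Hypothetical caller: run the unary "Acknowledge" RPC and wait for its result.
def ackOnce[F[_]: Async](stub: SubscriberStub, request: AcknowledgeRequest): F[Unit] =
  for {
    apiFuture <- Sync[F].delay(stub.acknowledgeCallable.futureCall(request))
    _         <- FutureInterop.fromFuture[F, Empty](apiFuture)
  } yield ()
```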
@@ -0,0 +1,24 @@
/*
 * Copyright (c) 2023-present Snowplow Analytics Ltd. All rights reserved.
 *
 * This program is licensed to you under the Snowplow Community License Version 1.0,
 * and you may not use this file except in compliance with the Snowplow Community License Version 1.0.
 * You may obtain a copy of the Snowplow Community License Version 1.0 at https://docs.snowplow.io/community-license-1.0
 */
package com.snowplowanalytics.snowplow.sources.pubsub

import java.time.Instant

/**
 * Data held about a batch of messages pulled from a pubsub subscription
 *
 * @param currentDeadline
 *   The deadline before which we must either ack, nack, or extend the deadline to something further
 *   in the future. This is updated over time if we approach a deadline.
 * @param ackIds
 *   The IDs which are needed to ack all messages in the batch
 */
private case class PubsubBatchState(
  currentDeadline: Instant,
  ackIds: Vector[String]
)
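The checkpointer below reads this state. For orientation, here is a hedged sketch (not code from this commit) of how the source side might register a freshly pulled batch under a new token, which is how the `Ref` described in `PubsubCheckpointer.Resources` gets populated:

```scala
import cats.effect.kernel.{Async, Ref, Unique}
import cats.implicits._

import java.time.Instant

// Hypothetical helper: store the batch's ack IDs and current deadline under a
// fresh token, so the checkpointer can later ack, nack, or extend the batch.
def registerBatch[F[_]: Async](
  refState: Ref[F, Map[Unique.Token, PubsubBatchState]],
  deadline: Instant,
  ackIds: Vector[String]
): F[Unique.Token] =
  for {
    token <- Unique[F].unique
    _     <- refState.update(_ + (token -> PubsubBatchState(deadline, ackIds)))
  } yield token
```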
@@ -0,0 +1,103 @@
/*
 * Copyright (c) 2023-present Snowplow Analytics Ltd. All rights reserved.
 *
 * This program is licensed to you under the Snowplow Community License Version 1.0,
 * and you may not use this file except in compliance with the Snowplow Community License Version 1.0.
 * You may obtain a copy of the Snowplow Community License Version 1.0 at https://docs.snowplow.io/community-license-1.0
 */
package com.snowplowanalytics.snowplow.sources.pubsub

import cats.implicits._
import cats.effect.kernel.Unique
import cats.effect.{Async, Deferred, Ref, Sync}
import com.google.cloud.pubsub.v1.stub.SubscriberStub
import com.google.pubsub.v1.AcknowledgeRequest
import org.typelevel.log4cats.Logger
import org.typelevel.log4cats.slf4j.Slf4jLogger

import scala.jdk.CollectionConverters._
import scala.concurrent.duration.Duration

import com.snowplowanalytics.snowplow.sources.internal.Checkpointer
import com.snowplowanalytics.snowplow.pubsub.FutureInterop
import com.snowplowanalytics.snowplow.sources.pubsub.PubsubRetryOps.implicits._

/**
 * The Pubsub checkpointer
 *
 * @param subscription
 *   Pubsub subscription name
 * @param deferredResources
 *   Resources needed so we can ack/nack messages. This is wrapped in `Deferred` because the
 *   resources are not available until the application calls `.stream` on the `LowLevelSource`. This
 *   is a limitation in the design of the common-streams Source interface.
 */
class PubsubCheckpointer[F[_]: Async](
  subscription: PubsubSourceConfig.Subscription,
  deferredResources: Deferred[F, PubsubCheckpointer.Resources[F]]
) extends Checkpointer[F, Vector[Unique.Token]] {

  import PubsubCheckpointer._

  private implicit def logger: Logger[F] = Slf4jLogger.getLogger[F]

  override def combine(x: Vector[Unique.Token], y: Vector[Unique.Token]): Vector[Unique.Token] =
    x |+| y

  override val empty: Vector[Unique.Token] = Vector.empty

  /**
   * Ack some batches of messages received from pubsub
   *
   * @param c
   *   tokens which are keys to batch data held in the shared state
   */
  override def ack(c: Vector[Unique.Token]): F[Unit] =
    for {
      Resources(stub, refAckIds) <- deferredResources.get
      ackDatas <- refAckIds.modify(m => (m -- c, c.flatMap(m.get)))
      _ <- ackDatas.flatMap(_.ackIds).grouped(1000).toVector.traverse_ { ackIds =>
             val request = AcknowledgeRequest.newBuilder.setSubscription(subscription.show).addAllAckIds(ackIds.asJava).build
             val attempt = for {
               apiFuture <- Sync[F].delay(stub.acknowledgeCallable.futureCall(request))
               _ <- FutureInterop.fromFuture[F, com.google.protobuf.Empty](apiFuture)
             } yield ()
             attempt.retryingOnTransientGrpcFailures
               .recoveringOnGrpcInvalidArgument { s =>
                 // This can happen if ack IDs have expired before we acked
                 Logger[F].info(s"Ignoring error from GRPC when acking: ${s.getDescription}")
               }
           }
    } yield ()

  /**
   * Nack some batches of messages received from pubsub
   *
   * @param c
   *   tokens which are keys to batch data held in the shared state
   */
  override def nack(c: Vector[Unique.Token]): F[Unit] =
    for {
      Resources(stub, refAckIds) <- deferredResources.get
      ackDatas <- refAckIds.modify(m => (m -- c, c.flatMap(m.get)))
      ackIds = ackDatas.flatMap(_.ackIds)
      // A nack is just a modack with zero duration
      _ <- Utils.modAck[F](subscription, stub, ackIds, Duration.Zero)
    } yield ()
}

private object PubsubCheckpointer {

  /**
   * Resources needed by `PubsubCheckpointer` so it can ack/nack messages
   *
   * @param stub
   *   The GRPC stub needed to execute the ack/nack RPCs
   * @param refState
   *   A map from tokens to the data held about a batch of messages received from pubsub. The map is
   *   wrapped in a `Ref` because it is concurrently modified by the source adding new batches to
   *   the state.
   */
  case class Resources[F[_]](stub: SubscriberStub, refState: Ref[F, Map[Unique.Token, PubsubBatchState]])

}
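`Utils.modAck`, referenced by `nack` above, is defined elsewhere in this commit. As a rough sketch of what such a helper looks like (details here are assumptions: the real signature takes a `PubsubSourceConfig.Subscription`, and the batching mirrors the 1000-ID grouping used in `ack`):

```scala
import cats.effect.{Async, Sync}
import cats.implicits._
import com.google.cloud.pubsub.v1.stub.SubscriberStub
import com.google.protobuf.Empty
import com.google.pubsub.v1.ModifyAckDeadlineRequest

import scala.concurrent.duration.FiniteDuration
import scala.jdk.CollectionConverters._

import com.snowplowanalytics.snowplow.pubsub.FutureInterop

// Hypothetical stand-in for Utils.modAck: extend (or, with zero duration,
// nack) the ack deadline for a set of ack IDs, 1000 at a time.
def modAck[F[_]: Async](
  subscription: String,
  stub: SubscriberStub,
  ackIds: Vector[String],
  extension: FiniteDuration
): F[Unit] =
  ackIds.grouped(1000).toVector.traverse_ { group =>
    val request = ModifyAckDeadlineRequest.newBuilder
      .setSubscription(subscription)
      .addAllAckIds(group.asJava)
      .setAckDeadlineSeconds(extension.toSeconds.toInt)
      .build
    for {
      apiFuture <- Sync[F].delay(stub.modifyAckDeadlineCallable.futureCall(request))
      _         <- FutureInterop.fromFuture_[F, Empty](apiFuture)
    } yield ()
  }
```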
@@ -0,0 +1,62 @@
/*
 * Copyright (c) 2023-present Snowplow Analytics Ltd. All rights reserved.
 *
 * This program is licensed to you under the Snowplow Community License Version 1.0,
 * and you may not use this file except in compliance with the Snowplow Community License Version 1.0.
 * You may obtain a copy of the Snowplow Community License Version 1.0 at https://docs.snowplow.io/community-license-1.0
 */
package com.snowplowanalytics.snowplow.sources.pubsub

import cats.implicits._
import cats.effect.Async
import com.google.api.gax.rpc.{ApiException, StatusCode}
import io.grpc.Status
import org.typelevel.log4cats.Logger
import retry.RetryPolicies
import retry.implicits._

import scala.concurrent.duration.DurationDouble

private[pubsub] object PubsubRetryOps {

  object implicits {
    implicit class Ops[F[_], A](val f: F[A]) extends AnyVal {

      def retryingOnTransientGrpcFailures(implicit F: Async[F], L: Logger[F]): F[A] =
        f.retryingOnSomeErrors(
          isWorthRetrying = { e => isRetryableException(e).pure[F] },
          policy = RetryPolicies.fullJitter(1.second),
          onError = { case (t, _) =>
            Logger[F].info(t)(s"Pubsub retryable GRPC error will be retried: ${t.getMessage}")
          }
        )

      def recoveringOnGrpcInvalidArgument(f2: Status => F[A])(implicit F: Async[F]): F[A] =
        f.recoverWith {
          case StatusFromThrowable(s) if s.getCode.equals(Status.Code.INVALID_ARGUMENT) =>
            f2(s)
        }
    }
  }

  private object StatusFromThrowable {
    def unapply(t: Throwable): Option[Status] =
      Some(Status.fromThrowable(t))
  }

  def isRetryableException: Throwable => Boolean = {
    case apiException: ApiException =>
      apiException.getStatusCode.getCode match {
        case StatusCode.Code.DEADLINE_EXCEEDED  => true
        case StatusCode.Code.INTERNAL           => true
        case StatusCode.Code.CANCELLED          => true
        case StatusCode.Code.RESOURCE_EXHAUSTED => true
        case StatusCode.Code.ABORTED            => true
        case StatusCode.Code.UNKNOWN            => true
        case StatusCode.Code.UNAVAILABLE        => !apiException.getMessage().contains("Server shutdownNow invoked")
        case _                                  => false
      }
    case _ =>
      false
  }
}
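For completeness, a hedged sketch (assumed shape, not this commit's actual source file) of how these combinators wrap the core "Unary Pull" RPC that the new Source is built around:

```scala
import cats.effect.{Async, Sync}
import cats.implicits._
import com.google.cloud.pubsub.v1.stub.SubscriberStub
import com.google.pubsub.v1.{PullRequest, PullResponse}
import org.typelevel.log4cats.Logger

import com.snowplowanalytics.snowplow.pubsub.FutureInterop
import com.snowplowanalytics.snowplow.sources.pubsub.PubsubRetryOps.implicits._

// Hypothetical helper: one unary pull of up to `maxMessages` messages,
// retried on the transient GRPC failures classified above.
def pullOnce[F[_]: Async: Logger](
  stub: SubscriberStub,
  subscription: String,
  maxMessages: Int
): F[PullResponse] = {
  val request = PullRequest.newBuilder
    .setSubscription(subscription)
    .setMaxMessages(maxMessages)
    .build
  Sync[F]
    .delay(stub.pullCallable.futureCall(request))
    .flatMap(FutureInterop.fromFuture[F, PullResponse](_))
    .retryingOnTransientGrpcFailures
}
```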