From 1aec1448ca77603972fd2a0ba9d3d8103db51119 Mon Sep 17 00:00:00 2001
From: Ian Streeter
Date: Tue, 1 Oct 2024 22:41:27 +0100
Subject: [PATCH] Pubsub source open more transport channels (#92)

Previously the pubsub SDK opened just a single transport channel (roughly
equivalent to a TCP channel).

In some Snowplow apps we need to batch up a very large number of events from
the source before acking. In those circumstances it is good to open more
streaming pulls (a type of RPC) to keep the pubsub subscription healthy. But
to open more streaming pulls we also need to open more transport channels
(TCP channels) to support all the RPCs.

This commit adds a new config parameter `maxPullsPerTransportChannel`. The
default value of 16 is good for most apps.
---
 .../pubsub/src/main/resources/reference.conf |  1 +
 .../sources/pubsub/PubsubSource.scala        | 26 ++++++++++++++++++-
 .../sources/pubsub/PubsubSourceConfig.scala  |  3 ++-
 .../pubsub/PubsubSourceConfigSpec.scala      | 17 ++++++------
 4 files changed, 37 insertions(+), 10 deletions(-)

diff --git a/modules/pubsub/src/main/resources/reference.conf b/modules/pubsub/src/main/resources/reference.conf
index c7f9e5df..dc03ea3c 100644
--- a/modules/pubsub/src/main/resources/reference.conf
+++ b/modules/pubsub/src/main/resources/reference.conf
@@ -10,6 +10,7 @@ snowplow.defaults: {
         productName: "Snowplow OSS"
       }
       shutdownTimeout: "30 seconds"
+      maxPullsPerTransportChannel: 16
     }
   }
 }
diff --git a/modules/pubsub/src/main/scala/com/snowplowanalytics/snowplow/sources/pubsub/PubsubSource.scala b/modules/pubsub/src/main/scala/com/snowplowanalytics/snowplow/sources/pubsub/PubsubSource.scala
index 6c3c901e..eaacea4e 100644
--- a/modules/pubsub/src/main/scala/com/snowplowanalytics/snowplow/sources/pubsub/PubsubSource.scala
+++ b/modules/pubsub/src/main/scala/com/snowplowanalytics/snowplow/sources/pubsub/PubsubSource.scala
@@ -21,7 +21,8 @@ import java.time.Instant
 import com.google.api.core.ApiService
 import com.google.api.gax.batching.FlowControlSettings
 import com.google.api.gax.core.FixedExecutorProvider
-import com.google.cloud.pubsub.v1.{AckReplyConsumer, MessageReceiver, Subscriber}
+import com.google.api.gax.grpc.ChannelPoolSettings
+import com.google.cloud.pubsub.v1.{AckReplyConsumer, MessageReceiver, Subscriber, SubscriptionAdminSettings}
 import com.google.common.util.concurrent.{ForwardingExecutorService, ListeningExecutorService, MoreExecutors}
 import com.google.pubsub.v1.{ProjectSubscriptionName, PubsubMessage}
 import org.threeten.bp.{Duration => ThreetenDuration}
@@ -191,6 +192,7 @@ object PubsubSource {
     for {
       direct <- executorResource(Sync[F].delay(MoreExecutors.newDirectExecutorService()))
       parallelPullCount = chooseNumParallelPulls(config)
+      channelCount = chooseNumTransportChannels(config, parallelPullCount)
       executor <- executorResource(Sync[F].delay(Executors.newScheduledThreadPool(2 * parallelPullCount)))
       receiver = messageReceiver(config, control)
       name = ProjectSubscriptionName.of(config.subscription.projectId, config.subscription.subscriptionId)
@@ -208,6 +210,16 @@
             FlowControlSettings.getDefaultInstance
           }
           .setHeaderProvider(GcpUserAgent.headerProvider(config.gcpUserAgent))
+          .setChannelProvider {
+            SubscriptionAdminSettings.defaultGrpcTransportProviderBuilder
+              .setMaxInboundMessageSize(20 << 20) // copies Subscriber hard-coded default
+              .setMaxInboundMetadataSize(20 << 20) // copies Subscriber hard-coded default
+              .setKeepAliveTime(ThreetenDuration.ofMinutes(5)) // copies Subscriber hard-coded default
+              .setChannelPoolSettings {
+                ChannelPoolSettings.staticallySized(channelCount)
+              }
+              .build
+          }
           .build
       })
       _ <- Resource.eval(Sync[F].delay {
@@ -327,4 +339,16 @@ object PubsubSource {
       .setScale(0, BigDecimal.RoundingMode.UP)
       .toInt
 
+  /**
+   * Picks a sensible number of GRPC transport channels (roughly equivalent to a TCP connection)
+   *
+   * GRPC has a hard limit of 100 concurrent RPCs on a channel. And experience shows it is healthy
+   * to stay much under that limit. If we need to open a large number of streaming pulls then we
+   * might approach/exceed that limit.
+   */
+  private def chooseNumTransportChannels(config: PubsubSourceConfig, parallelPullCount: Int): Int =
+    (BigDecimal(parallelPullCount) / config.maxPullsPerTransportChannel)
+      .setScale(0, BigDecimal.RoundingMode.UP)
+      .toInt
+
 }
diff --git a/modules/pubsub/src/main/scala/com/snowplowanalytics/snowplow/sources/pubsub/PubsubSourceConfig.scala b/modules/pubsub/src/main/scala/com/snowplowanalytics/snowplow/sources/pubsub/PubsubSourceConfig.scala
index 50f45a96..0b0e8719 100644
--- a/modules/pubsub/src/main/scala/com/snowplowanalytics/snowplow/sources/pubsub/PubsubSourceConfig.scala
+++ b/modules/pubsub/src/main/scala/com/snowplowanalytics/snowplow/sources/pubsub/PubsubSourceConfig.scala
@@ -23,7 +23,8 @@ case class PubsubSourceConfig(
   minDurationPerAckExtension: FiniteDuration,
   maxDurationPerAckExtension: FiniteDuration,
   gcpUserAgent: GcpUserAgent,
-  shutdownTimeout: FiniteDuration
+  shutdownTimeout: FiniteDuration,
+  maxPullsPerTransportChannel: Int
 )
 
 object PubsubSourceConfig {
diff --git a/modules/pubsub/src/test/scala/com/snowplowanalytics/snowplow/sources/pubsub/PubsubSourceConfigSpec.scala b/modules/pubsub/src/test/scala/com/snowplowanalytics/snowplow/sources/pubsub/PubsubSourceConfigSpec.scala
index eaf7a3c8..94b9c3f2 100644
--- a/modules/pubsub/src/test/scala/com/snowplowanalytics/snowplow/sources/pubsub/PubsubSourceConfigSpec.scala
+++ b/modules/pubsub/src/test/scala/com/snowplowanalytics/snowplow/sources/pubsub/PubsubSourceConfigSpec.scala
@@ -41,14 +41,15 @@ class PubsubSourceConfigSpec extends Specification {
     val result = ConfigFactory.load(ConfigFactory.parseString(input))
 
     val expected = PubsubSourceConfig(
-      subscription               = PubsubSourceConfig.Subscription("my-project", "my-subscription"),
-      parallelPullFactor         = BigDecimal(0.5),
-      bufferMaxBytes             = 10000000,
-      maxAckExtensionPeriod      = 1.hour,
-      minDurationPerAckExtension = 1.minute,
-      maxDurationPerAckExtension = 10.minutes,
-      gcpUserAgent               = GcpUserAgent("Snowplow OSS", "example-version"),
-      shutdownTimeout            = 30.seconds
+      subscription                = PubsubSourceConfig.Subscription("my-project", "my-subscription"),
+      parallelPullFactor          = BigDecimal(0.5),
+      bufferMaxBytes              = 10000000,
+      maxAckExtensionPeriod       = 1.hour,
+      minDurationPerAckExtension  = 1.minute,
+      maxDurationPerAckExtension  = 10.minutes,
+      gcpUserAgent                = GcpUserAgent("Snowplow OSS", "example-version"),
+      shutdownTimeout             = 30.seconds,
+      maxPullsPerTransportChannel = 16
     )
 
     result.as[Wrapper] must beRight.like { case w: Wrapper =>
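
For illustration (not part of the patch): a minimal standalone sketch of the arithmetic behind the new setting, mirroring `chooseNumTransportChannels` above. The channel count is the parallel pull count divided by `maxPullsPerTransportChannel`, rounded up. The object name and the example pull counts below are hypothetical.

// Illustrative sketch only, assuming the default maxPullsPerTransportChannel of 16.
object ChannelCountSketch extends App {

  // Same rounding-up division as chooseNumTransportChannels in PubsubSource.scala
  def chooseNumTransportChannels(parallelPullCount: Int, maxPullsPerTransportChannel: Int): Int =
    (BigDecimal(parallelPullCount) / maxPullsPerTransportChannel)
      .setScale(0, BigDecimal.RoundingMode.UP)
      .toInt

  println(chooseNumTransportChannels(16, 16)) // 1 -> 16 streaming pulls fit in a single channel
  println(chooseNumTransportChannels(17, 16)) // 2 -> one extra pull needs a second channel
  println(chooseNumTransportChannels(48, 16)) // 3 -> stays well under GRPC's 100-RPCs-per-channel limit
}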