Skip to content

Commit

Permalink
Pubsub source open more transport channels (#92)
Browse files Browse the repository at this point in the history
Previously the pubsub SDK opened just a single transport channel
(roughly equivalent to a TCP channel).

In some Snowplow apps we need to batch up a very large number of events
from the source before acking.  In those circumstances it is good to
open more streaming pulls (a type of RPC) to keep the pubsub
subscription healthy.  But to open more streaming pulls we also need to
open more transport channels (TCP channels) to support all the RPCs.

This commit adds a new config parameter `maxPullsPerTransportChannel`.
The default value 16 is good for most apps.
  • Loading branch information
istreeter authored Oct 1, 2024
1 parent 2c71497 commit 1aec144
Show file tree
Hide file tree
Showing 4 changed files with 37 additions and 10 deletions.
1 change: 1 addition & 0 deletions modules/pubsub/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ snowplow.defaults: {
productName: "Snowplow OSS"
}
shutdownTimeout: "30 seconds"
maxPullsPerTransportChannel: 16
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@ import java.time.Instant
import com.google.api.core.ApiService
import com.google.api.gax.batching.FlowControlSettings
import com.google.api.gax.core.FixedExecutorProvider
import com.google.cloud.pubsub.v1.{AckReplyConsumer, MessageReceiver, Subscriber}
import com.google.api.gax.grpc.ChannelPoolSettings
import com.google.cloud.pubsub.v1.{AckReplyConsumer, MessageReceiver, Subscriber, SubscriptionAdminSettings}
import com.google.common.util.concurrent.{ForwardingExecutorService, ListeningExecutorService, MoreExecutors}
import com.google.pubsub.v1.{ProjectSubscriptionName, PubsubMessage}
import org.threeten.bp.{Duration => ThreetenDuration}
Expand Down Expand Up @@ -191,6 +192,7 @@ object PubsubSource {
for {
direct <- executorResource(Sync[F].delay(MoreExecutors.newDirectExecutorService()))
parallelPullCount = chooseNumParallelPulls(config)
channelCount = chooseNumTransportChannels(config, parallelPullCount)
executor <- executorResource(Sync[F].delay(Executors.newScheduledThreadPool(2 * parallelPullCount)))
receiver = messageReceiver(config, control)
name = ProjectSubscriptionName.of(config.subscription.projectId, config.subscription.subscriptionId)
Expand All @@ -208,6 +210,16 @@ object PubsubSource {
FlowControlSettings.getDefaultInstance
}
.setHeaderProvider(GcpUserAgent.headerProvider(config.gcpUserAgent))
.setChannelProvider {
SubscriptionAdminSettings.defaultGrpcTransportProviderBuilder
.setMaxInboundMessageSize(20 << 20) // copies Subscriber hard-coded default
.setMaxInboundMetadataSize(20 << 20) // copies Subscriber hard-coded default
.setKeepAliveTime(ThreetenDuration.ofMinutes(5)) // copies Subscriber hard-coded default
.setChannelPoolSettings {
ChannelPoolSettings.staticallySized(channelCount)
}
.build
}
.build
})
_ <- Resource.eval(Sync[F].delay {
Expand Down Expand Up @@ -327,4 +339,16 @@ object PubsubSource {
.setScale(0, BigDecimal.RoundingMode.UP)
.toInt

/**
 * Picks a sensible number of GRPC transport channels (roughly equivalent to a TCP connection)
 *
 * GRPC has a hard limit of 100 concurrent RPCs on a channel. And experience shows it is healthy
 * to stay much under that limit. If we need to open a large number of streaming pulls then we
 * might approach/exceed that limit.
 *
 * The result is `parallelPullCount / config.maxPullsPerTransportChannel` rounded up, and never
 * less than 1.
 *
 * @param config
 *   source config; only `maxPullsPerTransportChannel` is read here
 * @param parallelPullCount
 *   number of streaming pulls that will be spread over the channels
 * @return
 *   the number of transport channels to open, always at least 1
 * @throws IllegalArgumentException
 *   if `maxPullsPerTransportChannel` is not positive
 */
private def chooseNumTransportChannels(config: PubsubSourceConfig, parallelPullCount: Int): Int = {
  // Fail with a descriptive message rather than an opaque "Division by zero" (for 0) or a
  // negative channel count (for negative values).
  require(
    config.maxPullsPerTransportChannel > 0,
    s"maxPullsPerTransportChannel must be positive but was ${config.maxPullsPerTransportChannel}"
  )
  (BigDecimal(parallelPullCount) / config.maxPullsPerTransportChannel)
    .setScale(0, BigDecimal.RoundingMode.UP)
    .toInt
    .max(1) // zero pulls would otherwise yield zero channels
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@ case class PubsubSourceConfig(
minDurationPerAckExtension: FiniteDuration,
maxDurationPerAckExtension: FiniteDuration,
gcpUserAgent: GcpUserAgent,
shutdownTimeout: FiniteDuration
shutdownTimeout: FiniteDuration,
maxPullsPerTransportChannel: Int
)

object PubsubSourceConfig {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,14 +41,15 @@ class PubsubSourceConfigSpec extends Specification {
val result = ConfigFactory.load(ConfigFactory.parseString(input))

val expected = PubsubSourceConfig(
subscription = PubsubSourceConfig.Subscription("my-project", "my-subscription"),
parallelPullFactor = BigDecimal(0.5),
bufferMaxBytes = 10000000,
maxAckExtensionPeriod = 1.hour,
minDurationPerAckExtension = 1.minute,
maxDurationPerAckExtension = 10.minutes,
gcpUserAgent = GcpUserAgent("Snowplow OSS", "example-version"),
shutdownTimeout = 30.seconds
subscription = PubsubSourceConfig.Subscription("my-project", "my-subscription"),
parallelPullFactor = BigDecimal(0.5),
bufferMaxBytes = 10000000,
maxAckExtensionPeriod = 1.hour,
minDurationPerAckExtension = 1.minute,
maxDurationPerAckExtension = 10.minutes,
gcpUserAgent = GcpUserAgent("Snowplow OSS", "example-version"),
shutdownTimeout = 30.seconds,
maxPullsPerTransportChannel = 16
)

result.as[Wrapper] must beRight.like { case w: Wrapper =>
Expand Down

0 comments on commit 1aec144

Please sign in to comment.