diff --git a/bridge/svix-bridge/src/webhook_receiver/mod.rs b/bridge/svix-bridge/src/webhook_receiver/mod.rs index 9fb2bb323..47fc76ef2 100644 --- a/bridge/svix-bridge/src/webhook_receiver/mod.rs +++ b/bridge/svix-bridge/src/webhook_receiver/mod.rs @@ -250,10 +250,11 @@ impl PollerReceiverConfig { } async fn run_inner(poller: &SvixEventsPoller) -> ! { - // How long to wait if the request comes back non-2xx. - const ERROR_SLEEP: Duration = Duration::from_secs(3); - // FIXME: exponential back off - let mut interval = tokio::time::interval(Duration::from_secs(3)); + const MIN_SLEEP: Duration = Duration::from_millis(10); + const MAX_SLEEP: Duration = Duration::from_secs(300); + const NO_SLEEP: Duration = Duration::ZERO; + let mut sleep_time = NO_SLEEP; + let PollerInputOpts::SvixEvents { app_id, subscription_id, @@ -261,9 +262,9 @@ async fn run_inner(poller: &SvixEventsPoller) -> ! { } = &poller.input_opts; let mut iterator = None; + loop { - interval.tick().await; - tracing::trace!(app_id, subscription_id, "poll /events"); + tracing::trace!(app_id, subscription_id, "polling `/events`"); match poller .svix_client .message() @@ -279,11 +280,12 @@ async fn run_inner(poller: &SvixEventsPoller) -> ! { .await { Ok(resp) => { - tracing::trace!("got messages, handling..."); + tracing::trace!("got messages - handling..."); for msg in resp.data.into_iter() { let payload = match parse_payload( &SerializablePayload::Standard( - // FIXME: for svix-event pollers we already know the payload is json so there's some redundant ser/deser cycles being lost + // FIXME: for svix-event pollers we already know the payload is json so + // there's some wasted ser/deser/ser cycles. serde_json::to_vec(&msg) .expect("just fetched as json, must be serializable"), ), @@ -300,7 +302,9 @@ async fn run_inner(poller: &SvixEventsPoller) -> ! { status = status.as_u16(), "error while parsing polled message" ); - // Retry the current iterator + // BACKOFF + sleep_time = (sleep_time * 2).clamp(MIN_SLEEP, MAX_SLEEP); + // Retry the current iterator. continue; } Ok(p) => p, @@ -311,6 +315,8 @@ async fn run_inner(poller: &SvixEventsPoller) -> ! { status = status.as_u16(), "error while handling polled message" ); + // BACKOFF + sleep_time = (sleep_time * 2).clamp(MIN_SLEEP, MAX_SLEEP); } } tracing::trace!( @@ -320,16 +326,30 @@ async fn run_inner(poller: &SvixEventsPoller) -> ! { ); // Update the iterator _after we've handled all the messages in the batch_. iterator = Some(resp.iterator.clone()); + // If the iterator is "done" we can backoff to wait for new messages to arrive. + sleep_time = if resp.done { + // BACKOFF + (sleep_time * 2).clamp(MIN_SLEEP, MAX_SLEEP) + } else { + NO_SLEEP + }; } + Err(err) => { tracing::trace!( error =? err, iterator =? &iterator, "request failed, retrying current iterator" ); - tokio::time::sleep(ERROR_SLEEP).await; + // BACKOFF + sleep_time = (sleep_time * 2).clamp(MIN_SLEEP, MAX_SLEEP); } } + + if !sleep_time.is_zero() { + tracing::trace!(sleep_time=?sleep_time, "sleeping"); + tokio::time::sleep(sleep_time).await; + } } }