diff --git a/bridge/svix-bridge/src/webhook_receiver/mod.rs b/bridge/svix-bridge/src/webhook_receiver/mod.rs index 2b976c3de..afa69ce1f 100644 --- a/bridge/svix-bridge/src/webhook_receiver/mod.rs +++ b/bridge/svix-bridge/src/webhook_receiver/mod.rs @@ -287,8 +287,9 @@ async fn run_inner(poller: &SvixEventsPoller) -> ! { .await { Ok(resp) => { + let mut has_failure = false; tracing::trace!(count = resp.data.len(), "got messages"); - for msg in resp.data.into_iter() { + 'inner: for msg in resp.data.into_iter() { let payload = match parse_payload( &SerializablePayload::Standard( // FIXME: for svix-event pollers we already know the payload is json so @@ -309,10 +310,8 @@ async fn run_inner(poller: &SvixEventsPoller) -> ! { status = status.as_u16(), "error while parsing polled message" ); - // BACKOFF - sleep_time = (sleep_time * 2).clamp(MIN_SLEEP, MAX_SLEEP); - // Retry the current iterator. - continue; + has_failure = true; + break 'inner; } Ok(p) => p, }; @@ -322,24 +321,32 @@ async fn run_inner(poller: &SvixEventsPoller) -> ! { status = status.as_u16(), "error while handling polled message" ); - // BACKOFF - sleep_time = (sleep_time * 2).clamp(MIN_SLEEP, MAX_SLEEP); + has_failure = true; + break 'inner; } } - tracing::trace!( - ?iterator, - next_iterator = ?resp.iterator, - "batch handled, updating local iterator" - ); - // Update the iterator _after we've handled all the messages in the batch_. - iterator = Some(resp.iterator.clone()); - // If the iterator is "done" we can backoff to wait for new messages to arrive. - sleep_time = if resp.done { + + // Retry the current iterator if we see failures while handling any of the messages + // in the batch. + if has_failure { // BACKOFF - (sleep_time * 2).clamp(MIN_SLEEP, MAX_SLEEP) + sleep_time = (sleep_time * 2).clamp(MIN_SLEEP, MAX_SLEEP); } else { - NO_SLEEP - }; + tracing::trace!( + ?iterator, + next_iterator = ?resp.iterator, + "batch handled, updating local iterator" + ); + // Update the iterator _after we've handled all the messages in the batch_. + iterator = Some(resp.iterator.clone()); + // If the iterator is "done" we can backoff to wait for new messages to arrive. + sleep_time = if resp.done { + // BACKOFF + (sleep_time * 2).clamp(MIN_SLEEP, MAX_SLEEP) + } else { + NO_SLEEP + }; + } } Err(err) => {