Bridge: factor backoffs to make sure we set sleep_time consistently
There were some gaps in the initial implementation. This switches to
setting a flag within the message-handling loop and breaking out of the
loop, so that the sleep-time penalty (or reset) can be assessed
consistently at the bottom of the polling loop.

Mostly this is about making sure that any single message with a problem
causes the whole iterator batch to be retried.
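The shape of the fix generalizes: rather than applying the backoff penalty at each failure site inside the batch loop, each site just sets a flag and breaks out, and the penalty (or reset) is assessed exactly once at the bottom. Below is a minimal, runnable sketch of that pattern. The constant values, the batches, and the zero-item-means-failure handler are illustrative stand-ins, not code from this repository:

    use std::time::Duration;

    // Illustrative values; the real constants live in the bridge crate.
    const NO_SLEEP: Duration = Duration::ZERO;
    const MIN_SLEEP: Duration = Duration::from_millis(10);
    const MAX_SLEEP: Duration = Duration::from_secs(30);

    fn main() {
        let mut sleep_time = NO_SLEEP;
        // Hypothetical batches; a zero stands in for a message that fails.
        let batches = vec![vec![1, 2, 3], vec![4, 0, 6], vec![7, 8, 9]];

        for batch in batches {
            let mut has_failure = false;
            'inner: for item in batch {
                if item == 0 {
                    // Record the failure and bail on the whole batch;
                    // no per-site backoff here.
                    has_failure = true;
                    break 'inner;
                }
            }
            // Assess the penalty (or reset) exactly once, at the bottom.
            sleep_time = if has_failure {
                // BACKOFF: double the sleep, clamped to [MIN_SLEEP, MAX_SLEEP].
                (sleep_time * 2).clamp(MIN_SLEEP, MAX_SLEEP)
            } else {
                NO_SLEEP
            };
            println!("sleep_time after batch: {sleep_time:?}");
        }
    }

Because every failure path funnels through the single assessment point, it is no longer possible for one error branch to forget the penalty while another applies it, which is exactly the gap this commit closes.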
svix-onelson committed Jul 11, 2024
1 parent 6bfccbe commit 5536e9a
Showing 1 changed file with 26 additions and 19 deletions.
45 changes: 26 additions & 19 deletions bridge/svix-bridge/src/webhook_receiver/mod.rs
@@ -287,8 +287,9 @@ async fn run_inner(poller: &SvixEventsPoller) -> ! {
                 .await
             {
                 Ok(resp) => {
+                    let mut has_failure = false;
                     tracing::trace!(count = resp.data.len(), "got messages");
-                    for msg in resp.data.into_iter() {
+                    'inner: for msg in resp.data.into_iter() {
                         let payload = match parse_payload(
                             &SerializablePayload::Standard(
                                 // FIXME: for svix-event pollers we already know the payload is json so
@@ -309,10 +310,8 @@
                                     status = status.as_u16(),
                                     "error while parsing polled message"
                                 );
-                                // BACKOFF
-                                sleep_time = (sleep_time * 2).clamp(MIN_SLEEP, MAX_SLEEP);
-                                // Retry the current iterator.
-                                continue;
+                                has_failure = true;
+                                break 'inner;
                             }
                             Ok(p) => p,
                         };
@@ -322,24 +321,32 @@
                                 status = status.as_u16(),
                                 "error while handling polled message"
                             );
-                            // BACKOFF
-                            sleep_time = (sleep_time * 2).clamp(MIN_SLEEP, MAX_SLEEP);
+                            has_failure = true;
+                            break 'inner;
                         }
                     }
-                    tracing::trace!(
-                        ?iterator,
-                        next_iterator = ?resp.iterator,
-                        "batch handled, updating local iterator"
-                    );
-                    // Update the iterator _after we've handled all the messages in the batch_.
-                    iterator = Some(resp.iterator.clone());
-                    // If the iterator is "done" we can backoff to wait for new messages to arrive.
-                    sleep_time = if resp.done {
+
+                    // Retry the current iterator if we see failures while handling any of the messages
+                    // in the batch.
+                    if has_failure {
                         // BACKOFF
-                        (sleep_time * 2).clamp(MIN_SLEEP, MAX_SLEEP)
+                        sleep_time = (sleep_time * 2).clamp(MIN_SLEEP, MAX_SLEEP);
                     } else {
-                        NO_SLEEP
-                    };
+                        tracing::trace!(
+                            ?iterator,
+                            next_iterator = ?resp.iterator,
+                            "batch handled, updating local iterator"
+                        );
+                        // Update the iterator _after we've handled all the messages in the batch_.
+                        iterator = Some(resp.iterator.clone());
+                        // If the iterator is "done" we can backoff to wait for new messages to arrive.
+                        sleep_time = if resp.done {
+                            // BACKOFF
+                            (sleep_time * 2).clamp(MIN_SLEEP, MAX_SLEEP)
+                        } else {
+                            NO_SLEEP
+                        };
+                    }
                 }

                 Err(err) => {
