Skip to content

Commit

Permalink
exponential backoff
Browse files Browse the repository at this point in the history
  • Loading branch information
svix-onelson committed Jul 9, 2024
1 parent 5e2aaef commit 681bbf1
Showing 1 changed file with 30 additions and 10 deletions.
40 changes: 30 additions & 10 deletions bridge/svix-bridge/src/webhook_receiver/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -250,20 +250,21 @@ impl PollerReceiverConfig {
}

async fn run_inner(poller: &SvixEventsPoller) -> ! {
// How long to wait if the request comes back non-2xx.
const ERROR_SLEEP: Duration = Duration::from_secs(3);
// FIXME: exponential back off
let mut interval = tokio::time::interval(Duration::from_secs(3));
const MIN_SLEEP: Duration = Duration::from_millis(10);
const MAX_SLEEP: Duration = Duration::from_secs(300);
const NO_SLEEP: Duration = Duration::ZERO;
let mut sleep_time = NO_SLEEP;

let PollerInputOpts::SvixEvents {
app_id,
subscription_id,
..
} = &poller.input_opts;

let mut iterator = None;

loop {
interval.tick().await;
tracing::trace!(app_id, subscription_id, "poll /events");
tracing::trace!(app_id, subscription_id, "polling `/events`");
match poller
.svix_client
.message()
Expand All @@ -279,11 +280,12 @@ async fn run_inner(poller: &SvixEventsPoller) -> ! {
.await
{
Ok(resp) => {
tracing::trace!("got messages, handling...");
tracing::trace!("got messages - handling...");
for msg in resp.data.into_iter() {
let payload = match parse_payload(
&SerializablePayload::Standard(
// FIXME: for svix-event pollers we already know the payload is json so there's some redundant ser/deser cycles being lost
// FIXME: for svix-event pollers we already know the payload is json so
// there's some wasted ser/deser/ser cycles.
serde_json::to_vec(&msg)
.expect("just fetched as json, must be serializable"),
),
Expand All @@ -300,7 +302,9 @@ async fn run_inner(poller: &SvixEventsPoller) -> ! {
status = status.as_u16(),
"error while parsing polled message"
);
// Retry the current iterator
// BACKOFF
sleep_time = (sleep_time * 2).clamp(MIN_SLEEP, MAX_SLEEP);
// Retry the current iterator.
continue;
}
Ok(p) => p,
Expand All @@ -311,6 +315,8 @@ async fn run_inner(poller: &SvixEventsPoller) -> ! {
status = status.as_u16(),
"error while handling polled message"
);
// BACKOFF
sleep_time = (sleep_time * 2).clamp(MIN_SLEEP, MAX_SLEEP);
}
}
tracing::trace!(
Expand All @@ -320,16 +326,30 @@ async fn run_inner(poller: &SvixEventsPoller) -> ! {
);
// Update the iterator _after we've handled all the messages in the batch_.
iterator = Some(resp.iterator.clone());
// If the iterator is "done" we can backoff to wait for new messages to arrive.
sleep_time = if resp.done {
// BACKOFF
(sleep_time * 2).clamp(MIN_SLEEP, MAX_SLEEP)
} else {
NO_SLEEP
};
}

Err(err) => {
tracing::trace!(
error =? err,
iterator =? &iterator,
"request failed, retrying current iterator"
);
tokio::time::sleep(ERROR_SLEEP).await;
// BACKOFF
sleep_time = (sleep_time * 2).clamp(MIN_SLEEP, MAX_SLEEP);
}
}

if !sleep_time.is_zero() {
tracing::trace!(sleep_time=?sleep_time, "sleeping");
tokio::time::sleep(sleep_time).await;
}
}
}

Expand Down

0 comments on commit 681bbf1

Please sign in to comment.