Skip to content

Commit

Permalink
fix(escalator): improve error handling (#21)
Browse files Browse the repository at this point in the history
seeing two types of errors in the escalator that we don't handle well:
https://cloudlogging.app.goo.gl/weesjEZT3GuREEUu6

the default behaviour is to shut down the escalator for that chain if an
unexpected error occurs, which isn't what we want, so I'm releasing a
new PR to:
- drop the erroring tx instead of shutting down the escalator, if the
error is unknown
- handle the "transaction underpriced" error: means the gas escalation
was insufficient, so the tx is re-added to the monitored tx list with
its old timestamp (but its new gas price), to be re-escalated again
- handle the "already known" error: not sure why this one happens - it
means the same identical tx was submitted twice to the mempool. In this
case we re-add it to the escalator monitor, since we're not yet sure it
landed onchain

As a drive-by change, I realised that if the RPC briefly fails, the
escalator monitor will shut down; now we catch any errors, log them, and
keep trying to escalate instead of terminating
  • Loading branch information
daniel-savu authored Dec 3, 2024
1 parent 12f9681 commit d22cecc
Showing 1 changed file with 57 additions and 19 deletions.
76 changes: 57 additions & 19 deletions ethers-middleware/src/gas_escalator/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -267,19 +267,68 @@ impl<M, E: Clone> EscalationTask<M, E> {
Self { inner, escalator, frequency, txs }
}

/// Handles errors from broadcasting a gas-escalated transaction
///
/// **Returns** `None`, if the transaction is to be dropped from the escalator, `Some` to keep monitoring and escalating it
fn handle_broadcast_error(
err_message: String,
old_monitored_tx: MonitoredTransaction,
new_tx: TypedTransaction,
) -> Option<(H256, Instant)> {
if err_message.contains("nonce too low") {
// may happen if we try to broadcast a new, gas-escalated tx when the original tx
// already landed onchain, meaning we no longer need to escalate it
tracing::warn!(err = err_message, ?old_monitored_tx.hash, ?new_tx, "Nonce error when escalating gas price. Tx may have already been included onchain. Dropping it from escalator");
None
} else if err_message.contains("replacement transaction underpriced") {
// the gas escalation wasn't sufficient
tracing::warn!(
err = err_message,
old_tx = ?old_monitored_tx,
new_tx = ?new_tx,
"Escalated gas price was underpriced, re-adding to escalator"
);
// return the old tx_hash and creation time so the transaction is re-escalated
// as soon as `monitored_txs` are evaluated again
Some((old_monitored_tx.hash, old_monitored_tx.creation_time))
} else if err_message.contains("already known") {
tracing::warn!(
err = err_message,
old_tx = ?old_monitored_tx,
new_tx = ?new_tx,
"The escalator broadcasted the same transaction twice. Re-adding to attempt re-escalating"
);
// return the old tx_hash and creation time so the transaction is re-escalated
// as soon as `monitored_txs` are evaluated again
Some((old_monitored_tx.hash, old_monitored_tx.creation_time))
} else {
tracing::error!(
err = err_message,
old_tx = ?old_monitored_tx,
new_tx = ?new_tx,
"Unexpected error when broadcasting gas-escalated transaction. Dropping it from escalator."
);
None
}
}

/// Broadcasts the new transaction with the escalated gas price
///
/// **Returns** a tx hash to monitor and the time it was created, unless the tx was already
/// included or an unknown error occurred
async fn broadcast_tx_if_escalated(
&self,
old_monitored_tx: MonitoredTransaction,
new_tx: TypedTransaction,
) -> Result<Option<(H256, Instant)>, GasEscalatorError<M>>
) -> Option<(H256, Instant)>
where
M: Middleware,
E: GasEscalator,
{
// gas price wasn't escalated
// keep monitoring the old tx
if old_monitored_tx.inner.eq(&new_tx) {
return Ok(Some((old_monitored_tx.hash, old_monitored_tx.creation_time)));
return Some((old_monitored_tx.hash, old_monitored_tx.creation_time));
}

// send a replacement tx with the escalated gas price
Expand All @@ -293,22 +342,9 @@ impl<M, E: Clone> EscalationTask<M, E> {
);
// Return the new tx hash to monitor and the time it was created.
// The latter is used to know when to escalate the gas price again
Ok(Some((new_tx_hash, Instant::now())))
}
Err(err) => {
if err.to_string().contains("nonce too low") {
// may happen if we try to broadcast a new, gas-escalated tx when the original tx
// already landed onchain, meaning we no longer need to escalate it
tracing::warn!(err = %err, ?old_monitored_tx.hash, ?new_tx, "Nonce error when escalating gas price. Tx may have already been included onchain. Dropping it from escalator");
Ok(None)
} else {
tracing::error!(
err = %err,
"Unexpected error. Killing escalator backend."
);
Err(GasEscalatorError::MiddlewareError(err))
}
Some((new_tx_hash, Instant::now()))
}
Err(err) => Self::handle_broadcast_error(err.to_string(), old_monitored_tx, new_tx),
}
}

Expand Down Expand Up @@ -347,7 +383,7 @@ impl<M, E: Clone> EscalationTask<M, E> {
};

let maybe_tx_to_monitor =
self.broadcast_tx_if_escalated(monitored_tx.clone(), new_tx.clone()).await?;
self.broadcast_tx_if_escalated(monitored_tx.clone(), new_tx.clone()).await;

if let Some((new_txhash, new_creation_time)) = maybe_tx_to_monitor {
new_txs_to_monitor.push(MonitoredTransaction {
Expand Down Expand Up @@ -384,7 +420,9 @@ impl<M, E: Clone> EscalationTask<M, E> {

let mut escalation_frequency_watcher = escalation_frequency_watcher.fuse();
while escalation_frequency_watcher.next().await.is_some() {
self.escalate_stuck_txs().await?;
if let Err(err) = self.escalate_stuck_txs().await {
tracing::error!(err = ?err, "error escalating stuck transactions");
}
}
tracing::error!("timing future has gone away");
Ok(())
Expand Down

0 comments on commit d22cecc

Please sign in to comment.