diff --git a/ethers-middleware/src/gas_escalator/mod.rs b/ethers-middleware/src/gas_escalator/mod.rs index 9ca6d3531..0bf23744c 100644 --- a/ethers-middleware/src/gas_escalator/mod.rs +++ b/ethers-middleware/src/gas_escalator/mod.rs @@ -61,7 +61,7 @@ pub(crate) struct GasEscalatorMiddlewareInternal { #[derive(Debug, Clone, PartialEq, Eq)] pub struct MonitoredTransaction { - hash: Option, + hash: TxHash, inner: TypedTransaction, creation_time: Instant, block: Option, @@ -205,30 +205,23 @@ where ) -> Result, GasEscalatorError> { let tx = tx.into(); - match self.inner.send_transaction(tx.clone(), block).await { - Ok(pending_tx) => { - tracing::debug!(tx = ?tx, tx_hash = ?pending_tx.tx_hash(), "Sent tx, adding to gas escalator watcher"); - let mut lock = self.txs.lock().await; - lock.push(MonitoredTransaction { - hash: Some(*pending_tx), - inner: tx, - creation_time: Instant::now(), - block, - }); - Ok(pending_tx) - } - Err(err) => { - tracing::warn!(tx = ?tx, "Failed to send tx, adding to gas escalator watcher regardless"); - let mut lock = self.txs.lock().await; - lock.push(MonitoredTransaction { - hash: None, - inner: tx, - creation_time: Instant::now(), - block: None, - }); - Err(GasEscalatorError::MiddlewareError(err)) - } - } + let pending_tx = self + .inner + .send_transaction(tx.clone(), block) + .await + .map_err(GasEscalatorError::MiddlewareError)?; + + tracing::debug!(tx = ?tx, tx_hash = ?pending_tx.tx_hash(), "Sent tx, adding to gas escalator watcher"); + // insert the tx in the pending txs + let mut lock = self.txs.lock().await; + lock.push(MonitoredTransaction { + hash: *pending_tx, + inner: tx, + creation_time: Instant::now(), + block, + }); + + Ok(pending_tx) } } @@ -286,11 +279,11 @@ impl EscalationTask { err_message: String, old_monitored_tx: MonitoredTransaction, new_tx: TypedTransaction, - ) -> Option<(Option, Instant)> { + ) -> Option<(H256, Instant)> { if err_message.contains("nonce too low") { // may happen if we try to broadcast a new, gas-escalated tx when the original tx // already landed onchain, meaning we no longer need to escalate it - tracing::warn!(err = err_message, ?old_monitored_tx, ?new_tx, "Nonce error when escalating gas price. Tx may have already been included onchain. Dropping it from escalator"); + tracing::warn!(err = err_message, ?old_monitored_tx.hash, ?new_tx, "Nonce error when escalating gas price. Tx may have already been included onchain. Dropping it from escalator"); None } else if RETRYABLE_ERRORS.iter().any(|err_msg| err_message.contains(err_msg)) { // if the error is one of the known retryable errors, we can keep trying to escalate @@ -318,15 +311,21 @@ impl EscalationTask { /// /// **Returns** a tx hash to monitor and the time it was created, unless the tx was already /// included or an unknown error occurred - async fn broadcast_tx( + async fn broadcast_tx_if_escalated( &self, old_monitored_tx: MonitoredTransaction, new_tx: TypedTransaction, - ) -> Option<(Option, Instant)> + ) -> Option<(H256, Instant)> where M: Middleware, E: GasEscalator, { + // gas price wasn't escalated + // keep monitoring the old tx + if old_monitored_tx.inner.eq(&new_tx) { + return Some((old_monitored_tx.hash, old_monitored_tx.creation_time)); + } + // send a replacement tx with the escalated gas price match self.inner.send_transaction(new_tx.clone(), old_monitored_tx.block).await { Ok(new_tx_hash) => { @@ -339,7 +338,7 @@ impl EscalationTask { ); // Return the new tx hash to monitor and the time it was created. // The latter is used to know when to escalate the gas price again - Some((Some(new_tx_hash), Instant::now())) + Some((new_tx_hash, Instant::now())) } Err(err) => Self::handle_broadcast_error(err.to_string(), old_monitored_tx, new_tx), } @@ -361,41 +360,34 @@ impl EscalationTask { tracing::trace!(?monitored_txs, "In the escalator watcher loop. Monitoring txs"); } let mut new_txs_to_monitor = vec![]; - for old_monitored_tx in monitored_txs { - let receipt = if let Some(tx_hash) = old_monitored_tx.hash { - tracing::trace!(tx_hash = ?old_monitored_tx.hash, "checking if exists"); - self.inner - .get_transaction_receipt(tx_hash) - .await - .map_err(GasEscalatorError::MiddlewareError)? - } else { - None - }; + for monitored_tx in monitored_txs { + let receipt = self + .inner + .get_transaction_receipt(monitored_tx.hash) + .await + .map_err(GasEscalatorError::MiddlewareError)?; + + tracing::trace!(tx_hash = ?monitored_tx.hash, "checking if exists"); - if let Some(receipt) = receipt { + if receipt.is_some() { // tx was already included, can drop from escalator - tracing::debug!(tx = ?receipt.transaction_hash, "Transaction was included onchain, dropping from escalator"); + tracing::debug!(tx = ?monitored_tx.hash, "Transaction was included onchain, dropping from escalator"); continue; } - let Some(new_tx) = old_monitored_tx.escalate_gas_price(self.escalator.clone()) else { - tracing::error!(tx=?old_monitored_tx.hash, "gas price is not set for transaction, dropping from escalator"); + let Some(new_tx) = monitored_tx.escalate_gas_price(self.escalator.clone()) else { + tracing::error!(tx=?monitored_tx.hash, "gas price is not set for transaction, dropping from escalator"); continue; }; - // gas price wasn't escalated - // keep monitoring the old tx - let maybe_tx_to_monitor = if old_monitored_tx.inner.eq(&new_tx) { - Some((old_monitored_tx.hash, old_monitored_tx.creation_time)) - } else { - self.broadcast_tx(old_monitored_tx.clone(), new_tx.clone()).await - }; + let maybe_tx_to_monitor = + self.broadcast_tx_if_escalated(monitored_tx.clone(), new_tx.clone()).await; if let Some((new_txhash, new_creation_time)) = maybe_tx_to_monitor { new_txs_to_monitor.push(MonitoredTransaction { hash: new_txhash, inner: new_tx, creation_time: new_creation_time, - block: old_monitored_tx.block, + block: monitored_tx.block, }); } } diff --git a/ethers-middleware/src/nonce_manager.rs b/ethers-middleware/src/nonce_manager.rs index aed41490f..7905c7b78 100644 --- a/ethers-middleware/src/nonce_manager.rs +++ b/ethers-middleware/src/nonce_manager.rs @@ -5,8 +5,6 @@ use std::sync::atomic::{AtomicBool, AtomicU64, Ordering}; use thiserror::Error; use tracing::instrument; -const DEFAULT_TX_COUNT_FOR_RESYNC: u64 = 10; - #[derive(Debug)] /// Middleware used for calculating nonces locally, useful for signing multiple /// consecutive transactions without waiting for them to hit the mempool @@ -15,8 +13,6 @@ pub struct NonceManagerMiddleware { init_guard: futures_locks::Mutex<()>, initialized: AtomicBool, nonce: AtomicU64, - tx_count_for_resync: Option, - txs_since_resync: AtomicU64, address: Address, } @@ -32,8 +28,6 @@ where init_guard: Default::default(), initialized: Default::default(), nonce: Default::default(), - tx_count_for_resync: Default::default(), - txs_since_resync: 0u64.into(), address, } } @@ -44,13 +38,6 @@ where nonce.into() } - pub fn get_tx_count_for_resync(&self) -> u64 { - self.tx_count_for_resync - .as_ref() - .map(|count| count.load(Ordering::SeqCst)) - .unwrap_or(DEFAULT_TX_COUNT_FOR_RESYNC) - } - pub async fn initialize_nonce( &self, block: Option, @@ -155,20 +142,7 @@ where tracing::debug!(?nonce, "Sending transaction"); match self.inner.send_transaction(tx.clone(), block).await { Ok(pending_tx) => { - let txs_since_resync = self.txs_since_resync.load(Ordering::SeqCst); - let new_txs_since_resync = txs_since_resync + 1; tracing::debug!(?nonce, "Sent transaction"); - let tx_count_for_resync = self.get_tx_count_for_resync(); - if new_txs_since_resync >= tx_count_for_resync { - let onchain_nonce = self.get_transaction_count(self.address, block).await?; - self.nonce.store(onchain_nonce.as_u64(), Ordering::SeqCst); - self.txs_since_resync.store(0, Ordering::SeqCst); - tracing::debug!(?nonce, "Resynced internal nonce with onchain nonce"); - } else { - self.txs_since_resync.store(new_txs_since_resync, Ordering::SeqCst); - let txs_until_resync = tx_count_for_resync - new_txs_since_resync; - tracing::debug!(?txs_until_resync, "Transactions until nonce resync"); - } Ok(pending_tx) } Err(err) => {