Skip to content

Commit

Permalink
Revert "feat: middleware submission tweaks (#23)"
Browse files Browse the repository at this point in the history
This reverts commit f18abe3.
  • Loading branch information
daniel-savu committed Jan 9, 2025
1 parent f18abe3 commit 869bb69
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 78 deletions.
96 changes: 44 additions & 52 deletions ethers-middleware/src/gas_escalator/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ pub(crate) struct GasEscalatorMiddlewareInternal<M> {

#[derive(Debug, Clone, PartialEq, Eq)]
pub struct MonitoredTransaction {
hash: Option<TxHash>,
hash: TxHash,
inner: TypedTransaction,
creation_time: Instant,
block: Option<BlockId>,
Expand Down Expand Up @@ -205,30 +205,23 @@ where
) -> Result<PendingTransaction<'_, M::Provider>, GasEscalatorError<M>> {
let tx = tx.into();

match self.inner.send_transaction(tx.clone(), block).await {
Ok(pending_tx) => {
tracing::debug!(tx = ?tx, tx_hash = ?pending_tx.tx_hash(), "Sent tx, adding to gas escalator watcher");
let mut lock = self.txs.lock().await;
lock.push(MonitoredTransaction {
hash: Some(*pending_tx),
inner: tx,
creation_time: Instant::now(),
block,
});
Ok(pending_tx)
}
Err(err) => {
tracing::warn!(tx = ?tx, "Failed to send tx, adding to gas escalator watcher regardless");
let mut lock = self.txs.lock().await;
lock.push(MonitoredTransaction {
hash: None,
inner: tx,
creation_time: Instant::now(),
block: None,
});
Err(GasEscalatorError::MiddlewareError(err))
}
}
let pending_tx = self
.inner
.send_transaction(tx.clone(), block)
.await
.map_err(GasEscalatorError::MiddlewareError)?;

tracing::debug!(tx = ?tx, tx_hash = ?pending_tx.tx_hash(), "Sent tx, adding to gas escalator watcher");
// insert the tx in the pending txs
let mut lock = self.txs.lock().await;
lock.push(MonitoredTransaction {
hash: *pending_tx,
inner: tx,
creation_time: Instant::now(),
block,
});

Ok(pending_tx)
}
}

Expand Down Expand Up @@ -286,11 +279,11 @@ impl<M, E: Clone> EscalationTask<M, E> {
err_message: String,
old_monitored_tx: MonitoredTransaction,
new_tx: TypedTransaction,
) -> Option<(Option<H256>, Instant)> {
) -> Option<(H256, Instant)> {
if err_message.contains("nonce too low") {
// may happen if we try to broadcast a new, gas-escalated tx when the original tx
// already landed onchain, meaning we no longer need to escalate it
tracing::warn!(err = err_message, ?old_monitored_tx, ?new_tx, "Nonce error when escalating gas price. Tx may have already been included onchain. Dropping it from escalator");
tracing::warn!(err = err_message, ?old_monitored_tx.hash, ?new_tx, "Nonce error when escalating gas price. Tx may have already been included onchain. Dropping it from escalator");
None
} else if RETRYABLE_ERRORS.iter().any(|err_msg| err_message.contains(err_msg)) {
// if the error is one of the known retryable errors, we can keep trying to escalate
Expand Down Expand Up @@ -318,15 +311,21 @@ impl<M, E: Clone> EscalationTask<M, E> {
///
/// **Returns** a tx hash to monitor and the time it was created, unless the tx was already
/// included or an unknown error occurred
async fn broadcast_tx(
async fn broadcast_tx_if_escalated(
&self,
old_monitored_tx: MonitoredTransaction,
new_tx: TypedTransaction,
) -> Option<(Option<H256>, Instant)>
) -> Option<(H256, Instant)>
where
M: Middleware,
E: GasEscalator,
{
// gas price wasn't escalated
// keep monitoring the old tx
if old_monitored_tx.inner.eq(&new_tx) {
return Some((old_monitored_tx.hash, old_monitored_tx.creation_time));
}

// send a replacement tx with the escalated gas price
match self.inner.send_transaction(new_tx.clone(), old_monitored_tx.block).await {
Ok(new_tx_hash) => {
Expand All @@ -339,7 +338,7 @@ impl<M, E: Clone> EscalationTask<M, E> {
);
// Return the new tx hash to monitor and the time it was created.
// The latter is used to know when to escalate the gas price again
Some((Some(new_tx_hash), Instant::now()))
Some((new_tx_hash, Instant::now()))
}
Err(err) => Self::handle_broadcast_error(err.to_string(), old_monitored_tx, new_tx),
}
Expand All @@ -361,41 +360,34 @@ impl<M, E: Clone> EscalationTask<M, E> {
tracing::trace!(?monitored_txs, "In the escalator watcher loop. Monitoring txs");
}
let mut new_txs_to_monitor = vec![];
for old_monitored_tx in monitored_txs {
let receipt = if let Some(tx_hash) = old_monitored_tx.hash {
tracing::trace!(tx_hash = ?old_monitored_tx.hash, "checking if exists");
self.inner
.get_transaction_receipt(tx_hash)
.await
.map_err(GasEscalatorError::MiddlewareError)?
} else {
None
};
for monitored_tx in monitored_txs {
let receipt = self
.inner
.get_transaction_receipt(monitored_tx.hash)
.await
.map_err(GasEscalatorError::MiddlewareError)?;

tracing::trace!(tx_hash = ?monitored_tx.hash, "checking if exists");

if let Some(receipt) = receipt {
if receipt.is_some() {
// tx was already included, can drop from escalator
tracing::debug!(tx = ?receipt.transaction_hash, "Transaction was included onchain, dropping from escalator");
tracing::debug!(tx = ?monitored_tx.hash, "Transaction was included onchain, dropping from escalator");
continue;
}
let Some(new_tx) = old_monitored_tx.escalate_gas_price(self.escalator.clone()) else {
tracing::error!(tx=?old_monitored_tx.hash, "gas price is not set for transaction, dropping from escalator");
let Some(new_tx) = monitored_tx.escalate_gas_price(self.escalator.clone()) else {
tracing::error!(tx=?monitored_tx.hash, "gas price is not set for transaction, dropping from escalator");
continue;
};

// gas price wasn't escalated
// keep monitoring the old tx
let maybe_tx_to_monitor = if old_monitored_tx.inner.eq(&new_tx) {
Some((old_monitored_tx.hash, old_monitored_tx.creation_time))
} else {
self.broadcast_tx(old_monitored_tx.clone(), new_tx.clone()).await
};
let maybe_tx_to_monitor =
self.broadcast_tx_if_escalated(monitored_tx.clone(), new_tx.clone()).await;

if let Some((new_txhash, new_creation_time)) = maybe_tx_to_monitor {
new_txs_to_monitor.push(MonitoredTransaction {
hash: new_txhash,
inner: new_tx,
creation_time: new_creation_time,
block: old_monitored_tx.block,
block: monitored_tx.block,
});
}
}
Expand Down
26 changes: 0 additions & 26 deletions ethers-middleware/src/nonce_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@ use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use thiserror::Error;
use tracing::instrument;

const DEFAULT_TX_COUNT_FOR_RESYNC: u64 = 10;

#[derive(Debug)]
/// Middleware used for calculating nonces locally, useful for signing multiple
/// consecutive transactions without waiting for them to hit the mempool
Expand All @@ -15,8 +13,6 @@ pub struct NonceManagerMiddleware<M> {
init_guard: futures_locks::Mutex<()>,
initialized: AtomicBool,
nonce: AtomicU64,
tx_count_for_resync: Option<AtomicU64>,
txs_since_resync: AtomicU64,
address: Address,
}

Expand All @@ -32,8 +28,6 @@ where
init_guard: Default::default(),
initialized: Default::default(),
nonce: Default::default(),
tx_count_for_resync: Default::default(),
txs_since_resync: 0u64.into(),
address,
}
}
Expand All @@ -44,13 +38,6 @@ where
nonce.into()
}

pub fn get_tx_count_for_resync(&self) -> u64 {
self.tx_count_for_resync
.as_ref()
.map(|count| count.load(Ordering::SeqCst))
.unwrap_or(DEFAULT_TX_COUNT_FOR_RESYNC)
}

pub async fn initialize_nonce(
&self,
block: Option<BlockId>,
Expand Down Expand Up @@ -155,20 +142,7 @@ where
tracing::debug!(?nonce, "Sending transaction");
match self.inner.send_transaction(tx.clone(), block).await {
Ok(pending_tx) => {
let txs_since_resync = self.txs_since_resync.load(Ordering::SeqCst);
let new_txs_since_resync = txs_since_resync + 1;
tracing::debug!(?nonce, "Sent transaction");
let tx_count_for_resync = self.get_tx_count_for_resync();
if new_txs_since_resync >= tx_count_for_resync {
let onchain_nonce = self.get_transaction_count(self.address, block).await?;
self.nonce.store(onchain_nonce.as_u64(), Ordering::SeqCst);
self.txs_since_resync.store(0, Ordering::SeqCst);
tracing::debug!(?nonce, "Resynced internal nonce with onchain nonce");
} else {
self.txs_since_resync.store(new_txs_since_resync, Ordering::SeqCst);
let txs_until_resync = tx_count_for_resync - new_txs_since_resync;
tracing::debug!(?txs_until_resync, "Transactions until nonce resync");
}
Ok(pending_tx)
}
Err(err) => {
Expand Down

0 comments on commit 869bb69

Please sign in to comment.