Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Revert "feat: middleware submission tweaks (#23)" #29

Merged
merged 1 commit into from
Jan 9, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
96 changes: 44 additions & 52 deletions ethers-middleware/src/gas_escalator/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ pub(crate) struct GasEscalatorMiddlewareInternal<M> {

#[derive(Debug, Clone, PartialEq, Eq)]
pub struct MonitoredTransaction {
hash: Option<TxHash>,
hash: TxHash,
inner: TypedTransaction,
creation_time: Instant,
block: Option<BlockId>,
Expand Down Expand Up @@ -205,30 +205,23 @@ where
) -> Result<PendingTransaction<'_, M::Provider>, GasEscalatorError<M>> {
let tx = tx.into();

match self.inner.send_transaction(tx.clone(), block).await {
Ok(pending_tx) => {
tracing::debug!(tx = ?tx, tx_hash = ?pending_tx.tx_hash(), "Sent tx, adding to gas escalator watcher");
let mut lock = self.txs.lock().await;
lock.push(MonitoredTransaction {
hash: Some(*pending_tx),
inner: tx,
creation_time: Instant::now(),
block,
});
Ok(pending_tx)
}
Err(err) => {
tracing::warn!(tx = ?tx, "Failed to send tx, adding to gas escalator watcher regardless");
let mut lock = self.txs.lock().await;
lock.push(MonitoredTransaction {
hash: None,
inner: tx,
creation_time: Instant::now(),
block: None,
});
Err(GasEscalatorError::MiddlewareError(err))
}
}
let pending_tx = self
.inner
.send_transaction(tx.clone(), block)
.await
.map_err(GasEscalatorError::MiddlewareError)?;

tracing::debug!(tx = ?tx, tx_hash = ?pending_tx.tx_hash(), "Sent tx, adding to gas escalator watcher");
// insert the tx in the pending txs
let mut lock = self.txs.lock().await;
lock.push(MonitoredTransaction {
hash: *pending_tx,
inner: tx,
creation_time: Instant::now(),
block,
});

Ok(pending_tx)
}
}

Expand Down Expand Up @@ -286,11 +279,11 @@ impl<M, E: Clone> EscalationTask<M, E> {
err_message: String,
old_monitored_tx: MonitoredTransaction,
new_tx: TypedTransaction,
) -> Option<(Option<H256>, Instant)> {
) -> Option<(H256, Instant)> {
if err_message.contains("nonce too low") {
// may happen if we try to broadcast a new, gas-escalated tx when the original tx
// already landed onchain, meaning we no longer need to escalate it
tracing::warn!(err = err_message, ?old_monitored_tx, ?new_tx, "Nonce error when escalating gas price. Tx may have already been included onchain. Dropping it from escalator");
tracing::warn!(err = err_message, ?old_monitored_tx.hash, ?new_tx, "Nonce error when escalating gas price. Tx may have already been included onchain. Dropping it from escalator");
None
} else if RETRYABLE_ERRORS.iter().any(|err_msg| err_message.contains(err_msg)) {
// if the error is one of the known retryable errors, we can keep trying to escalate
Expand Down Expand Up @@ -318,15 +311,21 @@ impl<M, E: Clone> EscalationTask<M, E> {
///
/// **Returns** a tx hash to monitor and the time it was created, unless the tx was already
/// included or an unknown error occurred
async fn broadcast_tx(
async fn broadcast_tx_if_escalated(
&self,
old_monitored_tx: MonitoredTransaction,
new_tx: TypedTransaction,
) -> Option<(Option<H256>, Instant)>
) -> Option<(H256, Instant)>
where
M: Middleware,
E: GasEscalator,
{
// gas price wasn't escalated
// keep monitoring the old tx
if old_monitored_tx.inner.eq(&new_tx) {
return Some((old_monitored_tx.hash, old_monitored_tx.creation_time));
}

// send a replacement tx with the escalated gas price
match self.inner.send_transaction(new_tx.clone(), old_monitored_tx.block).await {
Ok(new_tx_hash) => {
Expand All @@ -339,7 +338,7 @@ impl<M, E: Clone> EscalationTask<M, E> {
);
// Return the new tx hash to monitor and the time it was created.
// The latter is used to know when to escalate the gas price again
Some((Some(new_tx_hash), Instant::now()))
Some((new_tx_hash, Instant::now()))
}
Err(err) => Self::handle_broadcast_error(err.to_string(), old_monitored_tx, new_tx),
}
Expand All @@ -361,41 +360,34 @@ impl<M, E: Clone> EscalationTask<M, E> {
tracing::trace!(?monitored_txs, "In the escalator watcher loop. Monitoring txs");
}
let mut new_txs_to_monitor = vec![];
for old_monitored_tx in monitored_txs {
let receipt = if let Some(tx_hash) = old_monitored_tx.hash {
tracing::trace!(tx_hash = ?old_monitored_tx.hash, "checking if exists");
self.inner
.get_transaction_receipt(tx_hash)
.await
.map_err(GasEscalatorError::MiddlewareError)?
} else {
None
};
for monitored_tx in monitored_txs {
let receipt = self
.inner
.get_transaction_receipt(monitored_tx.hash)
.await
.map_err(GasEscalatorError::MiddlewareError)?;

tracing::trace!(tx_hash = ?monitored_tx.hash, "checking if exists");

if let Some(receipt) = receipt {
if receipt.is_some() {
// tx was already included, can drop from escalator
tracing::debug!(tx = ?receipt.transaction_hash, "Transaction was included onchain, dropping from escalator");
tracing::debug!(tx = ?monitored_tx.hash, "Transaction was included onchain, dropping from escalator");
continue;
}
let Some(new_tx) = old_monitored_tx.escalate_gas_price(self.escalator.clone()) else {
tracing::error!(tx=?old_monitored_tx.hash, "gas price is not set for transaction, dropping from escalator");
let Some(new_tx) = monitored_tx.escalate_gas_price(self.escalator.clone()) else {
tracing::error!(tx=?monitored_tx.hash, "gas price is not set for transaction, dropping from escalator");
continue;
};

// gas price wasn't escalated
// keep monitoring the old tx
let maybe_tx_to_monitor = if old_monitored_tx.inner.eq(&new_tx) {
Some((old_monitored_tx.hash, old_monitored_tx.creation_time))
} else {
self.broadcast_tx(old_monitored_tx.clone(), new_tx.clone()).await
};
let maybe_tx_to_monitor =
self.broadcast_tx_if_escalated(monitored_tx.clone(), new_tx.clone()).await;

if let Some((new_txhash, new_creation_time)) = maybe_tx_to_monitor {
new_txs_to_monitor.push(MonitoredTransaction {
hash: new_txhash,
inner: new_tx,
creation_time: new_creation_time,
block: old_monitored_tx.block,
block: monitored_tx.block,
});
}
}
Expand Down
26 changes: 0 additions & 26 deletions ethers-middleware/src/nonce_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@ use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use thiserror::Error;
use tracing::instrument;

const DEFAULT_TX_COUNT_FOR_RESYNC: u64 = 10;

#[derive(Debug)]
/// Middleware used for calculating nonces locally, useful for signing multiple
/// consecutive transactions without waiting for them to hit the mempool
Expand All @@ -15,8 +13,6 @@ pub struct NonceManagerMiddleware<M> {
init_guard: futures_locks::Mutex<()>,
initialized: AtomicBool,
nonce: AtomicU64,
tx_count_for_resync: Option<AtomicU64>,
txs_since_resync: AtomicU64,
address: Address,
}

Expand All @@ -32,8 +28,6 @@ where
init_guard: Default::default(),
initialized: Default::default(),
nonce: Default::default(),
tx_count_for_resync: Default::default(),
txs_since_resync: 0u64.into(),
address,
}
}
Expand All @@ -44,13 +38,6 @@ where
nonce.into()
}

pub fn get_tx_count_for_resync(&self) -> u64 {
self.tx_count_for_resync
.as_ref()
.map(|count| count.load(Ordering::SeqCst))
.unwrap_or(DEFAULT_TX_COUNT_FOR_RESYNC)
}

pub async fn initialize_nonce(
&self,
block: Option<BlockId>,
Expand Down Expand Up @@ -155,20 +142,7 @@ where
tracing::debug!(?nonce, "Sending transaction");
match self.inner.send_transaction(tx.clone(), block).await {
Ok(pending_tx) => {
let txs_since_resync = self.txs_since_resync.load(Ordering::SeqCst);
let new_txs_since_resync = txs_since_resync + 1;
tracing::debug!(?nonce, "Sent transaction");
let tx_count_for_resync = self.get_tx_count_for_resync();
if new_txs_since_resync >= tx_count_for_resync {
let onchain_nonce = self.get_transaction_count(self.address, block).await?;
self.nonce.store(onchain_nonce.as_u64(), Ordering::SeqCst);
self.txs_since_resync.store(0, Ordering::SeqCst);
tracing::debug!(?nonce, "Resynced internal nonce with onchain nonce");
} else {
self.txs_since_resync.store(new_txs_since_resync, Ordering::SeqCst);
let txs_until_resync = tx_count_for_resync - new_txs_since_resync;
tracing::debug!(?txs_until_resync, "Transactions until nonce resync");
}
Ok(pending_tx)
}
Err(err) => {
Expand Down
Loading