diff --git a/rust/agents/validator/src/submit.rs b/rust/agents/validator/src/submit.rs index a08aa15620..3faf8bb263 100644 --- a/rust/agents/validator/src/submit.rs +++ b/rust/agents/validator/src/submit.rs @@ -57,7 +57,7 @@ impl ValidatorSubmitter { } } - #[instrument(err, skip(self), fields(domain=%self.mailbox.domain()))] + #[instrument(err, skip(self, tree), fields(domain=%self.mailbox.domain()))] pub(crate) async fn checkpoint_submitter( self, mut tree: IncrementalMerkle, @@ -97,20 +97,37 @@ impl ValidatorSubmitter { // compare against every queued checkpoint to prevent ingesting past target if checkpoint == correctness_checkpoint { - debug!( - index = checkpoint.index, - "Reached tree consistency, signing queued checkpoints" - ); + debug!(index = checkpoint.index, "Reached tree consistency"); // drain and sign all checkpoints in the queue for queued_checkpoint in checkpoint_queue.drain(..) { + let existing = self + .checkpoint_syncer + .fetch_checkpoint(queued_checkpoint.index) + .await?; + if existing.is_some() { + debug!( + index = queued_checkpoint.index, + "Checkpoint already submitted" + ); + continue; + } + let signed_checkpoint = self.signer.sign(queued_checkpoint).await?; self.checkpoint_syncer .write_checkpoint(&signed_checkpoint) .await?; - info!(index = queued_checkpoint.index, "Signed checkpoint"); + debug!( + index = queued_checkpoint.index, + "Signed and submitted checkpoint" + ); + + // small sleep before signing next checkpoint to avoid rate limiting + sleep(Duration::from_millis(100)).await; } + info!(index = checkpoint.index, "Signed all queued checkpoints"); + self.metrics .latest_checkpoint_processed .set(checkpoint.index as i64);