Skip to content

Commit

Permalink
refactor: Add healthcheck on non-blocking broadcast loops
Browse files Browse the repository at this point in the history
  • Loading branch information
KirilMihaylov committed Apr 9, 2024
1 parent e264f0c commit 9aca908
Show file tree
Hide file tree
Showing 10 changed files with 205 additions and 67 deletions.
7 changes: 6 additions & 1 deletion alarms-dispatcher/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@ use semver::Version;
use thiserror::Error as ThisError;

use chain_comms::{
config::ReadFromEnvError, reexport::cosmrs::proto::prost::EncodeError,
config::ReadFromEnvError, interact::healthcheck,
reexport::cosmrs::proto::prost::EncodeError,
};

#[derive(Debug, ThisError)]
Expand Down Expand Up @@ -39,6 +40,10 @@ pub type AppResult<T> = Result<T, Application>;

#[derive(Debug, ThisError)]
pub enum DispatchAlarms {
#[error("Failed to construct healthcheck client! Cause: {0}")]
HealthcheckConstruct(#[from] healthcheck::error::Construct),
#[error("Healthcheck failed! Cause: {0}")]
Healthcheck(#[from] healthcheck::error::Error),
#[error("Failed to pre-encode commit message in the Protobuf format! Cause: {0}")]
PreEncodeCommitMessage(#[from] EncodeError),
#[error("Failed to serialize query message as JSON! Cause: {0}")]
Expand Down
13 changes: 12 additions & 1 deletion alarms-dispatcher/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,16 @@ use tracing_appender::{
};
use tracing_subscriber::fmt::writer::MakeWriterExt as _;

use crate::generators::TasksConfig;
use broadcast::error::Error as BroadcastError;
use chain_comms::{
client::Client as NodeClient,
interact::query,
rpc_setup::{prepare_rpc, RpcSetup},
signing_key::DEFAULT_COSMOS_HD_PATH,
};

use crate::generators::TasksConfig;

use self::{
config::Config, error::AppResult, generators::Contract, messages::QueryMsg,
};
Expand Down Expand Up @@ -243,4 +245,13 @@ where
spawn_generators,
)
.await
.map_err(|error| match error {
BroadcastError::HealthcheckConstruct(error) => {
error::DispatchAlarms::HealthcheckConstruct(error)
},
BroadcastError::Healthcheck(error) => {
error::DispatchAlarms::Healthcheck(error)
},
BroadcastError::SpawnGenerators(error) => error,
})
}
14 changes: 14 additions & 0 deletions broadcast/src/error.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
use chain_comms::interact::healthcheck;

/// Failure modes of the broadcasting entry point.
///
/// `E` is the error type of the caller-supplied routine that spawns the
/// generator tasks; it is carried through unchanged in
/// [`Error::SpawnGenerators`].
#[derive(Debug, thiserror::Error)]
pub enum Error<E: std::error::Error> {
    /// The healthcheck client could not be constructed.
    #[error("Constructing healthcheck client failed! Error: {0}")]
    HealthcheckConstruct(#[from] healthcheck::error::Construct),

    /// A healthcheck ended with an error.
    #[error("Healthcheck failed! Error: {0}")]
    Healthcheck(#[from] healthcheck::error::Error),

    /// The caller-supplied generator spawning routine failed.
    ///
    /// No `#[from]` here: `E` is generic, so callers wrap it explicitly.
    #[error("Spawning generator tasks failed! Error: {0}")]
    SpawnGenerators(E),
}
34 changes: 24 additions & 10 deletions broadcast/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ use chain_comms::{
error::Error as GetTxResponseError, get_tx_response,
Response as TxResponse,
},
healthcheck::Healthcheck,
TxHash,
},
signer::Signer,
Expand All @@ -50,6 +51,7 @@ use self::{
mod broadcast;
mod cache;
pub mod config;
pub mod error;
pub mod generators;
pub mod log;
pub mod mode;
Expand All @@ -62,12 +64,13 @@ pub async fn broadcast<Impl, SpawnGeneratorsF, SpawnGeneratorsFuture, SpawnE>(
node_client: NodeClient,
node_config: NodeConfig,
spawn_generators: SpawnGeneratorsF,
) -> Result<(), SpawnE>
) -> Result<(), error::Error<SpawnE>>
where
Impl: mode::Impl,
SpawnGeneratorsF:
FnOnce(TxRequestSender<Impl>) -> SpawnGeneratorsFuture + Send,
SpawnGeneratorsFuture: Future<Output = Result<SpawnResult, SpawnE>> + Send,
SpawnE: std::error::Error,
{
let (tx_sender, tx_receiver): (
UnboundedSender<TxRequest<Impl>>,
Expand All @@ -77,7 +80,9 @@ where
let SpawnResult {
mut tx_generators_set,
tx_result_senders,
}: SpawnResult = spawn_generators(tx_sender).await?;
}: SpawnResult = spawn_generators(tx_sender)
.await
.map_err(error::Error::SpawnGenerators)?;

let mut signal = pin!(tokio::signal::ctrl_c());

Expand All @@ -98,7 +103,7 @@ where
true
};

select! {
let result = select! {
result = signal, if signal_installed => {
match result {
Ok(()) => {
Expand All @@ -108,21 +113,23 @@ where
error!(?error, "Error received from Ctrl+C signal handler! Stopping task! Error: {error}");
}
}

Ok(())
},
() = processing_loop(
result = processing_loop(
signer,
config,
node_client,
node_config,
tx_receiver,
&mut tx_generators_set,
tx_result_senders,
) => {}
}
) => result,
};

tx_generators_set.shutdown().await;

Ok(())
result
}

pub async fn poll_delivered_tx(
Expand Down Expand Up @@ -166,16 +173,18 @@ pub(crate) struct ApiAndConfiguration {

#[inline]
#[allow(clippy::future_not_send)]
async fn processing_loop<Impl>(
async fn processing_loop<Impl, E>(
signer: Signer,
config: Config,
node_client: NodeClient,
node_config: NodeConfig,
mut tx_receiver: UnboundedReceiver<TxRequest<Impl>>,
tx_generators_set: &mut JoinSet<Infallible>,
mut tx_result_senders: BTreeMap<usize, CommitResultSender>,
) where
) -> Result<(), error::Error<E>>
where
Impl: mode::Impl,
E: std::error::Error,
{
let mut last_signing_timestamp: Instant = Instant::now();

Expand All @@ -185,6 +194,9 @@ async fn processing_loop<Impl>(

let mut preprocessed_tx_request: Option<preprocess::TxRequest<Impl>> = None;

let mut healthcheck =
Healthcheck::new(node_client.tendermint_service_client()).await?;

let mut api_and_configuration = ApiAndConfiguration {
node_client,
node_config,
Expand All @@ -196,6 +208,8 @@ async fn processing_loop<Impl>(
let mut sequence_mismatch_streak_first_timestamp = None;

loop {
Impl::healthcheck(&mut healthcheck).await?;

try_join_generator_task(tx_generators_set).await;

if matches!(
Expand All @@ -205,7 +219,7 @@ async fn processing_loop<Impl>(
) {
info!("All generator threads stopped. Exiting.");

return;
return Ok(());
}

if preprocessed_tx_request.as_ref().map_or(
Expand Down
56 changes: 54 additions & 2 deletions broadcast/src/mode/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,16 @@ use tokio::time::Instant;
use tracing::{error, error_span, info, warn};

use chain_comms::{
client::Client as NodeClient, interact::commit,
reexport::tonic::Code as TonicStatusCode, signer::Signer,
client::Client as NodeClient,
interact::{
commit,
healthcheck::{
error::Error as HealthcheckError, Healthcheck,
WaitUntilHealthyStatusType,
},
},
reexport::tonic::Code as TonicStatusCode,
signer::Signer,
};

use crate::cache;
Expand All @@ -17,6 +25,46 @@ pub struct Blocking;
impl Impl for Blocking {
type Expiration = ();

/// Waits until the connected node reports a healthy state.
///
/// While the node is unhealthy, a warning describing the current status is
/// logged on the first report of that status and then on every tenth
/// consecutive report of the same status, so the log is not flooded.
///
/// The "healthy again" message is logged only if the node was reported
/// unhealthy at least once during this call. `wait_until_healthy` invokes
/// its `healthy` callback on every successful return, and this hook is run
/// on each pass of the processing loop, so logging unconditionally would
/// emit the recovery message even when the node never stopped being
/// healthy.
///
/// # Errors
///
/// Returns any healthcheck error that `wait_until_healthy` does not treat
/// as a transient "not healthy yet" status.
async fn healthcheck(
    healthcheck: &mut Healthcheck,
) -> Result<(), HealthcheckError> {
    // Number of consecutive reports of `last_status`, modulo 10.
    let mut counter: u8 = 0;

    let mut last_status = WaitUntilHealthyStatusType::Syncing;

    // Shared between the two callbacks. An atomic (rather than a `Cell`)
    // keeps both closures, and therefore the returned future, `Send`.
    let was_unhealthy = std::sync::atomic::AtomicBool::new(false);
    let was_unhealthy = &was_unhealthy;

    healthcheck
        .wait_until_healthy(
            move |status_type| {
                // The callbacks run sequentially inside the same future, so
                // relaxed ordering is sufficient.
                was_unhealthy
                    .store(true, std::sync::atomic::Ordering::Relaxed);

                if status_type != last_status {
                    counter = 0;

                    last_status = status_type;
                }

                if counter == 0 {
                    match status_type {
                        WaitUntilHealthyStatusType::Syncing => {
                            warn!(
                                "Connected node responded with syncing \
                                status."
                            );
                        },
                        WaitUntilHealthyStatusType::BlockNotIncremented => {
                            warn!(
                                "Connected node didn't respond with an \
                                incremented block height."
                            );
                        },
                    }
                }

                counter = (counter + 1) % 10;
            },
            move || {
                // Only announce recovery when there was something to
                // recover from.
                if was_unhealthy.load(std::sync::atomic::Ordering::Relaxed) {
                    info!("Connected node is healthy again.");
                }
            },
        )
        .await
}

#[inline]
fn purge_cache(cache: &mut cache::TxRequests<Self>) -> PurgeResult {
if cache.values_mut().any(|slot| slot.get_mut().is_some()) {
Expand Down Expand Up @@ -55,6 +103,10 @@ pub struct NonBlocking;
impl Impl for NonBlocking {
type Expiration = Instant;

/// Healthcheck hook for this mode: performs no probing of the node and
/// always resolves to `Ok(())` immediately.
async fn healthcheck(
    _healthcheck: &mut Healthcheck,
) -> Result<(), HealthcheckError> {
    Ok(())
}

#[inline]
fn purge_cache(cache: &mut cache::TxRequests<Self>) -> PurgeResult {
let mut exhausted: bool = true;
Expand Down
11 changes: 10 additions & 1 deletion broadcast/src/mode/sealed.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,23 @@
use std::future::Future;

use chain_comms::{
client::Client as NodeClient, interact::commit, signer::Signer,
client::Client as NodeClient,
interact::{
commit,
healthcheck::{error::Error as HealthcheckError, Healthcheck},
},
signer::Signer,
};

use crate::cache;

pub trait Impl: Send + Sync + Sized {
type Expiration: Send + Sync + Sized;

/// Mode-specific healthcheck hook.
///
/// Receives the healthcheck client by mutable reference and resolves to
/// `Ok(())` on success or to a healthcheck error otherwise. What "success"
/// means is up to the implementing mode.
///
/// Declared as returning `impl Future + Send` so that the future produced
/// by every implementation is required to be `Send`.
fn healthcheck(
healthcheck: &mut Healthcheck,
) -> impl Future<Output = Result<(), HealthcheckError>> + Send;

fn purge_cache(cache: &mut cache::TxRequests<Self>) -> PurgeResult;

fn filter(expiration: &Self::Expiration) -> FilterResult;
Expand Down
17 changes: 12 additions & 5 deletions chain-comms/src/interact/healthcheck/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,27 +47,33 @@ impl Healthcheck {
})
}

pub async fn wait_until_healthy<F>(
pub async fn wait_until_healthy<NotHealthyF, HealthyF>(
&mut self,
mut f: F,
mut not_healthy: NotHealthyF,
healthy: HealthyF,
) -> Result<(), error::Error>
where
F: FnMut(WaitUntilHealthyStatusType) + Send,
NotHealthyF: FnMut(WaitUntilHealthyStatusType) + Send,
HealthyF: FnOnce() + Send,
{
while let Err(error) = self.check().await {
match error {
error::Error::Syncing(error::CheckSyncing::Syncing) => {
f(WaitUntilHealthyStatusType::Syncing);
not_healthy(WaitUntilHealthyStatusType::Syncing);
},
error::Error::BlockHeightNotIncremented => {
f(WaitUntilHealthyStatusType::BlockNotIncremented);
not_healthy(
WaitUntilHealthyStatusType::BlockNotIncremented,
);
},
_ => return Err(error),
}

tokio::time::sleep(Duration::from_secs(1)).await;
}

healthy();

Ok(())
}

Expand Down Expand Up @@ -97,6 +103,7 @@ impl Healthcheck {
}
}

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum WaitUntilHealthyStatusType {
Syncing,
BlockNotIncremented,
Expand Down
5 changes: 5 additions & 0 deletions market-data-feeder/src/error.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use std::{error::Error as StdError, sync::Arc};

use chain_comms::interact::healthcheck;
use semver::Version;
use thiserror::Error as ThisError;

Expand Down Expand Up @@ -35,6 +36,10 @@ pub(crate) enum Application {
InvalidProviderUrl(#[from] url::ParseError),
#[error("Failed to notify workers to proceed! All receivers closed!")]
NotifyAllChecksPassed,
#[error("Failed to construct healthcheck client! Cause: {0}")]
HealthcheckConstruct(#[from] healthcheck::error::Construct),
#[error("Healthcheck failed! Cause: {0}")]
Healthcheck(#[from] healthcheck::error::Error),
#[error("Failed to commit price feeding transaction! Cause: {0}")]
CommitTx(
#[from] chain_comms::interact::commit::error::GasEstimatingTxCommit,
Expand Down
11 changes: 10 additions & 1 deletion market-data-feeder/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use tracing_appender::{
};
use tracing_subscriber::fmt::writer::MakeWriterExt as _;

use broadcast::broadcast;
use broadcast::{broadcast, error::Error as BroadcastError};
use chain_comms::{
client::Client as NodeClient,
interact::query,
Expand Down Expand Up @@ -136,6 +136,15 @@ async fn app_main() -> Result<()> {
spawn_generators_f,
)
.await
.map_err(|error| match error {
BroadcastError::HealthcheckConstruct(error) => {
error::Application::HealthcheckConstruct(error)
},
BroadcastError::Healthcheck(error) => {
error::Application::Healthcheck(error)
},
BroadcastError::SpawnGenerators(error) => error,
})
}

#[allow(clippy::future_not_send)]
Expand Down
Loading

0 comments on commit 9aca908

Please sign in to comment.