Skip to content

Commit

Permalink
refactor: Removed recovery mode
Browse files Browse the repository at this point in the history
  • Loading branch information
KirilMihaylov committed Dec 14, 2023
1 parent abfe9bd commit ff81b44
Show file tree
Hide file tree
Showing 3 changed files with 13 additions and 116 deletions.
2 changes: 0 additions & 2 deletions market-data-feeder/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,4 @@ pub(crate) enum Worker {
PriceComparisonGuard(#[from] PriceComparisonGuardError),
#[error("Failed to serialize price feed message as JSON! Cause: {0}")]
SerializeExecuteMessage(#[from] serde_json_wasm::ser::Error),
#[error("Recovery mode state watch closed!")]
RecoveryModeWatchClosed,
}
88 changes: 7 additions & 81 deletions market-data-feeder/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,11 @@ use std::{
use futures::future::poll_fn;
use serde::Deserialize;
use tokio::{
sync::{
mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender},
watch,
},
sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender},
task::JoinSet,
time::{sleep, timeout, Instant},
time::{timeout, Instant},
};
use tracing::{error, info, info_span, warn};
use tracing::{error, info, warn};
use tracing_appender::{
non_blocking::{self, NonBlocking},
rolling,
Expand All @@ -32,7 +29,6 @@ use chain_comms::{
log,
reexport::tonic::transport::Channel as TonicChannel,
rpc_setup::{prepare_rpc, RpcSetup},
signer::Signer,
signing_key::DEFAULT_COSMOS_HD_PATH,
};
use semver::{
Expand Down Expand Up @@ -63,7 +59,6 @@ const COMPATIBLE_VERSION: SemVerComparator = SemVerComparator {
};

type UnboundedChannel<T> = (UnboundedSender<T>, UnboundedReceiver<T>);
type WatchChannel<T> = (watch::Sender<T>, watch::Receiver<T>);

#[tokio::main]
async fn main() -> Result<()> {
Expand Down Expand Up @@ -102,8 +97,6 @@ async fn app_main() -> Result<()> {

let tick_time: Duration = Duration::from_secs(config.tick_time);

let (recovery_mode_sender, recovery_mode_receiver): WatchChannel<bool> = watch::channel(false);

let workers::SpawnWorkersReturn {
set: mut price_fetchers_set,
receivers,
Expand All @@ -113,7 +106,6 @@ async fn app_main() -> Result<()> {
config.providers,
config.comparison_providers,
tick_time,
recovery_mode_receiver,
)
.await?;

Expand Down Expand Up @@ -203,8 +195,12 @@ async fn app_main() -> Result<()> {

while let Some(tx) = if is_retry {
if tx_time.elapsed() < tick_time {
info!("Retrying transaction...");

tx.take()
} else {
info!("Transaction data timed out.");

None
}
} else {
Expand Down Expand Up @@ -243,12 +239,6 @@ async fn app_main() -> Result<()> {

if successful {
continue 'outer_loop;
} else if signer.needs_update()
&& recovery_loop(&mut signer, &recovery_mode_sender, &nolus_node)
.await
.is_error()
{
break 'outer_loop;
}
}
}
Expand Down Expand Up @@ -307,55 +297,6 @@ async fn price_feeder(
}
}

enum RecoveryStatus {
Success,
Error,
}

impl RecoveryStatus {
const fn is_error(&self) -> bool {
matches!(self, Self::Error)
}
}

async fn recovery_loop(
signer: &mut Signer,
recovery_mode_sender: &watch::Sender<bool>,
client: &NodeClient,
) -> RecoveryStatus {
let set_in_recovery = info_span!("recover-after-error").in_scope(|| {
info!("After-error recovery needed!");

|in_recovery: bool| {
let is_error: bool = recovery_mode_sender.send(in_recovery).is_err();

if is_error {
error!("Recovery mode state watch closed! Exiting broadcasting loop...");
}

is_error
}
});

let recovered: RecoveryStatus = recover_after_error(signer, client).await;

if recovered.is_error() {
if set_in_recovery(true) {
return RecoveryStatus::Error;
}

while recover_after_error(signer, client).await.is_error() {
sleep(Duration::from_secs(15)).await;
}

if set_in_recovery(false) {
return RecoveryStatus::Error;
}
}

RecoveryStatus::Success
}

async fn check_compatibility(config: &Config, client: &NodeClient) -> Result<()> {
#[derive(Deserialize)]
struct JsonVersion {
Expand Down Expand Up @@ -401,18 +342,3 @@ async fn check_compatibility(config: &Config, client: &NodeClient) -> Result<()>

Ok(())
}

#[must_use]
async fn recover_after_error(signer: &mut Signer, client: &NodeClient) -> RecoveryStatus {
if let Err(error) = signer.update_account(client).await {
error!("{error}");

return RecoveryStatus::Error;
}

info!("Successfully updated local copy of account data.");

info!("Continuing normal workflow...");

RecoveryStatus::Success
}
39 changes: 6 additions & 33 deletions market-data-feeder/src/workers/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,7 @@ use std::{collections::BTreeMap, sync::Arc, time::Duration};

use tokio::{
runtime::Handle,
select,
sync::{
mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender},
watch,
},
sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender},
task::{block_in_place, JoinSet},
time::{interval, sleep, Instant, Interval},
};
Expand Down Expand Up @@ -60,7 +56,6 @@ pub(crate) async fn spawn(
providers: BTreeMap<Arc<str>, ProviderWithComparisonConfig>,
price_comparison_providers: BTreeMap<Arc<str>, ComparisonProviderConfig>,
tick_time: Duration,
recovery_mode: watch::Receiver<bool>,
) -> AppResult<SpawnWorkersReturn> {
let mut set: JoinSet<Result<(), error::Worker>> = JoinSet::new();

Expand Down Expand Up @@ -107,7 +102,6 @@ pub(crate) async fn spawn(
price_comparison_providers,
&mut set,
tick_time,
recovery_mode,
nolus_node,
senders,
))
Expand Down Expand Up @@ -143,7 +137,6 @@ fn try_for_each_provider_f(
price_comparison_providers: BTreeMap<Arc<str>, Arc<dyn ComparisonProvider>>,
set: &mut JoinSet<Result<(), error::Worker>>,
tick_time: Duration,
recovery_mode: watch::Receiver<bool>,
nolus_node: NodeClient,
price_data_senders: PriceDataSenders,
) -> impl FnMut((usize, (Arc<str>, ProviderWithComparisonConfig))) -> AppResult<()> + '_ {
Expand Down Expand Up @@ -185,7 +178,6 @@ fn try_for_each_provider_f(
set,
monotonic_id,
tick_time,
recovery_mode: &recovery_mode,
price_comparison_provider,
},
provider_id: &id,
Expand All @@ -206,7 +198,6 @@ struct TaskSpawnerConfig<'r> {
set: &'r mut JoinSet<Result<(), error::Worker>>,
monotonic_id: usize,
tick_time: Duration,
recovery_mode: &'r watch::Receiver<bool>,
price_comparison_provider: Option<(&'r Arc<dyn ComparisonProvider>, u64)>,
}

Expand Down Expand Up @@ -273,11 +264,8 @@ impl<'r> ProviderVisitor for TaskSpawningProviderVisitor<'r> {
}),
self.time_before_feeding,
self.sender.clone(),
(
self.worker_task_spawner_config.monotonic_id,
self.worker_task_spawner_config.tick_time,
),
self.worker_task_spawner_config.recovery_mode.clone(),
self.worker_task_spawner_config.monotonic_id,
self.worker_task_spawner_config.tick_time,
));

Ok(())
Expand All @@ -295,8 +283,8 @@ async fn perform_check_and_enter_loop<P>(
comparison_provider_and_deviation: Option<(Arc<dyn ComparisonProvider>, u64)>,
time_before_feeding: Duration,
sender: UnboundedSender<(usize, Instant, Vec<u8>)>,
(monotonic_id, tick_time): (usize, Duration),
recovery_mode: watch::Receiver<bool>,
monotonic_id: usize,
tick_time: Duration,
) -> Result<(), error::Worker>
where
P: Provider,
Expand Down Expand Up @@ -339,7 +327,6 @@ where
},
provider_name,
tick_time,
recovery_mode,
)
.await
}
Expand All @@ -349,7 +336,6 @@ async fn provider_main_loop<SenderFn, P>(
sender: SenderFn,
provider_name: Box<str>,
tick_time: Duration,
mut recovery_mode: watch::Receiver<bool>,
) -> Result<(), error::Worker>
where
SenderFn: Fn(Vec<u8>) -> Result<(), ()>,
Expand All @@ -360,20 +346,7 @@ where
let mut seq_error_counter: u8 = 0;

'worker_loop: loop {
if select! {
_ = interval.tick() => false,
Ok(()) = recovery_mode.changed() => {
*recovery_mode.borrow()
}
} {
while *recovery_mode.borrow() {
if recovery_mode.changed().await.is_err() {
error!("Recovery mode state watch closed! Exiting worker loop...");

break 'worker_loop Err(error::Worker::RecoveryModeWatchClosed);
}
}
}
interval.tick().await;

match provider.get_prices(true).await {
Ok(prices) => {
Expand Down

0 comments on commit ff81b44

Please sign in to comment.