From 05b17398a1597310776b804ca81da7e8fa7acf7f Mon Sep 17 00:00:00 2001 From: Elder Ryan Date: Thu, 28 Mar 2024 16:59:42 +0800 Subject: [PATCH] feat: transport channel pool (#562) 1. Set DEFAULT_TTL_MS: to 600 * 1000; 2. don't print whole payload on debug log 3. Use `ProviderRef` in `WASM` environment to avoid ownership issues (may cause undefined here) 4. Use request_internal instead of request to avoid multi-serde from JS and WASM 5. Implementation of round robin pool for data channel --- Cargo.lock | 8 +- Cargo.toml | 2 +- crates/core/src/consts.rs | 2 +- crates/node/src/backend/snark/browser.rs | 17 +- crates/node/src/backend/snark/mod.rs | 18 +- crates/node/src/provider/browser/provider.rs | 24 +++ crates/node/src/provider/mod.rs | 2 +- crates/node/src/tests/wasm/snark.rs | 7 +- crates/transport/Cargo.toml | 3 + .../src/connections/native_webrtc/mod.rs | 67 ++++++-- .../src/connections/web_sys_webrtc/mod.rs | 61 +++++-- crates/transport/src/core/mod.rs | 1 + crates/transport/src/core/pool.rs | 159 ++++++++++++++++++ crates/transport/src/error.rs | 6 + 14 files changed, 321 insertions(+), 56 deletions(-) create mode 100644 crates/transport/src/core/pool.rs diff --git a/Cargo.lock b/Cargo.lock index 0e275890b..3c6f6feb3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3804,7 +3804,7 @@ dependencies = [ "rings-derive", "rings-transport", "serde", - "serde-wasm-bindgen 0.6.3", + "serde-wasm-bindgen 0.6.5", "serde_json", "sha1", "sha2 0.10.8", @@ -3888,7 +3888,7 @@ dependencies = [ "rings-snark", "rings-transport", "serde", - "serde-wasm-bindgen 0.6.3", + "serde-wasm-bindgen 0.6.5", "serde_json", "serde_yaml", "strum", @@ -4255,9 +4255,9 @@ dependencies = [ [[package]] name = "serde-wasm-bindgen" -version = "0.6.3" +version = "0.6.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b9b713f70513ae1f8d92665bbbbda5c295c2cf1da5542881ae5eefe20c9af132" +checksum = "8302e169f0eddcc139c70f139d19d6467353af16f9fce27e8c30158036a1e16b" dependencies = [ "js-sys", "serde", diff --git a/Cargo.toml b/Cargo.toml index 3140d4f4b..3dd6a043e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -19,7 +19,7 @@ rings-node = { path = "crates/node" } rings-rpc = { path = "crates/rpc", default-features = false } rings-snark = { path = "crates/snark", default-features = false } rings-transport = { path = "crates/transport" } -serde-wasm-bindgen = "0.6.1" +serde-wasm-bindgen = "0.6.5" wasm-bindgen = "0.2.87" wasm-bindgen-futures = "0.4.37" wasm-bindgen-macro-support = "0.2.84" diff --git a/crates/core/src/consts.rs b/crates/core/src/consts.rs index f38cb1a86..d92c51ec4 100644 --- a/crates/core/src/consts.rs +++ b/crates/core/src/consts.rs @@ -1,7 +1,7 @@ //! Constant variables. /// /// default ttl in ms -pub const DEFAULT_TTL_MS: u64 = 300 * 1000; +pub const DEFAULT_TTL_MS: u64 = 600 * 1000; pub const MAX_TTL_MS: u64 = DEFAULT_TTL_MS * 10; pub const TS_OFFSET_TOLERANCE_MS: u128 = 3000; pub const DEFAULT_SESSION_TTL_MS: u64 = 30 * 24 * 3600 * 1000; diff --git a/crates/node/src/backend/snark/browser.rs b/crates/node/src/backend/snark/browser.rs index 2048d2e08..07124d033 100644 --- a/crates/node/src/backend/snark/browser.rs +++ b/crates/node/src/backend/snark/browser.rs @@ -13,6 +13,7 @@ use crate::backend::types::snark::SNARKProofTask; use crate::backend::types::snark::SNARKVerifyTask; use crate::backend::BackendMessageHandlerDynObj; use crate::prelude::rings_core::utils::js_value; +use crate::provider::browser::ProviderRef; /// We need this ref to pass Task ref to js_sys #[wasm_bindgen] @@ -131,7 +132,7 @@ impl SNARKBehaviour { /// Handle js native message pub fn handle_snark_task_message( self, - provider: Provider, + provider: ProviderRef, ctx: JsValue, msg: JsValue, ) -> js_sys::Promise { @@ -139,7 +140,7 @@ impl SNARKBehaviour { future_to_promise(async move { let ctx = js_value::deserialize::(ctx)?; let msg = js_value::deserialize::(msg)?; - ins.handle_message(provider.into(), &ctx, &msg) + ins.handle_message(provider.inner(), &ctx, &msg) .await .map_err(|e| Error::BackendError(e.to_string()))?; Ok(JsValue::NULL) @@ -168,14 +169,18 @@ impl SNARKBehaviour { /// send proof task to did pub fn send_proof_task_to( &self, - provider: Provider, + provider: ProviderRef, task: SNARKProofTaskRef, did: String, ) -> js_sys::Promise { let ins = self.clone(); future_to_promise(async move { let ret = ins - .send_proof_task(provider.clone().into(), task.as_ref(), Did::from_str(&did)?) + .send_proof_task( + provider.inner().clone(), + task.as_ref(), + Did::from_str(&did)?, + ) .await .map_err(JsError::from)?; Ok(JsValue::from(ret)) @@ -185,14 +190,14 @@ impl SNARKBehaviour { /// Generate a proof task and send it to did pub fn gen_and_send_proof_task_to( &self, - provider: Provider, + provider: ProviderRef, circuits: Vec, did: String, ) -> js_sys::Promise { let ins = self.clone(); future_to_promise(async move { let ret = ins - .gen_and_send_proof_task(provider.clone().into(), circuits, Did::from_str(&did)?) + .gen_and_send_proof_task(provider.inner().clone(), circuits, Did::from_str(&did)?) .await .map_err(JsError::from)?; Ok(JsValue::from(ret)) diff --git a/crates/node/src/backend/snark/mod.rs b/crates/node/src/backend/snark/mod.rs index ef113e569..b84bd4a25 100644 --- a/crates/node/src/backend/snark/mod.rs +++ b/crates/node/src/backend/snark/mod.rs @@ -939,18 +939,12 @@ impl MessageHandler for SNARKBehaviour { } .into(); let params = resp.into_send_backend_message_request(verifier)?; - #[cfg(not(target_arch = "wasm32"))] - provider.request(Method::SendBackendMessage, params).await?; - #[cfg(target_arch = "wasm32")] - { - let req = rings_core::utils::js_value::serialize(¶ms)?; - let promise = provider.request(Method::SendBackendMessage.to_string(), req); - wasm_bindgen_futures::JsFuture::from(promise) - .await - .map_err(|e| { - Error::JsError(format!("Failed send backend message: {:?}", e)) - })?; - } + provider + .request_internal( + Method::SendBackendMessage.to_string(), + serde_json::to_value(params)?, + ) + .await?; Ok(()) } SNARKTask::SNARKVerify(t) => { diff --git a/crates/node/src/provider/browser/provider.rs b/crates/node/src/provider/browser/provider.rs index a6e2b21dd..034e0ac68 100644 --- a/crates/node/src/provider/browser/provider.rs +++ b/crates/node/src/provider/browser/provider.rs @@ -43,6 +43,30 @@ pub enum AddressType { Ed25519, } +/// A wrapper of Arc Ref of Provider +#[derive(Clone)] +#[wasm_export] +pub struct ProviderRef { + inner: Arc, +} + +impl ProviderRef { + /// get wrapped arc, this is useful for wasm case + pub fn inner(&self) -> Arc { + self.inner.clone() + } +} + +#[wasm_export] +impl Provider { + /// make provider as an As arc ref + pub fn as_ref(&self) -> ProviderRef { + ProviderRef { + inner: Arc::new(self.clone()), + } + } +} + #[wasm_export] impl Provider { /// Create new instance of Provider, return Promise diff --git a/crates/node/src/provider/mod.rs b/crates/node/src/provider/mod.rs index c5199d03d..4bf4cdf09 100644 --- a/crates/node/src/provider/mod.rs +++ b/crates/node/src/provider/mod.rs @@ -165,7 +165,7 @@ impl Provider { method: String, params: serde_json::Value, ) -> Result { - tracing::debug!("request {} params: {:?}", method, params); + tracing::debug!("request {}", method); self.handler .handle_request(self.processor.clone(), method, params) .await diff --git a/crates/node/src/tests/wasm/snark.rs b/crates/node/src/tests/wasm/snark.rs index dd38d70e1..22ddd2011 100644 --- a/crates/node/src/tests/wasm/snark.rs +++ b/crates/node/src/tests/wasm/snark.rs @@ -65,7 +65,10 @@ async fn test_send_snark_backend_message() { console_log!("wait for register"); js_utils::window_sleep(1000).await.unwrap(); console_log!("gen snark task and send"); - let promise = - snark_behaviour.gen_and_send_proof_task_to(provider1, circuits, provider2.address()); + let promise = snark_behaviour.gen_and_send_proof_task_to( + provider1.as_ref(), + circuits, + provider2.address(), + ); wasm_bindgen_futures::JsFuture::from(promise).await.unwrap(); } diff --git a/crates/transport/Cargo.toml b/crates/transport/Cargo.toml index 361de7546..b19b4e441 100644 --- a/crates/transport/Cargo.toml +++ b/crates/transport/Cargo.toml @@ -46,6 +46,9 @@ web-sys = { version = "0.3.64", optional = true, features = [ "RtcSessionDescription", "RtcSessionDescriptionInit", "RtcStatsReport", + "Window", + "WorkerGlobalScope", + "ServiceWorkerGlobalScope", ] } # Common dependencies diff --git a/crates/transport/src/connections/native_webrtc/mod.rs b/crates/transport/src/connections/native_webrtc/mod.rs index e42be3bd5..1bf310459 100644 --- a/crates/transport/src/connections/native_webrtc/mod.rs +++ b/crates/transport/src/connections/native_webrtc/mod.rs @@ -18,6 +18,10 @@ use webrtc::peer_connection::RTCPeerConnection; use crate::callback::InnerTransportCallback; use crate::connection_ref::ConnectionRef; use crate::core::callback::BoxedTransportCallback; +use crate::core::pool::MessageSenderPool; +use crate::core::pool::RoundRobin; +use crate::core::pool::RoundRobinPool; +use crate::core::pool::StatusPool; use crate::core::transport::ConnectionInterface; use crate::core::transport::TransportInterface; use crate::core::transport::TransportMessage; @@ -31,12 +35,35 @@ use crate::pool::Pool; const WEBRTC_WAIT_FOR_DATA_CHANNEL_OPEN_TIMEOUT: u8 = 8; // seconds const WEBRTC_GATHER_TIMEOUT: u8 = 60; // seconds +/// pool size of data channel +const DATA_CHANNEL_POOL_SIZE: u8 = 4; + +#[cfg_attr(arch_family = "wasm", async_trait(?Send))] +#[cfg_attr(not(arch_family = "wasm"), async_trait)] +impl MessageSenderPool> for RoundRobinPool> { + type Message = TransportMessage; + async fn send(&self, msg: TransportMessage) -> Result<()> { + let channel = self.select()?; + let data = bincode::serialize(&msg).map(Bytes::from)?; + if let Err(e) = channel.send(&data).await { + tracing::error!("{:?}, Data size: {:?}", e, data.len()); + return Err(e.into()); + } + Ok(()) + } +} + +impl StatusPool> for RoundRobinPool> { + fn all_ready(&self) -> Result { + self.all(|c| c.ready_state() == RTCDataChannelState::Open) + } +} /// A connection that implemented by webrtc-rs library. /// Used for native environment. pub struct WebrtcConnection { webrtc_conn: RTCPeerConnection, - webrtc_data_channel: Arc, + webrtc_data_channel: Arc>>, webrtc_data_channel_state_notifier: Notifier, cancel_token: CancellationToken, } @@ -52,7 +79,7 @@ pub struct WebrtcTransport { impl WebrtcConnection { fn new( webrtc_conn: RTCPeerConnection, - webrtc_data_channel: Arc, + webrtc_data_channel: Arc>>, webrtc_data_channel_state_notifier: Notifier, ) -> Self { Self { @@ -108,12 +135,7 @@ impl ConnectionInterface for WebrtcConnection { async fn send_message(&self, msg: TransportMessage) -> Result<()> { self.webrtc_wait_for_data_channel_open().await?; - let data = bincode::serialize(&msg).map(Bytes::from)?; - if let Err(e) = self.webrtc_data_channel.send(&data).await { - tracing::error!("{:?}, Data size: {:?}", e, data.len()); - return Err(e.into()); - } - Ok(()) + self.webrtc_data_channel.send(msg).await } async fn get_stats(&self) -> Vec { @@ -171,7 +193,7 @@ impl ConnectionInterface for WebrtcConnection { return Err(Error::DataChannelOpen("Connection unavailable".to_string())); } - if self.webrtc_data_channel.ready_state() == RTCDataChannelState::Open { + if self.webrtc_data_channel.all_ready()? { return Ok(()); } @@ -179,9 +201,7 @@ impl ConnectionInterface for WebrtcConnection { .set_timeout(WEBRTC_WAIT_FOR_DATA_CHANNEL_OPEN_TIMEOUT); self.webrtc_data_channel_state_notifier.clone().await; - dbg!(self.webrtc_data_channel.ready_state()); - - if self.webrtc_data_channel.ready_state() == RTCDataChannelState::Open { + if self.webrtc_data_channel.all_ready()? { return Ok(()); } else { return Err(Error::DataChannelOpen(format!( @@ -239,7 +259,7 @@ impl TransportInterface for WebrtcTransport { // // Create webrtc connection // - let webrtc_conn = webrtc_api.new_peer_connection(webrtc_config).await?; + let webrtc_conn: RTCPeerConnection = webrtc_api.new_peer_connection(webrtc_config).await?; // // Set callbacks @@ -251,15 +271,23 @@ impl TransportInterface for WebrtcTransport { webrtc_data_channel_state_notifier.clone(), )); + let channel_pool = Arc::new(RoundRobinPool::default()); + let channel_pool_ref = channel_pool.clone(); let data_channel_inner_cb = inner_cb.clone(); webrtc_conn.on_data_channel(Box::new(move |d: Arc| { let d_label = d.label(); let d_id = d.id(); tracing::debug!("New DataChannel {d_label} {d_id}"); - + let channel_pool = channel_pool_ref.clone(); let on_open_inner_cb = data_channel_inner_cb.clone(); d.on_open(Box::new(move || { - Box::pin(async move { on_open_inner_cb.on_data_channel_open().await }) + Box::pin(async move { + // check all channels are ready + // trigger on_data_channel_open callback iff all channels ready (open) + if let Ok(true) = channel_pool.all_ready() { + on_open_inner_cb.on_data_channel_open().await + } + }) })); let on_close_inner_cb = data_channel_inner_cb.clone(); @@ -300,14 +328,19 @@ impl TransportInterface for WebrtcTransport { // // Create data channel // - let webrtc_data_channel = webrtc_conn.create_data_channel("rings", None).await?; + for i in 0..DATA_CHANNEL_POOL_SIZE { + let ch = webrtc_conn + .create_data_channel(&format!("rings_data_channel_{}", i), None) + .await?; + channel_pool.push(ch)?; + } // // Construct the Connection // let conn = WebrtcConnection::new( webrtc_conn, - webrtc_data_channel, + channel_pool, webrtc_data_channel_state_notifier, ); diff --git a/crates/transport/src/connections/web_sys_webrtc/mod.rs b/crates/transport/src/connections/web_sys_webrtc/mod.rs index 762cd29d4..9b4ff0fcd 100644 --- a/crates/transport/src/connections/web_sys_webrtc/mod.rs +++ b/crates/transport/src/connections/web_sys_webrtc/mod.rs @@ -24,6 +24,10 @@ use web_sys::RtcStatsReport; use crate::callback::InnerTransportCallback; use crate::connection_ref::ConnectionRef; use crate::core::callback::BoxedTransportCallback; +use crate::core::pool::MessageSenderPool; +use crate::core::pool::RoundRobin; +use crate::core::pool::RoundRobinPool; +use crate::core::pool::StatusPool; use crate::core::transport::ConnectionInterface; use crate::core::transport::TransportInterface; use crate::core::transport::TransportMessage; @@ -37,12 +41,37 @@ use crate::pool::Pool; const WEBRTC_WAIT_FOR_DATA_CHANNEL_OPEN_TIMEOUT: u8 = 8; // seconds const WEBRTC_GATHER_TIMEOUT: u8 = 60; // seconds +/// pool size of data channel +const DATA_CHANNEL_POOL_SIZE: u8 = 4; + +#[async_trait(?Send)] +impl MessageSenderPool for RoundRobinPool { + type Message = TransportMessage; + async fn send(&self, msg: TransportMessage) -> Result<()> { + let channel = self.select()?; + let data = bincode::serialize(&msg)?; + if let Err(e) = channel + .send_with_u8_array(&data) + .map_err(Error::WebSysWebrtc) + { + tracing::error!("{:?}, Data size: {:?}", e, data.len()); + return Err(e.into()); + } + Ok(()) + } +} + +impl StatusPool for RoundRobinPool { + fn all_ready(&self) -> Result { + self.all(|c| c.ready_state() == RtcDataChannelState::Open) + } +} /// A connection that implemented by web_sys library. /// Used for browser environment. pub struct WebSysWebrtcConnection { webrtc_conn: RtcPeerConnection, - webrtc_data_channel: RtcDataChannel, + webrtc_data_channel: Arc>, webrtc_data_channel_state_notifier: Notifier, } @@ -56,7 +85,7 @@ pub struct WebSysWebrtcTransport { impl WebSysWebrtcConnection { fn new( webrtc_conn: RtcPeerConnection, - webrtc_data_channel: RtcDataChannel, + webrtc_data_channel: Arc>, webrtc_data_channel_state_notifier: Notifier, ) -> Self { Self { @@ -119,10 +148,7 @@ impl ConnectionInterface for WebSysWebrtcConnection { async fn send_message(&self, msg: TransportMessage) -> Result<()> { self.webrtc_wait_for_data_channel_open().await?; - let data = bincode::serialize(&msg)?; - self.webrtc_data_channel - .send_with_u8_array(&data) - .map_err(Error::WebSysWebrtc)?; + self.webrtc_data_channel.send(msg).await?; Ok(()) } @@ -205,7 +231,7 @@ impl ConnectionInterface for WebSysWebrtcConnection { return Err(Error::DataChannelOpen("Connection unavailable".to_string())); } - if self.webrtc_data_channel.ready_state() == RtcDataChannelState::Open { + if self.webrtc_data_channel.all_ready()? { return Ok(()); } @@ -213,7 +239,7 @@ impl ConnectionInterface for WebSysWebrtcConnection { .set_timeout(WEBRTC_WAIT_FOR_DATA_CHANNEL_OPEN_TIMEOUT); self.webrtc_data_channel_state_notifier.clone().await; - if self.webrtc_data_channel.ready_state() == RtcDataChannelState::Open { + if self.webrtc_data_channel.all_ready()? { return Ok(()); } else { return Err(Error::DataChannelOpen(format!( @@ -270,16 +296,24 @@ impl TransportInterface for WebSysWebrtcTransport { )); let data_channel_inner_cb = inner_cb.clone(); + let channel_pool = Arc::new(RoundRobinPool::default()); + let channel_pool_ref = channel_pool.clone(); + let on_data_channel = Box::new(move |ev: RtcDataChannelEvent| { let d = ev.channel(); let d_label = d.label(); tracing::debug!("New DataChannel {d_label}"); - + let channel_pool = channel_pool_ref.clone(); let on_open_inner_cb = data_channel_inner_cb.clone(); let on_open = Box::new(move || { + let channel_pool = channel_pool.clone(); let inner_cb = on_open_inner_cb.clone(); spawn_local(async move { - inner_cb.on_data_channel_open().await; + // check all channels are ready + // trigger on_data_channel_open callback iff all channels ready (open) + if let Ok(true) = channel_pool.all_ready() { + inner_cb.on_data_channel_open().await; + } }) }); @@ -363,14 +397,17 @@ impl TransportInterface for WebSysWebrtcTransport { // // Create data channel // - let webrtc_data_channel = webrtc_conn.create_data_channel("rings"); + for i in 0..DATA_CHANNEL_POOL_SIZE { + let ch = webrtc_conn.create_data_channel(&format!("rings_data_channel_{}", i)); + channel_pool.push(ch)?; + } // // Construct the Connection // let conn = WebSysWebrtcConnection::new( webrtc_conn, - webrtc_data_channel, + channel_pool, webrtc_data_channel_state_notifier, ); diff --git a/crates/transport/src/core/mod.rs b/crates/transport/src/core/mod.rs index 04978fd68..1bb210b19 100644 --- a/crates/transport/src/core/mod.rs +++ b/crates/transport/src/core/mod.rs @@ -12,4 +12,5 @@ //! coming data channel message and etc. See the [callback] module. pub mod callback; +pub mod pool; pub mod transport; diff --git a/crates/transport/src/core/pool.rs b/crates/transport/src/core/pool.rs new file mode 100644 index 000000000..3bc01e68a --- /dev/null +++ b/crates/transport/src/core/pool.rs @@ -0,0 +1,159 @@ +//! A module implementing a generic round-robin pool for various transport systems. +//! +//! This module provides the foundation for creating a pool of resources (e.g., connections, channels) and +//! enables round-robin selection among these resources. It's designed with flexibility in mind, allowing +//! integration with different types of transport mechanisms. This ensures efficient and balanced resource +//! utilization across multiple channels or connections, irrespective of their specific implementation details. + +use std::sync::atomic::AtomicUsize; +use std::sync::atomic::Ordering; +use std::sync::RwLock; + +use async_trait::async_trait; + +use crate::error::Error; +use crate::error::Result; + +/// Defines the behavior for managing resources in a round-robin manner. +/// +/// This trait outlines essential operations for a round-robin resource pool, facilitating equitable +/// selection and access distribution among pooled resources. It's intended for various use cases where +/// managing a collection of elements (e.g., network connections, data channels) efficiently is crucial. +pub trait RoundRobin { + /// Selects a resource from the pool, ensuring it is in a ready state for use. + fn select(&self) -> Result; + + /// Check if all contained element of pool match the statement. + fn all(&self, statement: fn(&T) -> bool) -> Result; +} + +/// Implements a round-robin pool for resources that can be cloned. +/// +/// This structure provides a concrete round-robin pooling mechanism, supporting the sequential +/// selection of resources. It's generic over the resource type, requiring only that they implement +/// the `Clone` trait, thus ensuring wide applicability to various types of resources. +pub struct RoundRobinPool { + pool: RwLock>, + idx: AtomicUsize, +} + +impl Default for RoundRobinPool { + fn default() -> Self { + Self { + pool: RwLock::new(vec![]), + idx: AtomicUsize::from(0), + } + } +} + +impl RoundRobinPool { + /// Creates a new round-robin pool from a provided vector of resources. + /// + /// Initializes the pool with the specified resources and sets the initial selection index to zero. + /// This is the entry point for creating a pool and managing resource selection in a round-robin fashion. + pub fn from_vec(conns: Vec) -> Self { + Self { + pool: RwLock::new(conns), + idx: AtomicUsize::from(0), + } + } + + /// Push a item with type T to the pool, this operator will increate the pool size + pub fn push(&self, item: T) -> Result<()> { + let mut pool = self + .pool + .write() + .map_err(|_| Error::RwLockWrite("Failed to write RR pool".to_string()))?; + pool.push(item); + Ok(()) + } +} + +impl RoundRobin for RoundRobinPool { + /// Selects the next resource from the pool in a round-robin order. + /// + /// Safely increments the internal index to cycle through resources, ensuring each is selected + /// sequentially. The method ensures thread-safety and atomicity in its operations, suitable for + /// concurrent environments. + fn select(&self) -> Result { + let pool = self + .pool + .read() + .map_err(|_| Error::RwLockRead("Failed to read RR pool when selecting".to_string()))?; + let len = pool.len(); + let idx = self + .idx + .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |x| { + Some((x + 1) % len) + }) + .expect("Unable to update index for round-robin selection."); + + Ok(pool[idx].clone()) + } + + /// Accesses all resources pooled, potentially for inspection or bulk operations. + /// + /// Offers direct access to the pool's underlying resources, enabling operations that require knowledge + /// or manipulation of the entire collection of resources. + fn all(&self, statement: fn(&T) -> bool) -> Result { + let pool = self.pool.read().map_err(|_| { + Error::RwLockRead( + "Failed to read RR pool when fetching all contained element".to_string(), + ) + })?; + Ok(pool.iter().all(statement)) + } +} + +/// A trait for pools capable of asynchronously sending messages through their resources. +/// +/// Extends `RoundRobin` with functionality for asynchronous message transmission, leveraging the pooled +/// resources for communication. It's adaptable to various messaging patterns and data types, specified +/// by the generic `Message` associated type. +#[cfg_attr(target_family = "wasm", async_trait(?Send))] +#[cfg_attr(not(target_family = "wasm"), async_trait)] +pub trait MessageSenderPool: RoundRobin { + /// The type of messages that can be sent through the pool. + /// + /// This associated type specifies the format and structure of messages suitable for transmission + /// using the pool's resources. By defining this as a generic type, the trait allows for implementation + /// with a wide variety of message types, making the pool versatile and adaptable to different + /// communication needs and protocols. + type Message; + /// Asynchronously sends a message using one of the resources in the pool. + /// + /// A generic method accommodating different message types, facilitating their transmission + /// through the pool's resources selected in a round-robin manner. It underscores the pool's + /// versatility in handling diverse communication scenarios. + async fn send(&self, msg: Self::Message) -> Result<()>; +} + +/// A trait for assessing the readiness of all resources in a pool. +/// +/// Enhances `RoundRobin` with the ability to verify the operational readiness of pooled resources. +/// It caters to use cases requiring assurance that all resources are prepared for task execution +/// or data handling before proceeding with operations. +pub trait StatusPool: RoundRobin { + /// Evaluates the readiness of all pooled resources. + /// + /// Determines whether every resource in the pool is ready for operations, facilitating decision-making + /// processes in resource management and task allocation. + fn all_ready(&self) -> Result; +} + +#[cfg(test)] +pub mod tests { + use super::*; + + #[test] + fn test_rr_pool() { + let pool = RoundRobinPool::::from_vec(vec![1, 2, 3, 4]); + assert_eq!(pool.select().unwrap(), 1); + assert_eq!(pool.select().unwrap(), 2); + assert_eq!(pool.select().unwrap(), 3); + assert_eq!(pool.select().unwrap(), 4); + assert_eq!(pool.select().unwrap(), 1); + assert_eq!(pool.select().unwrap(), 2); + assert_eq!(pool.select().unwrap(), 3); + } +} diff --git a/crates/transport/src/error.rs b/crates/transport/src/error.rs index 23cbe3e83..fcead6526 100644 --- a/crates/transport/src/error.rs +++ b/crates/transport/src/error.rs @@ -47,6 +47,12 @@ pub enum Error { #[error("Connection {0} is released")] ConnectionReleased(String), + + #[error("Rwlock try write failed: {0}")] + RwLockWrite(String), + + #[error("Rwlock try read failed: {0}")] + RwLockRead(String), } #[cfg(feature = "web-sys-webrtc")]