diff --git a/bridge/Cargo.lock b/bridge/Cargo.lock index 0c9bfcbf3..537de404f 100644 --- a/bridge/Cargo.lock +++ b/bridge/Cargo.lock @@ -682,7 +682,7 @@ dependencies = [ "http 0.2.12", "http-body 0.4.6", "hyper 0.14.29", - "hyper-rustls", + "hyper-rustls 0.24.2", "once_cell", "pin-project-lite", "pin-utils", @@ -2170,7 +2170,26 @@ dependencies = [ "rustls 0.21.12", "rustls-native-certs 0.6.3", "tokio", - "tokio-rustls", + "tokio-rustls 0.24.1", +] + +[[package]] +name = "hyper-rustls" +version = "0.26.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a0bea761b46ae2b24eb4aef630d8d1c398157b6fc29e6350ecf090a0b70c952c" +dependencies = [ + "futures-util", + "http 1.1.0", + "hyper 1.3.1", + "hyper-util", + "log", + "rustls 0.22.4", + "rustls-native-certs 0.7.0", + "rustls-pki-types", + "tokio", + "tokio-rustls 0.25.0", + "tower-service", ] [[package]] @@ -2198,6 +2217,22 @@ dependencies = [ "tokio-native-tls", ] +[[package]] +name = "hyper-tls" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "70206fc6890eaca9fde8a0bf71caa2ddfc9fe045ac9e5c70df101a7dbde866e0" +dependencies = [ + "bytes", + "http-body-util", + "hyper 1.3.1", + "hyper-util", + "native-tls", + "tokio", + "tokio-native-tls", + "tower-service", +] + [[package]] name = "hyper-util" version = "0.1.5" @@ -2205,12 +2240,17 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7b875924a60b96e5d7b9ae7b066540b1dd1cbd90d1828f54c92e02a283351c56" dependencies = [ "bytes", + "futures-channel", "futures-util", "http 1.1.0", "http-body 1.0.0", "hyper 1.3.1", "pin-project-lite", + "socket2 0.5.7", "tokio", + "tower", + "tower-service", + "tracing", ] [[package]] @@ -2514,16 +2554,6 @@ version = "0.3.17" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6877bb514081ee2a7ff5ef9de3281f14a4dd4bceac4c09388074a6b5df8a139a" -[[package]] -name = "mime_guess" -version = "2.0.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4192263c238a5f0d0c6bfd21f336a313a4ce1c450542449ca191bb657b4642ef" -dependencies = [ - "mime", - "unicase", -] - [[package]] name = "minimal-lexical" version = "0.2.1" @@ -3390,12 +3420,11 @@ dependencies = [ "http 0.2.12", "http-body 0.4.6", "hyper 0.14.29", - "hyper-tls", + "hyper-tls 0.5.0", "ipnet", "js-sys", "log", "mime", - "mime_guess", "native-tls", "once_cell", "percent-encoding", @@ -3515,6 +3544,20 @@ dependencies = [ "sct", ] +[[package]] +name = "rustls" +version = "0.22.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bf4ef73721ac7bcd79b2b315da7779d8fc09718c6b3d2d1b2d94850eb8c18432" +dependencies = [ + "log", + "ring", + "rustls-pki-types", + "rustls-webpki 0.102.4", + "subtle", + "zeroize", +] + [[package]] name = "rustls" version = "0.23.9" @@ -4082,20 +4125,26 @@ checksum = "81cdd64d312baedb58e21336b31bc043b77e01cc99033ce76ef539f78e965ebc" [[package]] name = "svix" -version = "1.17.0" +version = "1.24.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "47ca1e53956ea8fa121372664b70bd97ba79a72d58f8da61443b57f8e208d5f9" +checksum = "b02d3be1b7856ceb8ccf13506b48b9e5f576739636be9ca9950a2bd56954c65f" dependencies = [ "base64 0.13.1", "hmac-sha256", "http 0.2.12", - "reqwest", + "http 1.1.0", + "http-body-util", + "hyper 1.3.1", + "hyper-rustls 0.26.0", + "hyper-tls 0.6.0", + "hyper-util", "serde", "serde_derive", "serde_json", "serde_repr", "thiserror", "time", + "tokio", "url", ] @@ -4629,6 +4678,17 @@ dependencies = [ "tokio", ] +[[package]] +name = "tokio-rustls" +version = "0.25.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "775e0c0f0adb3a2f22a00c4745d728b479985fc15ee7ca6a2608388c5569860f" +dependencies = [ + "rustls 0.22.4", + "rustls-pki-types", + "tokio", +] + [[package]] name = "tokio-stream" version = "0.1.15" @@ -4693,7 +4753,7 @@ dependencies = [ "rustls 0.21.12", "rustls-pemfile 1.0.4", "tokio", - "tokio-rustls", + "tokio-rustls 0.24.1", "tokio-stream", "tower", "tower-layer", @@ -4882,15 +4942,6 @@ version = "1.17.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "42ff0bf0c66b8238c6f3b578df37d0b7848e55df8577b3f74f92a69acceeb825" -[[package]] -name = "unicase" -version = "2.7.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f7d2d4dafb69621809a81864c9c1b864479e1235c0dd4e199924b9742439ed89" -dependencies = [ - "version_check", -] - [[package]] name = "unicode-bidi" version = "0.3.15" diff --git a/bridge/svix-bridge-types/Cargo.toml b/bridge/svix-bridge-types/Cargo.toml index 5ef963296..c9277514e 100644 --- a/bridge/svix-bridge-types/Cargo.toml +++ b/bridge/svix-bridge-types/Cargo.toml @@ -9,4 +9,4 @@ async-trait = "0.1" tokio.workspace = true serde.workspace = true serde_json.workspace = true -svix = "1.17.0" +svix = { version = "1.24.0", features = ["svix_beta"] } diff --git a/bridge/svix-bridge-types/src/lib.rs b/bridge/svix-bridge-types/src/lib.rs index d20e9ee91..2b74a4a2b 100644 --- a/bridge/svix-bridge-types/src/lib.rs +++ b/bridge/svix-bridge-types/src/lib.rs @@ -1,3 +1,5 @@ +use std::time::Duration; + pub use async_trait::async_trait; use serde::{Deserialize, Serialize}; pub use svix; @@ -140,6 +142,13 @@ pub trait SenderInput: Send { async fn run(&self); } +#[async_trait] +pub trait PollerInput: Send { + fn name(&self) -> &str; + fn set_transformer(&mut self, _tx: Option) {} + async fn run(&self); +} + pub type BoxError = Box; /// Represents something we can hand a webhook payload to. @@ -186,16 +195,27 @@ impl ReceiverInputOpts { } // N.b. the codegen types we get from openapi don't impl Deserialize so we need our own version. -#[derive(Debug, Default, Deserialize)] +#[derive(Clone, Debug, Default, Deserialize)] pub struct SvixOptions { #[serde(default)] pub debug: bool, pub server_url: Option, + pub timeout_secs: Option, } impl From for _SvixOptions { - fn from(SvixOptions { debug, server_url }: SvixOptions) -> Self { - _SvixOptions { debug, server_url } + fn from( + SvixOptions { + debug, + server_url, + timeout_secs, + }: SvixOptions, + ) -> Self { + _SvixOptions { + debug, + server_url, + timeout: timeout_secs.map(Duration::from_secs), + } } } diff --git a/bridge/svix-bridge/src/config/mod.rs b/bridge/svix-bridge/src/config/mod.rs index f3bde68cd..cf3dc8a91 100644 --- a/bridge/svix-bridge/src/config/mod.rs +++ b/bridge/svix-bridge/src/config/mod.rs @@ -15,10 +15,18 @@ use shellexpand::LookupError; use svix_bridge_plugin_kafka::{KafkaInputOpts, KafkaOutputOpts}; use svix_bridge_plugin_queue::config::{QueueInputOpts, QueueOutputOpts}; use svix_bridge_types::{ - ReceiverInputOpts, ReceiverOutput, SenderInput, SenderOutputOpts, TransformationConfig, + svix::api::Svix, ReceiverInputOpts, ReceiverOutput, SenderInput, SenderOutputOpts, SvixOptions, + TransformationConfig, }; use tracing::Level; +#[derive(Deserialize)] +#[serde(untagged)] +pub enum EitherReceiver { + Webhook(WebhookReceiverConfig), + Poller(PollerReceiverConfig), +} + #[derive(Deserialize)] #[serde(deny_unknown_fields)] pub struct Config { @@ -27,7 +35,7 @@ pub struct Config { pub senders: Vec, /// Config for receiving webhooks and forwarding them to plugins. #[serde(default)] - pub receivers: Vec, + pub receivers: Vec, /// The log level to run the service with. Supported: info, debug, trace #[serde(default)] pub log_level: LogLevel, @@ -81,18 +89,25 @@ impl Config { } } - for rc in &cfg.receivers { - if let Some(tc) = &rc.transformation { - crate::runtime::validate_script(tc.source().as_str()).map_err(|e| { - Error::new( - ErrorKind::Other, - format!( - "failed to parse transformation for receiver `{}`: {:?}", - &rc.name, e, - ), - ) - })?; - } + for (name, tc) in cfg.receivers.iter().filter_map(|either| match either { + EitherReceiver::Webhook(receiver) => receiver + .transformation + .as_ref() + .map(|tc| (&receiver.name, tc)), + EitherReceiver::Poller(receiver) => receiver + .transformation + .as_ref() + .map(|tc| (&receiver.name, tc)), + }) { + crate::runtime::validate_script(tc.source().as_str()).map_err(|e| { + Error::new( + ErrorKind::Other, + format!( + "failed to parse transformation for receiver `{}`: {:?}", + name, e, + ), + ) + })?; } Ok(cfg) @@ -238,5 +253,61 @@ impl WebhookReceiverConfig { } } +#[derive(Clone, Deserialize)] +#[serde(tag = "type", rename_all = "kebab-case")] +pub enum PollerInputOpts { + SvixEvents { + app_id: String, + subscription_id: String, + svix_token: String, + #[serde(default)] + svix_options: Option, + }, +} + +impl PollerInputOpts { + pub fn svix_client(&self) -> Option { + match self { + PollerInputOpts::SvixEvents { + svix_token, + svix_options, + .. + } => Some(Svix::new( + svix_token.clone(), + svix_options.clone().map(Into::into), + )), + } + } +} + +/// Config for fetching from HTTP endpoints and forwarding them to plugins. +#[derive(Deserialize)] +pub struct PollerReceiverConfig { + pub name: String, + pub input: PollerInputOpts, + // FIXME: add a configurable polling schedule or interval + #[serde(default)] + pub transformation: Option, + pub output: ReceiverOutputOpts, +} + +impl PollerReceiverConfig { + // FIXME: duplicate from WebhookReceiverConfig. Extract/refactor as TryFrom ReceiverOutputOpts? + pub async fn into_receiver_output(self) -> anyhow::Result> { + match self.output { + ReceiverOutputOpts::Kafka(opts) => { + svix_bridge_plugin_kafka::into_receiver_output(self.name, opts).map_err(Into::into) + } + ReceiverOutputOpts::Queue(x) => svix_bridge_plugin_queue::into_receiver_output( + self.name.clone(), + x, + self.transformation.as_ref(), + ) + .await + .map_err(Into::into), + } + } +} + #[cfg(test)] mod tests; diff --git a/bridge/svix-bridge/src/config/tests.rs b/bridge/svix-bridge/src/config/tests.rs index dc412223e..515400028 100644 --- a/bridge/svix-bridge/src/config/tests.rs +++ b/bridge/svix-bridge/src/config/tests.rs @@ -609,3 +609,23 @@ fn test_var_substitution_json_values_ok() { // Should not be an error let _cfg = Config::from_src(src, Some(&vars)).unwrap(); } + +#[test] +fn test_pollers_parse_ok() { + let src = r#" + receivers: + - name: "forward-to-rabbitmq-example" + input: + type: "svix-events" + app_id: "xxx" + subscription_id: "xxx" + svix_token: "xxx" + output: + type: "rabbitmq" + uri: "amqp://guest:guest@localhost:5672/%2f" + exchange: "" + routing_key: "example" + "#; + // Should not be an error + let _cfg = Config::from_src(src, None).unwrap(); +} diff --git a/bridge/svix-bridge/src/main.rs b/bridge/svix-bridge/src/main.rs index 77cffaf37..6e3889766 100644 --- a/bridge/svix-bridge/src/main.rs +++ b/bridge/svix-bridge/src/main.rs @@ -11,7 +11,7 @@ use opentelemetry_sdk::{ metrics::{data::Temporality, reader::TemporalitySelector, InstrumentKind, SdkMeterProvider}, runtime::Tokio, }; -use svix_bridge_types::{SenderInput, TransformerJob}; +use svix_bridge_types::{PollerInput, SenderInput, TransformerJob}; use svix_ksuid::{KsuidLike as _, KsuidMs}; #[cfg(all(not(target_env = "msvc"), feature = "jemalloc"))] use tikv_jemallocator::Jemalloc; @@ -28,6 +28,7 @@ mod webhook_receiver; use crate::{ allocator::{get_allocator_stat_mibs, get_allocator_stats}, + config::{EitherReceiver, PollerReceiverConfig, WebhookReceiverConfig}, metrics::CommonMetrics, }; @@ -193,6 +194,40 @@ async fn supervise_senders(inputs: Vec>) -> Result<()> { Ok(()) } +/// Pollers make HTTP requests in a loop and forward what they fetch to their `ReceiverOutput` +async fn supervise_pollers(inputs: Vec>) -> std::io::Result<()> { + let mut set = tokio::task::JoinSet::new(); + for input in inputs { + set.spawn(async move { + // FIXME: needs much better signaling for termination + loop { + // If this future returns, the consumer terminated unexpectedly. + input.run().await; + + tracing::warn!("poller input {} unexpectedly terminated", input.name()); + tokio::time::sleep(Duration::from_secs(1)).await; + } + }); + } + + // FIXME: add signal handling to trigger a (intentional) graceful shutdown. + + // FIXME: when a plugin exits unexpectedly, what do? + // Most consumers are probably stateful/brittle and may disconnect from time to time. + // Ideally none of these tasks would ever return Ok or Err. They'd run forever. + // Having the tasks themselves try to recover means if we see a task finish here, something + // must be really wrong, so maybe we trigger a shutdown of the rest when one stops here. + while let Some(_res) = set.join_next().await { + // In order for plugins to coordinate a shutdown, maybe they could: + // - have a shutdown method and handle their own internal signalling, or maybe + // - take a oneshot channel as an arg to `run()` + // Basically we need something that formalizes the shutdown flow in a cross-crate + // friendly way. + todo!("graceful shutdown"); + } + Ok(()) +} + #[derive(Parser)] pub struct Args { #[arg(long, env = "SVIX_BRIDGE_CFG_FILE", help = "Path to the config file.")] @@ -326,9 +361,34 @@ async fn main() -> Result<()> { if cfg.receivers.is_empty() { tracing::warn!("No receivers configured.") } - let receivers_fut = webhook_receiver::run(cfg.http_listen_address, cfg.receivers, xform_tx); + let (webhook_receivers, poller_receivers) = cfg.receivers.into_iter().fold( + ( + Vec::::new(), + Vec::::new(), + ), + |mut acc, either| match either { + EitherReceiver::Webhook(x) => { + acc.0.push(x); + acc + } + EitherReceiver::Poller(y) => { + acc.1.push(y); + acc + } + }, + ); + + let webhook_receivers_fut = + webhook_receiver::run(cfg.http_listen_address, webhook_receivers, xform_tx.clone()); + + let mut pollers: Vec> = Vec::with_capacity(poller_receivers.len()); + for poller_cfg in poller_receivers { + pollers.push(poller_cfg.into_poller_input(xform_tx.clone()).await?); + } + + let poller_receivers_fut = supervise_pollers(pollers); - match tokio::try_join!(senders_fut, receivers_fut) { + match tokio::try_join!(senders_fut, webhook_receivers_fut, poller_receivers_fut) { Ok(_) => tracing::error!("unexpectedly exiting"), Err(e) => tracing::error!("unexpectedly exiting: {}", e), } diff --git a/bridge/svix-bridge/src/webhook_receiver/mod.rs b/bridge/svix-bridge/src/webhook_receiver/mod.rs index c93dae0c3..b7f8dcc8a 100644 --- a/bridge/svix-bridge/src/webhook_receiver/mod.rs +++ b/bridge/svix-bridge/src/webhook_receiver/mod.rs @@ -1,4 +1,4 @@ -use std::net::SocketAddr; +use std::{net::SocketAddr, sync::Arc, time::Duration}; use axum::{ body::Body, @@ -7,13 +7,18 @@ use axum::{ Router, }; use svix_bridge_types::{ - ForwardRequest, TransformationConfig, TransformerInput, TransformerInputFormat, TransformerJob, - TransformerOutput, TransformerTx, + async_trait, + svix::api::{Svix, V1MessageStreamParams}, + ForwardRequest, PollerInput, ReceiverOutput, TransformationConfig, TransformerInput, + TransformerInputFormat, TransformerJob, TransformerOutput, TransformerTx, }; use tracing::instrument; use types::{IntegrationId, IntegrationState, InternalState, SerializableRequest, Unvalidated}; -use crate::{config::WebhookReceiverConfig, webhook_receiver::types::SerializablePayload}; +use crate::{ + config::{PollerInputOpts, PollerReceiverConfig, WebhookReceiverConfig}, + webhook_receiver::types::SerializablePayload, +}; mod config; mod types; @@ -74,7 +79,7 @@ async fn route( Ok(req) => { let payload = match parse_payload( req.payload(), - transformation, + transformation.as_ref(), transformer_tx.clone(), ) .await @@ -82,13 +87,9 @@ async fn route( Err(e) => return e, Ok(p) => p, }; - tracing::debug!("forwarding request"); - match output.handle(payload).await { - Ok(_) => http::StatusCode::NO_CONTENT, - Err(e) => { - tracing::error!("Error forwarding request: {}", e); - http::StatusCode::INTERNAL_SERVER_ERROR - } + match handle(payload, output.clone()).await { + Ok(value) => value, + Err(value) => return value, } } Err(code) => { @@ -102,6 +103,21 @@ async fn route( } } +// FIXME: Really odd return type - artifact of being extracted from the HTTP server +async fn handle( + payload: ForwardRequest, + output: Arc>, +) -> Result { + tracing::debug!("forwarding request"); + Ok(match output.handle(payload).await { + Ok(_) => http::StatusCode::NO_CONTENT, + Err(e) => { + tracing::error!("Error forwarding request: {}", e); + http::StatusCode::INTERNAL_SERVER_ERROR + } + }) +} + /// Figures out how to build a JSON object from the payload, optionally running it through a /// transformation. /// @@ -114,7 +130,7 @@ async fn route( /// For either case, we expect the value produced to match the schema of a [`ForwardRequest`]. async fn parse_payload( payload: &SerializablePayload, - transformation: &Option, + transformation: Option<&TransformationConfig>, transformer_tx: TransformerTx, ) -> Result { match transformation { @@ -181,5 +197,161 @@ async fn transform( } } +struct SvixEventsPoller { + name: String, + input_opts: PollerInputOpts, + transformation: Option, + transformer_tx: Option, + svix_client: Svix, + output: Arc>, +} + +#[async_trait] +impl PollerInput for SvixEventsPoller { + fn name(&self) -> &str { + &self.name + } + + fn set_transformer(&mut self, tx: Option) { + self.transformer_tx = tx; + } + + async fn run(&self) { + run_inner(self).await + } +} + +impl PollerReceiverConfig { + pub async fn into_poller_input( + self, + transformer_tx: TransformerTx, + ) -> std::io::Result> { + let svix_client = self + .input + .svix_client() + .expect("only one poller type; svix client required"); + let name = self.name.clone(); + let input_opts = self.input.clone(); + let transformation = self.transformation.clone(); + let output = Arc::new( + self.into_receiver_output() + .await + .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?, + ); + Ok(Box::new(SvixEventsPoller { + name, + input_opts, + transformation, + transformer_tx: Some(transformer_tx.clone()), + svix_client, + output, + })) + } +} + +async fn run_inner(poller: &SvixEventsPoller) -> ! { + const MIN_SLEEP: Duration = Duration::from_millis(10); + const MAX_SLEEP: Duration = Duration::from_secs(300); + const NO_SLEEP: Duration = Duration::ZERO; + let mut sleep_time = NO_SLEEP; + + let PollerInputOpts::SvixEvents { + app_id, + subscription_id, + .. + } = &poller.input_opts; + + let mut iterator = None; + + loop { + tracing::trace!(app_id, subscription_id, "polling `/events`"); + match poller + .svix_client + .message() + .events(V1MessageStreamParams { + // FIXME: expose more params as poller input cfg + app_id: app_id.clone(), + limit: None, + iterator: iterator.clone(), + event_types: None, + channels: None, + after: None, + }) + .await + { + Ok(resp) => { + tracing::trace!(count = resp.data.len(), "got messages"); + for msg in resp.data.into_iter() { + let payload = match parse_payload( + &SerializablePayload::Standard( + // FIXME: for svix-event pollers we already know the payload is json so + // there's some wasted ser/deser/ser cycles. + serde_json::to_vec(&msg) + .expect("just fetched as json, must be serializable"), + ), + poller.transformation.as_ref(), + poller + .transformer_tx + .clone() + .expect("transformer tx is required"), + ) + .await + { + Err(status) => { + tracing::error!( + status = status.as_u16(), + "error while parsing polled message" + ); + // BACKOFF + sleep_time = (sleep_time * 2).clamp(MIN_SLEEP, MAX_SLEEP); + // Retry the current iterator. + continue; + } + Ok(p) => p, + }; + if let Err(status) = handle(payload, poller.output.clone()).await { + // FIXME: need to refactor handle to not give http status codes so we can report what happened here. + tracing::error!( + status = status.as_u16(), + "error while handling polled message" + ); + // BACKOFF + sleep_time = (sleep_time * 2).clamp(MIN_SLEEP, MAX_SLEEP); + } + } + tracing::trace!( + iterator =? &iterator, + next_iterator =? &resp.iterator, + "batch handled, updating local iterator" + ); + // Update the iterator _after we've handled all the messages in the batch_. + iterator = Some(resp.iterator.clone()); + // If the iterator is "done" we can backoff to wait for new messages to arrive. + sleep_time = if resp.done { + // BACKOFF + (sleep_time * 2).clamp(MIN_SLEEP, MAX_SLEEP) + } else { + NO_SLEEP + }; + } + + Err(err) => { + tracing::trace!( + error =? err, + iterator =? &iterator, + "request failed, retrying current iterator" + ); + // BACKOFF + sleep_time = (sleep_time * 2).clamp(MIN_SLEEP, MAX_SLEEP); + } + } + + if !sleep_time.is_zero() { + tracing::trace!(sleep_time=?sleep_time, "sleeping"); + tokio::time::sleep(sleep_time).await; + } + } +} + #[cfg(test)] mod tests;