Skip to content

Commit

Permalink
Bridge: add /events poller
Browse files Browse the repository at this point in the history
I'd call this "quick and dirty" but it was actually a bit of a hefty
lift to pull together. It is however fairly dirty.

The want is to be able to pull messages from `/events` and feed them
into a receiver output.
The flow we'd see with `SenderInput` paired with `ReceiverOutput` is
what we want, but the existing typing didn't allow for this.

Refactors were needed to:
- lift out the parts from the HTTP server that run payloads through
  transformations then forward to an output.
- update config to account for a new type of receiver: a "poller".
- new traits/types for the poller to differentiate it from the standard
  webhook receiver (to ensure we don't accidentally pass one through
  and try to bind it to a URL).
- Lots of "connective tissue" in the form of converters between config
  values and concrete ones that can actually do things.

Some of the "connective tissue" exists purely to mimic bits and pieces
that existed for either the other receivers or senders (remember, this
case is odd in that it's similar to both).

Refactorings aside, the poller itself boasts an exponential backoff for
both error cases (either from `/events` or from the output) as well as
for the case where the `/events` iterator is "done."

This diff comes with a promise that we will (soon) give these additions
another look at clean up the stuff that doesn't make sense or feels
redundant.
  • Loading branch information
svix-onelson committed Jul 10, 2024
1 parent 3467877 commit dd40495
Show file tree
Hide file tree
Showing 7 changed files with 455 additions and 61 deletions.
105 changes: 78 additions & 27 deletions bridge/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion bridge/svix-bridge-types/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,4 @@ async-trait = "0.1"
tokio.workspace = true
serde.workspace = true
serde_json.workspace = true
svix = "1.17.0"
svix = { version = "1.24.0", features = ["svix_beta"] }
26 changes: 23 additions & 3 deletions bridge/svix-bridge-types/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use std::time::Duration;

pub use async_trait::async_trait;
use serde::{Deserialize, Serialize};
pub use svix;
Expand Down Expand Up @@ -140,6 +142,13 @@ pub trait SenderInput: Send {
async fn run(&self);
}

#[async_trait]
pub trait PollerInput: Send {
fn name(&self) -> &str;
fn set_transformer(&mut self, _tx: Option<TransformerTx>) {}
async fn run(&self);
}

pub type BoxError = Box<dyn std::error::Error + Send + Sync>;

/// Represents something we can hand a webhook payload to.
Expand Down Expand Up @@ -186,16 +195,27 @@ impl ReceiverInputOpts {
}

// N.b. the codegen types we get from openapi don't impl Deserialize so we need our own version.
#[derive(Debug, Default, Deserialize)]
#[derive(Clone, Debug, Default, Deserialize)]
pub struct SvixOptions {
#[serde(default)]
pub debug: bool,
pub server_url: Option<String>,
pub timeout_secs: Option<u64>,
}

impl From<SvixOptions> for _SvixOptions {
fn from(SvixOptions { debug, server_url }: SvixOptions) -> Self {
_SvixOptions { debug, server_url }
fn from(
SvixOptions {
debug,
server_url,
timeout_secs,
}: SvixOptions,
) -> Self {
_SvixOptions {
debug,
server_url,
timeout: timeout_secs.map(Duration::from_secs),
}
}
}

Expand Down
99 changes: 85 additions & 14 deletions bridge/svix-bridge/src/config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,18 @@ use shellexpand::LookupError;
use svix_bridge_plugin_kafka::{KafkaInputOpts, KafkaOutputOpts};
use svix_bridge_plugin_queue::config::{QueueInputOpts, QueueOutputOpts};
use svix_bridge_types::{
ReceiverInputOpts, ReceiverOutput, SenderInput, SenderOutputOpts, TransformationConfig,
svix::api::Svix, ReceiverInputOpts, ReceiverOutput, SenderInput, SenderOutputOpts, SvixOptions,
TransformationConfig,
};
use tracing::Level;

#[derive(Deserialize)]
#[serde(untagged)]
pub enum EitherReceiver {
Webhook(WebhookReceiverConfig),
Poller(PollerReceiverConfig),
}

#[derive(Deserialize)]
#[serde(deny_unknown_fields)]
pub struct Config {
Expand All @@ -27,7 +35,7 @@ pub struct Config {
pub senders: Vec<WebhookSenderConfig>,
/// Config for receiving webhooks and forwarding them to plugins.
#[serde(default)]
pub receivers: Vec<WebhookReceiverConfig>,
pub receivers: Vec<EitherReceiver>,
/// The log level to run the service with. Supported: info, debug, trace
#[serde(default)]
pub log_level: LogLevel,
Expand Down Expand Up @@ -81,18 +89,25 @@ impl Config {
}
}

for rc in &cfg.receivers {
if let Some(tc) = &rc.transformation {
crate::runtime::validate_script(tc.source().as_str()).map_err(|e| {
Error::new(
ErrorKind::Other,
format!(
"failed to parse transformation for receiver `{}`: {:?}",
&rc.name, e,
),
)
})?;
}
for (name, tc) in cfg.receivers.iter().filter_map(|either| match either {
EitherReceiver::Webhook(receiver) => receiver
.transformation
.as_ref()
.map(|tc| (&receiver.name, tc)),
EitherReceiver::Poller(receiver) => receiver
.transformation
.as_ref()
.map(|tc| (&receiver.name, tc)),
}) {
crate::runtime::validate_script(tc.source().as_str()).map_err(|e| {
Error::new(
ErrorKind::Other,
format!(
"failed to parse transformation for receiver `{}`: {:?}",
name, e,
),
)
})?;
}

Ok(cfg)
Expand Down Expand Up @@ -238,5 +253,61 @@ impl WebhookReceiverConfig {
}
}

#[derive(Clone, Deserialize)]
#[serde(tag = "type", rename_all = "kebab-case")]
pub enum PollerInputOpts {
SvixEvents {
app_id: String,
subscription_id: String,
svix_token: String,
#[serde(default)]
svix_options: Option<SvixOptions>,
},
}

impl PollerInputOpts {
pub fn svix_client(&self) -> Option<Svix> {
match self {
PollerInputOpts::SvixEvents {
svix_token,
svix_options,
..
} => Some(Svix::new(
svix_token.clone(),
svix_options.clone().map(Into::into),
)),
}
}
}

/// Config for fetching from HTTP endpoints and forwarding them to plugins.
#[derive(Deserialize)]
pub struct PollerReceiverConfig {
pub name: String,
pub input: PollerInputOpts,
// FIXME: add a configurable polling schedule or interval
#[serde(default)]
pub transformation: Option<TransformationConfig>,
pub output: ReceiverOutputOpts,
}

impl PollerReceiverConfig {
// FIXME: duplicate from WebhookReceiverConfig. Extract/refactor as TryFrom ReceiverOutputOpts?
pub async fn into_receiver_output(self) -> anyhow::Result<Box<dyn ReceiverOutput>> {
match self.output {
ReceiverOutputOpts::Kafka(opts) => {
svix_bridge_plugin_kafka::into_receiver_output(self.name, opts).map_err(Into::into)
}
ReceiverOutputOpts::Queue(x) => svix_bridge_plugin_queue::into_receiver_output(
self.name.clone(),
x,
self.transformation.as_ref(),
)
.await
.map_err(Into::into),
}
}
}

#[cfg(test)]
mod tests;
Loading

0 comments on commit dd40495

Please sign in to comment.