Skip to content

Commit

Permalink
sort of complete flow, top to bottom
Browse files Browse the repository at this point in the history
buncha refactor - theoretically we have the complete flow but need to
make sure the poller stays alive. Right now the process is terminating
because the queue isn't up, which idk. I guess the queue should be up?

I'll setup a queue and see how it handles when the events endpoint
doesn't respond nice...
  • Loading branch information
svix-onelson committed Jul 9, 2024
1 parent 1a7b073 commit 4cafbbc
Show file tree
Hide file tree
Showing 3 changed files with 115 additions and 33 deletions.
2 changes: 1 addition & 1 deletion bridge/svix-bridge-types/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,4 @@ async-trait = "0.1"
tokio.workspace = true
serde.workspace = true
serde_json.workspace = true
svix = "1.24.0"
svix = { version = "1.24.0", features = ["svix_beta"] }
2 changes: 1 addition & 1 deletion bridge/svix-bridge/src/config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,7 @@ impl WebhookReceiverConfig {
}
}

#[derive(Deserialize)]
#[derive(Clone, Deserialize)]
#[serde(tag = "type", rename_all = "kebab-case")]
pub enum PollerInputOpts {
SvixEvents {
Expand Down
144 changes: 113 additions & 31 deletions bridge/svix-bridge/src/webhook_receiver/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,17 @@ use axum::{
Router,
};
use svix_bridge_types::{
async_trait, svix::api::Svix, ForwardRequest, PollerInput, ReceiverOutput,
TransformationConfig, TransformerInput, TransformerInputFormat, TransformerJob,
TransformerOutput, TransformerTx,
async_trait,
svix::api::{Svix, V1MessageStreamParams},
ForwardRequest, PollerInput, ReceiverOutput, TransformationConfig, TransformerInput,
TransformerInputFormat, TransformerJob, TransformerOutput, TransformerTx,
};
use tracing::instrument;
use types::{IntegrationId, IntegrationState, InternalState, SerializableRequest, Unvalidated};

use crate::{
config::{PollerInputOpts, PollerReceiverConfig, WebhookReceiverConfig},
webhook_receiver::types::{SerializablePayload, Validated},
webhook_receiver::types::SerializablePayload,
};

mod config;
Expand Down Expand Up @@ -75,10 +76,22 @@ async fn route(
}) = routes.get(&integration_id)
{
match req.validate(verifier).await {
Ok(req) => match handle_request(req, transformer_tx, output, transformation).await {
Ok(value) => value,
Err(value) => return value,
},
Ok(req) => {
let payload = match parse_payload(
req.payload(),
transformation.as_ref(),
transformer_tx.clone(),
)
.await
{
Err(e) => return e,
Ok(p) => p,
};
match handle(payload, output.clone()).await {
Ok(value) => value,
Err(value) => return value,
}
}
Err(code) => {
tracing::warn!("validation failed: {code}");
code
Expand All @@ -91,17 +104,10 @@ async fn route(
}

// FIXME: Really odd return type - artifact of being extracted from the HTTP server
async fn handle_request(
req: SerializableRequest<Validated>,
transformer_tx: TransformerTx,
output: &Arc<Box<dyn ReceiverOutput>>,
transformation: &Option<TransformationConfig>,
async fn handle(
payload: ForwardRequest,
output: Arc<Box<dyn ReceiverOutput>>,
) -> Result<http::StatusCode, http::StatusCode> {
// FIXME: for svix-event pollers we already know the payload is json so there's some redundant ser/deser cycles being lost
let payload = match parse_payload(req.payload(), transformation, transformer_tx.clone()).await {
Err(e) => return Err(e),
Ok(p) => p,
};
tracing::debug!("forwarding request");
Ok(match output.handle(payload).await {
Ok(_) => http::StatusCode::NO_CONTENT,
Expand All @@ -124,7 +130,7 @@ async fn handle_request(
/// For either case, we expect the value produced to match the schema of a [`ForwardRequest`].
async fn parse_payload(
payload: &SerializablePayload,
transformation: &Option<TransformationConfig>,
transformation: Option<&TransformationConfig>,
transformer_tx: TransformerTx,
) -> Result<ForwardRequest, http::StatusCode> {
match transformation {
Expand Down Expand Up @@ -197,6 +203,7 @@ struct SvixEventsPoller {
transformation: Option<TransformationConfig>,
transformer_tx: Option<TransformerTx>,
svix_client: Svix,
output: Arc<Box<dyn ReceiverOutput>>,
}

#[async_trait]
Expand All @@ -215,39 +222,114 @@ impl PollerInput for SvixEventsPoller {
}

impl PollerReceiverConfig {
fn into_poller_input(self, transformer_tx: TransformerTx) -> Box<dyn PollerInput> {
async fn into_poller_input(
self,
transformer_tx: TransformerTx,
) -> std::io::Result<Box<dyn PollerInput>> {
let svix_client = self
.input
.svix_client()
.expect("only one poller type; svix client required");
Box::new(SvixEventsPoller {
name: self.name,
input_opts: self.input,
transformation: self.transformation,
let name = self.name.clone();
let input_opts = self.input.clone();
let transformation = self.transformation.clone();
let output = Arc::new(
self.into_receiver_output()
.await
.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?,
);
Ok(Box::new(SvixEventsPoller {
name,
input_opts,
transformation,
transformer_tx: Some(transformer_tx.clone()),
svix_client,
})
output,
}))
}
}

async fn run_inner(poller: &SvixEventsPoller) -> ! {
// How long to wait if the request comes back non-2xx.
const ERROR_SLEEP: Duration = Duration::from_secs(3);
// FIXME: exponential back off
let mut interval = tokio::time::interval(Duration::from_secs(3));
let PollerInputOpts::SvixEvents {
app_id,
subscription_id,
..
} = &poller.input_opts;
let mut iterator = None;
loop {
interval.tick().await;
tracing::debug!("want to poll /events");
tracing::trace!(app_id, subscription_id, "poll /events");
match poller
.svix_client
.message()
.events(V1MessageStreamParams {
// FIXME: expose more params as poller input cfg
app_id: app_id.clone(),
limit: None,
iterator: iterator.clone(),
event_types: None,
channels: None,
after: None,
})
.await
{
Ok(resp) => {
for msg in resp.data.into_iter() {
let payload = match parse_payload(
&SerializablePayload::Standard(
// FIXME: for svix-event pollers we already know the payload is json so there's some redundant ser/deser cycles being lost
serde_json::to_vec(&msg)
.expect("just fetched as json, must be serializable"),
),
poller.transformation.as_ref(),
poller
.transformer_tx
.clone()
.expect("transformer tx is required"),
)
.await
{
Err(status) => {
tracing::error!(
status = status.as_u16(),
"error while parsing polled message"
);
// Retry the current iterator
continue;
}
Ok(p) => p,
};
if let Err(status) = handle(payload, poller.output.clone()).await {
// FIXME: need to refactor handle to not give http status codes so we can report what happened here.
tracing::error!(
status = status.as_u16(),
"error while handling polled message"
);
}
}
// Update the iterator _after we've handled all the messages in the batch_.
iterator = Some(resp.iterator.clone());
}
Err(_) => {
tokio::time::sleep(ERROR_SLEEP).await;
}
}
}
}

/// Pollers make HTTP requests in a loop and forward what they fetch to their `ReceiverOutput`
pub async fn run_pollers(
pollers: Vec<PollerReceiverConfig>,
poller_cfgs: Vec<PollerReceiverConfig>,
transformer_tx: TransformerTx,
) -> std::io::Result<()> {
let pollers: Vec<Box<dyn PollerInput>> = pollers
.into_iter()
.map(|prc| prc.into_poller_input(transformer_tx.clone()))
.collect();
let mut pollers: Vec<Box<dyn PollerInput>> = Vec::with_capacity(poller_cfgs.len());
for poller_cfg in poller_cfgs {
pollers.push(poller_cfg.into_poller_input(transformer_tx.clone()).await?);
}
supervise_pollers(pollers).await
}

Expand Down

0 comments on commit 4cafbbc

Please sign in to comment.