Skip to content

Commit

Permalink
Add 'message.attempt.recovered' operational webhook
Browse files Browse the repository at this point in the history
This event is fired when a successful attempt is made after
several attemtps have already failed.
It already exists in the hosted svix.com product
  • Loading branch information
svix-aaron1011 committed Oct 2, 2024
1 parent 1cf8a7f commit e9f1386
Show file tree
Hide file tree
Showing 5 changed files with 7,968 additions and 2 deletions.
87 changes: 87 additions & 0 deletions server/openapi.json
Original file line number Diff line number Diff line change
Expand Up @@ -1747,6 +1747,69 @@
],
"type": "object"
},
"MessageAttemptRecoveredEvent": {
"description": "Sent on a successful dispatch after an earlier failure op webhook has already been sent.",
"properties": {
"data": {
"$ref": "#/components/schemas/MessageAttemptRecoveredEventData"
},
"type": {
"default": "message.attempt.recovered",
"enum": [
"message.attempt.recovered"
],
"type": "string"
}
},
"required": [
"data",
"type"
],
"type": "object"
},
"MessageAttemptRecoveredEventData": {
"description": "Sent when a message delivery has failed (all of the retry attempts have been exhausted) as a \"message.attempt.exhausted\" type or after it's failed four times as a \"message.attempt.failing\" event.",
"properties": {
"appId": {
"example": "app_1srOrx2ZWZBpBUvZwXKQmoEYga2",
"type": "string"
},
"appUid": {
"example": "unique-app-identifier",
"maxLength": 256,
"minLength": 1,
"nullable": true,
"pattern": "^[a-zA-Z0-9\\-_.]+$",
"type": "string"
},
"endpointId": {
"example": "ep_1srOrx2ZWZBpBUvZwXKQmoEYga2",
"type": "string"
},
"lastAttempt": {
"$ref": "#/components/schemas/MessageAttempetLast"
},
"msgEventId": {
"example": "unique-msg-identifier",
"maxLength": 256,
"minLength": 1,
"nullable": true,
"pattern": "^[a-zA-Z0-9\\-_.]+$",
"type": "string"
},
"msgId": {
"example": "msg_1srOrx2ZWZBpBUvZwXKQmoEYga2",
"type": "string"
}
},
"required": [
"appId",
"endpointId",
"lastAttempt",
"msgId"
],
"type": "object"
},
"MessageAttemptTriggerType": {
"description": "The reason an attempt was made:\n- Scheduled = 0\n- Manual = 1",
"enum": [
Expand Down Expand Up @@ -7742,6 +7805,30 @@
"Webhooks"
]
}
},
"MessageAttemptRecoveredEvent": {
"post": {
"description": "Sent on a successful dispatch after an earlier failure op webhook has already been sent.",
"operationId": "MessageAttemptRecoveredEvent",
"requestBody": {
"content": {
"application/json": {
"schema": {
"$ref": "#/components/schemas/MessageAttemptRecoveredEvent"
}
}
}
},
"responses": {
"2XX": {
"description": "Return any 2XX status to indicate that the data was received successfully"
}
},
"summary": "MessageAttemptRecoveredEvent",
"tags": [
"Webhooks"
]
}
}
}
}
2 changes: 2 additions & 0 deletions server/svix-server/src/core/operational_webhooks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,8 @@ pub enum OperationalWebhook {
MessageAttemptExhausted(MessageAttemptEvent),
#[serde(rename = "message.attempt.failing")]
MessageAttemptFailing(MessageAttemptEvent),
#[serde(rename = "message.attempt.recovered")]
MessageAttemptRecovered(MessageAttemptEvent),
}

pub type OperationalWebhookSender = Arc<OperationalWebhookSenderInner>;
Expand Down
14 changes: 14 additions & 0 deletions server/svix-server/src/openapi.rs
Original file line number Diff line number Diff line change
Expand Up @@ -415,6 +415,13 @@ mod webhooks {
common_: MessageAttemptEvent,
}

#[derive(JsonSchema)]
#[allow(unused)]
struct MessageAttemptRecoveredEventData {
#[serde(flatten)]
common_: MessageAttemptEvent,
}

webhook_event!(
EndpointCreatedEvent,
EndpointCreatedEventData,
Expand Down Expand Up @@ -451,6 +458,12 @@ mod webhooks {
"message.attempt.failing",
"Sent after a message has been failing for a few times.\nIt's sent on the fourth failure. It complements `message.attempt.exhausted` which is sent after the last failure."
);
webhook_event!(
MessageAttemptRecoveredEvent,
MessageAttemptRecoveredEventData,
"message.attempt.recovered",
"Sent on a successful dispatch after an earlier failure op webhook has already been sent."
);

/// Generates documentation for operational webhooks in the Redoc `x-webhooks`
/// format. For more info see https://redocly.com/docs/api-reference-docs/specification-extensions/x-webhooks/
Expand All @@ -462,6 +475,7 @@ mod webhooks {
document_webhook::<EndpointUpdatedEvent>(),
document_webhook::<MessageAttemptExhaustedEvent>(),
document_webhook::<MessageAttemptFailingEvent>(),
document_webhook::<MessageAttemptRecoveredEvent>(),
])
}
}
7,835 changes: 7,834 additions & 1 deletion server/svix-server/src/static/openapi.json

Large diffs are not rendered by default.

32 changes: 31 additions & 1 deletion server/svix-server/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -407,11 +407,19 @@ async fn make_http_call(

#[tracing::instrument(skip_all, fields(response_code, msg_dest_id = msg_dest.id.0))]
async fn handle_successful_dispatch(
WorkerContext { cache, db, .. }: &WorkerContext<'_>,
WorkerContext {
cache,
db,
op_webhook_sender,
..
}: &WorkerContext<'_>,
DispatchContext {
org_id,
endp,
app_id,
app_uid,
msg_task,
msg_uid,
..
}: DispatchContext<'_>,
SuccessfulDispatch(mut attempt): SuccessfulDispatch,
Expand All @@ -432,6 +440,28 @@ async fn handle_successful_dispatch(
tracing::Span::current().record("response_code", attempt.response_status_code);
tracing::info!("Webhook success.");

if msg_task.attempt_count as usize >= OP_WEBHOOKS_SEND_FAILING_EVENT_AFTER {
if let Err(e) = op_webhook_sender
.send_operational_webhook(
org_id,
OperationalWebhook::MessageAttemptRecovered(MessageAttemptEvent {
app_id: app_id.clone(),
app_uid: app_uid.cloned(),
endpoint_id: msg_task.endpoint_id.clone(),
msg_id: msg_task.msg_id.clone(),
msg_event_id: msg_uid.cloned(),
last_attempt: attempt.into(),
}),
)
.await
{
tracing::error!(
"Failed sending MessageAttemptRecovered Operational Webhook: {}",
e
);
}
}

Ok(())
}

Expand Down

0 comments on commit e9f1386

Please sign in to comment.