From cddfd3462a9c28af9cffc326d1673c13019f3ba6 Mon Sep 17 00:00:00 2001 From: Aaron Hill Date: Tue, 1 Oct 2024 18:42:46 -0400 Subject: [PATCH] Add 'message.attempt.recovered' operational webhook This event is fired when a successful attempt is made after several attemtps have already failed. It already exists in the hosted svix.com product --- server/openapi.json | 87 +++++++++++++++++++ .../src/core/operational_webhooks.rs | 2 + server/svix-server/src/openapi.rs | 14 +++ server/svix-server/src/worker.rs | 32 ++++++- .../tests/it/e2e_operational_webhooks.rs | 1 + 5 files changed, 135 insertions(+), 1 deletion(-) diff --git a/server/openapi.json b/server/openapi.json index c6e235d0f..39dddd376 100644 --- a/server/openapi.json +++ b/server/openapi.json @@ -1747,6 +1747,69 @@ ], "type": "object" }, + "MessageAttemptRecoveredEvent": { + "description": "Sent on a successful dispatch after an earlier failure op webhook has already been sent.", + "properties": { + "data": { + "$ref": "#/components/schemas/MessageAttemptRecoveredEventData" + }, + "type": { + "default": "message.attempt.recovered", + "enum": [ + "message.attempt.recovered" + ], + "type": "string" + } + }, + "required": [ + "data", + "type" + ], + "type": "object" + }, + "MessageAttemptRecoveredEventData": { + "description": "Sent when a message delivery has failed (all of the retry attempts have been exhausted) as a \"message.attempt.exhausted\" type or after it's failed four times as a \"message.attempt.failing\" event.", + "properties": { + "appId": { + "example": "app_1srOrx2ZWZBpBUvZwXKQmoEYga2", + "type": "string" + }, + "appUid": { + "example": "unique-app-identifier", + "maxLength": 256, + "minLength": 1, + "nullable": true, + "pattern": "^[a-zA-Z0-9\\-_.]+$", + "type": "string" + }, + "endpointId": { + "example": "ep_1srOrx2ZWZBpBUvZwXKQmoEYga2", + "type": "string" + }, + "lastAttempt": { + "$ref": "#/components/schemas/MessageAttempetLast" + }, + "msgEventId": { + "example": "unique-msg-identifier", + "maxLength": 256, + "minLength": 1, + "nullable": true, + "pattern": "^[a-zA-Z0-9\\-_.]+$", + "type": "string" + }, + "msgId": { + "example": "msg_1srOrx2ZWZBpBUvZwXKQmoEYga2", + "type": "string" + } + }, + "required": [ + "appId", + "endpointId", + "lastAttempt", + "msgId" + ], + "type": "object" + }, "MessageAttemptTriggerType": { "description": "The reason an attempt was made:\n- Scheduled = 0\n- Manual = 1", "enum": [ @@ -7742,6 +7805,30 @@ "Webhooks" ] } + }, + "MessageAttemptRecoveredEvent": { + "post": { + "description": "Sent on a successful dispatch after an earlier failure op webhook has already been sent.", + "operationId": "MessageAttemptRecoveredEvent", + "requestBody": { + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/MessageAttemptRecoveredEvent" + } + } + } + }, + "responses": { + "2XX": { + "description": "Return any 2XX status to indicate that the data was received successfully" + } + }, + "summary": "MessageAttemptRecoveredEvent", + "tags": [ + "Webhooks" + ] + } } } } diff --git a/server/svix-server/src/core/operational_webhooks.rs b/server/svix-server/src/core/operational_webhooks.rs index f55e3eb04..58ef2c5c3 100644 --- a/server/svix-server/src/core/operational_webhooks.rs +++ b/server/svix-server/src/core/operational_webhooks.rs @@ -103,6 +103,8 @@ pub enum OperationalWebhook { MessageAttemptExhausted(MessageAttemptEvent), #[serde(rename = "message.attempt.failing")] MessageAttemptFailing(MessageAttemptEvent), + #[serde(rename = "message.attempt.recovered")] + MessageAttemptRecovered(MessageAttemptEvent), } pub type OperationalWebhookSender = Arc; diff --git a/server/svix-server/src/openapi.rs b/server/svix-server/src/openapi.rs index 13ec56ff0..86dc822b2 100644 --- a/server/svix-server/src/openapi.rs +++ b/server/svix-server/src/openapi.rs @@ -415,6 +415,13 @@ mod webhooks { common_: MessageAttemptEvent, } + #[derive(JsonSchema)] + #[allow(unused)] + struct MessageAttemptRecoveredEventData { + #[serde(flatten)] + common_: MessageAttemptEvent, + } + webhook_event!( EndpointCreatedEvent, EndpointCreatedEventData, @@ -451,6 +458,12 @@ mod webhooks { "message.attempt.failing", "Sent after a message has been failing for a few times.\nIt's sent on the fourth failure. It complements `message.attempt.exhausted` which is sent after the last failure." ); + webhook_event!( + MessageAttemptRecoveredEvent, + MessageAttemptRecoveredEventData, + "message.attempt.recovered", + "Sent on a successful dispatch after an earlier failure op webhook has already been sent." + ); /// Generates documentation for operational webhooks in the Redoc `x-webhooks` /// format. For more info see https://redocly.com/docs/api-reference-docs/specification-extensions/x-webhooks/ @@ -462,6 +475,7 @@ mod webhooks { document_webhook::(), document_webhook::(), document_webhook::(), + document_webhook::(), ]) } } diff --git a/server/svix-server/src/worker.rs b/server/svix-server/src/worker.rs index abf26a097..a521a2306 100644 --- a/server/svix-server/src/worker.rs +++ b/server/svix-server/src/worker.rs @@ -407,11 +407,19 @@ async fn make_http_call( #[tracing::instrument(skip_all, fields(response_code, msg_dest_id = msg_dest.id.0))] async fn handle_successful_dispatch( - WorkerContext { cache, db, .. }: &WorkerContext<'_>, + WorkerContext { + cache, + db, + op_webhook_sender, + .. + }: &WorkerContext<'_>, DispatchContext { org_id, endp, app_id, + app_uid, + msg_task, + msg_uid, .. }: DispatchContext<'_>, SuccessfulDispatch(mut attempt): SuccessfulDispatch, @@ -432,6 +440,28 @@ async fn handle_successful_dispatch( tracing::Span::current().record("response_code", attempt.response_status_code); tracing::info!("Webhook success."); + if msg_task.attempt_count as usize >= OP_WEBHOOKS_SEND_FAILING_EVENT_AFTER { + if let Err(e) = op_webhook_sender + .send_operational_webhook( + org_id, + OperationalWebhook::MessageAttemptRecovered(MessageAttemptEvent { + app_id: app_id.clone(), + app_uid: app_uid.cloned(), + endpoint_id: msg_task.endpoint_id.clone(), + msg_id: msg_task.msg_id.clone(), + msg_event_id: msg_uid.cloned(), + last_attempt: attempt.into(), + }), + ) + .await + { + tracing::error!( + "Failed sending MessageAttemptRecovered Operational Webhook: {}", + e + ); + } + } + Ok(()) } diff --git a/server/svix-server/tests/it/e2e_operational_webhooks.rs b/server/svix-server/tests/it/e2e_operational_webhooks.rs index 69362885f..9b5669062 100644 --- a/server/svix-server/tests/it/e2e_operational_webhooks.rs +++ b/server/svix-server/tests/it/e2e_operational_webhooks.rs @@ -431,6 +431,7 @@ async fn test_operational_webhooks_event_types_exist() { for et in &[ "message.attempt.failing", "message.attempt.exhausted", + "message.attempt.recovered", "endpoint.created", "endpoint.deleted", "endpoint.disabled",