Skip to content

Commit

Permalink
Add DLQ support
Browse files Browse the repository at this point in the history
  • Loading branch information
jaymell committed Oct 7, 2024
1 parent 50a8e2b commit 8ae8326
Show file tree
Hide file tree
Showing 4 changed files with 45 additions and 2 deletions.
17 changes: 15 additions & 2 deletions server/svix-server/src/queue/redis.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@

use std::{num::NonZeroUsize, sync::Arc, time::Duration};

use omniqueue::backends::{RedisBackend, RedisConfig};
use omniqueue::backends::{redis::DeadLetterQueueConfig, RedisBackend, RedisConfig};
use redis::{AsyncCommands as _, RedisResult};

use super::{QueueTask, TaskQueueConsumer, TaskQueueProducer};
Expand All @@ -50,6 +50,9 @@ const DELAYED: &str = "{queue}_svix_delayed";
/// The key for the lock guarding the delayed queue background task.
const DELAYED_LOCK: &str = "{queue}_svix_delayed_lock";

/// The key for the DLQ
const DLQ: &str = "{queue}_svix_dlq";

// v2 KEY CONSTANTS
const LEGACY_V2_MAIN: &str = "{queue}_svix_main";
const LEGACY_V2_PROCESSING: &str = "{queue}_svix_processing";
Expand Down Expand Up @@ -86,6 +89,7 @@ pub async fn new_pair(
MAIN,
DELAYED,
DELAYED_LOCK,
DLQ,
)
.await
}
Expand Down Expand Up @@ -124,6 +128,7 @@ async fn new_pair_inner(
main_queue_name: &'static str,
delayed_queue_name: &'static str,
delayed_lock_name: &'static str,
dlq_name: &'static str,
) -> (TaskQueueProducer, TaskQueueConsumer) {
let main_queue_name = format!("{queue_prefix}{main_queue_name}");
let delayed_queue_name = format!("{queue_prefix}{delayed_queue_name}");
Expand Down Expand Up @@ -206,7 +211,10 @@ async fn new_pair_inner(
consumer_name: WORKER_CONSUMER.to_owned(),
payload_key: QUEUE_KV_KEY.to_owned(),
ack_deadline_ms: pending_duration,
dlq_config: None,
dlq_config: Some(DeadLetterQueueConfig {
queue_key: dlq_name.to_string(),
max_receives: 3,
}),
sentinel_config: cfg.redis_sentinel_cfg.clone().map(|c| c.into()),
};

Expand Down Expand Up @@ -472,6 +480,7 @@ pub mod tests {
"{test}_idle_period",
"{test}_idle_period_delayed",
"{test}_idle_period_delayed_lock",
"{test}_dlq",
)
.await;

Expand Down Expand Up @@ -542,6 +551,7 @@ pub mod tests {
"{test}_ack",
"{test}_ack_delayed",
"{test}_ack_delayed_lock",
"{test}_dlq",
)
.await;

Expand Down Expand Up @@ -588,6 +598,7 @@ pub mod tests {
"{test}_nack",
"{test}_nack_delayed",
"{test}_nack_delayed_lock",
"{test}_dlq",
)
.await;

Expand Down Expand Up @@ -631,6 +642,7 @@ pub mod tests {
"{test}_delay",
"{test}_delay_delayed",
"{test}_delay_delayed_lock",
"{test}_dlq",
)
.await;

Expand Down Expand Up @@ -804,6 +816,7 @@ pub mod tests {
v3_main,
v2_delayed,
v2_delayed_lock,
"dlq-bruh",
)
.await;

Expand Down
28 changes: 28 additions & 0 deletions server/svix-server/src/v1/endpoints/admin.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
use aide::{
axum::{routing::put_with, ApiRouter},
transform::TransformPathItem,
};
use axum::extract::State;
use svix_server_derive::aide_annotate;

use crate::{core::permissions, error::Result, v1::utils::NoContent, AppState};

/// Redrive DLQ
#[aide_annotate(op_id = "v1.admin.redrive-dlq")]
pub async fn redrive_dlq(
State(AppState { queue_tx, .. }): State<AppState>,
_: permissions::Organization,
) -> Result<NoContent> {
if let Err(e) = queue_tx.redrive_dlq().await {
tracing::warn!(error = ?e, "DLQ redrive failed");
}
Ok(NoContent)
}

pub fn router() -> ApiRouter<AppState> {
ApiRouter::new().api_route_with(
"/admin/redrive-dlq",
put_with(redrive_dlq, redrive_dlq_operation),
move |op: TransformPathItem<'_>| op.tag("Admin".as_ref()).hidden(true),
)
}
1 change: 1 addition & 0 deletions server/svix-server/src/v1/endpoints/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
// SPDX-FileCopyrightText: © 2022 Svix Authors
// SPDX-License-Identifier: MIT

pub mod admin;
pub mod application;
pub mod attempt;
pub mod auth;
Expand Down
1 change: 1 addition & 0 deletions server/svix-server/src/v1/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ pub fn router() -> ApiRouter<AppState> {
.merge(endpoints::event_type::router())
.merge(endpoints::message::router())
.merge(endpoints::attempt::router())
.merge(endpoints::admin::router())
.layer(
TraceLayer::new_for_http()
.make_span_with(AxumOtelSpanCreator)
Expand Down

0 comments on commit 8ae8326

Please sign in to comment.