diff --git a/README.md b/README.md
index 383e21652..314109135 100644
--- a/README.md
+++ b/README.md
@@ -389,6 +389,10 @@ To support graceful shutdown on the server, all running tasks are finished befor
 One of our main goals with open sourcing the Svix dispatcher is ease of use. The hosted Svix service, however, is quite complex due to our scale and the infrastructure it requires. This complexity is not useful for the vast majority of people and would make this project much harder to use and much more limited. This is why this code has been adjusted before being released, and some of the features, optimizations, and behaviors supported by the hosted dispatcher are not yet available in this repo. With that being said, other than some known incompatibilities, the internal Svix test suite passes. This means they are already mostly compatible, and we are working hard on bringing them to full feature parity.
 
+# Re-driving Redis DLQ
+The server exposes an undocumented endpoint for re-driving failed messages that have been moved to the dead letter queue (DLQ). You can trigger this by calling `POST /api/v1/admin/redrive-dlq/`.
+
+To monitor the DLQ depth, watch the `svix.queue.depth_dlq` metric. Any non-zero value indicates that there are messages in the DLQ.
 
 # Development
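For illustration, a redrive can be triggered from any HTTP client once the server is running. A minimal sketch in Rust, assuming a local server on the default port 8071, an organization-level bearer token, and the `reqwest` crate — none of which are part of this change:

```rust
// Hypothetical illustration of triggering a DLQ redrive. The host, port,
// and token are placeholder assumptions, not defined by this diff.
async fn trigger_redrive(token: &str) -> Result<(), reqwest::Error> {
    let resp = reqwest::Client::new()
        .post("http://localhost:8071/api/v1/admin/redrive-dlq")
        .bearer_auth(token)
        .send()
        .await?;
    // The handler replies 204 No Content; failures are only logged server-side.
    assert_eq!(resp.status(), reqwest::StatusCode::NO_CONTENT);
    Ok(())
}
```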
diff --git a/server/svix-server/Cargo.toml b/server/svix-server/Cargo.toml
index 5987d3727..c12227b19 100644
--- a/server/svix-server/Cargo.toml
+++ b/server/svix-server/Cargo.toml
@@ -68,7 +68,7 @@ urlencoding = "2.1.2"
 form_urlencoded = "1.1.0"
 lapin = "2.1.1"
 sentry = { version = "0.32.2", features = ["tracing"] }
-omniqueue = { git = "https://github.com/svix/omniqueue-rs", rev = "75e5a9510ad338ac3702b2e911bacf8967ac58d8", default-features = false, features = ["in_memory", "rabbitmq-with-message-ids", "redis_cluster", "redis_sentinel"] }
+omniqueue = { git = "https://github.com/svix/omniqueue-rs", rev = "75e5a9510ad338ac3702b2e911bacf8967ac58d8", default-features = false, features = ["in_memory", "rabbitmq-with-message-ids", "redis_cluster", "redis_sentinel", "beta"] }
 # Not a well-known author, and no longer gets updates => pinned.
 # Switch to hyper-http-proxy when upgrading hyper to 1.0.
 hyper-proxy = { version = "=0.9.1", default-features = false, features = ["openssl-tls"] }
diff --git a/server/svix-server/config.default.toml b/server/svix-server/config.default.toml
index c8dc27387..286b053b6 100644
--- a/server/svix-server/config.default.toml
+++ b/server/svix-server/config.default.toml
@@ -120,3 +120,6 @@ worker_max_tasks = 500
 # Whether or not to disable TLS certificate validation on Webhook dispatch. This is a dangerous flag
 # to set true. This value will default to false.
 # dangerous_disable_tls_verification = false
+
+# Maximum seconds of queue long-poll
+queue_max_poll_secs = 20
\ No newline at end of file
diff --git a/server/svix-server/src/cfg.rs b/server/svix-server/src/cfg.rs
index c3d63704a..d34435d3f 100644
--- a/server/svix-server/src/cfg.rs
+++ b/server/svix-server/src/cfg.rs
@@ -69,6 +69,10 @@ const DEFAULTS: &str = include_str!("../config.default.toml");
 
 pub type Configuration = Arc<ConfigurationInner>;
 
+fn default_redis_pending_duration_secs() -> u64 {
+    45
+}
+
 #[derive(Clone, Debug, Deserialize, Validate)]
 #[validate(
     schema(function = "validate_config_complete"),
@@ -184,6 +188,9 @@ pub struct ConfigurationInner {
     /// Maximum number of concurrent worker tasks to spawn (0 is unlimited)
     pub worker_max_tasks: u16,
 
+    /// Maximum seconds of a queue long-poll
+    pub queue_max_poll_secs: u16,
+
     /// The address of the rabbitmq exchange
     pub rabbit_dsn: Option<Arc<String>>,
     pub rabbit_consumer_prefetch_size: Option<u16>,
@@ -197,6 +204,9 @@ pub struct ConfigurationInner {
     #[serde(flatten)]
     pub proxy_config: Option<ProxyConfig>,
 
+    #[serde(default = "default_redis_pending_duration_secs")]
+    pub redis_pending_duration_secs: u64,
+
     #[serde(flatten)]
     pub internal: InternalConfig,
 }
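A side note on the config plumbing above: `queue_max_poll_secs` picks up its default from `config.default.toml`, while `redis_pending_duration_secs` defaults in code via `#[serde(default = "...")]` and so needs no entry in the TOML at all. A self-contained sketch of that serde mechanism, using a stand-in struct rather than the real `ConfigurationInner` (assumes the `serde` and `toml` crates):

```rust
use serde::Deserialize;

fn default_redis_pending_duration_secs() -> u64 {
    45
}

#[derive(Debug, Deserialize)]
struct Example {
    // Falls back to 45 when the key is missing from the parsed input.
    #[serde(default = "default_redis_pending_duration_secs")]
    redis_pending_duration_secs: u64,
}

fn main() {
    // Absent key: the default function is used.
    let empty: Example = toml::from_str("").unwrap();
    assert_eq!(empty.redis_pending_duration_secs, 45);

    // Present key: the configured value wins.
    let set: Example = toml::from_str("redis_pending_duration_secs = 1").unwrap();
    assert_eq!(set.redis_pending_duration_secs, 1);
}
```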
diff --git a/server/svix-server/src/metrics/redis.rs b/server/svix-server/src/metrics/redis.rs
index 6c948494b..bb514a144 100644
--- a/server/svix-server/src/metrics/redis.rs
+++ b/server/svix-server/src/metrics/redis.rs
@@ -43,6 +43,7 @@ pub struct RedisQueueMetrics {
     main_queue: Option<ObservableGauge<u64>>,
     pending_queue: Option<ObservableGauge<u64>>,
     delayed_queue: Option<ObservableGauge<u64>>,
+    deadletter_queue: Option<ObservableGauge<u64>>,
 }
 
 impl RedisQueueMetrics {
@@ -65,10 +66,17 @@ impl RedisQueueMetrics {
                 .try_init(),
         );
 
+        let deadletter_queue = init_metric(
+            meter
+                .u64_observable_gauge("svix.queue.depth_dlq")
+                .try_init(),
+        );
+
         Self {
             main_queue,
             pending_queue,
             delayed_queue,
+            deadletter_queue,
         }
     }
 
     pub async fn record(
@@ -77,6 +85,7 @@ impl RedisQueueMetrics {
         main_queue: &RedisQueueType<'_>,
         pending_queue: &RedisQueueType<'_>,
         delayed_queue: &RedisQueueType<'_>,
+        deadletter_queue: &RedisQueueType<'_>,
     ) {
         main_queue
             .queue_depth(redis)
             .await
@@ -99,6 +108,13 @@ impl RedisQueueMetrics {
             .unwrap_or_else(|e| {
                 tracing::warn!("Failed to record queue depth: {e}");
             });
+        deadletter_queue
+            .queue_depth(redis)
+            .await
+            .map(|d| self.record_deadletter_queue_depth(d))
+            .unwrap_or_else(|e| {
+                tracing::warn!("Failed to record queue depth: {e}");
+            });
     }
 
     fn record_main_queue_depth(&self, value: u64) {
@@ -116,4 +132,9 @@ impl RedisQueueMetrics {
             recorder.observe(value, &[]);
         }
     }
+    fn record_deadletter_queue_depth(&self, value: u64) {
+        if let Some(recorder) = &self.deadletter_queue {
+            recorder.observe(value, &[]);
+        }
+    }
 }
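Since the DLQ is modelled as a plain Redis list (see `RedisQueueType::List` and the `{queue}_svix_dlq` key in the queue changes further below), the number behind `svix.queue.depth_dlq` is simply the list length. A sketch of checking it out-of-band, assuming a recent `redis` crate with tokio support and no queue prefix configured:

```rust
use redis::AsyncCommands as _;

// Sketch: read the DLQ depth directly, bypassing the OTel gauge.
async fn dlq_depth(client: &redis::Client) -> redis::RedisResult<u64> {
    let mut conn = client.get_multiplexed_async_connection().await?;
    // "{queue}_svix_dlq" is the DLQ key defined in queue/redis.rs,
    // assuming no queue prefix is configured.
    conn.llen("{queue}_svix_dlq").await
}
```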
diff --git a/server/svix-server/src/queue/mod.rs b/server/svix-server/src/queue/mod.rs
index 8f40a6639..7a9d8cd6d 100644
--- a/server/svix-server/src/queue/mod.rs
+++ b/server/svix-server/src/queue/mod.rs
@@ -1,10 +1,9 @@
-use std::{sync::Arc, time::Duration};
+use std::{marker::PhantomData, num::NonZeroUsize, sync::Arc, time::Duration};
 
 use omniqueue::{
-    backends::InMemoryBackend, Delivery, DynConsumer, DynScheduledProducer, QueueConsumer,
-    ScheduledQueueProducer,
+    backends::InMemoryBackend, Delivery, DynConsumer, QueueConsumer, ScheduledQueueProducer,
 };
-use serde::{Deserialize, Serialize};
+use serde::{de::DeserializeOwned, Deserialize, Serialize};
 
 use crate::{
     cfg::{Configuration, QueueBackend},
@@ -24,6 +23,8 @@ const RETRY_SCHEDULE: &[Duration] = &[
     Duration::from_millis(40),
 ];
 
+pub type TaskQueueDelivery = SvixOmniDelivery<QueueTask>;
+
 fn should_retry(err: &Error) -> bool {
     matches!(err.typ, ErrorType::Queue(_))
 }
@@ -139,19 +140,34 @@ impl QueueTask {
     }
 }
 
-#[derive(Clone)]
-pub struct TaskQueueProducer {
-    inner: Arc<DynScheduledProducer>,
+pub type TaskQueueProducer = SvixOmniProducer<QueueTask>;
+pub type TaskQueueConsumer = SvixOmniConsumer<QueueTask>;
+
+pub struct SvixOmniProducer<T: OmniMessage> {
+    inner: Arc<omniqueue::DynScheduledProducer>,
+    _phantom: PhantomData<T>,
+}
+
+// Manual impl to avoid adding a 'Clone' bound on T
+impl<T: OmniMessage> Clone for SvixOmniProducer<T> {
+    fn clone(&self) -> Self {
+        Self {
+            inner: self.inner.clone(),
+            _phantom: PhantomData,
+        }
+    }
 }
 
-impl TaskQueueProducer {
-    pub fn new(inner: impl ScheduledQueueProducer + 'static) -> Self {
+impl<T: OmniMessage> SvixOmniProducer<T> {
+    pub(super) fn new(inner: impl ScheduledQueueProducer + 'static) -> Self {
         Self {
             inner: Arc::new(inner.into_dyn_scheduled()),
+            _phantom: PhantomData,
         }
     }
 
-    pub async fn send(&self, task: QueueTask, delay: Option<Duration>) -> Result<()> {
+    #[tracing::instrument(skip_all, name = "queue_send")]
+    pub async fn send(&self, task: &T, delay: Option<Duration>) -> Result<()> {
         let task = Arc::new(task);
         run_with_retries(
             || async {
@@ -169,57 +185,99 @@ impl TaskQueueProducer {
         )
         .await
     }
+
+    #[tracing::instrument(skip_all, name = "redrive_dlq")]
+    pub async fn redrive_dlq(&self) -> Result<()> {
+        self.inner.redrive_dlq().await.map_err(Into::into)
+    }
 }
 
-pub struct TaskQueueConsumer {
+pub struct SvixOmniConsumer<T: OmniMessage> {
     inner: DynConsumer,
+    _phantom: PhantomData<T>,
+}
+
+pub trait OmniMessage: Serialize + DeserializeOwned + Send + Sync {
+    fn task_id(&self) -> Option<&str>;
 }
 
-impl TaskQueueConsumer {
-    pub fn new(inner: impl QueueConsumer + 'static) -> Self {
+impl OmniMessage for QueueTask {
+    fn task_id(&self) -> Option<&str> {
+        self.msg_id()
+    }
+}
+
+impl<T: OmniMessage> SvixOmniConsumer<T> {
+    pub(super) fn new(inner: impl QueueConsumer + 'static) -> Self {
         Self {
             inner: inner.into_dyn(),
+            _phantom: PhantomData,
         }
     }
 
-    pub async fn receive_all(&mut self) -> Result<Vec<TaskQueueDelivery>> {
-        const MAX_MESSAGES: usize = 128;
-        // FIXME(onelson): need to figure out what deadline/duration to use here
+    #[tracing::instrument(skip_all, name = "queue_receive_all")]
+    pub async fn receive_all(&mut self, deadline: Duration) -> Result<Vec<SvixOmniDelivery<T>>> {
+        const MAX_MESSAGES: usize = 128;
         self.inner
-            .receive_all(MAX_MESSAGES, Duration::from_secs(30))
+            .receive_all(MAX_MESSAGES, deadline)
             .await
-            .map_err(Into::into)
+            .map_err(Error::from)
             .trace()?
             .into_iter()
-            .map(TryInto::try_into)
+            .map(|acker| {
+                Ok(SvixOmniDelivery {
+                    task: Arc::new(
+                        acker
+                            .payload_serde_json()
+                            .map_err(|e| {
+                                Error::queue(format!("Failed to decode queue task: {e:?}"))
+                            })?
+                            .ok_or_else(|| Error::queue("Unexpected empty delivery"))?,
+                    ),
+
+                    acker,
+                })
+            })
             .collect()
     }
+
+    pub fn max_messages(&self) -> Option<NonZeroUsize> {
+        self.inner.max_messages()
+    }
 }
 
 #[derive(Debug)]
-pub struct TaskQueueDelivery {
-    pub task: Arc<QueueTask>,
-    acker: Delivery,
+pub struct SvixOmniDelivery<T> {
+    pub task: Arc<T>,
+    pub(super) acker: Delivery,
 }
 
-impl TaskQueueDelivery {
+impl<T: OmniMessage> SvixOmniDelivery<T> {
+    pub async fn set_ack_deadline(&mut self, duration: Duration) -> Result<()> {
+        Ok(self.acker.set_ack_deadline(duration).await?)
+    }
+
     pub async fn ack(self) -> Result<()> {
-        tracing::trace!(msg_id = self.task.msg_id(), "ack");
+        tracing::trace!(
+            task_id = self.task.task_id().map(tracing::field::display),
+            "ack"
+        );
 
         let mut retry = Retry::new(should_retry, RETRY_SCHEDULE);
         let mut acker = Some(self.acker);
         loop {
             if let Some(result) = retry
                 .run(|| async {
-                    let delivery = acker
-                        .take()
-                        .expect("acker is always Some when trying to ack");
-                    delivery.ack().await.map_err(|(e, delivery)| {
-                        // Put the delivery back in acker before retrying, to
-                        // satisfy the expect above.
-                        acker = Some(delivery);
-                        e.into()
-                    })
+                    match acker.take() {
+                        Some(delivery) => {
+                            delivery.ack().await.map_err(|(e, delivery)| {
+                                // Put the delivery back in acker before retrying,
+                                // so the None arm below stays unreachable.
+                                acker = Some(delivery);
+                                e.into()
+                            })
+                        }
+                        None => unreachable!(),
+                    }
                 })
                 .await
             {
@@ -229,27 +287,31 @@ impl TaskQueueDelivery {
         }
     }
 
     pub async fn nack(self) -> Result<()> {
-        tracing::trace!(msg_id = self.task.msg_id(), "nack");
+        tracing::trace!(
+            task_id = self.task.task_id().map(tracing::field::display),
+            "nack"
+        );
 
         let mut retry = Retry::new(should_retry, RETRY_SCHEDULE);
         let mut acker = Some(self.acker);
         loop {
             if let Some(result) = retry
                 .run(|| async {
-                    let delivery = acker
-                        .take()
-                        .expect("acker is always Some when trying to ack");
-
-                    delivery
-                        .nack()
-                        .await
-                        .map_err(|(e, delivery)| {
-                            // Put the delivery back in acker before retrying, to
-                            // satisfy the expect above.
-                            acker = Some(delivery);
-                            e.into()
-                        })
-                        .trace()
+                    match acker.take() {
+                        Some(delivery) => {
+                            delivery
+                                .nack()
+                                .await
+                                .map_err(|(e, delivery)| {
+                                    // Put the delivery back in acker before retrying,
+                                    // so the None arm below stays unreachable.
+                                    acker = Some(delivery);
+                                    Error::from(e)
+                                })
+                                .trace()
+                        }
+                        None => unreachable!(),
+                    }
                 })
                 .await
             {
@@ -258,18 +320,3 @@ impl TaskQueueDelivery {
             }
         }
     }
 }
-
-impl TryFrom<Delivery> for TaskQueueDelivery {
-    type Error = Error;
-    fn try_from(value: Delivery) -> Result<Self> {
-        Ok(TaskQueueDelivery {
-            task: Arc::new(
-                value
-                    .payload_serde_json()
-                    .map_err(|_| Error::queue("Failed to decode queue task"))?
-                    .ok_or_else(|| Error::queue("Unexpected empty delivery"))?,
-            ),
-            acker: value,
-        })
-    }
-}
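The `SvixOmniProducer<T>`/`SvixOmniConsumer<T>` wrappers above use `PhantomData<T>` so a single dynamically-typed omniqueue handle can carry a typed, message-specific API, with `Clone` implemented by hand so `T` needs no `Clone` bound. A stripped-down sketch of that pattern with stand-in types (not the actual types from this diff):

```rust
use std::{marker::PhantomData, sync::Arc};

// Stand-in for the dynamically typed producer owned by the wrapper.
struct DynProducer;

struct TypedProducer<T> {
    inner: Arc<DynProducer>,
    // Zero-sized marker tying this handle to one message type.
    _phantom: PhantomData<T>,
}

// `#[derive(Clone)]` would demand `T: Clone` even though no `T` value is
// stored; the manual impl only clones the Arc.
impl<T> Clone for TypedProducer<T> {
    fn clone(&self) -> Self {
        Self {
            inner: self.inner.clone(),
            _phantom: PhantomData,
        }
    }
}
```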
diff --git a/server/svix-server/src/queue/rabbitmq.rs b/server/svix-server/src/queue/rabbitmq.rs
index bc7886cf5..a5e282a3d 100644
--- a/server/svix-server/src/queue/rabbitmq.rs
+++ b/server/svix-server/src/queue/rabbitmq.rs
@@ -186,7 +186,7 @@ mod tests {
             .await
             .unwrap();
 
-        producer.send(QueueTask::HealthCheck, None).await.unwrap();
+        producer.send(&QueueTask::HealthCheck, None).await.unwrap();
     }
 
     // Receive with lapin consumer
.expect("Error retrieving connection from Redis pool"); + let _: () = conn.del(&[q1, q2, q3]).await.unwrap(); } #[tokio::test] @@ -488,18 +505,16 @@ pub mod tests { let cfg = crate::cfg::load().unwrap(); let pool = get_pool(&cfg).await; - let (p, mut c) = new_pair_inner( - &cfg, - Duration::from_millis(100), - "", - "{test}_idle_period", - "{test}_idle_period_delayed", - "{test}_idle_period_delayed_lock", - ) - .await; + let main_queue = "{test}_idle_period"; + let delayed = "{test}_idle_period_delayed"; + let lock = "{test}_idle_period_delayed_lock"; + let dlq = "{test}_dlq"; + + let delay = Duration::from_millis(100); + + cleanup(&pool, main_queue, delayed, lock).await; - tokio::time::sleep(Duration::from_millis(150)).await; - flush_stale_queue_items(p.clone(), &mut c).await; + let (p, mut c) = new_pair_inner(&cfg, delay, "", main_queue, delayed, lock, dlq).await; let mt = QueueTask::MessageV1(MessageTask { msg_id: MessageId("test".to_owned()), @@ -508,16 +523,16 @@ pub mod tests { trigger_type: MessageAttemptTriggerType::Manual, attempt_count: 0, }); - p.send(mt.clone(), None).await.unwrap(); + p.send(&mt, None).await.unwrap(); - let recv = timeout(Duration::from_secs(5), c.receive_all()) + let recv = timeout(Duration::from_secs(5), c.receive_all(TEST_RECV_DEADLINE)) .await .expect("`c.receive()` has timed out"); assert_eq!(*recv.unwrap()[0].task, mt); - tokio::time::sleep(Duration::from_millis(100)).await; + tokio::time::sleep(delay).await; - let recv = timeout(Duration::from_secs(5), c.receive_all()) + let recv = timeout(Duration::from_secs(1), c.receive_all(TEST_RECV_DEADLINE)) .await .expect("`c.receive()` has timed out"); let recv = recv.unwrap().pop().unwrap(); @@ -530,12 +545,12 @@ pub mod tests { .get() .await .expect("Error retrieving connection from Redis pool"); - let keys = conn - .xread::<_, _, StreamReadReply>(&["{test}_ack"], &[0]) + assert!(conn + .xread::<_, _, StreamReadReply>(&[main_queue], &[0]) .await .unwrap() - .keys; - assert_matches!(keys.as_slice(), []); + .keys + .is_empty()); } #[tokio::test] @@ -544,29 +559,16 @@ pub mod tests { let cfg = crate::cfg::load().unwrap(); let pool = get_pool(&cfg).await; - // Delete the keys used in this test to ensure nothing pollutes the output - let mut conn = pool - .get() - .await - .expect("Error retrieving connection from Redis pool"); - let _: () = conn - .del(&[ - "{test}_ack", - "{test}_ack_delayed", - "{test}_ack_delayed_lock", - ]) - .await - .unwrap(); + let main_queue = "{test}_ack"; + let delayed = "{test}_ack_delayed"; + let lock = "{test}_ack_delayed_lock"; + let dlq = "{test}_dlq"; - let (p, mut c) = new_pair_inner( - &cfg, - Duration::from_millis(5000), - "", - "{test}_ack", - "{test}_ack_delayed", - "{test}_ack_delayed_lock", - ) - .await; + cleanup(&pool, main_queue, delayed, lock).await; + + let delay = Duration::from_millis(100); + + let (p, mut c) = new_pair_inner(&cfg, delay, "", main_queue, delayed, lock, dlq).await; let mt = QueueTask::MessageV1(MessageTask { msg_id: MessageId("test2".to_owned()), @@ -575,43 +577,50 @@ pub mod tests { trigger_type: MessageAttemptTriggerType::Manual, attempt_count: 0, }); - p.send(mt.clone(), None).await.unwrap(); + p.send(&mt, None).await.unwrap(); - let recv = c.receive_all().await.unwrap().pop().unwrap(); + let recv = c + .receive_all(TEST_RECV_DEADLINE) + .await + .unwrap() + .pop() + .unwrap(); assert_eq!(*recv.task, mt); recv.ack().await.unwrap(); - if let Ok(recv) = timeout(Duration::from_secs(1), c.receive_all()).await { + if let Ok(recv) = timeout(delay, 
@@ -389,7 +406,6 @@ async fn migrate_sset(
 pub mod tests {
     use std::time::Duration;
 
-    use assert_matches::assert_matches;
     use chrono::Utc;
     use redis::{streams::StreamReadReply, AsyncCommands as _, Direction};
     use tokio::time::timeout;
@@ -398,10 +414,12 @@ pub mod tests {
     use crate::{
         cfg::Configuration,
         core::types::{ApplicationId, EndpointId, MessageAttemptTriggerType, MessageId},
-        queue::{MessageTask, QueueTask, TaskQueueConsumer, TaskQueueProducer},
+        queue::{MessageTask, QueueTask},
         redis::RedisManager,
     };
 
+    const TEST_RECV_DEADLINE: Duration = Duration::from_secs(5);
+
     async fn get_pool(cfg: &Configuration) -> RedisManager {
         RedisManager::from_queue_backend(&cfg.queue_backend(), cfg.redis_pool_max_size).await
     }
@@ -473,13 +491,12 @@ pub mod tests {
         assert_eq!(should_be_none, vec![]);
     }
 
-    /// Reads and acknowledges all items in the queue with the given name for clearing out entries
-    /// from previous test runs
-    async fn flush_stale_queue_items(_p: TaskQueueProducer, c: &mut TaskQueueConsumer) {
-        while let Ok(recv) = timeout(Duration::from_millis(100), c.receive_all()).await {
-            let recv = recv.unwrap().pop().unwrap();
-            recv.ack().await.unwrap();
-        }
+    /// Deletes the given queue keys, clearing out entries from previous test runs
+    async fn cleanup(pool: &RedisManager, q1: &str, q2: &str, q3: &str) {
+        let mut conn = pool
+            .get()
+            .await
+            .expect("Error retrieving connection from Redis pool");
+        let _: () = conn.del(&[q1, q2, q3]).await.unwrap();
     }
 
     #[tokio::test]
@@ -488,18 +505,16 @@ pub mod tests {
         let cfg = crate::cfg::load().unwrap();
         let pool = get_pool(&cfg).await;
 
-        let (p, mut c) = new_pair_inner(
-            &cfg,
-            Duration::from_millis(100),
-            "",
-            "{test}_idle_period",
-            "{test}_idle_period_delayed",
-            "{test}_idle_period_delayed_lock",
-        )
-        .await;
+        let main_queue = "{test}_idle_period";
+        let delayed = "{test}_idle_period_delayed";
+        let lock = "{test}_idle_period_delayed_lock";
+        let dlq = "{test}_dlq";
+
+        let delay = Duration::from_millis(100);
+
+        cleanup(&pool, main_queue, delayed, lock).await;
 
-        tokio::time::sleep(Duration::from_millis(150)).await;
-        flush_stale_queue_items(p.clone(), &mut c).await;
+        let (p, mut c) = new_pair_inner(&cfg, delay, "", main_queue, delayed, lock, dlq).await;
 
         let mt = QueueTask::MessageV1(MessageTask {
             msg_id: MessageId("test".to_owned()),
@@ -508,16 +523,16 @@ pub mod tests {
             trigger_type: MessageAttemptTriggerType::Manual,
             attempt_count: 0,
         });
-        p.send(mt.clone(), None).await.unwrap();
+        p.send(&mt, None).await.unwrap();
 
-        let recv = timeout(Duration::from_secs(5), c.receive_all())
+        let recv = timeout(Duration::from_secs(5), c.receive_all(TEST_RECV_DEADLINE))
            .await
            .expect("`c.receive()` has timed out");
         assert_eq!(*recv.unwrap()[0].task, mt);
 
-        tokio::time::sleep(Duration::from_millis(100)).await;
+        tokio::time::sleep(delay).await;
 
-        let recv = timeout(Duration::from_secs(5), c.receive_all())
+        let recv = timeout(Duration::from_secs(1), c.receive_all(TEST_RECV_DEADLINE))
            .await
            .expect("`c.receive()` has timed out");
         let recv = recv.unwrap().pop().unwrap();
@@ -530,12 +545,12 @@ pub mod tests {
             .get()
             .await
             .expect("Error retrieving connection from Redis pool");
-        let keys = conn
-            .xread::<_, _, StreamReadReply>(&["{test}_ack"], &[0])
+        assert!(conn
+            .xread::<_, _, StreamReadReply>(&[main_queue], &[0])
             .await
             .unwrap()
-            .keys;
-        assert_matches!(keys.as_slice(), []);
+            .keys
+            .is_empty());
     }
 
     #[tokio::test]
@@ -544,29 +559,16 @@ pub mod tests {
         let cfg = crate::cfg::load().unwrap();
         let pool = get_pool(&cfg).await;
 
-        // Delete the keys used in this test to ensure nothing pollutes the output
-        let mut conn = pool
-            .get()
-            .await
-            .expect("Error retrieving connection from Redis pool");
-        let _: () = conn
-            .del(&[
-                "{test}_ack",
-                "{test}_ack_delayed",
-                "{test}_ack_delayed_lock",
-            ])
-            .await
-            .unwrap();
+        let main_queue = "{test}_ack";
+        let delayed = "{test}_ack_delayed";
+        let lock = "{test}_ack_delayed_lock";
+        let dlq = "{test}_dlq";
 
-        let (p, mut c) = new_pair_inner(
-            &cfg,
-            Duration::from_millis(5000),
-            "",
-            "{test}_ack",
-            "{test}_ack_delayed",
-            "{test}_ack_delayed_lock",
-        )
-        .await;
+        cleanup(&pool, main_queue, delayed, lock).await;
+
+        let delay = Duration::from_millis(100);
+
+        let (p, mut c) = new_pair_inner(&cfg, delay, "", main_queue, delayed, lock, dlq).await;
 
         let mt = QueueTask::MessageV1(MessageTask {
             msg_id: MessageId("test2".to_owned()),
@@ -575,43 +577,50 @@ pub mod tests {
             trigger_type: MessageAttemptTriggerType::Manual,
             attempt_count: 0,
         });
-        p.send(mt.clone(), None).await.unwrap();
+        p.send(&mt, None).await.unwrap();
 
-        let recv = c.receive_all().await.unwrap().pop().unwrap();
+        let recv = c
+            .receive_all(TEST_RECV_DEADLINE)
+            .await
+            .unwrap()
+            .pop()
+            .unwrap();
         assert_eq!(*recv.task, mt);
         recv.ack().await.unwrap();
 
-        if let Ok(recv) = timeout(Duration::from_secs(1), c.receive_all()).await {
+        if let Ok(recv) = timeout(delay, c.receive_all(TEST_RECV_DEADLINE)).await {
             panic!("Received unexpected QueueTask {:?}", recv.unwrap()[0].task);
         }
 
+        let mut conn = pool
+            .get()
+            .await
+            .expect("Error retrieving connection from Redis pool");
         // And assert that the task has been deleted
-        let keys = conn
-            .xread::<_, _, StreamReadReply>(&["{test}_ack"], &[0])
+        assert!(conn
+            .xread::<_, _, StreamReadReply>(&[main_queue], &[0])
            .await
            .unwrap()
-            .keys;
-        assert_matches!(keys.as_slice(), []);
+            .keys
+            .is_empty());
     }
 
     #[tokio::test]
     #[ignore]
     async fn test_nack() {
         let cfg = crate::cfg::load().unwrap();
+        let pool = get_pool(&cfg).await;
 
-        let (p, mut c) = new_pair_inner(
-            &cfg,
-            Duration::from_millis(500),
-            "",
-            "{test}_nack",
-            "{test}_nack_delayed",
-            "{test}_nack_delayed_lock",
-        )
-        .await;
+        let main_queue = "{test}_nack";
+        let delayed = "{test}_nack_delayed";
+        let lock = "{test}_nack_delayed_lock";
+        let dlq = "{test}_nack_delayed_dlq";
+
+        cleanup(&pool, main_queue, delayed, lock).await;
 
-        tokio::time::sleep(Duration::from_millis(550)).await;
+        let delay = Duration::from_millis(100);
 
-        flush_stale_queue_items(p.clone(), &mut c).await;
+        let (p, mut c) = new_pair_inner(&cfg, delay, "", main_queue, delayed, lock, dlq).await;
 
         let mt = QueueTask::MessageV1(MessageTask {
             msg_id: MessageId("test".to_owned()),
@@ -620,15 +629,23 @@ pub mod tests {
             trigger_type: MessageAttemptTriggerType::Manual,
             attempt_count: 0,
         });
-        p.send(mt.clone(), None).await.unwrap();
+        p.send(&mt, None).await.unwrap();
 
-        let recv = c.receive_all().await.unwrap().pop().unwrap();
+        let recv = c
+            .receive_all(TEST_RECV_DEADLINE)
+            .await
+            .unwrap()
+            .pop()
+            .unwrap();
         assert_eq!(*recv.task, mt);
         recv.nack().await.unwrap();
 
-        let recv = timeout(Duration::from_secs(1), c.receive_all())
-            .await
-            .expect("Expected QueueTask");
+        let recv = timeout(
+            Duration::from_millis(500) + delay,
+            c.receive_all(TEST_RECV_DEADLINE),
+        )
+        .await
+        .expect("Expected QueueTask");
         assert_eq!(*recv.unwrap().pop().unwrap().task, mt);
     }
 
     #[tokio::test]
     #[ignore]
     async fn test_delay() {
         let cfg = crate::cfg::load().unwrap();
+        let pool = get_pool(&cfg).await;
 
-        let (p, mut c) = new_pair_inner(
-            &cfg,
-            Duration::from_millis(500),
-            "",
-            "{test}_delay",
-            "{test}_delay_delayed",
-            "{test}_delay_delayed_lock",
-        )
-        .await;
+        let main_queue = "{test}_delay";
+        let delayed = "{test}_delay_delayed";
+        let lock = "{test}_delay_delayed_lock";
+        let dlq = "{test}_delay_delayed_dlq";
 
-        tokio::time::sleep(Duration::from_millis(550)).await;
+        cleanup(&pool, main_queue, delayed, lock).await;
 
-        flush_stale_queue_items(p.clone(), &mut c).await;
+        let delay = Duration::from_millis(500);
+        let (p, mut c) = new_pair_inner(&cfg, delay, "", main_queue, delayed, lock, dlq).await;
 
         let mt1 = QueueTask::MessageV1(MessageTask {
             msg_id: MessageId("test1".to_owned()),
@@ -666,16 +680,26 @@ pub mod tests {
             attempt_count: 0,
         });
 
-        p.send(mt1.clone(), Some(Duration::from_millis(2000)))
+        p.send(&mt1, Some(Duration::from_millis(2000)))
             .await
             .unwrap();
-        p.send(mt2.clone(), None).await.unwrap();
+        p.send(&mt2, None).await.unwrap();
 
-        let [recv2] = c.receive_all().await.unwrap().try_into().unwrap();
+        let recv2 = c
+            .receive_all(TEST_RECV_DEADLINE)
+            .await
+            .unwrap()
+            .pop()
+            .unwrap();
         assert_eq!(*recv2.task, mt2);
         recv2.ack().await.unwrap();
 
-        let [recv1] = c.receive_all().await.unwrap().try_into().unwrap();
+        let recv1 = c
+            .receive_all(TEST_RECV_DEADLINE)
+            .await
+            .unwrap()
+            .pop()
+            .unwrap();
         assert_eq!(*recv1.task, mt1);
         recv1.ack().await.unwrap();
     }
@@ -807,15 +831,16 @@ pub mod tests {
             v3_main,
             v2_delayed,
             v2_delayed_lock,
+            "dlq-bruh",
         )
         .await;
 
         // 2 second delay on the delayed and pending queue is inserted after main queue, so first
         // the 6-10 should appear, then 1-5, then 11-15
-        let mut items = c.receive_all().await.unwrap();
+        let mut items = c.receive_all(TEST_RECV_DEADLINE).await.unwrap();
         while items.len() < 15 {
-            let more_tasks = c.receive_all().await.unwrap();
+            let more_tasks = c.receive_all(TEST_RECV_DEADLINE).await.unwrap();
             assert!(!more_tasks.is_empty(), "failed to receive all the tasks");
             items.extend(more_tasks);
         }
diff --git a/server/svix-server/src/v1/endpoints/admin.rs b/server/svix-server/src/v1/endpoints/admin.rs
new file mode 100644
index 000000000..48d8c5bc1
--- /dev/null
+++ b/server/svix-server/src/v1/endpoints/admin.rs
@@ -0,0 +1,28 @@
+use aide::{
+    axum::{routing::post_with, ApiRouter},
+    transform::TransformPathItem,
+};
+use axum::extract::State;
+use svix_server_derive::aide_annotate;
+
+use crate::{core::permissions, error::Result, v1::utils::NoContent, AppState};
+
+/// Redrive DLQ
+#[aide_annotate(op_id = "v1.admin.redrive-dlq")]
+pub async fn redrive_dlq(
+    State(AppState { queue_tx, .. }): State<AppState>,
+    _: permissions::Organization,
+) -> Result<NoContent> {
+    if let Err(e) = queue_tx.redrive_dlq().await {
+        tracing::warn!(error = ?e, "DLQ redrive failed");
+    }
+    Ok(NoContent)
+}
+
+pub fn router() -> ApiRouter<AppState> {
+    ApiRouter::new().api_route_with(
+        "/admin/redrive-dlq",
+        post_with(redrive_dlq, redrive_dlq_operation),
+        move |op: TransformPathItem<'_>| op.tag("Admin".as_ref()).hidden(true),
+    )
+}
diff --git a/server/svix-server/src/v1/endpoints/attempt.rs b/server/svix-server/src/v1/endpoints/attempt.rs
index 7c4f66776..6dd6f1cff 100644
--- a/server/svix-server/src/v1/endpoints/attempt.rs
+++ b/server/svix-server/src/v1/endpoints/attempt.rs
@@ -845,7 +845,7 @@ async fn resend_webhook(
 
     queue_tx
         .send(
-            MessageTask::new_task(
+            &MessageTask::new_task(
                 msg.id.clone(),
                 app.id,
                 endp.id,
diff --git a/server/svix-server/src/v1/endpoints/endpoint/recovery.rs b/server/svix-server/src/v1/endpoints/endpoint/recovery.rs
index 3d0f6b81d..1d51f26b1 100644
--- a/server/svix-server/src/v1/endpoints/endpoint/recovery.rs
+++ b/server/svix-server/src/v1/endpoints/endpoint/recovery.rs
@@ -53,7 +53,7 @@ async fn bulk_recover_failed_messages(
     for msg_dest in items {
         queue_tx
             .send(
-                MessageTask::new_task(
+                &MessageTask::new_task(
                     msg_dest.msg_id,
                     app.id.clone(),
                     msg_dest.endp_id,
diff --git a/server/svix-server/src/v1/endpoints/health.rs b/server/svix-server/src/v1/endpoints/health.rs
index da8e59d57..211d75da0 100644
--- a/server/svix-server/src/v1/endpoints/health.rs
+++ b/server/svix-server/src/v1/endpoints/health.rs
@@ -100,7 +100,7 @@ async fn health(
         .into();
 
     // Send a [`HealthCheck`] through the queue
-    let queue: HealthStatus = queue_tx.send(QueueTask::HealthCheck, None).await.into();
+    let queue: HealthStatus = queue_tx.send(&QueueTask::HealthCheck, None).await.into();
 
     // Set a cache value with an expiration to ensure it works
     let cache: HealthStatus = cache
diff --git a/server/svix-server/src/v1/endpoints/message.rs b/server/svix-server/src/v1/endpoints/message.rs
index 0e7e2b178..67375bd90 100644
--- a/server/svix-server/src/v1/endpoints/message.rs
+++ b/server/svix-server/src/v1/endpoints/message.rs
@@ -376,7 +376,7 @@ pub(crate) async fn create_message_inner(
     {
         queue_tx
             .send(
-                MessageTaskBatch::new_task(
+                &MessageTaskBatch::new_task(
                     msg.id.clone(),
                     app.id.clone(),
                     force_endpoint,
diff --git a/server/svix-server/src/v1/endpoints/mod.rs b/server/svix-server/src/v1/endpoints/mod.rs
index 558abc23d..001aa3115 100644
--- a/server/svix-server/src/v1/endpoints/mod.rs
+++ b/server/svix-server/src/v1/endpoints/mod.rs
@@ -1,6 +1,7 @@
 // SPDX-FileCopyrightText: © 2022 Svix Authors
 // SPDX-License-Identifier: MIT
 
+pub mod admin;
 pub mod application;
 pub mod attempt;
 pub mod auth;
diff --git a/server/svix-server/src/v1/mod.rs b/server/svix-server/src/v1/mod.rs
index 2cbd9ff46..3cc1be9d8 100644
--- a/server/svix-server/src/v1/mod.rs
+++ b/server/svix-server/src/v1/mod.rs
@@ -21,6 +21,7 @@ pub fn router() -> ApiRouter<AppState> {
         .merge(endpoints::event_type::router())
         .merge(endpoints::message::router())
         .merge(endpoints::attempt::router())
+        .merge(endpoints::admin::router())
         .layer(
             TraceLayer::new_for_http()
                 .make_span_with(AxumOtelSpanCreator)
diff --git a/server/svix-server/src/worker.rs b/server/svix-server/src/worker.rs
index a521a2306..18e9016db 100644
--- a/server/svix-server/src/worker.rs
+++ b/server/svix-server/src/worker.rs
@@ -553,7 +553,7 @@ async fn handle_failed_dispatch(
     }
     queue_tx
         .send(
-            QueueTask::MessageV1(MessageTask {
+            &QueueTask::MessageV1(MessageTask {
                 attempt_count: msg_task.attempt_count + 1,
                 ..msg_task.clone()
             }),
@@ -918,6 +918,8 @@ pub async fn queue_handler(
     mut queue_rx: TaskQueueConsumer,
     op_webhook_sender: OperationalWebhookSender,
 ) -> Result<()> {
+    let recv_deadline = Duration::from_secs(cfg.queue_max_poll_secs.into());
+
     static NUM_WORKERS: AtomicUsize = AtomicUsize::new(0);
     let task_limit = cfg.worker_max_tasks;
@@ -980,7 +982,7 @@ pub async fn queue_handler(
             break;
         }
 
-        match queue_rx.receive_all().await {
+        match queue_rx.receive_all(recv_deadline).await {
             Ok(batch) => {
                 for delivery in batch {
                     let cfg = cfg.clone();
diff --git a/server/svix-server/tests/it/redis_queue.rs b/server/svix-server/tests/it/redis_queue.rs
index 0fe191dc6..cdc4a81f1 100644
--- a/server/svix-server/tests/it/redis_queue.rs
+++ b/server/svix-server/tests/it/redis_queue.rs
@@ -6,17 +6,27 @@
 
 use std::{str::FromStr, time::Duration};
 
+use http::StatusCode;
 use redis::AsyncCommands as _;
+use svix_ksuid::KsuidLike;
 use svix_server::{
     cfg::Configuration,
-    core::types::{ApplicationId, EndpointId, MessageAttemptTriggerType, MessageId},
+    core::types::{
+        ApplicationId, BaseId, EndpointId, MessageAttemptTriggerType, MessageId, OrganizationId,
+    },
     queue::{
         new_pair, MessageTask, QueueTask, TaskQueueConsumer, TaskQueueDelivery, TaskQueueProducer,
     },
     redis::RedisManager,
+    v1::endpoints::message::MessageOut,
 };
 use tokio::time::timeout;
 
+use crate::utils::{
+    common_calls::{create_test_app, create_test_endpoint, message_in},
+    get_default_test_config, start_svix_server_with_cfg_and_org_id_and_prefix,
+};
+
 // TODO: Don't copy this from the Redis queue test directly, place the fn somewhere both can access
 async fn get_pool(cfg: &Configuration) -> RedisManager {
     RedisManager::from_queue_backend(&cfg.queue_backend(), cfg.redis_pool_max_size).await
@@ -59,7 +69,7 @@ async fn test_many_queue_consumers_inner(prefix: &str, delay: Option<Duration>)
     for (index, (p, _c)) in producers_and_consumers.iter().enumerate() {
         for num in 0..200 {
             p.send(
-                QueueTask::MessageV1(MessageTask {
+                &QueueTask::MessageV1(MessageTask {
                     msg_id: MessageId(format!("{}", index * 200 + num)),
                     app_id: ApplicationId("TestApplicationId".to_owned()),
                     endpoint_id: EndpointId("TestEndpointId".to_owned()),
@@ -88,7 +98,12 @@ async fn test_many_queue_consumers_inner(prefix: &str, delay: Option<Duration>)
         let mut out = Vec::new();
         let mut read = 0;
 
-        while let Ok(recv) = timeout(Duration::from_secs(1), c.receive_all()).await {
+        while let Ok(recv) = timeout(
+            Duration::from_secs(1),
+            c.receive_all(Duration::from_secs(5)),
+        )
+        .await
+        {
             let recv = recv.unwrap();
             read += recv.len();
             for r in recv {
@@ -143,3 +158,90 @@ async fn test_many_queue_consumers_delayed() {
     )
     .await;
 }
+
+#[tokio::test]
+#[ignore]
+async fn test_redis_streams_dlq() {
+    let mut cfg = get_default_test_config();
+    cfg.worker_enabled = false;
+    cfg.redis_pending_duration_secs = 1;
+
+    let cfg = std::sync::Arc::new(cfg);
+    let prefix = svix_ksuid::Ksuid::new(None, None).to_string();
+
+    let pool = get_pool(&cfg).await;
+    let mut conn = pool.get().await.unwrap();
+
+    let _: () = conn
+        .del(format!("{prefix}{{queue}}_svix_v3_main"))
+        .await
+        .unwrap();
+
+    let _: () = conn
+        .del(format!("{prefix}{{queue}}_svix_dlq"))
+        .await
+        .unwrap();
+
+    let (client, _jh) = start_svix_server_with_cfg_and_org_id_and_prefix(
+        &cfg,
+        OrganizationId::new(None, None),
+        prefix.clone(),
+    )
+    .await;
+
+    let app_id = create_test_app(&client, "v1MessageCRTestApp")
+        .await
+        .unwrap()
+        .id;
+
+    let _endp_id = create_test_endpoint(&client, &app_id, "http://localhost:2/bad/url/")
+        .await
+        .unwrap()
+        .id;
+
+    let _message_1: MessageOut = client
+        .post(
+            &format!("api/v1/app/{app_id}/msg/"),
+            message_in(&app_id, serde_json::json!({"test": "value"})).unwrap(),
+            StatusCode::ACCEPTED,
+        )
+        .await
+        .unwrap();
+
+    let (_p, mut c) = new_pair(&cfg, Some(&prefix)).await;
+
+    let wait_time = std::time::Duration::from_millis(1_500);
+    for _ in 0..3 {
+        let res = c.receive_all(wait_time).await.unwrap();
+        assert!(!res.is_empty());
+        for j in res {
+            j.nack().await.unwrap();
+        }
+    }
+
+    // After `max_receives` nacks the message is moved to the DLQ,
+    // so the main queue now reads as empty.
+    let res = c.receive_all(wait_time).await.unwrap();
+    assert!(res.is_empty());
+
+    tokio::time::sleep(wait_time).await;
+
+    // Redrive
+    client
+        .post_without_response(
+            "/api/v1/admin/redrive-dlq",
+            serde_json::Value::Null,
+            StatusCode::NO_CONTENT,
+        )
+        .await
+        .unwrap();
+
+    for _ in 0..3 {
+        let res = c.receive_all(wait_time).await.unwrap();
+        assert!(!res.is_empty());
+        for j in res {
+            j.nack().await.unwrap();
+        }
+    }
+
+    let res = c.receive_all(wait_time).await.unwrap();
+    assert!(res.is_empty());
+}
diff --git a/server/svix-server/tests/it/utils/mod.rs b/server/svix-server/tests/it/utils/mod.rs
index 5ba32f0b3..93620bbb9 100644
--- a/server/svix-server/tests/it/utils/mod.rs
+++ b/server/svix-server/tests/it/utils/mod.rs
@@ -330,6 +330,15 @@ pub async fn start_svix_server_with_cfg(
 pub async fn start_svix_server_with_cfg_and_org_id(
     cfg: &ConfigurationInner,
     org_id: OrganizationId,
+) -> (TestClient, tokio::task::JoinHandle<()>) {
+    let prefix = svix_ksuid::Ksuid::new(None, None).to_string();
+    start_svix_server_with_cfg_and_org_id_and_prefix(cfg, org_id, prefix).await
+}
+
+pub async fn start_svix_server_with_cfg_and_org_id_and_prefix(
+    cfg: &ConfigurationInner,
+    org_id: OrganizationId,
+    prefix: String,
 ) -> (TestClient, tokio::task::JoinHandle<()>) {
     let (tracing_subscriber, _guard) = setup_tracing(cfg, /* for_test = */ true);
 
@@ -340,12 +349,8 @@ pub async fn start_svix_server_with_cfg_and_org_id(
     let base_uri = format!("http://{}", listener.local_addr().unwrap());
 
     let jh = tokio::spawn(
-        svix_server::run_with_prefix(
-            Some(svix_ksuid::Ksuid::new(None, None).to_string()),
-            cfg,
-            Some(listener),
-        )
-        .with_subscriber(tracing_subscriber),
+        svix_server::run_with_prefix(Some(prefix), cfg, Some(listener))
+            .with_subscriber(tracing_subscriber),
     );
 
     (TestClient::new(base_uri, &token), jh)