From 0d3b617be1f8059e8604cd828e428210861b3e8f Mon Sep 17 00:00:00 2001 From: Jonas Platte Date: Wed, 28 Feb 2024 15:12:53 +0100 Subject: [PATCH] try another thing --- server/svix-server/src/queue/mod.rs | 9 +-- server/svix-server/src/queue/redis.rs | 95 ++++++++++++++------------- 2 files changed, 50 insertions(+), 54 deletions(-) diff --git a/server/svix-server/src/queue/mod.rs b/server/svix-server/src/queue/mod.rs index eae43f448..43da395a0 100644 --- a/server/svix-server/src/queue/mod.rs +++ b/server/svix-server/src/queue/mod.rs @@ -38,13 +38,8 @@ pub async fn new_pair( prefix: Option<&str>, ) -> (TaskQueueProducer, TaskQueueConsumer) { match cfg.queue_backend() { - QueueBackend::Redis(dsn) => { - let pool = crate::redis::new_redis_pool(dsn, cfg).await; - redis::new_pair(pool, prefix).await - } - QueueBackend::RedisCluster(dsn) => { - let pool = crate::redis::new_redis_pool_clustered(dsn, cfg).await; - redis::new_pair(pool, prefix).await + QueueBackend::Redis(_) | QueueBackend::RedisCluster(_) => { + redis::new_pair(cfg, prefix).await } QueueBackend::Memory => { let (producer, consumer) = InMemoryBackend::builder() diff --git a/server/svix-server/src/queue/redis.rs b/server/svix-server/src/queue/redis.rs index 24114e208..cfce9766e 100644 --- a/server/svix-server/src/queue/redis.rs +++ b/server/svix-server/src/queue/redis.rs @@ -39,6 +39,7 @@ use redis::{ use tokio::time::sleep; use crate::{ + cfg::{Configuration, QueueType}, error::{Error, Result}, queue::Acker, redis::RedisPool, @@ -94,11 +95,11 @@ impl SendConnectionLike for T {} /// Generates a [`TaskQueueProducer`] and a [`TaskQueueConsumer`] backed by Redis. pub async fn new_pair( - pool: RedisPool, + cfg: &Configuration, prefix: Option<&str>, ) -> (TaskQueueProducer, TaskQueueConsumer) { new_pair_inner( - pool, + cfg, Duration::from_secs(45), prefix.unwrap_or_default(), MAIN, @@ -278,16 +279,25 @@ async fn run_migration_schedule(delays: &[Duration], conn: &mut impl SendConnect /// An inner function allowing key constants to be variable for testing purposes async fn new_pair_inner( - pool: RedisPool, + cfg: &Configuration, pending_duration: Duration, queue_prefix: &str, main_queue_name: &'static str, delayed_queue_name: &'static str, - delayed_lock: &'static str, + delayed_lock_name: &'static str, ) -> (TaskQueueProducer, TaskQueueConsumer) { let main_queue_name = format!("{queue_prefix}{main_queue_name}"); let delayed_queue_name = format!("{queue_prefix}{delayed_queue_name}"); - let delayed_lock = format!("{queue_prefix}{delayed_lock}"); + let delayed_lock = format!("{queue_prefix}{delayed_lock_name}"); + + // This fn is only called from + // - `queue::new_pair` if the queue type is redis and a DSN is set + // - redis tests that only makes sense to run with the DSN set + let dsn = cfg.redis_dsn.as_deref().unwrap(); + let pool = match &cfg.queue_type { + QueueType::RedisCluster => crate::redis::new_redis_pool_clustered(dsn, cfg).await, + _ => crate::redis::new_redis_pool(dsn, cfg).await, + }; // Create the stream and consumer group for the MAIN queue should it not already exist. The // consumer is created automatically upon use so it does not have to be created here. @@ -330,36 +340,33 @@ async fn new_pair_inner( let dqn = delayed_queue_name.clone(); // Migrate v1 queues to v2 and v2 queues to v3 on a loop with exponential backoff. - tokio::spawn({ - let pool = pool.clone(); + let pool_ = pool.clone(); + tokio::spawn(async move { + let mut conn = pool_ + .get() + .await + .expect("Error retrieving connection from Redis pool"); - async move { - let mut conn = pool - .get() - .await - .expect("Error retrieving connection from Redis pool"); - - let delays = [ - // 11.25 min - Duration::from_secs(60 * 11 + 15), - // 22.5 min - Duration::from_secs(60 * 22 + 30), - // 45 min - Duration::from_secs(60 * 45), - // 1.5 hours - Duration::from_secs(60 * 30 * 3), - // 3 hours - Duration::from_secs(60 * 60 * 3), - // 6 hours - Duration::from_secs(60 * 60 * 6), - // 12 hours - Duration::from_secs(60 * 60 * 12), - // 24 hours - Duration::from_secs(60 * 60 * 24), - ]; - - run_migration_schedule(&delays, &mut conn).await; - } + let delays = [ + // 11.25 min + Duration::from_secs(60 * 11 + 15), + // 22.5 min + Duration::from_secs(60 * 22 + 30), + // 45 min + Duration::from_secs(60 * 45), + // 1.5 hours + Duration::from_secs(60 * 30 * 3), + // 3 hours + Duration::from_secs(60 * 60 * 3), + // 6 hours + Duration::from_secs(60 * 60 * 6), + // 12 hours + Duration::from_secs(60 * 60 * 12), + // 24 hours + Duration::from_secs(60 * 60 * 24), + ]; + + run_migration_schedule(&delays, &mut conn).await; }); tokio::spawn({ @@ -703,13 +710,9 @@ pub mod tests { pub async fn get_pool(cfg: &Configuration) -> RedisPool { match cfg.cache_type { CacheType::RedisCluster => { - crate::redis::new_redis_pool_clustered( - cfg.redis_dsn.as_ref().unwrap().as_str(), - cfg, - ) - .await + crate::redis::new_redis_pool_clustered(cfg.redis_dsn.as_deref().unwrap(), cfg).await } - _ => crate::redis::new_redis_pool(cfg.redis_dsn.as_ref().unwrap().as_str(), cfg).await, + _ => crate::redis::new_redis_pool(cfg.redis_dsn.as_deref().unwrap(), cfg).await, } } @@ -800,7 +803,7 @@ pub mod tests { let pool = get_pool(&cfg).await; let (p, mut c) = new_pair_inner( - pool.clone(), + &cfg, Duration::from_millis(100), "", "{test}_idle_period", @@ -879,7 +882,7 @@ pub mod tests { .unwrap(); let (p, mut c) = new_pair_inner( - pool.clone(), + &cfg, Duration::from_millis(5000), "", "{test}_ack", @@ -921,10 +924,9 @@ pub mod tests { #[tokio::test] async fn test_nack() { let cfg = crate::cfg::load().unwrap(); - let pool = get_pool(&cfg).await; let (p, mut c) = new_pair_inner( - pool, + &cfg, Duration::from_millis(500), "", "{test}_nack", @@ -964,10 +966,9 @@ pub mod tests { #[tokio::test] async fn test_delay() { let cfg = crate::cfg::load().unwrap(); - let pool = get_pool(&cfg).await; let (p, mut c) = new_pair_inner( - pool, + &cfg, Duration::from_millis(500), "", "{test}_delay", @@ -1123,7 +1124,7 @@ pub mod tests { // Read let (_p, mut c) = new_pair_inner( - pool, + &cfg, Duration::from_secs(5), "", v3_main,