Skip to content

Commit

Permalink
try another thing
Browse files — browse the repository at this point in the history
  • Loading branch information
svix-jplatte committed Feb 28, 2024
1 parent 9e48ab3 commit 0d3b617
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 54 deletions.
9 changes: 2 additions & 7 deletions server/svix-server/src/queue/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,13 +38,8 @@ pub async fn new_pair(
prefix: Option<&str>,
) -> (TaskQueueProducer, TaskQueueConsumer) {
match cfg.queue_backend() {
QueueBackend::Redis(dsn) => {
let pool = crate::redis::new_redis_pool(dsn, cfg).await;
redis::new_pair(pool, prefix).await
}
QueueBackend::RedisCluster(dsn) => {
let pool = crate::redis::new_redis_pool_clustered(dsn, cfg).await;
redis::new_pair(pool, prefix).await
QueueBackend::Redis(_) | QueueBackend::RedisCluster(_) => {
redis::new_pair(cfg, prefix).await
}
QueueBackend::Memory => {
let (producer, consumer) = InMemoryBackend::builder()
Expand Down
95 changes: 48 additions & 47 deletions server/svix-server/src/queue/redis.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ use redis::{
use tokio::time::sleep;

use crate::{
cfg::{Configuration, QueueType},
error::{Error, Result},
queue::Acker,
redis::RedisPool,
Expand Down Expand Up @@ -94,11 +95,11 @@ impl<T: redis::aio::ConnectionLike + Send + ?Sized> SendConnectionLike for T {}

/// Generates a [`TaskQueueProducer`] and a [`TaskQueueConsumer`] backed by Redis.
pub async fn new_pair(
pool: RedisPool,
cfg: &Configuration,
prefix: Option<&str>,
) -> (TaskQueueProducer, TaskQueueConsumer) {
new_pair_inner(
pool,
cfg,
Duration::from_secs(45),
prefix.unwrap_or_default(),
MAIN,
Expand Down Expand Up @@ -278,16 +279,25 @@ async fn run_migration_schedule(delays: &[Duration], conn: &mut impl SendConnect

/// An inner function allowing key constants to be variable for testing purposes
async fn new_pair_inner(
pool: RedisPool,
cfg: &Configuration,
pending_duration: Duration,
queue_prefix: &str,
main_queue_name: &'static str,
delayed_queue_name: &'static str,
delayed_lock: &'static str,
delayed_lock_name: &'static str,
) -> (TaskQueueProducer, TaskQueueConsumer) {
let main_queue_name = format!("{queue_prefix}{main_queue_name}");
let delayed_queue_name = format!("{queue_prefix}{delayed_queue_name}");
let delayed_lock = format!("{queue_prefix}{delayed_lock}");
let delayed_lock = format!("{queue_prefix}{delayed_lock_name}");

// This fn is only called from
// - `queue::new_pair` if the queue type is redis and a DSN is set
// - redis tests that only makes sense to run with the DSN set
let dsn = cfg.redis_dsn.as_deref().unwrap();
let pool = match &cfg.queue_type {
QueueType::RedisCluster => crate::redis::new_redis_pool_clustered(dsn, cfg).await,
_ => crate::redis::new_redis_pool(dsn, cfg).await,
};

// Create the stream and consumer group for the MAIN queue should it not already exist. The
// consumer is created automatically upon use so it does not have to be created here.
Expand Down Expand Up @@ -330,36 +340,33 @@ async fn new_pair_inner(
let dqn = delayed_queue_name.clone();

// Migrate v1 queues to v2 and v2 queues to v3 on a loop with exponential backoff.
tokio::spawn({
let pool = pool.clone();
let pool_ = pool.clone();
tokio::spawn(async move {
let mut conn = pool_
.get()
.await
.expect("Error retrieving connection from Redis pool");

async move {
let mut conn = pool
.get()
.await
.expect("Error retrieving connection from Redis pool");

let delays = [
// 11.25 min
Duration::from_secs(60 * 11 + 15),
// 22.5 min
Duration::from_secs(60 * 22 + 30),
// 45 min
Duration::from_secs(60 * 45),
// 1.5 hours
Duration::from_secs(60 * 30 * 3),
// 3 hours
Duration::from_secs(60 * 60 * 3),
// 6 hours
Duration::from_secs(60 * 60 * 6),
// 12 hours
Duration::from_secs(60 * 60 * 12),
// 24 hours
Duration::from_secs(60 * 60 * 24),
];

run_migration_schedule(&delays, &mut conn).await;
}
let delays = [
// 11.25 min
Duration::from_secs(60 * 11 + 15),
// 22.5 min
Duration::from_secs(60 * 22 + 30),
// 45 min
Duration::from_secs(60 * 45),
// 1.5 hours
Duration::from_secs(60 * 30 * 3),
// 3 hours
Duration::from_secs(60 * 60 * 3),
// 6 hours
Duration::from_secs(60 * 60 * 6),
// 12 hours
Duration::from_secs(60 * 60 * 12),
// 24 hours
Duration::from_secs(60 * 60 * 24),
];

run_migration_schedule(&delays, &mut conn).await;
});

tokio::spawn({
Expand Down Expand Up @@ -703,13 +710,9 @@ pub mod tests {
pub async fn get_pool(cfg: &Configuration) -> RedisPool {
match cfg.cache_type {
CacheType::RedisCluster => {
crate::redis::new_redis_pool_clustered(
cfg.redis_dsn.as_ref().unwrap().as_str(),
cfg,
)
.await
crate::redis::new_redis_pool_clustered(cfg.redis_dsn.as_deref().unwrap(), cfg).await
}
_ => crate::redis::new_redis_pool(cfg.redis_dsn.as_ref().unwrap().as_str(), cfg).await,
_ => crate::redis::new_redis_pool(cfg.redis_dsn.as_deref().unwrap(), cfg).await,
}
}

Expand Down Expand Up @@ -800,7 +803,7 @@ pub mod tests {
let pool = get_pool(&cfg).await;

let (p, mut c) = new_pair_inner(
pool.clone(),
&cfg,
Duration::from_millis(100),
"",
"{test}_idle_period",
Expand Down Expand Up @@ -879,7 +882,7 @@ pub mod tests {
.unwrap();

let (p, mut c) = new_pair_inner(
pool.clone(),
&cfg,
Duration::from_millis(5000),
"",
"{test}_ack",
Expand Down Expand Up @@ -921,10 +924,9 @@ pub mod tests {
#[tokio::test]
async fn test_nack() {
let cfg = crate::cfg::load().unwrap();
let pool = get_pool(&cfg).await;

let (p, mut c) = new_pair_inner(
pool,
&cfg,
Duration::from_millis(500),
"",
"{test}_nack",
Expand Down Expand Up @@ -964,10 +966,9 @@ pub mod tests {
#[tokio::test]
async fn test_delay() {
let cfg = crate::cfg::load().unwrap();
let pool = get_pool(&cfg).await;

let (p, mut c) = new_pair_inner(
pool,
&cfg,
Duration::from_millis(500),
"",
"{test}_delay",
Expand Down Expand Up @@ -1123,7 +1124,7 @@ pub mod tests {

// Read
let (_p, mut c) = new_pair_inner(
pool,
&cfg,
Duration::from_secs(5),
"",
v3_main,
Expand Down

0 comments on commit 0d3b617

Please sign in to comment.