Skip to content

Commit

Permalink
server: Generalize migration methods
Browse files Browse the repository at this point in the history
… to work with non-pooled connections as well.
  • Loading branch information
svix-jplatte committed Feb 28, 2024
1 parent 54e8e36 commit a69008d
Showing 1 changed file with 20 additions and 17 deletions.
37 changes: 20 additions & 17 deletions server/svix-server/src/queue/redis.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ use tokio::time::sleep;
use crate::{
error::{Error, Result},
queue::Acker,
redis::{PooledConnection, RedisPool},
redis::RedisPool,
};

use super::{
Expand Down Expand Up @@ -595,21 +595,24 @@ impl TaskQueueReceive for RedisQueueConsumer {
}
}

async fn migrate_v2_to_v3_queues(pool: &mut PooledConnection<'_>) -> Result<()> {
migrate_list_to_stream(pool, LEGACY_V2_MAIN, MAIN).await?;
migrate_list_to_stream(pool, LEGACY_V2_PROCESSING, MAIN).await?;
trait SendConnectionLike: redis::aio::ConnectionLike + Send {}
impl<T: redis::aio::ConnectionLike + Send + ?Sized> SendConnectionLike for T {}

async fn migrate_v2_to_v3_queues(conn: &mut impl SendConnectionLike) -> Result<()> {
migrate_list_to_stream(conn, LEGACY_V2_MAIN, MAIN).await?;
migrate_list_to_stream(conn, LEGACY_V2_PROCESSING, MAIN).await?;

Ok(())
}

async fn migrate_list_to_stream(
pool: &mut PooledConnection<'_>,
conn: &mut impl SendConnectionLike,
legacy_queue: &str,
queue: &str,
) -> Result<()> {
let batch_size = 1000;
loop {
let legacy_keys: Vec<String> = pool
let legacy_keys: Vec<String> = conn
.lpop(legacy_queue, NonZeroUsize::new(batch_size))
.await?;
if legacy_keys.is_empty() {
Expand All @@ -631,27 +634,27 @@ async fn migrate_list_to_stream(
);
}

let _: () = pool.query_async_pipeline(pipe).await?;
let _: () = pipe.query_async(conn).await?;
}
}

async fn migrate_v1_to_v2_queues(pool: &mut PooledConnection<'_>) -> Result<()> {
migrate_list(pool, LEGACY_V1_MAIN, LEGACY_V2_MAIN).await?;
migrate_list(pool, LEGACY_V1_PROCESSING, LEGACY_V2_PROCESSING).await?;
migrate_sset(pool, LEGACY_V1_DELAYED, DELAYED).await?;
async fn migrate_v1_to_v2_queues(conn: &mut impl SendConnectionLike) -> Result<()> {
migrate_list(conn, LEGACY_V1_MAIN, LEGACY_V2_MAIN).await?;
migrate_list(conn, LEGACY_V1_PROCESSING, LEGACY_V2_PROCESSING).await?;
migrate_sset(conn, LEGACY_V1_DELAYED, DELAYED).await?;

Ok(())
}

async fn migrate_list(
pool: &mut PooledConnection<'_>,
conn: &mut impl SendConnectionLike,
legacy_queue: &str,
queue: &str,
) -> Result<()> {
let batch_size = 1000;
loop {
// Checking for old messages from queue
let legacy_keys: Vec<String> = pool
let legacy_keys: Vec<String> = conn
.lpop(legacy_queue, NonZeroUsize::new(batch_size))
.await?;
if legacy_keys.is_empty() {
Expand All @@ -662,19 +665,19 @@ async fn migrate_list(
legacy_keys.len(),
legacy_queue
);
let _: () = pool.rpush(queue, legacy_keys).await?;
let _: () = conn.rpush(queue, legacy_keys).await?;
}
}

async fn migrate_sset(
pool: &mut PooledConnection<'_>,
conn: &mut impl SendConnectionLike,
legacy_queue: &str,
queue: &str,
) -> Result<()> {
let batch_size = 1000;
loop {
// Checking for old messages from LEGACY_DELAYED
let legacy_keys: Vec<(String, f64)> = pool.zpopmin(legacy_queue, batch_size).await?;
let legacy_keys: Vec<(String, f64)> = conn.zpopmin(legacy_queue, batch_size).await?;

if legacy_keys.is_empty() {
break Ok(());
Expand All @@ -687,7 +690,7 @@ async fn migrate_sset(
let legacy_keys: Vec<(f64, String)> =
legacy_keys.into_iter().map(|(x, y)| (y, x)).collect();

let _: () = pool.zadd_multiple(queue, &legacy_keys).await?;
let _: () = conn.zadd_multiple(queue, &legacy_keys).await?;
}
}

Expand Down

0 comments on commit a69008d

Please sign in to comment.