Skip to content

Commit

Permalink
Rename PooledConnection to RedisConnection
Browse files Browse the repository at this point in the history
Not all of our connection variants are pooled now, so rename
to make it less confusing.
  • Loading branch information
jaymell authored and svix-james committed Oct 2, 2024
1 parent 0f446bd commit e794bdf
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 32 deletions.
12 changes: 6 additions & 6 deletions server/svix-server/src/queue/redis.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ use super::{QueueTask, TaskQueueConsumer, TaskQueueProducer};
use crate::{
cfg::{Configuration, QueueType},
error::Result,
redis::{PooledConnection, RedisManager},
redis::{RedisConnection, RedisManager},
};

/// This is the key of the main queue. As a KV store, redis places the entire stream under this key.
Expand Down Expand Up @@ -240,15 +240,15 @@ fn task_from_redis_key(key: &str) -> serde_json::Result<Arc<QueueTask>> {
serde_json::from_str(&key[pos + 1..])
}

async fn migrate_v2_to_v3_queues(conn: &mut PooledConnection<'_>) -> Result<()> {
async fn migrate_v2_to_v3_queues(conn: &mut RedisConnection<'_>) -> Result<()> {
migrate_list_to_stream(conn, LEGACY_V2_MAIN, MAIN).await?;
migrate_list_to_stream(conn, LEGACY_V2_PROCESSING, MAIN).await?;

Ok(())
}

async fn migrate_list_to_stream(
conn: &mut PooledConnection<'_>,
conn: &mut RedisConnection<'_>,
legacy_queue: &str,
queue: &str,
) -> Result<()> {
Expand Down Expand Up @@ -286,7 +286,7 @@ async fn migrate_list_to_stream(
}
}

async fn migrate_v1_to_v2_queues(conn: &mut PooledConnection<'_>) -> Result<()> {
async fn migrate_v1_to_v2_queues(conn: &mut RedisConnection<'_>) -> Result<()> {
migrate_list(conn, LEGACY_V1_MAIN, LEGACY_V2_MAIN).await?;
migrate_list(conn, LEGACY_V1_PROCESSING, LEGACY_V2_PROCESSING).await?;
migrate_sset(conn, LEGACY_V1_DELAYED, DELAYED).await?;
Expand All @@ -295,7 +295,7 @@ async fn migrate_v1_to_v2_queues(conn: &mut PooledConnection<'_>) -> Result<()>
}

async fn migrate_list(
conn: &mut PooledConnection<'_>,
conn: &mut RedisConnection<'_>,
legacy_queue: &str,
queue: &str,
) -> Result<()> {
Expand All @@ -318,7 +318,7 @@ async fn migrate_list(
}

async fn migrate_sset(
conn: &mut PooledConnection<'_>,
conn: &mut RedisConnection<'_>,
legacy_queue: &str,
queue: &str,
) -> Result<()> {
Expand Down
50 changes: 24 additions & 26 deletions server/svix-server/src/redis/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ impl RedisManager {
}
}

pub async fn get(&self) -> Result<PooledConnection<'_>, RunError<RedisError>> {
pub async fn get(&self) -> Result<RedisConnection<'_>, RunError<RedisError>> {
match self {
Self::Clustered(pool) => pool.get().await,
Self::NonClustered(pool) => pool.get().await,
Expand All @@ -103,11 +103,11 @@ pub struct ClusteredRedisPool {
}

impl ClusteredRedisPool {
pub async fn get(&self) -> Result<PooledConnection<'_>, RunError<RedisError>> {
pub async fn get(&self) -> Result<RedisConnection<'_>, RunError<RedisError>> {
let con = ClusteredPooledConnection {
con: self.pool.get().await?,
};
Ok(PooledConnection::Clustered(con))
Ok(RedisConnection::Clustered(con))
}
}

Expand All @@ -117,8 +117,8 @@ pub struct ClusteredRedisUnpooled {
}

impl ClusteredRedisUnpooled {
pub async fn get(&self) -> Result<PooledConnection<'_>, RunError<RedisError>> {
Ok(PooledConnection::ClusteredUnpooled(
pub async fn get(&self) -> Result<RedisConnection<'_>, RunError<RedisError>> {
Ok(RedisConnection::ClusteredUnpooled(
ClusteredUnpooledConnection {
con: self.con.clone(),
},
Expand All @@ -138,8 +138,8 @@ pub struct NonClusteredRedisUnpooled {
}

impl NonClusteredRedisUnpooled {
pub async fn get(&self) -> Result<PooledConnection<'_>, RunError<RedisError>> {
Ok(PooledConnection::NonClusteredUnpooled(
pub async fn get(&self) -> Result<RedisConnection<'_>, RunError<RedisError>> {
Ok(RedisConnection::NonClusteredUnpooled(
NonClusteredUnpooledConnection {
con: self.con.clone(),
},
Expand All @@ -159,21 +159,21 @@ pub struct NonClusteredRedisPool {
}

impl NonClusteredRedisPool {
pub async fn get(&self) -> Result<PooledConnection<'_>, RunError<RedisError>> {
pub async fn get(&self) -> Result<RedisConnection<'_>, RunError<RedisError>> {
let con = self.pool.get().await?;
let con = NonClusteredPooledConnection { con };
Ok(PooledConnection::NonClustered(con))
Ok(RedisConnection::NonClustered(con))
}
}

pub enum PooledConnection<'a> {
pub enum RedisConnection<'a> {
Clustered(ClusteredPooledConnection<'a>),
ClusteredUnpooled(ClusteredUnpooledConnection),
NonClustered(NonClusteredPooledConnection<'a>),
NonClusteredUnpooled(NonClusteredUnpooledConnection),
}

impl PooledConnection<'_> {
impl RedisConnection<'_> {
pub async fn query_async<T: FromRedisValue>(&mut self, cmd: redis::Cmd) -> RedisResult<T> {
cmd.query_async(self).await
}
Expand All @@ -186,16 +186,16 @@ impl PooledConnection<'_> {
}
}

impl redis::aio::ConnectionLike for PooledConnection<'_> {
impl redis::aio::ConnectionLike for RedisConnection<'_> {
fn req_packed_command<'a>(
&'a mut self,
cmd: &'a redis::Cmd,
) -> redis::RedisFuture<'a, redis::Value> {
match self {
PooledConnection::Clustered(conn) => conn.con.req_packed_command(cmd),
PooledConnection::NonClustered(conn) => conn.con.req_packed_command(cmd),
PooledConnection::ClusteredUnpooled(conn) => conn.con.req_packed_command(cmd),
PooledConnection::NonClusteredUnpooled(conn) => conn.con.req_packed_command(cmd),
RedisConnection::Clustered(conn) => conn.con.req_packed_command(cmd),
RedisConnection::NonClustered(conn) => conn.con.req_packed_command(cmd),
RedisConnection::ClusteredUnpooled(conn) => conn.con.req_packed_command(cmd),
RedisConnection::NonClusteredUnpooled(conn) => conn.con.req_packed_command(cmd),
}
}

Expand All @@ -206,25 +206,23 @@ impl redis::aio::ConnectionLike for PooledConnection<'_> {
count: usize,
) -> redis::RedisFuture<'a, Vec<redis::Value>> {
match self {
PooledConnection::Clustered(conn) => conn.con.req_packed_commands(cmd, offset, count),
PooledConnection::NonClustered(conn) => {
RedisConnection::Clustered(conn) => conn.con.req_packed_commands(cmd, offset, count),
RedisConnection::NonClustered(conn) => conn.con.req_packed_commands(cmd, offset, count),
RedisConnection::ClusteredUnpooled(conn) => {
conn.con.req_packed_commands(cmd, offset, count)
}
PooledConnection::ClusteredUnpooled(conn) => {
conn.con.req_packed_commands(cmd, offset, count)
}
PooledConnection::NonClusteredUnpooled(conn) => {
RedisConnection::NonClusteredUnpooled(conn) => {
conn.con.req_packed_commands(cmd, offset, count)
}
}
}

fn get_db(&self) -> i64 {
match self {
PooledConnection::Clustered(conn) => conn.con.get_db(),
PooledConnection::NonClustered(conn) => conn.con.get_db(),
PooledConnection::ClusteredUnpooled(conn) => conn.con.get_db(),
PooledConnection::NonClusteredUnpooled(conn) => conn.con.get_db(),
RedisConnection::Clustered(conn) => conn.con.get_db(),
RedisConnection::NonClustered(conn) => conn.con.get_db(),
RedisConnection::ClusteredUnpooled(conn) => conn.con.get_db(),
RedisConnection::NonClusteredUnpooled(conn) => conn.con.get_db(),
}
}
}
Expand Down

0 comments on commit e794bdf

Please sign in to comment.