From d5d90f10cf78ea312150a5fc0af2f9b2a09d0925 Mon Sep 17 00:00:00 2001 From: James Lucas Date: Tue, 10 Sep 2024 16:23:08 -0500 Subject: [PATCH] Add redis sentinel support This adds support for using Redis Sentinel for queuing and caching. Sentinel potentially requires a lot of additional configuration params, so support for that that has been added here, although the `redis_dsn` field has been reused and should point to a sentinel server if using the `redissentinel` queue/cache types. --- server/Cargo.lock | 62 ++++- server/run-tests.sh | 13 ++ server/svix-server/Cargo.toml | 6 +- server/svix-server/src/cfg.rs | 93 ++++++++ server/svix-server/src/lib.rs | 4 +- server/svix-server/src/queue/mod.rs | 6 +- server/svix-server/src/queue/redis.rs | 15 +- server/svix-server/src/redis/mod.rs | 259 +++++++++++++++++---- server/svix-server/src/redis/sentinel.rs | 53 +++++ server/svix-server/tests/it/message_app.rs | 8 +- server/testing-docker-compose.yml | 36 +++ 11 files changed, 484 insertions(+), 71 deletions(-) create mode 100644 server/svix-server/src/redis/sentinel.rs diff --git a/server/Cargo.lock b/server/Cargo.lock index 02b7c8802..b7c587c03 100644 --- a/server/Cargo.lock +++ b/server/Cargo.lock @@ -639,13 +639,24 @@ dependencies = [ [[package]] name = "bb8-redis" -version = "0.15.0" +version = "0.16.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0c6910977c026bb1d0a6b523508d1a893d6d4c2ba216355e9569d8181d92ccbe" +dependencies = [ + "async-trait", + "bb8", + "redis 0.26.1", +] + +[[package]] +name = "bb8-redis" +version = "0.17.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7eb4f141b33a750b5f667c445bd8588de10b8f2b045cd2aabc040ca746fb53ae" +checksum = "1781f22daa0ae97d934fdf04a5c66646f154a164c4bdc157ec8d3c11166c05cc" dependencies = [ "async-trait", "bb8", - "redis", + "redis 0.27.2", ] [[package]] @@ -2877,18 +2888,19 @@ dependencies = [ [[package]] name = "omniqueue" version = "0.2.1" -source = "git+https://github.com/svix/omniqueue-rs?rev=5ae22000e2ea214ba707cac81657f098e5785a76#5ae22000e2ea214ba707cac81657f098e5785a76" +source = "git+https://github.com/svix/omniqueue-rs?rev=75e5a9510ad338ac3702b2e911bacf8967ac58d8#75e5a9510ad338ac3702b2e911bacf8967ac58d8" dependencies = [ "async-trait", "bb8", - "bb8-redis", + "bb8-redis 0.17.0", "bytesize", "futures-util", "lapin", - "redis", + "redis 0.27.2", "serde", "serde_json", "svix-ksuid 0.8.0", + "sync_wrapper 1.0.1", "thiserror", "time", "tokio", @@ -3586,9 +3598,9 @@ dependencies = [ [[package]] name = "redis" -version = "0.25.4" +version = "0.26.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e0d7a6955c7511f60f3ba9e86c6d02b3c3f144f8c24b288d1f4e18074ab8bbec" +checksum = "e902a69d09078829137b4a5d9d082e0490393537badd7c91a3d69d14639e115f" dependencies = [ "arc-swap", "async-trait", @@ -3600,6 +3612,7 @@ dependencies = [ "itoa", "log", "native-tls", + "num-bigint", "percent-encoding", "pin-project-lite", "rand", @@ -3613,6 +3626,35 @@ dependencies = [ "url", ] +[[package]] +name = "redis" +version = "0.27.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a7e86f5670bd8b028edfb240f0616cad620705b31ec389d55e4f3da2c38dcd48" +dependencies = [ + "arc-swap", + "async-trait", + "bytes", + "combine", + "crc16", + "futures", + "futures-util", + "itoa", + "log", + "native-tls", + "num-bigint", + "percent-encoding", + "pin-project-lite", + "rand", + "ryu", + "sha1_smol", + "socket2 0.5.7", + "tokio", + "tokio-native-tls", + "tokio-util", + "url", +] + [[package]] name = "redox_syscall" version = "0.4.1" @@ -4856,7 +4898,7 @@ dependencies = [ "axum-server", "base64 0.13.1", "bb8", - "bb8-redis", + "bb8-redis 0.16.0", "blake2", "bytes", "chacha20poly1305", @@ -4890,7 +4932,7 @@ dependencies = [ "opentelemetry-otlp", "opentelemetry_sdk", "rand", - "redis", + "redis 0.26.1", "regex", "reqwest 0.11.27", "schemars", diff --git a/server/run-tests.sh b/server/run-tests.sh index 28f232b2c..88d2daa72 100755 --- a/server/run-tests.sh +++ b/server/run-tests.sh @@ -80,3 +80,16 @@ echo "*********** RUN 6 ***********" ${TEST_COMMAND} -- --ignored rabbitmq fi ) + +echo "*********** RUN 7 ***********" +( + export SVIX_QUEUE_TYPE="redissentinel" + export SVIX_CACHE_TYPE="redissentinel" + export SVIX_REDIS_DSN="redis://localhost:26379" + export SVIX_SENTINEL_SERVICE_NAME="master0" + + ${TEST_COMMAND} "$@" + if [[ -z "$@" ]]; then + ${TEST_COMMAND} -- --ignored redis + fi +) diff --git a/server/svix-server/Cargo.toml b/server/svix-server/Cargo.toml index f1ac467d0..cf42a8750 100644 --- a/server/svix-server/Cargo.toml +++ b/server/svix-server/Cargo.toml @@ -44,8 +44,8 @@ ed25519-compact = "2.1.1" chrono = { version="0.4.26", features = ["serde"] } reqwest = { version = "0.11.27", features = ["json", "rustls-tls", "hickory-resolver"], default-features = false } bb8 = "0.8" -bb8-redis = "0.15.0" -redis = { version = "0.25.4", features = ["tokio-comp", "tokio-native-tls-comp", "streams", "cluster-async", "tcp_nodelay", "connection-manager"] } +bb8-redis = "0.16.0" +redis = { version = "0.26", features = ["tokio-comp", "tokio-native-tls-comp", "streams", "cluster-async", "tcp_nodelay", "connection-manager", "sentinel"] } thiserror = "1.0.30" bytes = "1.1.0" blake2 = "0.10.4" @@ -68,7 +68,7 @@ urlencoding = "2.1.2" form_urlencoded = "1.1.0" lapin = "2.1.1" sentry = { version = "0.32.2", features = ["tracing"] } -omniqueue = { git = "https://github.com/svix/omniqueue-rs", rev = "5ae22000e2ea214ba707cac81657f098e5785a76", default-features = false, features = ["in_memory", "rabbitmq-with-message-ids", "redis_cluster"] } +omniqueue = { git = "https://github.com/svix/omniqueue-rs", rev = "75e5a9510ad338ac3702b2e911bacf8967ac58d8", default-features = false, features = ["in_memory", "rabbitmq-with-message-ids", "redis_cluster", "redis_sentinel"] } # Not a well-known author, and no longer gets updates => pinned. # Switch to hyper-http-proxy when upgrading hyper to 1.0. hyper-proxy = { version = "=0.9.1", default-features = false, features = ["openssl-tls"] } diff --git a/server/svix-server/src/cfg.rs b/server/svix-server/src/cfg.rs index 6c987355f..c3d63704a 100644 --- a/server/svix-server/src/cfg.rs +++ b/server/svix-server/src/cfg.rs @@ -131,11 +131,16 @@ pub struct ConfigurationInner { pub db_pool_max_size: u16, /// The DSN for redis (can be left empty if not using redis) + /// Note that if using Redis Sentinel, this will be the the DSN + /// for a Sentinel instance. pub redis_dsn: Option, /// The maximum number of connections for the Redis pool #[validate(range(min = 10))] pub redis_pool_max_size: u16, + #[serde(flatten, default)] + pub redis_sentinel_cfg: Option, + /// What kind of message queue to use. Supported: memory, redis (must have redis_dsn or /// queue_dsn configured). pub queue_type: QueueType, @@ -255,6 +260,27 @@ fn validate_config_complete(config: &ConfigurationInner) -> Result<(), Validatio }); } } + CacheType::RedisSentinel => { + if config.cache_dsn().is_none() { + return Err(ValidationError { + code: Cow::from("missing field"), + message: Some(Cow::from( + "The redis_dsn or cache_dsn field must be set if the cache_type is `redissentinel`" + )), + params: HashMap::new(), + }); + } + + if config.redis_sentinel_cfg.is_none() { + return Err(ValidationError { + code: Cow::from("missing field"), + message: Some(Cow::from( + "sentinel_service_name must be set if the cache_type is `redissentinel`", + )), + params: HashMap::new(), + }); + } + } } match config.queue_type { @@ -281,6 +307,27 @@ fn validate_config_complete(config: &ConfigurationInner) -> Result<(), Validatio }); } } + QueueType::RedisSentinel => { + if config.queue_dsn().is_none() { + return Err(ValidationError { + code: Cow::from("missing field"), + message: Some(Cow::from( + "The redis_dsn or queue_dsn field must be set if the queue_type is `redissentinel`" + )), + params: HashMap::new(), + }); + } + + if config.redis_sentinel_cfg.is_none() { + return Err(ValidationError { + code: Cow::from("missing field"), + message: Some(Cow::from( + "sentinel_service_name must be set if the queue_type is `redissentinel`", + )), + params: HashMap::new(), + }); + } + } } Ok(()) @@ -304,6 +351,10 @@ impl ConfigurationInner { QueueType::Memory => QueueBackend::Memory, QueueType::Redis => QueueBackend::Redis(self.queue_dsn().expect(err)), QueueType::RedisCluster => QueueBackend::RedisCluster(self.queue_dsn().expect(err)), + QueueType::RedisSentinel => QueueBackend::RedisSentinel( + self.queue_dsn().expect(err), + self.redis_sentinel_cfg.as_ref().expect(err), + ), QueueType::RabbitMQ => QueueBackend::RabbitMq(self.rabbit_dsn.as_ref().expect(err)), } } @@ -318,6 +369,10 @@ impl ConfigurationInner { CacheType::Memory => CacheBackend::Memory, CacheType::Redis => CacheBackend::Redis(self.cache_dsn().expect(err)), CacheType::RedisCluster => CacheBackend::RedisCluster(self.cache_dsn().expect(err)), + CacheType::RedisSentinel => CacheBackend::RedisSentinel( + self.cache_dsn().expect(err), + self.redis_sentinel_cfg.as_ref().expect(err), + ), } } } @@ -346,6 +401,7 @@ pub enum QueueBackend<'a> { Memory, Redis(&'a str), RedisCluster(&'a str), + RedisSentinel(&'a str, &'a SentinelConfig), RabbitMq(&'a str), } @@ -355,6 +411,7 @@ pub enum CacheBackend<'a> { Memory, Redis(&'a str), RedisCluster(&'a str), + RedisSentinel(&'a str, &'a SentinelConfig), } #[derive(Clone, Debug, Deserialize)] @@ -378,6 +435,7 @@ pub enum QueueType { Memory, Redis, RedisCluster, + RedisSentinel, RabbitMQ, } @@ -387,6 +445,7 @@ pub enum CacheType { Memory, Redis, RedisCluster, + RedisSentinel, None, } @@ -430,6 +489,40 @@ impl fmt::Display for LogLevel { } } +#[derive(Clone, Debug, Deserialize, Eq, PartialEq)] +pub struct SentinelConfig { + #[serde(rename = "sentinel_service_name")] + pub service_name: String, + #[serde(default)] + pub redis_tls_mode_secure: bool, + pub redis_db: Option, + pub redis_username: Option, + pub redis_password: Option, + #[serde(default)] + pub redis_use_resp3: bool, +} + +impl From for omniqueue::backends::redis::SentinelConfig { + fn from(val: SentinelConfig) -> Self { + let SentinelConfig { + service_name, + redis_tls_mode_secure, + redis_db, + redis_username, + redis_password, + redis_use_resp3, + } = val; + omniqueue::backends::redis::SentinelConfig { + service_name, + redis_tls_mode_secure, + redis_db, + redis_username, + redis_password, + redis_use_resp3, + } + } +} + pub fn load() -> Result> { if let Ok(db_url) = std::env::var("DATABASE_URL") { // If we have DATABASE_URL set, we should potentially use it. diff --git a/server/svix-server/src/lib.rs b/server/svix-server/src/lib.rs index 6a0689642..16af6dead 100644 --- a/server/svix-server/src/lib.rs +++ b/server/svix-server/src/lib.rs @@ -111,7 +111,9 @@ pub async fn run_with_prefix( let cache = match &cache_backend { CacheBackend::None => cache::none::new(), CacheBackend::Memory => cache::memory::new(), - CacheBackend::Redis(_) | CacheBackend::RedisCluster(_) => { + CacheBackend::Redis(_) + | CacheBackend::RedisCluster(_) + | CacheBackend::RedisSentinel(_, _) => { let mgr = RedisManager::from_cache_backend(&cache_backend).await; cache::redis::new(mgr) } diff --git a/server/svix-server/src/queue/mod.rs b/server/svix-server/src/queue/mod.rs index fe9cf4e4c..8f40a6639 100644 --- a/server/svix-server/src/queue/mod.rs +++ b/server/svix-server/src/queue/mod.rs @@ -33,9 +33,9 @@ pub async fn new_pair( prefix: Option<&str>, ) -> (TaskQueueProducer, TaskQueueConsumer) { match cfg.queue_backend() { - QueueBackend::Redis(_) | QueueBackend::RedisCluster(_) => { - redis::new_pair(cfg, prefix).await - } + QueueBackend::Redis(_) + | QueueBackend::RedisCluster(_) + | QueueBackend::RedisSentinel(_, _) => redis::new_pair(cfg, prefix).await, QueueBackend::Memory => { let (producer, consumer) = InMemoryBackend::builder() .build_pair() diff --git a/server/svix-server/src/queue/redis.rs b/server/svix-server/src/queue/redis.rs index d653a6b39..b5ce5a3d3 100644 --- a/server/svix-server/src/queue/redis.rs +++ b/server/svix-server/src/queue/redis.rs @@ -206,6 +206,8 @@ async fn new_pair_inner( consumer_name: WORKER_CONSUMER.to_owned(), payload_key: QUEUE_KV_KEY.to_owned(), ack_deadline_ms: pending_duration, + dlq_config: None, + sentinel_config: cfg.redis_sentinel_cfg.clone().map(|c| c.into()), }; match &cfg.queue_type { @@ -219,7 +221,17 @@ async fn new_pair_inner( let consumer = TaskQueueConsumer::new(consumer); (producer, consumer) } - _ => { + QueueType::RedisSentinel => { + let (producer, consumer) = RedisBackend::sentinel_builder(config) + .build_pair() + .await + .expect("Error initializing redis-cluster queue"); + + let producer = TaskQueueProducer::new(producer); + let consumer = TaskQueueConsumer::new(consumer); + (producer, consumer) + } + QueueType::Redis => { let (producer, consumer) = RedisBackend::builder(config) .build_pair() .await @@ -229,6 +241,7 @@ async fn new_pair_inner( let consumer = TaskQueueConsumer::new(consumer); (producer, consumer) } + _ => panic!("Unsupported backend!"), } } diff --git a/server/svix-server/src/redis/mod.rs b/server/svix-server/src/redis/mod.rs index dd0f152c9..e3a2e355f 100644 --- a/server/svix-server/src/redis/mod.rs +++ b/server/svix-server/src/redis/mod.rs @@ -1,88 +1,177 @@ mod cluster; +mod sentinel; -use std::time::Duration; +use std::{sync::Arc, time::Duration}; use bb8::{Pool, RunError}; use bb8_redis::RedisConnectionManager; -use redis::{FromRedisValue, RedisError, RedisResult}; +use redis::{ + aio::ConnectionManagerConfig, sentinel::SentinelNodeConnectionInfo, AsyncConnectionConfig, + FromRedisValue, ProtocolVersion, RedisConnectionInfo, RedisError, RedisResult, TlsMode, +}; +use sentinel::RedisSentinelConnectionManager; +use tokio::sync::Mutex; pub use self::cluster::RedisClusterConnectionManager; -use crate::cfg::{CacheBackend, QueueBackend}; +use crate::cfg::{CacheBackend, QueueBackend, SentinelConfig}; pub const REDIS_CONN_TIMEOUT: Duration = Duration::from_secs(2); +pub enum RedisVariant<'a> { + Clustered, + NonClustered, + Sentinel(&'a SentinelConfig), +} + #[derive(Clone, Debug)] pub enum RedisManager { Clustered(ClusteredRedisPool), ClusteredUnpooled(ClusteredRedisUnpooled), NonClustered(NonClusteredRedisPool), NonClusteredUnpooled(NonClusteredRedisUnpooled), + Sentinel(SentinelPooled), + SentinelUnpooled(SentinelUnpooled), } impl RedisManager { - async fn new_pooled(dsn: &str, clustered: bool, max_conns: u16) -> Self { - if clustered { - let mgr = RedisClusterConnectionManager::new(dsn) - .expect("Error initializing redis cluster client"); - let pool = bb8::Pool::builder() - .max_size(max_conns.into()) - .build(mgr) - .await - .expect("Error initializing redis cluster connection pool"); - let pool = ClusteredRedisPool { pool }; - RedisManager::Clustered(pool) - } else { - let mgr = RedisConnectionManager::new(dsn).expect("Error initializing redis client"); - let pool = bb8::Pool::builder() - .max_size(max_conns.into()) - .build(mgr) - .await - .expect("Error initializing redis connection pool"); - let pool = NonClusteredRedisPool { pool }; - RedisManager::NonClustered(pool) + async fn new_pooled(dsn: &str, variant: RedisVariant<'_>, max_conns: u16) -> Self { + match variant { + RedisVariant::Clustered => { + let mgr = RedisClusterConnectionManager::new(dsn) + .expect("Error initializing redis cluster client"); + let pool = bb8::Pool::builder() + .max_size(max_conns.into()) + .build(mgr) + .await + .expect("Error initializing redis cluster connection pool"); + let pool = ClusteredRedisPool { pool }; + RedisManager::Clustered(pool) + } + RedisVariant::NonClustered => { + let mgr = + RedisConnectionManager::new(dsn).expect("Error initializing redis client"); + let pool = bb8::Pool::builder() + .max_size(max_conns.into()) + .build(mgr) + .await + .expect("Error initializing redis connection pool"); + let pool = NonClusteredRedisPool { pool }; + RedisManager::NonClustered(pool) + } + RedisVariant::Sentinel(cfg) => { + let tls_mode = cfg.redis_tls_mode_secure.then_some(TlsMode::Secure); + let protocol = if cfg.redis_use_resp3 { + ProtocolVersion::RESP3 + } else { + ProtocolVersion::default() + }; + let mgr = RedisSentinelConnectionManager::new( + vec![dsn], + cfg.service_name.clone(), + Some(SentinelNodeConnectionInfo { + tls_mode, + redis_connection_info: Some(RedisConnectionInfo { + db: cfg.redis_db.unwrap_or(0), + username: cfg.redis_username.clone(), + password: cfg.redis_password.clone(), + protocol, + }), + }), + ) + .expect("Error initializing RedisSentinelConnectionManager"); + let pool = bb8::Pool::builder() + .max_size(max_conns.into()) + .build(mgr) + .await + .expect("Error initializing redis connection pool"); + + let pool = SentinelPooled { pool }; + RedisManager::Sentinel(pool) + } } } - async fn new_unpooled(dsn: &str, clustered: bool) -> Self { - if clustered { - let cli = redis::cluster::ClusterClient::builder(vec![dsn]) - .retries(1) - .connection_timeout(REDIS_CONN_TIMEOUT) - .build() - .expect("Error initializing redis-unpooled cluster client"); - let con = cli - .get_async_connection() + async fn new_unpooled(dsn: &str, variant: RedisVariant<'_>) -> Self { + match variant { + RedisVariant::Clustered => { + let cli = redis::cluster::ClusterClient::builder(vec![dsn]) + .retries(1) + .connection_timeout(REDIS_CONN_TIMEOUT) + .build() + .expect("Error initializing redis-unpooled cluster client"); + let con = cli + .get_async_connection() + .await + .expect("Failed to get redis-cluster-unpooled connection"); + RedisManager::ClusteredUnpooled(ClusteredRedisUnpooled { con }) + } + RedisVariant::NonClustered => { + let cli = + redis::Client::open(dsn).expect("Error initializing redis unpooled client"); + let con = redis::aio::ConnectionManager::new_with_config( + cli, + ConnectionManagerConfig::new() + .set_number_of_retries(1) + .set_connection_timeout(REDIS_CONN_TIMEOUT), + ) .await - .expect("Failed to get redis-cluster-unpooled connection"); - RedisManager::ClusteredUnpooled(ClusteredRedisUnpooled { con }) - } else { - let cli = redis::Client::open(dsn).expect("Error initializing redis unpooled client"); - let con = redis::aio::ConnectionManager::new_with_backoff_and_timeouts( - cli, - 2, - 100, - 1, - Duration::MAX, - REDIS_CONN_TIMEOUT, - ) - .await - .expect("Failed to get redis-unpooled connection manager"); - RedisManager::NonClusteredUnpooled(NonClusteredRedisUnpooled { con }) + .expect("Failed to get redis-unpooled connection manager"); + RedisManager::NonClusteredUnpooled(NonClusteredRedisUnpooled { con }) + } + RedisVariant::Sentinel(cfg) => { + let tls_mode = cfg.redis_tls_mode_secure.then_some(TlsMode::Secure); + let protocol = if cfg.redis_use_resp3 { + ProtocolVersion::RESP3 + } else { + ProtocolVersion::default() + }; + let cli = redis::sentinel::SentinelClient::build( + vec![dsn], + cfg.service_name.clone(), + Some(SentinelNodeConnectionInfo { + tls_mode, + redis_connection_info: Some(RedisConnectionInfo { + db: cfg.redis_db.unwrap_or(0), + username: cfg.redis_username.clone(), + password: cfg.redis_password.clone(), + protocol, + }), + }), + redis::sentinel::SentinelServerType::Master, + ) + .expect("Failed to build sentinel client"); + + RedisManager::SentinelUnpooled(SentinelUnpooled { + pool: Arc::new(Mutex::new(cli)), + }) + } } } pub async fn from_cache_backend(cache_backend: &CacheBackend<'_>) -> Self { match cache_backend { - CacheBackend::Redis(dsn) => Self::new_unpooled(dsn, false).await, - CacheBackend::RedisCluster(dsn) => Self::new_unpooled(dsn, true).await, + CacheBackend::Redis(dsn) => Self::new_unpooled(dsn, RedisVariant::NonClustered).await, + CacheBackend::RedisCluster(dsn) => { + Self::new_unpooled(dsn, RedisVariant::Clustered).await + } + CacheBackend::RedisSentinel(dsn, cfg) => { + Self::new_unpooled(dsn, RedisVariant::Sentinel(cfg)).await + } _ => panic!("Queue type not supported with redis"), } } pub async fn from_queue_backend(queue_backend: &QueueBackend<'_>, max_conns: u16) -> Self { match queue_backend { - QueueBackend::Redis(dsn) => Self::new_pooled(dsn, false, max_conns).await, - QueueBackend::RedisCluster(dsn) => Self::new_pooled(dsn, true, max_conns).await, + QueueBackend::Redis(dsn) => { + Self::new_pooled(dsn, RedisVariant::NonClustered, max_conns).await + } + QueueBackend::RedisCluster(dsn) => { + Self::new_pooled(dsn, RedisVariant::Clustered, max_conns).await + } + QueueBackend::RedisSentinel(dsn, cfg) => { + Self::new_pooled(dsn, RedisVariant::Sentinel(cfg), max_conns).await + } _ => panic!("Queue type not supported with redis"), } } @@ -93,6 +182,8 @@ impl RedisManager { Self::NonClustered(pool) => pool.get().await, Self::ClusteredUnpooled(pool) => pool.get().await, Self::NonClusteredUnpooled(pool) => pool.get().await, + Self::Sentinel(pool) => pool.get().await, + Self::SentinelUnpooled(pool) => pool.get().await, } } } @@ -153,6 +244,50 @@ impl std::fmt::Debug for NonClusteredRedisUnpooled { } } +#[derive(Clone)] +pub struct SentinelPooled { + pool: Pool, +} + +impl SentinelPooled { + pub async fn get(&self) -> Result, RunError> { + Ok(RedisConnection::SentinelPooled(SentinelPooledConnection { + con: self.pool.get().await?, + })) + } +} + +impl std::fmt::Debug for SentinelPooled { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("SentinelPooled").finish() + } +} + +#[derive(Clone)] +pub struct SentinelUnpooled { + pool: Arc>, +} + +impl SentinelUnpooled { + pub async fn get(&self) -> Result, RunError> { + let mut pool = (*self.pool).lock().await; + let con = pool + .get_async_connection_with_config( + &AsyncConnectionConfig::new().set_response_timeout(REDIS_CONN_TIMEOUT), + ) + .await?; + Ok(RedisConnection::SentinelUnpooled( + SentinelUnpooledConnection { con }, + )) + } +} + +impl std::fmt::Debug for SentinelUnpooled { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("SentinelUnpooled").finish() + } +} + #[derive(Clone, Debug)] pub struct NonClusteredRedisPool { pool: Pool, @@ -171,6 +306,8 @@ pub enum RedisConnection<'a> { ClusteredUnpooled(ClusteredUnpooledConnection), NonClustered(NonClusteredPooledConnection<'a>), NonClusteredUnpooled(NonClusteredUnpooledConnection), + SentinelPooled(SentinelPooledConnection<'a>), + SentinelUnpooled(SentinelUnpooledConnection), } impl RedisConnection<'_> { @@ -196,6 +333,8 @@ impl redis::aio::ConnectionLike for RedisConnection<'_> { RedisConnection::NonClustered(conn) => conn.con.req_packed_command(cmd), RedisConnection::ClusteredUnpooled(conn) => conn.con.req_packed_command(cmd), RedisConnection::NonClusteredUnpooled(conn) => conn.con.req_packed_command(cmd), + RedisConnection::SentinelPooled(conn) => conn.con.req_packed_command(cmd), + RedisConnection::SentinelUnpooled(conn) => conn.con.req_packed_command(cmd), } } @@ -214,6 +353,12 @@ impl redis::aio::ConnectionLike for RedisConnection<'_> { RedisConnection::NonClusteredUnpooled(conn) => { conn.con.req_packed_commands(cmd, offset, count) } + RedisConnection::SentinelPooled(conn) => { + conn.con.req_packed_commands(cmd, offset, count) + } + RedisConnection::SentinelUnpooled(conn) => { + conn.con.req_packed_commands(cmd, offset, count) + } } } @@ -223,6 +368,8 @@ impl redis::aio::ConnectionLike for RedisConnection<'_> { RedisConnection::NonClustered(conn) => conn.con.get_db(), RedisConnection::ClusteredUnpooled(conn) => conn.con.get_db(), RedisConnection::NonClusteredUnpooled(conn) => conn.con.get_db(), + RedisConnection::SentinelPooled(conn) => conn.con.get_db(), + RedisConnection::SentinelUnpooled(conn) => conn.con.get_db(), } } } @@ -287,6 +434,11 @@ pooled_connection!( bb8::PooledConnection<'a, RedisClusterConnectionManager> ); +pooled_connection!( + SentinelPooledConnection, + bb8::PooledConnection<'a, RedisSentinelConnectionManager> +); + connection!( NonClusteredUnpooledConnection, redis::aio::ConnectionManager @@ -297,6 +449,11 @@ connection!( redis::cluster_async::ClusterConnection ); +connection!( + SentinelUnpooledConnection, + redis::aio::MultiplexedConnection +); + #[cfg(test)] mod tests { use redis::AsyncCommands; diff --git a/server/svix-server/src/redis/sentinel.rs b/server/svix-server/src/redis/sentinel.rs new file mode 100644 index 000000000..9e56200d0 --- /dev/null +++ b/server/svix-server/src/redis/sentinel.rs @@ -0,0 +1,53 @@ +use axum::async_trait; +use redis::{ + sentinel::{SentinelClient, SentinelNodeConnectionInfo, SentinelServerType}, + ErrorKind, IntoConnectionInfo, RedisError, +}; +use tokio::sync::Mutex; + +struct LockedSentinelClient(pub(crate) Mutex); + +/// ConnectionManager that implements `bb8::ManageConnection` and supports +/// asynchronous Sentinel connections via `redis::sentinel::SentinelClient` +pub struct RedisSentinelConnectionManager { + client: LockedSentinelClient, +} + +impl RedisSentinelConnectionManager { + pub fn new( + info: Vec, + service_name: String, + node_connection_info: Option, + ) -> Result { + Ok(RedisSentinelConnectionManager { + client: LockedSentinelClient(Mutex::new(SentinelClient::build( + info, + service_name, + node_connection_info, + SentinelServerType::Master, + )?)), + }) + } +} + +#[async_trait] +impl bb8::ManageConnection for RedisSentinelConnectionManager { + type Connection = redis::aio::MultiplexedConnection; + type Error = RedisError; + + async fn connect(&self) -> Result { + self.client.0.lock().await.get_async_connection().await + } + + async fn is_valid(&self, conn: &mut Self::Connection) -> Result<(), Self::Error> { + let pong: String = redis::cmd("PING").query_async(conn).await?; + match pong.as_str() { + "PONG" => Ok(()), + _ => Err((ErrorKind::ResponseError, "ping request").into()), + } + } + + fn has_broken(&self, _: &mut Self::Connection) -> bool { + false + } +} diff --git a/server/svix-server/tests/it/message_app.rs b/server/svix-server/tests/it/message_app.rs index aa59fcecd..627b3db95 100644 --- a/server/svix-server/tests/it/message_app.rs +++ b/server/svix-server/tests/it/message_app.rs @@ -66,7 +66,9 @@ async fn test_app_deletion() { // Delete the cached [`CreateMessageApp`] here instead of waiting 30s for it to expire let cache = match cfg.cache_backend() { CacheBackend::None => cache::none::new(), - CacheBackend::Redis(_) | CacheBackend::RedisCluster(_) => { + CacheBackend::Redis(_) + | CacheBackend::RedisCluster(_) + | CacheBackend::RedisSentinel(_, _) => { let mgr = RedisManager::from_cache_backend(&cfg.cache_backend()).await; cache::redis::new(mgr) } @@ -146,7 +148,9 @@ async fn test_endp_deletion() { // Delete the cached [`CreateMessageApp`] here instead of waiting 30s for it to expire let cache = match cfg.cache_backend() { CacheBackend::None => cache::none::new(), - CacheBackend::Redis(_) | CacheBackend::RedisCluster(_) => { + CacheBackend::Redis(_) + | CacheBackend::RedisCluster(_) + | CacheBackend::RedisSentinel(_, _) => { let mgr = RedisManager::from_cache_backend(&cfg.cache_backend()).await; cache::redis::new(mgr) } diff --git a/server/testing-docker-compose.yml b/server/testing-docker-compose.yml index 4f6443158..046f9b450 100644 --- a/server/testing-docker-compose.yml +++ b/server/testing-docker-compose.yml @@ -72,6 +72,42 @@ services: ALLOW_EMPTY_PASSWORD: "yes" REDIS_NODES: "redis-cluster redis-cluster-node-0 redis-cluster-node-1 redis-cluster-node-2 redis-cluster-node-3 redis-cluster-node-4" + redis-sentinel: + image: docker.io/redis:7 + ports: + - "26379:26379" + command: > + sh -c 'echo "bind 0.0.0.0" > /etc/sentinel.conf && + echo "sentinel monitor master0 redis-master-0 6379 2" >> /etc/sentinel.conf && + echo "sentinel resolve-hostnames yes" >> /etc/sentinel.conf && + echo "sentinel down-after-milliseconds master0 10000" >> /etc/sentinel.conf && + echo "sentinel failover-timeout master0 10000" >> /etc/sentinel.conf && + echo "sentinel parallel-syncs master0 1" >> /etc/sentinel.conf && + redis-sentinel /etc/sentinel.conf' + + redis-master-0: + image: docker.io/redis:7 + ports: + - "6387:6379" + + redis-replica-0: + image: docker.io/redis:7 + ports: + - "6388:6379" + command: + [ + "redis-server", + "--appendonly", + "yes", + "--replicaof", + "redis-master-0", + "6379", + "--repl-diskless-load", + "on-empty-db", + "--protected-mode", + "no" + ] + rabbitmq: image: "docker.io/rabbitmq:3.11.13-management-alpine" ports: