Redis dlq support #1478

Merged · 3 commits · Oct 10, 2024
4 changes: 4 additions & 0 deletions README.md
@@ -389,6 +389,10 @@ To support graceful shutdown on the server, all running tasks are finished befor
One of our main goals with open sourcing the Svix dispatcher is ease of use. The hosted Svix service, however, is quite complex due to our scale and the infrastructure it requires. This complexity is not useful for the vast majority of people and would make this project much harder to use and much more limited.
This is why this code has been adjusted before being released, and some of the features, optimizations, and behaviors supported by the hosted dispatcher are not yet available in this repo. With that being said, other than some known incompatibilities, the internal Svix test suite passes. This means they are already mostly compatible, and we are working hard on bringing them to full feature parity.

# Re-driving Redis DLQ
We have an undocumented endpoint for re-driving failed messages that have landed in the dead-letter queue (DLQ). You can do this by calling `POST /api/v1/admin/redrive-dlq/`.

To monitor the DLQ depth, watch the `svix.queue.depth_dlq` metric. Any non-zero value indicates that there is data in the DLQ.

# Development

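Aside (not part of the diff): the redrive endpoint added to the README above can be exercised with any HTTP client. Below is a minimal sketch, assuming `reqwest` and `tokio` as dependencies; the base URL, port, and bearer token are placeholders for your deployment, not values from this PR.

```rust
// Illustrative only: trigger a re-drive of DLQ'ed messages over HTTP.
#[tokio::main]
async fn main() -> Result<(), reqwest::Error> {
    let resp = reqwest::Client::new()
        .post("http://localhost:8071/api/v1/admin/redrive-dlq/")
        .bearer_auth("<admin-jwt>") // placeholder credential
        .send()
        .await?;
    // A successful call moves DLQ'ed messages back onto the main queue.
    println!("redrive-dlq returned {}", resp.status());
    Ok(())
}
```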
2 changes: 1 addition & 1 deletion server/svix-server/Cargo.toml
@@ -68,7 +68,7 @@ urlencoding = "2.1.2"
form_urlencoded = "1.1.0"
lapin = "2.1.1"
sentry = { version = "0.32.2", features = ["tracing"] }
omniqueue = { git = "https://github.com/svix/omniqueue-rs", rev = "75e5a9510ad338ac3702b2e911bacf8967ac58d8", default-features = false, features = ["in_memory", "rabbitmq-with-message-ids", "redis_cluster", "redis_sentinel"] }
omniqueue = { git = "https://github.com/svix/omniqueue-rs", rev = "75e5a9510ad338ac3702b2e911bacf8967ac58d8", default-features = false, features = ["in_memory", "rabbitmq-with-message-ids", "redis_cluster", "redis_sentinel", "beta"] }
# Not a well-known author, and no longer gets updates => pinned.
# Switch to hyper-http-proxy when upgrading hyper to 1.0.
hyper-proxy = { version = "=0.9.1", default-features = false, features = ["openssl-tls"] }
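Note: the only change in this file is the added `beta` feature on the pinned `omniqueue` revision, which presumably gates the `redrive_dlq` API used in `queue/mod.rs` below.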
3 changes: 3 additions & 0 deletions server/svix-server/config.default.toml
@@ -120,3 +120,6 @@ worker_max_tasks = 500
# Whether or not to disable TLS certificate validation on Webhook dispatch. This is a dangerous flag
# to set true. This value will default to false.
# dangerous_disable_tls_verification = false

# Maximum seconds of queue long-poll
queue_max_poll_secs = 20
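For reference, a sketch of how the two new settings introduced by this PR might sit together in a deployment config. The `redis_pending_duration_secs` key comes from `cfg.rs` below; its exact semantics are an assumption here (it presumably bounds how long a received-but-unacknowledged Redis delivery stays pending before redelivery), and both values shown are simply the defaults.

```toml
# Maximum seconds of queue long-poll (default: 20).
queue_max_poll_secs = 20

# Seconds before a pending (received but unacked) Redis delivery is
# considered stale; semantics assumed, default is 45 per cfg.rs.
redis_pending_duration_secs = 45
```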
10 changes: 10 additions & 0 deletions server/svix-server/src/cfg.rs
@@ -69,6 +69,10 @@ const DEFAULTS: &str = include_str!("../config.default.toml")

pub type Configuration = Arc<ConfigurationInner>;

fn default_redis_pending_duration_secs() -> u64 {
45
}

#[derive(Clone, Debug, Deserialize, Validate)]
#[validate(
schema(function = "validate_config_complete"),
@@ -184,6 +188,9 @@ pub struct ConfigurationInner {
/// Maximum number of concurrent worker tasks to spawn (0 is unlimited)
pub worker_max_tasks: u16,

/// Maximum seconds of a queue long-poll
pub queue_max_poll_secs: u16,

/// The address of the rabbitmq exchange
pub rabbit_dsn: Option<Arc<String>>,
pub rabbit_consumer_prefetch_size: Option<u16>,
@@ -197,6 +204,9 @@ pub struct ConfigurationInner {
#[serde(flatten)]
pub proxy_config: Option<ProxyConfig>,

#[serde(default = "default_redis_pending_duration_secs")]
pub redis_pending_duration_secs: u64,

#[serde(flatten)]
pub internal: InternalConfig,
}
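A self-contained sketch, assuming only the `serde` and `toml` crates, of how the function-based serde default above behaves when the key is omitted from the config file:

```rust
use serde::Deserialize;

fn default_redis_pending_duration_secs() -> u64 {
    45
}

#[derive(Debug, Deserialize)]
struct Cfg {
    // Falls back to the function's return value when the key is absent.
    #[serde(default = "default_redis_pending_duration_secs")]
    redis_pending_duration_secs: u64,
}

fn main() {
    // An empty TOML document has no keys, so the default applies.
    let cfg: Cfg = toml::from_str("").unwrap();
    assert_eq!(cfg.redis_pending_duration_secs, 45);
}
```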
21 changes: 21 additions & 0 deletions server/svix-server/src/metrics/redis.rs
@@ -43,6 +43,7 @@ pub struct RedisQueueMetrics {
main_queue: Option<ObservableGauge<u64>>,
pending_queue: Option<ObservableGauge<u64>>,
delayed_queue: Option<ObservableGauge<u64>>,
deadletter_queue: Option<ObservableGauge<u64>>,
}

impl RedisQueueMetrics {
@@ -65,10 +66,17 @@ impl RedisQueueMetrics {
.try_init(),
);

let deadletter_queue = init_metric(
meter
.u64_observable_gauge("svix.queue.depth_dlq")
.try_init(),
);

Self {
main_queue,
pending_queue,
delayed_queue,
deadletter_queue,
}
}
pub async fn record(
@@ -77,6 +85,7 @@
main_queue: &RedisQueueType<'_>,
pending_queue: &RedisQueueType<'_>,
delayed_queue: &RedisQueueType<'_>,
deadletter_queue: &RedisQueueType<'_>,
) {
main_queue
.queue_depth(redis)
@@ -99,6 +108,13 @@
.unwrap_or_else(|e| {
tracing::warn!("Failed to record queue depth: {e}");
});
deadletter_queue
.queue_depth(redis)
.await
.map(|d| self.record_deadletter_queue_depth(d))
.unwrap_or_else(|e| {
tracing::warn!("Failed to record queue depth: {e}");
});
}

fn record_main_queue_depth(&self, value: u64) {
Expand All @@ -116,4 +132,9 @@ impl RedisQueueMetrics {
recorder.observe(value, &[]);
}
}
fn record_deadletter_queue_depth(&self, value: u64) {
if let Some(recorder) = &self.deadletter_queue {
recorder.observe(value, &[]);
}
}
}
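As a complement to the `svix.queue.depth_dlq` gauge registered above, a quick out-of-band depth check is possible directly against Redis. This sketch assumes the `redis` crate with its tokio feature enabled, and that the DLQ is a plain Redis list; the key name is purely illustrative, since the real key is owned by the omniqueue backend:

```rust
use redis::AsyncCommands;

#[tokio::main]
async fn main() -> redis::RedisResult<()> {
    let client = redis::Client::open("redis://127.0.0.1:6379")?;
    let mut conn = client.get_multiplexed_async_connection().await?;
    // "svix_queue_dlq" is a placeholder; substitute your deployment's DLQ key.
    let depth: u64 = conn.llen("svix_queue_dlq").await?;
    if depth > 0 {
        eprintln!("warning: {depth} message(s) in the dead-letter queue");
    }
    Ok(())
}
```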
173 changes: 110 additions & 63 deletions server/svix-server/src/queue/mod.rs
@@ -1,10 +1,9 @@
use std::{sync::Arc, time::Duration};
use std::{marker::PhantomData, num::NonZeroUsize, sync::Arc, time::Duration};

use omniqueue::{
backends::InMemoryBackend, Delivery, DynConsumer, DynScheduledProducer, QueueConsumer,
ScheduledQueueProducer,
backends::InMemoryBackend, Delivery, DynConsumer, QueueConsumer, ScheduledQueueProducer,
};
use serde::{Deserialize, Serialize};
use serde::{de::DeserializeOwned, Deserialize, Serialize};

use crate::{
cfg::{Configuration, QueueBackend},
@@ -24,6 +23,8 @@ const RETRY_SCHEDULE: &[Duration] = &[
Duration::from_millis(40),
];

pub type TaskQueueDelivery = SvixOmniDelivery<QueueTask>;

fn should_retry(err: &Error) -> bool {
matches!(err.typ, ErrorType::Queue(_))
}
@@ -139,19 +140,34 @@ impl QueueTask {
}
}

#[derive(Clone)]
pub struct TaskQueueProducer {
inner: Arc<DynScheduledProducer>,
pub type TaskQueueProducer = SvixOmniProducer<QueueTask>;
pub type TaskQueueConsumer = SvixOmniConsumer<QueueTask>;

pub struct SvixOmniProducer<T: OmniMessage> {
inner: Arc<omniqueue::DynScheduledProducer>,
_phantom: PhantomData<T>,
}

// Manual impl to avoid adding 'Clone' bound on T
impl<T: OmniMessage> Clone for SvixOmniProducer<T> {
fn clone(&self) -> Self {
Self {
inner: self.inner.clone(),
_phantom: PhantomData,
}
}
}

impl TaskQueueProducer {
pub fn new(inner: impl ScheduledQueueProducer + 'static) -> Self {
impl<T: OmniMessage> SvixOmniProducer<T> {
pub(super) fn new(inner: impl ScheduledQueueProducer + 'static) -> Self {
Self {
inner: Arc::new(inner.into_dyn_scheduled()),
_phantom: PhantomData,
}
}

pub async fn send(&self, task: QueueTask, delay: Option<Duration>) -> Result<()> {
#[tracing::instrument(skip_all, name = "queue_send")]
pub async fn send(&self, task: &T, delay: Option<Duration>) -> Result<()> {
let task = Arc::new(task);
run_with_retries(
|| async {
@@ -169,57 +185,99 @@ impl TaskQueueProducer {
)
.await
}

#[tracing::instrument(skip_all, name = "redrive_dlq")]
pub async fn redrive_dlq(&self) -> Result<()> {
self.inner.redrive_dlq().await.map_err(Into::into)
}
}

pub struct TaskQueueConsumer {
pub struct SvixOmniConsumer<T: OmniMessage> {
inner: DynConsumer,
_phantom: PhantomData<T>,
}

pub trait OmniMessage: Serialize + DeserializeOwned + Send + Sync {
fn task_id(&self) -> Option<&str>;
}

impl TaskQueueConsumer {
pub fn new(inner: impl QueueConsumer + 'static) -> Self {
impl OmniMessage for QueueTask {
fn task_id(&self) -> Option<&str> {
self.msg_id()
}
}

impl<T: OmniMessage> SvixOmniConsumer<T> {
pub(super) fn new(inner: impl QueueConsumer + 'static) -> Self {
Self {
inner: inner.into_dyn(),
_phantom: PhantomData,
}
}

pub async fn receive_all(&mut self) -> Result<Vec<TaskQueueDelivery>> {
const MAX_MESSAGES: usize = 128;
// FIXME(onelson): need to figure out what deadline/duration to use here
#[tracing::instrument(skip_all, name = "queue_receive_all")]
pub async fn receive_all(&mut self, deadline: Duration) -> Result<Vec<SvixOmniDelivery<T>>> {
pub const MAX_MESSAGES: usize = 128;
self.inner
.receive_all(MAX_MESSAGES, Duration::from_secs(30))
.receive_all(MAX_MESSAGES, deadline)
.await
.map_err(Into::into)
.map_err(Error::from)
.trace()?
.into_iter()
.map(TryInto::try_into)
.map(|acker| {
Ok(SvixOmniDelivery {
task: Arc::new(
acker
.payload_serde_json()
.map_err(|e| {
Error::queue(format!("Failed to decode queue task: {e:?}"))
})?
.ok_or_else(|| Error::queue("Unexpected empty delivery"))?,
),

acker,
})
})
.collect()
}

pub fn max_messages(&self) -> Option<NonZeroUsize> {
self.inner.max_messages()
}
}

#[derive(Debug)]
pub struct TaskQueueDelivery {
pub task: Arc<QueueTask>,
acker: Delivery,
pub struct SvixOmniDelivery<T> {
pub task: Arc<T>,
pub(super) acker: Delivery,
}

impl TaskQueueDelivery {
impl<T: OmniMessage> SvixOmniDelivery<T> {
pub async fn set_ack_deadline(&mut self, duration: Duration) -> Result<()> {
Ok(self.acker.set_ack_deadline(duration).await?)
}
pub async fn ack(self) -> Result<()> {
tracing::trace!(msg_id = self.task.msg_id(), "ack");
tracing::trace!(
task_id = self.task.task_id().map(tracing::field::display),
"ack"
);

let mut retry = Retry::new(should_retry, RETRY_SCHEDULE);
let mut acker = Some(self.acker);
loop {
if let Some(result) = retry
.run(|| async {
let delivery = acker
.take()
.expect("acker is always Some when trying to ack");
delivery.ack().await.map_err(|(e, delivery)| {
// Put the delivery back in acker before retrying, to
// satisfy the expect above.
acker = Some(delivery);
e.into()
})
match acker.take() {
Some(delivery) => {
delivery.ack().await.map_err(|(e, delivery)| {
// Put the delivery back in acker before retrying, so the
// next attempt's match finds Some and the None arm stays unreachable.
acker = Some(delivery);
e.into()
})
}
None => unreachable!(),
}
})
.await
{
@@ -229,27 +287,31 @@ impl TaskQueueDelivery {
}

pub async fn nack(self) -> Result<()> {
tracing::trace!(msg_id = self.task.msg_id(), "nack");
tracing::trace!(
task_id = self.task.task_id().map(tracing::field::display),
"nack"
);

let mut retry = Retry::new(should_retry, RETRY_SCHEDULE);
let mut acker = Some(self.acker);
loop {
if let Some(result) = retry
.run(|| async {
let delivery = acker
.take()
.expect("acker is always Some when trying to ack");

delivery
.nack()
.await
.map_err(|(e, delivery)| {
// Put the delivery back in acker before retrying, to
// satisfy the expect above.
acker = Some(delivery);
e.into()
})
.trace()
match acker.take() {
Some(delivery) => {
delivery
.nack()
.await
.map_err(|(e, delivery)| {
// Put the delivery back in acker before retrying, so the
// next attempt's match finds Some and the None arm stays unreachable.
acker = Some(delivery);
Error::from(e)
})
.trace()
}
_ => unreachable!(),
}
})
.await
{
Expand All @@ -258,18 +320,3 @@ impl TaskQueueDelivery {
}
}
}

impl TryFrom<Delivery> for TaskQueueDelivery {
type Error = Error;
fn try_from(value: Delivery) -> Result<Self> {
Ok(TaskQueueDelivery {
task: Arc::new(
value
.payload_serde_json()
.map_err(|_| Error::queue("Failed to decode queue task"))?
.ok_or_else(|| Error::queue("Unexpected empty delivery"))?,
),
acker: value,
})
}
}
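To see what the generics buy: any serde-serializable type that implements the new `OmniMessage` trait can flow through `SvixOmniProducer`/`SvixOmniConsumer`. A hypothetical message type (not part of this PR) might look like:

```rust
use serde::{Deserialize, Serialize};

// Hypothetical payload; OmniMessage only needs Serialize + DeserializeOwned
// + Send + Sync plus a task_id accessor used for tracing.
#[derive(Serialize, Deserialize)]
struct OperationalWebhook {
    id: String,
    body: String,
}

impl OmniMessage for OperationalWebhook {
    fn task_id(&self) -> Option<&str> {
        Some(&self.id)
    }
}
```

A `SvixOmniProducer<OperationalWebhook>` is then cloneable even though the payload type is not `Clone`, thanks to the manual `Clone` impl over `PhantomData` above.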
2 changes: 1 addition & 1 deletion server/svix-server/src/queue/rabbitmq.rs
@@ -186,7 +186,7 @@ mod tests {
.await
.unwrap();

producer.send(QueueTask::HealthCheck, None).await.unwrap();
producer.send(&QueueTask::HealthCheck, None).await.unwrap();
}

// Receive with lapin consumer