Skip to content

Commit

Permalink
Upgrade omniqueue (#1256)
Browse files Browse the repository at this point in the history
… and one related dependency.

Also fix a regression in the rabbitmq queue backend that would result in
tasks from a new API process not being picked up by old worker processes
due to missing message IDs.
  • Loading branch information
svix-jplatte authored Mar 7, 2024
2 parents 25cf751 + 2329c13 commit 49a8184
Show file tree
Hide file tree
Showing 8 changed files with 89 additions and 16 deletions.
1 change: 1 addition & 0 deletions .github/workflows/other-lint.yml
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ jobs:
VALIDATE_RUST_2018: false
VALIDATE_RUST_2021: false
VALIDATE_RUST_CLIPPY: false
VALIDATE_SHELL_SHFMT: false
VALIDATE_SQL: false
VALIDATE_SQLFLUFF: false
FILTER_REGEX_EXCLUDE: (gradlew|javascript/tsconfig.json)
9 changes: 5 additions & 4 deletions bridge/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions bridge/svix-bridge-plugin-queue/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
omniqueue = { git = "https://github.com/svix/omniqueue-rs", rev = "044fc688670759859f193bb646fac043bbfd08ba" }
omniqueue = "0.2.0"
serde_json = "1.0"
serde = { version = "1.0", features = ["derive"] }
svix-bridge-types = { path = "../svix-bridge-types" }
Expand All @@ -20,7 +20,7 @@ aws-config = "1.1.5"
aws-sdk-sqs = "1.13.0"
fastrand = "2.0.1"
google-cloud-googleapis = "0.12.0"
google-cloud-pubsub = "0.22.1"
google-cloud-pubsub = "0.23.0"
lapin = "2"
redis = { version = "0.24.0", features = ["tokio-comp", "streams"] }
tracing-subscriber = "0.3"
Expand Down
6 changes: 4 additions & 2 deletions server/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

13 changes: 8 additions & 5 deletions server/run-tests.sh
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,11 @@ SVIX_REDIS_DSN="redis://localhost:6379" \
${TEST_COMMAND}

echo "*********** RUN 6 ***********"
SVIX_QUEUE_TYPE="rabbitmq" \
SVIX_CACHE_TYPE="redis" \
SVIX_REDIS_DSN="redis://localhost:6379" \
SVIX_RABBIT_DSN="amqp://xivs:xivs@localhost:5672/%2f" \
${TEST_COMMAND}
(
export SVIX_QUEUE_TYPE="rabbitmq"
export SVIX_CACHE_TYPE="redis"
export SVIX_REDIS_DSN="redis://localhost:6379"
export SVIX_RABBIT_DSN="amqp://xivs:xivs@localhost:5672/%2f"
${TEST_COMMAND}
cargo test -- --ignored rabbitmq
)
2 changes: 1 addition & 1 deletion server/svix-server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ urlencoding = "2.1.2"
form_urlencoded = "1.1.0"
lapin = "2.1.1"
sentry = { version = "0.32.2", features = ["tracing"] }
omniqueue = { git = "https://github.com/svix/omniqueue-rs.git", rev = "044fc688670759859f193bb646fac043bbfd08ba", default-features = false, features = ["in_memory", "rabbitmq", "redis_cluster"] }
omniqueue = { version = "0.2.0", default-features = false, features = ["in_memory", "rabbitmq-with-message-ids", "redis_cluster"] }

[target.'cfg(not(target_env = "msvc"))'.dependencies]
tikv-jemallocator = { version = "0.5", optional = true }
Expand Down
4 changes: 2 additions & 2 deletions server/svix-server/src/queue/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::{sync::Arc, time::Duration};

use axum::async_trait;
use omniqueue::{
backends::InMemoryBackend, Delivery, DynConsumer, DynScheduledQueueProducer, QueueConsumer,
backends::InMemoryBackend, Delivery, DynConsumer, DynScheduledProducer, QueueConsumer,
ScheduledQueueProducer,
};
use serde::{Deserialize, Serialize};
Expand Down Expand Up @@ -143,7 +143,7 @@ impl QueueTask {

#[derive(Clone)]
pub struct TaskQueueProducer {
inner: Arc<DynScheduledQueueProducer>,
inner: Arc<DynScheduledProducer>,
}

impl TaskQueueProducer {
Expand Down
66 changes: 66 additions & 0 deletions server/svix-server/src/queue/rabbitmq.rs
Original file line number Diff line number Diff line change
Expand Up @@ -160,3 +160,69 @@ async fn declare_bound_queue(

Ok(())
}

#[cfg(test)]
mod tests {
use futures::StreamExt as _;
use lapin::options::{BasicConsumeOptions, BasicQosOptions};
use svix_ksuid::{KsuidLike as _, KsuidMs};

use crate::{cfg, queue::QueueTask};

#[tokio::test]
// run with `cargo test -- --ignored rabbitmq` only when rabbitmq is up and configured
#[ignore]
async fn test_messages_have_ids() {
const QUEUE_NAME: &str = "test_messages_have_ids_q";

let cfg = cfg::load().expect("Error loading configuration");
let cfg::QueueBackend::RabbitMq(dsn) = cfg.queue_backend() else {
panic!("This test must only run when the rabbitmq backend is enabled");
};
let prefetch_size = cfg.rabbit_consumer_prefetch_size.unwrap_or(1);

// Send message with omniqueue
{
let (producer, _) = super::new_pair(dsn, QUEUE_NAME.to_owned(), prefetch_size)
.await
.unwrap();

producer.send(QueueTask::HealthCheck, None).await.unwrap();
}

// Receive with lapin consumer
{
let conn = lapin::Connection::connect(dsn, lapin::ConnectionProperties::default())
.await
.unwrap();
let channel = conn.create_channel().await.unwrap();

let consumer_tag = format!(
"{QUEUE_NAME}-consumer-{}",
KsuidMs::new(None, None).to_string()
);

let opts = BasicConsumeOptions {
no_local: false,
no_ack: false,
exclusive: false,
nowait: false,
};

channel
.basic_qos(prefetch_size, BasicQosOptions { global: false })
.await
.unwrap();

let mut consumer = channel
.basic_consume(QUEUE_NAME, &consumer_tag, opts, Default::default())
.await
.unwrap();

let delivery = consumer.next().await.unwrap().unwrap();

// ... to assert that there is a message ID
delivery.properties.message_id().as_ref().unwrap();
}
}
}

0 comments on commit 49a8184

Please sign in to comment.