From 436ba865c96e31c075c73a09c5e8c674a46ed8c7 Mon Sep 17 00:00:00 2001 From: Kevin Hoffman Date: Wed, 10 Mar 2021 10:33:49 -0500 Subject: [PATCH] Upgrading to the new eventstreams interface (#49) * Upgrading to the new eventstreams interface * Better error handling on write event --- redis-streams/Cargo.toml | 6 +++--- redis-streams/src/lib.rs | 30 +++++++++++++++++++----------- 2 files changed, 22 insertions(+), 14 deletions(-) diff --git a/redis-streams/Cargo.toml b/redis-streams/Cargo.toml index 54e16f73..c1d3c2e2 100644 --- a/redis-streams/Cargo.toml +++ b/redis-streams/Cargo.toml @@ -1,13 +1,13 @@ [package] name = "wasmcloud-streams-redis" -version = "0.4.1" +version = "0.5.0" authors = ["wasmCloud Team"] edition = "2018" homepage = "https://wasmcloud.dev" repository = "https://github.com/wasmcloud/capability-providers" description = "Redis Event Streams capability provider for the wasmCloud host runtime" license = "Apache-2.0" -documentation = "https://docs.rs/wascc-host" +documentation = "https://docs.rs/wasmcloud-streams-redis" readme = "README.md" keywords = ["webassembly", "events", "stream", "wasmcloud", "redis"] categories = ["wasm", "api-bindings", "database"] @@ -21,7 +21,7 @@ static_plugin = [] [dependencies] wasmcloud-provider-core = "0.1.0" -wasmcloud-actor-eventstreams = "0.1.0" +wasmcloud-actor-eventstreams = "0.2.0" wasmcloud-actor-core = "0.2.2" log = "0.4.8" env_logger = "0.8.3" diff --git a/redis-streams/src/lib.rs b/redis-streams/src/lib.rs index 259466f3..6f6242e8 100644 --- a/redis-streams/src/lib.rs +++ b/redis-streams/src/lib.rs @@ -28,10 +28,6 @@ use std::{ #[allow(unused)] const CAPABILITY_ID: &str = "wasmcloud:eventstreams"; -const OP_WRITE_EVENT: &str = "WriteEvent"; -const _OP_DELIVER_EVENT: &str = "DeliverEvent"; // Currently unused by this provider -const OP_QUERY_STREAM: &str = "QueryStream"; - #[cfg(not(feature = "static_plugin"))] capability_provider!(RedisStreamsProvider, RedisStreamsProvider::new); @@ -98,8 +94,17 @@ impl RedisStreamsProvider { event: Event, ) -> Result, Box> { let data = map_to_tuples(event.values); - let res: String = self.actor_con(actor)?.xadd(event.stream_id, "*", &data)?; - Ok(serialize(res)?) + let ack = match self.actor_con(actor)?.xadd(event.stream_id, "*", &data) { + Ok(res) => EventAck { + error: None, + event_id: Some(res), + }, + Err(e) => EventAck { + error: Some(format!("{}", e)), + event_id: None, + }, + }; + Ok(serialize(ack)?) } fn query_stream( @@ -145,8 +150,9 @@ impl RedisStreamsProvider { values: newmap, }); } + let list = EventList { events }; - Ok(serialize(events)?) + Ok(serialize(list)?) } } @@ -250,9 +256,11 @@ mod test { values: gen_values(), }; let buf = serialize(&ev).unwrap(); - let _res = prov + let res = prov .handle_call("testing-actor", OP_WRITE_EVENT, &buf) .unwrap(); + let evtack: EventAck = deserialize(&res).unwrap(); + assert!(evtack.event_id.is_some()); } let query = StreamQuery { @@ -264,9 +272,9 @@ mod test { let res = prov .handle_call("testing-actor", OP_QUERY_STREAM, &buf) .unwrap(); - let query_res = deserialize::>(res.as_ref()).unwrap(); - assert_eq!(6, query_res.len()); - assert_eq!(query_res[0].values["scruffy-looking"], "nerf-herder"); + let query_res: EventList = deserialize(res.as_ref()).unwrap(); + assert_eq!(6, query_res.events.len()); + assert_eq!(query_res.events[0].values["scruffy-looking"], "nerf-herder"); let _res: bool = c.get_connection().unwrap().del("my-stream").unwrap(); // make sure we start with an empty stream }