Skip to content
This repository has been archived by the owner on Jun 10, 2024. It is now read-only.

Commit

Permalink
Upgrading to the new eventstreams interface (#49)
Browse files Browse the repository at this point in the history
* Upgrading to the new eventstreams interface

* Better error handling on write event
  • Loading branch information
autodidaddict authored Mar 10, 2021
1 parent cb44d84 commit 436ba86
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 14 deletions.
6 changes: 3 additions & 3 deletions redis-streams/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
[package]
name = "wasmcloud-streams-redis"
version = "0.4.1"
version = "0.5.0"
authors = ["wasmCloud Team"]
edition = "2018"
homepage = "https://wasmcloud.dev"
repository = "https://github.com/wasmcloud/capability-providers"
description = "Redis Event Streams capability provider for the wasmCloud host runtime"
license = "Apache-2.0"
documentation = "https://docs.rs/wascc-host"
documentation = "https://docs.rs/wasmcloud-streams-redis"
readme = "README.md"
keywords = ["webassembly", "events", "stream", "wasmcloud", "redis"]
categories = ["wasm", "api-bindings", "database"]
Expand All @@ -21,7 +21,7 @@ static_plugin = []

[dependencies]
wasmcloud-provider-core = "0.1.0"
wasmcloud-actor-eventstreams = "0.1.0"
wasmcloud-actor-eventstreams = "0.2.0"
wasmcloud-actor-core = "0.2.2"
log = "0.4.8"
env_logger = "0.8.3"
Expand Down
30 changes: 19 additions & 11 deletions redis-streams/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,6 @@ use std::{
#[allow(unused)]
const CAPABILITY_ID: &str = "wasmcloud:eventstreams";

const OP_WRITE_EVENT: &str = "WriteEvent";
const _OP_DELIVER_EVENT: &str = "DeliverEvent"; // Currently unused by this provider
const OP_QUERY_STREAM: &str = "QueryStream";

#[cfg(not(feature = "static_plugin"))]
capability_provider!(RedisStreamsProvider, RedisStreamsProvider::new);

Expand Down Expand Up @@ -98,8 +94,17 @@ impl RedisStreamsProvider {
event: Event,
) -> Result<Vec<u8>, Box<dyn Error + Sync + Send>> {
let data = map_to_tuples(event.values);
let res: String = self.actor_con(actor)?.xadd(event.stream_id, "*", &data)?;
Ok(serialize(res)?)
let ack = match self.actor_con(actor)?.xadd(event.stream_id, "*", &data) {
Ok(res) => EventAck {
error: None,
event_id: Some(res),
},
Err(e) => EventAck {
error: Some(format!("{}", e)),
event_id: None,
},
};
Ok(serialize(ack)?)
}

fn query_stream(
Expand Down Expand Up @@ -145,8 +150,9 @@ impl RedisStreamsProvider {
values: newmap,
});
}
let list = EventList { events };

Ok(serialize(events)?)
Ok(serialize(list)?)
}
}

Expand Down Expand Up @@ -250,9 +256,11 @@ mod test {
values: gen_values(),
};
let buf = serialize(&ev).unwrap();
let _res = prov
let res = prov
.handle_call("testing-actor", OP_WRITE_EVENT, &buf)
.unwrap();
let evtack: EventAck = deserialize(&res).unwrap();
assert!(evtack.event_id.is_some());
}

let query = StreamQuery {
Expand All @@ -264,9 +272,9 @@ mod test {
let res = prov
.handle_call("testing-actor", OP_QUERY_STREAM, &buf)
.unwrap();
let query_res = deserialize::<Vec<Event>>(res.as_ref()).unwrap();
assert_eq!(6, query_res.len());
assert_eq!(query_res[0].values["scruffy-looking"], "nerf-herder");
let query_res: EventList = deserialize(res.as_ref()).unwrap();
assert_eq!(6, query_res.events.len());
assert_eq!(query_res.events[0].values["scruffy-looking"], "nerf-herder");
let _res: bool = c.get_connection().unwrap().del("my-stream").unwrap(); // make sure we start with an empty stream
}

Expand Down

0 comments on commit 436ba86

Please sign in to comment.