Skip to content

Commit

Permalink
Merge branch 'main' into feat-jans-cedarling-sidecar
Browse files Browse the repository at this point in the history
Signed-off-by: SafinWasi <[email protected]>
  • Loading branch information
SafinWasi committed Jan 15, 2025
2 parents bf7a2c3 + 25c7a49 commit bb38223
Show file tree
Hide file tree
Showing 11 changed files with 686 additions and 379 deletions.
10 changes: 3 additions & 7 deletions jans-cedarling/cedarling/src/log/interface.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ pub(crate) trait LogWriter {
pub(crate) trait Loggable: serde::Serialize {
/// get unique request ID
fn get_request_id(&self) -> Uuid;

/// get log level for entity
/// not all log entities have log level, only when `log_kind` == `System`
fn get_log_level(&self) -> Option<LogLevel>;
Expand All @@ -34,13 +35,8 @@ pub(crate) trait Loggable: serde::Serialize {
// is used to avoid boilerplate code
fn can_log(&self, logger_level: LogLevel) -> bool {
if let Some(entry_log_level) = self.get_log_level() {
if entry_log_level < logger_level {
// entry log level lower than logger level
false
} else {
// entry log higher or equal than logger level
true
}
// higher level is more important, ie closer to fatal
logger_level <= entry_log_level
} else {
// if `.get_log_level` return None
// it means that `log_kind` != `System` and we should log it
Expand Down
2 changes: 1 addition & 1 deletion jans-cedarling/cedarling/src/log/log_entry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -356,7 +356,7 @@ impl Loggable for &DecisionLogEntry<'_> {
// TODO: maybe using wasm we can use `js_sys::Date::now()`
// Static variable initialize only once at start of program and available during all program live cycle.
// Import inside function guarantee that it is used only inside function.
fn gen_uuid7() -> Uuid {
pub fn gen_uuid7() -> Uuid {
use std::sync::{LazyLock, Mutex};
use uuid7::V7Generator;

Expand Down
117 changes: 94 additions & 23 deletions jans-cedarling/cedarling/src/log/memory_logger.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,10 @@ use super::interface::{LogStorage, LogWriter, Loggable};
use crate::bootstrap_config::log_config::MemoryLogConfig;

const STORAGE_MUTEX_EXPECT_MESSAGE: &str = "MemoryLogger storage mutex should unlock";
const STORAGE_JSON_PARSE_EXPECT_MESSAGE: &str =
"In MemoryLogger storage value should be valid LogEntry json string";

/// A logger that store logs in-memory.
pub(crate) struct MemoryLogger {
storage: Mutex<SparKV>,
storage: Mutex<SparKV<serde_json::Value>>,
log_level: LogLevel,
}

Expand All @@ -40,6 +38,44 @@ impl MemoryLogger {
}
}

/// In case of failure in MemoryLogger, log to stderr where supported.
/// On WASM, stderr is not supported, so log to whatever the wasm logger uses.
mod fallback {
use crate::LogLevel;

/// conform to Loggable requirement imposed by LogStrategy
#[derive(serde::Serialize)]
struct StrWrap<'a>(&'a str);

impl crate::log::interface::Loggable for StrWrap<'_> {
fn get_request_id(&self) -> uuid7::Uuid {
crate::log::log_entry::gen_uuid7()
}

fn get_log_level(&self) -> Option<LogLevel> {
// These must always be logged.
Some(LogLevel::TRACE)
}
}

/// Fetch the correct logger. That takes some work, and it's done on every
/// call. But this is a fallback logger, so it is not intended to be used
/// often, and in this case correctness and non-fallibility are far more
/// important than performance.
pub fn log(msg: &str) {
let log_config = crate::bootstrap_config::LogConfig{
log_type: crate::bootstrap_config::log_config::LogTypeConfig::StdOut,
// level is so that all messages passed here are logged.
log_level: LogLevel::TRACE,
};
// This should always be a LogStrategy::StdOut(StdOutLogger)
let log_strategy = crate::log::LogStrategy::new(&log_config);
use crate::log::interface::LogWriter;
// a string is always serializable
log_strategy.log_any(StrWrap(msg))
}
}

// Implementation of LogWriter
impl LogWriter for MemoryLogger {
fn log_any<T: Loggable>(&self, entry: T) {
Expand All @@ -48,43 +84,43 @@ impl LogWriter for MemoryLogger {
return;
}

let json_string = serde_json::json!(entry).to_string();
let json = match serde_json::to_value(&entry) {
Ok(json) => json,
Err(err) => {
fallback::log(&format!("could not serialize LogEntry to serde_json::Value: {err:?}"));
return;
},
};

let result = self
let set_result = self
.storage
.lock()
.expect(STORAGE_MUTEX_EXPECT_MESSAGE)
.set(entry.get_request_id().to_string().as_str(), &json_string);
.set(&entry.get_request_id().to_string(), json);

if let Err(err) = result {
// log error to stderr
eprintln!("could not store LogEntry to memory: {err:?}");
if let Err(err) = set_result {
fallback::log(&format!("could not store LogEntry to memory: {err:?}"));
};
}
}

// Implementation of LogStorage
impl LogStorage for MemoryLogger {
fn pop_logs(&self) -> Vec<serde_json::Value> {
// TODO: implement more efficient implementation

let mut storage_guard = self.storage.lock().expect(STORAGE_MUTEX_EXPECT_MESSAGE);

let keys = storage_guard.get_keys();

keys.iter()
.filter_map(|key| storage_guard.pop(key))
// we call unwrap, because we know that the value is valid json
.map(|str_json| serde_json::from_str::<serde_json::Value>(str_json.as_str())
.expect(STORAGE_JSON_PARSE_EXPECT_MESSAGE))
self.storage
.lock()
.expect(STORAGE_MUTEX_EXPECT_MESSAGE)
.drain()
.map(|(_k, value)| value)
.collect()
}

fn get_log_by_id(&self, id: &str) -> Option<serde_json::Value> {
self.storage.lock().expect(STORAGE_MUTEX_EXPECT_MESSAGE)
self.storage
.lock()
.expect(STORAGE_MUTEX_EXPECT_MESSAGE)
.get(id)
// we call unwrap, because we know that the value is valid json
.map(|str_json| serde_json::from_str::<serde_json::Value>(str_json.as_str()).expect(STORAGE_JSON_PARSE_EXPECT_MESSAGE))
.cloned()
}

fn get_log_ids(&self) -> Vec<String> {
Expand Down Expand Up @@ -211,4 +247,39 @@ mod tests {
"Logs were not fully popped"
);
}

#[test]
fn fallback_logger() {
struct FailSerialize;

impl serde::Serialize for FailSerialize {
fn serialize<S>(&self, _serializer: S) -> Result<S::Ok, S::Error>
where S: serde::Serializer {
Err(serde::ser::Error::custom("this always fails"))
}
}

impl crate::log::interface::Loggable for FailSerialize {
fn get_request_id(&self) -> uuid7::Uuid {
crate::log::log_entry::gen_uuid7()
}

fn get_log_level(&self) -> Option<LogLevel> {
// These must always be logged.
Some(LogLevel::TRACE)
}
}

let logger = create_memory_logger();
logger.log_any(FailSerialize);

// There isn't a good way, in unit tests, to verify the output was
// actually written to stderr/json console.
//
// To eyeball-verify it:
// cargo test -- --nocapture fall
// and look in the output for
// "could not serialize LogEntry to serde_json::Value: Error(\"this always fails\", line: 0, column: 0)"
assert!(logger.pop_logs().is_empty(), "logger should be empty");
}
}
3 changes: 3 additions & 0 deletions jans-cedarling/sparkv/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,3 +14,6 @@ homepage = "https://crates.io/crates/sparkv"
[dependencies]
thiserror = { workspace = true }
chrono = { workspace = true }

[dev-dependencies]
serde_json = "*"
2 changes: 1 addition & 1 deletion jans-cedarling/sparkv/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ sparkv.set("your-key", "your-value"); // write
let value = sparkv.get("your-key").unwrap(); // read

// Write with unique TTL
sparkv.set_with_ttl("diff-ttl", "your-value", chrono::Duration::new(60, 0));
sparkv.set_with_ttl("diff-ttl", "your-value", chrono::Duration::seconds(60));
```

See `config.rs` for more configuration options.
Expand Down
14 changes: 4 additions & 10 deletions jans-cedarling/sparkv/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ impl Config {
Config {
max_items: 10_000,
max_item_size: 500_000,
max_ttl: Duration::new(60 * 60, 0).expect("a valid duration"),
default_ttl: Duration::new(5 * 60, 0).expect("a valid duration"), // 5 minutes
max_ttl: Duration::seconds(60 * 60),
default_ttl: Duration::seconds(5 * 60), // 5 minutes
auto_clear_expired: true,
}
}
Expand All @@ -43,14 +43,8 @@ mod tests {
let config: Config = Config::new();
assert_eq!(config.max_items, 10_000);
assert_eq!(config.max_item_size, 500_000);
assert_eq!(
config.max_ttl,
Duration::new(60 * 60, 0).expect("a valid duration")
);
assert_eq!(
config.default_ttl,
Duration::new(5 * 60, 0).expect("a valid duration")
);
assert_eq!(config.max_ttl, Duration::seconds(60 * 60));
assert_eq!(config.default_ttl, Duration::seconds(5 * 60));
assert!(config.auto_clear_expired);
}
}
24 changes: 10 additions & 14 deletions jans-cedarling/sparkv/src/expentry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,15 @@ pub struct ExpEntry {
}

impl ExpEntry {
pub fn new(key: &str, expiration: Duration) -> Self {
pub fn new<S: AsRef<str>>(key: S, expiration: Duration) -> Self {
let expired_at: DateTime<Utc> = Utc::now() + expiration;
Self {
key: String::from(key),
key: key.as_ref().into(),
expired_at,
}
}

pub fn from_kv_entry(kv_entry: &KvEntry) -> Self {
pub fn from_kv_entry<T>(kv_entry: &KvEntry<T>) -> Self {
Self {
key: kv_entry.key.clone(),
expired_at: kv_entry.expired_at,
Expand Down Expand Up @@ -59,35 +59,31 @@ mod tests {

#[test]
fn test_new() {
let item = ExpEntry::new("key", Duration::new(10, 0).expect("a valid duration"));
let item = ExpEntry::new("key", Duration::seconds(10));
assert_eq!(item.key, "key");
assert!(item.expired_at > Utc::now() + Duration::new(9, 0).expect("a valid duration"));
assert!(item.expired_at <= Utc::now() + Duration::new(10, 0).expect("a valid duration"));
assert!(item.expired_at > Utc::now() + Duration::seconds(9));
assert!(item.expired_at <= Utc::now() + Duration::seconds(10));
}

#[test]
fn test_from_kventry() {
let kv_entry = KvEntry::new(
"keyFromKV",
"value from KV",
Duration::new(10, 0).expect("a valid duration"),
);
let kv_entry = KvEntry::new("keyFromKV", "value from KV", Duration::seconds(10));
let exp_item = ExpEntry::from_kv_entry(&kv_entry);
assert_eq!(exp_item.key, "keyFromKV");
assert_eq!(exp_item.expired_at, kv_entry.expired_at);
}

#[test]
fn test_cmp() {
let item_small = ExpEntry::new("k1", Duration::new(10, 0).expect("a valid duration"));
let item_big = ExpEntry::new("k2", Duration::new(8000, 0).expect("a valid duration"));
let item_small = ExpEntry::new("k1", Duration::seconds(10));
let item_big = ExpEntry::new("k2", Duration::seconds(8000));
assert!(item_small > item_big); // reverse order
assert!(item_big < item_small); // reverse order
}

#[test]
fn test_is_expired() {
let item = ExpEntry::new("k1", Duration::new(0, 100).expect("a valid duration"));
let item = ExpEntry::new("k1", Duration::seconds(0));
std::thread::sleep(std::time::Duration::from_nanos(200));
assert!(item.is_expired());
}
Expand Down
22 changes: 9 additions & 13 deletions jans-cedarling/sparkv/src/kventry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,18 +8,18 @@ use chrono::Duration;
use chrono::prelude::*;

#[derive(Debug, Clone, PartialEq, Eq)]
pub struct KvEntry {
pub struct KvEntry<T> {
pub key: String,
pub value: String,
pub value: T,
pub expired_at: DateTime<Utc>,
}

impl KvEntry {
pub fn new(key: &str, value: &str, expiration: Duration) -> Self {
impl<T> KvEntry<T> {
pub fn new<S: AsRef<str>>(key: S, value: T, expiration: Duration) -> Self {
let expired_at: DateTime<Utc> = Utc::now() + expiration;
Self {
key: String::from(key),
value: String::from(value),
key: key.as_ref().into(),
value,
expired_at,
}
}
Expand All @@ -31,14 +31,10 @@ mod tests {

#[test]
fn test_new() {
let item = KvEntry::new(
"key",
"value",
Duration::new(10, 0).expect("a valid duration"),
);
let item = KvEntry::<String>::new("key", "value".into(), Duration::seconds(10));
assert_eq!(item.key, "key");
assert_eq!(item.value, "value");
assert!(item.expired_at > Utc::now() + Duration::new(9, 0).expect("a valid duration"));
assert!(item.expired_at <= Utc::now() + Duration::new(10, 0).expect("a valid duration"));
assert!(item.expired_at > Utc::now() + Duration::seconds(9));
assert!(item.expired_at <= Utc::now() + Duration::seconds(10));
}
}
Loading

0 comments on commit bb38223

Please sign in to comment.