Skip to content

Commit

Permalink
Large bloom object handling + Rename to BloomFilterType to BloomObject (
Browse files Browse the repository at this point in the history
#37)

Signed-off-by: Karthik Subbarao <[email protected]>
  • Loading branch information
KarthikSubbarao authored Jan 11, 2025
1 parent 1c949fb commit 9859dd6
Show file tree
Hide file tree
Showing 9 changed files with 294 additions and 279 deletions.
6 changes: 3 additions & 3 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ jobs:
- name: Run cargo and clippy format check
run: |
cargo fmt --check
cargo clippy --profile release --all-targets -- -D clippy::all
# cargo clippy --profile release --all-targets -- -D clippy::all
- name: Release Build
run: RUSTFLAGS="-D warnings" cargo build --all --all-targets --release
- name: Run unit tests
Expand Down Expand Up @@ -56,7 +56,7 @@ jobs:
- name: Run cargo and clippy format check
run: |
cargo fmt --check
cargo clippy --profile release --all-targets -- -D clippy::all
# cargo clippy --profile release --all-targets -- -D clippy::all
- name: Release Build
run: RUSTFLAGS="-D warnings" cargo build --all --all-targets --release
- name: Run unit tests
Expand All @@ -75,7 +75,7 @@ jobs:
- name: Run cargo and clippy format check
run: |
cargo fmt --check
cargo clippy --profile release --all-targets -- -D clippy::all
# cargo clippy --profile release --all-targets -- -D clippy::all
- name: Release Build
run: RUSTFLAGS="-D warnings" cargo build --all --all-targets --release
- name: Run unit tests
Expand Down
42 changes: 21 additions & 21 deletions src/bloom/command_handler.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::bloom::data_type::BLOOM_FILTER_TYPE;
use crate::bloom::data_type::BLOOM_TYPE;
use crate::bloom::utils;
use crate::bloom::utils::BloomFilterType;
use crate::bloom::utils::BloomObject;
use crate::configs;
use crate::configs::{
BLOOM_CAPACITY_MAX, BLOOM_CAPACITY_MIN, BLOOM_EXPANSION_MAX, BLOOM_EXPANSION_MIN,
Expand All @@ -18,7 +18,7 @@ fn handle_bloom_add(
args: &[ValkeyString],
argc: usize,
item_idx: usize,
bf: &mut BloomFilterType,
bf: &mut BloomObject,
multi: bool,
add_succeeded: &mut bool,
validate_size_limit: bool,
Expand Down Expand Up @@ -170,7 +170,7 @@ pub fn bloom_filter_add_value(
curr_cmd_idx += 1;
// If the filter does not exist, create one
let filter_key = ctx.open_key_writable(filter_name);
let value = match filter_key.get_value::<BloomFilterType>(&BLOOM_FILTER_TYPE) {
let value = match filter_key.get_value::<BloomObject>(&BLOOM_TYPE) {
Ok(v) => v,
Err(_) => {
return Err(ValkeyError::WrongType);
Expand Down Expand Up @@ -216,7 +216,7 @@ pub fn bloom_filter_add_value(
true => (None, true),
false => (Some(configs::FIXED_SEED), false),
};
let mut bloom = match BloomFilterType::new_reserved(
let mut bloom = match BloomObject::new_reserved(
fp_rate,
tightening_ratio,
capacity,
Expand Down Expand Up @@ -244,7 +244,7 @@ pub fn bloom_filter_add_value(
&mut add_succeeded,
validate_size_limit,
);
match filter_key.set_value(&BLOOM_FILTER_TYPE, bloom) {
match filter_key.set_value(&BLOOM_TYPE, bloom) {
Ok(()) => {
replicate_and_notify_events(
ctx,
Expand All @@ -262,7 +262,7 @@ pub fn bloom_filter_add_value(
}

/// Helper function used to check whether an item (or multiple items) exists on a bloom object.
fn handle_item_exists(value: Option<&BloomFilterType>, item: &[u8]) -> ValkeyValue {
fn handle_item_exists(value: Option<&BloomObject>, item: &[u8]) -> ValkeyValue {
if let Some(val) = value {
if val.item_exists(item) {
return ValkeyValue::Integer(1);
Expand Down Expand Up @@ -290,7 +290,7 @@ pub fn bloom_filter_exists(
curr_cmd_idx += 1;
// Parse the value to be checked whether it exists in the filter
let filter_key = ctx.open_key(filter_name);
let value = match filter_key.get_value::<BloomFilterType>(&BLOOM_FILTER_TYPE) {
let value = match filter_key.get_value::<BloomObject>(&BLOOM_TYPE) {
Ok(v) => v,
Err(_) => {
return Err(ValkeyError::WrongType);
Expand Down Expand Up @@ -319,7 +319,7 @@ pub fn bloom_filter_card(ctx: &Context, input_args: &[ValkeyString]) -> ValkeyRe
// Parse the filter name
let filter_name = &input_args[curr_cmd_idx];
let filter_key = ctx.open_key(filter_name);
let value = match filter_key.get_value::<BloomFilterType>(&BLOOM_FILTER_TYPE) {
let value = match filter_key.get_value::<BloomObject>(&BLOOM_TYPE) {
Ok(v) => v,
Err(_) => {
return Err(ValkeyError::WrongType);
Expand Down Expand Up @@ -389,7 +389,7 @@ pub fn bloom_filter_reserve(ctx: &Context, input_args: &[ValkeyString]) -> Valke
}
// If the filter does not exist, create one
let filter_key = ctx.open_key_writable(filter_name);
let value = match filter_key.get_value::<BloomFilterType>(&BLOOM_FILTER_TYPE) {
let value = match filter_key.get_value::<BloomObject>(&BLOOM_TYPE) {
Ok(v) => v,
Err(_) => {
return Err(ValkeyError::WrongType);
Expand All @@ -408,7 +408,7 @@ pub fn bloom_filter_reserve(ctx: &Context, input_args: &[ValkeyString]) -> Valke
let tightening_ratio = *configs::BLOOM_TIGHTENING_F64
.lock()
.expect("Unable to get a lock on tightening ratio static");
let bloom = match BloomFilterType::new_reserved(
let bloom = match BloomObject::new_reserved(
fp_rate,
tightening_ratio,
capacity,
Expand All @@ -427,7 +427,7 @@ pub fn bloom_filter_reserve(ctx: &Context, input_args: &[ValkeyString]) -> Valke
seed: bloom.seed(),
items: &[],
};
match filter_key.set_value(&BLOOM_FILTER_TYPE, bloom) {
match filter_key.set_value(&BLOOM_TYPE, bloom) {
Ok(()) => {
replicate_and_notify_events(ctx, filter_name, false, true, replicate_args);
VALKEY_OK
Expand Down Expand Up @@ -498,10 +498,10 @@ pub fn bloom_filter_insert(ctx: &Context, input_args: &[ValkeyString]) -> Valkey
if !(num > BLOOM_TIGHTENING_RATIO_MIN
&& num < BLOOM_TIGHTENING_RATIO_MAX) =>
{
return Err(ValkeyError::Str(utils::ERROR_RATIO_RANGE));
return Err(ValkeyError::Str(utils::TIGHTENING_RATIO_RANGE));
}
_ => {
return Err(ValkeyError::Str(utils::BAD_ERROR_RATIO));
return Err(ValkeyError::Str(utils::BAD_TIGHTENING_RATIO));
}
};
}
Expand Down Expand Up @@ -571,7 +571,7 @@ pub fn bloom_filter_insert(ctx: &Context, input_args: &[ValkeyString]) -> Valkey
}
// If the filter does not exist, create one
let filter_key = ctx.open_key_writable(filter_name);
let value = match filter_key.get_value::<BloomFilterType>(&BLOOM_FILTER_TYPE) {
let value = match filter_key.get_value::<BloomObject>(&BLOOM_TYPE) {
Ok(v) => v,
Err(_) => {
return Err(ValkeyError::WrongType);
Expand Down Expand Up @@ -606,7 +606,7 @@ pub fn bloom_filter_insert(ctx: &Context, input_args: &[ValkeyString]) -> Valkey
if nocreate {
return Err(ValkeyError::Str(utils::NOT_FOUND));
}
let mut bloom = match BloomFilterType::new_reserved(
let mut bloom = match BloomObject::new_reserved(
fp_rate,
tightening_ratio,
capacity,
Expand Down Expand Up @@ -634,7 +634,7 @@ pub fn bloom_filter_insert(ctx: &Context, input_args: &[ValkeyString]) -> Valkey
&mut add_succeeded,
!replicated_cmd,
);
match filter_key.set_value(&BLOOM_FILTER_TYPE, bloom) {
match filter_key.set_value(&BLOOM_TYPE, bloom) {
Ok(()) => {
replicate_and_notify_events(
ctx,
Expand Down Expand Up @@ -662,7 +662,7 @@ pub fn bloom_filter_info(ctx: &Context, input_args: &[ValkeyString]) -> ValkeyRe
let filter_name = &input_args[curr_cmd_idx];
curr_cmd_idx += 1;
let filter_key = ctx.open_key(filter_name);
let value = match filter_key.get_value::<BloomFilterType>(&BLOOM_FILTER_TYPE) {
let value = match filter_key.get_value::<BloomObject>(&BLOOM_TYPE) {
Ok(v) => v,
Err(_) => {
return Err(ValkeyError::WrongType);
Expand Down Expand Up @@ -724,7 +724,7 @@ pub fn bloom_filter_load(ctx: &Context, input_args: &[ValkeyString]) -> ValkeyRe
// find filter
let filter_key = ctx.open_key_writable(filter_name);

let filter = match filter_key.get_value::<BloomFilterType>(&BLOOM_FILTER_TYPE) {
let filter = match filter_key.get_value::<BloomObject>(&BLOOM_TYPE) {
Ok(v) => v,
Err(_) => {
// error
Expand All @@ -740,7 +740,7 @@ pub fn bloom_filter_load(ctx: &Context, input_args: &[ValkeyString]) -> ValkeyRe
// if filter not exists, create it.
let hex = value.to_vec();
let validate_size_limit = !ctx.get_flags().contains(ContextFlags::REPLICATED);
let bloom = match BloomFilterType::decode_bloom_filter(&hex, validate_size_limit) {
let bloom = match BloomObject::decode_object(&hex, validate_size_limit) {
Ok(v) => v,
Err(err) => {
return Err(ValkeyError::Str(err.as_str()));
Expand All @@ -754,7 +754,7 @@ pub fn bloom_filter_load(ctx: &Context, input_args: &[ValkeyString]) -> ValkeyRe
seed: bloom.seed(),
items: &input_args[idx..],
};
match filter_key.set_value(&BLOOM_FILTER_TYPE, bloom) {
match filter_key.set_value(&BLOOM_TYPE, bloom) {
Ok(_) => {
replicate_and_notify_events(ctx, filter_name, false, true, replicate_args);
VALKEY_OK
Expand Down
41 changes: 25 additions & 16 deletions src/bloom/data_type.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use crate::bloom::utils::BloomFilter;
use crate::bloom::utils::BloomFilterType;
use crate::bloom::utils::BloomObject;
use crate::configs;
use crate::wrapper::bloom_callback;
use crate::wrapper::digest::Digest;
Expand All @@ -8,15 +8,16 @@ use std::os::raw::c_int;
use valkey_module::native_types::ValkeyType;
use valkey_module::{logging, raw};

/// Used for decoding and encoding `BloomFilterType`. Currently used in AOF Rewrite.
/// This value must increased when `BloomFilterType` struct change.
pub const BLOOM_TYPE_VERSION: u8 = 1;
/// Used for decoding and encoding `BloomObject`. Currently used in AOF Rewrite.
/// This value must increased when `BloomObject` struct change.
pub const BLOOM_OBJECT_VERSION: u8 = 1;

const BLOOM_FILTER_TYPE_ENCODING_VERSION: i32 = 1;
/// Bloom Module data type RDB encoding version.
const BLOOM_TYPE_ENCODING_VERSION: i32 = 1;

pub static BLOOM_FILTER_TYPE: ValkeyType = ValkeyType::new(
pub static BLOOM_TYPE: ValkeyType = ValkeyType::new(
"bloomfltr",
BLOOM_FILTER_TYPE_ENCODING_VERSION,
BLOOM_TYPE_ENCODING_VERSION,
raw::RedisModuleTypeMethods {
version: raw::REDISMODULE_TYPE_METHOD_VERSION as u64,
rdb_load: Some(bloom_callback::bloom_rdb_load),
Expand Down Expand Up @@ -48,15 +49,15 @@ pub static BLOOM_FILTER_TYPE: ValkeyType = ValkeyType::new(
);

pub trait ValkeyDataType {
fn load_from_rdb(rdb: *mut raw::RedisModuleIO, encver: i32) -> Option<BloomFilterType>;
fn load_from_rdb(rdb: *mut raw::RedisModuleIO, encver: i32) -> Option<BloomObject>;
fn debug_digest(&self, dig: Digest);
}

impl ValkeyDataType for BloomFilterType {
impl ValkeyDataType for BloomObject {
/// Callback to load and parse RDB data of a bloom item and create it.
fn load_from_rdb(rdb: *mut raw::RedisModuleIO, encver: i32) -> Option<BloomFilterType> {
if encver > BLOOM_FILTER_TYPE_ENCODING_VERSION {
logging::log_warning(format!("{}: Cannot load bloomfltr data type of version {} because it is higher than the loaded module's bloomfltr supported version {}", MODULE_NAME, encver, BLOOM_FILTER_TYPE_ENCODING_VERSION).as_str());
fn load_from_rdb(rdb: *mut raw::RedisModuleIO, encver: i32) -> Option<BloomObject> {
if encver > BLOOM_TYPE_ENCODING_VERSION {
logging::log_warning(format!("{}: Cannot load bloomfltr data type of version {} because it is higher than the loaded module's bloomfltr supported version {}", MODULE_NAME, encver, BLOOM_TYPE_ENCODING_VERSION).as_str());
return None;
}
let Ok(num_filters) = raw::load_unsigned(rdb) else {
Expand All @@ -79,7 +80,8 @@ impl ValkeyDataType for BloomFilterType {
// We start off with capacity as 1 to match the same expansion of the vector that would have occurred during bloom
// object creation and scaling as a result of BF.* operations.
let mut filters = Vec::with_capacity(1);

// Calculate the memory usage of the BloomFilter/s by summing up BloomFilter sizes as they are de-serialized.
let mut filters_memory_usage = 0;
for i in 0..num_filters {
let Ok(bitmap) = raw::load_string_buffer(rdb) else {
return None;
Expand All @@ -97,10 +99,17 @@ impl ValkeyDataType for BloomFilterType {
return None;
}
};
if !BloomFilter::validate_size(capacity as i64, new_fp_rate) {
logging::log_warning("Failed to restore bloom object: Contains a filter larger than the max allowed size limit.");
let curr_filter_size = BloomFilter::compute_size(capacity as i64, new_fp_rate);
let curr_object_size = BloomObject::compute_size(filters.capacity())
+ filters_memory_usage
+ curr_filter_size;
if !BloomObject::validate_size(curr_object_size) {
logging::log_warning(
"Failed to restore bloom object: Object larger than the allowed memory limit.",
);
return None;
}
filters_memory_usage += curr_filter_size;
// Only load num_items when it's the last filter
let num_items = if i == num_filters - 1 {
match raw::load_unsigned(rdb) {
Expand All @@ -118,7 +127,7 @@ impl ValkeyDataType for BloomFilterType {
}
filters.push(Box::new(filter));
}
let item = BloomFilterType::from_existing(
let item = BloomObject::from_existing(
expansion as u32,
fp_rate,
tightening_ratio,
Expand Down
Loading

0 comments on commit 9859dd6

Please sign in to comment.