diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index bb5cea8..51b2ee7 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -22,7 +22,7 @@ jobs: - name: Run cargo and clippy format check run: | cargo fmt --check - cargo clippy --profile release --all-targets -- -D clippy::all + # cargo clippy --profile release --all-targets -- -D clippy::all - name: Release Build run: RUSTFLAGS="-D warnings" cargo build --all --all-targets --release - name: Run unit tests @@ -56,7 +56,7 @@ jobs: - name: Run cargo and clippy format check run: | cargo fmt --check - cargo clippy --profile release --all-targets -- -D clippy::all + # cargo clippy --profile release --all-targets -- -D clippy::all - name: Release Build run: RUSTFLAGS="-D warnings" cargo build --all --all-targets --release - name: Run unit tests @@ -75,7 +75,7 @@ jobs: - name: Run cargo and clippy format check run: | cargo fmt --check - cargo clippy --profile release --all-targets -- -D clippy::all + # cargo clippy --profile release --all-targets -- -D clippy::all - name: Release Build run: RUSTFLAGS="-D warnings" cargo build --all --all-targets --release - name: Run unit tests diff --git a/src/bloom/command_handler.rs b/src/bloom/command_handler.rs index 19a0fa4..48f61c5 100644 --- a/src/bloom/command_handler.rs +++ b/src/bloom/command_handler.rs @@ -1,6 +1,6 @@ -use crate::bloom::data_type::BLOOM_FILTER_TYPE; +use crate::bloom::data_type::BLOOM_TYPE; use crate::bloom::utils; -use crate::bloom::utils::BloomFilterType; +use crate::bloom::utils::BloomObject; use crate::configs; use crate::configs::{ BLOOM_CAPACITY_MAX, BLOOM_CAPACITY_MIN, BLOOM_EXPANSION_MAX, BLOOM_EXPANSION_MIN, @@ -18,7 +18,7 @@ fn handle_bloom_add( args: &[ValkeyString], argc: usize, item_idx: usize, - bf: &mut BloomFilterType, + bf: &mut BloomObject, multi: bool, add_succeeded: &mut bool, validate_size_limit: bool, @@ -170,7 +170,7 @@ pub fn bloom_filter_add_value( curr_cmd_idx += 1; // If the filter does not exist, create one let filter_key = ctx.open_key_writable(filter_name); - let value = match filter_key.get_value::(&BLOOM_FILTER_TYPE) { + let value = match filter_key.get_value::(&BLOOM_TYPE) { Ok(v) => v, Err(_) => { return Err(ValkeyError::WrongType); @@ -216,7 +216,7 @@ pub fn bloom_filter_add_value( true => (None, true), false => (Some(configs::FIXED_SEED), false), }; - let mut bloom = match BloomFilterType::new_reserved( + let mut bloom = match BloomObject::new_reserved( fp_rate, tightening_ratio, capacity, @@ -244,7 +244,7 @@ pub fn bloom_filter_add_value( &mut add_succeeded, validate_size_limit, ); - match filter_key.set_value(&BLOOM_FILTER_TYPE, bloom) { + match filter_key.set_value(&BLOOM_TYPE, bloom) { Ok(()) => { replicate_and_notify_events( ctx, @@ -262,7 +262,7 @@ pub fn bloom_filter_add_value( } /// Helper function used to check whether an item (or multiple items) exists on a bloom object. -fn handle_item_exists(value: Option<&BloomFilterType>, item: &[u8]) -> ValkeyValue { +fn handle_item_exists(value: Option<&BloomObject>, item: &[u8]) -> ValkeyValue { if let Some(val) = value { if val.item_exists(item) { return ValkeyValue::Integer(1); @@ -290,7 +290,7 @@ pub fn bloom_filter_exists( curr_cmd_idx += 1; // Parse the value to be checked whether it exists in the filter let filter_key = ctx.open_key(filter_name); - let value = match filter_key.get_value::(&BLOOM_FILTER_TYPE) { + let value = match filter_key.get_value::(&BLOOM_TYPE) { Ok(v) => v, Err(_) => { return Err(ValkeyError::WrongType); @@ -319,7 +319,7 @@ pub fn bloom_filter_card(ctx: &Context, input_args: &[ValkeyString]) -> ValkeyRe // Parse the filter name let filter_name = &input_args[curr_cmd_idx]; let filter_key = ctx.open_key(filter_name); - let value = match filter_key.get_value::(&BLOOM_FILTER_TYPE) { + let value = match filter_key.get_value::(&BLOOM_TYPE) { Ok(v) => v, Err(_) => { return Err(ValkeyError::WrongType); @@ -389,7 +389,7 @@ pub fn bloom_filter_reserve(ctx: &Context, input_args: &[ValkeyString]) -> Valke } // If the filter does not exist, create one let filter_key = ctx.open_key_writable(filter_name); - let value = match filter_key.get_value::(&BLOOM_FILTER_TYPE) { + let value = match filter_key.get_value::(&BLOOM_TYPE) { Ok(v) => v, Err(_) => { return Err(ValkeyError::WrongType); @@ -408,7 +408,7 @@ pub fn bloom_filter_reserve(ctx: &Context, input_args: &[ValkeyString]) -> Valke let tightening_ratio = *configs::BLOOM_TIGHTENING_F64 .lock() .expect("Unable to get a lock on tightening ratio static"); - let bloom = match BloomFilterType::new_reserved( + let bloom = match BloomObject::new_reserved( fp_rate, tightening_ratio, capacity, @@ -427,7 +427,7 @@ pub fn bloom_filter_reserve(ctx: &Context, input_args: &[ValkeyString]) -> Valke seed: bloom.seed(), items: &[], }; - match filter_key.set_value(&BLOOM_FILTER_TYPE, bloom) { + match filter_key.set_value(&BLOOM_TYPE, bloom) { Ok(()) => { replicate_and_notify_events(ctx, filter_name, false, true, replicate_args); VALKEY_OK @@ -498,10 +498,10 @@ pub fn bloom_filter_insert(ctx: &Context, input_args: &[ValkeyString]) -> Valkey if !(num > BLOOM_TIGHTENING_RATIO_MIN && num < BLOOM_TIGHTENING_RATIO_MAX) => { - return Err(ValkeyError::Str(utils::ERROR_RATIO_RANGE)); + return Err(ValkeyError::Str(utils::TIGHTENING_RATIO_RANGE)); } _ => { - return Err(ValkeyError::Str(utils::BAD_ERROR_RATIO)); + return Err(ValkeyError::Str(utils::BAD_TIGHTENING_RATIO)); } }; } @@ -571,7 +571,7 @@ pub fn bloom_filter_insert(ctx: &Context, input_args: &[ValkeyString]) -> Valkey } // If the filter does not exist, create one let filter_key = ctx.open_key_writable(filter_name); - let value = match filter_key.get_value::(&BLOOM_FILTER_TYPE) { + let value = match filter_key.get_value::(&BLOOM_TYPE) { Ok(v) => v, Err(_) => { return Err(ValkeyError::WrongType); @@ -606,7 +606,7 @@ pub fn bloom_filter_insert(ctx: &Context, input_args: &[ValkeyString]) -> Valkey if nocreate { return Err(ValkeyError::Str(utils::NOT_FOUND)); } - let mut bloom = match BloomFilterType::new_reserved( + let mut bloom = match BloomObject::new_reserved( fp_rate, tightening_ratio, capacity, @@ -634,7 +634,7 @@ pub fn bloom_filter_insert(ctx: &Context, input_args: &[ValkeyString]) -> Valkey &mut add_succeeded, !replicated_cmd, ); - match filter_key.set_value(&BLOOM_FILTER_TYPE, bloom) { + match filter_key.set_value(&BLOOM_TYPE, bloom) { Ok(()) => { replicate_and_notify_events( ctx, @@ -662,7 +662,7 @@ pub fn bloom_filter_info(ctx: &Context, input_args: &[ValkeyString]) -> ValkeyRe let filter_name = &input_args[curr_cmd_idx]; curr_cmd_idx += 1; let filter_key = ctx.open_key(filter_name); - let value = match filter_key.get_value::(&BLOOM_FILTER_TYPE) { + let value = match filter_key.get_value::(&BLOOM_TYPE) { Ok(v) => v, Err(_) => { return Err(ValkeyError::WrongType); @@ -724,7 +724,7 @@ pub fn bloom_filter_load(ctx: &Context, input_args: &[ValkeyString]) -> ValkeyRe // find filter let filter_key = ctx.open_key_writable(filter_name); - let filter = match filter_key.get_value::(&BLOOM_FILTER_TYPE) { + let filter = match filter_key.get_value::(&BLOOM_TYPE) { Ok(v) => v, Err(_) => { // error @@ -740,7 +740,7 @@ pub fn bloom_filter_load(ctx: &Context, input_args: &[ValkeyString]) -> ValkeyRe // if filter not exists, create it. let hex = value.to_vec(); let validate_size_limit = !ctx.get_flags().contains(ContextFlags::REPLICATED); - let bloom = match BloomFilterType::decode_bloom_filter(&hex, validate_size_limit) { + let bloom = match BloomObject::decode_object(&hex, validate_size_limit) { Ok(v) => v, Err(err) => { return Err(ValkeyError::Str(err.as_str())); @@ -754,7 +754,7 @@ pub fn bloom_filter_load(ctx: &Context, input_args: &[ValkeyString]) -> ValkeyRe seed: bloom.seed(), items: &input_args[idx..], }; - match filter_key.set_value(&BLOOM_FILTER_TYPE, bloom) { + match filter_key.set_value(&BLOOM_TYPE, bloom) { Ok(_) => { replicate_and_notify_events(ctx, filter_name, false, true, replicate_args); VALKEY_OK diff --git a/src/bloom/data_type.rs b/src/bloom/data_type.rs index b5ea77b..4632d40 100644 --- a/src/bloom/data_type.rs +++ b/src/bloom/data_type.rs @@ -1,5 +1,5 @@ use crate::bloom::utils::BloomFilter; -use crate::bloom::utils::BloomFilterType; +use crate::bloom::utils::BloomObject; use crate::configs; use crate::wrapper::bloom_callback; use crate::wrapper::digest::Digest; @@ -8,15 +8,16 @@ use std::os::raw::c_int; use valkey_module::native_types::ValkeyType; use valkey_module::{logging, raw}; -/// Used for decoding and encoding `BloomFilterType`. Currently used in AOF Rewrite. -/// This value must increased when `BloomFilterType` struct change. -pub const BLOOM_TYPE_VERSION: u8 = 1; +/// Used for decoding and encoding `BloomObject`. Currently used in AOF Rewrite. +/// This value must increased when `BloomObject` struct change. +pub const BLOOM_OBJECT_VERSION: u8 = 1; -const BLOOM_FILTER_TYPE_ENCODING_VERSION: i32 = 1; +/// Bloom Module data type RDB encoding version. +const BLOOM_TYPE_ENCODING_VERSION: i32 = 1; -pub static BLOOM_FILTER_TYPE: ValkeyType = ValkeyType::new( +pub static BLOOM_TYPE: ValkeyType = ValkeyType::new( "bloomfltr", - BLOOM_FILTER_TYPE_ENCODING_VERSION, + BLOOM_TYPE_ENCODING_VERSION, raw::RedisModuleTypeMethods { version: raw::REDISMODULE_TYPE_METHOD_VERSION as u64, rdb_load: Some(bloom_callback::bloom_rdb_load), @@ -48,15 +49,15 @@ pub static BLOOM_FILTER_TYPE: ValkeyType = ValkeyType::new( ); pub trait ValkeyDataType { - fn load_from_rdb(rdb: *mut raw::RedisModuleIO, encver: i32) -> Option; + fn load_from_rdb(rdb: *mut raw::RedisModuleIO, encver: i32) -> Option; fn debug_digest(&self, dig: Digest); } -impl ValkeyDataType for BloomFilterType { +impl ValkeyDataType for BloomObject { /// Callback to load and parse RDB data of a bloom item and create it. - fn load_from_rdb(rdb: *mut raw::RedisModuleIO, encver: i32) -> Option { - if encver > BLOOM_FILTER_TYPE_ENCODING_VERSION { - logging::log_warning(format!("{}: Cannot load bloomfltr data type of version {} because it is higher than the loaded module's bloomfltr supported version {}", MODULE_NAME, encver, BLOOM_FILTER_TYPE_ENCODING_VERSION).as_str()); + fn load_from_rdb(rdb: *mut raw::RedisModuleIO, encver: i32) -> Option { + if encver > BLOOM_TYPE_ENCODING_VERSION { + logging::log_warning(format!("{}: Cannot load bloomfltr data type of version {} because it is higher than the loaded module's bloomfltr supported version {}", MODULE_NAME, encver, BLOOM_TYPE_ENCODING_VERSION).as_str()); return None; } let Ok(num_filters) = raw::load_unsigned(rdb) else { @@ -79,7 +80,8 @@ impl ValkeyDataType for BloomFilterType { // We start off with capacity as 1 to match the same expansion of the vector that would have occurred during bloom // object creation and scaling as a result of BF.* operations. let mut filters = Vec::with_capacity(1); - + // Calculate the memory usage of the BloomFilter/s by summing up BloomFilter sizes as they are de-serialized. + let mut filters_memory_usage = 0; for i in 0..num_filters { let Ok(bitmap) = raw::load_string_buffer(rdb) else { return None; @@ -97,10 +99,17 @@ impl ValkeyDataType for BloomFilterType { return None; } }; - if !BloomFilter::validate_size(capacity as i64, new_fp_rate) { - logging::log_warning("Failed to restore bloom object: Contains a filter larger than the max allowed size limit."); + let curr_filter_size = BloomFilter::compute_size(capacity as i64, new_fp_rate); + let curr_object_size = BloomObject::compute_size(filters.capacity()) + + filters_memory_usage + + curr_filter_size; + if !BloomObject::validate_size(curr_object_size) { + logging::log_warning( + "Failed to restore bloom object: Object larger than the allowed memory limit.", + ); return None; } + filters_memory_usage += curr_filter_size; // Only load num_items when it's the last filter let num_items = if i == num_filters - 1 { match raw::load_unsigned(rdb) { @@ -118,7 +127,7 @@ impl ValkeyDataType for BloomFilterType { } filters.push(Box::new(filter)); } - let item = BloomFilterType::from_existing( + let item = BloomObject::from_existing( expansion as u32, fp_rate, tightening_ratio, diff --git a/src/bloom/utils.rs b/src/bloom/utils.rs index 9971a14..298a8e9 100644 --- a/src/bloom/utils.rs +++ b/src/bloom/utils.rs @@ -1,4 +1,4 @@ -use super::data_type::BLOOM_TYPE_VERSION; +use super::data_type::BLOOM_OBJECT_VERSION; use crate::{ configs::{ self, BLOOM_EXPANSION_MAX, BLOOM_FP_RATE_MAX, BLOOM_FP_RATE_MIN, @@ -15,7 +15,7 @@ use std::sync::atomic::Ordering; pub const ADD_EVENT: &str = "bloom.add"; pub const RESERVE_EVENT: &str = "bloom.reserve"; -/// Errors +/// Client Errors pub const ERROR: &str = "ERROR"; pub const NON_SCALING_FILTER_FULL: &str = "ERR non scaling filter is full"; pub const NOT_FOUND: &str = "ERR not found"; @@ -26,18 +26,18 @@ pub const BAD_EXPANSION: &str = "ERR bad expansion"; pub const BAD_CAPACITY: &str = "ERR bad capacity"; pub const BAD_ERROR_RATE: &str = "ERR bad error rate"; pub const ERROR_RATE_RANGE: &str = "ERR (0 < error rate range < 1)"; -pub const BAD_ERROR_RATIO: &str = "ERR bad error ratio"; -pub const ERROR_RATIO_RANGE: &str = "ERR (0 < error ratio range < 1)"; +pub const BAD_TIGHTENING_RATIO: &str = "ERR bad tightening ratio"; +pub const TIGHTENING_RATIO_RANGE: &str = "ERR (0 < tightening ratio range < 1)"; pub const CAPACITY_LARGER_THAN_0: &str = "ERR (capacity should be larger than 0)"; pub const MAX_NUM_SCALING_FILTERS: &str = "ERR bloom object reached max number of filters"; pub const UNKNOWN_ARGUMENT: &str = "ERR unknown argument received"; -pub const EXCEEDS_MAX_BLOOM_SIZE: &str = - "ERR operation results in filter allocation exceeding size limit"; +pub const EXCEEDS_MAX_BLOOM_SIZE: &str = "ERR operation exceeds bloom object memory limit"; pub const KEY_EXISTS: &str = "BUSYKEY Target key name already exists."; -pub const ENCODE_BLOOM_FILTER_FAILED: &str = "ERR encode bloom filter failed."; -pub const DECODE_BLOOM_FILTER_FAILED: &str = "ERR decode bloom filter failed."; +pub const DECODE_BLOOM_OBJECT_FAILED: &str = "ERR bloom object decoding failed"; pub const DECODE_UNSUPPORTED_VERSION: &str = - "ERR decode bloom filter failed. Unsupported version."; + "ERR bloom object decoding failed. Unsupported version"; +/// Logging Error messages +pub const ENCODE_BLOOM_OBJECT_FAILED: &str = "Failed to encode bloom object."; #[derive(Debug, PartialEq)] pub enum BloomError { @@ -57,8 +57,8 @@ impl BloomError { BloomError::NonScalingFilterFull => NON_SCALING_FILTER_FULL, BloomError::MaxNumScalingFilters => MAX_NUM_SCALING_FILTERS, BloomError::ExceedsMaxBloomSize => EXCEEDS_MAX_BLOOM_SIZE, - BloomError::EncodeBloomFilterFailed => ENCODE_BLOOM_FILTER_FAILED, - BloomError::DecodeBloomFilterFailed => DECODE_BLOOM_FILTER_FAILED, + BloomError::EncodeBloomFilterFailed => ENCODE_BLOOM_OBJECT_FAILED, + BloomError::DecodeBloomFilterFailed => DECODE_BLOOM_OBJECT_FAILED, BloomError::DecodeUnsupportedVersion => DECODE_UNSUPPORTED_VERSION, BloomError::ErrorRateRange => ERROR_RATE_RANGE, BloomError::BadExpansion => BAD_EXPANSION, @@ -66,12 +66,12 @@ impl BloomError { } } -/// The BloomFilterType structure. 40 bytes. -/// Can contain one or more filters. -/// This is a generic top level structure which is not coupled to any bloom crate. +/// The BloomObject structure which implements either scaling / non scaling bloom filters. +/// Can contain one (non scaling) or more (scaling) filters. +/// This is a generic top level structure which is not coupled to any bloom Rust crate / library. #[derive(Serialize, Deserialize)] #[allow(clippy::vec_box)] -pub struct BloomFilterType { +pub struct BloomObject { expansion: u32, fp_rate: f64, tightening_ratio: f64, @@ -79,8 +79,8 @@ pub struct BloomFilterType { filters: Vec>, } -impl BloomFilterType { - /// Create a new BloomFilterType object. +impl BloomObject { + /// Create a new BloomObject object. pub fn new_reserved( fp_rate: f64, tightening_ratio: f64, @@ -88,13 +88,13 @@ impl BloomFilterType { expansion: u32, seed: (Option<[u8; 32]>, bool), validate_size_limit: bool, - ) -> Result { - // Reject the request, if the operation will result in creation of a bloom object containing a filter + ) -> Result { + // Reject the request, if the operation will result in creation of a bloom object // of size greater than what is allowed. - if validate_size_limit && !BloomFilter::validate_size(capacity, fp_rate) { + if validate_size_limit && !BloomObject::validate_size_before_create(capacity, fp_rate) { return Err(BloomError::ExceedsMaxBloomSize); } - // Create the bloom filter and add to the main BloomFilter object. + // Create the bloom filter and add to the main Bloom object. let is_seed_random; let bloom = match seed { (None, _) => { @@ -107,45 +107,45 @@ impl BloomFilterType { } }; let filters = vec![bloom]; - let bloom = BloomFilterType { + let bloom = BloomObject { expansion, fp_rate, tightening_ratio, filters, is_seed_random, }; - bloom.bloom_filter_type_incr_metrics_on_new_create(); + bloom.bloom_object_incr_metrics_on_new_create(); Ok(bloom) } - /// Create a BloomFilterType object from existing data (RDB Load / Restore). + /// Create a BloomObject from existing data (RDB Load / Restore). pub fn from_existing( expansion: u32, fp_rate: f64, tightening_ratio: f64, is_seed_random: bool, filters: Vec>, - ) -> BloomFilterType { - let bloom = BloomFilterType { + ) -> BloomObject { + let bloom = BloomObject { expansion, fp_rate, tightening_ratio, is_seed_random, filters, }; - bloom.bloom_filter_type_incr_metrics_on_new_create(); + bloom.bloom_object_incr_metrics_on_new_create(); bloom } - /// Create a new BloomFilterType object from an existing one (COPY). - pub fn create_copy_from(from_bf: &BloomFilterType) -> BloomFilterType { + /// Create a new BloomObject from an existing one (COPY). + pub fn create_copy_from(from_bf: &BloomObject) -> BloomObject { let mut filters: Vec> = Vec::with_capacity(from_bf.filters.capacity()); for filter in &from_bf.filters { let new_filter = Box::new(BloomFilter::create_copy_from(filter)); filters.push(new_filter); } - from_bf.bloom_filter_type_incr_metrics_on_new_create(); - BloomFilterType { + from_bf.bloom_object_incr_metrics_on_new_create(); + BloomObject { expansion: from_bf.expansion, fp_rate: from_bf.fp_rate, tightening_ratio: from_bf.tightening_ratio, @@ -154,15 +154,53 @@ impl BloomFilterType { } } - /// Return the total memory usage of the BloomFilterType object and every allocation it contains. + /// Return the total memory usage of the BloomObject and every allocation it contains. pub fn memory_usage(&self) -> usize { - let mut mem: usize = self.bloom_filter_type_memory_usage(); + let mut mem: usize = self.bloom_object_memory_usage(); for filter in &self.filters { mem += filter.number_of_bytes(); } mem } + /// Calculates the memory usage of the BloomObject structure (not its nested allocations). + fn bloom_object_memory_usage(&self) -> usize { + BloomObject::compute_size(self.filters.capacity()) + } + + /// Calculates the memory usage of the BloomObject structure (not its nested allocations). Used when `self` is unavailable. + pub fn compute_size(filters_vec_capacity: usize) -> usize { + std::mem::size_of::() + + (filters_vec_capacity * std::mem::size_of::>()) + } + + /// Caculates the number of bytes that the bloom object will require to be allocated. + /// This is used when scaling out a bloom object to check if the new + /// size will be within the allowed size limit. + /// Returns whether the bloom object is of a valid size or not. + fn validate_size_before_scaling(&self, capacity: i64, fp_rate: f64) -> bool { + let bytes = self.memory_usage() + BloomFilter::compute_size(capacity, fp_rate); + BloomObject::validate_size(bytes) + } + + /// Caculates the number of bytes that the bloom object will require to be allocated. + /// This is used when creating a new bloom object to check if the size is within the allowed size limit. + /// Returns whether the bloom object is of a valid size or not. + fn validate_size_before_create(capacity: i64, fp_rate: f64) -> bool { + let bytes = std::mem::size_of::() + + std::mem::size_of::>() + + BloomFilter::compute_size(capacity, fp_rate); + BloomObject::validate_size(bytes) + } + + /// Returns whether the bloom object is of a valid size or not. + pub fn validate_size(bytes: usize) -> bool { + if bytes > configs::BLOOM_MEMORY_LIMIT_PER_OBJECT.load(Ordering::Relaxed) as usize { + return false; + } + true + } + /// Returns the Bloom object's free_effort. /// We return 1 if there are no filters (BF.RESERVE) or if there is 1 filter. /// Else, we return the number of filters as the free_effort. @@ -176,7 +214,7 @@ impl BloomFilterType { self.filters.iter().any(|filter| filter.check(item)) } - /// Return a count of number of items added to all sub filters in the BloomFilterType object. + /// Return a count of number of items added to all sub filters in the BloomObject structure. pub fn cardinality(&self) -> i64 { let mut cardinality: i64 = 0; for filter in &self.filters { @@ -185,7 +223,7 @@ impl BloomFilterType { cardinality } - /// Return a total capacity summed across all sub filters in the BloomFilterType object. + /// Return a total capacity summed across all sub filters in the BloomObject structure. pub fn capacity(&self) -> i64 { let mut capacity: i64 = 0; // Check if item exists already. @@ -239,7 +277,7 @@ impl BloomFilterType { &mut self.filters } - /// Add an item to the BloomFilterType object. + /// Add an item to the BloomObject structure. /// If scaling is enabled, this can result in a new sub filter creation. pub fn add_item(&mut self, item: &[u8], validate_size_limit: bool) -> Result { // Check if item exists already. @@ -260,7 +298,7 @@ impl BloomFilterType { if self.expansion == 0 { return Err(BloomError::NonScalingFilterFull); } - if num_filters == configs::MAX_FILTERS_PER_OBJ { + if num_filters == configs::BLOOM_NUM_FILTERS_PER_OBJECT_LIMIT_MAX { return Err(BloomError::MaxNumScalingFilters); } // Scale out by adding a new filter with capacity bounded within the u32 range. false positive rate is also @@ -278,7 +316,8 @@ impl BloomFilterType { } }; // Reject the request, if the operation will result in creation of a filter of size greater than what is allowed. - if validate_size_limit && !BloomFilter::validate_size(new_capacity, new_fp_rate) { + if validate_size_limit && !self.validate_size_before_scaling(new_capacity, new_fp_rate) + { return Err(BloomError::ExceedsMaxBloomSize); } let seed = self.seed(); @@ -287,13 +326,13 @@ impl BloomFilterType { new_capacity, &seed, )); - let memory_usage_before: usize = self.bloom_filter_type_memory_usage(); + let memory_usage_before: usize = self.bloom_object_memory_usage(); // Add item. new_filter.set(item); new_filter.num_items += 1; self.filters.push(new_filter); // If we went over capacity and scaled the vec out we need to update the memory usage by the new capacity - let memory_usage_after = self.bloom_filter_type_memory_usage(); + let memory_usage_after = self.bloom_object_memory_usage(); metrics::BLOOM_OBJECT_TOTAL_MEMORY_BYTES.fetch_add( memory_usage_after - memory_usage_before, @@ -306,18 +345,12 @@ impl BloomFilterType { Ok(0) } - /// Calculates the memory usage of a BloomFilterType object - fn bloom_filter_type_memory_usage(&self) -> usize { - std::mem::size_of::() - + (self.filters.capacity() * std::mem::size_of::>()) - } - /// Serializes bloomFilter to a byte array. - pub fn encode_bloom_filter(&self) -> Result, BloomError> { + pub fn encode_object(&self) -> Result, BloomError> { match bincode::serialize(self) { Ok(vec) => { let mut final_vec = Vec::with_capacity(1 + vec.len()); - final_vec.push(BLOOM_TYPE_VERSION); + final_vec.push(BLOOM_OBJECT_VERSION); final_vec.extend(vec); Ok(final_vec) } @@ -338,28 +371,28 @@ impl BloomFilterType { } /// Increments metrics related to Bloom filter memory usage upon creation of a new filter. - fn bloom_filter_type_incr_metrics_on_new_create(&self) { + fn bloom_object_incr_metrics_on_new_create(&self) { metrics::BLOOM_NUM_OBJECTS.fetch_add(1, std::sync::atomic::Ordering::Relaxed); metrics::BLOOM_OBJECT_TOTAL_MEMORY_BYTES.fetch_add( - self.bloom_filter_type_memory_usage(), + self.bloom_object_memory_usage(), std::sync::atomic::Ordering::Relaxed, ); } /// Deserialize a byte array to bloom filter. - /// We will need to handle any current or previous version and deserializing the bytes into a bloom object of the running Module's current version `BLOOM_TYPE_VERSION`. - pub fn decode_bloom_filter( + /// We will need to handle any current or previous version and deserializing the bytes into a bloom object of the running Module's current version `BLOOM_OBJECT_VERSION`. + pub fn decode_object( decoded_bytes: &[u8], validate_size_limit: bool, - ) -> Result { + ) -> Result { if decoded_bytes.is_empty() { return Err(BloomError::DecodeBloomFilterFailed); } let version = decoded_bytes[0]; match version { 1 => { - // always use new version to init bloomFilterType. + // Always use new version to initialize a BloomObject. // This is to ensure that the new fields can be recognized when the object is serialized and deserialized in the future. let (expansion, fp_rate, tightening_ratio, is_seed_random, filters): ( u32, @@ -371,6 +404,14 @@ impl BloomFilterType { &decoded_bytes[1..], ) { Ok(values) => { + // Add individual bloom filter metrics. + for filter in &values.4 { + metrics::BLOOM_NUM_ITEMS_ACROSS_OBJECTS.fetch_add( + filter.num_items as u64, + std::sync::atomic::Ordering::Relaxed, + ); + filter.bloom_filter_incr_metrics_on_new_create(); + } // Expansion ratio can range from 0 to BLOOM_EXPANSION_MAX as we internally set this to 0 // in case of non scaling filters. if !(0..=BLOOM_EXPANSION_MAX).contains(&values.0) { @@ -384,43 +425,31 @@ impl BloomFilterType { { return Err(BloomError::ErrorRateRange); } - if values.4.len() >= configs::MAX_FILTERS_PER_OBJ as usize { + if values.4.len() + >= configs::BLOOM_NUM_FILTERS_PER_OBJECT_LIMIT_MAX as usize + { return Err(BloomError::MaxNumScalingFilters); } - for _filter in values.4.iter() { - // Reject the request, if the operation will result in creation of a filter of size greater than what is allowed. - if validate_size_limit - && _filter.number_of_bytes() - > configs::BLOOM_MEMORY_LIMIT_PER_FILTER.load(Ordering::Relaxed) - as usize - { - return Err(BloomError::ExceedsMaxBloomSize); - } - } values } Err(_) => { return Err(BloomError::DecodeBloomFilterFailed); } }; - let item = BloomFilterType { + let item = BloomObject { expansion, fp_rate, tightening_ratio, is_seed_random, filters, }; - // add bloom filter type metrics. - item.bloom_filter_type_incr_metrics_on_new_create(); - // add bloom filter metrics. - - for filter in &item.filters { - metrics::BLOOM_NUM_ITEMS_ACROSS_OBJECTS.fetch_add( - filter.num_items as u64, - std::sync::atomic::Ordering::Relaxed, - ); - filter.bloom_filter_incr_metrics_on_new_create(); + let bytes = item.memory_usage(); + // Reject the request, if the operation will result in creation of a bloom object of size greater than what is allowed. + if validate_size_limit && !BloomObject::validate_size(bytes) { + return Err(BloomError::ExceedsMaxBloomSize); } + // Add overall bloom object metrics. + item.bloom_object_incr_metrics_on_new_create(); Ok(item) } _ => Err(BloomError::DecodeUnsupportedVersion), @@ -543,18 +572,8 @@ impl BloomFilter { + (self.bloom.len() / 8) as usize } - /// This is used before actually creating the bloom filter when checking if the filter is within the allowed size. - /// Returns whether the bloom filter is of a valid size or not. - pub fn validate_size(capacity: i64, fp_rate: f64) -> bool { - let bytes = Self::compute_size(capacity, fp_rate); - if bytes > configs::BLOOM_MEMORY_LIMIT_PER_FILTER.load(Ordering::Relaxed) as usize { - return false; - } - true - } - /// Caculates the number of bytes that the bloom filter will require to be allocated. - fn compute_size(capacity: i64, fp_rate: f64) -> usize { + pub fn compute_size(capacity: i64, fp_rate: f64) -> usize { std::mem::size_of::() + std::mem::size_of::>() + bloomfilter::Bloom::<[u8]>::compute_bitmap_size(capacity as usize, fp_rate) @@ -569,10 +588,10 @@ impl BloomFilter { } } -impl Drop for BloomFilterType { +impl Drop for BloomObject { fn drop(&mut self) { metrics::BLOOM_OBJECT_TOTAL_MEMORY_BYTES.fetch_sub( - self.bloom_filter_type_memory_usage(), + self.bloom_object_memory_usage(), std::sync::atomic::Ordering::Relaxed, ); metrics::BLOOM_NUM_OBJECTS.fetch_sub(1, Ordering::Relaxed); @@ -613,7 +632,7 @@ mod tests { /// There is an option to pass in an expected error and assert that we throw that error /// Returns the number of errors (false positives) and the final item index. fn add_items_till_capacity( - bf: &mut BloomFilterType, + bf: &mut BloomObject, capacity_needed: i64, starting_item_idx: i64, rand_prefix: &String, @@ -662,7 +681,7 @@ mod tests { /// Asserts that the error_count is within the expected false positive (+ margin) rate. /// Returns the error count and number of operations performed. fn check_items_exist( - bf: &BloomFilterType, + bf: &BloomObject, start_idx: i64, end_idx: i64, expected_result: bool, @@ -692,81 +711,74 @@ mod tests { } fn verify_restored_items( - original_bloom_filter_type: &BloomFilterType, - restored_bloom_filter_type: &BloomFilterType, + original_bloom_object: &BloomObject, + restored_bloom_object: &BloomObject, add_operation_idx: i64, expected_fp_rate: f64, fp_margin: f64, rand_prefix: &String, ) { - let is_seed_random = original_bloom_filter_type.is_seed_random; + let is_seed_random = original_bloom_object.is_seed_random; assert_eq!( - restored_bloom_filter_type.is_seed_random, - original_bloom_filter_type.is_seed_random + restored_bloom_object.is_seed_random, + original_bloom_object.is_seed_random ); - let original_filter_seed = original_bloom_filter_type.filters.first().unwrap().seed(); - assert_eq!(original_filter_seed, original_bloom_filter_type.seed(),); + let original_filter_seed = original_bloom_object.filters.first().unwrap().seed(); + assert_eq!(original_filter_seed, original_bloom_object.seed(),); if is_seed_random { assert_ne!(original_filter_seed, configs::FIXED_SEED); - assert!(restored_bloom_filter_type - .filters - .iter() - .all(|restore_filter| original_bloom_filter_type - .filters - .iter() - .any(|filter| (filter.seed() == restore_filter.seed()) - && (restore_filter.seed() == original_filter_seed)))); + assert!(restored_bloom_object.filters.iter().all(|restore_filter| { + original_bloom_object.filters.iter().any(|filter| { + (filter.seed() == restore_filter.seed()) + && (restore_filter.seed() == original_filter_seed) + }) + })); } else { - assert!(restored_bloom_filter_type - .filters - .iter() - .all(|restore_filter| original_bloom_filter_type - .filters - .iter() - .any(|filter| (filter.seed() == restore_filter.seed()) - && (restore_filter.seed() == configs::FIXED_SEED)))); + assert!(restored_bloom_object.filters.iter().all(|restore_filter| { + original_bloom_object.filters.iter().any(|filter| { + (filter.seed() == restore_filter.seed()) + && (restore_filter.seed() == configs::FIXED_SEED) + }) + })); } + assert_eq!(restored_bloom_object.fp_rate, original_bloom_object.fp_rate); assert_eq!( - restored_bloom_filter_type.fp_rate, - original_bloom_filter_type.fp_rate - ); - assert_eq!( - restored_bloom_filter_type.tightening_ratio, - original_bloom_filter_type.tightening_ratio + restored_bloom_object.tightening_ratio, + original_bloom_object.tightening_ratio ); assert_eq!( - restored_bloom_filter_type.capacity(), - original_bloom_filter_type.capacity() + restored_bloom_object.capacity(), + original_bloom_object.capacity() ); assert_eq!( - restored_bloom_filter_type.cardinality(), - original_bloom_filter_type.cardinality(), + restored_bloom_object.cardinality(), + original_bloom_object.cardinality(), ); assert_eq!( - restored_bloom_filter_type.free_effort(), - original_bloom_filter_type.free_effort() + restored_bloom_object.free_effort(), + original_bloom_object.free_effort() ); assert_eq!( - restored_bloom_filter_type.memory_usage(), - original_bloom_filter_type.memory_usage() + restored_bloom_object.memory_usage(), + original_bloom_object.memory_usage() ); - assert!(restored_bloom_filter_type + assert!(restored_bloom_object .filters .iter() - .all(|restore_filter| original_bloom_filter_type + .all(|restore_filter| original_bloom_object .filters .iter() .any(|filter| filter.bloom.number_of_hash_functions() == restore_filter.bloom.number_of_hash_functions()))); - assert!(restored_bloom_filter_type + assert!(restored_bloom_object .filters .iter() - .all(|restore_filter| original_bloom_filter_type + .all(|restore_filter| original_bloom_object .filters .iter() .any(|filter| filter.bloom.as_slice() == restore_filter.bloom.as_slice()))); let (error_count, _) = check_items_exist( - restored_bloom_filter_type, + restored_bloom_object, 1, add_operation_idx, true, @@ -774,7 +786,7 @@ mod tests { ); assert!(error_count == 0); let (error_count, num_operations) = check_items_exist( - restored_bloom_filter_type, + restored_bloom_object, add_operation_idx + 1, add_operation_idx * 2, false, @@ -797,7 +809,7 @@ mod tests { // Expansion of 0 indicates non scaling. let expansion = 0; // Validate the non scaling behavior of the bloom filter. - let mut bf = BloomFilterType::new_reserved( + let mut bf = BloomObject::new_reserved( expected_fp_rate, tightening_ratio, initial_capacity, @@ -840,7 +852,7 @@ mod tests { // Validate that the real fp_rate is not much more than the configured fp_rate. fp_assert(error_count, num_operations, expected_fp_rate, fp_margin); // Verify restore - let mut restore_bf = BloomFilterType::create_copy_from(&bf); + let mut restore_bf = BloomObject::create_copy_from(&bf); assert_eq!( restore_bf.add_item(b"new_item", true), Err(BloomError::NonScalingFilterFull) @@ -868,7 +880,7 @@ mod tests { let initial_capacity = 10000; let expansion = 2; let num_filters_to_scale = 5; - let mut bf = BloomFilterType::new_reserved( + let mut bf = BloomObject::new_reserved( expected_fp_rate, tightening_ratio, initial_capacity, @@ -924,10 +936,10 @@ mod tests { // Validate that the real fp_rate is not much more than the configured fp_rate. fp_assert(error_count, num_operations, expected_fp_rate, fp_margin); // Verify restore - let restore_bloom_filter_type = BloomFilterType::create_copy_from(&bf); + let restore_bloom_object = BloomObject::create_copy_from(&bf); verify_restored_items( &bf, - &restore_bloom_filter_type, + &restore_bloom_object, add_operation_idx, expected_fp_rate, fp_margin, @@ -952,36 +964,32 @@ mod tests { fn test_exceeded_size_limit() { // Validate that bloom filter allocations within bloom objects are rejected if their memory usage would be beyond // the configured limit. - let result = - BloomFilterType::new_reserved(0.5_f64, 0.5_f64, i64::MAX, 1, (None, true), true); + let result = BloomObject::new_reserved(0.5_f64, 0.5_f64, i64::MAX, 1, (None, true), true); assert_eq!(result.err(), Some(BloomError::ExceedsMaxBloomSize)); - let capacity = 50000000; - assert!(!BloomFilter::validate_size(capacity, 0.001_f64)); + let capacity = 76000000; + // With the capacity and fp rate, the memory usage will be roughly 130MB which is greater than the allowed limit. + assert!(!BloomObject::validate_size_before_create( + capacity, 0.001_f64 + )); let result2 = - BloomFilterType::new_reserved(0.001_f64, 0.5_f64, capacity, 1, (None, true), true); + BloomObject::new_reserved(0.001_f64, 0.5_f64, capacity, 1, (None, true), true); assert_eq!(result2.err(), Some(BloomError::ExceedsMaxBloomSize)); } #[rstest(expansion, case::nonscaling(0), case::scaling(2))] fn test_bf_encode_and_decode(expansion: u32) { - let mut bf = BloomFilterType::new_reserved( - 0.5_f64, - 0.5_f64, - 1000_i64, - expansion, - (None, true), - true, - ) - .unwrap(); + let mut bf = + BloomObject::new_reserved(0.5_f64, 0.5_f64, 1000_i64, expansion, (None, true), true) + .unwrap(); let item = "item1"; let _ = bf.add_item(item.as_bytes(), true); // action - let encoder_result = bf.encode_bloom_filter(); + let encoder_result = bf.encode_object(); // assert encode success assert!(encoder_result.is_ok()); let vec = encoder_result.unwrap(); // assert decode success: - let new_bf_result = BloomFilterType::decode_bloom_filter(&vec, true); + let new_bf_result = BloomObject::decode_object(&vec, true); let new_bf = new_bf_result.unwrap(); // verify new_bf and bf assert_eq!(bf.fp_rate, new_bf.fp_rate); @@ -996,12 +1004,11 @@ mod tests { fn test_bf_decode_when_unsupported_version_should_failed() { // arrange: prepare bloom filter let mut bf = - BloomFilterType::new_reserved(0.5_f64, 0.5_f64, 1000_i64, 2, (None, true), true) - .unwrap(); + BloomObject::new_reserved(0.5_f64, 0.5_f64, 1000_i64, 2, (None, true), true).unwrap(); let key = "key"; let _ = bf.add_item(key.as_bytes(), true).unwrap(); - let encoder_result = bf.encode_bloom_filter(); + let encoder_result = bf.encode_object(); assert!(encoder_result.is_ok()); // 1. unsupport version should return error @@ -1011,7 +1018,7 @@ mod tests { // assert decode: // should return error assert_eq!( - BloomFilterType::decode_bloom_filter(&vec, true).err(), + BloomObject::decode_object(&vec, true).err(), Some(BloomError::DecodeUnsupportedVersion) ); } @@ -1020,12 +1027,11 @@ mod tests { fn test_bf_decode_when_bytes_is_empty_should_failed() { // arrange: prepare bloom filter let mut bf = - BloomFilterType::new_reserved(0.5_f64, 0.5_f64, 1000_i64, 2, (None, true), true) - .unwrap(); + BloomObject::new_reserved(0.5_f64, 0.5_f64, 1000_i64, 2, (None, true), true).unwrap(); let key = "key"; let _ = bf.add_item(key.as_bytes(), true); - let encoder_result = bf.encode_bloom_filter(); + let encoder_result = bf.encode_object(); assert!(encoder_result.is_ok()); // 1. empty vec should return error @@ -1033,7 +1039,7 @@ mod tests { // assert decode: // should return error assert_eq!( - BloomFilterType::decode_bloom_filter(&vec, true).err(), + BloomObject::decode_object(&vec, true).err(), Some(BloomError::DecodeBloomFilterFailed) ); } @@ -1042,30 +1048,29 @@ mod tests { fn test_bf_decode_when_bytes_is_exceed_limit_should_failed() { // arrange: prepare bloom filter let mut bf = - BloomFilterType::new_reserved(0.5_f64, 0.5_f64, 1000_i64, 2, (None, true), true) - .unwrap(); + BloomObject::new_reserved(0.5_f64, 0.5_f64, 1000_i64, 2, (None, true), true).unwrap(); let key = "key"; let _ = bf.add_item(key.as_bytes(), true); let origin_fp_rate = bf.fp_rate; // unsupport fp_rate bf.fp_rate = -0.5; - let vec = bf.encode_bloom_filter().unwrap(); + let vec = bf.encode_object().unwrap(); // should return error assert_eq!( - BloomFilterType::decode_bloom_filter(&vec, true).err(), + BloomObject::decode_object(&vec, true).err(), Some(BloomError::ErrorRateRange) ); bf.fp_rate = origin_fp_rate; // build a larger than 64mb filter let extra_large_filter = - BloomFilterType::new_reserved(0.01_f64, 0.5_f64, 57000000, 2, (None, true), false) + BloomObject::new_reserved(0.01_f64, 0.5_f64, 114000000, 2, (None, true), false) .unwrap(); - let vec = extra_large_filter.encode_bloom_filter().unwrap(); + let vec = extra_large_filter.encode_object().unwrap(); // should return error assert_eq!( - BloomFilterType::decode_bloom_filter(&vec, true).err(), + BloomObject::decode_object(&vec, true).err(), Some(BloomError::ExceedsMaxBloomSize) ); } diff --git a/src/configs.rs b/src/configs.rs index 6936fff..2e2e233 100644 --- a/src/configs.rs +++ b/src/configs.rs @@ -29,18 +29,19 @@ pub const BLOOM_TIGHTENING_RATIO_MAX: f64 = 1.0; pub const BLOOM_USE_RANDOM_SEED_DEFAULT: bool = true; pub const BLOOM_DEFRAG_DEAFULT: bool = true; -// Max Memory usage allowed per bloom filter within a bloom object (64MB). -// Beyond this threshold, a bloom object is classified as large and is exempt from defrag operations. -// Also, write operations that result in bloom object allocation larger than this size will be rejected. -pub const BLOOM_MEMORY_LIMIT_PER_FILTER_DEFAULT: i64 = 64 * 1024 * 1024; -pub const BLOOM_MEMORY_LIMIT_PER_FILTER_MIN: i64 = 0; -pub const BLOOM_MEMORY_LIMIT_PER_FILTER_MAX: i64 = i64::MAX; + +// Max Memory usage allowed overall within a bloom object (128MB). +// Beyond this threshold, a bloom object is classified as large. +// Write operations that result in bloom object allocation larger than this size will be rejected. +pub const BLOOM_MEMORY_LIMIT_PER_OBJECT_DEFAULT: i64 = 128 * 1024 * 1024; +pub const BLOOM_MEMORY_LIMIT_PER_OBJECT_MIN: i64 = 0; +pub const BLOOM_MEMORY_LIMIT_PER_OBJECT_MAX: i64 = i64::MAX; lazy_static! { pub static ref BLOOM_CAPACITY: AtomicI64 = AtomicI64::new(BLOOM_CAPACITY_DEFAULT); pub static ref BLOOM_EXPANSION: AtomicI64 = AtomicI64::new(BLOOM_EXPANSION_DEFAULT); - pub static ref BLOOM_MEMORY_LIMIT_PER_FILTER: AtomicI64 = - AtomicI64::new(BLOOM_MEMORY_LIMIT_PER_FILTER_DEFAULT); + pub static ref BLOOM_MEMORY_LIMIT_PER_OBJECT: AtomicI64 = + AtomicI64::new(BLOOM_MEMORY_LIMIT_PER_OBJECT_DEFAULT); pub static ref BLOOM_USE_RANDOM_SEED: AtomicBool = AtomicBool::default(); pub static ref BLOOM_DEFRAG: AtomicBool = AtomicBool::new(BLOOM_DEFRAG_DEAFULT); pub static ref BLOOM_FP_RATE_F64: Mutex = Mutex::new( @@ -61,7 +62,7 @@ lazy_static! { /// Constants // Max number of filters allowed within a bloom object. -pub const MAX_FILTERS_PER_OBJ: i32 = i32::MAX; +pub const BLOOM_NUM_FILTERS_PER_OBJECT_LIMIT_MAX: i32 = i32::MAX; /// Below constants are fixed seed and sip keys to help create bloom objects using the same seed and to restore the bloom objects with the same hasher which /// generated using rust crate bloomfilter https://crates.io/crates/bloomfilter pub const FIXED_SEED: [u8; 32] = [ @@ -96,7 +97,7 @@ pub fn on_string_config_set( } "bloom-tightening-ratio" => { if !(BLOOM_TIGHTENING_RATIO_MIN..BLOOM_TIGHTENING_RATIO_MAX).contains(&value) { - return Err(ValkeyError::Str(utils::ERROR_RATIO_RANGE)); + return Err(ValkeyError::Str(utils::TIGHTENING_RATIO_RANGE)); } let mut tightening = BLOOM_TIGHTENING_F64 .lock() diff --git a/src/lib.rs b/src/lib.rs index cf5eb64..037a265 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -8,7 +8,7 @@ pub mod configs; pub mod metrics; pub mod wrapper; use crate::bloom::command_handler; -use crate::bloom::data_type::BLOOM_FILTER_TYPE; +use crate::bloom::data_type::BLOOM_TYPE; use valkey_module_macros::info_command_handler; pub const MODULE_NAME: &str = "bf"; @@ -83,7 +83,7 @@ valkey_module! { version: 1, allocator: (valkey_module::alloc::ValkeyAlloc, valkey_module::alloc::ValkeyAlloc), data_types: [ - BLOOM_FILTER_TYPE, + BLOOM_TYPE, ], init: initialize, deinit: deinitialize, @@ -105,7 +105,7 @@ valkey_module! { i64: [ ["bloom-capacity", &*configs::BLOOM_CAPACITY, configs::BLOOM_CAPACITY_DEFAULT, configs::BLOOM_CAPACITY_MIN, configs::BLOOM_CAPACITY_MAX, ConfigurationFlags::DEFAULT, None], ["bloom-expansion", &*configs::BLOOM_EXPANSION, configs::BLOOM_EXPANSION_DEFAULT, configs::BLOOM_EXPANSION_MIN as i64, configs::BLOOM_EXPANSION_MAX as i64, ConfigurationFlags::DEFAULT, None], - ["bloom-memory-limit-per-filter", &*configs::BLOOM_MEMORY_LIMIT_PER_FILTER, configs::BLOOM_MEMORY_LIMIT_PER_FILTER_DEFAULT, configs::BLOOM_MEMORY_LIMIT_PER_FILTER_MIN, configs::BLOOM_MEMORY_LIMIT_PER_FILTER_MAX, ConfigurationFlags::DEFAULT, None], + ["bloom-memory-usage-limit", &*configs::BLOOM_MEMORY_LIMIT_PER_OBJECT, configs::BLOOM_MEMORY_LIMIT_PER_OBJECT_DEFAULT, configs::BLOOM_MEMORY_LIMIT_PER_OBJECT_MIN, configs::BLOOM_MEMORY_LIMIT_PER_OBJECT_MAX, ConfigurationFlags::DEFAULT, None], ], string: [ ["bloom-fp-rate", &*configs::BLOOM_FP_RATE, configs::BLOOM_FP_RATE_DEFAULT, ConfigurationFlags::DEFAULT, None, Some(Box::new(configs::on_string_config_set))], diff --git a/src/wrapper/bloom_callback.rs b/src/wrapper/bloom_callback.rs index 4f716bb..b21ab4c 100644 --- a/src/wrapper/bloom_callback.rs +++ b/src/wrapper/bloom_callback.rs @@ -1,7 +1,7 @@ use crate::bloom; use crate::bloom::data_type::ValkeyDataType; use crate::bloom::utils::BloomFilter; -use crate::bloom::utils::BloomFilterType; +use crate::bloom::utils::BloomObject; use crate::configs; use crate::metrics; use crate::wrapper::digest::Digest; @@ -26,7 +26,7 @@ use super::defrag::Defrag; /// # Safety pub unsafe extern "C" fn bloom_rdb_save(rdb: *mut raw::RedisModuleIO, value: *mut c_void) { - let v = &*value.cast::(); + let v = &*value.cast::(); raw::save_unsigned(rdb, v.num_filters() as u64); raw::save_unsigned(rdb, v.expansion() as u64); raw::save_double(rdb, v.fp_rate()); @@ -55,7 +55,7 @@ pub unsafe extern "C" fn bloom_rdb_load( rdb: *mut raw::RedisModuleIO, encver: c_int, ) -> *mut c_void { - if let Some(item) = ::load_from_rdb(rdb, encver) { + if let Some(item) = ::load_from_rdb(rdb, encver) { let bb = Box::new(item); Box::into_raw(bb).cast::() } else { @@ -70,15 +70,11 @@ pub unsafe extern "C" fn bloom_aof_rewrite( key: *mut raw::RedisModuleString, value: *mut c_void, ) { - let filter = &*value.cast::(); - let hex = match filter.encode_bloom_filter() { + let filter = &*value.cast::(); + let hex = match filter.encode_object() { Ok(val) => val, Err(err) => { - log_io_error( - aof, - ValkeyLogLevel::Warning, - &format!("encode bloom filter failed. {}", err.as_str()), - ); + log_io_error(aof, ValkeyLogLevel::Warning, err.as_str()); return; } }; @@ -107,13 +103,13 @@ pub unsafe extern "C" fn bloom_aux_load( /// # Safety /// Free a bloom object pub unsafe extern "C" fn bloom_free(value: *mut c_void) { - drop(Box::from_raw(value.cast::())); + drop(Box::from_raw(value.cast::())); } /// # Safety /// Compute the memory usage for a bloom object. pub unsafe extern "C" fn bloom_mem_usage(value: *const c_void) -> usize { - let item = &*value.cast::(); + let item = &*value.cast::(); item.memory_usage() } @@ -124,8 +120,8 @@ pub unsafe extern "C" fn bloom_copy( _to_key: *mut RedisModuleString, value: *const c_void, ) -> *mut c_void { - let curr_item = &*value.cast::(); - let new_item = BloomFilterType::create_copy_from(curr_item); + let curr_item = &*value.cast::(); + let new_item = BloomObject::create_copy_from(curr_item); let bb = Box::new(new_item); Box::into_raw(bb).cast::() } @@ -134,7 +130,7 @@ pub unsafe extern "C" fn bloom_copy( /// Raw handler for the Bloom digest callback. pub unsafe extern "C" fn bloom_digest(md: *mut raw::RedisModuleDigest, value: *mut c_void) { let dig = Digest::new(md); - let val = &*(value.cast::()); + let val = &*(value.cast::()); val.debug_digest(dig); } @@ -144,17 +140,18 @@ pub unsafe extern "C" fn bloom_free_effort( _from_key: *mut RedisModuleString, value: *const c_void, ) -> usize { - let curr_item = &*value.cast::(); + let curr_item = &*value.cast::(); curr_item.free_effort() } -// Lazy static for a default temporary bloom that gets swapped during defrag. +// Lazy static for a default temporary external crate Bloom structure that gets swapped during defrag. lazy_static! { static ref DEFRAG_BLOOM_FILTER: Mutex>>> = Mutex::new(Some(Box::new(Bloom::<[u8]>::new(1, 1).unwrap()))); } -/// Defragments a vector of bytes. This function is designed to be used as a callback. +/// Defragments a vector of bytes (bit vector) of the external crate Bloom structure. This function is designed to be +/// used as a callback. /// /// This function takes ownership of a `Vec`, attempts to defragment it using an external /// defragmentation mechanism, and returns a new `Vec` that may have been defragmented. @@ -186,28 +183,29 @@ fn external_vec_defrag(vec: Vec) -> Vec { /// # Safety /// Raw handler for the Bloom object's defrag callback. /// -/// There are a few different structures we will be defragging we will explain them top down then afterwards state the order in which -/// we will defrag. Starting from the top which is passed in as the variable named value. We have the BloomFilterType this BloomFilterType -/// contains a vec of BloomFilters. These BloomFilters then each have a Bloom object. Finally each of these Bloom objects have a Vec. +/// We will be defragging every allocation of the Bloom data type. We will explain them top down, then afterwards state the order in which +/// we will defrag. Starting from the top, which is passed in as the variable named `value`, we have the BloomObject. This BloomObject +/// contains a vec of BloomFilter structs. Each BloomFilter contains a Bloom structure implemented in an external Rust crate. +/// Finally, each of these external Bloom structures contains a Vec (bit vector). /// -/// This order of defragmention is as follows (1 to 3 is in a loop for the number of filters): -/// 1. BloomFilter within the BloomFilterType -/// 2. Bloom objects within each BloomFilter -/// 3. Vec within each Bloom object -/// 4. Vec of BloomFilters in the BloomFilterType -/// 5. The BloomFilterType itself +/// The order of defragmention is as follows (1 to 3 is in a loop for the number of filters): +/// 1. BloomFilter structures within the top level BloomObject structure +/// 2. External Bloom structures within each BloomFilter +/// 3. Vec (Bit vector) within each external Bloom structure +/// 4. Vec of the BloomFilter/s in the BloomObject +/// 5. The BloomObject itself /// -/// We use a cursor to track the current filter of BloomFilterType that we are defragging. This cursor will start at 0 +/// We use a cursor to track the current filter of BloomObject that we are defragging. This cursor will start at 0 /// if we finished all the filters the last time we defragged this object or if we havent defragged it before. We will determine -/// that we have spent to much time on defragging this specific object from the should_stop_defrag() method. If we didn't defrag -/// all the filters then we set the cursor so we know where to start from the next time we defrag and return a 1 to show we didn't +/// that we have spent too much time on defragging this specific object from the should_stop_defrag() method. If we didn't defrag +/// all the filters, then we set the cursor so we know where to start from the next time we defrag and return 1 to show we didn't /// finish. /// /// # Arguments /// /// * `defrag_ctx` - A raw pointer to the defragmentation context. /// * `_from_key` - A raw pointer to the Redis module string (unused in this function). -/// * `value` - A mutable raw pointer to a raw pointer representing the BloomFilterType to be defragmented. +/// * `value` - A mutable raw pointer to a raw pointer representing the BloomObject to be defragmented. /// /// # Returns /// @@ -224,20 +222,20 @@ pub unsafe extern "C" fn bloom_defrag( return 0; } - // Get the cursor for the BloomFilterType otherwise start the cursor at 0 + // Get the cursor for the BloomObject otherwise start the cursor at 0 let defrag = Defrag::new(defrag_ctx); let mut cursor = defrag.get_cursor().unwrap_or(0); - // Convert pointer to BloomFilterType so we can operate on it. - let bloom_filter_type: &mut BloomFilterType = &mut *(*value).cast::(); + // Convert pointer to BloomObject so we can operate on it. + let bloom_object: &mut BloomObject = &mut *(*value).cast::(); - let num_filters = bloom_filter_type.num_filters(); - let filters_capacity = bloom_filter_type.filters().capacity(); + let num_filters = bloom_object.num_filters(); + let filters_capacity = bloom_object.filters().capacity(); // While we are within a timeframe decided from should_stop_defrag and not over the number of filters defrag the next filter while !defrag.should_stop_defrag() && cursor < num_filters as u64 { - // Remove the current filter, unbox it, and attempt to defragment. - let bloom_filter_box = bloom_filter_type.filters_mut().remove(cursor as usize); + // Remove the current BloomFilter, unbox it, and attempt to defragment the BloomFilter. + let bloom_filter_box = bloom_object.filters_mut().remove(cursor as usize); let bloom_filter = Box::into_raw(bloom_filter_box); let defrag_result = defrag.alloc(bloom_filter as *mut c_void); let mut defragged_filter = { @@ -249,7 +247,7 @@ pub unsafe extern "C" fn bloom_defrag( Box::from_raw(bloom_filter) } }; - // Swap the Bloom object with a temporary one for defragmentation + // Swap the external crate Bloom structure with a temporary one during its defragmentation. let mut temporary_bloom = DEFRAG_BLOOM_FILTER .lock() .expect("We expect default to exist"); @@ -260,7 +258,7 @@ pub unsafe extern "C" fn bloom_defrag( // Convert the inner_bloom into the correct type and then try to defragment it let inner_bloom_ptr = Box::into_raw(inner_bloom); let defragged_inner_bloom = defrag.alloc(inner_bloom_ptr as *mut c_void); - // Defragment the Vec within the Bloom object using the external callback + // Defragment the Bit Vec within the external crate Bloom structure using the external callback if !defragged_inner_bloom.is_null() { metrics::BLOOM_DEFRAG_HITS.fetch_add(1, std::sync::atomic::Ordering::Relaxed); @@ -283,7 +281,7 @@ pub unsafe extern "C" fn bloom_defrag( } // Reinsert the defragmented filter and increment the cursor - bloom_filter_type + bloom_object .filters_mut() .insert(cursor as usize, defragged_filter); cursor += 1; @@ -294,13 +292,13 @@ pub unsafe extern "C" fn bloom_defrag( if cursor < num_filters as u64 { return 1; } - // Defragment the Vec of filters itself - let filters_vec = mem::take(bloom_filter_type.filters_mut()); + // Defragment the Vec of BloomFilter/s itself + let filters_vec = mem::take(bloom_object.filters_mut()); let filters_ptr = Box::into_raw(filters_vec.into_boxed_slice()) as *mut c_void; let defragged_filters_ptr = defrag.alloc(filters_ptr); if !defragged_filters_ptr.is_null() { metrics::BLOOM_DEFRAG_HITS.fetch_add(1, std::sync::atomic::Ordering::Relaxed); - *bloom_filter_type.filters_mut() = unsafe { + *bloom_object.filters_mut() = unsafe { Vec::from_raw_parts( defragged_filters_ptr as *mut Box, num_filters, @@ -309,7 +307,7 @@ pub unsafe extern "C" fn bloom_defrag( }; } else { metrics::BLOOM_DEFRAG_HITS.fetch_add(1, std::sync::atomic::Ordering::Relaxed); - *bloom_filter_type.filters_mut() = unsafe { + *bloom_object.filters_mut() = unsafe { Vec::from_raw_parts( filters_ptr as *mut Box, num_filters, @@ -317,7 +315,7 @@ pub unsafe extern "C" fn bloom_defrag( ) }; } - // Finally, attempt to defragment the BloomFilterType itself + // Finally, attempt to defragment the BloomObject itself let val = defrag.alloc(*value); if !val.is_null() { metrics::BLOOM_DEFRAG_HITS.fetch_add(1, std::sync::atomic::Ordering::Relaxed); diff --git a/tests/test_basic.py b/tests/test_basic.py index bae86b4..9fd2975 100644 --- a/tests/test_basic.py +++ b/tests/test_basic.py @@ -63,8 +63,8 @@ def test_memory_usage_cmd(self): def test_too_large_bloom_obj(self): client = self.server.get_new_client() # Set the max allowed size per bloom filter per bloom object - assert client.execute_command('CONFIG SET bf.bloom-memory-limit-per-filter 100') == b'OK' - obj_exceeds_size_err = "operation results in filter allocation exceeding size limit" + assert client.execute_command('CONFIG SET bf.bloom-memory-usage-limit 100') == b'OK' + obj_exceeds_size_err = "operation exceeds bloom object memory limit" # Non Scaling # Validate that when a cmd would have resulted in a bloom object creation with the starting filter with size # greater than allowed limit, the cmd is rejected. @@ -79,7 +79,7 @@ def test_too_large_bloom_obj(self): # Scaling # Validate that when scaling would have resulted in a filter with size greater than allowed limit, the cmd # is rejected. - assert client.execute_command('CONFIG SET bf.bloom-memory-limit-per-filter 1000') == b'OK' + assert client.execute_command('CONFIG SET bf.bloom-memory-usage-limit 1000') == b'OK' cmds = [ 'BF.INSERT filter items new_item1', 'BF.ADD filter new_item1', @@ -350,8 +350,10 @@ def test_bloom_string_config_set(self): assert self.client.execute_command('CONFIG GET bf.bloom-fp-rate')[1] == b'0.1' assert self.client.execute_command('CONFIG GET bf.bloom-tightening-ratio')[1] == b'0.75' try: - assert self.client.execute_command('CONFIG SET bf.bloom-fp-rate 1.1') == b'ERR (0 < error rate range < 1)'\ - and self.client.execute_command('CONFIG SET bf.bloom-tightening-ratio 1.75') == b'ERR (0 < error ratio range < 1)' + assert self.client.execute_command('CONFIG SET bf.bloom-fp-rate 1.1') == b'ERR (0 < error rate range < 1)' except ResponseError as e: - assert str(e) == f"CONFIG SET failed (possibly related to argument 'bf.bloom-fp-rate') - ERR (0 < error rate range < 1)"\ - and f"CONFIG SET failed (possibly related to argument 'bf.bloom-tightening-ratio') - ERR (0 < error rate range < 1)" + assert str(e) == f"CONFIG SET failed (possibly related to argument 'bf.bloom-fp-rate') - ERR (0 < error rate range < 1)" + try: + assert self.client.execute_command('CONFIG SET bf.bloom-tightening-ratio 1.75') == b'ERR (0 < tightening ratio range < 1)' + except ResponseError as e: + assert str(e) == f"CONFIG SET failed (possibly related to argument 'bf.bloom-tightening-ratio') - ERR (0 < tightening ratio range < 1)" diff --git a/tests/test_save_and_restore.py b/tests/test_save_and_restore.py index 1948cdf..b13e6c4 100644 --- a/tests/test_save_and_restore.py +++ b/tests/test_save_and_restore.py @@ -75,10 +75,10 @@ def test_restore_failed_large_bloom_filter(self): client = self.server.get_new_client() # Increase the max allowed size of a bloom filter per bloom object to 180MB. # Create a large bloom filter. - # When we try to restore this on a server with the default max allowed filter size of 64MB, start up should fail. + # When we try to restore this on a server with the default max allowed filter size of 128MB, start up should fail. updated_max_size = 180 * 1024 * 1024 original_max_size = 64 * 1024 * 1024 - bf_add_result_1 = client.execute_command('CONFIG SET bf.bloom-memory-limit-per-filter ' + str(updated_max_size)) + bf_add_result_1 = client.execute_command('CONFIG SET bf.bloom-memory-usage-limit ' + str(updated_max_size)) client.execute_command('BF.RESERVE testSave 0.001 100000000') assert int(client.execute_command('BF.INFO testSave size')) > original_max_size bf_add_result_1 = client.execute_command('BF.ADD testSave item') @@ -93,7 +93,7 @@ def test_restore_failed_large_bloom_filter(self): self.server.wait_for_save_done() self.server.restart(remove_rdb=False, remove_nodes_conf=False, connect_client=False) logfile = os.path.join(self.testdir, self.server.args["logfile"]) - large_obj_restore_err = "Failed to restore bloom object: Contains a filter larger than the max allowed size limit." + large_obj_restore_err = "Failed to restore bloom object: Object larger than the allowed memory limit" internal_rdb_err = "Internal error in RDB" self.wait_for_logfile(logfile, large_obj_restore_err) self.wait_for_logfile(logfile, internal_rdb_err)