diff --git a/src/bloom/command_handler.rs b/src/bloom/command_handler.rs index d32c3fb..80bd831 100644 --- a/src/bloom/command_handler.rs +++ b/src/bloom/command_handler.rs @@ -110,10 +110,12 @@ pub fn bloom_filter_add_value( None => { // Instantiate empty bloom filter. let fp_rate = configs::BLOOM_FP_RATE_DEFAULT; + let tightening_ratio = configs::TIGHTENING_RATIO; let capacity = configs::BLOOM_CAPACITY.load(Ordering::Relaxed) as u32; let expansion = configs::BLOOM_EXPANSION.load(Ordering::Relaxed) as u32; let mut bloom = match BloomFilterType::new_reserved( fp_rate, + tightening_ratio, capacity, expansion, validate_size_limit, @@ -227,6 +229,7 @@ pub fn bloom_filter_reserve(ctx: &Context, input_args: &[ValkeyString]) -> Valke return Err(ValkeyError::Str(utils::BAD_ERROR_RATE)); } }; + curr_cmd_idx += 1; // Parse the capacity let capacity = match input_args[curr_cmd_idx].to_string_lossy().parse::() { Ok(num) if (BLOOM_CAPACITY_MIN..=BLOOM_CAPACITY_MAX).contains(&num) => num, @@ -275,8 +278,10 @@ pub fn bloom_filter_reserve(ctx: &Context, input_args: &[ValkeyString]) -> Valke None => { // Skip bloom filter size validation on replicated cmds. let validate_size_limit = !ctx.get_flags().contains(ContextFlags::REPLICATED); + let tightening_ratio = configs::TIGHTENING_RATIO; let bloom = match BloomFilterType::new_reserved( fp_rate, + tightening_ratio, capacity, expansion, validate_size_limit, @@ -297,8 +302,9 @@ pub fn bloom_filter_reserve(ctx: &Context, input_args: &[ValkeyString]) -> Valke pub fn bloom_filter_insert(ctx: &Context, input_args: &[ValkeyString]) -> ValkeyResult { let argc = input_args.len(); + let validate_size_limit = ctx.get_flags().contains(ContextFlags::REPLICATED); // At the very least, we need: BF.INSERT ITEMS - if argc < 5 { + if !validate_size_limit && argc < 4 || validate_size_limit && argc < 5 { return Err(ValkeyError::WrongArity); } let mut idx = 1; @@ -328,7 +334,7 @@ pub fn bloom_filter_insert(ctx: &Context, input_args: &[ValkeyString]) -> Valkey }; } "TIGHTENING" => { - if !ctx.get_flags().contains(ContextFlags::REPLICATED) { + if !validate_size_limit { return Err(ValkeyError::Str(utils::ERROR)); } idx += 1; @@ -420,6 +426,7 @@ pub fn bloom_filter_insert(ctx: &Context, input_args: &[ValkeyString]) -> Valkey } let mut bloom = match BloomFilterType::new_reserved( fp_rate, + tightening_ratio, capacity, expansion, validate_size_limit, diff --git a/src/bloom/data_type.rs b/src/bloom/data_type.rs index 129f804..b187500 100644 --- a/src/bloom/data_type.rs +++ b/src/bloom/data_type.rs @@ -71,6 +71,9 @@ impl ValkeyDataType for BloomFilterType { let Ok(fp_rate) = raw::load_double(rdb) else { return None; }; + let Ok(tightening_ratio) = raw::load_double(rdb) else { + return None; + }; let mut filters: Vec = Vec::with_capacity(num_filters as usize); let Ok(tightening_ratio) = raw::load_double(rdb) else { diff --git a/src/bloom/utils.rs b/src/bloom/utils.rs index 51514e6..ccae178 100644 --- a/src/bloom/utils.rs +++ b/src/bloom/utils.rs @@ -81,6 +81,7 @@ impl BloomFilterType { /// Create a new BloomFilterType object. pub fn new_reserved( fp_rate: f64, + tightening_ratio: f64, capacity: u32, expansion: u32, validate_size_limit: bool, @@ -535,6 +536,14 @@ mod tests { fp_margin: f64, rand_prefix: &String, ) { + assert_eq!( + restored_bloom_filter_type.fp_rate, + original_bloom_filter_type.fp_rate + ); + assert_eq!( + restored_bloom_filter_type.tightening_ratio, + original_bloom_filter_type.tightening_ratio + ); assert_eq!( restored_bloom_filter_type.capacity(), original_bloom_filter_type.capacity() @@ -599,13 +608,19 @@ mod tests { let rand_prefix = random_prefix(7); // 1 in every 1000 operations is expected to be a false positive. let expected_fp_rate: f64 = 0.001; + let expected_tightening_ratio: f64 = 0.5; let initial_capacity = 10000; // Expansion of 0 indicates non scaling. let expansion = 0; // Validate the non scaling behavior of the bloom filter. - let mut bf = - BloomFilterType::new_reserved(expected_fp_rate, initial_capacity, expansion, true) - .expect("Expect bloom creation to succeed"); + let mut bf = BloomFilterType::new_reserved( + expected_fp_rate, + expected_tightening_ratio, + initial_capacity, + expansion, + true, + ) + .expect("Expect bloom creation to succeed"); let (error_count, add_operation_idx) = add_items_till_capacity(&mut bf, initial_capacity as i64, 1, &rand_prefix); assert_eq!( @@ -657,12 +672,18 @@ mod tests { let rand_prefix = random_prefix(7); // 1 in every 1000 operations is expected to be a false positive. let expected_fp_rate: f64 = 0.001; + let expected_tightening_ratio: f64 = 0.5; let initial_capacity = 10000; let expansion = 2; let num_filters_to_scale = 5; - let mut bf = - BloomFilterType::new_reserved(expected_fp_rate, initial_capacity, expansion, true) - .expect("Expect bloom creation to succeed"); + let mut bf = BloomFilterType::new_reserved( + expected_fp_rate, + expected_tightening_ratio, + initial_capacity, + expansion, + true, + ) + .expect("Expect bloom creation to succeed"); assert_eq!(bf.capacity(), initial_capacity as i64); assert_eq!(bf.cardinality(), 0); let mut total_error_count = 0; @@ -732,18 +753,18 @@ mod tests { fn test_exceeded_size_limit() { // Validate that bloom filter allocations within bloom objects are rejected if their memory usage would be beyond // the configured limit. - let result = BloomFilterType::new_reserved(0.5_f64, u32::MAX, 1, true); + let result = BloomFilterType::new_reserved(0.5_f64, 0.5_f64, u32::MAX, 1, true); assert_eq!(result.err(), Some(BloomError::ExceedsMaxBloomSize)); let capacity = 50000000; assert!(!BloomFilter::validate_size(capacity, 0.001_f64)); - let result2 = BloomFilterType::new_reserved(0.001_f64, capacity, 1, true); + let result2 = BloomFilterType::new_reserved(0.001_f64, 0.5_f64, capacity, 1, true); assert_eq!(result2.err(), Some(BloomError::ExceedsMaxBloomSize)); } #[test] fn test_bf_encode_and_decode() { // arrange: prepare bloom filter - let mut bf = BloomFilterType::new_reserved(0.5_f64, 1000_u32, 2, true).unwrap(); + let mut bf = BloomFilterType::new_reserved(0.5_f64, 0.5_f64, 1000_u32, 2, true).unwrap(); let key = "key"; let _ = bf.add_item(key.as_bytes(), true); @@ -773,7 +794,7 @@ mod tests { #[test] fn test_bf_decode_when_unsupported_version_should_failed() { // arrange: prepare bloom filter - let mut bf = BloomFilterType::new_reserved(0.5_f64, 1000_u32, 2, true).unwrap(); + let mut bf = BloomFilterType::new_reserved(0.5_f64, 0.5_f64, 1000_u32, 2, true).unwrap(); let key = "key"; let _ = bf.add_item(key.as_bytes(), true).unwrap(); @@ -795,7 +816,7 @@ mod tests { #[test] fn test_bf_decode_when_bytes_is_empty_should_failed() { // arrange: prepare bloom filter - let mut bf = BloomFilterType::new_reserved(0.5_f64, 1000_u32, 2, true).unwrap(); + let mut bf = BloomFilterType::new_reserved(0.5_f64, 0.5_f64, 1000_u32, 2, true).unwrap(); let key = "key"; let _ = bf.add_item(key.as_bytes(), true); @@ -815,7 +836,7 @@ mod tests { #[test] fn test_bf_decode_when_bytes_is_exceed_limit_should_failed() { // arrange: prepare bloom filter - let mut bf = BloomFilterType::new_reserved(0.5_f64, 1000_u32, 2, true).unwrap(); + let mut bf = BloomFilterType::new_reserved(0.5_f64, 0.5_f64, 1000_u32, 2, true).unwrap(); let key = "key"; let _ = bf.add_item(key.as_bytes(), true); let origin_expansion = bf.expansion; @@ -857,7 +878,7 @@ mod tests { // 3. build a larger than 64mb filter let extra_large_filter = - BloomFilterType::new_reserved(0.01_f64, 57000000, 2, false).unwrap(); + BloomFilterType::new_reserved(0.01_f64, 0.5_f64, 57000000, 2, false).unwrap(); let vec = extra_large_filter.encode_bloom_filter().unwrap(); // should return error assert_eq!( diff --git a/tests/test_bloom_metrics.py b/tests/test_bloom_metrics.py index 3df634c..29eb4e6 100644 --- a/tests/test_bloom_metrics.py +++ b/tests/test_bloom_metrics.py @@ -4,7 +4,7 @@ from valkeytests.conftest import resource_port_tracker from util.waiters import * -DEFAULT_BLOOM_FILTER_SIZE = 179952 +DEFAULT_BLOOM_FILTER_SIZE = 179960 DEFAULT_BLOOM_FILTER_CAPACITY = 100000 class TestBloomMetrics(ValkeyBloomTestCaseBase): diff --git a/tests/test_replication.py b/tests/test_replication.py index 36012f2..749a753 100644 --- a/tests/test_replication.py +++ b/tests/test_replication.py @@ -34,7 +34,7 @@ def test_replication_behavior(self): ('BF.ADD', 'BF.ADD key item', 'BF.ADD key item1', 2), ('BF.MADD', 'BF.MADD key item', 'BF.MADD key item1', 2), ('BF.RESERVE', 'BF.RESERVE key 0.001 100000', 'BF.ADD key item1', 1), - ('BF.INSERT', 'BF.INSERT key items item', 'BF.INSERT key items item1', 2), + ('BF.INSERT', 'BF.INSERT key items item tightening 0.5', 'BF.INSERT key items item1 tightening 0.5', 2), ] for test_case in bloom_write_cmds: prefix = test_case[0]