From 25fedaf7e56dae439a7ad0ae50e995685b752a96 Mon Sep 17 00:00:00 2001
From: Karthik Subbarao
Date: Tue, 26 Nov 2024 20:10:51 +0000
Subject: [PATCH] Support random seed per bloom object by default (configurable)

Signed-off-by: Karthik Subbarao
---
 src/bloom/command_handler.rs          |   6 +
 src/bloom/data_type.rs                |  14 +-
 src/bloom/utils.rs                    | 526 ++++++++++++++------------
 src/configs.rs                        |   8 +
 src/lib.rs                            |   1 +
 src/wrapper/bloom_callback.rs         |   8 +-
 tests/conftest.py                     |   5 +
 tests/test_aofrewrite.py              |  13 +-
 tests/test_basic.py                   |  10 +-
 tests/test_bloom_metrics.py           |  27 +-
 tests/test_correctness.py             |   2 +-
 tests/test_replication.py             |  34 +-
 tests/valkey_bloom_test_case.py       |  24 ++
 tests/valkeytests/valkey_test_case.py |   2 +-
 14 files changed, 394 insertions(+), 286 deletions(-)
 create mode 100644 tests/conftest.py

diff --git a/src/bloom/command_handler.rs b/src/bloom/command_handler.rs
index cd61763..2fe7464 100644
--- a/src/bloom/command_handler.rs
+++ b/src/bloom/command_handler.rs
@@ -112,10 +112,12 @@ pub fn bloom_filter_add_value(
         let fp_rate = configs::BLOOM_FP_RATE_DEFAULT;
         let capacity = configs::BLOOM_CAPACITY.load(Ordering::Relaxed) as u32;
         let expansion = configs::BLOOM_EXPANSION.load(Ordering::Relaxed) as u32;
+        let use_random_seed = configs::BLOOM_USE_RANDOM_SEED.load(Ordering::Relaxed);
         let mut bloom = match BloomFilterType::new_reserved(
             fp_rate,
             capacity,
             expansion,
+            use_random_seed,
             validate_size_limit,
         ) {
             Ok(bf) => bf,
@@ -274,12 +276,14 @@ pub fn bloom_filter_reserve(ctx: &Context, input_args: &[ValkeyString]) -> Valke
             match value {
                 Some(_) => Err(ValkeyError::Str(utils::ITEM_EXISTS)),
                 None => {
+                    let use_random_seed = configs::BLOOM_USE_RANDOM_SEED.load(Ordering::Relaxed);
                     // Skip bloom filter size validation on replicated cmds.
                     let validate_size_limit = !ctx.get_flags().contains(ContextFlags::REPLICATED);
                     let bloom = match BloomFilterType::new_reserved(
                         fp_rate,
                         capacity,
                         expansion,
+                        use_random_seed,
                         validate_size_limit,
                     ) {
                         Ok(bf) => bf,
@@ -403,10 +407,12 @@ pub fn bloom_filter_insert(ctx: &Context, input_args: &[ValkeyString]) -> Valkey
             if nocreate {
                 return Err(ValkeyError::Str(utils::NOT_FOUND));
             }
+            let use_random_seed = configs::BLOOM_USE_RANDOM_SEED.load(Ordering::Relaxed);
             let mut bloom = match BloomFilterType::new_reserved(
                 fp_rate,
                 capacity,
                 expansion,
+                use_random_seed,
                 validate_size_limit,
             ) {
                 Ok(bf) => bf,
diff --git a/src/bloom/data_type.rs b/src/bloom/data_type.rs
index c8af83a..5ae8aab 100644
--- a/src/bloom/data_type.rs
+++ b/src/bloom/data_type.rs
@@ -1,5 +1,6 @@
 use crate::bloom::utils::BloomFilter;
 use crate::bloom::utils::BloomFilterType;
+use crate::configs;
 use crate::metrics::BLOOM_NUM_OBJECTS;
 use crate::metrics::BLOOM_OBJECT_TOTAL_MEMORY_BYTES;
 use crate::wrapper::bloom_callback;
@@ -27,7 +28,6 @@ pub static BLOOM_FILTER_TYPE: ValkeyType = ValkeyType::new(
         digest: Some(bloom_callback::bloom_digest),
         mem_usage: Some(bloom_callback::bloom_mem_usage),
-        // TODO
         free: Some(bloom_callback::bloom_free),

         aux_load: Some(bloom_callback::bloom_aux_load),
@@ -72,7 +72,10 @@ impl ValkeyDataType for BloomFilterType {
             return None;
         };
         let mut filters: Vec<BloomFilter> = Vec::with_capacity(num_filters as usize);
-
+        let Ok(is_seed_random_u64) = raw::load_unsigned(rdb) else {
+            return None;
+        };
+        let is_seed_random = is_seed_random_u64 == 1;
         for i in 0..num_filters {
             let Ok(bitmap) = raw::load_string_buffer(rdb) else {
                 return None;
             };
@@ -90,7 +93,7 @@
                 }
             };
             if !BloomFilter::validate_size(capacity as u32, new_fp_rate) {
-                logging::log_warning("Failed to restore bloom object because it contains a filter larger than the max allowed size limit.");
+                logging::log_warning("Failed to restore bloom object: Contains a filter larger than the max allowed size limit.");
                 return None;
             }
             // Only load num_items when it's the last filter
@@ -104,6 +107,10 @@
             };
             let filter =
                 BloomFilter::from_existing(bitmap.as_ref(), num_items as u32, capacity as u32);
+            if !is_seed_random && filter.seed() != configs::FIXED_SEED {
+                logging::log_warning("Failed to restore bloom object: Object in fixed seed mode, but seed does not match fixed seed.");
+                return None;
+            }
             filters.push(filter);
         }
         BLOOM_OBJECT_TOTAL_MEMORY_BYTES.fetch_add(
@@ -114,6 +121,7 @@
         let item = BloomFilterType {
             expansion: expansion as u32,
             fp_rate,
+            is_seed_random,
             filters,
         };
         Some(item)
diff --git a/src/bloom/utils.rs b/src/bloom/utils.rs
index f066d05..71aaa05 100644
--- a/src/bloom/utils.rs
+++ b/src/bloom/utils.rs
@@ -1,14 +1,11 @@
+use super::data_type::BLOOM_TYPE_VERSION;
 use crate::{
-    configs::{
-        self, BLOOM_EXPANSION_MAX, BLOOM_EXPANSION_MIN, BLOOM_FP_RATE_MAX, BLOOM_FP_RATE_MIN,
-    },
+    configs::{self, BLOOM_EXPANSION_MAX, BLOOM_FP_RATE_MAX, BLOOM_FP_RATE_MIN},
     metrics,
 };
 use bloomfilter;
 use bloomfilter::{deserialize, serialize};
 use serde::{Deserialize, Serialize};
-
-use super::data_type::BLOOM_TYPE_VERSION;
 use std::{mem, sync::atomic::Ordering};

 /// KeySpace Notification Events
@@ -70,6 +67,7 @@
 pub struct BloomFilterType {
     pub expansion: u32,
     pub fp_rate: f64,
+    pub is_seed_random: bool,
     pub filters: Vec<BloomFilter>,
 }

@@ -79,6 +77,7 @@ impl BloomFilterType {
         fp_rate: f64,
         capacity: u32,
         expansion: u32,
+        use_random_seed: bool,
         validate_size_limit: bool,
     ) -> Result<BloomFilterType, BloomError> {
         // Reject the request, if the operation will result in creation of a bloom object containing a filter
@@ -91,14 +90,17 @@
             mem::size_of::<BloomFilterType>(),
             std::sync::atomic::Ordering::Relaxed,
         );
-        // Create the bloom filter and add to the main BloomFilter object.
-        let bloom = BloomFilter::new(fp_rate, capacity);
+        let bloom = match use_random_seed {
+            true => BloomFilter::with_random_seed(fp_rate, capacity),
+            false => BloomFilter::with_fixed_seed(fp_rate, capacity, &configs::FIXED_SEED),
+        };
         let filters = vec![bloom];
         let bloom = BloomFilterType {
             expansion,
             fp_rate,
             filters,
+            is_seed_random: use_random_seed,
         };
         Ok(bloom)
     }
@@ -118,6 +120,7 @@
         BloomFilterType {
             expansion: from_bf.expansion,
             fp_rate: from_bf.fp_rate,
+            is_seed_random: from_bf.is_seed_random,
             filters,
         }
     }
@@ -163,6 +166,15 @@
         capacity
     }

+    /// Return the seed used by the Bloom object. Every filter in the bloom object uses the same seed as the
+    /// first filter, regardless of whether the seed is fixed or randomly generated.
+    pub fn seed(&self) -> [u8; 32] {
+        self.filters
+            .first()
+            .expect("Every BloomObject is expected to have at least one filter")
+            .seed()
+    }
+
     /// Add an item to the BloomFilterType object.
     /// If scaling is enabled, this can result in a new sub filter creation.
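+    /// A minimal usage sketch (hypothetical values; `new_reserved` and `add_item` as defined
+    /// in this file). Sub filters created on scale out reuse the first filter's seed:
+    /// ```ignore
+    /// let mut bf = BloomFilterType::new_reserved(0.001, 100, 2, true, true)?;
+    /// assert_eq!(bf.add_item(b"item1", true), Ok(1)); // newly added items return 1
+    /// assert_eq!(bf.add_item(b"item1", true), Ok(0)); // duplicates return 0
+    /// ```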
    pub fn add_item(&mut self, item: &[u8], validate_size_limit: bool) -> Result<i64, BloomError> {
@@ -204,12 +216,12 @@
             if validate_size_limit && !BloomFilter::validate_size(new_capacity, new_fp_rate) {
                 return Err(BloomError::ExceedsMaxBloomSize);
             }
-            let mut new_filter = BloomFilter::new(new_fp_rate, new_capacity);
+            let seed = self.seed();
+            let mut new_filter = BloomFilter::with_fixed_seed(new_fp_rate, new_capacity, &seed);
             // Add item.
             new_filter.set(item);
             new_filter.num_items += 1;
             self.filters.push(new_filter);
-
             metrics::BLOOM_NUM_ITEMS_ACROSS_OBJECTS
                 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
             return Ok(1);
@@ -252,39 +264,46 @@
             1 => {
                 // always use new version to init bloomFilterType.
                 // This is to ensure that the new fields can be recognized when the object is serialized and deserialized in the future.
-                let (expansion, fp_rate, filters): (u32, f64, Vec<BloomFilter>) =
-                    match bincode::deserialize::<(u32, f64, Vec<BloomFilter>)>(&decoded_bytes[1..])
-                    {
-                        Ok(values) => {
-                            if !(BLOOM_EXPANSION_MIN..=BLOOM_EXPANSION_MAX).contains(&values.0) {
-                                return Err(BloomError::BadExpansion);
-                            }
-                            if !(values.1 > BLOOM_FP_RATE_MIN && values.1 < BLOOM_FP_RATE_MAX) {
-                                return Err(BloomError::ErrorRateRange);
-                            }
-                            if values.2.len() >= configs::MAX_FILTERS_PER_OBJ as usize {
-                                return Err(BloomError::MaxNumScalingFilters);
-                            }
-                            for _filter in values.2.iter() {
-                                // Reject the request, if the operation will result in creation of a filter of size greater than what is allowed.
-                                if validate_size_limit
-                                    && _filter.number_of_bytes()
-                                        > configs::BLOOM_MEMORY_LIMIT_PER_FILTER
-                                            .load(Ordering::Relaxed)
-                                            as usize
-                                {
-                                    return Err(BloomError::ExceedsMaxBloomSize);
-                                }
-                            }
-                            values
+                let (expansion, fp_rate, is_seed_random, filters): (
+                    u32,
+                    f64,
+                    bool,
+                    Vec<BloomFilter>,
+                ) = match bincode::deserialize::<(u32, f64, bool, Vec<BloomFilter>)>(
+                    &decoded_bytes[1..],
+                ) {
+                    Ok(values) => {
+                        // Expansion ratio can range from 0 to BLOOM_EXPANSION_MAX as we internally set this to 0
+                        // in case of non scaling filters.
+                        if !(0..=BLOOM_EXPANSION_MAX).contains(&values.0) {
+                            return Err(BloomError::BadExpansion);
+                        }
+                        if !(values.1 > BLOOM_FP_RATE_MIN && values.1 < BLOOM_FP_RATE_MAX) {
+                            return Err(BloomError::ErrorRateRange);
                         }
-                        Err(_) => {
-                            return Err(BloomError::DecodeBloomFilterFailed);
+                        if values.3.len() >= configs::MAX_FILTERS_PER_OBJ as usize {
+                            return Err(BloomError::MaxNumScalingFilters);
                         }
-                    };
+                        for _filter in values.3.iter() {
+                            // Reject the request, if the operation will result in creation of a filter of size greater than what is allowed.
+                            if validate_size_limit
+                                && _filter.number_of_bytes()
+                                    > configs::BLOOM_MEMORY_LIMIT_PER_FILTER.load(Ordering::Relaxed)
+                                        as usize
+                            {
+                                return Err(BloomError::ExceedsMaxBloomSize);
+                            }
+                        }
+                        values
+                    }
+                    Err(_) => {
+                        return Err(BloomError::DecodeBloomFilterFailed);
+                    }
+                };
                 let item = BloomFilterType {
                     expansion,
                     fp_rate,
+                    is_seed_random,
                     filters,
                 };
                 // add bloom filter type metrics.
@@ -308,10 +327,8 @@
                     metrics::BLOOM_CAPACITY_ACROSS_OBJECTS
                         .fetch_add(filter.capacity.into(), std::sync::atomic::Ordering::Relaxed);
                 }
-
                 Ok(item)
             }
-
             _ => Err(BloomError::DecodeUnsupportedVersion),
         }
     }
@@ -332,25 +349,30 @@ pub struct BloomFilter {
 }

 impl BloomFilter {
-    /// Instantiate empty BloomFilter object.
- pub fn new(fp_rate: f64, capacity: u32) -> BloomFilter { - let bloom = bloomfilter::Bloom::new_for_fp_rate_with_seed( - capacity as usize, - fp_rate, - &configs::FIXED_SEED, - ) - .expect("We expect bloomfilter::Bloom<[u8]> creation to succeed"); + /// Instantiate empty BloomFilter object with a fixed seed used to create sip keys. + pub fn with_fixed_seed(fp_rate: f64, capacity: u32, fixed_seed: &[u8; 32]) -> BloomFilter { + let bloom = + bloomfilter::Bloom::new_for_fp_rate_with_seed(capacity as usize, fp_rate, fixed_seed) + .expect("We expect bloomfilter::Bloom<[u8]> creation to succeed"); let fltr = BloomFilter { bloom, num_items: 0, capacity, }; - metrics::BLOOM_NUM_FILTERS_ACROSS_OBJECTS - .fetch_add(1, std::sync::atomic::Ordering::Relaxed); - metrics::BLOOM_OBJECT_TOTAL_MEMORY_BYTES - .fetch_add(fltr.number_of_bytes(), std::sync::atomic::Ordering::Relaxed); - metrics::BLOOM_CAPACITY_ACROSS_OBJECTS - .fetch_add(capacity.into(), std::sync::atomic::Ordering::Relaxed); + fltr.incr_metrics_on_new_create(); + fltr + } + + /// Instantiate empty BloomFilter object with a randomly generated seed used to create sip keys. + pub fn with_random_seed(fp_rate: f64, capacity: u32) -> BloomFilter { + let bloom = bloomfilter::Bloom::new_for_fp_rate(capacity as usize, fp_rate) + .expect("We expect bloomfilter::Bloom<[u8]> creation to succeed"); + let fltr = BloomFilter { + bloom, + num_items: 0, + capacity, + }; + fltr.incr_metrics_on_new_create(); fltr } @@ -364,15 +386,29 @@ impl BloomFilter { num_items, capacity, }; + fltr.incr_metrics_on_new_create(); + metrics::BLOOM_NUM_ITEMS_ACROSS_OBJECTS + .fetch_add(num_items.into(), std::sync::atomic::Ordering::Relaxed); + fltr + } + + /// Create a new BloomFilter from an existing BloomFilter object (COPY command). + pub fn create_copy_from(bf: &BloomFilter) -> BloomFilter { + BloomFilter::from_existing(&bf.bloom.to_bytes(), bf.num_items, bf.capacity) + } + + fn incr_metrics_on_new_create(&self) { metrics::BLOOM_NUM_FILTERS_ACROSS_OBJECTS .fetch_add(1, std::sync::atomic::Ordering::Relaxed); metrics::BLOOM_OBJECT_TOTAL_MEMORY_BYTES - .fetch_add(fltr.number_of_bytes(), std::sync::atomic::Ordering::Relaxed); - metrics::BLOOM_NUM_ITEMS_ACROSS_OBJECTS - .fetch_add(num_items.into(), std::sync::atomic::Ordering::Relaxed); + .fetch_add(self.number_of_bytes(), std::sync::atomic::Ordering::Relaxed); metrics::BLOOM_CAPACITY_ACROSS_OBJECTS - .fetch_add(capacity.into(), std::sync::atomic::Ordering::Relaxed); - fltr + .fetch_add(self.capacity.into(), std::sync::atomic::Ordering::Relaxed); + } + + /// Return the seed used by the sip hasher of the raw bloom. + pub fn seed(&self) -> [u8; 32] { + self.bloom.seed() } pub fn number_of_bytes(&self) -> usize { @@ -398,11 +434,6 @@ impl BloomFilter { pub fn set(&mut self, item: &[u8]) { self.bloom.set(item) } - - /// Create a new BloomFilter from an existing BloomFilter object (COPY command). - pub fn create_copy_from(bf: &BloomFilter) -> BloomFilter { - BloomFilter::from_existing(&bf.bloom.to_bytes(), bf.num_items, bf.capacity) - } } impl Drop for BloomFilterType { @@ -430,7 +461,7 @@ impl Drop for BloomFilter { #[cfg(test)] mod tests { use super::*; - use configs::FIXED_SEED; + use configs; use rand::{distributions::Alphanumeric, Rng}; /// Returns random string with specified number of characters. 
@@ -520,6 +551,33 @@ mod tests { fp_margin: f64, rand_prefix: &String, ) { + let is_seed_random = original_bloom_filter_type.is_seed_random; + assert_eq!( + restored_bloom_filter_type.is_seed_random, + original_bloom_filter_type.is_seed_random + ); + let original_filter_seed = original_bloom_filter_type.filters.first().unwrap().seed(); + assert_eq!(original_filter_seed, original_bloom_filter_type.seed(),); + if is_seed_random { + assert_ne!(original_filter_seed, configs::FIXED_SEED); + assert!(restored_bloom_filter_type + .filters + .iter() + .all(|restore_filter| original_bloom_filter_type + .filters + .iter() + .any(|filter| (filter.seed() == restore_filter.seed()) + && (restore_filter.seed() == original_filter_seed)))); + } else { + assert!(restored_bloom_filter_type + .filters + .iter() + .all(|restore_filter| original_bloom_filter_type + .filters + .iter() + .any(|filter| (filter.seed() == restore_filter.seed()) + && (restore_filter.seed() == configs::FIXED_SEED)))); + } assert_eq!( restored_bloom_filter_type.capacity(), original_bloom_filter_type.capacity() @@ -536,16 +594,6 @@ mod tests { restored_bloom_filter_type.memory_usage(), original_bloom_filter_type.memory_usage() ); - assert!(restored_bloom_filter_type - .filters - .iter() - .all(|restore_filter| original_bloom_filter_type - .filters - .iter() - .any( - |filter| (filter.bloom.seed() == restore_filter.bloom.seed()) - && (restore_filter.bloom.seed() == FIXED_SEED) - ))); assert!(restored_bloom_filter_type .filters .iter() @@ -581,183 +629,199 @@ mod tests { #[test] fn test_non_scaling_filter() { - let rand_prefix = random_prefix(7); - // 1 in every 1000 operations is expected to be a false positive. - let expected_fp_rate: f64 = 0.001; - let initial_capacity = 10000; - // Expansion of 0 indicates non scaling. - let expansion = 0; - // Validate the non scaling behavior of the bloom filter. - let mut bf = - BloomFilterType::new_reserved(expected_fp_rate, initial_capacity, expansion, true) - .expect("Expect bloom creation to succeed"); - let (error_count, add_operation_idx) = - add_items_till_capacity(&mut bf, initial_capacity as i64, 1, &rand_prefix); - assert_eq!( - bf.add_item(b"new_item", true), - Err(BloomError::NonScalingFilterFull) - ); - assert_eq!(bf.capacity(), initial_capacity as i64); - assert_eq!(bf.cardinality(), initial_capacity as i64); - let expected_free_effort = 1; - assert_eq!(bf.free_effort(), expected_free_effort); - assert!(bf.memory_usage() > 0); - // Use a margin on the expected_fp_rate when asserting for correctness. - let fp_margin = 0.002; - // Validate that item "add" operations on bloom filters are ensuring correctness. - fp_assert(error_count, add_operation_idx, expected_fp_rate, fp_margin); - // Validate item "exists" operations on bloom filters are ensuring correctness. - // This tests for items already added to the filter and expects them to exist. - let (error_count, _) = check_items_exist(&bf, 1, add_operation_idx, true, &rand_prefix); - assert!(error_count == 0); - // This tests for items which are not added to the filter and expects them to not exist. - let (error_count, num_operations) = check_items_exist( - &bf, - add_operation_idx + 1, - add_operation_idx * 2, - false, - &rand_prefix, - ); - // Validate that the real fp_rate is not much more than the configured fp_rate. 
- fp_assert(error_count, num_operations, expected_fp_rate, fp_margin); - - // Verify restore - let mut restore_bf = BloomFilterType::create_copy_from(&bf); - assert_eq!( - restore_bf.add_item(b"new_item", true), - Err(BloomError::NonScalingFilterFull) - ); - verify_restored_items( - &bf, - &restore_bf, - add_operation_idx, - expected_fp_rate, - fp_margin, - &rand_prefix, - ); + let is_seed_random_vec = [true, false]; + for is_seed_random in is_seed_random_vec { + let rand_prefix = random_prefix(7); + // 1 in every 1000 operations is expected to be a false positive. + let expected_fp_rate: f64 = 0.001; + let initial_capacity = 10000; + // Expansion of 0 indicates non scaling. + let expansion = 0; + // Validate the non scaling behavior of the bloom filter. + let mut bf = BloomFilterType::new_reserved( + expected_fp_rate, + initial_capacity, + expansion, + is_seed_random, + true, + ) + .expect("Expect bloom creation to succeed"); + let (error_count, add_operation_idx) = + add_items_till_capacity(&mut bf, initial_capacity as i64, 1, &rand_prefix); + assert_eq!( + bf.add_item(b"new_item", true), + Err(BloomError::NonScalingFilterFull) + ); + assert_eq!(bf.capacity(), initial_capacity as i64); + assert_eq!(bf.cardinality(), initial_capacity as i64); + let expected_free_effort = 1; + assert_eq!(bf.free_effort(), expected_free_effort); + assert!(bf.memory_usage() > 0); + // Use a margin on the expected_fp_rate when asserting for correctness. + let fp_margin = 0.002; + // Validate that item "add" operations on bloom filters are ensuring correctness. + fp_assert(error_count, add_operation_idx, expected_fp_rate, fp_margin); + // Validate item "exists" operations on bloom filters are ensuring correctness. + // This tests for items already added to the filter and expects them to exist. + let (error_count, _) = check_items_exist(&bf, 1, add_operation_idx, true, &rand_prefix); + assert!(error_count == 0); + // This tests for items which are not added to the filter and expects them to not exist. + let (error_count, num_operations) = check_items_exist( + &bf, + add_operation_idx + 1, + add_operation_idx * 2, + false, + &rand_prefix, + ); + // Validate that the real fp_rate is not much more than the configured fp_rate. + fp_assert(error_count, num_operations, expected_fp_rate, fp_margin); + // Verify restore + let mut restore_bf = BloomFilterType::create_copy_from(&bf); + assert_eq!( + restore_bf.add_item(b"new_item", true), + Err(BloomError::NonScalingFilterFull) + ); + verify_restored_items( + &bf, + &restore_bf, + add_operation_idx, + expected_fp_rate, + fp_margin, + &rand_prefix, + ); + } } #[test] fn test_scaling_filter() { - let rand_prefix = random_prefix(7); - // 1 in every 1000 operations is expected to be a false positive. - let expected_fp_rate: f64 = 0.001; - let initial_capacity = 10000; - let expansion = 2; - let num_filters_to_scale = 5; - let mut bf = - BloomFilterType::new_reserved(expected_fp_rate, initial_capacity, expansion, true) - .expect("Expect bloom creation to succeed"); - assert_eq!(bf.capacity(), initial_capacity as i64); - assert_eq!(bf.cardinality(), 0); - let mut total_error_count = 0; - let mut add_operation_idx = 0; - // Validate the scaling behavior of the bloom filter. 
- for filter_idx in 1..=num_filters_to_scale { - let expected_total_capacity = initial_capacity * (expansion.pow(filter_idx) - 1); - let (error_count, new_add_operation_idx) = add_items_till_capacity( - &mut bf, - expected_total_capacity as i64, + let is_seed_random_vec = [true, false]; + for is_seed_random in is_seed_random_vec { + let rand_prefix = random_prefix(7); + // 1 in every 1000 operations is expected to be a false positive. + let expected_fp_rate: f64 = 0.001; + let initial_capacity = 10000; + let expansion = 2; + let num_filters_to_scale = 5; + let mut bf = BloomFilterType::new_reserved( + expected_fp_rate, + initial_capacity, + expansion, + is_seed_random, + true, + ) + .expect("Expect bloom creation to succeed"); + assert_eq!(bf.capacity(), initial_capacity as i64); + assert_eq!(bf.cardinality(), 0); + let mut total_error_count = 0; + let mut add_operation_idx = 0; + // Validate the scaling behavior of the bloom filter. + for filter_idx in 1..=num_filters_to_scale { + let expected_total_capacity = initial_capacity * (expansion.pow(filter_idx) - 1); + let (error_count, new_add_operation_idx) = add_items_till_capacity( + &mut bf, + expected_total_capacity as i64, + add_operation_idx + 1, + &rand_prefix, + ); + add_operation_idx = new_add_operation_idx; + total_error_count += error_count; + assert_eq!(bf.capacity(), expected_total_capacity as i64); + assert_eq!(bf.cardinality(), expected_total_capacity as i64); + let expected_free_effort = filter_idx as usize; + assert_eq!(bf.free_effort(), expected_free_effort); + assert!(bf.memory_usage() > 0); + } + // Use a margin on the expected_fp_rate when asserting for correctness. + let fp_margin = 0.002; + // Validate that item "add" operations on bloom filters are ensuring correctness. + fp_assert( + total_error_count, + add_operation_idx, + expected_fp_rate, + fp_margin, + ); + // Validate item "exists" operations on bloom filters are ensuring correctness. + // This tests for items already added to the filter and expects them to exist. + let (error_count, _) = check_items_exist(&bf, 1, add_operation_idx, true, &rand_prefix); + assert!(error_count == 0); + // This tests for items which are not added to the filter and expects them to not exist. + let (error_count, num_operations) = check_items_exist( + &bf, add_operation_idx + 1, + add_operation_idx * 2, + false, + &rand_prefix, + ); + // Validate that the real fp_rate is not much more than the configured fp_rate. + fp_assert(error_count, num_operations, expected_fp_rate, fp_margin); + // Verify restore + let restore_bloom_filter_type = BloomFilterType::create_copy_from(&bf); + verify_restored_items( + &bf, + &restore_bloom_filter_type, + add_operation_idx, + expected_fp_rate, + fp_margin, &rand_prefix, ); - add_operation_idx = new_add_operation_idx; - total_error_count += error_count; - assert_eq!(bf.capacity(), expected_total_capacity as i64); - assert_eq!(bf.cardinality(), expected_total_capacity as i64); - let expected_free_effort = filter_idx as usize; - assert_eq!(bf.free_effort(), expected_free_effort); - assert!(bf.memory_usage() > 0); } - // Use a margin on the expected_fp_rate when asserting for correctness. - let fp_margin = 0.002; - // Validate that item "add" operations on bloom filters are ensuring correctness. - fp_assert( - total_error_count, - add_operation_idx, - expected_fp_rate, - fp_margin, - ); - // Validate item "exists" operations on bloom filters are ensuring correctness. - // This tests for items already added to the filter and expects them to exist. 
- let (error_count, _) = check_items_exist(&bf, 1, add_operation_idx, true, &rand_prefix); - assert!(error_count == 0); - // This tests for items which are not added to the filter and expects them to not exist. - let (error_count, num_operations) = check_items_exist( - &bf, - add_operation_idx + 1, - add_operation_idx * 2, - false, - &rand_prefix, - ); - // Validate that the real fp_rate is not much more than the configured fp_rate. - fp_assert(error_count, num_operations, expected_fp_rate, fp_margin); - - // Verify restore - let restore_bloom_filter_type = BloomFilterType::create_copy_from(&bf); - verify_restored_items( - &bf, - &restore_bloom_filter_type, - add_operation_idx, - expected_fp_rate, - fp_margin, - &rand_prefix, - ); } #[test] fn test_seed() { - // The value of sip keys generated by the sip_keys with fixed seed should be equal to the constant in configs.rs - let test_bloom_filter = BloomFilter::new(0.5_f64, 1000_u32); - let seed = test_bloom_filter.bloom.seed(); - assert_eq!(seed, FIXED_SEED); + // When using the with_fixed_seed API, the sip keys generated should be equal to the constants from configs.rs + let test_bloom_filter1 = + BloomFilter::with_fixed_seed(0.5_f64, 1000_u32, &configs::FIXED_SEED); + let test_seed1 = test_bloom_filter1.seed(); + assert_eq!(test_seed1, configs::FIXED_SEED); + // When using the with_random_seed API, the sip keys generated should not be equal to the constant sip_keys. + let test_bloom_filter2 = BloomFilter::with_random_seed(0.5_f64, 1000_u32); + let test_seed2 = test_bloom_filter2.seed(); + assert_ne!(test_seed2, configs::FIXED_SEED); } #[test] fn test_exceeded_size_limit() { // Validate that bloom filter allocations within bloom objects are rejected if their memory usage would be beyond // the configured limit. - let result = BloomFilterType::new_reserved(0.5_f64, u32::MAX, 1, true); + let result = BloomFilterType::new_reserved(0.5_f64, u32::MAX, 1, true, true); assert_eq!(result.err(), Some(BloomError::ExceedsMaxBloomSize)); let capacity = 50000000; assert!(!BloomFilter::validate_size(capacity, 0.001_f64)); - let result2 = BloomFilterType::new_reserved(0.001_f64, capacity, 1, true); + let result2 = BloomFilterType::new_reserved(0.001_f64, capacity, 1, true, true); assert_eq!(result2.err(), Some(BloomError::ExceedsMaxBloomSize)); } #[test] fn test_bf_encode_and_decode() { - // arrange: prepare bloom filter - let mut bf = BloomFilterType::new_reserved(0.5_f64, 1000_u32, 2, true).unwrap(); - let key = "key"; - let _ = bf.add_item(key.as_bytes(), true); - - // action - let encoder_result = bf.encode_bloom_filter(); - - // assert - // encoder sucess - assert!(encoder_result.is_ok()); - let vec = encoder_result.unwrap(); - - // assert decode: - let new_bf_result = BloomFilterType::decode_bloom_filter(&vec, true); - - let new_bf = new_bf_result.unwrap(); - - // verify new_bf and bf - assert_eq!(bf.fp_rate, new_bf.fp_rate); - assert_eq!(bf.expansion, new_bf.expansion); - assert_eq!(bf.capacity(), new_bf.capacity()); - - // contains key - assert!(new_bf.item_exists(key.as_bytes())); + // Validate with non scaling (0) and scaling (2). 
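+        // Expansion 0 marks a non scaling object on the wire; decode_bloom_filter accepts it
+        // through the 0..=BLOOM_EXPANSION_MAX range check above.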
+ let test_expansions = [0, 2]; + for expansion in test_expansions { + let mut bf = + BloomFilterType::new_reserved(0.5_f64, 1000_u32, expansion, true, true).unwrap(); + let item = "item1"; + let _ = bf.add_item(item.as_bytes(), true); + // action + let encoder_result = bf.encode_bloom_filter(); + // assert encode success + assert!(encoder_result.is_ok()); + let vec = encoder_result.unwrap(); + // assert decode success: + let new_bf_result = BloomFilterType::decode_bloom_filter(&vec, true); + let new_bf = new_bf_result.unwrap(); + // verify new_bf and bf + assert_eq!(bf.fp_rate, new_bf.fp_rate); + assert_eq!(bf.expansion, new_bf.expansion); + assert_eq!(bf.capacity(), new_bf.capacity()); + // verify item1 exists. + assert!(new_bf.item_exists(item.as_bytes())); + } } #[test] fn test_bf_decode_when_unsupported_version_should_failed() { // arrange: prepare bloom filter - let mut bf = BloomFilterType::new_reserved(0.5_f64, 1000_u32, 2, true).unwrap(); + let mut bf = BloomFilterType::new_reserved(0.5_f64, 1000_u32, 2, true, true).unwrap(); let key = "key"; let _ = bf.add_item(key.as_bytes(), true).unwrap(); @@ -779,7 +843,7 @@ mod tests { #[test] fn test_bf_decode_when_bytes_is_empty_should_failed() { // arrange: prepare bloom filter - let mut bf = BloomFilterType::new_reserved(0.5_f64, 1000_u32, 2, true).unwrap(); + let mut bf = BloomFilterType::new_reserved(0.5_f64, 1000_u32, 2, true, true).unwrap(); let key = "key"; let _ = bf.add_item(key.as_bytes(), true); @@ -799,26 +863,12 @@ mod tests { #[test] fn test_bf_decode_when_bytes_is_exceed_limit_should_failed() { // arrange: prepare bloom filter - let mut bf = BloomFilterType::new_reserved(0.5_f64, 1000_u32, 2, true).unwrap(); + let mut bf = BloomFilterType::new_reserved(0.5_f64, 1000_u32, 2, true, true).unwrap(); let key = "key"; let _ = bf.add_item(key.as_bytes(), true); let origin_expansion = bf.expansion; let origin_fp_rate = bf.fp_rate; - // unsupoort expansion - bf.expansion = 0; - - let encoder_result = bf.encode_bloom_filter(); - - // 1. unsupport expansion - let vec = encoder_result.unwrap(); - // assert decode: - // should return error - assert_eq!( - BloomFilterType::decode_bloom_filter(&vec, true).err(), - Some(BloomError::BadExpansion) - ); - - // 1.2 Exceeded the maximum expansion + // 1. Exceeded the maximum expansion bf.expansion = BLOOM_EXPANSION_MAX + 1; let vec = bf.encode_bloom_filter().unwrap(); @@ -841,7 +891,7 @@ mod tests { // 3. build a larger than 64mb filter let extra_large_filter = - BloomFilterType::new_reserved(0.01_f64, 57000000, 2, false).unwrap(); + BloomFilterType::new_reserved(0.01_f64, 57000000, 2, true, false).unwrap(); let vec = extra_large_filter.encode_bloom_filter().unwrap(); // should return error assert_eq!( diff --git a/src/configs.rs b/src/configs.rs index db83832..0c0f156 100644 --- a/src/configs.rs +++ b/src/configs.rs @@ -1,4 +1,5 @@ use lazy_static::lazy_static; +use std::sync::atomic::AtomicBool; use std::sync::atomic::AtomicI64; /// Configurations @@ -14,6 +15,8 @@ pub const BLOOM_FP_RATE_DEFAULT: f64 = 0.001; pub const BLOOM_FP_RATE_MIN: f64 = 0.0; pub const BLOOM_FP_RATE_MAX: f64 = 1.0; +pub const BLOOM_USE_RANDOM_SEED_DEFAULT: bool = true; + // Max Memory usage allowed per bloom filter within a bloom object (64MB). // Beyond this threshold, a bloom object is classified as large and is exempt from defrag operations. // Also, write operations that result in bloom object allocation larger than this size will be rejected. @@ -26,6 +29,7 @@ lazy_static! 
 {
     pub static ref BLOOM_EXPANSION: AtomicI64 = AtomicI64::new(BLOOM_EXPANSION_DEFAULT);
     pub static ref BLOOM_MEMORY_LIMIT_PER_FILTER: AtomicI64 =
         AtomicI64::new(BLOOM_MEMORY_LIMIT_PER_FILTER_DEFAULT);
+    pub static ref BLOOM_USE_RANDOM_SEED: AtomicBool = AtomicBool::default();
 }

 /// Constants
@@ -44,3 +48,7 @@ pub const FIXED_SIP_KEY_ONE_A: u64 = 15713473521876537177;
 pub const FIXED_SIP_KEY_ONE_B: u64 = 15671187751654921383;
 pub const FIXED_SIP_KEY_TWO_A: u64 = 9766223185946773789;
 pub const FIXED_SIP_KEY_TWO_B: u64 = 9453907914610147120;
+pub const SIP_KEYS: [(u64, u64); 2] = [
+    (FIXED_SIP_KEY_ONE_A, FIXED_SIP_KEY_ONE_B),
+    (FIXED_SIP_KEY_TWO_A, FIXED_SIP_KEY_TWO_B),
+];
diff --git a/src/lib.rs b/src/lib.rs
index 0b1b9c3..8f5b971 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -105,6 +105,7 @@ valkey_module! {
     string: [
     ],
     bool: [
+        ["bloom-use-random-seed", &*configs::BLOOM_USE_RANDOM_SEED, configs::BLOOM_USE_RANDOM_SEED_DEFAULT, ConfigurationFlags::DEFAULT, None],
     ],
     enum: [
     ],
diff --git a/src/wrapper/bloom_callback.rs b/src/wrapper/bloom_callback.rs
index da9e21d..c3e1652 100644
--- a/src/wrapper/bloom_callback.rs
+++ b/src/wrapper/bloom_callback.rs
@@ -21,6 +21,11 @@ pub unsafe extern "C" fn bloom_rdb_save(rdb: *mut raw::RedisModuleIO, value: *mu
     raw::save_unsigned(rdb, v.filters.len() as u64);
     raw::save_unsigned(rdb, v.expansion as u64);
     raw::save_double(rdb, v.fp_rate);
+    let mut is_seed_random = 0;
+    if v.is_seed_random {
+        is_seed_random = 1;
+    }
+    raw::save_unsigned(rdb, is_seed_random);
     let filter_list = &v.filters;
     let mut filter_list_iter = filter_list.iter().peekable();
     while let Some(filter) = filter_list_iter.next() {
@@ -47,6 +52,7 @@ pub unsafe extern "C" fn bloom_rdb_load(
         let bb = Box::new(item);
         Box::into_raw(bb).cast::<c_void>()
     } else {
+        // TODO: Add error logging here.
         null_mut()
     }
 }
@@ -120,7 +126,7 @@ pub unsafe extern "C" fn bloom_copy(
 /// # Safety
 /// Raw handler for the Bloom digest callback.
 pub unsafe extern "C" fn bloom_digest(md: *mut raw::RedisModuleDigest, value: *mut c_void) {
-    let mut dig = Digest::new(md);
+    let dig = Digest::new(md);
     let val = &*(value.cast::<BloomFilterType>());
     val.debug_digest(dig);
 }
diff --git a/tests/conftest.py b/tests/conftest.py
new file mode 100644
index 0000000..9b2c832
--- /dev/null
+++ b/tests/conftest.py
@@ -0,0 +1,5 @@
+import pytest
+
+@pytest.fixture(params=['random-seed', 'fixed-seed'])
+def bloom_config_parameterization(request):
+    return request.param
\ No newline at end of file
diff --git a/tests/test_aofrewrite.py b/tests/test_aofrewrite.py
index fc94a1d..0f62fcf 100644
--- a/tests/test_aofrewrite.py
+++ b/tests/test_aofrewrite.py
@@ -8,7 +8,7 @@ class TestBloomAofRewrite(ValkeyBloomTestCaseBase):
     def get_custom_args(self):
         # test aof rewrite should avoid bloom filter override as rdb. use aof
         args = super().get_custom_args()
-        args.update({'aof-use-rdb-preamble': 'no', 'appendonly': 'yes', 'appenddirname': 'aof-{}'.format(self.port)})
+        args.update({'aof-use-rdb-preamble': 'no', 'appendonly': 'yes'})
         return args

     def test_basic_aofrewrite_and_restore(self):
@@ -47,22 +47,17 @@ def test_basic_aofrewrite_and_restore(self):
         client.execute_command('DEL testSave')

     def test_aofrewrite_bloomfilter_metrics(self):
+        # Create scaled bloom filter and add 7500 items to trigger a scale out.
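+        # 7500 items exceeds the reserved capacity of 7000 with enough buffer that the scale out
+        # reliably triggers even when some adds are absorbed as false positives.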
self.client.execute_command('BF.RESERVE key1 0.001 7000') - # We use a number greater than 7000 in order to have a buffer for any false positives - variables = [f"key{i+1}" for i in range(7500)] - - # Get original size to compare against size after scaled info_obj = self.client.execute_command('BF.INFO key1') - # Add keys until bloomfilter will scale out - for var in variables: - self.client.execute_command(f'BF.ADD key1 {var}') + self.add_items_till_capacity(self.client, "key1", 7500, 1, "item_prefix") # cmd debug digest server_digest = self.client.debug_digest() assert server_digest != None or 0000000000000000000000000000000000000000 object_digest = self.client.execute_command('DEBUG DIGEST-VALUE key1') - # save aof, restart sever + # save aof, restart server self.client.bgrewriteaof() self.server.wait_for_action_done(ValkeyAction.AOF_REWRITE) # restart server diff --git a/tests/test_basic.py b/tests/test_basic.py index 35a3510..9298f42 100644 --- a/tests/test_basic.py +++ b/tests/test_basic.py @@ -280,14 +280,20 @@ def test_debug_cmd(self): assert scenario4_obj != default_obj assert scenario4_object_digest != default_object_digest - # scenario5 validates that digest is equal on bloom objects with same properties and same items. + # scenario5 validates that digest is equal on bloom objects with same properties and same items only when we are + # using a fixed seed. Not when we are using a random seed. + is_random_seed = client.execute_command('CONFIG GET bf.bloom-use-random-seed') scenario5_obj = client.execute_command('BF.INSERT scenario5 error 0.001 capacity 1000 items 1') scenario5_object_digest = client.execute_command('DEBUG DIGEST-VALUE scenario5') assert scenario5_obj != default_obj assert scenario5_object_digest != default_object_digest + # Add the same items to both the original and the new bloom object. 
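+        # scenario5 already holds item 1 from the BF.INSERT above, so after these MADD calls both
+        # objects contain exactly the items 1, 2 and 3.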
         client.execute_command('BF.MADD default_obj 1 2 3')
         client.execute_command('BF.MADD scenario5 2 3')
         madd_default_object_digest = client.execute_command('DEBUG DIGEST-VALUE default_obj')
         madd_scenario_object_digest = client.execute_command('DEBUG DIGEST-VALUE scenario5')
-        assert madd_scenario_object_digest == madd_default_object_digest
+        if is_random_seed[1] == b'yes':
+            assert madd_scenario_object_digest != madd_default_object_digest
+        else:
+            assert madd_scenario_object_digest == madd_default_object_digest
diff --git a/tests/test_bloom_metrics.py b/tests/test_bloom_metrics.py
index 3df634c..4f33ed1 100644
--- a/tests/test_bloom_metrics.py
+++ b/tests/test_bloom_metrics.py
@@ -13,13 +13,13 @@ def test_basic_command_metrics(self):
         self.verify_bloom_metrics(self.client.execute_command("INFO bf"), 0, 0, 0, 0, 0)
         self.verify_bloom_metrics(self.client.execute_command("INFO Modules"), 0, 0, 0, 0, 0)

-        # Create a default bloom filter and check its metrics values are correct
-        assert(self.client.execute_command('BF.ADD key item') == 1)
+        # Create a default bloom filter, add an item and check its metrics values are correct
+        self.add_items_till_capacity(self.client, "key", 1, 1, "item")
         self.verify_bloom_metrics(self.client.execute_command("INFO bf"), DEFAULT_BLOOM_FILTER_SIZE, 1, 1, 1, DEFAULT_BLOOM_FILTER_CAPACITY)
         self.verify_bloom_metrics(self.client.execute_command("INFO Modules"), DEFAULT_BLOOM_FILTER_SIZE, 1, 1, 1, DEFAULT_BLOOM_FILTER_CAPACITY)

         # Check that other commands don't influence metrics
-        assert(self.client.execute_command('BF.EXISTS key item') == 1)
+        assert(self.client.execute_command('BF.EXISTS key item1') == 1)
         assert(self.client.execute_command('BF.ADD key item2') == 1)
         assert(self.client.execute_command('BF.MADD key item3 item4')== [1, 1])
         assert(self.client.execute_command('BF.MEXISTS key item3 item5')== [1, 0])
@@ -85,8 +85,7 @@ def test_scaled_bloomfilter_metrics(self):
         # Get original size to compare against size after scaled
         info_obj = self.client.execute_command('BF.INFO key1')
         # Add keys until bloomfilter will scale out
-        for var in variables:
-            self.client.execute_command(f'BF.ADD key1 {var}')
+        self.add_items_till_capacity(self.client, "key1", 7500, 1, "item_prefix")

         # Check info for scaled bloomfilter matches metrics data for bloomfilter
         new_info_obj = self.client.execute_command(f'BF.INFO key1')
@@ -101,28 +100,26 @@ def test_copy_metrics(self):
-        # Create a bloomfilter and copy it
-        assert(self.client.execute_command('BF.ADD key{123} item') == 1)
-        assert(self.client.execute_command('COPY key{123} copiedkey{123}') == 1)
+        # Create a bloomfilter, add one item and copy it
+        self.add_items_till_capacity(self.client, "originalKey", 1, 1, "item_prefix")
+        assert(self.client.execute_command('COPY originalKey copiedkey') == 1)

         # Verify that the metrics were updated correctly after copying
         self.verify_bloom_metrics(self.client.execute_command("INFO bf"), DEFAULT_BLOOM_FILTER_SIZE * 2, 2, 2, 2, DEFAULT_BLOOM_FILTER_CAPACITY * 2)

         # Perform a FLUSHALL which should set all metrics data to 0
         self.client.execute_command('FLUSHALL')
-        wait_for_equal(lambda: self.client.execute_command('BF.EXISTS key{123} item'), 0)
+        wait_for_equal(lambda: self.client.execute_command('DBSIZE'), 0)
         self.verify_bloom_metrics(self.client.execute_command("INFO bf"), 0, 0, 0, 0, 0)

     def test_save_and_restore_metrics(self):
-        # Create default bloom filter
-        assert(self.client.execute_command('BF.ADD testSave item') == 1)
+        # Create default bloom filter and add one 
item + self.add_items_till_capacity(self.client, "nonscaledfilter", 1, 1, "item_prefix") - # Create scaled bloom filter + # Create scaled bloom filter and add 7500 items to trigger a scale out. self.client.execute_command('BF.RESERVE key1 0.001 7000') - variables = [f"key{i+1}" for i in range(7500)] - for var in variables: - self.client.execute_command(f'BF.ADD key1 {var}') + self.add_items_till_capacity(self.client, "key1", 7500, 1, "item_prefix") # Get info and metrics stats of bloomfilter before rdb load original_info_obj = self.client.execute_command('BF.INFO key1') diff --git a/tests/test_correctness.py b/tests/test_correctness.py index 391e387..19354f6 100644 --- a/tests/test_correctness.py +++ b/tests/test_correctness.py @@ -80,7 +80,7 @@ def test_scaling_filter(self): total_error_count = 0 add_operation_idx = 0 for filter_idx in range(1, num_filters_to_scale + 1): - expected_total_capacity = initial_capacity * ((expansion ** filter_idx) - 1) + expected_total_capacity = self.calculate_expected_capacity(initial_capacity, expansion, filter_idx) error_count, new_add_operation_idx = self.add_items_till_capacity(client, filter_name, expected_total_capacity, add_operation_idx + 1, item_prefix) add_operation_idx = new_add_operation_idx total_error_count += error_count diff --git a/tests/test_replication.py b/tests/test_replication.py index 36012f2..c34f996 100644 --- a/tests/test_replication.py +++ b/tests/test_replication.py @@ -6,29 +6,26 @@ class TestBloomReplication(ReplicationTestCase): + # Global Parameterized Configs + use_random_seed = 'no' + def get_custom_args(self): self.set_server_version(os.environ['SERVER_VERSION']) return { 'loadmodule': os.getenv('MODULE_PATH'), + 'bf.bloom-use-random-seed': self.use_random_seed, } - def test_replication_success(self): - self.setup_replication(num_replicas=1) - assert self.client.execute_command('BF.ADD key item1') == 1 - bf_exists_result = self.client.execute_command('BF.EXISTS key item1') - bf_non_added_exists_result = self.client.execute_command('BF.EXISTS key item2') - bf_info_result = self.client.execute_command('BF.INFO key') - - self.waitForReplicaToSyncUp(self.replicas[0]) - bf_replica_exists_result = self.replicas[0].client.execute_command('BF.EXISTS key item1') - assert bf_exists_result == bf_replica_exists_result - bf_replica_non_added_exists_result = self.replicas[0].client.execute_command('BF.EXISTS key item2') - assert bf_non_added_exists_result == bf_replica_non_added_exists_result - bf_replica_info_result = self.replicas[0].client.execute_command('BF.INFO key') - assert bf_info_result == bf_replica_info_result + @pytest.fixture(autouse=True) + def use_random_seed_fixture(self, bloom_config_parameterization): + if bloom_config_parameterization == "random-seed": + self.use_random_seed = "yes" + elif bloom_config_parameterization == "fixed-seed": + self.use_random_seed = "no" def test_replication_behavior(self): self.setup_replication(num_replicas=1) + is_random_seed = self.client.execute_command('CONFIG GET bf.bloom-use-random-seed') # Test replication for write commands. 
bloom_write_cmds = [ ('BF.ADD', 'BF.ADD key item', 'BF.ADD key item1', 2), @@ -72,11 +69,16 @@ def test_replication_behavior(self): # cmd debug digest server_digest_primary = self.client.debug_digest() assert server_digest_primary != None or 0000000000000000000000000000000000000000 - object_digest_primary = self.client.execute_command('DEBUG DIGEST-VALUE key') server_digest_replica = self.client.debug_digest() assert server_digest_primary == server_digest_replica + object_digest_primary = self.client.execute_command('DEBUG DIGEST-VALUE key') debug_digest_replica = self.replicas[0].client.execute_command('DEBUG DIGEST-VALUE key') - assert object_digest_primary == debug_digest_replica + # TODO: Update the test here to validate that digest always matches during replication. Once we implement + # deterministic replication (including replicating seeds), this assert will be updated. + if is_random_seed[1] == b'yes': + assert object_digest_primary != debug_digest_replica + else: + assert object_digest_primary == debug_digest_replica self.client.execute_command('FLUSHALL') self.waitForReplicaToSyncUp(self.replicas[0]) diff --git a/tests/valkey_bloom_test_case.py b/tests/valkey_bloom_test_case.py index 4e42d3e..42cfbe9 100644 --- a/tests/valkey_bloom_test_case.py +++ b/tests/valkey_bloom_test_case.py @@ -7,12 +7,23 @@ class ValkeyBloomTestCaseBase(ValkeyTestCase): + # Global Parameterized Configs + use_random_seed = 'no' + def get_custom_args(self): self.set_server_version(os.environ['SERVER_VERSION']) return { 'loadmodule': os.getenv('MODULE_PATH'), + 'bf.bloom-use-random-seed': self.use_random_seed, } + @pytest.fixture(autouse=True) + def use_random_seed_fixture(self, bloom_config_parameterization): + if bloom_config_parameterization == "random-seed": + self.use_random_seed = "yes" + elif bloom_config_parameterization == "fixed-seed": + self.use_random_seed = "no" + def verify_error_response(self, client, cmd, expected_err_reply): try: client.execute_command(cmd) @@ -143,6 +154,19 @@ def validate_copied_bloom_correctness(self, client, original_filter_name, item_p ) self.fp_assert(error_count, num_operations, expected_fp_rate, fp_margin) + def calculate_expected_capacity(self, initial_capacity, expansion, num_filters): + """ + This function accepts the starting capacity (of the first filter), expansion and number of filters in + the object to calculate the expected total capacity (across all the filters) within the bloom object. 
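+        For example (an illustrative calculation): initial_capacity=7000, expansion=2 and
+        num_filters=2 gives a total capacity of 7000 + 14000 = 21000.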
+ """ + curr_filt_capacity = initial_capacity + total_capacity = curr_filt_capacity + for i in range(2, num_filters + 1): + new_filt_capacity = curr_filt_capacity * expansion + curr_filt_capacity = new_filt_capacity + total_capacity += curr_filt_capacity + return total_capacity + def verify_bloom_metrics(self, info_response, expected_memory, expected_num_objects, expected_num_filters, expected_num_items, expected_sum_capacity): """ Verify the metric values are recorded properly, the expected values are as below diff --git a/tests/valkeytests/valkey_test_case.py b/tests/valkeytests/valkey_test_case.py index 656ae8c..13b3a47 100644 --- a/tests/valkeytests/valkey_test_case.py +++ b/tests/valkeytests/valkey_test_case.py @@ -474,7 +474,7 @@ def port_tracker_fixture(self, resource_port_tracker): self.port_tracker = resource_port_tracker def _get_valkey_args(self): - self.args.update({"maxmemory":self.maxmemory, "maxmemory-policy":"allkeys-random", "activerehashing":"yes", "databases": self.num_dbs, "repl-diskless-sync": "yes", "save": "", "enable-debug-command":"yes"}) + self.args.update({"maxmemory":self.maxmemory, "maxmemory-policy":"allkeys-random", "activerehashing":"yes", "databases": self.num_dbs, "repl-diskless-sync": "yes", "save": "", 'appenddirname': f'aof-{self.port}', "enable-debug-command":"yes"}) self.args.update(self.get_custom_args()) return self.args
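
Usage sketch (the startup flag form below is an assumption; the config name matches the one
registered in src/lib.rs and is the same one the tests query via CONFIG GET, and the module
path is a placeholder):

    valkey-server --loadmodule <path-to-valkey-bloom-module> --bf.bloom-use-random-seed yes
    valkey-cli CONFIG GET bf.bloom-use-random-seed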