Skip to content

Commit

Permalink
Update default configuration and related test
Browse files Browse the repository at this point in the history
Signed-off-by: VanessaTang <[email protected]>
  • Loading branch information
YueTang-Vanessa committed Dec 6, 2024
1 parent 1b0d819 commit 5789496
Show file tree
Hide file tree
Showing 8 changed files with 57 additions and 66 deletions.
8 changes: 4 additions & 4 deletions src/bloom/command_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ pub fn bloom_filter_add_value(
None => {
// Instantiate empty bloom filter.
let fp_rate = configs::BLOOM_FP_RATE_DEFAULT;
let capacity = configs::BLOOM_CAPACITY.load(Ordering::Relaxed) as u32;
let capacity = configs::BLOOM_CAPACITY.load(Ordering::Relaxed);
let expansion = configs::BLOOM_EXPANSION.load(Ordering::Relaxed) as u32;
let use_random_seed = configs::BLOOM_USE_RANDOM_SEED.load(Ordering::Relaxed);
let mut bloom = match BloomFilterType::new_reserved(
Expand Down Expand Up @@ -231,7 +231,7 @@ pub fn bloom_filter_reserve(ctx: &Context, input_args: &[ValkeyString]) -> Valke
};
curr_cmd_idx += 1;
// Parse the capacity
let capacity = match input_args[curr_cmd_idx].to_string_lossy().parse::<u32>() {
let capacity = match input_args[curr_cmd_idx].to_string_lossy().parse::<i64>() {
Ok(num) if (BLOOM_CAPACITY_MIN..=BLOOM_CAPACITY_MAX).contains(&num) => num,
Ok(0) => {
return Err(ValkeyError::Str(utils::CAPACITY_LARGER_THAN_0));
Expand Down Expand Up @@ -311,7 +311,7 @@ pub fn bloom_filter_insert(ctx: &Context, input_args: &[ValkeyString]) -> Valkey
let filter_name = &input_args[idx];
idx += 1;
let mut fp_rate = configs::BLOOM_FP_RATE_DEFAULT;
let mut capacity = configs::BLOOM_CAPACITY.load(Ordering::Relaxed) as u32;
let mut capacity = configs::BLOOM_CAPACITY.load(Ordering::Relaxed);
let mut expansion = configs::BLOOM_EXPANSION.load(Ordering::Relaxed) as u32;
let mut nocreate = false;
while idx < argc {
Expand All @@ -336,7 +336,7 @@ pub fn bloom_filter_insert(ctx: &Context, input_args: &[ValkeyString]) -> Valkey
return Err(ValkeyError::WrongArity);
}
idx += 1;
capacity = match input_args[idx].to_string_lossy().parse::<u32>() {
capacity = match input_args[idx].to_string_lossy().parse::<i64>() {
Ok(num) if (BLOOM_CAPACITY_MIN..=BLOOM_CAPACITY_MAX).contains(&num) => num,
Ok(0) => {
return Err(ValkeyError::Str(utils::CAPACITY_LARGER_THAN_0));
Expand Down
8 changes: 4 additions & 4 deletions src/bloom/data_type.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ impl ValkeyDataType for BloomFilterType {
return None;
}
};
if !BloomFilter::validate_size(capacity as u32, new_fp_rate) {
if !BloomFilter::validate_size(capacity as i64, new_fp_rate) {
logging::log_warning("Failed to restore bloom object: Contains a filter larger than the max allowed size limit.");
return None;
}
Expand All @@ -106,7 +106,7 @@ impl ValkeyDataType for BloomFilterType {
capacity
};
let filter =
BloomFilter::from_existing(bitmap.as_ref(), num_items as u32, capacity as u32);
BloomFilter::from_existing(bitmap.as_ref(), num_items as i64, capacity as i64);
if !is_seed_random && filter.seed() != configs::FIXED_SEED {
logging::log_warning("Failed to restore bloom object: Object in fixed seed mode, but seed does not match FIXED_SEED.");
return None;
Expand All @@ -133,8 +133,8 @@ impl ValkeyDataType for BloomFilterType {
dig.add_string_buffer(&self.fp_rate.to_le_bytes());
for filter in &self.filters {
dig.add_string_buffer(filter.bloom.as_slice());
dig.add_long_long(filter.num_items.into());
dig.add_long_long(filter.capacity.into());
dig.add_long_long(filter.num_items);
dig.add_long_long(filter.capacity);
}
dig.end_sequence();
}
Expand Down
78 changes: 34 additions & 44 deletions src/bloom/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ impl BloomFilterType {
/// Create a new BloomFilterType object.
pub fn new_reserved(
fp_rate: f64,
capacity: u32,
capacity: i64,
expansion: u32,
use_random_seed: bool,
validate_size_limit: bool,
Expand Down Expand Up @@ -151,7 +151,7 @@ impl BloomFilterType {
pub fn cardinality(&self) -> i64 {
let mut cardinality: i64 = 0;
for filter in &self.filters {
cardinality += filter.num_items as i64;
cardinality += filter.num_items;
}
cardinality
}
Expand All @@ -161,7 +161,7 @@ impl BloomFilterType {
let mut capacity: i64 = 0;
// Check if item exists already.
for filter in &self.filters {
capacity += filter.capacity as i64;
capacity += filter.capacity;
}
capacity
}
Expand Down Expand Up @@ -205,7 +205,7 @@ impl BloomFilterType {
Ok(rate) => rate,
Err(e) => return Err(e),
};
let new_capacity = match filter.capacity.checked_mul(self.expansion) {
let new_capacity = match filter.capacity.checked_mul(self.expansion.into()) {
Some(new_capacity) => new_capacity,
None => {
// i64::MAX cannot be reached with the 64MB memory usage limit per filter, even with a high fp rate (e.g. 0.9).
Expand Down Expand Up @@ -321,11 +321,11 @@ impl BloomFilterType {
std::sync::atomic::Ordering::Relaxed,
);
metrics::BLOOM_NUM_ITEMS_ACROSS_OBJECTS.fetch_add(
filter.num_items.into(),
filter.num_items as u64,
std::sync::atomic::Ordering::Relaxed,
);
metrics::BLOOM_CAPACITY_ACROSS_OBJECTS
.fetch_add(filter.capacity.into(), std::sync::atomic::Ordering::Relaxed);
.fetch_add(filter.capacity as u64, std::sync::atomic::Ordering::Relaxed);
}
Ok(item)
}
Expand All @@ -344,13 +344,13 @@ impl BloomFilterType {
pub struct BloomFilter {
#[serde(serialize_with = "serialize", deserialize_with = "deserialize")]
pub bloom: bloomfilter::Bloom<[u8]>,
pub num_items: u32,
pub capacity: u32,
pub num_items: i64,
pub capacity: i64,
}

impl BloomFilter {
/// Instantiate empty BloomFilter object with a fixed seed used to create sip keys.
pub fn with_fixed_seed(fp_rate: f64, capacity: u32, fixed_seed: &[u8; 32]) -> BloomFilter {
pub fn with_fixed_seed(fp_rate: f64, capacity: i64, fixed_seed: &[u8; 32]) -> BloomFilter {
let bloom =
bloomfilter::Bloom::new_for_fp_rate_with_seed(capacity as usize, fp_rate, fixed_seed)
.expect("We expect bloomfilter::Bloom<[u8]> creation to succeed");
Expand All @@ -364,7 +364,7 @@ impl BloomFilter {
}

/// Instantiate empty BloomFilter object with a randomly generated seed used to create sip keys.
pub fn with_random_seed(fp_rate: f64, capacity: u32) -> BloomFilter {
pub fn with_random_seed(fp_rate: f64, capacity: i64) -> BloomFilter {
let bloom = bloomfilter::Bloom::new_for_fp_rate(capacity as usize, fp_rate)
.expect("We expect bloomfilter::Bloom<[u8]> creation to succeed");
let fltr = BloomFilter {
Expand All @@ -377,7 +377,7 @@ impl BloomFilter {
}

/// Create a new BloomFilter from dumped information (RDB load).
pub fn from_existing(bitmap: &[u8], num_items: u32, capacity: u32) -> BloomFilter {
pub fn from_existing(bitmap: &[u8], num_items: i64, capacity: i64) -> BloomFilter {
let bloom = bloomfilter::Bloom::from_slice(bitmap)
.expect("We expect bloomfilter::Bloom<[u8]> creation to succeed");

Expand All @@ -388,7 +388,7 @@ impl BloomFilter {
};
fltr.incr_metrics_on_new_create();
metrics::BLOOM_NUM_ITEMS_ACROSS_OBJECTS
.fetch_add(num_items.into(), std::sync::atomic::Ordering::Relaxed);
.fetch_add(num_items as u64, std::sync::atomic::Ordering::Relaxed);
fltr
}

Expand All @@ -403,7 +403,7 @@ impl BloomFilter {
metrics::BLOOM_OBJECT_TOTAL_MEMORY_BYTES
.fetch_add(self.number_of_bytes(), std::sync::atomic::Ordering::Relaxed);
metrics::BLOOM_CAPACITY_ACROSS_OBJECTS
.fetch_add(self.capacity.into(), std::sync::atomic::Ordering::Relaxed);
.fetch_add(self.capacity as u64, std::sync::atomic::Ordering::Relaxed);
}

/// Return the seed used by the sip hasher of the raw bloom.
Expand All @@ -418,7 +418,7 @@ impl BloomFilter {
/// Calculates the number of bytes that the bloom filter will require to be allocated.
/// This is used before actually creating the bloom filter when checking if the filter is within the allowed size.
/// Returns whether the bloom filter is of a valid size or not.
pub fn validate_size(capacity: u32, fp_rate: f64) -> bool {
pub fn validate_size(capacity: i64, fp_rate: f64) -> bool {
let bytes = bloomfilter::Bloom::<[u8]>::compute_bitmap_size(capacity as usize, fp_rate)
+ std::mem::size_of::<BloomFilter>();
if bytes > configs::BLOOM_MEMORY_LIMIT_PER_FILTER.load(Ordering::Relaxed) as usize {
Expand Down Expand Up @@ -452,9 +452,9 @@ impl Drop for BloomFilter {
metrics::BLOOM_OBJECT_TOTAL_MEMORY_BYTES
.fetch_sub(self.number_of_bytes(), std::sync::atomic::Ordering::Relaxed);
metrics::BLOOM_NUM_ITEMS_ACROSS_OBJECTS
.fetch_sub(self.num_items.into(), std::sync::atomic::Ordering::Relaxed);
.fetch_sub(self.num_items as u64, std::sync::atomic::Ordering::Relaxed);
metrics::BLOOM_CAPACITY_ACROSS_OBJECTS
.fetch_sub(self.capacity.into(), std::sync::atomic::Ordering::Relaxed);
.fetch_sub(self.capacity as u64, std::sync::atomic::Ordering::Relaxed);
}
}

Expand Down Expand Up @@ -646,13 +646,13 @@ mod tests {
)
.expect("Expect bloom creation to succeed");
let (error_count, add_operation_idx) =
add_items_till_capacity(&mut bf, initial_capacity as i64, 1, &rand_prefix);
add_items_till_capacity(&mut bf, initial_capacity, 1, &rand_prefix);
assert_eq!(
bf.add_item(b"new_item", true),
Err(BloomError::NonScalingFilterFull)
);
assert_eq!(bf.capacity(), initial_capacity as i64);
assert_eq!(bf.cardinality(), initial_capacity as i64);
assert_eq!(bf.capacity(), initial_capacity);
assert_eq!(bf.cardinality(), initial_capacity);
let expected_free_effort = 1;
assert_eq!(bf.free_effort(), expected_free_effort);
assert!(bf.memory_usage() > 0);
Expand Down Expand Up @@ -706,23 +706,24 @@ mod tests {
true,
)
.expect("Expect bloom creation to succeed");
assert_eq!(bf.capacity(), initial_capacity as i64);
assert_eq!(bf.capacity(), initial_capacity);
assert_eq!(bf.cardinality(), 0);
let mut total_error_count = 0;
let mut add_operation_idx = 0;
// Validate the scaling behavior of the bloom filter.
for filter_idx in 1..=num_filters_to_scale {
let expected_total_capacity = initial_capacity * (expansion.pow(filter_idx) - 1);
let filter_expansion: i64 = (expansion.pow(filter_idx) - 1).into();
let expected_total_capacity = initial_capacity * filter_expansion;
let (error_count, new_add_operation_idx) = add_items_till_capacity(
&mut bf,
expected_total_capacity as i64,
expected_total_capacity,
add_operation_idx + 1,
&rand_prefix,
);
add_operation_idx = new_add_operation_idx;
total_error_count += error_count;
assert_eq!(bf.capacity(), expected_total_capacity as i64);
assert_eq!(bf.cardinality(), expected_total_capacity as i64);
assert_eq!(bf.capacity(), expected_total_capacity);
assert_eq!(bf.cardinality(), expected_total_capacity);
let expected_free_effort = filter_idx as usize;
assert_eq!(bf.free_effort(), expected_free_effort);
assert!(bf.memory_usage() > 0);
Expand Down Expand Up @@ -766,11 +767,11 @@ mod tests {
fn test_seed() {
// When using the with_fixed_seed API, the sip keys generated should be equal to the constants from configs.rs
let test_bloom_filter1 =
BloomFilter::with_fixed_seed(0.5_f64, 1000_u32, &configs::FIXED_SEED);
BloomFilter::with_fixed_seed(0.5_f64, 1000_i64, &configs::FIXED_SEED);
let test_seed1 = test_bloom_filter1.seed();
assert_eq!(test_seed1, configs::FIXED_SEED);
// When using the with_random_seed API, the sip keys generated should not be equal to the constant sip_keys.
let test_bloom_filter2 = BloomFilter::with_random_seed(0.5_f64, 1000_u32);
let test_bloom_filter2 = BloomFilter::with_random_seed(0.5_f64, 1000_i64);
let test_seed2 = test_bloom_filter2.seed();
assert_ne!(test_seed2, configs::FIXED_SEED);
}
Expand All @@ -779,7 +780,7 @@ mod tests {
fn test_exceeded_size_limit() {
// Validate that bloom filter allocations within bloom objects are rejected if their memory usage would be beyond
// the configured limit.
let result = BloomFilterType::new_reserved(0.5_f64, u32::MAX, 1, true, true);
let result = BloomFilterType::new_reserved(0.5_f64, i64::MAX, 1, true, true);
assert_eq!(result.err(), Some(BloomError::ExceedsMaxBloomSize));
let capacity = 50000000;
assert!(!BloomFilter::validate_size(capacity, 0.001_f64));
Expand All @@ -790,7 +791,7 @@ mod tests {
#[rstest(expansion, case::nonscaling(0), case::scaling(2))]
fn test_bf_encode_and_decode(expansion: u32) {
let mut bf =
BloomFilterType::new_reserved(0.5_f64, 1000_u32, expansion, true, true).unwrap();
BloomFilterType::new_reserved(0.5_f64, 1000_i64, expansion, true, true).unwrap();
let item = "item1";
let _ = bf.add_item(item.as_bytes(), true);
// action
Expand All @@ -812,7 +813,7 @@ mod tests {
#[test]
fn test_bf_decode_when_unsupported_version_should_failed() {
// arrange: prepare bloom filter
let mut bf = BloomFilterType::new_reserved(0.5_f64, 1000_u32, 2, true, true).unwrap();
let mut bf = BloomFilterType::new_reserved(0.5_f64, 1000_i64, 2, true, true).unwrap();
let key = "key";
let _ = bf.add_item(key.as_bytes(), true).unwrap();

Expand All @@ -834,7 +835,7 @@ mod tests {
#[test]
fn test_bf_decode_when_bytes_is_empty_should_failed() {
// arrange: prepare bloom filter
let mut bf = BloomFilterType::new_reserved(0.5_f64, 1000_u32, 2, true, true).unwrap();
let mut bf = BloomFilterType::new_reserved(0.5_f64, 1000_i64, 2, true, true).unwrap();
let key = "key";
let _ = bf.add_item(key.as_bytes(), true);

Expand All @@ -854,23 +855,12 @@ mod tests {
#[test]
fn test_bf_decode_when_bytes_is_exceed_limit_should_failed() {
// arrange: prepare bloom filter
let mut bf = BloomFilterType::new_reserved(0.5_f64, 1000_u32, 2, true, true).unwrap();
let mut bf = BloomFilterType::new_reserved(0.5_f64, 1000_i64, 2, true, true).unwrap();
let key = "key";
let _ = bf.add_item(key.as_bytes(), true);
let origin_expansion = bf.expansion;
let origin_fp_rate = bf.fp_rate;
// 1. Exceeded the maximum expansion
bf.expansion = BLOOM_EXPANSION_MAX + 1;

let vec = bf.encode_bloom_filter().unwrap();
assert_eq!(
BloomFilterType::decode_bloom_filter(&vec, true).err(),
Some(BloomError::BadExpansion)
);
// recover
bf.expansion = origin_expansion;

// 2. unsupport fp_rate
// unsupported fp_rate
bf.fp_rate = -0.5;
let vec = bf.encode_bloom_filter().unwrap();
// should return error
Expand All @@ -880,7 +870,7 @@ mod tests {
);
bf.fp_rate = origin_fp_rate;

// 3. build a larger than 64mb filter
// build a larger than 64mb filter
let extra_large_filter =
BloomFilterType::new_reserved(0.01_f64, 57000000, 2, true, false).unwrap();
let vec = extra_large_filter.encode_bloom_filter().unwrap();
Expand Down
10 changes: 5 additions & 5 deletions src/configs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,15 @@ use std::sync::atomic::AtomicBool;
use std::sync::atomic::AtomicI64;

/// Configurations
pub const BLOOM_CAPACITY_DEFAULT: i64 = 100000;
pub const BLOOM_CAPACITY_MIN: u32 = 1;
pub const BLOOM_CAPACITY_MAX: u32 = u32::MAX;
pub const BLOOM_CAPACITY_DEFAULT: i64 = 100;
pub const BLOOM_CAPACITY_MIN: i64 = 1;
pub const BLOOM_CAPACITY_MAX: i64 = i64::MAX;

pub const BLOOM_EXPANSION_DEFAULT: i64 = 2;
pub const BLOOM_EXPANSION_MIN: u32 = 1;
pub const BLOOM_EXPANSION_MAX: u32 = 10;
pub const BLOOM_EXPANSION_MAX: u32 = u32::MAX;

pub const BLOOM_FP_RATE_DEFAULT: f64 = 0.001;
pub const BLOOM_FP_RATE_DEFAULT: f64 = 0.01;
pub const BLOOM_FP_RATE_MIN: f64 = 0.0;
pub const BLOOM_FP_RATE_MAX: f64 = 1.0;

Expand Down
2 changes: 1 addition & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ valkey_module! {
],
configurations: [
i64: [
["bloom-capacity", &*configs::BLOOM_CAPACITY, configs::BLOOM_CAPACITY_DEFAULT, configs::BLOOM_CAPACITY_MIN as i64, configs::BLOOM_CAPACITY_MAX as i64, ConfigurationFlags::DEFAULT, None],
["bloom-capacity", &*configs::BLOOM_CAPACITY, configs::BLOOM_CAPACITY_DEFAULT, configs::BLOOM_CAPACITY_MIN, configs::BLOOM_CAPACITY_MAX, ConfigurationFlags::DEFAULT, None],
["bloom-expansion", &*configs::BLOOM_EXPANSION, configs::BLOOM_EXPANSION_DEFAULT, configs::BLOOM_EXPANSION_MIN as i64, configs::BLOOM_EXPANSION_MAX as i64, ConfigurationFlags::DEFAULT, None],
["bloom-memory-limit-per-filter", &*configs::BLOOM_MEMORY_LIMIT_PER_FILTER, configs::BLOOM_MEMORY_LIMIT_PER_FILTER_DEFAULT, configs::BLOOM_MEMORY_LIMIT_PER_FILTER_MIN, configs::BLOOM_MEMORY_LIMIT_PER_FILTER_MAX, ConfigurationFlags::DEFAULT, None],
],
Expand Down
5 changes: 3 additions & 2 deletions tests/test_basic.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,8 @@ def test_memory_usage_cmd(self):

def test_too_large_bloom_obj(self):
client = self.server.get_new_client()
# Set the max allowed size per bloom filter per bloom object to be 1000 bytes.
assert client.execute_command('CONFIG SET bf.bloom-memory-limit-per-filter 1000') == b'OK'
# Set the max allowed size per bloom filter per bloom object to be 300 bytes.
assert client.execute_command('CONFIG SET bf.bloom-memory-limit-per-filter 300') == b'OK'
obj_exceeds_size_err = "operation results in filter allocation exceeding size limit"
# Non Scaling
# Validate that when a cmd would have resulted in a bloom object creation with the starting filter with size
Expand All @@ -79,6 +79,7 @@ def test_too_large_bloom_obj(self):
# Scaling
# Validate that when scaling would have resulted in a filter with size greater than allowed limit, the cmd
# is rejected.
assert client.execute_command('CONFIG SET bf.bloom-memory-limit-per-filter 1000') == b'OK'
cmds = [
'BF.INSERT filter items new_item1',
'BF.ADD filter new_item1',
Expand Down
Loading

0 comments on commit 5789496

Please sign in to comment.