Update functionality
Signed-off-by: Nihal Mehta <[email protected]>
nnmehta committed Dec 3, 2024
1 parent c6a256c commit 99acf6f
Showing 5 changed files with 22 additions and 45 deletions.
22 changes: 5 additions & 17 deletions src/bloom/command_handler.rs
@@ -110,12 +110,10 @@ pub fn bloom_filter_add_value(
None => {
// Instantiate empty bloom filter.
let fp_rate = configs::BLOOM_FP_RATE_DEFAULT;
- let tightening_ratio = configs::TIGHTENING_RATIO;
let capacity = configs::BLOOM_CAPACITY.load(Ordering::Relaxed) as u32;
let expansion = configs::BLOOM_EXPANSION.load(Ordering::Relaxed) as u32;
let mut bloom = match BloomFilterType::new_reserved(
fp_rate,
- tightening_ratio,
capacity,
expansion,
validate_size_limit,
@@ -229,17 +227,6 @@ pub fn bloom_filter_reserve(ctx: &Context, input_args: &[ValkeyString]) -> ValkeyResult
return Err(ValkeyError::Str(utils::BAD_ERROR_RATE));
}
};
- curr_cmd_idx += 1;
- let tightening_ratio = match input_args[curr_cmd_idx].to_string_lossy().parse::<f64>() {
- Ok(num) if num > TIGHTENING_RATIO_MIN && num < TIGHTENING_RATIO_MAX => num,
- Ok(num) if !(num > TIGHTENING_RATIO_MIN && num < TIGHTENING_RATIO_MAX) => {
- return Err(ValkeyError::Str(utils::ERROR_RATIO_RANGE));
- }
- _ => {
- return Err(ValkeyError::Str(utils::BAD_ERROR_RATIO));
- }
- };
- curr_cmd_idx += 1;
// Parse the capacity
let capacity = match input_args[curr_cmd_idx].to_string_lossy().parse::<u32>() {
Ok(num) if (BLOOM_CAPACITY_MIN..=BLOOM_CAPACITY_MAX).contains(&num) => num,
@@ -290,7 +277,6 @@ pub fn bloom_filter_reserve(ctx: &Context, input_args: &[ValkeyString]) -> ValkeyResult
let validate_size_limit = !ctx.get_flags().contains(ContextFlags::REPLICATED);
let bloom = match BloomFilterType::new_reserved(
fp_rate,
- tightening_ratio,
capacity,
expansion,
validate_size_limit,
@@ -312,7 +298,7 @@ pub fn bloom_filter_reserve(ctx: &Context, input_args: &[ValkeyString]) -> ValkeyResult
pub fn bloom_filter_insert(ctx: &Context, input_args: &[ValkeyString]) -> ValkeyResult {
let argc = input_args.len();
// At the very least, we need: BF.INSERT <key> ITEMS <item>
- if argc < 4 {
+ if argc < 5 {
return Err(ValkeyError::WrongArity);
}
let mut idx = 1;
@@ -341,7 +327,10 @@ pub fn bloom_filter_insert(ctx: &Context, input_args: &[ValkeyString]) -> ValkeyResult
}
};
}
"RATIO" => {
"TIGHTENING" => {
if !ctx.get_flags().contains(ContextFlags::REPLICATED) {
return Err(ValkeyError::Str(utils::ERROR));
}
idx += 1;
tightening_ratio = match input_args[idx].to_string_lossy().parse::<f64>() {
Ok(num) if num > TIGHTENING_RATIO_MIN && num < TIGHTENING_RATIO_MAX => num,
@@ -431,7 +420,6 @@ pub fn bloom_filter_insert(ctx: &Context, input_args: &[ValkeyString]) -> ValkeyResult
}
let mut bloom = match BloomFilterType::new_reserved(
fp_rate,
- tightening_ratio,
capacity,
expansion,
validate_size_limit,
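
The BF.INSERT hunks above restrict the TIGHTENING option to commands arriving over the replication link (the ContextFlags::REPLICATED check); regular clients can no longer set the ratio directly. A minimal sketch of that guard in isolation, assuming a simplified parser, hypothetical error strings (the real handler uses the utils::* constants shown in the diff), and the exclusive MIN/MAX bounds from src/configs.rs:

    // Illustrative sketch only, not the module's actual handler.
    // TIGHTENING is honoured only when the command arrives via replication;
    // ordinary clients get an error, mirroring the REPLICATED check above.
    const TIGHTENING_RATIO_MIN: f64 = 0.0;
    const TIGHTENING_RATIO_MAX: f64 = 1.0;

    fn parse_tightening_arg(arg: &str, is_replicated: bool) -> Result<f64, &'static str> {
        if !is_replicated {
            return Err("ERR unsupported option"); // hypothetical message
        }
        let num: f64 = arg.parse().map_err(|_| "ERR bad error ratio")?;
        // Exclusive bounds, matching `num > TIGHTENING_RATIO_MIN && num < TIGHTENING_RATIO_MAX`.
        if num > TIGHTENING_RATIO_MIN && num < TIGHTENING_RATIO_MAX {
            Ok(num)
        } else {
            Err("ERR error ratio out of range") // hypothetical message
        }
    }

    fn main() {
        assert!(parse_tightening_arg("0.5", true).is_ok());
        assert!(parse_tightening_arg("0.5", false).is_err());
        assert!(parse_tightening_arg("1.5", true).is_err());
    }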
2 changes: 1 addition & 1 deletion src/bloom/data_type.rs
@@ -137,7 +137,7 @@ impl ValkeyDataType for BloomFilterType {
fn debug_digest(&self, mut dig: Digest) {
dig.add_long_long(self.expansion.into());
dig.add_string_buffer(&self.fp_rate.to_le_bytes());
- // dig.add_string_buffer(&self.tightening_ratio.to_le_bytes());
+ dig.add_string_buffer(&self.tightening_ratio.to_le_bytes());
for filter in &self.filters {
dig.add_string_buffer(&filter.bloom.bitmap());
for &(key1, key2) in &filter.sip_keys() {
40 changes: 14 additions & 26 deletions src/bloom/utils.rs
@@ -80,7 +80,6 @@ impl BloomFilterType {
/// Create a new BloomFilterType object.
pub fn new_reserved(
fp_rate: f64,
- tightening_ratio: f64,
capacity: u32,
expansion: u32,
validate_size_limit: bool,
@@ -99,6 +98,7 @@ impl BloomFilterType {
// Create the bloom filter and add to the main BloomFilter object.
let bloom = BloomFilter::new(fp_rate, capacity);
let filters = vec![bloom];
+ let tightening_ratio = 0.5;
let bloom = BloomFilterType {
expansion,
fp_rate,
@@ -628,19 +628,13 @@ mod tests {
let rand_prefix = random_prefix(7);
// 1 in every 1000 operations is expected to be a false positive.
let expected_fp_rate: f64 = 0.001;
- let expected_tightening_ratio: f64 = 0.5;
let initial_capacity = 10000;
// Expansion of 0 indicates non scaling.
let expansion = 0;
// Validate the non scaling behavior of the bloom filter.
- let mut bf = BloomFilterType::new_reserved(
- expected_fp_rate,
- expected_tightening_ratio,
- initial_capacity,
- expansion,
- true,
- )
- .expect("Expect bloom creation to succeed");
+ let mut bf =
+ BloomFilterType::new_reserved(expected_fp_rate, initial_capacity, expansion, true)
+ .expect("Expect bloom creation to succeed");
let (error_count, add_operation_idx) =
add_items_till_capacity(&mut bf, initial_capacity as i64, 1, &rand_prefix);
assert_eq!(
@@ -692,18 +686,12 @@ mod tests {
let rand_prefix = random_prefix(7);
// 1 in every 1000 operations is expected to be a false positive.
let expected_fp_rate: f64 = 0.001;
- let expected_tightening_ratio: f64 = 0.5;
let initial_capacity = 10000;
let expansion = 2;
let num_filters_to_scale = 5;
- let mut bf = BloomFilterType::new_reserved(
- expected_fp_rate,
- expected_tightening_ratio,
- initial_capacity,
- expansion,
- true,
- )
- .expect("Expect bloom creation to succeed");
+ let mut bf =
+ BloomFilterType::new_reserved(expected_fp_rate, initial_capacity, expansion, true)
+ .expect("Expect bloom creation to succeed");
assert_eq!(bf.capacity(), initial_capacity as i64);
assert_eq!(bf.cardinality(), 0);
let mut total_error_count = 0;
@@ -776,18 +764,18 @@ mod tests {
fn test_exceeded_size_limit() {
// Validate that bloom filter allocations within bloom objects are rejected if their memory usage would be beyond
// the configured limit.
- let result = BloomFilterType::new_reserved(0.5_f64, 0.5_f64, u32::MAX, 1, true);
+ let result = BloomFilterType::new_reserved(0.5_f64, u32::MAX, 1, true);
assert_eq!(result.err(), Some(BloomError::ExceedsMaxBloomSize));
let capacity = 50000000;
assert!(!BloomFilter::validate_size(capacity, 0.001_f64));
- let result2 = BloomFilterType::new_reserved(0.001_f64, 0.5_f64, capacity, 1, true);
+ let result2 = BloomFilterType::new_reserved(0.001_f64, capacity, 1, true);
assert_eq!(result2.err(), Some(BloomError::ExceedsMaxBloomSize));
}

#[test]
fn test_bf_encode_and_decode() {
// arrange: prepare bloom filter
- let mut bf = BloomFilterType::new_reserved(0.5_f64, 0.5_f64, 1000_u32, 2, true).unwrap();
+ let mut bf = BloomFilterType::new_reserved(0.5_f64, 1000_u32, 2, true).unwrap();
let key = "key";
let _ = bf.add_item(key.as_bytes(), true);

@@ -817,7 +805,7 @@ mod tests {
#[test]
fn test_bf_decode_when_unsupported_version_should_failed() {
// arrange: prepare bloom filter
- let mut bf = BloomFilterType::new_reserved(0.5_f64, 0.5_f64, 1000_u32, 2, true).unwrap();
+ let mut bf = BloomFilterType::new_reserved(0.5_f64, 1000_u32, 2, true).unwrap();
let key = "key";
let _ = bf.add_item(key.as_bytes(), true).unwrap();

@@ -839,7 +827,7 @@ mod tests {
#[test]
fn test_bf_decode_when_bytes_is_empty_should_failed() {
// arrange: prepare bloom filter
- let mut bf = BloomFilterType::new_reserved(0.5_f64, 0.5_f64, 1000_u32, 2, true).unwrap();
+ let mut bf = BloomFilterType::new_reserved(0.5_f64, 1000_u32, 2, true).unwrap();
let key = "key";
let _ = bf.add_item(key.as_bytes(), true);

@@ -859,7 +847,7 @@ mod tests {
#[test]
fn test_bf_decode_when_bytes_is_exceed_limit_should_failed() {
// arrange: prepare bloom filter
- let mut bf = BloomFilterType::new_reserved(0.5_f64, 0.5_f64, 1000_u32, 2, true).unwrap();
+ let mut bf = BloomFilterType::new_reserved(0.5_f64, 1000_u32, 2, true).unwrap();
let key = "key";
let _ = bf.add_item(key.as_bytes(), true);
let origin_expansion = bf.expansion;
@@ -901,7 +889,7 @@ mod tests {

// 3. build a larger than 64mb filter
let extra_large_filter =
- BloomFilterType::new_reserved(0.01_f64, 0.5_f64, 57000000, 2, false).unwrap();
+ BloomFilterType::new_reserved(0.01_f64, 57000000, 2, false).unwrap();
let vec = extra_large_filter.encode_bloom_filter().unwrap();
// should return error
assert_eq!(
2 changes: 1 addition & 1 deletion src/configs.rs
@@ -32,7 +32,7 @@ lazy_static! {
// Tightening ratio used during scale out for the calculation of fp_rate of every new filter within a bloom object to
// maintain the bloom object's overall fp_rate to the configured value.
pub const TIGHTENING_RATIO: f64 = 0.5;
- pub const TIGHTENING_RATIO_MIN: f64 = 0.001;
+ pub const TIGHTENING_RATIO_MIN: f64 = 0.0;
pub const TIGHTENING_RATIO_MAX: f64 = 1.0;
// Max number of filters allowed within a bloom object.
pub const MAX_FILTERS_PER_OBJ: i32 = i32::MAX;
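
As a sketch of the relationship the comment above describes: in the usual scalable-Bloom-filter scheme, each additional sub-filter is reserved with a tighter error rate (for example base_fp_rate * tightening_ratio^i for the i-th sub-filter) so the object's aggregate false-positive rate stays near the configured value. The exact formula the module uses is not part of this diff; the snippet below only illustrates the idea with the 0.5 default.

    // Illustrative only: per-sub-filter false-positive rate under a tightening ratio.
    // Assumes the i-th sub-filter (0-based) is reserved with base_fp_rate * ratio^i;
    // the module's actual calculation is not shown in this commit.
    fn sub_filter_fp_rate(base_fp_rate: f64, tightening_ratio: f64, filter_index: u32) -> f64 {
        base_fp_rate * tightening_ratio.powi(filter_index as i32)
    }

    fn main() {
        let base = 0.001; // configured fp_rate
        let ratio = 0.5; // TIGHTENING_RATIO default above
        for i in 0..4 {
            // prints 0.001, 0.0005, 0.00025, 0.000125
            println!("filter {}: fp_rate = {}", i, sub_filter_fp_rate(base, ratio, i));
        }
    }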
1 change: 1 addition & 0 deletions src/wrapper/bloom_callback.rs
@@ -21,6 +21,7 @@ pub unsafe extern "C" fn bloom_rdb_save(rdb: *mut raw::RedisModuleIO, value: *mu
raw::save_unsigned(rdb, v.filters.len() as u64);
raw::save_unsigned(rdb, v.expansion as u64);
raw::save_double(rdb, v.fp_rate);
+ raw::save_double(rdb, v.tightening_ratio);
let filter_list = &v.filters;
let mut filter_list_iter = filter_list.iter().peekable();
while let Some(filter) = filter_list_iter.next() {
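
Because bloom_rdb_save now writes tightening_ratio immediately after fp_rate, the matching RDB load path has to read the fields back in exactly that order. The loader is not part of this commit; the sketch below only illustrates the expected read order using a stand-in reader type (MockRdb, BloomHeader, and load_bloom_header are hypothetical names, not valkey-module APIs).

    use std::collections::VecDeque;

    // Stand-in for the module's RDB IO handle; NOT a real valkey-module type.
    struct MockRdb {
        unsigneds: VecDeque<u64>,
        doubles: VecDeque<f64>,
    }

    impl MockRdb {
        fn load_unsigned(&mut self) -> u64 {
            self.unsigneds.pop_front().unwrap()
        }
        fn load_double(&mut self) -> f64 {
            self.doubles.pop_front().unwrap()
        }
    }

    // Header fields in the same order bloom_rdb_save writes them above:
    // filter count, expansion, fp_rate, then the newly added tightening_ratio.
    struct BloomHeader {
        num_filters: u64,
        expansion: u64,
        fp_rate: f64,
        tightening_ratio: f64,
    }

    fn load_bloom_header(rdb: &mut MockRdb) -> BloomHeader {
        // Struct-literal fields are evaluated top to bottom, so the reads
        // happen in exactly the save order.
        BloomHeader {
            num_filters: rdb.load_unsigned(),
            expansion: rdb.load_unsigned(),
            fp_rate: rdb.load_double(),
            tightening_ratio: rdb.load_double(), // new field from this commit
        }
    }

    fn main() {
        let mut rdb = MockRdb {
            unsigneds: VecDeque::from([1, 2]),
            doubles: VecDeque::from([0.001, 0.5]),
        };
        let h = load_bloom_header(&mut rdb);
        println!(
            "filters={} expansion={} fp_rate={} tightening_ratio={}",
            h.num_filters, h.expansion, h.fp_rate, h.tightening_ratio
        );
    }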