Skip to content

Commit

Permalink
Adjust callback and tests to accomodate tightening_ratio changes
Browse files Browse the repository at this point in the history
Signed-off-by: Nihal Mehta <[email protected]>
  • Loading branch information
nnmehta committed Dec 4, 2024
1 parent 2466157 commit 1fb640c
Show file tree
Hide file tree
Showing 5 changed files with 48 additions and 17 deletions.
11 changes: 9 additions & 2 deletions src/bloom/command_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,10 +110,12 @@ pub fn bloom_filter_add_value(
None => {
// Instantiate empty bloom filter.
let fp_rate = configs::BLOOM_FP_RATE_DEFAULT;
let tightening_ratio = configs::TIGHTENING_RATIO;
let capacity = configs::BLOOM_CAPACITY.load(Ordering::Relaxed) as u32;
let expansion = configs::BLOOM_EXPANSION.load(Ordering::Relaxed) as u32;
let mut bloom = match BloomFilterType::new_reserved(
fp_rate,
tightening_ratio,
capacity,
expansion,
validate_size_limit,
Expand Down Expand Up @@ -227,6 +229,7 @@ pub fn bloom_filter_reserve(ctx: &Context, input_args: &[ValkeyString]) -> Valke
return Err(ValkeyError::Str(utils::BAD_ERROR_RATE));
}
};
curr_cmd_idx += 1;
// Parse the capacity
let capacity = match input_args[curr_cmd_idx].to_string_lossy().parse::<u32>() {
Ok(num) if (BLOOM_CAPACITY_MIN..=BLOOM_CAPACITY_MAX).contains(&num) => num,
Expand Down Expand Up @@ -275,8 +278,10 @@ pub fn bloom_filter_reserve(ctx: &Context, input_args: &[ValkeyString]) -> Valke
None => {
// Skip bloom filter size validation on replicated cmds.
let validate_size_limit = !ctx.get_flags().contains(ContextFlags::REPLICATED);
let tightening_ratio = configs::TIGHTENING_RATIO;
let bloom = match BloomFilterType::new_reserved(
fp_rate,
tightening_ratio,
capacity,
expansion,
validate_size_limit,
Expand All @@ -297,8 +302,9 @@ pub fn bloom_filter_reserve(ctx: &Context, input_args: &[ValkeyString]) -> Valke

pub fn bloom_filter_insert(ctx: &Context, input_args: &[ValkeyString]) -> ValkeyResult {
let argc = input_args.len();
let validate_size_limit = ctx.get_flags().contains(ContextFlags::REPLICATED);
// At the very least, we need: BF.INSERT <key> ITEMS <item>
if argc < 5 {
if !validate_size_limit && argc < 4 || validate_size_limit && argc < 5 {
return Err(ValkeyError::WrongArity);
}
let mut idx = 1;
Expand Down Expand Up @@ -328,7 +334,7 @@ pub fn bloom_filter_insert(ctx: &Context, input_args: &[ValkeyString]) -> Valkey
};
}
"TIGHTENING" => {
if !ctx.get_flags().contains(ContextFlags::REPLICATED) {
if !validate_size_limit {
return Err(ValkeyError::Str(utils::ERROR));
}
idx += 1;
Expand Down Expand Up @@ -420,6 +426,7 @@ pub fn bloom_filter_insert(ctx: &Context, input_args: &[ValkeyString]) -> Valkey
}
let mut bloom = match BloomFilterType::new_reserved(
fp_rate,
tightening_ratio,
capacity,
expansion,
validate_size_limit,
Expand Down
3 changes: 3 additions & 0 deletions src/bloom/data_type.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,9 @@ impl ValkeyDataType for BloomFilterType {
let Ok(fp_rate) = raw::load_double(rdb) else {
return None;
};
let Ok(tightening_ratio) = raw::load_double(rdb) else {
return None;
};
let mut filters: Vec<BloomFilter> = Vec::with_capacity(num_filters as usize);

let Ok(tightening_ratio) = raw::load_double(rdb) else {
Expand Down
47 changes: 34 additions & 13 deletions src/bloom/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ impl BloomFilterType {
/// Create a new BloomFilterType object.
pub fn new_reserved(
fp_rate: f64,
tightening_ratio: f64,
capacity: u32,
expansion: u32,
validate_size_limit: bool,
Expand Down Expand Up @@ -535,6 +536,14 @@ mod tests {
fp_margin: f64,
rand_prefix: &String,
) {
assert_eq!(
restored_bloom_filter_type.fp_rate,
original_bloom_filter_type.fp_rate
);
assert_eq!(
restored_bloom_filter_type.tightening_ratio,
original_bloom_filter_type.tightening_ratio
);
assert_eq!(
restored_bloom_filter_type.capacity(),
original_bloom_filter_type.capacity()
Expand Down Expand Up @@ -599,13 +608,19 @@ mod tests {
let rand_prefix = random_prefix(7);
// 1 in every 1000 operations is expected to be a false positive.
let expected_fp_rate: f64 = 0.001;
let expected_tightening_ratio: f64 = 0.5;
let initial_capacity = 10000;
// Expansion of 0 indicates non scaling.
let expansion = 0;
// Validate the non scaling behavior of the bloom filter.
let mut bf =
BloomFilterType::new_reserved(expected_fp_rate, initial_capacity, expansion, true)
.expect("Expect bloom creation to succeed");
let mut bf = BloomFilterType::new_reserved(
expected_fp_rate,
expected_tightening_ratio,
initial_capacity,
expansion,
true,
)
.expect("Expect bloom creation to succeed");
let (error_count, add_operation_idx) =
add_items_till_capacity(&mut bf, initial_capacity as i64, 1, &rand_prefix);
assert_eq!(
Expand Down Expand Up @@ -657,12 +672,18 @@ mod tests {
let rand_prefix = random_prefix(7);
// 1 in every 1000 operations is expected to be a false positive.
let expected_fp_rate: f64 = 0.001;
let expected_tightening_ratio: f64 = 0.5;
let initial_capacity = 10000;
let expansion = 2;
let num_filters_to_scale = 5;
let mut bf =
BloomFilterType::new_reserved(expected_fp_rate, initial_capacity, expansion, true)
.expect("Expect bloom creation to succeed");
let mut bf = BloomFilterType::new_reserved(
expected_fp_rate,
expected_tightening_ratio,
initial_capacity,
expansion,
true,
)
.expect("Expect bloom creation to succeed");
assert_eq!(bf.capacity(), initial_capacity as i64);
assert_eq!(bf.cardinality(), 0);
let mut total_error_count = 0;
Expand Down Expand Up @@ -732,18 +753,18 @@ mod tests {
fn test_exceeded_size_limit() {
// Validate that bloom filter allocations within bloom objects are rejected if their memory usage would be beyond
// the configured limit.
let result = BloomFilterType::new_reserved(0.5_f64, u32::MAX, 1, true);
let result = BloomFilterType::new_reserved(0.5_f64, 0.5_f64, u32::MAX, 1, true);
assert_eq!(result.err(), Some(BloomError::ExceedsMaxBloomSize));
let capacity = 50000000;
assert!(!BloomFilter::validate_size(capacity, 0.001_f64));
let result2 = BloomFilterType::new_reserved(0.001_f64, capacity, 1, true);
let result2 = BloomFilterType::new_reserved(0.001_f64, 0.5_f64, capacity, 1, true);
assert_eq!(result2.err(), Some(BloomError::ExceedsMaxBloomSize));
}

#[test]
fn test_bf_encode_and_decode() {
// arrange: prepare bloom filter
let mut bf = BloomFilterType::new_reserved(0.5_f64, 1000_u32, 2, true).unwrap();
let mut bf = BloomFilterType::new_reserved(0.5_f64, 0.5_f64, 1000_u32, 2, true).unwrap();
let key = "key";
let _ = bf.add_item(key.as_bytes(), true);

Expand Down Expand Up @@ -773,7 +794,7 @@ mod tests {
#[test]
fn test_bf_decode_when_unsupported_version_should_failed() {
// arrange: prepare bloom filter
let mut bf = BloomFilterType::new_reserved(0.5_f64, 1000_u32, 2, true).unwrap();
let mut bf = BloomFilterType::new_reserved(0.5_f64, 0.5_f64, 1000_u32, 2, true).unwrap();
let key = "key";
let _ = bf.add_item(key.as_bytes(), true).unwrap();

Expand All @@ -795,7 +816,7 @@ mod tests {
#[test]
fn test_bf_decode_when_bytes_is_empty_should_failed() {
// arrange: prepare bloom filter
let mut bf = BloomFilterType::new_reserved(0.5_f64, 1000_u32, 2, true).unwrap();
let mut bf = BloomFilterType::new_reserved(0.5_f64, 0.5_f64, 1000_u32, 2, true).unwrap();
let key = "key";
let _ = bf.add_item(key.as_bytes(), true);

Expand All @@ -815,7 +836,7 @@ mod tests {
#[test]
fn test_bf_decode_when_bytes_is_exceed_limit_should_failed() {
// arrange: prepare bloom filter
let mut bf = BloomFilterType::new_reserved(0.5_f64, 1000_u32, 2, true).unwrap();
let mut bf = BloomFilterType::new_reserved(0.5_f64, 0.5_f64, 1000_u32, 2, true).unwrap();
let key = "key";
let _ = bf.add_item(key.as_bytes(), true);
let origin_expansion = bf.expansion;
Expand Down Expand Up @@ -857,7 +878,7 @@ mod tests {

// 3. build a larger than 64mb filter
let extra_large_filter =
BloomFilterType::new_reserved(0.01_f64, 57000000, 2, false).unwrap();
BloomFilterType::new_reserved(0.01_f64, 0.5_f64, 57000000, 2, false).unwrap();
let vec = extra_large_filter.encode_bloom_filter().unwrap();
// should return error
assert_eq!(
Expand Down
2 changes: 1 addition & 1 deletion tests/test_bloom_metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from valkeytests.conftest import resource_port_tracker
from util.waiters import *

DEFAULT_BLOOM_FILTER_SIZE = 179952
DEFAULT_BLOOM_FILTER_SIZE = 179960
DEFAULT_BLOOM_FILTER_CAPACITY = 100000
class TestBloomMetrics(ValkeyBloomTestCaseBase):

Expand Down
2 changes: 1 addition & 1 deletion tests/test_replication.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ def test_replication_behavior(self):
('BF.ADD', 'BF.ADD key item', 'BF.ADD key item1', 2),
('BF.MADD', 'BF.MADD key item', 'BF.MADD key item1', 2),
('BF.RESERVE', 'BF.RESERVE key 0.001 100000', 'BF.ADD key item1', 1),
('BF.INSERT', 'BF.INSERT key items item', 'BF.INSERT key items item1', 2),
('BF.INSERT', 'BF.INSERT key items item tightening 0.5', 'BF.INSERT key items item1 tightening 0.5', 2),
]
for test_case in bloom_write_cmds:
prefix = test_case[0]
Expand Down

0 comments on commit 1fb640c

Please sign in to comment.