Skip to content

Commit

Permalink
Updating how we create BloomFilter from rdb loads and upgrading bloom…
Browse files Browse the repository at this point in the history
…filter dependency to version 3.0 (valkey-io#23)

* Updating how we create BloomFilter from rdb loads. BloomFilter vec now has capacity of filter we are loading from

Signed-off-by: zackcam <[email protected]>

* Updating bloomfilter dependency to version 3, fixing breaking changes as well

Signed-off-by: zackcam <[email protected]>

* Updating the digest changes to follow the updated version of bloom, as well as removing unnecessary fields saved in rdb

Signed-off-by: zackcam <[email protected]>

* Update log in src/bloom/data_type.rs

Signed-off-by: KarthikSubbarao <[email protected]>

* Update comment in src/bloom/utils.rs

Signed-off-by: KarthikSubbarao <[email protected]>

* Clippy error in src/bloom/data_type.rs

Signed-off-by: KarthikSubbarao <[email protected]>

---------

Signed-off-by: zackcam <[email protected]>
Signed-off-by: KarthikSubbarao <[email protected]>
Co-authored-by: KarthikSubbarao <[email protected]>
  • Loading branch information
2 people authored and nnmehta committed Dec 4, 2024
1 parent 078534f commit 0218610
Show file tree
Hide file tree
Showing 5 changed files with 49 additions and 100 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ homepage = "https://github.com/valkey-io/valkey-bloom"
valkey-module = "0.1.2"
valkey-module-macros = "0"
linkme = "0"
bloomfilter = { version = "1.0.13", features = ["serde"] }
bloomfilter = { version = "3.0.1", features = ["serde"] }
lazy_static = "1.4.0"
libc = "0.2"
serde = { version = "1.0", features = ["derive"] }
Expand Down
49 changes: 16 additions & 33 deletions src/bloom/data_type.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,5 @@
use crate::bloom::utils::BloomFilter;
use crate::bloom::utils::BloomFilterType;
use crate::configs::{
FIXED_SIP_KEY_ONE_A, FIXED_SIP_KEY_ONE_B, FIXED_SIP_KEY_TWO_A, FIXED_SIP_KEY_TWO_B,
};
use crate::metrics::BLOOM_NUM_OBJECTS;
use crate::metrics::BLOOM_OBJECT_TOTAL_MEMORY_BYTES;
use crate::wrapper::bloom_callback;
Expand Down Expand Up @@ -61,7 +58,6 @@ pub trait ValkeyDataType {
impl ValkeyDataType for BloomFilterType {
/// Callback to load and parse RDB data of a bloom item and create it.
fn load_from_rdb(rdb: *mut raw::RedisModuleIO, encver: i32) -> Option<BloomFilterType> {
let mut filters = Vec::new();
if encver > BLOOM_FILTER_TYPE_ENCODING_VERSION {
logging::log_warning(format!("{}: Cannot load bloomfltr data type of version {} because it is higher than the loaded module's bloomfltr supported version {}", MODULE_NAME, encver, BLOOM_FILTER_TYPE_ENCODING_VERSION).as_str());
return None;
Expand All @@ -75,27 +71,28 @@ impl ValkeyDataType for BloomFilterType {
let Ok(fp_rate) = raw::load_double(rdb) else {
return None;
};
let Ok(tightening_ratio) = raw::load_double(rdb) else {
return None;
};
let mut filters: Vec<BloomFilter> = Vec::with_capacity(num_filters as usize);

for i in 0..num_filters {
let Ok(bitmap) = raw::load_string_buffer(rdb) else {
return None;
};
let Ok(number_of_bits) = raw::load_unsigned(rdb) else {
let Ok(capacity) = raw::load_unsigned(rdb) else {
return None;
};
// Reject RDB Load if any bloom filter within a bloom object is of a size greater than what is allowed.
if !BloomFilter::validate_size_with_bits(number_of_bits) {
let new_fp_rate = match Self::calculate_fp_rate(fp_rate, num_filters as i32) {
Ok(rate) => rate,
Err(_) => {
logging::log_warning(
"Failed to restore bloom object: Reached max number of filters",
);
return None;
}
};
if !BloomFilter::validate_size(capacity as u32, new_fp_rate) {
logging::log_warning("Failed to restore bloom object because it contains a filter larger than the max allowed size limit.");
return None;
}
let Ok(number_of_hash_functions) = raw::load_unsigned(rdb) else {
return None;
};
let Ok(capacity) = raw::load_unsigned(rdb) else {
return None;
};
// Only load num_items when it's the last filter
let num_items = if i == num_filters - 1 {
match raw::load_unsigned(rdb) {
Expand All @@ -105,18 +102,8 @@ impl ValkeyDataType for BloomFilterType {
} else {
capacity
};
let sip_keys = [
(FIXED_SIP_KEY_ONE_A, FIXED_SIP_KEY_ONE_B),
(FIXED_SIP_KEY_TWO_A, FIXED_SIP_KEY_TWO_B),
];
let filter = BloomFilter::from_existing(
bitmap.as_ref(),
number_of_bits,
number_of_hash_functions as u32,
sip_keys,
num_items as u32,
capacity as u32,
);
let filter =
BloomFilter::from_existing(bitmap.as_ref(), num_items as u32, capacity as u32);
filters.push(filter);
}
BLOOM_OBJECT_TOTAL_MEMORY_BYTES.fetch_add(
Expand All @@ -139,11 +126,7 @@ impl ValkeyDataType for BloomFilterType {
dig.add_string_buffer(&self.fp_rate.to_le_bytes());
dig.add_string_buffer(&self.tightening_ratio.to_le_bytes());
for filter in &self.filters {
dig.add_string_buffer(&filter.bloom.bitmap());
for &(key1, key2) in &filter.sip_keys() {
dig.add_long_long(key1 as i64);
dig.add_long_long(key2 as i64);
}
dig.add_string_buffer(filter.bloom.as_slice());
dig.add_long_long(filter.num_items.into());
dig.add_long_long(filter.capacity.into());
}
Expand Down
92 changes: 30 additions & 62 deletions src/bloom/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,11 @@ use crate::{
metrics,
};
use bloomfilter;
use bloomfilter::{deserialize, serialize};
use serde::{Deserialize, Serialize};
use std::{mem, sync::atomic::Ordering};

use super::data_type::BLOOM_TYPE_VERSION;
use std::{mem, sync::atomic::Ordering};

/// KeySpace Notification Events
pub const ADD_EVENT: &str = "bloom.add";
Expand Down Expand Up @@ -110,7 +111,7 @@ impl BloomFilterType {

/// Create a new BloomFilterType object from an existing one.
pub fn create_copy_from(from_bf: &BloomFilterType) -> BloomFilterType {
let mut filters = Vec::new();
let mut filters: Vec<BloomFilter> = Vec::with_capacity(from_bf.filters.len());
metrics::BLOOM_NUM_OBJECTS.fetch_add(1, Ordering::Relaxed);
metrics::BLOOM_OBJECT_TOTAL_MEMORY_BYTES.fetch_add(
mem::size_of::<BloomFilterType>(),
Expand Down Expand Up @@ -195,11 +196,9 @@ impl BloomFilterType {
}
// Scale out by adding a new filter with capacity bounded within the u32 range. false positive rate is also
// bound within the range f64::MIN_POSITIVE <= x < 1.0.
let new_fp_rate = match self.fp_rate * configs::TIGHTENING_RATIO.powi(num_filters) {
x if x > f64::MIN_POSITIVE => x,
_ => {
return Err(BloomError::MaxNumScalingFilters);
}
let new_fp_rate = match Self::calculate_fp_rate(self.fp_rate, num_filters) {
Ok(rate) => rate,
Err(e) => return Err(e),
};
let new_capacity = match filter.capacity.checked_mul(self.expansion) {
Some(new_capacity) => new_capacity,
Expand Down Expand Up @@ -238,6 +237,14 @@ impl BloomFilterType {
}
}

/// Compute the false positive rate for a scaled-out filter.
///
/// The base `fp_rate` is multiplied by `configs::TIGHTENING_RATIO` raised to the power of
/// `num_filters`.
///
/// # Errors
///
/// Returns `BloomError::MaxNumScalingFilters` when the resulting rate is not strictly greater
/// than `f64::MIN_POSITIVE`.
pub fn calculate_fp_rate(fp_rate: f64, num_filters: i32) -> Result<f64, BloomError> {
    let tightened_rate = fp_rate * configs::TIGHTENING_RATIO.powi(num_filters);
    if tightened_rate > f64::MIN_POSITIVE {
        Ok(tightened_rate)
    } else {
        Err(BloomError::MaxNumScalingFilters)
    }
}

/// Deserialize a byte array to bloom filter.
/// We will need to handle any current or previous version and deserialize the bytes into a bloom object of the running Module's current version `BLOOM_TYPE_VERSION`.
pub fn decode_bloom_filter(
Expand Down Expand Up @@ -333,6 +340,7 @@ impl BloomFilterType {
/// well within the u32::MAX limit.
#[derive(Serialize, Deserialize)]
pub struct BloomFilter {
#[serde(serialize_with = "serialize", deserialize_with = "deserialize")]
pub bloom: bloomfilter::Bloom<[u8]>,
pub num_items: u32,
pub capacity: u32,
Expand All @@ -345,7 +353,8 @@ impl BloomFilter {
capacity as usize,
fp_rate,
&configs::FIXED_SEED,
);
)
.expect("We expect bloomfilter::Bloom<[u8]> creation to succeed");
let fltr = BloomFilter {
bloom,
num_items: 0,
Expand All @@ -361,20 +370,10 @@ impl BloomFilter {
}

/// Create a new BloomFilter from dumped information (RDB load).
pub fn from_existing(
bitmap: &[u8],
number_of_bits: u64,
number_of_hash_functions: u32,
sip_keys: [(u64, u64); 2],
num_items: u32,
capacity: u32,
) -> BloomFilter {
let bloom = bloomfilter::Bloom::from_existing(
bitmap,
number_of_bits,
number_of_hash_functions,
sip_keys,
);
pub fn from_existing(bitmap: &[u8], num_items: u32, capacity: u32) -> BloomFilter {
let bloom = bloomfilter::Bloom::from_slice(bitmap)
.expect("We expect bloomfilter::Bloom<[u8]> creation to succeed");

let fltr = BloomFilter {
bloom,
num_items,
Expand All @@ -392,7 +391,7 @@ impl BloomFilter {
}

pub fn number_of_bytes(&self) -> usize {
std::mem::size_of::<BloomFilter>() + (self.bloom.number_of_bits() / 8) as usize
std::mem::size_of::<BloomFilter>() + (self.bloom.len() / 8) as usize
}

/// Calculates the number of bytes that the bloom filter will require to be allocated.
Expand All @@ -407,17 +406,6 @@ impl BloomFilter {
true
}

/// Calculates the number of bytes that the bloom filter will require to be allocated using provided `number_of_bits`.
/// This is used before actually creating the bloom filter when checking if the filter is within the allowed size.
/// Returns whether the bloom filter is of a valid size or not.
///
/// The computed size is compared against `configs::BLOOM_MEMORY_LIMIT_PER_FILTER`; `false` is returned
/// only when the computed size is strictly greater than that limit.
pub fn validate_size_with_bits(number_of_bits: u64) -> bool {
// NOTE(review): `number_of_bits` is added to the struct size (a byte count) as-is, without a
// bits-to-bytes conversion — confirm whether the unit mix is intentional.
let bytes = std::mem::size_of::<BloomFilter>() as u64 + number_of_bits;
if bytes > configs::BLOOM_MEMORY_LIMIT_PER_FILTER.load(Ordering::Relaxed) as u64 {
return false;
}
true
}

/// Checks `item` against the wrapped `bloom` and returns the result of its `check` call.
pub fn check(&self, item: &[u8]) -> bool {
self.bloom.check(item)
}
Expand All @@ -426,20 +414,9 @@ impl BloomFilter {
self.bloom.set(item)
}

/// Returns the `[(u64, u64); 2]` key pairs reported by the wrapped bloom's `sip_keys()`.
pub fn sip_keys(&self) -> [(u64, u64); 2] {
self.bloom.sip_keys()
}

/// Create a new BloomFilter from an existing BloomFilter object (COPY command).
pub fn create_copy_from(bf: &BloomFilter) -> BloomFilter {
BloomFilter::from_existing(
&bf.bloom.bitmap(),
bf.bloom.number_of_bits(),
bf.bloom.number_of_hash_functions(),
bf.bloom.sip_keys(),
bf.num_items,
bf.capacity,
)
BloomFilter::from_existing(&bf.bloom.to_bytes(), bf.num_items, bf.capacity)
}
}

Expand Down Expand Up @@ -468,9 +445,7 @@ impl Drop for BloomFilter {
#[cfg(test)]
mod tests {
use super::*;
use crate::configs::{
FIXED_SIP_KEY_ONE_A, FIXED_SIP_KEY_ONE_B, FIXED_SIP_KEY_TWO_A, FIXED_SIP_KEY_TWO_B,
};
use configs::FIXED_SEED;
use rand::{distributions::Alphanumeric, Rng};

/// Returns random string with specified number of characters.
Expand Down Expand Up @@ -560,10 +535,6 @@ mod tests {
fp_margin: f64,
rand_prefix: &String,
) {
let expected_sip_keys = [
(FIXED_SIP_KEY_ONE_A, FIXED_SIP_KEY_ONE_B),
(FIXED_SIP_KEY_TWO_A, FIXED_SIP_KEY_TWO_B),
];
assert_eq!(
restored_bloom_filter_type.capacity(),
original_bloom_filter_type.capacity()
Expand All @@ -587,8 +558,8 @@ mod tests {
.filters
.iter()
.any(
|filter| (filter.bloom.sip_keys() == restore_filter.bloom.sip_keys())
&& (restore_filter.bloom.sip_keys() == expected_sip_keys)
|filter| (filter.bloom.seed() == restore_filter.bloom.seed())
&& (restore_filter.bloom.seed() == FIXED_SEED)
)));
assert!(restored_bloom_filter_type
.filters
Expand All @@ -604,7 +575,7 @@ mod tests {
.all(|restore_filter| original_bloom_filter_type
.filters
.iter()
.any(|filter| filter.bloom.bitmap() == restore_filter.bloom.bitmap())));
.any(|filter| filter.bloom.as_slice() == restore_filter.bloom.as_slice())));
let (error_count, _) = check_items_exist(
restored_bloom_filter_type,
1,
Expand Down Expand Up @@ -750,14 +721,11 @@ mod tests {
}

#[test]
fn test_sip_keys() {
fn test_seed() {
// The seed reported by a bloom filter created with the fixed seed should be equal to the FIXED_SEED constant in configs.rs
let test_bloom_filter = BloomFilter::new(0.5_f64, 1000_u32);
let test_sip_keys = test_bloom_filter.bloom.sip_keys();
assert_eq!(test_sip_keys[0].0, FIXED_SIP_KEY_ONE_A);
assert_eq!(test_sip_keys[0].1, FIXED_SIP_KEY_ONE_B);
assert_eq!(test_sip_keys[1].0, FIXED_SIP_KEY_TWO_A);
assert_eq!(test_sip_keys[1].1, FIXED_SIP_KEY_TWO_B);
let seed = test_bloom_filter.bloom.seed();
assert_eq!(seed, FIXED_SEED);
}

#[test]
Expand Down
4 changes: 1 addition & 3 deletions src/wrapper/bloom_callback.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,12 @@ pub unsafe extern "C" fn bloom_rdb_save(rdb: *mut raw::RedisModuleIO, value: *mu
let mut filter_list_iter = filter_list.iter().peekable();
while let Some(filter) = filter_list_iter.next() {
let bloom = &filter.bloom;
let bitmap = bloom.bitmap();
let bitmap = bloom.to_bytes();
raw::RedisModule_SaveStringBuffer.unwrap()(
rdb,
bitmap.as_ptr().cast::<c_char>(),
bitmap.len(),
);
raw::save_unsigned(rdb, bloom.number_of_bits());
raw::save_unsigned(rdb, bloom.number_of_hash_functions() as u64);
raw::save_unsigned(rdb, filter.capacity as u64);
if filter_list_iter.peek().is_none() {
raw::save_unsigned(rdb, filter.num_items as u64);
Expand Down
2 changes: 1 addition & 1 deletion tests/test_bloom_metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from valkeytests.conftest import resource_port_tracker
from util.waiters import *

DEFAULT_BLOOM_FILTER_SIZE = 179960
DEFAULT_BLOOM_FILTER_SIZE = 179952
DEFAULT_BLOOM_FILTER_CAPACITY = 100000
class TestBloomMetrics(ValkeyBloomTestCaseBase):

Expand Down

0 comments on commit 0218610

Please sign in to comment.