
Updating how we create BloomFilter from rdb loads and upgrading bloomfilter dependency to version 3.0 #23

Merged
merged 7 commits on Dec 4, 2024
2 changes: 1 addition & 1 deletion Cargo.toml
@@ -15,7 +15,7 @@ homepage = "https://github.com/valkey-io/valkey-bloom"
valkey-module = "0.1.2"
valkey-module-macros = "0"
linkme = "0"
bloomfilter = { version = "1.0.13", features = ["serde"] }
bloomfilter = { version = "3.0.1", features = ["serde"] }
lazy_static = "1.4.0"
libc = "0.2"
serde = { version = "1.0", features = ["derive"] }
44 changes: 14 additions & 30 deletions src/bloom/data_type.rs
@@ -1,8 +1,5 @@
use crate::bloom::utils::BloomFilter;
use crate::bloom::utils::BloomFilterType;
use crate::configs::{
FIXED_SIP_KEY_ONE_A, FIXED_SIP_KEY_ONE_B, FIXED_SIP_KEY_TWO_A, FIXED_SIP_KEY_TWO_B,
};
use crate::metrics::BLOOM_NUM_OBJECTS;
use crate::metrics::BLOOM_OBJECT_TOTAL_MEMORY_BYTES;
use crate::wrapper::bloom_callback;
@@ -61,7 +58,6 @@ pub trait ValkeyDataType {
impl ValkeyDataType for BloomFilterType {
/// Callback to load and parse RDB data of a bloom item and create it.
fn load_from_rdb(rdb: *mut raw::RedisModuleIO, encver: i32) -> Option<BloomFilterType> {
let mut filters = Vec::new();
if encver > BLOOM_FILTER_TYPE_ENCODING_VERSION {
logging::log_warning(format!("{}: Cannot load bloomfltr data type of version {} because it is higher than the loaded module's bloomfltr supported version {}", MODULE_NAME, encver, BLOOM_FILTER_TYPE_ENCODING_VERSION).as_str());
return None;
@@ -75,24 +71,26 @@ impl ValkeyDataType for BloomFilterType {
let Ok(fp_rate) = raw::load_double(rdb) else {
return None;
};
let mut filters: Vec<BloomFilter> = Vec::with_capacity(num_filters as usize);

KarthikSubbarao (Member) commented on Dec 4, 2024:

Note: this will be fixed in a follow-up PR. We want to make the vector expansion consistent across (1) bloom object creation and scaling, (2) COPY, (3) RDB load, and (4) BF.LOAD.

We can also add the vector capacity to the DEBUG DIGEST; this will be very useful for correctness testing.

You can keep this in mind when working on the fix, @zackcam.

Member commented:

A few comments on RDB save and load:

  1. The number of hash functions is included in the bitmap dump produced by to_bytes, so we can remove it from the save/load logic. Right?
  2. Do we need the number of bits for any purpose? Is it needed for bloom filter restoration (external bloom)? If not, can we remove it? If it helps with anything (or if removing it makes size validation impossible), feel free to retain it.

Contributor (PR author) replied:

We don't need either field. I removed them both and instead moved the size check to the point where the bloom object has been converted into a type from which we can get the number of bits.
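
For reference, a minimal sketch of the per-filter RDB payload after this change (an editorial illustration, not part of the diff; the struct and field names are hypothetical):

```rust
// Illustrative only: the per-filter fields that remain in the RDB payload after
// this PR. The bit count, hash-function count, and seed all travel inside the
// serialized bitmap produced by to_bytes(), so they no longer need their own
// save/load calls.
struct FilterRdbRecordSketch {
    bitmap: Vec<u8>,        // bloom.to_bytes(): header (seed, k, bit count) plus bit array
    capacity: u64,          // saved for every filter
    num_items: Option<u64>, // saved only for the last filter in the object
}
```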

for i in 0..num_filters {
let Ok(bitmap) = raw::load_string_buffer(rdb) else {
return None;
};
let Ok(number_of_bits) = raw::load_unsigned(rdb) else {
let Ok(capacity) = raw::load_unsigned(rdb) else {
return None;
};
// Reject the RDB load if any bloom filter within a bloom object is of a size greater than what is allowed.
if !BloomFilter::validate_size_with_bits(number_of_bits) {
let new_fp_rate = match Self::calculate_fp_rate(fp_rate, num_filters as i32) {
Ok(rate) => rate,
Err(_) => {
logging::log_warning("ERR bloom object reached max number of filters");
return None;
}
};
if !BloomFilter::validate_size(capacity as u32, new_fp_rate) {
logging::log_warning("Failed to restore bloom object because it contains a filter larger than the max allowed size limit.");
return None;
}
let Ok(number_of_hash_functions) = raw::load_unsigned(rdb) else {
return None;
};
let Ok(capacity) = raw::load_unsigned(rdb) else {
return None;
};
// Only load num_items when it's the last filter
let num_items = if i == num_filters - 1 {
match raw::load_unsigned(rdb) {
@@ -102,18 +100,8 @@ impl ValkeyDataType for BloomFilterType {
} else {
capacity
};
let sip_keys = [
(FIXED_SIP_KEY_ONE_A, FIXED_SIP_KEY_ONE_B),
(FIXED_SIP_KEY_TWO_A, FIXED_SIP_KEY_TWO_B),
];
let filter = BloomFilter::from_existing(
bitmap.as_ref(),
number_of_bits,
number_of_hash_functions as u32,
sip_keys,
num_items as u32,
capacity as u32,
);
let filter =
BloomFilter::from_existing(bitmap.as_ref(), num_items as u32, capacity as u32);
filters.push(filter);
}
BLOOM_OBJECT_TOTAL_MEMORY_BYTES.fetch_add(
@@ -134,11 +122,7 @@ impl ValkeyDataType for BloomFilterType {
dig.add_long_long(self.expansion.into());
dig.add_string_buffer(&self.fp_rate.to_le_bytes());
for filter in &self.filters {
dig.add_string_buffer(&filter.bloom.bitmap());
for &(key1, key2) in &filter.sip_keys() {
dig.add_long_long(key1 as i64);
dig.add_long_long(key2 as i64);
}
dig.add_string_buffer(filter.bloom.as_slice());

Member commented:

Just confirming: does the slice contain the seed as well?

Contributor (PR author) replied:

Yes, it is contained in the header portion of the bitmap. In sync it is set like this: BitMap::set_seed(header, &seed);
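
To illustrate the reply above, a minimal round-trip sketch (editorial; it assumes the BloomFilter wrapper from src/bloom/utils.rs in this diff and the bloomfilter 3.0 calls already used elsewhere in the PR):

```rust
// Sketch: the bytes from to_bytes() carry the seed and hash-function count in
// their header, so from_existing() only needs the bitmap plus the module-level
// metadata (num_items, capacity) to restore an equivalent filter.
// Assumes `use crate::bloom::utils::BloomFilter;` is in scope.
fn roundtrip_sketch() {
    let original = BloomFilter::new(0.001_f64, 1000_u32);
    let bytes = original.bloom.to_bytes();
    let restored = BloomFilter::from_existing(&bytes, original.num_items, original.capacity);
    assert_eq!(original.bloom.seed(), restored.bloom.seed());
    assert_eq!(original.bloom.as_slice(), restored.bloom.as_slice());
}
```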

dig.add_long_long(filter.num_items.into());
dig.add_long_long(filter.capacity.into());
}
91 changes: 29 additions & 62 deletions src/bloom/utils.rs
@@ -5,10 +5,11 @@ use crate::{
metrics,
};
use bloomfilter;
use bloomfilter::{deserialize, serialize};
use serde::{Deserialize, Serialize};
use std::{mem, sync::atomic::Ordering};

use super::data_type::BLOOM_TYPE_VERSION;
use std::{mem, sync::atomic::Ordering};

/// KeySpace Notification Events
pub const ADD_EVENT: &str = "bloom.add";
@@ -104,7 +105,7 @@ impl BloomFilterType {

/// Create a new BloomFilterType object from an existing one.
pub fn create_copy_from(from_bf: &BloomFilterType) -> BloomFilterType {
let mut filters = Vec::new();
let mut filters: Vec<BloomFilter> = Vec::with_capacity(from_bf.filters.len());

Contributor commented:

Should we worry about over-allocation (when filters.len() == 0)?

Member replied:

filters.len() will never / should never be 0, so this seems OK.
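
A quick standalone check of the over-allocation concern (editorial aside): Vec::with_capacity(0) performs no allocation, so even the degenerate case would be harmless.

```rust
// Vec::with_capacity(0) does not allocate; the capacity stays 0 until a push.
fn with_capacity_zero_demo() {
    let v: Vec<u32> = Vec::with_capacity(0);
    assert_eq!(v.capacity(), 0);
}
```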

metrics::BLOOM_NUM_OBJECTS.fetch_add(1, Ordering::Relaxed);
metrics::BLOOM_OBJECT_TOTAL_MEMORY_BYTES.fetch_add(
mem::size_of::<BloomFilterType>(),
@@ -188,11 +189,9 @@ impl BloomFilterType {
}
// Scale out by adding a new filter with capacity bounded within the u32 range. The false positive rate is also
// bounded within the range f64::MIN_POSITIVE <= x < 1.0.
let new_fp_rate = match self.fp_rate * configs::TIGHTENING_RATIO.powi(num_filters) {
x if x > f64::MIN_POSITIVE => x,
_ => {
return Err(BloomError::MaxNumScalingFilters);
}
let new_fp_rate = match Self::calculate_fp_rate(self.fp_rate, num_filters) {
Ok(rate) => rate,
Err(e) => return Err(e),
};
let new_capacity = match filter.capacity.checked_mul(self.expansion) {
Some(new_capacity) => new_capacity,
@@ -231,6 +230,13 @@ impl BloomFilterType {
}
}

pub fn calculate_fp_rate(fp_rate: f64, num_filters: i32) -> Result<f64, BloomError> {
match fp_rate * configs::TIGHTENING_RATIO.powi(num_filters) {
x if x > f64::MIN_POSITIVE => Ok(x),
_ => Err(BloomError::MaxNumScalingFilters),
}
}
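
For intuition, a standalone sketch of what calculate_fp_rate computes (editorial; the 0.5 tightening ratio below is an assumed placeholder, while the module reads the real value from configs::TIGHTENING_RATIO):

```rust
// Each scale-out tightens the per-filter false-positive rate by multiplying the
// configured fp_rate by tightening_ratio^num_filters; once the product can no
// longer stay above f64::MIN_POSITIVE, further scaling is rejected.
fn tightened_fp_rate(fp_rate: f64, tightening_ratio: f64, num_filters: i32) -> Option<f64> {
    let rate = fp_rate * tightening_ratio.powi(num_filters);
    (rate > f64::MIN_POSITIVE).then_some(rate)
}

fn main() {
    assert_eq!(tightened_fp_rate(0.01, 0.5, 1), Some(0.005)); // one scale-out
    assert!(tightened_fp_rate(0.01, 0.5, 2000).is_none());    // underflows, so scaling stops
}
```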

/// Deserialize a byte array to bloom filter.
/// We will need to handle any current or previous version and deserializing the bytes into a bloom object of the running Module's current version `BLOOM_TYPE_VERSION`.
pub fn decode_bloom_filter(
@@ -318,6 +324,7 @@ impl BloomFilterType {
/// well within the u32::MAX limit.
#[derive(Serialize, Deserialize)]
pub struct BloomFilter {
#[serde(serialize_with = "serialize", deserialize_with = "deserialize")]
pub bloom: bloomfilter::Bloom<[u8]>,
pub num_items: u32,
pub capacity: u32,
@@ -330,7 +337,8 @@ impl BloomFilter {
capacity as usize,
fp_rate,
&configs::FIXED_SEED,
);
)
.expect("We expect bloomfilter::Bloom<[u8]> creation to succeed");
let fltr = BloomFilter {
bloom,
num_items: 0,
@@ -346,20 +354,10 @@ impl BloomFilter {
}

/// Create a new BloomFilter from dumped information (RDB load).
pub fn from_existing(
bitmap: &[u8],
number_of_bits: u64,
number_of_hash_functions: u32,
sip_keys: [(u64, u64); 2],
num_items: u32,
capacity: u32,
) -> BloomFilter {
let bloom = bloomfilter::Bloom::from_existing(
bitmap,
number_of_bits,
number_of_hash_functions,
sip_keys,
);
pub fn from_existing(bitmap: &[u8], num_items: u32, capacity: u32) -> BloomFilter {
let bloom = bloomfilter::Bloom::from_slice(bitmap)
.expect("We expect bloomfilter::Bloom<[u8]> creation to succeed");

let fltr = BloomFilter {
bloom,
num_items,
@@ -377,7 +375,7 @@ impl BloomFilter {
}

pub fn number_of_bytes(&self) -> usize {
std::mem::size_of::<BloomFilter>() + (self.bloom.number_of_bits() / 8) as usize
std::mem::size_of::<BloomFilter>() + (self.bloom.len() / 8) as usize
}

/// Calculates the number of bytes that the bloom filter will require to be allocated.
@@ -392,17 +390,6 @@ impl BloomFilter {
true
}

/// Caculates the number of bytes that the bloom filter will require to be allocated using provided `number_of_bits`.
/// This is used before actually creating the bloom filter when checking if the filter is within the allowed size.
/// Returns whether the bloom filter is of a valid size or not.
pub fn validate_size_with_bits(number_of_bits: u64) -> bool {
let bytes = std::mem::size_of::<BloomFilter>() as u64 + number_of_bits;
if bytes > configs::BLOOM_MEMORY_LIMIT_PER_FILTER.load(Ordering::Relaxed) as u64 {
return false;
}
true
}

pub fn check(&self, item: &[u8]) -> bool {
self.bloom.check(item)
}
@@ -411,20 +398,9 @@ impl BloomFilter {
self.bloom.set(item)
}

pub fn sip_keys(&self) -> [(u64, u64); 2] {
self.bloom.sip_keys()
}

/// Create a new BloomFilter from an existing BloomFilter object (COPY command).
pub fn create_copy_from(bf: &BloomFilter) -> BloomFilter {
BloomFilter::from_existing(
&bf.bloom.bitmap(),
bf.bloom.number_of_bits(),
bf.bloom.number_of_hash_functions(),
bf.bloom.sip_keys(),
bf.num_items,
bf.capacity,
)
BloomFilter::from_existing(&bf.bloom.to_bytes(), bf.num_items, bf.capacity)
}
}

@@ -453,9 +429,7 @@ impl Drop for BloomFilter {
#[cfg(test)]
mod tests {
use super::*;
use crate::configs::{
FIXED_SIP_KEY_ONE_A, FIXED_SIP_KEY_ONE_B, FIXED_SIP_KEY_TWO_A, FIXED_SIP_KEY_TWO_B,
};
use configs::FIXED_SEED;
use rand::{distributions::Alphanumeric, Rng};

/// Returns random string with specified number of characters.
@@ -545,10 +519,6 @@ mod tests {
fp_margin: f64,
rand_prefix: &String,
) {
let expected_sip_keys = [
(FIXED_SIP_KEY_ONE_A, FIXED_SIP_KEY_ONE_B),
(FIXED_SIP_KEY_TWO_A, FIXED_SIP_KEY_TWO_B),
];
assert_eq!(
restored_bloom_filter_type.capacity(),
original_bloom_filter_type.capacity()
@@ -572,8 +542,8 @@
.filters
.iter()
.any(
|filter| (filter.bloom.sip_keys() == restore_filter.bloom.sip_keys())
&& (restore_filter.bloom.sip_keys() == expected_sip_keys)
|filter| (filter.bloom.seed() == restore_filter.bloom.seed())
&& (restore_filter.bloom.seed() == FIXED_SEED)
)));
assert!(restored_bloom_filter_type
.filters
Expand All @@ -589,7 +559,7 @@ mod tests {
.all(|restore_filter| original_bloom_filter_type
.filters
.iter()
.any(|filter| filter.bloom.bitmap() == restore_filter.bloom.bitmap())));
.any(|filter| filter.bloom.as_slice() == restore_filter.bloom.as_slice())));
let (error_count, _) = check_items_exist(
restored_bloom_filter_type,
1,
@@ -735,14 +705,11 @@ mod tests {
}

#[test]
fn test_sip_keys() {
fn test_seed() {
// The seed of a bloom filter created with the fixed seed should equal the FIXED_SEED constant in configs.rs
let test_bloom_filter = BloomFilter::new(0.5_f64, 1000_u32);
let test_sip_keys = test_bloom_filter.bloom.sip_keys();
assert_eq!(test_sip_keys[0].0, FIXED_SIP_KEY_ONE_A);
assert_eq!(test_sip_keys[0].1, FIXED_SIP_KEY_ONE_B);
assert_eq!(test_sip_keys[1].0, FIXED_SIP_KEY_TWO_A);
assert_eq!(test_sip_keys[1].1, FIXED_SIP_KEY_TWO_B);
let seed = test_bloom_filter.bloom.seed();
assert_eq!(seed, FIXED_SEED);
}

#[test]
Expand Down
4 changes: 1 addition & 3 deletions src/wrapper/bloom_callback.rs
@@ -25,14 +25,12 @@ pub unsafe extern "C" fn bloom_rdb_save(rdb: *mut raw::RedisModuleIO, value: *mu
let mut filter_list_iter = filter_list.iter().peekable();
while let Some(filter) = filter_list_iter.next() {
let bloom = &filter.bloom;
let bitmap = bloom.bitmap();
let bitmap = bloom.to_bytes();
raw::RedisModule_SaveStringBuffer.unwrap()(
rdb,
bitmap.as_ptr().cast::<c_char>(),
bitmap.len(),
);
raw::save_unsigned(rdb, bloom.number_of_bits());
raw::save_unsigned(rdb, bloom.number_of_hash_functions() as u64);
raw::save_unsigned(rdb, filter.capacity as u64);
if filter_list_iter.peek().is_none() {
raw::save_unsigned(rdb, filter.num_items as u64);
2 changes: 1 addition & 1 deletion tests/test_bloom_metrics.py
@@ -4,7 +4,7 @@
from valkeytests.conftest import resource_port_tracker
from util.waiters import *

DEFAULT_BLOOM_FILTER_SIZE = 179960
DEFAULT_BLOOM_FILTER_SIZE = 179952
DEFAULT_BLOOM_FILTER_CAPACITY = 100000
class TestBloomMetrics(ValkeyBloomTestCaseBase):
