Skip to content

Commit

Permalink
Merge branch 'valkey-io:unstable' into unstable
Browse files Browse the repository at this point in the history
  • Loading branch information
zackcam authored Dec 8, 2024
2 parents f465735 + eaa218f commit e08ca07
Show file tree
Hide file tree
Showing 7 changed files with 110 additions and 25 deletions.
45 changes: 33 additions & 12 deletions src/bloom/command_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use crate::bloom::utils::BloomFilterType;
use crate::configs;
use crate::configs::{
BLOOM_CAPACITY_MAX, BLOOM_CAPACITY_MIN, BLOOM_EXPANSION_MAX, BLOOM_EXPANSION_MIN,
BLOOM_FP_RATE_MAX, BLOOM_FP_RATE_MIN,
BLOOM_FP_RATE_MAX, BLOOM_FP_RATE_MIN, TIGHTENING_RATIO_MAX, TIGHTENING_RATIO_MIN,
};
use std::sync::atomic::Ordering;
use valkey_module::ContextFlags;
Expand Down Expand Up @@ -87,7 +87,7 @@ pub fn bloom_filter_add_value(
let value = match filter_key.get_value::<BloomFilterType>(&BLOOM_FILTER_TYPE) {
Ok(v) => v,
Err(_) => {
return Err(ValkeyError::Str(utils::ERROR));
return Err(ValkeyError::WrongType);
}
};
// Skip bloom filter size validation on replicated cmds.
Expand All @@ -110,11 +110,13 @@ pub fn bloom_filter_add_value(
None => {
// Instantiate empty bloom filter.
let fp_rate = configs::BLOOM_FP_RATE_DEFAULT;
let tightening_ratio = configs::TIGHTENING_RATIO;
let capacity = configs::BLOOM_CAPACITY.load(Ordering::Relaxed) as u32;
let expansion = configs::BLOOM_EXPANSION.load(Ordering::Relaxed) as u32;
let use_random_seed = configs::BLOOM_USE_RANDOM_SEED.load(Ordering::Relaxed);
let mut bloom = match BloomFilterType::new_reserved(
fp_rate,
tightening_ratio,
capacity,
expansion,
use_random_seed,
Expand Down Expand Up @@ -173,7 +175,7 @@ pub fn bloom_filter_exists(
let value = match filter_key.get_value::<BloomFilterType>(&BLOOM_FILTER_TYPE) {
Ok(v) => v,
Err(_) => {
return Err(ValkeyError::Str(utils::ERROR));
return Err(ValkeyError::WrongType);
}
};
if !multi {
Expand Down Expand Up @@ -201,7 +203,7 @@ pub fn bloom_filter_card(ctx: &Context, input_args: &[ValkeyString]) -> ValkeyRe
let value = match filter_key.get_value::<BloomFilterType>(&BLOOM_FILTER_TYPE) {
Ok(v) => v,
Err(_) => {
return Err(ValkeyError::Str(utils::ERROR));
return Err(ValkeyError::WrongType);
}
};
match value {
Expand Down Expand Up @@ -270,7 +272,7 @@ pub fn bloom_filter_reserve(ctx: &Context, input_args: &[ValkeyString]) -> Valke
let value = match filter_key.get_value::<BloomFilterType>(&BLOOM_FILTER_TYPE) {
Ok(v) => v,
Err(_) => {
return Err(ValkeyError::Str(utils::ERROR));
return Err(ValkeyError::WrongType);
}
};
match value {
Expand All @@ -279,8 +281,10 @@ pub fn bloom_filter_reserve(ctx: &Context, input_args: &[ValkeyString]) -> Valke
let use_random_seed = configs::BLOOM_USE_RANDOM_SEED.load(Ordering::Relaxed);
// Skip bloom filter size validation on replicated cmds.
let validate_size_limit = !ctx.get_flags().contains(ContextFlags::REPLICATED);
let tightening_ratio = configs::TIGHTENING_RATIO;
let bloom = match BloomFilterType::new_reserved(
fp_rate,
tightening_ratio,
capacity,
expansion,
use_random_seed,
Expand All @@ -302,6 +306,7 @@ pub fn bloom_filter_reserve(ctx: &Context, input_args: &[ValkeyString]) -> Valke

pub fn bloom_filter_insert(ctx: &Context, input_args: &[ValkeyString]) -> ValkeyResult {
let argc = input_args.len();
let replicated_cmd = ctx.get_flags().contains(ContextFlags::REPLICATED);
// At the very least, we need: BF.INSERT <key> ITEMS <item>
if argc < 4 {
return Err(ValkeyError::WrongArity);
Expand All @@ -311,6 +316,7 @@ pub fn bloom_filter_insert(ctx: &Context, input_args: &[ValkeyString]) -> Valkey
let filter_name = &input_args[idx];
idx += 1;
let mut fp_rate = configs::BLOOM_FP_RATE_DEFAULT;
let mut tightening_ratio = configs::TIGHTENING_RATIO;
let mut capacity = configs::BLOOM_CAPACITY.load(Ordering::Relaxed) as u32;
let mut expansion = configs::BLOOM_EXPANSION.load(Ordering::Relaxed) as u32;
let mut nocreate = false;
Expand All @@ -331,6 +337,21 @@ pub fn bloom_filter_insert(ctx: &Context, input_args: &[ValkeyString]) -> Valkey
}
};
}
"TIGHTENING" if replicated_cmd => {
if idx >= (argc - 1) {
return Err(ValkeyError::WrongArity);
}
idx += 1;
tightening_ratio = match input_args[idx].to_string_lossy().parse::<f64>() {
Ok(num) if num > TIGHTENING_RATIO_MIN && num < TIGHTENING_RATIO_MAX => num,
Ok(num) if !(num > TIGHTENING_RATIO_MIN && num < TIGHTENING_RATIO_MAX) => {
return Err(ValkeyError::Str(utils::ERROR_RATIO_RANGE));
}
_ => {
return Err(ValkeyError::Str(utils::BAD_ERROR_RATIO));
}
};
}
"CAPACITY" => {
if idx >= (argc - 1) {
return Err(ValkeyError::WrongArity);
Expand Down Expand Up @@ -383,11 +404,10 @@ pub fn bloom_filter_insert(ctx: &Context, input_args: &[ValkeyString]) -> Valkey
let value = match filter_key.get_value::<BloomFilterType>(&BLOOM_FILTER_TYPE) {
Ok(v) => v,
Err(_) => {
return Err(ValkeyError::Str(utils::ERROR));
return Err(ValkeyError::WrongType);
}
};
// Skip bloom filter size validation on replicated cmds.
let validate_size_limit = !ctx.get_flags().contains(ContextFlags::REPLICATED);
let mut add_succeeded = false;
match value {
Some(bloom) => {
Expand All @@ -398,7 +418,7 @@ pub fn bloom_filter_insert(ctx: &Context, input_args: &[ValkeyString]) -> Valkey
bloom,
true,
&mut add_succeeded,
validate_size_limit,
!replicated_cmd,
);
replicate_and_notify_events(ctx, filter_name, add_succeeded, false);
response
Expand All @@ -410,10 +430,11 @@ pub fn bloom_filter_insert(ctx: &Context, input_args: &[ValkeyString]) -> Valkey
let use_random_seed = configs::BLOOM_USE_RANDOM_SEED.load(Ordering::Relaxed);
let mut bloom = match BloomFilterType::new_reserved(
fp_rate,
tightening_ratio,
capacity,
expansion,
use_random_seed,
validate_size_limit,
!replicated_cmd,
) {
Ok(bf) => bf,
Err(err) => return Err(ValkeyError::Str(err.as_str())),
Expand All @@ -425,7 +446,7 @@ pub fn bloom_filter_insert(ctx: &Context, input_args: &[ValkeyString]) -> Valkey
&mut bloom,
true,
&mut add_succeeded,
validate_size_limit,
!replicated_cmd,
);
match filter_key.set_value(&BLOOM_FILTER_TYPE, bloom) {
Ok(()) => {
Expand All @@ -451,7 +472,7 @@ pub fn bloom_filter_info(ctx: &Context, input_args: &[ValkeyString]) -> ValkeyRe
let value = match filter_key.get_value::<BloomFilterType>(&BLOOM_FILTER_TYPE) {
Ok(v) => v,
Err(_) => {
return Err(ValkeyError::Str(utils::ERROR));
return Err(ValkeyError::WrongType);
}
};
match value {
Expand Down Expand Up @@ -513,7 +534,7 @@ pub fn bloom_filter_load(ctx: &Context, input_args: &[ValkeyString]) -> ValkeyRe
Ok(v) => v,
Err(_) => {
// error
return Err(ValkeyError::Str(utils::ERROR));
return Err(ValkeyError::WrongType);
}
};
match filter {
Expand Down
5 changes: 5 additions & 0 deletions src/bloom/data_type.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,9 @@ impl ValkeyDataType for BloomFilterType {
let Ok(fp_rate) = raw::load_double(rdb) else {
return None;
};
let Ok(tightening_ratio) = raw::load_double(rdb) else {
return None;
};
let mut filters: Vec<BloomFilter> = Vec::with_capacity(num_filters as usize);
let Ok(is_seed_random_u64) = raw::load_unsigned(rdb) else {
return None;
Expand Down Expand Up @@ -121,6 +124,7 @@ impl ValkeyDataType for BloomFilterType {
let item = BloomFilterType {
expansion: expansion as u32,
fp_rate,
tightening_ratio,
is_seed_random,
filters,
};
Expand All @@ -131,6 +135,7 @@ impl ValkeyDataType for BloomFilterType {
fn debug_digest(&self, mut dig: Digest) {
dig.add_long_long(self.expansion.into());
dig.add_string_buffer(&self.fp_rate.to_le_bytes());
dig.add_string_buffer(&self.tightening_ratio.to_le_bytes());
for filter in &self.filters {
dig.add_string_buffer(filter.bloom.as_slice());
dig.add_long_long(filter.num_items.into());
Expand Down
Loading

0 comments on commit e08ca07

Please sign in to comment.