Skip to content

Commit

Permalink
ACL Updates, BF.INSERT updates, other misc updates (#38)
Browse files Browse the repository at this point in the history
Signed-off-by: Karthik Subbarao <[email protected]>
  • Loading branch information
KarthikSubbarao committed Jan 12, 2025
1 parent ab158bb commit a9b578e
Show file tree
Hide file tree
Showing 15 changed files with 98 additions and 76 deletions.
19 changes: 10 additions & 9 deletions src/bloom/command_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -441,15 +441,14 @@ pub fn bloom_filter_reserve(ctx: &Context, input_args: &[ValkeyString]) -> Valke
/// Function that implements logic to handle the BF.INSERT command.
pub fn bloom_filter_insert(ctx: &Context, input_args: &[ValkeyString]) -> ValkeyResult {
let argc = input_args.len();
// At the very least, we need: BF.INSERT <key> ITEMS <item>
if argc < 4 {
// At the very least, we need: BF.INSERT <key>
if argc < 2 {
return Err(ValkeyError::WrongArity);
}
let mut idx = 1;
// Parse the filter name
let filter_name = &input_args[idx];
idx += 1;
let replicated_cmd = ctx.get_flags().contains(ContextFlags::REPLICATED);
let mut fp_rate = *configs::BLOOM_FP_RATE_F64
.lock()
.expect("Unable to get a lock on fp_rate static");
Expand All @@ -464,6 +463,7 @@ pub fn bloom_filter_insert(ctx: &Context, input_args: &[ValkeyString]) -> Valkey
false => (Some(configs::FIXED_SEED), false),
};
let mut nocreate = false;
let mut items_provided = false;
while idx < argc {
match input_args[idx].to_string_lossy().to_uppercase().as_str() {
"ERROR" => {
Expand All @@ -481,7 +481,7 @@ pub fn bloom_filter_insert(ctx: &Context, input_args: &[ValkeyString]) -> Valkey
}
};
}
"TIGHTENING" if replicated_cmd => {
"TIGHTENING" => {
// Note: This argument is only supported on replicated commands since primary nodes replicate bloom objects
// deterministically using every global bloom config/property.
if idx >= (argc - 1) {
Expand Down Expand Up @@ -520,7 +520,7 @@ pub fn bloom_filter_insert(ctx: &Context, input_args: &[ValkeyString]) -> Valkey
}
};
}
"SEED" if replicated_cmd => {
"SEED" => {
// Note: This argument is only supported on replicated commands since primary nodes replicate bloom objects
// deterministically using every global bloom config/property.
if idx >= (argc - 1) {
Expand Down Expand Up @@ -555,6 +555,7 @@ pub fn bloom_filter_insert(ctx: &Context, input_args: &[ValkeyString]) -> Valkey
}
"ITEMS" => {
idx += 1;
items_provided = true;
break;
}
_ => {
Expand All @@ -563,7 +564,7 @@ pub fn bloom_filter_insert(ctx: &Context, input_args: &[ValkeyString]) -> Valkey
}
idx += 1;
}
if idx == argc && !replicated_cmd {
if idx == argc && items_provided {
// We expect the ITEMS <item> [<item> ...] argument to be provided on the BF.INSERT command used on primary nodes.
// For replicated commands, this is optional to allow BF.INSERT to be used to replicate bloom object creation
// commands without any items (BF.RESERVE).
Expand All @@ -578,7 +579,7 @@ pub fn bloom_filter_insert(ctx: &Context, input_args: &[ValkeyString]) -> Valkey
}
};
// Skip bloom filter size validation on replicated cmds.
let validate_size_limit = !replicated_cmd;
let validate_size_limit = !ctx.get_flags().contains(ContextFlags::REPLICATED);
let mut add_succeeded = false;
match value {
Some(bloom) => {
Expand All @@ -589,7 +590,7 @@ pub fn bloom_filter_insert(ctx: &Context, input_args: &[ValkeyString]) -> Valkey
bloom,
true,
&mut add_succeeded,
!replicated_cmd,
validate_size_limit,
);
let replicate_args = ReplicateArgs {
capacity: bloom.capacity(),
Expand Down Expand Up @@ -632,7 +633,7 @@ pub fn bloom_filter_insert(ctx: &Context, input_args: &[ValkeyString]) -> Valkey
&mut bloom,
true,
&mut add_succeeded,
!replicated_cmd,
validate_size_limit,
);
match filter_key.set_value(&BLOOM_TYPE, bloom) {
Ok(()) => {
Expand Down
6 changes: 3 additions & 3 deletions src/bloom/data_type.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,9 +83,6 @@ impl ValkeyDataType for BloomObject {
// Calculate the memory usage of the BloomFilter/s by summing up BloomFilter sizes as they are de-serialized.
let mut filters_memory_usage = 0;
for i in 0..num_filters {
let Ok(bitmap) = raw::load_string_buffer(rdb) else {
return None;
};
let Ok(capacity) = raw::load_unsigned(rdb) else {
return None;
};
Expand Down Expand Up @@ -119,6 +116,9 @@ impl ValkeyDataType for BloomObject {
} else {
capacity
};
let Ok(bitmap) = raw::load_string_buffer(rdb) else {
return None;
};
let filter =
BloomFilter::from_existing(bitmap.as_ref(), num_items as i64, capacity as i64);
if !is_seed_random && filter.seed() != configs::FIXED_SEED {
Expand Down
13 changes: 8 additions & 5 deletions src/bloom/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -824,7 +824,7 @@ mod tests {
add_items_till_capacity(
&mut bf,
initial_capacity + 1,
1,
add_operation_idx + 1,
&rand_prefix,
Some(BloomError::NonScalingFilterFull),
);
Expand Down Expand Up @@ -852,10 +852,13 @@ mod tests {
// Validate that the real fp_rate is not much more than the configured fp_rate.
fp_assert(error_count, num_operations, expected_fp_rate, fp_margin);
// Verify restore
let mut restore_bf = BloomObject::create_copy_from(&bf);
assert_eq!(
restore_bf.add_item(b"new_item", true),
Err(BloomError::NonScalingFilterFull)
let restore_bf = BloomObject::create_copy_from(&bf);
add_items_till_capacity(
&mut bf,
initial_capacity + 1,
add_operation_idx + 1,
&rand_prefix,
Some(BloomError::NonScalingFilterFull),
);
verify_restored_items(
&bf,
Expand Down
4 changes: 2 additions & 2 deletions src/configs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ pub fn on_string_config_set(
};
match name {
"bloom-fp-rate" => {
if !(BLOOM_FP_RATE_MIN..BLOOM_FP_RATE_MAX).contains(&value) {
if !(value > BLOOM_FP_RATE_MIN && value < BLOOM_FP_RATE_MAX) {
return Err(ValkeyError::Str(utils::ERROR_RATE_RANGE));
}
let mut fp_rate = BLOOM_FP_RATE_F64
Expand All @@ -96,7 +96,7 @@ pub fn on_string_config_set(
Ok(())
}
"bloom-tightening-ratio" => {
if !(BLOOM_TIGHTENING_RATIO_MIN..BLOOM_TIGHTENING_RATIO_MAX).contains(&value) {
if !(value > BLOOM_TIGHTENING_RATIO_MIN && value < BLOOM_TIGHTENING_RATIO_MAX) {
return Err(ValkeyError::Str(utils::TIGHTENING_RATIO_RANGE));
}
let mut tightening = BLOOM_TIGHTENING_F64
Expand Down
18 changes: 9 additions & 9 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,15 +91,15 @@ valkey_module! {
"bloom",
]
commands: [
["BF.ADD", bloom_add_command, "write fast deny-oom", 1, 1, 1, "bloom"],
["BF.MADD", bloom_madd_command, "write fast deny-oom", 1, 1, 1, "bloom"],
["BF.EXISTS", bloom_exists_command, "readonly fast", 1, 1, 1, "bloom"],
["BF.MEXISTS", bloom_mexists_command, "readonly fast", 1, 1, 1, "bloom"],
["BF.CARD", bloom_card_command, "readonly fast", 1, 1, 1, "bloom"],
["BF.RESERVE", bloom_reserve_command, "write fast deny-oom", 1, 1, 1, "bloom"],
["BF.INFO", bloom_info_command, "readonly fast", 1, 1, 1, "bloom"],
["BF.INSERT", bloom_insert_command, "write fast deny-oom", 1, 1, 1, "bloom"],
["BF.LOAD", bloom_load_command, "write fast deny-oom", 1, 1, 1, "bloom"]
["BF.ADD", bloom_add_command, "write fast deny-oom", 1, 1, 1, "fast write bloom"],
["BF.MADD", bloom_madd_command, "write fast deny-oom", 1, 1, 1, "fast write bloom"],
["BF.EXISTS", bloom_exists_command, "readonly fast", 1, 1, 1, "fast read bloom"],
["BF.MEXISTS", bloom_mexists_command, "readonly fast", 1, 1, 1, "fast read bloom"],
["BF.CARD", bloom_card_command, "readonly fast", 1, 1, 1, "fast read bloom"],
["BF.RESERVE", bloom_reserve_command, "write fast deny-oom", 1, 1, 1, "fast write bloom"],
["BF.INFO", bloom_info_command, "readonly fast", 1, 1, 1, "fast read bloom"],
["BF.INSERT", bloom_insert_command, "write fast deny-oom", 1, 1, 1, "fast write bloom"],
["BF.LOAD", bloom_load_command, "write deny-oom", 1, 1, 1, "write bloom"]
],
configurations: [
i64: [
Expand Down
8 changes: 4 additions & 4 deletions src/wrapper/bloom_callback.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,17 +36,17 @@ pub unsafe extern "C" fn bloom_rdb_save(rdb: *mut raw::RedisModuleIO, value: *mu
let filter_list = v.filters();
let mut filter_list_iter = filter_list.iter().peekable();
while let Some(filter) = filter_list_iter.next() {
raw::save_unsigned(rdb, filter.capacity() as u64);
if filter_list_iter.peek().is_none() {
raw::save_unsigned(rdb, filter.num_items() as u64);
}
let bloom = filter.raw_bloom();
let bitmap = bloom.as_slice();
raw::RedisModule_SaveStringBuffer.unwrap()(
rdb,
bitmap.as_ptr().cast::<c_char>(),
bitmap.len(),
);
raw::save_unsigned(rdb, filter.capacity() as u64);
if filter_list_iter.peek().is_none() {
raw::save_unsigned(rdb, filter.num_items() as u64);
}
}
}

Expand Down
92 changes: 53 additions & 39 deletions tests/test_bloom_acl_category.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
import pytest
from valkeytests.conftest import resource_port_tracker
from valkey_bloom_test_case import ValkeyBloomTestCaseBase
from util.waiters import *

class TestBloomACLCategory(ValkeyBloomTestCaseBase):

def test_bloom_acl_category(self):
def test_bloom_acl_category_permissions(self):
# List of bloom commands and the expected returns if the command is valid
bloom_commands = [
('BF.ADD add_key item', 1),
Expand All @@ -19,52 +20,65 @@ def test_bloom_acl_category(self):
client = self.server.get_new_client()
# Get a list of all commands with the acl category bloom
list_of_bloom_commands = client.execute_command("COMMAND LIST FILTERBY ACLCAT bloom")
# Create users with differnt acl permissions
client.execute_command("ACL SETUSER nonbloomuser1 on >bloom_pass -@bloom")
client.execute_command("ACL SETUSER nonbloomuser2 on >bloom_pass -@all")
client.execute_command("ACL SETUSER bloomuser1 on >bloom_pass ~* &* +@all ")
client.execute_command("ACL SETUSER bloomuser2 on >bloom_pass ~* &* -@all +@bloom ")
client.execute_command("ACL SETUSER bloomuser3 on >bloom_pass ~* &* -@all +@write +@read ")
client.execute_command("ACL SETUSER bloomuser4 on >bloom_pass ~* &* -@all +@write +@bloom")
# Switch to the users with no bloom command access and check error occurs as expected
for i in range(1, 3):
client.execute_command(f"AUTH nonbloomuser{i} bloom_pass")
for cmd in bloom_commands:
self.verify_invalid_user_permissions(client, cmd, list_of_bloom_commands)
# Switch to the users with bloom command access and check commands are run as expected
for i in range(1, 5):
client.execute_command(f"AUTH bloomuser{i} bloom_pass")
for cmd in bloom_commands:
self.verify_valid_user_permissions(client, cmd)
self.client.execute_command('FLUSHDB')
wait_for_equal(lambda: self.client.execute_command('DBSIZE'), 0)

# Create two users one with denied access to bloom commands and one with access to bloom commands and all keys
client.execute_command("ACL SETUSER nonbloomuser on >bloom_pass -@bloom")
client.execute_command("ACL SETUSER bloomuser on >bloom_pass +@bloom ~*")
def verify_valid_user_permissions(self, client, cmd):
cmd_name = cmd[0].split()[0]
try:
result = client.execute_command(cmd[0])
if cmd[0].startswith("BF.M"):
assert len(result) == cmd[1]
# The first add in a new bloom object should always return 1. For MEXISTS the first item we check will have been added as well so should exist
assert result[0] == 1
else:
assert result == cmd[1], f"{cmd_name} should work for default user"
except Exception as e:
assert False, f"bloomuser should be able to execute {cmd_name}: {str(e)}"

# Switch to the user with no bloom command access and check error occurs as expected
client.execute_command("AUTH nonbloomuser bloom_pass")
for cmd in bloom_commands:
cmd_name = cmd[0].split()[0]
# Check that each command we try to run appeared in the list of commands with the bloom acl category
assert cmd_name.encode() in list_of_bloom_commands
try:
result = client.execute_command(cmd[0])
assert False, f"User with no bloom category access shouldnt be able to run {cmd_name}"
except Exception as e:
assert str(e) == f"User nonbloomuser has no permissions to run the '{cmd_name}' command"

# Switch to the user with bloom command access and check commands are run as expected
client.execute_command(f"AUTH bloomuser bloom_pass")
for cmd in bloom_commands:
cmd_name = cmd[0].split()[0]
try:
result = client.execute_command(cmd[0])
if cmd[0].startswith("BF.M"):
assert len(result) == cmd[1]
# The first add in a new bloom object should always return 1. For MEXISTS the first item we check will have been added as well so should exist
assert result[0] == 1
else:
assert result == cmd[1], f"{cmd_name} should work for default user"
except Exception as e:
assert False, f"bloomuser should be able to execute {cmd_name}: {str(e)}"
def verify_invalid_user_permissions(self, client, cmd, list_of_bloom_commands):
cmd_name = cmd[0].split()[0]
# Check that each command we try to run appeared in the list of commands with the bloom acl category
assert cmd_name.encode() in list_of_bloom_commands
try:
result = client.execute_command(cmd[0])
assert False, f"User with no bloom category access shouldnt be able to run {cmd_name}"
except Exception as e:
assert f"has no permissions to run the '{cmd_name}' command" in str(e)

def test_bloom_command_acl_categories(self):
# List of bloom commands and their acl categories
bloom_commands = [
('BF.ADD', [b'write' , b'denyoom', b'module', b'fast']),
('BF.EXISTS', [b'readonly', b'module', b'fast']),
('BF.MADD', [b'write', b'denyoom', b'module', b'fast']),
('BF.MEXISTS', [b'readonly', b'module', b'fast']),
('BF.INSERT', [b'write', b'denyoom', b'module', b'fast']),
('BF.INFO', [b'readonly', b'module', b'fast']),
('BF.CARD', [b'readonly', b'module', b'fast']),
('BF.RESERVE', [b'write', b'denyoom', b'module', b'fast']),
('BF.ADD', [b'write' , b'denyoom', b'module', b'fast'], [b'@write', b'@fast', b'@bloom']),
('BF.EXISTS', [b'readonly', b'module', b'fast'], [b'@read', b'@fast', b'@bloom']),
('BF.MADD', [b'write', b'denyoom', b'module', b'fast'], [b'@write', b'@fast', b'@bloom']),
('BF.MEXISTS', [b'readonly', b'module', b'fast'], [b'@read', b'@fast', b'@bloom']),
('BF.INSERT', [b'write', b'denyoom', b'module', b'fast'], [b'@write', b'@fast', b'@bloom']),
('BF.INFO', [b'readonly', b'module', b'fast'], [b'@read', b'@fast', b'@bloom']),
('BF.CARD', [b'readonly', b'module', b'fast'], [b'@read', b'@fast', b'@bloom']),
('BF.RESERVE', [b'write', b'denyoom', b'module', b'fast'], [b'@write', b'@fast', b'@bloom']),
('BF.LOAD', [b'write', b'denyoom', b'module'], [b'@write', b'@bloom']),
]
for cmd in bloom_commands:
# Get the info of the commands and compare the acl categories
cmd_info = self.client.execute_command(f'COMMAND INFO {cmd[0]}')
assert cmd_info[0][2] == cmd[1]
assert cmd_info[0][6] == [b'@bloom']
for category in cmd[2]:
assert category in cmd_info[0][6]
File renamed without changes.
File renamed without changes.
12 changes: 8 additions & 4 deletions tests/test_bloom_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,16 @@ def test_bloom_command_error(self):
('bf.insert key CAPACITY 10000 ERROR 0.01 EXPANSION 0.99 NOCREATE NONSCALING ITEMS test1 test2 test3', 'bad expansion'),
('BF.INSERT KEY HELLO WORLD', 'unknown argument received'),
('BF.INSERT KEY error 2 ITEMS test1', '(0 < error rate range < 1)'),
('BF.INSERT KEY ERROR err ITEMS test1', 'bad error rate'),
('BF.INSERT KEY TIGHTENING tr ITEMS test1', 'bad tightening ratio'),
('BF.INSERT KEY TIGHTENING 2 ITEMS test1', '(0 < tightening ratio range < 1)'),
('BF.INSERT TEST_LIMIT ERROR 0.99999999999999999 ITEMS ERROR_RATE', '(0 < error rate range < 1)'),
('BF.INSERT TEST_LIMIT TIGHTENING 0.99999999999999999 ITEMS ERROR_RATE', '(0 < tightening ratio range < 1)'),
('BF.INSERT TEST_LIMIT CAPACITY 9223372036854775808 ITEMS CAP', 'bad capacity'),
('BF.INSERT TEST_LIMIT CAPACITY 0 ITEMS CAP0', '(capacity should be larger than 0)'),
('BF.INSERT TEST_LIMIT EXPANSION 4294967299 ITEMS EXPAN', 'bad expansion'),
('BF.INSERT TEST_NOCREATE NOCREATE ITEMS A B', 'not found'),
('BF.INSERT KEY HELLO', 'unknown argument received'),
('BF.RESERVE KEY String 100', 'bad error rate'),
('BF.RESERVE KEY 0.99999999999999999 3000', '(0 < error rate range < 1)'),
('BF.RESERVE KEY 2 100', '(0 < error rate range < 1)'),
Expand All @@ -61,9 +66,6 @@ def test_bloom_command_error(self):
('BF.INFO', 'wrong number of arguments for \'BF.INFO\' command'),
('bf.info key capacity size', 'wrong number of arguments for \'BF.INFO\' command'),
('BF.INSERT', 'wrong number of arguments for \'BF.INSERT\' command'),
('BF.INSERT KEY', 'wrong number of arguments for \'BF.INSERT\' command'),
('BF.INSERT KEY HELLO', 'wrong number of arguments for \'BF.INSERT\' command'),
('BF.INSERT MISS_ITEM EXPANSION 2', 'wrong number of arguments for \'BF.INSERT\' command'),
('BF.INSERT MISS_ITEM EXPANSION 2 ITEMS', 'wrong number of arguments for \'BF.INSERT\' command'),
('BF.INSERT MISS_VAL ERROR 0.5 EXPANSION', 'wrong number of arguments for \'BF.INSERT\' command'),
('BF.INSERT MISS_VAL ERROR 0.5 CAPACITY', 'wrong number of arguments for \'BF.INSERT\' command'),
Expand Down Expand Up @@ -107,6 +109,8 @@ def test_bloom_command_behavior(self):
('BF.INSERT TEST_EXPANSION EXPANSION 9 ITEMS ITEM', 1),
('BF.INSERT TEST_CAPACITY CAPACITY 2000 ITEMS ITEM', 1),
('BF.INSERT TEST_ITEMS ITEMS 1 2 3 EXPANSION 2', 5),
('BF.INSERT KEY', 0),
('BF.INSERT KEY EXPANSION 2', 0),
('BF.INFO TEST Capacity', 100),
('BF.INFO TEST ITEMS', 5),
('BF.INFO TEST filters', 1),
Expand Down Expand Up @@ -147,4 +151,4 @@ def test_bloom_command_behavior(self):
assert bf_info[capacity_index] == self.client.execute_command('BF.INFO BF_INFO CAPACITY') == 2000
assert bf_info[filter_index] == self.client.execute_command('BF.INFO BF_INFO FILTERS') == 1
assert bf_info[item_index] == self.client.execute_command('BF.INFO BF_INFO ITEMS') == 0
assert bf_info[expansion_index] == self.client.execute_command('BF.INFO BF_INFO EXPANSION') == None
assert bf_info[expansion_index] == self.client.execute_command('BF.INFO BF_INFO EXPANSION') == None
File renamed without changes.
Loading

0 comments on commit a9b578e

Please sign in to comment.