diff --git a/src/bloom/data_type.rs b/src/bloom/data_type.rs
index 215f182..187afc5 100644
--- a/src/bloom/data_type.rs
+++ b/src/bloom/data_type.rs
@@ -6,6 +6,7 @@ use crate::configs::{
 use crate::metrics::BLOOM_NUM_OBJECTS;
 use crate::metrics::BLOOM_OBJECT_TOTAL_MEMORY_BYTES;
 use crate::wrapper::bloom_callback;
+use crate::wrapper::digest::Digest;
 use crate::MODULE_NAME;
 use std::mem;
 use std::os::raw::c_int;
@@ -26,10 +27,10 @@ pub static BLOOM_FILTER_TYPE: ValkeyType = ValkeyType::new(
         rdb_load: Some(bloom_callback::bloom_rdb_load),
         rdb_save: Some(bloom_callback::bloom_rdb_save),
         aof_rewrite: Some(bloom_callback::bloom_aof_rewrite),
+        digest: Some(bloom_callback::bloom_digest),
 
         mem_usage: Some(bloom_callback::bloom_mem_usage),
-        // TODO
-        digest: None,
         free: Some(bloom_callback::bloom_free),
 
         aux_load: Some(bloom_callback::bloom_aux_load),
@@ -54,6 +55,7 @@ pub static BLOOM_FILTER_TYPE: ValkeyType = ValkeyType::new(
 
 pub trait ValkeyDataType {
     fn load_from_rdb(rdb: *mut raw::RedisModuleIO, encver: i32) -> Option<Self> where Self: Sized;
+    fn debug_digest(&self, dig: Digest);
 }
 
 impl ValkeyDataType for BloomFilterType {
@@ -126,6 +128,22 @@ impl ValkeyDataType for BloomFilterType {
         };
         Some(item)
     }
+
+    /// Generates a digest of the Bloom object for DEBUG DIGEST and DEBUG DIGEST-VALUE.
+    fn debug_digest(&self, mut dig: Digest) {
+        dig.add_long_long(self.expansion.into());
+        // `f64` has no dedicated digest method, so add its little-endian bytes.
+        dig.add_string_buffer(&self.fp_rate.to_le_bytes());
+        for filter in &self.filters {
+            dig.add_string_buffer(&filter.bloom.bitmap());
+            for &(key1, key2) in &filter.sip_keys() {
+                dig.add_long_long(key1 as i64);
+                dig.add_long_long(key2 as i64);
+            }
+            dig.add_long_long(filter.num_items.into());
+            dig.add_long_long(filter.capacity.into());
+        }
+        dig.end_sequence();
+    }
 }
 
 /// Load the auxiliary data outside of the regular keyspace from the RDB file
diff --git a/src/bloom/utils.rs b/src/bloom/utils.rs
index 55e327f..84adbcb 100644
--- a/src/bloom/utils.rs
+++ b/src/bloom/utils.rs
@@ -411,6 +411,10 @@
         self.bloom.set(item)
     }
 
+    /// Getter for the SipHash keys of the underlying bloom filter.
+    pub fn sip_keys(&self) -> [(u64, u64); 2] {
+        self.bloom.sip_keys()
+    }
+
     /// Create a new BloomFilter from an existing BloomFilter object (COPY command).
     pub fn create_copy_from(bf: &BloomFilter) -> BloomFilter {
         BloomFilter::from_existing(
diff --git a/src/wrapper/bloom_callback.rs b/src/wrapper/bloom_callback.rs
index caa882b..e2bde8e 100644
--- a/src/wrapper/bloom_callback.rs
+++ b/src/wrapper/bloom_callback.rs
@@ -2,6 +2,7 @@ use crate::bloom;
 use crate::bloom::data_type::ValkeyDataType;
 use crate::bloom::utils::BloomFilterType;
 use crate::configs;
+use crate::wrapper::digest::Digest;
 use std::ffi::CString;
 use std::os::raw::{c_char, c_int, c_void};
 use std::ptr::null_mut;
@@ -118,6 +119,14 @@ pub unsafe extern "C" fn bloom_copy(
     Box::into_raw(bb).cast::<c_void>()
 }
 
+/// # Safety
+/// Raw handler for the Bloom digest callback.
+pub unsafe extern "C" fn bloom_digest(md: *mut raw::RedisModuleDigest, value: *mut c_void) {
+    let mut dig = Digest::new(md);
+    let val = &*(value.cast::<BloomFilterType>());
+    val.debug_digest(dig);
+}
+
 /// # Safety
 /// Raw handler for the Bloom object's free_effort callback.
 pub unsafe extern "C" fn bloom_free_effort(
diff --git a/src/wrapper/digest.rs b/src/wrapper/digest.rs
new file mode 100644
index 0000000..94ecabe
--- /dev/null
+++ b/src/wrapper/digest.rs
@@ -0,0 +1,80 @@
+use std::os::raw::c_char;
+use valkey_module::raw;
+use valkey_module::ValkeyString;
+
+/// `Digest` is a high-level Rust interface to the Valkey module C API,
+/// abstracting away the raw C FFI calls.
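+///
+/// A minimal usage sketch (the `MyType` data type and its `count` field are
+/// hypothetical), mirroring how `bloom_callback::bloom_digest` drives this
+/// wrapper from a module type's digest callback:
+///
+/// ```ignore
+/// unsafe extern "C" fn my_digest(md: *mut raw::RedisModuleDigest, value: *mut c_void) {
+///     // Wrap the raw digest pointer, feed in the object's fields, and
+///     // close the element with an end-of-sequence marker.
+///     let mut dig = Digest::new(md);
+///     let val = &*(value.cast::<MyType>());
+///     dig.add_long_long(val.count);
+///     dig.end_sequence();
+/// }
+/// ```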
+pub struct Digest {
+    pub dig: *mut raw::RedisModuleDigest,
+}
+
+impl Digest {
+    pub const fn new(dig: *mut raw::RedisModuleDigest) -> Self {
+        Self { dig }
+    }
+
+    /// Returns the key name of this [`Digest`].
+    ///
+    /// # Panics
+    ///
+    /// Will panic if `RedisModule_GetKeyNameFromDigest` is missing in redismodule.h.
+    pub fn get_key_name(&self) -> ValkeyString {
+        ValkeyString::from_redis_module_string(std::ptr::null_mut(), unsafe {
+            raw::RedisModule_GetKeyNameFromDigest
+                .expect("RedisModule_GetKeyNameFromDigest is not available.")(self.dig)
+                .cast_mut()
+        })
+    }
+
+    /// Returns the database ID of this [`Digest`].
+    ///
+    /// # Panics
+    ///
+    /// Will panic if `RedisModule_GetDbIdFromDigest` is missing in redismodule.h.
+    pub fn get_db_id(&self) -> i32 {
+        unsafe {
+            raw::RedisModule_GetDbIdFromDigest
+                .expect("RedisModule_GetDbIdFromDigest is not available.")(self.dig)
+        }
+    }
+
+    /// Adds a new element to this [`Digest`].
+    ///
+    /// # Panics
+    ///
+    /// Will panic if `RedisModule_DigestAddStringBuffer` is missing in redismodule.h.
+    pub fn add_string_buffer(&mut self, ele: &[u8]) {
+        unsafe {
+            raw::RedisModule_DigestAddStringBuffer
+                .expect("RedisModule_DigestAddStringBuffer is not available.")(
+                self.dig,
+                ele.as_ptr().cast::<c_char>(),
+                ele.len(),
+            )
+        }
+    }
+
+    /// Similar to [`Digest::add_string_buffer`], but takes an [`i64`].
+    ///
+    /// # Panics
+    ///
+    /// Will panic if `RedisModule_DigestAddLongLong` is missing in redismodule.h.
+    pub fn add_long_long(&mut self, ll: i64) {
+        unsafe {
+            raw::RedisModule_DigestAddLongLong
+                .expect("RedisModule_DigestAddLongLong is not available.")(self.dig, ll)
+        }
+    }
+
+    /// Ends the current sequence in this [`Digest`].
+    ///
+    /// # Panics
+    ///
+    /// Will panic if `RedisModule_DigestEndSequence` is missing in redismodule.h.
+    pub fn end_sequence(&mut self) {
+        unsafe {
+            raw::RedisModule_DigestEndSequence
+                .expect("RedisModule_DigestEndSequence is not available.")(self.dig)
+        }
+    }
+}
diff --git a/src/wrapper/mod.rs b/src/wrapper/mod.rs
index 09023e2..8610a17 100644
--- a/src/wrapper/mod.rs
+++ b/src/wrapper/mod.rs
@@ -1 +1,2 @@
 pub mod bloom_callback;
+pub mod digest;
diff --git a/tests/test_aofrewrite.py b/tests/test_aofrewrite.py
index 42db513..fc94a1d 100644
--- a/tests/test_aofrewrite.py
+++ b/tests/test_aofrewrite.py
@@ -20,7 +20,11 @@ def test_basic_aofrewrite_and_restore(self):
         bf_info_result_1 = client.execute_command('BF.INFO testSave')
         assert(len(bf_info_result_1)) != 0
         curr_item_count_1 = client.info_obj().num_keys()
-
+        # Capture digests before the rewrite; all zeros would mean an empty dataset.
+        server_digest = client.debug_digest()
+        assert server_digest is not None and server_digest != b'0000000000000000000000000000000000000000'
+        object_digest = client.execute_command('DEBUG DIGEST-VALUE testSave')
+
         # save aof, restart server
         client.bgrewriteaof()
         self.server.wait_for_action_done(ValkeyAction.AOF_REWRITE)
@@ -28,6 +32,10 @@
         time.sleep(1)
         self.server.restart(remove_rdb=False, remove_nodes_conf=False, connect_client=True)
         assert self.server.is_alive()
+        restored_server_digest = client.debug_digest()
+        restored_object_digest = client.execute_command('DEBUG DIGEST-VALUE testSave')
+        assert restored_server_digest == server_digest
+        assert restored_object_digest == object_digest
 
         # verify restore results
         curr_item_count_2 = client.info_obj().num_keys()
@@ -49,12 +57,22 @@ def test_aofrewrite_bloomfilter_metrics(self):
         for var in variables:
             self.client.execute_command(f'BF.ADD key1 {var}')
 
+        # Capture the server-wide and per-key digests before the rewrite.
+        server_digest = self.client.debug_digest()
+        assert server_digest is not None and server_digest != b'0000000000000000000000000000000000000000'
+        object_digest = self.client.execute_command('DEBUG DIGEST-VALUE key1')
+
         # save aof, restart server
         self.client.bgrewriteaof()
         self.server.wait_for_action_done(ValkeyAction.AOF_REWRITE)
         # restart server
         time.sleep(1)
         self.server.restart(remove_rdb=False, remove_nodes_conf=False, connect_client=True)
+        assert self.server.is_alive()
+        restored_server_digest = self.client.debug_digest()
+        restored_object_digest = self.client.execute_command('DEBUG DIGEST-VALUE key1')
+        assert restored_server_digest == server_digest
+        assert restored_object_digest == object_digest
 
         # Check info for scaled bloomfilter matches metrics data for bloomfilter
         new_info_obj = self.client.execute_command(f'BF.INFO key1')
diff --git a/tests/test_basic.py b/tests/test_basic.py
index ae0357b..35a3510 100644
--- a/tests/test_basic.py
+++ b/tests/test_basic.py
@@ -39,10 +39,19 @@ def test_copy_and_exists_cmd(self):
         assert client.execute_command('EXISTS filter') == 1
         mexists_result = client.execute_command('BF.MEXISTS filter item1 item2 item3 item4')
         assert len(madd_result) == 4 and len(mexists_result) == 4
+        # Capture digests before the copy.
+        server_digest = client.debug_digest()
+        assert server_digest is not None and server_digest != b'0000000000000000000000000000000000000000'
+        object_digest = client.execute_command('DEBUG DIGEST-VALUE filter')
         assert client.execute_command('COPY filter new_filter') == 1
+        copied_server_digest = client.debug_digest()
+        assert copied_server_digest is not None and copied_server_digest != b'0000000000000000000000000000000000000000'
+        copied_object_digest = client.execute_command('DEBUG DIGEST-VALUE new_filter')
         assert client.execute_command('EXISTS new_filter') == 1
         copy_mexists_result = client.execute_command('BF.MEXISTS new_filter item1 item2 item3 item4')
         assert mexists_result == copy_mexists_result
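+        # COPY must preserve the per-key digest, while the server-wide digest
+        # changes because the keyspace gained a key.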
+        assert server_digest != copied_server_digest
+        assert copied_object_digest == object_digest
 
     def test_memory_usage_cmd(self):
         client = self.server.get_new_client()
@@ -240,3 +249,45 @@ def test_bloom_expiration(self):
         assert client.execute_command('TTL TEST_PERSIST') > 0
         assert client.execute_command('PERSIST TEST_PERSIST') == 1
         assert client.execute_command('TTL TEST_PERSIST') == -1
+
+    def test_debug_cmd(self):
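+        # DEBUG DIGEST-VALUE returns a digest of a single key's value, while
+        # DEBUG DIGEST covers the entire dataset.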
+        client = self.server.get_new_client()
+        default_obj = client.execute_command('BF.RESERVE default_obj 0.001 1000')
+        default_object_digest = client.execute_command('DEBUG DIGEST-VALUE default_obj')
+
+        # scenario1 validates that digests differ on bloom objects (with the same properties) when different items are added.
+        scenario1_obj = client.execute_command('BF.INSERT scenario1 error 0.001 capacity 1000 items 1')
+        scenario1_object_digest = client.execute_command('DEBUG DIGEST-VALUE scenario1')
+        assert scenario1_obj != default_obj
+        assert scenario1_object_digest != default_object_digest
+
+        # scenario2 validates that digests differ on bloom objects with different false positive rates.
+        scenario2_obj = client.execute_command('BF.INSERT scenario2 error 0.002 capacity 1000 items 1')
+        scenario2_object_digest = client.execute_command('DEBUG DIGEST-VALUE scenario2')
+        assert scenario2_obj != default_obj
+        assert scenario2_object_digest != default_object_digest
+
+        # scenario3 validates that digests differ on bloom objects with different expansion rates.
+        scenario3_obj = client.execute_command('BF.INSERT scenario3 error 0.001 capacity 1000 expansion 3 items 1')
+        scenario3_object_digest = client.execute_command('DEBUG DIGEST-VALUE scenario3')
+        assert scenario3_obj != default_obj
+        assert scenario3_object_digest != default_object_digest
+
+        # scenario4 validates that digests differ on bloom objects with different capacities.
+        scenario4_obj = client.execute_command('BF.INSERT scenario4 error 0.001 capacity 2000 items 1')
+        scenario4_object_digest = client.execute_command('DEBUG DIGEST-VALUE scenario4')
+        assert scenario4_obj != default_obj
+        assert scenario4_object_digest != default_object_digest
+
+        # scenario5 validates that digests are equal on bloom objects with the same properties and the same items.
+        scenario5_obj = client.execute_command('BF.INSERT scenario5 error 0.001 capacity 1000 items 1')
+        scenario5_object_digest = client.execute_command('DEBUG DIGEST-VALUE scenario5')
+        assert scenario5_obj != default_obj
+        assert scenario5_object_digest != default_object_digest
+
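+        # After these MADD calls both filters hold exactly the items {1, 2, 3} with
+        # identical properties, so their digests should be equal.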
+        client.execute_command('BF.MADD default_obj 1 2 3')
+        client.execute_command('BF.MADD scenario5 2 3')
+        madd_default_object_digest = client.execute_command('DEBUG DIGEST-VALUE default_obj')
+        madd_scenario_object_digest = client.execute_command('DEBUG DIGEST-VALUE scenario5')
+        assert madd_scenario_object_digest == madd_default_object_digest
diff --git a/tests/test_correctness.py b/tests/test_correctness.py
index 9d66f36..391e387 100644
--- a/tests/test_correctness.py
+++ b/tests/test_correctness.py
@@ -67,7 +67,6 @@ def test_scaling_filter(self):
         filter_name = "filter1"
         # Create a scaling bloom filter and validate its behavior.
         assert client.execute_command(f'BF.RESERVE {filter_name} {expected_fp_rate} {initial_capacity} EXPANSION {expansion}') == b"OK"
-
         info = client.execute_command(f'BF.INFO {filter_name}')
         it = iter(info)
         info_dict = dict(zip(it, it))
diff --git a/tests/test_replication.py b/tests/test_replication.py
index fdb3dbb..36012f2 100644
--- a/tests/test_replication.py
+++ b/tests/test_replication.py
@@ -68,6 +68,16 @@ def test_replication_behavior(self):
                 assert primary_cmd_stats['cmdstat_BF.ADD']["calls"] == 2 and replica_cmd_stats['cmdstat_BF.ADD']["calls"] == 1
             else:
                 assert primary_cmd_stats['cmdstat_' + prefix]["calls"] == (expected_calls + 1) and replica_cmd_stats['cmdstat_' + prefix]["calls"] == expected_calls
+
+            # Verify that the primary and replica agree on the server-wide and per-key digests.
+            server_digest_primary = self.client.debug_digest()
+            assert server_digest_primary is not None and server_digest_primary != b'0000000000000000000000000000000000000000'
+            object_digest_primary = self.client.execute_command('DEBUG DIGEST-VALUE key')
+            server_digest_replica = self.replicas[0].client.debug_digest()
+            assert server_digest_primary == server_digest_replica
+            debug_digest_replica = self.replicas[0].client.execute_command('DEBUG DIGEST-VALUE key')
+            assert object_digest_primary == debug_digest_replica
+
             self.client.execute_command('FLUSHALL')
             self.waitForReplicaToSyncUp(self.replicas[0])
             self.client.execute_command('CONFIG RESETSTAT')
diff --git a/tests/test_save_and_restore.py b/tests/test_save_and_restore.py
index c160f85..3ce539e 100644
--- a/tests/test_save_and_restore.py
+++ b/tests/test_save_and_restore.py
@@ -14,7 +14,11 @@ def test_basic_save_and_restore(self):
         bf_info_result_1 = client.execute_command('BF.INFO testSave')
         assert(len(bf_info_result_1)) != 0
         curr_item_count_1 = client.info_obj().num_keys()
-
+        # Capture the server-wide and per-key digests before the save.
+        server_digest = client.debug_digest()
+        assert server_digest is not None and server_digest != b'0000000000000000000000000000000000000000'
+        object_digest = client.execute_command('DEBUG DIGEST-VALUE testSave')
+
         # save rdb, restart server
         client.bgsave()
         self.server.wait_for_save_done()
@@ -26,6 +30,10 @@
         assert self.server.is_alive()
         assert uptime_in_sec_1 > uptime_in_sec_2
         assert self.server.is_rdb_done_loading()
+        restored_server_digest = client.debug_digest()
+        restored_object_digest = client.execute_command('DEBUG DIGEST-VALUE testSave')
+        assert restored_server_digest == server_digest
+        assert restored_object_digest == object_digest
 
         # verify restore results
         curr_item_count_2 = client.info_obj().num_keys()
diff --git a/tests/valkey_bloom_test_case.py b/tests/valkey_bloom_test_case.py
index 7e3129d..4e42d3e 100644
--- a/tests/valkey_bloom_test_case.py
+++ b/tests/valkey_bloom_test_case.py
@@ -110,6 +110,9 @@ def validate_copied_bloom_correctness(self, client, original_filter_name, item_p
         """
         copy_filter_name = "filter_copy"
        assert client.execute_command(f'COPY {original_filter_name} {copy_filter_name}') == 1
+        # A copied bloom object must produce the same digest as its source.
+        object_digest = client.execute_command(f'DEBUG DIGEST-VALUE {original_filter_name}')
+        copied_object_digest = client.execute_command(f'DEBUG DIGEST-VALUE {copy_filter_name}')
+        assert copied_object_digest == object_digest
         assert client.execute_command('DBSIZE') == 2
         copy_info = client.execute_command(f'BF.INFO {copy_filter_name}')
         copy_it = iter(copy_info)
diff --git a/tests/valkeytests/valkey_test_case.py b/tests/valkeytests/valkey_test_case.py
index 47a5f6c..656ae8c 100644
--- a/tests/valkeytests/valkey_test_case.py
+++ b/tests/valkeytests/valkey_test_case.py
@@ -474,7 +474,7 @@ def port_tracker_fixture(self, resource_port_tracker):
         self.port_tracker = resource_port_tracker
 
     def _get_valkey_args(self):
-        self.args.update({"maxmemory":self.maxmemory, "maxmemory-policy":"allkeys-random", "activerehashing":"yes", "databases": self.num_dbs, "repl-diskless-sync": "yes", "save": ""})
+        # DEBUG is a protected command that is disabled by default; enable it so tests can run DEBUG DIGEST.
+        self.args.update({"maxmemory":self.maxmemory, "maxmemory-policy":"allkeys-random", "activerehashing":"yes", "databases": self.num_dbs, "repl-diskless-sync": "yes", "save": "", "enable-debug-command":"yes"})
         self.args.update(self.get_custom_args())
         return self.args