Support for DEBUG DIGEST module data type callback (#21)
* Support for DEBUG DIGEST module data type callback

Signed-off-by: Nihal Mehta <[email protected]>

* Update test cases

Signed-off-by: Nihal Mehta <[email protected]>

* Move digest to wrapper

Signed-off-by: Nihal Mehta <[email protected]>

* Update tests

Signed-off-by: Nihal Mehta <[email protected]>

* Add more scenarios for debug test

Signed-off-by: Nihal Mehta <[email protected]>

* Clean code and add scenario for debug test

Signed-off-by: Nihal Mehta <[email protected]>

---------

Signed-off-by: Nihal Mehta <[email protected]>
nnmehta authored and KarthikSubbarao committed Nov 30, 2024
1 parent a33e0e3 commit 7c25468
Showing 12 changed files with 206 additions and 5 deletions.
20 changes: 19 additions & 1 deletion src/bloom/data_type.rs
@@ -6,6 +6,7 @@ use crate::configs::{
use crate::metrics::BLOOM_NUM_OBJECTS;
use crate::metrics::BLOOM_OBJECT_TOTAL_MEMORY_BYTES;
use crate::wrapper::bloom_callback;
use crate::wrapper::digest::Digest;
use crate::MODULE_NAME;
use std::mem;
use std::os::raw::c_int;
@@ -26,10 +27,10 @@ pub static BLOOM_FILTER_TYPE: ValkeyType = ValkeyType::new(
rdb_load: Some(bloom_callback::bloom_rdb_load),
rdb_save: Some(bloom_callback::bloom_rdb_save),
aof_rewrite: Some(bloom_callback::bloom_aof_rewrite),
+digest: Some(bloom_callback::bloom_digest),

mem_usage: Some(bloom_callback::bloom_mem_usage),
// TODO
-digest: None,
free: Some(bloom_callback::bloom_free),

aux_load: Some(bloom_callback::bloom_aux_load),
@@ -54,6 +55,7 @@ pub static BLOOM_FILTER_TYPE: ValkeyType = ValkeyType::new(

pub trait ValkeyDataType {
fn load_from_rdb(rdb: *mut raw::RedisModuleIO, encver: i32) -> Option<BloomFilterType>;
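/// Generate a digest over the fields that define the object's logical equality.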
fn debug_digest(&self, dig: Digest);
}

impl ValkeyDataType for BloomFilterType {
@@ -126,6 +128,22 @@ impl ValkeyDataType for BloomFilterType {
};
Some(item)
}

/// Function that is used to generate a digest on the Bloom Object.
fn debug_digest(&self, mut dig: Digest) {
dig.add_long_long(self.expansion.into());
dig.add_string_buffer(&self.fp_rate.to_le_bytes());
for filter in &self.filters {
dig.add_string_buffer(&filter.bloom.bitmap());
for &(key1, key2) in &filter.sip_keys() {
dig.add_long_long(key1 as i64);
dig.add_long_long(key2 as i64);
}
dig.add_long_long(filter.num_items.into());
dig.add_long_long(filter.capacity.into());
}
dig.end_sequence();
}
}

/// Load the auxiliary data outside of the regular keyspace from the RDB file
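The `debug_digest` implementation above is the general recipe for giving any module data type DEBUG DIGEST support: hash every field that defines logical equality, in a fixed order, then seal the element with `end_sequence()`. A minimal sketch of the same pattern for a hypothetical `Counter` type (the type and its fields are illustrative, not part of this commit):

use crate::wrapper::digest::Digest;

/// Hypothetical data type used only to illustrate the digest pattern.
struct Counter {
    hits: i64,
    label: String,
}

impl Counter {
    /// Hash every field that defines logical equality, in a fixed order,
    /// then close the element so it can be folded into the key's digest.
    fn debug_digest(&self, mut dig: Digest) {
        dig.add_long_long(self.hits);
        dig.add_string_buffer(self.label.as_bytes());
        dig.end_sequence();
    }
}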
4 changes: 4 additions & 0 deletions src/bloom/utils.rs
@@ -411,6 +411,10 @@ impl BloomFilter {
self.bloom.set(item)
}

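/// Expose the sip keys of the underlying bloom filter so the digest can include them.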
pub fn sip_keys(&self) -> [(u64, u64); 2] {
self.bloom.sip_keys()
}

/// Create a new BloomFilter from an existing BloomFilter object (COPY command).
pub fn create_copy_from(bf: &BloomFilter) -> BloomFilter {
BloomFilter::from_existing(
9 changes: 9 additions & 0 deletions src/wrapper/bloom_callback.rs
@@ -2,6 +2,7 @@ use crate::bloom;
use crate::bloom::data_type::ValkeyDataType;
use crate::bloom::utils::BloomFilterType;
use crate::configs;
use crate::wrapper::digest::Digest;
use std::ffi::CString;
use std::os::raw::{c_char, c_int, c_void};
use std::ptr::null_mut;
@@ -118,6 +119,14 @@ pub unsafe extern "C" fn bloom_copy(
Box::into_raw(bb).cast::<libc::c_void>()
}

/// # Safety
/// Raw handler for the Bloom digest callback.
pub unsafe extern "C" fn bloom_digest(md: *mut raw::RedisModuleDigest, value: *mut c_void) {
let dig = Digest::new(md);
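// SAFETY: the server passes the value pointer stored for this data type's key,
// so casting it back to BloomFilterType is sound.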
let val = &*(value.cast::<BloomFilterType>());
val.debug_digest(dig);
}

/// # Safety
/// Raw handler for the Bloom object's free_effort callback.
pub unsafe extern "C" fn bloom_free_effort(
80 changes: 80 additions & 0 deletions src/wrapper/digest.rs
@@ -0,0 +1,80 @@
use std::os::raw::c_char;
use valkey_module::raw;
use valkey_module::ValkeyString;

/// `Digest` is a high-level Rust interface to the Valkey module C API,
/// abstracting away the raw C FFI calls.
pub struct Digest {
pub dig: *mut raw::RedisModuleDigest,
}

impl Digest {
pub const fn new(dig: *mut raw::RedisModuleDigest) -> Self {
Self { dig }
}

/// Returns the key name of this [`Digest`].
///
/// # Panics
///
/// Will panic if `RedisModule_GetKeyNameFromDigest` is missing in redismodule.h
pub fn get_key_name(&self) -> ValkeyString {
ValkeyString::from_redis_module_string(std::ptr::null_mut(), unsafe {
raw::RedisModule_GetKeyNameFromDigest
.expect("RedisModule_GetKeyNameFromDigest is not available.")(self.dig)
.cast_mut()
})
}

/// Returns the database ID of this [`Digest`].
///
/// # Panics
///
/// Will panic if `RedisModule_GetDbIdFromDigest` is missing in redismodule.h
pub fn get_db_id(&self) -> i32 {
unsafe {
raw::RedisModule_GetDbIdFromDigest
.expect("RedisModule_GetDbIdFromDigest is not available.")(self.dig)
}
}

/// Adds a new element to this [`Digest`].
///
/// # Panics
///
/// Will panic if `RedisModule_DigestAddStringBuffer` is missing in redismodule.h
pub fn add_string_buffer(&mut self, ele: &[u8]) {
unsafe {
raw::RedisModule_DigestAddStringBuffer
.expect("RedisModule_DigestAddStringBuffer is not available.")(
self.dig,
ele.as_ptr().cast::<c_char>(),
ele.len(),
)
}
}

/// Similar to [`Digest::add_string_buffer`], but takes [`i64`].
///
/// # Panics
///
/// Will panic if `RedisModule_DigestAddLongLong` is missing in redismodule.h
pub fn add_long_long(&mut self, ll: i64) {
unsafe {
raw::RedisModule_DigestAddLongLong
.expect("RedisModule_DigestAddLongLong is not available.")(self.dig, ll)
}
}

/// Ends the current sequence in this [`Digest`].
///
/// # Panics
///
/// Will panic if `RedisModule_DigestEndSequence` is missing in redismodule.h
pub fn end_sequence(&mut self) {
unsafe {
raw::RedisModule_DigestEndSequence
.expect("RedisModule_DigestEndSequence is not available.")(self.dig)
}
}
}
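A usage note on the wrapper above: everything added since the last `end_sequence()` call forms one ordered element, and the server combines sealed elements order-independently. `debug_digest` in data_type.rs seals once at the end because the filter vector is ordered; an unordered collection would instead seal per element. A sketch under that assumption (not code from this commit):

use crate::wrapper::digest::Digest;

/// Digest an unordered collection: sealing each element separately makes
/// the combined digest independent of iteration order.
fn digest_unordered(dig: &mut Digest, elements: &[Vec<u8>]) {
    for ele in elements {
        dig.add_string_buffer(ele);
        dig.end_sequence();
    }
}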
1 change: 1 addition & 0 deletions src/wrapper/mod.rs
@@ -1 +1,2 @@
pub mod bloom_callback;
pub mod digest;
20 changes: 19 additions & 1 deletion tests/test_aofrewrite.py
@@ -20,14 +20,22 @@ def test_basic_aofrewrite_and_restore(self):
bf_info_result_1 = client.execute_command('BF.INFO testSave')
assert len(bf_info_result_1) != 0
curr_item_count_1 = client.info_obj().num_keys()

# capture debug digests before the rewrite
server_digest = client.debug_digest()
assert server_digest not in (None, '0' * 40, b'0' * 40)
object_digest = client.execute_command('DEBUG DIGEST-VALUE testSave')

# save aof, restart server
client.bgrewriteaof()
self.server.wait_for_action_done(ValkeyAction.AOF_REWRITE)
# Keep the server running for 1 second more to have a larger uptime.
time.sleep(1)
self.server.restart(remove_rdb=False, remove_nodes_conf=False, connect_client=True)
assert self.server.is_alive()
restored_server_digest = client.debug_digest()
restored_object_digest = client.execute_command('DEBUG DIGEST-VALUE testSave')
assert restored_server_digest == server_digest
assert restored_object_digest == object_digest

# verify restore results
curr_item_count_2 = client.info_obj().num_keys()
@@ -49,12 +57,22 @@ def test_aofrewrite_bloomfilter_metrics(self):
for var in variables:
self.client.execute_command(f'BF.ADD key1 {var}')

# capture debug digests before the rewrite
server_digest = self.client.debug_digest()
assert server_digest not in (None, '0' * 40, b'0' * 40)
object_digest = self.client.execute_command('DEBUG DIGEST-VALUE key1')

# save aof, restart server
self.client.bgrewriteaof()
self.server.wait_for_action_done(ValkeyAction.AOF_REWRITE)
# restart server
time.sleep(1)
self.server.restart(remove_rdb=False, remove_nodes_conf=False, connect_client=True)
assert self.server.is_alive()
restored_server_digest = self.client.debug_digest()
restored_object_digest = self.client.execute_command('DEBUG DIGEST-VALUE key1')
assert restored_server_digest == server_digest
assert restored_object_digest == object_digest

# Check info for scaled bloomfilter matches metrics data for bloomfilter
new_info_obj = self.client.execute_command(f'BF.INFO key1')
51 changes: 51 additions & 0 deletions tests/test_basic.py
@@ -39,10 +39,19 @@ def test_copy_and_exists_cmd(self):
assert client.execute_command('EXISTS filter') == 1
mexists_result = client.execute_command('BF.MEXISTS filter item1 item2 item3 item4')
assert len(madd_result) == 4 and len(mexists_result) == 4
# capture debug digests before the copy
server_digest = client.debug_digest()
assert server_digest not in (None, '0' * 40, b'0' * 40)
object_digest = client.execute_command('DEBUG DIGEST-VALUE filter')
assert client.execute_command('COPY filter new_filter') == 1
copied_server_digest = client.debug_digest()
assert copied_server_digest not in (None, '0' * 40, b'0' * 40)
copied_object_digest = client.execute_command('DEBUG DIGEST-VALUE new_filter')
assert client.execute_command('EXISTS new_filter') == 1
copy_mexists_result = client.execute_command('BF.MEXISTS new_filter item1 item2 item3 item4')
assert mexists_result == copy_mexists_result
assert server_digest != copied_server_digest
assert copied_object_digest == object_digest

def test_memory_usage_cmd(self):
client = self.server.get_new_client()
@@ -240,3 +249,45 @@ def test_bloom_expiration(self):
assert client.execute_command('TTL TEST_PERSIST') > 0
assert client.execute_command('PERSIST TEST_PERSIST') == 1
assert client.execute_command('TTL TEST_PERSIST') == -1

def test_debug_cmd(self):
client = self.server.get_new_client()
default_obj = client.execute_command('BF.RESERVE default_obj 0.001 1000')
default_object_digest = client.execute_command('DEBUG DIGEST-VALUE default_obj')

# scenario1 validates that digest differs on bloom objects (with same properties) when different items are added.
scenario1_obj = client.execute_command('BF.INSERT scenario1 error 0.001 capacity 1000 items 1')
scenario1_object_digest = client.execute_command('DEBUG DIGEST-VALUE scenario1')
assert scenario1_obj != default_obj
assert scenario1_object_digest != default_object_digest

# scenario2 validates that digest differs on bloom objects with different false positive rate.
scenario2_obj = client.execute_command('BF.INSERT scenario2 error 0.002 capacity 1000 items 1')
scenario2_object_digest = client.execute_command('DEBUG DIGEST-VALUE scenario2')
assert scenario2_obj != default_obj
assert scenario2_object_digest != default_object_digest

# scenario3 validates that digest differs on bloom objects with different expansion.
scenario3_obj = client.execute_command('BF.INSERT scenario3 error 0.002 capacity 1000 expansion 3 items 1')
scenario3_object_digest = client.execute_command('DEBUG DIGEST-VALUE scenario3')
assert scenario3_obj != default_obj
assert scenario3_object_digest != default_object_digest


# scenario4 validates that digest differs on bloom objects with different capacity.
scenario4_obj = client.execute_command('BF.INSERT scenario4 error 0.001 capacity 2000 items 1')
scenario4_object_digest = client.execute_command('DEBUG DIGEST-VALUE scenario4')
assert scenario4_obj != default_obj
assert scenario4_object_digest != default_object_digest

# scenario5 validates that digests match on bloom objects with the same properties once they contain the same items.
scenario5_obj = client.execute_command('BF.INSERT scenario5 error 0.001 capacity 1000 items 1')
scenario5_object_digest = client.execute_command('DEBUG DIGEST-VALUE scenario5')
assert scenario5_obj != default_obj
assert scenario5_object_digest != default_object_digest

client.execute_command('BF.MADD default_obj 1 2 3')
client.execute_command('BF.MADD scenario5 2 3')
madd_default_object_digest = client.execute_command('DEBUG DIGEST-VALUE default_obj')
madd_scenario_object_digest = client.execute_command('DEBUG DIGEST-VALUE scenario5')
assert madd_scenario_object_digest == madd_default_object_digest
1 change: 0 additions & 1 deletion tests/test_correctness.py
@@ -67,7 +67,6 @@ def test_scaling_filter(self):
filter_name = "filter1"
# Create a scaling bloom filter and validate its behavior.
assert client.execute_command(f'BF.RESERVE {filter_name} {expected_fp_rate} {initial_capacity} EXPANSION {expansion}') == b"OK"

info = client.execute_command(f'BF.INFO {filter_name}')
it = iter(info)
info_dict = dict(zip(it, it))
10 changes: 10 additions & 0 deletions tests/test_replication.py
@@ -68,6 +68,16 @@ def test_replication_behavior(self):
assert primary_cmd_stats['cmdstat_BF.ADD']["calls"] == 2 and replica_cmd_stats['cmdstat_BF.ADD']["calls"] == 1
else:
assert primary_cmd_stats['cmdstat_' + prefix]["calls"] == (expected_calls + 1) and replica_cmd_stats['cmdstat_' + prefix]["calls"] == expected_calls

# capture debug digests on the primary
server_digest_primary = self.client.debug_digest()
assert server_digest_primary not in (None, '0' * 40, b'0' * 40)
object_digest_primary = self.client.execute_command('DEBUG DIGEST-VALUE key')
server_digest_replica = self.replicas[0].client.debug_digest()
assert server_digest_primary == server_digest_replica
debug_digest_replica = self.replicas[0].client.execute_command('DEBUG DIGEST-VALUE key')
assert object_digest_primary == debug_digest_replica

self.client.execute_command('FLUSHALL')
self.waitForReplicaToSyncUp(self.replicas[0])
self.client.execute_command('CONFIG RESETSTAT')
10 changes: 9 additions & 1 deletion tests/test_save_and_restore.py
@@ -14,7 +14,11 @@ def test_basic_save_and_restore(self):
bf_info_result_1 = client.execute_command('BF.INFO testSave')
assert len(bf_info_result_1) != 0
curr_item_count_1 = client.info_obj().num_keys()

# capture debug digests before the save
server_digest = client.debug_digest()
assert server_digest not in (None, '0' * 40, b'0' * 40)
object_digest = client.execute_command('DEBUG DIGEST-VALUE testSave')

# save rdb, restart server
client.bgsave()
self.server.wait_for_save_done()
@@ -26,6 +30,10 @@
assert self.server.is_alive()
assert uptime_in_sec_1 > uptime_in_sec_2
assert self.server.is_rdb_done_loading()
restored_server_digest = client.debug_digest()
restored_object_digest = client.execute_command('DEBUG DIGEST-VALUE testSave')
assert restored_server_digest == server_digest
assert restored_object_digest == object_digest

# verify restore results
curr_item_count_2 = client.info_obj().num_keys()
3 changes: 3 additions & 0 deletions tests/valkey_bloom_test_case.py
@@ -110,6 +110,9 @@ def validate_copied_bloom_correctness(self, client, original_filter_name, item_p
"""
copy_filter_name = "filter_copy"
assert client.execute_command(f'COPY {original_filter_name} {copy_filter_name}') == 1
object_digest = client.execute_command(f'DEBUG DIGEST-VALUE {original_filter_name}')
copied_object_digest = client.execute_command(f'DEBUG DIGEST-VALUE {copy_filter_name}')
assert copied_object_digest == object_digest
assert client.execute_command('DBSIZE') == 2
copy_info = client.execute_command(f'BF.INFO {copy_filter_name}')
copy_it = iter(copy_info)
2 changes: 1 addition & 1 deletion tests/valkeytests/valkey_test_case.py
@@ -474,7 +474,7 @@ def port_tracker_fixture(self, resource_port_tracker):
self.port_tracker = resource_port_tracker

def _get_valkey_args(self):
self.args.update({"maxmemory":self.maxmemory, "maxmemory-policy":"allkeys-random", "activerehashing":"yes", "databases": self.num_dbs, "repl-diskless-sync": "yes", "save": ""})
self.args.update({"maxmemory":self.maxmemory, "maxmemory-policy":"allkeys-random", "activerehashing":"yes", "databases": self.num_dbs, "repl-diskless-sync": "yes", "save": "", "enable-debug-command":"yes"})
self.args.update(self.get_custom_args())
return self.args

