Skip to content

Commit

Permalink
Incorporate secondary testing as part of regular get() tests
Browse files Browse the repository at this point in the history
  • Loading branch information
archang19 committed Jan 8, 2025
1 parent 3f13b5d commit 8820235
Show file tree
Hide file tree
Showing 2 changed files with 136 additions and 92 deletions.
227 changes: 136 additions & 91 deletions db_stress_tool/no_batched_ops_stress.cc
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#include <iostream>

#include "db/dbformat.h"
#include "db_stress_tool/db_stress_listener.h"
Expand Down Expand Up @@ -154,6 +153,46 @@ class NonBatchedOpsStressTest : public StressTest {
from_db.data(), from_db.size());
}
}

if (secondary_db_) {
assert(secondary_cfhs_.size() == column_families_.size());
// We are going to read in the expected values before catching the
// secondary up to the primary. This sets the lower bound of the
// acceptable values that can be returned from the secondary. After
// each Get() to the secondary, we are going to read in the expected
// value again to determine the upper bound. As long as the returned
// value from Get() is within these bounds, we consider that okay. The
// lower bound will always be moving forwards anyways as
// TryCatchUpWithPrimary() gets called.
std::vector<ExpectedValue> pre_read_expected_values;
for (int64_t i = start; i < end; ++i) {
pre_read_expected_values.push_back(
shared->Get(static_cast<int>(cf), i));
}

Status s = secondary_db_->TryCatchUpWithPrimary();
if (!s.ok()) {
VerificationAbort(shared,
"Secondary failed to catch up to the primary");
}

for (int64_t i = start; i < end; ++i) {
if (thread->shared->HasVerificationFailedYet()) {
break;
}

const std::string key = Key(i);
std::string from_db;

s = secondary_db_->Get(options, column_families_[cf], key,
&from_db);

VerifyValueRange(static_cast<int>(cf), i, options, shared, from_db,
/* msg_prefix */ "Secondary get verification", s,
pre_read_expected_values[i - start]);
}
}

} else if (method == VerificationMethod::kGetEntity) {
for (int64_t i = start; i < end; ++i) {
if (thread->shared->HasVerificationFailedYet()) {
Expand Down Expand Up @@ -339,82 +378,12 @@ class NonBatchedOpsStressTest : public StressTest {
}
assert(secondary_db_);
assert(!secondary_cfhs_.empty());

auto* shared = thread->shared;
assert(shared);

// Each thread is going to verify a different portion of the key space
const int64_t max_key = shared->GetMaxKey();
const int64_t keys_per_thread = max_key / shared->GetNumThreads();
int64_t start = keys_per_thread * thread->tid;
int64_t end = thread->tid == shared->GetNumThreads() - 1
? max_key
: start + keys_per_thread;

// We are going to read in the expected values before catching the secondary
// up to the primary. This sets the lower bound of the acceptable values
// that can be returned from the secondary. After each Get() to the
// secondary, we are going to read in the expected value again to determine
// the upper bound. As long as the returned value from Get() is within these
// bounds, we consider that okay. The lower bound will always be moving
// forwards anyways as TryCatchUpWithPrimary() gets called.
std::vector<ExpectedValue> pre_read_expected_values;
for (int64_t i = start; i < end; ++i) {
pre_read_expected_values.push_back(shared->Get(0, i));
}

Status s = secondary_db_->TryCatchUpWithPrimary();
if (!s.ok()) {
VerificationAbort(shared, "Secondary failed to catch up to the primary");
assert(false);
exit(1);
}

ReadOptions read_opts(FLAGS_verify_checksum, true);
// Check that there are no keys in the secondary that should not exist, and
// that the values fall into the acceptable range for those that should
// exist
for (int64_t i = start; i < end; ++i) {
if (shared->HasVerificationFailedYet()) {
break;
}
const ExpectedValue pre_read_expected_value =
pre_read_expected_values[i - start];
std::string from_db;
std::string key_str = Key(i);
Slice key = key_str;
s = db_->Get(read_opts, column_families_[0], key_str, &from_db);
const ExpectedValue post_read_expected_value = shared->Get(0, i);
if (s.ok()) {
const Slice slice(from_db);
const uint32_t value_base_from_db = GetValueBase(slice);
if (!ExpectedValueHelper::InExpectedValueBaseRange(
value_base_from_db, pre_read_expected_value,
post_read_expected_value)) {
std::cout << "Secondary verification failed for i=" << i
<< ", key=" << key.ToString(true)
<< ". Get() returned a value base " << value_base_from_db
<< " which was outside of the value base range of"
<< pre_read_expected_value.GetValueBase() << " to"
<< post_read_expected_value.GetFinalValueBase()
<< std::endl;
shared->SetVerificationFailure();
break;
}
} else if (s.IsNotFound()) {
if (ExpectedValueHelper::MustHaveExisted(pre_read_expected_value,
post_read_expected_value)) {
std::cout
<< "Secondary verification failed for i=" << i
<< ", key=" << key.ToString(true)
<< ". Get() returned NotFound when the key should have existed."
<< std::endl;
shared->SetVerificationFailure();
break;
}
}
}

const auto checksum_column_family = [](Iterator* iter,
uint32_t* checksum) -> Status {
assert(nullptr != checksum);
Expand All @@ -427,6 +396,10 @@ class NonBatchedOpsStressTest : public StressTest {
return iter->status();
};

auto* shared = thread->shared;
assert(shared);
const int64_t max_key = shared->GetMaxKey();
ReadOptions read_opts(FLAGS_verify_checksum, true);
std::string ts_str;
Slice ts;
if (FLAGS_user_timestamp_size > 0) {
Expand All @@ -436,8 +409,19 @@ class NonBatchedOpsStressTest : public StressTest {
}

static Random64 rand64(shared->GetSeed());
for (size_t cf = 0; cf < secondary_cfhs_.size(); cf++) {
ColumnFamilyHandle* handle = secondary_cfhs_[cf];

{
uint32_t crc = 0;
std::unique_ptr<Iterator> it(secondary_db_->NewIterator(read_opts));
s = checksum_column_family(it.get(), &crc);
if (!s.ok()) {
fprintf(stderr, "Computing checksum of default cf: %s\n",
s.ToString().c_str());
assert(false);
}
}

for (auto* handle : secondary_cfhs_) {
if (thread->rand.OneInOpt(3)) {
// Use Get()
uint64_t key = rand64.Uniform(static_cast<uint64_t>(max_key));
Expand All @@ -448,11 +432,7 @@ class NonBatchedOpsStressTest : public StressTest {
read_opts, handle, key_str, &value,
FLAGS_user_timestamp_size > 0 ? &key_ts : nullptr);
s.PermitUncheckedError();
} else if (!FLAGS_inplace_update_support) {
// I think this portion of the verification failed because the
// combination of inplace_update_support=true and backward iteration is
// not allowed.

} else {
// Use range scan
std::unique_ptr<Iterator> iter(
secondary_db_->NewIterator(read_opts, handle));
Expand All @@ -476,7 +456,6 @@ class NonBatchedOpsStressTest : public StressTest {
iter->Seek(key_str);
for (int i = 0; i < 5 && iter->Valid(); ++i, iter->Next()) {
}

} else {
// SeekForPrev() + Prev()*5
uint64_t key = rand64.Uniform(static_cast<uint64_t>(max_key));
Expand All @@ -485,18 +464,6 @@ class NonBatchedOpsStressTest : public StressTest {
for (int i = 0; i < 5 && iter->Valid(); ++i, iter->Prev()) {
}
}
} else {
uint32_t crc = 0;
std::unique_ptr<Iterator> it(
secondary_db_->NewIterator(read_opts, handle));
s = checksum_column_family(it.get(), &crc);
if (!s.ok()) {
std::string checksum_err_msg =
"Failed to compute checksum for secondary cf " +
std::to_string(cf) + ". Status: " + s.ToString();
VerificationAbort(shared, checksum_err_msg);
assert(false);
}
}
}
}
Expand Down Expand Up @@ -2885,6 +2852,84 @@ class NonBatchedOpsStressTest : public StressTest {
return true;
}

// Compared to VerifyOrSyncValue, VerifyValueRange takes in a
// pre_read_expected_value to determine the lower bound of acceptable values.
// Anything from the pre_read_expected_value to the post_read_expected_value
// is considered acceptable. VerifyValueRange does not perform the initial
// "sync" step and does not compare the exact data/lengths for the values.
// This verification is suitable for verifying secondary or follower databases
bool VerifyValueRange(int cf, int64_t key, const ReadOptions& opts,
SharedState* shared, const std::string& value_from_db,
const std::string& msg_prefix, const Status& s,
const ExpectedValue& pre_read_expected_value) const {
if (shared->HasVerificationFailedYet()) {
return false;
}
const ExpectedValue post_read_expected_value = shared->Get(cf, key);
char expected_value_data[kValueMaxLen];
size_t expected_value_data_size =
GenerateValue(post_read_expected_value.GetValueBase(),
expected_value_data, sizeof(expected_value_data));

std::ostringstream read_u64ts;
if (opts.timestamp) {
read_u64ts << " while read with timestamp: ";
uint64_t read_ts;
if (DecodeU64Ts(*opts.timestamp, &read_ts).ok()) {
read_u64ts << std::to_string(read_ts) << ", ";
} else {
read_u64ts << s.ToString()
<< " Encoded read timestamp: " << opts.timestamp->ToString()
<< ", ";
}
}

// Compare value_from_db with the range of possible values from
// pre_read_expected_value to post_read_expected_value
if (s.ok()) {
const Slice slice(value_from_db);
const uint32_t value_base_from_db = GetValueBase(slice);
if (ExpectedValueHelper::MustHaveNotExisted(pre_read_expected_value,
post_read_expected_value)) {
VerificationAbort(shared,
msg_prefix +
": Unexpected value found that should not exist" +
read_u64ts.str(),
cf, key, value_from_db, "");
return false;
}
if (!ExpectedValueHelper::InExpectedValueBaseRange(
value_base_from_db, pre_read_expected_value,
post_read_expected_value)) {
VerificationAbort(
shared,
msg_prefix +
": Unexpected value found outside of the value base range" +
read_u64ts.str(),
cf, key, value_from_db,
Slice(expected_value_data, expected_value_data_size));
return false;
}
} else if (s.IsNotFound()) {
if (ExpectedValueHelper::MustHaveExisted(pre_read_expected_value,
post_read_expected_value)) {
VerificationAbort(shared,
msg_prefix + ": Value not found which should exist" +
read_u64ts.str() + s.ToString(),
cf, key, "",
Slice(expected_value_data, expected_value_data_size));
return false;
}
} else {
VerificationAbort(
shared,
msg_prefix + "Non-OK status" + read_u64ts.str() + s.ToString(), cf,
key, "", Slice(expected_value_data, expected_value_data_size));
return false;
}
return true;
}

void PrepareTxnDbOptions(SharedState* shared,
TransactionDBOptions& txn_db_opts) override {
txn_db_opts.rollback_deletion_type_callback =
Expand Down
1 change: 0 additions & 1 deletion tools/db_crashtest.py
Original file line number Diff line number Diff line change
Expand Up @@ -486,7 +486,6 @@ def is_direct_io_supported(dbname):
"level_compaction_dynamic_level_bytes": lambda: random.randint(0, 1),
"paranoid_file_checks": lambda: random.choice([0, 1, 1, 1]),
"test_secondary": lambda: random.choice([0, 1]),
"continuous_verification_interval": lambda: random.choice([0, 1000]),
}

blackbox_simple_default_params = {
Expand Down

0 comments on commit 8820235

Please sign in to comment.