Skip to content

Commit

Permalink
Introduce MultiCfIterator (#12153)
Browse files Browse the repository at this point in the history
Summary:
This PR introduces a new implementation of `Iterator` via a new public API called `NewMultiCfIterator()`. The new API takes a vector of column family handles to build a cross-column-family iterator, which internally maintains multiple `DBIter`s as child iterators from a consistent database state. When a key exists in multiple column families, the iterator selects the value (and wide columns) from the first column family containing the key, following the order provided in the `column_families` parameter. Similar to the merging iterator, a min heap is used to iterate across the child iterators. Backward iteration and direction change functionalities will be implemented in future PRs.

The comparator used to compare keys across different column families will be derived from the iterator of the first column family specified in `column_families`. This comparator will be checked against the comparators from all other column families that the iterator will traverse. If there's a mismatch with any of the comparators, the initialization of the iterator will fail.

Please note that this PR is not enough for users to start using `MultiCfIterator`. The `MultiCfIterator` and related APIs are still marked as "**DO NOT USE - UNDER CONSTRUCTION**". This PR is just the first of many PRs that will follow soon.

This PR includes the following:
- Introduction and partial implementation of the `MultiCfIterator`, which implements the generic `Iterator` interface. The implementation includes the construction of the iterator, `SeekToFirst()`, `Next()`, `Valid()`, `key()`, `value()`, and `columns()`.
- Unit tests to verify iteration across multiple column families in two distinct scenarios: (1) keys are unique across all column families, and (2) the same keys exist in multiple column families.

Pull Request resolved: #12153

Reviewed By: pdillinger

Differential Revision: D52308697

Pulled By: jaykorean

fbshipit-source-id: b03e69f13b40af5a8f0598d0f43a0bec01ef8294
  • Loading branch information
jaykorean authored and facebook-github-bot committed Mar 5, 2024
1 parent 3fff57f commit 3412195
Show file tree
Hide file tree
Showing 14 changed files with 623 additions and 1 deletion.
3 changes: 3 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,7 @@ else()
endif()
if(MINGW)
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wno-format")
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wa,-mbig-obj")
add_definitions(-D_POSIX_C_SOURCE=1)
endif()
if(NOT CMAKE_BUILD_TYPE STREQUAL "Debug")
Expand Down Expand Up @@ -691,6 +692,7 @@ set(SOURCES
db/memtable_list.cc
db/merge_helper.cc
db/merge_operator.cc
db/multi_cf_iterator.cc
db/output_validator.cc
db/periodic_task_scheduler.cc
db/range_del_aggregator.cc
Expand Down Expand Up @@ -1343,6 +1345,7 @@ if(WITH_TESTS)
db/memtable_list_test.cc
db/merge_helper_test.cc
db/merge_test.cc
db/multi_cf_iterator_test.cc
db/options_file_test.cc
db/perf_context_test.cc
db/periodic_task_scheduler_test.cc
Expand Down
3 changes: 3 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -1640,6 +1640,9 @@ wal_edit_test: $(OBJ_DIR)/db/wal_edit_test.o $(TEST_LIBRARY) $(LIBRARY)
dbformat_test: $(OBJ_DIR)/db/dbformat_test.o $(TEST_LIBRARY) $(LIBRARY)
$(AM_LINK)

multi_cf_iterator_test: $(OBJ_DIR)/db/multi_cf_iterator_test.o $(TEST_LIBRARY) $(LIBRARY)
$(AM_LINK)

env_basic_test: $(OBJ_DIR)/env/env_basic_test.o $(TEST_LIBRARY) $(LIBRARY)
$(AM_LINK)

Expand Down
7 changes: 7 additions & 0 deletions TARGETS
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ cpp_library_wrapper(name="rocksdb_lib", srcs=[
"db/memtable_list.cc",
"db/merge_helper.cc",
"db/merge_operator.cc",
"db/multi_cf_iterator.cc",
"db/output_validator.cc",
"db/periodic_task_scheduler.cc",
"db/range_del_aggregator.cc",
Expand Down Expand Up @@ -5226,6 +5227,12 @@ cpp_unittest_wrapper(name="mock_env_test",
extra_compiler_flags=[])


cpp_unittest_wrapper(name="multi_cf_iterator_test",
srcs=["db/multi_cf_iterator_test.cc"],
deps=[":rocksdb_test_lib"],
extra_compiler_flags=[])


cpp_unittest_wrapper(name="object_registry_test",
srcs=["utilities/object_registry_test.cc"],
deps=[":rocksdb_test_lib"],
Expand Down
27 changes: 26 additions & 1 deletion db/db_impl/db_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@
#include "db/memtable.h"
#include "db/memtable_list.h"
#include "db/merge_context.h"
#include "db/merge_helper.h"
#include "db/multi_cf_iterator.h"
#include "db/periodic_task_scheduler.h"
#include "db/range_tombstone_fragmenter.h"
#include "db/table_cache.h"
Expand Down Expand Up @@ -3735,6 +3735,31 @@ ArenaWrappedDBIter* DBImpl::NewIteratorImpl(
return db_iter;
}

std::unique_ptr<Iterator> DBImpl::NewMultiCfIterator(
const ReadOptions& _read_options,
const std::vector<ColumnFamilyHandle*>& column_families) {
if (column_families.size() == 0) {
return std::unique_ptr<Iterator>(NewErrorIterator(
Status::InvalidArgument("No Column Family was provided")));
}
const Comparator* first_comparator = column_families[0]->GetComparator();
for (size_t i = 1; i < column_families.size(); ++i) {
const Comparator* cf_comparator = column_families[i]->GetComparator();
if (first_comparator != cf_comparator &&
first_comparator->GetId().compare(cf_comparator->GetId()) != 0) {
return std::unique_ptr<Iterator>(NewErrorIterator(Status::InvalidArgument(
"Different comparators are being used across CFs")));
}
}
std::vector<Iterator*> child_iterators;
Status s = NewIterators(_read_options, column_families, &child_iterators);
if (s.ok()) {
return std::make_unique<MultiCfIterator>(first_comparator, column_families,
std::move(child_iterators));
}
return std::unique_ptr<Iterator>(NewErrorIterator(s));
}

Status DBImpl::NewIterators(
const ReadOptions& _read_options,
const std::vector<ColumnFamilyHandle*>& column_families,
Expand Down
7 changes: 7 additions & 0 deletions db/db_impl/db_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -351,6 +351,13 @@ class DBImpl : public DB {

const Snapshot* GetSnapshot() override;
void ReleaseSnapshot(const Snapshot* snapshot) override;

// UNDER CONSTRUCTION - DO NOT USE
// Return a cross-column-family iterator from a consistent database state.
std::unique_ptr<Iterator> NewMultiCfIterator(
const ReadOptions& options,
const std::vector<ColumnFamilyHandle*>& column_families) override;

// Create a timestamped snapshot. This snapshot can be shared by multiple
// readers. If any of them uses it for write conflict checking, then
// is_write_conflict_boundary is true. For simplicity, set it to true by
Expand Down
8 changes: 8 additions & 0 deletions db/db_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3199,6 +3199,14 @@ class ModelDB : public DB {
std::vector<Iterator*>* /*iterators*/) override {
return Status::NotSupported("Not supported yet");
}

// UNDER CONSTRUCTION - DO NOT USE
std::unique_ptr<Iterator> NewMultiCfIterator(
const ReadOptions& /*options*/,
const std::vector<ColumnFamilyHandle*>& /*column_families*/) override {
return nullptr;
}

const Snapshot* GetSnapshot() override {
ModelSnapshot* snapshot = new ModelSnapshot;
snapshot->map_ = map_;
Expand Down
62 changes: 62 additions & 0 deletions db/multi_cf_iterator.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
// Copyright (c) Meta Platforms, Inc. and affiliates.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).

#include "db/multi_cf_iterator.h"

#include <cassert>

namespace ROCKSDB_NAMESPACE {

void MultiCfIterator::SeekToFirst() {
Reset();
int i = 0;
for (auto& cfh_iter_pair : cfh_iter_pairs_) {
auto& cfh = cfh_iter_pair.first;
auto& iter = cfh_iter_pair.second;
iter->SeekToFirst();
if (iter->Valid()) {
assert(iter->status().ok());
min_heap_.push(MultiCfIteratorInfo{iter.get(), cfh, i});
} else {
considerStatus(iter->status());
}
++i;
}
}

void MultiCfIterator::Next() {
assert(Valid());
// 1. Keep the top iterator (by popping it from the heap)
// 2. Make sure all others have iterated past the top iterator key slice
// 3. Advance the top iterator, and add it back to the heap if valid
auto top = min_heap_.top();
min_heap_.pop();
if (!min_heap_.empty()) {
auto* current = min_heap_.top().iterator;
while (current->Valid() &&
comparator_->Compare(top.iterator->key(), current->key()) == 0) {
assert(current->status().ok());
current->Next();
if (current->Valid()) {
min_heap_.replace_top(min_heap_.top());
} else {
considerStatus(current->status());
min_heap_.pop();
}
if (!min_heap_.empty()) {
current = min_heap_.top().iterator;
}
}
}
top.iterator->Next();
if (top.iterator->Valid()) {
assert(top.iterator->status().ok());
min_heap_.push(top);
} else {
considerStatus(top.iterator->status());
}
}

} // namespace ROCKSDB_NAMESPACE
116 changes: 116 additions & 0 deletions db/multi_cf_iterator.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
// Copyright (c) Meta Platforms, Inc. and affiliates.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).

#pragma once

#include "rocksdb/comparator.h"
#include "rocksdb/iterator.h"
#include "rocksdb/options.h"
#include "util/heap.h"

namespace ROCKSDB_NAMESPACE {

// UNDER CONSTRUCTION - DO NOT USE
// A cross-column-family iterator from a consistent database state.
// When the same key exists in more than one column families, the iterator
// selects the value from the first column family containing the key, in the
// order provided in the `column_families` parameter.
class MultiCfIterator : public Iterator {
public:
MultiCfIterator(const Comparator* comparator,
const std::vector<ColumnFamilyHandle*>& column_families,
const std::vector<Iterator*>& child_iterators)
: comparator_(comparator),
min_heap_(MultiCfMinHeapItemComparator(comparator_)) {
assert(column_families.size() > 0 &&
column_families.size() == child_iterators.size());
cfh_iter_pairs_.reserve(column_families.size());
for (size_t i = 0; i < column_families.size(); ++i) {
cfh_iter_pairs_.emplace_back(
column_families[i], std::unique_ptr<Iterator>(child_iterators[i]));
}
}
~MultiCfIterator() override { status_.PermitUncheckedError(); }

// No copy allowed
MultiCfIterator(const MultiCfIterator&) = delete;
MultiCfIterator& operator=(const MultiCfIterator&) = delete;

private:
std::vector<std::pair<ColumnFamilyHandle*, std::unique_ptr<Iterator>>>
cfh_iter_pairs_;
ReadOptions read_options_;
Status status_;

AttributeGroups attribute_groups_;

struct MultiCfIteratorInfo {
Iterator* iterator;
ColumnFamilyHandle* cfh;
int order;
};

class MultiCfMinHeapItemComparator {
public:
explicit MultiCfMinHeapItemComparator(const Comparator* comparator)
: comparator_(comparator) {}

bool operator()(const MultiCfIteratorInfo& a,
const MultiCfIteratorInfo& b) const {
assert(a.iterator);
assert(b.iterator);
assert(a.iterator->Valid());
assert(b.iterator->Valid());
int c = comparator_->Compare(a.iterator->key(), b.iterator->key());
assert(c != 0 || a.order != b.order);
return c == 0 ? a.order - b.order > 0 : c > 0;
}

private:
const Comparator* comparator_;
};

const Comparator* comparator_;
using MultiCfMinHeap =
BinaryHeap<MultiCfIteratorInfo, MultiCfMinHeapItemComparator>;
MultiCfMinHeap min_heap_;
// TODO: MaxHeap for Reverse Iteration
// TODO: Lower and Upper bounds

Slice key() const override {
assert(Valid());
return min_heap_.top().iterator->key();
}
bool Valid() const override { return !min_heap_.empty() && status_.ok(); }
Status status() const override { return status_; }
void considerStatus(Status s) {
if (!s.ok() && status_.ok()) {
status_ = std::move(s);
}
}
void Reset() {
min_heap_.clear();
status_ = Status::OK();
}

void SeekToFirst() override;
void Next() override;

// TODO - Implement these
void Seek(const Slice& /*target*/) override {}
void SeekForPrev(const Slice& /*target*/) override {}
void SeekToLast() override {}
void Prev() override { assert(false); }
Slice value() const override {
assert(Valid());
return min_heap_.top().iterator->value();
}
const WideColumns& columns() const override {
assert(Valid());
return min_heap_.top().iterator->columns();
}
};

} // namespace ROCKSDB_NAMESPACE
Loading

0 comments on commit 3412195

Please sign in to comment.