Skip to content

Commit

Permalink
Add ReadaheadParams
Browse files Browse the repository at this point in the history
Summary:

Test Plan:

Reviewers:

Subscribers:

Tasks:

Tags:
  • Loading branch information
akankshamahajan15 committed Dec 1, 2023
1 parent 3c0980e commit 8cef5f0
Show file tree
Hide file tree
Showing 14 changed files with 137 additions and 589 deletions.
469 changes: 0 additions & 469 deletions async_refactor

This file was deleted.

6 changes: 4 additions & 2 deletions db/blob/prefetch_buffer_collection.cc
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,10 @@ FilePrefetchBuffer* PrefetchBufferCollection::GetOrCreatePrefetchBuffer(
uint64_t file_number) {
auto& prefetch_buffer = prefetch_buffers_[file_number];
if (!prefetch_buffer) {
prefetch_buffer.reset(
new FilePrefetchBuffer(readahead_size_, readahead_size_));
ReadaheadParams readahead_params;
readahead_params.initial_readahead_size = readahead_size_;
readahead_params.max_readahead_size = readahead_size_;
prefetch_buffer.reset(new FilePrefetchBuffer(readahead_params));
}

return prefetch_buffer.get();
Expand Down
39 changes: 24 additions & 15 deletions file/file_prefetch_buffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,17 @@ namespace ROCKSDB_NAMESPACE {
struct IOOptions;
class RandomAccessFileReader;

struct ReadaheadParams {
ReadaheadParams() {}

size_t initial_readahead_size = 0;
size_t max_readahead_size = 0;
bool implicit_auto_readahead = false;
uint64_t num_file_reads = 0;
uint64_t num_file_reads_for_auto_readahead = 0;
size_t num_buffers = 1;
};

struct BufferInfo {
void ClearBuffer() {
buffer_.Clear();
Expand Down Expand Up @@ -103,34 +114,32 @@ class FilePrefetchBuffer {
// A user can construct a FilePrefetchBuffer without any arguments, but use
// `Prefetch` to load data into the buffer.
FilePrefetchBuffer(
size_t readahead_size = 0, size_t max_readahead_size = 0,
bool enable = true, bool track_min_offset = false,
bool implicit_auto_readahead = false, uint64_t num_file_reads = 0,
uint64_t num_file_reads_for_auto_readahead = 0,
uint64_t upper_bound_offset = 0, FileSystem* fs = nullptr,
SystemClock* clock = nullptr, Statistics* stats = nullptr,
const ReadaheadParams& readahead_params, bool enable = true,
bool track_min_offset = false, uint64_t upper_bound_offset = 0,
FileSystem* fs = nullptr, SystemClock* clock = nullptr,
Statistics* stats = nullptr,
const std::function<void(bool, uint64_t&, uint64_t&)>& cb = nullptr,
FilePrefetchBufferUsage usage = FilePrefetchBufferUsage::kUnknown,
size_t num_buffers = 1)
: readahead_size_(readahead_size),
initial_auto_readahead_size_(readahead_size),
max_readahead_size_(max_readahead_size),
FilePrefetchBufferUsage usage = FilePrefetchBufferUsage::kUnknown)
: readahead_size_(readahead_params.initial_readahead_size),
initial_auto_readahead_size_(readahead_params.initial_readahead_size),
max_readahead_size_(readahead_params.max_readahead_size),
min_offset_read_(std::numeric_limits<size_t>::max()),
enable_(enable),
track_min_offset_(track_min_offset),
implicit_auto_readahead_(implicit_auto_readahead),
implicit_auto_readahead_(readahead_params.implicit_auto_readahead),
prev_offset_(0),
prev_len_(0),
num_file_reads_for_auto_readahead_(num_file_reads_for_auto_readahead),
num_file_reads_(num_file_reads),
num_file_reads_for_auto_readahead_(
readahead_params.num_file_reads_for_auto_readahead),
num_file_reads_(readahead_params.num_file_reads),
explicit_prefetch_submitted_(false),
fs_(fs),
clock_(clock),
stats_(stats),
usage_(usage),
upper_bound_offset_(upper_bound_offset),
readaheadsize_cb_(cb),
num_buffers_(num_buffers) {
num_buffers_(readahead_params.num_buffers) {
assert((num_file_reads_ >= num_file_reads_for_auto_readahead_ + 1) ||
(num_file_reads_ == 0));
// If ReadOptions.async_io is enabled, data is asynchronously filled in the
Expand Down
71 changes: 45 additions & 26 deletions file/prefetch_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3243,7 +3243,11 @@ TEST_F(FilePrefetchBufferTest, SeekWithBlockCacheHit) {
std::unique_ptr<RandomAccessFileReader> r;
Read(fname, opts, &r);

FilePrefetchBuffer fpb(16384, 16384, true, false, false, 0, 0, 0, fs());
ReadaheadParams readahead_params;
readahead_params.initial_readahead_size = 16384;
readahead_params.max_readahead_size = 16384;

FilePrefetchBuffer fpb(readahead_params, true, false, 0, fs());
Slice result;
// Simulate a seek of 4096 bytes at offset 0. Due to the readahead settings,
// it will do two reads of 4096+8192 and 8192
Expand Down Expand Up @@ -3289,12 +3293,17 @@ TEST_F(FilePrefetchBufferTest, SeekWithoutAlignment) {
// Without readahead enabled, there will be no alignment and offset of buffer
// will be n.
{
FilePrefetchBuffer fpb(
/*readahead_size=*/8192, /*max_readahead_size=*/16384, /*enable=*/true,
/*track_min_offset=*/false, /*implicit_auto_readahead=*/true,
/*num_file_reads=*/0, /*num_file_reads_for_auto_readahead=*/2,
/*upper_bound_offset=*/0, fs(), nullptr, nullptr, nullptr,
FilePrefetchBufferUsage::kUnknown, 2);
ReadaheadParams readahead_params;
readahead_params.initial_readahead_size = 8192;
readahead_params.max_readahead_size = 16384;
readahead_params.implicit_auto_readahead = true;
readahead_params.num_file_reads_for_auto_readahead = 2;
readahead_params.num_buffers = 2;

FilePrefetchBuffer fpb(readahead_params, /*enable=*/true,
/*track_min_offset=*/false,
/*upper_bound_offset=*/0, fs(), nullptr, nullptr,
nullptr, FilePrefetchBufferUsage::kUnknown);

Slice result;
// Simulate a seek of half of alignment bytes at offset n. Due to the
Expand Down Expand Up @@ -3322,12 +3331,15 @@ TEST_F(FilePrefetchBufferTest, SeekWithoutAlignment) {
// buffer will be 0.
{
read_async_called = false;
FilePrefetchBuffer fpb(
/*readahead_size=*/16384, /*max_readahead_size=*/16384, /*enable=*/true,
/*track_min_offset=*/false, /*implicit_auto_readahead=*/false,
/*num_file_reads=*/0, /*num_file_reads_for_auto_readahead=*/2,
/*upper_bound_offset=*/0, fs(), nullptr, nullptr, nullptr,
FilePrefetchBufferUsage::kUnknown, 2);
ReadaheadParams readahead_params;
readahead_params.initial_readahead_size = 16384;
readahead_params.max_readahead_size = 16384;
readahead_params.num_file_reads_for_auto_readahead = 2;
readahead_params.num_buffers = 2;
FilePrefetchBuffer fpb(readahead_params, /*enable=*/true,
/*track_min_offset=*/false,
/*upper_bound_offset=*/0, fs(), nullptr, nullptr,
nullptr, FilePrefetchBufferUsage::kUnknown);

Slice result;
// Simulate a seek of half of alignment bytes at offset n.
Expand Down Expand Up @@ -3360,12 +3372,14 @@ TEST_F(FilePrefetchBufferTest, NoSyncWithAsyncIO) {
std::unique_ptr<RandomAccessFileReader> r;
Read(fname, opts, &r);

FilePrefetchBuffer fpb(
/*readahead_size=*/8192, /*max_readahead_size=*/16384, /*enable=*/true,
/*track_min_offset=*/false, /*implicit_auto_readahead=*/false,
/*num_file_reads=*/0, /*num_file_reads_for_auto_readahead=*/0,
/*upper_bound_offset=*/0, fs(), nullptr, nullptr, nullptr,
FilePrefetchBufferUsage::kUnknown, 2);
ReadaheadParams readahead_params;
readahead_params.initial_readahead_size = 8192;
readahead_params.max_readahead_size = 16384;
readahead_params.num_buffers = 2;
FilePrefetchBuffer fpb(readahead_params, /*enable=*/true,
/*track_min_offset=*/false,
/*upper_bound_offset=*/0, fs(), nullptr, nullptr,
nullptr, FilePrefetchBufferUsage::kUnknown);

int read_async_called = 0;
SyncPoint::GetInstance()->SetCallBack(
Expand Down Expand Up @@ -3416,12 +3430,14 @@ TEST_F(FilePrefetchBufferTest, IterateUpperBoundTest1) {
std::unique_ptr<RandomAccessFileReader> r;
Read(fname, opts, &r);

FilePrefetchBuffer fpb(
/*readahead_size=*/8192, /*max_readahead_size=*/16384, /*enable=*/true,
/*track_min_offset=*/false, /*implicit_auto_readahead=*/false,
/*num_file_reads=*/0, /*num_file_reads_for_auto_readahead=*/0,
/*upper_bound_offset=*/8000, fs(), nullptr, nullptr, nullptr,
FilePrefetchBufferUsage::kUnknown, 2);
ReadaheadParams readahead_params;
readahead_params.initial_readahead_size = 8192;
readahead_params.max_readahead_size = 16384;
readahead_params.num_buffers = 2;
FilePrefetchBuffer fpb(readahead_params, /*enable=*/true,
/*track_min_offset=*/false,
/*upper_bound_offset=*/8000, fs(), nullptr, nullptr,
nullptr, FilePrefetchBufferUsage::kUnknown);

int read_async_called = 0;
SyncPoint::GetInstance()->SetCallBack(
Expand Down Expand Up @@ -3472,7 +3488,10 @@ TEST_F(FilePrefetchBufferTest, SyncReadaheadStats) {
Read(fname, opts, &r);

std::shared_ptr<Statistics> stats = CreateDBStatistics();
FilePrefetchBuffer fpb(8192, 8192, true, false, false, 0, 0, 0, fs(), nullptr,
ReadaheadParams readahead_params;
readahead_params.initial_readahead_size = 8192;
readahead_params.max_readahead_size = 8192;
FilePrefetchBuffer fpb(readahead_params, true, false, 0, fs(), nullptr,
stats.get());
Slice result;
// Simulate a seek of 4096 bytes at offset 0. Due to the readahead settings,
Expand Down
4 changes: 2 additions & 2 deletions table/block_based/block_based_table_iterator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -376,7 +376,7 @@ void BlockBasedTableIterator::InitDataBlock() {
rep, data_block_handle, read_options_.readahead_size,
is_for_compaction,
/*no_sequential_checking=*/false, read_options_, readaheadsize_cb,
(read_options_.async_io ? 2 : 1));
read_options_.async_io);

Status s;
table_->NewDataBlockIterator<DataBlockIter>(
Expand Down Expand Up @@ -436,7 +436,7 @@ void BlockBasedTableIterator::AsyncInitDataBlock(bool is_first_pass) {
block_prefetcher_.PrefetchIfNeeded(
rep, data_block_handle, read_options_.readahead_size,
is_for_compaction, /*no_sequential_checking=*/read_options_.async_io,
read_options_, readaheadsize_cb, 2);
read_options_, readaheadsize_cb, read_options_.async_io);

Status s;
table_->NewDataBlockIterator<DataBlockIter>(
Expand Down
17 changes: 7 additions & 10 deletions table/block_based/block_based_table_reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -613,8 +613,7 @@ Status BlockBasedTable::Open(
} else {
// Should not prefetch for mmap mode.
prefetch_buffer.reset(new FilePrefetchBuffer(
0 /* readahead_size */, 0 /* max_readahead_size */, false /* enable */,
true /* track_min_offset */));
ReadaheadParams(), false /* enable */, true /* track_min_offset */));
}

// Read in the following order:
Expand Down Expand Up @@ -876,17 +875,14 @@ Status BlockBasedTable::PrefetchTail(
if (s.ok() && !file->use_direct_io() && !force_direct_prefetch) {
if (!file->Prefetch(opts, prefetch_off, prefetch_len).IsNotSupported()) {
prefetch_buffer->reset(new FilePrefetchBuffer(
0 /* readahead_size */, 0 /* max_readahead_size */,
false /* enable */, true /* track_min_offset */));
ReadaheadParams(), false /* enable */, true /* track_min_offset */));
return Status::OK();
}
}

// Use `FilePrefetchBuffer`
prefetch_buffer->reset(new FilePrefetchBuffer(
0 /* readahead_size */, 0 /* max_readahead_size */, true /* enable */,
true /* track_min_offset */, false /* implicit_auto_readahead */,
0 /* num_file_reads */, 0 /* num_file_reads_for_auto_readahead */,
ReadaheadParams(), true /* enable */, true /* track_min_offset */,
0 /* upper_bound_offset */, nullptr /* fs */, nullptr /* clock */, stats,
/* readahead_cb */ nullptr,
FilePrefetchBufferUsage::kTableOpenPrefetchTail));
Expand Down Expand Up @@ -2499,10 +2495,11 @@ Status BlockBasedTable::VerifyChecksumInBlocks(
: rep_->table_options.max_auto_readahead_size;
// FilePrefetchBuffer doesn't work in mmap mode and readahead is not
// needed there.
ReadaheadParams readahead_params;
readahead_params.initial_readahead_size = readahead_size;
readahead_params.max_readahead_size = readahead_size;
FilePrefetchBuffer prefetch_buffer(
readahead_size /* readahead_size */,
readahead_size /* max_readahead_size */,
!rep_->ioptions.allow_mmap_reads /* enable */);
readahead_params, !rep_->ioptions.allow_mmap_reads /* enable */);

for (index_iter->SeekToFirst(); index_iter->Valid(); index_iter->Next()) {
s = index_iter->status();
Expand Down
32 changes: 11 additions & 21 deletions table/block_based/block_based_table_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -696,34 +696,24 @@ struct BlockBasedTable::Rep {
return file ? TableFileNameToNumber(file->file_name()) : UINT64_MAX;
}
void CreateFilePrefetchBuffer(
size_t readahead_size, size_t max_readahead_size,
std::unique_ptr<FilePrefetchBuffer>* fpb, bool implicit_auto_readahead,
uint64_t num_file_reads, uint64_t num_file_reads_for_auto_readahead,
uint64_t upper_bound_offset,
const ReadaheadParams& readahead_params,
std::unique_ptr<FilePrefetchBuffer>* fpb, uint64_t upper_bound_offset,
const std::function<void(bool, uint64_t&, uint64_t&)>& readaheadsize_cb,
FilePrefetchBufferUsage usage, size_t num_buffers) const {
FilePrefetchBufferUsage usage) const {
fpb->reset(new FilePrefetchBuffer(
readahead_size, max_readahead_size,
!ioptions.allow_mmap_reads /* enable */, false /* track_min_offset */,
implicit_auto_readahead, num_file_reads,
num_file_reads_for_auto_readahead, upper_bound_offset,
ioptions.fs.get(), ioptions.clock, ioptions.stats, readaheadsize_cb,
usage, num_buffers));
readahead_params, !ioptions.allow_mmap_reads /* enable */,
false /* track_min_offset */, upper_bound_offset, ioptions.fs.get(),
ioptions.clock, ioptions.stats, readaheadsize_cb, usage));
}

void CreateFilePrefetchBufferIfNotExists(
size_t readahead_size, size_t max_readahead_size,
std::unique_ptr<FilePrefetchBuffer>* fpb, bool implicit_auto_readahead,
uint64_t num_file_reads, uint64_t num_file_reads_for_auto_readahead,
uint64_t upper_bound_offset,
const ReadaheadParams& readahead_params,
std::unique_ptr<FilePrefetchBuffer>* fpb, uint64_t upper_bound_offset,
const std::function<void(bool, uint64_t&, uint64_t&)>& readaheadsize_cb,
FilePrefetchBufferUsage usage = FilePrefetchBufferUsage::kUnknown,
size_t num_buffers = 1) const {
FilePrefetchBufferUsage usage = FilePrefetchBufferUsage::kUnknown) const {
if (!(*fpb)) {
CreateFilePrefetchBuffer(
readahead_size, max_readahead_size, fpb, implicit_auto_readahead,
num_file_reads, num_file_reads_for_auto_readahead, upper_bound_offset,
readaheadsize_cb, usage, num_buffers);
CreateFilePrefetchBuffer(readahead_params, fpb, upper_bound_offset,
readaheadsize_cb, usage);
}
}

Expand Down
Loading

0 comments on commit 8cef5f0

Please sign in to comment.