Skip to content

Commit

Permalink
Add ReadaheadParams
Browse files Browse the repository at this point in the history
Summary:

Test Plan:

Reviewers:

Subscribers:

Tasks:

Tags:
  • Loading branch information
akankshamahajan15 committed Dec 4, 2023
1 parent 315e4dd commit 873507a
Show file tree
Hide file tree
Showing 15 changed files with 378 additions and 881 deletions.
469 changes: 0 additions & 469 deletions async_refactor

This file was deleted.

6 changes: 4 additions & 2 deletions db/blob/prefetch_buffer_collection.cc
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,10 @@ FilePrefetchBuffer* PrefetchBufferCollection::GetOrCreatePrefetchBuffer(
uint64_t file_number) {
auto& prefetch_buffer = prefetch_buffers_[file_number];
if (!prefetch_buffer) {
prefetch_buffer.reset(
new FilePrefetchBuffer(readahead_size_, readahead_size_));
ReadaheadParams readahead_params;
readahead_params.initial_readahead_size = readahead_size_;
readahead_params.max_readahead_size = readahead_size_;
prefetch_buffer.reset(new FilePrefetchBuffer(readahead_params));
}

return prefetch_buffer.get();
Expand Down
366 changes: 145 additions & 221 deletions file/file_prefetch_buffer.cc

Large diffs are not rendered by default.

206 changes: 120 additions & 86 deletions file/file_prefetch_buffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,31 @@ namespace ROCKSDB_NAMESPACE {
struct IOOptions;
class RandomAccessFileReader;

// Parameters that control the readahead behavior of FilePrefetchBuffer.
struct ReadaheadParams {
  ReadaheadParams() {}

  // The initial readahead size.
  size_t initial_readahead_size = 0;

  // The maximum readahead size.
  // If max_readahead_size > initial_readahead_size, then readahead size will
  // be doubled on every IO until max_readahead_size is hit. Typically this is
  // set as a multiple of initial_readahead_size. max_readahead_size should be
  // greater than or equal to initial_readahead_size.
  size_t max_readahead_size = 0;

  // If true, Readahead is enabled implicitly by rocksdb
  // after doing sequential scans for num_file_reads_for_auto_readahead.
  bool implicit_auto_readahead = false;

  // Number of reads already performed on this file. Used together with
  // num_file_reads_for_auto_readahead to decide when implicit auto readahead
  // should kick in (only relevant when implicit_auto_readahead is true).
  uint64_t num_file_reads = 0;
  // Number of sequential file reads after which implicit auto readahead is
  // triggered (only relevant when implicit_auto_readahead is true).
  uint64_t num_file_reads_for_auto_readahead = 0;

  // Number of buffers to maintain that contains prefetched data. If num_buffers
  // > 1 then buffers will be filled asynchronously whenever they get emptied.
  size_t num_buffers = 1;
};

struct BufferInfo {
void ClearBuffer() {
buffer_.Clear();
Expand Down Expand Up @@ -65,10 +90,6 @@ struct BufferInfo {
// size when callback is made to BlockBasedTableIterator.
// initial end offset of this buffer which will be the starting
// offset of next prefetch.
//
// Note: No need to update/use in case async_io disabled. Prefetching will
// happen only when there is a cache miss. So start offset for prefetching
// will always be the passed offset to FilePrefetchBuffer (offset to read).
uint64_t initial_end_offset_ = 0;
};

Expand All @@ -78,67 +99,77 @@ enum class FilePrefetchBufferUsage {
kUnknown,
};

// Implementation:
// FilePrefetchBuffer maintains a deque of free buffers (free_bufs_) of size
// num_buffers_ and bufs_ which contains the prefetched data. Whenever a buffer
// is consumed or is outdated (w.r.t. to requested offset), that buffer is
// cleared and returned to free_bufs_.
//
// If a buffer is available in free_bufs_, it's moved to bufs_ and is sent for
// prefetching. num_buffers_ defines how many buffers are maintained that
// contains prefetched data.
// If num_buffers_ == 1, it's a sequential read flow. Read API will be called on
// that one buffer whenever the data is requested and is not in the buffer.
// If num_buffers_ > 1, then the data is prefetched asynchronously in the
// buffers whenever the data is consumed from the buffers and that buffer is
// freed.
// If num_buffers > 1, then requested data can be overlapping between 2 buffers.
// To return the continuous buffer, overlap_bufs_ is used. The requested data is
// copied from 2 buffers to the overlap_bufs_ and overlap_bufs_ is returned to
// the caller.

// FilePrefetchBuffer is a smart buffer to store and read data from a file.
class FilePrefetchBuffer {
public:
// Constructor.
//
// All arguments are optional.
// readahead_size : the initial readahead size.
// max_readahead_size : the maximum readahead size.
// If max_readahead_size > readahead_size, the readahead size will be
// doubled on every IO until max_readahead_size is hit.
// Typically this is set as a multiple of readahead_size.
// max_readahead_size should be greater than equal to readahead_size.
// enable : controls whether reading from the buffer is enabled.
// If false, TryReadFromCache() always return false, and we only take stats
// for the minimum offset if track_min_offset = true.
// ReadaheadParams : Parameters to control the readahead behavior.
// enable : controls whether reading from the buffer is enabled.
// If false, TryReadFromCache() always returns false, and we
// only take stats for the minimum offset if
// track_min_offset = true.
// track_min_offset : Track the minimum offset ever read and collect stats on
// it. Used for adaptable readahead of the file footer/metadata.
// implicit_auto_readahead : Readahead is enabled implicitly by rocksdb after
// doing sequential scans for two times.
// it. Used for adaptable readahead of the file
// footer/metadata.
//
// Automatic readahead is enabled for a file if readahead_size
// and max_readahead_size are passed in.
// A user can construct a FilePrefetchBuffer without any arguments, but use
// `Prefetch` to load data into the buffer.
FilePrefetchBuffer(
size_t readahead_size = 0, size_t max_readahead_size = 0,
bool enable = true, bool track_min_offset = false,
bool implicit_auto_readahead = false, uint64_t num_file_reads = 0,
uint64_t num_file_reads_for_auto_readahead = 0,
uint64_t upper_bound_offset = 0, FileSystem* fs = nullptr,
SystemClock* clock = nullptr, Statistics* stats = nullptr,
const ReadaheadParams& readahead_params = {}, bool enable = true,
bool track_min_offset = false, uint64_t upper_bound_offset = 0,
FileSystem* fs = nullptr, SystemClock* clock = nullptr,
Statistics* stats = nullptr,
const std::function<void(bool, uint64_t&, uint64_t&)>& cb = nullptr,
FilePrefetchBufferUsage usage = FilePrefetchBufferUsage::kUnknown,
size_t num_buffers = 1)
: readahead_size_(readahead_size),
initial_auto_readahead_size_(readahead_size),
max_readahead_size_(max_readahead_size),
FilePrefetchBufferUsage usage = FilePrefetchBufferUsage::kUnknown)
: readahead_size_(readahead_params.initial_readahead_size),
initial_auto_readahead_size_(readahead_params.initial_readahead_size),
max_readahead_size_(readahead_params.max_readahead_size),
min_offset_read_(std::numeric_limits<size_t>::max()),
enable_(enable),
track_min_offset_(track_min_offset),
implicit_auto_readahead_(implicit_auto_readahead),
implicit_auto_readahead_(readahead_params.implicit_auto_readahead),
prev_offset_(0),
prev_len_(0),
num_file_reads_for_auto_readahead_(num_file_reads_for_auto_readahead),
num_file_reads_(num_file_reads),
num_file_reads_for_auto_readahead_(
readahead_params.num_file_reads_for_auto_readahead),
num_file_reads_(readahead_params.num_file_reads),
explicit_prefetch_submitted_(false),
fs_(fs),
clock_(clock),
stats_(stats),
usage_(usage),
upper_bound_offset_(upper_bound_offset),
readaheadsize_cb_(cb),
num_buffers_(num_buffers) {
num_buffers_(readahead_params.num_buffers) {
assert((num_file_reads_ >= num_file_reads_for_auto_readahead_ + 1) ||
(num_file_reads_ == 0));
// If ReadOptions.async_io is enabled, data is asynchronously filled in the
// queue. If data is overlapping in two buffers, data is copied to third
// buffer i.e. overlap_bufs_ to return continuous buffer.
overlap_bufs_.resize(1);
for (uint32_t i = 0; i < 1; i++) {
overlap_bufs_[i] = new BufferInfo();

// If num_buffers_ > 1, data is asynchronously filled in the
// queue. As a result, data can be overlapping in two buffers. It copies the
// data to overlap_bufs_ in order to return a continuous buffer.
if (num_buffers_ > 1) {
overlap_bufs_.emplace_back(new BufferInfo());
}

free_bufs_.resize(num_buffers_);
Expand Down Expand Up @@ -278,6 +309,10 @@ class FilePrefetchBuffer {

void DecreaseReadAheadIfEligible(uint64_t offset, size_t size,
size_t value = DEFAULT_DECREMENT) {
if (bufs_.empty()) {
return;
}

// Decrease the readahead_size if
// - its enabled internally by RocksDB (implicit_auto_readahead_) and,
// - readahead_size is greater than 0 and,
Expand All @@ -287,9 +322,6 @@ class FilePrefetchBuffer {
// - block is sequential with the previous read and,
// - num_file_reads_ + 1 (including this read) >
// num_file_reads_for_auto_readahead_
if (bufs_.empty()) {
return;
}

size_t curr_size = bufs_.front()->async_read_in_progress_
? bufs_.front()->async_req_len_
Expand Down Expand Up @@ -331,12 +363,10 @@ class FilePrefetchBuffer {

void AbortAllIOs();

void UpdateBuffersIfNeeded(uint64_t offset, size_t len);
void ClearOutdatedData(uint64_t offset, size_t len);

// It calls Poll API if any there is any pending asynchronous request. It then
// checks if data is in any buffer. It clears the outdated data and swaps the
// buffers if required.
void PollAndUpdateBuffersIfNeeded(uint64_t offset, size_t len);
// It calls Poll API to check for any pending asynchronous request.
void PollIfNeeded(uint64_t offset, size_t len);

Status PrefetchAsyncInternal(const IOOptions& opts,
RandomAccessFileReader* reader, uint64_t offset,
Expand All @@ -351,7 +381,7 @@ class FilePrefetchBuffer {
RandomAccessFileReader* reader, uint64_t read_len,
uint64_t start_offset);

// Copy the data from src to third buffer.
// Copy the data from src to overlap_bufs_.
void CopyDataToBuffer(BufferInfo* src, uint64_t& offset, size_t& length);

bool IsBlockSequential(const size_t& offset) {
Expand Down Expand Up @@ -388,42 +418,14 @@ class FilePrefetchBuffer {
return true;
}

// *** BEGIN Helper APIs related to data in Buffers ***
bool IsDataBlockInBuffer(BufferInfo* buf, uint64_t offset, size_t length) {
return (offset >= buf->offset_ &&
offset + length <= buf->offset_ + buf->buffer_.CurrentSize());
}
bool IsOffsetInBuffer(BufferInfo* buf, uint64_t offset) {
return (offset >= buf->offset_ &&
offset < buf->offset_ + buf->buffer_.CurrentSize());
}
bool DoesBufferContainData(BufferInfo* buf) {
return buf->buffer_.CurrentSize() > 0;
}
bool IsBufferOutdated(BufferInfo* buf, uint64_t offset) {
return (!buf->async_read_in_progress_ && DoesBufferContainData(buf) &&
offset >= buf->offset_ + buf->buffer_.CurrentSize());
}
bool IsBufferOutdatedWithAsyncProgress(BufferInfo* buf, uint64_t offset) {
return (buf->async_read_in_progress_ && buf->io_handle_ != nullptr &&
offset >= buf->offset_ + buf->async_req_len_);
}
bool IsOffsetInBufferWithAsyncProgress(BufferInfo* buf, uint64_t offset) {
return (buf->async_read_in_progress_ && offset >= buf->offset_ &&
offset < buf->offset_ + buf->async_req_len_);
}
// *** END Helper APIs related to data in Buffers ***

// Further prefetching is possible only when there is a free buffer left to
// fill and readahead has not been trimmed down to zero.
bool IsEligibleForFurtherPrefetching() {
  // Readahead size can be 0 because of trimming.
  return !free_bufs_.empty() && readahead_size_ != 0;
}

Expand Down Expand Up @@ -488,7 +490,46 @@ class FilePrefetchBuffer {
}
}

Status PrefetchRemBuffers(const IOOptions& opts,
RandomAccessFileReader* reader,
uint32_t end_offset1, size_t alignment,
size_t readahead_size);

// *** BEGIN Helper APIs related to data in Buffers ***
// Returns true iff the entire byte range [offset, offset + length) is
// already present in the data held by `buf`.
bool IsDataBlockInBuffer(BufferInfo* buf, uint64_t offset, size_t length) {
  const uint64_t buf_start = buf->offset_;
  const uint64_t buf_end = buf_start + buf->buffer_.CurrentSize();
  return (buf_start <= offset) && (offset + length <= buf_end);
}
// Returns true iff `offset` falls inside the data currently held by `buf`.
bool IsOffsetInBuffer(BufferInfo* buf, uint64_t offset) {
  const uint64_t buf_end = buf->offset_ + buf->buffer_.CurrentSize();
  return (buf->offset_ <= offset) && (offset < buf_end);
}
// Returns true iff `buf` currently holds any prefetched bytes.
bool DoesBufferContainData(BufferInfo* buf) {
  return buf->buffer_.CurrentSize() > 0;
}
// A buffer with completed data is outdated once the requested `offset` lies
// at or beyond the end of the data it holds. A buffer with an async read
// still in progress is never considered outdated here (see
// IsBufferOutdatedWithAsyncProgress for that case).
bool IsBufferOutdated(BufferInfo* buf, uint64_t offset) {
  if (buf->async_read_in_progress_) {
    return false;
  }
  if (!DoesBufferContainData(buf)) {
    return false;
  }
  return offset >= buf->offset_ + buf->buffer_.CurrentSize();
}
// Outdated check for a buffer whose asynchronous read is still in flight:
// compares `offset` against the requested read length rather than the bytes
// already present.
bool IsBufferOutdatedWithAsyncProgress(BufferInfo* buf, uint64_t offset) {
  if (!buf->async_read_in_progress_ || buf->io_handle_ == nullptr) {
    return false;
  }
  return offset >= buf->offset_ + buf->async_req_len_;
}
// Returns true iff `offset` falls inside the byte range an in-flight
// asynchronous read will populate in `buf`.
bool IsOffsetInBufferWithAsyncProgress(BufferInfo* buf, uint64_t offset) {
  if (!buf->async_read_in_progress_) {
    return false;
  }
  return (buf->offset_ <= offset) &&
         (offset < buf->offset_ + buf->async_req_len_);
}
// *** END Helper APIs related to data in Buffers ***

// *** BEGIN APIs related to allocating and freeing buffers ***
// Returns true iff no buffer currently holds (or is being filled with)
// prefetched data.
bool IsBufferQueueEmpty() { return bufs_.empty(); }

// Buffer at the front of the in-use queue. Precondition: !IsBufferQueueEmpty().
BufferInfo* GetFirstBuffer() { return bufs_.front(); }

// Buffer at the back of the in-use queue. Precondition: !IsBufferQueueEmpty().
BufferInfo* GetLastBuffer() { return bufs_.back(); }

// Number of buffers currently in the in-use queue.
size_t NumBuffersAllocated() { return bufs_.size(); }

void AllocateBuffer() {
assert(!free_bufs_.empty());
BufferInfo* buf = free_bufs_.front();
Expand Down Expand Up @@ -542,16 +583,6 @@ class FilePrefetchBuffer {
bufs_ = tmp_buf;
}

void ClearAllBuffers() {
if (bufs_.empty()) {
return;
}
for (auto& buf : bufs_) {
buf->ClearBuffer();
}
FreeEmptyBuffers();
}

// *** END APIs related to allocating and freeing buffers ***

std::deque<BufferInfo*> bufs_;
Expand Down Expand Up @@ -600,6 +631,9 @@ class FilePrefetchBuffer {
// upper_bound_offset_ during prefetching.
uint64_t upper_bound_offset_ = 0;
std::function<void(bool, uint64_t&, uint64_t&)> readaheadsize_cb_;

// num_buffers_ is the number of buffers maintained by FilePrefetchBuffer to
// prefetch the data at a time.
size_t num_buffers_;
};
} // namespace ROCKSDB_NAMESPACE
Loading

0 comments on commit 873507a

Please sign in to comment.