Merge Async and Sync in one code flow
akankshamahajan15 committed Dec 4, 2023
1 parent a3b1c95 commit d76e301
Showing 4 changed files with 108 additions and 211 deletions.
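Before the per-file hunks, the shape of the change: the separate async entry points TryReadFromCacheAsync and TryReadFromCacheAsyncUntracked are deleted, PrefetchAsyncInternal is renamed to PrefetchInternal, and the single TryReadFromCache path now serves both the synchronous and the asynchronous flow, dispatching on num_buffers_. Below is a minimal stand-alone C++ sketch of the split it introduces; the helper name is hypothetical, and only the num_buffers_-based arithmetic is taken from the patch:

#include <cstddef>

// Hypothetical helper, not part of the patch: with more than one buffer,
// half of the readahead budget is read synchronously and the other half is
// submitted asynchronously; with a single buffer, the whole budget stays
// synchronous.
size_t SyncReadaheadPortion(size_t readahead_size, size_t num_buffers) {
  return num_buffers > 1 ? readahead_size / 2 : readahead_size;
}

This mirrors the new PrefetchInternal call site below, which passes readahead_size_ / 2 when num_buffers_ > 1 and the full readahead_size_ otherwise.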
217 changes: 74 additions & 143 deletions file/file_prefetch_buffer.cc
@@ -135,6 +135,8 @@ Status FilePrefetchBuffer::Prefetch(const IOOptions& opts,
return Status::OK();
}

assert(num_buffers_ == 1);

AllocateBufferIfEmpty();
BufferInfo* buf = GetFirstBuffer();

@@ -425,7 +427,11 @@ Status FilePrefetchBuffer::HandleOverlappingData(
overlap_buf->offset_ = offset;
copy_to_overlap_buffer = true;

size_t initial_buf_size = overlap_bufs_.front()->buffer_.CurrentSize();
CopyDataToBuffer(buf, tmp_offset, tmp_length);
UpdateStats(
/*found_in_buffer=*/false,
overlap_bufs_.front()->buffer_.CurrentSize() - initial_buf_size);

// Since the data has been consumed, call async prefetching on the freed
// buffer, but only if the requested data lies within the next buffer.
@@ -476,16 +482,16 @@ Status FilePrefetchBuffer::HandleOverlappingData(
// data from first, free that buffer to send for async request, wait for
// poll to fill next buffer (if any), and copy remaining data from that
// buffer to overlap buffer.
Status FilePrefetchBuffer::PrefetchAsyncInternal(const IOOptions& opts,
RandomAccessFileReader* reader,
uint64_t offset, size_t length,
size_t readahead_size,
bool& copy_to_overlap_buffer) {
Status FilePrefetchBuffer::PrefetchInternal(const IOOptions& opts,
RandomAccessFileReader* reader,
uint64_t offset, size_t length,
size_t readahead_size,
bool& copy_to_overlap_buffer) {
if (!enable_) {
return Status::OK();
}

TEST_SYNC_POINT("FilePrefetchBuffer::PrefetchAsyncInternal:Start");
TEST_SYNC_POINT("FilePrefetchBuffer::Prefetch:Start");

size_t alignment = reader->file()->GetRequiredBufferAlignment();
Status s;
@@ -533,21 +539,41 @@ Status FilePrefetchBuffer::PrefetchAsyncInternal(const IOOptions& opts,

// After polling, if all the requested bytes are in the first buffer, it
// will only go for async prefetching.
// copy_to_overlap_buffer is a special case so it will be handled separately.
if (!copy_to_overlap_buffer && DoesBufferContainData(buf) &&
IsDataBlockInBuffer(buf, offset, length)) {
offset += length;
length = 0;

// Since async request was submitted directly by calling PrefetchAsync in
// last call, we don't need to prefetch further as this call is to poll
// the data submitted in previous call.
if (explicit_prefetch_submitted_) {
return s;
}
if (!IsEligibleForFurtherPrefetching()) {
UpdateStats(/*found_in_buffer=*/true, original_length);
return s;
if (DoesBufferContainData(buf)) {
if (copy_to_overlap_buffer) {
// Data is overlapping i.e. some of the data has been copied to overlap
// buffer and remaining will be updated below.
size_t initial_buf_size = overlap_bufs_.front()->buffer_.CurrentSize();
CopyDataToBuffer(buf, offset, length);
UpdateStats(
/*found_in_buffer=*/false,
overlap_bufs_.front()->buffer_.CurrentSize() - initial_buf_size);

// Length == 0: All the requested data has been copied to overlap buffer
// and it has already gone for async prefetching. It can return without
// doing anything further. Length > 0: More data needs to be consumed so
// it will continue async and sync prefetching and copy the remaining data
// to overlap buffer in the end.
if (length == 0) {
UpdateStats(/*found_in_buffer=*/true, length);
return s;
}
} else {
if (IsDataBlockInBuffer(buf, offset, length)) {
offset += length;
length = 0;

// Since async request was submitted directly by calling PrefetchAsync
// in last call, we don't need to prefetch further as this call is to
// poll the data submitted in previous call.
if (explicit_prefetch_submitted_) {
return s;
}
if (!IsEligibleForFurtherPrefetching()) {
UpdateStats(/*found_in_buffer=*/true, original_length);
return s;
}
}
}
}

@@ -584,22 +610,7 @@ Status FilePrefetchBuffer::PrefetchAsyncInternal(const IOOptions& opts,
AllocateBufferIfEmpty();
buf = GetFirstBuffer();

// Data is overlapping i.e. some of the data has been copied to third
// buffer and remaining will be updated below.
if (copy_to_overlap_buffer && DoesBufferContainData(buf)) {
CopyDataToBuffer(buf, offset, length);

// Length == 0: All the requested data has been copied to overlap buffer and
// it has already gone for async prefetching. It can return without doing
// anything further.
// Length > 0: More data needs to be consumed so it will continue async
// and sync prefetching and copy the remaining data to overlap buffer in the
// end.
if (length == 0) {
UpdateStats(/*found_in_buffer=*/true, original_length);
return s;
}
}

AllocateBufferIfEmpty();
buf = GetFirstBuffer();
@@ -611,11 +622,10 @@ Status FilePrefetchBuffer::PrefetchAsyncInternal(const IOOptions& opts,

// For length == 0, skip the synchronous prefetching. read_len1 will be 0.
if (length > 0) {
ReadAheadSizeTuning(buf, /*read_curr_block=*/true, /*refit_tail=*/false,
start_offset1, alignment, length, readahead_size,
ReadAheadSizeTuning(buf, /*read_curr_block=*/true, /*refit_tail=*/true,
start_offset1, alignment, length, readahead_size,
start_offset1, end_offset1, read_len1, chunk_len1);
UpdateStats(/*found_in_buffer=*/false,
/*length_found=*/original_length - length);
UpdateStats(/*found_in_buffer=*/false, chunk_len1);
} else {
UpdateStats(/*found_in_buffer=*/true, original_length);
}
@@ -638,7 +648,8 @@ }
}
}

// Copy remaining requested bytes to third_buffer.
// Copy remaining requested bytes to overlap_buffer. No need to update stats
// as data is prefetched during this call.
if (copy_to_overlap_buffer && length > 0) {
CopyDataToBuffer(buf, offset, length);
}
@@ -649,7 +660,7 @@ bool FilePrefetchBuffer::TryReadFromCache(const IOOptions& opts,
RandomAccessFileReader* reader,
uint64_t offset, size_t n,
Slice* result, Status* status,
bool for_compaction /* = false */) {
bool for_compaction) {
bool ret = TryReadFromCacheUntracked(opts, reader, offset, n, result, status,
for_compaction);
if (usage_ == FilePrefetchBufferUsage::kTableOpenPrefetchTail && enable_) {
@@ -664,94 +675,7 @@ bool FilePrefetchBuffer::TryReadFromCache(const IOOptions& opts,

bool FilePrefetchBuffer::TryReadFromCacheUntracked(
const IOOptions& opts, RandomAccessFileReader* reader, uint64_t offset,
size_t n, Slice* result, Status* status,
bool for_compaction /* = false */) {
if (track_min_offset_ && offset < min_offset_read_) {
min_offset_read_ = static_cast<size_t>(offset);
}

if (!enable_) {
return false;
}

AllocateBufferIfEmpty();
BufferInfo* buf = GetFirstBuffer();

if ((offset < buf->offset_)) {
return false;
}

// If the buffer contains only a few of the requested bytes:
// If readahead is enabled: prefetch the remaining bytes + readahead bytes
// and satisfy the request.
// If readahead is not enabled: return false.
TEST_SYNC_POINT_CALLBACK("FilePrefetchBuffer::TryReadFromCache",
&readahead_size_);
if (offset + n > buf->offset_ + buf->buffer_.CurrentSize()) {
if (readahead_size_ > 0) {
Status s;
assert(reader != nullptr);
assert(max_readahead_size_ >= readahead_size_);
if (for_compaction) {
s = Prefetch(opts, reader, offset, std::max(n, readahead_size_));
} else {
if (IsOffsetInBuffer(buf, offset)) {
RecordTick(stats_, PREFETCH_BYTES_USEFUL,
buf->offset_ + buf->buffer_.CurrentSize() - offset);
}
if (implicit_auto_readahead_) {
if (!IsEligibleForPrefetch(offset, n)) {
// Ignore status as Prefetch is not called.
s.PermitUncheckedError();
return false;
}
}
UpdateReadAheadSizeForUpperBound(offset, n);
s = Prefetch(opts, reader, offset, n + readahead_size_);
}
if (!s.ok()) {
if (status) {
*status = s;
}
#ifndef NDEBUG
IGNORE_STATUS_IF_ERROR(s);
#endif
return false;
}
readahead_size_ = std::min(max_readahead_size_, readahead_size_ * 2);
} else {
return false;
}
} else if (!for_compaction) {
RecordTick(stats_, PREFETCH_HITS);
RecordTick(stats_, PREFETCH_BYTES_USEFUL, n);
}
UpdateReadPattern(offset, n, false /*decrease_readaheadsize*/);

uint64_t offset_in_buffer = offset - buf->offset_;
*result = Slice(buf->buffer_.BufferStart() + offset_in_buffer, n);
return true;
}

bool FilePrefetchBuffer::TryReadFromCacheAsync(const IOOptions& opts,
RandomAccessFileReader* reader,
uint64_t offset, size_t n,
Slice* result, Status* status) {
bool ret =
TryReadFromCacheAsyncUntracked(opts, reader, offset, n, result, status);
if (usage_ == FilePrefetchBufferUsage::kTableOpenPrefetchTail && enable_) {
if (ret) {
RecordTick(stats_, TABLE_OPEN_PREFETCH_TAIL_HIT);
} else {
RecordTick(stats_, TABLE_OPEN_PREFETCH_TAIL_MISS);
}
}
return ret;
}

bool FilePrefetchBuffer::TryReadFromCacheAsyncUntracked(
const IOOptions& opts, RandomAccessFileReader* reader, uint64_t offset,
size_t n, Slice* result, Status* status) {
size_t n, Slice* result, Status* status, bool for_compaction) {
if (track_min_offset_ && offset < min_offset_read_) {
min_offset_read_ = static_cast<size_t>(offset);
}
@@ -799,21 +723,28 @@ bool FilePrefetchBuffer::TryReadFromCacheAsyncUntracked(
assert(reader != nullptr);
assert(max_readahead_size_ >= readahead_size_);

if (implicit_auto_readahead_) {
if (!IsEligibleForPrefetch(offset, n)) {
// Ignore status as Prefetch is not called.
s.PermitUncheckedError();
return false;
if (for_compaction) {
s = Prefetch(opts, reader, offset, std::max(n, readahead_size_));
} else {
if (implicit_auto_readahead_) {
if (!IsEligibleForPrefetch(offset, n)) {
// Ignore status as Prefetch is not called.
s.PermitUncheckedError();
return false;
}
}
}

UpdateReadAheadSizeForUpperBound(offset, n);
UpdateReadAheadSizeForUpperBound(offset, n);

// Prefetch n + readahead_size_/2 synchronously as remaining
// readahead_size_/2 will be prefetched asynchronously.
s = PrefetchAsyncInternal(opts, reader, offset, n, readahead_size_ / 2,
copy_to_overlap_buffer);
explicit_prefetch_submitted_ = false;
// Prefetch n + readahead_size_/2 synchronously as remaining
// readahead_size_/2 will be prefetched asynchronously if num_buffers_
// > 1.
s = PrefetchInternal(
opts, reader, offset, n,
(num_buffers_ > 1 ? readahead_size_ / 2 : readahead_size_),
copy_to_overlap_buffer);
explicit_prefetch_submitted_ = false;
}
if (!s.ok()) {
if (status) {
*status = s;
@@ -827,7 +758,7 @@ bool FilePrefetchBuffer::TryReadFromCacheAsyncUntracked(
} else {
return false;
}
} else {
} else if (!for_compaction) {
UpdateStats(/*found_in_buffer=*/true, n);
}

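One behavioral detail from the hunks above: prefetch statistics are now charged with the bytes actually transferred, either chunk_len1 for the synchronous read or the overlap buffer's size delta around CopyDataToBuffer, rather than lengths inferred from the request. A minimal sketch of that measure-by-delta pattern, with hypothetical stand-ins for the buffer and the counter:

#include <cstddef>
#include <string>

// Record the destination's size before and after the copy and charge the
// difference, instead of assuming the full requested length was consumed.
void CopyAndRecord(std::string& overlap_buf, const char* src, size_t len,
                   size_t& prefetch_bytes_useful) {
  size_t initial_size = overlap_buf.size();
  overlap_buf.append(src, len);  // stands in for CopyDataToBuffer
  prefetch_bytes_useful += overlap_buf.size() - initial_size;
}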
23 changes: 8 additions & 15 deletions file/file_prefetch_buffer.h
@@ -243,11 +243,14 @@ class FilePrefetchBuffer {

bool Enabled() const { return enable_; }

// Load data into the buffer from a file.
// Called externally by the user to load data into the buffer from a file;
// num_buffers_ should be set to its default (1).
//
// opts : the IO options to use.
// reader : the file reader.
// offset : the file offset to start reading from.
// n : the number of bytes to read.
//
Status Prefetch(const IOOptions& opts, RandomAccessFileReader* reader,
uint64_t offset, size_t n);

@@ -281,10 +284,6 @@
uint64_t offset, size_t n, Slice* result, Status* s,
bool for_compaction = false);

bool TryReadFromCacheAsync(const IOOptions& opts,
RandomAccessFileReader* reader, uint64_t offset,
size_t n, Slice* result, Status* status);

// The minimum `offset` ever passed to TryReadFromCache(). This will only be
// tracked if track_min_offset = true.
size_t min_offset_read() const { return min_offset_read_; }
@@ -368,10 +367,9 @@
// It calls Poll API to check for any pending asynchronous request.
void PollIfNeeded(uint64_t offset, size_t len);

Status PrefetchAsyncInternal(const IOOptions& opts,
RandomAccessFileReader* reader, uint64_t offset,
size_t length, size_t readahead_size,
bool& copy_to_third_buffer);
Status PrefetchInternal(const IOOptions& opts, RandomAccessFileReader* reader,
uint64_t offset, size_t length, size_t readahead_size,
bool& copy_to_third_buffer);

Status Read(BufferInfo* buf, const IOOptions& opts,
RandomAccessFileReader* reader, uint64_t read_len,
@@ -450,11 +448,6 @@
Status* s,
bool for_compaction = false);

bool TryReadFromCacheAsyncUntracked(const IOOptions& opts,
RandomAccessFileReader* reader,
uint64_t offset, size_t n, Slice* result,
Status* status);

void UpdateReadAheadSizeForUpperBound(uint64_t offset, size_t n) {
// Adjust readahead_size till upper_bound if upper_bound_offset_ is
// set.
@@ -615,7 +608,7 @@ class FilePrefetchBuffer {
uint64_t num_file_reads_;

// If explicit_prefetch_submitted_ is set then it indicates RocksDB called
// PrefetchAsync to submit a request. It then needs to call TryReadFromCache
// to poll the submitted request without checking whether the data is
// sequential or checking num_file_reads_.
// num_file_reads_.
bool explicit_prefetch_submitted_;
Expand Down