Skip to content

Commit

Permalink
Format code
Browse files Browse the repository at this point in the history
Summary:

Test Plan:

Reviewers:

Subscribers:

Tasks:

Tags:
  • Loading branch information
akankshamahajan15 committed Dec 15, 2023
1 parent 58fcbfc commit 3332598
Show file tree
Hide file tree
Showing 2 changed files with 110 additions and 108 deletions.
208 changes: 106 additions & 102 deletions file/file_prefetch_buffer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -656,126 +656,128 @@ Status FilePrefetchBuffer::PrefetchInternal(const IOOptions& opts,
if (copy_to_overlap_buffer && length > 0) {
CopyDataToBuffer(buf, offset, length);
}
return s;
}
return s;
}

bool FilePrefetchBuffer::TryReadFromCache(
const IOOptions& opts, RandomAccessFileReader* reader, uint64_t offset,
size_t n, Slice* result, Status* status, bool for_compaction) {
bool ret = TryReadFromCacheUntracked(opts, reader, offset, n, result,
status, for_compaction);
if (usage_ == FilePrefetchBufferUsage::kTableOpenPrefetchTail && enable_) {
if (ret) {
RecordTick(stats_, TABLE_OPEN_PREFETCH_TAIL_HIT);
} else {
RecordTick(stats_, TABLE_OPEN_PREFETCH_TAIL_MISS);
}
bool FilePrefetchBuffer::TryReadFromCache(const IOOptions& opts,
RandomAccessFileReader* reader,
uint64_t offset, size_t n,
Slice* result, Status* status,
bool for_compaction) {
bool ret = TryReadFromCacheUntracked(opts, reader, offset, n, result, status,
for_compaction);
if (usage_ == FilePrefetchBufferUsage::kTableOpenPrefetchTail && enable_) {
if (ret) {
RecordTick(stats_, TABLE_OPEN_PREFETCH_TAIL_HIT);
} else {
RecordTick(stats_, TABLE_OPEN_PREFETCH_TAIL_MISS);
}
return ret;
}
return ret;
}

bool FilePrefetchBuffer::TryReadFromCacheUntracked(
const IOOptions& opts, RandomAccessFileReader* reader, uint64_t offset,
size_t n, Slice* result, Status* status, bool for_compaction) {
if (track_min_offset_ && offset < min_offset_read_) {
min_offset_read_ = static_cast<size_t>(offset);
}

if (!enable_) {
return false;
}

if (explicit_prefetch_submitted_) {
// explicit_prefetch_submitted_ is special case where it expects request
// submitted in PrefetchAsync should match with this request. Otherwise
// buffers will be outdated.
// Random offset called. So abort the IOs.
if (prev_offset_ != offset) {
AbortAllIOs();
FreeAllBuffers();
explicit_prefetch_submitted_ = false;
return false;
}
}
bool FilePrefetchBuffer::TryReadFromCacheUntracked(
const IOOptions& opts, RandomAccessFileReader* reader, uint64_t offset,
size_t n, Slice* result, Status* status, bool for_compaction) {
if (track_min_offset_ && offset < min_offset_read_) {
min_offset_read_ = static_cast<size_t>(offset);
}

AllocateBufferIfEmpty();
BufferInfo* buf = GetFirstBuffer();
if (!enable_) {
return false;
}

if (!explicit_prefetch_submitted_ && offset < buf->offset_) {
if (explicit_prefetch_submitted_) {
// explicit_prefetch_submitted_ is special case where it expects request
// submitted in PrefetchAsync should match with this request. Otherwise
// buffers will be outdated.
// Random offset called. So abort the IOs.
if (prev_offset_ != offset) {
AbortAllIOs();
FreeAllBuffers();
explicit_prefetch_submitted_ = false;
return false;
}
}

bool prefetched = false;
bool copy_to_overlap_buffer = false;
// If the buffer contains only a few of the requested bytes:
// If readahead is enabled: prefetch the remaining bytes + readahead
// bytes
// and satisfy the request.
// If readahead is not enabled: return false.
TEST_SYNC_POINT_CALLBACK("FilePrefetchBuffer::TryReadFromCache",
&readahead_size_);

if (explicit_prefetch_submitted_ ||
(buf->async_read_in_progress_ ||
offset + n > buf->offset_ + buf->CurrentSize())) {
// In case readahead_size is trimmed (=0), we still want to poll the data
// submitted with explicit_prefetch_submitted_=true.
if (readahead_size_ > 0 || explicit_prefetch_submitted_) {
Status s;
assert(reader != nullptr);
assert(max_readahead_size_ >= readahead_size_);

if (for_compaction) {
s = Prefetch(opts, reader, offset, std::max(n, readahead_size_));
} else {
if (implicit_auto_readahead_) {
if (!IsEligibleForPrefetch(offset, n)) {
// Ignore status as Prefetch is not called.
s.PermitUncheckedError();
return false;
}
}
AllocateBufferIfEmpty();
BufferInfo* buf = GetFirstBuffer();

// Prefetch n + readahead_size_/2 synchronously as remaining
// readahead_size_/2 will be prefetched asynchronously if num_buffers_
// > 1.
s = PrefetchInternal(
opts, reader, offset, n,
(num_buffers_ > 1 ? readahead_size_ / 2 : readahead_size_),
copy_to_overlap_buffer);
explicit_prefetch_submitted_ = false;
if (!explicit_prefetch_submitted_ && offset < buf->offset_) {
return false;
}

bool prefetched = false;
bool copy_to_overlap_buffer = false;
// If the buffer contains only a few of the requested bytes:
// If readahead is enabled: prefetch the remaining bytes + readahead
// bytes
// and satisfy the request.
// If readahead is not enabled: return false.
TEST_SYNC_POINT_CALLBACK("FilePrefetchBuffer::TryReadFromCache",
&readahead_size_);

if (explicit_prefetch_submitted_ ||
(buf->async_read_in_progress_ ||
offset + n > buf->offset_ + buf->CurrentSize())) {
// In case readahead_size is trimmed (=0), we still want to poll the data
// submitted with explicit_prefetch_submitted_=true.
if (readahead_size_ > 0 || explicit_prefetch_submitted_) {
Status s;
assert(reader != nullptr);
assert(max_readahead_size_ >= readahead_size_);

if (for_compaction) {
s = Prefetch(opts, reader, offset, std::max(n, readahead_size_));
} else {
if (implicit_auto_readahead_) {
if (!IsEligibleForPrefetch(offset, n)) {
// Ignore status as Prefetch is not called.
s.PermitUncheckedError();
return false;
}
}

if (!s.ok()) {
if (status) {
*status = s;
}
// Prefetch n + readahead_size_/2 synchronously as remaining
// readahead_size_/2 will be prefetched asynchronously if num_buffers_
// > 1.
s = PrefetchInternal(
opts, reader, offset, n,
(num_buffers_ > 1 ? readahead_size_ / 2 : readahead_size_),
copy_to_overlap_buffer);
explicit_prefetch_submitted_ = false;
}

if (!s.ok()) {
if (status) {
*status = s;
}
#ifndef NDEBUG
IGNORE_STATUS_IF_ERROR(s);
#endif
return false;
}
prefetched = explicit_prefetch_submitted_ ? false : true;
} else {
return false;
}
} else if (!for_compaction) {
UpdateStats(/*found_in_buffer=*/true, n);
prefetched = explicit_prefetch_submitted_ ? false : true;
} else {
return false;
}
} else if (!for_compaction) {
UpdateStats(/*found_in_buffer=*/true, n);
}

UpdateReadPattern(offset, n, /*decrease_readaheadsize=*/false);
UpdateReadPattern(offset, n, /*decrease_readaheadsize=*/false);

buf = GetFirstBuffer();
if (copy_to_overlap_buffer) {
buf = overlap_buf_;
}
uint64_t offset_in_buffer = offset - buf->offset_;
*result = Slice(buf->buffer_.BufferStart() + offset_in_buffer, n);
if (prefetched) {
readahead_size_ = std::min(max_readahead_size_, readahead_size_ * 2);
}
return true;
buf = GetFirstBuffer();
if (copy_to_overlap_buffer) {
buf = overlap_buf_;
}
uint64_t offset_in_buffer = offset - buf->offset_;
*result = Slice(buf->buffer_.BufferStart() + offset_in_buffer, n);
if (prefetched) {
readahead_size_ = std::min(max_readahead_size_, readahead_size_ * 2);
}
return true;
}

void FilePrefetchBuffer::PrefetchAsyncCallback(const FSReadRequest& req,
void* cb_arg) {
Expand Down Expand Up @@ -939,9 +941,11 @@ Status FilePrefetchBuffer::PrefetchAsync(const IOOptions& opts,
return (data_found ? Status::OK() : Status::TryAgain());
}

Status FilePrefetchBuffer::PrefetchRemBuffers(
const IOOptions& opts, RandomAccessFileReader* reader, uint64_t end_offset1,
size_t alignment, size_t readahead_size) {
Status FilePrefetchBuffer::PrefetchRemBuffers(const IOOptions& opts,
RandomAccessFileReader* reader,
uint64_t end_offset1,
size_t alignment,
size_t readahead_size) {
Status s;
while (NumBuffersAllocated() < num_buffers_) {
BufferInfo* prev_buf = GetLastBuffer();
Expand Down
10 changes: 4 additions & 6 deletions table/block_based/block_based_table_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -707,12 +707,10 @@ struct BlockBasedTable::Rep {
}

void CreateFilePrefetchBufferIfNotExists(
const ReadaheadParams& readahead_params,
std::unique_ptr<FilePrefetchBuffer>* fpb,
const std::function<void(bool, uint64_t&, uint64_t&)>&
readaheadsize_cb,
FilePrefetchBufferUsage usage = FilePrefetchBufferUsage::kUnknown)
const {
const ReadaheadParams& readahead_params,
std::unique_ptr<FilePrefetchBuffer>* fpb,
const std::function<void(bool, uint64_t&, uint64_t&)>& readaheadsize_cb,
FilePrefetchBufferUsage usage = FilePrefetchBufferUsage::kUnknown) const {
if (!(*fpb)) {
CreateFilePrefetchBuffer(readahead_params, fpb, readaheadsize_cb, usage);
}
Expand Down

0 comments on commit 3332598

Please sign in to comment.