Skip to content

Commit

Permalink
Addressed comments
Browse files Browse the repository at this point in the history
Summary:

Test Plan:

Reviewers:

Subscribers:

Tasks:

Tags:
  • Loading branch information
akankshamahajan15 committed Dec 15, 2023
1 parent 06c9f9c commit 58fcbfc
Show file tree
Hide file tree
Showing 2 changed files with 96 additions and 92 deletions.
92 changes: 45 additions & 47 deletions file/file_prefetch_buffer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -31,18 +31,18 @@ void FilePrefetchBuffer::CalculateOffsetAndLen(
// If only a few bytes exist -- reuse them & read only what is really needed.
// This is typically the case of incremental reading of data.
// If no bytes exist in buffer -- full pread.
if (DoesBufferContainData(buf) && IsOffsetInBuffer(buf, offset)) {
if (buf->DoesBufferContainData() && buf->IsOffsetInBuffer(offset)) {
// Only a few requested bytes are in the buffer. memmove that chunk of
// bytes to the beginning, and memcpy it back into the new buffer if a
// new buffer is created.
chunk_offset_in_buffer =
Rounddown(static_cast<size_t>(offset - buf->offset_), alignment);
chunk_len = static_cast<uint64_t>(buf->buffer_.CurrentSize()) -
chunk_offset_in_buffer;
chunk_len =
static_cast<uint64_t>(buf->CurrentSize()) - chunk_offset_in_buffer;
assert(chunk_offset_in_buffer % alignment == 0);
assert(chunk_len % alignment == 0);
assert(chunk_offset_in_buffer + chunk_len <=
buf->offset_ + buf->buffer_.CurrentSize());
buf->offset_ + buf->CurrentSize());
if (chunk_len > 0) {
copy_data_to_new_buffer = true;
} else {
Expand Down Expand Up @@ -142,7 +142,7 @@ Status FilePrefetchBuffer::Prefetch(const IOOptions& opts,

TEST_SYNC_POINT("FilePrefetchBuffer::Prefetch:Start");

if (offset + n <= buf->offset_ + buf->buffer_.CurrentSize()) {
if (offset + n <= buf->offset_ + buf->CurrentSize()) {
// All requested bytes are already in the buffer. So no need to Read again.
return Status::OK();
}
Expand All @@ -166,7 +166,7 @@ Status FilePrefetchBuffer::Prefetch(const IOOptions& opts,
return s;
}

// Copy data from src to overlap_bufs_.
// Copy data from src to overlap_buf_.
void FilePrefetchBuffer::CopyDataToBuffer(BufferInfo* src, uint64_t& offset,
size_t& length) {
if (length == 0) {
Expand All @@ -175,18 +175,18 @@ void FilePrefetchBuffer::CopyDataToBuffer(BufferInfo* src, uint64_t& offset,

uint64_t copy_offset = (offset - src->offset_);
size_t copy_len = 0;
if (IsDataBlockInBuffer(src, offset, length)) {
if (src->IsDataBlockInBuffer(offset, length)) {
// All the bytes are in src.
copy_len = length;
} else {
copy_len = src->buffer_.CurrentSize() - copy_offset;
copy_len = src->CurrentSize() - copy_offset;
}

BufferInfo* dst = overlap_bufs_.front();
memcpy(dst->buffer_.BufferStart() + dst->buffer_.CurrentSize(),
BufferInfo* dst = overlap_buf_;
memcpy(dst->buffer_.BufferStart() + dst->CurrentSize(),
src->buffer_.BufferStart() + copy_offset, copy_len);

dst->buffer_.Size(dst->buffer_.CurrentSize() + copy_len);
dst->buffer_.Size(dst->CurrentSize() + copy_len);

// Update offset and length.
offset += copy_len;
Expand All @@ -206,7 +206,7 @@ void FilePrefetchBuffer::AbortOutdatedIO(uint64_t offset) {
std::vector<void*> handles;
std::vector<BufferInfo*> tmp_buf;
for (auto& buf : bufs_) {
if (IsBufferOutdatedWithAsyncProgress(buf, offset)) {
if (buf->IsBufferOutdatedWithAsyncProgress(offset)) {
handles.emplace_back(buf->io_handle_);
tmp_buf.emplace_back(buf);
}
Expand Down Expand Up @@ -257,7 +257,7 @@ void FilePrefetchBuffer::ClearOutdatedData(uint64_t offset, size_t length) {
while (!IsBufferQueueEmpty()) {
BufferInfo* buf = GetFirstBuffer();
// Offset is greater than this buffer's end offset.
if (IsBufferOutdated(buf, offset)) {
if (buf->IsBufferOutdated(offset)) {
FreeFrontBuffer();
} else {
break;
Expand All @@ -275,10 +275,11 @@ void FilePrefetchBuffer::ClearOutdatedData(uint64_t offset, size_t length) {
if (NumBuffersAllocated() > 1) {
BufferInfo* next_buf = bufs_[1];
if (!next_buf->async_read_in_progress_ && !buf->async_read_in_progress_ &&
DoesBufferContainData(buf)) {
if (buf->offset_ + buf->buffer_.CurrentSize() != next_buf->offset_) {
if (DoesBufferContainData(next_buf) && IsOffsetInBuffer(buf, offset) &&
(offset + length > buf->offset_ + buf->buffer_.CurrentSize())) {
buf->DoesBufferContainData()) {
if (buf->offset_ + buf->CurrentSize() != next_buf->offset_) {
if (next_buf->DoesBufferContainData() &&
buf->IsOffsetInBuffer(offset) &&
(offset + length > buf->offset_ + buf->CurrentSize())) {
// Clear all buffers after first.
for (size_t i = 1; i < bufs_.size(); ++i) {
bufs_[i]->ClearBuffer();
Expand Down Expand Up @@ -412,40 +413,39 @@ Status FilePrefetchBuffer::HandleOverlappingData(
// still in progress. This should only happen if a prefetch was initiated
// by Seek, but the next access is at another offset.
if (buf->async_read_in_progress_ &&
IsOffsetInBufferWithAsyncProgress(buf, offset)) {
buf->IsOffsetInBufferWithAsyncProgress(offset)) {
PollIfNeeded(offset, length);
}

BufferInfo* next_buf = bufs_[1];

// If data is overlapping over two buffers, copy the data from front and
// call ReadAsync on freed buffer.
if (!buf->async_read_in_progress_ && DoesBufferContainData(buf) &&
IsOffsetInBuffer(buf, offset) &&
if (!buf->async_read_in_progress_ && buf->DoesBufferContainData() &&
buf->IsOffsetInBuffer(offset) &&
(/*Data extends over two buffers and second buffer either has data or in
process of population=*/
(offset + length > next_buf->offset_) &&
(next_buf->async_read_in_progress_ ||
DoesBufferContainData(next_buf)))) {
// Allocate new buffer to overlap_bufs_.
BufferInfo* overlap_buf = overlap_bufs_.front();
overlap_buf->ClearBuffer();
overlap_buf->buffer_.Alignment(alignment);
overlap_buf->buffer_.AllocateNewBuffer(length);
overlap_buf->offset_ = offset;
next_buf->DoesBufferContainData()))) {
// Allocate new buffer to overlap_buf_.
overlap_buf_->ClearBuffer();
overlap_buf_->buffer_.Alignment(alignment);
overlap_buf_->buffer_.AllocateNewBuffer(length);
overlap_buf_->offset_ = offset;
copy_to_overlap_buffer = true;

size_t initial_buf_size = overlap_bufs_.front()->buffer_.CurrentSize();
size_t initial_buf_size = overlap_buf_->CurrentSize();
CopyDataToBuffer(buf, tmp_offset, tmp_length);
UpdateStats(
/*found_in_buffer=*/false,
overlap_bufs_.front()->buffer_.CurrentSize() - initial_buf_size);
overlap_buf_->CurrentSize() - initial_buf_size);

// Call async prefetching on freed buffer since data has been consumed
// only if requested data lies within next buffer.
size_t second_size = next_buf->async_read_in_progress_
? next_buf->async_req_len_
: next_buf->buffer_.CurrentSize();
: next_buf->CurrentSize();
uint64_t start_offset = next_buf->initial_end_offset_;
// Second buffer might be out of bound if first buffer already prefetched
// that data.
Expand Down Expand Up @@ -474,8 +474,7 @@ Status FilePrefetchBuffer::HandleOverlappingData(
return s;
}

// If async_io is enabled in case of sequential reads, PrefetchAsyncInternal is
// called. When data is outdated, we clear the first buffer and free it as the
// When data is outdated, we clear the first buffer and free it as the
// data has been consumed because of sequential reads.
//
// Scenarios for prefetching asynchronously:
Expand Down Expand Up @@ -526,8 +525,8 @@ Status FilePrefetchBuffer::PrefetchInternal(const IOOptions& opts,
// - Return if whole data is in first and second buffer is in progress or
// already full.
// - If second buffer is empty, it will go for ReadAsync for second buffer.
if (!buf->async_read_in_progress_ && DoesBufferContainData(buf) &&
IsDataBlockInBuffer(buf, offset, length)) {
if (!buf->async_read_in_progress_ && buf->DoesBufferContainData() &&
buf->IsDataBlockInBuffer(offset, length)) {
// Whole data is in buffer.
if (!IsEligibleForFurtherPrefetching()) {
UpdateStats(/*found_in_buffer=*/true, original_length);
Expand All @@ -546,15 +545,15 @@ Status FilePrefetchBuffer::PrefetchInternal(const IOOptions& opts,

// After polling, if all the requested bytes are in first buffer, it will only
// go for async prefetching.
if (DoesBufferContainData(buf)) {
if (buf->DoesBufferContainData()) {
if (copy_to_overlap_buffer) {
// Data is overlapping i.e. some of the data has been copied to overlap
// buffer and remaining will be updated below.
size_t initial_buf_size = overlap_bufs_.front()->buffer_.CurrentSize();
size_t initial_buf_size = overlap_buf_->CurrentSize();
CopyDataToBuffer(buf, offset, length);
UpdateStats(
/*found_in_buffer=*/false,
overlap_bufs_.front()->buffer_.CurrentSize() - initial_buf_size);
overlap_buf_->CurrentSize() - initial_buf_size);

// Length == 0: All the requested data has been copied to overlap buffer
// and it has already gone for async prefetching. It can return without
Expand All @@ -566,7 +565,7 @@ Status FilePrefetchBuffer::PrefetchInternal(const IOOptions& opts,
return s;
}
} else {
if (IsDataBlockInBuffer(buf, offset, length)) {
if (buf->IsDataBlockInBuffer(offset, length)) {
offset += length;
length = 0;
// Since async request was submitted directly by calling PrefetchAsync
Expand All @@ -591,7 +590,7 @@ Status FilePrefetchBuffer::PrefetchInternal(const IOOptions& opts,
// If the first buffer became empty because of an IOError, abort IO for all
// buffers as well. Otherwise data might not align if more data needs to be
// read into the first buffer, which might overlap with the second buffer.
if (!DoesBufferContainData(buf)) {
if (!buf->DoesBufferContainData()) {
std::vector<void*> handles;
if (NumBuffersAllocated() > 1) {
for (auto& _buf : bufs_) {
Expand Down Expand Up @@ -718,7 +717,7 @@ Status FilePrefetchBuffer::PrefetchInternal(const IOOptions& opts,

if (explicit_prefetch_submitted_ ||
(buf->async_read_in_progress_ ||
offset + n > buf->offset_ + buf->buffer_.CurrentSize())) {
offset + n > buf->offset_ + buf->CurrentSize())) {
// In case readahead_size is trimmed (=0), we still want to poll the data
// submitted with explicit_prefetch_submitted_=true.
if (readahead_size_ > 0 || explicit_prefetch_submitted_) {
Expand Down Expand Up @@ -768,7 +767,7 @@ Status FilePrefetchBuffer::PrefetchInternal(const IOOptions& opts,

buf = GetFirstBuffer();
if (copy_to_overlap_buffer) {
buf = overlap_bufs_.front();
buf = overlap_buf_;
}
uint64_t offset_in_buffer = offset - buf->offset_;
*result = Slice(buf->buffer_.BufferStart() + offset_in_buffer, n);
Expand All @@ -792,8 +791,7 @@ void FilePrefetchBuffer::PrefetchAsyncCallback(const FSReadRequest& req,
#endif

if (req.status.ok()) {
if (req.offset + req.result.size() <=
buf->offset_ + buf->buffer_.CurrentSize()) {
if (req.offset + req.result.size() <= buf->offset_ + buf->CurrentSize()) {
// All requested bytes are already in the buffer or no data is read
// because of EOF. So no need to update.
return;
Expand All @@ -803,7 +801,7 @@ void FilePrefetchBuffer::PrefetchAsyncCallback(const FSReadRequest& req,
// read). So ignore this read.
return;
}
size_t current_size = buf->buffer_.CurrentSize();
size_t current_size = buf->CurrentSize();
buf->buffer_.Size(current_size + req.result.size());
}
}
Expand Down Expand Up @@ -842,7 +840,7 @@ Status FilePrefetchBuffer::PrefetchAsync(const IOOptions& opts,
// and data in buffer.
if (!IsBufferQueueEmpty()) {
BufferInfo* buf = GetFirstBuffer();
if (readaheadsize_cb_ != nullptr || !IsOffsetInBuffer(buf, offset)) {
if (readaheadsize_cb_ != nullptr || !buf->IsOffsetInBuffer(offset)) {
FreeAllBuffers();
}
}
Expand All @@ -854,7 +852,7 @@ Status FilePrefetchBuffer::PrefetchAsync(const IOOptions& opts,
// If first buffer has full data.
if (!IsBufferQueueEmpty()) {
BufferInfo* buf = GetFirstBuffer();
if (DoesBufferContainData(buf) && IsDataBlockInBuffer(buf, offset, n)) {
if (buf->DoesBufferContainData() && buf->IsDataBlockInBuffer(offset, n)) {
uint64_t offset_in_buffer = offset - buf->offset_;
*result = Slice(buf->buffer_.BufferStart() + offset_in_buffer, n);
data_found = true;
Expand Down Expand Up @@ -897,7 +895,7 @@ Status FilePrefetchBuffer::PrefetchAsync(const IOOptions& opts,
// eligible.

// Calculate length and offsets for reading.
if (!DoesBufferContainData(buf)) {
if (!buf->DoesBufferContainData()) {
uint64_t roundup_len1;
// Prefetch full data + readahead_size in the first buffer.
if (is_eligible_for_prefetching || reader->use_direct_io()) {
Expand Down
Loading

0 comments on commit 58fcbfc

Please sign in to comment.