From 58fcbfcedcf2083f3a1b0aad46be3e5d1d0a4a5d Mon Sep 17 00:00:00 2001 From: akankshamahajan Date: Thu, 14 Dec 2023 15:33:56 -0800 Subject: [PATCH] Addressed comments Summary: Test Plan: Reviewers: Subscribers: Tasks: Tags: --- file/file_prefetch_buffer.cc | 92 +++++++++++++++++----------------- file/file_prefetch_buffer.h | 96 +++++++++++++++++++----------------- 2 files changed, 96 insertions(+), 92 deletions(-) diff --git a/file/file_prefetch_buffer.cc b/file/file_prefetch_buffer.cc index d5a989d34ff6..fdb6537fd0e7 100644 --- a/file/file_prefetch_buffer.cc +++ b/file/file_prefetch_buffer.cc @@ -31,18 +31,18 @@ void FilePrefetchBuffer::CalculateOffsetAndLen( // If only a few bytes exist -- reuse them & read only what is really needed. // This is typically the case of incremental reading of data. // If no bytes exist in buffer -- full pread. - if (DoesBufferContainData(buf) && IsOffsetInBuffer(buf, offset)) { + if (buf->DoesBufferContainData() && buf->IsOffsetInBuffer(offset)) { // Only a few requested bytes are in the buffer. memmove those chunk of // bytes to the beginning, and memcpy them back into the new buffer if a // new buffer is created. chunk_offset_in_buffer = Rounddown(static_cast(offset - buf->offset_), alignment); - chunk_len = static_cast(buf->buffer_.CurrentSize()) - - chunk_offset_in_buffer; + chunk_len = + static_cast(buf->CurrentSize()) - chunk_offset_in_buffer; assert(chunk_offset_in_buffer % alignment == 0); assert(chunk_len % alignment == 0); assert(chunk_offset_in_buffer + chunk_len <= - buf->offset_ + buf->buffer_.CurrentSize()); + buf->offset_ + buf->CurrentSize()); if (chunk_len > 0) { copy_data_to_new_buffer = true; } else { @@ -142,7 +142,7 @@ Status FilePrefetchBuffer::Prefetch(const IOOptions& opts, TEST_SYNC_POINT("FilePrefetchBuffer::Prefetch:Start"); - if (offset + n <= buf->offset_ + buf->buffer_.CurrentSize()) { + if (offset + n <= buf->offset_ + buf->CurrentSize()) { // All requested bytes are already in the buffer. So no need to Read again. return Status::OK(); } @@ -166,7 +166,7 @@ Status FilePrefetchBuffer::Prefetch(const IOOptions& opts, return s; } -// Copy data from src to overlap_bufs_. +// Copy data from src to overlap_buf_. void FilePrefetchBuffer::CopyDataToBuffer(BufferInfo* src, uint64_t& offset, size_t& length) { if (length == 0) { @@ -175,18 +175,18 @@ void FilePrefetchBuffer::CopyDataToBuffer(BufferInfo* src, uint64_t& offset, uint64_t copy_offset = (offset - src->offset_); size_t copy_len = 0; - if (IsDataBlockInBuffer(src, offset, length)) { + if (src->IsDataBlockInBuffer(offset, length)) { // All the bytes are in src. copy_len = length; } else { - copy_len = src->buffer_.CurrentSize() - copy_offset; + copy_len = src->CurrentSize() - copy_offset; } - BufferInfo* dst = overlap_bufs_.front(); - memcpy(dst->buffer_.BufferStart() + dst->buffer_.CurrentSize(), + BufferInfo* dst = overlap_buf_; + memcpy(dst->buffer_.BufferStart() + dst->CurrentSize(), src->buffer_.BufferStart() + copy_offset, copy_len); - dst->buffer_.Size(dst->buffer_.CurrentSize() + copy_len); + dst->buffer_.Size(dst->CurrentSize() + copy_len); // Update offset and length. offset += copy_len; @@ -206,7 +206,7 @@ void FilePrefetchBuffer::AbortOutdatedIO(uint64_t offset) { std::vector handles; std::vector tmp_buf; for (auto& buf : bufs_) { - if (IsBufferOutdatedWithAsyncProgress(buf, offset)) { + if (buf->IsBufferOutdatedWithAsyncProgress(offset)) { handles.emplace_back(buf->io_handle_); tmp_buf.emplace_back(buf); } @@ -257,7 +257,7 @@ void FilePrefetchBuffer::ClearOutdatedData(uint64_t offset, size_t length) { while (!IsBufferQueueEmpty()) { BufferInfo* buf = GetFirstBuffer(); // Offset is greater than this buffer's end offset. - if (IsBufferOutdated(buf, offset)) { + if (buf->IsBufferOutdated(offset)) { FreeFrontBuffer(); } else { break; @@ -275,10 +275,11 @@ void FilePrefetchBuffer::ClearOutdatedData(uint64_t offset, size_t length) { if (NumBuffersAllocated() > 1) { BufferInfo* next_buf = bufs_[1]; if (!next_buf->async_read_in_progress_ && !buf->async_read_in_progress_ && - DoesBufferContainData(buf)) { - if (buf->offset_ + buf->buffer_.CurrentSize() != next_buf->offset_) { - if (DoesBufferContainData(next_buf) && IsOffsetInBuffer(buf, offset) && - (offset + length > buf->offset_ + buf->buffer_.CurrentSize())) { + buf->DoesBufferContainData()) { + if (buf->offset_ + buf->CurrentSize() != next_buf->offset_) { + if (next_buf->DoesBufferContainData() && + buf->IsOffsetInBuffer(offset) && + (offset + length > buf->offset_ + buf->CurrentSize())) { // Clear all buffers after first. for (size_t i = 1; i < bufs_.size(); ++i) { bufs_[i]->ClearBuffer(); @@ -412,7 +413,7 @@ Status FilePrefetchBuffer::HandleOverlappingData( // still in progress. This should only happen if a prefetch was initiated // by Seek, but the next access is at another offset. if (buf->async_read_in_progress_ && - IsOffsetInBufferWithAsyncProgress(buf, offset)) { + buf->IsOffsetInBufferWithAsyncProgress(offset)) { PollIfNeeded(offset, length); } @@ -420,32 +421,31 @@ Status FilePrefetchBuffer::HandleOverlappingData( // If data is overlapping over two buffers, copy the data from front and // call ReadAsync on freed buffer. - if (!buf->async_read_in_progress_ && DoesBufferContainData(buf) && - IsOffsetInBuffer(buf, offset) && + if (!buf->async_read_in_progress_ && buf->DoesBufferContainData() && + buf->IsOffsetInBuffer(offset) && (/*Data extends over two buffers and second buffer either has data or in process of population=*/ (offset + length > next_buf->offset_) && (next_buf->async_read_in_progress_ || - DoesBufferContainData(next_buf)))) { - // Allocate new buffer to overlap_bufs_. - BufferInfo* overlap_buf = overlap_bufs_.front(); - overlap_buf->ClearBuffer(); - overlap_buf->buffer_.Alignment(alignment); - overlap_buf->buffer_.AllocateNewBuffer(length); - overlap_buf->offset_ = offset; + next_buf->DoesBufferContainData()))) { + // Allocate new buffer to overlap_buf_. + overlap_buf_->ClearBuffer(); + overlap_buf_->buffer_.Alignment(alignment); + overlap_buf_->buffer_.AllocateNewBuffer(length); + overlap_buf_->offset_ = offset; copy_to_overlap_buffer = true; - size_t initial_buf_size = overlap_bufs_.front()->buffer_.CurrentSize(); + size_t initial_buf_size = overlap_buf_->CurrentSize(); CopyDataToBuffer(buf, tmp_offset, tmp_length); UpdateStats( /*found_in_buffer=*/false, - overlap_bufs_.front()->buffer_.CurrentSize() - initial_buf_size); + overlap_buf_->CurrentSize() - initial_buf_size); // Call async prefetching on freed buffer since data has been consumed // only if requested data lies within next buffer. size_t second_size = next_buf->async_read_in_progress_ ? next_buf->async_req_len_ - : next_buf->buffer_.CurrentSize(); + : next_buf->CurrentSize(); uint64_t start_offset = next_buf->initial_end_offset_; // Second buffer might be out of bound if first buffer already prefetched // that data. @@ -474,8 +474,7 @@ Status FilePrefetchBuffer::HandleOverlappingData( return s; } -// If async_io is enabled in case of sequential reads, PrefetchAsyncInternal is -// called. When data is outdated, we clear the first buffer and free it as the +// When data is outdated, we clear the first buffer and free it as the // data has been consumed because of sequential reads. // // Scenarios for prefetching asynchronously: @@ -526,8 +525,8 @@ Status FilePrefetchBuffer::PrefetchInternal(const IOOptions& opts, // - Return if whole data is in first and second buffer is in progress or // already full. // - If second buffer is empty, it will go for ReadAsync for second buffer. - if (!buf->async_read_in_progress_ && DoesBufferContainData(buf) && - IsDataBlockInBuffer(buf, offset, length)) { + if (!buf->async_read_in_progress_ && buf->DoesBufferContainData() && + buf->IsDataBlockInBuffer(offset, length)) { // Whole data is in buffer. if (!IsEligibleForFurtherPrefetching()) { UpdateStats(/*found_in_buffer=*/true, original_length); @@ -546,15 +545,15 @@ Status FilePrefetchBuffer::PrefetchInternal(const IOOptions& opts, // After polling, if all the requested bytes are in first buffer, it will only // go for async prefetching. - if (DoesBufferContainData(buf)) { + if (buf->DoesBufferContainData()) { if (copy_to_overlap_buffer) { // Data is overlapping i.e. some of the data has been copied to overlap // buffer and remaining will be updated below. - size_t initial_buf_size = overlap_bufs_.front()->buffer_.CurrentSize(); + size_t initial_buf_size = overlap_buf_->CurrentSize(); CopyDataToBuffer(buf, offset, length); UpdateStats( /*found_in_buffer=*/false, - overlap_bufs_.front()->buffer_.CurrentSize() - initial_buf_size); + overlap_buf_->CurrentSize() - initial_buf_size); // Length == 0: All the requested data has been copied to overlap buffer // and it has already gone for async prefetching. It can return without @@ -566,7 +565,7 @@ Status FilePrefetchBuffer::PrefetchInternal(const IOOptions& opts, return s; } } else { - if (IsDataBlockInBuffer(buf, offset, length)) { + if (buf->IsDataBlockInBuffer(offset, length)) { offset += length; length = 0; // Since async request was submitted directly by calling PrefetchAsync @@ -591,7 +590,7 @@ Status FilePrefetchBuffer::PrefetchInternal(const IOOptions& opts, // In case because of some IOError first buffer got empty, abort IO for all // buffers as well. Otherwise data might not align if more data needs to be // read in first buffer which might overlap with second buffer. - if (!DoesBufferContainData(buf)) { + if (!buf->DoesBufferContainData()) { std::vector handles; if (NumBuffersAllocated() > 1) { for (auto& _buf : bufs_) { @@ -718,7 +717,7 @@ Status FilePrefetchBuffer::PrefetchInternal(const IOOptions& opts, if (explicit_prefetch_submitted_ || (buf->async_read_in_progress_ || - offset + n > buf->offset_ + buf->buffer_.CurrentSize())) { + offset + n > buf->offset_ + buf->CurrentSize())) { // In case readahead_size is trimmed (=0), we still want to poll the data // submitted with explicit_prefetch_submitted_=true. if (readahead_size_ > 0 || explicit_prefetch_submitted_) { @@ -768,7 +767,7 @@ Status FilePrefetchBuffer::PrefetchInternal(const IOOptions& opts, buf = GetFirstBuffer(); if (copy_to_overlap_buffer) { - buf = overlap_bufs_.front(); + buf = overlap_buf_; } uint64_t offset_in_buffer = offset - buf->offset_; *result = Slice(buf->buffer_.BufferStart() + offset_in_buffer, n); @@ -792,8 +791,7 @@ void FilePrefetchBuffer::PrefetchAsyncCallback(const FSReadRequest& req, #endif if (req.status.ok()) { - if (req.offset + req.result.size() <= - buf->offset_ + buf->buffer_.CurrentSize()) { + if (req.offset + req.result.size() <= buf->offset_ + buf->CurrentSize()) { // All requested bytes are already in the buffer or no data is read // because of EOF. So no need to update. return; @@ -803,7 +801,7 @@ void FilePrefetchBuffer::PrefetchAsyncCallback(const FSReadRequest& req, // read). So ignore this read. return; } - size_t current_size = buf->buffer_.CurrentSize(); + size_t current_size = buf->CurrentSize(); buf->buffer_.Size(current_size + req.result.size()); } } @@ -842,7 +840,7 @@ Status FilePrefetchBuffer::PrefetchAsync(const IOOptions& opts, // and data in buffer. if (!IsBufferQueueEmpty()) { BufferInfo* buf = GetFirstBuffer(); - if (readaheadsize_cb_ != nullptr || !IsOffsetInBuffer(buf, offset)) { + if (readaheadsize_cb_ != nullptr || !buf->IsOffsetInBuffer(offset)) { FreeAllBuffers(); } } @@ -854,7 +852,7 @@ Status FilePrefetchBuffer::PrefetchAsync(const IOOptions& opts, // If first buffer has full data. if (!IsBufferQueueEmpty()) { BufferInfo* buf = GetFirstBuffer(); - if (DoesBufferContainData(buf) && IsDataBlockInBuffer(buf, offset, n)) { + if (buf->DoesBufferContainData() && buf->IsDataBlockInBuffer(offset, n)) { uint64_t offset_in_buffer = offset - buf->offset_; *result = Slice(buf->buffer_.BufferStart() + offset_in_buffer, n); data_found = true; @@ -897,7 +895,7 @@ Status FilePrefetchBuffer::PrefetchAsync(const IOOptions& opts, // eligible. // Calculate length and offsets for reading. - if (!DoesBufferContainData(buf)) { + if (!buf->DoesBufferContainData()) { uint64_t roundup_len1; // Prefetch full data + readahead_size in the first buffer. if (is_eligible_for_prefetching || reader->use_direct_io()) { diff --git a/file/file_prefetch_buffer.h b/file/file_prefetch_buffer.h index f130b80187ef..ddf3d7d01fe1 100644 --- a/file/file_prefetch_buffer.h +++ b/file/file_prefetch_buffer.h @@ -61,6 +61,7 @@ struct BufferInfo { void ClearBuffer() { buffer_.Clear(); initial_end_offset_ = 0; + async_req_len_ = 0; } AlignedBuffer buffer_; @@ -96,6 +97,34 @@ struct BufferInfo { // prefetch call, start_offset should be intialized to 100 i.e start_offset = // buf->initial_end_offset_. uint64_t initial_end_offset_ = 0; + + bool IsDataBlockInBuffer(uint64_t offset, size_t length) { + return (offset >= offset_ && + offset + length <= offset_ + buffer_.CurrentSize()); + } + + bool IsOffsetInBuffer(uint64_t offset) { + return (offset >= offset_ && offset < offset_ + buffer_.CurrentSize()); + } + + bool DoesBufferContainData() { return buffer_.CurrentSize() > 0; } + + bool IsBufferOutdated(uint64_t offset) { + return (!async_read_in_progress_ && DoesBufferContainData() && + offset >= offset_ + buffer_.CurrentSize()); + } + + bool IsBufferOutdatedWithAsyncProgress(uint64_t offset) { + return (async_read_in_progress_ && io_handle_ != nullptr && + offset >= offset_ + async_req_len_); + } + + bool IsOffsetInBufferWithAsyncProgress(uint64_t offset) { + return (async_read_in_progress_ && offset >= offset_ && + offset < offset_ + async_req_len_); + } + + size_t CurrentSize() { return buffer_.CurrentSize(); } }; enum class FilePrefetchBufferUsage { @@ -105,22 +134,25 @@ enum class FilePrefetchBufferUsage { }; // Implementation: -// FilePrefetchBuffer maintains a dequeu of free buffers (free_bufs_) of size -// num_buffers_ and bufs_ which contains the prefetched data. Whenever a buffer -// is consumed or is outdated (w.r.t. to requested offset), that buffer is -// cleared and returned to free_bufs_. +// FilePrefetchBuffer maintains a dequeu of free buffers (free_bufs_) with no +// data and bufs_ which contains the prefetched data. Whenever a buffer is +// consumed or is outdated (w.r.t. to requested offset), that buffer is cleared +// and returned to free_bufs_. // // If a buffer is available in free_bufs_, it's moved to bufs_ and is sent for -// prefetching. num_buffers_ defines how many buffers are maintained that -// contains prefetched data. +// prefetching. +// num_buffers_ defines how many buffers FilePrefetchBuffer can maintain at a +// time that contains prefetched data with num_buffers_ == bufs_.size() + +// free_bufs_.size(). +// // If num_buffers_ == 1, it's a sequential read flow. Read API will be called on // that one buffer whenever the data is requested and is not in the buffer. // If num_buffers_ > 1, then the data is prefetched asynchronosuly in the // buffers whenever the data is consumed from the buffers and that buffer is // freed. // If num_buffers > 1, then requested data can be overlapping between 2 buffers. -// To return the continuous buffer, overlap_bufs_ is used. The requested data is -// copied from 2 buffers to the overlap_bufs_ and overlap_bufs_ is returned to +// To return the continuous buffer, overlap_buf_ is used. The requested data is +// copied from 2 buffers to the overlap_buf_ and overlap_buf_ is returned to // the caller. // FilePrefetchBuffer is a smart buffer to store and read data from a file. @@ -170,9 +202,9 @@ class FilePrefetchBuffer { // If num_buffers_ > 1, data is asynchronously filled in the // queue. As result, data can be overlapping in two buffers. It copies the - // data to overlap_bufs_ in order to to return continuous buffer. + // data to overlap_buf_ in order to to return continuous buffer. if (num_buffers_ > 1) { - overlap_bufs_.emplace_back(new BufferInfo()); + overlap_buf_ = new BufferInfo(); } free_bufs_.resize(num_buffers_); @@ -210,7 +242,7 @@ class FilePrefetchBuffer { uint64_t bytes_discarded = 0; // Iterated over buffers. for (auto& buf : bufs_) { - if (DoesBufferContainData(buf)) { + if (buf->DoesBufferContainData()) { // If last read was from this block and some bytes are still unconsumed. if (prev_offset_ >= buf->offset_ && prev_offset_ + prev_len_ < @@ -238,9 +270,9 @@ class FilePrefetchBuffer { buf = nullptr; } - for (auto& buf : overlap_bufs_) { - delete buf; - buf = nullptr; + if (overlap_buf_ != nullptr) { + delete overlap_buf_; + overlap_buf_ = nullptr; } } @@ -377,7 +409,7 @@ class FilePrefetchBuffer { RandomAccessFileReader* reader, uint64_t read_len, uint64_t start_offset); - // Copy the data from src to overlap_bufs_. + // Copy the data from src to overlap_buf_. void CopyDataToBuffer(BufferInfo* src, uint64_t& offset, size_t& length); bool IsBlockSequential(const size_t& offset) { @@ -474,32 +506,6 @@ class FilePrefetchBuffer { uint64_t end_offset1, size_t alignment, size_t readahead_size); - // *** BEGIN Helper APIs related to data in Buffers *** - bool IsDataBlockInBuffer(BufferInfo* buf, uint64_t offset, size_t length) { - return (offset >= buf->offset_ && - offset + length <= buf->offset_ + buf->buffer_.CurrentSize()); - } - bool IsOffsetInBuffer(BufferInfo* buf, uint64_t offset) { - return (offset >= buf->offset_ && - offset < buf->offset_ + buf->buffer_.CurrentSize()); - } - bool DoesBufferContainData(BufferInfo* buf) { - return buf->buffer_.CurrentSize() > 0; - } - bool IsBufferOutdated(BufferInfo* buf, uint64_t offset) { - return (!buf->async_read_in_progress_ && DoesBufferContainData(buf) && - offset >= buf->offset_ + buf->buffer_.CurrentSize()); - } - bool IsBufferOutdatedWithAsyncProgress(BufferInfo* buf, uint64_t offset) { - return (buf->async_read_in_progress_ && buf->io_handle_ != nullptr && - offset >= buf->offset_ + buf->async_req_len_); - } - bool IsOffsetInBufferWithAsyncProgress(BufferInfo* buf, uint64_t offset) { - return (buf->async_read_in_progress_ && offset >= buf->offset_ && - offset < buf->offset_ + buf->async_req_len_); - } - // *** END Helper APIs related to data in Buffers *** - // *** BEGIN APIs related to allocating and freeing buffers *** bool IsBufferQueueEmpty() { return bufs_.empty(); } @@ -524,14 +530,14 @@ class FilePrefetchBuffer { void FreeFrontBuffer() { BufferInfo* buf = bufs_.front(); - buf->buffer_.Clear(); + buf->ClearBuffer(); bufs_.pop_front(); free_bufs_.emplace_back(buf); } void FreeLastBuffer() { BufferInfo* buf = bufs_.back(); - buf->buffer_.Clear(); + buf->ClearBuffer(); bufs_.pop_back(); free_bufs_.emplace_back(buf); } @@ -553,7 +559,7 @@ class FilePrefetchBuffer { while (!bufs_.empty()) { BufferInfo* buf = bufs_.front(); bufs_.pop_front(); - if (buf->async_read_in_progress_ || DoesBufferContainData(buf)) { + if (buf->async_read_in_progress_ || buf->DoesBufferContainData()) { tmp_buf.emplace_back(buf); } else { free_bufs_.emplace_back(buf); @@ -566,7 +572,7 @@ class FilePrefetchBuffer { std::deque bufs_; std::deque free_bufs_; - std::deque overlap_bufs_; + BufferInfo* overlap_buf_ = nullptr; size_t readahead_size_; size_t initial_auto_readahead_size_;