diff --git a/async_refactor b/async_refactor new file mode 100644 index 000000000000..e60ef6c001e5 --- /dev/null +++ b/async_refactor @@ -0,0 +1,469 @@ +commit 53b7edf5a5bdc92b77d278edf32be320ac10c3b9 +Author: akankshamahajan +Date: Mon Jul 17 13:35:51 2023 -0700 + + Async refactor + + Summary: + + Test Plan: + + Reviewers: + + Subscribers: + + Tasks: + + Tags: + +diff --git a/file/file_prefetch_buffer.cc b/file/file_prefetch_buffer.cc +index fc07a59da..e8f0674f6 100644 +--- a/file/file_prefetch_buffer.cc ++++ b/file/file_prefetch_buffer.cc +@@ -103,11 +103,10 @@ Status FilePrefetchBuffer::Read(BufferInfo* buf, const IOOptions& opts, + return s; + } + +-/* +-Status FilePrefetchBuffer::ReadAsync(const IOOptions& opts, ++Status FilePrefetchBuffer::ReadAsync(BufferInfo* buf, const IOOptions& opts, + RandomAccessFileReader* reader, + uint64_t read_len, +- uint64_t rounddown_start, uint32_t index) { ++ uint64_t rounddown_start) { + TEST_SYNC_POINT("FilePrefetchBuffer::ReadAsync"); + // callback for async read request. + auto fp = std::bind(&FilePrefetchBuffer::PrefetchAsyncCallback, this, +@@ -117,20 +116,19 @@ Status FilePrefetchBuffer::ReadAsync(const IOOptions& opts, + req.len = read_len; + req.offset = rounddown_start; + req.result = result; +- req.scratch = bufs_[index].buffer_.BufferStart(); +- bufs_[index].async_req_len_ = req.len; ++ req.scratch = buf->buffer_.BufferStart(); ++ buf->async_req_len_ = req.len; ++ ++ Status s = reader->ReadAsync(req, opts, fp, &(buf->pos_), &(buf->io_handle_), ++ &(buf->del_fn_), ++ /*aligned_buf=*/nullptr); + +- Status s = +- reader->ReadAsync(req, opts, fp, &(bufs_[index].pos_), +- &(bufs_[index].io_handle_), &(bufs_[index].del_fn_), +- //aligned_buf/nullptr); + req.status.PermitUncheckedError(); + if (s.ok()) { +- bufs_[index].async_read_in_progress_ = true; ++ buf->async_read_in_progress_ = true; + } + return s; + } +-*/ + + Status FilePrefetchBuffer::Prefetch(const IOOptions& opts, + RandomAccessFileReader* reader, +@@ -203,10 +201,12 @@ void FilePrefetchBuffer::CopyDataToBuffer(uint32_t src, uint64_t& offset, + bufs_[src].buffer_.Clear(); + } + } ++*/ + + // Clear the buffers if it contains outdated data. Outdated data can be + // because previous sequential reads were read from the cache instead of these + // buffer. In that case outdated IOs should be aborted. ++/* + void FilePrefetchBuffer::AbortIOIfNeeded(uint64_t offset) { + uint32_t second = curr_ ^ 1; + std::vector handles; +@@ -228,6 +228,7 @@ void FilePrefetchBuffer::AbortIOIfNeeded(uint64_t offset) { + for (auto& pos : buf_pos) { + // Release io_handle. + DestroyAndClearIOHandle(pos); ++ bufs_[second].async_read_in_progress_ = false; + } + + if (bufs_[second].io_handle_ == nullptr) { +@@ -238,15 +239,16 @@ void FilePrefetchBuffer::AbortIOIfNeeded(uint64_t offset) { + bufs_[curr_].async_read_in_progress_ = false; + } + } ++*/ + + void FilePrefetchBuffer::AbortAllIOs() { +- uint32_t second = curr_ ^ 1; + std::vector handles; +- for (uint32_t i = 0; i < 2; i++) { +- if (bufs_[i].async_read_in_progress_ && bufs_[i].io_handle_ != nullptr) { +- handles.emplace_back(bufs_[i].io_handle_); ++ for (auto& buf : bufs_) { ++ if (buf->async_read_in_progress_ && buf->io_handle_ != nullptr) { ++ handles.emplace_back(buf->io_handle_); + } + } ++ + if (!handles.empty()) { + StopWatch sw(clock_, stats_, ASYNC_PREFETCH_ABORT_MICROS); + Status s = fs_->AbortIO(handles); +@@ -254,16 +256,11 @@ void FilePrefetchBuffer::AbortAllIOs() { + } + + // Release io_handles. 
+- if (bufs_[curr_].io_handle_ != nullptr && bufs_[curr_].del_fn_ != nullptr) { +- DestroyAndClearIOHandle(curr_); +- } else { +- bufs_[curr_].async_read_in_progress_ = false; +- } +- +- if (bufs_[second].io_handle_ != nullptr && bufs_[second].del_fn_ != nullptr) { +- DestroyAndClearIOHandle(second); +- } else { +- bufs_[second].async_read_in_progress_ = false; ++ for (auto& buf : bufs_) { ++ if (buf->io_handle_ != nullptr && buf->del_fn_ != nullptr) { ++ DestroyAndClearIOHandle(buf); ++ } ++ buf->async_read_in_progress_ = false; + } + } + +@@ -271,17 +268,22 @@ void FilePrefetchBuffer::AbortAllIOs() { + // because previous sequential reads were read from the cache instead of these + // buffer. + void FilePrefetchBuffer::UpdateBuffersIfNeeded(uint64_t offset) { +- uint32_t second = curr_ ^ 1; +- if (IsBufferOutdated(offset, curr_)) { +- bufs_[curr_].buffer_.Clear(); +- } +- if (IsBufferOutdated(offset, second)) { +- bufs_[second].buffer_.Clear(); ++ while (bufs_.size()) { ++ BufferInfo* buf = bufs_.front(); ++ // Offset is greater than this buffer's end offset. ++ if (IsBufferOutdated(buf, offset)) { ++ buf->buffer_.Clear(); ++ bufs_.pop_front(); ++ free_bufs_.push_back(buf); ++ } else { ++ break; ++ } + } + + { + // In case buffers do not align, reset second buffer. This can happen in + // case readahead_size is set. ++ /* + if (!bufs_[second].async_read_in_progress_ && + !bufs_[curr_].async_read_in_progress_) { + if (DoesBufferContainData(curr_)) { +@@ -295,10 +297,12 @@ void FilePrefetchBuffer::UpdateBuffersIfNeeded(uint64_t offset) { + } + } + } ++ */ + } + + // If data starts from second buffer, make it curr_. Second buffer can be + // either partial filled, full or async read is in progress. ++ /* + if (bufs_[second].async_read_in_progress_) { + if (IsOffsetInBufferWithAsyncProgress(offset, second)) { + curr_ = curr_ ^ 1; +@@ -310,8 +314,10 @@ void FilePrefetchBuffer::UpdateBuffersIfNeeded(uint64_t offset) { + curr_ = curr_ ^ 1; + } + } ++ */ + } + ++/* + void FilePrefetchBuffer::PollAndUpdateBuffersIfNeeded(uint64_t offset) { + if (bufs_[curr_].async_read_in_progress_ && fs_ != nullptr) { + if (bufs_[curr_].io_handle_ != nullptr) { +@@ -327,6 +333,7 @@ void FilePrefetchBuffer::PollAndUpdateBuffersIfNeeded(uint64_t offset) { + // Reset and Release io_handle after the Poll API as request has been + // completed. + DestroyAndClearIOHandle(curr_); ++ bufs_[second].async_read_in_progress_ = false; + } + UpdateBuffersIfNeeded(offset); + } +@@ -390,6 +397,7 @@ Status FilePrefetchBuffer::HandleOverlappingData( + s = ReadAsync(opts, reader, read_len, rounddown_start, curr_); + if (!s.ok()) { + DestroyAndClearIOHandle(curr_); ++ bufs_[second].async_read_in_progress_ = false; + bufs_[curr_].buffer_.Clear(); + return s; + } +@@ -798,6 +806,7 @@ bool FilePrefetchBuffer::TryReadFromCacheAsyncUntracked( + } + return true; + } ++*/ + + void FilePrefetchBuffer::PrefetchAsyncCallback(const FSReadRequest& req, + void* cb_arg) { +@@ -813,18 +822,18 @@ void FilePrefetchBuffer::PrefetchAsyncCallback(const FSReadRequest& req, + + if (req.status.ok()) { + if (req.offset + req.result.size() <= +- bufs_[index].offset_ + bufs_[index].buffer_.CurrentSize()) { ++ bufs_[index]->offset_ + bufs_[index]->buffer_.CurrentSize()) { + // All requested bytes are already in the buffer or no data is read + // because of EOF. So no need to update. 
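// Aside: the heart of this refactor replaces the fixed buffer pair (indexed
// by curr_ and curr_ ^ 1) with a deque of BufferInfo pointers backed by a
// free list. A minimal, self-contained sketch of that pool pattern follows;
// the names (Buf, BufferPool) are illustrative, not the actual RocksDB types.
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <deque>

struct Buf {
  uint64_t offset = 0;
  uint64_t size = 0;
};

class BufferPool {
 public:
  explicit BufferPool(size_t n) {
    for (size_t i = 0; i < n; i++) free_.push_back(new Buf());
  }
  ~BufferPool() {
    for (Buf* b : active_) delete b;
    for (Buf* b : free_) delete b;
  }
  // Take a buffer from the free list and queue it as the newest active one.
  Buf* Allocate() {
    assert(!free_.empty());
    Buf* b = free_.front();
    free_.pop_front();
    active_.push_back(b);
    return b;
  }
  // Retire the oldest active buffer back to the free list for reuse.
  void FreeFront() {
    assert(!active_.empty());
    Buf* b = active_.front();
    active_.pop_front();
    *b = Buf{};  // reset state before reuse
    free_.push_back(b);
  }

 private:
  std::deque<Buf*> active_;  // buffers holding or awaiting data, oldest first
  std::deque<Buf*> free_;    // recycled buffers available for allocation
};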
+ return; + } +- if (req.offset < bufs_[index].offset_) { + // Next block to be read has changed (Recent read was not a sequential + // read). So ignore this read. + return; + } +- size_t current_size = bufs_[index].buffer_.CurrentSize(); +- bufs_[index].buffer_.Size(current_size + req.result.size()); ++ size_t current_size = bufs_[index]->buffer_.CurrentSize(); ++ bufs_[index]->buffer_.Size(current_size + req.result.size()); + } + } + +@@ -848,44 +857,59 @@ Status FilePrefetchBuffer::PrefetchAsync(const IOOptions& opts, + is_eligible_for_prefetching = true; + } + +- // 1. Cancel any pending async read to make code simpler as buffers can be out +- // of sync. +- AbortAllIOs(); ++ // Perform cleaning of buffers. ++ { ++ // 1. Cancel any pending async read to make code simpler as buffers can be ++ // out of sync. ++ AbortAllIOs(); + +- // 2. Clear outdated data. +- UpdateBuffersIfNeeded(offset); +- uint32_t second = curr_ ^ 1; +- // Since PrefetchAsync can be called on non sequential reads. So offset can +- // be less than curr_ buffers' offset. In that case also it clears both +- // buffers. +- if (DoesBufferContainData(curr_) && !IsOffsetInBuffer(offset, curr_)) { +- bufs_[curr_].buffer_.Clear(); +- bufs_[second].buffer_.Clear(); ++ // 2. Clear buffers that, because of aborted async reads, may be empty ++ // and out of sync. ++ ClearOutOfSyncBuffers(); ++ ++ // 3. Clear outdated data. It clears the data whose end offset is less than ++ // offset to be read. ++ UpdateBuffersIfNeeded(offset); ++ ++ // 4. Since PrefetchAsync can be called on non-sequential reads, offset can ++ // be less than the front buffer's offset. In that case also clear all ++ // buffers. ++ if (!bufs_.empty()) { ++ BufferInfo* buf = bufs_.front(); ++ if (!IsOffsetInBuffer(buf, offset)) { ++ // Clear all bufs. ++ FreeAllBuffers(); ++ } ++ } + } + + UpdateReadPattern(offset, n, /*decrease_readaheadsize=*/false); + + bool data_found = false; + +- // 3. If curr_ has full data. +- if (DoesBufferContainData(curr_) && IsDataBlockInBuffer(offset, n, curr_)) { +- uint64_t offset_in_buffer = offset - bufs_[curr_].offset_; +- *result = Slice(bufs_[curr_].buffer_.BufferStart() + offset_in_buffer, n); +- data_found = true; +- // Update num_file_reads_ as TryReadFromCacheAsync won't be called for +- // poll and update num_file_reads_ if data is found. +- num_file_reads_++; +- +- // 3.1 If second also has some data or is not eligible for prefetching, +- // return. +- if (!is_eligible_for_prefetching || DoesBufferContainData(second)) { +- return Status::OK(); ++ // Either the data is in the front buffer or there is no data at all. ++ ++ if (!bufs_.empty()) { ++ BufferInfo* buf = bufs_.front(); ++ // If front has full data. ++ if (DoesBufferContainData(buf) && IsDataBlockInBuffer(buf, offset, n)) { ++ uint64_t offset_in_buffer = offset - buf->offset_; ++ *result = Slice(buf->buffer_.BufferStart() + offset_in_buffer, n); ++ data_found = true; ++ // Update num_file_reads_ as TryReadFromCacheAsync won't be called for ++ // poll and update num_file_reads_ if data is found. ++ num_file_reads_++; ++ ++ // If next buffer also has some data or is not eligible for prefetching, ++ // return. ++ if (!is_eligible_for_prefetching || bufs_.size() > 1) { ++ return Status::OK(); ++ } ++ } else { ++ // Partial data in front. Clear it to return a contiguous buffer. ++ FreeAllBuffers(); + } +- } else { +- // Partial data in curr_.
+- bufs_[curr_].buffer_.Clear(); + } +- bufs_[second].buffer_.Clear(); + + Status s; + size_t alignment = reader->file()->GetRequiredBufferAlignment(); +@@ -900,11 +924,11 @@ Status FilePrefetchBuffer::PrefetchAsync(const IOOptions& opts, + size_t read_len1 = 0; + size_t read_len2 = 0; + +- // - If curr_ is empty. +- // - Call async read for full data + prefetch_size on curr_. +- // - Call async read for prefetch_size on second if eligible. +- // - If curr_ is filled. +- // - prefetch_size on second. ++ // - If front is empty. ++ // - Call async read for full data + prefetch_size on front. ++ // - Call async read for prefetch_size on second buffer if eligible. ++ // - If front is filled. ++ // - prefetch_size on second buffer. + // Calculate length and offsets for reading. + if (!DoesBufferContainData(curr_)) { + // Prefetch full data + prefetch_size in curr_. +@@ -947,6 +971,7 @@ Status FilePrefetchBuffer::PrefetchAsync(const IOOptions& opts, + s = ReadAsync(opts, reader, read_len1, rounddown_start1, curr_); + if (!s.ok()) { + DestroyAndClearIOHandle(curr_); ++ bufs_[second].async_read_in_progress_ = false; + bufs_[curr_].buffer_.Clear(); + return s; + } +@@ -958,6 +983,7 @@ Status FilePrefetchBuffer::PrefetchAsync(const IOOptions& opts, + s = ReadAsync(opts, reader, read_len2, rounddown_start2, second); + if (!s.ok()) { + DestroyAndClearIOHandle(second); ++ bufs_[second].async_read_in_progress_ = false; + bufs_[second].buffer_.Clear(); + return s; + } +diff --git a/file/file_prefetch_buffer.h b/file/file_prefetch_buffer.h +index a05d3e62c..4c588f177 100644 +--- a/file/file_prefetch_buffer.h ++++ b/file/file_prefetch_buffer.h +@@ -161,6 +161,7 @@ class FilePrefetchBuffer { + } + // Release io_handle. + // DestroyAndClearIOHandle(buf); ++ // buf->async_read_in_progress_ = false; + } + + RecordInHistogram(stats_, PREFETCHED_BYTES_DISCARDED, bytes_discarded); +@@ -309,8 +310,9 @@ class FilePrefetchBuffer { + Env::IOPriority rate_limiter_priority, uint64_t read_len, + uint64_t chunk_len, uint64_t rounddown_start); + +- Status ReadAsync(const IOOptions& opts, RandomAccessFileReader* reader, +- uint64_t read_len, uint64_t rounddown_start, uint32_t index); ++ Status ReadAsync(BufferInfo* buf, const IOOptions& opts, ++ RandomAccessFileReader* reader, uint64_t read_len, ++ uint64_t rounddown_start); + + // Copy the data from src to third buffer. 
+ void CopyDataToBuffer(uint32_t src, uint64_t& offset, size_t& length); +@@ -390,16 +392,17 @@ class FilePrefetchBuffer { + bufs_[second].buffer_.Clear(); + return true; + } ++ */ + +- void DestroyAndClearIOHandle(uint32_t index) { +- if (bufs_[index].io_handle_ != nullptr && bufs_[index].del_fn_ != nullptr) { +- bufs_[index].del_fn_(bufs_[index].io_handle_); +- bufs_[index].io_handle_ = nullptr; +- bufs_[index].del_fn_ = nullptr; ++ void DestroyAndClearIOHandle(BufferInfo* buf) { ++ if (buf->io_handle_ != nullptr && buf->del_fn_ != nullptr) { ++ buf->del_fn_(buf->io_handle_); ++ buf->io_handle_ = nullptr; ++ buf->del_fn_ = nullptr; + } +- bufs_[index].async_read_in_progress_ = false; + } + ++ /* + Status HandleOverlappingData(const IOOptions& opts, + RandomAccessFileReader* reader, uint64_t offset, + size_t length, size_t readahead_size, +@@ -414,14 +417,68 @@ class FilePrefetchBuffer { + bufs_.push_back(buf); + } + +- void FreeBuffer(BufferInfo* buf) { free_bufs_.push_back(buf); } +- + void AllocateBufferIfEmpty() { + if (bufs_.empty()) { + AllocateBuffer(); + } + } + ++ void FreeBuffer(BufferInfo* buf) { free_bufs_.push_back(buf); } ++ ++ void FreeAllBuffers() { ++ while (!bufs_.empty()) { ++ BufferInfo* buf = bufs_.front(); ++ buf->buffer_.Clear(); ++ bufs_.pop_front(); ++ free_bufs_.push_back(buf); ++ } ++ } ++ ++ // Buffers can be out of sync because the async reads in some of them may ++ // have been aborted. ++ void ClearOutOfSyncBuffers() { ++ if (bufs_.empty()) { ++ return; ++ } ++ ++ // First free empty buffers at the front. ++ while (!bufs_.empty()) { ++ BufferInfo* buf = bufs_.front(); ++ if (!DoesBufferContainData(buf)) { ++ bufs_.pop_front(); ++ FreeBuffer(buf); ++ } else { ++ break; ++ } ++ } ++ ++ if (bufs_.empty()) { ++ return; ++ } ++ ++ auto it = bufs_.begin(); ++ BufferInfo* prev_buf = *it; ++ it++; ++ ++ // Find first out of sync buffer or an empty buffer. ++ while (it != bufs_.end() && DoesBufferContainData(*it)) { ++ if (prev_buf->offset_ + prev_buf->buffer_.CurrentSize() != ++ (*it)->offset_) { ++ break; ++ } ++ prev_buf = *it; ++ it++; ++ } ++ ++ if (it == bufs_.end()) { ++ return; ++ } ++ ++ // Free all the out-of-sync and empty buffers from that point to the back. ++ size_t keep = static_cast<size_t>(std::distance(bufs_.begin(), it)); ++ while (bufs_.size() > keep) { ++ BufferInfo* last = bufs_.back(); ++ last->buffer_.Clear(); ++ bufs_.pop_back(); ++ FreeBuffer(last); ++ } ++ } ++ + std::deque<BufferInfo*> bufs_; + std::deque<BufferInfo*> free_bufs_; + std::deque<BufferInfo*> overlap_bufs_; diff --git a/env/fs_posix.cc b/env/fs_posix.cc index dd2f749350da..0f2088fdddc9 100644 --- a/env/fs_posix.cc +++ b/env/fs_posix.cc @@ -997,9 +997,7 @@ class PosixFileSystem : public FileSystem { } #endif // ROCKSDB_IOURING_PRESENT - // EXPERIMENTAL - // - // TODO akankshamahajan: + // TODO: // 1. Update Poll API to take into account min_completions // and returns if number of handles in io_handles (any order) completed is // equal to at least min_completions. diff --git a/file/file_prefetch_buffer.cc b/file/file_prefetch_buffer.cc index ef28bccff057..b696e3685a39 100644 --- a/file/file_prefetch_buffer.cc +++ b/file/file_prefetch_buffer.cc @@ -105,11 +105,9 @@ Status FilePrefetchBuffer::Read(BufferInfo* buf, const IOOptions& opts, return s; } -/* -Status FilePrefetchBuffer::ReadAsync(const IOOptions& opts, +Status FilePrefetchBuffer::ReadAsync(BufferInfo* buf, const IOOptions& opts, RandomAccessFileReader* reader, - uint64_t read_len, uint64_t start_offset, - uint32_t index) { + uint64_t read_len, uint64_t start_offset) { TEST_SYNC_POINT("FilePrefetchBuffer::ReadAsync"); // callback for async read request.
auto fp = std::bind(&FilePrefetchBuffer::PrefetchAsyncCallback, this, @@ -119,21 +117,18 @@ Status FilePrefetchBuffer::ReadAsync(const IOOptions& opts, req.len = read_len; req.offset = start_offset; req.result = result; - req.scratch = bufs_[index].buffer_.BufferStart(); - bufs_[index].async_req_len_ = req.len; + req.scratch = buf->buffer_.BufferStart(); + buf->async_req_len_ = req.len; - Status s = - reader->ReadAsync(req, opts, fp, &(bufs_[index].pos_), - &(bufs_[index].io_handle_), &(bufs_[index].del_fn_), - aligned_buf=nullptr); + Status s = reader->ReadAsync(req, opts, fp, buf->pos_, &(buf->io_handle_), + &(buf->del_fn_), /*aligned_buf =*/nullptr); req.status.PermitUncheckedError(); if (s.ok()) { RecordTick(stats_, PREFETCH_BYTES, read_len); - bufs_[index].async_read_in_progress_ = true; + buf->async_read_in_progress_ = true; } return s; } -*/ Status FilePrefetchBuffer::Prefetch(const IOOptions& opts, RandomAccessFileReader* reader, @@ -174,25 +169,26 @@ Status FilePrefetchBuffer::Prefetch(const IOOptions& opts, } // Copy data from src to third buffer. -/* -void FilePrefetchBuffer::CopyDataToBuffer(uint32_t src, uint64_t& offset, +void FilePrefetchBuffer::CopyDataToBuffer(BufferInfo* src, uint64_t& offset, size_t& length) { if (length == 0) { return; } - uint64_t copy_offset = (offset - bufs_[src].offset_); + + uint64_t copy_offset = (offset - src->offset_); size_t copy_len = 0; - if (IsDataBlockInBuffer(offset, length, src)) { + if (IsDataBlockInBuffer(src, offset, length)) { // All the bytes are in src. copy_len = length; } else { - copy_len = bufs_[src].buffer_.CurrentSize() - copy_offset; + copy_len = src->buffer_.CurrentSize() - copy_offset; } - memcpy(bufs_[2].buffer_.BufferStart() + bufs_[2].buffer_.CurrentSize(), - bufs_[src].buffer_.BufferStart() + copy_offset, copy_len); + BufferInfo* dst = overlap_bufs_.front(); + memcpy(dst->buffer_.BufferStart() + dst->buffer_.CurrentSize(), + src->buffer_.BufferStart() + copy_offset, copy_len); - bufs_[2].buffer_.Size(bufs_[2].buffer_.CurrentSize() + copy_len); + dst->buffer_.Size(dst->buffer_.CurrentSize() + copy_len); // Update offset and length. offset += copy_len; @@ -201,51 +197,45 @@ void FilePrefetchBuffer::CopyDataToBuffer(uint32_t src, uint64_t& offset, // length > 0 indicates it has consumed all data from the src buffer and it // still needs to read more other buffer. if (length > 0) { - bufs_[src].ClearBuffer(); + src->ClearBuffer(); + FreeFrontBuffer(); } } -// Clear the buffers if it contains outdated data. Outdated data can be -// because previous sequential reads were read from the cache instead of these -// buffer. In that case outdated IOs should be aborted. -void FilePrefetchBuffer::AbortIOIfNeeded(uint64_t offset) { - uint32_t second = curr_ ^ 1; +// Clear the buffers if it contains outdated data. Outdated data can be because +// previous sequential reads were read from the cache instead of these buffer. +// In that case outdated IOs should be aborted. 
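// Aside: a sketch of the "outdated with async progress" test that drives the
// abort decision in the function below. Assuming (as the surrounding code
// suggests) that a pending read fills [offset_, offset_ + async_req_len_), a
// request at or past that end can never be served by the buffer, so its IO is
// worth aborting. Standalone, illustrative version; not the RocksDB helper:
#include <cstdint>

struct PendingBuf {
  uint64_t offset = 0;         // file offset this buffer covers
  uint64_t async_req_len = 0;  // length of the in-flight async request
  bool async_read_in_progress = false;
};

inline bool IsOutdatedWithAsyncProgress(const PendingBuf& b, uint64_t offset) {
  // The read in flight ends before the requested offset: abort and recycle.
  return b.async_read_in_progress && b.async_req_len > 0 &&
         offset >= b.offset + b.async_req_len;
}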
+void FilePrefetchBuffer::AbortOutdatedIO(uint64_t offset) { std::vector<void*> handles; - autovector<uint32_t> buf_pos; - if (IsBufferOutdatedWithAsyncProgress(offset, curr_)) { - handles.emplace_back(bufs_[curr_].io_handle_); - buf_pos.emplace_back(curr_); - } - if (IsBufferOutdatedWithAsyncProgress(offset, second)) { - handles.emplace_back(bufs_[second].io_handle_); - buf_pos.emplace_back(second); + std::vector<BufferInfo*> tmp_buf; + for (auto& buf : bufs_) { + if (IsBufferOutdatedWithAsyncProgress(buf, offset)) { + handles.emplace_back(buf->io_handle_); + tmp_buf.emplace_back(buf); + } } + if (!handles.empty()) { StopWatch sw(clock_, stats_, ASYNC_PREFETCH_ABORT_MICROS); Status s = fs_->AbortIO(handles); assert(s.ok()); } - for (auto& pos : buf_pos) { - // Release io_handle. - DestroyAndClearIOHandle(pos); - } - - if (bufs_[second].io_handle_ == nullptr) { - bufs_[second].async_read_in_progress_ = false; - } - - if (bufs_[curr_].io_handle_ == nullptr) { - bufs_[curr_].async_read_in_progress_ = false; + for (auto& buf : tmp_buf) { + if (buf->async_read_in_progress_) { + DestroyAndClearIOHandle(buf); + buf->async_read_in_progress_ = false; + } + buf->ClearBuffer(); } + FreeEmptyBuffers(); } void FilePrefetchBuffer::AbortAllIOs() { - uint32_t second = curr_ ^ 1; std::vector<void*> handles; - for (uint32_t i = 0; i < 2; i++) { - if (bufs_[i].async_read_in_progress_ && bufs_[i].io_handle_ != nullptr) { - handles.emplace_back(bufs_[i].io_handle_); + for (auto& buf : bufs_) { + if (buf->async_read_in_progress_ && buf->io_handle_ != nullptr) { + handles.emplace_back(buf->io_handle_); } } if (!handles.empty()) { @@ -254,33 +244,55 @@ void FilePrefetchBuffer::AbortAllIOs() { assert(s.ok()); } - // Release io_handles. - if (bufs_[curr_].io_handle_ != nullptr && bufs_[curr_].del_fn_ != nullptr) { - DestroyAndClearIOHandle(curr_); - } else { - bufs_[curr_].async_read_in_progress_ = false; - } - - if (bufs_[second].io_handle_ != nullptr && bufs_[second].del_fn_ != nullptr) { - DestroyAndClearIOHandle(second); - } else { - bufs_[second].async_read_in_progress_ = false; + for (auto& buf : bufs_) { + if (buf->io_handle_ != nullptr && buf->del_fn_ != nullptr) { + DestroyAndClearIOHandle(buf); + } + buf->async_read_in_progress_ = false; } + FreeEmptyBuffers(); } // Clear the buffers if it contains outdated data. Outdated data can be // because previous sequential reads were read from the cache instead of these // buffer. void FilePrefetchBuffer::UpdateBuffersIfNeeded(uint64_t offset, size_t length) { - uint32_t second = curr_ ^ 1; + while (!bufs_.empty()) { + BufferInfo* buf = bufs_.front(); + // Offset is greater than this buffer's end offset. + if (IsBufferOutdated(buf, offset)) { + FreeFrontBuffer(); + } else { + break; + } + } - if (IsBufferOutdated(offset, curr_)) { - bufs_[curr_].ClearBuffer(); + if (bufs_.empty()) { + return; } - if (IsBufferOutdated(offset, second)) { - bufs_[second].ClearBuffer(); + + BufferInfo* buf = bufs_.front(); + + // In case buffers do not align, reset next buffer if requested data needs + // to be read in that buffer. + if (bufs_.size() > 1) { + BufferInfo* next_buf = bufs_[1]; + if (!next_buf->async_read_in_progress_ && !buf->async_read_in_progress_ && + DoesBufferContainData(buf)) { + if (buf->offset_ + buf->buffer_.CurrentSize() != next_buf->offset_) { + if (DoesBufferContainData(next_buf) && IsOffsetInBuffer(buf, offset) && + (offset + length > buf->offset_ + buf->buffer_.CurrentSize())) { + // Clear all buffers after first.
+ for (size_t i = 1; i < bufs_.size(); ++i) { + bufs_[i]->ClearBuffer(); + } + FreeEmptyBuffers(); + } + } + } } + /* { // In case buffers do not align, reset second buffer if requested data needs // to be read in second buffer. @@ -305,6 +317,7 @@ void FilePrefetchBuffer::UpdateBuffersIfNeeded(uint64_t offset, size_t length) { } } + // If data starts from second buffer, make it curr_. Second buffer can be // either partial filled, full or async read is in progress. if (bufs_[second].async_read_in_progress_) { @@ -318,28 +331,31 @@ void FilePrefetchBuffer::UpdateBuffersIfNeeded(uint64_t offset, size_t length) { curr_ = curr_ ^ 1; } } + */ } -void FilePrefetchBuffer::PollAndUpdateBuffersIfNeeded(uint64_t offset, - size_t length) { - if (bufs_[curr_].async_read_in_progress_ && fs_ != nullptr) { - if (bufs_[curr_].io_handle_ != nullptr) { +void FilePrefetchBuffer::PollAndUpdateBuffersIfNeeded(uint64_t /*offset*/, + size_t /*length*/) { + BufferInfo* buf = bufs_.front(); + + if (buf->async_read_in_progress_ && fs_ != nullptr) { + if (buf->io_handle_ != nullptr) { // Wait for prefetch data to complete. // No mutex is needed as async_read_in_progress behaves as mutex and is // updated by main thread only. std::vector handles; - handles.emplace_back(bufs_[curr_].io_handle_); + handles.emplace_back(buf->io_handle_); StopWatch sw(clock_, stats_, POLL_WAIT_MICROS); fs_->Poll(handles, 1).PermitUncheckedError(); } // Reset and Release io_handle after the Poll API as request has been // completed. - DestroyAndClearIOHandle(curr_); + DestroyAndClearIOHandle(buf); } - UpdateBuffersIfNeeded(offset, length); + // TODO Akanksha - Do we need UpdateBuffers? + // UpdateBuffersIfNeeded(offset, length); } -*/ // ReadAheadSizeTuning API calls readaheadsize_cb_ // (BlockBasedTableIterator::BlockCacheLookupForReadAheadSize) to lookup in the @@ -365,8 +381,7 @@ void FilePrefetchBuffer::ReadAheadSizeTuning( uint64_t initial_end_offset = updated_end_offset; // Callback to tune the start and end offsets. - if (readaheadsize_cb_ != nullptr && readahead_size > 0 && - !explicit_prefetch_submitted_) { + if (readaheadsize_cb_ != nullptr && readahead_size > 0) { readaheadsize_cb_(read_curr_block, updated_start_offset, updated_end_offset); } @@ -407,79 +422,93 @@ void FilePrefetchBuffer::ReadAheadSizeTuning( assert(roundup_len >= chunk_len); // Update the buffer offset. - bufs_.front()->offset_ = start_offset; + buf->offset_ = start_offset; // Update the initial end offset of this buffer which will be the starting // offset of next prefetch. - bufs_.front()->initial_end_offset_ = initial_end_offset; + buf->initial_end_offset_ = initial_end_offset; read_len = static_cast(roundup_len - chunk_len); } -/* Status FilePrefetchBuffer::HandleOverlappingData( const IOOptions& opts, RandomAccessFileReader* reader, uint64_t offset, size_t length, size_t readahead_size, bool& copy_to_third_buffer, uint64_t& tmp_offset, size_t& tmp_length) { + // No Overlapping of data between 2 buffers. + if (bufs_.empty() || bufs_.size() == 1) { + return Status::OK(); + } + Status s; size_t alignment = reader->file()->GetRequiredBufferAlignment(); - uint32_t second; + + BufferInfo* buf = bufs_.front(); // Check if the first buffer has the required offset and the async read is // still in progress. This should only happen if a prefetch was initiated // by Seek, but the next access is at another offset. 
- if (bufs_[curr_].async_read_in_progress_ && - IsOffsetInBufferWithAsyncProgress(offset, curr_)) { + if (buf->async_read_in_progress_ && + IsOffsetInBufferWithAsyncProgress(buf, offset)) { PollAndUpdateBuffersIfNeeded(offset, length); } - second = curr_ ^ 1; - - // If data is overlapping over two buffers, copy the data from curr_ and - // call ReadAsync on curr_. - if (!bufs_[curr_].async_read_in_progress_ && DoesBufferContainData(curr_) && - IsOffsetInBuffer(offset, curr_) && - (Data extends over curr_ buffer and second buffer either has data or in - process of population= - (offset + length > bufs_[second].offset_) && - (bufs_[second].async_read_in_progress_ || - DoesBufferContainData(second)))) { + + BufferInfo* next_buf = bufs_[1]; + + // If data is overlapping over two buffers, copy the data from front and + // call ReadAsync on freed buffer. + if (!buf->async_read_in_progress_ && DoesBufferContainData(buf) && + IsOffsetInBuffer(buf, offset) && + (/*Data extends over curr_ buffer and second buffer either has data or in + process of population=*/ + (offset + length > next_buf->offset_) && + (next_buf->async_read_in_progress_ || + DoesBufferContainData(next_buf)))) { + // Clear and Free second buffer. + // Allocate new buffer to third buffer; - bufs_[2].ClearBuffer(); - bufs_[2].buffer_.Alignment(alignment); - bufs_[2].buffer_.AllocateNewBuffer(length); - bufs_[2].offset_ = offset; + BufferInfo* overlap_buf = overlap_bufs_.front(); + overlap_buf->ClearBuffer(); + overlap_buf->buffer_.Alignment(alignment); + overlap_buf->buffer_.AllocateNewBuffer(length); + overlap_buf->offset_ = offset; copy_to_third_buffer = true; - CopyDataToBuffer(curr_, tmp_offset, tmp_length); + CopyDataToBuffer(buf, tmp_offset, tmp_length); // Call async prefetching on curr_ since data has been consumed in curr_ // only if requested data lies within second buffer. - size_t second_size = bufs_[second].async_read_in_progress_ - ? bufs_[second].async_req_len_ - : bufs_[second].buffer_.CurrentSize(); - uint64_t start_offset = bufs_[second].initial_end_offset_; + size_t second_size = next_buf->async_read_in_progress_ + ? next_buf->async_req_len_ + : next_buf->buffer_.CurrentSize(); + uint64_t start_offset = next_buf->initial_end_offset_; // Second buffer might be out of bound if first buffer already prefetched // that data. - if (tmp_offset + tmp_length <= bufs_[second].offset_ + second_size && + if (tmp_offset + tmp_length <= next_buf->offset_ + second_size && !IsOffsetOutOfBound(start_offset)) { + AllocateBuffer(); + BufferInfo* new_buf = bufs_.back(); + size_t read_len = 0; uint64_t end_offset = start_offset, chunk_len = 0; - ReadAheadSizeTuning(read_curr_block=false, refit_tail=false, - bufs_[second].offset_ + second_size, curr_, alignment, - length=0, readahead_size, start_offset, + ReadAheadSizeTuning(new_buf, /*read_curr_block=*/false, + /*refit_tail=*/false, next_buf->offset_ + second_size, + alignment, + /*length=*/0, readahead_size, start_offset, end_offset, read_len, chunk_len); if (read_len > 0) { - s = ReadAsync(opts, reader, read_len, start_offset, curr_); + s = ReadAsync(new_buf, opts, reader, read_len, start_offset); if (!s.ok()) { - DestroyAndClearIOHandle(curr_); - bufs_[curr_].ClearBuffer(); + DestroyAndClearIOHandle(new_buf); + new_buf->ClearBuffer(); + FreeLastBuffer(); return s; } } } - curr_ = curr_ ^ 1; } return s; } + // If async_io is enabled in case of sequential reads, PrefetchAsyncInternal is // called. 
When buffers are switched, we clear the curr_ buffer as we assume the // data has been consumed because of sequential reads. @@ -518,11 +547,12 @@ Status FilePrefetchBuffer::PrefetchAsyncInternal(const IOOptions& opts, size_t tmp_length = length; size_t original_length = length; - // 1. Abort IO and swap buffers if needed to point curr_ to first buffer with - // data. + // Abort IO. if (!explicit_prefetch_submitted_) { - AbortIOIfNeeded(offset); + AbortOutdatedIO(offset); } + + // Update the buffers to get the buffer which contains the requested data. UpdateBuffersIfNeeded(offset, length); // 2. Handle overlapping data over two buffers. If data is overlapping then @@ -538,21 +568,23 @@ Status FilePrefetchBuffer::PrefetchAsyncInternal(const IOOptions& opts, return s; } + AllocateBufferIfEmpty(); + // 3. Call Poll only if data is needed for the second buffer. // - Return if whole data is in curr_ and second buffer is in progress or // already full. // - If second buffer is empty, it will go for ReadAsync for second buffer. - if (!bufs_[curr_].async_read_in_progress_ && DoesBufferContainData(curr_) && - IsDataBlockInBuffer(offset, length, curr_)) { - // Whole data is in curr_. - UpdateBuffersIfNeeded(offset, length); - if (!IsSecondBuffEligibleForPrefetching()) { - UpdateStats(found_in_buffer=true, original_length); + BufferInfo* buf = bufs_.front(); + if (!buf->async_read_in_progress_ && DoesBufferContainData(buf) && + IsDataBlockInBuffer(buf, offset, length)) { + // Whole data is in buffer. + if (!IsEligibleForFurtherPrefetching()) { + UpdateStats(/*found_in_buffer=*/true, original_length); return s; } } else { - // After poll request, curr_ might be empty because of IOError in - // callback while reading or may contain required data. + // NOTE - After this poll request, first buffer might be empty because of + // IOError in callback while reading or it may contains the required data. PollAndUpdateBuffersIfNeeded(offset, length); } @@ -561,11 +593,14 @@ Status FilePrefetchBuffer::PrefetchAsyncInternal(const IOOptions& opts, length = tmp_length; } + // buf = bufs_.front(); + // 4. After polling and swapping buffers, if all the requested bytes are in // curr_, it will only go for async prefetching. // copy_to_third_buffer is a special case so it will be handled separately. - if (!copy_to_third_buffer && DoesBufferContainData(curr_) && - IsDataBlockInBuffer(offset, length, curr_)) { + + if (!copy_to_third_buffer && DoesBufferContainData(buf) && + IsDataBlockInBuffer(buf, offset, length)) { offset += length; length = 0; @@ -575,36 +610,51 @@ Status FilePrefetchBuffer::PrefetchAsyncInternal(const IOOptions& opts, if (explicit_prefetch_submitted_) { return s; } - if (!IsSecondBuffEligibleForPrefetching()) { - UpdateStats(found_in_buffer=true, original_length); + if (!IsEligibleForFurtherPrefetching()) { + UpdateStats(/*found_in_buffer=*/true, original_length); return s; } } - uint32_t second = curr_ ^ 1; - assert(!bufs_[curr_].async_read_in_progress_); + buf = bufs_.front(); + assert(!buf->async_read_in_progress_); // In case because of some IOError curr_ got empty, abort IO for second as // well. Otherwise data might not align if more data needs to be read in curr_ // which might overlap with second buffer. 
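// Aside: the queue relies on active buffers being contiguous in file-offset
// order (each buffer beginning where the previous one ends); an empty front
// buffer that must be re-read could otherwise overlap the ranges already in
// flight behind it, which is why all of those IOs get aborted here. A small
// illustrative check of that invariant (types are hypothetical):
#include <cstddef>
#include <cstdint>
#include <deque>

struct Range {
  uint64_t offset;
  uint64_t size;
};

bool IsContiguous(const std::deque<Range>& bufs) {
  for (size_t i = 1; i < bufs.size(); i++) {
    if (bufs[i - 1].offset + bufs[i - 1].size != bufs[i].offset) {
      return false;  // a gap or an overlap breaks sequential serving
    }
  }
  return true;
}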
- if (!DoesBufferContainData(curr_) && bufs_[second].async_read_in_progress_) { - if (bufs_[second].io_handle_ != nullptr) { - std::vector handles; - handles.emplace_back(bufs_[second].io_handle_); - { - StopWatch sw(clock_, stats_, ASYNC_PREFETCH_ABORT_MICROS); - Status status = fs_->AbortIO(handles); - assert(status.ok()); + if (!DoesBufferContainData(buf)) { + std::vector handles; + if (bufs_.size() > 1) { + for (auto& _buf : bufs_) { + if (_buf->async_read_in_progress_) { + handles.emplace_back(_buf->io_handle_); + } + } + } + if (!handles.empty()) { + StopWatch sw(clock_, stats_, ASYNC_PREFETCH_ABORT_MICROS); + s = fs_->AbortIO(handles); + assert(s.ok()); + } + + for (auto& _buf : bufs_) { + if (_buf->async_read_in_progress_) { + DestroyAndClearIOHandle(_buf); + _buf->async_read_in_progress_ = false; + _buf->ClearBuffer(); } } - DestroyAndClearIOHandle(second); - bufs_[second].ClearBuffer(); + // TODO Akanksha - It'll free the first buffer as well. + FreeEmptyBuffers(); } - // 5. Data is overlapping i.e. some of the data has been copied to third + AllocateBufferIfEmpty(); + buf = bufs_.front(); + + // Data is overlapping i.e. some of the data has been copied to third // buffer and remaining will be updated below. - if (copy_to_third_buffer && DoesBufferContainData(curr_)) { - CopyDataToBuffer(curr_, offset, length); + if (copy_to_third_buffer && DoesBufferContainData(buf)) { + CopyDataToBuffer(buf, offset, length); // Length == 0: All the requested data has been copied to third buffer and // it has already gone for async prefetching. It can return without doing @@ -613,14 +663,15 @@ Status FilePrefetchBuffer::PrefetchAsyncInternal(const IOOptions& opts, // and sync prefetching and copy the remaining data to third buffer in the // end. if (length == 0) { - UpdateStats(found_in_buffer=true, original_length); + UpdateStats(/*found_in_buffer=*/true, original_length); return s; } } // 6. Go for ReadAsync and Read (if needed). - assert(!bufs_[second].async_read_in_progress_ && - !DoesBufferContainData(second)); + + // assert(!bufs_[second].async_read_in_progress_ && + // !DoesBufferContainData(second)); // offset and size alignment for curr_ buffer with synchronous prefetching uint64_t start_offset1 = offset, end_offset1 = 0, chunk_len1 = 0; @@ -628,38 +679,45 @@ Status FilePrefetchBuffer::PrefetchAsyncInternal(const IOOptions& opts, // For length == 0, skip the synchronous prefetching. read_len1 will be 0. if (length > 0) { - ReadAheadSizeTuning(read_curr_block=true, refit_tail=false, - start_offset1, curr_, alignment, length, readahead_size, + ReadAheadSizeTuning(buf, /*read_curr_block=*/true, /*refit_tail=*/false, + start_offset1, alignment, length, readahead_size, start_offset1, end_offset1, read_len1, chunk_len1); - UpdateStats(found_in_buffer=false, - length_found=original_length - length); + UpdateStats(/*found_in_buffer=*/false, + /*length_found=*/original_length - length); } else { - end_offset1 = bufs_[curr_].offset_ + bufs_[curr_].buffer_.CurrentSize(); - UpdateStats(found_in_buffer=true, original_length); + end_offset1 = buf->offset_ + buf->buffer_.CurrentSize(); + UpdateStats(/*found_in_buffer=*/true, original_length); } // Prefetch in second buffer only if readahead_size > 0. if (readahead_size > 0) { - // offset and size alignment for second buffer for asynchronous - // prefetching. 
- uint64_t start_offset2 = bufs_[curr_].initial_end_offset_; + while (bufs_.size() < num_buffers_) { + BufferInfo* prev_buf = bufs_.back(); + end_offset1 = prev_buf->offset_ + prev_buf->buffer_.CurrentSize(); + uint64_t start_offset2 = prev_buf->initial_end_offset_; + + // Buffer might be out of bound if previous buffer already prefetched + // that data. + if (IsOffsetOutOfBound(start_offset2)) { + break; + } + + AllocateBuffer(); + BufferInfo* new_buf = bufs_.back(); - // Second buffer might be out of bound if first buffer already prefetched - // that data. - if (!IsOffsetOutOfBound(start_offset2)) { - // Find updated readahead size after tuning - size_t read_len2 = 0; uint64_t end_offset2 = start_offset2, chunk_len2 = 0; - ReadAheadSizeTuning(read_curr_block=false, refit_tail=false, - prev_buf_end_offset=end_offset1, second, - alignment, - length=0, readahead_size, start_offset2, - end_offset2, read_len2, chunk_len2); + size_t read_len2 = 0; + ReadAheadSizeTuning( + new_buf, /*read_curr_block=*/false, /*refit_tail=*/false, + /*prev_buf_end_offset=*/end_offset1, alignment, /*length=*/0, + readahead_size, start_offset2, end_offset2, read_len2, chunk_len2); + if (read_len2 > 0) { - s = ReadAsync(opts, reader, read_len2, start_offset2, second); + s = ReadAsync(new_buf, opts, reader, read_len2, start_offset2); if (!s.ok()) { - DestroyAndClearIOHandle(second); - bufs_[second].ClearBuffer(); + DestroyAndClearIOHandle(new_buf); + new_buf->ClearBuffer(); + FreeLastBuffer(); return s; } } @@ -667,31 +725,20 @@ Status FilePrefetchBuffer::PrefetchAsyncInternal(const IOOptions& opts, } if (read_len1 > 0) { - s = Read(opts, reader, read_len1, chunk_len1, start_offset1, curr_); + s = Read(buf, opts, reader, read_len1, chunk_len1, start_offset1); if (!s.ok()) { - if (bufs_[second].io_handle_ != nullptr) { - std::vector handles; - handles.emplace_back(bufs_[second].io_handle_); - { - StopWatch sw(clock_, stats_, ASYNC_PREFETCH_ABORT_MICROS); - Status status = fs_->AbortIO(handles); - assert(status.ok()); - } - } - DestroyAndClearIOHandle(second); - bufs_[second].ClearBuffer(); - bufs_[curr_].ClearBuffer(); + AbortAllIOs(); + FreeAllBuffers(); return s; } } // Copy remaining requested bytes to third_buffer. if (copy_to_third_buffer && length > 0) { - CopyDataToBuffer(curr_, offset, length); + CopyDataToBuffer(buf, offset, length); } return s; } -*/ bool FilePrefetchBuffer::TryReadFromCache(const IOOptions& opts, RandomAccessFileReader* reader, @@ -782,7 +829,6 @@ bool FilePrefetchBuffer::TryReadFromCacheUntracked( return true; } -/* bool FilePrefetchBuffer::TryReadFromCacheAsync(const IOOptions& opts, RandomAccessFileReader* reader, uint64_t offset, size_t n, @@ -817,14 +863,17 @@ bool FilePrefetchBuffer::TryReadFromCacheAsyncUntracked( // Random offset called. So abort the IOs. if (prev_offset_ != offset) { AbortAllIOs(); - bufs_[curr_].ClearBuffer(); - bufs_[curr_ ^ 1].ClearBuffer(); + ClearAllBuffers(); explicit_prefetch_submitted_ = false; return false; } } - if (!explicit_prefetch_submitted_ && offset < bufs_[curr_].offset_) { + AllocateBufferIfEmpty(); + BufferInfo* buf = bufs_.front(); + + // TODO Akanksha - offset < buf->offset_ is not handled properly. 
+ if (!explicit_prefetch_submitted_ && offset < buf->offset_) { return false; } @@ -838,9 +887,8 @@ bool FilePrefetchBuffer::TryReadFromCacheAsyncUntracked( &readahead_size_); if (explicit_prefetch_submitted_ || - (bufs_[curr_].async_read_in_progress_ || - offset + n > - bufs_[curr_].offset_ + bufs_[curr_].buffer_.CurrentSize())) { + (buf->async_read_in_progress_ || + offset + n > buf->offset_ + buf->buffer_.CurrentSize())) { // In case readahead_size is trimmed (=0), we still want to poll the data // submitted with explicit_prefetch_submitted_=true. if (readahead_size_ > 0 || explicit_prefetch_submitted_) { @@ -877,17 +925,17 @@ bool FilePrefetchBuffer::TryReadFromCacheAsyncUntracked( return false; } } else { - UpdateStats(found_in_buffer=true, n); + UpdateStats(/*found_in_buffer=*/true, n); } - UpdateReadPattern(offset, n, false decrease_readaheadsize); + UpdateReadPattern(offset, n, /*decrease_readaheadsize=*/false); - uint32_t index = curr_; + buf = bufs_.front(); if (copy_to_third_buffer) { - index = 2; + buf = overlap_bufs_.front(); } - uint64_t offset_in_buffer = offset - bufs_[index].offset_; - *result = Slice(bufs_[index].buffer_.BufferStart() + offset_in_buffer, n); + uint64_t offset_in_buffer = offset - buf->offset_; + *result = Slice(buf->buffer_.BufferStart() + offset_in_buffer, n); if (prefetched) { readahead_size_ = std::min(max_readahead_size_, readahead_size_ * 2); } @@ -896,7 +944,8 @@ bool FilePrefetchBuffer::TryReadFromCacheAsyncUntracked( void FilePrefetchBuffer::PrefetchAsyncCallback(const FSReadRequest& req, void* cb_arg) { - uint32_t index = *(static_cast(cb_arg)); + BufferInfo* buf = static_cast(cb_arg); + #ifndef NDEBUG if (req.result.size() < req.len) { // Fake an IO error to force db_stress fault injection to ignore @@ -908,18 +957,18 @@ void FilePrefetchBuffer::PrefetchAsyncCallback(const FSReadRequest& req, if (req.status.ok()) { if (req.offset + req.result.size() <= - bufs_[index].offset_ + bufs_[index].buffer_.CurrentSize()) { + buf->offset_ + buf->buffer_.CurrentSize()) { // All requested bytes are already in the buffer or no data is read // because of EOF. So no need to update. return; } - if (req.offset < bufs_[index].offset_) { + if (req.offset < buf->offset_) { // Next block to be read has changed (Recent read was not a sequential // read). So ignore this read. return; } - size_t current_size = bufs_[index].buffer_.CurrentSize(); - bufs_[index].buffer_.Size(current_size + req.result.size()); + size_t current_size = buf->buffer_.CurrentSize(); + buf->buffer_.Size(current_size + req.result.size()); } } @@ -945,51 +994,55 @@ Status FilePrefetchBuffer::PrefetchAsync(const IOOptions& opts, is_eligible_for_prefetching = true; } - // 1. Cancel any pending async read to make code simpler as buffers can be out + // Cancel any pending async read to make code simpler as buffers can be out // of sync. AbortAllIOs(); - // 2. Clear outdated data. + // Clear outdated data. It clears the data whose end offset is less than + // offset to be read. UpdateBuffersIfNeeded(offset, n); - uint32_t second = curr_ ^ 1; // - Since PrefetchAsync can be called on non sequential reads. So offset can - // be less than curr_ buffers' offset. In that case it clears both + // be less than first buffers' offset. In that case it clears all // buffers. // - In case of tuning of readahead_size, on Reseek, we have to clear both // buffers otherwise, we may end up with inconsistent BlockHandles in queue // and data in buffer. 
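// Aside: the rule implemented in the hunk below — on a reseek, or whenever the
// requested offset falls outside the front buffer, the whole queue is freed,
// since the prefetched ranges (and, with readahead tuning, the queued
// BlockHandles) no longer match the upcoming reads. Illustrative sketch only:
#include <cstdint>
#include <deque>

struct Span {
  uint64_t offset;
  uint64_t size;
};

// True when the queue cannot serve `offset` and should be freed wholesale;
// `tuning_enabled` mirrors the readaheadsize_cb_ != nullptr condition.
bool ShouldFreeAll(const std::deque<Span>& bufs, uint64_t offset,
                   bool tuning_enabled) {
  if (bufs.empty()) return false;
  const Span& front = bufs.front();
  bool in_front =
      offset >= front.offset && offset < front.offset + front.size;
  return tuning_enabled || !in_front;
}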
- if (readaheadsize_cb_ != nullptr || - (DoesBufferContainData(curr_) && !IsOffsetInBuffer(offset, curr_))) { - bufs_[curr_].ClearBuffer(); - bufs_[second].ClearBuffer(); + if (!bufs_.empty()) { + BufferInfo* buf = bufs_.front(); + if (readaheadsize_cb_ != nullptr || !IsOffsetInBuffer(buf, offset)) { + // clear all bufs. + FreeAllBuffers(); + } } - UpdateReadPattern(offset, n, decrease_readaheadsize=false); + UpdateReadPattern(offset, n, /*decrease_readaheadsize=*/false); bool data_found = false; - // 3. If curr_ has full data. - if (DoesBufferContainData(curr_) && IsDataBlockInBuffer(offset, n, curr_)) { - uint64_t offset_in_buffer = offset - bufs_[curr_].offset_; - *result = Slice(bufs_[curr_].buffer_.BufferStart() + offset_in_buffer, n); - data_found = true; - UpdateStats(found_in_buffer=true, n); - - // Update num_file_reads_ as TryReadFromCacheAsync won't be called for - // poll and update num_file_reads_ if data is found. - num_file_reads_++; - - // 3.1 If second also has some data or is not eligible for prefetching, - // return. - if (!is_eligible_for_prefetching || DoesBufferContainData(second)) { - return Status::OK(); + // If curr_ has full data. + if (!bufs_.empty()) { + BufferInfo* buf = bufs_.front(); + if (DoesBufferContainData(buf) && IsDataBlockInBuffer(buf, offset, n)) { + uint64_t offset_in_buffer = offset - buf->offset_; + *result = Slice(buf->buffer_.BufferStart() + offset_in_buffer, n); + data_found = true; + UpdateStats(/*found_in_buffer=*/true, n); + + // Update num_file_reads_ as TryReadFromCacheAsync won't be called for + // poll and update num_file_reads_ if data is found. + num_file_reads_++; + + // If next buffer also has some data or is not eligible for prefetching, + // return. + if (!is_eligible_for_prefetching || bufs_.size() > 1) { + return Status::OK(); + } + } else { + // Partial data in front. Clear it to return consecutive buffer. + FreeAllBuffers(); } - } else { - // Partial data in curr_. - bufs_[curr_].ClearBuffer(); } - bufs_[second].ClearBuffer(); std::string msg; @@ -999,7 +1052,10 @@ Status FilePrefetchBuffer::PrefetchAsync(const IOOptions& opts, size_t offset_to_read = static_cast(offset); uint64_t start_offset1 = offset, end_offset1 = 0, start_offset2 = 0, chunk_len1 = 0; - size_t read_len1 = 0, read_len2 = 0; + size_t read_len1 = 0; + + AllocateBufferIfEmpty(); + BufferInfo* buf = bufs_.front(); // - If curr_ is empty. // - Call async read for full data + readahead_size on curr_. @@ -1007,65 +1063,74 @@ Status FilePrefetchBuffer::PrefetchAsync(const IOOptions& opts, // - If curr_ is filled. // - readahead_size on second. // Calculate length and offsets for reading. - if (!DoesBufferContainData(curr_)) { + if (!DoesBufferContainData(buf)) { uint64_t roundup_len1; // Prefetch full data + readahead_size in curr_. if (is_eligible_for_prefetching || reader->use_direct_io()) { - ReadAheadSizeTuning(read_curr_block=true, refit_tail=false, - prev_buf_end_offset=start_offset1, curr_, - alignment, n, readahead_size, start_offset1, - end_offset1, read_len1, chunk_len1); + ReadAheadSizeTuning(buf, /*read_curr_block=*/true, /*refit_tail=*/false, + /*prev_buf_end_offset=*/start_offset1, alignment, n, + readahead_size, start_offset1, end_offset1, read_len1, + chunk_len1); } else { // No alignment or extra prefetching. 
start_offset1 = offset_to_read; end_offset1 = offset_to_read + n; roundup_len1 = end_offset1 - start_offset1; - CalculateOffsetAndLen(alignment, start_offset1, roundup_len1, curr_, - false, chunk_len1); + CalculateOffsetAndLen(buf, alignment, start_offset1, roundup_len1, false, + chunk_len1); assert(chunk_len1 == 0); assert(roundup_len1 >= chunk_len1); read_len1 = static_cast(roundup_len1); - bufs_[curr_].offset_ = start_offset1; + buf->offset_ = start_offset1; } - } - if (is_eligible_for_prefetching) { - start_offset2 = bufs_[curr_].initial_end_offset_; - // Second buffer might be out of bound if first buffer already prefetched - // that data. - if (!IsOffsetOutOfBound(start_offset2)) { - uint64_t end_offset2 = start_offset2, chunk_len2 = 0; - ReadAheadSizeTuning(read_curr_block=false, refit_tail=false, - prev_buf_end_offset=end_offset1, second, - alignment, - length=0, readahead_size, start_offset2, - end_offset2, read_len2, chunk_len2); + if (read_len1 > 0) { + s = ReadAsync(buf, opts, reader, read_len1, start_offset1); + if (!s.ok()) { + DestroyAndClearIOHandle(buf); + buf->ClearBuffer(); + return s; + } + explicit_prefetch_submitted_ = true; + prev_len_ = 0; } } - if (read_len1) { - s = ReadAsync(opts, reader, read_len1, start_offset1, curr_); - if (!s.ok()) { - DestroyAndClearIOHandle(curr_); - bufs_[curr_].ClearBuffer(); - return s; - } - explicit_prefetch_submitted_ = true; - prev_len_ = 0; - } + if (is_eligible_for_prefetching) { + while (bufs_.size() < num_buffers_) { + BufferInfo* prev_buf = bufs_.back(); + start_offset2 = prev_buf->initial_end_offset_; + + // Buffer might be out of bound if previous buffer already prefetched + // that data. + if (IsOffsetOutOfBound(start_offset2)) { + break; + } - if (read_len2) { - TEST_SYNC_POINT("FilePrefetchBuffer::PrefetchAsync:ExtraPrefetching"); - s = ReadAsync(opts, reader, read_len2, start_offset2, second); - if (!s.ok()) { - DestroyAndClearIOHandle(second); - bufs_[second].ClearBuffer(); - return s; + AllocateBuffer(); + BufferInfo* new_buf = bufs_.back(); + + uint64_t end_offset2 = start_offset2, chunk_len2 = 0; + size_t read_len2 = 0; + ReadAheadSizeTuning( + new_buf, /*read_curr_block=*/false, /*refit_tail=*/false, + /*prev_buf_end_offset=*/end_offset1, alignment, /*length=*/0, + readahead_size, start_offset2, end_offset2, read_len2, chunk_len2); + + if (read_len2 > 0) { + TEST_SYNC_POINT("FilePrefetchBuffer::PrefetchAsync:ExtraPrefetching"); + s = ReadAsync(new_buf, opts, reader, read_len2, start_offset2); + if (!s.ok()) { + DestroyAndClearIOHandle(new_buf); + new_buf->ClearBuffer(); + return s; + } + } + end_offset1 = end_offset2; } readahead_size_ = std::min(max_readahead_size_, readahead_size_ * 2); } return (data_found ? Status::OK() : Status::TryAgain()); } -*/ } // namespace ROCKSDB_NAMESPACE diff --git a/file/file_prefetch_buffer.h b/file/file_prefetch_buffer.h index e6ba6056d051..7e97b52d1289 100644 --- a/file/file_prefetch_buffer.h +++ b/file/file_prefetch_buffer.h @@ -28,8 +28,6 @@ namespace ROCKSDB_NAMESPACE { #define DEFAULT_DECREMENT 8 * 1024 -#define NUM_BUFFERS 1 -#define NUM_OVERLAP_BUFFERS 0 struct IOOptions; class RandomAccessFileReader; @@ -58,9 +56,9 @@ struct BufferInfo { IOHandleDeleter del_fn_ = nullptr; - // pos represents the index of this buffer in queue of BufferInfo. It's + // pos represents the address of this buffer in queue of BufferInfo. It's // required during async callback to know which buffer to filled. 
- uint32_t pos_ = 0; + BufferInfo* pos_; // initial_end_offset is used to keep track of the end offset of the buffer // that was originally called. It's helpful in case of autotuning of readahead @@ -112,7 +110,8 @@ class FilePrefetchBuffer { uint64_t upper_bound_offset = 0, FileSystem* fs = nullptr, SystemClock* clock = nullptr, Statistics* stats = nullptr, const std::function& cb = nullptr, - FilePrefetchBufferUsage usage = FilePrefetchBufferUsage::kUnknown) + FilePrefetchBufferUsage usage = FilePrefetchBufferUsage::kUnknown, + size_t num_buffers = 1) : readahead_size_(readahead_size), initial_auto_readahead_size_(readahead_size), max_readahead_size_(max_readahead_size), @@ -130,17 +129,22 @@ class FilePrefetchBuffer { stats_(stats), usage_(usage), upper_bound_offset_(upper_bound_offset), - readaheadsize_cb_(cb) { + readaheadsize_cb_(cb), + num_buffers_(num_buffers) { assert((num_file_reads_ >= num_file_reads_for_auto_readahead_ + 1) || (num_file_reads_ == 0)); // If ReadOptions.async_io is enabled, data is asynchronously filled in the // queue. If data is overlapping in two buffers, data is copied to third // buffer i.e. overlap_bufs_ to return continuous buffer. - overlap_bufs_.resize(NUM_OVERLAP_BUFFERS); - free_bufs_.resize(NUM_BUFFERS); - for (uint32_t i = 0; i < NUM_BUFFERS; i++) { + overlap_bufs_.resize(1); + for (uint32_t i = 0; i < 1; i++) { + overlap_bufs_[i] = new BufferInfo(); + } + + free_bufs_.resize(num_buffers_); + for (uint32_t i = 0; i < num_buffers_; i++) { free_bufs_[i] = new BufferInfo(); - free_bufs_[i]->pos_ = i; + free_bufs_[i]->pos_ = free_bufs_[i]; } } @@ -292,13 +296,11 @@ class FilePrefetchBuffer { readahead_size_ = initial_auto_readahead_size_; } - /* void TEST_GetBufferOffsetandSize(uint32_t index, uint64_t& offset, size_t& len) { - offset = bufs_[index].offset_; - len = bufs_[index].buffer_.CurrentSize(); + offset = bufs_[index]->offset_; + len = bufs_[index]->buffer_.CurrentSize(); } - */ private: // Calculates roundoff offset and length to be prefetched based on alignment @@ -308,7 +310,7 @@ class FilePrefetchBuffer { size_t roundup_len, bool refit_tail, uint64_t& chunk_len); - void AbortIOIfNeeded(uint64_t offset); + void AbortOutdatedIO(uint64_t offset); void AbortAllIOs(); @@ -328,11 +330,12 @@ class FilePrefetchBuffer { RandomAccessFileReader* reader, uint64_t read_len, uint64_t chunk_len, uint64_t start_offset); - Status ReadAsync(const IOOptions& opts, RandomAccessFileReader* reader, - uint64_t read_len, uint64_t start_offset); + Status ReadAsync(BufferInfo* buf, const IOOptions& opts, + RandomAccessFileReader* reader, uint64_t read_len, + uint64_t start_offset); // Copy the data from src to third buffer. - void CopyDataToBuffer(uint32_t src, uint64_t& offset, size_t& length); + void CopyDataToBuffer(BufferInfo* src, uint64_t& offset, size_t& length); bool IsBlockSequential(const size_t& offset) { return (prev_len_ == 0 || (prev_offset_ + prev_len_ == offset)); @@ -368,7 +371,7 @@ class FilePrefetchBuffer { return true; } - // Helper functions. 
+ // *** BEGIN Helper APIs related to data in Buffers *** bool IsDataBlockInBuffer(BufferInfo* buf, uint64_t offset, size_t length) { return (offset >= buf->offset_ && offset + length <= buf->offset_ + buf->buffer_.CurrentSize()); } @@ -392,18 +395,10 @@ class FilePrefetchBuffer { return (buf->async_read_in_progress_ && offset >= buf->offset_ && offset < buf->offset_ + buf->async_req_len_); } + // *** END Helper APIs related to data in Buffers *** - /* - bool IsSecondBuffEligibleForPrefetching() { - uint32_t second = curr_ ^ 1; - if (bufs_[second].async_read_in_progress_) { - return false; - } - assert(!bufs_[curr_].async_read_in_progress_); - - if (DoesBufferContainData(curr_) && DoesBufferContainData(second) && - (bufs_[curr_].offset_ + bufs_[curr_].buffer_.CurrentSize() == - bufs_[second].offset_)) { + bool IsEligibleForFurtherPrefetching() { + if (free_bufs_.empty()) { return false; } @@ -412,26 +407,23 @@ class FilePrefetchBuffer { return false; } - bufs_[second].ClearBuffer(); return true; } - void DestroyAndClearIOHandle(uint32_t index) { - if (bufs_[index].io_handle_ != nullptr && bufs_[index].del_fn_ != nullptr) { - bufs_[index].del_fn_(bufs_[index].io_handle_); - bufs_[index].io_handle_ = nullptr; - bufs_[index].del_fn_ = nullptr; + void DestroyAndClearIOHandle(BufferInfo* buf) { + if (buf->io_handle_ != nullptr && buf->del_fn_ != nullptr) { + buf->del_fn_(buf->io_handle_); + buf->io_handle_ = nullptr; + buf->del_fn_ = nullptr; } - bufs_[index].async_read_in_progress_ = false; + buf->async_read_in_progress_ = false; } - Status HandleOverlappingData(const IOOptions& opts, RandomAccessFileReader* reader, uint64_t offset, size_t length, size_t readahead_size, bool& copy_to_third_buffer, uint64_t& tmp_offset, size_t& tmp_length); - */ bool TryReadFromCacheUntracked(const IOOptions& opts, RandomAccessFileReader* reader, @@ -479,20 +471,72 @@ class FilePrefetchBuffer { } } + // *** BEGIN APIs related to allocating and freeing buffers *** void AllocateBuffer() { + assert(!free_bufs_.empty()); BufferInfo* buf = free_bufs_.front(); free_bufs_.pop_front(); - bufs_.push_back(buf); + bufs_.emplace_back(buf); } - void FreeBuffer(BufferInfo* buf) { free_bufs_.push_back(buf); } - void AllocateBufferIfEmpty() { if (bufs_.empty()) { AllocateBuffer(); } } + void FreeFrontBuffer() { + BufferInfo* buf = bufs_.front(); + buf->buffer_.Clear(); + bufs_.pop_front(); + free_bufs_.emplace_back(buf); + } + + void FreeLastBuffer() { + BufferInfo* buf = bufs_.back(); + buf->buffer_.Clear(); + bufs_.pop_back(); + free_bufs_.emplace_back(buf); + } + + void FreeAllBuffers() { + while (!bufs_.empty()) { + BufferInfo* buf = bufs_.front(); + buf->ClearBuffer(); + bufs_.pop_front(); + free_bufs_.emplace_back(buf); + } + } + + void FreeEmptyBuffers() { + if (bufs_.empty()) { + return; + } + + std::deque<BufferInfo*> tmp_buf; + while (!bufs_.empty()) { + BufferInfo* buf = bufs_.front(); + bufs_.pop_front(); + if (buf->async_read_in_progress_ || DoesBufferContainData(buf)) { + tmp_buf.emplace_back(buf); + } else { + free_bufs_.emplace_back(buf); + } + } + bufs_ = tmp_buf; + } + + void ClearAllBuffers() { + if (bufs_.empty()) { + return; + } + for (auto& buf : bufs_) { + buf->ClearBuffer(); + } + FreeEmptyBuffers(); + } + + // *** END APIs related to allocating and freeing buffers *** + std::deque<BufferInfo*> bufs_; std::deque<BufferInfo*> free_bufs_; std::deque<BufferInfo*> overlap_bufs_; @@ -539,5 +583,6 @@ class FilePrefetchBuffer { // upper_bound_offset_ during prefetching.
uint64_t upper_bound_offset_ = 0; std::function<void(bool, uint64_t&, uint64_t&)> readaheadsize_cb_; + size_t num_buffers_; }; } // namespace ROCKSDB_NAMESPACE diff --git a/file/prefetch_test.cc b/file/prefetch_test.cc index 2c0e8817a449..7a777da4bde4 100644 --- a/file/prefetch_test.cc +++ b/file/prefetch_test.cc @@ -3293,7 +3293,8 @@ TEST_F(FilePrefetchBufferTest, SeekWithoutAlignment) { /*readahead_size=*/8192, /*max_readahead_size=*/16384, /*enable=*/true, /*track_min_offset=*/false, /*implicit_auto_readahead=*/true, /*num_file_reads=*/0, /*num_file_reads_for_auto_readahead=*/2, - /*upper_bound_offset=*/0, fs()); + /*upper_bound_offset=*/0, fs(), nullptr, nullptr, nullptr, + FilePrefetchBufferUsage::kUnknown, 2); Slice result; // Simulate a seek of half of alignment bytes at offset n. Due to the @@ -3325,7 +3326,8 @@ TEST_F(FilePrefetchBufferTest, SeekWithoutAlignment) { /*readahead_size=*/16384, /*max_readahead_size=*/16384, /*enable=*/true, /*track_min_offset=*/false, /*implicit_auto_readahead=*/false, /*num_file_reads=*/0, /*num_file_reads_for_auto_readahead=*/2, - /*upper_bound_offset=*/0, fs()); + /*upper_bound_offset=*/0, fs(), nullptr, nullptr, nullptr, + FilePrefetchBufferUsage::kUnknown, 2); Slice result; // Simulate a seek of half of alignment bytes at offset n. @@ -3362,7 +3364,8 @@ TEST_F(FilePrefetchBufferTest, NoSyncWithAsyncIO) { /*readahead_size=*/8192, /*max_readahead_size=*/16384, /*enable=*/true, /*track_min_offset=*/false, /*implicit_auto_readahead=*/false, /*num_file_reads=*/0, /*num_file_reads_for_auto_readahead=*/0, - /*upper_bound_offset=*/0, fs()); + /*upper_bound_offset=*/0, fs(), nullptr, nullptr, nullptr, + FilePrefetchBufferUsage::kUnknown, 2); int read_async_called = 0; SyncPoint::GetInstance()->SetCallBack( @@ -3417,7 +3420,8 @@ TEST_F(FilePrefetchBufferTest, IterateUpperBoundTest1) { /*readahead_size=*/8192, /*max_readahead_size=*/16384, /*enable=*/true, /*track_min_offset=*/false, /*implicit_auto_readahead=*/false, /*num_file_reads=*/0, /*num_file_reads_for_auto_readahead=*/0, - /*upper_bound_offset=*/8000, fs()); + /*upper_bound_offset=*/8000, fs(), nullptr, nullptr, nullptr, + FilePrefetchBufferUsage::kUnknown, 2); int read_async_called = 0; SyncPoint::GetInstance()->SetCallBack( diff --git a/table/block_based/block_based_table_iterator.cc b/table/block_based/block_based_table_iterator.cc index 9f174c48c792..d7d3d41ddd5e 100644 --- a/table/block_based/block_based_table_iterator.cc +++ b/table/block_based/block_based_table_iterator.cc @@ -375,7 +375,8 @@ void BlockBasedTableIterator::InitDataBlock() { block_prefetcher_.PrefetchIfNeeded( rep, data_block_handle, read_options_.readahead_size, is_for_compaction, - /*no_sequential_checking=*/false, read_options_, readaheadsize_cb); + /*no_sequential_checking=*/false, read_options_, readaheadsize_cb, + (read_options_.async_io ?
diff --git a/table/block_based/block_based_table_iterator.cc b/table/block_based/block_based_table_iterator.cc
index 9f174c48c792..d7d3d41ddd5e 100644
--- a/table/block_based/block_based_table_iterator.cc
+++ b/table/block_based/block_based_table_iterator.cc
@@ -375,7 +375,8 @@ void BlockBasedTableIterator::InitDataBlock() {
     block_prefetcher_.PrefetchIfNeeded(
         rep, data_block_handle, read_options_.readahead_size, is_for_compaction,
-        /*no_sequential_checking=*/false, read_options_, readaheadsize_cb);
+        /*no_sequential_checking=*/false, read_options_, readaheadsize_cb,
+        (read_options_.async_io ? 2 : 1));
 
     Status s;
     table_->NewDataBlockIterator<DataBlockIter>(
@@ -435,7 +436,7 @@ void BlockBasedTableIterator::AsyncInitDataBlock(bool is_first_pass) {
     block_prefetcher_.PrefetchIfNeeded(
         rep, data_block_handle, read_options_.readahead_size, is_for_compaction,
         /*no_sequential_checking=*/read_options_.async_io,
-        read_options_, readaheadsize_cb);
+        read_options_, readaheadsize_cb, 2);
 
     Status s;
     table_->NewDataBlockIterator<DataBlockIter>(
diff --git a/table/block_based/block_based_table_reader.h b/table/block_based/block_based_table_reader.h
index 43ea1602d06f..77d89e2d68d2 100644
--- a/table/block_based/block_based_table_reader.h
+++ b/table/block_based/block_based_table_reader.h
@@ -701,14 +701,14 @@ struct BlockBasedTable::Rep {
       uint64_t num_file_reads, uint64_t num_file_reads_for_auto_readahead,
       uint64_t upper_bound_offset,
       const std::function<void(bool, uint64_t&, uint64_t&)>& readaheadsize_cb,
-      FilePrefetchBufferUsage usage) const {
+      FilePrefetchBufferUsage usage, size_t num_buffers) const {
     fpb->reset(new FilePrefetchBuffer(
         readahead_size, max_readahead_size,
         !ioptions.allow_mmap_reads /* enable */, false /* track_min_offset */,
         implicit_auto_readahead, num_file_reads,
         num_file_reads_for_auto_readahead, upper_bound_offset,
         ioptions.fs.get(), ioptions.clock, ioptions.stats, readaheadsize_cb,
-        usage));
+        usage, num_buffers));
   }
 
   void CreateFilePrefetchBufferIfNotExists(
@@ -717,12 +717,13 @@ struct BlockBasedTable::Rep {
       uint64_t num_file_reads, uint64_t num_file_reads_for_auto_readahead,
       uint64_t upper_bound_offset,
       const std::function<void(bool, uint64_t&, uint64_t&)>& readaheadsize_cb,
-      FilePrefetchBufferUsage usage = FilePrefetchBufferUsage::kUnknown) const {
+      FilePrefetchBufferUsage usage = FilePrefetchBufferUsage::kUnknown,
+      size_t num_buffers = 1) const {
     if (!(*fpb)) {
-      CreateFilePrefetchBuffer(readahead_size, max_readahead_size, fpb,
-                               implicit_auto_readahead, num_file_reads,
-                               num_file_reads_for_auto_readahead,
-                               upper_bound_offset, readaheadsize_cb, usage);
+      CreateFilePrefetchBuffer(
+          readahead_size, max_readahead_size, fpb, implicit_auto_readahead,
+          num_file_reads, num_file_reads_for_auto_readahead, upper_bound_offset,
+          readaheadsize_cb, usage, num_buffers);
     }
   }
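The hunks above thread a num_buffers knob through BlockBasedTable::Rep with a default of 1, so existing call sites keep single-buffer behavior while iterators opt in to double buffering under async_io. The policy, as a stand-alone sketch — the helper and the ReadOptionsLite type are illustrative, not part of the patch:

#include <cstddef>

struct ReadOptionsLite {
  bool async_io = false;  // stand-in for rocksdb::ReadOptions::async_io
};

inline size_t NumPrefetchBuffers(const ReadOptionsLite& ro) {
  // InitDataBlock() passes (async_io ? 2 : 1); AsyncInitDataBlock() always
  // passes 2 since it is only reached on the async path. Non-iterator
  // callers (index/filter partitions) stay at the default of 1.
  return ro.async_io ? 2 : 1;
}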
diff --git a/table/block_based/block_prefetcher.cc b/table/block_based/block_prefetcher.cc
index 4e750d7999e5..ca0ad52a993a 100644
--- a/table/block_based/block_prefetcher.cc
+++ b/table/block_based/block_prefetcher.cc
@@ -16,7 +16,8 @@ void BlockPrefetcher::PrefetchIfNeeded(
     const BlockBasedTable::Rep* rep, const BlockHandle& handle,
     const size_t readahead_size, bool is_for_compaction,
     const bool no_sequential_checking, const ReadOptions& read_options,
-    const std::function<void(bool, uint64_t&, uint64_t&)>& readaheadsize_cb) {
+    const std::function<void(bool, uint64_t&, uint64_t&)>& readaheadsize_cb,
+    size_t num_buffer) {
   const size_t len = BlockBasedTable::BlockSizeWithTrailer(handle);
   const size_t offset = handle.offset();
   if (is_for_compaction) {
@@ -59,7 +60,7 @@ void BlockPrefetcher::PrefetchIfNeeded(
         /*implicit_auto_readahead=*/false, /*num_file_reads=*/0,
         /*num_file_reads_for_auto_readahead=*/0, upper_bound_offset_,
         readaheadsize_cb,
-        /*usage=*/FilePrefetchBufferUsage::kUserScanPrefetch);
+        /*usage=*/FilePrefetchBufferUsage::kUserScanPrefetch, num_buffer);
     return;
   }
@@ -85,7 +86,7 @@ void BlockPrefetcher::PrefetchIfNeeded(
         /*num_file_reads=*/0,
         rep->table_options.num_file_reads_for_auto_readahead,
         upper_bound_offset_, readaheadsize_cb,
-        /*usage=*/FilePrefetchBufferUsage::kUserScanPrefetch);
+        /*usage=*/FilePrefetchBufferUsage::kUserScanPrefetch, num_buffer);
     return;
   }
@@ -117,7 +118,7 @@ void BlockPrefetcher::PrefetchIfNeeded(
         &prefetch_buffer_, /*implicit_auto_readahead=*/true, num_file_reads_,
         rep->table_options.num_file_reads_for_auto_readahead,
         upper_bound_offset_, readaheadsize_cb,
-        /*usage=*/FilePrefetchBufferUsage::kUserScanPrefetch);
+        /*usage=*/FilePrefetchBufferUsage::kUserScanPrefetch, num_buffer);
     return;
   }
@@ -140,7 +141,7 @@ void BlockPrefetcher::PrefetchIfNeeded(
       &prefetch_buffer_, /*implicit_auto_readahead=*/true, num_file_reads_,
       rep->table_options.num_file_reads_for_auto_readahead,
       upper_bound_offset_, readaheadsize_cb,
-      /*usage=*/FilePrefetchBufferUsage::kUserScanPrefetch);
+      /*usage=*/FilePrefetchBufferUsage::kUserScanPrefetch, num_buffer);
     return;
   }
diff --git a/table/block_based/block_prefetcher.h b/table/block_based/block_prefetcher.h
index af0a63018e85..5662e22e25eb 100644
--- a/table/block_based/block_prefetcher.h
+++ b/table/block_based/block_prefetcher.h
@@ -22,7 +22,8 @@ class BlockPrefetcher {
       const BlockBasedTable::Rep* rep, const BlockHandle& handle,
       size_t readahead_size, bool is_for_compaction,
       const bool no_sequential_checking, const ReadOptions& read_options,
-      const std::function<void(bool, uint64_t&, uint64_t&)>& readaheadsize_cb);
+      const std::function<void(bool, uint64_t&, uint64_t&)>& readaheadsize_cb,
+      size_t num_buffer);
 
   FilePrefetchBuffer* prefetch_buffer() { return prefetch_buffer_.get(); }
 
   void UpdateReadPattern(const uint64_t& offset, const size_t& len) {
diff --git a/table/block_based/partitioned_filter_block.cc b/table/block_based/partitioned_filter_block.cc
index c908db41d354..25af9b5749d5 100644
--- a/table/block_based/partitioned_filter_block.cc
+++ b/table/block_based/partitioned_filter_block.cc
@@ -499,7 +499,7 @@ Status PartitionedFilterBlockReader::CacheDependencies(
       0, 0, &prefetch_buffer, false /* Implicit autoreadahead */,
       0 /*num_reads_*/, 0 /*num_file_reads_for_auto_readahead*/,
       /*upper_bound_offset*/ 0, /*readaheadsize_cb*/ nullptr,
-      /*usage=*/FilePrefetchBufferUsage::kUnknown);
+      /*usage=*/FilePrefetchBufferUsage::kUnknown, 1);
 
   IOOptions opts;
   s = rep->file->PrepareIOOptions(ro, opts);
diff --git a/table/block_based/partitioned_index_iterator.cc b/table/block_based/partitioned_index_iterator.cc
index cc6f70130927..d40530b18595 100644
--- a/table/block_based/partitioned_index_iterator.cc
+++ b/table/block_based/partitioned_index_iterator.cc
@@ -92,7 +92,7 @@ void PartitionedIndexIterator::InitPartitionedIndexBlock() {
     block_prefetcher_.PrefetchIfNeeded(
         rep, partitioned_index_handle, read_options_.readahead_size,
         is_for_compaction, /*no_sequential_checking=*/false, read_options_,
-        /*readaheadsize_cb=*/nullptr);
+        /*readaheadsize_cb=*/nullptr, 1);
     Status s;
     table_->NewDataBlockIterator<IndexBlockIter>(
         read_options_, partitioned_index_handle, &block_iter_,
diff --git a/table/block_based/partitioned_index_reader.cc b/table/block_based/partitioned_index_reader.cc
index f825907180a8..52ce008b16fa 100644
--- a/table/block_based/partitioned_index_reader.cc
+++ b/table/block_based/partitioned_index_reader.cc
@@ -171,7 +171,7 @@ Status PartitionIndexReader::CacheDependencies(
       0, 0, &prefetch_buffer, false /*Implicit auto readahead*/,
       0 /*num_reads_*/, 0 /*num_file_reads_for_auto_readahead*/,
       /*upper_bound_offset*/ 0, /*readaheadsize_cb*/ nullptr,
-      /*usage=*/FilePrefetchBufferUsage::kUnknown);
+      /*usage=*/FilePrefetchBufferUsage::kUnknown, 1);
   IOOptions opts;
   {
     Status s = rep->file->PrepareIOOptions(ro, opts);
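The test updates below flip the indices passed to TEST_GetBufferOffsetandSize: with the deque, index 0 is always the front (current) buffer and index 1 the prefetched successor, so the old curr_ / curr_ ^ 1 slot arithmetic disappears. A toy illustration of the ordering the assertions now rely on:

#include <cassert>
#include <cstdint>
#include <deque>

int main() {
  std::deque<uint64_t> buffer_offsets;   // buffers sorted by file offset
  buffer_offsets.push_back(0);           // current buffer being consumed
  buffer_offsets.push_back(8192);        // buffer with readahead in flight
  assert(buffer_offsets[0] == 0);        // "1st Buffer" is now index 0
  assert(buffer_offsets[1] == 8192);     // "2nd Buffer" is now index 1
  return 0;
}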
diff --git a/table/table_test.cc b/table/table_test.cc
index bccc8dbf2dbd..d9a84d87444b 100644
--- a/table/table_test.cc
+++ b/table/table_test.cc
@@ -3500,7 +3500,7 @@ TEST_P(BlockBasedTableTest, BlockCacheLookupAsyncScansSeek) {
     {
       // 1st Buffer Verification.
       // curr buffer - 1.
-      prefetch_buffer->TEST_GetBufferOffsetandSize(1, buffer_offset,
+      prefetch_buffer->TEST_GetBufferOffsetandSize(0, buffer_offset,
                                                    buffer_len);
       bbt->TEST_GetDataBlockHandle(read_options, kv_iter->first, block_handle);
@@ -3508,7 +3508,7 @@ TEST_P(BlockBasedTableTest, BlockCacheLookupAsyncScansSeek) {
       ASSERT_EQ(buffer_len, 8192);
 
       // 2nd Buffer Verification.
-      prefetch_buffer->TEST_GetBufferOffsetandSize(0, buffer_offset,
+      prefetch_buffer->TEST_GetBufferOffsetandSize(1, buffer_offset,
                                                    buffer_len);
       InternalKey ikey_tmp("00000585", 0, kTypeValue);
       bbt->TEST_GetDataBlockHandle(read_options, ikey_tmp.Encode().ToString(),
@@ -3566,7 +3566,7 @@ TEST_P(BlockBasedTableTest, BlockCacheLookupAsyncScansSeek) {
     {
       // 1st Buffer Verification.
       // curr_ - 1.
-      prefetch_buffer->TEST_GetBufferOffsetandSize(1, buffer_offset,
+      prefetch_buffer->TEST_GetBufferOffsetandSize(0, buffer_offset,
                                                    buffer_len);
       ASSERT_EQ(buffer_len, 4096);
       bbt->TEST_GetDataBlockHandle(read_options, kv_iter->first,
@@ -3574,7 +3574,7 @@ TEST_P(BlockBasedTableTest, BlockCacheLookupAsyncScansSeek) {
       ASSERT_EQ(buffer_offset, block_handle.offset());
 
       // 2nd Buffer Verification.
-      prefetch_buffer->TEST_GetBufferOffsetandSize(0, buffer_offset,
+      prefetch_buffer->TEST_GetBufferOffsetandSize(1, buffer_offset,
                                                    buffer_len);
       InternalKey ikey_tmp("00000630", 0, kTypeValue);
       bbt->TEST_GetDataBlockHandle(read_options, ikey_tmp.Encode().ToString(),