From 419fbd59e60dcd5079b828ba82e75decdc03668b Mon Sep 17 00:00:00 2001 From: akankshamahajan Date: Thu, 4 Jan 2024 17:27:20 -0800 Subject: [PATCH] Fix stress test failures and address review comments Summary: Test Plan: Reviewers: Subscribers: Tasks: Tags: --- file/file_prefetch_buffer.cc | 131 +++++++++--------- file/file_prefetch_buffer.h | 1 + .../block_based/block_based_table_iterator.cc | 4 +- 3 files changed, 66 insertions(+), 70 deletions(-) diff --git a/file/file_prefetch_buffer.cc b/file/file_prefetch_buffer.cc index 85f61f158ca5..11deed49f751 100644 --- a/file/file_prefetch_buffer.cc +++ b/file/file_prefetch_buffer.cc @@ -230,7 +230,6 @@ void FilePrefetchBuffer::AbortOutdatedIO(uint64_t offset) { } buf->ClearBuffer(); } - FreeEmptyBuffers(); } void FilePrefetchBuffer::AbortAllIOs() { @@ -250,15 +249,17 @@ void FilePrefetchBuffer::AbortAllIOs() { if (buf->io_handle_ != nullptr && buf->del_fn_ != nullptr) { DestroyAndClearIOHandle(buf); } - buf->ClearBuffer(); buf->async_read_in_progress_ = false; } - FreeEmptyBuffers(); } -// Clear the buffers if it contains outdated data. Outdated data can be -// because previous sequential reads were read from the cache instead of these -// buffer. +// Clear the buffers if they contain outdated data w.r.t. offset. Data can be +// outdated because previous sequential reads were served from the cache +// instead of these buffers, or because of an IOError while filling the buffers. +// +// offset - the offset requested to be read. This API makes sure that the +// front/first buffer in bufs_ contains this offset; otherwise, all buffers +// will be freed. 
void FilePrefetchBuffer::ClearOutdatedData(uint64_t offset, size_t length) { while (!IsBufferQueueEmpty()) { BufferInfo* buf = GetFirstBuffer(); @@ -270,34 +271,47 @@ void FilePrefetchBuffer::ClearOutdatedData(uint64_t offset, size_t length) { } } - if (IsBufferQueueEmpty()) { + if (IsBufferQueueEmpty() || NumBuffersAllocated() == 1) { return; } BufferInfo* buf = GetFirstBuffer(); - // In case buffers do not align, reset next buffer if requested data needs - // to be read in that buffer. - if (NumBuffersAllocated() > 1) { + if (buf->async_read_in_progress_) { + FreeEmptyBuffers(); + return; + } + + // Below handles the case for Overlapping buffers (NumBuffersAllocated > 1). + bool abort_io = false; + + if (buf->DoesBufferContainData() && buf->IsOffsetInBuffer(offset)) { BufferInfo* next_buf = bufs_[1]; - if (!next_buf->async_read_in_progress_ && !buf->async_read_in_progress_ && - buf->DoesBufferContainData()) { - if (buf->offset_ + buf->CurrentSize() != next_buf->offset_) { - if (next_buf->DoesBufferContainData() && - buf->IsOffsetInBuffer(offset) && - (offset + length > buf->offset_ + buf->CurrentSize())) { - // Clear all buffers after first. - for (size_t i = 1; i < bufs_.size(); ++i) { - bufs_[i]->ClearBuffer(); - } - FreeEmptyBuffers(); - } - } + if (/* next buffer doesn't align with first buffer and requested data + overlaps with next buffer */ + ((buf->offset_ + buf->CurrentSize() != next_buf->offset_) && + (offset + length > buf->offset_ + buf->CurrentSize()))) { + abort_io = true; + } + } else { + // buffer with offset doesn't contain data or offset doesn't lie in this + // buffer. + buf->ClearBuffer(); + abort_io = true; + } + + if (abort_io) { + AbortAllIOs(); + // Clear all buffers after first. 
+ for (size_t i = 1; i < bufs_.size(); ++i) { + bufs_[i]->ClearBuffer(); } } + FreeEmptyBuffers(); + assert(IsBufferQueueEmpty() || buf->IsOffsetInBuffer(offset)); } -void FilePrefetchBuffer::PollIfNeeded(uint64_t /*offset*/, size_t /*length*/) { +void FilePrefetchBuffer::PollIfNeeded(uint64_t offset, size_t length) { BufferInfo* buf = GetFirstBuffer(); if (buf->async_read_in_progress_ && fs_ != nullptr) { @@ -315,6 +329,10 @@ void FilePrefetchBuffer::PollIfNeeded(uint64_t /*offset*/, size_t /*length*/) { // completed. DestroyAndClearIOHandle(buf); } + + // Always call ClearOutdatedData after Poll as buffers might be out of sync + // w.r.t. offset and length. + ClearOutdatedData(offset, length); } // ReadAheadSizeTuning API calls readaheadsize_cb_ @@ -349,6 +367,7 @@ void FilePrefetchBuffer::ReadAheadSizeTuning( // read_len will be 0 and there is nothing to read/prefetch. if (updated_start_offset == updated_end_offset) { + start_offset = end_offset = updated_start_offset; UpdateReadAheadTrimmedStat((initial_end_offset - initial_start_offset), (updated_end_offset - updated_start_offset)); return; @@ -423,6 +442,10 @@ Status FilePrefetchBuffer::HandleOverlappingData( PollIfNeeded(offset, length); } + if (IsBufferQueueEmpty() || NumBuffersAllocated() == 1) { + return Status::OK(); + } + BufferInfo* next_buf = bufs_[1]; // If data is overlapping over two buffers, copy the data from front and @@ -457,9 +480,10 @@ Status FilePrefetchBuffer::HandleOverlappingData( // If requested bytes - tmp_offset + tmp_length are in next buffer, freed // buffer can go for further prefetching. // If requested bytes are not in next buffer, next buffer has to go for sync - // call to get remaining requested bytes. In that it shouldn't go for async - // prefetching as async prefetching calculates offset based on previous - // buffer end offset and previous buffer has to go for sync prefetching. + // call to get remaining requested bytes. 
In that case it shouldn't go for + // async prefetching as async prefetching calculates offset based on + // previous buffer end offset and previous buffer has to go for sync + // prefetching. if (tmp_offset + tmp_length <= next_buf->offset_ + second_size) { AllocateBuffer(); @@ -519,6 +543,7 @@ Status FilePrefetchBuffer::PrefetchInternal(const IOOptions& opts, // Abort outdated IO. if (!explicit_prefetch_submitted_) { AbortOutdatedIO(offset); + FreeEmptyBuffers(); } ClearOutdatedData(offset, length); @@ -544,15 +569,13 @@ Status FilePrefetchBuffer::PrefetchInternal(const IOOptions& opts, return s; } } else { - // NOTE - After this poll request, first buffer might be empty because of - // IOError in callback while reading or it may contains the required data. - PollIfNeeded(offset, length); + PollIfNeeded(tmp_offset, tmp_length); } - if (copy_to_overlap_buffer) { - offset = tmp_offset; - length = tmp_length; - } + AllocateBufferIfEmpty(); + buf = GetFirstBuffer(); + offset = tmp_offset; + length = tmp_length; // After polling, if all the requested bytes are in first buffer, it will only // go for async prefetching. @@ -568,9 +591,10 @@ Status FilePrefetchBuffer::PrefetchInternal(const IOOptions& opts, // Length == 0: All the requested data has been copied to overlap buffer // and it has already gone for async prefetching. It can return without - // doing anything further. Length > 0: More data needs to be consumed so - // it will continue async and sync prefetching and copy the remaining data - // to overlap buffer in the end. + // doing anything further. + // Length > 0: More data needs to be consumed so it will continue async + // and sync prefetching and copy the remaining data to overlap buffer in + // the end. 
if (length == 0) { UpdateStats(/*found_in_buffer=*/true, length); return s; @@ -598,37 +622,6 @@ Status FilePrefetchBuffer::PrefetchInternal(const IOOptions& opts, assert(!buf->async_read_in_progress_); - // In case because of some IOError first buffer got empty, abort IO for all - // buffers as well. Otherwise data might not align if more data needs to be - // read in first buffer which might overlap with second buffer. - if (!buf->DoesBufferContainData()) { - std::vector handles; - if (NumBuffersAllocated() > 1) { - for (auto& _buf : bufs_) { - if (_buf->async_read_in_progress_) { - handles.emplace_back(_buf->io_handle_); - } - } - } - if (!handles.empty()) { - StopWatch sw(clock_, stats_, ASYNC_PREFETCH_ABORT_MICROS); - s = fs_->AbortIO(handles); - assert(s.ok()); - } - - for (auto& _buf : bufs_) { - if (_buf->async_read_in_progress_) { - DestroyAndClearIOHandle(_buf); - _buf->async_read_in_progress_ = false; - _buf->ClearBuffer(); - } - } - FreeEmptyBuffers(); - } - - AllocateBufferIfEmpty(); - buf = GetFirstBuffer(); - // Go for ReadAsync and Read (if needed). // offset and size alignment for first buffer with synchronous prefetching uint64_t start_offset1 = offset, end_offset1 = 0, aligned_useful_len1 = 0; @@ -844,6 +837,8 @@ Status FilePrefetchBuffer::PrefetchAsync(const IOOptions& opts, // Cancel any pending async read to make code simpler as buffers can be out // of sync. AbortAllIOs(); + // Free empty buffers after aborting IOs. + FreeEmptyBuffers(); ClearOutdatedData(offset, n); // - Since PrefetchAsync can be called on non sequential reads. So offset can diff --git a/file/file_prefetch_buffer.h b/file/file_prefetch_buffer.h index 4c92849c7966..0c3057b3642e 100644 --- a/file/file_prefetch_buffer.h +++ b/file/file_prefetch_buffer.h @@ -49,6 +49,7 @@ struct ReadaheadParams { // after doing sequential scans for num_file_reads_for_auto_readahead. 
bool implicit_auto_readahead = false; + // TODO akanksha - Remove num_file_reads when BlockPrefetcher is refactored. uint64_t num_file_reads = 0; uint64_t num_file_reads_for_auto_readahead = 0; diff --git a/table/block_based/block_based_table_iterator.cc b/table/block_based/block_based_table_iterator.cc index 2968196abb7c..6a81acbeb6dc 100644 --- a/table/block_based/block_based_table_iterator.cc +++ b/table/block_based/block_based_table_iterator.cc @@ -713,7 +713,6 @@ void BlockBasedTableIterator::InitializeStartAndEndOffsets( // It can be due to reading error in second buffer in FilePrefetchBuffer. // BlockHandles already added to the queue but there was error in fetching // those data blocks. So in this call they need to be read again. - assert(block_handles_.front().is_cache_hit_ == false); found_first_miss_block = true; // Initialize prev_handles_size to 0 as all those handles need to be read // again. @@ -856,7 +855,8 @@ void BlockBasedTableIterator::BlockCacheLookupForReadAheadSize( auto it_end = block_handles_.rbegin() + (block_handles_.size() - prev_handles_size); - while (it != it_end && (*it).is_cache_hit_) { + while (it != it_end && (*it).is_cache_hit_ && + start_updated_offset != (*it).handle_.offset()) { it++; } end_updated_offset = (*it).handle_.offset() + footer + (*it).handle_.size();