Skip to content

Commit

Permalink
Fix stress test failures and addressed comments
Browse files Browse the repository at this point in the history
Summary:

Test Plan:

Reviewers:

Subscribers:

Tasks:

Tags:
  • Loading branch information
akankshamahajan15 committed Jan 5, 2024
1 parent e66dace commit 419fbd5
Show file tree
Hide file tree
Showing 3 changed files with 66 additions and 70 deletions.
131 changes: 63 additions & 68 deletions file/file_prefetch_buffer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,6 @@ void FilePrefetchBuffer::AbortOutdatedIO(uint64_t offset) {
}
buf->ClearBuffer();
}
FreeEmptyBuffers();
}

void FilePrefetchBuffer::AbortAllIOs() {
Expand All @@ -250,15 +249,17 @@ void FilePrefetchBuffer::AbortAllIOs() {
if (buf->io_handle_ != nullptr && buf->del_fn_ != nullptr) {
DestroyAndClearIOHandle(buf);
}
buf->ClearBuffer();
buf->async_read_in_progress_ = false;
}
FreeEmptyBuffers();
}

// Clear the buffers if it contains outdated data. Outdated data can be
// because previous sequential reads were read from the cache instead of these
// buffer.
// Clear the buffers if it contains outdated data wrt offset. Outdated data can
// be because previous sequential reads were read from the cache instead of
// these buffer or there is IOError while filling the buffers.
//
// offset - the offset requested to be read. This API makes sure that the
// front/first buffer in bufs_ should contain this offset, otherwise, all
// buffers will be freed.
void FilePrefetchBuffer::ClearOutdatedData(uint64_t offset, size_t length) {
while (!IsBufferQueueEmpty()) {
BufferInfo* buf = GetFirstBuffer();
Expand All @@ -270,34 +271,47 @@ void FilePrefetchBuffer::ClearOutdatedData(uint64_t offset, size_t length) {
}
}

if (IsBufferQueueEmpty()) {
if (IsBufferQueueEmpty() || NumBuffersAllocated() == 1) {
return;
}

BufferInfo* buf = GetFirstBuffer();

// In case buffers do not align, reset next buffer if requested data needs
// to be read in that buffer.
if (NumBuffersAllocated() > 1) {
if (buf->async_read_in_progress_) {
FreeEmptyBuffers();
return;
}

// Below handles the case for Overlapping buffers (NumBuffersAllocated > 1).
bool abort_io = false;

if (buf->DoesBufferContainData() && buf->IsOffsetInBuffer(offset)) {
BufferInfo* next_buf = bufs_[1];
if (!next_buf->async_read_in_progress_ && !buf->async_read_in_progress_ &&
buf->DoesBufferContainData()) {
if (buf->offset_ + buf->CurrentSize() != next_buf->offset_) {
if (next_buf->DoesBufferContainData() &&
buf->IsOffsetInBuffer(offset) &&
(offset + length > buf->offset_ + buf->CurrentSize())) {
// Clear all buffers after first.
for (size_t i = 1; i < bufs_.size(); ++i) {
bufs_[i]->ClearBuffer();
}
FreeEmptyBuffers();
}
}
if (/* next buffer doesn't align with first buffer and requested data
overlaps with next buffer */
((buf->offset_ + buf->CurrentSize() != next_buf->offset_) &&
(offset + length > buf->offset_ + buf->CurrentSize()))) {
abort_io = true;
}
} else {
// buffer with offset doesn't contain data or offset doesn't lie in this
// buffer.
buf->ClearBuffer();
abort_io = true;
}

if (abort_io) {
AbortAllIOs();
// Clear all buffers after first.
for (size_t i = 1; i < bufs_.size(); ++i) {
bufs_[i]->ClearBuffer();
}
}
FreeEmptyBuffers();
assert(IsBufferQueueEmpty() || buf->IsOffsetInBuffer(offset));
}

void FilePrefetchBuffer::PollIfNeeded(uint64_t /*offset*/, size_t /*length*/) {
void FilePrefetchBuffer::PollIfNeeded(uint64_t offset, size_t length) {
BufferInfo* buf = GetFirstBuffer();

if (buf->async_read_in_progress_ && fs_ != nullptr) {
Expand All @@ -315,6 +329,10 @@ void FilePrefetchBuffer::PollIfNeeded(uint64_t /*offset*/, size_t /*length*/) {
// completed.
DestroyAndClearIOHandle(buf);
}

// Always call outdated data after Poll as Buffers might be out of sync w.r.t
// offset and length.
ClearOutdatedData(offset, length);
}

// ReadAheadSizeTuning API calls readaheadsize_cb_
Expand Down Expand Up @@ -349,6 +367,7 @@ void FilePrefetchBuffer::ReadAheadSizeTuning(

// read_len will be 0 and there is nothing to read/prefetch.
if (updated_start_offset == updated_end_offset) {
start_offset = end_offset = updated_start_offset;
UpdateReadAheadTrimmedStat((initial_end_offset - initial_start_offset),
(updated_end_offset - updated_start_offset));
return;
Expand Down Expand Up @@ -423,6 +442,10 @@ Status FilePrefetchBuffer::HandleOverlappingData(
PollIfNeeded(offset, length);
}

if (IsBufferQueueEmpty() || NumBuffersAllocated() == 1) {
return Status::OK();
}

BufferInfo* next_buf = bufs_[1];

// If data is overlapping over two buffers, copy the data from front and
Expand Down Expand Up @@ -457,9 +480,10 @@ Status FilePrefetchBuffer::HandleOverlappingData(
// If requested bytes - tmp_offset + tmp_length are in next buffer, freed
// buffer can go for further prefetching.
// If requested bytes are not in next buffer, next buffer has to go for sync
// call to get remaining requested bytes. In that it shouldn't go for async
// prefetching as async prefetching calculates offset based on previous
// buffer end offset and previous buffer has to go for sync prefetching.
// call to get remaining requested bytes. In that case it shouldn't go for
// async prefetching as async prefetching calculates offset based on
// previous buffer end offset and previous buffer has to go for sync
// prefetching.

if (tmp_offset + tmp_length <= next_buf->offset_ + second_size) {
AllocateBuffer();
Expand Down Expand Up @@ -519,6 +543,7 @@ Status FilePrefetchBuffer::PrefetchInternal(const IOOptions& opts,
// Abort outdated IO.
if (!explicit_prefetch_submitted_) {
AbortOutdatedIO(offset);
FreeEmptyBuffers();
}
ClearOutdatedData(offset, length);

Expand All @@ -544,15 +569,13 @@ Status FilePrefetchBuffer::PrefetchInternal(const IOOptions& opts,
return s;
}
} else {
// NOTE - After this poll request, first buffer might be empty because of
// IOError in callback while reading or it may contains the required data.
PollIfNeeded(offset, length);
PollIfNeeded(tmp_offset, tmp_length);
}

if (copy_to_overlap_buffer) {
offset = tmp_offset;
length = tmp_length;
}
AllocateBufferIfEmpty();
buf = GetFirstBuffer();
offset = tmp_offset;
length = tmp_length;

// After polling, if all the requested bytes are in first buffer, it will only
// go for async prefetching.
Expand All @@ -568,9 +591,10 @@ Status FilePrefetchBuffer::PrefetchInternal(const IOOptions& opts,

// Length == 0: All the requested data has been copied to overlap buffer
// and it has already gone for async prefetching. It can return without
// doing anything further. Length > 0: More data needs to be consumed so
// it will continue async and sync prefetching and copy the remaining data
// to overlap buffer in the end.
// doing anything further.
// Length > 0: More data needs to be consumed so it will continue async
// and sync prefetching and copy the remaining data to overlap buffer in
// the end.
if (length == 0) {
UpdateStats(/*found_in_buffer=*/true, length);
return s;
Expand Down Expand Up @@ -598,37 +622,6 @@ Status FilePrefetchBuffer::PrefetchInternal(const IOOptions& opts,

assert(!buf->async_read_in_progress_);

// In case because of some IOError first buffer got empty, abort IO for all
// buffers as well. Otherwise data might not align if more data needs to be
// read in first buffer which might overlap with second buffer.
if (!buf->DoesBufferContainData()) {
std::vector<void*> handles;
if (NumBuffersAllocated() > 1) {
for (auto& _buf : bufs_) {
if (_buf->async_read_in_progress_) {
handles.emplace_back(_buf->io_handle_);
}
}
}
if (!handles.empty()) {
StopWatch sw(clock_, stats_, ASYNC_PREFETCH_ABORT_MICROS);
s = fs_->AbortIO(handles);
assert(s.ok());
}

for (auto& _buf : bufs_) {
if (_buf->async_read_in_progress_) {
DestroyAndClearIOHandle(_buf);
_buf->async_read_in_progress_ = false;
_buf->ClearBuffer();
}
}
FreeEmptyBuffers();
}

AllocateBufferIfEmpty();
buf = GetFirstBuffer();

// Go for ReadAsync and Read (if needed).
// offset and size alignment for first buffer with synchronous prefetching
uint64_t start_offset1 = offset, end_offset1 = 0, aligned_useful_len1 = 0;
Expand Down Expand Up @@ -844,6 +837,8 @@ Status FilePrefetchBuffer::PrefetchAsync(const IOOptions& opts,
// Cancel any pending async read to make code simpler as buffers can be out
// of sync.
AbortAllIOs();
// Free empty buffers after aborting IOs.
FreeEmptyBuffers();
ClearOutdatedData(offset, n);

// - Since PrefetchAsync can be called on non sequential reads. So offset can
Expand Down
1 change: 1 addition & 0 deletions file/file_prefetch_buffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ struct ReadaheadParams {
// after doing sequential scans for num_file_reads_for_auto_readahead.
bool implicit_auto_readahead = false;

// TODO akanksha - Remove num_file_reads when BlockPrefetcher is refactored.
uint64_t num_file_reads = 0;
uint64_t num_file_reads_for_auto_readahead = 0;

Expand Down
4 changes: 2 additions & 2 deletions table/block_based/block_based_table_iterator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -713,7 +713,6 @@ void BlockBasedTableIterator::InitializeStartAndEndOffsets(
// It can be due to reading error in second buffer in FilePrefetchBuffer.
// BlockHandles already added to the queue but there was error in fetching
// those data blocks. So in this call they need to be read again.
assert(block_handles_.front().is_cache_hit_ == false);
found_first_miss_block = true;
// Initialize prev_handles_size to 0 as all those handles need to be read
// again.
Expand Down Expand Up @@ -856,7 +855,8 @@ void BlockBasedTableIterator::BlockCacheLookupForReadAheadSize(
auto it_end =
block_handles_.rbegin() + (block_handles_.size() - prev_handles_size);

while (it != it_end && (*it).is_cache_hit_) {
while (it != it_end && (*it).is_cache_hit_ &&
start_updated_offset != (*it).handle_.offset()) {
it++;
}
end_updated_offset = (*it).handle_.offset() + footer + (*it).handle_.size();
Expand Down

0 comments on commit 419fbd5

Please sign in to comment.