Skip to content

Commit

Permalink
Addressed additional comments
Browse files Browse the repository at this point in the history
Summary:

Test Plan:

Reviewers:

Subscribers:

Tasks:

Tags:
  • Loading branch information
akankshamahajan15 committed Dec 19, 2023
1 parent 6c6797c commit 8eee7ec
Show file tree
Hide file tree
Showing 2 changed files with 76 additions and 63 deletions.
115 changes: 64 additions & 51 deletions file/file_prefetch_buffer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,11 @@

namespace ROCKSDB_NAMESPACE {

void FilePrefetchBuffer::CalculateOffsetAndLen(
BufferInfo* buf, size_t alignment, uint64_t offset, size_t roundup_len,
bool refit_tail, uint64_t& chunk_len) {
void FilePrefetchBuffer::PrepareBufferForRead(BufferInfo* buf, size_t alignment,
uint64_t offset,
size_t roundup_len,
bool refit_tail,
uint64_t& aligned_useful_len) {
uint64_t chunk_offset_in_buffer = 0;
bool copy_data_to_new_buffer = false;
// Check if requested bytes are in the existing buffer_.
Expand All @@ -37,13 +39,13 @@ void FilePrefetchBuffer::CalculateOffsetAndLen(
// new buffer is created.
chunk_offset_in_buffer =
Rounddown(static_cast<size_t>(offset - buf->offset_), alignment);
chunk_len =
aligned_useful_len =
static_cast<uint64_t>(buf->CurrentSize()) - chunk_offset_in_buffer;
assert(chunk_offset_in_buffer % alignment == 0);
assert(chunk_len % alignment == 0);
assert(chunk_offset_in_buffer + chunk_len <=
assert(aligned_useful_len % alignment == 0);
assert(chunk_offset_in_buffer + aligned_useful_len <=
buf->offset_ + buf->CurrentSize());
if (chunk_len > 0) {
if (aligned_useful_len > 0) {
copy_data_to_new_buffer = true;
} else {
// this reset is not necessary, but just to be safe.
Expand All @@ -52,38 +54,40 @@ void FilePrefetchBuffer::CalculateOffsetAndLen(
}

// Create a new buffer only if current capacity is not sufficient, and memcopy
// bytes from old buffer if needed (i.e., if chunk_len is greater than 0).
// bytes from old buffer if needed (i.e., if aligned_useful_len is greater
// than 0).
if (buf->buffer_.Capacity() < roundup_len) {
buf->buffer_.Alignment(alignment);
buf->buffer_.AllocateNewBuffer(
static_cast<size_t>(roundup_len), copy_data_to_new_buffer,
chunk_offset_in_buffer, static_cast<size_t>(chunk_len));
} else if (chunk_len > 0 && refit_tail) {
chunk_offset_in_buffer, static_cast<size_t>(aligned_useful_len));
} else if (aligned_useful_len > 0 && refit_tail) {
// New buffer not needed. But memmove bytes from tail to the beginning since
// chunk_len is greater than 0.
// aligned_useful_len is greater than 0.
buf->buffer_.RefitTail(static_cast<size_t>(chunk_offset_in_buffer),
static_cast<size_t>(chunk_len));
} else if (chunk_len > 0) {
// For async prefetching, it doesn't call RefitTail with chunk_len > 0.
// Allocate new buffer if needed because aligned buffer calculate remaining
// buffer as capacity - cursize which might not be the case in this as it's
// not refitting.
static_cast<size_t>(aligned_useful_len));
} else if (aligned_useful_len > 0) {
// For async prefetching, RefitTail is not called when aligned_useful_len
// > 0. Allocate a new buffer if needed, because the aligned buffer
// calculates the remaining buffer as capacity - cursize, which might not
// hold here since it's not refitting.
// TODO: Use refit_tail for async prefetching too.
buf->buffer_.Alignment(alignment);
buf->buffer_.AllocateNewBuffer(
static_cast<size_t>(roundup_len), copy_data_to_new_buffer,
chunk_offset_in_buffer, static_cast<size_t>(chunk_len));
chunk_offset_in_buffer, static_cast<size_t>(aligned_useful_len));
}
}

Status FilePrefetchBuffer::Read(BufferInfo* buf, const IOOptions& opts,
RandomAccessFileReader* reader,
uint64_t read_len, uint64_t chunk_len,
uint64_t read_len, uint64_t aligned_useful_len,
uint64_t start_offset) {
Slice result;
Status s = reader->Read(opts, start_offset + chunk_len, read_len, &result,
buf->buffer_.BufferStart() + chunk_len,
/*aligned_buf=*/nullptr);
Status s =
reader->Read(opts, start_offset + aligned_useful_len, read_len, &result,
buf->buffer_.BufferStart() + aligned_useful_len,
/*aligned_buf=*/nullptr);
#ifndef NDEBUG
if (result.size() < read_len) {
// Fake an IO error to force db_stress fault injection to ignore
Expand All @@ -99,7 +103,7 @@ Status FilePrefetchBuffer::Read(BufferInfo* buf, const IOOptions& opts,
RecordTick(stats_, PREFETCH_BYTES, read_len);
}
// Update the buffer size.
buf->buffer_.Size(static_cast<size_t>(chunk_len) + result.size());
buf->buffer_.Size(static_cast<size_t>(aligned_useful_len) + result.size());
return s;
}

Expand All @@ -118,7 +122,7 @@ Status FilePrefetchBuffer::ReadAsync(BufferInfo* buf, const IOOptions& opts,
req.scratch = buf->buffer_.BufferStart();
buf->async_req_len_ = req.len;

Status s = reader->ReadAsync(req, opts, fp, buf->pos_, &(buf->io_handle_),
Status s = reader->ReadAsync(req, opts, fp, buf, &(buf->io_handle_),
&(buf->del_fn_), /*aligned_buf =*/nullptr);
req.status.PermitUncheckedError();
if (s.ok()) {
Expand Down Expand Up @@ -148,16 +152,17 @@ Status FilePrefetchBuffer::Prefetch(const IOOptions& opts,
}

size_t alignment = reader->file()->GetRequiredBufferAlignment();
uint64_t rounddown_offset = offset, roundup_end = 0, chunk_len = 0;
uint64_t rounddown_offset = offset, roundup_end = 0, aligned_useful_len = 0;
size_t read_len = 0;

ReadAheadSizeTuning(buf, /*read_curr_block=*/true,
/*refit_tail=*/true, rounddown_offset, alignment, 0, n,
rounddown_offset, roundup_end, read_len, chunk_len);
rounddown_offset, roundup_end, read_len,
aligned_useful_len);

Status s;
if (read_len > 0) {
s = Read(buf, opts, reader, read_len, chunk_len, rounddown_offset);
s = Read(buf, opts, reader, read_len, aligned_useful_len, rounddown_offset);
}

if (usage_ == FilePrefetchBufferUsage::kTableOpenPrefetchTail && s.ok()) {
Expand Down Expand Up @@ -245,6 +250,7 @@ void FilePrefetchBuffer::AbortAllIOs() {
if (buf->io_handle_ != nullptr && buf->del_fn_ != nullptr) {
DestroyAndClearIOHandle(buf);
}
buf->ClearBuffer();
buf->async_read_in_progress_ = false;
}
FreeEmptyBuffers();
Expand Down Expand Up @@ -328,7 +334,7 @@ void FilePrefetchBuffer::ReadAheadSizeTuning(
BufferInfo* buf, bool read_curr_block, bool refit_tail,
uint64_t prev_buf_end_offset, size_t alignment, size_t length,
size_t readahead_size, uint64_t& start_offset, uint64_t& end_offset,
size_t& read_len, uint64_t& chunk_len) {
size_t& read_len, uint64_t& aligned_useful_len) {
uint64_t updated_start_offset = Rounddown(start_offset, alignment);
uint64_t updated_end_offset =
Roundup(start_offset + length + readahead_size, alignment);
Expand Down Expand Up @@ -376,16 +382,16 @@ void FilePrefetchBuffer::ReadAheadSizeTuning(

uint64_t roundup_len = end_offset - start_offset;

CalculateOffsetAndLen(buf, alignment, start_offset, roundup_len, refit_tail,
chunk_len);
assert(roundup_len >= chunk_len);
PrepareBufferForRead(buf, alignment, start_offset, roundup_len, refit_tail,
aligned_useful_len);
assert(roundup_len >= aligned_useful_len);

// Update the buffer offset.
buf->offset_ = start_offset;
// Update the initial end offset of this buffer which will be the starting
// offset of next prefetch.
buf->initial_end_offset_ = initial_end_offset;
read_len = static_cast<size_t>(roundup_len - chunk_len);
read_len = static_cast<size_t>(roundup_len - aligned_useful_len);

UpdateReadAheadTrimmedStat((initial_end_offset - initial_start_offset),
(end_offset - start_offset));
Expand Down Expand Up @@ -447,20 +453,25 @@ Status FilePrefetchBuffer::HandleOverlappingData(
? next_buf->async_req_len_
: next_buf->CurrentSize();
uint64_t start_offset = next_buf->initial_end_offset_;
// Second buffer might be out of bound if first buffer already prefetched
// that data.

// If the requested bytes (tmp_offset + tmp_length) are in the next buffer,
// the freed buffer can be used for further prefetching.
// If the requested bytes are not in the next buffer, the next buffer has to
// make a sync call to get the remaining requested bytes. In that case, it
// shouldn't do async prefetching, because async prefetching calculates its
// offset based on the previous buffer's end offset and the previous buffer
// has to do sync prefetching.

if (tmp_offset + tmp_length <= next_buf->offset_ + second_size) {
AllocateBuffer();
BufferInfo* new_buf = GetLastBuffer();
size_t read_len = 0;
uint64_t end_offset = start_offset, chunk_len = 0;
uint64_t end_offset = start_offset, aligned_useful_len = 0;

ReadAheadSizeTuning(new_buf, /*read_curr_block=*/false,
/*refit_tail=*/false, next_buf->offset_ + second_size,
alignment,
/*length=*/0, readahead_size, start_offset,
end_offset, read_len, chunk_len);
end_offset, read_len, aligned_useful_len);
if (read_len > 0) {
s = ReadAsync(new_buf, opts, reader, read_len, start_offset);
if (!s.ok()) {
Expand Down Expand Up @@ -620,15 +631,16 @@ Status FilePrefetchBuffer::PrefetchInternal(const IOOptions& opts,

// Go for ReadAsync and Read (if needed).
// offset and size alignment for first buffer with synchronous prefetching
uint64_t start_offset1 = offset, end_offset1 = 0, chunk_len1 = 0;
uint64_t start_offset1 = offset, end_offset1 = 0, aligned_useful_len1 = 0;
size_t read_len1 = 0;

// For length == 0, skip the synchronous prefetching. read_len1 will be 0.
if (length > 0) {
ReadAheadSizeTuning(buf, /*read_curr_block=*/true, /*refit_tail*/
true, start_offset1, alignment, length, readahead_size,
start_offset1, end_offset1, read_len1, chunk_len1);
UpdateStats(/*found_in_buffer=*/false, chunk_len1);
start_offset1, end_offset1, read_len1,
aligned_useful_len1);
UpdateStats(/*found_in_buffer=*/false, aligned_useful_len1);
} else {
UpdateStats(/*found_in_buffer=*/true, original_length);
}
Expand All @@ -643,7 +655,7 @@ Status FilePrefetchBuffer::PrefetchInternal(const IOOptions& opts,
}

if (read_len1 > 0) {
s = Read(buf, opts, reader, read_len1, chunk_len1, start_offset1);
s = Read(buf, opts, reader, read_len1, aligned_useful_len1, start_offset1);
if (!s.ok()) {
AbortAllIOs();
FreeAllBuffers();
Expand Down Expand Up @@ -882,7 +894,7 @@ Status FilePrefetchBuffer::PrefetchAsync(const IOOptions& opts,
size_t alignment = reader->file()->GetRequiredBufferAlignment();
size_t readahead_size = is_eligible_for_prefetching ? readahead_size_ / 2 : 0;
size_t offset_to_read = static_cast<size_t>(offset);
uint64_t start_offset1 = offset, end_offset1 = 0, chunk_len1 = 0;
uint64_t start_offset1 = offset, end_offset1 = 0, aligned_useful_len1 = 0;
size_t read_len1 = 0;

AllocateBufferIfEmpty();
Expand All @@ -904,16 +916,16 @@ Status FilePrefetchBuffer::PrefetchAsync(const IOOptions& opts,
ReadAheadSizeTuning(buf, /*read_curr_block=*/true, /*refit_tail=*/false,
/*prev_buf_end_offset=*/start_offset1, alignment, n,
readahead_size, start_offset1, end_offset1, read_len1,
chunk_len1);
aligned_useful_len1);
} else {
// No alignment or extra prefetching.
start_offset1 = offset_to_read;
end_offset1 = offset_to_read + n;
roundup_len1 = end_offset1 - start_offset1;
CalculateOffsetAndLen(buf, alignment, start_offset1, roundup_len1, false,
chunk_len1);
assert(chunk_len1 == 0);
assert(roundup_len1 >= chunk_len1);
PrepareBufferForRead(buf, alignment, start_offset1, roundup_len1, false,
aligned_useful_len1);
assert(aligned_useful_len1 == 0);
assert(roundup_len1 >= aligned_useful_len1);
read_len1 = static_cast<size_t>(roundup_len1);
buf->offset_ = start_offset1;
}
Expand Down Expand Up @@ -954,12 +966,13 @@ Status FilePrefetchBuffer::PrefetchRemBuffers(const IOOptions& opts,
AllocateBuffer();
BufferInfo* new_buf = GetLastBuffer();

uint64_t end_offset2 = start_offset2, chunk_len2 = 0;
uint64_t end_offset2 = start_offset2, aligned_useful_len2 = 0;
size_t read_len2 = 0;
ReadAheadSizeTuning(
new_buf, /*read_curr_block=*/false, /*refit_tail=*/false,
/*prev_buf_end_offset=*/end_offset1, alignment, /*length=*/0,
readahead_size, start_offset2, end_offset2, read_len2, chunk_len2);
ReadAheadSizeTuning(new_buf, /*read_curr_block=*/false,
/*refit_tail=*/false,
/*prev_buf_end_offset=*/end_offset1, alignment,
/*length=*/0, readahead_size, start_offset2,
end_offset2, read_len2, aligned_useful_len2);

if (read_len2 > 0) {
TEST_SYNC_POINT("FilePrefetchBuffer::PrefetchAsync:ExtraPrefetching");
Expand Down
24 changes: 12 additions & 12 deletions file/file_prefetch_buffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -82,10 +82,6 @@ struct BufferInfo {

IOHandleDeleter del_fn_ = nullptr;

// pos represents the address of this buffer in queue of BufferInfo. It's
// required during async callback to know which buffer to filled.
BufferInfo* pos_;

// initial_end_offset is used to keep track of the end offset of the buffer
// that was originally called. It's helpful in case of autotuning of readahead
// size when callback is made to BlockBasedTableIterator.
Expand All @@ -99,15 +95,20 @@ struct BufferInfo {
uint64_t initial_end_offset_ = 0;

bool IsDataBlockInBuffer(uint64_t offset, size_t length) {
assert(async_read_in_progress_ == false);
return (offset >= offset_ &&
offset + length <= offset_ + buffer_.CurrentSize());
}

bool IsOffsetInBuffer(uint64_t offset) {
assert(async_read_in_progress_ == false);
return (offset >= offset_ && offset < offset_ + buffer_.CurrentSize());
}

bool DoesBufferContainData() { return buffer_.CurrentSize() > 0; }
bool DoesBufferContainData() {
assert(async_read_in_progress_ == false);
return buffer_.CurrentSize() > 0;
}

bool IsBufferOutdated(uint64_t offset) {
return (!async_read_in_progress_ && DoesBufferContainData() &&
Expand Down Expand Up @@ -210,7 +211,6 @@ class FilePrefetchBuffer {
free_bufs_.resize(num_buffers_);
for (uint32_t i = 0; i < num_buffers_; i++) {
free_bufs_[i] = new BufferInfo();
free_bufs_[i]->pos_ = free_bufs_[i];
}
}

Expand All @@ -232,9 +232,9 @@ class FilePrefetchBuffer {
for (auto& buf : bufs_) {
if (buf->io_handle_ != nullptr) {
DestroyAndClearIOHandle(buf);
buf->async_read_in_progress_ = false;
buf->ClearBuffer();
}
buf->async_read_in_progress_ = false;
}
}

Expand Down Expand Up @@ -384,9 +384,9 @@ class FilePrefetchBuffer {
// Calculates roundoff offset and length to be prefetched based on alignment
// and data present in buffer_. It also allocates new buffer or refit tail if
// required.
void CalculateOffsetAndLen(BufferInfo* buf, size_t alignment, uint64_t offset,
size_t roundup_len, bool refit_tail,
uint64_t& chunk_len);
void PrepareBufferForRead(BufferInfo* buf, size_t alignment, uint64_t offset,
size_t roundup_len, bool refit_tail,
uint64_t& aligned_useful_len);

void AbortOutdatedIO(uint64_t offset);

Expand All @@ -403,7 +403,7 @@ class FilePrefetchBuffer {

Status Read(BufferInfo* buf, const IOOptions& opts,
RandomAccessFileReader* reader, uint64_t read_len,
uint64_t chunk_len, uint64_t start_offset);
uint64_t aligned_useful_len, uint64_t start_offset);

Status ReadAsync(BufferInfo* buf, const IOOptions& opts,
RandomAccessFileReader* reader, uint64_t read_len,
Expand Down Expand Up @@ -483,7 +483,7 @@ class FilePrefetchBuffer {
size_t alignment, size_t length,
size_t readahead_size, uint64_t& offset,
uint64_t& end_offset, size_t& read_len,
uint64_t& chunk_len);
uint64_t& aligned_useful_len);

void UpdateStats(bool found_in_buffer, size_t length_found) {
if (found_in_buffer) {
Expand Down

0 comments on commit 8eee7ec

Please sign in to comment.