Skip to content

Commit

Permalink
refactor to passing blockenv
Browse files Browse the repository at this point in the history
  • Loading branch information
EdwardX29 committed Jan 15, 2025
1 parent 3748221 commit ebcb6e0
Show file tree
Hide file tree
Showing 29 changed files with 146 additions and 185 deletions.
4 changes: 2 additions & 2 deletions cockroachkvs/cockroachkvs_bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ func benchmarkRandSeekInSST(
rp := sstable.MakeTrivialReaderProvider(reader)
iter, err := reader.NewPointIter(
ctx, sstable.NoTransforms, nil, nil, nil, sstable.NeverUseFilterBlock,
&stats, nil, rp)
block.ReadEnv{Stats: &stats, IterStats: nil}, rp)
require.NoError(b, err)
n := 0
for kv := iter.First(); kv != nil; kv = iter.Next() {
Expand All @@ -133,7 +133,7 @@ func benchmarkRandSeekInSST(
key := queryKeys[i%numQueryKeys]
iter, err := reader.NewPointIter(
ctx, sstable.NoTransforms, nil, nil, nil, sstable.NeverUseFilterBlock,
&stats, nil, rp)
block.ReadEnv{Stats: &stats, IterStats: nil}, rp)
if err != nil {
b.Fatal(err)
}
Expand Down
7 changes: 4 additions & 3 deletions external_iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/cockroachdb/pebble/internal/keyspan"
"github.com/cockroachdb/pebble/internal/manifest"
"github.com/cockroachdb/pebble/sstable"
"github.com/cockroachdb/pebble/sstable/block"
)

// NewExternalIter takes an input 2d array of sstable files which may overlap
Expand Down Expand Up @@ -164,14 +165,14 @@ func createExternalPointIter(ctx context.Context, it *Iterator) (topLevelIterato
pointIter, err = r.NewPointIter(
ctx, transforms, it.opts.LowerBound, it.opts.UpperBound, nil, /* BlockPropertiesFilterer */
sstable.NeverUseFilterBlock,
&it.stats.InternalStats, nil,
block.ReadEnv{Stats: &it.stats.InternalStats, IterStats: nil},
sstable.MakeTrivialReaderProvider(r))
if err != nil {
return nil, err
}
rangeDelIter, err = r.NewRawRangeDelIter(ctx, sstable.FragmentIterTransforms{
SyntheticSeqNum: sstable.SyntheticSeqNum(seqNum),
})
}, block.ReadEnv{Stats: &it.stats.InternalStats, IterStats: nil})
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -220,7 +221,7 @@ func finishInitializingExternal(ctx context.Context, it *Iterator) error {
for _, r := range readers {
transforms := sstable.FragmentIterTransforms{SyntheticSeqNum: sstable.SyntheticSeqNum(seqNum)}
seqNum--
if rki, err := r.NewRawRangeKeyIter(ctx, transforms); err != nil {
if rki, err := r.NewRawRangeKeyIter(ctx, transforms, block.ReadEnv{Stats: &it.stats.InternalStats, IterStats: nil}); err != nil {
return err
} else if rki != nil {
rangeKeyIters = append(rangeKeyIters, rki)
Expand Down
29 changes: 22 additions & 7 deletions file_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -454,10 +454,10 @@ func (c *fileCacheShard) newIters(
var iters iterSet
var err error
if kinds.RangeKey() && file.HasRangeKeys {
iters.rangeKey, err = newRangeKeyIter(ctx, r, file, cr, opts.SpanIterOptions())
iters.rangeKey, err = newRangeKeyIter(ctx, r, file, cr, opts.SpanIterOptions(), internalOpts)
}
if kinds.RangeDeletion() && file.HasPointKeys && err == nil {
iters.rangeDeletion, err = newRangeDelIter(ctx, file, cr, dbOpts)
iters.rangeDeletion, err = newRangeDelIter(ctx, file, cr, dbOpts, internalOpts)
}
if kinds.Point() && err == nil {
iters.point, err = c.newPointIter(ctx, v, file, cr, opts, internalOpts, dbOpts)
Expand Down Expand Up @@ -574,11 +574,11 @@ func (c *fileCacheShard) newPointIter(
uint64(uintptr(unsafe.Pointer(r))), opts.Category)
}
if internalOpts.compaction {
iter, err = cr.NewCompactionIter(transforms, iterStatsAccum, &v.readerProvider, internalOpts.bufferPool)
iter, err = cr.NewCompactionIter(transforms, block.ReadEnv{IterStats: iterStatsAccum, BufferPool: internalOpts.bufferPool}, &v.readerProvider)
} else {
iter, err = cr.NewPointIter(
ctx, transforms, opts.GetLowerBound(), opts.GetUpperBound(), filterer, filterBlockSizeLimit,
internalOpts.stats, iterStatsAccum, &v.readerProvider)
block.ReadEnv{Stats: internalOpts.stats, IterStats: iterStatsAccum}, &v.readerProvider)
}
if err != nil {
return nil, err
Expand All @@ -600,11 +600,20 @@ func (c *fileCacheShard) newPointIter(
// sstable's range deletions. This function is for table-cache internal use
// only, and callers should use newIters instead.
func newRangeDelIter(
ctx context.Context, file *manifest.FileMetadata, cr sstable.CommonReader, dbOpts *fileCacheOpts,
ctx context.Context,
file *manifest.FileMetadata,
cr sstable.CommonReader,
dbOpts *fileCacheOpts,
internalOpts internalIterOpts,
) (keyspan.FragmentIterator, error) {
// NB: range-del iterator does not maintain a reference to the table, nor
// does it need to read from it after creation.
rangeDelIter, err := cr.NewRawRangeDelIter(ctx, file.FragmentIterTransforms())
readBlockEnv := block.ReadEnv{
Stats: internalOpts.stats,
IterStats: internalOpts.iterStatsAccumulator,
BufferPool: nil,
}
rangeDelIter, err := cr.NewRawRangeDelIter(ctx, file.FragmentIterTransforms(), readBlockEnv)
if err != nil {
return nil, err
}
Expand All @@ -630,6 +639,7 @@ func newRangeKeyIter(
file *fileMetadata,
cr sstable.CommonReader,
opts keyspan.SpanIterOptions,
internalOpts internalIterOpts,
) (keyspan.FragmentIterator, error) {
transforms := file.FragmentIterTransforms()
// Don't filter a table's range keys if the file contains RANGEKEYDELs.
Expand All @@ -646,7 +656,12 @@ func newRangeKeyIter(
}
}
// TODO(radu): wrap in an AssertBounds.
return cr.NewRawRangeKeyIter(ctx, transforms)
readBlockEnv := block.ReadEnv{
Stats: internalOpts.stats,
IterStats: internalOpts.iterStatsAccumulator,
BufferPool: nil,
}
return cr.NewRawRangeKeyIter(ctx, transforms, readBlockEnv)
}

// tableCacheShardReaderProvider implements sstable.ReaderProvider for a
Expand Down
5 changes: 3 additions & 2 deletions ingest.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/cockroachdb/pebble/objstorage"
"github.com/cockroachdb/pebble/objstorage/remote"
"github.com/cockroachdb/pebble/sstable"
"github.com/cockroachdb/pebble/sstable/block"
)

func sstableKeyCompare(userCmp Compare, a, b InternalKey) int {
Expand Down Expand Up @@ -378,7 +379,7 @@ func ingestLoad1(
}
}

iter, err := r.NewRawRangeDelIter(ctx, sstable.NoFragmentTransforms)
iter, err := r.NewRawRangeDelIter(ctx, sstable.NoFragmentTransforms, block.NoReadEnv)
if err != nil {
return nil, keyspan.Span{}, err
}
Expand Down Expand Up @@ -408,7 +409,7 @@ func ingestLoad1(

// Update the range-key bounds for the table.
{
iter, err := r.NewRawRangeKeyIter(ctx, sstable.NoFragmentTransforms)
iter, err := r.NewRawRangeKeyIter(ctx, sstable.NoFragmentTransforms, block.NoReadEnv)
if err != nil {
return nil, keyspan.Span{}, err
}
Expand Down
5 changes: 3 additions & 2 deletions level_checker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/cockroachdb/pebble/internal/testkeys"
"github.com/cockroachdb/pebble/objstorage/objstorageprovider"
"github.com/cockroachdb/pebble/sstable"
"github.com/cockroachdb/pebble/sstable/block"
"github.com/cockroachdb/pebble/sstable/colblk"
"github.com/cockroachdb/pebble/vfs"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -103,9 +104,9 @@ func TestCheckLevelsCornerCases(t *testing.T) {

var fileNum FileNum
newIters :=
func(_ context.Context, file *manifest.FileMetadata, _ *IterOptions, _ internalIterOpts, _ iterKinds) (iterSet, error) {
func(_ context.Context, file *manifest.FileMetadata, _ *IterOptions, iio internalIterOpts, _ iterKinds) (iterSet, error) {
r := readers[file.FileNum]
rangeDelIter, err := r.NewRawRangeDelIter(context.Background(), sstable.NoFragmentTransforms)
rangeDelIter, err := r.NewRawRangeDelIter(context.Background(), sstable.NoFragmentTransforms, block.ReadEnv{Stats: iio.stats})
if err != nil {
return iterSet{}, err
}
Expand Down
6 changes: 3 additions & 3 deletions level_iter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/cockroachdb/pebble/internal/testkeys"
"github.com/cockroachdb/pebble/objstorage/objstorageprovider"
"github.com/cockroachdb/pebble/sstable"
"github.com/cockroachdb/pebble/sstable/block"
"github.com/cockroachdb/pebble/vfs"
"github.com/stretchr/testify/require"
)
Expand Down Expand Up @@ -184,15 +185,14 @@ func (lt *levelIterTest) newIters(
if kinds.Point() {
iter, err := lt.readers[file.FileNum].NewPointIter(
ctx, transforms,
opts.LowerBound, opts.UpperBound, nil, sstable.AlwaysUseFilterBlock, iio.stats,
nil, sstable.MakeTrivialReaderProvider(lt.readers[file.FileNum]))
opts.LowerBound, opts.UpperBound, nil, sstable.AlwaysUseFilterBlock, block.ReadEnv{Stats: iio.stats, IterStats: nil}, sstable.MakeTrivialReaderProvider(lt.readers[file.FileNum]))
if err != nil {
return iterSet{}, errors.CombineErrors(err, set.CloseAll())
}
set.point = iter
}
if kinds.RangeDeletion() {
rangeDelIter, err := lt.readers[file.FileNum].NewRawRangeDelIter(context.Background(), file.FragmentIterTransforms())
rangeDelIter, err := lt.readers[file.FileNum].NewRawRangeDelIter(context.Background(), file.FragmentIterTransforms(), block.NoReadEnv)
if err != nil {
return iterSet{}, errors.CombineErrors(err, set.CloseAll())
}
Expand Down
10 changes: 5 additions & 5 deletions merging_iter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/cockroachdb/pebble/internal/testutils/indenttree"
"github.com/cockroachdb/pebble/objstorage/objstorageprovider"
"github.com/cockroachdb/pebble/sstable"
"github.com/cockroachdb/pebble/sstable/block"
"github.com/cockroachdb/pebble/vfs"
"github.com/stretchr/testify/require"
)
Expand Down Expand Up @@ -169,7 +170,7 @@ func TestMergingIterDataDriven(t *testing.T) {
var err error
r := readers[file.FileNum]
if kinds.RangeDeletion() {
set.rangeDeletion, err = r.NewRawRangeDelIter(context.Background(), sstable.NoFragmentTransforms)
set.rangeDeletion, err = r.NewRawRangeDelIter(context.Background(), sstable.NoFragmentTransforms, block.ReadEnv{Stats: iio.stats})
if err != nil {
return iterSet{}, errors.CombineErrors(err, set.CloseAll())
}
Expand All @@ -178,8 +179,7 @@ func TestMergingIterDataDriven(t *testing.T) {
set.point, err = r.NewPointIter(
context.Background(),
sstable.NoTransforms,
opts.GetLowerBound(), opts.GetUpperBound(), nil, sstable.AlwaysUseFilterBlock, iio.stats,
nil, sstable.MakeTrivialReaderProvider(r))
opts.GetLowerBound(), opts.GetUpperBound(), nil, sstable.AlwaysUseFilterBlock, block.ReadEnv{Stats: iio.stats, IterStats: nil}, sstable.MakeTrivialReaderProvider(r))
if err != nil {
return iterSet{}, errors.CombineErrors(err, set.CloseAll())
}
Expand Down Expand Up @@ -671,14 +671,14 @@ func buildMergingIter(readers [][]*sstable.Reader, levelSlices []manifest.LevelS
levelIndex := i
level := len(readers) - 1 - i
newIters := func(
_ context.Context, file *manifest.FileMetadata, opts *IterOptions, _ internalIterOpts, _ iterKinds,
_ context.Context, file *manifest.FileMetadata, opts *IterOptions, iio internalIterOpts, _ iterKinds,
) (iterSet, error) {
iter, err := readers[levelIndex][file.FileNum].NewIter(
sstable.NoTransforms, opts.LowerBound, opts.UpperBound)
if err != nil {
return iterSet{}, err
}
rdIter, err := readers[levelIndex][file.FileNum].NewRawRangeDelIter(context.Background(), sstable.NoFragmentTransforms)
rdIter, err := readers[levelIndex][file.FileNum].NewRawRangeDelIter(context.Background(), sstable.NoFragmentTransforms, block.ReadEnv{Stats: iio.stats})
if err != nil {
iter.Close()
return iterSet{}, err
Expand Down
5 changes: 3 additions & 2 deletions metamorphic/build.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/cockroachdb/pebble/objstorage"
"github.com/cockroachdb/pebble/objstorage/objstorageprovider"
"github.com/cockroachdb/pebble/sstable"
"github.com/cockroachdb/pebble/sstable/block"
"github.com/cockroachdb/pebble/vfs"
)

Expand Down Expand Up @@ -269,7 +270,7 @@ func openExternalObj(
pointIter, err = reader.NewIter(sstable.NoTransforms, start, end)
panicIfErr(err)

rangeDelIter, err = reader.NewRawRangeDelIter(context.Background(), sstable.NoFragmentTransforms)
rangeDelIter, err = reader.NewRawRangeDelIter(context.Background(), sstable.NoFragmentTransforms, block.NoReadEnv)
panicIfErr(err)
if rangeDelIter != nil {
rangeDelIter = keyspan.Truncate(
Expand All @@ -279,7 +280,7 @@ func openExternalObj(
)
}

rangeKeyIter, err = reader.NewRawRangeKeyIter(context.Background(), sstable.NoFragmentTransforms)
rangeKeyIter, err = reader.NewRawRangeKeyIter(context.Background(), sstable.NoFragmentTransforms, block.NoReadEnv)
panicIfErr(err)
if rangeKeyIter != nil {
rangeKeyIter = keyspan.Truncate(
Expand Down
5 changes: 3 additions & 2 deletions replay/replay.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/cockroachdb/pebble/internal/rangekey"
"github.com/cockroachdb/pebble/record"
"github.com/cockroachdb/pebble/sstable"
"github.com/cockroachdb/pebble/sstable/block"
"github.com/cockroachdb/pebble/vfs"
"golang.org/x/perf/benchfmt"
"golang.org/x/sync/errgroup"
Expand Down Expand Up @@ -1022,7 +1023,7 @@ func loadFlushedSSTableKeys(
}

// Load all the range tombstones.
if iter, err := r.NewRawRangeDelIter(context.Background(), sstable.NoFragmentTransforms); err != nil {
if iter, err := r.NewRawRangeDelIter(context.Background(), sstable.NoFragmentTransforms, block.NoReadEnv); err != nil {
return err
} else if iter != nil {
defer iter.Close()
Expand All @@ -1045,7 +1046,7 @@ func loadFlushedSSTableKeys(
}

// Load all the range keys.
if iter, err := r.NewRawRangeKeyIter(context.Background(), sstable.NoFragmentTransforms); err != nil {
if iter, err := r.NewRawRangeKeyIter(context.Background(), sstable.NoFragmentTransforms, block.NoReadEnv); err != nil {
return err
} else if iter != nil {
defer iter.Close()
Expand Down
2 changes: 1 addition & 1 deletion sstable/block/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -323,7 +323,7 @@ type ReadEnv struct {
// iterator is closed. In the important code paths, the CategoryStatsCollector
// is managed by the fileCacheContainer.
Stats *base.InternalIteratorStats
IterStats *ChildIterStatsAccumulator
IterStats IterStatsAccumulator

// BufferPool is not-nil if we read blocks into a buffer pool and not into the
// cache. This is used during compactions.
Expand Down
49 changes: 12 additions & 37 deletions sstable/block/category_stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,10 +155,12 @@ type CategoryStats struct {
BlockReadDuration time.Duration
}

func (s *CategoryStats) aggregate(a CategoryStats) {
s.BlockBytes += a.BlockBytes
s.BlockBytesInCache += a.BlockBytesInCache
s.BlockReadDuration += a.BlockReadDuration
func (s *CategoryStats) aggregate(
blockBytes, blockBytesInCache uint64, blockReadDuration time.Duration,
) {
s.BlockBytes += blockBytes
s.BlockBytesInCache += blockBytesInCache
s.BlockReadDuration += blockReadDuration
}

// CategoryStatsAggregate is the aggregate for the given category.
Expand All @@ -176,9 +178,11 @@ type categoryStatsWithMu struct {
}

// Accumulate implements the IterStatsAccumulator interface.
func (c *categoryStatsWithMu) Accumulate(stats CategoryStats) {
func (c *categoryStatsWithMu) Accumulate(
blockBytes, blockBytesInCache uint64, blockReadDuration time.Duration,
) {
c.mu.Lock()
c.stats.aggregate(stats)
c.stats.aggregate(blockBytes, blockBytesInCache, blockReadDuration)
c.mu.Unlock()
}

Expand Down Expand Up @@ -210,7 +214,7 @@ func (s *shardedCategoryStats) getStats() CategoryStatsAggregate {
}
for i := range s.shards {
s.shards[i].mu.Lock()
agg.CategoryStats.aggregate(s.shards[i].stats)
agg.CategoryStats.aggregate(s.shards[i].stats.BlockBytes, s.shards[i].stats.BlockBytesInCache, s.shards[i].stats.BlockReadDuration)
s.shards[i].mu.Unlock()
}
return agg
Expand Down Expand Up @@ -250,34 +254,5 @@ func (c *CategoryStatsCollector) GetStats() []CategoryStatsAggregate {

type IterStatsAccumulator interface {
// Accumulate accumulates the provided stats.
Accumulate(cas CategoryStats)
}

// ChildIterStatsAccumulator is a helper for a sstable iterator to accumulate
// stats, which are reported to the CategoryStatsCollector when the accumulator
// is closed.
type ChildIterStatsAccumulator struct {
stats CategoryStats
parent IterStatsAccumulator
}

// Init initializes the accumulator with the parent accumulator.
func (a *ChildIterStatsAccumulator) Init(parent IterStatsAccumulator) {
a.parent = parent
}

// Accumulate accumulates the provided stats.
func (a *ChildIterStatsAccumulator) Accumulate(
blockBytes, blockBytesInCache uint64, blockReadDuration time.Duration,
) {
a.stats.BlockBytes += blockBytes
a.stats.BlockBytesInCache += blockBytesInCache
a.stats.BlockReadDuration += blockReadDuration
}

// Close closes the accumulator, accumulating the stats to the parent.
func (a *ChildIterStatsAccumulator) Close() {
if a.parent != nil {
a.parent.Accumulate(a.stats)
}
Accumulate(blockBytes, blockBytesInCache uint64, blockReadDuration time.Duration)
}
6 changes: 2 additions & 4 deletions sstable/block_property_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -988,8 +988,7 @@ func TestBlockProperties(t *testing.T) {
}
iter, err := r.NewPointIter(
context.Background(),
NoTransforms, lower, upper, filterer, NeverUseFilterBlock, &stats,
nil, MakeTrivialReaderProvider(r))
NoTransforms, lower, upper, filterer, NeverUseFilterBlock, block.ReadEnv{Stats: &stats, IterStats: nil}, MakeTrivialReaderProvider(r))
if err != nil {
return err.Error()
}
Expand Down Expand Up @@ -1072,8 +1071,7 @@ func TestBlockProperties_BoundLimited(t *testing.T) {
}
iter, err := r.NewPointIter(
context.Background(),
NoTransforms, lower, upper, filterer, NeverUseFilterBlock, &stats,
nil, MakeTrivialReaderProvider(r))
NoTransforms, lower, upper, filterer, NeverUseFilterBlock, block.ReadEnv{Stats: &stats, IterStats: nil}, MakeTrivialReaderProvider(r))
if err != nil {
return err.Error()
}
Expand Down
Loading

0 comments on commit ebcb6e0

Please sign in to comment.