Skip to content

Commit

Permalink
Changing the map key for the compressor pools
Browse files Browse the repository at this point in the history
  • Loading branch information
rnishtala-sumo committed Jan 8, 2025
1 parent b937b07 commit 64722dd
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 51 deletions.
2 changes: 0 additions & 2 deletions config/confighttp/compression_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,8 +137,6 @@ func TestHTTPClientCompression(t *testing.T) {
message := fmt.Sprintf("unsupported compression type and level %s - %d", tt.encoding, tt.level)
assert.Equal(t, message, err.Error())
return
} else {
require.NoError(t, err)
}
client, err := clientSettings.ToClient(context.Background(), componenttest.NewNopHost(), componenttest.NewNopTelemetrySettings())
require.NoError(t, err)
Expand Down
97 changes: 48 additions & 49 deletions config/confighttp/compressor.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"compress/gzip"
"compress/zlib"
"errors"
"fmt"
"io"
"sync"

Expand All @@ -24,70 +23,70 @@ type writeCloserReset interface {
Reset(w io.Writer)
}

type compressorMap struct {
pools map[string]*compressor
}

type compressor struct {
pool sync.Pool
}

type compressorMap map[compressionMapKey]*compressor

type compressionMapKey struct {
compressionType configcompression.Type
compressionParams configcompression.CompressionParams
}

var (
compressorPools = &compressorMap{pools: make(map[string]*compressor)}
snappyCompressor = &compressor{}
lz4Compressor = &compressor{}
_ writeCloserReset = (*gzip.Writer)(nil)
_ writeCloserReset = (*snappy.Writer)(nil)
_ writeCloserReset = (*zstd.Encoder)(nil)
_ writeCloserReset = (*zlib.Writer)(nil)
_ writeCloserReset = (*lz4.Writer)(nil)
compressorPools = make(compressorMap)
compressorPoolsMu sync.Mutex
)

// writerFactory defines writer field in CompressRoundTripper.
// The validity of input is already checked when NewCompressRoundTripper was called in confighttp,
func newCompressor(compressionType configcompression.Type, compressionParams configcompression.CompressionParams) (*compressor, error) {
mapKey := fmt.Sprintf("%s/%d", compressionType, compressionParams.Level)
compressorPoolsMu.Lock()
defer compressorPoolsMu.Unlock()
mapKey := compressionMapKey{compressionType, compressionParams}
c, ok := compressorPools[mapKey]
if ok {
return c, nil
}

f, err := newWriteCloserResetFunc(compressionType, compressionParams)
if err != nil {
return nil, err
}
c = &compressor{pool: sync.Pool{New: func() any { return f() }}}
compressorPools[mapKey] = c
return c, nil
}

func newWriteCloserResetFunc(compressionType configcompression.Type, compressionParams configcompression.CompressionParams) (func() writeCloserReset, error) {
switch compressionType {
case configcompression.TypeGzip:
gZipCompressor, gzipExists := compressorPools.pools[mapKey]
if gzipExists {
return gZipCompressor, nil
}
gZipCompressor = &compressor{}
gZipCompressor.pool = sync.Pool{New: func() any { w, _ := gzip.NewWriterLevel(nil, int(compressionParams.Level)); return w }}
compressorPools.pools[mapKey] = gZipCompressor
return gZipCompressor, nil
return func() writeCloserReset {
w, _ := gzip.NewWriterLevel(nil, int(compressionParams.Level))
return w
}, nil
case configcompression.TypeSnappy:
if snappyCompressor.pool.Get() == nil {
snappyCompressor.pool = sync.Pool{New: func() any { return snappy.NewBufferedWriter(nil) }}
return snappyCompressor, nil
}
return snappyCompressor, nil
return func() writeCloserReset {
return snappy.NewBufferedWriter(nil)
}, nil
case configcompression.TypeZstd:
zstdCompressor, zstdExists := compressorPools.pools[mapKey]
compression := zstd.EncoderLevelFromZstd(int(compressionParams.Level))
encoderLevel := zstd.WithEncoderLevel(compression)
if zstdExists {
return zstdCompressor, nil
}
zstdCompressor = &compressor{}
zstdCompressor.pool = sync.Pool{New: func() any { zw, _ := zstd.NewWriter(nil, zstd.WithEncoderConcurrency(1), encoderLevel); return zw }}
return zstdCompressor, nil
level := zstd.WithEncoderLevel(zstd.EncoderLevelFromZstd(int(compressionParams.Level)))
return func() writeCloserReset {
zw, _ := zstd.NewWriter(nil, zstd.WithEncoderConcurrency(1), level)
return zw
}, nil
case configcompression.TypeZlib, configcompression.TypeDeflate:
zlibCompressor, zlibExists := compressorPools.pools[mapKey]
if zlibExists {
return zlibCompressor, nil
}
zlibCompressor = &compressor{}
zlibCompressor.pool = sync.Pool{New: func() any { w, _ := zlib.NewWriterLevel(nil, int(compressionParams.Level)); return w }}
compressorPools.pools[mapKey] = zlibCompressor
return zlibCompressor, nil
return func() writeCloserReset {
w, _ := zlib.NewWriterLevel(nil, int(compressionParams.Level))
return w
}, nil
case configcompression.TypeLz4:
if lz4Compressor.pool.Get() == nil {
lz4Compressor.pool = sync.Pool{New: func() any { lz := lz4.NewWriter(nil); _ = lz.Apply(lz4.ConcurrencyOption(1)); return lz }}
return lz4Compressor, nil
}
return lz4Compressor, nil
return func() writeCloserReset {
lz := lz4.NewWriter(nil)
_ = lz.Apply(lz4.ConcurrencyOption(1))
return lz
}, nil
}
return nil, errors.New("unsupported compression type")
}
Expand Down

0 comments on commit 64722dd

Please sign in to comment.