diff --git a/config/confighttp/compression_test.go b/config/confighttp/compression_test.go index 2d0f322cef7..841687ab9ff 100644 --- a/config/confighttp/compression_test.go +++ b/config/confighttp/compression_test.go @@ -137,8 +137,6 @@ func TestHTTPClientCompression(t *testing.T) { message := fmt.Sprintf("unsupported compression type and level %s - %d", tt.encoding, tt.level) assert.Equal(t, message, err.Error()) return - } else { - require.NoError(t, err) } client, err := clientSettings.ToClient(context.Background(), componenttest.NewNopHost(), componenttest.NewNopTelemetrySettings()) require.NoError(t, err) diff --git a/config/confighttp/compressor.go b/config/confighttp/compressor.go index 917f1db4fa5..750bda5795d 100644 --- a/config/confighttp/compressor.go +++ b/config/confighttp/compressor.go @@ -8,7 +8,6 @@ import ( "compress/gzip" "compress/zlib" "errors" - "fmt" "io" "sync" @@ -24,70 +23,70 @@ type writeCloserReset interface { Reset(w io.Writer) } -type compressorMap struct { - pools map[string]*compressor -} - type compressor struct { pool sync.Pool } +type compressorMap map[compressionMapKey]*compressor + +type compressionMapKey struct { + compressionType configcompression.Type + compressionParams configcompression.CompressionParams +} + var ( - compressorPools = &compressorMap{pools: make(map[string]*compressor)} - snappyCompressor = &compressor{} - lz4Compressor = &compressor{} - _ writeCloserReset = (*gzip.Writer)(nil) - _ writeCloserReset = (*snappy.Writer)(nil) - _ writeCloserReset = (*zstd.Encoder)(nil) - _ writeCloserReset = (*zlib.Writer)(nil) - _ writeCloserReset = (*lz4.Writer)(nil) + compressorPools = make(compressorMap) + compressorPoolsMu sync.Mutex ) // writerFactory defines writer field in CompressRoundTripper. // The validity of input is already checked when NewCompressRoundTripper was called in confighttp, func newCompressor(compressionType configcompression.Type, compressionParams configcompression.CompressionParams) (*compressor, error) { - mapKey := fmt.Sprintf("%s/%d", compressionType, compressionParams.Level) + compressorPoolsMu.Lock() + defer compressorPoolsMu.Unlock() + mapKey := compressionMapKey{compressionType, compressionParams} + c, ok := compressorPools[mapKey] + if ok { + return c, nil + } + + f, err := newWriteCloserResetFunc(compressionType, compressionParams) + if err != nil { + return nil, err + } + c = &compressor{pool: sync.Pool{New: func() any { return f() }}} + compressorPools[mapKey] = c + return c, nil +} + +func newWriteCloserResetFunc(compressionType configcompression.Type, compressionParams configcompression.CompressionParams) (func() writeCloserReset, error) { switch compressionType { case configcompression.TypeGzip: - gZipCompressor, gzipExists := compressorPools.pools[mapKey] - if gzipExists { - return gZipCompressor, nil - } - gZipCompressor = &compressor{} - gZipCompressor.pool = sync.Pool{New: func() any { w, _ := gzip.NewWriterLevel(nil, int(compressionParams.Level)); return w }} - compressorPools.pools[mapKey] = gZipCompressor - return gZipCompressor, nil + return func() writeCloserReset { + w, _ := gzip.NewWriterLevel(nil, int(compressionParams.Level)) + return w + }, nil case configcompression.TypeSnappy: - if snappyCompressor.pool.Get() == nil { - snappyCompressor.pool = sync.Pool{New: func() any { return snappy.NewBufferedWriter(nil) }} - return snappyCompressor, nil - } - return snappyCompressor, nil + return func() writeCloserReset { + return snappy.NewBufferedWriter(nil) + }, nil case configcompression.TypeZstd: - zstdCompressor, zstdExists := compressorPools.pools[mapKey] - compression := zstd.EncoderLevelFromZstd(int(compressionParams.Level)) - encoderLevel := zstd.WithEncoderLevel(compression) - if zstdExists { - return zstdCompressor, nil - } - zstdCompressor = &compressor{} - zstdCompressor.pool = sync.Pool{New: func() any { zw, _ := zstd.NewWriter(nil, zstd.WithEncoderConcurrency(1), encoderLevel); return zw }} - return zstdCompressor, nil + level := zstd.WithEncoderLevel(zstd.EncoderLevelFromZstd(int(compressionParams.Level))) + return func() writeCloserReset { + zw, _ := zstd.NewWriter(nil, zstd.WithEncoderConcurrency(1), level) + return zw + }, nil case configcompression.TypeZlib, configcompression.TypeDeflate: - zlibCompressor, zlibExists := compressorPools.pools[mapKey] - if zlibExists { - return zlibCompressor, nil - } - zlibCompressor = &compressor{} - zlibCompressor.pool = sync.Pool{New: func() any { w, _ := zlib.NewWriterLevel(nil, int(compressionParams.Level)); return w }} - compressorPools.pools[mapKey] = zlibCompressor - return zlibCompressor, nil + return func() writeCloserReset { + w, _ := zlib.NewWriterLevel(nil, int(compressionParams.Level)) + return w + }, nil case configcompression.TypeLz4: - if lz4Compressor.pool.Get() == nil { - lz4Compressor.pool = sync.Pool{New: func() any { lz := lz4.NewWriter(nil); _ = lz.Apply(lz4.ConcurrencyOption(1)); return lz }} - return lz4Compressor, nil - } - return lz4Compressor, nil + return func() writeCloserReset { + lz := lz4.NewWriter(nil) + _ = lz.Apply(lz4.ConcurrencyOption(1)) + return lz + }, nil } return nil, errors.New("unsupported compression type") }