Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

dicts: Add option to use the *_byReference variants at creation time #60

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ update-zstd:
cp zstd/lib/zstd_errors.h .

test:
CGO_ENABLED=1 GODEBUG=cgocheck=2 go test -v
CGO_ENABLED=1 GOEXPERIMENT=cgocheck2 go test -v
coxley marked this conversation as resolved.
Show resolved Hide resolved

bench:
CGO_ENABLED=1 go test -bench=.
75 changes: 74 additions & 1 deletion dict.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,14 @@ static ZSTD_DDict* ZSTD_createDDict_wrapper(uintptr_t dictBuffer, size_t dictSiz
return ZSTD_createDDict((const void *)dictBuffer, dictSize);
}

static ZSTD_CDict* ZSTD_createCDict_byReference_wrapper(uintptr_t dictBuffer, size_t dictSize, int compressionLevel) {
return ZSTD_createCDict_byReference((const void *)dictBuffer, dictSize, compressionLevel);
}

static ZSTD_DDict* ZSTD_createDDict_byReference_wrapper(uintptr_t dictBuffer, size_t dictSize) {
return ZSTD_createDDict_byReference((const void *)dictBuffer, dictSize);
}

*/
import "C"

Expand Down Expand Up @@ -103,7 +111,9 @@ var buildDictLock sync.Mutex
//
// A single CDict may be re-used in concurrently running goroutines.
type CDict struct {
p *C.ZSTD_CDict
p *C.ZSTD_CDict
// Keep the underlying bytes alive from the GC's point of view (if passing by ref)
input []byte
compressionLevel int
}

Expand All @@ -114,6 +124,16 @@ func NewCDict(dict []byte) (*CDict, error) {
return NewCDictLevel(dict, DefaultCompressionLevel)
}

// NewCDictByRef creates a new CDict that shares the given dict
//
// Callers *must not* mutate the underlying array of 'dict'. Doing so will lead
// to undefined and undesirable behavior.
//
// Call Release when the returned dict is no longer used.
func NewCDictByRef(dict []byte) (*CDict, error) {
return NewCDictLevelByRef(dict, DefaultCompressionLevel)
}

// NewCDictLevel creates new CDict from the given dict
// using the given compressionLevel.
//
Expand All @@ -136,6 +156,32 @@ func NewCDictLevel(dict []byte, compressionLevel int) (*CDict, error) {
return cd, nil
}

// NewCDictLevelByRef creates a new CDict that shares the given dict using
// the given compressionLevel.
//
// Callers *must not* mutate the underlying array of 'dict'. Doing so will lead
// to undefined and undesirable behavior.
//
// Call Release when the returned dict is no longer used.
func NewCDictLevelByRef(dict []byte, compressionLevel int) (*CDict, error) {
if len(dict) == 0 {
return nil, fmt.Errorf("dict cannot be empty")
}

cd := &CDict{
p: C.ZSTD_createCDict_byReference_wrapper(
C.uintptr_t(uintptr(unsafe.Pointer(&dict[0]))),
C.size_t(len(dict)),
C.int(compressionLevel)),
input: dict,
compressionLevel: compressionLevel,
}
// Prevent from GC'ing of dict during CGO call above.
runtime.KeepAlive(dict)
runtime.SetFinalizer(cd, freeCDict)
return cd, nil
}

// Release releases resources occupied by cd.
//
// cd cannot be used after the release.
Expand All @@ -146,6 +192,7 @@ func (cd *CDict) Release() {
result := C.ZSTD_freeCDict(cd.p)
ensureNoError("ZSTD_freeCDict", result)
cd.p = nil
cd.input = nil
}

func freeCDict(v interface{}) {
Expand All @@ -157,6 +204,8 @@ func freeCDict(v interface{}) {
// A single DDict may be re-used in concurrently running goroutines.
type DDict struct {
p *C.ZSTD_DDict
// Keep the underlying bytes alive from the GC's point of view (if passing by ref)
input []byte
}

// NewDDict creates new DDict from the given dict.
Expand All @@ -178,6 +227,29 @@ func NewDDict(dict []byte) (*DDict, error) {
return dd, nil
}

// NewDDictByRef creates a new DDict that shares the given dict
//
// Callers *must not* mutate the underlying array of 'dict'. Doing so will lead
// to undefined and undesirable behavior.
//
// Call Release when the returned dict is no longer needed.
func NewDDictByRef(dict []byte) (*DDict, error) {
if len(dict) == 0 {
return nil, fmt.Errorf("dict cannot be empty")
}

dd := &DDict{
p: C.ZSTD_createDDict_byReference_wrapper(
C.uintptr_t(uintptr(unsafe.Pointer(&dict[0]))),
C.size_t(len(dict))),
input: dict,
}
// Prevent from GC'ing of dict during CGO call above.
runtime.KeepAlive(dict)
runtime.SetFinalizer(dd, freeDDict)
return dd, nil
}

// Release releases resources occupied by dd.
//
// dd cannot be used after the release.
Expand All @@ -189,6 +261,7 @@ func (dd *DDict) Release() {
result := C.ZSTD_freeDDict(dd.p)
ensureNoError("ZSTD_freeDDict", result)
dd.p = nil
dd.input = nil
}

func freeDDict(v interface{}) {
Expand Down
32 changes: 32 additions & 0 deletions dict_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,22 @@ func TestCDictCreateRelease(t *testing.T) {
}
}

func TestCDictByRefCreateRelease(t *testing.T) {
var samples [][]byte
for i := 0; i < 1000; i++ {
samples = append(samples, []byte(fmt.Sprintf("sample %d", i)))
}
dict := BuildDict(samples, 64*1024)

for i := 0; i < 10; i++ {
cd, err := NewCDictByRef(dict)
if err != nil {
t.Fatalf("cannot create dict: %s", err)
}
cd.Release()
}
}

func TestDDictCreateRelease(t *testing.T) {
var samples [][]byte
for i := 0; i < 1000; i++ {
Expand All @@ -59,6 +75,22 @@ func TestDDictCreateRelease(t *testing.T) {
}
}

func TestDDictByRefCreateRelease(t *testing.T) {
var samples [][]byte
for i := 0; i < 1000; i++ {
samples = append(samples, []byte(fmt.Sprintf("sample %d", i)))
}
dict := BuildDict(samples, 64*1024)

for i := 0; i < 10; i++ {
dd, err := NewDDictByRef(dict)
if err != nil {
t.Fatalf("cannot create dict: %s", err)
}
dd.Release()
}
}

func TestBuildDict(t *testing.T) {
for _, samplesCount := range []int{0, 1, 10, 100, 1000} {
t.Run(fmt.Sprintf("samples_%d", samplesCount), func(t *testing.T) {
Expand Down
61 changes: 61 additions & 0 deletions gozstd_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,67 @@ func TestCompressDecompressDistinctConcurrentDicts(t *testing.T) {
}
}

func TestCompressDecompressDistinctConcurrentDictsByRef(t *testing.T) {
// Build multiple distinct dicts, sharing the underlying byte array
var cdicts []*CDict
var ddicts []*DDict
defer func() {
for _, cd := range cdicts {
cd.Release()
}
for _, dd := range ddicts {
dd.Release()
}
}()
for i := 0; i < 4; i++ {
var samples [][]byte
for j := 0; j < 1000; j++ {
sample := fmt.Sprintf("this is %d,%d sample", j, i)
samples = append(samples, []byte(sample))
}
dict := BuildDict(samples, 4*1024)
cd, err := NewCDictByRef(dict)
if err != nil {
t.Fatalf("cannot create CDict: %s", err)
}
cdicts = append(cdicts, cd)
dd, err := NewDDictByRef(dict)
if err != nil {
t.Fatalf("cannot create DDict: %s", err)
}
ddicts = append(ddicts, dd)
}

// Build data for the compression.
var bb bytes.Buffer
i := 0
for bb.Len() < 1e4 {
fmt.Fprintf(&bb, "%d sample line this is %d", bb.Len(), i)
i++
}
data := bb.Bytes()

// Run concurrent goroutines compressing/decompressing with distinct dicts.
ch := make(chan error, len(cdicts))
for i := 0; i < cap(ch); i++ {
go func(cd *CDict, dd *DDict) {
ch <- testCompressDecompressDistinctConcurrentDicts(cd, dd, data)
}(cdicts[i], ddicts[i])
}

// Wait for goroutines to finish.
for i := 0; i < cap(ch); i++ {
select {
case err := <-ch:
if err != nil {
t.Fatalf("unexpected error: %s", err)
}
case <-time.After(time.Second):
t.Fatalf("timeout")
}
}
}

func testCompressDecompressDistinctConcurrentDicts(cd *CDict, dd *DDict, data []byte) error {
var compressedData, decompressedData []byte
for j := 0; j < 10; j++ {
Expand Down
84 changes: 74 additions & 10 deletions gozstd_timing_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,33 @@ func BenchmarkDecompressDict(b *testing.B) {
b.Run(fmt.Sprintf("blockSize_%d", blockSize), func(b *testing.B) {
for _, level := range benchCompressionLevels {
b.Run(fmt.Sprintf("level_%d", level), func(b *testing.B) {
benchmarkDecompressDict(b, blockSize, level)
benchmarkDecompressDict(b, blockSize, level, false)
})
}
})
}
}

func benchmarkDecompressDict(b *testing.B, blockSize, level int) {
func BenchmarkDecompressDictByRef(b *testing.B) {
for _, blockSize := range benchBlockSizes {
b.Run(fmt.Sprintf("blockSize_%d", blockSize), func(b *testing.B) {
for _, level := range benchCompressionLevels {
b.Run(fmt.Sprintf("level_%d", level), func(b *testing.B) {
benchmarkDecompressDict(b, blockSize, level, true)
})
}
})
}
}

func benchmarkDecompressDict(b *testing.B, blockSize, level int, byReference bool) {
block := newBenchString(blockSize)
bd := getBenchDicts(level)
var bd *benchDicts
if byReference {
bd = getBenchDictsByRef(level)
} else {
bd = getBenchDicts(level)
}
src := CompressDict(nil, block, bd.cd)
b.Logf("compressionRatio: %f", float64(len(block))/float64(len(src)))
b.ReportAllocs()
Expand All @@ -54,16 +71,33 @@ func BenchmarkCompressDict(b *testing.B) {
b.Run(fmt.Sprintf("blockSize_%d", blockSize), func(b *testing.B) {
for _, level := range benchCompressionLevels {
b.Run(fmt.Sprintf("level_%d", level), func(b *testing.B) {
benchmarkCompressDict(b, blockSize, level)
benchmarkCompressDict(b, blockSize, level, false)
})
}
})
}
}

func BenchmarkCompressDictByRef(b *testing.B) {
for _, blockSize := range benchBlockSizes {
b.Run(fmt.Sprintf("blockSize_%d", blockSize), func(b *testing.B) {
for _, level := range benchCompressionLevels {
b.Run(fmt.Sprintf("level_%d", level), func(b *testing.B) {
benchmarkCompressDict(b, blockSize, level, true)
})
}
})
}
}

func benchmarkCompressDict(b *testing.B, blockSize, level int) {
func benchmarkCompressDict(b *testing.B, blockSize, level int, byReference bool) {
src := newBenchString(blockSize)
bd := getBenchDicts(level)
var bd *benchDicts
if byReference {
bd = getBenchDictsByRef(level)
} else {
bd = getBenchDicts(level)
}
b.ReportAllocs()
b.SetBytes(int64(len(src)))
b.ResetTimer()
Expand All @@ -89,15 +123,45 @@ func getBenchDicts(level int) *benchDicts {
return tmp
}

func getBenchDictsByRef(level int) *benchDicts {
benchDictsByRefLock.Lock()
tmp := benchDictsByRefMap[level]
if tmp == nil {
tmp = newBenchDictsByRef(level)
benchDictsByRefMap[level] = tmp
}
benchDictsByRefLock.Unlock()
return tmp
}

type benchDicts struct {
cd *CDict
dd *DDict
}

var benchDictsMap = make(map[int]*benchDicts)
var benchDictsLock sync.Mutex
var (
benchDictsMap = make(map[int]*benchDicts)
benchDictsLock sync.Mutex
benchDictsByRefMap = make(map[int]*benchDicts)
benchDictsByRefLock sync.Mutex
)

func newBenchDicts(level int) *benchDicts {
return createNewBenchDicts(NewCDictLevel, NewDDict, level)
}

func newBenchDictsByRef(level int) *benchDicts {
return createNewBenchDicts(NewCDictLevelByRef, NewDDictByRef, level)
}

// Make it easier to toggle between copying the underlying bytes on creation
// vs. sharing by reference.
type (
cdictFactory func(dict []byte, level int) (*CDict, error)
ddictFactory func(dict []byte) (*DDict, error)
)

func createNewBenchDicts(createCDict cdictFactory, createDDict ddictFactory, level int) *benchDicts {
var samples [][]byte
for i := 0; i < 300; i++ {
sampleLen := rand.Intn(300)
Expand All @@ -106,11 +170,11 @@ func newBenchDicts(level int) *benchDicts {
}

dict := BuildDict(samples, 32*1024)
cd, err := NewCDictLevel(dict, level)
cd, err := createCDict(dict, level)
if err != nil {
panic(fmt.Errorf("cannot create CDict: %s", err))
}
dd, err := NewDDict(dict)
dd, err := createDDict(dict)
if err != nil {
panic(fmt.Errorf("cannot create DDict: %s", err))
}
Expand Down