From 020bd5a07f0e815730802f2f1cbbdef332ed3be5 Mon Sep 17 00:00:00 2001 From: vladimir vivien Date: Sat, 11 Jan 2025 22:29:05 +0000 Subject: [PATCH] Update/fix stream batch funcs --- operators/batch/funcs.go | 64 ++--- operators/batch/funcs_test.go | 27 +- operators/executor/batch_funcs.go | 4 +- stream/stream_batch_test.go | 427 +++++++++++++----------------- 4 files changed, 242 insertions(+), 280 deletions(-) diff --git a/operators/batch/funcs.go b/operators/batch/funcs.go index 65ceacc..1920985 100644 --- a/operators/batch/funcs.go +++ b/operators/batch/funcs.go @@ -275,6 +275,9 @@ func SortSliceByIndexFunc[SLICE ~[][]ITEM, ITEM cmp.Ordered](index int) api.Exec // The function returns a sorted []T func SortByStructFieldFunc[SLICE ~[]ITEM, ITEM any](name string) api.ExecFunction[SLICE, SLICE] { return func(ctx context.Context, param0 SLICE) SLICE { + if reflect.TypeOf(param0).Elem().Kind() != reflect.Struct { + panic("SortByStructField requires struct") + } slices.SortFunc(param0, func(i, j ITEM) int { fieldI := reflect.ValueOf(i).FieldByName(name) @@ -306,39 +309,38 @@ func SortByStructFieldFunc[SLICE ~[]ITEM, ITEM any](name string) api.ExecFunctio // []map[K]V - where K is a comparable type // // The function returns sorted []map[K] -func SortByMapKeyFunc[MAPS ~[]map[K]V, K comparable, V any](key K) api.ExecFunction[MAPS, MAPS] { +func SortByMapKeyFunc[MAPS ~[]map[K]V, K comparable, V cmp.Ordered](key K) api.ExecFunction[MAPS, MAPS] { return func(ctx context.Context, param0 MAPS) MAPS { slices.SortFunc(param0, func(i, j map[K]V) int { - itemI, okI := i[key] - itemJ, okJ := j[key] - - // Handle cases where the key might not exist in one or both maps - if !okI && !okJ { - return 0 // Both missing, consider equal - } - if !okI && okJ { - return 1 // Only j has the key, j is "greater" - } - if okI && !okJ { - return -1 // Only i has the key, i is "less" - } - - if okI && okJ { - valI := reflect.ValueOf(itemI) - valJ := reflect.ValueOf(itemJ) - switch { - case 
util.IsEqual(valI, valJ): - return 0 - case util.IsLess(valI, valJ): - return -1 - case util.IsMore(valI, valJ): - return 1 - default: - return 0 - } - } - - return 0 + return cmp.Compare(i[key], j[key]) + + // // Handle cases where the key might not exist in one or both maps + // if !okI && !okJ { + // return 0 // Both missing, consider equal + // } + // if !okI && okJ { + // return 1 // Only j has the key, j is "greater" + // } + // if okI && !okJ { + // return -1 // Only i has the key, i is "less" + // } + + // if okI && okJ { + // valI := reflect.ValueOf(itemI) + // valJ := reflect.ValueOf(itemJ) + // switch { + // case util.IsEqual(valI, valJ): + // return 0 + // case util.IsLess(valI, valJ): + // return -1 + // case util.IsMore(valI, valJ): + // return 1 + // default: + // return 0 + // } + // } + + // return 0 }) return param0 } diff --git a/operators/batch/funcs_test.go b/operators/batch/funcs_test.go index d0832e0..e35d817 100644 --- a/operators/batch/funcs_test.go +++ b/operators/batch/funcs_test.go @@ -4,6 +4,7 @@ import ( "cmp" "context" "fmt" + "slices" "testing" ) @@ -170,8 +171,7 @@ func TestSortSlice(t *testing.T) { op := SortSliceFunc[[]string]() data := []string{"Spirit", "Voyager", "BigFoot", "Enola", "Memphis"} sorted := op(context.TODO(), data) - - if sorted[0] != "BigFoot" && sorted[1] != "Enola" && sorted[2] != "Memphis" { + if !slices.IsSorted[[]string](sorted) { t.Fatal("unexpected sort order for result: ", sorted) } } @@ -186,7 +186,11 @@ func TestSortSliceByIndex(t *testing.T) { {"Memphis", "plane", "propeller"}, } sorted := op(context.TODO(), data) - if sorted[0][0] != "BigFoot" && sorted[1][0] != "Enola" && sorted[2][0] != "Memphis" { + var col []string + for _, row := range sorted { + col = append(col, row[0]) + } + if !slices.IsSorted[[]string](col) { t.Fatal("unexpected sort order for result: ", sorted) } } @@ -208,8 +212,12 @@ func TestSortByStructField(t *testing.T) { } sorted := op(context.TODO(), data) fmt.Printf("sorted: %v\n", 
sorted) - if sorted[0].Vehicle != "BigFoot" && sorted[1].Vehicle != "Enola" && sorted[2].Vehicle != "Memphis" { - t.Fatal("Unexpected sort order") + var col []string + for _, row := range sorted { + col = append(col, row.Vehicle) + } + if !slices.IsSorted[[]string](col) { + t.Fatal("unexpected sort order for result: ", sorted) } } @@ -223,9 +231,12 @@ func TestSortByMapKey(t *testing.T) { {"Vehicle": "Memphis", "Kind": "plane", "Engine": "propeller"}, } sorted := op(context.TODO(), data) - - if sorted[0]["Vehicle"] != "BigFoot" && sorted[1]["Vehicle"] != "Enola" && sorted[2]["Vehicle"] != "Memphis" { - t.Fatal("Unexpected sort order") + var col []string + for _, row := range sorted { + col = append(col, row["Vehicle"]) + } + if !slices.IsSorted[[]string](col) { + t.Fatal("unexpected sort order for result: ", sorted) } } diff --git a/operators/executor/batch_funcs.go b/operators/executor/batch_funcs.go index 2a972b0..e72c404 100644 --- a/operators/executor/batch_funcs.go +++ b/operators/executor/batch_funcs.go @@ -55,10 +55,10 @@ func SortByStructField[SLICE ~[]ITEM, ITEM any](name string) *FuncExecutor[SLICE return Execute(batch.SortByStructFieldFunc[SLICE](name)) } -func SortByMapKey[MAPS ~[]map[K]V, K comparable, V any](key K) *FuncExecutor[MAPS, MAPS] { +func SortByMapKey[MAPS ~[]map[K]V, K comparable, V cmp.Ordered](key K) *FuncExecutor[MAPS, MAPS] { return Execute(batch.SortByMapKeyFunc[MAPS](key)) } func SortWithFunc[SLICE ~[]ITEM, ITEM any](f func(i, j ITEM) int) *FuncExecutor[SLICE, SLICE] { return Execute(batch.SortWithFuncFunc[SLICE](f)) -} \ No newline at end of file +} diff --git a/stream/stream_batch_test.go b/stream/stream_batch_test.go index c9c255e..2d256da 100644 --- a/stream/stream_batch_test.go +++ b/stream/stream_batch_test.go @@ -1,7 +1,10 @@ package stream import ( + "cmp" "context" + "fmt" + "slices" "sync/atomic" "testing" "time" @@ -320,242 +323,188 @@ func TestStreamBatch_SumAll2D(t *testing.T) { } } -// func TestStreamBatch_Sort(t 
*testing.T) { -// src := emitters.Slice([]int{12742, 4879, 50724, 116464, 12104}) - -// snk := collectors.Slice() -// strm := From(src).Flow( -// batch.New(batch.TriggerNone[int](nil)), -// batch.SortSliceByIndexFunc[]() -// ).Into(snk) - -// select { -// case err := <-strm.Open(): -// if err != nil { -// t.Fatal(err) -// } -// result := snk.Get()[0].([]int) -// if result[0] != 4879 && result[1] != 12104 && result[2] != 12742 && result[3] != 50724 { -// t.Fatal("unexpected sort order", result) -// } -// case <-time.After(10 * time.Millisecond): -// t.Fatal("Took too long") -// } -// } - -// func TestStream_SortByKey(t *testing.T) { -// src := emitters.Slice([]map[string]string{ -// {"Name": "Mercury", "Diameter": "4879"}, -// {"Name": "Venus", "Diameter": "12104"}, -// {"Name": "Uranus", "Diameter": "50724"}, -// {"Name": "Saturn", "Diameter": "116464"}, -// {"Name": "Earth", "Diameter": "12742"}, -// }) - -// snk := collectors.Slice() -// strm := New(src).Batch().SortByKey("Name").Into(snk) - -// select { -// case err := <-strm.Open(): -// if err != nil { -// t.Fatal(err) -// } -// result := snk.Get()[0].([]map[string]string) -// if result[0]["Name"] != "Earth" && result[1]["Name"] != "Mercury" && result[2]["Name"] != "Saturn" { -// t.Fatal("unexpected sort order", result) -// } -// case <-time.After(10 * time.Millisecond): -// t.Fatal("Took too long") -// } -// } - -// func TestStream_SortByName(t *testing.T) { -// src := emitters.Slice([]struct { -// Name string -// Diameter int -// }{ -// {Name: "Mercury", Diameter: 4879}, -// {Name: "Venus", Diameter: 12104}, -// {Name: "Uranus", Diameter: 50724}, -// {Name: "Saturn", Diameter: 116464}, -// {Name: "Earth", Diameter: 12742}, -// }) - -// snk := collectors.Slice() -// strm := New(src).Batch().SortByName("Name").Into(snk) - -// select { -// case err := <-strm.Open(): -// if err != nil { -// t.Fatal(err) -// } -// result := snk.Get()[0].([]struct { -// Name string -// Diameter int -// }) -// if result[0].Name != 
"Earth" && result[1].Name != "Mercury" && result[2].Name != "Saturn" { -// t.Fatal("unexpected sort order", result) -// } -// case <-time.After(10 * time.Millisecond): -// t.Fatal("Took too long") -// } -// } - -// func TestStream_SortByPos(t *testing.T) { -// src := emitters.Slice([][]string{ -// {"Mercury", "4879"}, -// {"Venus", "12104"}, -// {"Uranus", "50724"}, -// {"Saturn", "116464"}, -// {"Earth", "12742"}, -// }) - -// snk := collectors.Slice() -// strm := New(src).Batch().SortByPos(0).Into(snk) - -// select { -// case err := <-strm.Open(): -// if err != nil { -// t.Fatal(err) -// } -// result := snk.Get()[0].([][]string) -// if result[0][0] != "Earth" && result[1][0] != "Mercury" && result[2][0] != "Saturn" { -// t.Fatal("unexpected sort order", result) -// } -// case <-time.After(10 * time.Millisecond): -// t.Fatal("Took too long") -// } -// } - -// func TestStream_SortWith(t *testing.T) { -// src := emitters.Slice([]string{ -// "Mercury", -// "Venus", -// "Uranus", -// "Saturn", -// "Earth", -// }) - -// snk := collectors.Slice() -// strm := New(src).Batch().SortWith(func(batch interface{}, i, j int) bool { -// items := batch.([]string) -// return items[i] < items[j] -// }) -// strm = strm.Into(snk) - -// select { -// case err := <-strm.Open(): -// if err != nil { -// t.Fatal(err) -// } -// result := snk.Get()[0].([]string) -// if result[0] != "Earth" && result[1] != "Mercury" && result[2] != "Saturn" { -// t.Fatal("unexpected sort order", result) -// } -// case <-time.After(10 * time.Millisecond): -// t.Fatal("Took too long") -// } -// } - -// func TestStream_Sum(t *testing.T) { -// src := emitters.Slice([]int{ -// 4879, -// 12104, -// 50724, -// 116464, -// 12742, -// }) - -// snk := collectors.Slice() -// strm := New(src).Batch().Sum().Into(snk) - -// select { -// case err := <-strm.Open(): -// if err != nil { -// t.Fatal(err) -// } -// result := snk.Get()[0].(float64) -// if result <= 116464 { -// t.Fatal("unexpected result:", result) -// } -// case 
<-time.After(10 * time.Millisecond): -// t.Fatal("Took too long") -// } -// } - -// func TestStream_SumByKey(t *testing.T) { -// src := emitters.Slice([]map[string]int{ -// {"Diameter": 4879}, -// {"Diameter": 12104}, -// {"Diameter": 50724}, -// {"Diameter": 116464}, -// {"Diameter": 12742}, -// }) - -// snk := collectors.Slice() -// strm := New(src).Batch().SumByKey("Diameter").Into(snk) - -// select { -// case err := <-strm.Open(): -// if err != nil { -// t.Fatal(err) -// } -// result := snk.Get()[0].([]map[interface{}]float64) -// t.Log("Sum calculated:", result[0]["Diameter"]) -// if result[0]["Diameter"] <= 116464 { -// t.Fatal("unexpected result:", result[0]) -// } -// case <-time.After(10 * time.Millisecond): -// t.Fatal("Took too long") -// } -// } - -// func TestStream_SumByName(t *testing.T) { -// src := emitters.Slice([]struct{ Diam int }{ -// {Diam: 4879}, -// {Diam: 12104}, -// {Diam: 50724}, -// {Diam: 116464}, -// {Diam: 12742}, -// }) - -// snk := collectors.Slice() -// strm := New(src).Batch().SumByName("Diam").Into(snk) - -// select { -// case err := <-strm.Open(): -// if err != nil { -// t.Fatal(err) -// } -// result := snk.Get()[0].([]map[string]float64) -// t.Log("Sum calculated:", result[0]) -// if result[0]["Diam"] <= 116464 { -// t.Fatal("unexpected result:", result[0]) -// } -// case <-time.After(10 * time.Millisecond): -// t.Fatal("Took too long") -// } -// } - -// func TestStream_SumByPos(t *testing.T) { -// src := emitters.Slice([][]int{ -// {4879, 12104, 50724, 116464, 12742}, -// {1, -1, 0, 1, 0}, -// }) - -// snk := collectors.Slice() -// strm := New(src).Batch().SumByPos(3).Into(snk) - -// select { -// case err := <-strm.Open(): -// if err != nil { -// t.Fatal(err) -// } -// result := snk.Get()[0].([]map[int]float64) -// if result[0][3] <= 116464 { -// t.Fatal("unexpected result:", result[0]) -// } -// case <-time.After(10 * time.Millisecond): -// t.Fatal("Took too long") -// } -// } +func TestStreamBatch_SortSlice(t *testing.T) { + 
src := emitters.Slice([]string{"Spirit", "Voyager", "BigFoot", "Enola", "Memphis"}) + + strm := From(src).Flow( + // Batch(with incoming type T), will output []T + batch.New(batch.TriggerNone[string]()), + // SortSlice receives []T, outputs the same slice sorted + executor.SortSlice[[]string](), + ) + + // setup sink to collect data + strm.Into(collectors.Func(func(sorted []string) error { + if !slices.IsSorted[[]string](sorted) { + t.Fatal("Data is not sorted: ", sorted) + } + return nil + })) + + select { + case err := <-strm.Open(context.Background()): + if err != nil { + t.Fatal(err) + } + case <-time.After(10 * time.Millisecond): + t.Fatal("Took too long") + } +} + +func TestStreamBatch_SortSliceByIndex(t *testing.T) { + src := emitters.Slice([][]string{ + {"Spirit", "plane", "propeller"}, + {"Voyager", "satellite", "gravitational"}, + {"BigFoot", "truck", "diesel"}, + {"Enola", "plane", "propeller"}, + {"Memphis", "plane", "propeller"}, + }) + + strm := From(src).Flow( + // Batch(with incoming type T), will output []T + batch.New(batch.TriggerNone[[]string]()), + // SortSliceByIndex receives the batched rows, outputs them sorted by the value at index 0 + executor.SortSliceByIndex[[][]string](0), + ) + + // setup sink to collect data + strm.Into(collectors.Func(func(sorted [][]string) error { + var col []string + for _, row := range sorted { + col = append(col, row[0]) + } + fmt.Println(col) + if !slices.IsSorted[[]string](col) { + t.Fatal("Data is not sorted: ", col) + } + return nil + })) + + select { + case err := <-strm.Open(context.Background()): + if err != nil { + t.Fatal(err) + } + case <-time.After(10 * time.Millisecond): + t.Fatal("Took too long") + } +} + +func TestStreamBatch_SortSliceByStructField(t *testing.T) { + type vehicle struct { + Vehicle, Kind, Engine string + Size int + } + + src := emitters.Slice([]vehicle{ + {"Spirit", "plane", "propeller", 12}, + {"Voyager", "satellite", "gravitational", 8}, + {"BigFoot", "truck", "diesel", 8}, + {"Enola", "plane", "propeller", 12}, + 
{"Memphis", "plane", "propeller", 48}, + }) + + strm := From(src).Flow( + // Batch(with incoming type T), will output []T + batch.New(batch.TriggerNone[vehicle]()), + // SortByStructField receives []T, outputs it sorted by the "Vehicle" field + executor.SortByStructField[[]vehicle]("Vehicle"), + ) + + // setup sink to collect data + strm.Into(collectors.Func(func(sorted []vehicle) error { + var col []string + for _, row := range sorted { + col = append(col, row.Vehicle) + } + fmt.Println(col) + if !slices.IsSorted[[]string](col) { + t.Fatal("Data is not sorted: ", col) + } + return nil + })) + + select { + case err := <-strm.Open(context.Background()): + if err != nil { + t.Fatal(err) + } + case <-time.After(10 * time.Millisecond): + t.Fatal("Took too long") + } +} + +func TestStreamBatch_SortByMapKey(t *testing.T) { + src := emitters.Slice([]map[string]string{ + {"Vehicle": "Spirit", "Kind": "plane", "Engine": "propeller"}, + {"Vehicle": "Voyager", "Kind": "satellite", "Engine": "gravitational"}, + {"Vehicle": "BigFoot", "Kind": "truck", "Engine": "diesel"}, + {"Vehicle": "Enola", "Kind": "plane", "Engine": "propeller"}, + {"Vehicle": "Memphis", "Kind": "plane", "Engine": "propeller"}, + }) + + strm := From(src).Flow( + // Batch(with incoming type T), will output []T + batch.New(batch.TriggerNone[map[string]string]()), + // SortByMapKey receives []T, outputs it sorted by the value stored under the "Vehicle" key + executor.SortByMapKey[[]map[string]string]("Vehicle"), + ) + + // setup sink to collect data + strm.Into(collectors.Func(func(sorted []map[string]string) error { + var col []string + for _, row := range sorted { + col = append(col, row["Vehicle"]) + } + fmt.Println(col) + if !slices.IsSorted[[]string](col) { + t.Fatal("Data is not sorted: ", col) + } + return nil + })) + + select { + case err := <-strm.Open(context.Background()): + if err != nil { + t.Fatal(err) + } + case <-time.After(10 * time.Millisecond): + t.Fatal("Took too long") + } +} + +func TestStreamBatch_SortWithFunc(t *testing.T) { + src := 
emitters.Slice([]string{ + "Spririt", + "Voyager", + "BigFoot", + "Enola", + "Memphis", + }) + + strm := From(src).Flow( + // Batch(with incoming type T), will output []T + batch.New(batch.TriggerNone[string]()), + // SortWithFunc receives []T, outputs it sorted using the supplied compare function + executor.SortWithFunc[[]string](func(i, j string) int { + return cmp.Compare(i, j) + }), + ) + + // setup sink to collect data + strm.Into(collectors.Func(func(sorted []string) error { + if !slices.IsSorted[[]string](sorted) { + t.Fatal("Data is not sorted: ", sorted) + } + return nil + })) + + select { + case err := <-strm.Open(context.Background()): + if err != nil { + t.Fatal(err) + } + case <-time.After(10 * time.Millisecond): + t.Fatal("Took too long") + } +}