diff --git a/op/stream/streamop.go b/op/stream/restream.go similarity index 75% rename from op/stream/streamop.go rename to op/stream/restream.go index 4dd3478..129aade 100644 --- a/op/stream/streamop.go +++ b/op/stream/restream.go @@ -2,7 +2,6 @@ package stream import ( "context" - "fmt" "reflect" "github.com/vladimirvivien/automi/api" @@ -11,39 +10,42 @@ import ( "github.com/vladimirvivien/automi/util" ) -// Operator is an operator takes streamed items of type -// map, array, or slice and unpacks and emits each item individually -// downstream. -type Operator struct { +// RestreamExecutor re-streams bulk items received from the input channel. +// Items are expected to be of types: +// +// []T, [...]T, or map[K]V +// +// Because of the multi-type support, this executor uses reflection. +type RestreamExecutor struct { input <-chan interface{} output chan interface{} logf api.LogFunc } // New creates a *Operator value -func New() *Operator { - r := new(Operator) +func New() *RestreamExecutor { + r := new(RestreamExecutor) r.output = make(chan interface{}, 1024) return r } // SetInput sets the input channel for the executor node -func (r *Operator) SetInput(in <-chan interface{}) { +func (r *RestreamExecutor) SetInput(in <-chan interface{}) { r.input = in } // GetOutput returns the output channel of the executer node -func (r *Operator) GetOutput() <-chan interface{} { +func (r *RestreamExecutor) GetOutput() <-chan interface{} { return r.output } // Exec is the execution starting point for the executor node. -func (r *Operator) Exec(ctx context.Context) (err error) { +func (r *RestreamExecutor) Exec(ctx context.Context) (err error) { r.logf = autoctx.GetLogFunc(ctx) util.Logfn(r.logf, "Stream operator starting") if r.input == nil { - err = fmt.Errorf("No input channel found") + err = api.ErrInputChannelUndefined return } @@ -87,6 +89,7 @@ func (r *Operator) Exec(ctx context.Context) (err error) { } } default: + // if items are not bundled, they are sent as is select { case r.output <- item: case <-exeCtx.Done(): diff --git a/op/stream/streamop_test.go b/op/stream/restream_test.go similarity index 99% rename from op/stream/streamop_test.go rename to op/stream/restream_test.go index ede60d4..aa24e54 100644 --- a/op/stream/streamop_test.go +++ b/op/stream/restream_test.go @@ -11,7 +11,6 @@ import ( func TestStreamOp_New(t *testing.T) { s := New() - if s.output == nil { t.Fatal("Missing output") }