Skip to content

Commit

Permalink
op/restream update
Browse files Browse the repository at this point in the history
  • Loading branch information
vladimirvivien committed Jan 4, 2025
1 parent 62541f1 commit f5c3f4e
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 12 deletions.
25 changes: 14 additions & 11 deletions op/stream/streamop.go → op/stream/restream.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package stream

import (
"context"
"fmt"
"reflect"

"github.com/vladimirvivien/automi/api"
Expand All @@ -11,39 +10,42 @@ import (
"github.com/vladimirvivien/automi/util"
)

// Operator is an operator takes streamed items of type
// map, array, or slice and unpacks and emits each item individually
// downstream.
type Operator struct {
// RestreamExecutor re-streams bulk items received from the input channel.
// Items are expected to be of types:
//
// []T, [...]T, or map[K]V
//
// Because of the multi-type support, this executor uses reflection.
type RestreamExecutor struct {
input <-chan interface{}
output chan interface{}
logf api.LogFunc
}

// New creates a *Operator value
func New() *Operator {
r := new(Operator)
func New() *RestreamExecutor {
r := new(RestreamExecutor)
r.output = make(chan interface{}, 1024)
return r
}

// SetInput sets the input channel for the executor node
func (r *Operator) SetInput(in <-chan interface{}) {
func (r *RestreamExecutor) SetInput(in <-chan interface{}) {
r.input = in
}

// GetOutput returns the output channel of the executer node
func (r *Operator) GetOutput() <-chan interface{} {
func (r *RestreamExecutor) GetOutput() <-chan interface{} {
return r.output
}

// Exec is the execution starting point for the executor node.
func (r *Operator) Exec(ctx context.Context) (err error) {
func (r *RestreamExecutor) Exec(ctx context.Context) (err error) {
r.logf = autoctx.GetLogFunc(ctx)
util.Logfn(r.logf, "Stream operator starting")

if r.input == nil {
err = fmt.Errorf("No input channel found")
err = api.ErrInputChannelUndefined
return
}

Expand Down Expand Up @@ -87,6 +89,7 @@ func (r *Operator) Exec(ctx context.Context) (err error) {
}
}
default:
// if items are not bundled, they are sent as is
select {
case r.output <- item:
case <-exeCtx.Done():
Expand Down
1 change: 0 additions & 1 deletion op/stream/streamop_test.go → op/stream/restream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (

func TestStreamOp_New(t *testing.T) {
s := New()

if s.output == nil {
t.Fatal("Missing output")
}
Expand Down

0 comments on commit f5c3f4e

Please sign in to comment.