Skip to content

Commit

Permalink
Revising sources/sinks
Browse files Browse the repository at this point in the history
  • Loading branch information
vladimirvivien committed Jan 1, 2025
1 parent 579140a commit f601257
Show file tree
Hide file tree
Showing 19 changed files with 134 additions and 247 deletions.
8 changes: 5 additions & 3 deletions api/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@ package api
import "errors"

var (
ErrSourceEmpty = errors.New("source is empty")
ErrStreamEmpty = errors.New("stream is empty")
ErrSinkEmpty = errors.New("sink is empty")
ErrSourceEmpty = errors.New("source is empty")
ErrStreamEmpty = errors.New("stream is empty")
ErrSinkEmpty = errors.New("sink is empty")
ErrInputChannelUndefined = errors.New("undefined input channel")
ErrSourceUndefined = errors.New("source undefined")
)
89 changes: 19 additions & 70 deletions collectors/csv.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,67 +3,66 @@ package collectors
import (
"context"
"encoding/csv"
"errors"
"fmt"
"io"
"os"

"github.com/vladimirvivien/automi/api"
autoctx "github.com/vladimirvivien/automi/api/context"
"github.com/vladimirvivien/automi/util"
)

// CsvCollector represents a node that can collect items streamed as
// type []string and write them as comma-separated values to the specified
// io.Writer or file.
type CsvCollector struct {
// CSVSink collects streamed items []string and uses an io.Writer to write
// them to a subsequent sink resource.
type CSVSink struct {
delimChar rune // delimiter character
headers []string // optional csv headers

snkParam interface{}
file *os.File
input <-chan interface{}
input <-chan []string
snkWriter io.Writer
csvWriter *csv.Writer
logf api.LogFunc
errf api.ErrorFunc
}

// CSV creates a *CsvCollector value
func CSV(sink interface{}) *CsvCollector {
csv := &CsvCollector{
snkParam: sink,
// CSV creates a *CSVSource
func CSV(writer io.Writer) *CSVSink {
csv := &CSVSink{
delimChar: ',',
snkWriter: writer,
}
return csv
}

// DelimChar sets the character to use as delimiter for the
// collected csv items.
func (c *CsvCollector) DelimChar(char rune) *CsvCollector {
func (c *CSVSink) DelimChar(char rune) *CSVSink {
c.delimChar = char
return c
}

// Headers sets the header columns for the CSV items collected
func (c *CsvCollector) Headers(headers []string) *CsvCollector {
func (c *CSVSink) Headers(headers []string) *CSVSink {
c.headers = headers
return c
}

// SetInput sets the channel input
func (c *CsvCollector) SetInput(in <-chan interface{}) {
func (c *CSVSink) SetInput(in <-chan []string) {
c.input = in
}

// init initializes the components
func (c *CsvCollector) init(ctx context.Context) error {
func (c *CSVSink) init(ctx context.Context) error {
//extract log function
c.logf = autoctx.GetLogFunc(ctx)
c.errf = autoctx.GetErrFunc(ctx)

if c.snkWriter == nil {
return api.ErrSourceUndefined
}

if c.input == nil {
return fmt.Errorf("Input attribute not set")
return api.ErrInputChannelUndefined
}

util.Logfn(c.logf, "Opening csv collector")
Expand All @@ -73,10 +72,6 @@ func (c *CsvCollector) init(ctx context.Context) error {
c.delimChar = ','
}

if err := c.setupSink(); err != nil {
return err
}

c.csvWriter = csv.NewWriter(c.snkWriter)
c.csvWriter.Comma = c.delimChar

Expand All @@ -90,7 +85,7 @@ func (c *CsvCollector) init(ctx context.Context) error {
}

// Open is the starting point that opens the sink for data to start flowing
func (c *CsvCollector) Open(ctx context.Context) <-chan error {
func (c *CSVSink) Open(ctx context.Context) <-chan error {
result := make(chan error)
if err := c.init(ctx); err != nil {
go func() { result <- err }()
Expand All @@ -109,15 +104,6 @@ func (c *CsvCollector) Open(ctx context.Context) <-chan error {
return
}

// close file
if c.file != nil {
if e := c.file.Close(); e != nil {
util.Logfn(c.logf, e)
autoctx.Err(c.errf, api.Error(e.Error()))
go func() { result <- e }()
return
}
}
close(result)
}()

Expand All @@ -127,16 +113,8 @@ func (c *CsvCollector) Open(ctx context.Context) <-chan error {
if !opened {
return
}
data, ok := item.([]string)

if !ok { // bad situation, fail fast
msg := fmt.Sprintf("expecting []string, got unexpected type %T", data)
util.Logfn(c.logf, msg)
autoctx.Err(c.errf, api.Error(msg))
panic(msg)
}

if e := c.csvWriter.Write(data); e != nil {
if e := c.csvWriter.Write(item); e != nil {
//TODO distinguish error values for better handling
perr := fmt.Errorf("Unable to write record to file: %s ", e)
util.Logfn(c.logf, perr)
Expand All @@ -160,32 +138,3 @@ func (c *CsvCollector) Open(ctx context.Context) <-chan error {

return result
}

func (c *CsvCollector) setupSink() error {
if c.snkParam == nil {
return errors.New("missing CSV sink")
}
if wtr, ok := c.snkParam.(io.Writer); ok {
util.Logfn(c.logf, "CSV sink to io.Writer")
c.snkWriter = wtr
}

if wtr, ok := c.snkParam.(*os.File); ok {
util.Logfn(c.logf, fmt.Sprintf("CSV sink to file %s", wtr.Name()))
c.snkWriter = wtr
}

if wtr, ok := c.snkParam.(string); ok {
f, err := os.Create(wtr)
if err != nil {
return err
}
util.Logfn(c.logf, fmt.Sprintf("CSV sink to file %s", wtr))
c.snkWriter = f
c.file = f // so we can close it
}
if c.snkWriter == nil {
return errors.New("invalid CSV sink")
}
return nil
}
25 changes: 12 additions & 13 deletions collectors/csv_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,21 +3,19 @@ package collectors
import (
"bytes"
"context"
"io/ioutil"
"os"
"strings"
"testing"
"time"

"github.com/vladimirvivien/automi/testutil"
"github.com/vladimirvivien/gexe"
)

func TestCsvCollector_New(t *testing.T) {
func TestNewCSVSource(t *testing.T) {
data := bytes.NewBufferString("")
csv := CSV(data).DelimChar('|').Headers([]string{"a", "b"})
if csv.snkParam == nil {
t.Fatal("CsvSnk not setting input writer")
}

if csv.delimChar != '|' {
t.Fatal("csv collectors not setting delim char")
}
Expand All @@ -26,8 +24,8 @@ func TestCsvCollector_New(t *testing.T) {
}
}

func TestCsvCollector_IO(t *testing.T) {
in := make(chan interface{})
func TestCSVSinkToWriter(t *testing.T) {
in := make(chan []string)
go func() {
in <- []string{"Christophe", "Petion", "Dessaline"}
in <- []string{"Toussaint", "Guerrier", "Caiman"}
Expand All @@ -54,21 +52,22 @@ func TestCsvCollector_IO(t *testing.T) {
}
}

func TestCsvCollector_File(t *testing.T) {
in := make(chan interface{})
func TestCSVSinkToFile(t *testing.T) {
filePath := "./testdata/csv-test.out"
in := make(chan []string)
go func() {
in <- []string{"Christophe", "Petion", "Dessaline"}
in <- []string{"Toussaint", "Guerrier", "Caiman"}
close(in)
}()

f, err := os.Create("./csv-test.out")
f, err := os.Create(filePath)
if err != nil {
t.Fatal(err)
}
defer func() {
f.Close()
os.Remove("./csv-test.out")
os.Remove(filePath)
}()
csv := CSV(f)
csv.SetInput(in)
Expand All @@ -84,7 +83,7 @@ func TestCsvCollector_File(t *testing.T) {
}

expected := "Christophe,Petion,Dessaline\nToussaint,Guerrier,Caiman"
data, err := ioutil.ReadFile("./csv-test.out")
data := gexe.FileRead(filePath).String()
if err != nil {
t.Fatal(err)
}
Expand All @@ -104,7 +103,7 @@ func BenchmarkCsvCollector(b *testing.B) {
}
return N - int(float64(0.5)*float64(N))
}()
in := make(chan interface{}, chanSize)
in := make(chan []string, chanSize)
b.Log("Created chan size ", chanSize)
go func() {
in <- []string{"col1", "col2", "col3"}
Expand Down
33 changes: 12 additions & 21 deletions collectors/func.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,49 +9,40 @@ import (
"github.com/vladimirvivien/automi/util"
)

// CollectorFunc is a function used to colllect
// incoming stream data. It can be used as a
// stream sink.
type CollectorFunc func(interface{}) error

// FuncCollector is a collector that uses a function
// to collect data. The specified function must be
// of type:
// CollectorFunc
type FuncCollector struct {
input <-chan interface{}
// FuncSink uses a function to collect streamed items of type T.
type FuncSink[T any] struct {
input <-chan T
logf api.LogFunc
errf api.ErrorFunc
f CollectorFunc
f func(T) error
}

// Func creates a new value *FuncCollector that
// will use the specified function parameter to
// collect streaming data.
func Func(f CollectorFunc) *FuncCollector {
return &FuncCollector{f: f}
// Func creates a new *FuncSink using the specified function
// as its parameter.
func Func[T any](f func(T)error) *FuncSink[T] {
return &FuncSink[T]{f: f}
}

// SetInput sets the channel input
func (c *FuncCollector) SetInput(in <-chan interface{}) {
func (c *FuncSink[T]) SetInput(in <-chan T) {
c.input = in
}

// Open is the starting point that starts the collector
func (c *FuncCollector) Open(ctx context.Context) <-chan error {
func (c *FuncSink[T]) Open(ctx context.Context) <-chan error {
c.logf = autoctx.GetLogFunc(ctx)
c.errf = autoctx.GetErrFunc(ctx)

util.Logfn(c.logf, "Opening func collector")
result := make(chan error)

if c.input == nil {
go func() { result <- errors.New("Func collector missing input") }()
go func() { result <- api.ErrInputChannelUndefined }()
return result
}

if c.f == nil {
err := errors.New("Func collector missing function")
err := errors.New("missing function")
util.Logfn(c.logf, err)
autoctx.Err(c.errf, api.Error(err.Error()))
go func() { result <- err }()
Expand Down
10 changes: 5 additions & 5 deletions collectors/func_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,13 @@ import (
"time"
)

func TestCollector_Func(t *testing.T) {
func TestFuncSink(t *testing.T) {
count := 0
f := Func(func(val interface{}) error {
f := Func[string](func(val string) error {
count++
return nil
})
in := make(chan interface{})
in := make(chan string)
go func() {
in <- "String 1"
in <- "String 2"
Expand All @@ -34,8 +34,8 @@ func TestCollector_Func(t *testing.T) {
}
}

func TestCollector_FuncErr(t *testing.T) {
f := Func(nil)
func TestFuncSinkErr(t *testing.T) {
f := Func[string](nil)

select {
case err := <-f.Open(context.TODO()):
Expand Down
17 changes: 8 additions & 9 deletions collectors/null.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,25 +8,24 @@ import (
"github.com/vladimirvivien/automi/util"
)

// NullCollector represents a collector that terminates
// to a noop collector
type NullCollector struct {
input <-chan interface{}
// NullSink is noop sink
type NullSink[T any] struct {
input <-chan T
logf api.LogFunc
}

// Null creates the new value of the collector
func Null() *NullCollector {
return new(NullCollector)
// Null creates a new *NullSink with type T
func Null[T any]() *NullSink[T] {
return new(NullSink[T])
}

// SetInput sets the input source for the collector
func (s *NullCollector) SetInput(in <-chan interface{}) {
func (s *NullSink[T]) SetInput(in <-chan T) {
s.input = in
}

// Open opens the node to start collecting
func (s *NullCollector) Open(ctx context.Context) <-chan error {
func (s *NullSink[T]) Open(ctx context.Context) <-chan error {
result := make(chan error)
s.logf = autoctx.GetLogFunc(ctx)
util.Logfn(s.logf, "Opening null collector")
Expand Down
Loading

0 comments on commit f601257

Please sign in to comment.