Skip to content

Commit

Permalink
[#51]: feature: new JOBS API
Browse files Browse the repository at this point in the history
  • Loading branch information
rustatian authored Jun 21, 2023
2 parents f803dca + 3c28da7 commit 8eca3b1
Show file tree
Hide file tree
Showing 9 changed files with 49 additions and 42 deletions.
6 changes: 3 additions & 3 deletions .github/workflows/linters.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,11 @@ jobs:
- name: Set up Go
uses: actions/setup-go@v4 # action page: <https://github.com/actions/setup-go>
with:
go-version: '1.20'
go-version: stable

- name: Run linter
uses: golangci/golangci-lint-action@v3.5.0 # Action page: <https://github.com/golangci/golangci-lint-action>
uses: golangci/golangci-lint-action@v3.6.0 # Action page: <https://github.com/golangci/golangci-lint-action>
with:
version: v1.51 # without patch version
version: v1.53 # without patch version
only-new-issues: false # show only new issues if it's a pull request
args: --timeout=10m --build-tags=race
2 changes: 0 additions & 2 deletions .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@ linters: # All available linters list: <https://golangci-lint.run/usage/linters/
enable:
- asciicheck # Simple linter to check that your code does not contain non-ASCII identifiers
- bodyclose # Checks whether HTTP response body is closed successfully
- depguard # Go linter that checks if package imports are in a list of acceptable packages
- dogsled # Checks assignments with too many blank identifiers (e.g. x, _, _, _, := f())
- dupl # Tool for code clone detection
- errcheck # Errcheck is a program for checking for unchecked errors in go programs. These unchecked errors can be critical bugs in some cases
Expand All @@ -53,7 +52,6 @@ linters: # All available linters list: <https://golangci-lint.run/usage/linters/
- exportloopref # checks for pointers to enclosing loop variables
- gochecknoglobals # Checks that no globals are present in Go code
- gochecknoinits # Checks that no init functions are present in Go code
- gocognit # Computes and checks the cognitive complexity of functions
- goconst # Finds repeated strings that could be replaced by a constant
- gocritic # The most opinionated Go source code linter
- gocyclo # Computes and checks the cyclomatic complexity of functions
Expand Down
4 changes: 2 additions & 2 deletions beanstalkjobs/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ func (cp *ConnPool) Delete(_ context.Context, id uint64) error {
return nil
}

func (cp *ConnPool) Stats(_ context.Context) (map[string]string, error) {
func (cp *ConnPool) Stats(context.Context) (map[string]string, error) {
cp.RLock()
defer cp.RUnlock()

Expand All @@ -143,9 +143,9 @@ func (cp *ConnPool) Stats(_ context.Context) (map[string]string, error) {
// Stop and close the connections
func (cp *ConnPool) Stop() {
cp.Lock()
defer cp.Unlock()
_ = cp.connTS.Load().Close()
_ = cp.connT.Load().Close()
cp.Unlock()
}

func (cp *ConnPool) redial() error {
Expand Down
17 changes: 8 additions & 9 deletions beanstalkjobs/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,7 @@ import (
"sync/atomic"
"time"

"github.com/roadrunner-server/api/v4/plugins/v1/jobs"
pq "github.com/roadrunner-server/api/v4/plugins/v1/priority_queue"
"github.com/roadrunner-server/api/v4/plugins/v2/jobs"
"github.com/roadrunner-server/errors"
"github.com/roadrunner-server/sdk/v4/utils"
jprop "go.opentelemetry.io/contrib/propagators/jaeger"
Expand All @@ -37,7 +36,7 @@ type Configurer interface {

type Driver struct {
log *zap.Logger
pq pq.Queue
pq jobs.Queue
consumeAll bool
tracer *sdktrace.TracerProvider
prop propagation.TextMapPropagator
Expand All @@ -60,7 +59,7 @@ type Driver struct {
stopCh chan struct{}
}

func FromConfig(tracer *sdktrace.TracerProvider, configKey string, log *zap.Logger, cfg Configurer, pipe jobs.Pipeline, pq pq.Queue) (*Driver, error) {
func FromConfig(tracer *sdktrace.TracerProvider, configKey string, log *zap.Logger, cfg Configurer, pipe jobs.Pipeline, pq jobs.Queue) (*Driver, error) {
const op = errors.Op("new_beanstalk_consumer")

if tracer == nil {
Expand Down Expand Up @@ -131,7 +130,7 @@ func FromConfig(tracer *sdktrace.TracerProvider, configKey string, log *zap.Logg
return jc, nil
}

func FromPipeline(tracer *sdktrace.TracerProvider, pipe jobs.Pipeline, log *zap.Logger, cfg Configurer, pq pq.Queue) (*Driver, error) {
func FromPipeline(tracer *sdktrace.TracerProvider, pipe jobs.Pipeline, log *zap.Logger, cfg Configurer, pq jobs.Queue) (*Driver, error) {
const op = errors.Op("new_beanstalk_consumer")

if tracer == nil {
Expand Down Expand Up @@ -192,7 +191,7 @@ func FromPipeline(tracer *sdktrace.TracerProvider, pipe jobs.Pipeline, log *zap.
return jc, nil
}

func (d *Driver) Push(ctx context.Context, jb jobs.Job) error {
func (d *Driver) Push(ctx context.Context, jb jobs.Message) error {
const op = errors.Op("beanstalk_push")
// check if the pipeline registered

Expand All @@ -201,8 +200,8 @@ func (d *Driver) Push(ctx context.Context, jb jobs.Job) error {

// load atomic value
pipe := *d.pipeline.Load()
if pipe.Name() != jb.Pipeline() {
return errors.E(op, errors.Errorf("no such pipeline: %s, actual: %s", jb.Pipeline(), pipe.Name()))
if pipe.Name() != jb.GroupID() {
return errors.E(op, errors.Errorf("no such pipeline: %s, actual: %s", jb.GroupID(), pipe.Name()))
}

err := d.handleItem(ctx, fromJob(jb))
Expand Down Expand Up @@ -352,7 +351,7 @@ func (d *Driver) Resume(ctx context.Context, p string) error {
func (d *Driver) handleItem(ctx context.Context, item *Item) error {
const op = errors.Op("beanstalk_handle_item")

d.prop.Inject(ctx, propagation.HeaderCarrier(item.Headers))
d.prop.Inject(ctx, propagation.HeaderCarrier(item.headers))

bb := new(bytes.Buffer)
bb.Grow(64)
Expand Down
24 changes: 14 additions & 10 deletions beanstalkjobs/item.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
"github.com/beanstalkd/go-beanstalk"
"github.com/goccy/go-json"
"github.com/google/uuid"
"github.com/roadrunner-server/api/v4/plugins/v1/jobs"
"github.com/roadrunner-server/api/v4/plugins/v2/jobs"
"github.com/roadrunner-server/sdk/v4/utils"
"go.uber.org/zap"
)
Expand All @@ -27,7 +27,7 @@ type Item struct {
// Payload is string data (usually JSON) passed to Job broker.
Payload string `json:"payload"`
// Headers with key-values pairs
Headers map[string][]string `json:"headers"`
headers map[string][]string
// Options contains set of PipelineOptions specific to job execution. Can be empty.
Options *Options `json:"options,omitempty"`
}
Expand Down Expand Up @@ -65,13 +65,17 @@ func (i *Item) Priority() int64 {
return i.Options.Priority
}

func (i *Item) GroupID() string {
return i.Options.Pipeline
}

// Body packs job payload into binary payload.
func (i *Item) Body() []byte {
return utils.AsBytes(i.Payload)
}

func (i *Item) Metadata() map[string][]string {
return i.Headers
func (i *Item) Headers() map[string][]string {
return i.headers
}

// Context packs job context (job, id) into binary payload.
Expand All @@ -89,7 +93,7 @@ func (i *Item) Context() ([]byte, error) {
ID: i.Ident,
Job: i.Job,
Driver: pluginName,
Headers: i.Headers,
Headers: i.headers,
Queue: i.Options.Queue,
Pipeline: i.Options.Pipeline,
},
Expand Down Expand Up @@ -119,7 +123,7 @@ func (i *Item) Nack() error {
func (i *Item) Requeue(headers map[string][]string, delay int64) error {
// overwrite the delay
i.Options.Delay = delay
i.Headers = headers
i.headers = headers

err := i.Options.requeueFn(context.Background(), i)
if err != nil {
Expand All @@ -139,16 +143,16 @@ func (i *Item) Respond(_ []byte, _ string) error {
return nil
}

func fromJob(job jobs.Job) *Item {
func fromJob(job jobs.Message) *Item {
return &Item{
Job: job.Name(),
Ident: job.ID(),
Payload: job.Payload(),
Headers: job.Headers(),
headers: job.Headers(),
Options: &Options{
AutoAck: job.AutoAck(),
Priority: job.Priority(),
Pipeline: job.Pipeline(),
Pipeline: job.GroupID(),
Delay: job.Delay(),
},
}
Expand All @@ -172,7 +176,7 @@ func (d *Driver) unpack(id uint64, data []byte, out *Item) error {
Job: auto,
Ident: uid,
Payload: utils.AsString(data),
Headers: make(map[string][]string, 2),
headers: make(map[string][]string, 2),
Options: &Options{
Priority: 10,
Pipeline: (*d.pipeline.Load()).Name(),
Expand Down
11 changes: 9 additions & 2 deletions beanstalkjobs/listen.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ func (d *Driver) listen() {
select {
case <-d.stopCh:
d.log.Debug("beanstalk listener stopped")
// remove all items associated with the pipeline
_ = d.pq.Remove((*d.pipeline.Load()).Name())
return
default:
id, body, err := d.pool.Reserve(d.reserveTimeout)
Expand All @@ -35,7 +37,7 @@ func (d *Driver) listen() {

item := &Item{}
err = d.unpack(id, body, item)
ctx := otel.GetTextMapPropagator().Extract(context.Background(), propagation.HeaderCarrier(item.Headers))
ctx := otel.GetTextMapPropagator().Extract(context.Background(), propagation.HeaderCarrier(item.headers))
ctx, span := d.tracer.Tracer(tracerName).Start(ctx, "beanstalk_listener")

if err != nil {
Expand Down Expand Up @@ -65,7 +67,12 @@ func (d *Driver) listen() {
}
}

d.prop.Inject(ctx, propagation.HeaderCarrier(item.Headers))
// check the header before injecting OTEL headers
if item.headers == nil {
item.headers = make(map[string][]string, 2)
}

d.prop.Inject(ctx, propagation.HeaderCarrier(item.headers))
// insert job into the priority queue
d.pq.Insert(item)
span.End()
Expand Down
6 changes: 3 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,10 @@ require (
github.com/cenkalti/backoff/v4 v4.2.1
github.com/goccy/go-json v0.10.2
github.com/google/uuid v1.3.0
github.com/roadrunner-server/api/v4 v4.3.2
github.com/roadrunner-server/api/v4 v4.5.0
github.com/roadrunner-server/endure/v2 v2.2.1
github.com/roadrunner-server/errors v1.2.0
github.com/roadrunner-server/sdk/v4 v4.2.6
github.com/roadrunner-server/sdk/v4 v4.3.0
go.opentelemetry.io/contrib/propagators/jaeger v1.17.0
go.opentelemetry.io/otel v1.16.0
go.opentelemetry.io/otel/sdk v1.16.0
Expand All @@ -25,5 +25,5 @@ require (
go.opentelemetry.io/otel/metric v1.16.0 // indirect
go.uber.org/atomic v1.11.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
golang.org/x/sys v0.8.0 // indirect
golang.org/x/sys v0.9.0 // indirect
)
14 changes: 7 additions & 7 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,17 @@ github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+
github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/roadrunner-server/api/v4 v4.3.2 h1:1zMfd2P+i9hTDFIsp9nEhCLNe+GmUrUqgnON8ZUO6Mc=
github.com/roadrunner-server/api/v4 v4.3.2/go.mod h1:HFb1kQ/H5UkD7MBNqi4L7hXQTtc919FcO8JKPqoSVzs=
github.com/roadrunner-server/api/v4 v4.5.0 h1:OUAcCwLeQbgRj7E2/6M6W2nxOnbG6XYPSS6LjW6COAQ=
github.com/roadrunner-server/api/v4 v4.5.0/go.mod h1:nzJvLrDMYT0K9hgPFmeL8dh6q2EvrJEaCHy2XRqz20c=
github.com/roadrunner-server/endure/v2 v2.2.1 h1:OkJUSd6+qqTcnl8in3bbyidEOmhO3B9uOVdR0avba28=
github.com/roadrunner-server/endure/v2 v2.2.1/go.mod h1:4eTAr3fASpdyqgFcbqVckOx68dZ4YPECecrcHvAuSdU=
github.com/roadrunner-server/errors v1.2.0 h1:qBmNXt8Iex9QnYTjCkbJKsBZu2EtYkQCM06GUDcQBbI=
github.com/roadrunner-server/errors v1.2.0/go.mod h1:z0ECxZp/dDa5RahtMcy4mBIavVxiZ9vwE5kByl7kFtY=
github.com/roadrunner-server/sdk/v4 v4.2.6 h1:BSQ+HHklszJKGCo91jqRwvgjhSkuz097cbPHMFAhIfo=
github.com/roadrunner-server/sdk/v4 v4.2.6/go.mod h1:WBLEsz9EMY6CkwpdeageMEPLevD/PaUf4rOOsBsaKlo=
github.com/roadrunner-server/sdk/v4 v4.3.0 h1:QdCK5kd/eLTnyLQsOOFtkd3CQK+NR4j/5PF5OpeHvA0=
github.com/roadrunner-server/sdk/v4 v4.3.0/go.mod h1:7LIxYOBo306SncMKUxEy9Xd3GQWYHW0G4zuDHnKZ3l4=
github.com/roadrunner-server/tcplisten v1.3.0 h1:VDd6IbP8oIjm5vKvMVozeZgeHgOcoP0XYLOyOqcZHCY=
github.com/roadrunner-server/tcplisten v1.3.0/go.mod h1:VR6Ob5am0oEuLMOeLiVvQxG9ShykAEgrlvZddX8EfoU=
github.com/stretchr/testify v1.8.3 h1:RP3t2pwF7cMEbC1dqtB6poj3niw/9gnV4Cjg5oW5gtY=
github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk=
go.opentelemetry.io/contrib/propagators/jaeger v1.17.0 h1:Zbpbmwav32Ea5jSotpmkWEl3a6Xvd4tw/3xxGO1i05Y=
go.opentelemetry.io/contrib/propagators/jaeger v1.17.0/go.mod h1:tcTUAlmO8nuInPDSBVfG+CP6Mzjy5+gNV4mPxMbL0IA=
go.opentelemetry.io/otel v1.16.0 h1:Z7GVAX/UkAXPKsy94IU+i6thsQS4nb7LviLpnaNeW8s=
Expand All @@ -46,6 +46,6 @@ go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0=
go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y=
go.uber.org/zap v1.24.0 h1:FiJd5l1UOLj0wCgbSE0rwwXHzEdAZS6hiiSnxJN/D60=
go.uber.org/zap v1.24.0/go.mod h1:2kMP+WWQ8aoFoedH3T2sq6iJ2yDWpHbP0f6MQbS9Gkg=
golang.org/x/sys v0.8.0 h1:EBmGv8NaZBZTWvrbjNoL6HVt+IVy3QDQpJs7VRIw3tU=
golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.9.0 h1:KS/R3tvhPqvJvwcKfnBHJwwthS11LRhmM5D59eEXa0s=
golang.org/x/sys v0.9.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
7 changes: 3 additions & 4 deletions plugin.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
package beanstalk

import (
"github.com/roadrunner-server/api/v4/plugins/v1/jobs"
pq "github.com/roadrunner-server/api/v4/plugins/v1/priority_queue"
"github.com/roadrunner-server/api/v4/plugins/v2/jobs"
"github.com/roadrunner-server/beanstalk/v4/beanstalkjobs"
"github.com/roadrunner-server/endure/v2/dep"
"github.com/roadrunner-server/errors"
Expand Down Expand Up @@ -56,11 +55,11 @@ func (p *Plugin) Collects() []*dep.In {
}

// DriverFromConfig constructs kafka driver from the .rr.yaml configuration
func (p *Plugin) DriverFromConfig(configKey string, pq pq.Queue, pipeline jobs.Pipeline, cmder chan<- jobs.Commander) (jobs.Driver, error) {
func (p *Plugin) DriverFromConfig(configKey string, pq jobs.Queue, pipeline jobs.Pipeline, _ chan<- jobs.Commander) (jobs.Driver, error) {
return beanstalkjobs.FromConfig(p.tracer, configKey, p.log, p.cfg, pipeline, pq)
}

// DriverFromPipeline constructs kafka driver from pipeline
func (p *Plugin) DriverFromPipeline(pipe jobs.Pipeline, pq pq.Queue, cmder chan<- jobs.Commander) (jobs.Driver, error) {
func (p *Plugin) DriverFromPipeline(pipe jobs.Pipeline, pq jobs.Queue, _ chan<- jobs.Commander) (jobs.Driver, error) {
return beanstalkjobs.FromPipeline(p.tracer, pipe, p.log, p.cfg, pq)
}

0 comments on commit 8eca3b1

Please sign in to comment.