# Pass Context through span processors (#6534)
## Which problem is this PR solving?
- Context is lost in the process; see #6491 (comment)

## Description of the changes
- Add Context argument to handler and span processor methods
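
To make the change concrete, here is a small, self-contained sketch (simplified stand-in types, not the actual Jaeger code) of the new calling pattern: the handler passes its request context as the first argument, and the processor reads request-scoped values (for example a tenant) from it instead of starting from `context.Background()`:

```go
package main

import (
	"context"
	"fmt"
)

// Span and Batch are simplified stand-ins for Jaeger's model types.
type Span struct{ OperationName string }

type Batch struct{ Spans []*Span }

// SpanProcessor mirrors the new interface shape: ctx comes first.
type SpanProcessor interface {
	ProcessSpans(ctx context.Context, batch Batch) ([]bool, error)
}

type tenantKey struct{}

// loggingProcessor reads a request-scoped value (a tenant id) from ctx,
// which is only possible because the caller's context is now passed in.
type loggingProcessor struct{}

func (loggingProcessor) ProcessSpans(ctx context.Context, batch Batch) ([]bool, error) {
	tenant, _ := ctx.Value(tenantKey{}).(string)
	oks := make([]bool, len(batch.Spans))
	for i, s := range batch.Spans {
		fmt.Printf("tenant=%q span=%q\n", tenant, s.OperationName)
		oks[i] = true
	}
	return oks, nil
}

func main() {
	// The handler layer attaches request-scoped values (as the gRPC and HTTP
	// handlers do with their own ctx / r.Context()) and passes the same
	// context down to the processor.
	ctx := context.WithValue(context.Background(), tenantKey{}, "acme")
	var p SpanProcessor = loggingProcessor{}
	if _, err := p.ProcessSpans(ctx, Batch{Spans: []*Span{{OperationName: "op1"}}}); err != nil {
		panic(err)
	}
}
```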

## How was this change tested?
- CI

---------

Signed-off-by: Yuri Shkuro <[email protected]>
yurishkuro authored Jan 13, 2025
1 parent f60bd09 commit 441c274
Showing 11 changed files with 47 additions and 33 deletions.
2 changes: 1 addition & 1 deletion cmd/collector/app/collector_test.go
@@ -263,7 +263,7 @@ func TestAggregator(t *testing.T) {
},
},
}
_, err := c.spanProcessor.ProcessSpans(processor.SpansV1{
_, err := c.spanProcessor.ProcessSpans(context.Background(), processor.SpansV1{
Spans: spans,
Details: processor.Details{
SpanFormat: processor.JaegerSpanFormat,
2 changes: 1 addition & 1 deletion cmd/collector/app/handler/grpc_handler.go
@@ -75,7 +75,7 @@ func (c *batchConsumer) consume(ctx context.Context, batch *model.Batch) error {
span.Process = batch.Process
}
}
_, err = c.spanProcessor.ProcessSpans(processor.SpansV1{
_, err = c.spanProcessor.ProcessSpans(ctx, processor.SpansV1{
Spans: batch.Spans,
Details: processor.Details{
InboundTransport: c.spanOptions.InboundTransport,
4 changes: 3 additions & 1 deletion cmd/collector/app/handler/grpc_handler_test.go
@@ -25,6 +25,8 @@ import (
"github.com/jaegertracing/jaeger/proto-gen/api_v2"
)

var _ processor.SpanProcessor = (*mockSpanProcessor)(nil)

type mockSpanProcessor struct {
expectedError error
mux sync.Mutex
@@ -34,7 +36,7 @@ type mockSpanProcessor struct {
spanFormat processor.SpanFormat
}

func (p *mockSpanProcessor) ProcessSpans(batch processor.Batch) ([]bool, error) {
func (p *mockSpanProcessor) ProcessSpans(_ context.Context, batch processor.Batch) ([]bool, error) {
p.mux.Lock()
defer p.mux.Unlock()
batch.GetSpans(func(spans []*model.Span) {
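The `var _ processor.SpanProcessor = (*mockSpanProcessor)(nil)` lines added to the test files are compile-time assertions: if a mock drifts from the (now changed) interface, the build fails at the assertion rather than at some distant call site. A minimal, generic illustration of the idiom (hypothetical types, unrelated to Jaeger):

```go
package main

import "fmt"

type Greeter interface {
	Greet(name string) string
}

type englishGreeter struct{}

// Compile-time check: this line stops compiling if *englishGreeter
// ever stops satisfying Greeter, e.g. after an interface change.
var _ Greeter = (*englishGreeter)(nil)

func (*englishGreeter) Greet(name string) string {
	return "hello, " + name
}

func main() {
	var g Greeter = &englishGreeter{}
	fmt.Println(g.Greet("jaeger"))
}
```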
2 changes: 1 addition & 1 deletion cmd/collector/app/handler/http_thrift_handler.go
@@ -75,7 +75,7 @@ func (aH *APIHandler) SaveSpan(w http.ResponseWriter, r *http.Request) {
}
batches := []*tJaeger.Batch{batch}
opts := SubmitBatchOptions{InboundTransport: processor.HTTPTransport}
if _, err = aH.jaegerBatchesHandler.SubmitBatches(batches, opts); err != nil {
if _, err = aH.jaegerBatchesHandler.SubmitBatches(r.Context(), batches, opts); err != nil {
http.Error(w, fmt.Sprintf("Cannot submit Jaeger batch: %v", err), http.StatusInternalServerError)
return
}
7 changes: 5 additions & 2 deletions cmd/collector/app/handler/http_thrift_handler_test.go
@@ -25,15 +25,18 @@ import (
"github.com/jaegertracing/jaeger/thrift-gen/jaeger"
)

var httpClient = &http.Client{Timeout: 2 * time.Second}
var (
httpClient = &http.Client{Timeout: 2 * time.Second}
_ JaegerBatchesHandler = (*mockJaegerHandler)(nil)
)

type mockJaegerHandler struct {
err error
mux sync.Mutex
batches []*jaeger.Batch
}

func (p *mockJaegerHandler) SubmitBatches(batches []*jaeger.Batch, _ SubmitBatchOptions) ([]*jaeger.BatchSubmitResponse, error) {
func (p *mockJaegerHandler) SubmitBatches(_ context.Context, batches []*jaeger.Batch, _ SubmitBatchOptions) ([]*jaeger.BatchSubmitResponse, error) {
p.mux.Lock()
defer p.mux.Unlock()
p.batches = append(p.batches, batches...)
14 changes: 8 additions & 6 deletions cmd/collector/app/handler/thrift_span_handler.go
@@ -5,6 +5,8 @@
package handler

import (
"context"

"go.uber.org/zap"

"github.com/jaegertracing/jaeger/cmd/collector/app/processor"
@@ -24,13 +26,13 @@ type SubmitBatchOptions struct {
// ZipkinSpansHandler consumes and handles zipkin spans
type ZipkinSpansHandler interface {
// SubmitZipkinBatch records a batch of spans in Zipkin Thrift format
SubmitZipkinBatch(spans []*zipkincore.Span, options SubmitBatchOptions) ([]*zipkincore.Response, error)
SubmitZipkinBatch(ctx context.Context, spans []*zipkincore.Span, options SubmitBatchOptions) ([]*zipkincore.Response, error)
}

// JaegerBatchesHandler consumes and handles Jaeger batches
type JaegerBatchesHandler interface {
// SubmitBatches records a batch of spans in Jaeger Thrift format
SubmitBatches(batches []*jaeger.Batch, options SubmitBatchOptions) ([]*jaeger.BatchSubmitResponse, error)
SubmitBatches(ctx context.Context, batches []*jaeger.Batch, options SubmitBatchOptions) ([]*jaeger.BatchSubmitResponse, error)
}

type jaegerBatchesHandler struct {
@@ -46,15 +48,15 @@ func NewJaegerSpanHandler(logger *zap.Logger, modelProcessor processor.SpanProce
}
}

func (jbh *jaegerBatchesHandler) SubmitBatches(batches []*jaeger.Batch, options SubmitBatchOptions) ([]*jaeger.BatchSubmitResponse, error) {
func (jbh *jaegerBatchesHandler) SubmitBatches(ctx context.Context, batches []*jaeger.Batch, options SubmitBatchOptions) ([]*jaeger.BatchSubmitResponse, error) {
responses := make([]*jaeger.BatchSubmitResponse, 0, len(batches))
for _, batch := range batches {
mSpans := make([]*model.Span, 0, len(batch.Spans))
for _, span := range batch.Spans {
mSpan := jConv.ToDomainSpan(span, batch.Process)
mSpans = append(mSpans, mSpan)
}
oks, err := jbh.modelProcessor.ProcessSpans(processor.SpansV1{
oks, err := jbh.modelProcessor.ProcessSpans(ctx, processor.SpansV1{
Spans: mSpans,
Details: processor.Details{
InboundTransport: options.InboundTransport,
@@ -98,7 +100,7 @@ func NewZipkinSpanHandler(logger *zap.Logger, modelHandler processor.SpanProcess
}

// SubmitZipkinBatch records a batch of spans already in Zipkin Thrift format.
func (h *zipkinSpanHandler) SubmitZipkinBatch(spans []*zipkincore.Span, options SubmitBatchOptions) ([]*zipkincore.Response, error) {
func (h *zipkinSpanHandler) SubmitZipkinBatch(ctx context.Context, spans []*zipkincore.Span, options SubmitBatchOptions) ([]*zipkincore.Response, error) {
mSpans := make([]*model.Span, 0, len(spans))
convCount := make([]int, len(spans))
for i, span := range spans {
@@ -108,7 +110,7 @@ func (h *zipkinSpanHandler) SubmitZipkinBatch(spans []*zipkincore.Span, options
convCount[i] = len(converted)
mSpans = append(mSpans, converted...)
}
bools, err := h.modelProcessor.ProcessSpans(processor.SpansV1{
bools, err := h.modelProcessor.ProcessSpans(ctx, processor.SpansV1{
Spans: mSpans,
Details: processor.Details{
InboundTransport: options.InboundTransport,
12 changes: 8 additions & 4 deletions cmd/collector/app/handler/thrift_span_handler_test.go
@@ -5,6 +5,7 @@
package handler

import (
"context"
"encoding/json"
"errors"
"os"
@@ -36,7 +37,7 @@ func TestJaegerSpanHandler(t *testing.T) {
for _, tc := range testChunks {
logger := zap.NewNop()
h := NewJaegerSpanHandler(logger, &shouldIErrorProcessor{tc.expectedErr != nil})
res, err := h.SubmitBatches([]*jaeger.Batch{
res, err := h.SubmitBatches(context.Background(), []*jaeger.Batch{
{
Process: &jaeger.Process{ServiceName: "someServiceName"},
Spans: []*jaeger.Span{{SpanId: 21345}},
@@ -57,9 +58,12 @@ type shouldIErrorProcessor struct {
shouldError bool
}

var errTestError = errors.New("Whoops")
var (
_ processor.SpanProcessor = (*shouldIErrorProcessor)(nil)
errTestError = errors.New("Whoops")
)

func (s *shouldIErrorProcessor) ProcessSpans(batch processor.Batch) ([]bool, error) {
func (s *shouldIErrorProcessor) ProcessSpans(_ context.Context, batch processor.Batch) ([]bool, error) {
if s.shouldError {
return nil, errTestError
}
@@ -121,7 +125,7 @@ func TestZipkinSpanHandler(t *testing.T) {
},
}
}
res, err := h.SubmitZipkinBatch(spans, SubmitBatchOptions{})
res, err := h.SubmitZipkinBatch(context.Background(), spans, SubmitBatchOptions{})
if tc.expectedErr != nil {
assert.Nil(t, res)
assert.Equal(t, tc.expectedErr, err)
3 changes: 2 additions & 1 deletion cmd/collector/app/processor/processor.go
@@ -4,6 +4,7 @@
package processor

import (
"context"
"io"

"go.opentelemetry.io/collector/pdata/ptrace"
@@ -29,7 +30,7 @@ type Batch interface {
// SpanProcessor handles spans
type SpanProcessor interface {
// ProcessSpans processes spans and return with either a list of true/false success or an error
ProcessSpans(spans Batch) ([]bool, error)
ProcessSpans(ctx context.Context, spans Batch) ([]bool, error)
io.Closer
}

2 changes: 1 addition & 1 deletion cmd/collector/app/server/test.go
@@ -26,6 +26,6 @@ func (*mockSpanProcessor) Close() error {
return nil
}

func (*mockSpanProcessor) ProcessSpans(_ processor.Batch) ([]bool, error) {
func (*mockSpanProcessor) ProcessSpans(_ context.Context, _ processor.Batch) ([]bool, error) {
return []bool{}, nil
}
11 changes: 6 additions & 5 deletions cmd/collector/app/span_processor.go
@@ -36,6 +36,8 @@ const (
minRequiredChange = 1.2
)

var _ processor.SpanProcessor = (*spanProcessor)(nil)

type spanProcessor struct {
queue *queue.BoundedQueue[queueItem]
otelExporter exporter.Traces
@@ -239,19 +241,18 @@ func (sp *spanProcessor) countSpansInQueue(span *model.Span, _ string /* tenant
sp.spansProcessed.Add(1)
}

// TODO pass Context
func (sp *spanProcessor) ProcessSpans(batch processor.Batch) ([]bool, error) {
func (sp *spanProcessor) ProcessSpans(ctx context.Context, batch processor.Batch) ([]bool, error) {
// We call preProcessSpans on a batch, it's responsibility of implementation
// to understand v1/v2 distinction. Jaeger itself does not use pre-processors.
sp.preProcessSpans(batch)

var batchOks []bool
var batchErr error
batch.GetSpans(func(spans []*model.Span) {
batchOks, batchErr = sp.processSpans(batch, spans)
batchOks, batchErr = sp.processSpans(ctx, batch, spans)
}, func(traces ptrace.Traces) {
// TODO verify if the context will survive all the way to the consumer threads.
ctx := tenancy.WithTenant(context.Background(), batch.GetTenant())
ctx := tenancy.WithTenant(ctx, batch.GetTenant())

// the exporter will eventually call pushTraces from consumer threads.
if err := sp.otelExporter.ConsumeTraces(ctx, traces); err != nil {
@@ -266,7 +267,7 @@ func (sp *spanProcessor) ProcessSpans(batch processor.Batch) ([]bool, error) {
return batchOks, batchErr
}

func (sp *spanProcessor) processSpans(batch processor.Batch, spans []*model.Span) ([]bool, error) {
func (sp *spanProcessor) processSpans(_ context.Context, batch processor.Batch, spans []*model.Span) ([]bool, error) {
sp.metrics.BatchSize.Update(int64(len(spans)))
retMe := make([]bool, len(spans))

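The behavioural core of the span_processor.go change is the ptrace branch: the tenant used to be attached to `context.Background()`, which dropped whatever the caller had put into its context; it is now attached to the incoming `ctx` before `ConsumeTraces` is called. A self-contained sketch of the difference (the `withTenant` helper below is a stand-in for `tenancy.WithTenant`, and the value keys are hypothetical):

```go
package main

import (
	"context"
	"fmt"
)

type tenantKey struct{}
type requestIDKey struct{}

// withTenant stands in for tenancy.WithTenant: it layers the tenant onto parent.
func withTenant(parent context.Context, tenant string) context.Context {
	return context.WithValue(parent, tenantKey{}, tenant)
}

// consume stands in for the exporter's ConsumeTraces: it can only see
// what the context it receives actually carries.
func consume(ctx context.Context) {
	fmt.Printf("tenant=%v requestID=%v\n",
		ctx.Value(tenantKey{}), ctx.Value(requestIDKey{}))
}

func main() {
	// The caller's context carries a request-scoped value.
	caller := context.WithValue(context.Background(), requestIDKey{}, "req-42")

	// Old behaviour: parented on Background, the caller's value is lost.
	consume(withTenant(context.Background(), "acme")) // requestID=<nil>

	// New behaviour: parented on the incoming ctx, the caller's value survives.
	consume(withTenant(caller, "acme")) // requestID=req-42
}
```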
21 changes: 11 additions & 10 deletions cmd/collector/app/span_processor_test.go
@@ -99,13 +99,13 @@ func TestBySvcMetrics(t *testing.T) {
span := makeZipkinSpan(test.serviceName, test.rootSpan, test.debug)
sanitizer := zipkinsanitizer.NewChainedSanitizer(zipkinsanitizer.NewStandardSanitizers()...)
zHandler := handler.NewZipkinSpanHandler(logger, sp, sanitizer)
zHandler.SubmitZipkinBatch([]*zc.Span{span, span}, handler.SubmitBatchOptions{})
zHandler.SubmitZipkinBatch(context.Background(), []*zc.Span{span, span}, handler.SubmitBatchOptions{})
metricPrefix = "service"
format = "zipkin"
case processor.JaegerSpanFormat:
span, process := makeJaegerSpan(test.serviceName, test.rootSpan, test.debug)
jHandler := handler.NewJaegerSpanHandler(logger, sp)
jHandler.SubmitBatches([]*jaeger.Batch{
jHandler.SubmitBatches(context.Background(), []*jaeger.Batch{
{
Spans: []*jaeger.Span{
span,
@@ -248,6 +248,7 @@ func TestSpanProcessor(t *testing.T) {
require.NoError(t, err)

res, err := p.ProcessSpans(
context.Background(),
processor.SpansV1{
Spans: []*model.Span{{}}, // empty span should be enriched by sanitizers
Details: processor.Details{
@@ -280,7 +281,7 @@ func TestSpanProcessorErrors(t *testing.T) {
require.NoError(t, err)
p := pp.(*spanProcessor)

res, err := p.ProcessSpans(processor.SpansV1{
res, err := p.ProcessSpans(context.Background(), processor.SpansV1{
Spans: []*model.Span{
{
Process: &model.Process{
@@ -340,7 +341,7 @@ func TestSpanProcessorBusy(t *testing.T) {
w.Lock()
defer w.Unlock()

res, err := p.ProcessSpans(processor.SpansV1{
res, err := p.ProcessSpans(context.Background(), processor.SpansV1{
Spans: []*model.Span{
{
Process: &model.Process{
@@ -427,7 +428,7 @@ func TestSpanProcessorWithCollectorTags(t *testing.T) {
Spans: []*model.Span{span},
}
}
_, err = p.ProcessSpans(batch)
_, err = p.ProcessSpans(context.Background(), batch)
require.NoError(t, err)

require.Eventually(t, func() bool {
@@ -684,7 +685,7 @@ func TestAdditionalProcessors(t *testing.T) {
// nil doesn't fail
p, err := NewSpanProcessor(v1adapter.NewTraceWriter(w), nil, Options.QueueSize(1))
require.NoError(t, err)
res, err := p.ProcessSpans(processor.SpansV1{
res, err := p.ProcessSpans(context.Background(), processor.SpansV1{
Spans: []*model.Span{
{
Process: &model.Process{
@@ -707,7 +708,7 @@ }
}
p, err = NewSpanProcessor(v1adapter.NewTraceWriter(w), []ProcessSpan{f}, Options.QueueSize(1))
require.NoError(t, err)
res, err = p.ProcessSpans(processor.SpansV1{
res, err = p.ProcessSpans(context.Background(), processor.SpansV1{
Spans: []*model.Span{
{
Process: &model.Process{
@@ -732,7 +733,7 @@ func TestSpanProcessorContextPropagation(t *testing.T) {

dummyTenant := "context-prop-test-tenant"

res, err := p.ProcessSpans(processor.SpansV1{
res, err := p.ProcessSpans(context.Background(), processor.SpansV1{
Spans: []*model.Span{
{
Process: &model.Process{
@@ -777,7 +778,7 @@ func TestSpanProcessorWithOnDroppedSpanOption(t *testing.T) {
w.Lock()
defer w.Unlock()

_, err = p.ProcessSpans(processor.SpansV1{
_, err = p.ProcessSpans(context.Background(), processor.SpansV1{
Spans: []*model.Span{
{OperationName: "op1"},
},
@@ -794,7 +795,7 @@

// Now the queue is empty again and can accept one more item, but no workers available.
// If we send two items, the last one will have to be dropped.
_, err = p.ProcessSpans(processor.SpansV1{
_, err = p.ProcessSpans(context.Background(), processor.SpansV1{
Spans: []*model.Span{
{OperationName: "op2"},
{OperationName: "op3"},
