From 3ca5bf6b5d90231f97abb42d535652864fc6a8f8 Mon Sep 17 00:00:00 2001 From: Bogdan Drutu Date: Fri, 10 Jan 2025 16:33:14 -0800 Subject: [PATCH] Add capability for memory and persistent queue to block when add items Signed-off-by: Bogdan Drutu --- .chloggen/add-blocking.yaml | 25 +++++++ .../exporterqueue/bounded_memory_queue.go | 5 +- .../bounded_memory_queue_test.go | 48 +++++++++++- exporter/exporterqueue/cond.go | 59 +++++++++++++++ exporter/exporterqueue/persistent_queue.go | 74 ++++++++++--------- .../exporterqueue/persistent_queue_test.go | 64 ++++++++++++++-- exporter/exporterqueue/sized_queue.go | 47 +++++++----- exporter/exporterqueue/sized_queue_test.go | 8 +- 8 files changed, 265 insertions(+), 65 deletions(-) create mode 100644 .chloggen/add-blocking.yaml create mode 100644 exporter/exporterqueue/cond.go diff --git a/.chloggen/add-blocking.yaml b/.chloggen/add-blocking.yaml new file mode 100644 index 00000000000..5b2d3895b7e --- /dev/null +++ b/.chloggen/add-blocking.yaml @@ -0,0 +1,25 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver) +component: exporterhelper + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Add capability for memory and persistent queue to block when add items + +# One or more tracking issues or pull requests related to the change +issues: [12074] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [api] diff --git a/exporter/exporterqueue/bounded_memory_queue.go b/exporter/exporterqueue/bounded_memory_queue.go index 7543958b2a1..e94eb5da1c3 100644 --- a/exporter/exporterqueue/bounded_memory_queue.go +++ b/exporter/exporterqueue/bounded_memory_queue.go @@ -23,17 +23,18 @@ type boundedMemoryQueue[T any] struct { type memoryQueueSettings[T any] struct { sizer sizer[T] capacity int64 + blocking bool } // newBoundedMemoryQueue constructs the new queue of specified capacity, and with an optional // callback for dropped items (e.g. useful to emit metrics). func newBoundedMemoryQueue[T any](set memoryQueueSettings[T]) Queue[T] { return &boundedMemoryQueue[T]{ - sizedQueue: newSizedQueue[T](set.capacity, set.sizer), + sizedQueue: newSizedQueue[T](set.capacity, set.sizer, set.blocking), } } -func (q *boundedMemoryQueue[T]) Read(_ context.Context) (uint64, context.Context, T, bool) { +func (q *boundedMemoryQueue[T]) Read(context.Context) (uint64, context.Context, T, bool) { ctx, req, ok := q.sizedQueue.pop() return 0, ctx, req, ok } diff --git a/exporter/exporterqueue/bounded_memory_queue_test.go b/exporter/exporterqueue/bounded_memory_queue_test.go index f723f8263f8..dc39dd09b4e 100644 --- a/exporter/exporterqueue/bounded_memory_queue_test.go +++ b/exporter/exporterqueue/bounded_memory_queue_test.go @@ -133,6 +133,48 @@ func TestQueueUsage(t *testing.T) { } } +func TestBlockingQueueUsage(t *testing.T) { + tests := []struct { + name string + sizer sizer[ptrace.Traces] + }{ + { + name: "requests_based", + sizer: &requestSizer[ptrace.Traces]{}, + }, + { + name: "items_based", + sizer: &itemsSizer{}, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + q := newBoundedMemoryQueue[ptrace.Traces](memoryQueueSettings[ptrace.Traces]{sizer: tt.sizer, capacity: int64(100), blocking: true}) + consumed := &atomic.Int64{} + require.NoError(t, q.Start(context.Background(), componenttest.NewNopHost())) + ac := newAsyncConsumer(q, 10, func(context.Context, ptrace.Traces) error { + consumed.Add(1) + return nil + }) + td := testdata.GenerateTraces(10) + wg := &sync.WaitGroup{} + for i := 0; i < 10; i++ { + wg.Add(1) + go func() { + defer wg.Done() + for j := 0; j < 100_000; j++ { + assert.NoError(t, q.Offer(context.Background(), td)) + } + }() + } + wg.Wait() + assert.NoError(t, q.Shutdown(context.Background())) + assert.NoError(t, ac.Shutdown(context.Background())) + assert.Equal(t, int64(1_000_000), consumed.Load()) + }) + } +} + func TestZeroSizeNoConsumers(t *testing.T) { q := newBoundedMemoryQueue[string](memoryQueueSettings[string]{sizer: &requestSizer[string]{}, capacity: 0}) @@ -149,8 +191,7 @@ func consume[T any](q Queue[T], consumeFunc func(context.Context, T) error) bool if !ok { return false } - consumeErr := consumeFunc(ctx, req) - q.OnProcessingFinished(index, consumeErr) + q.OnProcessingFinished(index, consumeFunc(ctx, req)) return true } @@ -170,8 +211,7 @@ func newAsyncConsumer[T any](q Queue[T], numConsumers int, consumeFunc func(cont if !ok { return } - consumeErr := consumeFunc(ctx, req) - q.OnProcessingFinished(index, consumeErr) + q.OnProcessingFinished(index, consumeFunc(ctx, req)) } }() } diff --git a/exporter/exporterqueue/cond.go b/exporter/exporterqueue/cond.go new file mode 100644 index 00000000000..a7845a65453 --- /dev/null +++ b/exporter/exporterqueue/cond.go @@ -0,0 +1,59 @@ +package exporterqueue + +import ( + "context" + "sync" +) + +// cond is equivalent with sync.Cond, but context.Context aware. Which means Wait() will return if context is done. +// Also, it requires the caller to hold the c.L during all calls. +type cond struct { + L sync.Locker + ch chan struct{} + waiting int64 +} + +func newCond(l sync.Locker) *cond { + return &cond{L: l, ch: make(chan struct{}, 1)} +} + +// Signal wakes one goroutine waiting on c, if there is any. +// It requires for the caller to hold c.L during the call. +func (c *cond) Signal() { + if c.waiting == 0 { + return + } + c.waiting-- + c.ch <- struct{}{} +} + +// Broadcast wakes all goroutines waiting on c. +// It requires for the caller to hold c.L during the call. +func (c *cond) Broadcast() { + for ; c.waiting > 0; c.waiting-- { + c.ch <- struct{}{} + } +} + +// Wait atomically unlocks c.L and suspends execution of the calling goroutine. After later resuming execution, Wait locks c.L before returning. +func (c *cond) Wait(ctx context.Context) error { + c.waiting++ + c.L.Unlock() + select { + case <-ctx.Done(): + c.L.Lock() + if c.waiting == 0 { + // If waiting is 0, it means that there was a signal sent and nobody else waits for it. + // Consume it, so that we don't unblock other consumer unnecessary, + // or we don't block the producer because the channel buffer is full. + <-c.ch + } else { + // Decrease the number of waiting routines. + c.waiting-- + } + return ctx.Err() + case <-c.ch: + c.L.Lock() + return nil + } +} diff --git a/exporter/exporterqueue/persistent_queue.go b/exporter/exporterqueue/persistent_queue.go index 586a5139630..ceda5727ad8 100644 --- a/exporter/exporterqueue/persistent_queue.go +++ b/exporter/exporterqueue/persistent_queue.go @@ -11,7 +11,6 @@ import ( "strconv" "sync" - "go.uber.org/multierr" "go.uber.org/zap" "go.opentelemetry.io/collector/component" @@ -42,6 +41,7 @@ var ( type persistentQueueSettings[T any] struct { sizer sizer[T] capacity int64 + blocking bool signal pipeline.Signal storageID component.ID marshaler Marshaler[T] @@ -81,7 +81,8 @@ type persistentQueue[T any] struct { // mu guards everything declared below. mu sync.Mutex - hasElements *sync.Cond + hasMoreElements *cond + hasMoreSpace *cond readIndex uint64 writeIndex uint64 currentlyDispatchedItems []uint64 @@ -98,7 +99,8 @@ func newPersistentQueue[T any](set persistentQueueSettings[T]) Queue[T] { logger: set.set.Logger, isRequestSized: isRequestSized, } - pq.hasElements = sync.NewCond(&pq.mu) + pq.hasMoreElements = newCond(&pq.mu) + pq.hasMoreSpace = newCond(&pq.mu) return pq } @@ -194,8 +196,8 @@ func (pq *persistentQueue[T]) Shutdown(ctx context.Context) error { backupErr := pq.backupQueueSize(ctx) // Mark this queue as stopped, so consumer don't start any more work. pq.stopped = true - pq.hasElements.Broadcast() - return multierr.Combine(backupErr, pq.unrefClient(ctx)) + pq.hasMoreElements.Broadcast() + return errors.Join(backupErr, pq.unrefClient(ctx)) } // backupQueueSize writes the current queue size to storage. The value is used to recover the queue size @@ -233,8 +235,13 @@ func (pq *persistentQueue[T]) Offer(ctx context.Context, req T) error { // putInternal is the internal version that requires caller to hold the mutex lock. func (pq *persistentQueue[T]) putInternal(ctx context.Context, req T) error { reqSize := pq.set.sizer.Sizeof(req) - if pq.queueSize+reqSize > pq.set.capacity { - return ErrQueueIsFull + for pq.queueSize+reqSize > pq.set.capacity { + if !pq.set.blocking { + return ErrQueueIsFull + } + if err := pq.hasMoreSpace.Wait(ctx); err != nil { + return err + } } reqBuf, err := pq.set.marshaler(req) @@ -253,7 +260,7 @@ func (pq *persistentQueue[T]) putInternal(ctx context.Context, req T) error { pq.writeIndex++ pq.queueSize += reqSize - pq.hasElements.Signal() + pq.hasMoreElements.Signal() // Back up the queue size to storage every 10 writes. The stored value is used to recover the queue size // in case if the collector is killed. The recovered queue size is allowed to be inaccurate. @@ -269,32 +276,38 @@ func (pq *persistentQueue[T]) putInternal(ctx context.Context, req T) error { func (pq *persistentQueue[T]) Read(ctx context.Context) (uint64, context.Context, T, bool) { pq.mu.Lock() defer pq.mu.Unlock() - for { - if pq.stopped { - var req T - return 0, context.Background(), req, false - } - - // If queue is empty, wait until more elements and restart. - if pq.readIndex == pq.writeIndex { - pq.hasElements.Wait() - continue - } - index, req, consumed := pq.getNextItem(ctx) - if consumed { - pq.queueSize -= pq.set.sizer.Sizeof(req) - // The size might be not in sync with the queue in case it's restored from the disk - // because we don't flush the current queue size on the disk on every read/write. - // In that case we need to make sure it doesn't go below 0. - if pq.queueSize < 0 { + for { + // Read until either a successful retrieved element or no more elements in the storage. + for pq.readIndex != pq.writeIndex { + index, req, consumed := pq.getNextItem(ctx) + // Ensure the used size and the channel size are in sync. + if pq.readIndex == pq.writeIndex { pq.queueSize = 0 + pq.hasMoreSpace.Signal() + } + if consumed { + pq.queueSize -= pq.set.sizer.Sizeof(req) + // The size might be not in sync with the queue in case it's restored from the disk + // because we don't flush the current queue size on the disk on every read/write. + // In that case we need to make sure it doesn't go below 0. + if pq.queueSize < 0 { + pq.queueSize = 0 + } + pq.hasMoreSpace.Signal() + + return index, context.Background(), req, true } + } - return index, context.Background(), req, true + if pq.stopped { + var req T + return 0, context.Background(), req, false } - // If we did not consume any element retry from the beginning. + // TODO: Change the Queue interface to return an error to allow distinguish between shutdown and context canceled. + // Ok to ignore the error, since the context.Background() will never be done. + _ = pq.hasMoreElements.Wait(context.Background()) } } @@ -363,11 +376,6 @@ func (pq *persistentQueue[T]) OnProcessingFinished(index uint64, consumeErr erro pq.logger.Error("Error writing queue size to storage", zap.Error(qsErr)) } } - - // Ensure the used size and the channel size are in sync. - if pq.readIndex == pq.writeIndex { - pq.queueSize = 0 - } } // retrieveAndEnqueueNotDispatchedReqs gets the items for which sending was not finished, cleans the storage diff --git a/exporter/exporterqueue/persistent_queue_test.go b/exporter/exporterqueue/persistent_queue_test.go index 6866573efdb..196bb34591b 100644 --- a/exporter/exporterqueue/persistent_queue_test.go +++ b/exporter/exporterqueue/persistent_queue_test.go @@ -370,12 +370,10 @@ func TestPersistentQueue_ConsumersProducers(t *testing.T) { t.Run(fmt.Sprintf("#messages: %d #consumers: %d", c.numMessagesProduced, c.numConsumers), func(t *testing.T) { req := newTracesRequest(1, 10) - numMessagesConsumed := &atomic.Int32{} + numMessagesConsumed := &atomic.Int64{} pq := createAndStartTestPersistentQueue(t, &requestSizer[ptrace.Traces]{}, 1000, c.numConsumers, - func(context.Context, - ptrace.Traces, - ) error { - numMessagesConsumed.Add(int32(1)) + func(context.Context, ptrace.Traces) error { + numMessagesConsumed.Add(int64(1)) return nil }) @@ -390,6 +388,62 @@ func TestPersistentQueue_ConsumersProducers(t *testing.T) { } } +func TestPersistentBlockingQueue(t *testing.T) { + tests := []struct { + name string + sizer sizer[ptrace.Traces] + }{ + { + name: "requests_based", + sizer: &requestSizer[ptrace.Traces]{}, + }, + { + name: "items_based", + sizer: &itemsSizer{}, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + pq := newPersistentQueue[ptrace.Traces](persistentQueueSettings[ptrace.Traces]{ + sizer: tt.sizer, + capacity: 100, + blocking: true, + signal: pipeline.SignalTraces, + storageID: component.ID{}, + marshaler: marshalTracesRequest, + unmarshaler: unmarshalTracesRequest, + set: exportertest.NewNopSettings(), + }) + host := &mockHost{ext: map[component.ID]component.Component{ + {}: storagetest.NewMockStorageExtension(nil), + }} + require.NoError(t, pq.Start(context.Background(), host)) + consumed := &atomic.Int64{} + ac := newAsyncConsumer(pq, 10, func(context.Context, ptrace.Traces) error { + consumed.Add(int64(1)) + return nil + }) + + td := newTracesRequest(1, 10) + wg := &sync.WaitGroup{} + for i := 0; i < 10; i++ { + wg.Add(1) + go func() { + defer wg.Done() + for j := 0; j < 100_000; j++ { + assert.NoError(t, pq.Offer(context.Background(), td)) + } + }() + } + wg.Wait() + assert.NoError(t, pq.Shutdown(context.Background())) + assert.NoError(t, ac.Shutdown(context.Background())) + assert.Equal(t, int64(1_000_000), consumed.Load()) + }) + } +} + func newTracesRequest(numTraces int, numSpans int) ptrace.Traces { traces := ptrace.NewTraces() batch := traces.ResourceSpans().AppendEmpty() diff --git a/exporter/exporterqueue/sized_queue.go b/exporter/exporterqueue/sized_queue.go index a9b58241c28..d5eeb93903c 100644 --- a/exporter/exporterqueue/sized_queue.go +++ b/exporter/exporterqueue/sized_queue.go @@ -50,22 +50,26 @@ type sizedQueue[T any] struct { sizer sizer[T] cap int64 - mu sync.Mutex - hasElements *sync.Cond - items *linkedQueue[T] - size int64 - stopped bool + mu sync.Mutex + hasMoreElements *cond + hasMoreSpace *cond + items *linkedQueue[T] + size int64 + stopped bool + blocking bool } // newSizedQueue creates a sized elements channel. Each element is assigned a size by the provided sizer. // capacity is the capacity of the queue. -func newSizedQueue[T any](capacity int64, sizer sizer[T]) *sizedQueue[T] { +func newSizedQueue[T any](capacity int64, sizer sizer[T], blocking bool) *sizedQueue[T] { sq := &sizedQueue[T]{ - sizer: sizer, - cap: capacity, - items: &linkedQueue[T]{}, + sizer: sizer, + cap: capacity, + items: &linkedQueue[T]{}, + blocking: blocking, } - sq.hasElements = sync.NewCond(&sq.mu) + sq.hasMoreElements = newCond(&sq.mu) + sq.hasMoreSpace = newCond(&sq.mu) return sq } @@ -84,14 +88,20 @@ func (sq *sizedQueue[T]) Offer(ctx context.Context, el T) error { sq.mu.Lock() defer sq.mu.Unlock() - if sq.size+elSize > sq.cap { - return ErrQueueIsFull + for sq.size+elSize > sq.cap { + if !sq.blocking { + return ErrQueueIsFull + } + // Wait for more space or before the ctx is Done. + if err := sq.hasMoreSpace.Wait(ctx); err != nil { + return err + } } sq.size += elSize sq.items.push(ctx, el, elSize) // Signal one consumer if any. - sq.hasElements.Signal() + sq.hasMoreElements.Signal() return nil } @@ -104,9 +114,10 @@ func (sq *sizedQueue[T]) pop() (context.Context, T, bool) { for { if sq.size > 0 { - ctx, el, elSize := sq.items.pop() + elCtx, el, elSize := sq.items.pop() sq.size -= elSize - return ctx, el, true + sq.hasMoreSpace.Signal() + return elCtx, el, true } if sq.stopped { @@ -114,7 +125,9 @@ func (sq *sizedQueue[T]) pop() (context.Context, T, bool) { return context.Background(), el, false } - sq.hasElements.Wait() + // TODO: Change the Queue interface to return an error to allow distinguish between shutdown and context canceled. + // Ok to ignore the error, since the context.Background() will never be done. + _ = sq.hasMoreElements.Wait(context.Background()) } } @@ -123,7 +136,7 @@ func (sq *sizedQueue[T]) Shutdown(context.Context) error { sq.mu.Lock() defer sq.mu.Unlock() sq.stopped = true - sq.hasElements.Broadcast() + sq.hasMoreElements.Broadcast() return nil } diff --git a/exporter/exporterqueue/sized_queue_test.go b/exporter/exporterqueue/sized_queue_test.go index 4fa5e81dee8..66632acdb74 100644 --- a/exporter/exporterqueue/sized_queue_test.go +++ b/exporter/exporterqueue/sized_queue_test.go @@ -18,7 +18,7 @@ func (s sizerInt) Sizeof(el int) int64 { } func TestSizedQueue(t *testing.T) { - q := newSizedQueue[int](7, sizerInt{}) + q := newSizedQueue[int](7, sizerInt{}, false) require.NoError(t, q.Offer(context.Background(), 1)) assert.Equal(t, 1, q.Size()) assert.Equal(t, 7, q.Capacity()) @@ -47,7 +47,7 @@ func TestSizedQueue(t *testing.T) { } func TestSizedQueue_DrainAllElements(t *testing.T) { - q := newSizedQueue[int](7, sizerInt{}) + q := newSizedQueue[int](7, sizerInt{}, false) require.NoError(t, q.Offer(context.Background(), 1)) require.NoError(t, q.Offer(context.Background(), 3)) @@ -68,12 +68,12 @@ func TestSizedQueue_DrainAllElements(t *testing.T) { } func TestSizedChannel_OfferInvalidSize(t *testing.T) { - q := newSizedQueue[int](1, sizerInt{}) + q := newSizedQueue[int](1, sizerInt{}, false) require.ErrorIs(t, q.Offer(context.Background(), -1), errInvalidSize) } func TestSizedChannel_OfferZeroSize(t *testing.T) { - q := newSizedQueue[int](1, sizerInt{}) + q := newSizedQueue[int](1, sizerInt{}, false) require.NoError(t, q.Offer(context.Background(), 0)) require.NoError(t, q.Shutdown(context.Background())) // Because the size 0 is ignored, nothing to drain.