diff --git a/.chloggen/mv-int64.yaml b/.chloggen/mv-int64.yaml new file mode 100644 index 00000000000..2d5ba597138 --- /dev/null +++ b/.chloggen/mv-int64.yaml @@ -0,0 +1,25 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: breaking + +# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver) +component: exporterqueue + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Change Queue Size and Capacity to return explicit int64. + +# One or more tracking issues or pull requests related to the change +issues: [] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [api] diff --git a/exporter/exporterhelper/internal/queue_sender.go b/exporter/exporterhelper/internal/queue_sender.go index 3c99cf38876..473d55fbc11 100644 --- a/exporter/exporterhelper/internal/queue_sender.go +++ b/exporter/exporterhelper/internal/queue_sender.go @@ -131,7 +131,7 @@ func (qs *QueueSender) Start(ctx context.Context, host component.Host) error { dataTypeAttr := attribute.String(DataTypeKey, qs.obsrep.Signal.String()) - reg1, err1 := qs.obsrep.TelemetryBuilder.InitExporterQueueSize(func() int64 { return int64(qs.queue.Size()) }, + reg1, err1 := qs.obsrep.TelemetryBuilder.InitExporterQueueSize(func() int64 { return qs.queue.Size() }, metric.WithAttributeSet(attribute.NewSet(qs.traceAttribute, dataTypeAttr))) if reg1 != nil { @@ -140,7 +140,7 @@ func (qs *QueueSender) Start(ctx context.Context, host component.Host) error { }) } - reg2, err2 := qs.obsrep.TelemetryBuilder.InitExporterQueueCapacity(func() int64 { return int64(qs.queue.Capacity()) }, + reg2, err2 := qs.obsrep.TelemetryBuilder.InitExporterQueueCapacity(func() int64 { return qs.queue.Capacity() }, metric.WithAttributeSet(attribute.NewSet(qs.traceAttribute))) if reg2 != nil { diff --git a/exporter/exporterhelper/internal/queue_sender_test.go b/exporter/exporterhelper/internal/queue_sender_test.go index 8acf7b37131..3a55fb09024 100644 --- a/exporter/exporterhelper/internal/queue_sender_test.go +++ b/exporter/exporterhelper/internal/queue_sender_test.go @@ -54,7 +54,7 @@ func TestQueuedRetry_StopWhileWaiting(t *testing.T) { require.NoError(t, be.Send(context.Background(), secondMockR)) }) - require.LessOrEqual(t, 1, be.QueueSender.(*QueueSender).queue.Size()) + require.LessOrEqual(t, int64(1), be.QueueSender.(*QueueSender).queue.Size()) require.NoError(t, be.Shutdown(context.Background())) diff --git a/exporter/exporterqueue/bounded_memory_queue_test.go b/exporter/exporterqueue/bounded_memory_queue_test.go index f723f8263f8..78c0cb1b8fb 100644 --- a/exporter/exporterqueue/bounded_memory_queue_test.go +++ b/exporter/exporterqueue/bounded_memory_queue_test.go @@ -35,15 +35,15 @@ func TestBoundedQueue(t *testing.T) { return nil })) assert.Equal(t, 1, numConsumed) - assert.Equal(t, 0, q.Size()) + assert.Equal(t, int64(0), q.Size()) // produce two more items. The first one should be accepted, but not consumed. require.NoError(t, q.Offer(context.Background(), "b")) - assert.Equal(t, 1, q.Size()) + assert.Equal(t, int64(1), q.Size()) // the second should be rejected since the queue is full require.ErrorIs(t, q.Offer(context.Background(), "c"), ErrQueueIsFull) - assert.Equal(t, 1, q.Size()) + assert.Equal(t, int64(1), q.Size()) assert.True(t, consume(q, func(_ context.Context, item string) error { assert.Equal(t, "b", item) @@ -82,7 +82,7 @@ func TestShutdownWhileNotEmpty(t *testing.T) { } assert.NoError(t, q.Shutdown(context.Background())) - assert.Equal(t, 10, q.Size()) + assert.Equal(t, int64(10), q.Size()) numConsumed := 0 for i := 0; i < 10; i++ { assert.True(t, consume(q, func(_ context.Context, item string) error { @@ -92,7 +92,7 @@ func TestShutdownWhileNotEmpty(t *testing.T) { })) } assert.Equal(t, 10, numConsumed) - assert.Equal(t, 0, q.Size()) + assert.Equal(t, int64(0), q.Size()) assert.False(t, consume(q, func(_ context.Context, item string) error { panic(item) diff --git a/exporter/exporterqueue/persistent_queue.go b/exporter/exporterqueue/persistent_queue.go index 586a5139630..3d191734574 100644 --- a/exporter/exporterqueue/persistent_queue.go +++ b/exporter/exporterqueue/persistent_queue.go @@ -112,14 +112,14 @@ func (pq *persistentQueue[T]) Start(ctx context.Context, host component.Host) er return nil } -func (pq *persistentQueue[T]) Size() int { +func (pq *persistentQueue[T]) Size() int64 { pq.mu.Lock() defer pq.mu.Unlock() - return int(pq.queueSize) + return pq.queueSize } -func (pq *persistentQueue[T]) Capacity() int { - return int(pq.set.capacity) +func (pq *persistentQueue[T]) Capacity() int64 { + return pq.set.capacity } func (pq *persistentQueue[T]) initClient(ctx context.Context, client storage.Client) { diff --git a/exporter/exporterqueue/persistent_queue_test.go b/exporter/exporterqueue/persistent_queue_test.go index 6866573efdb..7945ef2aada 100644 --- a/exporter/exporterqueue/persistent_queue_test.go +++ b/exporter/exporterqueue/persistent_queue_test.go @@ -280,7 +280,7 @@ func TestPersistentQueue_FullCapacity(t *testing.T) { name string sizer sizer[ptrace.Traces] capacity int64 - sizeMultiplier int + sizeMultiplier int64 }{ { name: "requests_capacity", @@ -302,7 +302,7 @@ func TestPersistentQueue_FullCapacity(t *testing.T) { <-done return nil }) - assert.Equal(t, 0, pq.Size()) + assert.Equal(t, int64(0), pq.Size()) req := newTracesRequest(1, 10) @@ -519,7 +519,7 @@ func TestPersistentQueue_CorruptedData(t *testing.T) { corruptCurrentlyDispatchedItemsKey bool corruptReadIndex bool corruptWriteIndex bool - desiredQueueSize int + desiredQueueSize int64 }{ { name: "corrupted no items", @@ -572,11 +572,11 @@ func TestPersistentQueue_CorruptedData(t *testing.T) { err := ps.Offer(context.Background(), req) require.NoError(t, err) } - assert.Equal(t, 3, ps.Size()) + assert.Equal(t, int64(3), ps.Size()) require.True(t, consume(ps, func(context.Context, ptrace.Traces) error { return experr.NewShutdownErr(nil) })) - assert.Equal(t, 2, ps.Size()) + assert.Equal(t, int64(2), ps.Size()) // We can corrupt data (in several ways) and not worry since we return ShutdownErr client will not be touched. if c.corruptAllData || c.corruptSomeData { @@ -641,7 +641,7 @@ func TestPersistentQueue_CurrentlyProcessedItems(t *testing.T) { // Reload the storage. Since items 0 was not finished, this should be re-enqueued at the end. // The queue should be essentially {3,4,0,2}. newPs := createTestPersistentQueueWithRequestsCapacity(t, ext, 1000) - assert.Equal(t, 4, newPs.Size()) + assert.Equal(t, int64(4), newPs.Size()) requireCurrentlyDispatchedItemsEqual(t, newPs, []uint64{}) // We should be able to pull all remaining items now @@ -654,7 +654,7 @@ func TestPersistentQueue_CurrentlyProcessedItems(t *testing.T) { // The queue should be now empty requireCurrentlyDispatchedItemsEqual(t, newPs, []uint64{}) - assert.Equal(t, 0, newPs.Size()) + assert.Equal(t, int64(0), newPs.Size()) // The writeIndex should be now set accordingly require.EqualValues(t, 6, newPs.writeIndex) @@ -683,14 +683,14 @@ func TestPersistentQueueStartWithNonDispatched(t *testing.T) { require.True(t, consume(ps, func(context.Context, ptrace.Traces) error { // put one more item in require.NoError(t, ps.Offer(context.Background(), req)) - require.Equal(t, 5, ps.Size()) + require.Equal(t, int64(5), ps.Size()) return experr.NewShutdownErr(nil) })) require.NoError(t, ps.Shutdown(context.Background())) // Reload with extra capacity to make sure we re-enqueue in-progress items. newPs := createTestPersistentQueueWithRequestsCapacity(t, ext, 6) - require.Equal(t, 6, newPs.Size()) + require.Equal(t, int64(6), newPs.Size()) } func TestPersistentQueueStartWithNonDispatchedConcurrent(t *testing.T) { @@ -761,31 +761,31 @@ func TestPersistentQueue_PutCloseReadClose(t *testing.T) { req := newTracesRequest(5, 10) ext := storagetest.NewMockStorageExtension(nil) ps := createTestPersistentQueueWithRequestsCapacity(t, ext, 1000) - assert.Equal(t, 0, ps.Size()) + assert.Equal(t, int64(0), ps.Size()) // Put two elements and close the extension assert.NoError(t, ps.Offer(context.Background(), req)) assert.NoError(t, ps.Offer(context.Background(), req)) - assert.Equal(t, 2, ps.Size()) + assert.Equal(t, int64(2), ps.Size()) // TODO: Remove this, after the initialization writes the readIndex. _, _, _, _ = ps.Read(context.Background()) require.NoError(t, ps.Shutdown(context.Background())) newPs := createTestPersistentQueueWithRequestsCapacity(t, ext, 1000) - require.Equal(t, 2, newPs.Size()) + require.Equal(t, int64(2), newPs.Size()) // Let's read both of the elements we put consume(newPs, func(_ context.Context, traces ptrace.Traces) error { require.Equal(t, req, traces) return nil }) - assert.Equal(t, 1, newPs.Size()) + assert.Equal(t, int64(1), newPs.Size()) consume(newPs, func(_ context.Context, traces ptrace.Traces) error { require.Equal(t, req, traces) return nil }) - require.Equal(t, 0, newPs.Size()) + require.Equal(t, int64(0), newPs.Size()) assert.NoError(t, newPs.Shutdown(context.Background())) } @@ -891,7 +891,7 @@ func TestItemIndexArrayMarshaling(t *testing.T) { func TestPersistentQueue_ShutdownWhileConsuming(t *testing.T) { ps := createTestPersistentQueueWithRequestsCapacity(t, storagetest.NewMockStorageExtension(nil), 1000) - assert.Equal(t, 0, ps.Size()) + assert.Equal(t, int64(0), ps.Size()) assert.False(t, ps.client.(*storagetest.MockStorageClient).IsClosed()) require.NoError(t, ps.Offer(context.Background(), newTracesRequest(5, 10))) @@ -916,7 +916,7 @@ func TestPersistentQueue_StorageFull(t *testing.T) { ps := createTestPersistentQueueWithClient(client) // Put enough items in to fill the underlying storage - reqCount := 0 + reqCount := int64(0) for { err = ps.Offer(context.Background(), req) if errors.Is(err, syscall.ENOSPC) { @@ -1005,46 +1005,46 @@ func TestPersistentQueue_ItemsCapacityUsageRestoredOnShutdown(t *testing.T) { ext := storagetest.NewMockStorageExtension(nil) pq := createTestPersistentQueueWithItemsCapacity(t, ext, 100) - assert.Equal(t, 0, pq.Size()) + assert.Equal(t, int64(0), pq.Size()) // Fill the queue up to the capacity. assert.NoError(t, pq.Offer(context.Background(), newTracesRequest(4, 10))) assert.NoError(t, pq.Offer(context.Background(), newTracesRequest(4, 10))) assert.NoError(t, pq.Offer(context.Background(), newTracesRequest(2, 10))) - assert.Equal(t, 100, pq.Size()) + assert.Equal(t, int64(100), pq.Size()) require.ErrorIs(t, pq.Offer(context.Background(), newTracesRequest(5, 5)), ErrQueueIsFull) - assert.Equal(t, 100, pq.Size()) + assert.Equal(t, int64(100), pq.Size()) assert.True(t, consume(pq, func(_ context.Context, traces ptrace.Traces) error { assert.Equal(t, 40, traces.SpanCount()) return nil })) - assert.Equal(t, 60, pq.Size()) + assert.Equal(t, int64(60), pq.Size()) require.NoError(t, pq.Shutdown(context.Background())) newPQ := createTestPersistentQueueWithItemsCapacity(t, ext, 100) // The queue should be restored to the previous size. - assert.Equal(t, 60, newPQ.Size()) + assert.Equal(t, int64(60), newPQ.Size()) require.NoError(t, newPQ.Offer(context.Background(), newTracesRequest(2, 5))) // Check the combined queue size. - assert.Equal(t, 70, newPQ.Size()) + assert.Equal(t, int64(70), newPQ.Size()) assert.True(t, consume(newPQ, func(_ context.Context, traces ptrace.Traces) error { assert.Equal(t, 40, traces.SpanCount()) return nil })) - assert.Equal(t, 30, newPQ.Size()) + assert.Equal(t, int64(30), newPQ.Size()) assert.True(t, consume(newPQ, func(_ context.Context, traces ptrace.Traces) error { assert.Equal(t, 20, traces.SpanCount()) return nil })) - assert.Equal(t, 10, newPQ.Size()) + assert.Equal(t, int64(10), newPQ.Size()) assert.NoError(t, newPQ.Shutdown(context.Background())) } @@ -1054,54 +1054,54 @@ func TestPersistentQueue_ItemsCapacityUsageIsNotPreserved(t *testing.T) { ext := storagetest.NewMockStorageExtension(nil) pq := createTestPersistentQueueWithRequestsCapacity(t, ext, 100) - assert.Equal(t, 0, pq.Size()) + assert.Equal(t, int64(0), pq.Size()) assert.NoError(t, pq.Offer(context.Background(), newTracesRequest(4, 10))) assert.NoError(t, pq.Offer(context.Background(), newTracesRequest(2, 10))) assert.NoError(t, pq.Offer(context.Background(), newTracesRequest(5, 5))) - assert.Equal(t, 3, pq.Size()) + assert.Equal(t, int64(3), pq.Size()) assert.True(t, consume(pq, func(_ context.Context, traces ptrace.Traces) error { assert.Equal(t, 40, traces.SpanCount()) return nil })) - assert.Equal(t, 2, pq.Size()) + assert.Equal(t, int64(2), pq.Size()) require.NoError(t, pq.Shutdown(context.Background())) newPQ := createTestPersistentQueueWithItemsCapacity(t, ext, 100) // The queue items size cannot be restored, fall back to request-based size - assert.Equal(t, 2, newPQ.Size()) + assert.Equal(t, int64(2), newPQ.Size()) require.NoError(t, newPQ.Offer(context.Background(), newTracesRequest(2, 5))) // Only new items are correctly reflected - assert.Equal(t, 12, newPQ.Size()) + assert.Equal(t, int64(12), newPQ.Size()) // Consuming a restored request should reduce the restored size by 20 but it should not go to below zero assert.True(t, consume(newPQ, func(_ context.Context, traces ptrace.Traces) error { assert.Equal(t, 20, traces.SpanCount()) return nil })) - assert.Equal(t, 0, newPQ.Size()) + assert.Equal(t, int64(0), newPQ.Size()) // Consuming another restored request should not affect the restored size since it's already dropped to 0. assert.True(t, consume(newPQ, func(_ context.Context, traces ptrace.Traces) error { assert.Equal(t, 25, traces.SpanCount()) return nil })) - assert.Equal(t, 0, newPQ.Size()) + assert.Equal(t, int64(0), newPQ.Size()) // Adding another batch should update the size accordingly require.NoError(t, newPQ.Offer(context.Background(), newTracesRequest(5, 5))) - assert.Equal(t, 25, newPQ.Size()) + assert.Equal(t, int64(25), newPQ.Size()) assert.True(t, consume(newPQ, func(_ context.Context, traces ptrace.Traces) error { assert.Equal(t, 10, traces.SpanCount()) return nil })) - assert.Equal(t, 15, newPQ.Size()) + assert.Equal(t, int64(15), newPQ.Size()) assert.NoError(t, newPQ.Shutdown(context.Background())) } @@ -1112,7 +1112,7 @@ func TestPersistentQueue_RequestCapacityLessAfterRestart(t *testing.T) { ext := storagetest.NewMockStorageExtension(nil) pq := createTestPersistentQueueWithRequestsCapacity(t, ext, 100) - assert.Equal(t, 0, pq.Size()) + assert.Equal(t, int64(0), pq.Size()) assert.NoError(t, pq.Offer(context.Background(), newTracesRequest(4, 10))) assert.NoError(t, pq.Offer(context.Background(), newTracesRequest(2, 10))) @@ -1125,7 +1125,7 @@ func TestPersistentQueue_RequestCapacityLessAfterRestart(t *testing.T) { assert.Equal(t, 40, traces.SpanCount()) return nil })) - assert.Equal(t, 3, pq.Size()) + assert.Equal(t, int64(3), pq.Size()) require.NoError(t, pq.Shutdown(context.Background())) @@ -1134,7 +1134,7 @@ func TestPersistentQueue_RequestCapacityLessAfterRestart(t *testing.T) { newPQ := createTestPersistentQueueWithRequestsCapacity(t, ext, 2) // The queue items size cannot be restored, fall back to request-based size - assert.Equal(t, 3, newPQ.Size()) + assert.Equal(t, int64(3), newPQ.Size()) // Queue is full require.Error(t, newPQ.Offer(context.Background(), newTracesRequest(2, 5))) @@ -1143,7 +1143,7 @@ func TestPersistentQueue_RequestCapacityLessAfterRestart(t *testing.T) { assert.Equal(t, 20, traces.SpanCount()) return nil })) - assert.Equal(t, 2, newPQ.Size()) + assert.Equal(t, int64(2), newPQ.Size()) // Still full require.Error(t, newPQ.Offer(context.Background(), newTracesRequest(2, 5))) @@ -1152,7 +1152,7 @@ func TestPersistentQueue_RequestCapacityLessAfterRestart(t *testing.T) { assert.Equal(t, 25, traces.SpanCount()) return nil })) - assert.Equal(t, 1, newPQ.Size()) + assert.Equal(t, int64(1), newPQ.Size()) // Now it can accept new items assert.NoError(t, newPQ.Offer(context.Background(), newTracesRequest(2, 5))) @@ -1166,34 +1166,34 @@ func TestPersistentQueue_RestoredUsedSizeIsCorrectedOnDrain(t *testing.T) { ext := storagetest.NewMockStorageExtension(nil) pq := createTestPersistentQueueWithItemsCapacity(t, ext, 1000) - assert.Equal(t, 0, pq.Size()) + assert.Equal(t, int64(0), pq.Size()) for i := 0; i < 6; i++ { require.NoError(t, pq.Offer(context.Background(), newTracesRequest(2, 5))) } - assert.Equal(t, 60, pq.Size()) + assert.Equal(t, int64(60), pq.Size()) // Consume 30 items for i := 0; i < 3; i++ { assert.True(t, consume(pq, func(context.Context, ptrace.Traces) error { return nil })) } // The used size is now 30, but the snapshot should have 50, because it's taken every 5 read/writes. - assert.Equal(t, 30, pq.Size()) + assert.Equal(t, int64(30), pq.Size()) // Create a new queue pointed to the same storage newPQ := createTestPersistentQueueWithItemsCapacity(t, ext, 1000) // This is an incorrect size restored from the snapshot. // In reality the size should be 30. Once the queue is drained, it will be updated to the correct size. - assert.Equal(t, 50, newPQ.Size()) + assert.Equal(t, int64(50), newPQ.Size()) assert.True(t, consume(newPQ, func(context.Context, ptrace.Traces) error { return nil })) assert.True(t, consume(newPQ, func(context.Context, ptrace.Traces) error { return nil })) - assert.Equal(t, 30, newPQ.Size()) + assert.Equal(t, int64(30), newPQ.Size()) // Now the size must be correctly reflected assert.True(t, consume(newPQ, func(context.Context, ptrace.Traces) error { return nil })) - assert.Equal(t, 0, newPQ.Size()) + assert.Equal(t, int64(0), newPQ.Size()) assert.NoError(t, newPQ.Shutdown(context.Background())) assert.NoError(t, pq.Shutdown(context.Background())) diff --git a/exporter/exporterqueue/queue.go b/exporter/exporterqueue/queue.go index cb2a6b82de1..7e995a464ee 100644 --- a/exporter/exporterqueue/queue.go +++ b/exporter/exporterqueue/queue.go @@ -28,9 +28,9 @@ type Queue[T any] interface { // It returns ErrQueueIsFull if no space is currently available. Offer(ctx context.Context, item T) error // Size returns the current Size of the queue - Size() int + Size() int64 // Capacity returns the capacity of the queue. - Capacity() int + Capacity() int64 // Read pulls the next available item from the queue along with its index. Once processing is // finished, the index should be called with OnProcessingFinished to clean up the storage. // The function blocks until an item is available or if the queue is stopped. diff --git a/exporter/exporterqueue/sized_queue.go b/exporter/exporterqueue/sized_queue.go index a9b58241c28..2c15b6e483e 100644 --- a/exporter/exporterqueue/sized_queue.go +++ b/exporter/exporterqueue/sized_queue.go @@ -127,12 +127,12 @@ func (sq *sizedQueue[T]) Shutdown(context.Context) error { return nil } -func (sq *sizedQueue[T]) Size() int { +func (sq *sizedQueue[T]) Size() int64 { sq.mu.Lock() defer sq.mu.Unlock() - return int(sq.size) + return sq.size } -func (sq *sizedQueue[T]) Capacity() int { - return int(sq.cap) +func (sq *sizedQueue[T]) Capacity() int64 { + return sq.cap } diff --git a/exporter/exporterqueue/sized_queue_test.go b/exporter/exporterqueue/sized_queue_test.go index 4fa5e81dee8..352dbe76711 100644 --- a/exporter/exporterqueue/sized_queue_test.go +++ b/exporter/exporterqueue/sized_queue_test.go @@ -20,25 +20,25 @@ func (s sizerInt) Sizeof(el int) int64 { func TestSizedQueue(t *testing.T) { q := newSizedQueue[int](7, sizerInt{}) require.NoError(t, q.Offer(context.Background(), 1)) - assert.Equal(t, 1, q.Size()) - assert.Equal(t, 7, q.Capacity()) + assert.Equal(t, int64(1), q.Size()) + assert.Equal(t, int64(7), q.Capacity()) require.NoError(t, q.Offer(context.Background(), 3)) - assert.Equal(t, 4, q.Size()) + assert.Equal(t, int64(4), q.Size()) // should not be able to send to the full queue require.Error(t, q.Offer(context.Background(), 4)) - assert.Equal(t, 4, q.Size()) + assert.Equal(t, int64(4), q.Size()) _, el, ok := q.pop() assert.Equal(t, 1, el) assert.True(t, ok) - assert.Equal(t, 3, q.Size()) + assert.Equal(t, int64(3), q.Size()) _, el, ok = q.pop() assert.Equal(t, 3, el) assert.True(t, ok) - assert.Equal(t, 0, q.Size()) + assert.Equal(t, int64(0), q.Size()) require.NoError(t, q.Shutdown(context.Background())) _, el, ok = q.pop() @@ -54,13 +54,13 @@ func TestSizedQueue_DrainAllElements(t *testing.T) { _, el, ok := q.pop() assert.Equal(t, 1, el) assert.True(t, ok) - assert.Equal(t, 3, q.Size()) + assert.Equal(t, int64(3), q.Size()) require.NoError(t, q.Shutdown(context.Background())) _, el, ok = q.pop() assert.Equal(t, 3, el) assert.True(t, ok) - assert.Equal(t, 0, q.Size()) + assert.Equal(t, int64(0), q.Size()) _, el, ok = q.pop() assert.False(t, ok)