From 285993e8313717367a57cb388d09e0a740f0db2a Mon Sep 17 00:00:00 2001 From: Alex Blease Date: Thu, 21 Sep 2023 15:53:52 +0100 Subject: [PATCH 1/2] QueryOffset QueryOffset gets the last consumer offset stored for a consumer and stream name --- pkg/stream/environment.go | 32 ++++++++++++ pkg/stream/environment_test.go | 94 ++++++++++++++++++++++++++++++++++ pkg/stream/locator.go | 8 +++ 3 files changed, 134 insertions(+) diff --git a/pkg/stream/environment.go b/pkg/stream/environment.go index e0c8906b..32e470ec 100644 --- a/pkg/stream/environment.go +++ b/pkg/stream/environment.go @@ -205,3 +205,35 @@ func (e *Environment) QueryStreamStats(ctx context.Context, name string) (Stats, } return Stats{-1, -1}, lastError } + +// QueryOffset retrieves the last consumer offset stored for a given consumer +// name and stream name. +func (e *Environment) QueryOffset(ctx context.Context, consumer, stream string) (uint64, error) { + logger := raw.LoggerFromCtxOrDiscard(ctx) + rn := rand.Intn(100) + n := len(e.locators) + + var lastError error + for i := 0; i < n; i++ { + l := e.pickLocator((i + rn) % n) + if err := l.maybeInitializeLocator(); err != nil { + lastError = err + logger.Error("error initializing locator", slog.Any("error", err)) + continue + } + + result := l.locatorOperation((*locator).operationQueryOffset, ctx, consumer, stream) + if result[1] != nil { + lastError = result[1].(error) + if isNonRetryableError(lastError) { + return uint64(0), lastError + } + logger.Error("locator operation failed", slog.Any("error", lastError)) + continue + } + + offset := result[0].(uint64) + return offset, nil + } + return uint64(0), lastError +} diff --git a/pkg/stream/environment_test.go b/pkg/stream/environment_test.go index be47e69f..fe8d8752 100644 --- a/pkg/stream/environment_test.go +++ b/pkg/stream/environment_test.go @@ -429,4 +429,98 @@ var _ = Describe("Environment", func() { Eventually(logBuffer).Within(time.Millisecond * 500).Should(gbytes.Say(`"locator operation failed" error="err maybe later"`)) }) }) + + Context("query offset", func() { + BeforeEach(func() { + mockRawClient.EXPECT(). + IsOpen(). + Return(true) // from maybeInitializeLocator + }) + + It("queries offset for a given consumer and stream", func() { + // setup + mockRawClient.EXPECT(). + QueryOffset(gomock.AssignableToTypeOf(ctxType), gomock.AssignableToTypeOf("string"), gomock.AssignableToTypeOf("string")). + Return(uint64(42), nil) + + // act + offset, err := environment.QueryOffset(rootCtx, "consumer-with-offset", "stream") + Expect(err).ToNot(HaveOccurred()) + Expect(offset).To(BeNumerically("==", 42)) + }) + + When("there is an error", func() { + It("bubbles up the error", func() { + // setup + mockRawClient.EXPECT(). + QueryOffset(gomock.AssignableToTypeOf(ctxType), gomock.AssignableToTypeOf("string"), gomock.AssignableToTypeOf("string")). + Return(uint64(0), errors.New("err not today")). + Times(3) + + _, err := environment.QueryOffset(rootCtx, "retryable-error", "stream") + Expect(err).To(MatchError("err not today")) + }) + }) + + When("there are multiple locators", FlakeAttempts(3), Label("flaky"), func() { + var ( + locator2rawClient *stream.MockRawClient + ) + + BeforeEach(func() { + locator2rawClient = stream.NewMockRawClient(mockCtrl) + environment.AppendLocatorRawClient(locator2rawClient) + environment.SetBackoffPolicy(backOffPolicyFn) + }) + + It("uses different locators when one fails", func() { + // setup + locator2rawClient.EXPECT(). + IsOpen(). + Return(true) + locator2rawClient.EXPECT(). + QueryOffset(gomock.AssignableToTypeOf(ctxType), gomock.AssignableToTypeOf("string"), gomock.AssignableToTypeOf("string")). + Return(uint64(42), nil) + + mockRawClient.EXPECT(). + QueryOffset(gomock.AssignableToTypeOf(ctxType), gomock.AssignableToTypeOf("string"), gomock.AssignableToTypeOf("string")). + Return(uint64(0), errors.New("something went wrong")). + Times(3) + + // act + offset, err := environment.QueryOffset(rootCtx, "retried-offset", "stream") + Expect(err).ToNot(HaveOccurred()) + Expect(offset).To(BeNumerically("==", 42)) + }) + + It("gives up on non-retryable errors", func() { + // setup + mockRawClient.EXPECT(). + QueryOffset(gomock.AssignableToTypeOf(ctxType), gomock.Eq("non-retryable"), gomock.AssignableToTypeOf("string")). + Return(uint64(0), raw.ErrStreamDoesNotExist) + + // act + _, err := environment.QueryOffset(rootCtx, "non-retryable", "stream") + Expect(err).To(HaveOccurred()) + }) + }) + + It("logs intermediate error messages", func() { + // setup + logBuffer := gbytes.NewBuffer() + logger := slog.New(slog.NewTextHandler(logBuffer)) + ctx := raw.NewContextWithLogger(context.Background(), *logger) + + mockRawClient.EXPECT(). + QueryOffset(gomock.AssignableToTypeOf(ctxType), gomock.AssignableToTypeOf("string"), gomock.AssignableToTypeOf("string")). + Return(uint64(0), errors.New("err maybe later")). + Times(3) + + // act + _, err := environment.QueryOffset(ctx, "log-things", "stream") + Expect(err).To(HaveOccurred()) + + Eventually(logBuffer).Within(time.Millisecond * 500).Should(gbytes.Say(`"locator operation failed" error="err maybe later"`)) + }) + }) }) diff --git a/pkg/stream/locator.go b/pkg/stream/locator.go index 4eb8208f..62fb78e0 100644 --- a/pkg/stream/locator.go +++ b/pkg/stream/locator.go @@ -196,3 +196,11 @@ func (l *locator) operationQueryStreamStats(args ...any) []any { stats, err := l.client.StreamStats(ctx, name) return []any{stats, err} } + +func (l *locator) operationQueryOffset(args ...any) []any { + ctx := args[0].(context.Context) + reference := args[1].(string) + stream := args[2].(string) + offset, err := l.client.QueryOffset(ctx, reference, stream) + return []any{offset, err} +} From 8259676aaf6cbe131506adf2c00f75917c21e000 Mon Sep 17 00:00:00 2001 From: Alex Blease Date: Fri, 22 Sep 2023 16:21:59 +0100 Subject: [PATCH 2/2] Allow retries to pick the next locator in sequence This is done via the Environment.locatorSelectSequential property, which can be configured from the tests. Signed-off-by: Aitor Perez Cedres --- pkg/stream/environment.go | 36 ++++++++++++++++++++++++++++------ pkg/stream/environment_test.go | 12 ++++++++---- pkg/stream/test_helpers.go | 4 ++++ 3 files changed, 42 insertions(+), 10 deletions(-) diff --git a/pkg/stream/environment.go b/pkg/stream/environment.go index 32e470ec..25d436bf 100644 --- a/pkg/stream/environment.go +++ b/pkg/stream/environment.go @@ -15,9 +15,10 @@ const ( ) type Environment struct { - configuration EnvironmentConfiguration - locators []*locator - backOffPolicy func(int) time.Duration + configuration EnvironmentConfiguration + locators []*locator + backOffPolicy func(int) time.Duration + locatorSelectSequential bool } func NewEnvironment(ctx context.Context, configuration EnvironmentConfiguration) (*Environment, error) { @@ -125,8 +126,15 @@ func (e *Environment) DeleteStream(ctx context.Context, name string) error { n := len(e.locators) var lastError error + var l *locator for i := 0; i < n; i++ { - l := e.pickLocator((i + rn) % n) + if e.locatorSelectSequential { + // round robin / sequential + l = e.locators[i] + } else { + // pick at random + l = e.pickLocator((i + rn) % n) + } if err := l.maybeInitializeLocator(); err != nil { logger.Error("locator not available", slog.Any("error", err)) @@ -182,8 +190,16 @@ func (e *Environment) QueryStreamStats(ctx context.Context, name string) (Stats, n := len(e.locators) var lastError error + var l *locator for i := 0; i < n; i++ { - l := e.pickLocator((i + rn) % n) + if e.locatorSelectSequential { + // round robin / sequential + l = e.locators[i] + } else { + // pick at random + l = e.pickLocator((i + rn) % n) + } + if err := l.maybeInitializeLocator(); err != nil { lastError = err logger.Error("error initializing locator", slog.Any("error", err)) @@ -214,8 +230,16 @@ func (e *Environment) QueryOffset(ctx context.Context, consumer, stream string) n := len(e.locators) var lastError error + var l *locator for i := 0; i < n; i++ { - l := e.pickLocator((i + rn) % n) + if e.locatorSelectSequential { + // round robin / sequential + l = e.locators[i] + } else { + // pick at random + l = e.pickLocator((i + rn) % n) + } + if err := l.maybeInitializeLocator(); err != nil { lastError = err logger.Error("error initializing locator", slog.Any("error", err)) diff --git a/pkg/stream/environment_test.go b/pkg/stream/environment_test.go index fe8d8752..7ca63f34 100644 --- a/pkg/stream/environment_test.go +++ b/pkg/stream/environment_test.go @@ -212,7 +212,7 @@ var _ = Describe("Environment", func() { // marked as flaky because the environment picks a locator randomly // the test flakes if locator2 is picked first - When("there are multiple locators", FlakeAttempts(3), Label("flaky"), func() { + When("there are multiple locators", func() { var ( locator2rawClient *stream.MockRawClient ) @@ -221,6 +221,7 @@ var _ = Describe("Environment", func() { locator2rawClient = stream.NewMockRawClient(mockCtrl) environment.AppendLocatorRawClient(locator2rawClient) environment.SetBackoffPolicy(backOffPolicyFn) + environment.SetLocatorSelectSequential(true) mockRawClient.EXPECT(). IsOpen(). @@ -229,6 +230,7 @@ var _ = Describe("Environment", func() { It("uses different locators when one fails", func() { // setup + locator2rawClient.EXPECT(). IsOpen(). Return(true) @@ -364,7 +366,7 @@ var _ = Describe("Environment", func() { }) }) - When("there are multiple locators", FlakeAttempts(3), Label("flaky"), func() { + When("there are multiple locators", func() { var ( locator2rawClient *stream.MockRawClient ) @@ -373,6 +375,7 @@ var _ = Describe("Environment", func() { locator2rawClient = stream.NewMockRawClient(mockCtrl) environment.AppendLocatorRawClient(locator2rawClient) environment.SetBackoffPolicy(backOffPolicyFn) + environment.SetLocatorSelectSequential(true) // have to set server version again because there's a new locator environment.SetServerVersion("3.11.1") @@ -453,7 +456,7 @@ var _ = Describe("Environment", func() { It("bubbles up the error", func() { // setup mockRawClient.EXPECT(). - QueryOffset(gomock.AssignableToTypeOf(ctxType), gomock.AssignableToTypeOf("string"), gomock.AssignableToTypeOf("string")). + QueryOffset(gomock.AssignableToTypeOf(ctxType), gomock.AssignableToTypeOf("string"), gomock.AssignableToTypeOf("string")). Return(uint64(0), errors.New("err not today")). Times(3) @@ -462,7 +465,7 @@ var _ = Describe("Environment", func() { }) }) - When("there are multiple locators", FlakeAttempts(3), Label("flaky"), func() { + When("there are multiple locators", func() { var ( locator2rawClient *stream.MockRawClient ) @@ -471,6 +474,7 @@ var _ = Describe("Environment", func() { locator2rawClient = stream.NewMockRawClient(mockCtrl) environment.AppendLocatorRawClient(locator2rawClient) environment.SetBackoffPolicy(backOffPolicyFn) + environment.SetLocatorSelectSequential(true) }) It("uses different locators when one fails", func() { diff --git a/pkg/stream/test_helpers.go b/pkg/stream/test_helpers.go index 27246dc5..b6cbc3f1 100644 --- a/pkg/stream/test_helpers.go +++ b/pkg/stream/test_helpers.go @@ -51,3 +51,7 @@ func (e *Environment) SetServerVersion(v string) { func (e *Environment) SetBackoffPolicy(f func(int) time.Duration) { e.backOffPolicy = f } + +func (e *Environment) SetLocatorSelectSequential(v bool) { + e.locatorSelectSequential = v +}