From 8259676aaf6cbe131506adf2c00f75917c21e000 Mon Sep 17 00:00:00 2001 From: Alex Blease Date: Fri, 22 Sep 2023 16:21:59 +0100 Subject: [PATCH] Allow retries to pick the next locator in sequence This is done via the Environment.locatorSelectSequential property, which can be configured from the tests. Signed-off-by: Aitor Perez Cedres --- pkg/stream/environment.go | 36 ++++++++++++++++++++++++++++------ pkg/stream/environment_test.go | 12 ++++++++---- pkg/stream/test_helpers.go | 4 ++++ 3 files changed, 42 insertions(+), 10 deletions(-) diff --git a/pkg/stream/environment.go b/pkg/stream/environment.go index 32e470ec..25d436bf 100644 --- a/pkg/stream/environment.go +++ b/pkg/stream/environment.go @@ -15,9 +15,10 @@ const ( ) type Environment struct { - configuration EnvironmentConfiguration - locators []*locator - backOffPolicy func(int) time.Duration + configuration EnvironmentConfiguration + locators []*locator + backOffPolicy func(int) time.Duration + locatorSelectSequential bool } func NewEnvironment(ctx context.Context, configuration EnvironmentConfiguration) (*Environment, error) { @@ -125,8 +126,15 @@ func (e *Environment) DeleteStream(ctx context.Context, name string) error { n := len(e.locators) var lastError error + var l *locator for i := 0; i < n; i++ { - l := e.pickLocator((i + rn) % n) + if e.locatorSelectSequential { + // round robin / sequential + l = e.locators[i] + } else { + // pick at random + l = e.pickLocator((i + rn) % n) + } if err := l.maybeInitializeLocator(); err != nil { logger.Error("locator not available", slog.Any("error", err)) @@ -182,8 +190,16 @@ func (e *Environment) QueryStreamStats(ctx context.Context, name string) (Stats, n := len(e.locators) var lastError error + var l *locator for i := 0; i < n; i++ { - l := e.pickLocator((i + rn) % n) + if e.locatorSelectSequential { + // round robin / sequential + l = e.locators[i] + } else { + // pick at random + l = e.pickLocator((i + rn) % n) + } + if err := l.maybeInitializeLocator(); err != nil { lastError = err logger.Error("error initializing locator", slog.Any("error", err)) @@ -214,8 +230,16 @@ func (e *Environment) QueryOffset(ctx context.Context, consumer, stream string) n := len(e.locators) var lastError error + var l *locator for i := 0; i < n; i++ { - l := e.pickLocator((i + rn) % n) + if e.locatorSelectSequential { + // round robin / sequential + l = e.locators[i] + } else { + // pick at random + l = e.pickLocator((i + rn) % n) + } + if err := l.maybeInitializeLocator(); err != nil { lastError = err logger.Error("error initializing locator", slog.Any("error", err)) diff --git a/pkg/stream/environment_test.go b/pkg/stream/environment_test.go index fe8d8752..7ca63f34 100644 --- a/pkg/stream/environment_test.go +++ b/pkg/stream/environment_test.go @@ -212,7 +212,7 @@ var _ = Describe("Environment", func() { // marked as flaky because the environment picks a locator randomly // the test flakes if locator2 is picked first - When("there are multiple locators", FlakeAttempts(3), Label("flaky"), func() { + When("there are multiple locators", func() { var ( locator2rawClient *stream.MockRawClient ) @@ -221,6 +221,7 @@ var _ = Describe("Environment", func() { locator2rawClient = stream.NewMockRawClient(mockCtrl) environment.AppendLocatorRawClient(locator2rawClient) environment.SetBackoffPolicy(backOffPolicyFn) + environment.SetLocatorSelectSequential(true) mockRawClient.EXPECT(). IsOpen(). @@ -229,6 +230,7 @@ var _ = Describe("Environment", func() { It("uses different locators when one fails", func() { // setup + locator2rawClient.EXPECT(). IsOpen(). Return(true) @@ -364,7 +366,7 @@ var _ = Describe("Environment", func() { }) }) - When("there are multiple locators", FlakeAttempts(3), Label("flaky"), func() { + When("there are multiple locators", func() { var ( locator2rawClient *stream.MockRawClient ) @@ -373,6 +375,7 @@ var _ = Describe("Environment", func() { locator2rawClient = stream.NewMockRawClient(mockCtrl) environment.AppendLocatorRawClient(locator2rawClient) environment.SetBackoffPolicy(backOffPolicyFn) + environment.SetLocatorSelectSequential(true) // have to set server version again because there's a new locator environment.SetServerVersion("3.11.1") @@ -453,7 +456,7 @@ var _ = Describe("Environment", func() { It("bubbles up the error", func() { // setup mockRawClient.EXPECT(). - QueryOffset(gomock.AssignableToTypeOf(ctxType), gomock.AssignableToTypeOf("string"), gomock.AssignableToTypeOf("string")). + QueryOffset(gomock.AssignableToTypeOf(ctxType), gomock.AssignableToTypeOf("string"), gomock.AssignableToTypeOf("string")). Return(uint64(0), errors.New("err not today")). Times(3) @@ -462,7 +465,7 @@ var _ = Describe("Environment", func() { }) }) - When("there are multiple locators", FlakeAttempts(3), Label("flaky"), func() { + When("there are multiple locators", func() { var ( locator2rawClient *stream.MockRawClient ) @@ -471,6 +474,7 @@ var _ = Describe("Environment", func() { locator2rawClient = stream.NewMockRawClient(mockCtrl) environment.AppendLocatorRawClient(locator2rawClient) environment.SetBackoffPolicy(backOffPolicyFn) + environment.SetLocatorSelectSequential(true) }) It("uses different locators when one fails", func() { diff --git a/pkg/stream/test_helpers.go b/pkg/stream/test_helpers.go index 27246dc5..b6cbc3f1 100644 --- a/pkg/stream/test_helpers.go +++ b/pkg/stream/test_helpers.go @@ -51,3 +51,7 @@ func (e *Environment) SetServerVersion(v string) { func (e *Environment) SetBackoffPolicy(f func(int) time.Duration) { e.backOffPolicy = f } + +func (e *Environment) SetLocatorSelectSequential(v bool) { + e.locatorSelectSequential = v +}