Skip to content

Commit

Permalink
Allow retries to pick the next locator in sequence
Browse files Browse the repository at this point in the history
This is done via the Environment.locatorSelectSequential property, which
can be configured from the tests.

Signed-off-by: Aitor Perez Cedres <[email protected]>
  • Loading branch information
ablease authored and Zerpet committed Sep 22, 2023
1 parent 285993e commit 8259676
Show file tree
Hide file tree
Showing 3 changed files with 42 additions and 10 deletions.
36 changes: 30 additions & 6 deletions pkg/stream/environment.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,10 @@ const (
)

type Environment struct {
configuration EnvironmentConfiguration
locators []*locator
backOffPolicy func(int) time.Duration
configuration EnvironmentConfiguration
locators []*locator
backOffPolicy func(int) time.Duration
locatorSelectSequential bool
}

func NewEnvironment(ctx context.Context, configuration EnvironmentConfiguration) (*Environment, error) {
Expand Down Expand Up @@ -125,8 +126,15 @@ func (e *Environment) DeleteStream(ctx context.Context, name string) error {
n := len(e.locators)

var lastError error
var l *locator
for i := 0; i < n; i++ {
l := e.pickLocator((i + rn) % n)
if e.locatorSelectSequential {
// round robin / sequential
l = e.locators[i]
} else {
// pick at random
l = e.pickLocator((i + rn) % n)
}

if err := l.maybeInitializeLocator(); err != nil {
logger.Error("locator not available", slog.Any("error", err))
Expand Down Expand Up @@ -182,8 +190,16 @@ func (e *Environment) QueryStreamStats(ctx context.Context, name string) (Stats,
n := len(e.locators)

var lastError error
var l *locator
for i := 0; i < n; i++ {
l := e.pickLocator((i + rn) % n)
if e.locatorSelectSequential {
// round robin / sequential
l = e.locators[i]
} else {
// pick at random
l = e.pickLocator((i + rn) % n)
}

if err := l.maybeInitializeLocator(); err != nil {
lastError = err
logger.Error("error initializing locator", slog.Any("error", err))
Expand Down Expand Up @@ -214,8 +230,16 @@ func (e *Environment) QueryOffset(ctx context.Context, consumer, stream string)
n := len(e.locators)

var lastError error
var l *locator
for i := 0; i < n; i++ {
l := e.pickLocator((i + rn) % n)
if e.locatorSelectSequential {
// round robin / sequential
l = e.locators[i]
} else {
// pick at random
l = e.pickLocator((i + rn) % n)
}

if err := l.maybeInitializeLocator(); err != nil {
lastError = err
logger.Error("error initializing locator", slog.Any("error", err))
Expand Down
12 changes: 8 additions & 4 deletions pkg/stream/environment_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ var _ = Describe("Environment", func() {

// marked as flaky because the environment picks a locator randomly
// the test flakes if locator2 is picked first
When("there are multiple locators", FlakeAttempts(3), Label("flaky"), func() {
When("there are multiple locators", func() {
var (
locator2rawClient *stream.MockRawClient
)
Expand All @@ -221,6 +221,7 @@ var _ = Describe("Environment", func() {
locator2rawClient = stream.NewMockRawClient(mockCtrl)
environment.AppendLocatorRawClient(locator2rawClient)
environment.SetBackoffPolicy(backOffPolicyFn)
environment.SetLocatorSelectSequential(true)

mockRawClient.EXPECT().
IsOpen().
Expand All @@ -229,6 +230,7 @@ var _ = Describe("Environment", func() {

It("uses different locators when one fails", func() {
// setup

locator2rawClient.EXPECT().
IsOpen().
Return(true)
Expand Down Expand Up @@ -364,7 +366,7 @@ var _ = Describe("Environment", func() {
})
})

When("there are multiple locators", FlakeAttempts(3), Label("flaky"), func() {
When("there are multiple locators", func() {
var (
locator2rawClient *stream.MockRawClient
)
Expand All @@ -373,6 +375,7 @@ var _ = Describe("Environment", func() {
locator2rawClient = stream.NewMockRawClient(mockCtrl)
environment.AppendLocatorRawClient(locator2rawClient)
environment.SetBackoffPolicy(backOffPolicyFn)
environment.SetLocatorSelectSequential(true)

// have to set server version again because there's a new locator
environment.SetServerVersion("3.11.1")
Expand Down Expand Up @@ -453,7 +456,7 @@ var _ = Describe("Environment", func() {
It("bubbles up the error", func() {
// setup
mockRawClient.EXPECT().
QueryOffset(gomock.AssignableToTypeOf(ctxType), gomock.AssignableToTypeOf("string"), gomock.AssignableToTypeOf("string")).
QueryOffset(gomock.AssignableToTypeOf(ctxType), gomock.AssignableToTypeOf("string"), gomock.AssignableToTypeOf("string")).
Return(uint64(0), errors.New("err not today")).
Times(3)

Expand All @@ -462,7 +465,7 @@ var _ = Describe("Environment", func() {
})
})

When("there are multiple locators", FlakeAttempts(3), Label("flaky"), func() {
When("there are multiple locators", func() {
var (
locator2rawClient *stream.MockRawClient
)
Expand All @@ -471,6 +474,7 @@ var _ = Describe("Environment", func() {
locator2rawClient = stream.NewMockRawClient(mockCtrl)
environment.AppendLocatorRawClient(locator2rawClient)
environment.SetBackoffPolicy(backOffPolicyFn)
environment.SetLocatorSelectSequential(true)
})

It("uses different locators when one fails", func() {
Expand Down
4 changes: 4 additions & 0 deletions pkg/stream/test_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,3 +51,7 @@ func (e *Environment) SetServerVersion(v string) {
func (e *Environment) SetBackoffPolicy(f func(int) time.Duration) {
e.backOffPolicy = f
}

func (e *Environment) SetLocatorSelectSequential(v bool) {
e.locatorSelectSequential = v
}

0 comments on commit 8259676

Please sign in to comment.