From 4dd20447201bda5bde4a62800cb1ccfc89292c76 Mon Sep 17 00:00:00 2001 From: Thomas Hipp Date: Wed, 8 Jan 2025 15:41:36 +0100 Subject: [PATCH] failover: Allow custom state setter --- maintenance/failover/failover.go | 68 ++++++++++++++++++++++------ maintenance/failover/state_setter.go | 53 ++++++++++++++++++++++ 2 files changed, 108 insertions(+), 13 deletions(-) create mode 100644 maintenance/failover/state_setter.go diff --git a/maintenance/failover/failover.go b/maintenance/failover/failover.go index 82df646c..d72f0872 100644 --- a/maintenance/failover/failover.go +++ b/maintenance/failover/failover.go @@ -11,7 +11,6 @@ import ( "time" "github.com/bsm/redislock" - "github.com/pace/bricks/backend/k8sapi" "github.com/pace/bricks/maintenance/errors" "github.com/pace/bricks/maintenance/health" "github.com/pace/bricks/maintenance/log" @@ -26,8 +25,6 @@ const ( ACTIVE status = 1 ) -const Label = "github.com.pace.bricks.activepassive" - // ActivePassive implements a failover mechanism that allows // to deploy a service multiple times but ony one will accept // traffic by using the label selector of kubernetes. @@ -51,31 +48,76 @@ type ActivePassive struct { timeToFailover time.Duration locker *redislock.Client - // access to the kubernetes api - k8sClient *k8sapi.Client + stateSetter StateSetter // current status of the failover (to show it in the readiness status) state status stateMu sync.RWMutex } +type ActivePassiveOption func(*ActivePassive) error + +func WithCustomStateSetter(fn func(ctx context.Context, state string) error) ActivePassiveOption { + return func(ap *ActivePassive) error { + stateSetter, err := NewCustomStateSetter(fn) + if err != nil { + return fmt.Errorf("failed to create state setter: %w", err) + } + + ap.stateSetter = stateSetter + + return nil + } +} + +func WithNoopStateSetter() ActivePassiveOption { + return func(ap *ActivePassive) error { + ap.stateSetter = &NoopStateSetter{} + + return nil + } +} + +func WithPodStateSetter() ActivePassiveOption { + return func(ap *ActivePassive) error { + stateSetter, err := NewPodStateSetter() + if err != nil { + return fmt.Errorf("failed to create pod state setter: %w", err) + } + + ap.stateSetter = stateSetter + + return nil + } +} + // NewActivePassive creates a new active passive cluster // identified by the name. The time to fail over determines // the frequency of checks performed against redis to // keep the active state. // NOTE: creating multiple ActivePassive in one process // is not working correctly as there is only one readiness probe. -func NewActivePassive(clusterName string, timeToFailover time.Duration, client *redis.Client) (*ActivePassive, error) { - k8sClient, err := k8sapi.NewClient() - if err != nil { - return nil, err - } - +func NewActivePassive(clusterName string, timeToFailover time.Duration, client *redis.Client, opts ...ActivePassiveOption) (*ActivePassive, error) { activePassive := &ActivePassive{ clusterName: clusterName, timeToFailover: timeToFailover, locker: redislock.New(client), - k8sClient: k8sClient, + } + + for _, opt := range opts { + if err := opt(activePassive); err != nil { + return nil, fmt.Errorf("failed to apply option: %w", err) + } + } + + if activePassive.stateSetter == nil { + var err error + + // Default state setter uses the k8s api to set the state. + activePassive.stateSetter, err = NewPodStateSetter() + if err != nil { + return nil, fmt.Errorf("failed to create default state setter: %w", err) + } } health.SetCustomReadinessCheck(activePassive.Handler) @@ -198,7 +240,7 @@ func (a *ActivePassive) becomeUndefined(ctx context.Context) { // setState returns true if the state was set successfully func (a *ActivePassive) setState(ctx context.Context, state status) bool { - err := a.k8sClient.SetCurrentPodLabel(ctx, Label, a.label(state)) + err := a.stateSetter.SetState(ctx, a.label(state)) if err != nil { log.Ctx(ctx).Error().Err(err).Msg("failed to mark pod as undefined") a.stateMu.Lock() diff --git a/maintenance/failover/state_setter.go b/maintenance/failover/state_setter.go new file mode 100644 index 00000000..d818ee4c --- /dev/null +++ b/maintenance/failover/state_setter.go @@ -0,0 +1,53 @@ +package failover + +import ( + "context" + "fmt" + + "github.com/pace/bricks/backend/k8sapi" +) + +const Label = "github.com.pace.bricks.activepassive" + +type StateSetter interface { + SetState(ctx context.Context, state string) error +} + +type podStateSetter struct { + k8sClient *k8sapi.Client +} + +func NewPodStateSetter() (*podStateSetter, error) { + k8sClient, err := k8sapi.NewClient() + if err != nil { + return nil, fmt.Errorf("failed to create k8s client: %w", err) + } + + return &podStateSetter{k8sClient: k8sClient}, nil +} + +func (p *podStateSetter) SetState(ctx context.Context, state string) error { + return p.k8sClient.SetCurrentPodLabel(ctx, Label, state) +} + +type CustomStateSetter struct { + fn func(ctx context.Context, state string) error +} + +func NewCustomStateSetter(fn func(ctx context.Context, state string) error) (*CustomStateSetter, error) { + if fn == nil { + return nil, fmt.Errorf("fn must not be nil") + } + + return &CustomStateSetter{fn: fn}, nil +} + +func (c *CustomStateSetter) SetState(ctx context.Context, state string) error { + return c.fn(ctx, state) +} + +type NoopStateSetter struct{} + +func (n *NoopStateSetter) SetState(ctx context.Context, state string) error { + return nil +}