diff --git a/maintenance/failover/failover.go b/maintenance/failover/failover.go
index 5ce21712..11495b3c 100644
--- a/maintenance/failover/failover.go
+++ b/maintenance/failover/failover.go
@@ -11,7 +11,6 @@ import (
 	"time"
 
 	"github.com/bsm/redislock"
-	"github.com/pace/bricks/backend/k8sapi"
 	"github.com/pace/bricks/maintenance/errors"
 	"github.com/pace/bricks/maintenance/health"
 	"github.com/pace/bricks/maintenance/log"
@@ -28,8 +27,6 @@ const (
 	ACTIVE status = 1
 )
 
-const Label = "github.com.pace.bricks.activepassive"
-
 // ActivePassive implements a failover mechanism that allows
 // to deploy a service multiple times but ony one will accept
 // traffic by using the label selector of kubernetes.
@@ -55,14 +52,47 @@ type ActivePassive struct {
 	timeToFailover time.Duration
 	locker         *redislock.Client
 
-	// access to the kubernetes api
-	client *k8sapi.Client
+	stateSetter StateSetter
 
 	// current status of the failover (to show it in the readiness status)
 	state   status
 	stateMu sync.RWMutex
 }
 
+type ActivePassiveOption func(*ActivePassive) error
+
+func WithCustomStateSetter(fn func(ctx context.Context, state string) error) ActivePassiveOption {
+	return func(ap *ActivePassive) error {
+		stateSetter, err := NewCustomStateSetter(fn)
+		if err != nil {
+			return fmt.Errorf("failed to create state setter: %w", err)
+		}
+
+		ap.stateSetter = stateSetter
+		return nil
+	}
+}
+
+func WithNoopStateSetter() ActivePassiveOption {
+	return func(ap *ActivePassive) error {
+		ap.stateSetter = &NoopStateSetter{}
+		return nil
+	}
+}
+
+func WithPodStateSetter() ActivePassiveOption {
+	return func(ap *ActivePassive) error {
+		stateSetter, err := NewPodStateSetter()
+		if err != nil {
+			return fmt.Errorf("failed to create pod state setter: %w", err)
+		}
+
+		ap.stateSetter = stateSetter
+
+		return nil
+	}
+}
+
 // NewActivePassive creates a new active passive cluster
 // identified by the name, the time to failover determines
 // the frequency of checks performed against the redis to
@@ -70,18 +100,29 @@ type ActivePassive struct {
 // NOTE: creating multiple ActivePassive in one processes
 // is not working correctly as there is only one readiness
 // probe.
-func NewActivePassive(clusterName string, timeToFailover time.Duration, client *redis.Client) (*ActivePassive, error) {
-	cl, err := k8sapi.NewClient()
-	if err != nil {
-		return nil, err
-	}
-
+func NewActivePassive(clusterName string, timeToFailover time.Duration, client *redis.Client, opts ...ActivePassiveOption) (*ActivePassive, error) {
 	ap := &ActivePassive{
 		clusterName:    clusterName,
 		timeToFailover: timeToFailover,
 		locker:         redislock.New(client),
-		client:         cl,
 	}
+
+	for _, opt := range opts {
+		if err := opt(ap); err != nil {
+			return nil, fmt.Errorf("failed to apply option: %w", err)
+		}
+	}
+
+	if ap.stateSetter == nil {
+		var err error
+
+		// Default state setter uses the k8s api to set the state.
+		ap.stateSetter, err = NewPodStateSetter()
+		if err != nil {
+			return nil, fmt.Errorf("failed to create default state setter: %w", err)
+		}
+	}
+
 	health.SetCustomReadinessCheck(ap.Handler)
 
 	return ap, nil
@@ -165,6 +206,8 @@ func (a *ActivePassive) Run(ctx context.Context) error {
 			// passive in next iteration
 			logger.Debug().Msg("ttl expired")
 			a.becomeUndefined(ctx)
+
+			continue
 		}
 
 		refreshTime := d / 2
@@ -222,7 +265,7 @@ func (a *ActivePassive) becomeUndefined(ctx context.Context) {
 
 // setState returns true if the state was set successfully
 func (a *ActivePassive) setState(ctx context.Context, state status) bool {
-	err := a.client.SetCurrentPodLabel(ctx, Label, a.label(state))
+	err := a.stateSetter.SetState(ctx, a.label(state))
 	if err != nil {
 		log.Ctx(ctx).Error().Err(err).Msg("failed to mark pod as undefined")
 		a.stateMu.Lock()
diff --git a/maintenance/failover/state_setter.go b/maintenance/failover/state_setter.go
new file mode 100644
index 00000000..d63754cb
--- /dev/null
+++ b/maintenance/failover/state_setter.go
@@ -0,0 +1,53 @@
+package failover
+
+import (
+	"context"
+	"fmt"
+
+	"github.com/pace/bricks/backend/k8sapi"
+)
+
+const Label = "github.com.pace.bricks.activepassive"
+
+type StateSetter interface {
+	SetState(ctx context.Context, state string) error
+}
+
+type podStateSetter struct {
+	client *k8sapi.Client
+}
+
+func NewPodStateSetter() (*podStateSetter, error) {
+	client, err := k8sapi.NewClient()
+	if err != nil {
+		return nil, fmt.Errorf("failed to create k8s client: %w", err)
+	}
+
+	return &podStateSetter{client: client}, nil
+}
+
+func (p *podStateSetter) SetState(ctx context.Context, state string) error {
+	return p.client.SetCurrentPodLabel(ctx, Label, state)
+}
+
+type CustomStateSetter struct {
+	fn func(ctx context.Context, state string) error
+}
+
+func NewCustomStateSetter(fn func(ctx context.Context, state string) error) (*CustomStateSetter, error) {
+	if fn == nil {
+		return nil, fmt.Errorf("fn must not be nil")
+	}
+
+	return &CustomStateSetter{fn: fn}, nil
+}
+
+func (c *CustomStateSetter) SetState(ctx context.Context, state string) error {
+	return c.fn(ctx, state)
+}
+
+type NoopStateSetter struct{}
+
+func (n *NoopStateSetter) SetState(ctx context.Context, state string) error {
+	return nil
+}
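
Usage sketch (not part of the patch): with the state setter now behind an
interface, a service that does not run on kubernetes can swap in its own
reporting. A minimal example under stated assumptions; NewActivePassive, the
With* options, and Run come from this change, while the go-redis import path,
the Addr value, and the log-only callback are illustrative placeholders:

	package main

	import (
		"context"
		"log"
		"time"

		"github.com/go-redis/redis/v8" // assumed client; any *redis.Client accepted by bricks works
		"github.com/pace/bricks/maintenance/failover"
	)

	func main() {
		rdb := redis.NewClient(&redis.Options{Addr: "localhost:6379"})

		// Report state transitions through a custom callback instead of
		// labeling the current pod via the kubernetes API (the default
		// applied by NewActivePassive when no option is given).
		ap, err := failover.NewActivePassive("example-cluster", 5*time.Second, rdb,
			failover.WithCustomStateSetter(func(ctx context.Context, state string) error {
				log.Printf("failover state: %s", state)
				return nil
			}),
		)
		if err != nil {
			log.Fatal(err)
		}

		// Run blocks, driving the lock acquisition and refresh loop.
		log.Fatal(ap.Run(context.Background()))
	}

WithNoopStateSetter() fits tests, and WithPodStateSetter() restores the
previous pod-label behavior explicitly.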