Skip to content

Commit

Permalink
failover: Allow custom state setter
Browse files Browse the repository at this point in the history
  • Loading branch information
monstermunchkin committed Jan 17, 2025
1 parent 79b8e04 commit 4dd2044
Show file tree
Hide file tree
Showing 2 changed files with 108 additions and 13 deletions.
68 changes: 55 additions & 13 deletions maintenance/failover/failover.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
"time"

"github.com/bsm/redislock"
"github.com/pace/bricks/backend/k8sapi"
"github.com/pace/bricks/maintenance/errors"
"github.com/pace/bricks/maintenance/health"
"github.com/pace/bricks/maintenance/log"
Expand All @@ -26,8 +25,6 @@ const (
ACTIVE status = 1
)

const Label = "github.com.pace.bricks.activepassive"

// ActivePassive implements a failover mechanism that allows
// to deploy a service multiple times but ony one will accept
// traffic by using the label selector of kubernetes.
Expand All @@ -51,31 +48,76 @@ type ActivePassive struct {
timeToFailover time.Duration
locker *redislock.Client

// access to the kubernetes api
k8sClient *k8sapi.Client
stateSetter StateSetter

// current status of the failover (to show it in the readiness status)
state status
stateMu sync.RWMutex
}

type ActivePassiveOption func(*ActivePassive) error

func WithCustomStateSetter(fn func(ctx context.Context, state string) error) ActivePassiveOption {
return func(ap *ActivePassive) error {
stateSetter, err := NewCustomStateSetter(fn)
if err != nil {
return fmt.Errorf("failed to create state setter: %w", err)
}

ap.stateSetter = stateSetter

return nil
}
}

func WithNoopStateSetter() ActivePassiveOption {
return func(ap *ActivePassive) error {
ap.stateSetter = &NoopStateSetter{}

return nil
}
}

func WithPodStateSetter() ActivePassiveOption {
return func(ap *ActivePassive) error {
stateSetter, err := NewPodStateSetter()
if err != nil {
return fmt.Errorf("failed to create pod state setter: %w", err)
}

ap.stateSetter = stateSetter

return nil
}
}

// NewActivePassive creates a new active passive cluster
// identified by the name. The time to fail over determines
// the frequency of checks performed against redis to
// keep the active state.
// NOTE: creating multiple ActivePassive in one process
// is not working correctly as there is only one readiness probe.
func NewActivePassive(clusterName string, timeToFailover time.Duration, client *redis.Client) (*ActivePassive, error) {
k8sClient, err := k8sapi.NewClient()
if err != nil {
return nil, err
}

func NewActivePassive(clusterName string, timeToFailover time.Duration, client *redis.Client, opts ...ActivePassiveOption) (*ActivePassive, error) {
activePassive := &ActivePassive{
clusterName: clusterName,
timeToFailover: timeToFailover,
locker: redislock.New(client),
k8sClient: k8sClient,
}

for _, opt := range opts {
if err := opt(activePassive); err != nil {
return nil, fmt.Errorf("failed to apply option: %w", err)
}
}

if activePassive.stateSetter == nil {
var err error

// Default state setter uses the k8s api to set the state.
activePassive.stateSetter, err = NewPodStateSetter()
if err != nil {
return nil, fmt.Errorf("failed to create default state setter: %w", err)
}
}

health.SetCustomReadinessCheck(activePassive.Handler)
Expand Down Expand Up @@ -198,7 +240,7 @@ func (a *ActivePassive) becomeUndefined(ctx context.Context) {

// setState returns true if the state was set successfully
func (a *ActivePassive) setState(ctx context.Context, state status) bool {
err := a.k8sClient.SetCurrentPodLabel(ctx, Label, a.label(state))
err := a.stateSetter.SetState(ctx, a.label(state))
if err != nil {
log.Ctx(ctx).Error().Err(err).Msg("failed to mark pod as undefined")
a.stateMu.Lock()
Expand Down
53 changes: 53 additions & 0 deletions maintenance/failover/state_setter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package failover

import (
"context"
"fmt"

"github.com/pace/bricks/backend/k8sapi"
)

const Label = "github.com.pace.bricks.activepassive"

type StateSetter interface {
SetState(ctx context.Context, state string) error
}

type podStateSetter struct {
k8sClient *k8sapi.Client
}

func NewPodStateSetter() (*podStateSetter, error) {
k8sClient, err := k8sapi.NewClient()
if err != nil {
return nil, fmt.Errorf("failed to create k8s client: %w", err)
}

return &podStateSetter{k8sClient: k8sClient}, nil
}

func (p *podStateSetter) SetState(ctx context.Context, state string) error {
return p.k8sClient.SetCurrentPodLabel(ctx, Label, state)
}

type CustomStateSetter struct {
fn func(ctx context.Context, state string) error
}

func NewCustomStateSetter(fn func(ctx context.Context, state string) error) (*CustomStateSetter, error) {
if fn == nil {
return nil, fmt.Errorf("fn must not be nil")
}

return &CustomStateSetter{fn: fn}, nil
}

func (c *CustomStateSetter) SetState(ctx context.Context, state string) error {
return c.fn(ctx, state)
}

type NoopStateSetter struct{}

func (n *NoopStateSetter) SetState(ctx context.Context, state string) error {
return nil
}

0 comments on commit 4dd2044

Please sign in to comment.