Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor failover mechanism #393

Closed
wants to merge 11 commits into from
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ tools/testserver/math/math.pb.go: tools/testserver/math/math.proto

.PHONY: lint
lint:
(cd /; $(GO) install -v -x github.com/golangci/golangci-lint/cmd/golangci-lint@latest)
(cd /; $(GO) install -x github.com/golangci/golangci-lint/cmd/golangci-lint@latest)

golangci-lint run --timeout 2m

Expand Down
1 change: 0 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ require (
github.com/PuerkitoBio/rehttp v1.4.0
github.com/adjust/rmq/v5 v5.2.0
github.com/asaskevich/govalidator v0.0.0-20210307081110-f21760c49a8d
github.com/bsm/redislock v0.9.3
github.com/caarlos0/env/v10 v10.0.0
github.com/certifi/gocertifi v0.0.0-20210507211836-431795d63e8d
github.com/dave/jennifer v1.4.1
Expand Down
2 changes: 0 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,6 @@ github.com/bsm/ginkgo/v2 v2.12.0 h1:Ny8MWAHyOepLGlLKYmXG4IEkioBysk6GpaRTLC8zwWs=
github.com/bsm/ginkgo/v2 v2.12.0/go.mod h1:SwYbGRRDovPVboqFv0tPTcG1sN61LM1Z4ARdbAV9g4c=
github.com/bsm/gomega v1.27.10 h1:yeMWxP2pV2fG3FgAODIY8EiRE3dy0aeFYt4l7wh6yKA=
github.com/bsm/gomega v1.27.10/go.mod h1:JyEr/xRbxbtgWNi8tIEVPUYZ5Dzef52k01W3YH0H+O0=
github.com/bsm/redislock v0.9.3 h1:osmvugkXGiLDEhzUPdM0EUtKpTEgLLuli4Ky2Z4vx38=
github.com/bsm/redislock v0.9.3/go.mod h1:Epf7AJLiSFwLCiZcfi6pWFO/8eAYrYpQXFxEDPoDeAk=
github.com/caarlos0/env/v10 v10.0.0 h1:yIHUBZGsyqCnpTkbjk8asUlx6RFhhEs+h7TOBdgdzXA=
github.com/caarlos0/env/v10 v10.0.0/go.mod h1:ZfulV76NvVPw3tm591U4SwL3Xx9ldzBP9aGxzeN7G18=
github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
Expand Down
160 changes: 106 additions & 54 deletions maintenance/failover/failover.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,21 +5,21 @@
import (
"context"
"fmt"
"github.com/pace/bricks/pkg/lock/redislock"

Check failure on line 8 in maintenance/failover/failover.go

View workflow job for this annotation

GitHub Actions / test (1.23)

File is not properly formatted (gofumpt)

Check failure on line 8 in maintenance/failover/failover.go

View workflow job for this annotation

GitHub Actions / test (1.23)

File is not properly formatted (gofumpt)
"net/http"
"strings"
"sync"
"time"

"github.com/bsm/redislock"
"github.com/rs/zerolog"

"github.com/pace/bricks/backend/k8sapi"
"github.com/pace/bricks/maintenance/errors"
"github.com/pace/bricks/maintenance/health"
"github.com/pace/bricks/maintenance/log"
"github.com/redis/go-redis/v9"
)

const waitRetry = time.Millisecond * 500

type status int

const (
Expand All @@ -34,17 +34,15 @@
// to deploy a service multiple times but ony one will accept
// traffic by using the label selector of kubernetes.
// In order to determine the active, a lock needs to be hold
// in redis. Hocks can be passed to handle the case of becoming
// in redis. Hooks can be passed to handle the case of becoming
// the active or passive.
// The readiness probe will report the state (ACTIVE/PASSIVE)
// of each of the members in the cluster.
type ActivePassive struct {
// OnActive will be called in case the current processes
// is elected to be the active one
// OnActive will be called in case the current processes is elected to be the active one
OnActive func(ctx context.Context)

// OnPassive will be called in case the current process is
// the passive one
// OnPassive will be called in case the current process is the passive one
OnPassive func(ctx context.Context)

// OnStop is called after the ActivePassive process stops
Expand All @@ -54,54 +52,57 @@
clusterName string
timeToFailover time.Duration
locker *redislock.Client
redisClient *redis.Client

// access to the kubernetes api
client *k8sapi.Client
k8sClient *k8sapi.Client

// current status of the failover (to show it in the readiness status)
state status
stateMu sync.RWMutex
}

// NewActivePassive creates a new active passive cluster
// identified by the name, the time to failover determines
// the frequency of checks performed against the redis to
// identified by the name. The time to fail over determines
// the frequency of checks performed against redis to
// keep the active state.
// NOTE: creating multiple ActivePassive in one processes
// is not working correctly as there is only one readiness
// probe.
func NewActivePassive(clusterName string, timeToFailover time.Duration, client *redis.Client) (*ActivePassive, error) {
cl, err := k8sapi.NewClient()
// NOTE: creating multiple ActivePassive in one process
// is not working correctly as there is only one readiness probe.
func NewActivePassive(clusterName string, timeToFailover time.Duration, redisClient *redis.Client) (*ActivePassive, error) {
k8sClient, err := k8sapi.NewClient()
if err != nil {
return nil, err
return nil, fmt.Errorf("failed to create k8s client: %w", err)
}

ap := &ActivePassive{
activePassive := &ActivePassive{
clusterName: clusterName,
timeToFailover: timeToFailover,
locker: redislock.New(client),
client: cl,
locker: redislock.New(redisClient),
redisClient: redisClient,
k8sClient: k8sClient,
}
health.SetCustomReadinessCheck(ap.Handler)
health.SetCustomReadinessCheck(activePassive.Handler)

return ap, nil
return activePassive, nil
}

// Run registers the readiness probe and calls the OnActive
// and OnPassive callbacks in case the election toke place.
// Will handle panic safely and therefore can be directly called
// with go.
// Run manages distributed lock-based leadership.
// This method is designed to continually monitor and maintain the leadership status of the calling pod,
// ensuring only one active instance holds the lock at a time, while transitioning other instances to passive
// mode. The handler will try to renew its active status by refreshing the lock periodically.
func (a *ActivePassive) Run(ctx context.Context) error {
defer errors.HandleWithCtx(ctx, "activepassive failover handler")

lockName := "activepassive:lock:" + a.clusterName
token := "activepassive:token:" + a.clusterName

logger := log.Ctx(ctx).With().Str("failover", lockName).Logger()
ctx = logger.WithContext(ctx)

a.close = make(chan struct{})
defer close(a.close)

// trigger stop handler
// Trigger stop handler
defer func() {
if a.OnStop != nil {
a.OnStop()
Expand All @@ -110,68 +111,97 @@

var lock *redislock.Lock

// t is a ticker that reminds to call refresh if
// the token was acquired after half of the remaining ttl time
t := time.NewTicker(a.timeToFailover)
// Ticker to refresh the lock's TTL before it expires
tryRefreshLock := time.NewTicker(a.timeToFailover)

// retry time triggers to check if the look needs to be acquired
retry := time.NewTicker(waitRetry)
// Ticker to check if the lock can be acquired if in passive or undefined state
retryInterval := 500 * time.Millisecond
retryAcquireLock := time.NewTicker(retryInterval)

for {
// allow close or cancel
select {
case <-ctx.Done():
return ctx.Err()
case <-a.close:
return nil
case <-t.C:
case <-tryRefreshLock.C:
if a.getState() == ACTIVE {
err := lock.Refresh(ctx, a.timeToFailover, &redislock.Options{
RetryStrategy: redislock.LimitRetry(redislock.LinearBackoff(a.timeToFailover/3), 3),
Token: token,
})
if err != nil {
logger.Debug().Err(err).Msg("failed to refresh")
logger.Info().Err(err).Msg("failed to refresh the lock; becoming undefined...")
a.becomeUndefined(ctx)
}
}
case <-retry.C:
// try to acquire the lock, as we are not the active
case <-retryAcquireLock.C:
if a.getState() != ACTIVE {
var err error

lock, err = a.locker.Obtain(ctx, lockName, a.timeToFailover, &redislock.Options{
RetryStrategy: redislock.LimitRetry(redislock.LinearBackoff(a.timeToFailover/3), 3),
Token: token,
})
if err != nil {
// we became passive, trigger callback
if a.getState() != PASSIVE {
logger.Debug().Err(err).Msg("becoming passive")
logger.Debug().Err(err).Msg("couldn't obtain the lock; becoming passive...")
a.becomePassive(ctx)
}

continue
}

// lock acquired
logger.Debug().Msg("becoming active")
logger.Debug().Msg("lock acquired; becoming active...")
a.becomeActive(ctx)

// we are active, renew if required
d, err := lock.TTL(ctx)
logger.Debug().Msg("check if lock exists")

// Verify that key exists, then, retrieve the value
keyExists, err := a.redisClient.Exists(ctx, lockName).Result()
if err != nil {
logger.Error().Err(err).Msgf("Stefan: Failed to check that lock/key '%v' exists", lockName)

continue
}

if keyExists == 0 {
logger.Info().Msgf("Stefan: Lock/Key '%s' does not exist", lockName)

continue
}

lockValue, err := a.redisClient.Get(ctx, lockName).Result()
if err != nil {
logger.Error().Err(err).Msg("Error getting key value")

continue
}

logger.Info().Msgf("Stefan: Key value is: %v", lockValue)

// Check TTL of the newly acquired lock
ttl, err := safeGetTTL(ctx, lock, logger)
if err != nil {
logger.Debug().Err(err).Msg("failed to get TTL")
logger.Info().Err(err).Msg("failed to get lock TTL")

continue
}
if d == 0 {
// TTL seems to be expired, retry to get lock or become
// passive in next iteration
logger.Debug().Msg("ttl expired")

if ttl == 0 {
// Since the lock is very fresh with a TTL well > 0 this case is just a safeguard against rare occasions.
logger.Info().Msg("lock TTL is expired although the lock has been just acquired; becoming undefined...")
a.becomeUndefined(ctx)

continue
}
refreshTime := d / 2

logger.Debug().Msgf("set refresh to %v", refreshTime)
refreshTime := ttl / 2

// set to trigger refresh after TTL / 2
t.Reset(refreshTime)
logger.Debug().Msgf("set refresh ticker to %v ms", refreshTime)

// Reset the refresh ticker to TTL / 2
tryRefreshLock.Reset(refreshTime)
}
}
}
Expand All @@ -186,7 +216,11 @@
func (a *ActivePassive) Handler(w http.ResponseWriter, r *http.Request) {
label := a.label(a.getState())
w.WriteHeader(http.StatusOK)
fmt.Fprintln(w, strings.ToUpper(label))

_, err := fmt.Fprintln(w, strings.ToUpper(label))
if err != nil {
log.Printf("failed to write label '%s' to response: %v", label, err)
}
}

func (a *ActivePassive) label(s status) string {
Expand Down Expand Up @@ -222,7 +256,7 @@

// setState returns true if the state was set successfully
func (a *ActivePassive) setState(ctx context.Context, state status) bool {
err := a.client.SetCurrentPodLabel(ctx, Label, a.label(state))
err := a.k8sClient.SetCurrentPodLabel(ctx, Label, a.label(state))
if err != nil {
log.Ctx(ctx).Error().Err(err).Msg("failed to mark pod as undefined")
a.stateMu.Lock()
Expand All @@ -242,3 +276,21 @@
a.stateMu.RUnlock()
return state
}

// safeGetTTL tries to get the TTL from the provided redis lock and recovers from a panic inside TTL().
func safeGetTTL(ctx context.Context, lock *redislock.Lock, logger zerolog.Logger) (time.Duration, error) {
var (
err error
ttl time.Duration
)

defer func() {
if r := recover(); r != nil {
logger.Error().Msgf("Recovered from panic in lock.TTL(): %v", r)
err = fmt.Errorf("panic during lock.TTL(): %v", r)
}
}()

ttl, err = lock.TTL(ctx)
return ttl, err
}
2 changes: 1 addition & 1 deletion pkg/lock/redis/lock.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,13 @@ import (
"context"
"errors"
"fmt"
"github.com/pace/bricks/pkg/lock/redislock"
"sync"
"time"

redisbackend "github.com/pace/bricks/backend/redis"
pberrors "github.com/pace/bricks/maintenance/errors"

"github.com/bsm/redislock"
"github.com/redis/go-redis/v9"
"github.com/rs/zerolog/log"
)
Expand Down
Loading
Loading