Merge pull request #396 from pace/fix-failover
Fix failover
monstermunchkin authored Jan 17, 2025
2 parents c943684 + 04db2a5 commit 79b8e04
Showing 2 changed files with 32 additions and 57 deletions.
3 changes: 1 addition & 2 deletions backend/redis/redis.go
@@ -193,15 +193,14 @@ func (lt *logtracer) ProcessHook(next redis.ProcessHook) redis.ProcessHook {
cmdErr := cmd.Err()
if cmdErr != nil {
vals.span.SetData("error", cmdErr)
le = le.Err(cmdErr)
le.Err(cmdErr).Msg("failed to execute Redis command")
paceRedisCmdFailed.With(prometheus.Labels{
"method": cmd.Name(),
}).Inc()
}

// do log statement
dur := float64(time.Since(vals.startedAt)) / float64(time.Millisecond)
le.Float64("duration", dur).Msg("Redis query")

paceRedisCmdDurationSeconds.With(prometheus.Labels{
"method": cmd.Name(),
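The hunk above promotes the command error from a field attached to a later log statement to its own, immediate log line. As a rough sketch of the same hook shape outside of bricks — a go-redis v9 hook that reports failed commands and their duration, using the standard library logger instead of the project's structured logging and Prometheus plumbing (all names here are illustrative):

```go
package redisobserve

import (
	"context"
	"log"
	"time"

	"github.com/redis/go-redis/v9"
)

// loggingHook is an illustrative stand-in for the logtracer hook in this diff.
type loggingHook struct{}

func (loggingHook) DialHook(next redis.DialHook) redis.DialHook { return next }

func (loggingHook) ProcessPipelineHook(next redis.ProcessPipelineHook) redis.ProcessPipelineHook {
	return next
}

func (loggingHook) ProcessHook(next redis.ProcessHook) redis.ProcessHook {
	return func(ctx context.Context, cmd redis.Cmder) error {
		start := time.Now()
		err := next(ctx, cmd)
		if cmdErr := cmd.Err(); cmdErr != nil {
			// Emit a dedicated log line for the failure instead of only
			// attaching the error to a later statement.
			log.Printf("failed to execute Redis command %q: %v", cmd.Name(), cmdErr)
		}
		log.Printf("Redis query %q took %v", cmd.Name(), time.Since(start))
		return err
	}
}
```

Registering it on any *redis.Client is a one-liner: `client.AddHook(loggingHook{})`.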
86 changes: 31 additions & 55 deletions maintenance/failover/failover.go
@@ -18,8 +18,6 @@ import (
"github.com/redis/go-redis/v9"
)

const waitRetry = time.Millisecond * 500

type status int

const (
@@ -34,17 +32,15 @@ const Label = "github.com.pace.bricks.activepassive"
to deploy a service multiple times but only one will accept
// traffic by using the label selector of kubernetes.
In order to determine the active, a lock needs to be held
// in redis. Hocks can be passed to handle the case of becoming
// in redis. Hooks can be passed to handle the case of becoming
// the active or passive.
// The readiness probe will report the state (ACTIVE/PASSIVE)
// of each of the members in the cluster.
type ActivePassive struct {
// OnActive will be called in case the current processes
// is elected to be the active one
// OnActive will be called in case the current processes is elected to be the active one
OnActive func(ctx context.Context)

// OnPassive will be called in case the current process is
// the passive one
// OnPassive will be called in case the current process is the passive one
OnPassive func(ctx context.Context)

// OnStop is called after the ActivePassive process stops
@@ -56,41 +52,41 @@ type ActivePassive struct {
locker *redislock.Client

// access to the kubernetes api
client *k8sapi.Client
k8sClient *k8sapi.Client

// current status of the failover (to show it in the readiness status)
state status
stateMu sync.RWMutex
}

// NewActivePassive creates a new active passive cluster
// identified by the name, the time to failover determines
// the frequency of checks performed against the redis to
// identified by the name. The time to fail over determines
// the frequency of checks performed against redis to
// keep the active state.
// NOTE: creating multiple ActivePassive in one processes
// is not working correctly as there is only one readiness
// probe.
// NOTE: creating multiple ActivePassive in one process
// is not working correctly as there is only one readiness probe.
func NewActivePassive(clusterName string, timeToFailover time.Duration, client *redis.Client) (*ActivePassive, error) {
cl, err := k8sapi.NewClient()
k8sClient, err := k8sapi.NewClient()
if err != nil {
return nil, err
}

ap := &ActivePassive{
activePassive := &ActivePassive{
clusterName: clusterName,
timeToFailover: timeToFailover,
locker: redislock.New(client),
client: cl,
k8sClient: k8sClient,
}
health.SetCustomReadinessCheck(ap.Handler)

return ap, nil
health.SetCustomReadinessCheck(activePassive.Handler)

return activePassive, nil
}

// Run registers the readiness probe and calls the OnActive
// and OnPassive callbacks in case the election toke place.
// Will handle panic safely and therefore can be directly called
// with go.
// Run manages distributed lock-based leadership.
// This method is designed to continually monitor and maintain the leadership status of the calling pod,
// ensuring only one active instance holds the lock at a time, while transitioning other instances to passive
// mode. The handler will try to renew its active status by refreshing the lock periodically.
func (a *ActivePassive) Run(ctx context.Context) error {
defer errors.HandleWithCtx(ctx, "activepassive failover handler")

@@ -101,7 +97,6 @@ func (a *ActivePassive) Run(ctx context.Context) error {
a.close = make(chan struct{})
defer close(a.close)

// trigger stop handler
defer func() {
if a.OnStop != nil {
a.OnStop()
@@ -110,68 +105,49 @@

var lock *redislock.Lock

// t is a ticker that reminds to call refresh if
// the token was acquired after half of the remaining ttl time
t := time.NewTicker(a.timeToFailover)
// Ticker to try to refresh the lock's TTL before it expires
tryRefreshLock := time.NewTicker(a.timeToFailover)

// retry time triggers to check if the look needs to be acquired
retry := time.NewTicker(waitRetry)
// Ticker to try to acquire the lock if in passive or undefined state
tryAcquireLock := time.NewTicker(500 * time.Millisecond)

for {
// allow close or cancel
select {
case <-ctx.Done():
return ctx.Err()
case <-a.close:
return nil
case <-t.C:
case <-tryRefreshLock.C:
if a.getState() == ACTIVE {
err := lock.Refresh(ctx, a.timeToFailover, &redislock.Options{
RetryStrategy: redislock.LimitRetry(redislock.LinearBackoff(a.timeToFailover/3), 3),
})
if err != nil {
logger.Debug().Err(err).Msg("failed to refresh")
logger.Info().Err(err).Msg("failed to refresh the lock; becoming undefined...")
a.becomeUndefined(ctx)
}
}
case <-retry.C:
// try to acquire the lock, as we are not the active
case <-tryAcquireLock.C:
if a.getState() != ACTIVE {
var err error

lock, err = a.locker.Obtain(ctx, lockName, a.timeToFailover, &redislock.Options{
RetryStrategy: redislock.LimitRetry(redislock.LinearBackoff(a.timeToFailover/3), 3),
})
if err != nil {
// we became passive, trigger callback
if a.getState() != PASSIVE {
logger.Debug().Err(err).Msg("becoming passive")
logger.Info().Err(err).Msg("failed to obtain the lock; becoming passive...")
a.becomePassive(ctx)
}

continue
}

// lock acquired
logger.Debug().Msg("becoming active")
logger.Debug().Msg("lock acquired; becoming active...")
a.becomeActive(ctx)

// we are active, renew if required
d, err := lock.TTL(ctx)
if err != nil {
logger.Debug().Err(err).Msg("failed to get TTL")
}
if d == 0 {
// TTL seems to be expired, retry to get lock or become
// passive in next iteration
logger.Debug().Msg("ttl expired")
a.becomeUndefined(ctx)
}
refreshTime := d / 2

logger.Debug().Msgf("set refresh to %v", refreshTime)

// set to trigger refresh after TTL / 2
t.Reset(refreshTime)
// Reset the refresh ticker to half of the time to failover
tryRefreshLock.Reset(a.timeToFailover / 2)
}
}
}
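The rewritten loop above follows the usual obtain/refresh cycle of the redislock client (presumably github.com/bsm/redislock, whose Obtain, Refresh, and TTL calls match the ones in this diff): obtain the lock with a bounded retry strategy, keep refreshing it before the TTL runs out, and fall back to passive when either step fails. A stripped-down sketch of that cycle on its own, with an illustrative lock key and TTL:

```go
package main

import (
	"context"
	"errors"
	"log"
	"time"

	"github.com/bsm/redislock"
	"github.com/redis/go-redis/v9"
)

func main() {
	ctx := context.Background()
	locker := redislock.New(redis.NewClient(&redis.Options{Addr: "localhost:6379"}))

	ttl := 5 * time.Second // illustrative timeToFailover
	lock, err := locker.Obtain(ctx, "example-leader-lock", ttl, &redislock.Options{
		RetryStrategy: redislock.LimitRetry(redislock.LinearBackoff(ttl/3), 3),
	})
	if errors.Is(err, redislock.ErrNotObtained) {
		log.Println("another instance holds the lock; staying passive")
		return
	} else if err != nil {
		log.Fatal(err)
	}
	defer lock.Release(ctx)

	// While active, refresh well before the TTL expires; a failed refresh means
	// the lock may be gone and another instance can take over.
	ticker := time.NewTicker(ttl / 2)
	defer ticker.Stop()
	for range ticker.C {
		if err := lock.Refresh(ctx, ttl, nil); err != nil {
			log.Println("lost the lock; this instance should stop acting as active")
			return
		}
	}
}
```

The commit's version layers the ACTIVE/PASSIVE/UNDEFINED states and the Kubernetes label update on top of this cycle, and resets the refresh ticker to half of timeToFailover instead of deriving it from the lock's remaining TTL.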
@@ -222,7 +198,7 @@ func (a *ActivePassive) becomeUndefined(ctx context.Context) {

// setState returns true if the state was set successfully
func (a *ActivePassive) setState(ctx context.Context, state status) bool {
err := a.client.SetCurrentPodLabel(ctx, Label, a.label(state))
err := a.k8sClient.SetCurrentPodLabel(ctx, Label, a.label(state))
if err != nil {
log.Ctx(ctx).Error().Err(err).Msg("failed to mark pod as undefined")
a.stateMu.Lock()
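For orientation, a sketch of how a caller would wire up the refactored type, based only on the signatures visible in this diff; the import path, Redis address, cluster name, and failover interval are assumptions:

```go
package main

import (
	"context"
	"log"
	"time"

	"github.com/pace/bricks/maintenance/failover"
	"github.com/redis/go-redis/v9"
)

func main() {
	client := redis.NewClient(&redis.Options{Addr: "localhost:6379"})

	// Cluster name and failover time are illustrative values.
	ap, err := failover.NewActivePassive("payment-processor", 5*time.Second, client)
	if err != nil {
		log.Fatal(err)
	}

	ap.OnActive = func(ctx context.Context) {
		log.Println("became ACTIVE: start accepting work")
	}
	ap.OnPassive = func(ctx context.Context) {
		log.Println("became PASSIVE: stop accepting work")
	}
	ap.OnStop = func() {
		log.Println("failover handler stopped")
	}

	// Run blocks until the context is canceled or the handler is closed; while
	// active it keeps refreshing the lock, while passive it keeps retrying to
	// obtain it, and the readiness probe reports the resulting state.
	if err := ap.Run(context.Background()); err != nil {
		log.Fatal(err)
	}
}
```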
