diff --git a/backend/redis/redis.go b/backend/redis/redis.go
index 25e6d4684..c6ac49430 100755
--- a/backend/redis/redis.go
+++ b/backend/redis/redis.go
@@ -193,7 +193,7 @@ func (lt *logtracer) ProcessHook(next redis.ProcessHook) redis.ProcessHook {
 		cmdErr := cmd.Err()
 		if cmdErr != nil {
 			vals.span.SetData("error", cmdErr)
-			le = le.Err(cmdErr)
+			le.Err(cmdErr).Msg("failed to execute Redis command")
 			paceRedisCmdFailed.With(prometheus.Labels{
 				"method": cmd.Name(),
 			}).Inc()
@@ -201,7 +201,6 @@ func (lt *logtracer) ProcessHook(next redis.ProcessHook) redis.ProcessHook {
 
 		// do log statement
 		dur := float64(time.Since(vals.startedAt)) / float64(time.Millisecond)
-		le.Float64("duration", dur).Msg("Redis query")
 
 		paceRedisCmdDurationSeconds.With(prometheus.Labels{
 			"method": cmd.Name(),
diff --git a/maintenance/failover/failover.go b/maintenance/failover/failover.go
index 5ce21712c..82df646cb 100644
--- a/maintenance/failover/failover.go
+++ b/maintenance/failover/failover.go
@@ -18,8 +18,6 @@ import (
 	"github.com/redis/go-redis/v9"
 )
 
-const waitRetry = time.Millisecond * 500
-
 type status int
 
 const (
@@ -34,17 +32,15 @@ const Label = "github.com.pace.bricks.activepassive"
 // to deploy a service multiple times but only one will accept
 // traffic by using the label selector of kubernetes.
 // In order to determine the active, a lock needs to be held
-// in redis. Hocks can be passed to handle the case of becoming
+// in redis. Hooks can be passed to handle the case of becoming
 // the active or passive.
 // The readiness probe will report the state (ACTIVE/PASSIVE)
 // of each of the members in the cluster.
 type ActivePassive struct {
-	// OnActive will be called in case the current processes
-	// is elected to be the active one
+	// OnActive will be called in case the current process is elected to be the active one
	OnActive func(ctx context.Context)
 
-	// OnPassive will be called in case the current process is
-	// the passive one
+	// OnPassive will be called in case the current process is the passive one
 	OnPassive func(ctx context.Context)
 
 	// OnStop is called after the ActivePassive process stops
@@ -56,7 +52,7 @@ type ActivePassive struct {
 	locker *redislock.Client
 
 	// access to the kubernetes api
-	client *k8sapi.Client
+	k8sClient *k8sapi.Client
 
 	// current status of the failover (to show it in the readiness status)
 	state status
@@ -64,33 +60,33 @@ type ActivePassive struct {
 
 // NewActivePassive creates a new active passive cluster
-// identified by the name, the time to failover determines
-// the frequency of checks performed against the redis to
+// identified by the name. The time to fail over determines
+// the frequency of checks performed against redis to
 // keep the active state.
-// NOTE: creating multiple ActivePassive in one processes
-// is not working correctly as there is only one readiness
-// probe.
+// NOTE: creating multiple ActivePassive in one process
+// does not work correctly as there is only one readiness probe.
 func NewActivePassive(clusterName string, timeToFailover time.Duration, client *redis.Client) (*ActivePassive, error) {
-	cl, err := k8sapi.NewClient()
+	k8sClient, err := k8sapi.NewClient()
 	if err != nil {
 		return nil, err
 	}
 
-	ap := &ActivePassive{
+	activePassive := &ActivePassive{
 		clusterName:    clusterName,
 		timeToFailover: timeToFailover,
 		locker:         redislock.New(client),
-		client:         cl,
+		k8sClient:      k8sClient,
 	}
-	health.SetCustomReadinessCheck(ap.Handler)
-	return ap, nil
+	health.SetCustomReadinessCheck(activePassive.Handler)
+
+	return activePassive, nil
 }
 
-// Run registers the readiness probe and calls the OnActive
-// and OnPassive callbacks in case the election toke place.
-// Will handle panic safely and therefore can be directly called
-// with go.
+// Run manages distributed lock-based leadership.
+// It continually monitors and maintains the leadership status of the calling pod, ensuring that only
+// one active instance holds the lock at a time while the other instances transition to passive mode.
+// The active instance renews its leadership by periodically refreshing the lock.
 func (a *ActivePassive) Run(ctx context.Context) error {
 	defer errors.HandleWithCtx(ctx, "activepassive failover handler")
 
@@ -101,7 +97,6 @@ func (a *ActivePassive) Run(ctx context.Context) error {
 	a.close = make(chan struct{})
 	defer close(a.close)
 
-	// trigger stop handler
 	defer func() {
 		if a.OnStop != nil {
 			a.OnStop()
@@ -110,68 +105,49 @@ func (a *ActivePassive) Run(ctx context.Context) error {
 
 	var lock *redislock.Lock
 
-	// t is a ticker that reminds to call refresh if
-	// the token was acquired after half of the remaining ttl time
-	t := time.NewTicker(a.timeToFailover)
+	// Ticker to try to refresh the lock's TTL before it expires
+	tryRefreshLock := time.NewTicker(a.timeToFailover)
 
-	// retry time triggers to check if the look needs to be acquired
-	retry := time.NewTicker(waitRetry)
+	// Ticker to try to acquire the lock if in passive or undefined state
+	tryAcquireLock := time.NewTicker(500 * time.Millisecond)
 
 	for {
-		// allow close or cancel
 		select {
 		case <-ctx.Done():
 			return ctx.Err()
 		case <-a.close:
 			return nil
-		case <-t.C:
+		case <-tryRefreshLock.C:
 			if a.getState() == ACTIVE {
 				err := lock.Refresh(ctx, a.timeToFailover, &redislock.Options{
 					RetryStrategy: redislock.LimitRetry(redislock.LinearBackoff(a.timeToFailover/3), 3),
 				})
 				if err != nil {
-					logger.Debug().Err(err).Msg("failed to refresh")
+					logger.Info().Err(err).Msg("failed to refresh the lock; becoming undefined...")
 					a.becomeUndefined(ctx)
 				}
 			}
-		case <-retry.C:
-			// try to acquire the lock, as we are not the active
+		case <-tryAcquireLock.C:
 			if a.getState() != ACTIVE {
 				var err error
+
 				lock, err = a.locker.Obtain(ctx, lockName, a.timeToFailover, &redislock.Options{
 					RetryStrategy: redislock.LimitRetry(redislock.LinearBackoff(a.timeToFailover/3), 3),
 				})
 				if err != nil {
-					// we became passive, trigger callback
 					if a.getState() != PASSIVE {
-						logger.Debug().Err(err).Msg("becoming passive")
+						logger.Info().Err(err).Msg("failed to obtain the lock; becoming passive...")
 						a.becomePassive(ctx)
 					}
 
 					continue
 				}
 
-				// lock acquired
-				logger.Debug().Msg("becoming active")
+				logger.Debug().Msg("lock acquired; becoming active...")
 				a.becomeActive(ctx)
 
-				// we are active, renew if required
-				d, err := lock.TTL(ctx)
-				if err != nil {
-					logger.Debug().Err(err).Msg("failed to get TTL")
-				}
-				if d == 0 {
-					// TTL seems to be expired, retry to get lock or become
-					// passive in next iteration
-					logger.Debug().Msg("ttl expired")
-					a.becomeUndefined(ctx)
-				}
-				refreshTime := d / 2
-
-				logger.Debug().Msgf("set refresh to %v", refreshTime)
-
-				// set to trigger refresh after TTL / 2
-				t.Reset(refreshTime)
+				// Reset the refresh ticker to half of the time to failover
+				tryRefreshLock.Reset(a.timeToFailover / 2)
 			}
 		}
 	}
@@ -222,7 +198,7 @@ func (a *ActivePassive) becomeUndefined(ctx context.Context) {
 
 // setState returns true if the state was set successfully
 func (a *ActivePassive) setState(ctx context.Context, state status) bool {
-	err := a.client.SetCurrentPodLabel(ctx, Label, a.label(state))
+	err := a.k8sClient.SetCurrentPodLabel(ctx, Label, a.label(state))
 	if err != nil {
 		log.Ctx(ctx).Error().Err(err).Msg("failed to set pod label")
 		a.stateMu.Lock()
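
Reviewer notes, with two small sketches that are not part of the diff itself.

First, the redis.go hunks: after this change the per-command duration is still recorded as a Prometheus metric, but a log line is emitted only when a command fails; the unconditional "Redis query" log is gone. A minimal standalone sketch of the same pattern as a go-redis v9 hook, where the `cmdDuration` histogram and the stdlib logger are stand-ins for the bricks internals, not the actual implementation:

```go
package main

import (
	"context"
	"log"
	"time"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/redis/go-redis/v9"
)

// cmdDuration stands in for the bricks paceRedisCmdDurationSeconds histogram.
var cmdDuration = prometheus.NewHistogramVec(
	prometheus.HistogramOpts{Name: "redis_cmd_duration_seconds"},
	[]string{"method"},
)

type logHook struct{}

// DialHook and ProcessPipelineHook pass through untouched.
func (logHook) DialHook(next redis.DialHook) redis.DialHook { return next }

func (logHook) ProcessPipelineHook(next redis.ProcessPipelineHook) redis.ProcessPipelineHook {
	return next
}

// ProcessHook times every command, always observes the duration metric,
// and logs only failed commands, mirroring the behavior after this diff.
func (logHook) ProcessHook(next redis.ProcessHook) redis.ProcessHook {
	return func(ctx context.Context, cmd redis.Cmder) error {
		start := time.Now()
		err := next(ctx, cmd)
		cmdDuration.With(prometheus.Labels{"method": cmd.Name()}).Observe(time.Since(start).Seconds())
		if err != nil {
			log.Printf("failed to execute Redis command %q: %v", cmd.Name(), err)
		}
		return err
	}
}

func main() {
	prometheus.MustRegister(cmdDuration)

	client := redis.NewClient(&redis.Options{Addr: "localhost:6379"})
	client.AddHook(logHook{})

	// A failing command would now produce a log line; successful commands
	// only update the histogram.
	_ = client.Ping(context.Background()).Err()
}
```

Second, a usage sketch for the revised failover API, showing how the callbacks and the renamed constructor wiring fit together. The failover import path is assumed from the repository layout, and a plain go-redis client stands in for the bricks backend/redis client:

```go
package main

import (
	"context"
	"log"
	"time"

	"github.com/pace/bricks/maintenance/failover" // assumed import path for the package in this diff
	"github.com/redis/go-redis/v9"
)

func main() {
	// Plain go-redis client as a stand-in; a bricks service would normally
	// obtain its client from the backend/redis package.
	rc := redis.NewClient(&redis.Options{Addr: "localhost:6379"})

	// timeToFailover is both the lock TTL and, per this diff, the base for
	// the refresh interval (TTL/2) once this instance becomes active.
	ap, err := failover.NewActivePassive("example-cluster", 5*time.Second, rc)
	if err != nil {
		log.Fatalf("creating active/passive cluster: %v", err)
	}

	ap.OnActive = func(ctx context.Context) {
		log.Println("elected active: start accepting traffic")
	}
	ap.OnPassive = func(ctx context.Context) {
		log.Println("now passive: stop accepting traffic")
	}
	ap.OnStop = func() {
		log.Println("failover handler stopped")
	}

	// Run blocks while driving the acquire/refresh tickers shown above;
	// cancelling the context ends the loop.
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	if err := ap.Run(ctx); err != nil {
		log.Fatalf("failover handler: %v", err)
	}
}
```

On the design choice: resetting `tryRefreshLock` to `timeToFailover / 2` after acquiring the lock gives the active instance roughly two refresh attempts (each with its own limited retry strategy) per TTL window, without the extra `lock.TTL` round trip the removed code needed to derive the interval.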