Use prefix in all vtorc check and recover logs
This makes recovery actions easier to identify in the logs.
See #17465
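
For example (hypothetical analysis type, keyspace/shard, and tablet alias), a
recovery log line emitted through the new prefixed logger looks roughly like
this, with glog's usual timestamp/severity header omitted:

    Recovery for DeadPrimary on commerce/0: executeCheckAndRecoverFunction: proceeding with DeadPrimary detection on zone1-0000000100; isActionable?: true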

Signed-off-by: Eduardo J. Ortega U. <[email protected]>
ejortegau committed Jan 15, 2025
1 parent fd0ffeb commit 0d3685a
Showing 2 changed files with 124 additions and 16 deletions.
91 changes: 91 additions & 0 deletions go/vt/log/log.go
@@ -111,3 +111,94 @@ func (lrms *logRotateMaxSize) String() string {
func (lrms *logRotateMaxSize) Type() string {
    return "uint64"
}

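// PrefixedLogger wraps the package-level logging functions and prepends a
// fixed prefix to every message, so that all log lines belonging to one
// logical operation (for example, a single VTOrc recovery) can be found
// together in the logs.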
type PrefixedLogger struct {
    prefix string
}

func NewPrefixedLogger(prefix string) *PrefixedLogger {
    return &PrefixedLogger{prefix: prefix}
}

func (pl *PrefixedLogger) V(level glog.Level) glog.Verbose {
    return V(level)
}

func (pl *PrefixedLogger) Flush() {
    Flush()
}

func (pl *PrefixedLogger) Info(args ...any) {
    args = append([]interface{}{pl.prefix}, args...)
    Info(args...)
}

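// The formatted variants below inject the prefix by prepending it to the
// argument list and adding a matching "%s: " verb to the format string, so
// every line comes out as "<prefix>: <formatted message>".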
func (pl *PrefixedLogger) Infof(format string, args ...any) {
    args = append([]interface{}{pl.prefix}, args...)
    Infof("%s: "+format, args...)
}

func (pl *PrefixedLogger) InfoDepth(depth int, args ...any) {
    args = append([]interface{}{pl.prefix}, args...)
    InfoDepth(depth, args...)
}

func (pl *PrefixedLogger) Warning(args ...any) {
    args = append([]interface{}{pl.prefix}, args...)
    Warning(args...)
}

func (pl *PrefixedLogger) Warningf(format string, args ...any) {
    args = append([]interface{}{pl.prefix}, args...)
    Warningf("%s: "+format, args...)
}

func (pl *PrefixedLogger) WarningDepth(depth int, args ...any) {
    args = append([]interface{}{pl.prefix}, args...)
    WarningDepth(depth, args...)
}

func (pl *PrefixedLogger) Error(args ...any) {
    args = append([]interface{}{pl.prefix}, args...)
    Error(args...)
}

func (pl *PrefixedLogger) Errorf(format string, args ...any) {
    args = append([]interface{}{pl.prefix}, args...)
    Errorf("%s: "+format, args...)
}

func (pl *PrefixedLogger) ErrorDepth(depth int, args ...any) {
    args = append([]interface{}{pl.prefix}, args...)
    ErrorDepth(depth, args...)
}

func (pl *PrefixedLogger) Exit(args ...any) {
    args = append([]interface{}{pl.prefix}, args...)
    Exit(args...)
}

func (pl *PrefixedLogger) Exitf(format string, args ...any) {
    args = append([]interface{}{pl.prefix}, args...)
    Exitf("%s: "+format, args...)
}

func (pl *PrefixedLogger) ExitDepth(depth int, args ...any) {
    args = append([]interface{}{pl.prefix}, args...)
    ExitDepth(depth, args...)
}

func (pl *PrefixedLogger) Fatal(args ...any) {
    args = append([]interface{}{pl.prefix}, args...)
    Fatal(args...)
}

func (pl *PrefixedLogger) Fatalf(format string, args ...any) {
    args = append([]interface{}{pl.prefix}, args...)
    Fatalf("%s: "+format, args...)
}

func (pl *PrefixedLogger) FatalDepth(depth int, args ...any) {
    args = append([]interface{}{pl.prefix}, args...)
    FatalDepth(depth, args...)
}
}
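
For illustration only (the prefix string and message below are hypothetical),
the new logger is constructed once per logical operation and then used like the
plain package-level functions:

    logger := log.NewPrefixedLogger("Recovery for DeadPrimary on commerce/0")
    logger.Infof("Starting checkAndRecover")
    // Logged as: "Recovery for DeadPrimary on commerce/0: Starting checkAndRecover"
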
49 changes: 33 additions & 16 deletions go/vt/vtorc/logic/topology_recovery.go
@@ -447,48 +447,53 @@ func executeCheckAndRecoverFunction(analysisEntry *inst.ReplicationAnalysis) (er
countPendingRecoveries.Add(1)
defer countPendingRecoveries.Add(-1)

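// All check and recover log lines for this analysis carry a common prefix
// naming the analysis code and the keyspace/shard being acted on, so a single
// recovery can be traced end to end in the logs.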
logger := log.NewPrefixedLogger(fmt.Sprintf("Recovery for %s on %s/%s", analysisEntry.Analysis, analysisEntry.AnalyzedKeyspace, analysisEntry.AnalyzedShard))
logger.Info("Starting checkAndRecover")

checkAndRecoverFunctionCode := getCheckAndRecoverFunctionCode(analysisEntry.Analysis, analysisEntry.AnalyzedInstanceAlias)
isActionableRecovery := hasActionableRecovery(checkAndRecoverFunctionCode)
analysisEntry.IsActionableRecovery = isActionableRecovery

if checkAndRecoverFunctionCode == noRecoveryFunc {
logger.Warning("No recovery strategies for problem, aborting recovery")
// Unhandled problem type
if analysisEntry.Analysis != inst.NoProblem {
if util.ClearToLog("executeCheckAndRecoverFunction", analysisEntry.AnalyzedInstanceAlias) {
log.Warningf("executeCheckAndRecoverFunction: ignoring analysisEntry that has no action plan: %+v; tablet: %+v",
analysisEntry.Analysis, analysisEntry.AnalyzedInstanceAlias)
logger.Warningf("executeCheckAndRecoverFunction: ignoring analysisEntry that has no action plan: tablet: %+v",
analysisEntry.AnalyzedInstanceAlias)
}
}

return nil
}
// we have a recovery function; its execution still depends on filters if not disabled.
if isActionableRecovery || util.ClearToLog("executeCheckAndRecoverFunction: detection", analysisEntry.AnalyzedInstanceAlias) {
log.Infof("executeCheckAndRecoverFunction: proceeding with %+v detection on %+v; isActionable?: %+v", analysisEntry.Analysis, analysisEntry.AnalyzedInstanceAlias, isActionableRecovery)
logger.Infof("executeCheckAndRecoverFunction: proceeding with %+v detection on %+v; isActionable?: %+v", analysisEntry.Analysis, analysisEntry.AnalyzedInstanceAlias, isActionableRecovery)
}

// At this point we have validated there's a failure scenario for which we have a recovery path.
// Record the failure detected in the logs.
err = InsertRecoveryDetection(analysisEntry)
if err != nil {
log.Errorf("executeCheckAndRecoverFunction: error on inserting recovery detection record: %+v", err)
logger.Errorf("executeCheckAndRecoverFunction: error inserting recovery detection record, aborting recovery: %+v", err)
return err
}

// Check for recovery being disabled globally
if recoveryDisabledGlobally, err := IsRecoveryDisabled(); err != nil {
// Unexpected. Shouldn't get this
log.Errorf("Unable to determine if recovery is disabled globally: %v", err)
logger.Errorf("Unable to determine if recovery is disabled globally, still attempting to recover: %v", err)
} else if recoveryDisabledGlobally {
log.Infof("CheckAndRecover: Analysis: %+v, Tablet: %+v: NOT Recovering host (disabled globally)",
analysisEntry.Analysis, analysisEntry.AnalyzedInstanceAlias)
logger.Infof("CheckAndRecover: Tablet: %+v: NOT Recovering host (disabled globally)",
analysisEntry.AnalyzedInstanceAlias)

return err
}

// We lock the shard here and then refresh the tablets information
ctx, unlock, err := LockShard(context.Background(), analysisEntry.AnalyzedInstanceAlias, getLockAction(analysisEntry.AnalyzedInstanceAlias, analysisEntry.Analysis))
if err != nil {
logger.Errorf("Failed to lock shard, aborting recovery: %v", err)
return err
}
defer unlock(&err)
@@ -498,14 +503,15 @@ func executeCheckAndRecoverFunction(analysisEntry *inst.ReplicationAnalysis) (er
// changes, we should be checking that this failure is indeed needed to be fixed. We do this after locking the shard to be sure
// that the data that we use now is up-to-date.
if isActionableRecovery {
log.Errorf("executeCheckAndRecoverFunction: Proceeding with %v recovery on %v validation after acquiring shard lock.", analysisEntry.Analysis, analysisEntry.AnalyzedInstanceAlias)
logger.Infof("executeCheckAndRecoverFunction: Proceeding with recovery on %v validation after acquiring shard lock.", analysisEntry.AnalyzedInstanceAlias)
// The first step we have to do is refresh the keyspace and shard information
// This is required to know if the durability policies have changed or not
// If they have, then recoveries like ReplicaSemiSyncMustNotBeSet, etc won't be valid anymore.
// Similarly, a new primary could have been elected in the mean-time that can cause
// a change in the recovery we run.
err = RefreshKeyspaceAndShard(analysisEntry.AnalyzedKeyspace, analysisEntry.AnalyzedShard)
if err != nil {
logger.Errorf("Failed to refresh keyspace and shard, aborting recovery: %v", err)
return err
}
// If we are about to run a cluster-wide recovery, it is imperative to first refresh all the tablets
@@ -518,6 +524,7 @@ func executeCheckAndRecoverFunction(analysisEntry *inst.ReplicationAnalysis) (er
}
// We ignore the dead primary tablet because it is going to be unreachable. If all the other tablets aren't able to reach this tablet either,
// we can proceed with the dead primary recovery. We don't need to refresh the information for this dead tablet.
logger.Info("Force refreshing all shard tablets")
forceRefreshAllTabletsInShard(ctx, analysisEntry.AnalyzedKeyspace, analysisEntry.AnalyzedShard, tabletsToIgnore)
} else {
// If we are not running a cluster-wide recovery, then it is only concerned with the specific tablet
@@ -526,66 +533,76 @@ func executeCheckAndRecoverFunction(analysisEntry *inst.ReplicationAnalysis) (er
// and the host-port set on the tablet in question.
// So, we only need to refresh the tablet info records (to know if the primary tablet has changed),
// and the replication data of the new primary and this tablet.
logger.Info("Refreshing shard tablet info")
refreshTabletInfoOfShard(ctx, analysisEntry.AnalyzedKeyspace, analysisEntry.AnalyzedShard)
logger.Info("Discovering analysis instance")
DiscoverInstance(analysisEntry.AnalyzedInstanceAlias, true)
logger.Info("Getting shard primary")
primaryTablet, err := shardPrimary(analysisEntry.AnalyzedKeyspace, analysisEntry.AnalyzedShard)
if err != nil {
log.Errorf("executeCheckAndRecoverFunction: Analysis: %+v, Tablet: %+v: error while finding the shard primary: %v",
analysisEntry.Analysis, analysisEntry.AnalyzedInstanceAlias, err)
logger.Errorf("executeCheckAndRecoverFunction: Tablet: %+v: error while finding the shard primary: %v",
analysisEntry.AnalyzedInstanceAlias, err)
return err
}
primaryTabletAlias := topoproto.TabletAliasString(primaryTablet.Alias)
// We can skip the refresh if we know the tablet we are looking at is the primary tablet.
// This would be the case for PrimaryHasPrimary recovery. We don't need to refresh the same tablet twice.
if analysisEntry.AnalyzedInstanceAlias != primaryTabletAlias {
logger.Info("Discovering primary instance")
DiscoverInstance(primaryTabletAlias, true)
}
}
alreadyFixed, err := checkIfAlreadyFixed(analysisEntry)
if err != nil {
log.Errorf("executeCheckAndRecoverFunction: Analysis: %+v, Tablet: %+v: error while trying to find if the problem is already fixed: %v",
analysisEntry.Analysis, analysisEntry.AnalyzedInstanceAlias, err)
logger.Errorf("executeCheckAndRecoverFunction: Tablet: %+v: error while trying to find if the problem is already fixed: %v",
analysisEntry.AnalyzedInstanceAlias, err)
return err
}
if alreadyFixed {
log.Infof("Analysis: %v on tablet %v - No longer valid, some other agent must have fixed the problem.", analysisEntry.Analysis, analysisEntry.AnalyzedInstanceAlias)
logger.Infof("Analysis: %v on tablet %v - No longer valid, some other agent must have fixed the problem.", analysisEntry.Analysis, analysisEntry.AnalyzedInstanceAlias)
return nil
}
}

// Actually attempt recovery:
if isActionableRecovery || util.ClearToLog("executeCheckAndRecoverFunction: recovery", analysisEntry.AnalyzedInstanceAlias) {
log.Infof("executeCheckAndRecoverFunction: proceeding with %+v recovery on %+v; isRecoverable?: %+v", analysisEntry.Analysis, analysisEntry.AnalyzedInstanceAlias, isActionableRecovery)
logger.Infof("executeCheckAndRecoverFunction: proceeding with recovery on %+v; isRecoverable?: %+v", analysisEntry.AnalyzedInstanceAlias, isActionableRecovery)
}
recoveryAttempted, topologyRecovery, err := getCheckAndRecoverFunction(checkAndRecoverFunctionCode)(ctx, analysisEntry)
if !recoveryAttempted {
logger.Errorf("Recovery not attempted: %+v", err)
return err
}
recoveryName := getRecoverFunctionName(checkAndRecoverFunctionCode)
recoveriesCounter.Add(recoveryName, 1)
if err != nil {
logger.Errorf("Failed to recover: %+v", err)
recoveriesFailureCounter.Add(recoveryName, 1)
} else {
logger.Info("Recovery succeeded")
recoveriesSuccessfulCounter.Add(recoveryName, 1)
}
if topologyRecovery == nil {
logger.Error("Topology recovery is nil - recovery might have failed")
return err
}
if b, err := json.Marshal(topologyRecovery); err == nil {
log.Infof("Topology recovery: %+v", string(b))
logger.Infof("Topology recovery: %+v", string(b))
} else {
log.Infof("Topology recovery: %+v", topologyRecovery)
logger.Infof("Topology recovery: %+v", topologyRecovery)
}
// If we ran a cluster wide recovery and actually attempted it, then we know that the replication state for all the tablets in this cluster
// would have changed. So we can go ahead and pre-emptively refresh them.
// For this refresh we don't use the same context that we used for the recovery, since that context might have expired or could expire soon
// Instead we pass the background context. The call forceRefreshAllTabletsInShard handles adding a timeout to it for us.
if isClusterWideRecovery(checkAndRecoverFunctionCode) {
logger.Info("Forcing refresh of all tablets post recovery")
forceRefreshAllTabletsInShard(context.Background(), analysisEntry.AnalyzedKeyspace, analysisEntry.AnalyzedShard, nil)
} else {
// For all other recoveries, we would have changed the replication status of the analyzed tablet
// so it doesn't hurt to re-read the information of this tablet, otherwise we'll requeue the same recovery
// that we just completed because we would be using stale data.
logger.Infof("Force discovering problem instance %s post recovery", analysisEntry.AnalyzedInstanceAlias)
DiscoverInstance(analysisEntry.AnalyzedInstanceAlias, true)
}
return err
