From 0d3685a37d586ecb2723b6c5fd40c9d444fcf806 Mon Sep 17 00:00:00 2001
From: "Eduardo J. Ortega U." <5791035+ejortegau@users.noreply.github.com>
Date: Wed, 15 Jan 2025 14:01:16 +0100
Subject: [PATCH] Use prefix in all vtorc check and recover logs

This makes recovery actions easier to identify in the logs. See
https://github.com/vitessio/vitess/issues/17465

Signed-off-by: Eduardo J. Ortega U. <5791035+ejortegau@users.noreply.github.com>
---
 go/vt/log/log.go                       | 91 ++++++++++++++++++++++++++
 go/vt/vtorc/logic/topology_recovery.go | 49 +++++++++-----
 2 files changed, 124 insertions(+), 16 deletions(-)

diff --git a/go/vt/log/log.go b/go/vt/log/log.go
index 79be1da464c..ea8dd1c0081 100644
--- a/go/vt/log/log.go
+++ b/go/vt/log/log.go
@@ -111,3 +111,94 @@ func (lrms *logRotateMaxSize) String() string {
 func (lrms *logRotateMaxSize) Type() string {
 	return "uint64"
 }
+
+type PrefixedLogger struct {
+	prefix string
+}
+
+func NewPrefixedLogger(prefix string) *PrefixedLogger {
+	return &PrefixedLogger{prefix: prefix}
+}
+
+func (pl *PrefixedLogger) V(level glog.Level) glog.Verbose {
+	return V(level)
+}
+
+func (pl *PrefixedLogger) Flush() {
+	Flush()
+}
+
+func (pl *PrefixedLogger) Info(args ...any) {
+	args = append([]any{pl.prefix + ": "}, args...)
+	Info(args...)
+}
+
+func (pl *PrefixedLogger) Infof(format string, args ...any) {
+	args = append([]any{pl.prefix}, args...)
+	Infof("%s: "+format, args...)
+}
+
+func (pl *PrefixedLogger) InfoDepth(depth int, args ...any) {
+	args = append([]any{pl.prefix + ": "}, args...)
+	InfoDepth(depth, args...)
+}
+
+func (pl *PrefixedLogger) Warning(args ...any) {
+	args = append([]any{pl.prefix + ": "}, args...)
+	Warning(args...)
+}
+
+func (pl *PrefixedLogger) Warningf(format string, args ...any) {
+	args = append([]any{pl.prefix}, args...)
+	Warningf("%s: "+format, args...)
+}
+
+func (pl *PrefixedLogger) WarningDepth(depth int, args ...any) {
+	args = append([]any{pl.prefix + ": "}, args...)
+	WarningDepth(depth, args...)
+}
+
+func (pl *PrefixedLogger) Error(args ...any) {
+	args = append([]any{pl.prefix + ": "}, args...)
+	Error(args...)
+}
+
+func (pl *PrefixedLogger) Errorf(format string, args ...any) {
+	args = append([]any{pl.prefix}, args...)
+	Errorf("%s: "+format, args...)
+}
+
+func (pl *PrefixedLogger) ErrorDepth(depth int, args ...any) {
+	args = append([]any{pl.prefix + ": "}, args...)
+	ErrorDepth(depth, args...)
+}
+
+func (pl *PrefixedLogger) Exit(args ...any) {
+	args = append([]any{pl.prefix + ": "}, args...)
+	Exit(args...)
+}
+
+func (pl *PrefixedLogger) Exitf(format string, args ...any) {
+	args = append([]any{pl.prefix}, args...)
+	Exitf("%s: "+format, args...)
+}
+
+func (pl *PrefixedLogger) ExitDepth(depth int, args ...any) {
+	args = append([]any{pl.prefix + ": "}, args...)
+	ExitDepth(depth, args...)
+}
+
+func (pl *PrefixedLogger) Fatal(args ...any) {
+	args = append([]any{pl.prefix + ": "}, args...)
+	Fatal(args...)
+}
+
+func (pl *PrefixedLogger) Fatalf(format string, args ...any) {
+	args = append([]any{pl.prefix}, args...)
+	Fatalf("%s: "+format, args...)
+}
+
+func (pl *PrefixedLogger) FatalDepth(depth int, args ...any) {
+	args = append([]any{pl.prefix + ": "}, args...)
+	FatalDepth(depth, args...)
+}
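For reference, a minimal usage sketch of the logger added above. Illustrative only: the analysis, keyspace, shard, and tablet alias values are made up, output is shown without glog's severity/timestamp header, and it assumes only the NewPrefixedLogger constructor and methods from this diff, exposed through the existing vitess.io/vitess/go/vt/log package.

package main

import (
	"fmt"

	"vitess.io/vitess/go/vt/log"
)

func main() {
	// One logger per recovery attempt; every line it emits carries the same prefix.
	logger := log.NewPrefixedLogger(fmt.Sprintf("Recovery for %s on %s/%s",
		"DeadPrimary", "commerce", "-80")) // hypothetical analysis/keyspace/shard

	logger.Info("Starting checkAndRecover")
	// -> Recovery for DeadPrimary on commerce/-80: Starting checkAndRecover

	logger.Infof("proceeding with recovery on %+v; isRecoverable?: %+v", "zone1-0000000101", true)
	// -> Recovery for DeadPrimary on commerce/-80: proceeding with recovery on zone1-0000000101; isRecoverable?: true
}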
diff --git a/go/vt/vtorc/logic/topology_recovery.go b/go/vt/vtorc/logic/topology_recovery.go
index 0d0bbff5b53..6ae4ff88e20 100644
--- a/go/vt/vtorc/logic/topology_recovery.go
+++ b/go/vt/vtorc/logic/topology_recovery.go
@@ -447,16 +447,20 @@ func executeCheckAndRecoverFunction(analysisEntry *inst.ReplicationAnalysis) (er
 	countPendingRecoveries.Add(1)
 	defer countPendingRecoveries.Add(-1)
 
+	logger := log.NewPrefixedLogger(fmt.Sprintf("Recovery for %s on %s/%s", analysisEntry.Analysis, analysisEntry.AnalyzedKeyspace, analysisEntry.AnalyzedShard))
+	logger.Info("Starting checkAndRecover")
+
 	checkAndRecoverFunctionCode := getCheckAndRecoverFunctionCode(analysisEntry.Analysis, analysisEntry.AnalyzedInstanceAlias)
 	isActionableRecovery := hasActionableRecovery(checkAndRecoverFunctionCode)
 	analysisEntry.IsActionableRecovery = isActionableRecovery
 
 	if checkAndRecoverFunctionCode == noRecoveryFunc {
 		// Unhandled problem type
 		if analysisEntry.Analysis != inst.NoProblem {
+			logger.Warning("No recovery strategies for problem, aborting recovery")
 			if util.ClearToLog("executeCheckAndRecoverFunction", analysisEntry.AnalyzedInstanceAlias) {
-				log.Warningf("executeCheckAndRecoverFunction: ignoring analysisEntry that has no action plan: %+v; tablet: %+v",
-					analysisEntry.Analysis, analysisEntry.AnalyzedInstanceAlias)
+				logger.Warningf("executeCheckAndRecoverFunction: ignoring analysisEntry that has no action plan; tablet: %+v",
+					analysisEntry.AnalyzedInstanceAlias)
 			}
 		}
 
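The prefix pays off when several recoveries run concurrently: interleaved log lines remain attributable to exactly one recovery. A self-contained sketch of that scenario, with hypothetical analysis, keyspace, and shard values:

package main

import (
	"fmt"
	"sync"

	"vitess.io/vitess/go/vt/log"
)

func main() {
	var wg sync.WaitGroup
	for _, shard := range []string{"-80", "80-"} { // hypothetical shards of keyspace "commerce"
		wg.Add(1)
		go func(shard string) {
			defer wg.Done()
			// One prefixed logger per recovery, as executeCheckAndRecoverFunction now does.
			logger := log.NewPrefixedLogger(fmt.Sprintf("Recovery for %s on %s/%s", "DeadPrimary", "commerce", shard))
			logger.Info("Starting checkAndRecover")
			logger.Info("Recovery succeeded")
		}(shard)
	}
	wg.Wait()
}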
@@ -464,24 +468,24 @@ func executeCheckAndRecoverFunction(analysisEntry *inst.ReplicationAnalysis) (er
 	}
 	// we have a recovery function; its execution still depends on filters if not disabled.
 	if isActionableRecovery || util.ClearToLog("executeCheckAndRecoverFunction: detection", analysisEntry.AnalyzedInstanceAlias) {
-		log.Infof("executeCheckAndRecoverFunction: proceeding with %+v detection on %+v; isActionable?: %+v", analysisEntry.Analysis, analysisEntry.AnalyzedInstanceAlias, isActionableRecovery)
+		logger.Infof("executeCheckAndRecoverFunction: proceeding with %+v detection on %+v; isActionable?: %+v", analysisEntry.Analysis, analysisEntry.AnalyzedInstanceAlias, isActionableRecovery)
 	}
 
 	// At this point we have validated there's a failure scenario for which we have a recovery path.
 	// Record the failure detected in the logs.
 	err = InsertRecoveryDetection(analysisEntry)
 	if err != nil {
-		log.Errorf("executeCheckAndRecoverFunction: error on inserting recovery detection record: %+v", err)
+		logger.Errorf("executeCheckAndRecoverFunction: error inserting recovery detection record, aborting recovery: %+v", err)
 		return err
 	}
 
 	// Check for recovery being disabled globally
 	if recoveryDisabledGlobally, err := IsRecoveryDisabled(); err != nil {
 		// Unexpected. Shouldn't get this
-		log.Errorf("Unable to determine if recovery is disabled globally: %v", err)
+		logger.Errorf("Unable to determine if recovery is disabled globally, still attempting to recover: %v", err)
 	} else if recoveryDisabledGlobally {
-		log.Infof("CheckAndRecover: Analysis: %+v, Tablet: %+v: NOT Recovering host (disabled globally)",
-			analysisEntry.Analysis, analysisEntry.AnalyzedInstanceAlias)
+		logger.Infof("CheckAndRecover: Tablet: %+v: NOT Recovering host (disabled globally)",
+			analysisEntry.AnalyzedInstanceAlias)
 		return err
 	}
 
@@ -489,6 +493,7 @@ func executeCheckAndRecoverFunction(analysisEntry *inst.ReplicationAnalysis) (er
 	// We lock the shard here and then refresh the tablets information
 	ctx, unlock, err := LockShard(context.Background(), analysisEntry.AnalyzedInstanceAlias, getLockAction(analysisEntry.AnalyzedInstanceAlias, analysisEntry.Analysis))
 	if err != nil {
+		logger.Errorf("Failed to lock shard, aborting recovery: %v", err)
 		return err
 	}
 	defer unlock(&err)
@@ -498,7 +503,7 @@ func executeCheckAndRecoverFunction(analysisEntry *inst.ReplicationAnalysis) (er
 	// changes, we should be checking that this failure is indeed needed to be fixed. We do this after locking the shard to be sure
 	// that the data that we use now is up-to-date.
 	if isActionableRecovery {
-		log.Errorf("executeCheckAndRecoverFunction: Proceeding with %v recovery on %v validation after acquiring shard lock.", analysisEntry.Analysis, analysisEntry.AnalyzedInstanceAlias)
+		logger.Infof("executeCheckAndRecoverFunction: Proceeding with recovery validation on %v after acquiring shard lock.", analysisEntry.AnalyzedInstanceAlias)
 		// The first step we have to do is refresh the keyspace and shard information
 		// This is required to know if the durability policies have changed or not
 		// If they have, then recoveries like ReplicaSemiSyncMustNotBeSet, etc won't be valid anymore.
@@ -506,6 +511,7 @@ func executeCheckAndRecoverFunction(analysisEntry *inst.ReplicationAnalysis) (er
 		// a change in the recovery we run.
 		err = RefreshKeyspaceAndShard(analysisEntry.AnalyzedKeyspace, analysisEntry.AnalyzedShard)
 		if err != nil {
+			logger.Errorf("Failed to refresh keyspace and shard, aborting recovery: %v", err)
 			return err
 		}
 		// If we are about to run a cluster-wide recovery, it is imperative to first refresh all the tablets
@@ -518,6 +524,7 @@ func executeCheckAndRecoverFunction(analysisEntry *inst.ReplicationAnalysis) (er
 		}
 		// We ignore the dead primary tablet because it is going to be unreachable. If all the other tablets aren't able to reach this tablet either,
 		// we can proceed with the dead primary recovery. We don't need to refresh the information for this dead tablet.
+		logger.Info("Force refreshing all shard tablets")
 		forceRefreshAllTabletsInShard(ctx, analysisEntry.AnalyzedKeyspace, analysisEntry.AnalyzedShard, tabletsToIgnore)
 	} else {
 		// If we are not running a cluster-wide recovery, then it is only concerned with the specific tablet
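The hunks above all follow one shape: announce the step at Info level, and on failure log the error under the same per-recovery prefix and abort. A hypothetical helper makes that shape explicit (runStep is illustration only and not part of this patch, which keeps the calls inline at each call site):

// runStep announces a recovery step, runs it, and logs any failure
// under the same per-recovery prefix before aborting.
func runStep(logger *log.PrefixedLogger, name string, step func() error) error {
	logger.Info(name)
	if err := step(); err != nil {
		logger.Errorf("%s failed, aborting recovery: %v", name, err)
		return err
	}
	return nil
}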
+ logger.Info("Refreshing shard tablet info") refreshTabletInfoOfShard(ctx, analysisEntry.AnalyzedKeyspace, analysisEntry.AnalyzedShard) + logger.Info("Discovering analysis instance") DiscoverInstance(analysisEntry.AnalyzedInstanceAlias, true) + logger.Info("Getting shard primary") primaryTablet, err := shardPrimary(analysisEntry.AnalyzedKeyspace, analysisEntry.AnalyzedShard) if err != nil { - log.Errorf("executeCheckAndRecoverFunction: Analysis: %+v, Tablet: %+v: error while finding the shard primary: %v", - analysisEntry.Analysis, analysisEntry.AnalyzedInstanceAlias, err) + logger.Errorf("executeCheckAndRecoverFunction: Tablet: %+v: error while finding the shard primary: %v", + analysisEntry.AnalyzedInstanceAlias, err) return err } primaryTabletAlias := topoproto.TabletAliasString(primaryTablet.Alias) // We can skip the refresh if we know the tablet we are looking at is the primary tablet. // This would be the case for PrimaryHasPrimary recovery. We don't need to refresh the same tablet twice. if analysisEntry.AnalyzedInstanceAlias != primaryTabletAlias { + logger.Info("Discovering primary instance") DiscoverInstance(primaryTabletAlias, true) } } alreadyFixed, err := checkIfAlreadyFixed(analysisEntry) if err != nil { - log.Errorf("executeCheckAndRecoverFunction: Analysis: %+v, Tablet: %+v: error while trying to find if the problem is already fixed: %v", - analysisEntry.Analysis, analysisEntry.AnalyzedInstanceAlias, err) + logger.Errorf("executeCheckAndRecoverFunction: Tablet: %+v: error while trying to find if the problem is already fixed: %v", + analysisEntry.AnalyzedInstanceAlias, err) return err } if alreadyFixed { - log.Infof("Analysis: %v on tablet %v - No longer valid, some other agent must have fixed the problem.", analysisEntry.Analysis, analysisEntry.AnalyzedInstanceAlias) + logger.Infof("Analysis: %v on tablet %v - No longer valid, some other agent must have fixed the problem.", analysisEntry.Analysis, analysisEntry.AnalyzedInstanceAlias) return nil } } // Actually attempt recovery: if isActionableRecovery || util.ClearToLog("executeCheckAndRecoverFunction: recovery", analysisEntry.AnalyzedInstanceAlias) { - log.Infof("executeCheckAndRecoverFunction: proceeding with %+v recovery on %+v; isRecoverable?: %+v", analysisEntry.Analysis, analysisEntry.AnalyzedInstanceAlias, isActionableRecovery) + logger.Infof("executeCheckAndRecoverFunction: proceeding with recovery on %+v; isRecoverable?: %+v", analysisEntry.AnalyzedInstanceAlias, isActionableRecovery) } recoveryAttempted, topologyRecovery, err := getCheckAndRecoverFunction(checkAndRecoverFunctionCode)(ctx, analysisEntry) if !recoveryAttempted { + logger.Errorf("Recovery not attempted: %+v", err) return err } recoveryName := getRecoverFunctionName(checkAndRecoverFunctionCode) recoveriesCounter.Add(recoveryName, 1) if err != nil { + logger.Errorf("Failed to recover: %+v", err) recoveriesFailureCounter.Add(recoveryName, 1) } else { + logger.Info("Recovery succeeded") recoveriesSuccessfulCounter.Add(recoveryName, 1) } if topologyRecovery == nil { + logger.Error("Topology recovery is nil - recovery might have failed") return err } if b, err := json.Marshal(topologyRecovery); err == nil { - log.Infof("Topology recovery: %+v", string(b)) + logger.Infof("Topology recovery: %+v", string(b)) } else { - log.Infof("Topology recovery: %+v", topologyRecovery) + logger.Infof("Topology recovery: %+v", topologyRecovery) } // If we ran a cluster wide recovery and actually attempted it, then we know that the replication state for all the tablets in 
 	// If we ran a cluster wide recovery and actually attempted it, then we know that the replication state for all the tablets in this cluster
 	// would have changed. So we can go ahead and pre-emptively refresh them.
 	// For this refresh we don't use the same context that we used for the recovery, since that context might have expired or could expire soon
 	// Instead we pass the background context. The call forceRefreshAllTabletsInShard handles adding a timeout to it for us.
 	if isClusterWideRecovery(checkAndRecoverFunctionCode) {
+		logger.Info("Forcing refresh of all tablets post recovery")
 		forceRefreshAllTabletsInShard(context.Background(), analysisEntry.AnalyzedKeyspace, analysisEntry.AnalyzedShard, nil)
 	} else {
 		// For all other recoveries, we would have changed the replication status of the analyzed tablet
 		// so it doesn't hurt to re-read the information of this tablet, otherwise we'll requeue the same recovery
 		// that we just completed because we would be using stale data.
+		logger.Infof("Force discovering problem instance %s post recovery", analysisEntry.AnalyzedInstanceAlias)
 		DiscoverInstance(analysisEntry.AnalyzedInstanceAlias, true)
 	}
 	return err
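Net effect of the patch: all lines of one recovery share a stable substring such as "Recovery for DeadPrimary on commerce/-80:", so a single recovery can be isolated from a mixed vtorc log with a plain substring match. A self-contained sketch with fabricated log lines:

package main

import (
	"fmt"
	"strings"
)

func main() {
	// Fabricated sample: two concurrent recoveries interleaved in one log.
	logText := "I0115 14:01:16 Recovery for DeadPrimary on commerce/-80: Starting checkAndRecover\n" +
		"I0115 14:01:16 Recovery for DeadPrimary on commerce/80-: Starting checkAndRecover\n" +
		"I0115 14:01:17 Recovery for DeadPrimary on commerce/-80: Recovery succeeded"

	// Keep only the lines belonging to the -80 recovery.
	for _, line := range strings.Split(logText, "\n") {
		if strings.Contains(line, "Recovery for DeadPrimary on commerce/-80:") {
			fmt.Println(line)
		}
	}
}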