From 27fbccb1daf28be14990fa1a92badbabe57a7e86 Mon Sep 17 00:00:00 2001 From: D3Hunter Date: Tue, 30 Jan 2024 21:04:53 +0800 Subject: [PATCH] disttask: workaround for #50089 to fix subtask rerun (#50821) close pingcap/tidb#50818 --- pkg/disttask/framework/scheduler/balancer.go | 5 +++++ pkg/disttask/framework/scheduler/nodes.go | 2 +- pkg/disttask/framework/taskexecutor/task_executor.go | 11 +++++++++++ 3 files changed, 17 insertions(+), 1 deletion(-) diff --git a/pkg/disttask/framework/scheduler/balancer.go b/pkg/disttask/framework/scheduler/balancer.go index d5309b17d3b78..86f1bad9fcfec 100644 --- a/pkg/disttask/framework/scheduler/balancer.go +++ b/pkg/disttask/framework/scheduler/balancer.go @@ -155,6 +155,11 @@ func (b *balancer) doBalanceSubtasks(ctx context.Context, taskID int64, eligible executorWithOneMoreSubtask := make(map[string]struct{}, remainder) for node, sts := range executorSubtasks { if _, ok := adjustedNodeMap[node]; !ok { + b.logger.Info("dead node or not have enough slots, schedule subtasks away", + zap.Int64("task-id", taskID), + zap.String("node", node), + zap.Int("slot-capacity", b.slotMgr.getCapacity()), + zap.Int("used-slots", b.currUsedSlots[node])) // dead node or not have enough slots subtasksNeedSchedule = append(subtasksNeedSchedule, sts...) delete(executorSubtasks, node) diff --git a/pkg/disttask/framework/scheduler/nodes.go b/pkg/disttask/framework/scheduler/nodes.go index cda0bfd000597..65001bd2c0c53 100644 --- a/pkg/disttask/framework/scheduler/nodes.go +++ b/pkg/disttask/framework/scheduler/nodes.go @@ -107,7 +107,7 @@ func (nm *NodeManager) maintainLiveNodes(ctx context.Context, taskMgr TaskManage return } nm.logger.Info("delete dead nodes from dist_framework_meta", - zap.Int("dead-nodes", len(deadNodes))) + zap.Strings("dead-nodes", deadNodes)) err = taskMgr.DeleteDeadNodes(ctx, deadNodes) if err != nil { nm.logger.Warn("delete dead nodes from dist_framework_meta failed", llog.ShortError(err)) diff --git a/pkg/disttask/framework/taskexecutor/task_executor.go b/pkg/disttask/framework/taskexecutor/task_executor.go index 9682219d5b494..39dbccdbed210 100644 --- a/pkg/disttask/framework/taskexecutor/task_executor.go +++ b/pkg/disttask/framework/taskexecutor/task_executor.go @@ -122,6 +122,14 @@ func (e *BaseTaskExecutor) checkBalanceSubtask(ctx context.Context) { e.logger.Error("get subtasks failed", zap.Error(err)) continue } + if ctx.Err() != nil { + // workaround for https://github.com/pingcap/tidb/issues/50089 + // timeline to trigger this: + // - this routine runs GetSubtasksByExecIDAndStepAndStates + // - outer runSubtask finishes and cancel check-context + // - GetSubtasksByExecIDAndStepAndStates returns with no err and no result + return + } if len(subtasks) == 0 { e.logger.Info("subtask is scheduled away, cancel running") // cancels runStep, but leave the subtask state unchanged. @@ -143,6 +151,9 @@ func (e *BaseTaskExecutor) checkBalanceSubtask(ctx context.Context) { if len(extraRunningSubtasks) > 0 { if err = e.taskTable.RunningSubtasksBack2Pending(ctx, extraRunningSubtasks); err != nil { e.logger.Error("update running subtasks back to pending failed", zap.Error(err)) + } else { + e.logger.Info("update extra running subtasks back to pending", + zap.Stringers("subtasks", extraRunningSubtasks)) } } }