Skip to content

Commit

Permalink
disttask: workaround for pingcap#50089 to fix subtask rerun (pingcap#…
Browse files Browse the repository at this point in the history
  • Loading branch information
D3Hunter authored Jan 30, 2024
1 parent 6210edd commit 27fbccb
Show file tree
Hide file tree
Showing 3 changed files with 17 additions and 1 deletion.
5 changes: 5 additions & 0 deletions pkg/disttask/framework/scheduler/balancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,11 @@ func (b *balancer) doBalanceSubtasks(ctx context.Context, taskID int64, eligible
executorWithOneMoreSubtask := make(map[string]struct{}, remainder)
for node, sts := range executorSubtasks {
if _, ok := adjustedNodeMap[node]; !ok {
b.logger.Info("dead node or not have enough slots, schedule subtasks away",
zap.Int64("task-id", taskID),
zap.String("node", node),
zap.Int("slot-capacity", b.slotMgr.getCapacity()),
zap.Int("used-slots", b.currUsedSlots[node]))
// dead node or not have enough slots
subtasksNeedSchedule = append(subtasksNeedSchedule, sts...)
delete(executorSubtasks, node)
Expand Down
2 changes: 1 addition & 1 deletion pkg/disttask/framework/scheduler/nodes.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ func (nm *NodeManager) maintainLiveNodes(ctx context.Context, taskMgr TaskManage
return
}
nm.logger.Info("delete dead nodes from dist_framework_meta",
zap.Int("dead-nodes", len(deadNodes)))
zap.Strings("dead-nodes", deadNodes))
err = taskMgr.DeleteDeadNodes(ctx, deadNodes)
if err != nil {
nm.logger.Warn("delete dead nodes from dist_framework_meta failed", llog.ShortError(err))
Expand Down
11 changes: 11 additions & 0 deletions pkg/disttask/framework/taskexecutor/task_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,14 @@ func (e *BaseTaskExecutor) checkBalanceSubtask(ctx context.Context) {
e.logger.Error("get subtasks failed", zap.Error(err))
continue
}
if ctx.Err() != nil {
// workaround for https://github.com/pingcap/tidb/issues/50089
// timeline to trigger this:
// - this routine runs GetSubtasksByExecIDAndStepAndStates
// - outer runSubtask finishes and cancel check-context
// - GetSubtasksByExecIDAndStepAndStates returns with no err and no result
return
}
if len(subtasks) == 0 {
e.logger.Info("subtask is scheduled away, cancel running")
// cancels runStep, but leave the subtask state unchanged.
Expand All @@ -143,6 +151,9 @@ func (e *BaseTaskExecutor) checkBalanceSubtask(ctx context.Context) {
if len(extraRunningSubtasks) > 0 {
if err = e.taskTable.RunningSubtasksBack2Pending(ctx, extraRunningSubtasks); err != nil {
e.logger.Error("update running subtasks back to pending failed", zap.Error(err))
} else {
e.logger.Info("update extra running subtasks back to pending",
zap.Stringers("subtasks", extraRunningSubtasks))
}
}
}
Expand Down

0 comments on commit 27fbccb

Please sign in to comment.