From d22ee9141d6e13b73ac4dfb92fd775df226502f0 Mon Sep 17 00:00:00 2001
From: Shirly
Date: Wed, 15 Jan 2025 20:23:16 +0800
Subject: [PATCH] txnkv/transaction/prewrite: tiny refactor HandleSingleBatch
 (#1529)

Signed-off-by: AndreMouche
---
 txnkv/transaction/prewrite.go | 598 ++++++++++++++++++++--------------
 1 file changed, 348 insertions(+), 250 deletions(-)

diff --git a/txnkv/transaction/prewrite.go b/txnkv/transaction/prewrite.go
index 21b84e8a39..b01e1bba61 100644
--- a/txnkv/transaction/prewrite.go
+++ b/txnkv/transaction/prewrite.go
@@ -42,6 +42,7 @@ import (
 	"time"
 
 	"github.com/opentracing/opentracing-go"
+	"github.com/pingcap/kvproto/pkg/errorpb"
 	"github.com/pingcap/kvproto/pkg/kvrpcpb"
 	"github.com/pkg/errors"
 	"github.com/prometheus/client_golang/prometheus"
@@ -226,288 +227,385 @@ func (action actionPrewrite) handleSingleBatch(
 	// regions. It invokes `prewriteMutations` recursively here, and the number of batches will be
 	// checked there.
-	if c.sessionID > 0 {
-		if batch.isPrimary {
-			if _, err := util.EvalFailpoint("prewritePrimaryFail"); err == nil {
-				// Delay to avoid cancelling other normally ongoing prewrite requests.
-				time.Sleep(time.Millisecond * 50)
-				logutil.Logger(bo.GetCtx()).Info(
-					"[failpoint] injected error on prewriting primary batch",
-					zap.Uint64("txnStartTS", c.startTS),
-				)
-				return errors.New("injected error on prewriting primary batch")
-			}
-			util.EvalFailpoint("prewritePrimary") // for other failures like sleep or pause
-		} else {
-			if _, err := util.EvalFailpoint("prewriteSecondaryFail"); err == nil {
-				// Delay to avoid cancelling other normally ongoing prewrite requests.
-				time.Sleep(time.Millisecond * 50)
-				logutil.Logger(bo.GetCtx()).Info(
-					"[failpoint] injected error on prewriting secondary batch",
-					zap.Uint64("txnStartTS", c.startTS),
-				)
-				return errors.New("injected error on prewriting secondary batch")
-			}
-			util.EvalFailpoint("prewriteSecondary") // for other failures like sleep or pause
-			// concurrent failpoint sleep doesn't work as expected. So we need a separate fail point.
-			// `1*sleep()` can block multiple concurrent threads that meet the failpoint.
-			if val, err := util.EvalFailpoint("prewriteSecondarySleep"); err == nil {
-				time.Sleep(time.Millisecond * time.Duration(val.(int)))
-			}
+	if err = action.handleSingleBatchFailpoint(c, bo, batch); err != nil {
+		return err
+	}
+
+	handler := action.newSingleBatchPrewriteReqHandler(c, batch, bo)
+
+	var retryable bool
+	for {
+		// sendReqAndCheck returns false when the request succeeds or meets an unretryable error;
+		// otherwise, when the error is retryable, it returns true.
+		retryable, err = handler.sendReqAndCheck()
+		if !retryable {
+			handler.drop(err)
+			return err
+		}
+	}
+}
+
+// handleSingleBatchFailpoint is used to inject errors for tests.
+func (action actionPrewrite) handleSingleBatchFailpoint(c *twoPhaseCommitter, bo *retry.Backoffer, batch batchMutations) error {
+	if c.sessionID <= 0 {
+		return nil
+	}
+	if batch.isPrimary {
+		if _, err := util.EvalFailpoint("prewritePrimaryFail"); err == nil {
+			// Delay to avoid cancelling other normally ongoing prewrite requests.
+			time.Sleep(time.Millisecond * 50)
+			logutil.Logger(bo.GetCtx()).Info(
+				"[failpoint] injected error on prewriting primary batch",
+				zap.Uint64("txnStartTS", c.startTS),
+			)
+			return errors.New("injected error on prewriting primary batch")
+		}
+		util.EvalFailpoint("prewritePrimary") // for other failures like sleep or pause
+		return nil
+	}
+
+	if _, err := util.EvalFailpoint("prewriteSecondaryFail"); err == nil {
+		// Delay to avoid cancelling other normally ongoing prewrite requests.
+		time.Sleep(time.Millisecond * 50)
+		logutil.Logger(bo.GetCtx()).Info(
+			"[failpoint] injected error on prewriting secondary batch",
+			zap.Uint64("txnStartTS", c.startTS),
+		)
+		return errors.New("injected error on prewriting secondary batch")
+	}
+	util.EvalFailpoint("prewriteSecondary") // for other failures like sleep or pause
+	// concurrent failpoint sleep doesn't work as expected. So we need a separate fail point.
+	// `1*sleep()` can block multiple concurrent threads that meet the failpoint.
+	if val, err := util.EvalFailpoint("prewriteSecondarySleep"); err == nil {
+		time.Sleep(time.Millisecond * time.Duration(val.(int)))
+	}
+	return nil
+}
+
+func (c *twoPhaseCommitter) prewriteMutations(bo *retry.Backoffer, mutations CommitterMutations) error {
+	if span := opentracing.SpanFromContext(bo.GetCtx()); span != nil && span.Tracer() != nil {
+		span1 := span.Tracer().StartSpan("twoPhaseCommitter.prewriteMutations", opentracing.ChildOf(span.Context()))
+		defer span1.Finish()
+		bo.SetCtx(opentracing.ContextWithSpan(bo.GetCtx(), span1))
+	}
+
+	// `doActionOnMutations` will unset `useOnePC` if the mutations are split into multiple batches.
+	return c.doActionOnMutations(bo, actionPrewrite{isInternal: c.txn.isInternal()}, mutations)
+}
+
+// prewrite1BatchReqHandler is used to handle a single-batch prewrite request.
+type prewrite1BatchReqHandler struct {
+	committer            *twoPhaseCommitter
+	action               *actionPrewrite
+	req                  *tikvrpc.Request
+	batch                batchMutations
+	bo                   *retry.Backoffer
+	sender               *locate.RegionRequestSender
+	resolvingRecordToken *int
+	attempts             int
+	// begin is the time when the first request is sent.
+	// It will be reset once the total duration exceeds slowRequestThreshold.
+	// It's used to log slow prewrite requests.
+	begin time.Time
+}
+
+func (action actionPrewrite) newSingleBatchPrewriteReqHandler(c *twoPhaseCommitter, batch batchMutations, bo *retry.Backoffer) *prewrite1BatchReqHandler {
 	txnSize := uint64(c.regionTxnSize[batch.region.GetID()])
 	// When we retry because of a region miss, we don't know the transaction size. We set the transaction size here
 	// to MaxUint64 to avoid unexpected "resolve lock lite".
 	if action.retry {
 		txnSize = math.MaxUint64
 	}
-
-	tBegin := time.Now()
-	attempts := 0
-
 	req := c.buildPrewriteRequest(batch, txnSize)
 	sender := locate.NewRegionRequestSender(c.store.GetRegionCache(), c.store.GetTiKVClient(), c.store.GetOracle())
-	var resolvingRecordToken *int
-	defer func() {
-		if err != nil {
-			// If we fail to receive response for async commit prewrite, it will be undetermined whether this
-			// transaction has been successfully committed.
-			// If prewrite has been cancelled, all ongoing prewrite RPCs will become errors, we needn't set undetermined
-			// errors.
-			if (c.isAsyncCommit() || c.isOnePC()) && sender.GetRPCError() != nil && atomic.LoadUint32(&c.prewriteCancelled) == 0 {
-				c.setUndeterminedErr(sender.GetRPCError())
-			}
-		}
-	}()
-	for {
-		attempts++
-		if action.hasRpcRetries {
-			req.IsRetryRequest = true
-		}
-		reqBegin := time.Now()
-		if reqBegin.Sub(tBegin) > slowRequestThreshold {
-			logutil.BgLogger().Warn(
-				"slow prewrite request",
-				zap.Uint64("startTS", c.startTS),
-				zap.Stringer("region", &batch.region),
-				zap.Int("attempts", attempts),
-			)
-			tBegin = time.Now()
-		}
+	return &prewrite1BatchReqHandler{
+		action:               &action,
+		req:                  req,
+		committer:            c,
+		batch:                batch,
+		bo:                   bo,
+		sender:               sender,
+		begin:                time.Now(),
+		attempts:             0,
+		resolvingRecordToken: nil,
+	}
+}
 
-		resp, retryTimes, err := sender.SendReq(bo, req, batch.region, client.ReadTimeoutShort)
-		// Unexpected error occurs, return it
-		if err != nil {
-			return err
-		}
-		if retryTimes > 0 {
-			action.hasRpcRetries = true
+// drop is called when the prewrite request is finished. It checks the error, records an undetermined
+// result if necessary, and releases the lock-resolving record.
+func (handler *prewrite1BatchReqHandler) drop(err error) {
+	if err != nil {
+		// If we fail to receive response for async commit prewrite, it will be undetermined whether this
+		// transaction has been successfully committed.
+		// If prewrite has been cancelled, all ongoing prewrite RPCs will become errors, we needn't set undetermined
+		// errors.
+		if (handler.committer.isAsyncCommit() || handler.committer.isOnePC()) && handler.sender.GetRPCError() != nil && atomic.LoadUint32(&handler.committer.prewriteCancelled) == 0 {
+			handler.committer.setUndeterminedErr(handler.sender.GetRPCError())
+		}
+	}
+	if handler.resolvingRecordToken != nil {
+		handler.committer.store.GetLockResolver().ResolveLocksDone(handler.committer.startTS, *handler.resolvingRecordToken)
+	}
+}
+
+func (handler *prewrite1BatchReqHandler) beforeSend(reqBegin time.Time) {
+	handler.attempts++
+	if handler.action.hasRpcRetries {
+		handler.req.IsRetryRequest = true
+	}
+	if handler.attempts == 1 {
+		return
+	}
+	if reqBegin.Sub(handler.begin) > slowRequestThreshold {
+		logutil.BgLogger().Warn(
+			"slow prewrite request",
+			zap.Uint64("startTS", handler.committer.startTS),
+			zap.Stringer("region", &handler.batch.region),
+			zap.Int("attempts", handler.attempts),
+		)
+		handler.begin = reqBegin
+	}
+}
+
+// sendReqAndCheck sends the prewrite request to the TiKV server and checks the response.
+// If the TiKV server returns a retryable error, the function returns true.
+// If the TiKV server returns ok or a non-retryable error, the function returns false.
+func (handler *prewrite1BatchReqHandler) sendReqAndCheck() (retryable bool, err error) {
+	reqBegin := time.Now()
+	handler.beforeSend(reqBegin)
+	resp, retryTimes, err := handler.sender.SendReq(handler.bo, handler.req, handler.batch.region, client.ReadTimeoutShort)
+	// Unexpected error occurs, return it directly.
+	if err != nil {
+		return false, err
+	}
+	if retryTimes > 0 {
+		handler.action.hasRpcRetries = true
+	}
+	regionErr, err := resp.GetRegionError()
+	if err != nil {
+		return false, err
+	}
+	if regionErr != nil {
+		return handler.handleRegionErr(regionErr)
+	}
+
+	if resp.Resp == nil {
+		return false, errors.WithStack(tikverr.ErrBodyMissing)
+	}
+	prewriteResp := resp.Resp.(*kvrpcpb.PrewriteResponse)
+	keyErrs := prewriteResp.GetErrors()
+	if len(keyErrs) == 0 {
+		return false, handler.handleSingleBatchSucceed(reqBegin, prewriteResp)
+	}
+
+	locks, e := handler.extractKeyErrs(keyErrs)
+	if e != nil {
+		return false, e
+	}
-
+	if err := handler.resolveLocks(locks); err != nil {
+		return false, err
+	}
+	return true, nil
+
+}
+
+// handleRegionErr handles region errors returned while sending the prewrite request.
+// If the region error is EpochNotMatch and the data is still in the same region, it returns retryable true.
+// Otherwise it returns retryable false:
+// 1. The region epoch has changed and the data is no longer in the same region: call
+// doActionOnMutations directly and return retryable false regardless of success or failure.
+// 2. Other region errors.
+func (handler *prewrite1BatchReqHandler) handleRegionErr(regionErr *errorpb.Error) (retryable bool, err error) {
+	// For other region error and the fake region error, backoff because
+	// there's something wrong.
+	// For the real EpochNotMatch error, don't backoff.
+	if regionErr.GetEpochNotMatch() == nil || locate.IsFakeRegionError(regionErr) {
+		err := handler.bo.Backoff(retry.BoRegionMiss, errors.New(regionErr.String()))
 		if err != nil {
-			return err
+			return false, err
 		}
-		if regionErr != nil {
-			// For other region error and the fake region error, backoff because
-			// there's something wrong.
-			// For the real EpochNotMatch error, don't backoff.
-			if regionErr.GetEpochNotMatch() == nil || locate.IsFakeRegionError(regionErr) {
-				err = bo.Backoff(retry.BoRegionMiss, errors.New(regionErr.String()))
-				if err != nil {
-					return err
-				}
-			}
-			if regionErr.GetDiskFull() != nil {
-				storeIds := regionErr.GetDiskFull().GetStoreId()
-				desc := " "
-				for _, i := range storeIds {
-					desc += strconv.FormatUint(i, 10) + " "
-				}
-
-				logutil.Logger(bo.GetCtx()).Error(
-					"Request failed cause of TiKV disk full",
-					zap.String("store_id", desc),
-					zap.String("reason", regionErr.GetDiskFull().GetReason()),
-				)
-
-				return errors.New(regionErr.String())
-			}
-			same, err := batch.relocate(bo, c.store.GetRegionCache())
-			if err != nil {
-				return err
-			}
-			if same {
-				continue
-			}
-			err = c.doActionOnMutations(bo, actionPrewrite{true, action.isInternal, action.hasRpcRetries}, batch.mutations)
-			return err
+	}
+	if regionErr.GetDiskFull() != nil {
+		storeIds := regionErr.GetDiskFull().GetStoreId()
+		desc := " "
+		for _, i := range storeIds {
+			desc += strconv.FormatUint(i, 10) + " "
+		}
-		if resp.Resp == nil {
-			return errors.WithStack(tikverr.ErrBodyMissing)
-		}
-		prewriteResp := resp.Resp.(*kvrpcpb.PrewriteResponse)
-		keyErrs := prewriteResp.GetErrors()
-		if len(keyErrs) == 0 {
-			// Clear the RPC Error since the request is evaluated successfully.
-			sender.SetRPCError(nil)
-
-			// Update CommitDetails
-			reqDuration := time.Since(reqBegin)
-			c.getDetail().MergePrewriteReqDetails(
-				reqDuration,
-				batch.region.GetID(),
-				sender.GetStoreAddr(),
-				prewriteResp.ExecDetailsV2,
-			)
+		logutil.Logger(handler.bo.GetCtx()).Error(
+			"Request failed cause of TiKV disk full",
+			zap.String("store_id", desc),
+			zap.String("reason", regionErr.GetDiskFull().GetReason()),
+		)
 
-			if batch.isPrimary {
-				// After writing the primary key, if the size of the transaction is larger than 32M,
-				// start the ttlManager. The ttlManager will be closed in tikvTxn.Commit().
-				// In this case 1PC is not expected to be used, but still check it for safety.
-				if int64(c.txnSize) > config.GetGlobalConfig().TiKVClient.TTLRefreshedTxnSize &&
-					prewriteResp.OnePcCommitTs == 0 {
-					c.ttlManager.run(c, nil, false)
-				}
-			}
+		return false, errors.New(regionErr.String())
+	}
 
-			if c.isOnePC() {
-				if prewriteResp.OnePcCommitTs == 0 {
-					if prewriteResp.MinCommitTs != 0 {
-						return errors.New("MinCommitTs must be 0 when 1pc falls back to 2pc")
-					}
-					logutil.Logger(bo.GetCtx()).Warn(
-						"1pc failed and fallbacks to normal commit procedure",
-						zap.Uint64("startTS", c.startTS),
-					)
-					metrics.OnePCTxnCounterFallback.Inc()
-					c.setOnePC(false)
-					c.setAsyncCommit(false)
-				} else {
-					// For 1PC, there's no racing to access `onePCCommitTS` so it's safe
-					// not to lock the mutex.
-					if c.onePCCommitTS != 0 {
-						logutil.Logger(bo.GetCtx()).Fatal(
-							"one pc happened multiple times",
-							zap.Uint64("startTS", c.startTS),
-						)
-					}
-					c.onePCCommitTS = prewriteResp.OnePcCommitTs
-				}
-				return nil
-			} else if prewriteResp.OnePcCommitTs != 0 {
-				logutil.Logger(bo.GetCtx()).Fatal(
-					"tikv committed a non-1pc transaction with 1pc protocol",
-					zap.Uint64("startTS", c.startTS),
-				)
-			}
-			if c.isAsyncCommit() {
-				// 0 if the min_commit_ts is not ready or any other reason that async
-				// commit cannot proceed. The client can then fallback to normal way to
-				// continue committing the transaction if prewrite are all finished.
-				if prewriteResp.MinCommitTs == 0 {
-					if c.testingKnobs.noFallBack {
-						return nil
-					}
-					logutil.Logger(bo.GetCtx()).Warn(
-						"async commit cannot proceed since the returned minCommitTS is zero, "+
-							"fallback to normal path", zap.Uint64("startTS", c.startTS),
-					)
-					c.setAsyncCommit(false)
-				} else {
-					c.mu.Lock()
-					if prewriteResp.MinCommitTs > c.minCommitTSMgr.get() {
-						c.minCommitTSMgr.tryUpdate(prewriteResp.MinCommitTs, twoPCAccess)
-					}
-					c.mu.Unlock()
-				}
-			}
-
-			return nil
+	// for the real EpochNotMatch error, relocate the region.
+	same, err := handler.batch.relocate(handler.bo, handler.committer.store.GetRegionCache())
+	if err != nil {
+		return false, err
+	}
+	if same {
+		return true, nil
+	}
+	err = handler.committer.doActionOnMutations(handler.bo, actionPrewrite{true, handler.action.isInternal, handler.action.hasRpcRetries}, handler.batch.mutations)
+	return false, err
+}
 
+// extractKeyErrs extracts locks from key errors.
+func (handler *prewrite1BatchReqHandler) extractKeyErrs(keyErrs []*kvrpcpb.KeyError) ([]*txnlock.Lock, error) {
+	var locks []*txnlock.Lock
+	logged := make(map[uint64]struct{})
+	for _, keyErr := range keyErrs {
+		// Check already exists error
+		if alreadyExist := keyErr.GetAlreadyExist(); alreadyExist != nil {
+			e := &tikverr.ErrKeyExist{AlreadyExist: alreadyExist}
+			return nil, handler.committer.extractKeyExistsErr(e)
 		}
-		var locks []*txnlock.Lock
-		logged := make(map[uint64]struct{})
-		for _, keyErr := range keyErrs {
-			// Check already exists error
-			if alreadyExist := keyErr.GetAlreadyExist(); alreadyExist != nil {
-				e := &tikverr.ErrKeyExist{AlreadyExist: alreadyExist}
-				return c.extractKeyExistsErr(e)
-			}
-			// Extract lock from key error
-			lock, err1 := txnlock.ExtractLockFromKeyErr(keyErr)
-			if err1 != nil {
-				return err1
-			}
-			if _, ok := logged[lock.TxnID]; !ok {
-				logutil.BgLogger().Info(
-					"prewrite encounters lock. "+
-						"More locks belonging to the same transaction may be omitted",
-					zap.Uint64("session", c.sessionID),
-					zap.Uint64("txnID", c.startTS),
-					zap.Stringer("lock", lock),
-					zap.Stringer("policy", c.txn.prewriteEncounterLockPolicy),
-				)
-				logged[lock.TxnID] = struct{}{}
-			}
-			// If an optimistic transaction encounters a lock with larger TS, this transaction will certainly
-			// fail due to a WriteConflict error. So we can construct and return an error here early.
-			// Pessimistic transactions don't need such an optimization. If this key needs a pessimistic lock,
-			// TiKV will return a PessimisticLockNotFound error directly if it encounters a different lock. Otherwise,
-			// TiKV returns lock.TTL = 0, and we still need to resolve the lock.
-			if (lock.TxnID > c.startTS && !c.isPessimistic) ||
-				c.txn.prewriteEncounterLockPolicy == NoResolvePolicy {
-				return tikverr.NewErrWriteConflictWithArgs(
-					c.startTS,
-					lock.TxnID,
-					0,
-					lock.Key,
-					kvrpcpb.WriteConflict_Optimistic,
-				)
-			}
-			locks = append(locks, lock)
-		}
-		if resolvingRecordToken == nil {
-			token := c.store.GetLockResolver().RecordResolvingLocks(locks, c.startTS)
-			resolvingRecordToken = &token
-			defer c.store.GetLockResolver().ResolveLocksDone(c.startTS, *resolvingRecordToken)
-		} else {
-			c.store.GetLockResolver().UpdateResolvingLocks(locks, c.startTS, *resolvingRecordToken)
-		}
-		resolveLockOpts := txnlock.ResolveLocksOptions{
-			CallerStartTS:            c.startTS,
-			Locks:                    locks,
-			Detail:                   &c.getDetail().ResolveLock,
-			PessimisticRegionResolve: true,
-		}
-		resolveLockRes, err := c.store.GetLockResolver().ResolveLocksWithOpts(bo, resolveLockOpts)
-		msBeforeExpired := resolveLockRes.TTL
-		if msBeforeExpired > 0 {
-			err = bo.BackoffWithCfgAndMaxSleep(
-				retry.BoTxnLock,
-				int(msBeforeExpired),
-				errors.Errorf("2PC prewrite lockedKeys: %d", len(locks)),
+		// Extract lock from key error
+		lock, err1 := txnlock.ExtractLockFromKeyErr(keyErr)
+		if err1 != nil {
+			return nil, err1
+		}
+		if _, ok := logged[lock.TxnID]; !ok {
+			logutil.BgLogger().Info(
+				"prewrite encounters lock. "+
+					"More locks belonging to the same transaction may be omitted",
+				zap.Uint64("session", handler.committer.sessionID),
+				zap.Uint64("txnID", handler.committer.startTS),
+				zap.Stringer("lock", lock),
+				zap.Stringer("policy", handler.committer.txn.prewriteEncounterLockPolicy),
+			)
+			logged[lock.TxnID] = struct{}{}
+		}
+		// If an optimistic transaction encounters a lock with larger TS, this transaction will certainly
+		// fail due to a WriteConflict error. So we can construct and return an error here early.
+		// Pessimistic transactions don't need such an optimization. If this key needs a pessimistic lock,
+		// TiKV will return a PessimisticLockNotFound error directly if it encounters a different lock. Otherwise,
+		// TiKV returns lock.TTL = 0, and we still need to resolve the lock.
+		if (lock.TxnID > handler.committer.startTS && !handler.committer.isPessimistic) ||
+			handler.committer.txn.prewriteEncounterLockPolicy == NoResolvePolicy {
+			return nil, tikverr.NewErrWriteConflictWithArgs(
+				handler.committer.startTS,
+				lock.TxnID,
+				0,
+				lock.Key,
+				kvrpcpb.WriteConflict_Optimistic,
+			)
 		}
+		locks = append(locks, lock)
+	}
+	return locks, nil
+}
+
+// resolveLocks records the locks encountered during prewrite as resolving, resolves them,
+// and backs off when some of them are not expired yet.
+func (handler *prewrite1BatchReqHandler) resolveLocks(locks []*txnlock.Lock) error {
+	if handler.resolvingRecordToken == nil {
+		token := handler.committer.store.GetLockResolver().RecordResolvingLocks(locks, handler.committer.startTS)
+		handler.resolvingRecordToken = &token
+	} else {
+		handler.committer.store.GetLockResolver().UpdateResolvingLocks(locks, handler.committer.startTS, *handler.resolvingRecordToken)
+	}
+
+	resolveLockOpts := txnlock.ResolveLocksOptions{
+		CallerStartTS:            handler.committer.startTS,
+		Locks:                    locks,
+		Detail:                   &handler.committer.getDetail().ResolveLock,
+		PessimisticRegionResolve: true,
+	}
+	resolveLockRes, err := handler.committer.store.GetLockResolver().ResolveLocksWithOpts(handler.bo, resolveLockOpts)
+	if err != nil {
+		return err
+	}
+	msBeforeExpired := resolveLockRes.TTL
+	if msBeforeExpired > 0 {
+		err = handler.bo.BackoffWithCfgAndMaxSleep(
+			retry.BoTxnLock,
+			int(msBeforeExpired),
+			errors.Errorf("2PC prewrite lockedKeys: %d", len(locks)),
+		)
 		if err != nil {
 			return err
 		}
+	}
+	return nil
+}
+
+// handleSingleBatchSucceed handles the response when the prewrite request is successful.
+func (handler *prewrite1BatchReqHandler) handleSingleBatchSucceed(reqBegin time.Time, prewriteResp *kvrpcpb.PrewriteResponse) error {
+	// Clear the RPC Error since the request is evaluated successfully.
+	handler.sender.SetRPCError(nil)
+
+	// Update CommitDetails
+	reqDuration := time.Since(reqBegin)
+	handler.committer.getDetail().MergePrewriteReqDetails(
+		reqDuration,
+		handler.batch.region.GetID(),
+		handler.sender.GetStoreAddr(),
+		prewriteResp.ExecDetailsV2,
+	)
+
+	if handler.batch.isPrimary {
+		// After writing the primary key, if the size of the transaction is larger than 32M,
+		// start the ttlManager. The ttlManager will be closed in tikvTxn.Commit().
+		// In this case 1PC is not expected to be used, but still check it for safety.
+		if int64(handler.committer.txnSize) > config.GetGlobalConfig().TiKVClient.TTLRefreshedTxnSize &&
+			prewriteResp.OnePcCommitTs == 0 {
+			handler.committer.ttlManager.run(handler.committer, nil, false)
+		}
+	}
+	if handler.committer.isOnePC() {
+		if prewriteResp.OnePcCommitTs == 0 {
+			if prewriteResp.MinCommitTs != 0 {
+				return errors.New("MinCommitTs must be 0 when 1pc falls back to 2pc")
+			}
+			logutil.Logger(handler.bo.GetCtx()).Warn(
+				"1pc failed and fallbacks to normal commit procedure",
+				zap.Uint64("startTS", handler.committer.startTS),
+			)
+			metrics.OnePCTxnCounterFallback.Inc()
+			handler.committer.setOnePC(false)
+			handler.committer.setAsyncCommit(false)
+		} else {
+			// For 1PC, there's no racing to access `onePCCommitTS` so it's safe
+			// not to lock the mutex.
+			if handler.committer.onePCCommitTS != 0 {
+				logutil.Logger(handler.bo.GetCtx()).Fatal(
+					"one pc happened multiple times",
+					zap.Uint64("startTS", handler.committer.startTS),
+				)
 			}
+			handler.committer.onePCCommitTS = prewriteResp.OnePcCommitTs
 		}
+		return nil
+	} else if prewriteResp.OnePcCommitTs != 0 {
+		logutil.Logger(handler.bo.GetCtx()).Fatal(
+			"tikv committed a non-1pc transaction with 1pc protocol",
+			zap.Uint64("startTS", handler.committer.startTS),
+		)
 	}
-}
-
-func (c *twoPhaseCommitter) prewriteMutations(bo *retry.Backoffer, mutations CommitterMutations) error {
-	if span := opentracing.SpanFromContext(bo.GetCtx()); span != nil && span.Tracer() != nil {
-		span1 := span.Tracer().StartSpan("twoPhaseCommitter.prewriteMutations", opentracing.ChildOf(span.Context()))
-		defer span1.Finish()
-		bo.SetCtx(opentracing.ContextWithSpan(bo.GetCtx(), span1))
+	if handler.committer.isAsyncCommit() {
+		// 0 if the min_commit_ts is not ready or any other reason that async
+		// commit cannot proceed. The client can then fallback to normal way to
+		// continue committing the transaction if prewrite are all finished.
+		if prewriteResp.MinCommitTs == 0 {
+			if handler.committer.testingKnobs.noFallBack {
+				return nil
+			}
+			logutil.Logger(handler.bo.GetCtx()).Warn(
+				"async commit cannot proceed since the returned minCommitTS is zero, "+
+					"fallback to normal path", zap.Uint64("startTS", handler.committer.startTS),
+			)
+			handler.committer.setAsyncCommit(false)
+		} else {
+			handler.committer.mu.Lock()
+			if prewriteResp.MinCommitTs > handler.committer.minCommitTSMgr.get() {
+				handler.committer.minCommitTSMgr.tryUpdate(prewriteResp.MinCommitTs, twoPCAccess)
+			}
+			handler.committer.mu.Unlock()
+		}
 	}
-
-	// `doActionOnMutations` will unset `useOnePC` if the mutations is splitted into multiple batches.
-	return c.doActionOnMutations(bo, actionPrewrite{isInternal: c.txn.isInternal()}, mutations)
+	return nil
 }
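
For readers skimming the patch, the sketch below illustrates the control flow that handleSingleBatch follows after this refactor: a per-batch handler owns the request state, the caller loops on a send-and-check step until it reports a non-retryable outcome, and cleanup runs exactly once after the loop. This is an illustrative toy, not part of the patch; the names batchHandler, sendAndCheck, release, and errTransient are invented stand-ins for prewrite1BatchReqHandler, sendReqAndCheck, and drop.

// Illustrative sketch only: a toy model of the retry loop introduced by this
// refactor. The real types live in txnkv/transaction/prewrite.go; the names
// used here are invented for the example.
package main

import (
	"errors"
	"fmt"
)

var errTransient = errors.New("transient region error")

// batchHandler mimics prewrite1BatchReqHandler: it owns per-batch state and
// reports whether a failed attempt may be retried.
type batchHandler struct {
	attempts int
}

// sendAndCheck mimics sendReqAndCheck: it returns retryable=true when the
// caller should loop again, and retryable=false with the final error (or nil)
// when the batch is finished.
func (h *batchHandler) sendAndCheck() (retryable bool, err error) {
	h.attempts++
	if h.attempts < 3 {
		return true, errTransient // pretend the first attempts hit retryable errors
	}
	return false, nil // success on the third attempt
}

// release mimics drop: cleanup that must run exactly once, after the loop ends.
func (h *batchHandler) release(err error) {
	fmt.Printf("finished after %d attempts, err=%v\n", h.attempts, err)
}

func main() {
	h := &batchHandler{}
	for {
		retryable, err := h.sendAndCheck()
		if !retryable {
			h.release(err)
			if err != nil {
				fmt.Println("prewrite batch failed:", err)
			}
			return
		}
	}
}

The point of the shape above is that the retry decision lives in one method and the cleanup lives in another, which is what lets the patch replace the old single long loop with small, separately testable functions.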