From e2980be40eae827b3257faf0c4d860b9dccfc196 Mon Sep 17 00:00:00 2001
From: AndreMouche
Date: Wed, 18 Dec 2024 12:51:10 -0800
Subject: [PATCH] refactor handleSingleBatch

---
 txnkv/transaction/prewrite.go | 459 +++++++++++++++++++---------------
 1 file changed, 253 insertions(+), 206 deletions(-)

diff --git a/txnkv/transaction/prewrite.go b/txnkv/transaction/prewrite.go
index b48c10c746..3ff35a10cc 100644
--- a/txnkv/transaction/prewrite.go
+++ b/txnkv/transaction/prewrite.go
@@ -231,183 +231,182 @@ func (action actionPrewrite) handleSingleBatch(
         return err
     }
 
+    handler := action.newSingleBatchPrewriteReqHandler(c, batch, bo)
+    // Run drop in a deferred closure so that it observes the final value of the
+    // named return err, not the value err holds when the defer statement is evaluated.
+    defer func() {
+        handler.drop(err)
+    }()
+
+    for {
+        var retryable bool
+        retryable, err = handler.sendReqAndCheck()
+        if !retryable {
+            return err
+        }
+    }
+}
+
+// handleSingleBatchFailpoint is used to inject errors in tests.
+func (action actionPrewrite) handleSingleBatchFailpoint(c *twoPhaseCommitter, bo *retry.Backoffer, batch batchMutations) error {
+    if c.sessionID <= 0 {
+        return nil
+    }
+    if batch.isPrimary {
+        if _, err := util.EvalFailpoint("prewritePrimaryFail"); err == nil {
+            // Delay to avoid cancelling other normally ongoing prewrite requests.
+            time.Sleep(time.Millisecond * 50)
+            logutil.Logger(bo.GetCtx()).Info(
+                "[failpoint] injected error on prewriting primary batch",
+                zap.Uint64("txnStartTS", c.startTS),
+            )
+            return errors.New("injected error on prewriting primary batch")
+        }
+        util.EvalFailpoint("prewritePrimary") // for other failures like sleep or pause
+        return nil
+    }
+
+    if _, err := util.EvalFailpoint("prewriteSecondaryFail"); err == nil {
+        // Delay to avoid cancelling other normally ongoing prewrite requests.
+        time.Sleep(time.Millisecond * 50)
+        logutil.Logger(bo.GetCtx()).Info(
+            "[failpoint] injected error on prewriting secondary batch",
+            zap.Uint64("txnStartTS", c.startTS),
+        )
+        return errors.New("injected error on prewriting secondary batch")
+    }
+    util.EvalFailpoint("prewriteSecondary") // for other failures like sleep or pause
+    // concurrent failpoint sleep doesn't work as expected. So we need a separate fail point.
+    // `1*sleep()` can block multiple concurrent threads that meet the failpoint.
+    if val, err := util.EvalFailpoint("prewriteSecondarySleep"); err == nil {
+        time.Sleep(time.Millisecond * time.Duration(val.(int)))
+    }
+    return nil
+}
+
+func (c *twoPhaseCommitter) prewriteMutations(bo *retry.Backoffer, mutations CommitterMutations) error {
+    if span := opentracing.SpanFromContext(bo.GetCtx()); span != nil && span.Tracer() != nil {
+        span1 := span.Tracer().StartSpan("twoPhaseCommitter.prewriteMutations", opentracing.ChildOf(span.Context()))
+        defer span1.Finish()
+        bo.SetCtx(opentracing.ContextWithSpan(bo.GetCtx(), span1))
+    }
+
+    // `doActionOnMutations` will unset `useOnePC` if the mutations are split into multiple batches.
+    return c.doActionOnMutations(bo, actionPrewrite{isInternal: c.txn.isInternal()}, mutations)
+}
+
+// prewrite1BatchReqHandler sends the prewrite request for a single batch of mutations
+// and handles its response.
+type prewrite1BatchReqHandler struct {
+    committer            *twoPhaseCommitter
+    action               *actionPrewrite
+    req                  *tikvrpc.Request
+    batch                batchMutations
+    bo                   *retry.Backoffer
+    sender               *locate.RegionRequestSender
+    resolvingRecordToken *int
+    attempts             int
+    begin                time.Time
+}
+
+// newSingleBatchPrewriteReqHandler builds the prewrite request for the given batch and
+// wraps it, together with the sender state, into a prewrite1BatchReqHandler.
+func (action actionPrewrite) newSingleBatchPrewriteReqHandler(c *twoPhaseCommitter, batch batchMutations, bo *retry.Backoffer) *prewrite1BatchReqHandler {
     txnSize := uint64(c.regionTxnSize[batch.region.GetID()])
     // When we retry because of a region miss, we don't know the transaction size. We set the transaction size here
     // to MaxUint64 to avoid unexpected "resolve lock lite".
     if action.retry {
         txnSize = math.MaxUint64
     }
-
-    tBegin := time.Now()
-    attempts := 0
-
     req := c.buildPrewriteRequest(batch, txnSize)
     sender := locate.NewRegionRequestSender(c.store.GetRegionCache(), c.store.GetTiKVClient(), c.store.GetOracle())
-    var resolvingRecordToken *int
-    defer func() {
-        if err != nil {
-            // If we fail to receive response for async commit prewrite, it will be undetermined whether this
-            // transaction has been successfully committed.
-            // If prewrite has been cancelled, all ongoing prewrite RPCs will become errors, we needn't set undetermined
-            // errors.
-            if (c.isAsyncCommit() || c.isOnePC()) && sender.GetRPCError() != nil && atomic.LoadUint32(&c.prewriteCancelled) == 0 {
-                c.setUndeterminedErr(sender.GetRPCError())
-            }
-        }
-    }()
-    for {
-        attempts++
-        if action.hasRpcRetries {
-            req.IsRetryRequest = true
-        }
-        reqBegin := time.Now()
-        if reqBegin.Sub(tBegin) > slowRequestThreshold {
-            logutil.BgLogger().Warn(
-                "slow prewrite request",
-                zap.Uint64("startTS", c.startTS),
-                zap.Stringer("region", &batch.region),
-                zap.Int("attempts", attempts),
-            )
-            tBegin = time.Now()
-        }
-
-        resp, retryTimes, err := sender.SendReq(bo, req, batch.region, client.ReadTimeoutShort)
-        // Unexpected error occurs, return it directly.
-        if err != nil {
-            return err
-        }
-        if retryTimes > 0 {
-            action.hasRpcRetries = true
-        }
-        regionErr, err := resp.GetRegionError()
-        if err != nil {
-            return err
-        }
-        if regionErr != nil {
-            if err := action.handleRegionErr(bo, regionErr); err != nil {
-                return err
-            }
-            same, err := batch.relocate(bo, c.store.GetRegionCache())
-            if err != nil {
-                return err
-            }
-            if same {
-                continue
-            }
-            err = c.doActionOnMutations(bo, actionPrewrite{true, action.isInternal, action.hasRpcRetries}, batch.mutations)
-            return err
-        }
+    return &prewrite1BatchReqHandler{
+        action:               &action,
+        req:                  req,
+        committer:            c,
+        batch:                batch,
+        bo:                   bo,
+        sender:               sender,
+        begin:                time.Now(),
+        attempts:             0,
+        resolvingRecordToken: nil,
+    }
+}
 
-        if resp.Resp == nil {
-            return errors.WithStack(tikverr.ErrBodyMissing)
-        }
-        prewriteResp := resp.Resp.(*kvrpcpb.PrewriteResponse)
-        keyErrs := prewriteResp.GetErrors()
-        if len(keyErrs) > 0 {
-            locks, e := action.extractKeyErrs(c, keyErrs)
-            if e != nil {
-                return e
-            }
-            if resolvingRecordToken == nil {
-                token := c.store.GetLockResolver().RecordResolvingLocks(locks, c.startTS)
-                resolvingRecordToken = &token
-                defer c.store.GetLockResolver().ResolveLocksDone(c.startTS, *resolvingRecordToken)
-            } else {
-                c.store.GetLockResolver().UpdateResolvingLocks(locks, c.startTS, *resolvingRecordToken)
-            }
-            if err := action.resolveLocks(c, bo, locks); err != nil {
-                return err
-            }
-            continue
+
+// drop is called when the prewrite request is finished. It records the RPC error as the undetermined
+// error for async-commit/1PC transactions when needed, and releases the resolving-locks record.
+func (handler *prewrite1BatchReqHandler) drop(err error) {
+    if err != nil {
+        // If we fail to receive response for async commit prewrite, it will be undetermined whether this
+        // transaction has been successfully committed.
+        // If prewrite has been cancelled, all ongoing prewrite RPCs will become errors, we needn't set undetermined
+        // errors.
+        if (handler.committer.isAsyncCommit() || handler.committer.isOnePC()) && handler.sender.GetRPCError() != nil && atomic.LoadUint32(&handler.committer.prewriteCancelled) == 0 {
+            handler.committer.setUndeterminedErr(handler.sender.GetRPCError())
         }
+    }
+    if handler.resolvingRecordToken != nil {
+        handler.committer.store.GetLockResolver().ResolveLocksDone(handler.committer.startTS, *handler.resolvingRecordToken)
+    }
+}
 
-        // Clear the RPC Error since the request is evaluated successfully.
-        sender.SetRPCError(nil)
-
-        // Update CommitDetails
-        reqDuration := time.Since(reqBegin)
-        c.getDetail().MergePrewriteReqDetails(
-            reqDuration,
-            batch.region.GetID(),
-            sender.GetStoreAddr(),
-            prewriteResp.ExecDetailsV2,
+// beforeSend increases the attempt counter and warns if this batch has been
+// retried for longer than slowRequestThreshold.
+func (handler *prewrite1BatchReqHandler) beforeSend() {
+    handler.attempts++
+    if handler.action.hasRpcRetries {
+        handler.req.IsRetryRequest = true
+    }
+    reqBegin := time.Now()
+    if reqBegin.Sub(handler.begin) > slowRequestThreshold {
+        logutil.BgLogger().Warn(
+            "slow prewrite request",
+            zap.Uint64("startTS", handler.committer.startTS),
+            zap.Stringer("region", &handler.batch.region),
+            zap.Int("attempts", handler.attempts),
         )
+        handler.begin = time.Now()
+    }
+}
 
-        if batch.isPrimary {
-            // After writing the primary key, if the size of the transaction is larger than 32M,
-            // start the ttlManager. The ttlManager will be closed in tikvTxn.Commit().
-            // In this case 1PC is not expected to be used, but still check it for safety.
-            if int64(c.txnSize) > config.GetGlobalConfig().TiKVClient.TTLRefreshedTxnSize &&
-                prewriteResp.OnePcCommitTs == 0 {
-                c.run(c, nil, false)
-            }
-        }
+// sendReqAndCheck sends the prewrite request to the TiKV server and checks the response.
+// It returns retryable=true when the caller should send the request again with the same handler.
+func (handler *prewrite1BatchReqHandler) sendReqAndCheck() (retryable bool, err error) {
+    handler.beforeSend()
+    resp, retryTimes, err := handler.sender.SendReq(handler.bo, handler.req, handler.batch.region, client.ReadTimeoutShort)
+    // Unexpected error occurs, return it directly.
+    if err != nil {
+        return false, err
+    }
+    if retryTimes > 0 {
+        handler.action.hasRpcRetries = true
+    }
+    regionErr, err := resp.GetRegionError()
+    if err != nil {
+        return false, err
+    }
+    if regionErr != nil {
+        return handler.handleRegionErr(regionErr)
+    }
 
-        if c.isOnePC() {
-            if prewriteResp.OnePcCommitTs == 0 {
-                if prewriteResp.MinCommitTs != 0 {
-                    return errors.New("MinCommitTs must be 0 when 1pc falls back to 2pc")
-                }
-                logutil.Logger(bo.GetCtx()).Warn(
-                    "1pc failed and fallbacks to normal commit procedure",
-                    zap.Uint64("startTS", c.startTS),
-                )
-                metrics.OnePCTxnCounterFallback.Inc()
-                c.setOnePC(false)
-                c.setAsyncCommit(false)
-            } else {
-                // For 1PC, there's no racing to access `onePCCommitTS` so it's safe
-                // not to lock the mutex.
-                if c.onePCCommitTS != 0 {
-                    logutil.Logger(bo.GetCtx()).Fatal(
-                        "one pc happened multiple times",
-                        zap.Uint64("startTS", c.startTS),
-                    )
-                }
-                c.onePCCommitTS = prewriteResp.OnePcCommitTs
-            }
-            return nil
-        } else if prewriteResp.OnePcCommitTs != 0 {
-            logutil.Logger(bo.GetCtx()).Fatal(
-                "tikv committed a non-1pc transaction with 1pc protocol",
-                zap.Uint64("startTS", c.startTS),
-            )
-        }
-        if c.isAsyncCommit() {
-            // 0 if the min_commit_ts is not ready or any other reason that async
-            // commit cannot proceed. The client can then fallback to normal way to
-            // continue committing the transaction if prewrite are all finished.
-            if prewriteResp.MinCommitTs == 0 {
-                if c.testingKnobs.noFallBack {
-                    return nil
-                }
-                logutil.Logger(bo.GetCtx()).Warn(
-                    "async commit cannot proceed since the returned minCommitTS is zero, "+
-                        "fallback to normal path", zap.Uint64("startTS", c.startTS),
-                )
-                c.setAsyncCommit(false)
-            } else {
-                c.mu.Lock()
-                if prewriteResp.MinCommitTs > c.minCommitTSMgr.get() {
-                    c.minCommitTSMgr.tryUpdate(prewriteResp.MinCommitTs, twoPCAccess)
-                }
-                c.mu.Unlock()
-            }
-        }
+    if resp.Resp == nil {
+        return false, errors.WithStack(tikverr.ErrBodyMissing)
+    }
+    prewriteResp := resp.Resp.(*kvrpcpb.PrewriteResponse)
+    keyErrs := prewriteResp.GetErrors()
+    if len(keyErrs) == 0 {
+        return false, handler.handleSingleBatchSucceed(prewriteResp)
+    }
 
-        return nil
-    }
+    locks, e := handler.extractKeyErrs(keyErrs)
+    if e != nil {
+        return false, e
     }
+
+    if err := handler.resolveLocks(locks); err != nil {
+        return false, err
+    }
+    return true, nil
 }
 
-// handleRegionErr handles region errors when prewriting.
-func (action actionPrewrite) handleRegionErr(bo *retry.Backoffer, regionErr *errorpb.Error) error {
+// handleRegionErr handles region errors when sending the prewrite request.
+func (handler *prewrite1BatchReqHandler) handleRegionErr(regionErr *errorpb.Error) (retryable bool, err error) {
     // For other region error and the fake region error, backoff because
     // there's something wrong.
     // For the real EpochNotMatch error, don't backoff.
     if regionErr.GetEpochNotMatch() == nil || locate.IsFakeRegionError(regionErr) {
-        err := bo.Backoff(retry.BoRegionMiss, errors.New(regionErr.String()))
+        err := handler.bo.Backoff(retry.BoRegionMiss, errors.New(regionErr.String()))
         if err != nil {
-            return err
+            return false, err
         }
     }
     if regionErr.GetDiskFull() != nil {
@@ -417,25 +416,36 @@ func (action actionPrewrite) handleRegionErr(bo *retry.Backoffer, regionErr *err
             desc += strconv.FormatUint(i, 10) + " "
         }
 
-        logutil.Logger(bo.GetCtx()).Error(
+        logutil.Logger(handler.bo.GetCtx()).Error(
             "Request failed cause of TiKV disk full",
             zap.String("store_id", desc),
             zap.String("reason", regionErr.GetDiskFull().GetReason()),
         )
 
-        return errors.New(regionErr.String())
+        return false, errors.New(regionErr.String())
     }
-    return nil
+
+    // For the real EpochNotMatch error, relocate the region.
+    same, err := handler.batch.relocate(handler.bo, handler.committer.store.GetRegionCache())
+    if err != nil {
+        return false, err
+    }
+    if same {
+        return true, nil
+    }
+    err = handler.committer.doActionOnMutations(handler.bo, actionPrewrite{true, handler.action.isInternal, handler.action.hasRpcRetries}, handler.batch.mutations)
+    return false, err
 }
 
-func (action actionPrewrite) extractKeyErrs(c *twoPhaseCommitter, keyErrs []*kvrpcpb.KeyError) ([]*txnlock.Lock, error) {
+// extractKeyErrs extracts locks from key errors.
+func (handler *prewrite1BatchReqHandler) extractKeyErrs(keyErrs []*kvrpcpb.KeyError) ([]*txnlock.Lock, error) {
     var locks []*txnlock.Lock
     logged := make(map[uint64]struct{})
     for _, keyErr := range keyErrs {
         // Check already exists error
         if alreadyExist := keyErr.GetAlreadyExist(); alreadyExist != nil {
             e := &tikverr.ErrKeyExist{AlreadyExist: alreadyExist}
-            return nil, c.extractKeyExistsErr(e)
+            return nil, handler.committer.extractKeyExistsErr(e)
         }
 
         // Extract lock from key error
@@ -447,8 +457,8 @@ func (action actionPrewrite) extractKeyErrs(c *twoPhaseCommitter, keyErrs []*kvr
             logutil.BgLogger().Info(
                 "prewrite encounters lock. "+
                     "More locks belonging to the same transaction may be omitted",
-                zap.Uint64("session", c.sessionID),
-                zap.Uint64("txnID", c.startTS),
+                zap.Uint64("session", handler.committer.sessionID),
+                zap.Uint64("txnID", handler.committer.startTS),
                 zap.Stringer("lock", lock),
             )
             logged[lock.TxnID] = struct{}{}
@@ -458,9 +468,9 @@ func (action actionPrewrite) extractKeyErrs(c *twoPhaseCommitter, keyErrs []*kvr
         // Pessimistic transactions don't need such an optimization. If this key needs a pessimistic lock,
         // TiKV will return a PessimisticLockNotFound error directly if it encounters a different lock. Otherwise,
         // TiKV returns lock.TTL = 0, and we still need to resolve the lock.
-        if lock.TxnID > c.startTS && !c.isPessimistic {
+        if lock.TxnID > handler.committer.startTS && !handler.committer.isPessimistic {
             return nil, tikverr.NewErrWriteConflictWithArgs(
-                c.startTS,
+                handler.committer.startTS,
                 lock.TxnID,
                 0,
                 lock.Key,
@@ -472,20 +482,28 @@ func (action actionPrewrite) extractKeyErrs(c *twoPhaseCommitter, keyErrs []*kvr
     return locks, nil
 }
 
-func (action actionPrewrite) resolveLocks(c *twoPhaseCommitter, bo *retry.Backoffer, locks []*txnlock.Lock) error {
+// resolveLocks records the locks encountered by this prewrite request as resolving,
+// then tries to resolve them, backing off while they are not expired yet.
+func (handler *prewrite1BatchReqHandler) resolveLocks(locks []*txnlock.Lock) error {
+    if handler.resolvingRecordToken == nil {
+        token := handler.committer.store.GetLockResolver().RecordResolvingLocks(locks, handler.committer.startTS)
+        handler.resolvingRecordToken = &token
+    } else {
+        handler.committer.store.GetLockResolver().UpdateResolvingLocks(locks, handler.committer.startTS, *handler.resolvingRecordToken)
+    }
+
     resolveLockOpts := txnlock.ResolveLocksOptions{
-        CallerStartTS: c.startTS,
+        CallerStartTS:            handler.committer.startTS,
         Locks:                    locks,
-        Detail:                   &c.getDetail().ResolveLock,
+        Detail:                   &handler.committer.getDetail().ResolveLock,
         PessimisticRegionResolve: true,
     }
-    resolveLockRes, err := c.store.GetLockResolver().ResolveLocksWithOpts(bo, resolveLockOpts)
+    resolveLockRes, err := handler.committer.store.GetLockResolver().ResolveLocksWithOpts(handler.bo, resolveLockOpts)
     if err != nil {
         return err
     }
     msBeforeExpired := resolveLockRes.TTL
     if msBeforeExpired > 0 {
-        err = bo.BackoffWithCfgAndMaxSleep(
+        err = handler.bo.BackoffWithCfgAndMaxSleep(
             retry.BoTxnLock,
             int(msBeforeExpired),
             errors.Errorf("2PC prewrite lockedKeys: %d", len(locks)),
@@ -497,50 +515,79 @@ func (action actionPrewrite) resolveLocks(c *twoPhaseCommitter, bo *retry.Backof
     return nil
 }
 
-// handleSingleBatchFailpoint is used to inject errors for test.
-func (action actionPrewrite) handleSingleBatchFailpoint(c *twoPhaseCommitter, bo *retry.Backoffer, batch batchMutations) error {
-    if c.sessionID <= 0 {
-        return nil
+// handleSingleBatchSucceed handles the response when the prewrite request is successful.
+func (handler *prewrite1BatchReqHandler) handleSingleBatchSucceed(prewriteResp *kvrpcpb.PrewriteResponse) error {
+    // Clear the RPC Error since the request is evaluated successfully.
+    handler.sender.SetRPCError(nil)
+
+    // Update CommitDetails
+    reqDuration := time.Since(handler.begin)
+    handler.committer.getDetail().MergePrewriteReqDetails(
+        reqDuration,
+        handler.batch.region.GetID(),
+        handler.sender.GetStoreAddr(),
+        prewriteResp.ExecDetailsV2,
+    )
+
+    if handler.batch.isPrimary {
+        // After writing the primary key, if the size of the transaction is larger than 32M,
+        // start the ttlManager. The ttlManager will be closed in tikvTxn.Commit().
+        // In this case 1PC is not expected to be used, but still check it for safety.
+        if int64(handler.committer.txnSize) > config.GetGlobalConfig().TiKVClient.TTLRefreshedTxnSize &&
+            prewriteResp.OnePcCommitTs == 0 {
+            handler.committer.run(handler.committer, nil, false)
+        }
     }
-    if batch.isPrimary {
-        if _, err := util.EvalFailpoint("prewritePrimaryFail"); err == nil {
-            // Delay to avoid cancelling other normally ongoing prewrite requests.
-            time.Sleep(time.Millisecond * 50)
-            logutil.Logger(bo.GetCtx()).Info(
-                "[failpoint] injected error on prewriting primary batch",
-                zap.Uint64("txnStartTS", c.startTS),
+    if handler.committer.isOnePC() {
+        if prewriteResp.OnePcCommitTs == 0 {
+            if prewriteResp.MinCommitTs != 0 {
+                return errors.New("MinCommitTs must be 0 when 1pc falls back to 2pc")
+            }
+            logutil.Logger(handler.bo.GetCtx()).Warn(
+                "1pc failed and fallbacks to normal commit procedure",
+                zap.Uint64("startTS", handler.committer.startTS),
             )
-            return errors.New("injected error on prewriting primary batch")
+            metrics.OnePCTxnCounterFallback.Inc()
+            handler.committer.setOnePC(false)
+            handler.committer.setAsyncCommit(false)
+        } else {
+            // For 1PC, there's no racing to access `onePCCommitTS` so it's safe
+            // not to lock the mutex.
+            if handler.committer.onePCCommitTS != 0 {
+                logutil.Logger(handler.bo.GetCtx()).Fatal(
+                    "one pc happened multiple times",
+                    zap.Uint64("startTS", handler.committer.startTS),
+                )
+            }
+            handler.committer.onePCCommitTS = prewriteResp.OnePcCommitTs
         }
-        util.EvalFailpoint("prewritePrimary") // for other failures like sleep or pause
         return nil
-    }
-
-    if _, err := util.EvalFailpoint("prewriteSecondaryFail"); err == nil {
-        // Delay to avoid cancelling other normally ongoing prewrite requests.
-        time.Sleep(time.Millisecond * 50)
-        logutil.Logger(bo.GetCtx()).Info(
-            "[failpoint] injected error on prewriting secondary batch",
-            zap.Uint64("txnStartTS", c.startTS),
+    } else if prewriteResp.OnePcCommitTs != 0 {
+        logutil.Logger(handler.bo.GetCtx()).Fatal(
+            "tikv committed a non-1pc transaction with 1pc protocol",
+            zap.Uint64("startTS", handler.committer.startTS),
        )
     }
-    util.EvalFailpoint("prewriteSecondary") // for other failures like sleep or pause
-    // concurrent failpoint sleep doesn't work as expected. So we need a separate fail point.
-    // `1*sleep()` can block multiple concurrent threads that meet the failpoint.
-    if val, err := util.EvalFailpoint("prewriteSecondarySleep"); err == nil {
-        time.Sleep(time.Millisecond * time.Duration(val.(int)))
+    if handler.committer.isAsyncCommit() {
+        // MinCommitTs is 0 if the min_commit_ts is not ready, or async commit cannot proceed for
+        // any other reason. The client can then fall back to the normal path to continue
+        // committing the transaction once all the prewrite requests are finished.
+        if prewriteResp.MinCommitTs == 0 {
+            if handler.committer.testingKnobs.noFallBack {
+                return nil
+            }
+            logutil.Logger(handler.bo.GetCtx()).Warn(
+                "async commit cannot proceed since the returned minCommitTS is zero, "+
+                    "fallback to normal path", zap.Uint64("startTS", handler.committer.startTS),
+            )
+            handler.committer.setAsyncCommit(false)
+        } else {
+            handler.committer.mu.Lock()
+            if prewriteResp.MinCommitTs > handler.committer.minCommitTSMgr.get() {
+                handler.committer.minCommitTSMgr.tryUpdate(prewriteResp.MinCommitTs, twoPCAccess)
+            }
+            handler.committer.mu.Unlock()
+        }
     }
     return nil
 }
-
-func (c *twoPhaseCommitter) prewriteMutations(bo *retry.Backoffer, mutations CommitterMutations) error {
-    if span := opentracing.SpanFromContext(bo.GetCtx()); span != nil && span.Tracer() != nil {
-        span1 := span.Tracer().StartSpan("twoPhaseCommitter.prewriteMutations", opentracing.ChildOf(span.Context()))
-        defer span1.Finish()
-        bo.SetCtx(opentracing.ContextWithSpan(bo.GetCtx(), span1))
-    }
-
-    // `doActionOnMutations` will unset `useOnePC` if the mutations is splitted into multiple batches.
-    return c.doActionOnMutations(bo, actionPrewrite{isInternal: c.txn.isInternal()}, mutations)
-}