diff --git a/integration_tests/2pc_test.go b/integration_tests/2pc_test.go index fec8cfb6e5..fdad3b2969 100644 --- a/integration_tests/2pc_test.go +++ b/integration_tests/2pc_test.go @@ -1520,6 +1520,200 @@ func (s *testCommitterSuite) TestAggressiveLockingExitIfInapplicable() { s.NoError(txn.Rollback()) } +func (s *testCommitterSuite) TestAggressiveLockingResetTTLManager() { + // Not blocked + txn := s.begin() + txn.SetPessimistic(true) + txn.StartAggressiveLocking() + s.True(txn.IsInAggressiveLockingMode()) + s.True(txn.GetCommitter().IsNil()) + + lockCtx := &kv.LockCtx{ForUpdateTS: txn.StartTS(), WaitStartTime: time.Now()} + s.NoError(txn.LockKeys(context.Background(), lockCtx, []byte("k1"))) + s.True(txn.GetCommitter().IsTTLRunning()) + + txn.CancelAggressiveLocking(context.Background()) + s.False(txn.IsInAggressiveLockingMode()) + s.False(txn.GetCommitter().IsTTLRunning()) + + // End the transaction to test the next case. + s.NoError(txn.Rollback()) + + // txn blocked by txn2 + txn = s.begin() + txn.SetPessimistic(true) + txn.StartAggressiveLocking() + s.True(txn.IsInAggressiveLockingMode()) + s.True(txn.GetCommitter().IsNil()) + + txn2 := s.begin() + txn2.SetPessimistic(true) + lockCtx2 := &kv.LockCtx{ForUpdateTS: txn2.StartTS(), WaitStartTime: time.Now()} + s.NoError(txn2.LockKeys(context.Background(), lockCtx2, []byte("k1"))) + + lockResCh := make(chan error) + go func() { + lockCtx = &kv.LockCtx{ForUpdateTS: txn.StartTS(), WaitStartTime: time.Now()} + lockResCh <- txn.LockKeys(context.Background(), lockCtx, []byte("k1")) + }() + // No immediate result as blocked by txn2 + select { + case <-time.After(time.Millisecond * 100): + case err := <-lockResCh: + s.FailNowf("get lock result when expected to be blocked", "error: %+v", err) + } + + s.NoError(txn2.Set([]byte("k1"), []byte("v1"))) + s.NoError(txn2.Commit(context.Background())) + + // txn is resumed + select { + case <-time.After(time.Second): + s.FailNow("txn not resumed after blocker is committed") + 
case err := <-lockResCh: + s.NoError(err) + } + + s.Equal(txn2.CommitTS(), lockCtx.MaxLockedWithConflictTS) + s.Greater(lockCtx.MaxLockedWithConflictTS, txn.StartTS()) + + s.True(txn.GetCommitter().IsTTLRunning()) + + txn.RetryAggressiveLocking(context.Background()) + s.True(txn.GetCommitter().IsTTLRunning()) + + // Get a new ts as the new forUpdateTS. + forUpdateTS, err := s.store.GetOracle().GetTimestamp(context.Background(), &oracle.Option{TxnScope: oracle.GlobalTxnScope}) + s.NoError(err) + lockCtx = &kv.LockCtx{ForUpdateTS: forUpdateTS, WaitStartTime: time.Now()} + s.NoError(txn.LockKeys(context.Background(), lockCtx, []byte("k1"))) + s.True(txn.GetCommitter().IsTTLRunning()) + + txn.CancelAggressiveLocking(context.Background()) + s.False(txn.GetCommitter().IsTTLRunning()) + s.Zero(txn.GetLockedCount()) + + // End the test. + s.NoError(txn.Rollback()) +} + +type aggressiveLockingExitPhase int + +const ( + exitOnEnterAggressiveLocking aggressiveLockingExitPhase = iota + exitOnFirstLockKeys + exitOnRetry + exitOnSecondLockKeys +) + +func (s *testCommitterSuite) testAggressiveLockingResetPrimaryAndTTLManagerAfterExitImpl(done bool, exitPhase aggressiveLockingExitPhase, retryDifferentKey bool) { + s.T().Logf("testing subcase, done-or-cancel: %v, exitPhase: %v, retryDifferentKey: %v", done, exitPhase, retryDifferentKey) + txn := s.begin() + txn.SetPessimistic(true) + txn.StartAggressiveLocking() + s.True(txn.IsInAggressiveLockingMode()) + s.True(txn.GetCommitter().IsNil()) + defer func() { + s.NoError(txn.Rollback()) + }() + + if exitPhase == exitOnEnterAggressiveLocking { + if done { + txn.DoneAggressiveLocking(context.Background()) + } else { + txn.CancelAggressiveLocking(context.Background()) + } + s.False(txn.IsInAggressiveLockingMode()) + s.Zero(txn.GetLockedCount()) + s.True(txn.GetCommitter().IsNil()) + return + } + + lockCtx := &kv.LockCtx{ForUpdateTS: txn.StartTS(), WaitStartTime: time.Now()} + s.NoError(txn.LockKeys(context.Background(), lockCtx, 
[]byte("k1"))) + s.True(txn.GetCommitter().IsTTLRunning()) + s.Equal(1, txn.GetLockedCount()) + + if exitPhase == exitOnFirstLockKeys { + if done { + txn.DoneAggressiveLocking(context.Background()) + s.True(txn.GetCommitter().IsTTLRunning()) + s.Equal(1, txn.GetLockedCount()) + } else { + txn.CancelAggressiveLocking(context.Background()) + s.False(txn.GetCommitter().IsTTLRunning()) + s.Zero(txn.GetLockedCount()) + } + s.False(txn.IsInAggressiveLockingMode()) + return + } + + txn.RetryAggressiveLocking(context.Background()) + + if exitPhase == exitOnRetry { + if done { + txn.DoneAggressiveLocking(context.Background()) + } else { + txn.CancelAggressiveLocking(context.Background()) + } + s.False(txn.IsInAggressiveLockingMode()) + s.Zero(txn.GetLockedCount()) + s.False(txn.GetCommitter().IsTTLRunning()) + return + } + + forUpdateTS, err := s.store.GetOracle().GetTimestamp(context.Background(), &oracle.Option{TxnScope: oracle.GlobalTxnScope}) + s.NoError(err) + lockCtx = &kv.LockCtx{ForUpdateTS: forUpdateTS, WaitStartTime: time.Now()} + key := []byte("k1") + if retryDifferentKey { + key = []byte("k2") + } + s.NoError(txn.LockKeys(context.Background(), lockCtx, key)) + s.True(txn.GetCommitter().IsTTLRunning()) + s.Equal(key, txn.GetCommitter().GetPrimaryKey()) + expectedLockCount := 1 + if retryDifferentKey { + // When locking k2 during retry, the previously-locked k1 is not immediately released, and it will be released + // when the fair locking state switches (either retry/cancel/done). 
+ expectedLockCount = 2 + } + s.Equal(expectedLockCount, txn.GetLockedCount()) + + if exitPhase == exitOnSecondLockKeys { + if done { + txn.DoneAggressiveLocking(context.Background()) + s.True(txn.GetCommitter().IsTTLRunning()) + s.Equal(1, txn.GetLockedCount()) + } else { + txn.CancelAggressiveLocking(context.Background()) + s.False(txn.GetCommitter().IsTTLRunning()) + s.Zero(txn.GetLockedCount()) + } + s.False(txn.IsInAggressiveLockingMode()) + return + } + + s.FailNow("unreachable") +} + +func (s *testCommitterSuite) TestAggressiveLockingResetPrimaryAndTTLManagerAfterExit() { + // Done or cancel + for _, done := range []bool{false, true} { + // Iterate exiting phase + for _, exitPhase := range []aggressiveLockingExitPhase{ + exitOnEnterAggressiveLocking, + exitOnFirstLockKeys, + exitOnRetry, + exitOnSecondLockKeys, + } { + for _, retryDifferentKey := range []bool{false, true} { + s.testAggressiveLockingResetPrimaryAndTTLManagerAfterExitImpl(done, exitPhase, retryDifferentKey) + } + } + } +} + // TestElapsedTTL tests that elapsed time is correct even if ts physical time is greater than local time. func (s *testCommitterSuite) TestElapsedTTL() { key := []byte("key") diff --git a/txnkv/transaction/test_probe.go b/txnkv/transaction/test_probe.go index 7e0a582e01..d3b049f9cf 100644 --- a/txnkv/transaction/test_probe.go +++ b/txnkv/transaction/test_probe.go @@ -152,6 +152,11 @@ type CommitterProbe struct { *twoPhaseCommitter } +// IsNil returns whether the internal twoPhaseCommitter is nil. +func (c CommitterProbe) IsNil() bool { + return c.twoPhaseCommitter == nil +} + // InitKeysAndMutations prepares the committer for commit. 
func (c CommitterProbe) InitKeysAndMutations() error { return c.initKeysAndMutations(context.Background()) diff --git a/txnkv/transaction/txn.go b/txnkv/transaction/txn.go index aeebceb1c9..daccbe087c 100644 --- a/txnkv/transaction/txn.go +++ b/txnkv/transaction/txn.go @@ -1006,8 +1006,12 @@ func (txn *KVTxn) RetryAggressiveLocking(ctx context.Context) { } txn.cleanupAggressiveLockingRedundantLocks(ctx) if txn.aggressiveLockingContext.assignedPrimaryKey { - txn.resetPrimary() txn.aggressiveLockingContext.assignedPrimaryKey = false + txn.aggressiveLockingContext.lastAssignedPrimaryKey = true + // Do not reset the ttlManager immediately. Instead, we + // will handle the case specially (see KVTxn.resetTTLManagerForAggressiveLockingMode). + // See: https://github.com/pingcap/tidb/issues/58279 + txn.resetPrimary(true) } txn.aggressiveLockingContext.lastPrimaryKey = txn.aggressiveLockingContext.primaryKey @@ -1039,8 +1043,8 @@ func (txn *KVTxn) CancelAggressiveLocking(ctx context.Context) { }() txn.cleanupAggressiveLockingRedundantLocks(context.Background()) - if txn.aggressiveLockingContext.assignedPrimaryKey { - txn.resetPrimary() + if txn.aggressiveLockingContext.assignedPrimaryKey || txn.aggressiveLockingContext.lastAssignedPrimaryKey { + txn.resetPrimary(false) txn.aggressiveLockingContext.assignedPrimaryKey = false } @@ -1075,6 +1079,12 @@ func (txn *KVTxn) DoneAggressiveLocking(ctx context.Context) { txn.aggressiveLockingContext = nil }() + // If no key is finally locked and the ttlManager was started during the current fair locking procedure, reset + // the ttlManager as no key will be the primary. 
+ if txn.aggressiveLockingContext.lastAssignedPrimaryKey && !txn.aggressiveLockingContext.assignedPrimaryKey { + txn.committer.ttlManager.reset() + } + txn.cleanupAggressiveLockingRedundantLocks(context.Background()) if txn.forUpdateTSChecks == nil { @@ -1347,7 +1357,7 @@ func (txn *KVTxn) lockKeys(ctx context.Context, lockCtx *tikv.LockCtx, fn func() } // It can't transform LockOnlyIfExists mode to normal mode. If so, it can add a lock to a key // which doesn't exist in tikv. TiDB should ensure that primary key must be set when it sends - // a LockOnlyIfExists pessmistic lock request. + // a LockOnlyIfExists pessimistic lock request. if (txn.committer == nil || txn.committer.primaryKey == nil) && len(keys) > 1 { return &tikverr.ErrLockOnlyIfExistsNoPrimaryKey{ StartTS: txn.startTS, @@ -1400,6 +1410,8 @@ func (txn *KVTxn) lockKeys(ctx context.Context, lockCtx *tikv.LockCtx, fn func() metrics.AggressiveLockedKeysDerived.Add(float64(filteredAggressiveLockedKeysCount)) metrics.AggressiveLockedKeysNew.Add(float64(len(keys))) + txn.resetTTLManagerForAggressiveLockingMode(len(keys) != 0) + if len(keys) == 0 { if lockCtx.Stats != nil { txn.collectAggressiveLockingStats(lockCtx, 0, 0, filteredAggressiveLockedKeysCount, lockWakeUpMode) @@ -1484,7 +1496,7 @@ func (txn *KVTxn) lockKeys(ctx context.Context, lockCtx *tikv.LockCtx, fn func() } if assignedPrimaryKey { // unset the primary key and stop heartbeat if we assigned primary key when failed to lock it. - txn.resetPrimary() + txn.resetPrimary(false) } return err } @@ -1559,9 +1571,37 @@ func (txn *KVTxn) lockKeys(ctx context.Context, lockCtx *tikv.LockCtx, fn func() return nil } -func (txn *KVTxn) resetPrimary() { +// resetPrimary resets the primary. It's used when the first LockKeys call in a transaction has failed, or needs to +// be rolled back for some reason (e.g. TiDB may perform statement rollback), in which case the primary will be +// unlocked and another key may be chosen as the new primary. If keepTTLManager is true, the ttlManager is not reset. 
+func (txn *KVTxn) resetPrimary(keepTTLManager bool) { txn.committer.primaryKey = nil - txn.committer.ttlManager.reset() + if !keepTTLManager { + txn.committer.ttlManager.reset() + } +} + +// resetTTLManagerForAggressiveLockingMode is used in a fair locking procedure to reset the ttlManager when necessary. +// This function is only used during the LockKeys invocation, and the parameter hasNewLockToAcquire indicates whether +// there are any keys that need to be locked in the current LockKeys call, after filtering out those that have already +// been locked before the most recent RetryAggressiveLocking. +// Also note that this function is not the only path by which fair locking resets the ttlManager. DoneAggressiveLocking +// may also stop the ttlManager if no key is locked after exiting. +func (txn *KVTxn) resetTTLManagerForAggressiveLockingMode(hasNewLockToAcquire bool) { + if !txn.IsInAggressiveLockingMode() { + // Not supposed to be called outside of a fair locking context + return + } + // * When there's no new lock to acquire, assume the primary key is not changed in this case. Keep the ttlManager + // running. + // * When there is a new lock to acquire: + // * If the primary key is not changed, also keep the ttlManager running. Then, when sending the + // acquirePessimisticLock requests, it will call ttlManager.run() again, but it's reentrant and will do nothing + // as the ttlManager is already running. + // * If the primary key is changed, the ttlManager will need to run on the new primary key instead. Reset it. 
+ if hasNewLockToAcquire && !bytes.Equal(txn.aggressiveLockingContext.primaryKey, txn.aggressiveLockingContext.lastPrimaryKey) { + txn.committer.ttlManager.reset() + } } func (txn *KVTxn) selectPrimaryForPessimisticLock(sortedKeys [][]byte) { @@ -1595,6 +1635,7 @@ func (txn *KVTxn) selectPrimaryForPessimisticLock(sortedKeys [][]byte) { type aggressiveLockingContext struct { lastRetryUnnecessaryLocks map[string]tempLockBufferEntry lastPrimaryKey []byte + lastAssignedPrimaryKey bool lastAttemptStartTime time.Time currentLockedKeys map[string]tempLockBufferEntry