Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix the problem that ttlManager may stop working if no need to lock after retry aggressive locking #1522

Merged
merged 4 commits into from
Dec 17, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
194 changes: 194 additions & 0 deletions integration_tests/2pc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1520,6 +1520,200 @@ func (s *testCommitterSuite) TestAggressiveLockingExitIfInapplicable() {
s.NoError(txn.Rollback())
}

// TestAggressiveLockingResetTTLManager checks whether the committer's
// ttlManager is running at each step of the aggressive (fair) locking
// lifecycle, in two scenarios:
//
//  1. A key is locked without being blocked, then aggressive locking is
//     cancelled.
//  2. The lock request is blocked by another transaction, is resumed once the
//     blocker commits, and then goes through retry -> lock again -> cancel.
func (s *testCommitterSuite) TestAggressiveLockingResetTTLManager() {
	// Not blocked
	txn := s.begin()
	txn.SetPessimistic(true)
	txn.StartAggressiveLocking()
	s.True(txn.IsInAggressiveLockingMode())
	s.True(txn.GetCommitter().IsNil())

	lockCtx := &kv.LockCtx{ForUpdateTS: txn.StartTS(), WaitStartTime: time.Now()}
	s.NoError(txn.LockKeys(context.Background(), lockCtx, []byte("k1")))
	s.True(txn.GetCommitter().IsTTLRunning())

	txn.CancelAggressiveLocking(context.Background())
	s.False(txn.IsInAggressiveLockingMode())
	s.False(txn.GetCommitter().IsTTLRunning())

	// End the transaction to test the next case.
	s.NoError(txn.Rollback())

	// txn blocked by txn2
	txn = s.begin()
	txn.SetPessimistic(true)
	txn.StartAggressiveLocking()
	s.True(txn.IsInAggressiveLockingMode())
	s.True(txn.GetCommitter().IsNil())

	txn2 := s.begin()
	txn2.SetPessimistic(true)
	lockCtx2 := &kv.LockCtx{ForUpdateTS: txn2.StartTS(), WaitStartTime: time.Now()}
	s.NoError(txn2.LockKeys(context.Background(), lockCtx2, []byte("k1")))

	// Buffered with capacity 1 so that the locking goroutine can always
	// deliver its result and exit, even when the test stops waiting (the
	// timeout branch below) and nothing receives from the channel anymore.
	lockResCh := make(chan error, 1)
	go func() {
		// lockCtx is read by the test only after receiving from lockResCh,
		// so this write is ordered before those reads.
		lockCtx = &kv.LockCtx{ForUpdateTS: txn.StartTS(), WaitStartTime: time.Now()}
		lockResCh <- txn.LockKeys(context.Background(), lockCtx, []byte("k1"))
	}()
	// No immediate result as blocked by txn2
	select {
	case <-time.After(time.Millisecond * 100):
	case err := <-lockResCh:
		s.FailNowf("get lock result when expected to be blocked", "error: %+v", err)
	}

	s.NoError(txn2.Set([]byte("k1"), []byte("v1")))
	s.NoError(txn2.Commit(context.Background()))

	// txn is resumed
	select {
	case <-time.After(time.Second):
		s.FailNow("txn not resumed after blocker is committed")
	case err := <-lockResCh:
		s.NoError(err)
	}

	// The lock was acquired with a conflict: the conflicting commit is txn2's,
	// which is newer than txn's start ts.
	s.Equal(txn2.CommitTS(), lockCtx.MaxLockedWithConflictTS)
	s.Greater(lockCtx.MaxLockedWithConflictTS, txn.StartTS())

	s.True(txn.GetCommitter().IsTTLRunning())

	// The ttlManager is expected to stay running across the retry itself.
	txn.RetryAggressiveLocking(context.Background())
	s.True(txn.GetCommitter().IsTTLRunning())

	// Get a new ts as the new forUpdateTS.
	forUpdateTS, err := s.store.GetOracle().GetTimestamp(context.Background(), &oracle.Option{TxnScope: oracle.GlobalTxnScope})
	s.NoError(err)
	lockCtx = &kv.LockCtx{ForUpdateTS: forUpdateTS, WaitStartTime: time.Now()}
	s.NoError(txn.LockKeys(context.Background(), lockCtx, []byte("k1")))
	s.True(txn.GetCommitter().IsTTLRunning())

	txn.CancelAggressiveLocking(context.Background())
	s.False(txn.GetCommitter().IsTTLRunning())
	s.Zero(txn.GetLockedCount())

	// End the test.
	s.NoError(txn.Rollback())
}

// aggressiveLockingExitPhase identifies the point in the aggressive locking procedure at which
// testAggressiveLockingResetPrimaryAndTTLManagerAfterExitImpl exits aggressive locking mode
// (via either DoneAggressiveLocking or CancelAggressiveLocking).
type aggressiveLockingExitPhase int

const (
// exitOnEnterAggressiveLocking exits right after StartAggressiveLocking, before any LockKeys call.
exitOnEnterAggressiveLocking aggressiveLockingExitPhase = iota
// exitOnFirstLockKeys exits after the first LockKeys call.
exitOnFirstLockKeys
// exitOnRetry exits right after RetryAggressiveLocking, before locking again.
exitOnRetry
// exitOnSecondLockKeys exits after the LockKeys call that follows RetryAggressiveLocking.
exitOnSecondLockKeys
)

// testAggressiveLockingResetPrimaryAndTTLManagerAfterExitImpl runs one subcase
// of TestAggressiveLockingResetPrimaryAndTTLManagerAfterExit.
//
// The transaction walks through the aggressive locking procedure
// (start -> lock k1 -> retry -> lock again) and leaves aggressive locking mode
// at the step selected by exitPhase. done selects DoneAggressiveLocking (true)
// or CancelAggressiveLocking (false) as the way to leave. retryDifferentKey
// makes the post-retry LockKeys call lock "k2" instead of "k1". After leaving,
// the locked-key count and the ttlManager/committer state are asserted. The
// transaction is always rolled back when the function returns.
func (s *testCommitterSuite) testAggressiveLockingResetPrimaryAndTTLManagerAfterExitImpl(done bool, exitPhase aggressiveLockingExitPhase, retryDifferentKey bool) {
	s.T().Logf("testing subcase, done-or-cancel: %v, exitPhase: %v, retryDifferentKey: %v", done, exitPhase, retryDifferentKey)

	ctx := context.Background()

	txn := s.begin()
	txn.SetPessimistic(true)
	txn.StartAggressiveLocking()
	s.True(txn.IsInAggressiveLockingMode())
	s.True(txn.GetCommitter().IsNil())
	defer func() {
		s.NoError(txn.Rollback())
	}()

	// leave exits aggressive locking mode in the way selected by done.
	leave := func() {
		if done {
			txn.DoneAggressiveLocking(ctx)
			return
		}
		txn.CancelAggressiveLocking(ctx)
	}

	// leaveAfterLocking exits right after a successful LockKeys call and
	// checks the outcome: done keeps one lock and a running ttlManager, while
	// cancel releases everything and stops the ttlManager.
	leaveAfterLocking := func() {
		leave()
		if done {
			s.True(txn.GetCommitter().IsTTLRunning())
			s.Equal(1, txn.GetLockedCount())
		} else {
			s.False(txn.GetCommitter().IsTTLRunning())
			s.Zero(txn.GetLockedCount())
		}
		s.False(txn.IsInAggressiveLockingMode())
	}

	// Phase 1: exit immediately, nothing has been locked and no committer exists.
	if exitPhase == exitOnEnterAggressiveLocking {
		leave()
		s.False(txn.IsInAggressiveLockingMode())
		s.Zero(txn.GetLockedCount())
		s.True(txn.GetCommitter().IsNil())
		return
	}

	firstLockCtx := &kv.LockCtx{ForUpdateTS: txn.StartTS(), WaitStartTime: time.Now()}
	s.NoError(txn.LockKeys(ctx, firstLockCtx, []byte("k1")))
	s.True(txn.GetCommitter().IsTTLRunning())
	s.Equal(1, txn.GetLockedCount())

	// Phase 2: exit after the first LockKeys.
	if exitPhase == exitOnFirstLockKeys {
		leaveAfterLocking()
		return
	}

	txn.RetryAggressiveLocking(ctx)

	// Phase 3: exit right after the retry, without locking anything again.
	if exitPhase == exitOnRetry {
		leave()
		s.False(txn.IsInAggressiveLockingMode())
		s.Zero(txn.GetLockedCount())
		s.False(txn.GetCommitter().IsTTLRunning())
		return
	}

	retryTS, err := s.store.GetOracle().GetTimestamp(ctx, &oracle.Option{TxnScope: oracle.GlobalTxnScope})
	s.NoError(err)
	retryLockCtx := &kv.LockCtx{ForUpdateTS: retryTS, WaitStartTime: time.Now()}

	retryKey, wantLocked := []byte("k1"), 1
	if retryDifferentKey {
		// Locking k2 during the retry does not immediately release the
		// previously locked k1; k1 is released when the fair locking state
		// switches (retry/cancel/done). Hence two locks are counted here.
		retryKey, wantLocked = []byte("k2"), 2
	}

	s.NoError(txn.LockKeys(ctx, retryLockCtx, retryKey))
	s.True(txn.GetCommitter().IsTTLRunning())
	s.Equal(retryKey, txn.GetCommitter().GetPrimaryKey())
	s.Equal(wantLocked, txn.GetLockedCount())

	// Phase 4: exit after the second LockKeys.
	if exitPhase == exitOnSecondLockKeys {
		leaveAfterLocking()
		return
	}

	s.FailNow("unreachable")
}

// TestAggressiveLockingResetPrimaryAndTTLManagerAfterExit runs
// testAggressiveLockingResetPrimaryAndTTLManagerAfterExitImpl for every
// combination of exit method (cancel/done), exit phase, and whether the
// post-retry lock targets a different key.
func (s *testCommitterSuite) TestAggressiveLockingResetPrimaryAndTTLManagerAfterExit() {
	// Both boolean dimensions are iterated as false first, then true.
	boolCases := []bool{false, true}
	exitPhases := []aggressiveLockingExitPhase{
		exitOnEnterAggressiveLocking,
		exitOnFirstLockKeys,
		exitOnRetry,
		exitOnSecondLockKeys,
	}

	for _, done := range boolCases {
		for _, exitPhase := range exitPhases {
			for _, retryDifferentKey := range boolCases {
				s.testAggressiveLockingResetPrimaryAndTTLManagerAfterExitImpl(done, exitPhase, retryDifferentKey)
			}
		}
	}
}

// TestElapsedTTL tests that elapsed time is correct even if ts physical time is greater than local time.
func (s *testCommitterSuite) TestElapsedTTL() {
key := []byte("key")
Expand Down
5 changes: 5 additions & 0 deletions txnkv/transaction/test_probe.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,11 @@ type CommitterProbe struct {
*twoPhaseCommitter
}

// IsNil returns whether the internal twoPhaseCommitter is nil.
func (c CommitterProbe) IsNil() bool {
return c.twoPhaseCommitter == nil
}

// InitKeysAndMutations prepares the committer for commit.
func (c CommitterProbe) InitKeysAndMutations() error {
return c.initKeysAndMutations(context.Background())
Expand Down
55 changes: 48 additions & 7 deletions txnkv/transaction/txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -1006,8 +1006,12 @@ func (txn *KVTxn) RetryAggressiveLocking(ctx context.Context) {
}
txn.cleanupAggressiveLockingRedundantLocks(ctx)
if txn.aggressiveLockingContext.assignedPrimaryKey {
txn.resetPrimary()
txn.aggressiveLockingContext.assignedPrimaryKey = false
txn.aggressiveLockingContext.lastAssignedPrimaryKey = true
// Do not reset the ttlManager immediately. Instead, we
// will handle the case specially (see KVTxn.resetTTLManagerForAggressiveLockingMode).
// See: https://github.com/pingcap/tidb/issues/58279
txn.resetPrimary(true)
}

txn.aggressiveLockingContext.lastPrimaryKey = txn.aggressiveLockingContext.primaryKey
Expand Down Expand Up @@ -1039,8 +1043,8 @@ func (txn *KVTxn) CancelAggressiveLocking(ctx context.Context) {
}()

txn.cleanupAggressiveLockingRedundantLocks(context.Background())
if txn.aggressiveLockingContext.assignedPrimaryKey {
txn.resetPrimary()
if txn.aggressiveLockingContext.assignedPrimaryKey || txn.aggressiveLockingContext.lastAssignedPrimaryKey {
txn.resetPrimary(false)
txn.aggressiveLockingContext.assignedPrimaryKey = false
}

Expand Down Expand Up @@ -1075,6 +1079,12 @@ func (txn *KVTxn) DoneAggressiveLocking(ctx context.Context) {
txn.aggressiveLockingContext = nil
}()

// If no key ends up locked and the ttlManager was only started during the current fair locking procedure, reset
// the ttlManager, as no key will be the primary.
if txn.aggressiveLockingContext.lastAssignedPrimaryKey && !txn.aggressiveLockingContext.assignedPrimaryKey {
txn.committer.ttlManager.reset()
cfzjywxk marked this conversation as resolved.
Show resolved Hide resolved
}

txn.cleanupAggressiveLockingRedundantLocks(context.Background())

if txn.forUpdateTSChecks == nil {
Expand Down Expand Up @@ -1347,7 +1357,7 @@ func (txn *KVTxn) lockKeys(ctx context.Context, lockCtx *tikv.LockCtx, fn func()
}
// It can't transform LockOnlyIfExists mode to normal mode. If so, it can add a lock to a key
// which doesn't exist in tikv. TiDB should ensure that primary key must be set when it sends
// a LockOnlyIfExists pessmistic lock request.
// a LockOnlyIfExists pessimistic lock request.
if (txn.committer == nil || txn.committer.primaryKey == nil) && len(keys) > 1 {
return &tikverr.ErrLockOnlyIfExistsNoPrimaryKey{
StartTS: txn.startTS,
Expand Down Expand Up @@ -1400,6 +1410,8 @@ func (txn *KVTxn) lockKeys(ctx context.Context, lockCtx *tikv.LockCtx, fn func()
metrics.AggressiveLockedKeysDerived.Add(float64(filteredAggressiveLockedKeysCount))
metrics.AggressiveLockedKeysNew.Add(float64(len(keys)))

txn.resetTTLManagerForAggressiveLockingMode(len(keys) == 0)

if len(keys) == 0 {
if lockCtx.Stats != nil {
txn.collectAggressiveLockingStats(lockCtx, 0, 0, filteredAggressiveLockedKeysCount, lockWakeUpMode)
Expand Down Expand Up @@ -1484,7 +1496,7 @@ func (txn *KVTxn) lockKeys(ctx context.Context, lockCtx *tikv.LockCtx, fn func()
}
if assignedPrimaryKey {
// unset the primary key and stop heartbeat if we assigned primary key when failed to lock it.
txn.resetPrimary()
txn.resetPrimary(false)
}
return err
}
Expand Down Expand Up @@ -1559,9 +1571,37 @@ func (txn *KVTxn) lockKeys(ctx context.Context, lockCtx *tikv.LockCtx, fn func()
return nil
}

func (txn *KVTxn) resetPrimary() {
// resetPrimary resets the primary. It's used when the first LockKeys call in a transaction fails, or needs to be
// rolled back for some reason (e.g. TiDB may perform statement rollback), in which case the primary will be unlocked
// and another key may be chosen as the new primary.
func (txn *KVTxn) resetPrimary(keepTTLManager bool) {
txn.committer.primaryKey = nil
txn.committer.ttlManager.reset()
if !keepTTLManager {
txn.committer.ttlManager.reset()
}
}

// resetTTLManagerForAggressiveLockingMode is used in a fair locking procedure to reset the ttlManager when necessary.
// This function is only used during the LockKeys invocation, and the parameter noNewLockToAcquire is true when
// no key needs to be locked in the current LockKeys call, after filtering out those that have already been
// locked before the most recent RetryAggressiveLocking.
// Also note that this function is not the only path through which fair locking resets the ttlManager.
// DoneAggressiveLocking may also stop the ttlManager if no key is locked after exiting.
func (txn *KVTxn) resetTTLManagerForAggressiveLockingMode(noNewLockToAcquire bool) {
if !txn.IsInAggressiveLockingMode() {
// Not supposed to be called in a non fair locking context
return
}
// * When there's no new lock to acquire, assume the primary key is not changed in this case. Keep the ttlManager
// running.
// * When there is key to write:
// * If the primary key is not changed, also keep the ttlManager running. Then, when sending the
// acquirePessimisticLock requests, it will call ttlManager.run() again, but it's reentrant and will do nothing
// as the ttlManager is already running.
// * If the primary key is changed, the ttlManager will need to run on the new primary key instead. Reset it.
if !noNewLockToAcquire && !bytes.Equal(txn.aggressiveLockingContext.primaryKey, txn.aggressiveLockingContext.lastPrimaryKey) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe replacing !noNewLockToAcquire with hasNewLockToAcquire is more idiomatic and makes the code more readable

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

emmmm, ok

txn.committer.ttlManager.reset()
}
}

func (txn *KVTxn) selectPrimaryForPessimisticLock(sortedKeys [][]byte) {
Expand Down Expand Up @@ -1595,6 +1635,7 @@ func (txn *KVTxn) selectPrimaryForPessimisticLock(sortedKeys [][]byte) {
type aggressiveLockingContext struct {
lastRetryUnnecessaryLocks map[string]tempLockBufferEntry
lastPrimaryKey []byte
lastAssignedPrimaryKey bool
lastAttemptStartTime time.Time

currentLockedKeys map[string]tempLockBufferEntry
Expand Down
Loading