Skip to content

Commit

Permalink
check commit ts in oracle
Browse files Browse the repository at this point in the history
Signed-off-by: you06 <[email protected]>
  • Loading branch information
you06 committed Dec 25, 2024
1 parent 8365dcf commit 000fc5f
Show file tree
Hide file tree
Showing 6 changed files with 65 additions and 39 deletions.
4 changes: 2 additions & 2 deletions integration_tests/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ import (
"github.com/tikv/client-go/v2/tikv"
"github.com/tikv/client-go/v2/tikvrpc"
"github.com/tikv/client-go/v2/txnkv"
"github.com/tikv/client-go/v2/txnkv/transaction"
"github.com/tikv/client-go/v2/util"
)

func TestStore(t *testing.T) {
Expand Down Expand Up @@ -196,7 +196,7 @@ func (s *testStoreSuite) TestFailBusyServerKV() {
}

func testUpdateLatestCommitInfo(require *require.Assertions, store tikv.StoreProbe, mode string) {
doTxn := func() *transaction.CommitInfo {
doTxn := func() *util.CommitInfo {
txn, err := store.Begin()
require.Nil(err)
switch mode {
Expand Down
24 changes: 21 additions & 3 deletions oracle/oracles/pd.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ import (
"github.com/tikv/client-go/v2/internal/logutil"
"github.com/tikv/client-go/v2/metrics"
"github.com/tikv/client-go/v2/oracle"
"github.com/tikv/client-go/v2/util"
pd "github.com/tikv/pd/client"
"github.com/tikv/pd/client/clients/tso"
"go.uber.org/zap"
Expand Down Expand Up @@ -153,6 +154,9 @@ type pdOracle struct {
// we don't require the ts for validation to be strictly the latest one.
// Note that the result can't be reused for different txnScopes. The txnScope is used as the key.
tsForValidation singleflight.Group

// lastCommitTxnInfo stores the last commit info of the store, the validation of the commit info is checked by every arriving timestamp from PD.
lastCommitTxnInfo *atomic.Pointer[util.CommitInfo]
}

// lastTSO stores the last timestamp oracle gets from PD server and the local time when the TSO is fetched.
Expand All @@ -166,6 +170,8 @@ type PDOracleOptions struct {
UpdateInterval time.Duration
// Disable the background periodic update of the last ts. This is for test purposes only.
NoUpdateTS bool
// LastCommitTxnInfo is the last commit info of the store.
LastCommitTxnInfo *atomic.Pointer[util.CommitInfo]
}

// NewPdOracle create an Oracle that uses a pd client source.
Expand All @@ -182,6 +188,7 @@ func NewPdOracle(pdClient pd.Client, options *PDOracleOptions) (oracle.Oracle, e
c: pdClient,
quit: make(chan struct{}),
lastTSUpdateInterval: atomic.Int64{},
lastCommitTxnInfo: options.LastCommitTxnInfo,
}
o.adaptiveUpdateIntervalState.shrinkIntervalCh = make(chan time.Duration, 1)
o.lastTSUpdateInterval.Store(int64(options.UpdateInterval))
Expand Down Expand Up @@ -228,8 +235,9 @@ func (o *pdOracle) GetAllTSOKeyspaceGroupMinTS(ctx context.Context) (uint64, err

type tsFuture struct {
tso.TSFuture
o *pdOracle
txnScope string
o *pdOracle
txnScope string
toBeVerifyCommitInfo *util.CommitInfo
}

// Wait implements the oracle.Future interface.
Expand All @@ -241,12 +249,22 @@ func (f *tsFuture) Wait() (uint64, error) {
return 0, errors.WithStack(err)
}
ts := oracle.ComposeTS(physical, logical)
if f.toBeVerifyCommitInfo != nil {
if ts < f.toBeVerifyCommitInfo.CommitTS || ts <= f.toBeVerifyCommitInfo.StartTS {
msg := fmt.Sprintf("transaction with invalid ts found, ts: %d, txnInfo: %s", ts, f.toBeVerifyCommitInfo.String())
panic(msg)
}
}
f.o.setLastTS(ts, f.txnScope)
return ts, nil
}

func (o *pdOracle) GetTimestampAsync(ctx context.Context, opt *oracle.Option) oracle.Future {
return &tsFuture{o.c.GetTSAsync(ctx), o, opt.TxnScope}
var commitInfo *util.CommitInfo
if o.lastCommitTxnInfo != nil {
commitInfo = o.lastCommitTxnInfo.Load()
}
return &tsFuture{o.c.GetTSAsync(ctx), o, opt.TxnScope, commitInfo}
}

func (o *pdOracle) getTimestamp(ctx context.Context, txnScope string) (uint64, error) {
Expand Down
44 changes: 23 additions & 21 deletions tikv/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ type KVStore struct {
gP Pool

// lastCommitTxnInfo is used to store the commit info of the latest committed transaction.
lastCommitTxnInfo atomic.Pointer[transaction.CommitInfo]
lastCommitTxnInfo atomic.Pointer[util.CommitInfo]
}

var _ Storage = (*KVStore)(nil)
Expand Down Expand Up @@ -264,8 +264,10 @@ func requestHealthFeedbackFromKVClient(ctx context.Context, addr string, tikvCli

// NewKVStore creates a new TiKV store instance.
func NewKVStore(uuid string, pdClient pd.Client, spkv SafePointKV, tikvclient Client, opt ...Option) (*KVStore, error) {
var store KVStore
o, err := oracles.NewPdOracle(pdClient, &oracles.PDOracleOptions{
UpdateInterval: defaultOracleUpdateInterval,
UpdateInterval: defaultOracleUpdateInterval,
LastCommitTxnInfo: &store.lastCommitTxnInfo,
})
if err != nil {
return nil, err
Expand All @@ -274,31 +276,31 @@ func NewKVStore(uuid string, pdClient pd.Client, spkv SafePointKV, tikvclient Cl
regionCache := locate.NewRegionCache(pdClient, locate.WithRequestHealthFeedbackCallback(func(ctx context.Context, addr string) error {
return requestHealthFeedbackFromKVClient(ctx, addr, tikvclient)
}))
store := &KVStore{
clusterID: pdClient.GetClusterID(context.TODO()),
uuid: uuid,
oracle: o,
pdClient: pdClient,
regionCache: regionCache,
kv: spkv,
safePoint: 0,
spTime: time.Now(),
replicaReadSeed: rand.Uint32(),
ctx: ctx,
cancel: cancel,
gP: NewSpool(128, 10*time.Second),
}

store.clusterID = pdClient.GetClusterID(context.TODO())
store.uuid = uuid
store.oracle = o
store.pdClient = pdClient
store.regionCache = regionCache
store.kv = spkv
store.safePoint = 0
store.spTime = time.Now()
store.replicaReadSeed = rand.Uint32()
store.ctx = ctx
store.cancel = cancel
store.gP = NewSpool(128, 10*time.Second)

store.clientMu.client = client.NewReqCollapse(client.NewInterceptedClient(tikvclient))
store.clientMu.client.SetEventListener(regionCache.GetClientEventListener())

store.lockResolver = txnlock.NewLockResolver(store)
loadOption(store, opt...)
store.lockResolver = txnlock.NewLockResolver(&store)
loadOption(&store, opt...)

store.wg.Add(2)
go store.runSafePointChecker()
go store.safeTSUpdater()

return store, nil
return &store, nil
}

// NewPDClient returns an unwrapped pd client.
Expand Down Expand Up @@ -862,7 +864,7 @@ func (s *KVStore) updateGlobalTxnScopeTSFromPD(ctx context.Context) bool {
}

// SetLastCommitInfo sets the last committed transaction's information.
func (s *KVStore) SetLastCommitInfo(ci *transaction.CommitInfo) {
func (s *KVStore) SetLastCommitInfo(ci *util.CommitInfo) {
for {
old := s.lastCommitTxnInfo.Load()
if old != nil && old.CommitTS > ci.CommitTS {
Expand All @@ -875,7 +877,7 @@ func (s *KVStore) SetLastCommitInfo(ci *transaction.CommitInfo) {
}

// GetLastCommitInfo get the last committed transaction's information.
func (s *KVStore) GetLastCommitInfo() *transaction.CommitInfo {
func (s *KVStore) GetLastCommitInfo() *util.CommitInfo {
return s.lastCommitTxnInfo.Load()
}

Expand Down
16 changes: 4 additions & 12 deletions txnkv/transaction/2pc.go
Original file line number Diff line number Diff line change
Expand Up @@ -2376,7 +2376,7 @@ func (c *twoPhaseCommitter) mutationsOfKeys(keys [][]byte) CommitterMutations {
return &res
}

func (c *twoPhaseCommitter) getCommitInfo() *CommitInfo {
func (c *twoPhaseCommitter) getCommitInfo() *util.CommitInfo {
var txnType string
if c.isAsyncCommit() {
txnType = "async"
Expand All @@ -2385,7 +2385,7 @@ func (c *twoPhaseCommitter) getCommitInfo() *CommitInfo {
} else {
txnType = "2pc"
}
return &CommitInfo{
return &util.CommitInfo{
TxnType: txnType,
StartTS: c.startTS,
CommitTS: atomic.LoadUint64(&c.commitTS),
Expand All @@ -2399,16 +2399,8 @@ func (c *twoPhaseCommitter) updateStoreCommitInfo() {
c.store.SetLastCommitInfo(c.getCommitInfo())
}

type CommitInfo struct {
TxnType string
StartTS uint64
CommitTS uint64
MutationLen int
TxnSize int
}

type storeCommitInfo interface {
SetLastCommitInfo(*CommitInfo)
SetLastCommitInfo(*util.CommitInfo)
// GetLastCommitInfo get the last committed transaction's information.
GetLastCommitInfo() *CommitInfo
GetLastCommitInfo() *util.CommitInfo
}
3 changes: 2 additions & 1 deletion txnkv/transaction/test_probe.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/tikv/client-go/v2/kv"
"github.com/tikv/client-go/v2/tikvrpc"
"github.com/tikv/client-go/v2/txnkv/txnsnapshot"
"github.com/tikv/client-go/v2/util"
)

// TxnProbe wraps a txn and exports internal states for testing purpose.
Expand Down Expand Up @@ -380,7 +381,7 @@ func (c CommitterProbe) ResolveFlushedLocks(bo *retry.Backoffer, start, end []by
}

// GetCommitInfo expose CommitInfo of committer for testing purpose.
func (c CommitterProbe) GetCommitInfo() *CommitInfo {
func (c CommitterProbe) GetCommitInfo() *util.CommitInfo {
return c.getCommitInfo()
}

Expand Down
13 changes: 13 additions & 0 deletions util/misc.go
Original file line number Diff line number Diff line change
Expand Up @@ -226,3 +226,16 @@ func None[T interface{}]() Option[T] {
func (o Option[T]) Inner() *T {
return o.inner
}

type CommitInfo struct {
TxnType string
StartTS uint64
CommitTS uint64
MutationLen int
TxnSize int
}

func (c *CommitInfo) String() string {
return fmt.Sprintf("TxnType: %s, StartTS: %d, CommitTS: %d, MutationLen: %d, TxnSize: %d",
c.TxnType, c.StartTS, c.CommitTS, c.MutationLen, c.TxnSize)
}

0 comments on commit 000fc5f

Please sign in to comment.