From d71002b71dc9995127eb3a45c00f815883f60386 Mon Sep 17 00:00:00 2001 From: you06 Date: Fri, 20 Dec 2024 14:47:30 +0900 Subject: [PATCH 1/2] setLastTS always push tso Signed-off-by: you06 --- oracle/oracles/pd.go | 11 ++++++++++- oracle/oracles/pd_test.go | 35 +++++++++++++++++++++++++++++++++++ 2 files changed, 45 insertions(+), 1 deletion(-) diff --git a/oracle/oracles/pd.go b/oracle/oracles/pd.go index b5b0c5f65c..bd55ae6cf9 100644 --- a/oracle/oracles/pd.go +++ b/oracle/oracles/pd.go @@ -309,9 +309,12 @@ func (o *pdOracle) setLastTS(ts uint64, txnScope string) { lastTSPointer := lastTSInterface.(*atomic.Pointer[lastTSO]) for { last := lastTSPointer.Load() - if current.tso <= last.tso || !current.arrival.After(last.arrival) { + if current.tso <= last.tso { return } + if last.arrival.After(current.arrival) { + current.arrival = last.arrival + } if lastTSPointer.CompareAndSwap(last, current) { return } @@ -691,6 +694,12 @@ func (o *pdOracle) ValidateReadTS(ctx context.Context, readTS uint64, isStaleRea o.adjustUpdateLowResolutionTSIntervalWithRequestedStaleness(readTS, currentTS, time.Now()) } if readTS > currentTS { + logutil.Logger(ctx).Warn("DBG", + zap.Uint64("readTS", readTS), + zap.Uint64("currentTS", currentTS), + zap.Uint64("latestTS", latestTSInfo.tso), + zap.Time("latest arrive", latestTSInfo.arrival), + ) return oracle.ErrFutureTSRead{ ReadTS: readTS, CurrentTS: currentTS, diff --git a/oracle/oracles/pd_test.go b/oracle/oracles/pd_test.go index 5110cc266c..01c67c5e46 100644 --- a/oracle/oracles/pd_test.go +++ b/oracle/oracles/pd_test.go @@ -551,3 +551,38 @@ func TestValidateReadTSForNormalReadDoNotAffectUpdateInterval(t *testing.T) { assert.NoError(t, err) mustNoNotify() } + +func TestSetLastTSAlwaysPushTS(t *testing.T) { + oracleInterface, err := NewPdOracle(&MockPdClient{}, &PDOracleOptions{ + UpdateInterval: time.Second * 2, + NoUpdateTS: true, + }) + assert.NoError(t, err) + o := oracleInterface.(*pdOracle) + defer o.Close() + + var wg sync.WaitGroup + cancel := make(chan struct{}) + for i := 0; i < 100; i++ { + wg.Add(1) + go func() { + defer wg.Done() + ctx := context.Background() + for { + select { + case <-cancel: + return + default: + } + ts, err := o.GetTimestamp(ctx, &oracle.Option{TxnScope: oracle.GlobalTxnScope}) + assert.NoError(t, err) + lastTS, found := o.getLastTS(oracle.GlobalTxnScope) + assert.True(t, found) + assert.GreaterOrEqual(t, lastTS, ts) + } + }() + } + time.Sleep(time.Second) + close(cancel) + wg.Wait() +} From 89692bff0cfa3b58e3aee1065a4abad04f2a9d46 Mon Sep 17 00:00:00 2001 From: you06 Date: Fri, 20 Dec 2024 14:50:13 +0900 Subject: [PATCH 2/2] remove debug log Signed-off-by: you06 --- oracle/oracles/pd.go | 6 ------ 1 file changed, 6 deletions(-) diff --git a/oracle/oracles/pd.go b/oracle/oracles/pd.go index bd55ae6cf9..75e297169a 100644 --- a/oracle/oracles/pd.go +++ b/oracle/oracles/pd.go @@ -694,12 +694,6 @@ func (o *pdOracle) ValidateReadTS(ctx context.Context, readTS uint64, isStaleRea o.adjustUpdateLowResolutionTSIntervalWithRequestedStaleness(readTS, currentTS, time.Now()) } if readTS > currentTS { - logutil.Logger(ctx).Warn("DBG", - zap.Uint64("readTS", readTS), - zap.Uint64("currentTS", currentTS), - zap.Uint64("latestTS", latestTSInfo.tso), - zap.Time("latest arrive", latestTSInfo.arrival), - ) return oracle.ErrFutureTSRead{ ReadTS: readTS, CurrentTS: currentTS,