Skip to content

Commit

Permalink
Merge branch 'master' into store-level-commit-ts-check
Browse files Browse the repository at this point in the history
  • Loading branch information
you06 authored Dec 25, 2024
2 parents 000fc5f + f2266d6 commit e122acd
Show file tree
Hide file tree
Showing 8 changed files with 103 additions and 83 deletions.
3 changes: 2 additions & 1 deletion .github/workflows/compatibility.yml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@ jobs:
- name: Checkout TiDB
uses: actions/checkout@v2
with:
repository: pingcap/tidb
repository: crazycs520/tidb
ref: opt-stats
path: tidb

- name: Check build
Expand Down
1 change: 1 addition & 0 deletions integration_tests/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ require (

replace (
github.com/go-ldap/ldap/v3 => github.com/YangKeao/ldap/v3 v3.4.5-0.20230421065457-369a3bab1117
github.com/pingcap/tidb => github.com/crazycs520/tidb v1.1.0-beta.0.20241225031701-9554991d22f6

github.com/tikv/client-go/v2 => ../
)
2 changes: 2 additions & 0 deletions integration_tests/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -946,6 +946,8 @@ github.com/coreos/go-semver v0.3.1/go.mod h1:irMmmIw/7yzSRPWryHsK7EYSg09caPQL03V
github.com/coreos/go-systemd/v22 v22.5.0 h1:RrqgGjYQKalulkV8NGVIfkXQf6YYmOyiJKk8iXXhfZs=
github.com/coreos/go-systemd/v22 v22.5.0/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc=
github.com/cpuguy83/go-md2man v1.0.10/go.mod h1:SmD6nW6nTyfqj6ABTjUi3V3JVMnlJmwcJI5acqYI6dE=
github.com/crazycs520/tidb v1.1.0-beta.0.20241225031701-9554991d22f6 h1:yz4Q18nLRc/8R4/zFsG6Lw/q/wT6Yn60QuOC6DpWDXc=
github.com/crazycs520/tidb v1.1.0-beta.0.20241225031701-9554991d22f6/go.mod h1:9slOs9kUMdP48FtRb7K/njHPSKcVNiJqWYVsCxtPvQE=
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
github.com/cznic/mathutil v0.0.0-20181122101859-297441e03548 h1:iwZdTE0PVqJCos1vaoKsclOGD3ADKpshg3SRtYBbwso=
github.com/cznic/mathutil v0.0.0-20181122101859-297441e03548/go.mod h1:e6NPNENfs9mPDVNRekM7lKScauxd5kXTr1Mfyig6TDM=
Expand Down
7 changes: 7 additions & 0 deletions integration_tests/snapshot_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -347,6 +347,13 @@ func (s *testSnapshotSuite) TestSnapshotRuntimeStats() {
"scan_detail: {total_process_keys: 20, total_process_keys_size: 20, total_keys: 30, get_snapshot_time: 1µs, " +
"rocksdb: {delete_skipped_count: 10, key_skipped_count: 2, block: {cache_hit_count: 20, read_count: 40, read_byte: 30 Bytes}}}"
s.Equal(expect, snapshot.FormatStats())
snapshot.GetResolveLockDetail().ResolveLockTime = int64(time.Second)
expect = "Get:{num_rpc:4, total_time:2s},txnLockFast_backoff:{num:2, total_time:10ms}, " +
"time_detail: {total_process_time: 200ms, total_wait_time: 200ms}, " +
"resolve_lock_time:1s, " +
"scan_detail: {total_process_keys: 20, total_process_keys_size: 20, total_keys: 30, get_snapshot_time: 1µs, " +
"rocksdb: {delete_skipped_count: 10, key_skipped_count: 2, block: {cache_hit_count: 20, read_count: 40, read_byte: 30 Bytes}}}"
s.Equal(expect, snapshot.FormatStats())
}

func (s *testSnapshotSuite) TestRCRead() {
Expand Down
78 changes: 50 additions & 28 deletions internal/locate/region_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,8 @@ func (s *RegionRequestSender) String() string {

// RegionRequestRuntimeStats records the runtime stats of send region requests.
type RegionRequestRuntimeStats struct {
RPCStats map[tikvrpc.CmdType]*RPCRuntimeStats
// RPCStatsList uses to record RPC requests stats, since in most cases, only one kind of rpc request is sent at a time, use slice instead of map for performance.
RPCStatsList []RPCRuntimeStats
RequestErrorStats
}

Expand All @@ -142,29 +143,47 @@ type RequestErrorStats struct {
// NewRegionRequestRuntimeStats returns a new RegionRequestRuntimeStats.
func NewRegionRequestRuntimeStats() *RegionRequestRuntimeStats {
return &RegionRequestRuntimeStats{
RPCStats: make(map[tikvrpc.CmdType]*RPCRuntimeStats),
RPCStatsList: make([]RPCRuntimeStats, 0, 1),
}
}

// RPCRuntimeStats indicates the RPC request count and consume time.
type RPCRuntimeStats struct {
Count int64
Cmd tikvrpc.CmdType
Count uint32
// Send region request consume time.
Consume int64
Consume time.Duration
}

// RecordRPCRuntimeStats uses to record the rpc count and duration stats.
func (r *RegionRequestRuntimeStats) RecordRPCRuntimeStats(cmd tikvrpc.CmdType, d time.Duration) {
stat, ok := r.RPCStats[cmd]
if !ok {
r.RPCStats[cmd] = &RPCRuntimeStats{
Count: 1,
Consume: int64(d),
for i := range r.RPCStatsList {
if r.RPCStatsList[i].Cmd == cmd {
r.RPCStatsList[i].Count++
r.RPCStatsList[i].Consume += d
return
}
}
r.RPCStatsList = append(r.RPCStatsList, RPCRuntimeStats{
Cmd: cmd,
Count: 1,
Consume: d,
})
}

// GetRPCStatsCount returns the total rpc types count.
func (r *RegionRequestRuntimeStats) GetRPCStatsCount() int {
return len(r.RPCStatsList)
}

// GetCmdRPCCount returns the rpc count of the specified cmd type.
func (r *RegionRequestRuntimeStats) GetCmdRPCCount(cmd tikvrpc.CmdType) uint32 {
for i := range r.RPCStatsList {
if r.RPCStatsList[i].Cmd == cmd {
return r.RPCStatsList[i].Count
}
return
}
stat.Count++
stat.Consume += int64(d)
return 0
}

// RecordRPCErrorStats uses to record the request error(region error label and rpc error) info and count.
Expand Down Expand Up @@ -198,15 +217,15 @@ func (r *RegionRequestRuntimeStats) String() string {
return ""
}
var builder strings.Builder
for k, v := range r.RPCStats {
for _, v := range r.RPCStatsList {
if builder.Len() > 0 {
builder.WriteByte(',')
}
builder.WriteString(k.String())
builder.WriteString(v.Cmd.String())
builder.WriteString(":{num_rpc:")
builder.WriteString(strconv.FormatInt(v.Count, 10))
builder.WriteString(strconv.FormatUint(uint64(v.Count), 10))
builder.WriteString(", total_time:")
builder.WriteString(util.FormatDuration(time.Duration(v.Consume)))
builder.WriteString(util.FormatDuration(v.Consume))
builder.WriteString("}")
}
if errStatsStr := r.RequestErrorStats.String(); errStatsStr != "" {
Expand Down Expand Up @@ -242,7 +261,8 @@ func (r *RequestErrorStats) String() string {
// Clone returns a copy of itself.
func (r *RegionRequestRuntimeStats) Clone() *RegionRequestRuntimeStats {
newRs := NewRegionRequestRuntimeStats()
maps.Copy(newRs.RPCStats, r.RPCStats)
newRs.RPCStatsList = make([]RPCRuntimeStats, 0, len(r.RPCStatsList))
newRs.RPCStatsList = append(newRs.RPCStatsList, r.RPCStatsList...)
if len(r.ErrStats) > 0 {
newRs.ErrStats = make(map[string]int)
maps.Copy(newRs.ErrStats, r.ErrStats)
Expand All @@ -256,17 +276,8 @@ func (r *RegionRequestRuntimeStats) Merge(rs *RegionRequestRuntimeStats) {
if rs == nil {
return
}
for cmd, v := range rs.RPCStats {
stat, ok := r.RPCStats[cmd]
if !ok {
r.RPCStats[cmd] = &RPCRuntimeStats{
Count: v.Count,
Consume: v.Consume,
}
continue
}
stat.Count += v.Count
stat.Consume += v.Consume
for i := range rs.RPCStatsList {
r.mergeRPCRuntimeStats(rs.RPCStatsList[i])
}
if len(rs.ErrStats) > 0 {
if r.ErrStats == nil {
Expand All @@ -279,6 +290,17 @@ func (r *RegionRequestRuntimeStats) Merge(rs *RegionRequestRuntimeStats) {
}
}

func (r *RegionRequestRuntimeStats) mergeRPCRuntimeStats(rs RPCRuntimeStats) {
for i := range r.RPCStatsList {
if r.RPCStatsList[i].Cmd == rs.Cmd {
r.RPCStatsList[i].Count += rs.Count
r.RPCStatsList[i].Consume += rs.Consume
return
}
}
r.RPCStatsList = append(r.RPCStatsList, rs)
}

// ReplicaAccessStats records the replica access info.
type ReplicaAccessStats struct {
// AccessInfos records the access info
Expand Down
20 changes: 10 additions & 10 deletions internal/locate/region_request3_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1168,10 +1168,10 @@ func (s *testRegionRequestToThreeStoresSuite) TestSendReqFirstTimeout() {
regionErr, err := resp.GetRegionError()
s.Nil(err)
s.True(IsFakeRegionError(regionErr))
s.Equal(1, len(s.regionRequestSender.Stats.RPCStats))
s.Equal(int64(3), s.regionRequestSender.Stats.RPCStats[tikvrpc.CmdGet].Count) // 3 rpc
s.Equal(3, len(reqTargetAddrs)) // each rpc to a different store.
s.Equal(0, bo.GetTotalBackoffTimes()) // no backoff since fast retry.
s.Equal(1, s.regionRequestSender.Stats.GetRPCStatsCount())
s.Equal(uint32(3), s.regionRequestSender.Stats.GetCmdRPCCount(tikvrpc.CmdGet)) // 3 rpc
s.Equal(3, len(reqTargetAddrs)) // each rpc to a different store.
s.Equal(0, bo.GetTotalBackoffTimes()) // no backoff since fast retry.
// warn: must rest MaxExecutionDurationMs before retry.
resetStats()
if staleRead {
Expand All @@ -1187,9 +1187,9 @@ func (s *testRegionRequestToThreeStoresSuite) TestSendReqFirstTimeout() {
s.Nil(err)
s.Nil(regionErr)
s.Equal([]byte("value"), resp.Resp.(*kvrpcpb.GetResponse).Value)
s.Equal(1, len(s.regionRequestSender.Stats.RPCStats))
s.Equal(int64(1), s.regionRequestSender.Stats.RPCStats[tikvrpc.CmdGet].Count) // 1 rpc
s.Equal(0, bo.GetTotalBackoffTimes()) // no backoff since fast retry.
s.Equal(1, s.regionRequestSender.Stats.GetRPCStatsCount())
s.Equal(uint32(1), s.regionRequestSender.Stats.GetCmdRPCCount(tikvrpc.CmdGet)) // 1 rpc
s.Equal(0, bo.GetTotalBackoffTimes()) // no backoff since fast retry.
}
}

Expand Down Expand Up @@ -1398,9 +1398,9 @@ func (s *testRegionRequestToThreeStoresSuite) TestStaleReadTryFollowerAfterTimeo
s.Nil(err)
s.Nil(regionErr)
s.Equal([]byte("value"), resp.Resp.(*kvrpcpb.GetResponse).Value)
s.Equal(1, len(s.regionRequestSender.Stats.RPCStats))
s.Equal(int64(2), s.regionRequestSender.Stats.RPCStats[tikvrpc.CmdGet].Count) // 2 rpc
s.Equal(0, bo.GetTotalBackoffTimes()) // no backoff since fast retry.
s.Equal(1, s.regionRequestSender.Stats.GetRPCStatsCount())
s.Equal(uint32(2), s.regionRequestSender.Stats.GetCmdRPCCount(tikvrpc.CmdGet)) // 2 rpc
s.Equal(0, bo.GetTotalBackoffTimes()) // no backoff since fast retry.
}

func (s *testRegionRequestToThreeStoresSuite) TestDoNotTryUnreachableLeader() {
Expand Down
73 changes: 31 additions & 42 deletions txnkv/txnsnapshot/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,8 @@ package txnsnapshot
import (
"bytes"
"context"
"fmt"
"math"
"strconv"
"sync"
"sync/atomic"
"time"
Expand Down Expand Up @@ -564,7 +564,7 @@ func (s *KVSnapshot) batchGetSingleRegion(bo *retry.Backoffer, batch batchKeys,
resolveLocksOpts := txnlock.ResolveLocksOptions{
CallerStartTS: s.version,
Locks: locks,
Detail: s.getResolveLockDetail(),
Detail: s.GetResolveLockDetail(),
}
resolveLocksRes, err := cli.ResolveLocksWithOpts(bo, resolveLocksOpts)
msBeforeExpired := resolveLocksRes.TTL
Expand Down Expand Up @@ -785,7 +785,7 @@ func (s *KVSnapshot) get(ctx context.Context, bo *retry.Backoffer, k []byte) ([]
resolveLocksOpts := txnlock.ResolveLocksOptions{
CallerStartTS: s.version,
Locks: locks,
Detail: s.getResolveLockDetail(),
Detail: s.GetResolveLockDetail(),
}
resolveLocksRes, err := cli.ResolveLocksWithOpts(bo, resolveLocksOpts)
if err != nil {
Expand All @@ -810,17 +810,6 @@ func (s *KVSnapshot) mergeExecDetail(detail *kvrpcpb.ExecDetailsV2) {
if detail == nil || s.mu.stats == nil {
return
}
if s.mu.stats.resolveLockDetail == nil {
s.mu.stats.resolveLockDetail = &util.ResolveLockDetail{}
}
if s.mu.stats.scanDetail == nil {
s.mu.stats.scanDetail = &util.ScanDetail{
ResolveLock: s.mu.stats.resolveLockDetail,
}
}
if s.mu.stats.timeDetail == nil {
s.mu.stats.timeDetail = &util.TimeDetail{}
}
s.mu.stats.scanDetail.MergeFromScanDetailV2(detail.ScanDetailV2)
s.mu.stats.timeDetail.MergeFromTimeDetail(detail.TimeDetailV2, detail.TimeDetail)
}
Expand Down Expand Up @@ -1103,13 +1092,14 @@ func (s *KVSnapshot) GetKVReadTimeout() time.Duration {
return s.readTimeout
}

func (s *KVSnapshot) getResolveLockDetail() *util.ResolveLockDetail {
// GetResolveLockDetail returns ResolveLockDetail, exports for testing.
func (s *KVSnapshot) GetResolveLockDetail() *util.ResolveLockDetail {
s.mu.RLock()
defer s.mu.RUnlock()
if s.mu.stats == nil {
return nil
}
return s.mu.stats.resolveLockDetail
return &s.mu.stats.resolveLockDetail
}

// SetPipelined sets the snapshot to pipelined mode.
Expand All @@ -1128,14 +1118,18 @@ type SnapshotRuntimeStats struct {
rpcStats *locate.RegionRequestRuntimeStats
backoffSleepMS map[string]int
backoffTimes map[string]int
scanDetail *util.ScanDetail
timeDetail *util.TimeDetail
resolveLockDetail *util.ResolveLockDetail
scanDetail util.ScanDetail
timeDetail util.TimeDetail
resolveLockDetail util.ResolveLockDetail
}

// Clone implements the RuntimeStats interface.
func (rs *SnapshotRuntimeStats) Clone() *SnapshotRuntimeStats {
newRs := SnapshotRuntimeStats{}
newRs := SnapshotRuntimeStats{
scanDetail: rs.scanDetail,
timeDetail: rs.timeDetail,
resolveLockDetail: rs.resolveLockDetail,
}
if rs.rpcStats != nil {
newRs.rpcStats = rs.rpcStats.Clone()
}
Expand All @@ -1149,19 +1143,6 @@ func (rs *SnapshotRuntimeStats) Clone() *SnapshotRuntimeStats {
newRs.backoffTimes[k] += v
}
}

if rs.scanDetail != nil {
newRs.scanDetail = rs.scanDetail
}

if rs.timeDetail != nil {
newRs.timeDetail = rs.timeDetail
}

if rs.resolveLockDetail != nil {
newRs.resolveLockDetail = rs.resolveLockDetail
}

return &newRs
}

Expand All @@ -1187,6 +1168,9 @@ func (rs *SnapshotRuntimeStats) Merge(other *SnapshotRuntimeStats) {
rs.backoffTimes[k] += v
}
}
rs.scanDetail.Merge(&other.scanDetail)
rs.timeDetail.Merge(&other.timeDetail)
rs.resolveLockDetail.Merge(&other.resolveLockDetail)
}

// String implements fmt.Stringer interface.
Expand All @@ -1201,13 +1185,23 @@ func (rs *SnapshotRuntimeStats) String() string {
}
ms := rs.backoffSleepMS[k]
d := time.Duration(ms) * time.Millisecond
buf.WriteString(fmt.Sprintf("%s_backoff:{num:%d, total_time:%s}", k, v, util.FormatDuration(d)))
buf.WriteString(k)
buf.WriteString("_backoff:{num:")
buf.WriteString(strconv.Itoa(v))
buf.WriteString(", total_time:")
buf.WriteString(util.FormatDuration(d))
buf.WriteString("}")
}
timeDetail := rs.timeDetail.String()
if timeDetail != "" {
buf.WriteString(", ")
buf.WriteString(timeDetail)
}
if rs.resolveLockDetail.ResolveLockTime > 0 {
buf.WriteString(", ")
buf.WriteString("resolve_lock_time:")
buf.WriteString(util.FormatDuration(time.Duration(rs.resolveLockDetail.ResolveLockTime)))
}
scanDetail := rs.scanDetail.String()
if scanDetail != "" {
buf.WriteString(", ")
Expand All @@ -1218,19 +1212,14 @@ func (rs *SnapshotRuntimeStats) String() string {

// GetTimeDetail returns the timeDetail
func (rs *SnapshotRuntimeStats) GetTimeDetail() *util.TimeDetail {
return rs.timeDetail
return &rs.timeDetail
}

// GetCmdRPCCount returns the count of the corresponding kind of rpc requests
func (rs *SnapshotRuntimeStats) GetCmdRPCCount(cmd tikvrpc.CmdType) int64 {
if rs.rpcStats == nil || len(rs.rpcStats.RPCStats) == 0 {
return 0
}

stats, ok := rs.rpcStats.RPCStats[cmd]
if !ok {
if rs.rpcStats == nil {
return 0
}

return stats.Count
return int64(rs.rpcStats.GetCmdRPCCount(cmd))
}
2 changes: 0 additions & 2 deletions util/execdetails.go
Original file line number Diff line number Diff line change
Expand Up @@ -417,8 +417,6 @@ type ScanDetail struct {
RocksdbBlockReadDuration time.Duration
// GetSnapshotDuration is the time spent getting an engine snapshot.
GetSnapshotDuration time.Duration

ResolveLock *ResolveLockDetail
}

// Merge merges scan detail execution details into self.
Expand Down

0 comments on commit e122acd

Please sign in to comment.