resource_control: add ru details in ExecDetails (tikv#1070)
* collecting the RU information by passing a pointer through context.Value (tikv#1032)

Signed-off-by: zzm <[email protected]>

* add ruWaitDuration to RUDetails and update pd-client

Signed-off-by: glorv <[email protected]>

---------

Signed-off-by: zzm <[email protected]>
Signed-off-by: glorv <[email protected]>
Co-authored-by: zzm <[email protected]>
2 people authored and nolouch committed Feb 2, 2024
1 parent 46811b6 commit 77cd257
Showing 5 changed files with 59 additions and 125 deletions.
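
Before the per-file diff, a minimal usage sketch of the flow this commit introduces: the caller attaches a *util.RUDetails to the request context under util.RUDetailsCtxKey, the resource-control interceptor accumulates consumption and RU wait time into it, and the caller reads the totals afterwards. The send callback below is a hypothetical stand-in for whatever request path the caller uses; it is not part of this commit.

import (
	"context"
	"fmt"

	"github.com/tikv/client-go/v2/util"
)

// collectRUDetails is illustrative only; send stands in for the caller's request path.
func collectRUDetails(ctx context.Context, send func(context.Context) error) error {
	ruDetails := util.NewRUDetails()
	// The resource-control interceptor looks this value up via
	// ctx.Value(util.RUDetailsCtxKey) and updates it on every request.
	ctx = context.WithValue(ctx, util.RUDetailsCtxKey, ruDetails)

	if err := send(ctx); err != nil {
		return err
	}

	fmt.Printf("RRU=%.2f WRU=%.2f RUWaitDuration=%v\n",
		ruDetails.RRU(), ruDetails.WRU(), ruDetails.RUWaitDuration())
	return nil
}
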
34 changes: 15 additions & 19 deletions internal/client/client_interceptor.go
@@ -16,7 +16,6 @@ package client

import (
"context"
"sync"
"sync/atomic"
"time"

@@ -35,17 +34,16 @@ var _ Client = interceptedClient{}

type interceptedClient struct {
Client
ruRuntimeStatsMap *sync.Map
}

// NewInterceptedClient creates a Client which can execute interceptor.
func NewInterceptedClient(client Client, ruRuntimeStatsMap *sync.Map) Client {
return interceptedClient{client, ruRuntimeStatsMap}
func NewInterceptedClient(client Client) Client {
return interceptedClient{client}
}

func (r interceptedClient) SendRequest(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (*tikvrpc.Response, error) {
// Build the resource control interceptor.
var finalInterceptor interceptor.RPCInterceptor = buildResourceControlInterceptor(ctx, req, r.getRURuntimeStats(req.GetStartTS()))
var finalInterceptor interceptor.RPCInterceptor = buildResourceControlInterceptor(ctx, req)
// Chain the interceptors if there are multiple interceptors.
if it := interceptor.GetRPCInterceptorFromCtx(ctx); it != nil {
if finalInterceptor != nil {
@@ -62,16 +60,6 @@ func (r interceptedClient) SendRequest(ctx context.Context, addr string, req *ti
return r.Client.SendRequest(ctx, addr, req, timeout)
}

func (r interceptedClient) getRURuntimeStats(startTS uint64) *util.RURuntimeStats {
if r.ruRuntimeStatsMap == nil || startTS == 0 {
return nil
}
if v, ok := r.ruRuntimeStatsMap.Load(startTS); ok {
return v.(*util.RURuntimeStats)
}
return nil
}

var (
// ResourceControlSwitch is used to control whether to enable the resource control.
ResourceControlSwitch atomic.Value
@@ -84,7 +72,6 @@ var (
func buildResourceControlInterceptor(
ctx context.Context,
req *tikvrpc.Request,
ruRuntimeStats *util.RURuntimeStats,
) interceptor.RPCInterceptor {
if !ResourceControlSwitch.Load().(bool) {
return nil
@@ -102,6 +89,8 @@ }
}
resourceControlInterceptor := *rcInterceptor

ruDetails := ctx.Value(util.RUDetailsCtxKey)

// Make the request info.
reqInfo := resourcecontrol.MakeRequestInfo(req)
// Build the interceptor.
@@ -116,26 +105,33 @@
return next(target, req)
}

consumption, penalty, priority, err := resourceControlInterceptor.OnRequestWait(ctx, resourceGroupName, reqInfo)
consumption, penalty, waitDuration, priority, err := resourceControlInterceptor.OnRequestWait(ctx, resourceGroupName, reqInfo)
if err != nil {
return nil, err
}
req.GetResourceControlContext().Penalty = penalty
ruRuntimeStats.Update(consumption)
// Override the request priority with the resource group priority if it's not set.
// Getting the priority on the tikv side has a performance cost, so we pass it
// from the client side. See https://github.com/tikv/tikv/issues/15994 for more details.
if req.GetResourceControlContext().OverridePriority == 0 {
req.GetResourceControlContext().OverridePriority = uint64(priority)
}
if ruDetails != nil {
detail := ruDetails.(*util.RUDetails)
detail.Update(consumption, waitDuration)
}

resp, err := next(target, req)
if resp != nil {
respInfo := resourcecontrol.MakeResponseInfo(resp)
consumption, err = resourceControlInterceptor.OnResponse(resourceGroupName, reqInfo, respInfo)
if err != nil {
return nil, err
}
ruRuntimeStats.Update(consumption)
if ruDetails != nil {
detail := ruDetails.(*util.RUDetails)
detail.Update(consumption, time.Duration(0))
}
}
return resp, err
}
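
A minimal sketch of how callers wire their own RPC interceptor with the updated single-argument NewInterceptedClient; the resource-control interceptor built inside SendRequest is chained with whatever interceptor the context carries. myClient, addr, and req here are hypothetical placeholders, not part of this diff.

// Sketch only: myClient, addr, and req are hypothetical placeholders.
client := NewInterceptedClient(myClient) // no more ruRuntimeStatsMap argument
ctx := interceptor.WithRPCInterceptor(context.Background(),
	interceptor.NewRPCInterceptor("logging", func(next interceptor.RPCInterceptorFunc) interceptor.RPCInterceptorFunc {
		return func(target string, req *tikvrpc.Request) (*tikvrpc.Response, error) {
			// Custom logic goes here; SendRequest chains the resource-control
			// interceptor with this one before issuing the RPC.
			return next(target, req)
		}
	}))
_, _ = client.SendRequest(ctx, addr, req, time.Second)
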
4 changes: 2 additions & 2 deletions internal/client/client_interceptor_test.go
@@ -41,7 +41,7 @@ func (c emptyClient) CloseAddr(addr string) error {

func TestInterceptedClient(t *testing.T) {
executed := false
client := NewInterceptedClient(emptyClient{}, nil)
client := NewInterceptedClient(emptyClient{})
ctx := interceptor.WithRPCInterceptor(context.Background(), interceptor.NewRPCInterceptor("test", func(next interceptor.RPCInterceptorFunc) interceptor.RPCInterceptorFunc {
return func(target string, req *tikvrpc.Request) (*tikvrpc.Response, error) {
executed = true
@@ -54,7 +54,7 @@ func TestInterceptedClient(t *testing.T) {

func TestAppendChainedInterceptor(t *testing.T) {
executed := make([]int, 0, 10)
client := NewInterceptedClient(emptyClient{}, nil)
client := NewInterceptedClient(emptyClient{})

mkInterceptorFn := func(i int) interceptor.RPCInterceptor {
return interceptor.NewRPCInterceptor(fmt.Sprintf("%d", i), func(next interceptor.RPCInterceptorFunc) interceptor.RPCInterceptorFunc {
50 changes: 2 additions & 48 deletions tikv/kv.go
@@ -78,10 +78,6 @@ const (
// DCLabelKey indicates the key of label which represents the dc for Store.
DCLabelKey = "zone"
safeTSUpdateInterval = time.Second * 2
// Since the default max transaction TTL is 1 hour, we can use this to
// clean up the RU runtime stats as well.
ruRuntimeStatsCleanThreshold = time.Hour
ruRuntimeStatsCleanInterval = ruRuntimeStatsCleanThreshold / 2
)

func createEtcdKV(addrs []string, tlsConfig *tls.Config) (*clientv3.Client, error) {
@@ -137,9 +133,6 @@ type KVStore struct {

replicaReadSeed uint32 // this is used to load balance followers / learners when replica read is enabled

// StartTS -> RURuntimeStats, stores the RU runtime stats for certain transaction.
ruRuntimeStatsMap sync.Map

ctx context.Context
cancel context.CancelFunc
wg sync.WaitGroup
@@ -229,14 +222,13 @@ func NewKVStore(uuid string, pdClient pd.Client, spkv SafePointKV, tikvclient Cl
cancel: cancel,
gP: NewSpool(128, 10*time.Second),
}
store.clientMu.client = client.NewReqCollapse(client.NewInterceptedClient(tikvclient, &store.ruRuntimeStatsMap))
store.clientMu.client = client.NewReqCollapse(client.NewInterceptedClient(tikvclient))
store.lockResolver = txnlock.NewLockResolver(store)
loadOption(store, opt...)

store.wg.Add(3)
store.wg.Add(2)
go store.runSafePointChecker()
go store.safeTSUpdater()
go store.ruRuntimeStatsMapCleaner()

return store, nil
}
@@ -709,44 +701,6 @@ func (s *KVStore) updateGlobalTxnScopeTSFromPD(ctx context.Context) bool {
return false
}

func (s *KVStore) ruRuntimeStatsMapCleaner() {
defer s.wg.Done()
t := time.NewTicker(ruRuntimeStatsCleanInterval)
defer t.Stop()
ctx, cancel := context.WithCancel(s.ctx)
ctx = util.WithInternalSourceType(ctx, util.InternalTxnGC)
defer cancel()

cleanThreshold := ruRuntimeStatsCleanThreshold
if _, e := util.EvalFailpoint("mockFastRURuntimeStatsMapClean"); e == nil {
t.Reset(time.Millisecond * 100)
cleanThreshold = time.Millisecond
}

for {
select {
case <-ctx.Done():
return
case now := <-t.C:
s.ruRuntimeStatsMap.Range(
func(key, _ interface{}) bool {
startTSTime := oracle.GetTimeFromTS(key.(uint64))
if now.Sub(startTSTime) >= cleanThreshold {
s.ruRuntimeStatsMap.Delete(key)
}
return true
},
)
}
}
}

// CreateRURuntimeStats creates a RURuntimeStats for the startTS and returns it.
func (s *KVStore) CreateRURuntimeStats(startTS uint64) *util.RURuntimeStats {
rrs, _ := s.ruRuntimeStatsMap.LoadOrStore(startTS, util.NewRURuntimeStats())
return rrs.(*util.RURuntimeStats)
}

// EnableResourceControl enables the resource control.
func EnableResourceControl() {
client.ResourceControlSwitch.Store(true)
30 changes: 0 additions & 30 deletions tikv/kv_test.go
@@ -21,10 +21,8 @@ import (
"testing"
"time"

"github.com/pingcap/failpoint"
"github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
"github.com/tikv/client-go/v2/internal/mockstore/mocktikv"
"github.com/tikv/client-go/v2/oracle"
@@ -126,31 +124,3 @@ func (s *testKVSuite) TestMinSafeTs() {
s.Require().GreaterOrEqual(atomic.LoadInt32(&mockClient.requestCount), int32(2))
s.Require().Equal(uint64(80), s.store.GetMinSafeTS(oracle.GlobalTxnScope))
}

func TestRURuntimeStatsCleanUp(t *testing.T) {
util.EnableFailpoints()
require := require.New(t)
require.Nil(failpoint.Enable("tikvclient/mockFastRURuntimeStatsMapClean", `return()`))
defer func() {
require.Nil(failpoint.Disable("tikvclient/mockFastRURuntimeStatsMapClean"))
}()

client, cluster, pdClient, err := testutils.NewMockTiKV("", nil)
require.Nil(err)
testutils.BootstrapWithSingleStore(cluster)
store, err := NewTestTiKVStore(client, pdClient, nil, nil, 0)
require.Nil(err)
defer store.Close()

// Create a ruRuntimeStats first.
startTS := oracle.ComposeTS(oracle.GetPhysical(time.Now()), 0)
ruRuntimeStats := store.CreateRURuntimeStats(startTS)
require.NotNil(ruRuntimeStats)
// Wait for the cleanup goroutine to clean up the ruRuntimeStatsMap.
time.Sleep(time.Millisecond * 150)
// The ruRuntimeStatsMap should be cleaned up.
store.ruRuntimeStatsMap.Range(func(key, value interface{}) bool {
require.Fail("ruRuntimeStatsMap should be cleaned up")
return true
})
}
66 changes: 40 additions & 26 deletions util/execdetails.go
@@ -52,6 +52,7 @@ import (
type commitDetailCtxKeyType struct{}
type lockKeysDetailCtxKeyType struct{}
type execDetailsCtxKeyType struct{}
type ruDetailsCtxKeyType struct{}
type traceExecDetailsCtxKeyType struct{}

var (
@@ -64,6 +65,9 @@ var (
// ExecDetailsKey presents ExecDetail info key in context.
ExecDetailsKey = execDetailsCtxKeyType{}

// RUDetailsCtxKey presents RUDetails info key in context.
RUDetailsCtxKey = ruDetailsCtxKeyType{}

// traceExecDetailsKey is a context key whose value indicates whether to add ExecDetails to trace.
traceExecDetailsKey = traceExecDetailsCtxKeyType{}
)
@@ -682,54 +686,64 @@ func (rd *ResolveLockDetail) Merge(resolveLock *ResolveLockDetail) {
rd.ResolveLockTime += resolveLock.ResolveLockTime
}

// RURuntimeStats is the runtime stats collector for RU.
type RURuntimeStats struct {
readRU *uatomic.Float64
writeRU *uatomic.Float64
// RUDetails contains RU detail info.
type RUDetails struct {
readRU *uatomic.Float64
writeRU *uatomic.Float64
ruWaitDuration *uatomic.Duration
}

// NewRURuntimeStats creates a new RURuntimeStats.
func NewRURuntimeStats() *RURuntimeStats {
return &RURuntimeStats{
readRU: uatomic.NewFloat64(0),
writeRU: uatomic.NewFloat64(0),
// NewRUDetails creates a new RUDetails.
func NewRUDetails() *RUDetails {
return &RUDetails{
readRU: uatomic.NewFloat64(0),
writeRU: uatomic.NewFloat64(0),
ruWaitDuration: uatomic.NewDuration(0),
}
}

// Clone implements the RuntimeStats interface.
func (rs *RURuntimeStats) Clone() *RURuntimeStats {
return &RURuntimeStats{
readRU: uatomic.NewFloat64(rs.readRU.Load()),
writeRU: uatomic.NewFloat64(rs.writeRU.Load()),
func (rd *RUDetails) Clone() *RUDetails {
return &RUDetails{
readRU: uatomic.NewFloat64(rd.readRU.Load()),
writeRU: uatomic.NewFloat64(rd.writeRU.Load()),
ruWaitDuration: uatomic.NewDuration(rd.ruWaitDuration.Load()),
}
}

// Merge implements the RuntimeStats interface.
func (rs *RURuntimeStats) Merge(other *RURuntimeStats) {
rs.readRU.Add(other.readRU.Load())
rs.writeRU.Add(other.writeRU.Load())
func (rd *RUDetails) Merge(other *RUDetails) {
rd.readRU.Add(other.readRU.Load())
rd.writeRU.Add(other.writeRU.Load())
rd.ruWaitDuration.Add(other.ruWaitDuration.Load())
}

// String implements fmt.Stringer interface.
func (rs *RURuntimeStats) String() string {
return fmt.Sprintf("RRU:%f, WRU:%f", rs.readRU.Load(), rs.writeRU.Load())
func (rd *RUDetails) String() string {
return fmt.Sprintf("RRU:%f, WRU:%f, WaitDuration:%v", rd.readRU.Load(), rd.writeRU.Load(), rd.ruWaitDuration.Load())
}

// RRU returns the read RU.
func (rs RURuntimeStats) RRU() float64 {
return rs.readRU.Load()
func (rd *RUDetails) RRU() float64 {
return rd.readRU.Load()
}

// WRU returns the write RU.
func (rs RURuntimeStats) WRU() float64 {
return rs.writeRU.Load()
func (rd *RUDetails) WRU() float64 {
return rd.writeRU.Load()
}

// RUWaitDuration returns the time duration waiting for available RU.
func (rd *RUDetails) RUWaitDuration() time.Duration {
return rd.ruWaitDuration.Load()
}

// Update updates the RU details with the given consumption info and wait duration.
func (rs *RURuntimeStats) Update(consumption *rmpb.Consumption) {
if rs == nil || consumption == nil {
func (rd *RUDetails) Update(consumption *rmpb.Consumption, waitDuration time.Duration) {
if rd == nil || consumption == nil {
return
}
rs.readRU.Add(consumption.RRU)
rs.writeRU.Add(consumption.WRU)
rd.readRU.Add(consumption.RRU)
rd.writeRU.Add(consumption.WRU)
rd.ruWaitDuration.Add(waitDuration)
}
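
Finally, a short sketch with made-up numbers showing how the RUDetails accessors above compose; rmpb here refers to the kvproto resource_manager package that execdetails.go already imports for Consumption.

import (
	"fmt"
	"time"

	rmpb "github.com/pingcap/kvproto/pkg/resource_manager"
	"github.com/tikv/client-go/v2/util"
)

func exampleRUDetails() {
	perRequest := util.NewRUDetails()
	// Consumption values are made up for illustration.
	perRequest.Update(&rmpb.Consumption{RRU: 1.5, WRU: 0.5}, 3*time.Millisecond)

	total := util.NewRUDetails()
	total.Merge(perRequest)   // fold request-level details into a statement-level total
	snapshot := total.Clone() // independent copy, safe to report while updates continue

	// Prints something like "RRU:1.500000, WRU:0.500000, WaitDuration:3ms".
	fmt.Println(snapshot.String())
}
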
