metrics: optimize rpc latency related metrics #1556

Merged: 2 commits merged on Jan 17, 2025
220 changes: 104 additions & 116 deletions internal/client/client.go
@@ -168,6 +168,13 @@ type connArray struct {
done chan struct{}

monitor *connMonitor

metrics struct {
rpcLatHist *rpcMetrics
rpcSrcLatSum sync.Map
rpcNetLatExternal prometheus.Observer
rpcNetLatInternal prometheus.Observer
}
}

func newConnArray(maxSize uint, addr string, ver uint64, security config.Security,
@@ -181,6 +188,9 @@ func newConnArray(maxSize uint, addr string, ver uint64, security config.Securit
dialTimeout: dialTimeout,
monitor: m,
}
a.metrics.rpcLatHist = deriveRPCMetrics(metrics.TiKVSendReqHistogram.MustCurryWith(prometheus.Labels{metrics.LblStore: addr}))
a.metrics.rpcNetLatExternal = metrics.TiKVRPCNetLatencyHistogram.WithLabelValues(addr, "false")
a.metrics.rpcNetLatInternal = metrics.TiKVRPCNetLatencyHistogram.WithLabelValues(addr, "true")
if err := a.Init(addr, security, idleNotify, enableBatch, eventListener, opts...); err != nil {
return nil, err
}
@@ -363,7 +373,7 @@ func (a *connArray) Init(addr string, security config.Security, idleNotify *uint
dialTimeout: a.dialTimeout,
tryLock: tryLock{sync.NewCond(new(sync.Mutex)), false},
eventListener: eventListener,
metrics: &a.metrics,
metrics: &a.batchConn.metrics,
}
batchClient.maxConcurrencyRequestLimit.Store(cfg.TiKVClient.MaxConcurrencyRequestLimit)
a.batchCommandsClients = append(a.batchCommandsClients, batchClient)
Expand Down Expand Up @@ -400,6 +410,40 @@ func (a *connArray) Close() {
close(a.done)
}

func (a *connArray) updateRPCMetrics(req *tikvrpc.Request, resp *tikvrpc.Response, latency time.Duration) {
seconds := latency.Seconds()
stale := req.GetStaleRead()
source := req.GetRequestSource()
internal := util.IsInternalRequest(req.GetRequestSource())

a.metrics.rpcLatHist.get(req.Type, stale, internal).Observe(seconds)

srcLatSum, ok := a.metrics.rpcSrcLatSum.Load(source)
if !ok {
srcLatSum = deriveRPCMetrics(metrics.TiKVSendReqSummary.MustCurryWith(
prometheus.Labels{metrics.LblStore: a.target, metrics.LblSource: source}))
a.metrics.rpcSrcLatSum.Store(source, srcLatSum)
}
srcLatSum.(*rpcMetrics).get(req.Type, stale, internal).Observe(seconds)

if execDetail := resp.GetExecDetailsV2(); execDetail != nil {
var totalRpcWallTimeNs uint64
if execDetail.TimeDetailV2 != nil {
totalRpcWallTimeNs = execDetail.TimeDetailV2.TotalRpcWallTimeNs
} else if execDetail.TimeDetail != nil {
totalRpcWallTimeNs = execDetail.TimeDetail.TotalRpcWallTimeNs
}
if totalRpcWallTimeNs > 0 {
lat := latency - time.Duration(totalRpcWallTimeNs)
if internal {
a.metrics.rpcNetLatInternal.Observe(lat.Seconds())
} else {
a.metrics.rpcNetLatExternal.Observe(lat.Seconds())
}
}
}
}

type option struct {
gRPCDialOptions []grpc.DialOption
security config.Security
Expand Down Expand Up @@ -542,114 +586,6 @@ func (c *RPCClient) closeConns() {
c.Unlock()
}

var (
sendReqHistCache sync.Map
sendReqCounterCache sync.Map
rpcNetLatencyHistCache sync.Map
)

type sendReqHistCacheKey struct {
tp tikvrpc.CmdType
id uint64
staleRad bool
isInternal bool
}

type sendReqCounterCacheKey struct {
sendReqHistCacheKey
requestSource string
}

type rpcNetLatencyCacheKey struct {
storeID uint64
isInternal bool
}

type sendReqCounterCacheValue struct {
counter prometheus.Counter
timeCounter prometheus.Counter
}

func (c *RPCClient) updateSendReqHistogramAndExecStats(req *tikvrpc.Request, resp *tikvrpc.Response, start time.Time, staleRead bool, execDetails *util.ExecDetails) {
elapsed := time.Since(start)
secs := elapsed.Seconds()
storeID := req.Context.GetPeer().GetStoreId()
isInternal := util.IsInternalRequest(req.GetRequestSource())

histKey := sendReqHistCacheKey{
req.Type,
storeID,
staleRead,
isInternal,
}
counterKey := sendReqCounterCacheKey{
histKey,
req.GetRequestSource(),
}

reqType := req.Type.String()
var storeIDStr string

hist, ok := sendReqHistCache.Load(histKey)
if !ok {
if len(storeIDStr) == 0 {
storeIDStr = strconv.FormatUint(storeID, 10)
}
hist = metrics.TiKVSendReqHistogram.WithLabelValues(reqType, storeIDStr,
strconv.FormatBool(staleRead), strconv.FormatBool(isInternal))
sendReqHistCache.Store(histKey, hist)
}
counter, ok := sendReqCounterCache.Load(counterKey)
if !ok {
if len(storeIDStr) == 0 {
storeIDStr = strconv.FormatUint(storeID, 10)
}
counter = sendReqCounterCacheValue{
metrics.TiKVSendReqCounter.WithLabelValues(reqType, storeIDStr, strconv.FormatBool(staleRead),
counterKey.requestSource, strconv.FormatBool(isInternal)),
metrics.TiKVSendReqTimeCounter.WithLabelValues(reqType, storeIDStr,
strconv.FormatBool(staleRead), counterKey.requestSource, strconv.FormatBool(isInternal)),
}
sendReqCounterCache.Store(counterKey, counter)
}

hist.(prometheus.Observer).Observe(secs)
counter.(sendReqCounterCacheValue).counter.Inc()
counter.(sendReqCounterCacheValue).timeCounter.Add(secs)

if execDetail := resp.GetExecDetailsV2(); execDetail != nil {
var totalRpcWallTimeNs uint64
if execDetail.TimeDetailV2 != nil {
totalRpcWallTimeNs = execDetail.TimeDetailV2.TotalRpcWallTimeNs
} else if execDetail.TimeDetail != nil {
totalRpcWallTimeNs = execDetail.TimeDetail.TotalRpcWallTimeNs
}
if totalRpcWallTimeNs > 0 {
cacheKey := rpcNetLatencyCacheKey{
storeID,
isInternal,
}
latHist, ok := rpcNetLatencyHistCache.Load(cacheKey)
if !ok {
if len(storeIDStr) == 0 {
storeIDStr = strconv.FormatUint(storeID, 10)
}
latHist = metrics.TiKVRPCNetLatencyHistogram.WithLabelValues(storeIDStr, strconv.FormatBool(isInternal))
rpcNetLatencyHistCache.Store(cacheKey, latHist)
}
latency := elapsed - time.Duration(totalRpcWallTimeNs)*time.Nanosecond
latHist.(prometheus.Observer).Observe(latency.Seconds())
}
}

execNetworkCollector := &networkCollector{}
// update execDetails
if execDetails != nil {
execNetworkCollector.onReq(req, execDetails)
execNetworkCollector.onResp(req, resp, execDetails)
}
}

func (c *RPCClient) sendRequest(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (resp *tikvrpc.Response, err error) {
var spanRPC opentracing.Span
if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
@@ -675,15 +611,17 @@ func (c *RPCClient) sendRequest(ctx context.Context, addr string, req *tikvrpc.R
}

start := time.Now()
staleRead := req.GetStaleRead()
defer func() {
stmtExec := ctx.Value(util.ExecDetailsKey)
var detail *util.ExecDetails
if stmtExec != nil {
detail = stmtExec.(*util.ExecDetails)
atomic.AddInt64(&detail.WaitKVRespDuration, int64(time.Since(start)))
elapsed := time.Since(start)
connArray.updateRPCMetrics(req, resp, elapsed)

if stmtExec := ctx.Value(util.ExecDetailsKey); stmtExec != nil {
execDetails := stmtExec.(*util.ExecDetails)
atomic.AddInt64(&execDetails.WaitKVRespDuration, int64(elapsed))
execNetworkCollector := networkCollector{}
execNetworkCollector.onReq(req, execDetails)
execNetworkCollector.onResp(req, resp, execDetails)
}
c.updateSendReqHistogramAndExecStats(req, resp, start, staleRead, detail)

if spanRPC != nil && util.TraceExecDetailsEnabled(ctx) {
if si := buildSpanInfoFromResp(resp); si != nil {
@@ -1037,3 +975,53 @@ func buildSpanInfoFromResp(resp *tikvrpc.Response) *spanInfo {

return &spanRPC
}

func deriveRPCMetrics(root prometheus.ObserverVec) *rpcMetrics {
return &rpcMetrics{
root: root,
latGet: root.With(prometheus.Labels{metrics.LblType: tikvrpc.CmdGet.String(), metrics.LblStaleRead: "false", metrics.LblScope: "false"}),
latCop: root.With(prometheus.Labels{metrics.LblType: tikvrpc.CmdCop.String(), metrics.LblStaleRead: "false", metrics.LblScope: "false"}),
latBatchGet: root.With(prometheus.Labels{metrics.LblType: tikvrpc.CmdBatchGet.String(), metrics.LblStaleRead: "false", metrics.LblScope: "false"}),
}
}

type rpcMetrics struct {
root prometheus.ObserverVec

// static metrics
latGet prometheus.Observer
latCop prometheus.Observer
latBatchGet prometheus.Observer

latOther sync.Map
}

func (m *rpcMetrics) get(cmd tikvrpc.CmdType, stale bool, internal bool) prometheus.Observer {
if !stale && !internal {
switch cmd {
case tikvrpc.CmdGet:
return m.latGet
case tikvrpc.CmdCop:
return m.latCop
case tikvrpc.CmdBatchGet:
return m.latBatchGet
}
}
key := uint64(cmd)
if stale {
key |= 1 << 16
}
if internal {
key |= 1 << 17
}
lat, ok := m.latOther.Load(key)
if !ok {
lat = m.root.With(prometheus.Labels{
metrics.LblType: cmd.String(),
metrics.LblStaleRead: strconv.FormatBool(stale),
metrics.LblScope: strconv.FormatBool(internal),
})
m.latOther.Store(key, lat)
}
return lat.(prometheus.Observer)
}
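
For context on the new type, here is a minimal, self-contained sketch of the lookup pattern `rpcMetrics` uses: the hot default path (non-stale, external) returns eagerly derived observers, while every other `(cmd, stale, internal)` combination is cached in a `sync.Map` under a packed integer key. Metric and label names below are illustrative placeholders, not the ones this PR registers.

```go
package main

import (
	"fmt"
	"strconv"
	"sync"

	"github.com/prometheus/client_golang/prometheus"
)

// latencyMetrics mirrors the rpcMetrics idea: observers for the default path
// (stale_read=false, scope=false) are resolved once up front; all other label
// combinations are cached in a sync.Map keyed by a packed integer.
type latencyMetrics struct {
	root     prometheus.ObserverVec
	latGet   prometheus.Observer // type="Get", stale_read="false", scope="false"
	latOther sync.Map            // packed key -> prometheus.Observer
}

func newLatencyMetrics(root prometheus.ObserverVec) *latencyMetrics {
	return &latencyMetrics{
		root:   root,
		latGet: root.With(prometheus.Labels{"type": "Get", "stale_read": "false", "scope": "false"}),
	}
}

func (m *latencyMetrics) get(cmd uint16, name string, stale, internal bool) prometheus.Observer {
	if !stale && !internal && name == "Get" {
		return m.latGet // fast path: no map lookup, no label formatting
	}
	key := uint64(cmd)
	if stale {
		key |= 1 << 16
	}
	if internal {
		key |= 1 << 17
	}
	if obs, ok := m.latOther.Load(key); ok {
		return obs.(prometheus.Observer)
	}
	obs := m.root.With(prometheus.Labels{
		"type":       name,
		"stale_read": strconv.FormatBool(stale),
		"scope":      strconv.FormatBool(internal),
	})
	m.latOther.Store(key, obs)
	return obs
}

func main() {
	vec := prometheus.NewHistogramVec(prometheus.HistogramOpts{
		Name: "demo_request_seconds",
		Help: "Illustrative per-command RPC latency.",
	}, []string{"store", "type", "stale_read", "scope"})
	prometheus.MustRegister(vec)

	// Curry the store label once per connection, as newConnArray does with
	// TiKVSendReqHistogram, then reuse the derived observers for its lifetime.
	m := newLatencyMetrics(vec.MustCurryWith(prometheus.Labels{"store": "127.0.0.1:20160"}))
	m.get(1, "Get", false, false).Observe(0.002)
	m.get(2, "Cop", true, true).Observe(0.015)
	fmt.Println("recorded two samples")
}
```

Deriving the observers per connection is what lets the PR drop the global `sendReqHistCache`, `sendReqCounterCache`, and `rpcNetLatencyHistCache` lookups that previously ran on every request.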
1 change: 0 additions & 1 deletion internal/client/metrics_collector.go
@@ -92,7 +92,6 @@ func (s *networkCollector) onReq(req *tikvrpc.Request, details *util.ExecDetails
// ignore others
return
}
size += req.Context.Size()
isTiflashTarget := req.StoreTp == tikvrpc.TiFlash
var total, crossZone *int64
if isTiflashTarget {
14 changes: 4 additions & 10 deletions internal/client/metrics_collector_test.go
@@ -36,19 +36,13 @@ func TestNetworkCollectorOnReq(t *testing.T) {
reqs := []*tikvrpc.Request{
tikvrpc.NewRequest(
tikvrpc.CmdGet,
&kvrpcpb.GetRequest{Key: []byte("key")},
kvrpcpb.Context{
BusyThresholdMs: 50,
},
&kvrpcpb.GetRequest{Context: &kvrpcpb.Context{BusyThresholdMs: 50}, Key: []byte("key")},
),
tikvrpc.NewReplicaReadRequest(
tikvrpc.CmdGet,
&kvrpcpb.GetRequest{Key: []byte("key")},
&kvrpcpb.GetRequest{Context: &kvrpcpb.Context{StaleRead: true}, Key: []byte("key")},
kv.ReplicaReadFollower,
nil,
kvrpcpb.Context{
StaleRead: true,
},
),
}

@@ -58,12 +52,12 @@
req *tikvrpc.Request
}{
{
expectUnpackedBytesSentKV: 8,
expectUnpackedBytesSentKV: 10,
expectUnpackedBytesSentKVCrossZone: 0,
req: reqs[0],
},
{
expectUnpackedBytesSentKV: 18,
expectUnpackedBytesSentKV: 20,
expectUnpackedBytesSentKVCrossZone: 0,
req: reqs[1],
},
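
A note on the expected byte counts: the tests now embed the Context inside the GetRequest payload, so the request's own Size() already covers it (presumably why the separate `size += req.Context.Size()` addition in onReq is dropped), and the embedding adds two bytes of tag/length framing per request, moving the expectations from 8/18 to 10/20. A small sketch of that arithmetic, assuming the gogo-generated Size() methods on kvproto messages:

```go
package main

import (
	"fmt"

	"github.com/pingcap/kvproto/pkg/kvrpcpb"
)

func main() {
	// Without a context, only the key field is encoded.
	plain := &kvrpcpb.GetRequest{Key: []byte("key")}
	// With the context embedded, its encoded bytes plus the field tag and
	// length prefix are part of the request's own size.
	withCtx := &kvrpcpb.GetRequest{
		Context: &kvrpcpb.Context{BusyThresholdMs: 50},
		Key:     []byte("key"),
	}
	// Expected to print "5 10" if the key encodes to 5 bytes and the context
	// payload to 3, matching the updated test expectations above.
	fmt.Println(plain.Size(), withCtx.Size())
}
```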
25 changes: 7 additions & 18 deletions metrics/metrics.go
@@ -44,8 +44,7 @@ var (
TiKVTxnCmdHistogram *prometheus.HistogramVec
TiKVBackoffHistogram *prometheus.HistogramVec
TiKVSendReqHistogram *prometheus.HistogramVec
TiKVSendReqCounter *prometheus.CounterVec
TiKVSendReqTimeCounter *prometheus.CounterVec
TiKVSendReqSummary *prometheus.SummaryVec
TiKVRPCNetLatencyHistogram *prometheus.HistogramVec
TiKVCoprocessorHistogram *prometheus.HistogramVec
TiKVLockResolverCounter *prometheus.CounterVec
@@ -176,23 +175,14 @@ func initMetrics(namespace, subsystem string, constLabels prometheus.Labels) {
ConstLabels: constLabels,
}, []string{LblType, LblStore, LblStaleRead, LblScope})

TiKVSendReqCounter = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "request_counter",
Help: "Counter of sending request with multi dimensions.",
ConstLabels: constLabels,
}, []string{LblType, LblStore, LblStaleRead, LblSource, LblScope})

TiKVSendReqTimeCounter = prometheus.NewCounterVec(
prometheus.CounterOpts{
TiKVSendReqSummary = prometheus.NewSummaryVec(
prometheus.SummaryOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "request_time_counter",
Help: "Counter of request time with multi dimensions.",
Name: "source_request_seconds",
Help: "Summary of sending request with multi dimensions.",
ConstLabels: constLabels,
}, []string{LblType, LblStore, LblStaleRead, LblSource, LblScope})
}, []string{LblType, LblStore, LblStaleRead, LblScope, LblSource})

TiKVRPCNetLatencyHistogram = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
@@ -884,8 +874,7 @@ func RegisterMetrics() {
prometheus.MustRegister(TiKVTxnCmdHistogram)
prometheus.MustRegister(TiKVBackoffHistogram)
prometheus.MustRegister(TiKVSendReqHistogram)
prometheus.MustRegister(TiKVSendReqCounter)
prometheus.MustRegister(TiKVSendReqTimeCounter)
prometheus.MustRegister(TiKVSendReqSummary)
prometheus.MustRegister(TiKVRPCNetLatencyHistogram)
prometheus.MustRegister(TiKVCoprocessorHistogram)
prometheus.MustRegister(TiKVLockResolverCounter)
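
On the metrics.go side: a Summary with no Objectives configured exposes only a `_count` and a `_sum` series per label set, so the single `TiKVSendReqSummary` carries the same information as the removed `request_counter`/`request_time_counter` pair. A minimal sketch with placeholder metric and label values:

```go
package main

import (
	"time"

	"github.com/prometheus/client_golang/prometheus"
)

func main() {
	// With no Objectives set, the summary publishes just _count and _sum,
	// which correspond to the old request counter and request-time counter.
	sum := prometheus.NewSummaryVec(prometheus.SummaryOpts{
		Name: "demo_source_request_seconds",
		Help: "Illustrative per-source request latency summary.",
	}, []string{"type", "store", "stale_read", "scope", "source"})
	prometheus.MustRegister(sum)

	start := time.Now()
	// ... send the RPC here ...
	sum.WithLabelValues("Get", "127.0.0.1:20160", "false", "false", "external_query").
		Observe(time.Since(start).Seconds())
}
```

Per-source request rate and average latency can still be derived in queries from the summary's `_count` and `_sum`.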