From 593cc0bd9a5f3c18c4336b63aeab1ac758e663b9 Mon Sep 17 00:00:00 2001 From: zyguan Date: Wed, 15 Jan 2025 01:27:25 +0000 Subject: [PATCH 1/2] metrics: optimize rpc latency related metrics Signed-off-by: zyguan --- internal/client/client.go | 220 +++++++++++++-------------- internal/client/metrics_collector.go | 1 - metrics/metrics.go | 25 +-- 3 files changed, 111 insertions(+), 135 deletions(-) diff --git a/internal/client/client.go b/internal/client/client.go index 9bd7fd5156..908c23baf4 100644 --- a/internal/client/client.go +++ b/internal/client/client.go @@ -168,6 +168,13 @@ type connArray struct { done chan struct{} monitor *connMonitor + + metrics struct { + rpcLatHist *rpcMetrics + rpcSrcLatSum sync.Map + rpcNetLatExternal prometheus.Observer + rpcNetLatInternal prometheus.Observer + } } func newConnArray(maxSize uint, addr string, ver uint64, security config.Security, @@ -181,6 +188,9 @@ func newConnArray(maxSize uint, addr string, ver uint64, security config.Securit dialTimeout: dialTimeout, monitor: m, } + a.metrics.rpcLatHist = deriveRPCMetrics(metrics.TiKVSendReqHistogram.MustCurryWith(prometheus.Labels{metrics.LblStore: addr})) + a.metrics.rpcNetLatExternal = metrics.TiKVRPCNetLatencyHistogram.WithLabelValues(addr, "false") + a.metrics.rpcNetLatInternal = metrics.TiKVRPCNetLatencyHistogram.WithLabelValues(addr, "true") if err := a.Init(addr, security, idleNotify, enableBatch, eventListener, opts...); err != nil { return nil, err } @@ -363,7 +373,7 @@ func (a *connArray) Init(addr string, security config.Security, idleNotify *uint dialTimeout: a.dialTimeout, tryLock: tryLock{sync.NewCond(new(sync.Mutex)), false}, eventListener: eventListener, - metrics: &a.metrics, + metrics: &a.batchConn.metrics, } batchClient.maxConcurrencyRequestLimit.Store(cfg.TiKVClient.MaxConcurrencyRequestLimit) a.batchCommandsClients = append(a.batchCommandsClients, batchClient) @@ -400,6 +410,40 @@ func (a *connArray) Close() { close(a.done) } +func (a *connArray) updateRPCMetrics(req *tikvrpc.Request, resp *tikvrpc.Response, latency time.Duration) { + seconds := latency.Seconds() + stale := req.GetStaleRead() + source := req.GetRequestSource() + internal := util.IsInternalRequest(req.GetRequestSource()) + + a.metrics.rpcLatHist.get(req.Type, stale, internal).Observe(seconds) + + srcLatSum, ok := a.metrics.rpcSrcLatSum.Load(source) + if !ok { + srcLatSum = deriveRPCMetrics(metrics.TiKVSendReqSummary.MustCurryWith( + prometheus.Labels{metrics.LblStore: a.target, metrics.LblSource: source})) + a.metrics.rpcSrcLatSum.Store(source, srcLatSum) + } + srcLatSum.(*rpcMetrics).get(req.Type, stale, internal).Observe(seconds) + + if execDetail := resp.GetExecDetailsV2(); execDetail != nil { + var totalRpcWallTimeNs uint64 + if execDetail.TimeDetailV2 != nil { + totalRpcWallTimeNs = execDetail.TimeDetailV2.TotalRpcWallTimeNs + } else if execDetail.TimeDetail != nil { + totalRpcWallTimeNs = execDetail.TimeDetail.TotalRpcWallTimeNs + } + if totalRpcWallTimeNs > 0 { + lat := latency - time.Duration(totalRpcWallTimeNs) + if internal { + a.metrics.rpcNetLatInternal.Observe(lat.Seconds()) + } else { + a.metrics.rpcNetLatExternal.Observe(lat.Seconds()) + } + } + } +} + type option struct { gRPCDialOptions []grpc.DialOption security config.Security @@ -542,114 +586,6 @@ func (c *RPCClient) closeConns() { c.Unlock() } -var ( - sendReqHistCache sync.Map - sendReqCounterCache sync.Map - rpcNetLatencyHistCache sync.Map -) - -type sendReqHistCacheKey struct { - tp tikvrpc.CmdType - id uint64 - staleRad bool - isInternal bool -} - -type sendReqCounterCacheKey struct { - sendReqHistCacheKey - requestSource string -} - -type rpcNetLatencyCacheKey struct { - storeID uint64 - isInternal bool -} - -type sendReqCounterCacheValue struct { - counter prometheus.Counter - timeCounter prometheus.Counter -} - -func (c *RPCClient) updateSendReqHistogramAndExecStats(req *tikvrpc.Request, resp *tikvrpc.Response, start time.Time, staleRead bool, execDetails *util.ExecDetails) { - elapsed := time.Since(start) - secs := elapsed.Seconds() - storeID := req.Context.GetPeer().GetStoreId() - isInternal := util.IsInternalRequest(req.GetRequestSource()) - - histKey := sendReqHistCacheKey{ - req.Type, - storeID, - staleRead, - isInternal, - } - counterKey := sendReqCounterCacheKey{ - histKey, - req.GetRequestSource(), - } - - reqType := req.Type.String() - var storeIDStr string - - hist, ok := sendReqHistCache.Load(histKey) - if !ok { - if len(storeIDStr) == 0 { - storeIDStr = strconv.FormatUint(storeID, 10) - } - hist = metrics.TiKVSendReqHistogram.WithLabelValues(reqType, storeIDStr, - strconv.FormatBool(staleRead), strconv.FormatBool(isInternal)) - sendReqHistCache.Store(histKey, hist) - } - counter, ok := sendReqCounterCache.Load(counterKey) - if !ok { - if len(storeIDStr) == 0 { - storeIDStr = strconv.FormatUint(storeID, 10) - } - counter = sendReqCounterCacheValue{ - metrics.TiKVSendReqCounter.WithLabelValues(reqType, storeIDStr, strconv.FormatBool(staleRead), - counterKey.requestSource, strconv.FormatBool(isInternal)), - metrics.TiKVSendReqTimeCounter.WithLabelValues(reqType, storeIDStr, - strconv.FormatBool(staleRead), counterKey.requestSource, strconv.FormatBool(isInternal)), - } - sendReqCounterCache.Store(counterKey, counter) - } - - hist.(prometheus.Observer).Observe(secs) - counter.(sendReqCounterCacheValue).counter.Inc() - counter.(sendReqCounterCacheValue).timeCounter.Add(secs) - - if execDetail := resp.GetExecDetailsV2(); execDetail != nil { - var totalRpcWallTimeNs uint64 - if execDetail.TimeDetailV2 != nil { - totalRpcWallTimeNs = execDetail.TimeDetailV2.TotalRpcWallTimeNs - } else if execDetail.TimeDetail != nil { - totalRpcWallTimeNs = execDetail.TimeDetail.TotalRpcWallTimeNs - } - if totalRpcWallTimeNs > 0 { - cacheKey := rpcNetLatencyCacheKey{ - storeID, - isInternal, - } - latHist, ok := rpcNetLatencyHistCache.Load(cacheKey) - if !ok { - if len(storeIDStr) == 0 { - storeIDStr = strconv.FormatUint(storeID, 10) - } - latHist = metrics.TiKVRPCNetLatencyHistogram.WithLabelValues(storeIDStr, strconv.FormatBool(isInternal)) - rpcNetLatencyHistCache.Store(cacheKey, latHist) - } - latency := elapsed - time.Duration(totalRpcWallTimeNs)*time.Nanosecond - latHist.(prometheus.Observer).Observe(latency.Seconds()) - } - } - - execNetworkCollector := &networkCollector{} - // update execDetails - if execDetails != nil { - execNetworkCollector.onReq(req, execDetails) - execNetworkCollector.onResp(req, resp, execDetails) - } -} - func (c *RPCClient) sendRequest(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (resp *tikvrpc.Response, err error) { var spanRPC opentracing.Span if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil { @@ -675,15 +611,17 @@ func (c *RPCClient) sendRequest(ctx context.Context, addr string, req *tikvrpc.R } start := time.Now() - staleRead := req.GetStaleRead() defer func() { - stmtExec := ctx.Value(util.ExecDetailsKey) - var detail *util.ExecDetails - if stmtExec != nil { - detail = stmtExec.(*util.ExecDetails) - atomic.AddInt64(&detail.WaitKVRespDuration, int64(time.Since(start))) + elapsed := time.Since(start) + connArray.updateRPCMetrics(req, resp, elapsed) + + if stmtExec := ctx.Value(util.ExecDetailsKey); stmtExec != nil { + execDetails := stmtExec.(*util.ExecDetails) + atomic.AddInt64(&execDetails.WaitKVRespDuration, int64(elapsed)) + execNetworkCollector := networkCollector{} + execNetworkCollector.onReq(req, execDetails) + execNetworkCollector.onResp(req, resp, execDetails) } - c.updateSendReqHistogramAndExecStats(req, resp, start, staleRead, detail) if spanRPC != nil && util.TraceExecDetailsEnabled(ctx) { if si := buildSpanInfoFromResp(resp); si != nil { @@ -1037,3 +975,53 @@ func buildSpanInfoFromResp(resp *tikvrpc.Response) *spanInfo { return &spanRPC } + +func deriveRPCMetrics(root prometheus.ObserverVec) *rpcMetrics { + return &rpcMetrics{ + root: root, + latGet: root.With(prometheus.Labels{metrics.LblType: tikvrpc.CmdGet.String(), metrics.LblStaleRead: "false", metrics.LblScope: "false"}), + latCop: root.With(prometheus.Labels{metrics.LblType: tikvrpc.CmdCop.String(), metrics.LblStaleRead: "false", metrics.LblScope: "false"}), + latBatchGet: root.With(prometheus.Labels{metrics.LblType: tikvrpc.CmdBatchGet.String(), metrics.LblStaleRead: "false", metrics.LblScope: "false"}), + } +} + +type rpcMetrics struct { + root prometheus.ObserverVec + + // static metrics + latGet prometheus.Observer + latCop prometheus.Observer + latBatchGet prometheus.Observer + + latOther sync.Map +} + +func (m *rpcMetrics) get(cmd tikvrpc.CmdType, stale bool, internal bool) prometheus.Observer { + if !stale && !internal { + switch cmd { + case tikvrpc.CmdGet: + return m.latGet + case tikvrpc.CmdCop: + return m.latCop + case tikvrpc.CmdBatchGet: + return m.latBatchGet + } + } + key := uint64(cmd) + if stale { + key |= 1 << 16 + } + if internal { + key |= 1 << 17 + } + lat, ok := m.latOther.Load(key) + if !ok { + lat = m.root.With(prometheus.Labels{ + metrics.LblType: cmd.String(), + metrics.LblStaleRead: strconv.FormatBool(stale), + metrics.LblScope: strconv.FormatBool(internal), + }) + m.latOther.Store(key, lat) + } + return lat.(prometheus.Observer) +} diff --git a/internal/client/metrics_collector.go b/internal/client/metrics_collector.go index 6484dc0693..977ad7e50a 100644 --- a/internal/client/metrics_collector.go +++ b/internal/client/metrics_collector.go @@ -92,7 +92,6 @@ func (s *networkCollector) onReq(req *tikvrpc.Request, details *util.ExecDetails // ignore others return } - size += req.Context.Size() isTiflashTarget := req.StoreTp == tikvrpc.TiFlash var total, crossZone *int64 if isTiflashTarget { diff --git a/metrics/metrics.go b/metrics/metrics.go index 29548afa80..82d85f8718 100644 --- a/metrics/metrics.go +++ b/metrics/metrics.go @@ -44,8 +44,7 @@ var ( TiKVTxnCmdHistogram *prometheus.HistogramVec TiKVBackoffHistogram *prometheus.HistogramVec TiKVSendReqHistogram *prometheus.HistogramVec - TiKVSendReqCounter *prometheus.CounterVec - TiKVSendReqTimeCounter *prometheus.CounterVec + TiKVSendReqSummary *prometheus.SummaryVec TiKVRPCNetLatencyHistogram *prometheus.HistogramVec TiKVCoprocessorHistogram *prometheus.HistogramVec TiKVLockResolverCounter *prometheus.CounterVec @@ -176,23 +175,14 @@ func initMetrics(namespace, subsystem string, constLabels prometheus.Labels) { ConstLabels: constLabels, }, []string{LblType, LblStore, LblStaleRead, LblScope}) - TiKVSendReqCounter = prometheus.NewCounterVec( - prometheus.CounterOpts{ - Namespace: namespace, - Subsystem: subsystem, - Name: "request_counter", - Help: "Counter of sending request with multi dimensions.", - ConstLabels: constLabels, - }, []string{LblType, LblStore, LblStaleRead, LblSource, LblScope}) - - TiKVSendReqTimeCounter = prometheus.NewCounterVec( - prometheus.CounterOpts{ + TiKVSendReqSummary = prometheus.NewSummaryVec( + prometheus.SummaryOpts{ Namespace: namespace, Subsystem: subsystem, - Name: "request_time_counter", - Help: "Counter of request time with multi dimensions.", + Name: "source_request_seconds", + Help: "Summary of sending request with multi dimensions.", ConstLabels: constLabels, - }, []string{LblType, LblStore, LblStaleRead, LblSource, LblScope}) + }, []string{LblType, LblStore, LblStaleRead, LblScope, LblSource}) TiKVRPCNetLatencyHistogram = prometheus.NewHistogramVec( prometheus.HistogramOpts{ @@ -884,8 +874,7 @@ func RegisterMetrics() { prometheus.MustRegister(TiKVTxnCmdHistogram) prometheus.MustRegister(TiKVBackoffHistogram) prometheus.MustRegister(TiKVSendReqHistogram) - prometheus.MustRegister(TiKVSendReqCounter) - prometheus.MustRegister(TiKVSendReqTimeCounter) + prometheus.MustRegister(TiKVSendReqSummary) prometheus.MustRegister(TiKVRPCNetLatencyHistogram) prometheus.MustRegister(TiKVCoprocessorHistogram) prometheus.MustRegister(TiKVLockResolverCounter) From 3001a0d4e83ab19c9f4557d787e92659a34df8e7 Mon Sep 17 00:00:00 2001 From: zyguan Date: Wed, 15 Jan 2025 01:48:48 +0000 Subject: [PATCH 2/2] fix test Signed-off-by: zyguan --- internal/client/metrics_collector_test.go | 14 ++++---------- 1 file changed, 4 insertions(+), 10 deletions(-) diff --git a/internal/client/metrics_collector_test.go b/internal/client/metrics_collector_test.go index f5fda8035a..b1f32c73de 100644 --- a/internal/client/metrics_collector_test.go +++ b/internal/client/metrics_collector_test.go @@ -36,19 +36,13 @@ func TestNetworkCollectorOnReq(t *testing.T) { reqs := []*tikvrpc.Request{ tikvrpc.NewRequest( tikvrpc.CmdGet, - &kvrpcpb.GetRequest{Key: []byte("key")}, - kvrpcpb.Context{ - BusyThresholdMs: 50, - }, + &kvrpcpb.GetRequest{Context: &kvrpcpb.Context{BusyThresholdMs: 50}, Key: []byte("key")}, ), tikvrpc.NewReplicaReadRequest( tikvrpc.CmdGet, - &kvrpcpb.GetRequest{Key: []byte("key")}, + &kvrpcpb.GetRequest{Context: &kvrpcpb.Context{StaleRead: true}, Key: []byte("key")}, kv.ReplicaReadFollower, nil, - kvrpcpb.Context{ - StaleRead: true, - }, ), } @@ -58,12 +52,12 @@ func TestNetworkCollectorOnReq(t *testing.T) { req *tikvrpc.Request }{ { - expectUnpackedBytesSentKV: 8, + expectUnpackedBytesSentKV: 10, expectUnpackedBytesSentKVCrossZone: 0, req: reqs[0], }, { - expectUnpackedBytesSentKV: 18, + expectUnpackedBytesSentKV: 20, expectUnpackedBytesSentKVCrossZone: 0, req: reqs[1], },