metrics: optimize rpc latency related metrics
Signed-off-by: zyguan <[email protected]>
zyguan committed Jan 15, 2025
1 parent a348c17 commit 593cc0b
Showing 3 changed files with 111 additions and 135 deletions.
220 changes: 104 additions & 116 deletions internal/client/client.go
@@ -168,6 +168,13 @@ type connArray struct {
done chan struct{}

monitor *connMonitor

metrics struct {
rpcLatHist *rpcMetrics
rpcSrcLatSum sync.Map
rpcNetLatExternal prometheus.Observer
rpcNetLatInternal prometheus.Observer
}
}

func newConnArray(maxSize uint, addr string, ver uint64, security config.Security,
@@ -181,6 +188,9 @@ func newConnArray(maxSize uint, addr string, ver uint64, security config.Security,
dialTimeout: dialTimeout,
monitor: m,
}
a.metrics.rpcLatHist = deriveRPCMetrics(metrics.TiKVSendReqHistogram.MustCurryWith(prometheus.Labels{metrics.LblStore: addr}))
a.metrics.rpcNetLatExternal = metrics.TiKVRPCNetLatencyHistogram.WithLabelValues(addr, "false")
a.metrics.rpcNetLatInternal = metrics.TiKVRPCNetLatencyHistogram.WithLabelValues(addr, "true")
if err := a.Init(addr, security, idleNotify, enableBatch, eventListener, opts...); err != nil {
return nil, err
}
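Note: newConnArray above pre-binds ("curries") the store address into the request-latency histogram and the two network-latency observers once per connArray, so the per-request path never formats or looks up the store label again. A minimal, self-contained sketch of that currying pattern, using a hypothetical metric name rather than the real TiKVSendReqHistogram:

package main

import (
	"time"

	"github.com/prometheus/client_golang/prometheus"
)

// sendReqHist mirrors the label set of TiKVSendReqHistogram; the metric name
// and default buckets here are illustrative only.
var sendReqHist = prometheus.NewHistogramVec(prometheus.HistogramOpts{
	Name: "example_request_seconds",
	Help: "Example request latency histogram.",
}, []string{"type", "store", "stale_read", "scope"})

func main() {
	prometheus.MustRegister(sendReqHist)

	// Bind the store label once per connection array, as newConnArray does.
	perStore := sendReqHist.MustCurryWith(prometheus.Labels{"store": "127.0.0.1:20160"})

	// Bind the remaining labels once for a hot command; recording a latency is
	// then a plain Observe call with no label resolution on the hot path.
	getLatency := perStore.With(prometheus.Labels{
		"type": "Get", "stale_read": "false", "scope": "false",
	})
	getLatency.Observe((1500 * time.Microsecond).Seconds())
}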
@@ -363,7 +373,7 @@ func (a *connArray) Init(addr string, security config.Security, idleNotify *uint
dialTimeout: a.dialTimeout,
tryLock: tryLock{sync.NewCond(new(sync.Mutex)), false},
eventListener: eventListener,
metrics: &a.metrics,
metrics: &a.batchConn.metrics,
}
batchClient.maxConcurrencyRequestLimit.Store(cfg.TiKVClient.MaxConcurrencyRequestLimit)
a.batchCommandsClients = append(a.batchCommandsClients, batchClient)
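Note on the metrics: &a.batchConn.metrics change above: connArray now declares its own metrics field, which shadows the metrics field promoted from the embedded *batchConn, so the batch client has to reference the batch-side struct by its full path. A self-contained illustration of that shadowing rule (the field types and contents here are hypothetical):

package main

import "fmt"

type batchConn struct {
	metrics string // stands in for the batch-side metrics struct
}

type connArray struct {
	*batchConn
	metrics struct{ note string } // new field added by this commit; shape is illustrative
}

func main() {
	a := connArray{batchConn: &batchConn{metrics: "batch metrics"}}
	a.metrics.note = "connArray metrics"
	fmt.Println(a.metrics.note)      // the outer field wins name resolution
	fmt.Println(a.batchConn.metrics) // the promoted field now needs the full path
}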
@@ -400,6 +410,40 @@ func (a *connArray) Close() {
close(a.done)
}

func (a *connArray) updateRPCMetrics(req *tikvrpc.Request, resp *tikvrpc.Response, latency time.Duration) {
seconds := latency.Seconds()
stale := req.GetStaleRead()
source := req.GetRequestSource()
internal := util.IsInternalRequest(req.GetRequestSource())

a.metrics.rpcLatHist.get(req.Type, stale, internal).Observe(seconds)

srcLatSum, ok := a.metrics.rpcSrcLatSum.Load(source)
if !ok {
srcLatSum = deriveRPCMetrics(metrics.TiKVSendReqSummary.MustCurryWith(
prometheus.Labels{metrics.LblStore: a.target, metrics.LblSource: source}))
a.metrics.rpcSrcLatSum.Store(source, srcLatSum)
}
srcLatSum.(*rpcMetrics).get(req.Type, stale, internal).Observe(seconds)

if execDetail := resp.GetExecDetailsV2(); execDetail != nil {
var totalRpcWallTimeNs uint64
if execDetail.TimeDetailV2 != nil {
totalRpcWallTimeNs = execDetail.TimeDetailV2.TotalRpcWallTimeNs
} else if execDetail.TimeDetail != nil {
totalRpcWallTimeNs = execDetail.TimeDetail.TotalRpcWallTimeNs
}
if totalRpcWallTimeNs > 0 {
lat := latency - time.Duration(totalRpcWallTimeNs)
if internal {
a.metrics.rpcNetLatInternal.Observe(lat.Seconds())
} else {
a.metrics.rpcNetLatExternal.Observe(lat.Seconds())
}
}
}
}

type option struct {
gRPCDialOptions []grpc.DialOption
security config.Security
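Two details of updateRPCMetrics above are worth noting. Per-source summary observers are cached in a sync.Map with a plain Load-then-Store; concurrent first requests from the same source may both derive an observer, but they derive children of the same curried SummaryVec, so the race only costs a redundant derivation. The network-latency figure is the client-measured elapsed time minus the RPC wall time TiKV reports in its execution details, i.e. roughly the time spent outside TiKV. A runnable sketch of that subtraction (the guard against missing or skewed wall time is an assumption of the sketch, not part of the patch):

package main

import (
	"fmt"
	"time"
)

// networkLatency is a hypothetical helper mirroring the arithmetic in
// updateRPCMetrics: client-side elapsed time minus the server-reported
// TotalRpcWallTimeNs approximates network plus queueing time outside TiKV.
func networkLatency(elapsed time.Duration, totalRPCWallTimeNs uint64) time.Duration {
	wall := time.Duration(totalRPCWallTimeNs) // time.Duration counts nanoseconds
	if totalRPCWallTimeNs == 0 || wall > elapsed {
		return 0 // missing detail or clock skew; guard added for the sketch
	}
	return elapsed - wall
}

func main() {
	fmt.Println(networkLatency(3*time.Millisecond, uint64(2*time.Millisecond))) // prints 1ms
}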
@@ -542,114 +586,6 @@ func (c *RPCClient) closeConns() {
c.Unlock()
}

var (
sendReqHistCache sync.Map
sendReqCounterCache sync.Map
rpcNetLatencyHistCache sync.Map
)

type sendReqHistCacheKey struct {
tp tikvrpc.CmdType
id uint64
staleRad bool
isInternal bool
}

type sendReqCounterCacheKey struct {
sendReqHistCacheKey
requestSource string
}

type rpcNetLatencyCacheKey struct {
storeID uint64
isInternal bool
}

type sendReqCounterCacheValue struct {
counter prometheus.Counter
timeCounter prometheus.Counter
}

func (c *RPCClient) updateSendReqHistogramAndExecStats(req *tikvrpc.Request, resp *tikvrpc.Response, start time.Time, staleRead bool, execDetails *util.ExecDetails) {
elapsed := time.Since(start)
secs := elapsed.Seconds()
storeID := req.Context.GetPeer().GetStoreId()
isInternal := util.IsInternalRequest(req.GetRequestSource())

histKey := sendReqHistCacheKey{
req.Type,
storeID,
staleRead,
isInternal,
}
counterKey := sendReqCounterCacheKey{
histKey,
req.GetRequestSource(),
}

reqType := req.Type.String()
var storeIDStr string

hist, ok := sendReqHistCache.Load(histKey)
if !ok {
if len(storeIDStr) == 0 {
storeIDStr = strconv.FormatUint(storeID, 10)
}
hist = metrics.TiKVSendReqHistogram.WithLabelValues(reqType, storeIDStr,
strconv.FormatBool(staleRead), strconv.FormatBool(isInternal))
sendReqHistCache.Store(histKey, hist)
}
counter, ok := sendReqCounterCache.Load(counterKey)
if !ok {
if len(storeIDStr) == 0 {
storeIDStr = strconv.FormatUint(storeID, 10)
}
counter = sendReqCounterCacheValue{
metrics.TiKVSendReqCounter.WithLabelValues(reqType, storeIDStr, strconv.FormatBool(staleRead),
counterKey.requestSource, strconv.FormatBool(isInternal)),
metrics.TiKVSendReqTimeCounter.WithLabelValues(reqType, storeIDStr,
strconv.FormatBool(staleRead), counterKey.requestSource, strconv.FormatBool(isInternal)),
}
sendReqCounterCache.Store(counterKey, counter)
}

hist.(prometheus.Observer).Observe(secs)
counter.(sendReqCounterCacheValue).counter.Inc()
counter.(sendReqCounterCacheValue).timeCounter.Add(secs)

if execDetail := resp.GetExecDetailsV2(); execDetail != nil {
var totalRpcWallTimeNs uint64
if execDetail.TimeDetailV2 != nil {
totalRpcWallTimeNs = execDetail.TimeDetailV2.TotalRpcWallTimeNs
} else if execDetail.TimeDetail != nil {
totalRpcWallTimeNs = execDetail.TimeDetail.TotalRpcWallTimeNs
}
if totalRpcWallTimeNs > 0 {
cacheKey := rpcNetLatencyCacheKey{
storeID,
isInternal,
}
latHist, ok := rpcNetLatencyHistCache.Load(cacheKey)
if !ok {
if len(storeIDStr) == 0 {
storeIDStr = strconv.FormatUint(storeID, 10)
}
latHist = metrics.TiKVRPCNetLatencyHistogram.WithLabelValues(storeIDStr, strconv.FormatBool(isInternal))
rpcNetLatencyHistCache.Store(cacheKey, latHist)
}
latency := elapsed - time.Duration(totalRpcWallTimeNs)*time.Nanosecond
latHist.(prometheus.Observer).Observe(latency.Seconds())
}
}

execNetworkCollector := &networkCollector{}
// update execDetails
if execDetails != nil {
execNetworkCollector.onReq(req, execDetails)
execNetworkCollector.onResp(req, resp, execDetails)
}
}

func (c *RPCClient) sendRequest(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (resp *tikvrpc.Response, err error) {
var spanRPC opentracing.Span
if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
@@ -675,15 +611,17 @@ func (c *RPCClient) sendRequest(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (resp *tikvrpc.Response, err error) {
}

start := time.Now()
staleRead := req.GetStaleRead()
defer func() {
stmtExec := ctx.Value(util.ExecDetailsKey)
var detail *util.ExecDetails
if stmtExec != nil {
detail = stmtExec.(*util.ExecDetails)
atomic.AddInt64(&detail.WaitKVRespDuration, int64(time.Since(start)))
elapsed := time.Since(start)
connArray.updateRPCMetrics(req, resp, elapsed)

if stmtExec := ctx.Value(util.ExecDetailsKey); stmtExec != nil {
execDetails := stmtExec.(*util.ExecDetails)
atomic.AddInt64(&execDetails.WaitKVRespDuration, int64(elapsed))
execNetworkCollector := networkCollector{}
execNetworkCollector.onReq(req, execDetails)
execNetworkCollector.onResp(req, resp, execDetails)
}
c.updateSendReqHistogramAndExecStats(req, resp, start, staleRead, detail)

if spanRPC != nil && util.TraceExecDetailsEnabled(ctx) {
if si := buildSpanInfoFromResp(resp); si != nil {
@@ -1037,3 +975,53 @@ func buildSpanInfoFromResp(resp *tikvrpc.Response) *spanInfo {

return &spanRPC
}

func deriveRPCMetrics(root prometheus.ObserverVec) *rpcMetrics {
return &rpcMetrics{
root: root,
latGet: root.With(prometheus.Labels{metrics.LblType: tikvrpc.CmdGet.String(), metrics.LblStaleRead: "false", metrics.LblScope: "false"}),
latCop: root.With(prometheus.Labels{metrics.LblType: tikvrpc.CmdCop.String(), metrics.LblStaleRead: "false", metrics.LblScope: "false"}),
latBatchGet: root.With(prometheus.Labels{metrics.LblType: tikvrpc.CmdBatchGet.String(), metrics.LblStaleRead: "false", metrics.LblScope: "false"}),
}
}

type rpcMetrics struct {
root prometheus.ObserverVec

// static metrics
latGet prometheus.Observer
latCop prometheus.Observer
latBatchGet prometheus.Observer

latOther sync.Map
}

func (m *rpcMetrics) get(cmd tikvrpc.CmdType, stale bool, internal bool) prometheus.Observer {
if !stale && !internal {
switch cmd {
case tikvrpc.CmdGet:
return m.latGet
case tikvrpc.CmdCop:
return m.latCop
case tikvrpc.CmdBatchGet:
return m.latBatchGet
}
}
key := uint64(cmd)
if stale {
key |= 1 << 16
}
if internal {
key |= 1 << 17
}
lat, ok := m.latOther.Load(key)
if !ok {
lat = m.root.With(prometheus.Labels{
metrics.LblType: cmd.String(),
metrics.LblStaleRead: strconv.FormatBool(stale),
metrics.LblScope: strconv.FormatBool(internal),
})
m.latOther.Store(key, lat)
}
return lat.(prometheus.Observer)
}
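The fast path in rpcMetrics.get above returns pre-built observers for non-stale, non-internal Get/Cop/BatchGet requests; every other combination falls back to a sync.Map keyed by a small packed integer. A standalone sketch of that key layout, using a placeholder command value rather than the real tikvrpc constants:

package main

import "fmt"

// packKey mirrors the key construction in rpcMetrics.get: the command type
// occupies the low 16 bits, the stale-read and internal-scope flags occupy
// bits 16 and 17, so every (cmd, stale, internal) combination maps to a
// distinct key without allocating a struct.
func packKey(cmd uint16, stale, internal bool) uint64 {
	key := uint64(cmd)
	if stale {
		key |= 1 << 16
	}
	if internal {
		key |= 1 << 17
	}
	return key
}

func main() {
	const cmdGet = 1 // placeholder, not the real tikvrpc.CmdGet value
	fmt.Printf("%#x\n", packKey(cmdGet, false, false)) // 0x1
	fmt.Printf("%#x\n", packKey(cmdGet, true, false))  // 0x10001
	fmt.Printf("%#x\n", packKey(cmdGet, true, true))   // 0x30001
}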
1 change: 0 additions & 1 deletion internal/client/metrics_collector.go
@@ -92,7 +92,6 @@ func (s *networkCollector) onReq(req *tikvrpc.Request, details *util.ExecDetails
// ignore others
return
}
size += req.Context.Size()
isTiflashTarget := req.StoreTp == tikvrpc.TiFlash
var total, crossZone *int64
if isTiflashTarget {
25 changes: 7 additions & 18 deletions metrics/metrics.go
@@ -44,8 +44,7 @@ var (
TiKVTxnCmdHistogram *prometheus.HistogramVec
TiKVBackoffHistogram *prometheus.HistogramVec
TiKVSendReqHistogram *prometheus.HistogramVec
TiKVSendReqCounter *prometheus.CounterVec
TiKVSendReqTimeCounter *prometheus.CounterVec
TiKVSendReqSummary *prometheus.SummaryVec
TiKVRPCNetLatencyHistogram *prometheus.HistogramVec
TiKVCoprocessorHistogram *prometheus.HistogramVec
TiKVLockResolverCounter *prometheus.CounterVec
@@ -176,23 +175,14 @@ func initMetrics(namespace, subsystem string, constLabels prometheus.Labels) {
ConstLabels: constLabels,
}, []string{LblType, LblStore, LblStaleRead, LblScope})

TiKVSendReqCounter = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "request_counter",
Help: "Counter of sending request with multi dimensions.",
ConstLabels: constLabels,
}, []string{LblType, LblStore, LblStaleRead, LblSource, LblScope})

TiKVSendReqTimeCounter = prometheus.NewCounterVec(
prometheus.CounterOpts{
TiKVSendReqSummary = prometheus.NewSummaryVec(
prometheus.SummaryOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "request_time_counter",
Help: "Counter of request time with multi dimensions.",
Name: "source_request_seconds",
Help: "Summary of sending request with multi dimensions.",
ConstLabels: constLabels,
}, []string{LblType, LblStore, LblStaleRead, LblSource, LblScope})
}, []string{LblType, LblStore, LblStaleRead, LblScope, LblSource})

TiKVRPCNetLatencyHistogram = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
@@ -884,8 +874,7 @@ func RegisterMetrics() {
prometheus.MustRegister(TiKVTxnCmdHistogram)
prometheus.MustRegister(TiKVBackoffHistogram)
prometheus.MustRegister(TiKVSendReqHistogram)
prometheus.MustRegister(TiKVSendReqCounter)
prometheus.MustRegister(TiKVSendReqTimeCounter)
prometheus.MustRegister(TiKVSendReqSummary)
prometheus.MustRegister(TiKVRPCNetLatencyHistogram)
prometheus.MustRegister(TiKVCoprocessorHistogram)
prometheus.MustRegister(TiKVLockResolverCounter)
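The metrics change in this file folds the old TiKVSendReqCounter / TiKVSendReqTimeCounter pair into a single TiKVSendReqSummary, with the source label moved to the end. The count/sum pair of a summary carries the same information the two counters did: each Observe increments the _count series and adds the latency to the _sum series. A self-contained sketch with a hypothetical metric name, reading the collected sample back to show that pair:

package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
	dto "github.com/prometheus/client_model/go"
)

func main() {
	// Mirrors the new label order: type, store, stale_read, scope, source.
	sum := prometheus.NewSummaryVec(prometheus.SummaryOpts{
		Name: "example_source_request_seconds",
		Help: "Example per-source request latency summary.",
	}, []string{"type", "store", "stale_read", "scope", "source"})

	obs := sum.WithLabelValues("Get", "127.0.0.1:20160", "false", "true", "internal_ddl")
	obs.Observe(0.25)
	obs.Observe(0.5)

	// The single series now carries both of the quantities the two old
	// counters tracked: the request count and the summed latency.
	var m dto.Metric
	if err := obs.(prometheus.Metric).Write(&m); err == nil {
		fmt.Println(m.Summary.GetSampleCount(), m.Summary.GetSampleSum()) // 2 0.75
	}
}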
