Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

client: refactor intercepted client #1557

Merged
merged 2 commits into from
Jan 16, 2025
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
86 changes: 73 additions & 13 deletions internal/client/client_interceptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,23 +41,49 @@ func NewInterceptedClient(client Client) Client {
return interceptedClient{client}
}

func (r interceptedClient) SendRequest(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (*tikvrpc.Response, error) {
// Build the resource control interceptor.
var finalInterceptor interceptor.RPCInterceptor = buildResourceControlInterceptor(ctx, req)
// Chain the interceptors if there are multiple interceptors.
if it := interceptor.GetRPCInterceptorFromCtx(ctx); it != nil {
if finalInterceptor != nil {
finalInterceptor = interceptor.ChainRPCInterceptors(finalInterceptor, it)
} else {
finalInterceptor = it
func (r interceptedClient) SendRequest(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (resp *tikvrpc.Response, err error) {
var ruDetails *util.RUDetails

resourceGroupName, resourceControlInterceptor, reqInfo := getResourceControlInfo(ctx, req)
if resourceControlInterceptor != nil {
consumption, penalty, waitDuration, priority, err := resourceControlInterceptor.OnRequestWait(ctx, resourceGroupName, reqInfo)
if err != nil {
return nil, err
}
req.GetResourceControlContext().Penalty = penalty
// override request priority with resource group priority if it's not set.
// Get the priority at tikv side has some performance issue, so we pass it
// at client side. See: https://github.com/tikv/tikv/issues/15994 for more details.
if req.GetResourceControlContext().OverridePriority == 0 {
req.GetResourceControlContext().OverridePriority = uint64(priority)
}

if val := ctx.Value(util.RUDetailsCtxKey); val != nil {
ruDetails = val.(*util.RUDetails)
ruDetails.Update(consumption, waitDuration)
}
}
if finalInterceptor != nil {
return finalInterceptor.Wrap(func(target string, req *tikvrpc.Request) (*tikvrpc.Response, error) {

if ctxInterceptor := interceptor.GetRPCInterceptorFromCtx(ctx); ctxInterceptor == nil {
resp, err = r.Client.SendRequest(ctx, addr, req, timeout)
} else {
resp, err = ctxInterceptor.Wrap(func(target string, req *tikvrpc.Request) (*tikvrpc.Response, error) {
return r.Client.SendRequest(ctx, target, req, timeout)
})(addr, req)
}
return r.Client.SendRequest(ctx, addr, req, timeout)

if resourceControlInterceptor != nil && resp != nil {
respInfo := resourcecontrol.MakeResponseInfo(resp)
consumption, waitDuration, err := resourceControlInterceptor.OnResponseWait(ctx, resourceGroupName, reqInfo, respInfo)
if err != nil {
return nil, err
}
if ruDetails != nil {
ruDetails.Update(consumption, waitDuration)
}
}

return resp, err
}

var (
Expand All @@ -67,9 +93,43 @@ var (
ResourceControlInterceptor atomic.Pointer[resourceControlClient.ResourceGroupKVInterceptor]
)

func getResourceControlInfo(ctx context.Context, req *tikvrpc.Request) (
string,
resourceControlClient.ResourceGroupKVInterceptor,
*resourcecontrol.RequestInfo,
) {
resourceGroupName := req.GetResourceControlContext().GetResourceGroupName()
if len(resourceGroupName) == 0 {
return "", nil, nil
}
if !ResourceControlSwitch.Load().(bool) {
return "", nil, nil
}
rcInterceptor := ResourceControlInterceptor.Load()
if rcInterceptor == nil {
return "", nil, nil
}
// bypass some internal requests and it's may influence user experience. For example, the
// request of `alter user password`, totally bypasses the resource control. it's not cost
// many resources, but it's may influence the user experience.
// If the resource group has background jobs, we should not record consumption and wait for it.
// Background jobs will record and report in tikv side.
resourceControlInterceptor := *rcInterceptor
if resourceControlInterceptor.IsBackgroundRequest(ctx, resourceGroupName, req.RequestSource) {
return "", nil, nil
}
reqInfo := resourcecontrol.MakeRequestInfo(req)
if reqInfo.Bypass() {
return "", nil, nil
}
return resourceGroupName, resourceControlInterceptor, reqInfo
}

// buildResourceControlInterceptor builds a resource control interceptor with
// the given resource group name.
func buildResourceControlInterceptor(
//
// Deprecated: embedded in `interceptedClient.SendRequest` directly to reduce overhead.
func buildResourceControlInterceptor( //nolint:unused
ctx context.Context,
req *tikvrpc.Request,
) interceptor.RPCInterceptor {
Expand Down
Loading