diff --git a/br/pkg/lightning/backend/local/BUILD.bazel b/br/pkg/lightning/backend/local/BUILD.bazel
index c2f81cbdf4195..c41f144642f92 100644
--- a/br/pkg/lightning/backend/local/BUILD.bazel
+++ b/br/pkg/lightning/backend/local/BUILD.bazel
@@ -51,7 +51,6 @@ go_library(
         "//pkg/parser/model",
         "//pkg/parser/mysql",
         "//pkg/sessionctx/variable",
-        "//pkg/store/pdtypes",
         "//pkg/table",
         "//pkg/tablecodec",
         "//pkg/types",
@@ -82,6 +81,7 @@ go_library(
         "@com_github_tikv_client_go_v2//tikv",
         "@com_github_tikv_client_go_v2//util",
         "@com_github_tikv_pd_client//:client",
+        "@com_github_tikv_pd_client//http",
         "@org_golang_google_grpc//:grpc",
        "@org_golang_google_grpc//backoff",
        "@org_golang_google_grpc//codes",
@@ -128,7 +128,6 @@ go_test(
        "//br/pkg/lightning/mydump",
        "//br/pkg/membuf",
        "//br/pkg/mock/mocklocal",
-        "//br/pkg/pdutil",
        "//br/pkg/restore/split",
        "//br/pkg/storage",
        "//br/pkg/utils",
@@ -168,6 +167,7 @@ go_test(
        "@com_github_tikv_client_go_v2//oracle",
        "@com_github_tikv_pd_client//:client",
        "@com_github_tikv_pd_client//errs",
+        "@com_github_tikv_pd_client//http",
        "@org_golang_google_grpc//:grpc",
        "@org_golang_google_grpc//codes",
        "@org_golang_google_grpc//encoding",
diff --git a/br/pkg/lightning/backend/local/local.go b/br/pkg/lightning/backend/local/local.go
index 1d49881163fb8..ae9f3b52ce7d6 100644
--- a/br/pkg/lightning/backend/local/local.go
+++ b/br/pkg/lightning/backend/local/local.go
@@ -56,7 +56,6 @@ import (
 	"github.com/pingcap/tidb/pkg/infoschema"
 	"github.com/pingcap/tidb/pkg/metrics"
 	"github.com/pingcap/tidb/pkg/parser/model"
-	"github.com/pingcap/tidb/pkg/store/pdtypes"
 	"github.com/pingcap/tidb/pkg/tablecodec"
 	"github.com/pingcap/tidb/pkg/util"
 	"github.com/pingcap/tidb/pkg/util/codec"
@@ -64,6 +63,7 @@ import (
 	"github.com/tikv/client-go/v2/oracle"
 	tikvclient "github.com/tikv/client-go/v2/tikv"
 	pd "github.com/tikv/pd/client"
+	pdhttp "github.com/tikv/pd/client/http"
 	"go.uber.org/atomic"
 	"go.uber.org/zap"
 	"golang.org/x/sync/errgroup"
@@ -466,7 +466,8 @@ type Backend struct {
 	engines        sync.Map // sync version of map[uuid.UUID]*Engine
 	externalEngine map[uuid.UUID]common.Engine
 
-	pdCtl     *pdutil.PdController
+	pdCli     pd.Client
+	pdHTTPCli pdhttp.Client
 	splitCli  split.SplitClient
 	tikvCli   *tikvclient.KVStore
 	tls       *common.TLS
@@ -502,11 +503,19 @@ func openDuplicateDB(storeDir string) (*pebble.DB, error) {
 	return pebble.Open(dbPath, opts)
 }
 
+const (
+	pdCliMaxMsgSize = int(128 * units.MiB) // pd.ScanRegion may return a large response
+)
+
 var (
 	// RunInTest indicates whether the current process is running in test.
 	RunInTest bool
 	// LastAlloc is the last ID allocator.
-	LastAlloc manual.Allocator
+	LastAlloc      manual.Allocator
+	maxCallMsgSize = []grpc.DialOption{
+		grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(pdCliMaxMsgSize)),
+		grpc.WithDefaultCallOptions(grpc.MaxCallSendMsgSize(pdCliMaxMsgSize)),
+	}
 )
 
 // NewBackend creates new connections to tikv.
@@ -523,11 +532,20 @@ func NewBackend(
 		}
 	}()
 	config.adjust()
-	pdCtl, err := pdutil.NewPdController(ctx, config.PDAddr, tls.TLSConfig(), tls.ToPDSecurityOption())
+	pdAddrs := strings.Split(config.PDAddr, ",")
+	pdCli, err := pd.NewClientWithContext(
+		ctx, pdAddrs, tls.ToPDSecurityOption(),
+		pd.WithGRPCDialOptions(maxCallMsgSize...),
+		// If the timeout is too short, we may scatter a region many times, because
+		// the interface `ScatterRegions` may time out.
+		pd.WithCustomTimeoutOption(60*time.Second),
+		pd.WithMaxErrorRetry(3),
+	)
 	if err != nil {
 		return nil, common.NormalizeOrWrapErr(common.ErrCreatePDClient, err)
 	}
-	splitCli := split.NewSplitClient(pdCtl.GetPDClient(), tls.TLSConfig(), false)
+	pdHTTPCli := pdhttp.NewClient("lightning", pdAddrs, pdhttp.WithTLSConfig(tls.TLSConfig()))
+	splitCli := split.NewSplitClient(pdCli, tls.TLSConfig(), false)
 
 	shouldCreate := true
 	if config.CheckpointEnabled {
@@ -562,9 +580,9 @@ func NewBackend(
 
 	var pdCliForTiKV *tikvclient.CodecPDClient
 	if config.KeyspaceName == "" {
-		pdCliForTiKV = tikvclient.NewCodecPDClient(tikvclient.ModeTxn, pdCtl.GetPDClient())
+		pdCliForTiKV = tikvclient.NewCodecPDClient(tikvclient.ModeTxn, pdCli)
 	} else {
-		pdCliForTiKV, err = tikvclient.NewCodecPDClientWithKeyspace(tikvclient.ModeTxn, pdCtl.GetPDClient(), config.KeyspaceName)
+		pdCliForTiKV, err = tikvclient.NewCodecPDClientWithKeyspace(tikvclient.ModeTxn, pdCli, config.KeyspaceName)
 		if err != nil {
 			return nil, common.ErrCreatePDClient.Wrap(err).GenWithStackByArgs()
 		}
@@ -595,7 +613,8 @@ func NewBackend(
 	local := &Backend{
 		engines:        sync.Map{},
 		externalEngine: map[uuid.UUID]common.Engine{},
-		pdCtl:          pdCtl,
+		pdCli:          pdCli,
+		pdHTTPCli:      pdHTTPCli,
 		splitCli:       splitCli,
 		tikvCli:        tikvCli,
 		tls:            tls,
@@ -635,7 +654,7 @@ func (local *Backend) TotalMemoryConsume() int64 {
 }
 
 func (local *Backend) checkMultiIngestSupport(ctx context.Context) error {
-	stores, err := local.pdCtl.GetPDClient().GetAllStores(ctx, pd.WithExcludeTombstone())
+	stores, err := local.pdCli.GetAllStores(ctx, pd.WithExcludeTombstone())
 	if err != nil {
 		return errors.Trace(err)
 	}
@@ -802,7 +821,8 @@ func (local *Backend) Close() {
 		}
 	}
 	_ = local.tikvCli.Close()
-	local.pdCtl.Close()
+	local.pdHTTPCli.Close()
+	local.pdCli.Close()
 }
 
 // FlushEngine ensure the written data is saved successfully, to make sure no data lose after restart
@@ -933,7 +953,7 @@ func (local *Backend) allocateTSIfNotExists(ctx context.Context, engine *Engine)
 	if engine.TS > 0 {
 		return nil
 	}
-	physical, logical, err := local.pdCtl.GetPDClient().GetTS(ctx)
+	physical, logical, err := local.pdCli.GetTS(ctx)
 	if err != nil {
 		return err
 	}
@@ -953,7 +973,7 @@ func (local *Backend) CloseEngine(ctx context.Context, cfg *backend.EngineConfig
 	if err != nil {
 		return err
 	}
-	physical, logical, err := local.pdCtl.GetPDClient().GetTS(ctx)
+	physical, logical, err := local.pdCli.GetTS(ctx)
 	if err != nil {
 		return err
 	}
@@ -1400,6 +1420,27 @@ func (*Backend) isRetryableImportTiKVError(err error) bool {
 	return common.IsRetryableError(err)
 }
 
+func checkDiskAvail(ctx context.Context, store *pdhttp.StoreInfo) error {
+	logger := log.FromContext(ctx)
+	capacity, err := units.RAMInBytes(store.Status.Capacity)
+	if err != nil {
+		logger.Warn("failed to parse capacity",
+			zap.String("capacity", store.Status.Capacity), zap.Error(err))
+		return nil
+	}
+	available, err := units.RAMInBytes(store.Status.Available)
+	if err != nil {
+		logger.Warn("failed to parse available",
+			zap.String("available", store.Status.Available), zap.Error(err))
+		return nil
+	}
+	ratio := available * 100 / capacity
+	if ratio < 10 {
+		return errors.Errorf("the remaining storage capacity of TiKV(%s) is less than 10%%; please increase the storage capacity of TiKV and try again", store.Store.Address)
+	}
+	return nil
+}
+
 // executeJob handles a regionJob and tries to convert it to ingested stage.
 // If non-retryable error occurs, it will return the error.
 // If retryable error occurs, it will return nil and caller should check the stage
@@ -1414,26 +1455,14 @@ func (local *Backend) executeJob(
 	})
 	if local.ShouldCheckTiKV {
 		for _, peer := range job.region.Region.GetPeers() {
-			var (
-				store *pdtypes.StoreInfo
-				err   error
-			)
-			for i := 0; i < maxRetryTimes; i++ {
-				store, err = local.pdCtl.GetStoreInfo(ctx, peer.StoreId)
-				if err != nil {
-					continue
-				}
-				if store.Status.Capacity > 0 {
-					// The available disk percent of TiKV
-					ratio := store.Status.Available * 100 / store.Status.Capacity
-					if ratio < 10 {
-						return errors.Errorf("the remaining storage capacity of TiKV(%s) is less than 10%%; please increase the storage capacity of TiKV and try again", store.Store.Address)
-					}
-				}
-				break
-			}
+			store, err := local.pdHTTPCli.GetStore(ctx, peer.StoreId)
 			if err != nil {
 				log.FromContext(ctx).Error("failed to get StoreInfo from pd http api", zap.Error(err))
+				continue
+			}
+			err = checkDiskAvail(ctx, store)
+			if err != nil {
+				return err
 			}
 		}
 	}
@@ -1502,7 +1531,7 @@ func (local *Backend) ImportEngine(
 		log.FromContext(ctx).Info("engine contains no kv, skip import", zap.Stringer("engine", engineUUID))
 		return nil
 	}
-	kvRegionSplitSize, kvRegionSplitKeys, err := GetRegionSplitSizeKeys(ctx, local.pdCtl.GetPDClient(), local.tls)
+	kvRegionSplitSize, kvRegionSplitKeys, err := GetRegionSplitSizeKeys(ctx, local.pdCli, local.tls)
 	if err == nil {
 		if kvRegionSplitSize > regionSplitSize {
 			regionSplitSize = kvRegionSplitSize
@@ -1532,7 +1561,7 @@ func (local *Backend) ImportEngine(
 		if len(regionRanges[len(regionRanges)-1].End) > 0 {
 			endKey = codec.EncodeBytes(nil, regionRanges[len(regionRanges)-1].End)
 		}
-		done, err := local.pdCtl.PauseSchedulersByKeyRange(subCtx, startKey, endKey)
+		done, err := pdutil.PauseSchedulersByKeyRange(subCtx, local.pdHTTPCli, startKey, endKey)
 		if err != nil {
 			return errors.Trace(err)
 		}
@@ -1582,7 +1611,7 @@ func (local *Backend) ImportEngine(
 
 // GetRegionSplitSizeKeys gets the region split size and keys from PD.
 func (local *Backend) GetRegionSplitSizeKeys(ctx context.Context) (finalSize int64, finalKeys int64, err error) {
-	return GetRegionSplitSizeKeys(ctx, local.pdCtl.GetPDClient(), local.tls)
+	return GetRegionSplitSizeKeys(ctx, local.pdCli, local.tls)
 }
 
 // expose these variables to unit test.
@@ -1865,7 +1894,7 @@ func (local *Backend) LocalWriter(_ context.Context, cfg *backend.LocalWriterCon
 // This function will spawn a goroutine to keep switch mode periodically until the context is done.
 // The return done channel is used to notify the caller that the background goroutine is exited.
 func (local *Backend) SwitchModeByKeyRanges(ctx context.Context, ranges []common.Range) (<-chan struct{}, error) {
-	switcher := NewTiKVModeSwitcher(local.tls, local.pdCtl.GetPDClient(), log.FromContext(ctx).Logger)
+	switcher := NewTiKVModeSwitcher(local.tls, local.pdCli, log.FromContext(ctx).Logger)
 	done := make(chan struct{})
 
 	keyRanges := make([]*sst.Range, 0, len(ranges))
@@ -1961,11 +1990,6 @@ func (local *Backend) EngineFileSizes() (res []backend.EngineFileSize) {
 	return
 }
 
-// GetPDClient returns the PD client.
-func (local *Backend) GetPDClient() pd.Client {
-	return local.pdCtl.GetPDClient()
-}
-
 var getSplitConfFromStoreFunc = getSplitConfFromStore
 
 // return region split size, region split keys, error
diff --git a/br/pkg/lightning/backend/local/local_test.go b/br/pkg/lightning/backend/local/local_test.go
index b6347d5bc8f3f..780d3c621dcd5 100644
--- a/br/pkg/lightning/backend/local/local_test.go
+++ b/br/pkg/lightning/backend/local/local_test.go
@@ -46,7 +46,6 @@ import (
 	"github.com/pingcap/tidb/br/pkg/lightning/config"
 	"github.com/pingcap/tidb/br/pkg/lightning/log"
 	"github.com/pingcap/tidb/br/pkg/membuf"
-	"github.com/pingcap/tidb/br/pkg/pdutil"
 	"github.com/pingcap/tidb/br/pkg/restore/split"
 	"github.com/pingcap/tidb/br/pkg/storage"
 	"github.com/pingcap/tidb/br/pkg/utils"
@@ -60,6 +59,7 @@ import (
 	"github.com/pingcap/tidb/pkg/util/hack"
 	"github.com/stretchr/testify/require"
 	pd "github.com/tikv/pd/client"
+	"github.com/tikv/pd/client/http"
 	"google.golang.org/grpc"
 	"google.golang.org/grpc/codes"
 	"google.golang.org/grpc/encoding"
@@ -1058,11 +1058,9 @@ func TestMultiIngest(t *testing.T) {
 				err:                testCase.err,
 				multiIngestCheckFn: testCase.multiIngestSupport,
 			}
-			pdCtl := &pdutil.PdController{}
-			pdCtl.SetPDClient(&mockPdClient{stores: stores})
 
 			local := &Backend{
-				pdCtl: pdCtl,
+				pdCli: &mockPdClient{stores: stores},
 				importClientFactory: &mockImportClientFactory{
 					stores: allStores,
 					createClientFn: func(store *metapb.Store) sst.ImportSSTClient {
@@ -2308,8 +2306,6 @@ func TestExternalEngine(t *testing.T) {
 		TotalKVCount:  int64(config.SplitRegionKeys) + 1,
 	}
 	engineUUID := uuid.New()
-	pdCtl := &pdutil.PdController{}
-	pdCtl.SetPDClient(&mockPdClient{})
 	local := &Backend{
 		BackendConfig: BackendConfig{
 			WorkerConcurrency: 2,
@@ -2317,7 +2313,7 @@ func TestExternalEngine(t *testing.T) {
 		splitCli: initTestSplitClient([][]byte{
 			keys[0], keys[50], endKey,
 		}, nil),
-		pdCtl:          pdCtl,
+		pdCli:          &mockPdClient{},
 		externalEngine: map[uuid.UUID]common.Engine{},
 		keyAdapter:     common.NoopKeyAdapter{},
 	}
@@ -2384,3 +2380,10 @@ func TestGetExternalEngineKVStatistics(t *testing.T) {
 	require.Zero(t, size)
 	require.Zero(t, count)
 }
+
+func TestCheckDiskAvail(t *testing.T) {
+	store := &http.StoreInfo{Status: http.StoreStatus{Capacity: "100 GB", Available: "50 GB"}}
+	ctx := context.Background()
+	err := checkDiskAvail(ctx, store)
+	require.NoError(t, err)
+}
diff --git a/br/pkg/pdutil/pd.go b/br/pkg/pdutil/pd.go
index 27395e6adfcce..701a82ab9de56 100644
--- a/br/pkg/pdutil/pd.go
+++ b/br/pkg/pdutil/pd.go
@@ -959,67 +959,26 @@ type KeyRangeRule struct {
 	EndKeyHex   string `json:"end_key"`   // hex format end key, for marshal/unmarshal
 }
 
-// CreateOrUpdateRegionLabelRule creates or updates a region label rule.
-func (p *PdController) CreateOrUpdateRegionLabelRule(ctx context.Context, rule LabelRule) error {
-	reqData, err := json.Marshal(&rule)
-	if err != nil {
-		panic(err)
-	}
-	var lastErr error
-	addrs := p.getAllPDAddrs()
-	for i, addr := range addrs {
-		_, lastErr = pdRequest(ctx, addr, pdhttp.RegionLabelRule,
-			p.cli, http.MethodPost, reqData)
-		if lastErr == nil {
-			return nil
-		}
-		if berrors.IsContextCanceled(lastErr) {
-			return errors.Trace(lastErr)
-		}
-
-		if i < len(addrs) {
-			log.Warn("failed to create or update region label rule, will try next pd address",
-				zap.Error(lastErr), zap.String("pdAddr", addr))
-		}
-	}
-	return errors.Trace(lastErr)
-}
-
-// DeleteRegionLabelRule deletes a region label rule.
-func (p *PdController) DeleteRegionLabelRule(ctx context.Context, ruleID string) error {
-	var lastErr error
-	addrs := p.getAllPDAddrs()
-	for i, addr := range addrs {
-		_, lastErr = pdRequest(ctx, addr, fmt.Sprintf("%s/%s", pdhttp.RegionLabelRule, ruleID),
-			p.cli, http.MethodDelete, nil)
-		if lastErr == nil {
-			return nil
-		}
-		if berrors.IsContextCanceled(lastErr) {
-			return errors.Trace(lastErr)
-		}
-
-		if i < len(addrs) {
-			log.Warn("failed to delete region label rule, will try next pd address",
-				zap.Error(lastErr), zap.String("pdAddr", addr))
-		}
-	}
-	return errors.Trace(lastErr)
-}
-
 // PauseSchedulersByKeyRange will pause schedulers for regions in the specific key range.
 // This function will spawn a goroutine to keep pausing schedulers periodically until the context is done.
 // The return done channel is used to notify the caller that the background goroutine is exited.
-func (p *PdController) PauseSchedulersByKeyRange(ctx context.Context,
-	startKey, endKey []byte) (done <-chan struct{}, err error) {
-	return p.pauseSchedulerByKeyRangeWithTTL(ctx, startKey, endKey, pauseTimeout)
+func PauseSchedulersByKeyRange(
+	ctx context.Context,
+	pdHTTPCli pdhttp.Client,
+	startKey, endKey []byte,
+) (done <-chan struct{}, err error) {
+	return pauseSchedulerByKeyRangeWithTTL(ctx, pdHTTPCli, startKey, endKey, pauseTimeout)
 }
 
-func (p *PdController) pauseSchedulerByKeyRangeWithTTL(ctx context.Context,
-	startKey, endKey []byte, ttl time.Duration) (_done <-chan struct{}, err error) {
-	rule := LabelRule{
+func pauseSchedulerByKeyRangeWithTTL(
+	ctx context.Context,
+	pdHTTPCli pdhttp.Client,
+	startKey, endKey []byte,
+	ttl time.Duration,
+) (<-chan struct{}, error) {
+	rule := &pdhttp.LabelRule{
 		ID: uuid.New().String(),
-		Labels: []RegionLabel{{
+		Labels: []pdhttp.RegionLabel{{
 			Key:   "schedule",
 			Value: "deny",
 			TTL:   ttl.String(),
@@ -1033,7 +992,8 @@ func (p *PdController) pauseSchedulerByKeyRangeWithTTL(ctx context.Context,
 		}},
 	}
 	done := make(chan struct{})
-	if err := p.CreateOrUpdateRegionLabelRule(ctx, rule); err != nil {
+
+	if err := pdHTTPCli.SetRegionLabelRule(ctx, rule); err != nil {
 		close(done)
 		return nil, errors.Trace(err)
 	}
@@ -1046,7 +1006,7 @@ func (p *PdController) pauseSchedulerByKeyRangeWithTTL(ctx context.Context,
 		for {
 			select {
 			case <-ticker.C:
-				if err := p.CreateOrUpdateRegionLabelRule(ctx, rule); err != nil {
+				if err := pdHTTPCli.SetRegionLabelRule(ctx, rule); err != nil {
 					if berrors.IsContextCanceled(err) {
 						break loop
 					}
@@ -1062,7 +1022,8 @@ func (p *PdController) pauseSchedulerByKeyRangeWithTTL(ctx context.Context,
 			defer cancel()
 			// Set ttl to 0 to remove the rule.
 			rule.Labels[0].TTL = time.Duration(0).String()
-			if err := p.DeleteRegionLabelRule(recoverCtx, rule.ID); err != nil {
+			deleteRule := &pdhttp.LabelRulePatch{DeleteRules: []string{rule.ID}}
+			if err := pdHTTPCli.PatchRegionLabelRules(recoverCtx, deleteRule); err != nil {
 				log.Warn("failed to delete region label rule, the rule will be removed after ttl expires",
 					zap.String("rule-id", rule.ID), zap.Duration("ttl", ttl), zap.Error(err))
 			}
diff --git a/br/pkg/pdutil/pd_serial_test.go b/br/pkg/pdutil/pd_serial_test.go
index f76f61f6dddb9..cc2498084c686 100644
--- a/br/pkg/pdutil/pd_serial_test.go
+++ b/br/pkg/pdutil/pd_serial_test.go
@@ -12,7 +12,6 @@ import (
 	"net/http"
 	"net/http/httptest"
 	"net/url"
-	"strings"
 	"sync"
 	"testing"
 	"time"
@@ -23,7 +22,7 @@ import (
 	"github.com/pingcap/tidb/pkg/store/pdtypes"
 	"github.com/pingcap/tidb/pkg/util/codec"
 	"github.com/stretchr/testify/require"
-	pd "github.com/tikv/pd/client/http"
+	pdhttp "github.com/tikv/pd/client/http"
 )
 
 func TestScheduler(t *testing.T) {
@@ -273,7 +272,7 @@ func TestStoreInfo(t *testing.T) {
 		_ context.Context, addr string, prefix string, _ *http.Client, _ string, _ []byte,
 	) ([]byte, error) {
 		require.Equal(t,
-			fmt.Sprintf("http://mock%s", pd.StoreByID(1)),
+			fmt.Sprintf("http://mock%s", pdhttp.StoreByID(1)),
 			fmt.Sprintf("%s%s", addr, prefix))
 		ret, err := json.Marshal(storeInfo)
 		require.NoError(t, err)
@@ -306,37 +305,43 @@ func TestPauseSchedulersByKeyRange(t *testing.T) {
 		if deleted {
 			return
 		}
-		if r.Method == http.MethodDelete {
-			ruleID := strings.TrimPrefix(r.URL.Path, pd.RegionLabelRule+"/")
-			delete(labelExpires, ruleID)
+		switch r.Method {
+		case http.MethodPatch:
+			var patch pdhttp.LabelRulePatch
+			err := json.NewDecoder(r.Body).Decode(&patch)
+			require.NoError(t, err)
+			require.Len(t, patch.SetRules, 0)
+			require.Len(t, patch.DeleteRules, 1)
+			delete(labelExpires, patch.DeleteRules[0])
 			deleted = true
-			return
-		}
-		var labelRule LabelRule
-		err := json.NewDecoder(r.Body).Decode(&labelRule)
-		require.NoError(t, err)
-		require.Len(t, labelRule.Labels, 1)
-		regionLabel := labelRule.Labels[0]
-		require.Equal(t, "schedule", regionLabel.Key)
-		require.Equal(t, "deny", regionLabel.Value)
-		reqTTL, err := time.ParseDuration(regionLabel.TTL)
-		require.NoError(t, err)
-		if reqTTL == 0 {
-			delete(labelExpires, labelRule.ID)
-		} else {
-			require.Equal(t, ttl, reqTTL)
-			if expire, ok := labelExpires[labelRule.ID]; ok {
-				require.True(t, expire.After(time.Now()), "should not expire before now")
+		case http.MethodPost:
+			var labelRule LabelRule
+			err := json.NewDecoder(r.Body).Decode(&labelRule)
+			require.NoError(t, err)
+			require.Len(t, labelRule.Labels, 1)
+			regionLabel := labelRule.Labels[0]
+			require.Equal(t, "schedule", regionLabel.Key)
+			require.Equal(t, "deny", regionLabel.Value)
+			reqTTL, err := time.ParseDuration(regionLabel.TTL)
+			require.NoError(t, err)
+			if reqTTL == 0 {
+				delete(labelExpires, labelRule.ID)
+			} else {
+				require.Equal(t, ttl, reqTTL)
+				if expire, ok := labelExpires[labelRule.ID]; ok {
+					require.True(t, expire.After(time.Now()), "should not expire before now")
+				}
+				labelExpires[labelRule.ID] = time.Now().Add(ttl)
 			}
-			labelExpires[labelRule.ID] = time.Now().Add(ttl)
 		}
 	}))
 	defer httpSrv.Close()
-	pdController := &PdController{addrs: []string{httpSrv.URL}, cli: http.DefaultClient}
+	pdHTTPCli := pdhttp.NewClient("test", []string{httpSrv.URL})
+	defer pdHTTPCli.Close()
 	ctx, cancel := context.WithCancel(context.Background())
 	defer cancel()
-	done, err := pdController.pauseSchedulerByKeyRangeWithTTL(ctx, []byte{0, 0, 0, 0}, []byte{0xff, 0xff, 0xff, 0xff}, ttl)
+	done, err := pauseSchedulerByKeyRangeWithTTL(ctx, pdHTTPCli, []byte{0, 0, 0, 0}, []byte{0xff, 0xff, 0xff, 0xff}, ttl)
 	require.NoError(t, err)
 	time.Sleep(ttl * 3)
 	cancel()
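// ---------------------------------------------------------------------------
// Reviewer note, not part of the patch: a minimal Go sketch of how the
// refactored pdutil.PauseSchedulersByKeyRange is wired together with the new
// PD HTTP client, mirroring NewBackend/ImportEngine in local.go above. The
// function pauseWhileImporting, the pdAddrs/startKey/endKey values, and the
// "lightning" source name are illustrative assumptions; only the pdhttp and
// pdutil calls come from the diff itself.
package example

import (
	"context"

	"github.com/pingcap/tidb/br/pkg/pdutil"
	pdhttp "github.com/tikv/pd/client/http"
)

// pauseWhileImporting pauses PD scheduling on [startKey, endKey) while fn
// runs, then waits for the background goroutine to remove the label rule.
func pauseWhileImporting(ctx context.Context, pdAddrs []string, startKey, endKey []byte, fn func() error) error {
	// Build the PD HTTP client, as NewBackend now does (TLS options omitted here).
	pdHTTPCli := pdhttp.NewClient("lightning", pdAddrs)
	defer pdHTTPCli.Close()

	// Use a sub-context so the periodic rule refresh can be stopped when done.
	subCtx, cancel := context.WithCancel(ctx)
	defer cancel()

	// The returned channel is closed once the background goroutine has exited
	// and has asked PD to delete the "schedule=deny" label rule.
	done, err := pdutil.PauseSchedulersByKeyRange(subCtx, pdHTTPCli, startKey, endKey)
	if err != nil {
		return err
	}

	importErr := fn() // run the import while schedulers are paused
	cancel()
	<-done // wait for cleanup (or the rule is left to expire via its TTL)
	return importErr
}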