Skip to content
This repository has been archived by the owner on Nov 24, 2023. It is now read-only.

Commit

Permalink
Merge branch 'release-1.0' into updateAnsible
Browse files Browse the repository at this point in the history
  • Loading branch information
GMHDBJD authored Jun 21, 2021
2 parents ddbe95f + ac09bd4 commit 764f702
Show file tree
Hide file tree
Showing 8 changed files with 218 additions and 104 deletions.
133 changes: 95 additions & 38 deletions loader/checkpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,9 @@ type CheckPoint interface {
// GetAllRestoringFileInfo return all restoring files position
GetAllRestoringFileInfo() map[string][]int64

// IsTableCreated checks if db / table was created. set `table` to "" when check db
IsTableCreated(db, table string) bool

// IsTableFinished query if table has finished
IsTableFinished(db, table string) bool

Expand All @@ -66,6 +69,13 @@ type CheckPoint interface {

// GenSQL generates sql to update checkpoint to DB
GenSQL(filename string, offset int64) string

// UpdateOffset keeps `cp.restoringFiles` in memory same with checkpoint in DB,
// should be called after update checkpoint in DB
UpdateOffset(filename string, offset int64)

// AllFinished returns `true` when all restoring job are finished
AllFinished() bool
}

// RemoteCheckPoint implements CheckPoint by saving status in remote database system, mostly in TiDB.
Expand All @@ -80,7 +90,10 @@ type RemoteCheckPoint struct {
id string
schema string
tableName string // tableName contains schema name
restoringFiles map[string]map[string]FilePosSet
restoringFiles struct {
sync.RWMutex
pos map[string]map[string]FilePosSet // schema -> table -> FilePosSet(filename -> [cur, end])
}
finishedTables map[string]struct{}
logCtx *tcontext.Context
}
Expand All @@ -95,12 +108,12 @@ func newRemoteCheckPoint(tctx *tcontext.Context, cfg *config.SubTaskConfig, id s
db: db,
conn: dbConns[0],
id: id,
restoringFiles: make(map[string]map[string]FilePosSet),
finishedTables: make(map[string]struct{}),
schema: dbutil.ColumnName(cfg.MetaSchema),
tableName: dbutil.TableName(cfg.MetaSchema, fmt.Sprintf("%s_loader_checkpoint", cfg.Name)),
logCtx: tcontext.Background().WithLogger(tctx.L().WithFields(zap.String("component", "remote checkpoint"))),
}
cp.restoringFiles.pos = make(map[string]map[string]FilePosSet)

err = cp.prepare(tctx)
if err != nil {
Expand Down Expand Up @@ -174,17 +187,19 @@ func (cp *RemoteCheckPoint) Load(tctx *tcontext.Context) error {
endPos int64
)

cp.restoringFiles = make(map[string]map[string]FilePosSet) // reset to empty
cp.restoringFiles.Lock()
defer cp.restoringFiles.Unlock()
cp.restoringFiles.pos = make(map[string]map[string]FilePosSet) // reset to empty
for rows.Next() {
err := rows.Scan(&filename, &schema, &table, &offset, &endPos)
if err != nil {
return terror.WithScope(terror.DBErrorAdapt(err, terror.ErrDBDriverError), terror.ScopeDownstream)
}

if _, ok := cp.restoringFiles[schema]; !ok {
cp.restoringFiles[schema] = make(map[string]FilePosSet)
if _, ok := cp.restoringFiles.pos[schema]; !ok {
cp.restoringFiles.pos[schema] = make(map[string]FilePosSet)
}
tables := cp.restoringFiles[schema]
tables := cp.restoringFiles.pos[schema]
if _, ok := tables[table]; !ok {
tables[table] = make(map[string][]int64)
}
Expand All @@ -197,27 +212,53 @@ func (cp *RemoteCheckPoint) Load(tctx *tcontext.Context) error {

// GetRestoringFileInfo implements CheckPoint.GetRestoringFileInfo
func (cp *RemoteCheckPoint) GetRestoringFileInfo(db, table string) map[string][]int64 {
if tables, ok := cp.restoringFiles[db]; ok {
cp.restoringFiles.RLock()
defer cp.restoringFiles.RUnlock()
results := make(map[string][]int64)
if tables, ok := cp.restoringFiles.pos[db]; ok {
if restoringFiles, ok := tables[table]; ok {
return restoringFiles
// make a copy of restoringFiles, and its slice value
for k, v := range restoringFiles {
results[k] = make([]int64, len(v))
copy(results[k], v)
}
return results
}
}
return make(map[string][]int64)
return results
}

// GetAllRestoringFileInfo implements CheckPoint.GetAllRestoringFileInfo
func (cp *RemoteCheckPoint) GetAllRestoringFileInfo() map[string][]int64 {
cp.restoringFiles.RLock()
defer cp.restoringFiles.RUnlock()
results := make(map[string][]int64)
for _, tables := range cp.restoringFiles {
for _, tables := range cp.restoringFiles.pos {
for _, files := range tables {
for file, pos := range files {
results[file] = pos
results[file] = make([]int64, len(pos))
copy(results[file], pos)
}
}
}
return results
}

// IsTableCreated implements CheckPoint.IsTableCreated.
// With a non-empty `table` it reports whether the table has a checkpoint
// entry; with table == "" it reports whether the schema `db` has one.
func (cp *RemoteCheckPoint) IsTableCreated(db, table string) bool {
	cp.restoringFiles.RLock()
	defer cp.restoringFiles.RUnlock()
	if tables, ok := cp.restoringFiles.pos[db]; ok {
		// the schema exists; an empty table name means "check db only"
		if table == "" {
			return true
		}
		_, found := tables[table]
		return found
	}
	return false
}

// IsTableFinished implements CheckPoint.IsTableFinished
func (cp *RemoteCheckPoint) IsTableFinished(db, table string) bool {
key := strings.Join([]string{db, table}, ".")
Expand All @@ -229,8 +270,10 @@ func (cp *RemoteCheckPoint) IsTableFinished(db, table string) bool {

// CalcProgress implements CheckPoint.CalcProgress
func (cp *RemoteCheckPoint) CalcProgress(allFiles map[string]Tables2DataFiles) error {
cp.restoringFiles.RLock()
defer cp.restoringFiles.RUnlock()
cp.finishedTables = make(map[string]struct{}) // reset to empty
for db, tables := range cp.restoringFiles {
for db, tables := range cp.restoringFiles.pos {
dbTables, ok := allFiles[db]
if !ok {
return terror.ErrCheckpointDBNotExistInFile.Generate(db)
Expand All @@ -257,7 +300,7 @@ func (cp *RemoteCheckPoint) CalcProgress(allFiles map[string]Tables2DataFiles) e
}
}

cp.logCtx.L().Info("calculate checkpoint finished.", zap.Reflect("finished tables", cp.finishedTables))
cp.logCtx.L().Info("calculate checkpoint finished.", zap.Any("finished tables", cp.finishedTables))
return nil
}

Expand All @@ -274,20 +317,27 @@ func (cp *RemoteCheckPoint) allFilesFinished(files map[string][]int64) bool {
return true
}

// AllFinished implements CheckPoint.AllFinished.
// It reports whether every data file of every table recorded in the
// checkpoint has reached its end position.
func (cp *RemoteCheckPoint) AllFinished() bool {
	cp.restoringFiles.RLock()
	defer cp.restoringFiles.RUnlock()
	for _, tableFiles := range cp.restoringFiles.pos {
		for _, files := range tableFiles {
			// one unfinished file is enough to answer "no"
			if !cp.allFilesFinished(files) {
				return false
			}
		}
	}
	return true
}

// Init implements CheckPoint.Init
func (cp *RemoteCheckPoint) Init(tctx *tcontext.Context, filename string, endPos int64) error {
idx := strings.LastIndex(filename, ".sql")
if idx < 0 {
return terror.ErrCheckpointInvalidTableFile.Generate(filename)
}
fname := filename[:idx]
fields := strings.Split(fname, ".")
if len(fields) != 2 && len(fields) != 3 {
// fields[0] -> db name, fields[1] -> table name
schema, table, err := getDBAndTableFromFilename(filename)
if err != nil {
return terror.ErrCheckpointInvalidTableFile.Generate(filename)
}

// fields[0] -> db name, fields[1] -> table name
schema, table := fields[0], fields[1]
sql2 := fmt.Sprintf("INSERT INTO %s (`id`, `filename`, `cp_schema`, `cp_table`, `offset`, `end_pos`) VALUES(?,?,?,?,?,?)", cp.tableName)
cp.logCtx.L().Info("initial checkpoint record",
zap.String("sql", sql2),
Expand All @@ -299,20 +349,22 @@ func (cp *RemoteCheckPoint) Init(tctx *tcontext.Context, filename string, endPos
zap.Int64("end position", endPos))
args := []interface{}{cp.id, filename, schema, table, 0, endPos}
cp.connMutex.Lock()
err := cp.conn.executeSQL(tctx, []string{sql2}, args)
err = cp.conn.executeSQL(tctx, []string{sql2}, args)
cp.connMutex.Unlock()
if err != nil {
if isErrDupEntry(err) {
cp.logCtx.L().Info("checkpoint record already exists, skip it.", zap.String("id", cp.id), zap.String("filename", filename))
cp.logCtx.L().Error("checkpoint record already exists, skip it.", zap.String("id", cp.id), zap.String("filename", filename))
return nil
}
return terror.WithScope(terror.Annotate(err, "initialize checkpoint"), terror.ScopeDownstream)
}
// checkpoint not exists and no error, cache endPos in memory
if _, ok := cp.restoringFiles[schema]; !ok {
cp.restoringFiles[schema] = make(map[string]FilePosSet)
cp.restoringFiles.Lock()
defer cp.restoringFiles.Unlock()
if _, ok := cp.restoringFiles.pos[schema]; !ok {
cp.restoringFiles.pos[schema] = make(map[string]FilePosSet)
}
tables := cp.restoringFiles[schema]
tables := cp.restoringFiles.pos[schema]
if _, ok := tables[table]; !ok {
tables[table] = make(map[string][]int64)
}
Expand Down Expand Up @@ -345,6 +397,18 @@ func (cp *RemoteCheckPoint) GenSQL(filename string, offset int64) string {
return sql
}

// UpdateOffset implements CheckPoint.UpdateOffset.
// It keeps the in-memory `cp.restoringFiles` consistent with the checkpoint
// stored in DB, and should be called right after the DB checkpoint is updated.
func (cp *RemoteCheckPoint) UpdateOffset(filename string, offset int64) {
	cp.restoringFiles.Lock()
	defer cp.restoringFiles.Unlock()
	db, table, err := getDBAndTableFromFilename(filename)
	if err != nil {
		cp.logCtx.L().Error("error in checkpoint UpdateOffset", zap.Error(err))
		return
	}
	// guard the write: indexing `[0]` on a missing (nil) position slice would
	// panic if this file was never registered via Init/Load
	if pos, ok := cp.restoringFiles.pos[db][table][filename]; ok && len(pos) > 0 {
		pos[0] = offset
		return
	}
	cp.logCtx.L().Error("no checkpoint entry found in UpdateOffset", zap.String("filename", filename))
}

// Clear implements CheckPoint.Clear
func (cp *RemoteCheckPoint) Clear(tctx *tcontext.Context) error {
sql2 := fmt.Sprintf("DELETE FROM %s WHERE `id` = '%s'", cp.tableName, cp.id)
Expand Down Expand Up @@ -379,17 +443,10 @@ func (cp *RemoteCheckPoint) Count(tctx *tcontext.Context) (int, error) {
}

func (cp *RemoteCheckPoint) String() string {
// `String` is often used to log something, it's not a big problem even fail,
// so 1min should be enough.
tctx2, cancel := cp.logCtx.WithTimeout(time.Minute)
defer cancel()

if err := cp.Load(tctx2); err != nil {
return err.Error()
}

cp.restoringFiles.RLock()
defer cp.restoringFiles.RUnlock()
result := make(map[string][]int64)
for _, tables := range cp.restoringFiles {
for _, tables := range cp.restoringFiles.pos {
for _, files := range tables {
for file, set := range files {
result[file] = set
Expand Down
54 changes: 54 additions & 0 deletions loader/checkpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,14 @@ func (t *testCheckPointSuite) TestForDB(c *C) {
{"db1.tbl3.sql", 789},
}

allFiles := map[string]Tables2DataFiles{
"db1": {
"tbl1": {cases[0].filename},
"tbl2": {cases[1].filename},
"tbl3": {cases[2].filename},
},
}

id := "test_for_db"
tctx := tcontext.Background()
cp, err := newRemoteCheckPoint(tctx, t.cfg, id)
Expand All @@ -89,12 +97,26 @@ func (t *testCheckPointSuite) TestForDB(c *C) {
c.Assert(err, IsNil)
c.Assert(count, Equals, 0)

c.Assert(cp.IsTableCreated("db1", ""), IsFalse)
c.Assert(cp.IsTableCreated("db1", "tbl1"), IsFalse)
c.Assert(cp.CalcProgress(allFiles), IsNil)
c.Assert(cp.IsTableFinished("db1", "tbl1"), IsFalse)

// insert default checkpoints
for _, cs := range cases {
err = cp.Init(tctx, cs.filename, cs.endPos)
c.Assert(err, IsNil)
}

c.Assert(cp.IsTableCreated("db1", ""), IsTrue)
c.Assert(cp.IsTableCreated("db1", "tbl1"), IsTrue)
c.Assert(cp.CalcProgress(allFiles), IsNil)
c.Assert(cp.IsTableFinished("db1", "tbl1"), IsFalse)

info := cp.GetRestoringFileInfo("db1", "tbl1")
c.Assert(info, HasLen, 1)
c.Assert(info[cases[0].filename], DeepEquals, []int64{0, cases[0].endPos})

err = cp.Load(tctx)
c.Assert(err, IsNil)

Expand Down Expand Up @@ -129,6 +151,15 @@ func (t *testCheckPointSuite) TestForDB(c *C) {
err = cp.Load(tctx)
c.Assert(err, IsNil)

c.Assert(cp.IsTableCreated("db1", ""), IsTrue)
c.Assert(cp.IsTableCreated("db1", "tbl1"), IsTrue)
c.Assert(cp.CalcProgress(allFiles), IsNil)
c.Assert(cp.IsTableFinished("db1", "tbl1"), IsTrue)

info = cp.GetRestoringFileInfo("db1", "tbl1")
c.Assert(info, HasLen, 1)
c.Assert(info[cases[0].filename], DeepEquals, []int64{cases[0].endPos, cases[0].endPos})

infos = cp.GetAllRestoringFileInfo()
c.Assert(len(infos), Equals, len(cases))
for _, cs := range cases {
Expand All @@ -150,6 +181,11 @@ func (t *testCheckPointSuite) TestForDB(c *C) {
err = cp.Load(tctx)
c.Assert(err, IsNil)

c.Assert(cp.IsTableCreated("db1", ""), IsFalse)
c.Assert(cp.IsTableCreated("db1", "tbl1"), IsFalse)
c.Assert(cp.CalcProgress(allFiles), IsNil)
c.Assert(cp.IsTableFinished("db1", "tbl1"), IsFalse)

infos = cp.GetAllRestoringFileInfo()
c.Assert(len(infos), Equals, 0)

Expand All @@ -158,3 +194,21 @@ func (t *testCheckPointSuite) TestForDB(c *C) {
c.Assert(err, IsNil)
c.Assert(count, Equals, 0)
}

// TestDeepCopy verifies that GetRestoringFileInfo and GetAllRestoringFileInfo
// return deep copies which stay unchanged when the checkpoint is mutated later.
func (t *testCheckPointSuite) TestDeepCopy(c *C) {
	var cp RemoteCheckPoint
	cp.restoringFiles.pos = map[string]map[string]FilePosSet{
		"db": {
			"table": {
				"file": {0, 100},
			},
		},
	}

	got := cp.GetRestoringFileInfo("db", "table")
	// mutate both an existing slice element and the map itself
	cp.restoringFiles.pos["db"]["table"]["file"][0] = 10
	cp.restoringFiles.pos["db"]["table"]["file2"] = []int64{0, 100}
	c.Assert(got, DeepEquals, map[string][]int64{"file": {0, 100}})

	got = cp.GetAllRestoringFileInfo()
	cp.restoringFiles.pos["db"]["table"]["file"][0] = 20
	cp.restoringFiles.pos["db"]["table"]["file3"] = []int64{0, 100}
	c.Assert(got, DeepEquals, map[string][]int64{"file": {10, 100}, "file2": {0, 100}})
}
Loading

0 comments on commit 764f702

Please sign in to comment.