Skip to content

Commit

Permalink
feat: rename varaibles, add comments and one more test
Browse files Browse the repository at this point in the history
Signed-off-by: Manan Gupta <[email protected]>
  • Loading branch information
GuptaManan100 committed Jan 16, 2025
1 parent 67aadff commit 30ad2e4
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 23 deletions.
32 changes: 20 additions & 12 deletions go/vt/topo/keyspace.go
Original file line number Diff line number Diff line change
Expand Up @@ -439,28 +439,30 @@ func (ts *Server) GetShardNames(ctx context.Context, keyspace string) ([]string,
return DirEntriesToStringArray(children), err
}

// WatchAllKeyspacePrefixData wraps the data we receive on the watch recursive channel
// The WatchKeyspacePrefix API guarantees exactly one of Value or Err will be set.
type WatchAllKeyspacePrefixData struct {
// WatchKeyspacePrefixData wraps the data we receive on the watch recursive channel
// The WatchAllKeyspaceRecords API guarantees exactly one of Value or Err will be set.
type WatchKeyspacePrefixData struct {
KeyspaceInfo *KeyspaceInfo
Err error
}

// WatchAllKeyspaceRecords will set a watch on the Keyspace prefix.
// It has the same contract as conn.WatchRecursive, but it also unpacks the
// contents into a Keyspace object.
func (ts *Server) WatchAllKeyspaceRecords(ctx context.Context) ([]*WatchAllKeyspacePrefixData, <-chan *WatchAllKeyspacePrefixData, error) {
func (ts *Server) WatchAllKeyspaceRecords(ctx context.Context) ([]*WatchKeyspacePrefixData, <-chan *WatchKeyspacePrefixData, error) {
if err := ctx.Err(); err != nil {
return nil, nil, err
}

ctx, cancel := context.WithCancel(ctx)

// Set up a recursive watch on the KeyspacesPath.
current, wdChannel, err := ts.globalCell.WatchRecursive(ctx, KeyspacesPath)
if err != nil {
cancel()
return nil, nil, err
}
// Unpack the initial data.
initialRes, err := checkAndUnpackKeyspaceRecord(current...)
if err != nil {
// Cancel the watch, drain channel.
Expand All @@ -470,7 +472,7 @@ func (ts *Server) WatchAllKeyspaceRecords(ctx context.Context) ([]*WatchAllKeysp
return nil, nil, vterrors.Wrapf(err, "error unpacking initial Keyspace objects")
}

changes := make(chan *WatchAllKeyspacePrefixData, 10)
changes := make(chan *WatchKeyspacePrefixData, 10)
// The background routine reads any event from the watch channel,
// translates it, and sends it to the caller.
// If cancel() is called, the underlying WatchRecursive() code will
Expand All @@ -485,7 +487,7 @@ func (ts *Server) WatchAllKeyspaceRecords(ctx context.Context) ([]*WatchAllKeysp
// Last error value, we're done.
// wdChannel will be closed right after
// this, no need to do anything.
changes <- &WatchAllKeyspacePrefixData{Err: wd.Err}
changes <- &WatchKeyspacePrefixData{Err: wd.Err}
return
}

Expand All @@ -494,10 +496,13 @@ func (ts *Server) WatchAllKeyspaceRecords(ctx context.Context) ([]*WatchAllKeysp
cancel()
for range wdChannel {
}
changes <- &WatchAllKeyspacePrefixData{Err: vterrors.Wrapf(err, "error unpacking object")}
changes <- &WatchKeyspacePrefixData{Err: vterrors.Wrapf(err, "error unpacking object")}
return
}

// Each update will only have a single object, if at all.
// We get updates for all objects in the prefix, but we only
// care about the keyspace objects.
if len(res) == 0 {
continue
}
Expand All @@ -508,17 +513,20 @@ func (ts *Server) WatchAllKeyspaceRecords(ctx context.Context) ([]*WatchAllKeysp
return initialRes, changes, nil
}

func checkAndUnpackKeyspaceRecord(wds ...*WatchDataRecursive) ([]*WatchAllKeyspacePrefixData, error) {
var res []*WatchAllKeyspacePrefixData
// checkAndUnpackKeyspaceRecord checks for Keyspace objects and unpacks them.
func checkAndUnpackKeyspaceRecord(wds ...*WatchDataRecursive) ([]*WatchKeyspacePrefixData, error) {
var res []*WatchKeyspacePrefixData
for _, wd := range wds {
ksDir, fileType := path.Split(wd.Path)
fileDir, fileType := path.Split(wd.Path)
// Check if the file is a keyspace record.
// If it is, then we unpack it.
if fileType == KeyspaceFile {
value := &topodatapb.Keyspace{}
if err := value.UnmarshalVT(wd.Contents); err != nil {
return nil, err
}
res = append(res, &WatchAllKeyspacePrefixData{
KeyspaceInfo: NewKeyspaceInfo(path.Base(ksDir), value),
res = append(res, &WatchKeyspacePrefixData{
KeyspaceInfo: NewKeyspaceInfo(path.Base(fileDir), value),
})
}
}
Expand Down
41 changes: 30 additions & 11 deletions go/vt/topo/keyspace_external_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,9 @@ func TestServerGetServingShards(t *testing.T) {
}
}

// TestWatchAllKeyspaceRecords tests the WatchAllKeyspaceRecords method.
// We test out different updates and see if we receive the correct update
// from the watch.
func TestWatchAllKeyspaceRecords(t *testing.T) {
ksDef := &topodatapb.Keyspace{
KeyspaceType: topodatapb.KeyspaceType_NORMAL,
Expand All @@ -234,8 +237,8 @@ func TestWatchAllKeyspaceRecords(t *testing.T) {
name string
setupFunc func(t *testing.T, ts *topo.Server)
updateFunc func(t *testing.T, ts *topo.Server)
wantInitRecords []*topo.WatchAllKeyspacePrefixData
wantChanRecords []*topo.WatchAllKeyspacePrefixData
wantInitRecords []*topo.WatchKeyspacePrefixData
wantChanRecords []*topo.WatchKeyspacePrefixData
}{
{
name: "Update Durability Policy in 1 Keyspace",
Expand All @@ -253,12 +256,12 @@ func TestWatchAllKeyspaceRecords(t *testing.T) {
err = ts.UpdateKeyspace(ctx, ki)
require.NoError(t, err)
},
wantInitRecords: []*topo.WatchAllKeyspacePrefixData{
wantInitRecords: []*topo.WatchKeyspacePrefixData{
{
KeyspaceInfo: topo.NewKeyspaceInfo("ks", ksDef),
},
},
wantChanRecords: []*topo.WatchAllKeyspacePrefixData{
wantChanRecords: []*topo.WatchKeyspacePrefixData{
{
KeyspaceInfo: topo.NewKeyspaceInfo("ks", &topodatapb.Keyspace{
KeyspaceType: topodatapb.KeyspaceType_NORMAL,
Expand All @@ -268,6 +271,21 @@ func TestWatchAllKeyspaceRecords(t *testing.T) {
},
},
},
{
name: "New Keyspace Created",
setupFunc: func(t *testing.T, ts *topo.Server) {
},
updateFunc: func(t *testing.T, ts *topo.Server) {
err := ts.CreateKeyspace(context.Background(), "ks", ksDef)
require.NoError(t, err)
},
wantInitRecords: []*topo.WatchKeyspacePrefixData{},
wantChanRecords: []*topo.WatchKeyspacePrefixData{
{
KeyspaceInfo: topo.NewKeyspaceInfo("ks", ksDef),
},
},
},
{
name: "Update KeyspaceType in 1 Keyspace",
setupFunc: func(t *testing.T, ts *topo.Server) {
Expand All @@ -284,12 +302,12 @@ func TestWatchAllKeyspaceRecords(t *testing.T) {
err = ts.UpdateKeyspace(ctx, ki)
require.NoError(t, err)
},
wantInitRecords: []*topo.WatchAllKeyspacePrefixData{
wantInitRecords: []*topo.WatchKeyspacePrefixData{
{
KeyspaceInfo: topo.NewKeyspaceInfo("ks", ksDef),
},
},
wantChanRecords: []*topo.WatchAllKeyspacePrefixData{
wantChanRecords: []*topo.WatchKeyspacePrefixData{
{
KeyspaceInfo: topo.NewKeyspaceInfo("ks", &topodatapb.Keyspace{
KeyspaceType: topodatapb.KeyspaceType_SNAPSHOT,
Expand Down Expand Up @@ -338,15 +356,15 @@ func TestWatchAllKeyspaceRecords(t *testing.T) {
require.NoError(t, err)
}()
},
wantInitRecords: []*topo.WatchAllKeyspacePrefixData{
wantInitRecords: []*topo.WatchKeyspacePrefixData{
{
KeyspaceInfo: topo.NewKeyspaceInfo("ks", ksDef),
},
{
KeyspaceInfo: topo.NewKeyspaceInfo("ks2", ksDef),
},
},
wantChanRecords: []*topo.WatchAllKeyspacePrefixData{
wantChanRecords: []*topo.WatchKeyspacePrefixData{
{
KeyspaceInfo: topo.NewKeyspaceInfo("ks", &topodatapb.Keyspace{
KeyspaceType: topodatapb.KeyspaceType_SNAPSHOT,
Expand Down Expand Up @@ -381,7 +399,7 @@ func TestWatchAllKeyspaceRecords(t *testing.T) {
defer watchCancel()
initRecords, ch, err := ts.WatchAllKeyspaceRecords(watchCtx)
require.NoError(t, err)
elementsMatchFunc(t, tt.wantInitRecords, initRecords, keyspacePrefixDataMatches)
elementsMatchFunc(t, tt.wantInitRecords, initRecords, watchKeyspacePrefixDataMatches)

// We start a go routine to collect all the records from the channel.
wg := sync.WaitGroup{}
Expand All @@ -393,7 +411,7 @@ func TestWatchAllKeyspaceRecords(t *testing.T) {
if topo.IsErrType(data.Err, topo.Interrupted) {
continue
}
require.True(t, keyspacePrefixDataMatches(tt.wantChanRecords[idx], data))
require.True(t, watchKeyspacePrefixDataMatches(tt.wantChanRecords[idx], data))
idx++
// Stop the watch after we have verified we have received all required updates.
if idx == len(tt.wantChanRecords) {
Expand Down Expand Up @@ -436,7 +454,8 @@ func elementsMatchFunc[T any](t *testing.T, expected, actual []T, equalFn func(a
}
}

func keyspacePrefixDataMatches(a, b *topo.WatchAllKeyspacePrefixData) bool {
// watchKeyspacePrefixDataMatches is a helper function to check equality of two topo.WatchKeyspacePrefixData.
func watchKeyspacePrefixDataMatches(a, b *topo.WatchKeyspacePrefixData) bool {
if a == nil || b == nil {
return a == b
}
Expand Down

0 comments on commit 30ad2e4

Please sign in to comment.