Skip to content

Commit

Permalink
feat: add an e2e test to verify the assumptions hold for all topo ser…
Browse files Browse the repository at this point in the history
…vers

Signed-off-by: Manan Gupta <[email protected]>
  • Loading branch information
GuptaManan100 committed Jan 16, 2025
1 parent 30ad2e4 commit 0a51b8f
Show file tree
Hide file tree
Showing 3 changed files with 165 additions and 0 deletions.
55 changes: 55 additions & 0 deletions go/test/endtoend/topotest/consul/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,9 @@ import (

topoutils "vitess.io/vitess/go/test/endtoend/topotest/utils"
"vitess.io/vitess/go/vt/log"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
"vitess.io/vitess/go/vt/topo"
"vitess.io/vitess/go/vt/vtctl/reparentutil/policy"

"github.com/google/go-cmp/cmp"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -221,6 +223,59 @@ func TestKeyspaceLocking(t *testing.T) {
topoutils.WaitForBoolValue(t, &secondThreadLockAcquired, true)
}

// TestWatchAllKeyspaceRecords tests the WatchAllKeyspaceRecords method.
// We test out different updates and see if we receive the correct update
// from the watch.
func TestWatchAllKeyspaceRecords(t *testing.T) {
// Create the topo server connection.
ts, err := topo.OpenServer(*clusterInstance.TopoFlavorString(), clusterInstance.VtctlProcess.TopoGlobalAddress, clusterInstance.VtctlProcess.TopoGlobalRoot)
require.NoError(t, err)

watchCtx, watchCancel := context.WithCancel(context.Background())
defer watchCancel()
initRecords, ch, err := ts.WatchAllKeyspaceRecords(watchCtx)
require.NoError(t, err)

// Check that we have the initial records.
// The existing keyspace record should be seen.
require.Len(t, initRecords, 1)
require.EqualValues(t, KeyspaceName, initRecords[0].KeyspaceInfo.KeyspaceName())

// Create a new keyspace record and see that we receive an update.
newKeyspaceName := "ksTest"
err = ts.CreateKeyspace(context.Background(), newKeyspaceName, &topodatapb.Keyspace{
KeyspaceType: topodatapb.KeyspaceType_NORMAL,
DurabilityPolicy: policy.DurabilitySemiSync,
})
require.NoError(t, err)
defer func() {
err = ts.DeleteKeyspace(context.Background(), newKeyspaceName)
require.NoError(t, err)
}()

// Wait to receive an update from the watch.
record := <-ch
require.EqualValues(t, newKeyspaceName, record.KeyspaceInfo.KeyspaceName())
require.EqualValues(t, policy.DurabilitySemiSync, record.KeyspaceInfo.Keyspace.DurabilityPolicy)

// Update the keyspace record and see that we receive an update.
func() {
ki, err := ts.GetKeyspace(context.Background(), newKeyspaceName)
require.NoError(t, err)
ctx, unlock, err := ts.LockKeyspace(context.Background(), newKeyspaceName, "TestWatchAllKeyspaceRecords")
require.NoError(t, err)
defer unlock(&err)
ki.DurabilityPolicy = policy.DurabilityCrossCell
err = ts.UpdateKeyspace(ctx, ki)
require.NoError(t, err)
}()

// Wait to receive an update from the watch.
record = <-ch
require.EqualValues(t, newKeyspaceName, record.KeyspaceInfo.KeyspaceName())
require.EqualValues(t, policy.DurabilityCrossCell, record.KeyspaceInfo.Keyspace.DurabilityPolicy)
}

func execute(t *testing.T, conn *mysql.Conn, query string) *sqltypes.Result {
t.Helper()
qr, err := conn.ExecuteFetch(query, 1000, true)
Expand Down
55 changes: 55 additions & 0 deletions go/test/endtoend/topotest/etcd2/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,9 @@ import (

topoutils "vitess.io/vitess/go/test/endtoend/topotest/utils"
"vitess.io/vitess/go/test/endtoend/utils"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
"vitess.io/vitess/go/vt/topo"
"vitess.io/vitess/go/vt/vtctl/reparentutil/policy"

"vitess.io/vitess/go/vt/log"

Expand Down Expand Up @@ -268,6 +270,59 @@ func TestNamedLocking(t *testing.T) {
topoutils.WaitForBoolValue(t, &secondCallerAcquired, true)
}

// TestWatchAllKeyspaceRecords tests the WatchAllKeyspaceRecords method.
// We test out different updates and see if we receive the correct update
// from the watch.
func TestWatchAllKeyspaceRecords(t *testing.T) {
// Create the topo server connection.
ts, err := topo.OpenServer(*clusterInstance.TopoFlavorString(), clusterInstance.VtctlProcess.TopoGlobalAddress, clusterInstance.VtctlProcess.TopoGlobalRoot)
require.NoError(t, err)

watchCtx, watchCancel := context.WithCancel(context.Background())
defer watchCancel()
initRecords, ch, err := ts.WatchAllKeyspaceRecords(watchCtx)
require.NoError(t, err)

// Check that we have the initial records.
// The existing keyspace record should be seen.
require.Len(t, initRecords, 1)
require.EqualValues(t, KeyspaceName, initRecords[0].KeyspaceInfo.KeyspaceName())

// Create a new keyspace record and see that we receive an update.
newKeyspaceName := "ksTest"
err = ts.CreateKeyspace(context.Background(), newKeyspaceName, &topodatapb.Keyspace{
KeyspaceType: topodatapb.KeyspaceType_NORMAL,
DurabilityPolicy: policy.DurabilitySemiSync,
})
require.NoError(t, err)
defer func() {
err = ts.DeleteKeyspace(context.Background(), newKeyspaceName)
require.NoError(t, err)
}()

// Wait to receive an update from the watch.
record := <-ch
require.EqualValues(t, newKeyspaceName, record.KeyspaceInfo.KeyspaceName())
require.EqualValues(t, policy.DurabilitySemiSync, record.KeyspaceInfo.Keyspace.DurabilityPolicy)

// Update the keyspace record and see that we receive an update.
func() {
ki, err := ts.GetKeyspace(context.Background(), newKeyspaceName)
require.NoError(t, err)
ctx, unlock, err := ts.LockKeyspace(context.Background(), newKeyspaceName, "TestWatchAllKeyspaceRecords")
require.NoError(t, err)
defer unlock(&err)
ki.DurabilityPolicy = policy.DurabilityCrossCell
err = ts.UpdateKeyspace(ctx, ki)
require.NoError(t, err)
}()

// Wait to receive an update from the watch.
record = <-ch
require.EqualValues(t, newKeyspaceName, record.KeyspaceInfo.KeyspaceName())
require.EqualValues(t, policy.DurabilityCrossCell, record.KeyspaceInfo.Keyspace.DurabilityPolicy)
}

func execMulti(t *testing.T, conn *mysql.Conn, query string) []*sqltypes.Result {
t.Helper()
var res []*sqltypes.Result
Expand Down
55 changes: 55 additions & 0 deletions go/test/endtoend/topotest/zk2/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,9 @@ import (

topoutils "vitess.io/vitess/go/test/endtoend/topotest/utils"
"vitess.io/vitess/go/test/endtoend/utils"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
"vitess.io/vitess/go/vt/topo"
"vitess.io/vitess/go/vt/vtctl/reparentutil/policy"

"vitess.io/vitess/go/vt/log"

Expand Down Expand Up @@ -197,6 +199,59 @@ func TestKeyspaceLocking(t *testing.T) {
topoutils.WaitForBoolValue(t, &secondThreadLockAcquired, true)
}

// TestWatchAllKeyspaceRecords tests the WatchAllKeyspaceRecords method.
// We test out different updates and see if we receive the correct update
// from the watch.
func TestWatchAllKeyspaceRecords(t *testing.T) {
// Create the topo server connection.
ts, err := topo.OpenServer(*clusterInstance.TopoFlavorString(), clusterInstance.VtctlProcess.TopoGlobalAddress, clusterInstance.VtctlProcess.TopoGlobalRoot)
require.NoError(t, err)

watchCtx, watchCancel := context.WithCancel(context.Background())
defer watchCancel()
initRecords, ch, err := ts.WatchAllKeyspaceRecords(watchCtx)
require.NoError(t, err)

// Check that we have the initial records.
// The existing keyspace record should be seen.
require.Len(t, initRecords, 1)
require.EqualValues(t, KeyspaceName, initRecords[0].KeyspaceInfo.KeyspaceName())

// Create a new keyspace record and see that we receive an update.
newKeyspaceName := "ksTest"
err = ts.CreateKeyspace(context.Background(), newKeyspaceName, &topodatapb.Keyspace{
KeyspaceType: topodatapb.KeyspaceType_NORMAL,
DurabilityPolicy: policy.DurabilitySemiSync,
})
require.NoError(t, err)
defer func() {
err = ts.DeleteKeyspace(context.Background(), newKeyspaceName)
require.NoError(t, err)
}()

// Wait to receive an update from the watch.
record := <-ch
require.EqualValues(t, newKeyspaceName, record.KeyspaceInfo.KeyspaceName())
require.EqualValues(t, policy.DurabilitySemiSync, record.KeyspaceInfo.Keyspace.DurabilityPolicy)

// Update the keyspace record and see that we receive an update.
func() {
ki, err := ts.GetKeyspace(context.Background(), newKeyspaceName)
require.NoError(t, err)
ctx, unlock, err := ts.LockKeyspace(context.Background(), newKeyspaceName, "TestWatchAllKeyspaceRecords")
require.NoError(t, err)
defer unlock(&err)
ki.DurabilityPolicy = policy.DurabilityCrossCell
err = ts.UpdateKeyspace(ctx, ki)
require.NoError(t, err)
}()

// Wait to receive an update from the watch.
record = <-ch
require.EqualValues(t, newKeyspaceName, record.KeyspaceInfo.KeyspaceName())
require.EqualValues(t, policy.DurabilityCrossCell, record.KeyspaceInfo.Keyspace.DurabilityPolicy)
}

func execMulti(t *testing.T, conn *mysql.Conn, query string) []*sqltypes.Result {
t.Helper()
var res []*sqltypes.Result
Expand Down

0 comments on commit 0a51b8f

Please sign in to comment.