diff --git a/tests/e2e/cluster_downgrade_test.go b/tests/e2e/cluster_downgrade_test.go
index f83b8fc0726..086b80894d5 100644
--- a/tests/e2e/cluster_downgrade_test.go
+++ b/tests/e2e/cluster_downgrade_test.go
@@ -16,16 +16,13 @@ package e2e
 
 import (
 	"context"
-	"encoding/json"
 	"fmt"
-	"strings"
 	"testing"
 	"time"
 
 	"github.com/coreos/go-semver/semver"
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
-	"go.uber.org/zap"
 
 	"go.etcd.io/etcd/api/v3/version"
 	"go.etcd.io/etcd/client/pkg/v3/fileutil"
@@ -38,7 +35,6 @@ import (
 	"go.etcd.io/etcd/server/v3/storage/datadir"
 	"go.etcd.io/etcd/tests/v3/framework/config"
 	"go.etcd.io/etcd/tests/v3/framework/e2e"
-	"go.etcd.io/etcd/tests/v3/framework/testutils"
 )
 
 func TestDowngradeUpgradeClusterOf1(t *testing.T) {
@@ -78,7 +74,6 @@ func testDowngradeUpgrade(t *testing.T, clusterSize int, triggerSnapshot bool) {
 	lastClusterVersion := semver.New(lastVersionStr)
 	lastClusterVersion.Patch = 0
-	lastClusterVersionStr := lastClusterVersion.String()
 
 	e2e.BeforeTest(t)
@@ -86,7 +81,7 @@ func testDowngradeUpgrade(t *testing.T, clusterSize int, triggerSnapshot bool) {
 	var snapshotCount uint64 = 10
 	epc := newCluster(t, clusterSize, snapshotCount)
 	for i := 0; i < len(epc.Procs); i++ {
-		validateVersion(t, epc.Cfg, epc.Procs[i], version.Versions{
+		e2e.ValidateVersion(t, epc.Cfg, epc.Procs[i], version.Versions{
 			Cluster: currentVersionStr,
 			Server:  version.Version,
 			Storage: currentVersionStr,
@@ -112,35 +107,10 @@ func testDowngradeUpgrade(t *testing.T, clusterSize int, triggerSnapshot bool) {
 	require.NoError(t, err)
 	beforeMembers, beforeKV := getMembersAndKeys(t, cc)
 
-	t.Logf("etcdctl downgrade enable %s", lastVersionStr)
-	downgradeEnable(t, epc, lastVersion)
-
-	t.Log("Downgrade enabled, validating if cluster is ready for downgrade")
-	for i := 0; i < len(epc.Procs); i++ {
-		validateVersion(t, epc.Cfg, epc.Procs[i], version.Versions{
-			Cluster: lastClusterVersionStr,
-			Server:  version.Version,
-			Storage: lastClusterVersionStr,
-		})
-		e2e.AssertProcessLogs(t, epc.Procs[i], "The server is ready to downgrade")
-	}
-
-	t.Log("Cluster is ready for downgrade")
+	e2e.DowngradeEnable(t, epc, lastVersion)
 
 	t.Logf("Starting downgrade process to %q", lastVersionStr)
-	for i := 0; i < len(epc.Procs); i++ {
-		t.Logf("Downgrading member %d by running %s binary", i, lastReleaseBinary)
-		stopEtcd(t, epc.Procs[i])
-		startEtcd(t, epc.Procs[i], lastReleaseBinary)
-	}
-
-	t.Log("All members downgraded, validating downgrade")
+	require.NoError(t, e2e.DowngradeUpgradeMembers(t, nil, epc, len(epc.Procs), currentVersion, lastClusterVersion))
 	e2e.AssertProcessLogs(t, leader(t, epc), "the cluster has been downgraded")
-	for i := 0; i < len(epc.Procs); i++ {
-		validateVersion(t, epc.Cfg, epc.Procs[i], version.Versions{
-			Cluster: lastClusterVersionStr,
-			Server:  lastVersionStr,
-		})
-	}
 
 	t.Log("Downgrade complete")
 	afterMembers, afterKV := getMembersAndKeys(t, cc)
@@ -165,23 +135,7 @@ func testDowngradeUpgrade(t *testing.T, clusterSize int, triggerSnapshot bool) {
 	beforeMembers, beforeKV = getMembersAndKeys(t, cc)
 
 	t.Logf("Starting upgrade process to %q", currentVersionStr)
-	for i := 0; i < len(epc.Procs); i++ {
-		t.Logf("Upgrading member %d", i)
-		stopEtcd(t, epc.Procs[i])
-		startEtcd(t, epc.Procs[i], currentEtcdBinary)
-		// NOTE: The leader has monitor to the cluster version, which will
-		// update cluster version. We don't need to check the transient
-		// version just in case that it might be flaky.
-	}
-
-	t.Log("All members upgraded, validating upgrade")
-	for i := 0; i < len(epc.Procs); i++ {
-		validateVersion(t, epc.Cfg, epc.Procs[i], version.Versions{
-			Cluster: currentVersionStr,
-			Server:  version.Version,
-			Storage: currentVersionStr,
-		})
-	}
+	require.NoError(t, e2e.DowngradeUpgradeMembers(t, nil, epc, len(epc.Procs), lastClusterVersion, currentVersion))
 
 	t.Log("Upgrade complete")
 	afterMembers, afterKV = getMembersAndKeys(t, cc)
@@ -206,48 +160,6 @@ func newCluster(t *testing.T, clusterSize int, snapshotCount uint64) *e2e.EtcdPr
 	return epc
 }
 
-func startEtcd(t *testing.T, ep e2e.EtcdProcess, execPath string) {
-	ep.Config().ExecPath = execPath
-	err := ep.Restart(context.TODO())
-	if err != nil {
-		t.Fatalf("could not start etcd process cluster (%v)", err)
-	}
-}
-
-func downgradeEnable(t *testing.T, epc *e2e.EtcdProcessCluster, ver *semver.Version) {
-	c := epc.Etcdctl()
-	testutils.ExecuteWithTimeout(t, 20*time.Second, func() {
-		err := c.DowngradeEnable(context.TODO(), ver.String())
-		require.NoError(t, err)
-	})
-}
-
-func stopEtcd(t *testing.T, ep e2e.EtcdProcess) {
-	err := ep.Stop()
-	require.NoError(t, err)
-}
-
-func validateVersion(t *testing.T, cfg *e2e.EtcdProcessClusterConfig, member e2e.EtcdProcess, expect version.Versions) {
-	testutils.ExecuteWithTimeout(t, 30*time.Second, func() {
-		for {
-			result, err := getMemberVersionByCurl(cfg, member)
-			if err != nil {
-				cfg.Logger.Warn("failed to get member version and retrying", zap.Error(err), zap.String("member", member.Config().Name))
-				time.Sleep(time.Second)
-				continue
-			}
-			cfg.Logger.Info("Comparing versions", zap.String("member", member.Config().Name), zap.Any("got", result), zap.Any("want", expect))
-			if err := compareMemberVersion(expect, result); err != nil {
-				cfg.Logger.Warn("Versions didn't match retrying", zap.Error(err), zap.String("member", member.Config().Name))
-				time.Sleep(time.Second)
-				continue
-			}
-			cfg.Logger.Info("Versions match", zap.String("member", member.Config().Name))
-			break
-		}
-	})
-}
-
 func leader(t *testing.T, epc *e2e.EtcdProcessCluster) e2e.EtcdProcess {
 	ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
 	defer cancel()
@@ -269,36 +181,6 @@ func leader(t *testing.T, epc *e2e.EtcdProcessCluster) e2e.EtcdProcess {
 	return nil
 }
 
-func compareMemberVersion(expect version.Versions, target version.Versions) error {
-	if expect.Server != "" && expect.Server != target.Server {
-		return fmt.Errorf("expect etcdserver version %v, but got %v", expect.Server, target.Server)
-	}
-
-	if expect.Cluster != "" && expect.Cluster != target.Cluster {
-		return fmt.Errorf("expect etcdcluster version %v, but got %v", expect.Cluster, target.Cluster)
-	}
-
-	if expect.Storage != "" && expect.Storage != target.Storage {
-		return fmt.Errorf("expect storage version %v, but got %v", expect.Storage, target.Storage)
-	}
-	return nil
-}
-
-func getMemberVersionByCurl(cfg *e2e.EtcdProcessClusterConfig, member e2e.EtcdProcess) (version.Versions, error) {
-	args := e2e.CURLPrefixArgsCluster(cfg, member, "GET", e2e.CURLReq{Endpoint: "/version"})
-	lines, err := e2e.RunUtilCompletion(args, nil)
-	if err != nil {
-		return version.Versions{}, err
-	}
-
-	data := strings.Join(lines, "\n")
-	result := version.Versions{}
-	if err := json.Unmarshal([]byte(data), &result); err != nil {
-		return version.Versions{}, fmt.Errorf("failed to unmarshal (%v): %w", data, err)
-	}
-	return result, nil
-}
-
 func generateSnapshot(t *testing.T, snapshotCount uint64, cc *e2e.EtcdctlV3) {
 	ctx, cancel := context.WithCancel(context.Background())
 	defer cancel()
diff --git a/tests/framework/e2e/curl.go b/tests/framework/e2e/curl.go
index d0546622afd..47f47351e7f 100644
--- a/tests/framework/e2e/curl.go
+++ b/tests/framework/e2e/curl.go
@@ -123,10 +123,8 @@ func CURLPut(clus *EtcdProcessCluster, req CURLReq) error {
 }
 
 func CURLGet(clus *EtcdProcessCluster, req CURLReq) error {
-	ctx, cancel := context.WithTimeout(context.Background(), req.timeoutDuration())
-	defer cancel()
-
-	return SpawnWithExpectsContext(ctx, CURLPrefixArgsCluster(clus.Cfg, clus.Procs[rand.Intn(clus.Cfg.ClusterSize)], "GET", req), nil, req.Expected)
+	member := clus.Procs[rand.Intn(clus.Cfg.ClusterSize)]
+	return CURLGetFromMember(clus, member, req)
 }
 
 func CURLGetFromMember(clus *EtcdProcessCluster, member EtcdProcess, req CURLReq) error {
diff --git a/tests/framework/e2e/downgrade.go b/tests/framework/e2e/downgrade.go
new file mode 100644
index 00000000000..34c9ca4101c
--- /dev/null
+++ b/tests/framework/e2e/downgrade.go
@@ -0,0 +1,176 @@
+// Copyright 2025 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package e2e
+
+import (
+	"context"
+	"encoding/json"
+	"fmt"
+	"math/rand"
+	"strings"
+	"testing"
+	"time"
+
+	"github.com/coreos/go-semver/semver"
+	"github.com/stretchr/testify/require"
+	"go.uber.org/zap"
+
+	"go.etcd.io/etcd/api/v3/version"
+	"go.etcd.io/etcd/server/v3/etcdserver"
+	"go.etcd.io/etcd/tests/v3/framework/testutils"
+)
+
+func DowngradeEnable(t *testing.T, epc *EtcdProcessCluster, ver *semver.Version) {
+	t.Logf("etcdctl downgrade enable %s", ver.String())
+	c := epc.Etcdctl()
+	testutils.ExecuteWithTimeout(t, 20*time.Second, func() {
+		err := c.DowngradeEnable(context.TODO(), ver.String())
+		require.NoError(t, err)
+	})
+
+	t.Log("Downgrade enabled, validating that the cluster is ready for downgrade")
+	for i := 0; i < len(epc.Procs); i++ {
+		ValidateVersion(t, epc.Cfg, epc.Procs[i], version.Versions{
+			Cluster: ver.String(),
+			Server:  offsetMinor(ver, 1).String(),
+			Storage: ver.String(),
+		})
+		AssertProcessLogs(t, epc.Procs[i], "The server is ready to downgrade")
+	}
+
+	t.Log("Cluster is ready for downgrade")
+}
+
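+// DowngradeUpgradeMembers stops a randomly chosen subset of members, switches
+// their binary between the current build and the last release, restarts them,
+// and then validates that each changed member reports the expected server and
+// cluster versions.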
%s target binary - %s", member.Config().Name, opString, member.Config().ExecPath) + } + lg.Info(fmt.Sprintf("%s member", opString), zap.String("member", member.Config().Name)) + if err := member.Stop(); err != nil { + return err + } + member.Config().ExecPath = newExecPath + lg.Info("Restarting member", zap.String("member", member.Config().Name)) + err := member.Start(context.TODO()) + if err != nil { + return err + } + } + lg.Info("Validating versions") + for _, memberID := range membersToChange { + member := clus.Procs[memberID] + if isDowngrade || numberOfMembersToChange == len(clus.Procs) { + ValidateVersion(t, clus.Cfg, member, version.Versions{ + Cluster: targetVersion.String(), + Server: targetVersion.String(), + }) + } else { + ValidateVersion(t, clus.Cfg, member, version.Versions{ + Cluster: currentVersion.String(), + Server: targetVersion.String(), + }) + } + } + return nil +} + +func ValidateVersion(t *testing.T, cfg *EtcdProcessClusterConfig, member EtcdProcess, expect version.Versions) { + testutils.ExecuteWithTimeout(t, 30*time.Second, func() { + for { + result, err := getMemberVersionByCurl(cfg, member) + if err != nil { + cfg.Logger.Warn("failed to get member version and retrying", zap.Error(err), zap.String("member", member.Config().Name)) + time.Sleep(time.Second) + continue + } + cfg.Logger.Info("Comparing versions", zap.String("member", member.Config().Name), zap.Any("got", result), zap.Any("want", expect)) + if err := compareMemberVersion(expect, result); err != nil { + cfg.Logger.Warn("Versions didn't match retrying", zap.Error(err), zap.String("member", member.Config().Name)) + time.Sleep(time.Second) + continue + } + cfg.Logger.Info("Versions match", zap.String("member", member.Config().Name)) + break + } + }) +} + +// offsetMinor returns the version with offset from the original minor, with the same major. 
+func offsetMinor(v *semver.Version, offset int) *semver.Version {
+	var minor int64
+	if offset >= 0 {
+		minor = v.Minor + int64(offset)
+	} else {
+		diff := int64(-offset)
+		if diff < v.Minor {
+			minor = v.Minor - diff
+		}
+	}
+	return &semver.Version{Major: v.Major, Minor: minor}
+}
+
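+// majorMinorVersionsEqual reports whether two version strings share the same
+// major and minor version, ignoring the patch version; for example, "3.6.0"
+// and "3.6.2" compare as equal.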
+func majorMinorVersionsEqual(v1, v2 string) bool {
+	ver1 := semver.New(v1)
+	ver2 := semver.New(v2)
+	return ver1.Major == ver2.Major && ver1.Minor == ver2.Minor
+}
+
+func compareMemberVersion(expect version.Versions, target version.Versions) error {
+	if expect.Server != "" && !majorMinorVersionsEqual(expect.Server, target.Server) {
+		return fmt.Errorf("expect etcdserver version %v, but got %v", expect.Server, target.Server)
+	}
+
+	if expect.Cluster != "" && !majorMinorVersionsEqual(expect.Cluster, target.Cluster) {
+		return fmt.Errorf("expect etcdcluster version %v, but got %v", expect.Cluster, target.Cluster)
+	}
+
+	if expect.Storage != "" && !majorMinorVersionsEqual(expect.Storage, target.Storage) {
+		return fmt.Errorf("expect storage version %v, but got %v", expect.Storage, target.Storage)
+	}
+	return nil
+}
+
+func getMemberVersionByCurl(cfg *EtcdProcessClusterConfig, member EtcdProcess) (version.Versions, error) {
+	args := CURLPrefixArgsCluster(cfg, member, "GET", CURLReq{Endpoint: "/version"})
+	lines, err := RunUtilCompletion(args, nil)
+	if err != nil {
+		return version.Versions{}, err
+	}
+
+	data := strings.Join(lines, "\n")
+	result := version.Versions{}
+	if err := json.Unmarshal([]byte(data), &result); err != nil {
+		return version.Versions{}, fmt.Errorf("failed to unmarshal (%v): %w", data, err)
+	}
+	return result, nil
+}
diff --git a/tests/robustness/failpoint/cluster.go b/tests/robustness/failpoint/cluster.go
index b33465e8379..d2751d5b03e 100644
--- a/tests/robustness/failpoint/cluster.go
+++ b/tests/robustness/failpoint/cluster.go
@@ -27,10 +27,8 @@ import (
 	"github.com/stretchr/testify/require"
 	"go.uber.org/zap"
 
-	pb "go.etcd.io/etcd/api/v3/etcdserverpb"
 	"go.etcd.io/etcd/client/pkg/v3/fileutil"
 	clientv3 "go.etcd.io/etcd/client/v3"
-	"go.etcd.io/etcd/pkg/v3/expect"
 	"go.etcd.io/etcd/server/v3/etcdserver"
 	"go.etcd.io/etcd/tests/v3/framework/e2e"
 	"go.etcd.io/etcd/tests/v3/robustness/identity"
@@ -39,8 +37,9 @@ import (
 )
 
 var (
-	MemberReplace   Failpoint = memberReplace{}
-	MemberDowngrade Failpoint = memberDowngrade{}
+	MemberReplace          Failpoint = memberReplace{}
+	MemberDowngrade        Failpoint = memberDowngrade{}
+	MemberDowngradeUpgrade Failpoint = memberDowngradeUpgrade{}
 )
 
 type memberReplace struct{}
@@ -148,14 +147,15 @@ func (f memberReplace) Available(config e2e.EtcdProcessClusterConfig, member e2e
 type memberDowngrade struct{}
 
 func (f memberDowngrade) Inject(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2e.EtcdProcessCluster, baseTime time.Time, ids identity.Provider) ([]report.ClientReport, error) {
-	v, err := e2e.GetVersionFromBinary(e2e.BinPath.Etcd)
+	currentVersion, err := e2e.GetVersionFromBinary(e2e.BinPath.Etcd)
+	if err != nil {
+		return nil, err
+	}
+	lastVersion, err := e2e.GetVersionFromBinary(e2e.BinPath.EtcdLastRelease)
 	if err != nil {
 		return nil, err
 	}
-	targetVersion := semver.Version{Major: v.Major, Minor: v.Minor - 1}
 	numberOfMembersToDowngrade := rand.Int()%len(clus.Procs) + 1
-	membersToDowngrade := rand.Perm(len(clus.Procs))[:numberOfMembersToDowngrade]
-	lg.Info("Test downgrading members", zap.Any("members", membersToDowngrade))
 
 	member := clus.Procs[0]
 	endpoints := []string{member.EndpointsGRPC()[0]}
@@ -172,37 +172,80 @@ func (f memberDowngrade) Inject(ctx context.Context, t *testing.T, lg *zap.Logge
 	// Need to wait health interval for cluster to accept changes
 	time.Sleep(etcdserver.HealthInterval)
 
-	lg.Info("Enable downgrade")
-	err = enableDowngrade(ctx, cc, &targetVersion)
+	e2e.DowngradeEnable(t, clus, lastVersion)
+
+	err = e2e.DowngradeUpgradeMembers(t, lg, clus, numberOfMembersToDowngrade, currentVersion, lastVersion)
+	time.Sleep(etcdserver.HealthInterval)
+	return nil, err
+}
+
+func (f memberDowngrade) Name() string {
+	return "MemberDowngrade"
+}
+
+func (f memberDowngrade) Available(config e2e.EtcdProcessClusterConfig, member e2e.EtcdProcess, profile traffic.Profile) bool {
+	if !fileutil.Exist(e2e.BinPath.EtcdLastRelease) {
+		return false
+	}
+	// only run the memberDowngrade test if no snapshot would be sent between members;
+	// see https://github.com/etcd-io/etcd/issues/19147 for context.
+	if config.ServerConfig.SnapshotCatchUpEntries < etcdserver.DefaultSnapshotCatchUpEntries {
+		return false
+	}
+	v, err := e2e.GetVersionFromBinary(e2e.BinPath.Etcd)
+	if err != nil {
+		panic("Failed checking etcd version binary")
+	}
+	v3_6 := semver.Version{Major: 3, Minor: 6}
+	// only a cluster on the current version can be downgraded.
+	return v.Compare(v3_6) >= 0 && (config.Version == e2e.CurrentVersion && member.Config().ExecPath == e2e.BinPath.Etcd)
+}
+
+type memberDowngradeUpgrade struct{}
+
+func (f memberDowngradeUpgrade) Inject(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2e.EtcdProcessCluster, baseTime time.Time, ids identity.Provider) ([]report.ClientReport, error) {
+	currentVersion, err := e2e.GetVersionFromBinary(e2e.BinPath.Etcd)
+	if err != nil {
+		return nil, err
+	}
+	lastVersion, err := e2e.GetVersionFromBinary(e2e.BinPath.EtcdLastRelease)
 	if err != nil {
 		return nil, err
 	}
-	// Need to wait health interval for cluster to prepare for downgrade
-	time.Sleep(etcdserver.HealthInterval)
 
-	for _, memberID := range membersToDowngrade {
-		member = clus.Procs[memberID]
-		lg.Info("Downgrading member", zap.String("member", member.Config().Name))
-		if err = member.Stop(); err != nil {
-			return nil, err
-		}
-		member.Config().ExecPath = e2e.BinPath.EtcdLastRelease
-		lg.Info("Restarting member", zap.String("member", member.Config().Name))
-		err = member.Start(ctx)
-		if err != nil {
-			return nil, err
-		}
-		err = verifyVersion(t, clus, member, targetVersion)
+	member := clus.Procs[0]
+	endpoints := []string{member.EndpointsGRPC()[0]}
+	cc, err := clientv3.New(clientv3.Config{
+		Endpoints:            endpoints,
+		Logger:               zap.NewNop(),
+		DialKeepAliveTime:    10 * time.Second,
+		DialKeepAliveTimeout: 100 * time.Millisecond,
+	})
+	if err != nil {
+		return nil, err
 	}
+	defer cc.Close()
+
+	// Need to wait health interval for cluster to accept changes
+	time.Sleep(etcdserver.HealthInterval)
+	e2e.DowngradeEnable(t, clus, lastVersion)
+	// downgrade all members first
+	err = e2e.DowngradeUpgradeMembers(t, lg, clus, len(clus.Procs), currentVersion, lastVersion)
+	if err != nil {
+		return nil, err
+	}
+	// partially upgrade the cluster
+	numberOfMembersToUpgrade := rand.Int()%len(clus.Procs) + 1
+	err = e2e.DowngradeUpgradeMembers(t, lg, clus, numberOfMembersToUpgrade, lastVersion, currentVersion)
 	time.Sleep(etcdserver.HealthInterval)
 	return nil, err
 }
 
-func (f memberDowngrade) Name() string {
-	return "MemberDowngrade"
+func (f memberDowngradeUpgrade) Name() string {
+	return "MemberDowngradeUpgrade"
 }
 
-func (f memberDowngrade) Available(config e2e.EtcdProcessClusterConfig, member e2e.EtcdProcess, profile traffic.Profile) bool {
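+// Available reports whether the downgrade-then-upgrade failpoint can run
+// against the given cluster configuration; at minimum the last-release
+// binary must be present.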
+func (f memberDowngradeUpgrade) Available(config e2e.EtcdProcessClusterConfig, member e2e.EtcdProcess, profile traffic.Profile) bool {
 	if !fileutil.Exist(e2e.BinPath.EtcdLastRelease) {
 		return false
 	}
@@ -252,29 +295,3 @@ func patchArgs(args []string, flag, newValue string) error {
 	}
 	return fmt.Errorf("--%s flag not found", flag)
 }
-
-func enableDowngrade(ctx context.Context, cc *clientv3.Client, targetVersion *semver.Version) error {
-	_, err := cc.Maintenance.Downgrade(ctx, clientv3.DowngradeAction(pb.DowngradeRequest_VALIDATE), targetVersion.String())
-	if err != nil {
-		return err
-	}
-	_, err = cc.Maintenance.Downgrade(ctx, clientv3.DowngradeAction(pb.DowngradeRequest_ENABLE), targetVersion.String())
-	return err
-}
-
-func verifyVersion(t *testing.T, clus *e2e.EtcdProcessCluster, member e2e.EtcdProcess, expectedVersion semver.Version) error {
-	var err error
-	expected := fmt.Sprintf(`"etcdserver":"%d.%d\..*"etcdcluster":"%d\.%d\.`, expectedVersion.Major, expectedVersion.Minor, expectedVersion.Major, expectedVersion.Minor)
-	for i := 0; i < 35; i++ {
-		if err = e2e.CURLGetFromMember(clus, member, e2e.CURLReq{Endpoint: "/version", Expected: expect.ExpectedResponse{Value: expected, IsRegularExpr: true}}); err != nil {
-			t.Logf("#%d: v3 is not ready yet (%v)", i, err)
-			time.Sleep(200 * time.Millisecond)
-			continue
-		}
-		break
-	}
-	if err != nil {
-		return fmt.Errorf("failed to verify version, expected %v got (%w)", expected, err)
-	}
-	return nil
-}
diff --git a/tests/robustness/failpoint/failpoint.go b/tests/robustness/failpoint/failpoint.go
index 17c0d11b8e7..6522b6942da 100644
--- a/tests/robustness/failpoint/failpoint.go
+++ b/tests/robustness/failpoint/failpoint.go
@@ -47,6 +47,7 @@ var allFailpoints = []Failpoint{
 	BeforeApplyOneConfChangeSleep,
 	MemberReplace,
 	MemberDowngrade,
+	MemberDowngradeUpgrade,
 	DropPeerNetwork,
 	RaftBeforeSaveSleep,
 	RaftAfterSaveSleep,