From 0db357753fc010bf257cdefc76cbadd8e7749df5 Mon Sep 17 00:00:00 2001 From: Harshit Gangal Date: Mon, 8 Jul 2024 11:33:16 +0530 Subject: [PATCH] Cleanup: Health Stream removed db pool usage (#16336) Signed-off-by: Harshit Gangal --- .../streamtimeout/healthstream_test.go | 99 ----------------- .../endtoend/streamtimeout/main_test.go | 102 ------------------ .../vttablet/tabletserver/health_streamer.go | 36 +------ .../tabletserver/health_streamer_test.go | 39 ++----- .../tabletserver/state_manager_test.go | 42 ++++---- .../vttablet/tabletserver/tabletenv/config.go | 60 ++++------- go/vt/vttablet/tabletserver/tabletserver.go | 2 +- 7 files changed, 52 insertions(+), 328 deletions(-) delete mode 100644 go/vt/vttablet/endtoend/streamtimeout/healthstream_test.go delete mode 100644 go/vt/vttablet/endtoend/streamtimeout/main_test.go diff --git a/go/vt/vttablet/endtoend/streamtimeout/healthstream_test.go b/go/vt/vttablet/endtoend/streamtimeout/healthstream_test.go deleted file mode 100644 index 9890efd427d..00000000000 --- a/go/vt/vttablet/endtoend/streamtimeout/healthstream_test.go +++ /dev/null @@ -1,99 +0,0 @@ -/* -Copyright 2023 The Vitess Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package streamtimeout - -import ( - "context" - "slices" - "testing" - "time" - - "github.com/stretchr/testify/require" - - "vitess.io/vitess/go/mysql" - querypb "vitess.io/vitess/go/vt/proto/query" - "vitess.io/vitess/go/vt/vttablet/endtoend/framework" -) - -// TestSchemaChangeTimedout ensures that the timeout functionality is working properly -// to prevent queries from hanging up and causing a mutex to be locked forever. -func TestSchemaChangeTimedout(t *testing.T) { - const TableName = "vitess_healthstream" - - client := framework.NewClient() - reloadEstimatedTime := 2 * time.Second - - err := cluster.SimulateMySQLHang() - require.NoError(t, err) - - defer cluster.StopSimulateMySQLHang() - - ch := make(chan []string, 100) - go func(ch chan []string) { - client.StreamHealth(func(response *querypb.StreamHealthResponse) error { - if response.RealtimeStats.TableSchemaChanged != nil { - ch <- response.RealtimeStats.TableSchemaChanged - } - return nil - }) - }(ch) - - // get a clean connection that skips toxyproxy to be able to change the schema in the underlying DB - cleanParams := cluster.MySQLCleanConnParams() - cleanConn, err := mysql.Connect(context.Background(), &cleanParams) - require.NoError(t, err) - defer cleanConn.Close() - - // change the schema to trigger the health_streamer to send a notification at a later time. - _, err = cleanConn.ExecuteFetch("create table "+TableName+"(id bigint primary key)", -1, false) - require.NoError(t, err) - - select { - case <-ch: // get the schema notification - t.Fatalf("received an schema change event from the HealthStreamer (is toxyproxy working?)") - case <-time.After(reloadEstimatedTime): - // Good, continue - } - - // We will wait for the health_streamer to attempt sending a notification. - // It's important to keep in mind that the total wait time after the simulation should be shorter than the reload timeout. - // This is because the query timeout triggers the *DBConn.Kill() method, which in turn holds the mutex lock on the health_streamer. - // Although not indefinitely, this can result in longer wait times. - // It's worth noting that the behavior of *DBConn.Kill() is outside the scope of this test. - reloadInterval := config.SignalSchemaChangeReloadInterval - time.Sleep(reloadInterval) - - // pause simulating the mysql stall to allow the health_streamer to resume. - err = cluster.PauseSimulateMySQLHang() - require.NoError(t, err) - - // wait for the health_streamer to complete retrying the notification. - reloadTimeout := config.SchemaChangeReloadTimeout - retryEstimatedTime := reloadTimeout + reloadInterval + reloadEstimatedTime - timeout := time.After(retryEstimatedTime) - for { - select { - case res := <-ch: // get the schema notification - if slices.Contains(res, TableName) { - return - } - case <-timeout: - t.Errorf("timed out even after the mysql hang was no longer simulated") - return - } - } -} diff --git a/go/vt/vttablet/endtoend/streamtimeout/main_test.go b/go/vt/vttablet/endtoend/streamtimeout/main_test.go deleted file mode 100644 index 0b2f37a987c..00000000000 --- a/go/vt/vttablet/endtoend/streamtimeout/main_test.go +++ /dev/null @@ -1,102 +0,0 @@ -/* -Copyright 2023 The Vitess Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -/* -All tests in this package come with toxiproxy in front of the MySQL server -*/ -package streamtimeout - -import ( - "context" - "flag" - "fmt" - "os" - "testing" - "time" - - "vitess.io/vitess/go/vt/vttablet/endtoend/framework" - "vitess.io/vitess/go/vt/vttablet/tabletserver/tabletenv" - "vitess.io/vitess/go/vt/vttest" - - vttestpb "vitess.io/vitess/go/vt/proto/vttest" -) - -var ( - cluster vttest.LocalCluster - config *tabletenv.TabletConfig -) - -func TestMain(m *testing.M) { - flag.Parse() // Do not remove this comment, import into google3 depends on it - tabletenv.Init() - - exitCode := func() int { - // Launch MySQL. - // We need a Keyspace in the topology, so the DbName is set. - // We need a Shard too, so the database 'vttest' is created. - cfg := vttest.Config{ - Topology: &vttestpb.VTTestTopology{ - Keyspaces: []*vttestpb.Keyspace{ - { - Name: "vttest", - Shards: []*vttestpb.Shard{ - { - Name: "0", - DbNameOverride: "vttest", - }, - }, - }, - }, - }, - OnlyMySQL: true, - Charset: "utf8mb4_general_ci", - } - - env, err := vttest.NewLocalTestEnv(0) - if err != nil { - fmt.Fprintf(os.Stderr, "%v", err) - return 1 - } - env.EnableToxiproxy = true - cluster = vttest.LocalCluster{ - Config: cfg, - Env: env, - } - if err := cluster.Setup(); err != nil { - fmt.Fprintf(os.Stderr, "could not launch mysql: %v\n", err) - return 1 - } - defer cluster.TearDown() - - connParams := cluster.MySQLConnParams() - connAppDebugParams := cluster.MySQLAppDebugConnParams() - config = tabletenv.NewDefaultConfig() - config.SchemaReloadInterval = (2 * time.Second) + (100 * time.Millisecond) - config.SchemaChangeReloadTimeout = 10 * time.Second - config.SignalWhenSchemaChange = true - - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - err = framework.StartCustomServer(ctx, connParams, connAppDebugParams, cluster.DbName(), config) - if err != nil { - fmt.Fprintf(os.Stderr, "%v", err) - return 1 - } - defer framework.StopServer() - return m.Run() - }() - os.Exit(exitCode) -} diff --git a/go/vt/vttablet/tabletserver/health_streamer.go b/go/vt/vttablet/tabletserver/health_streamer.go index c13d11df69e..269e3daefd6 100644 --- a/go/vt/vttablet/tabletserver/health_streamer.go +++ b/go/vt/vttablet/tabletserver/health_streamer.go @@ -29,7 +29,6 @@ import ( vtschema "vitess.io/vitess/go/vt/schema" "vitess.io/vitess/go/vt/vttablet/tabletserver/schema" - "vitess.io/vitess/go/vt/dbconfigs" "vitess.io/vitess/go/vt/servenv" "vitess.io/vitess/go/history" @@ -39,7 +38,6 @@ import ( vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc" "vitess.io/vitess/go/vt/vterrors" "vitess.io/vitess/go/vt/vttablet/tabletmanager/vreplication" - "vitess.io/vitess/go/vt/vttablet/tabletserver/connpool" "vitess.io/vitess/go/vt/vttablet/tabletserver/tabletenv" ) @@ -79,23 +77,12 @@ type healthStreamer struct { se *schema.Engine history *history.History - dbConfig dbconfigs.Connector - conns *connpool.Pool signalWhenSchemaChange bool - reloadTimeout time.Duration viewsEnabled bool } func newHealthStreamer(env tabletenv.Env, alias *topodatapb.TabletAlias, engine *schema.Engine) *healthStreamer { - var pool *connpool.Pool - if env.Config().SignalWhenSchemaChange { - // We need one connection for the reloader. - pool = connpool.NewPool(env, "", tabletenv.ConnPoolConfig{ - Size: 1, - IdleTimeout: env.Config().OltpReadPool.IdleTimeout, - }) - } hs := &healthStreamer{ stats: env.Stats(), degradedThreshold: env.Config().Healthcheck.DegradedThreshold, @@ -110,9 +97,7 @@ func newHealthStreamer(env tabletenv.Env, alias *topodatapb.TabletAlias, engine }, history: history.New(5), - conns: pool, signalWhenSchemaChange: env.Config().SignalWhenSchemaChange, - reloadTimeout: env.Config().SchemaChangeReloadTimeout, viewsEnabled: env.Config().EnableViews, se: engine, } @@ -120,9 +105,8 @@ func newHealthStreamer(env tabletenv.Env, alias *topodatapb.TabletAlias, engine return hs } -func (hs *healthStreamer) InitDBConfig(target *querypb.Target, cp dbconfigs.Connector) { +func (hs *healthStreamer) InitDBConfig(target *querypb.Target) { hs.state.Target = target.CloneVT() - hs.dbConfig = cp } func (hs *healthStreamer) Open() { @@ -133,10 +117,6 @@ func (hs *healthStreamer) Open() { return } hs.ctx, hs.cancel = context.WithCancel(context.Background()) - if hs.conns != nil { - // if we don't have a live conns object, it means we are not configured to signal when the schema changes - hs.conns.Open(hs.dbConfig, hs.dbConfig, hs.dbConfig) - } } func (hs *healthStreamer) Close() { @@ -148,10 +128,6 @@ func (hs *healthStreamer) Close() { hs.cancel() hs.cancel = nil } - if hs.conns != nil { - hs.conns.Close() - hs.conns = nil - } } func (hs *healthStreamer) Stream(ctx context.Context, callback func(*querypb.StreamHealthResponse) error) error { @@ -338,16 +314,6 @@ func (hs *healthStreamer) reload(created, altered, dropped []*schema.Table, udfs return nil } - // add a timeout to prevent unbounded waits - ctx, cancel := context.WithTimeout(hs.ctx, hs.reloadTimeout) - defer cancel() - - conn, err := hs.conns.Get(ctx, nil) - if err != nil { - return err - } - defer conn.Recycle() - // We create lists to store the tables that have schema changes. var tables []string var views []string diff --git a/go/vt/vttablet/tabletserver/health_streamer_test.go b/go/vt/vttablet/tabletserver/health_streamer_test.go index a97f64c77c6..95517880339 100644 --- a/go/vt/vttablet/tabletserver/health_streamer_test.go +++ b/go/vt/vttablet/tabletserver/health_streamer_test.go @@ -32,7 +32,6 @@ import ( "vitess.io/vitess/go/mysql" "vitess.io/vitess/go/mysql/fakesqldb" "vitess.io/vitess/go/sqltypes" - "vitess.io/vitess/go/vt/dbconfigs" querypb "vitess.io/vitess/go/vt/proto/query" topodatapb "vitess.io/vitess/go/vt/proto/topodata" "vitess.io/vitess/go/vt/vtenv" @@ -41,9 +40,7 @@ import ( ) func TestHealthStreamerClosed(t *testing.T) { - db := fakesqldb.New(t) - defer db.Close() - cfg := newConfig(db) + cfg := newConfig(nil) env := tabletenv.NewEnv(vtenv.NewTestEnv(), cfg, "ReplTrackerTest") alias := &topodatapb.TabletAlias{ Cell: "cell", @@ -59,18 +56,16 @@ func TestHealthStreamerClosed(t *testing.T) { func newConfig(db *fakesqldb.DB) *tabletenv.TabletConfig { cfg := tabletenv.NewDefaultConfig() - cfg.DB = newDBConfigs(db) + if db != nil { + cfg.DB = newDBConfigs(db) + } return cfg } // TestNotServingPrimaryNoWrite makes sure that the health-streamer doesn't write anything to the database when // the state is not serving primary. func TestNotServingPrimaryNoWrite(t *testing.T) { - db := fakesqldb.New(t) - defer db.Close() - cfg := newConfig(db) - cfg.SignalWhenSchemaChange = true - + cfg := newConfig(nil) env := tabletenv.NewEnv(vtenv.NewTestEnv(), cfg, "TestNotServingPrimary") alias := &topodatapb.TabletAlias{ Cell: "cell", @@ -79,11 +74,8 @@ func TestNotServingPrimaryNoWrite(t *testing.T) { // Create a new health streamer and set it to a serving primary state hs := newHealthStreamer(env, alias, &schema.Engine{}) hs.isServingPrimary = true - hs.InitDBConfig(&querypb.Target{TabletType: topodatapb.TabletType_PRIMARY}, cfg.DB.DbaWithDB()) hs.Open() defer hs.Close() - target := &querypb.Target{} - hs.InitDBConfig(target, dbconfigs.New(db.ConnParams())) // Let's say the tablet goes to a non-serving primary state. hs.MakePrimary(false) @@ -93,13 +85,10 @@ func TestNotServingPrimaryNoWrite(t *testing.T) { t1 := schema.NewTable("t1", schema.NoType) err := hs.reload([]*schema.Table{t1}, nil, nil, false) require.NoError(t, err) - require.NoError(t, db.LastError()) } func TestHealthStreamerBroadcast(t *testing.T) { - db := fakesqldb.New(t) - defer db.Close() - cfg := newConfig(db) + cfg := newConfig(nil) cfg.SignalWhenSchemaChange = false env := tabletenv.NewEnv(vtenv.NewTestEnv(), cfg, "ReplTrackerTest") @@ -109,11 +98,8 @@ func TestHealthStreamerBroadcast(t *testing.T) { } blpFunc = testBlpFunc hs := newHealthStreamer(env, alias, &schema.Engine{}) - hs.InitDBConfig(&querypb.Target{TabletType: topodatapb.TabletType_PRIMARY}, cfg.DB.DbaWithDB()) hs.Open() defer hs.Close() - target := &querypb.Target{} - hs.InitDBConfig(target, dbconfigs.New(db.ConnParams())) ch, cancel := testStream(hs) defer cancel() @@ -226,9 +212,6 @@ func TestReloadSchema(t *testing.T) { se := schema.NewEngine(env) hs := newHealthStreamer(env, alias, se) - target := &querypb.Target{TabletType: topodatapb.TabletType_PRIMARY} - configs := cfg.DB - db.AddQueryPattern("SELECT UNIX_TIMESTAMP()"+".*", sqltypes.MakeTestResult( sqltypes.MakeTestFields( "UNIX_TIMESTAMP(now())", @@ -282,8 +265,7 @@ func TestReloadSchema(t *testing.T) { "users|id", )) - hs.InitDBConfig(target, configs.DbaWithDB()) - se.InitDBConfig(configs.DbaWithDB()) + se.InitDBConfig(cfg.DB.DbaWithDB()) hs.Open() defer hs.Close() err := se.Open() @@ -350,7 +332,6 @@ func TestReloadView(t *testing.T) { db := fakesqldb.New(t) defer db.Close() cfg := newConfig(db) - cfg.SignalWhenSchemaChange = true cfg.SchemaReloadInterval = 100 * time.Millisecond cfg.EnableViews = true @@ -359,9 +340,6 @@ func TestReloadView(t *testing.T) { se := schema.NewEngine(env) hs := newHealthStreamer(env, alias, se) - target := &querypb.Target{TabletType: topodatapb.TabletType_PRIMARY} - configs := cfg.DB - db.AddQueryPattern("SELECT UNIX_TIMESTAMP()"+".*", sqltypes.MakeTestResult( sqltypes.MakeTestFields( "UNIX_TIMESTAMP(now())", @@ -411,8 +389,7 @@ func TestReloadView(t *testing.T) { // adding query pattern for udfs db.AddQueryPattern("SELECT name.*", &sqltypes.Result{}) - hs.InitDBConfig(target, configs.DbaWithDB()) - se.InitDBConfig(configs.DbaWithDB()) + se.InitDBConfig(cfg.DB.DbaWithDB()) hs.Open() defer hs.Close() err := se.Open() diff --git a/go/vt/vttablet/tabletserver/state_manager_test.go b/go/vt/vttablet/tabletserver/state_manager_test.go index f6345b9b29c..02896eeefe0 100644 --- a/go/vt/vttablet/tabletserver/state_manager_test.go +++ b/go/vt/vttablet/tabletserver/state_manager_test.go @@ -31,8 +31,6 @@ import ( "vitess.io/vitess/go/vt/sqlparser" "vitess.io/vitess/go/vt/vtenv" - "vitess.io/vitess/go/mysql/fakesqldb" - "vitess.io/vitess/go/vt/dbconfigs" "vitess.io/vitess/go/vt/log" querypb "vitess.io/vitess/go/vt/proto/query" topodatapb "vitess.io/vitess/go/vt/proto/topodata" @@ -69,7 +67,7 @@ func TestStateManagerStateByName(t *testing.T) { } func TestStateManagerServePrimary(t *testing.T) { - sm := newTestStateManager(t) + sm := newTestStateManager() defer sm.StopService() sm.EnterLameduck() err := sm.SetServingType(topodatapb.TabletType_PRIMARY, testNow, StateServing, "") @@ -100,7 +98,7 @@ func TestStateManagerServePrimary(t *testing.T) { } func TestStateManagerServeNonPrimary(t *testing.T) { - sm := newTestStateManager(t) + sm := newTestStateManager() defer sm.StopService() err := sm.SetServingType(topodatapb.TabletType_REPLICA, testNow, StateServing, "") require.NoError(t, err) @@ -125,7 +123,7 @@ func TestStateManagerServeNonPrimary(t *testing.T) { } func TestStateManagerUnservePrimary(t *testing.T) { - sm := newTestStateManager(t) + sm := newTestStateManager() defer sm.StopService() err := sm.SetServingType(topodatapb.TabletType_PRIMARY, testNow, StateNotServing, "") require.NoError(t, err) @@ -150,7 +148,7 @@ func TestStateManagerUnservePrimary(t *testing.T) { } func TestStateManagerUnserveNonPrimary(t *testing.T) { - sm := newTestStateManager(t) + sm := newTestStateManager() defer sm.StopService() err := sm.SetServingType(topodatapb.TabletType_RDONLY, testNow, StateNotServing, "") require.NoError(t, err) @@ -177,7 +175,7 @@ func TestStateManagerUnserveNonPrimary(t *testing.T) { } func TestStateManagerClose(t *testing.T) { - sm := newTestStateManager(t) + sm := newTestStateManager() defer sm.StopService() err := sm.SetServingType(topodatapb.TabletType_RDONLY, testNow, StateNotConnected, "") require.NoError(t, err) @@ -201,7 +199,7 @@ func TestStateManagerClose(t *testing.T) { } func TestStateManagerStopService(t *testing.T) { - sm := newTestStateManager(t) + sm := newTestStateManager() defer sm.StopService() err := sm.SetServingType(topodatapb.TabletType_REPLICA, testNow, StateServing, "") require.NoError(t, err) @@ -215,7 +213,7 @@ func TestStateManagerStopService(t *testing.T) { } func TestStateManagerGracePeriod(t *testing.T) { - sm := newTestStateManager(t) + sm := newTestStateManager() defer sm.StopService() sm.transitionGracePeriod = 10 * time.Millisecond @@ -269,7 +267,7 @@ func (te *testWatcher) Close() { func TestStateManagerSetServingTypeRace(t *testing.T) { // We don't call StopService because that in turn // will call Close again on testWatcher. - sm := newTestStateManager(t) + sm := newTestStateManager() te := &testWatcher{ t: t, sm: sm, @@ -288,7 +286,7 @@ func TestStateManagerSetServingTypeRace(t *testing.T) { func TestStateManagerSetServingTypeNoChange(t *testing.T) { log.Infof("starting") - sm := newTestStateManager(t) + sm := newTestStateManager() defer sm.StopService() err := sm.SetServingType(topodatapb.TabletType_REPLICA, testNow, StateServing, "") require.NoError(t, err) @@ -319,7 +317,7 @@ func TestStateManagerTransitionFailRetry(t *testing.T) { defer func(saved time.Duration) { transitionRetryInterval = saved }(transitionRetryInterval) transitionRetryInterval = 10 * time.Millisecond - sm := newTestStateManager(t) + sm := newTestStateManager() defer sm.StopService() sm.se.(*testSchemaEngine).failMySQL = true @@ -351,7 +349,7 @@ func TestStateManagerTransitionFailRetry(t *testing.T) { } func TestStateManagerNotConnectedType(t *testing.T) { - sm := newTestStateManager(t) + sm := newTestStateManager() defer sm.StopService() sm.EnterLameduck() err := sm.SetServingType(topodatapb.TabletType_RESTORE, testNow, StateNotServing, "") @@ -404,7 +402,7 @@ func (k *killableConn) SQLParser() *sqlparser.Parser { } func TestStateManagerShutdownGracePeriod(t *testing.T) { - sm := newTestStateManager(t) + sm := newTestStateManager() defer sm.StopService() sm.te = &delayedTxEngine{} @@ -459,7 +457,7 @@ func TestStateManagerCheckMySQL(t *testing.T) { defer func(saved time.Duration) { transitionRetryInterval = saved }(transitionRetryInterval) transitionRetryInterval = 10 * time.Millisecond - sm := newTestStateManager(t) + sm := newTestStateManager() defer sm.StopService() err := sm.SetServingType(topodatapb.TabletType_PRIMARY, testNow, StateServing, "") @@ -527,7 +525,7 @@ func TestStateManagerCheckMySQL(t *testing.T) { func TestStateManagerValidations(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - sm := newTestStateManager(t) + sm := newTestStateManager() target := &querypb.Target{TabletType: topodatapb.TabletType_PRIMARY} sm.target = target.CloneVT() err := sm.StartRequest(ctx, target, false) @@ -590,7 +588,7 @@ func TestStateManagerValidations(t *testing.T) { func TestStateManagerWaitForRequests(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - sm := newTestStateManager(t) + sm := newTestStateManager() defer sm.StopService() target := &querypb.Target{TabletType: topodatapb.TabletType_PRIMARY} sm.target = target @@ -630,7 +628,7 @@ func TestStateManagerWaitForRequests(t *testing.T) { } func TestStateManagerNotify(t *testing.T) { - sm := newTestStateManager(t) + sm := newTestStateManager() defer sm.StopService() blpFunc = testBlpFunc @@ -669,7 +667,7 @@ func TestStateManagerNotify(t *testing.T) { } func TestRefreshReplHealthLocked(t *testing.T) { - sm := newTestStateManager(t) + sm := newTestStateManager() defer sm.StopService() rt := sm.rt.(*testReplTracker) @@ -705,7 +703,7 @@ func TestRefreshReplHealthLocked(t *testing.T) { // TestPanicInWait tests that we don't panic when we wait for requests if more StartRequest calls come up after we start waiting. func TestPanicInWait(t *testing.T) { - sm := newTestStateManager(t) + sm := newTestStateManager() sm.wantState = StateServing sm.state = StateServing sm.replHealthy = true @@ -732,7 +730,7 @@ func verifySubcomponent(t *testing.T, order int64, component any, state testStat assert.Equal(t, state, tos.State()) } -func newTestStateManager(t *testing.T) *stateManager { +func newTestStateManager() *stateManager { order.Store(0) cfg := tabletenv.NewDefaultConfig() parser := sqlparser.NewTestParser() @@ -757,7 +755,7 @@ func newTestStateManager(t *testing.T) *stateManager { rw: newRequestsWaiter(), } sm.Init(env, &querypb.Target{}) - sm.hs.InitDBConfig(&querypb.Target{}, dbconfigs.New(fakesqldb.New(t).ConnParams())) + sm.hs.InitDBConfig(&querypb.Target{}) log.Infof("returning sm: %p", sm) return sm } diff --git a/go/vt/vttablet/tabletserver/tabletenv/config.go b/go/vt/vttablet/tabletserver/tabletenv/config.go index f2cf7e638cf..89f827fa1f3 100644 --- a/go/vt/vttablet/tabletserver/tabletenv/config.go +++ b/go/vt/vttablet/tabletserver/tabletenv/config.go @@ -201,7 +201,7 @@ func registerTabletEnvFlags(fs *pflag.FlagSet) { fs.BoolVar(¤tConfig.EnableOnlineDDL, "queryserver_enable_online_ddl", true, "Enable online DDL.") fs.BoolVar(¤tConfig.SanitizeLogMessages, "sanitize_log_messages", false, "Remove potentially sensitive information in tablet INFO, WARNING, and ERROR log messages such as query parameters.") _ = fs.Bool("queryserver-enable-settings-pool", true, "Enable pooling of connections with modified system settings") - fs.MarkDeprecated("queryserver-enable-settings-pool", "New pool implementation does it internally and at the api level this has been enabled since v17") + _ = fs.MarkDeprecated("queryserver-enable-settings-pool", "New pool implementation does it internally and at the api level this has been enabled since v17") fs.Int64Var(¤tConfig.RowStreamer.MaxInnoDBTrxHistLen, "vreplication_copy_phase_max_innodb_history_list_length", 1000000, "The maximum InnoDB transaction history that can exist on a vstreamer (source) before starting another round of copying rows. This helps to limit the impact on the source tablet.") fs.Int64Var(¤tConfig.RowStreamer.MaxMySQLReplLagSecs, "vreplication_copy_phase_max_mysql_replication_lag", 43200, "The maximum MySQL replication lag (in seconds) that can exist on a vstreamer (source) before starting another round of copying rows. This helps to limit the impact on the source tablet.") @@ -312,24 +312,23 @@ type TabletConfig struct { ReplicationTracker ReplicationTrackerConfig `json:"replicationTracker,omitempty"` // Consolidator can be enable, disable, or notOnPrimary. Default is enable. - Consolidator string `json:"consolidator,omitempty"` - PassthroughDML bool `json:"passthroughDML,omitempty"` - StreamBufferSize int `json:"streamBufferSize,omitempty"` - ConsolidatorStreamTotalSize int64 `json:"consolidatorStreamTotalSize,omitempty"` - ConsolidatorStreamQuerySize int64 `json:"consolidatorStreamQuerySize,omitempty"` - QueryCacheMemory int64 `json:"queryCacheMemory,omitempty"` - QueryCacheDoorkeeper bool `json:"queryCacheDoorkeeper,omitempty"` - SchemaReloadInterval time.Duration `json:"schemaReloadIntervalSeconds,omitempty"` - SignalSchemaChangeReloadInterval time.Duration `json:"signalSchemaChangeReloadIntervalSeconds,omitempty"` - SchemaChangeReloadTimeout time.Duration `json:"schemaChangeReloadTimeout,omitempty"` - WatchReplication bool `json:"watchReplication,omitempty"` - TrackSchemaVersions bool `json:"trackSchemaVersions,omitempty"` - SchemaVersionMaxAgeSeconds int64 `json:"schemaVersionMaxAgeSeconds,omitempty"` - TerseErrors bool `json:"terseErrors,omitempty"` - TruncateErrorLen int `json:"truncateErrorLen,omitempty"` - AnnotateQueries bool `json:"annotateQueries,omitempty"` - MessagePostponeParallelism int `json:"messagePostponeParallelism,omitempty"` - SignalWhenSchemaChange bool `json:"signalWhenSchemaChange,omitempty"` + Consolidator string `json:"consolidator,omitempty"` + PassthroughDML bool `json:"passthroughDML,omitempty"` + StreamBufferSize int `json:"streamBufferSize,omitempty"` + ConsolidatorStreamTotalSize int64 `json:"consolidatorStreamTotalSize,omitempty"` + ConsolidatorStreamQuerySize int64 `json:"consolidatorStreamQuerySize,omitempty"` + QueryCacheMemory int64 `json:"queryCacheMemory,omitempty"` + QueryCacheDoorkeeper bool `json:"queryCacheDoorkeeper,omitempty"` + SchemaReloadInterval time.Duration `json:"schemaReloadIntervalSeconds,omitempty"` + SchemaChangeReloadTimeout time.Duration `json:"schemaChangeReloadTimeout,omitempty"` + WatchReplication bool `json:"watchReplication,omitempty"` + TrackSchemaVersions bool `json:"trackSchemaVersions,omitempty"` + SchemaVersionMaxAgeSeconds int64 `json:"schemaVersionMaxAgeSeconds,omitempty"` + TerseErrors bool `json:"terseErrors,omitempty"` + TruncateErrorLen int `json:"truncateErrorLen,omitempty"` + AnnotateQueries bool `json:"annotateQueries,omitempty"` + MessagePostponeParallelism int `json:"messagePostponeParallelism,omitempty"` + SignalWhenSchemaChange bool `json:"signalWhenSchemaChange,omitempty"` ExternalConnections map[string]*dbconfigs.DBConfigs `json:"externalConnections,omitempty"` @@ -368,9 +367,8 @@ func (cfg *TabletConfig) MarshalJSON() ([]byte, error) { tmp := struct { TCProxy - SchemaReloadInterval string `json:"schemaReloadIntervalSeconds,omitempty"` - SignalSchemaChangeReloadInterval string `json:"signalSchemaChangeReloadIntervalSeconds,omitempty"` - SchemaChangeReloadTimeout string `json:"schemaChangeReloadTimeout,omitempty"` + SchemaReloadInterval string `json:"schemaReloadIntervalSeconds,omitempty"` + SchemaChangeReloadTimeout string `json:"schemaChangeReloadTimeout,omitempty"` }{ TCProxy: TCProxy(*cfg), } @@ -379,10 +377,6 @@ func (cfg *TabletConfig) MarshalJSON() ([]byte, error) { tmp.SchemaReloadInterval = d.String() } - if d := cfg.SignalSchemaChangeReloadInterval; d != 0 { - tmp.SignalSchemaChangeReloadInterval = d.String() - } - if d := cfg.SchemaChangeReloadTimeout; d != 0 { tmp.SchemaChangeReloadTimeout = d.String() } @@ -395,9 +389,8 @@ func (cfg *TabletConfig) UnmarshalJSON(data []byte) (err error) { var tmp struct { TCProxy - SchemaReloadInterval string `json:"schemaReloadIntervalSeconds,omitempty"` - SignalSchemaChangeReloadInterval string `json:"signalSchemaChangeReloadIntervalSeconds,omitempty"` - SchemaChangeReloadTimeout string `json:"schemaChangeReloadTimeout,omitempty"` + SchemaReloadInterval string `json:"schemaReloadIntervalSeconds,omitempty"` + SchemaChangeReloadTimeout string `json:"schemaChangeReloadTimeout,omitempty"` } tmp.TCProxy = TCProxy(*cfg) @@ -417,15 +410,6 @@ func (cfg *TabletConfig) UnmarshalJSON(data []byte) (err error) { cfg.SchemaReloadInterval = 0 } - if tmp.SignalSchemaChangeReloadInterval != "" { - cfg.SignalSchemaChangeReloadInterval, err = time.ParseDuration(tmp.SignalSchemaChangeReloadInterval) - if err != nil { - return err - } - } else { - cfg.SignalSchemaChangeReloadInterval = 0 - } - if tmp.SchemaChangeReloadTimeout != "" { cfg.SchemaChangeReloadTimeout, err = time.ParseDuration(tmp.SchemaChangeReloadTimeout) if err != nil { diff --git a/go/vt/vttablet/tabletserver/tabletserver.go b/go/vt/vttablet/tabletserver/tabletserver.go index 7a3d7cc5c06..2238195c97d 100644 --- a/go/vt/vttablet/tabletserver/tabletserver.go +++ b/go/vt/vttablet/tabletserver/tabletserver.go @@ -271,7 +271,7 @@ func (tsv *TabletServer) InitDBConfig(target *querypb.Target, dbcfgs *dbconfigs. tsv.rt.InitDBConfig(target, mysqld) tsv.txThrottler.InitDBConfig(target) tsv.vstreamer.InitDBConfig(target.Keyspace, target.Shard) - tsv.hs.InitDBConfig(target, tsv.config.DB.DbaWithDB()) + tsv.hs.InitDBConfig(target) tsv.onlineDDLExecutor.InitDBConfig(target.Keyspace, target.Shard, dbcfgs.DBName) tsv.lagThrottler.InitDBConfig(target.Keyspace, target.Shard) tsv.tableGC.InitDBConfig(target.Keyspace, target.Shard, dbcfgs.DBName)