diff --git a/go/vt/vttablet/tabletserver/query_executor_test.go b/go/vt/vttablet/tabletserver/query_executor_test.go index ade79ecaef5..5820b809c8e 100644 --- a/go/vt/vttablet/tabletserver/query_executor_test.go +++ b/go/vt/vttablet/tabletserver/query_executor_test.go @@ -1862,8 +1862,9 @@ func (m mockTxThrottler) Open() (err error) { return nil } -func (m mockTxThrottler) Close() { -} +func (m mockTxThrottler) Close() {} +func (m mockTxThrottler) MakePrimary() {} +func (m mockTxThrottler) MakeNonPrimary() {} func (m mockTxThrottler) Throttle(priority int, workload string) (result bool) { return m.throttle diff --git a/go/vt/vttablet/tabletserver/state_manager.go b/go/vt/vttablet/tabletserver/state_manager.go index cae6a237dc8..b069f2704e0 100644 --- a/go/vt/vttablet/tabletserver/state_manager.go +++ b/go/vt/vttablet/tabletserver/state_manager.go @@ -175,6 +175,8 @@ type ( txThrottler interface { Open() error Close() + MakePrimary() + MakeNonPrimary() } onlineDDLExecutor interface { @@ -457,6 +459,7 @@ func (sm *stateManager) servePrimary() error { sm.hs.MakePrimary(true) sm.se.MakePrimary(true) sm.rt.MakePrimary() + sm.txThrottler.MakePrimary() sm.tracker.Open() // We instantly kill all stateful queries to allow for // te to quickly transition into RW, but olap and stateless @@ -483,6 +486,7 @@ func (sm *stateManager) unservePrimary() error { sm.se.MakePrimary(false) sm.hs.MakePrimary(false) sm.rt.MakePrimary() + sm.txThrottler.MakePrimary() sm.setState(topodatapb.TabletType_PRIMARY, StateNotServing) return nil } @@ -499,6 +503,7 @@ func (sm *stateManager) serveNonPrimary(wantTabletType topodatapb.TabletType) er sm.tracker.Close() sm.se.MakeNonPrimary() sm.hs.MakeNonPrimary() + sm.txThrottler.MakeNonPrimary() if err := sm.connect(wantTabletType, true); err != nil { return err @@ -517,6 +522,7 @@ func (sm *stateManager) unserveNonPrimary(wantTabletType topodatapb.TabletType) sm.se.MakeNonPrimary() sm.hs.MakeNonPrimary() + sm.txThrottler.MakeNonPrimary() if err := 
sm.connect(wantTabletType, false); err != nil { return err @@ -539,7 +545,7 @@ func (sm *stateManager) connect(tabletType topodatapb.TabletType, serving bool) if err := sm.qe.Open(); err != nil { return err } - return sm.txThrottler.Open() + return nil } func (sm *stateManager) unserveCommon() { diff --git a/go/vt/vttablet/tabletserver/state_manager_test.go b/go/vt/vttablet/tabletserver/state_manager_test.go index df819c6f05c..1fcc6a734f3 100644 --- a/go/vt/vttablet/tabletserver/state_manager_test.go +++ b/go/vt/vttablet/tabletserver/state_manager_test.go @@ -77,12 +77,11 @@ func TestStateManagerServePrimary(t *testing.T) { assert.Equal(t, testNow, sm.ptsTimestamp) verifySubcomponent(t, 1, sm.watcher, testStateClosed) - verifySubcomponent(t, 2, sm.se, testStateOpen) verifySubcomponent(t, 3, sm.vstreamer, testStateOpen) verifySubcomponent(t, 4, sm.qe, testStateOpen) - verifySubcomponent(t, 5, sm.txThrottler, testStateOpen) - verifySubcomponent(t, 6, sm.rt, testStatePrimary) + verifySubcomponent(t, 5, sm.rt, testStatePrimary) + verifySubcomponent(t, 6, sm.txThrottler, testStatePrimary) verifySubcomponent(t, 7, sm.tracker, testStateOpen) verifySubcomponent(t, 8, sm.te, testStatePrimary) verifySubcomponent(t, 9, sm.messager, testStateOpen) @@ -109,10 +108,10 @@ func TestStateManagerServeNonPrimary(t *testing.T) { verifySubcomponent(t, 4, sm.tracker, testStateClosed) assert.True(t, sm.se.(*testSchemaEngine).nonPrimary) - verifySubcomponent(t, 5, sm.se, testStateOpen) - verifySubcomponent(t, 6, sm.vstreamer, testStateOpen) - verifySubcomponent(t, 7, sm.qe, testStateOpen) - verifySubcomponent(t, 8, sm.txThrottler, testStateOpen) + verifySubcomponent(t, 5, sm.txThrottler, testStateNonPrimary) + verifySubcomponent(t, 6, sm.se, testStateOpen) + verifySubcomponent(t, 7, sm.vstreamer, testStateOpen) + verifySubcomponent(t, 8, sm.qe, testStateOpen) verifySubcomponent(t, 9, sm.te, testStateNonPrimary) verifySubcomponent(t, 10, sm.rt, testStateNonPrimary) verifySubcomponent(t, 
11, sm.watcher, testStateOpen) @@ -139,9 +138,9 @@ func TestStateManagerUnservePrimary(t *testing.T) { verifySubcomponent(t, 8, sm.se, testStateOpen) verifySubcomponent(t, 9, sm.vstreamer, testStateOpen) verifySubcomponent(t, 10, sm.qe, testStateOpen) - verifySubcomponent(t, 11, sm.txThrottler, testStateOpen) - verifySubcomponent(t, 12, sm.rt, testStatePrimary) + verifySubcomponent(t, 11, sm.rt, testStatePrimary) + verifySubcomponent(t, 12, sm.txThrottler, testStatePrimary) assert.Equal(t, topodatapb.TabletType_PRIMARY, sm.target.TabletType) assert.Equal(t, StateNotServing, sm.state) @@ -162,10 +161,10 @@ func TestStateManagerUnserveNonPrimary(t *testing.T) { verifySubcomponent(t, 6, sm.tracker, testStateClosed) assert.True(t, sm.se.(*testSchemaEngine).nonPrimary) - verifySubcomponent(t, 7, sm.se, testStateOpen) - verifySubcomponent(t, 8, sm.vstreamer, testStateOpen) - verifySubcomponent(t, 9, sm.qe, testStateOpen) - verifySubcomponent(t, 10, sm.txThrottler, testStateOpen) + verifySubcomponent(t, 7, sm.txThrottler, testStateNonPrimary) + verifySubcomponent(t, 8, sm.se, testStateOpen) + verifySubcomponent(t, 9, sm.vstreamer, testStateOpen) + verifySubcomponent(t, 10, sm.qe, testStateOpen) verifySubcomponent(t, 11, sm.rt, testStateNonPrimary) verifySubcomponent(t, 12, sm.watcher, testStateOpen) @@ -300,10 +299,10 @@ func TestStateManagerSetServingTypeNoChange(t *testing.T) { verifySubcomponent(t, 4, sm.tracker, testStateClosed) assert.True(t, sm.se.(*testSchemaEngine).nonPrimary) - verifySubcomponent(t, 5, sm.se, testStateOpen) - verifySubcomponent(t, 6, sm.vstreamer, testStateOpen) - verifySubcomponent(t, 7, sm.qe, testStateOpen) - verifySubcomponent(t, 8, sm.txThrottler, testStateOpen) + verifySubcomponent(t, 5, sm.txThrottler, testStateNonPrimary) + verifySubcomponent(t, 6, sm.se, testStateOpen) + verifySubcomponent(t, 7, sm.vstreamer, testStateOpen) + verifySubcomponent(t, 8, sm.qe, testStateOpen) verifySubcomponent(t, 9, sm.te, testStateNonPrimary) 
verifySubcomponent(t, 10, sm.rt, testStateNonPrimary) verifySubcomponent(t, 11, sm.watcher, testStateOpen) @@ -819,7 +818,7 @@ func (te *testSchemaEngine) EnsureConnectionAndDB(topodatapb.TabletType, bool) e } func (te *testSchemaEngine) Open() error { - te.order = order.Add(1) + te.order = addOrder() te.state = testStateOpen return nil } @@ -833,7 +832,7 @@ func (te *testSchemaEngine) MakePrimary(serving bool) { } func (te *testSchemaEngine) Close() { - te.order = order.Add(1) + te.order = addOrder() te.state = testStateClosed } @@ -844,17 +843,17 @@ type testReplTracker struct { } func (te *testReplTracker) MakePrimary() { - te.order = order.Add(1) + te.order = addOrder() te.state = testStatePrimary } func (te *testReplTracker) MakeNonPrimary() { - te.order = order.Add(1) + te.order = addOrder() te.state = testStateNonPrimary } func (te *testReplTracker) Close() { - te.order = order.Add(1) + te.order = addOrder() te.state = testStateClosed } @@ -869,7 +868,7 @@ type testQueryEngine struct { } func (te *testQueryEngine) Open() error { - te.order = order.Add(1) + te.order = addOrder() te.state = testStateOpen return nil } @@ -883,7 +882,7 @@ func (te *testQueryEngine) IsMySQLReachable() error { } func (te *testQueryEngine) Close() { - te.order = order.Add(1) + te.order = addOrder() te.state = testStateClosed } @@ -892,17 +891,17 @@ type testTxEngine struct { } func (te *testTxEngine) AcceptReadWrite() { - te.order = order.Add(1) + te.order = addOrder() te.state = testStatePrimary } func (te *testTxEngine) AcceptReadOnly() { - te.order = order.Add(1) + te.order = addOrder() te.state = testStateNonPrimary } func (te *testTxEngine) Close() { - te.order = order.Add(1) + te.order = addOrder() te.state = testStateClosed } @@ -913,12 +912,12 @@ type testSubcomponent struct { } func (te *testSubcomponent) Open() { - te.order = order.Add(1) + te.order = addOrder() te.state = testStateOpen } func (te *testSubcomponent) Close() { - te.order = order.Add(1) + te.order = 
addOrder() te.state = testStateClosed } @@ -927,28 +926,44 @@ type testTxThrottler struct { } func (te *testTxThrottler) Open() error { - te.order = order.Add(1) + te.order = addOrder() te.state = testStateOpen return nil } func (te *testTxThrottler) Close() { - te.order = order.Add(1) + te.order = addOrder() te.state = testStateClosed } +func (te *testTxThrottler) MakePrimary() { + te.order = addOrder() + te.state = testStatePrimary +} + +func (te *testTxThrottler) MakeNonPrimary() { + te.order = addOrder() + te.state = testStateNonPrimary +} + +func addOrder() int64 { + newVal := order.Add(1) + + return newVal +} + type testOnlineDDLExecutor struct { testOrderState } func (te *testOnlineDDLExecutor) Open() error { - te.order = order.Add(1) + te.order = addOrder() te.state = testStateOpen return nil } func (te *testOnlineDDLExecutor) Close() { - te.order = order.Add(1) + te.order = addOrder() te.state = testStateClosed } @@ -957,13 +972,13 @@ type testLagThrottler struct { } func (te *testLagThrottler) Open() error { - te.order = order.Add(1) + te.order = addOrder() te.state = testStateOpen return nil } func (te *testLagThrottler) Close() { - te.order = order.Add(1) + te.order = addOrder() te.state = testStateClosed } @@ -972,12 +987,12 @@ type testTableGC struct { } func (te *testTableGC) Open() error { - te.order = order.Add(1) + te.order = addOrder() te.state = testStateOpen return nil } func (te *testTableGC) Close() { - te.order = order.Add(1) + te.order = addOrder() te.state = testStateClosed } diff --git a/go/vt/vttablet/tabletserver/txthrottler/tx_throttler.go b/go/vt/vttablet/tabletserver/txthrottler/tx_throttler.go index cf0a88ad310..2413c13718d 100644 --- a/go/vt/vttablet/tabletserver/txthrottler/tx_throttler.go +++ b/go/vt/vttablet/tabletserver/txthrottler/tx_throttler.go @@ -71,6 +71,8 @@ type TxThrottler interface { InitDBConfig(target *querypb.Target) Open() (err error) Close() + MakePrimary() + MakeNonPrimary() Throttle(priority int, workload 
string) (result bool) } @@ -135,6 +137,8 @@ type txThrottler struct { } type txThrottlerState interface { + makePrimary() + makeNonPrimary() deallocateResources() StatsUpdate(tabletStats *discovery.TabletHealth) throttle() bool @@ -143,6 +147,7 @@ type txThrottlerState interface { // txThrottlerStateImpl holds the state of an open TxThrottler object. type txThrottlerStateImpl struct { config *tabletenv.TabletConfig + target *querypb.Target txThrottler *txThrottler // throttleMu serializes calls to throttler.Throttler.Throttle(threadId). @@ -153,17 +158,16 @@ type txThrottlerStateImpl struct { ctx context.Context cancel context.CancelFunc - healthCheck discovery.HealthCheck - healthCheckChan chan *discovery.TabletHealth - healthCheckCells []string - cellsFromTopo bool + cellsFromTopo bool + healthCheck discovery.HealthCheck + healthCheckCancel context.CancelFunc + healthCheckCells []string + healthCheckChan chan *discovery.TabletHealth + maxLag int64 + wg sync.WaitGroup // tabletTypes stores the tablet types for throttling tabletTypes map[topodatapb.TabletType]bool - - maxLag int64 - done chan bool - waitForTermination sync.WaitGroup } // NewTxThrottler tries to construct a txThrottler from the relevant @@ -228,6 +232,22 @@ func (t *txThrottler) Close() { log.Info("txThrottler: closed") } +// MakePrimary performs a transition to a primary tablet. This will enable healthchecks to +// watch the replication lag state of other tablets. +func (t *txThrottler) MakePrimary() { + if t.state != nil { + t.state.makePrimary() + } +} + +// MakeNonPrimary performs a transition to a non-primary tablet. This disables healthchecks +// for replication lag state if we were primary. +func (t *txThrottler) MakeNonPrimary() { + if t.state != nil { + t.state.makeNonPrimary() + } +} + // Throttle should be called before a new transaction is started. // It returns true if the transaction should not proceed (the caller // should back off). 
Throttle requires that Open() was previously called @@ -275,16 +295,27 @@ func newTxThrottlerState(txThrottler *txThrottler, config *tabletenv.TabletConfi tabletTypes[tabletType] = true } + // get cells from topo if none defined in tabletenv config + var cellsFromTopo bool + healthCheckCells := config.TxThrottlerHealthCheckCells + if len(healthCheckCells) == 0 { + ctx, cancel := context.WithTimeout(context.Background(), topo.RemoteOperationTimeout) + defer cancel() + healthCheckCells = fetchKnownCells(ctx, txThrottler.topoServer, target) + cellsFromTopo = true + } + ctx, cancel := context.WithCancel(context.Background()) state := &txThrottlerStateImpl{ ctx: ctx, cancel: cancel, config: config, - healthCheckCells: config.TxThrottlerHealthCheckCells, + cellsFromTopo: cellsFromTopo, + healthCheckCells: healthCheckCells, tabletTypes: tabletTypes, + target: target, throttler: t, txThrottler: txThrottler, - done: make(chan bool, 1), } // get cells from topo if none defined in tabletenv config @@ -295,32 +326,32 @@ func newTxThrottlerState(txThrottler *txThrottler, config *tabletenv.TabletConfi state.cellsFromTopo = true } - if err := state.initHealthCheckStream(txThrottler.topoServer, target); err != nil { - return nil, err - } - state.healthCheck.RegisterStats() - go state.healthChecksProcessor(txThrottler.topoServer, target) - state.waitForTermination.Add(1) - go state.updateMaxLag() - return state, nil } -func (ts *txThrottlerStateImpl) initHealthCheckStream(topoServer *topo.Server, target *querypb.Target) (err error) { +func (ts *txThrottlerStateImpl) initHealthCheck(topoServer *topo.Server, target *querypb.Target) (err error) { ts.healthCheck, err = healthCheckFactory(ts.ctx, topoServer, target.Cell, target.Keyspace, target.Shard, ts.healthCheckCells) if err != nil { + ts.healthCheck = nil + return err } + ts.healthCheckChan = ts.healthCheck.Subscribe() + ts.healthCheck.RegisterStats() + return nil } -func (ts *txThrottlerStateImpl) closeHealthCheckStream() { +func 
(ts *txThrottlerStateImpl) closeHealthCheck() { if ts.healthCheck == nil { return } + ts.cancel() ts.healthCheck.Close() + ts.healthCheck = nil + ts.maxLag = 0 } func (ts *txThrottlerStateImpl) updateHealthCheckCells(topoServer *topo.Server, target *querypb.Target) error { @@ -331,13 +362,15 @@ func (ts *txThrottlerStateImpl) updateHealthCheckCells(topoServer *topo.Server, if !slices.Equal(knownCells, ts.healthCheckCells) { log.Info("txThrottler: restarting healthcheck stream due to topology cells update") ts.healthCheckCells = knownCells - ts.closeHealthCheckStream() - return ts.initHealthCheckStream(topoServer, target) + ts.closeHealthCheck() + return ts.initHealthCheck(topoServer, target) } + return nil } func (ts *txThrottlerStateImpl) healthChecksProcessor(topoServer *topo.Server, target *querypb.Target) { + defer ts.wg.Done() var cellsUpdateTicks <-chan time.Time if ts.cellsFromTopo { ticker := time.NewTicker(ts.config.TxThrottlerTopoRefreshInterval) @@ -358,11 +391,38 @@ func (ts *txThrottlerStateImpl) healthChecksProcessor(topoServer *topo.Server, t } } +func (ts *txThrottlerStateImpl) makePrimary() { + err := ts.initHealthCheck(ts.txThrottler.topoServer, ts.target) + if err != nil { + log.Errorf("txThrottler: failed to initialize health check while attempting to make primary: %v", err) + + return + } + + var ctx context.Context + ctx, ts.healthCheckCancel = context.WithCancel(context.Background()) + + ts.wg.Add(1) + go ts.healthChecksProcessor(ts.txThrottler.topoServer, ts.target) + + ts.wg.Add(1) + go ts.updateMaxLag(ctx) +} + +func (ts *txThrottlerStateImpl) makeNonPrimary() { + ts.closeHealthCheck() +} + func (ts *txThrottlerStateImpl) throttle() bool { if ts.throttler == nil { log.Error("txThrottler: throttle called after deallocateResources was called") return false } + // return false if we are not watching lag + if ts.healthCheck == nil { + return false + } + // Serialize calls to ts.throttle.Throttle() ts.throttleMu.Lock() defer 
ts.throttleMu.Unlock() @@ -373,17 +433,17 @@ func (ts *txThrottlerStateImpl) throttle() bool { ts.throttler.Throttle(0 /* threadId */) > 0 } -func (ts *txThrottlerStateImpl) updateMaxLag() { - defer ts.waitForTermination.Done() +func (ts *txThrottlerStateImpl) updateMaxLag(ctx context.Context) { + defer ts.wg.Done() // We use half of the target lag to ensure we have enough resolution to see changes in lag below that value ticker := time.NewTicker(time.Duration(ts.config.TxThrottlerConfig.TargetReplicationLagSec/2) * time.Second) defer ticker.Stop() -outerloop: for { select { + case <-ctx.Done(): + return case <-ticker.C: var maxLag uint32 - for tabletType := range ts.tabletTypes { maxLagPerTabletType := ts.throttler.MaxLag(tabletType) if maxLagPerTabletType > maxLag { @@ -391,28 +451,23 @@ outerloop: } } atomic.StoreInt64(&ts.maxLag, int64(maxLag)) - case <-ts.done: - break outerloop } } } func (ts *txThrottlerStateImpl) deallocateResources() { - // Close healthcheck and topo watchers - ts.closeHealthCheckStream() - ts.healthCheck = nil + // Close healthcheck and max lag updater + ts.closeHealthCheck() - ts.done <- true - ts.waitForTermination.Wait() // After ts.healthCheck is closed txThrottlerStateImpl.StatsUpdate() is guaranteed not // to be executing, so we can safely close the throttler. ts.throttler.Close() ts.throttler = nil } -// StatsUpdate updates the health of a tablet with the given healthcheck. +// StatsUpdate updates the health of a tablet with the given healthcheck, when primary. 
func (ts *txThrottlerStateImpl) StatsUpdate(tabletStats *discovery.TabletHealth) { - if len(ts.tabletTypes) == 0 { + if ts.healthCheck == nil || len(ts.tabletTypes) == 0 { return } diff --git a/go/vt/vttablet/tabletserver/txthrottler/tx_throttler_test.go b/go/vt/vttablet/tabletserver/txthrottler/tx_throttler_test.go index 1d3c9f57e72..514ef7a1c14 100644 --- a/go/vt/vttablet/tabletserver/txthrottler/tx_throttler_test.go +++ b/go/vt/vttablet/tabletserver/txthrottler/tx_throttler_test.go @@ -51,6 +51,7 @@ func TestDisabledThrottler(t *testing.T) { Shard: "shard", }) assert.Nil(t, throttler.Open()) + throttler.MakePrimary() assert.False(t, throttler.Throttle(0, "some-workload")) throttlerImpl, _ := throttler.(*txThrottler) assert.Zero(t, throttlerImpl.throttlerRunning.Get()) @@ -144,9 +145,17 @@ func TestEnabledThrottler(t *testing.T) { assert.Equal(t, map[topodatapb.TabletType]bool{topodatapb.TabletType_REPLICA: true}, throttlerStateImpl.tabletTypes) assert.Equal(t, int64(1), throttlerImpl.throttlerRunning.Get()) - // Stop the go routine that keeps updating the cached shard's max lag to prevent it from changing the value in a + // check .throttle() returns false when non-primary and healthCheck is nil (not watching for lag) + assert.False(t, throttlerStateImpl.throttle()) + assert.Nil(t, throttlerStateImpl.healthCheck) + + // makePrimary and confirm healthcheck starts + throttlerStateImpl.makePrimary() + assert.NotNil(t, throttlerStateImpl.healthCheck) + + // Stop the lag/healthcheck go routines that keeps updating the cached shard's max lag to prevent it from changing the value in a // way that will interfere with how we manipulate that value in our tests to evaluate different cases: - throttlerStateImpl.done <- true + throttlerStateImpl.healthCheckCancel() // 1 should not throttle due to return value of underlying Throttle(), despite high lag atomic.StoreInt64(&throttlerStateImpl.maxLag, 20) @@ -243,12 +252,10 @@ type mockTxThrottlerState struct { shouldThrottle 
bool } -func (t *mockTxThrottlerState) deallocateResources() { - -} -func (t *mockTxThrottlerState) StatsUpdate(tabletStats *discovery.TabletHealth) { - -} +func (t *mockTxThrottlerState) makePrimary() {} +func (t *mockTxThrottlerState) makeNonPrimary() {} +func (t *mockTxThrottlerState) deallocateResources() {} +func (t *mockTxThrottlerState) StatsUpdate(tabletStats *discovery.TabletHealth) {} func (t *mockTxThrottlerState) throttle() bool { return t.shouldThrottle