Skip to content

Commit

Permalink
Cleanup: Health Stream removed db pool usage (#16336)
Browse files Browse the repository at this point in the history
Signed-off-by: Harshit Gangal <[email protected]>
  • Loading branch information
harshit-gangal authored Jul 8, 2024
1 parent d9475d8 commit 0db3577
Show file tree
Hide file tree
Showing 7 changed files with 52 additions and 328 deletions.
99 changes: 0 additions & 99 deletions go/vt/vttablet/endtoend/streamtimeout/healthstream_test.go

This file was deleted.

102 changes: 0 additions & 102 deletions go/vt/vttablet/endtoend/streamtimeout/main_test.go

This file was deleted.

36 changes: 1 addition & 35 deletions go/vt/vttablet/tabletserver/health_streamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ import (
vtschema "vitess.io/vitess/go/vt/schema"
"vitess.io/vitess/go/vt/vttablet/tabletserver/schema"

"vitess.io/vitess/go/vt/dbconfigs"
"vitess.io/vitess/go/vt/servenv"

"vitess.io/vitess/go/history"
Expand All @@ -39,7 +38,6 @@ import (
vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
"vitess.io/vitess/go/vt/vterrors"
"vitess.io/vitess/go/vt/vttablet/tabletmanager/vreplication"
"vitess.io/vitess/go/vt/vttablet/tabletserver/connpool"
"vitess.io/vitess/go/vt/vttablet/tabletserver/tabletenv"
)

Expand Down Expand Up @@ -79,23 +77,12 @@ type healthStreamer struct {
se *schema.Engine
history *history.History

dbConfig dbconfigs.Connector
conns *connpool.Pool
signalWhenSchemaChange bool
reloadTimeout time.Duration

viewsEnabled bool
}

func newHealthStreamer(env tabletenv.Env, alias *topodatapb.TabletAlias, engine *schema.Engine) *healthStreamer {
var pool *connpool.Pool
if env.Config().SignalWhenSchemaChange {
// We need one connection for the reloader.
pool = connpool.NewPool(env, "", tabletenv.ConnPoolConfig{
Size: 1,
IdleTimeout: env.Config().OltpReadPool.IdleTimeout,
})
}
hs := &healthStreamer{
stats: env.Stats(),
degradedThreshold: env.Config().Healthcheck.DegradedThreshold,
Expand All @@ -110,19 +97,16 @@ func newHealthStreamer(env tabletenv.Env, alias *topodatapb.TabletAlias, engine
},

history: history.New(5),
conns: pool,
signalWhenSchemaChange: env.Config().SignalWhenSchemaChange,
reloadTimeout: env.Config().SchemaChangeReloadTimeout,
viewsEnabled: env.Config().EnableViews,
se: engine,
}
hs.unhealthyThreshold.Store(env.Config().Healthcheck.UnhealthyThreshold.Nanoseconds())
return hs
}

func (hs *healthStreamer) InitDBConfig(target *querypb.Target, cp dbconfigs.Connector) {
func (hs *healthStreamer) InitDBConfig(target *querypb.Target) {
hs.state.Target = target.CloneVT()
hs.dbConfig = cp
}

func (hs *healthStreamer) Open() {
Expand All @@ -133,10 +117,6 @@ func (hs *healthStreamer) Open() {
return
}
hs.ctx, hs.cancel = context.WithCancel(context.Background())
if hs.conns != nil {
// if we don't have a live conns object, it means we are not configured to signal when the schema changes
hs.conns.Open(hs.dbConfig, hs.dbConfig, hs.dbConfig)
}
}

func (hs *healthStreamer) Close() {
Expand All @@ -148,10 +128,6 @@ func (hs *healthStreamer) Close() {
hs.cancel()
hs.cancel = nil
}
if hs.conns != nil {
hs.conns.Close()
hs.conns = nil
}
}

func (hs *healthStreamer) Stream(ctx context.Context, callback func(*querypb.StreamHealthResponse) error) error {
Expand Down Expand Up @@ -338,16 +314,6 @@ func (hs *healthStreamer) reload(created, altered, dropped []*schema.Table, udfs
return nil
}

// add a timeout to prevent unbounded waits
ctx, cancel := context.WithTimeout(hs.ctx, hs.reloadTimeout)
defer cancel()

conn, err := hs.conns.Get(ctx, nil)
if err != nil {
return err
}
defer conn.Recycle()

// We create lists to store the tables that have schema changes.
var tables []string
var views []string
Expand Down
Loading

0 comments on commit 0db3577

Please sign in to comment.