Skip to content

Commit

Permalink
introduced executor config
Browse files Browse the repository at this point in the history
Signed-off-by: Harshit Gangal <[email protected]>
  • Loading branch information
harshit-gangal committed Jan 16, 2025
1 parent 26be5e5 commit 6dd85aa
Show file tree
Hide file tree
Showing 13 changed files with 159 additions and 197 deletions.
7 changes: 6 additions & 1 deletion go/vt/vtexplain/vtexplain_vtgate.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,12 @@ func (vte *VTExplain) initVtgateExecutor(ctx context.Context, ts *topo.Server, v
var schemaTracker vtgate.SchemaInfo // no schema tracker for these tests
queryLogBufferSize := 10
plans := theine.NewStore[vtgate.PlanCacheKey, *engine.Plan](4*1024*1024, false)
vte.vtgateExecutor = vtgate.NewExecutor(ctx, vte.env, vte.explainTopo, Cell, resolver, opts.Normalize, false, streamSize, plans, schemaTracker, false, opts.PlannerVersion, 0, vtgate.NewDynamicViperConfig(), "")
eConfig := vtgate.ExecutorConfig{
Normalize: opts.Normalize,
StreamSize: streamSize,
AllowScatter: true,
}
vte.vtgateExecutor = vtgate.NewExecutor(ctx, vte.env, vte.explainTopo, Cell, resolver, eConfig, false, plans, schemaTracker, opts.PlannerVersion, vtgate.NewDynamicViperConfig())
vte.vtgateExecutor.SetQueryLogger(streamlog.New[*logstats.LogStats]("VTGate", queryLogBufferSize))

return nil
Expand Down
2 changes: 1 addition & 1 deletion go/vt/vtgate/bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ func BenchmarkWithNormalizer(b *testing.B) {
func BenchmarkWithoutNormalizer(b *testing.B) {
vtgateInst, _, ctx := createVtgateEnv(b)

vtgateInst.executor.normalize = false
vtgateInst.executor.config.Normalize = false

for i := 0; i < b.N; i++ {
_, _, err := vtgateInst.Execute(
Expand Down
89 changes: 44 additions & 45 deletions go/vt/vtgate/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,40 +107,45 @@ func init() {

// Executor is the engine that executes queries by utilizing
// the abilities of the underlying vttablets.
type Executor struct {
env *vtenv.Environment
serv srvtopo.Server
cell string
resolver *Resolver
scatterConn *ScatterConn
txConn *TxConn
type (
ExecutorConfig struct {
Normalize bool
StreamSize int
// AllowScatter will fail planning if set to false and a plan contains any scatter queries
AllowScatter bool
WarmingReadsPercent int
QueryLogToFile string
}

mu sync.Mutex
vschema *vindexes.VSchema
streamSize int
vschemaStats *VSchemaStats
Executor struct {
config ExecutorConfig

plans *PlanCache
epoch atomic.Uint32
env *vtenv.Environment
serv srvtopo.Server
cell string
resolver *Resolver
scatterConn *ScatterConn
txConn *TxConn

normalize bool
mu sync.Mutex
vschema *vindexes.VSchema
vschemaStats *VSchemaStats

vm *VSchemaManager
schemaTracker SchemaInfo
plans *PlanCache
epoch atomic.Uint32

// allowScatter will fail planning if set to false and a plan contains any scatter queries
allowScatter bool
vm *VSchemaManager
schemaTracker SchemaInfo

// queryLogger is passed in for logging from this vtgate executor.
queryLogger *streamlog.StreamLogger[*logstats.LogStats]
// queryLogger is passed in for logging from this vtgate executor.
queryLogger *streamlog.StreamLogger[*logstats.LogStats]

warmingReadsPercent int
warmingReadsChannel chan bool
warmingReadsChannel chan bool

vConfig econtext.VCursorConfig
ddlConfig dynamicconfig.DDL
queryLogToFile string
}
vConfig econtext.VCursorConfig
ddlConfig dynamicconfig.DDL
}
)

var executorOnce sync.Once

Expand All @@ -164,32 +169,26 @@ func NewExecutor(
serv srvtopo.Server,
cell string,
resolver *Resolver,
normalize, warnOnShardedOnly bool,
streamSize int,
eConfig ExecutorConfig,
warnOnShardedOnly bool,
plans *PlanCache,
schemaTracker SchemaInfo,
noScatter bool,
pv plancontext.PlannerVersion,
warmingReadsPercent int,
ddlConfig dynamicconfig.DDL,
queryLogToFile string,
) *Executor {
e := &Executor{
env: env,
serv: serv,
cell: cell,
resolver: resolver,
scatterConn: resolver.scatterConn,
txConn: resolver.scatterConn.txConn,
normalize: normalize,
streamSize: streamSize,
config: eConfig,
env: env,
serv: serv,
cell: cell,
resolver: resolver,
scatterConn: resolver.scatterConn,
txConn: resolver.scatterConn.txConn,

schemaTracker: schemaTracker,
allowScatter: !noScatter,
plans: plans,
warmingReadsPercent: warmingReadsPercent,
warmingReadsChannel: make(chan bool, warmingReadsConcurrency),
ddlConfig: ddlConfig,
queryLogToFile: queryLogToFile,
}
// setting the vcursor config.
e.initVConfig(warnOnShardedOnly, pv)
Expand Down Expand Up @@ -330,7 +329,7 @@ func (e *Executor) StreamExecute(
byteCount += col.Len()
}

if byteCount >= e.streamSize {
if byteCount >= e.config.StreamSize {
err := callback(result)
seenResults.Store(true)
result = &sqltypes.Result{}
Expand Down Expand Up @@ -1422,7 +1421,7 @@ func (e *Executor) initVConfig(warnOnShardedOnly bool, pv plancontext.PlannerVer

DBDDLPlugin: dbDDLPlugin,

WarmingReadsPercent: e.warmingReadsPercent,
WarmingReadsPercent: e.config.WarmingReadsPercent,
WarmingReadsTimeout: warmingReadsQueryTimeout,
WarmingReadsChannel: e.warmingReadsChannel,
}
Expand Down Expand Up @@ -1542,7 +1541,7 @@ func (e *Executor) startVStream(ctx context.Context, rss []*srvtopo.ResolvedShar
}

func (e *Executor) checkThatPlanIsValid(stmt sqlparser.Statement, plan *engine.Plan) error {
if e.allowScatter || plan.Instructions == nil || sqlparser.AllowScatterDirective(stmt) {
if e.config.AllowScatter || plan.Instructions == nil || sqlparser.AllowScatterDirective(stmt) {
return nil
}
// we go over all the primitives in the plan, searching for a route that is of SelectScatter opcode
Expand Down
12 changes: 4 additions & 8 deletions go/vt/vtgate/executor_dml_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -608,9 +608,8 @@ func TestUpdateComments(t *testing.T) {
}

func TestUpdateNormalize(t *testing.T) {
executor, sbc1, sbc2, _, ctx := createExecutorEnv(t)
executor, sbc1, sbc2, _, ctx := createExecutorEnvWithConfig(t, createExecutorConfigWithNormalizer())

executor.normalize = true
session := &vtgatepb.Session{
TargetString: "@primary",
}
Expand Down Expand Up @@ -1337,7 +1336,7 @@ func TestInsertSharded(t *testing.T) {
testQueryLog(t, executor, logChan, "TestExecute", "INSERT", "insert into user2(id, `name`, lastname) values (2, 'myname', 'mylastname')", 1)

// insert with binary values
executor.normalize = true
executor.config.Normalize = true
sbc1.Queries = nil
sbc2.Queries = nil
sbclookup.Queries = nil
Expand Down Expand Up @@ -1368,8 +1367,7 @@ func TestInsertSharded(t *testing.T) {
}

func TestInsertNegativeValue(t *testing.T) {
executor, sbc1, sbc2, sbclookup, ctx := createExecutorEnv(t)
executor.normalize = true
executor, sbc1, sbc2, sbclookup, ctx := createExecutorEnvWithConfig(t, createExecutorConfigWithNormalizer())

logChan := executor.queryLogger.Subscribe("Test")
defer executor.queryLogger.Unsubscribe(logChan)
Expand Down Expand Up @@ -2528,9 +2526,7 @@ func TestDeleteEqualWithPrepare(t *testing.T) {
}

func TestUpdateLastInsertID(t *testing.T) {
executor, sbc1, _, _, ctx := createExecutorEnv(t)

executor.normalize = true
executor, sbc1, _, _, ctx := createExecutorEnvWithConfig(t, createExecutorConfigWithNormalizer())

sql := "update user set a = last_insert_id() where id = 1"
session := &vtgatepb.Session{
Expand Down
38 changes: 27 additions & 11 deletions go/vt/vtgate/executor_framework_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,10 +65,6 @@ var badVSchema = `
}
`

const (
testBufferSize = 10
)

type DestinationAnyShardPickerFirstShard struct{}

func (dp DestinationAnyShardPickerFirstShard) PickShard(shardCount int) int {
Expand Down Expand Up @@ -130,7 +126,7 @@ func init() {
vindexes.Register("keyrange_lookuper_unique", newKeyRangeLookuperUnique)
}

func createExecutorEnvCallback(t testing.TB, eachShard func(shard, ks string, tabletType topodatapb.TabletType, conn *sandboxconn.SandboxConn)) (executor *Executor, ctx context.Context) {
func createExecutorEnvCallback(t testing.TB, eConfig ExecutorConfig, eachShard func(shard, ks string, tabletType topodatapb.TabletType, conn *sandboxconn.SandboxConn)) (executor *Executor, ctx context.Context) {
var cancel context.CancelFunc
ctx, cancel = context.WithCancel(context.Background())
cell := "aa"
Expand Down Expand Up @@ -183,7 +179,7 @@ func createExecutorEnvCallback(t testing.TB, eachShard func(shard, ks string, ta
// one-off queries from thrashing the cache. Disable the doorkeeper in the tests to prevent flakiness.
plans := theine.NewStore[PlanCacheKey, *engine.Plan](queryPlanCacheMemory, false)

executor = NewExecutor(ctx, vtenv.NewTestEnv(), serv, cell, resolver, false, false, testBufferSize, plans, nil, false, querypb.ExecuteOptions_Gen4, 0, NewDynamicViperConfig(), "")
executor = NewExecutor(ctx, vtenv.NewTestEnv(), serv, cell, resolver, eConfig, false, plans, nil, querypb.ExecuteOptions_Gen4, NewDynamicViperConfig())
executor.SetQueryLogger(queryLogger)

key.AnyShardPicker = DestinationAnyShardPickerFirstShard{}
Expand All @@ -198,7 +194,11 @@ func createExecutorEnvCallback(t testing.TB, eachShard func(shard, ks string, ta
}

func createExecutorEnv(t testing.TB) (executor *Executor, sbc1, sbc2, sbclookup *sandboxconn.SandboxConn, ctx context.Context) {
executor, ctx = createExecutorEnvCallback(t, func(shard, ks string, tabletType topodatapb.TabletType, conn *sandboxconn.SandboxConn) {
return createExecutorEnvWithConfig(t, createExecutorConfig())
}

func createExecutorEnvWithConfig(t testing.TB, eConfig ExecutorConfig) (executor *Executor, sbc1, sbc2, sbclookup *sandboxconn.SandboxConn, ctx context.Context) {
executor, ctx = createExecutorEnvCallback(t, eConfig, func(shard, ks string, tabletType topodatapb.TabletType, conn *sandboxconn.SandboxConn) {
switch {
case ks == KsTestSharded && shard == "-20":
sbc1 = conn
Expand All @@ -210,7 +210,6 @@ func createExecutorEnv(t testing.TB) (executor *Executor, sbc1, sbc2, sbclookup
})
return
}

func createCustomExecutor(t testing.TB, vschema string, mysqlVersion string) (executor *Executor, sbc1, sbc2, sbclookup *sandboxconn.SandboxConn, ctx context.Context) {
var cancel context.CancelFunc
ctx, cancel = context.WithCancel(context.Background())
Expand All @@ -232,7 +231,7 @@ func createCustomExecutor(t testing.TB, vschema string, mysqlVersion string) (ex
plans := DefaultPlanCache()
env, err := vtenv.New(vtenv.Options{MySQLServerVersion: mysqlVersion})
require.NoError(t, err)
executor = NewExecutor(ctx, env, serv, cell, resolver, false, false, testBufferSize, plans, nil, false, querypb.ExecuteOptions_Gen4, 0, NewDynamicViperConfig(), "")
executor = NewExecutor(ctx, env, serv, cell, resolver, createExecutorConfig(), false, plans, nil, querypb.ExecuteOptions_Gen4, NewDynamicViperConfig())
executor.SetQueryLogger(queryLogger)

t.Cleanup(func() {
Expand All @@ -244,6 +243,21 @@ func createCustomExecutor(t testing.TB, vschema string, mysqlVersion string) (ex
return executor, sbc1, sbc2, sbclookup, ctx
}

func createExecutorConfig() ExecutorConfig {
return ExecutorConfig{
StreamSize: 10,
AllowScatter: true,
}
}

func createExecutorConfigWithNormalizer() ExecutorConfig {
return ExecutorConfig{
StreamSize: 10,
AllowScatter: true,
Normalize: true,
}
}

func createCustomExecutorSetValues(t testing.TB, vschema string, values []*sqltypes.Result) (executor *Executor, sbc1, sbc2, sbclookup *sandboxconn.SandboxConn, ctx context.Context) {
var cancel context.CancelFunc
ctx, cancel = context.WithCancel(context.Background())
Expand All @@ -269,7 +283,7 @@ func createCustomExecutorSetValues(t testing.TB, vschema string, values []*sqlty
sbclookup = hc.AddTestTablet(cell, "0", 1, KsTestUnsharded, "0", topodatapb.TabletType_PRIMARY, true, 1, nil)
queryLogger := streamlog.New[*logstats.LogStats]("VTGate", queryLogBufferSize)
plans := DefaultPlanCache()
executor = NewExecutor(ctx, vtenv.NewTestEnv(), serv, cell, resolver, false, false, testBufferSize, plans, nil, false, querypb.ExecuteOptions_Gen4, 0, NewDynamicViperConfig(), "")
executor = NewExecutor(ctx, vtenv.NewTestEnv(), serv, cell, resolver, createExecutorConfig(), false, plans, nil, querypb.ExecuteOptions_Gen4, NewDynamicViperConfig())
executor.SetQueryLogger(queryLogger)

t.Cleanup(func() {
Expand All @@ -294,7 +308,9 @@ func createExecutorEnvWithPrimaryReplicaConn(t testing.TB, ctx context.Context,
replica = hc.AddTestTablet(cell, "0-replica", 1, KsTestUnsharded, "0", topodatapb.TabletType_REPLICA, true, 1, nil)

queryLogger := streamlog.New[*logstats.LogStats]("VTGate", queryLogBufferSize)
executor = NewExecutor(ctx, vtenv.NewTestEnv(), serv, cell, resolver, false, false, testBufferSize, DefaultPlanCache(), nil, false, querypb.ExecuteOptions_Gen4, warmingReadsPercent, NewDynamicViperConfig(), "")
eConfig := createExecutorConfig()
eConfig.WarmingReadsPercent = warmingReadsPercent
executor = NewExecutor(ctx, vtenv.NewTestEnv(), serv, cell, resolver, eConfig, false, DefaultPlanCache(), nil, querypb.ExecuteOptions_Gen4, NewDynamicViperConfig())
executor.SetQueryLogger(queryLogger)

t.Cleanup(func() {
Expand Down
Loading

0 comments on commit 6dd85aa

Please sign in to comment.