From 6dd85aaaaa1cdec0b5714ff43b035c8e82d44a45 Mon Sep 17 00:00:00 2001 From: Harshit Gangal Date: Thu, 16 Jan 2025 19:32:23 +0530 Subject: [PATCH] introduced executor config Signed-off-by: Harshit Gangal --- go/vt/vtexplain/vtexplain_vtgate.go | 7 +- go/vt/vtgate/bench_test.go | 2 +- go/vt/vtgate/executor.go | 89 ++++++++-------- go/vt/vtgate/executor_dml_test.go | 12 +-- go/vt/vtgate/executor_framework_test.go | 38 +++++-- go/vt/vtgate/executor_select_test.go | 129 +++++++++--------------- go/vt/vtgate/executor_set_test.go | 7 +- go/vt/vtgate/executor_stream_test.go | 2 +- go/vt/vtgate/executor_test.go | 35 ++----- go/vt/vtgate/plan_execute.go | 2 +- go/vt/vtgate/querylog.go | 4 +- go/vt/vtgate/vtgate.go | 26 ++--- go/vt/vtgate/vtgate_test.go | 3 +- 13 files changed, 159 insertions(+), 197 deletions(-) diff --git a/go/vt/vtexplain/vtexplain_vtgate.go b/go/vt/vtexplain/vtexplain_vtgate.go index a0b84acc8ed..155fe6de743 100644 --- a/go/vt/vtexplain/vtexplain_vtgate.go +++ b/go/vt/vtexplain/vtexplain_vtgate.go @@ -74,7 +74,12 @@ func (vte *VTExplain) initVtgateExecutor(ctx context.Context, ts *topo.Server, v var schemaTracker vtgate.SchemaInfo // no schema tracker for these tests queryLogBufferSize := 10 plans := theine.NewStore[vtgate.PlanCacheKey, *engine.Plan](4*1024*1024, false) - vte.vtgateExecutor = vtgate.NewExecutor(ctx, vte.env, vte.explainTopo, Cell, resolver, opts.Normalize, false, streamSize, plans, schemaTracker, false, opts.PlannerVersion, 0, vtgate.NewDynamicViperConfig(), "") + eConfig := vtgate.ExecutorConfig{ + Normalize: opts.Normalize, + StreamSize: streamSize, + AllowScatter: true, + } + vte.vtgateExecutor = vtgate.NewExecutor(ctx, vte.env, vte.explainTopo, Cell, resolver, eConfig, false, plans, schemaTracker, opts.PlannerVersion, vtgate.NewDynamicViperConfig()) vte.vtgateExecutor.SetQueryLogger(streamlog.New[*logstats.LogStats]("VTGate", queryLogBufferSize)) return nil diff --git a/go/vt/vtgate/bench_test.go b/go/vt/vtgate/bench_test.go index 5c64c7e3473..c2336662c69 100644 --- a/go/vt/vtgate/bench_test.go +++ b/go/vt/vtgate/bench_test.go @@ -80,7 +80,7 @@ func BenchmarkWithNormalizer(b *testing.B) { func BenchmarkWithoutNormalizer(b *testing.B) { vtgateInst, _, ctx := createVtgateEnv(b) - vtgateInst.executor.normalize = false + vtgateInst.executor.config.Normalize = false for i := 0; i < b.N; i++ { _, _, err := vtgateInst.Execute( diff --git a/go/vt/vtgate/executor.go b/go/vt/vtgate/executor.go index f7e09befb4a..a5915270baf 100644 --- a/go/vt/vtgate/executor.go +++ b/go/vt/vtgate/executor.go @@ -107,40 +107,45 @@ func init() { // Executor is the engine that executes queries by utilizing // the abilities of the underlying vttablets. -type Executor struct { - env *vtenv.Environment - serv srvtopo.Server - cell string - resolver *Resolver - scatterConn *ScatterConn - txConn *TxConn +type ( + ExecutorConfig struct { + Normalize bool + StreamSize int + // AllowScatter will fail planning if set to false and a plan contains any scatter queries + AllowScatter bool + WarmingReadsPercent int + QueryLogToFile string + } - mu sync.Mutex - vschema *vindexes.VSchema - streamSize int - vschemaStats *VSchemaStats + Executor struct { + config ExecutorConfig - plans *PlanCache - epoch atomic.Uint32 + env *vtenv.Environment + serv srvtopo.Server + cell string + resolver *Resolver + scatterConn *ScatterConn + txConn *TxConn - normalize bool + mu sync.Mutex + vschema *vindexes.VSchema + vschemaStats *VSchemaStats - vm *VSchemaManager - schemaTracker SchemaInfo + plans *PlanCache + epoch atomic.Uint32 - // allowScatter will fail planning if set to false and a plan contains any scatter queries - allowScatter bool + vm *VSchemaManager + schemaTracker SchemaInfo - // queryLogger is passed in for logging from this vtgate executor. - queryLogger *streamlog.StreamLogger[*logstats.LogStats] + // queryLogger is passed in for logging from this vtgate executor. + queryLogger *streamlog.StreamLogger[*logstats.LogStats] - warmingReadsPercent int - warmingReadsChannel chan bool + warmingReadsChannel chan bool - vConfig econtext.VCursorConfig - ddlConfig dynamicconfig.DDL - queryLogToFile string -} + vConfig econtext.VCursorConfig + ddlConfig dynamicconfig.DDL + } +) var executorOnce sync.Once @@ -164,32 +169,26 @@ func NewExecutor( serv srvtopo.Server, cell string, resolver *Resolver, - normalize, warnOnShardedOnly bool, - streamSize int, + eConfig ExecutorConfig, + warnOnShardedOnly bool, plans *PlanCache, schemaTracker SchemaInfo, - noScatter bool, pv plancontext.PlannerVersion, - warmingReadsPercent int, ddlConfig dynamicconfig.DDL, - queryLogToFile string, ) *Executor { e := &Executor{ - env: env, - serv: serv, - cell: cell, - resolver: resolver, - scatterConn: resolver.scatterConn, - txConn: resolver.scatterConn.txConn, - normalize: normalize, - streamSize: streamSize, + config: eConfig, + env: env, + serv: serv, + cell: cell, + resolver: resolver, + scatterConn: resolver.scatterConn, + txConn: resolver.scatterConn.txConn, + schemaTracker: schemaTracker, - allowScatter: !noScatter, plans: plans, - warmingReadsPercent: warmingReadsPercent, warmingReadsChannel: make(chan bool, warmingReadsConcurrency), ddlConfig: ddlConfig, - queryLogToFile: queryLogToFile, } // setting the vcursor config. e.initVConfig(warnOnShardedOnly, pv) @@ -330,7 +329,7 @@ func (e *Executor) StreamExecute( byteCount += col.Len() } - if byteCount >= e.streamSize { + if byteCount >= e.config.StreamSize { err := callback(result) seenResults.Store(true) result = &sqltypes.Result{} @@ -1422,7 +1421,7 @@ func (e *Executor) initVConfig(warnOnShardedOnly bool, pv plancontext.PlannerVer DBDDLPlugin: dbDDLPlugin, - WarmingReadsPercent: e.warmingReadsPercent, + WarmingReadsPercent: e.config.WarmingReadsPercent, WarmingReadsTimeout: warmingReadsQueryTimeout, WarmingReadsChannel: e.warmingReadsChannel, } @@ -1542,7 +1541,7 @@ func (e *Executor) startVStream(ctx context.Context, rss []*srvtopo.ResolvedShar } func (e *Executor) checkThatPlanIsValid(stmt sqlparser.Statement, plan *engine.Plan) error { - if e.allowScatter || plan.Instructions == nil || sqlparser.AllowScatterDirective(stmt) { + if e.config.AllowScatter || plan.Instructions == nil || sqlparser.AllowScatterDirective(stmt) { return nil } // we go over all the primitives in the plan, searching for a route that is of SelectScatter opcode diff --git a/go/vt/vtgate/executor_dml_test.go b/go/vt/vtgate/executor_dml_test.go index 503b5e5cd8b..2846c763ae9 100644 --- a/go/vt/vtgate/executor_dml_test.go +++ b/go/vt/vtgate/executor_dml_test.go @@ -608,9 +608,8 @@ func TestUpdateComments(t *testing.T) { } func TestUpdateNormalize(t *testing.T) { - executor, sbc1, sbc2, _, ctx := createExecutorEnv(t) + executor, sbc1, sbc2, _, ctx := createExecutorEnvWithConfig(t, createExecutorConfigWithNormalizer()) - executor.normalize = true session := &vtgatepb.Session{ TargetString: "@primary", } @@ -1337,7 +1336,7 @@ func TestInsertSharded(t *testing.T) { testQueryLog(t, executor, logChan, "TestExecute", "INSERT", "insert into user2(id, `name`, lastname) values (2, 'myname', 'mylastname')", 1) // insert with binary values - executor.normalize = true + executor.config.Normalize = true sbc1.Queries = nil sbc2.Queries = nil sbclookup.Queries = nil @@ -1368,8 +1367,7 @@ func TestInsertSharded(t *testing.T) { } func TestInsertNegativeValue(t *testing.T) { - executor, sbc1, sbc2, sbclookup, ctx := createExecutorEnv(t) - executor.normalize = true + executor, sbc1, sbc2, sbclookup, ctx := createExecutorEnvWithConfig(t, createExecutorConfigWithNormalizer()) logChan := executor.queryLogger.Subscribe("Test") defer executor.queryLogger.Unsubscribe(logChan) @@ -2528,9 +2526,7 @@ func TestDeleteEqualWithPrepare(t *testing.T) { } func TestUpdateLastInsertID(t *testing.T) { - executor, sbc1, _, _, ctx := createExecutorEnv(t) - - executor.normalize = true + executor, sbc1, _, _, ctx := createExecutorEnvWithConfig(t, createExecutorConfigWithNormalizer()) sql := "update user set a = last_insert_id() where id = 1" session := &vtgatepb.Session{ diff --git a/go/vt/vtgate/executor_framework_test.go b/go/vt/vtgate/executor_framework_test.go index a126ddab637..099b785b446 100644 --- a/go/vt/vtgate/executor_framework_test.go +++ b/go/vt/vtgate/executor_framework_test.go @@ -65,10 +65,6 @@ var badVSchema = ` } ` -const ( - testBufferSize = 10 -) - type DestinationAnyShardPickerFirstShard struct{} func (dp DestinationAnyShardPickerFirstShard) PickShard(shardCount int) int { @@ -130,7 +126,7 @@ func init() { vindexes.Register("keyrange_lookuper_unique", newKeyRangeLookuperUnique) } -func createExecutorEnvCallback(t testing.TB, eachShard func(shard, ks string, tabletType topodatapb.TabletType, conn *sandboxconn.SandboxConn)) (executor *Executor, ctx context.Context) { +func createExecutorEnvCallback(t testing.TB, eConfig ExecutorConfig, eachShard func(shard, ks string, tabletType topodatapb.TabletType, conn *sandboxconn.SandboxConn)) (executor *Executor, ctx context.Context) { var cancel context.CancelFunc ctx, cancel = context.WithCancel(context.Background()) cell := "aa" @@ -183,7 +179,7 @@ func createExecutorEnvCallback(t testing.TB, eachShard func(shard, ks string, ta // one-off queries from thrashing the cache. Disable the doorkeeper in the tests to prevent flakiness. plans := theine.NewStore[PlanCacheKey, *engine.Plan](queryPlanCacheMemory, false) - executor = NewExecutor(ctx, vtenv.NewTestEnv(), serv, cell, resolver, false, false, testBufferSize, plans, nil, false, querypb.ExecuteOptions_Gen4, 0, NewDynamicViperConfig(), "") + executor = NewExecutor(ctx, vtenv.NewTestEnv(), serv, cell, resolver, eConfig, false, plans, nil, querypb.ExecuteOptions_Gen4, NewDynamicViperConfig()) executor.SetQueryLogger(queryLogger) key.AnyShardPicker = DestinationAnyShardPickerFirstShard{} @@ -198,7 +194,11 @@ func createExecutorEnvCallback(t testing.TB, eachShard func(shard, ks string, ta } func createExecutorEnv(t testing.TB) (executor *Executor, sbc1, sbc2, sbclookup *sandboxconn.SandboxConn, ctx context.Context) { - executor, ctx = createExecutorEnvCallback(t, func(shard, ks string, tabletType topodatapb.TabletType, conn *sandboxconn.SandboxConn) { + return createExecutorEnvWithConfig(t, createExecutorConfig()) +} + +func createExecutorEnvWithConfig(t testing.TB, eConfig ExecutorConfig) (executor *Executor, sbc1, sbc2, sbclookup *sandboxconn.SandboxConn, ctx context.Context) { + executor, ctx = createExecutorEnvCallback(t, eConfig, func(shard, ks string, tabletType topodatapb.TabletType, conn *sandboxconn.SandboxConn) { switch { case ks == KsTestSharded && shard == "-20": sbc1 = conn @@ -210,7 +210,6 @@ func createExecutorEnv(t testing.TB) (executor *Executor, sbc1, sbc2, sbclookup }) return } - func createCustomExecutor(t testing.TB, vschema string, mysqlVersion string) (executor *Executor, sbc1, sbc2, sbclookup *sandboxconn.SandboxConn, ctx context.Context) { var cancel context.CancelFunc ctx, cancel = context.WithCancel(context.Background()) @@ -232,7 +231,7 @@ func createCustomExecutor(t testing.TB, vschema string, mysqlVersion string) (ex plans := DefaultPlanCache() env, err := vtenv.New(vtenv.Options{MySQLServerVersion: mysqlVersion}) require.NoError(t, err) - executor = NewExecutor(ctx, env, serv, cell, resolver, false, false, testBufferSize, plans, nil, false, querypb.ExecuteOptions_Gen4, 0, NewDynamicViperConfig(), "") + executor = NewExecutor(ctx, env, serv, cell, resolver, createExecutorConfig(), false, plans, nil, querypb.ExecuteOptions_Gen4, NewDynamicViperConfig()) executor.SetQueryLogger(queryLogger) t.Cleanup(func() { @@ -244,6 +243,21 @@ func createCustomExecutor(t testing.TB, vschema string, mysqlVersion string) (ex return executor, sbc1, sbc2, sbclookup, ctx } +func createExecutorConfig() ExecutorConfig { + return ExecutorConfig{ + StreamSize: 10, + AllowScatter: true, + } +} + +func createExecutorConfigWithNormalizer() ExecutorConfig { + return ExecutorConfig{ + StreamSize: 10, + AllowScatter: true, + Normalize: true, + } +} + func createCustomExecutorSetValues(t testing.TB, vschema string, values []*sqltypes.Result) (executor *Executor, sbc1, sbc2, sbclookup *sandboxconn.SandboxConn, ctx context.Context) { var cancel context.CancelFunc ctx, cancel = context.WithCancel(context.Background()) @@ -269,7 +283,7 @@ func createCustomExecutorSetValues(t testing.TB, vschema string, values []*sqlty sbclookup = hc.AddTestTablet(cell, "0", 1, KsTestUnsharded, "0", topodatapb.TabletType_PRIMARY, true, 1, nil) queryLogger := streamlog.New[*logstats.LogStats]("VTGate", queryLogBufferSize) plans := DefaultPlanCache() - executor = NewExecutor(ctx, vtenv.NewTestEnv(), serv, cell, resolver, false, false, testBufferSize, plans, nil, false, querypb.ExecuteOptions_Gen4, 0, NewDynamicViperConfig(), "") + executor = NewExecutor(ctx, vtenv.NewTestEnv(), serv, cell, resolver, createExecutorConfig(), false, plans, nil, querypb.ExecuteOptions_Gen4, NewDynamicViperConfig()) executor.SetQueryLogger(queryLogger) t.Cleanup(func() { @@ -294,7 +308,9 @@ func createExecutorEnvWithPrimaryReplicaConn(t testing.TB, ctx context.Context, replica = hc.AddTestTablet(cell, "0-replica", 1, KsTestUnsharded, "0", topodatapb.TabletType_REPLICA, true, 1, nil) queryLogger := streamlog.New[*logstats.LogStats]("VTGate", queryLogBufferSize) - executor = NewExecutor(ctx, vtenv.NewTestEnv(), serv, cell, resolver, false, false, testBufferSize, DefaultPlanCache(), nil, false, querypb.ExecuteOptions_Gen4, warmingReadsPercent, NewDynamicViperConfig(), "") + eConfig := createExecutorConfig() + eConfig.WarmingReadsPercent = warmingReadsPercent + executor = NewExecutor(ctx, vtenv.NewTestEnv(), serv, cell, resolver, eConfig, false, DefaultPlanCache(), nil, querypb.ExecuteOptions_Gen4, NewDynamicViperConfig()) executor.SetQueryLogger(queryLogger) t.Cleanup(func() { diff --git a/go/vt/vtgate/executor_select_test.go b/go/vt/vtgate/executor_select_test.go index bd8efe97acc..3d8261495fe 100644 --- a/go/vt/vtgate/executor_select_test.go +++ b/go/vt/vtgate/executor_select_test.go @@ -166,7 +166,7 @@ func TestSelectDBA(t *testing.T) { func TestSystemVariablesMySQLBelow80(t *testing.T) { executor, sbc1, _, _, _ := createCustomExecutor(t, "{}", "5.7.0") - executor.normalize = true + executor.config.Normalize = true setVarEnabled = true session := econtext.NewAutocommitSession(&vtgatepb.Session{EnableSystemSettings: true, TargetString: "TestExecutor"}) @@ -200,7 +200,7 @@ func TestSystemVariablesMySQLBelow80(t *testing.T) { func TestSystemVariablesWithSetVarDisabled(t *testing.T) { executor, sbc1, _, _, _ := createCustomExecutor(t, "{}", "8.0.0") - executor.normalize = true + executor.config.Normalize = true executor.vConfig.SetVarEnabled = false session := econtext.NewAutocommitSession(&vtgatepb.Session{EnableSystemSettings: true, TargetString: "TestExecutor"}) @@ -233,7 +233,7 @@ func TestSystemVariablesWithSetVarDisabled(t *testing.T) { func TestSetSystemVariablesTx(t *testing.T) { executor, sbc1, _, _, _ := createCustomExecutor(t, "{}", "8.0.1") - executor.normalize = true + executor.config.Normalize = true session := econtext.NewAutocommitSession(&vtgatepb.Session{EnableSystemSettings: true, TargetString: "TestExecutor"}) @@ -278,9 +278,7 @@ func TestSetSystemVariablesTx(t *testing.T) { } func TestSetSystemVariables(t *testing.T) { - executor, _, _, lookup, _ := createExecutorEnv(t) - executor.normalize = true - + executor, _, _, lookup, _ := createExecutorEnvWithConfig(t, createExecutorConfigWithNormalizer()) session := econtext.NewAutocommitSession(&vtgatepb.Session{EnableSystemSettings: true, TargetString: KsTestUnsharded, SystemVariables: map[string]string{}}) // Set @@sql_mode and execute a select statement. We should have SET_VAR in the select statement @@ -389,8 +387,7 @@ func TestSetSystemVariables(t *testing.T) { } func TestSetSystemVariablesWithReservedConnection(t *testing.T) { - executor, sbc1, _, _, _ := createExecutorEnv(t) - executor.normalize = true + executor, sbc1, _, _, _ := createExecutorEnvWithConfig(t, createExecutorConfigWithNormalizer()) session := econtext.NewAutocommitSession(&vtgatepb.Session{EnableSystemSettings: true, SystemVariables: map[string]string{}}) @@ -445,8 +442,7 @@ func TestSelectVindexFunc(t *testing.T) { } func TestCreateTableValidTimestamp(t *testing.T) { - executor, sbc1, _, _, _ := createExecutorEnv(t) - executor.normalize = true + executor, sbc1, _, _, _ := createExecutorEnvWithConfig(t, createExecutorConfigWithNormalizer()) session := econtext.NewSafeSession(&vtgatepb.Session{TargetString: "TestExecutor", SystemVariables: map[string]string{"sql_mode": "ALLOW_INVALID_DATES"}}) @@ -464,8 +460,7 @@ func TestCreateTableValidTimestamp(t *testing.T) { } func TestGen4SelectDBA(t *testing.T) { - executor, sbc1, _, _, _ := createExecutorEnv(t) - executor.normalize = true + executor, sbc1, _, _, _ := createExecutorEnvWithConfig(t, createExecutorConfigWithNormalizer()) query := "select * from INFORMATION_SCHEMA.TABLE_CONSTRAINTS" _, err := executor.Execute(context.Background(), nil, "TestSelectDBA", @@ -699,18 +694,19 @@ func TestStreamLimitOffset(t *testing.T) { }}, } - executor, _ := createExecutorEnvCallback(t, func(shard, ks string, tabletType topodatapb.TabletType, conn *sandboxconn.SandboxConn) { - if ks == KsTestSharded { - conn.SetResults([]*sqltypes.Result{{ - Fields: []*querypb.Field{ - {Name: "id", Type: sqltypes.Int32, Charset: collations.CollationBinaryID, Flags: uint32(querypb.MySqlFlag_NUM_FLAG)}, - {Name: "textcol", Type: sqltypes.VarChar, Charset: uint32(collations.MySQL8().DefaultConnectionCharset())}, - {Name: "weight_string(id)", Type: sqltypes.VarBinary, Charset: collations.CollationBinaryID, Flags: uint32(querypb.MySqlFlag_BINARY_FLAG)}, - }, - Rows: returnRows[shard], - }}) - } - }) + executor, _ := createExecutorEnvCallback(t, createExecutorConfig(), + func(shard, ks string, tabletType topodatapb.TabletType, conn *sandboxconn.SandboxConn) { + if ks == KsTestSharded { + conn.SetResults([]*sqltypes.Result{{ + Fields: []*querypb.Field{ + {Name: "id", Type: sqltypes.Int32, Charset: collations.CollationBinaryID, Flags: uint32(querypb.MySqlFlag_NUM_FLAG)}, + {Name: "textcol", Type: sqltypes.VarChar, Charset: uint32(collations.MySQL8().DefaultConnectionCharset())}, + {Name: "weight_string(id)", Type: sqltypes.VarBinary, Charset: collations.CollationBinaryID, Flags: uint32(querypb.MySqlFlag_BINARY_FLAG)}, + }, + Rows: returnRows[shard], + }}) + } + }) results := make(chan *sqltypes.Result, 10) session := &vtgatepb.Session{ @@ -756,12 +752,11 @@ func TestStreamLimitOffset(t *testing.T) { } func TestSelectLastInsertId(t *testing.T) { - executor, _, _, _, ctx := createExecutorEnv(t) + executor, _, _, _, ctx := createExecutorEnvWithConfig(t, createExecutorConfigWithNormalizer()) session := &vtgatepb.Session{ TargetString: "@primary", LastInsertId: 52, } - executor.normalize = true logChan := executor.queryLogger.Subscribe("Test") defer executor.queryLogger.Unsubscribe(logChan) @@ -780,7 +775,7 @@ func TestSelectLastInsertId(t *testing.T) { } func TestSelectSystemVariables(t *testing.T) { - executor, _, _, _, ctx := createExecutorEnv(t) + executor, _, _, _, ctx := createExecutorEnvWithConfig(t, createExecutorConfigWithNormalizer()) session := &vtgatepb.Session{ TargetString: "@primary", @@ -790,7 +785,6 @@ func TestSelectSystemVariables(t *testing.T) { SessionTrackGtids: true, }, } - executor.normalize = true logChan := executor.queryLogger.Subscribe("Test") defer executor.queryLogger.Unsubscribe(logChan) @@ -840,8 +834,7 @@ func TestSelectSystemVariables(t *testing.T) { } func TestSelectInitializedVitessAwareVariable(t *testing.T) { - executor, _, _, _, ctx := createExecutorEnv(t) - executor.normalize = true + executor, _, _, _, ctx := createExecutorEnvWithConfig(t, createExecutorConfigWithNormalizer()) logChan := executor.queryLogger.Subscribe("Test") defer executor.queryLogger.Unsubscribe(logChan) @@ -872,8 +865,7 @@ func TestSelectInitializedVitessAwareVariable(t *testing.T) { } func TestSelectUserDefinedVariable(t *testing.T) { - executor, _, _, _, ctx := createExecutorEnv(t) - executor.normalize = true + executor, _, _, _, ctx := createExecutorEnvWithConfig(t, createExecutorConfigWithNormalizer()) logChan := executor.queryLogger.Subscribe("Test") defer executor.queryLogger.Unsubscribe(logChan) @@ -908,8 +900,7 @@ func TestSelectUserDefinedVariable(t *testing.T) { } func TestFoundRows(t *testing.T) { - executor, _, _, _, ctx := createExecutorEnv(t) - executor.normalize = true + executor, _, _, _, ctx := createExecutorEnvWithConfig(t, createExecutorConfigWithNormalizer()) logChan := executor.queryLogger.Subscribe("Test") defer executor.queryLogger.Unsubscribe(logChan) @@ -935,8 +926,7 @@ func TestFoundRows(t *testing.T) { } func TestRowCount(t *testing.T) { - executor, _, _, _, ctx := createExecutorEnv(t) - executor.normalize = true + executor, _, _, _, ctx := createExecutorEnvWithConfig(t, createExecutorConfigWithNormalizer()) logChan := executor.queryLogger.Subscribe("Test") defer executor.queryLogger.Unsubscribe(logChan) @@ -968,8 +958,7 @@ func testRowCount(t *testing.T, ctx context.Context, executor *Executor, session } func TestSelectLastInsertIdInUnion(t *testing.T) { - executor, sbc1, _, _, ctx := createExecutorEnv(t) - executor.normalize = true + executor, sbc1, _, _, ctx := createExecutorEnvWithConfig(t, createExecutorConfigWithNormalizer()) session := &vtgatepb.Session{ TargetString: "@primary", @@ -1002,8 +991,7 @@ func TestSelectLastInsertIdInUnion(t *testing.T) { } func TestSelectLastInsertIdInWhere(t *testing.T) { - executor, _, _, lookup, ctx := createExecutorEnv(t) - executor.normalize = true + executor, _, _, lookup, ctx := createExecutorEnvWithConfig(t, createExecutorConfigWithNormalizer()) logChan := executor.queryLogger.Subscribe("Test") defer executor.queryLogger.Unsubscribe(logChan) @@ -1022,8 +1010,7 @@ func TestSelectLastInsertIdInWhere(t *testing.T) { } func TestLastInsertIDInVirtualTable(t *testing.T) { - executor, sbc1, _, _, ctx := createExecutorEnv(t) - executor.normalize = true + executor, sbc1, _, _, ctx := createExecutorEnvWithConfig(t, createExecutorConfigWithNormalizer()) result1 := []*sqltypes.Result{{ Fields: []*querypb.Field{ {Name: "id", Type: sqltypes.Int32, Charset: collations.CollationBinaryID, Flags: uint32(querypb.MySqlFlag_NUM_FLAG)}, @@ -1050,8 +1037,7 @@ func TestLastInsertIDInVirtualTable(t *testing.T) { } func TestLastInsertIDInSubQueryExpression(t *testing.T) { - executor, sbc1, sbc2, _, ctx := createExecutorEnv(t) - executor.normalize = true + executor, sbc1, sbc2, _, ctx := createExecutorEnvWithConfig(t, createExecutorConfigWithNormalizer()) session := &vtgatepb.Session{ TargetString: "@primary", LastInsertId: 12345, @@ -1074,8 +1060,7 @@ func TestLastInsertIDInSubQueryExpression(t *testing.T) { } func TestSelectDatabase(t *testing.T) { - executor, _, _, _, _ := createExecutorEnv(t) - executor.normalize = true + executor, _, _, _, _ := createExecutorEnvWithConfig(t, createExecutorConfigWithNormalizer()) sql := "select database()" newSession := &vtgatepb.Session{ TargetString: "@primary", @@ -1334,8 +1319,7 @@ func TestSelectComments(t *testing.T) { } func TestSelectNormalize(t *testing.T) { - executor, sbc1, sbc2, _, ctx := createExecutorEnv(t) - executor.normalize = true + executor, sbc1, sbc2, _, ctx := createExecutorEnvWithConfig(t, createExecutorConfigWithNormalizer()) session := &vtgatepb.Session{ TargetString: "@primary", @@ -1644,7 +1628,7 @@ func TestSelectListArg(t *testing.T) { func createExecutor(ctx context.Context, serv *sandboxTopo, cell string, resolver *Resolver) *Executor { queryLogger := streamlog.New[*logstats.LogStats]("VTGate", queryLogBufferSize) plans := DefaultPlanCache() - ex := NewExecutor(ctx, vtenv.NewTestEnv(), serv, cell, resolver, false, false, testBufferSize, plans, nil, false, querypb.ExecuteOptions_Gen4, 0, NewDynamicViperConfig(), "") + ex := NewExecutor(ctx, vtenv.NewTestEnv(), serv, cell, resolver, createExecutorConfig(), false, plans, nil, querypb.ExecuteOptions_Gen4, NewDynamicViperConfig()) ex.SetQueryLogger(queryLogger) return ex } @@ -3123,8 +3107,7 @@ func TestSelectBindvarswithPrepare(t *testing.T) { } func TestSelectDatabasePrepare(t *testing.T) { - executor, _, _, _, ctx := createExecutorEnv(t) - executor.normalize = true + executor, _, _, _, ctx := createExecutorEnvWithConfig(t, createExecutorConfigWithNormalizer()) logChan := executor.queryLogger.Subscribe("Test") defer executor.queryLogger.Unsubscribe(logChan) @@ -3137,8 +3120,7 @@ func TestSelectDatabasePrepare(t *testing.T) { } func TestSelectWithUnionAll(t *testing.T) { - executor, sbc1, sbc2, _, ctx := createExecutorEnv(t) - executor.normalize = true + executor, sbc1, sbc2, _, ctx := createExecutorEnvWithConfig(t, createExecutorConfigWithNormalizer()) sql := "select id from user where id in (1, 2, 3) union all select id from user where id in (1, 2, 3)" bv, _ := sqltypes.BuildBindVariable([]int64{1, 2, 3}) bv1, _ := sqltypes.BuildBindVariable([]int64{1, 2}) @@ -3326,7 +3308,7 @@ func TestStreamOrderByWithMultipleResults(t *testing.T) { } queryLogger := streamlog.New[*logstats.LogStats]("VTGate", queryLogBufferSize) plans := DefaultPlanCache() - executor := NewExecutor(ctx, vtenv.NewTestEnv(), serv, cell, resolver, true, false, testBufferSize, plans, nil, false, querypb.ExecuteOptions_Gen4, 0, NewDynamicViperConfig(), "") + executor := NewExecutor(ctx, vtenv.NewTestEnv(), serv, cell, resolver, createExecutorConfigWithNormalizer(), false, plans, nil, querypb.ExecuteOptions_Gen4, NewDynamicViperConfig()) executor.SetQueryLogger(queryLogger) defer executor.Close() // some sleep for all goroutines to start @@ -3369,7 +3351,7 @@ func TestStreamOrderByLimitWithMultipleResults(t *testing.T) { } queryLogger := streamlog.New[*logstats.LogStats]("VTGate", queryLogBufferSize) plans := DefaultPlanCache() - executor := NewExecutor(ctx, vtenv.NewTestEnv(), serv, cell, resolver, true, false, testBufferSize, plans, nil, false, querypb.ExecuteOptions_Gen4, 0, NewDynamicViperConfig(), "") + executor := NewExecutor(ctx, vtenv.NewTestEnv(), serv, cell, resolver, createExecutorConfigWithNormalizer(), false, plans, nil, querypb.ExecuteOptions_Gen4, NewDynamicViperConfig()) executor.SetQueryLogger(queryLogger) defer executor.Close() // some sleep for all goroutines to start @@ -3420,7 +3402,7 @@ func TestSelectScatterFails(t *testing.T) { executor := createExecutor(ctx, serv, cell, resolver) defer executor.Close() - executor.allowScatter = false + executor.config.AllowScatter = false logChan := executor.queryLogger.Subscribe("Test") defer executor.queryLogger.Unsubscribe(logChan) @@ -3447,8 +3429,7 @@ func TestSelectScatterFails(t *testing.T) { } func TestGen4SelectStraightJoin(t *testing.T) { - executor, sbc1, _, _, _ := createExecutorEnv(t) - executor.normalize = true + executor, sbc1, _, _, _ := createExecutorEnvWithConfig(t, createExecutorConfigWithNormalizer()) session := econtext.NewSafeSession(&vtgatepb.Session{TargetString: "TestExecutor"}) query := "select u.id from user u straight_join user2 u2 on u.id = u2.id" @@ -3469,8 +3450,7 @@ func TestGen4SelectStraightJoin(t *testing.T) { } func TestGen4MultiColumnVindexEqual(t *testing.T) { - executor, sbc1, sbc2, _, _ := createExecutorEnv(t) - executor.normalize = true + executor, sbc1, sbc2, _, _ := createExecutorEnvWithConfig(t, createExecutorConfigWithNormalizer()) session := econtext.NewSafeSession(&vtgatepb.Session{TargetString: "TestExecutor"}) query := "select * from user_region where cola = 1 and colb = 2" @@ -3507,8 +3487,7 @@ func TestGen4MultiColumnVindexEqual(t *testing.T) { } func TestGen4MultiColumnVindexIn(t *testing.T) { - executor, sbc1, sbc2, _, _ := createExecutorEnv(t) - executor.normalize = true + executor, sbc1, sbc2, _, _ := createExecutorEnvWithConfig(t, createExecutorConfigWithNormalizer()) session := econtext.NewSafeSession(&vtgatepb.Session{TargetString: "TestExecutor"}) query := "select * from user_region where cola IN (1,17984) and colb IN (2,3,4)" @@ -3545,8 +3524,7 @@ func TestGen4MultiColumnVindexIn(t *testing.T) { } func TestGen4MultiColMixedColComparision(t *testing.T) { - executor, sbc1, sbc2, _, _ := createExecutorEnv(t) - executor.normalize = true + executor, sbc1, sbc2, _, _ := createExecutorEnvWithConfig(t, createExecutorConfigWithNormalizer()) session := econtext.NewSafeSession(&vtgatepb.Session{TargetString: "TestExecutor"}) query := "select * from user_region where colb = 2 and cola IN (1,17984)" @@ -3581,8 +3559,7 @@ func TestGen4MultiColMixedColComparision(t *testing.T) { } func TestGen4MultiColBestVindexSel(t *testing.T) { - executor, sbc1, sbc2, _, _ := createExecutorEnv(t) - executor.normalize = true + executor, sbc1, sbc2, _, _ := createExecutorEnvWithConfig(t, createExecutorConfigWithNormalizer()) session := econtext.NewSafeSession(&vtgatepb.Session{TargetString: "TestExecutor"}) query := "select * from user_region where colb = 2 and cola IN (1,17984) and cola = 1" @@ -3626,8 +3603,7 @@ func TestGen4MultiColBestVindexSel(t *testing.T) { } func TestGen4MultiColMultiEqual(t *testing.T) { - executor, sbc1, sbc2, _, _ := createExecutorEnv(t) - executor.normalize = true + executor, sbc1, sbc2, _, _ := createExecutorEnvWithConfig(t, createExecutorConfigWithNormalizer()) session := econtext.NewSafeSession(&vtgatepb.Session{TargetString: "TestExecutor"}) query := "select * from user_region where (cola,colb) in ((17984,2),(17984,3))" @@ -4229,8 +4205,7 @@ func TestSelectAggregationRandom(t *testing.T) { } func TestSelectDateTypes(t *testing.T) { - executor, _, _, _, _ := createExecutorEnv(t) - executor.normalize = true + executor, _, _, _, _ := createExecutorEnvWithConfig(t, createExecutorConfigWithNormalizer()) session := econtext.NewAutocommitSession(&vtgatepb.Session{}) qr, err := executor.Execute(context.Background(), nil, "TestSelectDateTypes", session, "select '2020-01-01' + interval month(date_sub(FROM_UNIXTIME(1234), interval 1 month))-1 month", nil) @@ -4240,8 +4215,7 @@ func TestSelectDateTypes(t *testing.T) { } func TestSelectHexAndBit(t *testing.T) { - executor, _, _, _, _ := createExecutorEnv(t) - executor.normalize = true + executor, _, _, _, _ := createExecutorEnvWithConfig(t, createExecutorConfigWithNormalizer()) session := econtext.NewAutocommitSession(&vtgatepb.Session{}) qr, err := executor.Execute(context.Background(), nil, "TestSelectHexAndBit", session, "select 0b1001, b'1001', 0x9, x'09'", nil) @@ -4256,8 +4230,7 @@ func TestSelectHexAndBit(t *testing.T) { // TestSelectCFC tests validates that cfc vindex plan gets cached and same plan is getting reused. // This also validates that cache_size is able to calculate the cfc vindex plan size. func TestSelectCFC(t *testing.T) { - executor, _, _, _, _ := createExecutorEnv(t) - executor.normalize = true + executor, _, _, _, _ := createExecutorEnvWithConfig(t, createExecutorConfigWithNormalizer()) session := econtext.NewAutocommitSession(&vtgatepb.Session{}) _, err := executor.Execute(context.Background(), nil, "TestSelectCFC", session, "select /*vt+ PLANNER=gen4 */ c2 from tbl_cfc where c1 like 'A%'", nil) @@ -4281,12 +4254,11 @@ func TestSelectCFC(t *testing.T) { } func TestSelectView(t *testing.T) { - executor, sbc, _, _, _ := createExecutorEnv(t) + executor, sbc, _, _, _ := createExecutorEnvWithConfig(t, createExecutorConfigWithNormalizer()) // add the view to local vschema err := executor.vschema.AddView(KsTestSharded, "user_details_view", "select user.id, user_extra.col from user join user_extra on user.id = user_extra.user_id", executor.vm.parser) require.NoError(t, err) - executor.normalize = true session := econtext.NewAutocommitSession(&vtgatepb.Session{}) _, err = executor.Execute(context.Background(), nil, "TestSelectView", session, "select * from user_details_view", nil) @@ -4327,7 +4299,7 @@ func TestWarmingReads(t *testing.T) { ctx := utils.LeakCheckContext(t) executor, primary, replica := createExecutorEnvWithPrimaryReplicaConn(t, ctx, 100) - executor.normalize = true + executor.config.Normalize = true session := econtext.NewSafeSession(&vtgatepb.Session{TargetString: KsTestUnsharded}) // Since queries on the replica will run in a separate go-routine, we need synchronization for the Queries field in the sandboxconn. replica.RequireQueriesLocking() @@ -4451,8 +4423,7 @@ func TestStreamJoinQuery(t *testing.T) { // It also tests that setting a global variable does not affect the session variable and vice versa. // Also, test what happens on running select @@global and select @@session for a system variable. func TestSysVarGlobalAndSession(t *testing.T) { - executor, sbc1, _, _, _ := createExecutorEnv(t) - executor.normalize = true + executor, sbc1, _, _, _ := createExecutorEnvWithConfig(t, createExecutorConfigWithNormalizer()) session := econtext.NewAutocommitSession(&vtgatepb.Session{EnableSystemSettings: true, SystemVariables: map[string]string{}}) sbc1.SetResults([]*sqltypes.Result{ diff --git a/go/vt/vtgate/executor_set_test.go b/go/vt/vtgate/executor_set_test.go index f8ed0b558c3..310d885a134 100644 --- a/go/vt/vtgate/executor_set_test.go +++ b/go/vt/vtgate/executor_set_test.go @@ -514,7 +514,7 @@ func createMap(keys []string, values []any) map[string]*querypb.BindVariable { func TestSetVar(t *testing.T) { executor, _, _, sbc, ctx := createCustomExecutor(t, "{}", "8.0.0") - executor.normalize = true + executor.config.Normalize = true session := econtext.NewAutocommitSession(&vtgatepb.Session{EnableSystemSettings: true, TargetString: KsTestUnsharded}) @@ -553,7 +553,7 @@ func TestSetVar(t *testing.T) { func TestSetVarShowVariables(t *testing.T) { executor, _, _, sbc, ctx := createCustomExecutor(t, "{}", "8.0.0") - executor.normalize = true + executor.config.Normalize = true session := econtext.NewAutocommitSession(&vtgatepb.Session{EnableSystemSettings: true, TargetString: KsTestUnsharded}) @@ -576,8 +576,7 @@ func TestSetVarShowVariables(t *testing.T) { } func TestExecutorSetAndSelect(t *testing.T) { - e, _, _, sbc, ctx := createExecutorEnv(t) - e.normalize = true + e, _, _, sbc, ctx := createExecutorEnvWithConfig(t, createExecutorConfigWithNormalizer()) testcases := []struct { sysVar string diff --git a/go/vt/vtgate/executor_stream_test.go b/go/vt/vtgate/executor_stream_test.go index e44aeb567f9..796e0c0466b 100644 --- a/go/vt/vtgate/executor_stream_test.go +++ b/go/vt/vtgate/executor_stream_test.go @@ -68,7 +68,7 @@ func TestStreamSQLSharded(t *testing.T) { queryLogger := streamlog.New[*logstats.LogStats]("VTGate", queryLogBufferSize) plans := DefaultPlanCache() - executor := NewExecutor(ctx, vtenv.NewTestEnv(), serv, cell, resolver, false, false, testBufferSize, plans, nil, false, querypb.ExecuteOptions_Gen4, 0, NewDynamicViperConfig(), "") + executor := NewExecutor(ctx, vtenv.NewTestEnv(), serv, cell, resolver, createExecutorConfig(), false, plans, nil, querypb.ExecuteOptions_Gen4, NewDynamicViperConfig()) executor.SetQueryLogger(queryLogger) defer executor.Close() diff --git a/go/vt/vtgate/executor_test.go b/go/vt/vtgate/executor_test.go index 8b06870e7e1..071e16dbe2c 100644 --- a/go/vt/vtgate/executor_test.go +++ b/go/vt/vtgate/executor_test.go @@ -196,9 +196,7 @@ func TestExecutorTransactionsNoAutoCommit(t *testing.T) { } func TestDirectTargetRewrites(t *testing.T) { - executor, _, _, sbclookup, ctx := createExecutorEnv(t) - - executor.normalize = true + executor, _, _, sbclookup, ctx := createExecutorEnvWithConfig(t, createExecutorConfigWithNormalizer()) session := &vtgatepb.Session{ TargetString: "TestUnsharded/0@primary", @@ -1631,7 +1629,7 @@ func getPlanCached(t *testing.T, ctx context.Context, e *Executor, vcursor *econ stmt, reservedVars, err := parseAndValidateQuery(sql, sqlparser.NewTestParser()) require.NoError(t, err) - plan, err := e.getPlan(context.Background(), vcursor, sql, stmt, comments, bindVars, reservedVars /* normalize */, e.normalize, logStats) + plan, err := e.getPlan(context.Background(), vcursor, sql, stmt, comments, bindVars, reservedVars /* normalize */, e.config.Normalize, logStats) require.NoError(t, err) // Wait for cache to settle @@ -1691,8 +1689,7 @@ func TestGetPlanCacheUnnormalized(t *testing.T) { func TestGetPlanCacheNormalized(t *testing.T) { t.Run("Cache", func(t *testing.T) { - r, _, _, _, ctx := createExecutorEnv(t) - r.normalize = true + r, _, _, _, ctx := createExecutorEnvWithConfig(t, createExecutorConfigWithNormalizer()) emptyvc, _ := econtext.NewVCursorImpl(econtext.NewSafeSession(&vtgatepb.Session{TargetString: "@unknown"}), makeComments(""), r, nil, r.vm, r.VSchema(), r.resolver.resolver, nil, nullResultsObserver{}, r.vConfig) query1 := "select * from music_user_map where id = 1" @@ -1708,8 +1705,7 @@ func TestGetPlanCacheNormalized(t *testing.T) { t.Run("Skip Cache", func(t *testing.T) { // Skip cache using directive - r, _, _, _, ctx := createExecutorEnv(t) - r.normalize = true + r, _, _, _, ctx := createExecutorEnvWithConfig(t, createExecutorConfigWithNormalizer()) unshardedvc, _ := econtext.NewVCursorImpl(econtext.NewSafeSession(&vtgatepb.Session{TargetString: KsTestUnsharded + "@unknown"}), makeComments(""), r, nil, r.vm, r.VSchema(), r.resolver.resolver, nil, nullResultsObserver{}, r.vConfig) query1 := "insert /*vt+ SKIP_QUERY_PLAN_CACHE=1 */ into user(id) values (1), (2)" @@ -1733,9 +1729,8 @@ func TestGetPlanCacheNormalized(t *testing.T) { } func TestGetPlanNormalized(t *testing.T) { - r, _, _, _, ctx := createExecutorEnv(t) + r, _, _, _, ctx := createExecutorEnvWithConfig(t, createExecutorConfigWithNormalizer()) - r.normalize = true emptyvc, _ := econtext.NewVCursorImpl(econtext.NewSafeSession(&vtgatepb.Session{TargetString: "@unknown"}), makeComments(""), r, nil, r.vm, r.VSchema(), r.resolver.resolver, nil, nullResultsObserver{}, econtext.VCursorConfig{}) unshardedvc, _ := econtext.NewVCursorImpl(econtext.NewSafeSession(&vtgatepb.Session{TargetString: KsTestUnsharded + "@unknown"}), makeComments(""), r, nil, r.vm, r.VSchema(), r.resolver.resolver, nil, nullResultsObserver{}, econtext.VCursorConfig{}) @@ -1790,9 +1785,8 @@ func TestGetPlanPriority(t *testing.T) { testCase := aTestCase t.Run(testCase.name, func(t *testing.T) { - r, _, _, _, ctx := createExecutorEnv(t) + r, _, _, _, ctx := createExecutorEnvWithConfig(t, createExecutorConfigWithNormalizer()) - r.normalize = true logStats := logstats.NewLogStats(ctx, "Test", "", "", nil) vCursor, err := econtext.NewVCursorImpl(session, makeComments(""), r, nil, r.vm, r.VSchema(), r.resolver.resolver, nil, nullResultsObserver{}, econtext.VCursorConfig{}) assert.NoError(t, err) @@ -1816,7 +1810,7 @@ func TestGetPlanPriority(t *testing.T) { } func TestPassthroughDDL(t *testing.T) { - executor, sbc1, sbc2, _, ctx := createExecutorEnv(t) + executor, sbc1, sbc2, _, ctx := createExecutorEnvWithConfig(t, createExecutorConfigWithNormalizer()) session := &vtgatepb.Session{ TargetString: "TestExecutor", } @@ -1839,7 +1833,6 @@ func TestPassthroughDDL(t *testing.T) { // Force the query to go to only one shard. Normalization doesn't make any difference. session.TargetString = "TestExecutor/40-60" - executor.normalize = true _, err = executorExec(ctx, executor, session, alterDDL, nil) require.NoError(t, err) @@ -1851,7 +1844,6 @@ func TestPassthroughDDL(t *testing.T) { // Use range query session.TargetString = "TestExecutor[-]" - executor.normalize = true _, err = executorExec(ctx, executor, session, alterDDL, nil) require.NoError(t, err) @@ -2001,10 +1993,7 @@ func TestExecutorMaxPayloadSizeExceeded(t *testing.T) { } func TestOlapSelectDatabase(t *testing.T) { - executor, _, _, _, _ := createExecutorEnv(t) - - executor.normalize = true - + executor, _, _, _, _ := createExecutorEnvWithConfig(t, createExecutorConfigWithNormalizer()) session := &vtgatepb.Session{Autocommit: true} sql := `select database()` @@ -2282,9 +2271,7 @@ func TestExecutorExplainStmt(t *testing.T) { } func TestExecutorVExplain(t *testing.T) { - executor, _, _, _, ctx := createExecutorEnv(t) - - executor.normalize = true + executor, _, _, _, ctx := createExecutorEnvWithConfig(t, createExecutorConfigWithNormalizer()) logChan := executor.queryLogger.Subscribe("Test") defer executor.queryLogger.Unsubscribe(logChan) @@ -2754,9 +2741,7 @@ func TestExecutorStartTxnStmt(t *testing.T) { } func TestExecutorPrepareExecute(t *testing.T) { - executor, _, _, _, _ := createExecutorEnv(t) - - executor.normalize = true + executor, _, _, _, _ := createExecutorEnvWithConfig(t, createExecutorConfigWithNormalizer()) session := econtext.NewAutocommitSession(&vtgatepb.Session{}) // prepare statement. diff --git a/go/vt/vtgate/plan_execute.go b/go/vt/vtgate/plan_execute.go index db7923c09f0..75a7af6bf2a 100644 --- a/go/vt/vtgate/plan_execute.go +++ b/go/vt/vtgate/plan_execute.go @@ -131,7 +131,7 @@ func (e *Executor) newExecute( // the vtgate to clear the cached plans when processing the new serving vschema. // When buffering ends, many queries might be getting planned at the same time and we then // take full advatange of the cached plan. - plan, err = e.getPlan(ctx, vcursor, query, stmt, comments, bindVars, reservedVars, e.normalize, logStats) + plan, err = e.getPlan(ctx, vcursor, query, stmt, comments, bindVars, reservedVars, e.config.Normalize, logStats) execStart := e.logPlanningFinished(logStats, plan) if err != nil { diff --git a/go/vt/vtgate/querylog.go b/go/vt/vtgate/querylog.go index 574e84f32de..686ff419764 100644 --- a/go/vt/vtgate/querylog.go +++ b/go/vt/vtgate/querylog.go @@ -49,8 +49,8 @@ func (e *Executor) defaultQueryLogger() error { queryzHandler(e, w, r) }) - if e.queryLogToFile != "" { - _, err := queryLogger.LogToFile(e.queryLogToFile, streamlog.GetFormatter(queryLogger)) + if e.config.QueryLogToFile != "" { + _, err := queryLogger.LogToFile(e.config.QueryLogToFile, streamlog.GetFormatter(queryLogger)) if err != nil { return err } diff --git a/go/vt/vtgate/vtgate.go b/go/vt/vtgate/vtgate.go index 032c10411ef..b8c0e7db057 100644 --- a/go/vt/vtgate/vtgate.go +++ b/go/vt/vtgate/vtgate.go @@ -352,23 +352,15 @@ func Init( plans := DefaultPlanCache() - executor := NewExecutor( - ctx, - env, - serv, - cell, - resolver, - normalizeQueries, - warnShardedOnly, - streamBufferSize, - plans, - si, - noScatter, - pv, - warmingReadsPercent, - dynamicConfig, - queryLogToFile, - ) + eConfig := ExecutorConfig{ + Normalize: normalizeQueries, + StreamSize: streamBufferSize, + AllowScatter: !noScatter, + WarmingReadsPercent: warmingReadsPercent, + QueryLogToFile: queryLogToFile, + } + + executor := NewExecutor(ctx, env, serv, cell, resolver, eConfig, warnShardedOnly, plans, si, pv, dynamicConfig) if err := executor.defaultQueryLogger(); err != nil { log.Fatalf("error initializing query logger: %v", err) diff --git a/go/vt/vtgate/vtgate_test.go b/go/vt/vtgate/vtgate_test.go index c113ed16308..f05f63474d0 100644 --- a/go/vt/vtgate/vtgate_test.go +++ b/go/vt/vtgate/vtgate_test.go @@ -715,8 +715,7 @@ func createVtgateEnv(t testing.TB) (*VTGate, *sandboxconn.SandboxConn, context.C cell := "aa" sb := createSandbox(KsTestSharded) sb.ShardSpec = "-" - executor, _, _, sbc, ctx := createExecutorEnv(t) - executor.normalize = normalizeQueries + executor, _, _, sbc, ctx := createExecutorEnvWithConfig(t, createExecutorConfigWithNormalizer()) vsm := newVStreamManager(executor.resolver.resolver, executor.serv, cell) vtg := newVTGate(executor, executor.resolver, vsm, nil, executor.scatterConn.gateway)