From 462c9004a55c99774d8eb69494485f8cae3ceb56 Mon Sep 17 00:00:00 2001 From: Liao Lanyu <1435078631@qq.com> Date: Fri, 24 May 2024 08:34:40 +0800 Subject: [PATCH] Introducing a memory control mechanism during the query planning stage --- .../queryengine/common/MPPQueryContext.java | 60 +++++++ .../exception/MemoryNotEnoughException.java | 7 +- .../execution/MemoryEstimationHelper.java | 6 +- .../db/queryengine/plan/Coordinator.java | 3 + .../plan/analyze/AnalyzeVisitor.java | 154 ++++++++++++------ .../plan/analyze/ConcatPathRewriter.java | 33 ++-- .../plan/analyze/ExpressionAnalyzer.java | 42 +++-- .../plan/analyze/ExpressionUtils.java | 66 +++++--- .../plan/analyze/TemplatedAnalyze.java | 12 +- .../plan/execution/QueryExecution.java | 3 + .../plan/expression/Expression.java | 3 +- .../expression/binary/BinaryExpression.java | 12 ++ .../plan/expression/leaf/ConstantOperand.java | 9 + .../plan/expression/leaf/NullOperand.java | 10 ++ .../expression/leaf/TimeSeriesOperand.java | 10 ++ .../expression/leaf/TimestampOperand.java | 10 ++ .../expression/multi/FunctionExpression.java | 26 +++ .../other/CaseWhenThenExpression.java | 16 ++ .../other/GroupByTimeExpression.java | 9 + .../expression/ternary/BetweenExpression.java | 14 ++ .../plan/expression/unary/InExpression.java | 12 +- .../expression/unary/IsNullExpression.java | 10 ++ .../plan/expression/unary/LikeExpression.java | 12 ++ .../expression/unary/LogicNotExpression.java | 11 ++ .../expression/unary/NegationExpression.java | 11 ++ .../expression/unary/RegularExpression.java | 12 ++ .../BindSchemaForExpressionVisitor.java | 50 ++++-- .../BindSchemaForPredicateVisitor.java | 43 +++-- .../cartesian/CartesianProductVisitor.java | 23 ++- ...viceAndBindSchemaForExpressionVisitor.java | 27 ++- ...eviceAndBindSchemaForPredicateVisitor.java | 30 +++- ...oncatExpressionWithSuffixPathsVisitor.java | 28 +++- .../cartesian/QueryContextProvider.java} | 19 +-- .../optimization/AggregationPushDown.java | 91 +++++++++-- .../plan/planner/LocalExecutionPlanner.java | 22 ++- .../plan/planner/LogicalPlanBuilder.java | 76 +++++---- .../distribution/DistributionPlanContext.java | 6 + .../planner/distribution/SourceRewriter.java | 9 + .../node/source/AlignedLastQueryScanNode.java | 13 ++ .../AlignedSeriesAggregationScanNode.java | 11 ++ .../node/source/AlignedSeriesScanNode.java | 12 ++ .../plan/node/source/LastQueryScanNode.java | 13 ++ .../source/SeriesAggregationScanNode.java | 12 ++ .../plan/node/source/SeriesScanNode.java | 12 ++ .../plan/node/source/SeriesSourceNode.java | 4 +- .../iotdb/db/utils/ErrorHandlingUtils.java | 3 + .../plan/analyze/ExpressionAnalyzerTest.java | 8 +- 47 files changed, 872 insertions(+), 213 deletions(-) rename iotdb-core/datanode/src/{test/java/org/apache/iotdb/db/queryengine/exception/MemoryNotEnoughExceptionTest.java => main/java/org/apache/iotdb/db/queryengine/plan/expression/visitor/cartesian/QueryContextProvider.java} (61%) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/MPPQueryContext.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/MPPQueryContext.java index 1cc2f8663dad..3a42ee805b0d 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/MPPQueryContext.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/MPPQueryContext.java @@ -26,6 +26,7 @@ import org.apache.iotdb.db.queryengine.plan.analyze.QueryType; import org.apache.iotdb.db.queryengine.plan.analyze.TypeProvider; import org.apache.iotdb.db.queryengine.plan.analyze.lock.SchemaLockType; +import org.apache.iotdb.db.queryengine.plan.planner.LocalExecutionPlanner; import org.apache.iotdb.db.queryengine.statistics.QueryPlanStatistics; import org.apache.tsfile.read.filter.basic.Filter; @@ -75,6 +76,19 @@ public class MPPQueryContext { QueryPlanStatistics queryPlanStatistics = null; + // To avoid query front-end from consuming too much memory, it needs to reserve memory when + // constructing some Expression and PlanNode. + private long reservedBytesInTotalForFrontEnd = 0; + + private long bytesToBeReservedForFrontEnd = 0; + + // To avoid reserving memory too frequently, we choose to do it in batches. This is the lower + // bound + // for each batch. + private static final long MEMORY_BATCH_THRESHOLD = 1024L * 1024L; + + private final LocalExecutionPlanner LOCAL_EXECUTION_PLANNER = LocalExecutionPlanner.getInstance(); + public MPPQueryContext(QueryId queryId) { this.queryId = queryId; this.endPointBlackList = new LinkedList<>(); @@ -113,6 +127,7 @@ public MPPQueryContext( public void prepareForRetry() { this.initResultNodeContext(); + this.releaseMemoryForFrontEnd(); } private void initResultNodeContext() { @@ -290,4 +305,49 @@ public void setLogicalOptimizationCost(long logicalOptimizeCost) { } queryPlanStatistics.setLogicalOptimizationCost(logicalOptimizeCost); } + + // region =========== FE memory related, make sure its not called concurrently =========== + + /** + * This method does not require concurrency control because the query plan is generated in a + * single-threaded manner. + */ + public void reserveMemoryForFrontEnd(final long bytes) { + this.bytesToBeReservedForFrontEnd += bytes; + if (this.bytesToBeReservedForFrontEnd >= MEMORY_BATCH_THRESHOLD) { + reserveMemoryForFrontEndImmediately(); + } + } + + public void reserveMemoryForFrontEndImmediately() { + if (bytesToBeReservedForFrontEnd != 0) { + LOCAL_EXECUTION_PLANNER.reserveMemoryForQueryFrontEnd( + bytesToBeReservedForFrontEnd, reservedBytesInTotalForFrontEnd, queryId.getId()); + this.reservedBytesInTotalForFrontEnd += bytesToBeReservedForFrontEnd; + this.bytesToBeReservedForFrontEnd = 0; + } + } + + public void releaseMemoryForFrontEnd() { + if (reservedBytesInTotalForFrontEnd != 0) { + LOCAL_EXECUTION_PLANNER.releaseToFreeMemoryForOperators(reservedBytesInTotalForFrontEnd); + reservedBytesInTotalForFrontEnd = 0; + } + } + + public void releaseMemoryForFrontEnd(final long bytes) { + if (bytes != 0) { + long bytesToRelease; + if (bytes <= bytesToBeReservedForFrontEnd) { + bytesToBeReservedForFrontEnd -= bytes; + } else { + bytesToRelease = bytes - bytesToBeReservedForFrontEnd; + bytesToBeReservedForFrontEnd = 0; + LOCAL_EXECUTION_PLANNER.releaseToFreeMemoryForOperators(bytesToRelease); + reservedBytesInTotalForFrontEnd -= bytesToRelease; + } + } + } + + // endregion } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/exception/MemoryNotEnoughException.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/exception/MemoryNotEnoughException.java index 5310a45db973..c0911254cb7e 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/exception/MemoryNotEnoughException.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/exception/MemoryNotEnoughException.java @@ -19,12 +19,9 @@ package org.apache.iotdb.db.queryengine.exception; -import org.apache.iotdb.commons.exception.IoTDBException; -import org.apache.iotdb.rpc.TSStatusCode; - -public class MemoryNotEnoughException extends IoTDBException { +public class MemoryNotEnoughException extends RuntimeException { public MemoryNotEnoughException(String message) { - super(message, TSStatusCode.QUOTA_MEM_QUERY_NOT_ENOUGH.getStatusCode(), true); + super(message); } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/MemoryEstimationHelper.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/MemoryEstimationHelper.java index 85f13bfb7798..a18e2dbc58bd 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/MemoryEstimationHelper.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/MemoryEstimationHelper.java @@ -76,8 +76,10 @@ public static long getEstimatedSizeOfPartialPath(@Nullable final PartialPath par totalSize += MEASUREMENT_PATH_INSTANCE_SIZE; MeasurementPath measurementPath = (MeasurementPath) partialPath; totalSize += RamUsageEstimator.sizeOf(measurementPath.getMeasurementAlias()); - totalSize += - RamUsageEstimator.sizeOf(measurementPath.getMeasurementSchema().getMeasurementId()); + if (measurementPath.getMeasurementSchema() != null) { + totalSize += + RamUsageEstimator.sizeOf(measurementPath.getMeasurementSchema().getMeasurementId()); + } } else { totalSize += PARTIAL_PATH_INSTANCE_SIZE; totalSize += RamUsageEstimator.sizeOf(partialPath.getMeasurement()); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java index 3b406bdb48cf..65dcd8301953 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java @@ -141,6 +141,9 @@ private ExecutionResult execution( } return result; } finally { + if (queryContext != null) { + queryContext.releaseMemoryForFrontEnd(); + } if (queryContext != null && !queryContext.getAcquiredLockNumMap().isEmpty()) { Map lockMap = queryContext.getAcquiredLockNumMap(); for (Map.Entry entry : lockMap.entrySet()) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java index 15d05e3430bc..3ed7bbfd169f 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java @@ -291,20 +291,21 @@ public Analysis visitQuery(QueryStatement queryStatement, MPPQueryContext contex deviceList = pushDownLimitOffsetInGroupByTimeForDevice(deviceList, queryStatement); } - outputExpressions = analyzeSelect(analysis, queryStatement, schemaTree, deviceList); + outputExpressions = + analyzeSelect(analysis, queryStatement, schemaTree, deviceList, context); if (outputExpressions.isEmpty()) { return finishQuery(queryStatement, analysis); } - analyzeDeviceToWhere(analysis, queryStatement, schemaTree, deviceList); + analyzeDeviceToWhere(analysis, queryStatement, schemaTree, deviceList, context); if (deviceList.isEmpty()) { return finishQuery(queryStatement, analysis, outputExpressions); } analysis.setDeviceList(deviceList); - analyzeDeviceToGroupBy(analysis, queryStatement, schemaTree, deviceList); - analyzeDeviceToOrderBy(analysis, queryStatement, schemaTree, deviceList); - analyzeHaving(analysis, queryStatement, schemaTree, deviceList); + analyzeDeviceToGroupBy(analysis, queryStatement, schemaTree, deviceList, context); + analyzeDeviceToOrderBy(analysis, queryStatement, schemaTree, deviceList, context); + analyzeHaving(analysis, queryStatement, schemaTree, deviceList, context); analyzeDeviceToAggregation(analysis, queryStatement); analyzeDeviceToSourceTransform(analysis, queryStatement); @@ -321,22 +322,25 @@ public Analysis visitQuery(QueryStatement queryStatement, MPPQueryContext contex new GroupByLevelHelper(queryStatement.getGroupByLevelComponent().getLevels()); outputExpressions = - analyzeGroupByLevelSelect(analysis, queryStatement, schemaTree, groupByLevelHelper); + analyzeGroupByLevelSelect( + analysis, queryStatement, schemaTree, groupByLevelHelper, context); if (outputExpressions.isEmpty()) { return finishQuery(queryStatement, analysis); } analysis.setOutputExpressions(outputExpressions); setSelectExpressions(analysis, queryStatement, outputExpressions); - analyzeGroupByLevelHaving(analysis, queryStatement, schemaTree, groupByLevelHelper); + analyzeGroupByLevelHaving( + analysis, queryStatement, schemaTree, groupByLevelHelper, context); - analyzeGroupByLevelOrderBy(analysis, queryStatement, schemaTree, groupByLevelHelper); + analyzeGroupByLevelOrderBy( + analysis, queryStatement, schemaTree, groupByLevelHelper, context); checkDataTypeConsistencyInGroupByLevel( analysis, groupByLevelHelper.getGroupByLevelExpressions()); analysis.setCrossGroupByExpressions(groupByLevelHelper.getGroupByLevelExpressions()); } else { - outputExpressions = analyzeSelect(analysis, queryStatement, schemaTree); + outputExpressions = analyzeSelect(analysis, queryStatement, schemaTree, context); analyzeGroupByTag(analysis, queryStatement, outputExpressions); @@ -346,17 +350,17 @@ public Analysis visitQuery(QueryStatement queryStatement, MPPQueryContext contex analysis.setOutputExpressions(outputExpressions); setSelectExpressions(analysis, queryStatement, outputExpressions); - analyzeHaving(analysis, queryStatement, schemaTree); + analyzeHaving(analysis, queryStatement, schemaTree, context); - analyzeOrderBy(analysis, queryStatement, schemaTree); + analyzeOrderBy(analysis, queryStatement, schemaTree, context); } // analyze aggregation analyzeAggregation(analysis, queryStatement); // analyze aggregation input - analyzeGroupBy(analysis, queryStatement, schemaTree); - analyzeWhere(analysis, queryStatement, schemaTree); + analyzeGroupBy(analysis, queryStatement, schemaTree, context); + analyzeWhere(analysis, queryStatement, schemaTree, context); if (analysis.getWhereExpression() != null && analysis.getWhereExpression().equals(ConstantOperand.FALSE)) { return finishQuery(queryStatement, analysis, outputExpressions); @@ -395,7 +399,7 @@ private ISchemaTree analyzeSchema( queryStatement = (QueryStatement) concatPathRewriter.rewrite( - queryStatement, new PathPatternTree(queryStatement.useWildcard())); + queryStatement, new PathPatternTree(queryStatement.useWildcard()), context); analysis.setStatement(queryStatement); // request schema fetch API @@ -495,7 +499,7 @@ private Analysis analyzeLastQuery( for (ResultColumn resultColumn : queryStatement.getSelectComponent().getResultColumns()) { selectExpressions.add(resultColumn.getExpression()); } - analyzeLastSource(analysis, selectExpressions, schemaTree); + analyzeLastSource(analysis, selectExpressions, schemaTree, context); analysis.setRespDatasetHeader(DatasetHeaderFactory.getLastQueryHeader()); @@ -506,14 +510,17 @@ private Analysis analyzeLastQuery( } private void analyzeLastSource( - Analysis analysis, List selectExpressions, ISchemaTree schemaTree) { + Analysis analysis, + List selectExpressions, + ISchemaTree schemaTree, + MPPQueryContext context) { Set sourceExpressions = new LinkedHashSet<>(); Set lastQueryBaseExpressions = new LinkedHashSet<>(); Map> lastQueryNonWritableViewSourceExpressionMap = null; for (Expression selectExpression : selectExpressions) { for (Expression lastQuerySourceExpression : - bindSchemaForExpression(selectExpression, schemaTree)) { + bindSchemaForExpression(selectExpression, schemaTree, context)) { if (lastQuerySourceExpression instanceof TimeSeriesOperand) { lastQueryBaseExpressions.add(lastQuerySourceExpression); sourceExpressions.add(lastQuerySourceExpression); @@ -584,7 +591,8 @@ private List> analyzeGroupByLevelSelect( Analysis analysis, QueryStatement queryStatement, ISchemaTree schemaTree, - GroupByLevelHelper groupByLevelHelper) { + GroupByLevelHelper groupByLevelHelper, + MPPQueryContext queryContext) { Map>> outputExpressionMap = new HashMap<>(); int columnIndex = 0; @@ -592,7 +600,7 @@ private List> analyzeGroupByLevelSelect( Set> outputExpressionSet = new LinkedHashSet<>(); List resultExpressions = - bindSchemaForExpression(resultColumn.getExpression(), schemaTree); + bindSchemaForExpression(resultColumn.getExpression(), schemaTree, queryContext); boolean isCountStar = resultColumn.getExpression().getExpressionType().equals(ExpressionType.FUNCTION) && ((FunctionExpression) resultColumn.getExpression()).isCountStar(); @@ -640,7 +648,10 @@ private List> analyzeGroupByLevelSelect( /** process select component for align by time. */ private List> analyzeSelect( - Analysis analysis, QueryStatement queryStatement, ISchemaTree schemaTree) { + Analysis analysis, + QueryStatement queryStatement, + ISchemaTree schemaTree, + MPPQueryContext queryContext) { Map>> outputExpressionMap = new HashMap<>(); ColumnPaginationController paginationController = @@ -654,7 +665,7 @@ private List> analyzeSelect( for (ResultColumn resultColumn : queryStatement.getSelectComponent().getResultColumns()) { List> outputExpressions = new ArrayList<>(); List resultExpressions = - bindSchemaForExpression(resultColumn.getExpression(), schemaTree); + bindSchemaForExpression(resultColumn.getExpression(), schemaTree, queryContext); for (Expression resultExpression : resultExpressions) { if (paginationController.hasCurOffset()) { @@ -710,7 +721,8 @@ private List> analyzeSelect( Analysis analysis, QueryStatement queryStatement, ISchemaTree schemaTree, - List deviceList) { + List deviceList, + MPPQueryContext queryContext) { List> outputExpressions = new ArrayList<>(); Map> deviceToSelectExpressions = new HashMap<>(); ColumnPaginationController paginationController = @@ -726,7 +738,8 @@ private List> analyzeSelect( new LinkedHashMap<>(); for (PartialPath device : deviceList) { List selectExpressionsOfOneDevice = - concatDeviceAndBindSchemaForExpression(selectExpression, device, schemaTree); + concatDeviceAndBindSchemaForExpression( + selectExpression, device, schemaTree, queryContext); if (selectExpressionsOfOneDevice.isEmpty()) { continue; } @@ -863,14 +876,16 @@ private void analyzeHavingBase( Analysis analysis, QueryStatement queryStatement, ISchemaTree schemaTree, - UnaryOperator havingExpressionAnalyzer) { + UnaryOperator havingExpressionAnalyzer, + MPPQueryContext queryContext) { // get removeWildcard Expressions in Having List conJunctions = ExpressionAnalyzer.bindSchemaForPredicate( queryStatement.getHavingCondition().getPredicate(), queryStatement.getFromComponent().getPrefixPaths(), schemaTree, - true); + true, + queryContext); Expression havingExpression = PredicateUtils.combineConjuncts( conJunctions.stream().distinct().collect(Collectors.toList())); @@ -888,20 +903,28 @@ private void analyzeHavingBase( } private void analyzeHaving( - Analysis analysis, QueryStatement queryStatement, ISchemaTree schemaTree) { + Analysis analysis, + QueryStatement queryStatement, + ISchemaTree schemaTree, + MPPQueryContext queryContext) { if (!queryStatement.hasHaving()) { return; } analyzeHavingBase( - analysis, queryStatement, schemaTree, ExpressionAnalyzer::normalizeExpression); + analysis, + queryStatement, + schemaTree, + ExpressionAnalyzer::normalizeExpression, + queryContext); } private void analyzeGroupByLevelHaving( Analysis analysis, QueryStatement queryStatement, ISchemaTree schemaTree, - GroupByLevelHelper groupByLevelHelper) { + GroupByLevelHelper groupByLevelHelper, + MPPQueryContext queryContext) { if (!queryStatement.hasHaving()) { return; } @@ -912,7 +935,8 @@ private void analyzeGroupByLevelHaving( schemaTree, havingExpression -> PredicateUtils.removeDuplicateConjunct( - groupByLevelHelper.applyLevels(havingExpression, analysis))); + groupByLevelHelper.applyLevels(havingExpression, analysis)), + queryContext); // update groupByLevelExpressions groupByLevelHelper.updateGroupByLevelExpressions(analysis.getHavingExpression()); } @@ -921,7 +945,8 @@ private void analyzeHaving( Analysis analysis, QueryStatement queryStatement, ISchemaTree schemaTree, - List deviceSet) { + List deviceSet, + MPPQueryContext queryContext) { if (!queryStatement.hasHaving()) { return; } @@ -937,7 +962,8 @@ private void analyzeHaving( for (PartialPath device : deviceSet) { List expressionsInHaving = - concatDeviceAndBindSchemaForExpression(havingExpression, device, schemaTree); + concatDeviceAndBindSchemaForExpression( + havingExpression, device, schemaTree, queryContext); conJunctions.addAll( expressionsInHaving.stream() @@ -1341,7 +1367,8 @@ private void analyzeDeviceToWhere( Analysis analysis, QueryStatement queryStatement, ISchemaTree schemaTree, - List deviceSet) { + List deviceSet, + MPPQueryContext queryContext) { if (!queryStatement.hasWhere()) { return; } @@ -1352,7 +1379,7 @@ private void analyzeDeviceToWhere( while (deviceIterator.hasNext()) { PartialPath devicePath = deviceIterator.next(); Expression whereExpression = - analyzeWhereSplitByDevice(queryStatement, devicePath, schemaTree); + analyzeWhereSplitByDevice(queryStatement, devicePath, schemaTree, queryContext); if (whereExpression.equals(ConstantOperand.FALSE)) { deviceIterator.remove(); } else if (whereExpression.equals(ConstantOperand.TRUE)) { @@ -1371,7 +1398,10 @@ private void analyzeDeviceToWhere( } private void analyzeWhere( - Analysis analysis, QueryStatement queryStatement, ISchemaTree schemaTree) { + Analysis analysis, + QueryStatement queryStatement, + ISchemaTree schemaTree, + MPPQueryContext queryContext) { if (!queryStatement.hasWhere()) { return; } @@ -1380,7 +1410,8 @@ private void analyzeWhere( queryStatement.getWhereCondition().getPredicate(), queryStatement.getFromComponent().getPrefixPaths(), schemaTree, - true); + true, + queryContext); Expression whereExpression = convertConJunctionsToWhereExpression(conJunctions); if (whereExpression.equals(ConstantOperand.TRUE)) { analysis.setWhereExpression(null); @@ -1396,10 +1427,17 @@ private void analyzeWhere( } private Expression analyzeWhereSplitByDevice( - QueryStatement queryStatement, PartialPath devicePath, ISchemaTree schemaTree) { + final QueryStatement queryStatement, + final PartialPath devicePath, + final ISchemaTree schemaTree, + final MPPQueryContext queryContext) { List conJunctions = ExpressionAnalyzer.concatDeviceAndBindSchemaForPredicate( - queryStatement.getWhereCondition().getPredicate(), devicePath, schemaTree, true); + queryStatement.getWhereCondition().getPredicate(), + devicePath, + schemaTree, + true, + queryContext); return convertConJunctionsToWhereExpression(conJunctions); } @@ -1568,11 +1606,13 @@ private void analyzeOrderByBase( Analysis analysis, QueryStatement queryStatement, ISchemaTree schemaTree, - UnaryOperator> orderByExpressionAnalyzer) { + UnaryOperator> orderByExpressionAnalyzer, + MPPQueryContext queryContext) { Set orderByExpressions = new LinkedHashSet<>(); for (Expression expressionForItem : queryStatement.getExpressionSortItemList()) { // Expression in a sortItem only indicates one column - List expressions = bindSchemaForExpression(expressionForItem, schemaTree); + List expressions = + bindSchemaForExpression(expressionForItem, schemaTree, queryContext); if (expressions.isEmpty()) { throw new SemanticException( String.format( @@ -1600,19 +1640,24 @@ private void analyzeOrderByBase( } private void analyzeOrderBy( - Analysis analysis, QueryStatement queryStatement, ISchemaTree schemaTree) { + Analysis analysis, + QueryStatement queryStatement, + ISchemaTree schemaTree, + MPPQueryContext queryContext) { if (!queryStatement.hasOrderByExpression()) { return; } - analyzeOrderByBase(analysis, queryStatement, schemaTree, expressions -> expressions); + analyzeOrderByBase( + analysis, queryStatement, schemaTree, expressions -> expressions, queryContext); } private void analyzeGroupByLevelOrderBy( Analysis analysis, QueryStatement queryStatement, ISchemaTree schemaTree, - GroupByLevelHelper groupByLevelHelper) { + GroupByLevelHelper groupByLevelHelper, + MPPQueryContext queryContext) { if (!queryStatement.hasOrderByExpression()) { return; } @@ -1627,7 +1672,8 @@ private void analyzeGroupByLevelOrderBy( groupedExpressions.add(groupByLevelHelper.applyLevels(expression, analysis)); } return new ArrayList<>(groupedExpressions); - }); + }, + queryContext); // update groupByLevelExpressions for (Expression orderByExpression : analysis.getOrderByExpressions()) { groupByLevelHelper.updateGroupByLevelExpressions(orderByExpression); @@ -1642,7 +1688,8 @@ private void analyzeDeviceToGroupBy( Analysis analysis, QueryStatement queryStatement, ISchemaTree schemaTree, - List deviceSet) { + List deviceSet, + MPPQueryContext queryContext) { if (queryStatement.getGroupByComponent() == null) { return; } @@ -1654,7 +1701,7 @@ private void analyzeDeviceToGroupBy( Expression expression = groupByComponent.getControlColumnExpression(); for (PartialPath device : deviceSet) { List groupByExpressionsOfOneDevice = - concatDeviceAndBindSchemaForExpression(expression, device, schemaTree); + concatDeviceAndBindSchemaForExpression(expression, device, schemaTree, queryContext); if (groupByExpressionsOfOneDevice.isEmpty()) { throw new SemanticException( @@ -1713,7 +1760,8 @@ private void analyzeDeviceToOrderBy( Analysis analysis, QueryStatement queryStatement, ISchemaTree schemaTree, - List deviceSet) { + List deviceSet, + MPPQueryContext queryContext) { if (!queryStatement.hasOrderByExpression()) { return; } @@ -1726,7 +1774,8 @@ private void analyzeDeviceToOrderBy( Set orderByExpressionsForOneDevice = new LinkedHashSet<>(); for (Expression expressionForItem : queryStatement.getExpressionSortItemList()) { List expressions = - concatDeviceAndBindSchemaForExpression(expressionForItem, device, schemaTree); + concatDeviceAndBindSchemaForExpression( + expressionForItem, device, schemaTree, queryContext); if (expressions.isEmpty()) { throw new SemanticException( String.format( @@ -1763,7 +1812,10 @@ private void analyzeDeviceToOrderBy( } private void analyzeGroupBy( - Analysis analysis, QueryStatement queryStatement, ISchemaTree schemaTree) { + Analysis analysis, + QueryStatement queryStatement, + ISchemaTree schemaTree, + MPPQueryContext queryContext) { if (queryStatement.getGroupByComponent() == null) { return; @@ -1775,7 +1827,8 @@ private void analyzeGroupBy( if (queryStatement.hasGroupByExpression()) { groupByExpression = groupByComponent.getControlColumnExpression(); // Expression in group by variation clause only indicates one column - List expressions = bindSchemaForExpression(groupByExpression, schemaTree); + List expressions = + bindSchemaForExpression(groupByExpression, schemaTree, queryContext); if (expressions.isEmpty()) { throw new SemanticException( String.format( @@ -2891,7 +2944,8 @@ public Analysis visitShowTimeSeries( analysis, Collections.singletonList( new TimeSeriesOperand(showTimeSeriesStatement.getPathPattern())), - schemaTree); + schemaTree, + context); analyzeDataPartition(analysis, new QueryStatement(), schemaTree, context); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/ConcatPathRewriter.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/ConcatPathRewriter.java index 7ac927ac6431..041f08e1b5cf 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/ConcatPathRewriter.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/ConcatPathRewriter.java @@ -23,6 +23,7 @@ import org.apache.iotdb.commons.path.PathPatternTree; import org.apache.iotdb.commons.path.PathPatternTreeUtils; import org.apache.iotdb.db.exception.sql.StatementAnalyzeException; +import org.apache.iotdb.db.queryengine.common.MPPQueryContext; import org.apache.iotdb.db.queryengine.plan.expression.Expression; import org.apache.iotdb.db.queryengine.plan.statement.Statement; import org.apache.iotdb.db.queryengine.plan.statement.component.ResultColumn; @@ -48,7 +49,8 @@ public PathPatternTree getPatternTree() { return patternTree; } - public Statement rewrite(Statement statement, PathPatternTree patternTree) + public Statement rewrite( + Statement statement, PathPatternTree patternTree, MPPQueryContext queryContext) throws StatementAnalyzeException { QueryStatement queryStatement = (QueryStatement) statement; this.patternTree = patternTree; @@ -75,7 +77,7 @@ public Statement rewrite(Statement statement, PathPatternTree patternTree) } else { // concat SELECT with FROM List resultColumns = - concatSelectWithFrom(queryStatement.getSelectComponent(), prefixPaths); + concatSelectWithFrom(queryStatement.getSelectComponent(), prefixPaths, queryContext); queryStatement.getSelectComponent().setResultColumns(resultColumns); // concat GROUP BY with FROM @@ -85,12 +87,13 @@ public Statement rewrite(Statement statement, PathPatternTree patternTree) .setControlColumnExpression( contactGroupByWithFrom( queryStatement.getGroupByComponent().getControlColumnExpression(), - prefixPaths)); + prefixPaths, + queryContext)); } if (queryStatement.hasOrderByExpression()) { List sortItemExpressions = queryStatement.getExpressionSortItemList(); sortItemExpressions.replaceAll( - expression -> contactOrderByWithFrom(expression, prefixPaths)); + expression -> contactOrderByWithFrom(expression, prefixPaths, queryContext)); } } @@ -119,14 +122,16 @@ public Statement rewrite(Statement statement, PathPatternTree patternTree) * path pattern. And construct pattern tree. */ private List concatSelectWithFrom( - SelectComponent selectComponent, List prefixPaths) + final SelectComponent selectComponent, + final List prefixPaths, + final MPPQueryContext queryContext) throws StatementAnalyzeException { // resultColumns after concat List resultColumns = new ArrayList<>(); for (ResultColumn resultColumn : selectComponent.getResultColumns()) { List resultExpressions = ExpressionAnalyzer.concatExpressionWithSuffixPaths( - resultColumn.getExpression(), prefixPaths, patternTree); + resultColumn.getExpression(), prefixPaths, patternTree, queryContext); for (Expression resultExpression : resultExpressions) { resultColumns.add( new ResultColumn( @@ -136,18 +141,26 @@ private List concatSelectWithFrom( return resultColumns; } - private Expression contactGroupByWithFrom(Expression expression, List prefixPaths) { + private Expression contactGroupByWithFrom( + final Expression expression, + final List prefixPaths, + final MPPQueryContext queryContext) { List resultExpressions = - ExpressionAnalyzer.concatExpressionWithSuffixPaths(expression, prefixPaths, patternTree); + ExpressionAnalyzer.concatExpressionWithSuffixPaths( + expression, prefixPaths, patternTree, queryContext); if (resultExpressions.size() != 1) { throw new IllegalStateException("Expression in group by should indicate one value"); } return resultExpressions.get(0); } - private Expression contactOrderByWithFrom(Expression expression, List prefixPaths) { + private Expression contactOrderByWithFrom( + final Expression expression, + final List prefixPaths, + final MPPQueryContext queryContext) { List resultExpressions = - ExpressionAnalyzer.concatExpressionWithSuffixPaths(expression, prefixPaths, patternTree); + ExpressionAnalyzer.concatExpressionWithSuffixPaths( + expression, prefixPaths, patternTree, queryContext); if (resultExpressions.size() != 1) { throw new IllegalStateException("Expression in order by should indicate one value"); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/ExpressionAnalyzer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/ExpressionAnalyzer.java index 010cf8fcd01a..ebc52a86a627 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/ExpressionAnalyzer.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/ExpressionAnalyzer.java @@ -26,6 +26,7 @@ import org.apache.iotdb.commons.udf.builtin.BuiltinScalarFunction; import org.apache.iotdb.commons.udf.builtin.BuiltinTimeSeriesGeneratingFunction; import org.apache.iotdb.db.exception.sql.SemanticException; +import org.apache.iotdb.db.queryengine.common.MPPQueryContext; import org.apache.iotdb.db.queryengine.common.header.ColumnHeader; import org.apache.iotdb.db.queryengine.common.schematree.ISchemaTree; import org.apache.iotdb.db.queryengine.plan.expression.Expression; @@ -264,11 +265,15 @@ public static ResultColumn.ColumnType identifyOutputColumnType( * @return the concatenated expression list */ public static List concatExpressionWithSuffixPaths( - Expression expression, List prefixPaths, PathPatternTree patternTree) { + final Expression expression, + final List prefixPaths, + final PathPatternTree patternTree, + final MPPQueryContext queryContext) { return new ConcatExpressionWithSuffixPathsVisitor() .process( expression, - new ConcatExpressionWithSuffixPathsVisitor.Context(prefixPaths, patternTree)); + new ConcatExpressionWithSuffixPathsVisitor.Context( + prefixPaths, patternTree, queryContext)); } /** @@ -405,8 +410,11 @@ public static void constructPatternTreeFromExpression( * expressions */ public static List bindSchemaForExpression( - Expression expression, ISchemaTree schemaTree) { - return new BindSchemaForExpressionVisitor().process(expression, schemaTree); + final Expression expression, + final ISchemaTree schemaTree, + final MPPQueryContext queryContext) { + return new BindSchemaForExpressionVisitor() + .process(expression, new BindSchemaForExpressionVisitor.Context(schemaTree, queryContext)); } /** @@ -419,10 +427,16 @@ public static List bindSchemaForExpression( * @return the expression list with full path and after binding schema */ public static List bindSchemaForPredicate( - Expression predicate, List prefixPaths, ISchemaTree schemaTree, boolean isRoot) { + final Expression predicate, + final List prefixPaths, + final ISchemaTree schemaTree, + final boolean isRoot, + final MPPQueryContext queryContext) { return new BindSchemaForPredicateVisitor() .process( - predicate, new BindSchemaForPredicateVisitor.Context(prefixPaths, schemaTree, isRoot)); + predicate, + new BindSchemaForPredicateVisitor.Context( + prefixPaths, schemaTree, isRoot, queryContext)); } public static Expression replaceRawPathWithGroupedPath( @@ -445,11 +459,15 @@ public static Expression replaceRawPathWithGroupedPath( * @return expression list with full path and after binding schema */ public static List concatDeviceAndBindSchemaForExpression( - Expression expression, PartialPath devicePath, ISchemaTree schemaTree) { + final Expression expression, + final PartialPath devicePath, + final ISchemaTree schemaTree, + final MPPQueryContext queryContext) { return new ConcatDeviceAndBindSchemaForExpressionVisitor() .process( expression, - new ConcatDeviceAndBindSchemaForExpressionVisitor.Context(devicePath, schemaTree)); + new ConcatDeviceAndBindSchemaForExpressionVisitor.Context( + devicePath, schemaTree, queryContext)); } /** @@ -459,12 +477,16 @@ public static List concatDeviceAndBindSchemaForExpression( * @return the expression list with full path and after binding schema */ public static List concatDeviceAndBindSchemaForPredicate( - Expression predicate, PartialPath devicePath, ISchemaTree schemaTree, boolean isWhere) { + final Expression predicate, + final PartialPath devicePath, + final ISchemaTree schemaTree, + final boolean isWhere, + final MPPQueryContext queryContext) { return new ConcatDeviceAndBindSchemaForPredicateVisitor() .process( predicate, new ConcatDeviceAndBindSchemaForPredicateVisitor.Context( - devicePath, schemaTree, isWhere)); + devicePath, schemaTree, isWhere, queryContext)); } /** diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/ExpressionUtils.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/ExpressionUtils.java index 04b348a2462e..fc9d3a0aa518 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/ExpressionUtils.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/ExpressionUtils.java @@ -20,6 +20,7 @@ package org.apache.iotdb.db.queryengine.plan.analyze; import org.apache.iotdb.commons.path.PartialPath; +import org.apache.iotdb.db.queryengine.common.MPPQueryContext; import org.apache.iotdb.db.queryengine.plan.expression.Expression; import org.apache.iotdb.db.queryengine.plan.expression.ExpressionType; import org.apache.iotdb.db.queryengine.plan.expression.UnknownExpressionTypeException; @@ -61,11 +62,16 @@ private ExpressionUtils() { // util class } - public static List reconstructTimeSeriesOperands( - TimeSeriesOperand rawExpression, List actualPaths) { + /* Use queryContext to record the memory usage of the constructed Expression. */ + public static List reconstructTimeSeriesOperandsWithMemoryCheck( + final TimeSeriesOperand rawExpression, + final List actualPaths, + final MPPQueryContext queryContext) { List resultExpressions = new ArrayList<>(); for (PartialPath actualPath : actualPaths) { - resultExpressions.add(reconstructTimeSeriesOperand(rawExpression, actualPath)); + resultExpressions.add( + reserveMemoryForExpression( + queryContext, reconstructTimeSeriesOperand(rawExpression, actualPath))); } return resultExpressions; } @@ -76,11 +82,15 @@ public static Expression reconstructTimeSeriesOperand( return cloneCommonFields(rawExpression, resultExpression); } - public static List reconstructFunctionExpressions( - FunctionExpression expression, List> childExpressionsList) { + public static List reconstructFunctionExpressionsWithMemoryCheck( + final FunctionExpression expression, + final List> childExpressionsList, + final MPPQueryContext queryContext) { List resultExpressions = new ArrayList<>(); for (List functionExpressions : childExpressionsList) { - resultExpressions.add(reconstructFunctionExpression(expression, functionExpressions)); + resultExpressions.add( + reserveMemoryForExpression( + queryContext, reconstructFunctionExpression(expression, functionExpressions))); } return resultExpressions; } @@ -107,11 +117,15 @@ public static Expression reconstructFunctionExpressionWithLowerCaseFunctionName( return cloneCommonFields(rawExpression, resultExpression); } - public static List reconstructUnaryExpressions( - UnaryExpression expression, List childExpressions) { + public static List reconstructUnaryExpressionsWithMemoryCheck( + final UnaryExpression expression, + final List childExpressions, + final MPPQueryContext queryContext) { List resultExpressions = new ArrayList<>(); for (Expression childExpression : childExpressions) { - resultExpressions.add(reconstructUnaryExpression(expression, childExpression)); + resultExpressions.add( + reserveMemoryForExpression( + queryContext, reconstructUnaryExpression(expression, childExpression))); } return resultExpressions; } @@ -172,14 +186,17 @@ public static Expression reconstructUnaryExpression( return cloneCommonFields(rawExpression, resultExpression); } - public static List reconstructBinaryExpressions( - BinaryExpression expression, - List leftExpressions, - List rightExpressions) { + public static List reconstructBinaryExpressionsWithMemoryCheck( + final BinaryExpression expression, + final List leftExpressions, + final List rightExpressions, + final MPPQueryContext queryContext) { List resultExpressions = new ArrayList<>(); for (Expression le : leftExpressions) { for (Expression re : rightExpressions) { - resultExpressions.add(reconstructBinaryExpression(expression, le, re)); + resultExpressions.add( + reserveMemoryForExpression( + queryContext, reconstructBinaryExpression(expression, le, re))); } } return resultExpressions; @@ -238,16 +255,19 @@ public static Expression reconstructBinaryExpression( return cloneCommonFields(rawExpression, resultExpression); } - public static List reconstructTernaryExpressions( - TernaryExpression expression, - List firstExpressions, - List secondExpressions, - List thirdExpressions) { + public static List reconstructTernaryExpressionsWithMemoryCheck( + final TernaryExpression expression, + final List firstExpressions, + final List secondExpressions, + final List thirdExpressions, + final MPPQueryContext queryContext) { List resultExpressions = new ArrayList<>(); for (Expression fe : firstExpressions) { for (Expression se : secondExpressions) for (Expression te : thirdExpressions) { - resultExpressions.add(reconstructTernaryExpression(expression, fe, se, te)); + resultExpressions.add( + reserveMemoryForExpression( + queryContext, reconstructTernaryExpression(expression, fe, se, te))); } } return resultExpressions; @@ -278,6 +298,12 @@ private static Expression cloneCommonFields( return resultExpression; } + private static Expression reserveMemoryForExpression( + MPPQueryContext queryContext, Expression expression) { + queryContext.reserveMemoryForFrontEnd(expression == null ? 0 : expression.ramBytesUsed()); + return expression; + } + /** * Make cartesian product. Attention, in this implementation, the way to handle the empty set is * to ignore it instead of making the result an empty set. diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/TemplatedAnalyze.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/TemplatedAnalyze.java index 856476eae241..87bea8c61a98 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/TemplatedAnalyze.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/TemplatedAnalyze.java @@ -129,6 +129,8 @@ public static boolean canBuildPlanUseTemplate( TimeSeriesOperand measurementPath = new TimeSeriesOperand( new MeasurementPath(new String[] {measurementName}, measurementSchema)); + // reserve memory for this expression + context.reserveMemoryForFrontEnd(measurementPath.ramBytesUsed()); outputExpressions.add(new Pair<>(measurementPath, null)); paginationController.consumeLimit(); } else { @@ -150,6 +152,8 @@ public static boolean canBuildPlanUseTemplate( TimeSeriesOperand measurementPath = new TimeSeriesOperand( new MeasurementPath(new String[] {measurementName}, measurementSchema)); + // reserve memory for this expression + context.reserveMemoryForFrontEnd(measurementPath.ramBytesUsed()); outputExpressions.add(new Pair<>(measurementPath, resultColumn.getAlias())); } else { break; @@ -183,7 +187,7 @@ public static boolean canBuildPlanUseTemplate( } analysis.setDeviceList(deviceList); - analyzeDeviceToOrderBy(analysis, queryStatement, schemaTree, deviceList); + analyzeDeviceToOrderBy(analysis, queryStatement, schemaTree, deviceList, context); analyzeDeviceToSourceTransform(analysis); analyzeDeviceToSource(analysis); @@ -272,7 +276,8 @@ private static void analyzeDeviceToOrderBy( Analysis analysis, QueryStatement queryStatement, ISchemaTree schemaTree, - List deviceSet) { + List deviceSet, + MPPQueryContext queryContext) { if (!queryStatement.hasOrderByExpression()) { return; } @@ -285,7 +290,8 @@ private static void analyzeDeviceToOrderBy( Set orderByExpressionsForOneDevice = new LinkedHashSet<>(); for (Expression expressionForItem : queryStatement.getExpressionSortItemList()) { List expressions = - concatDeviceAndBindSchemaForExpression(expressionForItem, device, schemaTree); + concatDeviceAndBindSchemaForExpression( + expressionForItem, device, schemaTree, queryContext); if (expressions.isEmpty()) { throw new SemanticException( String.format( diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/QueryExecution.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/QueryExecution.java index 9724a957b524..40a188e3d6a7 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/QueryExecution.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/QueryExecution.java @@ -179,6 +179,9 @@ public void start() { PERFORMANCE_OVERVIEW_METRICS.recordPlanCost(System.nanoTime() - startTime); schedule(); + // The last batch of memory reserved by the front end + context.reserveMemoryForFrontEndImmediately(); + // friendly for gc logicalPlan.clearUselessMemory(); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/Expression.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/Expression.java index 48b1b5806dcb..9b89a122b06d 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/Expression.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/Expression.java @@ -56,6 +56,7 @@ import org.apache.iotdb.db.queryengine.transformation.dag.udf.UDTFExecutor; import org.apache.tsfile.enums.TSDataType; +import org.apache.tsfile.utils.Accountable; import org.apache.tsfile.utils.ReadWriteIOUtils; import java.io.DataOutputStream; @@ -71,7 +72,7 @@ import java.util.Objects; /** A skeleton class for expression */ -public abstract class Expression extends StatementNode { +public abstract class Expression extends StatementNode implements Accountable { ///////////////////////////////////////////////////////////////////////////////////////////////// // Operations that Class Expression is not responsible for should be done through a visitor ///////////////////////////////////////////////////////////////////////////////////////////////// diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/binary/BinaryExpression.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/binary/BinaryExpression.java index 793e09d98f8c..39189759a60f 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/binary/BinaryExpression.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/binary/BinaryExpression.java @@ -20,6 +20,7 @@ package org.apache.iotdb.db.queryengine.plan.expression.binary; import org.apache.iotdb.db.queryengine.common.NodeRef; +import org.apache.iotdb.db.queryengine.execution.MemoryEstimationHelper; import org.apache.iotdb.db.queryengine.plan.expression.Expression; import org.apache.iotdb.db.queryengine.plan.expression.visitor.ExpressionVisitor; import org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.InputLocation; @@ -27,6 +28,7 @@ import org.apache.iotdb.db.queryengine.transformation.dag.udf.UDTFExecutor; import org.apache.tsfile.enums.TSDataType; +import org.apache.tsfile.utils.RamUsageEstimator; import java.io.DataOutputStream; import java.io.IOException; @@ -38,6 +40,9 @@ public abstract class BinaryExpression extends Expression { + private static final long INSTANCE_SIZE = + RamUsageEstimator.shallowSizeOfInstance(BinaryExpression.class); + protected Expression leftExpression; protected Expression rightExpression; @@ -156,6 +161,13 @@ public String getOutputSymbolInternal() { return buildExpression(left, right); } + @Override + public long ramBytesUsed() { + return INSTANCE_SIZE + + MemoryEstimationHelper.getEstimatedSizeOfAccountableObject(leftExpression) + + MemoryEstimationHelper.getEstimatedSizeOfAccountableObject(rightExpression); + } + private String buildExpression(String left, String right) { StringBuilder builder = new StringBuilder(); if (leftExpression.getExpressionType().getPriority() < this.getExpressionType().getPriority()) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/leaf/ConstantOperand.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/leaf/ConstantOperand.java index 92a7a7b328ce..f4527cdc755a 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/leaf/ConstantOperand.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/leaf/ConstantOperand.java @@ -25,6 +25,7 @@ import org.apache.iotdb.db.queryengine.transformation.dag.memory.LayerMemoryAssigner; import org.apache.tsfile.enums.TSDataType; +import org.apache.tsfile.utils.RamUsageEstimator; import org.apache.tsfile.utils.ReadWriteIOUtils; import java.io.DataOutputStream; @@ -40,6 +41,9 @@ public class ConstantOperand extends LeafOperand { public static final ConstantOperand FALSE = new ConstantOperand(TSDataType.BOOLEAN, "false"); public static final ConstantOperand TRUE = new ConstantOperand(TSDataType.BOOLEAN, "true"); + private static final long INSTANCE_SIZE = + RamUsageEstimator.shallowSizeOfInstance(ConstantOperand.class); + private final String valueString; private final TSDataType dataType; @@ -116,4 +120,9 @@ protected void serialize(DataOutputStream stream) throws IOException { dataType.serializeTo(stream); ReadWriteIOUtils.write(valueString, stream); } + + @Override + public long ramBytesUsed() { + return INSTANCE_SIZE + RamUsageEstimator.sizeOf(valueString); + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/leaf/NullOperand.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/leaf/NullOperand.java index 4c7181b9f814..4e2781a133c5 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/leaf/NullOperand.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/leaf/NullOperand.java @@ -24,6 +24,8 @@ import org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.InputLocation; import org.apache.iotdb.db.queryengine.transformation.dag.memory.LayerMemoryAssigner; +import org.apache.tsfile.utils.RamUsageEstimator; + import java.io.DataOutputStream; import java.io.IOException; import java.nio.ByteBuffer; @@ -31,6 +33,9 @@ import java.util.Map; public class NullOperand extends LeafOperand { + private static final long INSTANCE_SIZE = + RamUsageEstimator.shallowSizeOfInstance(NullOperand.class); + @Override public R accept(ExpressionVisitor visitor, C context) { return visitor.visitNullOperand(this, context); @@ -71,4 +76,9 @@ protected void serialize(ByteBuffer byteBuffer) { protected void serialize(DataOutputStream stream) throws IOException { // do nothing } + + @Override + public long ramBytesUsed() { + return INSTANCE_SIZE; + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/leaf/TimeSeriesOperand.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/leaf/TimeSeriesOperand.java index a05b2577d936..22ded4d95d75 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/leaf/TimeSeriesOperand.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/leaf/TimeSeriesOperand.java @@ -22,12 +22,14 @@ import org.apache.iotdb.commons.path.MeasurementPath; import org.apache.iotdb.commons.path.PartialPath; import org.apache.iotdb.commons.path.PathDeserializeUtil; +import org.apache.iotdb.db.queryengine.execution.MemoryEstimationHelper; import org.apache.iotdb.db.queryengine.plan.expression.ExpressionType; import org.apache.iotdb.db.queryengine.plan.expression.visitor.ExpressionVisitor; import org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.InputLocation; import org.apache.iotdb.db.queryengine.transformation.dag.memory.LayerMemoryAssigner; import org.apache.tsfile.enums.TSDataType; +import org.apache.tsfile.utils.RamUsageEstimator; import java.io.DataOutputStream; import java.io.IOException; @@ -37,6 +39,9 @@ public class TimeSeriesOperand extends LeafOperand { + private static final long INSTANCE_SIZE = + RamUsageEstimator.shallowSizeOfInstance(TimeSeriesOperand.class); + private PartialPath path; public TimeSeriesOperand(PartialPath path) { @@ -106,4 +111,9 @@ protected void serialize(ByteBuffer byteBuffer) { protected void serialize(DataOutputStream stream) throws IOException { path.serialize(stream); } + + @Override + public long ramBytesUsed() { + return INSTANCE_SIZE + MemoryEstimationHelper.getEstimatedSizeOfPartialPath(path); + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/leaf/TimestampOperand.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/leaf/TimestampOperand.java index ece2f5ad9ef6..6c817a203363 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/leaf/TimestampOperand.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/leaf/TimestampOperand.java @@ -24,6 +24,8 @@ import org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.InputLocation; import org.apache.iotdb.db.queryengine.transformation.dag.memory.LayerMemoryAssigner; +import org.apache.tsfile.utils.RamUsageEstimator; + import java.io.DataOutputStream; import java.io.IOException; import java.nio.ByteBuffer; @@ -32,6 +34,9 @@ public class TimestampOperand extends LeafOperand { + private static final long INSTANCE_SIZE = + RamUsageEstimator.shallowSizeOfInstance(TimeSeriesOperand.class); + public static final String TIMESTAMP_EXPRESSION_STRING = "Time"; public TimestampOperand() { @@ -82,4 +87,9 @@ protected void serialize(ByteBuffer byteBuffer) { protected void serialize(DataOutputStream stream) throws IOException { // do nothing } + + @Override + public long ramBytesUsed() { + return INSTANCE_SIZE; + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/multi/FunctionExpression.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/multi/FunctionExpression.java index 4a0ff5458cf9..a17864ff3be6 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/multi/FunctionExpression.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/multi/FunctionExpression.java @@ -25,6 +25,7 @@ import org.apache.iotdb.commons.udf.builtin.BuiltinScalarFunction; import org.apache.iotdb.commons.udf.service.UDFManagementService; import org.apache.iotdb.db.queryengine.common.NodeRef; +import org.apache.iotdb.db.queryengine.execution.MemoryEstimationHelper; import org.apache.iotdb.db.queryengine.plan.expression.Expression; import org.apache.iotdb.db.queryengine.plan.expression.ExpressionType; import org.apache.iotdb.db.queryengine.plan.expression.leaf.TimeSeriesOperand; @@ -37,6 +38,7 @@ import org.apache.iotdb.udf.api.customizer.strategy.AccessStrategy; import org.apache.tsfile.enums.TSDataType; +import org.apache.tsfile.utils.RamUsageEstimator; import org.apache.tsfile.utils.ReadWriteIOUtils; import java.io.DataOutputStream; @@ -53,6 +55,9 @@ public class FunctionExpression extends Expression { + private static final long INSTANCE_SIZE = + RamUsageEstimator.shallowSizeOfInstance(FunctionExpression.class); + private FunctionType functionType; private final String functionName; @@ -392,4 +397,25 @@ protected void serialize(DataOutputStream stream) throws IOException { Expression.serialize(expression, stream); } } + + @Override + public long ramBytesUsed() { + return INSTANCE_SIZE + + RamUsageEstimator.sizeOf(functionName) + + RamUsageEstimator.sizeOf(parametersString) + + RamUsageEstimator.sizeOfMap(functionAttributes) + + (expressions == null + ? 0 + : expressions.stream() + .mapToLong(MemoryEstimationHelper::getEstimatedSizeOfAccountableObject) + .sum()) + + (paths == null + ? 0 + : paths.stream().mapToLong(MemoryEstimationHelper::getEstimatedSizeOfPartialPath).sum()) + + (countTimeExpressions == null + ? 0 + : countTimeExpressions.stream() + .mapToLong(MemoryEstimationHelper::getEstimatedSizeOfAccountableObject) + .sum()); + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/other/CaseWhenThenExpression.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/other/CaseWhenThenExpression.java index cd28e9f110e0..142473af8532 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/other/CaseWhenThenExpression.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/other/CaseWhenThenExpression.java @@ -20,6 +20,7 @@ package org.apache.iotdb.db.queryengine.plan.expression.other; import org.apache.iotdb.db.queryengine.common.NodeRef; +import org.apache.iotdb.db.queryengine.execution.MemoryEstimationHelper; import org.apache.iotdb.db.queryengine.plan.expression.Expression; import org.apache.iotdb.db.queryengine.plan.expression.ExpressionType; import org.apache.iotdb.db.queryengine.plan.expression.binary.WhenThenExpression; @@ -31,6 +32,7 @@ import org.apache.commons.lang3.Validate; import org.apache.tsfile.enums.TSDataType; +import org.apache.tsfile.utils.RamUsageEstimator; import org.apache.tsfile.utils.ReadWriteIOUtils; import java.io.DataOutputStream; @@ -42,6 +44,9 @@ import java.util.Map; public class CaseWhenThenExpression extends Expression { + + private static final long INSTANCE_SIZE = + RamUsageEstimator.shallowSizeOfInstance(CaseWhenThenExpression.class); protected List whenThenExpressions = new ArrayList<>(); protected Expression elseExpression; @@ -183,4 +188,15 @@ public String getOutputSymbolInternal() { public R accept(ExpressionVisitor visitor, C context) { return visitor.visitCaseWhenThenExpression(this, context); } + + @Override + public long ramBytesUsed() { + return INSTANCE_SIZE + + MemoryEstimationHelper.getEstimatedSizeOfAccountableObject(elseExpression) + + (whenThenExpressions == null + ? 0 + : whenThenExpressions.stream() + .mapToLong(MemoryEstimationHelper::getEstimatedSizeOfAccountableObject) + .sum()); + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/other/GroupByTimeExpression.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/other/GroupByTimeExpression.java index 21ed6187449b..4ee3c174bc4b 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/other/GroupByTimeExpression.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/other/GroupByTimeExpression.java @@ -28,6 +28,7 @@ import org.apache.iotdb.db.queryengine.transformation.dag.udf.UDTFExecutor; import org.apache.tsfile.enums.TSDataType; +import org.apache.tsfile.utils.RamUsageEstimator; import org.apache.tsfile.utils.ReadWriteIOUtils; import org.apache.tsfile.utils.TimeDuration; @@ -41,6 +42,9 @@ /** Only used for representing GROUP BY TIME filter. */ public class GroupByTimeExpression extends Expression { + private static final long INSTANCE_SIZE = + RamUsageEstimator.shallowSizeOfInstance(GroupByTimeExpression.class); + // [startTime, endTime] private final long startTime; private final long endTime; @@ -157,4 +161,9 @@ protected void serialize(DataOutputStream stream) throws IOException { public R accept(ExpressionVisitor visitor, C context) { return visitor.visitGroupByTimeExpression(this, context); } + + @Override + public long ramBytesUsed() { + return INSTANCE_SIZE; + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/ternary/BetweenExpression.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/ternary/BetweenExpression.java index 04025bcab62e..00b307df3409 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/ternary/BetweenExpression.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/ternary/BetweenExpression.java @@ -21,10 +21,12 @@ package org.apache.iotdb.db.queryengine.plan.expression.ternary; +import org.apache.iotdb.db.queryengine.execution.MemoryEstimationHelper; import org.apache.iotdb.db.queryengine.plan.expression.Expression; import org.apache.iotdb.db.queryengine.plan.expression.ExpressionType; import org.apache.iotdb.db.queryengine.plan.expression.visitor.ExpressionVisitor; +import org.apache.tsfile.utils.RamUsageEstimator; import org.apache.tsfile.utils.ReadWriteIOUtils; import java.io.DataOutputStream; @@ -32,6 +34,10 @@ import java.nio.ByteBuffer; public class BetweenExpression extends TernaryExpression { + + private static final long INSTANCE_SIZE = + RamUsageEstimator.shallowSizeOfInstance(TernaryExpression.class); + private final boolean isNotBetween; public boolean isNotBetween() { @@ -102,4 +108,12 @@ public String getOutputSymbolInternal() { public R accept(ExpressionVisitor visitor, C context) { return visitor.visitBetweenExpression(this, context); } + + @Override + public long ramBytesUsed() { + return INSTANCE_SIZE + + MemoryEstimationHelper.getEstimatedSizeOfAccountableObject(firstExpression) + + MemoryEstimationHelper.getEstimatedSizeOfAccountableObject(secondExpression) + + MemoryEstimationHelper.getEstimatedSizeOfAccountableObject(thirdExpression); + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/unary/InExpression.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/unary/InExpression.java index 5943b209ace7..0ace411cd94e 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/unary/InExpression.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/unary/InExpression.java @@ -19,6 +19,7 @@ package org.apache.iotdb.db.queryengine.plan.expression.unary; +import org.apache.iotdb.db.queryengine.execution.MemoryEstimationHelper; import org.apache.iotdb.db.queryengine.plan.expression.Expression; import org.apache.iotdb.db.queryengine.plan.expression.ExpressionType; import org.apache.iotdb.db.queryengine.plan.expression.leaf.ConstantOperand; @@ -27,6 +28,7 @@ import org.apache.iotdb.db.queryengine.plan.expression.multi.FunctionExpression; import org.apache.iotdb.db.queryengine.plan.expression.visitor.ExpressionVisitor; +import org.apache.tsfile.utils.RamUsageEstimator; import org.apache.tsfile.utils.ReadWriteIOUtils; import javax.validation.constraints.NotNull; @@ -38,7 +40,8 @@ import java.util.LinkedHashSet; public class InExpression extends UnaryExpression { - + private static final long INSTANCE_SIZE = + RamUsageEstimator.shallowSizeOfInstance(InExpression.class); private final boolean isNotIn; private final LinkedHashSet values; @@ -138,4 +141,11 @@ public String getOutputSymbolInternal() { public R accept(ExpressionVisitor visitor, C context) { return visitor.visitInExpression(this, context); } + + @Override + public long ramBytesUsed() { + return INSTANCE_SIZE + + MemoryEstimationHelper.getEstimatedSizeOfAccountableObject(expression) + + (values == null ? 0 : values.stream().mapToLong(RamUsageEstimator::sizeOf).sum()); + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/unary/IsNullExpression.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/unary/IsNullExpression.java index 75d55b242fd4..9542b29e2da8 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/unary/IsNullExpression.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/unary/IsNullExpression.java @@ -19,10 +19,12 @@ package org.apache.iotdb.db.queryengine.plan.expression.unary; +import org.apache.iotdb.db.queryengine.execution.MemoryEstimationHelper; import org.apache.iotdb.db.queryengine.plan.expression.Expression; import org.apache.iotdb.db.queryengine.plan.expression.ExpressionType; import org.apache.iotdb.db.queryengine.plan.expression.visitor.ExpressionVisitor; +import org.apache.tsfile.utils.RamUsageEstimator; import org.apache.tsfile.utils.ReadWriteIOUtils; import java.io.DataOutputStream; @@ -30,6 +32,9 @@ import java.nio.ByteBuffer; public class IsNullExpression extends UnaryExpression { + + private static final long INSTANCE_SIZE = + RamUsageEstimator.shallowSizeOfInstance(IsNullExpression.class); private final boolean isNot; public IsNullExpression(Expression expression, boolean isNot) { @@ -77,4 +82,9 @@ public String getOutputSymbolInternal() { public R accept(ExpressionVisitor visitor, C context) { return visitor.visitIsNullExpression(this, context); } + + @Override + public long ramBytesUsed() { + return INSTANCE_SIZE + MemoryEstimationHelper.getEstimatedSizeOfAccountableObject(expression); + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/unary/LikeExpression.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/unary/LikeExpression.java index fdc158a9f764..a83824ce68dd 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/unary/LikeExpression.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/unary/LikeExpression.java @@ -19,10 +19,12 @@ package org.apache.iotdb.db.queryengine.plan.expression.unary; +import org.apache.iotdb.db.queryengine.execution.MemoryEstimationHelper; import org.apache.iotdb.db.queryengine.plan.expression.Expression; import org.apache.iotdb.db.queryengine.plan.expression.ExpressionType; import org.apache.iotdb.db.queryengine.plan.expression.visitor.ExpressionVisitor; +import org.apache.tsfile.utils.RamUsageEstimator; import org.apache.tsfile.utils.ReadWriteIOUtils; import java.io.DataOutputStream; @@ -35,6 +37,9 @@ public class LikeExpression extends UnaryExpression { + private static final long INSTANCE_SIZE = + RamUsageEstimator.shallowSizeOfInstance(LikeExpression.class); + private final String patternString; private final Pattern pattern; @@ -107,4 +112,11 @@ public String getOutputSymbolInternal() { public R accept(ExpressionVisitor visitor, C context) { return visitor.visitLikeExpression(this, context); } + + @Override + public long ramBytesUsed() { + return INSTANCE_SIZE + + MemoryEstimationHelper.getEstimatedSizeOfAccountableObject(expression) + + RamUsageEstimator.sizeOf(patternString); + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/unary/LogicNotExpression.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/unary/LogicNotExpression.java index 69719cfd4158..9797376e5458 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/unary/LogicNotExpression.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/unary/LogicNotExpression.java @@ -19,6 +19,7 @@ package org.apache.iotdb.db.queryengine.plan.expression.unary; +import org.apache.iotdb.db.queryengine.execution.MemoryEstimationHelper; import org.apache.iotdb.db.queryengine.plan.expression.Expression; import org.apache.iotdb.db.queryengine.plan.expression.ExpressionType; import org.apache.iotdb.db.queryengine.plan.expression.leaf.ConstantOperand; @@ -27,10 +28,15 @@ import org.apache.iotdb.db.queryengine.plan.expression.multi.FunctionExpression; import org.apache.iotdb.db.queryengine.plan.expression.visitor.ExpressionVisitor; +import org.apache.tsfile.utils.RamUsageEstimator; + import java.nio.ByteBuffer; public class LogicNotExpression extends UnaryExpression { + private static final long INSTANCE_SIZE = + RamUsageEstimator.shallowSizeOfInstance(LogicNotExpression.class); + public LogicNotExpression(Expression expression) { super(expression); } @@ -64,4 +70,9 @@ public ExpressionType getExpressionType() { public R accept(ExpressionVisitor visitor, C context) { return visitor.visitLogicNotExpression(this, context); } + + @Override + public long ramBytesUsed() { + return INSTANCE_SIZE + MemoryEstimationHelper.getEstimatedSizeOfAccountableObject(expression); + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/unary/NegationExpression.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/unary/NegationExpression.java index 52df786b4861..d496a05c2617 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/unary/NegationExpression.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/unary/NegationExpression.java @@ -19,6 +19,7 @@ package org.apache.iotdb.db.queryengine.plan.expression.unary; +import org.apache.iotdb.db.queryengine.execution.MemoryEstimationHelper; import org.apache.iotdb.db.queryengine.plan.expression.Expression; import org.apache.iotdb.db.queryengine.plan.expression.ExpressionType; import org.apache.iotdb.db.queryengine.plan.expression.leaf.ConstantOperand; @@ -27,10 +28,15 @@ import org.apache.iotdb.db.queryengine.plan.expression.multi.FunctionExpression; import org.apache.iotdb.db.queryengine.plan.expression.visitor.ExpressionVisitor; +import org.apache.tsfile.utils.RamUsageEstimator; + import java.nio.ByteBuffer; public class NegationExpression extends UnaryExpression { + private static final long INSTANCE_SIZE = + RamUsageEstimator.shallowSizeOfInstance(NegationExpression.class); + public NegationExpression(Expression expression) { super(expression); } @@ -70,4 +76,9 @@ public ExpressionType getExpressionType() { public R accept(ExpressionVisitor visitor, C context) { return visitor.visitNegationExpression(this, context); } + + @Override + public long ramBytesUsed() { + return INSTANCE_SIZE + MemoryEstimationHelper.getEstimatedSizeOfAccountableObject(expression); + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/unary/RegularExpression.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/unary/RegularExpression.java index 39a1baac9495..4d517e219e70 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/unary/RegularExpression.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/unary/RegularExpression.java @@ -19,11 +19,13 @@ package org.apache.iotdb.db.queryengine.plan.expression.unary; +import org.apache.iotdb.db.queryengine.execution.MemoryEstimationHelper; import org.apache.iotdb.db.queryengine.plan.expression.Expression; import org.apache.iotdb.db.queryengine.plan.expression.ExpressionType; import org.apache.iotdb.db.queryengine.plan.expression.visitor.ExpressionVisitor; import org.apache.commons.lang3.Validate; +import org.apache.tsfile.utils.RamUsageEstimator; import org.apache.tsfile.utils.ReadWriteIOUtils; import java.io.DataOutputStream; @@ -35,6 +37,9 @@ public class RegularExpression extends UnaryExpression { + private static final long INSTANCE_SIZE = + RamUsageEstimator.shallowSizeOfInstance(RegularExpression.class); + private final String patternString; private final Pattern pattern; @@ -111,4 +116,11 @@ public String getOutputSymbolInternal() { public R accept(ExpressionVisitor visitor, C context) { return visitor.visitRegularExpression(this, context); } + + @Override + public long ramBytesUsed() { + return INSTANCE_SIZE + + MemoryEstimationHelper.getEstimatedSizeOfAccountableObject(expression) + + RamUsageEstimator.sizeOf(patternString); + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/visitor/cartesian/BindSchemaForExpressionVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/visitor/cartesian/BindSchemaForExpressionVisitor.java index 7383e2784ebf..fdbd9c52142c 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/visitor/cartesian/BindSchemaForExpressionVisitor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/visitor/cartesian/BindSchemaForExpressionVisitor.java @@ -23,6 +23,7 @@ import org.apache.iotdb.commons.path.PartialPath; import org.apache.iotdb.commons.schema.view.LogicalViewSchema; import org.apache.iotdb.commons.schema.view.viewExpression.ViewExpression; +import org.apache.iotdb.db.queryengine.common.MPPQueryContext; import org.apache.iotdb.db.queryengine.common.schematree.ISchemaTree; import org.apache.iotdb.db.queryengine.plan.analyze.ExpressionUtils; import org.apache.iotdb.db.queryengine.plan.expression.Expression; @@ -34,6 +35,7 @@ import org.apache.iotdb.db.schemaengine.schemaregion.view.visitor.TransformToExpressionVisitor; import org.apache.iotdb.db.utils.constant.SqlConstant; +import org.apache.commons.lang3.Validate; import org.apache.tsfile.write.schema.IMeasurementSchema; import java.util.ArrayList; @@ -43,20 +45,21 @@ import java.util.stream.Collectors; import static org.apache.iotdb.db.queryengine.plan.analyze.ExpressionUtils.cartesianProduct; -import static org.apache.iotdb.db.queryengine.plan.analyze.ExpressionUtils.reconstructFunctionExpressions; +import static org.apache.iotdb.db.queryengine.plan.analyze.ExpressionUtils.reconstructFunctionExpressionsWithMemoryCheck; import static org.apache.iotdb.db.utils.TypeInferenceUtils.bindTypeForBuiltinAggregationNonSeriesInputExpressions; import static org.apache.iotdb.db.utils.constant.SqlConstant.COUNT_TIME; -public class BindSchemaForExpressionVisitor extends CartesianProductVisitor { +public class BindSchemaForExpressionVisitor + extends CartesianProductVisitor { @Override public List visitFunctionExpression( - FunctionExpression functionExpression, ISchemaTree schemaTree) { + FunctionExpression functionExpression, Context context) { if (COUNT_TIME.equalsIgnoreCase(functionExpression.getFunctionName())) { List usedExpressions = functionExpression.getExpressions().stream() - .flatMap(e -> process(e, schemaTree).stream()) + .flatMap(e -> process(e, context).stream()) .collect(Collectors.toList()); Expression countTimeExpression = @@ -73,7 +76,7 @@ public List visitFunctionExpression( // to collect the produced expressions. List> extendedExpressions = new ArrayList<>(); for (Expression originExpression : functionExpression.getExpressions()) { - List actualExpressions = process(originExpression, schemaTree); + List actualExpressions = process(originExpression, context); if (actualExpressions.isEmpty()) { // Let's ignore the eval of the function which has at least one non-existence series as // input. See IOTDB-1212: https://github.com/apache/iotdb/pull/3101 @@ -96,15 +99,16 @@ public List visitFunctionExpression( List> childExpressionsList = new ArrayList<>(); cartesianProduct(extendedExpressions, childExpressionsList, 0, new ArrayList<>()); - return reconstructFunctionExpressions(functionExpression, childExpressionsList); + return reconstructFunctionExpressionsWithMemoryCheck( + functionExpression, childExpressionsList, context.getQueryContext()); } @Override public List visitTimeSeriesOperand( - TimeSeriesOperand timeSeriesOperand, ISchemaTree schemaTree) { + TimeSeriesOperand timeSeriesOperand, Context context) { PartialPath timeSeriesOperandPath = timeSeriesOperand.getPath(); List actualPaths = - schemaTree.searchMeasurementPaths(timeSeriesOperandPath).left; + context.getSchemaTree().searchMeasurementPaths(timeSeriesOperandPath).left; // process logical view List nonViewActualPaths = new ArrayList<>(); List viewPaths = new ArrayList<>(); @@ -116,10 +120,11 @@ public List visitTimeSeriesOperand( } } List reconstructTimeSeriesOperands = - ExpressionUtils.reconstructTimeSeriesOperands(timeSeriesOperand, nonViewActualPaths); + ExpressionUtils.reconstructTimeSeriesOperandsWithMemoryCheck( + timeSeriesOperand, nonViewActualPaths, context.getQueryContext()); // handle logical views for (MeasurementPath measurementPath : viewPaths) { - Expression replacedExpression = transformViewPath(measurementPath, schemaTree); + Expression replacedExpression = transformViewPath(measurementPath, context.getSchemaTree()); replacedExpression.setViewPath(measurementPath); reconstructTimeSeriesOperands.add(replacedExpression); } @@ -128,13 +133,12 @@ public List visitTimeSeriesOperand( @Override public List visitTimeStampOperand( - TimestampOperand timestampOperand, ISchemaTree schemaTree) { + TimestampOperand timestampOperand, Context context) { return Collections.singletonList(timestampOperand); } @Override - public List visitConstantOperand( - ConstantOperand constantOperand, ISchemaTree schemaTree) { + public List visitConstantOperand(ConstantOperand constantOperand, Context context) { return Collections.singletonList(constantOperand); } @@ -152,4 +156,24 @@ public static Expression transformViewPath( expression = new CompleteMeasurementSchemaVisitor().process(expression, schemaTree); return expression; } + + public static class Context implements QueryContextProvider { + private final ISchemaTree schemaTree; + private final MPPQueryContext queryContext; + + public Context(final ISchemaTree schemaTree, final MPPQueryContext queryContext) { + this.schemaTree = schemaTree; + Validate.notNull(queryContext, "QueryContext is null"); + this.queryContext = queryContext; + } + + public ISchemaTree getSchemaTree() { + return schemaTree; + } + + @Override + public MPPQueryContext getQueryContext() { + return queryContext; + } + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/visitor/cartesian/BindSchemaForPredicateVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/visitor/cartesian/BindSchemaForPredicateVisitor.java index f9c9d7e7d0e5..485a5ba3d4c9 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/visitor/cartesian/BindSchemaForPredicateVisitor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/visitor/cartesian/BindSchemaForPredicateVisitor.java @@ -21,6 +21,7 @@ import org.apache.iotdb.commons.path.MeasurementPath; import org.apache.iotdb.commons.path.PartialPath; +import org.apache.iotdb.db.queryengine.common.MPPQueryContext; import org.apache.iotdb.db.queryengine.common.schematree.ISchemaTree; import org.apache.iotdb.db.queryengine.plan.expression.Expression; import org.apache.iotdb.db.queryengine.plan.expression.ExpressionType; @@ -32,6 +33,8 @@ import org.apache.iotdb.db.queryengine.plan.expression.multi.FunctionExpression; import org.apache.iotdb.db.utils.constant.SqlConstant; +import org.apache.commons.lang3.Validate; + import java.util.ArrayList; import java.util.Collections; import java.util.LinkedHashMap; @@ -39,9 +42,9 @@ import java.util.stream.Collectors; import static org.apache.iotdb.db.queryengine.plan.analyze.ExpressionUtils.cartesianProduct; -import static org.apache.iotdb.db.queryengine.plan.analyze.ExpressionUtils.reconstructBinaryExpressions; -import static org.apache.iotdb.db.queryengine.plan.analyze.ExpressionUtils.reconstructFunctionExpressions; -import static org.apache.iotdb.db.queryengine.plan.analyze.ExpressionUtils.reconstructTimeSeriesOperands; +import static org.apache.iotdb.db.queryengine.plan.analyze.ExpressionUtils.reconstructBinaryExpressionsWithMemoryCheck; +import static org.apache.iotdb.db.queryengine.plan.analyze.ExpressionUtils.reconstructFunctionExpressionsWithMemoryCheck; +import static org.apache.iotdb.db.queryengine.plan.analyze.ExpressionUtils.reconstructTimeSeriesOperandsWithMemoryCheck; import static org.apache.iotdb.db.queryengine.plan.expression.visitor.cartesian.BindSchemaForExpressionVisitor.transformViewPath; import static org.apache.iotdb.db.utils.TypeInferenceUtils.bindTypeForBuiltinAggregationNonSeriesInputExpressions; import static org.apache.iotdb.db.utils.constant.SqlConstant.COUNT_TIME; @@ -61,7 +64,8 @@ public List visitBinaryExpression( resultExpressions.addAll(rightExpressions); return resultExpressions; } - return reconstructBinaryExpressions(binaryExpression, leftExpressions, rightExpressions); + return reconstructBinaryExpressionsWithMemoryCheck( + binaryExpression, leftExpressions, rightExpressions, context.getQueryContext()); } @Override @@ -86,7 +90,11 @@ public List visitFunctionExpression(FunctionExpression predicate, Co extendedExpressions.add( process( suffixExpression, - new Context(context.getPrefixPaths(), context.getSchemaTree(), false))); + new Context( + context.getPrefixPaths(), + context.getSchemaTree(), + false, + context.getQueryContext()))); // We just process first input Expression of Count_IF, // keep other input Expressions as origin and bind Type @@ -99,7 +107,8 @@ public List visitFunctionExpression(FunctionExpression predicate, Co } List> childExpressionsList = new ArrayList<>(); cartesianProduct(extendedExpressions, childExpressionsList, 0, new ArrayList<>()); - return reconstructFunctionExpressions(predicate, childExpressionsList); + return reconstructFunctionExpressionsWithMemoryCheck( + predicate, childExpressionsList, context.getQueryContext()); } @Override @@ -130,7 +139,8 @@ public List visitTimeSeriesOperand(TimeSeriesOperand predicate, Cont } } List reconstructTimeSeriesOperands = - reconstructTimeSeriesOperands(predicate, nonViewPathList); + reconstructTimeSeriesOperandsWithMemoryCheck( + predicate, nonViewPathList, context.getQueryContext()); for (MeasurementPath measurementPath : viewPathList) { Expression replacedExpression = transformViewPath(measurementPath, context.getSchemaTree()); replacedExpression.setViewPath(measurementPath); @@ -150,19 +160,27 @@ public List visitConstantOperand(ConstantOperand constantOperand, Co return Collections.singletonList(constantOperand); } - public static class Context { + public static class Context implements QueryContextProvider { private final List prefixPaths; private final ISchemaTree schemaTree; private final boolean isRoot; - public Context(List prefixPaths, ISchemaTree schemaTree, boolean isRoot) { + private final MPPQueryContext queryContext; + + public Context( + final List prefixPaths, + final ISchemaTree schemaTree, + final boolean isRoot, + final MPPQueryContext queryContext) { this.prefixPaths = prefixPaths; this.schemaTree = schemaTree; this.isRoot = isRoot; + Validate.notNull(queryContext, "QueryContext is null"); + this.queryContext = queryContext; } public Context notRootClone() { - return new Context(this.prefixPaths, this.schemaTree, false); + return new Context(this.prefixPaths, this.schemaTree, false, queryContext); } public List getPrefixPaths() { @@ -176,5 +194,10 @@ public ISchemaTree getSchemaTree() { public boolean isRoot() { return isRoot; } + + @Override + public MPPQueryContext getQueryContext() { + return queryContext; + } } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/visitor/cartesian/CartesianProductVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/visitor/cartesian/CartesianProductVisitor.java index 0f8e7decc626..78a9b2ab974a 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/visitor/cartesian/CartesianProductVisitor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/visitor/cartesian/CartesianProductVisitor.java @@ -32,34 +32,39 @@ import java.util.List; import static org.apache.iotdb.db.queryengine.plan.analyze.ExpressionUtils.cartesianProduct; -import static org.apache.iotdb.db.queryengine.plan.analyze.ExpressionUtils.reconstructBinaryExpressions; +import static org.apache.iotdb.db.queryengine.plan.analyze.ExpressionUtils.reconstructBinaryExpressionsWithMemoryCheck; import static org.apache.iotdb.db.queryengine.plan.analyze.ExpressionUtils.reconstructCaseWhenThenExpression; -import static org.apache.iotdb.db.queryengine.plan.analyze.ExpressionUtils.reconstructTernaryExpressions; -import static org.apache.iotdb.db.queryengine.plan.analyze.ExpressionUtils.reconstructUnaryExpressions; +import static org.apache.iotdb.db.queryengine.plan.analyze.ExpressionUtils.reconstructTernaryExpressionsWithMemoryCheck; +import static org.apache.iotdb.db.queryengine.plan.analyze.ExpressionUtils.reconstructUnaryExpressionsWithMemoryCheck; -public abstract class CartesianProductVisitor +public abstract class CartesianProductVisitor extends ExpressionAnalyzeVisitor, C> { @Override public List visitTernaryExpression(TernaryExpression ternaryExpression, C context) { List> childResultsList = getResultsFromChild(ternaryExpression, context); - return reconstructTernaryExpressions( + return reconstructTernaryExpressionsWithMemoryCheck( ternaryExpression, childResultsList.get(0), childResultsList.get(1), - childResultsList.get(2)); + childResultsList.get(2), + context.getQueryContext()); } @Override public List visitBinaryExpression(BinaryExpression binaryExpression, C context) { List> childResultsList = getResultsFromChild(binaryExpression, context); - return reconstructBinaryExpressions( - binaryExpression, childResultsList.get(0), childResultsList.get(1)); + return reconstructBinaryExpressionsWithMemoryCheck( + binaryExpression, + childResultsList.get(0), + childResultsList.get(1), + context.getQueryContext()); } @Override public List visitUnaryExpression(UnaryExpression unaryExpression, C context) { List> childResultsList = getResultsFromChild(unaryExpression, context); - return reconstructUnaryExpressions(unaryExpression, childResultsList.get(0)); + return reconstructUnaryExpressionsWithMemoryCheck( + unaryExpression, childResultsList.get(0), context.getQueryContext()); } @Override diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/visitor/cartesian/ConcatDeviceAndBindSchemaForExpressionVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/visitor/cartesian/ConcatDeviceAndBindSchemaForExpressionVisitor.java index 25963b4b4f72..fc0bdb6f6691 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/visitor/cartesian/ConcatDeviceAndBindSchemaForExpressionVisitor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/visitor/cartesian/ConcatDeviceAndBindSchemaForExpressionVisitor.java @@ -22,6 +22,7 @@ import org.apache.iotdb.commons.path.MeasurementPath; import org.apache.iotdb.commons.path.PartialPath; import org.apache.iotdb.db.exception.sql.SemanticException; +import org.apache.iotdb.db.queryengine.common.MPPQueryContext; import org.apache.iotdb.db.queryengine.common.schematree.ISchemaTree; import org.apache.iotdb.db.queryengine.plan.analyze.ExpressionUtils; import org.apache.iotdb.db.queryengine.plan.expression.Expression; @@ -31,6 +32,8 @@ import org.apache.iotdb.db.queryengine.plan.expression.multi.FunctionExpression; import org.apache.iotdb.db.utils.constant.SqlConstant; +import org.apache.commons.lang3.Validate; + import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -39,7 +42,7 @@ import java.util.stream.Collectors; import static org.apache.iotdb.db.queryengine.plan.analyze.ExpressionUtils.cartesianProduct; -import static org.apache.iotdb.db.queryengine.plan.analyze.ExpressionUtils.reconstructFunctionExpressions; +import static org.apache.iotdb.db.queryengine.plan.analyze.ExpressionUtils.reconstructFunctionExpressionsWithMemoryCheck; import static org.apache.iotdb.db.queryengine.plan.expression.visitor.cartesian.BindSchemaForExpressionVisitor.transformViewPath; import static org.apache.iotdb.db.utils.TypeInferenceUtils.bindTypeForBuiltinAggregationNonSeriesInputExpressions; import static org.apache.iotdb.db.utils.constant.SqlConstant.COUNT_TIME; @@ -82,7 +85,8 @@ public List visitFunctionExpression( List> childExpressionsList = new ArrayList<>(); cartesianProduct(extendedExpressions, childExpressionsList, 0, new ArrayList<>()); - return reconstructFunctionExpressions(functionExpression, childExpressionsList); + return reconstructFunctionExpressionsWithMemoryCheck( + functionExpression, childExpressionsList, context.getQueryContext()); } @Override @@ -108,7 +112,8 @@ public List visitTimeSeriesOperand( } } List reconstructTimeSeriesOperands = - ExpressionUtils.reconstructTimeSeriesOperands(timeSeriesOperand, nonViewActualPaths); + ExpressionUtils.reconstructTimeSeriesOperandsWithMemoryCheck( + timeSeriesOperand, nonViewActualPaths, context.getQueryContext()); // handle logical views for (MeasurementPath measurementPath : viewPaths) { Expression replacedExpression = transformViewPath(measurementPath, context.getSchemaTree()); @@ -134,13 +139,20 @@ public List visitConstantOperand(ConstantOperand constantOperand, Co return Collections.singletonList(constantOperand); } - public static class Context { + public static class Context implements QueryContextProvider { private final PartialPath devicePath; private final ISchemaTree schemaTree; - public Context(PartialPath devicePath, ISchemaTree schemaTree) { + private final MPPQueryContext queryContext; + + public Context( + final PartialPath devicePath, + final ISchemaTree schemaTree, + final MPPQueryContext queryContext) { this.devicePath = devicePath; this.schemaTree = schemaTree; + Validate.notNull(queryContext, "QueryContext is null"); + this.queryContext = queryContext; } public PartialPath getDevicePath() { @@ -150,5 +162,10 @@ public PartialPath getDevicePath() { public ISchemaTree getSchemaTree() { return schemaTree; } + + @Override + public MPPQueryContext getQueryContext() { + return queryContext; + } } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/visitor/cartesian/ConcatDeviceAndBindSchemaForPredicateVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/visitor/cartesian/ConcatDeviceAndBindSchemaForPredicateVisitor.java index 7284568f34f6..372c861481a8 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/visitor/cartesian/ConcatDeviceAndBindSchemaForPredicateVisitor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/visitor/cartesian/ConcatDeviceAndBindSchemaForPredicateVisitor.java @@ -22,6 +22,7 @@ import org.apache.iotdb.commons.path.MeasurementPath; import org.apache.iotdb.commons.path.PartialPath; import org.apache.iotdb.db.exception.sql.SemanticException; +import org.apache.iotdb.db.queryengine.common.MPPQueryContext; import org.apache.iotdb.db.queryengine.common.schematree.ISchemaTree; import org.apache.iotdb.db.queryengine.plan.expression.Expression; import org.apache.iotdb.db.queryengine.plan.expression.leaf.ConstantOperand; @@ -30,13 +31,15 @@ import org.apache.iotdb.db.queryengine.plan.expression.leaf.TimestampOperand; import org.apache.iotdb.db.queryengine.plan.expression.multi.FunctionExpression; +import org.apache.commons.lang3.Validate; + import java.util.ArrayList; import java.util.Collections; import java.util.List; import static org.apache.iotdb.db.queryengine.plan.analyze.ExpressionUtils.cartesianProduct; -import static org.apache.iotdb.db.queryengine.plan.analyze.ExpressionUtils.reconstructFunctionExpressions; -import static org.apache.iotdb.db.queryengine.plan.analyze.ExpressionUtils.reconstructTimeSeriesOperands; +import static org.apache.iotdb.db.queryengine.plan.analyze.ExpressionUtils.reconstructFunctionExpressionsWithMemoryCheck; +import static org.apache.iotdb.db.queryengine.plan.analyze.ExpressionUtils.reconstructTimeSeriesOperandsWithMemoryCheck; import static org.apache.iotdb.db.queryengine.plan.expression.visitor.cartesian.BindSchemaForExpressionVisitor.transformViewPath; public class ConcatDeviceAndBindSchemaForPredicateVisitor @@ -53,7 +56,8 @@ public List visitFunctionExpression(FunctionExpression predicate, Co } List> childExpressionsList = new ArrayList<>(); cartesianProduct(extendedExpressions, childExpressionsList, 0, new ArrayList<>()); - return reconstructFunctionExpressions(predicate, childExpressionsList); + return reconstructFunctionExpressionsWithMemoryCheck( + predicate, childExpressionsList, context.getQueryContext()); } @Override @@ -77,7 +81,8 @@ public List visitTimeSeriesOperand(TimeSeriesOperand predicate, Cont } List reconstructTimeSeriesOperands = - reconstructTimeSeriesOperands(predicate, nonViewPathList); + reconstructTimeSeriesOperandsWithMemoryCheck( + predicate, nonViewPathList, context.getQueryContext()); for (MeasurementPath measurementPath : viewPathList) { Expression replacedExpression = transformViewPath(measurementPath, context.getSchemaTree()); if (!(replacedExpression instanceof TimeSeriesOperand)) { @@ -102,15 +107,23 @@ public List visitConstantOperand(ConstantOperand constantOperand, Co return Collections.singletonList(constantOperand); } - public static class Context { + public static class Context implements QueryContextProvider { private final PartialPath devicePath; private final ISchemaTree schemaTree; private final boolean isWhere; - public Context(PartialPath devicePath, ISchemaTree schemaTree, boolean isWhere) { + private final MPPQueryContext queryContext; + + public Context( + final PartialPath devicePath, + final ISchemaTree schemaTree, + final boolean isWhere, + final MPPQueryContext queryContext) { this.devicePath = devicePath; this.schemaTree = schemaTree; this.isWhere = isWhere; + Validate.notNull(queryContext, "QueryContext is null"); + this.queryContext = queryContext; } public PartialPath getDevicePath() { @@ -124,5 +137,10 @@ public ISchemaTree getSchemaTree() { public boolean isWhere() { return isWhere; } + + @Override + public MPPQueryContext getQueryContext() { + return queryContext; + } } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/visitor/cartesian/ConcatExpressionWithSuffixPathsVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/visitor/cartesian/ConcatExpressionWithSuffixPathsVisitor.java index 3f53190c00d9..7c852083e2bf 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/visitor/cartesian/ConcatExpressionWithSuffixPathsVisitor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/visitor/cartesian/ConcatExpressionWithSuffixPathsVisitor.java @@ -21,6 +21,7 @@ import org.apache.iotdb.commons.path.PartialPath; import org.apache.iotdb.commons.path.PathPatternTree; +import org.apache.iotdb.db.queryengine.common.MPPQueryContext; import org.apache.iotdb.db.queryengine.plan.expression.Expression; import org.apache.iotdb.db.queryengine.plan.expression.leaf.ConstantOperand; import org.apache.iotdb.db.queryengine.plan.expression.leaf.TimeSeriesOperand; @@ -28,6 +29,7 @@ import org.apache.iotdb.db.queryengine.plan.expression.multi.FunctionExpression; import org.apache.iotdb.db.utils.constant.SqlConstant; +import org.apache.commons.lang3.Validate; import org.apache.tsfile.common.constant.TsFileConstant; import java.util.ArrayList; @@ -35,8 +37,8 @@ import java.util.List; import static org.apache.iotdb.db.queryengine.plan.analyze.ExpressionUtils.cartesianProduct; -import static org.apache.iotdb.db.queryengine.plan.analyze.ExpressionUtils.reconstructFunctionExpressions; -import static org.apache.iotdb.db.queryengine.plan.analyze.ExpressionUtils.reconstructTimeSeriesOperands; +import static org.apache.iotdb.db.queryengine.plan.analyze.ExpressionUtils.reconstructFunctionExpressionsWithMemoryCheck; +import static org.apache.iotdb.db.queryengine.plan.analyze.ExpressionUtils.reconstructTimeSeriesOperandsWithMemoryCheck; public class ConcatExpressionWithSuffixPathsVisitor extends CartesianProductVisitor { @@ -60,7 +62,8 @@ public List visitFunctionExpression( List> childExpressionsList = new ArrayList<>(); cartesianProduct(extendedExpressions, childExpressionsList, 0, new ArrayList<>()); - return reconstructFunctionExpressions(functionExpression, childExpressionsList); + return reconstructFunctionExpressionsWithMemoryCheck( + functionExpression, childExpressionsList, context.getQueryContext()); } @Override @@ -78,7 +81,8 @@ public List visitTimeSeriesOperand( actualPaths.add(concatPath); } } - return reconstructTimeSeriesOperands(timeSeriesOperand, actualPaths); + return reconstructTimeSeriesOperandsWithMemoryCheck( + timeSeriesOperand, actualPaths, context.getQueryContext()); } @Override @@ -92,13 +96,20 @@ public List visitConstantOperand(ConstantOperand constantOperand, Co return Collections.singletonList(constantOperand); } - public static class Context { + public static class Context implements QueryContextProvider { private final List prefixPaths; private final PathPatternTree patternTree; - public Context(List prefixPaths, PathPatternTree patternTree) { + private final MPPQueryContext queryContext; + + public Context( + final List prefixPaths, + final PathPatternTree patternTree, + final MPPQueryContext queryContext) { this.prefixPaths = prefixPaths; this.patternTree = patternTree; + Validate.notNull(queryContext, "QueryContext is null"); + this.queryContext = queryContext; } public List getPrefixPaths() { @@ -108,5 +119,10 @@ public List getPrefixPaths() { public PathPatternTree getPatternTree() { return patternTree; } + + @Override + public MPPQueryContext getQueryContext() { + return queryContext; + } } } diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/exception/MemoryNotEnoughExceptionTest.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/visitor/cartesian/QueryContextProvider.java similarity index 61% rename from iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/exception/MemoryNotEnoughExceptionTest.java rename to iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/visitor/cartesian/QueryContextProvider.java index dbf5046f675b..2eb0ec0b4e72 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/exception/MemoryNotEnoughExceptionTest.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/visitor/cartesian/QueryContextProvider.java @@ -17,21 +17,10 @@ * under the License. */ -package org.apache.iotdb.db.queryengine.exception; +package org.apache.iotdb.db.queryengine.plan.expression.visitor.cartesian; -import org.apache.iotdb.rpc.TSStatusCode; +import org.apache.iotdb.db.queryengine.common.MPPQueryContext; -import org.junit.Test; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; - -public class MemoryNotEnoughExceptionTest { - - @Test - public void testMemoryNotEnoughExceptionStatusCode() { - MemoryNotEnoughException e = new MemoryNotEnoughException("test"); - assertEquals(TSStatusCode.QUOTA_MEM_QUERY_NOT_ENOUGH.getStatusCode(), e.getErrorCode()); - assertTrue(e.isUserException()); - } +public interface QueryContextProvider { + MPPQueryContext getQueryContext(); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/optimization/AggregationPushDown.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/optimization/AggregationPushDown.java index cf4b34619551..30ae6faff026 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/optimization/AggregationPushDown.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/optimization/AggregationPushDown.java @@ -27,6 +27,7 @@ import org.apache.iotdb.commons.udf.builtin.BuiltinAggregationFunction; import org.apache.iotdb.db.queryengine.common.MPPQueryContext; import org.apache.iotdb.db.queryengine.common.header.ColumnHeaderConstant; +import org.apache.iotdb.db.queryengine.execution.MemoryEstimationHelper; import org.apache.iotdb.db.queryengine.plan.analyze.Analysis; import org.apache.iotdb.db.queryengine.plan.analyze.PredicateUtils; import org.apache.iotdb.db.queryengine.plan.expression.Expression; @@ -60,6 +61,7 @@ import org.apache.iotdb.db.utils.SchemaUtils; import org.apache.commons.lang3.StringUtils; +import org.apache.commons.lang3.Validate; import org.apache.tsfile.utils.Pair; import org.apache.tsfile.write.schema.IMeasurementSchema; @@ -86,8 +88,17 @@ public PlanNode optimize(PlanNode plan, Analysis analysis, MPPQueryContext conte || cannotUseStatistics(queryStatement, analysis)) { return plan; } - return plan.accept( - new Rewriter(), new RewriterContext(analysis, context, queryStatement.isAlignByDevice())); + + RewriterContext rewriterContext = + new RewriterContext(analysis, context, queryStatement.isAlignByDevice()); + PlanNode node; + try { + node = plan.accept(new Rewriter(), rewriterContext); + } finally { + // release the last batch of memory + rewriterContext.releaseMemoryForFrontEndImmediately(); + } + return node; } private boolean cannotUseStatistics(QueryStatement queryStatement, Analysis analysis) { @@ -288,12 +299,29 @@ public PlanNode visitRawDataAggregation(RawDataAggregationNode node, RewriterCon PlanNode resultNode = convergeWithTimeJoin(sourceNodeList, node.getScanOrder(), context); resultNode = planProject(resultNode, node, context); + + // After pushing down the predicate, the original scan nodes are no longer needed, we should + // release the memory that they occupied. + context.releaseMemoryForFrontEnd(getRamBytesUsedOfOldScanNodes(child)); return resultNode; } // cannot push down return node; } + private long getRamBytesUsedOfOldScanNodes(final PlanNode node) { + if (node == null) { + return 0L; + } + if (node instanceof SeriesScanSourceNode) { + SeriesScanSourceNode scanNode = (SeriesScanSourceNode) node; + return scanNode.ramBytesUsed(); + } else if (node instanceof FullOuterTimeJoinNode) { + return node.getChildren().stream().mapToLong(this::getRamBytesUsedOfOldScanNodes).sum(); + } + return 0L; + } + private void createAggregationDescriptor( FunctionExpression sourceExpression, AggregationStep curStep, @@ -407,19 +435,31 @@ private SeriesAggregationSourceNode createAggregationScanNode( GroupByTimeParameter groupByTimeParameter, RewriterContext context) { if (selectPath instanceof MeasurementPath) { // non-aligned series - return new SeriesAggregationScanNode( - context.genPlanNodeId(), - (MeasurementPath) selectPath, - aggregationDescriptorList, - scanOrder, - groupByTimeParameter); + SeriesAggregationSourceNode node = + new SeriesAggregationScanNode( + context.genPlanNodeId(), + (MeasurementPath) selectPath, + aggregationDescriptorList, + scanOrder, + groupByTimeParameter); + context + .getContext() + .reserveMemoryForFrontEnd( + MemoryEstimationHelper.getEstimatedSizeOfAccountableObject(node)); + return node; } else if (selectPath instanceof AlignedPath) { // aligned series - return new AlignedSeriesAggregationScanNode( - context.genPlanNodeId(), - (AlignedPath) selectPath, - aggregationDescriptorList, - scanOrder, - groupByTimeParameter); + SeriesAggregationSourceNode node = + new AlignedSeriesAggregationScanNode( + context.genPlanNodeId(), + (AlignedPath) selectPath, + aggregationDescriptorList, + scanOrder, + groupByTimeParameter); + context + .getContext() + .reserveMemoryForFrontEnd( + MemoryEstimationHelper.getEstimatedSizeOfAccountableObject(node)); + return node; } else { throw new IllegalArgumentException("unexpected path type"); } @@ -449,14 +489,19 @@ private PlanNode planProject(PlanNode resultNode, PlanNode rawNode, RewriterCont private static class RewriterContext { + private static final long RELEASE_BATCH_SIZE = 1024L * 1024L; + private final Analysis analysis; private final MPPQueryContext context; private final boolean isAlignByDevice; private String curDevice; + private long bytesToBeReleased = 0; + public RewriterContext(Analysis analysis, MPPQueryContext context, boolean isAlignByDevice) { this.analysis = analysis; + Validate.notNull(context, "Query context cannot be null."); this.context = context; this.isAlignByDevice = isAlignByDevice; } @@ -473,11 +518,29 @@ public void setCurDevice(String curDevice) { this.curDevice = curDevice; } + public MPPQueryContext getContext() { + return context; + } + public Set getAggregationExpressions() { if (isAlignByDevice) { return analysis.getDeviceToAggregationExpressions().get(curDevice); } return analysis.getAggregationExpressions(); } + + public void releaseMemoryForFrontEnd(final long bytes) { + bytesToBeReleased += bytes; + if (bytesToBeReleased >= RELEASE_BATCH_SIZE) { + releaseMemoryForFrontEndImmediately(); + } + } + + public void releaseMemoryForFrontEndImmediately() { + if (bytesToBeReleased > 0) { + context.releaseMemoryForFrontEnd(bytesToBeReleased); + bytesToBeReleased = 0; + } + } } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LocalExecutionPlanner.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LocalExecutionPlanner.java index 14fd9be4b05f..6a8ba3365888 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LocalExecutionPlanner.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LocalExecutionPlanner.java @@ -213,7 +213,27 @@ public synchronized long tryAllocateFreeMemoryForOperators(long memoryInBytes) { } } - public synchronized void releaseToFreeMemoryForOperators(long memoryInBytes) { + public synchronized void reserveMemoryForQueryFrontEnd( + final long memoryInBytes, final long reservedBytes, final String queryId) { + if (memoryInBytes > freeMemoryForOperators) { + throw new MemoryNotEnoughException( + String.format( + "There is not enough memory for planning-stage of Query %s, " + + "current remaining free memory is %dB, " + + "estimated memory usage is %dB, reserved memory for FE of this query in total is %dB", + queryId, freeMemoryForOperators, memoryInBytes, reservedBytes)); + } else { + freeMemoryForOperators -= memoryInBytes; + if (LOGGER.isDebugEnabled()) { + LOGGER.debug( + "[ConsumeMemory] consume: {}, current remaining memory: {}", + memoryInBytes, + freeMemoryForOperators); + } + } + } + + public synchronized void releaseToFreeMemoryForOperators(final long memoryInBytes) { freeMemoryForOperators += memoryInBytes; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanBuilder.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanBuilder.java index b1786c851129..9559b29e9763 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanBuilder.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanBuilder.java @@ -31,6 +31,7 @@ import org.apache.iotdb.db.queryengine.common.MPPQueryContext; import org.apache.iotdb.db.queryengine.common.TimeseriesSchemaInfo; import org.apache.iotdb.db.queryengine.common.header.ColumnHeaderConstant; +import org.apache.iotdb.db.queryengine.execution.MemoryEstimationHelper; import org.apache.iotdb.db.queryengine.execution.aggregation.AccumulatorFactory; import org.apache.iotdb.db.queryengine.execution.operator.AggregationUtil; import org.apache.iotdb.db.queryengine.plan.analyze.Analysis; @@ -82,6 +83,7 @@ import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.DeviceRegionScanNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.LastQueryScanNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.SeriesScanNode; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.SeriesSourceNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.ShowQueriesNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.TimeseriesRegionScanNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.AggregationDescriptor; @@ -138,6 +140,7 @@ public class LogicalPlanBuilder { public LogicalPlanBuilder(Analysis analysis, MPPQueryContext context) { this.analysis = analysis; + Validate.notNull(context, "Query context cannot be null"); this.context = context; } @@ -195,27 +198,26 @@ public LogicalPlanBuilder planRawDataSource( for (PartialPath path : groupedPaths) { if (path instanceof MeasurementPath) { // non-aligned series - SeriesScanNode seriesScanNode = - new SeriesScanNode( - context.getQueryId().genPlanNodeId(), - (MeasurementPath) path, - scanOrder, - limit, - offset, - null); - sourceNodeList.add(seriesScanNode); + sourceNodeList.add( + reserveMemoryForSeriesSourceNode( + new SeriesScanNode( + context.getQueryId().genPlanNodeId(), + (MeasurementPath) path, + scanOrder, + limit, + offset, + null))); } else if (path instanceof AlignedPath) { - // aligned series - AlignedSeriesScanNode alignedSeriesScanNode = - new AlignedSeriesScanNode( - context.getQueryId().genPlanNodeId(), - (AlignedPath) path, - scanOrder, - limit, - offset, - null, - lastLevelUseWildcard); - sourceNodeList.add(alignedSeriesScanNode); + sourceNodeList.add( + reserveMemoryForSeriesSourceNode( + new AlignedSeriesScanNode( + context.getQueryId().genPlanNodeId(), + (AlignedPath) path, + scanOrder, + limit, + offset, + null, + lastLevelUseWildcard))); } else { throw new IllegalArgumentException("Unexpected path type"); } @@ -274,14 +276,16 @@ public LogicalPlanBuilder planLast(Analysis analysis, Ordering timeseriesOrderin if (selectedPath.isUnderAlignedEntity()) { // aligned series sourceNodeList.add( - new AlignedLastQueryScanNode( - context.getQueryId().genPlanNodeId(), - new AlignedPath(selectedPath), - outputViewPath)); + reserveMemoryForSeriesSourceNode( + new AlignedLastQueryScanNode( + context.getQueryId().genPlanNodeId(), + new AlignedPath(selectedPath), + outputViewPath))); } else { // non-aligned series sourceNodeList.add( - new LastQueryScanNode( - context.getQueryId().genPlanNodeId(), selectedPath, outputViewPath)); + reserveMemoryForSeriesSourceNode( + new LastQueryScanNode( + context.getQueryId().genPlanNodeId(), selectedPath, outputViewPath))); } } } else { @@ -296,15 +300,18 @@ public LogicalPlanBuilder planLast(Analysis analysis, Ordering timeseriesOrderin alignedPath.addMeasurement(measurementPath); } sourceNodeList.add( - new AlignedLastQueryScanNode( - context.getQueryId().genPlanNodeId(), alignedPath, null)); + reserveMemoryForSeriesSourceNode( + new AlignedLastQueryScanNode( + context.getQueryId().genPlanNodeId(), alignedPath, null))); } else { // non-aligned series for (Expression sourceExpression : measurementToExpressionsOfDevice.values()) { MeasurementPath selectedPath = (MeasurementPath) ((TimeSeriesOperand) sourceExpression).getPath(); sourceNodeList.add( - new LastQueryScanNode(context.getQueryId().genPlanNodeId(), selectedPath, null)); + reserveMemoryForSeriesSourceNode( + new LastQueryScanNode( + context.getQueryId().genPlanNodeId(), selectedPath, null))); } } } @@ -1356,4 +1363,15 @@ public LogicalPlanBuilder planTimeseriesRegionScan( this.root = timeseriesRegionScanNode; return this; } + + /** + * There could be a lot of SeriesSourceNodes if there are too many series involved in one query. + * We need to check the memory used by SeriesSourceNodes.(Number of other PlanNodes are rather + * small compared to SourceNodes and could be safely ignored for now.) + */ + private PlanNode reserveMemoryForSeriesSourceNode(final SeriesSourceNode sourceNode) { + this.context.reserveMemoryForFrontEnd( + MemoryEstimationHelper.getEstimatedSizeOfAccountableObject(sourceNode)); + return sourceNode; + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/DistributionPlanContext.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/DistributionPlanContext.java index 6c49e4478b8e..c0f66e46c6cd 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/DistributionPlanContext.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/DistributionPlanContext.java @@ -22,6 +22,7 @@ import org.apache.iotdb.db.queryengine.common.MPPQueryContext; import org.apache.iotdb.db.queryengine.plan.expression.Expression; +import org.apache.commons.lang3.Validate; import org.apache.tsfile.read.filter.basic.Filter; import java.util.List; @@ -43,6 +44,7 @@ public class DistributionPlanContext { protected DistributionPlanContext(MPPQueryContext queryContext) { this.isRoot = true; + Validate.notNull(queryContext, "Query context cannot be null"); this.queryContext = queryContext; } @@ -90,4 +92,8 @@ public Filter getPartitionTimeFilter() { public boolean isOneSeriesInMultiRegion() { return oneSeriesInMultiRegion; } + + public MPPQueryContext getQueryContext() { + return queryContext; + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SourceRewriter.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SourceRewriter.java index 3ee8167c662c..cae6acfaf63b 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SourceRewriter.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SourceRewriter.java @@ -27,6 +27,7 @@ import org.apache.iotdb.commons.schema.SchemaConstant; import org.apache.iotdb.commons.utils.TimePartitionUtils; import org.apache.iotdb.db.queryengine.common.MPPQueryContext; +import org.apache.iotdb.db.queryengine.execution.MemoryEstimationHelper; import org.apache.iotdb.db.queryengine.plan.analyze.Analysis; import org.apache.iotdb.db.queryengine.plan.expression.Expression; import org.apache.iotdb.db.queryengine.plan.expression.multi.FunctionExpression; @@ -799,6 +800,10 @@ private List splitSeriesSourceNodeByPartition( SeriesSourceNode split = (SeriesSourceNode) node.clone(); split.setPlanNodeId(context.queryContext.getQueryId().genPlanNodeId()); split.setRegionReplicaSet(dataRegion); + context + .getQueryContext() + .reserveMemoryForFrontEnd( + MemoryEstimationHelper.getEstimatedSizeOfAccountableObject(split)); ret.add(split); } return ret; @@ -858,6 +863,10 @@ private List processSeriesAggregationSource( split.setAggregationDescriptorList(leafAggDescriptorList); split.setPlanNodeId(context.queryContext.getQueryId().genPlanNodeId()); split.setRegionReplicaSet(dataRegion); + context + .getQueryContext() + .reserveMemoryForFrontEnd( + MemoryEstimationHelper.getEstimatedSizeOfAccountableObject(split)); aggregationNode.addChild(split); } return Collections.singletonList(aggregationNode); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/source/AlignedLastQueryScanNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/source/AlignedLastQueryScanNode.java index 764aaf627f77..c00e7ed81ec3 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/source/AlignedLastQueryScanNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/source/AlignedLastQueryScanNode.java @@ -23,6 +23,7 @@ import org.apache.iotdb.commons.path.AlignedPath; import org.apache.iotdb.commons.path.PartialPath; import org.apache.iotdb.commons.path.PathDeserializeUtil; +import org.apache.iotdb.db.queryengine.execution.MemoryEstimationHelper; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeType; @@ -30,6 +31,7 @@ import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor; import com.google.common.collect.ImmutableList; +import org.apache.tsfile.utils.RamUsageEstimator; import org.apache.tsfile.utils.ReadWriteIOUtils; import org.eclipse.jetty.util.StringUtil; @@ -43,6 +45,9 @@ import static org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.LastQueryScanNode.LAST_QUERY_HEADER_COLUMNS; public class AlignedLastQueryScanNode extends LastSeriesSourceNode { + private static final long INSTANCE_SIZE = + RamUsageEstimator.shallowSizeOfInstance(AlignedLastQueryScanNode.class); + // The path of the target series which will be scanned. private final AlignedPath seriesPath; @@ -229,4 +234,12 @@ public String getOutputSymbolForSort() { public PartialPath getPartitionPath() { return getSeriesPath(); } + + @Override + public long ramBytesUsed() { + return INSTANCE_SIZE + + MemoryEstimationHelper.getEstimatedSizeOfAccountableObject(id) + + MemoryEstimationHelper.getEstimatedSizeOfPartialPath(seriesPath) + + RamUsageEstimator.sizeOf(outputViewPath); + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/source/AlignedSeriesAggregationScanNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/source/AlignedSeriesAggregationScanNode.java index 2b69cdb0bc49..68a284c018de 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/source/AlignedSeriesAggregationScanNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/source/AlignedSeriesAggregationScanNode.java @@ -23,6 +23,7 @@ import org.apache.iotdb.commons.path.AlignedPath; import org.apache.iotdb.commons.path.PartialPath; import org.apache.iotdb.commons.path.PathDeserializeUtil; +import org.apache.iotdb.db.queryengine.execution.MemoryEstimationHelper; import org.apache.iotdb.db.queryengine.plan.expression.Expression; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId; @@ -35,6 +36,7 @@ import org.apache.iotdb.db.queryengine.plan.statement.component.Ordering; import com.google.common.collect.ImmutableList; +import org.apache.tsfile.utils.RamUsageEstimator; import org.apache.tsfile.utils.ReadWriteIOUtils; import javax.annotation.Nullable; @@ -47,6 +49,8 @@ import java.util.Objects; public class AlignedSeriesAggregationScanNode extends SeriesAggregationSourceNode { + private static final long INSTANCE_SIZE = + RamUsageEstimator.shallowSizeOfInstance(AlignedSeriesAggregationScanNode.class); // The paths of the target series which will be aggregated. private AlignedPath alignedPath; @@ -294,4 +298,11 @@ public String toString() { this.getAggregationDescriptorList(), PlanNodeUtil.printRegionReplicaSet(getRegionReplicaSet())); } + + @Override + public long ramBytesUsed() { + return INSTANCE_SIZE + + MemoryEstimationHelper.getEstimatedSizeOfAccountableObject(id) + + MemoryEstimationHelper.getEstimatedSizeOfPartialPath(alignedPath); + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/source/AlignedSeriesScanNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/source/AlignedSeriesScanNode.java index abf9a70a09fc..327899e4bf62 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/source/AlignedSeriesScanNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/source/AlignedSeriesScanNode.java @@ -23,6 +23,7 @@ import org.apache.iotdb.commons.path.AlignedPath; import org.apache.iotdb.commons.path.PartialPath; import org.apache.iotdb.commons.path.PathDeserializeUtil; +import org.apache.iotdb.db.queryengine.execution.MemoryEstimationHelper; import org.apache.iotdb.db.queryengine.plan.analyze.TypeProvider; import org.apache.iotdb.db.queryengine.plan.expression.Expression; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode; @@ -33,6 +34,7 @@ import org.apache.iotdb.db.queryengine.plan.statement.component.Ordering; import org.apache.tsfile.common.constant.TsFileConstant; +import org.apache.tsfile.utils.RamUsageEstimator; import org.apache.tsfile.utils.ReadWriteIOUtils; import javax.annotation.Nullable; @@ -46,6 +48,9 @@ public class AlignedSeriesScanNode extends SeriesScanSourceNode { + private static final long INSTANCE_SIZE = + RamUsageEstimator.shallowSizeOfInstance(AlignedSeriesScanNode.class); + // The paths of the target series which will be scanned. private final AlignedPath alignedPath; @@ -260,4 +265,11 @@ public String toString() { public PartialPath getPartitionPath() { return getAlignedPath(); } + + @Override + public long ramBytesUsed() { + return INSTANCE_SIZE + + MemoryEstimationHelper.getEstimatedSizeOfAccountableObject(id) + + MemoryEstimationHelper.getEstimatedSizeOfPartialPath(alignedPath); + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/source/LastQueryScanNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/source/LastQueryScanNode.java index 6c9cbaa1adb1..8d4f8291a620 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/source/LastQueryScanNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/source/LastQueryScanNode.java @@ -23,6 +23,7 @@ import org.apache.iotdb.commons.path.PartialPath; import org.apache.iotdb.commons.path.PathDeserializeUtil; import org.apache.iotdb.db.queryengine.common.header.ColumnHeaderConstant; +import org.apache.iotdb.db.queryengine.execution.MemoryEstimationHelper; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeType; @@ -30,6 +31,7 @@ import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor; import com.google.common.collect.ImmutableList; +import org.apache.tsfile.utils.RamUsageEstimator; import org.apache.tsfile.utils.ReadWriteIOUtils; import org.eclipse.jetty.util.StringUtil; @@ -42,6 +44,9 @@ public class LastQueryScanNode extends LastSeriesSourceNode { + private static final long INSTANCE_SIZE = + RamUsageEstimator.shallowSizeOfInstance(LastQueryScanNode.class); + public static final List LAST_QUERY_HEADER_COLUMNS = ImmutableList.of( ColumnHeaderConstant.TIMESERIES, @@ -229,4 +234,12 @@ public String outputPathSymbol() { return outputViewPath; } } + + @Override + public long ramBytesUsed() { + return INSTANCE_SIZE + + MemoryEstimationHelper.getEstimatedSizeOfAccountableObject(id) + + MemoryEstimationHelper.getEstimatedSizeOfPartialPath(seriesPath) + + RamUsageEstimator.sizeOf(outputViewPath); + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/source/SeriesAggregationScanNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/source/SeriesAggregationScanNode.java index 8c3bcb6ec610..01e801cad221 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/source/SeriesAggregationScanNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/source/SeriesAggregationScanNode.java @@ -22,6 +22,7 @@ import org.apache.iotdb.commons.path.MeasurementPath; import org.apache.iotdb.commons.path.PartialPath; import org.apache.iotdb.commons.path.PathDeserializeUtil; +import org.apache.iotdb.db.queryengine.execution.MemoryEstimationHelper; import org.apache.iotdb.db.queryengine.plan.expression.Expression; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId; @@ -34,6 +35,7 @@ import org.apache.iotdb.db.queryengine.plan.statement.component.Ordering; import com.google.common.collect.ImmutableList; +import org.apache.tsfile.utils.RamUsageEstimator; import org.apache.tsfile.utils.ReadWriteIOUtils; import javax.annotation.Nullable; @@ -62,6 +64,9 @@ */ public class SeriesAggregationScanNode extends SeriesAggregationSourceNode { + private static final long INSTANCE_SIZE = + RamUsageEstimator.shallowSizeOfInstance(SeriesAggregationScanNode.class); + // The path of the target series which will be aggregated. private final MeasurementPath seriesPath; @@ -297,4 +302,11 @@ public String toString() { this.getAggregationDescriptorList(), PlanNodeUtil.printRegionReplicaSet(this.getRegionReplicaSet())); } + + @Override + public long ramBytesUsed() { + return INSTANCE_SIZE + + MemoryEstimationHelper.getEstimatedSizeOfAccountableObject(id) + + MemoryEstimationHelper.getEstimatedSizeOfPartialPath(seriesPath); + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/source/SeriesScanNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/source/SeriesScanNode.java index 3bd1087f6242..587d1ac248c4 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/source/SeriesScanNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/source/SeriesScanNode.java @@ -22,6 +22,7 @@ import org.apache.iotdb.commons.path.MeasurementPath; import org.apache.iotdb.commons.path.PartialPath; import org.apache.iotdb.commons.path.PathDeserializeUtil; +import org.apache.iotdb.db.queryengine.execution.MemoryEstimationHelper; import org.apache.iotdb.db.queryengine.plan.expression.Expression; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId; @@ -31,6 +32,7 @@ import org.apache.iotdb.db.queryengine.plan.statement.component.Ordering; import com.google.common.collect.ImmutableList; +import org.apache.tsfile.utils.RamUsageEstimator; import org.apache.tsfile.utils.ReadWriteIOUtils; import javax.annotation.Nullable; @@ -50,6 +52,9 @@ */ public class SeriesScanNode extends SeriesScanSourceNode { + private static final long INSTANCE_SIZE = + RamUsageEstimator.shallowSizeOfInstance(SeriesScanNode.class); + // The path of the target series which will be scanned. private final MeasurementPath seriesPath; @@ -195,4 +200,11 @@ public int hashCode() { public PartialPath getPartitionPath() { return getSeriesPath(); } + + @Override + public long ramBytesUsed() { + return INSTANCE_SIZE + + MemoryEstimationHelper.getEstimatedSizeOfAccountableObject(id) + + MemoryEstimationHelper.getEstimatedSizeOfPartialPath(seriesPath); + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/source/SeriesSourceNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/source/SeriesSourceNode.java index d5b9c64f3214..c2e052e76b7d 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/source/SeriesSourceNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/source/SeriesSourceNode.java @@ -23,7 +23,9 @@ import org.apache.iotdb.db.queryengine.plan.expression.Expression; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId; -public abstract class SeriesSourceNode extends SourceNode { +import org.apache.tsfile.utils.Accountable; + +public abstract class SeriesSourceNode extends SourceNode implements Accountable { protected SeriesSourceNode(PlanNodeId id) { super(id); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/ErrorHandlingUtils.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/ErrorHandlingUtils.java index 837e4b7ef037..e698ba8487e6 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/ErrorHandlingUtils.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/ErrorHandlingUtils.java @@ -28,6 +28,7 @@ import org.apache.iotdb.db.exception.query.QueryTimeoutRuntimeException; import org.apache.iotdb.db.exception.sql.SemanticException; import org.apache.iotdb.db.protocol.thrift.OperationType; +import org.apache.iotdb.db.queryengine.exception.MemoryNotEnoughException; import org.apache.iotdb.rpc.RpcUtils; import org.apache.iotdb.rpc.TSStatusCode; @@ -147,6 +148,8 @@ private static TSStatus tryCatchQueryException(Exception e) { ((IoTDBException) t.getCause()).getErrorCode(), rootCause.getMessage()); } return RpcUtils.getStatus(TSStatusCode.SEMANTIC_ERROR, rootCause.getMessage()); + } else if (t instanceof MemoryNotEnoughException) { + return RpcUtils.getStatus(TSStatusCode.QUOTA_MEM_QUERY_NOT_ENOUGH, rootCause.getMessage()); } if (t instanceof RuntimeException && rootCause instanceof IoTDBException) { diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/analyze/ExpressionAnalyzerTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/analyze/ExpressionAnalyzerTest.java index a4f36c14dd04..1cd47c695d3a 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/analyze/ExpressionAnalyzerTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/analyze/ExpressionAnalyzerTest.java @@ -22,6 +22,8 @@ import org.apache.iotdb.commons.exception.IllegalPathException; import org.apache.iotdb.commons.path.PartialPath; import org.apache.iotdb.commons.path.PathPatternTree; +import org.apache.iotdb.db.queryengine.common.MPPQueryContext; +import org.apache.iotdb.db.queryengine.common.QueryId; import org.apache.iotdb.db.queryengine.common.schematree.ISchemaTree; import org.junit.Test; @@ -55,7 +57,8 @@ public void testRemoveWildcardInFilter() throws IllegalPathException { and(gt(timeSeries("s1"), intValue("1")), gt(timeSeries("s2"), intValue("1"))), prefixPaths, fakeSchemaTree, - true)); + true, + new MPPQueryContext(new QueryId("test")))); assertEquals( Arrays.asList( @@ -79,6 +82,7 @@ public void testRemoveWildcardInFilter() throws IllegalPathException { count(and(gt(timeSeries("s1"), intValue("1")), gt(timeSeries("s2"), intValue("1")))), prefixPaths, fakeSchemaTree, - true)); + true, + new MPPQueryContext(new QueryId("test")))); } }