Skip to content

Commit

Permalink
Introducing a memory control mechanism during the query planning stage
Browse files Browse the repository at this point in the history
  • Loading branch information
lancelly authored May 24, 2024
1 parent 911d7b6 commit 462c900
Show file tree
Hide file tree
Showing 47 changed files with 872 additions and 213 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.iotdb.db.queryengine.plan.analyze.QueryType;
import org.apache.iotdb.db.queryengine.plan.analyze.TypeProvider;
import org.apache.iotdb.db.queryengine.plan.analyze.lock.SchemaLockType;
import org.apache.iotdb.db.queryengine.plan.planner.LocalExecutionPlanner;
import org.apache.iotdb.db.queryengine.statistics.QueryPlanStatistics;

import org.apache.tsfile.read.filter.basic.Filter;
Expand Down Expand Up @@ -75,6 +76,19 @@ public class MPPQueryContext {

QueryPlanStatistics queryPlanStatistics = null;

// To avoid query front-end from consuming too much memory, it needs to reserve memory when
// constructing some Expression and PlanNode.
private long reservedBytesInTotalForFrontEnd = 0;

private long bytesToBeReservedForFrontEnd = 0;

// To avoid reserving memory too frequently, we choose to do it in batches. This is the lower
// bound
// for each batch.
private static final long MEMORY_BATCH_THRESHOLD = 1024L * 1024L;

private final LocalExecutionPlanner LOCAL_EXECUTION_PLANNER = LocalExecutionPlanner.getInstance();

public MPPQueryContext(QueryId queryId) {
this.queryId = queryId;
this.endPointBlackList = new LinkedList<>();
Expand Down Expand Up @@ -113,6 +127,7 @@ public MPPQueryContext(

public void prepareForRetry() {
this.initResultNodeContext();
this.releaseMemoryForFrontEnd();
}

private void initResultNodeContext() {
Expand Down Expand Up @@ -290,4 +305,49 @@ public void setLogicalOptimizationCost(long logicalOptimizeCost) {
}
queryPlanStatistics.setLogicalOptimizationCost(logicalOptimizeCost);
}

// region =========== FE memory related, make sure its not called concurrently ===========

/**
* This method does not require concurrency control because the query plan is generated in a
* single-threaded manner.
*/
public void reserveMemoryForFrontEnd(final long bytes) {
this.bytesToBeReservedForFrontEnd += bytes;
if (this.bytesToBeReservedForFrontEnd >= MEMORY_BATCH_THRESHOLD) {
reserveMemoryForFrontEndImmediately();
}
}

public void reserveMemoryForFrontEndImmediately() {
if (bytesToBeReservedForFrontEnd != 0) {
LOCAL_EXECUTION_PLANNER.reserveMemoryForQueryFrontEnd(
bytesToBeReservedForFrontEnd, reservedBytesInTotalForFrontEnd, queryId.getId());
this.reservedBytesInTotalForFrontEnd += bytesToBeReservedForFrontEnd;
this.bytesToBeReservedForFrontEnd = 0;
}
}

public void releaseMemoryForFrontEnd() {
if (reservedBytesInTotalForFrontEnd != 0) {
LOCAL_EXECUTION_PLANNER.releaseToFreeMemoryForOperators(reservedBytesInTotalForFrontEnd);
reservedBytesInTotalForFrontEnd = 0;
}
}

public void releaseMemoryForFrontEnd(final long bytes) {
if (bytes != 0) {
long bytesToRelease;
if (bytes <= bytesToBeReservedForFrontEnd) {
bytesToBeReservedForFrontEnd -= bytes;
} else {
bytesToRelease = bytes - bytesToBeReservedForFrontEnd;
bytesToBeReservedForFrontEnd = 0;
LOCAL_EXECUTION_PLANNER.releaseToFreeMemoryForOperators(bytesToRelease);
reservedBytesInTotalForFrontEnd -= bytesToRelease;
}
}
}

// endregion
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,9 @@

package org.apache.iotdb.db.queryengine.exception;

import org.apache.iotdb.commons.exception.IoTDBException;
import org.apache.iotdb.rpc.TSStatusCode;

public class MemoryNotEnoughException extends IoTDBException {
public class MemoryNotEnoughException extends RuntimeException {

public MemoryNotEnoughException(String message) {
super(message, TSStatusCode.QUOTA_MEM_QUERY_NOT_ENOUGH.getStatusCode(), true);
super(message);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,10 @@ public static long getEstimatedSizeOfPartialPath(@Nullable final PartialPath par
totalSize += MEASUREMENT_PATH_INSTANCE_SIZE;
MeasurementPath measurementPath = (MeasurementPath) partialPath;
totalSize += RamUsageEstimator.sizeOf(measurementPath.getMeasurementAlias());
totalSize +=
RamUsageEstimator.sizeOf(measurementPath.getMeasurementSchema().getMeasurementId());
if (measurementPath.getMeasurementSchema() != null) {
totalSize +=
RamUsageEstimator.sizeOf(measurementPath.getMeasurementSchema().getMeasurementId());
}
} else {
totalSize += PARTIAL_PATH_INSTANCE_SIZE;
totalSize += RamUsageEstimator.sizeOf(partialPath.getMeasurement());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,9 @@ private ExecutionResult execution(
}
return result;
} finally {
if (queryContext != null) {
queryContext.releaseMemoryForFrontEnd();
}
if (queryContext != null && !queryContext.getAcquiredLockNumMap().isEmpty()) {
Map<SchemaLockType, Integer> lockMap = queryContext.getAcquiredLockNumMap();
for (Map.Entry<SchemaLockType, Integer> entry : lockMap.entrySet()) {
Expand Down
Loading

0 comments on commit 462c900

Please sign in to comment.