add retained size of operators after calling next
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LocalExecutionPlanner.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LocalExecutionPlanner.java
index f7c3f22..47aa680 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LocalExecutionPlanner.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LocalExecutionPlanner.java
@@ -168,18 +168,33 @@
*
* <p>N = (X / ( M * Y)) * Y = X / M
*
- * <p>The estimated memory size this FI would use is:
+ * <p>The estimated running memory size this FI would use is:
*
* <p>N * avgMemoryUsedPerDriver = N * totalSizeOfDriver / driverNum
+ *
+ * <p>Some operators still retain memory when they are not running, so we need to add the size of
+ * retained memory of all the Drivers when they are not running.
+ *
+ * <p>The total estimated memory size this FI would use is:
+ *
+ * <p>retainedSize + N * totalSizeOfDriver / driverNum
*/
private long calculateEstimatedMemorySize(
final List<PipelineDriverFactory> pipelineDriverFactories) {
+ long retainedSize =
+ pipelineDriverFactories.stream()
+ .map(
+ pipelineDriverFactory ->
+ pipelineDriverFactory.getOperation().calculateRetainedSizeAfterCallingNext())
+ .reduce(0L, Long::sum);
long totalSizeOfDrivers =
pipelineDriverFactories.stream()
.map(PipelineDriverFactory::getEstimatedMemorySize)
.reduce(0L, Long::sum);
- return Math.max((QUERY_THREAD_COUNT / ESTIMATED_FI_NUM), 1)
- * (totalSizeOfDrivers / pipelineDriverFactories.size());
+ long runningMemorySize =
+ Math.max((QUERY_THREAD_COUNT / ESTIMATED_FI_NUM), 1)
+ * (totalSizeOfDrivers / pipelineDriverFactories.size());
+ return retainedSize + runningMemorySize;
}
private List<PartialPath> collectSourcePaths(LocalExecutionPlanContext context) {
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/PipelineDriverFactory.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/PipelineDriverFactory.java
index 38d19ef..75eb884 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/PipelineDriverFactory.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/PipelineDriverFactory.java
@@ -19,7 +19,6 @@
package org.apache.iotdb.db.queryengine.plan.planner;
-import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.db.queryengine.execution.driver.DataDriver;
import org.apache.iotdb.db.queryengine.execution.driver.DataDriverContext;
import org.apache.iotdb.db.queryengine.execution.driver.Driver;
@@ -87,7 +86,6 @@
this.driverContext.setDownstreamOperator(exchangeOperator);
}
- @TestOnly
public Operator getOperation() {
return operation;
}
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/plan/LocalExecutionPlannerTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/plan/LocalExecutionPlannerTest.java
index 348c9b6..f3d433d 100644
--- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/plan/LocalExecutionPlannerTest.java
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/plan/LocalExecutionPlannerTest.java
@@ -82,9 +82,6 @@
root, context.getDriverContext(), root.calculateMaxPeekMemory());
Assert.assertEquals(
ALIGNED_MAX_SIZE, calculateEstimatedMemorySize(context.getPipelineDriverFactories()));
- Assert.assertEquals(
- calculateEstimatedMemorySizeFromOperation(context.getPipelineDriverFactories()),
- calculateEstimatedMemorySize(context.getPipelineDriverFactories()));
} catch (Exception e) {
Assert.fail();
}
@@ -116,32 +113,32 @@
// ALIGNED_MAX_SIZE * 5 / 2 is calculated by directly applying the algorithm on the Operator
// Tree.
Assert.assertEquals(
- ALIGNED_MAX_SIZE * 5 / 2,
+ ALIGNED_MAX_SIZE * 5 / 2
+ + context.getPipelineDriverFactories().stream()
+ .map(
+ pipelineDriverFactory ->
+ pipelineDriverFactory
+ .getOperation()
+ .calculateRetainedSizeAfterCallingNext())
+ .reduce(0L, Long::sum),
calculateEstimatedMemorySize(context.getPipelineDriverFactories()));
- Assert.assertEquals(
- calculateEstimatedMemorySizeFromOperation(context.getPipelineDriverFactories()),
- calculateEstimatedMemorySize(context.getPipelineDriverFactories()));
- }
-
- private long calculateEstimatedMemorySizeFromOperation(
- final List<PipelineDriverFactory> pipelineDriverFactories) {
- long totalSizeOfDrivers =
- pipelineDriverFactories.stream()
- .map(
- pipelineDriverFactory ->
- pipelineDriverFactory.getOperation().calculateMaxPeekMemory())
- .reduce(0L, Long::sum);
- return Math.max((QUERY_THREAD_COUNT / ESTIMATED_FI_NUM), 1)
- * (totalSizeOfDrivers / pipelineDriverFactories.size());
}
private long calculateEstimatedMemorySize(
final List<PipelineDriverFactory> pipelineDriverFactories) {
+ long retainedSize =
+ pipelineDriverFactories.stream()
+ .map(
+ pipelineDriverFactory ->
+ pipelineDriverFactory.getOperation().calculateRetainedSizeAfterCallingNext())
+ .reduce(0L, Long::sum);
long totalSizeOfDrivers =
pipelineDriverFactories.stream()
.map(PipelineDriverFactory::getEstimatedMemorySize)
.reduce(0L, Long::sum);
- return Math.max((QUERY_THREAD_COUNT / ESTIMATED_FI_NUM), 1)
- * (totalSizeOfDrivers / pipelineDriverFactories.size());
+ long runningMemorySize =
+ Math.max((QUERY_THREAD_COUNT / ESTIMATED_FI_NUM), 1)
+ * (totalSizeOfDrivers / pipelineDriverFactories.size());
+ return retainedSize + runningMemorySize;
}
}