Add the retained size of operators after calling next to the estimated memory size of an FI
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LocalExecutionPlanner.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LocalExecutionPlanner.java
index f7c3f22..47aa680 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LocalExecutionPlanner.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LocalExecutionPlanner.java
@@ -168,18 +168,33 @@
    *
    * <p>N = (X / ( M * Y)) * Y = X / M
    *
-   * <p>The estimated memory size this FI would use is:
+   * <p>The estimated running memory size this FI would use is:
    *
    * <p>N * avgMemoryUsedPerDriver = N * totalSizeOfDriver / driverNum
+   *
+   * <p>Some operators still retain memory while they are not running, so the memory retained by
+   * all the Drivers in that state must be added to the running memory size.
+   *
+   * <p>The total estimated memory size this FI would use is:
+   *
+   * <p>retainedSize + N * totalSizeOfDriver / driverNum
    */
   private long calculateEstimatedMemorySize(
       final List<PipelineDriverFactory> pipelineDriverFactories) {
+    long retainedSize =
+        pipelineDriverFactories.stream()
+            .map(
+                pipelineDriverFactory ->
+                    pipelineDriverFactory.getOperation().calculateRetainedSizeAfterCallingNext())
+            .reduce(0L, Long::sum);
     long totalSizeOfDrivers =
         pipelineDriverFactories.stream()
             .map(PipelineDriverFactory::getEstimatedMemorySize)
             .reduce(0L, Long::sum);
-    return Math.max((QUERY_THREAD_COUNT / ESTIMATED_FI_NUM), 1)
-        * (totalSizeOfDrivers / pipelineDriverFactories.size());
+    long runningMemorySize =
+        Math.max((QUERY_THREAD_COUNT / ESTIMATED_FI_NUM), 1)
+            * (totalSizeOfDrivers / pipelineDriverFactories.size());
+    return retainedSize + runningMemorySize;
   }
 
   private List<PartialPath> collectSourcePaths(LocalExecutionPlanContext context) {
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/PipelineDriverFactory.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/PipelineDriverFactory.java
index 38d19ef..75eb884 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/PipelineDriverFactory.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/PipelineDriverFactory.java
@@ -19,7 +19,6 @@
 
 package org.apache.iotdb.db.queryengine.plan.planner;
 
-import org.apache.iotdb.commons.utils.TestOnly;
 import org.apache.iotdb.db.queryengine.execution.driver.DataDriver;
 import org.apache.iotdb.db.queryengine.execution.driver.DataDriverContext;
 import org.apache.iotdb.db.queryengine.execution.driver.Driver;
@@ -87,7 +86,6 @@
     this.driverContext.setDownstreamOperator(exchangeOperator);
   }
 
-  @TestOnly
   public Operator getOperation() {
     return operation;
   }
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/plan/LocalExecutionPlannerTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/plan/LocalExecutionPlannerTest.java
index 348c9b6..f3d433d 100644
--- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/plan/LocalExecutionPlannerTest.java
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/plan/LocalExecutionPlannerTest.java
@@ -82,9 +82,6 @@
           root, context.getDriverContext(), root.calculateMaxPeekMemory());
       Assert.assertEquals(
           ALIGNED_MAX_SIZE, calculateEstimatedMemorySize(context.getPipelineDriverFactories()));
-      Assert.assertEquals(
-          calculateEstimatedMemorySizeFromOperation(context.getPipelineDriverFactories()),
-          calculateEstimatedMemorySize(context.getPipelineDriverFactories()));
     } catch (Exception e) {
       Assert.fail();
     }
@@ -116,32 +113,32 @@
     // ALIGNED_MAX_SIZE * 5 / 2 is calculated by directly applying the algorithm on the Operator
     // Tree.
     Assert.assertEquals(
-        ALIGNED_MAX_SIZE * 5 / 2,
+        ALIGNED_MAX_SIZE * 5 / 2
+            + context.getPipelineDriverFactories().stream()
+                .map(
+                    pipelineDriverFactory ->
+                        pipelineDriverFactory
+                            .getOperation()
+                            .calculateRetainedSizeAfterCallingNext())
+                .reduce(0L, Long::sum),
         calculateEstimatedMemorySize(context.getPipelineDriverFactories()));
-    Assert.assertEquals(
-        calculateEstimatedMemorySizeFromOperation(context.getPipelineDriverFactories()),
-        calculateEstimatedMemorySize(context.getPipelineDriverFactories()));
-  }
-
-  private long calculateEstimatedMemorySizeFromOperation(
-      final List<PipelineDriverFactory> pipelineDriverFactories) {
-    long totalSizeOfDrivers =
-        pipelineDriverFactories.stream()
-            .map(
-                pipelineDriverFactory ->
-                    pipelineDriverFactory.getOperation().calculateMaxPeekMemory())
-            .reduce(0L, Long::sum);
-    return Math.max((QUERY_THREAD_COUNT / ESTIMATED_FI_NUM), 1)
-        * (totalSizeOfDrivers / pipelineDriverFactories.size());
   }
 
   private long calculateEstimatedMemorySize(
       final List<PipelineDriverFactory> pipelineDriverFactories) {
+    long retainedSize =
+        pipelineDriverFactories.stream()
+            .map(
+                pipelineDriverFactory ->
+                    pipelineDriverFactory.getOperation().calculateRetainedSizeAfterCallingNext())
+            .reduce(0L, Long::sum);
     long totalSizeOfDrivers =
         pipelineDriverFactories.stream()
             .map(PipelineDriverFactory::getEstimatedMemorySize)
             .reduce(0L, Long::sum);
-    return Math.max((QUERY_THREAD_COUNT / ESTIMATED_FI_NUM), 1)
-        * (totalSizeOfDrivers / pipelineDriverFactories.size());
+    long runningMemorySize =
+        Math.max((QUERY_THREAD_COUNT / ESTIMATED_FI_NUM), 1)
+            * (totalSizeOfDrivers / pipelineDriverFactories.size());
+    return retainedSize + runningMemorySize;
   }
 }