design draft
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/Aggregator.java b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/Aggregator.java
index a1f73ef..397319f 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/Aggregator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/Aggregator.java
@@ -19,6 +19,7 @@
 
 package org.apache.iotdb.db.mpp.aggregation;
 
+import org.apache.iotdb.db.mpp.execution.operator.IWindow;
 import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.AggregationStep;
 import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.InputLocation;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
@@ -40,7 +41,7 @@
   protected List<InputLocation[]> inputLocationList;
   protected final AggregationStep step;
 
-  protected TimeRange curTimeRange = new TimeRange(0, Long.MAX_VALUE);
+  protected IWindow curWindow;
 
   // Used for SeriesAggregateScanOperator
   public Aggregator(Accumulator accumulator, AggregationStep step) {
@@ -64,7 +65,7 @@
         step.isInputRaw(),
         "Step in SeriesAggregateScanOperator and RawDataAggregateOperator can only process raw input");
     if (inputLocationList == null) {
-      return accumulator.addInput(tsBlock.getTimeAndValueColumn(0), curTimeRange);
+      return accumulator.addInput(tsBlock.getTimeAndValueColumn(0), curWindow);
     } else {
       int lastReadReadIndex = 0;
       for (InputLocation[] inputLocations : inputLocationList) {
@@ -75,7 +76,7 @@
         timeValueColumn[0] = tsBlock.getTimeColumn();
         timeValueColumn[1] = tsBlock.getColumn(inputLocations[0].getValueColumnIndex());
         lastReadReadIndex =
-            Math.max(lastReadReadIndex, accumulator.addInput(timeValueColumn, curTimeRange));
+            Math.max(lastReadReadIndex, accumulator.addInput(timeValueColumn, curWindow));
       }
       return lastReadReadIndex;
     }
@@ -128,20 +129,24 @@
   }
 
   public void reset() {
-    curTimeRange = new TimeRange(0, Long.MAX_VALUE);
+    curWindow = null;
     accumulator.reset();
   }
 
   public boolean hasFinalResult() {
+    if (!curWindow.isTimeWindow()) {
+      return false;
+    }
     return accumulator.hasFinalResult();
   }
 
   public void updateTimeRange(TimeRange curTimeRange) {
     reset();
-    this.curTimeRange = curTimeRange;
+    this.curWindow = new TimeWindow(curTimeRange);
   }
 
-  public TimeRange getCurTimeRange() {
-    return curTimeRange;
+  public void updateWindow(IWindow curWindow) {
+    reset();
+    this.curWindow = curWindow;
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/FirstValueAccumulator.java b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/FirstValueAccumulator.java
index e58d239..23ed041 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/FirstValueAccumulator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/FirstValueAccumulator.java
@@ -19,6 +19,7 @@
 
 package org.apache.iotdb.db.mpp.aggregation;
 
+import org.apache.iotdb.db.mpp.execution.operator.IWindow;
 import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
@@ -222,21 +223,25 @@
     return firstValue.getDataType();
   }
 
-  protected int addIntInput(Column[] column, TimeRange timeRange) {
-    int curPositionCount = column[0].getPositionCount();
-    long curMinTime = timeRange.getMin();
-    long curMaxTime = timeRange.getMax();
+  protected int addIntInput(Column[] column, IWindow curWindow) {
+    int windowControlColumnIndex = curWindow.getControlColumnIndex();
+    boolean isTimeWindow = curWindow.isTimeWindow();
+    int curPositionCount = column[windowControlColumnIndex].getPositionCount();
+
     for (int i = 0; i < curPositionCount; i++) {
-      long curTime = column[0].getLong(i);
-      if (curTime > curMaxTime || curTime < curMinTime) {
+      if (!curWindow.satisfy(column[windowControlColumnIndex])) {
         return i;
       }
+
+      curWindow.acceptOnePoint();
       if (!column[1].isNull(i)) {
         updateIntFirstValue(column[1].getInt(i), curTime);
-        return i;
+        if (isTimeWindow) {
+          return i;
+        }
       }
     }
-    return column[0].getPositionCount();
+    return column[windowControlColumnIndex].getPositionCount();
   }
 
   protected void updateIntFirstValue(int value, long curTime) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/LastValueAccumulator.java b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/LastValueAccumulator.java
index 10035dd..eb0acde 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/LastValueAccumulator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/LastValueAccumulator.java
@@ -19,6 +19,7 @@
 
 package org.apache.iotdb.db.mpp.aggregation;
 
+import org.apache.iotdb.db.mpp.execution.operator.IWindow;
 import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
@@ -221,7 +222,7 @@
     return lastValue.getDataType();
   }
 
-  protected int addIntInput(Column[] column, TimeRange timeRange) {
+  protected int addIntInput(Column[] column, IWindow curWindow) {
     int curPositionCount = column[0].getPositionCount();
     long curMinTime = timeRange.getMin();
     long curMaxTime = timeRange.getMax();
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/slidingwindow/SlidingWindowAggregator.java b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/slidingwindow/SlidingWindowAggregator.java
index ec848b3..8d328d9 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/slidingwindow/SlidingWindowAggregator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/slidingwindow/SlidingWindowAggregator.java
@@ -45,6 +45,8 @@
   // cached partial aggregation result of pre-aggregate windows
   protected Deque<PartialAggregationResult> deque;
 
+  protected TimeRange curTimeRange;
+
   public SlidingWindowAggregator(
       Accumulator accumulator, List<InputLocation[]> inputLocationList, AggregationStep step) {
     super(accumulator, step, inputLocationList);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/AggregationUtil.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/AggregationUtil.java
index 5bbb1ad..fb33af7 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/AggregationUtil.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/AggregationUtil.java
@@ -132,12 +132,10 @@
 
   /** Append a row of aggregation results to the result tsBlock. */
   public static void appendAggregationResult(
-      TsBlockBuilder tsBlockBuilder,
-      List<? extends Aggregator> aggregators,
-      ITimeRangeIterator timeRangeIterator) {
+      TsBlockBuilder tsBlockBuilder, List<? extends Aggregator> aggregators, long ouputTime) {
     TimeColumnBuilder timeColumnBuilder = tsBlockBuilder.getTimeColumnBuilder();
     // Use start time of current time range as time column
-    timeColumnBuilder.writeLong(timeRangeIterator.currentOutputTime());
+    timeColumnBuilder.writeLong(ouputTime);
     ColumnBuilder[] columnBuilders = tsBlockBuilder.getValueColumnBuilders();
     int columnIndex = 0;
     for (Aggregator aggregator : aggregators) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/IWindow.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/IWindow.java
new file mode 100644
index 0000000..21c4b70
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/IWindow.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.execution.operator;
+
+import org.apache.iotdb.tsfile.read.common.block.column.Column;
+
+public interface IWindow {
+
+  int getControlColumnIndex();
+
+  boolean satisfy(Column column);
+
+  boolean isTimeWindow();
+
+  void acceptOnePoint();
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/IWindowManager.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/IWindowManager.java
new file mode 100644
index 0000000..67cbb4a
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/IWindowManager.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.execution.operator;
+
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+
+public interface IWindowManager {
+
+  boolean isCurWindowReady();
+
+  void initCurWindow(TsBlock tsBlock);
+
+  boolean hasNext();
+
+  void next();
+
+  long currentOutputTime();
+
+  IWindow getCurWindow();
+
+  TsBlock skipOutOfWindowPoints(TsBlock inputTsBlock);
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/RawDataAggregationOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/RawDataAggregationOperator.java
index d959688..a0023e1 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/RawDataAggregationOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/RawDataAggregationOperator.java
@@ -21,14 +21,14 @@
 
 import org.apache.iotdb.db.mpp.aggregation.Aggregator;
 import org.apache.iotdb.db.mpp.aggregation.timerangeiterator.ITimeRangeIterator;
+import org.apache.iotdb.db.mpp.execution.operator.IWindow;
 import org.apache.iotdb.db.mpp.execution.operator.Operator;
 import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
-import org.apache.iotdb.tsfile.read.common.block.TsBlock;
-import org.apache.iotdb.tsfile.utils.Pair;
 
 import java.util.List;
 
-import static org.apache.iotdb.db.mpp.execution.operator.AggregationUtil.calculateAggregationFromRawData;
+import static org.apache.iotdb.db.mpp.execution.operator.AggregationUtil.appendAggregationResult;
+import static org.apache.iotdb.db.mpp.execution.operator.AggregationUtil.isAllAggregatorsHasFinalResult;
 
 /**
  * RawDataAggregationOperator is used to process raw data tsBlock input calculating using value
@@ -69,16 +69,54 @@
       }
     }
 
-    // update result using aggregators
-    updateResultTsBlock();
-
     return true;
   }
 
   private boolean calcFromRawData() {
-    Pair<Boolean, TsBlock> calcResult =
-        calculateAggregationFromRawData(inputTsBlock, aggregators, curTimeRange, ascending);
-    inputTsBlock = calcResult.getRight();
-    return calcResult.getLeft();
+    if (inputTsBlock == null || inputTsBlock.isEmpty()) {
+      return false;
+    }
+
+    if (!windowManager.isCurWindowReady()) {
+      windowManager.initCurWindow(inputTsBlock);
+      IWindow curWindow = windowManager.getCurWindow();
+      for (Aggregator aggregator : aggregators) {
+        aggregator.updateWindow(curWindow);
+      }
+    }
+
+    inputTsBlock = windowManager.skipOutOfWindowPoints(inputTsBlock);
+
+    int lastReadRowIndex = 0;
+    for (Aggregator aggregator : aggregators) {
+      // current agg method has been calculated
+      if (aggregator.hasFinalResult()) {
+        continue;
+      }
+
+      lastReadRowIndex = Math.max(lastReadRowIndex, aggregator.processTsBlock(inputTsBlock));
+    }
+    if (lastReadRowIndex >= inputTsBlock.getPositionCount()) {
+      inputTsBlock = null;
+      if (isAllAggregatorsHasFinalResult(aggregators)) {
+        updateResultTsBlock();
+        return true;
+      }
+      return false;
+    } else {
+      inputTsBlock = inputTsBlock.subTsBlock(lastReadRowIndex);
+      updateResultTsBlock();
+      return true;
+    }
+  }
+
+  @Override
+  protected void updateResultTsBlock() {
+    appendAggregationResult(resultTsBlockBuilder, aggregators, windowManager.currentOutputTime());
+    if (windowManager.hasNext()) {
+      windowManager.next();
+    } else {
+      finish = true;
+    }
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SingleInputAggregationOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SingleInputAggregationOperator.java
index 8632041..d9c7592 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SingleInputAggregationOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SingleInputAggregationOperator.java
@@ -21,6 +21,7 @@
 
 import org.apache.iotdb.db.mpp.aggregation.Aggregator;
 import org.apache.iotdb.db.mpp.aggregation.timerangeiterator.ITimeRangeIterator;
+import org.apache.iotdb.db.mpp.execution.operator.IWindowManager;
 import org.apache.iotdb.db.mpp.execution.operator.Operator;
 import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
@@ -50,6 +51,8 @@
   // current interval of aggregation window [curStartTime, curEndTime)
   protected TimeRange curTimeRange;
 
+  protected IWindowManager windowManager;
+
   protected final List<Aggregator> aggregators;
 
   // using for building result tsBlock
@@ -91,9 +94,14 @@
     return child.isBlocked();
   }
 
+  protected boolean finish;
+
   @Override
   public boolean hasNext() {
-    return curTimeRange != null || timeRangeIterator.hasNextTimeRange();
+    if (finish) {
+      return false;
+    }
+    return inputTsBlock != null || child.hasNext();
   }
 
   @Override
@@ -105,18 +113,17 @@
     // reset operator state
     canCallNext = true;
 
-    while (System.nanoTime() - start < maxRuntime
-        && (curTimeRange != null || timeRangeIterator.hasNextTimeRange())
-        && !resultTsBlockBuilder.isFull()) {
-      if (curTimeRange == null && timeRangeIterator.hasNextTimeRange()) {
-        // move to next time window
-        curTimeRange = timeRangeIterator.nextTimeRange();
+    while (System.nanoTime() - start < maxRuntime && hasNext() && !resultTsBlockBuilder.isFull()) {
 
-        // clear previous aggregation result
-        for (Aggregator aggregator : aggregators) {
-          aggregator.updateTimeRange(curTimeRange);
-        }
-      }
+      //      if (curTimeRange == null && timeRangeIterator.hasNextTimeRange()) {
+      //        // move to next time window
+      //        curTimeRange = timeRangeIterator.nextTimeRange();
+      //
+      //        // clear previous aggregation result
+      //        for (Aggregator aggregator : aggregators) {
+      //          aggregator.updateTimeRange(curTimeRange);
+      //        }
+      //      }
 
       // calculate aggregation result on current time window
       if (!calculateNextAggregationResult()) {
@@ -147,7 +154,8 @@
 
   protected void updateResultTsBlock() {
     curTimeRange = null;
-    appendAggregationResult(resultTsBlockBuilder, aggregators, timeRangeIterator);
+    appendAggregationResult(
+        resultTsBlockBuilder, aggregators, timeRangeIterator.currentOutputTime());
   }
 
   @Override