Pipe: Two stage aggregate framework & count-point-processor plugin (#12328)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/dataregion/PipeDataRegionProcessorConstructor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/dataregion/PipeDataRegionProcessorConstructor.java
index bc33c6d..15578f2 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/dataregion/PipeDataRegionProcessorConstructor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/dataregion/PipeDataRegionProcessorConstructor.java
@@ -29,6 +29,7 @@
 import org.apache.iotdb.db.pipe.processor.aggregate.window.processor.TumblingWindowingProcessor;
 import org.apache.iotdb.db.pipe.processor.downsampling.sdt.SwingingDoorTrendingSamplingProcessor;
 import org.apache.iotdb.db.pipe.processor.downsampling.tumbling.TumblingTimeSamplingProcessor;
+import org.apache.iotdb.db.pipe.processor.twostage.plugin.TwoStageCountProcessor;
 
 class PipeDataRegionProcessorConstructor extends PipeProcessorConstructor {
 
@@ -57,5 +58,7 @@
     pluginConstructors.put(
         BuiltinPipePlugin.TUMBLING_WINDOWING_PROCESSOR.getPipePluginName(),
         TumblingWindowingProcessor::new);
+    pluginConstructors.put(
+        BuiltinPipePlugin.COUNT_POINT_PROCESSOR.getPipePluginName(), TwoStageCountProcessor::new);
   }
 }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/receiver/PipeDataNodeReceiverAgent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/receiver/PipeDataNodeReceiverAgent.java
index 555696d..31d38d0 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/receiver/PipeDataNodeReceiverAgent.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/receiver/PipeDataNodeReceiverAgent.java
@@ -25,17 +25,12 @@
 import org.apache.iotdb.db.pipe.receiver.protocol.legacy.IoTDBLegacyPipeReceiverAgent;
 import org.apache.iotdb.db.pipe.receiver.protocol.thrift.IoTDBDataNodeReceiverAgent;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import java.io.File;
 import java.util.Arrays;
 
 /** {@link PipeDataNodeReceiverAgent} is the entry point of all pipe receivers' logic. */
 public class PipeDataNodeReceiverAgent {
 
-  private static final Logger LOGGER = LoggerFactory.getLogger(PipeDataNodeReceiverAgent.class);
-
   private final IoTDBDataNodeReceiverAgent thriftAgent;
   private final IoTDBAirGapReceiverAgent airGapAgent;
   private final IoTDBLegacyPipeReceiverAgent legacyAgent;
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
index 8c806fe..b8ace00 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
@@ -411,4 +411,13 @@
     // Set pipe meta status to STOPPED
     pipeMeta.getRuntimeMeta().getStatus().set(PipeStatus.STOPPED);
   }
+
+  ///////////////////////// Utils /////////////////////////
+
+  public Set<Integer> getPipeTaskRegionIdSet(String pipeName, long creationTime) {
+    final PipeMeta pipeMeta = pipeMetaKeeper.getPipeMeta(pipeName);
+    return pipeMeta == null || pipeMeta.getStaticMeta().getCreationTime() != creationTime
+        ? Collections.emptySet()
+        : pipeMeta.getRuntimeMeta().getConsensusGroupId2TaskMetaMap().keySet();
+  }
 }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
index c7821e4..798cb1d 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
@@ -251,6 +251,11 @@
     }
   }
 
+  public long count() {
+    final Tablet covertedTablet = convertToTablet();
+    return (long) covertedTablet.rowSize * covertedTablet.getSchemas().size();
+  }
+
   /////////////////////////// parsePatternOrTime ///////////////////////////
 
   @Override
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java
index dccfd0e..f227475 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java
@@ -225,6 +225,11 @@
     return dataContainer.convertToTablet();
   }
 
+  public long count() {
+    final Tablet covertedTablet = shouldParseTimeOrPattern() ? convertToTablet() : tablet;
+    return (long) covertedTablet.rowSize * covertedTablet.getSchemas().size();
+  }
+
   /////////////////////////// parsePatternOrTime ///////////////////////////
 
   public PipeRawTabletInsertionEvent parseEventWithPatternOrTime() {
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
index 8e3fa5b..c609eb9 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
@@ -24,6 +24,7 @@
 import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
 import org.apache.iotdb.commons.pipe.pattern.PipePattern;
 import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
+import org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
 import org.apache.iotdb.db.pipe.resource.PipeResourceManager;
 import org.apache.iotdb.db.storageengine.dataregion.memtable.TsFileProcessor;
 import org.apache.iotdb.db.storageengine.dataregion.modification.ModificationFile;
@@ -323,6 +324,30 @@
     }
   }
 
+  public long count(boolean skipReportOnCommit) throws IOException {
+    long count = 0;
+
+    if (shouldParseTime()) {
+      try {
+        for (final TabletInsertionEvent event : toTabletInsertionEvents()) {
+          final PipeRawTabletInsertionEvent rawEvent = ((PipeRawTabletInsertionEvent) event);
+          count += rawEvent.count();
+          if (skipReportOnCommit) {
+            rawEvent.skipReportOnCommit();
+          }
+        }
+        return count;
+      } finally {
+        close();
+      }
+    }
+
+    try (final TsFileInsertionPointCounter counter =
+        new TsFileInsertionPointCounter(tsFile, pipePattern)) {
+      return counter.count();
+    }
+  }
+
   /** Release the resource of {@link TsFileInsertionDataContainer}. */
   @Override
   public void close() {
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/TsFileInsertionPointCounter.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/TsFileInsertionPointCounter.java
new file mode 100644
index 0000000..086b03c
--- /dev/null
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/TsFileInsertionPointCounter.java
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.event.common.tsfile;
+
+import org.apache.iotdb.commons.pipe.pattern.PipePattern;
+
+import org.apache.tsfile.file.metadata.IDeviceID;
+import org.apache.tsfile.file.metadata.PlainDeviceID;
+import org.apache.tsfile.file.metadata.TimeseriesMetadata;
+import org.apache.tsfile.read.TsFileSequenceReader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+
+public class TsFileInsertionPointCounter implements AutoCloseable {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(TsFileInsertionPointCounter.class);
+
+  private final PipePattern pattern;
+
+  private final TsFileSequenceReader tsFileSequenceReader;
+
+  final Map<IDeviceID, Set<String>> filteredDeviceMeasurementMap;
+  final Map<IDeviceID, List<TimeseriesMetadata>> allDeviceTimeseriesMetadataMap;
+
+  private boolean shouldParsePattern = false;
+
+  private long count = 0;
+
+  public TsFileInsertionPointCounter(File tsFile, PipePattern pattern) throws IOException {
+    this.pattern = pattern;
+
+    try {
+      tsFileSequenceReader = new TsFileSequenceReader(tsFile.getPath(), true, true);
+
+      filteredDeviceMeasurementMap = filterDeviceMeasurementsMapByPattern();
+      allDeviceTimeseriesMetadataMap = tsFileSequenceReader.getAllTimeseriesMetadata(false);
+
+      if (shouldParsePattern) {
+        countMatchedTimeseriesPoints();
+      } else {
+        countAllTimeseriesPoints();
+      }
+
+      // No longer need this. Help GC.
+      tsFileSequenceReader.clearCachedDeviceMetadata();
+    } catch (Exception e) {
+      close();
+      throw e;
+    }
+  }
+
+  private Map<IDeviceID, Set<String>> filterDeviceMeasurementsMapByPattern() throws IOException {
+    final Map<IDeviceID, List<String>> originalDeviceMeasurementsMap =
+        tsFileSequenceReader.getDeviceMeasurementsMap();
+    final Map<IDeviceID, Set<String>> filteredDeviceMeasurementsMap = new HashMap<>();
+
+    for (Map.Entry<IDeviceID, List<String>> entry : originalDeviceMeasurementsMap.entrySet()) {
+      final String deviceId = ((PlainDeviceID) entry.getKey()).toStringID();
+
+      // case 1: for example, pattern is root.a.b or pattern is null and device is root.a.b.c
+      // in this case, all data can be matched without checking the measurements
+      if (Objects.isNull(pattern) || pattern.isRoot() || pattern.coversDevice(deviceId)) {
+        if (!entry.getValue().isEmpty()) {
+          filteredDeviceMeasurementsMap.put(
+              new PlainDeviceID(deviceId), new HashSet<>(entry.getValue()));
+        }
+      }
+
+      // case 2: for example, pattern is root.a.b.c and device is root.a.b
+      // in this case, we need to check the full path
+      else if (pattern.mayOverlapWithDevice(deviceId)) {
+        final Set<String> filteredMeasurements = new HashSet<>();
+
+        for (final String measurement : entry.getValue()) {
+          if (pattern.matchesMeasurement(deviceId, measurement)) {
+            filteredMeasurements.add(measurement);
+          } else {
+            // Parse pattern iff there are measurements filtered out
+            shouldParsePattern = true;
+          }
+        }
+
+        if (!filteredMeasurements.isEmpty()) {
+          filteredDeviceMeasurementsMap.put(new PlainDeviceID(deviceId), filteredMeasurements);
+        }
+      }
+
+      // case 3: for example, pattern is root.a.b.c and device is root.a.b.d
+      // in this case, no data can be matched
+      else {
+        // Parse pattern iff there are measurements filtered out
+        shouldParsePattern = true;
+      }
+    }
+
+    return filteredDeviceMeasurementsMap;
+  }
+
+  private void countMatchedTimeseriesPoints() {
+    for (final Map.Entry<IDeviceID, List<TimeseriesMetadata>> deviceTimeseriesMetadataEntry :
+        allDeviceTimeseriesMetadataMap.entrySet()) {
+      final IDeviceID deviceId = deviceTimeseriesMetadataEntry.getKey();
+      if (!filteredDeviceMeasurementMap.containsKey(deviceId)) {
+        continue;
+      }
+
+      final List<TimeseriesMetadata> allTimeseriesMetadata =
+          deviceTimeseriesMetadataEntry.getValue();
+      final Set<String> filteredMeasurements = filteredDeviceMeasurementMap.get(deviceId);
+      for (final TimeseriesMetadata timeseriesMetadata : allTimeseriesMetadata) {
+        if (!filteredMeasurements.contains(timeseriesMetadata.getMeasurementId())) {
+          continue;
+        }
+
+        count += timeseriesMetadata.getStatistics().getCount();
+      }
+    }
+  }
+
+  private void countAllTimeseriesPoints() {
+    for (final List<TimeseriesMetadata> allTimeseriesMetadata :
+        allDeviceTimeseriesMetadataMap.values()) {
+      for (final TimeseriesMetadata timeseriesMetadata : allTimeseriesMetadata) {
+        count += timeseriesMetadata.getStatistics().getCount();
+      }
+    }
+  }
+
+  public long count() {
+    return count;
+  }
+
+  @Override
+  public void close() {
+    try {
+      if (tsFileSequenceReader != null) {
+        tsFileSequenceReader.close();
+      }
+    } catch (IOException e) {
+      LOGGER.warn("Failed to close TsFileSequenceReader", e);
+    }
+  }
+}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/watermark/PipeWatermarkEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/watermark/PipeWatermarkEvent.java
new file mode 100644
index 0000000..a46d83e
--- /dev/null
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/watermark/PipeWatermarkEvent.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.event.common.watermark;
+
+import org.apache.iotdb.pipe.api.event.Event;
+
+public class PipeWatermarkEvent implements Event {
+
+  private final long watermark;
+
+  public PipeWatermarkEvent(long watermark) {
+    this.watermark = watermark;
+  }
+
+  public long getWatermark() {
+    return watermark;
+  }
+
+  @Override
+  public String toString() {
+    return "PipeWatermarkEvent{" + "watermark=" + watermark + '}';
+  }
+}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/DataRegionWatermarkInjector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/DataRegionWatermarkInjector.java
new file mode 100644
index 0000000..5d8bbf6
--- /dev/null
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/DataRegionWatermarkInjector.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.extractor.dataregion;
+
+import org.apache.iotdb.db.pipe.event.common.watermark.PipeWatermarkEvent;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class DataRegionWatermarkInjector {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(DataRegionWatermarkInjector.class);
+
+  public static final long MIN_INJECTION_INTERVAL_IN_MS = 30 * 1000; // 30s
+
+  private final int regionId;
+
+  private final long injectionIntervalInMs;
+  private long nextInjectionTime;
+
+  public DataRegionWatermarkInjector(int regionId, long injectionIntervalInMs) {
+    this.regionId = regionId;
+    this.injectionIntervalInMs =
+        Math.max(injectionIntervalInMs, MIN_INJECTION_INTERVAL_IN_MS)
+            / MIN_INJECTION_INTERVAL_IN_MS
+            * MIN_INJECTION_INTERVAL_IN_MS;
+    this.nextInjectionTime = calculateNextInjectionTime(this.injectionIntervalInMs);
+  }
+
+  public long getInjectionIntervalInMs() {
+    return injectionIntervalInMs;
+  }
+
+  public long getNextInjectionTime() {
+    return nextInjectionTime;
+  }
+
+  public PipeWatermarkEvent inject() {
+    if (System.currentTimeMillis() < nextInjectionTime) {
+      return null;
+    }
+
+    try {
+      final PipeWatermarkEvent watermarkEvent = new PipeWatermarkEvent(nextInjectionTime);
+      nextInjectionTime = calculateNextInjectionTime(injectionIntervalInMs);
+      return watermarkEvent;
+    } finally {
+      LOGGER.info(
+          "Data region {}: Injected watermark event with timestamp: {}",
+          regionId,
+          nextInjectionTime);
+    }
+  }
+
+  private static long calculateNextInjectionTime(long injectionIntervalInMs) {
+    final long currentTime = System.currentTimeMillis();
+    return currentTime / injectionIntervalInMs * injectionIntervalInMs + injectionIntervalInMs;
+  }
+}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/IoTDBDataRegionExtractor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/IoTDBDataRegionExtractor.java
index c8b4d2b..b429192 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/IoTDBDataRegionExtractor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/IoTDBDataRegionExtractor.java
@@ -66,6 +66,8 @@
 import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_REALTIME_MODE_LOG_VALUE;
 import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_REALTIME_MODE_STREAM_MODE_VALUE;
 import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_START_TIME_KEY;
+import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_WATERMARK_INTERVAL_DEFAULT_VALUE;
+import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_WATERMARK_INTERVAL_KEY;
 import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_END_TIME_KEY;
 import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_HISTORY_ENABLE_KEY;
 import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_HISTORY_END_TIME_KEY;
@@ -74,6 +76,7 @@
 import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_REALTIME_ENABLE_KEY;
 import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_REALTIME_MODE_KEY;
 import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_START_TIME_KEY;
+import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_WATERMARK_INTERVAL_KEY;
 
 public class IoTDBDataRegionExtractor extends IoTDBExtractor {
 
@@ -82,6 +85,8 @@
   private PipeHistoricalDataRegionExtractor historicalExtractor;
   private PipeRealtimeDataRegionExtractor realtimeExtractor;
 
+  private DataRegionWatermarkInjector watermarkInjector;
+
   private boolean hasNoExtractionNeed = true;
 
   @Override
@@ -255,6 +260,23 @@
     historicalExtractor.customize(parameters, configuration);
     realtimeExtractor.customize(parameters, configuration);
 
+    // Set watermark injector
+    if (parameters.hasAnyAttributes(
+        EXTRACTOR_WATERMARK_INTERVAL_KEY, SOURCE_WATERMARK_INTERVAL_KEY)) {
+      final long watermarkIntervalInMs =
+          parameters.getLongOrDefault(
+              Arrays.asList(EXTRACTOR_WATERMARK_INTERVAL_KEY, SOURCE_WATERMARK_INTERVAL_KEY),
+              EXTRACTOR_WATERMARK_INTERVAL_DEFAULT_VALUE);
+      if (watermarkIntervalInMs > 0) {
+        watermarkInjector = new DataRegionWatermarkInjector(regionId, watermarkIntervalInMs);
+        LOGGER.info(
+            "Pipe {}@{}: Set watermark injector with interval {} ms.",
+            pipeName,
+            regionId,
+            watermarkInjector.getInjectionIntervalInMs());
+      }
+    }
+
     // register metric after generating taskID
     PipeExtractorMetrics.getInstance().register(this);
   }
@@ -348,10 +370,18 @@
       return null;
     }
 
-    Event event =
-        historicalExtractor.hasConsumedAll()
-            ? realtimeExtractor.supply()
-            : historicalExtractor.supply();
+    Event event = null;
+    if (!historicalExtractor.hasConsumedAll()) {
+      event = historicalExtractor.supply();
+    } else {
+      if (Objects.nonNull(watermarkInjector)) {
+        event = watermarkInjector.inject();
+      }
+      if (Objects.isNull(event)) {
+        event = realtimeExtractor.supply();
+      }
+    }
+
     if (Objects.nonNull(event)) {
       if (event instanceof TabletInsertionEvent) {
         PipeExtractorMetrics.getInstance().markTabletEvent(taskID);
@@ -361,6 +391,7 @@
         PipeExtractorMetrics.getInstance().markPipeHeartbeatEvent(taskID);
       }
     }
+
     return event;
   }
 
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java
index 9356ccb..3d1528f 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java
@@ -21,6 +21,7 @@
 
 import org.apache.iotdb.commons.consensus.DataRegionId;
 import org.apache.iotdb.commons.consensus.index.ProgressIndex;
+import org.apache.iotdb.commons.consensus.index.impl.StateProgressIndex;
 import org.apache.iotdb.commons.consensus.index.impl.TimeWindowStateProgressIndex;
 import org.apache.iotdb.commons.exception.IllegalPathException;
 import org.apache.iotdb.commons.pipe.config.constant.SystemConstant;
@@ -426,12 +427,23 @@
   }
 
   private boolean mayTsFileContainUnprocessedData(TsFileResource resource) {
-    return startIndex instanceof TimeWindowStateProgressIndex
-        // The resource is closed thus the TsFileResource#getFileEndTime() is safe to use
-        ? ((TimeWindowStateProgressIndex) startIndex).getMinTime() <= resource.getFileEndTime()
-        // Some different tsFiles may share the same max progressIndex, thus tsFiles with an
-        // "equals" max progressIndex must be transmitted to avoid data loss
-        : !startIndex.isAfter(resource.getMaxProgressIndexAfterClose());
+    if (startIndex instanceof TimeWindowStateProgressIndex) {
+      // The resource is closed thus the TsFileResource#getFileEndTime() is safe to use
+      return ((TimeWindowStateProgressIndex) startIndex).getMinTime() <= resource.getFileEndTime();
+    }
+
+    if (startIndex instanceof StateProgressIndex) {
+      // Some different tsFiles may share the same max progressIndex, thus tsFiles with an
+      // "equals" max progressIndex must be transmitted to avoid data loss
+      final ProgressIndex innerProgressIndex =
+          ((StateProgressIndex) startIndex).getInnerProgressIndex();
+      return !innerProgressIndex.isAfter(resource.getMaxProgressIndexAfterClose())
+          && !innerProgressIndex.equals(resource.getMaxProgressIndexAfterClose());
+    }
+
+    // Some different tsFiles may share the same max progressIndex, thus tsFiles with an
+    // "equals" max progressIndex must be transmitted to avoid data loss
+    return !startIndex.isAfter(resource.getMaxProgressIndexAfterClose());
   }
 
   private boolean isTsFileResourceOverlappedWithTimeRange(TsFileResource resource) {
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/combiner/Combiner.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/combiner/Combiner.java
new file mode 100644
index 0000000..136e760
--- /dev/null
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/combiner/Combiner.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.processor.twostage.combiner;
+
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.pipe.config.PipeConfig;
+import org.apache.iotdb.db.pipe.processor.twostage.operator.Operator;
+import org.apache.iotdb.db.pipe.processor.twostage.state.State;
+import org.apache.iotdb.rpc.RpcUtils;
+import org.apache.iotdb.rpc.TSStatusCode;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+public class Combiner {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(Combiner.class);
+
+  private static final long MAX_COMBINER_LIVE_TIME_IN_MS =
+      PipeConfig.getInstance().getTwoStageAggregateMaxCombinerLiveTimeInMs();
+  private final long creationTimeInMs;
+
+  private final Operator operator;
+
+  private final ConcurrentMap<Integer, Integer> expectedRegionId2DataNodeIdMap;
+  private final Set<Integer> receivedRegionIdSet;
+
+  private final AtomicBoolean isComplete = new AtomicBoolean(false);
+
+  public Combiner(
+      Operator operator, ConcurrentMap<Integer, Integer> expectedRegionId2DataNodeIdMap) {
+    this.creationTimeInMs = System.currentTimeMillis();
+
+    this.operator = operator;
+
+    this.expectedRegionId2DataNodeIdMap = expectedRegionId2DataNodeIdMap;
+    this.receivedRegionIdSet = new HashSet<>();
+  }
+
+  public TSStatus combine(int regionId, State state) {
+    final Set<Integer> finalExpectedRegionIdSet =
+        new HashSet<>(expectedRegionId2DataNodeIdMap.keySet());
+
+    if (finalExpectedRegionIdSet.isEmpty()) {
+      return RpcUtils.getStatus(
+          TSStatusCode.PIPE_ERROR, "Expected region id set is empty. Sender should try again.");
+    }
+
+    receivedRegionIdSet.add(regionId);
+    operator.combine(state);
+
+    if (LOGGER.isDebugEnabled()) {
+      LOGGER.debug(
+          "Combiner combine: regionId: {}, state: {}, receivedRegionIdSet: {}, expectedRegionIdSet: {}",
+          regionId,
+          state,
+          receivedRegionIdSet,
+          finalExpectedRegionIdSet);
+    }
+
+    if (receivedRegionIdSet.containsAll(finalExpectedRegionIdSet)) {
+      operator.onComplete();
+      isComplete.set(true);
+
+      if (LOGGER.isInfoEnabled()) {
+        LOGGER.info(
+            "Combiner combine completed: regionId: {}, state: {}, receivedRegionIdSet: {}, expectedRegionIdSet: {}",
+            regionId,
+            state,
+            receivedRegionIdSet,
+            finalExpectedRegionIdSet);
+      }
+    }
+
+    return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
+  }
+
+  public boolean isOutdated() {
+    return System.currentTimeMillis() - creationTimeInMs > MAX_COMBINER_LIVE_TIME_IN_MS;
+  }
+
+  public boolean isComplete() {
+    return isComplete.get();
+  }
+}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/combiner/PipeCombineHandler.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/combiner/PipeCombineHandler.java
new file mode 100644
index 0000000..55ef5af
--- /dev/null
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/combiner/PipeCombineHandler.java
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.processor.twostage.combiner;
+
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.client.exception.ClientManagerException;
+import org.apache.iotdb.commons.cluster.RegionRoleType;
+import org.apache.iotdb.commons.pipe.config.PipeConfig;
+import org.apache.iotdb.confignode.rpc.thrift.TRegionInfo;
+import org.apache.iotdb.confignode.rpc.thrift.TShowRegionReq;
+import org.apache.iotdb.confignode.rpc.thrift.TShowRegionResp;
+import org.apache.iotdb.db.pipe.agent.PipeAgent;
+import org.apache.iotdb.db.pipe.processor.twostage.exchange.payload.FetchCombineResultResponse;
+import org.apache.iotdb.db.pipe.processor.twostage.operator.Operator;
+import org.apache.iotdb.db.pipe.processor.twostage.state.State;
+import org.apache.iotdb.db.protocol.client.ConfigNodeClient;
+import org.apache.iotdb.db.protocol.client.ConfigNodeClientManager;
+import org.apache.iotdb.db.protocol.client.ConfigNodeInfo;
+import org.apache.iotdb.pipe.api.exception.PipeException;
+
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Function;
+
+public class PipeCombineHandler {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(PipeCombineHandler.class);
+
+  private final String pipeName;
+  private final long creationTime;
+
+  private final Function<String, Operator> /* <combineId, operator> */ operatorConstructor;
+
+  private static final Map<Integer, Integer> ALL_REGION_ID_2_DATANODE_ID_MAP = new HashMap<>();
+  private static final AtomicLong ALL_REGION_ID_2_DATANODE_ID_MAP_LAST_UPDATE_TIME =
+      new AtomicLong(0);
+  private final ConcurrentMap<Integer, Integer> expectedRegionId2DataNodeIdMap =
+      new ConcurrentHashMap<>();
+
+  private final ConcurrentMap<String, Combiner> combineId2Combiner = new ConcurrentHashMap<>();
+
+  public PipeCombineHandler(
+      String pipeName, long creationTime, Function<String, Operator> operatorConstructor) {
+    this.pipeName = pipeName;
+    this.creationTime = creationTime;
+
+    this.operatorConstructor = operatorConstructor;
+
+    fetchAndUpdateExpectedRegionId2DataNodeIdMap();
+  }
+
+  public synchronized TSStatus combine(int regionId, String combineId, State state) {
+    return combineId2Combiner
+        .computeIfAbsent(
+            combineId,
+            id ->
+                new Combiner(operatorConstructor.apply(combineId), expectedRegionId2DataNodeIdMap))
+        .combine(regionId, state);
+  }
+
+  public synchronized FetchCombineResultResponse fetchCombineResult(List<String> combineIdList)
+      throws IOException {
+    final Map<String, FetchCombineResultResponse.CombineResultType> combineId2ResultType =
+        new HashMap<>();
+    for (String combineId : combineIdList) {
+      final Combiner combiner = combineId2Combiner.get(combineId);
+
+      if (combiner == null || combiner.isOutdated()) {
+        combineId2ResultType.put(combineId, FetchCombineResultResponse.CombineResultType.OUTDATED);
+        continue;
+      }
+
+      combineId2ResultType.put(
+          combineId,
+          combiner.isComplete()
+              ? FetchCombineResultResponse.CombineResultType.SUCCESS
+              : FetchCombineResultResponse.CombineResultType.INCOMPLETE);
+    }
+
+    return FetchCombineResultResponse.toTPipeTransferResp(combineId2ResultType);
+  }
+
+  public void fetchAndUpdateExpectedRegionId2DataNodeIdMap() {
+    updateExpectedRegionId2DataNodeIdMap(fetchExpectedRegionId2DataNodeIdMap());
+  }
+
+  private Map<Integer, Integer> fetchExpectedRegionId2DataNodeIdMap() {
+    synchronized (ALL_REGION_ID_2_DATANODE_ID_MAP) {
+      if (System.currentTimeMillis() - ALL_REGION_ID_2_DATANODE_ID_MAP_LAST_UPDATE_TIME.get()
+          > PipeConfig.getInstance().getTwoStageAggregateDataRegionInfoCacheTimeInMs()) {
+        ALL_REGION_ID_2_DATANODE_ID_MAP.clear();
+
+        try (final ConfigNodeClient configNodeClient =
+            ConfigNodeClientManager.getInstance().borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
+          final TShowRegionResp showRegionResp =
+              configNodeClient.showRegion(
+                  new TShowRegionReq().setConsensusGroupType(TConsensusGroupType.DataRegion));
+          if (showRegionResp == null || !showRegionResp.isSetRegionInfoList()) {
+            throw new PipeException("Failed to fetch data region ids");
+          }
+          for (final TRegionInfo regionInfo : showRegionResp.getRegionInfoList()) {
+            if (!RegionRoleType.Leader.getRoleType().equals(regionInfo.getRoleType())) {
+              continue;
+            }
+            ALL_REGION_ID_2_DATANODE_ID_MAP.put(
+                regionInfo.getConsensusGroupId().getId(), regionInfo.getDataNodeId());
+          }
+        } catch (ClientManagerException | TException e) {
+          throw new PipeException("Failed to fetch data nodes", e);
+        }
+
+        ALL_REGION_ID_2_DATANODE_ID_MAP_LAST_UPDATE_TIME.set(System.currentTimeMillis());
+
+        LOGGER.info(
+            "Fetched data region ids {} at {}",
+            ALL_REGION_ID_2_DATANODE_ID_MAP,
+            ALL_REGION_ID_2_DATANODE_ID_MAP_LAST_UPDATE_TIME.get());
+      }
+
+      final Set<Integer> pipeRelatedRegionIdSet =
+          new HashSet<>(PipeAgent.task().getPipeTaskRegionIdSet(pipeName, creationTime));
+      pipeRelatedRegionIdSet.removeIf(
+          regionId -> !ALL_REGION_ID_2_DATANODE_ID_MAP.containsKey(regionId));
+      if (LOGGER.isInfoEnabled()) {
+        LOGGER.info(
+            "Two stage aggregate pipe (pipeName={}, creationTime={}) related region ids {}",
+            pipeName,
+            creationTime,
+            pipeRelatedRegionIdSet);
+      }
+      return ALL_REGION_ID_2_DATANODE_ID_MAP.entrySet().stream()
+          .filter(entry -> pipeRelatedRegionIdSet.contains(entry.getKey()))
+          .collect(
+              HashMap::new,
+              (map, entry) -> map.put(entry.getKey(), entry.getValue()),
+              HashMap::putAll);
+    }
+  }
+
+  private synchronized void updateExpectedRegionId2DataNodeIdMap(
+      Map<Integer, Integer> newExpectedRegionId2DataNodeIdMap) {
+    expectedRegionId2DataNodeIdMap.clear();
+    expectedRegionId2DataNodeIdMap.putAll(newExpectedRegionId2DataNodeIdMap);
+  }
+
+  public synchronized Set<Integer> getExpectedDataNodeIdSet() {
+    return new HashSet<>(expectedRegionId2DataNodeIdMap.values());
+  }
+
+  public synchronized void cleanOutdatedCombiner() {
+    combineId2Combiner
+        .entrySet()
+        .removeIf(
+            entry -> {
+              if (!entry.getValue().isComplete()) {
+                LOGGER.info(
+                    "Clean outdated incomplete combiner: pipeName={}, creationTime={}, combineId={}",
+                    pipeName,
+                    creationTime,
+                    entry.getKey());
+              }
+              return entry.getValue().isOutdated();
+            });
+  }
+
+  public synchronized void close() {
+    combineId2Combiner.clear();
+  }
+}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/combiner/PipeCombineHandlerManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/combiner/PipeCombineHandlerManager.java
new file mode 100644
index 0000000..c08cd23
--- /dev/null
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/combiner/PipeCombineHandlerManager.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.processor.twostage.combiner;
+
+import org.apache.iotdb.commons.pipe.config.PipeConfig;
+import org.apache.iotdb.db.pipe.agent.PipeAgent;
+import org.apache.iotdb.db.pipe.processor.twostage.exchange.payload.CombineRequest;
+import org.apache.iotdb.db.pipe.processor.twostage.exchange.payload.FetchCombineResultRequest;
+import org.apache.iotdb.db.pipe.processor.twostage.exchange.payload.FetchCombineResultResponse;
+import org.apache.iotdb.db.pipe.processor.twostage.operator.Operator;
+import org.apache.iotdb.pipe.api.exception.PipeException;
+import org.apache.iotdb.service.rpc.thrift.TPipeTransferResp;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Function;
+
+public class PipeCombineHandlerManager {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(PipeCombineHandlerManager.class);
+
+  private final ConcurrentMap<String, PipeCombineHandler> pipeId2CombineHandler =
+      new ConcurrentHashMap<>();
+  private final ConcurrentMap<String, AtomicInteger> pipeId2ReferenceCount =
+      new ConcurrentHashMap<>();
+  private final ConcurrentMap<String, Object> pipeId2LastCombinedValue = new ConcurrentHashMap<>();
+
+  public synchronized void register(
+      String pipeName, long creationTime, Function<String, Operator> operatorConstructor) {
+    final String pipeId = generatePipeId(pipeName, creationTime);
+
+    pipeId2CombineHandler.putIfAbsent(
+        pipeId, new PipeCombineHandler(pipeName, creationTime, operatorConstructor));
+
+    pipeId2ReferenceCount.putIfAbsent(pipeId, new AtomicInteger(0));
+    pipeId2ReferenceCount.get(pipeId).incrementAndGet();
+  }
+
+  public synchronized void deregister(String pipeName, long creationTime) {
+    final String pipeId = generatePipeId(pipeName, creationTime);
+
+    if (pipeId2ReferenceCount.containsKey(pipeId)
+        && pipeId2ReferenceCount.get(pipeId).decrementAndGet() <= 0) {
+      pipeId2LastCombinedValue.remove(pipeId);
+
+      pipeId2ReferenceCount.remove(pipeId);
+
+      try {
+        pipeId2CombineHandler.remove(pipeId).close();
+      } catch (Exception e) {
+        LOGGER.warn("Error occurred when closing CombineHandler(id = {})", pipeId, e);
+      }
+    }
+  }
+
+  public Object getLastCombinedValue(String pipeName, long creationTime) {
+    return pipeId2LastCombinedValue.get(generatePipeId(pipeName, creationTime));
+  }
+
+  public void updateLastCombinedValue(
+      String pipeName, long creationTime, Object lastCombinedValue) {
+    pipeId2LastCombinedValue.put(generatePipeId(pipeName, creationTime), lastCombinedValue);
+  }
+
+  public synchronized Set<Integer> getExpectedDataNodeIdSet(String pipeName, long creationTime) {
+    final PipeCombineHandler handler =
+        pipeId2CombineHandler.get(generatePipeId(pipeName, creationTime));
+    return Objects.isNull(handler) ? Collections.emptySet() : handler.getExpectedDataNodeIdSet();
+  }
+
+  public TPipeTransferResp handle(CombineRequest combineRequest) {
+    final String pipeId =
+        generatePipeId(combineRequest.getPipeName(), combineRequest.getCreationTime());
+
+    final PipeCombineHandler handler = pipeId2CombineHandler.get(pipeId);
+    if (Objects.isNull(handler)) {
+      throw new PipeException("CombineHandler not found for pipeId = " + pipeId);
+    }
+
+    return new TPipeTransferResp()
+        .setStatus(
+            handler.combine(
+                combineRequest.getRegionId(),
+                combineRequest.getCombineId(),
+                combineRequest.getState()));
+  }
+
+  public FetchCombineResultResponse handle(FetchCombineResultRequest fetchCombineResultRequest)
+      throws IOException {
+    final String pipeId =
+        generatePipeId(
+            fetchCombineResultRequest.getPipeName(), fetchCombineResultRequest.getCreationTime());
+
+    final PipeCombineHandler handler = pipeId2CombineHandler.get(pipeId);
+    if (Objects.isNull(handler)) {
+      throw new PipeException("CombineHandler not found for pipeId = " + pipeId);
+    }
+
+    return handler.fetchCombineResult(fetchCombineResultRequest.getCombineIdList());
+  }
+
+  public void fetchExpectedRegionIdSetAndCleanOutdatedCombiner() {
+    final Map<String, PipeCombineHandler> pipeId2CombineHandlerSnapshot;
+    synchronized (this) {
+      pipeId2CombineHandlerSnapshot = new HashMap<>(pipeId2CombineHandler);
+    }
+
+    pipeId2CombineHandlerSnapshot.forEach(
+        (pipeId, handler) -> {
+          handler.fetchAndUpdateExpectedRegionId2DataNodeIdMap();
+          handler.cleanOutdatedCombiner();
+        });
+  }
+
+  private static String generatePipeId(String pipeName, long creationTime) {
+    return pipeName + "-" + creationTime;
+  }
+
+  /////////////////////////////// Singleton ///////////////////////////////
+
+  private PipeCombineHandlerManager() {
+    PipeAgent.runtime()
+        .registerPeriodicalJob(
+            "CombineHandlerManager#fetchExpectedRegionIdSetAndCleanOutdatedCombiner",
+            this::fetchExpectedRegionIdSetAndCleanOutdatedCombiner,
+            PipeConfig.getInstance().getTwoStageAggregateDataRegionInfoCacheTimeInMs() / 1000 / 2);
+  }
+
+  private static class CombineHandlerManagerHolder {
+    private static final PipeCombineHandlerManager INSTANCE = new PipeCombineHandlerManager();
+  }
+
+  public static PipeCombineHandlerManager getInstance() {
+    return CombineHandlerManagerHolder.INSTANCE;
+  }
+}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/exchange/payload/CombineRequest.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/exchange/payload/CombineRequest.java
new file mode 100644
index 0000000..cb1ba0b
--- /dev/null
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/exchange/payload/CombineRequest.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.processor.twostage.exchange.payload;
+
+import org.apache.iotdb.commons.pipe.connector.payload.thrift.request.IoTDBConnectorRequestVersion;
+import org.apache.iotdb.db.pipe.processor.twostage.state.State;
+import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
+
+import org.apache.tsfile.utils.PublicBAOS;
+import org.apache.tsfile.utils.ReadWriteIOUtils;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+public class CombineRequest extends TPipeTransferReq {
+
+  private String pipeName;
+  private long creationTime;
+  private int regionId;
+  private String combineId;
+
+  private State state;
+
+  private CombineRequest() {
+    // Empty constructor
+  }
+
+  public String getPipeName() {
+    return pipeName;
+  }
+
+  public long getCreationTime() {
+    return creationTime;
+  }
+
+  public int getRegionId() {
+    return regionId;
+  }
+
+  public String getCombineId() {
+    return combineId;
+  }
+
+  public State getState() {
+    return state;
+  }
+
+  public static CombineRequest toTPipeTransferReq(
+      String pipeName, long creationTime, int regionId, String combineId, State state)
+      throws IOException {
+    return new CombineRequest()
+        .convertToTPipeTransferReq(pipeName, creationTime, regionId, combineId, state);
+  }
+
+  public static CombineRequest fromTPipeTransferReq(TPipeTransferReq transferReq) throws Exception {
+    return new CombineRequest().translateFromTPipeTransferReq(transferReq);
+  }
+
+  private CombineRequest convertToTPipeTransferReq(
+      String pipeName, long creationTime, int regionId, String combineId, State state)
+      throws IOException {
+    this.pipeName = pipeName;
+    this.creationTime = creationTime;
+    this.regionId = regionId;
+    this.state = state;
+    this.combineId = combineId;
+
+    this.version = IoTDBConnectorRequestVersion.VERSION_2.getVersion();
+    this.type = RequestType.COMBINE.getType();
+    try (final PublicBAOS byteArrayOutputStream = new PublicBAOS();
+        final DataOutputStream outputStream = new DataOutputStream(byteArrayOutputStream)) {
+      ReadWriteIOUtils.write(pipeName, outputStream);
+      ReadWriteIOUtils.write(creationTime, outputStream);
+      ReadWriteIOUtils.write(regionId, outputStream);
+      ReadWriteIOUtils.write(combineId, outputStream);
+
+      ReadWriteIOUtils.write(state.getClass().getName(), outputStream);
+      state.serialize(outputStream);
+
+      this.body = ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0, byteArrayOutputStream.size());
+    }
+
+    return this;
+  }
+
+  private CombineRequest translateFromTPipeTransferReq(TPipeTransferReq transferReq)
+      throws Exception {
+    pipeName = ReadWriteIOUtils.readString(transferReq.body);
+    creationTime = ReadWriteIOUtils.readLong(transferReq.body);
+    regionId = ReadWriteIOUtils.readInt(transferReq.body);
+    combineId = ReadWriteIOUtils.readString(transferReq.body);
+
+    final String stateClassName = ReadWriteIOUtils.readString(transferReq.body);
+    state = (State) Class.forName(stateClassName).newInstance();
+    state.deserialize(transferReq.body);
+
+    version = transferReq.version;
+    type = transferReq.type;
+    body = transferReq.body;
+
+    return this;
+  }
+
+  @Override
+  public String toString() {
+    return "CombineRequest{"
+        + "pipeName='"
+        + pipeName
+        + '\''
+        + ", creationTime="
+        + creationTime
+        + ", regionId="
+        + regionId
+        + ", combineId='"
+        + combineId
+        + '\''
+        + ", state="
+        + state
+        + '}';
+  }
+}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/exchange/payload/FetchCombineResultRequest.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/exchange/payload/FetchCombineResultRequest.java
new file mode 100644
index 0000000..b20904a
--- /dev/null
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/exchange/payload/FetchCombineResultRequest.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.processor.twostage.exchange.payload;
+
+import org.apache.iotdb.commons.pipe.connector.payload.thrift.request.IoTDBConnectorRequestVersion;
+import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
+
+import org.apache.tsfile.utils.PublicBAOS;
+import org.apache.tsfile.utils.ReadWriteIOUtils;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+public class FetchCombineResultRequest extends TPipeTransferReq {
+
+  private String pipeName;
+  private long creationTime;
+  private List<String> combineIdList;
+
+  private FetchCombineResultRequest() {
+    // Empty constructor
+  }
+
+  public String getPipeName() {
+    return pipeName;
+  }
+
+  public long getCreationTime() {
+    return creationTime;
+  }
+
+  public List<String> getCombineIdList() {
+    return combineIdList;
+  }
+
+  public static FetchCombineResultRequest toTPipeTransferReq(
+      String pipeName, long creationTime, List<String> combineIdList) throws IOException {
+    return new FetchCombineResultRequest()
+        .convertToTPipeTransferReq(pipeName, creationTime, combineIdList);
+  }
+
+  public static FetchCombineResultRequest fromTPipeTransferReq(TPipeTransferReq transferReq)
+      throws Exception {
+    return new FetchCombineResultRequest().translateFromTPipeTransferReq(transferReq);
+  }
+
+  private FetchCombineResultRequest convertToTPipeTransferReq(
+      String pipeName, long creationTime, List<String> combineIdList) throws IOException {
+    this.pipeName = pipeName;
+    this.creationTime = creationTime;
+    this.combineIdList = combineIdList;
+
+    this.version = IoTDBConnectorRequestVersion.VERSION_2.getVersion();
+    this.type = RequestType.FETCH_COMBINE_RESULT.getType();
+    try (final PublicBAOS byteArrayOutputStream = new PublicBAOS();
+        final DataOutputStream outputStream = new DataOutputStream(byteArrayOutputStream)) {
+      ReadWriteIOUtils.write(pipeName, outputStream);
+      ReadWriteIOUtils.write(creationTime, outputStream);
+
+      ReadWriteIOUtils.write(combineIdList.size(), outputStream);
+      for (String combineId : combineIdList) {
+        ReadWriteIOUtils.write(combineId, outputStream);
+      }
+
+      this.body = ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0, byteArrayOutputStream.size());
+    }
+
+    return this;
+  }
+
+  private FetchCombineResultRequest translateFromTPipeTransferReq(TPipeTransferReq transferReq) {
+    pipeName = ReadWriteIOUtils.readString(transferReq.body);
+    creationTime = ReadWriteIOUtils.readLong(transferReq.body);
+    combineIdList = new ArrayList<>();
+    final int combineIdListSize = ReadWriteIOUtils.readInt(transferReq.body);
+    for (int i = 0; i < combineIdListSize; i++) {
+      combineIdList.add(ReadWriteIOUtils.readString(transferReq.body));
+    }
+
+    version = transferReq.version;
+    type = transferReq.type;
+    body = transferReq.body;
+
+    return this;
+  }
+
+  @Override
+  public String toString() {
+    return "FetchCombineResultRequest{"
+        + "pipeName='"
+        + pipeName
+        + '\''
+        + ", creationTime="
+        + creationTime
+        + ", combineIdList="
+        + combineIdList
+        + '}';
+  }
+}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/exchange/payload/FetchCombineResultResponse.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/exchange/payload/FetchCombineResultResponse.java
new file mode 100644
index 0000000..2884eb5
--- /dev/null
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/exchange/payload/FetchCombineResultResponse.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.processor.twostage.exchange.payload;
+
+import org.apache.iotdb.rpc.RpcUtils;
+import org.apache.iotdb.service.rpc.thrift.TPipeTransferResp;
+
+import org.apache.tsfile.utils.PublicBAOS;
+import org.apache.tsfile.utils.ReadWriteIOUtils;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.Map;
+
+public class FetchCombineResultResponse extends TPipeTransferResp {
+
+  public enum CombineResultType {
+    SUCCESS,
+    INCOMPLETE,
+    OUTDATED,
+  }
+
+  private Map<String, CombineResultType> combineId2ResultType = new HashMap<>();
+
+  private FetchCombineResultResponse() {
+    // Empty constructor
+  }
+
+  public Map<String, CombineResultType> getCombineId2ResultType() {
+    return combineId2ResultType;
+  }
+
+  public static FetchCombineResultResponse toTPipeTransferResp(
+      Map<String, CombineResultType> combineId2ResultType) throws IOException {
+    final FetchCombineResultResponse response = new FetchCombineResultResponse();
+
+    response.combineId2ResultType = combineId2ResultType;
+
+    try (PublicBAOS byteArrayOutputStream = new PublicBAOS();
+        DataOutputStream outputStream = new DataOutputStream(byteArrayOutputStream)) {
+      ReadWriteIOUtils.write(combineId2ResultType.size(), outputStream);
+      for (Map.Entry<String, CombineResultType> entry : combineId2ResultType.entrySet()) {
+        ReadWriteIOUtils.write(entry.getKey(), outputStream);
+        ReadWriteIOUtils.write(entry.getValue().ordinal(), outputStream);
+      }
+
+      response.body =
+          ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0, byteArrayOutputStream.size());
+    }
+
+    response.status = RpcUtils.SUCCESS_STATUS;
+
+    return response;
+  }
+
+  public static FetchCombineResultResponse fromTPipeTransferResp(TPipeTransferResp transferResp) {
+    final FetchCombineResultResponse response = new FetchCombineResultResponse();
+
+    response.status = transferResp.status;
+    response.body = transferResp.body;
+
+    response.combineId2ResultType = new HashMap<>();
+    if (response.isSetBody()) {
+      final int size = ReadWriteIOUtils.readInt(transferResp.body);
+      for (int i = 0; i < size; i++) {
+        final String combineId = ReadWriteIOUtils.readString(transferResp.body);
+        final CombineResultType resultType =
+            CombineResultType.values()[ReadWriteIOUtils.readInt(transferResp.body)];
+        response.combineId2ResultType.put(combineId, resultType);
+      }
+    }
+
+    return response;
+  }
+
+  @Override
+  public String toString() {
+    return "FetchCombineResultResponse{" + "combineId2ResultType=" + combineId2ResultType + '}';
+  }
+}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/exchange/payload/RequestType.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/exchange/payload/RequestType.java
new file mode 100644
index 0000000..99f2b16
--- /dev/null
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/exchange/payload/RequestType.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.processor.twostage.exchange.payload;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+public enum RequestType {
+  COMBINE((short) 0),
+  FETCH_COMBINE_RESULT((short) 1),
+  ;
+
+  private final short type;
+
+  RequestType(short type) {
+    this.type = type;
+  }
+
+  public short getType() {
+    return type;
+  }
+
+  private static final Map<Short, RequestType> TYPE_MAP =
+      Arrays.stream(RequestType.values())
+          .collect(
+              HashMap::new,
+              (typeMap, pipeRequestType) -> typeMap.put(pipeRequestType.getType(), pipeRequestType),
+              HashMap::putAll);
+
+  public static boolean isValidatedRequestType(short type) {
+    return TYPE_MAP.containsKey(type);
+  }
+
+  public static RequestType valueOf(short type) {
+    return TYPE_MAP.get(type);
+  }
+}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/exchange/receiver/TwoStageAggregateReceiver.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/exchange/receiver/TwoStageAggregateReceiver.java
new file mode 100644
index 0000000..1e029d7
--- /dev/null
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/exchange/receiver/TwoStageAggregateReceiver.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.processor.twostage.exchange.receiver;
+
+import org.apache.iotdb.commons.pipe.connector.payload.thrift.request.IoTDBConnectorRequestVersion;
+import org.apache.iotdb.commons.pipe.receiver.IoTDBReceiver;
+import org.apache.iotdb.db.pipe.processor.twostage.combiner.PipeCombineHandlerManager;
+import org.apache.iotdb.db.pipe.processor.twostage.exchange.payload.CombineRequest;
+import org.apache.iotdb.db.pipe.processor.twostage.exchange.payload.FetchCombineResultRequest;
+import org.apache.iotdb.db.pipe.processor.twostage.exchange.payload.RequestType;
+import org.apache.iotdb.rpc.RpcUtils;
+import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
+import org.apache.iotdb.service.rpc.thrift.TPipeTransferResp;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class TwoStageAggregateReceiver implements IoTDBReceiver {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(TwoStageAggregateReceiver.class);
+
+  @Override
+  public IoTDBConnectorRequestVersion getVersion() {
+    return IoTDBConnectorRequestVersion.VERSION_2;
+  }
+
+  @Override
+  public TPipeTransferResp receive(TPipeTransferReq req) {
+    try {
+      final short rawRequestType = req.getType();
+      if (RequestType.isValidatedRequestType(rawRequestType)) {
+        switch (RequestType.valueOf(rawRequestType)) {
+          case COMBINE:
+            return PipeCombineHandlerManager.getInstance()
+                .handle(CombineRequest.fromTPipeTransferReq(req));
+          case FETCH_COMBINE_RESULT:
+            return PipeCombineHandlerManager.getInstance()
+                .handle(FetchCombineResultRequest.fromTPipeTransferReq(req));
+          default:
+            break;
+        }
+      }
+
+      LOGGER.warn("Unknown request type {}: {}.", rawRequestType, req);
+      return new TPipeTransferResp(
+          RpcUtils.getStatus(
+              TSStatusCode.PIPE_TYPE_ERROR,
+              String.format("Unknown request type %s.", rawRequestType)));
+    } catch (Exception e) {
+      LOGGER.warn("Error occurs when receiving request: {}.", req, e);
+      return new TPipeTransferResp(
+          RpcUtils.getStatus(
+              TSStatusCode.PIPE_ERROR,
+              String.format("Error occurs when receiving request: %s.", e.getMessage())));
+    }
+  }
+
+  @Override
+  public void handleExit() {
+    if (LOGGER.isDebugEnabled()) {
+      LOGGER.debug("Two stage aggregate receiver is exiting.");
+    }
+  }
+}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/exchange/sender/TwoStageAggregateSender.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/exchange/sender/TwoStageAggregateSender.java
new file mode 100644
index 0000000..90c6a2f
--- /dev/null
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/exchange/sender/TwoStageAggregateSender.java
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.processor.twostage.exchange.sender;
+
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.commons.client.exception.ClientManagerException;
+import org.apache.iotdb.commons.client.property.ThriftClientProperty;
+import org.apache.iotdb.commons.pipe.config.PipeConfig;
+import org.apache.iotdb.commons.pipe.connector.client.IoTDBSyncClient;
+import org.apache.iotdb.confignode.rpc.thrift.TDataNodeInfo;
+import org.apache.iotdb.confignode.rpc.thrift.TShowDataNodesResp;
+import org.apache.iotdb.db.pipe.processor.twostage.combiner.PipeCombineHandlerManager;
+import org.apache.iotdb.db.protocol.client.ConfigNodeClient;
+import org.apache.iotdb.db.protocol.client.ConfigNodeClientManager;
+import org.apache.iotdb.db.protocol.client.ConfigNodeInfo;
+import org.apache.iotdb.pipe.api.exception.PipeException;
+import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
+import org.apache.iotdb.service.rpc.thrift.TPipeTransferResp;
+
+import org.apache.thrift.TException;
+import org.apache.thrift.transport.TTransportException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+
+public class TwoStageAggregateSender implements AutoCloseable {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(TwoStageAggregateSender.class);
+
+  private static final PipeConfig PIPE_CONFIG = PipeConfig.getInstance();
+
+  private final String pipeName;
+  private final long creationTime;
+
+  private static final AtomicLong DATANODE_ID_2_END_POINTS_LAST_UPDATE_TIME = new AtomicLong(0);
+  private static final AtomicReference<Map<Integer, TEndPoint>> DATANODE_ID_2_END_POINTS =
+      new AtomicReference<>();
+
+  private TEndPoint[] endPoints;
+  private final Map<TEndPoint, IoTDBSyncClient> endPointIoTDBSyncClientMap =
+      new ConcurrentHashMap<>();
+
+  public TwoStageAggregateSender(String pipeName, long creationTime) {
+    this.pipeName = pipeName;
+    this.creationTime = creationTime;
+  }
+
+  public synchronized TPipeTransferResp request(long watermark, TPipeTransferReq req)
+      throws TException {
+    final boolean endPointsChanged = tryFetchEndPointsIfNecessary();
+    tryConstructClients(endPointsChanged);
+
+    final TEndPoint endPoint = endPoints[(int) watermark % endPoints.length];
+    IoTDBSyncClient client = endPointIoTDBSyncClientMap.get(endPoint);
+    if (client == null) {
+      client = reconstructIoTDBSyncClient(endPoint);
+    }
+
+    LOGGER.info("Sending request {} (watermark = {}) to {}", req, watermark, endPoint);
+
+    try {
+      return client.pipeTransfer(req);
+    } catch (Exception e) {
+      LOGGER.warn("Failed to send request {} (watermark = {}) to {}", req, watermark, endPoint, e);
+      try {
+        reconstructIoTDBSyncClient(endPoint);
+      } catch (Exception ex) {
+        LOGGER.warn(
+            "Failed to reconstruct IoTDBSyncClient {} after failure to send request {} (watermark = {})",
+            endPoint,
+            req,
+            watermark,
+            ex);
+      }
+      throw e;
+    }
+  }
+
+  private static boolean tryFetchEndPointsIfNecessary() {
+    final long currentTime = System.currentTimeMillis();
+
+    if (DATANODE_ID_2_END_POINTS.get() != null
+        && currentTime - DATANODE_ID_2_END_POINTS_LAST_UPDATE_TIME.get()
+            < PIPE_CONFIG.getTwoStageAggregateSenderEndPointsCacheInMs()) {
+      return false;
+    }
+
+    synchronized (DATANODE_ID_2_END_POINTS) {
+      if (DATANODE_ID_2_END_POINTS.get() != null
+          && currentTime - DATANODE_ID_2_END_POINTS_LAST_UPDATE_TIME.get()
+              < PIPE_CONFIG.getTwoStageAggregateSenderEndPointsCacheInMs()) {
+        return false;
+      }
+
+      final Map<Integer, TEndPoint> dataNodeId2EndPointMap = new HashMap<>();
+      try (final ConfigNodeClient configNodeClient =
+          ConfigNodeClientManager.getInstance().borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
+        final TShowDataNodesResp showDataNodesResp = configNodeClient.showDataNodes();
+        if (showDataNodesResp == null || showDataNodesResp.getDataNodesInfoList() == null) {
+          throw new PipeException("Failed to fetch data nodes");
+        }
+        for (final TDataNodeInfo dataNodeInfo : showDataNodesResp.getDataNodesInfoList()) {
+          dataNodeId2EndPointMap.put(
+              dataNodeInfo.getDataNodeId(),
+              new TEndPoint(dataNodeInfo.getRpcAddresss(), dataNodeInfo.getRpcPort()));
+        }
+      } catch (ClientManagerException | TException e) {
+        throw new PipeException("Failed to fetch data nodes", e);
+      }
+
+      if (dataNodeId2EndPointMap.isEmpty()) {
+        throw new PipeException("No data nodes' endpoints fetched");
+      }
+
+      DATANODE_ID_2_END_POINTS.set(dataNodeId2EndPointMap);
+      DATANODE_ID_2_END_POINTS_LAST_UPDATE_TIME.set(currentTime);
+    }
+
+    LOGGER.info("Data nodes' endpoints for two-stage aggregation: {}", DATANODE_ID_2_END_POINTS);
+    return true;
+  }
+
+  private void tryConstructClients(boolean endPointsChanged) {
+    if (Objects.nonNull(endPoints) && !endPointsChanged) {
+      return;
+    }
+
+    final Set<Integer> expectedDataNodeIdSet =
+        PipeCombineHandlerManager.getInstance().getExpectedDataNodeIdSet(pipeName, creationTime);
+    if (expectedDataNodeIdSet.isEmpty()) {
+      throw new PipeException("No expected region id set fetched");
+    }
+
+    endPoints =
+        DATANODE_ID_2_END_POINTS.get().entrySet().stream()
+            .filter(entry -> expectedDataNodeIdSet.contains(entry.getKey()))
+            .map(Map.Entry::getValue)
+            .toArray(TEndPoint[]::new);
+    LOGGER.info(
+        "End points for two-stage aggregation pipe (pipeName={}, creationTime={}) were updated to {}",
+        pipeName,
+        creationTime,
+        endPoints);
+
+    for (final TEndPoint endPoint : endPoints) {
+      if (endPointIoTDBSyncClientMap.containsKey(endPoint)) {
+        continue;
+      }
+
+      try {
+        endPointIoTDBSyncClientMap.put(endPoint, constructIoTDBSyncClient(endPoint));
+      } catch (TTransportException e) {
+        LOGGER.warn("Failed to construct IoTDBSyncClient", e);
+      }
+    }
+
+    for (final TEndPoint endPoint : new HashSet<>(endPointIoTDBSyncClientMap.keySet())) {
+      if (!DATANODE_ID_2_END_POINTS.get().containsValue(endPoint)) {
+        try {
+          endPointIoTDBSyncClientMap.remove(endPoint).close();
+        } catch (Exception e) {
+          LOGGER.warn("Failed to close IoTDBSyncClient", e);
+        }
+      }
+    }
+  }
+
+  private IoTDBSyncClient reconstructIoTDBSyncClient(TEndPoint endPoint)
+      throws TTransportException {
+    final IoTDBSyncClient oldClient = endPointIoTDBSyncClientMap.remove(endPoint);
+    if (oldClient != null) {
+      try {
+        oldClient.close();
+      } catch (Exception e) {
+        LOGGER.warn("Failed to close old IoTDBSyncClient", e);
+      }
+    }
+    final IoTDBSyncClient newClient = constructIoTDBSyncClient(endPoint);
+    endPointIoTDBSyncClientMap.put(endPoint, newClient);
+    return newClient;
+  }
+
+  private IoTDBSyncClient constructIoTDBSyncClient(TEndPoint endPoint) throws TTransportException {
+    return new IoTDBSyncClient(
+        new ThriftClientProperty.Builder()
+            .setConnectionTimeoutMs((int) PIPE_CONFIG.getPipeConnectorHandshakeTimeoutMs())
+            .setRpcThriftCompressionEnabled(
+                PIPE_CONFIG.isPipeConnectorRPCThriftCompressionEnabled())
+            .build(),
+        endPoint.getIp(),
+        endPoint.getPort(),
+        false,
+        null,
+        null);
+  }
+
+  @Override
+  public void close() {
+    for (final IoTDBSyncClient client : endPointIoTDBSyncClientMap.values()) {
+      try {
+        client.close();
+      } catch (Exception e) {
+        LOGGER.warn("Failed to close IoTDBSyncClient", e);
+      }
+    }
+
+    endPointIoTDBSyncClientMap.clear();
+    endPoints = null;
+  }
+}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/operator/CountOperator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/operator/CountOperator.java
new file mode 100644
index 0000000..a4eae9d
--- /dev/null
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/operator/CountOperator.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.processor.twostage.operator;
+
+import org.apache.iotdb.db.pipe.processor.twostage.state.CountState;
+import org.apache.iotdb.db.pipe.processor.twostage.state.State;
+
+import org.apache.tsfile.utils.Pair;
+
+import java.util.Queue;
+
+public class CountOperator implements Operator {
+
+  private final long onCompletionTimestamp;
+  private long globalCount;
+
+  private final Queue<Pair<Long, Long>> globalCountQueue;
+
+  public CountOperator(String combineId, Queue<Pair<Long, Long>> globalCountQueue) {
+    onCompletionTimestamp = Long.parseLong(combineId);
+    globalCount = 0;
+
+    this.globalCountQueue = globalCountQueue;
+  }
+
+  @Override
+  public void combine(State state) {
+    globalCount += ((CountState) state).getCount();
+  }
+
+  @Override
+  public void onComplete() {
+    globalCountQueue.add(new Pair<>(onCompletionTimestamp, globalCount));
+  }
+}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/operator/Operator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/operator/Operator.java
new file mode 100644
index 0000000..885c565
--- /dev/null
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/operator/Operator.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.processor.twostage.operator;
+
+import org.apache.iotdb.db.pipe.processor.twostage.state.State;
+
+public interface Operator {
+
+  void combine(State state);
+
+  void onComplete();
+}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/plugin/TwoStageCountProcessor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/plugin/TwoStageCountProcessor.java
new file mode 100644
index 0000000..cbc14d9
--- /dev/null
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/plugin/TwoStageCountProcessor.java
@@ -0,0 +1,388 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.processor.twostage.plugin;
+
+import org.apache.iotdb.commons.consensus.index.ProgressIndex;
+import org.apache.iotdb.commons.consensus.index.impl.MinimumProgressIndex;
+import org.apache.iotdb.commons.consensus.index.impl.StateProgressIndex;
+import org.apache.iotdb.commons.exception.IllegalPathException;
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.commons.pipe.config.constant.PipeProcessorConstant;
+import org.apache.iotdb.commons.pipe.config.plugin.env.PipeTaskProcessorRuntimeEnvironment;
+import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
+import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
+import org.apache.iotdb.commons.utils.PathUtils;
+import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent;
+import org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertionEvent;
+import org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
+import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
+import org.apache.iotdb.db.pipe.event.common.watermark.PipeWatermarkEvent;
+import org.apache.iotdb.db.pipe.processor.twostage.combiner.PipeCombineHandlerManager;
+import org.apache.iotdb.db.pipe.processor.twostage.exchange.payload.CombineRequest;
+import org.apache.iotdb.db.pipe.processor.twostage.exchange.payload.FetchCombineResultRequest;
+import org.apache.iotdb.db.pipe.processor.twostage.exchange.payload.FetchCombineResultResponse;
+import org.apache.iotdb.db.pipe.processor.twostage.exchange.sender.TwoStageAggregateSender;
+import org.apache.iotdb.db.pipe.processor.twostage.operator.CountOperator;
+import org.apache.iotdb.db.pipe.processor.twostage.state.CountState;
+import org.apache.iotdb.pipe.api.PipeProcessor;
+import org.apache.iotdb.pipe.api.collector.EventCollector;
+import org.apache.iotdb.pipe.api.customizer.configuration.PipeProcessorRuntimeConfiguration;
+import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator;
+import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
+import org.apache.iotdb.pipe.api.event.Event;
+import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
+import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
+import org.apache.iotdb.pipe.api.exception.PipeException;
+import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.service.rpc.thrift.TPipeTransferResp;
+
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.utils.Binary;
+import org.apache.tsfile.utils.Pair;
+import org.apache.tsfile.write.record.Tablet;
+import org.apache.tsfile.write.schema.MeasurementSchema;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+
+public class TwoStageCountProcessor implements PipeProcessor {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(TwoStageCountProcessor.class);
+
+  private String pipeName;
+  private long creationTime;
+  private int regionId;
+  private PipeTaskMeta pipeTaskMeta;
+
+  private PartialPath outputSeries;
+
+  private static final String LOCAL_COUNT_STATE_KEY = "count";
+  private final AtomicLong localCount = new AtomicLong(0);
+  private final AtomicReference<ProgressIndex> localCommitProgressIndex =
+      new AtomicReference<>(MinimumProgressIndex.INSTANCE);
+
+  private final Queue<Pair<long[], ProgressIndex> /* ([timestamp, local count], progress index) */>
+      localRequestQueue = new ConcurrentLinkedQueue<>();
+  private final Queue<Pair<long[], ProgressIndex> /* ([timestamp, local count], progress index) */>
+      localCommitQueue = new ConcurrentLinkedQueue<>();
+
+  private TwoStageAggregateSender twoStageAggregateSender;
+  private final Queue<Pair<Long, Long> /* (timestamp, global count) */> globalCountQueue =
+      new ConcurrentLinkedQueue<>();
+
+  @Override
+  public void validate(PipeParameterValidator validator) throws Exception {
+    validator.validateRequiredAttribute(PipeProcessorConstant.PROCESSOR_OUTPUT_SERIES_KEY);
+
+    final String rawOutputSeries =
+        validator.getParameters().getString(PipeProcessorConstant.PROCESSOR_OUTPUT_SERIES_KEY);
+    try {
+      PathUtils.isLegalPath(rawOutputSeries);
+    } catch (IllegalPathException e) {
+      throw new IllegalArgumentException("Illegal output series path: " + rawOutputSeries);
+    }
+  }
+
+  @Override
+  public void customize(PipeParameters parameters, PipeProcessorRuntimeConfiguration configuration)
+      throws Exception {
+    final PipeTaskProcessorRuntimeEnvironment runtimeEnvironment =
+        (PipeTaskProcessorRuntimeEnvironment) configuration.getRuntimeEnvironment();
+    pipeName = runtimeEnvironment.getPipeName();
+    creationTime = runtimeEnvironment.getCreationTime();
+    regionId = runtimeEnvironment.getRegionId();
+    pipeTaskMeta = runtimeEnvironment.getPipeTaskMeta();
+
+    outputSeries =
+        new PartialPath(parameters.getString(PipeProcessorConstant.PROCESSOR_OUTPUT_SERIES_KEY));
+
+    if (Objects.nonNull(pipeTaskMeta) && Objects.nonNull(pipeTaskMeta.getProgressIndex())) {
+      if (pipeTaskMeta.getProgressIndex() instanceof MinimumProgressIndex) {
+        pipeTaskMeta.updateProgressIndex(
+            new StateProgressIndex(Long.MIN_VALUE, new HashMap<>(), MinimumProgressIndex.INSTANCE));
+      }
+
+      final StateProgressIndex stateProgressIndex =
+          (StateProgressIndex) pipeTaskMeta.getProgressIndex();
+      localCommitProgressIndex.set(stateProgressIndex.getInnerProgressIndex());
+      final Binary localCountState = stateProgressIndex.getState().get(LOCAL_COUNT_STATE_KEY);
+      localCount.set(
+          Objects.isNull(localCountState) ? 0 : Long.parseLong(localCountState.toString()));
+    }
+    LOGGER.info(
+        "TwoStageCountProcessor customized by thread {}: pipeName={}, creationTime={}, regionId={}, outputSeries={}, "
+            + "localCommitProgressIndex={}, localCount={}",
+        Thread.currentThread().getName(),
+        pipeName,
+        creationTime,
+        regionId,
+        outputSeries,
+        localCommitProgressIndex.get(),
+        localCount.get());
+
+    PipeCombineHandlerManager.getInstance()
+        .register(
+            pipeName, creationTime, (combineId) -> new CountOperator(combineId, globalCountQueue));
+    twoStageAggregateSender = new TwoStageAggregateSender(pipeName, creationTime);
+  }
+
+  @Override
+  public void process(TabletInsertionEvent tabletInsertionEvent, EventCollector eventCollector)
+      throws Exception {
+    if (!(tabletInsertionEvent instanceof PipeInsertNodeTabletInsertionEvent)
+        && !(tabletInsertionEvent instanceof PipeRawTabletInsertionEvent)) {
+      LOGGER.warn(
+          "Ignored TabletInsertionEvent is not an instance of PipeInsertNodeTabletInsertionEvent or PipeRawTabletInsertionEvent: {}",
+          tabletInsertionEvent);
+      return;
+    }
+
+    final EnrichedEvent event = (EnrichedEvent) tabletInsertionEvent;
+    event.skipReportOnCommit();
+
+    final long count =
+        (event instanceof PipeInsertNodeTabletInsertionEvent)
+            ? ((PipeInsertNodeTabletInsertionEvent) event).count()
+            : ((PipeRawTabletInsertionEvent) event).count();
+    localCount.accumulateAndGet(count, Long::sum);
+
+    localCommitProgressIndex.set(
+        localCommitProgressIndex
+            .get()
+            .updateToMinimumEqualOrIsAfterProgressIndex(event.getProgressIndex()));
+  }
+
+  @Override
+  public void process(TsFileInsertionEvent tsFileInsertionEvent, EventCollector eventCollector)
+      throws Exception {
+    if (!(tsFileInsertionEvent instanceof PipeTsFileInsertionEvent)) {
+      LOGGER.warn(
+          "Ignored TsFileInsertionEvent is not an instance of PipeTsFileInsertionEvent: {}",
+          tsFileInsertionEvent);
+      return;
+    }
+
+    final PipeTsFileInsertionEvent event = (PipeTsFileInsertionEvent) tsFileInsertionEvent;
+    event.skipReportOnCommit();
+
+    if (!event.waitForTsFileClose()) {
+      LOGGER.warn("Ignored TsFileInsertionEvent is empty: {}", event);
+      return;
+    }
+
+    final long count = event.count(true);
+    localCount.accumulateAndGet(count, Long::sum);
+
+    localCommitProgressIndex.set(
+        localCommitProgressIndex
+            .get()
+            .updateToMinimumEqualOrIsAfterProgressIndex(event.getProgressIndex()));
+  }
+
+  @Override
+  public void process(Event event, EventCollector eventCollector) throws Exception {
+    if (event instanceof PipeHeartbeatEvent) {
+      collectGlobalCountIfNecessary(eventCollector);
+      commitLocalProgressIndexIfNecessary();
+      triggerCombineIfNecessary();
+    }
+
+    if (event instanceof PipeWatermarkEvent) {
+      triggerCombine(
+          new Pair<>(
+              new long[] {((PipeWatermarkEvent) event).getWatermark(), localCount.get()},
+              localCommitProgressIndex.get()));
+    }
+  }
+
+  private void collectGlobalCountIfNecessary(EventCollector eventCollector) throws IOException {
+    while (!globalCountQueue.isEmpty()) {
+      final Object lastCombinedValue =
+          PipeCombineHandlerManager.getInstance().getLastCombinedValue(pipeName, creationTime);
+      final Pair<Long, Long> lastCollectedTimestampCountPair =
+          Objects.isNull(lastCombinedValue)
+              ? new Pair<>(Long.MIN_VALUE, 0L)
+              : (Pair<Long, Long>) lastCombinedValue;
+
+      final Pair<Long, Long> timestampCountPair = globalCountQueue.poll();
+      if (timestampCountPair.right < lastCollectedTimestampCountPair.right) {
+        timestampCountPair.right = lastCollectedTimestampCountPair.right;
+        LOGGER.warn(
+            "Global count is less than the last collected count: timestamp={}, count={}",
+            timestampCountPair.left,
+            timestampCountPair.right);
+      }
+
+      final Tablet tablet =
+          new Tablet(
+              outputSeries.getDevice(),
+              Collections.singletonList(
+                  new MeasurementSchema(outputSeries.getMeasurement(), TSDataType.INT64)),
+              1);
+      tablet.rowSize = 1;
+      tablet.addTimestamp(0, timestampCountPair.left);
+      tablet.addValue(outputSeries.getMeasurement(), 0, timestampCountPair.right);
+
+      eventCollector.collect(
+          new PipeRawTabletInsertionEvent(tablet, false, null, null, null, false));
+
+      PipeCombineHandlerManager.getInstance()
+          .updateLastCombinedValue(pipeName, creationTime, timestampCountPair);
+    }
+  }
+
+  private void commitLocalProgressIndexIfNecessary() {
+    final int currentQueueSize = localCommitQueue.size();
+    for (int i = 0; i < currentQueueSize; i++) {
+      final Pair<long[], ProgressIndex> pair = localCommitQueue.poll();
+      if (Objects.isNull(pair)) {
+        break;
+      }
+
+      try {
+        // TODO: optimize the combine result fetching with batch fetching
+        final FetchCombineResultResponse fetchCombineResultResponse =
+            FetchCombineResultResponse.fromTPipeTransferResp(
+                twoStageAggregateSender.request(
+                    pair.left[0],
+                    FetchCombineResultRequest.toTPipeTransferReq(
+                        pipeName,
+                        creationTime,
+                        Collections.singletonList(Long.toString(pair.left[0])))));
+
+        if (fetchCombineResultResponse.getStatus().getCode()
+            != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+          throw new PipeException(
+              "Failed to fetch combine result: "
+                  + fetchCombineResultResponse.getStatus().getMessage());
+        }
+
+        for (final Map.Entry<String, FetchCombineResultResponse.CombineResultType> entry :
+            fetchCombineResultResponse.getCombineId2ResultType().entrySet()) {
+          final String combineId = entry.getKey();
+          final FetchCombineResultResponse.CombineResultType resultType = entry.getValue();
+
+          switch (resultType) {
+            case OUTDATED:
+              LOGGER.warn(
+                  "Two stage combine (region id = {}, combine id = {}) outdated: timestamp={}, count={}, progressIndex={}",
+                  regionId,
+                  combineId,
+                  pair.left[0],
+                  pair.left[1],
+                  pair.right);
+              continue;
+            case INCOMPLETE:
+              LOGGER.info(
+                  "Two stage combine (region id = {}, combine id = {}) incomplete: timestamp={}, count={}, progressIndex={}",
+                  regionId,
+                  combineId,
+                  pair.left[0],
+                  pair.left[1],
+                  pair.right);
+              localCommitQueue.add(pair);
+              continue;
+            case SUCCESS:
+              final Map<String, Binary> state = new HashMap<>();
+              state.put(LOCAL_COUNT_STATE_KEY, new Binary(Long.toString(pair.left[1]).getBytes()));
+              pipeTaskMeta.updateProgressIndex(
+                  new StateProgressIndex(pair.left[0], state, pair.right));
+              LOGGER.info(
+                  "Two stage combine (region id = {}, combine id = {}) success: timestamp={}, count={}, progressIndex={}, committed progressIndex={}",
+                  regionId,
+                  combineId,
+                  pair.left[0],
+                  pair.left[1],
+                  pair.right,
+                  pipeTaskMeta.getProgressIndex());
+              continue;
+            default:
+              throw new PipeException("Unknown combine result type: " + resultType);
+          }
+        }
+      } catch (Exception e) {
+        localCommitQueue.add(pair);
+        LOGGER.warn(
+            "Failure occurred when trying to commit progress index. timestamp={}, count={}, progressIndex={}",
+            pair.left[0],
+            pair.left[1],
+            pair.right,
+            e);
+        return;
+      }
+    }
+  }
+
+  private void triggerCombineIfNecessary() {
+    while (!localRequestQueue.isEmpty()) {
+      if (!triggerCombine(localRequestQueue.poll())) {
+        return;
+      }
+    }
+  }
+
+  private boolean triggerCombine(Pair<long[], ProgressIndex> pair) {
+    final long watermark = pair.getLeft()[0];
+    final long count = pair.getLeft()[1];
+    final ProgressIndex progressIndex = pair.getRight();
+    try {
+      final TPipeTransferResp resp =
+          twoStageAggregateSender.request(
+              watermark,
+              CombineRequest.toTPipeTransferReq(
+                  pipeName,
+                  creationTime,
+                  regionId,
+                  Long.toString(watermark),
+                  new CountState(count)));
+      if (resp.getStatus().getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+        throw new PipeException("Failed to combine count: " + resp.getStatus().getMessage());
+      }
+      localCommitQueue.add(pair);
+      return true;
+    } catch (Exception e) {
+      localRequestQueue.add(pair);
+      LOGGER.warn(
+          "Failed to trigger combine. watermark={}, count={}, progressIndex={}",
+          watermark,
+          count,
+          progressIndex,
+          e);
+      return false;
+    }
+  }
+
+  @Override
+  public void close() throws Exception {
+    if (Objects.nonNull(twoStageAggregateSender)) {
+      twoStageAggregateSender.close();
+    }
+    PipeCombineHandlerManager.getInstance().deregister(pipeName, creationTime);
+  }
+}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/state/CountState.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/state/CountState.java
new file mode 100644
index 0000000..0370131
--- /dev/null
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/state/CountState.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.processor.twostage.state;
+
+import org.apache.tsfile.utils.ReadWriteIOUtils;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+
+public class CountState implements State {
+
+  private long count;
+
+  public CountState() {
+    // For reflection
+  }
+
+  public CountState(long count) {
+    this.count = count;
+  }
+
+  public long getCount() {
+    return count;
+  }
+
+  @Override
+  public void serialize(OutputStream outputStream) throws IOException {
+    ReadWriteIOUtils.write(count, outputStream);
+  }
+
+  @Override
+  public void deserialize(ByteBuffer byteBuffer) {
+    count = ReadWriteIOUtils.readLong(byteBuffer);
+  }
+}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/state/State.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/state/State.java
new file mode 100644
index 0000000..36d393e
--- /dev/null
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/state/State.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.processor.twostage.state;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+
+public interface State {
+
+  void serialize(OutputStream outputStream) throws IOException;
+
+  void deserialize(ByteBuffer byteBuffer);
+}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
index ea8a1ef..d8b4c12 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
@@ -201,8 +201,9 @@
           receiverId.get(),
           status);
       return new TPipeTransferResp(status);
-    } catch (final IOException e) {
-      final String error = String.format("Serialization error during pipe receiving, %s", e);
+    } catch (Exception e) {
+      final String error =
+          String.format("Exception %s encountered while handling request %s.", e.getMessage(), req);
       LOGGER.warn("Receiver id = {}: {}", receiverId.get(), error, e);
       return new TPipeTransferResp(RpcUtils.getStatus(TSStatusCode.PIPE_ERROR, error));
     }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiverAgent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiverAgent.java
index 932123a..e0016ec 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiverAgent.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiverAgent.java
@@ -22,6 +22,7 @@
 import org.apache.iotdb.commons.pipe.connector.payload.thrift.request.IoTDBConnectorRequestVersion;
 import org.apache.iotdb.commons.pipe.receiver.IoTDBReceiver;
 import org.apache.iotdb.commons.pipe.receiver.IoTDBReceiverAgent;
+import org.apache.iotdb.db.pipe.processor.twostage.exchange.receiver.TwoStageAggregateReceiver;
 
 public class IoTDBDataNodeReceiverAgent extends IoTDBReceiverAgent {
 
@@ -31,6 +32,8 @@
   protected void initConstructors() {
     RECEIVER_CONSTRUCTORS.put(
         IoTDBConnectorRequestVersion.VERSION_1.getVersion(), IoTDBDataNodeReceiver::new);
+    RECEIVER_CONSTRUCTORS.put(
+        IoTDBConnectorRequestVersion.VERSION_2.getVersion(), TwoStageAggregateReceiver::new);
   }
 
   @Override
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
index 5057ea0..4e4e76e 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
@@ -229,6 +229,10 @@
   private long pipeListeningQueueTransferSnapshotThreshold = 1000;
   private int pipeSnapshotExecutionMaxBatchSize = 1000;
 
+  private long twoStageAggregateMaxCombinerLiveTimeInMs = 8 * 60 * 1000; // 8 minutes
+  private long twoStageAggregateDataRegionInfoCacheTimeInMs = 3 * 60 * 1000; // 3 minutes
+  private long twoStageAggregateSenderEndPointsCacheInMs = 3 * 60 * 1000; // 3 minutes
+
   private int subscriptionSubtaskExecutorMaxThreadNum =
       Math.min(5, Math.max(1, Runtime.getRuntime().availableProcessors() / 2));
   private int subscriptionMaxTabletsPerPrefetching = 16;
@@ -970,6 +974,34 @@
     this.pipeSnapshotExecutionMaxBatchSize = pipeSnapshotExecutionMaxBatchSize;
   }
 
+  public long getTwoStageAggregateMaxCombinerLiveTimeInMs() {
+    return twoStageAggregateMaxCombinerLiveTimeInMs;
+  }
+
+  public void setTwoStageAggregateMaxCombinerLiveTimeInMs(
+      long twoStageAggregateMaxCombinerLiveTimeInMs) {
+    this.twoStageAggregateMaxCombinerLiveTimeInMs = twoStageAggregateMaxCombinerLiveTimeInMs;
+  }
+
+  public long getTwoStageAggregateDataRegionInfoCacheTimeInMs() {
+    return twoStageAggregateDataRegionInfoCacheTimeInMs;
+  }
+
+  public void setTwoStageAggregateDataRegionInfoCacheTimeInMs(
+      long twoStageAggregateDataRegionInfoCacheTimeInMs) {
+    this.twoStageAggregateDataRegionInfoCacheTimeInMs =
+        twoStageAggregateDataRegionInfoCacheTimeInMs;
+  }
+
+  public long getTwoStageAggregateSenderEndPointsCacheInMs() {
+    return twoStageAggregateSenderEndPointsCacheInMs;
+  }
+
+  public void setTwoStageAggregateSenderEndPointsCacheInMs(
+      long twoStageAggregateSenderEndPointsCacheInMs) {
+    this.twoStageAggregateSenderEndPointsCacheInMs = twoStageAggregateSenderEndPointsCacheInMs;
+  }
+
   public int getSubscriptionSubtaskExecutorMaxThreadNum() {
     return subscriptionSubtaskExecutorMaxThreadNum;
   }
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
index f64a79e..04bd5f1 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
@@ -516,11 +516,28 @@
             properties.getProperty(
                 "pipe_listening_queue_transfer_snapshot_threshold",
                 String.valueOf(config.getPipeListeningQueueTransferSnapshotThreshold()))));
+
     config.setPipeSnapshotExecutionMaxBatchSize(
         Integer.parseInt(
             properties.getProperty(
                 "pipe_snapshot_execution_max_batch_size",
                 String.valueOf(config.getPipeSnapshotExecutionMaxBatchSize()))));
+
+    config.setTwoStageAggregateMaxCombinerLiveTimeInMs(
+        Long.parseLong(
+            properties.getProperty(
+                "two_stage_aggregate_max_combiner_live_time_in_ms",
+                String.valueOf(config.getTwoStageAggregateMaxCombinerLiveTimeInMs()))));
+    config.setTwoStageAggregateDataRegionInfoCacheTimeInMs(
+        Long.parseLong(
+            properties.getProperty(
+                "two_stage_aggregate_data_region_info_cache_time_in_ms",
+                String.valueOf(config.getTwoStageAggregateDataRegionInfoCacheTimeInMs()))));
+    config.setTwoStageAggregateSenderEndPointsCacheInMs(
+        Long.parseLong(
+            properties.getProperty(
+                "two_stage_aggregate_sender_end_points_cache_in_ms",
+                String.valueOf(config.getTwoStageAggregateSenderEndPointsCacheInMs()))));
   }
 
   private void loadSubscriptionProps(Properties properties) {
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/ProgressIndex.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/ProgressIndex.java
index 494d42a..ad82065 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/ProgressIndex.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/ProgressIndex.java
@@ -21,6 +21,7 @@
 
 import org.apache.iotdb.commons.consensus.index.impl.HybridProgressIndex;
 import org.apache.iotdb.commons.consensus.index.impl.MinimumProgressIndex;
+import org.apache.iotdb.commons.consensus.index.impl.StateProgressIndex;
 
 import com.google.common.collect.ImmutableList;
 
@@ -169,8 +170,10 @@
       return progressIndex1; // progressIndex1 is not null
     }
 
-    return new HybridProgressIndex(progressIndex1)
-        .updateToMinimumEqualOrIsAfterProgressIndex(progressIndex2);
+    return progressIndex1 instanceof StateProgressIndex
+        ? progressIndex1.updateToMinimumEqualOrIsAfterProgressIndex(progressIndex2)
+        : new HybridProgressIndex(progressIndex1)
+            .updateToMinimumEqualOrIsAfterProgressIndex(progressIndex2);
   }
 
   /**
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/ProgressIndexType.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/ProgressIndexType.java
index c5c2ed6..58548e1 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/ProgressIndexType.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/ProgressIndexType.java
@@ -25,6 +25,7 @@
 import org.apache.iotdb.commons.consensus.index.impl.MinimumProgressIndex;
 import org.apache.iotdb.commons.consensus.index.impl.RecoverProgressIndex;
 import org.apache.iotdb.commons.consensus.index.impl.SimpleProgressIndex;
+import org.apache.iotdb.commons.consensus.index.impl.StateProgressIndex;
 import org.apache.iotdb.commons.consensus.index.impl.TimeWindowStateProgressIndex;
 
 import org.apache.tsfile.utils.ReadWriteIOUtils;
@@ -42,7 +43,7 @@
   HYBRID_PROGRESS_INDEX((short) 5),
   META_PROGRESS_INDEX((short) 6),
   TIME_WINDOW_STATE_PROGRESS_INDEX((short) 7),
-  ;
+  STATE_PROGRESS_INDEX((short) 8);
 
   private final short type;
 
@@ -79,6 +80,8 @@
         return MetaProgressIndex.deserializeFrom(byteBuffer);
       case 7:
         return TimeWindowStateProgressIndex.deserializeFrom(byteBuffer);
+      case 8:
+        return StateProgressIndex.deserializeFrom(byteBuffer);
       default:
         throw new UnsupportedOperationException(
             String.format("Unsupported progress index type %s.", indexType));
@@ -102,6 +105,8 @@
         return MetaProgressIndex.deserializeFrom(stream);
       case 7:
         return TimeWindowStateProgressIndex.deserializeFrom(stream);
+      case 8:
+        return StateProgressIndex.deserializeFrom(stream);
       default:
         throw new UnsupportedOperationException(
             String.format("Unsupported progress index type %s.", indexType));
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/HybridProgressIndex.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/HybridProgressIndex.java
index c3955ce..8e4541e 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/HybridProgressIndex.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/HybridProgressIndex.java
@@ -173,6 +173,10 @@
         return this;
       }
 
+      if (progressIndex instanceof StateProgressIndex) {
+        return progressIndex.updateToMinimumEqualOrIsAfterProgressIndex(this);
+      }
+
       if (!(progressIndex instanceof HybridProgressIndex)) {
         type2Index.compute(
             progressIndex.getType().getType(),
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/MetaProgressIndex.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/MetaProgressIndex.java
index 8fb6c26..679eecb 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/MetaProgressIndex.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/MetaProgressIndex.java
@@ -138,7 +138,7 @@
     lock.writeLock().lock();
     try {
       if (!(progressIndex instanceof MetaProgressIndex)) {
-        return this;
+        return ProgressIndex.blendProgressIndex(this, progressIndex);
       }
 
       this.index = Math.max(this.index, ((MetaProgressIndex) progressIndex).index);
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/StateProgressIndex.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/StateProgressIndex.java
new file mode 100644
index 0000000..8b44edf
--- /dev/null
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/StateProgressIndex.java
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.consensus.index.impl;
+
+import org.apache.iotdb.commons.consensus.index.ProgressIndex;
+import org.apache.iotdb.commons.consensus.index.ProgressIndexType;
+
+import org.apache.tsfile.utils.Binary;
+import org.apache.tsfile.utils.ReadWriteIOUtils;
+
+import javax.annotation.Nonnull;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+public class StateProgressIndex extends ProgressIndex {
+
+  private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+
+  private long version;
+  private Map<String, Binary> state;
+  private ProgressIndex innerProgressIndex;
+
+  public StateProgressIndex(
+      long version, Map<String, Binary> state, ProgressIndex innerProgressIndex) {
+    this.version = version;
+    this.state = state;
+    this.innerProgressIndex = innerProgressIndex;
+  }
+
+  public long getVersion() {
+    return version;
+  }
+
+  public ProgressIndex getInnerProgressIndex() {
+    return innerProgressIndex == null ? MinimumProgressIndex.INSTANCE : innerProgressIndex;
+  }
+
+  public Map<String, Binary> getState() {
+    return state;
+  }
+
+  @Override
+  public void serialize(ByteBuffer byteBuffer) {
+    lock.readLock().lock();
+    try {
+      ProgressIndexType.STATE_PROGRESS_INDEX.serialize(byteBuffer);
+
+      ReadWriteIOUtils.write(version, byteBuffer);
+
+      ReadWriteIOUtils.write(state.size(), byteBuffer);
+      for (final Map.Entry<String, Binary> entry : state.entrySet()) {
+        ReadWriteIOUtils.write(entry.getKey(), byteBuffer);
+        ReadWriteIOUtils.write(entry.getValue(), byteBuffer);
+      }
+
+      innerProgressIndex.serialize(byteBuffer);
+    } finally {
+      lock.readLock().unlock();
+    }
+  }
+
+  @Override
+  public void serialize(OutputStream stream) throws IOException {
+    lock.readLock().lock();
+    try {
+      ProgressIndexType.STATE_PROGRESS_INDEX.serialize(stream);
+
+      ReadWriteIOUtils.write(version, stream);
+
+      ReadWriteIOUtils.write(state.size(), stream);
+      for (final Map.Entry<String, Binary> entry : state.entrySet()) {
+        ReadWriteIOUtils.write(entry.getKey(), stream);
+        ReadWriteIOUtils.write(entry.getValue(), stream);
+      }
+
+      innerProgressIndex.serialize(stream);
+    } finally {
+      lock.readLock().unlock();
+    }
+  }
+
+  @Override
+  public boolean isAfter(@Nonnull ProgressIndex progressIndex) {
+    lock.readLock().lock();
+    try {
+      if (progressIndex instanceof MinimumProgressIndex) {
+        return innerProgressIndex.isAfter(progressIndex);
+      }
+
+      if (progressIndex instanceof HybridProgressIndex) {
+        return ((HybridProgressIndex) progressIndex)
+            .isGivenProgressIndexAfterSelf(innerProgressIndex);
+      }
+
+      if (!(progressIndex instanceof StateProgressIndex)) {
+        return false;
+      }
+
+      return innerProgressIndex.isAfter(((StateProgressIndex) progressIndex).innerProgressIndex)
+          && version > ((StateProgressIndex) progressIndex).version;
+    } finally {
+      lock.readLock().unlock();
+    }
+  }
+
+  @Override
+  public boolean equals(ProgressIndex progressIndex) {
+    lock.readLock().lock();
+    try {
+      return progressIndex instanceof StateProgressIndex
+          && innerProgressIndex.equals(((StateProgressIndex) progressIndex).innerProgressIndex)
+          && version == ((StateProgressIndex) progressIndex).version;
+    } finally {
+      lock.readLock().unlock();
+    }
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (obj == null) {
+      return false;
+    }
+    if (this == obj) {
+      return true;
+    }
+    if (!(obj instanceof StateProgressIndex)) {
+      return false;
+    }
+    return this.equals((StateProgressIndex) obj);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(innerProgressIndex, version);
+  }
+
+  @Override
+  public ProgressIndex updateToMinimumEqualOrIsAfterProgressIndex(ProgressIndex progressIndex) {
+    lock.writeLock().lock();
+    try {
+      innerProgressIndex =
+          innerProgressIndex.updateToMinimumEqualOrIsAfterProgressIndex(
+              progressIndex instanceof StateProgressIndex
+                  ? ((StateProgressIndex) progressIndex).innerProgressIndex
+                  : progressIndex);
+      if (progressIndex instanceof StateProgressIndex
+          && version <= ((StateProgressIndex) progressIndex).version) {
+        version = ((StateProgressIndex) progressIndex).version;
+        state = ((StateProgressIndex) progressIndex).state;
+      }
+      return this;
+    } finally {
+      lock.writeLock().unlock();
+    }
+  }
+
+  @Override
+  public ProgressIndexType getType() {
+    return ProgressIndexType.STATE_PROGRESS_INDEX;
+  }
+
+  @Override
+  public TotalOrderSumTuple getTotalOrderSumTuple() {
+    return innerProgressIndex.getTotalOrderSumTuple();
+  }
+
+  public static StateProgressIndex deserializeFrom(ByteBuffer byteBuffer) {
+    final long version = ReadWriteIOUtils.readLong(byteBuffer);
+
+    final Map<String, Binary> state = new HashMap<>();
+    final int size = ReadWriteIOUtils.readInt(byteBuffer);
+    for (int i = 0; i < size; ++i) {
+      final String key = ReadWriteIOUtils.readString(byteBuffer);
+      final Binary value = ReadWriteIOUtils.readBinary(byteBuffer);
+      state.put(key, value);
+    }
+
+    final ProgressIndex progressIndex = ProgressIndexType.deserializeFrom(byteBuffer);
+
+    return new StateProgressIndex(version, state, progressIndex);
+  }
+
+  public static StateProgressIndex deserializeFrom(InputStream stream) throws IOException {
+    final long version = ReadWriteIOUtils.readLong(stream);
+
+    final Map<String, Binary> state = new HashMap<>();
+    final int size = ReadWriteIOUtils.readInt(stream);
+    for (int i = 0; i < size; ++i) {
+      final String key = ReadWriteIOUtils.readString(stream);
+      final Binary value = ReadWriteIOUtils.readBinary(stream);
+      state.put(key, value);
+    }
+
+    final ProgressIndex progressIndex = ProgressIndexType.deserializeFrom(stream);
+
+    return new StateProgressIndex(version, state, progressIndex);
+  }
+
+  @Override
+  public String toString() {
+    return "StateProgressIndex{"
+        + "version="
+        + version
+        + ", state="
+        + state
+        + ", innerProgressIndex="
+        + innerProgressIndex
+        + '}';
+  }
+}
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
index 6fab25c..7a0391d 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
@@ -245,6 +245,20 @@
     return COMMON_CONFIG.getPipeMemoryExpanderIntervalSeconds();
   }
 
+  /////////////////////////////// TwoStage ///////////////////////////////
+
+  public long getTwoStageAggregateMaxCombinerLiveTimeInMs() {
+    return COMMON_CONFIG.getTwoStageAggregateMaxCombinerLiveTimeInMs();
+  }
+
+  public long getTwoStageAggregateDataRegionInfoCacheTimeInMs() {
+    return COMMON_CONFIG.getTwoStageAggregateDataRegionInfoCacheTimeInMs();
+  }
+
+  public long getTwoStageAggregateSenderEndPointsCacheInMs() {
+    return COMMON_CONFIG.getTwoStageAggregateSenderEndPointsCacheInMs();
+  }
+
   /////////////////////////////// Utils ///////////////////////////////
 
   private static final Logger LOGGER = LoggerFactory.getLogger(PipeConfig.class);
@@ -337,6 +351,17 @@
     LOGGER.info(
         "PipeMemoryAllocateForTsFileSequenceReaderInBytes: {}",
         getPipeMemoryAllocateForTsFileSequenceReaderInBytes());
+    LOGGER.info("PipeMemoryExpanderIntervalSeconds: {}", getPipeMemoryExpanderIntervalSeconds());
+
+    LOGGER.info(
+        "TwoStageAggregateMaxCombinerLiveTimeInMs: {}",
+        getTwoStageAggregateMaxCombinerLiveTimeInMs());
+    LOGGER.info(
+        "TwoStageAggregateDataRegionInfoCacheTimeInMs: {}",
+        getTwoStageAggregateDataRegionInfoCacheTimeInMs());
+    LOGGER.info(
+        "TwoStageAggregateSenderEndPointsCacheInMs: {}",
+        getTwoStageAggregateSenderEndPointsCacheInMs());
   }
 
   /////////////////////////////// Singleton ///////////////////////////////
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeExtractorConstant.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeExtractorConstant.java
index 80c102a..05ed098 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeExtractorConstant.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeExtractorConstant.java
@@ -79,6 +79,10 @@
   public static final String EXTRACTOR_END_TIME_KEY = "extractor.end-time";
   public static final String SOURCE_END_TIME_KEY = "source.end-time";
 
+  public static final String EXTRACTOR_WATERMARK_INTERVAL_KEY = "extractor.watermark-interval-ms";
+  public static final String SOURCE_WATERMARK_INTERVAL_KEY = "source.watermark-interval-ms";
+  public static final long EXTRACTOR_WATERMARK_INTERVAL_DEFAULT_VALUE = -1; // -1 means no watermark
+
   private PipeExtractorConstant() {
     throw new IllegalStateException("Utility class");
   }
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeProcessorConstant.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeProcessorConstant.java
index 719fa3c..49fa2e2 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeProcessorConstant.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeProcessorConstant.java
@@ -66,6 +66,8 @@
       "processor.sdt.max-time-interval";
   public static final long PROCESSOR_SDT_MAX_TIME_INTERVAL_DEFAULT_VALUE = Long.MAX_VALUE;
 
+  public static final String PROCESSOR_OUTPUT_SERIES_KEY = "processor.output-series";
+
   private PipeProcessorConstant() {
     throw new IllegalStateException("Utility class");
   }
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/payload/thrift/request/IoTDBConnectorRequestVersion.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/payload/thrift/request/IoTDBConnectorRequestVersion.java
index 9f853dd..74219ef 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/payload/thrift/request/IoTDBConnectorRequestVersion.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/payload/thrift/request/IoTDBConnectorRequestVersion.java
@@ -21,6 +21,7 @@
 
 public enum IoTDBConnectorRequestVersion {
   VERSION_1((byte) 1),
+  VERSION_2((byte) 2),
   ;
 
   private final byte version;
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/BuiltinPipePlugin.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/BuiltinPipePlugin.java
index a7fc2da..511282f 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/BuiltinPipePlugin.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/BuiltinPipePlugin.java
@@ -38,6 +38,7 @@
 import org.apache.iotdb.commons.pipe.plugin.builtin.processor.downsampling.SwingingDoorTrendingSamplingProcessor;
 import org.apache.iotdb.commons.pipe.plugin.builtin.processor.downsampling.TumblingTimeSamplingProcessor;
 import org.apache.iotdb.commons.pipe.plugin.builtin.processor.throwing.ThrowingExceptionProcessor;
+import org.apache.iotdb.commons.pipe.plugin.builtin.processor.twostage.TwoStageCountProcessor;
 
 import java.util.Arrays;
 import java.util.Collections;
@@ -60,6 +61,7 @@
   SDT_SAMPLING_PROCESSOR("sdt-sampling-processor", SwingingDoorTrendingSamplingProcessor.class),
   THROWING_EXCEPTION_PROCESSOR("throwing-exception-processor", ThrowingExceptionProcessor.class),
   AGGREGATE_PROCESSOR("aggregate-processor", AggregateProcessor.class),
+  COUNT_POINT_PROCESSOR("count-point-processor", TwoStageCountProcessor.class),
 
   // Hidden-processors, which are plugins of the processors
   STANDARD_STATISTICS_PROCESSOR("standard-statistics-processor", StandardStatisticsProcessor.class),
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/processor/twostage/TwoStageCountProcessor.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/processor/twostage/TwoStageCountProcessor.java
new file mode 100644
index 0000000..b8bf577
--- /dev/null
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/processor/twostage/TwoStageCountProcessor.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.pipe.plugin.builtin.processor.twostage;
+
+import org.apache.iotdb.commons.pipe.plugin.builtin.processor.PlaceHolderProcessor;
+
+/**
+ * This class is a placeholder and should not be initialized. It represents the
+ * TwoStageCountProcessor. There is a real implementation in the server module but cannot be
+ * imported here. The pipe agent in the server module will replace this class with the real
+ * implementation when initializing the TwoStageCountProcessor.
+ */
+public class TwoStageCountProcessor extends PlaceHolderProcessor {}
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java
index 91b8cbe..c6c7f12 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java
@@ -99,7 +99,7 @@
               "Receiver id = {}: Original receiver file dir {} was deleted.",
               receiverId.get(),
               receiverFileDirWithIdSuffix.get().getPath());
-        } catch (IOException e) {
+        } catch (Exception e) {
           LOGGER.warn(
               "Receiver id = {}: Failed to delete original receiver file dir {}, because {}.",
               receiverId.get(),
@@ -278,7 +278,7 @@
       try {
         return PipeTransferFilePieceResp.toTPipeTransferResp(
             status, PipeTransferFilePieceResp.ERROR_END_OFFSET);
-      } catch (IOException ex) {
+      } catch (Exception ex) {
         return PipeTransferFilePieceResp.toTPipeTransferResp(status);
       }
     }
@@ -340,7 +340,7 @@
             "Receiver id = {}: Current writing file writer {} was closed.",
             receiverId.get(),
             writingFile == null ? "null" : writingFile.getPath());
-      } catch (IOException e) {
+      } catch (Exception e) {
         LOGGER.warn(
             "Receiver id = {}: Failed to close current writing file writer {}, because {}.",
             receiverId.get(),
@@ -377,7 +377,7 @@
             "Receiver id = {}: Original writing file {} was deleted.",
             receiverId.get(),
             file.getPath());
-      } catch (IOException e) {
+      } catch (Exception e) {
         LOGGER.warn(
             "Receiver id = {}: Failed to delete original writing file {}, because {}.",
             receiverId.get(),
@@ -454,7 +454,7 @@
             status.getMessage());
       }
       return new TPipeTransferResp(status);
-    } catch (IOException e) {
+    } catch (Exception e) {
       LOGGER.warn(
           "Receiver id = {}: Failed to seal file {} from req {}.",
           receiverId.get(),
@@ -534,7 +534,7 @@
             status);
       }
       return new TPipeTransferResp(status);
-    } catch (IOException | IllegalPathException e) {
+    } catch (Exception e) {
       LOGGER.warn(
           "Receiver id = {}: Failed to seal file {} from req {}.", receiverId.get(), files, req, e);
       return new TPipeTransferResp(
@@ -701,7 +701,7 @@
               "Receiver id = {}: Handling exit: Original receiver file dir {} was deleted.",
               receiverId.get(),
               receiverFileDirWithIdSuffix.get().getPath());
-        } catch (IOException e) {
+        } catch (Exception e) {
           LOGGER.warn(
               "Receiver id = {}: Handling exit: Delete original receiver file dir {} error.",
               receiverId.get(),