Pipe: Two stage aggregate framework & count-point-processor plugin (#12328)
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/dataregion/PipeDataRegionProcessorConstructor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/dataregion/PipeDataRegionProcessorConstructor.java
index bc33c6d..15578f2 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/dataregion/PipeDataRegionProcessorConstructor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/dataregion/PipeDataRegionProcessorConstructor.java
@@ -29,6 +29,7 @@
import org.apache.iotdb.db.pipe.processor.aggregate.window.processor.TumblingWindowingProcessor;
import org.apache.iotdb.db.pipe.processor.downsampling.sdt.SwingingDoorTrendingSamplingProcessor;
import org.apache.iotdb.db.pipe.processor.downsampling.tumbling.TumblingTimeSamplingProcessor;
+import org.apache.iotdb.db.pipe.processor.twostage.plugin.TwoStageCountProcessor;
class PipeDataRegionProcessorConstructor extends PipeProcessorConstructor {
@@ -57,5 +58,7 @@
pluginConstructors.put(
BuiltinPipePlugin.TUMBLING_WINDOWING_PROCESSOR.getPipePluginName(),
TumblingWindowingProcessor::new);
+ pluginConstructors.put(
+ BuiltinPipePlugin.COUNT_POINT_PROCESSOR.getPipePluginName(), TwoStageCountProcessor::new);
}
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/receiver/PipeDataNodeReceiverAgent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/receiver/PipeDataNodeReceiverAgent.java
index 555696d..31d38d0 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/receiver/PipeDataNodeReceiverAgent.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/receiver/PipeDataNodeReceiverAgent.java
@@ -25,17 +25,12 @@
import org.apache.iotdb.db.pipe.receiver.protocol.legacy.IoTDBLegacyPipeReceiverAgent;
import org.apache.iotdb.db.pipe.receiver.protocol.thrift.IoTDBDataNodeReceiverAgent;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import java.io.File;
import java.util.Arrays;
/** {@link PipeDataNodeReceiverAgent} is the entry point of all pipe receivers' logic. */
public class PipeDataNodeReceiverAgent {
- private static final Logger LOGGER = LoggerFactory.getLogger(PipeDataNodeReceiverAgent.class);
-
private final IoTDBDataNodeReceiverAgent thriftAgent;
private final IoTDBAirGapReceiverAgent airGapAgent;
private final IoTDBLegacyPipeReceiverAgent legacyAgent;
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
index 8c806fe..b8ace00 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
@@ -411,4 +411,13 @@
// Set pipe meta status to STOPPED
pipeMeta.getRuntimeMeta().getStatus().set(PipeStatus.STOPPED);
}
+
+ ///////////////////////// Utils /////////////////////////
+
+ public Set<Integer> getPipeTaskRegionIdSet(String pipeName, long creationTime) {
+ final PipeMeta pipeMeta = pipeMetaKeeper.getPipeMeta(pipeName);
+ return pipeMeta == null || pipeMeta.getStaticMeta().getCreationTime() != creationTime
+ ? Collections.emptySet()
+ : pipeMeta.getRuntimeMeta().getConsensusGroupId2TaskMetaMap().keySet();
+ }
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
index c7821e4..798cb1d 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
@@ -251,6 +251,11 @@
}
}
+ public long count() {
+ final Tablet covertedTablet = convertToTablet();
+ return (long) covertedTablet.rowSize * covertedTablet.getSchemas().size();
+ }
+
/////////////////////////// parsePatternOrTime ///////////////////////////
@Override
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java
index dccfd0e..f227475 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java
@@ -225,6 +225,11 @@
return dataContainer.convertToTablet();
}
+ public long count() {
+ final Tablet covertedTablet = shouldParseTimeOrPattern() ? convertToTablet() : tablet;
+ return (long) covertedTablet.rowSize * covertedTablet.getSchemas().size();
+ }
+
/////////////////////////// parsePatternOrTime ///////////////////////////
public PipeRawTabletInsertionEvent parseEventWithPatternOrTime() {
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
index 8e3fa5b..c609eb9 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
@@ -24,6 +24,7 @@
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
import org.apache.iotdb.commons.pipe.pattern.PipePattern;
import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
+import org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
import org.apache.iotdb.db.pipe.resource.PipeResourceManager;
import org.apache.iotdb.db.storageengine.dataregion.memtable.TsFileProcessor;
import org.apache.iotdb.db.storageengine.dataregion.modification.ModificationFile;
@@ -323,6 +324,30 @@
}
}
+ public long count(boolean skipReportOnCommit) throws IOException {
+ long count = 0;
+
+ if (shouldParseTime()) {
+ try {
+ for (final TabletInsertionEvent event : toTabletInsertionEvents()) {
+ final PipeRawTabletInsertionEvent rawEvent = ((PipeRawTabletInsertionEvent) event);
+ count += rawEvent.count();
+ if (skipReportOnCommit) {
+ rawEvent.skipReportOnCommit();
+ }
+ }
+ return count;
+ } finally {
+ close();
+ }
+ }
+
+ try (final TsFileInsertionPointCounter counter =
+ new TsFileInsertionPointCounter(tsFile, pipePattern)) {
+ return counter.count();
+ }
+ }
+
/** Release the resource of {@link TsFileInsertionDataContainer}. */
@Override
public void close() {
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/TsFileInsertionPointCounter.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/TsFileInsertionPointCounter.java
new file mode 100644
index 0000000..086b03c
--- /dev/null
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/TsFileInsertionPointCounter.java
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.event.common.tsfile;
+
+import org.apache.iotdb.commons.pipe.pattern.PipePattern;
+
+import org.apache.tsfile.file.metadata.IDeviceID;
+import org.apache.tsfile.file.metadata.PlainDeviceID;
+import org.apache.tsfile.file.metadata.TimeseriesMetadata;
+import org.apache.tsfile.read.TsFileSequenceReader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+
+public class TsFileInsertionPointCounter implements AutoCloseable {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(TsFileInsertionPointCounter.class);
+
+ private final PipePattern pattern;
+
+ private final TsFileSequenceReader tsFileSequenceReader;
+
+ final Map<IDeviceID, Set<String>> filteredDeviceMeasurementMap;
+ final Map<IDeviceID, List<TimeseriesMetadata>> allDeviceTimeseriesMetadataMap;
+
+ private boolean shouldParsePattern = false;
+
+ private long count = 0;
+
+ public TsFileInsertionPointCounter(File tsFile, PipePattern pattern) throws IOException {
+ this.pattern = pattern;
+
+ try {
+ tsFileSequenceReader = new TsFileSequenceReader(tsFile.getPath(), true, true);
+
+ filteredDeviceMeasurementMap = filterDeviceMeasurementsMapByPattern();
+ allDeviceTimeseriesMetadataMap = tsFileSequenceReader.getAllTimeseriesMetadata(false);
+
+ if (shouldParsePattern) {
+ countMatchedTimeseriesPoints();
+ } else {
+ countAllTimeseriesPoints();
+ }
+
+ // No longer need this. Help GC.
+ tsFileSequenceReader.clearCachedDeviceMetadata();
+ } catch (Exception e) {
+ close();
+ throw e;
+ }
+ }
+
+ private Map<IDeviceID, Set<String>> filterDeviceMeasurementsMapByPattern() throws IOException {
+ final Map<IDeviceID, List<String>> originalDeviceMeasurementsMap =
+ tsFileSequenceReader.getDeviceMeasurementsMap();
+ final Map<IDeviceID, Set<String>> filteredDeviceMeasurementsMap = new HashMap<>();
+
+ for (Map.Entry<IDeviceID, List<String>> entry : originalDeviceMeasurementsMap.entrySet()) {
+ final String deviceId = ((PlainDeviceID) entry.getKey()).toStringID();
+
+ // case 1: for example, pattern is root.a.b or pattern is null and device is root.a.b.c
+ // in this case, all data can be matched without checking the measurements
+ if (Objects.isNull(pattern) || pattern.isRoot() || pattern.coversDevice(deviceId)) {
+ if (!entry.getValue().isEmpty()) {
+ filteredDeviceMeasurementsMap.put(
+ new PlainDeviceID(deviceId), new HashSet<>(entry.getValue()));
+ }
+ }
+
+ // case 2: for example, pattern is root.a.b.c and device is root.a.b
+ // in this case, we need to check the full path
+ else if (pattern.mayOverlapWithDevice(deviceId)) {
+ final Set<String> filteredMeasurements = new HashSet<>();
+
+ for (final String measurement : entry.getValue()) {
+ if (pattern.matchesMeasurement(deviceId, measurement)) {
+ filteredMeasurements.add(measurement);
+ } else {
+ // Parse pattern iff there are measurements filtered out
+ shouldParsePattern = true;
+ }
+ }
+
+ if (!filteredMeasurements.isEmpty()) {
+ filteredDeviceMeasurementsMap.put(new PlainDeviceID(deviceId), filteredMeasurements);
+ }
+ }
+
+ // case 3: for example, pattern is root.a.b.c and device is root.a.b.d
+ // in this case, no data can be matched
+ else {
+ // Parse pattern iff there are measurements filtered out
+ shouldParsePattern = true;
+ }
+ }
+
+ return filteredDeviceMeasurementsMap;
+ }
+
+ private void countMatchedTimeseriesPoints() {
+ for (final Map.Entry<IDeviceID, List<TimeseriesMetadata>> deviceTimeseriesMetadataEntry :
+ allDeviceTimeseriesMetadataMap.entrySet()) {
+ final IDeviceID deviceId = deviceTimeseriesMetadataEntry.getKey();
+ if (!filteredDeviceMeasurementMap.containsKey(deviceId)) {
+ continue;
+ }
+
+ final List<TimeseriesMetadata> allTimeseriesMetadata =
+ deviceTimeseriesMetadataEntry.getValue();
+ final Set<String> filteredMeasurements = filteredDeviceMeasurementMap.get(deviceId);
+ for (final TimeseriesMetadata timeseriesMetadata : allTimeseriesMetadata) {
+ if (!filteredMeasurements.contains(timeseriesMetadata.getMeasurementId())) {
+ continue;
+ }
+
+ count += timeseriesMetadata.getStatistics().getCount();
+ }
+ }
+ }
+
+ private void countAllTimeseriesPoints() {
+ for (final List<TimeseriesMetadata> allTimeseriesMetadata :
+ allDeviceTimeseriesMetadataMap.values()) {
+ for (final TimeseriesMetadata timeseriesMetadata : allTimeseriesMetadata) {
+ count += timeseriesMetadata.getStatistics().getCount();
+ }
+ }
+ }
+
+ public long count() {
+ return count;
+ }
+
+ @Override
+ public void close() {
+ try {
+ if (tsFileSequenceReader != null) {
+ tsFileSequenceReader.close();
+ }
+ } catch (IOException e) {
+ LOGGER.warn("Failed to close TsFileSequenceReader", e);
+ }
+ }
+}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/watermark/PipeWatermarkEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/watermark/PipeWatermarkEvent.java
new file mode 100644
index 0000000..a46d83e
--- /dev/null
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/watermark/PipeWatermarkEvent.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.event.common.watermark;
+
+import org.apache.iotdb.pipe.api.event.Event;
+
+public class PipeWatermarkEvent implements Event {
+
+ private final long watermark;
+
+ public PipeWatermarkEvent(long watermark) {
+ this.watermark = watermark;
+ }
+
+ public long getWatermark() {
+ return watermark;
+ }
+
+ @Override
+ public String toString() {
+ return "PipeWatermarkEvent{" + "watermark=" + watermark + '}';
+ }
+}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/DataRegionWatermarkInjector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/DataRegionWatermarkInjector.java
new file mode 100644
index 0000000..5d8bbf6
--- /dev/null
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/DataRegionWatermarkInjector.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.extractor.dataregion;
+
+import org.apache.iotdb.db.pipe.event.common.watermark.PipeWatermarkEvent;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class DataRegionWatermarkInjector {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(DataRegionWatermarkInjector.class);
+
+ public static final long MIN_INJECTION_INTERVAL_IN_MS = 30 * 1000; // 30s
+
+ private final int regionId;
+
+ private final long injectionIntervalInMs;
+ private long nextInjectionTime;
+
+ public DataRegionWatermarkInjector(int regionId, long injectionIntervalInMs) {
+ this.regionId = regionId;
+ this.injectionIntervalInMs =
+ Math.max(injectionIntervalInMs, MIN_INJECTION_INTERVAL_IN_MS)
+ / MIN_INJECTION_INTERVAL_IN_MS
+ * MIN_INJECTION_INTERVAL_IN_MS;
+ this.nextInjectionTime = calculateNextInjectionTime(this.injectionIntervalInMs);
+ }
+
+ public long getInjectionIntervalInMs() {
+ return injectionIntervalInMs;
+ }
+
+ public long getNextInjectionTime() {
+ return nextInjectionTime;
+ }
+
+ public PipeWatermarkEvent inject() {
+ if (System.currentTimeMillis() < nextInjectionTime) {
+ return null;
+ }
+
+ try {
+ final PipeWatermarkEvent watermarkEvent = new PipeWatermarkEvent(nextInjectionTime);
+ nextInjectionTime = calculateNextInjectionTime(injectionIntervalInMs);
+ return watermarkEvent;
+ } finally {
+ LOGGER.info(
+ "Data region {}: Injected watermark event with timestamp: {}",
+ regionId,
+ nextInjectionTime);
+ }
+ }
+
+ private static long calculateNextInjectionTime(long injectionIntervalInMs) {
+ final long currentTime = System.currentTimeMillis();
+ return currentTime / injectionIntervalInMs * injectionIntervalInMs + injectionIntervalInMs;
+ }
+}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/IoTDBDataRegionExtractor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/IoTDBDataRegionExtractor.java
index c8b4d2b..b429192 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/IoTDBDataRegionExtractor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/IoTDBDataRegionExtractor.java
@@ -66,6 +66,8 @@
import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_REALTIME_MODE_LOG_VALUE;
import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_REALTIME_MODE_STREAM_MODE_VALUE;
import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_START_TIME_KEY;
+import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_WATERMARK_INTERVAL_DEFAULT_VALUE;
+import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_WATERMARK_INTERVAL_KEY;
import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_END_TIME_KEY;
import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_HISTORY_ENABLE_KEY;
import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_HISTORY_END_TIME_KEY;
@@ -74,6 +76,7 @@
import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_REALTIME_ENABLE_KEY;
import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_REALTIME_MODE_KEY;
import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_START_TIME_KEY;
+import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_WATERMARK_INTERVAL_KEY;
public class IoTDBDataRegionExtractor extends IoTDBExtractor {
@@ -82,6 +85,8 @@
private PipeHistoricalDataRegionExtractor historicalExtractor;
private PipeRealtimeDataRegionExtractor realtimeExtractor;
+ private DataRegionWatermarkInjector watermarkInjector;
+
private boolean hasNoExtractionNeed = true;
@Override
@@ -255,6 +260,23 @@
historicalExtractor.customize(parameters, configuration);
realtimeExtractor.customize(parameters, configuration);
+ // Set watermark injector
+ if (parameters.hasAnyAttributes(
+ EXTRACTOR_WATERMARK_INTERVAL_KEY, SOURCE_WATERMARK_INTERVAL_KEY)) {
+ final long watermarkIntervalInMs =
+ parameters.getLongOrDefault(
+ Arrays.asList(EXTRACTOR_WATERMARK_INTERVAL_KEY, SOURCE_WATERMARK_INTERVAL_KEY),
+ EXTRACTOR_WATERMARK_INTERVAL_DEFAULT_VALUE);
+ if (watermarkIntervalInMs > 0) {
+ watermarkInjector = new DataRegionWatermarkInjector(regionId, watermarkIntervalInMs);
+ LOGGER.info(
+ "Pipe {}@{}: Set watermark injector with interval {} ms.",
+ pipeName,
+ regionId,
+ watermarkInjector.getInjectionIntervalInMs());
+ }
+ }
+
// register metric after generating taskID
PipeExtractorMetrics.getInstance().register(this);
}
@@ -348,10 +370,18 @@
return null;
}
- Event event =
- historicalExtractor.hasConsumedAll()
- ? realtimeExtractor.supply()
- : historicalExtractor.supply();
+ Event event = null;
+ if (!historicalExtractor.hasConsumedAll()) {
+ event = historicalExtractor.supply();
+ } else {
+ if (Objects.nonNull(watermarkInjector)) {
+ event = watermarkInjector.inject();
+ }
+ if (Objects.isNull(event)) {
+ event = realtimeExtractor.supply();
+ }
+ }
+
if (Objects.nonNull(event)) {
if (event instanceof TabletInsertionEvent) {
PipeExtractorMetrics.getInstance().markTabletEvent(taskID);
@@ -361,6 +391,7 @@
PipeExtractorMetrics.getInstance().markPipeHeartbeatEvent(taskID);
}
}
+
return event;
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java
index 9356ccb..3d1528f 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java
@@ -21,6 +21,7 @@
import org.apache.iotdb.commons.consensus.DataRegionId;
import org.apache.iotdb.commons.consensus.index.ProgressIndex;
+import org.apache.iotdb.commons.consensus.index.impl.StateProgressIndex;
import org.apache.iotdb.commons.consensus.index.impl.TimeWindowStateProgressIndex;
import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.pipe.config.constant.SystemConstant;
@@ -426,12 +427,23 @@
}
private boolean mayTsFileContainUnprocessedData(TsFileResource resource) {
- return startIndex instanceof TimeWindowStateProgressIndex
- // The resource is closed thus the TsFileResource#getFileEndTime() is safe to use
- ? ((TimeWindowStateProgressIndex) startIndex).getMinTime() <= resource.getFileEndTime()
- // Some different tsFiles may share the same max progressIndex, thus tsFiles with an
- // "equals" max progressIndex must be transmitted to avoid data loss
- : !startIndex.isAfter(resource.getMaxProgressIndexAfterClose());
+ if (startIndex instanceof TimeWindowStateProgressIndex) {
+ // The resource is closed thus the TsFileResource#getFileEndTime() is safe to use
+ return ((TimeWindowStateProgressIndex) startIndex).getMinTime() <= resource.getFileEndTime();
+ }
+
+ if (startIndex instanceof StateProgressIndex) {
+ // Some different tsFiles may share the same max progressIndex, thus tsFiles with an
+ // "equals" max progressIndex must be transmitted to avoid data loss
+ final ProgressIndex innerProgressIndex =
+ ((StateProgressIndex) startIndex).getInnerProgressIndex();
+ return !innerProgressIndex.isAfter(resource.getMaxProgressIndexAfterClose())
+ && !innerProgressIndex.equals(resource.getMaxProgressIndexAfterClose());
+ }
+
+ // Some different tsFiles may share the same max progressIndex, thus tsFiles with an
+ // "equals" max progressIndex must be transmitted to avoid data loss
+ return !startIndex.isAfter(resource.getMaxProgressIndexAfterClose());
}
private boolean isTsFileResourceOverlappedWithTimeRange(TsFileResource resource) {
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/combiner/Combiner.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/combiner/Combiner.java
new file mode 100644
index 0000000..136e760
--- /dev/null
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/combiner/Combiner.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.processor.twostage.combiner;
+
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.pipe.config.PipeConfig;
+import org.apache.iotdb.db.pipe.processor.twostage.operator.Operator;
+import org.apache.iotdb.db.pipe.processor.twostage.state.State;
+import org.apache.iotdb.rpc.RpcUtils;
+import org.apache.iotdb.rpc.TSStatusCode;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+public class Combiner {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(Combiner.class);
+
+ private static final long MAX_COMBINER_LIVE_TIME_IN_MS =
+ PipeConfig.getInstance().getTwoStageAggregateMaxCombinerLiveTimeInMs();
+ private final long creationTimeInMs;
+
+ private final Operator operator;
+
+ private final ConcurrentMap<Integer, Integer> expectedRegionId2DataNodeIdMap;
+ private final Set<Integer> receivedRegionIdSet;
+
+ private final AtomicBoolean isComplete = new AtomicBoolean(false);
+
+ public Combiner(
+ Operator operator, ConcurrentMap<Integer, Integer> expectedRegionId2DataNodeIdMap) {
+ this.creationTimeInMs = System.currentTimeMillis();
+
+ this.operator = operator;
+
+ this.expectedRegionId2DataNodeIdMap = expectedRegionId2DataNodeIdMap;
+ this.receivedRegionIdSet = new HashSet<>();
+ }
+
+ public TSStatus combine(int regionId, State state) {
+ final Set<Integer> finalExpectedRegionIdSet =
+ new HashSet<>(expectedRegionId2DataNodeIdMap.keySet());
+
+ if (finalExpectedRegionIdSet.isEmpty()) {
+ return RpcUtils.getStatus(
+ TSStatusCode.PIPE_ERROR, "Expected region id set is empty. Sender should try again.");
+ }
+
+ receivedRegionIdSet.add(regionId);
+ operator.combine(state);
+
+ if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug(
+ "Combiner combine: regionId: {}, state: {}, receivedRegionIdSet: {}, expectedRegionIdSet: {}",
+ regionId,
+ state,
+ receivedRegionIdSet,
+ finalExpectedRegionIdSet);
+ }
+
+ if (receivedRegionIdSet.containsAll(finalExpectedRegionIdSet)) {
+ operator.onComplete();
+ isComplete.set(true);
+
+ if (LOGGER.isInfoEnabled()) {
+ LOGGER.info(
+ "Combiner combine completed: regionId: {}, state: {}, receivedRegionIdSet: {}, expectedRegionIdSet: {}",
+ regionId,
+ state,
+ receivedRegionIdSet,
+ finalExpectedRegionIdSet);
+ }
+ }
+
+ return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
+ }
+
+ public boolean isOutdated() {
+ return System.currentTimeMillis() - creationTimeInMs > MAX_COMBINER_LIVE_TIME_IN_MS;
+ }
+
+ public boolean isComplete() {
+ return isComplete.get();
+ }
+}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/combiner/PipeCombineHandler.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/combiner/PipeCombineHandler.java
new file mode 100644
index 0000000..55ef5af
--- /dev/null
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/combiner/PipeCombineHandler.java
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.processor.twostage.combiner;
+
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.client.exception.ClientManagerException;
+import org.apache.iotdb.commons.cluster.RegionRoleType;
+import org.apache.iotdb.commons.pipe.config.PipeConfig;
+import org.apache.iotdb.confignode.rpc.thrift.TRegionInfo;
+import org.apache.iotdb.confignode.rpc.thrift.TShowRegionReq;
+import org.apache.iotdb.confignode.rpc.thrift.TShowRegionResp;
+import org.apache.iotdb.db.pipe.agent.PipeAgent;
+import org.apache.iotdb.db.pipe.processor.twostage.exchange.payload.FetchCombineResultResponse;
+import org.apache.iotdb.db.pipe.processor.twostage.operator.Operator;
+import org.apache.iotdb.db.pipe.processor.twostage.state.State;
+import org.apache.iotdb.db.protocol.client.ConfigNodeClient;
+import org.apache.iotdb.db.protocol.client.ConfigNodeClientManager;
+import org.apache.iotdb.db.protocol.client.ConfigNodeInfo;
+import org.apache.iotdb.pipe.api.exception.PipeException;
+
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Function;
+
+public class PipeCombineHandler {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(PipeCombineHandler.class);
+
+ private final String pipeName;
+ private final long creationTime;
+
+ private final Function<String, Operator> /* <combineId, operator> */ operatorConstructor;
+
+ private static final Map<Integer, Integer> ALL_REGION_ID_2_DATANODE_ID_MAP = new HashMap<>();
+ private static final AtomicLong ALL_REGION_ID_2_DATANODE_ID_MAP_LAST_UPDATE_TIME =
+ new AtomicLong(0);
+ private final ConcurrentMap<Integer, Integer> expectedRegionId2DataNodeIdMap =
+ new ConcurrentHashMap<>();
+
+ private final ConcurrentMap<String, Combiner> combineId2Combiner = new ConcurrentHashMap<>();
+
+ public PipeCombineHandler(
+ String pipeName, long creationTime, Function<String, Operator> operatorConstructor) {
+ this.pipeName = pipeName;
+ this.creationTime = creationTime;
+
+ this.operatorConstructor = operatorConstructor;
+
+ fetchAndUpdateExpectedRegionId2DataNodeIdMap();
+ }
+
+ public synchronized TSStatus combine(int regionId, String combineId, State state) {
+ return combineId2Combiner
+ .computeIfAbsent(
+ combineId,
+ id ->
+ new Combiner(operatorConstructor.apply(combineId), expectedRegionId2DataNodeIdMap))
+ .combine(regionId, state);
+ }
+
+ public synchronized FetchCombineResultResponse fetchCombineResult(List<String> combineIdList)
+ throws IOException {
+ final Map<String, FetchCombineResultResponse.CombineResultType> combineId2ResultType =
+ new HashMap<>();
+ for (String combineId : combineIdList) {
+ final Combiner combiner = combineId2Combiner.get(combineId);
+
+ if (combiner == null || combiner.isOutdated()) {
+ combineId2ResultType.put(combineId, FetchCombineResultResponse.CombineResultType.OUTDATED);
+ continue;
+ }
+
+ combineId2ResultType.put(
+ combineId,
+ combiner.isComplete()
+ ? FetchCombineResultResponse.CombineResultType.SUCCESS
+ : FetchCombineResultResponse.CombineResultType.INCOMPLETE);
+ }
+
+ return FetchCombineResultResponse.toTPipeTransferResp(combineId2ResultType);
+ }
+
+ public void fetchAndUpdateExpectedRegionId2DataNodeIdMap() {
+ updateExpectedRegionId2DataNodeIdMap(fetchExpectedRegionId2DataNodeIdMap());
+ }
+
+ private Map<Integer, Integer> fetchExpectedRegionId2DataNodeIdMap() {
+ synchronized (ALL_REGION_ID_2_DATANODE_ID_MAP) {
+ if (System.currentTimeMillis() - ALL_REGION_ID_2_DATANODE_ID_MAP_LAST_UPDATE_TIME.get()
+ > PipeConfig.getInstance().getTwoStageAggregateDataRegionInfoCacheTimeInMs()) {
+ ALL_REGION_ID_2_DATANODE_ID_MAP.clear();
+
+ try (final ConfigNodeClient configNodeClient =
+ ConfigNodeClientManager.getInstance().borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
+ final TShowRegionResp showRegionResp =
+ configNodeClient.showRegion(
+ new TShowRegionReq().setConsensusGroupType(TConsensusGroupType.DataRegion));
+ if (showRegionResp == null || !showRegionResp.isSetRegionInfoList()) {
+ throw new PipeException("Failed to fetch data region ids");
+ }
+ for (final TRegionInfo regionInfo : showRegionResp.getRegionInfoList()) {
+ if (!RegionRoleType.Leader.getRoleType().equals(regionInfo.getRoleType())) {
+ continue;
+ }
+ ALL_REGION_ID_2_DATANODE_ID_MAP.put(
+ regionInfo.getConsensusGroupId().getId(), regionInfo.getDataNodeId());
+ }
+ } catch (ClientManagerException | TException e) {
+ throw new PipeException("Failed to fetch data nodes", e);
+ }
+
+ ALL_REGION_ID_2_DATANODE_ID_MAP_LAST_UPDATE_TIME.set(System.currentTimeMillis());
+
+ LOGGER.info(
+ "Fetched data region ids {} at {}",
+ ALL_REGION_ID_2_DATANODE_ID_MAP,
+ ALL_REGION_ID_2_DATANODE_ID_MAP_LAST_UPDATE_TIME.get());
+ }
+
+ final Set<Integer> pipeRelatedRegionIdSet =
+ new HashSet<>(PipeAgent.task().getPipeTaskRegionIdSet(pipeName, creationTime));
+ pipeRelatedRegionIdSet.removeIf(
+ regionId -> !ALL_REGION_ID_2_DATANODE_ID_MAP.containsKey(regionId));
+ if (LOGGER.isInfoEnabled()) {
+ LOGGER.info(
+ "Two stage aggregate pipe (pipeName={}, creationTime={}) related region ids {}",
+ pipeName,
+ creationTime,
+ pipeRelatedRegionIdSet);
+ }
+ return ALL_REGION_ID_2_DATANODE_ID_MAP.entrySet().stream()
+ .filter(entry -> pipeRelatedRegionIdSet.contains(entry.getKey()))
+ .collect(
+ HashMap::new,
+ (map, entry) -> map.put(entry.getKey(), entry.getValue()),
+ HashMap::putAll);
+ }
+ }
+
+ private synchronized void updateExpectedRegionId2DataNodeIdMap(
+ Map<Integer, Integer> newExpectedRegionId2DataNodeIdMap) {
+ expectedRegionId2DataNodeIdMap.clear();
+ expectedRegionId2DataNodeIdMap.putAll(newExpectedRegionId2DataNodeIdMap);
+ }
+
+ public synchronized Set<Integer> getExpectedDataNodeIdSet() {
+ return new HashSet<>(expectedRegionId2DataNodeIdMap.values());
+ }
+
+ public synchronized void cleanOutdatedCombiner() {
+ combineId2Combiner
+ .entrySet()
+ .removeIf(
+ entry -> {
+ if (!entry.getValue().isComplete()) {
+ LOGGER.info(
+ "Clean outdated incomplete combiner: pipeName={}, creationTime={}, combineId={}",
+ pipeName,
+ creationTime,
+ entry.getKey());
+ }
+ return entry.getValue().isOutdated();
+ });
+ }
+
+ public synchronized void close() {
+ combineId2Combiner.clear();
+ }
+}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/combiner/PipeCombineHandlerManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/combiner/PipeCombineHandlerManager.java
new file mode 100644
index 0000000..c08cd23
--- /dev/null
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/combiner/PipeCombineHandlerManager.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.processor.twostage.combiner;
+
+import org.apache.iotdb.commons.pipe.config.PipeConfig;
+import org.apache.iotdb.db.pipe.agent.PipeAgent;
+import org.apache.iotdb.db.pipe.processor.twostage.exchange.payload.CombineRequest;
+import org.apache.iotdb.db.pipe.processor.twostage.exchange.payload.FetchCombineResultRequest;
+import org.apache.iotdb.db.pipe.processor.twostage.exchange.payload.FetchCombineResultResponse;
+import org.apache.iotdb.db.pipe.processor.twostage.operator.Operator;
+import org.apache.iotdb.pipe.api.exception.PipeException;
+import org.apache.iotdb.service.rpc.thrift.TPipeTransferResp;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Function;
+
+public class PipeCombineHandlerManager {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(PipeCombineHandlerManager.class);
+
+ private final ConcurrentMap<String, PipeCombineHandler> pipeId2CombineHandler =
+ new ConcurrentHashMap<>();
+ private final ConcurrentMap<String, AtomicInteger> pipeId2ReferenceCount =
+ new ConcurrentHashMap<>();
+ private final ConcurrentMap<String, Object> pipeId2LastCombinedValue = new ConcurrentHashMap<>();
+
+ public synchronized void register(
+ String pipeName, long creationTime, Function<String, Operator> operatorConstructor) {
+ final String pipeId = generatePipeId(pipeName, creationTime);
+
+ pipeId2CombineHandler.putIfAbsent(
+ pipeId, new PipeCombineHandler(pipeName, creationTime, operatorConstructor));
+
+ pipeId2ReferenceCount.putIfAbsent(pipeId, new AtomicInteger(0));
+ pipeId2ReferenceCount.get(pipeId).incrementAndGet();
+ }
+
+ public synchronized void deregister(String pipeName, long creationTime) {
+ final String pipeId = generatePipeId(pipeName, creationTime);
+
+ if (pipeId2ReferenceCount.containsKey(pipeId)
+ && pipeId2ReferenceCount.get(pipeId).decrementAndGet() <= 0) {
+ pipeId2LastCombinedValue.remove(pipeId);
+
+ pipeId2ReferenceCount.remove(pipeId);
+
+ try {
+ pipeId2CombineHandler.remove(pipeId).close();
+ } catch (Exception e) {
+ LOGGER.warn("Error occurred when closing CombineHandler(id = {})", pipeId, e);
+ }
+ }
+ }
+
+ public Object getLastCombinedValue(String pipeName, long creationTime) {
+ return pipeId2LastCombinedValue.get(generatePipeId(pipeName, creationTime));
+ }
+
+ public void updateLastCombinedValue(
+ String pipeName, long creationTime, Object lastCombinedValue) {
+ pipeId2LastCombinedValue.put(generatePipeId(pipeName, creationTime), lastCombinedValue);
+ }
+
+ public synchronized Set<Integer> getExpectedDataNodeIdSet(String pipeName, long creationTime) {
+ final PipeCombineHandler handler =
+ pipeId2CombineHandler.get(generatePipeId(pipeName, creationTime));
+ return Objects.isNull(handler) ? Collections.emptySet() : handler.getExpectedDataNodeIdSet();
+ }
+
+ public TPipeTransferResp handle(CombineRequest combineRequest) {
+ final String pipeId =
+ generatePipeId(combineRequest.getPipeName(), combineRequest.getCreationTime());
+
+ final PipeCombineHandler handler = pipeId2CombineHandler.get(pipeId);
+ if (Objects.isNull(handler)) {
+ throw new PipeException("CombineHandler not found for pipeId = " + pipeId);
+ }
+
+ return new TPipeTransferResp()
+ .setStatus(
+ handler.combine(
+ combineRequest.getRegionId(),
+ combineRequest.getCombineId(),
+ combineRequest.getState()));
+ }
+
+ public FetchCombineResultResponse handle(FetchCombineResultRequest fetchCombineResultRequest)
+ throws IOException {
+ final String pipeId =
+ generatePipeId(
+ fetchCombineResultRequest.getPipeName(), fetchCombineResultRequest.getCreationTime());
+
+ final PipeCombineHandler handler = pipeId2CombineHandler.get(pipeId);
+ if (Objects.isNull(handler)) {
+ throw new PipeException("CombineHandler not found for pipeId = " + pipeId);
+ }
+
+ return handler.fetchCombineResult(fetchCombineResultRequest.getCombineIdList());
+ }
+
+ public void fetchExpectedRegionIdSetAndCleanOutdatedCombiner() {
+ final Map<String, PipeCombineHandler> pipeId2CombineHandlerSnapshot;
+ synchronized (this) {
+ pipeId2CombineHandlerSnapshot = new HashMap<>(pipeId2CombineHandler);
+ }
+
+ pipeId2CombineHandlerSnapshot.forEach(
+ (pipeId, handler) -> {
+ handler.fetchAndUpdateExpectedRegionId2DataNodeIdMap();
+ handler.cleanOutdatedCombiner();
+ });
+ }
+
+ private static String generatePipeId(String pipeName, long creationTime) {
+ return pipeName + "-" + creationTime;
+ }
+
+ /////////////////////////////// Singleton ///////////////////////////////
+
+ private PipeCombineHandlerManager() {
+ PipeAgent.runtime()
+ .registerPeriodicalJob(
+ "CombineHandlerManager#fetchExpectedRegionIdSetAndCleanOutdatedCombiner",
+ this::fetchExpectedRegionIdSetAndCleanOutdatedCombiner,
+ PipeConfig.getInstance().getTwoStageAggregateDataRegionInfoCacheTimeInMs() / 1000 / 2);
+ }
+
+ private static class CombineHandlerManagerHolder {
+ private static final PipeCombineHandlerManager INSTANCE = new PipeCombineHandlerManager();
+ }
+
+ public static PipeCombineHandlerManager getInstance() {
+ return CombineHandlerManagerHolder.INSTANCE;
+ }
+}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/exchange/payload/CombineRequest.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/exchange/payload/CombineRequest.java
new file mode 100644
index 0000000..cb1ba0b
--- /dev/null
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/exchange/payload/CombineRequest.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.processor.twostage.exchange.payload;
+
+import org.apache.iotdb.commons.pipe.connector.payload.thrift.request.IoTDBConnectorRequestVersion;
+import org.apache.iotdb.db.pipe.processor.twostage.state.State;
+import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
+
+import org.apache.tsfile.utils.PublicBAOS;
+import org.apache.tsfile.utils.ReadWriteIOUtils;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+public class CombineRequest extends TPipeTransferReq {
+
+ private String pipeName;
+ private long creationTime;
+ private int regionId;
+ private String combineId;
+
+ private State state;
+
+ private CombineRequest() {
+ // Empty constructor
+ }
+
+ public String getPipeName() {
+ return pipeName;
+ }
+
+ public long getCreationTime() {
+ return creationTime;
+ }
+
+ public int getRegionId() {
+ return regionId;
+ }
+
+ public String getCombineId() {
+ return combineId;
+ }
+
+ public State getState() {
+ return state;
+ }
+
+ public static CombineRequest toTPipeTransferReq(
+ String pipeName, long creationTime, int regionId, String combineId, State state)
+ throws IOException {
+ return new CombineRequest()
+ .convertToTPipeTransferReq(pipeName, creationTime, regionId, combineId, state);
+ }
+
+ public static CombineRequest fromTPipeTransferReq(TPipeTransferReq transferReq) throws Exception {
+ return new CombineRequest().translateFromTPipeTransferReq(transferReq);
+ }
+
+ private CombineRequest convertToTPipeTransferReq(
+ String pipeName, long creationTime, int regionId, String combineId, State state)
+ throws IOException {
+ this.pipeName = pipeName;
+ this.creationTime = creationTime;
+ this.regionId = regionId;
+ this.state = state;
+ this.combineId = combineId;
+
+ this.version = IoTDBConnectorRequestVersion.VERSION_2.getVersion();
+ this.type = RequestType.COMBINE.getType();
+ try (final PublicBAOS byteArrayOutputStream = new PublicBAOS();
+ final DataOutputStream outputStream = new DataOutputStream(byteArrayOutputStream)) {
+ ReadWriteIOUtils.write(pipeName, outputStream);
+ ReadWriteIOUtils.write(creationTime, outputStream);
+ ReadWriteIOUtils.write(regionId, outputStream);
+ ReadWriteIOUtils.write(combineId, outputStream);
+
+ ReadWriteIOUtils.write(state.getClass().getName(), outputStream);
+ state.serialize(outputStream);
+
+ this.body = ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0, byteArrayOutputStream.size());
+ }
+
+ return this;
+ }
+
+ private CombineRequest translateFromTPipeTransferReq(TPipeTransferReq transferReq)
+ throws Exception {
+ pipeName = ReadWriteIOUtils.readString(transferReq.body);
+ creationTime = ReadWriteIOUtils.readLong(transferReq.body);
+ regionId = ReadWriteIOUtils.readInt(transferReq.body);
+ combineId = ReadWriteIOUtils.readString(transferReq.body);
+
+ final String stateClassName = ReadWriteIOUtils.readString(transferReq.body);
+ state = (State) Class.forName(stateClassName).newInstance();
+ state.deserialize(transferReq.body);
+
+ version = transferReq.version;
+ type = transferReq.type;
+ body = transferReq.body;
+
+ return this;
+ }
+
+ @Override
+ public String toString() {
+ return "CombineRequest{"
+ + "pipeName='"
+ + pipeName
+ + '\''
+ + ", creationTime="
+ + creationTime
+ + ", regionId="
+ + regionId
+ + ", combineId='"
+ + combineId
+ + '\''
+ + ", state="
+ + state
+ + '}';
+ }
+}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/exchange/payload/FetchCombineResultRequest.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/exchange/payload/FetchCombineResultRequest.java
new file mode 100644
index 0000000..b20904a
--- /dev/null
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/exchange/payload/FetchCombineResultRequest.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.processor.twostage.exchange.payload;
+
+import org.apache.iotdb.commons.pipe.connector.payload.thrift.request.IoTDBConnectorRequestVersion;
+import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
+
+import org.apache.tsfile.utils.PublicBAOS;
+import org.apache.tsfile.utils.ReadWriteIOUtils;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+public class FetchCombineResultRequest extends TPipeTransferReq {
+
+ private String pipeName;
+ private long creationTime;
+ private List<String> combineIdList;
+
+ private FetchCombineResultRequest() {
+ // Empty constructor
+ }
+
+ public String getPipeName() {
+ return pipeName;
+ }
+
+ public long getCreationTime() {
+ return creationTime;
+ }
+
+ public List<String> getCombineIdList() {
+ return combineIdList;
+ }
+
+ public static FetchCombineResultRequest toTPipeTransferReq(
+ String pipeName, long creationTime, List<String> combineIdList) throws IOException {
+ return new FetchCombineResultRequest()
+ .convertToTPipeTransferReq(pipeName, creationTime, combineIdList);
+ }
+
+ public static FetchCombineResultRequest fromTPipeTransferReq(TPipeTransferReq transferReq)
+ throws Exception {
+ return new FetchCombineResultRequest().translateFromTPipeTransferReq(transferReq);
+ }
+
+ private FetchCombineResultRequest convertToTPipeTransferReq(
+ String pipeName, long creationTime, List<String> combineIdList) throws IOException {
+ this.pipeName = pipeName;
+ this.creationTime = creationTime;
+ this.combineIdList = combineIdList;
+
+ this.version = IoTDBConnectorRequestVersion.VERSION_2.getVersion();
+ this.type = RequestType.FETCH_COMBINE_RESULT.getType();
+ try (final PublicBAOS byteArrayOutputStream = new PublicBAOS();
+ final DataOutputStream outputStream = new DataOutputStream(byteArrayOutputStream)) {
+ ReadWriteIOUtils.write(pipeName, outputStream);
+ ReadWriteIOUtils.write(creationTime, outputStream);
+
+ ReadWriteIOUtils.write(combineIdList.size(), outputStream);
+ for (String combineId : combineIdList) {
+ ReadWriteIOUtils.write(combineId, outputStream);
+ }
+
+ this.body = ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0, byteArrayOutputStream.size());
+ }
+
+ return this;
+ }
+
+ private FetchCombineResultRequest translateFromTPipeTransferReq(TPipeTransferReq transferReq) {
+ pipeName = ReadWriteIOUtils.readString(transferReq.body);
+ creationTime = ReadWriteIOUtils.readLong(transferReq.body);
+ combineIdList = new ArrayList<>();
+ final int combineIdListSize = ReadWriteIOUtils.readInt(transferReq.body);
+ for (int i = 0; i < combineIdListSize; i++) {
+ combineIdList.add(ReadWriteIOUtils.readString(transferReq.body));
+ }
+
+ version = transferReq.version;
+ type = transferReq.type;
+ body = transferReq.body;
+
+ return this;
+ }
+
+ @Override
+ public String toString() {
+ return "FetchCombineResultRequest{"
+ + "pipeName='"
+ + pipeName
+ + '\''
+ + ", creationTime="
+ + creationTime
+ + ", combineIdList="
+ + combineIdList
+ + '}';
+ }
+}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/exchange/payload/FetchCombineResultResponse.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/exchange/payload/FetchCombineResultResponse.java
new file mode 100644
index 0000000..2884eb5
--- /dev/null
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/exchange/payload/FetchCombineResultResponse.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.processor.twostage.exchange.payload;
+
+import org.apache.iotdb.rpc.RpcUtils;
+import org.apache.iotdb.service.rpc.thrift.TPipeTransferResp;
+
+import org.apache.tsfile.utils.PublicBAOS;
+import org.apache.tsfile.utils.ReadWriteIOUtils;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.Map;
+
+public class FetchCombineResultResponse extends TPipeTransferResp {
+
+ public enum CombineResultType {
+ SUCCESS,
+ INCOMPLETE,
+ OUTDATED,
+ }
+
+ private Map<String, CombineResultType> combineId2ResultType = new HashMap<>();
+
+ private FetchCombineResultResponse() {
+ // Empty constructor
+ }
+
+ public Map<String, CombineResultType> getCombineId2ResultType() {
+ return combineId2ResultType;
+ }
+
+ public static FetchCombineResultResponse toTPipeTransferResp(
+ Map<String, CombineResultType> combineId2ResultType) throws IOException {
+ final FetchCombineResultResponse response = new FetchCombineResultResponse();
+
+ response.combineId2ResultType = combineId2ResultType;
+
+ try (PublicBAOS byteArrayOutputStream = new PublicBAOS();
+ DataOutputStream outputStream = new DataOutputStream(byteArrayOutputStream)) {
+ ReadWriteIOUtils.write(combineId2ResultType.size(), outputStream);
+ for (Map.Entry<String, CombineResultType> entry : combineId2ResultType.entrySet()) {
+ ReadWriteIOUtils.write(entry.getKey(), outputStream);
+ ReadWriteIOUtils.write(entry.getValue().ordinal(), outputStream);
+ }
+
+ response.body =
+ ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0, byteArrayOutputStream.size());
+ }
+
+ response.status = RpcUtils.SUCCESS_STATUS;
+
+ return response;
+ }
+
+ public static FetchCombineResultResponse fromTPipeTransferResp(TPipeTransferResp transferResp) {
+ final FetchCombineResultResponse response = new FetchCombineResultResponse();
+
+ response.status = transferResp.status;
+ response.body = transferResp.body;
+
+ response.combineId2ResultType = new HashMap<>();
+ if (response.isSetBody()) {
+ final int size = ReadWriteIOUtils.readInt(transferResp.body);
+ for (int i = 0; i < size; i++) {
+ final String combineId = ReadWriteIOUtils.readString(transferResp.body);
+ final CombineResultType resultType =
+ CombineResultType.values()[ReadWriteIOUtils.readInt(transferResp.body)];
+ response.combineId2ResultType.put(combineId, resultType);
+ }
+ }
+
+ return response;
+ }
+
+ @Override
+ public String toString() {
+ return "FetchCombineResultResponse{" + "combineId2ResultType=" + combineId2ResultType + '}';
+ }
+}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/exchange/payload/RequestType.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/exchange/payload/RequestType.java
new file mode 100644
index 0000000..99f2b16
--- /dev/null
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/exchange/payload/RequestType.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.processor.twostage.exchange.payload;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+public enum RequestType {
+ COMBINE((short) 0),
+ FETCH_COMBINE_RESULT((short) 1),
+ ;
+
+ private final short type;
+
+ RequestType(short type) {
+ this.type = type;
+ }
+
+ public short getType() {
+ return type;
+ }
+
+ private static final Map<Short, RequestType> TYPE_MAP =
+ Arrays.stream(RequestType.values())
+ .collect(
+ HashMap::new,
+ (typeMap, pipeRequestType) -> typeMap.put(pipeRequestType.getType(), pipeRequestType),
+ HashMap::putAll);
+
+ public static boolean isValidatedRequestType(short type) {
+ return TYPE_MAP.containsKey(type);
+ }
+
+ public static RequestType valueOf(short type) {
+ return TYPE_MAP.get(type);
+ }
+}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/exchange/receiver/TwoStageAggregateReceiver.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/exchange/receiver/TwoStageAggregateReceiver.java
new file mode 100644
index 0000000..1e029d7
--- /dev/null
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/exchange/receiver/TwoStageAggregateReceiver.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.processor.twostage.exchange.receiver;
+
+import org.apache.iotdb.commons.pipe.connector.payload.thrift.request.IoTDBConnectorRequestVersion;
+import org.apache.iotdb.commons.pipe.receiver.IoTDBReceiver;
+import org.apache.iotdb.db.pipe.processor.twostage.combiner.PipeCombineHandlerManager;
+import org.apache.iotdb.db.pipe.processor.twostage.exchange.payload.CombineRequest;
+import org.apache.iotdb.db.pipe.processor.twostage.exchange.payload.FetchCombineResultRequest;
+import org.apache.iotdb.db.pipe.processor.twostage.exchange.payload.RequestType;
+import org.apache.iotdb.rpc.RpcUtils;
+import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
+import org.apache.iotdb.service.rpc.thrift.TPipeTransferResp;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class TwoStageAggregateReceiver implements IoTDBReceiver {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(TwoStageAggregateReceiver.class);
+
+ @Override
+ public IoTDBConnectorRequestVersion getVersion() {
+ return IoTDBConnectorRequestVersion.VERSION_2;
+ }
+
+ @Override
+ public TPipeTransferResp receive(TPipeTransferReq req) {
+ try {
+ final short rawRequestType = req.getType();
+ if (RequestType.isValidatedRequestType(rawRequestType)) {
+ switch (RequestType.valueOf(rawRequestType)) {
+ case COMBINE:
+ return PipeCombineHandlerManager.getInstance()
+ .handle(CombineRequest.fromTPipeTransferReq(req));
+ case FETCH_COMBINE_RESULT:
+ return PipeCombineHandlerManager.getInstance()
+ .handle(FetchCombineResultRequest.fromTPipeTransferReq(req));
+ default:
+ break;
+ }
+ }
+
+ LOGGER.warn("Unknown request type {}: {}.", rawRequestType, req);
+ return new TPipeTransferResp(
+ RpcUtils.getStatus(
+ TSStatusCode.PIPE_TYPE_ERROR,
+ String.format("Unknown request type %s.", rawRequestType)));
+ } catch (Exception e) {
+ LOGGER.warn("Error occurs when receiving request: {}.", req, e);
+ return new TPipeTransferResp(
+ RpcUtils.getStatus(
+ TSStatusCode.PIPE_ERROR,
+ String.format("Error occurs when receiving request: %s.", e.getMessage())));
+ }
+ }
+
+ @Override
+ public void handleExit() {
+ if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug("Two stage aggregate receiver is exiting.");
+ }
+ }
+}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/exchange/sender/TwoStageAggregateSender.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/exchange/sender/TwoStageAggregateSender.java
new file mode 100644
index 0000000..90c6a2f
--- /dev/null
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/exchange/sender/TwoStageAggregateSender.java
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.processor.twostage.exchange.sender;
+
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.commons.client.exception.ClientManagerException;
+import org.apache.iotdb.commons.client.property.ThriftClientProperty;
+import org.apache.iotdb.commons.pipe.config.PipeConfig;
+import org.apache.iotdb.commons.pipe.connector.client.IoTDBSyncClient;
+import org.apache.iotdb.confignode.rpc.thrift.TDataNodeInfo;
+import org.apache.iotdb.confignode.rpc.thrift.TShowDataNodesResp;
+import org.apache.iotdb.db.pipe.processor.twostage.combiner.PipeCombineHandlerManager;
+import org.apache.iotdb.db.protocol.client.ConfigNodeClient;
+import org.apache.iotdb.db.protocol.client.ConfigNodeClientManager;
+import org.apache.iotdb.db.protocol.client.ConfigNodeInfo;
+import org.apache.iotdb.pipe.api.exception.PipeException;
+import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
+import org.apache.iotdb.service.rpc.thrift.TPipeTransferResp;
+
+import org.apache.thrift.TException;
+import org.apache.thrift.transport.TTransportException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+
+public class TwoStageAggregateSender implements AutoCloseable {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(TwoStageAggregateSender.class);
+
+ private static final PipeConfig PIPE_CONFIG = PipeConfig.getInstance();
+
+ private final String pipeName;
+ private final long creationTime;
+
+ private static final AtomicLong DATANODE_ID_2_END_POINTS_LAST_UPDATE_TIME = new AtomicLong(0);
+ private static final AtomicReference<Map<Integer, TEndPoint>> DATANODE_ID_2_END_POINTS =
+ new AtomicReference<>();
+
+ private TEndPoint[] endPoints;
+ private final Map<TEndPoint, IoTDBSyncClient> endPointIoTDBSyncClientMap =
+ new ConcurrentHashMap<>();
+
+ public TwoStageAggregateSender(String pipeName, long creationTime) {
+ this.pipeName = pipeName;
+ this.creationTime = creationTime;
+ }
+
+ public synchronized TPipeTransferResp request(long watermark, TPipeTransferReq req)
+ throws TException {
+ final boolean endPointsChanged = tryFetchEndPointsIfNecessary();
+ tryConstructClients(endPointsChanged);
+
+ final TEndPoint endPoint = endPoints[(int) watermark % endPoints.length];
+ IoTDBSyncClient client = endPointIoTDBSyncClientMap.get(endPoint);
+ if (client == null) {
+ client = reconstructIoTDBSyncClient(endPoint);
+ }
+
+ LOGGER.info("Sending request {} (watermark = {}) to {}", req, watermark, endPoint);
+
+ try {
+ return client.pipeTransfer(req);
+ } catch (Exception e) {
+ LOGGER.warn("Failed to send request {} (watermark = {}) to {}", req, watermark, endPoint, e);
+ try {
+ reconstructIoTDBSyncClient(endPoint);
+ } catch (Exception ex) {
+ LOGGER.warn(
+ "Failed to reconstruct IoTDBSyncClient {} after failure to send request {} (watermark = {})",
+ endPoint,
+ req,
+ watermark,
+ ex);
+ }
+ throw e;
+ }
+ }
+
+ private static boolean tryFetchEndPointsIfNecessary() {
+ final long currentTime = System.currentTimeMillis();
+
+ if (DATANODE_ID_2_END_POINTS.get() != null
+ && currentTime - DATANODE_ID_2_END_POINTS_LAST_UPDATE_TIME.get()
+ < PIPE_CONFIG.getTwoStageAggregateSenderEndPointsCacheInMs()) {
+ return false;
+ }
+
+ synchronized (DATANODE_ID_2_END_POINTS) {
+ if (DATANODE_ID_2_END_POINTS.get() != null
+ && currentTime - DATANODE_ID_2_END_POINTS_LAST_UPDATE_TIME.get()
+ < PIPE_CONFIG.getTwoStageAggregateSenderEndPointsCacheInMs()) {
+ return false;
+ }
+
+ final Map<Integer, TEndPoint> dataNodeId2EndPointMap = new HashMap<>();
+ try (final ConfigNodeClient configNodeClient =
+ ConfigNodeClientManager.getInstance().borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
+ final TShowDataNodesResp showDataNodesResp = configNodeClient.showDataNodes();
+ if (showDataNodesResp == null || showDataNodesResp.getDataNodesInfoList() == null) {
+ throw new PipeException("Failed to fetch data nodes");
+ }
+ for (final TDataNodeInfo dataNodeInfo : showDataNodesResp.getDataNodesInfoList()) {
+ dataNodeId2EndPointMap.put(
+ dataNodeInfo.getDataNodeId(),
+ new TEndPoint(dataNodeInfo.getRpcAddresss(), dataNodeInfo.getRpcPort()));
+ }
+ } catch (ClientManagerException | TException e) {
+ throw new PipeException("Failed to fetch data nodes", e);
+ }
+
+ if (dataNodeId2EndPointMap.isEmpty()) {
+ throw new PipeException("No data nodes' endpoints fetched");
+ }
+
+ DATANODE_ID_2_END_POINTS.set(dataNodeId2EndPointMap);
+ DATANODE_ID_2_END_POINTS_LAST_UPDATE_TIME.set(currentTime);
+ }
+
+ LOGGER.info("Data nodes' endpoints for two-stage aggregation: {}", DATANODE_ID_2_END_POINTS);
+ return true;
+ }
+
+ private void tryConstructClients(boolean endPointsChanged) {
+ if (Objects.nonNull(endPoints) && !endPointsChanged) {
+ return;
+ }
+
+ final Set<Integer> expectedDataNodeIdSet =
+ PipeCombineHandlerManager.getInstance().getExpectedDataNodeIdSet(pipeName, creationTime);
+ if (expectedDataNodeIdSet.isEmpty()) {
+ throw new PipeException("No expected region id set fetched");
+ }
+
+ endPoints =
+ DATANODE_ID_2_END_POINTS.get().entrySet().stream()
+ .filter(entry -> expectedDataNodeIdSet.contains(entry.getKey()))
+ .map(Map.Entry::getValue)
+ .toArray(TEndPoint[]::new);
+ LOGGER.info(
+ "End points for two-stage aggregation pipe (pipeName={}, creationTime={}) were updated to {}",
+ pipeName,
+ creationTime,
+ endPoints);
+
+ for (final TEndPoint endPoint : endPoints) {
+ if (endPointIoTDBSyncClientMap.containsKey(endPoint)) {
+ continue;
+ }
+
+ try {
+ endPointIoTDBSyncClientMap.put(endPoint, constructIoTDBSyncClient(endPoint));
+ } catch (TTransportException e) {
+ LOGGER.warn("Failed to construct IoTDBSyncClient", e);
+ }
+ }
+
+ for (final TEndPoint endPoint : new HashSet<>(endPointIoTDBSyncClientMap.keySet())) {
+ if (!DATANODE_ID_2_END_POINTS.get().containsValue(endPoint)) {
+ try {
+ endPointIoTDBSyncClientMap.remove(endPoint).close();
+ } catch (Exception e) {
+ LOGGER.warn("Failed to close IoTDBSyncClient", e);
+ }
+ }
+ }
+ }
+
+ private IoTDBSyncClient reconstructIoTDBSyncClient(TEndPoint endPoint)
+ throws TTransportException {
+ final IoTDBSyncClient oldClient = endPointIoTDBSyncClientMap.remove(endPoint);
+ if (oldClient != null) {
+ try {
+ oldClient.close();
+ } catch (Exception e) {
+ LOGGER.warn("Failed to close old IoTDBSyncClient", e);
+ }
+ }
+ final IoTDBSyncClient newClient = constructIoTDBSyncClient(endPoint);
+ endPointIoTDBSyncClientMap.put(endPoint, newClient);
+ return newClient;
+ }
+
+ private IoTDBSyncClient constructIoTDBSyncClient(TEndPoint endPoint) throws TTransportException {
+ return new IoTDBSyncClient(
+ new ThriftClientProperty.Builder()
+ .setConnectionTimeoutMs((int) PIPE_CONFIG.getPipeConnectorHandshakeTimeoutMs())
+ .setRpcThriftCompressionEnabled(
+ PIPE_CONFIG.isPipeConnectorRPCThriftCompressionEnabled())
+ .build(),
+ endPoint.getIp(),
+ endPoint.getPort(),
+ false,
+ null,
+ null);
+ }
+
+ @Override
+ public void close() {
+ for (final IoTDBSyncClient client : endPointIoTDBSyncClientMap.values()) {
+ try {
+ client.close();
+ } catch (Exception e) {
+ LOGGER.warn("Failed to close IoTDBSyncClient", e);
+ }
+ }
+
+ endPointIoTDBSyncClientMap.clear();
+ endPoints = null;
+ }
+}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/operator/CountOperator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/operator/CountOperator.java
new file mode 100644
index 0000000..a4eae9d
--- /dev/null
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/operator/CountOperator.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.processor.twostage.operator;
+
+import org.apache.iotdb.db.pipe.processor.twostage.state.CountState;
+import org.apache.iotdb.db.pipe.processor.twostage.state.State;
+
+import org.apache.tsfile.utils.Pair;
+
+import java.util.Queue;
+
+public class CountOperator implements Operator {
+
+ private final long onCompletionTimestamp;
+ private long globalCount;
+
+ private final Queue<Pair<Long, Long>> globalCountQueue;
+
+ public CountOperator(String combineId, Queue<Pair<Long, Long>> globalCountQueue) {
+ onCompletionTimestamp = Long.parseLong(combineId);
+ globalCount = 0;
+
+ this.globalCountQueue = globalCountQueue;
+ }
+
+ @Override
+ public void combine(State state) {
+ globalCount += ((CountState) state).getCount();
+ }
+
+ @Override
+ public void onComplete() {
+ globalCountQueue.add(new Pair<>(onCompletionTimestamp, globalCount));
+ }
+}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/operator/Operator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/operator/Operator.java
new file mode 100644
index 0000000..885c565
--- /dev/null
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/operator/Operator.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.processor.twostage.operator;
+
+import org.apache.iotdb.db.pipe.processor.twostage.state.State;
+
+public interface Operator {
+
+ void combine(State state);
+
+ void onComplete();
+}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/plugin/TwoStageCountProcessor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/plugin/TwoStageCountProcessor.java
new file mode 100644
index 0000000..cbc14d9
--- /dev/null
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/plugin/TwoStageCountProcessor.java
@@ -0,0 +1,388 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.processor.twostage.plugin;
+
+import org.apache.iotdb.commons.consensus.index.ProgressIndex;
+import org.apache.iotdb.commons.consensus.index.impl.MinimumProgressIndex;
+import org.apache.iotdb.commons.consensus.index.impl.StateProgressIndex;
+import org.apache.iotdb.commons.exception.IllegalPathException;
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.commons.pipe.config.constant.PipeProcessorConstant;
+import org.apache.iotdb.commons.pipe.config.plugin.env.PipeTaskProcessorRuntimeEnvironment;
+import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
+import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
+import org.apache.iotdb.commons.utils.PathUtils;
+import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent;
+import org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertionEvent;
+import org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
+import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
+import org.apache.iotdb.db.pipe.event.common.watermark.PipeWatermarkEvent;
+import org.apache.iotdb.db.pipe.processor.twostage.combiner.PipeCombineHandlerManager;
+import org.apache.iotdb.db.pipe.processor.twostage.exchange.payload.CombineRequest;
+import org.apache.iotdb.db.pipe.processor.twostage.exchange.payload.FetchCombineResultRequest;
+import org.apache.iotdb.db.pipe.processor.twostage.exchange.payload.FetchCombineResultResponse;
+import org.apache.iotdb.db.pipe.processor.twostage.exchange.sender.TwoStageAggregateSender;
+import org.apache.iotdb.db.pipe.processor.twostage.operator.CountOperator;
+import org.apache.iotdb.db.pipe.processor.twostage.state.CountState;
+import org.apache.iotdb.pipe.api.PipeProcessor;
+import org.apache.iotdb.pipe.api.collector.EventCollector;
+import org.apache.iotdb.pipe.api.customizer.configuration.PipeProcessorRuntimeConfiguration;
+import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator;
+import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
+import org.apache.iotdb.pipe.api.event.Event;
+import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
+import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
+import org.apache.iotdb.pipe.api.exception.PipeException;
+import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.service.rpc.thrift.TPipeTransferResp;
+
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.utils.Binary;
+import org.apache.tsfile.utils.Pair;
+import org.apache.tsfile.write.record.Tablet;
+import org.apache.tsfile.write.schema.MeasurementSchema;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+
+public class TwoStageCountProcessor implements PipeProcessor {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(TwoStageCountProcessor.class);
+
+ private String pipeName;
+ private long creationTime;
+ private int regionId;
+ private PipeTaskMeta pipeTaskMeta;
+
+ private PartialPath outputSeries;
+
+ private static final String LOCAL_COUNT_STATE_KEY = "count";
+ private final AtomicLong localCount = new AtomicLong(0);
+ private final AtomicReference<ProgressIndex> localCommitProgressIndex =
+ new AtomicReference<>(MinimumProgressIndex.INSTANCE);
+
+ private final Queue<Pair<long[], ProgressIndex> /* ([timestamp, local count], progress index) */>
+ localRequestQueue = new ConcurrentLinkedQueue<>();
+ private final Queue<Pair<long[], ProgressIndex> /* ([timestamp, local count], progress index) */>
+ localCommitQueue = new ConcurrentLinkedQueue<>();
+
+ private TwoStageAggregateSender twoStageAggregateSender;
+ private final Queue<Pair<Long, Long> /* (timestamp, global count) */> globalCountQueue =
+ new ConcurrentLinkedQueue<>();
+
+ @Override
+ public void validate(PipeParameterValidator validator) throws Exception {
+ validator.validateRequiredAttribute(PipeProcessorConstant.PROCESSOR_OUTPUT_SERIES_KEY);
+
+ final String rawOutputSeries =
+ validator.getParameters().getString(PipeProcessorConstant.PROCESSOR_OUTPUT_SERIES_KEY);
+ try {
+ PathUtils.isLegalPath(rawOutputSeries);
+ } catch (IllegalPathException e) {
+ throw new IllegalArgumentException("Illegal output series path: " + rawOutputSeries);
+ }
+ }
+
+ @Override
+ public void customize(PipeParameters parameters, PipeProcessorRuntimeConfiguration configuration)
+ throws Exception {
+ final PipeTaskProcessorRuntimeEnvironment runtimeEnvironment =
+ (PipeTaskProcessorRuntimeEnvironment) configuration.getRuntimeEnvironment();
+ pipeName = runtimeEnvironment.getPipeName();
+ creationTime = runtimeEnvironment.getCreationTime();
+ regionId = runtimeEnvironment.getRegionId();
+ pipeTaskMeta = runtimeEnvironment.getPipeTaskMeta();
+
+ outputSeries =
+ new PartialPath(parameters.getString(PipeProcessorConstant.PROCESSOR_OUTPUT_SERIES_KEY));
+
+ if (Objects.nonNull(pipeTaskMeta) && Objects.nonNull(pipeTaskMeta.getProgressIndex())) {
+ if (pipeTaskMeta.getProgressIndex() instanceof MinimumProgressIndex) {
+ pipeTaskMeta.updateProgressIndex(
+ new StateProgressIndex(Long.MIN_VALUE, new HashMap<>(), MinimumProgressIndex.INSTANCE));
+ }
+
+ final StateProgressIndex stateProgressIndex =
+ (StateProgressIndex) pipeTaskMeta.getProgressIndex();
+ localCommitProgressIndex.set(stateProgressIndex.getInnerProgressIndex());
+ final Binary localCountState = stateProgressIndex.getState().get(LOCAL_COUNT_STATE_KEY);
+ localCount.set(
+ Objects.isNull(localCountState) ? 0 : Long.parseLong(localCountState.toString()));
+ }
+ LOGGER.info(
+ "TwoStageCountProcessor customized by thread {}: pipeName={}, creationTime={}, regionId={}, outputSeries={}, "
+ + "localCommitProgressIndex={}, localCount={}",
+ Thread.currentThread().getName(),
+ pipeName,
+ creationTime,
+ regionId,
+ outputSeries,
+ localCommitProgressIndex.get(),
+ localCount.get());
+
+ PipeCombineHandlerManager.getInstance()
+ .register(
+ pipeName, creationTime, (combineId) -> new CountOperator(combineId, globalCountQueue));
+ twoStageAggregateSender = new TwoStageAggregateSender(pipeName, creationTime);
+ }
+
+ @Override
+ public void process(TabletInsertionEvent tabletInsertionEvent, EventCollector eventCollector)
+ throws Exception {
+ if (!(tabletInsertionEvent instanceof PipeInsertNodeTabletInsertionEvent)
+ && !(tabletInsertionEvent instanceof PipeRawTabletInsertionEvent)) {
+ LOGGER.warn(
+ "Ignored TabletInsertionEvent is not an instance of PipeInsertNodeTabletInsertionEvent or PipeRawTabletInsertionEvent: {}",
+ tabletInsertionEvent);
+ return;
+ }
+
+ final EnrichedEvent event = (EnrichedEvent) tabletInsertionEvent;
+ event.skipReportOnCommit();
+
+ final long count =
+ (event instanceof PipeInsertNodeTabletInsertionEvent)
+ ? ((PipeInsertNodeTabletInsertionEvent) event).count()
+ : ((PipeRawTabletInsertionEvent) event).count();
+ localCount.accumulateAndGet(count, Long::sum);
+
+ localCommitProgressIndex.set(
+ localCommitProgressIndex
+ .get()
+ .updateToMinimumEqualOrIsAfterProgressIndex(event.getProgressIndex()));
+ }
+
+ @Override
+ public void process(TsFileInsertionEvent tsFileInsertionEvent, EventCollector eventCollector)
+ throws Exception {
+ if (!(tsFileInsertionEvent instanceof PipeTsFileInsertionEvent)) {
+ LOGGER.warn(
+ "Ignored TsFileInsertionEvent is not an instance of PipeTsFileInsertionEvent: {}",
+ tsFileInsertionEvent);
+ return;
+ }
+
+ final PipeTsFileInsertionEvent event = (PipeTsFileInsertionEvent) tsFileInsertionEvent;
+ event.skipReportOnCommit();
+
+ if (!event.waitForTsFileClose()) {
+ LOGGER.warn("Ignored TsFileInsertionEvent is empty: {}", event);
+ return;
+ }
+
+ final long count = event.count(true);
+ localCount.accumulateAndGet(count, Long::sum);
+
+ localCommitProgressIndex.set(
+ localCommitProgressIndex
+ .get()
+ .updateToMinimumEqualOrIsAfterProgressIndex(event.getProgressIndex()));
+ }
+
+ @Override
+ public void process(Event event, EventCollector eventCollector) throws Exception {
+ if (event instanceof PipeHeartbeatEvent) {
+ collectGlobalCountIfNecessary(eventCollector);
+ commitLocalProgressIndexIfNecessary();
+ triggerCombineIfNecessary();
+ }
+
+ if (event instanceof PipeWatermarkEvent) {
+ triggerCombine(
+ new Pair<>(
+ new long[] {((PipeWatermarkEvent) event).getWatermark(), localCount.get()},
+ localCommitProgressIndex.get()));
+ }
+ }
+
+ private void collectGlobalCountIfNecessary(EventCollector eventCollector) throws IOException {
+ while (!globalCountQueue.isEmpty()) {
+ final Object lastCombinedValue =
+ PipeCombineHandlerManager.getInstance().getLastCombinedValue(pipeName, creationTime);
+ final Pair<Long, Long> lastCollectedTimestampCountPair =
+ Objects.isNull(lastCombinedValue)
+ ? new Pair<>(Long.MIN_VALUE, 0L)
+ : (Pair<Long, Long>) lastCombinedValue;
+
+ final Pair<Long, Long> timestampCountPair = globalCountQueue.poll();
+ if (timestampCountPair.right < lastCollectedTimestampCountPair.right) {
+ timestampCountPair.right = lastCollectedTimestampCountPair.right;
+ LOGGER.warn(
+ "Global count is less than the last collected count: timestamp={}, count={}",
+ timestampCountPair.left,
+ timestampCountPair.right);
+ }
+
+ final Tablet tablet =
+ new Tablet(
+ outputSeries.getDevice(),
+ Collections.singletonList(
+ new MeasurementSchema(outputSeries.getMeasurement(), TSDataType.INT64)),
+ 1);
+ tablet.rowSize = 1;
+ tablet.addTimestamp(0, timestampCountPair.left);
+ tablet.addValue(outputSeries.getMeasurement(), 0, timestampCountPair.right);
+
+ eventCollector.collect(
+ new PipeRawTabletInsertionEvent(tablet, false, null, null, null, false));
+
+ PipeCombineHandlerManager.getInstance()
+ .updateLastCombinedValue(pipeName, creationTime, timestampCountPair);
+ }
+ }
+
+ private void commitLocalProgressIndexIfNecessary() {
+ final int currentQueueSize = localCommitQueue.size();
+ for (int i = 0; i < currentQueueSize; i++) {
+ final Pair<long[], ProgressIndex> pair = localCommitQueue.poll();
+ if (Objects.isNull(pair)) {
+ break;
+ }
+
+ try {
+ // TODO: optimize the combine result fetching with batch fetching
+ final FetchCombineResultResponse fetchCombineResultResponse =
+ FetchCombineResultResponse.fromTPipeTransferResp(
+ twoStageAggregateSender.request(
+ pair.left[0],
+ FetchCombineResultRequest.toTPipeTransferReq(
+ pipeName,
+ creationTime,
+ Collections.singletonList(Long.toString(pair.left[0])))));
+
+ if (fetchCombineResultResponse.getStatus().getCode()
+ != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ throw new PipeException(
+ "Failed to fetch combine result: "
+ + fetchCombineResultResponse.getStatus().getMessage());
+ }
+
+ for (final Map.Entry<String, FetchCombineResultResponse.CombineResultType> entry :
+ fetchCombineResultResponse.getCombineId2ResultType().entrySet()) {
+ final String combineId = entry.getKey();
+ final FetchCombineResultResponse.CombineResultType resultType = entry.getValue();
+
+ switch (resultType) {
+ case OUTDATED:
+ LOGGER.warn(
+ "Two stage combine (region id = {}, combine id = {}) outdated: timestamp={}, count={}, progressIndex={}",
+ regionId,
+ combineId,
+ pair.left[0],
+ pair.left[1],
+ pair.right);
+ continue;
+ case INCOMPLETE:
+ LOGGER.info(
+ "Two stage combine (region id = {}, combine id = {}) incomplete: timestamp={}, count={}, progressIndex={}",
+ regionId,
+ combineId,
+ pair.left[0],
+ pair.left[1],
+ pair.right);
+ localCommitQueue.add(pair);
+ continue;
+ case SUCCESS:
+ final Map<String, Binary> state = new HashMap<>();
+ state.put(LOCAL_COUNT_STATE_KEY, new Binary(Long.toString(pair.left[1]).getBytes()));
+ pipeTaskMeta.updateProgressIndex(
+ new StateProgressIndex(pair.left[0], state, pair.right));
+ LOGGER.info(
+ "Two stage combine (region id = {}, combine id = {}) success: timestamp={}, count={}, progressIndex={}, committed progressIndex={}",
+ regionId,
+ combineId,
+ pair.left[0],
+ pair.left[1],
+ pair.right,
+ pipeTaskMeta.getProgressIndex());
+ continue;
+ default:
+ throw new PipeException("Unknown combine result type: " + resultType);
+ }
+ }
+ } catch (Exception e) {
+ localCommitQueue.add(pair);
+ LOGGER.warn(
+ "Failure occurred when trying to commit progress index. timestamp={}, count={}, progressIndex={}",
+ pair.left[0],
+ pair.left[1],
+ pair.right,
+ e);
+ return;
+ }
+ }
+ }
+
+ private void triggerCombineIfNecessary() {
+ while (!localRequestQueue.isEmpty()) {
+ if (!triggerCombine(localRequestQueue.poll())) {
+ return;
+ }
+ }
+ }
+
+ private boolean triggerCombine(Pair<long[], ProgressIndex> pair) {
+ final long watermark = pair.getLeft()[0];
+ final long count = pair.getLeft()[1];
+ final ProgressIndex progressIndex = pair.getRight();
+ try {
+ final TPipeTransferResp resp =
+ twoStageAggregateSender.request(
+ watermark,
+ CombineRequest.toTPipeTransferReq(
+ pipeName,
+ creationTime,
+ regionId,
+ Long.toString(watermark),
+ new CountState(count)));
+ if (resp.getStatus().getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ throw new PipeException("Failed to combine count: " + resp.getStatus().getMessage());
+ }
+ localCommitQueue.add(pair);
+ return true;
+ } catch (Exception e) {
+ localRequestQueue.add(pair);
+ LOGGER.warn(
+ "Failed to trigger combine. watermark={}, count={}, progressIndex={}",
+ watermark,
+ count,
+ progressIndex,
+ e);
+ return false;
+ }
+ }
+
+ @Override
+ public void close() throws Exception {
+ if (Objects.nonNull(twoStageAggregateSender)) {
+ twoStageAggregateSender.close();
+ }
+ PipeCombineHandlerManager.getInstance().deregister(pipeName, creationTime);
+ }
+}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/state/CountState.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/state/CountState.java
new file mode 100644
index 0000000..0370131
--- /dev/null
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/state/CountState.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.processor.twostage.state;
+
+import org.apache.tsfile.utils.ReadWriteIOUtils;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+
+public class CountState implements State {
+
+ private long count;
+
+ public CountState() {
+ // For reflection
+ }
+
+ public CountState(long count) {
+ this.count = count;
+ }
+
+ public long getCount() {
+ return count;
+ }
+
+ @Override
+ public void serialize(OutputStream outputStream) throws IOException {
+ ReadWriteIOUtils.write(count, outputStream);
+ }
+
+ @Override
+ public void deserialize(ByteBuffer byteBuffer) {
+ count = ReadWriteIOUtils.readLong(byteBuffer);
+ }
+}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/state/State.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/state/State.java
new file mode 100644
index 0000000..36d393e
--- /dev/null
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/state/State.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.processor.twostage.state;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+
+public interface State {
+
+ void serialize(OutputStream outputStream) throws IOException;
+
+ void deserialize(ByteBuffer byteBuffer);
+}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
index ea8a1ef..d8b4c12 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
@@ -201,8 +201,9 @@
receiverId.get(),
status);
return new TPipeTransferResp(status);
- } catch (final IOException e) {
- final String error = String.format("Serialization error during pipe receiving, %s", e);
+ } catch (Exception e) {
+ final String error =
+ String.format("Exception %s encountered while handling request %s.", e.getMessage(), req);
LOGGER.warn("Receiver id = {}: {}", receiverId.get(), error, e);
return new TPipeTransferResp(RpcUtils.getStatus(TSStatusCode.PIPE_ERROR, error));
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiverAgent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiverAgent.java
index 932123a..e0016ec 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiverAgent.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiverAgent.java
@@ -22,6 +22,7 @@
import org.apache.iotdb.commons.pipe.connector.payload.thrift.request.IoTDBConnectorRequestVersion;
import org.apache.iotdb.commons.pipe.receiver.IoTDBReceiver;
import org.apache.iotdb.commons.pipe.receiver.IoTDBReceiverAgent;
+import org.apache.iotdb.db.pipe.processor.twostage.exchange.receiver.TwoStageAggregateReceiver;
public class IoTDBDataNodeReceiverAgent extends IoTDBReceiverAgent {
@@ -31,6 +32,8 @@
protected void initConstructors() {
RECEIVER_CONSTRUCTORS.put(
IoTDBConnectorRequestVersion.VERSION_1.getVersion(), IoTDBDataNodeReceiver::new);
+ RECEIVER_CONSTRUCTORS.put(
+ IoTDBConnectorRequestVersion.VERSION_2.getVersion(), TwoStageAggregateReceiver::new);
}
@Override
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
index 5057ea0..4e4e76e 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
@@ -229,6 +229,10 @@
private long pipeListeningQueueTransferSnapshotThreshold = 1000;
private int pipeSnapshotExecutionMaxBatchSize = 1000;
+ private long twoStageAggregateMaxCombinerLiveTimeInMs = 8 * 60 * 1000; // 8 minutes
+ private long twoStageAggregateDataRegionInfoCacheTimeInMs = 3 * 60 * 1000; // 3 minutes
+ private long twoStageAggregateSenderEndPointsCacheInMs = 3 * 60 * 1000; // 3 minutes
+
private int subscriptionSubtaskExecutorMaxThreadNum =
Math.min(5, Math.max(1, Runtime.getRuntime().availableProcessors() / 2));
private int subscriptionMaxTabletsPerPrefetching = 16;
@@ -970,6 +974,34 @@
this.pipeSnapshotExecutionMaxBatchSize = pipeSnapshotExecutionMaxBatchSize;
}
+ public long getTwoStageAggregateMaxCombinerLiveTimeInMs() {
+ return twoStageAggregateMaxCombinerLiveTimeInMs;
+ }
+
+ public void setTwoStageAggregateMaxCombinerLiveTimeInMs(
+ long twoStageAggregateMaxCombinerLiveTimeInMs) {
+ this.twoStageAggregateMaxCombinerLiveTimeInMs = twoStageAggregateMaxCombinerLiveTimeInMs;
+ }
+
+ public long getTwoStageAggregateDataRegionInfoCacheTimeInMs() {
+ return twoStageAggregateDataRegionInfoCacheTimeInMs;
+ }
+
+ public void setTwoStageAggregateDataRegionInfoCacheTimeInMs(
+ long twoStageAggregateDataRegionInfoCacheTimeInMs) {
+ this.twoStageAggregateDataRegionInfoCacheTimeInMs =
+ twoStageAggregateDataRegionInfoCacheTimeInMs;
+ }
+
+ public long getTwoStageAggregateSenderEndPointsCacheInMs() {
+ return twoStageAggregateSenderEndPointsCacheInMs;
+ }
+
+ public void setTwoStageAggregateSenderEndPointsCacheInMs(
+ long twoStageAggregateSenderEndPointsCacheInMs) {
+ this.twoStageAggregateSenderEndPointsCacheInMs = twoStageAggregateSenderEndPointsCacheInMs;
+ }
+
public int getSubscriptionSubtaskExecutorMaxThreadNum() {
return subscriptionSubtaskExecutorMaxThreadNum;
}
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
index f64a79e..04bd5f1 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
@@ -516,11 +516,28 @@
properties.getProperty(
"pipe_listening_queue_transfer_snapshot_threshold",
String.valueOf(config.getPipeListeningQueueTransferSnapshotThreshold()))));
+
config.setPipeSnapshotExecutionMaxBatchSize(
Integer.parseInt(
properties.getProperty(
"pipe_snapshot_execution_max_batch_size",
String.valueOf(config.getPipeSnapshotExecutionMaxBatchSize()))));
+
+ config.setTwoStageAggregateMaxCombinerLiveTimeInMs(
+ Long.parseLong(
+ properties.getProperty(
+ "two_stage_aggregate_max_combiner_live_time_in_ms",
+ String.valueOf(config.getTwoStageAggregateMaxCombinerLiveTimeInMs()))));
+ config.setTwoStageAggregateDataRegionInfoCacheTimeInMs(
+ Long.parseLong(
+ properties.getProperty(
+ "two_stage_aggregate_data_region_info_cache_time_in_ms",
+ String.valueOf(config.getTwoStageAggregateDataRegionInfoCacheTimeInMs()))));
+ config.setTwoStageAggregateSenderEndPointsCacheInMs(
+ Long.parseLong(
+ properties.getProperty(
+ "two_stage_aggregate_sender_end_points_cache_in_ms",
+ String.valueOf(config.getTwoStageAggregateSenderEndPointsCacheInMs()))));
}
private void loadSubscriptionProps(Properties properties) {
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/ProgressIndex.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/ProgressIndex.java
index 494d42a..ad82065 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/ProgressIndex.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/ProgressIndex.java
@@ -21,6 +21,7 @@
import org.apache.iotdb.commons.consensus.index.impl.HybridProgressIndex;
import org.apache.iotdb.commons.consensus.index.impl.MinimumProgressIndex;
+import org.apache.iotdb.commons.consensus.index.impl.StateProgressIndex;
import com.google.common.collect.ImmutableList;
@@ -169,8 +170,10 @@
return progressIndex1; // progressIndex1 is not null
}
- return new HybridProgressIndex(progressIndex1)
- .updateToMinimumEqualOrIsAfterProgressIndex(progressIndex2);
+ return progressIndex1 instanceof StateProgressIndex
+ ? progressIndex1.updateToMinimumEqualOrIsAfterProgressIndex(progressIndex2)
+ : new HybridProgressIndex(progressIndex1)
+ .updateToMinimumEqualOrIsAfterProgressIndex(progressIndex2);
}
/**
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/ProgressIndexType.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/ProgressIndexType.java
index c5c2ed6..58548e1 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/ProgressIndexType.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/ProgressIndexType.java
@@ -25,6 +25,7 @@
import org.apache.iotdb.commons.consensus.index.impl.MinimumProgressIndex;
import org.apache.iotdb.commons.consensus.index.impl.RecoverProgressIndex;
import org.apache.iotdb.commons.consensus.index.impl.SimpleProgressIndex;
+import org.apache.iotdb.commons.consensus.index.impl.StateProgressIndex;
import org.apache.iotdb.commons.consensus.index.impl.TimeWindowStateProgressIndex;
import org.apache.tsfile.utils.ReadWriteIOUtils;
@@ -42,7 +43,7 @@
HYBRID_PROGRESS_INDEX((short) 5),
META_PROGRESS_INDEX((short) 6),
TIME_WINDOW_STATE_PROGRESS_INDEX((short) 7),
- ;
+ STATE_PROGRESS_INDEX((short) 8);
private final short type;
@@ -79,6 +80,8 @@
return MetaProgressIndex.deserializeFrom(byteBuffer);
case 7:
return TimeWindowStateProgressIndex.deserializeFrom(byteBuffer);
+ case 8:
+ return StateProgressIndex.deserializeFrom(byteBuffer);
default:
throw new UnsupportedOperationException(
String.format("Unsupported progress index type %s.", indexType));
@@ -102,6 +105,8 @@
return MetaProgressIndex.deserializeFrom(stream);
case 7:
return TimeWindowStateProgressIndex.deserializeFrom(stream);
+ case 8:
+ return StateProgressIndex.deserializeFrom(stream);
default:
throw new UnsupportedOperationException(
String.format("Unsupported progress index type %s.", indexType));
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/HybridProgressIndex.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/HybridProgressIndex.java
index c3955ce..8e4541e 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/HybridProgressIndex.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/HybridProgressIndex.java
@@ -173,6 +173,10 @@
return this;
}
+ if (progressIndex instanceof StateProgressIndex) {
+ return progressIndex.updateToMinimumEqualOrIsAfterProgressIndex(this);
+ }
+
if (!(progressIndex instanceof HybridProgressIndex)) {
type2Index.compute(
progressIndex.getType().getType(),
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/MetaProgressIndex.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/MetaProgressIndex.java
index 8fb6c26..679eecb 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/MetaProgressIndex.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/MetaProgressIndex.java
@@ -138,7 +138,7 @@
lock.writeLock().lock();
try {
if (!(progressIndex instanceof MetaProgressIndex)) {
- return this;
+ return ProgressIndex.blendProgressIndex(this, progressIndex);
}
this.index = Math.max(this.index, ((MetaProgressIndex) progressIndex).index);
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/StateProgressIndex.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/StateProgressIndex.java
new file mode 100644
index 0000000..8b44edf
--- /dev/null
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/StateProgressIndex.java
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.consensus.index.impl;
+
+import org.apache.iotdb.commons.consensus.index.ProgressIndex;
+import org.apache.iotdb.commons.consensus.index.ProgressIndexType;
+
+import org.apache.tsfile.utils.Binary;
+import org.apache.tsfile.utils.ReadWriteIOUtils;
+
+import javax.annotation.Nonnull;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+public class StateProgressIndex extends ProgressIndex {
+
+ private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+
+ private long version;
+ private Map<String, Binary> state;
+ private ProgressIndex innerProgressIndex;
+
+ public StateProgressIndex(
+ long version, Map<String, Binary> state, ProgressIndex innerProgressIndex) {
+ this.version = version;
+ this.state = state;
+ this.innerProgressIndex = innerProgressIndex;
+ }
+
+ public long getVersion() {
+ return version;
+ }
+
+ public ProgressIndex getInnerProgressIndex() {
+ return innerProgressIndex == null ? MinimumProgressIndex.INSTANCE : innerProgressIndex;
+ }
+
+ public Map<String, Binary> getState() {
+ return state;
+ }
+
+ @Override
+ public void serialize(ByteBuffer byteBuffer) {
+ lock.readLock().lock();
+ try {
+ ProgressIndexType.STATE_PROGRESS_INDEX.serialize(byteBuffer);
+
+ ReadWriteIOUtils.write(version, byteBuffer);
+
+ ReadWriteIOUtils.write(state.size(), byteBuffer);
+ for (final Map.Entry<String, Binary> entry : state.entrySet()) {
+ ReadWriteIOUtils.write(entry.getKey(), byteBuffer);
+ ReadWriteIOUtils.write(entry.getValue(), byteBuffer);
+ }
+
+ innerProgressIndex.serialize(byteBuffer);
+ } finally {
+ lock.readLock().unlock();
+ }
+ }
+
+ @Override
+ public void serialize(OutputStream stream) throws IOException {
+ lock.readLock().lock();
+ try {
+ ProgressIndexType.STATE_PROGRESS_INDEX.serialize(stream);
+
+ ReadWriteIOUtils.write(version, stream);
+
+ ReadWriteIOUtils.write(state.size(), stream);
+ for (final Map.Entry<String, Binary> entry : state.entrySet()) {
+ ReadWriteIOUtils.write(entry.getKey(), stream);
+ ReadWriteIOUtils.write(entry.getValue(), stream);
+ }
+
+ innerProgressIndex.serialize(stream);
+ } finally {
+ lock.readLock().unlock();
+ }
+ }
+
+ @Override
+ public boolean isAfter(@Nonnull ProgressIndex progressIndex) {
+ lock.readLock().lock();
+ try {
+ if (progressIndex instanceof MinimumProgressIndex) {
+ return innerProgressIndex.isAfter(progressIndex);
+ }
+
+ if (progressIndex instanceof HybridProgressIndex) {
+ return ((HybridProgressIndex) progressIndex)
+ .isGivenProgressIndexAfterSelf(innerProgressIndex);
+ }
+
+ if (!(progressIndex instanceof StateProgressIndex)) {
+ return false;
+ }
+
+ return innerProgressIndex.isAfter(((StateProgressIndex) progressIndex).innerProgressIndex)
+ && version > ((StateProgressIndex) progressIndex).version;
+ } finally {
+ lock.readLock().unlock();
+ }
+ }
+
+ @Override
+ public boolean equals(ProgressIndex progressIndex) {
+ lock.readLock().lock();
+ try {
+ return progressIndex instanceof StateProgressIndex
+ && innerProgressIndex.equals(((StateProgressIndex) progressIndex).innerProgressIndex)
+ && version == ((StateProgressIndex) progressIndex).version;
+ } finally {
+ lock.readLock().unlock();
+ }
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (obj == null) {
+ return false;
+ }
+ if (this == obj) {
+ return true;
+ }
+ if (!(obj instanceof StateProgressIndex)) {
+ return false;
+ }
+ return this.equals((StateProgressIndex) obj);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(innerProgressIndex, version);
+ }
+
+ @Override
+ public ProgressIndex updateToMinimumEqualOrIsAfterProgressIndex(ProgressIndex progressIndex) {
+ lock.writeLock().lock();
+ try {
+ innerProgressIndex =
+ innerProgressIndex.updateToMinimumEqualOrIsAfterProgressIndex(
+ progressIndex instanceof StateProgressIndex
+ ? ((StateProgressIndex) progressIndex).innerProgressIndex
+ : progressIndex);
+ if (progressIndex instanceof StateProgressIndex
+ && version <= ((StateProgressIndex) progressIndex).version) {
+ version = ((StateProgressIndex) progressIndex).version;
+ state = ((StateProgressIndex) progressIndex).state;
+ }
+ return this;
+ } finally {
+ lock.writeLock().unlock();
+ }
+ }
+
+ @Override
+ public ProgressIndexType getType() {
+ return ProgressIndexType.STATE_PROGRESS_INDEX;
+ }
+
+ @Override
+ public TotalOrderSumTuple getTotalOrderSumTuple() {
+ return innerProgressIndex.getTotalOrderSumTuple();
+ }
+
+ public static StateProgressIndex deserializeFrom(ByteBuffer byteBuffer) {
+ final long version = ReadWriteIOUtils.readLong(byteBuffer);
+
+ final Map<String, Binary> state = new HashMap<>();
+ final int size = ReadWriteIOUtils.readInt(byteBuffer);
+ for (int i = 0; i < size; ++i) {
+ final String key = ReadWriteIOUtils.readString(byteBuffer);
+ final Binary value = ReadWriteIOUtils.readBinary(byteBuffer);
+ state.put(key, value);
+ }
+
+ final ProgressIndex progressIndex = ProgressIndexType.deserializeFrom(byteBuffer);
+
+ return new StateProgressIndex(version, state, progressIndex);
+ }
+
+ public static StateProgressIndex deserializeFrom(InputStream stream) throws IOException {
+ final long version = ReadWriteIOUtils.readLong(stream);
+
+ final Map<String, Binary> state = new HashMap<>();
+ final int size = ReadWriteIOUtils.readInt(stream);
+ for (int i = 0; i < size; ++i) {
+ final String key = ReadWriteIOUtils.readString(stream);
+ final Binary value = ReadWriteIOUtils.readBinary(stream);
+ state.put(key, value);
+ }
+
+ final ProgressIndex progressIndex = ProgressIndexType.deserializeFrom(stream);
+
+ return new StateProgressIndex(version, state, progressIndex);
+ }
+
+ @Override
+ public String toString() {
+ return "StateProgressIndex{"
+ + "version="
+ + version
+ + ", state="
+ + state
+ + ", innerProgressIndex="
+ + innerProgressIndex
+ + '}';
+ }
+}
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
index 6fab25c..7a0391d 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
@@ -245,6 +245,20 @@
return COMMON_CONFIG.getPipeMemoryExpanderIntervalSeconds();
}
+ /////////////////////////////// TwoStage ///////////////////////////////
+
+ public long getTwoStageAggregateMaxCombinerLiveTimeInMs() {
+ return COMMON_CONFIG.getTwoStageAggregateMaxCombinerLiveTimeInMs();
+ }
+
+ public long getTwoStageAggregateDataRegionInfoCacheTimeInMs() {
+ return COMMON_CONFIG.getTwoStageAggregateDataRegionInfoCacheTimeInMs();
+ }
+
+ public long getTwoStageAggregateSenderEndPointsCacheInMs() {
+ return COMMON_CONFIG.getTwoStageAggregateSenderEndPointsCacheInMs();
+ }
+
/////////////////////////////// Utils ///////////////////////////////
private static final Logger LOGGER = LoggerFactory.getLogger(PipeConfig.class);
@@ -337,6 +351,17 @@
LOGGER.info(
"PipeMemoryAllocateForTsFileSequenceReaderInBytes: {}",
getPipeMemoryAllocateForTsFileSequenceReaderInBytes());
+ LOGGER.info("PipeMemoryExpanderIntervalSeconds: {}", getPipeMemoryExpanderIntervalSeconds());
+
+ LOGGER.info(
+ "TwoStageAggregateMaxCombinerLiveTimeInMs: {}",
+ getTwoStageAggregateMaxCombinerLiveTimeInMs());
+ LOGGER.info(
+ "TwoStageAggregateDataRegionInfoCacheTimeInMs: {}",
+ getTwoStageAggregateDataRegionInfoCacheTimeInMs());
+ LOGGER.info(
+ "TwoStageAggregateSenderEndPointsCacheInMs: {}",
+ getTwoStageAggregateSenderEndPointsCacheInMs());
}
/////////////////////////////// Singleton ///////////////////////////////
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeExtractorConstant.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeExtractorConstant.java
index 80c102a..05ed098 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeExtractorConstant.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeExtractorConstant.java
@@ -79,6 +79,10 @@
public static final String EXTRACTOR_END_TIME_KEY = "extractor.end-time";
public static final String SOURCE_END_TIME_KEY = "source.end-time";
+ public static final String EXTRACTOR_WATERMARK_INTERVAL_KEY = "extractor.watermark-interval-ms";
+ public static final String SOURCE_WATERMARK_INTERVAL_KEY = "source.watermark-interval-ms";
+ public static final long EXTRACTOR_WATERMARK_INTERVAL_DEFAULT_VALUE = -1; // -1 means no watermark
+
private PipeExtractorConstant() {
throw new IllegalStateException("Utility class");
}
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeProcessorConstant.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeProcessorConstant.java
index 719fa3c..49fa2e2 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeProcessorConstant.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeProcessorConstant.java
@@ -66,6 +66,8 @@
"processor.sdt.max-time-interval";
public static final long PROCESSOR_SDT_MAX_TIME_INTERVAL_DEFAULT_VALUE = Long.MAX_VALUE;
+ public static final String PROCESSOR_OUTPUT_SERIES_KEY = "processor.output-series";
+
private PipeProcessorConstant() {
throw new IllegalStateException("Utility class");
}
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/payload/thrift/request/IoTDBConnectorRequestVersion.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/payload/thrift/request/IoTDBConnectorRequestVersion.java
index 9f853dd..74219ef 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/payload/thrift/request/IoTDBConnectorRequestVersion.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/payload/thrift/request/IoTDBConnectorRequestVersion.java
@@ -21,6 +21,7 @@
public enum IoTDBConnectorRequestVersion {
VERSION_1((byte) 1),
+ VERSION_2((byte) 2),
;
private final byte version;
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/BuiltinPipePlugin.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/BuiltinPipePlugin.java
index a7fc2da..511282f 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/BuiltinPipePlugin.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/BuiltinPipePlugin.java
@@ -38,6 +38,7 @@
import org.apache.iotdb.commons.pipe.plugin.builtin.processor.downsampling.SwingingDoorTrendingSamplingProcessor;
import org.apache.iotdb.commons.pipe.plugin.builtin.processor.downsampling.TumblingTimeSamplingProcessor;
import org.apache.iotdb.commons.pipe.plugin.builtin.processor.throwing.ThrowingExceptionProcessor;
+import org.apache.iotdb.commons.pipe.plugin.builtin.processor.twostage.TwoStageCountProcessor;
import java.util.Arrays;
import java.util.Collections;
@@ -60,6 +61,7 @@
SDT_SAMPLING_PROCESSOR("sdt-sampling-processor", SwingingDoorTrendingSamplingProcessor.class),
THROWING_EXCEPTION_PROCESSOR("throwing-exception-processor", ThrowingExceptionProcessor.class),
AGGREGATE_PROCESSOR("aggregate-processor", AggregateProcessor.class),
+ COUNT_POINT_PROCESSOR("count-point-processor", TwoStageCountProcessor.class),
// Hidden-processors, which are plugins of the processors
STANDARD_STATISTICS_PROCESSOR("standard-statistics-processor", StandardStatisticsProcessor.class),
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/processor/twostage/TwoStageCountProcessor.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/processor/twostage/TwoStageCountProcessor.java
new file mode 100644
index 0000000..b8bf577
--- /dev/null
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/processor/twostage/TwoStageCountProcessor.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.pipe.plugin.builtin.processor.twostage;
+
+import org.apache.iotdb.commons.pipe.plugin.builtin.processor.PlaceHolderProcessor;
+
+/**
+ * This class is a placeholder and should not be initialized. It represents the
+ * TwoStageCountProcessor. There is a real implementation in the server module but cannot be
+ * imported here. The pipe agent in the server module will replace this class with the real
+ * implementation when initializing the TwoStageCountProcessor.
+ */
+public class TwoStageCountProcessor extends PlaceHolderProcessor {}
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java
index 91b8cbe..c6c7f12 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java
@@ -99,7 +99,7 @@
"Receiver id = {}: Original receiver file dir {} was deleted.",
receiverId.get(),
receiverFileDirWithIdSuffix.get().getPath());
- } catch (IOException e) {
+ } catch (Exception e) {
LOGGER.warn(
"Receiver id = {}: Failed to delete original receiver file dir {}, because {}.",
receiverId.get(),
@@ -278,7 +278,7 @@
try {
return PipeTransferFilePieceResp.toTPipeTransferResp(
status, PipeTransferFilePieceResp.ERROR_END_OFFSET);
- } catch (IOException ex) {
+ } catch (Exception ex) {
return PipeTransferFilePieceResp.toTPipeTransferResp(status);
}
}
@@ -340,7 +340,7 @@
"Receiver id = {}: Current writing file writer {} was closed.",
receiverId.get(),
writingFile == null ? "null" : writingFile.getPath());
- } catch (IOException e) {
+ } catch (Exception e) {
LOGGER.warn(
"Receiver id = {}: Failed to close current writing file writer {}, because {}.",
receiverId.get(),
@@ -377,7 +377,7 @@
"Receiver id = {}: Original writing file {} was deleted.",
receiverId.get(),
file.getPath());
- } catch (IOException e) {
+ } catch (Exception e) {
LOGGER.warn(
"Receiver id = {}: Failed to delete original writing file {}, because {}.",
receiverId.get(),
@@ -454,7 +454,7 @@
status.getMessage());
}
return new TPipeTransferResp(status);
- } catch (IOException e) {
+ } catch (Exception e) {
LOGGER.warn(
"Receiver id = {}: Failed to seal file {} from req {}.",
receiverId.get(),
@@ -534,7 +534,7 @@
status);
}
return new TPipeTransferResp(status);
- } catch (IOException | IllegalPathException e) {
+ } catch (Exception e) {
LOGGER.warn(
"Receiver id = {}: Failed to seal file {} from req {}.", receiverId.get(), files, req, e);
return new TPipeTransferResp(
@@ -701,7 +701,7 @@
"Receiver id = {}: Handling exit: Original receiver file dir {} was deleted.",
receiverId.get(),
receiverFileDirWithIdSuffix.get().getPath());
- } catch (IOException e) {
+ } catch (Exception e) {
LOGGER.warn(
"Receiver id = {}: Handling exit: Delete original receiver file dir {} error.",
receiverId.get(),