fix
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java index 06d6a51..cce25e7 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
@@ -170,13 +170,11 @@ final PipeParameters sourceParameters = pipeStaticMeta.getSourceParameters(); final DataRegionId dataRegionId = new DataRegionId(consensusGroupId); final boolean needConstructDataRegionTask = - StorageEngine.getInstance().getAllDataRegionIds().contains(dataRegionId) + StorageEngine.getInstance().getDataRegion(dataRegionId) != null && DataRegionListeningFilter.shouldDataRegionBeListened( sourceParameters, dataRegionId); final boolean needConstructSchemaRegionTask = - SchemaEngine.getInstance() - .getAllSchemaRegionIds() - .contains(new SchemaRegionId(consensusGroupId)) + SchemaEngine.getInstance().getSchemaRegion(new SchemaRegionId(consensusGroupId)) != null && SchemaRegionListeningFilter.shouldSchemaRegionBeListened( consensusGroupId, sourceParameters);
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/builder/PipeDataNodeBuilder.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/builder/PipeDataNodeBuilder.java index 46a1013..f4655c7 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/builder/PipeDataNodeBuilder.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/builder/PipeDataNodeBuilder.java
@@ -36,7 +36,6 @@ import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters; import java.util.HashMap; -import java.util.List; import java.util.Map; public class PipeDataNodeBuilder { @@ -53,9 +52,6 @@ final PipeStaticMeta pipeStaticMeta = pipeMeta.getStaticMeta(); final PipeRuntimeMeta pipeRuntimeMeta = pipeMeta.getRuntimeMeta(); - final List<DataRegionId> dataRegionIds = StorageEngine.getInstance().getAllDataRegionIds(); - final List<SchemaRegionId> schemaRegionIds = SchemaEngine.getInstance().getAllSchemaRegionIds(); - final Map<Integer, PipeTask> consensusGroupIdToPipeTaskMap = new HashMap<>(); for (Map.Entry<Integer, PipeTaskMeta> consensusGroupIdToPipeTaskMeta : pipeRuntimeMeta.getConsensusGroupId2TaskMetaMap().entrySet()) { @@ -66,11 +62,11 @@ final PipeParameters sourceParameters = pipeStaticMeta.getSourceParameters(); final DataRegionId dataRegionId = new DataRegionId(consensusGroupId); final boolean needConstructDataRegionTask = - dataRegionIds.contains(dataRegionId) + StorageEngine.getInstance().getDataRegion(dataRegionId) != null && DataRegionListeningFilter.shouldDataRegionBeListened( sourceParameters, dataRegionId); final boolean needConstructSchemaRegionTask = - schemaRegionIds.contains(new SchemaRegionId(consensusGroupId)) + SchemaEngine.getInstance().getSchemaRegion(new SchemaRegionId(consensusGroupId)) != null && SchemaRegionListeningFilter.shouldSchemaRegionBeListened( consensusGroupId, sourceParameters);
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/stage/PipeTaskProcessorStage.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/stage/PipeTaskProcessorStage.java index 2373495..6eff04d 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/stage/PipeTaskProcessorStage.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/stage/PipeTaskProcessorStage.java
@@ -77,7 +77,7 @@ new PipeTaskProcessorRuntimeEnvironment( pipeName, creationTime, regionId, pipeTaskMeta)); final PipeProcessor pipeProcessor = - StorageEngine.getInstance().getAllDataRegionIds().contains(new DataRegionId(regionId)) + StorageEngine.getInstance().getDataRegion(new DataRegionId(regionId)) != null || PipeRuntimeMeta.isSourceExternal(regionId) ? PipeDataNodeAgent.plugin() .dataRegion()
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/stage/PipeTaskSourceStage.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/stage/PipeTaskSourceStage.java index 5f774ce..84f1b35 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/stage/PipeTaskSourceStage.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/stage/PipeTaskSourceStage.java
@@ -49,7 +49,7 @@ int regionId, PipeTaskMeta pipeTaskMeta) { pipeExtractor = - StorageEngine.getInstance().getAllDataRegionIds().contains(new DataRegionId(regionId)) + StorageEngine.getInstance().getDataRegion(new DataRegionId(regionId)) != null || PipeRuntimeMeta.isSourceExternal(regionId) ? PipeDataNodeAgent.plugin().dataRegion().reflectSource(sourceParameters) : PipeDataNodeAgent.plugin().schemaRegion().reflectSource(sourceParameters);
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/IoTDBDataRegionSource.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/IoTDBDataRegionSource.java index a748218..7179f8c 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/IoTDBDataRegionSource.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/IoTDBDataRegionSource.java
@@ -45,6 +45,7 @@ import org.apache.iotdb.db.protocol.session.InternalClientSession; import org.apache.iotdb.db.protocol.session.SessionManager; import org.apache.iotdb.db.storageengine.StorageEngine; +import org.apache.iotdb.db.storageengine.dataregion.DataRegion; import org.apache.iotdb.pipe.api.annotation.TableModel; import org.apache.iotdb.pipe.api.annotation.TreeModel; import org.apache.iotdb.pipe.api.customizer.configuration.PipeExtractorRuntimeConfiguration; @@ -605,19 +606,12 @@ if (StorageEngine.getInstance() .runIfPresent( dataRegionIdObject, - (dataRegion -> { - dataRegion.writeLock( - String.format("Pipe: starting %s", IoTDBDataRegionSource.class.getName())); - try { - startHistoricalExtractorAndRealtimeExtractor(exceptionHolder); - } finally { - dataRegion.writeUnlock(); - } - })) + dataRegion -> + startHistoricalExtractorAndRealtimeExtractor(exceptionHolder, dataRegion)) || StorageEngine.getInstance() .runIfAbsent( dataRegionIdObject, - () -> startHistoricalExtractorAndRealtimeExtractor(exceptionHolder))) { + () -> startHistoricalExtractorAndRealtimeExtractor(exceptionHolder, null))) { rethrowExceptionIfAny(exceptionHolder); LOGGER.info( @@ -634,14 +628,28 @@ } private void startHistoricalExtractorAndRealtimeExtractor( - final AtomicReference<Exception> exceptionHolder) { + final AtomicReference<Exception> exceptionHolder, final DataRegion dataRegion) { try { // Start realtimeSource first to avoid losing data. This may cause some // retransmission, yet it is OK according to the idempotency of IoTDB. // Note: The order of historical collection is flushing data -> adding all tsFile events. // There can still be writing when tsFile events are added. If we start // realtimeSource after the process, then this part of data will be lost. - realtimeSource.start(); + if (Objects.nonNull(dataRegion)) { + dataRegion.writeLock( + String.format("Pipe: starting %s", IoTDBDataRegionSource.class.getName())); + try { + realtimeSource.start(); + } finally { + dataRegion.writeUnlock(); + } + } else { + realtimeSource.start(); + } + + // Historical extraction manages its own narrower region write lock. Keeping the outer lock + // only for realtime-source registration allows the expensive historical sort/materialization + // phase to stay out of the region critical section. historicalSource.start(); } catch (final Exception e) { exceptionHolder.set(e);