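Pipe: check region existence via direct lookup and hold the data region write lock only while starting the realtime source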
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
index 06d6a51..cce25e7 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
@@ -170,13 +170,11 @@
       final PipeParameters sourceParameters = pipeStaticMeta.getSourceParameters();
       final DataRegionId dataRegionId = new DataRegionId(consensusGroupId);
       final boolean needConstructDataRegionTask =
-          StorageEngine.getInstance().getAllDataRegionIds().contains(dataRegionId)
+          StorageEngine.getInstance().getDataRegion(dataRegionId) != null
               && DataRegionListeningFilter.shouldDataRegionBeListened(
                   sourceParameters, dataRegionId);
       final boolean needConstructSchemaRegionTask =
-          SchemaEngine.getInstance()
-                  .getAllSchemaRegionIds()
-                  .contains(new SchemaRegionId(consensusGroupId))
+          SchemaEngine.getInstance().getSchemaRegion(new SchemaRegionId(consensusGroupId)) != null
               && SchemaRegionListeningFilter.shouldSchemaRegionBeListened(
                   consensusGroupId, sourceParameters);
 
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/builder/PipeDataNodeBuilder.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/builder/PipeDataNodeBuilder.java
index 46a1013..f4655c7 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/builder/PipeDataNodeBuilder.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/builder/PipeDataNodeBuilder.java
@@ -36,7 +36,6 @@
 import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
 
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
 
 public class PipeDataNodeBuilder {
@@ -53,9 +52,6 @@
     final PipeStaticMeta pipeStaticMeta = pipeMeta.getStaticMeta();
     final PipeRuntimeMeta pipeRuntimeMeta = pipeMeta.getRuntimeMeta();
 
-    final List<DataRegionId> dataRegionIds = StorageEngine.getInstance().getAllDataRegionIds();
-    final List<SchemaRegionId> schemaRegionIds = SchemaEngine.getInstance().getAllSchemaRegionIds();
-
     final Map<Integer, PipeTask> consensusGroupIdToPipeTaskMap = new HashMap<>();
     for (Map.Entry<Integer, PipeTaskMeta> consensusGroupIdToPipeTaskMeta :
         pipeRuntimeMeta.getConsensusGroupId2TaskMetaMap().entrySet()) {
@@ -66,11 +62,11 @@
         final PipeParameters sourceParameters = pipeStaticMeta.getSourceParameters();
         final DataRegionId dataRegionId = new DataRegionId(consensusGroupId);
         final boolean needConstructDataRegionTask =
-            dataRegionIds.contains(dataRegionId)
+            StorageEngine.getInstance().getDataRegion(dataRegionId) != null
                 && DataRegionListeningFilter.shouldDataRegionBeListened(
                     sourceParameters, dataRegionId);
         final boolean needConstructSchemaRegionTask =
-            schemaRegionIds.contains(new SchemaRegionId(consensusGroupId))
+            SchemaEngine.getInstance().getSchemaRegion(new SchemaRegionId(consensusGroupId)) != null
                 && SchemaRegionListeningFilter.shouldSchemaRegionBeListened(
                     consensusGroupId, sourceParameters);
 
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/stage/PipeTaskProcessorStage.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/stage/PipeTaskProcessorStage.java
index 2373495..6eff04d 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/stage/PipeTaskProcessorStage.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/stage/PipeTaskProcessorStage.java
@@ -77,7 +77,7 @@
             new PipeTaskProcessorRuntimeEnvironment(
                 pipeName, creationTime, regionId, pipeTaskMeta));
     final PipeProcessor pipeProcessor =
-        StorageEngine.getInstance().getAllDataRegionIds().contains(new DataRegionId(regionId))
+        StorageEngine.getInstance().getDataRegion(new DataRegionId(regionId)) != null
                 || PipeRuntimeMeta.isSourceExternal(regionId)
             ? PipeDataNodeAgent.plugin()
                 .dataRegion()
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/stage/PipeTaskSourceStage.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/stage/PipeTaskSourceStage.java
index 5f774ce..84f1b35 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/stage/PipeTaskSourceStage.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/stage/PipeTaskSourceStage.java
@@ -49,7 +49,7 @@
       int regionId,
       PipeTaskMeta pipeTaskMeta) {
     pipeExtractor =
-        StorageEngine.getInstance().getAllDataRegionIds().contains(new DataRegionId(regionId))
+        StorageEngine.getInstance().getDataRegion(new DataRegionId(regionId)) != null
                 || PipeRuntimeMeta.isSourceExternal(regionId)
             ? PipeDataNodeAgent.plugin().dataRegion().reflectSource(sourceParameters)
             : PipeDataNodeAgent.plugin().schemaRegion().reflectSource(sourceParameters);
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/IoTDBDataRegionSource.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/IoTDBDataRegionSource.java
index a748218..7179f8c 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/IoTDBDataRegionSource.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/IoTDBDataRegionSource.java
@@ -45,6 +45,7 @@
 import org.apache.iotdb.db.protocol.session.InternalClientSession;
 import org.apache.iotdb.db.protocol.session.SessionManager;
 import org.apache.iotdb.db.storageengine.StorageEngine;
+import org.apache.iotdb.db.storageengine.dataregion.DataRegion;
 import org.apache.iotdb.pipe.api.annotation.TableModel;
 import org.apache.iotdb.pipe.api.annotation.TreeModel;
 import org.apache.iotdb.pipe.api.customizer.configuration.PipeExtractorRuntimeConfiguration;
@@ -605,19 +606,12 @@
       if (StorageEngine.getInstance()
               .runIfPresent(
                   dataRegionIdObject,
-                  (dataRegion -> {
-                    dataRegion.writeLock(
-                        String.format("Pipe: starting %s", IoTDBDataRegionSource.class.getName()));
-                    try {
-                      startHistoricalExtractorAndRealtimeExtractor(exceptionHolder);
-                    } finally {
-                      dataRegion.writeUnlock();
-                    }
-                  }))
+                  dataRegion ->
+                      startHistoricalExtractorAndRealtimeExtractor(exceptionHolder, dataRegion))
           || StorageEngine.getInstance()
               .runIfAbsent(
                   dataRegionIdObject,
-                  () -> startHistoricalExtractorAndRealtimeExtractor(exceptionHolder))) {
+                  () -> startHistoricalExtractorAndRealtimeExtractor(exceptionHolder, null))) {
         rethrowExceptionIfAny(exceptionHolder);
 
         LOGGER.info(
@@ -634,14 +628,28 @@
   }
 
   private void startHistoricalExtractorAndRealtimeExtractor(
-      final AtomicReference<Exception> exceptionHolder) {
+      final AtomicReference<Exception> exceptionHolder, final DataRegion dataRegion) {
     try {
       // Start realtimeSource first to avoid losing data. This may cause some
       // retransmission, yet it is OK according to the idempotency of IoTDB.
       // Note: The order of historical collection is flushing data -> adding all tsFile events.
       // There can still be writing when tsFile events are added. If we start
       // realtimeSource after the process, then this part of data will be lost.
-      realtimeSource.start();
+      if (Objects.nonNull(dataRegion)) {
+        dataRegion.writeLock(
+            String.format("Pipe: starting %s", IoTDBDataRegionSource.class.getName()));
+        try {
+          realtimeSource.start();
+        } finally {
+          dataRegion.writeUnlock();
+        }
+      } else {
+        realtimeSource.start();
+      }
+
+      // Historical extraction manages its own narrower region write lock. Keeping the outer lock
+      // only for realtime-source registration allows the expensive historical sort/materialization
+      // phase to stay out of the region critical section.
       historicalSource.start();
     } catch (final Exception e) {
       exceptionHolder.set(e);