Merge origin/master into fix-log-throttling
diff --git a/.github/scripts/package-client-cpp-manylinux228.sh b/.github/scripts/package-client-cpp-manylinux228.sh
index 773e835..6bfef0a 100755
--- a/.github/scripts/package-client-cpp-manylinux228.sh
+++ b/.github/scripts/package-client-cpp-manylinux228.sh
@@ -71,9 +71,22 @@
 cmake --version
 java -version
 
+# manylinux_2_28 is AlmaLinux 8, whose system OpenSSL is 1.1.1 (EOL and not
+# Apache-2.0 - must not be bundled/redistributed in an ASF convenience binary).
+# Build OpenSSL 3.x from source instead (-Diotdb.openssl.from.source=ON), which
+# keeps the glibc 2.28 baseline. OpenSSL 3.x's Configure needs perl plus a few
+# modules (IPC::Cmd, Data::Dumper) that are not on the minimal image - install
+# them even when perl itself is already present.
+if command -v dnf >/dev/null 2>&1; then
+  dnf install -y perl perl-IPC-Cmd perl-Data-Dumper
+else
+  yum install -y perl perl-IPC-Cmd perl-Data-Dumper
+fi
+
 cd "${GITHUB_WORKSPACE:?GITHUB_WORKSPACE is not set}"
 ./mvnw clean package -P with-cpp -pl iotdb-client/client-cpp -am -DskipTests \
   -Dspotless.skip=true \
+  -Diotdb.openssl.from.source=ON \
   -Dclient.cpp.package.classifier="${PACKAGE_CLASSIFIER}"
 
 SO="iotdb-client/client-cpp/target/install/lib/libiotdb_session.so"
diff --git a/.github/workflows/client-cpp-package.yml b/.github/workflows/client-cpp-package.yml
index 3c8c2b2..38eac3f 100644
--- a/.github/workflows/client-cpp-package.yml
+++ b/.github/workflows/client-cpp-package.yml
@@ -309,10 +309,14 @@
         shell: bash
         run: |
           set -euxo pipefail
-          brew install boost openssl llvm@17 bison
+          # Pin openssl@3 (Apache-2.0): the default 'openssl' formula will move to
+          # OpenSSL 4.0, which drops the legacy TLS-method APIs Thrift still uses.
+          brew install boost openssl@3 llvm@17 bison
           ln -sf "$(brew --prefix llvm@17)/bin/clang-format" "$(brew --prefix)/bin/clang-format"
           echo "$(brew --prefix bison)/bin" >> "$GITHUB_PATH"
           echo "$(brew --prefix llvm@17)/bin" >> "$GITHUB_PATH"
+          # Homebrew OpenSSL is keg-only, so point find_package(OpenSSL) at it.
+          echo "OPENSSL_ROOT_DIR=$(brew --prefix openssl@3)" >> "$GITHUB_ENV"
           clang-format --version
           bison --version
       - name: Cache Maven packages
@@ -429,8 +433,16 @@
             throw "Boost not found under C:\local after installing ${{ matrix.boost_choco }}"
           }
           echo $boostDir.FullName >> $env:GITHUB_PATH
-          choco install openssl -y --no-progress
-          $sslPath = (Get-ChildItem 'C:\Program Files\OpenSSL*' -Directory | Select-Object -First 1).FullName
+          # Use a pinned OpenSSL 3.x (Apache-2.0). 'choco install openssl' now
+          # installs OpenSSL 4.0, which removed the legacy TLS-method APIs that
+          # Apache Thrift's TSSLSocket still calls. The FireDaemon zip is a clean
+          # prebuilt OpenSSL 3.5.x that keeps them.
+          $sslZip = "$env:RUNNER_TEMP\openssl-3.5.3.zip"
+          $sslDir = "$env:RUNNER_TEMP\openssl-3"
+          curl.exe -L --fail --retry 3 -o $sslZip 'https://download.firedaemon.com/FireDaemon-OpenSSL/openssl-3.5.3.zip'
+          Expand-Archive -Path $sslZip -DestinationPath $sslDir -Force
+          $sslPath = (Get-ChildItem $sslDir -Recurse -Directory -Filter 'x64' | Select-Object -First 1).FullName
+          if (-not $sslPath) { throw "OpenSSL x64 dir not found under $sslDir" }
           echo "$sslPath\bin" >> $env:GITHUB_PATH
           echo "OPENSSL_ROOT_DIR=$sslPath" >> $env:GITHUB_ENV
       - name: Cache Maven packages
diff --git a/.github/workflows/multi-language-client.yml b/.github/workflows/multi-language-client.yml
index 16c6d93..5437a65 100644
--- a/.github/workflows/multi-language-client.yml
+++ b/.github/workflows/multi-language-client.yml
@@ -144,10 +144,13 @@
         if: runner.os == 'macOS'
         shell: bash
         run: |
-          brew install boost openssl llvm@17 bison
+          # Pin openssl@3 (Apache-2.0); the default formula will move to OpenSSL 4.0.
+          brew install boost openssl@3 llvm@17 bison
           ln -sf "$(brew --prefix llvm@17)/bin/clang-format" "$(brew --prefix)/bin/clang-format"
           echo "$(brew --prefix bison)/bin" >> "$GITHUB_PATH"
           echo "$(brew --prefix llvm@17)/bin" >> "$GITHUB_PATH"
+          # Homebrew OpenSSL is keg-only, so point find_package(OpenSSL) at it.
+          echo "OPENSSL_ROOT_DIR=$(brew --prefix openssl@3)" >> "$GITHUB_ENV"
           clang-format --version
           bison --version
           sudo rm -rf /Applications/Xcode_14.3.1.app
@@ -163,8 +166,14 @@
           $boost_path = (Get-ChildItem -Path 'C:\local\' -Filter 'boost_*').FullName
           echo $boost_path >> $env:GITHUB_PATH
           
-          choco install openssl -y
-          $sslPath = (Get-ChildItem 'C:\Program Files\OpenSSL*' -Directory | Select-Object -First 1).FullName
+          # Pinned OpenSSL 3.x (Apache-2.0): 'choco install openssl' now installs
+          # OpenSSL 4.0, which removed the legacy TLS-method APIs Thrift uses.
+          $sslZip = "$env:RUNNER_TEMP\openssl-3.5.3.zip"
+          $sslDir = "$env:RUNNER_TEMP\openssl-3"
+          curl.exe -L --fail --retry 3 -o $sslZip 'https://download.firedaemon.com/FireDaemon-OpenSSL/openssl-3.5.3.zip'
+          Expand-Archive -Path $sslZip -DestinationPath $sslDir -Force
+          $sslPath = (Get-ChildItem $sslDir -Recurse -Directory -Filter 'x64' | Select-Object -First 1).FullName
+          if (-not $sslPath) { throw "OpenSSL x64 dir not found under $sslDir" }
           echo "$sslPath\bin" >> $env:GITHUB_PATH
           echo "OPENSSL_ROOT_DIR=$sslPath" >> $env:GITHUB_ENV
           choco install llvm --version=17.0.6 --force -y
diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppDataNodeConfig.java b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppDataNodeConfig.java
index 5e41807..8399955 100644
--- a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppDataNodeConfig.java
+++ b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppDataNodeConfig.java
@@ -90,6 +90,31 @@
   }
 
   @Override
+  public DataNodeConfig setMaxAllocateMemoryRatioForLoad(double maxAllocateMemoryRatioForLoad) {
+    properties.setProperty(
+        "max_allocate_memory_ratio_for_load", String.valueOf(maxAllocateMemoryRatioForLoad));
+    return this;
+  }
+
+  @Override
+  public DataNodeConfig setLoadTsFileTabletConversionBatchMemorySizeInBytes(
+      long loadTsFileTabletConversionBatchMemorySizeInBytes) {
+    properties.setProperty(
+        "load_tsfile_tablet_conversion_batch_memory_size_in_bytes",
+        String.valueOf(loadTsFileTabletConversionBatchMemorySizeInBytes));
+    return this;
+  }
+
+  @Override
+  public DataNodeConfig setLoadActiveListeningCheckIntervalSeconds(
+      long loadActiveListeningCheckIntervalSeconds) {
+    properties.setProperty(
+        "load_active_listening_check_interval_seconds",
+        String.valueOf(loadActiveListeningCheckIntervalSeconds));
+    return this;
+  }
+
+  @Override
   public DataNodeConfig setCompactionScheduleInterval(long compactionScheduleInterval) {
     properties.setProperty(
         "compaction_schedule_interval_in_ms", String.valueOf(compactionScheduleInterval));
@@ -143,4 +168,16 @@
     setProperty("query_cost_stat_window", String.valueOf(queryCostStatWindow));
     return this;
   }
+
+  @Override
+  public DataNodeConfig setDnDataDirs(String dnDataDirs) {
+    setProperty("dn_data_dirs", dnDataDirs);
+    return this;
+  }
+
+  @Override
+  public DataNodeConfig setDnMultiDirStrategy(String multiDirStrategy) {
+    setProperty("dn_multi_dir_strategy", multiDirStrategy);
+    return this;
+  }
 }
diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/node/DataNodeWrapper.java b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/node/DataNodeWrapper.java
index dac6cf3..d205044 100644
--- a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/node/DataNodeWrapper.java
+++ b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/node/DataNodeWrapper.java
@@ -44,7 +44,6 @@
 import static org.apache.iotdb.it.env.cluster.ClusterConstant.DEFAULT_DATA_NODE_PROPERTIES;
 import static org.apache.iotdb.it.env.cluster.ClusterConstant.DN_CONNECTION_TIMEOUT_MS;
 import static org.apache.iotdb.it.env.cluster.ClusterConstant.DN_CONSENSUS_DIR;
-import static org.apache.iotdb.it.env.cluster.ClusterConstant.DN_DATA_DIRS;
 import static org.apache.iotdb.it.env.cluster.ClusterConstant.DN_DATA_REGION_CONSENSUS_PORT;
 import static org.apache.iotdb.it.env.cluster.ClusterConstant.DN_JOIN_CLUSTER_RETRY_INTERVAL_MS;
 import static org.apache.iotdb.it.env.cluster.ClusterConstant.DN_METRIC_INTERNAL_REPORTER_TYPE;
@@ -125,7 +124,6 @@
 
     immutableNodeProperties.setProperty(IoTDBConstant.DN_SEED_CONFIG_NODE, seedConfigNode);
     immutableNodeProperties.setProperty(DN_SYSTEM_DIR, MppBaseConfig.NULL_VALUE);
-    immutableNodeProperties.setProperty(DN_DATA_DIRS, MppBaseConfig.NULL_VALUE);
     immutableNodeProperties.setProperty(DN_CONSENSUS_DIR, MppBaseConfig.NULL_VALUE);
     immutableNodeProperties.setProperty(DN_WAL_DIRS, MppBaseConfig.NULL_VALUE);
     immutableNodeProperties.setProperty(DN_TRACING_DIR, MppBaseConfig.NULL_VALUE);
diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteDataNodeConfig.java b/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteDataNodeConfig.java
index bba4c96..a76608e 100644
--- a/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteDataNodeConfig.java
+++ b/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteDataNodeConfig.java
@@ -55,6 +55,23 @@
   }
 
   @Override
+  public DataNodeConfig setMaxAllocateMemoryRatioForLoad(double maxAllocateMemoryRatioForLoad) {
+    return this;
+  }
+
+  @Override
+  public DataNodeConfig setLoadTsFileTabletConversionBatchMemorySizeInBytes(
+      long loadTsFileTabletConversionBatchMemorySizeInBytes) {
+    return this;
+  }
+
+  @Override
+  public DataNodeConfig setLoadActiveListeningCheckIntervalSeconds(
+      long loadActiveListeningCheckIntervalSeconds) {
+    return this;
+  }
+
+  @Override
   public DataNodeConfig setCompactionScheduleInterval(long compactionScheduleInterval) {
     return this;
   }
@@ -98,4 +115,14 @@
   public DataNodeConfig setQueryCostStatWindow(int queryCostStatWindow) {
     return this;
   }
+
+  @Override
+  public DataNodeConfig setDnDataDirs(String dnDataDirs) {
+    return this;
+  }
+
+  @Override
+  public DataNodeConfig setDnMultiDirStrategy(String multiDirStrategy) {
+    return this;
+  }
 }
diff --git a/integration-test/src/main/java/org/apache/iotdb/itbase/env/DataNodeConfig.java b/integration-test/src/main/java/org/apache/iotdb/itbase/env/DataNodeConfig.java
index d57015b..bc045c9 100644
--- a/integration-test/src/main/java/org/apache/iotdb/itbase/env/DataNodeConfig.java
+++ b/integration-test/src/main/java/org/apache/iotdb/itbase/env/DataNodeConfig.java
@@ -36,6 +36,14 @@
   DataNodeConfig setLoadTsFileAnalyzeSchemaMemorySizeInBytes(
       long loadTsFileAnalyzeSchemaMemorySizeInBytes);
 
+  DataNodeConfig setMaxAllocateMemoryRatioForLoad(double maxAllocateMemoryRatioForLoad);
+
+  DataNodeConfig setLoadTsFileTabletConversionBatchMemorySizeInBytes(
+      long loadTsFileTabletConversionBatchMemorySizeInBytes);
+
+  DataNodeConfig setLoadActiveListeningCheckIntervalSeconds(
+      long loadActiveListeningCheckIntervalSeconds);
+
   DataNodeConfig setCompactionScheduleInterval(long compactionScheduleInterval);
 
   DataNodeConfig setEnableMQTTService(boolean enableMQTTService);
@@ -53,4 +61,8 @@
   DataNodeConfig setDataNodeMemoryProportion(String dataNodeMemoryProportion);
 
   DataNodeConfig setQueryCostStatWindow(int queryCostStatWindow);
+
+  DataNodeConfig setDnDataDirs(String dnDataDirs);
+
+  DataNodeConfig setDnMultiDirStrategy(String multiDirStrategy);
 }
diff --git a/integration-test/src/test/java/org/apache/iotdb/confignode/it/regionmigration/pass/daily/iotv1/IoTDBRegionMigrateWithDeletionMultiDataDirIT.java b/integration-test/src/test/java/org/apache/iotdb/confignode/it/regionmigration/pass/daily/iotv1/IoTDBRegionMigrateWithDeletionMultiDataDirIT.java
new file mode 100644
index 0000000..8214bf6
--- /dev/null
+++ b/integration-test/src/test/java/org/apache/iotdb/confignode/it/regionmigration/pass/daily/iotv1/IoTDBRegionMigrateWithDeletionMultiDataDirIT.java
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.it.regionmigration.pass.daily.iotv1;
+
+import org.apache.iotdb.consensus.ConsensusFactory;
+import org.apache.iotdb.it.env.EnvFactory;
+import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper;
+import org.apache.iotdb.it.framework.IoTDBTestRunner;
+import org.apache.iotdb.itbase.category.ClusterIT;
+
+import org.apache.tsfile.utils.Pair;
+import org.awaitility.Awaitility;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.Statement;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.iotdb.confignode.it.regionmigration.IoTDBRegionOperationReliabilityITFramework.getAllDataNodes;
+import static org.apache.iotdb.confignode.it.regionmigration.IoTDBRegionOperationReliabilityITFramework.getDataRegionMapWithLeader;
+
+/**
+ * Tree-model coverage for IoTConsensus region migration over multiple data dirs: a deletion (mods)
+ * must survive the snapshot transfer to the migrated peer. With several data dirs the snapshot
+ * fragments of one TsFile can be received into different folders, so the receiver groups companion
+ * files and the loader relinks them into one data dir; if that breaks, the migrated replica loses
+ * the deletion. See the table-model twin {@link IoTDBRegionMigrateWithDeletionMultiDataDirTableIT}.
+ */
+@RunWith(IoTDBTestRunner.class)
+@Category({ClusterIT.class})
+public class IoTDBRegionMigrateWithDeletionMultiDataDirIT {
+
+  private static final String MULTI_DATA_DIRS =
+      "data/datanode/data/disk0,data/datanode/data/disk1,data/datanode/data/disk2";
+
+  @Before
+  public void setUp() throws Exception {
+    EnvFactory.getEnv()
+        .getConfig()
+        .getCommonConfig()
+        .setDataReplicationFactor(2)
+        .setDataRegionConsensusProtocolClass(ConsensusFactory.IOT_CONSENSUS);
+    EnvFactory.getEnv().getConfig().getDataNodeConfig().setDnDataDirs(MULTI_DATA_DIRS);
+    EnvFactory.getEnv().initClusterEnvironment(1, 3);
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    EnvFactory.getEnv().cleanClusterEnvironment();
+  }
+
+  @Test
+  public void testRegionMigratePreservesDeletionWithMultiDataDirs() throws Exception {
+    try (Connection connection = EnvFactory.getEnv().getConnection();
+        Statement statement = connection.createStatement()) {
+      statement.execute("CREATE DATABASE root.db");
+      statement.execute(
+          "INSERT INTO root.db.d1(timestamp, s1) VALUES (100, 100), (200, 200), (300, 300)");
+      statement.execute("FLUSH");
+      statement.execute("DELETE FROM root.db.d1.s1 WHERE time <= 200");
+      statement.execute("FLUSH");
+
+      Map<Integer, Pair<Integer, Set<Integer>>> dataRegionMapWithLeader =
+          getDataRegionMapWithLeader(statement);
+      int dataRegionIdForTest =
+          dataRegionMapWithLeader.keySet().stream().max(Integer::compareTo).orElseThrow();
+      assertDeletionVisibleOnAllReplicas(dataRegionIdForTest, 1);
+
+      Pair<Integer, Set<Integer>> leaderAndNodes = dataRegionMapWithLeader.get(dataRegionIdForTest);
+      Set<Integer> allDataNodes = getAllDataNodes(statement);
+      int leaderId = leaderAndNodes.getLeft();
+      int followerId =
+          leaderAndNodes.getRight().stream().filter(id -> id != leaderId).findFirst().orElseThrow();
+      int destDataNodeId =
+          allDataNodes.stream()
+              .filter(id -> id != leaderId && id != followerId)
+              .findFirst()
+              .orElseThrow();
+
+      statement.execute(
+          String.format(
+              "migrate region %d from %d to %d", dataRegionIdForTest, leaderId, destDataNodeId));
+
+      final int finalDestDataNodeId = destDataNodeId;
+      Awaitility.await()
+          .atMost(10, TimeUnit.MINUTES)
+          .pollDelay(1, TimeUnit.SECONDS)
+          .pollInterval(2, TimeUnit.SECONDS)
+          .untilAsserted(
+              () -> {
+                try (ResultSet showRegions = statement.executeQuery("SHOW REGIONS")) {
+                  boolean migrated = false;
+                  while (showRegions.next()) {
+                    if (showRegions.getInt("RegionId") == dataRegionIdForTest
+                        && showRegions.getInt("DataNodeId") == finalDestDataNodeId) {
+                      migrated = true;
+                      break;
+                    }
+                  }
+                  Assert.assertTrue(migrated);
+                }
+              });
+
+      assertDeletionVisibleOnAllReplicas(dataRegionIdForTest, 1);
+    }
+  }
+
+  private void assertDeletionVisibleOnAllReplicas(int dataRegionId, int expectedCount)
+      throws Exception {
+    Set<Integer> replicaDataNodeIds;
+    try (Connection connection = EnvFactory.getEnv().getConnection();
+        Statement statement = connection.createStatement()) {
+      replicaDataNodeIds = getReplicaDataNodeIds(statement, dataRegionId);
+    }
+    for (int dataNodeId : replicaDataNodeIds) {
+      DataNodeWrapper dataNodeWrapper =
+          EnvFactory.getEnv().dataNodeIdToWrapper(dataNodeId).orElseThrow();
+      Awaitility.await()
+          .atMost(2, TimeUnit.MINUTES)
+          .pollDelay(500, TimeUnit.MILLISECONDS)
+          .pollInterval(1, TimeUnit.SECONDS)
+          .untilAsserted(() -> assertDeletionVisibleOnReplica(dataNodeWrapper, expectedCount));
+    }
+  }
+
+  private void assertDeletionVisibleOnReplica(DataNodeWrapper dataNodeWrapper, int expectedCount)
+      throws Exception {
+    try (Connection connection = EnvFactory.getEnv().getConnection(dataNodeWrapper);
+        Statement dataNodeStatement = connection.createStatement()) {
+      try (ResultSet countResultSet =
+          dataNodeStatement.executeQuery("SELECT COUNT(s1) FROM root.db.d1")) {
+        Assert.assertTrue(countResultSet.next());
+        Assert.assertEquals(expectedCount, countResultSet.getLong(1));
+      }
+      try (ResultSet deletedRangeResultSet =
+          dataNodeStatement.executeQuery("SELECT s1 FROM root.db.d1 WHERE time <= 200")) {
+        Assert.assertFalse(deletedRangeResultSet.next());
+      }
+    }
+  }
+
+  private Set<Integer> getReplicaDataNodeIds(Statement statement, int dataRegionId)
+      throws Exception {
+    Set<Integer> replicaDataNodeIds = new HashSet<>();
+    try (ResultSet showRegions = statement.executeQuery("SHOW REGIONS")) {
+      while (showRegions.next()) {
+        if ("DataRegion".equals(showRegions.getString("Type"))
+            && showRegions.getInt("RegionId") == dataRegionId) {
+          replicaDataNodeIds.add(showRegions.getInt("DataNodeId"));
+        }
+      }
+    }
+    Assert.assertFalse(replicaDataNodeIds.isEmpty());
+    return replicaDataNodeIds;
+  }
+}
diff --git a/integration-test/src/test/java/org/apache/iotdb/confignode/it/regionmigration/pass/daily/iotv1/IoTDBRegionMigrateWithDeletionMultiDataDirTableIT.java b/integration-test/src/test/java/org/apache/iotdb/confignode/it/regionmigration/pass/daily/iotv1/IoTDBRegionMigrateWithDeletionMultiDataDirTableIT.java
new file mode 100644
index 0000000..59d5a18
--- /dev/null
+++ b/integration-test/src/test/java/org/apache/iotdb/confignode/it/regionmigration/pass/daily/iotv1/IoTDBRegionMigrateWithDeletionMultiDataDirTableIT.java
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.it.regionmigration.pass.daily.iotv1;
+
+import org.apache.iotdb.consensus.ConsensusFactory;
+import org.apache.iotdb.isession.SessionConfig;
+import org.apache.iotdb.it.env.EnvFactory;
+import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper;
+import org.apache.iotdb.it.framework.IoTDBTestRunner;
+import org.apache.iotdb.itbase.category.TableClusterIT;
+import org.apache.iotdb.itbase.env.BaseEnv;
+
+import org.apache.tsfile.utils.Pair;
+import org.awaitility.Awaitility;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.Statement;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.iotdb.confignode.it.regionmigration.IoTDBRegionOperationReliabilityITFramework.getAllDataNodes;
+import static org.apache.iotdb.confignode.it.regionmigration.IoTDBRegionOperationReliabilityITFramework.getDataRegionMapWithLeader;
+
+/**
+ * Table-model twin of {@link IoTDBRegionMigrateWithDeletionMultiDataDirIT}: a deletion (mods) must
+ * survive IoTConsensus region migration across multiple data dirs, asserted through the relational
+ * (table) SQL dialect so the table-model cluster CI covers the same snapshot mods-transfer path.
+ */
+@RunWith(IoTDBTestRunner.class)
+@Category({TableClusterIT.class})
+public class IoTDBRegionMigrateWithDeletionMultiDataDirTableIT {
+
+  private static final String MULTI_DATA_DIRS =
+      "data/datanode/data/disk0,data/datanode/data/disk1,data/datanode/data/disk2";
+
+  @Before
+  public void setUp() throws Exception {
+    EnvFactory.getEnv()
+        .getConfig()
+        .getCommonConfig()
+        .setDataReplicationFactor(2)
+        .setDataRegionConsensusProtocolClass(ConsensusFactory.IOT_CONSENSUS);
+    EnvFactory.getEnv().getConfig().getDataNodeConfig().setDnDataDirs(MULTI_DATA_DIRS);
+    EnvFactory.getEnv().initClusterEnvironment(1, 3);
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    EnvFactory.getEnv().cleanClusterEnvironment();
+  }
+
+  @Test
+  public void testRegionMigratePreservesDeletionWithMultiDataDirs() throws Exception {
+    try (Connection connection = EnvFactory.getEnv().getTableConnection();
+        Statement statement = connection.createStatement()) {
+      statement.execute("CREATE DATABASE test");
+      statement.execute("USE test");
+      statement.execute("CREATE TABLE t1 (s1 INT64 FIELD)");
+      statement.execute("INSERT INTO t1 (time, s1) VALUES (100, 100), (200, 200), (300, 300)");
+      statement.execute("FLUSH");
+      statement.execute("DELETE FROM t1 WHERE time <= 200");
+      statement.execute("FLUSH");
+
+      Map<Integer, Pair<Integer, Set<Integer>>> dataRegionMapWithLeader =
+          getDataRegionMapWithLeader(statement);
+      int dataRegionIdForTest =
+          dataRegionMapWithLeader.keySet().stream().max(Integer::compareTo).orElseThrow();
+      assertDeletionVisibleOnAllReplicas(statement, dataRegionIdForTest, 1);
+
+      Pair<Integer, Set<Integer>> leaderAndNodes = dataRegionMapWithLeader.get(dataRegionIdForTest);
+      Set<Integer> allDataNodes = getAllDataNodes(statement);
+      int leaderId = leaderAndNodes.getLeft();
+      int followerId =
+          leaderAndNodes.getRight().stream().filter(id -> id != leaderId).findFirst().orElseThrow();
+      int destDataNodeId =
+          allDataNodes.stream()
+              .filter(id -> id != leaderId && id != followerId)
+              .findFirst()
+              .orElseThrow();
+
+      statement.execute(
+          String.format(
+              "migrate region %d from %d to %d", dataRegionIdForTest, leaderId, destDataNodeId));
+
+      final int finalDestDataNodeId = destDataNodeId;
+      Awaitility.await()
+          .atMost(10, TimeUnit.MINUTES)
+          .pollDelay(1, TimeUnit.SECONDS)
+          .pollInterval(2, TimeUnit.SECONDS)
+          .untilAsserted(
+              () -> {
+                try (ResultSet showRegions = statement.executeQuery("SHOW REGIONS")) {
+                  boolean migrated = false;
+                  while (showRegions.next()) {
+                    if (showRegions.getInt("RegionId") == dataRegionIdForTest
+                        && showRegions.getInt("DataNodeId") == finalDestDataNodeId) {
+                      migrated = true;
+                      break;
+                    }
+                  }
+                  Assert.assertTrue(migrated);
+                }
+              });
+
+      assertDeletionVisibleOnAllReplicas(statement, dataRegionIdForTest, 1);
+    }
+  }
+
+  private void assertDeletionVisibleOnAllReplicas(
+      Statement statement, int dataRegionId, int expectedCount) throws Exception {
+    Set<Integer> replicaDataNodeIds = getReplicaDataNodeIds(statement, dataRegionId);
+    for (int dataNodeId : replicaDataNodeIds) {
+      DataNodeWrapper dataNodeWrapper =
+          EnvFactory.getEnv().dataNodeIdToWrapper(dataNodeId).orElseThrow();
+      Awaitility.await()
+          .atMost(2, TimeUnit.MINUTES)
+          .pollDelay(500, TimeUnit.MILLISECONDS)
+          .pollInterval(1, TimeUnit.SECONDS)
+          .untilAsserted(() -> assertDeletionVisibleOnReplica(dataNodeWrapper, expectedCount));
+    }
+  }
+
+  private void assertDeletionVisibleOnReplica(DataNodeWrapper dataNodeWrapper, int expectedCount)
+      throws Exception {
+    try (Connection connection =
+            EnvFactory.getEnv()
+                .getConnection(
+                    dataNodeWrapper,
+                    SessionConfig.DEFAULT_USER,
+                    SessionConfig.DEFAULT_PASSWORD,
+                    BaseEnv.TABLE_SQL_DIALECT);
+        Statement dataNodeStatement = connection.createStatement()) {
+      dataNodeStatement.execute("USE test");
+      try (ResultSet countResultSet = dataNodeStatement.executeQuery("SELECT COUNT(s1) FROM t1")) {
+        Assert.assertTrue(countResultSet.next());
+        Assert.assertEquals(expectedCount, countResultSet.getLong(1));
+      }
+      try (ResultSet deletedRangeResultSet =
+          dataNodeStatement.executeQuery("SELECT s1 FROM t1 WHERE time <= 200")) {
+        Assert.assertFalse(deletedRangeResultSet.next());
+      }
+    }
+  }
+
+  private Set<Integer> getReplicaDataNodeIds(Statement statement, int dataRegionId)
+      throws Exception {
+    Set<Integer> replicaDataNodeIds = new HashSet<>();
+    try (ResultSet showRegions = statement.executeQuery("SHOW REGIONS")) {
+      while (showRegions.next()) {
+        if ("DataRegion".equals(showRegions.getString("Type"))
+            && showRegions.getInt("RegionId") == dataRegionId) {
+          replicaDataNodeIds.add(showRegions.getInt("DataNodeId"));
+        }
+      }
+    }
+    Assert.assertFalse(replicaDataNodeIds.isEmpty());
+    return replicaDataNodeIds;
+  }
+}
diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBLoadTsFileActiveRetryIT.java b/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBLoadTsFileActiveRetryIT.java
new file mode 100644
index 0000000..3827615
--- /dev/null
+++ b/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBLoadTsFileActiveRetryIT.java
@@ -0,0 +1,226 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.it;
+
+import org.apache.iotdb.it.env.EnvFactory;
+import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper;
+import org.apache.iotdb.it.framework.IoTDBTestRunner;
+import org.apache.iotdb.it.utils.TsFileGenerator;
+import org.apache.iotdb.itbase.category.ClusterIT;
+import org.apache.iotdb.itbase.category.LocalStandaloneIT;
+
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.tsfile.write.schema.MeasurementSchema;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+
+import java.io.File;
+import java.nio.file.Files;
+import java.sql.Connection;
+import java.sql.Statement;
+import java.util.Collections;
+import java.util.concurrent.TimeUnit;
+
+@RunWith(IoTDBTestRunner.class)
+@Category({LocalStandaloneIT.class, ClusterIT.class})
+public class IoTDBLoadTsFileActiveRetryIT {
+
+  private static final String DATABASE = "root.sg.test_0";
+  private static final String DEVICE = DATABASE + ".d_0";
+  private static final String MEASUREMENT = "sensor_00";
+  private static final long UNALLOCATABLE_TABLET_CONVERSION_BATCH_MEMORY_SIZE_IN_BYTES =
+      Long.MAX_VALUE / 4;
+  private static final MeasurementSchema TSFILE_SCHEMA =
+      new MeasurementSchema(MEASUREMENT, TSDataType.INT32, TSEncoding.RLE);
+
+  private File tmpDir;
+
+  @Before
+  public void setUp() throws Exception {
+    tmpDir = new File(Files.createTempDirectory("load-active-retry").toUri());
+    EnvFactory.getEnv().getConfig().getCommonConfig().setPipeMemoryManagementEnabled(false);
+    EnvFactory.getEnv()
+        .getConfig()
+        .getDataNodeConfig()
+        .setMaxAllocateMemoryRatioForLoad(1.0)
+        .setLoadTsFileAnalyzeSchemaMemorySizeInBytes(1)
+        .setLoadTsFileTabletConversionBatchMemorySizeInBytes(
+            UNALLOCATABLE_TABLET_CONVERSION_BATCH_MEMORY_SIZE_IN_BYTES)
+        .setLoadActiveListeningCheckIntervalSeconds(1);
+
+    EnvFactory.getEnv().initClusterEnvironment();
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    try (final Connection connection = EnvFactory.getEnv().getConnection();
+        final Statement statement = connection.createStatement()) {
+      statement.execute("delete database " + DATABASE);
+    } catch (final Exception ignored) {
+      // ignore cleanup failure
+    } finally {
+      EnvFactory.getEnv().cleanClusterEnvironment();
+      deleteRecursively(tmpDir);
+    }
+  }
+
+  @Test
+  public void testActiveLoadTemporaryUnavailableShouldKeepFileForRetry() throws Exception {
+    final DataNodeWrapper dataNodeWrapper = EnvFactory.getEnv().getDataNodeWrapper(0);
+    final File retryTsFile = new File(tmpDir, "1-0-0-0.tsfile");
+    final File permanentFailureTsFile = new File(tmpDir, "2-0-0-0.tsfile");
+    generateTsFile(retryTsFile);
+    generateTsFile(permanentFailureTsFile);
+
+    try (final Connection connection =
+            EnvFactory.getEnv().getConnectionWithSpecifiedDataNode(dataNodeWrapper);
+        final Statement statement = connection.createStatement()) {
+      statement.execute("create database " + DATABASE);
+      statement.execute(
+          String.format(
+              "create timeseries %s.%s %s", DEVICE, MEASUREMENT, TSDataType.INT64.name()));
+
+      statement.execute(
+          String.format(
+              "load \"%s\" with ('database-level'='3', 'async'='true', 'on-success'='none', "
+                  + "'convert-on-type-mismatch'='true')",
+              retryTsFile.getAbsolutePath()));
+      statement.execute(
+          String.format(
+              "load \"%s\" with ('database-level'='3', 'async'='true', 'on-success'='none', "
+                  + "'convert-on-type-mismatch'='false')",
+              permanentFailureTsFile.getAbsolutePath()));
+
+      final File activeDir = getActiveLoadDir(dataNodeWrapper);
+      final File failDir = getActiveLoadFailDir(dataNodeWrapper);
+      final File activeTsFile = waitForFile(activeDir, retryTsFile.getName(), 30_000L);
+
+      Assert.assertNotNull(
+          "Async load should copy tsfile into active load directory", activeTsFile);
+
+      Assert.assertNotNull(
+          "Permanent active load failure should be moved to fail dir",
+          waitForFile(failDir, permanentFailureTsFile.getName(), TimeUnit.SECONDS.toMillis(60)));
+
+      assertFileKeptForRetry(
+          activeDir, failDir, retryTsFile.getName(), TimeUnit.SECONDS.toMillis(12));
+    }
+  }
+
+  private void generateTsFile(final File tsFile) throws Exception {
+    try (final TsFileGenerator generator = new TsFileGenerator(tsFile)) {
+      generator.registerTimeseries(DEVICE, Collections.singletonList(TSFILE_SCHEMA));
+      generator.generateData(DEVICE, 10, 1, false);
+    }
+  }
+
+  private File getActiveLoadDir(final DataNodeWrapper dataNodeWrapper) {
+    return new File(
+        dataNodeWrapper.getNodePath()
+            + File.separator
+            + "ext"
+            + File.separator
+            + "load"
+            + File.separator
+            + "pending");
+  }
+
+  private File getActiveLoadFailDir(final DataNodeWrapper dataNodeWrapper) {
+    return new File(
+        dataNodeWrapper.getNodePath()
+            + File.separator
+            + "ext"
+            + File.separator
+            + "load"
+            + File.separator
+            + "failed");
+  }
+
+  private File waitForFile(final File root, final String fileName, final long timeoutMs)
+      throws InterruptedException {
+    final long deadline = System.currentTimeMillis() + timeoutMs;
+    while (System.currentTimeMillis() < deadline) {
+      final File file = findFile(root, fileName);
+      if (file != null) {
+        return file;
+      }
+      Thread.sleep(500L);
+    }
+    return null;
+  }
+
+  private boolean containsFile(final File root, final String fileName) {
+    return findFile(root, fileName) != null;
+  }
+
+  private void assertFileKeptForRetry(
+      final File activeDir, final File failDir, final String fileName, final long observationMs)
+      throws InterruptedException {
+    final long deadline = System.currentTimeMillis() + observationMs;
+    while (System.currentTimeMillis() < deadline) {
+      Assert.assertTrue(
+          "Temporary unavailable active load should keep tsfile for retry",
+          containsFile(activeDir, fileName));
+      Assert.assertFalse(
+          "Temporary unavailable active load must not move tsfile to fail dir",
+          containsFile(failDir, fileName));
+      Thread.sleep(500L);
+    }
+  }
+
+  private File findFile(final File root, final String fileName) {
+    if (root == null || !root.exists()) {
+      return null;
+    }
+    if (root.isFile()) {
+      return root.getName().equals(fileName) ? root : null;
+    }
+
+    final File[] children = root.listFiles();
+    if (children == null) {
+      return null;
+    }
+    for (final File child : children) {
+      final File file = findFile(child, fileName);
+      if (file != null) {
+        return file;
+      }
+    }
+    return null;
+  }
+
+  private void deleteRecursively(final File file) {
+    if (file == null || !file.exists()) {
+      return;
+    }
+    final File[] children = file.listFiles();
+    if (children != null) {
+      for (final File child : children) {
+        deleteRecursively(child);
+      }
+    }
+    Assert.assertTrue(file.delete());
+  }
+}
diff --git a/iotdb-client/client-cpp/CMakeLists.txt b/iotdb-client/client-cpp/CMakeLists.txt
index 749341d..ad357dd 100644
--- a/iotdb-client/client-cpp/CMakeLists.txt
+++ b/iotdb-client/client-cpp/CMakeLists.txt
@@ -78,7 +78,7 @@
     file(WRITE "${_iotdb_cxx11_abi_stamp}" "${_iotdb_cxx11_abi_stamp_value}")
 endif()
 
-option(WITH_SSL "Build with OpenSSL support" OFF)
+option(WITH_SSL "Build with OpenSSL support" ON)
 option(BUILD_TESTING "Build IT test executables" OFF)
 option(IOTDB_OFFLINE "Disable all network access during configure" OFF)
 set(IOTDB_SESSION_VERSION "0.0.0"
@@ -97,7 +97,7 @@
 endif()
 set(BOOST_VERSION "${_iotdb_default_boost_version}"
     CACHE STRING "Boost version used when downloading / unpacking (Thrift build only)")
-set(THRIFT_VERSION "0.21.0"
+set(THRIFT_VERSION "0.23.0"
     CACHE STRING "Apache Thrift version used when downloading / building")
 
 if(WIN32)
@@ -120,6 +120,7 @@
 include(FetchBuildTools)
 if(WITH_SSL)
     include(FetchOpenSSL)
+    include(InstallOpenSSLRuntime)
 endif()
 include(FetchThrift)
 include(GenerateThriftSources)
@@ -144,6 +145,22 @@
             SOVERSION "${IOTDB_SESSION_SOVERSION}")
 endif()
 
+# When SSL is on we bundle the OpenSSL shared libraries next to libiotdb_session
+# in the package lib/ directory. Give the library an $ORIGIN-relative runtime
+# search path so the loader finds them without LD_LIBRARY_PATH / install_name
+# tweaks, keeping the SDK self-contained.
+if(WITH_SSL)
+    if(APPLE)
+        set_target_properties(iotdb_session PROPERTIES
+                BUILD_RPATH "@loader_path"
+                INSTALL_RPATH "@loader_path")
+    elseif(UNIX)
+        set_target_properties(iotdb_session PROPERTIES
+                BUILD_RPATH "$ORIGIN"
+                INSTALL_RPATH "$ORIGIN")
+    endif()
+endif()
+
 add_dependencies(iotdb_session iotdb_thrift_external iotdb_thrift_codegen)
 
 target_compile_definitions(iotdb_session PRIVATE THRIFT_STATIC_DEFINE IOTDB_BUILDING_SHARED)
@@ -223,6 +240,12 @@
         LIBRARY DESTINATION lib
         ARCHIVE DESTINATION lib)
 
+# Ship the OpenSSL shared libraries we link against next to iotdb_session so the
+# packaged SDK is self-contained on machines without a system OpenSSL.
+if(WITH_SSL)
+    iotdb_install_openssl_runtime()
+endif()
+
 foreach(_hdr IN LISTS IOTDB_PUBLIC_HEADERS)
     install(FILES "${CMAKE_CURRENT_SOURCE_DIR}/src/include/${_hdr}"
             DESTINATION include)
diff --git a/iotdb-client/client-cpp/README.md b/iotdb-client/client-cpp/README.md
index 2572fd8..a882937 100644
--- a/iotdb-client/client-cpp/README.md
+++ b/iotdb-client/client-cpp/README.md
@@ -300,7 +300,7 @@
 | ppc64le | `quay.io/pypa/manylinux_2_28_ppc64le` |
 | s390x | `quay.io/pypa/manylinux_2_28_s390x` |
 
-Thrift **0.21.0** is compiled from source during the CMake configure step (see
+Thrift **0.23.0** is compiled from source during the CMake configure step (see
 `cmake/FetchThrift.cmake`). Older releases that used pre-built
 `iotdb-tools-thrift` Maven artifacts and `-Diotdb-tools-thrift.version=...`
 for glibc/MSVC compatibility apply only to the **legacy** client-cpp build;
@@ -378,13 +378,13 @@
 
 | Option                | Default                          | Purpose                                                                                                  |
 |-----------------------|----------------------------------|----------------------------------------------------------------------------------------------------------|
-| `WITH_SSL`            | `OFF`                            | Link against OpenSSL. See *SSL* below.                                                                   |
+| `WITH_SSL`            | `ON`                             | Link against OpenSSL and bundle its runtime libraries. See *SSL* below.                                  |
 | `BUILD_TESTING`       | `OFF` (Maven sets `ON` for verify) | Build Catch2 IT executables (Catch2 v2.13.7 header downloaded at configure time).                        |
 | `CATCH2_INCLUDE_DIR`  | (unset)                          | Pre-downloaded Catch2 include dir (Maven sets this under `target/test/catch2`).                          |
 | `IOTDB_OFFLINE`       | `OFF`                            | Disallow any network access during configure.                                                            |
 | `IOTDB_DEPS_DIR`      | `<client-cpp>/third-party`       | Override the local tarball cache directory.                                                              |
 | `BOOST_VERSION`       | `1.60.0` (`1.84.0` on macOS)     | Boost version that CMake will look for / download.                                                       |
-| `THRIFT_VERSION`      | `0.21.0`                         | Apache Thrift version to build from source.                                                              |
+| `THRIFT_VERSION`      | `0.23.0`                         | Apache Thrift version to build from source.                                                              |
 | `BOOST_ROOT`          | (unset)                          | Existing Boost install to reuse, equivalent to `-Dboost.include.dir=...` from the legacy build.          |
 | `OPENSSL_ROOT_DIR`    | (unset)                          | Existing OpenSSL install when `WITH_SSL=ON`.                                                             |
 | `CMAKE_INSTALL_PREFIX`| `<build>/install`                | Install location.                                                                                        |
@@ -427,12 +427,12 @@
 
    | Platform   | Required files                                                                                                                                                       |
    |------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------|
-   | `linux/`   | `thrift-0.21.0.tar.gz`, `boost_1_60_0.tar.gz`, `m4-1.4.19.tar.gz`, `flex-2.6.4.tar.gz`, `bison-3.8.tar.gz` (and `openssl-3.5.0.tar.gz` when `WITH_SSL=ON`)            |
-   | `mac/`     | `thrift-0.21.0.tar.gz`, `boost_1_84_0.tar.gz` (newer Boost for Xcode/Clang; Apple ships m4/flex/bison; `openssl-3.5.0.tar.gz` optional)                               |
-   | `windows/` | `thrift-0.21.0.tar.gz`, `boost_1_60_0.tar.gz` (Boost headers only - no `b2` build required for `iotdb_session`)                                                      |
+   | `linux/`   | `thrift-0.23.0.tar.gz`, `boost_1_60_0.tar.gz`, `m4-1.4.19.tar.gz`, `flex-2.6.4.tar.gz`, `bison-3.8.tar.gz` (and `openssl-3.5.0.tar.gz` only when `WITH_SSL=ON` and no system OpenSSL is present) |
+   | `mac/`     | `thrift-0.23.0.tar.gz`, `boost_1_84_0.tar.gz` (newer Boost for Xcode/Clang; Apple ships m4/flex/bison; `openssl-3.5.0.tar.gz` optional)                               |
+   | `windows/` | `thrift-0.23.0.tar.gz`, `boost_1_60_0.tar.gz` (Boost headers only - no `b2` build required for `iotdb_session`)                                                      |
 
    Reference URLs (the configure step uses the same):
-   - Apache Thrift 0.21.0: <https://archive.apache.org/dist/thrift/0.21.0/thrift-0.21.0.tar.gz>
+   - Apache Thrift 0.23.0: <https://archive.apache.org/dist/thrift/0.23.0/thrift-0.23.0.tar.gz>
    - Boost 1.60.0:        <https://archives.boost.io/release/1.60.0/source/boost_1_60_0.tar.gz>
    - GNU m4 1.4.19:       <https://ftp.gnu.org/gnu/m4/m4-1.4.19.tar.gz>
    - GNU flex 2.6.4:      <https://github.com/westes/flex/releases/download/v2.6.4/flex-2.6.4.tar.gz>
@@ -461,7 +461,7 @@
 ### Linux
 
 - Tested with GCC 7+ and Clang 9+. Anything that can compile Apache Thrift
-  0.21.0 works.
+  0.23.0 works.
 - Build deps that must already exist on the host (only required when
   CMake auto-builds m4/flex/bison from tarball): `make`, `autoconf`,
   `gcc`, plus the standard C/C++ toolchain. `sudo` is **not** required;
@@ -492,9 +492,11 @@
 2. **flex / bison.** Install <https://sourceforge.net/projects/winflexbison/>
    and rename `win_flex.exe`→`flex.exe`, `win_bison.exe`→`bison.exe` on
    `PATH`.
-3. **OpenSSL** *(only when `WITH_SSL=ON`)*: run the Win64 OpenSSL
-   installer from <https://slproweb.com/products/Win32OpenSSL.html>, then
-   pass `-DOPENSSL_ROOT_DIR=...` to CMake.
+3. **OpenSSL** *(`WITH_SSL=ON` is the default)*: install OpenSSL — e.g.
+   `choco install openssl`, or a Win64 OpenSSL installer from
+   <https://slproweb.com/products/Win32OpenSSL.html> — then pass
+   `-DOPENSSL_ROOT_DIR=...` to CMake if it is not auto-detected. Pass
+   `-DWITH_SSL=OFF` to build without SSL.
 
 On Windows the SDK ships as **`iotdb_session.dll`** plus an import library
 **`iotdb_session.lib`**, built with **`/MD`** (dynamic CRT, same as a
@@ -507,16 +509,27 @@
 
 ## SSL
 
-Both Thrift and `iotdb_session` build without OpenSSL by default. Enable
-SSL with `-Dwith.ssl=ON` (Maven) or `-DWITH_SSL=ON` (standalone CMake).
-CMake first calls `find_package(OpenSSL)`;
-if nothing is found, it falls back to:
+`iotdb_session` builds **with OpenSSL by default** (`WITH_SSL=ON`). Disable
+it with `-Dwith.ssl=OFF` (Maven) or `-DWITH_SSL=OFF` (standalone CMake).
 
-- **Linux / macOS** – use a local `openssl-<ver>.tar.gz` (or download it
-  when not in offline mode), configure with `no-shared`, install into
-  `build/_deps/openssl/install`, and link statically.
-- **Windows** – fail with a friendly message that points at the Win64
-  OpenSSL installer. Building OpenSSL from source via MSVC is out of scope.
+OpenSSL **3.x** is used (Apache-2.0 licensed). Note that **OpenSSL 4.0 removed**
+the legacy TLS-method APIs (`TLSv1_method`, `SSLv3_method`, …) that Apache
+Thrift's `TSSLSocket` still calls, so install/point at a 3.x build, not 4.0.
+
+CMake calls `find_package(OpenSSL)` and uses the system OpenSSL it finds. Its
+shared libraries are **bundled into the package `lib/` directory** (next to
+`iotdb_session`, which records an `$ORIGIN`/`@loader_path` runtime path) so the
+published SDK is self-contained.
+
+Fallbacks:
+
+- **Linux / macOS** – when no system OpenSSL is found (or
+  `-DIOTDB_OPENSSL_FROM_SOURCE=ON`, which the Linux packaging build uses so the
+  AlmaLinux 8 baseline's OpenSSL 1.1.1 is never redistributed), build
+  `openssl-3.5.0.tar.gz` from source as **shared** libraries and bundle them.
+- **Windows** – fail with a friendly message; install a prebuilt OpenSSL 3.x
+  (e.g. the FireDaemon or slproweb 3.5.x zip) and set `-DOPENSSL_ROOT_DIR=...`.
+  Building OpenSSL from source via MSVC is out of scope.
 
 ## Tests
 
diff --git a/iotdb-client/client-cpp/README_zh.md b/iotdb-client/client-cpp/README_zh.md
index 5f12c71..7c4326d 100644
--- a/iotdb-client/client-cpp/README_zh.md
+++ b/iotdb-client/client-cpp/README_zh.md
@@ -236,14 +236,18 @@
 
 | CMake 变量 | Maven 属性 |
 |------------|------------|
-| `WITH_SSL` | `with.ssl`,例如 `-Dwith.ssl=ON` |
+| `WITH_SSL` | `with.ssl`(默认 `ON`,关闭用 `-Dwith.ssl=OFF`) |
 | `IOTDB_OFFLINE` | `iotdb.offline` |
 | `BUILD_TESTING` | `build.tests` |
 | `IOTDB_DEPS_DIR` | `iotdb.deps.dir` |
 | `BOOST_INCLUDEDIR` | `boost.include.dir` |
 | `CMAKE_BUILD_TYPE` | `cmake.build.type`,例如 `-Dcmake.build.type=Debug` |
 
-直接使用 CMake 时传入 `-DWITH_SSL=ON`、`-DIOTDB_OFFLINE=ON` 等即可。
+SSL 默认开启(`WITH_SSL=ON`)。所捆绑的 Apache Thrift 0.23 同时支持 OpenSSL 1.x
+与 3.x,因此直接使用系统的 OpenSSL(任意版本)。CMake 通过 `find_package(OpenSSL)`
+解析系统 OpenSSL,找不到时回退到从源码构建 OpenSSL 3.5.0;并会把所用的 OpenSSL
+动态库一并复制到产物 `lib/` 目录。Windows 可用 `choco install openssl` 安装。
+直接使用 CMake 时传入 `-DWITH_SSL=OFF`、`-DIOTDB_OFFLINE=ON` 等即可。
 Debug 构建请在配置阶段传入 `-DCMAKE_BUILD_TYPE=Debug`。Windows 使用 Visual
 Studio 生成器时也需要传入该选项,以便内置 Thrift 静态库使用 Debug MSVC 运行时;
 随后用 `cmake --build build --config Debug --target install` 构建安装。
diff --git a/iotdb-client/client-cpp/cmake/FetchBuildTools.cmake b/iotdb-client/client-cpp/cmake/FetchBuildTools.cmake
index c9d7482..866cc55 100644
--- a/iotdb-client/client-cpp/cmake/FetchBuildTools.cmake
+++ b/iotdb-client/client-cpp/cmake/FetchBuildTools.cmake
@@ -253,8 +253,23 @@
 endif()
 message(STATUS "[BuildTools] flex  = ${FLEX_EXECUTABLE}")
 
-# bison
+# bison - Thrift 0.23's grammar build uses bison >= 3.7 features (e.g. the
+# --file-prefix-map option), so reject an older system bison (manylinux_2_28
+# ships 3.0.4) and build ${BISON_VERSION} from source instead.
+set(_bison_min_version "3.7")
 find_program(BISON_EXECUTABLE bison)
+if(BISON_EXECUTABLE)
+    execute_process(COMMAND "${BISON_EXECUTABLE}" --version
+            OUTPUT_VARIABLE _bison_ver_out ERROR_QUIET
+            OUTPUT_STRIP_TRAILING_WHITESPACE)
+    string(REGEX MATCH "[0-9]+\\.[0-9]+(\\.[0-9]+)?" _bison_ver "${_bison_ver_out}")
+    if(_bison_ver AND _bison_ver VERSION_LESS _bison_min_version)
+        message(STATUS
+                "[BuildTools] system bison ${_bison_ver} < ${_bison_min_version} "
+                "(too old for Thrift ${THRIFT_VERSION}); building ${BISON_VERSION} from source")
+        unset(BISON_EXECUTABLE CACHE)
+    endif()
+endif()
 if(NOT BISON_EXECUTABLE)
     _iotdb_resolve_tarball(_bison_tarball "bison-${BISON_VERSION}.tar.gz" "${_bison_url}")
     _iotdb_build_autotools(bison "${_bison_tarball}" "bison-${BISON_VERSION}")
diff --git a/iotdb-client/client-cpp/cmake/FetchOpenSSL.cmake b/iotdb-client/client-cpp/cmake/FetchOpenSSL.cmake
index 575e280..aaf41b8 100644
--- a/iotdb-client/client-cpp/cmake/FetchOpenSSL.cmake
+++ b/iotdb-client/client-cpp/cmake/FetchOpenSSL.cmake
@@ -18,14 +18,16 @@
 # =============================================================================
 # FetchOpenSSL.cmake  (only included when WITH_SSL=ON)
 #
+# Apache Thrift 0.23 (bundled by this client) builds against OpenSSL 1.x and 3.x,
+# so any system OpenSSL is used as-is, whatever its version.
+#
 # Resolution order:
 #   1. find_package(OpenSSL) - any system / vendor install is taken as-is.
-#   2. On Linux/macOS:
-#         use tarball ${IOTDB_OS_DEPS_DIR}/openssl-${OPENSSL_VERSION}.tar.gz
+#   2. On Linux/macOS, when no system OpenSSL is present:
+#         use tarball ${IOTDB_OS_DEPS_DIR}/openssl-${OPENSSL_FALLBACK_VERSION}.tar.gz
 #         or download from openssl.org when not in offline mode, then
-#         ./Configure && make && make install_sw into ${CMAKE_BINARY_DIR}/_deps/openssl.
-#   3. On Windows: emit a FATAL_ERROR with instructions to run the bundled
-#      Win64OpenSSL installer (or any other prebuilt OpenSSL); building
+#         ./config && make && make install_sw into ${CMAKE_BINARY_DIR}/_deps/openssl.
+#   3. On Windows: emit a FATAL_ERROR asking for a prebuilt OpenSSL; building
 #      OpenSSL from source on MSVC is out of scope.
 #
 # Side effects:
@@ -33,24 +35,35 @@
 #   so callers can just link against them.
 # =============================================================================
 
-set(OPENSSL_VERSION "3.5.0" CACHE STRING "OpenSSL version to fetch when missing")
+# Version built from source when no system OpenSSL is found. Named distinctly
+# from find_package's OPENSSL_VERSION output variable to avoid collisions.
+set(OPENSSL_FALLBACK_VERSION "3.5.0"
+    CACHE STRING "OpenSSL version built from source when no system OpenSSL is found")
 
-find_package(OpenSSL QUIET)
-if(OpenSSL_FOUND)
-    message(STATUS "[OpenSSL] using system OpenSSL ${OPENSSL_VERSION_MAJOR}.${OPENSSL_VERSION_MINOR}")
-    return()
+# Build OpenSSL from source even if a system one exists. Used by the Linux
+# packaging build, whose AlmaLinux 8 baseline ships OpenSSL 1.1.1 (EOL, not
+# Apache-2.0, must not be redistributed) - we build 3.x there instead.
+option(IOTDB_OPENSSL_FROM_SOURCE
+        "Ignore any system OpenSSL and build OpenSSL ${OPENSSL_FALLBACK_VERSION} from source" OFF)
+
+if(NOT IOTDB_OPENSSL_FROM_SOURCE)
+    find_package(OpenSSL QUIET)
+    if(OpenSSL_FOUND)
+        message(STATUS "[OpenSSL] using system OpenSSL ${OPENSSL_VERSION}")
+        return()
+    endif()
 endif()
 
 if(WIN32)
     message(FATAL_ERROR
             "[OpenSSL] WITH_SSL=ON but no OpenSSL was found on Windows. "
-            "Please run third-party/windows/Win64OpenSSL-3_5_0.exe (or any "
-            "OpenSSL installer), then re-run the configure step with "
-            "-DOPENSSL_ROOT_DIR=<install_path>.")
+            "Please install a prebuilt OpenSSL (e.g. 'choco install openssl'), "
+            "then re-run the configure step with -DOPENSSL_ROOT_DIR=<install_path>. "
+            "Pass -DWITH_SSL=OFF to build without SSL.")
 endif()
 
-# --- Linux / macOS fallback: build from source ---------------------------
-set(_ossl_tarname "openssl-${OPENSSL_VERSION}.tar.gz")
+# --- Linux / macOS: build OpenSSL ${OPENSSL_FALLBACK_VERSION} from source -
+set(_ossl_tarname "openssl-${OPENSSL_FALLBACK_VERSION}.tar.gz")
 set(_ossl_tarball "${IOTDB_OS_DEPS_DIR}/${_ossl_tarname}")
 
 if(NOT EXISTS "${_ossl_tarball}")
@@ -71,9 +84,9 @@
 endif()
 
 set(_ossl_root  "${CMAKE_BINARY_DIR}/_deps/openssl")
-set(_ossl_src   "${_ossl_root}/src/openssl-${OPENSSL_VERSION}")
+set(_ossl_src   "${_ossl_root}/src/openssl-${OPENSSL_FALLBACK_VERSION}")
 set(_ossl_inst  "${_ossl_root}/install")
-set(_ossl_stamp "${_ossl_root}/.built-${OPENSSL_VERSION}")
+set(_ossl_stamp "${_ossl_root}/.built-${OPENSSL_FALLBACK_VERSION}")
 
 if(NOT EXISTS "${_ossl_stamp}")
     file(REMOVE_RECURSE "${_ossl_root}/src")
@@ -88,12 +101,15 @@
     endif()
 
     message(STATUS "[OpenSSL] configuring -> ${_ossl_inst}")
+    # ./config auto-detects the platform target. Build SHARED libraries
+    # (libssl.so.3 / libcrypto.so.3) so they can be bundled next to
+    # libiotdb_session and shipped as the SDK's OpenSSL runtime.
     execute_process(
-            COMMAND ./Configure --prefix=${_ossl_inst} --openssldir=${_ossl_inst}/ssl no-shared
+            COMMAND ./config --prefix=${_ossl_inst} --openssldir=${_ossl_inst}/ssl shared
             WORKING_DIRECTORY "${_ossl_src}"
             RESULT_VARIABLE _rc)
     if(NOT _rc EQUAL 0)
-        message(FATAL_ERROR "[OpenSSL] Configure failed (rc=${_rc})")
+        message(FATAL_ERROR "[OpenSSL] config failed (rc=${_rc})")
     endif()
 
     message(STATUS "[OpenSSL] building (-j${_jobs})")
@@ -116,6 +132,6 @@
 endif()
 
 set(OPENSSL_ROOT_DIR "${_ossl_inst}" CACHE PATH "OpenSSL root" FORCE)
-set(OPENSSL_USE_STATIC_LIBS ON)
+set(OPENSSL_USE_STATIC_LIBS OFF)
 find_package(OpenSSL REQUIRED)
-message(STATUS "[OpenSSL] built locally at ${OPENSSL_ROOT_DIR}")
+message(STATUS "[OpenSSL] built locally (shared) at ${OPENSSL_ROOT_DIR}")
diff --git a/iotdb-client/client-cpp/cmake/FetchThrift.cmake b/iotdb-client/client-cpp/cmake/FetchThrift.cmake
index f26ad64..d69b2a4 100644
--- a/iotdb-client/client-cpp/cmake/FetchThrift.cmake
+++ b/iotdb-client/client-cpp/cmake/FetchThrift.cmake
@@ -100,7 +100,7 @@
 # binary / library can immediately drive code generation and linking.
 # ---------------------------------------------------------------------------
 set(_thrift_cmake_args
-        # CMake 4.x rejects Thrift 0.21's cmake_minimum_required(3.0); set policy first.
+        # CMake 4.x rejects Thrift's old cmake_minimum_required(3.x); set policy first.
         "-DCMAKE_POLICY_VERSION_MINIMUM=3.5"
         "-DCMAKE_INSTALL_PREFIX=${_thrift_install}"
         "-DCMAKE_BUILD_TYPE=${_thrift_build_config}"
@@ -138,6 +138,15 @@
 
 if(WITH_SSL)
     list(APPEND _thrift_cmake_args "-DWITH_OPENSSL=ON")
+    # Build Thrift's TSSLSocket against the same OpenSSL that iotdb_session links
+    # and bundles, so the runtime libraries match. find_package does not set
+    # OPENSSL_ROOT_DIR itself, so derive it from the resolved include dir.
+    if(OPENSSL_ROOT_DIR)
+        list(APPEND _thrift_cmake_args "-DOPENSSL_ROOT_DIR=${OPENSSL_ROOT_DIR}")
+    elseif(OPENSSL_INCLUDE_DIR)
+        get_filename_component(_thrift_ossl_root "${OPENSSL_INCLUDE_DIR}" DIRECTORY)
+        list(APPEND _thrift_cmake_args "-DOPENSSL_ROOT_DIR=${_thrift_ossl_root}")
+    endif()
 else()
     list(APPEND _thrift_cmake_args "-DWITH_OPENSSL=OFF")
 endif()
@@ -152,7 +161,15 @@
 else()
     set(_thrift_abi_stamp "-abidefault")
 endif()
-set(_thrift_stamp "${_thrift_build}/.built-${THRIFT_VERSION}-${_thrift_build_config}-mdll${_thrift_abi_stamp}")
+# Encode WITH_SSL in the stamp: toggling SSL changes WITH_OPENSSL, so a cached
+# build of the opposite flavour must not be reused (otherwise TSSLSocket is
+# missing/extra at link time).
+if(WITH_SSL)
+    set(_thrift_ssl_stamp "-ssl")
+else()
+    set(_thrift_ssl_stamp "-nossl")
+endif()
+set(_thrift_stamp "${_thrift_build}/.built-${THRIFT_VERSION}-${_thrift_build_config}-mdll${_thrift_abi_stamp}${_thrift_ssl_stamp}")
 if(NOT EXISTS "${_thrift_stamp}")
     file(MAKE_DIRECTORY "${_thrift_build}")
     message(STATUS "[Thrift] configuring ${_thrift_dirname}")
diff --git a/iotdb-client/client-cpp/cmake/InstallOpenSSLRuntime.cmake b/iotdb-client/client-cpp/cmake/InstallOpenSSLRuntime.cmake
new file mode 100644
index 0000000..f3e181b
--- /dev/null
+++ b/iotdb-client/client-cpp/cmake/InstallOpenSSLRuntime.cmake
@@ -0,0 +1,121 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+# =============================================================================
+# InstallOpenSSLRuntime.cmake  (only used when WITH_SSL=ON)
+#
+# Bundles the OpenSSL shared libraries that iotdb_session links against into the
+# package lib/ directory, so the published SDK is self-contained and runs on
+# machines that do not have OpenSSL installed.
+#
+# Relies on a prior find_package(OpenSSL) having populated
+# OPENSSL_SSL_LIBRARY / OPENSSL_CRYPTO_LIBRARY / OPENSSL_ROOT_DIR /
+# OPENSSL_VERSION_MAJOR.
+#
+# When OpenSSL was linked statically (the from-source fallback uses no-shared),
+# there is nothing to bundle: those objects are already inside libiotdb_session.
+# =============================================================================
+
+# Windows: find_package resolves the import .lib; the runtime DLLs live in
+# <root>/bin. Collect them, filtering by major version so installs that ship
+# several ABIs side by side (e.g. libssl-1_1-x64.dll + libssl-3-x64.dll) only
+# bundle the one we actually linked.
+function(_iotdb_collect_openssl_windows_dlls _out_var)
+    set(_roots "")
+    if(OPENSSL_ROOT_DIR)
+        list(APPEND _roots "${OPENSSL_ROOT_DIR}")
+    endif()
+    foreach(_implib IN LISTS OPENSSL_SSL_LIBRARY OPENSSL_CRYPTO_LIBRARY OPENSSL_LIBRARIES)
+        if(_implib AND EXISTS "${_implib}")
+            # Walk up from the import lib (.../lib, .../lib/VC/x64/MD, ...) to find
+            # a directory that owns a bin/ holding the DLLs.
+            get_filename_component(_dir "${_implib}" DIRECTORY)
+            list(APPEND _roots "${_dir}")
+            foreach(_up RANGE 1 4)
+                get_filename_component(_dir "${_dir}" DIRECTORY)
+                list(APPEND _roots "${_dir}")
+            endforeach()
+        endif()
+    endforeach()
+    list(REMOVE_DUPLICATES _roots)
+
+    set(_dlls "")
+    set(_seen_names "")
+    foreach(_root IN LISTS _roots)
+        if(_root AND IS_DIRECTORY "${_root}")
+            file(GLOB _found
+                    "${_root}/bin/libssl-${OPENSSL_VERSION_MAJOR}*.dll"
+                    "${_root}/bin/libcrypto-${OPENSSL_VERSION_MAJOR}*.dll"
+                    "${_root}/libssl-${OPENSSL_VERSION_MAJOR}*.dll"
+                    "${_root}/libcrypto-${OPENSSL_VERSION_MAJOR}*.dll")
+            # The same DLL can appear under several candidate roots (e.g. bin/ and
+            # the install root); keep only the first occurrence of each filename.
+            foreach(_dll IN LISTS _found)
+                get_filename_component(_name "${_dll}" NAME)
+                if(NOT _name IN_LIST _seen_names)
+                    list(APPEND _seen_names "${_name}")
+                    list(APPEND _dlls "${_dll}")
+                endif()
+            endforeach()
+        endif()
+    endforeach()
+    set(${_out_var} "${_dlls}" PARENT_SCOPE)
+endfunction()
+
+function(iotdb_install_openssl_runtime)
+    if(WIN32)
+        _iotdb_collect_openssl_windows_dlls(_dlls)
+        if(NOT _dlls)
+            message(STATUS
+                    "[OpenSSL] no runtime DLLs found to bundle; ensure the OpenSSL "
+                    "bin/ directory is on PATH when running the SDK")
+            return()
+        endif()
+        foreach(_dll IN LISTS _dlls)
+            message(STATUS "[OpenSSL] bundling runtime library into lib/: ${_dll}")
+        endforeach()
+        install(FILES ${_dlls} DESTINATION lib)
+        return()
+    endif()
+
+    # Linux / macOS: OPENSSL_*_LIBRARY is the developer name (libssl.so /
+    # libssl.dylib), usually a symlink to the SONAME (libssl.so.3 / .1.1).
+    # FOLLOW_SYMLINK_CHAIN installs the whole chain with the symlinks preserved,
+    # so the loader finds the SONAME the binary records. Static archives (.a)
+    # are skipped: they are already linked into libiotdb_session.
+    set(_files_arg "")
+    set(_have_libs OFF)
+    foreach(_lib IN LISTS OPENSSL_SSL_LIBRARY OPENSSL_CRYPTO_LIBRARY)
+        if(_lib AND EXISTS "${_lib}" AND NOT _lib MATCHES "\\.a$")
+            string(APPEND _files_arg " \"${_lib}\"")
+            set(_have_libs ON)
+            message(STATUS "[OpenSSL] bundling runtime library into lib/: ${_lib}")
+        endif()
+    endforeach()
+
+    if(NOT _have_libs)
+        message(STATUS
+                "[OpenSSL] no shared runtime libraries to bundle "
+                "(OpenSSL linked statically); SDK is self-contained")
+        return()
+    endif()
+
+    install(CODE
+            "file(INSTALL DESTINATION \"\${CMAKE_INSTALL_PREFIX}/lib\"
+                  TYPE SHARED_LIBRARY FOLLOW_SYMLINK_CHAIN
+                  FILES ${_files_arg})")
+endfunction()
diff --git a/iotdb-client/client-cpp/examples/CMakeLists.txt b/iotdb-client/client-cpp/examples/CMakeLists.txt
index 0b77bce..4184199 100644
--- a/iotdb-client/client-cpp/examples/CMakeLists.txt
+++ b/iotdb-client/client-cpp/examples/CMakeLists.txt
@@ -118,6 +118,21 @@
     tree_example
     table_example)
 
+# OpenSSL runtime libraries bundled in the SDK lib/ (libssl / libcrypto). When
+# building against an unpacked package, copy them next to each example binary so
+# the examples run without a system OpenSSL - libiotdb_session records them as
+# NEEDED and resolves them via its $ORIGIN runtime path.
+set(_iotdb_sdk_ssl_runtime "")
+if(NOT _iotdb_examples_in_tree)
+    file(GLOB _iotdb_sdk_ssl_runtime
+        "${IOTDB_SDK_ROOT}/lib/libssl*.so*"
+        "${IOTDB_SDK_ROOT}/lib/libcrypto*.so*"
+        "${IOTDB_SDK_ROOT}/lib/libssl*.dylib"
+        "${IOTDB_SDK_ROOT}/lib/libcrypto*.dylib"
+        "${IOTDB_SDK_ROOT}/lib/libssl*.dll"
+        "${IOTDB_SDK_ROOT}/lib/libcrypto*.dll")
+endif()
+
 foreach(_t IN LISTS _example_targets)
   IF(WITH_SSL)
       TARGET_LINK_LIBRARIES(${_t} PRIVATE "${_iotdb_link_lib}" OpenSSL::SSL OpenSSL::Crypto)
@@ -128,6 +143,13 @@
       TARGET_LINK_LIBRARIES(${_t} PRIVATE pthread)
   ENDIF()
 
+  # The packaged libiotdb_session records the bundled OpenSSL libs as DT_NEEDED;
+  # point the linker at the SDK lib/ so it can resolve them without a system
+  # OpenSSL present.
+  if(UNIX AND NOT _iotdb_examples_in_tree)
+      target_link_directories(${_t} PRIVATE "${IOTDB_SDK_ROOT}/lib")
+  endif()
+
   # Run from the build output directory without setting LD_LIBRARY_PATH / PATH.
   if(UNIX)
       set_target_properties(${_t} PROPERTIES
@@ -145,6 +167,12 @@
           COMMAND ${CMAKE_COMMAND} -E copy_if_different
                   "${_iotdb_runtime}" $<TARGET_FILE_DIR:${_t}>
           COMMENT "Copy IoTDB runtime library next to ${_t}")
+      foreach(_ssl_lib IN LISTS _iotdb_sdk_ssl_runtime)
+          add_custom_command(TARGET ${_t} POST_BUILD
+              COMMAND ${CMAKE_COMMAND} -E copy_if_different
+                      "${_ssl_lib}" $<TARGET_FILE_DIR:${_t}>
+              COMMENT "Copy bundled OpenSSL runtime next to ${_t}")
+      endforeach()
   elseif(WIN32)
       message(WARNING "Missing ${_iotdb_runtime}; copy iotdb_session.dll manually before running ${_t}.")
   endif()
@@ -166,6 +194,13 @@
         COMMAND ${CMAKE_COMMAND} -E copy_if_different
                 "${_iotdb_runtime}" "${_example_dist_dir}/")
 endif()
+# Stage the bundled OpenSSL runtime too, so a copied dist/ runs on a machine
+# without a system OpenSSL.
+foreach(_ssl_lib IN LISTS _iotdb_sdk_ssl_runtime)
+    add_custom_command(TARGET example-dist POST_BUILD
+        COMMAND ${CMAKE_COMMAND} -E copy_if_different
+                "${_ssl_lib}" "${_example_dist_dir}/")
+endforeach()
 
 if(IOTDB_EXAMPLES_REGISTER_TESTS)
     set(_runnable_example_targets
diff --git a/iotdb-client/client-cpp/examples/README.md b/iotdb-client/client-cpp/examples/README.md
index 295aa29..763ec69 100644
--- a/iotdb-client/client-cpp/examples/README.md
+++ b/iotdb-client/client-cpp/examples/README.md
@@ -53,7 +53,7 @@
 | macOS arm64 | `macos-aarch64` |
 | Windows (match your Visual Studio version) | `windows-x86_64-msvc14.1` ... `msvc14.4` |
 
-The current build compiles Thrift 0.21 from source at CMake configure time.
+The current build compiles Thrift 0.23 from source at CMake configure time.
 Legacy `-Diotdb-tools-thrift.version=...` flags applied to the **old**
 pre-built Thrift workflow only. Linux release packages are built in the
 `manylinux_2_28` container and require glibc 2.28 or newer. See
diff --git a/iotdb-client/client-cpp/examples/README_zh.md b/iotdb-client/client-cpp/examples/README_zh.md
index 435b58f..4adc38a 100644
--- a/iotdb-client/client-cpp/examples/README_zh.md
+++ b/iotdb-client/client-cpp/examples/README_zh.md
@@ -52,7 +52,7 @@
 | macOS arm64 | `macos-aarch64` |
 | Windows + 与工程相同的 VS 版本 | `windows-x86_64-msvc14.1` ... `msvc14.4` |
 
-当前 CMake 构建在配置阶段从源码编译 Thrift 0.21,**不再**通过
+当前 CMake 构建在配置阶段从源码编译 Thrift 0.23,**不再**通过
 `-Diotdb-tools-thrift.version=0.14.1.1-gcc4-SNAPSHOT` 等旧参数控制 glibc;
 Linux 发版包在 `manylinux_2_28` 容器中构建,部署机需要 glibc 2.28 或更新版本。
 详见 [client-cpp README](../../iotdb-client/client-cpp/README.md)。
diff --git a/iotdb-client/client-cpp/pom.xml b/iotdb-client/client-cpp/pom.xml
index b5b97e6..04f7fa1 100644
--- a/iotdb-client/client-cpp/pom.xml
+++ b/iotdb-client/client-cpp/pom.xml
@@ -49,7 +49,8 @@
         <cmake.install.prefix>${project.build.directory}/install</cmake.install.prefix>
         <iotdb.deps.dir>${project.basedir}/third-party</iotdb.deps.dir>
         <iotdb.offline>OFF</iotdb.offline>
-        <with.ssl>OFF</with.ssl>
+        <with.ssl>ON</with.ssl>
+        <iotdb.openssl.from.source>OFF</iotdb.openssl.from.source>
         <iotdb.cxx11.abi/>
         <!-- Switched to OFF by the .skipTests profile below. -->
         <build.tests>ON</build.tests>
@@ -112,6 +113,7 @@
                                 <option>-DCMAKE_INSTALL_PREFIX=${cmake.install.prefix}</option>
                                 <option>-DBUILD_TESTING=${build.tests}</option>
                                 <option>-DWITH_SSL=${with.ssl}</option>
+                                <option>-DIOTDB_OPENSSL_FROM_SOURCE=${iotdb.openssl.from.source}</option>
                                 <option>-DIOTDB_OFFLINE=${iotdb.offline}</option>
                                 <option>-DIOTDB_DEPS_DIR=${iotdb.deps.dir}</option>
                                 <option>-DIOTDB_USE_CXX11_ABI=${iotdb.cxx11.abi}</option>
diff --git a/iotdb-client/client-cpp/src/assembly/client-cpp.xml b/iotdb-client/client-cpp/src/assembly/client-cpp.xml
index af7184f..3a6a631 100644
--- a/iotdb-client/client-cpp/src/assembly/client-cpp.xml
+++ b/iotdb-client/client-cpp/src/assembly/client-cpp.xml
@@ -52,6 +52,8 @@
             <directory>${project.build.directory}/package-metadata</directory>
             <includes>
                 <include>third_party/DEPENDENCIES.md</include>
+                <include>third_party/NOTICE</include>
+                <include>third_party/licenses/**</include>
             </includes>
             <outputDirectory>${file.separator}</outputDirectory>
         </fileSet>
diff --git a/iotdb-client/client-cpp/src/assembly/package-metadata/third_party/DEPENDENCIES.md b/iotdb-client/client-cpp/src/assembly/package-metadata/third_party/DEPENDENCIES.md
index e921c7e..e321c6f 100644
--- a/iotdb-client/client-cpp/src/assembly/package-metadata/third_party/DEPENDENCIES.md
+++ b/iotdb-client/client-cpp/src/assembly/package-metadata/third_party/DEPENDENCIES.md
@@ -20,15 +20,28 @@
 -->
 # Third-party Dependencies
 
-The release library is built with the following third-party components. Some
-components are linked into the produced IoTDB C++ session library; this file is
-included for provenance.
+## Redistributed in this package
+
+These components are statically linked into the `iotdb_session` library, or
+bundled as shared libraries, and are therefore part of the binary distribution.
+Their licenses are Category A (Apache-2.0 / Boost). Attribution is provided in
+the [`NOTICE`](NOTICE) file in this directory; non-Apache license texts are under
+[`licenses/`](licenses). Apache-2.0 components are covered by the top-level
+`LICENSE` file.
+
+| Component | Version | How | License |
+| --- | --- | --- | --- |
+| Apache Thrift | 0.23.0 | statically linked | Apache License 2.0 |
+| Boost | 1.60.0 on Linux/Windows, 1.84.0 on macOS by default | statically linked (header-only) | Boost Software License 1.0 |
+| OpenSSL | 3.x: system OpenSSL 3.x when present, else 3.5.0 built from source (`WITH_SSL=ON`, default) | bundled shared libs in `lib/` | Apache License 2.0 |
+
+## Build-time only (not redistributed)
+
+These tools are used only to build Thrift / generate code; none of their code
+is included in the distributed library.
 
 | Component | Version | License |
 | --- | --- | --- |
-| Apache Thrift | 0.21.0 | Apache License 2.0 |
-| Boost | 1.60.0 on Linux/Windows, 1.84.0 on macOS by default | Boost Software License 1.0 |
-| OpenSSL | 3.5.0 when `WITH_SSL=ON` | Apache License 2.0 |
 | GNU m4 | 1.4.19 on Linux build bootstrap | GPL-3.0-or-later |
 | GNU flex | 2.6.4 on Linux build bootstrap | BSD-style flex license |
 | GNU bison | 3.8 on Linux build bootstrap | GPL-3.0-or-later |
diff --git a/iotdb-client/client-cpp/src/assembly/package-metadata/third_party/NOTICE b/iotdb-client/client-cpp/src/assembly/package-metadata/third_party/NOTICE
new file mode 100644
index 0000000..4da431f
--- /dev/null
+++ b/iotdb-client/client-cpp/src/assembly/package-metadata/third_party/NOTICE
@@ -0,0 +1,34 @@
+Apache IoTDB C++ Session Client
+Bundled / statically linked third-party components
+==================================================
+
+In addition to the Apache IoTDB code (covered by the top-level LICENSE and
+NOTICE files), this binary distribution statically links or bundles the
+third-party components listed below. Components licensed under the Apache
+License, Version 2.0 are covered by the top-level LICENSE file; other license
+texts are reproduced under third_party/licenses/.
+
+------------------------------------------------------------------------------
+Apache Thrift  (statically linked into the iotdb_session library)
+Licensed under the Apache License, Version 2.0 (see the top-level LICENSE).
+
+Apache Thrift
+Copyright (C) 2006 - 2019, The Apache Software Foundation
+
+This product includes software developed at
+The Apache Software Foundation (http://www.apache.org/).
+
+------------------------------------------------------------------------------
+OpenSSL  (bundled shared libraries: libssl / libcrypto, present only when the
+SDK is built with SSL support)
+Copyright 1999-2025 The OpenSSL Project Authors. All Rights Reserved.
+Licensed under the Apache License, Version 2.0 (see the top-level LICENSE).
+
+------------------------------------------------------------------------------
+Boost C++ Libraries  (header-only; used at build time to compile Apache Thrift
+and the iotdb_session library, so portions may be inlined into the shipped
+binary)
+Distributed under the Boost Software License, Version 1.0
+(see third_party/licenses/LICENSE-Boost-1.0). Here "Version 1.0" is the license
+version -- the Boost Software License has only ever had this single version --
+and is unrelated to the Boost library release that was compiled in.
diff --git a/iotdb-client/client-cpp/src/assembly/package-metadata/third_party/licenses/LICENSE-Boost-1.0 b/iotdb-client/client-cpp/src/assembly/package-metadata/third_party/licenses/LICENSE-Boost-1.0
new file mode 100644
index 0000000..36b7cd9
--- /dev/null
+++ b/iotdb-client/client-cpp/src/assembly/package-metadata/third_party/licenses/LICENSE-Boost-1.0
@@ -0,0 +1,23 @@
+Boost Software License - Version 1.0 - August 17th, 2003
+
+Permission is hereby granted, free of charge, to any person or organization
+obtaining a copy of the software and accompanying documentation covered by
+this license (the "Software") to use, reproduce, display, distribute,
+execute, and transmit the Software, and to prepare derivative works of the
+Software, and to permit third-parties to whom the Software is furnished to
+do so, all subject to the following:
+
+The copyright notices in the Software and this entire statement, including
+the above license grant, this restriction and the following disclaimer,
+must be included in all copies of the Software, in whole or in part, and
+all derivative works of the Software, unless such copies or derivative
+works are solely in the form of machine-executable object code generated by
+a source language processor.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+FITNESS FOR A PARTICULAR PURPOSE, TITLE AND NON-INFRINGEMENT. IN NO EVENT
+SHALL THE COPYRIGHT HOLDERS OR ANYONE DISTRIBUTING THE SOFTWARE BE LIABLE
+FOR ANY DAMAGES OR OTHER LIABILITY, WHETHER IN CONTRACT, TORT OR OTHERWISE,
+ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
+DEALINGS IN THE SOFTWARE.
diff --git a/iotdb-client/client-cpp/third-party/README.md b/iotdb-client/client-cpp/third-party/README.md
index 313a6fb..4cbdd1e 100644
--- a/iotdb-client/client-cpp/third-party/README.md
+++ b/iotdb-client/client-cpp/third-party/README.md
@@ -68,8 +68,8 @@
 
 | Platform   | Typical files |
 |------------|---------------|
-| `linux/`   | `thrift-0.21.0.tar.gz`, `boost_1_60_0.tar.gz`, `m4-1.4.19.tar.gz`, `flex-2.6.4.tar.gz`, `bison-3.8.tar.gz` (+ `openssl-3.5.0.tar.gz` when `WITH_SSL=ON`) |
-| `mac/`     | `thrift-0.21.0.tar.gz`, `boost_1_60_0.tar.gz` (Xcode CLT usually provides m4/flex/bison) |
-| `windows/` | `thrift-0.21.0.tar.gz`, `boost_1_60_0.tar.gz`, `win_flex_bison-2.5.25.zip` (or any `win_flex_bison*.zip`; skip if flex/bison already on `PATH`) |
+| `linux/`   | `thrift-0.23.0.tar.gz`, `boost_1_60_0.tar.gz`, `m4-1.4.19.tar.gz`, `flex-2.6.4.tar.gz`, `bison-3.8.tar.gz` (+ `openssl-3.5.0.tar.gz` only when `WITH_SSL=ON` and no system OpenSSL is present) |
+| `mac/`     | `thrift-0.23.0.tar.gz`, `boost_1_60_0.tar.gz` (Xcode CLT usually provides m4/flex/bison) |
+| `windows/` | `thrift-0.23.0.tar.gz`, `boost_1_60_0.tar.gz`, `win_flex_bison-2.5.25.zip` (or any `win_flex_bison*.zip`; skip if flex/bison already on `PATH`) |
 
 Download URLs: see the *Offline build* table in [`README.md`](../README.md).
diff --git a/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/execution/operator/CommonOperatorContext.java b/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/execution/operator/CommonOperatorContext.java
index 2c99fa5..0ffe447 100644
--- a/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/execution/operator/CommonOperatorContext.java
+++ b/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/execution/operator/CommonOperatorContext.java
@@ -103,6 +103,21 @@
     this.totalExecutionTimeInNanos += executionTimeInNanos;
   }
 
+  public void recordScanAggregationFromRawDataCost(long costTimeInNanos) {
+    // calc-commons operators can run in tests or standalone contexts that are not backed by a
+    // DataNode FragmentInstanceContext. DataNode OperatorContext overrides this to forward costs.
+  }
+
+  public void recordScanAggregationFromStatisticsCost(long costTimeInNanos) {
+    // calc-commons operators can run in tests or standalone contexts that are not backed by a
+    // DataNode FragmentInstanceContext. DataNode OperatorContext overrides this to forward costs.
+  }
+
+  public void recordAggregationOperatorFromRawDataCost(long costTimeInNanos) {
+    // calc-commons operators can run in tests or standalone contexts that are not backed by a
+    // DataNode FragmentInstanceContext. DataNode OperatorContext overrides this to forward costs.
+  }
+
   public void recordNextCalled() {
     this.nextCalledCount++;
   }
diff --git a/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/execution/operator/source/relational/aggregation/AggregationOperator.java b/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/execution/operator/source/relational/aggregation/AggregationOperator.java
index e65ee82..047894f 100644
--- a/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/execution/operator/source/relational/aggregation/AggregationOperator.java
+++ b/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/execution/operator/source/relational/aggregation/AggregationOperator.java
@@ -92,9 +92,7 @@
         return null;
       }
 
-      for (TableAggregator aggregator : aggregators) {
-        aggregator.processBlock(block);
-      }
+      processBlock(block);
 
       return null;
     } else {
@@ -127,6 +125,17 @@
     return operatorContext;
   }
 
+  private void processBlock(TsBlock block) {
+    long startTime = System.nanoTime();
+    try {
+      for (TableAggregator aggregator : aggregators) {
+        aggregator.processBlock(block);
+      }
+    } finally {
+      operatorContext.recordAggregationOperatorFromRawDataCost(System.nanoTime() - startTime);
+    }
+  }
+
   @Override
   public long calculateMaxPeekMemory() {
     return Math.max(
diff --git a/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/execution/operator/source/relational/aggregation/TableAggregator.java b/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/execution/operator/source/relational/aggregation/TableAggregator.java
index 1df3f6f..c7a7b6b 100644
--- a/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/execution/operator/source/relational/aggregation/TableAggregator.java
+++ b/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/execution/operator/source/relational/aggregation/TableAggregator.java
@@ -19,7 +19,6 @@
 
 package org.apache.iotdb.calc.execution.operator.source.relational.aggregation;
 
-import org.apache.iotdb.calc.metric.QueryExecutionMetricSet;
 import org.apache.iotdb.calc.plan.planner.CommonOperatorUtils;
 import org.apache.iotdb.commons.queryengine.plan.relational.planner.node.AggregationNode;
 
@@ -36,13 +35,9 @@
 
 import static com.google.common.base.Preconditions.checkArgument;
 import static java.util.Objects.requireNonNull;
-import static org.apache.iotdb.calc.metric.QueryExecutionMetricSet.AGGREGATION_FROM_RAW_DATA;
 
 public class TableAggregator {
 
-  public static final QueryExecutionMetricSet QUERY_EXECUTION_METRICS =
-      QueryExecutionMetricSet.getInstance();
-
   private final TableAccumulator accumulator;
   private final AggregationNode.Step step;
   private final TSDataType outputType;
@@ -70,34 +65,28 @@
   }
 
   public void processBlock(TsBlock block) {
-    long startTime = System.nanoTime();
-    try {
-      Column[] arguments = block.getColumns(inputChannels);
+    Column[] arguments = block.getColumns(inputChannels);
 
-      // process count(*)
-      if (arguments.length == 0) {
-        arguments =
-            new Column[] {
-              new RunLengthEncodedColumn(
-                  CommonOperatorUtils.TIME_COLUMN_TEMPLATE, block.getPositionCount())
-            };
+    // process count(*)
+    if (arguments.length == 0) {
+      arguments =
+          new Column[] {
+            new RunLengthEncodedColumn(
+                CommonOperatorUtils.TIME_COLUMN_TEMPLATE, block.getPositionCount())
+          };
+    }
+
+    if (step.isInputRaw()) {
+      // Use select-all AggregationMask here because filter of Agg-Function is not supported now
+      AggregationMask mask = AggregationMask.createSelectAll(block.getPositionCount());
+
+      if (maskChannel.isPresent()) {
+        mask.applyMaskBlock(block.getColumn(maskChannel.getAsInt()));
       }
 
-      if (step.isInputRaw()) {
-        // Use select-all AggregationMask here because filter of Agg-Function is not supported now
-        AggregationMask mask = AggregationMask.createSelectAll(block.getPositionCount());
-
-        if (maskChannel.isPresent()) {
-          mask.applyMaskBlock(block.getColumn(maskChannel.getAsInt()));
-        }
-
-        accumulator.addInput(arguments, mask);
-      } else {
-        accumulator.addIntermediate(arguments[0]);
-      }
-    } finally {
-      QUERY_EXECUTION_METRICS.recordExecutionCost(
-          AGGREGATION_FROM_RAW_DATA, System.nanoTime() - startTime);
+      accumulator.addInput(arguments, mask);
+    } else {
+      accumulator.addIntermediate(arguments[0]);
     }
   }
 
diff --git a/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/execution/operator/source/relational/aggregation/grouped/StreamingAggregationOperator.java b/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/execution/operator/source/relational/aggregation/grouped/StreamingAggregationOperator.java
index 7609ac3..9808a1b 100644
--- a/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/execution/operator/source/relational/aggregation/grouped/StreamingAggregationOperator.java
+++ b/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/execution/operator/source/relational/aggregation/grouped/StreamingAggregationOperator.java
@@ -176,8 +176,13 @@
 
   private void addRowsToAggregators(TsBlock page, int startPosition, int endPosition) {
     TsBlock region = page.getRegion(startPosition, endPosition - startPosition + 1);
-    for (TableAggregator aggregator : aggregators) {
-      aggregator.processBlock(region);
+    long startTime = System.nanoTime();
+    try {
+      for (TableAggregator aggregator : aggregators) {
+        aggregator.processBlock(region);
+      }
+    } finally {
+      operatorContext.recordAggregationOperatorFromRawDataCost(System.nanoTime() - startTime);
     }
   }
 
diff --git a/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/execution/operator/source/relational/aggregation/grouped/builder/InMemoryHashAggregationBuilder.java b/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/execution/operator/source/relational/aggregation/grouped/builder/InMemoryHashAggregationBuilder.java
index f372717..07e4cb9 100644
--- a/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/execution/operator/source/relational/aggregation/grouped/builder/InMemoryHashAggregationBuilder.java
+++ b/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/execution/operator/source/relational/aggregation/grouped/builder/InMemoryHashAggregationBuilder.java
@@ -140,8 +140,13 @@
       operatorContext.recordSpecifiedInfo(MAX_GROUP_NUMBER, Long.toString(groupCount));
       maxGroupNumber = groupCount;
     }
-    for (GroupedAggregator groupedAggregator : groupedAggregators) {
-      groupedAggregator.processBlock(groupCount, groupByIdBlock, block);
+    long startTime = System.nanoTime();
+    try {
+      for (GroupedAggregator groupedAggregator : groupedAggregators) {
+        groupedAggregator.processBlock(groupCount, groupByIdBlock, block);
+      }
+    } finally {
+      operatorContext.recordAggregationOperatorFromRawDataCost(System.nanoTime() - startTime);
     }
   }
 
diff --git a/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/metric/QueryExecutionMetricSet.java b/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/metric/QueryExecutionMetricSet.java
index 636b11b..5452e54 100644
--- a/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/metric/QueryExecutionMetricSet.java
+++ b/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/metric/QueryExecutionMetricSet.java
@@ -143,8 +143,11 @@
   // region query aggregation
   public static final String AGGREGATION_FROM_RAW_DATA = "aggregation_from_raw_data";
   public static final String AGGREGATION_FROM_STATISTICS = "aggregation_from_statistics";
+  public static final String AGGREGATION_OPERATOR_FROM_RAW_DATA =
+      "aggregation_operator_from_raw_data";
   private Timer aggregationFromRawDataTimer = DoNothingMetricManager.DO_NOTHING_TIMER;
   private Timer aggregationFromStatisticsTimer = DoNothingMetricManager.DO_NOTHING_TIMER;
+  private Timer aggregationOperatorFromRawDataTimer = DoNothingMetricManager.DO_NOTHING_TIMER;
 
   private void bindQueryAggregation(AbstractMetricService metricService) {
     aggregationFromRawDataTimer =
@@ -156,12 +159,19 @@
             MetricLevel.IMPORTANT,
             Tag.FROM.toString(),
             "statistics");
+    aggregationOperatorFromRawDataTimer =
+        metricService.getOrCreateTimer(
+            Metric.AGGREGATION.toString(),
+            MetricLevel.IMPORTANT,
+            Tag.FROM.toString(),
+            "raw_data_operator");
   }
 
   private void unbindQueryAggregation(AbstractMetricService metricService) {
     aggregationFromRawDataTimer = DoNothingMetricManager.DO_NOTHING_TIMER;
     aggregationFromStatisticsTimer = DoNothingMetricManager.DO_NOTHING_TIMER;
-    Arrays.asList("raw_data", "statistics")
+    aggregationOperatorFromRawDataTimer = DoNothingMetricManager.DO_NOTHING_TIMER;
+    Arrays.asList("raw_data", "statistics", "raw_data_operator")
         .forEach(
             from ->
                 metricService.remove(
@@ -213,6 +223,9 @@
       case AGGREGATION_FROM_STATISTICS:
         aggregationFromStatisticsTimer.update(costTimeInNanos, TimeUnit.NANOSECONDS);
         break;
+      case AGGREGATION_OPERATOR_FROM_RAW_DATA:
+        aggregationOperatorFromRawDataTimer.update(costTimeInNanos, TimeUnit.NANOSECONDS);
+        break;
       default:
         break;
     }
diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/config/ConsensusConfig.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/config/ConsensusConfig.java
index f834e41..d542999 100644
--- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/config/ConsensusConfig.java
+++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/config/ConsensusConfig.java
@@ -21,6 +21,7 @@
 
 import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.commons.disk.strategy.DirectoryStrategyType;
 
 import java.util.List;
 import java.util.Optional;
@@ -35,6 +36,7 @@
   private final RatisConfig ratisConfig;
   private final IoTConsensusConfig iotConsensusConfig;
   private final IoTConsensusV2Config iotConsensusV2Config;
+  private final DirectoryStrategyType directoryStrategyType;
 
   private ConsensusConfig(
       TEndPoint thisNode,
@@ -44,7 +46,8 @@
       TConsensusGroupType consensusGroupType,
       RatisConfig ratisConfig,
       IoTConsensusConfig iotConsensusConfig,
-      IoTConsensusV2Config iotConsensusV2Config) {
+      IoTConsensusV2Config iotConsensusV2Config,
+      DirectoryStrategyType directoryStrategyType) {
     this.thisNodeEndPoint = thisNode;
     this.thisNodeId = thisNodeId;
     this.storageDir = storageDir;
@@ -53,6 +56,7 @@
     this.ratisConfig = ratisConfig;
     this.iotConsensusConfig = iotConsensusConfig;
     this.iotConsensusV2Config = iotConsensusV2Config;
+    this.directoryStrategyType = directoryStrategyType;
   }
 
   public TEndPoint getThisNodeEndPoint() {
@@ -87,6 +91,10 @@
     return iotConsensusV2Config;
   }
 
+  public DirectoryStrategyType getDirectoryStrategyType() {
+    return directoryStrategyType;
+  }
+
   public static ConsensusConfig.Builder newBuilder() {
     return new ConsensusConfig.Builder();
   }
@@ -101,6 +109,8 @@
     private RatisConfig ratisConfig;
     private IoTConsensusConfig iotConsensusConfig;
     private IoTConsensusV2Config iotConsensusV2Config;
+    private DirectoryStrategyType directoryStrategyType =
+        DirectoryStrategyType.MIN_FOLDER_OCCUPIED_SPACE_FIRST_STRATEGY;
 
     public ConsensusConfig build() {
       return new ConsensusConfig(
@@ -113,7 +123,8 @@
           Optional.ofNullable(iotConsensusConfig)
               .orElseGet(() -> IoTConsensusConfig.newBuilder().build()),
           Optional.ofNullable(iotConsensusV2Config)
-              .orElseGet(() -> IoTConsensusV2Config.newBuilder().build()));
+              .orElseGet(() -> IoTConsensusV2Config.newBuilder().build()),
+          directoryStrategyType);
     }
 
     public Builder setThisNode(TEndPoint thisNode) {
@@ -155,5 +166,10 @@
       this.iotConsensusV2Config = iotConsensusV2Config;
       return this;
     }
+
+    public Builder setDirectoryStrategyType(DirectoryStrategyType directoryStrategyType) {
+      this.directoryStrategyType = directoryStrategyType;
+      return this;
+    }
   }
 }
diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/config/IoTConsensusConfig.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/config/IoTConsensusConfig.java
index 7720cf5..c6ca9a7 100644
--- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/config/IoTConsensusConfig.java
+++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/config/IoTConsensusConfig.java
@@ -325,6 +325,7 @@
     private final long regionMigrationSpeedLimitBytesPerSecond;
     private final long subscriptionWalRetentionSizeInBytes;
     private final long subscriptionWalRetentionTimeMs;
+    private final long snapshotTransmissionProgressLogIntervalMs;
 
     private Replication(
         int maxLogEntriesNumPerBatch,
@@ -342,7 +343,8 @@
         double maxMemoryRatioForQueue,
         long regionMigrationSpeedLimitBytesPerSecond,
         long subscriptionWalRetentionSizeInBytes,
-        long subscriptionWalRetentionTimeMs) {
+        long subscriptionWalRetentionTimeMs,
+        long snapshotTransmissionProgressLogIntervalMs) {
       this.maxLogEntriesNumPerBatch = maxLogEntriesNumPerBatch;
       this.maxSizePerBatch = maxSizePerBatch;
       this.maxPendingBatchesNum = maxPendingBatchesNum;
@@ -359,6 +361,7 @@
       this.regionMigrationSpeedLimitBytesPerSecond = regionMigrationSpeedLimitBytesPerSecond;
       this.subscriptionWalRetentionSizeInBytes = subscriptionWalRetentionSizeInBytes;
       this.subscriptionWalRetentionTimeMs = subscriptionWalRetentionTimeMs;
+      this.snapshotTransmissionProgressLogIntervalMs = snapshotTransmissionProgressLogIntervalMs;
     }
 
     public int getMaxLogEntriesNumPerBatch() {
@@ -425,6 +428,10 @@
       return subscriptionWalRetentionTimeMs;
     }
 
+    public long getSnapshotTransmissionProgressLogIntervalMs() {
+      return snapshotTransmissionProgressLogIntervalMs;
+    }
+
     public static Replication.Builder newBuilder() {
       return new Replication.Builder();
     }
@@ -450,6 +457,11 @@
       private long regionMigrationSpeedLimitBytesPerSecond = 32 * 1024 * 1024L;
       private long subscriptionWalRetentionSizeInBytes = 0;
       private long subscriptionWalRetentionTimeMs = -1L;
+      // Throttle the per-file snapshot-transmission progress log to at most once per this interval;
+      // a snapshot may contain hundreds of thousands of files, so one INFO line per file is itself
+      // a
+      // heavy IO/string-building cost. A value <= 0 logs every file.
+      private long snapshotTransmissionProgressLogIntervalMs = 5000L;
 
       public Replication.Builder setMaxLogEntriesNumPerBatch(int maxLogEntriesNumPerBatch) {
         this.maxLogEntriesNumPerBatch = maxLogEntriesNumPerBatch;
@@ -535,6 +547,12 @@
         return this;
       }
 
+      public Builder setSnapshotTransmissionProgressLogIntervalMs(
+          long snapshotTransmissionProgressLogIntervalMs) {
+        this.snapshotTransmissionProgressLogIntervalMs = snapshotTransmissionProgressLogIntervalMs;
+        return this;
+      }
+
       public Replication build() {
         return new Replication(
             maxLogEntriesNumPerBatch,
@@ -552,7 +570,8 @@
             maxMemoryRatioForQueue,
             regionMigrationSpeedLimitBytesPerSecond,
             subscriptionWalRetentionSizeInBytes,
-            subscriptionWalRetentionTimeMs);
+            subscriptionWalRetentionTimeMs,
+            snapshotTransmissionProgressLogIntervalMs);
       }
     }
   }
diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensus.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensus.java
index dde577e..946f6de 100644
--- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensus.java
+++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensus.java
@@ -26,6 +26,7 @@
 import org.apache.iotdb.commons.concurrent.ThreadName;
 import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil;
 import org.apache.iotdb.commons.consensus.ConsensusGroupId;
+import org.apache.iotdb.commons.disk.strategy.DirectoryStrategyType;
 import org.apache.iotdb.commons.exception.DiskSpaceInsufficientException;
 import org.apache.iotdb.commons.exception.StartupException;
 import org.apache.iotdb.commons.request.IConsensusRequest;
@@ -97,6 +98,7 @@
   private final int thisNodeId;
   private final File storageDir;
   private final List<String> recvSnapshotDirs;
+  private final DirectoryStrategyType recvFolderStrategyType;
   private final IStateMachine.Registry registry;
   private final Map<ConsensusGroupId, IoTConsensusServerImpl> stateMachineMap =
       new ConcurrentHashMap<>();
@@ -127,6 +129,7 @@
     this.thisNodeId = config.getThisNodeId();
     this.storageDir = new File(config.getStorageDir());
     this.recvSnapshotDirs = config.getRecvSnapshotDirs();
+    this.recvFolderStrategyType = config.getDirectoryStrategyType();
     this.config = config.getIotConsensusConfig();
     this.registry = registry;
     this.service = new IoTConsensusRPCService(thisNode, config.getIotConsensusConfig());
@@ -195,6 +198,7 @@
               new IoTConsensusServerImpl(
                   path.toString(),
                   recvSnapshotDirs,
+                  recvFolderStrategyType,
                   new Peer(consensusGroupId, thisNodeId, thisNode),
                   new TreeSet<>(),
                   registry.apply(consensusGroupId),
@@ -309,6 +313,7 @@
                         new IoTConsensusServerImpl(
                             path,
                             recvSnapshotDirs,
+                            recvFolderStrategyType,
                             new Peer(groupId, thisNodeId, thisNode),
                             new TreeSet<>(peers),
                             registry.apply(groupId),
diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java
index 5b818db..07d1f25 100644
--- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java
+++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java
@@ -130,6 +130,18 @@
   private final Condition stateMachineCondition = stateMachineLock.newCondition();
   private final String storageDir;
   private FolderManager recvFolderManager = null;
+
+  /**
+   * Per-snapshotId map of TsFile group key ({@code fileKey}) to the chosen receive folder. It keeps
+   * all companion files of one TsFile ({@code .tsfile}/{@code .tsfile.resource}/{@code
+   * .tsfile.mods2}/...) in the same receive folder, so the load phase can hard-link them inside a
+   * single data dir instead of falling back to a cross-disk copy. The {@code fileKey} rule matches
+   * {@code SnapshotLoader#createLinksFromSnapshotToSourceDir} so grouping is consistent end to end.
+   * Entries are removed once the snapshot is loaded or cleaned up.
+   */
+  private final ConcurrentHashMap<String, ConcurrentHashMap<String, String>>
+      snapshotReceiveFolderMap = new ConcurrentHashMap<>();
+
   private final TreeSet<Peer> configuration;
   private final AtomicLong searchIndex;
   private final LogDispatcher logDispatcher;
@@ -172,6 +184,7 @@
   public IoTConsensusServerImpl(
       String storageDir,
       List<String> recvSnapshotDirs,
+      DirectoryStrategyType recvFolderStrategyType,
       Peer thisNode,
       TreeSet<Peer> configuration,
       IStateMachine stateMachine,
@@ -191,9 +204,7 @@
       snapshotDirs.add(storageDir);
     }
 
-    this.recvFolderManager =
-        new FolderManager(
-            snapshotDirs, DirectoryStrategyType.MIN_FOLDER_OCCUPIED_SPACE_FIRST_STRATEGY);
+    this.recvFolderManager = new FolderManager(snapshotDirs, recvFolderStrategyType);
     this.thisNode = thisNode;
     this.stateMachine = stateMachine;
     this.cacheQueueMap = new ConcurrentHashMap<>();
@@ -371,33 +382,42 @@
   public void transmitSnapshot(Peer targetPeer) throws ConsensusGroupModifyPeerException {
     File snapshotDir = new File(storageDir, newSnapshotDirName);
     List<File> snapshotPaths = stateMachine.getSnapshotFiles(snapshotDir);
-    AtomicLong snapshotSizeSumAtomic = new AtomicLong();
-    StringBuilder allFilesStr = new StringBuilder();
-    snapshotPaths.forEach(
-        file -> {
-          long fileSize = file.length();
-          snapshotSizeSumAtomic.addAndGet(fileSize);
-          allFilesStr
-              .append("\n")
-              .append(file.getName())
-              .append(" ")
-              .append(humanReadableByteCountSI(fileSize));
-        });
-    final long snapshotSizeSum = snapshotSizeSumAtomic.get();
+    long snapshotSizeSum = 0;
+    for (File file : snapshotPaths) {
+      snapshotSizeSum += file.length();
+    }
     long transitedSnapshotSizeSum = 0;
     long transitedFilesNum = 0;
     long startTime = System.nanoTime();
+    long lastProgressLogTime = startTime;
+    // Throttle the per-file progress log to at most once per this interval; a snapshot may contain
+    // hundreds of thousands of files, so one INFO line per file is itself a heavy cost.
+    long progressLogIntervalNs =
+        TimeUnit.MILLISECONDS.toNanos(
+            config.getReplication().getSnapshotTransmissionProgressLogIntervalMs());
     logger.info(
         IoTConsensusMessages.SNAPSHOT_TRANSMISSION_START,
         snapshotPaths.size(),
         humanReadableByteCountSI(snapshotSizeSum),
         snapshotDir);
-    logger.info(IoTConsensusMessages.SNAPSHOT_TRANSMISSION_ALL_FILES, allFilesStr);
+    if (logger.isDebugEnabled()) {
+      StringBuilder allFilesStr = new StringBuilder();
+      for (File file : snapshotPaths) {
+        allFilesStr
+            .append("\n")
+            .append(file.getName())
+            .append(" ")
+            .append(humanReadableByteCountSI(file.length()));
+      }
+      logger.debug(IoTConsensusMessages.SNAPSHOT_TRANSMISSION_ALL_FILES, allFilesStr);
+    }
+    ByteBuffer fragmentBuffer =
+        ByteBuffer.allocate(SnapshotFragmentReader.DEFAULT_FILE_FRAGMENT_SIZE);
     try (SyncIoTConsensusServiceClient client =
         syncClientManager.borrowClient(targetPeer.getEndpoint())) {
       for (File file : snapshotPaths) {
         SnapshotFragmentReader reader =
-            new SnapshotFragmentReader(newSnapshotDirName, file.toPath());
+            new SnapshotFragmentReader(newSnapshotDirName, file.toPath(), fragmentBuffer);
         try {
           while (reader.hasNext()) {
             // TODO: zero copy ?
@@ -412,16 +432,20 @@
           }
           transitedSnapshotSizeSum += reader.getTotalReadSize();
           transitedFilesNum++;
-          logger.info(
-              IoTConsensusMessages.SNAPSHOT_TRANSMISSION_PROGRESS,
-              newSnapshotDirName,
-              transitedFilesNum,
-              snapshotPaths.size(),
-              humanReadableByteCountSI(transitedSnapshotSizeSum),
-              humanReadableByteCountSI(snapshotSizeSum),
-              CommonDateTimeUtils.convertMillisecondToDurationStr(
-                  (System.nanoTime() - startTime) / 1_000_000),
-              file);
+          long now = System.nanoTime();
+          if (now - lastProgressLogTime >= progressLogIntervalNs
+              || transitedFilesNum == snapshotPaths.size()) {
+            lastProgressLogTime = now;
+            logger.info(
+                IoTConsensusMessages.SNAPSHOT_TRANSMISSION_PROGRESS,
+                newSnapshotDirName,
+                transitedFilesNum,
+                snapshotPaths.size(),
+                humanReadableByteCountSI(transitedSnapshotSizeSum),
+                humanReadableByteCountSI(snapshotSizeSum),
+                CommonDateTimeUtils.convertMillisecondToDurationStr((now - startTime) / 1_000_000),
+                file);
+          }
         } finally {
           reader.close();
         }
@@ -448,11 +472,32 @@
         return;
       }
 
-      recvFolderManager.getNextWithRetry(
-          folder -> {
-            writeSnapshotFragment(getSnapshotPath(folder, targetFilePath), fileChunk, fileOffset);
-            return null;
-          });
+      // Place every companion file of the same TsFile into one receive folder. The fileKey rule
+      // (filename before the first '.') matches SnapshotLoader so the group stays together. The
+      // folder is selected at most once per fileKey via computeIfAbsent, which is safe under the
+      // concurrent IoTConsensusRPC-Processor receivers.
+      String fileKey = getSnapshotFileKey(targetFilePath);
+      ConcurrentHashMap<String, String> folderMap =
+          snapshotReceiveFolderMap.computeIfAbsent(snapshotId, k -> new ConcurrentHashMap<>());
+      String folder;
+      try {
+        folder =
+            folderMap.computeIfAbsent(
+                fileKey,
+                k -> {
+                  try {
+                    return recvFolderManager.getNextFolder();
+                  } catch (DiskSpaceInsufficientException ex) {
+                    throw new RuntimeException(ex);
+                  }
+                });
+      } catch (RuntimeException re) {
+        if (re.getCause() instanceof DiskSpaceInsufficientException) {
+          throw (DiskSpaceInsufficientException) re.getCause();
+        }
+        throw re;
+      }
+      writeSnapshotFragment(getSnapshotPath(folder, targetFilePath), fileChunk, fileOffset);
     } catch (IOException e) {
       throw new ConsensusGroupModifyPeerException(
           String.format(IoTConsensusMessages.ERROR_RECEIVING_SNAPSHOT, snapshotId), e);
@@ -493,6 +538,14 @@
     return originalFilePath.substring(originalFilePath.indexOf(snapshotId));
   }
 
+  /**
+   * Groups companion files of one TsFile. Uses the same rule as {@code
+   * SnapshotLoader#createLinksFromSnapshotToSourceDir}: the file name up to the first {@code '.'}.
+   */
+  private String getSnapshotFileKey(String targetFilePath) {
+    return new File(targetFilePath).getName().split("\\.")[0];
+  }
+
   private void clearOldSnapshot() {
     File directory = new File(storageDir);
     File[] versionFiles = directory.listFiles((dir, name) -> name.startsWith(SNAPSHOT_DIR_NAME));
@@ -526,17 +579,22 @@
     // Note: an empty region produces a snapshot with zero fragments, so none of the receive folders
     // contains it. That is a legitimate (no-op) load, not a failure, so an absent snapshot must not
     // be reported as failure here.
-    List<File> snapshotDirs = new ArrayList<>();
-    for (String dir : recvFolderManager.getFolders()) {
-      File snapshotDir = getSnapshotPath(dir, snapshotId);
-      if (snapshotDir.exists()) {
-        snapshotDirs.add(snapshotDir);
+    try {
+      List<File> snapshotDirs = new ArrayList<>();
+      for (String dir : recvFolderManager.getFolders()) {
+        File snapshotDir = getSnapshotPath(dir, snapshotId);
+        if (snapshotDir.exists()) {
+          snapshotDirs.add(snapshotDir);
+        }
       }
+      if (snapshotDirs.isEmpty()) {
+        return true;
+      }
+      return stateMachine.loadSnapshot(snapshotDirs);
+    } finally {
+      // Receiving is finished for this snapshot; drop its receive-folder mapping.
+      snapshotReceiveFolderMap.remove(snapshotId);
     }
-    if (snapshotDirs.isEmpty()) {
-      return true;
-    }
-    return stateMachine.loadSnapshot(snapshotDirs);
   }
 
   private File getSnapshotPath(String curStorageDir, String snapshotRelativePath) {
@@ -1178,6 +1236,7 @@
   }
 
   public void cleanupSnapshot(String snapshotId) throws ConsensusGroupModifyPeerException {
+    snapshotReceiveFolderMap.remove(snapshotId);
     List<String> allDirs = new ArrayList<>(Collections.singletonList(storageDir));
     allDirs.addAll(recvFolderManager.getFolders());
     for (String dir : allDirs) {
diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/snapshot/SnapshotFragmentReader.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/snapshot/SnapshotFragmentReader.java
index 3331829..f9c905e 100644
--- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/snapshot/SnapshotFragmentReader.java
+++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/snapshot/SnapshotFragmentReader.java
@@ -27,7 +27,7 @@
 
 public class SnapshotFragmentReader {
 
-  private static final int DEFAULT_FILE_FRAGMENT_SIZE = 10 * 1024 * 1024;
+  public static final int DEFAULT_FILE_FRAGMENT_SIZE = 10 * 1024 * 1024;
   private final String snapshotId;
   private final String filePath;
   private final SeekableByteChannel fileChannel;
@@ -36,12 +36,20 @@
   private long totalReadSize;
   private SnapshotFragment cachedSnapshotFragment;
 
-  public SnapshotFragmentReader(String snapshotId, Path path) throws IOException {
+  /**
+   * The {@code buf} is supplied (and owned) by the caller so a single 10MB buffer can be reused
+   * across every file of a snapshot transmission. Allocating a fresh 10MB buffer per file is
+   * extremely wasteful when a snapshot contains hundreds of thousands of tiny files, multiplying GC
+   * pressure and allocation cost. The buffer is fully reset via {@link ByteBuffer#clear()} on each
+   * {@link #hasNext()} call, and each fragment is serialized synchronously before the next read, so
+   * sharing it across files (and across readers) is safe.
+   */
+  public SnapshotFragmentReader(String snapshotId, Path path, ByteBuffer buf) throws IOException {
     this.snapshotId = snapshotId;
     this.filePath = path.toAbsolutePath().toString();
     this.fileSize = Files.size(path);
     this.fileChannel = Files.newByteChannel(path);
-    this.buf = ByteBuffer.allocate(DEFAULT_FILE_FRAGMENT_SIZE);
+    this.buf = buf;
   }
 
   public boolean hasNext() throws IOException {
diff --git a/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/iot/snapshot/SnapshotFragmentReaderTest.java b/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/iot/snapshot/SnapshotFragmentReaderTest.java
new file mode 100644
index 0000000..36048d4
--- /dev/null
+++ b/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/iot/snapshot/SnapshotFragmentReaderTest.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.consensus.iot.snapshot;
+
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Random;
+
+/**
+ * Regression test for the snapshot-transmission read path. The sending side reuses a single,
+ * caller-supplied {@link ByteBuffer} across every file of a snapshot (instead of allocating a fresh
+ * 10MB buffer per file), so this verifies that (1) a single buffer reused across many readers still
+ * reconstructs every file byte-for-byte, and (2) the buffer is genuinely shared rather than
+ * re-allocated per file.
+ */
+public class SnapshotFragmentReaderTest {
+
+  @Rule public final TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+  @Test
+  public void reusedBufferReadsEveryFileCorrectly() throws IOException {
+    final Random random = new Random(42);
+    // Deliberately use a tiny buffer so files span multiple fragments, and use file sizes that are
+    // not multiples of the buffer size to exercise the partial-final-fragment path.
+    final int bufferSize = 16;
+    final int[] fileSizes = {0, 1, bufferSize, bufferSize + 1, 5 * bufferSize + 7};
+
+    byte[][] contents = new byte[fileSizes.length][];
+    Path[] paths = new Path[fileSizes.length];
+    for (int i = 0; i < fileSizes.length; i++) {
+      contents[i] = new byte[fileSizes[i]];
+      random.nextBytes(contents[i]);
+      paths[i] = temporaryFolder.newFile("file-" + i).toPath();
+      Files.write(paths[i], contents[i]);
+    }
+
+    final ByteBuffer sharedBuffer = ByteBuffer.allocate(bufferSize);
+    ByteBuffer firstReusedChunk = null;
+
+    for (int i = 0; i < paths.length; i++) {
+      SnapshotFragmentReader reader = new SnapshotFragmentReader("snap", paths[i], sharedBuffer);
+      try {
+        ByteArrayOutputStream reconstructed = new ByteArrayOutputStream();
+        while (reader.hasNext()) {
+          SnapshotFragment fragment = reader.next();
+          // Every fragment must be backed by the one shared buffer, proving it is reused and not
+          // re-allocated per file.
+          Assert.assertSame(sharedBuffer, fragment.getFileChunk());
+          if (firstReusedChunk == null) {
+            firstReusedChunk = fragment.getFileChunk();
+          } else {
+            Assert.assertSame(firstReusedChunk, fragment.getFileChunk());
+          }
+
+          // Drain the fragment immediately, mirroring how the sender serializes each fragment
+          // synchronously before the next read overwrites the shared buffer.
+          ByteBuffer chunk = fragment.getFileChunk();
+          byte[] bytes = new byte[chunk.remaining()];
+          chunk.get(bytes);
+          reconstructed.write(bytes);
+        }
+        Assert.assertArrayEquals(
+            "File " + i + " was not reconstructed correctly",
+            contents[i],
+            reconstructed.toByteArray());
+        Assert.assertEquals(contents[i].length, reader.getTotalReadSize());
+      } finally {
+        reader.close();
+      }
+    }
+  }
+}
diff --git a/iotdb-core/datanode/src/main/i18n/en/org/apache/iotdb/db/i18n/StorageEngineMessages.java b/iotdb-core/datanode/src/main/i18n/en/org/apache/iotdb/db/i18n/StorageEngineMessages.java
index cb90fac..cc708e7 100644
--- a/iotdb-core/datanode/src/main/i18n/en/org/apache/iotdb/db/i18n/StorageEngineMessages.java
+++ b/iotdb-core/datanode/src/main/i18n/en/org/apache/iotdb/db/i18n/StorageEngineMessages.java
@@ -489,6 +489,8 @@
   public static final String RELEASE_DATA_CACHE_MEMORY_BLOCK = "Release Data Cache Memory Block {}";
   public static final String START_DATA_TYPE_CONVERSION_DOT = "Start data type conversion for LoadTsFileStatement: {}.";
   public static final String START_DATA_TYPE_CONVERSION = "Start data type conversion for LoadTsFileStatement: {}";
+  public static final String INTERRUPTED_WAITING_TABLET_CONVERSION_SLOT =
+      "Interrupted while waiting for tablet conversion slot: ";
   public static final String FAIL_TO_LOAD_TSFILE_TO_ACTIVE_DIR = "Fail to load tsfile to Active dir";
   public static final String FAIL_TO_LOAD_DISK_SPACE = "Fail to load disk space of file {}";
   public static final String LOAD_ACTIVE_LISTENING_DIR_NOT_SET = "Load active listening dir is not set.";
@@ -521,6 +523,8 @@
   public static final String ERROR_EXECUTING_ACTIVE_LOAD_JOB = "Error occurred when executing active load periodical job.";
   public static final String ACTIVE_LOAD_EXECUTOR_STARTED = "Active load periodical jobs executor is started successfully.";
   public static final String ACTIVE_LOAD_EXECUTOR_STOPPED = "Active load periodical jobs executor is stopped successfully.";
+  public static final String ACTIVE_LOAD_TEMPORARILY_UNAVAILABLE =
+      "Rejecting auto load tsfile {} (isGeneratedByPipe = {}) due to temporary unavailability, will retry later. Status: {}";
   public static final String ERROR_MOVING_FILE_TO_FAIL_DIR = "Error occurred during moving file {} to fail directory.";
   public static final String FAILED_COUNT_FILES_IN_FAIL_DIR = "Failed to count failed files in fail directory.";
 
diff --git a/iotdb-core/datanode/src/main/i18n/zh/org/apache/iotdb/db/i18n/StorageEngineMessages.java b/iotdb-core/datanode/src/main/i18n/zh/org/apache/iotdb/db/i18n/StorageEngineMessages.java
index e488c40..792909a 100644
--- a/iotdb-core/datanode/src/main/i18n/zh/org/apache/iotdb/db/i18n/StorageEngineMessages.java
+++ b/iotdb-core/datanode/src/main/i18n/zh/org/apache/iotdb/db/i18n/StorageEngineMessages.java
@@ -489,6 +489,8 @@
   public static final String RELEASE_DATA_CACHE_MEMORY_BLOCK = "释放数据缓存内存块 {}";
   public static final String START_DATA_TYPE_CONVERSION_DOT = "开始对 LoadTsFileStatement: {} 进行数据类型转换。";
   public static final String START_DATA_TYPE_CONVERSION = "开始对 LoadTsFileStatement: {} 进行数据类型转换";
+  public static final String INTERRUPTED_WAITING_TABLET_CONVERSION_SLOT =
+      "等待 tablet 转换槽位时被中断: ";
   public static final String FAIL_TO_LOAD_TSFILE_TO_ACTIVE_DIR = "加载 TsFile 到 Active 目录失败";
   public static final String FAIL_TO_LOAD_DISK_SPACE = "获取文件 {} 的磁盘空间失败";
   public static final String LOAD_ACTIVE_LISTENING_DIR_NOT_SET = "未设置加载 Active 监听目录。";
@@ -521,6 +523,8 @@
   public static final String ERROR_EXECUTING_ACTIVE_LOAD_JOB = "执行 Active 加载定期任务时发生错误。";
   public static final String ACTIVE_LOAD_EXECUTOR_STARTED = "Active 加载定期任务执行器已成功启动。";
   public static final String ACTIVE_LOAD_EXECUTOR_STOPPED = "Active 加载定期任务执行器已成功停止。";
+  public static final String ACTIVE_LOAD_TEMPORARILY_UNAVAILABLE =
+      "拒绝自动加载 TsFile {} (isGeneratedByPipe = {}),原因是系统暂时不可用,将稍后重试。状态: {}";
   public static final String ERROR_MOVING_FILE_TO_FAIL_DIR = "将文件 {} 移动到失败目录时发生错误。";
   public static final String FAILED_COUNT_FILES_IN_FAIL_DIR = "统计失败目录中的失败文件数量失败。";
 
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index c20122b..429efe5 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -89,7 +89,9 @@
   private static final Logger logger = LoggerFactory.getLogger(IoTDBConfig.class);
   private static final String MULTI_DIR_STRATEGY_PREFIX = "org.apache.iotdb.commons.disk.strategy.";
   private static final String[] CLUSTER_ALLOWED_MULTI_DIR_STRATEGIES =
-      new String[] {"SequenceStrategy", "MaxDiskUsableSpaceFirstStrategy"};
+      new String[] {
+        "SequenceStrategy", "MaxDiskUsableSpaceFirstStrategy", "MinFolderOccupiedSpaceFirstStrategy"
+      };
   private static final String DEFAULT_MULTI_DIR_STRATEGY = "SequenceStrategy";
 
   private static final String STORAGE_GROUP_MATCHER = "([a-zA-Z0-9`_.\\-\\u2E80-\\u9FFF]+)";
@@ -1099,6 +1101,9 @@
   private int maxPendingBatchesNum = 5;
   private double maxMemoryRatioForQueue = 0.6;
   private long regionMigrationSpeedLimitBytesPerSecond = 48 * 1024 * 1024L;
+  // Throttle the per-file snapshot-transmission progress log in IoTConsensus to at most once per
+  // this interval (ms). A value <= 0 logs every file.
+  private long dataRegionIotSnapshotTransmissionProgressLogIntervalMs = 5000L;
 
   // IoTConsensusV2 Config
   private int iotConsensusV2PipelineSize = 5;
@@ -1273,6 +1278,16 @@
     this.regionMigrationSpeedLimitBytesPerSecond = regionMigrationSpeedLimitBytesPerSecond;
   }
 
+  public long getDataRegionIotSnapshotTransmissionProgressLogIntervalMs() {
+    return dataRegionIotSnapshotTransmissionProgressLogIntervalMs;
+  }
+
+  public void setDataRegionIotSnapshotTransmissionProgressLogIntervalMs(
+      long dataRegionIotSnapshotTransmissionProgressLogIntervalMs) {
+    this.dataRegionIotSnapshotTransmissionProgressLogIntervalMs =
+        dataRegionIotSnapshotTransmissionProgressLogIntervalMs;
+  }
+
   public int getIotConsensusV2PipelineSize() {
     return iotConsensusV2PipelineSize;
   }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index 59878f6..147c334 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -369,7 +369,8 @@
     try {
       conf.checkMultiDirStrategyClassName();
     } catch (Exception e) {
-      conf.setMultiDirStrategyClassName(oldMultiDirStrategyClassName.trim());
+      conf.setMultiDirStrategyClassName(
+          oldMultiDirStrategyClassName == null ? null : oldMultiDirStrategyClassName.trim());
       throw e;
     }
 
@@ -1285,6 +1286,12 @@
                 "region_migration_speed_limit_bytes_per_second",
                 ConfigurationFileUtils.getConfigurationDefaultValue(
                     "region_migration_speed_limit_bytes_per_second"))));
+    conf.setDataRegionIotSnapshotTransmissionProgressLogIntervalMs(
+        Long.parseLong(
+            properties.getProperty(
+                "data_region_iot_snapshot_transmission_progress_log_interval_ms",
+                ConfigurationFileUtils.getConfigurationDefaultValue(
+                    "data_region_iot_snapshot_transmission_progress_log_interval_ms"))));
     conf.setKeepSameDiskWhenLoadingSnapshot(
         Boolean.parseBoolean(
             properties.getProperty(
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/DataRegionConsensusImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/DataRegionConsensusImpl.java
index 080cd9c..a4c8e00 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/DataRegionConsensusImpl.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/DataRegionConsensusImpl.java
@@ -25,6 +25,7 @@
 import org.apache.iotdb.commons.conf.CommonDescriptor;
 import org.apache.iotdb.commons.consensus.ConsensusGroupId;
 import org.apache.iotdb.commons.consensus.DataRegionId;
+import org.apache.iotdb.commons.disk.strategy.DirectoryStrategyType;
 import org.apache.iotdb.commons.memory.IMemoryBlock;
 import org.apache.iotdb.commons.memory.MemoryBlockType;
 import org.apache.iotdb.commons.pipe.agent.plugin.builtin.BuiltinPipePlugin;
@@ -141,6 +142,9 @@
           .setThisNode(new TEndPoint(CONF.getInternalAddress(), CONF.getDataRegionConsensusPort()))
           .setStorageDir(CONF.getDataRegionConsensusDir())
           .setRecvSnapshotDirs(Arrays.asList(CONF.getLocalDataDirs()))
+          // IoTConsensus always balances received snapshot files by least occupied space,
+          // independent of the global dn_multi_dir_strategy.
+          .setDirectoryStrategyType(DirectoryStrategyType.MIN_FOLDER_OCCUPIED_SPACE_FIRST_STRATEGY)
           .setConsensusGroupType(TConsensusGroupType.DataRegion)
           .setIoTConsensusConfig(
               IoTConsensusConfig.newBuilder()
@@ -174,6 +178,8 @@
                               COMMON_CONF.getSubscriptionConsensusWalRetentionSizeInBytes())
                           .setSubscriptionWalRetentionTimeMs(
                               COMMON_CONF.getSubscriptionConsensusWalRetentionTimeMs())
+                          .setSnapshotTransmissionProgressLogIntervalMs(
+                              CONF.getDataRegionIotSnapshotTransmissionProgressLogIntervalMs())
                           .build())
                   .build())
           .setIoTConsensusV2Config(
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
index 033264b..3509e6b 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
@@ -109,6 +109,7 @@
 import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_HISTORY_END_TIME_KEY;
 import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_HISTORY_START_TIME_KEY;
 import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_PATH_EXCLUSION_KEY;
+import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_PATH_INCLUSION_KEY;
 import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_PATH_KEY;
 import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_PATTERN_EXCLUSION_KEY;
 import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_PATTERN_INCLUSION_KEY;
@@ -121,6 +122,7 @@
 import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.SOURCE_HISTORY_END_TIME_KEY;
 import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.SOURCE_HISTORY_START_TIME_KEY;
 import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.SOURCE_PATH_EXCLUSION_KEY;
+import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.SOURCE_PATH_INCLUSION_KEY;
 import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.SOURCE_PATH_KEY;
 import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.SOURCE_PATTERN_EXCLUSION_KEY;
 import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.SOURCE_PATTERN_INCLUSION_KEY;
@@ -913,6 +915,8 @@
             || sourceParameters.hasAnyAttributes(
                 EXTRACTOR_PATH_KEY,
                 SOURCE_PATH_KEY,
+                EXTRACTOR_PATH_INCLUSION_KEY,
+                SOURCE_PATH_INCLUSION_KEY,
                 EXTRACTOR_PATTERN_INCLUSION_KEY,
                 SOURCE_PATTERN_INCLUSION_KEY,
                 EXTRACTOR_PATH_EXCLUSION_KEY,
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/client/IoTDBDataNodeAsyncClientManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/client/IoTDBDataNodeAsyncClientManager.java
index 5c38c3a..4456550 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/client/IoTDBDataNodeAsyncClientManager.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/client/IoTDBDataNodeAsyncClientManager.java
@@ -58,6 +58,7 @@
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
 
 import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_LOAD_BALANCE_PRIORITY_STRATEGY;
 import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_LOAD_BALANCE_RANDOM_STRATEGY;
@@ -71,11 +72,11 @@
 
   private final Set<TEndPoint> endPointSet;
 
-  private static final Map<String, Integer> RECEIVER_ATTRIBUTES_REF_COUNT =
-      new ConcurrentHashMap<>();
+  private static final Map<String, Integer> CLIENT_RESOURCE_REF_COUNT = new ConcurrentHashMap<>();
   private final String receiverAttributes;
+  private final String clientResourceKey;
 
-  // receiverAttributes -> IClientManager<TEndPoint, AsyncPipeDataTransferServiceClient>
+  // clientResourceKey -> IClientManager<TEndPoint, AsyncPipeDataTransferServiceClient>
   private static final Map<String, IClientManager<TEndPoint, AsyncPipeDataTransferServiceClient>>
       ASYNC_PIPE_DATA_TRANSFER_CLIENT_MANAGER_HOLDER = new ConcurrentHashMap<>();
   private static final Map<String, ExecutorService> TS_FILE_ASYNC_EXECUTOR_HOLDER =
@@ -129,10 +130,11 @@
             shouldMarkAsPipeRequest,
             isTSFileUsed,
             skipIfNoPrivileges);
+    clientResourceKey = generateClientResourceKey(receiverAttributes, endPoints);
     synchronized (IoTDBDataNodeAsyncClientManager.class) {
-      if (!ASYNC_PIPE_DATA_TRANSFER_CLIENT_MANAGER_HOLDER.containsKey(receiverAttributes)) {
+      if (!ASYNC_PIPE_DATA_TRANSFER_CLIENT_MANAGER_HOLDER.containsKey(clientResourceKey)) {
         ASYNC_PIPE_DATA_TRANSFER_CLIENT_MANAGER_HOLDER.putIfAbsent(
-            receiverAttributes,
+            clientResourceKey,
             new IClientManager.Factory<TEndPoint, AsyncPipeDataTransferServiceClient>()
                 .createClientManager(
                     isTSFileUsed
@@ -140,21 +142,21 @@
                             .AsyncPipeTsFileDataTransferServiceClientPoolFactory()
                         : new ClientPoolFactory.AsyncPipeDataTransferServiceClientPoolFactory()));
       }
-      endPoint2Client = ASYNC_PIPE_DATA_TRANSFER_CLIENT_MANAGER_HOLDER.get(receiverAttributes);
+      endPoint2Client = ASYNC_PIPE_DATA_TRANSFER_CLIENT_MANAGER_HOLDER.get(clientResourceKey);
 
       if (isTSFileUsed) {
-        if (!TS_FILE_ASYNC_EXECUTOR_HOLDER.containsKey(receiverAttributes)) {
+        if (!TS_FILE_ASYNC_EXECUTOR_HOLDER.containsKey(clientResourceKey)) {
           TS_FILE_ASYNC_EXECUTOR_HOLDER.putIfAbsent(
-              receiverAttributes,
+              clientResourceKey,
               IoTDBThreadPoolFactory.newFixedThreadPool(
                   PipeConfig.getInstance().getPipeRealTimeQueueMaxWaitingTsFileSize(),
                   ThreadName.PIPE_TSFILE_ASYNC_SEND_POOL.getName() + "-" + id.getAndIncrement()));
         }
-        executor = TS_FILE_ASYNC_EXECUTOR_HOLDER.get(receiverAttributes);
+        executor = TS_FILE_ASYNC_EXECUTOR_HOLDER.get(clientResourceKey);
       }
 
-      RECEIVER_ATTRIBUTES_REF_COUNT.compute(
-          receiverAttributes, (attributes, refCount) -> refCount == null ? 1 : refCount + 1);
+      CLIENT_RESOURCE_REF_COUNT.compute(
+          clientResourceKey, (attributes, refCount) -> refCount == null ? 1 : refCount + 1);
     }
 
     switch (loadBalanceStrategy) {
@@ -421,30 +423,30 @@
   public void close() {
     isClosed = true;
     synchronized (IoTDBDataNodeAsyncClientManager.class) {
-      RECEIVER_ATTRIBUTES_REF_COUNT.computeIfPresent(
-          receiverAttributes,
+      CLIENT_RESOURCE_REF_COUNT.computeIfPresent(
+          clientResourceKey,
           (attributes, refCount) -> {
             if (refCount <= 1) {
               final IClientManager<TEndPoint, AsyncPipeDataTransferServiceClient> clientManager =
-                  ASYNC_PIPE_DATA_TRANSFER_CLIENT_MANAGER_HOLDER.remove(receiverAttributes);
+                  ASYNC_PIPE_DATA_TRANSFER_CLIENT_MANAGER_HOLDER.remove(clientResourceKey);
               if (clientManager != null) {
                 try {
                   clientManager.close();
                   LOGGER.info(
                       DataNodePipeMessages
                           .CLOSED_ASYNCPIPEDATATRANSFERSERVICECLIENTMANAGER_FOR_RECEIVER_ATTRIBUTES,
-                      receiverAttributes);
+                      clientResourceKey);
                 } catch (final Exception e) {
                   LOGGER.warn(
                       DataNodePipeMessages
                           .FAILED_TO_CLOSE_ASYNCPIPEDATATRANSFERSERVICECLIENTMANAGER_FOR_RECEIVER_ATTRIBUTE,
-                      receiverAttributes,
+                      clientResourceKey,
                       e);
                 }
               }
 
               final ExecutorService executor =
-                  TS_FILE_ASYNC_EXECUTOR_HOLDER.remove(receiverAttributes);
+                  TS_FILE_ASYNC_EXECUTOR_HOLDER.remove(clientResourceKey);
               if (executor != null) {
                 try {
                   executor.shutdown();
@@ -552,4 +554,16 @@
   private void markHealthy(TEndPoint endPoint) {
     unhealthyEndPointMap.remove(endPoint);
   }
+
+  private static String generateClientResourceKey(
+      final String receiverAttributes, final List<TEndPoint> endPoints) {
+    return String.format(
+        "%s-%s",
+        receiverAttributes,
+        endPoints.stream()
+            .map(endPoint -> String.format("%s:%s", endPoint.getIp(), endPoint.getPort()))
+            .distinct()
+            .sorted()
+            .collect(Collectors.joining(",", "[", "]")));
+  }
 }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/IoTDBDataRegionSource.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/IoTDBDataRegionSource.java
index 813d55e..44fae46 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/IoTDBDataRegionSource.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/IoTDBDataRegionSource.java
@@ -85,6 +85,7 @@
 import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_MODE_STRICT_KEY;
 import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_MODS_ENABLE_KEY;
 import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_MODS_KEY;
+import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_PATH_INCLUSION_KEY;
 import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_PATH_KEY;
 import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_PATTERN_FORMAT_IOTDB_VALUE;
 import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_PATTERN_FORMAT_KEY;
@@ -119,6 +120,7 @@
 import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.SOURCE_MODE_STRICT_KEY;
 import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.SOURCE_MODS_ENABLE_KEY;
 import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.SOURCE_MODS_KEY;
+import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.SOURCE_PATH_INCLUSION_KEY;
 import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.SOURCE_PATH_KEY;
 import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.SOURCE_PATTERN_FORMAT_KEY;
 import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.SOURCE_PATTERN_INCLUSION_KEY;
@@ -197,6 +199,8 @@
                 SOURCE_PATH_KEY,
                 EXTRACTOR_PATTERN_KEY,
                 SOURCE_PATTERN_KEY,
+                EXTRACTOR_PATH_INCLUSION_KEY,
+                SOURCE_PATH_INCLUSION_KEY,
                 EXTRACTOR_PATTERN_INCLUSION_KEY,
                 SOURCE_PATTERN_INCLUSION_KEY)) {
       throw new PipeException(DataNodePipeMessages.THE_PIPE_CANNOT_EXTRACT_TREE_MODEL_DATA);
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/TreeAggregator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/TreeAggregator.java
index 07b482b..2e8650f 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/TreeAggregator.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/TreeAggregator.java
@@ -20,7 +20,6 @@
 package org.apache.iotdb.db.queryengine.execution.aggregation;
 
 import org.apache.iotdb.calc.execution.aggregation.Accumulator;
-import org.apache.iotdb.calc.metric.QueryExecutionMetricSet;
 import org.apache.iotdb.commons.queryengine.plan.planner.plan.parameter.InputLocation;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.AggregationStep;
 
@@ -35,8 +34,6 @@
 import java.util.List;
 
 import static com.google.common.base.Preconditions.checkArgument;
-import static org.apache.iotdb.calc.metric.QueryExecutionMetricSet.AGGREGATION_FROM_RAW_DATA;
-import static org.apache.iotdb.calc.metric.QueryExecutionMetricSet.AGGREGATION_FROM_STATISTICS;
 
 public class TreeAggregator {
 
@@ -44,8 +41,6 @@
   // In some intermediate result input, inputLocation[] should include two columns
   protected List<InputLocation[]> inputLocationList;
   protected final AggregationStep step;
-  public static final QueryExecutionMetricSet QUERY_EXECUTION_METRICS =
-      QueryExecutionMetricSet.getInstance();
 
   // Used for SeriesAggregateScanOperator
   public TreeAggregator(Accumulator accumulator, AggregationStep step) {
@@ -65,57 +60,44 @@
 
   // Used for SeriesAggregateScanOperator and RawDataAggregateOperator
   public void processTsBlock(TsBlock tsBlock, BitMap bitMap) {
-    long startTime = System.nanoTime();
-    try {
-      checkArgument(
-          step.isInputRaw(),
-          "Step in SeriesAggregateScanOperator and RawDataAggregateOperator can only process raw input");
-      for (InputLocation[] inputLocations : inputLocationList) {
-        Column[] timeAndValueColumn = new Column[1 + inputLocations.length];
-        timeAndValueColumn[0] = tsBlock.getTimeColumn();
-        for (int i = 0; i < inputLocations.length; i++) {
-          checkArgument(
-              inputLocations[i].getTsBlockIndex() == 0,
-              "RawDataAggregateOperator can only process one tsBlock input.");
-          int index = inputLocations[i].getValueColumnIndex();
-          // for count_time, time column is also its value column
-          // for max_by, the input column can also be time column.
-          timeAndValueColumn[1 + i] =
-              index == -1 ? timeAndValueColumn[0] : tsBlock.getColumn(index);
-        }
-        accumulator.addInput(timeAndValueColumn, bitMap);
+    checkArgument(
+        step.isInputRaw(),
+        "Step in SeriesAggregateScanOperator and RawDataAggregateOperator can only process raw input");
+    for (InputLocation[] inputLocations : inputLocationList) {
+      Column[] timeAndValueColumn = new Column[1 + inputLocations.length];
+      timeAndValueColumn[0] = tsBlock.getTimeColumn();
+      for (int i = 0; i < inputLocations.length; i++) {
+        checkArgument(
+            inputLocations[i].getTsBlockIndex() == 0,
+            "RawDataAggregateOperator can only process one tsBlock input.");
+        int index = inputLocations[i].getValueColumnIndex();
+        // for count_time, time column is also its value column
+        // for max_by, the input column can also be time column.
+        timeAndValueColumn[1 + i] = index == -1 ? timeAndValueColumn[0] : tsBlock.getColumn(index);
       }
-    } finally {
-      QUERY_EXECUTION_METRICS.recordExecutionCost(
-          AGGREGATION_FROM_RAW_DATA, System.nanoTime() - startTime);
+      accumulator.addInput(timeAndValueColumn, bitMap);
     }
   }
 
   // Used for AggregateOperator
   public void processTsBlocks(TsBlock[] tsBlock) {
-    long startTime = System.nanoTime();
-    try {
-      checkArgument(!step.isInputRaw(), "Step in AggregateOperator cannot process raw input");
-      if (step.isInputFinal()) {
-        checkArgument(inputLocationList.size() == 1, "Final output can only be single column");
-        Column finalResult =
-            tsBlock[inputLocationList.get(0)[0].getTsBlockIndex()].getColumn(
-                inputLocationList.get(0)[0].getValueColumnIndex());
-        accumulator.setFinal(finalResult);
-      } else {
-        for (InputLocation[] inputLocations : inputLocationList) {
-          Column[] columns = new Column[inputLocations.length];
-          for (int i = 0; i < inputLocations.length; i++) {
-            columns[i] =
-                tsBlock[inputLocations[i].getTsBlockIndex()].getColumn(
-                    inputLocations[i].getValueColumnIndex());
-          }
-          accumulator.addIntermediate(columns);
+    checkArgument(!step.isInputRaw(), "Step in AggregateOperator cannot process raw input");
+    if (step.isInputFinal()) {
+      checkArgument(inputLocationList.size() == 1, "Final output can only be single column");
+      Column finalResult =
+          tsBlock[inputLocationList.get(0)[0].getTsBlockIndex()].getColumn(
+              inputLocationList.get(0)[0].getValueColumnIndex());
+      accumulator.setFinal(finalResult);
+    } else {
+      for (InputLocation[] inputLocations : inputLocationList) {
+        Column[] columns = new Column[inputLocations.length];
+        for (int i = 0; i < inputLocations.length; i++) {
+          columns[i] =
+              tsBlock[inputLocations[i].getTsBlockIndex()].getColumn(
+                  inputLocations[i].getValueColumnIndex());
         }
+        accumulator.addIntermediate(columns);
       }
-    } finally {
-      QUERY_EXECUTION_METRICS.recordExecutionCost(
-          AGGREGATION_FROM_RAW_DATA, System.nanoTime() - startTime);
     }
   }
 
@@ -129,16 +111,10 @@
 
   /** Used for SeriesAggregateScanOperator. */
   public void processStatistics(Statistics timeStatistics, Statistics[] valueStatistics) {
-    long startTime = System.nanoTime();
-    try {
-      for (InputLocation[] inputLocations : inputLocationList) {
-        int valueIndex = inputLocations[0].getValueColumnIndex();
-        // valueIndex == -1 means it is count_time, we need to use timeStatistics
-        accumulator.addStatistics(valueIndex == -1 ? timeStatistics : valueStatistics[valueIndex]);
-      }
-    } finally {
-      QUERY_EXECUTION_METRICS.recordExecutionCost(
-          AGGREGATION_FROM_STATISTICS, System.nanoTime() - startTime);
+    for (InputLocation[] inputLocations : inputLocationList) {
+      int valueIndex = inputLocations[0].getValueColumnIndex();
+      // valueIndex == -1 means it is count_time, we need to use timeStatistics
+      accumulator.addStatistics(valueIndex == -1 ? timeStatistics : valueStatistics[valueIndex]);
     }
   }
 
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java
index 47eab01..3ffa7e6 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java
@@ -21,6 +21,7 @@
 
 import org.apache.iotdb.calc.exception.MemoryNotEnoughException;
 import org.apache.iotdb.calc.exception.QueryProcessException;
+import org.apache.iotdb.calc.metric.QueryExecutionMetricSet;
 import org.apache.iotdb.calc.plan.planner.memory.MemoryReservationManager;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.audit.UserEntity;
@@ -86,6 +87,9 @@
 import java.util.function.Predicate;
 import java.util.stream.Collectors;
 
+import static org.apache.iotdb.calc.metric.QueryExecutionMetricSet.AGGREGATION_FROM_RAW_DATA;
+import static org.apache.iotdb.calc.metric.QueryExecutionMetricSet.AGGREGATION_FROM_STATISTICS;
+import static org.apache.iotdb.calc.metric.QueryExecutionMetricSet.AGGREGATION_OPERATOR_FROM_RAW_DATA;
 import static org.apache.iotdb.commons.utils.ErrorHandlingCommonUtils.getRootCause;
 import static org.apache.iotdb.db.queryengine.metric.DriverSchedulerMetricSet.BLOCK_QUEUED_TIME;
 import static org.apache.iotdb.db.queryengine.metric.DriverSchedulerMetricSet.READY_QUEUED_TIME;
@@ -168,6 +172,9 @@
   private int initQueryDataSourceRetryCount = 0;
   private final AtomicLong readyQueueTime = new AtomicLong(0);
   private final AtomicLong blockQueueTime = new AtomicLong(0);
+  private final AtomicLong scanAggregationFromRawDataCost = new AtomicLong(0);
+  private final AtomicLong scanAggregationFromStatisticsCost = new AtomicLong(0);
+  private final AtomicLong aggregationOperatorFromRawDataCost = new AtomicLong(0);
   private long unclosedSeqFileNum = 0;
   private long unclosedUnseqFileNum = 0;
   private long closedSeqFileNum = 0;
@@ -1117,6 +1124,8 @@
 
     QueryRelatedResourceMetricSet.getInstance().updateFragmentInstanceTime(durationTime);
 
+    recordAggregationCostToMetric();
+
     QueryResourceMetricSet.getInstance()
         .recordInitQueryResourceRetryCount(getInitQueryDataSourceRetryCount());
 
@@ -1264,6 +1273,49 @@
     blockQueueTime.addAndGet(time);
   }
 
+  public void recordScanAggregationFromRawDataCost(long costTimeInNanos) {
+    addCost(scanAggregationFromRawDataCost, costTimeInNanos);
+  }
+
+  public void recordScanAggregationFromStatisticsCost(long costTimeInNanos) {
+    addCost(scanAggregationFromStatisticsCost, costTimeInNanos);
+  }
+
+  public void recordAggregationOperatorFromRawDataCost(long costTimeInNanos) {
+    addCost(aggregationOperatorFromRawDataCost, costTimeInNanos);
+  }
+
+  private void addCost(AtomicLong cost, long costTimeInNanos) {
+    if (costTimeInNanos > 0) {
+      cost.addAndGet(costTimeInNanos);
+    }
+  }
+
+  long drainScanAggregationFromRawDataCost() {
+    return scanAggregationFromRawDataCost.getAndSet(0);
+  }
+
+  long drainScanAggregationFromStatisticsCost() {
+    return scanAggregationFromStatisticsCost.getAndSet(0);
+  }
+
+  long drainAggregationOperatorFromRawDataCost() {
+    return aggregationOperatorFromRawDataCost.getAndSet(0);
+  }
+
+  void recordAggregationCostToMetric() {
+    recordAggregationCost(AGGREGATION_FROM_RAW_DATA, drainScanAggregationFromRawDataCost());
+    recordAggregationCost(AGGREGATION_FROM_STATISTICS, drainScanAggregationFromStatisticsCost());
+    recordAggregationCost(
+        AGGREGATION_OPERATOR_FROM_RAW_DATA, drainAggregationOperatorFromRawDataCost());
+  }
+
+  private void recordAggregationCost(String stage, long costTimeInNanos) {
+    if (costTimeInNanos > 0) {
+      QueryExecutionMetricSet.getInstance().recordExecutionCost(stage, costTimeInNanos);
+    }
+  }
+
   public long getReadyQueueTime() {
     return readyQueueTime.get();
   }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/OperatorContext.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/OperatorContext.java
index d976155..584cb4f 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/OperatorContext.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/OperatorContext.java
@@ -72,6 +72,33 @@
   }
 
   @Override
+  public void recordScanAggregationFromRawDataCost(long costTimeInNanos) {
+    if (driverContext != null && driverContext.getFragmentInstanceContext() != null) {
+      driverContext
+          .getFragmentInstanceContext()
+          .recordScanAggregationFromRawDataCost(costTimeInNanos);
+    }
+  }
+
+  @Override
+  public void recordScanAggregationFromStatisticsCost(long costTimeInNanos) {
+    if (driverContext != null && driverContext.getFragmentInstanceContext() != null) {
+      driverContext
+          .getFragmentInstanceContext()
+          .recordScanAggregationFromStatisticsCost(costTimeInNanos);
+    }
+  }
+
+  @Override
+  public void recordAggregationOperatorFromRawDataCost(long costTimeInNanos) {
+    if (driverContext != null && driverContext.getFragmentInstanceContext() != null) {
+      driverContext
+          .getFragmentInstanceContext()
+          .recordAggregationOperatorFromRawDataCost(costTimeInNanos);
+    }
+  }
+
+  @Override
   public MemoryReservationManager getMemoryReservationContext() {
     return getInstanceContext().getMemoryReservationContext();
   }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/AggregationOperator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/AggregationOperator.java
index 51f6e1c..eec1c01 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/AggregationOperator.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/AggregationOperator.java
@@ -154,19 +154,24 @@
 
   private void calculateNextAggregationResult() {
     // Consume current input tsBlocks
-    for (TreeAggregator aggregator : aggregators) {
-      aggregator.processTsBlocks(inputTsBlocks);
-    }
-
-    for (int i = 0; i < inputOperatorsCount; i++) {
-      inputTsBlocks[i] = inputTsBlocks[i].skipFirst();
-      if (inputTsBlocks[i].isEmpty()) {
-        inputTsBlocks[i] = null;
+    long startTime = System.nanoTime();
+    try {
+      for (TreeAggregator aggregator : aggregators) {
+        aggregator.processTsBlocks(inputTsBlocks);
       }
-    }
 
-    // Update result using aggregators
-    updateResultTsBlock();
+      for (int i = 0; i < inputOperatorsCount; i++) {
+        inputTsBlocks[i] = inputTsBlocks[i].skipFirst();
+        if (inputTsBlocks[i].isEmpty()) {
+          inputTsBlocks[i] = null;
+        }
+      }
+
+      // Update result using aggregators
+      updateResultTsBlock();
+    } finally {
+      operatorContext.recordAggregationOperatorFromRawDataCost(System.nanoTime() - startTime);
+    }
   }
 
   private void updateResultTsBlock() {
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/RawDataAggregationOperator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/RawDataAggregationOperator.java
index 3018adf..385cc48 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/RawDataAggregationOperator.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/RawDataAggregationOperator.java
@@ -150,78 +150,83 @@
 
   @SuppressWarnings({"squid:S3776", "squid:S135"})
   private boolean calculateFromRawData() {
-    // if window is not initialized, we should init window status and reset aggregators
-    if (!windowManager.isCurWindowInit() && !skipPreviousWindowAndInitCurWindow()) {
-      return false;
-    }
+    long startTime = System.nanoTime();
+    try {
+      // if window is not initialized, we should init window status and reset aggregators
+      if (!windowManager.isCurWindowInit() && !skipPreviousWindowAndInitCurWindow()) {
+        return false;
+      }
 
-    // If current window has been initialized, we should judge whether inputTsBlock is empty
-    if (inputTsBlock == null || inputTsBlock.isEmpty()) {
-      return false;
-    }
+      // If current window has been initialized, we should judge whether inputTsBlock is empty
+      if (inputTsBlock == null || inputTsBlock.isEmpty()) {
+        return false;
+      }
 
-    if (windowManager.satisfiedCurWindow(inputTsBlock)) {
+      if (windowManager.satisfiedCurWindow(inputTsBlock)) {
 
-      // Get the indexes in tsBlock which needs to be processed by aggregator, and the last row
-      // needed to be processed.
-      int tsBlockSize = inputTsBlock.getPositionCount();
-      IWindow curWindow = windowManager.getCurWindow();
+        // Get the indexes in tsBlock which needs to be processed by aggregator, and the last row
+        // needed to be processed.
+        int tsBlockSize = inputTsBlock.getPositionCount();
+        IWindow curWindow = windowManager.getCurWindow();
 
-      Column[] controlAndTimeColumn = new Column[2];
-      controlAndTimeColumn[0] = curWindow.getControlColumn(inputTsBlock);
-      controlAndTimeColumn[1] = inputTsBlock.getTimeColumn();
+        Column[] controlAndTimeColumn = new Column[2];
+        controlAndTimeColumn[0] = curWindow.getControlColumn(inputTsBlock);
+        controlAndTimeColumn[1] = inputTsBlock.getTimeColumn();
 
-      BitMap needProcess = new BitMap(tsBlockSize);
-      int lastIndexToProcess = -1;
-      boolean hasSkip = false;
+        BitMap needProcess = new BitMap(tsBlockSize);
+        int lastIndexToProcess = -1;
+        boolean hasSkip = false;
 
-      for (int i = 0; i < tsBlockSize; i++) {
-        if (windowManager.isIgnoringNull() && controlAndTimeColumn[0].isNull(i)) {
+        for (int i = 0; i < tsBlockSize; i++) {
+          if (windowManager.isIgnoringNull() && controlAndTimeColumn[0].isNull(i)) {
+            lastIndexToProcess = i;
+            hasSkip = true;
+            continue;
+          }
+          if (!curWindow.satisfy(controlAndTimeColumn[0], i)) {
+            break;
+          }
+          needProcess.mark(i);
+          curWindow.mergeOnePoint(controlAndTimeColumn, i);
           lastIndexToProcess = i;
-          hasSkip = true;
-          continue;
-        }
-        if (!curWindow.satisfy(controlAndTimeColumn[0], i)) {
-          break;
-        }
-        needProcess.mark(i);
-        curWindow.mergeOnePoint(controlAndTimeColumn, i);
-        lastIndexToProcess = i;
-      }
-
-      // if no row needs to skip, just send a null parameter.
-      if (!hasSkip) {
-        needProcess = null;
-      }
-
-      TsBlock inputRegion = inputTsBlock.getRegion(0, lastIndexToProcess + 1);
-      for (TreeAggregator aggregator : aggregators) {
-        // Current agg method has been calculated
-        if (aggregator.hasFinalResult()) {
-          continue;
         }
 
-        aggregator.processTsBlock(inputRegion, needProcess);
+        // if no row needs to skip, just send a null parameter.
+        if (!hasSkip) {
+          needProcess = null;
+        }
+
+        TsBlock inputRegion = inputTsBlock.getRegion(0, lastIndexToProcess + 1);
+        for (TreeAggregator aggregator : aggregators) {
+          // Current agg method has been calculated
+          if (aggregator.hasFinalResult()) {
+            continue;
+          }
+
+          aggregator.processTsBlock(inputRegion, needProcess);
+        }
+        int lastReadRowIndex = lastIndexToProcess + 1;
+        // If lastReadRowIndex is not zero, some of tsBlock is consumed and result is cached in
+        // aggregators.
+        if (lastReadRowIndex != 0) {
+          hasCachedDataInAggregator = true;
+        }
+        if (lastReadRowIndex >= inputTsBlock.getPositionCount()) {
+          inputTsBlock = null;
+          // For the last index of TsBlock, if we can know the aggregation calculation is over
+          // we can directly updateResultTsBlock and return true
+          return isAllAggregatorsHasFinalResult(aggregators);
+        } else {
+          inputTsBlock = inputTsBlock.subTsBlock(lastReadRowIndex);
+          return true;
+        }
       }
-      int lastReadRowIndex = lastIndexToProcess + 1;
-      // If lastReadRowIndex is not zero, some of tsBlock is consumed and result is cached in
-      // aggregators.
-      if (lastReadRowIndex != 0) {
-        hasCachedDataInAggregator = true;
-      }
-      if (lastReadRowIndex >= inputTsBlock.getPositionCount()) {
-        inputTsBlock = null;
-        // For the last index of TsBlock, if we can know the aggregation calculation is over
-        // we can directly updateResultTsBlock and return true
-        return isAllAggregatorsHasFinalResult(aggregators);
-      } else {
-        inputTsBlock = inputTsBlock.subTsBlock(lastReadRowIndex);
-        return true;
-      }
+
+      boolean isTsBlockOutOfBound = windowManager.isTsBlockOutOfBound(inputTsBlock);
+      return isAllAggregatorsHasFinalResult(aggregators) || isTsBlockOutOfBound;
+    } finally {
+      operatorContext.recordAggregationOperatorFromRawDataCost(System.nanoTime() - startTime);
     }
-
-    boolean isTsBlockOutOfBound = windowManager.isTsBlockOutOfBound(inputTsBlock);
-    return isAllAggregatorsHasFinalResult(aggregators) || isTsBlockOutOfBound;
   }
 
   @Override
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/TagAggregationOperator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/TagAggregationOperator.java
index adab0fb..afa5c4c 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/TagAggregationOperator.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/TagAggregationOperator.java
@@ -123,12 +123,17 @@
   }
 
   private void aggregate(List<TreeAggregator> aggregators, TsBlock[] rowBlocks) {
-    for (TreeAggregator aggregator : aggregators) {
-      if (aggregator == null) {
-        continue;
+    long startTime = System.nanoTime();
+    try {
+      for (TreeAggregator aggregator : aggregators) {
+        if (aggregator == null) {
+          continue;
+        }
+        aggregator.reset();
+        aggregator.processTsBlocks(rowBlocks);
       }
-      aggregator.reset();
-      aggregator.processTsBlocks(rowBlocks);
+    } finally {
+      operatorContext.recordAggregationOperatorFromRawDataCost(System.nanoTime() - startTime);
     }
   }
 
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/AbstractSeriesAggregationScanOperator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/AbstractSeriesAggregationScanOperator.java
index c63cd24..3f9db7b 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/AbstractSeriesAggregationScanOperator.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/AbstractSeriesAggregationScanOperator.java
@@ -245,18 +245,28 @@
   }
 
   private boolean calcFromRawData(TsBlock tsBlock) {
-    Pair<Boolean, TsBlock> calcResult =
-        calculateAggregationFromRawData(tsBlock, aggregators, curTimeRange, ascending);
-    inputTsBlock = calcResult.getRight();
-    return calcResult.getLeft();
+    long startTime = System.nanoTime();
+    try {
+      Pair<Boolean, TsBlock> calcResult =
+          calculateAggregationFromRawData(tsBlock, aggregators, curTimeRange, ascending);
+      inputTsBlock = calcResult.getRight();
+      return calcResult.getLeft();
+    } finally {
+      operatorContext.recordScanAggregationFromRawDataCost(System.nanoTime() - startTime);
+    }
   }
 
   protected void calcFromStatistics(Statistics timeStatistics, Statistics[] valueStatistics) {
-    for (TreeAggregator aggregator : aggregators) {
-      if (aggregator.hasFinalResult()) {
-        continue;
+    long startTime = System.nanoTime();
+    try {
+      for (TreeAggregator aggregator : aggregators) {
+        if (aggregator.hasFinalResult()) {
+          continue;
+        }
+        aggregator.processStatistics(timeStatistics, valueStatistics);
       }
-      aggregator.processStatistics(timeStatistics, valueStatistics);
+    } finally {
+      operatorContext.recordScanAggregationFromStatisticsCost(System.nanoTime() - startTime);
     }
   }
 
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/AbstractAggTableScanOperator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/AbstractAggTableScanOperator.java
index 031c10d..8569616 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/AbstractAggTableScanOperator.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/AbstractAggTableScanOperator.java
@@ -285,28 +285,33 @@
       return new Pair<>(false, inputTsBlock);
     }
 
-    updateCurTimeRange(inputTsBlock.getStartTime());
+    long startTime = System.nanoTime();
+    try {
+      updateCurTimeRange(inputTsBlock.getStartTime());
 
-    TimeRange curTimeRange = timeIterator.getCurTimeRange();
-    // check if the tsBlock does not contain points in current interval
-    if (satisfiedTimeRange(inputTsBlock, curTimeRange, ascending)) {
-      // skip points that cannot be calculated
-      if ((ascending && inputTsBlock.getStartTime() < curTimeRange.getMin())
-          || (!ascending && inputTsBlock.getStartTime() > curTimeRange.getMax())) {
-        inputTsBlock = skipPointsOutOfTimeRange(inputTsBlock, curTimeRange, ascending);
+      TimeRange curTimeRange = timeIterator.getCurTimeRange();
+      // check if the tsBlock does not contain points in current interval
+      if (satisfiedTimeRange(inputTsBlock, curTimeRange, ascending)) {
+        // skip points that cannot be calculated
+        if ((ascending && inputTsBlock.getStartTime() < curTimeRange.getMin())
+            || (!ascending && inputTsBlock.getStartTime() > curTimeRange.getMax())) {
+          inputTsBlock = skipPointsOutOfTimeRange(inputTsBlock, curTimeRange, ascending);
+        }
+
+        inputTsBlock = process(inputTsBlock, curTimeRange);
       }
 
-      inputTsBlock = process(inputTsBlock, curTimeRange);
+      // judge whether the calculation finished
+      boolean isTsBlockOutOfBound =
+          inputTsBlock != null
+              && (ascending
+                  ? inputTsBlock.getEndTime() > curTimeRange.getMax()
+                  : inputTsBlock.getEndTime() < curTimeRange.getMin());
+      return new Pair<>(
+          isAllAggregatorsHasFinalResult(tableAggregators) || isTsBlockOutOfBound, inputTsBlock);
+    } finally {
+      operatorContext.recordScanAggregationFromRawDataCost(System.nanoTime() - startTime);
     }
-
-    // judge whether the calculation finished
-    boolean isTsBlockOutOfBound =
-        inputTsBlock != null
-            && (ascending
-                ? inputTsBlock.getEndTime() > curTimeRange.getMax()
-                : inputTsBlock.getEndTime() < curTimeRange.getMin());
-    return new Pair<>(
-        isAllAggregatorsHasFinalResult(tableAggregators) || isTsBlockOutOfBound, inputTsBlock);
   }
 
   private TsBlock process(TsBlock inputTsBlock, TimeRange curTimeRange) {
@@ -394,28 +399,32 @@
 
   protected void calcFromStatistics(Statistics timeStatistics, Statistics[] valueStatistics) {
     int idx = -1;
+    long startTime = System.nanoTime();
+    try {
+      for (TableAggregator aggregator : tableAggregators) {
+        if (aggregator.hasFinalResult()) {
+          idx += aggregator.getChannelCount();
+          continue;
+        }
 
-    for (TableAggregator aggregator : tableAggregators) {
-      if (aggregator.hasFinalResult()) {
-        idx += aggregator.getChannelCount();
-        continue;
+        Statistics[] statisticsArray = new Statistics[aggregator.getChannelCount()];
+        for (int i = 0; i < aggregator.getChannelCount(); i++) {
+          idx++;
+
+          TsTableColumnCategory columnSchemaCategory =
+              aggColumnSchemas.get(aggregatorInputChannels.get(idx)).getColumnCategory();
+          statisticsArray[i] =
+              buildStatistics(
+                  columnSchemaCategory,
+                  timeStatistics,
+                  valueStatistics,
+                  aggregatorInputChannels.get(idx));
+        }
+
+        aggregator.processStatistics(statisticsArray);
       }
-
-      Statistics[] statisticsArray = new Statistics[aggregator.getChannelCount()];
-      for (int i = 0; i < aggregator.getChannelCount(); i++) {
-        idx++;
-
-        TsTableColumnCategory columnSchemaCategory =
-            aggColumnSchemas.get(aggregatorInputChannels.get(idx)).getColumnCategory();
-        statisticsArray[i] =
-            buildStatistics(
-                columnSchemaCategory,
-                timeStatistics,
-                valueStatistics,
-                aggregatorInputChannels.get(idx));
-      }
-
-      aggregator.processStatistics(statisticsArray);
+    } finally {
+      operatorContext.recordScanAggregationFromStatisticsCost(System.nanoTime() - startTime);
     }
   }
 
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/snapshot/SnapshotLoader.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/snapshot/SnapshotLoader.java
index 17136cd..573d8fc 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/snapshot/SnapshotLoader.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/snapshot/SnapshotLoader.java
@@ -146,9 +146,16 @@
     try {
       deleteAllFilesInDataDirs();
       LOGGER.info(StorageEngineMessages.REMOVE_ALL_DATA_FILES_IN_ORIGINAL_DIR);
+      // IoTConsensus may spread the fragments of one snapshot across several receive folders.
+      // The fileTarget map must be shared across all of them so that a tsfile and its companion
+      // files (resource, exclusive mods, etc.) are relinked to the same data dir even when their
+      // fragments were received on different disks.
+      Map<String, String> fileTarget = new HashMap<>();
       for (String path : snapshotPaths) {
         File snapshotDir = new File(path);
-        createLinksFromSnapshotDirToDataDirWithoutLog(snapshotDir);
+        // IoTConsensus fragments arrive under different recv folders; do not map each
+        // fragment back to the same disk as its recv path, rely on fileTarget instead.
+        createLinksFromSnapshotDirToDataDirWithoutLog(snapshotDir, fileTarget, false);
         loadCompressionRatio(snapshotDir);
       }
       return loadSnapshot();
@@ -170,7 +177,7 @@
       }
       LOGGER.info(StorageEngineMessages.MOVING_SNAPSHOT_FILE_TO_DATA_DIRS);
       File snapshotDir = new File(snapshotPath);
-      createLinksFromSnapshotDirToDataDirWithoutLog(snapshotDir);
+      createLinksFromSnapshotDirToDataDirWithoutLog(snapshotDir, new HashMap<>(), true);
       loadCompressionRatio(snapshotDir);
       return loadSnapshot();
     } catch (IOException | DiskSpaceInsufficientException e) {
@@ -294,7 +301,8 @@
     }
   }
 
-  private void createLinksFromSnapshotDirToDataDirWithoutLog(File sourceDir)
+  private void createLinksFromSnapshotDirToDataDirWithoutLog(
+      File sourceDir, Map<String, String> fileTarget, boolean preferKeepSameDiskWhenLoading)
       throws IOException, DiskSpaceInsufficientException {
     if (!sourceDir.exists()) {
       throw new IOException(
@@ -340,7 +348,8 @@
                 + dataRegionId
                 + File.separator
                 + timePartitionFolder.getName();
-        createLinksFromSnapshotToSourceDir(targetSuffix, files, folderManager);
+        createLinksFromSnapshotToSourceDir(
+            targetSuffix, files, folderManager, fileTarget, preferKeepSameDiskWhenLoading);
       }
     }
 
@@ -359,7 +368,8 @@
                 + dataRegionId
                 + File.separator
                 + timePartitionFolder.getName();
-        createLinksFromSnapshotToSourceDir(targetSuffix, files, folderManager);
+        createLinksFromSnapshotToSourceDir(
+            targetSuffix, files, folderManager, fileTarget, preferKeepSameDiskWhenLoading);
       }
     }
   }
@@ -406,8 +416,12 @@
   }
 
   private void createLinksFromSnapshotToSourceDir(
-      String targetSuffix, File[] files, FolderManager folderManager) throws IOException {
-    Map<String, String> fileTarget = new HashMap<>();
+      String targetSuffix,
+      File[] files,
+      FolderManager folderManager,
+      Map<String, String> fileTarget,
+      boolean preferKeepSameDiskWhenLoading)
+      throws IOException {
     for (File file : files) {
       checkTsFileResourceExists(file);
 
@@ -421,7 +435,8 @@
 
       try {
         String firstFolderOfSameDisk =
-            IoTDBDescriptor.getInstance().getConfig().isKeepSameDiskWhenLoadingSnapshot()
+            preferKeepSameDiskWhenLoading
+                    && IoTDBDescriptor.getInstance().getConfig().isKeepSameDiskWhenLoadingSnapshot()
                 ? folderManager.getFirstFolderOfSameDisk(file.getAbsolutePath())
                 : null;
 
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadFailedMessageHandler.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadFailedMessageHandler.java
index 92a1d45..4879e4f 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadFailedMessageHandler.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadFailedMessageHandler.java
@@ -19,7 +19,10 @@
 
 package org.apache.iotdb.db.storageengine.load.active;
 
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.conf.CommonDescriptor;
+import org.apache.iotdb.db.i18n.StorageEngineMessages;
+import org.apache.iotdb.rpc.TSStatusCode;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -96,6 +99,20 @@
     void handle(final ActiveLoadPendingQueue.ActiveLoadEntry entry);
   }
 
+  public static boolean isStatusShouldRetry(
+      final ActiveLoadPendingQueue.ActiveLoadEntry entry, final TSStatus status) {
+    if (status != null
+        && status.getCode() == TSStatusCode.LOAD_TEMPORARY_UNAVAILABLE_EXCEPTION.getStatusCode()) {
+      LOGGER.info(
+          StorageEngineMessages.ACTIVE_LOAD_TEMPORARILY_UNAVAILABLE,
+          entry.getFile(),
+          entry.isGeneratedByPipe(),
+          status);
+      return true;
+    }
+    return isExceptionMessageShouldRetry(entry, status == null ? null : status.getMessage());
+  }
+
   public static boolean isExceptionMessageShouldRetry(
       final ActiveLoadPendingQueue.ActiveLoadEntry entry, final String message) {
     if (CommonDescriptor.getInstance().getConfig().isReadOnly()) {
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadTsFileLoader.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadTsFileLoader.java
index 61e297a..3f06964 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadTsFileLoader.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadTsFileLoader.java
@@ -294,7 +294,7 @@
 
   private void handleLoadFailure(
       final ActiveLoadPendingQueue.ActiveLoadEntry entry, final TSStatus status) {
-    if (!ActiveLoadFailedMessageHandler.isExceptionMessageShouldRetry(entry, status.getMessage())) {
+    if (!ActiveLoadFailedMessageHandler.isStatusShouldRetry(entry, status)) {
       LOGGER.warn(
           "Failed to auto load tsfile {} (isGeneratedByPipe = {}), status: {}. File will be moved to fail directory.",
           entry.getFile(),
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadTableConvertedInsertTabletStatementExceptionVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadTableConvertedInsertTabletStatementExceptionVisitor.java
index 28dfafc..ff538cb 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadTableConvertedInsertTabletStatementExceptionVisitor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadTableConvertedInsertTabletStatementExceptionVisitor.java
@@ -23,7 +23,6 @@
 import org.apache.iotdb.commons.exception.SemanticException;
 import org.apache.iotdb.commons.exception.auth.AccessDeniedException;
 import org.apache.iotdb.commons.queryengine.plan.relational.sql.ast.Node;
-import org.apache.iotdb.db.exception.load.LoadRuntimeOutOfMemoryException;
 import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.AstVisitor;
 import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.LoadTsFile;
 import org.apache.iotdb.rpc.TSStatusCode;
@@ -32,6 +31,9 @@
     implements AstVisitor<TSStatus, Exception> {
   @Override
   public TSStatus visitNode(final Node node, final Exception context) {
+    if (LoadTsFileDataTypeConverter.isMemoryPressureException(context)) {
+      return LoadTsFileDataTypeConverter.getMemoryPressureStatus(context);
+    }
     if (context instanceof AccessDeniedException) {
       return new TSStatus(TSStatusCode.NO_PERMISSION.getStatusCode())
           .setMessage(context.getMessage());
@@ -42,9 +44,8 @@
 
   @Override
   public TSStatus visitLoadTsFile(final LoadTsFile loadTsFile, final Exception context) {
-    if (context instanceof LoadRuntimeOutOfMemoryException) {
-      return new TSStatus(TSStatusCode.LOAD_TEMPORARY_UNAVAILABLE_EXCEPTION.getStatusCode())
-          .setMessage(context.getMessage());
+    if (LoadTsFileDataTypeConverter.isMemoryPressureException(context)) {
+      return LoadTsFileDataTypeConverter.getMemoryPressureStatus(context);
     } else if (context instanceof SemanticException) {
       return new TSStatus(TSStatusCode.LOAD_USER_CONFLICT_EXCEPTION.getStatusCode())
           .setMessage(context.getMessage());
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadTreeConvertedInsertTabletStatementExceptionVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadTreeConvertedInsertTabletStatementExceptionVisitor.java
index 9244551..ce465f8 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadTreeConvertedInsertTabletStatementExceptionVisitor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadTreeConvertedInsertTabletStatementExceptionVisitor.java
@@ -22,7 +22,6 @@
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.exception.SemanticException;
 import org.apache.iotdb.commons.exception.auth.AccessDeniedException;
-import org.apache.iotdb.db.exception.load.LoadRuntimeOutOfMemoryException;
 import org.apache.iotdb.db.queryengine.plan.statement.StatementNode;
 import org.apache.iotdb.db.queryengine.plan.statement.StatementVisitor;
 import org.apache.iotdb.db.queryengine.plan.statement.crud.LoadTsFileStatement;
@@ -33,6 +32,9 @@
 
   @Override
   public TSStatus visitNode(final StatementNode node, final Exception context) {
+    if (LoadTsFileDataTypeConverter.isMemoryPressureException(context)) {
+      return LoadTsFileDataTypeConverter.getMemoryPressureStatus(context);
+    }
     if (context instanceof AccessDeniedException) {
       return new TSStatus(TSStatusCode.NO_PERMISSION.getStatusCode())
           .setMessage(context.getMessage());
@@ -44,9 +46,8 @@
   @Override
   public TSStatus visitLoadFile(
       final LoadTsFileStatement loadTsFileStatement, final Exception context) {
-    if (context instanceof LoadRuntimeOutOfMemoryException) {
-      return new TSStatus(TSStatusCode.LOAD_TEMPORARY_UNAVAILABLE_EXCEPTION.getStatusCode())
-          .setMessage(context.getMessage());
+    if (LoadTsFileDataTypeConverter.isMemoryPressureException(context)) {
+      return LoadTsFileDataTypeConverter.getMemoryPressureStatus(context);
     } else if (context instanceof SemanticException) {
       return new TSStatus(TSStatusCode.LOAD_USER_CONFLICT_EXCEPTION.getStatusCode())
           .setMessage(context.getMessage());
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadTsFileDataTypeConverter.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadTsFileDataTypeConverter.java
index 3aab4aa..b8dc0e5 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadTsFileDataTypeConverter.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadTsFileDataTypeConverter.java
@@ -21,9 +21,14 @@
 
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.conf.IoTDBConstant;
+import org.apache.iotdb.commons.exception.pipe.PipeRuntimeOutOfMemoryCriticalException;
+import org.apache.iotdb.commons.pipe.config.PipeConfig;
 import org.apache.iotdb.commons.queryengine.common.SqlDialect;
 import org.apache.iotdb.db.auth.AuthorityChecker;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.exception.load.LoadRuntimeOutOfMemoryException;
+import org.apache.iotdb.db.i18n.StorageEngineMessages;
+import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager;
 import org.apache.iotdb.db.protocol.session.IClientSession;
 import org.apache.iotdb.db.protocol.session.InternalClientSession;
 import org.apache.iotdb.db.protocol.session.SessionManager;
@@ -46,6 +51,7 @@
 
 import java.time.ZoneId;
 import java.util.Optional;
+import java.util.concurrent.Semaphore;
 
 public class LoadTsFileDataTypeConverter {
 
@@ -53,6 +59,89 @@
 
   private static final SessionManager SESSION_MANAGER = SessionManager.getInstance();
 
+  private static Semaphore getTabletConversionSemaphore() {
+    return TabletConversionSemaphoreHolder.INSTANCE;
+  }
+
+  private static int getTabletConversionPermitCount() {
+    final int configuredThreadCount =
+        Math.max(
+            1,
+            IoTDBDescriptor.getInstance().getConfig().getLoadTsFileTabletConversionThreadCount());
+    if (!PipeConfig.getInstance().getPipeMemoryManagementEnabled()) {
+      return configuredThreadCount;
+    }
+    final long memorySafePermitCount =
+        getAllowedPipeTabletMemorySizeInBytes() / estimatePipeTabletMemorySizePerConversion();
+    return (int) Math.max(1, Math.min((long) configuredThreadCount, memorySafePermitCount));
+  }
+
+  private static long estimatePipeTabletMemorySizePerConversion() {
+    final PipeConfig pipeConfig = PipeConfig.getInstance();
+    final long tabletSize =
+        Math.max(
+            1L, IoTDBDescriptor.getInstance().getConfig().getPipeDataStructureTabletSizeInBytes());
+    final long maxReaderChunkSize = Math.max(0L, pipeConfig.getPipeMaxReaderChunkSize());
+    final long tableSize =
+        Math.max(
+            1L,
+            Math.min(tabletSize, IoTDBDescriptor.getInstance().getConfig().getTargetChunkSize()));
+
+    // Both conversion paths keep source/converted tablet data and the current reader chunk in
+    // memory. Table-model conversion may additionally keep source/converted table-sized batches, so
+    // use the larger estimate as the per-conversion working set.
+    final long treeParserMemorySize = 2L * tabletSize + maxReaderChunkSize;
+    final long tableParserMemorySize = 2L * tabletSize + 2L * tableSize + maxReaderChunkSize;
+    return Math.max(1L, Math.max(treeParserMemorySize, tableParserMemorySize));
+  }
+
+  private static long getAllowedPipeTabletMemorySizeInBytes() {
+    final PipeConfig pipeConfig = PipeConfig.getInstance();
+    return (long)
+        ((pipeConfig.getPipeDataStructureTabletMemoryBlockAllocationRejectThreshold()
+                + pipeConfig.getPipeDataStructureTsFileMemoryBlockAllocationRejectThreshold() / 2)
+            * PipeDataNodeResourceManager.memory().getTotalNonFloatingMemorySizeInBytes());
+  }
+
+  private static Optional<TSStatus> getInterruptedConversionStatus(final InterruptedException e) {
+    Thread.currentThread().interrupt();
+    return Optional.of(
+        new TSStatus(TSStatusCode.LOAD_TEMPORARY_UNAVAILABLE_EXCEPTION.getStatusCode())
+            .setMessage(
+                StorageEngineMessages.INTERRUPTED_WAITING_TABLET_CONVERSION_SLOT + e.getMessage()));
+  }
+
+  public static boolean isMemoryPressureException(final Throwable throwable) {
+    Throwable current = throwable;
+    while (current != null) {
+      if (current instanceof LoadRuntimeOutOfMemoryException
+          || current instanceof PipeRuntimeOutOfMemoryCriticalException) {
+        return true;
+      }
+      current = current.getCause();
+    }
+    return false;
+  }
+
+  public static TSStatus getMemoryPressureStatus(final Throwable throwable) {
+    Throwable current = throwable;
+    while (current != null) {
+      if (current instanceof LoadRuntimeOutOfMemoryException
+          || current instanceof PipeRuntimeOutOfMemoryCriticalException) {
+        return new TSStatus(TSStatusCode.LOAD_TEMPORARY_UNAVAILABLE_EXCEPTION.getStatusCode())
+            .setMessage(current.getMessage());
+      }
+      current = current.getCause();
+    }
+
+    return new TSStatus(TSStatusCode.LOAD_TEMPORARY_UNAVAILABLE_EXCEPTION.getStatusCode())
+        .setMessage(throwable == null ? null : throwable.getMessage());
+  }
+
+  private static class TabletConversionSemaphoreHolder {
+    private static final Semaphore INSTANCE = new Semaphore(getTabletConversionPermitCount());
+  }
+
   public static final LoadConvertedInsertTabletStatementTSStatusVisitor STATEMENT_STATUS_VISITOR =
       new LoadConvertedInsertTabletStatementTSStatusVisitor();
   public static final LoadTreeConvertedInsertTabletStatementExceptionVisitor
@@ -83,16 +172,28 @@
   }
 
   public Optional<TSStatus> convertForTableModel(final LoadTsFile loadTsFileTableStatement) {
+    boolean isPermitAcquired = false;
     try {
+      getTabletConversionSemaphore().acquire();
+      isPermitAcquired = true;
       return loadTsFileTableStatement.accept(
           tableStatementDataTypeConvertExecutionVisitor, loadTsFileTableStatement.getDatabase());
+    } catch (final InterruptedException e) {
+      return getInterruptedConversionStatus(e);
     } catch (Exception e) {
+      if (isMemoryPressureException(e)) {
+        return Optional.of(getMemoryPressureStatus(e));
+      }
       LOGGER.warn(
           "Failed to convert data types for table model statement {}.",
           loadTsFileTableStatement,
           e);
       return Optional.of(
           new TSStatus(TSStatusCode.LOAD_FILE_ERROR.getStatusCode()).setMessage(e.getMessage()));
+    } finally {
+      if (isPermitAcquired) {
+        getTabletConversionSemaphore().release();
+      }
     }
   }
 
@@ -136,14 +237,25 @@
 
   public Optional<TSStatus> convertForTreeModel(final LoadTsFileStatement loadTsFileTreeStatement) {
     DataNodeSchemaLockManager.getInstance().releaseReadLock(context);
+    boolean isPermitAcquired = false;
     try {
+      getTabletConversionSemaphore().acquire();
+      isPermitAcquired = true;
       return loadTsFileTreeStatement.accept(treeStatementDataTypeConvertExecutionVisitor, null);
+    } catch (final InterruptedException e) {
+      return getInterruptedConversionStatus(e);
     } catch (Exception e) {
+      if (isMemoryPressureException(e)) {
+        return Optional.of(getMemoryPressureStatus(e));
+      }
       LOGGER.warn(
           "Failed to convert data types for tree model statement {}.", loadTsFileTreeStatement, e);
       return Optional.of(
           new TSStatus(TSStatusCode.LOAD_FILE_ERROR.getStatusCode()).setMessage(e.getMessage()));
     } finally {
+      if (isPermitAcquired) {
+        getTabletConversionSemaphore().release();
+      }
       DataNodeSchemaLockManager.getInstance()
           .takeReadLock(context, SchemaLockType.VALIDATE_VS_DELETION_TREE);
     }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/disk/TierManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/disk/TierManager.java
index 200be13..561da82 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/disk/TierManager.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/disk/TierManager.java
@@ -22,9 +22,6 @@
 import org.apache.iotdb.commons.conf.IoTDBConstant;
 import org.apache.iotdb.commons.disk.FolderManager;
 import org.apache.iotdb.commons.disk.strategy.DirectoryStrategyType;
-import org.apache.iotdb.commons.disk.strategy.MaxDiskUsableSpaceFirstStrategy;
-import org.apache.iotdb.commons.disk.strategy.MinFolderOccupiedSpaceFirstStrategy;
-import org.apache.iotdb.commons.disk.strategy.RandomOnDiskUsableSpaceStrategy;
 import org.apache.iotdb.commons.exception.DiskSpaceInsufficientException;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
@@ -98,19 +95,8 @@
   }
 
   public synchronized void initFolders() {
-    try {
-      String strategyName = Class.forName(config.getMultiDirStrategyClassName()).getSimpleName();
-      if (strategyName.equals(MaxDiskUsableSpaceFirstStrategy.class.getSimpleName())) {
-        directoryStrategyType = DirectoryStrategyType.MAX_DISK_USABLE_SPACE_FIRST_STRATEGY;
-      } else if (strategyName.equals(MinFolderOccupiedSpaceFirstStrategy.class.getSimpleName())) {
-        directoryStrategyType = DirectoryStrategyType.MIN_FOLDER_OCCUPIED_SPACE_FIRST_STRATEGY;
-      } else if (strategyName.equals(RandomOnDiskUsableSpaceStrategy.class.getSimpleName())) {
-        directoryStrategyType = DirectoryStrategyType.RANDOM_ON_DISK_USABLE_SPACE_STRATEGY;
-      }
-    } catch (Exception e) {
-      logger.error(
-          "Can't find strategy {} for mult-directories.", config.getMultiDirStrategyClassName(), e);
-    }
+    directoryStrategyType =
+        DirectoryStrategyType.fromClassName(config.getMultiDirStrategyClassName());
 
     config.updatePath();
     String[][] tierDirs = config.getTierDataDirs();
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/pattern/TreePatternPruningTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/pattern/TreePatternPruningTest.java
index eaa7e17..ae8093f 100644
--- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/pattern/TreePatternPruningTest.java
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/pattern/TreePatternPruningTest.java
@@ -122,7 +122,7 @@
   }
 
   @Test
-  public void testLegacyPatternMultipleRulesRejected() {
+  public void testLegacyPatternMultipleRulesPreserved() {
     final PipeParameters params =
         new PipeParameters(
             new HashMap<String, String>() {
@@ -131,12 +131,9 @@
               }
             });
 
-    try {
-      TreePattern.parsePipePatternFromSourceParameters(params);
-      Assert.fail("Should throw PipeException for legacy multi-pattern parameters");
-    } catch (final PipeException ignored) {
-      // Expected exception
-    }
+    final TreePattern result = TreePattern.parsePipePatternFromSourceParameters(params);
+
+    Assert.assertEquals("root.sg.A,root.sg.B", result.getPattern());
   }
 
   @Test
@@ -157,7 +154,7 @@
   }
 
   @Test
-  public void testLegacyPathMultipleRulesRejected() {
+  public void testLegacyPathMultipleRulesPreserved() {
     final PipeParameters params =
         new PipeParameters(
             new HashMap<String, String>() {
@@ -166,12 +163,28 @@
               }
             });
 
-    try {
-      TreePattern.parsePipePatternFromSourceParameters(params);
-      Assert.fail("Should throw PipeException for legacy multi-path parameters");
-    } catch (final PipeException ignored) {
-      // Expected exception
-    }
+    final TreePattern result = TreePattern.parsePipePatternFromSourceParameters(params);
+
+    Assert.assertTrue(result instanceof UnionIoTDBTreePattern);
+    Assert.assertEquals("root.sg.d1,root.sg.d2", result.getPattern());
+  }
+
+  @Test
+  public void testLegacyPathMultipleExclusionsPreserved() {
+    final PipeParameters params =
+        new PipeParameters(
+            new HashMap<String, String>() {
+              {
+                put(PipeSourceConstant.SOURCE_PATH_KEY, "root.sg.**");
+                put(PipeSourceConstant.SOURCE_PATH_EXCLUSION_KEY, "root.sg.d1,root.sg.d2");
+              }
+            });
+
+    final TreePattern result = TreePattern.parsePipePatternFromSourceParameters(params);
+
+    Assert.assertTrue(result instanceof WithExclusionIoTDBTreePattern);
+    Assert.assertEquals(
+        "INCLUSION(root.sg.**), EXCLUSION(root.sg.d1,root.sg.d2)", result.getPattern());
   }
 
   @Test
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/IoTDBDataNodeAsyncClientManagerTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/IoTDBDataNodeAsyncClientManagerTest.java
index fb13e43..f564dbb 100644
--- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/IoTDBDataNodeAsyncClientManagerTest.java
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/IoTDBDataNodeAsyncClientManagerTest.java
@@ -28,6 +28,7 @@
 
 import java.lang.reflect.Field;
 import java.util.Collections;
+import java.util.concurrent.ExecutorService;
 
 public class IoTDBDataNodeAsyncClientManagerTest {
 
@@ -71,6 +72,48 @@
     }
   }
 
+  @Test
+  public void testClientResourcesShouldDifferentiateEndPoints() throws Exception {
+    final IoTDBDataNodeAsyncClientManager firstManager =
+        new IoTDBDataNodeAsyncClientManager(
+            Collections.singletonList(new TEndPoint("127.0.0.1", 6667)),
+            false,
+            "round-robin",
+            new UserEntity(1L, "user", "cli-host"),
+            "password",
+            true,
+            "sync",
+            true,
+            true,
+            true,
+            true);
+    final IoTDBDataNodeAsyncClientManager secondManager =
+        new IoTDBDataNodeAsyncClientManager(
+            Collections.singletonList(new TEndPoint("127.0.0.2", 6667)),
+            false,
+            "round-robin",
+            new UserEntity(1L, "user", "cli-host"),
+            "password",
+            true,
+            "sync",
+            true,
+            true,
+            true,
+            true);
+
+    try {
+      Assert.assertEquals(
+          getReceiverAttributes(firstManager), getReceiverAttributes(secondManager));
+      Assert.assertNotEquals(
+          getClientResourceKey(firstManager), getClientResourceKey(secondManager));
+      Assert.assertNotSame(getEndPoint2Client(firstManager), getEndPoint2Client(secondManager));
+      Assert.assertNotSame(getExecutor(firstManager), getExecutor(secondManager));
+    } finally {
+      firstManager.close();
+      secondManager.close();
+    }
+  }
+
   private static String getReceiverAttributes(final IoTDBDataNodeAsyncClientManager manager)
       throws Exception {
     final Field field =
@@ -79,10 +122,24 @@
     return (String) field.get(manager);
   }
 
+  private static String getClientResourceKey(final IoTDBDataNodeAsyncClientManager manager)
+      throws Exception {
+    final Field field = IoTDBDataNodeAsyncClientManager.class.getDeclaredField("clientResourceKey");
+    field.setAccessible(true);
+    return (String) field.get(manager);
+  }
+
   private static Object getEndPoint2Client(final IoTDBDataNodeAsyncClientManager manager)
       throws Exception {
     final Field field = IoTDBDataNodeAsyncClientManager.class.getDeclaredField("endPoint2Client");
     field.setAccessible(true);
     return field.get(manager);
   }
+
+  private static ExecutorService getExecutor(final IoTDBDataNodeAsyncClientManager manager)
+      throws Exception {
+    final Field field = IoTDBDataNodeAsyncClientManager.class.getDeclaredField("executor");
+    field.setAccessible(true);
+    return (ExecutorService) field.get(manager);
+  }
 }
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContextTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContextTest.java
new file mode 100644
index 0000000..0e62133
--- /dev/null
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContextTest.java
@@ -0,0 +1,623 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.execution.fragment;
+
+import org.apache.iotdb.calc.execution.aggregation.Accumulator;
+import org.apache.iotdb.calc.execution.operator.CommonOperatorContext;
+import org.apache.iotdb.calc.execution.operator.Operator;
+import org.apache.iotdb.calc.execution.operator.source.relational.aggregation.TableAccumulator;
+import org.apache.iotdb.calc.execution.operator.source.relational.aggregation.TableAggregator;
+import org.apache.iotdb.calc.metric.QueryExecutionMetricSet;
+import org.apache.iotdb.calc.plan.planner.memory.MemoryReservationManager;
+import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
+import org.apache.iotdb.commons.queryengine.plan.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.commons.queryengine.plan.planner.plan.parameter.InputLocation;
+import org.apache.iotdb.commons.queryengine.plan.relational.planner.node.AggregationNode;
+import org.apache.iotdb.commons.service.metric.enums.Metric;
+import org.apache.iotdb.commons.service.metric.enums.Tag;
+import org.apache.iotdb.db.queryengine.common.FragmentInstanceId;
+import org.apache.iotdb.db.queryengine.common.PlanFragmentId;
+import org.apache.iotdb.db.queryengine.execution.aggregation.TreeAggregator;
+import org.apache.iotdb.db.queryengine.execution.aggregation.timerangeiterator.ITimeRangeIterator;
+import org.apache.iotdb.db.queryengine.execution.driver.DriverContext;
+import org.apache.iotdb.db.queryengine.execution.operator.OperatorContext;
+import org.apache.iotdb.db.queryengine.execution.operator.process.TagAggregationOperator;
+import org.apache.iotdb.db.queryengine.execution.operator.source.AbstractSeriesAggregationScanOperator;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.AggregationStep;
+import org.apache.iotdb.metrics.DoNothingMetricService;
+import org.apache.iotdb.metrics.type.HistogramSnapshot;
+import org.apache.iotdb.metrics.type.Timer;
+import org.apache.iotdb.metrics.utils.MetricLevel;
+import org.apache.iotdb.metrics.utils.MetricType;
+
+import org.apache.tsfile.block.column.Column;
+import org.apache.tsfile.block.column.ColumnBuilder;
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.file.metadata.statistics.Statistics;
+import org.apache.tsfile.read.common.TimeRange;
+import org.apache.tsfile.read.common.block.TsBlock;
+import org.apache.tsfile.read.common.block.TsBlockBuilder;
+import org.junit.Test;
+
+import javax.management.ObjectName;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.OptionalInt;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static java.util.Collections.singletonList;
+import static org.apache.iotdb.db.queryengine.common.QueryId.MOCK_QUERY_ID;
+import static org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext.createFragmentInstanceContext;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+public class FragmentInstanceContextTest {
+
+  @Test
+  public void testDrainAggregationCostsSeparatelyOnce() {
+    ExecutorService instanceNotificationExecutor =
+        IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification");
+    try {
+      FragmentInstanceId instanceId =
+          new FragmentInstanceId(new PlanFragmentId(MOCK_QUERY_ID, 0), "0");
+      FragmentInstanceStateMachine stateMachine =
+          new FragmentInstanceStateMachine(instanceId, instanceNotificationExecutor);
+      FragmentInstanceContext context = createFragmentInstanceContext(instanceId, stateMachine);
+
+      context.recordScanAggregationFromRawDataCost(10);
+      context.recordScanAggregationFromRawDataCost(20);
+      context.recordScanAggregationFromStatisticsCost(30);
+      context.recordAggregationOperatorFromRawDataCost(40);
+
+      assertEquals(30, context.drainScanAggregationFromRawDataCost());
+      assertEquals(30, context.drainScanAggregationFromStatisticsCost());
+      assertEquals(40, context.drainAggregationOperatorFromRawDataCost());
+
+      assertEquals(0, context.drainScanAggregationFromRawDataCost());
+      assertEquals(0, context.drainScanAggregationFromStatisticsCost());
+      assertEquals(0, context.drainAggregationOperatorFromRawDataCost());
+    } finally {
+      instanceNotificationExecutor.shutdown();
+    }
+  }
+
+  @Test
+  public void testOperatorContextForwardsAggregationCostsToFragmentInstanceContext() {
+    ExecutorService instanceNotificationExecutor =
+        IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification");
+    try {
+      FragmentInstanceContext context = newFragmentInstanceContext(instanceNotificationExecutor);
+      DriverContext driverContext = new DriverContext(context, 0);
+      OperatorContext operatorContext =
+          driverContext.addOperatorContext(1, new PlanNodeId("forward"), "forward");
+
+      operatorContext.recordScanAggregationFromRawDataCost(11);
+      operatorContext.recordScanAggregationFromStatisticsCost(13);
+      operatorContext.recordAggregationOperatorFromRawDataCost(17);
+
+      assertEquals(11, context.drainScanAggregationFromRawDataCost());
+      assertEquals(13, context.drainScanAggregationFromStatisticsCost());
+      assertEquals(17, context.drainAggregationOperatorFromRawDataCost());
+    } finally {
+      instanceNotificationExecutor.shutdown();
+    }
+  }
+
+  @Test
+  public void testOperatorContextWithoutFragmentInstanceAndCommonContextUseNoOpFallback() {
+    OperatorContext operatorContext =
+        new OperatorContext(1, new PlanNodeId("no-op"), "no-op", new DriverContext());
+    operatorContext.recordScanAggregationFromRawDataCost(11);
+    operatorContext.recordScanAggregationFromStatisticsCost(13);
+    operatorContext.recordAggregationOperatorFromRawDataCost(17);
+
+    TestCommonOperatorContext commonOperatorContext = new TestCommonOperatorContext();
+    commonOperatorContext.recordScanAggregationFromRawDataCost(19);
+    commonOperatorContext.recordScanAggregationFromStatisticsCost(23);
+    commonOperatorContext.recordAggregationOperatorFromRawDataCost(29);
+
+    assertEquals(0, commonOperatorContext.getTotalExecutionTimeInNanos());
+    assertNull(commonOperatorContext.getMemoryReservationContext());
+  }
+
+  @Test
+  public void testAggregationCostsFromOperatorsFlushToMetricStages() throws Exception {
+    ExecutorService instanceNotificationExecutor =
+        IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification");
+    RecordingMetricService metricService = new RecordingMetricService();
+    QueryExecutionMetricSet metricSet = QueryExecutionMetricSet.getInstance();
+    metricSet.bindTo(metricService);
+    try {
+      FragmentInstanceContext context = newFragmentInstanceContext(instanceNotificationExecutor);
+      DriverContext driverContext = new DriverContext(context, 0);
+      OperatorContext scanOperatorContext =
+          driverContext.addOperatorContext(1, new PlanNodeId("scan"), "scan");
+      TestSeriesAggregationScanOperator scanOperator =
+          new TestSeriesAggregationScanOperator(scanOperatorContext);
+
+      scanOperator.calculateRaw(longTsBlock());
+      scanOperator.calculateStatistics(
+          Statistics.getStatsByType(TSDataType.INT64),
+          new Statistics[] {Statistics.getStatsByType(TSDataType.INT64)});
+
+      OperatorContext aggregationOperatorContext =
+          driverContext.addOperatorContext(2, new PlanNodeId("aggregation"), "aggregation");
+      org.apache.iotdb.calc.execution.operator.source.relational.aggregation.AggregationOperator
+          aggregationOperator = newAggregationOperator(aggregationOperatorContext);
+      aggregationOperator.next();
+
+      context.recordAggregationCostToMetric();
+
+      assertEquals(1, metricService.count("raw_data"));
+      assertTrue(metricService.sum("raw_data") > 0);
+      assertEquals(1, metricService.count("statistics"));
+      assertTrue(metricService.sum("statistics") > 0);
+      assertEquals(1, metricService.count("raw_data_operator"));
+      assertTrue(metricService.sum("raw_data_operator") > 0);
+
+      context.recordAggregationCostToMetric();
+
+      assertEquals(1, metricService.count("raw_data"));
+      assertEquals(1, metricService.count("statistics"));
+      assertEquals(1, metricService.count("raw_data_operator"));
+    } finally {
+      metricSet.unbindFrom(metricService);
+      instanceNotificationExecutor.shutdown();
+    }
+  }
+
+  @Test
+  public void testAggregationOperatorRecordsTsBlockCostForIntermediateInput() throws Exception {
+    ExecutorService instanceNotificationExecutor =
+        IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification");
+    try {
+      FragmentInstanceContext context = newFragmentInstanceContext(instanceNotificationExecutor);
+      DriverContext driverContext = new DriverContext(context, 0);
+      OperatorContext operatorContext =
+          driverContext.addOperatorContext(1, new PlanNodeId("aggregation"), "aggregation");
+      org.apache.iotdb.calc.execution.operator.source.relational.aggregation.AggregationOperator
+          aggregationOperator = newAggregationOperator(operatorContext);
+
+      assertNull(aggregationOperator.next());
+
+      assertTrue(context.drainAggregationOperatorFromRawDataCost() > 0);
+    } finally {
+      instanceNotificationExecutor.shutdown();
+    }
+  }
+
+  @Test
+  public void testTreeAggregationOperatorRecordsTsBlockCost() throws Exception {
+    ExecutorService instanceNotificationExecutor =
+        IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification");
+    try {
+      FragmentInstanceContext context = newFragmentInstanceContext(instanceNotificationExecutor);
+      DriverContext driverContext = new DriverContext(context, 0);
+      OperatorContext operatorContext =
+          driverContext.addOperatorContext(1, new PlanNodeId("tree-aggregation"), "aggregation");
+      org.apache.iotdb.db.queryengine.execution.operator.process.AggregationOperator
+          aggregationOperator =
+              new org.apache.iotdb.db.queryengine.execution.operator.process.AggregationOperator(
+                  operatorContext,
+                  singletonList(newTreeFinalAggregator()),
+                  new SingleTimeRangeIterator(new TimeRange(0, 10)),
+                  singletonList(new SingleTsBlockOperator(operatorContext, longTsBlock())),
+                  false,
+                  1024);
+
+      aggregationOperator.isBlocked().get();
+      aggregationOperator.next();
+
+      assertTrue(context.drainAggregationOperatorFromRawDataCost() > 0);
+    } finally {
+      instanceNotificationExecutor.shutdown();
+    }
+  }
+
+  @Test
+  public void testTagAggregationOperatorRecordsTsBlockCost() throws Exception {
+    ExecutorService instanceNotificationExecutor =
+        IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification");
+    try {
+      FragmentInstanceContext context = newFragmentInstanceContext(instanceNotificationExecutor);
+      DriverContext driverContext = new DriverContext(context, 0);
+      OperatorContext operatorContext =
+          driverContext.addOperatorContext(1, new PlanNodeId("tag-aggregation"), "tag-aggregation");
+      TagAggregationOperator tagAggregationOperator =
+          new TagAggregationOperator(
+              operatorContext,
+              singletonList(singletonList("tag")),
+              singletonList(singletonList(newTreeFinalAggregator())),
+              singletonList(new SingleTsBlockOperator(operatorContext, longTsBlock())),
+              1024);
+
+      tagAggregationOperator.isBlocked().get();
+      tagAggregationOperator.next();
+
+      assertTrue(context.drainAggregationOperatorFromRawDataCost() > 0);
+    } finally {
+      instanceNotificationExecutor.shutdown();
+    }
+  }
+
+  private static FragmentInstanceContext newFragmentInstanceContext(
+      ExecutorService instanceNotificationExecutor) {
+    FragmentInstanceId instanceId =
+        new FragmentInstanceId(new PlanFragmentId(MOCK_QUERY_ID, 0), "0");
+    FragmentInstanceStateMachine stateMachine =
+        new FragmentInstanceStateMachine(instanceId, instanceNotificationExecutor);
+    return createFragmentInstanceContext(instanceId, stateMachine);
+  }
+
+  private static TsBlock longTsBlock() {
+    TsBlockBuilder builder = new TsBlockBuilder(singletonList(TSDataType.INT64));
+    builder.getTimeColumnBuilder().writeLong(0);
+    builder.getColumnBuilder(0).writeLong(1);
+    builder.declarePosition();
+    return builder.build();
+  }
+
+  private static org.apache.iotdb.calc.execution.operator.source.relational.aggregation
+          .AggregationOperator
+      newAggregationOperator(OperatorContext operatorContext) {
+    TableAggregator aggregator =
+        new TableAggregator(
+            new TestTableAccumulator(),
+            AggregationNode.Step.INTERMEDIATE,
+            TSDataType.INT64,
+            singletonList(0),
+            OptionalInt.empty());
+    return new org.apache.iotdb.calc.execution.operator.source.relational.aggregation
+        .AggregationOperator(
+        operatorContext,
+        new SingleTsBlockOperator(operatorContext, longTsBlock()),
+        singletonList(aggregator));
+  }
+
+  private static TreeAggregator newTreeFinalAggregator() {
+    return new TreeAggregator(
+        new TestTreeAccumulator(),
+        AggregationStep.FINAL,
+        singletonList(new InputLocation[] {new InputLocation(0, 0)}));
+  }
+
+  private static class TestSeriesAggregationScanOperator
+      extends AbstractSeriesAggregationScanOperator {
+
+    private TestSeriesAggregationScanOperator(OperatorContext operatorContext) {
+      super(
+          new PlanNodeId("scan"),
+          operatorContext,
+          null,
+          1,
+          singletonList(new TreeAggregator(new TestTreeAccumulator(), AggregationStep.SINGLE)),
+          new SingleTimeRangeIterator(new TimeRange(0, 10)),
+          true,
+          false,
+          null,
+          1024,
+          0,
+          true);
+      this.curTimeRange = new TimeRange(0, 10);
+    }
+
+    private void calculateRaw(TsBlock tsBlock) {
+      this.inputTsBlock = tsBlock;
+      calcFromCachedData();
+    }
+
+    private void calculateStatistics(Statistics timeStatistics, Statistics[] valueStatistics) {
+      calcFromStatistics(timeStatistics, valueStatistics);
+    }
+
+    @Override
+    public long ramBytesUsed() {
+      return 0;
+    }
+  }
+
+  private static class SingleTimeRangeIterator implements ITimeRangeIterator {
+    private final TimeRange timeRange;
+    private boolean consumed;
+
+    private SingleTimeRangeIterator(TimeRange timeRange) {
+      this.timeRange = timeRange;
+    }
+
+    @Override
+    public TimeRange getFirstTimeRange() {
+      return timeRange;
+    }
+
+    @Override
+    public boolean hasNextTimeRange() {
+      return !consumed;
+    }
+
+    @Override
+    public TimeRange nextTimeRange() {
+      consumed = true;
+      return timeRange;
+    }
+
+    @Override
+    public boolean isAscending() {
+      return true;
+    }
+
+    @Override
+    public long currentOutputTime() {
+      return timeRange.getMin();
+    }
+
+    @Override
+    public long getTotalIntervalNum() {
+      return 1;
+    }
+  }
+
+  private static class RecordingMetricService extends DoNothingMetricService {
+    private final Map<String, RecordingTimer> timers = new HashMap<>();
+
+    @Override
+    public Timer getOrCreateTimer(String metric, MetricLevel metricLevel, String... tags) {
+      if (!Metric.AGGREGATION.toString().equals(metric)
+          || tags.length != 2
+          || !Tag.FROM.toString().equals(tags[0])) {
+        return super.getOrCreateTimer(metric, metricLevel, tags);
+      }
+      return timers.computeIfAbsent(tags[1], key -> new RecordingTimer());
+    }
+
+    @Override
+    public void remove(MetricType type, String metric, String... tags) {
+      if (Metric.AGGREGATION.toString().equals(metric)
+          && tags.length == 2
+          && Tag.FROM.toString().equals(tags[0])) {
+        timers.remove(tags[1]);
+      }
+    }
+
+    long count(String from) {
+      RecordingTimer timer = timers.get(from);
+      return timer == null ? 0 : timer.getCount();
+    }
+
+    long sum(String from) {
+      RecordingTimer timer = timers.get(from);
+      return timer == null ? 0 : timer.sum.get();
+    }
+  }
+
+  private static class RecordingTimer implements Timer {
+    private final AtomicLong count = new AtomicLong();
+    private final AtomicLong sum = new AtomicLong();
+
+    @Override
+    public void update(long duration, TimeUnit unit) {
+      count.incrementAndGet();
+      sum.addAndGet(unit.toNanos(duration));
+    }
+
+    @Override
+    public HistogramSnapshot takeSnapshot() {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public long getCount() {
+      return count.get();
+    }
+
+    @Override
+    public void setObjectName(ObjectName objectName) {
+      // no-op
+    }
+  }
+
+  private static class SingleTsBlockOperator implements Operator {
+    private final CommonOperatorContext operatorContext;
+    private final TsBlock tsBlock;
+    private boolean consumed;
+
+    private SingleTsBlockOperator(CommonOperatorContext operatorContext, TsBlock tsBlock) {
+      this.operatorContext = operatorContext;
+      this.tsBlock = tsBlock;
+    }
+
+    @Override
+    public CommonOperatorContext getOperatorContext() {
+      return operatorContext;
+    }
+
+    @Override
+    public TsBlock next() {
+      consumed = true;
+      return tsBlock;
+    }
+
+    @Override
+    public boolean hasNext() {
+      return !consumed;
+    }
+
+    @Override
+    public void close() {
+      // no-op
+    }
+
+    @Override
+    public boolean isFinished() {
+      return consumed;
+    }
+
+    @Override
+    public long calculateMaxPeekMemory() {
+      return 0;
+    }
+
+    @Override
+    public long calculateMaxReturnSize() {
+      return 0;
+    }
+
+    @Override
+    public long calculateRetainedSizeAfterCallingNext() {
+      return 0;
+    }
+
+    @Override
+    public long ramBytesUsed() {
+      return 0;
+    }
+  }
+
+  private static class TestTableAccumulator implements TableAccumulator {
+    @Override
+    public long getEstimatedSize() {
+      return 0;
+    }
+
+    @Override
+    public TableAccumulator copy() {
+      return new TestTableAccumulator();
+    }
+
+    @Override
+    public void addInput(
+        Column[] arguments,
+        org.apache.iotdb.calc.execution.operator.source.relational.aggregation.AggregationMask
+            mask) {
+      // no-op
+    }
+
+    @Override
+    public void addIntermediate(Column argument) {
+      // no-op
+    }
+
+    @Override
+    public void evaluateIntermediate(ColumnBuilder columnBuilder) {
+      columnBuilder.writeLong(0);
+    }
+
+    @Override
+    public void evaluateFinal(ColumnBuilder columnBuilder) {
+      columnBuilder.writeLong(0);
+    }
+
+    @Override
+    public boolean hasFinalResult() {
+      return false;
+    }
+
+    @Override
+    public void addStatistics(Statistics[] statistics) {
+      // no-op
+    }
+
+    @Override
+    public void reset() {
+      // no-op
+    }
+  }
+
+  private static class TestTreeAccumulator implements Accumulator {
+    @Override
+    public void addInput(Column[] columns, org.apache.tsfile.utils.BitMap bitMap) {
+      // no-op
+    }
+
+    @Override
+    public void addIntermediate(Column[] partialResult) {
+      // no-op
+    }
+
+    @Override
+    public void addStatistics(Statistics statistics) {
+      // no-op
+    }
+
+    @Override
+    public void setFinal(Column finalResult) {
+      // no-op
+    }
+
+    @Override
+    public void outputIntermediate(ColumnBuilder[] tsBlockBuilder) {
+      tsBlockBuilder[0].writeLong(0);
+    }
+
+    @Override
+    public void outputFinal(ColumnBuilder tsBlockBuilder) {
+      tsBlockBuilder.writeLong(0);
+    }
+
+    @Override
+    public void reset() {
+      // no-op
+    }
+
+    @Override
+    public boolean hasFinalResult() {
+      return false;
+    }
+
+    @Override
+    public TSDataType[] getIntermediateType() {
+      return new TSDataType[] {TSDataType.INT64};
+    }
+
+    @Override
+    public TSDataType getFinalType() {
+      return TSDataType.INT64;
+    }
+  }
+
+  private static class TestCommonOperatorContext extends CommonOperatorContext {
+    private TestCommonOperatorContext() {
+      super(0, new PlanNodeId("common"), "common");
+    }
+
+    @Override
+    public MemoryReservationManager getMemoryReservationContext() {
+      return null;
+    }
+
+    @Override
+    public int getFragmentId() {
+      return 0;
+    }
+
+    @Override
+    public int getPipelineId() {
+      return 0;
+    }
+
+    @Override
+    public long ramBytesUsed() {
+      return 0;
+    }
+  }
+}
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/schedule/queue/MultilevelPriorityQueueTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/schedule/queue/MultilevelPriorityQueueTest.java
index 0ee6877..42bb75a 100644
--- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/schedule/queue/MultilevelPriorityQueueTest.java
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/schedule/queue/MultilevelPriorityQueueTest.java
@@ -38,6 +38,9 @@
 import java.util.ArrayList;
 import java.util.List;
 import java.util.OptionalInt;
+import java.util.concurrent.TimeUnit;
+
+import static org.awaitility.Awaitility.await;
 
 public class MultilevelPriorityQueueTest {
   @Test
@@ -58,12 +61,14 @@
                 }
               });
       t1.start();
-      Thread.sleep(100);
-      Assert.assertEquals(Thread.State.WAITING, t1.getState());
+      await()
+          .atMost(1, TimeUnit.MINUTES)
+          .untilAsserted(() -> Assert.assertEquals(Thread.State.WAITING, t1.getState()));
       DriverTask e2 = mockDriverTask(mockDriverTaskId(), false);
       queue.push(e2);
-      Thread.sleep(100);
-      Assert.assertEquals(Thread.State.TERMINATED, t1.getState());
+      await()
+          .atMost(1, TimeUnit.MINUTES)
+          .untilAsserted(() -> Assert.assertEquals(Thread.State.TERMINATED, t1.getState()));
       Assert.assertEquals(1, res.size());
       Assert.assertEquals(e2.getDriverTaskId().toString(), res.get(0).getDriverTaskId().toString());
     } catch (Exception e) {
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/snapshot/IoTDBSnapshotTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/snapshot/IoTDBSnapshotTest.java
index d042d2a..2eedca1 100644
--- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/snapshot/IoTDBSnapshotTest.java
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/snapshot/IoTDBSnapshotTest.java
@@ -52,7 +52,9 @@
 import java.nio.file.Path;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 
 import static org.apache.iotdb.consensus.iot.IoTConsensusServerImpl.SNAPSHOT_DIR_NAME;
 import static org.apache.tsfile.common.constant.TsFileConstant.PATH_SEPARATOR;
@@ -259,6 +261,51 @@
   }
 
   /**
+   * When IoTConsensus spreads a tsfile and its exclusive mods across different receive folders, the
+   * loader must still relink them to the same data dir. Otherwise the mods file is not found next
+   * to the tsfile and deletion markers are silently ignored.
+   */
+  @Test
+  public void testLoadSnapshotKeepsTsFileAndModsOnSameDataDirWhenFragmentsAreSpread()
+      throws IOException, WriteProcessException {
+    String[][] originDataDirs = IoTDBDescriptor.getInstance().getConfig().getTierDataDirs();
+    IoTDBDescriptor.getInstance().getConfig().setTierDataDirs(testDataDirs);
+    TierManager.getInstance().resetFolders();
+    String recvBase0 = "target" + File.separator + "recv-snapshot-mods-0";
+    String recvBase1 = "target" + File.separator + "recv-snapshot-mods-1";
+    File recvFolder0 = new File(recvBase0, SNAPSHOT_DIR_NAME);
+    File recvFolder1 = new File(recvBase1, SNAPSHOT_DIR_NAME);
+    try {
+      Assert.assertTrue(recvFolder0.mkdirs());
+      Assert.assertTrue(recvFolder1.mkdirs());
+
+      writeSnapshotFragmentWithExclusiveModsSpread(recvFolder0.getAbsolutePath(), 0, recvFolder1);
+
+      DataRegion dataRegion =
+          new SnapshotLoader(
+                  Arrays.asList(recvFolder0.getAbsolutePath(), recvFolder1.getAbsolutePath()),
+                  testSgName,
+                  "0")
+              .loadSnapshotForStateMachine();
+
+      Assert.assertNotNull(dataRegion);
+      TsFileResource resource = dataRegion.getTsFileManager().getTsFileList(true).get(0);
+      File tsFile = resource.getTsFile();
+      File modsFile =
+          org.apache.iotdb.db.storageengine.dataregion.modification.ModificationFile
+              .getExclusiveMods(tsFile);
+      Assert.assertTrue(modsFile.exists());
+      Assert.assertEquals(
+          tsFile.getParentFile().getAbsolutePath(), modsFile.getParentFile().getAbsolutePath());
+    } finally {
+      FileUtils.recursivelyDeleteFolder(recvBase0);
+      FileUtils.recursivelyDeleteFolder(recvBase1);
+      IoTDBDescriptor.getInstance().getConfig().setTierDataDirs(originDataDirs);
+      TierManager.getInstance().resetFolders();
+    }
+  }
+
+  /**
    * The fragments of one snapshot are disjoint across the receive folders, so the order in which
    * the folders are relinked must not change the loaded data. This loads the same spread snapshot
    * with the receive folders presented in the opposite order and expects the identical result.
@@ -343,6 +390,41 @@
     resource.serialize();
   }
 
+  private void writeSnapshotFragmentWithExclusiveModsSpread(
+      String tsFileRecvSnapshotDir, int i, File modsRecvFolder)
+      throws IOException, WriteProcessException {
+    writeSnapshotFragment(tsFileRecvSnapshotDir, i);
+    String tsFileName = String.format("%d-%d-0-0.tsfile", i + 1, i + 1);
+    File tsFile =
+        new File(
+            tsFileRecvSnapshotDir
+                + File.separator
+                + "sequence"
+                + File.separator
+                + testSgName
+                + File.separator
+                + "0"
+                + File.separator
+                + "0"
+                + File.separator
+                + tsFileName);
+    File sourceMods =
+        org.apache.iotdb.db.storageengine.dataregion.modification.ModificationFile.getExclusiveMods(
+            tsFile);
+    Assert.assertTrue(sourceMods.exists() || sourceMods.createNewFile());
+
+    File targetModsDir =
+        new File(
+            modsRecvFolder,
+            "sequence" + File.separator + testSgName + File.separator + "0" + File.separator + "0");
+    Assert.assertTrue(targetModsDir.exists() || targetModsDir.mkdirs());
+    Files.copy(
+        sourceMods.toPath(),
+        new File(targetModsDir, sourceMods.getName()).toPath(),
+        java.nio.file.StandardCopyOption.REPLACE_EXISTING);
+    Files.delete(sourceMods.toPath());
+  }
+
   @Ignore("Need manual execution to specify different disks")
   @Test
   public void testLoadSnapshotNoHardLink()
@@ -511,12 +593,24 @@
 
     Method method =
         SnapshotLoader.class.getDeclaredMethod(
-            "createLinksFromSnapshotToSourceDir", String.class, File[].class, FolderManager.class);
+            "createLinksFromSnapshotToSourceDir",
+            String.class,
+            File[].class,
+            FolderManager.class,
+            Map.class,
+            boolean.class);
     method.setAccessible(true);
 
     SnapshotLoader loader = new SnapshotLoader("dummy", "root.testsg", "0");
 
-    method.invoke(loader, targetSuffix, files, folderManager);
+    // Tracks fileKey -> chosen data dir, so files sharing a fileKey land in the same dir.
+    Map<String, String> fileTarget = new HashMap<>();
+    method.invoke(loader, targetSuffix, files, folderManager, fileTarget, true);
+
+    // The shared fileKey must be recorded exactly once, pointing at one of the data dirs.
+    String fileKey = tsFile.getName().split("\\.")[0];
+    Assert.assertEquals(1, fileTarget.size());
+    Assert.assertTrue(Arrays.asList(dataDirs).contains(fileTarget.get(fileKey)));
 
     // verify: only ONE dir contains all three files
     int hitDirCount = 0;
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadFailedMessageHandlerTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadFailedMessageHandlerTest.java
new file mode 100644
index 0000000..67c909c
--- /dev/null
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadFailedMessageHandlerTest.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.storageengine.load.active;
+
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.rpc.TSStatusCode;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+public class ActiveLoadFailedMessageHandlerTest {
+
+  @Test
+  public void testTemporaryUnavailableStatusShouldRetryWithoutMessageMatch() {
+    final ActiveLoadPendingQueue.ActiveLoadEntry entry =
+        new ActiveLoadPendingQueue.ActiveLoadEntry("test.tsfile", "pending", true, false);
+
+    Assert.assertTrue(
+        ActiveLoadFailedMessageHandler.isStatusShouldRetry(
+            entry,
+            new TSStatus(TSStatusCode.LOAD_TEMPORARY_UNAVAILABLE_EXCEPTION.getStatusCode())
+                .setMessage("temporarily unavailable")));
+  }
+
+  @Test
+  public void testPermanentStatusShouldNotRetryWithoutMessageMatch() {
+    final ActiveLoadPendingQueue.ActiveLoadEntry entry =
+        new ActiveLoadPendingQueue.ActiveLoadEntry("test.tsfile", "pending", true, false);
+
+    Assert.assertFalse(
+        ActiveLoadFailedMessageHandler.isStatusShouldRetry(
+            entry, new TSStatus(TSStatusCode.LOAD_FILE_ERROR.getStatusCode()).setMessage("bad")));
+  }
+}
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadTsFileLoaderTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadTsFileLoaderTest.java
new file mode 100644
index 0000000..3208dc1
--- /dev/null
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadTsFileLoaderTest.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.storageengine.load.active;
+
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.cluster.NodeStatus;
+import org.apache.iotdb.commons.conf.CommonDescriptor;
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.storageengine.dataregion.modification.ModificationFile;
+import org.apache.iotdb.db.storageengine.dataregion.modification.v1.ModificationFileV1;
+import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
+import org.apache.iotdb.rpc.TSStatusCode;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.lang.reflect.Method;
+import java.nio.file.Files;
+
+public class ActiveLoadTsFileLoaderTest {
+
+  private final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
+  private File tempDir;
+  private String originalFailDir;
+  private NodeStatus originalNodeStatus;
+
+  @Before
+  public void setUp() throws Exception {
+    tempDir = Files.createTempDirectory("active-load-retry").toFile();
+    originalFailDir = config.getLoadActiveListeningFailDir();
+    originalNodeStatus = CommonDescriptor.getInstance().getConfig().getNodeStatus();
+    CommonDescriptor.getInstance().getConfig().setNodeStatus(NodeStatus.Running);
+    config.setLoadActiveListeningFailDir(new File(tempDir, "failed").getAbsolutePath());
+  }
+
+  @After
+  public void tearDown() {
+    config.setLoadActiveListeningFailDir(originalFailDir);
+    CommonDescriptor.getInstance().getConfig().setNodeStatus(originalNodeStatus);
+    deleteRecursively(tempDir);
+  }
+
+  @Test
+  public void testTemporaryUnavailableStatusDoesNotMoveFileToFailDir() throws Exception {
+    final ActiveLoadTsFileLoader loader = new ActiveLoadTsFileLoader();
+    final File tsFile = createTsFileWithCompanionFiles("retry.tsfile");
+    final ActiveLoadPendingQueue.ActiveLoadEntry entry =
+        new ActiveLoadPendingQueue.ActiveLoadEntry(
+            tsFile.getAbsolutePath(), tempDir.getAbsolutePath(), false, false);
+
+    invokeInitFailDirIfNecessary(loader);
+    invokeHandleLoadFailure(
+        loader,
+        entry,
+        new TSStatus(TSStatusCode.LOAD_TEMPORARY_UNAVAILABLE_EXCEPTION.getStatusCode())
+            .setMessage("load conversion is temporarily unavailable"));
+
+    Assert.assertTrue(tsFile.exists());
+    Assert.assertTrue(new File(tsFile.getAbsolutePath() + TsFileResource.RESOURCE_SUFFIX).exists());
+    Assert.assertTrue(new File(tsFile.getAbsolutePath() + ModificationFileV1.FILE_SUFFIX).exists());
+    Assert.assertTrue(new File(tsFile.getAbsolutePath() + ModificationFile.FILE_SUFFIX).exists());
+    Assert.assertFalse(new File(config.getLoadActiveListeningFailDir(), tsFile.getName()).exists());
+  }
+
+  @Test
+  public void testPermanentFailureStatusMovesFileToFailDir() throws Exception {
+    final ActiveLoadTsFileLoader loader = new ActiveLoadTsFileLoader();
+    final File tsFile = createTsFileWithCompanionFiles("failed.tsfile");
+    final ActiveLoadPendingQueue.ActiveLoadEntry entry =
+        new ActiveLoadPendingQueue.ActiveLoadEntry(
+            tsFile.getAbsolutePath(), tempDir.getAbsolutePath(), false, false);
+
+    invokeInitFailDirIfNecessary(loader);
+    invokeHandleLoadFailure(
+        loader,
+        entry,
+        new TSStatus(TSStatusCode.LOAD_FILE_ERROR.getStatusCode()).setMessage("permanent error"));
+
+    final File failDir = new File(config.getLoadActiveListeningFailDir());
+    Assert.assertFalse(tsFile.exists());
+    Assert.assertTrue(new File(failDir, tsFile.getName()).exists());
+    Assert.assertTrue(
+        new File(failDir, tsFile.getName() + TsFileResource.RESOURCE_SUFFIX).exists());
+    Assert.assertTrue(
+        new File(failDir, tsFile.getName() + ModificationFileV1.FILE_SUFFIX).exists());
+    Assert.assertTrue(new File(failDir, tsFile.getName() + ModificationFile.FILE_SUFFIX).exists());
+  }
+
+  private File createTsFileWithCompanionFiles(final String fileName) throws Exception {
+    final File tsFile = new File(tempDir, fileName);
+    Assert.assertTrue(tsFile.createNewFile());
+    Assert.assertTrue(
+        new File(tsFile.getAbsolutePath() + TsFileResource.RESOURCE_SUFFIX).createNewFile());
+    Assert.assertTrue(
+        new File(tsFile.getAbsolutePath() + ModificationFileV1.FILE_SUFFIX).createNewFile());
+    Assert.assertTrue(
+        new File(tsFile.getAbsolutePath() + ModificationFile.FILE_SUFFIX).createNewFile());
+    return tsFile;
+  }
+
+  private void invokeInitFailDirIfNecessary(final ActiveLoadTsFileLoader loader) throws Exception {
+    final Method method = ActiveLoadTsFileLoader.class.getDeclaredMethod("initFailDirIfNecessary");
+    method.setAccessible(true);
+    method.invoke(loader);
+  }
+
+  private void invokeHandleLoadFailure(
+      final ActiveLoadTsFileLoader loader,
+      final ActiveLoadPendingQueue.ActiveLoadEntry entry,
+      final TSStatus status)
+      throws Exception {
+    final Method method =
+        ActiveLoadTsFileLoader.class.getDeclaredMethod(
+            "handleLoadFailure", ActiveLoadPendingQueue.ActiveLoadEntry.class, TSStatus.class);
+    method.setAccessible(true);
+    method.invoke(loader, entry, status);
+  }
+
+  private static void deleteRecursively(final File file) {
+    if (file == null || !file.exists()) {
+      return;
+    }
+    final File[] children = file.listFiles();
+    if (children != null) {
+      for (final File child : children) {
+        deleteRecursively(child);
+      }
+    }
+    Assert.assertTrue(file.delete());
+  }
+}
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/load/converter/LoadTreeStatementDataTypeConvertExecutionVisitorTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/load/converter/LoadTreeStatementDataTypeConvertExecutionVisitorTest.java
index 32b15a1..29766f5 100644
--- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/load/converter/LoadTreeStatementDataTypeConvertExecutionVisitorTest.java
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/load/converter/LoadTreeStatementDataTypeConvertExecutionVisitorTest.java
@@ -21,6 +21,7 @@
 
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.conf.CommonDescriptor;
+import org.apache.iotdb.commons.exception.pipe.PipeRuntimeOutOfMemoryCriticalException;
 import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.commons.pipe.datastructure.pattern.IoTDBTreePattern;
 import org.apache.iotdb.db.pipe.event.common.tsfile.parser.scan.TsFileInsertionEventScanParser;
@@ -209,6 +210,24 @@
         pointCountByTimeseries.getOrDefault(ALIGNED_DEVICE + ".s12", 0) < ROW_COUNT_PER_DEVICE);
   }
 
+  @Test
+  public void testPipeOutOfMemoryIsTemporaryUnavailable() throws Exception {
+    tsFile = File.createTempFile("oom", ".tsfile");
+
+    final LoadTreeConvertedInsertTabletStatementExceptionVisitor visitor =
+        new LoadTreeConvertedInsertTabletStatementExceptionVisitor();
+    final TSStatus status =
+        visitor.visitLoadFile(
+            LoadTsFileStatement.createUnchecked(tsFile.getAbsolutePath()),
+            new IllegalStateException(
+                "wrapped memory pressure",
+                new PipeRuntimeOutOfMemoryCriticalException("pipe tablet memory is not enough")));
+
+    Assert.assertEquals(
+        TSStatusCode.LOAD_TEMPORARY_UNAVAILABLE_EXCEPTION.getStatusCode(), status.getCode());
+    Assert.assertNotEquals(TSStatusCode.LOAD_FILE_ERROR.getStatusCode(), status.getCode());
+  }
+
   private void writeTsFile(final File file) throws Exception {
     if (file.exists()) {
       Assert.assertTrue(file.delete());
diff --git a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template
index 1e808d7..facf07f 100644
--- a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template
+++ b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template
@@ -1723,6 +1723,13 @@
 # Datatype: long
 region_migration_speed_limit_bytes_per_second = 50331648
 
+# The minimum interval (in ms) between two per-file progress logs while transmitting a snapshot in
+# IoTConsensus. A snapshot may contain a huge number of files, so logging one line per file is
+# costly; this throttles it to at most once per interval. A value <= 0 logs every file.
+# effectiveMode: hot_reload
+# Datatype: long
+data_region_iot_snapshot_transmission_progress_log_interval_ms = 5000
+
 # When loading snapshot, try keeping TsFiles in the same disk as the snapshot dir.
 # This may reduce file copies but may also result in a worse disk load-balance
 # effectiveMode: hot_reload
diff --git a/iotdb-core/node-commons/src/main/i18n/en/org/apache/iotdb/commons/i18n/UtilMessages.java b/iotdb-core/node-commons/src/main/i18n/en/org/apache/iotdb/commons/i18n/UtilMessages.java
index b13ad21..395754d 100644
--- a/iotdb-core/node-commons/src/main/i18n/en/org/apache/iotdb/commons/i18n/UtilMessages.java
+++ b/iotdb-core/node-commons/src/main/i18n/en/org/apache/iotdb/commons/i18n/UtilMessages.java
@@ -119,6 +119,8 @@
       "Disk space is insufficient, change system mode to read-only.";
   public static final String CANNOT_CALCULATE_OCCUPIED_SPACE =
       "Cannot calculate occupied space of folder {}";
+  public static final String UNRECOGNIZED_MULTI_DIR_STRATEGY =
+      "Unrecognized multi-dir strategy '{}', falling back to {}.";
 
   // ======================== NodeUrlUtils ========================
 
diff --git a/iotdb-core/node-commons/src/main/i18n/zh/org/apache/iotdb/commons/i18n/UtilMessages.java b/iotdb-core/node-commons/src/main/i18n/zh/org/apache/iotdb/commons/i18n/UtilMessages.java
index f4215f6..1a1b97e 100644
--- a/iotdb-core/node-commons/src/main/i18n/zh/org/apache/iotdb/commons/i18n/UtilMessages.java
+++ b/iotdb-core/node-commons/src/main/i18n/zh/org/apache/iotdb/commons/i18n/UtilMessages.java
@@ -117,6 +117,8 @@
       "磁盘空间不足,系统切换为只读模式。";
   public static final String CANNOT_CALCULATE_OCCUPIED_SPACE =
       "无法计算文件夹 {} 的已占用空间";
+  public static final String UNRECOGNIZED_MULTI_DIR_STRATEGY =
+      "无法识别的多目录策略 '{}',回退为 {}。";
 
   // ======================== NodeUrlUtils ========================
 
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/disk/strategy/DirectoryStrategyType.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/disk/strategy/DirectoryStrategyType.java
index 2d081dd..853edf8 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/disk/strategy/DirectoryStrategyType.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/disk/strategy/DirectoryStrategyType.java
@@ -18,9 +18,39 @@
  */
 package org.apache.iotdb.commons.disk.strategy;
 
+import org.apache.iotdb.commons.i18n.UtilMessages;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 public enum DirectoryStrategyType {
   SEQUENCE_STRATEGY,
   MAX_DISK_USABLE_SPACE_FIRST_STRATEGY,
   MIN_FOLDER_OCCUPIED_SPACE_FIRST_STRATEGY,
-  RANDOM_ON_DISK_USABLE_SPACE_STRATEGY,
+  RANDOM_ON_DISK_USABLE_SPACE_STRATEGY;
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(DirectoryStrategyType.class);
+
+  /**
+   * Resolves the strategy type from a multi-dir strategy class name as configured by {@code
+   * dn_multi_dir_strategy}. Accepts either a simple class name (e.g. {@code SequenceStrategy}) or a
+   * fully-qualified one. Returns {@link #SEQUENCE_STRATEGY} for a null or unrecognized value, which
+   * matches the configured default.
+   */
+  public static DirectoryStrategyType fromClassName(String className) {
+    if (className != null) {
+      String simpleName = className.substring(className.lastIndexOf('.') + 1);
+      if (simpleName.equals(MaxDiskUsableSpaceFirstStrategy.class.getSimpleName())) {
+        return MAX_DISK_USABLE_SPACE_FIRST_STRATEGY;
+      } else if (simpleName.equals(MinFolderOccupiedSpaceFirstStrategy.class.getSimpleName())) {
+        return MIN_FOLDER_OCCUPIED_SPACE_FIRST_STRATEGY;
+      } else if (simpleName.equals(RandomOnDiskUsableSpaceStrategy.class.getSimpleName())) {
+        return RANDOM_ON_DISK_USABLE_SPACE_STRATEGY;
+      } else if (simpleName.equals(SequenceStrategy.class.getSimpleName())) {
+        return SEQUENCE_STRATEGY;
+      }
+      LOGGER.warn(UtilMessages.UNRECOGNIZED_MULTI_DIR_STRATEGY, className, SEQUENCE_STRATEGY);
+    }
+    return SEQUENCE_STRATEGY;
+  }
 }
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/disk/strategy/MinFolderOccupiedSpaceFirstStrategy.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/disk/strategy/MinFolderOccupiedSpaceFirstStrategy.java
index 2b8ffda..89d63a4 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/disk/strategy/MinFolderOccupiedSpaceFirstStrategy.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/disk/strategy/MinFolderOccupiedSpaceFirstStrategy.java
@@ -25,6 +25,7 @@
 import org.apache.iotdb.commons.utils.TestOnly;
 
 import java.io.IOException;
+import java.io.UncheckedIOException;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.AtomicLong;
@@ -126,7 +127,7 @@
       try {
         cachedOccupiedSpace[i] = JVMCommonUtils.getOccupiedSpace(folder);
         resetCannotCalculateOccupiedSpaceLogTime(folder);
-      } catch (IOException e) {
+      } catch (IOException | UncheckedIOException e) {
         logCannotCalculateOccupiedSpaceIfNecessary(folder, e);
         cachedOccupiedSpace[i] = Long.MAX_VALUE;
       }
@@ -135,7 +136,7 @@
     lastRefreshTimeMs = System.currentTimeMillis();
   }
 
-  private static void logCannotCalculateOccupiedSpaceIfNecessary(String folder, IOException e) {
+  private static void logCannotCalculateOccupiedSpaceIfNecessary(String folder, Exception e) {
     AtomicLong lastLogTime =
         CANNOT_CALCULATE_OCCUPIED_SPACE_LAST_LOG_TIME_MAP.computeIfAbsent(
             folder, key -> new AtomicLong(0L));
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeSourceConstant.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeSourceConstant.java
index b6e9381..e755ef3 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeSourceConstant.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeSourceConstant.java
@@ -58,6 +58,8 @@
   public static final String SOURCE_PATTERN_INCLUSION_KEY = "source.pattern.inclusion";
   public static final String EXTRACTOR_PATH_KEY = "extractor.path";
   public static final String SOURCE_PATH_KEY = "source.path";
+  public static final String EXTRACTOR_PATH_INCLUSION_KEY = "extractor.path.inclusion";
+  public static final String SOURCE_PATH_INCLUSION_KEY = "source.path.inclusion";
   public static final String EXTRACTOR_PATTERN_FORMAT_KEY = "extractor.pattern.format";
   public static final String SOURCE_PATTERN_FORMAT_KEY = "source.pattern.format";
   public static final String EXTRACTOR_PATTERN_FORMAT_PREFIX_VALUE = "prefix";
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/TreePattern.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/TreePattern.java
index 48c4c64..2dfbc7b 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/TreePattern.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/TreePattern.java
@@ -44,6 +44,7 @@
 import java.util.stream.Collectors;
 
 import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_PATH_EXCLUSION_KEY;
+import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_PATH_INCLUSION_KEY;
 import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_PATH_KEY;
 import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_PATTERN_EXCLUSION_KEY;
 import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_PATTERN_FORMAT_IOTDB_VALUE;
@@ -52,6 +53,7 @@
 import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_PATTERN_INCLUSION_KEY;
 import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_PATTERN_KEY;
 import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.SOURCE_PATH_EXCLUSION_KEY;
+import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.SOURCE_PATH_INCLUSION_KEY;
 import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.SOURCE_PATH_KEY;
 import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.SOURCE_PATTERN_EXCLUSION_KEY;
 import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.SOURCE_PATTERN_FORMAT_KEY;
@@ -164,49 +166,9 @@
     final boolean isTreeModelDataAllowedToBeCaptured =
         isTreeModelDataAllowToBeCaptured(sourceParameters);
 
-    final boolean hasPatternInclusionKey =
-        sourceParameters.hasAnyAttributes(
-            EXTRACTOR_PATTERN_INCLUSION_KEY, SOURCE_PATTERN_INCLUSION_KEY);
-    final boolean hasPatternExclusionKey =
-        sourceParameters.hasAnyAttributes(
-            EXTRACTOR_PATTERN_EXCLUSION_KEY, SOURCE_PATTERN_EXCLUSION_KEY);
-    final boolean hasLegacyPathKey =
-        sourceParameters.hasAnyAttributes(EXTRACTOR_PATH_KEY, SOURCE_PATH_KEY);
-    final boolean hasLegacyPatternKey =
-        sourceParameters.hasAnyAttributes(EXTRACTOR_PATTERN_KEY, SOURCE_PATTERN_KEY);
-    final boolean usePatternSyntax =
-        hasPatternInclusionKey
-            || (hasPatternExclusionKey && !hasLegacyPathKey && !hasLegacyPatternKey);
-
-    if (hasPatternInclusionKey && (hasLegacyPathKey || hasLegacyPatternKey)) {
-      final String msg =
-          String.format(
-              PipeMessages.PATTERN_INCLUSION_CANNOT_BE_USED_WITH_PATTERN_OR_PATH,
-              SOURCE_PATTERN_INCLUSION_KEY,
-              SOURCE_PATTERN_KEY,
-              SOURCE_PATH_KEY);
-      LOGGER.warn(msg);
-      throw new PipeException(msg);
-    }
-
     // 1. Parse INCLUSION patterns into a list
     List<TreePattern> inclusionPatterns =
-        usePatternSyntax
-            ? parseIoTDBPatternList(
-                sourceParameters.getStringByKeys(
-                    EXTRACTOR_PATTERN_INCLUSION_KEY, SOURCE_PATTERN_INCLUSION_KEY),
-                isTreeModelDataAllowedToBeCaptured,
-                true,
-                SOURCE_PATTERN_INCLUSION_KEY)
-            : parseLegacyPatternList(
-                sourceParameters,
-                isTreeModelDataAllowedToBeCaptured,
-                EXTRACTOR_PATH_KEY,
-                SOURCE_PATH_KEY,
-                EXTRACTOR_PATTERN_KEY,
-                SOURCE_PATTERN_KEY,
-                SOURCE_PATH_KEY,
-                SOURCE_PATTERN_KEY);
+        parseInclusionPatternList(sourceParameters, isTreeModelDataAllowedToBeCaptured);
 
     // If no inclusion patterns are specified, use default "root.**"
     if (inclusionPatterns.isEmpty()) {
@@ -217,35 +179,8 @@
     }
 
     // 2. Parse EXCLUSION patterns into a list
-    if (usePatternSyntax
-        && sourceParameters.hasAnyAttributes(
-            EXTRACTOR_PATH_EXCLUSION_KEY, SOURCE_PATH_EXCLUSION_KEY)) {
-      final String msg =
-          String.format(
-              PipeMessages.PATTERN_INCLUSION_CANNOT_BE_USED_WITH_PATH_EXCLUSION,
-              SOURCE_PATTERN_INCLUSION_KEY,
-              SOURCE_PATH_EXCLUSION_KEY);
-      LOGGER.warn(msg);
-      throw new PipeException(msg);
-    }
-
     List<TreePattern> exclusionPatterns =
-        usePatternSyntax
-            ? parseIoTDBPatternList(
-                sourceParameters.getStringByKeys(
-                    EXTRACTOR_PATTERN_EXCLUSION_KEY, SOURCE_PATTERN_EXCLUSION_KEY),
-                isTreeModelDataAllowedToBeCaptured,
-                true,
-                SOURCE_PATTERN_EXCLUSION_KEY)
-            : parseLegacyPatternList(
-                sourceParameters,
-                isTreeModelDataAllowedToBeCaptured,
-                EXTRACTOR_PATH_EXCLUSION_KEY,
-                SOURCE_PATH_EXCLUSION_KEY,
-                EXTRACTOR_PATTERN_EXCLUSION_KEY,
-                SOURCE_PATTERN_EXCLUSION_KEY,
-                SOURCE_PATH_EXCLUSION_KEY,
-                SOURCE_PATTERN_EXCLUSION_KEY);
+        parseExclusionPatternList(sourceParameters, isTreeModelDataAllowedToBeCaptured);
 
     // 3. Optimize the lists: remove redundant patterns (e.g., if "root.**" exists, "root.db" is
     // redundant)
@@ -264,6 +199,8 @@
               sourceParameters.getStringByKeys(
                   EXTRACTOR_PATTERN_INCLUSION_KEY,
                   SOURCE_PATTERN_INCLUSION_KEY,
+                  EXTRACTOR_PATH_INCLUSION_KEY,
+                  SOURCE_PATH_INCLUSION_KEY,
                   EXTRACTOR_PATH_KEY,
                   SOURCE_PATH_KEY,
                   EXTRACTOR_PATTERN_KEY,
@@ -382,49 +319,91 @@
         parseMultiplePatterns(trimmedPattern, basePatternSupplier));
   }
 
-  /**
-   * Helper method to parse legacy pattern parameters into a list of patterns without creating the
-   * Union object immediately.
-   */
-  private static List<TreePattern> parseLegacyPatternList(
-      final PipeParameters sourceParameters,
-      final boolean isTreeModelDataAllowedToBeCaptured,
-      final String extractorPathKey,
-      final String sourcePathKey,
-      final String extractorPatternKey,
-      final String sourcePatternKey,
-      final String pathKeyName,
-      final String patternKeyName) {
-
-    final String path = sourceParameters.getStringByKeys(extractorPathKey, sourcePathKey);
-    final String pattern = sourceParameters.getStringByKeys(extractorPatternKey, sourcePatternKey);
-
-    if (path != null && pattern != null) {
-      final String msg =
-          String.format(
-              PipeMessages.PATH_AND_PATTERN_CANNOT_BE_USED_TOGETHER, pathKeyName, patternKeyName);
-      LOGGER.warn(msg);
-      throw new PipeException(msg);
-    }
-
+  private static List<TreePattern> parseInclusionPatternList(
+      final PipeParameters sourceParameters, final boolean isTreeModelDataAllowedToBeCaptured) {
     final List<TreePattern> result = new ArrayList<>();
 
-    if (path != null) {
-      result.addAll(
-          parseIoTDBPatternList(path, isTreeModelDataAllowedToBeCaptured, false, pathKeyName));
-    }
+    addPatternsFromPatternParameterIfPresent(
+        result,
+        sourceParameters,
+        isTreeModelDataAllowedToBeCaptured,
+        EXTRACTOR_PATTERN_KEY,
+        SOURCE_PATTERN_KEY,
+        SOURCE_PATTERN_KEY);
+    addIoTDBPatternsIfPresent(
+        result,
+        sourceParameters,
+        isTreeModelDataAllowedToBeCaptured,
+        EXTRACTOR_PATTERN_INCLUSION_KEY,
+        SOURCE_PATTERN_INCLUSION_KEY,
+        SOURCE_PATTERN_INCLUSION_KEY);
+    addIoTDBPatternsIfPresent(
+        result,
+        sourceParameters,
+        isTreeModelDataAllowedToBeCaptured,
+        EXTRACTOR_PATH_KEY,
+        SOURCE_PATH_KEY,
+        SOURCE_PATH_KEY);
+    addIoTDBPatternsIfPresent(
+        result,
+        sourceParameters,
+        isTreeModelDataAllowedToBeCaptured,
+        EXTRACTOR_PATH_INCLUSION_KEY,
+        SOURCE_PATH_INCLUSION_KEY,
+        SOURCE_PATH_INCLUSION_KEY);
 
+    return result;
+  }
+
+  private static List<TreePattern> parseExclusionPatternList(
+      final PipeParameters sourceParameters, final boolean isTreeModelDataAllowedToBeCaptured) {
+    final List<TreePattern> result = new ArrayList<>();
+
+    addIoTDBPatternsIfPresent(
+        result,
+        sourceParameters,
+        isTreeModelDataAllowedToBeCaptured,
+        EXTRACTOR_PATTERN_EXCLUSION_KEY,
+        SOURCE_PATTERN_EXCLUSION_KEY,
+        SOURCE_PATTERN_EXCLUSION_KEY);
+    addIoTDBPatternsIfPresent(
+        result,
+        sourceParameters,
+        isTreeModelDataAllowedToBeCaptured,
+        EXTRACTOR_PATH_EXCLUSION_KEY,
+        SOURCE_PATH_EXCLUSION_KEY,
+        SOURCE_PATH_EXCLUSION_KEY);
+
+    return result;
+  }
+
+  private static void addIoTDBPatternsIfPresent(
+      final List<TreePattern> result,
+      final PipeParameters sourceParameters,
+      final boolean isTreeModelDataAllowedToBeCaptured,
+      final String extractorKey,
+      final String sourceKey,
+      final String parameterKey) {
+    final String pattern = sourceParameters.getStringByKeys(extractorKey, sourceKey);
+    if (pattern != null) {
+      result.addAll(
+          parseIoTDBPatternList(pattern, isTreeModelDataAllowedToBeCaptured, true, parameterKey));
+    }
+  }
+
+  private static void addPatternsFromPatternParameterIfPresent(
+      final List<TreePattern> result,
+      final PipeParameters sourceParameters,
+      final boolean isTreeModelDataAllowedToBeCaptured,
+      final String extractorKey,
+      final String sourceKey,
+      final String parameterKey) {
+    final String pattern = sourceParameters.getStringByKeys(extractorKey, sourceKey);
     if (pattern != null) {
       result.addAll(
           parsePatternsFromPatternParameter(
-              pattern,
-              sourceParameters,
-              isTreeModelDataAllowedToBeCaptured,
-              false,
-              patternKeyName));
+              pattern, sourceParameters, isTreeModelDataAllowedToBeCaptured, true, parameterKey));
     }
-
-    return result;
   }
 
   private static List<TreePattern> parseIoTDBPatternList(
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/JVMCommonUtils.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/JVMCommonUtils.java
index 64279e7..029d270 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/JVMCommonUtils.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/JVMCommonUtils.java
@@ -136,8 +136,17 @@
 
   public static long getOccupiedSpace(String folderPath) throws IOException {
     Path folder = Paths.get(folderPath);
+    if (!Files.exists(folder)) {
+      return 0;
+    }
     try (Stream<Path> s = Files.walk(folder)) {
-      return s.filter(p -> p.toFile().isFile()).mapToLong(p -> p.toFile().length()).sum();
+      return s.filter(p -> p.toFile().isFile())
+          .mapToLong(
+              p -> {
+                File file = p.toFile();
+                return file.exists() ? file.length() : 0L;
+              })
+          .sum();
     }
   }
 
diff --git a/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/disk/strategy/DirectoryStrategyTypeTest.java b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/disk/strategy/DirectoryStrategyTypeTest.java
new file mode 100644
index 0000000..90e56cf
--- /dev/null
+++ b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/disk/strategy/DirectoryStrategyTypeTest.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.disk.strategy;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+public class DirectoryStrategyTypeTest {
+
+  @Test
+  public void fromSimpleClassName() {
+    Assert.assertEquals(
+        DirectoryStrategyType.SEQUENCE_STRATEGY,
+        DirectoryStrategyType.fromClassName("SequenceStrategy"));
+    Assert.assertEquals(
+        DirectoryStrategyType.MAX_DISK_USABLE_SPACE_FIRST_STRATEGY,
+        DirectoryStrategyType.fromClassName("MaxDiskUsableSpaceFirstStrategy"));
+    Assert.assertEquals(
+        DirectoryStrategyType.MIN_FOLDER_OCCUPIED_SPACE_FIRST_STRATEGY,
+        DirectoryStrategyType.fromClassName("MinFolderOccupiedSpaceFirstStrategy"));
+    Assert.assertEquals(
+        DirectoryStrategyType.RANDOM_ON_DISK_USABLE_SPACE_STRATEGY,
+        DirectoryStrategyType.fromClassName("RandomOnDiskUsableSpaceStrategy"));
+  }
+
+  @Test
+  public void fromFullyQualifiedClassName() {
+    Assert.assertEquals(
+        DirectoryStrategyType.SEQUENCE_STRATEGY,
+        DirectoryStrategyType.fromClassName(SequenceStrategy.class.getName()));
+    Assert.assertEquals(
+        DirectoryStrategyType.MIN_FOLDER_OCCUPIED_SPACE_FIRST_STRATEGY,
+        DirectoryStrategyType.fromClassName(MinFolderOccupiedSpaceFirstStrategy.class.getName()));
+  }
+
+  @Test
+  public void nullOrUnknownFallsBackToSequence() {
+    // The configured default (dn_multi_dir_strategy=SequenceStrategy) and any unrecognized value
+    // must resolve to SEQUENCE_STRATEGY.
+    Assert.assertEquals(
+        DirectoryStrategyType.SEQUENCE_STRATEGY, DirectoryStrategyType.fromClassName(null));
+    Assert.assertEquals(
+        DirectoryStrategyType.SEQUENCE_STRATEGY,
+        DirectoryStrategyType.fromClassName("NoSuchStrategy"));
+  }
+}
diff --git a/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/datastructure/pattern/TreePatternParseTest.java b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/datastructure/pattern/TreePatternParseTest.java
new file mode 100644
index 0000000..3ebcd5a
--- /dev/null
+++ b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/datastructure/pattern/TreePatternParseTest.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.pipe.datastructure.pattern;
+
+import org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant;
+import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.HashMap;
+
+public class TreePatternParseTest {
+
+  @Test
+  public void testPatternAndPatternInclusionPreserved() {
+    final PipeParameters params =
+        new PipeParameters(
+            new HashMap<String, String>() {
+              {
+                put(PipeSourceConstant.SOURCE_PATTERN_KEY, "root.sg.A");
+                put(PipeSourceConstant.SOURCE_PATTERN_INCLUSION_KEY, "root.sg.B");
+              }
+            });
+
+    final TreePattern result = TreePattern.parsePipePatternFromSourceParameters(params);
+
+    Assert.assertTrue(result instanceof UnionTreePattern);
+    Assert.assertEquals("root.sg.A,root.sg.B", result.getPattern());
+  }
+
+  @Test
+  public void testPathAndPathInclusionPreserved() {
+    final PipeParameters params =
+        new PipeParameters(
+            new HashMap<String, String>() {
+              {
+                put(PipeSourceConstant.SOURCE_PATH_KEY, "root.sg.d1");
+                put(PipeSourceConstant.SOURCE_PATH_INCLUSION_KEY, "root.sg.d2,root.sg.d3");
+              }
+            });
+
+    final TreePattern result = TreePattern.parsePipePatternFromSourceParameters(params);
+
+    Assert.assertTrue(result instanceof UnionIoTDBTreePattern);
+    Assert.assertEquals("root.sg.d1,root.sg.d2,root.sg.d3", result.getPattern());
+  }
+
+  @Test
+  public void testPathInclusionWithPathExclusionPreserved() {
+    final PipeParameters params =
+        new PipeParameters(
+            new HashMap<String, String>() {
+              {
+                put(PipeSourceConstant.SOURCE_PATH_INCLUSION_KEY, "root.sg.**");
+                put(PipeSourceConstant.SOURCE_PATH_EXCLUSION_KEY, "root.sg.d1,root.sg.d2");
+              }
+            });
+
+    final TreePattern result = TreePattern.parsePipePatternFromSourceParameters(params);
+
+    Assert.assertTrue(result instanceof WithExclusionIoTDBTreePattern);
+    Assert.assertEquals(
+        "INCLUSION(root.sg.**), EXCLUSION(root.sg.d1,root.sg.d2)", result.getPattern());
+  }
+}
diff --git a/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/utils/JVMCommonUtilsTest.java b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/utils/JVMCommonUtilsTest.java
index 2b61178..16e0a22 100644
--- a/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/utils/JVMCommonUtilsTest.java
+++ b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/utils/JVMCommonUtilsTest.java
@@ -26,15 +26,21 @@
 import ch.qos.logback.classic.spi.ILoggingEvent;
 import ch.qos.logback.core.read.ListAppender;
 import org.junit.Assert;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
 import org.slf4j.LoggerFactory;
 
+import java.io.File;
 import java.io.IOException;
+import java.nio.charset.StandardCharsets;
 import java.nio.file.Files;
 import java.nio.file.Path;
 
 public class JVMCommonUtilsTest {
 
+  @Rule public TemporaryFolder tempFolder = new TemporaryFolder();
+
   @Test
   public void getJdkVersionTest() {
     try {
@@ -52,6 +58,24 @@
   }
 
   @Test
+  public void getOccupiedSpaceMissingFolderReturnsZero() throws IOException {
+    File missing = new File(tempFolder.getRoot(), "does-not-exist");
+    Assert.assertFalse(missing.exists());
+    // A non-existent folder must be treated as empty rather than throwing NoSuchFileException.
+    Assert.assertEquals(0L, JVMCommonUtils.getOccupiedSpace(missing.getAbsolutePath()));
+  }
+
+  @Test
+  public void getOccupiedSpaceSumsFileSizes() throws IOException {
+    File dir = tempFolder.newFolder("data");
+    byte[] payload = "hello-iotdb".getBytes(StandardCharsets.UTF_8);
+    Files.write(new File(dir, "a.txt").toPath(), payload);
+    Files.write(new File(dir, "b.txt").toPath(), payload);
+    Assert.assertEquals(
+        2L * payload.length, JVMCommonUtils.getOccupiedSpace(dir.getAbsolutePath()));
+  }
+
+  @Test
   public void unexpectedDiskSpaceErrorsLoggedOnlyOnceWhileErrorPersists() {
     ch.qos.logback.classic.Logger logger =
         (ch.qos.logback.classic.Logger) LoggerFactory.getLogger(JVMCommonUtils.class);
diff --git a/pom.xml b/pom.xml
index 7561fa7..3e0aa63 100644
--- a/pom.xml
+++ b/pom.xml
@@ -785,6 +785,9 @@
                             <exclude>**/*.cvs</exclude>
                             <!-- licenses -->
                             <exclude>licenses/*</exclude>
+                            <!-- bundled third-party NOTICE / license texts for the C++ client package -->
+                            <exclude>**/package-metadata/third_party/NOTICE</exclude>
+                            <exclude>**/package-metadata/third_party/licenses/**</exclude>
                             <!-- only for Travis CI with WinOS-->
                             <exclude>hadoopbin</exclude>
                             <exclude>windowssystem32</exclude>