Merge origin/master into fix-log-throttling
diff --git a/.github/scripts/package-client-cpp-manylinux228.sh b/.github/scripts/package-client-cpp-manylinux228.sh index 773e835..6bfef0a 100755 --- a/.github/scripts/package-client-cpp-manylinux228.sh +++ b/.github/scripts/package-client-cpp-manylinux228.sh
@@ -71,9 +71,22 @@ cmake --version java -version +# manylinux_2_28 is AlmaLinux 8, whose system OpenSSL is 1.1.1 (EOL and not +# Apache-2.0 - must not be bundled/redistributed in an ASF convenience binary). +# Build OpenSSL 3.x from source instead (-Diotdb.openssl.from.source=ON), which +# keeps the glibc 2.28 baseline. OpenSSL 3.x's Configure needs perl plus a few +# modules (IPC::Cmd, Data::Dumper) that are not on the minimal image - install +# them even when perl itself is already present. +if command -v dnf >/dev/null 2>&1; then + dnf install -y perl perl-IPC-Cmd perl-Data-Dumper +else + yum install -y perl perl-IPC-Cmd perl-Data-Dumper +fi + cd "${GITHUB_WORKSPACE:?GITHUB_WORKSPACE is not set}" ./mvnw clean package -P with-cpp -pl iotdb-client/client-cpp -am -DskipTests \ -Dspotless.skip=true \ + -Diotdb.openssl.from.source=ON \ -Dclient.cpp.package.classifier="${PACKAGE_CLASSIFIER}" SO="iotdb-client/client-cpp/target/install/lib/libiotdb_session.so"
diff --git a/.github/workflows/client-cpp-package.yml b/.github/workflows/client-cpp-package.yml index 3c8c2b2..38eac3f 100644 --- a/.github/workflows/client-cpp-package.yml +++ b/.github/workflows/client-cpp-package.yml
@@ -309,10 +309,14 @@ shell: bash run: | set -euxo pipefail - brew install boost openssl llvm@17 bison + # Pin openssl@3 (Apache-2.0): the default 'openssl' formula will move to + # OpenSSL 4.0, which drops the legacy TLS-method APIs Thrift still uses. + brew install boost openssl@3 llvm@17 bison ln -sf "$(brew --prefix llvm@17)/bin/clang-format" "$(brew --prefix)/bin/clang-format" echo "$(brew --prefix bison)/bin" >> "$GITHUB_PATH" echo "$(brew --prefix llvm@17)/bin" >> "$GITHUB_PATH" + # Homebrew OpenSSL is keg-only, so point find_package(OpenSSL) at it. + echo "OPENSSL_ROOT_DIR=$(brew --prefix openssl@3)" >> "$GITHUB_ENV" clang-format --version bison --version - name: Cache Maven packages @@ -429,8 +433,16 @@ throw "Boost not found under C:\local after installing ${{ matrix.boost_choco }}" } echo $boostDir.FullName >> $env:GITHUB_PATH - choco install openssl -y --no-progress - $sslPath = (Get-ChildItem 'C:\Program Files\OpenSSL*' -Directory | Select-Object -First 1).FullName + # Use a pinned OpenSSL 3.x (Apache-2.0). 'choco install openssl' now + # installs OpenSSL 4.0, which removed the legacy TLS-method APIs that + # Apache Thrift's TSSLSocket still calls. The FireDaemon zip is a clean + # prebuilt OpenSSL 3.5.x that keeps them. + $sslZip = "$env:RUNNER_TEMP\openssl-3.5.3.zip" + $sslDir = "$env:RUNNER_TEMP\openssl-3" + curl.exe -L --fail --retry 3 -o $sslZip 'https://download.firedaemon.com/FireDaemon-OpenSSL/openssl-3.5.3.zip' + Expand-Archive -Path $sslZip -DestinationPath $sslDir -Force + $sslPath = (Get-ChildItem $sslDir -Recurse -Directory -Filter 'x64' | Select-Object -First 1).FullName + if (-not $sslPath) { throw "OpenSSL x64 dir not found under $sslDir" } echo "$sslPath\bin" >> $env:GITHUB_PATH echo "OPENSSL_ROOT_DIR=$sslPath" >> $env:GITHUB_ENV - name: Cache Maven packages
diff --git a/.github/workflows/multi-language-client.yml b/.github/workflows/multi-language-client.yml index 16c6d93..5437a65 100644 --- a/.github/workflows/multi-language-client.yml +++ b/.github/workflows/multi-language-client.yml
@@ -144,10 +144,13 @@ if: runner.os == 'macOS' shell: bash run: | - brew install boost openssl llvm@17 bison + # Pin openssl@3 (Apache-2.0); the default formula will move to OpenSSL 4.0. + brew install boost openssl@3 llvm@17 bison ln -sf "$(brew --prefix llvm@17)/bin/clang-format" "$(brew --prefix)/bin/clang-format" echo "$(brew --prefix bison)/bin" >> "$GITHUB_PATH" echo "$(brew --prefix llvm@17)/bin" >> "$GITHUB_PATH" + # Homebrew OpenSSL is keg-only, so point find_package(OpenSSL) at it. + echo "OPENSSL_ROOT_DIR=$(brew --prefix openssl@3)" >> "$GITHUB_ENV" clang-format --version bison --version sudo rm -rf /Applications/Xcode_14.3.1.app @@ -163,8 +166,14 @@ $boost_path = (Get-ChildItem -Path 'C:\local\' -Filter 'boost_*').FullName echo $boost_path >> $env:GITHUB_PATH - choco install openssl -y - $sslPath = (Get-ChildItem 'C:\Program Files\OpenSSL*' -Directory | Select-Object -First 1).FullName + # Pinned OpenSSL 3.x (Apache-2.0): 'choco install openssl' now installs + # OpenSSL 4.0, which removed the legacy TLS-method APIs Thrift uses. + $sslZip = "$env:RUNNER_TEMP\openssl-3.5.3.zip" + $sslDir = "$env:RUNNER_TEMP\openssl-3" + curl.exe -L --fail --retry 3 -o $sslZip 'https://download.firedaemon.com/FireDaemon-OpenSSL/openssl-3.5.3.zip' + Expand-Archive -Path $sslZip -DestinationPath $sslDir -Force + $sslPath = (Get-ChildItem $sslDir -Recurse -Directory -Filter 'x64' | Select-Object -First 1).FullName + if (-not $sslPath) { throw "OpenSSL x64 dir not found under $sslDir" } echo "$sslPath\bin" >> $env:GITHUB_PATH echo "OPENSSL_ROOT_DIR=$sslPath" >> $env:GITHUB_ENV choco install llvm --version=17.0.6 --force -y
diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppDataNodeConfig.java b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppDataNodeConfig.java index 5e41807..8399955 100644 --- a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppDataNodeConfig.java +++ b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppDataNodeConfig.java
@@ -90,6 +90,31 @@ } @Override + public DataNodeConfig setMaxAllocateMemoryRatioForLoad(double maxAllocateMemoryRatioForLoad) { + properties.setProperty( + "max_allocate_memory_ratio_for_load", String.valueOf(maxAllocateMemoryRatioForLoad)); + return this; + } + + @Override + public DataNodeConfig setLoadTsFileTabletConversionBatchMemorySizeInBytes( + long loadTsFileTabletConversionBatchMemorySizeInBytes) { + properties.setProperty( + "load_tsfile_tablet_conversion_batch_memory_size_in_bytes", + String.valueOf(loadTsFileTabletConversionBatchMemorySizeInBytes)); + return this; + } + + @Override + public DataNodeConfig setLoadActiveListeningCheckIntervalSeconds( + long loadActiveListeningCheckIntervalSeconds) { + properties.setProperty( + "load_active_listening_check_interval_seconds", + String.valueOf(loadActiveListeningCheckIntervalSeconds)); + return this; + } + + @Override public DataNodeConfig setCompactionScheduleInterval(long compactionScheduleInterval) { properties.setProperty( "compaction_schedule_interval_in_ms", String.valueOf(compactionScheduleInterval)); @@ -143,4 +168,16 @@ setProperty("query_cost_stat_window", String.valueOf(queryCostStatWindow)); return this; } + + @Override + public DataNodeConfig setDnDataDirs(String dnDataDirs) { + setProperty("dn_data_dirs", dnDataDirs); + return this; + } + + @Override + public DataNodeConfig setDnMultiDirStrategy(String multiDirStrategy) { + setProperty("dn_multi_dir_strategy", multiDirStrategy); + return this; + } }
diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/node/DataNodeWrapper.java b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/node/DataNodeWrapper.java index dac6cf3..d205044 100644 --- a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/node/DataNodeWrapper.java +++ b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/node/DataNodeWrapper.java
@@ -44,7 +44,6 @@ import static org.apache.iotdb.it.env.cluster.ClusterConstant.DEFAULT_DATA_NODE_PROPERTIES; import static org.apache.iotdb.it.env.cluster.ClusterConstant.DN_CONNECTION_TIMEOUT_MS; import static org.apache.iotdb.it.env.cluster.ClusterConstant.DN_CONSENSUS_DIR; -import static org.apache.iotdb.it.env.cluster.ClusterConstant.DN_DATA_DIRS; import static org.apache.iotdb.it.env.cluster.ClusterConstant.DN_DATA_REGION_CONSENSUS_PORT; import static org.apache.iotdb.it.env.cluster.ClusterConstant.DN_JOIN_CLUSTER_RETRY_INTERVAL_MS; import static org.apache.iotdb.it.env.cluster.ClusterConstant.DN_METRIC_INTERNAL_REPORTER_TYPE; @@ -125,7 +124,6 @@ immutableNodeProperties.setProperty(IoTDBConstant.DN_SEED_CONFIG_NODE, seedConfigNode); immutableNodeProperties.setProperty(DN_SYSTEM_DIR, MppBaseConfig.NULL_VALUE); - immutableNodeProperties.setProperty(DN_DATA_DIRS, MppBaseConfig.NULL_VALUE); immutableNodeProperties.setProperty(DN_CONSENSUS_DIR, MppBaseConfig.NULL_VALUE); immutableNodeProperties.setProperty(DN_WAL_DIRS, MppBaseConfig.NULL_VALUE); immutableNodeProperties.setProperty(DN_TRACING_DIR, MppBaseConfig.NULL_VALUE);
diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteDataNodeConfig.java b/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteDataNodeConfig.java index bba4c96..a76608e 100644 --- a/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteDataNodeConfig.java +++ b/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteDataNodeConfig.java
@@ -55,6 +55,23 @@ } @Override + public DataNodeConfig setMaxAllocateMemoryRatioForLoad(double maxAllocateMemoryRatioForLoad) { + return this; + } + + @Override + public DataNodeConfig setLoadTsFileTabletConversionBatchMemorySizeInBytes( + long loadTsFileTabletConversionBatchMemorySizeInBytes) { + return this; + } + + @Override + public DataNodeConfig setLoadActiveListeningCheckIntervalSeconds( + long loadActiveListeningCheckIntervalSeconds) { + return this; + } + + @Override public DataNodeConfig setCompactionScheduleInterval(long compactionScheduleInterval) { return this; } @@ -98,4 +115,14 @@ public DataNodeConfig setQueryCostStatWindow(int queryCostStatWindow) { return this; } + + @Override + public DataNodeConfig setDnDataDirs(String dnDataDirs) { + return this; + } + + @Override + public DataNodeConfig setDnMultiDirStrategy(String multiDirStrategy) { + return this; + } }
diff --git a/integration-test/src/main/java/org/apache/iotdb/itbase/env/DataNodeConfig.java b/integration-test/src/main/java/org/apache/iotdb/itbase/env/DataNodeConfig.java index d57015b..bc045c9 100644 --- a/integration-test/src/main/java/org/apache/iotdb/itbase/env/DataNodeConfig.java +++ b/integration-test/src/main/java/org/apache/iotdb/itbase/env/DataNodeConfig.java
@@ -36,6 +36,14 @@ DataNodeConfig setLoadTsFileAnalyzeSchemaMemorySizeInBytes( long loadTsFileAnalyzeSchemaMemorySizeInBytes); + DataNodeConfig setMaxAllocateMemoryRatioForLoad(double maxAllocateMemoryRatioForLoad); + + DataNodeConfig setLoadTsFileTabletConversionBatchMemorySizeInBytes( + long loadTsFileTabletConversionBatchMemorySizeInBytes); + + DataNodeConfig setLoadActiveListeningCheckIntervalSeconds( + long loadActiveListeningCheckIntervalSeconds); + DataNodeConfig setCompactionScheduleInterval(long compactionScheduleInterval); DataNodeConfig setEnableMQTTService(boolean enableMQTTService); @@ -53,4 +61,8 @@ DataNodeConfig setDataNodeMemoryProportion(String dataNodeMemoryProportion); DataNodeConfig setQueryCostStatWindow(int queryCostStatWindow); + + DataNodeConfig setDnDataDirs(String dnDataDirs); + + DataNodeConfig setDnMultiDirStrategy(String multiDirStrategy); }
diff --git a/integration-test/src/test/java/org/apache/iotdb/confignode/it/regionmigration/pass/daily/iotv1/IoTDBRegionMigrateWithDeletionMultiDataDirIT.java b/integration-test/src/test/java/org/apache/iotdb/confignode/it/regionmigration/pass/daily/iotv1/IoTDBRegionMigrateWithDeletionMultiDataDirIT.java new file mode 100644 index 0000000..8214bf6 --- /dev/null +++ b/integration-test/src/test/java/org/apache/iotdb/confignode/it/regionmigration/pass/daily/iotv1/IoTDBRegionMigrateWithDeletionMultiDataDirIT.java
@@ -0,0 +1,182 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.confignode.it.regionmigration.pass.daily.iotv1; + +import org.apache.iotdb.consensus.ConsensusFactory; +import org.apache.iotdb.it.env.EnvFactory; +import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper; +import org.apache.iotdb.it.framework.IoTDBTestRunner; +import org.apache.iotdb.itbase.category.ClusterIT; + +import org.apache.tsfile.utils.Pair; +import org.awaitility.Awaitility; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; + +import java.sql.Connection; +import java.sql.ResultSet; +import java.sql.Statement; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.TimeUnit; + +import static org.apache.iotdb.confignode.it.regionmigration.IoTDBRegionOperationReliabilityITFramework.getAllDataNodes; +import static org.apache.iotdb.confignode.it.regionmigration.IoTDBRegionOperationReliabilityITFramework.getDataRegionMapWithLeader; + +/** + * Tree-model coverage for IoTConsensus region migration over multiple data dirs: a 
deletion (mods) + * must survive the snapshot transfer to the migrated peer. With several data dirs the snapshot + * fragments of one TsFile can be received into different folders, so the receiver groups companion + * files and the loader relinks them into one data dir; if that breaks, the migrated replica loses + * the deletion. See the table-model twin {@link IoTDBRegionMigrateWithDeletionMultiDataDirTableIT}. + */ +@RunWith(IoTDBTestRunner.class) +@Category({ClusterIT.class}) +public class IoTDBRegionMigrateWithDeletionMultiDataDirIT { + + private static final String MULTI_DATA_DIRS = + "data/datanode/data/disk0,data/datanode/data/disk1,data/datanode/data/disk2"; + + @Before + public void setUp() throws Exception { + EnvFactory.getEnv() + .getConfig() + .getCommonConfig() + .setDataReplicationFactor(2) + .setDataRegionConsensusProtocolClass(ConsensusFactory.IOT_CONSENSUS); + EnvFactory.getEnv().getConfig().getDataNodeConfig().setDnDataDirs(MULTI_DATA_DIRS); + EnvFactory.getEnv().initClusterEnvironment(1, 3); + } + + @After + public void tearDown() throws Exception { + EnvFactory.getEnv().cleanClusterEnvironment(); + } + + @Test + public void testRegionMigratePreservesDeletionWithMultiDataDirs() throws Exception { + try (Connection connection = EnvFactory.getEnv().getConnection(); + Statement statement = connection.createStatement()) { + statement.execute("CREATE DATABASE root.db"); + statement.execute( + "INSERT INTO root.db.d1(timestamp, s1) VALUES (100, 100), (200, 200), (300, 300)"); + statement.execute("FLUSH"); + statement.execute("DELETE FROM root.db.d1.s1 WHERE time <= 200"); + statement.execute("FLUSH"); + + Map<Integer, Pair<Integer, Set<Integer>>> dataRegionMapWithLeader = + getDataRegionMapWithLeader(statement); + int dataRegionIdForTest = + dataRegionMapWithLeader.keySet().stream().max(Integer::compareTo).orElseThrow(); + assertDeletionVisibleOnAllReplicas(dataRegionIdForTest, 1); + + Pair<Integer, Set<Integer>> leaderAndNodes = 
dataRegionMapWithLeader.get(dataRegionIdForTest); + Set<Integer> allDataNodes = getAllDataNodes(statement); + int leaderId = leaderAndNodes.getLeft(); + int followerId = + leaderAndNodes.getRight().stream().filter(id -> id != leaderId).findFirst().orElseThrow(); + int destDataNodeId = + allDataNodes.stream() + .filter(id -> id != leaderId && id != followerId) + .findFirst() + .orElseThrow(); + + statement.execute( + String.format( + "migrate region %d from %d to %d", dataRegionIdForTest, leaderId, destDataNodeId)); + + final int finalDestDataNodeId = destDataNodeId; + Awaitility.await() + .atMost(10, TimeUnit.MINUTES) + .pollDelay(1, TimeUnit.SECONDS) + .pollInterval(2, TimeUnit.SECONDS) + .untilAsserted( + () -> { + try (ResultSet showRegions = statement.executeQuery("SHOW REGIONS")) { + boolean migrated = false; + while (showRegions.next()) { + if (showRegions.getInt("RegionId") == dataRegionIdForTest + && showRegions.getInt("DataNodeId") == finalDestDataNodeId) { + migrated = true; + break; + } + } + Assert.assertTrue(migrated); + } + }); + + assertDeletionVisibleOnAllReplicas(dataRegionIdForTest, 1); + } + } + + private void assertDeletionVisibleOnAllReplicas(int dataRegionId, int expectedCount) + throws Exception { + Set<Integer> replicaDataNodeIds; + try (Connection connection = EnvFactory.getEnv().getConnection(); + Statement statement = connection.createStatement()) { + replicaDataNodeIds = getReplicaDataNodeIds(statement, dataRegionId); + } + for (int dataNodeId : replicaDataNodeIds) { + DataNodeWrapper dataNodeWrapper = + EnvFactory.getEnv().dataNodeIdToWrapper(dataNodeId).orElseThrow(); + Awaitility.await() + .atMost(2, TimeUnit.MINUTES) + .pollDelay(500, TimeUnit.MILLISECONDS) + .pollInterval(1, TimeUnit.SECONDS) + .untilAsserted(() -> assertDeletionVisibleOnReplica(dataNodeWrapper, expectedCount)); + } + } + + private void assertDeletionVisibleOnReplica(DataNodeWrapper dataNodeWrapper, int expectedCount) + throws Exception { + try (Connection 
connection = EnvFactory.getEnv().getConnection(dataNodeWrapper); + Statement dataNodeStatement = connection.createStatement()) { + try (ResultSet countResultSet = + dataNodeStatement.executeQuery("SELECT COUNT(s1) FROM root.db.d1")) { + Assert.assertTrue(countResultSet.next()); + Assert.assertEquals(expectedCount, countResultSet.getLong(1)); + } + try (ResultSet deletedRangeResultSet = + dataNodeStatement.executeQuery("SELECT s1 FROM root.db.d1 WHERE time <= 200")) { + Assert.assertFalse(deletedRangeResultSet.next()); + } + } + } + + private Set<Integer> getReplicaDataNodeIds(Statement statement, int dataRegionId) + throws Exception { + Set<Integer> replicaDataNodeIds = new HashSet<>(); + try (ResultSet showRegions = statement.executeQuery("SHOW REGIONS")) { + while (showRegions.next()) { + if ("DataRegion".equals(showRegions.getString("Type")) + && showRegions.getInt("RegionId") == dataRegionId) { + replicaDataNodeIds.add(showRegions.getInt("DataNodeId")); + } + } + } + Assert.assertFalse(replicaDataNodeIds.isEmpty()); + return replicaDataNodeIds; + } +}
diff --git a/integration-test/src/test/java/org/apache/iotdb/confignode/it/regionmigration/pass/daily/iotv1/IoTDBRegionMigrateWithDeletionMultiDataDirTableIT.java b/integration-test/src/test/java/org/apache/iotdb/confignode/it/regionmigration/pass/daily/iotv1/IoTDBRegionMigrateWithDeletionMultiDataDirTableIT.java new file mode 100644 index 0000000..59d5a18 --- /dev/null +++ b/integration-test/src/test/java/org/apache/iotdb/confignode/it/regionmigration/pass/daily/iotv1/IoTDBRegionMigrateWithDeletionMultiDataDirTableIT.java
@@ -0,0 +1,185 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.confignode.it.regionmigration.pass.daily.iotv1; + +import org.apache.iotdb.consensus.ConsensusFactory; +import org.apache.iotdb.isession.SessionConfig; +import org.apache.iotdb.it.env.EnvFactory; +import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper; +import org.apache.iotdb.it.framework.IoTDBTestRunner; +import org.apache.iotdb.itbase.category.TableClusterIT; +import org.apache.iotdb.itbase.env.BaseEnv; + +import org.apache.tsfile.utils.Pair; +import org.awaitility.Awaitility; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; + +import java.sql.Connection; +import java.sql.ResultSet; +import java.sql.Statement; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.TimeUnit; + +import static org.apache.iotdb.confignode.it.regionmigration.IoTDBRegionOperationReliabilityITFramework.getAllDataNodes; +import static org.apache.iotdb.confignode.it.regionmigration.IoTDBRegionOperationReliabilityITFramework.getDataRegionMapWithLeader; 
+ +/** + * Table-model twin of {@link IoTDBRegionMigrateWithDeletionMultiDataDirIT}: a deletion (mods) must + * survive IoTConsensus region migration across multiple data dirs, asserted through the relational + * (table) SQL dialect so the table-model cluster CI covers the same snapshot mods-transfer path. + */ +@RunWith(IoTDBTestRunner.class) +@Category({TableClusterIT.class}) +public class IoTDBRegionMigrateWithDeletionMultiDataDirTableIT { + + private static final String MULTI_DATA_DIRS = + "data/datanode/data/disk0,data/datanode/data/disk1,data/datanode/data/disk2"; + + @Before + public void setUp() throws Exception { + EnvFactory.getEnv() + .getConfig() + .getCommonConfig() + .setDataReplicationFactor(2) + .setDataRegionConsensusProtocolClass(ConsensusFactory.IOT_CONSENSUS); + EnvFactory.getEnv().getConfig().getDataNodeConfig().setDnDataDirs(MULTI_DATA_DIRS); + EnvFactory.getEnv().initClusterEnvironment(1, 3); + } + + @After + public void tearDown() throws Exception { + EnvFactory.getEnv().cleanClusterEnvironment(); + } + + @Test + public void testRegionMigratePreservesDeletionWithMultiDataDirs() throws Exception { + try (Connection connection = EnvFactory.getEnv().getTableConnection(); + Statement statement = connection.createStatement()) { + statement.execute("CREATE DATABASE test"); + statement.execute("USE test"); + statement.execute("CREATE TABLE t1 (s1 INT64 FIELD)"); + statement.execute("INSERT INTO t1 (time, s1) VALUES (100, 100), (200, 200), (300, 300)"); + statement.execute("FLUSH"); + statement.execute("DELETE FROM t1 WHERE time <= 200"); + statement.execute("FLUSH"); + + Map<Integer, Pair<Integer, Set<Integer>>> dataRegionMapWithLeader = + getDataRegionMapWithLeader(statement); + int dataRegionIdForTest = + dataRegionMapWithLeader.keySet().stream().max(Integer::compareTo).orElseThrow(); + assertDeletionVisibleOnAllReplicas(statement, dataRegionIdForTest, 1); + + Pair<Integer, Set<Integer>> leaderAndNodes = 
dataRegionMapWithLeader.get(dataRegionIdForTest); + Set<Integer> allDataNodes = getAllDataNodes(statement); + int leaderId = leaderAndNodes.getLeft(); + int followerId = + leaderAndNodes.getRight().stream().filter(id -> id != leaderId).findFirst().orElseThrow(); + int destDataNodeId = + allDataNodes.stream() + .filter(id -> id != leaderId && id != followerId) + .findFirst() + .orElseThrow(); + + statement.execute( + String.format( + "migrate region %d from %d to %d", dataRegionIdForTest, leaderId, destDataNodeId)); + + final int finalDestDataNodeId = destDataNodeId; + Awaitility.await() + .atMost(10, TimeUnit.MINUTES) + .pollDelay(1, TimeUnit.SECONDS) + .pollInterval(2, TimeUnit.SECONDS) + .untilAsserted( + () -> { + try (ResultSet showRegions = statement.executeQuery("SHOW REGIONS")) { + boolean migrated = false; + while (showRegions.next()) { + if (showRegions.getInt("RegionId") == dataRegionIdForTest + && showRegions.getInt("DataNodeId") == finalDestDataNodeId) { + migrated = true; + break; + } + } + Assert.assertTrue(migrated); + } + }); + + assertDeletionVisibleOnAllReplicas(statement, dataRegionIdForTest, 1); + } + } + + private void assertDeletionVisibleOnAllReplicas( + Statement statement, int dataRegionId, int expectedCount) throws Exception { + Set<Integer> replicaDataNodeIds = getReplicaDataNodeIds(statement, dataRegionId); + for (int dataNodeId : replicaDataNodeIds) { + DataNodeWrapper dataNodeWrapper = + EnvFactory.getEnv().dataNodeIdToWrapper(dataNodeId).orElseThrow(); + Awaitility.await() + .atMost(2, TimeUnit.MINUTES) + .pollDelay(500, TimeUnit.MILLISECONDS) + .pollInterval(1, TimeUnit.SECONDS) + .untilAsserted(() -> assertDeletionVisibleOnReplica(dataNodeWrapper, expectedCount)); + } + } + + private void assertDeletionVisibleOnReplica(DataNodeWrapper dataNodeWrapper, int expectedCount) + throws Exception { + try (Connection connection = + EnvFactory.getEnv() + .getConnection( + dataNodeWrapper, + SessionConfig.DEFAULT_USER, + 
SessionConfig.DEFAULT_PASSWORD, + BaseEnv.TABLE_SQL_DIALECT); + Statement dataNodeStatement = connection.createStatement()) { + dataNodeStatement.execute("USE test"); + try (ResultSet countResultSet = dataNodeStatement.executeQuery("SELECT COUNT(s1) FROM t1")) { + Assert.assertTrue(countResultSet.next()); + Assert.assertEquals(expectedCount, countResultSet.getLong(1)); + } + try (ResultSet deletedRangeResultSet = + dataNodeStatement.executeQuery("SELECT s1 FROM t1 WHERE time <= 200")) { + Assert.assertFalse(deletedRangeResultSet.next()); + } + } + } + + private Set<Integer> getReplicaDataNodeIds(Statement statement, int dataRegionId) + throws Exception { + Set<Integer> replicaDataNodeIds = new HashSet<>(); + try (ResultSet showRegions = statement.executeQuery("SHOW REGIONS")) { + while (showRegions.next()) { + if ("DataRegion".equals(showRegions.getString("Type")) + && showRegions.getInt("RegionId") == dataRegionId) { + replicaDataNodeIds.add(showRegions.getInt("DataNodeId")); + } + } + } + Assert.assertFalse(replicaDataNodeIds.isEmpty()); + return replicaDataNodeIds; + } +}
diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBLoadTsFileActiveRetryIT.java b/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBLoadTsFileActiveRetryIT.java new file mode 100644 index 0000000..3827615 --- /dev/null +++ b/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBLoadTsFileActiveRetryIT.java
@@ -0,0 +1,226 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.it; + +import org.apache.iotdb.it.env.EnvFactory; +import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper; +import org.apache.iotdb.it.framework.IoTDBTestRunner; +import org.apache.iotdb.it.utils.TsFileGenerator; +import org.apache.iotdb.itbase.category.ClusterIT; +import org.apache.iotdb.itbase.category.LocalStandaloneIT; + +import org.apache.tsfile.enums.TSDataType; +import org.apache.tsfile.file.metadata.enums.TSEncoding; +import org.apache.tsfile.write.schema.MeasurementSchema; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; + +import java.io.File; +import java.nio.file.Files; +import java.sql.Connection; +import java.sql.Statement; +import java.util.Collections; +import java.util.concurrent.TimeUnit; + +@RunWith(IoTDBTestRunner.class) +@Category({LocalStandaloneIT.class, ClusterIT.class}) +public class IoTDBLoadTsFileActiveRetryIT { + + private static final String DATABASE = "root.sg.test_0"; + private static final String DEVICE = DATABASE + ".d_0"; + private static 
final String MEASUREMENT = "sensor_00"; + private static final long UNALLOCATABLE_TABLET_CONVERSION_BATCH_MEMORY_SIZE_IN_BYTES = + Long.MAX_VALUE / 4; + private static final MeasurementSchema TSFILE_SCHEMA = + new MeasurementSchema(MEASUREMENT, TSDataType.INT32, TSEncoding.RLE); + + private File tmpDir; + + @Before + public void setUp() throws Exception { + tmpDir = new File(Files.createTempDirectory("load-active-retry").toUri()); + EnvFactory.getEnv().getConfig().getCommonConfig().setPipeMemoryManagementEnabled(false); + EnvFactory.getEnv() + .getConfig() + .getDataNodeConfig() + .setMaxAllocateMemoryRatioForLoad(1.0) + .setLoadTsFileAnalyzeSchemaMemorySizeInBytes(1) + .setLoadTsFileTabletConversionBatchMemorySizeInBytes( + UNALLOCATABLE_TABLET_CONVERSION_BATCH_MEMORY_SIZE_IN_BYTES) + .setLoadActiveListeningCheckIntervalSeconds(1); + + EnvFactory.getEnv().initClusterEnvironment(); + } + + @After + public void tearDown() throws Exception { + try (final Connection connection = EnvFactory.getEnv().getConnection(); + final Statement statement = connection.createStatement()) { + statement.execute("delete database " + DATABASE); + } catch (final Exception ignored) { + // ignore cleanup failure + } finally { + EnvFactory.getEnv().cleanClusterEnvironment(); + deleteRecursively(tmpDir); + } + } + + @Test + public void testActiveLoadTemporaryUnavailableShouldKeepFileForRetry() throws Exception { + final DataNodeWrapper dataNodeWrapper = EnvFactory.getEnv().getDataNodeWrapper(0); + final File retryTsFile = new File(tmpDir, "1-0-0-0.tsfile"); + final File permanentFailureTsFile = new File(tmpDir, "2-0-0-0.tsfile"); + generateTsFile(retryTsFile); + generateTsFile(permanentFailureTsFile); + + try (final Connection connection = + EnvFactory.getEnv().getConnectionWithSpecifiedDataNode(dataNodeWrapper); + final Statement statement = connection.createStatement()) { + statement.execute("create database " + DATABASE); + statement.execute( + String.format( + "create timeseries %s.%s 
%s", DEVICE, MEASUREMENT, TSDataType.INT64.name())); + + statement.execute( + String.format( + "load \"%s\" with ('database-level'='3', 'async'='true', 'on-success'='none', " + + "'convert-on-type-mismatch'='true')", + retryTsFile.getAbsolutePath())); + statement.execute( + String.format( + "load \"%s\" with ('database-level'='3', 'async'='true', 'on-success'='none', " + + "'convert-on-type-mismatch'='false')", + permanentFailureTsFile.getAbsolutePath())); + + final File activeDir = getActiveLoadDir(dataNodeWrapper); + final File failDir = getActiveLoadFailDir(dataNodeWrapper); + final File activeTsFile = waitForFile(activeDir, retryTsFile.getName(), 30_000L); + + Assert.assertNotNull( + "Async load should copy tsfile into active load directory", activeTsFile); + + Assert.assertNotNull( + "Permanent active load failure should be moved to fail dir", + waitForFile(failDir, permanentFailureTsFile.getName(), TimeUnit.SECONDS.toMillis(60))); + + assertFileKeptForRetry( + activeDir, failDir, retryTsFile.getName(), TimeUnit.SECONDS.toMillis(12)); + } + } + + private void generateTsFile(final File tsFile) throws Exception { + try (final TsFileGenerator generator = new TsFileGenerator(tsFile)) { + generator.registerTimeseries(DEVICE, Collections.singletonList(TSFILE_SCHEMA)); + generator.generateData(DEVICE, 10, 1, false); + } + } + + private File getActiveLoadDir(final DataNodeWrapper dataNodeWrapper) { + return new File( + dataNodeWrapper.getNodePath() + + File.separator + + "ext" + + File.separator + + "load" + + File.separator + + "pending"); + } + + private File getActiveLoadFailDir(final DataNodeWrapper dataNodeWrapper) { + return new File( + dataNodeWrapper.getNodePath() + + File.separator + + "ext" + + File.separator + + "load" + + File.separator + + "failed"); + } + + private File waitForFile(final File root, final String fileName, final long timeoutMs) + throws InterruptedException { + final long deadline = System.currentTimeMillis() + timeoutMs; + while 
(System.currentTimeMillis() < deadline) { + final File file = findFile(root, fileName); + if (file != null) { + return file; + } + Thread.sleep(500L); + } + return null; + } + + private boolean containsFile(final File root, final String fileName) { + return findFile(root, fileName) != null; + } + + private void assertFileKeptForRetry( + final File activeDir, final File failDir, final String fileName, final long observationMs) + throws InterruptedException { + final long deadline = System.currentTimeMillis() + observationMs; + while (System.currentTimeMillis() < deadline) { + Assert.assertTrue( + "Temporary unavailable active load should keep tsfile for retry", + containsFile(activeDir, fileName)); + Assert.assertFalse( + "Temporary unavailable active load must not move tsfile to fail dir", + containsFile(failDir, fileName)); + Thread.sleep(500L); + } + } + + private File findFile(final File root, final String fileName) { + if (root == null || !root.exists()) { + return null; + } + if (root.isFile()) { + return root.getName().equals(fileName) ? root : null; + } + + final File[] children = root.listFiles(); + if (children == null) { + return null; + } + for (final File child : children) { + final File file = findFile(child, fileName); + if (file != null) { + return file; + } + } + return null; + } + + private void deleteRecursively(final File file) { + if (file == null || !file.exists()) { + return; + } + final File[] children = file.listFiles(); + if (children != null) { + for (final File child : children) { + deleteRecursively(child); + } + } + Assert.assertTrue(file.delete()); + } +}
diff --git a/iotdb-client/client-cpp/CMakeLists.txt b/iotdb-client/client-cpp/CMakeLists.txt index 749341d..ad357dd 100644 --- a/iotdb-client/client-cpp/CMakeLists.txt +++ b/iotdb-client/client-cpp/CMakeLists.txt
@@ -78,7 +78,7 @@ file(WRITE "${_iotdb_cxx11_abi_stamp}" "${_iotdb_cxx11_abi_stamp_value}") endif() -option(WITH_SSL "Build with OpenSSL support" OFF) +option(WITH_SSL "Build with OpenSSL support" ON) option(BUILD_TESTING "Build IT test executables" OFF) option(IOTDB_OFFLINE "Disable all network access during configure" OFF) set(IOTDB_SESSION_VERSION "0.0.0" @@ -97,7 +97,7 @@ endif() set(BOOST_VERSION "${_iotdb_default_boost_version}" CACHE STRING "Boost version used when downloading / unpacking (Thrift build only)") -set(THRIFT_VERSION "0.21.0" +set(THRIFT_VERSION "0.23.0" CACHE STRING "Apache Thrift version used when downloading / building") if(WIN32) @@ -120,6 +120,7 @@ include(FetchBuildTools) if(WITH_SSL) include(FetchOpenSSL) + include(InstallOpenSSLRuntime) endif() include(FetchThrift) include(GenerateThriftSources) @@ -144,6 +145,22 @@ SOVERSION "${IOTDB_SESSION_SOVERSION}") endif() +# When SSL is on we bundle the OpenSSL shared libraries next to libiotdb_session +# in the package lib/ directory. Give the library an $ORIGIN-relative runtime +# search path so the loader finds them without LD_LIBRARY_PATH / install_name +# tweaks, keeping the SDK self-contained. +if(WITH_SSL) + if(APPLE) + set_target_properties(iotdb_session PROPERTIES + BUILD_RPATH "@loader_path" + INSTALL_RPATH "@loader_path") + elseif(UNIX) + set_target_properties(iotdb_session PROPERTIES + BUILD_RPATH "$ORIGIN" + INSTALL_RPATH "$ORIGIN") + endif() +endif() + add_dependencies(iotdb_session iotdb_thrift_external iotdb_thrift_codegen) target_compile_definitions(iotdb_session PRIVATE THRIFT_STATIC_DEFINE IOTDB_BUILDING_SHARED) @@ -223,6 +240,12 @@ LIBRARY DESTINATION lib ARCHIVE DESTINATION lib) +# Ship the OpenSSL shared libraries we link against next to iotdb_session so the +# packaged SDK is self-contained on machines without a system OpenSSL. 
+if(WITH_SSL) + iotdb_install_openssl_runtime() +endif() + foreach(_hdr IN LISTS IOTDB_PUBLIC_HEADERS) install(FILES "${CMAKE_CURRENT_SOURCE_DIR}/src/include/${_hdr}" DESTINATION include)
diff --git a/iotdb-client/client-cpp/README.md b/iotdb-client/client-cpp/README.md index 2572fd8..a882937 100644 --- a/iotdb-client/client-cpp/README.md +++ b/iotdb-client/client-cpp/README.md
@@ -300,7 +300,7 @@ | ppc64le | `quay.io/pypa/manylinux_2_28_ppc64le` | | s390x | `quay.io/pypa/manylinux_2_28_s390x` | -Thrift **0.21.0** is compiled from source during the CMake configure step (see +Thrift **0.23.0** is compiled from source during the CMake configure step (see `cmake/FetchThrift.cmake`). Older releases that used pre-built `iotdb-tools-thrift` Maven artifacts and `-Diotdb-tools-thrift.version=...` for glibc/MSVC compatibility apply only to the **legacy** client-cpp build; @@ -378,13 +378,13 @@ | Option | Default | Purpose | |-----------------------|----------------------------------|----------------------------------------------------------------------------------------------------------| -| `WITH_SSL` | `OFF` | Link against OpenSSL. See *SSL* below. | +| `WITH_SSL` | `ON` | Link against OpenSSL and bundle its runtime libraries. See *SSL* below. | | `BUILD_TESTING` | `OFF` (Maven sets `ON` for verify) | Build Catch2 IT executables (Catch2 v2.13.7 header downloaded at configure time). | | `CATCH2_INCLUDE_DIR` | (unset) | Pre-downloaded Catch2 include dir (Maven sets this under `target/test/catch2`). | | `IOTDB_OFFLINE` | `OFF` | Disallow any network access during configure. | | `IOTDB_DEPS_DIR` | `<client-cpp>/third-party` | Override the local tarball cache directory. | | `BOOST_VERSION` | `1.60.0` (`1.84.0` on macOS) | Boost version that CMake will look for / download. | -| `THRIFT_VERSION` | `0.21.0` | Apache Thrift version to build from source. | +| `THRIFT_VERSION` | `0.23.0` | Apache Thrift version to build from source. | | `BOOST_ROOT` | (unset) | Existing Boost install to reuse, equivalent to `-Dboost.include.dir=...` from the legacy build. | | `OPENSSL_ROOT_DIR` | (unset) | Existing OpenSSL install when `WITH_SSL=ON`. | | `CMAKE_INSTALL_PREFIX`| `<build>/install` | Install location. 
| @@ -427,12 +427,12 @@ | Platform | Required files | |------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------| - | `linux/` | `thrift-0.21.0.tar.gz`, `boost_1_60_0.tar.gz`, `m4-1.4.19.tar.gz`, `flex-2.6.4.tar.gz`, `bison-3.8.tar.gz` (and `openssl-3.5.0.tar.gz` when `WITH_SSL=ON`) | - | `mac/` | `thrift-0.21.0.tar.gz`, `boost_1_84_0.tar.gz` (newer Boost for Xcode/Clang; Apple ships m4/flex/bison; `openssl-3.5.0.tar.gz` optional) | - | `windows/` | `thrift-0.21.0.tar.gz`, `boost_1_60_0.tar.gz` (Boost headers only - no `b2` build required for `iotdb_session`) | + | `linux/` | `thrift-0.23.0.tar.gz`, `boost_1_60_0.tar.gz`, `m4-1.4.19.tar.gz`, `flex-2.6.4.tar.gz`, `bison-3.8.tar.gz` (and `openssl-3.5.0.tar.gz` only when `WITH_SSL=ON` and no system OpenSSL is present) | + | `mac/` | `thrift-0.23.0.tar.gz`, `boost_1_84_0.tar.gz` (newer Boost for Xcode/Clang; Apple ships m4/flex/bison; `openssl-3.5.0.tar.gz` optional) | + | `windows/` | `thrift-0.23.0.tar.gz`, `boost_1_60_0.tar.gz` (Boost headers only - no `b2` build required for `iotdb_session`) | Reference URLs (the configure step uses the same): - - Apache Thrift 0.21.0: <https://archive.apache.org/dist/thrift/0.21.0/thrift-0.21.0.tar.gz> + - Apache Thrift 0.23.0: <https://archive.apache.org/dist/thrift/0.23.0/thrift-0.23.0.tar.gz> - Boost 1.60.0: <https://archives.boost.io/release/1.60.0/source/boost_1_60_0.tar.gz> - GNU m4 1.4.19: <https://ftp.gnu.org/gnu/m4/m4-1.4.19.tar.gz> - GNU flex 2.6.4: <https://github.com/westes/flex/releases/download/v2.6.4/flex-2.6.4.tar.gz> @@ -461,7 +461,7 @@ ### Linux - Tested with GCC 7+ and Clang 9+. Anything that can compile Apache Thrift - 0.21.0 works. + 0.23.0 works. - Build deps that must already exist on the host (only required when CMake auto-builds m4/flex/bison from tarball): `make`, `autoconf`, `gcc`, plus the standard C/C++ toolchain. 
`sudo` is **not** required; @@ -492,9 +492,11 @@ 2. **flex / bison.** Install <https://sourceforge.net/projects/winflexbison/> and rename `win_flex.exe`→`flex.exe`, `win_bison.exe`→`bison.exe` on `PATH`. -3. **OpenSSL** *(only when `WITH_SSL=ON`)*: run the Win64 OpenSSL - installer from <https://slproweb.com/products/Win32OpenSSL.html>, then - pass `-DOPENSSL_ROOT_DIR=...` to CMake. +3. **OpenSSL** *(`WITH_SSL=ON` is the default)*: install OpenSSL **3.x** (not + 4.0, which `choco install openssl` now installs) — e.g. a Win64 3.x installer from + <https://slproweb.com/products/Win32OpenSSL.html> — then pass + `-DOPENSSL_ROOT_DIR=...` to CMake if it is not auto-detected. Pass + `-DWITH_SSL=OFF` to build without SSL. On Windows the SDK ships as **`iotdb_session.dll`** plus an import library **`iotdb_session.lib`**, built with **`/MD`** (dynamic CRT, same as a @@ -507,16 +509,27 @@ ## SSL -Both Thrift and `iotdb_session` build without OpenSSL by default. Enable -SSL with `-Dwith.ssl=ON` (Maven) or `-DWITH_SSL=ON` (standalone CMake). -CMake first calls `find_package(OpenSSL)`; -if nothing is found, it falls back to: +`iotdb_session` builds **with OpenSSL by default** (`WITH_SSL=ON`). Disable +it with `-Dwith.ssl=OFF` (Maven) or `-DWITH_SSL=OFF` (standalone CMake). -- **Linux / macOS** – use a local `openssl-<ver>.tar.gz` (or download it - when not in offline mode), configure with `no-shared`, install into - `build/_deps/openssl/install`, and link statically. -- **Windows** – fail with a friendly message that points at the Win64 - OpenSSL installer. Building OpenSSL from source via MSVC is out of scope. +OpenSSL **3.x** is used (Apache-2.0 licensed). Note that **OpenSSL 4.0 removed** +the legacy TLS-method APIs (`TLSv1_method`, `SSLv3_method`, …) that Apache +Thrift's `TSSLSocket` still calls, so install/point at a 3.x build, not 4.0. + +CMake calls `find_package(OpenSSL)` and uses the system OpenSSL it finds. 
Its +shared libraries are **bundled into the package `lib/` directory** (next to +`iotdb_session`, which records an `$ORIGIN`/`@loader_path` runtime path) so the +published SDK is self-contained. + +Fallbacks: + +- **Linux / macOS** – when no system OpenSSL is found (or + `-DIOTDB_OPENSSL_FROM_SOURCE=ON`, which the Linux packaging build uses so the + AlmaLinux 8 baseline's OpenSSL 1.1.1 is never redistributed), build + `openssl-3.5.0.tar.gz` from source as **shared** libraries and bundle them. +- **Windows** – fail with a friendly message; install a prebuilt OpenSSL 3.x + (e.g. the FireDaemon or slproweb 3.5.x zip) and set `-DOPENSSL_ROOT_DIR=...`. + Building OpenSSL from source via MSVC is out of scope. ## Tests
diff --git a/iotdb-client/client-cpp/README_zh.md b/iotdb-client/client-cpp/README_zh.md index 5f12c71..7c4326d 100644 --- a/iotdb-client/client-cpp/README_zh.md +++ b/iotdb-client/client-cpp/README_zh.md
@@ -236,14 +236,18 @@ | CMake 变量 | Maven 属性 | |------------|------------| -| `WITH_SSL` | `with.ssl`,例如 `-Dwith.ssl=ON` | +| `WITH_SSL` | `with.ssl`(默认 `ON`,关闭用 `-Dwith.ssl=OFF`) | | `IOTDB_OFFLINE` | `iotdb.offline` | | `BUILD_TESTING` | `build.tests` | | `IOTDB_DEPS_DIR` | `iotdb.deps.dir` | | `BOOST_INCLUDEDIR` | `boost.include.dir` | | `CMAKE_BUILD_TYPE` | `cmake.build.type`,例如 `-Dcmake.build.type=Debug` | -直接使用 CMake 时传入 `-DWITH_SSL=ON`、`-DIOTDB_OFFLINE=ON` 等即可。 +SSL 默认开启(`WITH_SSL=ON`)。所捆绑的 Apache Thrift 0.23 同时支持 OpenSSL 1.x +与 3.x,但不支持 OpenSSL 4.0(其移除了 Thrift 仍在调用的旧版 TLS-method API)。CMake 通过 `find_package(OpenSSL)` +解析系统 OpenSSL,找不到时(仅 Linux/macOS)回退到从源码构建 OpenSSL 3.5.0;并会把所用的 OpenSSL +动态库一并复制到产物 `lib/` 目录。Windows 请安装预编译的 OpenSSL 3.x(`choco install openssl` 现已安装 4.0,不适用)。 +直接使用 CMake 时传入 `-DWITH_SSL=OFF`、`-DIOTDB_OFFLINE=ON` 等即可。 Debug 构建请在配置阶段传入 `-DCMAKE_BUILD_TYPE=Debug`。Windows 使用 Visual Studio 生成器时也需要传入该选项,以便内置 Thrift 静态库使用 Debug MSVC 运行时; 随后用 `cmake --build build --config Debug --target install` 构建安装。
diff --git a/iotdb-client/client-cpp/cmake/FetchBuildTools.cmake b/iotdb-client/client-cpp/cmake/FetchBuildTools.cmake index c9d7482..866cc55 100644 --- a/iotdb-client/client-cpp/cmake/FetchBuildTools.cmake +++ b/iotdb-client/client-cpp/cmake/FetchBuildTools.cmake
@@ -253,8 +253,23 @@ endif() message(STATUS "[BuildTools] flex = ${FLEX_EXECUTABLE}") -# bison +# bison - Thrift 0.23's grammar build uses bison >= 3.7 features (e.g. the +# --file-prefix-map option), so reject an older system bison (manylinux_2_28 +# ships 3.0.4) and build ${BISON_VERSION} from source instead. +set(_bison_min_version "3.7") find_program(BISON_EXECUTABLE bison) +if(BISON_EXECUTABLE) + execute_process(COMMAND "${BISON_EXECUTABLE}" --version + OUTPUT_VARIABLE _bison_ver_out ERROR_QUIET + OUTPUT_STRIP_TRAILING_WHITESPACE) + string(REGEX MATCH "[0-9]+\\.[0-9]+(\\.[0-9]+)?" _bison_ver "${_bison_ver_out}") + if(_bison_ver AND _bison_ver VERSION_LESS _bison_min_version) + message(STATUS + "[BuildTools] system bison ${_bison_ver} < ${_bison_min_version} " + "(too old for Thrift ${THRIFT_VERSION}); building ${BISON_VERSION} from source") + unset(BISON_EXECUTABLE CACHE) + endif() +endif() if(NOT BISON_EXECUTABLE) _iotdb_resolve_tarball(_bison_tarball "bison-${BISON_VERSION}.tar.gz" "${_bison_url}") _iotdb_build_autotools(bison "${_bison_tarball}" "bison-${BISON_VERSION}")
diff --git a/iotdb-client/client-cpp/cmake/FetchOpenSSL.cmake b/iotdb-client/client-cpp/cmake/FetchOpenSSL.cmake index 575e280..aaf41b8 100644 --- a/iotdb-client/client-cpp/cmake/FetchOpenSSL.cmake +++ b/iotdb-client/client-cpp/cmake/FetchOpenSSL.cmake
@@ -18,14 +18,16 @@ # ============================================================================= # FetchOpenSSL.cmake (only included when WITH_SSL=ON) # +# Apache Thrift 0.23 (bundled by this client) builds against OpenSSL 1.x and 3.x, +# so a system OpenSSL of either series is used as-is (4.0 drops APIs Thrift calls). +# # Resolution order: # 1. find_package(OpenSSL) - any system / vendor install is taken as-is. -# 2. On Linux/macOS: -# use tarball ${IOTDB_OS_DEPS_DIR}/openssl-${OPENSSL_VERSION}.tar.gz +# 2. On Linux/macOS, when none is found or IOTDB_OPENSSL_FROM_SOURCE=ON: +# use tarball ${IOTDB_OS_DEPS_DIR}/openssl-${OPENSSL_FALLBACK_VERSION}.tar.gz # or download from openssl.org when not in offline mode, then -# ./Configure && make && make install_sw into ${CMAKE_BINARY_DIR}/_deps/openssl. -# 3. On Windows: emit a FATAL_ERROR with instructions to run the bundled -# Win64OpenSSL installer (or any other prebuilt OpenSSL); building +# ./config && make && make install_sw into ${CMAKE_BINARY_DIR}/_deps/openssl. +# 3. On Windows: emit a FATAL_ERROR asking for a prebuilt OpenSSL 3.x; building # OpenSSL from source on MSVC is out of scope. # # Side effects: @@ -33,24 +35,35 @@ # so callers can just link against them. # ============================================================================= -set(OPENSSL_VERSION "3.5.0" CACHE STRING "OpenSSL version to fetch when missing") +# Version built from source when no system OpenSSL is found. Named distinctly +# from find_package's OPENSSL_VERSION output variable to avoid collisions. +set(OPENSSL_FALLBACK_VERSION "3.5.0" + CACHE STRING "OpenSSL version built from source when no system OpenSSL is found") -find_package(OpenSSL QUIET) -if(OpenSSL_FOUND) - message(STATUS "[OpenSSL] using system OpenSSL ${OPENSSL_VERSION_MAJOR}.${OPENSSL_VERSION_MINOR}") - return() +# Build OpenSSL from source even if a system one exists. 
Used by the Linux +# packaging build, whose AlmaLinux 8 baseline ships OpenSSL 1.1.1 (EOL, not +# Apache-2.0, must not be redistributed) - we build 3.x there instead. +option(IOTDB_OPENSSL_FROM_SOURCE + "Ignore any system OpenSSL and build OpenSSL ${OPENSSL_FALLBACK_VERSION} from source" OFF) + +if(NOT IOTDB_OPENSSL_FROM_SOURCE) + find_package(OpenSSL QUIET) + if(OpenSSL_FOUND) + message(STATUS "[OpenSSL] using system OpenSSL ${OPENSSL_VERSION}") + return() + endif() endif() if(WIN32) message(FATAL_ERROR "[OpenSSL] WITH_SSL=ON but no OpenSSL was found on Windows. " - "Please run third-party/windows/Win64OpenSSL-3_5_0.exe (or any " - "OpenSSL installer), then re-run the configure step with " - "-DOPENSSL_ROOT_DIR=<install_path>.") + "Please install a prebuilt OpenSSL 3.x (e.g. the FireDaemon or slproweb 3.5.x build; 4.0 is not supported), " + "then re-run the configure step with -DOPENSSL_ROOT_DIR=<install_path>. " + "Pass -DWITH_SSL=OFF to build without SSL.") endif() -# --- Linux / macOS fallback: build from source --------------------------- -set(_ossl_tarname "openssl-${OPENSSL_VERSION}.tar.gz") +# --- Linux / macOS: build OpenSSL ${OPENSSL_FALLBACK_VERSION} from source - +set(_ossl_tarname "openssl-${OPENSSL_FALLBACK_VERSION}.tar.gz") set(_ossl_tarball "${IOTDB_OS_DEPS_DIR}/${_ossl_tarname}") if(NOT EXISTS "${_ossl_tarball}") @@ -71,9 +84,9 @@ endif() set(_ossl_root "${CMAKE_BINARY_DIR}/_deps/openssl") -set(_ossl_src "${_ossl_root}/src/openssl-${OPENSSL_VERSION}") +set(_ossl_src "${_ossl_root}/src/openssl-${OPENSSL_FALLBACK_VERSION}") set(_ossl_inst "${_ossl_root}/install") -set(_ossl_stamp "${_ossl_root}/.built-${OPENSSL_VERSION}") +set(_ossl_stamp "${_ossl_root}/.built-${OPENSSL_FALLBACK_VERSION}") if(NOT EXISTS "${_ossl_stamp}") file(REMOVE_RECURSE "${_ossl_root}/src") @@ -88,12 +101,15 @@ endif() message(STATUS "[OpenSSL] configuring -> ${_ossl_inst}") + # ./config auto-detects the platform target. 
Build SHARED libraries + # (libssl.so.3 / libcrypto.so.3) so they can be bundled next to + # libiotdb_session and shipped as the SDK's OpenSSL runtime. execute_process( - COMMAND ./Configure --prefix=${_ossl_inst} --openssldir=${_ossl_inst}/ssl no-shared + COMMAND ./config --prefix=${_ossl_inst} --openssldir=${_ossl_inst}/ssl shared WORKING_DIRECTORY "${_ossl_src}" RESULT_VARIABLE _rc) if(NOT _rc EQUAL 0) - message(FATAL_ERROR "[OpenSSL] Configure failed (rc=${_rc})") + message(FATAL_ERROR "[OpenSSL] config failed (rc=${_rc})") endif() message(STATUS "[OpenSSL] building (-j${_jobs})") @@ -116,6 +132,6 @@ endif() set(OPENSSL_ROOT_DIR "${_ossl_inst}" CACHE PATH "OpenSSL root" FORCE) -set(OPENSSL_USE_STATIC_LIBS ON) +set(OPENSSL_USE_STATIC_LIBS OFF) find_package(OpenSSL REQUIRED) -message(STATUS "[OpenSSL] built locally at ${OPENSSL_ROOT_DIR}") +message(STATUS "[OpenSSL] built locally (shared) at ${OPENSSL_ROOT_DIR}")
diff --git a/iotdb-client/client-cpp/cmake/FetchThrift.cmake b/iotdb-client/client-cpp/cmake/FetchThrift.cmake index f26ad64..d69b2a4 100644 --- a/iotdb-client/client-cpp/cmake/FetchThrift.cmake +++ b/iotdb-client/client-cpp/cmake/FetchThrift.cmake
@@ -100,7 +100,7 @@ # binary / library can immediately drive code generation and linking. # --------------------------------------------------------------------------- set(_thrift_cmake_args - # CMake 4.x rejects Thrift 0.21's cmake_minimum_required(3.0); set policy first. + # CMake 4.x rejects Thrift's old cmake_minimum_required(3.x); set policy first. "-DCMAKE_POLICY_VERSION_MINIMUM=3.5" "-DCMAKE_INSTALL_PREFIX=${_thrift_install}" "-DCMAKE_BUILD_TYPE=${_thrift_build_config}" @@ -138,6 +138,15 @@ if(WITH_SSL) list(APPEND _thrift_cmake_args "-DWITH_OPENSSL=ON") + # Build Thrift's TSSLSocket against the same OpenSSL that iotdb_session links + # and bundles, so the runtime libraries match. find_package does not set + # OPENSSL_ROOT_DIR itself, so derive it from the resolved include dir. + if(OPENSSL_ROOT_DIR) + list(APPEND _thrift_cmake_args "-DOPENSSL_ROOT_DIR=${OPENSSL_ROOT_DIR}") + elseif(OPENSSL_INCLUDE_DIR) + get_filename_component(_thrift_ossl_root "${OPENSSL_INCLUDE_DIR}" DIRECTORY) + list(APPEND _thrift_cmake_args "-DOPENSSL_ROOT_DIR=${_thrift_ossl_root}") + endif() else() list(APPEND _thrift_cmake_args "-DWITH_OPENSSL=OFF") endif() @@ -152,7 +161,15 @@ else() set(_thrift_abi_stamp "-abidefault") endif() -set(_thrift_stamp "${_thrift_build}/.built-${THRIFT_VERSION}-${_thrift_build_config}-mdll${_thrift_abi_stamp}") +# Encode WITH_SSL in the stamp: toggling SSL changes WITH_OPENSSL, so a cached +# build of the opposite flavour must not be reused (otherwise TSSLSocket is +# missing/extra at link time). +if(WITH_SSL) + set(_thrift_ssl_stamp "-ssl") +else() + set(_thrift_ssl_stamp "-nossl") +endif() +set(_thrift_stamp "${_thrift_build}/.built-${THRIFT_VERSION}-${_thrift_build_config}-mdll${_thrift_abi_stamp}${_thrift_ssl_stamp}") if(NOT EXISTS "${_thrift_stamp}") file(MAKE_DIRECTORY "${_thrift_build}") message(STATUS "[Thrift] configuring ${_thrift_dirname}")
diff --git a/iotdb-client/client-cpp/cmake/InstallOpenSSLRuntime.cmake b/iotdb-client/client-cpp/cmake/InstallOpenSSLRuntime.cmake new file mode 100644 index 0000000..f3e181b --- /dev/null +++ b/iotdb-client/client-cpp/cmake/InstallOpenSSLRuntime.cmake
@@ -0,0 +1,121 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +# ============================================================================= +# InstallOpenSSLRuntime.cmake (only used when WITH_SSL=ON) +# +# Bundles the OpenSSL shared libraries that iotdb_session links against into the +# package lib/ directory, so the published SDK is self-contained and runs on +# machines that do not have OpenSSL installed. +# +# Relies on a prior find_package(OpenSSL) having populated +# OPENSSL_SSL_LIBRARY / OPENSSL_CRYPTO_LIBRARY / OPENSSL_VERSION_MAJOR; +# OPENSSL_ROOT_DIR is used as an extra search hint when it has been set. +# +# When OpenSSL was linked statically (find_package resolved .a archives), +# there is nothing to bundle: those objects are already inside libiotdb_session. +# ============================================================================= + +# Windows: find_package resolves the import .lib; the runtime DLLs live in +# <root>/bin. Collect them, filtering by major version so installs that ship +# several ABIs side by side (e.g. libssl-1_1-x64.dll + libssl-3-x64.dll) only +# bundle the one we actually linked. 
+function(_iotdb_collect_openssl_windows_dlls _out_var) + set(_roots "") + if(OPENSSL_ROOT_DIR) + list(APPEND _roots "${OPENSSL_ROOT_DIR}") + endif() + foreach(_implib IN LISTS OPENSSL_SSL_LIBRARY OPENSSL_CRYPTO_LIBRARY OPENSSL_LIBRARIES) + if(_implib AND EXISTS "${_implib}") + # Record the import lib's directory and its four parents (.../lib, + # .../lib/VC/x64/MD, ...) as candidate roots that may own a bin/ with DLLs. + get_filename_component(_dir "${_implib}" DIRECTORY) + list(APPEND _roots "${_dir}") + foreach(_up RANGE 1 4) + get_filename_component(_dir "${_dir}" DIRECTORY) + list(APPEND _roots "${_dir}") + endforeach() + endif() + endforeach() + list(REMOVE_DUPLICATES _roots) + # Glob every candidate root (and its bin/) for DLLs of the linked major version. + set(_dlls "") + set(_seen_names "") + foreach(_root IN LISTS _roots) + if(_root AND IS_DIRECTORY "${_root}") + file(GLOB _found + "${_root}/bin/libssl-${OPENSSL_VERSION_MAJOR}*.dll" + "${_root}/bin/libcrypto-${OPENSSL_VERSION_MAJOR}*.dll" + "${_root}/libssl-${OPENSSL_VERSION_MAJOR}*.dll" + "${_root}/libcrypto-${OPENSSL_VERSION_MAJOR}*.dll") + # The same DLL can appear under several candidate roots (e.g. bin/ and + # the install root); keep only the first occurrence of each filename. 
+ foreach(_dll IN LISTS _found) + get_filename_component(_name "${_dll}" NAME) + if(NOT _name IN_LIST _seen_names) + list(APPEND _seen_names "${_name}") + list(APPEND _dlls "${_dll}") + endif() + endforeach() + endif() + endforeach() + set(${_out_var} "${_dlls}" PARENT_SCOPE) +endfunction() +# Public entry point: adds install rules that copy the linked OpenSSL runtime into lib/. +function(iotdb_install_openssl_runtime) + if(WIN32) + _iotdb_collect_openssl_windows_dlls(_dlls) + if(NOT _dlls) + message(STATUS + "[OpenSSL] no runtime DLLs found to bundle; ensure the OpenSSL " + "bin/ directory is on PATH when running the SDK") + return() + endif() + foreach(_dll IN LISTS _dlls) + message(STATUS "[OpenSSL] bundling runtime library into lib/: ${_dll}") + endforeach() + install(FILES ${_dlls} DESTINATION lib) + return() + endif() + + # Linux / macOS: OPENSSL_*_LIBRARY is the developer name (libssl.so / + # libssl.dylib), usually a symlink to the SONAME (libssl.so.3 / .1.1). + # FOLLOW_SYMLINK_CHAIN installs the whole chain with the symlinks preserved, + # so the loader finds the SONAME the binary records. Static archives (.a) + # are skipped: they are already linked into libiotdb_session. + set(_files_arg "") + set(_have_libs OFF) + foreach(_lib IN LISTS OPENSSL_SSL_LIBRARY OPENSSL_CRYPTO_LIBRARY) + if(_lib AND EXISTS "${_lib}" AND NOT _lib MATCHES "\\.a$") + string(APPEND _files_arg " \"${_lib}\"") + set(_have_libs ON) + message(STATUS "[OpenSSL] bundling runtime library into lib/: ${_lib}") + endif() + endforeach() + + if(NOT _have_libs) + message(STATUS + "[OpenSSL] no shared runtime libraries to bundle " + "(OpenSSL linked statically); SDK is self-contained") + return() + endif() + # file(INSTALL) runs at install time; the escaped CMAKE_INSTALL_PREFIX is expanded then. + install(CODE + "file(INSTALL DESTINATION \"\${CMAKE_INSTALL_PREFIX}/lib\" + TYPE SHARED_LIBRARY FOLLOW_SYMLINK_CHAIN + FILES ${_files_arg})") +endfunction()
diff --git a/iotdb-client/client-cpp/examples/CMakeLists.txt b/iotdb-client/client-cpp/examples/CMakeLists.txt index 0b77bce..4184199 100644 --- a/iotdb-client/client-cpp/examples/CMakeLists.txt +++ b/iotdb-client/client-cpp/examples/CMakeLists.txt
@@ -118,6 +118,21 @@ tree_example table_example) +# OpenSSL runtime libraries bundled in the SDK lib/ (libssl / libcrypto). When +# building against an unpacked package, copy them next to each example binary so +# the examples run without a system OpenSSL - libiotdb_session records them as +# NEEDED and resolves them via its $ORIGIN runtime path. +set(_iotdb_sdk_ssl_runtime "") +if(NOT _iotdb_examples_in_tree) + file(GLOB _iotdb_sdk_ssl_runtime + "${IOTDB_SDK_ROOT}/lib/libssl*.so*" + "${IOTDB_SDK_ROOT}/lib/libcrypto*.so*" + "${IOTDB_SDK_ROOT}/lib/libssl*.dylib" + "${IOTDB_SDK_ROOT}/lib/libcrypto*.dylib" + "${IOTDB_SDK_ROOT}/lib/libssl*.dll" + "${IOTDB_SDK_ROOT}/lib/libcrypto*.dll") +endif() + foreach(_t IN LISTS _example_targets) IF(WITH_SSL) TARGET_LINK_LIBRARIES(${_t} PRIVATE "${_iotdb_link_lib}" OpenSSL::SSL OpenSSL::Crypto) @@ -128,6 +143,13 @@ TARGET_LINK_LIBRARIES(${_t} PRIVATE pthread) ENDIF() + # The packaged libiotdb_session records the bundled OpenSSL libs as DT_NEEDED; + # point the linker at the SDK lib/ so it can resolve them without a system + # OpenSSL present. + if(UNIX AND NOT _iotdb_examples_in_tree) + target_link_directories(${_t} PRIVATE "${IOTDB_SDK_ROOT}/lib") + endif() + # Run from the build output directory without setting LD_LIBRARY_PATH / PATH. 
if(UNIX) set_target_properties(${_t} PROPERTIES @@ -145,6 +167,12 @@ COMMAND ${CMAKE_COMMAND} -E copy_if_different "${_iotdb_runtime}" $<TARGET_FILE_DIR:${_t}> COMMENT "Copy IoTDB runtime library next to ${_t}") + foreach(_ssl_lib IN LISTS _iotdb_sdk_ssl_runtime) + add_custom_command(TARGET ${_t} POST_BUILD + COMMAND ${CMAKE_COMMAND} -E copy_if_different + "${_ssl_lib}" $<TARGET_FILE_DIR:${_t}> + COMMENT "Copy bundled OpenSSL runtime next to ${_t}") + endforeach() elseif(WIN32) message(WARNING "Missing ${_iotdb_runtime}; copy iotdb_session.dll manually before running ${_t}.") endif() @@ -166,6 +194,13 @@ COMMAND ${CMAKE_COMMAND} -E copy_if_different "${_iotdb_runtime}" "${_example_dist_dir}/") endif() +# Stage the bundled OpenSSL runtime too, so a copied dist/ runs on a machine +# without a system OpenSSL. +foreach(_ssl_lib IN LISTS _iotdb_sdk_ssl_runtime) + add_custom_command(TARGET example-dist POST_BUILD + COMMAND ${CMAKE_COMMAND} -E copy_if_different + "${_ssl_lib}" "${_example_dist_dir}/") +endforeach() if(IOTDB_EXAMPLES_REGISTER_TESTS) set(_runnable_example_targets
diff --git a/iotdb-client/client-cpp/examples/README.md b/iotdb-client/client-cpp/examples/README.md index 295aa29..763ec69 100644 --- a/iotdb-client/client-cpp/examples/README.md +++ b/iotdb-client/client-cpp/examples/README.md
@@ -53,7 +53,7 @@ | macOS arm64 | `macos-aarch64` | | Windows (match your Visual Studio version) | `windows-x86_64-msvc14.1` ... `msvc14.4` | -The current build compiles Thrift 0.21 from source at CMake configure time. +The current build compiles Thrift 0.23 from source at CMake configure time. Legacy `-Diotdb-tools-thrift.version=...` flags applied to the **old** pre-built Thrift workflow only. Linux release packages are built in the `manylinux_2_28` container and require glibc 2.28 or newer. See
diff --git a/iotdb-client/client-cpp/examples/README_zh.md b/iotdb-client/client-cpp/examples/README_zh.md index 435b58f..4adc38a 100644 --- a/iotdb-client/client-cpp/examples/README_zh.md +++ b/iotdb-client/client-cpp/examples/README_zh.md
@@ -52,7 +52,7 @@ | macOS arm64 | `macos-aarch64` | | Windows + 与工程相同的 VS 版本 | `windows-x86_64-msvc14.1` ... `msvc14.4` | -当前 CMake 构建在配置阶段从源码编译 Thrift 0.21,**不再**通过 +当前 CMake 构建在配置阶段从源码编译 Thrift 0.23,**不再**通过 `-Diotdb-tools-thrift.version=0.14.1.1-gcc4-SNAPSHOT` 等旧参数控制 glibc; Linux 发版包在 `manylinux_2_28` 容器中构建,部署机需要 glibc 2.28 或更新版本。 详见 [client-cpp README](../../iotdb-client/client-cpp/README.md)。
diff --git a/iotdb-client/client-cpp/pom.xml b/iotdb-client/client-cpp/pom.xml index b5b97e6..04f7fa1 100644 --- a/iotdb-client/client-cpp/pom.xml +++ b/iotdb-client/client-cpp/pom.xml
@@ -49,7 +49,8 @@ <cmake.install.prefix>${project.build.directory}/install</cmake.install.prefix> <iotdb.deps.dir>${project.basedir}/third-party</iotdb.deps.dir> <iotdb.offline>OFF</iotdb.offline> - <with.ssl>OFF</with.ssl> + <with.ssl>ON</with.ssl> + <iotdb.openssl.from.source>OFF</iotdb.openssl.from.source> <iotdb.cxx11.abi/> <!-- Switched to OFF by the .skipTests profile below. --> <build.tests>ON</build.tests> @@ -112,6 +113,7 @@ <option>-DCMAKE_INSTALL_PREFIX=${cmake.install.prefix}</option> <option>-DBUILD_TESTING=${build.tests}</option> <option>-DWITH_SSL=${with.ssl}</option> + <option>-DIOTDB_OPENSSL_FROM_SOURCE=${iotdb.openssl.from.source}</option> <option>-DIOTDB_OFFLINE=${iotdb.offline}</option> <option>-DIOTDB_DEPS_DIR=${iotdb.deps.dir}</option> <option>-DIOTDB_USE_CXX11_ABI=${iotdb.cxx11.abi}</option>
diff --git a/iotdb-client/client-cpp/src/assembly/client-cpp.xml b/iotdb-client/client-cpp/src/assembly/client-cpp.xml index af7184f..3a6a631 100644 --- a/iotdb-client/client-cpp/src/assembly/client-cpp.xml +++ b/iotdb-client/client-cpp/src/assembly/client-cpp.xml
@@ -52,6 +52,8 @@ <directory>${project.build.directory}/package-metadata</directory> <includes> <include>third_party/DEPENDENCIES.md</include> + <include>third_party/NOTICE</include> + <include>third_party/licenses/**</include> </includes> <outputDirectory>${file.separator}</outputDirectory> </fileSet>
diff --git a/iotdb-client/client-cpp/src/assembly/package-metadata/third_party/DEPENDENCIES.md b/iotdb-client/client-cpp/src/assembly/package-metadata/third_party/DEPENDENCIES.md index e921c7e..e321c6f 100644 --- a/iotdb-client/client-cpp/src/assembly/package-metadata/third_party/DEPENDENCIES.md +++ b/iotdb-client/client-cpp/src/assembly/package-metadata/third_party/DEPENDENCIES.md
@@ -20,15 +20,28 @@ --> # Third-party Dependencies -The release library is built with the following third-party components. Some -components are linked into the produced IoTDB C++ session library; this file is -included for provenance. +## Redistributed in this package + +These components are statically linked into the `iotdb_session` library, or +bundled as shared libraries, and are therefore part of the binary distribution. +Their licenses are Category A (Apache-2.0 / Boost). Attribution is provided in +the [`NOTICE`](NOTICE) file in this directory; non-Apache license texts are under +[`licenses/`](licenses). Apache-2.0 components are covered by the top-level +`LICENSE` file. + +| Component | Version | How | License | +| --- | --- | --- | --- | +| Apache Thrift | 0.23.0 | statically linked | Apache License 2.0 | +| Boost | 1.60.0 on Linux/Windows, 1.84.0 on macOS by default | statically linked (header-only) | Boost Software License 1.0 | +| OpenSSL | 3.x: system OpenSSL 3.x when present, else 3.5.0 built from source (`WITH_SSL=ON`, default) | bundled shared libs in `lib/` | Apache License 2.0 | + +## Build-time only (not redistributed) + +These tools are used only to build Thrift / generate code; none of their code +is included in the distributed library. | Component | Version | License | | --- | --- | --- | -| Apache Thrift | 0.21.0 | Apache License 2.0 | -| Boost | 1.60.0 on Linux/Windows, 1.84.0 on macOS by default | Boost Software License 1.0 | -| OpenSSL | 3.5.0 when `WITH_SSL=ON` | Apache License 2.0 | | GNU m4 | 1.4.19 on Linux build bootstrap | GPL-3.0-or-later | | GNU flex | 2.6.4 on Linux build bootstrap | BSD-style flex license | | GNU bison | 3.8 on Linux build bootstrap | GPL-3.0-or-later |
diff --git a/iotdb-client/client-cpp/src/assembly/package-metadata/third_party/NOTICE b/iotdb-client/client-cpp/src/assembly/package-metadata/third_party/NOTICE new file mode 100644 index 0000000..4da431f --- /dev/null +++ b/iotdb-client/client-cpp/src/assembly/package-metadata/third_party/NOTICE
@@ -0,0 +1,34 @@ +Apache IoTDB C++ Session Client +Bundled / statically linked third-party components +================================================== + +In addition to the Apache IoTDB code (covered by the top-level LICENSE and +NOTICE files), this binary distribution statically links or bundles the +third-party components listed below. Components licensed under the Apache +License, Version 2.0 are covered by the top-level LICENSE file; other license +texts are reproduced under third_party/licenses/. + +------------------------------------------------------------------------------ +Apache Thrift (statically linked into the iotdb_session library) +Licensed under the Apache License, Version 2.0 (see the top-level LICENSE). + +Apache Thrift +Copyright (C) 2006 - 2019, The Apache Software Foundation + +This product includes software developed at +The Apache Software Foundation (http://www.apache.org/). + +------------------------------------------------------------------------------ +OpenSSL (bundled shared libraries: libssl / libcrypto, present only when the +SDK is built with SSL support) +Copyright 1999-2025 The OpenSSL Project Authors. All Rights Reserved. +Licensed under the Apache License, Version 2.0 (see the top-level LICENSE). + +------------------------------------------------------------------------------ +Boost C++ Libraries (header-only; used at build time to compile Apache Thrift +and the iotdb_session library, so portions may be inlined into the shipped +binary) +Distributed under the Boost Software License, Version 1.0 +(see third_party/licenses/LICENSE-Boost-1.0). Here "Version 1.0" is the license +version -- the Boost Software License has only ever had this single version -- +and is unrelated to the Boost library release that was compiled in.
diff --git a/iotdb-client/client-cpp/src/assembly/package-metadata/third_party/licenses/LICENSE-Boost-1.0 b/iotdb-client/client-cpp/src/assembly/package-metadata/third_party/licenses/LICENSE-Boost-1.0 new file mode 100644 index 0000000..36b7cd9 --- /dev/null +++ b/iotdb-client/client-cpp/src/assembly/package-metadata/third_party/licenses/LICENSE-Boost-1.0
@@ -0,0 +1,23 @@ +Boost Software License - Version 1.0 - August 17th, 2003 + +Permission is hereby granted, free of charge, to any person or organization +obtaining a copy of the software and accompanying documentation covered by +this license (the "Software") to use, reproduce, display, distribute, +execute, and transmit the Software, and to prepare derivative works of the +Software, and to permit third-parties to whom the Software is furnished to +do so, all subject to the following: + +The copyright notices in the Software and this entire statement, including +the above license grant, this restriction and the following disclaimer, +must be included in all copies of the Software, in whole or in part, and +all derivative works of the Software, unless such copies or derivative +works are solely in the form of machine-executable object code generated by +a source language processor. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE, TITLE AND NON-INFRINGEMENT. IN NO EVENT +SHALL THE COPYRIGHT HOLDERS OR ANYONE DISTRIBUTING THE SOFTWARE BE LIABLE +FOR ANY DAMAGES OR OTHER LIABILITY, WHETHER IN CONTRACT, TORT OR OTHERWISE, +ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +DEALINGS IN THE SOFTWARE.
diff --git a/iotdb-client/client-cpp/third-party/README.md b/iotdb-client/client-cpp/third-party/README.md index 313a6fb..4cbdd1e 100644 --- a/iotdb-client/client-cpp/third-party/README.md +++ b/iotdb-client/client-cpp/third-party/README.md
@@ -68,8 +68,8 @@ | Platform | Typical files | |------------|---------------| -| `linux/` | `thrift-0.21.0.tar.gz`, `boost_1_60_0.tar.gz`, `m4-1.4.19.tar.gz`, `flex-2.6.4.tar.gz`, `bison-3.8.tar.gz` (+ `openssl-3.5.0.tar.gz` when `WITH_SSL=ON`) | -| `mac/` | `thrift-0.21.0.tar.gz`, `boost_1_60_0.tar.gz` (Xcode CLT usually provides m4/flex/bison) | -| `windows/` | `thrift-0.21.0.tar.gz`, `boost_1_60_0.tar.gz`, `win_flex_bison-2.5.25.zip` (or any `win_flex_bison*.zip`; skip if flex/bison already on `PATH`) | +| `linux/` | `thrift-0.23.0.tar.gz`, `boost_1_60_0.tar.gz`, `m4-1.4.19.tar.gz`, `flex-2.6.4.tar.gz`, `bison-3.8.tar.gz` (+ `openssl-3.5.0.tar.gz` only when `WITH_SSL=ON` and no system OpenSSL is present) | +| `mac/` | `thrift-0.23.0.tar.gz`, `boost_1_60_0.tar.gz` (Xcode CLT usually provides m4/flex/bison) | +| `windows/` | `thrift-0.23.0.tar.gz`, `boost_1_60_0.tar.gz`, `win_flex_bison-2.5.25.zip` (or any `win_flex_bison*.zip`; skip if flex/bison already on `PATH`) | Download URLs: see the *Offline build* table in [`README.md`](../README.md).
diff --git a/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/execution/operator/CommonOperatorContext.java b/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/execution/operator/CommonOperatorContext.java index 2c99fa5..0ffe447 100644 --- a/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/execution/operator/CommonOperatorContext.java +++ b/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/execution/operator/CommonOperatorContext.java
@@ -103,6 +103,21 @@ this.totalExecutionTimeInNanos += executionTimeInNanos; } + public void recordScanAggregationFromRawDataCost(long costTimeInNanos) { + // calc-commons operators can run in tests or standalone contexts that are not backed by a + // DataNode FragmentInstanceContext. DataNode OperatorContext overrides this to forward costs. + } + + public void recordScanAggregationFromStatisticsCost(long costTimeInNanos) { + // calc-commons operators can run in tests or standalone contexts that are not backed by a + // DataNode FragmentInstanceContext. DataNode OperatorContext overrides this to forward costs. + } + + public void recordAggregationOperatorFromRawDataCost(long costTimeInNanos) { + // calc-commons operators can run in tests or standalone contexts that are not backed by a + // DataNode FragmentInstanceContext. DataNode OperatorContext overrides this to forward costs. + } + public void recordNextCalled() { this.nextCalledCount++; }
diff --git a/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/execution/operator/source/relational/aggregation/AggregationOperator.java b/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/execution/operator/source/relational/aggregation/AggregationOperator.java index e65ee82..047894f 100644 --- a/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/execution/operator/source/relational/aggregation/AggregationOperator.java +++ b/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/execution/operator/source/relational/aggregation/AggregationOperator.java
@@ -92,9 +92,7 @@ return null; } - for (TableAggregator aggregator : aggregators) { - aggregator.processBlock(block); - } + processBlock(block); return null; } else { @@ -127,6 +125,17 @@ return operatorContext; } + private void processBlock(TsBlock block) { + long startTime = System.nanoTime(); + try { + for (TableAggregator aggregator : aggregators) { + aggregator.processBlock(block); + } + } finally { + operatorContext.recordAggregationOperatorFromRawDataCost(System.nanoTime() - startTime); + } + } + @Override public long calculateMaxPeekMemory() { return Math.max(
diff --git a/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/execution/operator/source/relational/aggregation/TableAggregator.java b/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/execution/operator/source/relational/aggregation/TableAggregator.java index 1df3f6f..c7a7b6b 100644 --- a/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/execution/operator/source/relational/aggregation/TableAggregator.java +++ b/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/execution/operator/source/relational/aggregation/TableAggregator.java
@@ -19,7 +19,6 @@ package org.apache.iotdb.calc.execution.operator.source.relational.aggregation; -import org.apache.iotdb.calc.metric.QueryExecutionMetricSet; import org.apache.iotdb.calc.plan.planner.CommonOperatorUtils; import org.apache.iotdb.commons.queryengine.plan.relational.planner.node.AggregationNode; @@ -36,13 +35,9 @@ import static com.google.common.base.Preconditions.checkArgument; import static java.util.Objects.requireNonNull; -import static org.apache.iotdb.calc.metric.QueryExecutionMetricSet.AGGREGATION_FROM_RAW_DATA; public class TableAggregator { - public static final QueryExecutionMetricSet QUERY_EXECUTION_METRICS = - QueryExecutionMetricSet.getInstance(); - private final TableAccumulator accumulator; private final AggregationNode.Step step; private final TSDataType outputType; @@ -70,34 +65,28 @@ } public void processBlock(TsBlock block) { - long startTime = System.nanoTime(); - try { - Column[] arguments = block.getColumns(inputChannels); + Column[] arguments = block.getColumns(inputChannels); - // process count(*) - if (arguments.length == 0) { - arguments = - new Column[] { - new RunLengthEncodedColumn( - CommonOperatorUtils.TIME_COLUMN_TEMPLATE, block.getPositionCount()) - }; + // process count(*) + if (arguments.length == 0) { + arguments = + new Column[] { + new RunLengthEncodedColumn( + CommonOperatorUtils.TIME_COLUMN_TEMPLATE, block.getPositionCount()) + }; + } + + if (step.isInputRaw()) { + // Use select-all AggregationMask here because filter of Agg-Function is not supported now + AggregationMask mask = AggregationMask.createSelectAll(block.getPositionCount()); + + if (maskChannel.isPresent()) { + mask.applyMaskBlock(block.getColumn(maskChannel.getAsInt())); } - if (step.isInputRaw()) { - // Use select-all AggregationMask here because filter of Agg-Function is not supported now - AggregationMask mask = AggregationMask.createSelectAll(block.getPositionCount()); - - if (maskChannel.isPresent()) { - 
mask.applyMaskBlock(block.getColumn(maskChannel.getAsInt())); - } - - accumulator.addInput(arguments, mask); - } else { - accumulator.addIntermediate(arguments[0]); - } - } finally { - QUERY_EXECUTION_METRICS.recordExecutionCost( - AGGREGATION_FROM_RAW_DATA, System.nanoTime() - startTime); + accumulator.addInput(arguments, mask); + } else { + accumulator.addIntermediate(arguments[0]); } }
diff --git a/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/execution/operator/source/relational/aggregation/grouped/StreamingAggregationOperator.java b/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/execution/operator/source/relational/aggregation/grouped/StreamingAggregationOperator.java index 7609ac3..9808a1b 100644 --- a/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/execution/operator/source/relational/aggregation/grouped/StreamingAggregationOperator.java +++ b/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/execution/operator/source/relational/aggregation/grouped/StreamingAggregationOperator.java
@@ -176,8 +176,13 @@ private void addRowsToAggregators(TsBlock page, int startPosition, int endPosition) { TsBlock region = page.getRegion(startPosition, endPosition - startPosition + 1); - for (TableAggregator aggregator : aggregators) { - aggregator.processBlock(region); + long startTime = System.nanoTime(); + try { + for (TableAggregator aggregator : aggregators) { + aggregator.processBlock(region); + } + } finally { + operatorContext.recordAggregationOperatorFromRawDataCost(System.nanoTime() - startTime); } }
diff --git a/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/execution/operator/source/relational/aggregation/grouped/builder/InMemoryHashAggregationBuilder.java b/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/execution/operator/source/relational/aggregation/grouped/builder/InMemoryHashAggregationBuilder.java index f372717..07e4cb9 100644 --- a/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/execution/operator/source/relational/aggregation/grouped/builder/InMemoryHashAggregationBuilder.java +++ b/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/execution/operator/source/relational/aggregation/grouped/builder/InMemoryHashAggregationBuilder.java
@@ -140,8 +140,13 @@ operatorContext.recordSpecifiedInfo(MAX_GROUP_NUMBER, Long.toString(groupCount)); maxGroupNumber = groupCount; } - for (GroupedAggregator groupedAggregator : groupedAggregators) { - groupedAggregator.processBlock(groupCount, groupByIdBlock, block); + long startTime = System.nanoTime(); + try { + for (GroupedAggregator groupedAggregator : groupedAggregators) { + groupedAggregator.processBlock(groupCount, groupByIdBlock, block); + } + } finally { + operatorContext.recordAggregationOperatorFromRawDataCost(System.nanoTime() - startTime); } }
diff --git a/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/metric/QueryExecutionMetricSet.java b/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/metric/QueryExecutionMetricSet.java index 636b11b..5452e54 100644 --- a/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/metric/QueryExecutionMetricSet.java +++ b/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/metric/QueryExecutionMetricSet.java
@@ -143,8 +143,11 @@ // region query aggregation public static final String AGGREGATION_FROM_RAW_DATA = "aggregation_from_raw_data"; public static final String AGGREGATION_FROM_STATISTICS = "aggregation_from_statistics"; + public static final String AGGREGATION_OPERATOR_FROM_RAW_DATA = + "aggregation_operator_from_raw_data"; private Timer aggregationFromRawDataTimer = DoNothingMetricManager.DO_NOTHING_TIMER; private Timer aggregationFromStatisticsTimer = DoNothingMetricManager.DO_NOTHING_TIMER; + private Timer aggregationOperatorFromRawDataTimer = DoNothingMetricManager.DO_NOTHING_TIMER; private void bindQueryAggregation(AbstractMetricService metricService) { aggregationFromRawDataTimer = @@ -156,12 +159,19 @@ MetricLevel.IMPORTANT, Tag.FROM.toString(), "statistics"); + aggregationOperatorFromRawDataTimer = + metricService.getOrCreateTimer( + Metric.AGGREGATION.toString(), + MetricLevel.IMPORTANT, + Tag.FROM.toString(), + "raw_data_operator"); } private void unbindQueryAggregation(AbstractMetricService metricService) { aggregationFromRawDataTimer = DoNothingMetricManager.DO_NOTHING_TIMER; aggregationFromStatisticsTimer = DoNothingMetricManager.DO_NOTHING_TIMER; - Arrays.asList("raw_data", "statistics") + aggregationOperatorFromRawDataTimer = DoNothingMetricManager.DO_NOTHING_TIMER; + Arrays.asList("raw_data", "statistics", "raw_data_operator") .forEach( from -> metricService.remove( @@ -213,6 +223,9 @@ case AGGREGATION_FROM_STATISTICS: aggregationFromStatisticsTimer.update(costTimeInNanos, TimeUnit.NANOSECONDS); break; + case AGGREGATION_OPERATOR_FROM_RAW_DATA: + aggregationOperatorFromRawDataTimer.update(costTimeInNanos, TimeUnit.NANOSECONDS); + break; default: break; }
diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/config/ConsensusConfig.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/config/ConsensusConfig.java index f834e41..d542999 100644 --- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/config/ConsensusConfig.java +++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/config/ConsensusConfig.java
@@ -21,6 +21,7 @@ import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType; import org.apache.iotdb.common.rpc.thrift.TEndPoint; +import org.apache.iotdb.commons.disk.strategy.DirectoryStrategyType; import java.util.List; import java.util.Optional; @@ -35,6 +36,7 @@ private final RatisConfig ratisConfig; private final IoTConsensusConfig iotConsensusConfig; private final IoTConsensusV2Config iotConsensusV2Config; + private final DirectoryStrategyType directoryStrategyType; private ConsensusConfig( TEndPoint thisNode, @@ -44,7 +46,8 @@ TConsensusGroupType consensusGroupType, RatisConfig ratisConfig, IoTConsensusConfig iotConsensusConfig, - IoTConsensusV2Config iotConsensusV2Config) { + IoTConsensusV2Config iotConsensusV2Config, + DirectoryStrategyType directoryStrategyType) { this.thisNodeEndPoint = thisNode; this.thisNodeId = thisNodeId; this.storageDir = storageDir; @@ -53,6 +56,7 @@ this.ratisConfig = ratisConfig; this.iotConsensusConfig = iotConsensusConfig; this.iotConsensusV2Config = iotConsensusV2Config; + this.directoryStrategyType = directoryStrategyType; } public TEndPoint getThisNodeEndPoint() { @@ -87,6 +91,10 @@ return iotConsensusV2Config; } + public DirectoryStrategyType getDirectoryStrategyType() { + return directoryStrategyType; + } + public static ConsensusConfig.Builder newBuilder() { return new ConsensusConfig.Builder(); } @@ -101,6 +109,8 @@ private RatisConfig ratisConfig; private IoTConsensusConfig iotConsensusConfig; private IoTConsensusV2Config iotConsensusV2Config; + private DirectoryStrategyType directoryStrategyType = + DirectoryStrategyType.MIN_FOLDER_OCCUPIED_SPACE_FIRST_STRATEGY; public ConsensusConfig build() { return new ConsensusConfig( @@ -113,7 +123,8 @@ Optional.ofNullable(iotConsensusConfig) .orElseGet(() -> IoTConsensusConfig.newBuilder().build()), Optional.ofNullable(iotConsensusV2Config) - .orElseGet(() -> IoTConsensusV2Config.newBuilder().build())); + .orElseGet(() -> IoTConsensusV2Config.newBuilder().build()), + 
directoryStrategyType); } public Builder setThisNode(TEndPoint thisNode) { @@ -155,5 +166,10 @@ this.iotConsensusV2Config = iotConsensusV2Config; return this; } + + public Builder setDirectoryStrategyType(DirectoryStrategyType directoryStrategyType) { + this.directoryStrategyType = directoryStrategyType; + return this; + } } }
diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/config/IoTConsensusConfig.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/config/IoTConsensusConfig.java index 7720cf5..c6ca9a7 100644 --- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/config/IoTConsensusConfig.java +++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/config/IoTConsensusConfig.java
@@ -325,6 +325,7 @@ private final long regionMigrationSpeedLimitBytesPerSecond; private final long subscriptionWalRetentionSizeInBytes; private final long subscriptionWalRetentionTimeMs; + private final long snapshotTransmissionProgressLogIntervalMs; private Replication( int maxLogEntriesNumPerBatch, @@ -342,7 +343,8 @@ double maxMemoryRatioForQueue, long regionMigrationSpeedLimitBytesPerSecond, long subscriptionWalRetentionSizeInBytes, - long subscriptionWalRetentionTimeMs) { + long subscriptionWalRetentionTimeMs, + long snapshotTransmissionProgressLogIntervalMs) { this.maxLogEntriesNumPerBatch = maxLogEntriesNumPerBatch; this.maxSizePerBatch = maxSizePerBatch; this.maxPendingBatchesNum = maxPendingBatchesNum; @@ -359,6 +361,7 @@ this.regionMigrationSpeedLimitBytesPerSecond = regionMigrationSpeedLimitBytesPerSecond; this.subscriptionWalRetentionSizeInBytes = subscriptionWalRetentionSizeInBytes; this.subscriptionWalRetentionTimeMs = subscriptionWalRetentionTimeMs; + this.snapshotTransmissionProgressLogIntervalMs = snapshotTransmissionProgressLogIntervalMs; } public int getMaxLogEntriesNumPerBatch() { @@ -425,6 +428,10 @@ return subscriptionWalRetentionTimeMs; } + public long getSnapshotTransmissionProgressLogIntervalMs() { + return snapshotTransmissionProgressLogIntervalMs; + } + public static Replication.Builder newBuilder() { return new Replication.Builder(); } @@ -450,6 +457,11 @@ private long regionMigrationSpeedLimitBytesPerSecond = 32 * 1024 * 1024L; private long subscriptionWalRetentionSizeInBytes = 0; private long subscriptionWalRetentionTimeMs = -1L; + // Throttle the per-file snapshot-transmission progress log to at most once per this interval; + // a snapshot may contain hundreds of thousands of files, so one INFO line per file is itself + // a heavy IO/string-building cost. + // A value <= 0 logs every file.
+ private long snapshotTransmissionProgressLogIntervalMs = 5000L; public Replication.Builder setMaxLogEntriesNumPerBatch(int maxLogEntriesNumPerBatch) { this.maxLogEntriesNumPerBatch = maxLogEntriesNumPerBatch; @@ -535,6 +547,12 @@ return this; } + public Builder setSnapshotTransmissionProgressLogIntervalMs( + long snapshotTransmissionProgressLogIntervalMs) { + this.snapshotTransmissionProgressLogIntervalMs = snapshotTransmissionProgressLogIntervalMs; + return this; + } + public Replication build() { return new Replication( maxLogEntriesNumPerBatch, @@ -552,7 +570,8 @@ maxMemoryRatioForQueue, regionMigrationSpeedLimitBytesPerSecond, subscriptionWalRetentionSizeInBytes, - subscriptionWalRetentionTimeMs); + subscriptionWalRetentionTimeMs, + snapshotTransmissionProgressLogIntervalMs); } } }
diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensus.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensus.java index dde577e..946f6de 100644 --- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensus.java +++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensus.java
@@ -26,6 +26,7 @@ import org.apache.iotdb.commons.concurrent.ThreadName; import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil; import org.apache.iotdb.commons.consensus.ConsensusGroupId; +import org.apache.iotdb.commons.disk.strategy.DirectoryStrategyType; import org.apache.iotdb.commons.exception.DiskSpaceInsufficientException; import org.apache.iotdb.commons.exception.StartupException; import org.apache.iotdb.commons.request.IConsensusRequest; @@ -97,6 +98,7 @@ private final int thisNodeId; private final File storageDir; private final List<String> recvSnapshotDirs; + private final DirectoryStrategyType recvFolderStrategyType; private final IStateMachine.Registry registry; private final Map<ConsensusGroupId, IoTConsensusServerImpl> stateMachineMap = new ConcurrentHashMap<>(); @@ -127,6 +129,7 @@ this.thisNodeId = config.getThisNodeId(); this.storageDir = new File(config.getStorageDir()); this.recvSnapshotDirs = config.getRecvSnapshotDirs(); + this.recvFolderStrategyType = config.getDirectoryStrategyType(); this.config = config.getIotConsensusConfig(); this.registry = registry; this.service = new IoTConsensusRPCService(thisNode, config.getIotConsensusConfig()); @@ -195,6 +198,7 @@ new IoTConsensusServerImpl( path.toString(), recvSnapshotDirs, + recvFolderStrategyType, new Peer(consensusGroupId, thisNodeId, thisNode), new TreeSet<>(), registry.apply(consensusGroupId), @@ -309,6 +313,7 @@ new IoTConsensusServerImpl( path, recvSnapshotDirs, + recvFolderStrategyType, new Peer(groupId, thisNodeId, thisNode), new TreeSet<>(peers), registry.apply(groupId),
diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java index 5b818db..07d1f25 100644 --- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java +++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java
@@ -130,6 +130,18 @@ private final Condition stateMachineCondition = stateMachineLock.newCondition(); private final String storageDir; private FolderManager recvFolderManager = null; + + /** + * Per-snapshotId map of TsFile group key ({@code fileKey}) to the chosen receive folder. It keeps + * all companion files of one TsFile ({@code .tsfile}/{@code .tsfile.resource}/{@code + * .tsfile.mods2}/...) in the same receive folder, so the load phase can hard-link them inside a + * single data dir instead of falling back to a cross-disk copy. The {@code fileKey} rule matches + * {@code SnapshotLoader#createLinksFromSnapshotToSourceDir} so grouping is consistent end to end. + * Entries are removed once the snapshot is loaded or cleaned up. + */ + private final ConcurrentHashMap<String, ConcurrentHashMap<String, String>> + snapshotReceiveFolderMap = new ConcurrentHashMap<>(); + private final TreeSet<Peer> configuration; private final AtomicLong searchIndex; private final LogDispatcher logDispatcher; @@ -172,6 +184,7 @@ public IoTConsensusServerImpl( String storageDir, List<String> recvSnapshotDirs, + DirectoryStrategyType recvFolderStrategyType, Peer thisNode, TreeSet<Peer> configuration, IStateMachine stateMachine, @@ -191,9 +204,7 @@ snapshotDirs.add(storageDir); } - this.recvFolderManager = - new FolderManager( - snapshotDirs, DirectoryStrategyType.MIN_FOLDER_OCCUPIED_SPACE_FIRST_STRATEGY); + this.recvFolderManager = new FolderManager(snapshotDirs, recvFolderStrategyType); this.thisNode = thisNode; this.stateMachine = stateMachine; this.cacheQueueMap = new ConcurrentHashMap<>(); @@ -371,33 +382,42 @@ public void transmitSnapshot(Peer targetPeer) throws ConsensusGroupModifyPeerException { File snapshotDir = new File(storageDir, newSnapshotDirName); List<File> snapshotPaths = stateMachine.getSnapshotFiles(snapshotDir); - AtomicLong snapshotSizeSumAtomic = new AtomicLong(); - StringBuilder allFilesStr = new StringBuilder(); - snapshotPaths.forEach( - file -> { - long 
fileSize = file.length(); - snapshotSizeSumAtomic.addAndGet(fileSize); - allFilesStr - .append("\n") - .append(file.getName()) - .append(" ") - .append(humanReadableByteCountSI(fileSize)); - }); - final long snapshotSizeSum = snapshotSizeSumAtomic.get(); + long snapshotSizeSum = 0; + for (File file : snapshotPaths) { + snapshotSizeSum += file.length(); + } long transitedSnapshotSizeSum = 0; long transitedFilesNum = 0; long startTime = System.nanoTime(); + long lastProgressLogTime = startTime; + // Throttle the per-file progress log to at most once per this interval; a snapshot may contain + // hundreds of thousands of files, so one INFO line per file is itself a heavy cost. + long progressLogIntervalNs = + TimeUnit.MILLISECONDS.toNanos( + config.getReplication().getSnapshotTransmissionProgressLogIntervalMs()); logger.info( IoTConsensusMessages.SNAPSHOT_TRANSMISSION_START, snapshotPaths.size(), humanReadableByteCountSI(snapshotSizeSum), snapshotDir); - logger.info(IoTConsensusMessages.SNAPSHOT_TRANSMISSION_ALL_FILES, allFilesStr); + if (logger.isDebugEnabled()) { + StringBuilder allFilesStr = new StringBuilder(); + for (File file : snapshotPaths) { + allFilesStr + .append("\n") + .append(file.getName()) + .append(" ") + .append(humanReadableByteCountSI(file.length())); + } + logger.debug(IoTConsensusMessages.SNAPSHOT_TRANSMISSION_ALL_FILES, allFilesStr); + } + ByteBuffer fragmentBuffer = + ByteBuffer.allocate(SnapshotFragmentReader.DEFAULT_FILE_FRAGMENT_SIZE); try (SyncIoTConsensusServiceClient client = syncClientManager.borrowClient(targetPeer.getEndpoint())) { for (File file : snapshotPaths) { SnapshotFragmentReader reader = - new SnapshotFragmentReader(newSnapshotDirName, file.toPath()); + new SnapshotFragmentReader(newSnapshotDirName, file.toPath(), fragmentBuffer); try { while (reader.hasNext()) { // TODO: zero copy ? 
@@ -412,16 +432,20 @@ } transitedSnapshotSizeSum += reader.getTotalReadSize(); transitedFilesNum++; - logger.info( - IoTConsensusMessages.SNAPSHOT_TRANSMISSION_PROGRESS, - newSnapshotDirName, - transitedFilesNum, - snapshotPaths.size(), - humanReadableByteCountSI(transitedSnapshotSizeSum), - humanReadableByteCountSI(snapshotSizeSum), - CommonDateTimeUtils.convertMillisecondToDurationStr( - (System.nanoTime() - startTime) / 1_000_000), - file); + long now = System.nanoTime(); + if (now - lastProgressLogTime >= progressLogIntervalNs + || transitedFilesNum == snapshotPaths.size()) { + lastProgressLogTime = now; + logger.info( + IoTConsensusMessages.SNAPSHOT_TRANSMISSION_PROGRESS, + newSnapshotDirName, + transitedFilesNum, + snapshotPaths.size(), + humanReadableByteCountSI(transitedSnapshotSizeSum), + humanReadableByteCountSI(snapshotSizeSum), + CommonDateTimeUtils.convertMillisecondToDurationStr((now - startTime) / 1_000_000), + file); + } } finally { reader.close(); } @@ -448,11 +472,32 @@ return; } - recvFolderManager.getNextWithRetry( - folder -> { - writeSnapshotFragment(getSnapshotPath(folder, targetFilePath), fileChunk, fileOffset); - return null; - }); + // Place every companion file of the same TsFile into one receive folder. The fileKey rule + // (filename before the first '.') matches SnapshotLoader so the group stays together. The + // folder is selected at most once per fileKey via computeIfAbsent, which is safe under the + // concurrent IoTConsensusRPC-Processor receivers. 
+ String fileKey = getSnapshotFileKey(targetFilePath); + ConcurrentHashMap<String, String> folderMap = + snapshotReceiveFolderMap.computeIfAbsent(snapshotId, k -> new ConcurrentHashMap<>()); + String folder; + try { + folder = + folderMap.computeIfAbsent( + fileKey, + k -> { + try { + return recvFolderManager.getNextFolder(); + } catch (DiskSpaceInsufficientException ex) { + throw new RuntimeException(ex); + } + }); + } catch (RuntimeException re) { + if (re.getCause() instanceof DiskSpaceInsufficientException) { + throw (DiskSpaceInsufficientException) re.getCause(); + } + throw re; + } + writeSnapshotFragment(getSnapshotPath(folder, targetFilePath), fileChunk, fileOffset); } catch (IOException e) { throw new ConsensusGroupModifyPeerException( String.format(IoTConsensusMessages.ERROR_RECEIVING_SNAPSHOT, snapshotId), e); @@ -493,6 +538,14 @@ return originalFilePath.substring(originalFilePath.indexOf(snapshotId)); } + /** + * Groups companion files of one TsFile. Uses the same rule as {@code + * SnapshotLoader#createLinksFromSnapshotToSourceDir}: the file name up to the first {@code '.'}. + */ + private String getSnapshotFileKey(String targetFilePath) { + return new File(targetFilePath).getName().split("\\.")[0]; + } + private void clearOldSnapshot() { File directory = new File(storageDir); File[] versionFiles = directory.listFiles((dir, name) -> name.startsWith(SNAPSHOT_DIR_NAME)); @@ -526,17 +579,22 @@ // Note: an empty region produces a snapshot with zero fragments, so none of the receive folders // contains it. That is a legitimate (no-op) load, not a failure, so an absent snapshot must not // be reported as failure here. 
- List<File> snapshotDirs = new ArrayList<>(); - for (String dir : recvFolderManager.getFolders()) { - File snapshotDir = getSnapshotPath(dir, snapshotId); - if (snapshotDir.exists()) { - snapshotDirs.add(snapshotDir); + try { + List<File> snapshotDirs = new ArrayList<>(); + for (String dir : recvFolderManager.getFolders()) { + File snapshotDir = getSnapshotPath(dir, snapshotId); + if (snapshotDir.exists()) { + snapshotDirs.add(snapshotDir); + } } + if (snapshotDirs.isEmpty()) { + return true; + } + return stateMachine.loadSnapshot(snapshotDirs); + } finally { + // Receiving is finished for this snapshot; drop its receive-folder mapping. + snapshotReceiveFolderMap.remove(snapshotId); } - if (snapshotDirs.isEmpty()) { - return true; - } - return stateMachine.loadSnapshot(snapshotDirs); } private File getSnapshotPath(String curStorageDir, String snapshotRelativePath) { @@ -1178,6 +1236,7 @@ } public void cleanupSnapshot(String snapshotId) throws ConsensusGroupModifyPeerException { + snapshotReceiveFolderMap.remove(snapshotId); List<String> allDirs = new ArrayList<>(Collections.singletonList(storageDir)); allDirs.addAll(recvFolderManager.getFolders()); for (String dir : allDirs) {
diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/snapshot/SnapshotFragmentReader.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/snapshot/SnapshotFragmentReader.java index 3331829..f9c905e 100644 --- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/snapshot/SnapshotFragmentReader.java +++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/snapshot/SnapshotFragmentReader.java
@@ -27,7 +27,7 @@ public class SnapshotFragmentReader { - private static final int DEFAULT_FILE_FRAGMENT_SIZE = 10 * 1024 * 1024; + public static final int DEFAULT_FILE_FRAGMENT_SIZE = 10 * 1024 * 1024; private final String snapshotId; private final String filePath; private final SeekableByteChannel fileChannel; @@ -36,12 +36,20 @@ private long totalReadSize; private SnapshotFragment cachedSnapshotFragment; - public SnapshotFragmentReader(String snapshotId, Path path) throws IOException { + /** + * The {@code buf} is supplied (and owned) by the caller so a single 10MB buffer can be reused + * across every file of a snapshot transmission. Allocating a fresh 10MB buffer per file is + * extremely wasteful when a snapshot contains hundreds of thousands of tiny files, multiplying GC + * pressure and allocation cost. The buffer is fully reset via {@link ByteBuffer#clear()} on each + * {@link #hasNext()} call, and each fragment is serialized synchronously before the next read, so + * sharing it across files (and across readers) is safe. + */ + public SnapshotFragmentReader(String snapshotId, Path path, ByteBuffer buf) throws IOException { this.snapshotId = snapshotId; this.filePath = path.toAbsolutePath().toString(); this.fileSize = Files.size(path); this.fileChannel = Files.newByteChannel(path); - this.buf = ByteBuffer.allocate(DEFAULT_FILE_FRAGMENT_SIZE); + this.buf = buf; } public boolean hasNext() throws IOException {
diff --git a/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/iot/snapshot/SnapshotFragmentReaderTest.java b/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/iot/snapshot/SnapshotFragmentReaderTest.java new file mode 100644 index 0000000..36048d4 --- /dev/null +++ b/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/iot/snapshot/SnapshotFragmentReaderTest.java
@@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.consensus.iot.snapshot; + +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.Random; + +/** + * Regression test for the snapshot-transmission read path. The sending side reuses a single, + * caller-supplied {@link ByteBuffer} across every file of a snapshot (instead of allocating a fresh + * 10MB buffer per file), so this verifies that (1) a single buffer reused across many readers still + * reconstructs every file byte-for-byte, and (2) the buffer is genuinely shared rather than + * re-allocated per file. 
+ */ +public class SnapshotFragmentReaderTest { + + @Rule public final TemporaryFolder temporaryFolder = new TemporaryFolder(); + + @Test + public void reusedBufferReadsEveryFileCorrectly() throws IOException { + final Random random = new Random(42); + // Deliberately use a tiny buffer so files span multiple fragments, and use file sizes that are + // not multiples of the buffer size to exercise the partial-final-fragment path. + final int bufferSize = 16; + final int[] fileSizes = {0, 1, bufferSize, bufferSize + 1, 5 * bufferSize + 7}; + + byte[][] contents = new byte[fileSizes.length][]; + Path[] paths = new Path[fileSizes.length]; + for (int i = 0; i < fileSizes.length; i++) { + contents[i] = new byte[fileSizes[i]]; + random.nextBytes(contents[i]); + paths[i] = temporaryFolder.newFile("file-" + i).toPath(); + Files.write(paths[i], contents[i]); + } + + final ByteBuffer sharedBuffer = ByteBuffer.allocate(bufferSize); + ByteBuffer firstReusedChunk = null; + + for (int i = 0; i < paths.length; i++) { + SnapshotFragmentReader reader = new SnapshotFragmentReader("snap", paths[i], sharedBuffer); + try { + ByteArrayOutputStream reconstructed = new ByteArrayOutputStream(); + while (reader.hasNext()) { + SnapshotFragment fragment = reader.next(); + // Every fragment must be backed by the one shared buffer, proving it is reused and not + // re-allocated per file. + Assert.assertSame(sharedBuffer, fragment.getFileChunk()); + if (firstReusedChunk == null) { + firstReusedChunk = fragment.getFileChunk(); + } else { + Assert.assertSame(firstReusedChunk, fragment.getFileChunk()); + } + + // Drain the fragment immediately, mirroring how the sender serializes each fragment + // synchronously before the next read overwrites the shared buffer. 
+ ByteBuffer chunk = fragment.getFileChunk(); + byte[] bytes = new byte[chunk.remaining()]; + chunk.get(bytes); + reconstructed.write(bytes); + } + Assert.assertArrayEquals( + "File " + i + " was not reconstructed correctly", + contents[i], + reconstructed.toByteArray()); + Assert.assertEquals(contents[i].length, reader.getTotalReadSize()); + } finally { + reader.close(); + } + } + } +}
diff --git a/iotdb-core/datanode/src/main/i18n/en/org/apache/iotdb/db/i18n/StorageEngineMessages.java b/iotdb-core/datanode/src/main/i18n/en/org/apache/iotdb/db/i18n/StorageEngineMessages.java index cb90fac..cc708e7 100644 --- a/iotdb-core/datanode/src/main/i18n/en/org/apache/iotdb/db/i18n/StorageEngineMessages.java +++ b/iotdb-core/datanode/src/main/i18n/en/org/apache/iotdb/db/i18n/StorageEngineMessages.java
@@ -489,6 +489,8 @@ public static final String RELEASE_DATA_CACHE_MEMORY_BLOCK = "Release Data Cache Memory Block {}"; public static final String START_DATA_TYPE_CONVERSION_DOT = "Start data type conversion for LoadTsFileStatement: {}."; public static final String START_DATA_TYPE_CONVERSION = "Start data type conversion for LoadTsFileStatement: {}"; + public static final String INTERRUPTED_WAITING_TABLET_CONVERSION_SLOT = + "Interrupted while waiting for tablet conversion slot: "; public static final String FAIL_TO_LOAD_TSFILE_TO_ACTIVE_DIR = "Fail to load tsfile to Active dir"; public static final String FAIL_TO_LOAD_DISK_SPACE = "Fail to load disk space of file {}"; public static final String LOAD_ACTIVE_LISTENING_DIR_NOT_SET = "Load active listening dir is not set."; @@ -521,6 +523,8 @@ public static final String ERROR_EXECUTING_ACTIVE_LOAD_JOB = "Error occurred when executing active load periodical job."; public static final String ACTIVE_LOAD_EXECUTOR_STARTED = "Active load periodical jobs executor is started successfully."; public static final String ACTIVE_LOAD_EXECUTOR_STOPPED = "Active load periodical jobs executor is stopped successfully."; + public static final String ACTIVE_LOAD_TEMPORARILY_UNAVAILABLE = + "Rejecting auto load tsfile {} (isGeneratedByPipe = {}) due to temporary unavailability, will retry later. Status: {}"; public static final String ERROR_MOVING_FILE_TO_FAIL_DIR = "Error occurred during moving file {} to fail directory."; public static final String FAILED_COUNT_FILES_IN_FAIL_DIR = "Failed to count failed files in fail directory.";
diff --git a/iotdb-core/datanode/src/main/i18n/zh/org/apache/iotdb/db/i18n/StorageEngineMessages.java b/iotdb-core/datanode/src/main/i18n/zh/org/apache/iotdb/db/i18n/StorageEngineMessages.java index e488c40..792909a 100644 --- a/iotdb-core/datanode/src/main/i18n/zh/org/apache/iotdb/db/i18n/StorageEngineMessages.java +++ b/iotdb-core/datanode/src/main/i18n/zh/org/apache/iotdb/db/i18n/StorageEngineMessages.java
@@ -489,6 +489,8 @@ public static final String RELEASE_DATA_CACHE_MEMORY_BLOCK = "释放数据缓存内存块 {}"; public static final String START_DATA_TYPE_CONVERSION_DOT = "开始对 LoadTsFileStatement: {} 进行数据类型转换。"; public static final String START_DATA_TYPE_CONVERSION = "开始对 LoadTsFileStatement: {} 进行数据类型转换"; + public static final String INTERRUPTED_WAITING_TABLET_CONVERSION_SLOT = + "等待 tablet 转换槽位时被中断: "; public static final String FAIL_TO_LOAD_TSFILE_TO_ACTIVE_DIR = "加载 TsFile 到 Active 目录失败"; public static final String FAIL_TO_LOAD_DISK_SPACE = "获取文件 {} 的磁盘空间失败"; public static final String LOAD_ACTIVE_LISTENING_DIR_NOT_SET = "未设置加载 Active 监听目录。"; @@ -521,6 +523,8 @@ public static final String ERROR_EXECUTING_ACTIVE_LOAD_JOB = "执行 Active 加载定期任务时发生错误。"; public static final String ACTIVE_LOAD_EXECUTOR_STARTED = "Active 加载定期任务执行器已成功启动。"; public static final String ACTIVE_LOAD_EXECUTOR_STOPPED = "Active 加载定期任务执行器已成功停止。"; + public static final String ACTIVE_LOAD_TEMPORARILY_UNAVAILABLE = + "拒绝自动加载 TsFile {} (isGeneratedByPipe = {}),原因是系统暂时不可用,将稍后重试。状态: {}"; public static final String ERROR_MOVING_FILE_TO_FAIL_DIR = "将文件 {} 移动到失败目录时发生错误。"; public static final String FAILED_COUNT_FILES_IN_FAIL_DIR = "统计失败目录中的失败文件数量失败。";
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java index c20122b..429efe5 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -89,7 +89,9 @@ private static final Logger logger = LoggerFactory.getLogger(IoTDBConfig.class); private static final String MULTI_DIR_STRATEGY_PREFIX = "org.apache.iotdb.commons.disk.strategy."; private static final String[] CLUSTER_ALLOWED_MULTI_DIR_STRATEGIES = - new String[] {"SequenceStrategy", "MaxDiskUsableSpaceFirstStrategy"}; + new String[] { + "SequenceStrategy", "MaxDiskUsableSpaceFirstStrategy", "MinFolderOccupiedSpaceFirstStrategy" + }; private static final String DEFAULT_MULTI_DIR_STRATEGY = "SequenceStrategy"; private static final String STORAGE_GROUP_MATCHER = "([a-zA-Z0-9`_.\\-\\u2E80-\\u9FFF]+)"; @@ -1099,6 +1101,9 @@ private int maxPendingBatchesNum = 5; private double maxMemoryRatioForQueue = 0.6; private long regionMigrationSpeedLimitBytesPerSecond = 48 * 1024 * 1024L; + // Throttle the per-file snapshot-transmission progress log in IoTConsensus to at most once per + // this interval (ms). A value <= 0 logs every file. + private long dataRegionIotSnapshotTransmissionProgressLogIntervalMs = 5000L; // IoTConsensusV2 Config private int iotConsensusV2PipelineSize = 5; @@ -1273,6 +1278,16 @@ this.regionMigrationSpeedLimitBytesPerSecond = regionMigrationSpeedLimitBytesPerSecond; } + public long getDataRegionIotSnapshotTransmissionProgressLogIntervalMs() { + return dataRegionIotSnapshotTransmissionProgressLogIntervalMs; + } + + public void setDataRegionIotSnapshotTransmissionProgressLogIntervalMs( + long dataRegionIotSnapshotTransmissionProgressLogIntervalMs) { + this.dataRegionIotSnapshotTransmissionProgressLogIntervalMs = + dataRegionIotSnapshotTransmissionProgressLogIntervalMs; + } + public int getIotConsensusV2PipelineSize() { return iotConsensusV2PipelineSize; }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java index 59878f6..147c334 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -369,7 +369,8 @@ try { conf.checkMultiDirStrategyClassName(); } catch (Exception e) { - conf.setMultiDirStrategyClassName(oldMultiDirStrategyClassName.trim()); + conf.setMultiDirStrategyClassName( + oldMultiDirStrategyClassName == null ? null : oldMultiDirStrategyClassName.trim()); throw e; } @@ -1285,6 +1286,12 @@ "region_migration_speed_limit_bytes_per_second", ConfigurationFileUtils.getConfigurationDefaultValue( "region_migration_speed_limit_bytes_per_second")))); + conf.setDataRegionIotSnapshotTransmissionProgressLogIntervalMs( + Long.parseLong( + properties.getProperty( + "data_region_iot_snapshot_transmission_progress_log_interval_ms", + ConfigurationFileUtils.getConfigurationDefaultValue( + "data_region_iot_snapshot_transmission_progress_log_interval_ms")))); conf.setKeepSameDiskWhenLoadingSnapshot( Boolean.parseBoolean( properties.getProperty(
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/DataRegionConsensusImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/DataRegionConsensusImpl.java index 080cd9c..a4c8e00 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/DataRegionConsensusImpl.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/DataRegionConsensusImpl.java
@@ -25,6 +25,7 @@ import org.apache.iotdb.commons.conf.CommonDescriptor; import org.apache.iotdb.commons.consensus.ConsensusGroupId; import org.apache.iotdb.commons.consensus.DataRegionId; +import org.apache.iotdb.commons.disk.strategy.DirectoryStrategyType; import org.apache.iotdb.commons.memory.IMemoryBlock; import org.apache.iotdb.commons.memory.MemoryBlockType; import org.apache.iotdb.commons.pipe.agent.plugin.builtin.BuiltinPipePlugin; @@ -141,6 +142,9 @@ .setThisNode(new TEndPoint(CONF.getInternalAddress(), CONF.getDataRegionConsensusPort())) .setStorageDir(CONF.getDataRegionConsensusDir()) .setRecvSnapshotDirs(Arrays.asList(CONF.getLocalDataDirs())) + // IoTConsensus always balances received snapshot files by least occupied space, + // independent of the global dn_multi_dir_strategy. + .setDirectoryStrategyType(DirectoryStrategyType.MIN_FOLDER_OCCUPIED_SPACE_FIRST_STRATEGY) .setConsensusGroupType(TConsensusGroupType.DataRegion) .setIoTConsensusConfig( IoTConsensusConfig.newBuilder() @@ -174,6 +178,8 @@ COMMON_CONF.getSubscriptionConsensusWalRetentionSizeInBytes()) .setSubscriptionWalRetentionTimeMs( COMMON_CONF.getSubscriptionConsensusWalRetentionTimeMs()) + .setSnapshotTransmissionProgressLogIntervalMs( + CONF.getDataRegionIotSnapshotTransmissionProgressLogIntervalMs()) .build()) .build()) .setIoTConsensusV2Config(
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java index 033264b..3509e6b 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
@@ -109,6 +109,7 @@ import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_HISTORY_END_TIME_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_HISTORY_START_TIME_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_PATH_EXCLUSION_KEY; +import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_PATH_INCLUSION_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_PATH_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_PATTERN_EXCLUSION_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_PATTERN_INCLUSION_KEY; @@ -121,6 +122,7 @@ import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.SOURCE_HISTORY_END_TIME_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.SOURCE_HISTORY_START_TIME_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.SOURCE_PATH_EXCLUSION_KEY; +import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.SOURCE_PATH_INCLUSION_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.SOURCE_PATH_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.SOURCE_PATTERN_EXCLUSION_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.SOURCE_PATTERN_INCLUSION_KEY; @@ -913,6 +915,8 @@ || sourceParameters.hasAnyAttributes( EXTRACTOR_PATH_KEY, SOURCE_PATH_KEY, + EXTRACTOR_PATH_INCLUSION_KEY, + SOURCE_PATH_INCLUSION_KEY, EXTRACTOR_PATTERN_INCLUSION_KEY, SOURCE_PATTERN_INCLUSION_KEY, EXTRACTOR_PATH_EXCLUSION_KEY,
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/client/IoTDBDataNodeAsyncClientManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/client/IoTDBDataNodeAsyncClientManager.java index 5c38c3a..4456550 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/client/IoTDBDataNodeAsyncClientManager.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/client/IoTDBDataNodeAsyncClientManager.java
@@ -58,6 +58,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.Collectors; import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_LOAD_BALANCE_PRIORITY_STRATEGY; import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_LOAD_BALANCE_RANDOM_STRATEGY; @@ -71,11 +72,11 @@ private final Set<TEndPoint> endPointSet; - private static final Map<String, Integer> RECEIVER_ATTRIBUTES_REF_COUNT = - new ConcurrentHashMap<>(); + private static final Map<String, Integer> CLIENT_RESOURCE_REF_COUNT = new ConcurrentHashMap<>(); private final String receiverAttributes; + private final String clientResourceKey; - // receiverAttributes -> IClientManager<TEndPoint, AsyncPipeDataTransferServiceClient> + // clientResourceKey -> IClientManager<TEndPoint, AsyncPipeDataTransferServiceClient> private static final Map<String, IClientManager<TEndPoint, AsyncPipeDataTransferServiceClient>> ASYNC_PIPE_DATA_TRANSFER_CLIENT_MANAGER_HOLDER = new ConcurrentHashMap<>(); private static final Map<String, ExecutorService> TS_FILE_ASYNC_EXECUTOR_HOLDER = @@ -129,10 +130,11 @@ shouldMarkAsPipeRequest, isTSFileUsed, skipIfNoPrivileges); + clientResourceKey = generateClientResourceKey(receiverAttributes, endPoints); synchronized (IoTDBDataNodeAsyncClientManager.class) { - if (!ASYNC_PIPE_DATA_TRANSFER_CLIENT_MANAGER_HOLDER.containsKey(receiverAttributes)) { + if (!ASYNC_PIPE_DATA_TRANSFER_CLIENT_MANAGER_HOLDER.containsKey(clientResourceKey)) { ASYNC_PIPE_DATA_TRANSFER_CLIENT_MANAGER_HOLDER.putIfAbsent( - receiverAttributes, + clientResourceKey, new IClientManager.Factory<TEndPoint, AsyncPipeDataTransferServiceClient>() .createClientManager( isTSFileUsed @@ -140,21 +142,21 @@ .AsyncPipeTsFileDataTransferServiceClientPoolFactory() : new ClientPoolFactory.AsyncPipeDataTransferServiceClientPoolFactory())); } - 
endPoint2Client = ASYNC_PIPE_DATA_TRANSFER_CLIENT_MANAGER_HOLDER.get(receiverAttributes); + endPoint2Client = ASYNC_PIPE_DATA_TRANSFER_CLIENT_MANAGER_HOLDER.get(clientResourceKey); if (isTSFileUsed) { - if (!TS_FILE_ASYNC_EXECUTOR_HOLDER.containsKey(receiverAttributes)) { + if (!TS_FILE_ASYNC_EXECUTOR_HOLDER.containsKey(clientResourceKey)) { TS_FILE_ASYNC_EXECUTOR_HOLDER.putIfAbsent( - receiverAttributes, + clientResourceKey, IoTDBThreadPoolFactory.newFixedThreadPool( PipeConfig.getInstance().getPipeRealTimeQueueMaxWaitingTsFileSize(), ThreadName.PIPE_TSFILE_ASYNC_SEND_POOL.getName() + "-" + id.getAndIncrement())); } - executor = TS_FILE_ASYNC_EXECUTOR_HOLDER.get(receiverAttributes); + executor = TS_FILE_ASYNC_EXECUTOR_HOLDER.get(clientResourceKey); } - RECEIVER_ATTRIBUTES_REF_COUNT.compute( - receiverAttributes, (attributes, refCount) -> refCount == null ? 1 : refCount + 1); + CLIENT_RESOURCE_REF_COUNT.compute( + clientResourceKey, (attributes, refCount) -> refCount == null ? 1 : refCount + 1); } switch (loadBalanceStrategy) { @@ -421,30 +423,30 @@ public void close() { isClosed = true; synchronized (IoTDBDataNodeAsyncClientManager.class) { - RECEIVER_ATTRIBUTES_REF_COUNT.computeIfPresent( - receiverAttributes, + CLIENT_RESOURCE_REF_COUNT.computeIfPresent( + clientResourceKey, (attributes, refCount) -> { if (refCount <= 1) { final IClientManager<TEndPoint, AsyncPipeDataTransferServiceClient> clientManager = - ASYNC_PIPE_DATA_TRANSFER_CLIENT_MANAGER_HOLDER.remove(receiverAttributes); + ASYNC_PIPE_DATA_TRANSFER_CLIENT_MANAGER_HOLDER.remove(clientResourceKey); if (clientManager != null) { try { clientManager.close(); LOGGER.info( DataNodePipeMessages .CLOSED_ASYNCPIPEDATATRANSFERSERVICECLIENTMANAGER_FOR_RECEIVER_ATTRIBUTES, - receiverAttributes); + clientResourceKey); } catch (final Exception e) { LOGGER.warn( DataNodePipeMessages .FAILED_TO_CLOSE_ASYNCPIPEDATATRANSFERSERVICECLIENTMANAGER_FOR_RECEIVER_ATTRIBUTE, - receiverAttributes, + clientResourceKey, e); } } 
final ExecutorService executor = - TS_FILE_ASYNC_EXECUTOR_HOLDER.remove(receiverAttributes); + TS_FILE_ASYNC_EXECUTOR_HOLDER.remove(clientResourceKey); if (executor != null) { try { executor.shutdown(); @@ -552,4 +554,16 @@ private void markHealthy(TEndPoint endPoint) { unhealthyEndPointMap.remove(endPoint); } + + private static String generateClientResourceKey( + final String receiverAttributes, final List<TEndPoint> endPoints) { + return String.format( + "%s-%s", + receiverAttributes, + endPoints.stream() + .map(endPoint -> String.format("%s:%s", endPoint.getIp(), endPoint.getPort())) + .distinct() + .sorted() + .collect(Collectors.joining(",", "[", "]"))); + } }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/IoTDBDataRegionSource.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/IoTDBDataRegionSource.java index 813d55e..44fae46 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/IoTDBDataRegionSource.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/IoTDBDataRegionSource.java
@@ -85,6 +85,7 @@ import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_MODE_STRICT_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_MODS_ENABLE_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_MODS_KEY; +import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_PATH_INCLUSION_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_PATH_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_PATTERN_FORMAT_IOTDB_VALUE; import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_PATTERN_FORMAT_KEY; @@ -119,6 +120,7 @@ import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.SOURCE_MODE_STRICT_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.SOURCE_MODS_ENABLE_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.SOURCE_MODS_KEY; +import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.SOURCE_PATH_INCLUSION_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.SOURCE_PATH_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.SOURCE_PATTERN_FORMAT_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.SOURCE_PATTERN_INCLUSION_KEY; @@ -197,6 +199,8 @@ SOURCE_PATH_KEY, EXTRACTOR_PATTERN_KEY, SOURCE_PATTERN_KEY, + EXTRACTOR_PATH_INCLUSION_KEY, + SOURCE_PATH_INCLUSION_KEY, EXTRACTOR_PATTERN_INCLUSION_KEY, SOURCE_PATTERN_INCLUSION_KEY)) { throw new PipeException(DataNodePipeMessages.THE_PIPE_CANNOT_EXTRACT_TREE_MODEL_DATA);
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/TreeAggregator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/TreeAggregator.java index 07b482b..2e8650f 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/TreeAggregator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/TreeAggregator.java
@@ -20,7 +20,6 @@ package org.apache.iotdb.db.queryengine.execution.aggregation; import org.apache.iotdb.calc.execution.aggregation.Accumulator; -import org.apache.iotdb.calc.metric.QueryExecutionMetricSet; import org.apache.iotdb.commons.queryengine.plan.planner.plan.parameter.InputLocation; import org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.AggregationStep; @@ -35,8 +34,6 @@ import java.util.List; import static com.google.common.base.Preconditions.checkArgument; -import static org.apache.iotdb.calc.metric.QueryExecutionMetricSet.AGGREGATION_FROM_RAW_DATA; -import static org.apache.iotdb.calc.metric.QueryExecutionMetricSet.AGGREGATION_FROM_STATISTICS; public class TreeAggregator { @@ -44,8 +41,6 @@ // In some intermediate result input, inputLocation[] should include two columns protected List<InputLocation[]> inputLocationList; protected final AggregationStep step; - public static final QueryExecutionMetricSet QUERY_EXECUTION_METRICS = - QueryExecutionMetricSet.getInstance(); // Used for SeriesAggregateScanOperator public TreeAggregator(Accumulator accumulator, AggregationStep step) { @@ -65,57 +60,44 @@ // Used for SeriesAggregateScanOperator and RawDataAggregateOperator public void processTsBlock(TsBlock tsBlock, BitMap bitMap) { - long startTime = System.nanoTime(); - try { - checkArgument( - step.isInputRaw(), - "Step in SeriesAggregateScanOperator and RawDataAggregateOperator can only process raw input"); - for (InputLocation[] inputLocations : inputLocationList) { - Column[] timeAndValueColumn = new Column[1 + inputLocations.length]; - timeAndValueColumn[0] = tsBlock.getTimeColumn(); - for (int i = 0; i < inputLocations.length; i++) { - checkArgument( - inputLocations[i].getTsBlockIndex() == 0, - "RawDataAggregateOperator can only process one tsBlock input."); - int index = inputLocations[i].getValueColumnIndex(); - // for count_time, time column is also its value column - // for max_by, the input column can also be time column. 
- timeAndValueColumn[1 + i] = - index == -1 ? timeAndValueColumn[0] : tsBlock.getColumn(index); - } - accumulator.addInput(timeAndValueColumn, bitMap); + checkArgument( + step.isInputRaw(), + "Step in SeriesAggregateScanOperator and RawDataAggregateOperator can only process raw input"); + for (InputLocation[] inputLocations : inputLocationList) { + Column[] timeAndValueColumn = new Column[1 + inputLocations.length]; + timeAndValueColumn[0] = tsBlock.getTimeColumn(); + for (int i = 0; i < inputLocations.length; i++) { + checkArgument( + inputLocations[i].getTsBlockIndex() == 0, + "RawDataAggregateOperator can only process one tsBlock input."); + int index = inputLocations[i].getValueColumnIndex(); + // for count_time, time column is also its value column + // for max_by, the input column can also be time column. + timeAndValueColumn[1 + i] = index == -1 ? timeAndValueColumn[0] : tsBlock.getColumn(index); } - } finally { - QUERY_EXECUTION_METRICS.recordExecutionCost( - AGGREGATION_FROM_RAW_DATA, System.nanoTime() - startTime); + accumulator.addInput(timeAndValueColumn, bitMap); } } // Used for AggregateOperator public void processTsBlocks(TsBlock[] tsBlock) { - long startTime = System.nanoTime(); - try { - checkArgument(!step.isInputRaw(), "Step in AggregateOperator cannot process raw input"); - if (step.isInputFinal()) { - checkArgument(inputLocationList.size() == 1, "Final output can only be single column"); - Column finalResult = - tsBlock[inputLocationList.get(0)[0].getTsBlockIndex()].getColumn( - inputLocationList.get(0)[0].getValueColumnIndex()); - accumulator.setFinal(finalResult); - } else { - for (InputLocation[] inputLocations : inputLocationList) { - Column[] columns = new Column[inputLocations.length]; - for (int i = 0; i < inputLocations.length; i++) { - columns[i] = - tsBlock[inputLocations[i].getTsBlockIndex()].getColumn( - inputLocations[i].getValueColumnIndex()); - } - accumulator.addIntermediate(columns); + checkArgument(!step.isInputRaw(), "Step in 
AggregateOperator cannot process raw input"); + if (step.isInputFinal()) { + checkArgument(inputLocationList.size() == 1, "Final output can only be single column"); + Column finalResult = + tsBlock[inputLocationList.get(0)[0].getTsBlockIndex()].getColumn( + inputLocationList.get(0)[0].getValueColumnIndex()); + accumulator.setFinal(finalResult); + } else { + for (InputLocation[] inputLocations : inputLocationList) { + Column[] columns = new Column[inputLocations.length]; + for (int i = 0; i < inputLocations.length; i++) { + columns[i] = + tsBlock[inputLocations[i].getTsBlockIndex()].getColumn( + inputLocations[i].getValueColumnIndex()); } + accumulator.addIntermediate(columns); } - } finally { - QUERY_EXECUTION_METRICS.recordExecutionCost( - AGGREGATION_FROM_RAW_DATA, System.nanoTime() - startTime); } } @@ -129,16 +111,10 @@ /** Used for SeriesAggregateScanOperator. */ public void processStatistics(Statistics timeStatistics, Statistics[] valueStatistics) { - long startTime = System.nanoTime(); - try { - for (InputLocation[] inputLocations : inputLocationList) { - int valueIndex = inputLocations[0].getValueColumnIndex(); - // valueIndex == -1 means it is count_time, we need to use timeStatistics - accumulator.addStatistics(valueIndex == -1 ? timeStatistics : valueStatistics[valueIndex]); - } - } finally { - QUERY_EXECUTION_METRICS.recordExecutionCost( - AGGREGATION_FROM_STATISTICS, System.nanoTime() - startTime); + for (InputLocation[] inputLocations : inputLocationList) { + int valueIndex = inputLocations[0].getValueColumnIndex(); + // valueIndex == -1 means it is count_time, we need to use timeStatistics + accumulator.addStatistics(valueIndex == -1 ? timeStatistics : valueStatistics[valueIndex]); } }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java index 47eab01..3ffa7e6 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java
@@ -21,6 +21,7 @@ import org.apache.iotdb.calc.exception.MemoryNotEnoughException; import org.apache.iotdb.calc.exception.QueryProcessException; +import org.apache.iotdb.calc.metric.QueryExecutionMetricSet; import org.apache.iotdb.calc.plan.planner.memory.MemoryReservationManager; import org.apache.iotdb.common.rpc.thrift.TSStatus; import org.apache.iotdb.commons.audit.UserEntity; @@ -86,6 +87,9 @@ import java.util.function.Predicate; import java.util.stream.Collectors; +import static org.apache.iotdb.calc.metric.QueryExecutionMetricSet.AGGREGATION_FROM_RAW_DATA; +import static org.apache.iotdb.calc.metric.QueryExecutionMetricSet.AGGREGATION_FROM_STATISTICS; +import static org.apache.iotdb.calc.metric.QueryExecutionMetricSet.AGGREGATION_OPERATOR_FROM_RAW_DATA; import static org.apache.iotdb.commons.utils.ErrorHandlingCommonUtils.getRootCause; import static org.apache.iotdb.db.queryengine.metric.DriverSchedulerMetricSet.BLOCK_QUEUED_TIME; import static org.apache.iotdb.db.queryengine.metric.DriverSchedulerMetricSet.READY_QUEUED_TIME; @@ -168,6 +172,9 @@ private int initQueryDataSourceRetryCount = 0; private final AtomicLong readyQueueTime = new AtomicLong(0); private final AtomicLong blockQueueTime = new AtomicLong(0); + private final AtomicLong scanAggregationFromRawDataCost = new AtomicLong(0); + private final AtomicLong scanAggregationFromStatisticsCost = new AtomicLong(0); + private final AtomicLong aggregationOperatorFromRawDataCost = new AtomicLong(0); private long unclosedSeqFileNum = 0; private long unclosedUnseqFileNum = 0; private long closedSeqFileNum = 0; @@ -1117,6 +1124,8 @@ QueryRelatedResourceMetricSet.getInstance().updateFragmentInstanceTime(durationTime); + recordAggregationCostToMetric(); + QueryResourceMetricSet.getInstance() .recordInitQueryResourceRetryCount(getInitQueryDataSourceRetryCount()); @@ -1264,6 +1273,49 @@ blockQueueTime.addAndGet(time); } + public void recordScanAggregationFromRawDataCost(long costTimeInNanos) { + 
addCost(scanAggregationFromRawDataCost, costTimeInNanos); + } + + public void recordScanAggregationFromStatisticsCost(long costTimeInNanos) { + addCost(scanAggregationFromStatisticsCost, costTimeInNanos); + } + + public void recordAggregationOperatorFromRawDataCost(long costTimeInNanos) { + addCost(aggregationOperatorFromRawDataCost, costTimeInNanos); + } + + private void addCost(AtomicLong cost, long costTimeInNanos) { + if (costTimeInNanos > 0) { + cost.addAndGet(costTimeInNanos); + } + } + + long drainScanAggregationFromRawDataCost() { + return scanAggregationFromRawDataCost.getAndSet(0); + } + + long drainScanAggregationFromStatisticsCost() { + return scanAggregationFromStatisticsCost.getAndSet(0); + } + + long drainAggregationOperatorFromRawDataCost() { + return aggregationOperatorFromRawDataCost.getAndSet(0); + } + + void recordAggregationCostToMetric() { + recordAggregationCost(AGGREGATION_FROM_RAW_DATA, drainScanAggregationFromRawDataCost()); + recordAggregationCost(AGGREGATION_FROM_STATISTICS, drainScanAggregationFromStatisticsCost()); + recordAggregationCost( + AGGREGATION_OPERATOR_FROM_RAW_DATA, drainAggregationOperatorFromRawDataCost()); + } + + private void recordAggregationCost(String stage, long costTimeInNanos) { + if (costTimeInNanos > 0) { + QueryExecutionMetricSet.getInstance().recordExecutionCost(stage, costTimeInNanos); + } + } + public long getReadyQueueTime() { return readyQueueTime.get(); }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/OperatorContext.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/OperatorContext.java index d976155..584cb4f 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/OperatorContext.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/OperatorContext.java
@@ -72,6 +72,33 @@ } @Override + public void recordScanAggregationFromRawDataCost(long costTimeInNanos) { + if (driverContext != null && driverContext.getFragmentInstanceContext() != null) { + driverContext + .getFragmentInstanceContext() + .recordScanAggregationFromRawDataCost(costTimeInNanos); + } + } + + @Override + public void recordScanAggregationFromStatisticsCost(long costTimeInNanos) { + if (driverContext != null && driverContext.getFragmentInstanceContext() != null) { + driverContext + .getFragmentInstanceContext() + .recordScanAggregationFromStatisticsCost(costTimeInNanos); + } + } + + @Override + public void recordAggregationOperatorFromRawDataCost(long costTimeInNanos) { + if (driverContext != null && driverContext.getFragmentInstanceContext() != null) { + driverContext + .getFragmentInstanceContext() + .recordAggregationOperatorFromRawDataCost(costTimeInNanos); + } + } + + @Override public MemoryReservationManager getMemoryReservationContext() { return getInstanceContext().getMemoryReservationContext(); }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/AggregationOperator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/AggregationOperator.java index 51f6e1c..eec1c01 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/AggregationOperator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/AggregationOperator.java
@@ -154,19 +154,24 @@ private void calculateNextAggregationResult() { // Consume current input tsBlocks - for (TreeAggregator aggregator : aggregators) { - aggregator.processTsBlocks(inputTsBlocks); - } - - for (int i = 0; i < inputOperatorsCount; i++) { - inputTsBlocks[i] = inputTsBlocks[i].skipFirst(); - if (inputTsBlocks[i].isEmpty()) { - inputTsBlocks[i] = null; + long startTime = System.nanoTime(); + try { + for (TreeAggregator aggregator : aggregators) { + aggregator.processTsBlocks(inputTsBlocks); } - } - // Update result using aggregators - updateResultTsBlock(); + for (int i = 0; i < inputOperatorsCount; i++) { + inputTsBlocks[i] = inputTsBlocks[i].skipFirst(); + if (inputTsBlocks[i].isEmpty()) { + inputTsBlocks[i] = null; + } + } + + // Update result using aggregators + updateResultTsBlock(); + } finally { + operatorContext.recordAggregationOperatorFromRawDataCost(System.nanoTime() - startTime); + } } private void updateResultTsBlock() {
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/RawDataAggregationOperator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/RawDataAggregationOperator.java index 3018adf..385cc48 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/RawDataAggregationOperator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/RawDataAggregationOperator.java
@@ -150,78 +150,83 @@ @SuppressWarnings({"squid:S3776", "squid:S135"}) private boolean calculateFromRawData() { - // if window is not initialized, we should init window status and reset aggregators - if (!windowManager.isCurWindowInit() && !skipPreviousWindowAndInitCurWindow()) { - return false; - } + long startTime = System.nanoTime(); + try { + // if window is not initialized, we should init window status and reset aggregators + if (!windowManager.isCurWindowInit() && !skipPreviousWindowAndInitCurWindow()) { + return false; + } - // If current window has been initialized, we should judge whether inputTsBlock is empty - if (inputTsBlock == null || inputTsBlock.isEmpty()) { - return false; - } + // If current window has been initialized, we should judge whether inputTsBlock is empty + if (inputTsBlock == null || inputTsBlock.isEmpty()) { + return false; + } - if (windowManager.satisfiedCurWindow(inputTsBlock)) { + if (windowManager.satisfiedCurWindow(inputTsBlock)) { - // Get the indexes in tsBlock which needs to be processed by aggregator, and the last row - // needed to be processed. - int tsBlockSize = inputTsBlock.getPositionCount(); - IWindow curWindow = windowManager.getCurWindow(); + // Get the indexes in tsBlock which needs to be processed by aggregator, and the last row + // needed to be processed. 
+ int tsBlockSize = inputTsBlock.getPositionCount(); + IWindow curWindow = windowManager.getCurWindow(); - Column[] controlAndTimeColumn = new Column[2]; - controlAndTimeColumn[0] = curWindow.getControlColumn(inputTsBlock); - controlAndTimeColumn[1] = inputTsBlock.getTimeColumn(); + Column[] controlAndTimeColumn = new Column[2]; + controlAndTimeColumn[0] = curWindow.getControlColumn(inputTsBlock); + controlAndTimeColumn[1] = inputTsBlock.getTimeColumn(); - BitMap needProcess = new BitMap(tsBlockSize); - int lastIndexToProcess = -1; - boolean hasSkip = false; + BitMap needProcess = new BitMap(tsBlockSize); + int lastIndexToProcess = -1; + boolean hasSkip = false; - for (int i = 0; i < tsBlockSize; i++) { - if (windowManager.isIgnoringNull() && controlAndTimeColumn[0].isNull(i)) { + for (int i = 0; i < tsBlockSize; i++) { + if (windowManager.isIgnoringNull() && controlAndTimeColumn[0].isNull(i)) { + lastIndexToProcess = i; + hasSkip = true; + continue; + } + if (!curWindow.satisfy(controlAndTimeColumn[0], i)) { + break; + } + needProcess.mark(i); + curWindow.mergeOnePoint(controlAndTimeColumn, i); lastIndexToProcess = i; - hasSkip = true; - continue; - } - if (!curWindow.satisfy(controlAndTimeColumn[0], i)) { - break; - } - needProcess.mark(i); - curWindow.mergeOnePoint(controlAndTimeColumn, i); - lastIndexToProcess = i; - } - - // if no row needs to skip, just send a null parameter. - if (!hasSkip) { - needProcess = null; - } - - TsBlock inputRegion = inputTsBlock.getRegion(0, lastIndexToProcess + 1); - for (TreeAggregator aggregator : aggregators) { - // Current agg method has been calculated - if (aggregator.hasFinalResult()) { - continue; } - aggregator.processTsBlock(inputRegion, needProcess); + // if no row needs to skip, just send a null parameter. 
+ if (!hasSkip) { + needProcess = null; + } + + TsBlock inputRegion = inputTsBlock.getRegion(0, lastIndexToProcess + 1); + for (TreeAggregator aggregator : aggregators) { + // Current agg method has been calculated + if (aggregator.hasFinalResult()) { + continue; + } + + aggregator.processTsBlock(inputRegion, needProcess); + } + int lastReadRowIndex = lastIndexToProcess + 1; + // If lastReadRowIndex is not zero, some of tsBlock is consumed and result is cached in + // aggregators. + if (lastReadRowIndex != 0) { + hasCachedDataInAggregator = true; + } + if (lastReadRowIndex >= inputTsBlock.getPositionCount()) { + inputTsBlock = null; + // For the last index of TsBlock, if we can know the aggregation calculation is over + // we can directly updateResultTsBlock and return true + return isAllAggregatorsHasFinalResult(aggregators); + } else { + inputTsBlock = inputTsBlock.subTsBlock(lastReadRowIndex); + return true; + } } - int lastReadRowIndex = lastIndexToProcess + 1; - // If lastReadRowIndex is not zero, some of tsBlock is consumed and result is cached in - // aggregators. - if (lastReadRowIndex != 0) { - hasCachedDataInAggregator = true; - } - if (lastReadRowIndex >= inputTsBlock.getPositionCount()) { - inputTsBlock = null; - // For the last index of TsBlock, if we can know the aggregation calculation is over - // we can directly updateResultTsBlock and return true - return isAllAggregatorsHasFinalResult(aggregators); - } else { - inputTsBlock = inputTsBlock.subTsBlock(lastReadRowIndex); - return true; - } + + boolean isTsBlockOutOfBound = windowManager.isTsBlockOutOfBound(inputTsBlock); + return isAllAggregatorsHasFinalResult(aggregators) || isTsBlockOutOfBound; + } finally { + operatorContext.recordAggregationOperatorFromRawDataCost(System.nanoTime() - startTime); } - - boolean isTsBlockOutOfBound = windowManager.isTsBlockOutOfBound(inputTsBlock); - return isAllAggregatorsHasFinalResult(aggregators) || isTsBlockOutOfBound; } @Override
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/TagAggregationOperator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/TagAggregationOperator.java index adab0fb..afa5c4c 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/TagAggregationOperator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/TagAggregationOperator.java
@@ -123,12 +123,17 @@ } private void aggregate(List<TreeAggregator> aggregators, TsBlock[] rowBlocks) { - for (TreeAggregator aggregator : aggregators) { - if (aggregator == null) { - continue; + long startTime = System.nanoTime(); + try { + for (TreeAggregator aggregator : aggregators) { + if (aggregator == null) { + continue; + } + aggregator.reset(); + aggregator.processTsBlocks(rowBlocks); } - aggregator.reset(); - aggregator.processTsBlocks(rowBlocks); + } finally { + operatorContext.recordAggregationOperatorFromRawDataCost(System.nanoTime() - startTime); } }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/AbstractSeriesAggregationScanOperator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/AbstractSeriesAggregationScanOperator.java index c63cd24..3f9db7b 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/AbstractSeriesAggregationScanOperator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/AbstractSeriesAggregationScanOperator.java
@@ -245,18 +245,28 @@ } private boolean calcFromRawData(TsBlock tsBlock) { - Pair<Boolean, TsBlock> calcResult = - calculateAggregationFromRawData(tsBlock, aggregators, curTimeRange, ascending); - inputTsBlock = calcResult.getRight(); - return calcResult.getLeft(); + long startTime = System.nanoTime(); + try { + Pair<Boolean, TsBlock> calcResult = + calculateAggregationFromRawData(tsBlock, aggregators, curTimeRange, ascending); + inputTsBlock = calcResult.getRight(); + return calcResult.getLeft(); + } finally { + operatorContext.recordScanAggregationFromRawDataCost(System.nanoTime() - startTime); + } } protected void calcFromStatistics(Statistics timeStatistics, Statistics[] valueStatistics) { - for (TreeAggregator aggregator : aggregators) { - if (aggregator.hasFinalResult()) { - continue; + long startTime = System.nanoTime(); + try { + for (TreeAggregator aggregator : aggregators) { + if (aggregator.hasFinalResult()) { + continue; + } + aggregator.processStatistics(timeStatistics, valueStatistics); } - aggregator.processStatistics(timeStatistics, valueStatistics); + } finally { + operatorContext.recordScanAggregationFromStatisticsCost(System.nanoTime() - startTime); } }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/AbstractAggTableScanOperator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/AbstractAggTableScanOperator.java index 031c10d..8569616 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/AbstractAggTableScanOperator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/AbstractAggTableScanOperator.java
@@ -285,28 +285,33 @@ return new Pair<>(false, inputTsBlock); } - updateCurTimeRange(inputTsBlock.getStartTime()); + long startTime = System.nanoTime(); + try { + updateCurTimeRange(inputTsBlock.getStartTime()); - TimeRange curTimeRange = timeIterator.getCurTimeRange(); - // check if the tsBlock does not contain points in current interval - if (satisfiedTimeRange(inputTsBlock, curTimeRange, ascending)) { - // skip points that cannot be calculated - if ((ascending && inputTsBlock.getStartTime() < curTimeRange.getMin()) - || (!ascending && inputTsBlock.getStartTime() > curTimeRange.getMax())) { - inputTsBlock = skipPointsOutOfTimeRange(inputTsBlock, curTimeRange, ascending); + TimeRange curTimeRange = timeIterator.getCurTimeRange(); + // check if the tsBlock does not contain points in current interval + if (satisfiedTimeRange(inputTsBlock, curTimeRange, ascending)) { + // skip points that cannot be calculated + if ((ascending && inputTsBlock.getStartTime() < curTimeRange.getMin()) + || (!ascending && inputTsBlock.getStartTime() > curTimeRange.getMax())) { + inputTsBlock = skipPointsOutOfTimeRange(inputTsBlock, curTimeRange, ascending); + } + + inputTsBlock = process(inputTsBlock, curTimeRange); } - inputTsBlock = process(inputTsBlock, curTimeRange); + // judge whether the calculation finished + boolean isTsBlockOutOfBound = + inputTsBlock != null + && (ascending + ? inputTsBlock.getEndTime() > curTimeRange.getMax() + : inputTsBlock.getEndTime() < curTimeRange.getMin()); + return new Pair<>( + isAllAggregatorsHasFinalResult(tableAggregators) || isTsBlockOutOfBound, inputTsBlock); + } finally { + operatorContext.recordScanAggregationFromRawDataCost(System.nanoTime() - startTime); } - - // judge whether the calculation finished - boolean isTsBlockOutOfBound = - inputTsBlock != null - && (ascending - ? 
inputTsBlock.getEndTime() > curTimeRange.getMax() - : inputTsBlock.getEndTime() < curTimeRange.getMin()); - return new Pair<>( - isAllAggregatorsHasFinalResult(tableAggregators) || isTsBlockOutOfBound, inputTsBlock); } private TsBlock process(TsBlock inputTsBlock, TimeRange curTimeRange) { @@ -394,28 +399,32 @@ protected void calcFromStatistics(Statistics timeStatistics, Statistics[] valueStatistics) { int idx = -1; + long startTime = System.nanoTime(); + try { + for (TableAggregator aggregator : tableAggregators) { + if (aggregator.hasFinalResult()) { + idx += aggregator.getChannelCount(); + continue; + } - for (TableAggregator aggregator : tableAggregators) { - if (aggregator.hasFinalResult()) { - idx += aggregator.getChannelCount(); - continue; + Statistics[] statisticsArray = new Statistics[aggregator.getChannelCount()]; + for (int i = 0; i < aggregator.getChannelCount(); i++) { + idx++; + + TsTableColumnCategory columnSchemaCategory = + aggColumnSchemas.get(aggregatorInputChannels.get(idx)).getColumnCategory(); + statisticsArray[i] = + buildStatistics( + columnSchemaCategory, + timeStatistics, + valueStatistics, + aggregatorInputChannels.get(idx)); + } + + aggregator.processStatistics(statisticsArray); } - - Statistics[] statisticsArray = new Statistics[aggregator.getChannelCount()]; - for (int i = 0; i < aggregator.getChannelCount(); i++) { - idx++; - - TsTableColumnCategory columnSchemaCategory = - aggColumnSchemas.get(aggregatorInputChannels.get(idx)).getColumnCategory(); - statisticsArray[i] = - buildStatistics( - columnSchemaCategory, - timeStatistics, - valueStatistics, - aggregatorInputChannels.get(idx)); - } - - aggregator.processStatistics(statisticsArray); + } finally { + operatorContext.recordScanAggregationFromStatisticsCost(System.nanoTime() - startTime); } }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/snapshot/SnapshotLoader.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/snapshot/SnapshotLoader.java index 17136cd..573d8fc 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/snapshot/SnapshotLoader.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/snapshot/SnapshotLoader.java
@@ -146,9 +146,16 @@ try { deleteAllFilesInDataDirs(); LOGGER.info(StorageEngineMessages.REMOVE_ALL_DATA_FILES_IN_ORIGINAL_DIR); + // IoTConsensus may spread the fragments of one snapshot across several receive folders. + // The fileTarget map must be shared across all of them so that a tsfile and its companion + // files (resource, exclusive mods, etc.) are relinked to the same data dir even when their + // fragments were received on different disks. + Map<String, String> fileTarget = new HashMap<>(); for (String path : snapshotPaths) { File snapshotDir = new File(path); - createLinksFromSnapshotDirToDataDirWithoutLog(snapshotDir); + // IoTConsensus fragments arrive under different recv folders; do not map each + // fragment back to the same disk as its recv path, rely on fileTarget instead. + createLinksFromSnapshotDirToDataDirWithoutLog(snapshotDir, fileTarget, false); loadCompressionRatio(snapshotDir); } return loadSnapshot(); @@ -170,7 +177,7 @@ } LOGGER.info(StorageEngineMessages.MOVING_SNAPSHOT_FILE_TO_DATA_DIRS); File snapshotDir = new File(snapshotPath); - createLinksFromSnapshotDirToDataDirWithoutLog(snapshotDir); + createLinksFromSnapshotDirToDataDirWithoutLog(snapshotDir, new HashMap<>(), true); loadCompressionRatio(snapshotDir); return loadSnapshot(); } catch (IOException | DiskSpaceInsufficientException e) { @@ -294,7 +301,8 @@ } } - private void createLinksFromSnapshotDirToDataDirWithoutLog(File sourceDir) + private void createLinksFromSnapshotDirToDataDirWithoutLog( + File sourceDir, Map<String, String> fileTarget, boolean preferKeepSameDiskWhenLoading) throws IOException, DiskSpaceInsufficientException { if (!sourceDir.exists()) { throw new IOException( @@ -340,7 +348,8 @@ + dataRegionId + File.separator + timePartitionFolder.getName(); - createLinksFromSnapshotToSourceDir(targetSuffix, files, folderManager); + createLinksFromSnapshotToSourceDir( + targetSuffix, files, folderManager, fileTarget, preferKeepSameDiskWhenLoading); } } @@ -359,7 +368,8 
@@ + dataRegionId + File.separator + timePartitionFolder.getName(); - createLinksFromSnapshotToSourceDir(targetSuffix, files, folderManager); + createLinksFromSnapshotToSourceDir( + targetSuffix, files, folderManager, fileTarget, preferKeepSameDiskWhenLoading); } } } @@ -406,8 +416,12 @@ } private void createLinksFromSnapshotToSourceDir( - String targetSuffix, File[] files, FolderManager folderManager) throws IOException { - Map<String, String> fileTarget = new HashMap<>(); + String targetSuffix, + File[] files, + FolderManager folderManager, + Map<String, String> fileTarget, + boolean preferKeepSameDiskWhenLoading) + throws IOException { for (File file : files) { checkTsFileResourceExists(file); @@ -421,7 +435,8 @@ try { String firstFolderOfSameDisk = - IoTDBDescriptor.getInstance().getConfig().isKeepSameDiskWhenLoadingSnapshot() + preferKeepSameDiskWhenLoading + && IoTDBDescriptor.getInstance().getConfig().isKeepSameDiskWhenLoadingSnapshot() ? folderManager.getFirstFolderOfSameDisk(file.getAbsolutePath()) : null;
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadFailedMessageHandler.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadFailedMessageHandler.java index 92a1d45..4879e4f 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadFailedMessageHandler.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadFailedMessageHandler.java
@@ -19,7 +19,10 @@ package org.apache.iotdb.db.storageengine.load.active; +import org.apache.iotdb.common.rpc.thrift.TSStatus; import org.apache.iotdb.commons.conf.CommonDescriptor; +import org.apache.iotdb.db.i18n.StorageEngineMessages; +import org.apache.iotdb.rpc.TSStatusCode; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -96,6 +99,20 @@ void handle(final ActiveLoadPendingQueue.ActiveLoadEntry entry); } + public static boolean isStatusShouldRetry( + final ActiveLoadPendingQueue.ActiveLoadEntry entry, final TSStatus status) { + if (status != null + && status.getCode() == TSStatusCode.LOAD_TEMPORARY_UNAVAILABLE_EXCEPTION.getStatusCode()) { + LOGGER.info( + StorageEngineMessages.ACTIVE_LOAD_TEMPORARILY_UNAVAILABLE, + entry.getFile(), + entry.isGeneratedByPipe(), + status); + return true; + } + return isExceptionMessageShouldRetry(entry, status == null ? null : status.getMessage()); + } + public static boolean isExceptionMessageShouldRetry( final ActiveLoadPendingQueue.ActiveLoadEntry entry, final String message) { if (CommonDescriptor.getInstance().getConfig().isReadOnly()) {
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadTsFileLoader.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadTsFileLoader.java index 61e297a..3f06964 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadTsFileLoader.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadTsFileLoader.java
@@ -294,7 +294,7 @@ private void handleLoadFailure( final ActiveLoadPendingQueue.ActiveLoadEntry entry, final TSStatus status) { - if (!ActiveLoadFailedMessageHandler.isExceptionMessageShouldRetry(entry, status.getMessage())) { + if (!ActiveLoadFailedMessageHandler.isStatusShouldRetry(entry, status)) { LOGGER.warn( "Failed to auto load tsfile {} (isGeneratedByPipe = {}), status: {}. File will be moved to fail directory.", entry.getFile(),
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadTableConvertedInsertTabletStatementExceptionVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadTableConvertedInsertTabletStatementExceptionVisitor.java index 28dfafc..ff538cb 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadTableConvertedInsertTabletStatementExceptionVisitor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadTableConvertedInsertTabletStatementExceptionVisitor.java
@@ -23,7 +23,6 @@ import org.apache.iotdb.commons.exception.SemanticException; import org.apache.iotdb.commons.exception.auth.AccessDeniedException; import org.apache.iotdb.commons.queryengine.plan.relational.sql.ast.Node; -import org.apache.iotdb.db.exception.load.LoadRuntimeOutOfMemoryException; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.AstVisitor; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.LoadTsFile; import org.apache.iotdb.rpc.TSStatusCode; @@ -32,6 +31,9 @@ implements AstVisitor<TSStatus, Exception> { @Override public TSStatus visitNode(final Node node, final Exception context) { + if (LoadTsFileDataTypeConverter.isMemoryPressureException(context)) { + return LoadTsFileDataTypeConverter.getMemoryPressureStatus(context); + } if (context instanceof AccessDeniedException) { return new TSStatus(TSStatusCode.NO_PERMISSION.getStatusCode()) .setMessage(context.getMessage()); @@ -42,9 +44,8 @@ @Override public TSStatus visitLoadTsFile(final LoadTsFile loadTsFile, final Exception context) { - if (context instanceof LoadRuntimeOutOfMemoryException) { - return new TSStatus(TSStatusCode.LOAD_TEMPORARY_UNAVAILABLE_EXCEPTION.getStatusCode()) - .setMessage(context.getMessage()); + if (LoadTsFileDataTypeConverter.isMemoryPressureException(context)) { + return LoadTsFileDataTypeConverter.getMemoryPressureStatus(context); } else if (context instanceof SemanticException) { return new TSStatus(TSStatusCode.LOAD_USER_CONFLICT_EXCEPTION.getStatusCode()) .setMessage(context.getMessage());
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadTreeConvertedInsertTabletStatementExceptionVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadTreeConvertedInsertTabletStatementExceptionVisitor.java index 9244551..ce465f8 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadTreeConvertedInsertTabletStatementExceptionVisitor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadTreeConvertedInsertTabletStatementExceptionVisitor.java
@@ -22,7 +22,6 @@ import org.apache.iotdb.common.rpc.thrift.TSStatus; import org.apache.iotdb.commons.exception.SemanticException; import org.apache.iotdb.commons.exception.auth.AccessDeniedException; -import org.apache.iotdb.db.exception.load.LoadRuntimeOutOfMemoryException; import org.apache.iotdb.db.queryengine.plan.statement.StatementNode; import org.apache.iotdb.db.queryengine.plan.statement.StatementVisitor; import org.apache.iotdb.db.queryengine.plan.statement.crud.LoadTsFileStatement; @@ -33,6 +32,9 @@ @Override public TSStatus visitNode(final StatementNode node, final Exception context) { + if (LoadTsFileDataTypeConverter.isMemoryPressureException(context)) { + return LoadTsFileDataTypeConverter.getMemoryPressureStatus(context); + } if (context instanceof AccessDeniedException) { return new TSStatus(TSStatusCode.NO_PERMISSION.getStatusCode()) .setMessage(context.getMessage()); @@ -44,9 +46,8 @@ @Override public TSStatus visitLoadFile( final LoadTsFileStatement loadTsFileStatement, final Exception context) { - if (context instanceof LoadRuntimeOutOfMemoryException) { - return new TSStatus(TSStatusCode.LOAD_TEMPORARY_UNAVAILABLE_EXCEPTION.getStatusCode()) - .setMessage(context.getMessage()); + if (LoadTsFileDataTypeConverter.isMemoryPressureException(context)) { + return LoadTsFileDataTypeConverter.getMemoryPressureStatus(context); } else if (context instanceof SemanticException) { return new TSStatus(TSStatusCode.LOAD_USER_CONFLICT_EXCEPTION.getStatusCode()) .setMessage(context.getMessage());
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadTsFileDataTypeConverter.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadTsFileDataTypeConverter.java index 3aab4aa..b8dc0e5 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadTsFileDataTypeConverter.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadTsFileDataTypeConverter.java
@@ -21,9 +21,14 @@ import org.apache.iotdb.common.rpc.thrift.TSStatus; import org.apache.iotdb.commons.conf.IoTDBConstant; +import org.apache.iotdb.commons.exception.pipe.PipeRuntimeOutOfMemoryCriticalException; +import org.apache.iotdb.commons.pipe.config.PipeConfig; import org.apache.iotdb.commons.queryengine.common.SqlDialect; import org.apache.iotdb.db.auth.AuthorityChecker; import org.apache.iotdb.db.conf.IoTDBDescriptor; +import org.apache.iotdb.db.exception.load.LoadRuntimeOutOfMemoryException; +import org.apache.iotdb.db.i18n.StorageEngineMessages; +import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager; import org.apache.iotdb.db.protocol.session.IClientSession; import org.apache.iotdb.db.protocol.session.InternalClientSession; import org.apache.iotdb.db.protocol.session.SessionManager; @@ -46,6 +51,7 @@ import java.time.ZoneId; import java.util.Optional; +import java.util.concurrent.Semaphore; public class LoadTsFileDataTypeConverter { @@ -53,6 +59,89 @@ private static final SessionManager SESSION_MANAGER = SessionManager.getInstance(); + private static Semaphore getTabletConversionSemaphore() { + return TabletConversionSemaphoreHolder.INSTANCE; + } + + private static int getTabletConversionPermitCount() { + final int configuredThreadCount = + Math.max( + 1, + IoTDBDescriptor.getInstance().getConfig().getLoadTsFileTabletConversionThreadCount()); + if (!PipeConfig.getInstance().getPipeMemoryManagementEnabled()) { + return configuredThreadCount; + } + final long memorySafePermitCount = + getAllowedPipeTabletMemorySizeInBytes() / estimatePipeTabletMemorySizePerConversion(); + return (int) Math.max(1, Math.min((long) configuredThreadCount, memorySafePermitCount)); + } + + private static long estimatePipeTabletMemorySizePerConversion() { + final PipeConfig pipeConfig = PipeConfig.getInstance(); + final long tabletSize = + Math.max( + 1L, IoTDBDescriptor.getInstance().getConfig().getPipeDataStructureTabletSizeInBytes()); + final long 
maxReaderChunkSize = Math.max(0L, pipeConfig.getPipeMaxReaderChunkSize()); + final long tableSize = + Math.max( + 1L, + Math.min(tabletSize, IoTDBDescriptor.getInstance().getConfig().getTargetChunkSize())); + + // Both conversion paths keep source/converted tablet data and the current reader chunk in + // memory. Table-model conversion may additionally keep source/converted table-sized batches, so + // use the larger estimate as the per-conversion working set. + final long treeParserMemorySize = 2L * tabletSize + maxReaderChunkSize; + final long tableParserMemorySize = 2L * tabletSize + 2L * tableSize + maxReaderChunkSize; + return Math.max(1L, Math.max(treeParserMemorySize, tableParserMemorySize)); + } + + private static long getAllowedPipeTabletMemorySizeInBytes() { + final PipeConfig pipeConfig = PipeConfig.getInstance(); + return (long) + ((pipeConfig.getPipeDataStructureTabletMemoryBlockAllocationRejectThreshold() + + pipeConfig.getPipeDataStructureTsFileMemoryBlockAllocationRejectThreshold() / 2) + * PipeDataNodeResourceManager.memory().getTotalNonFloatingMemorySizeInBytes()); + } + + private static Optional<TSStatus> getInterruptedConversionStatus(final InterruptedException e) { + Thread.currentThread().interrupt(); + return Optional.of( + new TSStatus(TSStatusCode.LOAD_TEMPORARY_UNAVAILABLE_EXCEPTION.getStatusCode()) + .setMessage( + StorageEngineMessages.INTERRUPTED_WAITING_TABLET_CONVERSION_SLOT + e.getMessage())); + } + + public static boolean isMemoryPressureException(final Throwable throwable) { + Throwable current = throwable; + while (current != null) { + if (current instanceof LoadRuntimeOutOfMemoryException + || current instanceof PipeRuntimeOutOfMemoryCriticalException) { + return true; + } + current = current.getCause(); + } + return false; + } + + public static TSStatus getMemoryPressureStatus(final Throwable throwable) { + Throwable current = throwable; + while (current != null) { + if (current instanceof LoadRuntimeOutOfMemoryException + || 
current instanceof PipeRuntimeOutOfMemoryCriticalException) { + return new TSStatus(TSStatusCode.LOAD_TEMPORARY_UNAVAILABLE_EXCEPTION.getStatusCode()) + .setMessage(current.getMessage()); + } + current = current.getCause(); + } + + return new TSStatus(TSStatusCode.LOAD_TEMPORARY_UNAVAILABLE_EXCEPTION.getStatusCode()) + .setMessage(throwable == null ? null : throwable.getMessage()); + } + + private static class TabletConversionSemaphoreHolder { + private static final Semaphore INSTANCE = new Semaphore(getTabletConversionPermitCount()); + } + public static final LoadConvertedInsertTabletStatementTSStatusVisitor STATEMENT_STATUS_VISITOR = new LoadConvertedInsertTabletStatementTSStatusVisitor(); public static final LoadTreeConvertedInsertTabletStatementExceptionVisitor @@ -83,16 +172,28 @@ } public Optional<TSStatus> convertForTableModel(final LoadTsFile loadTsFileTableStatement) { + boolean isPermitAcquired = false; try { + getTabletConversionSemaphore().acquire(); + isPermitAcquired = true; return loadTsFileTableStatement.accept( tableStatementDataTypeConvertExecutionVisitor, loadTsFileTableStatement.getDatabase()); + } catch (final InterruptedException e) { + return getInterruptedConversionStatus(e); } catch (Exception e) { + if (isMemoryPressureException(e)) { + return Optional.of(getMemoryPressureStatus(e)); + } LOGGER.warn( "Failed to convert data types for table model statement {}.", loadTsFileTableStatement, e); return Optional.of( new TSStatus(TSStatusCode.LOAD_FILE_ERROR.getStatusCode()).setMessage(e.getMessage())); + } finally { + if (isPermitAcquired) { + getTabletConversionSemaphore().release(); + } } } @@ -136,14 +237,25 @@ public Optional<TSStatus> convertForTreeModel(final LoadTsFileStatement loadTsFileTreeStatement) { DataNodeSchemaLockManager.getInstance().releaseReadLock(context); + boolean isPermitAcquired = false; try { + getTabletConversionSemaphore().acquire(); + isPermitAcquired = true; return 
loadTsFileTreeStatement.accept(treeStatementDataTypeConvertExecutionVisitor, null); + } catch (final InterruptedException e) { + return getInterruptedConversionStatus(e); } catch (Exception e) { + if (isMemoryPressureException(e)) { + return Optional.of(getMemoryPressureStatus(e)); + } LOGGER.warn( "Failed to convert data types for tree model statement {}.", loadTsFileTreeStatement, e); return Optional.of( new TSStatus(TSStatusCode.LOAD_FILE_ERROR.getStatusCode()).setMessage(e.getMessage())); } finally { + if (isPermitAcquired) { + getTabletConversionSemaphore().release(); + } DataNodeSchemaLockManager.getInstance() .takeReadLock(context, SchemaLockType.VALIDATE_VS_DELETION_TREE); }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/disk/TierManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/disk/TierManager.java index 200be13..561da82 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/disk/TierManager.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/disk/TierManager.java
@@ -22,9 +22,6 @@ import org.apache.iotdb.commons.conf.IoTDBConstant; import org.apache.iotdb.commons.disk.FolderManager; import org.apache.iotdb.commons.disk.strategy.DirectoryStrategyType; -import org.apache.iotdb.commons.disk.strategy.MaxDiskUsableSpaceFirstStrategy; -import org.apache.iotdb.commons.disk.strategy.MinFolderOccupiedSpaceFirstStrategy; -import org.apache.iotdb.commons.disk.strategy.RandomOnDiskUsableSpaceStrategy; import org.apache.iotdb.commons.exception.DiskSpaceInsufficientException; import org.apache.iotdb.db.conf.IoTDBConfig; import org.apache.iotdb.db.conf.IoTDBDescriptor; @@ -98,19 +95,8 @@ } public synchronized void initFolders() { - try { - String strategyName = Class.forName(config.getMultiDirStrategyClassName()).getSimpleName(); - if (strategyName.equals(MaxDiskUsableSpaceFirstStrategy.class.getSimpleName())) { - directoryStrategyType = DirectoryStrategyType.MAX_DISK_USABLE_SPACE_FIRST_STRATEGY; - } else if (strategyName.equals(MinFolderOccupiedSpaceFirstStrategy.class.getSimpleName())) { - directoryStrategyType = DirectoryStrategyType.MIN_FOLDER_OCCUPIED_SPACE_FIRST_STRATEGY; - } else if (strategyName.equals(RandomOnDiskUsableSpaceStrategy.class.getSimpleName())) { - directoryStrategyType = DirectoryStrategyType.RANDOM_ON_DISK_USABLE_SPACE_STRATEGY; - } - } catch (Exception e) { - logger.error( - "Can't find strategy {} for mult-directories.", config.getMultiDirStrategyClassName(), e); - } + directoryStrategyType = + DirectoryStrategyType.fromClassName(config.getMultiDirStrategyClassName()); config.updatePath(); String[][] tierDirs = config.getTierDataDirs();
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/pattern/TreePatternPruningTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/pattern/TreePatternPruningTest.java index eaa7e17..ae8093f 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/pattern/TreePatternPruningTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/pattern/TreePatternPruningTest.java
@@ -122,7 +122,7 @@ } @Test - public void testLegacyPatternMultipleRulesRejected() { + public void testLegacyPatternMultipleRulesPreserved() { final PipeParameters params = new PipeParameters( new HashMap<String, String>() { @@ -131,12 +131,9 @@ } }); - try { - TreePattern.parsePipePatternFromSourceParameters(params); - Assert.fail("Should throw PipeException for legacy multi-pattern parameters"); - } catch (final PipeException ignored) { - // Expected exception - } + final TreePattern result = TreePattern.parsePipePatternFromSourceParameters(params); + + Assert.assertEquals("root.sg.A,root.sg.B", result.getPattern()); } @Test @@ -157,7 +154,7 @@ } @Test - public void testLegacyPathMultipleRulesRejected() { + public void testLegacyPathMultipleRulesPreserved() { final PipeParameters params = new PipeParameters( new HashMap<String, String>() { @@ -166,12 +163,28 @@ } }); - try { - TreePattern.parsePipePatternFromSourceParameters(params); - Assert.fail("Should throw PipeException for legacy multi-path parameters"); - } catch (final PipeException ignored) { - // Expected exception - } + final TreePattern result = TreePattern.parsePipePatternFromSourceParameters(params); + + Assert.assertTrue(result instanceof UnionIoTDBTreePattern); + Assert.assertEquals("root.sg.d1,root.sg.d2", result.getPattern()); + } + + @Test + public void testLegacyPathMultipleExclusionsPreserved() { + final PipeParameters params = + new PipeParameters( + new HashMap<String, String>() { + { + put(PipeSourceConstant.SOURCE_PATH_KEY, "root.sg.**"); + put(PipeSourceConstant.SOURCE_PATH_EXCLUSION_KEY, "root.sg.d1,root.sg.d2"); + } + }); + + final TreePattern result = TreePattern.parsePipePatternFromSourceParameters(params); + + Assert.assertTrue(result instanceof WithExclusionIoTDBTreePattern); + Assert.assertEquals( + "INCLUSION(root.sg.**), EXCLUSION(root.sg.d1,root.sg.d2)", result.getPattern()); } @Test
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/IoTDBDataNodeAsyncClientManagerTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/IoTDBDataNodeAsyncClientManagerTest.java index fb13e43..f564dbb 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/IoTDBDataNodeAsyncClientManagerTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/IoTDBDataNodeAsyncClientManagerTest.java
@@ -28,6 +28,7 @@ import java.lang.reflect.Field; import java.util.Collections; +import java.util.concurrent.ExecutorService; public class IoTDBDataNodeAsyncClientManagerTest { @@ -71,6 +72,48 @@ } } + @Test + public void testClientResourcesShouldDifferentiateEndPoints() throws Exception { + final IoTDBDataNodeAsyncClientManager firstManager = + new IoTDBDataNodeAsyncClientManager( + Collections.singletonList(new TEndPoint("127.0.0.1", 6667)), + false, + "round-robin", + new UserEntity(1L, "user", "cli-host"), + "password", + true, + "sync", + true, + true, + true, + true); + final IoTDBDataNodeAsyncClientManager secondManager = + new IoTDBDataNodeAsyncClientManager( + Collections.singletonList(new TEndPoint("127.0.0.2", 6667)), + false, + "round-robin", + new UserEntity(1L, "user", "cli-host"), + "password", + true, + "sync", + true, + true, + true, + true); + + try { + Assert.assertEquals( + getReceiverAttributes(firstManager), getReceiverAttributes(secondManager)); + Assert.assertNotEquals( + getClientResourceKey(firstManager), getClientResourceKey(secondManager)); + Assert.assertNotSame(getEndPoint2Client(firstManager), getEndPoint2Client(secondManager)); + Assert.assertNotSame(getExecutor(firstManager), getExecutor(secondManager)); + } finally { + firstManager.close(); + secondManager.close(); + } + } + private static String getReceiverAttributes(final IoTDBDataNodeAsyncClientManager manager) throws Exception { final Field field = @@ -79,10 +122,24 @@ return (String) field.get(manager); } + private static String getClientResourceKey(final IoTDBDataNodeAsyncClientManager manager) + throws Exception { + final Field field = IoTDBDataNodeAsyncClientManager.class.getDeclaredField("clientResourceKey"); + field.setAccessible(true); + return (String) field.get(manager); + } + private static Object getEndPoint2Client(final IoTDBDataNodeAsyncClientManager manager) throws Exception { final Field field = 
IoTDBDataNodeAsyncClientManager.class.getDeclaredField("endPoint2Client"); field.setAccessible(true); return field.get(manager); } + + private static ExecutorService getExecutor(final IoTDBDataNodeAsyncClientManager manager) + throws Exception { + final Field field = IoTDBDataNodeAsyncClientManager.class.getDeclaredField("executor"); + field.setAccessible(true); + return (ExecutorService) field.get(manager); + } }
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContextTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContextTest.java new file mode 100644 index 0000000..0e62133 --- /dev/null +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContextTest.java
@@ -0,0 +1,623 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.queryengine.execution.fragment; + +import org.apache.iotdb.calc.execution.aggregation.Accumulator; +import org.apache.iotdb.calc.execution.operator.CommonOperatorContext; +import org.apache.iotdb.calc.execution.operator.Operator; +import org.apache.iotdb.calc.execution.operator.source.relational.aggregation.TableAccumulator; +import org.apache.iotdb.calc.execution.operator.source.relational.aggregation.TableAggregator; +import org.apache.iotdb.calc.metric.QueryExecutionMetricSet; +import org.apache.iotdb.calc.plan.planner.memory.MemoryReservationManager; +import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory; +import org.apache.iotdb.commons.queryengine.plan.planner.plan.node.PlanNodeId; +import org.apache.iotdb.commons.queryengine.plan.planner.plan.parameter.InputLocation; +import org.apache.iotdb.commons.queryengine.plan.relational.planner.node.AggregationNode; +import org.apache.iotdb.commons.service.metric.enums.Metric; +import org.apache.iotdb.commons.service.metric.enums.Tag; +import org.apache.iotdb.db.queryengine.common.FragmentInstanceId; +import 
org.apache.iotdb.db.queryengine.common.PlanFragmentId; +import org.apache.iotdb.db.queryengine.execution.aggregation.TreeAggregator; +import org.apache.iotdb.db.queryengine.execution.aggregation.timerangeiterator.ITimeRangeIterator; +import org.apache.iotdb.db.queryengine.execution.driver.DriverContext; +import org.apache.iotdb.db.queryengine.execution.operator.OperatorContext; +import org.apache.iotdb.db.queryengine.execution.operator.process.TagAggregationOperator; +import org.apache.iotdb.db.queryengine.execution.operator.source.AbstractSeriesAggregationScanOperator; +import org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.AggregationStep; +import org.apache.iotdb.metrics.DoNothingMetricService; +import org.apache.iotdb.metrics.type.HistogramSnapshot; +import org.apache.iotdb.metrics.type.Timer; +import org.apache.iotdb.metrics.utils.MetricLevel; +import org.apache.iotdb.metrics.utils.MetricType; + +import org.apache.tsfile.block.column.Column; +import org.apache.tsfile.block.column.ColumnBuilder; +import org.apache.tsfile.enums.TSDataType; +import org.apache.tsfile.file.metadata.statistics.Statistics; +import org.apache.tsfile.read.common.TimeRange; +import org.apache.tsfile.read.common.block.TsBlock; +import org.apache.tsfile.read.common.block.TsBlockBuilder; +import org.junit.Test; + +import javax.management.ObjectName; + +import java.util.HashMap; +import java.util.Map; +import java.util.OptionalInt; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; + +import static java.util.Collections.singletonList; +import static org.apache.iotdb.db.queryengine.common.QueryId.MOCK_QUERY_ID; +import static org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext.createFragmentInstanceContext; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + +public class 
FragmentInstanceContextTest { + + @Test + public void testDrainAggregationCostsSeparatelyOnce() { + ExecutorService instanceNotificationExecutor = + IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification"); + try { + FragmentInstanceId instanceId = + new FragmentInstanceId(new PlanFragmentId(MOCK_QUERY_ID, 0), "0"); + FragmentInstanceStateMachine stateMachine = + new FragmentInstanceStateMachine(instanceId, instanceNotificationExecutor); + FragmentInstanceContext context = createFragmentInstanceContext(instanceId, stateMachine); + + context.recordScanAggregationFromRawDataCost(10); + context.recordScanAggregationFromRawDataCost(20); + context.recordScanAggregationFromStatisticsCost(30); + context.recordAggregationOperatorFromRawDataCost(40); + + assertEquals(30, context.drainScanAggregationFromRawDataCost()); + assertEquals(30, context.drainScanAggregationFromStatisticsCost()); + assertEquals(40, context.drainAggregationOperatorFromRawDataCost()); + + assertEquals(0, context.drainScanAggregationFromRawDataCost()); + assertEquals(0, context.drainScanAggregationFromStatisticsCost()); + assertEquals(0, context.drainAggregationOperatorFromRawDataCost()); + } finally { + instanceNotificationExecutor.shutdown(); + } + } + + @Test + public void testOperatorContextForwardsAggregationCostsToFragmentInstanceContext() { + ExecutorService instanceNotificationExecutor = + IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification"); + try { + FragmentInstanceContext context = newFragmentInstanceContext(instanceNotificationExecutor); + DriverContext driverContext = new DriverContext(context, 0); + OperatorContext operatorContext = + driverContext.addOperatorContext(1, new PlanNodeId("forward"), "forward"); + + operatorContext.recordScanAggregationFromRawDataCost(11); + operatorContext.recordScanAggregationFromStatisticsCost(13); + operatorContext.recordAggregationOperatorFromRawDataCost(17); + + assertEquals(11, 
context.drainScanAggregationFromRawDataCost()); + assertEquals(13, context.drainScanAggregationFromStatisticsCost()); + assertEquals(17, context.drainAggregationOperatorFromRawDataCost()); + } finally { + instanceNotificationExecutor.shutdown(); + } + } + + @Test + public void testOperatorContextWithoutFragmentInstanceAndCommonContextUseNoOpFallback() { + OperatorContext operatorContext = + new OperatorContext(1, new PlanNodeId("no-op"), "no-op", new DriverContext()); + operatorContext.recordScanAggregationFromRawDataCost(11); + operatorContext.recordScanAggregationFromStatisticsCost(13); + operatorContext.recordAggregationOperatorFromRawDataCost(17); + + TestCommonOperatorContext commonOperatorContext = new TestCommonOperatorContext(); + commonOperatorContext.recordScanAggregationFromRawDataCost(19); + commonOperatorContext.recordScanAggregationFromStatisticsCost(23); + commonOperatorContext.recordAggregationOperatorFromRawDataCost(29); + + assertEquals(0, commonOperatorContext.getTotalExecutionTimeInNanos()); + assertNull(commonOperatorContext.getMemoryReservationContext()); + } + + @Test + public void testAggregationCostsFromOperatorsFlushToMetricStages() throws Exception { + ExecutorService instanceNotificationExecutor = + IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification"); + RecordingMetricService metricService = new RecordingMetricService(); + QueryExecutionMetricSet metricSet = QueryExecutionMetricSet.getInstance(); + metricSet.bindTo(metricService); + try { + FragmentInstanceContext context = newFragmentInstanceContext(instanceNotificationExecutor); + DriverContext driverContext = new DriverContext(context, 0); + OperatorContext scanOperatorContext = + driverContext.addOperatorContext(1, new PlanNodeId("scan"), "scan"); + TestSeriesAggregationScanOperator scanOperator = + new TestSeriesAggregationScanOperator(scanOperatorContext); + + scanOperator.calculateRaw(longTsBlock()); + scanOperator.calculateStatistics( + 
Statistics.getStatsByType(TSDataType.INT64), + new Statistics[] {Statistics.getStatsByType(TSDataType.INT64)}); + + OperatorContext aggregationOperatorContext = + driverContext.addOperatorContext(2, new PlanNodeId("aggregation"), "aggregation"); + org.apache.iotdb.calc.execution.operator.source.relational.aggregation.AggregationOperator + aggregationOperator = newAggregationOperator(aggregationOperatorContext); + aggregationOperator.next(); + + context.recordAggregationCostToMetric(); + + assertEquals(1, metricService.count("raw_data")); + assertTrue(metricService.sum("raw_data") > 0); + assertEquals(1, metricService.count("statistics")); + assertTrue(metricService.sum("statistics") > 0); + assertEquals(1, metricService.count("raw_data_operator")); + assertTrue(metricService.sum("raw_data_operator") > 0); + + context.recordAggregationCostToMetric(); + + assertEquals(1, metricService.count("raw_data")); + assertEquals(1, metricService.count("statistics")); + assertEquals(1, metricService.count("raw_data_operator")); + } finally { + metricSet.unbindFrom(metricService); + instanceNotificationExecutor.shutdown(); + } + } + + @Test + public void testAggregationOperatorRecordsTsBlockCostForIntermediateInput() throws Exception { + ExecutorService instanceNotificationExecutor = + IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification"); + try { + FragmentInstanceContext context = newFragmentInstanceContext(instanceNotificationExecutor); + DriverContext driverContext = new DriverContext(context, 0); + OperatorContext operatorContext = + driverContext.addOperatorContext(1, new PlanNodeId("aggregation"), "aggregation"); + org.apache.iotdb.calc.execution.operator.source.relational.aggregation.AggregationOperator + aggregationOperator = newAggregationOperator(operatorContext); + + assertNull(aggregationOperator.next()); + + assertTrue(context.drainAggregationOperatorFromRawDataCost() > 0); + } finally { + instanceNotificationExecutor.shutdown(); + } + } + + 
@Test + public void testTreeAggregationOperatorRecordsTsBlockCost() throws Exception { + ExecutorService instanceNotificationExecutor = + IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification"); + try { + FragmentInstanceContext context = newFragmentInstanceContext(instanceNotificationExecutor); + DriverContext driverContext = new DriverContext(context, 0); + OperatorContext operatorContext = + driverContext.addOperatorContext(1, new PlanNodeId("tree-aggregation"), "aggregation"); + org.apache.iotdb.db.queryengine.execution.operator.process.AggregationOperator + aggregationOperator = + new org.apache.iotdb.db.queryengine.execution.operator.process.AggregationOperator( + operatorContext, + singletonList(newTreeFinalAggregator()), + new SingleTimeRangeIterator(new TimeRange(0, 10)), + singletonList(new SingleTsBlockOperator(operatorContext, longTsBlock())), + false, + 1024); + + aggregationOperator.isBlocked().get(); + aggregationOperator.next(); + + assertTrue(context.drainAggregationOperatorFromRawDataCost() > 0); + } finally { + instanceNotificationExecutor.shutdown(); + } + } + + @Test + public void testTagAggregationOperatorRecordsTsBlockCost() throws Exception { + ExecutorService instanceNotificationExecutor = + IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification"); + try { + FragmentInstanceContext context = newFragmentInstanceContext(instanceNotificationExecutor); + DriverContext driverContext = new DriverContext(context, 0); + OperatorContext operatorContext = + driverContext.addOperatorContext(1, new PlanNodeId("tag-aggregation"), "tag-aggregation"); + TagAggregationOperator tagAggregationOperator = + new TagAggregationOperator( + operatorContext, + singletonList(singletonList("tag")), + singletonList(singletonList(newTreeFinalAggregator())), + singletonList(new SingleTsBlockOperator(operatorContext, longTsBlock())), + 1024); + + tagAggregationOperator.isBlocked().get(); + tagAggregationOperator.next(); + + 
assertTrue(context.drainAggregationOperatorFromRawDataCost() > 0); + } finally { + instanceNotificationExecutor.shutdown(); + } + } + + private static FragmentInstanceContext newFragmentInstanceContext( + ExecutorService instanceNotificationExecutor) { + FragmentInstanceId instanceId = + new FragmentInstanceId(new PlanFragmentId(MOCK_QUERY_ID, 0), "0"); + FragmentInstanceStateMachine stateMachine = + new FragmentInstanceStateMachine(instanceId, instanceNotificationExecutor); + return createFragmentInstanceContext(instanceId, stateMachine); + } + + private static TsBlock longTsBlock() { + TsBlockBuilder builder = new TsBlockBuilder(singletonList(TSDataType.INT64)); + builder.getTimeColumnBuilder().writeLong(0); + builder.getColumnBuilder(0).writeLong(1); + builder.declarePosition(); + return builder.build(); + } + + private static org.apache.iotdb.calc.execution.operator.source.relational.aggregation + .AggregationOperator + newAggregationOperator(OperatorContext operatorContext) { + TableAggregator aggregator = + new TableAggregator( + new TestTableAccumulator(), + AggregationNode.Step.INTERMEDIATE, + TSDataType.INT64, + singletonList(0), + OptionalInt.empty()); + return new org.apache.iotdb.calc.execution.operator.source.relational.aggregation + .AggregationOperator( + operatorContext, + new SingleTsBlockOperator(operatorContext, longTsBlock()), + singletonList(aggregator)); + } + + private static TreeAggregator newTreeFinalAggregator() { + return new TreeAggregator( + new TestTreeAccumulator(), + AggregationStep.FINAL, + singletonList(new InputLocation[] {new InputLocation(0, 0)})); + } + + private static class TestSeriesAggregationScanOperator + extends AbstractSeriesAggregationScanOperator { + + private TestSeriesAggregationScanOperator(OperatorContext operatorContext) { + super( + new PlanNodeId("scan"), + operatorContext, + null, + 1, + singletonList(new TreeAggregator(new TestTreeAccumulator(), AggregationStep.SINGLE)), + new SingleTimeRangeIterator(new 
TimeRange(0, 10)), + true, + false, + null, + 1024, + 0, + true); + this.curTimeRange = new TimeRange(0, 10); + } + + private void calculateRaw(TsBlock tsBlock) { + this.inputTsBlock = tsBlock; + calcFromCachedData(); + } + + private void calculateStatistics(Statistics timeStatistics, Statistics[] valueStatistics) { + calcFromStatistics(timeStatistics, valueStatistics); + } + + @Override + public long ramBytesUsed() { + return 0; + } + } + + private static class SingleTimeRangeIterator implements ITimeRangeIterator { + private final TimeRange timeRange; + private boolean consumed; + + private SingleTimeRangeIterator(TimeRange timeRange) { + this.timeRange = timeRange; + } + + @Override + public TimeRange getFirstTimeRange() { + return timeRange; + } + + @Override + public boolean hasNextTimeRange() { + return !consumed; + } + + @Override + public TimeRange nextTimeRange() { + consumed = true; + return timeRange; + } + + @Override + public boolean isAscending() { + return true; + } + + @Override + public long currentOutputTime() { + return timeRange.getMin(); + } + + @Override + public long getTotalIntervalNum() { + return 1; + } + } + + private static class RecordingMetricService extends DoNothingMetricService { + private final Map<String, RecordingTimer> timers = new HashMap<>(); + + @Override + public Timer getOrCreateTimer(String metric, MetricLevel metricLevel, String... tags) { + if (!Metric.AGGREGATION.toString().equals(metric) + || tags.length != 2 + || !Tag.FROM.toString().equals(tags[0])) { + return super.getOrCreateTimer(metric, metricLevel, tags); + } + return timers.computeIfAbsent(tags[1], key -> new RecordingTimer()); + } + + @Override + public void remove(MetricType type, String metric, String... tags) { + if (Metric.AGGREGATION.toString().equals(metric) + && tags.length == 2 + && Tag.FROM.toString().equals(tags[0])) { + timers.remove(tags[1]); + } + } + + long count(String from) { + RecordingTimer timer = timers.get(from); + return timer == null ? 
0 : timer.getCount(); + } + + long sum(String from) { + RecordingTimer timer = timers.get(from); + return timer == null ? 0 : timer.sum.get(); + } + } + + private static class RecordingTimer implements Timer { + private final AtomicLong count = new AtomicLong(); + private final AtomicLong sum = new AtomicLong(); + + @Override + public void update(long duration, TimeUnit unit) { + count.incrementAndGet(); + sum.addAndGet(unit.toNanos(duration)); + } + + @Override + public HistogramSnapshot takeSnapshot() { + throw new UnsupportedOperationException(); + } + + @Override + public long getCount() { + return count.get(); + } + + @Override + public void setObjectName(ObjectName objectName) { + // no-op + } + } + + private static class SingleTsBlockOperator implements Operator { + private final CommonOperatorContext operatorContext; + private final TsBlock tsBlock; + private boolean consumed; + + private SingleTsBlockOperator(CommonOperatorContext operatorContext, TsBlock tsBlock) { + this.operatorContext = operatorContext; + this.tsBlock = tsBlock; + } + + @Override + public CommonOperatorContext getOperatorContext() { + return operatorContext; + } + + @Override + public TsBlock next() { + consumed = true; + return tsBlock; + } + + @Override + public boolean hasNext() { + return !consumed; + } + + @Override + public void close() { + // no-op + } + + @Override + public boolean isFinished() { + return consumed; + } + + @Override + public long calculateMaxPeekMemory() { + return 0; + } + + @Override + public long calculateMaxReturnSize() { + return 0; + } + + @Override + public long calculateRetainedSizeAfterCallingNext() { + return 0; + } + + @Override + public long ramBytesUsed() { + return 0; + } + } + + private static class TestTableAccumulator implements TableAccumulator { + @Override + public long getEstimatedSize() { + return 0; + } + + @Override + public TableAccumulator copy() { + return new TestTableAccumulator(); + } + + @Override + public void addInput( + 
Column[] arguments, + org.apache.iotdb.calc.execution.operator.source.relational.aggregation.AggregationMask + mask) { + // no-op + } + + @Override + public void addIntermediate(Column argument) { + // no-op + } + + @Override + public void evaluateIntermediate(ColumnBuilder columnBuilder) { + columnBuilder.writeLong(0); + } + + @Override + public void evaluateFinal(ColumnBuilder columnBuilder) { + columnBuilder.writeLong(0); + } + + @Override + public boolean hasFinalResult() { + return false; + } + + @Override + public void addStatistics(Statistics[] statistics) { + // no-op + } + + @Override + public void reset() { + // no-op + } + } + + private static class TestTreeAccumulator implements Accumulator { + @Override + public void addInput(Column[] columns, org.apache.tsfile.utils.BitMap bitMap) { + // no-op + } + + @Override + public void addIntermediate(Column[] partialResult) { + // no-op + } + + @Override + public void addStatistics(Statistics statistics) { + // no-op + } + + @Override + public void setFinal(Column finalResult) { + // no-op + } + + @Override + public void outputIntermediate(ColumnBuilder[] tsBlockBuilder) { + tsBlockBuilder[0].writeLong(0); + } + + @Override + public void outputFinal(ColumnBuilder tsBlockBuilder) { + tsBlockBuilder.writeLong(0); + } + + @Override + public void reset() { + // no-op + } + + @Override + public boolean hasFinalResult() { + return false; + } + + @Override + public TSDataType[] getIntermediateType() { + return new TSDataType[] {TSDataType.INT64}; + } + + @Override + public TSDataType getFinalType() { + return TSDataType.INT64; + } + } + + private static class TestCommonOperatorContext extends CommonOperatorContext { + private TestCommonOperatorContext() { + super(0, new PlanNodeId("common"), "common"); + } + + @Override + public MemoryReservationManager getMemoryReservationContext() { + return null; + } + + @Override + public int getFragmentId() { + return 0; + } + + @Override + public int getPipelineId() { + return 
0; + } + + @Override + public long ramBytesUsed() { + return 0; + } + } +}
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/schedule/queue/MultilevelPriorityQueueTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/schedule/queue/MultilevelPriorityQueueTest.java index 0ee6877..42bb75a 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/schedule/queue/MultilevelPriorityQueueTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/schedule/queue/MultilevelPriorityQueueTest.java
@@ -38,6 +38,9 @@ import java.util.ArrayList; import java.util.List; import java.util.OptionalInt; +import java.util.concurrent.TimeUnit; + +import static org.awaitility.Awaitility.await; public class MultilevelPriorityQueueTest { @Test @@ -58,12 +61,14 @@ } }); t1.start(); - Thread.sleep(100); - Assert.assertEquals(Thread.State.WAITING, t1.getState()); + await() + .atMost(1, TimeUnit.MINUTES) + .untilAsserted(() -> Assert.assertEquals(Thread.State.WAITING, t1.getState())); DriverTask e2 = mockDriverTask(mockDriverTaskId(), false); queue.push(e2); - Thread.sleep(100); - Assert.assertEquals(Thread.State.TERMINATED, t1.getState()); + await() + .atMost(1, TimeUnit.MINUTES) + .untilAsserted(() -> Assert.assertEquals(Thread.State.TERMINATED, t1.getState())); Assert.assertEquals(1, res.size()); Assert.assertEquals(e2.getDriverTaskId().toString(), res.get(0).getDriverTaskId().toString()); } catch (Exception e) {
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/snapshot/IoTDBSnapshotTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/snapshot/IoTDBSnapshotTest.java index d042d2a..2eedca1 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/snapshot/IoTDBSnapshotTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/snapshot/IoTDBSnapshotTest.java
@@ -52,7 +52,9 @@ import java.nio.file.Path; import java.util.ArrayList; import java.util.Arrays; +import java.util.HashMap; import java.util.List; +import java.util.Map; import static org.apache.iotdb.consensus.iot.IoTConsensusServerImpl.SNAPSHOT_DIR_NAME; import static org.apache.tsfile.common.constant.TsFileConstant.PATH_SEPARATOR; @@ -259,6 +261,51 @@ } /** + * When IoTConsensus spreads a tsfile and its exclusive mods across different receive folders, the + * loader must still relink them to the same data dir. Otherwise the mods file is not found next + * to the tsfile and deletion markers are silently ignored. + */ + @Test + public void testLoadSnapshotKeepsTsFileAndModsOnSameDataDirWhenFragmentsAreSpread() + throws IOException, WriteProcessException { + String[][] originDataDirs = IoTDBDescriptor.getInstance().getConfig().getTierDataDirs(); + IoTDBDescriptor.getInstance().getConfig().setTierDataDirs(testDataDirs); + TierManager.getInstance().resetFolders(); + String recvBase0 = "target" + File.separator + "recv-snapshot-mods-0"; + String recvBase1 = "target" + File.separator + "recv-snapshot-mods-1"; + File recvFolder0 = new File(recvBase0, SNAPSHOT_DIR_NAME); + File recvFolder1 = new File(recvBase1, SNAPSHOT_DIR_NAME); + try { + Assert.assertTrue(recvFolder0.mkdirs()); + Assert.assertTrue(recvFolder1.mkdirs()); + + writeSnapshotFragmentWithExclusiveModsSpread(recvFolder0.getAbsolutePath(), 0, recvFolder1); + + DataRegion dataRegion = + new SnapshotLoader( + Arrays.asList(recvFolder0.getAbsolutePath(), recvFolder1.getAbsolutePath()), + testSgName, + "0") + .loadSnapshotForStateMachine(); + + Assert.assertNotNull(dataRegion); + TsFileResource resource = dataRegion.getTsFileManager().getTsFileList(true).get(0); + File tsFile = resource.getTsFile(); + File modsFile = + org.apache.iotdb.db.storageengine.dataregion.modification.ModificationFile + .getExclusiveMods(tsFile); + Assert.assertTrue(modsFile.exists()); + Assert.assertEquals( + 
tsFile.getParentFile().getAbsolutePath(), modsFile.getParentFile().getAbsolutePath()); + } finally { + FileUtils.recursivelyDeleteFolder(recvBase0); + FileUtils.recursivelyDeleteFolder(recvBase1); + IoTDBDescriptor.getInstance().getConfig().setTierDataDirs(originDataDirs); + TierManager.getInstance().resetFolders(); + } + } + + /** * The fragments of one snapshot are disjoint across the receive folders, so the order in which * the folders are relinked must not change the loaded data. This loads the same spread snapshot * with the receive folders presented in the opposite order and expects the identical result. @@ -343,6 +390,41 @@ resource.serialize(); } + private void writeSnapshotFragmentWithExclusiveModsSpread( + String tsFileRecvSnapshotDir, int i, File modsRecvFolder) + throws IOException, WriteProcessException { + writeSnapshotFragment(tsFileRecvSnapshotDir, i); + String tsFileName = String.format("%d-%d-0-0.tsfile", i + 1, i + 1); + File tsFile = + new File( + tsFileRecvSnapshotDir + + File.separator + + "sequence" + + File.separator + + testSgName + + File.separator + + "0" + + File.separator + + "0" + + File.separator + + tsFileName); + File sourceMods = + org.apache.iotdb.db.storageengine.dataregion.modification.ModificationFile.getExclusiveMods( + tsFile); + Assert.assertTrue(sourceMods.exists() || sourceMods.createNewFile()); + + File targetModsDir = + new File( + modsRecvFolder, + "sequence" + File.separator + testSgName + File.separator + "0" + File.separator + "0"); + Assert.assertTrue(targetModsDir.exists() || targetModsDir.mkdirs()); + Files.copy( + sourceMods.toPath(), + new File(targetModsDir, sourceMods.getName()).toPath(), + java.nio.file.StandardCopyOption.REPLACE_EXISTING); + Files.delete(sourceMods.toPath()); + } + @Ignore("Need manual execution to specify different disks") @Test public void testLoadSnapshotNoHardLink() @@ -511,12 +593,24 @@ Method method = SnapshotLoader.class.getDeclaredMethod( - "createLinksFromSnapshotToSourceDir", 
String.class, File[].class, FolderManager.class); + "createLinksFromSnapshotToSourceDir", + String.class, + File[].class, + FolderManager.class, + Map.class, + boolean.class); method.setAccessible(true); SnapshotLoader loader = new SnapshotLoader("dummy", "root.testsg", "0"); - method.invoke(loader, targetSuffix, files, folderManager); + // Tracks fileKey -> chosen data dir, so files sharing a fileKey land in the same dir. + Map<String, String> fileTarget = new HashMap<>(); + method.invoke(loader, targetSuffix, files, folderManager, fileTarget, true); + + // The shared fileKey must be recorded exactly once, pointing at one of the data dirs. + String fileKey = tsFile.getName().split("\\.")[0]; + Assert.assertEquals(1, fileTarget.size()); + Assert.assertTrue(Arrays.asList(dataDirs).contains(fileTarget.get(fileKey))); // verify: only ONE dir contains all three files int hitDirCount = 0;
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadFailedMessageHandlerTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadFailedMessageHandlerTest.java new file mode 100644 index 0000000..67c909c --- /dev/null +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadFailedMessageHandlerTest.java
@@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.storageengine.load.active; + +import org.apache.iotdb.common.rpc.thrift.TSStatus; +import org.apache.iotdb.rpc.TSStatusCode; + +import org.junit.Assert; +import org.junit.Test; + +public class ActiveLoadFailedMessageHandlerTest { + + @Test + public void testTemporaryUnavailableStatusShouldRetryWithoutMessageMatch() { + final ActiveLoadPendingQueue.ActiveLoadEntry entry = + new ActiveLoadPendingQueue.ActiveLoadEntry("test.tsfile", "pending", true, false); + + Assert.assertTrue( + ActiveLoadFailedMessageHandler.isStatusShouldRetry( + entry, + new TSStatus(TSStatusCode.LOAD_TEMPORARY_UNAVAILABLE_EXCEPTION.getStatusCode()) + .setMessage("temporarily unavailable"))); + } + + @Test + public void testPermanentStatusShouldNotRetryWithoutMessageMatch() { + final ActiveLoadPendingQueue.ActiveLoadEntry entry = + new ActiveLoadPendingQueue.ActiveLoadEntry("test.tsfile", "pending", true, false); + + Assert.assertFalse( + ActiveLoadFailedMessageHandler.isStatusShouldRetry( + entry, new TSStatus(TSStatusCode.LOAD_FILE_ERROR.getStatusCode()).setMessage("bad"))); + } +}
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadTsFileLoaderTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadTsFileLoaderTest.java new file mode 100644 index 0000000..3208dc1 --- /dev/null +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadTsFileLoaderTest.java
@@ -0,0 +1,152 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.storageengine.load.active; + +import org.apache.iotdb.common.rpc.thrift.TSStatus; +import org.apache.iotdb.commons.cluster.NodeStatus; +import org.apache.iotdb.commons.conf.CommonDescriptor; +import org.apache.iotdb.db.conf.IoTDBConfig; +import org.apache.iotdb.db.conf.IoTDBDescriptor; +import org.apache.iotdb.db.storageengine.dataregion.modification.ModificationFile; +import org.apache.iotdb.db.storageengine.dataregion.modification.v1.ModificationFileV1; +import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; +import org.apache.iotdb.rpc.TSStatusCode; + +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.io.File; +import java.lang.reflect.Method; +import java.nio.file.Files; + +public class ActiveLoadTsFileLoaderTest { + + private final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig(); + private File tempDir; + private String originalFailDir; + private NodeStatus originalNodeStatus; + + @Before + public void setUp() throws Exception { + tempDir = Files.createTempDirectory("active-load-retry").toFile(); + 
originalFailDir = config.getLoadActiveListeningFailDir(); + originalNodeStatus = CommonDescriptor.getInstance().getConfig().getNodeStatus(); + CommonDescriptor.getInstance().getConfig().setNodeStatus(NodeStatus.Running); + config.setLoadActiveListeningFailDir(new File(tempDir, "failed").getAbsolutePath()); + } + + @After + public void tearDown() { + config.setLoadActiveListeningFailDir(originalFailDir); + CommonDescriptor.getInstance().getConfig().setNodeStatus(originalNodeStatus); + deleteRecursively(tempDir); + } + + @Test + public void testTemporaryUnavailableStatusDoesNotMoveFileToFailDir() throws Exception { + final ActiveLoadTsFileLoader loader = new ActiveLoadTsFileLoader(); + final File tsFile = createTsFileWithCompanionFiles("retry.tsfile"); + final ActiveLoadPendingQueue.ActiveLoadEntry entry = + new ActiveLoadPendingQueue.ActiveLoadEntry( + tsFile.getAbsolutePath(), tempDir.getAbsolutePath(), false, false); + + invokeInitFailDirIfNecessary(loader); + invokeHandleLoadFailure( + loader, + entry, + new TSStatus(TSStatusCode.LOAD_TEMPORARY_UNAVAILABLE_EXCEPTION.getStatusCode()) + .setMessage("load conversion is temporarily unavailable")); + + Assert.assertTrue(tsFile.exists()); + Assert.assertTrue(new File(tsFile.getAbsolutePath() + TsFileResource.RESOURCE_SUFFIX).exists()); + Assert.assertTrue(new File(tsFile.getAbsolutePath() + ModificationFileV1.FILE_SUFFIX).exists()); + Assert.assertTrue(new File(tsFile.getAbsolutePath() + ModificationFile.FILE_SUFFIX).exists()); + Assert.assertFalse(new File(config.getLoadActiveListeningFailDir(), tsFile.getName()).exists()); + } + + @Test + public void testPermanentFailureStatusMovesFileToFailDir() throws Exception { + final ActiveLoadTsFileLoader loader = new ActiveLoadTsFileLoader(); + final File tsFile = createTsFileWithCompanionFiles("failed.tsfile"); + final ActiveLoadPendingQueue.ActiveLoadEntry entry = + new ActiveLoadPendingQueue.ActiveLoadEntry( + tsFile.getAbsolutePath(), tempDir.getAbsolutePath(), false, 
false); + + invokeInitFailDirIfNecessary(loader); + invokeHandleLoadFailure( + loader, + entry, + new TSStatus(TSStatusCode.LOAD_FILE_ERROR.getStatusCode()).setMessage("permanent error")); + + final File failDir = new File(config.getLoadActiveListeningFailDir()); + Assert.assertFalse(tsFile.exists()); + Assert.assertTrue(new File(failDir, tsFile.getName()).exists()); + Assert.assertTrue( + new File(failDir, tsFile.getName() + TsFileResource.RESOURCE_SUFFIX).exists()); + Assert.assertTrue( + new File(failDir, tsFile.getName() + ModificationFileV1.FILE_SUFFIX).exists()); + Assert.assertTrue(new File(failDir, tsFile.getName() + ModificationFile.FILE_SUFFIX).exists()); + } + + private File createTsFileWithCompanionFiles(final String fileName) throws Exception { + final File tsFile = new File(tempDir, fileName); + Assert.assertTrue(tsFile.createNewFile()); + Assert.assertTrue( + new File(tsFile.getAbsolutePath() + TsFileResource.RESOURCE_SUFFIX).createNewFile()); + Assert.assertTrue( + new File(tsFile.getAbsolutePath() + ModificationFileV1.FILE_SUFFIX).createNewFile()); + Assert.assertTrue( + new File(tsFile.getAbsolutePath() + ModificationFile.FILE_SUFFIX).createNewFile()); + return tsFile; + } + + private void invokeInitFailDirIfNecessary(final ActiveLoadTsFileLoader loader) throws Exception { + final Method method = ActiveLoadTsFileLoader.class.getDeclaredMethod("initFailDirIfNecessary"); + method.setAccessible(true); + method.invoke(loader); + } + + private void invokeHandleLoadFailure( + final ActiveLoadTsFileLoader loader, + final ActiveLoadPendingQueue.ActiveLoadEntry entry, + final TSStatus status) + throws Exception { + final Method method = + ActiveLoadTsFileLoader.class.getDeclaredMethod( + "handleLoadFailure", ActiveLoadPendingQueue.ActiveLoadEntry.class, TSStatus.class); + method.setAccessible(true); + method.invoke(loader, entry, status); + } + + private static void deleteRecursively(final File file) { + if (file == null || !file.exists()) { + return; + } 
+ final File[] children = file.listFiles(); + if (children != null) { + for (final File child : children) { + deleteRecursively(child); + } + } + Assert.assertTrue(file.delete()); + } +}
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/load/converter/LoadTreeStatementDataTypeConvertExecutionVisitorTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/load/converter/LoadTreeStatementDataTypeConvertExecutionVisitorTest.java index 32b15a1..29766f5 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/load/converter/LoadTreeStatementDataTypeConvertExecutionVisitorTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/load/converter/LoadTreeStatementDataTypeConvertExecutionVisitorTest.java
@@ -21,6 +21,7 @@ import org.apache.iotdb.common.rpc.thrift.TSStatus; import org.apache.iotdb.commons.conf.CommonDescriptor; +import org.apache.iotdb.commons.exception.pipe.PipeRuntimeOutOfMemoryCriticalException; import org.apache.iotdb.commons.path.PartialPath; import org.apache.iotdb.commons.pipe.datastructure.pattern.IoTDBTreePattern; import org.apache.iotdb.db.pipe.event.common.tsfile.parser.scan.TsFileInsertionEventScanParser; @@ -209,6 +210,24 @@ pointCountByTimeseries.getOrDefault(ALIGNED_DEVICE + ".s12", 0) < ROW_COUNT_PER_DEVICE); } + @Test + public void testPipeOutOfMemoryIsTemporaryUnavailable() throws Exception { + tsFile = File.createTempFile("oom", ".tsfile"); + + final LoadTreeConvertedInsertTabletStatementExceptionVisitor visitor = + new LoadTreeConvertedInsertTabletStatementExceptionVisitor(); + final TSStatus status = + visitor.visitLoadFile( + LoadTsFileStatement.createUnchecked(tsFile.getAbsolutePath()), + new IllegalStateException( + "wrapped memory pressure", + new PipeRuntimeOutOfMemoryCriticalException("pipe tablet memory is not enough"))); + + Assert.assertEquals( + TSStatusCode.LOAD_TEMPORARY_UNAVAILABLE_EXCEPTION.getStatusCode(), status.getCode()); + Assert.assertNotEquals(TSStatusCode.LOAD_FILE_ERROR.getStatusCode(), status.getCode()); + } + private void writeTsFile(final File file) throws Exception { if (file.exists()) { Assert.assertTrue(file.delete());
diff --git a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template index 1e808d7..facf07f 100644 --- a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template +++ b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template
@@ -1723,6 +1723,13 @@ # Datatype: long region_migration_speed_limit_bytes_per_second = 50331648 +# The minimum interval (in ms) between two per-file progress logs while transmitting a snapshot in +# IoTConsensus. A snapshot may contain a huge number of files, so logging one line per file is +# costly; this throttles it to at most once per interval. A value <= 0 logs every file. +# effectiveMode: hot_reload +# Datatype: long +data_region_iot_snapshot_transmission_progress_log_interval_ms = 5000 + # When loading snapshot, try keeping TsFiles in the same disk as the snapshot dir. # This may reduce file copies but may also result in a worse disk load-balance # effectiveMode: hot_reload
diff --git a/iotdb-core/node-commons/src/main/i18n/en/org/apache/iotdb/commons/i18n/UtilMessages.java b/iotdb-core/node-commons/src/main/i18n/en/org/apache/iotdb/commons/i18n/UtilMessages.java index b13ad21..395754d 100644 --- a/iotdb-core/node-commons/src/main/i18n/en/org/apache/iotdb/commons/i18n/UtilMessages.java +++ b/iotdb-core/node-commons/src/main/i18n/en/org/apache/iotdb/commons/i18n/UtilMessages.java
@@ -119,6 +119,8 @@ "Disk space is insufficient, change system mode to read-only."; public static final String CANNOT_CALCULATE_OCCUPIED_SPACE = "Cannot calculate occupied space of folder {}"; + public static final String UNRECOGNIZED_MULTI_DIR_STRATEGY = + "Unrecognized multi-dir strategy '{}', falling back to {}."; // ======================== NodeUrlUtils ========================
diff --git a/iotdb-core/node-commons/src/main/i18n/zh/org/apache/iotdb/commons/i18n/UtilMessages.java b/iotdb-core/node-commons/src/main/i18n/zh/org/apache/iotdb/commons/i18n/UtilMessages.java index f4215f6..1a1b97e 100644 --- a/iotdb-core/node-commons/src/main/i18n/zh/org/apache/iotdb/commons/i18n/UtilMessages.java +++ b/iotdb-core/node-commons/src/main/i18n/zh/org/apache/iotdb/commons/i18n/UtilMessages.java
@@ -117,6 +117,8 @@ "磁盘空间不足,系统切换为只读模式。"; public static final String CANNOT_CALCULATE_OCCUPIED_SPACE = "无法计算文件夹 {} 的已占用空间"; + public static final String UNRECOGNIZED_MULTI_DIR_STRATEGY = + "无法识别的多目录策略 '{}',回退为 {}。"; // ======================== NodeUrlUtils ========================
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/disk/strategy/DirectoryStrategyType.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/disk/strategy/DirectoryStrategyType.java index 2d081dd..853edf8 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/disk/strategy/DirectoryStrategyType.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/disk/strategy/DirectoryStrategyType.java
@@ -18,9 +18,39 @@ */ package org.apache.iotdb.commons.disk.strategy; +import org.apache.iotdb.commons.i18n.UtilMessages; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + public enum DirectoryStrategyType { SEQUENCE_STRATEGY, MAX_DISK_USABLE_SPACE_FIRST_STRATEGY, MIN_FOLDER_OCCUPIED_SPACE_FIRST_STRATEGY, - RANDOM_ON_DISK_USABLE_SPACE_STRATEGY, + RANDOM_ON_DISK_USABLE_SPACE_STRATEGY; + + private static final Logger LOGGER = LoggerFactory.getLogger(DirectoryStrategyType.class); + + /** + * Resolves the strategy type from a multi-dir strategy class name as configured by {@code + * dn_multi_dir_strategy}. Accepts either a simple class name (e.g. {@code SequenceStrategy}) or a + * fully-qualified one. Returns {@link #SEQUENCE_STRATEGY} for a null or unrecognized value, which + * matches the configured default. + */ + public static DirectoryStrategyType fromClassName(String className) { + if (className != null) { + String simpleName = className.substring(className.lastIndexOf('.') + 1); + if (simpleName.equals(MaxDiskUsableSpaceFirstStrategy.class.getSimpleName())) { + return MAX_DISK_USABLE_SPACE_FIRST_STRATEGY; + } else if (simpleName.equals(MinFolderOccupiedSpaceFirstStrategy.class.getSimpleName())) { + return MIN_FOLDER_OCCUPIED_SPACE_FIRST_STRATEGY; + } else if (simpleName.equals(RandomOnDiskUsableSpaceStrategy.class.getSimpleName())) { + return RANDOM_ON_DISK_USABLE_SPACE_STRATEGY; + } else if (simpleName.equals(SequenceStrategy.class.getSimpleName())) { + return SEQUENCE_STRATEGY; + } + LOGGER.warn(UtilMessages.UNRECOGNIZED_MULTI_DIR_STRATEGY, className, SEQUENCE_STRATEGY); + } + return SEQUENCE_STRATEGY; + } }
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/disk/strategy/MinFolderOccupiedSpaceFirstStrategy.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/disk/strategy/MinFolderOccupiedSpaceFirstStrategy.java index 2b8ffda..89d63a4 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/disk/strategy/MinFolderOccupiedSpaceFirstStrategy.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/disk/strategy/MinFolderOccupiedSpaceFirstStrategy.java
@@ -25,6 +25,7 @@ import org.apache.iotdb.commons.utils.TestOnly; import java.io.IOException; +import java.io.UncheckedIOException; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicLong; @@ -126,7 +127,7 @@ try { cachedOccupiedSpace[i] = JVMCommonUtils.getOccupiedSpace(folder); resetCannotCalculateOccupiedSpaceLogTime(folder); - } catch (IOException e) { + } catch (IOException | UncheckedIOException e) { logCannotCalculateOccupiedSpaceIfNecessary(folder, e); cachedOccupiedSpace[i] = Long.MAX_VALUE; } @@ -135,7 +136,7 @@ lastRefreshTimeMs = System.currentTimeMillis(); } - private static void logCannotCalculateOccupiedSpaceIfNecessary(String folder, IOException e) { + private static void logCannotCalculateOccupiedSpaceIfNecessary(String folder, Exception e) { AtomicLong lastLogTime = CANNOT_CALCULATE_OCCUPIED_SPACE_LAST_LOG_TIME_MAP.computeIfAbsent( folder, key -> new AtomicLong(0L));
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeSourceConstant.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeSourceConstant.java index b6e9381..e755ef3 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeSourceConstant.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeSourceConstant.java
@@ -58,6 +58,8 @@ public static final String SOURCE_PATTERN_INCLUSION_KEY = "source.pattern.inclusion"; public static final String EXTRACTOR_PATH_KEY = "extractor.path"; public static final String SOURCE_PATH_KEY = "source.path"; + public static final String EXTRACTOR_PATH_INCLUSION_KEY = "extractor.path.inclusion"; + public static final String SOURCE_PATH_INCLUSION_KEY = "source.path.inclusion"; public static final String EXTRACTOR_PATTERN_FORMAT_KEY = "extractor.pattern.format"; public static final String SOURCE_PATTERN_FORMAT_KEY = "source.pattern.format"; public static final String EXTRACTOR_PATTERN_FORMAT_PREFIX_VALUE = "prefix";
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/TreePattern.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/TreePattern.java index 48c4c64..2dfbc7b 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/TreePattern.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/TreePattern.java
@@ -44,6 +44,7 @@ import java.util.stream.Collectors; import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_PATH_EXCLUSION_KEY; +import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_PATH_INCLUSION_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_PATH_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_PATTERN_EXCLUSION_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_PATTERN_FORMAT_IOTDB_VALUE; @@ -52,6 +53,7 @@ import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_PATTERN_INCLUSION_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_PATTERN_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.SOURCE_PATH_EXCLUSION_KEY; +import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.SOURCE_PATH_INCLUSION_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.SOURCE_PATH_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.SOURCE_PATTERN_EXCLUSION_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.SOURCE_PATTERN_FORMAT_KEY; @@ -164,49 +166,9 @@ final boolean isTreeModelDataAllowedToBeCaptured = isTreeModelDataAllowToBeCaptured(sourceParameters); - final boolean hasPatternInclusionKey = - sourceParameters.hasAnyAttributes( - EXTRACTOR_PATTERN_INCLUSION_KEY, SOURCE_PATTERN_INCLUSION_KEY); - final boolean hasPatternExclusionKey = - sourceParameters.hasAnyAttributes( - EXTRACTOR_PATTERN_EXCLUSION_KEY, SOURCE_PATTERN_EXCLUSION_KEY); - final boolean hasLegacyPathKey = - sourceParameters.hasAnyAttributes(EXTRACTOR_PATH_KEY, SOURCE_PATH_KEY); - final boolean hasLegacyPatternKey = - sourceParameters.hasAnyAttributes(EXTRACTOR_PATTERN_KEY, SOURCE_PATTERN_KEY); - 
final boolean usePatternSyntax = - hasPatternInclusionKey - || (hasPatternExclusionKey && !hasLegacyPathKey && !hasLegacyPatternKey); - - if (hasPatternInclusionKey && (hasLegacyPathKey || hasLegacyPatternKey)) { - final String msg = - String.format( - PipeMessages.PATTERN_INCLUSION_CANNOT_BE_USED_WITH_PATTERN_OR_PATH, - SOURCE_PATTERN_INCLUSION_KEY, - SOURCE_PATTERN_KEY, - SOURCE_PATH_KEY); - LOGGER.warn(msg); - throw new PipeException(msg); - } - // 1. Parse INCLUSION patterns into a list List<TreePattern> inclusionPatterns = - usePatternSyntax - ? parseIoTDBPatternList( - sourceParameters.getStringByKeys( - EXTRACTOR_PATTERN_INCLUSION_KEY, SOURCE_PATTERN_INCLUSION_KEY), - isTreeModelDataAllowedToBeCaptured, - true, - SOURCE_PATTERN_INCLUSION_KEY) - : parseLegacyPatternList( - sourceParameters, - isTreeModelDataAllowedToBeCaptured, - EXTRACTOR_PATH_KEY, - SOURCE_PATH_KEY, - EXTRACTOR_PATTERN_KEY, - SOURCE_PATTERN_KEY, - SOURCE_PATH_KEY, - SOURCE_PATTERN_KEY); + parseInclusionPatternList(sourceParameters, isTreeModelDataAllowedToBeCaptured); // If no inclusion patterns are specified, use default "root.**" if (inclusionPatterns.isEmpty()) { @@ -217,35 +179,8 @@ } // 2. Parse EXCLUSION patterns into a list - if (usePatternSyntax - && sourceParameters.hasAnyAttributes( - EXTRACTOR_PATH_EXCLUSION_KEY, SOURCE_PATH_EXCLUSION_KEY)) { - final String msg = - String.format( - PipeMessages.PATTERN_INCLUSION_CANNOT_BE_USED_WITH_PATH_EXCLUSION, - SOURCE_PATTERN_INCLUSION_KEY, - SOURCE_PATH_EXCLUSION_KEY); - LOGGER.warn(msg); - throw new PipeException(msg); - } - List<TreePattern> exclusionPatterns = - usePatternSyntax - ? 
parseIoTDBPatternList( - sourceParameters.getStringByKeys( - EXTRACTOR_PATTERN_EXCLUSION_KEY, SOURCE_PATTERN_EXCLUSION_KEY), - isTreeModelDataAllowedToBeCaptured, - true, - SOURCE_PATTERN_EXCLUSION_KEY) - : parseLegacyPatternList( - sourceParameters, - isTreeModelDataAllowedToBeCaptured, - EXTRACTOR_PATH_EXCLUSION_KEY, - SOURCE_PATH_EXCLUSION_KEY, - EXTRACTOR_PATTERN_EXCLUSION_KEY, - SOURCE_PATTERN_EXCLUSION_KEY, - SOURCE_PATH_EXCLUSION_KEY, - SOURCE_PATTERN_EXCLUSION_KEY); + parseExclusionPatternList(sourceParameters, isTreeModelDataAllowedToBeCaptured); // 3. Optimize the lists: remove redundant patterns (e.g., if "root.**" exists, "root.db" is // redundant) @@ -264,6 +199,8 @@ sourceParameters.getStringByKeys( EXTRACTOR_PATTERN_INCLUSION_KEY, SOURCE_PATTERN_INCLUSION_KEY, + EXTRACTOR_PATH_INCLUSION_KEY, + SOURCE_PATH_INCLUSION_KEY, EXTRACTOR_PATH_KEY, SOURCE_PATH_KEY, EXTRACTOR_PATTERN_KEY, @@ -382,49 +319,91 @@ parseMultiplePatterns(trimmedPattern, basePatternSupplier)); } - /** - * Helper method to parse legacy pattern parameters into a list of patterns without creating the - * Union object immediately. 
- */ - private static List<TreePattern> parseLegacyPatternList( - final PipeParameters sourceParameters, - final boolean isTreeModelDataAllowedToBeCaptured, - final String extractorPathKey, - final String sourcePathKey, - final String extractorPatternKey, - final String sourcePatternKey, - final String pathKeyName, - final String patternKeyName) { - - final String path = sourceParameters.getStringByKeys(extractorPathKey, sourcePathKey); - final String pattern = sourceParameters.getStringByKeys(extractorPatternKey, sourcePatternKey); - - if (path != null && pattern != null) { - final String msg = - String.format( - PipeMessages.PATH_AND_PATTERN_CANNOT_BE_USED_TOGETHER, pathKeyName, patternKeyName); - LOGGER.warn(msg); - throw new PipeException(msg); - } - + private static List<TreePattern> parseInclusionPatternList( + final PipeParameters sourceParameters, final boolean isTreeModelDataAllowedToBeCaptured) { final List<TreePattern> result = new ArrayList<>(); - if (path != null) { - result.addAll( - parseIoTDBPatternList(path, isTreeModelDataAllowedToBeCaptured, false, pathKeyName)); - } + addPatternsFromPatternParameterIfPresent( + result, + sourceParameters, + isTreeModelDataAllowedToBeCaptured, + EXTRACTOR_PATTERN_KEY, + SOURCE_PATTERN_KEY, + SOURCE_PATTERN_KEY); + addIoTDBPatternsIfPresent( + result, + sourceParameters, + isTreeModelDataAllowedToBeCaptured, + EXTRACTOR_PATTERN_INCLUSION_KEY, + SOURCE_PATTERN_INCLUSION_KEY, + SOURCE_PATTERN_INCLUSION_KEY); + addIoTDBPatternsIfPresent( + result, + sourceParameters, + isTreeModelDataAllowedToBeCaptured, + EXTRACTOR_PATH_KEY, + SOURCE_PATH_KEY, + SOURCE_PATH_KEY); + addIoTDBPatternsIfPresent( + result, + sourceParameters, + isTreeModelDataAllowedToBeCaptured, + EXTRACTOR_PATH_INCLUSION_KEY, + SOURCE_PATH_INCLUSION_KEY, + SOURCE_PATH_INCLUSION_KEY); + return result; + } + + private static List<TreePattern> parseExclusionPatternList( + final PipeParameters sourceParameters, final boolean 
isTreeModelDataAllowedToBeCaptured) { + final List<TreePattern> result = new ArrayList<>(); + + addIoTDBPatternsIfPresent( + result, + sourceParameters, + isTreeModelDataAllowedToBeCaptured, + EXTRACTOR_PATTERN_EXCLUSION_KEY, + SOURCE_PATTERN_EXCLUSION_KEY, + SOURCE_PATTERN_EXCLUSION_KEY); + addIoTDBPatternsIfPresent( + result, + sourceParameters, + isTreeModelDataAllowedToBeCaptured, + EXTRACTOR_PATH_EXCLUSION_KEY, + SOURCE_PATH_EXCLUSION_KEY, + SOURCE_PATH_EXCLUSION_KEY); + + return result; + } + + private static void addIoTDBPatternsIfPresent( + final List<TreePattern> result, + final PipeParameters sourceParameters, + final boolean isTreeModelDataAllowedToBeCaptured, + final String extractorKey, + final String sourceKey, + final String parameterKey) { + final String pattern = sourceParameters.getStringByKeys(extractorKey, sourceKey); + if (pattern != null) { + result.addAll( + parseIoTDBPatternList(pattern, isTreeModelDataAllowedToBeCaptured, true, parameterKey)); + } + } + + private static void addPatternsFromPatternParameterIfPresent( + final List<TreePattern> result, + final PipeParameters sourceParameters, + final boolean isTreeModelDataAllowedToBeCaptured, + final String extractorKey, + final String sourceKey, + final String parameterKey) { + final String pattern = sourceParameters.getStringByKeys(extractorKey, sourceKey); if (pattern != null) { result.addAll( parsePatternsFromPatternParameter( - pattern, - sourceParameters, - isTreeModelDataAllowedToBeCaptured, - false, - patternKeyName)); + pattern, sourceParameters, isTreeModelDataAllowedToBeCaptured, true, parameterKey)); } - - return result; } private static List<TreePattern> parseIoTDBPatternList(
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/JVMCommonUtils.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/JVMCommonUtils.java index 64279e7..029d270 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/JVMCommonUtils.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/JVMCommonUtils.java
@@ -136,8 +136,17 @@
 
   public static long getOccupiedSpace(String folderPath) throws IOException {
     Path folder = Paths.get(folderPath);
+    // A missing folder occupies no space; return early instead of walking it.
+    if (!Files.exists(folder)) {
+      return 0L;
+    }
     try (Stream<Path> s = Files.walk(folder)) {
-      return s.filter(p -> p.toFile().isFile()).mapToLong(p -> p.toFile().length()).sum();
+      // Only regular files are summed. A file that is gone by the time its size is read
+      // contributes 0.
+      return s.map(Path::toFile)
+          .filter(File::isFile)
+          .mapToLong(candidate -> candidate.exists() ? candidate.length() : 0L)
+          .sum();
     }
   }
 
diff --git a/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/disk/strategy/DirectoryStrategyTypeTest.java b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/disk/strategy/DirectoryStrategyTypeTest.java new file mode 100644 index 0000000..90e56cf --- /dev/null +++ b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/disk/strategy/DirectoryStrategyTypeTest.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.disk.strategy;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+/** Checks how {@link DirectoryStrategyType#fromClassName} maps class names to strategy types. */
+public class DirectoryStrategyTypeTest {
+
+  @Test
+  public void fromSimpleClassName() {
+    assertResolvesTo(DirectoryStrategyType.SEQUENCE_STRATEGY, "SequenceStrategy");
+    assertResolvesTo(
+        DirectoryStrategyType.MAX_DISK_USABLE_SPACE_FIRST_STRATEGY,
+        "MaxDiskUsableSpaceFirstStrategy");
+    assertResolvesTo(
+        DirectoryStrategyType.MIN_FOLDER_OCCUPIED_SPACE_FIRST_STRATEGY,
+        "MinFolderOccupiedSpaceFirstStrategy");
+    assertResolvesTo(
+        DirectoryStrategyType.RANDOM_ON_DISK_USABLE_SPACE_STRATEGY,
+        "RandomOnDiskUsableSpaceStrategy");
+  }
+
+  @Test
+  public void fromFullyQualifiedClassName() {
+    assertResolvesTo(DirectoryStrategyType.SEQUENCE_STRATEGY, SequenceStrategy.class.getName());
+    assertResolvesTo(
+        DirectoryStrategyType.MIN_FOLDER_OCCUPIED_SPACE_FIRST_STRATEGY,
+        MinFolderOccupiedSpaceFirstStrategy.class.getName());
+  }
+
+  @Test
+  public void nullOrUnknownFallsBackToSequence() {
+    // The configured default (dn_multi_dir_strategy=SequenceStrategy) and any unrecognized value
+    // must resolve to SEQUENCE_STRATEGY.
+    assertResolvesTo(DirectoryStrategyType.SEQUENCE_STRATEGY, null);
+    assertResolvesTo(DirectoryStrategyType.SEQUENCE_STRATEGY, "NoSuchStrategy");
+  }
+
+  /** Asserts that {@code className} is resolved to {@code expected} by {@code fromClassName}. */
+  private static void assertResolvesTo(
+      final DirectoryStrategyType expected, final String className) {
+    Assert.assertEquals(expected, DirectoryStrategyType.fromClassName(className));
+  }
+}
diff --git a/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/datastructure/pattern/TreePatternParseTest.java b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/datastructure/pattern/TreePatternParseTest.java new file mode 100644 index 0000000..3ebcd5a --- /dev/null +++ b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/datastructure/pattern/TreePatternParseTest.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.pipe.datastructure.pattern;
+
+import org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant;
+import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.HashMap;
+
+/** Checks the {@link TreePattern} produced from inclusion/exclusion source parameters. */
+public class TreePatternParseTest {
+
+  @Test
+  public void testPatternAndPatternInclusionPreserved() {
+    final HashMap<String, String> attributes = new HashMap<>();
+    attributes.put(PipeSourceConstant.SOURCE_PATTERN_KEY, "root.sg.A");
+    attributes.put(PipeSourceConstant.SOURCE_PATTERN_INCLUSION_KEY, "root.sg.B");
+
+    final TreePattern parsed = parse(attributes);
+
+    Assert.assertTrue(parsed instanceof UnionTreePattern);
+    Assert.assertEquals("root.sg.A,root.sg.B", parsed.getPattern());
+  }
+
+  @Test
+  public void testPathAndPathInclusionPreserved() {
+    final HashMap<String, String> attributes = new HashMap<>();
+    attributes.put(PipeSourceConstant.SOURCE_PATH_KEY, "root.sg.d1");
+    attributes.put(PipeSourceConstant.SOURCE_PATH_INCLUSION_KEY, "root.sg.d2,root.sg.d3");
+
+    final TreePattern parsed = parse(attributes);
+
+    Assert.assertTrue(parsed instanceof UnionIoTDBTreePattern);
+    Assert.assertEquals("root.sg.d1,root.sg.d2,root.sg.d3", parsed.getPattern());
+  }
+
+  @Test
+  public void testPathInclusionWithPathExclusionPreserved() {
+    final HashMap<String, String> attributes = new HashMap<>();
+    attributes.put(PipeSourceConstant.SOURCE_PATH_INCLUSION_KEY, "root.sg.**");
+    attributes.put(PipeSourceConstant.SOURCE_PATH_EXCLUSION_KEY, "root.sg.d1,root.sg.d2");
+
+    final TreePattern parsed = parse(attributes);
+
+    Assert.assertTrue(parsed instanceof WithExclusionIoTDBTreePattern);
+    Assert.assertEquals(
+        "INCLUSION(root.sg.**), EXCLUSION(root.sg.d1,root.sg.d2)", parsed.getPattern());
+  }
+
+  /** Wraps the attributes in {@link PipeParameters} and parses them into a tree pattern. */
+  private static TreePattern parse(final HashMap<String, String> attributes) {
+    return TreePattern.parsePipePatternFromSourceParameters(new PipeParameters(attributes));
+  }
+}
diff --git a/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/utils/JVMCommonUtilsTest.java b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/utils/JVMCommonUtilsTest.java index 2b61178..16e0a22 100644 --- a/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/utils/JVMCommonUtilsTest.java +++ b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/utils/JVMCommonUtilsTest.java
@@ -26,15 +26,21 @@
 import ch.qos.logback.classic.spi.ILoggingEvent;
 import ch.qos.logback.core.read.ListAppender;
 import org.junit.Assert;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
 import org.slf4j.LoggerFactory;
 
+import java.io.File;
 import java.io.IOException;
+import java.nio.charset.StandardCharsets;
 import java.nio.file.Files;
 import java.nio.file.Path;
 
 public class JVMCommonUtilsTest {
 
+  @Rule public TemporaryFolder tempFolder = new TemporaryFolder();
+
   @Test
   public void getJdkVersionTest() {
     try {
@@ -52,6 +58,27 @@
   }
 
   @Test
+  public void getOccupiedSpaceMissingFolderReturnsZero() throws IOException {
+    final File absentDir = new File(tempFolder.getRoot(), "does-not-exist");
+    Assert.assertFalse(absentDir.exists());
+
+    // A non-existent folder must be treated as empty rather than throwing NoSuchFileException.
+    final long occupied = JVMCommonUtils.getOccupiedSpace(absentDir.getAbsolutePath());
+    Assert.assertEquals(0L, occupied);
+  }
+
+  @Test
+  public void getOccupiedSpaceSumsFileSizes() throws IOException {
+    final File dataDir = tempFolder.newFolder("data");
+    final byte[] content = "hello-iotdb".getBytes(StandardCharsets.UTF_8);
+    Files.write(dataDir.toPath().resolve("a.txt"), content);
+    Files.write(dataDir.toPath().resolve("b.txt"), content);
+
+    final long expected = 2L * content.length;
+    Assert.assertEquals(expected, JVMCommonUtils.getOccupiedSpace(dataDir.getAbsolutePath()));
+  }
+
+  @Test
   public void unexpectedDiskSpaceErrorsLoggedOnlyOnceWhileErrorPersists() {
     ch.qos.logback.classic.Logger logger =
         (ch.qos.logback.classic.Logger) LoggerFactory.getLogger(JVMCommonUtils.class);
diff --git a/pom.xml b/pom.xml index 7561fa7..3e0aa63 100644 --- a/pom.xml +++ b/pom.xml
@@ -785,6 +785,9 @@ <exclude>**/*.cvs</exclude> <!-- licenses --> <exclude>licenses/*</exclude> + <!-- bundled third-party NOTICE / license texts for the C++ client package --> + <exclude>**/package-metadata/third_party/NOTICE</exclude> + <exclude>**/package-metadata/third_party/licenses/**</exclude> <!-- only for Travis CI with WinOS--> <exclude>hadoopbin</exclude> <exclude>windowssystem32</exclude>