Fix compression ratio not being transferred during region migration
diff --git a/integration-test/src/test/java/org/apache/iotdb/confignode/it/regionmigration/pass/daily/iotv1/IoTDBRegionMigrateWithCompressionRatioIT.java b/integration-test/src/test/java/org/apache/iotdb/confignode/it/regionmigration/pass/daily/iotv1/IoTDBRegionMigrateWithCompressionRatioIT.java new file mode 100644 index 0000000..e9fddd2 --- /dev/null +++ b/integration-test/src/test/java/org/apache/iotdb/confignode/it/regionmigration/pass/daily/iotv1/IoTDBRegionMigrateWithCompressionRatioIT.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.it.regionmigration.pass.daily.iotv1;
+
+import org.apache.iotdb.consensus.ConsensusFactory;
+import org.apache.iotdb.it.env.EnvFactory;
+
+import org.apache.tsfile.utils.Pair;
+import org.awaitility.Awaitility;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.iotdb.confignode.it.regionmigration.IoTDBRegionOperationReliabilityITFramework.getAllDataNodes;
+import static org.apache.iotdb.confignode.it.regionmigration.IoTDBRegionOperationReliabilityITFramework.getDataRegionMapWithLeader;
+
+/**
+ * Verifies that the compression ratio reported for a data region is preserved when the region is
+ * migrated to another DataNode under IoTConsensus.
+ */
+public class IoTDBRegionMigrateWithCompressionRatioIT {
+  /**
+   * Starts a cluster whose data regions use IoTConsensus with a replication factor of 2.
+   *
+   * <p>NOTE(review): {@code initClusterEnvironment(1, 3)} presumably means 1 ConfigNode and 3
+   * DataNodes, which leaves one DataNode without a replica to act as the migration target.
+   */
+  @Before
+  public void setUp() throws Exception {
+    EnvFactory.getEnv()
+        .getConfig()
+        .getCommonConfig()
+        .setDataReplicationFactor(2)
+        .setDataRegionConsensusProtocolClass(ConsensusFactory.IOT_CONSENSUS);
+    EnvFactory.getEnv().initClusterEnvironment(1, 3);
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    EnvFactory.getEnv().cleanClusterEnvironment();
+  }
+
+  /**
+   * Flushes one row, records the compression ratio SHOW REGIONS reports for the region on its
+   * leader, migrates the region from the leader to the DataNode that holds no replica, and waits
+   * until that DataNode reports the same ratio.
+   */
+  @Test
+  public void testWithCompressionRatio() throws Exception {
+    try (Connection connection = EnvFactory.getEnv().getTableConnection();
+        Statement statement = connection.createStatement()) {
+      statement.execute("CREATE DATABASE test");
+      statement.execute("USE test");
+      statement.execute("CREATE TABLE t1 (s1 INT64 FIELD)");
+      statement.execute("INSERT INTO t1 (time, s1) VALUES (100, 100)");
+      statement.execute("FLUSH");
+
+      Map<Integer, Pair<Integer, Set<Integer>>> dataRegionMapWithLeader =
+          getDataRegionMapWithLeader(statement);
+      int dataRegionIdForTest =
+          dataRegionMapWithLeader.keySet().stream().max(Integer::compare).get();
+
+      Pair<Integer, Set<Integer>> leaderAndNodes = dataRegionMapWithLeader.get(dataRegionIdForTest);
+      Set<Integer> allDataNodes = getAllDataNodes(statement);
+      int leaderId = leaderAndNodes.getLeft();
+      int followerId =
+          leaderAndNodes.getRight().stream().filter(i -> i != leaderId).findAny().get();
+      int newLeaderId =
+          allDataNodes.stream().filter(i -> i != leaderId && i != followerId).findAny().get();
+
+      System.out.printf(
+          "Old leader: %d, follower: %d, new leader: %d%n", leaderId, followerId, newLeaderId);
+
+      // Single-element array so the polling lambda can hand the observed value back.
+      double[] compressionRatioBeforeMigration = new double[] {Double.NaN};
+      Awaitility.await()
+          .atMost(10, TimeUnit.MINUTES)
+          .pollDelay(1, TimeUnit.SECONDS)
+          .untilAsserted(
+              () -> {
+                compressionRatioBeforeMigration[0] =
+                    readCompressionRatio(statement, dataRegionIdForTest, leaderId);
+                Assert.assertFalse(Double.isNaN(compressionRatioBeforeMigration[0]));
+              });
+
+      statement.execute(
+          String.format(
+              "migrate region %d from %d to %d", dataRegionIdForTest, leaderId, newLeaderId));
+
+      double finalCompressionRatioBeforeMigration = compressionRatioBeforeMigration[0];
+      Awaitility.await()
+          .atMost(10, TimeUnit.MINUTES)
+          .pollDelay(1, TimeUnit.SECONDS)
+          .untilAsserted(
+              () -> {
+                // NaN means the new DataNode is not listed yet; it never equals the baseline,
+                // so the assertion keeps failing (and polling) until the replica shows up.
+                double compressionRatioAfterMigration =
+                    readCompressionRatio(statement, dataRegionIdForTest, newLeaderId);
+                Assert.assertEquals(
+                    finalCompressionRatioBeforeMigration, compressionRatioAfterMigration, 0.0001);
+              });
+    }
+  }
+
+  /**
+   * Returns the CompressionRatio column of the SHOW REGIONS row matching the given region and
+   * DataNode, or {@link Double#NaN} when no such row exists.
+   */
+  private static double readCompressionRatio(Statement statement, int regionId, int dataNodeId)
+      throws SQLException {
+    try (ResultSet showRegions = statement.executeQuery("SHOW REGIONS")) {
+      while (showRegions.next()) {
+        if (showRegions.getInt("RegionId") == regionId
+            && showRegions.getInt("DataNodeId") == dataNodeId) {
+          return showRegions.getDouble("CompressionRatio");
+        }
+      }
+    }
+    return Double.NaN;
+  }
+}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/flush/CompressionRatio.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/flush/CompressionRatio.java index c22afac..9978d81 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/flush/CompressionRatio.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/flush/CompressionRatio.java
@@ -57,7 +57,7 @@
   static final String COMPRESSION_RATIO_DIR = "compression_ratio";
 
   private static final String FILE_PREFIX_BEFORE_V121 = "Ratio-";
-  private static final String FILE_PREFIX = "Compress-";
+  public static final String FILE_PREFIX = "Compress-";
 
   private static final String SEPARATOR = "-";
 
@@ -296,7 +296,7 @@
   }
 
   @TestOnly
-  void reset() throws IOException {
+  public void reset() throws IOException {
     if (!directory.exists()) {
       return;
     }
@@ -308,9 +308,34 @@
       Files.delete(file.toPath());
     }
     totalMemorySize = new AtomicLong(0);
+    dataRegionRatioMap.clear();
     totalDiskSize = 0L;
   }
 
+  /**
+   * Builds the path of the ratio file for the given data region from the pair currently held in
+   * {@code dataRegionRatioMap}.
+   *
+   * @param dataRegionId id of the data region, used both as map key and as file-name suffix
+   * @return a file under {@code directory} named by {@code RATIO_FILE_PATH_FORMAT} followed by
+   *     {@code "." + dataRegionId}, or {@code null} if no pair is recorded for the region
+   */
+  public synchronized File getCompressionRatioFile(String dataRegionId) {
+    Pair<Long, Long> dataRegionCompressionRatio = dataRegionRatioMap.get(dataRegionId);
+    if (dataRegionCompressionRatio == null) {
+      return null;
+    }
+    return SystemFileFactory.INSTANCE.getFile(
+        directory,
+        String.format(
+                Locale.ENGLISH,
+                RATIO_FILE_PATH_FORMAT,
+                dataRegionCompressionRatio.getLeft(),
+                dataRegionCompressionRatio.getRight())
+            + "."
+            + dataRegionId);
+  }
+
   public Map<String, Pair<Long, Long>> getDataRegionRatioMap() {
     return dataRegionRatioMap;
   }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/snapshot/SnapshotLoader.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/snapshot/SnapshotLoader.java index 5c5c8e4..daa6427 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/snapshot/SnapshotLoader.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/snapshot/SnapshotLoader.java
@@ -24,6 +24,7 @@
 import org.apache.iotdb.db.exception.DiskSpaceInsufficientException;
 import org.apache.iotdb.db.storageengine.StorageEngine;
 import org.apache.iotdb.db.storageengine.dataregion.DataRegion;
+import org.apache.iotdb.db.storageengine.dataregion.flush.CompressionRatio;
 import org.apache.iotdb.db.storageengine.rescon.disk.FolderManager;
 import org.apache.iotdb.db.storageengine.rescon.disk.strategy.DirectoryStrategyType;
 
@@ -121,7 +122,9 @@
         return null;
       }
       LOGGER.info("Moving snapshot file to data dirs");
-      createLinksFromSnapshotDirToDataDirWithoutLog(new File(snapshotPath));
+      File snapshotDir = new File(snapshotPath);
+      createLinksFromSnapshotDirToDataDirWithoutLog(snapshotDir);
+      loadCompressionRatio(snapshotDir);
       return loadSnapshot();
     } catch (IOException | DiskSpaceInsufficientException e) {
       LOGGER.error(
@@ -130,6 +133,51 @@
     }
   }
 
+  /**
+   * Restores the compression ratio carried by a snapshot directory.
+   *
+   * <p>The ratio is encoded in the name of a marker file, parsed here as {@code
+   * Compress-<rawSize>-<diskSize>.<dataRegionId>}; the file content is not read. A missing or
+   * malformed marker is logged and skipped instead of failing the snapshot load.
+   *
+   * @param snapshotDir directory that may contain the marker file
+   */
+  private void loadCompressionRatio(File snapshotDir) {
+    File[] compressionFiles =
+        snapshotDir.listFiles(f -> f.getName().startsWith(CompressionRatio.FILE_PREFIX));
+    if (compressionFiles == null || compressionFiles.length == 0) {
+      LOGGER.info("No compression ratio file in dir {}", snapshotPath);
+      return;
+    }
+    File ratioFile = compressionFiles[0];
+    String fileName = ratioFile.getName();
+    int suffixIndex = fileName.lastIndexOf('.');
+    if (suffixIndex < 0) {
+      // Without the ".<dataRegionId>" suffix substring() would throw and abort the snapshot load.
+      LOGGER.warn("Ignore compression ratio file {} without data region suffix", ratioFile);
+      return;
+    }
+    String dataRegionId = fileName.substring(suffixIndex + 1);
+
+    String[] fileNameArray = fileName.substring(0, suffixIndex).split("-");
+    // fileNameArray.length != 3 means the compression ratio may be negative, ignore it
+    if (fileNameArray.length != 3) {
+      LOGGER.warn("Ignore compression ratio file {} with unexpected name", ratioFile);
+      return;
+    }
+    try {
+      long rawSize = Long.parseLong(fileNameArray[1]);
+      long diskSize = Long.parseLong(fileNameArray[2]);
+      CompressionRatio.getInstance().updateRatio(rawSize, diskSize, dataRegionId);
+      // Report success only after the ratio has actually been applied.
+      LOGGER.info("Loaded compression ratio from {}", ratioFile);
+    } catch (NumberFormatException e) {
+      LOGGER.warn("Ignore compression ratio file {} with illegal name", ratioFile, e);
+    } catch (IOException e) {
+      LOGGER.warn("Cannot load compression ratio from {}", ratioFile, e);
+    }
+  }
+
   private DataRegion loadSnapshotWithLog(File logFile) {
     boolean snapshotComplete = false;
     try {
@@ -151,6 +199,7 @@
       deleteAllFilesInDataDirs();
       LOGGER.info("Remove all data files in original data dir");
       createLinksFromSnapshotDirToDataDirWithLog();
+      loadCompressionRatio(new File(snapshotPath));
       return loadSnapshot();
     } catch (IOException e) {
       LOGGER.error("Failed to remove origin data files", e);
@@ -497,6 +546,14 @@
                 + snapshotId;
         fileList.addAll(searchDataFilesRecursively(snapshotDir));
       }
+
+      File[] compressionRatioFiles =
+          logFile
+              .getParentFile()
+              .listFiles(f -> f.getName().startsWith(CompressionRatio.FILE_PREFIX));
+      if (compressionRatioFiles != null) {
+        fileList.addAll(Arrays.asList(compressionRatioFiles));
+      }
       return fileList;
     } finally {
       analyzer.close();
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/snapshot/SnapshotTaker.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/snapshot/SnapshotTaker.java index e60d7e1..dde04c1 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/snapshot/SnapshotTaker.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/snapshot/SnapshotTaker.java
@@ -24,6 +24,7 @@
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.exception.DirectoryNotLegalException;
 import org.apache.iotdb.db.storageengine.dataregion.DataRegion;
+import org.apache.iotdb.db.storageengine.dataregion.flush.CompressionRatio;
 import org.apache.iotdb.db.storageengine.dataregion.modification.ModificationFile;
 import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileManager;
 import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
@@ -100,6 +101,7 @@
         }
         success = createSnapshot(seqFiles, tempSnapshotId);
         success = success && createSnapshot(unseqFiles, tempSnapshotId);
+        success = success && snapshotCompressionRatio(snapshotDirPath);
       } finally {
         readUnlockTheFile();
       }
@@ -136,6 +138,38 @@
     }
   }
 
+  /**
+   * Records the data region's compression ratio in the snapshot directory.
+   *
+   * <p>Only the name matters: an otherwise meaningless marker file named like the region's ratio
+   * file is written into the snapshot.
+   *
+   * @param snapshotDir directory the snapshot is being written to
+   * @return {@code true} if no ratio is recorded for the region or the marker was written, {@code
+   *     false} if writing the marker failed
+   */
+  private boolean snapshotCompressionRatio(String snapshotDir) {
+    File compressionRatioFile =
+        CompressionRatio.getInstance().getCompressionRatioFile(dataRegion.getDataRegionId());
+    if (compressionRatioFile == null) {
+      // No ratio has been recorded for this region, so there is nothing to transfer.
+      return true;
+    }
+    String ratioFileName = compressionRatioFile.getName();
+    LOGGER.info("Snapshotting compression ratio {}.", ratioFileName);
+    try {
+      // Files.write creates the marker or overwrites an existing one, so a marker left behind by
+      // an earlier attempt cannot make the snapshot fail.
+      // Write one byte so that it will not be skipped.
+      Files.write(new File(snapshotDir, ratioFileName).toPath(), new byte[1]);
+      LOGGER.info("Snapshot compression ratio {} in {}.", ratioFileName, snapshotDir);
+      return true;
+    } catch (IOException e) {
+      LOGGER.warn("Cannot snapshot compression ratio {} in {}.", ratioFileName, snapshotDir, e);
+      return false;
+    }
+  }
+
   public boolean cleanSnapshot() {
     return clearSnapshotOfDataRegion(this.dataRegion);
   }
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/snapshot/IoTDBSnapshotTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/snapshot/IoTDBSnapshotTest.java index 00bbc38..6bec0df 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/snapshot/IoTDBSnapshotTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/snapshot/IoTDBSnapshotTest.java
@@ -25,6 +25,7 @@
 import org.apache.iotdb.db.exception.DirectoryNotLegalException;
 import org.apache.iotdb.db.exception.StorageEngineException;
 import org.apache.iotdb.db.storageengine.dataregion.DataRegion;
+import org.apache.iotdb.db.storageengine.dataregion.flush.CompressionRatio;
 import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
 import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResourceStatus;
 import org.apache.iotdb.db.storageengine.rescon.disk.TierManager;
@@ -32,6 +33,7 @@
 
 import org.apache.tsfile.exception.write.WriteProcessException;
 import org.apache.tsfile.file.metadata.IDeviceID;
+import org.apache.tsfile.utils.Pair;
 import org.apache.tsfile.utils.TsFileGeneratorUtils;
 import org.junit.After;
 import org.junit.Assert;
@@ -179,18 +181,26 @@
     try {
       List<TsFileResource> resources = writeTsFiles();
       DataRegion region = new DataRegion(testSgName, "0");
+      // Record a ratio for region "0" so the snapshot has a compression ratio to carry.
+      CompressionRatio.getInstance().updateRatio(100, 100, "0");
       region.getTsFileManager().addAll(resources, true);
       File snapshotDir = new File("target" + File.separator + "snapshot");
       Assert.assertTrue(snapshotDir.exists() || snapshotDir.mkdirs());
       try {
         Assert.assertTrue(
             new SnapshotTaker(region).takeFullSnapshot(snapshotDir.getAbsolutePath(), true));
+        // Clear the recorded ratio; the assertion below then passes only if loading restores it.
+        CompressionRatio.getInstance().reset();
+
         DataRegion dataRegion =
             new SnapshotLoader(snapshotDir.getAbsolutePath(), testSgName, "0")
                 .loadSnapshotForStateMachine();
         Assert.assertNotNull(dataRegion);
         List<TsFileResource> resource = dataRegion.getTsFileManager().getTsFileList(true);
         Assert.assertEquals(100, resource.size());
+        Assert.assertEquals(
+            new Pair<>(100L, 100L),
+            CompressionRatio.getInstance().getDataRegionRatioMap().get("0"));
       } finally {
         FileUtils.recursivelyDeleteFolder(snapshotDir.getAbsolutePath());
       }