add expr
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index 3ccf61c..dcb4043 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -438,8 +438,16 @@
conf.setWALCompressionAlgorithm(
enableWALCompression ? CompressionType.LZ4 : CompressionType.UNCOMPRESSED);
- conf.setLevelModFileCntThreshold(Integer.parseInt(properties.getProperty("level_mod_file_cnt_threshold", Integer.toString(conf.getLevelModFileCntThreshold()))));
- conf.setSingleModFileSizeThreshold(Long.parseLong(properties.getProperty("single_mod_file_size_threshold", Integer.toString(conf.getLevelModFileCntThreshold()))));
+ conf.setLevelModFileCntThreshold(
+ Integer.parseInt(
+ properties.getProperty(
+ "level_mod_file_cnt_threshold",
+ Integer.toString(conf.getLevelModFileCntThreshold()))));
+ conf.setSingleModFileSizeThreshold(
+ Long.parseLong(
+ properties.getProperty(
+ "single_mod_file_size_threshold",
+ Integer.toString(conf.getLevelModFileCntThreshold()))));
conf.setCompactionScheduleIntervalInMs(
Long.parseLong(
@@ -1966,8 +1974,16 @@
conf.setWALCompressionAlgorithm(
enableWALCompression ? CompressionType.LZ4 : CompressionType.UNCOMPRESSED);
- conf.setLevelModFileCntThreshold(Integer.parseInt(properties.getProperty("level_mod_file_cnt_threshold", Integer.toString(conf.getLevelModFileCntThreshold()))));
- conf.setSingleModFileSizeThreshold(Long.parseLong(properties.getProperty("single_mod_file_size_threshold", Integer.toString(conf.getLevelModFileCntThreshold()))));
+ conf.setLevelModFileCntThreshold(
+ Integer.parseInt(
+ properties.getProperty(
+ "level_mod_file_cnt_threshold",
+ Integer.toString(conf.getLevelModFileCntThreshold()))));
+ conf.setSingleModFileSizeThreshold(
+ Long.parseLong(
+ properties.getProperty(
+ "single_mod_file_size_threshold",
+ Integer.toString(conf.getLevelModFileCntThreshold()))));
// update Consensus config
reloadConsensusProps(properties);
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/expr/DeletionExprMain.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/expr/DeletionExprMain.java
index bda0122..7454165 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/expr/DeletionExprMain.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/expr/DeletionExprMain.java
@@ -19,15 +19,6 @@
package org.apache.iotdb.db.expr;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.function.Function;
-import java.util.function.Supplier;
-import java.util.stream.Collectors;
import org.apache.iotdb.db.expr.conf.SimulationConfig;
import org.apache.iotdb.db.expr.distribution.FixedIntervalGenerator;
import org.apache.iotdb.db.expr.entity.SimDeletion;
@@ -42,6 +33,15 @@
import org.apache.tsfile.read.common.TimeRange;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
public class DeletionExprMain {
private SimulationConfig config;
@@ -51,6 +51,7 @@
private long maxTimestamp;
private ExprReport report;
private static int maxFileCntThreshold = 30;
+ private static int cntThresholdStep = 1;
public DeletionExprMain() {
init();
@@ -60,8 +61,7 @@
public void init() {
config = new SimulationConfig();
simulator = new SimpleSimulator(config);
- simpleModAllocator = new SimpleModAllocator(config,
- simulator.getSimulationContext());
+ simpleModAllocator = new SimpleModAllocator(config, simulator.getSimulationContext());
maxStep = 10000;
maxTimestamp = Long.MAX_VALUE;
}
@@ -86,8 +86,10 @@
GenerateDeletionEvent generatePartialDeletionEvent =
new GenerateDeletionEvent(
config,
- new SimDeletion(new TimeRange(config.partialDeletionOffset,
- config.partialDeletionOffset + config.partialDeletionRange)),
+ new SimDeletion(
+ new TimeRange(
+ config.partialDeletionOffset,
+ config.partialDeletionOffset + config.partialDeletionRange)),
config.partialDeletionStep,
new FixedIntervalGenerator(config.generatePartialDeletionInterval));
generatePartialDeletionEvent.generateTimestamp = config.deletionStartTime;
@@ -104,15 +106,19 @@
ExecuteRangeQueryEvent executeRangeQueryEvent =
new ExecuteRangeQueryEvent(
config,
- new TimeRange(config.rangeQueryOffset,
- config.rangeQueryRange + config.rangeQueryOffset),
+ new TimeRange(
+ config.rangeQueryOffset, config.rangeQueryRange + config.rangeQueryOffset),
config.rangeQueryStep,
new FixedIntervalGenerator(config.rangeQueryInterval));
- ExecuteLastPointQueryEvent executeLastPointQueryEvent = new ExecuteLastPointQueryEvent(config,
- new TimeRange(0, 1), new FixedIntervalGenerator(config.pointQueryInterval));
- ExecuteRangeQueryEvent executeFullQueryEvent = new ExecuteRangeQueryEvent(config,
- new TimeRange(Long.MIN_VALUE, Long.MAX_VALUE), 0,
- new FixedIntervalGenerator(config.fullQueryInterval));
+ ExecuteLastPointQueryEvent executeLastPointQueryEvent =
+ new ExecuteLastPointQueryEvent(
+ config, new TimeRange(0, 1), new FixedIntervalGenerator(config.pointQueryInterval));
+ ExecuteRangeQueryEvent executeFullQueryEvent =
+ new ExecuteRangeQueryEvent(
+ config,
+ new TimeRange(Long.MIN_VALUE, Long.MAX_VALUE),
+ 0,
+ new FixedIntervalGenerator(config.fullQueryInterval));
events.add(executeRangeQueryEvent);
events.add(executeLastPointQueryEvent);
events.add(executeFullQueryEvent);
@@ -134,18 +140,21 @@
if (printState) {
System.out.println(simulator);
System.out.println(simulator.getStatistics());
- System.out.println(simulator.getSimulationContext().modFileManager.modFileList.stream()
- .map(s -> s.mods.size()).collect(
- Collectors.toList()));
- System.out.println(simulator.getSimulationContext().modFileManager.modFileList.stream()
- .map(s -> s.tsfileReferences.size()).collect(
- Collectors.toList()));
+ System.out.println(
+ simulator.getSimulationContext().modFileManager.modFileList.stream()
+ .map(s -> s.mods.size())
+ .collect(Collectors.toList()));
+ System.out.println(
+ simulator.getSimulationContext().modFileManager.modFileList.stream()
+ .map(s -> s.tsfileReferences.size())
+ .collect(Collectors.toList()));
}
}
private void writeReport() {
- report.deletionWriteTime = simulator.getStatistics().partialDeletionExecutedTime
- + simulator.getStatistics().fullDeletionExecutedTime;
+ report.deletionWriteTime =
+ simulator.getStatistics().partialDeletionExecutedTime
+ + simulator.getStatistics().fullDeletionExecutedTime;
report.deletionTimeList.add(report.deletionWriteTime);
report.deletionReadTime = simulator.getStatistics().queryReadDeletionTime;
report.queryTimeList.add(report.deletionReadTime);
@@ -188,7 +197,7 @@
}
// use modFileCntThreshold as the x-axis
- for (int i = 1; i < maxFileCntThreshold; i++) {
+ for (int i = 1; i < maxFileCntThreshold; i+=cntThresholdStep) {
initExpr(expr);
configurer.configure(expr, exprNum);
expr.config.modFileCntThreshold = i;
@@ -210,30 +219,40 @@
expr.config.generatePartialDeletionInterval = 2_000_000;
expr.config.partialDeletionRange = expr.config.tsfileRange * 3;
expr.config.partialDeletionOffset = -expr.config.partialDeletionRange;
- expr.config.partialDeletionStep = (long) (expr.config.tsfileRange / (
- 1.0 * expr.config.generateTsFileInterval / expr.config.generatePartialDeletionInterval));
+ expr.config.partialDeletionStep =
+ (long)
+ (expr.config.tsfileRange
+ / (1.0
+ * expr.config.generateTsFileInterval
+ / expr.config.generatePartialDeletionInterval));
expr.config.generateTsFileInterval = 10_000_000;
expr.config.modFileSizeThreshold = 64 * 1024;
expr.config.deletionStartTime = 1000 * expr.config.generateTsFileInterval;
-// expr.config.queryRange = maxTimestamp;
-// expr.config.queryStep = 0;
+ // expr.config.queryRange = maxTimestamp;
+ // expr.config.queryStep = 0;
expr.config.rangeQueryRange = expr.config.tsfileRange * 1000;
- expr.config.rangeQueryStep = expr.config.tsfileRange / (expr.config.generateTsFileInterval
- / expr.config.rangeQueryInterval);
- expr.config.rangeQueryOffset = -expr.config.rangeQueryRange
- + expr.config.deletionStartTime / expr.config.generateTsFileInterval
- * expr.config.tsfileRange;
+ expr.config.rangeQueryStep =
+ expr.config.tsfileRange
+ / (expr.config.generateTsFileInterval / expr.config.rangeQueryInterval);
+ expr.config.rangeQueryOffset =
+ -expr.config.rangeQueryRange
+ + expr.config.deletionStartTime
+ / expr.config.generateTsFileInterval
+ * expr.config.tsfileRange;
}
- private static void parallelExpr(Configurer configurer, int exprNum, Function<Integer, String> argsToString, boolean runBaseline)
+ private static void parallelExpr(
+ Configurer configurer,
+ int exprNum,
+ Function<Integer, String> argsToString,
+ boolean runBaseline)
throws ExecutionException, InterruptedException {
ExecutorService service = Executors.newCachedThreadPool();
List<Future<ExprReport>> asyncReports = new ArrayList<>();
for (int i = 0; i < exprNum; i++) {
int finalI = i;
- asyncReports.add(service.submit(() -> oneExpr(configurer,
- finalI, runBaseline)));
+ asyncReports.add(service.submit(() -> oneExpr(configurer, finalI, runBaseline)));
}
for (Future<ExprReport> asyncReport : asyncReports) {
@@ -249,81 +268,109 @@
private static void testSizeThreshold() throws ExecutionException, InterruptedException {
String argName = "sizeThreshold";
- long[] exprArgs = new long[]{
- 16 * 1024,
- 32 * 1024,
- 64 * 1024,
- 128 * 1024,
- 256 * 1024,
- };
- Configurer configurer = (expr, j) -> {
- expr.config.modFileSizeThreshold = exprArgs[j];
- };
+ long[] exprArgs =
+ new long[]{
+ 16 * 1024, 32 * 1024, 64 * 1024, 128 * 1024, 256 * 1024,
+ };
+ Configurer configurer =
+ (expr, j) -> {
+ expr.config.modFileSizeThreshold = exprArgs[j];
+ };
parallelExpr(configurer, exprArgs.length, (i) -> argName + ":" + exprArgs[i], true);
}
private static void testQueryInterval() throws ExecutionException, InterruptedException {
String argName = "queryInterval";
- long[] exprArgs = new long[]{
- 500_000,
- 1000_000,
- 1500_000,
- 2000_000,
- 2500_000
- };
- Configurer configurer = (expr, j) -> {
- expr.config.pointQueryInterval = exprArgs[j];
- expr.config.rangeQueryInterval = exprArgs[j];
- expr.config.fullQueryInterval = exprArgs[j];
- };
+ long[] exprArgs = new long[]{500_000, 1000_000, 1500_000, 2000_000, 2500_000};
+ Configurer configurer =
+ (expr, j) -> {
+ expr.config.pointQueryInterval = exprArgs[j];
+ expr.config.rangeQueryInterval = exprArgs[j];
+ expr.config.fullQueryInterval = exprArgs[j];
+ };
parallelExpr(configurer, exprArgs.length, (i) -> argName + ":" + exprArgs[i], true);
}
private static void testSimulationTime() throws ExecutionException, InterruptedException {
String argName = "simulationTime";
- long[] exprArgs = new long[]{
- 24 * 60 * 60 * 1000 * 1000L,
- 2 * 24 * 60 * 60 * 1000 * 1000L,
- 3 * 24 * 60 * 60 * 1000 * 1000L,
- 4 * 24 * 60 * 60 * 1000 * 1000L,
- 5 * 24 * 60 * 60 * 1000 * 1000L
- };
- Configurer configurer = (expr, j) -> {
- expr.maxTimestamp = exprArgs[j];
- };
+ long[] exprArgs =
+ new long[]{
+ 24 * 60 * 60 * 1000 * 1000L,
+ 2 * 24 * 60 * 60 * 1000 * 1000L,
+ 3 * 24 * 60 * 60 * 1000 * 1000L,
+ 4 * 24 * 60 * 60 * 1000 * 1000L,
+ 5 * 24 * 60 * 60 * 1000 * 1000L
+ };
+ Configurer configurer =
+ (expr, j) -> {
+ expr.maxTimestamp = exprArgs[j];
+ };
parallelExpr(configurer, exprArgs.length, (i) -> argName + ":" + exprArgs[i], false);
}
private static void testDeletionRatio() throws ExecutionException, InterruptedException {
String argName1 = "fullDeletionInterval";
- long[] exprArgs1 = new long[]{
- 200_000_000,
- 20_000_000,
- 2_000_000,
- 2_000_000,
- 2_000_000
- };
+ long[] exprArgs1;
+ exprArgs1 = new long[]{200_000_000, 20_000_000, 2_000_000, 2_000_000, 2_000_000};
+// exprArgs1 = new long[]{2_000_000, 2_000_000};
String argName2 = "partialDeletionInterval";
- long[] exprArgs2 = new long[]{
- 2_000_000,
- 2_000_000,
- 2_000_000,
- 20_000_000,
- 200_000_000,
+ long[] exprArgs2;
+ exprArgs2 = new long[]{
+ 2_000_000, 2_000_000, 2_000_000, 20_000_000, 200_000_000,
};
- Configurer configurer = (expr, j) -> {
- expr.config.generateFullDeletionInterval = exprArgs1[j];
- expr.config.generatePartialDeletionInterval = exprArgs2[j];
- };
- parallelExpr(configurer, exprArgs1.length, (i) -> argName1 + ":" + exprArgs1[i] +";" + argName2 + ":" + exprArgs2[i], true);
+// exprArgs2 =
+// new long[]{
+// 20_000_000, 200_000_000,
+// };
+ Configurer configurer =
+ (expr, j) -> {
+ expr.config.generateFullDeletionInterval = exprArgs1[j];
+ expr.config.generatePartialDeletionInterval = exprArgs2[j];
+ expr.config.partialDeletionStep =
+ (long)
+ (expr.config.tsfileRange
+ / (1.0
+ * expr.config.generateTsFileInterval
+ / expr.config.generatePartialDeletionInterval));
+ };
+ parallelExpr(
+ configurer,
+ exprArgs1.length,
+ (i) -> argName1 + ":" + exprArgs1[i] + ";" + argName2 + ":" + exprArgs2[i],
+ true);
}
- public static void main(String[] args) throws ExecutionException, InterruptedException {
- maxFileCntThreshold = 30;
+ private static void testTsFileGenInterval() throws ExecutionException, InterruptedException {
+ String argName = "tsFileGenerationInterval";
+ long[] exprArgs = new long[]{
+ 10_000_000L,
+ 20_000_000L,
+ 40_000_000L,
+ 80_000_000L,
+ 160_000_000L,
+ };
+ Configurer configurer = (expr, j) -> {
+ expr.config.generateTsFileInterval = exprArgs[j];
+ expr.config.partialDeletionStep = (long) (expr.config.tsfileRange / (
+ 1.0 * expr.config.generateTsFileInterval / expr.config.generatePartialDeletionInterval));
+ expr.config.rangeQueryStep = expr.config.tsfileRange / (expr.config.generateTsFileInterval
+ / expr.config.rangeQueryInterval);
+ expr.config.rangeQueryOffset = -expr.config.rangeQueryRange
+ + expr.config.deletionStartTime / expr.config.generateTsFileInterval
+ * expr.config.tsfileRange;
+ };
+ parallelExpr(configurer, exprArgs.length, (i) -> argName + ":" + exprArgs[i], true);
+ }
-// testSizeThreshold();
-// testQueryInterval();
-// testSimulationTime();
- testDeletionRatio();
+
+ public static void main(String[] args) throws ExecutionException, InterruptedException {
+ maxFileCntThreshold = 101;
+ cntThresholdStep = 5;
+
+ testSizeThreshold();
+ // testQueryInterval();
+ // testSimulationTime();
+// testDeletionRatio();
+// testTsFileGenInterval();
}
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/expr/conf/SimulationConfig.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/expr/conf/SimulationConfig.java
index 83ea23c..35fd6d6 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/expr/conf/SimulationConfig.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/expr/conf/SimulationConfig.java
@@ -21,8 +21,10 @@
public long generatePartialDeletionInterval = 20_000_000L;
public long generateFullDeletionInterval = 20_000_000L;
- // the first deletion ranges from [partialDeletionOffset, partialDeletionRange + partialDeletionOffset],
- // and the next one ranges from [partialDeletionOffset + partialDeletionStep, partialDeletionRange + partialDeletionOffset + partialDeletionStep],
+ // the first deletion ranges from [partialDeletionOffset, partialDeletionRange +
+ // partialDeletionOffset],
+ // and the next one ranges from [partialDeletionOffset + partialDeletionStep, partialDeletionRange
+ // + partialDeletionOffset + partialDeletionStep],
// and so on
public long partialDeletionRange = tsfileRange * 3;
public long partialDeletionStep = tsfileRange / 2;
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/expr/entity/SimpleModAllocator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/expr/entity/SimpleModAllocator.java
index 7825140..795b902 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/expr/entity/SimpleModAllocator.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/expr/entity/SimpleModAllocator.java
@@ -39,9 +39,10 @@
// can allocate more mod file
long totalSize = prevModFile.mods.size() * config.deletionSizeInByte;
if (totalSize > config.modFileSizeThreshold) {
-// System.out.printf(
-// "When allocating new Mod File, there are %d partial deletion and %d full deletion%n",
-// prevModFile.partialDeletionCnt, prevModFile.fullDeletionCnt);
+ // System.out.printf(
+ // "When allocating new Mod File, there are %d partial deletion and %d full
+ // deletion%n",
+ // prevModFile.partialDeletionCnt, prevModFile.fullDeletionCnt);
// the previous one is already large enough, allocate a new one
return allocateNew();
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/expr/event/ExecuteLastPointQueryEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/expr/event/ExecuteLastPointQueryEvent.java
index 9985300..d67db56 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/expr/event/ExecuteLastPointQueryEvent.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/expr/event/ExecuteLastPointQueryEvent.java
@@ -19,14 +19,14 @@
package org.apache.iotdb.db.expr.event;
+import org.apache.iotdb.db.expr.conf.SimulationConfig;
+import org.apache.iotdb.db.expr.simulator.SimulationContext;
+
+import org.apache.tsfile.read.common.TimeRange;
+
import java.util.Collections;
import java.util.List;
import java.util.function.Supplier;
-import org.apache.iotdb.db.expr.conf.SimulationConfig;
-import org.apache.iotdb.db.expr.entity.SimDeletion;
-import org.apache.iotdb.db.expr.entity.SimTsFile;
-import org.apache.iotdb.db.expr.simulator.SimulationContext;
-import org.apache.tsfile.read.common.TimeRange;
public class ExecuteLastPointQueryEvent extends ExecuteRangeQueryEvent {
@@ -39,7 +39,7 @@
long currentTimestamp = context.getSimulator().currentTimestamp;
long lastTsFileVersion = currentTimestamp / context.getConfig().generateTsFileInterval;
long lastTsFileTime = lastTsFileVersion * context.getConfig().tsfileRange;
- return new TimeRange(lastTsFileTime,lastTsFileTime + 1);
+ return new TimeRange(lastTsFileTime, lastTsFileTime + 1);
}
@Override
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/expr/event/ExecuteRangeQueryEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/expr/event/ExecuteRangeQueryEvent.java
index 0e547e7..f462164 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/expr/event/ExecuteRangeQueryEvent.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/expr/event/ExecuteRangeQueryEvent.java
@@ -78,8 +78,7 @@
1.0 * deletions.size() * config.deletionSizeInByte / config.IoBandwidthBytesPerTimestamp;
readDeletionTransTimeSum += transTime;
readDeletionSeekTimeSum += config.IoSeekTimestamp;
- return transTime
- + config.IoSeekTimestamp;
+ return transTime + config.IoSeekTimestamp;
}
@Override
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/expr/event/GenerateDeletionEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/expr/event/GenerateDeletionEvent.java
index 4b1ac0b..72c06a2 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/expr/event/GenerateDeletionEvent.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/expr/event/GenerateDeletionEvent.java
@@ -26,7 +26,7 @@
long step,
Supplier<Long> intervalGenerator) {
super(config);
-
+
this.currentDeletion = currentDeletion;
this.step = step;
this.intervalGenerator = intervalGenerator;
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/expr/executor/SimpleExecutor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/expr/executor/SimpleExecutor.java
index 50c826c..bca6d78 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/expr/executor/SimpleExecutor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/expr/executor/SimpleExecutor.java
@@ -102,12 +102,13 @@
simpleContext.getStatistics().queryExecutedCnt++;
simpleContext.getStatistics().queryExecutedTime += event.getTimeConsumption();
simpleContext.getStatistics().queryReadDeletionTime += Math.round(event.readDeletionTimeSum);
- simpleContext.getStatistics().queryReadDeletionSeekTime += Math.round(
- event.readDeletionSeekTimeSum);
- simpleContext.getStatistics().queryReadDeletionTransTime += Math.round(
- event.readDeletionTransTimeSum);
-// System.out.println(
-// simpleContext.getSimulator().currentStep + " " + simpleContext.getSimulator().currentTimestamp
-// + " " + event.readDeletionSeekTimeSum + " " + event.readDeletionTransTimeSum);
+ simpleContext.getStatistics().queryReadDeletionSeekTime +=
+ Math.round(event.readDeletionSeekTimeSum);
+ simpleContext.getStatistics().queryReadDeletionTransTime +=
+ Math.round(event.readDeletionTransTimeSum);
+ // System.out.println(
+ // simpleContext.getSimulator().currentStep + " " +
+ // simpleContext.getSimulator().currentTimestamp
+ // + " " + event.readDeletionSeekTimeSum + " " + event.readDeletionTransTimeSum);
}
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/expr/simulator/SimpleSimulator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/expr/simulator/SimpleSimulator.java
index 58aacbf..8bafae3 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/expr/simulator/SimpleSimulator.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/expr/simulator/SimpleSimulator.java
@@ -19,13 +19,13 @@
package org.apache.iotdb.db.expr.simulator;
-import java.util.List;
import org.apache.iotdb.db.expr.conf.SimulationConfig;
import org.apache.iotdb.db.expr.entity.SimModFileManager;
import org.apache.iotdb.db.expr.entity.SimTsFileManager;
import org.apache.iotdb.db.expr.event.Event;
import org.apache.iotdb.db.expr.executor.SimpleExecutor;
+import java.util.List;
import java.util.PriorityQueue;
public class SimpleSimulator {
@@ -94,13 +94,18 @@
@Override
public String toString() {
- return "SimpleSimulator{" +
- "currentTimestamp=" + currentTimestamp +
- "\n, currentStep=" + currentStep +
- "\n, eventQueue=" + eventQueue +
- "\n, maxStep=" + maxStep +
- "\n, maxTimestamp=" + maxTimestamp +
- '}';
+ return "SimpleSimulator{"
+ + "currentTimestamp="
+ + currentTimestamp
+ + "\n, currentStep="
+ + currentStep
+ + "\n, eventQueue="
+ + eventQueue
+ + "\n, maxStep="
+ + maxStep
+ + "\n, maxTimestamp="
+ + maxTimestamp
+ + '}';
}
public class SimpleContext implements SimulationContext {
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/RepairUnsortedFileCompactionTask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/RepairUnsortedFileCompactionTask.java
index 08c8180..21b3b70 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/RepairUnsortedFileCompactionTask.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/RepairUnsortedFileCompactionTask.java
@@ -207,7 +207,8 @@
} else {
if (sourceFile.modFileExists()) {
Files.createLink(
- new File(filesView.targetFilesInPerformer.get(0).getOldModFile().getFilePath()).toPath(),
+ new File(filesView.targetFilesInPerformer.get(0).getOldModFile().getFilePath())
+ .toPath(),
new File(sourceFile.getOldModFile().getFilePath()).toPath());
}
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/CompactionUtils.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/CompactionUtils.java
index 8cb6f06..0fe8739 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/CompactionUtils.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/CompactionUtils.java
@@ -141,7 +141,8 @@
continue;
}
Set<Modification> seqModifications =
- new HashSet<>(ModificationFileV1.getCompactionMods(seqResources.get(i)).getModifications());
+ new HashSet<>(
+ ModificationFileV1.getCompactionMods(seqResources.get(i)).getModifications());
modifications.addAll(seqModifications);
updateOneTargetMods(targetResource, modifications);
if (!modifications.isEmpty()) {
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/log/TsFileIdentifier.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/log/TsFileIdentifier.java
index 9ab77d7..d4e9076 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/log/TsFileIdentifier.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/log/TsFileIdentifier.java
@@ -195,7 +195,8 @@
if (file.exists()
|| new File(file.getAbsolutePath() + TsFileResource.RESOURCE_SUFFIX).exists()
|| new File(file.getAbsolutePath() + ModificationFileV1.FILE_SUFFIX).exists()
- || new File(file.getAbsolutePath() + ModificationFileV1.COMPACTION_FILE_SUFFIX).exists()) {
+ || new File(file.getAbsolutePath() + ModificationFileV1.COMPACTION_FILE_SUFFIX)
+ .exists()) {
return file;
}
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/modification/DeletionPredicate.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/modification/DeletionPredicate.java
index 1918d47..c9a8589 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/modification/DeletionPredicate.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/modification/DeletionPredicate.java
@@ -18,15 +18,17 @@
*/
package org.apache.iotdb.db.storageengine.dataregion.modification;
+import org.apache.iotdb.db.utils.IOUtils.StreamSerializable;
+
+import org.apache.tsfile.utils.ReadWriteForEncodingUtils;
+import org.apache.tsfile.utils.ReadWriteIOUtils;
+
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
-import org.apache.iotdb.db.utils.IOUtils.StreamSerializable;
-import org.apache.tsfile.utils.ReadWriteForEncodingUtils;
-import org.apache.tsfile.utils.ReadWriteIOUtils;
public class DeletionPredicate implements StreamSerializable {
@@ -57,11 +59,10 @@
measurementNames.add(ReadWriteIOUtils.readVarIntString(stream));
}
} else {
- measurementNames = Collections.emptyList();
+ measurementNames = Collections.emptyList();
}
}
-
public static class IDPredicate implements StreamSerializable {
@Override
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/modification/ModEntry.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/modification/ModEntry.java
index e3c1fd6..12262a7 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/modification/ModEntry.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/modification/ModEntry.java
@@ -18,13 +18,15 @@
*/
package org.apache.iotdb.db.storageengine.dataregion.modification;
+import org.apache.iotdb.db.utils.IOUtils.StreamSerializable;
+
+import org.apache.tsfile.read.common.TimeRange;
+import org.apache.tsfile.utils.ReadWriteIOUtils;
+
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;
-import org.apache.iotdb.db.utils.IOUtils.StreamSerializable;
-import org.apache.tsfile.read.common.TimeRange;
-import org.apache.tsfile.utils.ReadWriteIOUtils;
public abstract class ModEntry implements StreamSerializable {
protected ModType modType;
@@ -43,8 +45,8 @@
@Override
public void deserialize(InputStream stream) throws IOException {
- this.timeRange = new TimeRange(ReadWriteIOUtils.readLong(stream),
- ReadWriteIOUtils.readLong(stream));
+ this.timeRange =
+ new TimeRange(ReadWriteIOUtils.readLong(stream), ReadWriteIOUtils.readLong(stream));
}
public static ModEntry createFrom(InputStream stream) throws IOException {
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/modification/ModFileManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/modification/ModFileManager.java
index 79feb96..db9f55b 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/modification/ModFileManager.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/modification/ModFileManager.java
@@ -19,6 +19,13 @@
package org.apache.iotdb.db.storageengine.dataregion.modification;
+import org.apache.iotdb.commons.utils.FileUtils;
+import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileID;
+import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
@@ -26,21 +33,15 @@
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
-import org.apache.iotdb.commons.utils.FileUtils;
-import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileID;
-import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-/**
- * A ModFileManager manages the ModificationFiles of a Time Partition.
- */
+/** A ModFileManager manages the ModificationFiles of a Time Partition. */
@SuppressWarnings({"SynchronizationOnLocalVariableOrMethodParameter"})
public class ModFileManager {
private static final Logger LOGGER = LoggerFactory.getLogger(ModFileManager.class);
// levelNum -> modFileNum -> modFile
- private final Map<Long, TreeMap<Long, ModificationFile>> allLevelModsFileMap = new ConcurrentHashMap<>();
+ private final Map<Long, TreeMap<Long, ModificationFile>> allLevelModsFileMap =
+ new ConcurrentHashMap<>();
private final int levelModFileCntThreshold;
private final long singleModFileSizeThreshold;
@@ -55,15 +56,17 @@
String name = file.getName();
long[] levelNumAndModNum = ModificationFile.parseFileName(name);
- ModificationFile modificationFile = allLevelModsFileMap.computeIfAbsent(levelNumAndModNum[0],
- k -> new TreeMap<>()).computeIfAbsent(levelNumAndModNum[1], k -> new ModificationFile(file, resource));
+ ModificationFile modificationFile =
+ allLevelModsFileMap
+ .computeIfAbsent(levelNumAndModNum[0], k -> new TreeMap<>())
+ .computeIfAbsent(levelNumAndModNum[1], k -> new ModificationFile(file, resource));
modificationFile.addReference(resource);
return modificationFile;
}
private long maxModNum(long levelNum) {
- TreeMap<Long, ModificationFile> levelModFileMap = allLevelModsFileMap.computeIfAbsent(
- levelNum, k -> new TreeMap<>());
+ TreeMap<Long, ModificationFile> levelModFileMap =
+ allLevelModsFileMap.computeIfAbsent(levelNum, k -> new TreeMap<>());
if (levelModFileMap.isEmpty()) {
return -1;
} else {
@@ -126,12 +129,15 @@
TsFileID tsFileID = resource.getTsFileID();
long levelNum = tsFileID.getInnerCompactionCount();
long nextModNum = maxModNum(levelNum) + 1;
- File file = new File(resource.getTsFile().getParentFile(), ModificationFile.composeFileName(levelNum, nextModNum));
- TreeMap<Long, ModificationFile> levelModsFileMap = this.allLevelModsFileMap.computeIfAbsent(
- levelNum,
- k -> new TreeMap<>());
+ File file =
+ new File(
+ resource.getTsFile().getParentFile(),
+ ModificationFile.composeFileName(levelNum, nextModNum));
+ TreeMap<Long, ModificationFile> levelModsFileMap =
+ this.allLevelModsFileMap.computeIfAbsent(levelNum, k -> new TreeMap<>());
synchronized (levelModsFileMap) {
- return levelModsFileMap.computeIfAbsent(nextModNum, k -> new ModificationFile(file, resource));
+ return levelModsFileMap.computeIfAbsent(
+ nextModNum, k -> new ModificationFile(file, resource));
}
}
@@ -139,11 +145,12 @@
for (TreeMap<Long, ModificationFile> levelModFileMap : allLevelModsFileMap.values()) {
List<Long> modFilesToRemove = new ArrayList<>();
synchronized (levelModFileMap) {
- levelModFileMap.forEach((modNum, modFile) -> {
- if (!modFile.hasReference()) {
- modFilesToRemove.add(modNum);
- }
- });
+ levelModFileMap.forEach(
+ (modNum, modFile) -> {
+ if (!modFile.hasReference()) {
+ modFilesToRemove.add(modNum);
+ }
+ });
}
synchronized (levelModFileMap) {
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/modification/ModificationFile.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/modification/ModificationFile.java
index 6ee3480..d8a5e8c 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/modification/ModificationFile.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/modification/ModificationFile.java
@@ -19,6 +19,11 @@
package org.apache.iotdb.db.storageengine.dataregion.modification;
+import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import java.io.BufferedOutputStream;
import java.io.EOFException;
import java.io.File;
@@ -34,9 +39,6 @@
import java.util.Set;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.locks.ReentrantReadWriteLock;
-import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
public class ModificationFile implements AutoCloseable {
@@ -47,8 +49,8 @@
private FileChannel channel;
private OutputStream fileOutputStream;
private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
- private final Set<TsFileResource> tsFileRefs = new ConcurrentSkipListSet<>(Comparator.comparing(
- TsFileResource::getTsFilePath));
+ private final Set<TsFileResource> tsFileRefs =
+ new ConcurrentSkipListSet<>(Comparator.comparing(TsFileResource::getTsFilePath));
public ModificationFile(File file, TsFileResource firstResource) {
this.file = file;
@@ -82,6 +84,7 @@
/**
* Add a TsFile to the reference set only if the set is not empty.
+ *
* @param tsFile TsFile to be added
* @return true if the TsFile is successfully added, false if the reference set is empty.
*/
@@ -101,6 +104,7 @@
/**
* Remove the references of the given TsFiles.
+ *
* @param tsFiles references to remove
* @return true if the ref set is empty after removal, false otherwise
*/
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/modification/TableDeletionEntry.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/modification/TableDeletionEntry.java
index 2f09b92..44c0d9d 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/modification/TableDeletionEntry.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/modification/TableDeletionEntry.java
@@ -18,10 +18,11 @@
*/
package org.apache.iotdb.db.storageengine.dataregion.modification;
+import org.apache.tsfile.read.common.TimeRange;
+
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
-import org.apache.tsfile.read.common.TimeRange;
public class TableDeletionEntry extends ModEntry {
private DeletionPredicate predicate;
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/modification/TreeDeletionEntry.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/modification/TreeDeletionEntry.java
index a2607f4..692e780 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/modification/TreeDeletionEntry.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/modification/TreeDeletionEntry.java
@@ -18,15 +18,17 @@
*/
package org.apache.iotdb.db.storageengine.dataregion.modification;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.db.storageengine.dataregion.modification.v1.Deletion;
+
import org.apache.tsfile.read.common.TimeRange;
import org.apache.tsfile.utils.ReadWriteIOUtils;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
public class TreeDeletionEntry extends ModEntry {
private PartialPath pathPattern;
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileResource.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileResource.java
index 4ae4e4a..4ecb97a 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileResource.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileResource.java
@@ -19,7 +19,6 @@
package org.apache.iotdb.db.storageengine.dataregion.tsfile;
-import java.nio.channels.FileChannel;
import org.apache.iotdb.commons.consensus.index.ProgressIndex;
import org.apache.iotdb.commons.consensus.index.ProgressIndexType;
import org.apache.iotdb.commons.consensus.index.impl.MinimumProgressIndex;
@@ -65,6 +64,7 @@
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
@@ -111,11 +111,13 @@
private ModificationFile modFile;
private long modFileOffset;
+
@SuppressWarnings("squid:S3077")
private volatile ModificationFileV1 oldModFile;
@SuppressWarnings("squid:S3077")
private volatile ModificationFileV1 compactionModFile;
+
private ModFileManager modFileManager;
// the start pos of mod file path in this TsFileResource
private long modFilePathOffset = -1;
@@ -248,7 +250,8 @@
fsFactory.moveFile(src, dest);
}
- private void serializeTo(BufferedOutputStream outputStream, FileOutputStream fileOutputStream) throws IOException {
+ private void serializeTo(BufferedOutputStream outputStream, FileOutputStream fileOutputStream)
+ throws IOException {
ReadWriteIOUtils.write(VERSION_NUMBER, outputStream);
timeIndex.serialize(outputStream);
@@ -310,9 +313,7 @@
modFilePathDeserialized = true;
}
- /**
- * deserialize only the mod file related fields from the tail of the file.
- */
+ /** deserialize only the mod file related fields from the tail of the file. */
private void deserializeModFilePath() throws IOException {
if (modFilePathDeserialized) {
return;
@@ -351,7 +352,8 @@
try (FileChannel fileChannel = FileChannel.open(resFile.toPath())) {
fileChannel.truncate(modFilePathOffset);
}
- FileOutputStream fileOutputStream = new FileOutputStream(file + RESOURCE_SUFFIX + TEMP_SUFFIX, true);
+ FileOutputStream fileOutputStream =
+ new FileOutputStream(file + RESOURCE_SUFFIX + TEMP_SUFFIX, true);
BufferedOutputStream outputStream = new BufferedOutputStream(fileOutputStream);
try {
ReadWriteIOUtils.writeVar(modFile.getFile().getAbsolutePath(), outputStream);
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/tools/TsFileSketchTool.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/tools/TsFileSketchTool.java
index 0041606..7950ba8 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/tools/TsFileSketchTool.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/tools/TsFileSketchTool.java
@@ -19,10 +19,18 @@
package org.apache.iotdb.db.tools;
+import java.io.File;
+import java.io.Serializable;
+import java.util.HashMap;
import org.apache.tsfile.common.conf.TSFileConfig;
+import org.apache.tsfile.common.conf.TSFileDescriptor;
+import org.apache.tsfile.common.constant.TsFileConstant;
+import org.apache.tsfile.encoding.decoder.Decoder;
+import org.apache.tsfile.enums.TSDataType;
import org.apache.tsfile.file.IMetadataIndexEntry;
import org.apache.tsfile.file.MetaMarker;
import org.apache.tsfile.file.header.ChunkGroupHeader;
+import org.apache.tsfile.file.header.ChunkHeader;
import org.apache.tsfile.file.header.PageHeader;
import org.apache.tsfile.file.metadata.ChunkGroupMetadata;
import org.apache.tsfile.file.metadata.ChunkMetadata;
@@ -34,11 +42,17 @@
import org.apache.tsfile.file.metadata.TimeseriesMetadata;
import org.apache.tsfile.file.metadata.TsFileMetadata;
import org.apache.tsfile.file.metadata.enums.MetadataIndexNodeType;
+import org.apache.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.tsfile.file.metadata.statistics.Statistics;
import org.apache.tsfile.fileSystem.FSFactoryProducer;
import org.apache.tsfile.read.TsFileCheckStatus;
import org.apache.tsfile.read.TsFileSequenceReader;
+import org.apache.tsfile.read.common.BatchData;
import org.apache.tsfile.read.common.Chunk;
import org.apache.tsfile.read.common.Path;
+import org.apache.tsfile.read.reader.page.PageReader;
+import org.apache.tsfile.read.reader.page.TimePageReader;
+import org.apache.tsfile.read.reader.page.ValuePageReader;
import org.apache.tsfile.utils.BloomFilter;
import org.apache.tsfile.utils.Pair;
@@ -53,9 +67,15 @@
import java.util.Map;
import java.util.Map.Entry;
import java.util.TreeMap;
+import org.apache.tsfile.utils.TsPrimitiveType;
+import org.apache.tsfile.write.schema.IMeasurementSchema;
+import org.apache.tsfile.write.schema.MeasurementSchema;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class TsFileSketchTool {
+ private static final Logger LOGGER = LoggerFactory.getLogger(TsFileSketchTool.class);
private String filename;
private PrintWriter pw;
private TsFileSketchToolReader reader;
@@ -82,7 +102,308 @@
try {
this.filename = filename;
pw = new PrintWriter(new FileWriter(outFile));
- reader = new TsFileSketchToolReader(filename);
+ reader = new TsFileSketchToolReader(filename) {
+ @Override
+ public long selfCheck(Map<Path, IMeasurementSchema> newSchema,
+ List<ChunkGroupMetadata> chunkGroupMetadataList, boolean fastFinish)
+ throws IOException {
+ File checkFile = FSFactoryProducer.getFSFactory().getFile(this.file);
+ long fileSize;
+ if (!checkFile.exists()) {
+ return TsFileCheckStatus.FILE_NOT_FOUND;
+ } else {
+ fileSize = checkFile.length();
+ }
+ ChunkMetadata currentChunk;
+ String measurementID;
+ TSDataType dataType;
+ long fileOffsetOfChunk;
+
+ // ChunkMetadata of current ChunkGroup
+ List<ChunkMetadata> chunkMetadataList = new ArrayList<>();
+
+ int headerLength = TSFileConfig.MAGIC_STRING.getBytes().length + Byte.BYTES;
+ if (fileSize < headerLength) {
+ return TsFileCheckStatus.INCOMPATIBLE_FILE;
+ }
+ if (!TSFileConfig.MAGIC_STRING.equals(readHeadMagic())
+ || (TSFileConfig.VERSION_NUMBER != readVersionNumber())) {
+ return TsFileCheckStatus.INCOMPATIBLE_FILE;
+ }
+
+ tsFileInput.position(headerLength);
+ boolean isComplete = isComplete();
+ if (fileSize == headerLength) {
+ return headerLength;
+ } else if (isComplete) {
+ loadMetadataSize();
+ if (fastFinish) {
+ return TsFileCheckStatus.COMPLETE_FILE;
+ }
+ }
+ // if not a complete file, we will recover it...
+ long truncatedSize = headerLength;
+ byte marker;
+ List<long[]> timeBatch = new ArrayList<>();
+ IDeviceID lastDeviceId = null;
+ List<IMeasurementSchema> measurementSchemaList = new ArrayList<>();
+ Map<String, Integer> valueColumn2TimeBatchIndex = new HashMap<>();
+ try {
+ while ((marker = this.readMarker()) != MetaMarker.SEPARATOR) {
+ switch (marker) {
+ case MetaMarker.CHUNK_HEADER:
+ case MetaMarker.TIME_CHUNK_HEADER:
+ case MetaMarker.VALUE_CHUNK_HEADER:
+ case MetaMarker.ONLY_ONE_PAGE_CHUNK_HEADER:
+ case MetaMarker.ONLY_ONE_PAGE_TIME_CHUNK_HEADER:
+ case MetaMarker.ONLY_ONE_PAGE_VALUE_CHUNK_HEADER:
+ fileOffsetOfChunk = this.position() - 1;
+ if (fileOffsetOfChunk == 350064120) {
+ tsFileInput.position(350249561);
+ break;
+ }
+ // if there is something wrong with a chunk, we will drop the whole ChunkGroup
+ // as different chunks may be created by the same insertions(sqls), and partial
+ // insertion is not tolerable
+ ChunkHeader chunkHeader = this.readChunkHeader(marker);
+ measurementID = chunkHeader.getMeasurementID();
+ IMeasurementSchema measurementSchema =
+ new MeasurementSchema(
+ measurementID,
+ chunkHeader.getDataType(),
+ chunkHeader.getEncodingType(),
+ chunkHeader.getCompressionType());
+ measurementSchemaList.add(measurementSchema);
+ dataType = chunkHeader.getDataType();
+
+ Statistics<? extends Serializable> chunkStatistics =
+ Statistics.getStatsByType(dataType);
+ int dataSize = chunkHeader.getDataSize();
+
+ if (dataSize > 0) {
+ if (marker == MetaMarker.TIME_CHUNK_HEADER) {
+ timeBatch.add(null);
+ }
+ if (((byte) (chunkHeader.getChunkType() & 0x3F))
+ == MetaMarker
+ .CHUNK_HEADER) { // more than one page, we could use page statistics to
+ if (marker == MetaMarker.VALUE_CHUNK_HEADER) {
+ int timeBatchIndex =
+ valueColumn2TimeBatchIndex.getOrDefault(chunkHeader.getMeasurementID(), 0);
+ valueColumn2TimeBatchIndex.put(
+ chunkHeader.getMeasurementID(), timeBatchIndex + 1);
+ }
+ // generate chunk statistic
+ while (dataSize > 0) {
+ // a new Page
+ PageHeader pageHeader = this.readPageHeader(chunkHeader.getDataType(), true);
+ if (pageHeader.getUncompressedSize() != 0) {
+ // not empty page
+ chunkStatistics.mergeStatistics(pageHeader.getStatistics());
+ }
+ this.skipPageData(pageHeader);
+ dataSize -= pageHeader.getSerializedPageSize();
+ chunkHeader.increasePageNums(1);
+ }
+ } else { // only one page without statistic, we need to iterate each point to generate
+ // chunk statistic
+ PageHeader pageHeader = this.readPageHeader(chunkHeader.getDataType(), false);
+ Decoder valueDecoder =
+ Decoder.getDecoderByType(
+ chunkHeader.getEncodingType(), chunkHeader.getDataType());
+ ByteBuffer pageData = readPage(pageHeader, chunkHeader.getCompressionType());
+ Decoder timeDecoder =
+ Decoder.getDecoderByType(
+ TSEncoding.valueOf(
+ TSFileDescriptor.getInstance().getConfig().getTimeEncoder()),
+ TSDataType.INT64);
+
+ if ((chunkHeader.getChunkType() & TsFileConstant.TIME_COLUMN_MASK)
+ == TsFileConstant.TIME_COLUMN_MASK) { // Time Chunk with only one page
+
+ TimePageReader timePageReader =
+ new TimePageReader(pageHeader, pageData, timeDecoder);
+ long[] currentTimeBatch = timePageReader.getNextTimeBatch();
+ timeBatch.add(currentTimeBatch);
+ for (long currentTime : currentTimeBatch) {
+ chunkStatistics.update(currentTime);
+ }
+ } else if ((chunkHeader.getChunkType() & TsFileConstant.VALUE_COLUMN_MASK)
+ == TsFileConstant.VALUE_COLUMN_MASK) { // Value Chunk with only one page
+
+ ValuePageReader valuePageReader =
+ new ValuePageReader(
+ pageHeader, pageData, chunkHeader.getDataType(), valueDecoder);
+ int timeBatchIndex =
+ valueColumn2TimeBatchIndex.getOrDefault(chunkHeader.getMeasurementID(), 0);
+ valueColumn2TimeBatchIndex.put(
+ chunkHeader.getMeasurementID(), timeBatchIndex + 1);
+ TsPrimitiveType[] valueBatch =
+ valuePageReader.nextValueBatch(timeBatch.get(timeBatchIndex));
+
+ if (valueBatch != null && valueBatch.length != 0) {
+ for (int i = 0; i < valueBatch.length; i++) {
+ TsPrimitiveType value = valueBatch[i];
+ if (value == null) {
+ continue;
+ }
+ long timeStamp = timeBatch.get(timeBatchIndex)[i];
+ switch (dataType) {
+ case INT32:
+ case DATE:
+ chunkStatistics.update(timeStamp, value.getInt());
+ break;
+ case INT64:
+ case TIMESTAMP:
+ chunkStatistics.update(timeStamp, value.getLong());
+ break;
+ case FLOAT:
+ chunkStatistics.update(timeStamp, value.getFloat());
+ break;
+ case DOUBLE:
+ chunkStatistics.update(timeStamp, value.getDouble());
+ break;
+ case BOOLEAN:
+ chunkStatistics.update(timeStamp, value.getBoolean());
+ break;
+ case TEXT:
+ case BLOB:
+ case STRING:
+ chunkStatistics.update(timeStamp, value.getBinary());
+ break;
+ default:
+ throw new IOException("Unexpected type " + dataType);
+ }
+ }
+ }
+
+ } else { // NonAligned Chunk with only one page
+ PageReader reader =
+ new PageReader(
+ pageHeader,
+ pageData,
+ chunkHeader.getDataType(),
+ valueDecoder,
+ timeDecoder);
+ BatchData batchData = reader.getAllSatisfiedPageData();
+ while (batchData.hasCurrent()) {
+ switch (dataType) {
+ case INT32:
+ case DATE:
+ chunkStatistics.update(batchData.currentTime(), batchData.getInt());
+ break;
+ case INT64:
+ case TIMESTAMP:
+ chunkStatistics.update(batchData.currentTime(), batchData.getLong());
+ break;
+ case FLOAT:
+ chunkStatistics.update(batchData.currentTime(), batchData.getFloat());
+ break;
+ case DOUBLE:
+ chunkStatistics.update(batchData.currentTime(), batchData.getDouble());
+ break;
+ case BOOLEAN:
+ chunkStatistics.update(batchData.currentTime(), batchData.getBoolean());
+ break;
+ case TEXT:
+ case BLOB:
+ case STRING:
+ chunkStatistics.update(batchData.currentTime(), batchData.getBinary());
+ break;
+ default:
+ throw new IOException("Unexpected type " + dataType);
+ }
+ batchData.next();
+ }
+ }
+ chunkHeader.increasePageNums(1);
+ }
+ } else if (marker == MetaMarker.ONLY_ONE_PAGE_VALUE_CHUNK_HEADER
+ || marker == MetaMarker.VALUE_CHUNK_HEADER) {
+ int timeBatchIndex =
+ valueColumn2TimeBatchIndex.getOrDefault(chunkHeader.getMeasurementID(), 0);
+ valueColumn2TimeBatchIndex.put(chunkHeader.getMeasurementID(), timeBatchIndex + 1);
+ }
+ currentChunk =
+ new ChunkMetadata(measurementID, dataType, fileOffsetOfChunk, chunkStatistics);
+ chunkMetadataList.add(currentChunk);
+ break;
+ case MetaMarker.CHUNK_GROUP_HEADER:
+ // if there is something wrong with the ChunkGroup Header, we will drop this ChunkGroup
+ // because we can not guarantee the correctness of the deviceId.
+ truncatedSize = this.position() - 1;
+ if (lastDeviceId != null) {
+ // schema of last chunk group
+ if (newSchema != null) {
+ for (IMeasurementSchema tsSchema : measurementSchemaList) {
+ newSchema.putIfAbsent(
+ new Path(lastDeviceId, tsSchema.getMeasurementId(), true), tsSchema);
+ }
+ }
+ measurementSchemaList = new ArrayList<>();
+ // last chunk group Metadata
+ chunkGroupMetadataList.add(new ChunkGroupMetadata(lastDeviceId, chunkMetadataList));
+ }
+ // this is a chunk group
+ chunkMetadataList = new ArrayList<>();
+ ChunkGroupHeader chunkGroupHeader = this.readChunkGroupHeader();
+ lastDeviceId = chunkGroupHeader.getDeviceID();
+ timeBatch.clear();
+ valueColumn2TimeBatchIndex.clear();
+ break;
+ case MetaMarker.OPERATION_INDEX_RANGE:
+ truncatedSize = this.position() - 1;
+ if (lastDeviceId != null) {
+ // schema of last chunk group
+ if (newSchema != null) {
+ for (IMeasurementSchema tsSchema : measurementSchemaList) {
+ newSchema.putIfAbsent(
+ new Path(lastDeviceId, tsSchema.getMeasurementId(), true), tsSchema);
+ }
+ }
+ measurementSchemaList = new ArrayList<>();
+ // last chunk group Metadata
+ chunkGroupMetadataList.add(new ChunkGroupMetadata(lastDeviceId, chunkMetadataList));
+ lastDeviceId = null;
+ }
+ readPlanIndex();
+ truncatedSize = this.position();
+ break;
+ default:
+ // the disk file is corrupted, using this file may be dangerous
+ throw new IOException("Unexpected marker " + marker);
+ }
+ }
+ // now we read the tail of the data section, so we are sure that the last
+ // ChunkGroupFooter is complete.
+ if (lastDeviceId != null) {
+ // schema of last chunk group
+ if (newSchema != null) {
+ for (IMeasurementSchema tsSchema : measurementSchemaList) {
+ newSchema.putIfAbsent(
+ new Path(lastDeviceId, tsSchema.getMeasurementId(), true), tsSchema);
+ }
+ }
+ // last chunk group Metadata
+ chunkGroupMetadataList.add(new ChunkGroupMetadata(lastDeviceId, chunkMetadataList));
+ }
+ if (isComplete) {
+ truncatedSize = TsFileCheckStatus.COMPLETE_FILE;
+ } else {
+ truncatedSize = this.position() - 1;
+ }
+ } catch (Exception e) {
+ LOGGER.warn(
+ "TsFile {} self-check cannot proceed at position {} " + "recovered, because : {}",
+ file,
+ this.position(),
+ e.getMessage());
+ }
+ // Despite the completeness of the data section, we will discard current FileMetadata
+ // so that we can continue to write data into this tsfile.
+ return truncatedSize;
+ }
+ };
StringBuilder str1 = new StringBuilder();
for (int i = 0; i < 21; i++) {
str1.append("|");
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/tools/TsFileSplitByPartitionTool.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/tools/TsFileSplitByPartitionTool.java
index 62c4ba3..a3b1396 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/tools/TsFileSplitByPartitionTool.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/tools/TsFileSplitByPartitionTool.java
@@ -106,7 +106,8 @@
reader = new TsFileSequenceReader(file);
partitionWriterMap = new HashMap<>();
if (FSFactoryProducer.getFSFactory().getFile(file + ModificationFileV1.FILE_SUFFIX).exists()) {
- oldModification = (List<Modification>) resourceToBeRewritten.getOldModFile().getModifications();
+ oldModification =
+ (List<Modification>) resourceToBeRewritten.getOldModFile().getModifications();
modsIterator = oldModification.iterator();
}
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/IOUtils.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/IOUtils.java
index 73aa256..df51f21 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/IOUtils.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/IOUtils.java
@@ -8,11 +8,13 @@
public class IOUtils {
public interface BufferSerializable {
void serialize(ByteBuffer buffer);
+
void deserialize(ByteBuffer buffer);
}
public interface StreamSerializable {
void serialize(OutputStream stream) throws IOException;
+
void deserialize(InputStream stream) throws IOException;
}
}