Support ReadPointCompactionPerformer with sevo (schema evolution) files
diff --git a/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBLoadTsFileIT.java b/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBLoadTsFileIT.java
index b9241b6..5a754c6 100644
--- a/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBLoadTsFileIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBLoadTsFileIT.java
@@ -19,7 +19,6 @@
package org.apache.iotdb.relational.it.db.it;
-import java.sql.SQLException;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.evolution.ColumnRename;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.evolution.SchemaEvolution;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.evolution.SchemaEvolutionFile;
@@ -50,6 +49,7 @@
import java.nio.file.Files;
import java.sql.Connection;
import java.sql.ResultSet;
+import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Collections;
@@ -328,7 +328,8 @@
// cannot query using INT322INT32
try (final ResultSet resultSet =
- statement.executeQuery(String.format("select count(%s) from %s", "INT322INT32", SchemaConfig.TABLE_1))) {
+ statement.executeQuery(
+ String.format("select count(%s) from %s", "INT322INT32", SchemaConfig.TABLE_1))) {
fail();
} catch (SQLException e) {
assertEquals("616: Column 'int322int32' cannot be resolved", e.getMessage());
@@ -336,7 +337,8 @@
// can query with INT322INT32_NEW
try (final ResultSet resultSet =
- statement.executeQuery(String.format("select count(%s) from %s", "INT322INT32_NEW", SchemaConfig.TABLE_1))) {
+ statement.executeQuery(
+ String.format("select count(%s) from %s", "INT322INT32_NEW", SchemaConfig.TABLE_1))) {
if (resultSet.next()) {
Assert.assertEquals(lineCount, resultSet.getLong(1));
} else {
diff --git a/iotdb-client/session/src/main/java/org/apache/iotdb/session/Session.java b/iotdb-client/session/src/main/java/org/apache/iotdb/session/Session.java
index 8844f36..ea7c7c8 100644
--- a/iotdb-client/session/src/main/java/org/apache/iotdb/session/Session.java
+++ b/iotdb-client/session/src/main/java/org/apache/iotdb/session/Session.java
@@ -3074,10 +3074,9 @@
this.columnEncodersMap
.getOrDefault(
measurementSchema.getType(),
- TSEncoding.valueOf(
- TSFileDescriptor.getInstance()
- .getConfig()
- .getValueEncoder(measurementSchema.getType())))
+ TSFileDescriptor.getInstance()
+ .getConfig()
+ .getValueEncoder(measurementSchema.getType()))
.serialize());
}
} else {
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/AlignedSeriesScanUtil.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/AlignedSeriesScanUtil.java
index f7ddaee..c9d7dfe 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/AlignedSeriesScanUtil.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/AlignedSeriesScanUtil.java
@@ -67,9 +67,29 @@
Ordering scanOrder,
SeriesScanOptions scanOptions,
FragmentInstanceContext context,
+ long maxTsFileSetEndVersion) {
+ this(seriesPath, scanOrder, scanOptions, context, false, null, maxTsFileSetEndVersion);
+ }
+
+ public AlignedSeriesScanUtil(
+ AlignedFullPath seriesPath,
+ Ordering scanOrder,
+ SeriesScanOptions scanOptions,
+ FragmentInstanceContext context,
boolean queryAllSensors,
List<TSDataType> givenDataTypes) {
- super(seriesPath, scanOrder, scanOptions, context);
+ this(seriesPath, scanOrder, scanOptions, context, queryAllSensors, givenDataTypes, Long.MAX_VALUE);
+ }
+
+ public AlignedSeriesScanUtil(
+ AlignedFullPath seriesPath,
+ Ordering scanOrder,
+ SeriesScanOptions scanOptions,
+ FragmentInstanceContext context,
+ boolean queryAllSensors,
+ List<TSDataType> givenDataTypes,
+ long maxTsFileSetEndVersion) {
+ super(seriesPath, scanOrder, scanOptions, context, maxTsFileSetEndVersion);
isAligned = true;
this.dataTypes =
givenDataTypes != null
@@ -100,7 +120,7 @@
context,
scanOptions.getGlobalTimeFilter(),
isSeq,
- ignoreAllNullRows);
+ ignoreAllNullRows, maxTsFileSetEndVersion);
}
@Override
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/FileLoaderUtils.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/FileLoaderUtils.java
index da8fd35..ffaeb60 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/FileLoaderUtils.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/FileLoaderUtils.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.db.queryengine.execution.operator.source;
+import java.util.stream.Collectors;
import org.apache.iotdb.commons.path.AlignedFullPath;
import org.apache.iotdb.commons.path.NonAlignedFullPath;
import org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext;
@@ -55,6 +56,7 @@
import java.util.Collections;
import java.util.List;
import java.util.Set;
+import org.apache.tsfile.write.schema.IMeasurementSchema;
import static com.google.common.base.Preconditions.checkArgument;
@@ -83,7 +85,8 @@
FragmentInstanceContext context,
Filter globalTimeFilter,
Set<String> allSensors,
- boolean isSeq)
+ boolean isSeq,
+ long maxTsFileSetEndVersion)
throws IOException {
long t1 = System.nanoTime();
boolean loadFromMem = false;
@@ -94,9 +97,13 @@
if (resource.isClosed()) {
// when resource.getTimeIndexType() == 1, TsFileResource.timeIndexType is deviceTimeIndex
// we should not ignore the non-exist of device in TsFileMetadata
- EvolvedSchema evolvedSchema = resource.getMergedEvolvedSchema();
+ EvolvedSchema evolvedSchema = resource.getMergedEvolvedSchema(maxTsFileSetEndVersion);
IDeviceID deviceId = seriesPath.getDeviceId();
String measurement = seriesPath.getMeasurement();
+ if (evolvedSchema != null) {
+ measurement = evolvedSchema.getOriginalColumnName(deviceId.getTableName(), measurement);
+ deviceId = evolvedSchema.rewriteToOriginal(deviceId);
+ }
timeSeriesMetadata =
TimeSeriesMetadataCache.getInstance()
@@ -186,7 +193,8 @@
FragmentInstanceContext context,
Filter globalTimeFilter,
boolean isSeq,
- boolean ignoreAllNullRows)
+ boolean ignoreAllNullRows,
+ long maxTsFileSetEndVersion)
throws IOException {
final long t1 = System.nanoTime();
boolean loadFromMem = false;
@@ -196,7 +204,7 @@
if (resource.isClosed()) {
alignedTimeSeriesMetadata =
loadAlignedTimeSeriesMetadataFromDisk(
- resource, alignedPath, context, globalTimeFilter, ignoreAllNullRows);
+ resource, alignedPath, context, globalTimeFilter, ignoreAllNullRows, maxTsFileSetEndVersion);
} else { // if the tsfile is unclosed, we just get it directly from TsFileResource
loadFromMem = true;
alignedTimeSeriesMetadata =
@@ -262,7 +270,8 @@
AlignedFullPath alignedPath,
FragmentInstanceContext context,
Filter globalTimeFilter,
- boolean ignoreAllNullRows)
+ boolean ignoreAllNullRows,
+ long maxTsFileSetEndVersion)
throws IOException {
AbstractAlignedTimeSeriesMetadata alignedTimeSeriesMetadata = null;
// load all the TimeseriesMetadata of vector, the first one is for time column and the
@@ -275,6 +284,15 @@
String filePath = resource.getTsFilePath();
IDeviceID deviceId = alignedPath.getDeviceId();
+ EvolvedSchema evolvedSchema = resource.getMergedEvolvedSchema(maxTsFileSetEndVersion);
+ if (evolvedSchema != null) {
+ IDeviceID finalDeviceId = deviceId;
+ valueMeasurementList = valueMeasurementList.stream().map(m -> evolvedSchema.getOriginalColumnName(finalDeviceId.getTableName(), m)).collect(
+ Collectors.toList());
+ allSensors = allSensors.stream().map(m -> evolvedSchema.getOriginalColumnName(finalDeviceId.getTableName(), m)).collect(Collectors.toSet());
+ deviceId = evolvedSchema.rewriteToOriginal(deviceId);
+ }
+
// when resource.getTimeIndexType() == 1, TsFileResource.timeIndexType is deviceTimeIndex
// we should not ignore the non-exist of device in TsFileMetadata
TimeseriesMetadata timeColumn =
@@ -296,7 +314,7 @@
resource,
timeColumn,
Collections.emptyList(),
- alignedPath,
+ deviceId,
context,
globalTimeFilter,
false);
@@ -325,7 +343,7 @@
resource,
timeColumn,
valueTimeSeriesMetadataList,
- alignedPath,
+ deviceId,
context,
globalTimeFilter,
ignoreAllNullRows);
@@ -339,7 +357,7 @@
TsFileResource resource,
TimeseriesMetadata timeColumnMetadata,
List<TimeseriesMetadata> valueColumnMetadataList,
- AlignedFullPath alignedPath,
+ IDeviceID deviceID,
QueryContext context,
Filter globalTimeFilter,
boolean ignoreAllNullRows) {
@@ -348,7 +366,7 @@
// deal with time column
List<ModEntry> timeModifications =
context.getPathModifications(
- resource, alignedPath.getDeviceId(), timeColumnMetadata.getMeasurementId());
+ resource, deviceID, timeColumnMetadata.getMeasurementId());
// all rows are deleted, just return null to skip device data in this file
if (ModificationUtils.isAllDeletedByMods(
timeModifications,
@@ -371,7 +389,7 @@
if (valueColumnMetadata != null) {
List<ModEntry> modifications =
context.getPathModifications(
- resource, alignedPath.getDeviceId(), valueColumnMetadata.getMeasurementId());
+ resource, deviceID, valueColumnMetadata.getMeasurementId());
valueColumnMetadata.setModified(!modifications.isEmpty());
valueColumnsModifications.add(modifications);
modified = (modified || !modifications.isEmpty());
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/SeriesScanUtil.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/SeriesScanUtil.java
index 233d24b..b232b13 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/SeriesScanUtil.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/SeriesScanUtil.java
@@ -130,6 +130,9 @@
protected final int MAX_NUMBER_OF_POINTS_IN_PAGE =
TSFileDescriptor.getInstance().getConfig().getMaxNumberOfPointsInPage();
+ // upper bound on TsFileSet end version; restricts which sevo files apply during compaction
+ protected final long maxTsFileSetEndVersion;
+
private static final long INSTANCE_SIZE =
RamUsageEstimator.shallowSizeOfInstance(SeriesScanUtil.class)
+ RamUsageEstimator.shallowSizeOfInstance(IDeviceID.class)
@@ -145,6 +148,15 @@
Ordering scanOrder,
SeriesScanOptions scanOptions,
FragmentInstanceContext context) {
+ this(seriesPath, scanOrder, scanOptions, context, Long.MAX_VALUE);
+ }
+
+ public SeriesScanUtil(
+ IFullPath seriesPath,
+ Ordering scanOrder,
+ SeriesScanOptions scanOptions,
+ FragmentInstanceContext context,
+ long maxTsFileSetEndVersion) {
this.seriesPath = seriesPath;
this.deviceID = seriesPath.getDeviceId();
this.dataType = seriesPath.getSeriesType();
@@ -182,6 +194,8 @@
new PriorityQueue<>(
orderUtils.comparingLong(
versionPageReader -> orderUtils.getOrderTime(versionPageReader.getStatistics())));
+
+ this.maxTsFileSetEndVersion = maxTsFileSetEndVersion;
}
/**
@@ -190,7 +204,7 @@
* @param dataSource the query data source
*/
public void initQueryDataSource(QueryDataSource dataSource) {
- dataSource.fillOrderIndexes(deviceID, orderUtils.getAscending());
+ dataSource.fillOrderIndexes(deviceID, orderUtils.getAscending(), maxTsFileSetEndVersion);
this.dataSource = dataSource;
// updated filter concerning TTL
@@ -1339,7 +1353,8 @@
context,
scanOptions.getGlobalTimeFilter(),
scanOptions.getAllSensors(),
- isSeq);
+ isSeq,
+ maxTsFileSetEndVersion);
}
public List<TSDataType> getTsDataTypeList() {
@@ -1753,26 +1768,26 @@
@Override
public boolean hasNextSeqResource() {
- while (dataSource.hasNextSeqResource(curSeqFileIndex, false, deviceID)) {
+ while (dataSource.hasNextSeqResource(curSeqFileIndex, false, deviceID, maxTsFileSetEndVersion)) {
if (dataSource.isSeqSatisfied(
- deviceID, curSeqFileIndex, scanOptions.getGlobalTimeFilter(), false)) {
+ deviceID, curSeqFileIndex, scanOptions.getGlobalTimeFilter(), false, maxTsFileSetEndVersion)) {
break;
}
curSeqFileIndex--;
}
- return dataSource.hasNextSeqResource(curSeqFileIndex, false, deviceID);
+ return dataSource.hasNextSeqResource(curSeqFileIndex, false, deviceID, maxTsFileSetEndVersion);
}
@Override
public boolean hasNextUnseqResource() {
- while (dataSource.hasNextUnseqResource(curUnseqFileIndex, false, deviceID)) {
+ while (dataSource.hasNextUnseqResource(curUnseqFileIndex, false, deviceID, maxTsFileSetEndVersion)) {
if (dataSource.isUnSeqSatisfied(
deviceID, curUnseqFileIndex, scanOptions.getGlobalTimeFilter(), false)) {
break;
}
curUnseqFileIndex++;
}
- return dataSource.hasNextUnseqResource(curUnseqFileIndex, false, deviceID);
+ return dataSource.hasNextUnseqResource(curUnseqFileIndex, false, deviceID, maxTsFileSetEndVersion);
}
@Override
@@ -1882,26 +1897,26 @@
@Override
public boolean hasNextSeqResource() {
- while (dataSource.hasNextSeqResource(curSeqFileIndex, true, deviceID)) {
+ while (dataSource.hasNextSeqResource(curSeqFileIndex, true, deviceID, maxTsFileSetEndVersion)) {
if (dataSource.isSeqSatisfied(
- deviceID, curSeqFileIndex, scanOptions.getGlobalTimeFilter(), false)) {
+ deviceID, curSeqFileIndex, scanOptions.getGlobalTimeFilter(), false, maxTsFileSetEndVersion)) {
break;
}
curSeqFileIndex++;
}
- return dataSource.hasNextSeqResource(curSeqFileIndex, true, deviceID);
+ return dataSource.hasNextSeqResource(curSeqFileIndex, true, deviceID, maxTsFileSetEndVersion);
}
@Override
public boolean hasNextUnseqResource() {
- while (dataSource.hasNextUnseqResource(curUnseqFileIndex, true, deviceID)) {
+ while (dataSource.hasNextUnseqResource(curUnseqFileIndex, true, deviceID, maxTsFileSetEndVersion)) {
if (dataSource.isUnSeqSatisfied(
deviceID, curUnseqFileIndex, scanOptions.getGlobalTimeFilter(), false)) {
break;
}
curUnseqFileIndex++;
}
- return dataSource.hasNextUnseqResource(curUnseqFileIndex, true, deviceID);
+ return dataSource.hasNextUnseqResource(curUnseqFileIndex, true, deviceID, maxTsFileSetEndVersion);
}
@Override
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
index e643505..201007d 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
@@ -685,7 +685,8 @@
partitionFiles.getValue(),
fileTimeIndexMap,
false,
- recoveredPartitionTsFileSetMap, partitionMinimalVersion);
+ recoveredPartitionTsFileSetMap,
+ partitionMinimalVersion);
if (asyncRecoverTask != null) {
asyncTsFileResourceRecoverTaskList.add(asyncRecoverTask);
}
@@ -705,7 +706,8 @@
for (Entry<Long, List<TsFileSet>> entry : recoveredPartitionTsFileSetMap.entrySet()) {
long partitionId = entry.getKey();
// if no file in the partition, all filesets should be cleared
- long minimumFileVersion = partitionMinimalVersion.getOrDefault(partitionId, Long.MAX_VALUE);
+ long minimumFileVersion =
+ partitionMinimalVersion.getOrDefault(partitionId, Long.MAX_VALUE);
for (TsFileSet tsFileSet : entry.getValue()) {
if (tsFileSet.getEndVersion() < minimumFileVersion) {
tsFileSet.remove();
@@ -1056,6 +1058,7 @@
tsFileSet =
new TsFileSet(
Long.parseLong(fileSet.getName()), fileSetDir.getAbsolutePath(), true);
+ tsFileManager.addTsFileSet(tsFileSet, partitionId);
} catch (NumberFormatException e) {
continue;
}
@@ -1077,30 +1080,23 @@
List<TsFileResource> resourceList,
Map<TsFileID, FileTimeIndex> fileTimeIndexMap,
boolean isSeq,
- Map<Long, List<TsFileSet>> partitionTsFileSetMap, Map<Long, Long> partitionMinimalVersion) {
+ Map<Long, List<TsFileSet>> partitionTsFileSetMap,
+ Map<Long, Long> partitionMinimalVersion) {
List<TsFileResource> resourceListForAsyncRecover = new ArrayList<>();
List<TsFileResource> resourceListForSyncRecover = new ArrayList<>();
Callable<Void> asyncRecoverTask = null;
- List<TsFileSet> tsFileSets = recoverTsFileSets(partitionId, partitionTsFileSetMap);
+ recoverTsFileSets(partitionId, partitionTsFileSetMap);
for (TsFileResource tsFileResource : resourceList) {
long fileVersion = tsFileResource.getTsFileID().fileVersion;
- partitionMinimalVersion.compute(partitionId, (pid, oldVersion) -> {
- if (oldVersion == null) {
- return fileVersion;
- }
- return Math.min(oldVersion, fileVersion);
- });
-
- int i = Collections.binarySearch(tsFileSets, TsFileSet.comparatorKey(fileVersion));
- if (i < 0) {
- // if the binary search does not find an exact match, -i indicates the closest one
- i = -i;
- }
- if (i < tsFileSets.size()) {
- List<TsFileSet> containedSets = tsFileSets.subList(i, tsFileSets.size());
- containedSets.forEach(tsFileResource::addFileSet);
- }
+ partitionMinimalVersion.compute(
+ partitionId,
+ (pid, oldVersion) -> {
+ if (oldVersion == null) {
+ return fileVersion;
+ }
+ return Math.min(oldVersion, fileVersion);
+ });
tsFileManager.add(tsFileResource, isSeq);
if (fileTimeIndexMap.containsKey(tsFileResource.getTsFileID())
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/constant/InnerSeqCompactionPerformer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/constant/InnerSeqCompactionPerformer.java
index 0404ec6..f64597e 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/constant/InnerSeqCompactionPerformer.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/constant/InnerSeqCompactionPerformer.java
@@ -25,9 +25,7 @@
import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer.impl.FastCompactionPerformer;
import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer.impl.ReadChunkCompactionPerformer;
-import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
import org.apache.tsfile.encrypt.EncryptParameter;
-import org.apache.tsfile.utils.Pair;
public enum InnerSeqCompactionPerformer {
READ_CHUNK,
@@ -56,12 +54,12 @@
}
}
- public ISeqCompactionPerformer createInstance(EncryptParameter encryptParameter, Pair<Long, TsFileResource> maxTsFileSetEndVersionAndMinResource) {
+ public ISeqCompactionPerformer createInstance(EncryptParameter encryptParameter) {
switch (this) {
case READ_CHUNK:
- return new ReadChunkCompactionPerformer(encryptParameter, maxTsFileSetEndVersionAndMinResource);
+ return new ReadChunkCompactionPerformer(encryptParameter);
case FAST:
- return new FastCompactionPerformer(false, encryptParameter, maxTsFileSetEndVersionAndMinResource);
+ return new FastCompactionPerformer(false, encryptParameter);
default:
throw new IllegalCompactionPerformerException(
"Illegal compaction performer for seq inner compaction " + this);
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/impl/FastCompactionPerformer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/impl/FastCompactionPerformer.java
index a45d0de..568db708 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/impl/FastCompactionPerformer.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/impl/FastCompactionPerformer.java
@@ -19,8 +19,6 @@
package org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer.impl;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
import org.apache.iotdb.commons.conf.IoTDBConstant;
import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.path.PatternTreeMap;
@@ -74,6 +72,8 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
public class FastCompactionPerformer
implements ICrossCompactionPerformer, ISeqCompactionPerformer, IUnseqCompactionPerformer {
@@ -108,7 +108,7 @@
public FastCompactionPerformer(
List<TsFileResource> seqFiles,
List<TsFileResource> unseqFiles,
- List<TsFileResource> targetFiles, Pair<Long, TsFileResource> maxTsFileSetEndVersionAndMinResource) {
+ List<TsFileResource> targetFiles) {
this.seqFiles = seqFiles;
this.unseqFiles = unseqFiles;
this.targetFiles = targetFiles;
@@ -122,14 +122,14 @@
new EncryptParameter(
TSFileDescriptor.getInstance().getConfig().getEncryptType(),
TSFileDescriptor.getInstance().getConfig().getEncryptKey());
- this.maxTsFileSetEndVersionAndMinResource = maxTsFileSetEndVersionAndMinResource;
+ this.maxTsFileSetEndVersionAndMinResource = new Pair<>(Long.MIN_VALUE, null);
}
public FastCompactionPerformer(
List<TsFileResource> seqFiles,
List<TsFileResource> unseqFiles,
List<TsFileResource> targetFiles,
- EncryptParameter encryptParameter, Pair<Long, TsFileResource> maxTsFileSetEndVersionAndMinResource) {
+ EncryptParameter encryptParameter) {
this.seqFiles = seqFiles;
this.unseqFiles = unseqFiles;
this.targetFiles = targetFiles;
@@ -140,7 +140,9 @@
isCrossCompaction = true;
}
this.encryptParameter = encryptParameter;
- this.maxTsFileSetEndVersionAndMinResource = maxTsFileSetEndVersionAndMinResource;
+ this.maxTsFileSetEndVersionAndMinResource =
+ TsFileResource.getMaxTsFileSetEndVersionAndMinResource(
+ Stream.concat(seqFiles.stream(), unseqFiles.stream()).collect(Collectors.toList()));
}
@TestOnly
@@ -162,22 +164,30 @@
@Override
public void perform() throws Exception {
this.subTaskSummary.setTemporalFileNum(targetFiles.size());
+ List<TsFileResource> allSourceFiles =
+ Stream.concat(seqFiles.stream(), unseqFiles.stream())
+ .sorted(TsFileResource::compareFileName)
+ .collect(Collectors.toList());
+ Pair<Long, TsFileResource> maxTsFileSetEndVersionAndMinResource =
+ TsFileResource.getMaxTsFileSetEndVersionAndMinResource(allSourceFiles);
+
try (MultiTsFileDeviceIterator deviceIterator =
new MultiTsFileDeviceIterator(seqFiles, unseqFiles, readerCacheMap);
AbstractCompactionWriter compactionWriter =
isCrossCompaction
? new FastCrossCompactionWriter(
- targetFiles, seqFiles, readerCacheMap, encryptParameter)
+ targetFiles,
+ seqFiles,
+ readerCacheMap,
+ encryptParameter,
+ maxTsFileSetEndVersionAndMinResource.left)
: new FastInnerCompactionWriter(targetFiles, encryptParameter)) {
-
- List<TsFileResource> allSourceFiles = Stream.concat(seqFiles.stream(), unseqFiles.stream())
- .sorted(TsFileResource::compareFileName)
- .collect(Collectors.toList());
- Pair<Long, TsFileResource> maxTsFileSetEndVersionAndMinResource = TsFileResource.getMaxTsFileSetEndVersionAndMinResource(
- allSourceFiles);
List<Schema> schemas =
CompactionTableSchemaCollector.collectSchema(
- seqFiles, unseqFiles, readerCacheMap, deviceIterator.getDeprecatedTableSchemaMap(),
+ seqFiles,
+ unseqFiles,
+ readerCacheMap,
+ deviceIterator.getDeprecatedTableSchemaMap(),
maxTsFileSetEndVersionAndMinResource);
compactionWriter.setSchemaForAllTargetFile(schemas, maxTsFileSetEndVersionAndMinResource);
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/impl/ReadChunkCompactionPerformer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/impl/ReadChunkCompactionPerformer.java
index 4280c13..904b175 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/impl/ReadChunkCompactionPerformer.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/impl/ReadChunkCompactionPerformer.java
@@ -71,7 +71,7 @@
private Schema schema = null;
private final EncryptParameter firstEncryptParameter;
- protected final Pair<Long, TsFileResource> maxTsFileSetEndVersionAndMinResource;
+ protected Pair<Long, TsFileResource> maxTsFileSetEndVersionAndMinResource;
@TestOnly
public ReadChunkCompactionPerformer(List<TsFileResource> sourceFiles, TsFileResource targetFile) {
@@ -81,9 +81,8 @@
public ReadChunkCompactionPerformer(
List<TsFileResource> sourceFiles,
TsFileResource targetFile,
- EncryptParameter encryptParameter,
- Pair<Long, TsFileResource> maxTsFileSetEndVersionAndMinResource) {
- this(sourceFiles, Collections.singletonList(targetFile), encryptParameter, maxTsFileSetEndVersionAndMinResource);
+ EncryptParameter encryptParameter) {
+ this(sourceFiles, Collections.singletonList(targetFile), encryptParameter);
}
@TestOnly
@@ -98,12 +97,12 @@
public ReadChunkCompactionPerformer(
List<TsFileResource> sourceFiles,
List<TsFileResource> targetFiles,
- EncryptParameter encryptParameter,
- Pair<Long, TsFileResource> maxTsFileSetEndVersionAndMinResource) {
+ EncryptParameter encryptParameter) {
setSourceFiles(sourceFiles);
setTargetFiles(targetFiles);
this.firstEncryptParameter = encryptParameter;
- this.maxTsFileSetEndVersionAndMinResource = maxTsFileSetEndVersionAndMinResource;
+ this.maxTsFileSetEndVersionAndMinResource =
+ TsFileResource.getMaxTsFileSetEndVersionAndMinResource(sourceFiles);
}
@TestOnly
@@ -114,10 +113,11 @@
}
public ReadChunkCompactionPerformer(
- List<TsFileResource> sourceFiles, EncryptParameter encryptParameter, Pair<Long, TsFileResource> maxTsFileSetEndVersionAndMinResource) {
+ List<TsFileResource> sourceFiles, EncryptParameter encryptParameter) {
setSourceFiles(sourceFiles);
this.firstEncryptParameter = encryptParameter;
- this.maxTsFileSetEndVersionAndMinResource = maxTsFileSetEndVersionAndMinResource;
+ this.maxTsFileSetEndVersionAndMinResource =
+ TsFileResource.getMaxTsFileSetEndVersionAndMinResource(sourceFiles);
}
@TestOnly
@@ -129,9 +129,8 @@
this.maxTsFileSetEndVersionAndMinResource = new Pair<>(Long.MIN_VALUE, null);
}
- public ReadChunkCompactionPerformer(EncryptParameter encryptParameter, Pair<Long, TsFileResource> maxTsFileSetEndVersionAndMinResource) {
+ public ReadChunkCompactionPerformer(EncryptParameter encryptParameter) {
this.firstEncryptParameter = encryptParameter;
- this.maxTsFileSetEndVersionAndMinResource = maxTsFileSetEndVersionAndMinResource;
}
@Override
@@ -218,7 +217,8 @@
targetResources.get(currentTargetFileIndex),
memoryBudgetForFileWriter,
CompactionType.INNER_SEQ_COMPACTION,
- firstEncryptParameter, maxTsFileSetEndVersionAndMinResource.getLeft());
+ firstEncryptParameter,
+ maxTsFileSetEndVersionAndMinResource.getLeft());
currentWriter.setSchema(CompactionTableSchemaCollector.copySchema(schema));
}
@@ -357,6 +357,8 @@
@Override
public void setSourceFiles(List<TsFileResource> seqFiles) {
this.seqFiles = seqFiles;
+ this.maxTsFileSetEndVersionAndMinResource =
+ TsFileResource.getMaxTsFileSetEndVersionAndMinResource(seqFiles);
}
@Override
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/impl/ReadPointCompactionPerformer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/impl/ReadPointCompactionPerformer.java
index 9b18cee..b7c1438 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/impl/ReadPointCompactionPerformer.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/impl/ReadPointCompactionPerformer.java
@@ -18,9 +18,7 @@
*/
package org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer.impl;
-import java.util.stream.Stream;
import org.apache.iotdb.commons.conf.IoTDBConstant;
-import org.apache.iotdb.commons.exception.MetadataException;
import org.apache.iotdb.commons.path.AlignedFullPath;
import org.apache.iotdb.commons.path.IFullPath;
import org.apache.iotdb.commons.path.NonAlignedFullPath;
@@ -72,6 +70,7 @@
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.stream.Collectors;
+import java.util.stream.Stream;
public class ReadPointCompactionPerformer
implements ICrossCompactionPerformer, IUnseqCompactionPerformer {
@@ -154,11 +153,12 @@
// Do not close device iterator, because tsfile reader is managed by FileReaderManager.
MultiTsFileDeviceIterator deviceIterator =
new MultiTsFileDeviceIterator(seqFiles, unseqFiles);
- List<TsFileResource> allSourceFiles = Stream.concat(seqFiles.stream(), unseqFiles.stream())
- .sorted(TsFileResource::compareFileName)
- .collect(Collectors.toList());
- Pair<Long, TsFileResource> maxTsFileSetEndVersionAndMinResource = TsFileResource.getMaxTsFileSetEndVersionAndMinResource(
- allSourceFiles);
+ List<TsFileResource> allSourceFiles =
+ Stream.concat(seqFiles.stream(), unseqFiles.stream())
+ .sorted(TsFileResource::compareFileName)
+ .collect(Collectors.toList());
+ Pair<Long, TsFileResource> maxTsFileSetEndVersionAndMinResource =
+ TsFileResource.getMaxTsFileSetEndVersionAndMinResource(allSourceFiles);
List<Schema> schemas =
CompactionTableSchemaCollector.collectSchema(
@@ -174,14 +174,24 @@
Pair<IDeviceID, Boolean> deviceInfo = deviceIterator.nextDevice();
IDeviceID device = deviceInfo.left;
boolean isAligned = deviceInfo.right;
- queryDataSource.fillOrderIndexes(device, true);
+ queryDataSource.fillOrderIndexes(device, true, maxTsFileSetEndVersionAndMinResource.left);
if (isAligned) {
compactAlignedSeries(
- device, deviceIterator, compactionWriter, fragmentInstanceContext, queryDataSource, maxTsFileSetEndVersionAndMinResource);
+ device,
+ deviceIterator,
+ compactionWriter,
+ fragmentInstanceContext,
+ queryDataSource,
+ maxTsFileSetEndVersionAndMinResource);
} else {
compactNonAlignedSeries(
- device, deviceIterator, compactionWriter, fragmentInstanceContext, queryDataSource, maxTsFileSetEndVersionAndMinResource);
+ device,
+ deviceIterator,
+ compactionWriter,
+ fragmentInstanceContext,
+ queryDataSource,
+ maxTsFileSetEndVersionAndMinResource);
}
summary.setTemporaryFileSize(compactionWriter.getWriterSize());
}
@@ -219,8 +229,9 @@
FragmentInstanceContext fragmentInstanceContext,
QueryDataSource queryDataSource,
Pair<Long, TsFileResource> maxTsFileSetEndVersionAndMinResource)
- throws IOException, MetadataException {
- Map<String, MeasurementSchema> schemaMap = deviceIterator.getAllSchemasOfCurrentDevice(maxTsFileSetEndVersionAndMinResource);
+ throws IOException {
+ Map<String, MeasurementSchema> schemaMap =
+ deviceIterator.getAllSchemasOfCurrentDevice(maxTsFileSetEndVersionAndMinResource);
IMeasurementSchema timeSchema = schemaMap.remove(TsFileConstant.TIME_COLUMN_ID);
List<IMeasurementSchema> measurementSchemas = new ArrayList<>(schemaMap.values());
if (measurementSchemas.isEmpty()) {
@@ -240,7 +251,8 @@
new ArrayList<>(schemaMap.keySet()),
fragmentInstanceContext,
queryDataSource,
- true);
+ true,
+ maxTsFileSetEndVersionAndMinResource.left);
if (dataBlockReader.hasNextBatch()) {
compactionWriter.startChunkGroup(device, true);
@@ -267,8 +279,8 @@
QueryDataSource queryDataSource,
Pair<Long, TsFileResource> maxTsFileSetEndVersionAndMinResource)
throws IOException, InterruptedException, ExecutionException {
- Map<String, MeasurementSchema> schemaMap = deviceIterator.getAllSchemasOfCurrentDevice(
- maxTsFileSetEndVersionAndMinResource);
+ Map<String, MeasurementSchema> schemaMap =
+ deviceIterator.getAllSchemasOfCurrentDevice(maxTsFileSetEndVersionAndMinResource);
List<String> allMeasurements = new ArrayList<>(schemaMap.keySet());
allMeasurements.sort((String::compareTo));
int subTaskNums = Math.min(allMeasurements.size(), SUB_TASK_NUM);
@@ -297,7 +309,7 @@
new QueryDataSource(queryDataSource),
compactionWriter,
schemaMap,
- i)));
+ i, maxTsFileSetEndVersionAndMinResource.left)));
}
for (Future<Void> future : futures) {
future.get();
@@ -321,7 +333,8 @@
List<String> allSensors,
FragmentInstanceContext fragmentInstanceContext,
QueryDataSource queryDataSource,
- boolean isAlign) {
+ boolean isAlign,
+ long maxTsFileSetEndVersion) {
IFullPath seriesPath;
if (isAlign) {
seriesPath = new AlignedFullPath(deviceId, measurementIds, measurementSchemas);
@@ -330,7 +343,12 @@
}
return new SeriesDataBlockReader(
- seriesPath, new HashSet<>(allSensors), fragmentInstanceContext, queryDataSource, true);
+ seriesPath,
+ new HashSet<>(allSensors),
+ fragmentInstanceContext,
+ queryDataSource,
+ true,
+ maxTsFileSetEndVersion);
}
@SuppressWarnings("squid:S1172")
@@ -361,8 +374,16 @@
throws IOException {
if (!seqFileResources.isEmpty() && !unseqFileResources.isEmpty()) {
// cross space
+ List<TsFileResource> allSourceFiles =
+ Stream.concat(seqFileResources.stream(), unseqFileResources.stream())
+ .collect(Collectors.toList());
+ Pair<Long, TsFileResource> maxTsFileSetEndVersionAndMinResource =
+ TsFileResource.getMaxTsFileSetEndVersionAndMinResource(allSourceFiles);
return new ReadPointCrossCompactionWriter(
- targetFileResources, seqFileResources, encryptParameter);
+ targetFileResources,
+ seqFileResources,
+ encryptParameter,
+ maxTsFileSetEndVersionAndMinResource.left);
} else {
// inner space
return new ReadPointInnerCompactionWriter(targetFileResources, encryptParameter);
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/RepairUnsortedFileCompactionTask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/RepairUnsortedFileCompactionTask.java
index dba8b5e..d027957 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/RepairUnsortedFileCompactionTask.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/RepairUnsortedFileCompactionTask.java
@@ -121,11 +121,10 @@
@Override
protected void calculateSourceFilesAndTargetFiles() throws IOException {
filesView.sourceFilesInLog = filesView.sourceFilesInCompactionPerformer;
- TsFileResource targetResource = new TsFileResource(generateTargetFile(),
- TsFileResourceStatus.COMPACTING);
+ TsFileResource targetResource =
+ new TsFileResource(generateTargetFile(), TsFileResourceStatus.COMPACTING);
targetResource.setTsFileManager(tsFileManager);
- filesView.targetFilesInLog =
- Collections.singletonList(targetResource);
+ filesView.targetFilesInLog = Collections.singletonList(targetResource);
filesView.targetFilesInPerformer = filesView.targetFilesInLog;
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/subtask/ReadPointPerformerSubTask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/subtask/ReadPointPerformerSubTask.java
index 74f7259..ed96703 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/subtask/ReadPointPerformerSubTask.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/subtask/ReadPointPerformerSubTask.java
@@ -57,6 +57,7 @@
private final AbstractCompactionWriter compactionWriter;
private final Map<String, MeasurementSchema> schemaMap;
private final int taskId;
+ private final long maxTsFileSetEndVersion;
public ReadPointPerformerSubTask(
IDeviceID device,
@@ -65,7 +66,8 @@
QueryDataSource queryDataSource,
AbstractCompactionWriter compactionWriter,
Map<String, MeasurementSchema> schemaMap,
- int taskId) {
+ int taskId,
+ long maxTsFileSetEndVersion) {
this.device = device;
this.measurementList = measurementList;
this.fragmentInstanceContext = fragmentInstanceContext;
@@ -73,6 +74,7 @@
this.compactionWriter = compactionWriter;
this.schemaMap = schemaMap;
this.taskId = taskId;
+ this.maxTsFileSetEndVersion = maxTsFileSetEndVersion;
}
@Override
@@ -88,7 +90,8 @@
new ArrayList<>(schemaMap.keySet()),
fragmentInstanceContext,
queryDataSource,
- false);
+ false,
+ maxTsFileSetEndVersion);
if (dataBlockReader.hasNextBatch()) {
compactionWriter.startMeasurement(
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/CompactionTableSchema.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/CompactionTableSchema.java
index 99c4286..8d43305 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/CompactionTableSchema.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/CompactionTableSchema.java
@@ -35,9 +35,12 @@
public CompactionTableSchema(TableSchema tableSchema) {
this(tableSchema.getTableName(), tableSchema.getColumnSchemas(), tableSchema.getColumnTypes());
+ this.updatable = tableSchema.isUpdatable();
}
- public CompactionTableSchema(String tableName, List<IMeasurementSchema> columnSchemas,
+ public CompactionTableSchema(
+ String tableName,
+ List<IMeasurementSchema> columnSchemas,
List<ColumnCategory> columnCategories) {
super(tableName, columnSchemas, columnCategories);
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/CompactionTableSchemaCollector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/CompactionTableSchemaCollector.java
index d5eb8d0..2c2f34e 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/CompactionTableSchemaCollector.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/CompactionTableSchemaCollector.java
@@ -20,8 +20,8 @@
package org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
-
import org.apache.iotdb.db.storageengine.dataregion.tsfile.evolution.EvolvedSchema;
+
import org.apache.tsfile.file.metadata.TableSchema;
import org.apache.tsfile.read.TsFileSequenceReader;
import org.apache.tsfile.utils.Pair;
@@ -91,8 +91,8 @@
continue;
}
- EvolvedSchema evolvedSchema = resource.getMergedEvolvedSchema(
- maxTsFileSetEndVersionAndAssociatedResource.getLeft());
+ EvolvedSchema evolvedSchema =
+ resource.getMergedEvolvedSchema(maxTsFileSetEndVersionAndAssociatedResource.getLeft());
for (Map.Entry<String, TableSchema> entry : tableSchemaMap.entrySet()) {
String tableName = entry.getKey();
@@ -102,6 +102,7 @@
}
if (evolvedSchema != null) {
currentTableSchema = evolvedSchema.rewriteToFinal(currentTableSchema);
+ tableName = currentTableSchema.getTableName();
}
// merge all id columns, measurement schema will be generated automatically when end chunk
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/MultiTsFileDeviceIterator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/MultiTsFileDeviceIterator.java
index a6136d7..067cb36 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/MultiTsFileDeviceIterator.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/MultiTsFileDeviceIterator.java
@@ -97,12 +97,18 @@
// sort the files from the newest to the oldest
Collections.sort(
this.tsFileResourcesSortedByDesc, TsFileResource::compareFileCreationOrderByDesc);
- long maxTsFileSetEndVersion = this.tsFileResourcesSortedByDesc.stream().mapToLong(
- // max endVersion of all filesets of a TsFile
- resource -> resource.getTsFileSets().stream().mapToLong(TsFileSet::getEndVersion).max()
- .orElse(Long.MAX_VALUE))
- // overall max endVersion
- .max().orElse(Long.MAX_VALUE);
+ long maxTsFileSetEndVersion =
+ this.tsFileResourcesSortedByDesc.stream()
+ .mapToLong(
+ // max endVersion of all filesets of a TsFile
+ resource ->
+ resource.getTsFileSets().stream()
+ .mapToLong(TsFileSet::getEndVersion)
+ .max()
+ .orElse(Long.MAX_VALUE))
+ // overall max endVersion
+ .max()
+ .orElse(Long.MAX_VALUE);
try {
for (TsFileResource tsFileResource : this.tsFileResourcesSortedByDesc) {
CompactionTsFileReader reader =
@@ -114,8 +120,8 @@
TsFileDeviceIterator tsFileDeviceIterator;
EvolvedSchema evolvedSchema = tsFileResource.getMergedEvolvedSchema(maxTsFileSetEndVersion);
if (evolvedSchema != null) {
- tsFileDeviceIterator = new ReorderedTsFileDeviceIterator(reader,
- evolvedSchema::rewriteToFinal);
+ tsFileDeviceIterator =
+ new ReorderedTsFileDeviceIterator(reader, evolvedSchema::rewriteToFinal);
} else {
tsFileDeviceIterator = reader.getAllDevicesIteratorWithIsAligned();
}
@@ -144,12 +150,35 @@
// sort the files from the newest to the oldest
Collections.sort(
this.tsFileResourcesSortedByDesc, TsFileResource::compareFileCreationOrderByDesc);
+
+ long maxTsFileSetEndVersion =
+ this.tsFileResourcesSortedByDesc.stream()
+ .mapToLong(
+ // max endVersion of all filesets of a TsFile
+ resource ->
+ resource.getTsFileSets().stream()
+ .mapToLong(TsFileSet::getEndVersion)
+ .max()
+ .orElse(Long.MAX_VALUE))
+ // overall max endVersion
+ .max()
+ .orElse(Long.MAX_VALUE);
+
for (TsFileResource tsFileResource : tsFileResourcesSortedByDesc) {
TsFileSequenceReader reader =
FileReaderManager.getInstance()
.get(tsFileResource.getTsFilePath(), tsFileResource.getTsFileID(), true);
readerMap.put(tsFileResource, reader);
- deviceIteratorMap.put(tsFileResource, reader.getAllDevicesIteratorWithIsAligned());
+
+ TsFileDeviceIterator tsFileDeviceIterator;
+ EvolvedSchema evolvedSchema = tsFileResource.getMergedEvolvedSchema(maxTsFileSetEndVersion);
+ if (evolvedSchema != null) {
+ tsFileDeviceIterator =
+ new ReorderedTsFileDeviceIterator(reader, evolvedSchema::rewriteToFinal);
+ } else {
+ tsFileDeviceIterator = reader.getAllDevicesIteratorWithIsAligned();
+ }
+ deviceIteratorMap.put(tsFileResource, tsFileDeviceIterator);
}
}
@@ -171,6 +200,19 @@
this.tsFileResourcesSortedByDesc, TsFileResource::compareFileCreationOrderByDesc);
this.readerMap = readerMap;
+ long maxTsFileSetEndVersion =
+ this.tsFileResourcesSortedByDesc.stream()
+ .mapToLong(
+ // max endVersion of all filesets of a TsFile
+ resource ->
+ resource.getTsFileSets().stream()
+ .mapToLong(TsFileSet::getEndVersion)
+ .max()
+ .orElse(Long.MAX_VALUE))
+ // overall max endVersion
+ .max()
+ .orElse(Long.MAX_VALUE);
+
CompactionType type = null;
if (!seqResources.isEmpty() && !unseqResources.isEmpty()) {
type = CompactionType.CROSS_COMPACTION;
@@ -187,11 +229,20 @@
type,
EncryptDBUtils.getFirstEncryptParamFromTSFilePath(tsFileResource.getTsFilePath()));
readerMap.put(tsFileResource, reader);
- deviceIteratorMap.put(tsFileResource, reader.getAllDevicesIteratorWithIsAligned());
+
+ TsFileDeviceIterator tsFileDeviceIterator;
+ EvolvedSchema evolvedSchema = tsFileResource.getMergedEvolvedSchema(maxTsFileSetEndVersion);
+ if (evolvedSchema != null) {
+ tsFileDeviceIterator =
+ new ReorderedTsFileDeviceIterator(reader, evolvedSchema::rewriteToFinal);
+ } else {
+ tsFileDeviceIterator = reader.getAllDevicesIteratorWithIsAligned();
+ }
+ deviceIteratorMap.put(tsFileResource, tsFileDeviceIterator);
}
}
- public boolean hasNextDevice() {
+ public boolean hasNextDevice() throws IOException {
boolean hasNext = false;
for (TsFileDeviceIterator iterator : deviceIteratorMap.values()) {
hasNext =
@@ -212,7 +263,7 @@
* @return Pair of device full path and whether this device is aligned
*/
@SuppressWarnings({"squid:S135", "java:S2259"})
- public Pair<IDeviceID, Boolean> nextDevice() throws IllegalPathException {
+ public Pair<IDeviceID, Boolean> nextDevice() throws IllegalPathException, IOException {
List<TsFileResource> toBeRemovedResources = new LinkedList<>();
Pair<IDeviceID, Boolean> minDevice = null;
// get the device from source files sorted from the newest to the oldest by version
@@ -292,24 +343,25 @@
timeseriesMetadataList,
deviceIteratorMap.get(resource).getFirstMeasurementNodeOfCurrentDevice(),
schemaMap.keySet(),
- true);
- EvolvedSchema evolvedSchema = resource.getMergedEvolvedSchema(
- maxTsFileSetEndVersionAndMinResource.left);
+ true,
+ null);
+ EvolvedSchema evolvedSchema =
+ resource.getMergedEvolvedSchema(maxTsFileSetEndVersionAndMinResource.left);
if (evolvedSchema != null) {
// the device has been rewritten, should get the original name for rewriting
- evolvedSchema.rewriteToFinal(evolvedSchema.getOriginalTableName(currentDevice.left.getTableName()), timeseriesMetadataList);
+ evolvedSchema.rewriteToFinal(
+ evolvedSchema.getOriginalTableName(currentDevice.left.getTableName()),
+ timeseriesMetadataList);
}
for (TimeseriesMetadata timeseriesMetadata : timeseriesMetadataList) {
if (!schemaMap.containsKey(timeseriesMetadata.getMeasurementId())
&& !timeseriesMetadata.getChunkMetadataList().isEmpty()) {
- MeasurementSchema measurementSchema = reader.getMeasurementSchema(
- timeseriesMetadata.getChunkMetadataList());
+ MeasurementSchema measurementSchema =
+ reader.getMeasurementSchema(timeseriesMetadata.getChunkMetadataList());
// the column may be renamed
measurementSchema.setMeasurementName(timeseriesMetadata.getMeasurementId());
- schemaMap.put(
- timeseriesMetadata.getMeasurementId(),
- measurementSchema);
+ schemaMap.put(timeseriesMetadata.getMeasurementId(), measurementSchema);
}
}
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/ReorderedTsFileDeviceIterator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/ReorderedTsFileDeviceIterator.java
index 14d2db8..3b9bf6ab 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/ReorderedTsFileDeviceIterator.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/ReorderedTsFileDeviceIterator.java
@@ -1,40 +1,43 @@
package org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils;
+import org.apache.tsfile.file.metadata.IDeviceID;
+import org.apache.tsfile.file.metadata.MetadataIndexNode;
+import org.apache.tsfile.read.TsFileSequenceReader;
+import org.apache.tsfile.utils.Pair;
+
import java.io.IOException;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.function.Function;
-import org.apache.tsfile.file.metadata.IDeviceID;
-import org.apache.tsfile.file.metadata.MetadataIndexNode;
-import org.apache.tsfile.read.TsFileSequenceReader;
-import org.apache.tsfile.utils.Pair;
public class ReorderedTsFileDeviceIterator extends TransformedTsFileDeviceIterator {
- private final List<Pair<Pair<IDeviceID, Boolean>, MetadataIndexNode>> deviceIDAndFirstMeasurementNodeList = new ArrayList<>();
+ private final List<Pair<Pair<IDeviceID, Boolean>, MetadataIndexNode>>
+ deviceIDAndFirstMeasurementNodeList = new ArrayList<>();
private Iterator<Pair<Pair<IDeviceID, Boolean>, MetadataIndexNode>> deviceIDListIterator;
private Pair<Pair<IDeviceID, Boolean>, MetadataIndexNode> current;
- public ReorderedTsFileDeviceIterator(TsFileSequenceReader reader,
- Function<IDeviceID, IDeviceID> transformer)
- throws IOException {
+ public ReorderedTsFileDeviceIterator(
+ TsFileSequenceReader reader, Function<IDeviceID, IDeviceID> transformer) throws IOException {
super(reader, transformer);
collectAndSort();
}
- public ReorderedTsFileDeviceIterator(TsFileSequenceReader reader, String tableName,
- Function<IDeviceID, IDeviceID> transformer) throws IOException {
+ public ReorderedTsFileDeviceIterator(
+ TsFileSequenceReader reader, String tableName, Function<IDeviceID, IDeviceID> transformer)
+ throws IOException {
super(reader, tableName, transformer);
collectAndSort();
}
- private void collectAndSort() {
+ private void collectAndSort() throws IOException {
while (super.hasNext()) {
Pair<IDeviceID, Boolean> next = super.next();
next.left = transformer.apply(next.left);
- deviceIDAndFirstMeasurementNodeList.add(new Pair<>(next, super.getFirstMeasurementNodeOfCurrentDevice()));
+ deviceIDAndFirstMeasurementNodeList.add(
+ new Pair<>(next, super.getFirstMeasurementNodeOfCurrentDevice()));
}
deviceIDAndFirstMeasurementNodeList.sort(Comparator.comparing(p -> p.getLeft().getLeft()));
deviceIDListIterator = deviceIDAndFirstMeasurementNodeList.iterator();
@@ -54,12 +57,12 @@
@Override
public Pair<IDeviceID, Boolean> current() {
- return current.left;
+ return current == null ? null : current.left;
}
@Override
public MetadataIndexNode getFirstMeasurementNodeOfCurrentDevice() {
// the devices have been reordered, cannot use the measurementNode
- return current.right;
+ return current == null ? null : current.right;
}
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/TransformedTsFileDeviceIterator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/TransformedTsFileDeviceIterator.java
index f1af028..a361adb 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/TransformedTsFileDeviceIterator.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/TransformedTsFileDeviceIterator.java
@@ -19,26 +19,28 @@
package org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils;
-import java.io.IOException;
-import java.util.function.Function;
import org.apache.tsfile.file.metadata.IDeviceID;
import org.apache.tsfile.read.TsFileDeviceIterator;
import org.apache.tsfile.read.TsFileSequenceReader;
import org.apache.tsfile.utils.Pair;
+import java.io.IOException;
+import java.util.function.Function;
+
public class TransformedTsFileDeviceIterator extends TsFileDeviceIterator {
protected Function<IDeviceID, IDeviceID> transformer;
- public TransformedTsFileDeviceIterator(TsFileSequenceReader reader, Function<IDeviceID, IDeviceID> transformer)
- throws IOException {
+ public TransformedTsFileDeviceIterator(
+ TsFileSequenceReader reader, Function<IDeviceID, IDeviceID> transformer) throws IOException {
super(reader);
this.transformer = transformer;
}
- public TransformedTsFileDeviceIterator(TsFileSequenceReader reader, String tableName, Function<IDeviceID, IDeviceID> transformer)
+ public TransformedTsFileDeviceIterator(
+ TsFileSequenceReader reader, String tableName, Function<IDeviceID, IDeviceID> transformer)
throws IOException {
- super(reader, tableName);
+ super(reader, tableName, null);
this.transformer = transformer;
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/reader/SeriesDataBlockReader.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/reader/SeriesDataBlockReader.java
index a49b97c..9c78d86 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/reader/SeriesDataBlockReader.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/reader/SeriesDataBlockReader.java
@@ -53,7 +53,8 @@
Set<String> allSensors,
FragmentInstanceContext context,
QueryDataSource dataSource,
- boolean ascending) {
+ boolean ascending,
+ long maxTsFileSetEndVersion) {
SeriesScanOptions.Builder scanOptionsBuilder = new SeriesScanOptions.Builder();
scanOptionsBuilder.withAllSensors(allSensors);
@@ -63,14 +64,16 @@
(AlignedFullPath) seriesPath,
ascending ? Ordering.ASC : Ordering.DESC,
scanOptionsBuilder.build(),
- context);
+ context,
+ maxTsFileSetEndVersion);
} else if (seriesPath instanceof NonAlignedFullPath) {
this.seriesScanUtil =
new SeriesScanUtil(
seriesPath,
ascending ? Ordering.ASC : Ordering.DESC,
scanOptionsBuilder.build(),
- context);
+ context,
+ maxTsFileSetEndVersion);
} else {
throw new IllegalArgumentException("Should call exact sub class!");
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/writer/AbstractCompactionWriter.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/writer/AbstractCompactionWriter.java
index 623f2b3..63f148e 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/writer/AbstractCompactionWriter.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/writer/AbstractCompactionWriter.java
@@ -27,8 +27,8 @@
import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.writer.flushcontroller.AbstractCompactionFlushController;
import org.apache.iotdb.db.storageengine.dataregion.compaction.io.CompactionTsFileWriter;
import org.apache.iotdb.db.storageengine.dataregion.modification.ModEntry;
-
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
+
import org.apache.tsfile.encrypt.EncryptParameter;
import org.apache.tsfile.exception.write.PageException;
import org.apache.tsfile.file.header.PageHeader;
@@ -340,5 +340,6 @@
}
}
- public abstract void setSchemaForAllTargetFile(List<Schema> schemas, Pair<Long, TsFileResource> maxTsFileSetEndVersionAndAssociatedResource);
+ public abstract void setSchemaForAllTargetFile(
+ List<Schema> schemas, Pair<Long, TsFileResource> maxTsFileSetEndVersionAndAssociatedResource);
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/writer/AbstractCrossCompactionWriter.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/writer/AbstractCrossCompactionWriter.java
index cfa377c..5c4a56c 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/writer/AbstractCrossCompactionWriter.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/writer/AbstractCrossCompactionWriter.java
@@ -80,7 +80,11 @@
protected AbstractCrossCompactionWriter(
List<TsFileResource> targetResources, List<TsFileResource> seqFileResources)
throws IOException {
- this(targetResources, seqFileResources, EncryptDBUtils.getDefaultFirstEncryptParam(), Long.MIN_VALUE);
+ this(
+ targetResources,
+ seqFileResources,
+ EncryptDBUtils.getDefaultFirstEncryptParam(),
+ Long.MIN_VALUE);
}
protected AbstractCrossCompactionWriter(
@@ -106,7 +110,8 @@
targetResources.get(i),
memorySizeForEachWriter,
CompactionType.CROSS_COMPACTION,
- this.encryptParameter, maxTsFileSetEndVersion));
+ this.encryptParameter,
+ maxTsFileSetEndVersion));
isEmptyFile[i] = true;
}
this.seqTsFileResources = seqFileResources;
@@ -270,7 +275,8 @@
}
@Override
- public void setSchemaForAllTargetFile(List<Schema> schemas, Pair<Long, TsFileResource> maxTsFileSetEndVersionAndMinResource) {
+ public void setSchemaForAllTargetFile(
+ List<Schema> schemas, Pair<Long, TsFileResource> maxTsFileSetEndVersionAndMinResource) {
for (int i = 0; i < targetFileWriters.size(); i++) {
CompactionTsFileWriter compactionTsFileWriter = targetFileWriters.get(i);
Schema schema = schemas.get(i);
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/writer/AbstractInnerCompactionWriter.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/writer/AbstractInnerCompactionWriter.java
index 28ca734..42338a8 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/writer/AbstractInnerCompactionWriter.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/writer/AbstractInnerCompactionWriter.java
@@ -130,8 +130,13 @@
Schema schema = CompactionTableSchemaCollector.copySchema(schemas.get(0));
TsFileResource minVersionResource = maxTsFileSetEndVersionAndMinResource.getRight();
fileWriter.getTsFileResource().setTsFileManager(minVersionResource.getTsFileManager());
- EvolvedSchema evolvedSchema = fileWriter.getTsFileResource().getMergedEvolvedSchema(maxTsFileSetEndVersion);
- fileWriter.setSchema(evolvedSchema.rewriteToOriginal(schema, CompactionTableSchema::new));
+ EvolvedSchema evolvedSchema =
+ fileWriter.getTsFileResource().getMergedEvolvedSchema(maxTsFileSetEndVersion);
+ // getMergedEvolvedSchema may return null; in that case the schema is written as-is
+ fileWriter.setSchema(
+ evolvedSchema != null
+ ? evolvedSchema.rewriteToOriginal(schema, CompactionTableSchema::new)
+ : schema);
}
@Override
@@ -184,7 +185,8 @@
}
@Override
- public void setSchemaForAllTargetFile(List<Schema> schemas, Pair<Long, TsFileResource> maxTsFileSetEndVersionAndMinResource) {
+ public void setSchemaForAllTargetFile(
+ List<Schema> schemas, Pair<Long, TsFileResource> maxTsFileSetEndVersionAndMinResource) {
this.schemas = schemas;
this.maxTsFileSetEndVersionAndMinResource = maxTsFileSetEndVersionAndMinResource;
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/writer/FastCrossCompactionWriter.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/writer/FastCrossCompactionWriter.java
index 59a87b4..f379d02 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/writer/FastCrossCompactionWriter.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/writer/FastCrossCompactionWriter.java
@@ -53,7 +53,11 @@
List<TsFileResource> seqSourceResources,
Map<TsFileResource, TsFileSequenceReader> readerMap)
throws IOException {
- super(targetResources, seqSourceResources, EncryptDBUtils.getDefaultFirstEncryptParam());
+ super(
+ targetResources,
+ seqSourceResources,
+ EncryptDBUtils.getDefaultFirstEncryptParam(),
+ Long.MIN_VALUE);
this.readerMap = readerMap;
}
@@ -61,9 +65,10 @@
List<TsFileResource> targetResources,
List<TsFileResource> seqSourceResources,
Map<TsFileResource, TsFileSequenceReader> readerMap,
- EncryptParameter encryptParameter)
+ EncryptParameter encryptParameter,
+ long maxTsFileSetEndVersion)
throws IOException {
- super(targetResources, seqSourceResources, encryptParameter);
+ super(targetResources, seqSourceResources, encryptParameter, maxTsFileSetEndVersion);
this.readerMap = readerMap;
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/writer/ReadPointCrossCompactionWriter.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/writer/ReadPointCrossCompactionWriter.java
index 6810df4..b2799c0 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/writer/ReadPointCrossCompactionWriter.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/writer/ReadPointCrossCompactionWriter.java
@@ -47,15 +47,20 @@
public ReadPointCrossCompactionWriter(
List<TsFileResource> targetResources, List<TsFileResource> seqFileResources)
throws IOException {
- super(targetResources, seqFileResources, EncryptDBUtils.getDefaultFirstEncryptParam());
+ super(
+ targetResources,
+ seqFileResources,
+ EncryptDBUtils.getDefaultFirstEncryptParam(),
+ Long.MIN_VALUE);
}
public ReadPointCrossCompactionWriter(
List<TsFileResource> targetResources,
List<TsFileResource> seqFileResources,
- EncryptParameter encryptParameter)
+ EncryptParameter encryptParameter,
+ long maxTsFileSetEndVersion)
throws IOException {
- super(targetResources, seqFileResources, encryptParameter);
+ super(targetResources, seqFileResources, encryptParameter, maxTsFileSetEndVersion);
}
@Override
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/io/CompactionTsFileWriter.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/io/CompactionTsFileWriter.java
index c8c24fca..efe33d2 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/io/CompactionTsFileWriter.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/io/CompactionTsFileWriter.java
@@ -65,12 +65,19 @@
@TestOnly
public CompactionTsFileWriter(File file, long maxMetadataSize, CompactionType type)
throws IOException {
- this(new TsFileResource(file), maxMetadataSize, type, EncryptDBUtils.getDefaultFirstEncryptParam(),
+ this(
+ new TsFileResource(file),
+ maxMetadataSize,
+ type,
+ EncryptDBUtils.getDefaultFirstEncryptParam(),
Long.MIN_VALUE);
}
public CompactionTsFileWriter(
- TsFileResource tsFile, long maxMetadataSize, CompactionType type, EncryptParameter encryptParameter,
+ TsFileResource tsFile,
+ long maxMetadataSize,
+ CompactionType type,
+ EncryptParameter encryptParameter,
long maxTsFileSetEndVersion)
throws IOException {
super(tsFile.getTsFile(), maxMetadataSize, encryptParameter);
@@ -101,7 +108,13 @@
if (!chunkWriter.isEmpty()) {
isEmptyTargetFile = false;
}
- chunkWriter.writeToFileWriter(this);
+ chunkWriter.writeToFileWriter(
+ this,
+ evolvedSchema == null
+ ? null
+ : measurementName ->
+ evolvedSchema.getOriginalColumnName(
+ evolvedSchema.getFinalTableName(currentDeviceId.getTableName()), measurementName));
long writtenDataSize = this.getPos() - beforeOffset;
CompactionMetrics.getInstance()
.recordWriteInfo(
@@ -116,6 +129,16 @@
if (chunkMetadata.getNumOfPoints() != 0) {
isEmptyTargetFile = false;
}
+ if (evolvedSchema != null) {
+ // currentDeviceId was rewritten to its original name in startChunkGroup, so map the table
+ // name back to its final form before the column lookup, as the chunk-writer flush path does
+ chunk
+ .getHeader()
+ .setMeasurementID(
+ evolvedSchema.getOriginalColumnName(
+ evolvedSchema.getFinalTableName(currentDeviceId.getTableName()),
+ chunk.getHeader().getMeasurementID()));
+ }
super.writeChunk(chunk, chunkMetadata);
long writtenDataSize = this.getPos() - beforeOffset;
CompactionMetrics.getInstance()
@@ -133,6 +153,13 @@
TSEncoding encodingType,
Statistics<? extends Serializable> statistics)
throws IOException {
+ if (evolvedSchema != null) {
+ // currentDeviceId holds the original table name (see startChunkGroup); resolve the final
+ // table name first so the lookup matches the chunk-writer flush path
+ measurementId =
+ evolvedSchema.getOriginalColumnName(
+ evolvedSchema.getFinalTableName(currentDeviceId.getTableName()), measurementId);
+ }
long beforeOffset = this.getPos();
super.writeEmptyValueChunk(
measurementId, compressionType, tsDataType, encodingType, statistics);
@@ -150,6 +174,9 @@
@Override
public int startChunkGroup(IDeviceID deviceId) throws IOException {
+ if (evolvedSchema != null) {
+ deviceId = evolvedSchema.rewriteToOriginal(deviceId);
+ }
currentDeviceId = deviceId;
return super.startChunkGroup(deviceId);
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/repair/RepairDataFileScanUtil.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/repair/RepairDataFileScanUtil.java
index d515a7f..d0f3426 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/repair/RepairDataFileScanUtil.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/repair/RepairDataFileScanUtil.java
@@ -283,7 +283,7 @@
throws IOException {
List<TimeseriesMetadata> timeseriesMetadataList = new ArrayList<>();
reader.getDeviceTimeseriesMetadata(
- timeseriesMetadataList, metadataIndexNode, Collections.emptySet(), true);
+ timeseriesMetadataList, metadataIndexNode, Collections.emptySet(), true, null);
long actualDeviceStartTime = Long.MAX_VALUE;
long actualDeviceEndTime = Long.MIN_VALUE;
for (TimeseriesMetadata timeseriesMetadata : timeseriesMetadataList) {
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/QueryDataSource.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/QueryDataSource.java
index 2816493..b659585 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/QueryDataSource.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/QueryDataSource.java
@@ -113,18 +113,33 @@
return queryDataSource;
}
- public boolean hasNextSeqResource(int curIndex, boolean ascending, IDeviceID deviceID) {
+ /**
+ * Checks whether {@code curIndex} still points at a sequence resource. If so and it differs
+ * from the cached index, the cached index and order time are refreshed and the cached
+ * satisfaction flag is reset.
+ *
+ * @param maxTsFileSetEndVersion passed through to {@code TsFileResource#getOrderTimeForSeq}
+ */
+ public boolean hasNextSeqResource(
+ int curIndex, boolean ascending, IDeviceID deviceID, long maxTsFileSetEndVersion) {
boolean res = ascending ? curIndex < seqResources.size() : curIndex >= 0;
if (res && curIndex != this.curSeqIndex) {
this.curSeqIndex = curIndex;
- this.curSeqOrderTime = seqResources.get(curIndex).getOrderTimeForSeq(deviceID, ascending);
+ this.curSeqOrderTime =
+ seqResources
+ .get(curIndex)
+ .getOrderTimeForSeq(deviceID, ascending, maxTsFileSetEndVersion);
this.curSeqSatisfied = null;
}
return res;
}
public boolean isSeqSatisfied(
- IDeviceID deviceID, int curIndex, Filter timeFilter, boolean debug) {
+ IDeviceID deviceID,
+ int curIndex,
+ Filter timeFilter,
+ boolean debug,
+ long maxTsFileSetEndVersion) {
if (curIndex != this.curSeqIndex) {
throw new IllegalArgumentException(
String.format("curIndex %d is not equal to curSeqIndex %d", curIndex, this.curSeqIndex));
@@ -133,7 +133,9 @@
TsFileResource tsFileResource = seqResources.get(curSeqIndex);
curSeqSatisfied =
tsFileResource != null
- && (isSingleDevice || tsFileResource.isSatisfied(deviceID, timeFilter, true, debug));
+ && (isSingleDevice
+ || tsFileResource.isSatisfied(
+ deviceID, timeFilter, true, debug, maxTsFileSetEndVersion));
}
return curSeqSatisfied;
@@ -154,14 +154,22 @@
return null;
}
- public boolean hasNextUnseqResource(int curIndex, boolean ascending, IDeviceID deviceID) {
+ /**
+ * Checks whether {@code curIndex} is below the number of unsequence resources. If so and it
+ * differs from the cached index, the cached index and order time are refreshed and the cached
+ * satisfaction flag is reset.
+ *
+ * @param maxTsFileSetEndVersion passed through to {@code TsFileResource#getOrderTimeForUnseq}
+ */
+ public boolean hasNextUnseqResource(
+ int curIndex, boolean ascending, IDeviceID deviceID, long maxTsFileSetEndVersion) {
boolean res = curIndex < unseqResources.size();
if (res && curIndex != this.curUnSeqIndex) {
this.curUnSeqIndex = curIndex;
this.curUnSeqOrderTime =
unseqResources
.get(unSeqFileOrderIndex[curIndex])
- .getOrderTimeForUnseq(deviceID, ascending);
+ .getOrderTimeForUnseq(deviceID, ascending, maxTsFileSetEndVersion);
this.curUnSeqSatisfied = null;
}
return res;
@@ -209,7 +209,7 @@
return unseqResources.size();
}
- public void fillOrderIndexes(IDeviceID deviceId, boolean ascending) {
+ public void fillOrderIndexes(IDeviceID deviceId, boolean ascending, long maxTsFileSetEndVersion) {
if (unseqResources == null || unseqResources.isEmpty()) {
return;
}
@@ -219,7 +219,8 @@
for (TsFileResource resource : unseqResources) {
orderTimeToIndexMap
.computeIfAbsent(
- resource.getOrderTimeForUnseq(deviceId, ascending), key -> new ArrayList<>())
+ resource.getOrderTimeForUnseq(deviceId, ascending, maxTsFileSetEndVersion),
+ key -> new ArrayList<>())
.add(index++);
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileManager.java
index 11dcde2..610365c 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileManager.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileManager.java
@@ -19,7 +19,6 @@
package org.apache.iotdb.db.storageengine.dataregion.tsfile;
-import java.util.stream.Collectors;
import org.apache.iotdb.commons.utils.TimePartitionUtils;
import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager;
import org.apache.iotdb.db.storageengine.dataregion.modification.ModFileManagement;
@@ -43,6 +42,7 @@
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.stream.Collectors;
public class TsFileManager {
private final String storageGroupName;
@@ -518,20 +518,34 @@
public void addTsFileSet(TsFileSet newSet, long partitionId) {
writeLock("addTsFileSet");
try {
- List<TsFileSet> tsFileSetList = tsfileSets.computeIfAbsent(partitionId,
- p -> new ArrayList<>());
+ List<TsFileSet> tsFileSetList =
+ tsfileSets.computeIfAbsent(partitionId, p -> new ArrayList<>());
tsFileSetList.add(newSet);
} finally {
writeUnlock();
}
}
- public List<TsFileSet> getTsFileSet(long partitionId, long minFileVersionIncluded, long maxFileVersionExcluded) {
+ /**
+ * Returns the TsFileSets registered for {@code partitionId} whose end version is at least
+ * {@code minFileVersionIncluded} and below {@code maxFileVersionExcluded}. The result is empty
+ * when no TsFileSet has been added for the partition.
+ */
+ public List<TsFileSet> getTsFileSet(
+ long partitionId, long minFileVersionIncluded, long maxFileVersionExcluded) {
readLock();
try {
List<TsFileSet> tsFileSetList = tsfileSets.get(partitionId);
+ if (tsFileSetList == null) {
+ // nothing was ever added for this partition; do not dereference the missing list
+ return new ArrayList<>();
+ }
- return tsFileSetList.stream().filter(s -> s.getEndVersion() < maxFileVersionExcluded && s.getEndVersion() >= minFileVersionIncluded).collect(
- Collectors.toList());
+ return tsFileSetList.stream()
+ .filter(
+ s ->
+ s.getEndVersion() < maxFileVersionExcluded
+ && s.getEndVersion() >= minFileVersionIncluded)
+ .collect(Collectors.toList());
} finally {
readUnlock();
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileResource.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileResource.java
index a387db5..0bc12e3 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileResource.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileResource.java
@@ -641,8 +641,14 @@
}
// cannot use FileTimeIndex
- public long getOrderTimeForSeq(IDeviceID deviceId, boolean ascending) {
+ public long getOrderTimeForSeq(
+ IDeviceID deviceId, boolean ascending, long maxTsFileSetEndVersion) {
if (timeIndex instanceof ArrayDeviceTimeIndex) {
+ // look the device up under the id produced by rewriteToOriginal
+ EvolvedSchema evolvedSchema = getMergedEvolvedSchema(maxTsFileSetEndVersion);
+ if (evolvedSchema != null) {
+ deviceId = evolvedSchema.rewriteToOriginal(deviceId);
+ }
return ascending
? timeIndex.getStartTime(deviceId).orElse(Long.MIN_VALUE)
: timeIndex.getEndTime(deviceId).orElse(Long.MAX_VALUE);
@@ -652,8 +656,14 @@
}
// can use FileTimeIndex
- public long getOrderTimeForUnseq(IDeviceID deviceId, boolean ascending) {
+ public long getOrderTimeForUnseq(
+ IDeviceID deviceId, boolean ascending, long maxTsFileSetEndVersion) {
if (timeIndex instanceof ArrayDeviceTimeIndex) {
+ // look the device up under the id produced by rewriteToOriginal
+ EvolvedSchema evolvedSchema = getMergedEvolvedSchema(maxTsFileSetEndVersion);
+ if (evolvedSchema != null) {
+ deviceId = evolvedSchema.rewriteToOriginal(deviceId);
+ }
if (ascending) {
return timeIndex.getStartTime(deviceId).orElse(Long.MIN_VALUE);
} else {
@@ -998,14 +1006,33 @@
}
public boolean isDeviceIdExist(IDeviceID deviceId) {
+ EvolvedSchema evolvedSchema = getMergedEvolvedSchema();
+ if (evolvedSchema != null) {
+ deviceId = evolvedSchema.rewriteToOriginal(deviceId);
+ }
return timeIndex.checkDeviceIdExist(deviceId);
}
+ /** Delegates to the five-argument overload with {@code Long.MAX_VALUE}. */
+ public boolean isSatisfied(IDeviceID deviceId, Filter timeFilter, boolean isSeq, boolean debug) {
+ return isSatisfied(deviceId, timeFilter, isSeq, debug, Long.MAX_VALUE);
+ }
+
/**
* @return true if the device is contained in the TsFile
*/
@SuppressWarnings("OptionalGetWithoutIsPresent")
- public boolean isSatisfied(IDeviceID deviceId, Filter timeFilter, boolean isSeq, boolean debug) {
+ public boolean isSatisfied(
+ IDeviceID deviceId,
+ Filter timeFilter,
+ boolean isSeq,
+ boolean debug,
+ long maxTsFileSetEndVersion) {
+ EvolvedSchema evolvedSchema = getMergedEvolvedSchema(maxTsFileSetEndVersion);
+ // deviceId may be null (see the check below), so only rewrite a real id
+ if (evolvedSchema != null && deviceId != null) {
+ deviceId = evolvedSchema.rewriteToOriginal(deviceId);
+ }
if (deviceId != null && definitelyNotContains(deviceId)) {
if (debug) {
DEBUG_LOGGER.info(
@@ -1632,11 +1652,13 @@
}
public List<TsFileSet> getTsFileSets() {
- return tsFileManager.getTsFileSet(tsFileID.timePartitionId, tsFileID.fileVersion, Long.MAX_VALUE);
+ return tsFileManager.getTsFileSet(
+ tsFileID.timePartitionId, tsFileID.fileVersion, Long.MAX_VALUE);
}
public List<TsFileSet> getTsFileSets(long maxEndVersionExcluded) {
- return tsFileManager.getTsFileSet(tsFileID.timePartitionId, tsFileID.fileVersion, maxEndVersionExcluded);
+ return tsFileManager.getTsFileSet(
+ tsFileID.timePartitionId, tsFileID.fileVersion, maxEndVersionExcluded);
}
public EvolvedSchema getMergedEvolvedSchema() {
@@ -1645,7 +1667,8 @@
public EvolvedSchema getMergedEvolvedSchema(long excludedMaxFileVersion) {
List<EvolvedSchema> list = new ArrayList<>();
- for (TsFileSet fileSet : getTsFileSets()) {
+ List<TsFileSet> tsFileSets = getTsFileSets();
+ for (TsFileSet fileSet : tsFileSets) {
if (fileSet.getEndVersion() >= excludedMaxFileVersion) {
continue;
}
@@ -1661,7 +1684,8 @@
return EvolvedSchema.merge(list.toArray(new EvolvedSchema[0]));
}
- public static Pair<Long, TsFileResource> getMaxTsFileSetEndVersionAndMinResource(List<TsFileResource> tsFileResources) {
+ public static Pair<Long, TsFileResource> getMaxTsFileSetEndVersionAndMinResource(
+ List<TsFileResource> tsFileResources) {
long maxTsFileSetEndVersion = Long.MIN_VALUE;
long minResourceVersion = Long.MAX_VALUE;
TsFileResource minTsFileResource = null;
@@ -1682,8 +1706,7 @@
return new Pair<>(maxTsFileSetEndVersion, minTsFileResource);
}
- public void setTsFileManager(
- TsFileManager tsFileManager) {
+ public void setTsFileManager(TsFileManager tsFileManager) {
this.tsFileManager = tsFileManager;
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/evolution/EvolvedSchema.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/evolution/EvolvedSchema.java
index a88e76d..3dca08b 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/evolution/EvolvedSchema.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/evolution/EvolvedSchema.java
@@ -19,8 +19,6 @@
package org.apache.iotdb.db.storageengine.dataregion.tsfile.evolution;
-import java.util.function.Function;
-import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.CompactionTableSchema;
import org.apache.iotdb.db.storageengine.dataregion.modification.DeletionPredicate;
import org.apache.iotdb.db.storageengine.dataregion.modification.ModEntry;
import org.apache.iotdb.db.storageengine.dataregion.modification.ModEntry.ModType;
@@ -28,13 +26,13 @@
import org.apache.iotdb.db.storageengine.dataregion.modification.TagPredicate;
import org.apache.tsfile.enums.ColumnCategory;
-import org.apache.tsfile.file.metadata.IChunkMetadata;
import org.apache.tsfile.file.metadata.IDeviceID;
import org.apache.tsfile.file.metadata.IDeviceID.Factory;
import org.apache.tsfile.file.metadata.TableSchema;
import org.apache.tsfile.file.metadata.TimeseriesMetadata;
import org.apache.tsfile.write.schema.IMeasurementSchema;
import org.apache.tsfile.write.schema.MeasurementSchema;
+import org.apache.tsfile.write.schema.Schema;
import java.util.ArrayList;
import java.util.Collections;
@@ -44,8 +42,8 @@
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
+import java.util.function.Function;
import java.util.stream.Collectors;
-import org.apache.tsfile.write.schema.Schema;
public class EvolvedSchema {
// the evolved table names after applying all schema evolution operations
@@ -106,7 +104,7 @@
return finalToOriginalTableNames.getOrDefault(finalTableName, finalTableName);
}
- private String getFinalTableName(String originalTableName) {
+ public String getFinalTableName(String originalTableName) {
return originalToFinalTableNames.getOrDefault(originalTableName, originalTableName);
}
@@ -232,10 +230,9 @@
String originalTableName, List<TimeseriesMetadata> timeseriesMetadataList) {
timeseriesMetadataList.forEach(
timeseriesMetadata -> {
- String finalColumnName = getFinalColumnName(originalTableName,
- timeseriesMetadata.getMeasurementId());
- timeseriesMetadata.setMeasurementId(
- finalColumnName);
+ String finalColumnName =
+ getFinalColumnName(originalTableName, timeseriesMetadata.getMeasurementId());
+ timeseriesMetadata.setMeasurementId(finalColumnName);
});
}
@@ -263,11 +260,15 @@
getOriginalColumnName(
tableSchema.getTableName(), measurementSchema.getMeasurementName()),
measurementSchema.getType(),
- measurementSchema.getEncodingType(), measurementSchema.getCompressor()));
+ measurementSchema.getEncodingType(),
+ measurementSchema.getCompressor()));
columnCategories.add(tableSchema.getColumnTypes().get(i));
}
- return new TableSchema(originalTableName, measurementSchemas, columnCategories);
+ // carry the updatable flag of the input over to the rewritten copy
+ TableSchema schema = new TableSchema(originalTableName, measurementSchemas, columnCategories);
+ schema.setUpdatable(tableSchema.isUpdatable());
+ return schema;
}
public TableSchema rewriteToFinal(TableSchema tableSchema) {
@@ -288,7 +289,10 @@
columnCategories.add(tableSchema.getColumnTypes().get(i));
}
- return new TableSchema(finalTableName, measurementSchemas, columnCategories);
+ // carry the updatable flag of the input over to the rewritten copy
+ TableSchema schema = new TableSchema(finalTableName, measurementSchemas, columnCategories);
+ schema.setUpdatable(tableSchema.isUpdatable());
+ return schema;
}
@SuppressWarnings("SuspiciousSystemArraycopy")
@@ -310,6 +314,10 @@
new LinkedHashMap<>(evolvedSchema.finalToOriginalTableNames);
newEvolvedSchema.finalToOriginalColumnNames =
new LinkedHashMap<>(evolvedSchema.finalToOriginalColumnNames);
+ newEvolvedSchema.originalToFinalTableNames =
+ new LinkedHashMap<>(evolvedSchema.originalToFinalTableNames);
+ newEvolvedSchema.originalToFinalColumnNames =
+ new LinkedHashMap<>(evolvedSchema.originalToFinalColumnNames);
return newEvolvedSchema;
}
@@ -323,6 +331,9 @@
break;
}
}
+ if (i == schemas.length) {
+ return firstNotNullSchema;
+ }
if (firstNotNullSchema == null) {
return null;
@@ -359,7 +370,9 @@
public Schema rewriteToOriginal(Schema schema) {
return rewriteToOriginal(schema, null);
}
- public Schema rewriteToOriginal(Schema schema, Function<TableSchema, TableSchema> tableSchemaTransformer) {
+
+ public Schema rewriteToOriginal(
+ Schema schema, Function<TableSchema, TableSchema> tableSchemaTransformer) {
Schema copySchema = new Schema();
for (TableSchema tableSchema : schema.getTableSchemaMap().values()) {
TableSchema originalSchema = rewriteToOriginal(tableSchema);
@@ -370,6 +383,4 @@
}
return copySchema;
}
-
-
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/fileset/TsFileSet.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/fileset/TsFileSet.java
index c2364b2..3ae2949 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/fileset/TsFileSet.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/fileset/TsFileSet.java
@@ -23,11 +23,12 @@
import org.apache.iotdb.db.storageengine.dataregion.tsfile.evolution.SchemaEvolution;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.evolution.SchemaEvolutionFile;
+import org.apache.tsfile.external.commons.io.FileUtils;
+
import java.io.File;
import java.io.IOException;
import java.util.Collection;
import java.util.concurrent.locks.ReentrantReadWriteLock;
-import org.apache.tsfile.external.commons.io.FileUtils;
/** TsFileSet represents a set of TsFiles in a time partition whose version <= endVersion. */
public class TsFileSet implements Comparable<TsFileSet> {
@@ -65,7 +66,7 @@
if (schemaEvolutionFile == null) {
schemaEvolutionFile =
new SchemaEvolutionFile(
- fileSetsDir + File.separator + 0 + SchemaEvolutionFile.FILE_SUFFIX);
+ fileSetDir + File.separator + 0 + SchemaEvolutionFile.FILE_SUFFIX);
}
}
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/AbstractCompactionTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/AbstractCompactionTest.java
index 31ed98e..7a90f6b 100644
--- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/AbstractCompactionTest.java
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/AbstractCompactionTest.java
@@ -850,8 +850,8 @@
Pair<IDeviceID, Boolean> iDeviceIDBooleanPair = deviceIterator.nextDevice();
IDeviceID deviceID = iDeviceIDBooleanPair.getLeft();
boolean isAlign = iDeviceIDBooleanPair.getRight();
- Map<String, MeasurementSchema> schemaMap = deviceIterator.getAllSchemasOfCurrentDevice(
- new Pair<>(Long.MIN_VALUE, null));
+ Map<String, MeasurementSchema> schemaMap =
+ deviceIterator.getAllSchemasOfCurrentDevice(new Pair<>(Long.MIN_VALUE, null));
IMeasurementSchema timeSchema = schemaMap.remove(TsFileConstant.TIME_COLUMN_ID);
List<IMeasurementSchema> measurementSchemas = new ArrayList<>(schemaMap.values());
if (measurementSchemas.isEmpty()) {
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/ReadPointCompactionPerformerTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/ReadPointCompactionPerformerTest.java
index f21571c..f926506 100644
--- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/ReadPointCompactionPerformerTest.java
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/ReadPointCompactionPerformerTest.java
@@ -37,18 +37,32 @@
import org.apache.iotdb.db.storageengine.dataregion.read.control.FileReaderManager;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResourceStatus;
+import org.apache.iotdb.db.storageengine.dataregion.tsfile.evolution.ColumnRename;
+import org.apache.iotdb.db.storageengine.dataregion.tsfile.evolution.TableRename;
+import org.apache.iotdb.db.storageengine.dataregion.tsfile.fileset.TsFileSet;
import org.apache.iotdb.db.utils.EnvironmentUtils;
+import org.apache.iotdb.db.utils.constant.TestConstant;
import org.apache.tsfile.common.conf.TSFileDescriptor;
+import org.apache.tsfile.enums.ColumnCategory;
import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.exception.write.NoMeasurementException;
+import org.apache.tsfile.exception.write.NoTableException;
import org.apache.tsfile.exception.write.WriteProcessException;
+import org.apache.tsfile.file.metadata.ColumnSchemaBuilder;
import org.apache.tsfile.file.metadata.IDeviceID;
import org.apache.tsfile.file.metadata.IDeviceID.Factory;
+import org.apache.tsfile.file.metadata.TableSchema;
import org.apache.tsfile.read.common.IBatchDataIterator;
import org.apache.tsfile.read.common.block.TsBlock;
+import org.apache.tsfile.read.query.dataset.ResultSet;
+import org.apache.tsfile.read.v4.ITsFileReader;
+import org.apache.tsfile.read.v4.TsFileReaderBuilder;
import org.apache.tsfile.utils.Pair;
import org.apache.tsfile.utils.TsFileGeneratorUtils;
import org.apache.tsfile.utils.TsPrimitiveType;
+import org.apache.tsfile.write.TsFileWriter;
+import org.apache.tsfile.write.record.Tablet;
import org.apache.tsfile.write.schema.IMeasurementSchema;
import org.apache.tsfile.write.schema.MeasurementSchema;
import org.junit.After;
@@ -56,8 +70,10 @@
import org.junit.Before;
import org.junit.Test;
+import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
@@ -66,6 +82,8 @@
import static org.apache.iotdb.commons.conf.IoTDBConstant.PATH_SEPARATOR;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
@SuppressWarnings("OptionalGetWithoutIsPresent")
public class ReadPointCompactionPerformerTest extends AbstractCompactionTest {
@@ -6993,4 +7011,293 @@
Assert.fail();
}
}
+
+ @Test
+ public void testWithSevoFile() throws Exception {
+ String fileSetDir =
+ TestConstant.BASE_OUTPUT_PATH + File.separator + TsFileSet.FILE_SET_DIR_NAME;
+ // file1:
+ // table1[s1, s2, s3]
+ // table2[s1, s2, s3]
+ File f1 = new File(SEQ_DIRS, "0-1-0-0.tsfile");
+ TableSchema tableSchema1_1 =
+ new TableSchema(
+ "table1",
+ Arrays.asList(
+ new ColumnSchemaBuilder()
+ .name("s1")
+ .dataType(TSDataType.INT32)
+ .category(ColumnCategory.FIELD)
+ .build(),
+ new ColumnSchemaBuilder()
+ .name("s2")
+ .dataType(TSDataType.INT32)
+ .category(ColumnCategory.FIELD)
+ .build(),
+ new ColumnSchemaBuilder()
+ .name("s3")
+ .dataType(TSDataType.INT32)
+ .category(ColumnCategory.FIELD)
+ .build()));
+ TableSchema tableSchema1_2 =
+ new TableSchema(
+ "table2",
+ Arrays.asList(
+ new ColumnSchemaBuilder()
+ .name("s1")
+ .dataType(TSDataType.INT32)
+ .category(ColumnCategory.FIELD)
+ .build(),
+ new ColumnSchemaBuilder()
+ .name("s2")
+ .dataType(TSDataType.INT32)
+ .category(ColumnCategory.FIELD)
+ .build(),
+ new ColumnSchemaBuilder()
+ .name("s3")
+ .dataType(TSDataType.INT32)
+ .category(ColumnCategory.FIELD)
+ .build()));
+ try (TsFileWriter tsFileWriter = new TsFileWriter(f1)) {
+ tsFileWriter.registerTableSchema(tableSchema1_1);
+ tsFileWriter.registerTableSchema(tableSchema1_2);
+
+ Tablet tablet1 = new Tablet(tableSchema1_1.getTableName(), tableSchema1_1.getColumnSchemas());
+ tablet1.addTimestamp(0, 0);
+ tablet1.addValue(0, 0, 1);
+ tablet1.addValue(0, 1, 2);
+ tablet1.addValue(0, 2, 3);
+
+ Tablet tablet2 = new Tablet(tableSchema1_2.getTableName(), tableSchema1_2.getColumnSchemas());
+ tablet2.addTimestamp(0, 0);
+ tablet2.addValue(0, 0, 101);
+ tablet2.addValue(0, 1, 102);
+ tablet2.addValue(0, 2, 103);
+
+ tsFileWriter.writeTable(tablet1);
+ tsFileWriter.writeTable(tablet2);
+ }
+ TsFileResource resource1 = new TsFileResource(f1);
+ resource1.setTsFileManager(tsFileManager);
+ resource1.updateStartTime(Factory.DEFAULT_FACTORY.create(new String[]{"table1"}), 0);
+ resource1.updateEndTime(Factory.DEFAULT_FACTORY.create(new String[]{"table1"}), 0);
+ resource1.updateStartTime(Factory.DEFAULT_FACTORY.create(new String[]{"table2"}), 0);
+ resource1.updateEndTime(Factory.DEFAULT_FACTORY.create(new String[]{"table2"}), 0);
+ resource1.close();
+
+ // rename table1 -> table0
+ TsFileSet tsFileSet1 = new TsFileSet(1, fileSetDir, false);
+ tsFileSet1.appendSchemaEvolution(
+ Collections.singletonList(new TableRename("table1", "table0")));
+ tsFileManager.addTsFileSet(tsFileSet1, 0);
+
+ // file2:
+ // table0[s1, s2, s3]
+ // table2[s1, s2, s3]
+ File f2 = new File(SEQ_DIRS, "0-2-0-0.tsfile");
+ TableSchema tableSchema2_1 =
+ new TableSchema(
+ "table0",
+ Arrays.asList(
+ new ColumnSchemaBuilder()
+ .name("s1")
+ .dataType(TSDataType.INT32)
+ .category(ColumnCategory.FIELD)
+ .build(),
+ new ColumnSchemaBuilder()
+ .name("s2")
+ .dataType(TSDataType.INT32)
+ .category(ColumnCategory.FIELD)
+ .build(),
+ new ColumnSchemaBuilder()
+ .name("s3")
+ .dataType(TSDataType.INT32)
+ .category(ColumnCategory.FIELD)
+ .build()));
+ TableSchema tableSchema2_2 =
+ new TableSchema(
+ "table2",
+ Arrays.asList(
+ new ColumnSchemaBuilder()
+ .name("s1")
+ .dataType(TSDataType.INT32)
+ .category(ColumnCategory.FIELD)
+ .build(),
+ new ColumnSchemaBuilder()
+ .name("s2")
+ .dataType(TSDataType.INT32)
+ .category(ColumnCategory.FIELD)
+ .build(),
+ new ColumnSchemaBuilder()
+ .name("s3")
+ .dataType(TSDataType.INT32)
+ .category(ColumnCategory.FIELD)
+ .build()));
+ try (TsFileWriter tsFileWriter = new TsFileWriter(f2)) {
+ tsFileWriter.registerTableSchema(tableSchema2_1);
+ tsFileWriter.registerTableSchema(tableSchema2_2);
+
+ Tablet tablet1 = new Tablet(tableSchema2_1.getTableName(), tableSchema2_1.getColumnSchemas());
+ tablet1.addTimestamp(0, 1);
+ tablet1.addValue(0, 0, 11);
+ tablet1.addValue(0, 1, 12);
+ tablet1.addValue(0, 2, 13);
+
+ Tablet tablet2 = new Tablet(tableSchema2_2.getTableName(), tableSchema2_2.getColumnSchemas());
+ tablet2.addTimestamp(0, 1);
+ tablet2.addValue(0, 0, 111);
+ tablet2.addValue(0, 1, 112);
+ tablet2.addValue(0, 2, 113);
+
+ tsFileWriter.writeTable(tablet1);
+ tsFileWriter.writeTable(tablet2);
+ }
+ TsFileResource resource2 = new TsFileResource(f2);
+ resource2.setTsFileManager(tsFileManager);
+ resource2.updateStartTime(Factory.DEFAULT_FACTORY.create(new String[]{"table0"}), 1);
+ resource2.updateEndTime(Factory.DEFAULT_FACTORY.create(new String[]{"table0"}), 1);
+ resource2.updateStartTime(Factory.DEFAULT_FACTORY.create(new String[]{"table2"}), 1);
+ resource2.updateEndTime(Factory.DEFAULT_FACTORY.create(new String[]{"table2"}), 1);
+ resource2.close();
+
+
+ // rename table0.s1 -> table0.s0
+ TsFileSet tsFileSet2 = new TsFileSet(2, fileSetDir, false);
+ tsFileSet2.appendSchemaEvolution(
+ Collections.singletonList(new ColumnRename("table0", "s1", "s0")));
+ tsFileManager.addTsFileSet(tsFileSet2, 0);
+
+ // file3:
+ // table0[s0, s2, s3]
+ // table2[s1, s2, s3]
+ File f3 = new File(SEQ_DIRS, "0-3-0-0.tsfile");
+ TableSchema tableSchema3_1 =
+ new TableSchema(
+ "table0",
+ Arrays.asList(
+ new ColumnSchemaBuilder()
+ .name("s0")
+ .dataType(TSDataType.INT32)
+ .category(ColumnCategory.FIELD)
+ .build(),
+ new ColumnSchemaBuilder()
+ .name("s2")
+ .dataType(TSDataType.INT32)
+ .category(ColumnCategory.FIELD)
+ .build(),
+ new ColumnSchemaBuilder()
+ .name("s3")
+ .dataType(TSDataType.INT32)
+ .category(ColumnCategory.FIELD)
+ .build()));
+ TableSchema tableSchema3_2 =
+ new TableSchema(
+ "table2",
+ Arrays.asList(
+ new ColumnSchemaBuilder()
+ .name("s1")
+ .dataType(TSDataType.INT32)
+ .category(ColumnCategory.FIELD)
+ .build(),
+ new ColumnSchemaBuilder()
+ .name("s2")
+ .dataType(TSDataType.INT32)
+ .category(ColumnCategory.FIELD)
+ .build(),
+ new ColumnSchemaBuilder()
+ .name("s3")
+ .dataType(TSDataType.INT32)
+ .category(ColumnCategory.FIELD)
+ .build()));
+ try (TsFileWriter tsFileWriter = new TsFileWriter(f3)) {
+ tsFileWriter.registerTableSchema(tableSchema3_1);
+ tsFileWriter.registerTableSchema(tableSchema3_2);
+
+ Tablet tablet1 = new Tablet(tableSchema3_1.getTableName(), tableSchema3_1.getColumnSchemas());
+ tablet1.addTimestamp(0, 2);
+ tablet1.addValue(0, 0, 21);
+ tablet1.addValue(0, 1, 22);
+ tablet1.addValue(0, 2, 23);
+
+ Tablet tablet2 = new Tablet(tableSchema3_2.getTableName(), tableSchema3_2.getColumnSchemas());
+ tablet2.addTimestamp(0, 2);
+ tablet2.addValue(0, 0, 121);
+ tablet2.addValue(0, 1, 122);
+ tablet2.addValue(0, 2, 123);
+
+ tsFileWriter.writeTable(tablet1);
+ tsFileWriter.writeTable(tablet2);
+ }
+ TsFileResource resource3 = new TsFileResource(f3);
+ resource3.setTsFileManager(tsFileManager);
+ resource3.updateStartTime(Factory.DEFAULT_FACTORY.create(new String[]{"table0"}), 2);
+ resource3.updateEndTime(Factory.DEFAULT_FACTORY.create(new String[]{"table0"}), 2);
+ resource3.updateStartTime(Factory.DEFAULT_FACTORY.create(new String[]{"table2"}), 2);
+ resource3.updateEndTime(Factory.DEFAULT_FACTORY.create(new String[]{"table2"}), 2);
+ resource3.close();
+
+ // rename table2 -> table1
+ TsFileSet tsFileSet3 = new TsFileSet(3, fileSetDir, false);
+ tsFileSet3.appendSchemaEvolution(
+ Collections.singletonList(new TableRename("table2", "table1")));
+ tsFileManager.addTsFileSet(tsFileSet3, 0);
+
+ // perform compaction
+ seqResources.add(resource1);
+ seqResources.add(resource2);
+ seqResources.add(resource3);
+
+ List<TsFileResource> targetResources =
+ CompactionFileGeneratorUtils.getInnerCompactionTargetTsFileResources(seqResources, true);
+ targetResources.forEach(s -> s.setTsFileManager(tsFileManager));
+
+ ICompactionPerformer performer =
+ new ReadPointCompactionPerformer(seqResources, unseqResources, targetResources);
+ performer.setSummary(new CompactionTaskSummary());
+ performer.perform();
+
+ // target(version=1):
+ // table1[s1, s2, s3]
+ // table2[s1, s2, s3]
+ try (ITsFileReader tsFileReader =
+ new TsFileReaderBuilder().file(targetResources.get(0).getTsFile()).build()) {
+ // table0 should not exist
+ try {
+ tsFileReader.query("table0", Collections.singletonList("s2"), Long.MIN_VALUE, Long.MAX_VALUE);
+ fail("table0 should not exist");
+ } catch (NoTableException e) {
+ assertEquals("Table table0 not found", e.getMessage());
+ }
+
+ // table1.s0 should not exist
+ try {
+ tsFileReader.query("table1", Collections.singletonList("s0"), Long.MIN_VALUE, Long.MAX_VALUE);
+ fail("table1.s0 should not exist");
+ } catch (NoMeasurementException e) {
+ assertEquals("No measurement for s0", e.getMessage());
+ }
+
+ // check data of table1
+ ResultSet resultSet = tsFileReader.query("table1", Arrays.asList("s1", "s2", "s3"),
+ Long.MIN_VALUE, Long.MAX_VALUE);
+ for (int i = 0; i < 3; i++) {
+ assertTrue(resultSet.next());
+ assertEquals(i, resultSet.getLong(1));
+ for (int j = 0; j < 3; j++) {
+ assertEquals(i * 10 + j + 1, resultSet.getLong(j + 2));
+ }
+ }
+
+ // check data of table2
+ resultSet = tsFileReader.query("table2", Arrays.asList("s1", "s2", "s3"),
+ Long.MIN_VALUE, Long.MAX_VALUE);
+ for (int i = 0; i < 3; i++) {
+ assertTrue(resultSet.next());
+ assertEquals(i, resultSet.getLong(1));
+ for (int j = 0; j < 3; j++) {
+ assertEquals(100 + i * 10 + j + 1, resultSet.getLong(j + 2));
+ }
+ }
+ }
+ }
}
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/utils/CompactionCheckerUtils.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/utils/CompactionCheckerUtils.java
index d3851e4..8aca646 100644
--- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/utils/CompactionCheckerUtils.java
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/utils/CompactionCheckerUtils.java
@@ -538,8 +538,8 @@
Pair<IDeviceID, Boolean> iDeviceIDBooleanPair = deviceIterator.nextDevice();
IDeviceID deviceID = iDeviceIDBooleanPair.getLeft();
boolean isAlign = iDeviceIDBooleanPair.getRight();
- Map<String, MeasurementSchema> schemaMap = deviceIterator.getAllSchemasOfCurrentDevice(
- new Pair<>(Long.MIN_VALUE, null));
+ Map<String, MeasurementSchema> schemaMap =
+ deviceIterator.getAllSchemasOfCurrentDevice(new Pair<>(Long.MIN_VALUE, null));
IMeasurementSchema timeSchema = schemaMap.remove(TsFileConstant.TIME_COLUMN_ID);
List<IMeasurementSchema> measurementSchemas = new ArrayList<>(schemaMap.values());
if (measurementSchemas.isEmpty()) {