[IOTDB-4364]Reduce read amplication in compaction (#7312)
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/rewrite/task/ReadPointPerformerSubTask.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/rewrite/task/ReadPointPerformerSubTask.java
index 4280d2a..bc04c09 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/rewrite/task/ReadPointPerformerSubTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/rewrite/task/ReadPointPerformerSubTask.java
@@ -79,7 +79,7 @@
device,
Collections.singletonList(measurement),
measurementSchemas,
- measurementList,
+ schemaMap.keySet(),
fragmentInstanceContext,
queryDataSource,
false);
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/MultiTsFileDeviceIterator.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/MultiTsFileDeviceIterator.java
index 73f3c2c..304ff6d 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/MultiTsFileDeviceIterator.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/MultiTsFileDeviceIterator.java
@@ -32,6 +32,7 @@
import org.apache.iotdb.tsfile.read.TsFileDeviceIterator;
import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
import org.apache.iotdb.tsfile.utils.Pair;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
import java.io.IOException;
import java.util.ArrayList;
@@ -48,10 +49,11 @@
import java.util.stream.Collectors;
public class MultiTsFileDeviceIterator implements AutoCloseable {
- private List<TsFileResource> tsFileResources;
- private Map<TsFileResource, TsFileSequenceReader> readerMap = new HashMap<>();
- private Map<TsFileResource, TsFileDeviceIterator> deviceIteratorMap = new HashMap<>();
- private Map<TsFileResource, List<Modification>> modificationCache = new HashMap<>();
+ // sorted from the newest to the oldest
+ private final List<TsFileResource> tsFileResources;
+ private final Map<TsFileResource, TsFileSequenceReader> readerMap = new HashMap<>();
+ private final Map<TsFileResource, TsFileDeviceIterator> deviceIteratorMap = new HashMap<>();
+ private final Map<TsFileResource, List<Modification>> modificationCache = new HashMap<>();
private Pair<String, Boolean> currentDevice = null;
/** Used for inner space compaction. */
@@ -77,13 +79,10 @@
/** Used for cross space compaction. */
public MultiTsFileDeviceIterator(
List<TsFileResource> seqResources, List<TsFileResource> unseqResources) throws IOException {
- for (TsFileResource tsFileResource : seqResources) {
- TsFileSequenceReader reader =
- FileReaderManager.getInstance().get(tsFileResource.getTsFilePath(), true);
- readerMap.put(tsFileResource, reader);
- deviceIteratorMap.put(tsFileResource, reader.getAllDevicesIteratorWithIsAligned());
- }
- for (TsFileResource tsFileResource : unseqResources) {
+ this.tsFileResources = new ArrayList<>(seqResources);
+ tsFileResources.addAll(unseqResources);
+ Collections.sort(this.tsFileResources, TsFileResource::compareFileNameByDesc);
+ for (TsFileResource tsFileResource : tsFileResources) {
TsFileSequenceReader reader =
FileReaderManager.getInstance().get(tsFileResource.getTsFilePath(), true);
readerMap.put(tsFileResource, reader);
@@ -135,6 +134,38 @@
}
/**
+ * Get all measurements and schemas of the current device from source files. Traverse all the
+ * files from the newest to the oldest in turn and start traversing the index tree from the
+ * firstMeasurementNode node to get all the measurements under the current device.
+ */
+ public Map<String, MeasurementSchema> getAllMeasurementSchemas() throws IOException {
+ Map<String, MeasurementSchema> schemaMap = new ConcurrentHashMap<>();
+ // get schemas from the newest file to the oldest file
+ for (TsFileResource resource : tsFileResources) {
+ if (!deviceIteratorMap.containsKey(resource)) {
+ continue;
+ }
+ TsFileSequenceReader reader = readerMap.get(resource);
+ List<TimeseriesMetadata> timeseriesMetadataList = new ArrayList<>();
+ reader.getDeviceTimeseriesMetadata(
+ timeseriesMetadataList,
+ deviceIteratorMap.get(resource).getMeasurementNode(),
+ schemaMap.keySet(),
+ true);
+ for (TimeseriesMetadata timeseriesMetadata : timeseriesMetadataList) {
+ if (!schemaMap.containsKey(timeseriesMetadata.getMeasurementId())
+ && !timeseriesMetadata.getChunkMetadataList().isEmpty()) {
+ schemaMap.put(
+ timeseriesMetadata.getMeasurementId(),
+ reader.getMeasurementSchema(timeseriesMetadata.getChunkMetadataList()));
+ }
+ }
+ }
+ schemaMap.remove("");
+ return schemaMap;
+ }
+
+ /**
* return MeasurementIterator, who iterates the measurements of not aligned device
*
* @param device the full path of the device to be iterated
@@ -146,10 +177,6 @@
return new MeasurementIterator(readerMap, device, derserializeTimeseriesMetadata);
}
- public AlignedMeasurementIterator iterateAlignedSeries(String device) {
- return new AlignedMeasurementIterator(device, new ArrayList<>(readerMap.values()));
- }
-
/**
* return a list of the tsfile reader and its aligned chunk metadata list for the aligned device
* which this iterator is visiting. If there is any modification for this device, it will be
@@ -236,25 +263,6 @@
}
}
- public class AlignedMeasurementIterator {
- private List<TsFileSequenceReader> sequenceReaders;
- private String device;
-
- private AlignedMeasurementIterator(String device, List<TsFileSequenceReader> sequenceReaders) {
- this.device = device;
- this.sequenceReaders = sequenceReaders;
- }
-
- public Set<String> getAllMeasurements() throws IOException {
- Map<String, TimeseriesMetadata> deviceMeasurementsMap = new ConcurrentHashMap<>();
- for (TsFileSequenceReader reader : sequenceReaders) {
- deviceMeasurementsMap.putAll(reader.readDeviceMetadata(device));
- }
- deviceMeasurementsMap.remove("");
- return deviceMeasurementsMap.keySet();
- }
- }
-
public class MeasurementIterator {
private Map<TsFileResource, TsFileSequenceReader> readerMap;
private String device;
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/performer/impl/ReadPointCompactionPerformer.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/performer/impl/ReadPointCompactionPerformer.java
index 0b8df32..de9d36e 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/performer/impl/ReadPointCompactionPerformer.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/performer/impl/ReadPointCompactionPerformer.java
@@ -23,7 +23,6 @@
import org.apache.iotdb.commons.exception.MetadataException;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
-import org.apache.iotdb.db.engine.cache.ChunkCache;
import org.apache.iotdb.db.engine.compaction.CompactionTaskManager;
import org.apache.iotdb.db.engine.compaction.cross.rewrite.task.ReadPointPerformerSubTask;
import org.apache.iotdb.db.engine.compaction.inner.utils.MultiTsFileDeviceIterator;
@@ -36,22 +35,16 @@
import org.apache.iotdb.db.engine.compaction.writer.CrossSpaceCompactionWriter;
import org.apache.iotdb.db.engine.compaction.writer.InnerSpaceCompactionWriter;
import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
-import org.apache.iotdb.db.engine.storagegroup.TsFileNameGenerator;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.metadata.path.AlignedPath;
import org.apache.iotdb.db.metadata.path.MeasurementPath;
import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext;
-import org.apache.iotdb.db.query.control.FileReaderManager;
import org.apache.iotdb.db.query.control.QueryResourceManager;
import org.apache.iotdb.db.utils.QueryUtils;
-import org.apache.iotdb.tsfile.file.header.ChunkHeader;
-import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
import org.apache.iotdb.tsfile.file.metadata.TimeseriesMetadata;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.TimeValuePair;
-import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
-import org.apache.iotdb.tsfile.read.common.Chunk;
import org.apache.iotdb.tsfile.read.common.block.TsBlock;
import org.apache.iotdb.tsfile.read.reader.IPointReader;
import org.apache.iotdb.tsfile.utils.Pair;
@@ -65,9 +58,7 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
-import java.util.HashMap;
import java.util.HashSet;
-import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -82,7 +73,7 @@
private List<TsFileResource> unseqFiles = Collections.emptyList();
private static final int subTaskNum =
IoTDBDescriptor.getInstance().getConfig().getSubCompactionTaskNum();
- private Map<TsFileResource, TsFileSequenceReader> readerCacheMap = new HashMap<>();
+
private CompactionTaskSummary summary;
private List<TsFileResource> targetFiles = Collections.emptyList();
@@ -140,7 +131,6 @@
updateDeviceStartTimeAndEndTime(targetFiles, compactionWriter);
updatePlanIndexes(targetFiles, seqFiles, unseqFiles);
} finally {
- clearReaderCache();
QueryResourceManager.getInstance().endQuery(queryId);
}
}
@@ -162,10 +152,7 @@
FragmentInstanceContext fragmentInstanceContext,
QueryDataSource queryDataSource)
throws IOException, MetadataException {
- MultiTsFileDeviceIterator.AlignedMeasurementIterator alignedMeasurementIterator =
- deviceIterator.iterateAlignedSeries(device);
- Set<String> allMeasurements = alignedMeasurementIterator.getAllMeasurements();
- Map<String, MeasurementSchema> schemaMap = getMeasurementSchema(device, allMeasurements);
+ Map<String, MeasurementSchema> schemaMap = deviceIterator.getAllMeasurementSchemas();
List<IMeasurementSchema> measurementSchemas = new ArrayList<>(schemaMap.values());
if (measurementSchemas.isEmpty()) {
return;
@@ -179,7 +166,7 @@
device,
existedMeasurements,
measurementSchemas,
- allMeasurements,
+ schemaMap.keySet(),
fragmentInstanceContext,
queryDataSource,
true);
@@ -200,17 +187,14 @@
AbstractCompactionWriter compactionWriter,
FragmentInstanceContext fragmentInstanceContext,
QueryDataSource queryDataSource)
- throws IOException, InterruptedException, IllegalPathException {
- MultiTsFileDeviceIterator.MeasurementIterator measurementIterator =
- deviceIterator.iterateNotAlignedSeries(device, false);
- Set<String> allMeasurements = measurementIterator.getAllMeasurements();
- int subTaskNums = Math.min(allMeasurements.size(), subTaskNum);
- Map<String, MeasurementSchema> schemaMap = getMeasurementSchema(device, allMeasurements);
+ throws IOException, InterruptedException {
+ Map<String, MeasurementSchema> measurementSchemaMap = deviceIterator.getAllMeasurementSchemas();
+ int subTaskNums = Math.min(measurementSchemaMap.size(), subTaskNum);
// assign all measurements to different sub tasks
Set<String>[] measurementsForEachSubTask = new HashSet[subTaskNums];
int idx = 0;
- for (String measurement : allMeasurements) {
+ for (String measurement : measurementSchemaMap.keySet()) {
if (measurementsForEachSubTask[idx % subTaskNums] == null) {
measurementsForEachSubTask[idx % subTaskNums] = new HashSet<>();
}
@@ -230,7 +214,7 @@
fragmentInstanceContext,
queryDataSource,
compactionWriter,
- schemaMap,
+ measurementSchemaMap,
i)));
}
@@ -247,76 +231,6 @@
compactionWriter.endChunkGroup();
}
- private Map<String, MeasurementSchema> getMeasurementSchema(
- String device, Set<String> measurements) throws IllegalPathException, IOException {
- HashMap<String, MeasurementSchema> schemaMap = new HashMap<>();
- List<TsFileResource> allResources = new LinkedList<>(seqFiles);
- allResources.addAll(unseqFiles);
- // sort the tsfile by version, so that we can iterate the tsfile from the newest to oldest
- allResources.sort(
- (o1, o2) -> {
- try {
- TsFileNameGenerator.TsFileName n1 =
- TsFileNameGenerator.getTsFileName(o1.getTsFile().getName());
- TsFileNameGenerator.TsFileName n2 =
- TsFileNameGenerator.getTsFileName(o2.getTsFile().getName());
- return (int) (n2.getVersion() - n1.getVersion());
- } catch (IOException e) {
- return 0;
- }
- });
- for (String measurement : measurements) {
- for (TsFileResource tsFileResource : allResources) {
- if (!tsFileResource.mayContainsDevice(device)) {
- continue;
- }
- MeasurementSchema schema =
- getMeasurementSchemaFromReader(
- tsFileResource,
- readerCacheMap.computeIfAbsent(
- tsFileResource,
- x -> {
- try {
- FileReaderManager.getInstance().increaseFileReaderReference(x, true);
- return FileReaderManager.getInstance().get(x.getTsFilePath(), true);
- } catch (IOException e) {
- throw new RuntimeException(
- String.format(
- "Failed to construct sequence reader for %s", tsFileResource));
- }
- }),
- device,
- measurement);
- if (schema != null) {
- schemaMap.put(measurement, schema);
- break;
- }
- }
- }
- return schemaMap;
- }
-
- private MeasurementSchema getMeasurementSchemaFromReader(
- TsFileResource resource, TsFileSequenceReader reader, String device, String measurement)
- throws IllegalPathException, IOException {
- List<ChunkMetadata> chunkMetadata =
- reader.getChunkMetadataList(new PartialPath(device, measurement), true);
- if (chunkMetadata.size() > 0) {
- chunkMetadata.get(0).setFilePath(resource.getTsFilePath());
- Chunk chunk = ChunkCache.getInstance().get(chunkMetadata.get(0));
- ChunkHeader header = chunk.getHeader();
- return new MeasurementSchema(
- measurement, header.getDataType(), header.getEncodingType(), header.getCompressionType());
- }
- return null;
- }
-
- private void clearReaderCache() throws IOException {
- for (TsFileResource resource : readerCacheMap.keySet()) {
- FileReaderManager.getInstance().decreaseFileReaderReference(resource, true);
- }
- }
-
private static void updateDeviceStartTimeAndEndTime(
List<TsFileResource> targetResources, AbstractCompactionWriter compactionWriter) {
List<TsFileIOWriter> targetFileWriters = compactionWriter.getFileIOWriter();
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
index 79f7b59..99e5cfd 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
@@ -948,6 +948,18 @@
}
}
+ public static int compareFileNameByDesc(TsFileResource o1, TsFileResource o2) {
+ try {
+ TsFileNameGenerator.TsFileName n1 =
+ TsFileNameGenerator.getTsFileName(o1.getTsFile().getName());
+ TsFileNameGenerator.TsFileName n2 =
+ TsFileNameGenerator.getTsFileName(o2.getTsFile().getName());
+ return (int) (n2.getVersion() - n1.getVersion());
+ } catch (IOException e) {
+ return 0;
+ }
+ }
+
public void setSeq(boolean seq) {
isSeq = seq;
}
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/compaction/AbstractCompactionTest.java b/server/src/test/java/org/apache/iotdb/db/engine/compaction/AbstractCompactionTest.java
index e4477b3..5e380fc 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/compaction/AbstractCompactionTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/compaction/AbstractCompactionTest.java
@@ -57,6 +57,7 @@
private int chunkGroupSize = 0;
private int pageSize = 0;
protected String COMPACTION_TEST_SG = TsFileGeneratorUtils.testStorageGroup;
+ private TSDataType dataType;
private static final long oldTargetChunkSize =
IoTDBDescriptor.getInstance().getConfig().getTargetChunkSize();
@@ -110,7 +111,7 @@
if (!UNSEQ_DIRS.exists()) {
Assert.assertTrue(UNSEQ_DIRS.mkdirs());
}
-
+ dataType = TSDataType.INT64;
EnvironmentUtils.envSetUp();
IoTDB.configManager.init();
}
@@ -195,23 +196,101 @@
}
}
+ /**
+ * @param fileNum the number of file
+ * @param deviceIndexes device index in each file
+ * @param measurementIndexes measurement index in each device of each file
+ * @param pointNum data point number of each timeseries in each file
+ * @param startTime start time of each timeseries
+ * @param timeInterval time interval of each timeseries between files
+ * @param isAlign when it is true, it will create mix tsfile which contains aligned and nonAligned
+ * timeseries
+ * @param isSeq
+ */
+ protected void createFilesWithTextValue(
+ int fileNum,
+ List<Integer> deviceIndexes,
+ List<Integer> measurementIndexes,
+ int pointNum,
+ int startTime,
+ int timeInterval,
+ boolean isAlign,
+ boolean isSeq)
+ throws IOException, WriteProcessException {
+
+ for (int i = 0; i < fileNum; i++) {
+ String fileName =
+ System.currentTimeMillis()
+ + FilePathUtils.FILE_NAME_SEPARATOR
+ + fileVersion++
+ + "-0-0.tsfile";
+ String filePath;
+ if (isSeq) {
+ filePath = SEQ_DIRS.getPath() + File.separator + fileName;
+ } else {
+ filePath = UNSEQ_DIRS.getPath() + File.separator + fileName;
+ }
+ File file;
+ if (isAlign) {
+ file =
+ TsFileGeneratorUtils.generateAlignedTsFileWithTextValues(
+ filePath,
+ deviceIndexes,
+ measurementIndexes,
+ pointNum,
+ startTime + pointNum * i + timeInterval * i,
+ chunkGroupSize,
+ pageSize);
+ } else {
+ file =
+ TsFileGeneratorUtils.generateNonAlignedTsFileWithTextValues(
+ filePath,
+ deviceIndexes,
+ measurementIndexes,
+ pointNum,
+ startTime + pointNum * i + timeInterval * i,
+ chunkGroupSize,
+ pageSize);
+ }
+ // add resource
+ TsFileResource resource = new TsFileResource(file);
+ int deviceStartindex = isAlign ? TsFileGeneratorUtils.getAlignDeviceOffset() : 0;
+ for (int j = 0; j < deviceIndexes.size(); j++) {
+ resource.updateStartTime(
+ COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + (deviceIndexes.get(j) + deviceStartindex),
+ startTime + pointNum * i + timeInterval * i);
+ resource.updateEndTime(
+ COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + (deviceIndexes.get(j) + deviceStartindex),
+ startTime + pointNum * i + timeInterval * i + pointNum - 1);
+ }
+ resource.updatePlanIndexes(fileVersion);
+ resource.setStatus(TsFileResourceStatus.CLOSED);
+ resource.serialize();
+ if (isSeq) {
+ seqResources.add(resource);
+ } else {
+ unseqResources.add(resource);
+ }
+ }
+ // sleep a few milliseconds to avoid generating files with same timestamps
+ try {
+ Thread.sleep(10);
+ } catch (Exception e) {
+
+ }
+ }
+
private void addResource(
File file, int deviceNum, long startTime, long endTime, boolean isAlign, boolean isSeq)
throws IOException {
TsFileResource resource = new TsFileResource(file);
- int deviceStartindex = 0;
- if (isAlign) {
- deviceStartindex = TsFileGeneratorUtils.getAlignDeviceOffset();
- for (int i = deviceStartindex; i < deviceStartindex + deviceNum; i++) {
- resource.updateStartTime(COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i, startTime);
- resource.updateEndTime(COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i, endTime);
- }
- } else {
- for (int i = deviceStartindex; i < deviceStartindex + deviceNum; i++) {
- resource.updateStartTime(COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i, startTime);
- resource.updateEndTime(COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i, endTime);
- }
+ int deviceStartindex = isAlign ? TsFileGeneratorUtils.getAlignDeviceOffset() : 0;
+
+ for (int i = deviceStartindex; i < deviceStartindex + deviceNum; i++) {
+ resource.updateStartTime(COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i, startTime);
+ resource.updateEndTime(COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i, endTime);
}
+
resource.updatePlanIndexes(fileVersion);
resource.setStatus(TsFileResourceStatus.CLOSED);
// resource.setTimeIndexType((byte) 0);
@@ -233,12 +312,12 @@
List<CompressionType> compressionTypes = new ArrayList<>();
for (int j = 0; j < measurementNum; j++) {
measurements.add("s" + j);
- dataTypes.add(TSDataType.INT64);
+ dataTypes.add(dataType);
encodings.add(TSEncoding.PLAIN);
compressionTypes.add(CompressionType.UNCOMPRESSED);
IoTDB.schemaProcessor.createTimeseries(
new PartialPath(COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i, "s" + j),
- TSDataType.INT64,
+ dataType,
TSEncoding.PLAIN,
CompressionType.UNCOMPRESSED,
Collections.emptyMap());
@@ -253,7 +332,7 @@
for (int j = 0; j < measurementNum; j++) {
IoTDB.schemaProcessor.createTimeseries(
new PartialPath(COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i, "s" + j),
- TSDataType.INT64,
+ dataType,
TSEncoding.PLAIN,
CompressionType.UNCOMPRESSED,
Collections.emptyMap());
@@ -262,6 +341,12 @@
}
}
+ protected void deleteTimeseriesInMManager(List<String> timeseries) throws MetadataException {
+ for (String path : timeseries) {
+ IoTDB.schemaProcessor.deleteTimeseries(new PartialPath(path));
+ }
+ }
+
public void tearDown() throws IOException, StorageEngineException {
new CompactionConfigRestorer().restoreCompactionConfig();
removeFiles();
@@ -305,4 +390,8 @@
resourceFile.delete();
}
}
+
+ protected void setDataType(TSDataType dataType) {
+ this.dataType = dataType;
+ }
}
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/compaction/ReadPointCompactionPerformerTest.java b/server/src/test/java/org/apache/iotdb/db/engine/compaction/ReadPointCompactionPerformerTest.java
index f0ee615..c5b65ed 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/compaction/ReadPointCompactionPerformerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/compaction/ReadPointCompactionPerformerTest.java
@@ -34,6 +34,7 @@
import org.apache.iotdb.db.metadata.path.MeasurementPath;
import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext;
import org.apache.iotdb.db.query.control.FileReaderManager;
+import org.apache.iotdb.db.query.reader.series.SeriesRawDataBatchReader;
import org.apache.iotdb.db.utils.EnvironmentUtils;
import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
@@ -43,8 +44,10 @@
import org.apache.iotdb.tsfile.file.header.ChunkHeader;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
+import org.apache.iotdb.tsfile.read.common.BatchData;
import org.apache.iotdb.tsfile.read.common.IBatchDataIterator;
import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.read.reader.IBatchReader;
import org.apache.iotdb.tsfile.utils.Pair;
import org.apache.iotdb.tsfile.utils.TsFileGeneratorUtils;
import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
@@ -2047,6 +2050,7 @@
seriesPaths.add(COMPACTION_TEST_SG + PATH_SEPARATOR + "d3" + PATH_SEPARATOR + "s4");
generateModsFile(seriesPaths, seqResources, Long.MIN_VALUE, Long.MAX_VALUE);
generateModsFile(seriesPaths, unseqResources, Long.MIN_VALUE, Long.MAX_VALUE);
+ deleteTimeseriesInMManager(seriesPaths);
for (int i = 0; i < 4; i++) {
for (int j = 0; j < 5; j++) {
@@ -2245,6 +2249,7 @@
}
generateModsFile(seriesPaths, seqResources, Long.MIN_VALUE, Long.MAX_VALUE);
generateModsFile(seriesPaths, unseqResources, Long.MIN_VALUE, Long.MAX_VALUE);
+ deleteTimeseriesInMManager(seriesPaths);
for (int i = 0; i < 4; i++) {
for (int j = 0; j < 5; j++) {
@@ -2435,6 +2440,7 @@
}
generateModsFile(seriesPaths, seqResources, Long.MIN_VALUE, Long.MAX_VALUE);
generateModsFile(seriesPaths, unseqResources, Long.MIN_VALUE, Long.MAX_VALUE);
+ deleteTimeseriesInMManager(seriesPaths);
for (int i = 0; i < 4; i++) {
for (int j = 0; j < 5; j++) {
@@ -3094,6 +3100,7 @@
+ "s4");
generateModsFile(seriesPaths, seqResources, Long.MIN_VALUE, Long.MAX_VALUE);
generateModsFile(seriesPaths, unseqResources, Long.MIN_VALUE, Long.MAX_VALUE);
+ deleteTimeseriesInMManager(seriesPaths);
for (int i = TsFileGeneratorUtils.getAlignDeviceOffset();
i < TsFileGeneratorUtils.getAlignDeviceOffset() + 4;
@@ -3332,6 +3339,7 @@
}
generateModsFile(seriesPaths, seqResources, Long.MIN_VALUE, Long.MAX_VALUE);
generateModsFile(seriesPaths, unseqResources, Long.MIN_VALUE, Long.MAX_VALUE);
+ deleteTimeseriesInMManager(seriesPaths);
for (int i = TsFileGeneratorUtils.getAlignDeviceOffset();
i < TsFileGeneratorUtils.getAlignDeviceOffset() + 4;
@@ -3481,6 +3489,1210 @@
}
/**
+ * Different source files have timeseries with the same path, but different data types. Because
+ * timeseries in the former file is been deleted.
+ */
+ @Test
+ public void testCrossSpaceCompactionWithSameTimeseriesInDifferentSourceFiles()
+ throws IOException, WriteProcessException, MetadataException, StorageEngineException,
+ InterruptedException {
+ TSFileDescriptor.getInstance().getConfig().setMaxNumberOfPointsInPage(30);
+ registerTimeseriesInMManger(4, 5, false);
+ createFiles(2, 2, 3, 300, 0, 0, 50, 50, false, true);
+ createFiles(2, 4, 5, 300, 700, 700, 50, 50, false, true);
+ createFiles(3, 3, 4, 200, 20, 10020, 30, 30, false, false);
+ createFiles(2, 1, 5, 100, 450, 20450, 0, 0, false, false);
+
+ // generate mods file
+ List<String> seriesPaths = new ArrayList<>();
+ for (int i = 0; i < 5; i++) {
+ seriesPaths.add(COMPACTION_TEST_SG + PATH_SEPARATOR + "d0" + PATH_SEPARATOR + "s" + i);
+ seriesPaths.add(COMPACTION_TEST_SG + PATH_SEPARATOR + "d1" + PATH_SEPARATOR + "s" + i);
+ seriesPaths.add(COMPACTION_TEST_SG + PATH_SEPARATOR + "d2" + PATH_SEPARATOR + "s" + i);
+ }
+ generateModsFile(seriesPaths, seqResources, Long.MIN_VALUE, Long.MAX_VALUE);
+ generateModsFile(seriesPaths, unseqResources, Long.MIN_VALUE, Long.MAX_VALUE);
+ deleteTimeseriesInMManager(seriesPaths);
+ setDataType(TSDataType.TEXT);
+ registerTimeseriesInMManger(2, 7, false);
+ List<Integer> deviceIndex = new ArrayList<>();
+ for (int i = 0; i < 2; i++) {
+ deviceIndex.add(i);
+ }
+ List<Integer> measurementIndex = new ArrayList<>();
+ for (int i = 0; i < 7; i++) {
+ measurementIndex.add(i);
+ }
+
+ createFilesWithTextValue(1, deviceIndex, measurementIndex, 300, 1350, 0, false, false);
+
+ List<TsFileResource> targetResources =
+ CompactionFileGeneratorUtils.getCrossCompactionTargetTsFileResources(seqResources);
+ ICompactionPerformer performer =
+ new ReadPointCompactionPerformer(seqResources, unseqResources, targetResources);
+ performer.setSummary(new CompactionTaskSummary());
+ performer.perform();
+ CompactionUtils.moveTargetFile(targetResources, false, COMPACTION_TEST_SG);
+ Assert.assertEquals(2, targetResources.size());
+
+ List<String> deviceIdList = new ArrayList<>();
+ deviceIdList.add(COMPACTION_TEST_SG + PATH_SEPARATOR + "d0");
+ deviceIdList.add(COMPACTION_TEST_SG + PATH_SEPARATOR + "d1");
+ deviceIdList.add(COMPACTION_TEST_SG + PATH_SEPARATOR + "d2");
+ deviceIdList.add(COMPACTION_TEST_SG + PATH_SEPARATOR + "d3");
+ for (int i = 0; i < 2; i++) {
+ if (i == 0) {
+ Assert.assertFalse(
+ targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d0"));
+ Assert.assertFalse(
+ targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d1"));
+ Assert.assertFalse(
+ targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d2"));
+ Assert.assertTrue(
+ targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d3"));
+ } else {
+ Assert.assertTrue(
+ targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d0"));
+ Assert.assertTrue(
+ targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d1"));
+ Assert.assertFalse(
+ targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d2"));
+ Assert.assertTrue(
+ targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d3"));
+ }
+ check(targetResources.get(i), deviceIdList);
+ }
+
+ Map<String, Long> measurementMaxTime = new HashMap<>();
+
+ for (int i = 0; i < 4; i++) {
+ TSDataType tsDataType = i < 2 ? TSDataType.TEXT : TSDataType.INT64;
+ for (int j = 0; j < 7; j++) {
+ measurementMaxTime.putIfAbsent(
+ COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i + PATH_SEPARATOR + "s" + j,
+ Long.MIN_VALUE);
+ PartialPath path =
+ new MeasurementPath(
+ COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i,
+ "s" + j,
+ new MeasurementSchema("s" + j, tsDataType));
+ IDataBlockReader tsBlockReader =
+ new SeriesDataBlockReader(
+ path,
+ tsDataType,
+ FragmentInstanceContext.createFragmentInstanceContextForCompaction(
+ EnvironmentUtils.TEST_QUERY_CONTEXT.getQueryId()),
+ targetResources,
+ new ArrayList<>(),
+ true);
+ int count = 0;
+ while (tsBlockReader.hasNextBatch()) {
+ TsBlock block = tsBlockReader.nextBatch();
+ IBatchDataIterator iterator = block.getTsBlockSingleColumnIterator();
+ while (iterator.hasNext()) {
+ if (measurementMaxTime.get(
+ COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i + PATH_SEPARATOR + "s" + j)
+ >= iterator.currentTime()) {
+ Assert.fail();
+ }
+ measurementMaxTime.put(
+ COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i + PATH_SEPARATOR + "s" + j,
+ iterator.currentTime());
+ count++;
+ iterator.next();
+ }
+ }
+ tsBlockReader.close();
+ if (i < 2 && j < 7) {
+ assertEquals(300, count);
+ } else if (i == 3 && j < 5) {
+ assertEquals(600, count);
+ } else {
+ assertEquals(0, count);
+ }
+ }
+ }
+ }
+
+ /** Each source file has different device. */
+ @Test
+ public void testCrossSpaceCompactionWithDifferentDevicesInDifferentSourceFiles()
+ throws IOException, WriteProcessException, MetadataException, StorageEngineException,
+ InterruptedException {
+ TSFileDescriptor.getInstance().getConfig().setMaxNumberOfPointsInPage(30);
+ registerTimeseriesInMManger(5, 7, false);
+ List<Integer> deviceIndex = new ArrayList<>();
+ List<Integer> measurementIndex = new ArrayList<>();
+ for (int i = 0; i < 7; i++) {
+ measurementIndex.add(i);
+ }
+
+ deviceIndex.add(0);
+ deviceIndex.add(2);
+ createFilesWithTextValue(1, deviceIndex, measurementIndex, 300, 0, 0, false, true);
+
+ deviceIndex.clear();
+ deviceIndex.add(1);
+ deviceIndex.add(3);
+ createFilesWithTextValue(1, deviceIndex, measurementIndex, 300, 400, 0, false, true);
+
+ deviceIndex.clear();
+ deviceIndex.add(2);
+ deviceIndex.add(4);
+ deviceIndex.add(0);
+ createFilesWithTextValue(1, deviceIndex, measurementIndex, 300, 800, 0, false, true);
+
+ deviceIndex.clear();
+ deviceIndex.add(1);
+ deviceIndex.add(4);
+ createFilesWithTextValue(1, deviceIndex, measurementIndex, 200, 100, 0, false, false);
+
+ deviceIndex.clear();
+ deviceIndex.add(1);
+ deviceIndex.add(3);
+ createFilesWithTextValue(1, deviceIndex, measurementIndex, 400, 600, 0, false, false);
+
+ List<TsFileResource> targetResources =
+ CompactionFileGeneratorUtils.getCrossCompactionTargetTsFileResources(seqResources);
+ ICompactionPerformer performer =
+ new ReadPointCompactionPerformer(seqResources, unseqResources, targetResources);
+ performer.setSummary(new CompactionTaskSummary());
+ performer.perform();
+ CompactionUtils.moveTargetFile(targetResources, false, COMPACTION_TEST_SG);
+
+ List<String> deviceIdList = new ArrayList<>();
+ deviceIdList.add(COMPACTION_TEST_SG + PATH_SEPARATOR + "d0");
+ deviceIdList.add(COMPACTION_TEST_SG + PATH_SEPARATOR + "d1");
+ deviceIdList.add(COMPACTION_TEST_SG + PATH_SEPARATOR + "d2");
+ deviceIdList.add(COMPACTION_TEST_SG + PATH_SEPARATOR + "d3");
+ deviceIdList.add(COMPACTION_TEST_SG + PATH_SEPARATOR + "d4");
+ for (int i = 0; i < 3; i++) {
+ if (i == 0) {
+ Assert.assertTrue(
+ targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d0"));
+ Assert.assertFalse(
+ targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d1"));
+ Assert.assertTrue(
+ targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d2"));
+ Assert.assertFalse(
+ targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d3"));
+ Assert.assertFalse(
+ targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d4"));
+ } else if (i == 1) {
+ Assert.assertFalse(
+ targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d0"));
+ Assert.assertTrue(
+ targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d1"));
+ Assert.assertFalse(
+ targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d2"));
+ Assert.assertTrue(
+ targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d3"));
+ Assert.assertFalse(
+ targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d4"));
+ } else {
+ Assert.assertTrue(
+ targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d0"));
+ Assert.assertTrue(
+ targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d1"));
+ Assert.assertTrue(
+ targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d2"));
+ Assert.assertTrue(
+ targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d3"));
+ Assert.assertTrue(
+ targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d4"));
+ }
+ check(targetResources.get(i), deviceIdList);
+ }
+
+ Map<String, Long> measurementMaxTime = new HashMap<>();
+
+ for (int i = 0; i < 5; i++) {
+ for (int j = 0; j < 5; j++) {
+ measurementMaxTime.putIfAbsent(
+ COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i + PATH_SEPARATOR + "s" + j,
+ Long.MIN_VALUE);
+ PartialPath path =
+ new MeasurementPath(
+ COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i,
+ "s" + j,
+ new MeasurementSchema("s" + j, TSDataType.TEXT));
+ IDataBlockReader tsBlockReader =
+ new SeriesDataBlockReader(
+ path,
+ TSDataType.TEXT,
+ FragmentInstanceContext.createFragmentInstanceContextForCompaction(
+ EnvironmentUtils.TEST_QUERY_CONTEXT.getQueryId()),
+ targetResources,
+ new ArrayList<>(),
+ true);
+ int count = 0;
+ while (tsBlockReader.hasNextBatch()) {
+ TsBlock block = tsBlockReader.nextBatch();
+ IBatchDataIterator iterator = block.getTsBlockSingleColumnIterator();
+ while (iterator.hasNext()) {
+ if (measurementMaxTime.get(
+ COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i + PATH_SEPARATOR + "s" + j)
+ >= iterator.currentTime()) {
+ Assert.fail();
+ }
+ measurementMaxTime.put(
+ COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i + PATH_SEPARATOR + "s" + j,
+ iterator.currentTime());
+ count++;
+ iterator.next();
+ }
+ }
+ tsBlockReader.close();
+ if (i == 0 || i == 2 || i == 3) {
+ assertEquals(600, count);
+ } else if (i == 1) {
+ assertEquals(800, count);
+ } else {
+ assertEquals(500, count);
+ }
+ }
+ }
+ }
+
+ /** Each source file has same device with different measurements. */
+ @Test
+ public void testCrossSpaceCompactionWithDifferentMeasurementsInDifferentSourceFiles()
+ throws IOException, WriteProcessException, MetadataException, StorageEngineException,
+ InterruptedException {
+ TSFileDescriptor.getInstance().getConfig().setMaxNumberOfPointsInPage(30);
+ registerTimeseriesInMManger(5, 5, false);
+ List<Integer> deviceIndex = new ArrayList<>();
+ List<Integer> measurementIndex = new ArrayList<>();
+ for (int i = 0; i < 5; i++) {
+ deviceIndex.add(i);
+ }
+
+ measurementIndex.add(0);
+ measurementIndex.add(2);
+ createFilesWithTextValue(1, deviceIndex, measurementIndex, 300, 0, 0, false, true);
+
+ measurementIndex.clear();
+ measurementIndex.add(1);
+ measurementIndex.add(3);
+ createFilesWithTextValue(1, deviceIndex, measurementIndex, 300, 400, 0, false, true);
+
+ measurementIndex.clear();
+ measurementIndex.add(2);
+ measurementIndex.add(4);
+ measurementIndex.add(0);
+ createFilesWithTextValue(1, deviceIndex, measurementIndex, 300, 800, 0, false, true);
+
+ measurementIndex.clear();
+ measurementIndex.add(1);
+ measurementIndex.add(4);
+ createFilesWithTextValue(1, deviceIndex, measurementIndex, 200, 100, 0, false, false);
+
+ measurementIndex.clear();
+ measurementIndex.add(0);
+ measurementIndex.add(2);
+ createFilesWithTextValue(1, deviceIndex, measurementIndex, 200, 400, 0, false, false);
+
+ List<TsFileResource> targetResources =
+ CompactionFileGeneratorUtils.getCrossCompactionTargetTsFileResources(seqResources);
+ ICompactionPerformer performer =
+ new ReadPointCompactionPerformer(seqResources, unseqResources, targetResources);
+ performer.setSummary(new CompactionTaskSummary());
+ performer.perform();
+ CompactionUtils.moveTargetFile(targetResources, false, COMPACTION_TEST_SG);
+
+ List<String> deviceIdList = new ArrayList<>();
+ deviceIdList.add(COMPACTION_TEST_SG + PATH_SEPARATOR + "d0");
+ deviceIdList.add(COMPACTION_TEST_SG + PATH_SEPARATOR + "d1");
+ deviceIdList.add(COMPACTION_TEST_SG + PATH_SEPARATOR + "d2");
+ deviceIdList.add(COMPACTION_TEST_SG + PATH_SEPARATOR + "d3");
+ deviceIdList.add(COMPACTION_TEST_SG + PATH_SEPARATOR + "d4");
+ for (int i = 0; i < 3; i++) {
+ Assert.assertTrue(
+ targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d0"));
+ Assert.assertTrue(
+ targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d1"));
+ Assert.assertTrue(
+ targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d2"));
+ Assert.assertTrue(
+ targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d3"));
+ Assert.assertTrue(
+ targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d4"));
+ check(targetResources.get(i), deviceIdList);
+ }
+
+ Map<String, Long> measurementMaxTime = new HashMap<>();
+
+ for (int i = 0; i < 5; i++) {
+ for (int j = 0; j < 5; j++) {
+ measurementMaxTime.putIfAbsent(
+ COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i + PATH_SEPARATOR + "s" + j,
+ Long.MIN_VALUE);
+ PartialPath path =
+ new MeasurementPath(
+ COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i,
+ "s" + j,
+ new MeasurementSchema("s" + j, TSDataType.TEXT));
+ IDataBlockReader tsBlockReader =
+ new SeriesDataBlockReader(
+ path,
+ TSDataType.TEXT,
+ FragmentInstanceContext.createFragmentInstanceContextForCompaction(
+ EnvironmentUtils.TEST_QUERY_CONTEXT.getQueryId()),
+ targetResources,
+ new ArrayList<>(),
+ true);
+ int count = 0;
+ while (tsBlockReader.hasNextBatch()) {
+ TsBlock block = tsBlockReader.nextBatch();
+ IBatchDataIterator iterator = block.getTsBlockSingleColumnIterator();
+ while (iterator.hasNext()) {
+ if (measurementMaxTime.get(
+ COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i + PATH_SEPARATOR + "s" + j)
+ >= iterator.currentTime()) {
+ Assert.fail();
+ }
+ measurementMaxTime.put(
+ COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i + PATH_SEPARATOR + "s" + j,
+ iterator.currentTime());
+ count++;
+ iterator.next();
+ }
+ }
+ tsBlockReader.close();
+ if (j == 0 || j == 2) {
+ assertEquals(800, count);
+ } else if (j == 1 || j == 4) {
+ assertEquals(500, count);
+ } else {
+ assertEquals(300, count);
+ }
+ }
+ }
+ }
+
+ /** Each source file has different devices and different measurements. */
+ @Test
+ public void testCrossSpaceCompactionWithDifferentDevicesAndMeasurementsInDifferentSourceFiles()
+ throws IOException, WriteProcessException, MetadataException, StorageEngineException,
+ InterruptedException {
+ TSFileDescriptor.getInstance().getConfig().setMaxNumberOfPointsInPage(30);
+ registerTimeseriesInMManger(4, 5, false);
+ List<Integer> deviceIndex = new ArrayList<>();
+ List<Integer> measurementIndex = new ArrayList<>();
+ deviceIndex.add(0);
+ deviceIndex.add(1);
+
+ measurementIndex.add(0);
+ measurementIndex.add(2);
+ createFilesWithTextValue(1, deviceIndex, measurementIndex, 300, 0, 0, false, true);
+
+ measurementIndex.clear();
+ measurementIndex.add(1);
+ measurementIndex.add(3);
+ createFilesWithTextValue(1, deviceIndex, measurementIndex, 300, 400, 0, false, true);
+
+ deviceIndex.add(2);
+ deviceIndex.add(3);
+ measurementIndex.clear();
+ measurementIndex.add(2);
+ measurementIndex.add(4);
+ measurementIndex.add(0);
+ createFilesWithTextValue(1, deviceIndex, measurementIndex, 300, 800, 0, false, true);
+ deviceIndex.remove(2);
+ deviceIndex.remove(2);
+
+ measurementIndex.clear();
+ measurementIndex.add(1);
+ measurementIndex.add(4);
+ createFilesWithTextValue(1, deviceIndex, measurementIndex, 200, 100, 0, false, false);
+
+ measurementIndex.clear();
+ measurementIndex.add(0);
+ measurementIndex.add(2);
+ createFilesWithTextValue(1, deviceIndex, measurementIndex, 300, 600, 0, false, false);
+
+ List<TsFileResource> targetResources =
+ CompactionFileGeneratorUtils.getCrossCompactionTargetTsFileResources(seqResources);
+ ICompactionPerformer performer =
+ new ReadPointCompactionPerformer(seqResources, unseqResources, targetResources);
+ performer.setSummary(new CompactionTaskSummary());
+ performer.perform();
+ CompactionUtils.moveTargetFile(targetResources, false, COMPACTION_TEST_SG);
+
+ List<String> deviceIdList = new ArrayList<>();
+ deviceIdList.add(COMPACTION_TEST_SG + PATH_SEPARATOR + "d0");
+ deviceIdList.add(COMPACTION_TEST_SG + PATH_SEPARATOR + "d1");
+ deviceIdList.add(COMPACTION_TEST_SG + PATH_SEPARATOR + "d2");
+ deviceIdList.add(COMPACTION_TEST_SG + PATH_SEPARATOR + "d3");
+ for (int i = 0; i < 3; i++) {
+ if (i < 2) {
+ Assert.assertTrue(
+ targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d0"));
+ Assert.assertTrue(
+ targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d1"));
+ Assert.assertFalse(
+ targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d2"));
+ Assert.assertFalse(
+ targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d3"));
+ } else {
+ Assert.assertTrue(
+ targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d0"));
+ Assert.assertTrue(
+ targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d1"));
+ Assert.assertTrue(
+ targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d2"));
+ Assert.assertTrue(
+ targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d3"));
+ }
+ check(targetResources.get(i), deviceIdList);
+ }
+
+ Map<String, Long> measurementMaxTime = new HashMap<>();
+
+ for (int i = 0; i < 4; i++) {
+ for (int j = 0; j < 5; j++) {
+ measurementMaxTime.putIfAbsent(
+ COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i + PATH_SEPARATOR + "s" + j,
+ Long.MIN_VALUE);
+ PartialPath path =
+ new MeasurementPath(
+ COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i,
+ "s" + j,
+ new MeasurementSchema("s" + j, TSDataType.TEXT));
+ IBatchReader tsFilesReader =
+ new SeriesRawDataBatchReader(
+ path,
+ TSDataType.VECTOR,
+ EnvironmentUtils.TEST_QUERY_CONTEXT,
+ targetResources,
+ new ArrayList<>(),
+ null,
+ null,
+ true);
+ int count = 0;
+ while (tsFilesReader.hasNextBatch()) {
+ BatchData batchData = tsFilesReader.nextBatch();
+ while (batchData.hasCurrent()) {
+ if (measurementMaxTime.get(
+ COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i + PATH_SEPARATOR + "s" + j)
+ >= batchData.currentTime()) {
+ Assert.fail();
+ }
+ measurementMaxTime.put(
+ COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i + PATH_SEPARATOR + "s" + j,
+ batchData.currentTime());
+ count++;
+ batchData.next();
+ }
+ }
+ tsFilesReader.close();
+ if (i < 2) {
+ if (j == 0 || j == 2) {
+ assertEquals(800, count);
+ } else if (j == 1 || j == 4) {
+ assertEquals(500, count);
+ } else {
+ assertEquals(300, count);
+ }
+ } else {
+ if (j == 0 || j == 2 || j == 4) {
+ assertEquals(300, count);
+ } else {
+ assertEquals(0, count);
+ }
+ }
+ }
+ }
+ }
+
+ /**
+ * Different source files have timeseries with the same path, but different data types. Because
+ * timeseries in the former file is been deleted.
+ */
+ @Test
+ public void testAlignedCrossSpaceCompactionWithSameTimeseriesInDifferentSourceFiles()
+ throws IOException, WriteProcessException, MetadataException, StorageEngineException,
+ InterruptedException {
+ TSFileDescriptor.getInstance().getConfig().setMaxNumberOfPointsInPage(30);
+ registerTimeseriesInMManger(4, 5, true);
+ createFiles(2, 2, 3, 300, 0, 0, 50, 50, true, true);
+ createFiles(2, 4, 5, 300, 700, 700, 50, 50, true, true);
+ createFiles(3, 3, 4, 200, 20, 10020, 30, 30, true, false);
+ createFiles(2, 1, 5, 100, 450, 20450, 0, 0, true, false);
+
+ // generate mods file
+ List<String> seriesPaths = new ArrayList<>();
+ for (int i = 0; i < 5; i++) {
+ seriesPaths.add(
+ COMPACTION_TEST_SG
+ + PATH_SEPARATOR
+ + "d"
+ + TsFileGeneratorUtils.getAlignDeviceOffset()
+ + PATH_SEPARATOR
+ + "s"
+ + i);
+ seriesPaths.add(
+ COMPACTION_TEST_SG
+ + PATH_SEPARATOR
+ + "d"
+ + (TsFileGeneratorUtils.getAlignDeviceOffset() + 1)
+ + PATH_SEPARATOR
+ + "s"
+ + i);
+ seriesPaths.add(
+ COMPACTION_TEST_SG
+ + PATH_SEPARATOR
+ + "d"
+ + (TsFileGeneratorUtils.getAlignDeviceOffset() + 2)
+ + PATH_SEPARATOR
+ + "s"
+ + i);
+ }
+ generateModsFile(seriesPaths, seqResources, Long.MIN_VALUE, Long.MAX_VALUE);
+ generateModsFile(seriesPaths, unseqResources, Long.MIN_VALUE, Long.MAX_VALUE);
+ deleteTimeseriesInMManager(seriesPaths);
+ setDataType(TSDataType.TEXT);
+ registerTimeseriesInMManger(2, 7, true);
+ List<Integer> deviceIndex = new ArrayList<>();
+ for (int i = 0; i < 2; i++) {
+ deviceIndex.add(i);
+ }
+ List<Integer> measurementIndex = new ArrayList<>();
+ for (int i = 0; i < 7; i++) {
+ measurementIndex.add(i);
+ }
+
+ createFilesWithTextValue(1, deviceIndex, measurementIndex, 300, 1350, 0, true, false);
+
+ List<TsFileResource> targetResources =
+ CompactionFileGeneratorUtils.getCrossCompactionTargetTsFileResources(seqResources);
+ ICompactionPerformer performer =
+ new ReadPointCompactionPerformer(seqResources, unseqResources, targetResources);
+ performer.setSummary(new CompactionTaskSummary());
+ performer.perform();
+ CompactionUtils.moveTargetFile(targetResources, false, COMPACTION_TEST_SG);
+
+ Assert.assertEquals(2, targetResources.size());
+
+ List<String> deviceIdList = new ArrayList<>();
+ deviceIdList.add(
+ COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + (TsFileGeneratorUtils.getAlignDeviceOffset()));
+ deviceIdList.add(
+ COMPACTION_TEST_SG
+ + PATH_SEPARATOR
+ + "d"
+ + (TsFileGeneratorUtils.getAlignDeviceOffset() + 1));
+ deviceIdList.add(
+ COMPACTION_TEST_SG
+ + PATH_SEPARATOR
+ + "d"
+ + (TsFileGeneratorUtils.getAlignDeviceOffset() + 2));
+ deviceIdList.add(
+ COMPACTION_TEST_SG
+ + PATH_SEPARATOR
+ + "d"
+ + (TsFileGeneratorUtils.getAlignDeviceOffset() + 3));
+ for (int i = 0; i < 2; i++) {
+ if (i == 0) {
+ Assert.assertFalse(
+ targetResources
+ .get(i)
+ .isDeviceIdExist(
+ COMPACTION_TEST_SG
+ + PATH_SEPARATOR
+ + "d"
+ + (TsFileGeneratorUtils.getAlignDeviceOffset())));
+ Assert.assertFalse(
+ targetResources
+ .get(i)
+ .isDeviceIdExist(
+ COMPACTION_TEST_SG
+ + PATH_SEPARATOR
+ + "d"
+ + (TsFileGeneratorUtils.getAlignDeviceOffset() + 1)));
+ Assert.assertFalse(
+ targetResources
+ .get(i)
+ .isDeviceIdExist(
+ COMPACTION_TEST_SG
+ + PATH_SEPARATOR
+ + "d"
+ + (TsFileGeneratorUtils.getAlignDeviceOffset() + 2)));
+ Assert.assertTrue(
+ targetResources
+ .get(i)
+ .isDeviceIdExist(
+ COMPACTION_TEST_SG
+ + PATH_SEPARATOR
+ + "d"
+ + (TsFileGeneratorUtils.getAlignDeviceOffset() + 3)));
+ } else {
+ Assert.assertTrue(
+ targetResources
+ .get(i)
+ .isDeviceIdExist(
+ COMPACTION_TEST_SG
+ + PATH_SEPARATOR
+ + "d"
+ + (TsFileGeneratorUtils.getAlignDeviceOffset())));
+ Assert.assertTrue(
+ targetResources
+ .get(i)
+ .isDeviceIdExist(
+ COMPACTION_TEST_SG
+ + PATH_SEPARATOR
+ + "d"
+ + (TsFileGeneratorUtils.getAlignDeviceOffset() + 1)));
+ Assert.assertFalse(
+ targetResources
+ .get(i)
+ .isDeviceIdExist(
+ COMPACTION_TEST_SG
+ + PATH_SEPARATOR
+ + "d"
+ + (TsFileGeneratorUtils.getAlignDeviceOffset() + 2)));
+ Assert.assertTrue(
+ targetResources
+ .get(i)
+ .isDeviceIdExist(
+ COMPACTION_TEST_SG
+ + PATH_SEPARATOR
+ + "d"
+ + (TsFileGeneratorUtils.getAlignDeviceOffset() + 3)));
+ }
+ check(targetResources.get(i), deviceIdList);
+ }
+
+ Map<String, Long> measurementMaxTime = new HashMap<>();
+
+ for (int i = 0; i < 4; i++) {
+ TSDataType tsDataType = i < 2 ? TSDataType.TEXT : TSDataType.INT64;
+ for (int j = 0; j < 7; j++) {
+ measurementMaxTime.putIfAbsent(
+ COMPACTION_TEST_SG
+ + PATH_SEPARATOR
+ + "d"
+ + (TsFileGeneratorUtils.getAlignDeviceOffset() + i)
+ + PATH_SEPARATOR
+ + "s"
+ + j,
+ Long.MIN_VALUE);
+ List<IMeasurementSchema> schemas = new ArrayList<>();
+ schemas.add(new MeasurementSchema("s" + j, tsDataType));
+ AlignedPath path =
+ new AlignedPath(
+ COMPACTION_TEST_SG
+ + PATH_SEPARATOR
+ + "d"
+ + (TsFileGeneratorUtils.getAlignDeviceOffset() + i),
+ Collections.singletonList("s" + j),
+ schemas);
+ IDataBlockReader tsBlockReader =
+ new SeriesDataBlockReader(
+ path,
+ tsDataType,
+ FragmentInstanceContext.createFragmentInstanceContextForCompaction(
+ EnvironmentUtils.TEST_QUERY_CONTEXT.getQueryId()),
+ targetResources,
+ new ArrayList<>(),
+ true);
+ int count = 0;
+ while (tsBlockReader.hasNextBatch()) {
+ TsBlock block = tsBlockReader.nextBatch();
+ IBatchDataIterator iterator = block.getTsBlockSingleColumnIterator();
+ while (iterator.hasNext()) {
+ if (measurementMaxTime.get(
+ COMPACTION_TEST_SG
+ + PATH_SEPARATOR
+ + "d"
+ + (TsFileGeneratorUtils.getAlignDeviceOffset() + i)
+ + PATH_SEPARATOR
+ + "s"
+ + j)
+ >= iterator.currentTime()) {
+ Assert.fail();
+ }
+ measurementMaxTime.put(
+ COMPACTION_TEST_SG
+ + PATH_SEPARATOR
+ + "d"
+ + (TsFileGeneratorUtils.getAlignDeviceOffset() + i)
+ + PATH_SEPARATOR
+ + "s"
+ + j,
+ iterator.currentTime());
+ count++;
+ iterator.next();
+ }
+ }
+ tsBlockReader.close();
+ if (i < 2 && j < 7) {
+ assertEquals(300, count);
+ } else if (i == 3 && j < 5) {
+ assertEquals(600, count);
+ } else {
+ assertEquals(0, count);
+ }
+ }
+ }
+ }
+
+ /** Each source file has different device. */
+ @Test
+ public void testAlignedCrossSpaceCompactionWithDifferentDevicesInDifferentSourceFiles()
+ throws IOException, WriteProcessException, MetadataException, StorageEngineException,
+ InterruptedException {
+ TSFileDescriptor.getInstance().getConfig().setMaxNumberOfPointsInPage(30);
+ registerTimeseriesInMManger(5, 7, true);
+ List<Integer> deviceIndex = new ArrayList<>();
+ List<Integer> measurementIndex = new ArrayList<>();
+ for (int i = 0; i < 7; i++) {
+ measurementIndex.add(i);
+ }
+
+ deviceIndex.add(0);
+ deviceIndex.add(2);
+ createFilesWithTextValue(1, deviceIndex, measurementIndex, 300, 0, 0, true, true);
+
+ deviceIndex.clear();
+ deviceIndex.add(1);
+ deviceIndex.add(3);
+ createFilesWithTextValue(1, deviceIndex, measurementIndex, 300, 400, 0, true, true);
+
+ deviceIndex.clear();
+ deviceIndex.add(2);
+ deviceIndex.add(4);
+ deviceIndex.add(0);
+ createFilesWithTextValue(1, deviceIndex, measurementIndex, 300, 800, 0, true, true);
+
+ deviceIndex.clear();
+ deviceIndex.add(1);
+ deviceIndex.add(4);
+ createFilesWithTextValue(1, deviceIndex, measurementIndex, 200, 100, 0, true, false);
+
+ deviceIndex.clear();
+ deviceIndex.add(1);
+ deviceIndex.add(3);
+ createFilesWithTextValue(1, deviceIndex, measurementIndex, 400, 600, 0, true, false);
+
+ List<TsFileResource> targetResources =
+ CompactionFileGeneratorUtils.getCrossCompactionTargetTsFileResources(seqResources);
+ ICompactionPerformer performer =
+ new ReadPointCompactionPerformer(seqResources, unseqResources, targetResources);
+ performer.setSummary(new CompactionTaskSummary());
+ performer.perform();
+ CompactionUtils.moveTargetFile(targetResources, false, COMPACTION_TEST_SG);
+
+ List<String> deviceIdList = new ArrayList<>();
+ deviceIdList.add(
+ COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + TsFileGeneratorUtils.getAlignDeviceOffset());
+ deviceIdList.add(
+ COMPACTION_TEST_SG
+ + PATH_SEPARATOR
+ + "d"
+ + (TsFileGeneratorUtils.getAlignDeviceOffset() + 1));
+ deviceIdList.add(
+ COMPACTION_TEST_SG
+ + PATH_SEPARATOR
+ + "d"
+ + (TsFileGeneratorUtils.getAlignDeviceOffset() + 2));
+ deviceIdList.add(
+ COMPACTION_TEST_SG
+ + PATH_SEPARATOR
+ + "d"
+ + (TsFileGeneratorUtils.getAlignDeviceOffset() + 3));
+ deviceIdList.add(
+ COMPACTION_TEST_SG
+ + PATH_SEPARATOR
+ + "d"
+ + (TsFileGeneratorUtils.getAlignDeviceOffset() + 4));
+ for (int i = 0; i < 3; i++) {
+ if (i == 0) {
+ Assert.assertTrue(
+ targetResources
+ .get(i)
+ .isDeviceIdExist(
+ COMPACTION_TEST_SG
+ + PATH_SEPARATOR
+ + "d"
+ + (TsFileGeneratorUtils.getAlignDeviceOffset())));
+ Assert.assertFalse(
+ targetResources
+ .get(i)
+ .isDeviceIdExist(
+ COMPACTION_TEST_SG
+ + PATH_SEPARATOR
+ + "d"
+ + (TsFileGeneratorUtils.getAlignDeviceOffset() + 1)));
+ Assert.assertTrue(
+ targetResources
+ .get(i)
+ .isDeviceIdExist(
+ COMPACTION_TEST_SG
+ + PATH_SEPARATOR
+ + "d"
+ + (TsFileGeneratorUtils.getAlignDeviceOffset() + 2)));
+ Assert.assertFalse(
+ targetResources
+ .get(i)
+ .isDeviceIdExist(
+ COMPACTION_TEST_SG
+ + PATH_SEPARATOR
+ + "d"
+ + (TsFileGeneratorUtils.getAlignDeviceOffset() + 3)));
+ Assert.assertFalse(
+ targetResources
+ .get(i)
+ .isDeviceIdExist(
+ COMPACTION_TEST_SG
+ + PATH_SEPARATOR
+ + "d"
+ + (TsFileGeneratorUtils.getAlignDeviceOffset() + 4)));
+ } else if (i == 1) {
+ Assert.assertFalse(
+ targetResources
+ .get(i)
+ .isDeviceIdExist(
+ COMPACTION_TEST_SG
+ + PATH_SEPARATOR
+ + "d"
+ + (TsFileGeneratorUtils.getAlignDeviceOffset())));
+ Assert.assertTrue(
+ targetResources
+ .get(i)
+ .isDeviceIdExist(
+ COMPACTION_TEST_SG
+ + PATH_SEPARATOR
+ + "d"
+ + (TsFileGeneratorUtils.getAlignDeviceOffset() + 1)));
+ Assert.assertFalse(
+ targetResources
+ .get(i)
+ .isDeviceIdExist(
+ COMPACTION_TEST_SG
+ + PATH_SEPARATOR
+ + "d"
+ + (TsFileGeneratorUtils.getAlignDeviceOffset() + 2)));
+ Assert.assertTrue(
+ targetResources
+ .get(i)
+ .isDeviceIdExist(
+ COMPACTION_TEST_SG
+ + PATH_SEPARATOR
+ + "d"
+ + (TsFileGeneratorUtils.getAlignDeviceOffset() + 3)));
+ Assert.assertFalse(
+ targetResources
+ .get(i)
+ .isDeviceIdExist(
+ COMPACTION_TEST_SG
+ + PATH_SEPARATOR
+ + "d"
+ + (TsFileGeneratorUtils.getAlignDeviceOffset() + 4)));
+ } else {
+ Assert.assertTrue(
+ targetResources
+ .get(i)
+ .isDeviceIdExist(
+ COMPACTION_TEST_SG
+ + PATH_SEPARATOR
+ + "d"
+ + (TsFileGeneratorUtils.getAlignDeviceOffset())));
+ Assert.assertTrue(
+ targetResources
+ .get(i)
+ .isDeviceIdExist(
+ COMPACTION_TEST_SG
+ + PATH_SEPARATOR
+ + "d"
+ + (TsFileGeneratorUtils.getAlignDeviceOffset() + 1)));
+ Assert.assertTrue(
+ targetResources
+ .get(i)
+ .isDeviceIdExist(
+ COMPACTION_TEST_SG
+ + PATH_SEPARATOR
+ + "d"
+ + (TsFileGeneratorUtils.getAlignDeviceOffset() + 2)));
+ Assert.assertTrue(
+ targetResources
+ .get(i)
+ .isDeviceIdExist(
+ COMPACTION_TEST_SG
+ + PATH_SEPARATOR
+ + "d"
+ + (TsFileGeneratorUtils.getAlignDeviceOffset() + 3)));
+ Assert.assertTrue(
+ targetResources
+ .get(i)
+ .isDeviceIdExist(
+ COMPACTION_TEST_SG
+ + PATH_SEPARATOR
+ + "d"
+ + (TsFileGeneratorUtils.getAlignDeviceOffset() + 4)));
+ }
+ check(targetResources.get(i), deviceIdList);
+ }
+
+ Map<String, Long> measurementMaxTime = new HashMap<>();
+
+ for (int i = 0; i < 5; i++) {
+ for (int j = 0; j < 5; j++) {
+ measurementMaxTime.putIfAbsent(
+ COMPACTION_TEST_SG
+ + PATH_SEPARATOR
+ + "d"
+ + (TsFileGeneratorUtils.getAlignDeviceOffset() + i)
+ + PATH_SEPARATOR
+ + "s"
+ + j,
+ Long.MIN_VALUE);
+ List<IMeasurementSchema> schemas = new ArrayList<>();
+ schemas.add(new MeasurementSchema("s" + j, TSDataType.TEXT));
+ AlignedPath path =
+ new AlignedPath(
+ COMPACTION_TEST_SG
+ + PATH_SEPARATOR
+ + "d"
+ + (TsFileGeneratorUtils.getAlignDeviceOffset() + i),
+ Collections.singletonList("s" + j),
+ schemas);
+ IDataBlockReader tsBlockReader =
+ new SeriesDataBlockReader(
+ path,
+ TSDataType.TEXT,
+ FragmentInstanceContext.createFragmentInstanceContextForCompaction(
+ EnvironmentUtils.TEST_QUERY_CONTEXT.getQueryId()),
+ targetResources,
+ new ArrayList<>(),
+ true);
+ int count = 0;
+ while (tsBlockReader.hasNextBatch()) {
+ TsBlock block = tsBlockReader.nextBatch();
+ IBatchDataIterator iterator = block.getTsBlockSingleColumnIterator();
+ while (iterator.hasNext()) {
+ if (measurementMaxTime.get(
+ COMPACTION_TEST_SG
+ + PATH_SEPARATOR
+ + "d"
+ + (TsFileGeneratorUtils.getAlignDeviceOffset() + i)
+ + PATH_SEPARATOR
+ + "s"
+ + j)
+ >= iterator.currentTime()) {
+ Assert.fail();
+ }
+ measurementMaxTime.put(
+ COMPACTION_TEST_SG
+ + PATH_SEPARATOR
+ + "d"
+ + (TsFileGeneratorUtils.getAlignDeviceOffset() + i)
+ + PATH_SEPARATOR
+ + "s"
+ + j,
+ iterator.currentTime());
+ count++;
+ iterator.next();
+ }
+ }
+ tsBlockReader.close();
+ if (i == 0 || i == 2 || i == 3) {
+ assertEquals(600, count);
+ } else if (i == 1) {
+ assertEquals(800, count);
+ } else {
+ assertEquals(500, count);
+ }
+ }
+ }
+ }
+
+ @Test
+ public void testAlignedCrossSpaceCompactionWithDifferentMeasurementsInDifferentSourceFiles()
+ throws IOException, WriteProcessException, MetadataException, StorageEngineException,
+ InterruptedException {
+ TSFileDescriptor.getInstance().getConfig().setMaxNumberOfPointsInPage(30);
+ registerTimeseriesInMManger(5, 5, true);
+ List<Integer> deviceIndex = new ArrayList<>();
+ List<Integer> measurementIndex = new ArrayList<>();
+ for (int i = 0; i < 5; i++) {
+ deviceIndex.add(i);
+ }
+
+ measurementIndex.add(0);
+ measurementIndex.add(2);
+ createFilesWithTextValue(1, deviceIndex, measurementIndex, 300, 0, 0, true, true);
+
+ measurementIndex.clear();
+ measurementIndex.add(1);
+ measurementIndex.add(3);
+ createFilesWithTextValue(1, deviceIndex, measurementIndex, 300, 400, 0, true, true);
+
+ measurementIndex.clear();
+ measurementIndex.add(2);
+ measurementIndex.add(4);
+ measurementIndex.add(0);
+ createFilesWithTextValue(1, deviceIndex, measurementIndex, 300, 800, 0, true, true);
+
+ measurementIndex.clear();
+ measurementIndex.add(1);
+ measurementIndex.add(4);
+ createFilesWithTextValue(1, deviceIndex, measurementIndex, 200, 100, 0, true, false);
+
+ measurementIndex.clear();
+ measurementIndex.add(0);
+ measurementIndex.add(2);
+ createFilesWithTextValue(1, deviceIndex, measurementIndex, 200, 400, 0, true, false);
+
+ List<TsFileResource> targetResources =
+ CompactionFileGeneratorUtils.getCrossCompactionTargetTsFileResources(seqResources);
+ ICompactionPerformer performer =
+ new ReadPointCompactionPerformer(seqResources, unseqResources, targetResources);
+ performer.setSummary(new CompactionTaskSummary());
+ performer.perform();
+ CompactionUtils.moveTargetFile(targetResources, false, COMPACTION_TEST_SG);
+
+ List<String> deviceIdList = new ArrayList<>();
+ deviceIdList.add(
+ COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + TsFileGeneratorUtils.getAlignDeviceOffset());
+ deviceIdList.add(
+ COMPACTION_TEST_SG
+ + PATH_SEPARATOR
+ + "d"
+ + (TsFileGeneratorUtils.getAlignDeviceOffset() + 1));
+ deviceIdList.add(
+ COMPACTION_TEST_SG
+ + PATH_SEPARATOR
+ + "d"
+ + (TsFileGeneratorUtils.getAlignDeviceOffset() + 2));
+ deviceIdList.add(
+ COMPACTION_TEST_SG
+ + PATH_SEPARATOR
+ + "d"
+ + (TsFileGeneratorUtils.getAlignDeviceOffset() + 3));
+ deviceIdList.add(
+ COMPACTION_TEST_SG
+ + PATH_SEPARATOR
+ + "d"
+ + (TsFileGeneratorUtils.getAlignDeviceOffset() + 4));
+ for (int i = 0; i < 3; i++) {
+ Assert.assertTrue(
+ targetResources
+ .get(i)
+ .isDeviceIdExist(
+ COMPACTION_TEST_SG
+ + PATH_SEPARATOR
+ + "d"
+ + (TsFileGeneratorUtils.getAlignDeviceOffset())));
+ Assert.assertTrue(
+ targetResources
+ .get(i)
+ .isDeviceIdExist(
+ COMPACTION_TEST_SG
+ + PATH_SEPARATOR
+ + "d"
+ + (TsFileGeneratorUtils.getAlignDeviceOffset() + 1)));
+ Assert.assertTrue(
+ targetResources
+ .get(i)
+ .isDeviceIdExist(
+ COMPACTION_TEST_SG
+ + PATH_SEPARATOR
+ + "d"
+ + (TsFileGeneratorUtils.getAlignDeviceOffset() + 2)));
+ Assert.assertTrue(
+ targetResources
+ .get(i)
+ .isDeviceIdExist(
+ COMPACTION_TEST_SG
+ + PATH_SEPARATOR
+ + "d"
+ + (TsFileGeneratorUtils.getAlignDeviceOffset() + 3)));
+ Assert.assertTrue(
+ targetResources
+ .get(i)
+ .isDeviceIdExist(
+ COMPACTION_TEST_SG
+ + PATH_SEPARATOR
+ + "d"
+ + (TsFileGeneratorUtils.getAlignDeviceOffset() + 4)));
+ check(targetResources.get(i), deviceIdList);
+ }
+
+ Map<String, Long> measurementMaxTime = new HashMap<>();
+
+ for (int i = 0; i < 5; i++) {
+ for (int j = 0; j < 5; j++) {
+ measurementMaxTime.putIfAbsent(
+ COMPACTION_TEST_SG
+ + PATH_SEPARATOR
+ + "d"
+ + (TsFileGeneratorUtils.getAlignDeviceOffset() + i)
+ + PATH_SEPARATOR
+ + "s"
+ + j,
+ Long.MIN_VALUE);
+ List<IMeasurementSchema> schemas = new ArrayList<>();
+ schemas.add(new MeasurementSchema("s" + j, TSDataType.TEXT));
+ AlignedPath path =
+ new AlignedPath(
+ COMPACTION_TEST_SG
+ + PATH_SEPARATOR
+ + "d"
+ + (TsFileGeneratorUtils.getAlignDeviceOffset() + i),
+ Collections.singletonList("s" + j),
+ schemas);
+ IDataBlockReader tsBlockReader =
+ new SeriesDataBlockReader(
+ path,
+ TSDataType.TEXT,
+ FragmentInstanceContext.createFragmentInstanceContextForCompaction(
+ EnvironmentUtils.TEST_QUERY_CONTEXT.getQueryId()),
+ targetResources,
+ new ArrayList<>(),
+ true);
+ int count = 0;
+ while (tsBlockReader.hasNextBatch()) {
+ TsBlock block = tsBlockReader.nextBatch();
+ IBatchDataIterator iterator = block.getTsBlockSingleColumnIterator();
+ while (iterator.hasNext()) {
+ if (measurementMaxTime.get(
+ COMPACTION_TEST_SG
+ + PATH_SEPARATOR
+ + "d"
+ + (TsFileGeneratorUtils.getAlignDeviceOffset() + i)
+ + PATH_SEPARATOR
+ + "s"
+ + j)
+ >= iterator.currentTime()) {
+ Assert.fail();
+ }
+ measurementMaxTime.put(
+ COMPACTION_TEST_SG
+ + PATH_SEPARATOR
+ + "d"
+ + (TsFileGeneratorUtils.getAlignDeviceOffset() + i)
+ + PATH_SEPARATOR
+ + "s"
+ + j,
+ iterator.currentTime());
+ count++;
+ iterator.next();
+ }
+ }
+ tsBlockReader.close();
+ if (j == 0 || j == 2) {
+ assertEquals(800, count);
+ } else if (j == 1 || j == 4) {
+ assertEquals(500, count);
+ } else {
+ assertEquals(300, count);
+ }
+ }
+ }
+ }
+
+ /**
* Total 4 seq files and 5 unseq files, each file has different aligned timeseries.
*
* <p>Seq files<br>
@@ -3538,6 +4750,7 @@
}
generateModsFile(seriesPaths, seqResources, Long.MIN_VALUE, Long.MAX_VALUE);
generateModsFile(seriesPaths, unseqResources, Long.MIN_VALUE, Long.MAX_VALUE);
+ deleteTimeseriesInMManager(seriesPaths);
for (TsFileResource resource : seqResources) {
resource.setTimeIndexType((byte) 2);
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/TimeseriesMetadata.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/TimeseriesMetadata.java
index 58b26db..db7072c 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/TimeseriesMetadata.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/TimeseriesMetadata.java
@@ -32,6 +32,7 @@
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
+import java.util.Set;
public class TimeseriesMetadata implements ITimeSeriesMetadata {
@@ -123,6 +124,42 @@
}
/**
+ * Return null if excludedMeasurements contains the measurementId without deserializing chunk
+ * metadata.
+ */
+ public static TimeseriesMetadata deserializeFrom(
+ ByteBuffer buffer, Set<String> excludedMeasurements, boolean needChunkMetadata) {
+ byte timeseriesType = ReadWriteIOUtils.readByte(buffer);
+ String measurementID = ReadWriteIOUtils.readVarIntString(buffer);
+ TSDataType tsDataType = ReadWriteIOUtils.readDataType(buffer);
+ int chunkMetaDataListDataSize = ReadWriteForEncodingUtils.readUnsignedVarInt(buffer);
+ Statistics<? extends Serializable> statistics = Statistics.deserialize(buffer, tsDataType);
+ if (excludedMeasurements.contains(measurementID)) {
+ buffer.position(buffer.position() + chunkMetaDataListDataSize);
+ return null;
+ }
+ TimeseriesMetadata timeseriesMetaData = new TimeseriesMetadata();
+ timeseriesMetaData.setTimeSeriesMetadataType(timeseriesType);
+ timeseriesMetaData.setMeasurementId(measurementID);
+ timeseriesMetaData.setTSDataType(tsDataType);
+ timeseriesMetaData.setDataSizeOfChunkMetaDataList(chunkMetaDataListDataSize);
+ timeseriesMetaData.setStatistics(statistics);
+ if (needChunkMetadata) {
+ ByteBuffer byteBuffer = buffer.slice();
+ byteBuffer.limit(chunkMetaDataListDataSize);
+ timeseriesMetaData.chunkMetadataList = new ArrayList<>();
+ while (byteBuffer.hasRemaining()) {
+ timeseriesMetaData.chunkMetadataList.add(
+ ChunkMetadata.deserializeFrom(byteBuffer, timeseriesMetaData));
+ }
+ // minimize the storage of an ArrayList instance.
+ timeseriesMetaData.chunkMetadataList.trimToSize();
+ }
+ buffer.position(buffer.position() + chunkMetaDataListDataSize);
+ return timeseriesMetaData;
+ }
+
+ /**
* serialize to outputStream.
*
* @param outputStream outputStream
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileDeviceIterator.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileDeviceIterator.java
index 514d16e..da511cc 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileDeviceIterator.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileDeviceIterator.java
@@ -24,9 +24,7 @@
import org.apache.iotdb.tsfile.utils.Pair;
import java.io.IOException;
-import java.util.ArrayList;
import java.util.Iterator;
-import java.util.List;
import java.util.NoSuchElementException;
import java.util.Queue;
@@ -34,6 +32,7 @@
private final TsFileSequenceReader reader;
private final Queue<Pair<String, Pair<Long, Long>>> queue;
private Pair<String, Boolean> currentDevice = null;
+ private MetadataIndexNode measurementNode;
public TsFileDeviceIterator(
TsFileSequenceReader reader, Queue<Pair<String, Pair<Long, Long>>> queue) {
@@ -56,13 +55,12 @@
throw new NoSuchElementException();
}
Pair<String, Pair<Long, Long>> startEndPair = queue.remove();
- List<Pair<String, Boolean>> devices = new ArrayList<>();
try {
- MetadataIndexNode measurementNode =
+ // first measurement node of this device
+ this.measurementNode =
MetadataIndexNode.deserializeFrom(
reader.readData(startEndPair.right.left, startEndPair.right.right));
- // if tryToGetFirstTimeseriesMetadata(node) returns null, the device is not aligned
- boolean isAligned = reader.tryToGetFirstTimeseriesMetadata(measurementNode) != null;
+ boolean isAligned = reader.isAlignedDevice(measurementNode);
currentDevice = new Pair<>(startEndPair.left, isAligned);
return currentDevice;
} catch (IOException e) {
@@ -70,4 +68,8 @@
"Error occurred while reading a time series metadata block.");
}
}
+
+ public MetadataIndexNode getMeasurementNode() {
+ return measurementNode;
+ }
}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java
index 2c1020c..0c80ddf 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java
@@ -801,6 +801,15 @@
}
}
+ /**
+ * Check whether the deivce is aligned or not.
+ *
+ * @param measurementNode the next measurement layer node of specific device node
+ */
+ public boolean isAlignedDevice(MetadataIndexNode measurementNode) {
+ return "".equals(measurementNode.getChildren().get(0).getName());
+ }
+
TimeseriesMetadata tryToGetFirstTimeseriesMetadata(MetadataIndexNode measurementNode)
throws IOException {
// Not aligned timeseries
@@ -834,6 +843,48 @@
}
/**
+ * Get timeseries metadata under the measurementNode and put them into timeseriesMetadataList.
+ * Skip timeseries whose measurementId is in the excludedMeasurementIds.
+ *
+ * @param measurementNode next layer measurement node of specific device leaf node
+ * @param excludedMeasurementIds skip timeseries whose measurementId is in the set
+ */
+ public void getDeviceTimeseriesMetadata(
+ List<TimeseriesMetadata> timeseriesMetadataList,
+ MetadataIndexNode measurementNode,
+ Set<String> excludedMeasurementIds,
+ boolean needChunkMetadata)
+ throws IOException {
+ int metadataIndexListSize = measurementNode.getChildren().size();
+ for (int i = 0; i < metadataIndexListSize; i++) {
+ long endOffset = measurementNode.getEndOffset();
+ if (i != metadataIndexListSize - 1) {
+ endOffset = measurementNode.getChildren().get(i + 1).getOffset();
+ }
+ ByteBuffer nextBuffer = readData(measurementNode.getChildren().get(i).getOffset(), endOffset);
+ if (measurementNode.getNodeType().equals(MetadataIndexNodeType.LEAF_MEASUREMENT)) {
+ // leaf measurement node
+ while (nextBuffer.hasRemaining()) {
+ TimeseriesMetadata timeseriesMetadata =
+ TimeseriesMetadata.deserializeFrom(
+ nextBuffer, excludedMeasurementIds, needChunkMetadata);
+ if (timeseriesMetadata != null) {
+ timeseriesMetadataList.add(timeseriesMetadata);
+ }
+ }
+ } else {
+ // internal measurement node
+ MetadataIndexNode nextLayerMeasurementNode = MetadataIndexNode.deserializeFrom(nextBuffer);
+ getDeviceTimeseriesMetadata(
+ timeseriesMetadataList,
+ nextLayerMeasurementNode,
+ excludedMeasurementIds,
+ needChunkMetadata);
+ }
+ }
+ }
+
+ /**
* Traverse the metadata index from MetadataIndexEntry to get TimeseriesMetadatas
*
* @param metadataIndex MetadataIndexEntry
@@ -1121,6 +1172,22 @@
header, buffer, chunkCacheKey.getDeleteIntervalList(), chunkCacheKey.getStatistics());
}
+ /** Get measurement schema by chunkMetadatas. */
+ public MeasurementSchema getMeasurementSchema(List<IChunkMetadata> chunkMetadataList)
+ throws IOException {
+ if (chunkMetadataList.isEmpty()) {
+ return null;
+ }
+ IChunkMetadata lastChunkMetadata = chunkMetadataList.get(chunkMetadataList.size() - 1);
+ int chunkHeadSize = ChunkHeader.getSerializedSize(lastChunkMetadata.getMeasurementUid());
+ ChunkHeader header = readChunkHeader(lastChunkMetadata.getOffsetOfChunkHeader(), chunkHeadSize);
+ return new MeasurementSchema(
+ lastChunkMetadata.getMeasurementUid(),
+ header.getDataType(),
+ header.getEncodingType(),
+ header.getCompressionType());
+ }
+
/**
* not thread safe.
*
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/TsFileGeneratorUtils.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/TsFileGeneratorUtils.java
index 919851e..85bf39a 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/TsFileGeneratorUtils.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/TsFileGeneratorUtils.java
@@ -30,6 +30,7 @@
import org.apache.iotdb.tsfile.write.record.Tablet;
import org.apache.iotdb.tsfile.write.record.datapoint.DataPoint;
import org.apache.iotdb.tsfile.write.record.datapoint.LongDataPoint;
+import org.apache.iotdb.tsfile.write.record.datapoint.StringDataPoint;
import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
@@ -278,6 +279,112 @@
}
}
+ public static File generateAlignedTsFileWithTextValues(
+ String filePath,
+ List<Integer> deviceIndex,
+ List<Integer> measurementIndex,
+ int pointNum,
+ int startTime,
+ int chunkGroupSize,
+ int pageSize)
+ throws IOException, WriteProcessException {
+ File file = fsFactory.getFile(filePath);
+ if (file.exists()) {
+ file.delete();
+ }
+ if (chunkGroupSize > 0)
+ TSFileDescriptor.getInstance().getConfig().setGroupSizeInByte(chunkGroupSize);
+ if (pageSize > 0)
+ TSFileDescriptor.getInstance().getConfig().setMaxNumberOfPointsInPage(pageSize);
+ try (TsFileWriter tsFileWriter = new TsFileWriter(file)) {
+ // register align timeseries
+ List<MeasurementSchema> alignedMeasurementSchemas = new ArrayList<>();
+ for (int i = 0; i < measurementIndex.size(); i++) {
+ alignedMeasurementSchemas.add(
+ new MeasurementSchema(
+ "s" + measurementIndex.get(i), TSDataType.TEXT, TSEncoding.PLAIN));
+ }
+ for (int i = 0; i < deviceIndex.size(); i++) {
+ tsFileWriter.registerAlignedTimeseries(
+ new Path(
+ testStorageGroup + PATH_SEPARATOR + "d" + (deviceIndex.get(i) + alignDeviceOffset)),
+ alignedMeasurementSchemas);
+ }
+
+ // write with record
+ for (int i = 0; i < deviceIndex.size(); i++) {
+ for (long time = startTime; time < pointNum + startTime; time++) {
+ // construct TsRecord
+ TSRecord tsRecord =
+ new TSRecord(
+ time,
+ testStorageGroup
+ + PATH_SEPARATOR
+ + "d"
+ + (deviceIndex.get(i) + alignDeviceOffset));
+ for (IMeasurementSchema schema : alignedMeasurementSchemas) {
+ DataPoint dPoint =
+ new StringDataPoint(schema.getMeasurementId(), new Binary("textValue"));
+ tsRecord.addTuple(dPoint);
+ }
+ // write
+ tsFileWriter.writeAligned(tsRecord);
+ }
+ }
+ }
+ return file;
+ }
+
+ public static File generateNonAlignedTsFileWithTextValues(
+ String filePath,
+ List<Integer> deviceIndex,
+ List<Integer> measurementIndex,
+ int pointNum,
+ int startTime,
+ int chunkGroupSize,
+ int pageSize)
+ throws IOException, WriteProcessException {
+ File file = fsFactory.getFile(filePath);
+ if (file.exists()) {
+ file.delete();
+ }
+ if (chunkGroupSize > 0)
+ TSFileDescriptor.getInstance().getConfig().setGroupSizeInByte(chunkGroupSize);
+ if (pageSize > 0)
+ TSFileDescriptor.getInstance().getConfig().setMaxNumberOfPointsInPage(pageSize);
+ try (TsFileWriter tsFileWriter = new TsFileWriter(file)) {
+ // register nonAlign timeseries
+ List<MeasurementSchema> measurementSchemas = new ArrayList<>();
+ for (int i = 0; i < measurementIndex.size(); i++) {
+ measurementSchemas.add(
+ new MeasurementSchema(
+ "s" + measurementIndex.get(i), TSDataType.TEXT, TSEncoding.PLAIN));
+ }
+ for (int i = 0; i < deviceIndex.size(); i++) {
+ tsFileWriter.registerTimeseries(
+ new Path(testStorageGroup + PATH_SEPARATOR + "d" + deviceIndex.get(i)),
+ measurementSchemas);
+ }
+
+ // write with record
+ for (int i = 0; i < deviceIndex.size(); i++) {
+ for (long time = startTime; time < pointNum + startTime; time++) {
+ // construct TsRecord
+ TSRecord tsRecord =
+ new TSRecord(time, testStorageGroup + PATH_SEPARATOR + "d" + deviceIndex.get(i));
+ for (IMeasurementSchema schema : measurementSchemas) {
+ DataPoint dPoint =
+ new StringDataPoint(schema.getMeasurementId(), new Binary("textValue"));
+ tsRecord.addTuple(dPoint);
+ }
+ // write
+ tsFileWriter.write(tsRecord);
+ }
+ }
+ return file;
+ }
+ }
+
public static String getTsFilePath(String fileParentPath, long tsFileVersion) {
String fileName =
System.currentTimeMillis()