[IOTDB-4364] Reduce read amplification in compaction (#7312)
diff --git a/src/main/java/org/apache/iotdb/tsfile/file/metadata/TimeseriesMetadata.java b/src/main/java/org/apache/iotdb/tsfile/file/metadata/TimeseriesMetadata.java
index 58b26db..db7072c 100644
--- a/src/main/java/org/apache/iotdb/tsfile/file/metadata/TimeseriesMetadata.java
+++ b/src/main/java/org/apache/iotdb/tsfile/file/metadata/TimeseriesMetadata.java
@@ -32,6 +32,7 @@
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
+import java.util.Set;
public class TimeseriesMetadata implements ITimeSeriesMetadata {
@@ -123,6 +124,42 @@
}
/**
+ * Deserialize a TimeseriesMetadata from the buffer. If excludedMeasurements contains the
+ * measurementId, return null without deserializing its chunk metadata.
+ */
+ public static TimeseriesMetadata deserializeFrom(
+ ByteBuffer buffer, Set<String> excludedMeasurements, boolean needChunkMetadata) {
+ byte timeseriesType = ReadWriteIOUtils.readByte(buffer);
+ String measurementID = ReadWriteIOUtils.readVarIntString(buffer);
+ TSDataType tsDataType = ReadWriteIOUtils.readDataType(buffer);
+ int chunkMetaDataListDataSize = ReadWriteForEncodingUtils.readUnsignedVarInt(buffer);
+ Statistics<? extends Serializable> statistics = Statistics.deserialize(buffer, tsDataType);
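+ // skip the serialized chunk metadata of an excluded measurement without deserializing it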
+ if (excludedMeasurements.contains(measurementID)) {
+ buffer.position(buffer.position() + chunkMetaDataListDataSize);
+ return null;
+ }
+ TimeseriesMetadata timeseriesMetaData = new TimeseriesMetadata();
+ timeseriesMetaData.setTimeSeriesMetadataType(timeseriesType);
+ timeseriesMetaData.setMeasurementId(measurementID);
+ timeseriesMetaData.setTSDataType(tsDataType);
+ timeseriesMetaData.setDataSizeOfChunkMetaDataList(chunkMetaDataListDataSize);
+ timeseriesMetaData.setStatistics(statistics);
+ if (needChunkMetadata) {
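+ // deserialize chunk metadata from a slice limited to this timeseries' chunk metadata bytes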
+ ByteBuffer byteBuffer = buffer.slice();
+ byteBuffer.limit(chunkMetaDataListDataSize);
+ timeseriesMetaData.chunkMetadataList = new ArrayList<>();
+ while (byteBuffer.hasRemaining()) {
+ timeseriesMetaData.chunkMetadataList.add(
+ ChunkMetadata.deserializeFrom(byteBuffer, timeseriesMetaData));
+ }
+ // minimize the storage of an ArrayList instance.
+ timeseriesMetaData.chunkMetadataList.trimToSize();
+ }
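+ // advance past the chunk metadata list so the buffer points to the next timeseries metadata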
+ buffer.position(buffer.position() + chunkMetaDataListDataSize);
+ return timeseriesMetaData;
+ }
+
+ /**
* serialize to outputStream.
*
* @param outputStream outputStream
diff --git a/src/main/java/org/apache/iotdb/tsfile/read/TsFileDeviceIterator.java b/src/main/java/org/apache/iotdb/tsfile/read/TsFileDeviceIterator.java
index 514d16e..da511cc 100644
--- a/src/main/java/org/apache/iotdb/tsfile/read/TsFileDeviceIterator.java
+++ b/src/main/java/org/apache/iotdb/tsfile/read/TsFileDeviceIterator.java
@@ -24,9 +24,7 @@
import org.apache.iotdb.tsfile.utils.Pair;
import java.io.IOException;
-import java.util.ArrayList;
import java.util.Iterator;
-import java.util.List;
import java.util.NoSuchElementException;
import java.util.Queue;
@@ -34,6 +32,7 @@
private final TsFileSequenceReader reader;
private final Queue<Pair<String, Pair<Long, Long>>> queue;
private Pair<String, Boolean> currentDevice = null;
+ private MetadataIndexNode measurementNode;
public TsFileDeviceIterator(
TsFileSequenceReader reader, Queue<Pair<String, Pair<Long, Long>>> queue) {
@@ -56,13 +55,12 @@
throw new NoSuchElementException();
}
Pair<String, Pair<Long, Long>> startEndPair = queue.remove();
- List<Pair<String, Boolean>> devices = new ArrayList<>();
try {
- MetadataIndexNode measurementNode =
+ // first measurement node of this device
+ this.measurementNode =
MetadataIndexNode.deserializeFrom(
reader.readData(startEndPair.right.left, startEndPair.right.right));
- // if tryToGetFirstTimeseriesMetadata(node) returns null, the device is not aligned
- boolean isAligned = reader.tryToGetFirstTimeseriesMetadata(measurementNode) != null;
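+ // checking the measurement node avoids reading and deserializing the first timeseries metadata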
+ boolean isAligned = reader.isAlignedDevice(measurementNode);
currentDevice = new Pair<>(startEndPair.left, isAligned);
return currentDevice;
} catch (IOException e) {
@@ -70,4 +68,8 @@
"Error occurred while reading a time series metadata block.");
}
}
+
+ public MetadataIndexNode getMeasurementNode() {
+ return measurementNode;
+ }
}
diff --git a/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java b/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java
index 2c1020c..0c80ddf 100644
--- a/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java
+++ b/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java
@@ -801,6 +801,15 @@
}
}
+ /**
+ * Check whether the device is aligned or not.
+ *
+ * @param measurementNode the next measurement-layer node of the specific device node
+ */
+ public boolean isAlignedDevice(MetadataIndexNode measurementNode) {
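+ // the time column of an aligned device is stored under an empty measurement id, so the first child of its measurement node has an empty name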
+ return "".equals(measurementNode.getChildren().get(0).getName());
+ }
+
TimeseriesMetadata tryToGetFirstTimeseriesMetadata(MetadataIndexNode measurementNode)
throws IOException {
// Not aligned timeseries
@@ -834,6 +843,48 @@
}
/**
+ * Get timeseries metadata under the measurementNode and put them into timeseriesMetadataList.
+ * Skip timeseries whose measurementId is in the excludedMeasurementIds.
+ *
+ * @param timeseriesMetadataList list to collect the deserialized timeseries metadata into
+ * @param measurementNode the next-layer measurement node of the specific device leaf node
+ * @param excludedMeasurementIds timeseries whose measurementId is in this set are skipped
+ * @param needChunkMetadata whether to deserialize the chunk metadata of each timeseries
+ */
+ public void getDeviceTimeseriesMetadata(
+ List<TimeseriesMetadata> timeseriesMetadataList,
+ MetadataIndexNode measurementNode,
+ Set<String> excludedMeasurementIds,
+ boolean needChunkMetadata)
+ throws IOException {
+ int metadataIndexListSize = measurementNode.getChildren().size();
+ for (int i = 0; i < metadataIndexListSize; i++) {
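+ // the bytes of child i end where child i + 1 begins; the last child ends at this node's end offset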
+ long endOffset = measurementNode.getEndOffset();
+ if (i != metadataIndexListSize - 1) {
+ endOffset = measurementNode.getChildren().get(i + 1).getOffset();
+ }
+ ByteBuffer nextBuffer = readData(measurementNode.getChildren().get(i).getOffset(), endOffset);
+ if (measurementNode.getNodeType().equals(MetadataIndexNodeType.LEAF_MEASUREMENT)) {
+ // leaf measurement node
+ while (nextBuffer.hasRemaining()) {
+ TimeseriesMetadata timeseriesMetadata =
+ TimeseriesMetadata.deserializeFrom(
+ nextBuffer, excludedMeasurementIds, needChunkMetadata);
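+ // deserializeFrom returns null for excluded measurements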
+ if (timeseriesMetadata != null) {
+ timeseriesMetadataList.add(timeseriesMetadata);
+ }
+ }
+ } else {
+ // internal measurement node
+ MetadataIndexNode nextLayerMeasurementNode = MetadataIndexNode.deserializeFrom(nextBuffer);
+ getDeviceTimeseriesMetadata(
+ timeseriesMetadataList,
+ nextLayerMeasurementNode,
+ excludedMeasurementIds,
+ needChunkMetadata);
+ }
+ }
+ }
+
+ /**
* Traverse the metadata index from MetadataIndexEntry to get TimeseriesMetadatas
*
* @param metadataIndex MetadataIndexEntry
@@ -1121,6 +1172,22 @@
header, buffer, chunkCacheKey.getDeleteIntervalList(), chunkCacheKey.getStatistics());
}
+ /** Get the measurement schema of a series from its chunk metadata list, or null if the list is empty. */
+ public MeasurementSchema getMeasurementSchema(List<IChunkMetadata> chunkMetadataList)
+ throws IOException {
+ if (chunkMetadataList.isEmpty()) {
+ return null;
+ }
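+ // the data type, encoding and compression of the series are read from the header of its last chunk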
+ IChunkMetadata lastChunkMetadata = chunkMetadataList.get(chunkMetadataList.size() - 1);
+ int chunkHeadSize = ChunkHeader.getSerializedSize(lastChunkMetadata.getMeasurementUid());
+ ChunkHeader header = readChunkHeader(lastChunkMetadata.getOffsetOfChunkHeader(), chunkHeadSize);
+ return new MeasurementSchema(
+ lastChunkMetadata.getMeasurementUid(),
+ header.getDataType(),
+ header.getEncodingType(),
+ header.getCompressionType());
+ }
+
/**
* not thread safe.
*
diff --git a/src/main/java/org/apache/iotdb/tsfile/utils/TsFileGeneratorUtils.java b/src/main/java/org/apache/iotdb/tsfile/utils/TsFileGeneratorUtils.java
index 919851e..85bf39a 100644
--- a/src/main/java/org/apache/iotdb/tsfile/utils/TsFileGeneratorUtils.java
+++ b/src/main/java/org/apache/iotdb/tsfile/utils/TsFileGeneratorUtils.java
@@ -30,6 +30,7 @@
import org.apache.iotdb.tsfile.write.record.Tablet;
import org.apache.iotdb.tsfile.write.record.datapoint.DataPoint;
import org.apache.iotdb.tsfile.write.record.datapoint.LongDataPoint;
+import org.apache.iotdb.tsfile.write.record.datapoint.StringDataPoint;
import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
@@ -278,6 +279,112 @@
}
}
+ public static File generateAlignedTsFileWithTextValues(
+ String filePath,
+ List<Integer> deviceIndex,
+ List<Integer> measurementIndex,
+ int pointNum,
+ int startTime,
+ int chunkGroupSize,
+ int pageSize)
+ throws IOException, WriteProcessException {
+ File file = fsFactory.getFile(filePath);
+ if (file.exists()) {
+ file.delete();
+ }
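+ // a positive chunkGroupSize or pageSize overrides the global TSFileDescriptor config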
+ if (chunkGroupSize > 0)
+ TSFileDescriptor.getInstance().getConfig().setGroupSizeInByte(chunkGroupSize);
+ if (pageSize > 0)
+ TSFileDescriptor.getInstance().getConfig().setMaxNumberOfPointsInPage(pageSize);
+ try (TsFileWriter tsFileWriter = new TsFileWriter(file)) {
+ // register aligned timeseries
+ List<MeasurementSchema> alignedMeasurementSchemas = new ArrayList<>();
+ for (int i = 0; i < measurementIndex.size(); i++) {
+ alignedMeasurementSchemas.add(
+ new MeasurementSchema(
+ "s" + measurementIndex.get(i), TSDataType.TEXT, TSEncoding.PLAIN));
+ }
+ for (int i = 0; i < deviceIndex.size(); i++) {
+ tsFileWriter.registerAlignedTimeseries(
+ new Path(
+ testStorageGroup + PATH_SEPARATOR + "d" + (deviceIndex.get(i) + alignDeviceOffset)),
+ alignedMeasurementSchemas);
+ }
+
+ // write with record
+ for (int i = 0; i < deviceIndex.size(); i++) {
+ for (long time = startTime; time < pointNum + startTime; time++) {
+ // construct TsRecord
+ TSRecord tsRecord =
+ new TSRecord(
+ time,
+ testStorageGroup
+ + PATH_SEPARATOR
+ + "d"
+ + (deviceIndex.get(i) + alignDeviceOffset));
+ for (IMeasurementSchema schema : alignedMeasurementSchemas) {
+ DataPoint dPoint =
+ new StringDataPoint(schema.getMeasurementId(), new Binary("textValue"));
+ tsRecord.addTuple(dPoint);
+ }
+ // write
+ tsFileWriter.writeAligned(tsRecord);
+ }
+ }
+ }
+ return file;
+ }
+
+ public static File generateNonAlignedTsFileWithTextValues(
+ String filePath,
+ List<Integer> deviceIndex,
+ List<Integer> measurementIndex,
+ int pointNum,
+ int startTime,
+ int chunkGroupSize,
+ int pageSize)
+ throws IOException, WriteProcessException {
+ File file = fsFactory.getFile(filePath);
+ if (file.exists()) {
+ file.delete();
+ }
+ if (chunkGroupSize > 0)
+ TSFileDescriptor.getInstance().getConfig().setGroupSizeInByte(chunkGroupSize);
+ if (pageSize > 0)
+ TSFileDescriptor.getInstance().getConfig().setMaxNumberOfPointsInPage(pageSize);
+ try (TsFileWriter tsFileWriter = new TsFileWriter(file)) {
+ // register non-aligned timeseries
+ List<MeasurementSchema> measurementSchemas = new ArrayList<>();
+ for (int i = 0; i < measurementIndex.size(); i++) {
+ measurementSchemas.add(
+ new MeasurementSchema(
+ "s" + measurementIndex.get(i), TSDataType.TEXT, TSEncoding.PLAIN));
+ }
+ for (int i = 0; i < deviceIndex.size(); i++) {
+ tsFileWriter.registerTimeseries(
+ new Path(testStorageGroup + PATH_SEPARATOR + "d" + deviceIndex.get(i)),
+ measurementSchemas);
+ }
+
+ // write with record
+ for (int i = 0; i < deviceIndex.size(); i++) {
+ for (long time = startTime; time < pointNum + startTime; time++) {
+ // construct TsRecord
+ TSRecord tsRecord =
+ new TSRecord(time, testStorageGroup + PATH_SEPARATOR + "d" + deviceIndex.get(i));
+ for (IMeasurementSchema schema : measurementSchemas) {
+ DataPoint dPoint =
+ new StringDataPoint(schema.getMeasurementId(), new Binary("textValue"));
+ tsRecord.addTuple(dPoint);
+ }
+ // write
+ tsFileWriter.write(tsRecord);
+ }
+ }
+ return file;
+ }
+ }
+
public static String getTsFilePath(String fileParentPath, long tsFileVersion) {
String fileName =
System.currentTimeMillis()