[IOTDB-4364]Reduce read amplication in compaction (#7312)

diff --git a/src/main/java/org/apache/iotdb/tsfile/file/metadata/TimeseriesMetadata.java b/src/main/java/org/apache/iotdb/tsfile/file/metadata/TimeseriesMetadata.java
index 58b26db..db7072c 100644
--- a/src/main/java/org/apache/iotdb/tsfile/file/metadata/TimeseriesMetadata.java
+++ b/src/main/java/org/apache/iotdb/tsfile/file/metadata/TimeseriesMetadata.java
@@ -32,6 +32,7 @@
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Set;
 
 public class TimeseriesMetadata implements ITimeSeriesMetadata {
 
@@ -123,6 +124,42 @@
   }
 
   /**
+   * Return null if excludedMeasurements contains the measurementId without deserializing chunk
+   * metadata.
+   */
+  public static TimeseriesMetadata deserializeFrom(
+      ByteBuffer buffer, Set<String> excludedMeasurements, boolean needChunkMetadata) {
+    byte timeseriesType = ReadWriteIOUtils.readByte(buffer);
+    String measurementID = ReadWriteIOUtils.readVarIntString(buffer);
+    TSDataType tsDataType = ReadWriteIOUtils.readDataType(buffer);
+    int chunkMetaDataListDataSize = ReadWriteForEncodingUtils.readUnsignedVarInt(buffer);
+    Statistics<? extends Serializable> statistics = Statistics.deserialize(buffer, tsDataType);
+    if (excludedMeasurements.contains(measurementID)) {
+      buffer.position(buffer.position() + chunkMetaDataListDataSize);
+      return null;
+    }
+    TimeseriesMetadata timeseriesMetaData = new TimeseriesMetadata();
+    timeseriesMetaData.setTimeSeriesMetadataType(timeseriesType);
+    timeseriesMetaData.setMeasurementId(measurementID);
+    timeseriesMetaData.setTSDataType(tsDataType);
+    timeseriesMetaData.setDataSizeOfChunkMetaDataList(chunkMetaDataListDataSize);
+    timeseriesMetaData.setStatistics(statistics);
+    if (needChunkMetadata) {
+      ByteBuffer byteBuffer = buffer.slice();
+      byteBuffer.limit(chunkMetaDataListDataSize);
+      timeseriesMetaData.chunkMetadataList = new ArrayList<>();
+      while (byteBuffer.hasRemaining()) {
+        timeseriesMetaData.chunkMetadataList.add(
+            ChunkMetadata.deserializeFrom(byteBuffer, timeseriesMetaData));
+      }
+      // minimize the storage of an ArrayList instance.
+      timeseriesMetaData.chunkMetadataList.trimToSize();
+    }
+    buffer.position(buffer.position() + chunkMetaDataListDataSize);
+    return timeseriesMetaData;
+  }
+
+  /**
    * serialize to outputStream.
    *
    * @param outputStream outputStream
diff --git a/src/main/java/org/apache/iotdb/tsfile/read/TsFileDeviceIterator.java b/src/main/java/org/apache/iotdb/tsfile/read/TsFileDeviceIterator.java
index 514d16e..da511cc 100644
--- a/src/main/java/org/apache/iotdb/tsfile/read/TsFileDeviceIterator.java
+++ b/src/main/java/org/apache/iotdb/tsfile/read/TsFileDeviceIterator.java
@@ -24,9 +24,7 @@
 import org.apache.iotdb.tsfile.utils.Pair;
 
 import java.io.IOException;
-import java.util.ArrayList;
 import java.util.Iterator;
-import java.util.List;
 import java.util.NoSuchElementException;
 import java.util.Queue;
 
@@ -34,6 +32,7 @@
   private final TsFileSequenceReader reader;
   private final Queue<Pair<String, Pair<Long, Long>>> queue;
   private Pair<String, Boolean> currentDevice = null;
+  private MetadataIndexNode measurementNode;
 
   public TsFileDeviceIterator(
       TsFileSequenceReader reader, Queue<Pair<String, Pair<Long, Long>>> queue) {
@@ -56,13 +55,12 @@
       throw new NoSuchElementException();
     }
     Pair<String, Pair<Long, Long>> startEndPair = queue.remove();
-    List<Pair<String, Boolean>> devices = new ArrayList<>();
     try {
-      MetadataIndexNode measurementNode =
+      // first measurement node of this device
+      this.measurementNode =
           MetadataIndexNode.deserializeFrom(
               reader.readData(startEndPair.right.left, startEndPair.right.right));
-      // if tryToGetFirstTimeseriesMetadata(node) returns null, the device is not aligned
-      boolean isAligned = reader.tryToGetFirstTimeseriesMetadata(measurementNode) != null;
+      boolean isAligned = reader.isAlignedDevice(measurementNode);
       currentDevice = new Pair<>(startEndPair.left, isAligned);
       return currentDevice;
     } catch (IOException e) {
@@ -70,4 +68,8 @@
           "Error occurred while reading a time series metadata block.");
     }
   }
+
+  public MetadataIndexNode getMeasurementNode() {
+    return measurementNode;
+  }
 }
diff --git a/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java b/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java
index 2c1020c..0c80ddf 100644
--- a/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java
+++ b/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java
@@ -801,6 +801,15 @@
     }
   }
 
+  /**
+   * Check whether the deivce is aligned or not.
+   *
+   * @param measurementNode the next measurement layer node of specific device node
+   */
+  public boolean isAlignedDevice(MetadataIndexNode measurementNode) {
+    return "".equals(measurementNode.getChildren().get(0).getName());
+  }
+
   TimeseriesMetadata tryToGetFirstTimeseriesMetadata(MetadataIndexNode measurementNode)
       throws IOException {
     // Not aligned timeseries
@@ -834,6 +843,48 @@
   }
 
   /**
+   * Get timeseries metadata under the measurementNode and put them into timeseriesMetadataList.
+   * Skip timeseries whose measurementId is in the excludedMeasurementIds.
+   *
+   * @param measurementNode next layer measurement node of specific device leaf node
+   * @param excludedMeasurementIds skip timeseries whose measurementId is in the set
+   */
+  public void getDeviceTimeseriesMetadata(
+      List<TimeseriesMetadata> timeseriesMetadataList,
+      MetadataIndexNode measurementNode,
+      Set<String> excludedMeasurementIds,
+      boolean needChunkMetadata)
+      throws IOException {
+    int metadataIndexListSize = measurementNode.getChildren().size();
+    for (int i = 0; i < metadataIndexListSize; i++) {
+      long endOffset = measurementNode.getEndOffset();
+      if (i != metadataIndexListSize - 1) {
+        endOffset = measurementNode.getChildren().get(i + 1).getOffset();
+      }
+      ByteBuffer nextBuffer = readData(measurementNode.getChildren().get(i).getOffset(), endOffset);
+      if (measurementNode.getNodeType().equals(MetadataIndexNodeType.LEAF_MEASUREMENT)) {
+        // leaf measurement node
+        while (nextBuffer.hasRemaining()) {
+          TimeseriesMetadata timeseriesMetadata =
+              TimeseriesMetadata.deserializeFrom(
+                  nextBuffer, excludedMeasurementIds, needChunkMetadata);
+          if (timeseriesMetadata != null) {
+            timeseriesMetadataList.add(timeseriesMetadata);
+          }
+        }
+      } else {
+        // internal measurement node
+        MetadataIndexNode nextLayerMeasurementNode = MetadataIndexNode.deserializeFrom(nextBuffer);
+        getDeviceTimeseriesMetadata(
+            timeseriesMetadataList,
+            nextLayerMeasurementNode,
+            excludedMeasurementIds,
+            needChunkMetadata);
+      }
+    }
+  }
+
+  /**
    * Traverse the metadata index from MetadataIndexEntry to get TimeseriesMetadatas
    *
    * @param metadataIndex MetadataIndexEntry
@@ -1121,6 +1172,22 @@
         header, buffer, chunkCacheKey.getDeleteIntervalList(), chunkCacheKey.getStatistics());
   }
 
+  /** Get measurement schema by chunkMetadatas. */
+  public MeasurementSchema getMeasurementSchema(List<IChunkMetadata> chunkMetadataList)
+      throws IOException {
+    if (chunkMetadataList.isEmpty()) {
+      return null;
+    }
+    IChunkMetadata lastChunkMetadata = chunkMetadataList.get(chunkMetadataList.size() - 1);
+    int chunkHeadSize = ChunkHeader.getSerializedSize(lastChunkMetadata.getMeasurementUid());
+    ChunkHeader header = readChunkHeader(lastChunkMetadata.getOffsetOfChunkHeader(), chunkHeadSize);
+    return new MeasurementSchema(
+        lastChunkMetadata.getMeasurementUid(),
+        header.getDataType(),
+        header.getEncodingType(),
+        header.getCompressionType());
+  }
+
   /**
    * not thread safe.
    *
diff --git a/src/main/java/org/apache/iotdb/tsfile/utils/TsFileGeneratorUtils.java b/src/main/java/org/apache/iotdb/tsfile/utils/TsFileGeneratorUtils.java
index 919851e..85bf39a 100644
--- a/src/main/java/org/apache/iotdb/tsfile/utils/TsFileGeneratorUtils.java
+++ b/src/main/java/org/apache/iotdb/tsfile/utils/TsFileGeneratorUtils.java
@@ -30,6 +30,7 @@
 import org.apache.iotdb.tsfile.write.record.Tablet;
 import org.apache.iotdb.tsfile.write.record.datapoint.DataPoint;
 import org.apache.iotdb.tsfile.write.record.datapoint.LongDataPoint;
+import org.apache.iotdb.tsfile.write.record.datapoint.StringDataPoint;
 import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
 import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
 
@@ -278,6 +279,112 @@
     }
   }
 
+  public static File generateAlignedTsFileWithTextValues(
+      String filePath,
+      List<Integer> deviceIndex,
+      List<Integer> measurementIndex,
+      int pointNum,
+      int startTime,
+      int chunkGroupSize,
+      int pageSize)
+      throws IOException, WriteProcessException {
+    File file = fsFactory.getFile(filePath);
+    if (file.exists()) {
+      file.delete();
+    }
+    if (chunkGroupSize > 0)
+      TSFileDescriptor.getInstance().getConfig().setGroupSizeInByte(chunkGroupSize);
+    if (pageSize > 0)
+      TSFileDescriptor.getInstance().getConfig().setMaxNumberOfPointsInPage(pageSize);
+    try (TsFileWriter tsFileWriter = new TsFileWriter(file)) {
+      // register align timeseries
+      List<MeasurementSchema> alignedMeasurementSchemas = new ArrayList<>();
+      for (int i = 0; i < measurementIndex.size(); i++) {
+        alignedMeasurementSchemas.add(
+            new MeasurementSchema(
+                "s" + measurementIndex.get(i), TSDataType.TEXT, TSEncoding.PLAIN));
+      }
+      for (int i = 0; i < deviceIndex.size(); i++) {
+        tsFileWriter.registerAlignedTimeseries(
+            new Path(
+                testStorageGroup + PATH_SEPARATOR + "d" + (deviceIndex.get(i) + alignDeviceOffset)),
+            alignedMeasurementSchemas);
+      }
+
+      // write with record
+      for (int i = 0; i < deviceIndex.size(); i++) {
+        for (long time = startTime; time < pointNum + startTime; time++) {
+          // construct TsRecord
+          TSRecord tsRecord =
+              new TSRecord(
+                  time,
+                  testStorageGroup
+                      + PATH_SEPARATOR
+                      + "d"
+                      + (deviceIndex.get(i) + alignDeviceOffset));
+          for (IMeasurementSchema schema : alignedMeasurementSchemas) {
+            DataPoint dPoint =
+                new StringDataPoint(schema.getMeasurementId(), new Binary("textValue"));
+            tsRecord.addTuple(dPoint);
+          }
+          // write
+          tsFileWriter.writeAligned(tsRecord);
+        }
+      }
+    }
+    return file;
+  }
+
+  public static File generateNonAlignedTsFileWithTextValues(
+      String filePath,
+      List<Integer> deviceIndex,
+      List<Integer> measurementIndex,
+      int pointNum,
+      int startTime,
+      int chunkGroupSize,
+      int pageSize)
+      throws IOException, WriteProcessException {
+    File file = fsFactory.getFile(filePath);
+    if (file.exists()) {
+      file.delete();
+    }
+    if (chunkGroupSize > 0)
+      TSFileDescriptor.getInstance().getConfig().setGroupSizeInByte(chunkGroupSize);
+    if (pageSize > 0)
+      TSFileDescriptor.getInstance().getConfig().setMaxNumberOfPointsInPage(pageSize);
+    try (TsFileWriter tsFileWriter = new TsFileWriter(file)) {
+      // register nonAlign timeseries
+      List<MeasurementSchema> measurementSchemas = new ArrayList<>();
+      for (int i = 0; i < measurementIndex.size(); i++) {
+        measurementSchemas.add(
+            new MeasurementSchema(
+                "s" + measurementIndex.get(i), TSDataType.TEXT, TSEncoding.PLAIN));
+      }
+      for (int i = 0; i < deviceIndex.size(); i++) {
+        tsFileWriter.registerTimeseries(
+            new Path(testStorageGroup + PATH_SEPARATOR + "d" + deviceIndex.get(i)),
+            measurementSchemas);
+      }
+
+      // write with record
+      for (int i = 0; i < deviceIndex.size(); i++) {
+        for (long time = startTime; time < pointNum + startTime; time++) {
+          // construct TsRecord
+          TSRecord tsRecord =
+              new TSRecord(time, testStorageGroup + PATH_SEPARATOR + "d" + deviceIndex.get(i));
+          for (IMeasurementSchema schema : measurementSchemas) {
+            DataPoint dPoint =
+                new StringDataPoint(schema.getMeasurementId(), new Binary("textValue"));
+            tsRecord.addTuple(dPoint);
+          }
+          // write
+          tsFileWriter.write(tsRecord);
+        }
+      }
+      return file;
+    }
+  }
+
   public static String getTsFilePath(String fileParentPath, long tsFileVersion) {
     String fileName =
         System.currentTimeMillis()