[IOTDB-4364]Reduce read amplication in compaction (#7312)

diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/rewrite/task/ReadPointPerformerSubTask.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/rewrite/task/ReadPointPerformerSubTask.java
index 4280d2a..bc04c09 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/rewrite/task/ReadPointPerformerSubTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/rewrite/task/ReadPointPerformerSubTask.java
@@ -79,7 +79,7 @@
               device,
               Collections.singletonList(measurement),
               measurementSchemas,
-              measurementList,
+              schemaMap.keySet(),
               fragmentInstanceContext,
               queryDataSource,
               false);
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/MultiTsFileDeviceIterator.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/MultiTsFileDeviceIterator.java
index 73f3c2c..304ff6d 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/MultiTsFileDeviceIterator.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/MultiTsFileDeviceIterator.java
@@ -32,6 +32,7 @@
 import org.apache.iotdb.tsfile.read.TsFileDeviceIterator;
 import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
 import org.apache.iotdb.tsfile.utils.Pair;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -48,10 +49,11 @@
 import java.util.stream.Collectors;
 
 public class MultiTsFileDeviceIterator implements AutoCloseable {
-  private List<TsFileResource> tsFileResources;
-  private Map<TsFileResource, TsFileSequenceReader> readerMap = new HashMap<>();
-  private Map<TsFileResource, TsFileDeviceIterator> deviceIteratorMap = new HashMap<>();
-  private Map<TsFileResource, List<Modification>> modificationCache = new HashMap<>();
+  // sorted from the newest to the oldest
+  private final List<TsFileResource> tsFileResources;
+  private final Map<TsFileResource, TsFileSequenceReader> readerMap = new HashMap<>();
+  private final Map<TsFileResource, TsFileDeviceIterator> deviceIteratorMap = new HashMap<>();
+  private final Map<TsFileResource, List<Modification>> modificationCache = new HashMap<>();
   private Pair<String, Boolean> currentDevice = null;
 
   /** Used for inner space compaction. */
@@ -77,13 +79,10 @@
   /** Used for cross space compaction. */
   public MultiTsFileDeviceIterator(
       List<TsFileResource> seqResources, List<TsFileResource> unseqResources) throws IOException {
-    for (TsFileResource tsFileResource : seqResources) {
-      TsFileSequenceReader reader =
-          FileReaderManager.getInstance().get(tsFileResource.getTsFilePath(), true);
-      readerMap.put(tsFileResource, reader);
-      deviceIteratorMap.put(tsFileResource, reader.getAllDevicesIteratorWithIsAligned());
-    }
-    for (TsFileResource tsFileResource : unseqResources) {
+    this.tsFileResources = new ArrayList<>(seqResources);
+    tsFileResources.addAll(unseqResources);
+    Collections.sort(this.tsFileResources, TsFileResource::compareFileNameByDesc);
+    for (TsFileResource tsFileResource : tsFileResources) {
       TsFileSequenceReader reader =
           FileReaderManager.getInstance().get(tsFileResource.getTsFilePath(), true);
       readerMap.put(tsFileResource, reader);
@@ -135,6 +134,38 @@
   }
 
   /**
+   * Get all measurements and schemas of the current device from source files. Traverse all the
+   * files from the newest to the oldest in turn and start traversing the index tree from the
+   * firstMeasurementNode node to get all the measurements under the current device.
+   */
+  public Map<String, MeasurementSchema> getAllMeasurementSchemas() throws IOException {
+    Map<String, MeasurementSchema> schemaMap = new ConcurrentHashMap<>();
+    // get schemas from the newest file to the oldest file
+    for (TsFileResource resource : tsFileResources) {
+      if (!deviceIteratorMap.containsKey(resource)) {
+        continue;
+      }
+      TsFileSequenceReader reader = readerMap.get(resource);
+      List<TimeseriesMetadata> timeseriesMetadataList = new ArrayList<>();
+      reader.getDeviceTimeseriesMetadata(
+          timeseriesMetadataList,
+          deviceIteratorMap.get(resource).getMeasurementNode(),
+          schemaMap.keySet(),
+          true);
+      for (TimeseriesMetadata timeseriesMetadata : timeseriesMetadataList) {
+        if (!schemaMap.containsKey(timeseriesMetadata.getMeasurementId())
+            && !timeseriesMetadata.getChunkMetadataList().isEmpty()) {
+          schemaMap.put(
+              timeseriesMetadata.getMeasurementId(),
+              reader.getMeasurementSchema(timeseriesMetadata.getChunkMetadataList()));
+        }
+      }
+    }
+    schemaMap.remove("");
+    return schemaMap;
+  }
+
+  /**
    * return MeasurementIterator, who iterates the measurements of not aligned device
    *
    * @param device the full path of the device to be iterated
@@ -146,10 +177,6 @@
     return new MeasurementIterator(readerMap, device, derserializeTimeseriesMetadata);
   }
 
-  public AlignedMeasurementIterator iterateAlignedSeries(String device) {
-    return new AlignedMeasurementIterator(device, new ArrayList<>(readerMap.values()));
-  }
-
   /**
    * return a list of the tsfile reader and its aligned chunk metadata list for the aligned device
    * which this iterator is visiting. If there is any modification for this device, it will be
@@ -236,25 +263,6 @@
     }
   }
 
-  public class AlignedMeasurementIterator {
-    private List<TsFileSequenceReader> sequenceReaders;
-    private String device;
-
-    private AlignedMeasurementIterator(String device, List<TsFileSequenceReader> sequenceReaders) {
-      this.device = device;
-      this.sequenceReaders = sequenceReaders;
-    }
-
-    public Set<String> getAllMeasurements() throws IOException {
-      Map<String, TimeseriesMetadata> deviceMeasurementsMap = new ConcurrentHashMap<>();
-      for (TsFileSequenceReader reader : sequenceReaders) {
-        deviceMeasurementsMap.putAll(reader.readDeviceMetadata(device));
-      }
-      deviceMeasurementsMap.remove("");
-      return deviceMeasurementsMap.keySet();
-    }
-  }
-
   public class MeasurementIterator {
     private Map<TsFileResource, TsFileSequenceReader> readerMap;
     private String device;
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/performer/impl/ReadPointCompactionPerformer.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/performer/impl/ReadPointCompactionPerformer.java
index 0b8df32..de9d36e 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/performer/impl/ReadPointCompactionPerformer.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/performer/impl/ReadPointCompactionPerformer.java
@@ -23,7 +23,6 @@
 import org.apache.iotdb.commons.exception.MetadataException;
 import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
-import org.apache.iotdb.db.engine.cache.ChunkCache;
 import org.apache.iotdb.db.engine.compaction.CompactionTaskManager;
 import org.apache.iotdb.db.engine.compaction.cross.rewrite.task.ReadPointPerformerSubTask;
 import org.apache.iotdb.db.engine.compaction.inner.utils.MultiTsFileDeviceIterator;
@@ -36,22 +35,16 @@
 import org.apache.iotdb.db.engine.compaction.writer.CrossSpaceCompactionWriter;
 import org.apache.iotdb.db.engine.compaction.writer.InnerSpaceCompactionWriter;
 import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
-import org.apache.iotdb.db.engine.storagegroup.TsFileNameGenerator;
 import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
 import org.apache.iotdb.db.exception.StorageEngineException;
 import org.apache.iotdb.db.metadata.path.AlignedPath;
 import org.apache.iotdb.db.metadata.path.MeasurementPath;
 import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext;
-import org.apache.iotdb.db.query.control.FileReaderManager;
 import org.apache.iotdb.db.query.control.QueryResourceManager;
 import org.apache.iotdb.db.utils.QueryUtils;
-import org.apache.iotdb.tsfile.file.header.ChunkHeader;
-import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
 import org.apache.iotdb.tsfile.file.metadata.TimeseriesMetadata;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.TimeValuePair;
-import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
-import org.apache.iotdb.tsfile.read.common.Chunk;
 import org.apache.iotdb.tsfile.read.common.block.TsBlock;
 import org.apache.iotdb.tsfile.read.reader.IPointReader;
 import org.apache.iotdb.tsfile.utils.Pair;
@@ -65,9 +58,7 @@
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.HashSet;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -82,7 +73,7 @@
   private List<TsFileResource> unseqFiles = Collections.emptyList();
   private static final int subTaskNum =
       IoTDBDescriptor.getInstance().getConfig().getSubCompactionTaskNum();
-  private Map<TsFileResource, TsFileSequenceReader> readerCacheMap = new HashMap<>();
+
   private CompactionTaskSummary summary;
 
   private List<TsFileResource> targetFiles = Collections.emptyList();
@@ -140,7 +131,6 @@
       updateDeviceStartTimeAndEndTime(targetFiles, compactionWriter);
       updatePlanIndexes(targetFiles, seqFiles, unseqFiles);
     } finally {
-      clearReaderCache();
       QueryResourceManager.getInstance().endQuery(queryId);
     }
   }
@@ -162,10 +152,7 @@
       FragmentInstanceContext fragmentInstanceContext,
       QueryDataSource queryDataSource)
       throws IOException, MetadataException {
-    MultiTsFileDeviceIterator.AlignedMeasurementIterator alignedMeasurementIterator =
-        deviceIterator.iterateAlignedSeries(device);
-    Set<String> allMeasurements = alignedMeasurementIterator.getAllMeasurements();
-    Map<String, MeasurementSchema> schemaMap = getMeasurementSchema(device, allMeasurements);
+    Map<String, MeasurementSchema> schemaMap = deviceIterator.getAllMeasurementSchemas();
     List<IMeasurementSchema> measurementSchemas = new ArrayList<>(schemaMap.values());
     if (measurementSchemas.isEmpty()) {
       return;
@@ -179,7 +166,7 @@
             device,
             existedMeasurements,
             measurementSchemas,
-            allMeasurements,
+            schemaMap.keySet(),
             fragmentInstanceContext,
             queryDataSource,
             true);
@@ -200,17 +187,14 @@
       AbstractCompactionWriter compactionWriter,
       FragmentInstanceContext fragmentInstanceContext,
       QueryDataSource queryDataSource)
-      throws IOException, InterruptedException, IllegalPathException {
-    MultiTsFileDeviceIterator.MeasurementIterator measurementIterator =
-        deviceIterator.iterateNotAlignedSeries(device, false);
-    Set<String> allMeasurements = measurementIterator.getAllMeasurements();
-    int subTaskNums = Math.min(allMeasurements.size(), subTaskNum);
-    Map<String, MeasurementSchema> schemaMap = getMeasurementSchema(device, allMeasurements);
+      throws IOException, InterruptedException {
+    Map<String, MeasurementSchema> measurementSchemaMap = deviceIterator.getAllMeasurementSchemas();
+    int subTaskNums = Math.min(measurementSchemaMap.size(), subTaskNum);
 
     // assign all measurements to different sub tasks
     Set<String>[] measurementsForEachSubTask = new HashSet[subTaskNums];
     int idx = 0;
-    for (String measurement : allMeasurements) {
+    for (String measurement : measurementSchemaMap.keySet()) {
       if (measurementsForEachSubTask[idx % subTaskNums] == null) {
         measurementsForEachSubTask[idx % subTaskNums] = new HashSet<>();
       }
@@ -230,7 +214,7 @@
                       fragmentInstanceContext,
                       queryDataSource,
                       compactionWriter,
-                      schemaMap,
+                      measurementSchemaMap,
                       i)));
     }
 
@@ -247,76 +231,6 @@
     compactionWriter.endChunkGroup();
   }
 
-  private Map<String, MeasurementSchema> getMeasurementSchema(
-      String device, Set<String> measurements) throws IllegalPathException, IOException {
-    HashMap<String, MeasurementSchema> schemaMap = new HashMap<>();
-    List<TsFileResource> allResources = new LinkedList<>(seqFiles);
-    allResources.addAll(unseqFiles);
-    // sort the tsfile by version, so that we can iterate the tsfile from the newest to oldest
-    allResources.sort(
-        (o1, o2) -> {
-          try {
-            TsFileNameGenerator.TsFileName n1 =
-                TsFileNameGenerator.getTsFileName(o1.getTsFile().getName());
-            TsFileNameGenerator.TsFileName n2 =
-                TsFileNameGenerator.getTsFileName(o2.getTsFile().getName());
-            return (int) (n2.getVersion() - n1.getVersion());
-          } catch (IOException e) {
-            return 0;
-          }
-        });
-    for (String measurement : measurements) {
-      for (TsFileResource tsFileResource : allResources) {
-        if (!tsFileResource.mayContainsDevice(device)) {
-          continue;
-        }
-        MeasurementSchema schema =
-            getMeasurementSchemaFromReader(
-                tsFileResource,
-                readerCacheMap.computeIfAbsent(
-                    tsFileResource,
-                    x -> {
-                      try {
-                        FileReaderManager.getInstance().increaseFileReaderReference(x, true);
-                        return FileReaderManager.getInstance().get(x.getTsFilePath(), true);
-                      } catch (IOException e) {
-                        throw new RuntimeException(
-                            String.format(
-                                "Failed to construct sequence reader for %s", tsFileResource));
-                      }
-                    }),
-                device,
-                measurement);
-        if (schema != null) {
-          schemaMap.put(measurement, schema);
-          break;
-        }
-      }
-    }
-    return schemaMap;
-  }
-
-  private MeasurementSchema getMeasurementSchemaFromReader(
-      TsFileResource resource, TsFileSequenceReader reader, String device, String measurement)
-      throws IllegalPathException, IOException {
-    List<ChunkMetadata> chunkMetadata =
-        reader.getChunkMetadataList(new PartialPath(device, measurement), true);
-    if (chunkMetadata.size() > 0) {
-      chunkMetadata.get(0).setFilePath(resource.getTsFilePath());
-      Chunk chunk = ChunkCache.getInstance().get(chunkMetadata.get(0));
-      ChunkHeader header = chunk.getHeader();
-      return new MeasurementSchema(
-          measurement, header.getDataType(), header.getEncodingType(), header.getCompressionType());
-    }
-    return null;
-  }
-
-  private void clearReaderCache() throws IOException {
-    for (TsFileResource resource : readerCacheMap.keySet()) {
-      FileReaderManager.getInstance().decreaseFileReaderReference(resource, true);
-    }
-  }
-
   private static void updateDeviceStartTimeAndEndTime(
       List<TsFileResource> targetResources, AbstractCompactionWriter compactionWriter) {
     List<TsFileIOWriter> targetFileWriters = compactionWriter.getFileIOWriter();
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
index 79f7b59..99e5cfd 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
@@ -948,6 +948,18 @@
     }
   }
 
+  public static int compareFileNameByDesc(TsFileResource o1, TsFileResource o2) {
+    try {
+      TsFileNameGenerator.TsFileName n1 =
+          TsFileNameGenerator.getTsFileName(o1.getTsFile().getName());
+      TsFileNameGenerator.TsFileName n2 =
+          TsFileNameGenerator.getTsFileName(o2.getTsFile().getName());
+      return (int) (n2.getVersion() - n1.getVersion());
+    } catch (IOException e) {
+      return 0;
+    }
+  }
+
   public void setSeq(boolean seq) {
     isSeq = seq;
   }
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/compaction/AbstractCompactionTest.java b/server/src/test/java/org/apache/iotdb/db/engine/compaction/AbstractCompactionTest.java
index e4477b3..5e380fc 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/compaction/AbstractCompactionTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/compaction/AbstractCompactionTest.java
@@ -57,6 +57,7 @@
   private int chunkGroupSize = 0;
   private int pageSize = 0;
   protected String COMPACTION_TEST_SG = TsFileGeneratorUtils.testStorageGroup;
+  private TSDataType dataType;
 
   private static final long oldTargetChunkSize =
       IoTDBDescriptor.getInstance().getConfig().getTargetChunkSize();
@@ -110,7 +111,7 @@
     if (!UNSEQ_DIRS.exists()) {
       Assert.assertTrue(UNSEQ_DIRS.mkdirs());
     }
-
+    dataType = TSDataType.INT64;
     EnvironmentUtils.envSetUp();
     IoTDB.configManager.init();
   }
@@ -195,23 +196,101 @@
     }
   }
 
+  /**
+   * @param fileNum the number of file
+   * @param deviceIndexes device index in each file
+   * @param measurementIndexes measurement index in each device of each file
+   * @param pointNum data point number of each timeseries in each file
+   * @param startTime start time of each timeseries
+   * @param timeInterval time interval of each timeseries between files
+   * @param isAlign when it is true, it will create mix tsfile which contains aligned and nonAligned
+   *     timeseries
+   * @param isSeq
+   */
+  protected void createFilesWithTextValue(
+      int fileNum,
+      List<Integer> deviceIndexes,
+      List<Integer> measurementIndexes,
+      int pointNum,
+      int startTime,
+      int timeInterval,
+      boolean isAlign,
+      boolean isSeq)
+      throws IOException, WriteProcessException {
+
+    for (int i = 0; i < fileNum; i++) {
+      String fileName =
+          System.currentTimeMillis()
+              + FilePathUtils.FILE_NAME_SEPARATOR
+              + fileVersion++
+              + "-0-0.tsfile";
+      String filePath;
+      if (isSeq) {
+        filePath = SEQ_DIRS.getPath() + File.separator + fileName;
+      } else {
+        filePath = UNSEQ_DIRS.getPath() + File.separator + fileName;
+      }
+      File file;
+      if (isAlign) {
+        file =
+            TsFileGeneratorUtils.generateAlignedTsFileWithTextValues(
+                filePath,
+                deviceIndexes,
+                measurementIndexes,
+                pointNum,
+                startTime + pointNum * i + timeInterval * i,
+                chunkGroupSize,
+                pageSize);
+      } else {
+        file =
+            TsFileGeneratorUtils.generateNonAlignedTsFileWithTextValues(
+                filePath,
+                deviceIndexes,
+                measurementIndexes,
+                pointNum,
+                startTime + pointNum * i + timeInterval * i,
+                chunkGroupSize,
+                pageSize);
+      }
+      // add resource
+      TsFileResource resource = new TsFileResource(file);
+      int deviceStartindex = isAlign ? TsFileGeneratorUtils.getAlignDeviceOffset() : 0;
+      for (int j = 0; j < deviceIndexes.size(); j++) {
+        resource.updateStartTime(
+            COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + (deviceIndexes.get(j) + deviceStartindex),
+            startTime + pointNum * i + timeInterval * i);
+        resource.updateEndTime(
+            COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + (deviceIndexes.get(j) + deviceStartindex),
+            startTime + pointNum * i + timeInterval * i + pointNum - 1);
+      }
+      resource.updatePlanIndexes(fileVersion);
+      resource.setStatus(TsFileResourceStatus.CLOSED);
+      resource.serialize();
+      if (isSeq) {
+        seqResources.add(resource);
+      } else {
+        unseqResources.add(resource);
+      }
+    }
+    // sleep a few milliseconds to avoid generating files with same timestamps
+    try {
+      Thread.sleep(10);
+    } catch (Exception e) {
+
+    }
+  }
+
   private void addResource(
       File file, int deviceNum, long startTime, long endTime, boolean isAlign, boolean isSeq)
       throws IOException {
     TsFileResource resource = new TsFileResource(file);
-    int deviceStartindex = 0;
-    if (isAlign) {
-      deviceStartindex = TsFileGeneratorUtils.getAlignDeviceOffset();
-      for (int i = deviceStartindex; i < deviceStartindex + deviceNum; i++) {
-        resource.updateStartTime(COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i, startTime);
-        resource.updateEndTime(COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i, endTime);
-      }
-    } else {
-      for (int i = deviceStartindex; i < deviceStartindex + deviceNum; i++) {
-        resource.updateStartTime(COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i, startTime);
-        resource.updateEndTime(COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i, endTime);
-      }
+    int deviceStartindex = isAlign ? TsFileGeneratorUtils.getAlignDeviceOffset() : 0;
+
+    for (int i = deviceStartindex; i < deviceStartindex + deviceNum; i++) {
+      resource.updateStartTime(COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i, startTime);
+      resource.updateEndTime(COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i, endTime);
     }
+
     resource.updatePlanIndexes(fileVersion);
     resource.setStatus(TsFileResourceStatus.CLOSED);
     // resource.setTimeIndexType((byte) 0);
@@ -233,12 +312,12 @@
         List<CompressionType> compressionTypes = new ArrayList<>();
         for (int j = 0; j < measurementNum; j++) {
           measurements.add("s" + j);
-          dataTypes.add(TSDataType.INT64);
+          dataTypes.add(dataType);
           encodings.add(TSEncoding.PLAIN);
           compressionTypes.add(CompressionType.UNCOMPRESSED);
           IoTDB.schemaProcessor.createTimeseries(
               new PartialPath(COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i, "s" + j),
-              TSDataType.INT64,
+              dataType,
               TSEncoding.PLAIN,
               CompressionType.UNCOMPRESSED,
               Collections.emptyMap());
@@ -253,7 +332,7 @@
         for (int j = 0; j < measurementNum; j++) {
           IoTDB.schemaProcessor.createTimeseries(
               new PartialPath(COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i, "s" + j),
-              TSDataType.INT64,
+              dataType,
               TSEncoding.PLAIN,
               CompressionType.UNCOMPRESSED,
               Collections.emptyMap());
@@ -262,6 +341,12 @@
     }
   }
 
+  protected void deleteTimeseriesInMManager(List<String> timeseries) throws MetadataException {
+    for (String path : timeseries) {
+      IoTDB.schemaProcessor.deleteTimeseries(new PartialPath(path));
+    }
+  }
+
   public void tearDown() throws IOException, StorageEngineException {
     new CompactionConfigRestorer().restoreCompactionConfig();
     removeFiles();
@@ -305,4 +390,8 @@
       resourceFile.delete();
     }
   }
+
+  protected void setDataType(TSDataType dataType) {
+    this.dataType = dataType;
+  }
 }
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/compaction/ReadPointCompactionPerformerTest.java b/server/src/test/java/org/apache/iotdb/db/engine/compaction/ReadPointCompactionPerformerTest.java
index f0ee615..c5b65ed 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/compaction/ReadPointCompactionPerformerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/compaction/ReadPointCompactionPerformerTest.java
@@ -34,6 +34,7 @@
 import org.apache.iotdb.db.metadata.path.MeasurementPath;
 import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext;
 import org.apache.iotdb.db.query.control.FileReaderManager;
+import org.apache.iotdb.db.query.reader.series.SeriesRawDataBatchReader;
 import org.apache.iotdb.db.utils.EnvironmentUtils;
 import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
 import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
@@ -43,8 +44,10 @@
 import org.apache.iotdb.tsfile.file.header.ChunkHeader;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
+import org.apache.iotdb.tsfile.read.common.BatchData;
 import org.apache.iotdb.tsfile.read.common.IBatchDataIterator;
 import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.read.reader.IBatchReader;
 import org.apache.iotdb.tsfile.utils.Pair;
 import org.apache.iotdb.tsfile.utils.TsFileGeneratorUtils;
 import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
@@ -2047,6 +2050,7 @@
     seriesPaths.add(COMPACTION_TEST_SG + PATH_SEPARATOR + "d3" + PATH_SEPARATOR + "s4");
     generateModsFile(seriesPaths, seqResources, Long.MIN_VALUE, Long.MAX_VALUE);
     generateModsFile(seriesPaths, unseqResources, Long.MIN_VALUE, Long.MAX_VALUE);
+    deleteTimeseriesInMManager(seriesPaths);
 
     for (int i = 0; i < 4; i++) {
       for (int j = 0; j < 5; j++) {
@@ -2245,6 +2249,7 @@
     }
     generateModsFile(seriesPaths, seqResources, Long.MIN_VALUE, Long.MAX_VALUE);
     generateModsFile(seriesPaths, unseqResources, Long.MIN_VALUE, Long.MAX_VALUE);
+    deleteTimeseriesInMManager(seriesPaths);
 
     for (int i = 0; i < 4; i++) {
       for (int j = 0; j < 5; j++) {
@@ -2435,6 +2440,7 @@
     }
     generateModsFile(seriesPaths, seqResources, Long.MIN_VALUE, Long.MAX_VALUE);
     generateModsFile(seriesPaths, unseqResources, Long.MIN_VALUE, Long.MAX_VALUE);
+    deleteTimeseriesInMManager(seriesPaths);
 
     for (int i = 0; i < 4; i++) {
       for (int j = 0; j < 5; j++) {
@@ -3094,6 +3100,7 @@
             + "s4");
     generateModsFile(seriesPaths, seqResources, Long.MIN_VALUE, Long.MAX_VALUE);
     generateModsFile(seriesPaths, unseqResources, Long.MIN_VALUE, Long.MAX_VALUE);
+    deleteTimeseriesInMManager(seriesPaths);
 
     for (int i = TsFileGeneratorUtils.getAlignDeviceOffset();
         i < TsFileGeneratorUtils.getAlignDeviceOffset() + 4;
@@ -3332,6 +3339,7 @@
     }
     generateModsFile(seriesPaths, seqResources, Long.MIN_VALUE, Long.MAX_VALUE);
     generateModsFile(seriesPaths, unseqResources, Long.MIN_VALUE, Long.MAX_VALUE);
+    deleteTimeseriesInMManager(seriesPaths);
 
     for (int i = TsFileGeneratorUtils.getAlignDeviceOffset();
         i < TsFileGeneratorUtils.getAlignDeviceOffset() + 4;
@@ -3481,6 +3489,1210 @@
   }
 
   /**
+   * Different source files have timeseries with the same path, but different data types. Because
+   * timeseries in the former file is been deleted.
+   */
+  @Test
+  public void testCrossSpaceCompactionWithSameTimeseriesInDifferentSourceFiles()
+      throws IOException, WriteProcessException, MetadataException, StorageEngineException,
+          InterruptedException {
+    TSFileDescriptor.getInstance().getConfig().setMaxNumberOfPointsInPage(30);
+    registerTimeseriesInMManger(4, 5, false);
+    createFiles(2, 2, 3, 300, 0, 0, 50, 50, false, true);
+    createFiles(2, 4, 5, 300, 700, 700, 50, 50, false, true);
+    createFiles(3, 3, 4, 200, 20, 10020, 30, 30, false, false);
+    createFiles(2, 1, 5, 100, 450, 20450, 0, 0, false, false);
+
+    // generate mods file
+    List<String> seriesPaths = new ArrayList<>();
+    for (int i = 0; i < 5; i++) {
+      seriesPaths.add(COMPACTION_TEST_SG + PATH_SEPARATOR + "d0" + PATH_SEPARATOR + "s" + i);
+      seriesPaths.add(COMPACTION_TEST_SG + PATH_SEPARATOR + "d1" + PATH_SEPARATOR + "s" + i);
+      seriesPaths.add(COMPACTION_TEST_SG + PATH_SEPARATOR + "d2" + PATH_SEPARATOR + "s" + i);
+    }
+    generateModsFile(seriesPaths, seqResources, Long.MIN_VALUE, Long.MAX_VALUE);
+    generateModsFile(seriesPaths, unseqResources, Long.MIN_VALUE, Long.MAX_VALUE);
+    deleteTimeseriesInMManager(seriesPaths);
+    setDataType(TSDataType.TEXT);
+    registerTimeseriesInMManger(2, 7, false);
+    List<Integer> deviceIndex = new ArrayList<>();
+    for (int i = 0; i < 2; i++) {
+      deviceIndex.add(i);
+    }
+    List<Integer> measurementIndex = new ArrayList<>();
+    for (int i = 0; i < 7; i++) {
+      measurementIndex.add(i);
+    }
+
+    createFilesWithTextValue(1, deviceIndex, measurementIndex, 300, 1350, 0, false, false);
+
+    List<TsFileResource> targetResources =
+        CompactionFileGeneratorUtils.getCrossCompactionTargetTsFileResources(seqResources);
+    ICompactionPerformer performer =
+        new ReadPointCompactionPerformer(seqResources, unseqResources, targetResources);
+    performer.setSummary(new CompactionTaskSummary());
+    performer.perform();
+    CompactionUtils.moveTargetFile(targetResources, false, COMPACTION_TEST_SG);
+    Assert.assertEquals(2, targetResources.size());
+
+    List<String> deviceIdList = new ArrayList<>();
+    deviceIdList.add(COMPACTION_TEST_SG + PATH_SEPARATOR + "d0");
+    deviceIdList.add(COMPACTION_TEST_SG + PATH_SEPARATOR + "d1");
+    deviceIdList.add(COMPACTION_TEST_SG + PATH_SEPARATOR + "d2");
+    deviceIdList.add(COMPACTION_TEST_SG + PATH_SEPARATOR + "d3");
+    for (int i = 0; i < 2; i++) {
+      if (i == 0) {
+        Assert.assertFalse(
+            targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d0"));
+        Assert.assertFalse(
+            targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d1"));
+        Assert.assertFalse(
+            targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d2"));
+        Assert.assertTrue(
+            targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d3"));
+      } else {
+        Assert.assertTrue(
+            targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d0"));
+        Assert.assertTrue(
+            targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d1"));
+        Assert.assertFalse(
+            targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d2"));
+        Assert.assertTrue(
+            targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d3"));
+      }
+      check(targetResources.get(i), deviceIdList);
+    }
+
+    Map<String, Long> measurementMaxTime = new HashMap<>();
+
+    for (int i = 0; i < 4; i++) {
+      TSDataType tsDataType = i < 2 ? TSDataType.TEXT : TSDataType.INT64;
+      for (int j = 0; j < 7; j++) {
+        measurementMaxTime.putIfAbsent(
+            COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i + PATH_SEPARATOR + "s" + j,
+            Long.MIN_VALUE);
+        PartialPath path =
+            new MeasurementPath(
+                COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i,
+                "s" + j,
+                new MeasurementSchema("s" + j, tsDataType));
+        IDataBlockReader tsBlockReader =
+            new SeriesDataBlockReader(
+                path,
+                tsDataType,
+                FragmentInstanceContext.createFragmentInstanceContextForCompaction(
+                    EnvironmentUtils.TEST_QUERY_CONTEXT.getQueryId()),
+                targetResources,
+                new ArrayList<>(),
+                true);
+        int count = 0;
+        while (tsBlockReader.hasNextBatch()) {
+          TsBlock block = tsBlockReader.nextBatch();
+          IBatchDataIterator iterator = block.getTsBlockSingleColumnIterator();
+          while (iterator.hasNext()) {
+            if (measurementMaxTime.get(
+                    COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i + PATH_SEPARATOR + "s" + j)
+                >= iterator.currentTime()) {
+              Assert.fail();
+            }
+            measurementMaxTime.put(
+                COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i + PATH_SEPARATOR + "s" + j,
+                iterator.currentTime());
+            count++;
+            iterator.next();
+          }
+        }
+        tsBlockReader.close();
+        if (i < 2 && j < 7) {
+          assertEquals(300, count);
+        } else if (i == 3 && j < 5) {
+          assertEquals(600, count);
+        } else {
+          assertEquals(0, count);
+        }
+      }
+    }
+  }
+
+  /** Each source file has different device. */
+  @Test
+  public void testCrossSpaceCompactionWithDifferentDevicesInDifferentSourceFiles()
+      throws IOException, WriteProcessException, MetadataException, StorageEngineException,
+          InterruptedException {
+    TSFileDescriptor.getInstance().getConfig().setMaxNumberOfPointsInPage(30);
+    registerTimeseriesInMManger(5, 7, false);
+    List<Integer> deviceIndex = new ArrayList<>();
+    List<Integer> measurementIndex = new ArrayList<>();
+    for (int i = 0; i < 7; i++) {
+      measurementIndex.add(i);
+    }
+
+    deviceIndex.add(0);
+    deviceIndex.add(2);
+    createFilesWithTextValue(1, deviceIndex, measurementIndex, 300, 0, 0, false, true);
+
+    deviceIndex.clear();
+    deviceIndex.add(1);
+    deviceIndex.add(3);
+    createFilesWithTextValue(1, deviceIndex, measurementIndex, 300, 400, 0, false, true);
+
+    deviceIndex.clear();
+    deviceIndex.add(2);
+    deviceIndex.add(4);
+    deviceIndex.add(0);
+    createFilesWithTextValue(1, deviceIndex, measurementIndex, 300, 800, 0, false, true);
+
+    deviceIndex.clear();
+    deviceIndex.add(1);
+    deviceIndex.add(4);
+    createFilesWithTextValue(1, deviceIndex, measurementIndex, 200, 100, 0, false, false);
+
+    deviceIndex.clear();
+    deviceIndex.add(1);
+    deviceIndex.add(3);
+    createFilesWithTextValue(1, deviceIndex, measurementIndex, 400, 600, 0, false, false);
+
+    List<TsFileResource> targetResources =
+        CompactionFileGeneratorUtils.getCrossCompactionTargetTsFileResources(seqResources);
+    ICompactionPerformer performer =
+        new ReadPointCompactionPerformer(seqResources, unseqResources, targetResources);
+    performer.setSummary(new CompactionTaskSummary());
+    performer.perform();
+    CompactionUtils.moveTargetFile(targetResources, false, COMPACTION_TEST_SG);
+
+    List<String> deviceIdList = new ArrayList<>();
+    deviceIdList.add(COMPACTION_TEST_SG + PATH_SEPARATOR + "d0");
+    deviceIdList.add(COMPACTION_TEST_SG + PATH_SEPARATOR + "d1");
+    deviceIdList.add(COMPACTION_TEST_SG + PATH_SEPARATOR + "d2");
+    deviceIdList.add(COMPACTION_TEST_SG + PATH_SEPARATOR + "d3");
+    deviceIdList.add(COMPACTION_TEST_SG + PATH_SEPARATOR + "d4");
+    for (int i = 0; i < 3; i++) {
+      if (i == 0) {
+        Assert.assertTrue(
+            targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d0"));
+        Assert.assertFalse(
+            targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d1"));
+        Assert.assertTrue(
+            targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d2"));
+        Assert.assertFalse(
+            targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d3"));
+        Assert.assertFalse(
+            targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d4"));
+      } else if (i == 1) {
+        Assert.assertFalse(
+            targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d0"));
+        Assert.assertTrue(
+            targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d1"));
+        Assert.assertFalse(
+            targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d2"));
+        Assert.assertTrue(
+            targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d3"));
+        Assert.assertFalse(
+            targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d4"));
+      } else {
+        Assert.assertTrue(
+            targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d0"));
+        Assert.assertTrue(
+            targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d1"));
+        Assert.assertTrue(
+            targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d2"));
+        Assert.assertTrue(
+            targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d3"));
+        Assert.assertTrue(
+            targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d4"));
+      }
+      check(targetResources.get(i), deviceIdList);
+    }
+
+    Map<String, Long> measurementMaxTime = new HashMap<>();
+
+    for (int i = 0; i < 5; i++) {
+      for (int j = 0; j < 5; j++) {
+        measurementMaxTime.putIfAbsent(
+            COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i + PATH_SEPARATOR + "s" + j,
+            Long.MIN_VALUE);
+        PartialPath path =
+            new MeasurementPath(
+                COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i,
+                "s" + j,
+                new MeasurementSchema("s" + j, TSDataType.TEXT));
+        IDataBlockReader tsBlockReader =
+            new SeriesDataBlockReader(
+                path,
+                TSDataType.TEXT,
+                FragmentInstanceContext.createFragmentInstanceContextForCompaction(
+                    EnvironmentUtils.TEST_QUERY_CONTEXT.getQueryId()),
+                targetResources,
+                new ArrayList<>(),
+                true);
+        int count = 0;
+        while (tsBlockReader.hasNextBatch()) {
+          TsBlock block = tsBlockReader.nextBatch();
+          IBatchDataIterator iterator = block.getTsBlockSingleColumnIterator();
+          while (iterator.hasNext()) {
+            if (measurementMaxTime.get(
+                    COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i + PATH_SEPARATOR + "s" + j)
+                >= iterator.currentTime()) {
+              Assert.fail();
+            }
+            measurementMaxTime.put(
+                COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i + PATH_SEPARATOR + "s" + j,
+                iterator.currentTime());
+            count++;
+            iterator.next();
+          }
+        }
+        tsBlockReader.close();
+        if (i == 0 || i == 2 || i == 3) {
+          assertEquals(600, count);
+        } else if (i == 1) {
+          assertEquals(800, count);
+        } else {
+          assertEquals(500, count);
+        }
+      }
+    }
+  }
+
+  /** Each source file has same device with different measurements. */
+  @Test
+  public void testCrossSpaceCompactionWithDifferentMeasurementsInDifferentSourceFiles()
+      throws IOException, WriteProcessException, MetadataException, StorageEngineException,
+          InterruptedException {
+    TSFileDescriptor.getInstance().getConfig().setMaxNumberOfPointsInPage(30);
+    registerTimeseriesInMManger(5, 5, false);
+    List<Integer> deviceIndex = new ArrayList<>();
+    List<Integer> measurementIndex = new ArrayList<>();
+    for (int i = 0; i < 5; i++) {
+      deviceIndex.add(i);
+    }
+
+    measurementIndex.add(0);
+    measurementIndex.add(2);
+    createFilesWithTextValue(1, deviceIndex, measurementIndex, 300, 0, 0, false, true);
+
+    measurementIndex.clear();
+    measurementIndex.add(1);
+    measurementIndex.add(3);
+    createFilesWithTextValue(1, deviceIndex, measurementIndex, 300, 400, 0, false, true);
+
+    measurementIndex.clear();
+    measurementIndex.add(2);
+    measurementIndex.add(4);
+    measurementIndex.add(0);
+    createFilesWithTextValue(1, deviceIndex, measurementIndex, 300, 800, 0, false, true);
+
+    measurementIndex.clear();
+    measurementIndex.add(1);
+    measurementIndex.add(4);
+    createFilesWithTextValue(1, deviceIndex, measurementIndex, 200, 100, 0, false, false);
+
+    measurementIndex.clear();
+    measurementIndex.add(0);
+    measurementIndex.add(2);
+    createFilesWithTextValue(1, deviceIndex, measurementIndex, 200, 400, 0, false, false);
+
+    List<TsFileResource> targetResources =
+        CompactionFileGeneratorUtils.getCrossCompactionTargetTsFileResources(seqResources);
+    ICompactionPerformer performer =
+        new ReadPointCompactionPerformer(seqResources, unseqResources, targetResources);
+    performer.setSummary(new CompactionTaskSummary());
+    performer.perform();
+    CompactionUtils.moveTargetFile(targetResources, false, COMPACTION_TEST_SG);
+
+    List<String> deviceIdList = new ArrayList<>();
+    deviceIdList.add(COMPACTION_TEST_SG + PATH_SEPARATOR + "d0");
+    deviceIdList.add(COMPACTION_TEST_SG + PATH_SEPARATOR + "d1");
+    deviceIdList.add(COMPACTION_TEST_SG + PATH_SEPARATOR + "d2");
+    deviceIdList.add(COMPACTION_TEST_SG + PATH_SEPARATOR + "d3");
+    deviceIdList.add(COMPACTION_TEST_SG + PATH_SEPARATOR + "d4");
+    for (int i = 0; i < 3; i++) {
+      Assert.assertTrue(
+          targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d0"));
+      Assert.assertTrue(
+          targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d1"));
+      Assert.assertTrue(
+          targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d2"));
+      Assert.assertTrue(
+          targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d3"));
+      Assert.assertTrue(
+          targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d4"));
+      check(targetResources.get(i), deviceIdList);
+    }
+
+    Map<String, Long> measurementMaxTime = new HashMap<>();
+
+    for (int i = 0; i < 5; i++) {
+      for (int j = 0; j < 5; j++) {
+        measurementMaxTime.putIfAbsent(
+            COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i + PATH_SEPARATOR + "s" + j,
+            Long.MIN_VALUE);
+        PartialPath path =
+            new MeasurementPath(
+                COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i,
+                "s" + j,
+                new MeasurementSchema("s" + j, TSDataType.TEXT));
+        IDataBlockReader tsBlockReader =
+            new SeriesDataBlockReader(
+                path,
+                TSDataType.TEXT,
+                FragmentInstanceContext.createFragmentInstanceContextForCompaction(
+                    EnvironmentUtils.TEST_QUERY_CONTEXT.getQueryId()),
+                targetResources,
+                new ArrayList<>(),
+                true);
+        int count = 0;
+        while (tsBlockReader.hasNextBatch()) {
+          TsBlock block = tsBlockReader.nextBatch();
+          IBatchDataIterator iterator = block.getTsBlockSingleColumnIterator();
+          while (iterator.hasNext()) {
+            if (measurementMaxTime.get(
+                    COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i + PATH_SEPARATOR + "s" + j)
+                >= iterator.currentTime()) {
+              Assert.fail();
+            }
+            measurementMaxTime.put(
+                COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i + PATH_SEPARATOR + "s" + j,
+                iterator.currentTime());
+            count++;
+            iterator.next();
+          }
+        }
+        tsBlockReader.close();
+        if (j == 0 || j == 2) {
+          assertEquals(800, count);
+        } else if (j == 1 || j == 4) {
+          assertEquals(500, count);
+        } else {
+          assertEquals(300, count);
+        }
+      }
+    }
+  }
+
+  /** Each source file has different devices and different measurements. */
+  @Test
+  public void testCrossSpaceCompactionWithDifferentDevicesAndMeasurementsInDifferentSourceFiles()
+      throws IOException, WriteProcessException, MetadataException, StorageEngineException,
+          InterruptedException {
+    TSFileDescriptor.getInstance().getConfig().setMaxNumberOfPointsInPage(30);
+    registerTimeseriesInMManger(4, 5, false);
+    List<Integer> deviceIndex = new ArrayList<>();
+    List<Integer> measurementIndex = new ArrayList<>();
+    deviceIndex.add(0);
+    deviceIndex.add(1);
+
+    measurementIndex.add(0);
+    measurementIndex.add(2);
+    createFilesWithTextValue(1, deviceIndex, measurementIndex, 300, 0, 0, false, true);
+
+    measurementIndex.clear();
+    measurementIndex.add(1);
+    measurementIndex.add(3);
+    createFilesWithTextValue(1, deviceIndex, measurementIndex, 300, 400, 0, false, true);
+
+    deviceIndex.add(2);
+    deviceIndex.add(3);
+    measurementIndex.clear();
+    measurementIndex.add(2);
+    measurementIndex.add(4);
+    measurementIndex.add(0);
+    createFilesWithTextValue(1, deviceIndex, measurementIndex, 300, 800, 0, false, true);
+    deviceIndex.remove(2);
+    deviceIndex.remove(2);
+
+    measurementIndex.clear();
+    measurementIndex.add(1);
+    measurementIndex.add(4);
+    createFilesWithTextValue(1, deviceIndex, measurementIndex, 200, 100, 0, false, false);
+
+    measurementIndex.clear();
+    measurementIndex.add(0);
+    measurementIndex.add(2);
+    createFilesWithTextValue(1, deviceIndex, measurementIndex, 300, 600, 0, false, false);
+
+    List<TsFileResource> targetResources =
+        CompactionFileGeneratorUtils.getCrossCompactionTargetTsFileResources(seqResources);
+    ICompactionPerformer performer =
+        new ReadPointCompactionPerformer(seqResources, unseqResources, targetResources);
+    performer.setSummary(new CompactionTaskSummary());
+    performer.perform();
+    CompactionUtils.moveTargetFile(targetResources, false, COMPACTION_TEST_SG);
+
+    List<String> deviceIdList = new ArrayList<>();
+    deviceIdList.add(COMPACTION_TEST_SG + PATH_SEPARATOR + "d0");
+    deviceIdList.add(COMPACTION_TEST_SG + PATH_SEPARATOR + "d1");
+    deviceIdList.add(COMPACTION_TEST_SG + PATH_SEPARATOR + "d2");
+    deviceIdList.add(COMPACTION_TEST_SG + PATH_SEPARATOR + "d3");
+    for (int i = 0; i < 3; i++) {
+      if (i < 2) {
+        Assert.assertTrue(
+            targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d0"));
+        Assert.assertTrue(
+            targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d1"));
+        Assert.assertFalse(
+            targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d2"));
+        Assert.assertFalse(
+            targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d3"));
+      } else {
+        Assert.assertTrue(
+            targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d0"));
+        Assert.assertTrue(
+            targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d1"));
+        Assert.assertTrue(
+            targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d2"));
+        Assert.assertTrue(
+            targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d3"));
+      }
+      check(targetResources.get(i), deviceIdList);
+    }
+
+    Map<String, Long> measurementMaxTime = new HashMap<>();
+
+    for (int i = 0; i < 4; i++) {
+      for (int j = 0; j < 5; j++) {
+        measurementMaxTime.putIfAbsent(
+            COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i + PATH_SEPARATOR + "s" + j,
+            Long.MIN_VALUE);
+        PartialPath path =
+            new MeasurementPath(
+                COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i,
+                "s" + j,
+                new MeasurementSchema("s" + j, TSDataType.TEXT));
+        IBatchReader tsFilesReader =
+            new SeriesRawDataBatchReader(
+                path,
+                TSDataType.VECTOR,
+                EnvironmentUtils.TEST_QUERY_CONTEXT,
+                targetResources,
+                new ArrayList<>(),
+                null,
+                null,
+                true);
+        int count = 0;
+        while (tsFilesReader.hasNextBatch()) {
+          BatchData batchData = tsFilesReader.nextBatch();
+          while (batchData.hasCurrent()) {
+            if (measurementMaxTime.get(
+                    COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i + PATH_SEPARATOR + "s" + j)
+                >= batchData.currentTime()) {
+              Assert.fail();
+            }
+            measurementMaxTime.put(
+                COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i + PATH_SEPARATOR + "s" + j,
+                batchData.currentTime());
+            count++;
+            batchData.next();
+          }
+        }
+        tsFilesReader.close();
+        if (i < 2) {
+          if (j == 0 || j == 2) {
+            assertEquals(800, count);
+          } else if (j == 1 || j == 4) {
+            assertEquals(500, count);
+          } else {
+            assertEquals(300, count);
+          }
+        } else {
+          if (j == 0 || j == 2 || j == 4) {
+            assertEquals(300, count);
+          } else {
+            assertEquals(0, count);
+          }
+        }
+      }
+    }
+  }
+
+  /**
+   * Different source files have timeseries with the same path, but different data types. Because
+   * timeseries in the former file is been deleted.
+   */
+  @Test
+  public void testAlignedCrossSpaceCompactionWithSameTimeseriesInDifferentSourceFiles()
+      throws IOException, WriteProcessException, MetadataException, StorageEngineException,
+          InterruptedException {
+    TSFileDescriptor.getInstance().getConfig().setMaxNumberOfPointsInPage(30);
+    registerTimeseriesInMManger(4, 5, true);
+    createFiles(2, 2, 3, 300, 0, 0, 50, 50, true, true);
+    createFiles(2, 4, 5, 300, 700, 700, 50, 50, true, true);
+    createFiles(3, 3, 4, 200, 20, 10020, 30, 30, true, false);
+    createFiles(2, 1, 5, 100, 450, 20450, 0, 0, true, false);
+
+    // generate mods file
+    List<String> seriesPaths = new ArrayList<>();
+    for (int i = 0; i < 5; i++) {
+      seriesPaths.add(
+          COMPACTION_TEST_SG
+              + PATH_SEPARATOR
+              + "d"
+              + TsFileGeneratorUtils.getAlignDeviceOffset()
+              + PATH_SEPARATOR
+              + "s"
+              + i);
+      seriesPaths.add(
+          COMPACTION_TEST_SG
+              + PATH_SEPARATOR
+              + "d"
+              + (TsFileGeneratorUtils.getAlignDeviceOffset() + 1)
+              + PATH_SEPARATOR
+              + "s"
+              + i);
+      seriesPaths.add(
+          COMPACTION_TEST_SG
+              + PATH_SEPARATOR
+              + "d"
+              + (TsFileGeneratorUtils.getAlignDeviceOffset() + 2)
+              + PATH_SEPARATOR
+              + "s"
+              + i);
+    }
+    generateModsFile(seriesPaths, seqResources, Long.MIN_VALUE, Long.MAX_VALUE);
+    generateModsFile(seriesPaths, unseqResources, Long.MIN_VALUE, Long.MAX_VALUE);
+    deleteTimeseriesInMManager(seriesPaths);
+    setDataType(TSDataType.TEXT);
+    registerTimeseriesInMManger(2, 7, true);
+    List<Integer> deviceIndex = new ArrayList<>();
+    for (int i = 0; i < 2; i++) {
+      deviceIndex.add(i);
+    }
+    List<Integer> measurementIndex = new ArrayList<>();
+    for (int i = 0; i < 7; i++) {
+      measurementIndex.add(i);
+    }
+
+    createFilesWithTextValue(1, deviceIndex, measurementIndex, 300, 1350, 0, true, false);
+
+    List<TsFileResource> targetResources =
+        CompactionFileGeneratorUtils.getCrossCompactionTargetTsFileResources(seqResources);
+    ICompactionPerformer performer =
+        new ReadPointCompactionPerformer(seqResources, unseqResources, targetResources);
+    performer.setSummary(new CompactionTaskSummary());
+    performer.perform();
+    CompactionUtils.moveTargetFile(targetResources, false, COMPACTION_TEST_SG);
+
+    Assert.assertEquals(2, targetResources.size());
+
+    List<String> deviceIdList = new ArrayList<>();
+    deviceIdList.add(
+        COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + (TsFileGeneratorUtils.getAlignDeviceOffset()));
+    deviceIdList.add(
+        COMPACTION_TEST_SG
+            + PATH_SEPARATOR
+            + "d"
+            + (TsFileGeneratorUtils.getAlignDeviceOffset() + 1));
+    deviceIdList.add(
+        COMPACTION_TEST_SG
+            + PATH_SEPARATOR
+            + "d"
+            + (TsFileGeneratorUtils.getAlignDeviceOffset() + 2));
+    deviceIdList.add(
+        COMPACTION_TEST_SG
+            + PATH_SEPARATOR
+            + "d"
+            + (TsFileGeneratorUtils.getAlignDeviceOffset() + 3));
+    for (int i = 0; i < 2; i++) {
+      if (i == 0) {
+        Assert.assertFalse(
+            targetResources
+                .get(i)
+                .isDeviceIdExist(
+                    COMPACTION_TEST_SG
+                        + PATH_SEPARATOR
+                        + "d"
+                        + (TsFileGeneratorUtils.getAlignDeviceOffset())));
+        Assert.assertFalse(
+            targetResources
+                .get(i)
+                .isDeviceIdExist(
+                    COMPACTION_TEST_SG
+                        + PATH_SEPARATOR
+                        + "d"
+                        + (TsFileGeneratorUtils.getAlignDeviceOffset() + 1)));
+        Assert.assertFalse(
+            targetResources
+                .get(i)
+                .isDeviceIdExist(
+                    COMPACTION_TEST_SG
+                        + PATH_SEPARATOR
+                        + "d"
+                        + (TsFileGeneratorUtils.getAlignDeviceOffset() + 2)));
+        Assert.assertTrue(
+            targetResources
+                .get(i)
+                .isDeviceIdExist(
+                    COMPACTION_TEST_SG
+                        + PATH_SEPARATOR
+                        + "d"
+                        + (TsFileGeneratorUtils.getAlignDeviceOffset() + 3)));
+      } else {
+        Assert.assertTrue(
+            targetResources
+                .get(i)
+                .isDeviceIdExist(
+                    COMPACTION_TEST_SG
+                        + PATH_SEPARATOR
+                        + "d"
+                        + (TsFileGeneratorUtils.getAlignDeviceOffset())));
+        Assert.assertTrue(
+            targetResources
+                .get(i)
+                .isDeviceIdExist(
+                    COMPACTION_TEST_SG
+                        + PATH_SEPARATOR
+                        + "d"
+                        + (TsFileGeneratorUtils.getAlignDeviceOffset() + 1)));
+        Assert.assertFalse(
+            targetResources
+                .get(i)
+                .isDeviceIdExist(
+                    COMPACTION_TEST_SG
+                        + PATH_SEPARATOR
+                        + "d"
+                        + (TsFileGeneratorUtils.getAlignDeviceOffset() + 2)));
+        Assert.assertTrue(
+            targetResources
+                .get(i)
+                .isDeviceIdExist(
+                    COMPACTION_TEST_SG
+                        + PATH_SEPARATOR
+                        + "d"
+                        + (TsFileGeneratorUtils.getAlignDeviceOffset() + 3)));
+      }
+      check(targetResources.get(i), deviceIdList);
+    }
+
+    Map<String, Long> measurementMaxTime = new HashMap<>();
+
+    for (int i = 0; i < 4; i++) {
+      TSDataType tsDataType = i < 2 ? TSDataType.TEXT : TSDataType.INT64;
+      for (int j = 0; j < 7; j++) {
+        measurementMaxTime.putIfAbsent(
+            COMPACTION_TEST_SG
+                + PATH_SEPARATOR
+                + "d"
+                + (TsFileGeneratorUtils.getAlignDeviceOffset() + i)
+                + PATH_SEPARATOR
+                + "s"
+                + j,
+            Long.MIN_VALUE);
+        List<IMeasurementSchema> schemas = new ArrayList<>();
+        schemas.add(new MeasurementSchema("s" + j, tsDataType));
+        AlignedPath path =
+            new AlignedPath(
+                COMPACTION_TEST_SG
+                    + PATH_SEPARATOR
+                    + "d"
+                    + (TsFileGeneratorUtils.getAlignDeviceOffset() + i),
+                Collections.singletonList("s" + j),
+                schemas);
+        IDataBlockReader tsBlockReader =
+            new SeriesDataBlockReader(
+                path,
+                tsDataType,
+                FragmentInstanceContext.createFragmentInstanceContextForCompaction(
+                    EnvironmentUtils.TEST_QUERY_CONTEXT.getQueryId()),
+                targetResources,
+                new ArrayList<>(),
+                true);
+        int count = 0;
+        while (tsBlockReader.hasNextBatch()) {
+          TsBlock block = tsBlockReader.nextBatch();
+          IBatchDataIterator iterator = block.getTsBlockSingleColumnIterator();
+          while (iterator.hasNext()) {
+            if (measurementMaxTime.get(
+                    COMPACTION_TEST_SG
+                        + PATH_SEPARATOR
+                        + "d"
+                        + (TsFileGeneratorUtils.getAlignDeviceOffset() + i)
+                        + PATH_SEPARATOR
+                        + "s"
+                        + j)
+                >= iterator.currentTime()) {
+              Assert.fail();
+            }
+            measurementMaxTime.put(
+                COMPACTION_TEST_SG
+                    + PATH_SEPARATOR
+                    + "d"
+                    + (TsFileGeneratorUtils.getAlignDeviceOffset() + i)
+                    + PATH_SEPARATOR
+                    + "s"
+                    + j,
+                iterator.currentTime());
+            count++;
+            iterator.next();
+          }
+        }
+        tsBlockReader.close();
+        if (i < 2 && j < 7) {
+          assertEquals(300, count);
+        } else if (i == 3 && j < 5) {
+          assertEquals(600, count);
+        } else {
+          assertEquals(0, count);
+        }
+      }
+    }
+  }
+
+  /** Each source file has different device. */
+  @Test
+  public void testAlignedCrossSpaceCompactionWithDifferentDevicesInDifferentSourceFiles()
+      throws IOException, WriteProcessException, MetadataException, StorageEngineException,
+          InterruptedException {
+    TSFileDescriptor.getInstance().getConfig().setMaxNumberOfPointsInPage(30);
+    registerTimeseriesInMManger(5, 7, true);
+    List<Integer> deviceIndex = new ArrayList<>();
+    List<Integer> measurementIndex = new ArrayList<>();
+    for (int i = 0; i < 7; i++) {
+      measurementIndex.add(i);
+    }
+
+    deviceIndex.add(0);
+    deviceIndex.add(2);
+    createFilesWithTextValue(1, deviceIndex, measurementIndex, 300, 0, 0, true, true);
+
+    deviceIndex.clear();
+    deviceIndex.add(1);
+    deviceIndex.add(3);
+    createFilesWithTextValue(1, deviceIndex, measurementIndex, 300, 400, 0, true, true);
+
+    deviceIndex.clear();
+    deviceIndex.add(2);
+    deviceIndex.add(4);
+    deviceIndex.add(0);
+    createFilesWithTextValue(1, deviceIndex, measurementIndex, 300, 800, 0, true, true);
+
+    deviceIndex.clear();
+    deviceIndex.add(1);
+    deviceIndex.add(4);
+    createFilesWithTextValue(1, deviceIndex, measurementIndex, 200, 100, 0, true, false);
+
+    deviceIndex.clear();
+    deviceIndex.add(1);
+    deviceIndex.add(3);
+    createFilesWithTextValue(1, deviceIndex, measurementIndex, 400, 600, 0, true, false);
+
+    List<TsFileResource> targetResources =
+        CompactionFileGeneratorUtils.getCrossCompactionTargetTsFileResources(seqResources);
+    ICompactionPerformer performer =
+        new ReadPointCompactionPerformer(seqResources, unseqResources, targetResources);
+    performer.setSummary(new CompactionTaskSummary());
+    performer.perform();
+    CompactionUtils.moveTargetFile(targetResources, false, COMPACTION_TEST_SG);
+
+    List<String> deviceIdList = new ArrayList<>();
+    deviceIdList.add(
+        COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + TsFileGeneratorUtils.getAlignDeviceOffset());
+    deviceIdList.add(
+        COMPACTION_TEST_SG
+            + PATH_SEPARATOR
+            + "d"
+            + (TsFileGeneratorUtils.getAlignDeviceOffset() + 1));
+    deviceIdList.add(
+        COMPACTION_TEST_SG
+            + PATH_SEPARATOR
+            + "d"
+            + (TsFileGeneratorUtils.getAlignDeviceOffset() + 2));
+    deviceIdList.add(
+        COMPACTION_TEST_SG
+            + PATH_SEPARATOR
+            + "d"
+            + (TsFileGeneratorUtils.getAlignDeviceOffset() + 3));
+    deviceIdList.add(
+        COMPACTION_TEST_SG
+            + PATH_SEPARATOR
+            + "d"
+            + (TsFileGeneratorUtils.getAlignDeviceOffset() + 4));
+    for (int i = 0; i < 3; i++) {
+      if (i == 0) {
+        Assert.assertTrue(
+            targetResources
+                .get(i)
+                .isDeviceIdExist(
+                    COMPACTION_TEST_SG
+                        + PATH_SEPARATOR
+                        + "d"
+                        + (TsFileGeneratorUtils.getAlignDeviceOffset())));
+        Assert.assertFalse(
+            targetResources
+                .get(i)
+                .isDeviceIdExist(
+                    COMPACTION_TEST_SG
+                        + PATH_SEPARATOR
+                        + "d"
+                        + (TsFileGeneratorUtils.getAlignDeviceOffset() + 1)));
+        Assert.assertTrue(
+            targetResources
+                .get(i)
+                .isDeviceIdExist(
+                    COMPACTION_TEST_SG
+                        + PATH_SEPARATOR
+                        + "d"
+                        + (TsFileGeneratorUtils.getAlignDeviceOffset() + 2)));
+        Assert.assertFalse(
+            targetResources
+                .get(i)
+                .isDeviceIdExist(
+                    COMPACTION_TEST_SG
+                        + PATH_SEPARATOR
+                        + "d"
+                        + (TsFileGeneratorUtils.getAlignDeviceOffset() + 3)));
+        Assert.assertFalse(
+            targetResources
+                .get(i)
+                .isDeviceIdExist(
+                    COMPACTION_TEST_SG
+                        + PATH_SEPARATOR
+                        + "d"
+                        + (TsFileGeneratorUtils.getAlignDeviceOffset() + 4)));
+      } else if (i == 1) {
+        Assert.assertFalse(
+            targetResources
+                .get(i)
+                .isDeviceIdExist(
+                    COMPACTION_TEST_SG
+                        + PATH_SEPARATOR
+                        + "d"
+                        + (TsFileGeneratorUtils.getAlignDeviceOffset())));
+        Assert.assertTrue(
+            targetResources
+                .get(i)
+                .isDeviceIdExist(
+                    COMPACTION_TEST_SG
+                        + PATH_SEPARATOR
+                        + "d"
+                        + (TsFileGeneratorUtils.getAlignDeviceOffset() + 1)));
+        Assert.assertFalse(
+            targetResources
+                .get(i)
+                .isDeviceIdExist(
+                    COMPACTION_TEST_SG
+                        + PATH_SEPARATOR
+                        + "d"
+                        + (TsFileGeneratorUtils.getAlignDeviceOffset() + 2)));
+        Assert.assertTrue(
+            targetResources
+                .get(i)
+                .isDeviceIdExist(
+                    COMPACTION_TEST_SG
+                        + PATH_SEPARATOR
+                        + "d"
+                        + (TsFileGeneratorUtils.getAlignDeviceOffset() + 3)));
+        Assert.assertFalse(
+            targetResources
+                .get(i)
+                .isDeviceIdExist(
+                    COMPACTION_TEST_SG
+                        + PATH_SEPARATOR
+                        + "d"
+                        + (TsFileGeneratorUtils.getAlignDeviceOffset() + 4)));
+      } else {
+        Assert.assertTrue(
+            targetResources
+                .get(i)
+                .isDeviceIdExist(
+                    COMPACTION_TEST_SG
+                        + PATH_SEPARATOR
+                        + "d"
+                        + (TsFileGeneratorUtils.getAlignDeviceOffset())));
+        Assert.assertTrue(
+            targetResources
+                .get(i)
+                .isDeviceIdExist(
+                    COMPACTION_TEST_SG
+                        + PATH_SEPARATOR
+                        + "d"
+                        + (TsFileGeneratorUtils.getAlignDeviceOffset() + 1)));
+        Assert.assertTrue(
+            targetResources
+                .get(i)
+                .isDeviceIdExist(
+                    COMPACTION_TEST_SG
+                        + PATH_SEPARATOR
+                        + "d"
+                        + (TsFileGeneratorUtils.getAlignDeviceOffset() + 2)));
+        Assert.assertTrue(
+            targetResources
+                .get(i)
+                .isDeviceIdExist(
+                    COMPACTION_TEST_SG
+                        + PATH_SEPARATOR
+                        + "d"
+                        + (TsFileGeneratorUtils.getAlignDeviceOffset() + 3)));
+        Assert.assertTrue(
+            targetResources
+                .get(i)
+                .isDeviceIdExist(
+                    COMPACTION_TEST_SG
+                        + PATH_SEPARATOR
+                        + "d"
+                        + (TsFileGeneratorUtils.getAlignDeviceOffset() + 4)));
+      }
+      check(targetResources.get(i), deviceIdList);
+    }
+
+    Map<String, Long> measurementMaxTime = new HashMap<>();
+
+    for (int i = 0; i < 5; i++) {
+      for (int j = 0; j < 5; j++) {
+        measurementMaxTime.putIfAbsent(
+            COMPACTION_TEST_SG
+                + PATH_SEPARATOR
+                + "d"
+                + (TsFileGeneratorUtils.getAlignDeviceOffset() + i)
+                + PATH_SEPARATOR
+                + "s"
+                + j,
+            Long.MIN_VALUE);
+        List<IMeasurementSchema> schemas = new ArrayList<>();
+        schemas.add(new MeasurementSchema("s" + j, TSDataType.TEXT));
+        AlignedPath path =
+            new AlignedPath(
+                COMPACTION_TEST_SG
+                    + PATH_SEPARATOR
+                    + "d"
+                    + (TsFileGeneratorUtils.getAlignDeviceOffset() + i),
+                Collections.singletonList("s" + j),
+                schemas);
+        IDataBlockReader tsBlockReader =
+            new SeriesDataBlockReader(
+                path,
+                TSDataType.TEXT,
+                FragmentInstanceContext.createFragmentInstanceContextForCompaction(
+                    EnvironmentUtils.TEST_QUERY_CONTEXT.getQueryId()),
+                targetResources,
+                new ArrayList<>(),
+                true);
+        int count = 0;
+        while (tsBlockReader.hasNextBatch()) {
+          TsBlock block = tsBlockReader.nextBatch();
+          IBatchDataIterator iterator = block.getTsBlockSingleColumnIterator();
+          while (iterator.hasNext()) {
+            if (measurementMaxTime.get(
+                    COMPACTION_TEST_SG
+                        + PATH_SEPARATOR
+                        + "d"
+                        + (TsFileGeneratorUtils.getAlignDeviceOffset() + i)
+                        + PATH_SEPARATOR
+                        + "s"
+                        + j)
+                >= iterator.currentTime()) {
+              Assert.fail();
+            }
+            measurementMaxTime.put(
+                COMPACTION_TEST_SG
+                    + PATH_SEPARATOR
+                    + "d"
+                    + (TsFileGeneratorUtils.getAlignDeviceOffset() + i)
+                    + PATH_SEPARATOR
+                    + "s"
+                    + j,
+                iterator.currentTime());
+            count++;
+            iterator.next();
+          }
+        }
+        tsBlockReader.close();
+        if (i == 0 || i == 2 || i == 3) {
+          assertEquals(600, count);
+        } else if (i == 1) {
+          assertEquals(800, count);
+        } else {
+          assertEquals(500, count);
+        }
+      }
+    }
+  }
+
+  @Test
+  public void testAlignedCrossSpaceCompactionWithDifferentMeasurementsInDifferentSourceFiles()
+      throws IOException, WriteProcessException, MetadataException, StorageEngineException,
+          InterruptedException {
+    TSFileDescriptor.getInstance().getConfig().setMaxNumberOfPointsInPage(30);
+    registerTimeseriesInMManger(5, 5, true);
+    List<Integer> deviceIndex = new ArrayList<>();
+    List<Integer> measurementIndex = new ArrayList<>();
+    for (int i = 0; i < 5; i++) {
+      deviceIndex.add(i);
+    }
+
+    measurementIndex.add(0);
+    measurementIndex.add(2);
+    createFilesWithTextValue(1, deviceIndex, measurementIndex, 300, 0, 0, true, true);
+
+    measurementIndex.clear();
+    measurementIndex.add(1);
+    measurementIndex.add(3);
+    createFilesWithTextValue(1, deviceIndex, measurementIndex, 300, 400, 0, true, true);
+
+    measurementIndex.clear();
+    measurementIndex.add(2);
+    measurementIndex.add(4);
+    measurementIndex.add(0);
+    createFilesWithTextValue(1, deviceIndex, measurementIndex, 300, 800, 0, true, true);
+
+    measurementIndex.clear();
+    measurementIndex.add(1);
+    measurementIndex.add(4);
+    createFilesWithTextValue(1, deviceIndex, measurementIndex, 200, 100, 0, true, false);
+
+    measurementIndex.clear();
+    measurementIndex.add(0);
+    measurementIndex.add(2);
+    createFilesWithTextValue(1, deviceIndex, measurementIndex, 200, 400, 0, true, false);
+
+    List<TsFileResource> targetResources =
+        CompactionFileGeneratorUtils.getCrossCompactionTargetTsFileResources(seqResources);
+    ICompactionPerformer performer =
+        new ReadPointCompactionPerformer(seqResources, unseqResources, targetResources);
+    performer.setSummary(new CompactionTaskSummary());
+    performer.perform();
+    CompactionUtils.moveTargetFile(targetResources, false, COMPACTION_TEST_SG);
+
+    List<String> deviceIdList = new ArrayList<>();
+    deviceIdList.add(
+        COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + TsFileGeneratorUtils.getAlignDeviceOffset());
+    deviceIdList.add(
+        COMPACTION_TEST_SG
+            + PATH_SEPARATOR
+            + "d"
+            + (TsFileGeneratorUtils.getAlignDeviceOffset() + 1));
+    deviceIdList.add(
+        COMPACTION_TEST_SG
+            + PATH_SEPARATOR
+            + "d"
+            + (TsFileGeneratorUtils.getAlignDeviceOffset() + 2));
+    deviceIdList.add(
+        COMPACTION_TEST_SG
+            + PATH_SEPARATOR
+            + "d"
+            + (TsFileGeneratorUtils.getAlignDeviceOffset() + 3));
+    deviceIdList.add(
+        COMPACTION_TEST_SG
+            + PATH_SEPARATOR
+            + "d"
+            + (TsFileGeneratorUtils.getAlignDeviceOffset() + 4));
+    for (int i = 0; i < 3; i++) {
+      Assert.assertTrue(
+          targetResources
+              .get(i)
+              .isDeviceIdExist(
+                  COMPACTION_TEST_SG
+                      + PATH_SEPARATOR
+                      + "d"
+                      + (TsFileGeneratorUtils.getAlignDeviceOffset())));
+      Assert.assertTrue(
+          targetResources
+              .get(i)
+              .isDeviceIdExist(
+                  COMPACTION_TEST_SG
+                      + PATH_SEPARATOR
+                      + "d"
+                      + (TsFileGeneratorUtils.getAlignDeviceOffset() + 1)));
+      Assert.assertTrue(
+          targetResources
+              .get(i)
+              .isDeviceIdExist(
+                  COMPACTION_TEST_SG
+                      + PATH_SEPARATOR
+                      + "d"
+                      + (TsFileGeneratorUtils.getAlignDeviceOffset() + 2)));
+      Assert.assertTrue(
+          targetResources
+              .get(i)
+              .isDeviceIdExist(
+                  COMPACTION_TEST_SG
+                      + PATH_SEPARATOR
+                      + "d"
+                      + (TsFileGeneratorUtils.getAlignDeviceOffset() + 3)));
+      Assert.assertTrue(
+          targetResources
+              .get(i)
+              .isDeviceIdExist(
+                  COMPACTION_TEST_SG
+                      + PATH_SEPARATOR
+                      + "d"
+                      + (TsFileGeneratorUtils.getAlignDeviceOffset() + 4)));
+      check(targetResources.get(i), deviceIdList);
+    }
+
+    Map<String, Long> measurementMaxTime = new HashMap<>();
+
+    for (int i = 0; i < 5; i++) {
+      for (int j = 0; j < 5; j++) {
+        measurementMaxTime.putIfAbsent(
+            COMPACTION_TEST_SG
+                + PATH_SEPARATOR
+                + "d"
+                + (TsFileGeneratorUtils.getAlignDeviceOffset() + i)
+                + PATH_SEPARATOR
+                + "s"
+                + j,
+            Long.MIN_VALUE);
+        List<IMeasurementSchema> schemas = new ArrayList<>();
+        schemas.add(new MeasurementSchema("s" + j, TSDataType.TEXT));
+        AlignedPath path =
+            new AlignedPath(
+                COMPACTION_TEST_SG
+                    + PATH_SEPARATOR
+                    + "d"
+                    + (TsFileGeneratorUtils.getAlignDeviceOffset() + i),
+                Collections.singletonList("s" + j),
+                schemas);
+        IDataBlockReader tsBlockReader =
+            new SeriesDataBlockReader(
+                path,
+                TSDataType.TEXT,
+                FragmentInstanceContext.createFragmentInstanceContextForCompaction(
+                    EnvironmentUtils.TEST_QUERY_CONTEXT.getQueryId()),
+                targetResources,
+                new ArrayList<>(),
+                true);
+        int count = 0;
+        while (tsBlockReader.hasNextBatch()) {
+          TsBlock block = tsBlockReader.nextBatch();
+          IBatchDataIterator iterator = block.getTsBlockSingleColumnIterator();
+          while (iterator.hasNext()) {
+            if (measurementMaxTime.get(
+                    COMPACTION_TEST_SG
+                        + PATH_SEPARATOR
+                        + "d"
+                        + (TsFileGeneratorUtils.getAlignDeviceOffset() + i)
+                        + PATH_SEPARATOR
+                        + "s"
+                        + j)
+                >= iterator.currentTime()) {
+              Assert.fail();
+            }
+            measurementMaxTime.put(
+                COMPACTION_TEST_SG
+                    + PATH_SEPARATOR
+                    + "d"
+                    + (TsFileGeneratorUtils.getAlignDeviceOffset() + i)
+                    + PATH_SEPARATOR
+                    + "s"
+                    + j,
+                iterator.currentTime());
+            count++;
+            iterator.next();
+          }
+        }
+        tsBlockReader.close();
+        if (j == 0 || j == 2) {
+          assertEquals(800, count);
+        } else if (j == 1 || j == 4) {
+          assertEquals(500, count);
+        } else {
+          assertEquals(300, count);
+        }
+      }
+    }
+  }
+
+  /**
    * Total 4 seq files and 5 unseq files, each file has different aligned timeseries.
    *
    * <p>Seq files<br>
@@ -3538,6 +4750,7 @@
     }
     generateModsFile(seriesPaths, seqResources, Long.MIN_VALUE, Long.MAX_VALUE);
     generateModsFile(seriesPaths, unseqResources, Long.MIN_VALUE, Long.MAX_VALUE);
+    deleteTimeseriesInMManager(seriesPaths);
 
     for (TsFileResource resource : seqResources) {
       resource.setTimeIndexType((byte) 2);
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/TimeseriesMetadata.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/TimeseriesMetadata.java
index 58b26db..db7072c 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/TimeseriesMetadata.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/TimeseriesMetadata.java
@@ -32,6 +32,7 @@
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Set;
 
 public class TimeseriesMetadata implements ITimeSeriesMetadata {
 
@@ -123,6 +124,42 @@
   }
 
   /**
+   * Return null if excludedMeasurements contains the measurementId without deserializing chunk
+   * metadata.
+   */
+  public static TimeseriesMetadata deserializeFrom(
+      ByteBuffer buffer, Set<String> excludedMeasurements, boolean needChunkMetadata) {
+    byte timeseriesType = ReadWriteIOUtils.readByte(buffer);
+    String measurementID = ReadWriteIOUtils.readVarIntString(buffer);
+    TSDataType tsDataType = ReadWriteIOUtils.readDataType(buffer);
+    int chunkMetaDataListDataSize = ReadWriteForEncodingUtils.readUnsignedVarInt(buffer);
+    Statistics<? extends Serializable> statistics = Statistics.deserialize(buffer, tsDataType);
+    if (excludedMeasurements.contains(measurementID)) {
+      buffer.position(buffer.position() + chunkMetaDataListDataSize);
+      return null;
+    }
+    TimeseriesMetadata timeseriesMetaData = new TimeseriesMetadata();
+    timeseriesMetaData.setTimeSeriesMetadataType(timeseriesType);
+    timeseriesMetaData.setMeasurementId(measurementID);
+    timeseriesMetaData.setTSDataType(tsDataType);
+    timeseriesMetaData.setDataSizeOfChunkMetaDataList(chunkMetaDataListDataSize);
+    timeseriesMetaData.setStatistics(statistics);
+    if (needChunkMetadata) {
+      ByteBuffer byteBuffer = buffer.slice();
+      byteBuffer.limit(chunkMetaDataListDataSize);
+      timeseriesMetaData.chunkMetadataList = new ArrayList<>();
+      while (byteBuffer.hasRemaining()) {
+        timeseriesMetaData.chunkMetadataList.add(
+            ChunkMetadata.deserializeFrom(byteBuffer, timeseriesMetaData));
+      }
+      // minimize the storage of an ArrayList instance.
+      timeseriesMetaData.chunkMetadataList.trimToSize();
+    }
+    buffer.position(buffer.position() + chunkMetaDataListDataSize);
+    return timeseriesMetaData;
+  }
+
+  /**
    * serialize to outputStream.
    *
    * @param outputStream outputStream
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileDeviceIterator.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileDeviceIterator.java
index 514d16e..da511cc 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileDeviceIterator.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileDeviceIterator.java
@@ -24,9 +24,7 @@
 import org.apache.iotdb.tsfile.utils.Pair;
 
 import java.io.IOException;
-import java.util.ArrayList;
 import java.util.Iterator;
-import java.util.List;
 import java.util.NoSuchElementException;
 import java.util.Queue;
 
@@ -34,6 +32,7 @@
   private final TsFileSequenceReader reader;
   private final Queue<Pair<String, Pair<Long, Long>>> queue;
   private Pair<String, Boolean> currentDevice = null;
+  private MetadataIndexNode measurementNode;
 
   public TsFileDeviceIterator(
       TsFileSequenceReader reader, Queue<Pair<String, Pair<Long, Long>>> queue) {
@@ -56,13 +55,12 @@
       throw new NoSuchElementException();
     }
     Pair<String, Pair<Long, Long>> startEndPair = queue.remove();
-    List<Pair<String, Boolean>> devices = new ArrayList<>();
     try {
-      MetadataIndexNode measurementNode =
+      // first measurement node of this device
+      this.measurementNode =
           MetadataIndexNode.deserializeFrom(
               reader.readData(startEndPair.right.left, startEndPair.right.right));
-      // if tryToGetFirstTimeseriesMetadata(node) returns null, the device is not aligned
-      boolean isAligned = reader.tryToGetFirstTimeseriesMetadata(measurementNode) != null;
+      boolean isAligned = reader.isAlignedDevice(measurementNode);
       currentDevice = new Pair<>(startEndPair.left, isAligned);
       return currentDevice;
     } catch (IOException e) {
@@ -70,4 +68,8 @@
           "Error occurred while reading a time series metadata block.");
     }
   }
+
+  public MetadataIndexNode getMeasurementNode() {
+    return measurementNode;
+  }
 }
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java
index 2c1020c..0c80ddf 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java
@@ -801,6 +801,15 @@
     }
   }
 
+  /**
+   * Check whether the deivce is aligned or not.
+   *
+   * @param measurementNode the next measurement layer node of specific device node
+   */
+  public boolean isAlignedDevice(MetadataIndexNode measurementNode) {
+    return "".equals(measurementNode.getChildren().get(0).getName());
+  }
+
   TimeseriesMetadata tryToGetFirstTimeseriesMetadata(MetadataIndexNode measurementNode)
       throws IOException {
     // Not aligned timeseries
@@ -834,6 +843,48 @@
   }
 
   /**
+   * Get timeseries metadata under the measurementNode and put them into timeseriesMetadataList.
+   * Skip timeseries whose measurementId is in the excludedMeasurementIds.
+   *
+   * @param measurementNode next layer measurement node of specific device leaf node
+   * @param excludedMeasurementIds skip timeseries whose measurementId is in the set
+   */
+  public void getDeviceTimeseriesMetadata(
+      List<TimeseriesMetadata> timeseriesMetadataList,
+      MetadataIndexNode measurementNode,
+      Set<String> excludedMeasurementIds,
+      boolean needChunkMetadata)
+      throws IOException {
+    int metadataIndexListSize = measurementNode.getChildren().size();
+    for (int i = 0; i < metadataIndexListSize; i++) {
+      long endOffset = measurementNode.getEndOffset();
+      if (i != metadataIndexListSize - 1) {
+        endOffset = measurementNode.getChildren().get(i + 1).getOffset();
+      }
+      ByteBuffer nextBuffer = readData(measurementNode.getChildren().get(i).getOffset(), endOffset);
+      if (measurementNode.getNodeType().equals(MetadataIndexNodeType.LEAF_MEASUREMENT)) {
+        // leaf measurement node
+        while (nextBuffer.hasRemaining()) {
+          TimeseriesMetadata timeseriesMetadata =
+              TimeseriesMetadata.deserializeFrom(
+                  nextBuffer, excludedMeasurementIds, needChunkMetadata);
+          if (timeseriesMetadata != null) {
+            timeseriesMetadataList.add(timeseriesMetadata);
+          }
+        }
+      } else {
+        // internal measurement node
+        MetadataIndexNode nextLayerMeasurementNode = MetadataIndexNode.deserializeFrom(nextBuffer);
+        getDeviceTimeseriesMetadata(
+            timeseriesMetadataList,
+            nextLayerMeasurementNode,
+            excludedMeasurementIds,
+            needChunkMetadata);
+      }
+    }
+  }
+
+  /**
    * Traverse the metadata index from MetadataIndexEntry to get TimeseriesMetadatas
    *
    * @param metadataIndex MetadataIndexEntry
@@ -1121,6 +1172,22 @@
         header, buffer, chunkCacheKey.getDeleteIntervalList(), chunkCacheKey.getStatistics());
   }
 
+  /** Get measurement schema by chunkMetadatas. */
+  public MeasurementSchema getMeasurementSchema(List<IChunkMetadata> chunkMetadataList)
+      throws IOException {
+    if (chunkMetadataList.isEmpty()) {
+      return null;
+    }
+    IChunkMetadata lastChunkMetadata = chunkMetadataList.get(chunkMetadataList.size() - 1);
+    int chunkHeadSize = ChunkHeader.getSerializedSize(lastChunkMetadata.getMeasurementUid());
+    ChunkHeader header = readChunkHeader(lastChunkMetadata.getOffsetOfChunkHeader(), chunkHeadSize);
+    return new MeasurementSchema(
+        lastChunkMetadata.getMeasurementUid(),
+        header.getDataType(),
+        header.getEncodingType(),
+        header.getCompressionType());
+  }
+
   /**
    * not thread safe.
    *
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/TsFileGeneratorUtils.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/TsFileGeneratorUtils.java
index 919851e..85bf39a 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/TsFileGeneratorUtils.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/TsFileGeneratorUtils.java
@@ -30,6 +30,7 @@
 import org.apache.iotdb.tsfile.write.record.Tablet;
 import org.apache.iotdb.tsfile.write.record.datapoint.DataPoint;
 import org.apache.iotdb.tsfile.write.record.datapoint.LongDataPoint;
+import org.apache.iotdb.tsfile.write.record.datapoint.StringDataPoint;
 import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
 import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
 
@@ -278,6 +279,112 @@
     }
   }
 
+  public static File generateAlignedTsFileWithTextValues(
+      String filePath,
+      List<Integer> deviceIndex,
+      List<Integer> measurementIndex,
+      int pointNum,
+      int startTime,
+      int chunkGroupSize,
+      int pageSize)
+      throws IOException, WriteProcessException {
+    File file = fsFactory.getFile(filePath);
+    if (file.exists()) {
+      file.delete();
+    }
+    if (chunkGroupSize > 0)
+      TSFileDescriptor.getInstance().getConfig().setGroupSizeInByte(chunkGroupSize);
+    if (pageSize > 0)
+      TSFileDescriptor.getInstance().getConfig().setMaxNumberOfPointsInPage(pageSize);
+    try (TsFileWriter tsFileWriter = new TsFileWriter(file)) {
+      // register align timeseries
+      List<MeasurementSchema> alignedMeasurementSchemas = new ArrayList<>();
+      for (int i = 0; i < measurementIndex.size(); i++) {
+        alignedMeasurementSchemas.add(
+            new MeasurementSchema(
+                "s" + measurementIndex.get(i), TSDataType.TEXT, TSEncoding.PLAIN));
+      }
+      for (int i = 0; i < deviceIndex.size(); i++) {
+        tsFileWriter.registerAlignedTimeseries(
+            new Path(
+                testStorageGroup + PATH_SEPARATOR + "d" + (deviceIndex.get(i) + alignDeviceOffset)),
+            alignedMeasurementSchemas);
+      }
+
+      // write with record
+      for (int i = 0; i < deviceIndex.size(); i++) {
+        for (long time = startTime; time < pointNum + startTime; time++) {
+          // construct TsRecord
+          TSRecord tsRecord =
+              new TSRecord(
+                  time,
+                  testStorageGroup
+                      + PATH_SEPARATOR
+                      + "d"
+                      + (deviceIndex.get(i) + alignDeviceOffset));
+          for (IMeasurementSchema schema : alignedMeasurementSchemas) {
+            DataPoint dPoint =
+                new StringDataPoint(schema.getMeasurementId(), new Binary("textValue"));
+            tsRecord.addTuple(dPoint);
+          }
+          // write
+          tsFileWriter.writeAligned(tsRecord);
+        }
+      }
+    }
+    return file;
+  }
+
+  public static File generateNonAlignedTsFileWithTextValues(
+      String filePath,
+      List<Integer> deviceIndex,
+      List<Integer> measurementIndex,
+      int pointNum,
+      int startTime,
+      int chunkGroupSize,
+      int pageSize)
+      throws IOException, WriteProcessException {
+    File file = fsFactory.getFile(filePath);
+    if (file.exists()) {
+      file.delete();
+    }
+    if (chunkGroupSize > 0)
+      TSFileDescriptor.getInstance().getConfig().setGroupSizeInByte(chunkGroupSize);
+    if (pageSize > 0)
+      TSFileDescriptor.getInstance().getConfig().setMaxNumberOfPointsInPage(pageSize);
+    try (TsFileWriter tsFileWriter = new TsFileWriter(file)) {
+      // register nonAlign timeseries
+      List<MeasurementSchema> measurementSchemas = new ArrayList<>();
+      for (int i = 0; i < measurementIndex.size(); i++) {
+        measurementSchemas.add(
+            new MeasurementSchema(
+                "s" + measurementIndex.get(i), TSDataType.TEXT, TSEncoding.PLAIN));
+      }
+      for (int i = 0; i < deviceIndex.size(); i++) {
+        tsFileWriter.registerTimeseries(
+            new Path(testStorageGroup + PATH_SEPARATOR + "d" + deviceIndex.get(i)),
+            measurementSchemas);
+      }
+
+      // write with record
+      for (int i = 0; i < deviceIndex.size(); i++) {
+        for (long time = startTime; time < pointNum + startTime; time++) {
+          // construct TsRecord
+          TSRecord tsRecord =
+              new TSRecord(time, testStorageGroup + PATH_SEPARATOR + "d" + deviceIndex.get(i));
+          for (IMeasurementSchema schema : measurementSchemas) {
+            DataPoint dPoint =
+                new StringDataPoint(schema.getMeasurementId(), new Binary("textValue"));
+            tsRecord.addTuple(dPoint);
+          }
+          // write
+          tsFileWriter.write(tsRecord);
+        }
+      }
+      return file;
+    }
+  }
+
   public static String getTsFilePath(String fileParentPath, long tsFileVersion) {
     String fileName =
         System.currentTimeMillis()