Avoid OOM caused by modifications when the mods file contains many deletion entries
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/QueryContext.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/QueryContext.java
index 60ee9ba..3b9b604 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/QueryContext.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/QueryContext.java
@@ -33,18 +33,11 @@
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
 
 /** QueryContext contains the shared information with in a query. */
 public class QueryContext {
 
   /**
-   * The outer key is the path of a ModificationFile, the inner key in the name of a timeseries and
-   * the value is the Modifications of a timeseries in this file.
-   */
-  private final Map<String, Map<String, List<Modification>>> filePathModCache =
-      new ConcurrentHashMap<>();
-  /**
    * The key is the path of a ModificationFile and the value is all Modifications in this file. We
    * use this field because each call of Modification.getModifications() return a copy of the
    * Modifications, and we do not want it to create multiple copies within a query.
@@ -86,22 +79,17 @@
     if (!modFile.exists()) {
       return Collections.emptyList();
     }
-    Map<String, List<Modification>> fileModifications =
-        filePathModCache.computeIfAbsent(modFile.getFilePath(), k -> new ConcurrentHashMap<>());
-    return fileModifications.computeIfAbsent(
-        path.getFullPath(),
-        k -> {
-          PatternTreeMap<Modification, ModsSerializer> allModifications =
-              fileModCache.get(modFile.getFilePath());
-          if (allModifications == null) {
-            allModifications = PatternTreeMapFactory.getModsPatternTreeMap();
-            for (Modification modification : modFile.getModificationsIter()) {
-              allModifications.append(modification.getPath(), modification);
-            }
-            fileModCache.put(modFile.getFilePath(), allModifications);
-          }
-          return ModificationFile.sortAndMerge(allModifications.getOverlapped(path));
-        });
+
+    PatternTreeMap<Modification, ModsSerializer> allModifications =
+        fileModCache.get(modFile.getFilePath());
+    if (allModifications == null) {
+      allModifications = PatternTreeMapFactory.getModsPatternTreeMap();
+      for (Modification modification : modFile.getModificationsIter()) {
+        allModifications.append(modification.getPath(), modification);
+      }
+      fileModCache.put(modFile.getFilePath(), allModifications);
+    }
+    return ModificationFile.sortAndMerge(allModifications.getOverlapped(path));
   }
 
   /**
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/utils/ResourceByPathUtils.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/utils/ResourceByPathUtils.java
index f17c3f9..b041ef0 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/utils/ResourceByPathUtils.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/utils/ResourceByPathUtils.java
@@ -140,8 +140,10 @@
     }
 
     boolean[] exist = new boolean[partialPath.getSchemaList().size()];
+    boolean modified = false;
     for (IChunkMetadata chunkMetadata : chunkMetadataList) {
       AlignedChunkMetadata alignedChunkMetadata = (AlignedChunkMetadata) chunkMetadata;
+      modified = (modified || alignedChunkMetadata.isModified());
       timeStatistics.mergeStatistics(alignedChunkMetadata.getTimeChunkMetadata().getStatistics());
       for (int i = 0; i < valueTimeSeriesMetadataList.size(); i++) {
         if (alignedChunkMetadata.getValueChunkMetadataList().get(i) != null) {
@@ -172,7 +174,9 @@
         }
       }
     }
+
     timeTimeSeriesMetadata.setStatistics(timeStatistics);
+    timeTimeSeriesMetadata.setModified(modified);
 
     for (int i = 0; i < valueTimeSeriesMetadataList.size(); i++) {
       if (!exist[i]) {
@@ -318,8 +322,7 @@
    */
   @Override
   public ITimeSeriesMetadata generateTimeSeriesMetadata(
-      List<ReadOnlyMemChunk> readOnlyMemChunk, List<IChunkMetadata> chunkMetadataList)
-      throws IOException {
+      List<ReadOnlyMemChunk> readOnlyMemChunk, List<IChunkMetadata> chunkMetadataList) {
     TimeseriesMetadata timeSeriesMetadata = new TimeseriesMetadata();
     timeSeriesMetadata.setMeasurementId(partialPath.getMeasurementSchema().getMeasurementId());
     timeSeriesMetadata.setTsDataType(partialPath.getMeasurementSchema().getType());
@@ -329,7 +332,9 @@
     Statistics<? extends Serializable> seriesStatistics =
         Statistics.getStatsByType(timeSeriesMetadata.getTsDataType());
     // flush chunkMetadataList one by one
+    boolean isModified = false;
     for (IChunkMetadata chunkMetadata : chunkMetadataList) {
+      isModified = (isModified || chunkMetadata.isModified());
       seriesStatistics.mergeStatistics(chunkMetadata.getStatistics());
     }
 
@@ -339,6 +344,7 @@
       }
     }
     timeSeriesMetadata.setStatistics(seriesStatistics);
+    timeSeriesMetadata.setModified(isModified);
     return timeSeriesMetadata;
   }
 
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/reader/chunk/metadata/DiskAlignedChunkMetadataLoader.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/reader/chunk/metadata/DiskAlignedChunkMetadataLoader.java
index 95dbfd6..43ffd69 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/reader/chunk/metadata/DiskAlignedChunkMetadataLoader.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/reader/chunk/metadata/DiskAlignedChunkMetadataLoader.java
@@ -57,6 +57,9 @@
   // it's only exact while using limit & offset push down
   private final boolean queryAllSensors;
 
+  // all sub sensors' modifications
+  private final List<List<Modification>> pathModifications;
+
   private static final Logger DEBUG_LOGGER = LoggerFactory.getLogger("QUERY_DEBUG");
   private static final SeriesScanCostMetricSet SERIES_SCAN_COST_METRIC_SET =
       SeriesScanCostMetricSet.getInstance();
@@ -66,12 +69,14 @@
       AlignedPath seriesPath,
       QueryContext context,
       Filter filter,
-      boolean queryAllSensors) {
+      boolean queryAllSensors,
+      List<List<Modification>> pathModifications) {
     this.resource = resource;
     this.seriesPath = seriesPath;
     this.context = context;
     this.filter = filter;
     this.queryAllSensors = queryAllSensors;
+    this.pathModifications = pathModifications;
   }
 
   @Override
@@ -82,9 +87,6 @@
           ((AlignedTimeSeriesMetadata) timeSeriesMetadata).getCopiedChunkMetadataList();
 
       final long t2 = System.nanoTime();
-      // get all sub sensors' modifications
-      List<List<Modification>> pathModifications =
-          context.getPathModifications(resource.getModFile(), seriesPath);
 
       if (context.isDebug()) {
         DEBUG_LOGGER.info(
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/reader/chunk/metadata/DiskChunkMetadataLoader.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/reader/chunk/metadata/DiskChunkMetadataLoader.java
index 1a43189..b33a7e6 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/reader/chunk/metadata/DiskChunkMetadataLoader.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/reader/chunk/metadata/DiskChunkMetadataLoader.java
@@ -19,7 +19,6 @@
 
 package org.apache.iotdb.db.storageengine.dataregion.read.reader.chunk.metadata;
 
-import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.db.queryengine.execution.fragment.QueryContext;
 import org.apache.iotdb.db.queryengine.metric.SeriesScanCostMetricSet;
 import org.apache.iotdb.db.storageengine.dataregion.modification.Modification;
@@ -44,21 +43,25 @@
 public class DiskChunkMetadataLoader implements IChunkMetadataLoader {
 
   private final TsFileResource resource;
-  private final PartialPath seriesPath;
   private final QueryContext context;
   // time filter or value filter, only used to check time range
   private final Filter filter;
 
+  private final List<Modification> pathModifications;
+
   private static final Logger DEBUG_LOGGER = LoggerFactory.getLogger("QUERY_DEBUG");
   private static final SeriesScanCostMetricSet SERIES_SCAN_COST_METRIC_SET =
       SeriesScanCostMetricSet.getInstance();
 
   public DiskChunkMetadataLoader(
-      TsFileResource resource, PartialPath seriesPath, QueryContext context, Filter filter) {
+      TsFileResource resource,
+      QueryContext context,
+      Filter filter,
+      List<Modification> pathModifications) {
     this.resource = resource;
-    this.seriesPath = seriesPath;
     this.context = context;
     this.filter = filter;
+    this.pathModifications = pathModifications;
   }
 
   @Override
@@ -69,8 +72,6 @@
           ((TimeseriesMetadata) timeSeriesMetadata).getCopiedChunkMetadataList();
 
       final long t2 = System.nanoTime();
-      List<Modification> pathModifications =
-          context.getPathModifications(resource.getModFile(), seriesPath);
 
       if (context.isDebug()) {
         DEBUG_LOGGER.info(
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/FileLoaderUtils.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/FileLoaderUtils.java
index 2d6d538..8463ce2 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/FileLoaderUtils.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/FileLoaderUtils.java
@@ -149,8 +149,11 @@
                     resource.getTimeIndexType() != 1,
                     context.isDebug());
         if (timeSeriesMetadata != null) {
+          List<Modification> pathModifications =
+              context.getPathModifications(resource.getModFile(), seriesPath);
+          timeSeriesMetadata.setModified(!pathModifications.isEmpty());
           timeSeriesMetadata.setChunkMetadataLoader(
-              new DiskChunkMetadataLoader(resource, seriesPath, context, filter));
+              new DiskChunkMetadataLoader(resource, context, filter, pathModifications));
         }
       } else { // if the tsfile is unclosed, we just get it directly from TsFileResource
         loadFromMem = true;
@@ -165,9 +168,6 @@
       if (timeSeriesMetadata != null) {
         long t2 = System.nanoTime();
         try {
-          List<Modification> pathModifications =
-              context.getPathModifications(resource.getModFile(), seriesPath);
-          timeSeriesMetadata.setModified(!pathModifications.isEmpty());
           if (timeSeriesMetadata.getStatistics().getStartTime()
               > timeSeriesMetadata.getStatistics().getEndTime()) {
             return null;
@@ -224,6 +224,7 @@
           alignedTimeSeriesMetadata.setChunkMetadataLoader(
               new MemAlignedChunkMetadataLoader(
                   resource, alignedPath, context, filter, queryAllSensors));
+          // in-memory modifications were already applied when the chunk metadata was generated
         }
       }
 
@@ -240,9 +241,6 @@
                   alignedTimeSeriesMetadata.getTimeseriesMetadata().getStatistics().getEndTime())) {
             return null;
           }
-
-          // set modifications to each aligned path
-          setModifications(resource, alignedTimeSeriesMetadata, alignedPath, context);
         } finally {
           SERIES_SCAN_COST_METRIC_SET.recordSeriesScanCost(
               TIMESERIES_METADATA_MODIFICATION_ALIGNED, System.nanoTime() - t2);
@@ -292,7 +290,7 @@
             new AlignedTimeSeriesMetadata(timeColumn, Collections.emptyList());
         alignedTimeSeriesMetadata.setChunkMetadataLoader(
             new DiskAlignedChunkMetadataLoader(
-                resource, alignedPath, context, filter, queryAllSensors));
+                resource, alignedPath, context, filter, queryAllSensors, Collections.emptyList()));
       } else {
         List<TimeseriesMetadata> valueTimeSeriesMetadataList =
             new ArrayList<>(valueMeasurementList.size());
@@ -309,24 +307,29 @@
           valueTimeSeriesMetadataList.add(valueColumn);
         }
         if (exist) {
+          // set modifications to each aligned path
           alignedTimeSeriesMetadata =
               new AlignedTimeSeriesMetadata(timeColumn, valueTimeSeriesMetadataList);
+          List<List<Modification>> pathModifications =
+              setModifications(resource, alignedTimeSeriesMetadata, alignedPath, context);
+
           alignedTimeSeriesMetadata.setChunkMetadataLoader(
               new DiskAlignedChunkMetadataLoader(
-                  resource, alignedPath, context, filter, queryAllSensors));
+                  resource, alignedPath, context, filter, queryAllSensors, pathModifications));
         }
       }
     }
     return alignedTimeSeriesMetadata;
   }
 
-  private static void setModifications(
+  private static List<List<Modification>> setModifications(
       TsFileResource resource,
       AlignedTimeSeriesMetadata alignedTimeSeriesMetadata,
       AlignedPath alignedPath,
       QueryContext context) {
     List<TimeseriesMetadata> valueTimeSeriesMetadataList =
         alignedTimeSeriesMetadata.getValueTimeseriesMetadataList();
+    List<List<Modification>> res = new ArrayList<>();
     boolean modified = false;
     for (int i = 0; i < valueTimeSeriesMetadataList.size(); i++) {
       if (valueTimeSeriesMetadataList.get(i) != null) {
@@ -334,10 +337,12 @@
             context.getPathModifications(
                 resource.getModFile(), alignedPath.getPathWithMeasurement(i));
         valueTimeSeriesMetadataList.get(i).setModified(!pathModifications.isEmpty());
+        res.add(pathModifications);
         modified = (modified || !pathModifications.isEmpty());
       }
     }
     alignedTimeSeriesMetadata.getTimeseriesMetadata().setModified(modified);
+    return res;
   }
 
   /**