Avoid OOM caused by modifications when the mods file contains many deletion entries
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/QueryContext.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/QueryContext.java index 60ee9ba..3b9b604 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/QueryContext.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/QueryContext.java
@@ -33,18 +33,11 @@ import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; /** QueryContext contains the shared information with in a query. */ public class QueryContext { /** - * The outer key is the path of a ModificationFile, the inner key in the name of a timeseries and - * the value is the Modifications of a timeseries in this file. - */ - private final Map<String, Map<String, List<Modification>>> filePathModCache = - new ConcurrentHashMap<>(); - /** * The key is the path of a ModificationFile and the value is all Modifications in this file. We * use this field because each call of Modification.getModifications() return a copy of the * Modifications, and we do not want it to create multiple copies within a query. @@ -86,22 +79,17 @@ if (!modFile.exists()) { return Collections.emptyList(); } - Map<String, List<Modification>> fileModifications = - filePathModCache.computeIfAbsent(modFile.getFilePath(), k -> new ConcurrentHashMap<>()); - return fileModifications.computeIfAbsent( - path.getFullPath(), - k -> { - PatternTreeMap<Modification, ModsSerializer> allModifications = - fileModCache.get(modFile.getFilePath()); - if (allModifications == null) { - allModifications = PatternTreeMapFactory.getModsPatternTreeMap(); - for (Modification modification : modFile.getModificationsIter()) { - allModifications.append(modification.getPath(), modification); - } - fileModCache.put(modFile.getFilePath(), allModifications); - } - return ModificationFile.sortAndMerge(allModifications.getOverlapped(path)); - }); + + PatternTreeMap<Modification, ModsSerializer> allModifications = + fileModCache.get(modFile.getFilePath()); + if (allModifications == null) { + allModifications = PatternTreeMapFactory.getModsPatternTreeMap(); + for (Modification modification : modFile.getModificationsIter()) { + allModifications.append(modification.getPath(), modification); + } + fileModCache.put(modFile.getFilePath(), 
allModifications); + } + return ModificationFile.sortAndMerge(allModifications.getOverlapped(path)); } /**
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/utils/ResourceByPathUtils.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/utils/ResourceByPathUtils.java index f17c3f9..b041ef0 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/utils/ResourceByPathUtils.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/utils/ResourceByPathUtils.java
@@ -140,8 +140,10 @@ } boolean[] exist = new boolean[partialPath.getSchemaList().size()]; + boolean modified = false; for (IChunkMetadata chunkMetadata : chunkMetadataList) { AlignedChunkMetadata alignedChunkMetadata = (AlignedChunkMetadata) chunkMetadata; + modified = (modified || alignedChunkMetadata.isModified()); timeStatistics.mergeStatistics(alignedChunkMetadata.getTimeChunkMetadata().getStatistics()); for (int i = 0; i < valueTimeSeriesMetadataList.size(); i++) { if (alignedChunkMetadata.getValueChunkMetadataList().get(i) != null) { @@ -172,7 +174,9 @@ } } } + timeTimeSeriesMetadata.setStatistics(timeStatistics); + timeTimeSeriesMetadata.setModified(modified); for (int i = 0; i < valueTimeSeriesMetadataList.size(); i++) { if (!exist[i]) { @@ -318,8 +322,7 @@ */ @Override public ITimeSeriesMetadata generateTimeSeriesMetadata( - List<ReadOnlyMemChunk> readOnlyMemChunk, List<IChunkMetadata> chunkMetadataList) - throws IOException { + List<ReadOnlyMemChunk> readOnlyMemChunk, List<IChunkMetadata> chunkMetadataList) { TimeseriesMetadata timeSeriesMetadata = new TimeseriesMetadata(); timeSeriesMetadata.setMeasurementId(partialPath.getMeasurementSchema().getMeasurementId()); timeSeriesMetadata.setTsDataType(partialPath.getMeasurementSchema().getType()); @@ -329,7 +332,9 @@ Statistics<? extends Serializable> seriesStatistics = Statistics.getStatsByType(timeSeriesMetadata.getTsDataType()); // flush chunkMetadataList one by one + boolean isModified = false; for (IChunkMetadata chunkMetadata : chunkMetadataList) { + isModified = (isModified || chunkMetadata.isModified()); seriesStatistics.mergeStatistics(chunkMetadata.getStatistics()); } @@ -339,6 +344,7 @@ } } timeSeriesMetadata.setStatistics(seriesStatistics); + timeSeriesMetadata.setModified(isModified); return timeSeriesMetadata; }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/reader/chunk/metadata/DiskAlignedChunkMetadataLoader.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/reader/chunk/metadata/DiskAlignedChunkMetadataLoader.java index 95dbfd6..43ffd69 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/reader/chunk/metadata/DiskAlignedChunkMetadataLoader.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/reader/chunk/metadata/DiskAlignedChunkMetadataLoader.java
@@ -57,6 +57,9 @@ // it's only exact while using limit & offset push down private final boolean queryAllSensors; + // all sub sensors' modifications + private final List<List<Modification>> pathModifications; + private static final Logger DEBUG_LOGGER = LoggerFactory.getLogger("QUERY_DEBUG"); private static final SeriesScanCostMetricSet SERIES_SCAN_COST_METRIC_SET = SeriesScanCostMetricSet.getInstance(); @@ -66,12 +69,14 @@ AlignedPath seriesPath, QueryContext context, Filter filter, - boolean queryAllSensors) { + boolean queryAllSensors, + List<List<Modification>> pathModifications) { this.resource = resource; this.seriesPath = seriesPath; this.context = context; this.filter = filter; this.queryAllSensors = queryAllSensors; + this.pathModifications = pathModifications; } @Override @@ -82,9 +87,6 @@ ((AlignedTimeSeriesMetadata) timeSeriesMetadata).getCopiedChunkMetadataList(); final long t2 = System.nanoTime(); - // get all sub sensors' modifications - List<List<Modification>> pathModifications = - context.getPathModifications(resource.getModFile(), seriesPath); if (context.isDebug()) { DEBUG_LOGGER.info(
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/reader/chunk/metadata/DiskChunkMetadataLoader.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/reader/chunk/metadata/DiskChunkMetadataLoader.java index 1a43189..b33a7e6 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/reader/chunk/metadata/DiskChunkMetadataLoader.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/reader/chunk/metadata/DiskChunkMetadataLoader.java
@@ -19,7 +19,6 @@ package org.apache.iotdb.db.storageengine.dataregion.read.reader.chunk.metadata; -import org.apache.iotdb.commons.path.PartialPath; import org.apache.iotdb.db.queryengine.execution.fragment.QueryContext; import org.apache.iotdb.db.queryengine.metric.SeriesScanCostMetricSet; import org.apache.iotdb.db.storageengine.dataregion.modification.Modification; @@ -44,21 +43,25 @@ public class DiskChunkMetadataLoader implements IChunkMetadataLoader { private final TsFileResource resource; - private final PartialPath seriesPath; private final QueryContext context; // time filter or value filter, only used to check time range private final Filter filter; + private final List<Modification> pathModifications; + private static final Logger DEBUG_LOGGER = LoggerFactory.getLogger("QUERY_DEBUG"); private static final SeriesScanCostMetricSet SERIES_SCAN_COST_METRIC_SET = SeriesScanCostMetricSet.getInstance(); public DiskChunkMetadataLoader( - TsFileResource resource, PartialPath seriesPath, QueryContext context, Filter filter) { + TsFileResource resource, + QueryContext context, + Filter filter, + List<Modification> pathModifications) { this.resource = resource; - this.seriesPath = seriesPath; this.context = context; this.filter = filter; + this.pathModifications = pathModifications; } @Override @@ -69,8 +72,6 @@ ((TimeseriesMetadata) timeSeriesMetadata).getCopiedChunkMetadataList(); final long t2 = System.nanoTime(); - List<Modification> pathModifications = - context.getPathModifications(resource.getModFile(), seriesPath); if (context.isDebug()) { DEBUG_LOGGER.info(
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/FileLoaderUtils.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/FileLoaderUtils.java index 2d6d538..8463ce2 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/FileLoaderUtils.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/FileLoaderUtils.java
@@ -149,8 +149,11 @@ resource.getTimeIndexType() != 1, context.isDebug()); if (timeSeriesMetadata != null) { + List<Modification> pathModifications = + context.getPathModifications(resource.getModFile(), seriesPath); + timeSeriesMetadata.setModified(!pathModifications.isEmpty()); timeSeriesMetadata.setChunkMetadataLoader( - new DiskChunkMetadataLoader(resource, seriesPath, context, filter)); + new DiskChunkMetadataLoader(resource, context, filter, pathModifications)); } } else { // if the tsfile is unclosed, we just get it directly from TsFileResource loadFromMem = true; @@ -165,9 +168,6 @@ if (timeSeriesMetadata != null) { long t2 = System.nanoTime(); try { - List<Modification> pathModifications = - context.getPathModifications(resource.getModFile(), seriesPath); - timeSeriesMetadata.setModified(!pathModifications.isEmpty()); if (timeSeriesMetadata.getStatistics().getStartTime() > timeSeriesMetadata.getStatistics().getEndTime()) { return null; @@ -224,6 +224,7 @@ alignedTimeSeriesMetadata.setChunkMetadataLoader( new MemAlignedChunkMetadataLoader( resource, alignedPath, context, filter, queryAllSensors)); + // mem's modification already done in generating chunkmetadata } } @@ -240,9 +241,6 @@ alignedTimeSeriesMetadata.getTimeseriesMetadata().getStatistics().getEndTime())) { return null; } - - // set modifications to each aligned path - setModifications(resource, alignedTimeSeriesMetadata, alignedPath, context); } finally { SERIES_SCAN_COST_METRIC_SET.recordSeriesScanCost( TIMESERIES_METADATA_MODIFICATION_ALIGNED, System.nanoTime() - t2); @@ -292,7 +290,7 @@ new AlignedTimeSeriesMetadata(timeColumn, Collections.emptyList()); alignedTimeSeriesMetadata.setChunkMetadataLoader( new DiskAlignedChunkMetadataLoader( - resource, alignedPath, context, filter, queryAllSensors)); + resource, alignedPath, context, filter, queryAllSensors, Collections.emptyList())); } else { List<TimeseriesMetadata> valueTimeSeriesMetadataList = new ArrayList<>(valueMeasurementList.size()); @@ 
-309,24 +307,29 @@ valueTimeSeriesMetadataList.add(valueColumn); } if (exist) { + // set modifications to each aligned path alignedTimeSeriesMetadata = new AlignedTimeSeriesMetadata(timeColumn, valueTimeSeriesMetadataList); + List<List<Modification>> pathModifications = + setModifications(resource, alignedTimeSeriesMetadata, alignedPath, context); + alignedTimeSeriesMetadata.setChunkMetadataLoader( new DiskAlignedChunkMetadataLoader( - resource, alignedPath, context, filter, queryAllSensors)); + resource, alignedPath, context, filter, queryAllSensors, pathModifications)); } } } return alignedTimeSeriesMetadata; } - private static void setModifications( + private static List<List<Modification>> setModifications( TsFileResource resource, AlignedTimeSeriesMetadata alignedTimeSeriesMetadata, AlignedPath alignedPath, QueryContext context) { List<TimeseriesMetadata> valueTimeSeriesMetadataList = alignedTimeSeriesMetadata.getValueTimeseriesMetadataList(); + List<List<Modification>> res = new ArrayList<>(); boolean modified = false; for (int i = 0; i < valueTimeSeriesMetadataList.size(); i++) { if (valueTimeSeriesMetadataList.get(i) != null) { @@ -334,10 +337,12 @@ context.getPathModifications( resource.getModFile(), alignedPath.getPathWithMeasurement(i)); valueTimeSeriesMetadataList.get(i).setModified(!pathModifications.isEmpty()); + res.add(pathModifications); modified = (modified || !pathModifications.isEmpty()); } } alignedTimeSeriesMetadata.getTimeseriesMetadata().setModified(modified); + return res; } /**