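Update TsFileSplitter.java: drop incremental per-time-chunk consumption of aligned chunk data (pending contexts are now consumed together in consumeAllPendingAlignedChunkData) and extract resetCurrentAlignedTimeChunkContext()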
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/splitter/TsFileSplitter.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/splitter/TsFileSplitter.java index 78d8d03..439a02f 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/splitter/TsFileSplitter.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/splitter/TsFileSplitter.java
@@ -72,14 +72,12 @@ private final TsFileDataConsumer consumer; private Map<Long, IChunkMetadata> offset2ChunkMetadata = new HashMap<>(); private Map<Long, Long> valueChunkOffset2TimeChunkOffset = new HashMap<>(); - private Map<Long, Integer> timeChunkOffset2RemainingValueChunkCount = new HashMap<>(); private List<ModEntry> deletions = new ArrayList<>(); private Map<Integer, List<AlignedChunkData>> pageIndex2ChunkData = new HashMap<>(); private Map<Integer, long[]> pageIndex2Times = new HashMap<>(); private boolean isTimeChunkNeedDecode = true; private IDeviceID curDevice = null; private boolean isAligned; - private int timeChunkIndexOfCurrentValueColumn = 0; private Set<TTimePartitionSlot> timePartitionSlots = new HashSet<>(); // Maintain the number of times the chunk of each measurement appears. @@ -91,7 +89,6 @@ private List<Map<Integer, long[]>> pageIndex2TimesList = null; private List<Boolean> isTimeChunkNeedDecodeList = new ArrayList<>(); private Map<Long, Integer> timeChunkOffset2ContextIndex = new HashMap<>(); - private Long currentTimeChunkOffset = null; public TsFileSplitter(File tsFile, TsFileDataConsumer consumer) { this.tsFile = tsFile; @@ -125,9 +122,7 @@ case MetaMarker.ONLY_ONE_PAGE_TIME_CHUNK_HEADER: processTimeChunkOrNonAlignedChunk(reader, marker); if (isAligned) { - final Long timeChunkOffset = currentTimeChunkOffset; storeTimeChunkContext(); - consumeAlignedChunkDataIfComplete(reader.position(), timeChunkOffset); } break; case MetaMarker.VALUE_CHUNK_HEADER: @@ -142,9 +137,8 @@ pageIndex2TimesList = new ArrayList<>(); isTimeChunkNeedDecodeList = new ArrayList<>(); valueColumn2TimeChunkIndex = new HashMap<>(); - timeChunkIndexOfCurrentValueColumn = 0; timeChunkOffset2ContextIndex = new HashMap<>(); - currentTimeChunkOffset = null; + resetCurrentAlignedTimeChunkContext(); break; case MetaMarker.OPERATION_INDEX_RANGE: reader.readPlanIndex(); @@ -162,8 +156,6 @@ private void processTimeChunkOrNonAlignedChunk(TsFileSequenceReader reader, byte marker) 
throws IOException, LoadFileException { long chunkOffset = reader.position(); - currentTimeChunkOffset = null; - timeChunkIndexOfCurrentValueColumn = pageIndex2TimesList.size(); ChunkHeader header = reader.readChunkHeader(marker); String measurementId = header.getMeasurementID(); @@ -185,8 +177,9 @@ return; } if (isAligned) { - currentTimeChunkOffset = chunkMetadata.getOffsetOfChunkHeader(); - timeChunkOffset2ContextIndex.put(currentTimeChunkOffset, pageIndex2TimesList.size()); + resetCurrentAlignedTimeChunkContext(); + timeChunkOffset2ContextIndex.put( + chunkMetadata.getOffsetOfChunkHeader(), pageIndex2TimesList.size()); } TTimePartitionSlot timePartitionSlot = TimePartitionUtils.getTimePartitionSlot(chunkMetadata.getStartTime()); @@ -327,7 +320,6 @@ reader, header.getMeasurementID(), chunkMetadata.getOffsetOfChunkHeader()); if (header.getDataSize() == 0) { handleEmptyValueChunk(header, pageIndex2ChunkData, chunkMetadata, isTimeChunkNeedDecode); - consumeValueChunkAndAlignedChunkDataIfComplete(reader.position(), chunkMetadata); return; } @@ -336,7 +328,8 @@ if (alignedChunkDataList == null || alignedChunkDataList.isEmpty()) { throw new TsFileRuntimeException( String.format( - "Missing non-decoded aligned time chunk context for value chunk %s at offset %d, time chunk offset: %s, context page keys: %s.", + "Missing non-decoded aligned time chunk context for value chunk %s at offset %d, " + + "time chunk offset: %s, context page keys: %s.", header.getMeasurementID(), chunkMetadata.getOffsetOfChunkHeader(), valueChunkOffset2TimeChunkOffset.get(chunkMetadata.getOffsetOfChunkHeader()), @@ -345,7 +338,6 @@ AlignedChunkData alignedChunkData = alignedChunkDataList.get(0); alignedChunkData.addValueChunk(header); alignedChunkData.writeEntireChunk(reader.readChunk(-1, header.getDataSize()), chunkMetadata); - consumeValueChunkAndAlignedChunkDataIfComplete(reader.position(), chunkMetadata); return; } @@ -362,7 +354,9 @@ if (alignedChunkDataList == null) { throw new 
TsFileRuntimeException( String.format( - "Missing decoded aligned time page context for value chunk %s at offset %d, page index: %d, time chunk offset: %s, context page keys: %s, time page keys: %s.", + "Missing decoded aligned time page context for value chunk %s at offset %d, " + + "page index: %d, time chunk offset: %s, context page keys: %s, " + + "time page keys: %s.", header.getMeasurementID(), chunkMetadata.getOffsetOfChunkHeader(), pageIndex, @@ -392,13 +386,16 @@ pageIndex += 1; dataSize -= pageDataSize; } - consumeValueChunkAndAlignedChunkDataIfComplete(reader.position(), chunkMetadata); } private void storeTimeChunkContext() { pageIndex2TimesList.add(pageIndex2Times); pageIndex2ChunkDataList.add(pageIndex2ChunkData); isTimeChunkNeedDecodeList.add(isTimeChunkNeedDecode); + resetCurrentAlignedTimeChunkContext(); + } + + private void resetCurrentAlignedTimeChunkContext() { pageIndex2Times = new HashMap<>(); pageIndex2ChunkData = new HashMap<>(); isTimeChunkNeedDecode = true; @@ -406,26 +403,30 @@ private void switchToTimeChunkContextOfCurrentValueChunk( TsFileSequenceReader reader, String measurement, long valueChunkOffset) throws IOException { + // Keep ordinal fallback in sync for value chunks that cannot be matched by offset or range. 
+ final int occurrenceIndex = valueColumn2TimeChunkIndex.getOrDefault(measurement, 0); + valueColumn2TimeChunkIndex.put(measurement, occurrenceIndex + 1); + final Long timeChunkOffset = valueChunkOffset2TimeChunkOffset.get(valueChunkOffset); int index; if (timeChunkOffset == null) { - index = valueColumn2TimeChunkIndex.getOrDefault(measurement, 0); - valueColumn2TimeChunkIndex.put(measurement, index + 1); + index = occurrenceIndex; } else { final Integer contextIndex = timeChunkOffset2ContextIndex.get(timeChunkOffset); if (contextIndex == null) { throw new TsFileRuntimeException( String.format( - "Cannot find aligned time chunk context for value chunk %s at offset %d in TsFile %s, reader position: %d.", + "Cannot find aligned time chunk context for value chunk %s at offset %d in " + + "TsFile %s, reader position: %d.", measurement, valueChunkOffset, tsFile.getPath(), reader.position())); } index = contextIndex; } - timeChunkIndexOfCurrentValueColumn = index; if (index < 0 || index >= pageIndex2ChunkDataList.size()) { throw new TsFileRuntimeException( String.format( - "Aligned time chunk context index %d is out of range for value chunk %s at offset %d in TsFile %s, reader position: %d.", + "Aligned time chunk context index %d is out of range for value chunk %s at " + + "offset %d in TsFile %s, reader position: %d.", index, measurement, valueChunkOffset, tsFile.getPath(), reader.position())); } pageIndex2Times = pageIndex2TimesList.get(index); @@ -496,7 +497,6 @@ long timeChunkOffset = timeChunkMetadata.getOffsetOfChunkHeader(); offset2ChunkMetadata.put(timeChunkOffset, timeChunkMetadata); - int valueChunkCount = 0; for (IChunkMetadata valueChunkMetadata : alignedChunkMetadata.getValueChunkMetadataList()) { if (valueChunkMetadata == null) { continue; @@ -504,10 +504,7 @@ long valueChunkOffset = valueChunkMetadata.getOffsetOfChunkHeader(); offset2ChunkMetadata.put(valueChunkOffset, valueChunkMetadata); valueChunkOffset2TimeChunkOffset.put(valueChunkOffset, 
timeChunkOffset); - valueChunkCount += 1; } - timeChunkOffset2RemainingValueChunkCount.merge( - timeChunkOffset, valueChunkCount, Integer::sum); } } mapUnmatchedValueChunksToTimeChunks(device2Metadata, alignedDevices); @@ -530,15 +527,7 @@ } } } - for (IChunkMetadata timeChunkMetadata : timeChunkMetadataList) { - timeChunkOffset2RemainingValueChunkCount.putIfAbsent( - timeChunkMetadata.getOffsetOfChunkHeader(), 0); - } for (IChunkMetadata valueChunkMetadata : valueChunkMetadataList) { - if (valueChunkOffset2TimeChunkOffset.containsKey( - valueChunkMetadata.getOffsetOfChunkHeader())) { - continue; - } IChunkMetadata timeChunkMetadata = findCorrespondingTimeChunk(timeChunkMetadataList, valueChunkMetadata); if (timeChunkMetadata == null) { @@ -547,7 +536,6 @@ long timeChunkOffset = timeChunkMetadata.getOffsetOfChunkHeader(); valueChunkOffset2TimeChunkOffset.put( valueChunkMetadata.getOffsetOfChunkHeader(), timeChunkOffset); - timeChunkOffset2RemainingValueChunkCount.merge(timeChunkOffset, 1, Integer::sum); } } } @@ -617,11 +605,15 @@ } private void consumeAllPendingAlignedChunkData(final long offset) throws LoadFileException { - consumeAllAlignedChunkData(offset, pageIndex2ChunkData); - for (final Map<Integer, List<AlignedChunkData>> pendingPageIndex2ChunkData : - pageIndex2ChunkDataList) { - consumeAllAlignedChunkData(offset, pendingPageIndex2ChunkData); + if (pageIndex2ChunkDataList.isEmpty()) { + consumeAllAlignedChunkData(offset, pageIndex2ChunkData); + } else { + for (final Map<Integer, List<AlignedChunkData>> pendingPageIndex2ChunkData : + pageIndex2ChunkDataList) { + consumeAllAlignedChunkData(offset, pendingPageIndex2ChunkData); + } } + resetCurrentAlignedTimeChunkContext(); pageIndex2ChunkDataList.clear(); if (pageIndex2TimesList != null) { pageIndex2TimesList.clear(); @@ -629,40 +621,6 @@ isTimeChunkNeedDecodeList.clear(); valueColumn2TimeChunkIndex.clear(); timeChunkOffset2ContextIndex.clear(); - currentTimeChunkOffset = null; - } - - private void 
consumeValueChunkAndAlignedChunkDataIfComplete( - final long offset, final IChunkMetadata valueChunkMetadata) throws LoadFileException { - final Long timeChunkOffset = - valueChunkOffset2TimeChunkOffset.get(valueChunkMetadata.getOffsetOfChunkHeader()); - if (timeChunkOffset == null) { - return; - } - final Integer remainingValueChunkCount = - timeChunkOffset2RemainingValueChunkCount.get(timeChunkOffset); - if (remainingValueChunkCount == null) { - return; - } - if (remainingValueChunkCount > 0) { - timeChunkOffset2RemainingValueChunkCount.put(timeChunkOffset, remainingValueChunkCount - 1); - } - consumeAlignedChunkDataIfComplete(offset, timeChunkOffset); - } - - private void consumeAlignedChunkDataIfComplete(final long offset, final Long timeChunkOffset) - throws LoadFileException { - if (timeChunkOffset == null - || timeChunkOffset2RemainingValueChunkCount.getOrDefault(timeChunkOffset, 0) > 0) { - return; - } - final Integer contextIndex = timeChunkOffset2ContextIndex.remove(timeChunkOffset); - if (contextIndex == null - || contextIndex < 0 - || contextIndex >= pageIndex2ChunkDataList.size()) { - return; - } - consumeAllAlignedChunkData(offset, pageIndex2ChunkDataList.get(contextIndex)); } private void consumeChunkData(String measurement, long offset, ChunkData chunkData)