chunk
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/scan/AlignedSinglePageWholeChunkReader.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/scan/AlignedSinglePageWholeChunkReader.java index 757e7f8..ec6ed5f 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/scan/AlignedSinglePageWholeChunkReader.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/scan/AlignedSinglePageWholeChunkReader.java
@@ -19,6 +19,7 @@ package org.apache.iotdb.db.pipe.event.common.tsfile.parser.scan; +import org.apache.tsfile.compress.IUnCompressor; import org.apache.tsfile.encoding.decoder.Decoder; import org.apache.tsfile.encrypt.EncryptParameter; import org.apache.tsfile.encrypt.IDecryptor; @@ -31,6 +32,7 @@ import org.apache.tsfile.read.reader.chunk.AbstractChunkReader; import org.apache.tsfile.read.reader.chunk.ChunkReader; import org.apache.tsfile.read.reader.page.AlignedPageReader; +import org.apache.tsfile.read.reader.page.LazyLoadPageData; import java.io.IOException; import java.io.Serializable; @@ -43,7 +45,8 @@ * The {@link AlignedSinglePageWholeChunkReader} is used to read a whole single page aligned chunk * with need to pass in the statistics. */ -public class AlignedSinglePageWholeChunkReader extends AbstractChunkReader { +public class AlignedSinglePageWholeChunkReader extends AbstractChunkReader + implements EstimatedMemoryChunkReader { // chunk header of the time column private final ChunkHeader timeChunkHeader; @@ -58,6 +61,7 @@ private final List<ByteBuffer> valueChunkDataBufferList = new ArrayList<>(); // deleted intervals of all the sub sensors private final List<List<TimeRange>> valueDeleteIntervalsList = new ArrayList<>(); + private final long pageEstimatedMemoryUsageInBytes; public AlignedSinglePageWholeChunkReader( Chunk timeChunk, List<Chunk> valueChunkList, LongConsumer filteredRowsRecord) @@ -66,6 +70,8 @@ this.timeChunkHeader = timeChunk.getHeader(); this.timeChunkDataBuffer = timeChunk.getData(); this.encryptParam = timeChunk.getEncryptParam(); + this.pageEstimatedMemoryUsageInBytes = + calculatePageEstimatedMemoryUsageInBytes(timeChunk, valueChunkList); valueChunkList.forEach( chunk -> { @@ -133,7 +139,7 @@ timePageHeader, timeChunkDataBuffer, timeChunkHeader, decryptor); List<PageHeader> valuePageHeaderList = new ArrayList<>(); - List<ByteBuffer> valuePageDataList = new ArrayList<>(); + LazyLoadPageData[] valuePageDataArray = new LazyLoadPageData[rawValuePageHeaderList.size()]; List<TSDataType> valueDataTypeList = new ArrayList<>(); List<Decoder> valueDecoderList = new ArrayList<>(); @@ -144,15 +150,22 @@ if (valuePageHeader == null || valuePageHeader.getUncompressedSize() == 0) { // Empty Page valuePageHeaderList.add(null); - valuePageDataList.add(null); + valuePageDataArray[i] = null; valueDataTypeList.add(null); valueDecoderList.add(null); } else { ChunkHeader valueChunkHeader = valueChunkHeaderList.get(i); + int currentPagePosition = valueChunkDataBufferList.get(i).position(); + valueChunkDataBufferList + .get(i) + .position(currentPagePosition + valuePageHeader.getCompressedSize()); valuePageHeaderList.add(valuePageHeader); - valuePageDataList.add( - ChunkReader.deserializePageData( - valuePageHeader, valueChunkDataBufferList.get(i), valueChunkHeader, decryptor)); + valuePageDataArray[i] = + new LazyLoadPageData( + valueChunkDataBufferList.get(i).array(), + currentPagePosition, + IUnCompressor.getUnCompressor(valueChunkHeader.getCompressionType()), + encryptParam); valueDataTypeList.add(valueChunkHeader.getDataType()); valueDecoderList.add( Decoder.getDecoderByType( @@ -169,11 +182,38 @@ timePageData, defaultTimeDecoder, valuePageHeaderList, - valuePageDataList, + valuePageDataArray, valueDataTypeList, valueDecoderList, queryFilter); alignedPageReader.setDeleteIntervalList(valueDeleteIntervalsList); return alignedPageReader; } + + @Override + public long getCurrentPageEstimatedMemoryUsageInBytes() { + return pageEstimatedMemoryUsageInBytes; + } + + public static long calculatePageEstimatedMemoryUsageInBytes( + final Chunk timeChunk, final List<Chunk> valueChunkList) throws IOException { + final ByteBuffer timeChunkDataBuffer = timeChunk.getData().duplicate(); + long estimatedMemoryUsageInBytes = + PageHeader.deserializeFrom(timeChunkDataBuffer, (Statistics<? extends Serializable>) null) + .getUncompressedSize(); + + for (final Chunk valueChunk : valueChunkList) { + if (valueChunk == null) { + continue; + } + + final ByteBuffer valueChunkDataBuffer = valueChunk.getData().duplicate(); + estimatedMemoryUsageInBytes += + PageHeader.deserializeFrom( + valueChunkDataBuffer, (Statistics<? extends Serializable>) null) + .getUncompressedSize(); + } + + return estimatedMemoryUsageInBytes; + } }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/scan/EstimatedMemoryChunkReader.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/scan/EstimatedMemoryChunkReader.java new file mode 100644 index 0000000..1d45574 --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/scan/EstimatedMemoryChunkReader.java
@@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.pipe.event.common.tsfile.parser.scan; + +interface EstimatedMemoryChunkReader { + + long getCurrentPageEstimatedMemoryUsageInBytes(); +}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/scan/SinglePageWholeChunkReader.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/scan/SinglePageWholeChunkReader.java index 2b2743b..6be6f5a 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/scan/SinglePageWholeChunkReader.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/scan/SinglePageWholeChunkReader.java
@@ -29,6 +29,7 @@ import org.apache.tsfile.file.metadata.statistics.Statistics; import org.apache.tsfile.read.common.Chunk; import org.apache.tsfile.read.reader.chunk.AbstractChunkReader; +import org.apache.tsfile.read.reader.page.LazyLoadPageData; import org.apache.tsfile.read.reader.page.PageReader; import java.io.IOException; @@ -37,10 +38,12 @@ import static org.apache.tsfile.file.metadata.enums.CompressionType.UNCOMPRESSED; -public class SinglePageWholeChunkReader extends AbstractChunkReader { +public class SinglePageWholeChunkReader extends AbstractChunkReader + implements EstimatedMemoryChunkReader { private final ChunkHeader chunkHeader; private final ByteBuffer chunkDataBuffer; private final EncryptParameter encryptParam; + private final long pageEstimatedMemoryUsageInBytes; public SinglePageWholeChunkReader(Chunk chunk) throws IOException { super(Long.MIN_VALUE, null, null); @@ -48,6 +51,7 @@ this.chunkHeader = chunk.getHeader(); this.chunkDataBuffer = chunk.getData(); this.encryptParam = chunk.getEncryptParam(); + this.pageEstimatedMemoryUsageInBytes = calculatePageEstimatedMemoryUsageInBytes(chunk); initAllPageReaders(); } @@ -62,16 +66,34 @@ } private PageReader constructPageReader(PageHeader pageHeader) throws IOException { - IDecryptor decryptor = IDecryptor.getDecryptor(encryptParam); + final int currentPagePosition = chunkDataBuffer.position(); + chunkDataBuffer.position(currentPagePosition + pageHeader.getCompressedSize()); return new PageReader( pageHeader, - deserializePageData(pageHeader, chunkDataBuffer, chunkHeader, decryptor), + new LazyLoadPageData( + chunkDataBuffer.array(), + currentPagePosition, + IUnCompressor.getUnCompressor(chunkHeader.getCompressionType()), + encryptParam), chunkHeader.getDataType(), Decoder.getDecoderByType(chunkHeader.getEncodingType(), chunkHeader.getDataType()), defaultTimeDecoder, null); } + @Override + public long getCurrentPageEstimatedMemoryUsageInBytes() { + return pageEstimatedMemoryUsageInBytes; + } + + public static long calculatePageEstimatedMemoryUsageInBytes(final Chunk chunk) + throws IOException { + final ByteBuffer chunkDataBuffer = chunk.getData().duplicate(); + final PageHeader pageHeader = + PageHeader.deserializeFrom(chunkDataBuffer, (Statistics<? extends Serializable>) null); + return pageHeader.getUncompressedSize(); + } + ///////////////////////////////////////////////////////////////////////////////////////////////// // util methods /////////////////////////////////////////////////////////////////////////////////////////////////
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/scan/TsFileInsertionEventScanParser.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/scan/TsFileInsertionEventScanParser.java index aeb7aaa..492bd4f 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/scan/TsFileInsertionEventScanParser.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/scan/TsFileInsertionEventScanParser.java
@@ -97,10 +97,11 @@ // Cached time chunk private final List<Chunk> timeChunkList = new ArrayList<>(); private final List<Boolean> isMultiPageList = new ArrayList<>(); + private final List<Long> timeChunkPageMemorySizeList = new ArrayList<>(); private final Map<String, Integer> measurementIndexMap = new HashMap<>(); private int lastIndex = -1; - private ChunkHeader firstChunkHeader4NextSequentialValueChunks; + private Chunk firstChunk4NextSequentialValueChunks; private byte lastMarker = Byte.MIN_VALUE; @@ -372,6 +373,7 @@ } do { + resizeBatchDataMemoryForCurrentPageIfNeeded(); data = chunkReader.nextPageData(); long size = PipeMemoryWeightUtil.calculateBatchDataRamBytesUsed(data); if (allocatedMemoryBlockForBatchData.getMemoryUsageInBytes() < size) { @@ -381,6 +383,19 @@ } while (!data.hasCurrent()); } + private void resizeBatchDataMemoryForCurrentPageIfNeeded() { + if (!(chunkReader instanceof EstimatedMemoryChunkReader)) { + return; + } + + final long estimatedMemoryUsageInBytes = + ((EstimatedMemoryChunkReader) chunkReader).getCurrentPageEstimatedMemoryUsageInBytes(); + if (allocatedMemoryBlockForBatchData.getMemoryUsageInBytes() < estimatedMemoryUsageInBytes) { + PipeDataNodeResourceManager.memory() + .forceResize(allocatedMemoryBlockForBatchData, estimatedMemoryUsageInBytes); + } + } + private boolean putValueToColumns(final BatchData data, final Tablet tablet, final int rowIndex) { boolean isNeedFillTime = false; if (data.getDataType() == TSDataType.VECTOR) { @@ -471,6 +486,7 @@ throws IOException, IllegalStateException, IllegalPathException { ChunkHeader chunkHeader; long valueChunkSize = 0; + long valueChunkPageMemorySize = 0; final List<Chunk> valueChunkList = new ArrayList<>(); currentMeasurements.clear(); modsInfos.clear(); @@ -481,7 +497,12 @@ } byte marker; - while ((marker = lastMarker != Byte.MIN_VALUE ? lastMarker : tsFileSequenceReader.readMarker()) + while ((marker = + lastMarker != Byte.MIN_VALUE + ? lastMarker + : Objects.nonNull(firstChunk4NextSequentialValueChunks) + ? toValueChunkMarker(firstChunk4NextSequentialValueChunks.getHeader()) + : tsFileSequenceReader.readMarker()) != MetaMarker.SEPARATOR) { lastMarker = Byte.MIN_VALUE; switch (marker) { @@ -508,6 +529,15 @@ Chunk chunk = new Chunk( chunkHeader, tsFileSequenceReader.readChunk(-1, chunkHeader.getDataSize())); + if (!currentIsMultiPage) { + final long pageEstimatedMemoryUsageInBytes = + SinglePageWholeChunkReader.calculatePageEstimatedMemoryUsageInBytes(chunk); + if (pageEstimatedMemoryUsageInBytes + > allocatedMemoryBlockForChunk.getMemoryUsageInBytes()) { + PipeDataNodeResourceManager.memory() + .forceResize(allocatedMemoryBlockForChunk, pageEstimatedMemoryUsageInBytes); + } + } chunkReader = currentIsMultiPage @@ -530,7 +560,8 @@ case MetaMarker.VALUE_CHUNK_HEADER: case MetaMarker.ONLY_ONE_PAGE_VALUE_CHUNK_HEADER: { - if (Objects.isNull(firstChunkHeader4NextSequentialValueChunks)) { + Chunk chunk; + if (Objects.isNull(firstChunk4NextSequentialValueChunks)) { final long currentChunkHeaderOffset = tsFileSequenceReader.position() - 1; chunkHeader = tsFileSequenceReader.readChunkHeader(marker); @@ -550,23 +581,48 @@ if (chunkHeader.getDataSize() == 0) { break; } + chunk = + new Chunk( + chunkHeader, tsFileSequenceReader.readChunk(-1, chunkHeader.getDataSize())); boolean needReturn = false; final long timeChunkSize = lastIndex >= 0 ? PipeMemoryWeightUtil.calculateChunkRamBytesUsed( timeChunkList.get(lastIndex)) : 0; + final long timeChunkPageMemorySize = + lastIndex >= 0 ? timeChunkPageMemorySizeList.get(lastIndex) : 0; + final long chunkPageMemorySize = + isSinglePageValueChunk(chunkHeader) + ? SinglePageWholeChunkReader.calculatePageEstimatedMemoryUsageInBytes(chunk) + : 0; if (lastIndex >= 0) { if (valueIndex != lastIndex) { needReturn = recordAlignedChunk(valueChunkList, marker); } else { final long chunkSize = timeChunkSize + valueChunkSize; + final long pageMemorySize = timeChunkPageMemorySize + valueChunkPageMemorySize; if (chunkSize + chunkHeader.getDataSize() - > allocatedMemoryBlockForChunk.getMemoryUsageInBytes()) { + > allocatedMemoryBlockForChunk.getMemoryUsageInBytes() + || timeChunkPageMemorySize > 0 + && chunkPageMemorySize > 0 + && pageMemorySize + chunkPageMemorySize + > allocatedMemoryBlockForChunk.getMemoryUsageInBytes()) { if (valueChunkList.size() == 1 - && chunkSize > allocatedMemoryBlockForChunk.getMemoryUsageInBytes()) { + && Math.max( + chunkSize, + timeChunkPageMemorySize > 0 && chunkPageMemorySize > 0 + ? pageMemorySize + : 0) + > allocatedMemoryBlockForChunk.getMemoryUsageInBytes()) { PipeDataNodeResourceManager.memory() - .forceResize(allocatedMemoryBlockForChunk, chunkSize); + .forceResize( + allocatedMemoryBlockForChunk, + Math.max( + chunkSize, + timeChunkPageMemorySize > 0 && chunkPageMemorySize > 0 + ? pageMemorySize + : 0)); } needReturn = recordAlignedChunk(valueChunkList, marker); } @@ -574,19 +630,20 @@ } lastIndex = valueIndex; if (needReturn) { - firstChunkHeader4NextSequentialValueChunks = chunkHeader; + firstChunk4NextSequentialValueChunks = chunk; return; } } else { - chunkHeader = firstChunkHeader4NextSequentialValueChunks; - firstChunkHeader4NextSequentialValueChunks = null; + chunk = firstChunk4NextSequentialValueChunks; + chunkHeader = chunk.getHeader(); + firstChunk4NextSequentialValueChunks = null; } - Chunk chunk = - new Chunk( - chunkHeader, tsFileSequenceReader.readChunk(-1, chunkHeader.getDataSize())); - valueChunkSize += chunkHeader.getDataSize(); + if (isSinglePageValueChunk(chunkHeader)) { + valueChunkPageMemorySize += + SinglePageWholeChunkReader.calculatePageEstimatedMemoryUsageInBytes(chunk); + } valueChunkList.add(chunk); currentMeasurements.add( new MeasurementSchema( @@ -611,6 +668,7 @@ lastIndex = -1; timeChunkList.clear(); isMultiPageList.clear(); + timeChunkPageMemorySizeList.clear(); measurementIndexMap.clear(); final IDeviceID deviceID = tsFileSequenceReader.readChunkGroupHeader().getDeviceID(); currentDevice = treePattern.mayOverlapWithDevice(deviceID) ? deviceID : null; @@ -648,9 +706,15 @@ if (!isAlignedValueChunk) { if ((chunkHeader.getChunkType() & TsFileConstant.TIME_COLUMN_MASK) == TsFileConstant.TIME_COLUMN_MASK) { - timeChunkList.add( - new Chunk(chunkHeader, tsFileSequenceReader.readChunk(-1, chunkHeader.getDataSize()))); - isMultiPageList.add(marker == MetaMarker.TIME_CHUNK_HEADER); + final Chunk timeChunk = + new Chunk(chunkHeader, tsFileSequenceReader.readChunk(-1, chunkHeader.getDataSize())); + timeChunkList.add(timeChunk); + final boolean isMultiPage = marker == MetaMarker.TIME_CHUNK_HEADER; + isMultiPageList.add(isMultiPage); + timeChunkPageMemorySizeList.add( + isMultiPage + ? 0 + : SinglePageWholeChunkReader.calculatePageEstimatedMemoryUsageInBytes(timeChunk)); return true; } } @@ -720,6 +784,16 @@ return false; } + private boolean isSinglePageValueChunk(final ChunkHeader chunkHeader) { + return (chunkHeader.getChunkType() & 0x3F) == MetaMarker.ONLY_ONE_PAGE_VALUE_CHUNK_HEADER; + } + + private byte toValueChunkMarker(final ChunkHeader chunkHeader) { + return isSinglePageValueChunk(chunkHeader) + ? MetaMarker.ONLY_ONE_PAGE_VALUE_CHUNK_HEADER + : MetaMarker.VALUE_CHUNK_HEADER; + } + @Override public void close() { super.close();
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionEventParserTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionEventParserTest.java index a2e7c55..5d3cf6a 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionEventParserTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionEventParserTest.java
@@ -28,17 +28,26 @@ import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent; import org.apache.iotdb.db.pipe.event.common.tsfile.parser.TsFileInsertionEventParser; import org.apache.iotdb.db.pipe.event.common.tsfile.parser.query.TsFileInsertionEventQueryParser; +import org.apache.iotdb.db.pipe.event.common.tsfile.parser.scan.AlignedSinglePageWholeChunkReader; import org.apache.iotdb.db.pipe.event.common.tsfile.parser.scan.TsFileInsertionEventScanParser; +import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryWeightUtil; import org.apache.iotdb.db.storageengine.dataregion.compaction.utils.CompactionTestFileWriter; import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResourceStatus; import org.apache.iotdb.pipe.api.access.Row; import org.apache.tsfile.common.conf.TSFileConfig; +import org.apache.tsfile.common.conf.TSFileDescriptor; import org.apache.tsfile.enums.TSDataType; +import org.apache.tsfile.file.MetaMarker; +import org.apache.tsfile.file.metadata.AbstractAlignedChunkMetadata; +import org.apache.tsfile.file.metadata.ChunkMetadata; +import org.apache.tsfile.file.metadata.IChunkMetadata; +import org.apache.tsfile.file.metadata.IDeviceID; import org.apache.tsfile.file.metadata.enums.CompressionType; import org.apache.tsfile.file.metadata.enums.TSEncoding; import org.apache.tsfile.read.TsFileSequenceReader; +import org.apache.tsfile.read.common.Chunk; import org.apache.tsfile.read.common.Path; import org.apache.tsfile.read.common.TimeRange; import org.apache.tsfile.utils.Binary; @@ -120,6 +129,84 @@ System.out.println(System.currentTimeMillis() - startTime); } + @Test + public void testScanParserSplitAlignedSinglePageChunkByEstimatedPageMemory() throws Exception { + final long originalPipeMaxReaderChunkSize = + CommonDescriptor.getInstance().getConfig().getPipeMaxReaderChunkSize(); + final int originalPageSizeInByte = + TSFileDescriptor.getInstance().getConfig().getPageSizeInByte(); + final int originalMaxNumberOfPointsInPage = + TSFileDescriptor.getInstance().getConfig().getMaxNumberOfPointsInPage(); + + try { + TSFileDescriptor.getInstance().getConfig().setPageSizeInByte(64 * 1024); + TSFileDescriptor.getInstance().getConfig().setMaxNumberOfPointsInPage(10000); + + final int measurementCount = 16; + final int rowCount = 64; + final List<IMeasurementSchema> schemaList = new ArrayList<>(); + for (int i = 0; i < measurementCount; ++i) { + schemaList.add( + new MeasurementSchema( + "s" + i, TSDataType.STRING, TSEncoding.PLAIN, CompressionType.LZ4)); + } + + alignedTsFile = new File("aligned-single-page-high-compression.tsfile"); + final Tablet tablet = new Tablet("root.sg.d", schemaList, rowCount); + final Binary value = + new Binary(new String(new char[512]).replace('\0', 'a'), TSFileConfig.STRING_CHARSET); + for (int row = 0; row < rowCount; ++row) { + tablet.addTimestamp(row, row); + for (int measurementIndex = 0; measurementIndex < measurementCount; ++measurementIndex) { + tablet.addValue("s" + measurementIndex, row, value); + } + } + + try (final TsFileWriter writer = new TsFileWriter(alignedTsFile)) { + writer.registerAlignedTimeseries(new PartialPath("root.sg.d"), schemaList); + writer.writeAligned(tablet); + } + + CommonDescriptor.getInstance() + .getConfig() + .setPipeMaxReaderChunkSize( + calculatePipeMaxReaderChunkSizeForSinglePageAlignedChunk(alignedTsFile)); + + int tabletCount = 0; + int maxMeasurementCount = 0; + int pointCount = 0; + try (final TsFileInsertionEventScanParser parser = + new TsFileInsertionEventScanParser( + alignedTsFile, + new PrefixTreePattern("root"), + Long.MIN_VALUE, + Long.MAX_VALUE, + null, + null, + false)) { + for (final Pair<Tablet, Boolean> tabletWithIsAligned : parser.toTabletWithIsAligneds()) { + Assert.assertTrue(tabletWithIsAligned.getRight()); + final Tablet parsedTablet = tabletWithIsAligned.getLeft(); + tabletCount++; + maxMeasurementCount = Math.max(maxMeasurementCount, parsedTablet.getSchemas().size()); + pointCount += getNonNullSize(parsedTablet); + } + } + + Assert.assertTrue(tabletCount > 1); + Assert.assertTrue(maxMeasurementCount < measurementCount); + Assert.assertEquals(measurementCount * rowCount, pointCount); + } finally { + CommonDescriptor.getInstance() + .getConfig() + .setPipeMaxReaderChunkSize(originalPipeMaxReaderChunkSize); + TSFileDescriptor.getInstance().getConfig().setPageSizeInByte(originalPageSizeInByte); + TSFileDescriptor.getInstance() + .getConfig() + .setMaxNumberOfPointsInPage(originalMaxNumberOfPointsInPage); + } + } + public void testToTabletInsertionEvents(final boolean isQuery) throws Exception { // Test empty chunk testMixedTsFileWithEmptyChunk(isQuery); @@ -666,4 +753,38 @@ } return count; } + + private long calculatePipeMaxReaderChunkSizeForSinglePageAlignedChunk(final File tsFile) + throws Exception { + try (final TsFileSequenceReader reader = new TsFileSequenceReader(tsFile.getAbsolutePath())) { + final IDeviceID deviceID = reader.getDeviceMeasurementsMap().keySet().iterator().next(); + final List<AbstractAlignedChunkMetadata> alignedChunkMetadataList = + reader.getAlignedChunkMetadata(deviceID, true); + Assert.assertEquals(1, alignedChunkMetadataList.size()); + + final AbstractAlignedChunkMetadata alignedChunkMetadata = alignedChunkMetadataList.get(0); + final Chunk timeChunk = + reader.readMemChunk((ChunkMetadata) alignedChunkMetadata.getTimeChunkMetadata()); + Assert.assertEquals( + MetaMarker.ONLY_ONE_PAGE_TIME_CHUNK_HEADER, timeChunk.getHeader().getChunkType() & 0x3F); + + final List<Chunk> valueChunkList = new ArrayList<>(); + long chunkSizeLimit = PipeMemoryWeightUtil.calculateChunkRamBytesUsed(timeChunk); + for (final IChunkMetadata valueChunkMetadata : + alignedChunkMetadata.getValueChunkMetadataList()) { + final Chunk valueChunk = reader.readMemChunk((ChunkMetadata) valueChunkMetadata); + Assert.assertEquals( + MetaMarker.ONLY_ONE_PAGE_VALUE_CHUNK_HEADER, + valueChunk.getHeader().getChunkType() & 0x3F); + valueChunkList.add(valueChunk); + chunkSizeLimit += valueChunk.getHeader().getDataSize(); + } + + final long estimatedPageMemorySize = + AlignedSinglePageWholeChunkReader.calculatePageEstimatedMemoryUsageInBytes( + timeChunk, valueChunkList); + Assert.assertTrue(estimatedPageMemorySize > chunkSizeLimit); + return chunkSizeLimit; + } + } }