| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| package org.apache.iotdb.db.queryengine.execution.load; |
| |
| import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot; |
| import org.apache.iotdb.commons.utils.TimePartitionUtils; |
| import org.apache.iotdb.db.storageengine.dataregion.modification.Deletion; |
| import org.apache.iotdb.db.storageengine.dataregion.modification.Modification; |
| import org.apache.iotdb.db.storageengine.dataregion.modification.ModificationFile; |
| |
| import org.apache.tsfile.common.conf.TSFileConfig; |
| import org.apache.tsfile.common.conf.TSFileDescriptor; |
| import org.apache.tsfile.common.constant.TsFileConstant; |
| import org.apache.tsfile.encoding.decoder.Decoder; |
| import org.apache.tsfile.enums.TSDataType; |
| import org.apache.tsfile.exception.TsFileRuntimeException; |
| import org.apache.tsfile.file.MetaMarker; |
| import org.apache.tsfile.file.header.ChunkGroupHeader; |
| import org.apache.tsfile.file.header.ChunkHeader; |
| import org.apache.tsfile.file.header.PageHeader; |
| import org.apache.tsfile.file.metadata.IChunkMetadata; |
| import org.apache.tsfile.file.metadata.IDeviceID; |
| import org.apache.tsfile.file.metadata.PlainDeviceID; |
| import org.apache.tsfile.file.metadata.TimeseriesMetadata; |
| import org.apache.tsfile.file.metadata.enums.TSEncoding; |
| import org.apache.tsfile.read.TsFileSequenceReader; |
| import org.apache.tsfile.read.common.BatchData; |
| import org.apache.tsfile.read.reader.page.PageReader; |
| import org.apache.tsfile.read.reader.page.TimePageReader; |
| import org.apache.tsfile.read.reader.page.ValuePageReader; |
| import org.apache.tsfile.utils.Pair; |
| import org.apache.tsfile.utils.TsPrimitiveType; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import java.io.File; |
| import java.io.IOException; |
| import java.nio.ByteBuffer; |
| import java.util.ArrayList; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Set; |
| import java.util.TreeMap; |
| import java.util.function.Function; |
| |
| public class TsFileSplitter { |
| private static final Logger logger = LoggerFactory.getLogger(TsFileSplitter.class); |
| |
| private final File tsFile; |
| private final Function<TsFileData, Boolean> consumer; |
| |
| public TsFileSplitter(File tsFile, Function<TsFileData, Boolean> consumer) { |
| this.tsFile = tsFile; |
| this.consumer = consumer; |
| } |
| |
| @SuppressWarnings({"squid:S3776", "squid:S6541"}) |
| public void splitTsFileByDataPartition() throws IOException, IllegalStateException { |
| try (TsFileSequenceReader reader = new TsFileSequenceReader(tsFile.getAbsolutePath())) { |
| TreeMap<Long, List<Deletion>> offset2Deletions = new TreeMap<>(); |
| getAllModification(offset2Deletions); |
| |
| if (!checkMagic(reader)) { |
| throw new TsFileRuntimeException( |
| String.format("Magic String check error when parsing TsFile %s.", tsFile.getPath())); |
| } |
| |
| reader.position((long) TSFileConfig.MAGIC_STRING.getBytes().length + 1); |
| IDeviceID curDevice = null; |
| boolean isTimeChunkNeedDecode = true; |
| Map<Integer, List<AlignedChunkData>> pageIndex2ChunkData = new HashMap<>(); |
| Map<Integer, long[]> pageIndex2Times = null; |
| Map<Long, IChunkMetadata> offset2ChunkMetadata = new HashMap<>(); |
| getChunkMetadata(reader, offset2ChunkMetadata); |
| byte marker; |
| while ((marker = reader.readMarker()) != MetaMarker.SEPARATOR) { |
| switch (marker) { |
| case MetaMarker.CHUNK_HEADER: |
| case MetaMarker.TIME_CHUNK_HEADER: |
| case MetaMarker.ONLY_ONE_PAGE_CHUNK_HEADER: |
| case MetaMarker.ONLY_ONE_PAGE_TIME_CHUNK_HEADER: |
| long chunkOffset = reader.position(); |
| consumeAllAlignedChunkData(chunkOffset, pageIndex2ChunkData); |
| handleModification(offset2Deletions, chunkOffset); |
| |
| ChunkHeader header = reader.readChunkHeader(marker); |
| String measurementId = header.getMeasurementID(); |
| if (header.getDataSize() == 0) { |
| throw new TsFileRuntimeException( |
| String.format( |
| "Empty Nonaligned Chunk or Time Chunk with offset %d in TsFile %s.", |
| chunkOffset, tsFile.getPath())); |
| } |
| |
| boolean isAligned = |
| ((header.getChunkType() & TsFileConstant.TIME_COLUMN_MASK) |
| == TsFileConstant.TIME_COLUMN_MASK); |
| IChunkMetadata chunkMetadata = offset2ChunkMetadata.get(chunkOffset - Byte.BYTES); |
| // When loading TsFile with Chunk in data zone but no matched ChunkMetadata |
| // at the end of file, this Chunk needs to be skipped. |
| if (chunkMetadata == null) { |
| reader.readChunk(-1, header.getDataSize()); |
| break; |
| } |
| TTimePartitionSlot timePartitionSlot = |
| TimePartitionUtils.getTimePartitionSlot(chunkMetadata.getStartTime()); |
| ChunkData chunkData = |
| ChunkData.createChunkData( |
| isAligned, ((PlainDeviceID) curDevice).toStringID(), header, timePartitionSlot); |
| |
| if (!needDecodeChunk(chunkMetadata)) { |
| chunkData.setNotDecode(); |
| chunkData.writeEntireChunk(reader.readChunk(-1, header.getDataSize()), chunkMetadata); |
| if (isAligned) { |
| isTimeChunkNeedDecode = false; |
| pageIndex2ChunkData |
| .computeIfAbsent(1, o -> new ArrayList<>()) |
| .add((AlignedChunkData) chunkData); |
| } else { |
| consumeChunkData(measurementId, chunkOffset, chunkData); |
| } |
| break; |
| } |
| |
| Decoder defaultTimeDecoder = |
| Decoder.getDecoderByType( |
| TSEncoding.valueOf(TSFileDescriptor.getInstance().getConfig().getTimeEncoder()), |
| TSDataType.INT64); |
| Decoder valueDecoder = |
| Decoder.getDecoderByType(header.getEncodingType(), header.getDataType()); |
| int dataSize = header.getDataSize(); |
| int pageIndex = 0; |
| if (isAligned) { |
| isTimeChunkNeedDecode = true; |
| pageIndex2Times = new HashMap<>(); |
| } |
| |
| while (dataSize > 0) { |
| PageHeader pageHeader = |
| reader.readPageHeader( |
| header.getDataType(), |
| (header.getChunkType() & 0x3F) == MetaMarker.CHUNK_HEADER); |
| long pageDataSize = pageHeader.getSerializedPageSize(); |
| if (!needDecodePage(pageHeader, chunkMetadata)) { // an entire page |
| long startTime = |
| pageHeader.getStatistics() == null |
| ? chunkMetadata.getStartTime() |
| : pageHeader.getStartTime(); |
| TTimePartitionSlot pageTimePartitionSlot = |
| TimePartitionUtils.getTimePartitionSlot(startTime); |
| if (!timePartitionSlot.equals(pageTimePartitionSlot)) { |
| if (!isAligned) { |
| consumeChunkData(measurementId, chunkOffset, chunkData); |
| } |
| timePartitionSlot = pageTimePartitionSlot; |
| chunkData = |
| ChunkData.createChunkData( |
| isAligned, |
| ((PlainDeviceID) curDevice).toStringID(), |
| header, |
| timePartitionSlot); |
| } |
| if (isAligned) { |
| pageIndex2ChunkData |
| .computeIfAbsent(pageIndex, o -> new ArrayList<>()) |
| .add((AlignedChunkData) chunkData); |
| } |
| chunkData.writeEntirePage(pageHeader, reader.readCompressedPage(pageHeader)); |
| } else { // split page |
| ByteBuffer pageData = reader.readPage(pageHeader, header.getCompressionType()); |
| Pair<long[], Object[]> tvArray = |
| decodePage( |
| isAligned, pageData, pageHeader, defaultTimeDecoder, valueDecoder, header); |
| long[] times = tvArray.left; |
| Object[] values = tvArray.right; |
| if (isAligned) { |
| pageIndex2Times.put(pageIndex, times); |
| } |
| |
| int satisfiedLength = 0; |
| long endTime = |
| timePartitionSlot.getStartTime() |
| + TimePartitionUtils.getTimePartitionInterval(); |
| for (int i = 0; i < times.length; i++) { |
| if (times[i] >= endTime) { |
| chunkData.writeDecodePage(times, values, satisfiedLength); |
| if (isAligned) { |
| pageIndex2ChunkData |
| .computeIfAbsent(pageIndex, o -> new ArrayList<>()) |
| .add((AlignedChunkData) chunkData); |
| } else { |
| consumeChunkData(measurementId, chunkOffset, chunkData); |
| } |
| |
| timePartitionSlot = TimePartitionUtils.getTimePartitionSlot(times[i]); |
| satisfiedLength = 0; |
| endTime = |
| timePartitionSlot.getStartTime() |
| + TimePartitionUtils.getTimePartitionInterval(); |
| chunkData = |
| ChunkData.createChunkData( |
| isAligned, |
| ((PlainDeviceID) curDevice).toStringID(), |
| header, |
| timePartitionSlot); |
| } |
| satisfiedLength += 1; |
| } |
| chunkData.writeDecodePage(times, values, satisfiedLength); |
| if (isAligned) { |
| pageIndex2ChunkData |
| .computeIfAbsent(pageIndex, o -> new ArrayList<>()) |
| .add((AlignedChunkData) chunkData); |
| } |
| } |
| |
| pageIndex += 1; |
| dataSize -= pageDataSize; |
| } |
| |
| if (!isAligned) { |
| consumeChunkData(measurementId, chunkOffset, chunkData); |
| } |
| break; |
| case MetaMarker.VALUE_CHUNK_HEADER: |
| case MetaMarker.ONLY_ONE_PAGE_VALUE_CHUNK_HEADER: |
| chunkOffset = reader.position(); |
| chunkMetadata = offset2ChunkMetadata.get(chunkOffset - Byte.BYTES); |
| header = reader.readChunkHeader(marker); |
| // When loading TsFile with Chunk in data zone but no matched ChunkMetadata |
| // at the end of file, this Chunk needs to be skipped. |
| if (chunkMetadata == null) { |
| reader.readChunk(-1, header.getDataSize()); |
| break; |
| } |
| if (header.getDataSize() == 0) { |
| handleEmptyValueChunk( |
| header, pageIndex2ChunkData, chunkMetadata, isTimeChunkNeedDecode); |
| break; |
| } |
| |
| if (!isTimeChunkNeedDecode) { |
| AlignedChunkData alignedChunkData = pageIndex2ChunkData.get(1).get(0); |
| alignedChunkData.addValueChunk(header); |
| alignedChunkData.writeEntireChunk( |
| reader.readChunk(-1, header.getDataSize()), chunkMetadata); |
| break; |
| } |
| |
| Set<ChunkData> allChunkData = new HashSet<>(); |
| dataSize = header.getDataSize(); |
| pageIndex = 0; |
| valueDecoder = Decoder.getDecoderByType(header.getEncodingType(), header.getDataType()); |
| |
| while (dataSize > 0) { |
| PageHeader pageHeader = |
| reader.readPageHeader( |
| header.getDataType(), |
| (header.getChunkType() & 0x3F) == MetaMarker.CHUNK_HEADER); |
| List<AlignedChunkData> alignedChunkDataList = pageIndex2ChunkData.get(pageIndex); |
| for (AlignedChunkData alignedChunkData : alignedChunkDataList) { |
| if (!allChunkData.contains(alignedChunkData)) { |
| alignedChunkData.addValueChunk(header); |
| allChunkData.add(alignedChunkData); |
| } |
| } |
| if (alignedChunkDataList.size() == 1) { // write entire page |
| // write the entire page if it's not an empty page. |
| alignedChunkDataList |
| .get(0) |
| .writeEntirePage(pageHeader, reader.readCompressedPage(pageHeader)); |
| } else { // decode page |
| long[] times = pageIndex2Times.get(pageIndex); |
| TsPrimitiveType[] values = |
| decodeValuePage(reader, header, pageHeader, times, valueDecoder); |
| for (AlignedChunkData alignedChunkData : alignedChunkDataList) { |
| alignedChunkData.writeDecodeValuePage(times, values, header.getDataType()); |
| } |
| } |
| long pageDataSize = pageHeader.getSerializedPageSize(); |
| pageIndex += 1; |
| dataSize -= pageDataSize; |
| } |
| break; |
| case MetaMarker.CHUNK_GROUP_HEADER: |
| ChunkGroupHeader chunkGroupHeader = reader.readChunkGroupHeader(); |
| curDevice = chunkGroupHeader.getDeviceID(); |
| break; |
| case MetaMarker.OPERATION_INDEX_RANGE: |
| reader.readPlanIndex(); |
| break; |
| default: |
| MetaMarker.handleUnexpectedMarker(marker); |
| } |
| } |
| |
| consumeAllAlignedChunkData(reader.position(), pageIndex2ChunkData); |
| handleModification(offset2Deletions, Long.MAX_VALUE); |
| } |
| } |
| |
| private void getAllModification(Map<Long, List<Deletion>> offset2Deletions) throws IOException { |
| try (ModificationFile modificationFile = |
| new ModificationFile(tsFile.getAbsolutePath() + ModificationFile.FILE_SUFFIX)) { |
| for (Modification modification : modificationFile.getModifications()) { |
| offset2Deletions |
| .computeIfAbsent(modification.getFileOffset(), o -> new ArrayList<>()) |
| .add((Deletion) modification); |
| } |
| } |
| } |
| |
| private boolean checkMagic(TsFileSequenceReader reader) throws IOException { |
| String magic = reader.readHeadMagic(); |
| if (!magic.equals(TSFileConfig.MAGIC_STRING)) { |
| logger.error("the file's MAGIC STRING is incorrect, file path: {}", reader.getFileName()); |
| return false; |
| } |
| |
| byte versionNumber = reader.readVersionNumber(); |
| if (versionNumber != TSFileConfig.VERSION_NUMBER) { |
| logger.error("the file's Version Number is incorrect, file path: {}", reader.getFileName()); |
| return false; |
| } |
| |
| if (!reader.readTailMagic().equals(TSFileConfig.MAGIC_STRING)) { |
| logger.error("the file is not closed correctly, file path: {}", reader.getFileName()); |
| return false; |
| } |
| return true; |
| } |
| |
| private void getChunkMetadata( |
| TsFileSequenceReader reader, Map<Long, IChunkMetadata> offset2ChunkMetadata) |
| throws IOException { |
| Map<IDeviceID, List<TimeseriesMetadata>> device2Metadata = |
| reader.getAllTimeseriesMetadata(true); |
| for (Map.Entry<IDeviceID, List<TimeseriesMetadata>> entry : device2Metadata.entrySet()) { |
| for (TimeseriesMetadata timeseriesMetadata : entry.getValue()) { |
| for (IChunkMetadata chunkMetadata : timeseriesMetadata.getChunkMetadataList()) { |
| offset2ChunkMetadata.put(chunkMetadata.getOffsetOfChunkHeader(), chunkMetadata); |
| } |
| } |
| } |
| } |
| |
| private void handleModification( |
| TreeMap<Long, List<Deletion>> offset2Deletions, long chunkOffset) { |
| while (!offset2Deletions.isEmpty() && offset2Deletions.firstEntry().getKey() <= chunkOffset) { |
| offset2Deletions |
| .pollFirstEntry() |
| .getValue() |
| .forEach(o -> consumer.apply(new DeletionData(o))); |
| } |
| } |
| |
| private void consumeAllAlignedChunkData( |
| long offset, Map<Integer, List<AlignedChunkData>> pageIndex2ChunkData) { |
| if (pageIndex2ChunkData.isEmpty()) { |
| return; |
| } |
| |
| Set<ChunkData> allChunkData = new HashSet<>(); |
| for (Map.Entry<Integer, List<AlignedChunkData>> entry : pageIndex2ChunkData.entrySet()) { |
| allChunkData.addAll(entry.getValue()); |
| } |
| for (ChunkData chunkData : allChunkData) { |
| if (Boolean.FALSE.equals(consumer.apply(chunkData))) { |
| throw new IllegalStateException( |
| String.format( |
| "Consume aligned chunk data error, next chunk offset: %d, chunkData: %s", |
| offset, chunkData)); |
| } |
| } |
| pageIndex2ChunkData.clear(); |
| } |
| |
| private void consumeChunkData(String measurement, long offset, ChunkData chunkData) { |
| if (Boolean.FALSE.equals(consumer.apply(chunkData))) { |
| throw new IllegalStateException( |
| String.format( |
| "Consume chunkData error, chunk offset: %d, measurement: %s, chunkData: %s", |
| offset, measurement, chunkData)); |
| } |
| } |
| |
| private boolean needDecodeChunk(IChunkMetadata chunkMetadata) { |
| return !TimePartitionUtils.getTimePartitionSlot(chunkMetadata.getStartTime()) |
| .equals(TimePartitionUtils.getTimePartitionSlot(chunkMetadata.getEndTime())); |
| } |
| |
| private boolean needDecodePage(PageHeader pageHeader, IChunkMetadata chunkMetadata) { |
| if (pageHeader.getStatistics() == null) { |
| return !TimePartitionUtils.getTimePartitionSlot(chunkMetadata.getStartTime()) |
| .equals(TimePartitionUtils.getTimePartitionSlot(chunkMetadata.getEndTime())); |
| } |
| return !TimePartitionUtils.getTimePartitionSlot(pageHeader.getStartTime()) |
| .equals(TimePartitionUtils.getTimePartitionSlot(pageHeader.getEndTime())); |
| } |
| |
| private Pair<long[], Object[]> decodePage( |
| boolean isAligned, |
| ByteBuffer pageData, |
| PageHeader pageHeader, |
| Decoder timeDecoder, |
| Decoder valueDecoder, |
| ChunkHeader chunkHeader) |
| throws IOException { |
| if (isAligned) { |
| TimePageReader timePageReader = new TimePageReader(pageHeader, pageData, timeDecoder); |
| long[] times = timePageReader.getNextTimeBatch(); |
| return new Pair<>(times, new Object[times.length]); |
| } |
| |
| valueDecoder.reset(); |
| PageReader pageReader = |
| new PageReader(pageData, chunkHeader.getDataType(), valueDecoder, timeDecoder); |
| BatchData batchData = pageReader.getAllSatisfiedPageData(); |
| long[] times = new long[batchData.length()]; |
| Object[] values = new Object[batchData.length()]; |
| int index = 0; |
| while (batchData.hasCurrent()) { |
| times[index] = batchData.currentTime(); |
| values[index++] = batchData.currentValue(); |
| batchData.next(); |
| } |
| return new Pair<>(times, values); |
| } |
| |
| private void handleEmptyValueChunk( |
| ChunkHeader header, |
| Map<Integer, List<AlignedChunkData>> pageIndex2ChunkData, |
| IChunkMetadata chunkMetadata, |
| boolean isTimeChunkNeedDecode) |
| throws IOException { |
| Set<ChunkData> allChunkData = new HashSet<>(); |
| for (Map.Entry<Integer, List<AlignedChunkData>> entry : pageIndex2ChunkData.entrySet()) { |
| for (AlignedChunkData alignedChunkData : entry.getValue()) { |
| if (!allChunkData.contains(alignedChunkData)) { |
| alignedChunkData.addValueChunk(header); |
| if (!isTimeChunkNeedDecode) { |
| alignedChunkData.writeEntireChunk(ByteBuffer.allocate(0), chunkMetadata); |
| } |
| allChunkData.add(alignedChunkData); |
| } |
| } |
| } |
| } |
| |
| /** |
| * handle empty page in aligned chunk, if uncompressedSize and compressedSize are both 0, and the |
| * statistics is null, then the page is empty. |
| * |
| * @param pageHeader page header |
| * @return true if the page is empty |
| */ |
| private boolean isEmptyPage(PageHeader pageHeader) { |
| return pageHeader.getUncompressedSize() == 0 |
| && pageHeader.getCompressedSize() == 0 |
| && pageHeader.getStatistics() == null; |
| } |
| |
| private TsPrimitiveType[] decodeValuePage( |
| TsFileSequenceReader reader, |
| ChunkHeader chunkHeader, |
| PageHeader pageHeader, |
| long[] times, |
| Decoder valueDecoder) |
| throws IOException { |
| if (pageHeader.getSerializedPageSize() == 0) { |
| return new TsPrimitiveType[times.length]; |
| } |
| |
| valueDecoder.reset(); |
| ByteBuffer pageData = reader.readPage(pageHeader, chunkHeader.getCompressionType()); |
| ValuePageReader valuePageReader = |
| new ValuePageReader(pageHeader, pageData, chunkHeader.getDataType(), valueDecoder); |
| return valuePageReader.nextValueBatch( |
| times); // should be origin time, so recording satisfied length is necessary |
| } |
| } |