| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| * |
| */ |
| package org.apache.iotdb.db.sync.datasource; |
| |
| import org.apache.iotdb.commons.exception.IllegalPathException; |
| import org.apache.iotdb.commons.path.MeasurementPath; |
| import org.apache.iotdb.commons.path.PartialPath; |
| import org.apache.iotdb.commons.utils.TestOnly; |
| import org.apache.iotdb.db.engine.modification.Deletion; |
| import org.apache.iotdb.db.engine.modification.Modification; |
| import org.apache.iotdb.db.engine.modification.ModificationFile; |
| import org.apache.iotdb.db.sync.externalpipe.operation.InsertOperation; |
| import org.apache.iotdb.db.sync.externalpipe.operation.Operation; |
| import org.apache.iotdb.tsfile.common.cache.LRUCache; |
| import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor; |
| import org.apache.iotdb.tsfile.common.constant.TsFileConstant; |
| import org.apache.iotdb.tsfile.encoding.decoder.Decoder; |
| import org.apache.iotdb.tsfile.file.MetaMarker; |
| import org.apache.iotdb.tsfile.file.header.ChunkHeader; |
| import org.apache.iotdb.tsfile.file.header.PageHeader; |
| import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata; |
| import org.apache.iotdb.tsfile.file.metadata.MetadataIndexEntry; |
| import org.apache.iotdb.tsfile.file.metadata.MetadataIndexNode; |
| import org.apache.iotdb.tsfile.file.metadata.TimeseriesMetadata; |
| import org.apache.iotdb.tsfile.file.metadata.TsFileMetadata; |
| import org.apache.iotdb.tsfile.file.metadata.enums.MetadataIndexNodeType; |
| import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; |
| import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding; |
| import org.apache.iotdb.tsfile.read.TimeValuePair; |
| import org.apache.iotdb.tsfile.read.TsFileSequenceReader; |
| import org.apache.iotdb.tsfile.read.common.BatchData; |
| import org.apache.iotdb.tsfile.read.common.Path; |
| import org.apache.iotdb.tsfile.read.reader.page.PageReader; |
| import org.apache.iotdb.tsfile.read.reader.page.TimePageReader; |
| import org.apache.iotdb.tsfile.read.reader.page.ValuePageReader; |
| import org.apache.iotdb.tsfile.utils.Pair; |
| import org.apache.iotdb.tsfile.utils.TsPrimitiveType; |
| |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import java.io.File; |
| import java.io.IOException; |
| import java.nio.BufferOverflowException; |
| import java.nio.ByteBuffer; |
| import java.util.Collection; |
| import java.util.HashMap; |
| import java.util.Iterator; |
| import java.util.LinkedList; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.TreeMap; |
| |
| import static java.lang.Math.max; |
| import static java.lang.Math.min; |
| import static org.apache.iotdb.db.sync.datasource.DeletionGroup.DeletedType.FULL_DELETED; |
| import static org.apache.iotdb.db.sync.datasource.DeletionGroup.DeletedType.NO_DELETED; |
| import static org.apache.iotdb.tsfile.file.metadata.enums.TSDataType.VECTOR; |
| |
| /** This class will parse 1 TsFile's content to 1 operation block. */ |
| public class TsFileOpBlock extends AbstractOpBlock { |
| private static final Logger logger = LoggerFactory.getLogger(TsFileOpBlock.class); |
| |
| // tsFile name |
| private String tsFileName; |
| private String modsFileName; |
| private TsFileFullReader tsFileFullSeqReader; |
| |
| // full Timeseries Metadata TreeMap : FileOffset => Pair(MeasurementFullPath, TimeseriesMetadata) |
| private Map<Long, Pair<Path, TimeseriesMetadata>> fullTsMetadataMap; |
| // TreeMap: IndexInFile => ChunkInfo (measurementFullPath, ChunkOffset, PointCount, deletedFlag) |
| private TreeMap<Long, ChunkInfo> indexToChunkInfoMap; |
| |
| // Save all modifications that are from .mods file. |
| // (modificationList == null) means no .mods file or .mods file is empty. |
| Collection<Modification> modificationList; |
| // HashMap: measurement FullPath => DeletionGroup(save deletion info) |
| private Map<String, DeletionGroup> fullPathToDeletionMap; |
| |
| // LRUMap: startIndexOfPage => PageData |
| private LRUCache<Long, List<TimeValuePair>> pageCache; |
| |
| private boolean dataReady = false; |
| |
| Decoder timeDecoder = |
| Decoder.getDecoderByType( |
| TSEncoding.valueOf(TSFileDescriptor.getInstance().getConfig().getTimeEncoder()), |
| TSDataType.INT64); |
| |
| private class ChunkInfo { |
| public String measurementFullPath; |
| public long chunkOffsetInFile = -1; |
| public long startIndexInFile = -1; |
| public long pointCount = 0; |
| public boolean isTimeAligned = false; |
| public long timeChunkOffsetInFile = -1; |
| public DeletionGroup.DeletedType deletedFlag = NO_DELETED; |
| } |
| |
| public TsFileOpBlock(String sg, String tsFileName, long pipeDataSerialNumber) throws IOException { |
| this(sg, tsFileName, pipeDataSerialNumber, 0); |
| } |
| |
| public TsFileOpBlock(String sg, String tsFileName, long pipeDataSerialNumber, long beginIndex) |
| throws IOException { |
| this(sg, tsFileName, null, pipeDataSerialNumber, beginIndex); |
| } |
| |
| public TsFileOpBlock(String sg, String tsFileName, String modsFileName, long pipeDataSerialNumber) |
| throws IOException { |
| this(sg, tsFileName, modsFileName, pipeDataSerialNumber, 0); |
| } |
| |
| public TsFileOpBlock( |
| String sg, String tsFileName, String modsFileName, long pipeDataSerialNumber, long beginIndex) |
| throws IOException { |
| super(sg, pipeDataSerialNumber, beginIndex); |
| this.tsFileName = tsFileName; |
| |
| this.modsFileName = null; |
| if (modsFileName != null) { |
| if (new File(modsFileName).exists()) { |
| this.modsFileName = modsFileName; |
| } |
| } |
| |
| pageCache = |
| new LRUCache<Long, List<TimeValuePair>>(5) { |
| @Override |
| public List<TimeValuePair> loadObjectByKey(Long key) throws IOException { |
| return null; |
| } |
| }; |
| |
| calculateDataCount(); |
| } |
| |
| /** |
| * Calculate TsfileOpBlock's dataCount |
| * |
| * @throws IOException |
| */ |
| private void calculateDataCount() throws IOException { |
| // == calculate dataCount according to tsfile |
| tsFileFullSeqReader = new TsFileFullReader(this.tsFileName); |
| |
| dataCount = 0; |
| for (String device : tsFileFullSeqReader.getAllDevices()) { |
| // == Here use not readChunkMetadataInDevice(device) but readDeviceMetadata(Device). |
| // == Because readDeviceMetadata() will not parse chunkMeta, so has better performance. |
| // == deviceMetaData is HashMap: Measurement => TimeseriesMetadata |
| Map<String, TimeseriesMetadata> deviceMetaData = |
| tsFileFullSeqReader.readDeviceMetadata(device); |
| |
| long countInDevice = 0; |
| for (TimeseriesMetadata tsMetaData : deviceMetaData.values()) { |
| if (tsMetaData.getTSDataType() == VECTOR) { |
| countInDevice = tsMetaData.getStatistics().getCount() * (deviceMetaData.size() - 1); |
| break; |
| } |
| countInDevice += tsMetaData.getStatistics().getCount(); |
| } |
| dataCount += countInDevice; |
| } |
| |
| // close reader to avoid fd leak |
| tsFileFullSeqReader.close(); |
| tsFileFullSeqReader = null; |
| |
| // == Here, release fullTsMetadataMap for saving memory. |
| fullTsMetadataMap = null; |
| } |
| |
| /** |
| * return the Count of data points in this TsFile |
| * |
| * @return |
| */ |
| @Override |
| public long getDataCount() { |
| return dataCount; |
| } |
| |
| /** |
| * Check the deletion-state of the data-points of specific time range according to the info of |
| * .mods. |
| * |
| * @param measurementPath - measurementPath full path without wildcard |
| * @param startTime - the start time of data set, inclusive |
| * @param endTime - the end time of data set, inclusive |
| * @return |
| */ |
| private DeletionGroup.DeletedType checkDeletedState( |
| String measurementPath, long startTime, long endTime) { |
| DeletionGroup deletionGroup = getFullPathDeletion(measurementPath); |
| |
| if (deletionGroup == null) { |
| return NO_DELETED; |
| } |
| |
| return deletionGroup.checkDeletedState(startTime, endTime); |
| } |
| |
| /** |
| * Generate indexToChunkInfoMap for whole TsFile |
| * |
| * @throws IOException |
| */ |
| private void buildIndexToChunkMap() throws IOException { |
| if (tsFileFullSeqReader == null) { |
| tsFileFullSeqReader = new TsFileFullReader(tsFileName); |
| } |
| |
| if (fullTsMetadataMap == null) { |
| fullTsMetadataMap = tsFileFullSeqReader.getAllTimeseriesMeta(true); |
| } |
| |
| // chunkOffset => ChunkInfo (measurementFullPath, ChunkOffset, PointCount, deletedFlag etc.) |
| Map<Long, ChunkInfo> offsetToChunkInfoMap = new TreeMap<>(); |
| Map<Long, ChunkInfo> tmpOffsetToChunkInfoMap = new TreeMap<>(); |
| |
| for (String device : tsFileFullSeqReader.getAllDevices()) { |
| // == Here use not readDeviceMetadata(Device) but readChunkMetadataInDevice(device). |
| // == Because readChunkMetadataInDevice() will parse chunkMeta. |
| // == Get LinkedHashMap: measurementId -> ChunkMetadata list |
| Map<String, List<ChunkMetadata>> chunkMetadataMap = |
| tsFileFullSeqReader.readChunkMetadataInDevice(device); |
| |
| tmpOffsetToChunkInfoMap.clear(); |
| boolean isTimeAlignedDevice = false; |
| for (List<ChunkMetadata> chunkMetadataList : chunkMetadataMap.values()) { |
| for (ChunkMetadata chunkMetadata : chunkMetadataList) { |
| ChunkInfo chunkInfo = new ChunkInfo(); |
| |
| chunkInfo.measurementFullPath = |
| new Path(device, chunkMetadata.getMeasurementUid()).getFullPath(); |
| chunkInfo.chunkOffsetInFile = chunkMetadata.getOffsetOfChunkHeader(); |
| chunkInfo.pointCount = chunkMetadata.getStatistics().getCount(); |
| |
| if (chunkMetadata.getDataType() == VECTOR) { |
| isTimeAlignedDevice = true; |
| chunkInfo.isTimeAligned = true; |
| } else { |
| chunkInfo.deletedFlag = |
| checkDeletedState( |
| chunkInfo.measurementFullPath, |
| chunkMetadata.getStatistics().getStartTime(), |
| chunkMetadata.getStatistics().getEndTime()); |
| } |
| |
| tmpOffsetToChunkInfoMap.put(chunkInfo.chunkOffsetInFile, chunkInfo); |
| } |
| } |
| |
| if (isTimeAlignedDevice) { |
| long timeChunkOffset = -1; |
| long timeChunkPointCount = 0; |
| |
| Iterator<Map.Entry<Long, ChunkInfo>> iter = tmpOffsetToChunkInfoMap.entrySet().iterator(); |
| while (iter.hasNext()) { |
| Map.Entry<Long, ChunkInfo> entry = iter.next(); |
| ChunkInfo chunkInfo = entry.getValue(); |
| if (chunkInfo.isTimeAligned) { |
| timeChunkOffset = entry.getKey(); |
| timeChunkPointCount = chunkInfo.pointCount; |
| iter.remove(); |
| continue; |
| } else { |
| chunkInfo.isTimeAligned = true; |
| chunkInfo.timeChunkOffsetInFile = timeChunkOffset; |
| chunkInfo.pointCount = timeChunkPointCount; |
| } |
| } |
| } |
| |
| offsetToChunkInfoMap.putAll(tmpOffsetToChunkInfoMap); |
| } |
| |
| indexToChunkInfoMap = new TreeMap<>(); |
| long indexInFile = 0; |
| for (ChunkInfo chunkInfo : offsetToChunkInfoMap.values()) { |
| chunkInfo.startIndexInFile = indexInFile; |
| indexToChunkInfoMap.put(indexInFile, chunkInfo); |
| indexInFile += chunkInfo.pointCount; |
| } |
| } |
| |
| /** |
| * Generate modificationList using .mods file. Result: (modificationList == null) means no .mods |
| * file or .mods file is empty. |
| * |
| * @throws IOException |
| */ |
| private void buildModificationList() throws IOException { |
| if (modsFileName == null) { |
| logger.debug("buildModificationList(), modsFileName is null."); |
| modificationList = null; |
| return; |
| } |
| |
| try (ModificationFile modificationFile = new ModificationFile(modsFileName)) { |
| modificationList = modificationFile.getModifications(); |
| } |
| |
| if (modificationList.isEmpty()) { |
| modificationList = null; |
| } |
| } |
| |
| /** |
| * Generate fullPathToDeletionMap for fullPath |
| * |
| * @param fullPath measurement full path without wildcard |
| * @return |
| */ |
| private DeletionGroup getFullPathDeletion(String fullPath) { |
| // (fullPathToDeletionMap == null) means modificationList is null or empty |
| if (fullPathToDeletionMap == null) { |
| return null; |
| } |
| |
| // Try to get data from cache firstly |
| if (fullPathToDeletionMap.containsKey(fullPath)) { |
| return fullPathToDeletionMap.get(fullPath); |
| } |
| |
| // == insert all deletion intervals info to deletionGroup. |
| DeletionGroup deletionGroup = new DeletionGroup(); |
| PartialPath partialPath = null; |
| try { |
| partialPath = new PartialPath(fullPath); |
| } catch (IllegalPathException e) { |
| logger.error("getFullPathDeletion(), find invalid fullPath: {}", fullPath); |
| } |
| |
| if (partialPath != null) { |
| // == Here, has been sure (modificationList != null) && (!modificationList.isEmpty()) |
| for (Modification modification : modificationList) { |
| if ((modification instanceof Deletion) |
| && (modification.getPath().matchFullPath(partialPath))) { |
| Deletion deletion = (Deletion) modification; |
| deletionGroup.addDelInterval(deletion.getStartTime(), deletion.getEndTime()); |
| } |
| } |
| } |
| |
| if (deletionGroup.isEmpty()) { |
| deletionGroup = null; |
| } |
| |
| fullPathToDeletionMap.put(fullPath, deletionGroup); |
| |
| return deletionGroup; |
| } |
| |
| /** |
| * Try to get page data from page cache. * |
| * |
| * @param pageStartIndexInFile page's first data's index in Tsfile. It is also the key of |
| * page-cache. |
| * @param dataIndexInfile requested data's first dota's index in Tsfile. |
| * @param dataLen requested data's length |
| * @param destTVList returned data will be appended to this variable. |
| * @return -1 : pageCache has no this page. >=0 : data number of the found page in page-cache. (No |
| * matter whether this page has requested data) |
| * @throws IOException |
| */ |
| private long getDataFromPageCache( |
| long pageStartIndexInFile, long dataIndexInfile, long dataLen, List<TimeValuePair> destTVList) |
| throws IOException { |
| List<TimeValuePair> pageTVList = pageCache.get(pageStartIndexInFile); |
| if (pageTVList == null) { |
| return -1; |
| } |
| |
| // == PageCache has this page. Try to get data from pageCache |
| long pageDataCount = pageTVList.size(); |
| // == If this page has no requested data |
| if (((pageStartIndexInFile + pageDataCount) < dataIndexInfile) |
| || (pageStartIndexInFile >= (dataIndexInfile + dataLen))) { |
| return pageDataCount; |
| } |
| |
| int beginIdxInPage = (int) (max(dataIndexInfile, pageStartIndexInFile) - pageStartIndexInFile); |
| int needCountInPage = |
| (int) |
| (min(pageStartIndexInFile + pageDataCount, dataIndexInfile + dataLen) |
| - max(dataIndexInfile, pageStartIndexInFile)); |
| destTVList.addAll( |
| ((LinkedList) pageTVList).subList(beginIdxInPage, beginIdxInPage + needCountInPage)); |
| |
| return pageDataCount; |
| } |
| /** |
| * Parse 1 normal (not Time-Aligned) Chunk to get partial data points according to indexInChunk & |
| * lengthInChunk |
| * |
| * <p>Note: new data will be appended to parameter tvPairList for better performance. |
| * |
| * @param chunkInfo |
| * @param startIndexInChunk |
| * @param lengthInChunk |
| * @param deletionGroup - If it is not null, need to check whether data points have benn deleted |
| * @param tvPairList |
| * @return the number of returned data points |
| * @throws IOException |
| */ |
| private long getNonAlignedChunkPoints( |
| ChunkInfo chunkInfo, |
| long startIndexInChunk, |
| long lengthInChunk, |
| DeletionGroup deletionGroup, |
| List<TimeValuePair> tvPairList) |
| throws IOException { |
| |
| if (chunkInfo.chunkOffsetInFile < 0) { |
| String errMsg = |
| String.format( |
| "getNonAlignedChunkPoints(), invalid chunkOffsetInFile=%d.", |
| chunkInfo.chunkOffsetInFile); |
| logger.error(errMsg); |
| throw new IOException(errMsg); |
| } |
| tsFileFullSeqReader.position(chunkInfo.chunkOffsetInFile); |
| byte chunkTypeByte = tsFileFullSeqReader.readMarker(); |
| ChunkHeader chunkHeader = tsFileFullSeqReader.readChunkHeader(chunkTypeByte); |
| |
| Decoder valueDecoder = |
| Decoder.getDecoderByType(chunkHeader.getEncodingType(), chunkHeader.getDataType()); |
| boolean pageHeaderHasStatistic = |
| ((chunkHeader.getChunkType() & 0x3F) == MetaMarker.CHUNK_HEADER); |
| int chunkLeftDataSize = chunkHeader.getDataSize(); |
| |
| long indexInChunk = 0; |
| while ((chunkLeftDataSize > 0) && (indexInChunk < (startIndexInChunk + lengthInChunk))) { |
| // == Begin to traverse every page |
| long pagePosInTsfile = tsFileFullSeqReader.position(); |
| PageHeader pageHeader = |
| tsFileFullSeqReader.readPageHeader(chunkHeader.getDataType(), pageHeaderHasStatistic); |
| |
| int pageSize = pageHeader.getSerializedPageSize(); |
| chunkLeftDataSize -= pageSize; |
| |
| long pageStartIndexInFile = chunkInfo.startIndexInFile + indexInChunk; |
| |
| // == At first, try to find page data from pageCache. |
| long pageDataNum = |
| getDataFromPageCache( |
| pageStartIndexInFile, |
| chunkInfo.startIndexInFile + startIndexInChunk, |
| lengthInChunk, |
| tvPairList); |
| if (pageDataNum >= 0) { // find page in page cache |
| tsFileFullSeqReader.position(pagePosInTsfile + pageSize); |
| indexInChunk += pageDataNum; |
| continue; |
| } |
| |
| // Note: Here, we can not use pageHeaderHasStatistic to do judge. |
| // Because, even if pageHeaderHasStatistic == true, |
| // pageHeader.getStatistics() can still be null when page uncompressedSize == 0 (empty page). |
| if (pageHeader.getStatistics() != null) { |
| // == check whether whole page is out of [startIndexInChunk, startIndexInChunk + |
| // lengthInChunk) |
| long pageDataCount = pageHeader.getNumOfValues(); |
| if ((indexInChunk + pageDataCount) <= startIndexInChunk) { // skip this page |
| tsFileFullSeqReader.position(pagePosInTsfile + pageSize); |
| indexInChunk += pageDataCount; |
| continue; |
| } |
| |
| // == check whether whole page has been deleted by .mods file |
| if (deletionGroup != null) { // have deletion related to current chunk |
| DeletionGroup.DeletedType pageDeletedStatus = |
| deletionGroup.checkDeletedState(pageHeader.getStartTime(), pageHeader.getEndTime()); |
| if (pageDeletedStatus == FULL_DELETED) { |
| // == put page data to page-cache |
| List<TimeValuePair> pageTVList = new LinkedList<>(); |
| for (long i = 0; i < pageDataCount; i++) { |
| pageTVList.add(null); |
| } |
| pageCache.put(pageStartIndexInFile, pageTVList); |
| |
| // == gen requested TV data list |
| long needCount = |
| min(indexInChunk + pageDataCount, startIndexInChunk + lengthInChunk) |
| - max(indexInChunk, startIndexInChunk); |
| for (long i = 0; i < needCount; i++) { |
| tvPairList.add(null); |
| } |
| |
| tsFileFullSeqReader.position(pagePosInTsfile + pageSize); |
| indexInChunk += pageDataCount; |
| continue; |
| } |
| } |
| } |
| |
| // == read page data from Tsfile |
| List<TimeValuePair> pageTVList; |
| pageTVList = getNonAlignedPagePoints(pageHeader, chunkHeader, valueDecoder, deletionGroup); |
| pageCache.put(pageStartIndexInFile, pageTVList); |
| |
| // == check whether whole page is out of [startIndexInChunk, startIndexInChunk + |
| // lengthInChunk) |
| long pageDataCount = pageTVList.size(); |
| if ((indexInChunk + pageDataCount) <= startIndexInChunk) { // skip this page |
| // == Here, not check (indexInChunk >= (startIndexInChunk + lengthInChunk)), because the |
| // preceding while(xxx) has checked it. |
| // tsFileFullSeqReader.position(pagePosInTsfile + pageSize); |
| indexInChunk += pageDataCount; |
| continue; |
| } |
| |
| // == select needed data and append them to tvPairList |
| int beginIdxInPage = (int) (max(indexInChunk, startIndexInChunk) - indexInChunk); |
| int needCountInPage = |
| (int) |
| (min(indexInChunk + pageDataCount, startIndexInChunk + lengthInChunk) |
| - max(indexInChunk, startIndexInChunk)); |
| tvPairList.addAll( |
| ((LinkedList) pageTVList).subList(beginIdxInPage, beginIdxInPage + needCountInPage)); |
| |
| // tsFileFullSeqReader.position(pagePosInTsfile + pageSize); |
| indexInChunk += pageDataCount; |
| } |
| |
| return tvPairList.size(); |
| } |
| |
| /** |
| * Parse 1 normal(not Time-Aligned) page to get all data points. |
| * |
| * <p>Note: * |
| * |
| * <p>1) New data will be appended to parameter tvPairList for better performance. |
| * |
| * <p>2) The deleted data points by .mods will be set to null. |
| * |
| * @param pageHeader |
| * @param chunkHeader |
| * @param valueDecoder |
| * @param deletionGroup |
| * @return |
| * @throws IOException |
| */ |
| private List<TimeValuePair> getNonAlignedPagePoints( |
| PageHeader pageHeader, |
| ChunkHeader chunkHeader, |
| Decoder valueDecoder, |
| DeletionGroup deletionGroup) |
| throws IOException { |
| List<TimeValuePair> pageTVList = new LinkedList<>(); |
| |
| ByteBuffer pageData = |
| tsFileFullSeqReader.readPage(pageHeader, chunkHeader.getCompressionType()); |
| |
| valueDecoder.reset(); |
| PageReader pageReader = |
| new PageReader(pageData, chunkHeader.getDataType(), valueDecoder, timeDecoder, null); |
| BatchData batchData = pageReader.getAllSatisfiedPageData(); |
| if (chunkHeader.getChunkType() == MetaMarker.CHUNK_HEADER) { |
| logger.debug("points in the page(by pageHeader): " + pageHeader.getNumOfValues()); |
| } else { |
| logger.debug("points in the page(by batchData): " + batchData.length()); |
| } |
| |
| if (batchData.isEmpty()) { |
| logger.warn("getNonAlignedChunkPoints(), chunk is empty, chunkHeader = {}.", chunkHeader); |
| return pageTVList; |
| } |
| |
| batchData.resetBatchData(); |
| DeletionGroup.IntervalCursor intervalCursor = new DeletionGroup.IntervalCursor(); |
| while (batchData.hasCurrent()) { |
| long ts = batchData.currentTime(); |
| if ((deletionGroup != null) && (deletionGroup.isDeleted(ts, intervalCursor))) { |
| pageTVList.add(null); |
| } else { |
| TimeValuePair timeValuePair = new TimeValuePair(ts, batchData.currentTsPrimitiveType()); |
| logger.debug("getNonAlignedChunkPoints(), timeValuePair = {} ", timeValuePair); |
| pageTVList.add(timeValuePair); |
| } |
| |
| batchData.next(); |
| } |
| |
| return pageTVList; |
| } |
| |
| /** |
| * Get the data of Time-Aligned Value page. |
| * |
| * @param chunkInfo The chunk-info that value page exists in. |
| * @param pageIndexInChunk needed page's index in value chunk, 0 mean the first page of this |
| * chunk. |
| * @param timeBatch timeStamp info got from Time-Aligned time page |
| * @param deletionGroup If it is not null, need to check whether data points have benn deleted |
| * @return The data list of this value page. |
| * @throws IOException |
| */ |
| private List<TimeValuePair> getAlignedValuePagePoints( |
| ChunkInfo chunkInfo, int pageIndexInChunk, long[] timeBatch, DeletionGroup deletionGroup) |
| throws IOException { |
| |
| if (chunkInfo.chunkOffsetInFile < 0) { |
| String errMsg = |
| String.format( |
| "getAlignedValuePagePoints(), invalid chunkOffsetInFile=%d.", |
| chunkInfo.chunkOffsetInFile); |
| logger.error(errMsg); |
| throw new IOException(errMsg); |
| } |
| |
| tsFileFullSeqReader.position(chunkInfo.chunkOffsetInFile); |
| byte valueChunkTypeByte = tsFileFullSeqReader.readMarker(); |
| ChunkHeader valueChunkHeader = tsFileFullSeqReader.readChunkHeader(valueChunkTypeByte); |
| |
| byte chunkType = valueChunkHeader.getChunkType(); |
| // == If this chunk is not time-aligned value chunk |
| if ((chunkType & (byte) TsFileConstant.VALUE_COLUMN_MASK) |
| != (byte) TsFileConstant.VALUE_COLUMN_MASK) { |
| String errMsg = |
| String.format( |
| "getAlignedValuePagePoints(), tsFile(%s) current chunk is not time-aligned value chunk. chunkOffsetInFile=%d.", |
| tsFileName, chunkInfo.chunkOffsetInFile); |
| logger.error(errMsg); |
| throw new IOException(errMsg); |
| } |
| |
| boolean pageHeaderHasStatistic = ((chunkType & 0x3F) == MetaMarker.CHUNK_HEADER); |
| if ((!pageHeaderHasStatistic) && (pageIndexInChunk != 0)) { |
| String errMsg = |
| String.format( |
| "getAlignedValuePagePoints(), current value chunk has only 1 page, but pageIndexInChunk=%d.", |
| pageIndexInChunk); |
| logger.error(errMsg); |
| throw new IOException(errMsg); |
| } |
| |
| int chunkLeftDataSize = valueChunkHeader.getDataSize(); |
| |
| // == skip the first (pageIndexInChunk) pages. |
| int k = 0; |
| TSDataType tsDataType = valueChunkHeader.getDataType(); |
| while ((chunkLeftDataSize > 0) && (k++ < pageIndexInChunk)) { |
| long pagePosInTsFile = tsFileFullSeqReader.position(); |
| PageHeader pageHeader = |
| tsFileFullSeqReader.readPageHeader(tsDataType, pageHeaderHasStatistic); |
| |
| int pageSize = pageHeader.getSerializedPageSize(); |
| tsFileFullSeqReader.position(pagePosInTsFile + pageSize); |
| chunkLeftDataSize -= pageSize; |
| } |
| |
| if (chunkLeftDataSize <= 0) { |
| String errMsg = |
| String.format( |
| "getAlignedValuePagePoints(), current value chunk has only %d pages, but pageIndexInChunk=%d.", |
| k, pageIndexInChunk); |
| logger.error(errMsg); |
| throw new IOException(errMsg); |
| } |
| |
| PageHeader pageHeader = tsFileFullSeqReader.readPageHeader(tsDataType, pageHeaderHasStatistic); |
| ByteBuffer pageData = |
| tsFileFullSeqReader.readPage(pageHeader, valueChunkHeader.getCompressionType()); |
| Decoder valueDecoder = Decoder.getDecoderByType(valueChunkHeader.getEncodingType(), tsDataType); |
| ValuePageReader valuePageReader = |
| new ValuePageReader(pageHeader, pageData, tsDataType, valueDecoder); |
| TsPrimitiveType[] valueBatch = valuePageReader.nextValueBatch(timeBatch); |
| |
| if (timeBatch.length != valueBatch.length) { |
| String errMsg = |
| String.format( |
| "getAlignedValuePagePoints(), timeBatch & valueBatch have different length. %d != %d.", |
| timeBatch.length, valueBatch.length); |
| logger.error(errMsg); |
| throw new IOException(errMsg); |
| } |
| |
| List<TimeValuePair> pageTVList = new LinkedList<>(); |
| DeletionGroup.IntervalCursor intervalCursor = new DeletionGroup.IntervalCursor(); |
| for (int i = 0; i < timeBatch.length; i++) { |
| long ts = timeBatch[i]; |
| if ((valueBatch[i] == null) |
| || ((deletionGroup != null) && (deletionGroup.isDeleted(ts, intervalCursor)))) { |
| pageTVList.add(null); |
| } else { |
| TimeValuePair timeValuePair = new TimeValuePair(ts, valueBatch[i]); |
| logger.debug("getNonAlignedChunkPoints(), timeValuePair = {} ", timeValuePair); |
| pageTVList.add(timeValuePair); |
| } |
| } |
| |
| return pageTVList; |
| } |
| |
| /** |
| * Parse 1 time-aligned value chunk to get partial data points according to startIndexInChunk & |
| * lengthInChunk. |
| * |
| * <p>Note: |
| * |
| * <p>1) New data will be appended to parameter tvPairList for better performance |
| * |
| * <p>2) Parsing value chunk will automatically involve time chunk's info. |
| * |
| * @param chunkInfo - The info of 1 time-aligned value chunk |
| * @param startIndexInChunk |
| * @param lengthInChunk |
| * @param deletionGroup - If it is not null, need to check whether data points have benn deleted |
| * @param tvPairList |
| * @return |
| * @throws IOException |
| */ |
| private long getAlignedChunkPoints( |
| ChunkInfo chunkInfo, |
| long startIndexInChunk, |
| long lengthInChunk, |
| DeletionGroup deletionGroup, |
| List<TimeValuePair> tvPairList) |
| throws IOException { |
| |
| if (chunkInfo.timeChunkOffsetInFile < 0) { |
| String errMsg = |
| String.format( |
| "getAlignedChunkPoints(), invalid timeChunkOffsetInFile=%d.", |
| chunkInfo.timeChunkOffsetInFile); |
| logger.error(errMsg); |
| throw new IOException(errMsg); |
| } |
| tsFileFullSeqReader.position(chunkInfo.timeChunkOffsetInFile); |
| byte timeChunkTypeByte = tsFileFullSeqReader.readMarker(); |
| ChunkHeader timeChunkHeader = tsFileFullSeqReader.readChunkHeader(timeChunkTypeByte); |
| |
| TSDataType tsDataType = timeChunkHeader.getDataType(); |
| boolean pageHeaderHasStatistic = |
| ((timeChunkHeader.getChunkType() & 0x3F) == MetaMarker.CHUNK_HEADER); |
| int chunkLeftDataSize = timeChunkHeader.getDataSize(); |
| Decoder defaultTimeDecoder = |
| Decoder.getDecoderByType( |
| TSEncoding.valueOf(TSFileDescriptor.getInstance().getConfig().getTimeEncoder()), |
| TSDataType.INT64); |
| |
| long indexInChunk = 0; |
| int pageIndexInChunk = -1; |
| while ((chunkLeftDataSize > 0) && (indexInChunk < (startIndexInChunk + lengthInChunk))) { |
| // == Begin to traverse every time+value page |
| long timePagePosInTsfile = tsFileFullSeqReader.position(); |
| PageHeader timePageHeader = |
| tsFileFullSeqReader.readPageHeader(tsDataType, pageHeaderHasStatistic); |
| |
| int timePageSize = timePageHeader.getSerializedPageSize(); |
| chunkLeftDataSize -= timePageSize; |
| pageIndexInChunk++; |
| |
| long pageStartIndexInFile = chunkInfo.startIndexInFile + indexInChunk; |
| |
| // == At first, try to find page data from pageCache. |
| long pageDataNum = |
| getDataFromPageCache( |
| pageStartIndexInFile, |
| chunkInfo.startIndexInFile + startIndexInChunk, |
| lengthInChunk, |
| tvPairList); |
| if (pageDataNum >= 0) { // find page in page cache |
| tsFileFullSeqReader.position(timePagePosInTsfile + timePageSize); |
| indexInChunk += pageDataNum; |
| continue; |
| } |
| |
| // Note: Here, we can not use pageHeaderHasStatistic to do judge. |
| // Because, even if pageHeaderHasStatistic == true, |
| // pageHeader.getStatistics() can still be null when page uncompressedSize == 0 (empty page). |
| if (timePageHeader.getStatistics() != null) { |
| // == check whether whole time page is out of [startIndexInChunk, startIndexInChunk + |
| // lengthInChunk) |
| long pageDataCount = timePageHeader.getNumOfValues(); |
| if ((indexInChunk + pageDataCount) <= startIndexInChunk) { // skip this page |
| tsFileFullSeqReader.position(timePagePosInTsfile + timePageSize); |
| indexInChunk += pageDataCount; |
| continue; |
| } |
| |
| // == check whether whole page has been deleted by .mods file |
| if (deletionGroup != null) { // have deletion related to current chunk |
| DeletionGroup.DeletedType pageDeletedStatus = |
| deletionGroup.checkDeletedState( |
| timePageHeader.getStartTime(), timePageHeader.getEndTime()); |
| if (pageDeletedStatus == FULL_DELETED) { |
| // == put page data to page-cache |
| List<TimeValuePair> pageTVList = new LinkedList<>(); |
| for (long i = 0; i < pageDataCount; i++) { |
| pageTVList.add(null); |
| } |
| pageCache.put(pageStartIndexInFile, pageTVList); |
| |
| // == gen requested TV data list |
| long needCount = |
| min(indexInChunk + pageDataCount, startIndexInChunk + lengthInChunk) |
| - max(indexInChunk, startIndexInChunk); |
| for (long i = 0; i < needCount; i++) { |
| tvPairList.add(null); |
| } |
| |
| tsFileFullSeqReader.position(timePagePosInTsfile + timePageSize); |
| indexInChunk += pageDataCount; |
| continue; |
| } |
| } |
| } |
| |
| // == read time page data from Tsfile (for pageIndexInChunk) |
| ByteBuffer timePageData = |
| tsFileFullSeqReader.readPage(timePageHeader, timeChunkHeader.getCompressionType()); |
| TimePageReader timePageReader = |
| new TimePageReader(timePageHeader, timePageData, defaultTimeDecoder); |
| long[] timeBatch = timePageReader.getNextTimeBatch(); |
| if (logger.isDebugEnabled()) { |
| logger.debug("Time pager content: {}", timeBatch); |
| } |
| |
| // == check whether whole time page is out of [startIndexInChunk, startIndexInChunk + |
| // lengthInChunk) |
| long timePageDataCount = timeBatch.length; |
| if ((indexInChunk + timePageDataCount) <= startIndexInChunk) { // skip this page |
| // == Here, not check (indexInChunk >= (startIndexInChunk + lengthInChunk)), because the |
| // preceding while(xxx) has checked it. |
| tsFileFullSeqReader.position(timePagePosInTsfile + timePageSize); |
| indexInChunk += timePageDataCount; |
| continue; |
| } |
| |
| // == read value page data from Tsfile (for pageIndexInChunk) |
| List<TimeValuePair> pageTVList = |
| getAlignedValuePagePoints(chunkInfo, pageIndexInChunk, timeBatch, deletionGroup); |
| |
| pageCache.put(pageStartIndexInFile, pageTVList); |
| |
| // == select needed data and append them to tvPairList |
| int beginIdxInPage = (int) (max(indexInChunk, startIndexInChunk) - indexInChunk); |
| int needCountInPage = |
| (int) |
| (min(indexInChunk + timePageDataCount, startIndexInChunk + lengthInChunk) |
| - max(indexInChunk, startIndexInChunk)); |
| |
| tvPairList.addAll( |
| ((LinkedList) pageTVList).subList(beginIdxInPage, beginIdxInPage + needCountInPage)); |
| |
| tsFileFullSeqReader.position(timePagePosInTsfile + timePageSize); |
| indexInChunk += timePageDataCount; |
| } |
| |
| return tvPairList.size(); |
| } |
| |
| /** |
| * Get 1 Chunk's partial data points according to indexInChunk & lengthInChunk. |
| * |
| * <p>Note: new data will be appended to parameter tvPairList for better performance |
| * |
| * @param chunkInfo |
| * @param indexInChunk |
| * @param lengthInChunk |
| * @param tvPairList, the got data points will be appended to this List. |
| * @return the number of returned data points |
| * @throws IOException |
| */ |
| private long getChunkPoints( |
| ChunkInfo chunkInfo, long indexInChunk, long lengthInChunk, List<TimeValuePair> tvPairList) |
| throws IOException { |
| |
| // == If whole chunk has been deleted according to .mods file |
| if (chunkInfo.deletedFlag == FULL_DELETED) { |
| for (long i = 0; i < lengthInChunk; i++) { |
| tvPairList.add(null); |
| } |
| return lengthInChunk; |
| } |
| |
| DeletionGroup deletionGroup = null; |
| if (chunkInfo.deletedFlag != NO_DELETED) { |
| deletionGroup = getFullPathDeletion(chunkInfo.measurementFullPath); |
| } |
| |
| if (chunkInfo.isTimeAligned) { |
| return getAlignedChunkPoints( |
| chunkInfo, indexInChunk, lengthInChunk, deletionGroup, tvPairList); |
| } else { |
| return getNonAlignedChunkPoints( |
| chunkInfo, indexInChunk, lengthInChunk, deletionGroup, tvPairList); |
| } |
| } |
| |
| /** |
| * Add sensorFullPath + tvPairList to dataList |
| * |
| * @param dataList |
| * @param sensorFullPath |
| * @param tvPairList |
| * @throws IOException |
| */ |
| private void insertToDataList( |
| List<Pair<MeasurementPath, List<TimeValuePair>>> dataList, |
| String sensorFullPath, |
| List<TimeValuePair> tvPairList) |
| throws IOException { |
| MeasurementPath measurementPath; |
| try { |
| measurementPath = new MeasurementPath(sensorFullPath); |
| } catch (IllegalPathException e) { |
| logger.error("TsFileOpBlock.insertToDataList(), Illegal MeasurementPath: {}", ""); |
| throw new IOException("Illegal MeasurementPath: " + sensorFullPath, e); |
| } |
| dataList.add(new Pair<>(measurementPath, tvPairList)); |
| } |
| |
| /* Prepare TsfileOpBlock's internal data before formal accessing. */ |
| private synchronized void prepareData() throws IOException { |
| if (dataReady) { |
| return; |
| } |
| |
| if (tsFileFullSeqReader == null) { |
| tsFileFullSeqReader = new TsFileFullReader(tsFileName); |
| } |
| |
| if (modsFileName != null) { |
| buildModificationList(); |
| } |
| |
| if ((fullPathToDeletionMap == null) && (modificationList != null)) { |
| fullPathToDeletionMap = new HashMap<>(); |
| } |
| |
| if (indexToChunkInfoMap == null) { |
| buildIndexToChunkMap(); |
| } |
| |
| dataReady = true; |
| } |
| |
| /** |
| * Get 1 Operation that contain needed data. Note: 1) Expected data range is [index, index+length) |
| * 2) Real returned data length can less than input parameter length |
| * |
| * @param index |
| * @param length |
| * @return |
| * @throws IOException |
| */ |
| @Override |
| public Operation getOperation(long index, long length) throws IOException { |
| if (closed) { |
| logger.error("TsFileOpBlock.getOperation(), can not access closed TsFileOpBlock: {}.", this); |
| throw new IOException("can not access closed TsFileOpBlock: " + this); |
| } |
| |
| long indexInTsFile = index - beginIndex; |
| if (indexInTsFile < 0 || indexInTsFile >= dataCount) { |
| logger.error("TsFileOpBlock.getOperation(), Error: index {} is out of range.", index); |
| // throw new IOException("index is out of range."); |
| return null; |
| } |
| |
| if (!dataReady) { |
| prepareData(); |
| } |
| |
| LinkedList<Pair<MeasurementPath, List<TimeValuePair>>> dataList = new LinkedList<>(); |
| String lastSensorFullPath = ""; |
| List<TimeValuePair> tvPairList = null; |
| |
| // handle all chunks that contain needed data |
| long remain = length; |
| while (remain > 0) { |
| Map.Entry<Long, ChunkInfo> entry = indexToChunkInfoMap.floorEntry(indexInTsFile); |
| if (entry == null) { |
| logger.error( |
| "TsFileOpBlock.getOperation(), indexInTsFile {} if out of indexToChunkOffsetMap.", |
| indexInTsFile); |
| throw new IOException("indexInTsFile is out of range."); |
| } |
| |
| long indexInChunk = indexInTsFile - entry.getKey(); |
| ChunkInfo chunkInfo = entry.getValue(); |
| String sensorFullPath = chunkInfo.measurementFullPath; |
| long chunkPointCount = chunkInfo.pointCount; |
| |
| long lengthInChunk = min(chunkPointCount - indexInChunk, remain); |
| |
| if (!sensorFullPath.equals(lastSensorFullPath)) { |
| if ((tvPairList != null) && (tvPairList.size() > 0)) { |
| insertToDataList(dataList, lastSensorFullPath, tvPairList); |
| tvPairList = null; |
| } |
| lastSensorFullPath = sensorFullPath; |
| } |
| |
| // == get data from 1 chunk |
| if (tvPairList == null) { |
| tvPairList = new LinkedList<>(); |
| } |
| long readCount = getChunkPoints(chunkInfo, indexInChunk, lengthInChunk, tvPairList); |
| if (readCount != lengthInChunk) { |
| String errMsg = |
| String.format( |
| "TsFileOpBlock.getOperation(), error when read chunk from file %s. indexInTsFile=%d, lengthInChunk=%d, readCount=%d.", |
| tsFileName, indexInTsFile, lengthInChunk, readCount); |
| logger.error(errMsg); |
| throw new IOException(errMsg); |
| } |
| |
| remain -= readCount; |
| indexInTsFile = entry.getKey() + chunkPointCount; // next chunk's local index |
| |
| if (indexInTsFile >= dataCount) { // has reached the end of this Tsfile |
| break; |
| } |
| } |
| |
| if ((tvPairList != null) && (tvPairList.size() > 0)) { |
| insertToDataList(dataList, lastSensorFullPath, tvPairList); |
| } |
| |
| return new InsertOperation(storageGroup, index, index + length - remain, dataList); |
| } |
| |
| /** release the current class object's resource */ |
| @Override |
| public void close() { |
| super.close(); |
| if (tsFileFullSeqReader != null) { |
| try { |
| tsFileFullSeqReader.close(); |
| } catch (IOException e) { |
| logger.error("tsFileFullSeqReader.close() exception, file = {}", tsFileName, e); |
| } |
| tsFileFullSeqReader = null; |
| } |
| |
| dataReady = false; |
| } |
| |
| /** This class is used to read & parse Tsfile */ |
| private class TsFileFullReader extends TsFileSequenceReader { |
| |
| /** |
| * @param file, Tsfile's full path name. |
| * @throws IOException |
| */ |
| public TsFileFullReader(String file) throws IOException { |
| super(file); |
| } |
| |
| /** |
| * Generate timeseriesMetadataMap, Offset => (MeasurementID, TimeseriesMetadata) |
| * |
| * @param startOffset |
| * @param metadataIndex |
| * @param buffer |
| * @param deviceId |
| * @param type |
| * @param timeseriesMetadataMap, output param |
| * @param needChunkMetadata, input param, indicates whether need parser ChunkMetaData |
| * @throws IOException |
| */ |
| private void genTSMetadataFromMetaIndexEntry( |
| long startOffset, |
| MetadataIndexEntry metadataIndex, |
| ByteBuffer buffer, |
| String deviceId, |
| MetadataIndexNodeType type, |
| Map<Long, Pair<Path, TimeseriesMetadata>> timeseriesMetadataMap, |
| boolean needChunkMetadata) |
| throws IOException { |
| try { |
| if (type.equals(MetadataIndexNodeType.LEAF_MEASUREMENT)) { |
| while (buffer.hasRemaining()) { |
| long pos = startOffset + buffer.position(); |
| TimeseriesMetadata timeseriesMetadata = |
| TimeseriesMetadata.deserializeFrom(buffer, needChunkMetadata); |
| timeseriesMetadataMap.put( |
| pos, |
| new Pair<>( |
| new Path(deviceId, timeseriesMetadata.getMeasurementId()), timeseriesMetadata)); |
| } |
| } else { // deviceId should be determined by LEAF_DEVICE node |
| if (type.equals(MetadataIndexNodeType.LEAF_DEVICE)) { |
| deviceId = metadataIndex.getName(); |
| } |
| MetadataIndexNode metadataIndexNode = MetadataIndexNode.deserializeFrom(buffer); |
| int metadataIndexListSize = metadataIndexNode.getChildren().size(); |
| for (int i = 0; i < metadataIndexListSize; i++) { |
| long endOffset = metadataIndexNode.getEndOffset(); |
| if (i != metadataIndexListSize - 1) { |
| endOffset = metadataIndexNode.getChildren().get(i + 1).getOffset(); |
| } |
| ByteBuffer nextBuffer = |
| readData(metadataIndexNode.getChildren().get(i).getOffset(), endOffset); |
| genTSMetadataFromMetaIndexEntry( |
| metadataIndexNode.getChildren().get(i).getOffset(), |
| metadataIndexNode.getChildren().get(i), |
| nextBuffer, |
| deviceId, |
| metadataIndexNode.getNodeType(), |
| timeseriesMetadataMap, |
| needChunkMetadata); |
| } |
| } |
| } catch (BufferOverflowException e) { |
| throw e; |
| } |
| } |
| |
| /** |
| * collect all Timeseries Meta of the tsFile |
| * |
| * @param needChunkMetadata |
| * @return Map, FileOffset => pair(MeasurementId => TimeseriesMetadata) |
| * @throws IOException |
| */ |
| public Map<Long, Pair<Path, TimeseriesMetadata>> getAllTimeseriesMeta(boolean needChunkMetadata) |
| throws IOException { |
| if (tsFileMetaData == null) { |
| readFileMetadata(); |
| } |
| |
| MetadataIndexNode metadataIndexNode = tsFileMetaData.getMetadataIndex(); |
| Map<Long, Pair<Path, TimeseriesMetadata>> timeseriesMetadataMap = new TreeMap<>(); |
| List<MetadataIndexEntry> metadataIndexEntryList = metadataIndexNode.getChildren(); |
| for (int i = 0; i < metadataIndexEntryList.size(); i++) { |
| MetadataIndexEntry metadataIndexEntry = metadataIndexEntryList.get(i); |
| long endOffset = tsFileMetaData.getMetadataIndex().getEndOffset(); |
| if (i != metadataIndexEntryList.size() - 1) { |
| endOffset = metadataIndexEntryList.get(i + 1).getOffset(); |
| } |
| ByteBuffer buffer = readData(metadataIndexEntry.getOffset(), endOffset); |
| genTSMetadataFromMetaIndexEntry( |
| metadataIndexEntry.getOffset(), |
| metadataIndexEntry, |
| buffer, |
| null, |
| metadataIndexNode.getNodeType(), |
| timeseriesMetadataMap, |
| needChunkMetadata); |
| } |
| |
| return timeseriesMetadataMap; |
| } |
| |
| /** |
| * read File meta-data of TsFile |
| * |
| * @return |
| * @throws IOException |
| */ |
| public TsFileMetadata readFileMetadata() throws IOException { |
| if (tsFileMetaData != null) { |
| return tsFileMetaData; |
| } |
| |
| try { |
| tsFileMetaData = |
| TsFileMetadata.deserializeFrom( |
| readData(getFileMetadataPos(), (int) getFileMetadataSize())); |
| } catch (BufferOverflowException e) { |
| logger.error("readFileMetadata(), reading file metadata of file {}", file); |
| throw e; |
| } |
| return tsFileMetaData; |
| } |
| } |
| |
| @TestOnly |
| public Collection<Modification> getModificationList() { |
| return modificationList; |
| } |
| |
| @TestOnly |
| public Map<String, DeletionGroup> getFullPathToDeletionMap() { |
| return fullPathToDeletionMap; |
| } |
| |
| @Override |
| public String toString() { |
| return super.toString() + ", tsFileName=" + tsFileName + ", modsFileName=" + modsFileName; |
| } |
| } |