| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| package org.apache.iotdb.tsfile.read; |
| |
| import org.apache.iotdb.tsfile.common.conf.TSFileConfig; |
| import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor; |
| import org.apache.iotdb.tsfile.common.constant.TsFileConstant; |
| import org.apache.iotdb.tsfile.compress.IUnCompressor; |
| import org.apache.iotdb.tsfile.encoding.decoder.Decoder; |
| import org.apache.iotdb.tsfile.exception.TsFileRuntimeException; |
| import org.apache.iotdb.tsfile.exception.TsFileStatisticsMistakesException; |
| import org.apache.iotdb.tsfile.file.MetaMarker; |
| import org.apache.iotdb.tsfile.file.header.ChunkGroupHeader; |
| import org.apache.iotdb.tsfile.file.header.ChunkHeader; |
| import org.apache.iotdb.tsfile.file.header.PageHeader; |
| import org.apache.iotdb.tsfile.file.metadata.AlignedChunkMetadata; |
| import org.apache.iotdb.tsfile.file.metadata.AlignedTimeSeriesMetadata; |
| import org.apache.iotdb.tsfile.file.metadata.ChunkGroupMetadata; |
| import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata; |
| import org.apache.iotdb.tsfile.file.metadata.IChunkMetadata; |
| import org.apache.iotdb.tsfile.file.metadata.ITimeSeriesMetadata; |
| import org.apache.iotdb.tsfile.file.metadata.MetadataIndexEntry; |
| import org.apache.iotdb.tsfile.file.metadata.MetadataIndexNode; |
| import org.apache.iotdb.tsfile.file.metadata.TimeseriesMetadata; |
| import org.apache.iotdb.tsfile.file.metadata.TsFileMetadata; |
| import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType; |
| import org.apache.iotdb.tsfile.file.metadata.enums.MetadataIndexNodeType; |
| import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; |
| import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding; |
| import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics; |
| import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer; |
| import org.apache.iotdb.tsfile.read.common.BatchData; |
| import org.apache.iotdb.tsfile.read.common.Chunk; |
| import org.apache.iotdb.tsfile.read.common.Path; |
| import org.apache.iotdb.tsfile.read.controller.CachedChunkLoaderImpl; |
| import org.apache.iotdb.tsfile.read.controller.MetadataQuerierByFileImpl; |
| import org.apache.iotdb.tsfile.read.reader.TsFileInput; |
| import org.apache.iotdb.tsfile.read.reader.page.PageReader; |
| import org.apache.iotdb.tsfile.read.reader.page.TimePageReader; |
| import org.apache.iotdb.tsfile.read.reader.page.ValuePageReader; |
| import org.apache.iotdb.tsfile.utils.BloomFilter; |
| import org.apache.iotdb.tsfile.utils.Pair; |
| import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils; |
| import org.apache.iotdb.tsfile.utils.TsPrimitiveType; |
| import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema; |
| import org.apache.iotdb.tsfile.write.schema.MeasurementSchema; |
| |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import java.io.File; |
| import java.io.IOException; |
| import java.io.Serializable; |
| import java.nio.ByteBuffer; |
| import java.util.ArrayList; |
| import java.util.Collections; |
| import java.util.Comparator; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.Iterator; |
| import java.util.LinkedHashMap; |
| import java.util.LinkedList; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.NoSuchElementException; |
| import java.util.Queue; |
| import java.util.Set; |
| import java.util.TreeMap; |
| import java.util.concurrent.ConcurrentHashMap; |
| import java.util.concurrent.locks.ReadWriteLock; |
| import java.util.concurrent.locks.ReentrantReadWriteLock; |
| import java.util.stream.Collectors; |
| |
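/**
 * A reader that sequentially reads a TsFile. A minimal usage sketch (the file path below is
 * illustrative):
 *
 * <pre>{@code
 * try (TsFileSequenceReader reader = new TsFileSequenceReader("/tmp/example.tsfile")) {
 *   for (String device : reader.getAllDevices()) {
 *     Map<String, List<ChunkMetadata>> chunksByMeasurement =
 *         reader.readChunkMetadataInDevice(device);
 *     System.out.println(device + " -> " + chunksByMeasurement.keySet());
 *   }
 * }
 * }</pre>
 */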
| public class TsFileSequenceReader implements AutoCloseable { |
| |
| private static final Logger logger = LoggerFactory.getLogger(TsFileSequenceReader.class); |
| private static final Logger resourceLogger = LoggerFactory.getLogger("FileMonitor"); |
| protected static final TSFileConfig config = TSFileDescriptor.getInstance().getConfig(); |
  private static final String METADATA_INDEX_NODE_DESERIALIZE_ERROR =
      "Error occurred while deserializing MetadataIndexNode of file {}";
| private static final int MAX_READ_BUFFER_SIZE = 4 * 1024 * 1024; |
| protected String file; |
| protected TsFileInput tsFileInput; |
| protected long fileMetadataPos; |
| protected int fileMetadataSize; |
| private ByteBuffer markerBuffer = ByteBuffer.allocate(Byte.BYTES); |
| protected volatile TsFileMetadata tsFileMetaData; |
| // device -> measurement -> TimeseriesMetadata |
| private Map<String, Map<String, TimeseriesMetadata>> cachedDeviceMetadata = |
| new ConcurrentHashMap<>(); |
| private static final ReadWriteLock cacheLock = new ReentrantReadWriteLock(); |
| private boolean cacheDeviceMetadata; |
| private long minPlanIndex = Long.MAX_VALUE; |
| private long maxPlanIndex = Long.MIN_VALUE; |
| |
| /** |
   * Create a file reader of the given file. The reader will read the tail of the file to get the
   * file metadata size. Then the reader will skip the first
   * TSFileConfig.MAGIC_STRING.getBytes().length + TSFileConfig.NUMBER_VERSION.getBytes().length
   * bytes of the file to prepare for reading the real data.
| * |
| * @param file the data file |
| * @throws IOException If some I/O error occurs |
| */ |
| public TsFileSequenceReader(String file) throws IOException { |
| this(file, true); |
| } |
| |
| /** |
   * Constructor of TsFileSequenceReader.
   *
   * @param file the given file name
   * @param loadMetadataSize whether to load the metadata size
| */ |
| public TsFileSequenceReader(String file, boolean loadMetadataSize) throws IOException { |
| if (resourceLogger.isDebugEnabled()) { |
| resourceLogger.debug("{} reader is opened. {}", file, getClass().getName()); |
| } |
| this.file = file; |
| tsFileInput = FSFactoryProducer.getFileInputFactory().getTsFileInput(file); |
| try { |
| if (loadMetadataSize) { |
| loadMetadataSize(); |
| } |
| } catch (Throwable e) { |
| tsFileInput.close(); |
| throw e; |
| } |
| } |
| |
| // used in merge resource |
| public TsFileSequenceReader(String file, boolean loadMetadata, boolean cacheDeviceMetadata) |
| throws IOException { |
| this(file, loadMetadata); |
| this.cacheDeviceMetadata = cacheDeviceMetadata; |
| } |
| |
| /** |
   * Create a file reader of the given file. The reader will read the tail of the file to get the
   * file metadata size. Then the reader will skip the first
   * TSFileConfig.MAGIC_STRING.getBytes().length + TSFileConfig.NUMBER_VERSION.getBytes().length
   * bytes of the file to prepare for reading the real data.
| * |
   * @param input the given input
| */ |
| public TsFileSequenceReader(TsFileInput input) throws IOException { |
| this(input, true); |
| } |
| |
| /** |
   * Constructor of TsFileSequenceReader.
   *
   * @param input the given input
   * @param loadMetadataSize whether to load the metadata size
| */ |
| public TsFileSequenceReader(TsFileInput input, boolean loadMetadataSize) throws IOException { |
| this.tsFileInput = input; |
| this.file = input.getFilePath(); |
| try { |
| if (loadMetadataSize) { // NOTE no autoRepair here |
| loadMetadataSize(); |
| } |
| } catch (Throwable e) { |
| tsFileInput.close(); |
| throw e; |
| } |
| } |
| |
| /** |
   * Constructor of TsFileSequenceReader.
   *
   * @param input the input of a tsfile. The current position should be a marker and then a chunk
   *     header, rather than the magic number
| * @param fileMetadataPos the position of the file metadata in the TsFileInput from the beginning |
| * of the input to the current position |
| * @param fileMetadataSize the byte size of the file metadata in the input |
| */ |
| public TsFileSequenceReader(TsFileInput input, long fileMetadataPos, int fileMetadataSize) { |
| this.tsFileInput = input; |
| this.fileMetadataPos = fileMetadataPos; |
| this.fileMetadataSize = fileMetadataSize; |
| } |
| |
| public void loadMetadataSize() throws IOException { |
| ByteBuffer metadataSize = ByteBuffer.allocate(Integer.BYTES); |
| if (readTailMagic().equals(TSFileConfig.MAGIC_STRING)) { |
| tsFileInput.read( |
| metadataSize, |
| tsFileInput.size() - TSFileConfig.MAGIC_STRING.getBytes().length - Integer.BYTES); |
| metadataSize.flip(); |
| // read file metadata size and position |
| fileMetadataSize = ReadWriteIOUtils.readInt(metadataSize); |
| fileMetadataPos = |
| tsFileInput.size() |
| - TSFileConfig.MAGIC_STRING.getBytes().length |
| - Integer.BYTES |
| - fileMetadataSize; |
| } |
| } |
| |
| public long getFileMetadataPos() { |
| return fileMetadataPos; |
| } |
| |
| public int getTsFileMetadataSize() { |
| return fileMetadataSize; |
| } |
| |
| /** |
   * Return the whole metadata size of this tsfile, including ChunkMetadata, TimeseriesMetadata,
   * etc.
| */ |
| public long getFileMetadataSize() throws IOException { |
| return tsFileInput.size() - getFileMetadataPos(); |
| } |
| |
| /** this function does not modify the position of the file reader. */ |
| public String readTailMagic() throws IOException { |
| long totalSize = tsFileInput.size(); |
| ByteBuffer magicStringBytes = ByteBuffer.allocate(TSFileConfig.MAGIC_STRING.getBytes().length); |
| tsFileInput.read(magicStringBytes, totalSize - TSFileConfig.MAGIC_STRING.getBytes().length); |
| magicStringBytes.flip(); |
| return new String(magicStringBytes.array()); |
| } |
| |
  /** Whether the file is a complete TsFile: true only if head and tail magic strings both exist. */
| public boolean isComplete() throws IOException { |
| long size = tsFileInput.size(); |
| // TSFileConfig.MAGIC_STRING.getBytes().length * 2 for two magic string |
| // Byte.BYTES for the file version number |
| if (size >= TSFileConfig.MAGIC_STRING.getBytes().length * 2 + Byte.BYTES) { |
| String tailMagic = readTailMagic(); |
| String headMagic = readHeadMagic(); |
| return tailMagic.equals(headMagic); |
| } else { |
| return false; |
| } |
| } |
| |
| /** this function does not modify the position of the file reader. */ |
| public String readHeadMagic() throws IOException { |
| ByteBuffer magicStringBytes = ByteBuffer.allocate(TSFileConfig.MAGIC_STRING.getBytes().length); |
| tsFileInput.read(magicStringBytes, 0); |
| magicStringBytes.flip(); |
| return new String(magicStringBytes.array()); |
| } |
| |
| /** this function reads version number and checks compatibility of TsFile. */ |
| public byte readVersionNumber() throws IOException { |
| ByteBuffer versionNumberByte = ByteBuffer.allocate(Byte.BYTES); |
| tsFileInput.read(versionNumberByte, TSFileConfig.MAGIC_STRING.getBytes().length); |
| versionNumberByte.flip(); |
| return versionNumberByte.get(); |
| } |
| |
| /** |
| * this function does not modify the position of the file reader. |
| * |
| * @throws IOException io error |
| */ |
| public TsFileMetadata readFileMetadata() throws IOException { |
| try { |
| if (tsFileMetaData == null) { |
| synchronized (this) { |
| if (tsFileMetaData == null) { |
| tsFileMetaData = |
| TsFileMetadata.deserializeFrom(readData(fileMetadataPos, fileMetadataSize)); |
| } |
| } |
| } |
| } catch (Exception e) { |
| logger.error("Something error happened while reading file metadata of file {}", file); |
| throw e; |
| } |
| return tsFileMetaData; |
| } |
| |
| /** |
| * this function does not modify the position of the file reader. |
| * |
| * @throws IOException io error |
| */ |
| public BloomFilter readBloomFilter() throws IOException { |
| readFileMetadata(); |
| return tsFileMetaData.getBloomFilter(); |
| } |
| |
| /** |
   * This function reads measurements and TimeseriesMetadata in the given device. Thread safe.
| * |
   * @param device the device name
   * @return the map measurementId -> TimeseriesMetadata in one device
| * @throws IOException io error |
| */ |
| public Map<String, TimeseriesMetadata> readDeviceMetadata(String device) throws IOException { |
| if (!cacheDeviceMetadata) { |
| return readDeviceMetadataFromDisk(device); |
| } |
| |
| cacheLock.readLock().lock(); |
| try { |
| if (cachedDeviceMetadata.containsKey(device)) { |
| return cachedDeviceMetadata.get(device); |
| } |
| } finally { |
| cacheLock.readLock().unlock(); |
| } |
| |
| cacheLock.writeLock().lock(); |
| try { |
| if (cachedDeviceMetadata.containsKey(device)) { |
| return cachedDeviceMetadata.get(device); |
| } |
| readFileMetadata(); |
| Map<String, TimeseriesMetadata> deviceMetadata = readDeviceMetadataFromDisk(device); |
| cachedDeviceMetadata.put(device, deviceMetadata); |
| return deviceMetadata; |
| } finally { |
| cacheLock.writeLock().unlock(); |
| } |
| } |
| |
| private Map<String, TimeseriesMetadata> readDeviceMetadataFromDisk(String device) |
| throws IOException { |
| readFileMetadata(); |
| List<TimeseriesMetadata> timeseriesMetadataList = |
| getDeviceTimeseriesMetadataWithoutChunkMetadata(device); |
| Map<String, TimeseriesMetadata> deviceMetadata = new HashMap<>(); |
| for (TimeseriesMetadata timeseriesMetadata : timeseriesMetadataList) { |
| deviceMetadata.put(timeseriesMetadata.getMeasurementId(), timeseriesMetadata); |
| } |
| return deviceMetadata; |
| } |
| |
| public TimeseriesMetadata readTimeseriesMetadata(Path path, boolean ignoreNotExists) |
| throws IOException { |
| readFileMetadata(); |
| MetadataIndexNode deviceMetadataIndexNode = tsFileMetaData.getMetadataIndex(); |
| Pair<MetadataIndexEntry, Long> metadataIndexPair = |
| getMetadataAndEndOffset(deviceMetadataIndexNode, path.getDevice(), true, true); |
| if (metadataIndexPair == null) { |
| if (ignoreNotExists) { |
| return null; |
| } |
| throw new IOException("Device {" + path.getDevice() + "} is not in tsFileMetaData"); |
| } |
| ByteBuffer buffer = readData(metadataIndexPair.left.getOffset(), metadataIndexPair.right); |
| MetadataIndexNode metadataIndexNode = deviceMetadataIndexNode; |
| if (!metadataIndexNode.getNodeType().equals(MetadataIndexNodeType.LEAF_MEASUREMENT)) { |
| try { |
| metadataIndexNode = MetadataIndexNode.deserializeFrom(buffer); |
| } catch (Exception e) { |
        logger.error(METADATA_INDEX_NODE_DESERIALIZE_ERROR, file, e);
| throw e; |
| } |
| metadataIndexPair = |
| getMetadataAndEndOffset(metadataIndexNode, path.getMeasurement(), false, false); |
| } |
| if (metadataIndexPair == null) { |
| return null; |
| } |
| List<TimeseriesMetadata> timeseriesMetadataList = new ArrayList<>(); |
| buffer = readData(metadataIndexPair.left.getOffset(), metadataIndexPair.right); |
| while (buffer.hasRemaining()) { |
| try { |
| timeseriesMetadataList.add(TimeseriesMetadata.deserializeFrom(buffer, true)); |
| } catch (Exception e) { |
        logger.error(
            "Error occurred while deserializing TimeseriesMetadata of file {}", file, e);
| throw e; |
| } |
| } |
| // return null if path does not exist in the TsFile |
| int searchResult = |
| binarySearchInTimeseriesMetadataList(timeseriesMetadataList, path.getMeasurement()); |
| return searchResult >= 0 ? timeseriesMetadataList.get(searchResult) : null; |
| } |
| |
| // This method is only used for TsFile |
| public ITimeSeriesMetadata readITimeseriesMetadata(Path path, boolean ignoreNotExists) |
| throws IOException { |
| readFileMetadata(); |
| MetadataIndexNode deviceMetadataIndexNode = tsFileMetaData.getMetadataIndex(); |
| Pair<MetadataIndexEntry, Long> metadataIndexPair = |
| getMetadataAndEndOffset(deviceMetadataIndexNode, path.getDevice(), true, true); |
| if (metadataIndexPair == null) { |
| if (ignoreNotExists) { |
| return null; |
| } |
| throw new IOException("Device {" + path.getDevice() + "} is not in tsFileMetaData"); |
| } |
| ByteBuffer buffer = readData(metadataIndexPair.left.getOffset(), metadataIndexPair.right); |
| MetadataIndexNode metadataIndexNode; |
| TimeseriesMetadata firstTimeseriesMetadata; |
| try { |
| // next layer MeasurementNode of the specific DeviceNode |
| metadataIndexNode = MetadataIndexNode.deserializeFrom(buffer); |
| } catch (Exception e) { |
      logger.error(METADATA_INDEX_NODE_DESERIALIZE_ERROR, file, e);
| throw e; |
| } |
| firstTimeseriesMetadata = tryToGetFirstTimeseriesMetadata(metadataIndexNode); |
| metadataIndexPair = |
| getMetadataAndEndOffset(metadataIndexNode, path.getMeasurement(), false, false); |
| |
| if (metadataIndexPair == null) { |
| return null; |
| } |
| List<TimeseriesMetadata> timeseriesMetadataList = new ArrayList<>(); |
| buffer = readData(metadataIndexPair.left.getOffset(), metadataIndexPair.right); |
| while (buffer.hasRemaining()) { |
| try { |
| timeseriesMetadataList.add(TimeseriesMetadata.deserializeFrom(buffer, true)); |
| } catch (Exception e) { |
        logger.error(
            "Error occurred while deserializing TimeseriesMetadata of file {}", file, e);
| throw e; |
| } |
| } |
| // return null if path does not exist in the TsFile |
| int searchResult = |
| binarySearchInTimeseriesMetadataList(timeseriesMetadataList, path.getMeasurement()); |
| if (searchResult >= 0) { |
| if (firstTimeseriesMetadata != null) { |
| List<TimeseriesMetadata> valueTimeseriesMetadataList = new ArrayList<>(); |
| valueTimeseriesMetadataList.add(timeseriesMetadataList.get(searchResult)); |
| return new AlignedTimeSeriesMetadata(firstTimeseriesMetadata, valueTimeseriesMetadataList); |
| } else { |
| return timeseriesMetadataList.get(searchResult); |
| } |
| } else { |
| return null; |
| } |
| } |
| |
  /* Find the leaf node that contains the path, and return all sensors in that leaf node which are also in the allSensors set */
| public List<TimeseriesMetadata> readTimeseriesMetadata(Path path, Set<String> allSensors) |
| throws IOException { |
| Pair<MetadataIndexEntry, Long> metadataIndexPair = getLeafMetadataIndexPair(path); |
| if (metadataIndexPair == null) { |
| return Collections.emptyList(); |
| } |
| List<TimeseriesMetadata> timeseriesMetadataList = new ArrayList<>(); |
| |
| ByteBuffer buffer = readData(metadataIndexPair.left.getOffset(), metadataIndexPair.right); |
| while (buffer.hasRemaining()) { |
| TimeseriesMetadata timeseriesMetadata; |
| try { |
| timeseriesMetadata = TimeseriesMetadata.deserializeFrom(buffer, true); |
| } catch (Exception e) { |
        logger.error(
            "Error occurred while deserializing TimeseriesMetadata of file {}", file, e);
| throw e; |
| } |
| if (allSensors.contains(timeseriesMetadata.getMeasurementId())) { |
| timeseriesMetadataList.add(timeseriesMetadata); |
| } |
| } |
| return timeseriesMetadataList; |
| } |
| |
| /* Get leaf MetadataIndexPair which contains path */ |
| private Pair<MetadataIndexEntry, Long> getLeafMetadataIndexPair(Path path) throws IOException { |
| readFileMetadata(); |
| MetadataIndexNode deviceMetadataIndexNode = tsFileMetaData.getMetadataIndex(); |
| Pair<MetadataIndexEntry, Long> metadataIndexPair = |
| getMetadataAndEndOffset(deviceMetadataIndexNode, path.getDevice(), true, true); |
| if (metadataIndexPair == null) { |
| return null; |
| } |
| ByteBuffer buffer = readData(metadataIndexPair.left.getOffset(), metadataIndexPair.right); |
| MetadataIndexNode metadataIndexNode = deviceMetadataIndexNode; |
| if (!metadataIndexNode.getNodeType().equals(MetadataIndexNodeType.LEAF_MEASUREMENT)) { |
| try { |
| metadataIndexNode = MetadataIndexNode.deserializeFrom(buffer); |
| } catch (Exception e) { |
        logger.error(METADATA_INDEX_NODE_DESERIALIZE_ERROR, file, e);
| throw e; |
| } |
| metadataIndexPair = |
| getMetadataAndEndOffset(metadataIndexNode, path.getMeasurement(), false, false); |
| } |
| return metadataIndexPair; |
| } |
| |
| // This method is only used for TsFile |
| public List<ITimeSeriesMetadata> readITimeseriesMetadata(String device, Set<String> measurements) |
| throws IOException { |
| readFileMetadata(); |
| MetadataIndexNode deviceMetadataIndexNode = tsFileMetaData.getMetadataIndex(); |
| Pair<MetadataIndexEntry, Long> metadataIndexPair = |
| getMetadataAndEndOffset(deviceMetadataIndexNode, device, true, false); |
| if (metadataIndexPair == null) { |
| return Collections.emptyList(); |
| } |
| List<ITimeSeriesMetadata> resultTimeseriesMetadataList = new ArrayList<>(); |
| List<String> measurementList = new ArrayList<>(measurements); |
| Set<String> measurementsHadFound = new HashSet<>(); |
| // the content of next Layer MeasurementNode of the specific device's DeviceNode |
| ByteBuffer buffer = readData(metadataIndexPair.left.getOffset(), metadataIndexPair.right); |
| Pair<MetadataIndexEntry, Long> measurementMetadataIndexPair = metadataIndexPair; |
| List<TimeseriesMetadata> timeseriesMetadataList = new ArrayList<>(); |
| |
| // next layer MeasurementNode of the specific DeviceNode |
| MetadataIndexNode measurementMetadataIndexNode; |
| try { |
| measurementMetadataIndexNode = MetadataIndexNode.deserializeFrom(buffer); |
| } catch (Exception e) { |
      logger.error(METADATA_INDEX_NODE_DESERIALIZE_ERROR, file, e);
| throw e; |
| } |
| // Get the first timeseriesMetadata of the device |
| TimeseriesMetadata firstTimeseriesMetadata = |
| tryToGetFirstTimeseriesMetadata(measurementMetadataIndexNode); |
| |
| for (int i = 0; i < measurementList.size(); i++) { |
| if (measurementsHadFound.contains(measurementList.get(i))) { |
| continue; |
| } |
| timeseriesMetadataList.clear(); |
| measurementMetadataIndexPair = |
| getMetadataAndEndOffset( |
| measurementMetadataIndexNode, measurementList.get(i), false, false); |
| |
| if (measurementMetadataIndexPair == null) { |
| continue; |
| } |
| // the content of TimeseriesNode of the specific MeasurementLeafNode |
| buffer = |
| readData( |
| measurementMetadataIndexPair.left.getOffset(), measurementMetadataIndexPair.right); |
| while (buffer.hasRemaining()) { |
| try { |
| timeseriesMetadataList.add(TimeseriesMetadata.deserializeFrom(buffer, true)); |
| } catch (Exception e) { |
          logger.error(
              "Error occurred while deserializing TimeseriesMetadata of file {}", file, e);
| throw e; |
| } |
| } |
| for (int j = i; j < measurementList.size(); j++) { |
| String current = measurementList.get(j); |
| if (!measurementsHadFound.contains(current)) { |
| int searchResult = binarySearchInTimeseriesMetadataList(timeseriesMetadataList, current); |
| if (searchResult >= 0) { |
| if (firstTimeseriesMetadata != null) { |
| List<TimeseriesMetadata> valueTimeseriesMetadataList = new ArrayList<>(); |
| valueTimeseriesMetadataList.add(timeseriesMetadataList.get(searchResult)); |
| resultTimeseriesMetadataList.add( |
| new AlignedTimeSeriesMetadata( |
| firstTimeseriesMetadata, valueTimeseriesMetadataList)); |
| } else { |
| resultTimeseriesMetadataList.add(timeseriesMetadataList.get(searchResult)); |
| } |
| measurementsHadFound.add(current); |
| } |
| } |
| if (measurementsHadFound.size() == measurements.size()) { |
| return resultTimeseriesMetadataList; |
| } |
| } |
| } |
| return resultTimeseriesMetadataList; |
| } |
| |
| protected int binarySearchInTimeseriesMetadataList( |
| List<TimeseriesMetadata> timeseriesMetadataList, String key) { |
| int low = 0; |
| int high = timeseriesMetadataList.size() - 1; |
| |
| while (low <= high) { |
| int mid = (low + high) >>> 1; |
| TimeseriesMetadata midVal = timeseriesMetadataList.get(mid); |
| int cmp = midVal.getMeasurementId().compareTo(key); |
| |
| if (cmp < 0) { |
| low = mid + 1; |
| } else if (cmp > 0) { |
| high = mid - 1; |
| } else { |
| return mid; // key found |
| } |
| } |
| return -1; // key not found |
| } |
| |
| public List<String> getAllDevices() throws IOException { |
| if (tsFileMetaData == null) { |
| readFileMetadata(); |
| } |
| return getAllDevices(tsFileMetaData.getMetadataIndex()); |
| } |
| |
| private List<String> getAllDevices(MetadataIndexNode metadataIndexNode) throws IOException { |
| List<String> deviceList = new ArrayList<>(); |
| // if metadataIndexNode is LEAF_DEVICE, put all devices in node entry into the list |
| if (metadataIndexNode.getNodeType().equals(MetadataIndexNodeType.LEAF_DEVICE)) { |
| deviceList.addAll( |
| metadataIndexNode.getChildren().stream() |
| .map(x -> x.getName().intern()) |
| .collect(Collectors.toList())); |
| return deviceList; |
| } |
| |
| int metadataIndexListSize = metadataIndexNode.getChildren().size(); |
| for (int i = 0; i < metadataIndexListSize; i++) { |
| long endOffset = metadataIndexNode.getEndOffset(); |
| if (i != metadataIndexListSize - 1) { |
| endOffset = metadataIndexNode.getChildren().get(i + 1).getOffset(); |
| } |
| ByteBuffer buffer = readData(metadataIndexNode.getChildren().get(i).getOffset(), endOffset); |
| MetadataIndexNode node = MetadataIndexNode.deserializeFrom(buffer); |
| deviceList.addAll(getAllDevices(node)); |
| } |
| return deviceList; |
| } |
| |
| /** |
| * @return an iterator of "device, isAligned" list, in which names of devices are ordered in |
| * dictionary order, and isAligned represents whether the device is aligned |
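   *     <p>A usage sketch, assuming the iterator yields device-name / isAligned pairs as the
   *     return description above states:
   *     <pre>{@code
   *     TsFileDeviceIterator deviceIterator = reader.getAllDevicesIteratorWithIsAligned();
   *     while (deviceIterator.hasNext()) {
   *       Pair<String, Boolean> deviceIsAligned = deviceIterator.next();
   *     }
   *     }</pre>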
| */ |
| public TsFileDeviceIterator getAllDevicesIteratorWithIsAligned() throws IOException { |
| readFileMetadata(); |
| |
| MetadataIndexNode metadataIndexNode = tsFileMetaData.getMetadataIndex(); |
| Queue<Pair<String, Pair<Long, Long>>> queue = new LinkedList<>(); |
| getAllDevicesWithIsAligned(metadataIndexNode, queue); |
| |
| return new TsFileDeviceIterator(this, queue); |
| } |
| |
| private void getAllDevicesWithIsAligned( |
| MetadataIndexNode metadataIndexNode, Queue<Pair<String, Pair<Long, Long>>> queue) |
| throws IOException { |
| try { |
| int metadataIndexListSize = metadataIndexNode.getChildren().size(); |
| |
| for (int i = 0; i < metadataIndexListSize; i++) { |
| MetadataIndexEntry entry = metadataIndexNode.getChildren().get(i); |
| long startOffset = entry.getOffset(); |
| long endOffset = metadataIndexNode.getEndOffset(); |
| if (i != metadataIndexListSize - 1) { |
| endOffset = metadataIndexNode.getChildren().get(i + 1).getOffset(); |
| } |
| if (metadataIndexNode.getNodeType().equals(MetadataIndexNodeType.LEAF_DEVICE)) { |
| queue.add(new Pair<>(entry.getName(), new Pair<>(startOffset, endOffset))); |
| continue; |
| } |
| ByteBuffer nextBuffer = readData(startOffset, endOffset); |
| getAllDevicesWithIsAligned(MetadataIndexNode.deserializeFrom(nextBuffer), queue); |
| } |
| } catch (Exception e) { |
| logger.error("Something error happened while getting all devices of file {}", file); |
| throw e; |
| } |
| } |
| |
| /** |
   * Read all ChunkMetadata of the given device.
   *
   * @param device the device name
| * @return measurement -> ChunkMetadata list |
| * @throws IOException io error |
| */ |
| public Map<String, List<ChunkMetadata>> readChunkMetadataInDevice(String device) |
| throws IOException { |
| readFileMetadata(); |
    List<TimeseriesMetadata> timeseriesMetadataList = getDeviceTimeseriesMetadata(device);
    if (timeseriesMetadataList.isEmpty()) {
      return new HashMap<>();
    }
    Map<String, List<ChunkMetadata>> seriesMetadata = new LinkedHashMap<>();
    for (TimeseriesMetadata timeseriesMetadata : timeseriesMetadataList) {
| seriesMetadata.put( |
| timeseriesMetadata.getMeasurementId(), |
| timeseriesMetadata.getChunkMetadataList().stream() |
| .map(chunkMetadata -> ((ChunkMetadata) chunkMetadata)) |
| .collect(Collectors.toList())); |
| } |
| return seriesMetadata; |
| } |
| |
| /** |
   * This function returns all timeseries names.
| * |
| * @return list of Paths |
| * @throws IOException io error |
| */ |
| public List<Path> getAllPaths() throws IOException { |
| List<Path> paths = new ArrayList<>(); |
| for (String device : getAllDevices()) { |
| Map<String, TimeseriesMetadata> timeseriesMetadataMap = readDeviceMetadata(device); |
| for (String measurementId : timeseriesMetadataMap.keySet()) { |
| paths.add(new Path(device, measurementId)); |
| } |
| } |
| return paths; |
| } |
| |
| /** |
| * @return an iterator of timeseries list, in which names of timeseries are ordered in dictionary |
| * order |
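   *     <p>A usage sketch:
   *     <pre>{@code
   *     Iterator<List<Path>> pathsIterator = reader.getPathsIterator();
   *     while (pathsIterator.hasNext()) {
   *       for (Path path : pathsIterator.next()) {
   *         System.out.println(path.getFullPath());
   *       }
   *     }
   *     }</pre>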
| * @throws IOException io error |
| */ |
| public Iterator<List<Path>> getPathsIterator() throws IOException { |
| readFileMetadata(); |
| |
| MetadataIndexNode metadataIndexNode = tsFileMetaData.getMetadataIndex(); |
| List<MetadataIndexEntry> metadataIndexEntryList = metadataIndexNode.getChildren(); |
| Queue<Pair<String, Pair<Long, Long>>> queue = new LinkedList<>(); |
| for (int i = 0; i < metadataIndexEntryList.size(); i++) { |
| MetadataIndexEntry metadataIndexEntry = metadataIndexEntryList.get(i); |
| long endOffset = metadataIndexNode.getEndOffset(); |
| if (i != metadataIndexEntryList.size() - 1) { |
| endOffset = metadataIndexEntryList.get(i + 1).getOffset(); |
| } |
| ByteBuffer buffer = readData(metadataIndexEntry.getOffset(), endOffset); |
| getAllPaths(metadataIndexEntry, buffer, null, metadataIndexNode.getNodeType(), queue); |
| } |
| return new Iterator<List<Path>>() { |
| @Override |
| public boolean hasNext() { |
| return !queue.isEmpty(); |
| } |
| |
| @Override |
| public List<Path> next() { |
| if (!hasNext()) { |
| throw new NoSuchElementException(); |
| } |
| Pair<String, Pair<Long, Long>> startEndPair = queue.remove(); |
| List<Path> paths = new ArrayList<>(); |
| try { |
| ByteBuffer nextBuffer = readData(startEndPair.right.left, startEndPair.right.right); |
| while (nextBuffer.hasRemaining()) { |
| paths.add( |
| new Path( |
| startEndPair.left, |
| TimeseriesMetadata.deserializeFrom(nextBuffer, false).getMeasurementId())); |
| } |
| return paths; |
| } catch (IOException e) { |
| throw new TsFileRuntimeException( |
| "Error occurred while reading a time series metadata block."); |
| } |
| } |
| }; |
| } |
| |
| private void getAllPaths( |
| MetadataIndexEntry metadataIndex, |
| ByteBuffer buffer, |
| String deviceId, |
| MetadataIndexNodeType type, |
| Queue<Pair<String, Pair<Long, Long>>> queue) |
| throws IOException { |
| try { |
| if (type.equals(MetadataIndexNodeType.LEAF_DEVICE)) { |
| deviceId = metadataIndex.getName(); |
| } |
| MetadataIndexNode metadataIndexNode = MetadataIndexNode.deserializeFrom(buffer); |
| int metadataIndexListSize = metadataIndexNode.getChildren().size(); |
| for (int i = 0; i < metadataIndexListSize; i++) { |
| long startOffset = metadataIndexNode.getChildren().get(i).getOffset(); |
| long endOffset = metadataIndexNode.getEndOffset(); |
| if (i != metadataIndexListSize - 1) { |
| endOffset = metadataIndexNode.getChildren().get(i + 1).getOffset(); |
| } |
| if (metadataIndexNode.getNodeType().equals(MetadataIndexNodeType.LEAF_MEASUREMENT)) { |
| queue.add(new Pair<>(deviceId, new Pair<>(startOffset, endOffset))); |
| continue; |
| } |
| ByteBuffer nextBuffer = |
| readData(metadataIndexNode.getChildren().get(i).getOffset(), endOffset); |
| getAllPaths( |
| metadataIndexNode.getChildren().get(i), |
| nextBuffer, |
| deviceId, |
| metadataIndexNode.getNodeType(), |
| queue); |
| } |
| } catch (Exception e) { |
| logger.error("Something error happened while getting all paths of file {}", file); |
| throw e; |
| } |
| } |
| |
| /** |
   * Check whether the device is aligned or not. An aligned device stores its time column under an
   * empty measurementId, so the first child entry of its measurement node has an empty name.
   *
   * @param measurementNode the next measurement layer node of the specific device node
| */ |
| public boolean isAlignedDevice(MetadataIndexNode measurementNode) { |
| return "".equals(measurementNode.getChildren().get(0).getName()); |
| } |
| |
| TimeseriesMetadata tryToGetFirstTimeseriesMetadata(MetadataIndexNode measurementNode) |
| throws IOException { |
| // Not aligned timeseries |
| if (!"".equals(measurementNode.getChildren().get(0).getName())) { |
| return null; |
| } |
| |
| // Aligned timeseries |
| if (measurementNode.getNodeType().equals(MetadataIndexNodeType.LEAF_MEASUREMENT)) { |
| ByteBuffer buffer; |
| if (measurementNode.getChildren().size() > 1) { |
| buffer = |
| readData( |
| measurementNode.getChildren().get(0).getOffset(), |
| measurementNode.getChildren().get(1).getOffset()); |
| } else { |
| buffer = |
| readData( |
| measurementNode.getChildren().get(0).getOffset(), measurementNode.getEndOffset()); |
| } |
| return TimeseriesMetadata.deserializeFrom(buffer, true); |
| } else if (measurementNode.getNodeType().equals(MetadataIndexNodeType.INTERNAL_MEASUREMENT)) { |
| ByteBuffer buffer = |
| readData( |
| measurementNode.getChildren().get(0).getOffset(), |
| measurementNode.getChildren().get(1).getOffset()); |
| MetadataIndexNode metadataIndexNode = MetadataIndexNode.deserializeFrom(buffer); |
| return tryToGetFirstTimeseriesMetadata(metadataIndexNode); |
| } |
| return null; |
| } |
| |
| /** |
| * Get timeseries metadata under the measurementNode and put them into timeseriesMetadataList. |
| * Skip timeseries whose measurementId is in the excludedMeasurementIds. |
| * |
| * @param measurementNode next layer measurement node of specific device leaf node |
| * @param excludedMeasurementIds skip timeseries whose measurementId is in the set |
| */ |
| public void getDeviceTimeseriesMetadata( |
| List<TimeseriesMetadata> timeseriesMetadataList, |
| MetadataIndexNode measurementNode, |
| Set<String> excludedMeasurementIds, |
| boolean needChunkMetadata) |
| throws IOException { |
| int metadataIndexListSize = measurementNode.getChildren().size(); |
| for (int i = 0; i < metadataIndexListSize; i++) { |
| long endOffset = measurementNode.getEndOffset(); |
| if (i != metadataIndexListSize - 1) { |
| endOffset = measurementNode.getChildren().get(i + 1).getOffset(); |
| } |
| ByteBuffer nextBuffer = readData(measurementNode.getChildren().get(i).getOffset(), endOffset); |
| if (measurementNode.getNodeType().equals(MetadataIndexNodeType.LEAF_MEASUREMENT)) { |
| // leaf measurement node |
| while (nextBuffer.hasRemaining()) { |
| TimeseriesMetadata timeseriesMetadata = |
| TimeseriesMetadata.deserializeFrom( |
| nextBuffer, excludedMeasurementIds, needChunkMetadata); |
| if (timeseriesMetadata != null) { |
| timeseriesMetadataList.add(timeseriesMetadata); |
| } |
| } |
| } else { |
| // internal measurement node |
| MetadataIndexNode nextLayerMeasurementNode = MetadataIndexNode.deserializeFrom(nextBuffer); |
| getDeviceTimeseriesMetadata( |
| timeseriesMetadataList, |
| nextLayerMeasurementNode, |
| excludedMeasurementIds, |
| needChunkMetadata); |
| } |
| } |
| } |
| |
| /** |
| * Traverse the metadata index from MetadataIndexEntry to get TimeseriesMetadatas |
| * |
| * @param metadataIndex MetadataIndexEntry |
| * @param buffer byte buffer |
   * @param deviceId String
   * @param type the node type of the current MetadataIndexNode
| * @param timeseriesMetadataMap map: deviceId -> timeseriesMetadata list |
| * @param needChunkMetadata deserialize chunk metadata list or not |
| */ |
| private void generateMetadataIndex( |
| MetadataIndexEntry metadataIndex, |
| ByteBuffer buffer, |
| String deviceId, |
| MetadataIndexNodeType type, |
| Map<String, List<TimeseriesMetadata>> timeseriesMetadataMap, |
| boolean needChunkMetadata) |
| throws IOException { |
| try { |
| if (type.equals(MetadataIndexNodeType.LEAF_MEASUREMENT)) { |
| List<TimeseriesMetadata> timeseriesMetadataList = new ArrayList<>(); |
| while (buffer.hasRemaining()) { |
| timeseriesMetadataList.add(TimeseriesMetadata.deserializeFrom(buffer, needChunkMetadata)); |
| } |
| timeseriesMetadataMap |
| .computeIfAbsent(deviceId, k -> new ArrayList<>()) |
| .addAll(timeseriesMetadataList); |
| } else { |
| // deviceId should be determined by LEAF_DEVICE node |
| if (type.equals(MetadataIndexNodeType.LEAF_DEVICE)) { |
| deviceId = metadataIndex.getName(); |
| } |
| MetadataIndexNode metadataIndexNode = MetadataIndexNode.deserializeFrom(buffer); |
| int metadataIndexListSize = metadataIndexNode.getChildren().size(); |
| for (int i = 0; i < metadataIndexListSize; i++) { |
| long endOffset = metadataIndexNode.getEndOffset(); |
| if (i != metadataIndexListSize - 1) { |
| endOffset = metadataIndexNode.getChildren().get(i + 1).getOffset(); |
| } |
| ByteBuffer nextBuffer = |
| readData(metadataIndexNode.getChildren().get(i).getOffset(), endOffset); |
| generateMetadataIndex( |
| metadataIndexNode.getChildren().get(i), |
| nextBuffer, |
| deviceId, |
| metadataIndexNode.getNodeType(), |
| timeseriesMetadataMap, |
| needChunkMetadata); |
| } |
| } |
| } catch (Exception e) { |
| logger.error("Something error happened while generating MetadataIndex of file {}", file); |
| throw e; |
| } |
| } |
| |
  /* Get all TimeseriesMetadata in the file; needChunkMetadata controls whether the chunk metadata lists are deserialized */
| public Map<String, List<TimeseriesMetadata>> getAllTimeseriesMetadata(boolean needChunkMetadata) |
| throws IOException { |
| if (tsFileMetaData == null) { |
| readFileMetadata(); |
| } |
| Map<String, List<TimeseriesMetadata>> timeseriesMetadataMap = new HashMap<>(); |
| MetadataIndexNode metadataIndexNode = tsFileMetaData.getMetadataIndex(); |
| List<MetadataIndexEntry> metadataIndexEntryList = metadataIndexNode.getChildren(); |
| for (int i = 0; i < metadataIndexEntryList.size(); i++) { |
| MetadataIndexEntry metadataIndexEntry = metadataIndexEntryList.get(i); |
| long endOffset = metadataIndexNode.getEndOffset(); |
| if (i != metadataIndexEntryList.size() - 1) { |
| endOffset = metadataIndexEntryList.get(i + 1).getOffset(); |
| } |
| ByteBuffer buffer = readData(metadataIndexEntry.getOffset(), endOffset); |
| generateMetadataIndex( |
| metadataIndexEntry, |
| buffer, |
| null, |
| metadataIndexNode.getNodeType(), |
| timeseriesMetadataMap, |
| needChunkMetadata); |
| } |
| return timeseriesMetadataMap; |
| } |
| |
  /* This method only deserializes the TimeseriesMetadata, not including the chunk metadata lists */
| private List<TimeseriesMetadata> getDeviceTimeseriesMetadataWithoutChunkMetadata(String device) |
| throws IOException { |
| MetadataIndexNode metadataIndexNode = tsFileMetaData.getMetadataIndex(); |
| Pair<MetadataIndexEntry, Long> metadataIndexPair = |
| getMetadataAndEndOffset(metadataIndexNode, device, true, true); |
| if (metadataIndexPair == null) { |
| return Collections.emptyList(); |
| } |
| ByteBuffer buffer = readData(metadataIndexPair.left.getOffset(), metadataIndexPair.right); |
| Map<String, List<TimeseriesMetadata>> timeseriesMetadataMap = new TreeMap<>(); |
| generateMetadataIndex( |
| metadataIndexPair.left, |
| buffer, |
| device, |
| MetadataIndexNodeType.INTERNAL_MEASUREMENT, |
| timeseriesMetadataMap, |
| false); |
| List<TimeseriesMetadata> deviceTimeseriesMetadata = new ArrayList<>(); |
| for (List<TimeseriesMetadata> timeseriesMetadataList : timeseriesMetadataMap.values()) { |
| deviceTimeseriesMetadata.addAll(timeseriesMetadataList); |
| } |
| return deviceTimeseriesMetadata; |
| } |
| |
  /* This method deserializes not only the TimeseriesMetadata but also all of their chunk metadata lists. */
| private List<TimeseriesMetadata> getDeviceTimeseriesMetadata(String device) throws IOException { |
| MetadataIndexNode metadataIndexNode = tsFileMetaData.getMetadataIndex(); |
| Pair<MetadataIndexEntry, Long> metadataIndexPair = |
| getMetadataAndEndOffset(metadataIndexNode, device, true, true); |
| if (metadataIndexPair == null) { |
| return Collections.emptyList(); |
| } |
| ByteBuffer buffer = readData(metadataIndexPair.left.getOffset(), metadataIndexPair.right); |
| Map<String, List<TimeseriesMetadata>> timeseriesMetadataMap = new TreeMap<>(); |
| generateMetadataIndex( |
| metadataIndexPair.left, |
| buffer, |
| device, |
| MetadataIndexNodeType.INTERNAL_MEASUREMENT, |
| timeseriesMetadataMap, |
| true); |
| List<TimeseriesMetadata> deviceTimeseriesMetadata = new ArrayList<>(); |
| for (List<TimeseriesMetadata> timeseriesMetadataList : timeseriesMetadataMap.values()) { |
| deviceTimeseriesMetadata.addAll(timeseriesMetadataList); |
| } |
| return deviceTimeseriesMetadata; |
| } |
| |
| /** |
| * Get target MetadataIndexEntry and its end offset |
| * |
| * @param metadataIndex given MetadataIndexNode |
| * @param name target device / measurement name |
| * @param isDeviceLevel whether target MetadataIndexNode is device level |
   * @param exactSearch whether to use exact search mode: if true, return null when there is no
   *     entry with the given name; otherwise return the nearest MetadataIndexEntry before it (for
   *     deeper search)
| * @return target MetadataIndexEntry, endOffset pair |
| */ |
| protected Pair<MetadataIndexEntry, Long> getMetadataAndEndOffset( |
| MetadataIndexNode metadataIndex, String name, boolean isDeviceLevel, boolean exactSearch) |
| throws IOException { |
| try { |
| // When searching for a device node, return when it is not INTERNAL_DEVICE |
| // When searching for a measurement node, return when it is not INTERNAL_MEASUREMENT |
| if ((isDeviceLevel |
| && !metadataIndex.getNodeType().equals(MetadataIndexNodeType.INTERNAL_DEVICE)) |
| || (!isDeviceLevel |
| && !metadataIndex.getNodeType().equals(MetadataIndexNodeType.INTERNAL_MEASUREMENT))) { |
| return metadataIndex.getChildIndexEntry(name, exactSearch); |
| } else { |
| Pair<MetadataIndexEntry, Long> childIndexEntry = |
| metadataIndex.getChildIndexEntry(name, false); |
| ByteBuffer buffer = readData(childIndexEntry.left.getOffset(), childIndexEntry.right); |
| return getMetadataAndEndOffset( |
| MetadataIndexNode.deserializeFrom(buffer), name, isDeviceLevel, exactSearch); |
| } |
| } catch (Exception e) { |
| logger.error("Something error happened while deserializing MetadataIndex of file {}", file); |
| throw e; |
| } |
| } |
| |
| /** |
   * Read data from the current position of the input, and deserialize it to a CHUNK_GROUP_HEADER.
   * <br>
   * This method is not thread safe.
   *
   * @return a CHUNK_GROUP_HEADER
| * @throws IOException io error |
| */ |
| public ChunkGroupHeader readChunkGroupHeader() throws IOException { |
| return ChunkGroupHeader.deserializeFrom(tsFileInput.wrapAsInputStream(), true); |
| } |
| |
| /** |
   * Read data from the given position of the input, and deserialize it to a CHUNK_GROUP_HEADER.
   *
   * @param position the offset of the chunk group header in the file
   * @param markerRead true if the position does not contain the marker, otherwise false
   * @return a CHUNK_GROUP_HEADER
| * @throws IOException io error |
| */ |
| public ChunkGroupHeader readChunkGroupHeader(long position, boolean markerRead) |
| throws IOException { |
| return ChunkGroupHeader.deserializeFrom(tsFileInput, position, markerRead); |
| } |
| |
| public void readPlanIndex() throws IOException { |
| ByteBuffer buffer = ByteBuffer.allocate(Long.BYTES); |
| if (ReadWriteIOUtils.readAsPossible(tsFileInput, buffer) == 0) { |
| throw new IOException("reach the end of the file."); |
| } |
| buffer.flip(); |
| minPlanIndex = buffer.getLong(); |
| buffer.clear(); |
| if (ReadWriteIOUtils.readAsPossible(tsFileInput, buffer) == 0) { |
| throw new IOException("reach the end of the file."); |
| } |
| buffer.flip(); |
| maxPlanIndex = buffer.getLong(); |
| } |
| |
| /** |
   * Read data from the current position of the input, and deserialize it to a CHUNK_HEADER. <br>
   * This method is not thread safe.
| * |
| * @return a CHUNK_HEADER |
| * @throws IOException io error |
| */ |
| public ChunkHeader readChunkHeader(byte chunkType) throws IOException { |
| try { |
| return ChunkHeader.deserializeFrom(tsFileInput.wrapAsInputStream(), chunkType); |
| } catch (Throwable t) { |
| logger.warn("Exception {} happened while reading chunk header of {}", t.getMessage(), file); |
| throw t; |
| } |
| } |
| |
| /** |
| * read the chunk's header. |
| * |
| * @param position the file offset of this chunk's header |
| * @param chunkHeaderSize the size of chunk's header |
| */ |
| private ChunkHeader readChunkHeader(long position, int chunkHeaderSize) throws IOException { |
| try { |
| return ChunkHeader.deserializeFrom(tsFileInput, position, chunkHeaderSize); |
| } catch (Throwable t) { |
| logger.warn("Exception {} happened while reading chunk header of {}", t.getMessage(), file); |
| throw t; |
| } |
| } |
| |
| /** |
   * Notice: this function will modify the channel's position.
   *
   * @param dataSize the size of the chunk data
| * @param position the offset of the chunk data |
| * @return the pages of this chunk |
| */ |
| public ByteBuffer readChunk(long position, int dataSize) throws IOException { |
| try { |
| return readData(position, dataSize); |
| } catch (Throwable t) { |
| logger.warn("Exception {} happened while reading chunk of {}", t.getMessage(), file); |
| throw t; |
| } |
| } |
| |
| /** |
   * Read memory chunk.
| * |
   * @param metaData the given chunk metadata
   * @return the chunk
| */ |
| public Chunk readMemChunk(ChunkMetadata metaData) throws IOException { |
| try { |
| int chunkHeadSize = ChunkHeader.getSerializedSize(metaData.getMeasurementUid()); |
| ChunkHeader header = readChunkHeader(metaData.getOffsetOfChunkHeader(), chunkHeadSize); |
| ByteBuffer buffer = |
| readChunk( |
| metaData.getOffsetOfChunkHeader() + header.getSerializedSize(), header.getDataSize()); |
| return new Chunk(header, buffer, metaData.getDeleteIntervalList(), metaData.getStatistics()); |
| } catch (Throwable t) { |
| logger.warn("Exception {} happened while reading chunk of {}", t.getMessage(), file); |
| throw t; |
| } |
| } |
| |
| /** |
   * Read memory chunk.
   *
   * @param chunkCacheKey the given key of the chunk LRUCache
   * @return the chunk
| */ |
| public Chunk readMemChunk(CachedChunkLoaderImpl.ChunkCacheKey chunkCacheKey) throws IOException { |
| int chunkHeadSize = ChunkHeader.getSerializedSize(chunkCacheKey.getMeasurementUid()); |
| ChunkHeader header = readChunkHeader(chunkCacheKey.getOffsetOfChunkHeader(), chunkHeadSize); |
| ByteBuffer buffer = |
| readChunk( |
| chunkCacheKey.getOffsetOfChunkHeader() + header.getSerializedSize(), |
| header.getDataSize()); |
| return new Chunk( |
| header, buffer, chunkCacheKey.getDeleteIntervalList(), chunkCacheKey.getStatistics()); |
| } |
| |
  /** Get the measurement schema from a list of chunk metadata. */
| public MeasurementSchema getMeasurementSchema(List<IChunkMetadata> chunkMetadataList) |
| throws IOException { |
| if (chunkMetadataList.isEmpty()) { |
| return null; |
| } |
| IChunkMetadata lastChunkMetadata = chunkMetadataList.get(chunkMetadataList.size() - 1); |
| int chunkHeadSize = ChunkHeader.getSerializedSize(lastChunkMetadata.getMeasurementUid()); |
| ChunkHeader header = readChunkHeader(lastChunkMetadata.getOffsetOfChunkHeader(), chunkHeadSize); |
| return new MeasurementSchema( |
| lastChunkMetadata.getMeasurementUid(), |
| header.getDataType(), |
| header.getEncodingType(), |
| header.getCompressionType()); |
| } |
| |
| /** |
   * Not thread safe.
   *
   * @param type the given tsfile data type
| */ |
| public PageHeader readPageHeader(TSDataType type, boolean hasStatistic) throws IOException { |
| try { |
| return PageHeader.deserializeFrom(tsFileInput.wrapAsInputStream(), type, hasStatistic); |
| } catch (Throwable t) { |
| logger.warn("Exception {} happened while reading page header of {}", t.getMessage(), file); |
| throw t; |
| } |
| } |
| |
| public long position() throws IOException { |
| return tsFileInput.position(); |
| } |
| |
| public void position(long offset) throws IOException { |
| tsFileInput.position(offset); |
| } |
| |
| public void skipPageData(PageHeader header) throws IOException { |
| tsFileInput.position(tsFileInput.position() + header.getCompressedSize()); |
| } |
| |
| public ByteBuffer readCompressedPage(PageHeader header) throws IOException { |
| return readData(-1, header.getCompressedSize()); |
| } |
| |
| public ByteBuffer readPage(PageHeader header, CompressionType type) throws IOException { |
| ByteBuffer buffer = readData(-1, header.getCompressedSize()); |
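    // empty pages and uncompressed pages can be returned without decompression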
| if (header.getUncompressedSize() == 0 || type == CompressionType.UNCOMPRESSED) { |
| return buffer; |
| } // FIXME if the buffer is not array-implemented. |
| IUnCompressor unCompressor = IUnCompressor.getUnCompressor(type); |
| ByteBuffer uncompressedBuffer = ByteBuffer.allocate(header.getUncompressedSize()); |
| unCompressor.uncompress( |
| buffer.array(), buffer.position(), buffer.remaining(), uncompressedBuffer.array(), 0); |
| return uncompressedBuffer; |
| } |
| |
| /** |
   * Read one byte from the input. <br>
   * This method is not thread safe.
| */ |
| public byte readMarker() throws IOException { |
| markerBuffer.clear(); |
| if (ReadWriteIOUtils.readAsPossible(tsFileInput, markerBuffer) == 0) { |
| throw new IOException("reach the end of the file."); |
| } |
| markerBuffer.flip(); |
| return markerBuffer.get(); |
| } |
| |
| @Override |
| public void close() throws IOException { |
| if (resourceLogger.isDebugEnabled()) { |
| resourceLogger.debug("{} reader is closed.", file); |
| } |
| this.tsFileInput.close(); |
| } |
| |
| public String getFileName() { |
| return this.file; |
| } |
| |
| public long fileSize() throws IOException { |
| return tsFileInput.size(); |
| } |
| |
| /** |
   * Read data from tsFileInput, from the current position (if position = -1), or from the given
   * position. <br>
   * If position = -1, the tsFileInput's position will be advanced by the actual size of the data
   * read. Otherwise, the tsFileInput's position is not changed.
   *
   * @param position the start position of data in the tsFileInput, or the current position if
   *     position = -1
   * @param totalSize the size of the data to read
   * @return the data read
| */ |
| protected ByteBuffer readData(long position, int totalSize) throws IOException { |
| int allocateSize = Math.min(MAX_READ_BUFFER_SIZE, totalSize); |
| int allocateNum = (int) Math.ceil((double) totalSize / allocateSize); |
| ByteBuffer buffer = ByteBuffer.allocate(totalSize); |
| int bufferLimit = 0; |
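    // read in slices of at most MAX_READ_BUFFER_SIZE bytes; the last slice reads the remainder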
| for (int i = 0; i < allocateNum; i++) { |
| if (i == allocateNum - 1) { |
| allocateSize = totalSize - allocateSize * (allocateNum - 1); |
| } |
| bufferLimit += allocateSize; |
| buffer.limit(bufferLimit); |
| if (position < 0) { |
| if (ReadWriteIOUtils.readAsPossible(tsFileInput, buffer) != allocateSize) { |
| throw new IOException("reach the end of the data"); |
| } |
| } else { |
| long actualReadSize = |
| ReadWriteIOUtils.readAsPossible(tsFileInput, buffer, position, allocateSize); |
| if (actualReadSize != allocateSize) { |
| throw new IOException( |
| String.format( |
| "reach the end of the data. Size of data that want to read: %s," |
| + "actual read size: %s, position: %s", |
| allocateSize, actualReadSize, position)); |
| } |
| position += allocateSize; |
| } |
| } |
| buffer.flip(); |
| return buffer; |
| } |
| |
| /** |
   * Read data from tsFileInput, from the current position (if position = -1), or from the given
   * position.
   *
   * @param start the start position of data in the tsFileInput, or the current position if
   *     position = -1
   * @param end the end position of the data to read
   * @return the data read
| */ |
| protected ByteBuffer readData(long start, long end) throws IOException { |
| try { |
| return readData(start, (int) (end - start)); |
| } catch (Throwable t) { |
| logger.warn("Exception {} happened while reading data of {}", t.getMessage(), file); |
| throw t; |
| } |
| } |
| |
  /** Notice: the target ByteBuffer is not flipped. */
| public int readRaw(long position, int length, ByteBuffer target) throws IOException { |
| return ReadWriteIOUtils.readAsPossible(tsFileInput, target, position, length); |
| } |
| |
| /** |
   * Self-check the file and return the position up to which the data is safe.
| * |
   * @param newSchema the schema of each time series in the file
   * @param chunkGroupMetadataList ChunkGroupMetadata list
   * @param fastFinish if true and the file is complete, the newSchema and chunkGroupMetadataList
   *     parameters will not be modified
   * @return the position in the file up to which the data is intact. All data after this position
   *     should be truncated.
| */ |
| @SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning |
| public long selfCheck( |
| Map<Path, IMeasurementSchema> newSchema, |
| List<ChunkGroupMetadata> chunkGroupMetadataList, |
| boolean fastFinish) |
| throws IOException { |
| File checkFile = FSFactoryProducer.getFSFactory().getFile(this.file); |
| long fileSize; |
| if (!checkFile.exists()) { |
| return TsFileCheckStatus.FILE_NOT_FOUND; |
| } else { |
| fileSize = checkFile.length(); |
| } |
| ChunkMetadata currentChunk; |
| String measurementID; |
| TSDataType dataType; |
| long fileOffsetOfChunk; |
| |
| // ChunkMetadata of current ChunkGroup |
| List<ChunkMetadata> chunkMetadataList = new ArrayList<>(); |
| |
| int headerLength = TSFileConfig.MAGIC_STRING.getBytes().length + Byte.BYTES; |
| if (fileSize < headerLength) { |
| return TsFileCheckStatus.INCOMPATIBLE_FILE; |
| } |
| if (!TSFileConfig.MAGIC_STRING.equals(readHeadMagic()) |
| || (TSFileConfig.VERSION_NUMBER != readVersionNumber())) { |
| return TsFileCheckStatus.INCOMPATIBLE_FILE; |
| } |
| |
| tsFileInput.position(headerLength); |
| boolean isComplete = isComplete(); |
| if (fileSize == headerLength) { |
| return headerLength; |
| } else if (isComplete) { |
| loadMetadataSize(); |
| if (fastFinish) { |
| return TsFileCheckStatus.COMPLETE_FILE; |
| } |
| } |
| // if not a complete file, we will recover it... |
| long truncatedSize = headerLength; |
| byte marker; |
| List<long[]> timeBatch = new ArrayList<>(); |
| String lastDeviceId = null; |
| List<IMeasurementSchema> measurementSchemaList = new ArrayList<>(); |
| try { |
| while ((marker = this.readMarker()) != MetaMarker.SEPARATOR) { |
| switch (marker) { |
| case MetaMarker.CHUNK_HEADER: |
| case MetaMarker.TIME_CHUNK_HEADER: |
| case MetaMarker.VALUE_CHUNK_HEADER: |
| case MetaMarker.ONLY_ONE_PAGE_CHUNK_HEADER: |
| case MetaMarker.ONLY_ONE_PAGE_TIME_CHUNK_HEADER: |
| case MetaMarker.ONLY_ONE_PAGE_VALUE_CHUNK_HEADER: |
| fileOffsetOfChunk = this.position() - 1; |
| // if there is something wrong with a chunk, we will drop the whole ChunkGroup |
| // as different chunks may be created by the same insertions(sqls), and partial |
| // insertion is not tolerable |
| ChunkHeader chunkHeader = this.readChunkHeader(marker); |
| measurementID = chunkHeader.getMeasurementID(); |
| IMeasurementSchema measurementSchema = |
| new MeasurementSchema( |
| measurementID, |
| chunkHeader.getDataType(), |
| chunkHeader.getEncodingType(), |
| chunkHeader.getCompressionType()); |
| measurementSchemaList.add(measurementSchema); |
| dataType = chunkHeader.getDataType(); |
| if (chunkHeader.getDataType() == TSDataType.VECTOR) { |
| timeBatch.clear(); |
| } |
| Statistics<? extends Serializable> chunkStatistics = |
| Statistics.getStatsByType(dataType); |
| int dataSize = chunkHeader.getDataSize(); |
| |
| if (dataSize > 0) { |
| if (((byte) (chunkHeader.getChunkType() & 0x3F)) |
| == MetaMarker |
| .CHUNK_HEADER) { // more than one page, we could use page statistics to |
| // generate chunk statistic |
| while (dataSize > 0) { |
| // a new Page |
| PageHeader pageHeader = this.readPageHeader(chunkHeader.getDataType(), true); |
| if (pageHeader.getUncompressedSize() != 0) { |
| // not empty page |
| chunkStatistics.mergeStatistics(pageHeader.getStatistics()); |
| } |
| this.skipPageData(pageHeader); |
| dataSize -= pageHeader.getSerializedPageSize(); |
| chunkHeader.increasePageNums(1); |
| } |
| } else { // only one page without statistic, we need to iterate each point to generate |
| // chunk statistic |
| PageHeader pageHeader = this.readPageHeader(chunkHeader.getDataType(), false); |
| Decoder valueDecoder = |
| Decoder.getDecoderByType( |
| chunkHeader.getEncodingType(), chunkHeader.getDataType()); |
| ByteBuffer pageData = readPage(pageHeader, chunkHeader.getCompressionType()); |
| Decoder timeDecoder = |
| Decoder.getDecoderByType( |
| TSEncoding.valueOf( |
| TSFileDescriptor.getInstance().getConfig().getTimeEncoder()), |
| TSDataType.INT64); |
| |
| if ((chunkHeader.getChunkType() & TsFileConstant.TIME_COLUMN_MASK) |
| == TsFileConstant.TIME_COLUMN_MASK) { // Time Chunk with only one page |
| |
| TimePageReader timePageReader = |
| new TimePageReader(pageHeader, pageData, timeDecoder); |
| long[] currentTimeBatch = timePageReader.getNextTimeBatch(); |
| timeBatch.add(currentTimeBatch); |
| for (long currentTime : currentTimeBatch) { |
| chunkStatistics.update(currentTime); |
| } |
| } else if ((chunkHeader.getChunkType() & TsFileConstant.VALUE_COLUMN_MASK) |
| == TsFileConstant.VALUE_COLUMN_MASK) { // Value Chunk with only one page |
| |
| ValuePageReader valuePageReader = |
| new ValuePageReader( |
| pageHeader, pageData, chunkHeader.getDataType(), valueDecoder); |
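                  // a value chunk stores no timestamps of its own, so reuse the time batch read
                  // from the time chunk of the same aligned ChunkGroup to pair values with times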
| TsPrimitiveType[] valueBatch = valuePageReader.nextValueBatch(timeBatch.get(0)); |
| |
| if (valueBatch != null && valueBatch.length != 0) { |
| for (int i = 0; i < valueBatch.length; i++) { |
| TsPrimitiveType value = valueBatch[i]; |
| if (value == null) { |
| continue; |
| } |
| long timeStamp = timeBatch.get(0)[i]; |
| switch (dataType) { |
| case INT32: |
| chunkStatistics.update(timeStamp, value.getInt()); |
| break; |
| case INT64: |
| chunkStatistics.update(timeStamp, value.getLong()); |
| break; |
| case FLOAT: |
| chunkStatistics.update(timeStamp, value.getFloat()); |
| break; |
| case DOUBLE: |
| chunkStatistics.update(timeStamp, value.getDouble()); |
| break; |
| case BOOLEAN: |
| chunkStatistics.update(timeStamp, value.getBoolean()); |
| break; |
| case TEXT: |
| chunkStatistics.update(timeStamp, value.getBinary()); |
| break; |
| default: |
| throw new IOException("Unexpected type " + dataType); |
| } |
| } |
| } |
| |
| } else { // NonAligned Chunk with only one page |
| PageReader reader = |
| new PageReader( |
| pageHeader, |
| pageData, |
| chunkHeader.getDataType(), |
| valueDecoder, |
| timeDecoder, |
| null); |
| BatchData batchData = reader.getAllSatisfiedPageData(); |
| while (batchData.hasCurrent()) { |
| switch (dataType) { |
| case INT32: |
| chunkStatistics.update(batchData.currentTime(), batchData.getInt()); |
| break; |
| case INT64: |
| chunkStatistics.update(batchData.currentTime(), batchData.getLong()); |
| break; |
| case FLOAT: |
| chunkStatistics.update(batchData.currentTime(), batchData.getFloat()); |
| break; |
| case DOUBLE: |
| chunkStatistics.update(batchData.currentTime(), batchData.getDouble()); |
| break; |
| case BOOLEAN: |
| chunkStatistics.update(batchData.currentTime(), batchData.getBoolean()); |
| break; |
| case TEXT: |
| chunkStatistics.update(batchData.currentTime(), batchData.getBinary()); |
| break; |
| default: |
| throw new IOException("Unexpected type " + dataType); |
| } |
| batchData.next(); |
| } |
| } |
| chunkHeader.increasePageNums(1); |
| } |
| } |
| currentChunk = |
| new ChunkMetadata(measurementID, dataType, fileOffsetOfChunk, chunkStatistics); |
| chunkMetadataList.add(currentChunk); |
| break; |
| case MetaMarker.CHUNK_GROUP_HEADER: |
            // if there is something wrong with the ChunkGroup header, we will drop this
            // ChunkGroup, because we cannot guarantee the correctness of its deviceId
| truncatedSize = this.position() - 1; |
| if (lastDeviceId != null) { |
| // schema of last chunk group |
| if (newSchema != null) { |
| for (IMeasurementSchema tsSchema : measurementSchemaList) { |
| newSchema.putIfAbsent( |
| new Path(lastDeviceId, tsSchema.getMeasurementId()), tsSchema); |
| } |
| } |
| measurementSchemaList = new ArrayList<>(); |
| // last chunk group Metadata |
| chunkGroupMetadataList.add(new ChunkGroupMetadata(lastDeviceId, chunkMetadataList)); |
| } |
            // start collecting the chunk metadata of the new ChunkGroup
| chunkMetadataList = new ArrayList<>(); |
| ChunkGroupHeader chunkGroupHeader = this.readChunkGroupHeader(); |
| lastDeviceId = chunkGroupHeader.getDeviceID(); |
| break; |
| case MetaMarker.OPERATION_INDEX_RANGE: |
| truncatedSize = this.position() - 1; |
| if (lastDeviceId != null) { |
| // schema of last chunk group |
| if (newSchema != null) { |
| for (IMeasurementSchema tsSchema : measurementSchemaList) { |
| newSchema.putIfAbsent( |
| new Path(lastDeviceId, tsSchema.getMeasurementId()), tsSchema); |
| } |
| } |
| measurementSchemaList = new ArrayList<>(); |
| // last chunk group Metadata |
| chunkGroupMetadataList.add(new ChunkGroupMetadata(lastDeviceId, chunkMetadataList)); |
| lastDeviceId = null; |
| } |
| readPlanIndex(); |
| truncatedSize = this.position(); |
| break; |
| default: |
| // the disk file is corrupted, using this file may be dangerous |
| throw new IOException("Unexpected marker " + marker); |
| } |
| } |
      // now we have read to the tail of the data section, so we are sure that the last
      // ChunkGroup is complete
| if (lastDeviceId != null) { |
| // schema of last chunk group |
| if (newSchema != null) { |
| for (IMeasurementSchema tsSchema : measurementSchemaList) { |
| newSchema.putIfAbsent(new Path(lastDeviceId, tsSchema.getMeasurementId()), tsSchema); |
| } |
| } |
| // last chunk group Metadata |
| chunkGroupMetadataList.add(new ChunkGroupMetadata(lastDeviceId, chunkMetadataList)); |
| } |
| if (isComplete) { |
| truncatedSize = TsFileCheckStatus.COMPLETE_FILE; |
| } else { |
| truncatedSize = this.position() - 1; |
| } |
| } catch (Exception e) { |
| logger.warn( |
| "TsFile {} self-check cannot proceed at position {} " + "recovered, because : {}", |
| file, |
| this.position(), |
| e.getMessage()); |
| } |
    // Regardless of the completeness of the data section, we discard the current FileMetadata
    // so that we can continue writing data into this TsFile.
| return truncatedSize; |
| } |
| |
| /** |
   * Self-check the file: verify the head and tail format, and verify that the statistics recorded
   * in chunk and timeseries metadata match the statistics recomputed from the data.
| * |
   * @param filename the path of the file to check
   * @param fastFinish if true, the method only checks the format of the file head (magic string
   *     and version number) and tail (magic string)
   * @param timeseriesMetadataMap map from the file position of each TimeseriesMetadata to its path
   *     and metadata, whose statistics are verified against the chunk data
   * @return the status of the TsFile
   * @throws TsFileStatisticsMistakesException if the statistics of a chunk or a timeseries do not
   *     match the data
| */ |
| public long selfCheckWithInfo( |
| String filename, |
| boolean fastFinish, |
| Map<Long, Pair<Path, TimeseriesMetadata>> timeseriesMetadataMap) |
| throws IOException, TsFileStatisticsMistakesException { |
| String message = " exists statistics mistakes at position "; |
| File checkFile = FSFactoryProducer.getFSFactory().getFile(filename); |
| if (!checkFile.exists()) { |
| return TsFileCheckStatus.FILE_NOT_FOUND; |
| } |
| long fileSize = checkFile.length(); |
| logger.info("file length: " + fileSize); |
| |
| int headerLength = TSFileConfig.MAGIC_STRING.getBytes().length + Byte.BYTES; |
| if (fileSize < headerLength) { |
| return TsFileCheckStatus.INCOMPATIBLE_FILE; |
| } |
| try { |
| if (!TSFileConfig.MAGIC_STRING.equals(readHeadMagic()) |
| || (TSFileConfig.VERSION_NUMBER != readVersionNumber())) { |
| return TsFileCheckStatus.INCOMPATIBLE_FILE; |
| } |
| tsFileInput.position(headerLength); |
| if (isComplete()) { |
| loadMetadataSize(); |
| if (fastFinish) { |
| return TsFileCheckStatus.COMPLETE_FILE; |
| } |
| } |
| } catch (IOException e) { |
| logger.error("Error occurred while fast checking TsFile."); |
| throw e; |
| } |
| for (Map.Entry<Long, Pair<Path, TimeseriesMetadata>> entry : timeseriesMetadataMap.entrySet()) { |
| TimeseriesMetadata timeseriesMetadata = entry.getValue().right; |
| TSDataType dataType = timeseriesMetadata.getTSDataType(); |
| Statistics<? extends Serializable> timeseriesMetadataSta = timeseriesMetadata.getStatistics(); |
| Statistics<? extends Serializable> chunkMetadatasSta = Statistics.getStatsByType(dataType); |
| for (IChunkMetadata chunkMetadata : getChunkMetadataList(entry.getValue().left)) { |
        long tsCheckStatus = TsFileCheckStatus.COMPLETE_FILE;
        try {
          tsCheckStatus = checkChunkAndPagesStatistics(chunkMetadata);
        } catch (IOException e) {
          logger.error("Error occurred while checking the statistics of a chunk and its pages", e);
          throw e;
        }
        if (tsCheckStatus == TsFileCheckStatus.FILE_EXISTS_MISTAKES) {
| throw new TsFileStatisticsMistakesException( |
| "Chunk" + message + chunkMetadata.getOffsetOfChunkHeader()); |
| } |
| chunkMetadatasSta.mergeStatistics(chunkMetadata.getStatistics()); |
| } |
| if (!timeseriesMetadataSta.equals(chunkMetadatasSta)) { |
| long timeseriesMetadataPos = entry.getKey(); |
| throw new TsFileStatisticsMistakesException( |
| "TimeseriesMetadata" + message + timeseriesMetadataPos); |
| } |
| } |
| return TsFileCheckStatus.COMPLETE_FILE; |
| } |
| |
| public long checkChunkAndPagesStatistics(IChunkMetadata chunkMetadata) throws IOException { |
| long offsetOfChunkHeader = chunkMetadata.getOffsetOfChunkHeader(); |
| tsFileInput.position(offsetOfChunkHeader); |
| byte marker = this.readMarker(); |
| ChunkHeader chunkHeader = this.readChunkHeader(marker); |
| TSDataType dataType = chunkHeader.getDataType(); |
| Statistics<? extends Serializable> chunkStatistics = Statistics.getStatsByType(dataType); |
| int dataSize = chunkHeader.getDataSize(); |
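    // the lower six bits of the chunk type hold the base marker; CHUNK_HEADER means the chunk has
    // more than one page, each carrying its own statistics that can be merged directly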
| if (((byte) (chunkHeader.getChunkType() & 0x3F)) == MetaMarker.CHUNK_HEADER) { |
| while (dataSize > 0) { |
| // a new Page |
| PageHeader pageHeader = this.readPageHeader(chunkHeader.getDataType(), true); |
| chunkStatistics.mergeStatistics(pageHeader.getStatistics()); |
| this.skipPageData(pageHeader); |
| dataSize -= pageHeader.getSerializedPageSize(); |
| chunkHeader.increasePageNums(1); |
| } |
| } else { |
      // only one page, which carries no page statistics, so we have to decode every point to
      // generate the chunk statistics
| PageHeader pageHeader = this.readPageHeader(chunkHeader.getDataType(), false); |
| Decoder valueDecoder = |
| Decoder.getDecoderByType(chunkHeader.getEncodingType(), chunkHeader.getDataType()); |
| ByteBuffer pageData = readPage(pageHeader, chunkHeader.getCompressionType()); |
| Decoder timeDecoder = |
| Decoder.getDecoderByType( |
| TSEncoding.valueOf(TSFileDescriptor.getInstance().getConfig().getTimeEncoder()), |
| TSDataType.INT64); |
| PageReader reader = |
| new PageReader( |
| pageHeader, pageData, chunkHeader.getDataType(), valueDecoder, timeDecoder, null); |
| BatchData batchData = reader.getAllSatisfiedPageData(); |
| while (batchData.hasCurrent()) { |
| switch (dataType) { |
| case INT32: |
| chunkStatistics.update(batchData.currentTime(), batchData.getInt()); |
| break; |
| case INT64: |
| chunkStatistics.update(batchData.currentTime(), batchData.getLong()); |
| break; |
| case FLOAT: |
| chunkStatistics.update(batchData.currentTime(), batchData.getFloat()); |
| break; |
| case DOUBLE: |
| chunkStatistics.update(batchData.currentTime(), batchData.getDouble()); |
| break; |
| case BOOLEAN: |
| chunkStatistics.update(batchData.currentTime(), batchData.getBoolean()); |
| break; |
| case TEXT: |
| chunkStatistics.update(batchData.currentTime(), batchData.getBinary()); |
| break; |
| default: |
| throw new IOException("Unexpected type " + dataType); |
| } |
| batchData.next(); |
| } |
| chunkHeader.increasePageNums(1); |
| } |
| if (chunkMetadata.getStatistics().equals(chunkStatistics)) { |
| return TsFileCheckStatus.COMPLETE_FILE; |
| } |
| return TsFileCheckStatus.FILE_EXISTS_MISTAKES; |
| } |
| |
| /** |
   * Get ChunkMetadatas of the given path, sorted by start time.
| * |
   * @param path timeseries path
   * @param ignoreNotExists if true, an empty list is returned when the path does not exist;
   *     otherwise an exception is thrown
   * @return list of ChunkMetadata, sorted by start time
| */ |
| public List<ChunkMetadata> getChunkMetadataList(Path path, boolean ignoreNotExists) |
| throws IOException { |
| TimeseriesMetadata timeseriesMetaData = readTimeseriesMetadata(path, ignoreNotExists); |
| if (timeseriesMetaData == null) { |
| return Collections.emptyList(); |
| } |
| List<ChunkMetadata> chunkMetadataList = readChunkMetaDataList(timeseriesMetaData); |
| chunkMetadataList.sort(Comparator.comparingLong(IChunkMetadata::getStartTime)); |
| return chunkMetadataList; |
| } |
| |
| // This method is only used for TsFile |
| public List<IChunkMetadata> getIChunkMetadataList(Path path) throws IOException { |
| ITimeSeriesMetadata timeseriesMetaData = readITimeseriesMetadata(path, true); |
| if (timeseriesMetaData == null) { |
| return Collections.emptyList(); |
| } |
| List<IChunkMetadata> chunkMetadataList = readIChunkMetaDataList(timeseriesMetaData); |
| chunkMetadataList.sort(Comparator.comparingLong(IChunkMetadata::getStartTime)); |
| return chunkMetadataList; |
| } |
| |
| public List<ChunkMetadata> getChunkMetadataList(Path path) throws IOException { |
| return getChunkMetadataList(path, false); |
| } |
| |
| /** |
   * Get AlignedChunkMetadata of all sensors under one aligned device.
| * |
   * @param device device name
   * @return list of AlignedChunkMetadata of the device
   * @throws IOException if the device is not in the file or its timeseries are not aligned
| */ |
| public List<AlignedChunkMetadata> getAlignedChunkMetadata(String device) throws IOException { |
| readFileMetadata(); |
| MetadataIndexNode deviceMetadataIndexNode = tsFileMetaData.getMetadataIndex(); |
| Pair<MetadataIndexEntry, Long> metadataIndexPair = |
| getMetadataAndEndOffset(deviceMetadataIndexNode, device, true, true); |
| if (metadataIndexPair == null) { |
| throw new IOException("Device {" + device + "} is not in tsFileMetaData"); |
| } |
| ByteBuffer buffer = readData(metadataIndexPair.left.getOffset(), metadataIndexPair.right); |
| MetadataIndexNode metadataIndexNode; |
| TimeseriesMetadata firstTimeseriesMetadata; |
| try { |
| // next layer MeasurementNode of the specific DeviceNode |
| metadataIndexNode = MetadataIndexNode.deserializeFrom(buffer); |
| } catch (Exception e) { |
| logger.error(METADATA_INDEX_NODE_DESERIALIZE_ERROR, file); |
| throw e; |
| } |
| firstTimeseriesMetadata = tryToGetFirstTimeseriesMetadata(metadataIndexNode); |
| if (firstTimeseriesMetadata == null) { |
| throw new IOException("Timeseries of device {" + device + "} are not aligned"); |
| } |
| |
| Map<String, List<TimeseriesMetadata>> timeseriesMetadataMap = new TreeMap<>(); |
| List<MetadataIndexEntry> metadataIndexEntryList = metadataIndexNode.getChildren(); |
| |
| for (int i = 0; i < metadataIndexEntryList.size(); i++) { |
| MetadataIndexEntry metadataIndexEntry = metadataIndexEntryList.get(i); |
| long endOffset = metadataIndexNode.getEndOffset(); |
| if (i != metadataIndexEntryList.size() - 1) { |
| endOffset = metadataIndexEntryList.get(i + 1).getOffset(); |
| } |
| buffer = readData(metadataIndexEntry.getOffset(), endOffset); |
| if (metadataIndexNode.getNodeType().equals(MetadataIndexNodeType.LEAF_MEASUREMENT)) { |
| List<TimeseriesMetadata> timeseriesMetadataList = new ArrayList<>(); |
| while (buffer.hasRemaining()) { |
| timeseriesMetadataList.add(TimeseriesMetadata.deserializeFrom(buffer, true)); |
| } |
| timeseriesMetadataMap |
| .computeIfAbsent(device, k -> new ArrayList<>()) |
| .addAll(timeseriesMetadataList); |
| } else { |
| generateMetadataIndex( |
| metadataIndexEntry, |
| buffer, |
| device, |
| metadataIndexNode.getNodeType(), |
| timeseriesMetadataMap, |
| true); |
| } |
| } |
| |
| for (List<TimeseriesMetadata> timeseriesMetadataList : timeseriesMetadataMap.values()) { |
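      // the first TimeseriesMetadata of an aligned device is the time column; the remaining
      // entries are the value columns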
| TimeseriesMetadata timeseriesMetadata = timeseriesMetadataList.get(0); |
| List<TimeseriesMetadata> valueTimeseriesMetadataList = new ArrayList<>(); |
| |
| for (int i = 1; i < timeseriesMetadataList.size(); i++) { |
| valueTimeseriesMetadataList.add(timeseriesMetadataList.get(i)); |
| } |
| |
| AlignedTimeSeriesMetadata alignedTimeSeriesMetadata = |
| new AlignedTimeSeriesMetadata(timeseriesMetadata, valueTimeseriesMetadataList); |
| List<AlignedChunkMetadata> chunkMetadataList = new ArrayList<>(); |
| for (IChunkMetadata chunkMetadata : readIChunkMetaDataList(alignedTimeSeriesMetadata)) { |
| chunkMetadataList.add((AlignedChunkMetadata) chunkMetadata); |
| } |
      // an aligned device has exactly one timeseriesMetadataList, so return after the first entry
| return chunkMetadataList; |
| } |
| |
| throw new IOException( |
| String.format( |
| "Error when reading timeseriesMetadata of device %s in file %s", device, file)); |
| } |
| |
| /** |
   * Get ChunkMetadatas in the given TimeseriesMetadata.
   *
   * @return list of ChunkMetadata
| */ |
| public List<ChunkMetadata> readChunkMetaDataList(TimeseriesMetadata timeseriesMetaData) |
| throws IOException { |
| return timeseriesMetaData.getChunkMetadataList().stream() |
| .map(chunkMetadata -> (ChunkMetadata) chunkMetadata) |
| .collect(Collectors.toList()); |
| } |
| |
| // This method is only used for TsFile |
| public List<IChunkMetadata> readIChunkMetaDataList(ITimeSeriesMetadata timeseriesMetaData) { |
| if (timeseriesMetaData instanceof AlignedTimeSeriesMetadata) { |
| return new ArrayList<>( |
| ((AlignedTimeSeriesMetadata) timeseriesMetaData).getChunkMetadataList()); |
| } else { |
| return new ArrayList<>(((TimeseriesMetadata) timeseriesMetaData).getChunkMetadataList()); |
| } |
| } |
| |
| /** |
   * Get all measurements in this file.
   *
   * @return a map from measurement id to its data type
| */ |
| public Map<String, TSDataType> getAllMeasurements() throws IOException { |
| Map<String, TSDataType> result = new HashMap<>(); |
| for (String device : getAllDevices()) { |
| Map<String, TimeseriesMetadata> timeseriesMetadataMap = readDeviceMetadata(device); |
| for (TimeseriesMetadata timeseriesMetadata : timeseriesMetadataMap.values()) { |
| result.put(timeseriesMetadata.getMeasurementId(), timeseriesMetadata.getTSDataType()); |
| } |
| } |
| return result; |
| } |
| |
| public Map<String, List<String>> getDeviceMeasurementsMap() throws IOException { |
| Map<String, List<String>> result = new HashMap<>(); |
| for (String device : getAllDevices()) { |
| Map<String, TimeseriesMetadata> timeseriesMetadataMap = readDeviceMetadata(device); |
| for (TimeseriesMetadata timeseriesMetadata : timeseriesMetadataMap.values()) { |
| result |
| .computeIfAbsent(device, d -> new ArrayList<>()) |
| .add(timeseriesMetadata.getMeasurementId()); |
| } |
| } |
| return result; |
| } |
| |
| /** |
   * Get names of the devices that have valid chunks in [start, end).
| * |
| * @param start start of the partition |
| * @param end end of the partition |
| * @return device names in range |
| */ |
| public List<String> getDeviceNameInRange(long start, long end) throws IOException { |
| List<String> res = new ArrayList<>(); |
| for (String device : getAllDevices()) { |
| Map<String, List<ChunkMetadata>> seriesMetadataMap = readChunkMetadataInDevice(device); |
| if (hasDataInPartition(seriesMetadataMap, start, end)) { |
| res.add(device); |
| } |
| } |
| return res; |
| } |
| |
| /** |
   * Get the metadata index node stored between the given offsets.
| * |
| * @param startOffset start read offset |
| * @param endOffset end read offset |
| * @return MetadataIndexNode |
| */ |
| public MetadataIndexNode getMetadataIndexNode(long startOffset, long endOffset) |
| throws IOException { |
| return MetadataIndexNode.deserializeFrom(readData(startOffset, endOffset)); |
| } |
| |
| /** |
| * Check if the device has at least one Chunk in this partition |
| * |
| * @param seriesMetadataMap chunkMetaDataList of each measurement |
| * @param start the start position of the space partition |
| * @param end the end position of the space partition |
| */ |
| private boolean hasDataInPartition( |
| Map<String, List<ChunkMetadata>> seriesMetadataMap, long start, long end) { |
| for (List<ChunkMetadata> chunkMetadataList : seriesMetadataMap.values()) { |
| for (ChunkMetadata chunkMetadata : chunkMetadataList) { |
| LocateStatus location = |
| MetadataQuerierByFileImpl.checkLocateStatus(chunkMetadata, start, end); |
| if (location == LocateStatus.in) { |
| return true; |
| } |
| } |
| } |
| return false; |
| } |
| |
| /** |
| * The location of a chunkGroupMetaData with respect to a space partition constraint. |
| * |
   * <ul>
   *   <li>in - the middle point of the chunkGroupMetaData is located in the current space
   *       partition;
   *   <li>before - the middle point of the chunkGroupMetaData is located before the current space
   *       partition;
   *   <li>after - the middle point of the chunkGroupMetaData is located after the current space
   *       partition.
   * </ul>
| */ |
| public enum LocateStatus { |
| in, |
| before, |
| after |
| } |
| |
| public long getMinPlanIndex() { |
| return minPlanIndex; |
| } |
| |
| public long getMaxPlanIndex() { |
| return maxPlanIndex; |
| } |
| |
| /** |
   * Traverse the chunk metadata of one device, one leaf measurement node per iteration.
   *
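   * <p>A minimal traversal sketch, assuming an open reader of this class named {@code reader};
   * the device name is hypothetical:
   *
   * <pre>{@code
   * Iterator<Map<String, List<ChunkMetadata>>> iterator =
   *     reader.getMeasurementChunkMetadataListMapIterator("root.sg1.d1");
   * while (iterator.hasNext()) {
   *   for (Map.Entry<String, List<ChunkMetadata>> entry : iterator.next().entrySet()) {
   *     String measurement = entry.getKey();
   *     List<ChunkMetadata> chunkMetadataList = entry.getValue();
   *   }
   * }
   * }</pre>
   *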
   * @param device device name
   * @return an iterator of linked hashmaps (measurement -> chunk metadata list). When traversing a
   *     linked hashmap, you get the chunk metadata lists in lexicographic order of the
   *     measurements; the first measurement of each iteration is always greater, in lexicographic
   *     order, than the last measurement of the previous iteration.
| */ |
| public Iterator<Map<String, List<ChunkMetadata>>> getMeasurementChunkMetadataListMapIterator( |
| String device) throws IOException { |
| readFileMetadata(); |
| |
| MetadataIndexNode metadataIndexNode = tsFileMetaData.getMetadataIndex(); |
| Pair<MetadataIndexEntry, Long> metadataIndexPair = |
| getMetadataAndEndOffset(metadataIndexNode, device, true, true); |
| |
| if (metadataIndexPair == null) { |
| return new Iterator<Map<String, List<ChunkMetadata>>>() { |
| |
| @Override |
| public boolean hasNext() { |
| return false; |
| } |
| |
| @Override |
| public LinkedHashMap<String, List<ChunkMetadata>> next() { |
| throw new NoSuchElementException(); |
| } |
| }; |
| } |
| |
| Queue<Pair<Long, Long>> queue = new LinkedList<>(); |
| ByteBuffer buffer = readData(metadataIndexPair.left.getOffset(), metadataIndexPair.right); |
| collectEachLeafMeasurementNodeOffsetRange(buffer, queue); |
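    // each pair in the queue is the (start, end) byte range of one LEAF_MEASUREMENT node; the
    // returned iterator lazily deserializes one node per next() call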
| |
| return new Iterator<Map<String, List<ChunkMetadata>>>() { |
| |
| @Override |
| public boolean hasNext() { |
| return !queue.isEmpty(); |
| } |
| |
| @Override |
| public LinkedHashMap<String, List<ChunkMetadata>> next() { |
| if (!hasNext()) { |
| throw new NoSuchElementException(); |
| } |
| Pair<Long, Long> startEndPair = queue.remove(); |
| LinkedHashMap<String, List<ChunkMetadata>> measurementChunkMetadataList = |
| new LinkedHashMap<>(); |
| try { |
| List<TimeseriesMetadata> timeseriesMetadataList = new ArrayList<>(); |
| ByteBuffer nextBuffer = readData(startEndPair.left, startEndPair.right); |
| while (nextBuffer.hasRemaining()) { |
| timeseriesMetadataList.add(TimeseriesMetadata.deserializeFrom(nextBuffer, true)); |
| } |
| for (TimeseriesMetadata timeseriesMetadata : timeseriesMetadataList) { |
| List<ChunkMetadata> list = |
| measurementChunkMetadataList.computeIfAbsent( |
| timeseriesMetadata.getMeasurementId(), m -> new ArrayList<>()); |
| for (IChunkMetadata chunkMetadata : timeseriesMetadata.getChunkMetadataList()) { |
| list.add((ChunkMetadata) chunkMetadata); |
| } |
| } |
| return measurementChunkMetadataList; |
| } catch (IOException e) { |
| throw new TsFileRuntimeException( |
| "Error occurred while reading a time series metadata block."); |
| } |
| } |
| }; |
| } |
| |
| private void collectEachLeafMeasurementNodeOffsetRange( |
| ByteBuffer buffer, Queue<Pair<Long, Long>> queue) throws IOException { |
| try { |
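      // depth-first walk over the metadata index tree: recurse through internal nodes and collect
      // the byte ranges of LEAF_MEASUREMENT nodes in order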
| final MetadataIndexNode metadataIndexNode = MetadataIndexNode.deserializeFrom(buffer); |
| final MetadataIndexNodeType metadataIndexNodeType = metadataIndexNode.getNodeType(); |
| final int metadataIndexListSize = metadataIndexNode.getChildren().size(); |
| for (int i = 0; i < metadataIndexListSize; ++i) { |
| long startOffset = metadataIndexNode.getChildren().get(i).getOffset(); |
| long endOffset = metadataIndexNode.getEndOffset(); |
| if (i != metadataIndexListSize - 1) { |
| endOffset = metadataIndexNode.getChildren().get(i + 1).getOffset(); |
| } |
| if (metadataIndexNodeType.equals(MetadataIndexNodeType.LEAF_MEASUREMENT)) { |
| queue.add(new Pair<>(startOffset, endOffset)); |
| continue; |
| } |
| collectEachLeafMeasurementNodeOffsetRange(readData(startOffset, endOffset), queue); |
| } |
| } catch (Exception e) { |
| logger.error( |
| "Error occurred while collecting offset ranges of measurement nodes of file {}", file); |
| throw e; |
| } |
| } |
| |
| @Override |
| public int hashCode() { |
| return file.hashCode(); |
| } |
| } |