| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * <p> |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * <p> |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| package org.apache.iotdb.tsfile.read; |
| |
| import java.io.File; |
| import java.io.IOException; |
| import java.nio.ByteBuffer; |
| import java.util.ArrayList; |
| import java.util.Comparator; |
| import java.util.HashMap; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Map.Entry; |
| import java.util.Set; |
| import java.util.TreeMap; |
| import java.util.concurrent.ConcurrentHashMap; |
| import java.util.concurrent.locks.ReadWriteLock; |
| import java.util.concurrent.locks.ReentrantReadWriteLock; |
| import java.util.stream.Collectors; |
| import org.apache.iotdb.tsfile.common.conf.TSFileConfig; |
| import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor; |
| import org.apache.iotdb.tsfile.compress.IUnCompressor; |
| import org.apache.iotdb.tsfile.file.MetaMarker; |
| import org.apache.iotdb.tsfile.file.footer.ChunkGroupFooter; |
| import org.apache.iotdb.tsfile.file.header.ChunkHeader; |
| import org.apache.iotdb.tsfile.file.header.PageHeader; |
| import org.apache.iotdb.tsfile.file.metadata.ChunkGroupMetadata; |
| import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata; |
| import org.apache.iotdb.tsfile.file.metadata.MetadataIndexEntry; |
| import org.apache.iotdb.tsfile.file.metadata.MetadataIndexNode; |
| import org.apache.iotdb.tsfile.file.metadata.TimeseriesMetadata; |
| import org.apache.iotdb.tsfile.file.metadata.TsFileMetadata; |
| import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType; |
| import org.apache.iotdb.tsfile.file.metadata.enums.MetadataIndexNodeType; |
| import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; |
| import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics; |
| import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer; |
| import org.apache.iotdb.tsfile.read.common.Chunk; |
| import org.apache.iotdb.tsfile.read.common.Path; |
| import org.apache.iotdb.tsfile.read.controller.MetadataQuerierByFileImpl; |
| import org.apache.iotdb.tsfile.read.reader.TsFileInput; |
| import org.apache.iotdb.tsfile.utils.BloomFilter; |
| import org.apache.iotdb.tsfile.utils.Pair; |
| import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils; |
| import org.apache.iotdb.tsfile.utils.VersionUtils; |
| import org.apache.iotdb.tsfile.write.schema.MeasurementSchema; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| public class TsFileSequenceReader implements AutoCloseable { |
| |
  private static final Logger logger = LoggerFactory.getLogger(TsFileSequenceReader.class);
  // dedicated logger for reader open/close lifecycle events ("FileMonitor" channel)
  private static final Logger resourceLogger = LoggerFactory.getLogger("FileMonitor");
  protected static final TSFileConfig config = TSFileDescriptor.getInstance().getConfig();
  // path of the file being read; may stay null when constructed from a raw TsFileInput
  protected String file;
  protected TsFileInput tsFileInput;
  // start offset of the file metadata section, set by loadMetadataSize() or the constructor
  private long fileMetadataPos;
  // byte size of the file metadata section
  private int fileMetadataSize;
  // single-byte scratch buffer reused by readMarker(); makes readMarker() non thread-safe
  private ByteBuffer markerBuffer = ByteBuffer.allocate(Byte.BYTES);
  private int totalChunkNum;
  // lazily loaded by readFileMetadata(); null until the first call
  private TsFileMetadata tsFileMetaData;
  // device -> measurement -> TimeseriesMetadata
  private Map<String, Map<String, TimeseriesMetadata>> cachedDeviceMetadata = new ConcurrentHashMap<>();
  // NOTE: the lock is static, so it guards cachedDeviceMetadata across ALL reader instances
  private static final ReadWriteLock cacheLock = new ReentrantReadWriteLock();
  // whether readDeviceMetadata() results are cached in cachedDeviceMetadata
  private boolean cacheDeviceMetadata;
| |
  /**
   * Create a file reader of the given file. The reader will read the tail of the file to get the
   * file metadata size. Then the reader will skip the first
   * TSFileConfig.MAGIC_STRING.getBytes().length + TSFileConfig.NUMBER_VERSION.getBytes().length
   * bytes of the file for preparing reading real data.
   *
   * @param file the data file
   * @throws IOException If some I/O error occurs
   */
  public TsFileSequenceReader(String file) throws IOException {
    // delegate with loadMetadataSize = true
    this(file, true);
  }
| |
| /** |
| * construct function for TsFileSequenceReader. |
| * |
| * @param file -given file name |
| * @param loadMetadataSize -whether load meta data size |
| */ |
| public TsFileSequenceReader(String file, boolean loadMetadataSize) throws IOException { |
| if (resourceLogger.isDebugEnabled()) { |
| resourceLogger.debug("{} reader is opened. {}", file, getClass().getName()); |
| } |
| this.file = file; |
| tsFileInput = FSFactoryProducer.getFileInputFactory().getTsFileInput(file); |
| try { |
| if (loadMetadataSize) { |
| loadMetadataSize(); |
| } |
| } catch (Throwable e) { |
| tsFileInput.close(); |
| throw e; |
| } |
| } |
| |
  // used in merge resource
  /**
   * construct function for TsFileSequenceReader.
   *
   * @param file -given file name
   * @param loadMetadata -whether load meta data size
   * @param cacheDeviceMetadata -whether readDeviceMetadata() results should be cached
   */
  public TsFileSequenceReader(String file, boolean loadMetadata, boolean cacheDeviceMetadata)
      throws IOException {
    this(file, loadMetadata);
    this.cacheDeviceMetadata = cacheDeviceMetadata;
  }
| |
  /**
   * Create a file reader of the given file. The reader will read the tail of the file to get the
   * file metadata size. Then the reader will skip the first
   * TSFileConfig.MAGIC_STRING.getBytes().length + TSFileConfig.NUMBER_VERSION.getBytes().length
   * bytes of the file for preparing reading real data.
   *
   * @param input given input
   */
  public TsFileSequenceReader(TsFileInput input) throws IOException {
    this(input, true);
  }
| |
| /** |
| * construct function for TsFileSequenceReader. |
| * |
| * @param input -given input |
| * @param loadMetadataSize -load meta data size |
| */ |
| public TsFileSequenceReader(TsFileInput input, boolean loadMetadataSize) throws IOException { |
| this.tsFileInput = input; |
| try { |
| if (loadMetadataSize) { // NOTE no autoRepair here |
| loadMetadataSize(); |
| } |
| } catch (Throwable e) { |
| tsFileInput.close(); |
| throw e; |
| } |
| } |
| |
  /**
   * construct function for TsFileSequenceReader.
   *
   * @param input the input of a tsfile. The current position should be a marker and
   * then a chunk Header, rather than the magic number
   * @param fileMetadataPos the position of the file metadata in the TsFileInput from the beginning
   * of the input to the current position
   * @param fileMetadataSize the byte size of the file metadata in the input
   */
  public TsFileSequenceReader(TsFileInput input, long fileMetadataPos, int fileMetadataSize) {
    // metadata position/size are trusted as given; loadMetadataSize() is intentionally not called
    this.tsFileInput = input;
    this.fileMetadataPos = fileMetadataPos;
    this.fileMetadataSize = fileMetadataSize;
  }
| |
| public void loadMetadataSize() throws IOException { |
| ByteBuffer metadataSize = ByteBuffer.allocate(Integer.BYTES); |
| if (readTailMagic().equals(TSFileConfig.MAGIC_STRING)) { |
| tsFileInput.read(metadataSize, |
| tsFileInput.size() - TSFileConfig.MAGIC_STRING.getBytes().length - Integer.BYTES); |
| metadataSize.flip(); |
| // read file metadata size and position |
| fileMetadataSize = ReadWriteIOUtils.readInt(metadataSize); |
| fileMetadataPos = tsFileInput.size() - TSFileConfig.MAGIC_STRING.getBytes().length |
| - Integer.BYTES - fileMetadataSize; |
| } |
| } |
| |
  /** @return the start position of the file metadata section (0 if never loaded) */
  public long getFileMetadataPos() {
    return fileMetadataPos;
  }
| |
  /** @return the byte size of the file metadata section (0 if never loaded) */
  public int getFileMetadataSize() {
    return fileMetadataSize;
  }
| |
| /** |
| * this function does not modify the position of the file reader. |
| */ |
| public String readTailMagic() throws IOException { |
| long totalSize = tsFileInput.size(); |
| ByteBuffer magicStringBytes = ByteBuffer |
| .allocate(TSFileConfig.MAGIC_STRING.getBytes().length); |
| tsFileInput.read(magicStringBytes, totalSize - TSFileConfig.MAGIC_STRING.getBytes().length); |
| magicStringBytes.flip(); |
| return new String(magicStringBytes.array()); |
| } |
| |
| /** |
| * whether the file is a complete TsFile: only if the head magic and tail magic string exists. |
| */ |
| public boolean isComplete() throws IOException { |
| return tsFileInput.size() >= TSFileConfig.MAGIC_STRING.getBytes().length * 2 |
| + TSFileConfig.VERSION_NUMBER.getBytes().length |
| && (readTailMagic().equals(readHeadMagic()) || readTailMagic() |
| .equals(TSFileConfig.VERSION_NUMBER_V1)); |
| } |
| |
| /** |
| * this function does not modify the position of the file reader. |
| */ |
| public String readHeadMagic() throws IOException { |
| ByteBuffer magicStringBytes = ByteBuffer |
| .allocate(TSFileConfig.MAGIC_STRING.getBytes().length); |
| tsFileInput.read(magicStringBytes, 0); |
| magicStringBytes.flip(); |
| return new String(magicStringBytes.array()); |
| } |
| |
| /** |
| * this function reads version number and checks compatibility of TsFile. |
| */ |
| public String readVersionNumber() throws IOException { |
| ByteBuffer versionNumberBytes = ByteBuffer |
| .allocate(TSFileConfig.VERSION_NUMBER.getBytes().length); |
| tsFileInput.read(versionNumberBytes, TSFileConfig.MAGIC_STRING.getBytes().length); |
| versionNumberBytes.flip(); |
| return new String(versionNumberBytes.array()); |
| } |
| |
  /**
   * this function does not modify the position of the file reader.
   * Lazily deserializes the file metadata on the first call and caches it; the
   * initializing call is not thread-safe (plain null check, no synchronization).
   *
   * @throws IOException io error
   */
  public TsFileMetadata readFileMetadata() throws IOException {
    if (tsFileMetaData == null) {
      tsFileMetaData = TsFileMetadata.deserializeFrom(readData(fileMetadataPos, fileMetadataSize));
    }
    return tsFileMetaData;
  }
| |
| /** |
| * this function does not modify the position of the file reader. |
| * |
| * @throws IOException io error |
| */ |
| public BloomFilter readBloomFilter() throws IOException { |
| readFileMetadata(); |
| return tsFileMetaData.getBloomFilter(); |
| } |
| |
  /**
   * this function reads measurements and TimeseriesMetaDatas in given device Thread Safe
   *
   * @param device name
   * @return the map measurementId -> TimeseriesMetaData in one device
   * @throws IOException io error
   */
  public Map<String, TimeseriesMetadata> readDeviceMetadata(String device) throws IOException {
    // caching disabled: always read from disk
    if (!cacheDeviceMetadata) {
      return readDeviceMetadataFromDisk(device);
    }

    // fast path: probe the cache under the shared read lock
    cacheLock.readLock().lock();
    try {
      if (cachedDeviceMetadata.containsKey(device)) {
        return cachedDeviceMetadata.get(device);
      }
    } finally {
      cacheLock.readLock().unlock();
    }

    // slow path: re-check under the write lock (another thread may have populated the
    // entry between releasing the read lock and acquiring the write lock), then load
    cacheLock.writeLock().lock();
    try {
      if (cachedDeviceMetadata.containsKey(device)) {
        return cachedDeviceMetadata.get(device);
      }
      readFileMetadata();
      Map<String, TimeseriesMetadata> deviceMetadata = readDeviceMetadataFromDisk(device);
      cachedDeviceMetadata.put(device, deviceMetadata);
      return deviceMetadata;
    } finally {
      cacheLock.writeLock().unlock();
    }
  }
| |
| private Map<String, TimeseriesMetadata> readDeviceMetadataFromDisk(String device) |
| throws IOException { |
| readFileMetadata(); |
| List<TimeseriesMetadata> timeseriesMetadataList = getDeviceTimeseriesMetadata(device); |
| Map<String, TimeseriesMetadata> deviceMetadata = new HashMap<>(); |
| for (TimeseriesMetadata timeseriesMetadata : timeseriesMetadataList) { |
| deviceMetadata.put(timeseriesMetadata.getMeasurementId(), timeseriesMetadata); |
| } |
| return deviceMetadata; |
| } |
| |
  /**
   * Find the TimeseriesMetadata of the given full path by walking the metadata index tree.
   *
   * @param path device + measurement path
   * @return the matching TimeseriesMetadata, or null if the path does not exist in the TsFile
   * @throws IOException io error
   */
  public TimeseriesMetadata readTimeseriesMetadata(Path path) throws IOException {
    readFileMetadata();
    MetadataIndexNode deviceMetadataIndexNode = tsFileMetaData.getMetadataIndex();
    // locate the index entry covering this device and the end offset of its byte range
    Pair<MetadataIndexEntry, Long> metadataIndexPair = getMetadataAndEndOffset(
        deviceMetadataIndexNode, path.getDevice(), MetadataIndexNodeType.INTERNAL_DEVICE);
    ByteBuffer buffer = readData(metadataIndexPair.left.getOffset(), metadataIndexPair.right);
    MetadataIndexNode metadataIndexNode = deviceMetadataIndexNode;
    // descend one more level unless the tree is already at the measurement leaf
    // (handles index trees that have no separate device level)
    if (!metadataIndexNode.getNodeType().equals(MetadataIndexNodeType.LEAF_MEASUREMENT)) {
      metadataIndexNode = MetadataIndexNode.deserializeFrom(buffer);
      metadataIndexPair = getMetadataAndEndOffset(metadataIndexNode,
          path.getMeasurement(), MetadataIndexNodeType.INTERNAL_MEASUREMENT);
    }
    List<TimeseriesMetadata> timeseriesMetadataList = new ArrayList<>();
    buffer = readData(metadataIndexPair.left.getOffset(), metadataIndexPair.right);
    while (buffer.hasRemaining()) {
      timeseriesMetadataList.add(TimeseriesMetadata.deserializeFrom(buffer));
    }
    // binary search assumes leaf entries are sorted by measurementId
    // return null if path does not exist in the TsFile
    int searchResult = binarySearchInTimeseriesMetadataList(timeseriesMetadataList,
        path.getMeasurement());
    return searchResult >= 0 ? timeseriesMetadataList.get(searchResult) : null;
  }
| |
  /**
   * Read the TimeseriesMetadata of a set of measurements within one device.
   *
   * @param device device id
   * @param measurements wanted measurement ids
   * @return TimeseriesMetadata list of those measurements that exist in the file
   * @throws IOException io error
   */
  public List<TimeseriesMetadata> readTimeseriesMetadata(String device, Set<String> measurements)
      throws IOException {
    readFileMetadata();
    MetadataIndexNode deviceMetadataIndexNode = tsFileMetaData.getMetadataIndex();
    Pair<MetadataIndexEntry, Long> metadataIndexPair = getMetadataAndEndOffset(
        deviceMetadataIndexNode, device, MetadataIndexNodeType.INTERNAL_DEVICE);
    List<TimeseriesMetadata> resultTimeseriesMetadataList = new ArrayList<>();
    int maxDegreeOfIndexNode = config.getMaxDegreeOfIndexNode();
    // heuristic: with many measurements, one traversal of the whole device subtree is
    // cheaper than one index descent per measurement
    if (measurements.size() > maxDegreeOfIndexNode / Math.log(maxDegreeOfIndexNode)) {
      traverseAndReadTimeseriesMetadataInOneDevice(resultTimeseriesMetadataList,
          MetadataIndexNodeType.INTERNAL_MEASUREMENT, metadataIndexPair, measurements);
      return resultTimeseriesMetadataList;
    }
    for (String measurement : measurements) {
      ByteBuffer buffer = readData(metadataIndexPair.left.getOffset(), metadataIndexPair.right);
      Pair<MetadataIndexEntry, Long> measurementMetadataIndexPair = metadataIndexPair;
      List<TimeseriesMetadata> timeseriesMetadataList = new ArrayList<>();
      MetadataIndexNode metadataIndexNode = deviceMetadataIndexNode;
      // descend to the measurement leaf unless the tree has no device level
      if (!metadataIndexNode.getNodeType().equals(MetadataIndexNodeType.LEAF_MEASUREMENT)) {
        metadataIndexNode = MetadataIndexNode.deserializeFrom(buffer);
        measurementMetadataIndexPair = getMetadataAndEndOffset(metadataIndexNode,
            measurement, MetadataIndexNodeType.INTERNAL_MEASUREMENT);
      }
      buffer = readData(measurementMetadataIndexPair.left.getOffset(),
          measurementMetadataIndexPair.right);
      while (buffer.hasRemaining()) {
        timeseriesMetadataList.add(TimeseriesMetadata.deserializeFrom(buffer));
      }
      // binary search assumes leaf entries are sorted by measurementId;
      // measurements absent from the file are silently skipped
      int searchResult = binarySearchInTimeseriesMetadataList(timeseriesMetadataList,
          measurement);
      if (searchResult >= 0) {
        resultTimeseriesMetadataList.add(timeseriesMetadataList.get(searchResult));
      }
    }
    return resultTimeseriesMetadataList;
  }
| |
  /**
   * Recursively collect the TimeseriesMetadata of the wanted measurements under one device
   * subtree of the metadata index.
   *
   * @param timeseriesMetadataList output list, appended in traversal order
   * @param type node type of the entry referenced by metadataIndexPair
   * @param metadataIndexPair index entry plus the end offset delimiting its byte range
   * @param measurements measurement ids to keep
   * @throws IOException on read failure or an unexpected MetadataIndexEntry type
   */
  private void traverseAndReadTimeseriesMetadataInOneDevice(
      List<TimeseriesMetadata> timeseriesMetadataList, MetadataIndexNodeType type,
      Pair<MetadataIndexEntry, Long> metadataIndexPair, Set<String> measurements)
      throws IOException {
    ByteBuffer buffer = readData(metadataIndexPair.left.getOffset(), metadataIndexPair.right);
    switch (type) {
      case LEAF_DEVICE:
      case INTERNAL_MEASUREMENT:
        // internal node: recurse into every child; child i's byte range ends at
        // child i+1's offset, and the last child ends at the node's end offset
        MetadataIndexNode metadataIndexNode = MetadataIndexNode.deserializeFrom(buffer);
        int metadataIndexListSize = metadataIndexNode.getChildren().size();
        for (int i = 0; i < metadataIndexListSize; i++) {
          long endOffset = metadataIndexNode.getEndOffset();
          if (i != metadataIndexListSize - 1) {
            endOffset = metadataIndexNode.getChildren().get(i + 1).getOffset();
          }
          traverseAndReadTimeseriesMetadataInOneDevice(timeseriesMetadataList,
              metadataIndexNode.getNodeType(),
              new Pair<>(metadataIndexNode.getChildren().get(i), endOffset), measurements);
        }
        break;
      case LEAF_MEASUREMENT:
        // leaf: deserialize every TimeseriesMetadata and keep only the wanted measurements
        while (buffer.hasRemaining()) {
          TimeseriesMetadata timeseriesMetadata = TimeseriesMetadata.deserializeFrom(buffer);
          if (measurements.contains(timeseriesMetadata.getMeasurementId())) {
            timeseriesMetadataList.add(timeseriesMetadata);
          }
        }
        break;
      default:
        throw new IOException("Failed to traverse and read TimeseriesMetadata in device: " +
            metadataIndexPair.left.getName() + ". Wrong MetadataIndexEntry type.");
    }
  }
| |
| private int binarySearchInTimeseriesMetadataList(List<TimeseriesMetadata> timeseriesMetadataList, |
| String key) { |
| int low = 0; |
| int high = timeseriesMetadataList.size() - 1; |
| |
| while (low <= high) { |
| int mid = (low + high) >>> 1; |
| TimeseriesMetadata midVal = timeseriesMetadataList.get(mid); |
| int cmp = midVal.getMeasurementId().compareTo(key); |
| |
| if (cmp < 0) { |
| low = mid + 1; |
| } else if (cmp > 0) { |
| high = mid - 1; |
| } else { |
| return mid; // key found |
| } |
| } |
| return -1; // key not found |
| } |
| |
| public List<String> getAllDevices() throws IOException { |
| if (tsFileMetaData == null) { |
| readFileMetadata(); |
| } |
| return getAllDevices(tsFileMetaData.getMetadataIndex()); |
| } |
| |
  /**
   * Recursively collect all device names under the given metadata index node.
   */
  private List<String> getAllDevices(MetadataIndexNode metadataIndexNode) throws IOException {
    List<String> deviceList = new ArrayList<>();
    int metadataIndexListSize = metadataIndexNode.getChildren().size();
    if (metadataIndexNode.getNodeType().equals(MetadataIndexNodeType.INTERNAL_MEASUREMENT)) {
      // measurement-level node: the tree has no device level, so each child entry
      // name is treated as a device name
      for (MetadataIndexEntry index : metadataIndexNode.getChildren()) {
        deviceList.add(index.getName());
      }
    } else {
      for (int i = 0; i < metadataIndexListSize; i++) {
        // child i's byte range ends at child i+1's offset (node end offset for the last child)
        long endOffset = metadataIndexNode.getEndOffset();
        if (i != metadataIndexListSize - 1) {
          endOffset = metadataIndexNode.getChildren().get(i + 1).getOffset();
        }
        ByteBuffer buffer = readData(metadataIndexNode.getChildren().get(i).getOffset(), endOffset);
        MetadataIndexNode node = MetadataIndexNode.deserializeFrom(buffer);
        if (node.getNodeType().equals(MetadataIndexNodeType.LEAF_DEVICE)) {
          // if node in next level is LEAF_DEVICE, put all devices in node entry into the set
          deviceList.addAll(node.getChildren().stream().map(MetadataIndexEntry::getName).collect(
              Collectors.toList()));
        } else {
          // keep traversing
          deviceList.addAll(getAllDevices(node));
        }
      }
    }
    return deviceList;
  }
| |
  /**
   * read all ChunkMetaDatas of given device
   *
   * @param device name
   * @return measurement -> ChunkMetadata list
   * @throws IOException io error
   */
  public Map<String, List<ChunkMetadata>> readChunkMetadataInDevice(String device)
      throws IOException {
    if (tsFileMetaData == null) {
      readFileMetadata();
    }

    long start = 0;
    int size = 0;
    List<TimeseriesMetadata> timeseriesMetadataMap = getDeviceTimeseriesMetadata(device);
    // Compute one contiguous [start, start + size) range covering the ChunkMetadata lists
    // of every series of the device — presumably the writer lays them out back-to-back;
    // TODO(review) confirm against the writer.
    for (TimeseriesMetadata timeseriesMetadata : timeseriesMetadataMap) {
      if (start == 0) {
        // 0 doubles as the "not yet set" sentinel; real offsets are assumed non-zero
        start = timeseriesMetadata.getOffsetOfChunkMetaDataList();
      }
      size += timeseriesMetadata.getDataSizeOfChunkMetaDataList();
    }
    // read buffer of all ChunkMetadatas of this device
    ByteBuffer buffer = readData(start, size);
    Map<String, List<ChunkMetadata>> seriesMetadata = new HashMap<>();
    while (buffer.hasRemaining()) {
      ChunkMetadata chunkMetadata = ChunkMetadata.deserializeFrom(buffer);
      seriesMetadata.computeIfAbsent(chunkMetadata.getMeasurementUid(), key -> new ArrayList<>())
          .add(chunkMetadata);
    }

    // set version in ChunkMetadata
    List<Pair<Long, Long>> versionInfo = tsFileMetaData.getVersionInfo();
    for (Entry<String, List<ChunkMetadata>> entry : seriesMetadata.entrySet()) {
      VersionUtils.applyVersion(entry.getValue(), versionInfo);
    }
    return seriesMetadata;
  }
| |
| /** |
| * this function return all timeseries names in this file |
| * |
| * @return list of Paths |
| * @throws IOException io error |
| */ |
| public List<Path> getAllPaths() throws IOException { |
| List<Path> paths = new ArrayList<>(); |
| for (String device : getAllDevices()) { |
| Map<String, TimeseriesMetadata> timeseriesMetadataMap = readDeviceMetadata(device); |
| for (String measurementId : timeseriesMetadataMap.keySet()) { |
| paths.add(new Path(device, measurementId)); |
| } |
| } |
| return paths; |
| } |
| |
  /**
   * Traverse the metadata index from MetadataIndexEntry to get TimeseriesMetadatas
   *
   * @param metadataIndex MetadataIndexEntry
   * @param buffer byte buffer holding the bytes referenced by metadataIndex
   * @param deviceId String (null until a device-level entry has been visited)
   * @param type node type of metadataIndex
   * @param timeseriesMetadataMap map: deviceId -> timeseriesMetadata list
   */
  private void generateMetadataIndex(MetadataIndexEntry metadataIndex, ByteBuffer buffer,
      String deviceId, MetadataIndexNodeType type,
      Map<String, List<TimeseriesMetadata>> timeseriesMetadataMap) throws IOException {
    switch (type) {
      case INTERNAL_DEVICE:
      case LEAF_DEVICE:
      case INTERNAL_MEASUREMENT:
        // remember this entry's name; the value set at the device level is what ends up
        // as the map key when the recursion reaches a measurement leaf
        deviceId = metadataIndex.getName();
        MetadataIndexNode metadataIndexNode = MetadataIndexNode.deserializeFrom(buffer);
        int metadataIndexListSize = metadataIndexNode.getChildren().size();
        for (int i = 0; i < metadataIndexListSize; i++) {
          // child i's byte range ends at child i+1's offset (node end offset for the last child)
          long endOffset = metadataIndexNode.getEndOffset();
          if (i != metadataIndexListSize - 1) {
            endOffset = metadataIndexNode.getChildren().get(i + 1).getOffset();
          }
          ByteBuffer nextBuffer = readData(metadataIndexNode.getChildren().get(i).getOffset(),
              endOffset);
          generateMetadataIndex(metadataIndexNode.getChildren().get(i), nextBuffer, deviceId,
              metadataIndexNode.getNodeType(), timeseriesMetadataMap);
        }
        break;
      case LEAF_MEASUREMENT:
        // leaf: deserialize every TimeseriesMetadata and append under the current device
        List<TimeseriesMetadata> timeseriesMetadataList = new ArrayList<>();
        while (buffer.hasRemaining()) {
          timeseriesMetadataList.add(TimeseriesMetadata.deserializeFrom(buffer));
        }
        timeseriesMetadataMap.computeIfAbsent(deviceId, k -> new ArrayList<>())
            .addAll(timeseriesMetadataList);
        break;
    }
  }
| |
| public Map<String, List<TimeseriesMetadata>> getAllTimeseriesMetadata() throws IOException { |
| if (tsFileMetaData == null) { |
| readFileMetadata(); |
| } |
| Map<String, List<TimeseriesMetadata>> timeseriesMetadataMap = new HashMap<>(); |
| MetadataIndexNode metadataIndexNode = tsFileMetaData.getMetadataIndex(); |
| List<MetadataIndexEntry> metadataIndexEntryList = metadataIndexNode.getChildren(); |
| for (int i = 0; i < metadataIndexEntryList.size(); i++) { |
| MetadataIndexEntry metadataIndexEntry = metadataIndexEntryList.get(i); |
| long endOffset = tsFileMetaData.getMetadataIndex().getEndOffset(); |
| if (i != metadataIndexEntryList.size() - 1) { |
| endOffset = metadataIndexEntryList.get(i + 1).getOffset(); |
| } |
| ByteBuffer buffer = readData(metadataIndexEntry.getOffset(), endOffset); |
| generateMetadataIndex(metadataIndexEntry, buffer, null, |
| metadataIndexNode.getNodeType(), timeseriesMetadataMap); |
| } |
| return timeseriesMetadataMap; |
| } |
| |
| private List<TimeseriesMetadata> getDeviceTimeseriesMetadata(String device) throws IOException { |
| MetadataIndexNode metadataIndexNode = tsFileMetaData.getMetadataIndex(); |
| Pair<MetadataIndexEntry, Long> metadataIndexPair = getMetadataAndEndOffset( |
| metadataIndexNode, device, MetadataIndexNodeType.INTERNAL_DEVICE); |
| ByteBuffer buffer = readData(metadataIndexPair.left.getOffset(), metadataIndexPair.right); |
| Map<String, List<TimeseriesMetadata>> timeseriesMetadataMap = new TreeMap<>(); |
| generateMetadataIndex(metadataIndexPair.left, buffer, device, |
| MetadataIndexNodeType.INTERNAL_MEASUREMENT, timeseriesMetadataMap); |
| List<TimeseriesMetadata> deviceTimeseriesMetadata = new ArrayList<>(); |
| for (List<TimeseriesMetadata> timeseriesMetadataList : timeseriesMetadataMap.values()) { |
| deviceTimeseriesMetadata.addAll(timeseriesMetadataList); |
| } |
| return deviceTimeseriesMetadata; |
| } |
| |
| /** |
| * Get target MetadataIndexEntry and its end offset |
| * |
| * @param metadataIndex given MetadataIndexNode |
| * @param name target device / measurement name |
| * @param type target MetadataIndexNodeType, either INTERNAL_DEVICE or |
| * INTERNAL_MEASUREMENT. When searching for a device node, return when it is |
| * not INTERNAL_DEVICE. Likewise, when searching for a measurement node, |
| * return when it is not INTERNAL_MEASUREMENT. This works for the situation |
| * when the index tree does NOT have the device level and ONLY has the |
| * measurement level. |
| * @return target MetadataIndexEntry, endOffset pair |
| */ |
| private Pair<MetadataIndexEntry, Long> getMetadataAndEndOffset(MetadataIndexNode metadataIndex, |
| String name, MetadataIndexNodeType type) throws IOException { |
| Pair<MetadataIndexEntry, Long> childIndexEntry = metadataIndex.getChildIndexEntry(name); |
| if (!metadataIndex.getNodeType().equals(type)) { |
| return childIndexEntry; |
| } |
| ByteBuffer buffer = readData(childIndexEntry.left.getOffset(), childIndexEntry.right); |
| return getMetadataAndEndOffset(MetadataIndexNode.deserializeFrom(buffer), name, type); |
| } |
| |
  /**
   * read data from current position of the input, and deserialize it to a CHUNK_GROUP_FOOTER. <br>
   * This method is not threadsafe. Reading advances the input position past the footer.
   *
   * @return a CHUNK_GROUP_FOOTER
   * @throws IOException io error
   */
  public ChunkGroupFooter readChunkGroupFooter() throws IOException {
    return ChunkGroupFooter.deserializeFrom(tsFileInput.wrapAsInputStream(), true);
  }
| |
  /**
   * read data from current position of the input, and deserialize it to a CHUNK_GROUP_FOOTER.
   *
   * @param position the offset of the chunk group footer in the file
   * @param markerRead true if the offset does not contain the marker, otherwise false
   * @return a CHUNK_GROUP_FOOTER
   * @throws IOException io error
   */
  public ChunkGroupFooter readChunkGroupFooter(long position, boolean markerRead)
      throws IOException {
    return ChunkGroupFooter.deserializeFrom(tsFileInput, position, markerRead);
  }
| |
| public long readVersion() throws IOException { |
| ByteBuffer buffer = ByteBuffer.allocate(Long.BYTES); |
| if (ReadWriteIOUtils.readAsPossible(tsFileInput, buffer) == 0) { |
| throw new IOException("reach the end of the file."); |
| } |
| buffer.flip(); |
| return buffer.getLong(); |
| } |
| |
  /**
   * read data from current position of the input, and deserialize it to a CHUNK_HEADER. <br> This
   * method is not threadsafe. Reading advances the input position past the header.
   *
   * @return a CHUNK_HEADER
   * @throws IOException io error
   */
  public ChunkHeader readChunkHeader() throws IOException {
    return ChunkHeader.deserializeFrom(tsFileInput.wrapAsInputStream(), true);
  }
| |
  /**
   * read the chunk's header.
   *
   * @param position the file offset of this chunk's header
   * @param chunkHeaderSize the size of chunk's header
   * @param markerRead true if the offset does not contain the marker, otherwise false
   */
  private ChunkHeader readChunkHeader(long position, int chunkHeaderSize, boolean markerRead)
      throws IOException {
    return ChunkHeader.deserializeFrom(tsFileInput, position, chunkHeaderSize, markerRead);
  }
| |
  /**
   * Read the chunk data body at an explicit offset.
   * NOTE(review): the old javadoc claimed this modifies the channel position, but
   * readData with a non-negative position leaves the position unchanged — the old
   * claim looks stale; confirm before relying on either behavior.
   *
   * @param dataSize the size of chunkdata
   * @param position the offset of the chunk data
   * @return the pages of this chunk
   */
  private ByteBuffer readChunk(long position, int dataSize) throws IOException {
    return readData(position, dataSize);
  }
| |
| /** |
| * read memory chunk. |
| * |
| * @param metaData -given chunk meta data |
| * @return -chunk |
| */ |
| public Chunk readMemChunk(ChunkMetadata metaData) throws IOException { |
| int chunkHeadSize = ChunkHeader.getSerializedSize(metaData.getMeasurementUid()); |
| ChunkHeader header = readChunkHeader(metaData.getOffsetOfChunkHeader(), chunkHeadSize, false); |
| ByteBuffer buffer = readChunk(metaData.getOffsetOfChunkHeader() + header.getSerializedSize(), |
| header.getDataSize()); |
| return new Chunk(header, buffer, metaData.getDeleteIntervalList()); |
| } |
| |
| /** |
| * read all Chunks of given device. |
| * <p> |
| * note that this method loads all the chunks into memory, so it needs to be invoked carefully. |
| * |
| * @param device name |
| * @return measurement -> chunks list |
| */ |
| public Map<String, List<Chunk>> readChunksInDevice(String device) throws IOException { |
| List<ChunkMetadata> chunkMetadataList = new ArrayList<>(); |
| Map<String, List<ChunkMetadata>> chunkMetadataInDevice = readChunkMetadataInDevice(device); |
| for (List<ChunkMetadata> chunkMetadataListInDevice : chunkMetadataInDevice.values()) { |
| chunkMetadataList.addAll(chunkMetadataListInDevice); |
| } |
| |
| Map<String, List<Chunk>> chunksInDevice = new HashMap<>(); |
| chunkMetadataList.sort(Comparator.comparing(ChunkMetadata::getOffsetOfChunkHeader)); |
| for (ChunkMetadata chunkMetadata : chunkMetadataList) { |
| Chunk chunk = readMemChunk(chunkMetadata); |
| String measurement = chunk.getHeader().getMeasurementID(); |
| if (!chunksInDevice.containsKey(measurement)) { |
| chunksInDevice.put(measurement, new ArrayList<>()); |
| } |
| chunksInDevice.get(measurement).add(chunk); |
| } |
| return chunksInDevice; |
| } |
| |
  /**
   * not thread safe. Reads a page header from the current position and advances past it.
   *
   * @param type given tsfile data type
   */
  public PageHeader readPageHeader(TSDataType type) throws IOException {
    return PageHeader.deserializeFrom(tsFileInput.wrapAsInputStream(), type);
  }
| |
  /** @return the current read position of the underlying input */
  public long position() throws IOException {
    return tsFileInput.position();
  }
| |
  /** Move the underlying input to the given absolute offset. */
  public void position(long offset) throws IOException {
    tsFileInput.position(offset);
  }
| |
  /** Skip over the compressed page body that follows the given page header. */
  public void skipPageData(PageHeader header) throws IOException {
    tsFileInput.position(tsFileInput.position() + header.getCompressedSize());
  }
| |
  /** Read and, if needed, decompress the page body at the current input position. */
  public ByteBuffer readPage(PageHeader header, CompressionType type) throws IOException {
    // -1 means "read from the current position" (see readData)
    return readPage(header, type, -1);
  }
| |
| private ByteBuffer readPage(PageHeader header, CompressionType type, long position) |
| throws IOException { |
| ByteBuffer buffer = readData(position, header.getCompressedSize()); |
| IUnCompressor unCompressor = IUnCompressor.getUnCompressor(type); |
| ByteBuffer uncompressedBuffer = ByteBuffer.allocate(header.getUncompressedSize()); |
| if (type == CompressionType.UNCOMPRESSED) { |
| return buffer; |
| }// FIXME if the buffer is not array-implemented. |
| unCompressor.uncompress(buffer.array(), buffer.position(), buffer.remaining(), |
| uncompressedBuffer.array(), |
| 0); |
| return uncompressedBuffer; |
| } |
| |
  /**
   * read one byte from the input. <br> this method is not thread safe
   * (it reuses the shared markerBuffer field as scratch space)
   */
  public byte readMarker() throws IOException {
    markerBuffer.clear();
    if (ReadWriteIOUtils.readAsPossible(tsFileInput, markerBuffer) == 0) {
      throw new IOException("reach the end of the file.");
    }
    markerBuffer.flip();
    return markerBuffer.get();
  }
| |
  /**
   * Close the underlying TsFileInput, logging the close event when resource
   * monitoring is enabled.
   */
  public void close() throws IOException {
    if (resourceLogger.isDebugEnabled()) {
      resourceLogger.debug("{} reader is closed.", file);
    }
    this.tsFileInput.close();
  }
| |
  /** @return the file path this reader was opened with (may be null for raw-input readers) */
  public String getFileName() {
    return this.file;
  }
| |
  /** @return the total size of the underlying input in bytes */
  public long fileSize() throws IOException {
    return tsFileInput.size();
  }
| |
| /** |
| * read data from tsFileInput, from the current position (if position = -1), or the given |
| * position. <br> if position = -1, the tsFileInput's position will be changed to the current |
| * position + real data size that been read. Other wise, the tsFileInput's position is not |
| * changed. |
| * |
| * @param position the start position of data in the tsFileInput, or the current position if |
| * position = -1 |
| * @param size the size of data that want to read |
| * @return data that been read. |
| */ |
| private ByteBuffer readData(long position, int size) throws IOException { |
| ByteBuffer buffer = ByteBuffer.allocate(size); |
| if (position < 0) { |
| if (ReadWriteIOUtils.readAsPossible(tsFileInput, buffer) != size) { |
| throw new IOException("reach the end of the data"); |
| } |
| } else { |
| long actualReadSize = ReadWriteIOUtils.readAsPossible(tsFileInput, buffer, position, size); |
| if (actualReadSize != size) { |
| throw new IOException( |
| String.format("reach the end of the data. Size of data that want to read: %s," |
| + "actual read size: %s, posiotion: %s", size, actualReadSize, position)); |
| } |
| } |
| buffer.flip(); |
| return buffer; |
| } |
| |
| /** |
| * read data from tsFileInput, from the current position (if position = -1), or the given |
| * position. |
| * |
| * @param start the start position of data in the tsFileInput, or the current position if position |
| * = -1 |
| * @param end the end position of data that want to read |
| * @return data that been read. |
| */ |
| private ByteBuffer readData(long start, long end) throws IOException { |
| return readData(start, (int) (end - start)); |
| } |
| |
| /** |
| * notice, the target bytebuffer are not flipped. |
| */ |
| public int readRaw(long position, int length, ByteBuffer target) throws IOException { |
| return ReadWriteIOUtils.readAsPossible(tsFileInput, target, position, length); |
| } |
| |
| /** |
| * Self Check the file and return the position before where the data is safe. |
| * |
| * @param newSchema the schema on each time series in the file |
| * @param chunkGroupMetadataList ChunkGroupMetadata List |
| * @param versionInfo version pair List |
| * @param fastFinish if true and the file is complete, then newSchema and |
| * chunkGroupMetadataList parameter will be not modified. |
| * @return the position of the file that is fine. All data after the position in the file should |
| * be truncated. |
| */ |
| @SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning |
| public long selfCheck(Map<Path, MeasurementSchema> newSchema, |
| List<ChunkGroupMetadata> chunkGroupMetadataList, |
| List<Pair<Long, Long>> versionInfo, |
| boolean fastFinish) throws IOException { |
| File checkFile = FSFactoryProducer.getFSFactory().getFile(this.file); |
| long fileSize; |
| if (!checkFile.exists()) { |
| return TsFileCheckStatus.FILE_NOT_FOUND; |
| } else { |
| fileSize = checkFile.length(); |
| } |
| ChunkMetadata currentChunk; |
| String measurementID; |
| TSDataType dataType; |
| long fileOffsetOfChunk; |
| |
| // ChunkMetadata of current ChunkGroup |
| List<ChunkMetadata> chunkMetadataList = null; |
| String deviceID; |
| |
| int headerLength = TSFileConfig.MAGIC_STRING.getBytes().length + TSFileConfig.VERSION_NUMBER |
| .getBytes().length; |
| if (fileSize < headerLength) { |
| return TsFileCheckStatus.INCOMPATIBLE_FILE; |
| } |
| if (!TSFileConfig.MAGIC_STRING.equals(readHeadMagic()) || !TSFileConfig.VERSION_NUMBER |
| .equals(readVersionNumber())) { |
| return TsFileCheckStatus.INCOMPATIBLE_FILE; |
| } |
| |
| tsFileInput.position(headerLength); |
| if (fileSize == headerLength) { |
| return headerLength; |
| } else if (isComplete()) { |
| loadMetadataSize(); |
| if (fastFinish) { |
| return TsFileCheckStatus.COMPLETE_FILE; |
| } |
| } |
| boolean newChunkGroup = true; |
| // not a complete file, we will recover it... |
| long truncatedSize = headerLength; |
| byte marker; |
| int chunkCnt = 0; |
| List<MeasurementSchema> measurementSchemaList = new ArrayList<>(); |
| try { |
| while ((marker = this.readMarker()) != MetaMarker.SEPARATOR) { |
| switch (marker) { |
| case MetaMarker.CHUNK_HEADER: |
| // this is the first chunk of a new ChunkGroup. |
| if (newChunkGroup) { |
| newChunkGroup = false; |
| chunkMetadataList = new ArrayList<>(); |
| } |
| fileOffsetOfChunk = this.position() - 1; |
| // if there is something wrong with a chunk, we will drop the whole ChunkGroup |
| // as different chunks may be created by the same insertions(sqls), and partial |
| // insertion is not tolerable |
| ChunkHeader chunkHeader = this.readChunkHeader(); |
| measurementID = chunkHeader.getMeasurementID(); |
| MeasurementSchema measurementSchema = new MeasurementSchema(measurementID, |
| chunkHeader.getDataType(), |
| chunkHeader.getEncodingType(), chunkHeader.getCompressionType()); |
| measurementSchemaList.add(measurementSchema); |
| dataType = chunkHeader.getDataType(); |
| Statistics<?> chunkStatistics = Statistics.getStatsByType(dataType); |
| for (int j = 0; j < chunkHeader.getNumOfPages(); j++) { |
| // a new Page |
| PageHeader pageHeader = this.readPageHeader(chunkHeader.getDataType()); |
| chunkStatistics.mergeStatistics(pageHeader.getStatistics()); |
| this.skipPageData(pageHeader); |
| } |
| currentChunk = new ChunkMetadata(measurementID, dataType, fileOffsetOfChunk, |
| chunkStatistics); |
| chunkMetadataList.add(currentChunk); |
| chunkCnt++; |
| break; |
| case MetaMarker.CHUNK_GROUP_FOOTER: |
| // this is a chunk group |
| // if there is something wrong with the ChunkGroup Footer, we will drop this ChunkGroup |
| // because we can not guarantee the correctness of the deviceId. |
| ChunkGroupFooter chunkGroupFooter = this.readChunkGroupFooter(); |
| deviceID = chunkGroupFooter.getDeviceID(); |
| if (newSchema != null) { |
| for (MeasurementSchema tsSchema : measurementSchemaList) { |
| newSchema.putIfAbsent(new Path(deviceID, tsSchema.getMeasurementId()), tsSchema); |
| } |
| } |
| chunkGroupMetadataList.add(new ChunkGroupMetadata(deviceID, chunkMetadataList)); |
| newChunkGroup = true; |
| truncatedSize = this.position(); |
| |
| totalChunkNum += chunkCnt; |
| chunkCnt = 0; |
| measurementSchemaList = new ArrayList<>(); |
| break; |
| case MetaMarker.VERSION: |
| long version = readVersion(); |
| versionInfo.add(new Pair<>(position(), version)); |
| truncatedSize = this.position(); |
| break; |
| default: |
| // the disk file is corrupted, using this file may be dangerous |
| throw new IOException("Unexpected marker " + marker); |
| } |
| } |
| // now we read the tail of the data section, so we are sure that the last |
| // ChunkGroupFooter is complete. |
| truncatedSize = this.position() - 1; |
| } catch (Exception e) { |
| logger.info("TsFile {} self-check cannot proceed at position {} " + "recovered, because : {}", |
| file, this.position(), e.getMessage()); |
| } |
| // Despite the completeness of the data section, we will discard current FileMetadata |
| // so that we can continue to write data into this tsfile. |
| return truncatedSize; |
| } |
| |
| public int getTotalChunkNum() { |
| return totalChunkNum; |
| } |
| |
| /** |
| * get ChunkMetaDatas of given path |
| * |
| * @param path timeseries path |
| * @return List of ChunkMetaData |
| */ |
| public List<ChunkMetadata> getChunkMetadataList(Path path) throws IOException { |
| TimeseriesMetadata timeseriesMetaData = readTimeseriesMetadata(path); |
| if (timeseriesMetaData == null) { |
| return new ArrayList<>(); |
| } |
| List<ChunkMetadata> chunkMetadataList = readChunkMetaDataList(timeseriesMetaData); |
| chunkMetadataList.sort(Comparator.comparingLong(ChunkMetadata::getStartTime)); |
| return chunkMetadataList; |
| } |
| |
| /** |
| * get ChunkMetaDatas in given TimeseriesMetaData |
| * |
| * @return List of ChunkMetaData |
| */ |
| public List<ChunkMetadata> readChunkMetaDataList(TimeseriesMetadata timeseriesMetaData) |
| throws IOException { |
| List<Pair<Long, Long>> versionInfo = tsFileMetaData.getVersionInfo(); |
| ArrayList<ChunkMetadata> chunkMetadataList = new ArrayList<>(); |
| long startOffsetOfChunkMetadataList = timeseriesMetaData.getOffsetOfChunkMetaDataList(); |
| int dataSizeOfChunkMetadataList = timeseriesMetaData.getDataSizeOfChunkMetaDataList(); |
| |
| ByteBuffer buffer = readData(startOffsetOfChunkMetadataList, dataSizeOfChunkMetadataList); |
| while (buffer.hasRemaining()) { |
| chunkMetadataList.add(ChunkMetadata.deserializeFrom(buffer)); |
| } |
| |
| VersionUtils.applyVersion(chunkMetadataList, versionInfo); |
| |
| // minimize the storage of an ArrayList instance. |
| chunkMetadataList.trimToSize(); |
| return chunkMetadataList; |
| } |
| |
| /** |
| * get all measurements in this file |
| * |
| * @return measurement -> datatype |
| */ |
| public Map<String, TSDataType> getAllMeasurements() throws IOException { |
| Map<String, TSDataType> result = new HashMap<>(); |
| for (String device : getAllDevices()) { |
| Map<String, TimeseriesMetadata> timeseriesMetadataMap = readDeviceMetadata(device); |
| for (TimeseriesMetadata timeseriesMetadata : timeseriesMetadataMap.values()) { |
| result.put(timeseriesMetadata.getMeasurementId(), timeseriesMetadata.getTSDataType()); |
| } |
| } |
| return result; |
| } |
| |
| /** |
| * get device names which has valid chunks in [start, end) |
| * |
| * @param start start of the partition |
| * @param end end of the partition |
| * @return device names in range |
| */ |
| public List<String> getDeviceNameInRange(long start, long end) throws IOException { |
| List<String> res = new ArrayList<>(); |
| for (String device : getAllDevices()) { |
| Map<String, List<ChunkMetadata>> seriesMetadataMap = readChunkMetadataInDevice(device); |
| if (hasDataInPartition(seriesMetadataMap, start, end)) { |
| res.add(device); |
| } |
| } |
| return res; |
| } |
| |
| /** |
| * Check if the device has at least one Chunk in this partition |
| * |
| * @param seriesMetadataMap chunkMetaDataList of each measurement |
| * @param start the start position of the space partition |
| * @param end the end position of the space partition |
| */ |
| private boolean hasDataInPartition(Map<String, List<ChunkMetadata>> seriesMetadataMap, |
| long start, long end) { |
| for (List<ChunkMetadata> chunkMetadataList : seriesMetadataMap.values()) { |
| for (ChunkMetadata chunkMetadata : chunkMetadataList) { |
| LocateStatus location = MetadataQuerierByFileImpl |
| .checkLocateStatus(chunkMetadata, start, end); |
| if (location == LocateStatus.in) { |
| return true; |
| } |
| } |
| } |
| return false; |
| } |
| |
| /** |
| * The location of a chunkGroupMetaData with respect to a space partition constraint. <p> in - the |
| * middle point of the chunkGroupMetaData is located in the current space partition. before - the |
| * middle point of the chunkGroupMetaData is located before the current space partition. after - |
| * the middle point of the chunkGroupMetaData is located after the current space partition. |
| */ |
| public enum LocateStatus { |
| in, before, after |
| } |
| } |