| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| package org.apache.iotdb.tsfile.write; |
| |
| import org.apache.iotdb.tsfile.common.conf.TSFileConfig; |
| import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor; |
| import org.apache.iotdb.tsfile.common.constant.TsFileConstant; |
| import org.apache.iotdb.tsfile.encoding.decoder.Decoder; |
| import org.apache.iotdb.tsfile.file.MetaMarker; |
| import org.apache.iotdb.tsfile.file.header.ChunkGroupHeader; |
| import org.apache.iotdb.tsfile.file.header.ChunkHeader; |
| import org.apache.iotdb.tsfile.file.header.PageHeader; |
| import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata; |
| import org.apache.iotdb.tsfile.file.metadata.IChunkMetadata; |
| import org.apache.iotdb.tsfile.file.metadata.IDeviceID; |
| import org.apache.iotdb.tsfile.file.metadata.TimeseriesMetadata; |
| import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; |
| import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding; |
| import org.apache.iotdb.tsfile.read.TimeValuePair; |
| import org.apache.iotdb.tsfile.read.TsFileSequenceReader; |
| import org.apache.iotdb.tsfile.read.common.BatchData; |
| import org.apache.iotdb.tsfile.read.common.Chunk; |
| import org.apache.iotdb.tsfile.read.common.IBatchDataIterator; |
| import org.apache.iotdb.tsfile.read.reader.IChunkReader; |
| import org.apache.iotdb.tsfile.read.reader.IPointReader; |
| import org.apache.iotdb.tsfile.read.reader.chunk.AlignedChunkReader; |
| import org.apache.iotdb.tsfile.read.reader.chunk.ChunkReader; |
| import org.apache.iotdb.tsfile.read.reader.page.PageReader; |
| import org.apache.iotdb.tsfile.read.reader.page.TimePageReader; |
| import org.apache.iotdb.tsfile.read.reader.page.ValuePageReader; |
| import org.apache.iotdb.tsfile.utils.Pair; |
| import org.apache.iotdb.tsfile.utils.TsPrimitiveType; |
| |
| import org.junit.Assert; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import java.io.IOException; |
| import java.nio.ByteBuffer; |
| import java.util.ArrayList; |
| import java.util.Collections; |
| import java.util.Comparator; |
| import java.util.List; |
| import java.util.Map; |
| |
| /** This class provide some static method to check the integrity of tsfile */ |
| public class TsFileIntegrityCheckingTool { |
| private static Logger LOG = LoggerFactory.getLogger(TsFileIntegrityCheckingTool.class); |
| |
| /** |
| * This method check the integrity of file by reading it from the start to the end. It mainly |
| * checks the integrity of the chunks. |
| * |
| * @param filename |
| */ |
| public static void checkIntegrityBySequenceRead(String filename) { |
| try (TsFileSequenceReader reader = new TsFileSequenceReader(filename)) { |
| String headMagicString = reader.readHeadMagic(); |
| Assert.assertEquals(TSFileConfig.MAGIC_STRING, headMagicString); |
| String tailMagicString = reader.readTailMagic(); |
| Assert.assertEquals(TSFileConfig.MAGIC_STRING, tailMagicString); |
| reader.position((long) TSFileConfig.MAGIC_STRING.getBytes().length + 1); |
| List<long[]> timeBatch = new ArrayList<>(); |
| int pageIndex = 0; |
| byte marker; |
| while ((marker = reader.readMarker()) != MetaMarker.SEPARATOR) { |
| switch (marker) { |
| case MetaMarker.CHUNK_HEADER: |
| case MetaMarker.TIME_CHUNK_HEADER: |
| case MetaMarker.VALUE_CHUNK_HEADER: |
| case MetaMarker.ONLY_ONE_PAGE_CHUNK_HEADER: |
| case MetaMarker.ONLY_ONE_PAGE_TIME_CHUNK_HEADER: |
| case MetaMarker.ONLY_ONE_PAGE_VALUE_CHUNK_HEADER: |
| ChunkHeader header = reader.readChunkHeader(marker); |
| if (header.getDataSize() == 0) { |
| // empty value chunk |
| break; |
| } |
| Decoder defaultTimeDecoder = |
| Decoder.getDecoderByType( |
| TSEncoding.valueOf(TSFileDescriptor.getInstance().getConfig().getTimeEncoder()), |
| TSDataType.INT64); |
| Decoder valueDecoder = |
| Decoder.getDecoderByType(header.getEncodingType(), header.getDataType()); |
| int dataSize = header.getDataSize(); |
| pageIndex = 0; |
| if (header.getDataType() == TSDataType.VECTOR) { |
| timeBatch.clear(); |
| } |
| while (dataSize > 0) { |
| valueDecoder.reset(); |
| PageHeader pageHeader = |
| reader.readPageHeader( |
| header.getDataType(), |
| (header.getChunkType() & 0x3F) == MetaMarker.CHUNK_HEADER); |
| ByteBuffer pageData = reader.readPage(pageHeader, header.getCompressionType()); |
| if ((header.getChunkType() & (byte) TsFileConstant.TIME_COLUMN_MASK) |
| == (byte) TsFileConstant.TIME_COLUMN_MASK) { // Time Chunk |
| TimePageReader timePageReader = |
| new TimePageReader(pageHeader, pageData, defaultTimeDecoder); |
| timeBatch.add(timePageReader.getNextTimeBatch()); |
| } else if ((header.getChunkType() & (byte) TsFileConstant.VALUE_COLUMN_MASK) |
| == (byte) TsFileConstant.VALUE_COLUMN_MASK) { // Value Chunk |
| ValuePageReader valuePageReader = |
| new ValuePageReader(pageHeader, pageData, header.getDataType(), valueDecoder); |
| TsPrimitiveType[] valueBatch = |
| valuePageReader.nextValueBatch(timeBatch.get(pageIndex)); |
| } else { // NonAligned Chunk |
| PageReader pageReader = |
| new PageReader( |
| pageData, header.getDataType(), valueDecoder, defaultTimeDecoder); |
| BatchData batchData = pageReader.getAllSatisfiedPageData(); |
| } |
| pageIndex++; |
| dataSize -= pageHeader.getSerializedPageSize(); |
| } |
| break; |
| case MetaMarker.CHUNK_GROUP_HEADER: |
| ChunkGroupHeader chunkGroupHeader = reader.readChunkGroupHeader(); |
| break; |
| case MetaMarker.OPERATION_INDEX_RANGE: |
| reader.readPlanIndex(); |
| break; |
| default: |
| MetaMarker.handleUnexpectedMarker(marker); |
| } |
| } |
| } catch (IOException e) { |
| LOG.error("Meet exception when checking integrity of tsfile", e); |
| Assert.fail(); |
| } |
| } |
| |
| /** |
| * This method checks the integrity of the file by mimicking the process of the query, which reads |
| * the metadata index tree first, and get the timeseries metadata list and chunk metadata list. |
| * After that, this method acquires single chunk according to chunk metadata, then it deserializes |
| * the chunk, and verifies the correctness of the data. |
| * |
| * @param filename File to be check |
| * @param originData The origin data in a map format, Device -> SeriesId -> List<List<Time,Val>>, |
| * each inner list stands for a chunk. |
| */ |
| public static void checkIntegrityByQuery( |
| String filename, |
| Map<IDeviceID, Map<String, List<List<Pair<Long, TsPrimitiveType>>>>> originData) { |
| try (TsFileSequenceReader reader = new TsFileSequenceReader(filename)) { |
| Map<IDeviceID, List<TimeseriesMetadata>> allTimeseriesMetadata = |
| reader.getAllTimeseriesMetadata(true); |
| Assert.assertEquals(originData.size(), allTimeseriesMetadata.size()); |
| // check each series |
| for (Map.Entry<IDeviceID, List<TimeseriesMetadata>> entry : |
| allTimeseriesMetadata.entrySet()) { |
| IDeviceID deviceId = entry.getKey(); |
| List<TimeseriesMetadata> timeseriesMetadataList = entry.getValue(); |
| boolean vectorMode = false; |
| if (timeseriesMetadataList.size() > 0 |
| && timeseriesMetadataList.get(0).getTsDataType() != TSDataType.VECTOR) { |
| Assert.assertEquals(originData.get(deviceId).size(), timeseriesMetadataList.size()); |
| } else { |
| vectorMode = true; |
| Assert.assertEquals(originData.get(deviceId).size(), timeseriesMetadataList.size() - 1); |
| } |
| |
| if (!vectorMode) { |
| // check integrity of not aligned series |
| for (TimeseriesMetadata timeseriesMetadata : timeseriesMetadataList) { |
| // get its chunk metadata list, and read the chunk |
| String measurementId = timeseriesMetadata.getMeasurementId(); |
| List<List<Pair<Long, TsPrimitiveType>>> originChunks = |
| originData.get(deviceId).get(measurementId); |
| List<IChunkMetadata> chunkMetadataList = timeseriesMetadata.getChunkMetadataList(); |
| Assert.assertEquals(originChunks.size(), chunkMetadataList.size()); |
| chunkMetadataList.sort(Comparator.comparing(IChunkMetadata::getStartTime)); |
| for (int i = 0; i < chunkMetadataList.size(); ++i) { |
| Chunk chunk = reader.readMemChunk((ChunkMetadata) chunkMetadataList.get(i)); |
| ChunkReader chunkReader = new ChunkReader(chunk); |
| List<Pair<Long, TsPrimitiveType>> originValue = originChunks.get(i); |
| // deserialize the chunk and verify it with origin data |
| for (int valIdx = 0; chunkReader.hasNextSatisfiedPage(); ) { |
| IPointReader pointReader = chunkReader.nextPageData().getBatchDataIterator(); |
| while (pointReader.hasNextTimeValuePair()) { |
| TimeValuePair pair = pointReader.nextTimeValuePair(); |
| Assert.assertEquals( |
| originValue.get(valIdx).left.longValue(), pair.getTimestamp()); |
| Assert.assertEquals(originValue.get(valIdx++).right, pair.getValue()); |
| } |
| } |
| } |
| } |
| } else { |
| // check integrity of vector type |
| // get the timeseries metadata of the time column |
| TimeseriesMetadata timeColumnMetadata = timeseriesMetadataList.get(0); |
| List<IChunkMetadata> timeChunkMetadataList = timeColumnMetadata.getChunkMetadataList(); |
| timeChunkMetadataList.sort(Comparator.comparing(IChunkMetadata::getStartTime)); |
| |
| for (int i = 1; i < timeseriesMetadataList.size(); ++i) { |
| // traverse each value column |
| List<IChunkMetadata> valueChunkMetadataList = |
| timeseriesMetadataList.get(i).getChunkMetadataList(); |
| Assert.assertEquals(timeChunkMetadataList.size(), valueChunkMetadataList.size()); |
| List<List<Pair<Long, TsPrimitiveType>>> originDataChunks = |
| originData.get(deviceId).get(timeseriesMetadataList.get(i).getMeasurementId()); |
| for (int chunkIdx = 0; chunkIdx < timeChunkMetadataList.size(); ++chunkIdx) { |
| Chunk timeChunk = |
| reader.readMemChunk((ChunkMetadata) timeChunkMetadataList.get(chunkIdx)); |
| Chunk valueChunk = |
| reader.readMemChunk((ChunkMetadata) valueChunkMetadataList.get(chunkIdx)); |
| // construct an aligned chunk reader using time chunk and value chunk |
| IChunkReader chunkReader = |
| new AlignedChunkReader(timeChunk, Collections.singletonList(valueChunk)); |
| // verify the values |
| List<Pair<Long, TsPrimitiveType>> originValue = originDataChunks.get(chunkIdx); |
| for (int valIdx = 0; chunkReader.hasNextSatisfiedPage(); ) { |
| IBatchDataIterator pointReader = chunkReader.nextPageData().getBatchDataIterator(); |
| while (pointReader.hasNext()) { |
| long time = pointReader.currentTime(); |
| Assert.assertEquals(originValue.get(valIdx).left.longValue(), time); |
| Assert.assertEquals( |
| originValue.get(valIdx++).right.getValue(), |
| ((TsPrimitiveType[]) pointReader.currentValue())[0].getValue()); |
| pointReader.next(); |
| } |
| } |
| } |
| } |
| } |
| } |
| |
| } catch (IOException e) { |
| LOG.error("Meet exception when checking integrity of tsfile", e); |
| Assert.fail(); |
| } |
| } |
| } |