blob: 3af6fad71b91fb65206038c258bbe2533981bb91 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.db.storageengine.dataregion.compaction.repair;
import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.exception.CompactionLastTimeCheckFailedException;
import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.executor.fast.reader.CompactionChunkReader;
import org.apache.iotdb.db.storageengine.dataregion.compaction.io.CompactionTsFileReader;
import org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.constant.CompactionType;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResourceStatus;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.timeindex.DeviceTimeIndex;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.timeindex.ITimeIndex;
import org.apache.tsfile.common.conf.TSFileDescriptor;
import org.apache.tsfile.compress.IUnCompressor;
import org.apache.tsfile.encoding.decoder.Decoder;
import org.apache.tsfile.enums.TSDataType;
import org.apache.tsfile.file.MetaMarker;
import org.apache.tsfile.file.header.ChunkHeader;
import org.apache.tsfile.file.header.PageHeader;
import org.apache.tsfile.file.metadata.AlignedChunkMetadata;
import org.apache.tsfile.file.metadata.ChunkMetadata;
import org.apache.tsfile.file.metadata.IChunkMetadata;
import org.apache.tsfile.file.metadata.IDeviceID;
import org.apache.tsfile.file.metadata.PlainDeviceID;
import org.apache.tsfile.file.metadata.enums.CompressionType;
import org.apache.tsfile.file.metadata.enums.TSEncoding;
import org.apache.tsfile.read.TsFileDeviceIterator;
import org.apache.tsfile.read.TsFileSequenceReader;
import org.apache.tsfile.read.common.Chunk;
import org.apache.tsfile.utils.Pair;
import org.apache.tsfile.utils.ReadWriteForEncodingUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
/**
 * Scans a single TsFile for two kinds of problems:
 *
 * <ul>
 *   <li>timestamps that are not strictly increasing within a series ({@link #hasUnsortedData()});
 *   <li>read failures while walking the file's devices and chunks ({@link #isBrokenFile()}).
 * </ul>
 *
 * <p>It also provides {@link #checkTimePartitionHasOverlap(List)}, which reports files whose
 * per-device time ranges overlap those of earlier files in the given list.
 *
 * <p>Instances are stateful: call {@link #scanTsFile()}, then query the result flags.
 */
public class RepairDataFileScanUtil {
  private static final Logger logger = LoggerFactory.getLogger(RepairDataFileScanUtil.class);
  private final TsFileResource resource;
  private boolean hasUnsortedData;
  private boolean isBrokenFile;
  // Last timestamp seen in the series currently being scanned; reset to Long.MIN_VALUE between
  // series so that each series is checked independently.
  private long previousTime;

  public RepairDataFileScanUtil(TsFileResource resource) {
    this.resource = resource;
    this.hasUnsortedData = false;
    this.isBrokenFile = false;
    this.previousTime = Long.MIN_VALUE;
  }

  /**
   * Reads every device of the TsFile and checks that timestamps are strictly increasing within
   * each series (per aligned device, or per measurement for non-aligned devices).
   *
   * <p>The first out-of-order timestamp stops the scan and sets {@link #hasUnsortedData()}. Any
   * other exception sets {@link #isBrokenFile()}, except in two cases: the current thread has been
   * interrupted, or the TsFile no longer exists. In those cases the method returns without setting
   * either flag.
   */
  public void scanTsFile() {
    File tsfile = resource.getTsFile();
    // An earlier scan may have been aborted in the middle of a series; start from a clean state.
    previousTime = Long.MIN_VALUE;
    try (TsFileSequenceReader reader =
        new CompactionTsFileReader(
            tsfile.getPath(),
            resource.isSeq()
                ? CompactionType.INNER_SEQ_COMPACTION
                : CompactionType.INNER_UNSEQ_COMPACTION)) {
      TsFileDeviceIterator deviceIterator = reader.getAllDevicesIteratorWithIsAligned();
      while (deviceIterator.hasNext()) {
        Pair<IDeviceID, Boolean> deviceIsAlignedPair = deviceIterator.next();
        IDeviceID device = deviceIsAlignedPair.getLeft();
        boolean isAligned = deviceIsAlignedPair.getRight();
        if (isAligned) {
          checkAlignedDeviceSeries(reader, device);
        } else {
          checkNonAlignedDeviceSeries(reader, device);
        }
      }
    } catch (CompactionLastTimeCheckFailedException lastTimeCheckFailedException) {
      this.hasUnsortedData = true;
    } catch (Exception e) {
      // ignored the exception caused by thread interrupt
      if (Thread.currentThread().isInterrupted()) {
        return;
      }
      // source file may be deleted
      if (!resource.tsFileExists()) {
        return;
      }
      logger.warn("Meet error when read tsfile {}", tsfile.getAbsolutePath(), e);
      isBrokenFile = true;
    }
  }

  /**
   * Decodes the time chunk of every aligned chunk group of {@code device} and checks that the
   * timestamps are strictly increasing across all of the device's time chunks. Resets the time
   * tracker afterwards so the next device starts fresh.
   */
  private void checkAlignedDeviceSeries(TsFileSequenceReader reader, IDeviceID device)
      throws IOException {
    List<AlignedChunkMetadata> chunkMetadataList = reader.getAlignedChunkMetadata(device);
    for (AlignedChunkMetadata alignedChunkMetadata : chunkMetadataList) {
      IChunkMetadata timeChunkMetadata = alignedChunkMetadata.getTimeChunkMetadata();
      Chunk timeChunk = reader.readMemChunk((ChunkMetadata) timeChunkMetadata);
      CompactionChunkReader chunkReader = new CompactionChunkReader(timeChunk);
      ByteBuffer chunkDataBuffer = timeChunk.getData();
      ChunkHeader chunkHeader = timeChunk.getHeader();
      while (chunkDataBuffer.hasRemaining()) {
        PageHeader pageHeader = deserializePageHeader(timeChunk, chunkDataBuffer);
        ByteBuffer pageData = chunkReader.readPageDataWithoutUncompressing(pageHeader);
        ByteBuffer uncompressedPageData =
            uncompressPageData(chunkHeader.getCompressionType(), pageHeader, pageData);
        // The time chunk is decoded with the encoding and data type recorded in its own header.
        Decoder decoder =
            Decoder.getDecoderByType(chunkHeader.getEncodingType(), chunkHeader.getDataType());
        while (decoder.hasNext(uncompressedPageData)) {
          long currentTime = decoder.readLong(uncompressedPageData);
          checkPreviousTimeAndUpdate(((PlainDeviceID) device).toStringID(), currentTime);
        }
      }
    }
    previousTime = Long.MIN_VALUE;
  }

  /**
   * Checks every measurement of a non-aligned {@code device} independently. The time tracker is
   * reset after each measurement.
   */
  private void checkNonAlignedDeviceSeries(TsFileSequenceReader reader, IDeviceID device)
      throws IOException {
    Iterator<Map<String, List<ChunkMetadata>>> measurementChunkMetadataListMapIterator =
        reader.getMeasurementChunkMetadataListMapIterator(device);
    while (measurementChunkMetadataListMapIterator.hasNext()) {
      Map<String, List<ChunkMetadata>> measurementChunkMetadataListMap =
          measurementChunkMetadataListMapIterator.next();
      for (Map.Entry<String, List<ChunkMetadata>> measurementChunkMetadataListEntry :
          measurementChunkMetadataListMap.entrySet()) {
        String measurement = measurementChunkMetadataListEntry.getKey();
        List<ChunkMetadata> chunkMetadataList = measurementChunkMetadataListEntry.getValue();
        checkSingleNonAlignedSeries(reader, measurement, chunkMetadataList);
        previousTime = Long.MIN_VALUE;
      }
    }
  }

  /**
   * Decodes the time column of every page of every chunk of one non-aligned series and checks
   * that the timestamps are strictly increasing. Null chunk metadata, and chunks whose statistics
   * report zero points, are skipped.
   */
  private void checkSingleNonAlignedSeries(
      TsFileSequenceReader reader, String measurement, List<ChunkMetadata> chunkMetadataList)
      throws IOException {
    for (ChunkMetadata chunkMetadata : chunkMetadataList) {
      if (chunkMetadata == null || chunkMetadata.getStatistics().getCount() == 0) {
        continue;
      }
      Chunk chunk = reader.readMemChunk(chunkMetadata);
      ChunkHeader chunkHeader = chunk.getHeader();
      CompactionChunkReader chunkReader = new CompactionChunkReader(chunk);
      ByteBuffer chunkDataBuffer = chunk.getData();
      while (chunkDataBuffer.hasRemaining()) {
        PageHeader pageHeader = deserializePageHeader(chunk, chunkDataBuffer);
        ByteBuffer pageData = chunkReader.readPageDataWithoutUncompressing(pageHeader);
        ByteBuffer uncompressedPageData =
            uncompressPageData(chunkHeader.getCompressionType(), pageHeader, pageData);
        ByteBuffer timeBuffer = getTimeBufferFromNonAlignedPage(uncompressedPageData);
        // The time column is decoded with the globally configured time encoder, not with an
        // encoding read from the chunk header.
        Decoder timeDecoder =
            Decoder.getDecoderByType(
                TSEncoding.valueOf(TSFileDescriptor.getInstance().getConfig().getTimeEncoder()),
                TSDataType.INT64);
        while (timeDecoder.hasNext(timeBuffer)) {
          long currentTime = timeDecoder.readLong(timeBuffer);
          checkPreviousTimeAndUpdate(measurement, currentTime);
        }
      }
    }
  }

  /**
   * Reads the next page header from {@code chunkDataBuffer}. After masking the chunk type with
   * {@code 0x3F}, if it equals {@link MetaMarker#ONLY_ONE_PAGE_CHUNK_HEADER}, the header is read
   * with the chunk-level statistics. Otherwise it is read using the chunk's data type.
   */
  private static PageHeader deserializePageHeader(Chunk chunk, ByteBuffer chunkDataBuffer)
      throws IOException {
    ChunkHeader chunkHeader = chunk.getHeader();
    if (((byte) (chunkHeader.getChunkType() & 0x3F)) == MetaMarker.ONLY_ONE_PAGE_CHUNK_HEADER) {
      return PageHeader.deserializeFrom(chunkDataBuffer, chunk.getChunkStatistic());
    }
    return PageHeader.deserializeFrom(chunkDataBuffer, chunkHeader.getDataType());
  }

  /**
   * Returns a view of the time column of an uncompressed non-aligned page. The column's length is
   * read as an unsigned varint prefix, which advances {@code uncompressedPageData}.
   */
  private ByteBuffer getTimeBufferFromNonAlignedPage(ByteBuffer uncompressedPageData) {
    int timeBufferLength = ReadWriteForEncodingUtils.readUnsignedVarInt(uncompressedPageData);
    ByteBuffer timeBuffer = uncompressedPageData.slice();
    timeBuffer.limit(timeBufferLength);
    return timeBuffer;
  }

  /**
   * Uncompresses {@code pageHeader.getCompressedSize()} bytes of {@code pageData}, starting at its
   * current position, into a new buffer of {@code pageHeader.getUncompressedSize()} bytes.
   */
  private ByteBuffer uncompressPageData(
      CompressionType compressionType, PageHeader pageHeader, ByteBuffer pageData)
      throws IOException {
    IUnCompressor unCompressor = IUnCompressor.getUnCompressor(compressionType);
    byte[] uncompressedData = new byte[pageHeader.getUncompressedSize()];
    // Respect the buffer's offset into its backing array rather than assuming it starts at 0.
    unCompressor.uncompress(
        pageData.array(),
        pageData.arrayOffset() + pageData.position(),
        pageHeader.getCompressedSize(),
        uncompressedData,
        0);
    return ByteBuffer.wrap(uncompressedData);
  }

  /**
   * Throws {@link CompactionLastTimeCheckFailedException} if {@code time} is not strictly greater
   * than the previously seen timestamp. Otherwise, records {@code time} as the latest one.
   */
  private void checkPreviousTimeAndUpdate(String path, long time) {
    if (previousTime >= time) {
      throw new CompactionLastTimeCheckFailedException(path, time, previousTime);
    }
    previousTime = time;
  }

  public boolean hasUnsortedData() {
    return hasUnsortedData;
  }

  public boolean isBrokenFile() {
    return isBrokenFile;
  }

  /**
   * Walks {@code resources} in list order and returns the files in which some device starts at or
   * before the end time recorded for it from an earlier non-overlapping file.
   *
   * <p>Resources that are UNCLOSED or DELETED, or whose device time index cannot be obtained, are
   * skipped. A file found to overlap does not contribute its end times to later comparisons.
   * Devices whose start time is after their end time are ignored, both in the check and when
   * recording end times.
   */
  public static List<TsFileResource> checkTimePartitionHasOverlap(List<TsFileResource> resources) {
    List<TsFileResource> overlapResources = new ArrayList<>();
    Map<IDeviceID, Long> deviceEndTimeMap = new HashMap<>();
    for (TsFileResource resource : resources) {
      if (resource.getStatus() == TsFileResourceStatus.UNCLOSED
          || resource.getStatus() == TsFileResourceStatus.DELETED) {
        continue;
      }
      DeviceTimeIndex deviceTimeIndex;
      try {
        deviceTimeIndex = getDeviceTimeIndex(resource);
      } catch (Exception ignored) {
        // Best effort: a resource whose time index cannot be loaded is simply not checked.
        continue;
      }
      Set<IDeviceID> devices = deviceTimeIndex.getDevices();
      boolean fileHasOverlap = false;
      // check overlap
      for (IDeviceID device : devices) {
        long deviceStartTimeInCurrentFile = deviceTimeIndex.getStartTime(device);
        if (deviceStartTimeInCurrentFile > deviceTimeIndex.getEndTime(device)) {
          continue;
        }
        Long deviceEndTimeInPreviousFile = deviceEndTimeMap.get(device);
        if (deviceEndTimeInPreviousFile == null) {
          continue;
        }
        if (deviceStartTimeInCurrentFile <= deviceEndTimeInPreviousFile) {
          fileHasOverlap = true;
          overlapResources.add(resource);
          break;
        }
      }
      // update end time map
      if (!fileHasOverlap) {
        for (IDeviceID device : devices) {
          long deviceEndTimeInCurrentFile = deviceTimeIndex.getEndTime(device);
          // A device with start > end has no usable time range here. Recording its end time
          // would overwrite (and possibly lower) the end time recorded from an earlier file,
          // hiding later overlaps for that device.
          if (deviceTimeIndex.getStartTime(device) > deviceEndTimeInCurrentFile) {
            continue;
          }
          deviceEndTimeMap.put(device, deviceEndTimeInCurrentFile);
        }
      }
    }
    return overlapResources;
  }

  /**
   * Returns the resource's time index if it is already a {@link DeviceTimeIndex}. Otherwise
   * returns the result of {@code resource.buildDeviceTimeIndex()}.
   */
  private static DeviceTimeIndex getDeviceTimeIndex(TsFileResource resource) throws IOException {
    ITimeIndex timeIndex = resource.getTimeIndex();
    if (timeIndex instanceof DeviceTimeIndex) {
      return (DeviceTimeIndex) timeIndex;
    }
    return resource.buildDeviceTimeIndex();
  }
}