/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iotdb.db.engine.merge.task;

import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.engine.cache.ChunkCache;
import org.apache.iotdb.db.engine.cache.TimeSeriesMetadataCache;
import org.apache.iotdb.db.engine.merge.manage.MergeContext;
import org.apache.iotdb.db.engine.merge.manage.MergeResource;
import org.apache.iotdb.db.engine.merge.recover.MergeLogger;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.db.metadata.PartialPath;
import org.apache.iotdb.db.query.control.FileReaderManager;
import org.apache.iotdb.tsfile.exception.write.TsFileNotCompleteException;
import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
import org.apache.iotdb.tsfile.file.metadata.IChunkMetadata;
import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
import org.apache.iotdb.tsfile.fileSystem.fsFactory.FSFactory;
import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
import org.apache.iotdb.tsfile.read.common.Chunk;
import org.apache.iotdb.tsfile.utils.Pair;
import org.apache.iotdb.tsfile.write.writer.ForceAppendTsFileWriter;
import org.apache.iotdb.tsfile.write.writer.RestorableTsFileIOWriter;
import org.apache.iotdb.tsfile.write.writer.TsFileIOWriter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;

import static org.apache.iotdb.db.engine.storagegroup.TsFileResource.modifyTsFileNameUnseqMergCnt;

/**
 * MergeFileTask merges the temporary merge files with the seqFiles: it either moves the merged
 * chunks from the temp files into the seqFiles or moves the unmerged chunks into the merge temp
 * files, depending on which kind of chunk is in the majority (the minority side is the one that
 * gets moved).
 */
public class MergeFileTask {
private static final Logger logger = LoggerFactory.getLogger(MergeFileTask.class);
private String taskName;
private MergeContext context;
private MergeLogger mergeLogger;
private MergeResource resource;
private List<TsFileResource> unmergedFiles;
private FSFactory fsFactory = FSFactoryProducer.getFSFactory();
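
  // progress of the current merge, exposed through logProgress() and getProgress()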
private int currentMergeIndex;
private String currMergeFile;

  MergeFileTask(
String taskName,
MergeContext context,
MergeLogger mergeLogger,
MergeResource resource,
List<TsFileResource> unmergedSeqFiles) {
this.taskName = taskName;
this.context = context;
this.mergeLogger = mergeLogger;
this.resource = resource;
this.unmergedFiles = unmergedSeqFiles;
}
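
  /**
   * For each sequential file involved in this merge, compare the number of merged and unmerged
   * chunks and move the minority: either copy the merged chunks back into the old seqFile, or
   * copy the unmerged chunks into the new merge file. Returns early if the thread is interrupted.
   */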
void mergeFiles() throws IOException {
    // decide whether to write the unmerged chunks to the merge files or to move the merged chunks
    // back to the original seqFiles
if (logger.isInfoEnabled()) {
logger.info("{} starts to merge {} files", taskName, unmergedFiles.size());
}
long startTime = System.currentTimeMillis();
for (int i = 0; i < unmergedFiles.size(); i++) {
TsFileResource seqFile = unmergedFiles.get(i);
currentMergeIndex = i;
currMergeFile = seqFile.getTsFilePath();
int mergedChunkNum = context.getMergedChunkCnt().getOrDefault(seqFile, 0);
int unmergedChunkNum = context.getUnmergedChunkCnt().getOrDefault(seqFile, 0);
if (mergedChunkNum >= unmergedChunkNum) {
// move the unmerged data to the new file
if (logger.isInfoEnabled()) {
logger.info(
"{} moving unmerged data of {} to the merged file, {} merged chunks, {} "
+ "unmerged chunks",
taskName,
seqFile.getTsFile().getName(),
mergedChunkNum,
unmergedChunkNum);
}
moveUnmergedToNew(seqFile);
} else {
// move the merged data to the old file
if (logger.isInfoEnabled()) {
logger.info(
"{} moving merged data of {} to the old file {} merged chunks, {} "
+ "unmerged chunks",
taskName,
seqFile.getTsFile().getName(),
mergedChunkNum,
unmergedChunkNum);
}
moveMergedToOld(seqFile);
}
if (Thread.interrupted()) {
Thread.currentThread().interrupt();
return;
}
logProgress();
}
if (logger.isInfoEnabled()) {
logger.info(
"{} has merged all files after {}ms", taskName, System.currentTimeMillis() - startTime);
}
mergeLogger.logMergeEnd();
}

  private void logProgress() {
if (logger.isDebugEnabled()) {
logger.debug(
"{} has merged {}, processed {}/{} files",
taskName,
currMergeFile,
currentMergeIndex + 1,
unmergedFiles.size());
}
}
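
  /** Returns a human-readable description of how far this merge task has progressed. */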
public String getProgress() {
return String.format(
"Merging %s, processed %d/%d files",
currMergeFile, currentMergeIndex + 1, unmergedFiles.size());
}
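
  /**
   * Move the merged chunks from the temp merge file back into the old seqFile: truncate the old
   * file's metadata section, append the merged chunk groups, end the file, delete the temp file,
   * and rename the old file (and its .resource file) to the next unseq merge version. If anything
   * fails, the old file is restored to a readable state.
   */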
private void moveMergedToOld(TsFileResource seqFile) throws IOException {
int mergedChunkNum = context.getMergedChunkCnt().getOrDefault(seqFile, 0);
if (mergedChunkNum == 0) {
resource.removeFileAndWriter(seqFile);
return;
}
seqFile.writeLock();
try {
FileReaderManager.getInstance().closeFileAndRemoveReader(seqFile.getTsFilePath());
resource.removeFileReader(seqFile);
TsFileIOWriter oldFileWriter = getOldFileWriter(seqFile);
// filter the chunks that have been merged
oldFileWriter.filterChunks(new HashMap<>(context.getUnmergedChunkStartTimes().get(seqFile)));
RestorableTsFileIOWriter newFileWriter = resource.getMergeFileWriter(seqFile);
newFileWriter.close();
try (TsFileSequenceReader newFileReader =
new TsFileSequenceReader(newFileWriter.getFile().getPath())) {
Map<String, List<ChunkMetadata>> chunkMetadataListInChunkGroups =
newFileWriter.getDeviceChunkMetadataMap();
if (logger.isDebugEnabled()) {
logger.debug(
"{} find {} merged chunk groups", taskName, chunkMetadataListInChunkGroups.size());
}
        for (Entry<String, List<ChunkMetadata>> entry :
            chunkMetadataListInChunkGroups.entrySet()) {
String deviceId = entry.getKey();
List<ChunkMetadata> chunkMetadataList = entry.getValue();
writeMergedChunkGroup(chunkMetadataList, deviceId, newFileReader, oldFileWriter);
if (Thread.interrupted()) {
Thread.currentThread().interrupt();
oldFileWriter.close();
restoreOldFile(seqFile);
return;
}
}
}
updateStartTimeAndEndTime(seqFile, oldFileWriter);
oldFileWriter.endFile();
updatePlanIndexes(seqFile);
seqFile.serialize();
mergeLogger.logFileMergeEnd();
logger.debug("{} moved merged chunks of {} to the old file", taskName, seqFile);
newFileWriter.getFile().delete();
// change tsFile name
File nextMergeVersionFile = modifyTsFileNameUnseqMergCnt(seqFile.getTsFile());
fsFactory.moveFile(seqFile.getTsFile(), nextMergeVersionFile);
fsFactory.moveFile(
fsFactory.getFile(seqFile.getTsFile().getAbsolutePath() + TsFileResource.RESOURCE_SUFFIX),
fsFactory.getFile(
nextMergeVersionFile.getAbsolutePath() + TsFileResource.RESOURCE_SUFFIX));
seqFile.setFile(nextMergeVersionFile);
} catch (Exception e) {
restoreOldFile(seqFile);
throw e;
} finally {
seqFile.writeUnlock();
}
}
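
  /**
   * Register the start/end time of every chunk in the written file with the merge resource, then
   * write the resulting per-device start/end times back into the TsFileResource.
   */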
private void updateStartTimeAndEndTime(TsFileResource seqFile, TsFileIOWriter fileWriter) {
// TODO change to get one timeseries block each time
Map<String, List<ChunkMetadata>> deviceChunkMetadataListMap =
fileWriter.getDeviceChunkMetadataMap();
for (Entry<String, List<ChunkMetadata>> deviceChunkMetadataListEntry :
deviceChunkMetadataListMap.entrySet()) {
String device = deviceChunkMetadataListEntry.getKey();
for (IChunkMetadata chunkMetadata : deviceChunkMetadataListEntry.getValue()) {
resource.updateStartTime(seqFile, device, chunkMetadata.getStartTime());
resource.updateEndTime(seqFile, device, chunkMetadata.getEndTime());
}
}
// update all device start time and end time of the resource
Map<String, Pair<Long, Long>> deviceStartEndTimePairMap = resource.getStartEndTime(seqFile);
for (Entry<String, Pair<Long, Long>> deviceStartEndTimePairEntry :
deviceStartEndTimePairMap.entrySet()) {
String device = deviceStartEndTimePairEntry.getKey();
Pair<Long, Long> startEndTimePair = deviceStartEndTimePairEntry.getValue();
seqFile.putStartTime(device, startEndTimePair.left);
seqFile.putEndTime(device, startEndTimePair.right);
}
}

  /**
   * Restore an old seq file to which new chunks were being appended, after an exception occurs or
   * the task is aborted.
   */
private void restoreOldFile(TsFileResource seqFile) throws IOException {
RestorableTsFileIOWriter oldFileRecoverWriter =
new RestorableTsFileIOWriter(seqFile.getTsFile());
if (oldFileRecoverWriter.hasCrashed() && oldFileRecoverWriter.canWrite()) {
oldFileRecoverWriter.endFile();
} else {
oldFileRecoverWriter.close();
}
}

  /** Open an appending writer for an old seq file so we can add new chunks to it. */
private TsFileIOWriter getOldFileWriter(TsFileResource seqFile) throws IOException {
TsFileIOWriter oldFileWriter;
try {
oldFileWriter = new ForceAppendTsFileWriter(seqFile.getTsFile());
mergeLogger.logFileMergeStart(
seqFile.getTsFile(), ((ForceAppendTsFileWriter) oldFileWriter).getTruncatePosition());
logger.debug("{} moving merged chunks of {} to the old file", taskName, seqFile);
((ForceAppendTsFileWriter) oldFileWriter).doTruncate();
} catch (TsFileNotCompleteException e) {
      // the file may already have been truncated if this is a recovery merge after a system
      // reboot
oldFileWriter = new RestorableTsFileIOWriter(seqFile.getTsFile());
}
return oldFileWriter;
}

  private void updatePlanIndexes(TsFileResource seqFile) {
    // As the new file contains data of other files, track their plan indexes in the new file so
    // that we will be able to compare data across different IoTDBs that share the same index
    // generation policy. However, since the data of the unseq files is mixed together, we cannot
    // tell exactly which files are contained in the new file, so we have to record all unseq
    // files in the new file.
for (TsFileResource unseqFile : resource.getUnseqFiles()) {
seqFile.updatePlanIndexes(unseqFile);
}
}
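
  /** Copy one chunk group of the given device from the reader into the target writer. */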
private void writeMergedChunkGroup(
List<ChunkMetadata> chunkMetadataList,
String device,
TsFileSequenceReader reader,
TsFileIOWriter fileWriter)
throws IOException {
fileWriter.startChunkGroup(device);
for (ChunkMetadata chunkMetaData : chunkMetadataList) {
Chunk chunk = reader.readMemChunk(chunkMetaData);
fileWriter.writeChunk(chunk, chunkMetaData);
context.incTotalPointWritten(chunkMetaData.getNumOfPoints());
}
fileWriter.endChunkGroup();
}
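
  /**
   * Copy the chunks of the old seqFile that were not merged into the new merge file, end the new
   * file, then delete the old file and rename the new file (and the old .resource file) to the
   * next unseq merge version. When the metadata cache is enabled, the chunk and timeseries
   * metadata caches are cleared afterwards so that stale entries of the old file are not served.
   */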
private void moveUnmergedToNew(TsFileResource seqFile) throws IOException {
Map<PartialPath, List<Long>> fileUnmergedChunkStartTimes =
context.getUnmergedChunkStartTimes().get(seqFile);
RestorableTsFileIOWriter fileWriter = resource.getMergeFileWriter(seqFile);
mergeLogger.logFileMergeStart(fileWriter.getFile(), fileWriter.getFile().length());
logger.debug("{} moving unmerged chunks of {} to the new file", taskName, seqFile);
int unmergedChunkNum = context.getUnmergedChunkCnt().getOrDefault(seqFile, 0);
if (unmergedChunkNum > 0) {
for (Entry<PartialPath, List<Long>> entry : fileUnmergedChunkStartTimes.entrySet()) {
PartialPath path = entry.getKey();
List<Long> chunkStartTimes = entry.getValue();
if (chunkStartTimes.isEmpty()) {
continue;
}
List<ChunkMetadata> chunkMetadataList = resource.queryChunkMetadata(path, seqFile);
if (logger.isDebugEnabled()) {
logger.debug("{} find {} unmerged chunks", taskName, chunkMetadataList.size());
}
fileWriter.startChunkGroup(path.getDevice());
writeUnmergedChunks(
chunkStartTimes, chunkMetadataList, resource.getFileReader(seqFile), fileWriter);
if (Thread.interrupted()) {
Thread.currentThread().interrupt();
return;
}
fileWriter.endChunkGroup();
}
}
updateStartTimeAndEndTime(seqFile, fileWriter);
resource.removeFileReader(seqFile);
FileReaderManager.getInstance().closeFileAndRemoveReader(seqFile.getTsFilePath());
fileWriter.endFile();
updatePlanIndexes(seqFile);
mergeLogger.logFileMergeEnd();
logger.debug("{} moved unmerged chunks of {} to the new file", taskName, seqFile);
seqFile.writeLock();
try {
seqFile.serialize();
// change tsFile name
seqFile.getTsFile().delete();
File nextMergeVersionFile = modifyTsFileNameUnseqMergCnt(seqFile.getTsFile());
fsFactory.moveFile(fileWriter.getFile(), nextMergeVersionFile);
fsFactory.moveFile(
fsFactory.getFile(seqFile.getTsFile().getAbsolutePath() + TsFileResource.RESOURCE_SUFFIX),
fsFactory.getFile(
nextMergeVersionFile.getAbsolutePath() + TsFileResource.RESOURCE_SUFFIX));
seqFile.setFile(nextMergeVersionFile);
} catch (Exception e) {
logger.error(e.getMessage(), e);
} finally {
// clean cache
if (IoTDBDescriptor.getInstance().getConfig().isMetaDataCacheEnable()) {
ChunkCache.getInstance().clear();
TimeSeriesMetadataCache.getInstance().clear();
}
seqFile.writeUnlock();
}
}
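
  /**
   * Write the chunks whose start times appear in chunkStartTimes from the reader into the new
   * file. This relies on chunkStartTimes and chunkMetadataList sharing the same order, so a
   * single forward scan over chunkMetadataList is enough. Returns the highest version among the
   * written chunks.
   */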
private long writeUnmergedChunks(
List<Long> chunkStartTimes,
List<ChunkMetadata> chunkMetadataList,
TsFileSequenceReader reader,
RestorableTsFileIOWriter fileWriter)
throws IOException {
long maxVersion = 0;
int chunkIdx = 0;
for (Long startTime : chunkStartTimes) {
for (; chunkIdx < chunkMetadataList.size(); chunkIdx++) {
ChunkMetadata metaData = chunkMetadataList.get(chunkIdx);
if (metaData.getStartTime() == startTime) {
Chunk chunk = reader.readMemChunk(metaData);
fileWriter.writeChunk(chunk, metaData);
          maxVersion = Math.max(metaData.getVersion(), maxVersion);
context.incTotalPointWritten(metaData.getNumOfPoints());
break;
}
}
if (Thread.interrupted()) {
Thread.currentThread().interrupt();
return maxVersion;
}
}
return maxVersion;
}
}