blob: 2c6aa02b9b0fcd04e64fccfbd35a878340680276 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.db.storageengine.dataregion.compaction.execute.task;
import org.apache.iotdb.commons.conf.IoTDBConstant;
import org.apache.iotdb.db.service.metrics.CompactionMetrics;
import org.apache.iotdb.db.service.metrics.FileMetrics;
import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.exception.CompactionRecoverException;
import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer.ICrossCompactionPerformer;
import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer.impl.FastCompactionPerformer;
import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.task.subtask.FastCompactionTaskSummary;
import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.CompactionUtils;
import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.log.CompactionLogAnalyzer;
import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.log.CompactionLogger;
import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.log.SimpleCompactionLogger;
import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.log.TsFileIdentifier;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileManager;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResourceStatus;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.generator.TsFileNameGenerator;
import org.apache.tsfile.utils.TsFileUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;
public class CrossSpaceCompactionTask extends AbstractCompactionTask {
private static final Logger LOGGER =
LoggerFactory.getLogger(IoTDBConstant.COMPACTION_LOGGER_NAME);
protected List<TsFileResource> selectedSequenceFiles;
protected List<TsFileResource> selectedUnsequenceFiles;
private File logFile;
protected List<TsFileResource> targetTsfileResourceList;
private List<TsFileResource> emptyTargetTsFileResourceList;
protected List<TsFileResource> holdWriteLockList = new ArrayList<>();
protected double selectedSeqFileSize = 0;
protected double selectedUnseqFileSize = 0;
protected boolean needRecoverTaskInfoFromLogFile;
@SuppressWarnings("squid:S107")
public CrossSpaceCompactionTask(
long timePartition,
TsFileManager tsFileManager,
List<TsFileResource> selectedSequenceFiles,
List<TsFileResource> selectedUnsequenceFiles,
ICrossCompactionPerformer performer,
long memoryCost,
long serialId) {
super(
tsFileManager.getStorageGroupName(),
tsFileManager.getDataRegionId(),
timePartition,
tsFileManager,
serialId);
this.selectedSequenceFiles = selectedSequenceFiles;
this.selectedUnsequenceFiles = selectedUnsequenceFiles;
this.emptyTargetTsFileResourceList = new ArrayList<>();
this.performer = performer;
this.hashCode = this.toString().hashCode();
this.memoryCost = memoryCost;
this.crossTask = true;
this.innerSeqTask = false;
createSummary();
}
public CrossSpaceCompactionTask(
String databaseName, String dataRegionId, TsFileManager tsFileManager, File logFile) {
super(databaseName, dataRegionId, 0L, tsFileManager, 0L, CompactionTaskPriorityType.NORMAL);
this.logFile = logFile;
this.needRecoverTaskInfoFromLogFile = true;
}
private void recoverTaskInfoFromLogFile() throws IOException {
CompactionLogAnalyzer logAnalyzer = new CompactionLogAnalyzer(this.logFile);
logAnalyzer.analyze();
List<TsFileIdentifier> sourceFileIdentifiers = logAnalyzer.getSourceFileInfos();
this.selectedSequenceFiles = new ArrayList<>();
sourceFileIdentifiers.stream()
.filter(TsFileIdentifier::isSequence)
.forEach(f -> this.selectedSequenceFiles.add(new TsFileResource(f.getFileFromDataDirs())));
sourceFileIdentifiers.stream()
.filter(f -> !f.isSequence())
.forEach(
f -> this.selectedUnsequenceFiles.add(new TsFileResource(f.getFileFromDataDirs())));
List<TsFileIdentifier> targetFileIdentifiers = logAnalyzer.getTargetFileInfos();
List<TsFileIdentifier> deletedTargetFileIdentifiers = logAnalyzer.getDeletedTargetFileInfos();
for (TsFileIdentifier f : targetFileIdentifiers) {
File targetFileOnDisk = getRealTargetFile(f, IoTDBConstant.CROSS_COMPACTION_TMP_FILE_SUFFIX);
// The targetFileOnDisk may be null, but it won't impact the task recover stage
TsFileResource targetTsFile = new TsFileResource(targetFileOnDisk);
this.targetTsfileResourceList.add(targetTsFile);
if (deletedTargetFileIdentifiers.contains(f)) {
this.emptyTargetTsFileResourceList.add(targetTsFile);
}
}
this.taskStage = logAnalyzer.getTaskStage();
}
@Override
@SuppressWarnings({"squid:S6541", "squid:S3776", "squid:S2142"})
public boolean doCompaction() {
recoverMemoryStatus = true;
boolean isSuccess = true;
try {
if (!tsFileManager.isAllowCompaction()) {
return true;
}
long startTime = System.currentTimeMillis();
targetTsfileResourceList =
TsFileNameGenerator.getCrossCompactionTargetFileResources(selectedSequenceFiles);
if (targetTsfileResourceList.isEmpty()
|| selectedSequenceFiles.isEmpty()
|| selectedUnsequenceFiles.isEmpty()) {
LOGGER.info(
"{}-{} [Compaction] Cross space compaction file list is empty, end it",
storageGroupName,
dataRegionId);
return true;
}
for (TsFileResource resource : selectedSequenceFiles) {
selectedSeqFileSize += resource.getTsFileSize();
}
for (TsFileResource resource : selectedUnsequenceFiles) {
selectedUnseqFileSize += resource.getTsFileSize();
}
LOGGER.info(
"{}-{} [Compaction] CrossSpaceCompaction task starts with {} seq files "
+ "and {} unsequence files. "
+ "Sequence files : {}, unsequence files : {} . "
+ "Sequence files size is {} MB, "
+ "unsequence file size is {} MB, "
+ "total size is {} MB",
storageGroupName,
dataRegionId,
selectedSequenceFiles.size(),
selectedUnsequenceFiles.size(),
selectedSequenceFiles,
selectedUnsequenceFiles,
selectedSeqFileSize / 1024 / 1024,
selectedUnseqFileSize / 1024 / 1024,
(selectedSeqFileSize + selectedUnseqFileSize) / 1024 / 1024);
logFile =
new File(
selectedSequenceFiles.get(0).getTsFile().getParent()
+ File.separator
+ targetTsfileResourceList.get(0).getTsFile().getName()
+ CompactionLogger.CROSS_COMPACTION_LOG_NAME_SUFFIX);
try (SimpleCompactionLogger compactionLogger = new SimpleCompactionLogger(logFile)) {
// print the path of the temporary file first for priority check during recovery
compactionLogger.logSourceFiles(selectedSequenceFiles);
compactionLogger.logSourceFiles(selectedUnsequenceFiles);
compactionLogger.logTargetFiles(targetTsfileResourceList);
compactionLogger.force();
performer.setSourceFiles(selectedSequenceFiles, selectedUnsequenceFiles);
performer.setTargetFiles(targetTsfileResourceList);
performer.setSummary(summary);
performer.perform();
CompactionUtils.updateProgressIndex(
targetTsfileResourceList, selectedSequenceFiles, selectedUnsequenceFiles);
CompactionUtils.moveTargetFile(
targetTsfileResourceList, false, storageGroupName + "-" + dataRegionId);
CompactionUtils.combineModsInCrossCompaction(
selectedSequenceFiles, selectedUnsequenceFiles, targetTsfileResourceList);
validateCompactionResult(
selectedSequenceFiles, selectedUnsequenceFiles, targetTsfileResourceList);
// update tsfile resource in memory
tsFileManager.replace(
selectedSequenceFiles,
selectedUnsequenceFiles,
targetTsfileResourceList,
timePartition);
// find empty target files and add log
for (TsFileResource targetResource : targetTsfileResourceList) {
if (targetResource.isDeleted()) {
emptyTargetTsFileResourceList.add(targetResource);
compactionLogger.logEmptyTargetFile(targetResource);
compactionLogger.force();
}
}
lockWrite(selectedSequenceFiles);
lockWrite(selectedUnsequenceFiles);
for (TsFileResource sequenceResource : selectedSequenceFiles) {
if (sequenceResource.getModFile().exists()) {
FileMetrics.getInstance().decreaseModFileNum(1);
FileMetrics.getInstance().decreaseModFileSize(sequenceResource.getModFile().getSize());
}
}
for (TsFileResource unsequenceResource : selectedUnsequenceFiles) {
if (unsequenceResource.getModFile().exists()) {
FileMetrics.getInstance().decreaseModFileNum(1);
FileMetrics.getInstance()
.decreaseModFileSize(unsequenceResource.getModFile().getSize());
}
}
CompactionUtils.deleteSourceTsFileAndUpdateFileMetrics(
selectedSequenceFiles, selectedUnsequenceFiles);
CompactionUtils.deleteCompactionModsFile(selectedSequenceFiles, selectedUnsequenceFiles);
for (TsFileResource targetResource : targetTsfileResourceList) {
if (!targetResource.isDeleted()) {
FileMetrics.getInstance()
.addTsFile(
targetResource.getDatabaseName(),
targetResource.getDataRegionId(),
targetResource.getTsFileSize(),
true,
targetResource.getTsFile().getName());
} else {
// target resource is empty after compaction, then delete it
targetResource.remove();
}
}
CompactionMetrics.getInstance().recordSummaryInfo(summary);
double costTime = (System.currentTimeMillis() - startTime) / 1000.0d;
LOGGER.info(
"{}-{} [Compaction] CrossSpaceCompaction task finishes successfully, "
+ "time cost is {} s, "
+ "compaction speed is {} MB/s, {}",
storageGroupName,
dataRegionId,
String.format("%.2f", costTime),
String.format(
"%.2f",
(selectedSeqFileSize + selectedUnseqFileSize) / 1024.0d / 1024.0d / costTime),
summary);
} finally {
Files.deleteIfExists(logFile.toPath());
for (TsFileResource resource : targetTsfileResourceList) {
// may failed to set status if the status of current resource is DELETED
resource.setStatus(TsFileResourceStatus.NORMAL);
}
}
} catch (Exception e) {
isSuccess = false;
handleException(LOGGER, e);
recover();
} finally {
releaseAllLocks();
}
return isSuccess;
}
public void recover() {
try {
if (needRecoverTaskInfoFromLogFile) {
recoverTaskInfoFromLogFile();
}
if (shouldRollback()) {
rollback();
} else {
// That finishTask() is revoked means
finishTask();
}
} catch (Exception e) {
handleRecoverException(e);
}
}
private boolean shouldRollback() {
return checkAllSourceFileExists(selectedSequenceFiles)
&& checkAllSourceFileExists(selectedUnsequenceFiles);
}
private void rollback() throws IOException {
// if the task has started,
if (recoverMemoryStatus) {
replaceTsFileInMemory(
targetTsfileResourceList,
Stream.concat(selectedSequenceFiles.stream(), selectedUnsequenceFiles.stream())
.collect(Collectors.toList()));
}
deleteCompactionModsFile(selectedSequenceFiles);
deleteCompactionModsFile(selectedUnsequenceFiles);
// delete target file
if (targetTsfileResourceList != null && !deleteTsFilesOnDisk(targetTsfileResourceList)) {
throw new CompactionRecoverException("failed to delete target file %s");
}
}
private void finishTask() throws IOException {
for (TsFileResource target : targetTsfileResourceList) {
if (target.isDeleted() || emptyTargetTsFileResourceList.contains(target)) {
// it means the target file is empty after compaction
if (target.remove()) {
throw new CompactionRecoverException(
String.format("failed to delete empty target file %s", target));
}
} else {
File targetFile = target.getTsFile();
if (targetFile == null || !TsFileUtils.isTsFileComplete(target.getTsFile())) {
throw new CompactionRecoverException(
String.format("Target file is not completed. %s", targetFile));
}
if (recoverMemoryStatus) {
target.setStatus(TsFileResourceStatus.NORMAL);
}
}
}
if (!deleteTsFilesOnDisk(selectedSequenceFiles)
|| !deleteTsFilesOnDisk(selectedUnsequenceFiles)) {
throw new CompactionRecoverException("source files cannot be deleted successfully");
}
if (recoverMemoryStatus) {
FileMetrics.getInstance().deleteTsFile(true, selectedSequenceFiles);
FileMetrics.getInstance().deleteTsFile(true, selectedUnsequenceFiles);
}
deleteCompactionModsFile(selectedSequenceFiles);
deleteCompactionModsFile(selectedUnsequenceFiles);
}
@Override
public boolean equalsOtherTask(AbstractCompactionTask otherTask) {
if (!(otherTask instanceof CrossSpaceCompactionTask)) {
return false;
}
CrossSpaceCompactionTask otherCrossCompactionTask = (CrossSpaceCompactionTask) otherTask;
return this.selectedSequenceFiles.equals(otherCrossCompactionTask.selectedSequenceFiles)
&& this.selectedUnsequenceFiles.equals(otherCrossCompactionTask.selectedUnsequenceFiles)
&& this.performer.getClass().isInstance(otherCrossCompactionTask.performer);
}
private void releaseAllLocks() {
for (TsFileResource tsFileResource : holdWriteLockList) {
tsFileResource.writeUnlock();
}
holdWriteLockList.clear();
}
public List<TsFileResource> getSelectedSequenceFiles() {
return selectedSequenceFiles;
}
@Override
public List<TsFileResource> getAllSourceTsFiles() {
List<TsFileResource> allRelatedFiles = new ArrayList<>();
allRelatedFiles.addAll(selectedSequenceFiles);
allRelatedFiles.addAll(selectedUnsequenceFiles);
return allRelatedFiles;
}
public List<TsFileResource> getSelectedUnsequenceFiles() {
return selectedUnsequenceFiles;
}
@Override
public String toString() {
return storageGroupName
+ "-"
+ dataRegionId
+ "-"
+ timePartition
+ " task seq files are "
+ selectedSequenceFiles.toString()
+ " , unseq files are "
+ selectedUnsequenceFiles.toString();
}
@Override
public int hashCode() {
return hashCode;
}
@Override
public boolean equals(Object other) {
if (!(other instanceof CrossSpaceCompactionTask)) {
return false;
}
return equalsOtherTask((CrossSpaceCompactionTask) other);
}
private void lockWrite(List<TsFileResource> tsFileResourceList) {
for (TsFileResource tsFileResource : tsFileResourceList) {
tsFileResource.writeLock();
holdWriteLockList.add(tsFileResource);
}
}
@Override
public long getEstimatedMemoryCost() {
return memoryCost;
}
@Override
public int getProcessedFileNum() {
return selectedSequenceFiles.size() + selectedUnsequenceFiles.size();
}
@Override
protected void createSummary() {
if (performer instanceof FastCompactionPerformer) {
this.summary = new FastCompactionTaskSummary();
} else {
this.summary = new CompactionTaskSummary();
}
}
}