blob: 0caa18159460819c14c215ba7ebed5e721f9c408 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils;
import org.apache.iotdb.commons.conf.CommonDescriptor;
import org.apache.iotdb.commons.conf.IoTDBConstant;
import org.apache.iotdb.commons.service.metric.MetricService;
import org.apache.iotdb.commons.service.metric.enums.Tag;
import org.apache.iotdb.db.service.metrics.FileMetrics;
import org.apache.iotdb.db.storageengine.dataregion.modification.Modification;
import org.apache.iotdb.db.storageengine.dataregion.modification.ModificationFile;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
import org.apache.iotdb.metrics.utils.MetricLevel;
import org.apache.iotdb.metrics.utils.SystemMetric;
import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
import org.apache.iotdb.tsfile.file.metadata.IDeviceID;
import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
import org.apache.iotdb.tsfile.write.writer.TsFileIOWriter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
/**
* This tool can be used to perform inner space or cross space compaction of aligned and non aligned
* timeseries.
*/
public class CompactionUtils {
private static final Logger logger =
LoggerFactory.getLogger(IoTDBConstant.COMPACTION_LOGGER_NAME);
private static final String SYSTEM = "system";
private CompactionUtils() {}
/**
* Update the targetResource. Move tmp target file to target file and serialize
* xxx.tsfile.resource.
*
* @throws IOException if io errors occurred
*/
public static void moveTargetFile(
List<TsFileResource> targetResources, boolean isInnerSpace, String fullStorageGroupName)
throws IOException {
String fileSuffix;
if (isInnerSpace) {
fileSuffix = IoTDBConstant.INNER_COMPACTION_TMP_FILE_SUFFIX;
} else {
fileSuffix = IoTDBConstant.CROSS_COMPACTION_TMP_FILE_SUFFIX;
}
for (TsFileResource targetResource : targetResources) {
if (targetResource != null) {
moveOneTargetFile(targetResource, fileSuffix, fullStorageGroupName);
}
}
}
private static void moveOneTargetFile(
TsFileResource targetResource, String tmpFileSuffix, String fullStorageGroupName)
throws IOException {
// move to target file and delete old tmp target file
if (!targetResource.getTsFile().exists()) {
logger.info(
"{} [Compaction] Tmp target tsfile {} may be deleted after compaction.",
fullStorageGroupName,
targetResource.getTsFilePath());
return;
}
File newFile =
new File(
targetResource.getTsFilePath().replace(tmpFileSuffix, TsFileConstant.TSFILE_SUFFIX));
if (!newFile.exists()) {
FSFactoryProducer.getFSFactory().moveFile(targetResource.getTsFile(), newFile);
}
// serialize xxx.tsfile.resource
targetResource.setFile(newFile);
targetResource.serialize();
targetResource.closeWithoutSettingStatus();
}
/**
* Collect all the compaction modification files of source files, and combines them as the
* modification file of target file.
*
* @throws IOException if io errors occurred
*/
public static void combineModsInCrossCompaction(
List<TsFileResource> seqResources,
List<TsFileResource> unseqResources,
List<TsFileResource> targetResources)
throws IOException {
Set<Modification> modifications = new HashSet<>();
// get compaction mods from all source unseq files
for (TsFileResource unseqFile : unseqResources) {
modifications.addAll(ModificationFile.getCompactionMods(unseqFile).getModifications());
}
// write target mods file
for (int i = 0; i < targetResources.size(); i++) {
TsFileResource targetResource = targetResources.get(i);
if (targetResource == null) {
continue;
}
Set<Modification> seqModifications =
new HashSet<>(ModificationFile.getCompactionMods(seqResources.get(i)).getModifications());
modifications.addAll(seqModifications);
updateOneTargetMods(targetResource, modifications);
if (!modifications.isEmpty()) {
FileMetrics.getInstance().increaseModFileNum(1);
FileMetrics.getInstance().increaseModFileSize(targetResource.getModFile().getSize());
}
modifications.removeAll(seqModifications);
}
}
/**
* Collect all the compaction modification files of source files, and combines them as the
* modification file of target file.
*
* @throws IOException if io errors occurred
*/
public static void combineModsInInnerCompaction(
Collection<TsFileResource> sourceFiles, TsFileResource targetTsFile) throws IOException {
Set<Modification> modifications = new HashSet<>();
for (TsFileResource mergeTsFile : sourceFiles) {
try (ModificationFile sourceCompactionModificationFile =
ModificationFile.getCompactionMods(mergeTsFile)) {
modifications.addAll(sourceCompactionModificationFile.getModifications());
}
}
updateOneTargetMods(targetTsFile, modifications);
if (!modifications.isEmpty()) {
FileMetrics.getInstance().increaseModFileNum(1);
FileMetrics.getInstance().increaseModFileSize(targetTsFile.getModFile().getSize());
}
}
private static void updateOneTargetMods(
TsFileResource targetFile, Set<Modification> modifications) throws IOException {
if (!modifications.isEmpty()) {
try (ModificationFile modificationFile = ModificationFile.getNormalMods(targetFile)) {
for (Modification modification : modifications) {
// we have to set modification offset to MAX_VALUE, as the offset of source chunk may
// change after compaction
modification.setFileOffset(Long.MAX_VALUE);
modificationFile.write(modification);
}
}
}
}
public static void deleteCompactionModsFile(
List<TsFileResource> selectedSeqTsFileResourceList,
List<TsFileResource> selectedUnSeqTsFileResourceList)
throws IOException {
for (TsFileResource seqFile : selectedSeqTsFileResourceList) {
ModificationFile modificationFile = seqFile.getCompactionModFile();
if (modificationFile.exists()) {
modificationFile.remove();
}
}
for (TsFileResource unseqFile : selectedUnSeqTsFileResourceList) {
ModificationFile modificationFile = unseqFile.getCompactionModFile();
if (modificationFile.exists()) {
modificationFile.remove();
}
}
}
public static boolean deleteTsFilesInDisk(
Collection<TsFileResource> mergeTsFiles, String storageGroupName) {
logger.info("{} [Compaction] Compaction starts to delete real file ", storageGroupName);
boolean result = true;
for (TsFileResource mergeTsFile : mergeTsFiles) {
if (!mergeTsFile.remove()) {
result = false;
}
logger.info(
"{} [Compaction] delete TsFile {}", storageGroupName, mergeTsFile.getTsFilePath());
}
return result;
}
/**
* Delete all modification files for source files.
*
* @throws IOException if io errors occurred
*/
public static void deleteModificationForSourceFile(
Collection<TsFileResource> sourceFiles, String storageGroupName) throws IOException {
logger.info("{} [Compaction] Start to delete modifications of source files", storageGroupName);
for (TsFileResource tsFileResource : sourceFiles) {
ModificationFile compactionModificationFile =
ModificationFile.getCompactionMods(tsFileResource);
if (compactionModificationFile.exists()) {
compactionModificationFile.remove();
}
ModificationFile normalModification = ModificationFile.getNormalMods(tsFileResource);
if (normalModification.exists()) {
FileMetrics.getInstance().decreaseModFileNum(1);
FileMetrics.getInstance().decreaseModFileSize(tsFileResource.getModFile().getSize());
normalModification.remove();
}
}
}
public static void updateResource(
TsFileResource resource, TsFileIOWriter tsFileIoWriter, IDeviceID deviceId) {
List<ChunkMetadata> chunkMetadatasOfCurrentDevice =
tsFileIoWriter.getChunkMetadataListOfCurrentDeviceInMemory();
if (chunkMetadatasOfCurrentDevice != null) {
// this target file contains current device
for (ChunkMetadata chunkMetadata : chunkMetadatasOfCurrentDevice) {
if (chunkMetadata.getMask() == TsFileConstant.VALUE_COLUMN_MASK) {
// value chunk metadata can be skipped
continue;
}
resource.updateStartTime(deviceId, chunkMetadata.getStatistics().getStartTime());
resource.updateEndTime(deviceId, chunkMetadata.getStatistics().getEndTime());
}
}
}
public static void updateProgressIndex(
List<TsFileResource> targetResources,
List<TsFileResource> seqResources,
List<TsFileResource> unseqResources) {
for (TsFileResource targetResource : targetResources) {
for (TsFileResource unseqResource : unseqResources) {
targetResource.updateProgressIndex(unseqResource.getMaxProgressIndexAfterClose());
}
for (TsFileResource seqResource : seqResources) {
targetResource.updateProgressIndex(seqResource.getMaxProgressIndexAfterClose());
}
}
}
public static void updatePlanIndexes(
List<TsFileResource> targetResources,
List<TsFileResource> seqResources,
List<TsFileResource> unseqResources) {
// as the new file contains data of other files, track their plan indexes in the new file
// so that we will be able to compare data across different IoTDBs that share the same index
// generation policy
// however, since the data of unseq files are mixed together, we won't be able to know
// which files are exactly contained in the new file, so we have to record all unseq files
// in the new file
for (TsFileResource targetResource : targetResources) {
for (TsFileResource unseqResource : unseqResources) {
targetResource.updatePlanIndexes(unseqResource);
}
for (TsFileResource seqResource : seqResources) {
targetResource.updatePlanIndexes(seqResource);
}
}
}
public static void deleteSourceTsFileAndUpdateFileMetrics(
List<TsFileResource> sourceSeqResourceList, List<TsFileResource> sourceUnseqResourceList) {
deleteSourceTsFileAndUpdateFileMetrics(sourceSeqResourceList, true);
deleteSourceTsFileAndUpdateFileMetrics(sourceUnseqResourceList, false);
}
public static void deleteSourceTsFileAndUpdateFileMetrics(
List<TsFileResource> resources, boolean seq) {
List<TsFileResource> removeResources = new ArrayList<>();
for (TsFileResource resource : resources) {
if (!resource.remove()) {
logger.warn(
"[Compaction] delete file failed, file path is {}",
resource.getTsFile().getAbsolutePath());
} else {
logger.info("[Compaction] delete file: {}", resource.getTsFile().getAbsolutePath());
removeResources.add(resource);
}
}
FileMetrics.getInstance().deleteTsFile(seq, resources);
}
public static boolean isDiskHasSpace() {
return isDiskHasSpace(0d);
}
public static boolean isDiskHasSpace(double redundancy) {
double availableDisk =
MetricService.getInstance()
.getAutoGauge(
SystemMetric.SYS_DISK_AVAILABLE_SPACE.toString(),
MetricLevel.CORE,
Tag.NAME.toString(),
SYSTEM)
.getValue();
double totalDisk =
MetricService.getInstance()
.getAutoGauge(
SystemMetric.SYS_DISK_TOTAL_SPACE.toString(),
MetricLevel.CORE,
Tag.NAME.toString(),
SYSTEM)
.getValue();
if (availableDisk != 0 && totalDisk != 0) {
return availableDisk / totalDisk
> CommonDescriptor.getInstance().getConfig().getDiskSpaceWarningThreshold() + redundancy;
}
return true;
}
}