| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| package org.apache.iotdb.db.engine.compaction; |
| |
| import org.apache.iotdb.commons.conf.IoTDBConstant; |
| import org.apache.iotdb.db.conf.IoTDBDescriptor; |
| import org.apache.iotdb.db.engine.modification.Modification; |
| import org.apache.iotdb.db.engine.modification.ModificationFile; |
| import org.apache.iotdb.db.engine.storagegroup.TsFileNameGenerator; |
| import org.apache.iotdb.db.engine.storagegroup.TsFileResource; |
| import org.apache.iotdb.db.query.control.FileReaderManager; |
| import org.apache.iotdb.tsfile.common.constant.TsFileConstant; |
| import org.apache.iotdb.tsfile.exception.write.WriteProcessException; |
| import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer; |
| |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import java.io.File; |
| import java.io.IOException; |
| import java.util.Collection; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Set; |
| |
| /** |
| * This tool can be used to perform inner space or cross space compaction of aligned and non aligned |
| * timeseries. |
| */ |
| public class CompactionUtils { |
| private static final Logger logger = |
| LoggerFactory.getLogger(IoTDBConstant.COMPACTION_LOGGER_NAME); |
| private static final int subTaskNum = |
| IoTDBDescriptor.getInstance().getConfig().getSubCompactionTaskNum(); |
| |
| /** |
| * Update the targetResource. Move tmp target file to target file and serialize |
| * xxx.tsfile.resource. |
| */ |
| public static void moveTargetFile( |
| List<TsFileResource> targetResources, boolean isInnerSpace, String fullStorageGroupName) |
| throws IOException, WriteProcessException { |
| String fileSuffix; |
| if (isInnerSpace) { |
| fileSuffix = IoTDBConstant.INNER_COMPACTION_TMP_FILE_SUFFIX; |
| } else { |
| fileSuffix = IoTDBConstant.CROSS_COMPACTION_TMP_FILE_SUFFIX; |
| } |
| for (TsFileResource targetResource : targetResources) { |
| moveOneTargetFile(targetResource, fileSuffix, fullStorageGroupName); |
| } |
| } |
| |
| private static void moveOneTargetFile( |
| TsFileResource targetResource, String tmpFileSuffix, String fullStorageGroupName) |
| throws IOException { |
| // move to target file and delete old tmp target file |
| if (!targetResource.getTsFile().exists()) { |
| logger.info( |
| "{} [Compaction] Tmp target tsfile {} may be deleted after compaction.", |
| fullStorageGroupName, |
| targetResource.getTsFilePath()); |
| return; |
| } |
| File newFile = |
| new File( |
| targetResource.getTsFilePath().replace(tmpFileSuffix, TsFileConstant.TSFILE_SUFFIX)); |
| if (!newFile.exists()) { |
| FSFactoryProducer.getFSFactory().moveFile(targetResource.getTsFile(), newFile); |
| } |
| |
| // serialize xxx.tsfile.resource |
| targetResource.setFile(newFile); |
| targetResource.serialize(); |
| targetResource.close(); |
| } |
| |
| /** |
| * Collect all the compaction modification files of source files, and combines them as the |
| * modification file of target file. |
| */ |
| public static void combineModsInCrossCompaction( |
| List<TsFileResource> seqResources, |
| List<TsFileResource> unseqResources, |
| List<TsFileResource> targetResources) |
| throws IOException { |
| // target file may less than source seq files, so we should find each target file with its |
| // corresponding source seq file. |
| Map<String, TsFileResource> seqFileInfoMap = new HashMap<>(); |
| for (TsFileResource tsFileResource : seqResources) { |
| seqFileInfoMap.put( |
| TsFileNameGenerator.increaseCrossCompactionCnt(tsFileResource.getTsFile()).getName(), |
| tsFileResource); |
| } |
| // update each target mods file. |
| for (TsFileResource targetResource : targetResources) { |
| TsFileResource seqFile = seqFileInfoMap.get(targetResource.getTsFile().getName()); |
| Set<Modification> modifications = new HashSet<>(); |
| if (seqFile != null) { |
| // get compaction mods from its corresponding source seq file |
| modifications.addAll(ModificationFile.getCompactionMods(seqFile).getModifications()); |
| } |
| // get compaction mods from all source unseq files |
| for (TsFileResource unseqFile : unseqResources) { |
| modifications.addAll(ModificationFile.getCompactionMods(unseqFile).getModifications()); |
| } |
| |
| updateOneTargetMods(targetResource, modifications); |
| } |
| } |
| |
| /** |
| * Collect all the compaction modification files of source files, and combines them as the |
| * modification file of target file. |
| */ |
| public static void combineModsInInnerCompaction( |
| Collection<TsFileResource> sourceFiles, TsFileResource targetTsFile) throws IOException { |
| Set<Modification> modifications = new HashSet<>(); |
| for (TsFileResource mergeTsFile : sourceFiles) { |
| try (ModificationFile sourceCompactionModificationFile = |
| ModificationFile.getCompactionMods(mergeTsFile)) { |
| modifications.addAll(sourceCompactionModificationFile.getModifications()); |
| } |
| } |
| updateOneTargetMods(targetTsFile, modifications); |
| } |
| |
| private static void updateOneTargetMods( |
| TsFileResource targetFile, Set<Modification> modifications) throws IOException { |
| if (!modifications.isEmpty()) { |
| try (ModificationFile modificationFile = ModificationFile.getNormalMods(targetFile)) { |
| for (Modification modification : modifications) { |
| // we have to set modification offset to MAX_VALUE, as the offset of source chunk may |
| // change after compaction |
| modification.setFileOffset(Long.MAX_VALUE); |
| modificationFile.write(modification); |
| } |
| } |
| } |
| } |
| |
| public static void deleteCompactionModsFile( |
| List<TsFileResource> selectedSeqTsFileResourceList, |
| List<TsFileResource> selectedUnSeqTsFileResourceList) |
| throws IOException { |
| for (TsFileResource seqFile : selectedSeqTsFileResourceList) { |
| ModificationFile modificationFile = seqFile.getCompactionModFile(); |
| if (modificationFile.exists()) { |
| modificationFile.remove(); |
| } |
| } |
| for (TsFileResource unseqFile : selectedUnSeqTsFileResourceList) { |
| ModificationFile modificationFile = unseqFile.getCompactionModFile(); |
| if (modificationFile.exists()) { |
| modificationFile.remove(); |
| } |
| } |
| } |
| |
| public static boolean deleteTsFilesInDisk( |
| Collection<TsFileResource> mergeTsFiles, String storageGroupName) { |
| logger.info("{} [Compaction] Compaction starts to delete real file ", storageGroupName); |
| boolean result = true; |
| for (TsFileResource mergeTsFile : mergeTsFiles) { |
| if (!deleteTsFile(mergeTsFile)) { |
| result = false; |
| } |
| logger.info( |
| "{} [Compaction] delete TsFile {}", storageGroupName, mergeTsFile.getTsFilePath()); |
| } |
| return result; |
| } |
| |
| public static boolean deleteTsFile(TsFileResource seqFile) { |
| try { |
| FileReaderManager.getInstance().closeFileAndRemoveReader(seqFile.getTsFilePath()); |
| seqFile.remove(); |
| } catch (IOException e) { |
| logger.error(e.getMessage(), e); |
| return false; |
| } |
| return true; |
| } |
| |
| /** Delete all modification files for source files */ |
| public static void deleteModificationForSourceFile( |
| Collection<TsFileResource> sourceFiles, String storageGroupName) throws IOException { |
| logger.info("{} [Compaction] Start to delete modifications of source files", storageGroupName); |
| for (TsFileResource tsFileResource : sourceFiles) { |
| ModificationFile compactionModificationFile = |
| ModificationFile.getCompactionMods(tsFileResource); |
| if (compactionModificationFile.exists()) { |
| compactionModificationFile.remove(); |
| } |
| |
| ModificationFile normalModification = ModificationFile.getNormalMods(tsFileResource); |
| if (normalModification.exists()) { |
| normalModification.remove(); |
| } |
| } |
| } |
| } |