// blob: 3adcf93621cd6f4dd2a7bdda607fc1784bc6dc06 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.db.storageengine.dataregion.compaction.settle;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.common.rpc.thrift.TSettleReq;
import org.apache.iotdb.commons.consensus.DataRegionId;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.storageengine.StorageEngine;
import org.apache.iotdb.db.storageengine.dataregion.DataRegion;
import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer.ICompactionPerformer;
import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.task.AbstractCompactionTask;
import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.task.InnerSpaceCompactionTask;
import org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.CompactionScheduler;
import org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.CompactionTaskManager;
import org.apache.iotdb.db.storageengine.dataregion.modification.ModificationFile;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileManager;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResourceList;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResourceStatus;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.generator.TsFileNameGenerator;
import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.tsfile.utils.TsFileUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
@SuppressWarnings("squid:S6548")
public class SettleRequestHandler {
/** Logger used by this handler and by its nested helper classes. */
private static final Logger logger = LoggerFactory.getLogger(SettleRequestHandler.class);
/** Private constructor: the only instance is the one exposed through {@link #getInstance()}. */
private SettleRequestHandler() {}
/**
 * When {@code true}, {@link #handleSettleRequest(TSettleReq)} returns success right after a
 * successful validation and does not submit any compaction task.
 */
private boolean testMode = false;
/**
 * Reports whether the handler is currently in test mode.
 *
 * @return the current value of the test-mode flag
 */
public boolean isTestMode() {
  return this.testMode;
}
/**
 * Switches test mode on or off.
 *
 * @param testMode the new value of the test-mode flag
 */
public void setTestMode(boolean testMode) {
this.testMode = testMode;
}
/**
 * Gives access to the single shared handler.
 *
 * @return the instance held by {@code SettleRequestHandlerHolder}
 */
public static SettleRequestHandler getInstance() {
  final SettleRequestHandler singleton = SettleRequestHandlerHolder.INSTANCE;
  return singleton;
}
/**
 * Validates a settle request and, when it is acceptable, queues a compaction task for the
 * requested TsFiles.
 *
 * <p>Validation and submission both run after {@code
 * CompactionScheduler.exclusiveLockCompactionSelection()} has been called; the matching unlock is
 * issued in a {@code finally} block so it happens on every exit path.
 *
 * @param req the settle request carrying the paths of the TsFiles to settle
 * @return the failing validation status if validation fails; a success status without submitting
 *     anything when test mode is on; otherwise the status produced by the task submission
 */
public TSStatus handleSettleRequest(TSettleReq req) {
  final List<String> requestedPaths = req.getPaths();
  CompactionScheduler.exclusiveLockCompactionSelection();
  try {
    final SettleRequestContext settleContext = new SettleRequestContext(requestedPaths);
    final TSStatus validationStatus = settleContext.validate();
    final boolean validationPassed =
        validationStatus.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode();
    if (!validationPassed) {
      return validationStatus;
    }
    if (testMode) {
      // Test mode only exercises the validation path.
      return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
    }
    return settleContext.submitCompactionTask(settleContext.getTsFileResourcesByFileNames());
  } finally {
    CompactionScheduler.exclusiveUnlockCompactionSelection();
  }
}
private static class SettleRequestContext {
// Info derived from the first requested file; every requested file is checked against it.
private ConsistentSettleInfo targetConsistentSettleInfo;
// Set when at least one requested file is reported as sequence by TsFileUtils.isSequence.
private boolean hasSeqFiles;
// Set when at least one requested file is reported as non-sequence by TsFileUtils.isSequence.
private boolean hasUnseqFiles;
// Set when at least one requested path has a companion file named path + ModificationFile.FILE_SUFFIX.
private boolean hasModsFiles;
// The file paths exactly as supplied by the request (may be null or empty until validated).
private List<String> paths;
// File names (without directories) of the requested files that passed the per-file checks.
private Set<String> tsFileNames;
// The sequence or unsequence resource list of the target time partition, chosen in validate().
private TsFileResourceList allTsFileResourceList;
// TsFileManager of the target DataRegion, resolved in validate().
private TsFileManager tsFileManager;
// Configuration obtained from IoTDBDescriptor when the context is created.
private IoTDBConfig config;
/**
 * Creates a context for one settle request.
 *
 * @param paths the file paths carried by the request; stored as-is and checked later by
 *     {@code validate()}
 */
private SettleRequestContext(List<String> paths) {
  this.config = IoTDBDescriptor.getInstance().getConfig();
  this.tsFileNames = new HashSet<>();
  this.paths = paths;
}
/**
 * Checks whether the requested files can be settled together and prepares the state needed to
 * submit the task.
 *
 * <p>The checks run in this order and the first failure is returned:
 *
 * <ol>
 *   <li>the path list is present, non-empty and not larger than {@code
 *       config.getFileLimitPerInnerTask()};
 *   <li>for each path: the file exists, its file name can be parsed, it agrees with the first
 *       file on data region, storage group, time partition and level, and sequence and
 *       non-sequence files are not mixed;
 *   <li>at least one requested file has a companion mods file;
 *   <li>the target DataRegion is known to the StorageEngine;
 *   <li>the compaction switches checked by {@code checkCompactionConfigs()} allow the task;
 *   <li>the files pass {@code validateTsFileResources()}.
 * </ol>
 *
 * <p>As a side effect this method fills {@code tsFileNames}, the sequence/unsequence/mods flags,
 * {@code targetConsistentSettleInfo}, {@code tsFileManager} and {@code allTsFileResourceList}.
 *
 * @return a success status when every check passes, otherwise the status describing the first
 *     failed check
 */
@SuppressWarnings("squid:S3776")
private TSStatus validate() {
  if (paths == null || paths.isEmpty()) {
    return RpcUtils.getStatus(
        TSStatusCode.ILLEGAL_PARAMETER, "The files to settle is not offered.");
  }
  int maxInnerCompactionCandidateFileNum = config.getFileLimitPerInnerTask();
  if (paths.size() > maxInnerCompactionCandidateFileNum) {
    // Report paths.size(): tsFileNames is only filled by the loop below, so it is still empty
    // here and would always be reported as 0.
    return RpcUtils.getStatus(
        TSStatusCode.UNSUPPORTED_OPERATION,
        "Too many files offered, the limited count of system config is "
            + maxInnerCompactionCandidateFileNum
            + ", the input file count is "
            + paths.size());
  }
  TSStatus validationResult;
  for (String path : paths) {
    File currentTsFile = new File(path);
    if (!currentTsFile.exists()) {
      return RpcUtils.getStatus(
          TSStatusCode.PATH_NOT_EXIST, "The specified file does not exist in " + path);
    }
    // One mods file among all requested files is enough; the flag is checked after the loop.
    File modsFile = new File(path + ModificationFile.FILE_SUFFIX);
    hasModsFiles |= modsFile.exists();
    ConsistentSettleInfo currentInfo = calculateConsistentInfo(currentTsFile);
    if (!currentInfo.isValid) {
      return RpcUtils.getStatus(
          TSStatusCode.ILLEGAL_PATH, "The File Name of the TsFile is not valid: " + path);
    }
    // The first valid file defines the reference every other file is compared with.
    if (this.targetConsistentSettleInfo == null) {
      this.targetConsistentSettleInfo = currentInfo;
    }
    validationResult = targetConsistentSettleInfo.checkConsistency(currentInfo);
    if (!isSuccess(validationResult)) {
      return validationResult;
    }
    if (TsFileUtils.isSequence(currentTsFile)) {
      hasSeqFiles = true;
    } else {
      hasUnseqFiles = true;
    }
    tsFileNames.add(currentTsFile.getName());
    if (hasSeqFiles && hasUnseqFiles) {
      return RpcUtils.getStatus(
          TSStatusCode.UNSUPPORTED_OPERATION, "Settle by cross compaction is not allowed.");
    }
  }
  if (!hasModsFiles) {
    return RpcUtils.getStatus(
        TSStatusCode.ILLEGAL_PARAMETER,
        "Every selected TsFile does not contains the mods file.");
  }
  DataRegion dataRegion =
      StorageEngine.getInstance()
          .getDataRegion(new DataRegionId(targetConsistentSettleInfo.dataRegionId));
  if (dataRegion == null) {
    return RpcUtils.getStatus(TSStatusCode.ILLEGAL_PATH, "DataRegion not exist");
  }
  tsFileManager = dataRegion.getTsFileManager();
  validationResult = checkCompactionConfigs();
  if (!isSuccess(validationResult)) {
    return validationResult;
  }
  // Exactly one of hasSeqFiles / hasUnseqFiles is set at this point (mixing was rejected above).
  if (hasSeqFiles) {
    allTsFileResourceList =
        tsFileManager.getOrCreateSequenceListByTimePartition(
            targetConsistentSettleInfo.timePartitionId);
  } else {
    allTsFileResourceList =
        tsFileManager.getOrCreateUnsequenceListByTimePartition(
            targetConsistentSettleInfo.timePartitionId);
  }
  return validateTsFileResources();
}
/**
 * Collects the attributes of one TsFile that must agree across all files of a settle request.
 *
 * <p>Data region, storage group and time partition are obtained from {@code TsFileUtils}; the
 * level is the inner compaction count parsed from the file name. When the file name cannot be
 * parsed ({@code IOException} from {@code TsFileNameGenerator.getTsFileName}), the result is
 * flagged invalid and its level is left unset.
 *
 * @param tsFile the TsFile to inspect
 * @return the collected info, with {@code isValid} telling whether the file name was parseable
 */
private ConsistentSettleInfo calculateConsistentInfo(File tsFile) {
  ConsistentSettleInfo info = new ConsistentSettleInfo();
  info.dataRegionId = TsFileUtils.getDataRegionId(tsFile);
  info.storageGroupName = TsFileUtils.getStorageGroup(tsFile);
  info.timePartitionId = TsFileUtils.getTimePartition(tsFile);
  try {
    info.level = TsFileNameGenerator.getTsFileName(tsFile.getName()).getInnerCompactionCnt();
    info.isValid = true;
  } catch (IOException e) {
    info.isValid = false;
  }
  return info;
}
/**
 * Tells whether a status carries the success code.
 *
 * @param status the status to inspect
 * @return {@code true} if its code equals that of {@code TSStatusCode.SUCCESS_STATUS}
 */
private boolean isSuccess(TSStatus status) {
  return TSStatusCode.SUCCESS_STATUS.getStatusCode() == status.getCode();
}
/**
 * Checks the compaction switches relevant to this request.
 *
 * <p>The request is rejected when the TsFileManager does not allow compaction, or when the
 * space the requested files belong to (sequence or unsequence) has its compaction disabled in
 * the configuration.
 *
 * @return a success status, or an {@code UNSUPPORTED_OPERATION} status naming the first switch
 *     that forbids the task
 */
private TSStatus checkCompactionConfigs() {
  String rejection = null;
  if (!tsFileManager.isAllowCompaction()) {
    rejection = "Compaction in this DataRegion is not allowed.";
  } else if (hasSeqFiles && !config.isEnableSeqSpaceCompaction()) {
    rejection = "Compaction in Seq Space is not enabled";
  } else if (hasUnseqFiles && !config.isEnableUnseqSpaceCompaction()) {
    rejection = "Compaction in Unseq Space is not enabled";
  }
  return rejection == null
      ? RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS)
      : RpcUtils.getStatus(TSStatusCode.UNSUPPORTED_OPERATION, rejection);
}
/**
 * Checks the requested files against the resource list selected for the target time partition.
 *
 * <p>The list is scanned in iteration order. Once a requested file has been seen, the scan
 * stops at the first resource that was not requested; the number of requested files seen up to
 * that point must equal the number of requested file names. Every requested resource met during
 * the scan must have status {@code NORMAL}.
 *
 * @return a success status, or an {@code ILLEGAL_PARAMETER} status when a requested resource is
 *     not {@code NORMAL} or the requested files do not form one uninterrupted run in the list
 */
private TSStatus validateTsFileResources() {
  int matchedInARow = 0;
  for (TsFileResource candidate : allTsFileResourceList) {
    File candidateFile = candidate.getTsFile();
    boolean requested = tsFileNames.contains(candidateFile.getName());
    if (!requested) {
      if (matchedInARow > 0) {
        // The run of requested files has ended.
        break;
      }
      continue;
    }
    if (candidate.getStatus() != TsFileResourceStatus.NORMAL) {
      return RpcUtils.getStatus(
          TSStatusCode.ILLEGAL_PARAMETER,
          "The TsFile is not valid: " + candidateFile.getAbsolutePath());
    }
    matchedInARow++;
  }
  return matchedInARow == tsFileNames.size()
      ? RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS)
      : RpcUtils.getStatus(
          TSStatusCode.ILLEGAL_PARAMETER, "Selected TsFiles are not continuous.");
}
/**
 * Picks, in list order, the resources whose file names were requested.
 *
 * <p>The scan over {@code allTsFileResourceList} ends as soon as the number of collected
 * resources equals the number of requested file names.
 *
 * @return the matching resources, in the order they appear in the resource list
 */
private List<TsFileResource> getTsFileResourcesByFileNames() {
  final int wanted = tsFileNames.size();
  List<TsFileResource> matches = new ArrayList<>(wanted);
  for (TsFileResource candidate : allTsFileResourceList) {
    String candidateName = candidate.getTsFile().getName();
    if (tsFileNames.contains(candidateName)) {
      matches.add(candidate);
    }
    if (matches.size() == wanted) {
      break;
    }
  }
  return matches;
}
/**
 * Builds an {@code InnerSpaceCompactionTask} for the given resources and hands it to the
 * {@code CompactionTaskManager} waiting queue.
 *
 * <p>The performer is created from the sequence or unsequence inner-compaction performer
 * configured in {@code config}, depending on {@code hasSeqFiles}.
 *
 * @param tsFileResources the resources the task should compact
 * @return a success status once the queueing call returns; a {@code COMPACTION_ERROR} status if
 *     the thread is interrupted while queueing (the interrupt flag is restored in that case)
 */
private TSStatus submitCompactionTask(List<TsFileResource> tsFileResources) {
  ICompactionPerformer performer;
  if (hasSeqFiles) {
    performer = config.getInnerSeqCompactionPerformer().createInstance();
  } else {
    performer = config.getInnerUnseqCompactionPerformer().createInstance();
  }
  AbstractCompactionTask task =
      new InnerSpaceCompactionTask(
          targetConsistentSettleInfo.timePartitionId,
          tsFileManager,
          tsFileResources,
          hasSeqFiles,
          performer,
          tsFileManager.getNextCompactionTaskId());
  try {
    CompactionTaskManager.getInstance().addTaskToWaitingQueue(task);
    return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
  } catch (InterruptedException e) {
    logger.error(
        "meet error when adding task-{} to compaction waiting queue: {}",
        task.getSerialId(),
        e.getMessage());
    // Restore the interrupt flag so callers further up can observe the interruption.
    Thread.currentThread().interrupt();
    return RpcUtils.getStatus(
        TSStatusCode.COMPACTION_ERROR, "meet error when submit settle task.");
  }
}
}
/**
 * Attributes of a TsFile that have to be identical for all files of one settle request, plus a
 * flag telling whether the file name could be parsed.
 */
private static class ConsistentSettleInfo {
  private int dataRegionId;
  private int level;
  private String storageGroupName;
  private long timePartitionId;
  private boolean isValid;

  /**
   * Compares this info with another one, attribute by attribute, in the order data region,
   * storage group, time partition, level.
   *
   * @param other the info to compare with
   * @return a success status when all four attributes match; otherwise an {@code ILLEGAL_PATH}
   *     status (data region, storage group, time partition) or an {@code ILLEGAL_PARAMETER}
   *     status (level) naming the first attribute that differs
   */
  private TSStatus checkConsistency(ConsistentSettleInfo other) {
    TSStatusCode failureCode = TSStatusCode.ILLEGAL_PATH;
    String failure;
    if (dataRegionId != other.dataRegionId) {
      failure = "DataRegion of files is not consistent.";
    } else if (!storageGroupName.equals(other.storageGroupName)) {
      failure = "StorageGroup of files is not consistent.";
    } else if (timePartitionId != other.timePartitionId) {
      failure = "TimePartition of files is not consistent.";
    } else if (level != other.level) {
      failureCode = TSStatusCode.ILLEGAL_PARAMETER;
      failure = "Level of files is not consistent.";
    } else {
      return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
    }
    return RpcUtils.getStatus(failureCode, failure);
  }
}
/**
 * Holder of the single {@link SettleRequestHandler} instance. The instance is created when this
 * nested class is initialized, i.e. the first time {@code INSTANCE} is read.
 */
private static class SettleRequestHandlerHolder {
private static final SettleRequestHandler INSTANCE = new SettleRequestHandler();
}
}