blob: 89234fb9d3564dc0f42a5881e62eb0a070ad7168 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.db.engine.storagegroup.virtualSg;
import org.apache.iotdb.db.concurrent.ThreadName;
import org.apache.iotdb.db.engine.StorageEngine;
import org.apache.iotdb.db.engine.storagegroup.TsFileProcessor;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.db.engine.storagegroup.VirtualStorageGroupProcessor;
import org.apache.iotdb.db.engine.storagegroup.VirtualStorageGroupProcessor.TimePartitionFilter;
import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.exception.StorageGroupNotReadyException;
import org.apache.iotdb.db.exception.StorageGroupProcessorException;
import org.apache.iotdb.db.exception.TsFileProcessorException;
import org.apache.iotdb.db.metadata.mnode.IStorageGroupMNode;
import org.apache.iotdb.db.metadata.path.PartialPath;
import org.apache.iotdb.db.utils.ThreadUtils;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.tsfile.utils.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
/** Each storage group that set by users corresponds to a StorageGroupManager */
public class StorageGroupManager {
/** logger of this class */
private static final Logger logger = LoggerFactory.getLogger(StorageGroupManager.class);
/** virtual storage group partitioner */
VirtualPartitioner partitioner = HashVirtualPartitioner.getInstance();
/** all virtual storage group processor */
VirtualStorageGroupProcessor[] virtualStorageGroupProcessor;
/**
* recover status of each virtual storage group processor, null if this logical storage group is
* new created
*/
private AtomicBoolean[] isVsgReady;
/** number of ready virtual storage group processors */
private AtomicInteger readyVsgNum;
private AtomicBoolean isSettling = new AtomicBoolean();
/** value of root.stats."root.sg".TOTAL_POINTS */
private long monitorSeriesValue;
public StorageGroupManager() {
this(false);
}
public StorageGroupManager(boolean needRecovering) {
virtualStorageGroupProcessor =
new VirtualStorageGroupProcessor[partitioner.getPartitionCount()];
isVsgReady = new AtomicBoolean[partitioner.getPartitionCount()];
boolean recoverReady = !needRecovering;
for (int i = 0; i < partitioner.getPartitionCount(); i++) {
isVsgReady[i] = new AtomicBoolean(recoverReady);
}
}
/** push forceCloseAllWorkingTsFileProcessors down to all sg */
public void forceCloseAllWorkingTsFileProcessors() throws TsFileProcessorException {
for (VirtualStorageGroupProcessor virtualStorageGroupProcessor :
this.virtualStorageGroupProcessor) {
if (virtualStorageGroupProcessor != null) {
virtualStorageGroupProcessor.forceCloseAllWorkingTsFileProcessors();
}
}
}
/** push syncCloseAllWorkingTsFileProcessors down to all sg */
public void syncCloseAllWorkingTsFileProcessors() {
for (VirtualStorageGroupProcessor virtualStorageGroupProcessor :
this.virtualStorageGroupProcessor) {
if (virtualStorageGroupProcessor != null) {
virtualStorageGroupProcessor.syncCloseAllWorkingTsFileProcessors();
}
}
}
/** push check ttl down to all sg */
public void checkTTL() {
for (VirtualStorageGroupProcessor virtualStorageGroupProcessor :
this.virtualStorageGroupProcessor) {
if (virtualStorageGroupProcessor != null) {
virtualStorageGroupProcessor.checkFilesTTL();
}
}
}
/** push check sequence memtable flush interval down to all sg */
public void timedFlushSeqMemTable() {
for (VirtualStorageGroupProcessor virtualStorageGroupProcessor :
this.virtualStorageGroupProcessor) {
if (virtualStorageGroupProcessor != null) {
virtualStorageGroupProcessor.timedFlushSeqMemTable();
}
}
}
/** push check unsequence memtable flush interval down to all sg */
public void timedFlushUnseqMemTable() {
for (VirtualStorageGroupProcessor virtualStorageGroupProcessor :
this.virtualStorageGroupProcessor) {
if (virtualStorageGroupProcessor != null) {
virtualStorageGroupProcessor.timedFlushUnseqMemTable();
}
}
}
/** push check TsFileProcessor close interval down to all sg */
public void timedCloseTsFileProcessor() {
for (VirtualStorageGroupProcessor virtualStorageGroupProcessor :
this.virtualStorageGroupProcessor) {
if (virtualStorageGroupProcessor != null) {
virtualStorageGroupProcessor.timedCloseTsFileProcessor();
}
}
}
/**
* get processor from device id
*
* @param partialPath device path
* @return virtual storage group processor
*/
@SuppressWarnings("java:S2445")
// actually storageGroupMNode is a unique object on the mtree, synchronize it is reasonable
public VirtualStorageGroupProcessor getProcessor(
PartialPath partialPath, IStorageGroupMNode storageGroupMNode)
throws StorageGroupProcessorException, StorageEngineException {
int loc = partitioner.deviceToVirtualStorageGroupId(partialPath);
VirtualStorageGroupProcessor processor = virtualStorageGroupProcessor[loc];
if (processor == null) {
// if finish recover
if (isVsgReady[loc].get()) {
synchronized (storageGroupMNode) {
processor = virtualStorageGroupProcessor[loc];
if (processor == null) {
processor =
StorageEngine.getInstance()
.buildNewStorageGroupProcessor(
storageGroupMNode.getPartialPath(), storageGroupMNode, String.valueOf(loc));
virtualStorageGroupProcessor[loc] = processor;
}
}
} else {
// not finished recover, refuse the request
throw new StorageGroupNotReadyException(
storageGroupMNode.getFullPath(), TSStatusCode.STORAGE_GROUP_NOT_READY.getStatusCode());
}
}
return processor;
}
/**
* get processor from virtualStorageGroupId
*
* @param virtualStorageGroupId virtual storage group id
* @return virtual storage group processor
*/
@SuppressWarnings("java:S2445")
// actually storageGroupMNode is a unique object on the mtree, synchronize it is reasonable
public VirtualStorageGroupProcessor getProcessor(
int virtualStorageGroupId, IStorageGroupMNode storageGroupMNode)
throws StorageGroupProcessorException, StorageEngineException {
VirtualStorageGroupProcessor processor = virtualStorageGroupProcessor[virtualStorageGroupId];
if (processor == null) {
// if finish recover
if (isVsgReady[virtualStorageGroupId].get()) {
synchronized (storageGroupMNode) {
processor = virtualStorageGroupProcessor[virtualStorageGroupId];
if (processor == null) {
processor =
StorageEngine.getInstance()
.buildNewStorageGroupProcessor(
storageGroupMNode.getPartialPath(),
storageGroupMNode,
String.valueOf(virtualStorageGroupId));
virtualStorageGroupProcessor[virtualStorageGroupId] = processor;
}
}
} else {
// not finished recover, refuse the request
throw new StorageGroupNotReadyException(
storageGroupMNode.getFullPath(), TSStatusCode.STORAGE_GROUP_NOT_READY.getStatusCode());
}
}
return processor;
}
/**
* async recover all virtual storage groups in this logical storage group
*
* @param storageGroupMNode logical sg mnode
* @param pool thread pool to run virtual storage group recover task
* @param futures virtual storage group recover tasks
*/
public void asyncRecover(
IStorageGroupMNode storageGroupMNode, ExecutorService pool, List<Future<Void>> futures) {
readyVsgNum = new AtomicInteger(0);
for (int i = 0; i < partitioner.getPartitionCount(); i++) {
int cur = i;
Callable<Void> recoverVsgTask =
() -> {
isVsgReady[cur].set(false);
VirtualStorageGroupProcessor processor = null;
try {
processor =
StorageEngine.getInstance()
.buildNewStorageGroupProcessor(
storageGroupMNode.getPartialPath(),
storageGroupMNode,
String.valueOf(cur));
} catch (StorageGroupProcessorException e) {
logger.error(
"Failed to recover virtual storage group {}[{}]",
storageGroupMNode.getFullPath(),
cur,
e);
}
virtualStorageGroupProcessor[cur] = processor;
isVsgReady[cur].set(true);
logger.info(
"Storage Group {} has been recovered {}/{}",
storageGroupMNode.getFullPath(),
readyVsgNum.incrementAndGet(),
partitioner.getPartitionCount());
return null;
};
futures.add(pool.submit(recoverVsgTask));
}
}
public long getMonitorSeriesValue() {
return monitorSeriesValue;
}
public void setMonitorSeriesValue(long monitorSeriesValue) {
this.monitorSeriesValue = monitorSeriesValue;
}
public void updateMonitorSeriesValue(int successPointsNum) {
this.monitorSeriesValue += successPointsNum;
}
/** push closeStorageGroupProcessor operation down to all virtual storage group processors */
public void closeStorageGroupProcessor(boolean isSeq, boolean isSync) {
for (VirtualStorageGroupProcessor processor : virtualStorageGroupProcessor) {
if (processor == null) {
continue;
}
if (logger.isInfoEnabled()) {
logger.info(
"{} closing sg processor is called for closing {}, seq = {}",
isSync ? "sync" : "async",
processor.getVirtualStorageGroupId() + "-" + processor.getLogicalStorageGroupName(),
isSeq);
}
processor.writeLock("VirtualCloseStorageGroupProcessor-204");
try {
if (isSeq) {
// to avoid concurrent modification problem, we need a new array list
for (TsFileProcessor tsfileProcessor :
new ArrayList<>(processor.getWorkSequenceTsFileProcessors())) {
if (isSync) {
processor.syncCloseOneTsFileProcessor(true, tsfileProcessor);
} else {
processor.asyncCloseOneTsFileProcessor(true, tsfileProcessor);
}
}
} else {
// to avoid concurrent modification problem, we need a new array list
for (TsFileProcessor tsfileProcessor :
new ArrayList<>(processor.getWorkUnsequenceTsFileProcessors())) {
if (isSync) {
processor.syncCloseOneTsFileProcessor(false, tsfileProcessor);
} else {
processor.asyncCloseOneTsFileProcessor(false, tsfileProcessor);
}
}
}
} finally {
processor.writeUnlock();
}
}
}
/** push closeStorageGroupProcessor operation down to all virtual storage group processors */
public void closeStorageGroupProcessor(long partitionId, boolean isSeq, boolean isSync) {
for (VirtualStorageGroupProcessor processor : virtualStorageGroupProcessor) {
if (processor != null) {
logger.info(
"async closing sg processor is called for closing {}, seq = {}, partitionId = {}",
processor.getVirtualStorageGroupId() + "-" + processor.getLogicalStorageGroupName(),
isSeq,
partitionId);
processor.writeLock("VirtualCloseStorageGroupProcessor-242");
try {
// to avoid concurrent modification problem, we need a new array list
List<TsFileProcessor> processors =
isSeq
? new ArrayList<>(processor.getWorkSequenceTsFileProcessors())
: new ArrayList<>(processor.getWorkUnsequenceTsFileProcessors());
for (TsFileProcessor tsfileProcessor : processors) {
if (tsfileProcessor.getTimeRangeId() == partitionId) {
if (isSync) {
processor.syncCloseOneTsFileProcessor(isSeq, tsfileProcessor);
} else {
processor.asyncCloseOneTsFileProcessor(isSeq, tsfileProcessor);
}
break;
}
}
} finally {
processor.writeUnlock();
}
}
}
}
/** push delete operation down to all virtual storage group processors */
public void delete(
PartialPath path,
long startTime,
long endTime,
long planIndex,
TimePartitionFilter timePartitionFilter)
throws IOException {
for (VirtualStorageGroupProcessor virtualStorageGroupProcessor :
this.virtualStorageGroupProcessor) {
if (virtualStorageGroupProcessor != null) {
virtualStorageGroupProcessor.delete(
path, startTime, endTime, planIndex, timePartitionFilter);
}
}
}
/** push countUpgradeFiles operation down to all virtual storage group processors */
public int countUpgradeFiles() {
int totalUpgradeFileNum = 0;
for (VirtualStorageGroupProcessor virtualStorageGroupProcessor :
this.virtualStorageGroupProcessor) {
if (virtualStorageGroupProcessor != null) {
totalUpgradeFileNum += virtualStorageGroupProcessor.countUpgradeFiles();
}
}
return totalUpgradeFileNum;
}
/** push upgradeAll operation down to all virtual storage group processors */
public void upgradeAll() {
for (VirtualStorageGroupProcessor virtualStorageGroupProcessor :
this.virtualStorageGroupProcessor) {
if (virtualStorageGroupProcessor != null) {
virtualStorageGroupProcessor.upgrade();
}
}
}
public void getResourcesToBeSettled(
List<TsFileResource> seqResourcesToBeSettled,
List<TsFileResource> unseqResourcesToBeSettled,
List<String> tsFilePaths) {
for (VirtualStorageGroupProcessor virtualStorageGroupProcessor :
this.virtualStorageGroupProcessor) {
if (virtualStorageGroupProcessor != null) {
virtualStorageGroupProcessor.addSettleFilesToList(
seqResourcesToBeSettled, unseqResourcesToBeSettled, tsFilePaths);
}
}
}
/** push mergeAll operation down to all virtual storage group processors */
public void mergeAll() {
for (VirtualStorageGroupProcessor virtualStorageGroupProcessor :
this.virtualStorageGroupProcessor) {
if (virtualStorageGroupProcessor != null) {
virtualStorageGroupProcessor.compact();
}
}
}
/** push syncDeleteDataFiles operation down to all virtual storage group processors */
public void syncDeleteDataFiles() {
for (VirtualStorageGroupProcessor virtualStorageGroupProcessor :
this.virtualStorageGroupProcessor) {
if (virtualStorageGroupProcessor != null) {
virtualStorageGroupProcessor.syncDeleteDataFiles();
}
}
}
/** push setTTL operation down to all virtual storage group processors */
public void setTTL(long dataTTL) {
for (VirtualStorageGroupProcessor virtualStorageGroupProcessor :
this.virtualStorageGroupProcessor) {
if (virtualStorageGroupProcessor != null) {
virtualStorageGroupProcessor.setDataTTL(dataTTL);
}
}
}
/** push deleteStorageGroup operation down to all virtual storage group processors */
public void deleteStorageGroupSystemFolder(String path) {
for (VirtualStorageGroupProcessor processor : virtualStorageGroupProcessor) {
if (processor != null) {
processor.deleteFolder(path);
}
}
}
/** push getAllClosedStorageGroupTsFile operation down to all virtual storage group processors */
public void getAllClosedStorageGroupTsFile(
PartialPath storageGroupName, Map<PartialPath, Map<Long, List<TsFileResource>>> ret) {
for (VirtualStorageGroupProcessor virtualStorageGroupProcessor :
this.virtualStorageGroupProcessor) {
if (virtualStorageGroupProcessor != null) {
List<TsFileResource> allResources = virtualStorageGroupProcessor.getSequenceFileTreeSet();
allResources.addAll(virtualStorageGroupProcessor.getUnSequenceFileList());
for (TsFileResource tsfile : allResources) {
if (!tsfile.isClosed()) {
continue;
}
long partitionNum = tsfile.getTimePartition();
Map<Long, List<TsFileResource>> storageGroupFiles =
ret.computeIfAbsent(storageGroupName, n -> new HashMap<>());
storageGroupFiles.computeIfAbsent(partitionNum, n -> new ArrayList<>()).add(tsfile);
}
}
}
}
/** push setPartitionVersionToMax operation down to all virtual storage group processors */
public void setPartitionVersionToMax(long partitionId, long newMaxVersion) {
for (VirtualStorageGroupProcessor virtualStorageGroupProcessor :
this.virtualStorageGroupProcessor) {
if (virtualStorageGroupProcessor != null) {
virtualStorageGroupProcessor.setPartitionFileVersionToMax(partitionId, newMaxVersion);
}
}
}
/** push removePartitions operation down to all virtual storage group processors */
public void removePartitions(TimePartitionFilter filter) {
for (VirtualStorageGroupProcessor virtualStorageGroupProcessor :
this.virtualStorageGroupProcessor) {
if (virtualStorageGroupProcessor != null) {
virtualStorageGroupProcessor.removePartitions(filter);
}
}
}
/**
* push getWorkingStorageGroupPartitions operation down to all virtual storage group processors
*/
public void getWorkingStorageGroupPartitions(
String storageGroupName, Map<String, List<Pair<Long, Boolean>>> res) {
for (VirtualStorageGroupProcessor virtualStorageGroupProcessor :
this.virtualStorageGroupProcessor) {
if (virtualStorageGroupProcessor != null) {
List<Pair<Long, Boolean>> partitionIdList = new ArrayList<>();
for (TsFileProcessor tsFileProcessor :
virtualStorageGroupProcessor.getWorkSequenceTsFileProcessors()) {
Pair<Long, Boolean> tmpPair = new Pair<>(tsFileProcessor.getTimeRangeId(), true);
partitionIdList.add(tmpPair);
}
for (TsFileProcessor tsFileProcessor :
virtualStorageGroupProcessor.getWorkUnsequenceTsFileProcessors()) {
Pair<Long, Boolean> tmpPair = new Pair<>(tsFileProcessor.getTimeRangeId(), false);
partitionIdList.add(tmpPair);
}
res.put(storageGroupName, partitionIdList);
}
}
}
/** release resource of direct wal buffer */
public void releaseWalDirectByteBufferPool() {
for (VirtualStorageGroupProcessor virtualStorageGroupProcessor :
this.virtualStorageGroupProcessor) {
if (virtualStorageGroupProcessor != null) {
virtualStorageGroupProcessor.releaseWalDirectByteBufferPool();
}
}
}
/** only for test */
public void reset() {
Arrays.fill(virtualStorageGroupProcessor, null);
}
public void stopSchedulerPool() {
for (VirtualStorageGroupProcessor vsg : this.virtualStorageGroupProcessor) {
if (vsg != null) {
ThreadUtils.stopThreadPool(
vsg.getTimedCompactionScheduleTask(), ThreadName.COMPACTION_SCHEDULE);
ThreadUtils.stopThreadPool(vsg.getWALTrimScheduleTask(), ThreadName.WAL_TRIM);
}
}
}
public void setSettling(boolean settling) {
isSettling.set(settling);
}
public AtomicBoolean getIsSettling() {
return isSettling;
}
}