blob: 358d37f7b8bc26bd67c3c897fc4eb48436287515 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.db.metadata.schemaregion;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil;
import org.apache.iotdb.commons.consensus.SchemaRegionId;
import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.exception.MetadataException;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.commons.utils.FileUtils;
import org.apache.iotdb.consensus.ConsensusFactory;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.exception.metadata.StorageGroupAlreadySetException;
import org.apache.iotdb.db.exception.metadata.StorageGroupNotSetException;
import org.apache.iotdb.db.metadata.mnode.IStorageGroupMNode;
import org.apache.iotdb.db.metadata.mtree.ConfigMTree;
import org.apache.iotdb.db.metadata.rescon.SchemaResourceManager;
import org.apache.iotdb.db.metadata.visitor.SchemaExecutionVisitor;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
import org.apache.iotdb.external.api.ISeriesNumerLimiter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
// manage all the schemaRegion in this dataNode
/**
 * Singleton that manages the lifecycle of every {@link ISchemaRegion} hosted on this DataNode:
 * recovery at startup, creation, periodic mlog forcing, and deletion.
 */
public class SchemaEngine {
private static final Logger logger = LoggerFactory.getLogger(SchemaEngine.class);
private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
// upper (storage-group level) tree shared by all schema regions on this node
private ConfigMTree sharedPrefixTree;
// schemaRegionId -> region instance; created in initForLocalConfigNode, nulled in clear()
private Map<SchemaRegionId, ISchemaRegion> schemaRegionMap;
// engine mode parsed from config (Memory / Schema_File / Rocksdb_based)
private SchemaEngineMode schemaRegionStoredMode;
// periodically forces mlog to disk; null when the feature is disabled
private ScheduledExecutorService timedForceMLogThread;
// default limiter: a no-op implementation that never restricts the series count;
// can be replaced via setSeriesNumerLimiter
private ISeriesNumerLimiter seriesNumerLimiter =
new ISeriesNumerLimiter() {
@Override
public void init(Properties properties) {}
@Override
public boolean addTimeSeries(int number) {
// always return true, don't limit the number of series
return true;
}
@Override
public void deleteTimeSeries(int number) {
// do nothing
}
};
/**
 * Executes a schema write plan against the schema region identified by the given id.
 *
 * @param schemaRegionId target schema region
 * @param planNode schema write plan to apply
 * @return execution status produced by {@link SchemaExecutionVisitor}
 */
public TSStatus write(SchemaRegionId schemaRegionId, PlanNode planNode) {
  ISchemaRegion targetRegion = schemaRegionMap.get(schemaRegionId);
  return planNode.accept(new SchemaExecutionVisitor(), targetRegion);
}
// Initialization-on-demand holder idiom: the JVM class loader guarantees INSTANCE is
// created lazily and thread-safely on first access of getInstance().
private static class SchemaEngineManagerHolder {
private static final SchemaEngine INSTANCE = new SchemaEngine();
// non-instantiable holder
private SchemaEngineManagerHolder() {}
}
// private: instances are obtained exclusively through getInstance()
private SchemaEngine() {}
/** @return the process-wide SchemaEngine singleton */
public static SchemaEngine getInstance() {
return SchemaEngineManagerHolder.INSTANCE;
}
/**
 * Initializes the SchemaEngine for this node. Initialization failures are logged (with full
 * stack trace) rather than propagated to the caller.
 */
public void init() {
  try {
    initForLocalConfigNode();
  } catch (MetadataException e) {
    // the logger records the stack trace; no redundant printStackTrace() to stderr
    logger.error("Error occurred during SchemaEngine initialization.", e);
  }
}
/**
 * Initializes schema resources, recovers all schema regions from disk and, unless the cluster
 * uses ratis consensus for schema regions or the mlog sync period is 0, starts the periodic
 * mlog-forcing thread.
 *
 * @return storage group path -> ids of the schema regions recovered under it
 * @throws MetadataException if schema region recovery fails
 */
public Map<PartialPath, List<SchemaRegionId>> initForLocalConfigNode() throws MetadataException {
  schemaRegionStoredMode = SchemaEngineMode.valueOf(config.getSchemaEngineMode());
  logger.info("used schema engine mode: {}.", schemaRegionStoredMode);
  SchemaResourceManager.initSchemaResource();
  schemaRegionMap = new ConcurrentHashMap<>();
  sharedPrefixTree = new ConfigMTree();
  Map<PartialPath, List<SchemaRegionId>> schemaRegionInfo = initSchemaRegion();
  // ratis persists its own log, so the timed mlog force is only needed otherwise
  boolean ratisCluster =
      config.isClusterMode()
          && config
              .getSchemaRegionConsensusProtocolClass()
              .equals(ConsensusFactory.RatisConsensus);
  long syncPeriodMs = config.getSyncMlogPeriodInMs();
  if (!ratisCluster && syncPeriodMs != 0) {
    timedForceMLogThread =
        IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor(
            "SchemaEngine-TimedForceMLog-Thread");
    ScheduledExecutorUtil.unsafelyScheduleAtFixedRate(
        timedForceMLogThread, this::forceMlog, syncPeriodMs, syncPeriodMs, TimeUnit.MILLISECONDS);
  }
  return schemaRegionInfo;
}
/**
 * Scan the storage group and schema region directories to recover schema regions and return the
 * collected local schema partition info for localSchemaPartitionTable recovery.
 *
 * <p>Regions are recovered concurrently on a temporary fixed-size pool; a region that fails to
 * recover is logged and skipped, the others still come up.
 *
 * @return storage group path -> schema region ids found under its directory
 * @throws MetadataException propagated from recovery preparation
 */
private Map<PartialPath, List<SchemaRegionId>> initSchemaRegion() throws MetadataException {
  Map<PartialPath, List<SchemaRegionId>> partitionTable = new HashMap<>();
  File schemaDir = new File(config.getSchemaDir());
  File[] sgDirList = schemaDir.listFiles();
  if (sgDirList == null) {
    // schema dir missing or unreadable: nothing to recover
    return partitionTable;
  }
  // recover SchemaRegion concurrently
  ExecutorService schemaRegionRecoverPools =
      IoTDBThreadPoolFactory.newFixedThreadPool(
          Runtime.getRuntime().availableProcessors(), "SchemaRegion-recover-task");
  List<Future<ISchemaRegion>> futures = new ArrayList<>();
  try {
    for (File file : sgDirList) {
      if (!file.isDirectory()) {
        continue;
      }
      PartialPath storageGroup;
      try {
        storageGroup = new PartialPath(file.getName());
      } catch (IllegalPathException illegalPathException) {
        // not a legal sg dir
        continue;
      }
      List<SchemaRegionId> schemaRegionIdList = new ArrayList<>();
      partitionTable.put(storageGroup, schemaRegionIdList);
      File sgDir = new File(config.getSchemaDir(), storageGroup.getFullPath());
      if (!sgDir.exists()) {
        continue;
      }
      File[] schemaRegionDirs = sgDir.listFiles();
      if (schemaRegionDirs == null) {
        continue;
      }
      for (File schemaRegionDir : schemaRegionDirs) {
        SchemaRegionId schemaRegionId;
        try {
          // region dirs are named by their numeric region id
          schemaRegionId = new SchemaRegionId(Integer.parseInt(schemaRegionDir.getName()));
        } catch (NumberFormatException e) {
          // the dir/file is not schemaRegionDir, ignore this.
          continue;
        }
        futures.add(
            schemaRegionRecoverPools.submit(
                recoverSchemaRegionTask(storageGroup, schemaRegionId)));
        schemaRegionIdList.add(schemaRegionId);
      }
    }
    for (Future<ISchemaRegion> future : futures) {
      try {
        ISchemaRegion schemaRegion = future.get();
        schemaRegionMap.put(schemaRegion.getSchemaRegionId(), schemaRegion);
      } catch (InterruptedException e) {
        // restore the interrupt flag so upstream callers can observe the interruption
        Thread.currentThread().interrupt();
        logger.error("SchemaRegion recovery was interrupted.", e);
      } catch (ExecutionException | RuntimeException e) {
        // pass the throwable to the logger so the full stack trace is preserved,
        // instead of message-only logging plus printStackTrace()
        logger.error("Something wrong happened during SchemaRegion recovery", e);
      }
    }
  } finally {
    // always release the temporary recovery pool, even if collecting results fails
    schemaRegionRecoverPools.shutdown();
  }
  return partitionTable;
}
/**
 * Forces the mlog of every registered schema region to disk. A no-op when the engine has not
 * been initialized yet or has been cleared.
 */
public void forceMlog() {
  Map<SchemaRegionId, ISchemaRegion> regions = schemaRegionMap;
  if (regions == null) {
    return;
  }
  regions.values().forEach(ISchemaRegion::forceMlog);
}
/**
 * Releases all engine-level state: shared schema resources, the timed mlog-force thread, every
 * schema region, and the shared prefix tree. Fields are nulled so a repeated call is safe.
 */
public void clear() {
  SchemaResourceManager.clearSchemaResource();
  ScheduledExecutorService mlogThread = timedForceMLogThread;
  if (mlogThread != null) {
    mlogThread.shutdown();
    timedForceMLogThread = null;
  }
  Map<SchemaRegionId, ISchemaRegion> regions = schemaRegionMap;
  if (regions != null) {
    regions.values().forEach(ISchemaRegion::clear);
    regions.clear();
    schemaRegionMap = null;
  }
  ConfigMTree prefixTree = sharedPrefixTree;
  if (prefixTree != null) {
    prefixTree.clear();
    sharedPrefixTree = null;
  }
}
/** @return the schema region registered under the given id, or null if none is registered */
// NOTE(review): throws NPE when called before init() or after clear() — schemaRegionMap is
// null then; callers appear to rely on the engine being initialized first. Confirm.
public ISchemaRegion getSchemaRegion(SchemaRegionId regionId) {
return schemaRegionMap.get(regionId);
}
/** @return a live view of all schema regions currently managed by this engine */
public Collection<ISchemaRegion> getAllSchemaRegions() {
return schemaRegionMap.values();
}
/**
 * Creates and registers a schema region for the given storage group. Idempotent when the id is
 * already bound to the same storage group.
 *
 * @param storageGroup storage group the region belongs to
 * @param schemaRegionId id under which the region is registered
 * @throws MetadataException if the id is already bound to a different storage group, or region
 *     construction fails
 */
public synchronized void createSchemaRegion(
    PartialPath storageGroup, SchemaRegionId schemaRegionId) throws MetadataException {
  ISchemaRegion existing = schemaRegionMap.get(schemaRegionId);
  if (existing == null) {
    schemaRegionMap.put(
        schemaRegionId, createSchemaRegionWithoutExistenceCheck(storageGroup, schemaRegionId));
    return;
  }
  if (!existing.getStorageGroupFullPath().equals(storageGroup.getFullPath())) {
    throw new MetadataException(
        String.format(
            "SchemaRegion [%s] is duplicated between [%s] and [%s], "
                + "and the former one has been recovered.",
            schemaRegionId, existing.getStorageGroupFullPath(), storageGroup.getFullPath()));
  }
  // same id already bound to the same storage group: nothing to do
}
/**
 * Builds the recovery task for one schema region; the task is executed on the recovery thread
 * pool by {@code initSchemaRegion}.
 *
 * @param storageGroup storage group owning the region
 * @param schemaRegionId id of the region to recover
 * @return a callable that recovers the region, wrapping any MetadataException in a
 *     RuntimeException (unwrapped by the caller via ExecutionException handling)
 */
private Callable<ISchemaRegion> recoverSchemaRegionTask(
    PartialPath storageGroup, SchemaRegionId schemaRegionId) {
  // this method is called for concurrent recovery of schema regions
  return () -> {
    long timeRecord = System.currentTimeMillis();
    try {
      // TODO: handle duplicated regionId across different storage group
      ISchemaRegion schemaRegion =
          createSchemaRegionWithoutExistenceCheck(storageGroup, schemaRegionId);
      timeRecord = System.currentTimeMillis() - timeRecord;
      // parameterized logging: no String.format cost when INFO is disabled
      logger.info(
          "Recover [{}] spend: {} ms",
          storageGroup.concatNode(schemaRegionId.toString()),
          timeRecord);
      return schemaRegion;
    } catch (MetadataException e) {
      // pass the exception to the logger so the stack trace is not lost
      logger.error(
          "SchemaRegion [{}] in StorageGroup [{}] failed to recover.",
          schemaRegionId.getId(),
          storageGroup.getFullPath(),
          e);
      throw new RuntimeException(e);
    }
  };
}
/**
 * Instantiates a schema region for the given storage group according to the configured engine
 * mode, without checking whether the id is already registered.
 *
 * @param storageGroup storage group the region belongs to
 * @param schemaRegionId id of the new region
 * @return the freshly created region implementation
 * @throws MetadataException if the storage group cannot be ensured or construction fails
 */
private ISchemaRegion createSchemaRegionWithoutExistenceCheck(
    PartialPath storageGroup, SchemaRegionId schemaRegionId) throws MetadataException {
  // make sure the storage group node exists in the shared upper tree first
  IStorageGroupMNode storageGroupMNode = ensureStorageGroupByStorageGroupPath(storageGroup);
  switch (this.schemaRegionStoredMode) {
    case Memory:
      return new SchemaRegionMemoryImpl(
          storageGroup, schemaRegionId, storageGroupMNode, seriesNumerLimiter);
    case Schema_File:
      return new SchemaRegionSchemaFileImpl(
          storageGroup, schemaRegionId, storageGroupMNode, seriesNumerLimiter);
    case Rocksdb_based:
      return new RSchemaRegionLoader()
          .loadRSchemaRegion(storageGroup, schemaRegionId, storageGroupMNode);
    default:
      throw new UnsupportedOperationException(
          String.format(
              "This mode [%s] is not supported. Please check and modify it.",
              schemaRegionStoredMode));
  }
}
/**
 * Returns the storage group mnode for the given path, creating the storage group on demand.
 *
 * <p>Control flow is exception-driven: a missing storage group is detected via
 * {@link StorageGroupNotSetException}, then created; a concurrent creation racing us is
 * tolerated via {@link StorageGroupAlreadySetException} unless the conflict is caused by an
 * existing child node, in which case the exception is rethrown.
 *
 * @param storageGroup full storage group path
 * @return the storage group mnode in the shared prefix tree
 * @throws MetadataException if creation fails, or the path conflicts with an existing child
 */
private IStorageGroupMNode ensureStorageGroupByStorageGroupPath(PartialPath storageGroup)
throws MetadataException {
try {
return sharedPrefixTree.getStorageGroupNodeByStorageGroupPath(storageGroup);
} catch (StorageGroupNotSetException e) {
try {
sharedPrefixTree.setStorageGroup(storageGroup);
} catch (StorageGroupAlreadySetException storageGroupAlreadySetException) {
// do nothing
// concurrent timeseries creation may result concurrent ensureStorageGroup
// it's ok that the storageGroup has already been set
if (storageGroupAlreadySetException.isHasChild()) {
// if setStorageGroup failure is because of child, the deviceNode should not be created.
// Timeseries can't be created under a deviceNode without storageGroup.
throw storageGroupAlreadySetException;
}
}
// re-fetch: the node now exists whether we created it or a concurrent caller did
return sharedPrefixTree.getStorageGroupNodeByStorageGroupPath(storageGroup);
}
}
/**
 * Deletes the given schema region and unregisters it. When its storage group directory contains
 * no remaining region directories, the directory is removed and the storage group is deleted
 * from the shared prefix tree as well.
 *
 * @param schemaRegionId id of the region to delete; a no-op (with a warning) if unknown
 * @throws MetadataException if deleting region data or the storage group entry fails
 */
public synchronized void deleteSchemaRegion(SchemaRegionId schemaRegionId)
    throws MetadataException {
  ISchemaRegion schemaRegion = schemaRegionMap.get(schemaRegionId);
  if (schemaRegion == null) {
    // typo fix: "skiped" -> "skipped"
    logger.warn("SchemaRegion(id = {}) has been deleted, skipped", schemaRegionId);
    return;
  }
  schemaRegion.deleteSchemaRegion();
  schemaRegionMap.remove(schemaRegionId);
  // check whether the sg dir is empty
  File sgDir = new File(config.getSchemaDir(), schemaRegion.getStorageGroupFullPath());
  File[] regionDirList =
      sgDir.listFiles(
          (dir, name) -> {
            // a schema region dir is named by its numeric region id
            try {
              Integer.parseInt(name);
              return true;
            } catch (NumberFormatException e) {
              return false;
            }
          });
  // remove the empty sg dir
  if (regionDirList == null || regionDirList.length == 0) {
    if (sgDir.exists()) {
      FileUtils.deleteDirectory(sgDir);
    }
    sharedPrefixTree.deleteStorageGroup(new PartialPath(schemaRegion.getStorageGroupFullPath()));
  }
}
/**
 * Replaces the default no-op series number limiter.
 *
 * @param seriesNumerLimiter the limiter to install; must not be null
 * @throws NullPointerException if {@code seriesNumerLimiter} is null
 */
public void setSeriesNumerLimiter(ISeriesNumerLimiter seriesNumerLimiter) {
  // fail fast here instead of a delayed NPE on the next createSchemaRegion call
  this.seriesNumerLimiter = Objects.requireNonNull(seriesNumerLimiter, "seriesNumerLimiter");
}
}