blob: bca3bd490d4301782db7cd46f74ec059a5ebdadd [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.db.schemaengine;
import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
import org.apache.iotdb.commons.concurrent.ThreadName;
import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil;
import org.apache.iotdb.commons.conf.CommonDescriptor;
import org.apache.iotdb.commons.consensus.SchemaRegionId;
import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.exception.MetadataException;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.commons.utils.FileUtils;
import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.consensus.ConsensusFactory;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.consensus.SchemaRegionConsensusImpl;
import org.apache.iotdb.db.schemaengine.metric.ISchemaRegionMetric;
import org.apache.iotdb.db.schemaengine.metric.SchemaMetricManager;
import org.apache.iotdb.db.schemaengine.rescon.CachedSchemaEngineStatistics;
import org.apache.iotdb.db.schemaengine.rescon.DataNodeSchemaQuotaManager;
import org.apache.iotdb.db.schemaengine.rescon.ISchemaEngineStatistics;
import org.apache.iotdb.db.schemaengine.rescon.MemSchemaEngineStatistics;
import org.apache.iotdb.db.schemaengine.rescon.SchemaResourceManager;
import org.apache.iotdb.db.schemaengine.schemaregion.ISchemaRegion;
import org.apache.iotdb.db.schemaengine.schemaregion.ISchemaRegionParams;
import org.apache.iotdb.db.schemaengine.schemaregion.SchemaRegionLoader;
import org.apache.iotdb.db.schemaengine.schemaregion.SchemaRegionParams;
import org.apache.iotdb.db.schemaengine.template.ClusterTemplateManager;
import org.apache.iotdb.mpp.rpc.thrift.TDataNodeHeartbeatReq;
import org.apache.iotdb.mpp.rpc.thrift.TDataNodeHeartbeatResp;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
// manage all the schemaRegion in this dataNode
public class SchemaEngine {
private static final Logger logger = LoggerFactory.getLogger(SchemaEngine.class);
private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
private final SchemaRegionLoader schemaRegionLoader;
@SuppressWarnings("java:S3077")
private volatile Map<SchemaRegionId, ISchemaRegion> schemaRegionMap;
private ScheduledExecutorService timedForceMLogThread;
private ISchemaEngineStatistics schemaEngineStatistics;
private SchemaMetricManager schemaMetricManager;
private final DataNodeSchemaQuotaManager schemaQuotaManager =
DataNodeSchemaQuotaManager.getInstance();
private static class SchemaEngineManagerHolder {
private static final SchemaEngine INSTANCE = new SchemaEngine();
private SchemaEngineManagerHolder() {}
}
private SchemaEngine() {
schemaRegionLoader = new SchemaRegionLoader();
}
public static SchemaEngine getInstance() {
return SchemaEngineManagerHolder.INSTANCE;
}
public void init() {
logger.info(
"used schema engine mode: {}.",
CommonDescriptor.getInstance().getConfig().getSchemaEngineMode());
schemaRegionLoader.init(CommonDescriptor.getInstance().getConfig().getSchemaEngineMode());
initSchemaEngineStatistics();
SchemaResourceManager.initSchemaResource(schemaEngineStatistics);
// CachedSchemaEngineMetric depend on CacheMemoryManager, so it should be initialized after
// CacheMemoryManager
schemaMetricManager = new SchemaMetricManager(schemaEngineStatistics);
schemaRegionMap = new ConcurrentHashMap<>();
initSchemaRegion();
if (!(config.isClusterMode()
&& config
.getSchemaRegionConsensusProtocolClass()
.equals(ConsensusFactory.RATIS_CONSENSUS))
&& config.getSyncMlogPeriodInMs() != 0) {
timedForceMLogThread =
IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor(
ThreadName.SCHEMA_FORCE_MLOG.getName());
ScheduledExecutorUtil.unsafelyScheduleAtFixedRate(
timedForceMLogThread,
this::forceMlog,
config.getSyncMlogPeriodInMs(),
config.getSyncMlogPeriodInMs(),
TimeUnit.MILLISECONDS);
}
}
/**
* Scan the database and schema region directories to recover schema regions and return the
* collected local schema partition info for localSchemaPartitionTable recovery.
*/
@SuppressWarnings("java:S2142")
private void initSchemaRegion() {
File schemaDir = new File(config.getSchemaDir());
File[] sgDirList = schemaDir.listFiles();
if (sgDirList == null) {
return;
}
// recover SchemaRegion concurrently
ExecutorService schemaRegionRecoverPools =
IoTDBThreadPoolFactory.newFixedThreadPool(
Runtime.getRuntime().availableProcessors(),
ThreadName.SCHEMA_REGION_RECOVER_TASK.getName());
List<Future<ISchemaRegion>> futures = new ArrayList<>();
for (File file : sgDirList) {
if (!file.isDirectory()) {
continue;
}
PartialPath storageGroup;
try {
storageGroup = new PartialPath(file.getName());
} catch (IllegalPathException illegalPathException) {
// not a legal sg dir
continue;
}
File sgDir = new File(config.getSchemaDir(), storageGroup.getFullPath());
if (!sgDir.exists()) {
continue;
}
File[] schemaRegionDirs = sgDir.listFiles();
if (schemaRegionDirs == null) {
continue;
}
for (File schemaRegionDir : schemaRegionDirs) {
SchemaRegionId schemaRegionId;
try {
schemaRegionId = new SchemaRegionId(Integer.parseInt(schemaRegionDir.getName()));
} catch (NumberFormatException e) {
// the dir/file is not schemaRegionDir, ignore this.
continue;
}
futures.add(
schemaRegionRecoverPools.submit(recoverSchemaRegionTask(storageGroup, schemaRegionId)));
}
}
for (Future<ISchemaRegion> future : futures) {
try {
ISchemaRegion schemaRegion = future.get();
schemaRegionMap.put(schemaRegion.getSchemaRegionId(), schemaRegion);
} catch (ExecutionException | InterruptedException | RuntimeException e) {
logger.error("Something wrong happened during SchemaRegion recovery", e);
}
}
schemaRegionRecoverPools.shutdown();
}
private void initSchemaEngineStatistics() {
if (CommonDescriptor.getInstance().getConfig().getSchemaEngineMode().equals("Memory")) {
schemaEngineStatistics = new MemSchemaEngineStatistics();
} else {
schemaEngineStatistics = new CachedSchemaEngineStatistics();
}
}
public void forceMlog() {
Map<SchemaRegionId, ISchemaRegion> schemaRegionMap = this.schemaRegionMap;
if (schemaRegionMap != null) {
for (ISchemaRegion schemaRegion : schemaRegionMap.values()) {
schemaRegion.forceMlog();
}
}
}
public void clear() {
schemaRegionLoader.clear();
// clearSchemaResource will shut down release and flush task in PBTree mode, which must be
// down before clear schema region
SchemaResourceManager.clearSchemaResource();
if (timedForceMLogThread != null) {
timedForceMLogThread.shutdown();
timedForceMLogThread = null;
}
if (schemaRegionMap != null) {
// SchemaEngineStatistics will be clear after clear all schema region
for (ISchemaRegion schemaRegion : schemaRegionMap.values()) {
schemaRegion.clear();
}
schemaRegionMap.clear();
schemaRegionMap = null;
}
// SchemaMetric should be cleared lastly
if (schemaMetricManager != null) {
schemaMetricManager.clear();
}
ClusterTemplateManager.getInstance().clear();
}
public ISchemaRegion getSchemaRegion(SchemaRegionId regionId) {
return schemaRegionMap.get(regionId);
}
public Collection<ISchemaRegion> getAllSchemaRegions() {
return schemaRegionMap.values();
}
public List<SchemaRegionId> getAllSchemaRegionIds() {
return new ArrayList<>(schemaRegionMap.keySet());
}
public synchronized void createSchemaRegion(
PartialPath storageGroup, SchemaRegionId schemaRegionId) throws MetadataException {
ISchemaRegion schemaRegion = schemaRegionMap.get(schemaRegionId);
if (schemaRegion != null) {
if (schemaRegion.getDatabaseFullPath().equals(storageGroup.getFullPath())) {
return;
} else {
throw new MetadataException(
String.format(
"SchemaRegion [%s] is duplicated between [%s] and [%s], "
+ "and the former one has been recovered.",
schemaRegionId, schemaRegion.getDatabaseFullPath(), storageGroup.getFullPath()));
}
}
schemaRegionMap.put(
schemaRegionId, createSchemaRegionWithoutExistenceCheck(storageGroup, schemaRegionId));
}
private Callable<ISchemaRegion> recoverSchemaRegionTask(
PartialPath storageGroup, SchemaRegionId schemaRegionId) {
// this method is called for concurrent recovery of schema regions
return () -> {
long timeRecord = System.currentTimeMillis();
try {
// TODO: handle duplicated regionId across different database
ISchemaRegion schemaRegion =
createSchemaRegionWithoutExistenceCheck(storageGroup, schemaRegionId);
timeRecord = System.currentTimeMillis() - timeRecord;
logger.info(
"Recover [{}] spend: {} ms",
storageGroup.concatNode(schemaRegionId.toString()),
timeRecord);
return schemaRegion;
} catch (MetadataException e) {
logger.error(
String.format(
"SchemaRegion [%d] in StorageGroup [%s] failed to recover.",
schemaRegionId.getId(), storageGroup.getFullPath()));
throw new RuntimeException(e);
}
};
}
private ISchemaRegion createSchemaRegionWithoutExistenceCheck(
PartialPath database, SchemaRegionId schemaRegionId) throws MetadataException {
ISchemaRegionParams schemaRegionParams =
new SchemaRegionParams(database, schemaRegionId, schemaEngineStatistics);
ISchemaRegion schemaRegion = schemaRegionLoader.createSchemaRegion(schemaRegionParams);
schemaMetricManager.addSchemaRegionMetric(
schemaRegionId.getId(), schemaRegion.getSchemaRegionMetric());
return schemaRegion;
}
public synchronized void deleteSchemaRegion(SchemaRegionId schemaRegionId)
throws MetadataException {
ISchemaRegion schemaRegion = schemaRegionMap.get(schemaRegionId);
if (schemaRegion == null) {
logger.warn("SchemaRegion(id = {}) has been deleted, skiped", schemaRegionId);
return;
}
schemaRegion.deleteSchemaRegion();
schemaMetricManager.removeSchemaRegionMetric(schemaRegionId.getId());
schemaRegionMap.remove(schemaRegionId);
// check whether the sg dir is empty
File sgDir = new File(config.getSchemaDir(), schemaRegion.getDatabaseFullPath());
File[] regionDirList =
sgDir.listFiles(
(dir, name) -> {
try {
Integer.parseInt(name);
return true;
} catch (NumberFormatException e) {
return false;
}
});
// remove the empty sg dir
if (regionDirList == null || regionDirList.length == 0) {
if (sgDir.exists()) {
FileUtils.deleteDirectory(sgDir);
}
}
}
public int getSchemaRegionNumber() {
return schemaRegionMap == null ? 0 : schemaRegionMap.size();
}
public Map<Integer, Long> countDeviceNumBySchemaRegion(List<Integer> schemaIds) {
Map<Integer, Long> deviceNum = new HashMap<>();
schemaRegionMap.entrySet().stream()
.filter(
entry ->
schemaIds.contains(entry.getKey().getId())
&& SchemaRegionConsensusImpl.getInstance().isLeader(entry.getKey()))
.forEach(
entry ->
deviceNum.put(
entry.getKey().getId(),
entry.getValue().getSchemaRegionStatistics().getDevicesNumber()));
return deviceNum;
}
public Map<Integer, Long> countTimeSeriesNumBySchemaRegion(List<Integer> schemaIds) {
Map<Integer, Long> timeSeriesNum = new HashMap<>();
schemaRegionMap.entrySet().stream()
.filter(
entry ->
schemaIds.contains(entry.getKey().getId())
&& SchemaRegionConsensusImpl.getInstance().isLeader(entry.getKey()))
.forEach(
entry ->
timeSeriesNum.put(
entry.getKey().getId(),
entry.getValue().getSchemaRegionStatistics().getSeriesNumber()));
return timeSeriesNum;
}
/**
* Update total count in schema quota manager and generate local count map response. If limit is
* not -1 and deviceNumMap/timeSeriesNumMap is null, fill deviceNumMap/timeSeriesNumMap of the
* SchemaRegion whose current node is the leader
*
* @param req heartbeat request
* @param resp heartbeat response
*/
public void updateAndFillSchemaCountMap(TDataNodeHeartbeatReq req, TDataNodeHeartbeatResp resp) {
// update DataNodeSchemaQuotaManager
schemaQuotaManager.updateRemain(
req.getTimeSeriesQuotaRemain(),
req.isSetDeviceQuotaRemain() ? req.getDeviceQuotaRemain() : -1);
if (schemaQuotaManager.isDeviceLimit()) {
if (resp.getRegionDeviceUsageMap() == null) {
resp.setRegionDeviceUsageMap(new HashMap<>());
}
Map<Integer, Long> tmp = resp.getRegionDeviceUsageMap();
SchemaRegionConsensusImpl.getInstance().getAllConsensusGroupIds().stream()
.filter(
consensusGroupId ->
SchemaRegionConsensusImpl.getInstance().isLeader(consensusGroupId))
.forEach(
consensusGroupId ->
tmp.put(
consensusGroupId.getId(),
Optional.ofNullable(schemaRegionMap.get(consensusGroupId))
.map(
schemaRegion ->
schemaRegion.getSchemaRegionStatistics().getDevicesNumber())
.orElse(0L)));
}
if (schemaQuotaManager.isSeriesLimit()) {
if (resp.getRegionSeriesUsageMap() == null) {
resp.setRegionSeriesUsageMap(new HashMap<>());
}
Map<Integer, Long> tmp = resp.getRegionSeriesUsageMap();
SchemaRegionConsensusImpl.getInstance().getAllConsensusGroupIds().stream()
.filter(
consensusGroupId ->
SchemaRegionConsensusImpl.getInstance().isLeader(consensusGroupId))
.forEach(
consensusGroupId ->
tmp.put(
consensusGroupId.getId(),
Optional.ofNullable(schemaRegionMap.get(consensusGroupId))
.map(
schemaRegion ->
schemaRegion.getSchemaRegionStatistics().getSeriesNumber())
.orElse(0L)));
}
}
@TestOnly
public ISchemaEngineStatistics getSchemaEngineStatistics() {
return schemaEngineStatistics;
}
public ISchemaRegionMetric getSchemaRegionMetric(int schemaRegionId) {
return schemaMetricManager.getSchemaRegionMetric(schemaRegionId);
}
}