/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.confignode.manager.schema;
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.common.rpc.thrift.TSetTTLReq;
import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.exception.MetadataException;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.commons.path.PathPatternTree;
import org.apache.iotdb.commons.schema.SchemaConstant;
import org.apache.iotdb.commons.service.metric.MetricService;
import org.apache.iotdb.commons.utils.PathUtils;
import org.apache.iotdb.commons.utils.StatusUtils;
import org.apache.iotdb.confignode.client.DataNodeRequestType;
import org.apache.iotdb.confignode.client.async.AsyncDataNodeClientPool;
import org.apache.iotdb.confignode.client.async.handlers.AsyncClientHandler;
import org.apache.iotdb.confignode.conf.ConfigNodeConfig;
import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
import org.apache.iotdb.confignode.consensus.request.read.database.CountDatabasePlan;
import org.apache.iotdb.confignode.consensus.request.read.database.GetDatabasePlan;
import org.apache.iotdb.confignode.consensus.request.read.template.GetAllSchemaTemplatePlan;
import org.apache.iotdb.confignode.consensus.request.read.template.GetAllTemplateSetInfoPlan;
import org.apache.iotdb.confignode.consensus.request.read.template.GetPathsSetTemplatePlan;
import org.apache.iotdb.confignode.consensus.request.read.template.GetSchemaTemplatePlan;
import org.apache.iotdb.confignode.consensus.request.read.template.GetTemplateSetInfoPlan;
import org.apache.iotdb.confignode.consensus.request.write.database.AdjustMaxRegionGroupNumPlan;
import org.apache.iotdb.confignode.consensus.request.write.database.DatabaseSchemaPlan;
import org.apache.iotdb.confignode.consensus.request.write.database.DeleteDatabasePlan;
import org.apache.iotdb.confignode.consensus.request.write.database.SetDataReplicationFactorPlan;
import org.apache.iotdb.confignode.consensus.request.write.database.SetSchemaReplicationFactorPlan;
import org.apache.iotdb.confignode.consensus.request.write.database.SetTTLPlan;
import org.apache.iotdb.confignode.consensus.request.write.database.SetTimePartitionIntervalPlan;
import org.apache.iotdb.confignode.consensus.request.write.pipe.payload.PipeEnrichedPlan;
import org.apache.iotdb.confignode.consensus.request.write.template.CreateSchemaTemplatePlan;
import org.apache.iotdb.confignode.consensus.request.write.template.DropSchemaTemplatePlan;
import org.apache.iotdb.confignode.consensus.request.write.template.ExtendSchemaTemplatePlan;
import org.apache.iotdb.confignode.consensus.request.write.template.PreUnsetSchemaTemplatePlan;
import org.apache.iotdb.confignode.consensus.request.write.template.RollbackPreUnsetSchemaTemplatePlan;
import org.apache.iotdb.confignode.consensus.request.write.template.UnsetSchemaTemplatePlan;
import org.apache.iotdb.confignode.consensus.response.database.CountDatabaseResp;
import org.apache.iotdb.confignode.consensus.response.database.DatabaseSchemaResp;
import org.apache.iotdb.confignode.consensus.response.partition.PathInfoResp;
import org.apache.iotdb.confignode.consensus.response.template.AllTemplateSetInfoResp;
import org.apache.iotdb.confignode.consensus.response.template.TemplateInfoResp;
import org.apache.iotdb.confignode.consensus.response.template.TemplateSetInfoResp;
import org.apache.iotdb.confignode.exception.DatabaseNotExistsException;
import org.apache.iotdb.confignode.manager.IManager;
import org.apache.iotdb.confignode.manager.consensus.ConsensusManager;
import org.apache.iotdb.confignode.manager.node.NodeManager;
import org.apache.iotdb.confignode.manager.partition.PartitionManager;
import org.apache.iotdb.confignode.manager.partition.PartitionMetrics;
import org.apache.iotdb.confignode.persistence.schema.ClusterSchemaInfo;
import org.apache.iotdb.confignode.rpc.thrift.TDatabaseInfo;
import org.apache.iotdb.confignode.rpc.thrift.TDatabaseSchema;
import org.apache.iotdb.confignode.rpc.thrift.TGetAllTemplatesResp;
import org.apache.iotdb.confignode.rpc.thrift.TGetPathsSetTemplatesResp;
import org.apache.iotdb.confignode.rpc.thrift.TGetTemplateResp;
import org.apache.iotdb.confignode.rpc.thrift.TShowDatabaseResp;
import org.apache.iotdb.consensus.exception.ConsensusException;
import org.apache.iotdb.db.exception.metadata.DatabaseAlreadySetException;
import org.apache.iotdb.db.exception.metadata.SchemaQuotaExceededException;
import org.apache.iotdb.db.schemaengine.template.Template;
import org.apache.iotdb.db.schemaengine.template.TemplateInternalRPCUpdateType;
import org.apache.iotdb.db.schemaengine.template.TemplateInternalRPCUtil;
import org.apache.iotdb.db.schemaengine.template.alter.TemplateExtendInfo;
import org.apache.iotdb.db.utils.SchemaUtils;
import org.apache.iotdb.mpp.rpc.thrift.TUpdateTemplateReq;
import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.tsfile.utils.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
import static org.apache.iotdb.commons.conf.IoTDBConstant.MAX_DATABASE_NAME_LENGTH;
/** The ClusterSchemaManager manages cluster schema read and write requests. */
public class ClusterSchemaManager {
private static final Logger LOGGER = LoggerFactory.getLogger(ClusterSchemaManager.class);
private static final ConfigNodeConfig CONF = ConfigNodeDescriptor.getInstance().getConf();
private static final double SCHEMA_REGION_PER_DATA_NODE = CONF.getSchemaRegionPerDataNode();
private static final double DATA_REGION_PER_DATA_NODE = CONF.getDataRegionPerDataNode();
private final IManager configManager;
private final ClusterSchemaInfo clusterSchemaInfo;
private final ClusterSchemaQuotaStatistics schemaQuotaStatistics;
private final ReentrantLock createDatabaseLock = new ReentrantLock();
private static final String CONSENSUS_READ_ERROR =
"Failed in the read API executing the consensus layer due to: ";
private static final String CONSENSUS_WRITE_ERROR =
"Failed in the write API executing the consensus layer due to: ";
public ClusterSchemaManager(
IManager configManager,
ClusterSchemaInfo clusterSchemaInfo,
ClusterSchemaQuotaStatistics schemaQuotaStatistics) {
this.configManager = configManager;
this.clusterSchemaInfo = clusterSchemaInfo;
this.schemaQuotaStatistics = schemaQuotaStatistics;
}
// ======================================================
// Consensus read/write interfaces
// ======================================================
/** Set Database */
public TSStatus setDatabase(DatabaseSchemaPlan databaseSchemaPlan, boolean isGeneratedByPipe) {
TSStatus result;
if (databaseSchemaPlan.getSchema().getName().length() > MAX_DATABASE_NAME_LENGTH) {
IllegalPathException illegalPathException =
new IllegalPathException(
databaseSchemaPlan.getSchema().getName(),
"the length of database name shall not exceed " + MAX_DATABASE_NAME_LENGTH);
return RpcUtils.getStatus(
illegalPathException.getErrorCode(), illegalPathException.getMessage());
}
if (getPartitionManager().isDatabasePreDeleted(databaseSchemaPlan.getSchema().getName())) {
return RpcUtils.getStatus(
TSStatusCode.METADATA_ERROR,
String.format(
"Some other task is deleting database %s", databaseSchemaPlan.getSchema().getName()));
}
    createDatabaseLock.lock();
    try {
clusterSchemaInfo.isDatabaseNameValid(databaseSchemaPlan.getSchema().getName());
if (!databaseSchemaPlan.getSchema().getName().equals(SchemaConstant.SYSTEM_DATABASE)) {
clusterSchemaInfo.checkDatabaseLimit();
}
      // Persist the DatabaseSchema through the consensus layer
result =
getConsensusManager()
.write(
isGeneratedByPipe
? new PipeEnrichedPlan(databaseSchemaPlan)
: databaseSchemaPlan);
// Bind Database metrics
PartitionMetrics.bindDatabaseRelatedMetricsWhenUpdate(
MetricService.getInstance(),
configManager,
databaseSchemaPlan.getSchema().getName(),
databaseSchemaPlan.getSchema().getDataReplicationFactor(),
databaseSchemaPlan.getSchema().getSchemaReplicationFactor());
// Adjust the maximum RegionGroup number of each Database
adjustMaxRegionGroupNum();
} catch (ConsensusException e) {
LOGGER.warn(CONSENSUS_WRITE_ERROR, e);
result = new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode());
result.setMessage(e.getMessage());
} catch (MetadataException metadataException) {
      // Map the MetadataException to the corresponding status code
if (metadataException instanceof IllegalPathException) {
result = new TSStatus(TSStatusCode.ILLEGAL_PATH.getStatusCode());
} else if (metadataException instanceof DatabaseAlreadySetException) {
result = new TSStatus(TSStatusCode.DATABASE_ALREADY_EXISTS.getStatusCode());
} else if (metadataException instanceof SchemaQuotaExceededException) {
result = new TSStatus(TSStatusCode.SCHEMA_QUOTA_EXCEEDED.getStatusCode());
} else {
result = new TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
}
result.setMessage(metadataException.getMessage());
} finally {
createDatabaseLock.unlock();
}
return result;
}
/** Alter Database */
public TSStatus alterDatabase(DatabaseSchemaPlan databaseSchemaPlan, boolean isGeneratedByPipe) {
TSStatus result;
TDatabaseSchema databaseSchema = databaseSchemaPlan.getSchema();
if (!isDatabaseExist(databaseSchema.getName())) {
// Reject if Database doesn't exist
result = new TSStatus(TSStatusCode.DATABASE_NOT_EXIST.getStatusCode());
result.setMessage(
"Failed to alter database. The Database " + databaseSchema.getName() + " doesn't exist.");
return result;
}
if (databaseSchema.isSetMinSchemaRegionGroupNum()) {
// Validate alter SchemaRegionGroupNum
int minSchemaRegionGroupNum =
getMinRegionGroupNum(databaseSchema.getName(), TConsensusGroupType.SchemaRegion);
if (databaseSchema.getMinSchemaRegionGroupNum() <= minSchemaRegionGroupNum) {
result = new TSStatus(TSStatusCode.DATABASE_CONFIG_ERROR.getStatusCode());
result.setMessage(
String.format(
"Failed to alter database. The SchemaRegionGroupNum could only be increased. "
+ "Current SchemaRegionGroupNum: %d, Alter SchemaRegionGroupNum: %d",
minSchemaRegionGroupNum, databaseSchema.getMinSchemaRegionGroupNum()));
return result;
}
}
if (databaseSchema.isSetMinDataRegionGroupNum()) {
// Validate alter DataRegionGroupNum
int minDataRegionGroupNum =
getMinRegionGroupNum(databaseSchema.getName(), TConsensusGroupType.DataRegion);
if (databaseSchema.getMinDataRegionGroupNum() <= minDataRegionGroupNum) {
result = new TSStatus(TSStatusCode.DATABASE_CONFIG_ERROR.getStatusCode());
result.setMessage(
String.format(
"Failed to alter database. The DataRegionGroupNum could only be increased. "
+ "Current DataRegionGroupNum: %d, Alter DataRegionGroupNum: %d",
minDataRegionGroupNum, databaseSchema.getMinDataRegionGroupNum()));
return result;
}
}
// Alter DatabaseSchema
try {
result =
getConsensusManager()
.write(
isGeneratedByPipe
? new PipeEnrichedPlan(databaseSchemaPlan)
: databaseSchemaPlan);
PartitionMetrics.bindDatabaseReplicationFactorMetricsWhenUpdate(
MetricService.getInstance(),
databaseSchemaPlan.getSchema().getName(),
databaseSchemaPlan.getSchema().getDataReplicationFactor(),
databaseSchemaPlan.getSchema().getSchemaReplicationFactor());
return result;
} catch (ConsensusException e) {
LOGGER.warn(CONSENSUS_WRITE_ERROR, e);
result = new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode());
result.setMessage(e.getMessage());
return result;
}
}
/** Delete DatabaseSchema. */
public TSStatus deleteDatabase(DeleteDatabasePlan deleteDatabasePlan, boolean isGeneratedByPipe) {
TSStatus result;
try {
result =
getConsensusManager()
.write(
isGeneratedByPipe
? new PipeEnrichedPlan(deleteDatabasePlan)
: deleteDatabasePlan);
} catch (ConsensusException e) {
LOGGER.warn(CONSENSUS_WRITE_ERROR, e);
result = new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode());
result.setMessage(e.getMessage());
}
if (result.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
adjustMaxRegionGroupNum();
}
return result;
}
  /**
   * Count Databases by the specified path pattern. Note: the count includes pre-deleted Databases.
   *
   * <p>Notice: Only invoke this interface in ConfigManager
   *
   * @return CountDatabaseResp
   */
public CountDatabaseResp countMatchedDatabases(CountDatabasePlan countDatabasePlan) {
try {
return (CountDatabaseResp) getConsensusManager().read(countDatabasePlan);
} catch (ConsensusException e) {
LOGGER.warn(CONSENSUS_READ_ERROR, e);
TSStatus res = new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode());
res.setMessage(e.getMessage());
CountDatabaseResp response = new CountDatabaseResp();
response.setStatus(res);
return response;
}
}
  /**
   * Get DatabaseSchemas by the specified path pattern. Pre-deleted Databases are filtered out of
   * the returned schema map.
   *
   * <p>Notice: Only invoke this interface in ConfigManager
   *
   * @return DatabaseSchemaResp
   */
public DatabaseSchemaResp getMatchedDatabaseSchema(GetDatabasePlan getDatabasePlan) {
DatabaseSchemaResp resp;
try {
resp = (DatabaseSchemaResp) getConsensusManager().read(getDatabasePlan);
} catch (ConsensusException e) {
LOGGER.warn(CONSENSUS_READ_ERROR, e);
TSStatus res = new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode());
res.setMessage(e.getMessage());
resp = new DatabaseSchemaResp();
resp.setStatus(res);
    }
    if (resp.getStatus().getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
      // Return the error status directly: the schema map is not populated on a failed read
      return resp;
    }
    // Filter out pre-deleted Databases
    List<String> preDeletedDatabaseList = new ArrayList<>();
for (String database : resp.getSchemaMap().keySet()) {
if (getPartitionManager().isDatabasePreDeleted(database)) {
preDeletedDatabaseList.add(database);
}
}
for (String preDeletedDatabase : preDeletedDatabaseList) {
resp.getSchemaMap().remove(preDeletedDatabase);
}
return resp;
}
  /** Only used by the cluster tool to show Databases. */
public TShowDatabaseResp showDatabase(GetDatabasePlan getStorageGroupPlan) {
DatabaseSchemaResp databaseSchemaResp;
try {
databaseSchemaResp = (DatabaseSchemaResp) getConsensusManager().read(getStorageGroupPlan);
} catch (ConsensusException e) {
LOGGER.warn(CONSENSUS_READ_ERROR, e);
TSStatus res = new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode());
res.setMessage(e.getMessage());
databaseSchemaResp = new DatabaseSchemaResp();
databaseSchemaResp.setStatus(res);
}
if (databaseSchemaResp.getStatus().getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
// Return immediately if some Database doesn't exist
return new TShowDatabaseResp().setStatus(databaseSchemaResp.getStatus());
}
Map<String, TDatabaseInfo> infoMap = new ConcurrentHashMap<>();
for (TDatabaseSchema databaseSchema : databaseSchemaResp.getSchemaMap().values()) {
String database = databaseSchema.getName();
TDatabaseInfo databaseInfo = new TDatabaseInfo();
databaseInfo.setName(database);
databaseInfo.setTTL(databaseSchema.getTTL());
databaseInfo.setSchemaReplicationFactor(databaseSchema.getSchemaReplicationFactor());
databaseInfo.setDataReplicationFactor(databaseSchema.getDataReplicationFactor());
databaseInfo.setTimePartitionInterval(databaseSchema.getTimePartitionInterval());
databaseInfo.setMinSchemaRegionNum(
getMinRegionGroupNum(database, TConsensusGroupType.SchemaRegion));
databaseInfo.setMaxSchemaRegionNum(
getMaxRegionGroupNum(database, TConsensusGroupType.SchemaRegion));
databaseInfo.setMinDataRegionNum(
getMinRegionGroupNum(database, TConsensusGroupType.DataRegion));
databaseInfo.setMaxDataRegionNum(
getMaxRegionGroupNum(database, TConsensusGroupType.DataRegion));
try {
databaseInfo.setSchemaRegionNum(
getPartitionManager().getRegionGroupCount(database, TConsensusGroupType.SchemaRegion));
databaseInfo.setDataRegionNum(
getPartitionManager().getRegionGroupCount(database, TConsensusGroupType.DataRegion));
} catch (DatabaseNotExistsException e) {
// Skip pre-deleted Database
LOGGER.warn(
"The Database: {} doesn't exist. Maybe it has been pre-deleted.",
databaseSchema.getName());
continue;
}
infoMap.put(database, databaseInfo);
}
return new TShowDatabaseResp().setDatabaseInfoMap(infoMap).setStatus(StatusUtils.OK);
}
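  /**
   * Collect the TTL of every existing Database.
   *
   * @return Map<DatabaseName, TTL>; Databases that no longer exist when queried are skipped
   */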
public Map<String, Long> getAllTTLInfo() {
List<String> databases = getDatabaseNames();
Map<String, Long> infoMap = new ConcurrentHashMap<>();
for (String database : databases) {
try {
infoMap.put(database, getTTL(database));
} catch (DatabaseNotExistsException e) {
LOGGER.warn("Database: {} doesn't exist", databases, e);
}
}
return infoMap;
}
  /**
   * Update the TTL of the Databases matched by the given path pattern and push the new TTL to the
   * related DataNodes.
   *
   * @param setTTLPlan setTTLPlan
   * @param isGeneratedByPipe whether this plan is generated by a pipe
   * @return {@link TSStatusCode#SUCCESS_STATUS} if the TTL is successfully updated, {@link
   *     TSStatusCode#DATABASE_NOT_EXIST} if the path doesn't exist
   */
public TSStatus setTTL(SetTTLPlan setTTLPlan, boolean isGeneratedByPipe) {
Map<String, TDatabaseSchema> storageSchemaMap =
clusterSchemaInfo.getMatchedDatabaseSchemasByOneName(setTTLPlan.getDatabasePathPattern());
if (storageSchemaMap.isEmpty()) {
return RpcUtils.getStatus(
TSStatusCode.DATABASE_NOT_EXIST,
"Path [" + new PartialPath(setTTLPlan.getDatabasePathPattern()) + "] does not exist");
}
// Map<DataNodeId, TDataNodeLocation>
Map<Integer, TDataNodeLocation> dataNodeLocationMap = new ConcurrentHashMap<>();
    // Map<DataNodeId, DatabaseNames>
Map<Integer, List<String>> dnlToSgMap = new ConcurrentHashMap<>();
for (String storageGroup : storageSchemaMap.keySet()) {
// Get related DataNodes
Set<TDataNodeLocation> dataNodeLocations =
getPartitionManager()
.getDatabaseRelatedDataNodes(storageGroup, TConsensusGroupType.DataRegion);
for (TDataNodeLocation dataNodeLocation : dataNodeLocations) {
dataNodeLocationMap.putIfAbsent(dataNodeLocation.getDataNodeId(), dataNodeLocation);
dnlToSgMap
.computeIfAbsent(dataNodeLocation.getDataNodeId(), empty -> new ArrayList<>())
.add(storageGroup);
}
}
AsyncClientHandler<TSetTTLReq, TSStatus> clientHandler =
new AsyncClientHandler<>(DataNodeRequestType.SET_TTL);
dnlToSgMap
.keySet()
.forEach(
dataNodeId -> {
TSetTTLReq setTTLReq =
new TSetTTLReq(dnlToSgMap.get(dataNodeId), setTTLPlan.getTTL());
clientHandler.putRequest(dataNodeId, setTTLReq);
clientHandler.putDataNodeLocation(dataNodeId, dataNodeLocationMap.get(dataNodeId));
});
// TODO: Check response
AsyncDataNodeClientPool.getInstance().sendAsyncRequestToDataNodeWithRetry(clientHandler);
try {
return getConsensusManager()
.write(isGeneratedByPipe ? new PipeEnrichedPlan(setTTLPlan) : setTTLPlan);
} catch (ConsensusException e) {
LOGGER.warn(CONSENSUS_WRITE_ERROR, e);
TSStatus result = new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode());
result.setMessage(e.getMessage());
return result;
}
}
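  /**
   * Persist the new schema replication factor of the specified Database through the consensus
   * layer. The DataNodes are not informed yet (see the TODO below).
   */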
public TSStatus setSchemaReplicationFactor(
SetSchemaReplicationFactorPlan setSchemaReplicationFactorPlan) {
// TODO: Inform DataNodes
try {
return getConsensusManager().write(setSchemaReplicationFactorPlan);
} catch (ConsensusException e) {
LOGGER.warn(CONSENSUS_WRITE_ERROR, e);
TSStatus result = new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode());
result.setMessage(e.getMessage());
return result;
}
}
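  /**
   * Persist the new data replication factor of the specified Database through the consensus layer.
   * The DataNodes are not informed yet (see the TODO below).
   */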
public TSStatus setDataReplicationFactor(
SetDataReplicationFactorPlan setDataReplicationFactorPlan) {
// TODO: Inform DataNodes
try {
return getConsensusManager().write(setDataReplicationFactorPlan);
} catch (ConsensusException e) {
LOGGER.warn(CONSENSUS_WRITE_ERROR, e);
TSStatus result = new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode());
result.setMessage(e.getMessage());
return result;
}
}
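  /**
   * Persist the new time partition interval of the specified Database through the consensus layer.
   * The DataNodes are not informed yet (see the TODO below).
   */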
public TSStatus setTimePartitionInterval(
SetTimePartitionIntervalPlan setTimePartitionIntervalPlan) {
// TODO: Inform DataNodes
try {
return getConsensusManager().write(setTimePartitionIntervalPlan);
} catch (ConsensusException e) {
LOGGER.warn(CONSENSUS_WRITE_ERROR, e);
TSStatus result = new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode());
result.setMessage(e.getMessage());
return result;
}
}
  /**
   * Only the leader uses this interface. Adjust the maxSchemaRegionGroupNum and
   * maxDataRegionGroupNum of each Database based on the existing cluster resources.
   */
public synchronized void adjustMaxRegionGroupNum() {
    // Get all DatabaseSchemas
Map<String, TDatabaseSchema> databaseSchemaMap =
getMatchedDatabaseSchemasByName(getDatabaseNames());
if (databaseSchemaMap.isEmpty()) {
      // Skip when there are no Databases
return;
}
int dataNodeNum = getNodeManager().getRegisteredDataNodeCount();
int databaseNum = databaseSchemaMap.size();
for (TDatabaseSchema databaseSchema : databaseSchemaMap.values()) {
if (!isDatabaseExist(databaseSchema.getName())
|| databaseSchema.getName().equals(SchemaConstant.SYSTEM_DATABASE)) {
        // Filter out the pre-deleted Databases and the system Database
databaseNum--;
}
}
AdjustMaxRegionGroupNumPlan adjustMaxRegionGroupNumPlan = new AdjustMaxRegionGroupNumPlan();
for (TDatabaseSchema databaseSchema : databaseSchemaMap.values()) {
if (databaseSchema.getName().equals(SchemaConstant.SYSTEM_DATABASE)) {
// filter the system database
continue;
}
try {
// Adjust maxSchemaRegionGroupNum for each Database.
// All Databases share the DataNodes equally.
// The allocated SchemaRegionGroups will not be shrunk.
int allocatedSchemaRegionGroupCount;
try {
allocatedSchemaRegionGroupCount =
getPartitionManager()
.getRegionGroupCount(databaseSchema.getName(), TConsensusGroupType.SchemaRegion);
} catch (DatabaseNotExistsException e) {
          // Ignore the pre-deleted Database
continue;
}
int maxSchemaRegionGroupNum =
calcMaxRegionGroupNum(
databaseSchema.getMinSchemaRegionGroupNum(),
SCHEMA_REGION_PER_DATA_NODE,
dataNodeNum,
databaseNum,
databaseSchema.getSchemaReplicationFactor(),
allocatedSchemaRegionGroupCount);
LOGGER.info(
"[AdjustRegionGroupNum] The maximum number of SchemaRegionGroups for Database: {} is adjusted to: {}",
databaseSchema.getName(),
maxSchemaRegionGroupNum);
// Adjust maxDataRegionGroupNum for each Database.
// All Databases share the DataNodes equally.
// The allocated DataRegionGroups will not be shrunk.
int allocatedDataRegionGroupCount =
getPartitionManager()
.getRegionGroupCount(databaseSchema.getName(), TConsensusGroupType.DataRegion);
int maxDataRegionGroupNum =
calcMaxRegionGroupNum(
databaseSchema.getMinDataRegionGroupNum(),
DATA_REGION_PER_DATA_NODE,
dataNodeNum,
databaseNum,
databaseSchema.getDataReplicationFactor(),
allocatedDataRegionGroupCount);
LOGGER.info(
"[AdjustRegionGroupNum] The maximum number of DataRegionGroups for Database: {} is adjusted to: {}",
databaseSchema.getName(),
maxDataRegionGroupNum);
adjustMaxRegionGroupNumPlan.putEntry(
databaseSchema.getName(), new Pair<>(maxSchemaRegionGroupNum, maxDataRegionGroupNum));
} catch (DatabaseNotExistsException e) {
LOGGER.warn("Adjust maxRegionGroupNum failed because StorageGroup doesn't exist", e);
}
}
try {
getConsensusManager().write(adjustMaxRegionGroupNumPlan);
} catch (ConsensusException e) {
LOGGER.warn(CONSENSUS_WRITE_ERROR, e);
}
}
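  /**
   * Calculate the maximum number of RegionGroups a Database may hold:
   * max(minRegionGroupNum, max(ceil(resourceWeight * resource / (databaseNum * replicationFactor)),
   * allocatedRegionGroupCount)).
   *
   * @param minRegionGroupNum the configured lower bound of the Database
   * @param resourceWeight the allowed number of Regions per DataNode (schema or data)
   * @param resource the number of registered DataNodes
   * @param databaseNum the number of Databases sharing the cluster resources
   * @param replicationFactor the replication factor of the Database
   * @param allocatedRegionGroupCount the number of RegionGroups already allocated, which is never
   *     shrunk
   * @return the adjusted maxRegionGroupNum
   */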
public static int calcMaxRegionGroupNum(
int minRegionGroupNum,
double resourceWeight,
int resource,
int databaseNum,
int replicationFactor,
int allocatedRegionGroupCount) {
return Math.max(
        // The maxRegionGroupNum should be greater than or equal to the minRegionGroupNum
minRegionGroupNum,
Math.max(
(int)
// Use Math.ceil here to ensure that the maxRegionGroupNum
// will be increased as long as the number of cluster DataNodes is increased
Math.ceil(
                    // The maxRegionGroupNum of the current Database is expected to be:
                    // (resourceWeight * resource) / (databaseNum * replicationFactor)
resourceWeight * resource / (databaseNum * replicationFactor)),
            // The maxRegionGroupNum should be greater than or equal to the allocatedRegionGroupCount
allocatedRegionGroupCount));
}
// ======================================================
// Leader scheduling interfaces
// ======================================================
  /**
   * Check if the specified Database exists.
   *
   * @param database The specified Database
   * @return True if the DatabaseSchema exists and the Database is not pre-deleted
   */
public boolean isDatabaseExist(String database) {
return getPartitionManager().isDatabaseExist(database);
}
  /**
   * Only the leader uses this interface. Get the names of all Databases.
   *
   * @return List<DatabaseName>, the names of all existing Databases
   */
public List<String> getDatabaseNames() {
return clusterSchemaInfo.getDatabaseNames().stream()
.filter(this::isDatabaseExist)
.collect(Collectors.toList());
}
  /**
   * Only the leader uses this interface. Get the schema of the specified Database.
   *
   * @param database DatabaseName
   * @return The specified DatabaseSchema
   * @throws DatabaseNotExistsException When the specified Database doesn't exist
   */
public TDatabaseSchema getDatabaseSchemaByName(String database)
throws DatabaseNotExistsException {
if (!isDatabaseExist(database)) {
throw new DatabaseNotExistsException(database);
}
return clusterSchemaInfo.getMatchedDatabaseSchemaByName(database);
}
  /**
   * Only the leader uses this interface.
   *
   * @return The DatabaseName of the specified device path, or an empty String if no Database
   *     matches.
   */
public String getDatabaseNameByDevice(String devicePath) {
List<String> databases = getDatabaseNames();
for (String database : databases) {
if (PathUtils.isStartWith(devicePath, database)) {
return database;
}
}
return "";
}
  /**
   * Only the leader uses this interface. Get the schemas of the specified Databases.
   *
   * @param rawPathList List<DatabaseName>
   * @return the matched DatabaseSchemas
   */
public Map<String, TDatabaseSchema> getMatchedDatabaseSchemasByName(List<String> rawPathList) {
Map<String, TDatabaseSchema> result = new ConcurrentHashMap<>();
clusterSchemaInfo
.getMatchedDatabaseSchemasByName(rawPathList)
.forEach(
(database, databaseSchema) -> {
if (isDatabaseExist(database)) {
result.put(database, databaseSchema);
}
});
return result;
}
  /**
   * Only the leader uses this interface. Get the schemas of the Databases matched by the prefix
   * path.
   *
   * @param prefix prefix full path
   * @return the matched DatabaseSchemas
   */
public Map<String, TDatabaseSchema> getMatchedDatabaseSchemasByPrefix(PartialPath prefix) {
Map<String, TDatabaseSchema> result = new ConcurrentHashMap<>();
clusterSchemaInfo
.getMatchedDatabaseSchemasByPrefix(prefix)
.forEach(
(database, databaseSchema) -> {
if (isDatabaseExist(database)) {
result.put(database, databaseSchema);
}
});
return result;
}
  /**
   * Only the leader uses this interface. Get the TTL of the specified Database.
   *
   * @param database DatabaseName
   * @throws DatabaseNotExistsException When the specified Database doesn't exist
   */
public long getTTL(String database) throws DatabaseNotExistsException {
return getDatabaseSchemaByName(database).getTTL();
}
  /**
   * Only the leader uses this interface. Get the replication factor of the specified Database.
   *
   * @param database DatabaseName
   * @param consensusGroupType SchemaRegion or DataRegion
   * @return SchemaReplicationFactor or DataReplicationFactor
   * @throws DatabaseNotExistsException When the specified Database doesn't exist
   */
public int getReplicationFactor(String database, TConsensusGroupType consensusGroupType)
throws DatabaseNotExistsException {
TDatabaseSchema storageGroupSchema = getDatabaseSchemaByName(database);
return TConsensusGroupType.SchemaRegion.equals(consensusGroupType)
? storageGroupSchema.getSchemaReplicationFactor()
: storageGroupSchema.getDataReplicationFactor();
}
  /**
   * Only the leader uses this interface. Get the minRegionGroupNum of the specified Database.
   *
   * @param database DatabaseName
   * @param consensusGroupType SchemaRegion or DataRegion
   * @return minSchemaRegionGroupNum or minDataRegionGroupNum
   */
public int getMinRegionGroupNum(String database, TConsensusGroupType consensusGroupType) {
return clusterSchemaInfo.getMinRegionGroupNum(database, consensusGroupType);
}
  /**
   * Only the leader uses this interface. Get the maxRegionGroupNum of the specified Database.
   *
   * @param database DatabaseName
   * @param consensusGroupType SchemaRegion or DataRegion
   * @return maxSchemaRegionGroupNum or maxDataRegionGroupNum
   */
public int getMaxRegionGroupNum(String database, TConsensusGroupType consensusGroupType) {
return clusterSchemaInfo.getMaxRegionGroupNum(database, consensusGroupType);
}
  /**
   * Create a schema template.
   *
   * @param createSchemaTemplatePlan CreateSchemaTemplatePlan
   * @return TSStatus
   */
public TSStatus createTemplate(CreateSchemaTemplatePlan createSchemaTemplatePlan) {
try {
return getConsensusManager().write(createSchemaTemplatePlan);
} catch (ConsensusException e) {
LOGGER.warn(CONSENSUS_WRITE_ERROR, e);
TSStatus res = new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode());
res.setMessage(e.getMessage());
return res;
}
}
  /**
   * Show all schema templates.
   *
   * @return TGetAllTemplatesResp
   */
public TGetAllTemplatesResp getAllTemplates() {
GetAllSchemaTemplatePlan getAllSchemaTemplatePlan = new GetAllSchemaTemplatePlan();
TemplateInfoResp templateResp;
try {
templateResp = (TemplateInfoResp) getConsensusManager().read(getAllSchemaTemplatePlan);
} catch (ConsensusException e) {
LOGGER.warn(CONSENSUS_READ_ERROR, e);
TSStatus res = new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode());
res.setMessage(e.getMessage());
templateResp = new TemplateInfoResp();
templateResp.setStatus(res);
}
TGetAllTemplatesResp resp = new TGetAllTemplatesResp();
resp.setStatus(templateResp.getStatus());
if (resp.getStatus().getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
&& templateResp.getTemplateList() != null) {
List<ByteBuffer> list = new ArrayList<>();
templateResp.getTemplateList().forEach(template -> list.add(template.serialize()));
resp.setTemplateList(list);
}
return resp;
}
  /** Show the measurement nodes in the specified schema template. */
public TGetTemplateResp getTemplate(String req) {
GetSchemaTemplatePlan getSchemaTemplatePlan = new GetSchemaTemplatePlan(req);
TemplateInfoResp templateResp;
try {
templateResp = (TemplateInfoResp) getConsensusManager().read(getSchemaTemplatePlan);
} catch (ConsensusException e) {
LOGGER.warn(CONSENSUS_READ_ERROR, e);
TSStatus res = new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode());
res.setMessage(e.getMessage());
templateResp = new TemplateInfoResp();
templateResp.setStatus(res);
}
TGetTemplateResp resp = new TGetTemplateResp();
if (templateResp.getStatus().getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
&& templateResp.getTemplateList() != null
&& !templateResp.getTemplateList().isEmpty()) {
ByteBuffer byteBuffer = templateResp.getTemplateList().get(0).serialize();
resp.setTemplate(byteBuffer);
}
resp.setStatus(templateResp.getStatus());
return resp;
}
/** Get template by id. Only leader uses this interface. */
public Template getTemplate(int id) throws MetadataException {
return clusterSchemaInfo.getTemplate(id);
}
  /** Show the paths on which the specified schema template has been set. */
public TGetPathsSetTemplatesResp getPathsSetTemplate(String templateName, PathPatternTree scope) {
GetPathsSetTemplatePlan getPathsSetTemplatePlan =
new GetPathsSetTemplatePlan(templateName, scope);
PathInfoResp pathInfoResp;
try {
pathInfoResp = (PathInfoResp) getConsensusManager().read(getPathsSetTemplatePlan);
} catch (ConsensusException e) {
LOGGER.warn(CONSENSUS_READ_ERROR, e);
TSStatus res = new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode());
res.setMessage(e.getMessage());
pathInfoResp = new PathInfoResp();
pathInfoResp.setStatus(res);
}
if (pathInfoResp.getStatus().getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
TGetPathsSetTemplatesResp resp = new TGetPathsSetTemplatesResp();
resp.setStatus(RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS));
resp.setPathList(pathInfoResp.getPathList());
return resp;
} else {
return new TGetPathsSetTemplatesResp(pathInfoResp.getStatus());
}
}
  /**
   * Get all template set and pre-set info to sync to a registering DataNode. Templates that have
   * been pre-unset are excluded.
   */
public byte[] getAllTemplateSetInfo() {
try {
AllTemplateSetInfoResp resp =
(AllTemplateSetInfoResp) getConsensusManager().read(new GetAllTemplateSetInfoPlan());
return resp.getTemplateInfo();
} catch (ConsensusException e) {
LOGGER.warn(CONSENSUS_READ_ERROR, e);
return new byte[0];
}
}
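  /**
   * Get the template set info of the given path patterns through a consensus read.
   *
   * @return TemplateSetInfoResp, carrying only an error status if the consensus read fails
   */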
public TemplateSetInfoResp getTemplateSetInfo(List<PartialPath> patternList) {
try {
return (TemplateSetInfoResp)
getConsensusManager().read(new GetTemplateSetInfoPlan(patternList));
} catch (ConsensusException e) {
LOGGER.warn(CONSENSUS_READ_ERROR, e);
TSStatus res = new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode());
res.setMessage(e.getMessage());
TemplateSetInfoResp response = new TemplateSetInfoResp();
response.setStatus(res);
return response;
}
}
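  /**
   * Check whether the template with the given name exists and has been set on the given path.
   *
   * @return a Pair of (status, template); the template is non-null only when it is set on the path
   */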
public Pair<TSStatus, Template> checkIsTemplateSetOnPath(String templateName, String path) {
GetSchemaTemplatePlan getSchemaTemplatePlan = new GetSchemaTemplatePlan(templateName);
TemplateInfoResp templateResp;
try {
templateResp = (TemplateInfoResp) getConsensusManager().read(getSchemaTemplatePlan);
} catch (ConsensusException e) {
LOGGER.warn(CONSENSUS_READ_ERROR, e);
TSStatus res = new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode());
res.setMessage(e.getMessage());
templateResp = new TemplateInfoResp();
templateResp.setStatus(res);
}
if (templateResp.getStatus().getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
if (templateResp.getTemplateList() == null || templateResp.getTemplateList().isEmpty()) {
return new Pair<>(
RpcUtils.getStatus(
TSStatusCode.UNDEFINED_TEMPLATE.getStatusCode(),
String.format("Undefined template name: %s", templateName)),
null);
}
} else {
return new Pair<>(templateResp.getStatus(), null);
}
GetPathsSetTemplatePlan getPathsSetTemplatePlan =
new GetPathsSetTemplatePlan(templateName, SchemaConstant.ALL_MATCH_SCOPE);
PathInfoResp pathInfoResp;
try {
pathInfoResp = (PathInfoResp) getConsensusManager().read(getPathsSetTemplatePlan);
} catch (ConsensusException e) {
LOGGER.warn(CONSENSUS_READ_ERROR, e);
TSStatus res = new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode());
res.setMessage(e.getMessage());
pathInfoResp = new PathInfoResp();
pathInfoResp.setStatus(res);
}
if (pathInfoResp.getStatus().getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
List<String> templateSetPathList = pathInfoResp.getPathList();
if (templateSetPathList == null
|| templateSetPathList.isEmpty()
|| !pathInfoResp.getPathList().contains(path)) {
return new Pair<>(
RpcUtils.getStatus(
TSStatusCode.TEMPLATE_NOT_SET.getStatusCode(),
String.format("No template on %s", path)),
null);
} else {
return new Pair<>(templateResp.getStatus(), templateResp.getTemplateList().get(0));
}
} else {
return new Pair<>(pathInfoResp.getStatus(), null);
}
}
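  /** Mark the given template as pre-unset on the given path through the consensus layer. */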
public TSStatus preUnsetSchemaTemplate(int templateId, PartialPath path) {
try {
return getConsensusManager().write(new PreUnsetSchemaTemplatePlan(templateId, path));
} catch (ConsensusException e) {
LOGGER.warn(CONSENSUS_WRITE_ERROR, e);
TSStatus result = new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode());
result.setMessage(e.getMessage());
return result;
}
}
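  /** Roll back a previous pre-unset of the given template on the given path. */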
public TSStatus rollbackPreUnsetSchemaTemplate(int templateId, PartialPath path) {
try {
return getConsensusManager().write(new RollbackPreUnsetSchemaTemplatePlan(templateId, path));
} catch (ConsensusException e) {
LOGGER.warn(CONSENSUS_WRITE_ERROR, e);
TSStatus result = new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode());
result.setMessage(e.getMessage());
return result;
}
}
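  /** Finally unset the pre-unset (black-listed) template from the given path. */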
public TSStatus unsetSchemaTemplateInBlackList(
int templateId, PartialPath path, boolean isGeneratedByPipe) {
try {
return getConsensusManager()
.write(
isGeneratedByPipe
? new PipeEnrichedPlan(new UnsetSchemaTemplatePlan(templateId, path))
: new UnsetSchemaTemplatePlan(templateId, path));
} catch (ConsensusException e) {
LOGGER.warn(CONSENSUS_WRITE_ERROR, e);
TSStatus result = new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode());
result.setMessage(e.getMessage());
return result;
}
}
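  /**
   * Drop the specified schema template. The template must exist and must not be set on any path;
   * otherwise the corresponding error status is returned.
   */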
public synchronized TSStatus dropSchemaTemplate(String templateName) {
// check template existence
GetSchemaTemplatePlan getSchemaTemplatePlan = new GetSchemaTemplatePlan(templateName);
TemplateInfoResp templateInfoResp;
try {
templateInfoResp = (TemplateInfoResp) getConsensusManager().read(getSchemaTemplatePlan);
} catch (ConsensusException e) {
LOGGER.warn(CONSENSUS_READ_ERROR, e);
TSStatus res = new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode());
res.setMessage(e.getMessage());
templateInfoResp = new TemplateInfoResp();
templateInfoResp.setStatus(res);
}
if (templateInfoResp.getStatus().getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
return templateInfoResp.getStatus();
} else if (templateInfoResp.getTemplateList() == null
|| templateInfoResp.getTemplateList().isEmpty()) {
return RpcUtils.getStatus(
TSStatusCode.UNDEFINED_TEMPLATE.getStatusCode(),
String.format("Undefined template name: %s", templateName));
}
    // Check whether the template is still set on any path; a template that is set cannot be dropped
GetPathsSetTemplatePlan getPathsSetTemplatePlan =
new GetPathsSetTemplatePlan(templateName, SchemaConstant.ALL_MATCH_SCOPE);
PathInfoResp pathInfoResp;
try {
pathInfoResp = (PathInfoResp) getConsensusManager().read(getPathsSetTemplatePlan);
} catch (ConsensusException e) {
LOGGER.warn(CONSENSUS_READ_ERROR, e);
TSStatus res = new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode());
res.setMessage(e.getMessage());
pathInfoResp = new PathInfoResp();
pathInfoResp.setStatus(res);
}
if (pathInfoResp.getStatus().getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
return pathInfoResp.getStatus();
} else if (pathInfoResp.getPathList() != null && !pathInfoResp.getPathList().isEmpty()) {
return RpcUtils.getStatus(
TSStatusCode.METADATA_ERROR.getStatusCode(),
String.format(
"Template [%s] has been set on MTree, cannot be dropped now.", templateName));
}
// execute drop template
try {
return getConsensusManager().write(new DropSchemaTemplatePlan(templateName));
} catch (ConsensusException e) {
LOGGER.warn(CONSENSUS_WRITE_ERROR, e);
TSStatus result = new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode());
result.setMessage(e.getMessage());
return result;
}
}
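  /**
   * Extend the specified schema template with the given measurements and push the updated template
   * info to all registered DataNodes. Measurements that already exist in the template are reported
   * via {@link TSStatusCode#MEASUREMENT_ALREADY_EXISTS_IN_TEMPLATE}.
   */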
public synchronized TSStatus extendSchemaTemplate(
TemplateExtendInfo templateExtendInfo, boolean isGeneratedByPipe) {
if (templateExtendInfo.getEncodings() != null) {
for (int i = 0; i < templateExtendInfo.getDataTypes().size(); i++) {
try {
SchemaUtils.checkDataTypeWithEncoding(
templateExtendInfo.getDataTypes().get(i), templateExtendInfo.getEncodings().get(i));
} catch (MetadataException e) {
return RpcUtils.getStatus(e.getErrorCode(), e.getMessage());
}
}
}
TemplateInfoResp resp =
clusterSchemaInfo.getTemplate(
new GetSchemaTemplatePlan(templateExtendInfo.getTemplateName()));
if (resp.getStatus().getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
return resp.getStatus();
}
Template template = resp.getTemplateList().get(0);
List<String> intersectionMeasurements =
templateExtendInfo.updateAsDifferenceAndGetIntersection(template.getSchemaMap().keySet());
if (templateExtendInfo.isEmpty()) {
if (intersectionMeasurements.isEmpty()) {
return RpcUtils.SUCCESS_STATUS;
} else {
return RpcUtils.getStatus(
TSStatusCode.MEASUREMENT_ALREADY_EXISTS_IN_TEMPLATE,
String.format(
"Measurement %s already exist in schemaengine template %s",
intersectionMeasurements, template.getName()));
}
}
ExtendSchemaTemplatePlan extendSchemaTemplatePlan =
new ExtendSchemaTemplatePlan(templateExtendInfo);
TSStatus status;
try {
status =
getConsensusManager()
.write(
isGeneratedByPipe
? new PipeEnrichedPlan(extendSchemaTemplatePlan)
: extendSchemaTemplatePlan);
} catch (ConsensusException e) {
LOGGER.warn(CONSENSUS_WRITE_ERROR, e);
status = new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode());
status.setMessage(e.getMessage());
}
if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
return status;
}
template =
clusterSchemaInfo
.getTemplate(new GetSchemaTemplatePlan(templateExtendInfo.getTemplateName()))
.getTemplateList()
.get(0);
TUpdateTemplateReq updateTemplateReq = new TUpdateTemplateReq();
updateTemplateReq.setType(TemplateInternalRPCUpdateType.UPDATE_TEMPLATE_INFO.toByte());
updateTemplateReq.setTemplateInfo(
TemplateInternalRPCUtil.generateUpdateTemplateInfoBytes(template));
Map<Integer, TDataNodeLocation> dataNodeLocationMap =
configManager.getNodeManager().getRegisteredDataNodeLocations();
AsyncClientHandler<TUpdateTemplateReq, TSStatus> clientHandler =
new AsyncClientHandler<>(
DataNodeRequestType.UPDATE_TEMPLATE, updateTemplateReq, dataNodeLocationMap);
AsyncDataNodeClientPool.getInstance().sendAsyncRequestToDataNodeWithRetry(clientHandler);
Map<Integer, TSStatus> statusMap = clientHandler.getResponseMap();
for (Map.Entry<Integer, TSStatus> entry : statusMap.entrySet()) {
if (entry.getValue().getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
LOGGER.warn(
"Failed to sync template {} extension info to DataNode {}",
template.getName(),
dataNodeLocationMap.get(entry.getKey()));
return RpcUtils.getStatus(
TSStatusCode.EXECUTE_STATEMENT_ERROR,
String.format(
"Failed to sync template %s extension info to DataNode %s",
template.getName(), dataNodeLocationMap.get(entry.getKey())));
}
}
if (intersectionMeasurements.isEmpty()) {
return RpcUtils.SUCCESS_STATUS;
} else {
return RpcUtils.getStatus(
TSStatusCode.MEASUREMENT_ALREADY_EXISTS_IN_TEMPLATE,
String.format(
"Measurement %s already exist in schemaengine template %s",
intersectionMeasurements, template.getName()));
}
}
  /**
   * Only the leader uses this interface. Get the remaining schema quota of the cluster, aggregated
   * over all schema partitions.
   *
   * @return pair of <series quota remain, device quota remain>, -1 means no limit
   */
public Pair<Long, Long> getSchemaQuotaRemain() {
boolean isDeviceLimit = schemaQuotaStatistics.getDeviceThreshold() != -1;
boolean isSeriesLimit = schemaQuotaStatistics.getSeriesThreshold() != -1;
if (isSeriesLimit || isDeviceLimit) {
Set<Integer> schemaPartitionSet = getPartitionManager().getAllSchemaPartition();
return new Pair<>(
isSeriesLimit ? schemaQuotaStatistics.getSeriesQuotaRemain(schemaPartitionSet) : -1L,
isDeviceLimit ? schemaQuotaStatistics.getDeviceQuotaRemain(schemaPartitionSet) : -1L);
} else {
return new Pair<>(-1L, -1L);
}
}
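  /** Update the cached time series usage statistics used for schema quota checks. */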
public void updateTimeSeriesUsage(Map<Integer, Long> seriesUsage) {
schemaQuotaStatistics.updateTimeSeriesUsage(seriesUsage);
}
public void updateDeviceUsage(Map<Integer, Long> deviceUsage) {
schemaQuotaStatistics.updateDeviceUsage(deviceUsage);
}
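  /** Update the schema quota thresholds; -1 means the corresponding quota is unlimited. */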
public void updateSchemaQuotaConfiguration(long seriesThreshold, long deviceThreshold) {
schemaQuotaStatistics.setDeviceThreshold(deviceThreshold);
schemaQuotaStatistics.setSeriesThreshold(seriesThreshold);
}
public void clearSchemaQuotaCache() {
schemaQuotaStatistics.clear();
}
private NodeManager getNodeManager() {
return configManager.getNodeManager();
}
private PartitionManager getPartitionManager() {
return configManager.getPartitionManager();
}
private ConsensusManager getConsensusManager() {
return configManager.getConsensusManager();
}
}