[IOTDB-4919] add space quota snapshot and show space quota (#7968)
diff --git a/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4 b/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4 index 63b717c..f0296d5 100644 --- a/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4 +++ b/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4
@@ -45,6 +45,7 @@ | showFunctions | showTriggers | showContinuousQueries | showTTL | showAllTTL | showCluster | showClusterDetails | showRegion | showDataNodes | showConfigNodes | showSchemaTemplates | showNodesInSchemaTemplate | showPathsUsingSchemaTemplate | showPathsSetSchemaTemplate + | showSpaceQuota | countStorageGroup | countDevices | countTimeseries | countNodes | getRegionId | getTimeSlotList | getSeriesSlotList ; @@ -384,6 +385,11 @@ : SHOW PATHS prefixPath? USING SCHEMA? TEMPLATE templateName=identifier ; +// Show Space Quota +showSpaceQuota + : SHOW SPACE QUOTA (prefixPath (COMMA prefixPath)*)? + ; + // Count Storage Group countStorageGroup : COUNT (STORAGE GROUP | DATABASES) prefixPath?
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/client/DataNodeRequestType.java b/confignode/src/main/java/org/apache/iotdb/confignode/client/DataNodeRequestType.java index ff236ce..096b818 100644 --- a/confignode/src/main/java/org/apache/iotdb/confignode/client/DataNodeRequestType.java +++ b/confignode/src/main/java/org/apache/iotdb/confignode/client/DataNodeRequestType.java
@@ -85,5 +85,8 @@ CONSTRUCT_SCHEMA_BLACK_LIST_WITH_TEMPLATE, ROLLBACK_SCHEMA_BLACK_LIST_WITH_TEMPLATE, DEACTIVATE_TEMPLATE, - COUNT_PATHS_USING_TEMPLATE + COUNT_PATHS_USING_TEMPLATE, + + /** Quota */ + SET_SPACE_QUOTA }
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncDataNodeClientPool.java b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncDataNodeClientPool.java index bfd686a..a94565d 100644 --- a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncDataNodeClientPool.java +++ b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncDataNodeClientPool.java
@@ -21,6 +21,7 @@ import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation; import org.apache.iotdb.common.rpc.thrift.TEndPoint; import org.apache.iotdb.common.rpc.thrift.TFlushReq; +import org.apache.iotdb.common.rpc.thrift.TSetSpaceQuotaReq; import org.apache.iotdb.common.rpc.thrift.TSetTTLReq; import org.apache.iotdb.commons.client.ClientPoolFactory; import org.apache.iotdb.commons.client.IClientManager; @@ -316,6 +317,12 @@ (CountPathsUsingTemplateRPCHandler) clientHandler.createAsyncRPCHandler(requestId, targetDataNode)); break; + case SET_SPACE_QUOTA: + client.setSpaceQuota( + (TSetSpaceQuotaReq) clientHandler.getRequest(requestId), + (AsyncTSStatusRPCHandler) + clientHandler.createAsyncRPCHandler(requestId, targetDataNode)); + break; default: LOGGER.error( "Unexpected DataNode Request Type: {} when sendAsyncRequestToDataNode",
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java b/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java index 6240d49..c594782 100644 --- a/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java +++ b/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java
@@ -346,6 +346,11 @@ loadRatisConsensusConfig(properties); loadCQConfig(properties); + loadQuotaConfig(properties); + } + + private void loadQuotaConfig(Properties properties) { + conf.setSpaceQuotaDir(properties.getProperty("space_quota_dir", conf.getSpaceQuotaDir())); } private void loadRatisConsensusConfig(Properties properties) {
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/quota/SetSpaceQuotaPlan.java b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/quota/SetSpaceQuotaPlan.java index 0ec2f38..5a5ab0e 100644 --- a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/quota/SetSpaceQuotaPlan.java +++ b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/quota/SetSpaceQuotaPlan.java
@@ -67,7 +67,7 @@ BasicStructureSerDeUtil.write(prefixPathList, stream); BasicStructureSerDeUtil.write(spaceLimit.getDeviceNum(), stream); BasicStructureSerDeUtil.write(spaceLimit.getTimeserieNum(), stream); - BasicStructureSerDeUtil.write(spaceLimit.getDisk(), stream); + BasicStructureSerDeUtil.write(spaceLimit.getDiskSize(), stream); } @Override @@ -80,7 +80,7 @@ TSpaceQuota spaceLimit = new TSpaceQuota(); spaceLimit.setDeviceNum(deviceNum); spaceLimit.setTimeserieNum(timeserieNum); - spaceLimit.setDisk(disk); + spaceLimit.setDiskSize(disk); this.spaceLimit = spaceLimit; }
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ClusterQuotaManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ClusterQuotaManager.java index a7c4939..d1a15e9 100644 --- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ClusterQuotaManager.java +++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ClusterQuotaManager.java
@@ -18,16 +18,27 @@ */ package org.apache.iotdb.confignode.manager; +import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation; import org.apache.iotdb.common.rpc.thrift.TSStatus; +import org.apache.iotdb.common.rpc.thrift.TSetSpaceQuotaReq; +import org.apache.iotdb.common.rpc.thrift.TSpaceQuota; +import org.apache.iotdb.confignode.client.DataNodeRequestType; +import org.apache.iotdb.confignode.client.async.AsyncDataNodeClientPool; +import org.apache.iotdb.confignode.client.async.handlers.AsyncClientHandler; import org.apache.iotdb.confignode.consensus.request.write.quota.SetSpaceQuotaPlan; -import org.apache.iotdb.confignode.persistence.QuotaInfo; -import org.apache.iotdb.confignode.rpc.thrift.TSetSpaceQuotaReq; +import org.apache.iotdb.confignode.persistence.quota.QuotaInfo; +import org.apache.iotdb.confignode.rpc.thrift.TShowSpaceQuotaResp; import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse; +import org.apache.iotdb.rpc.RpcUtils; import org.apache.iotdb.rpc.TSStatusCode; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + // TODO: Manage quotas for storage groups public class ClusterQuotaManager { @@ -47,6 +58,14 @@ .getConsensusManager() .write(new SetSpaceQuotaPlan(req.getStorageGroup(), req.getSpaceLimit())); if (response.getStatus() != null) { + if (response.getStatus().getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) { + Map<Integer, TDataNodeLocation> dataNodeLocationMap = + configManager.getNodeManager().getRegisteredDataNodeLocations(); + AsyncClientHandler<TSetSpaceQuotaReq, TSStatus> clientHandler = + new AsyncClientHandler<>(DataNodeRequestType.SET_SPACE_QUOTA, req, dataNodeLocationMap); + AsyncDataNodeClientPool.getInstance().sendAsyncRequestToDataNodeWithRetry(clientHandler); + return RpcUtils.squashResponseStatusList(clientHandler.getResponseList()); + } return response.getStatus(); } else { LOGGER.warn( @@ -59,4 +78,21 @@ return res; } 
} + + public TShowSpaceQuotaResp showSpaceQuota(List<String> storageGroups) { + TShowSpaceQuotaResp showSpaceQuotaResp = new TShowSpaceQuotaResp(); + if (storageGroups.isEmpty()) { + showSpaceQuotaResp.setSpaceQuota(quotaInfo.getSpaceQuotaLimit()); + } else if (!quotaInfo.getSpaceQuotaLimit().isEmpty()) { + Map<String, TSpaceQuota> spaceQuotaMap = new HashMap<>(); + for (String storageGroup : storageGroups) { + if (quotaInfo.getSpaceQuotaLimit().containsKey(storageGroup)) { + spaceQuotaMap.put(storageGroup, quotaInfo.getSpaceQuotaLimit().get(storageGroup)); + } + } + showSpaceQuotaResp.setSpaceQuota(spaceQuotaMap); + } + showSpaceQuotaResp.setStatus(RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS)); + return showSpaceQuotaResp; + } }
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java index dd98b90..0daa51c 100644 --- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java +++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
@@ -27,6 +27,7 @@ import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet; import org.apache.iotdb.common.rpc.thrift.TSStatus; import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot; +import org.apache.iotdb.common.rpc.thrift.TSetSpaceQuotaReq; import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot; import org.apache.iotdb.commons.cluster.NodeStatus; import org.apache.iotdb.commons.conf.CommonDescriptor; @@ -82,13 +83,13 @@ import org.apache.iotdb.confignode.manager.partition.PartitionManager; import org.apache.iotdb.confignode.persistence.AuthorInfo; import org.apache.iotdb.confignode.persistence.ProcedureInfo; -import org.apache.iotdb.confignode.persistence.QuotaInfo; import org.apache.iotdb.confignode.persistence.TriggerInfo; import org.apache.iotdb.confignode.persistence.UDFInfo; import org.apache.iotdb.confignode.persistence.cq.CQInfo; import org.apache.iotdb.confignode.persistence.executor.ConfigPlanExecutor; import org.apache.iotdb.confignode.persistence.node.NodeInfo; import org.apache.iotdb.confignode.persistence.partition.PartitionInfo; +import org.apache.iotdb.confignode.persistence.quota.QuotaInfo; import org.apache.iotdb.confignode.persistence.schema.ClusterSchemaInfo; import org.apache.iotdb.confignode.persistence.sync.ClusterSyncInfo; import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRegisterReq; @@ -124,13 +125,13 @@ import org.apache.iotdb.confignode.rpc.thrift.TSchemaPartitionTableResp; import org.apache.iotdb.confignode.rpc.thrift.TSetDataNodeStatusReq; import org.apache.iotdb.confignode.rpc.thrift.TSetSchemaTemplateReq; -import org.apache.iotdb.confignode.rpc.thrift.TSetSpaceQuotaReq; import org.apache.iotdb.confignode.rpc.thrift.TShowCQResp; import org.apache.iotdb.confignode.rpc.thrift.TShowClusterResp; import org.apache.iotdb.confignode.rpc.thrift.TShowConfigNodesResp; import org.apache.iotdb.confignode.rpc.thrift.TShowDataNodesResp; import org.apache.iotdb.confignode.rpc.thrift.TShowPipeReq; import 
org.apache.iotdb.confignode.rpc.thrift.TShowPipeResp; +import org.apache.iotdb.confignode.rpc.thrift.TShowSpaceQuotaResp; import org.apache.iotdb.confignode.rpc.thrift.TShowStorageGroupResp; import org.apache.iotdb.confignode.rpc.thrift.TStorageGroupSchema; import org.apache.iotdb.confignode.rpc.thrift.TUnsetSchemaTemplateReq; @@ -1332,6 +1333,13 @@ : status; } + public TShowSpaceQuotaResp showSpaceQuotaResp(List<String> storageGroups) { + TSStatus status = confirmLeader(); + return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode() + ? clusterQuotaManager.showSpaceQuota(storageGroups) + : new TShowSpaceQuotaResp(status); + } + /** Get all related schemaRegion which may contains the timeSeries matched by given patternTree */ public Map<TConsensusGroupId, TRegionReplicaSet> getRelatedSchemaRegionGroup( PathPatternTree patternTree) {
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java index c71a18f..8344d68 100644 --- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java +++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java
@@ -22,6 +22,7 @@ import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation; import org.apache.iotdb.common.rpc.thrift.TFlushReq; import org.apache.iotdb.common.rpc.thrift.TSStatus; +import org.apache.iotdb.common.rpc.thrift.TSetSpaceQuotaReq; import org.apache.iotdb.commons.path.PartialPath; import org.apache.iotdb.commons.path.PathPatternTree; import org.apache.iotdb.confignode.consensus.request.auth.AuthorPlan; @@ -82,7 +83,6 @@ import org.apache.iotdb.confignode.rpc.thrift.TSchemaPartitionTableResp; import org.apache.iotdb.confignode.rpc.thrift.TSetDataNodeStatusReq; import org.apache.iotdb.confignode.rpc.thrift.TSetSchemaTemplateReq; -import org.apache.iotdb.confignode.rpc.thrift.TSetSpaceQuotaReq; import org.apache.iotdb.confignode.rpc.thrift.TShowCQResp; import org.apache.iotdb.confignode.rpc.thrift.TShowClusterResp; import org.apache.iotdb.confignode.rpc.thrift.TShowConfigNodesResp;
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/QuotaInfo.java b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/QuotaInfo.java deleted file mode 100644 index 026d11e..0000000 --- a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/QuotaInfo.java +++ /dev/null
@@ -1,67 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.iotdb.confignode.persistence; - -import org.apache.iotdb.common.rpc.thrift.TSStatus; -import org.apache.iotdb.common.rpc.thrift.TSpaceQuota; -import org.apache.iotdb.commons.snapshot.SnapshotProcessor; -import org.apache.iotdb.confignode.consensus.request.write.quota.SetSpaceQuotaPlan; -import org.apache.iotdb.rpc.RpcUtils; -import org.apache.iotdb.rpc.TSStatusCode; - -import org.apache.thrift.TException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.File; -import java.io.IOException; -import java.util.HashMap; -import java.util.Map; - -// TODO: Store quota information of each sg -public class QuotaInfo implements SnapshotProcessor { - - private static final Logger logger = LoggerFactory.getLogger(QuotaInfo.class); - private final Map<String, TSpaceQuota> spaceQuotaLimit; - private final Map<String, TSpaceQuota> useSpaceQuota; - private final Map<Integer, Integer> regionDisk; - - public QuotaInfo() { - spaceQuotaLimit = new HashMap<>(); - useSpaceQuota = new HashMap<>(); - regionDisk = new HashMap<>(); - } - - public TSStatus setSpaceQuota(SetSpaceQuotaPlan setSpaceQuotaPlan) { - for (String 
storageGroup : setSpaceQuotaPlan.getPrefixPathList()) { - spaceQuotaLimit.put(storageGroup, setSpaceQuotaPlan.getSpaceLimit()); - } - return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS); - } - - // TODO: add Snapshot - @Override - public boolean processTakeSnapshot(File snapshotDir) throws TException, IOException { - return false; - } - - @Override - public void processLoadSnapshot(File snapshotDir) throws TException, IOException {} -}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java index 886362b..df9fffc 100644 --- a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java +++ b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java
@@ -93,12 +93,12 @@ import org.apache.iotdb.confignode.exception.physical.UnknownPhysicalPlanTypeException; import org.apache.iotdb.confignode.persistence.AuthorInfo; import org.apache.iotdb.confignode.persistence.ProcedureInfo; -import org.apache.iotdb.confignode.persistence.QuotaInfo; import org.apache.iotdb.confignode.persistence.TriggerInfo; import org.apache.iotdb.confignode.persistence.UDFInfo; import org.apache.iotdb.confignode.persistence.cq.CQInfo; import org.apache.iotdb.confignode.persistence.node.NodeInfo; import org.apache.iotdb.confignode.persistence.partition.PartitionInfo; +import org.apache.iotdb.confignode.persistence.quota.QuotaInfo; import org.apache.iotdb.confignode.persistence.schema.ClusterSchemaInfo; import org.apache.iotdb.confignode.persistence.sync.ClusterSyncInfo; import org.apache.iotdb.confignode.rpc.thrift.TShowRegionReq;
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/quota/QuotaInfo.java b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/quota/QuotaInfo.java new file mode 100644 index 0000000..6f9d32a --- /dev/null +++ b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/quota/QuotaInfo.java
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.persistence.quota;
+
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.common.rpc.thrift.TSpaceQuota;
+import org.apache.iotdb.commons.snapshot.SnapshotProcessor;
+import org.apache.iotdb.confignode.consensus.request.write.quota.SetSpaceQuotaPlan;
+import org.apache.iotdb.rpc.RpcUtils;
+import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+/**
+ * Keeps the space quota configured for each storage group and can save/restore those limits
+ * through the {@link SnapshotProcessor} snapshot hooks.
+ */
+public class QuotaInfo implements SnapshotProcessor {
+
+  private static final Logger logger = LoggerFactory.getLogger(QuotaInfo.class);
+
+  private static final String SNAPSHOT_FILE_NAME = "quota_info.bin";
+
+  /** Guards {@link #spaceQuotaLimit} while it is updated, snapshotted or reloaded. */
+  private final ReentrantReadWriteLock spaceQuotaReadWriteLock;
+
+  private final Map<String, TSpaceQuota> spaceQuotaLimit;
+  private final Map<String, TSpaceQuota> useSpaceQuota;
+  private final Map<Integer, Integer> regionDisk;
+
+  public QuotaInfo() {
+    spaceQuotaReadWriteLock = new ReentrantReadWriteLock();
+    spaceQuotaLimit = new HashMap<>();
+    useSpaceQuota = new HashMap<>();
+    regionDisk = new HashMap<>();
+  }
+
+  /**
+   * Applies the requested space quota to every storage group listed in the plan.
+   *
+   * <p>A field value of {@code 0} means the user did not reset that quota type, so the value
+   * already stored for the storage group (if any) is kept. The merge happens on a fresh copy per
+   * storage group: the plan is never modified, and the old values of one storage group cannot
+   * leak into the next one.
+   *
+   * @param setSpaceQuotaPlan plan carrying the storage groups and the requested limits
+   * @return a {@code SUCCESS_STATUS} status
+   */
+  public TSStatus setSpaceQuota(SetSpaceQuotaPlan setSpaceQuotaPlan) {
+    TSpaceQuota requested = setSpaceQuotaPlan.getSpaceLimit();
+    spaceQuotaReadWriteLock.writeLock().lock();
+    try {
+      for (String storageGroup : setSpaceQuotaPlan.getPrefixPathList()) {
+        // Thrift copy constructor: each storage group gets its own TSpaceQuota instance.
+        TSpaceQuota merged = new TSpaceQuota(requested);
+        TSpaceQuota previous = spaceQuotaLimit.get(storageGroup);
+        if (previous != null) {
+          if (merged.getDeviceNum() == 0) {
+            merged.setDeviceNum(previous.getDeviceNum());
+          }
+          if (merged.getTimeserieNum() == 0) {
+            merged.setTimeserieNum(previous.getTimeserieNum());
+          }
+          if (merged.getDiskSize() == 0) {
+            merged.setDiskSize(previous.getDiskSize());
+          }
+        }
+        spaceQuotaLimit.put(storageGroup, merged);
+      }
+    } finally {
+      spaceQuotaReadWriteLock.writeLock().unlock();
+    }
+    return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
+  }
+
+  /** Returns the live, uncopied map from storage group to its configured space quota. */
+  public Map<String, TSpaceQuota> getSpaceQuotaLimit() {
+    return spaceQuotaLimit;
+  }
+
+  /**
+   * Writes every configured space quota to {@code quota_info.bin} inside {@code snapshotDir}.
+   *
+   * @param snapshotDir directory the snapshot file is created in
+   * @return {@code false} if the snapshot file already exists, {@code true} once it is written
+   */
+  @Override
+  public boolean processTakeSnapshot(File snapshotDir) throws TException, IOException {
+    File snapshotFile = new File(snapshotDir, SNAPSHOT_FILE_NAME);
+    if (snapshotFile.exists() && snapshotFile.isFile()) {
+      logger.error(
+          "Failed to take snapshot, because snapshot file [{}] is already exist.",
+          snapshotFile.getAbsolutePath());
+      return false;
+    }
+
+    // Serialization only reads the map, so the shared lock is enough.
+    spaceQuotaReadWriteLock.readLock().lock();
+    try (FileOutputStream fileOutputStream = new FileOutputStream(snapshotFile)) {
+      serializeSpaceQuotaLimit(fileOutputStream);
+    } finally {
+      spaceQuotaReadWriteLock.readLock().unlock();
+    }
+    return true;
+  }
+
+  /** Layout: entry count, then per entry: path, deviceNum, timeserieNum, diskSize. */
+  private void serializeSpaceQuotaLimit(FileOutputStream fileOutputStream) throws IOException {
+    ReadWriteIOUtils.write(spaceQuotaLimit.size(), fileOutputStream);
+    for (Map.Entry<String, TSpaceQuota> spaceQuotaEntry : spaceQuotaLimit.entrySet()) {
+      TSpaceQuota spaceQuota = spaceQuotaEntry.getValue();
+      ReadWriteIOUtils.write(spaceQuotaEntry.getKey(), fileOutputStream);
+      ReadWriteIOUtils.write(spaceQuota.getDeviceNum(), fileOutputStream);
+      ReadWriteIOUtils.write(spaceQuota.getTimeserieNum(), fileOutputStream);
+      ReadWriteIOUtils.write(spaceQuota.getDiskSize(), fileOutputStream);
+    }
+  }
+
+  /**
+   * Replaces the in-memory quotas with the content of {@code quota_info.bin} found in the given
+   * directory; logs an error and leaves the current state untouched if that file is missing.
+   */
+  @Override
+  public void processLoadSnapshot(File snapshotDir) throws TException, IOException {
+    File snapshotFile = new File(snapshotDir, SNAPSHOT_FILE_NAME);
+    if (!snapshotFile.exists() || !snapshotFile.isFile()) {
+      logger.error(
+          "Failed to load snapshot,snapshot file [{}] is not exist.",
+          snapshotFile.getAbsolutePath());
+      return;
+    }
+    spaceQuotaReadWriteLock.writeLock().lock();
+    try (FileInputStream fileInputStream = new FileInputStream(snapshotFile)) {
+      clear();
+      deserializeSpaceQuotaLimit(fileInputStream);
+    } finally {
+      spaceQuotaReadWriteLock.writeLock().unlock();
+    }
+  }
+
+  /** Reads the layout written by {@link #serializeSpaceQuotaLimit(FileOutputStream)}. */
+  private void deserializeSpaceQuotaLimit(FileInputStream fileInputStream) throws IOException {
+    int size = ReadWriteIOUtils.readInt(fileInputStream);
+    for (int i = 0; i < size; i++) {
+      String path = ReadWriteIOUtils.readString(fileInputStream);
+      TSpaceQuota spaceQuota = new TSpaceQuota();
+      spaceQuota.setDeviceNum(ReadWriteIOUtils.readInt(fileInputStream));
+      spaceQuota.setTimeserieNum(ReadWriteIOUtils.readInt(fileInputStream));
+      spaceQuota.setDiskSize(ReadWriteIOUtils.readLong(fileInputStream));
+      spaceQuotaLimit.put(path, spaceQuota);
+    }
+  }
+
+  /** Removes every configured space quota limit. */
+  public void clear() {
+    spaceQuotaLimit.clear();
+  }
+}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java b/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java index 1da0aac..a4196a4 100644 --- a/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java +++ b/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
@@ -22,6 +22,7 @@ import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType; import org.apache.iotdb.common.rpc.thrift.TFlushReq; import org.apache.iotdb.common.rpc.thrift.TSStatus; +import org.apache.iotdb.common.rpc.thrift.TSetSpaceQuotaReq; import org.apache.iotdb.common.rpc.thrift.TSetTTLReq; import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot; import org.apache.iotdb.commons.auth.AuthException; @@ -124,7 +125,6 @@ import org.apache.iotdb.confignode.rpc.thrift.TSetDataReplicationFactorReq; import org.apache.iotdb.confignode.rpc.thrift.TSetSchemaReplicationFactorReq; import org.apache.iotdb.confignode.rpc.thrift.TSetSchemaTemplateReq; -import org.apache.iotdb.confignode.rpc.thrift.TSetSpaceQuotaReq; import org.apache.iotdb.confignode.rpc.thrift.TSetStorageGroupReq; import org.apache.iotdb.confignode.rpc.thrift.TSetTimePartitionIntervalReq; import org.apache.iotdb.confignode.rpc.thrift.TShowCQResp; @@ -135,6 +135,7 @@ import org.apache.iotdb.confignode.rpc.thrift.TShowPipeResp; import org.apache.iotdb.confignode.rpc.thrift.TShowRegionReq; import org.apache.iotdb.confignode.rpc.thrift.TShowRegionResp; +import org.apache.iotdb.confignode.rpc.thrift.TShowSpaceQuotaResp; import org.apache.iotdb.confignode.rpc.thrift.TShowStorageGroupResp; import org.apache.iotdb.confignode.rpc.thrift.TStorageGroupSchema; import org.apache.iotdb.confignode.rpc.thrift.TStorageGroupSchemaResp; @@ -779,4 +780,9 @@ public TSStatus setSpaceQuota(TSetSpaceQuotaReq req) throws TException { return configManager.setSpaceQuota(req); } + + @Override + public TShowSpaceQuotaResp showSpaceQuota(List<String> storageGroups) throws TException { + return configManager.showSpaceQuotaResp(storageGroups); + } }
diff --git a/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanSerDeTest.java b/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanSerDeTest.java index 3bfc22e..7234469 100644 --- a/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanSerDeTest.java +++ b/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanSerDeTest.java
@@ -1312,7 +1312,7 @@ TSpaceQuota spaceQuota = new TSpaceQuota(); spaceQuota.setDeviceNum(2); spaceQuota.setTimeserieNum(3); - spaceQuota.setDisk(1024); + spaceQuota.setDiskSize(1024); SetSpaceQuotaPlan plan = new SetSpaceQuotaPlan(Collections.singletonList("root.sg"), spaceQuota); SetSpaceQuotaPlan deserializedPlan =
diff --git a/confignode/src/test/java/org/apache/iotdb/confignode/persistence/QuotaInfoTest.java b/confignode/src/test/java/org/apache/iotdb/confignode/persistence/QuotaInfoTest.java new file mode 100644 index 0000000..075237e --- /dev/null +++ b/confignode/src/test/java/org/apache/iotdb/confignode/persistence/QuotaInfoTest.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.persistence;
+
+import org.apache.iotdb.common.rpc.thrift.TSpaceQuota;
+import org.apache.iotdb.confignode.consensus.request.write.quota.SetSpaceQuotaPlan;
+import org.apache.iotdb.confignode.persistence.quota.QuotaInfo;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.thrift.TException;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.iotdb.db.constant.TestConstant.BASE_OUTPUT_PATH;
+
+/** Tests the snapshot round trip and the quota merge rule of {@link QuotaInfo}. */
+public class QuotaInfoTest {
+
+  private QuotaInfo quotaInfo;
+  private static final File snapshotDir = new File(BASE_OUTPUT_PATH, "snapshot");
+
+  @Before
+  public void setup() throws IOException {
+    quotaInfo = new QuotaInfo();
+    if (!snapshotDir.exists()) {
+      Assert.assertTrue(snapshotDir.mkdirs());
+    }
+  }
+
+  @After
+  public void cleanup() throws IOException {
+    if (snapshotDir.exists()) {
+      FileUtils.deleteDirectory(snapshotDir);
+    }
+  }
+
+  /** Gives root.sg and root.ln the quota: 100 devices, 10000 timeseries, disk size 512. */
+  private void prepareQuotaInfo() {
+    List<String> prefixPathList = new ArrayList<>();
+    prefixPathList.add("root.sg");
+    prefixPathList.add("root.ln");
+    TSpaceQuota spaceQuota = new TSpaceQuota();
+    spaceQuota.setTimeserieNum(10000);
+    spaceQuota.setDeviceNum(100);
+    spaceQuota.setDiskSize(512);
+    SetSpaceQuotaPlan setSpaceQuotaPlan = new SetSpaceQuotaPlan(prefixPathList, spaceQuota);
+    quotaInfo.setSpaceQuota(setSpaceQuotaPlan);
+  }
+
+  @Test
+  public void testSnapshot() throws TException, IOException {
+    prepareQuotaInfo();
+
+    // A failed snapshot must fail here, not show up later as a confusing map mismatch.
+    Assert.assertTrue(quotaInfo.processTakeSnapshot(snapshotDir));
+    QuotaInfo quotaInfo2 = new QuotaInfo();
+    quotaInfo2.processLoadSnapshot(snapshotDir);
+
+    Assert.assertEquals(2, quotaInfo2.getSpaceQuotaLimit().size());
+    Assert.assertEquals(quotaInfo.getSpaceQuotaLimit(), quotaInfo2.getSpaceQuotaLimit());
+  }
+
+  @Test
+  public void testZeroKeepsPreviousValue() {
+    prepareQuotaInfo();
+
+    // Only the disk size is given; device and timeseries numbers stay 0, i.e. "not reset".
+    TSpaceQuota diskOnly = new TSpaceQuota();
+    diskOnly.setDiskSize(1024);
+    List<String> prefixPathList = new ArrayList<>();
+    prefixPathList.add("root.sg");
+    quotaInfo.setSpaceQuota(new SetSpaceQuotaPlan(prefixPathList, diskOnly));
+
+    TSpaceQuota stored = quotaInfo.getSpaceQuotaLimit().get("root.sg");
+    Assert.assertEquals(100, stored.getDeviceNum());
+    Assert.assertEquals(10000, stored.getTimeserieNum());
+    Assert.assertEquals(1024, stored.getDiskSize());
+  }
+}
diff --git a/confignode/src/test/resources/confignode1conf/iotdb-common.properties b/confignode/src/test/resources/confignode1conf/iotdb-common.properties index caf36cc..442e767 100644 --- a/confignode/src/test/resources/confignode1conf/iotdb-common.properties +++ b/confignode/src/test/resources/confignode1conf/iotdb-common.properties
@@ -24,4 +24,6 @@ data_replication_factor=3 udf_lib_dir=target/confignode1/ext/udf trigger_lib_dir=target/confignode1/ext/trigger -config_node_ratis_log_appender_buffer_size_max = 14194304 \ No newline at end of file +config_node_ratis_log_appender_buffer_size_max = 14194304 +space_quota_dir=target/confignode1/system/quota/space +
diff --git a/confignode/src/test/resources/confignode2conf/iotdb-common.properties b/confignode/src/test/resources/confignode2conf/iotdb-common.properties index 2f25f57..a001002 100644 --- a/confignode/src/test/resources/confignode2conf/iotdb-common.properties +++ b/confignode/src/test/resources/confignode2conf/iotdb-common.properties
@@ -24,4 +24,6 @@ data_replication_factor=3 udf_lib_dir=target/confignode2/ext/udf trigger_lib_dir=target/confignode2/ext/trigger -config_node_ratis_log_appender_buffer_size_max = 14194304 \ No newline at end of file +config_node_ratis_log_appender_buffer_size_max = 14194304 +space_quota_dir=target/confignode2/system/quota/space +
diff --git a/confignode/src/test/resources/confignode3conf/iotdb-common.properties b/confignode/src/test/resources/confignode3conf/iotdb-common.properties index 208d039..fe9eb1d 100644 --- a/confignode/src/test/resources/confignode3conf/iotdb-common.properties +++ b/confignode/src/test/resources/confignode3conf/iotdb-common.properties
@@ -24,4 +24,5 @@ data_replication_factor=3 udf_lib_dir=target/confignode3/ext/udf trigger_lib_dir=target/confignode3/ext/trigger -config_node_ratis_log_appender_buffer_size_max = 14194304 \ No newline at end of file +config_node_ratis_log_appender_buffer_size_max = 14194304 +space_quota_dir=target/confignode3/system/quota/space
diff --git a/node-commons/src/assembly/resources/conf/iotdb-common.properties b/node-commons/src/assembly/resources/conf/iotdb-common.properties index f41b0e6..b32508a 100644 --- a/node-commons/src/assembly/resources/conf/iotdb-common.properties +++ b/node-commons/src/assembly/resources/conf/iotdb-common.properties
@@ -1111,3 +1111,22 @@ # If enabled, we can set different quotas for storage groups # Datatype: boolean # quota_enable=false + +# Space quota dir +# If this property is unset, the system will save the data in the default relative path directory under +# the space quota folder (i.e., %CONFIGNODE_HOME%/system/quota/space). +# +# If it is absolute, the system will save the data in the exact location it points to. +# If it is relative, the system will save the data in the relative path directory it indicates under the +# space quota folder. +# Note: If space_quota_dir is assigned an empty string (i.e., zero-size), it will be handled as a relative +# path. +# +# For Windows platform +# If its prefix is a drive specifier followed by "\\", or if its prefix is "\\\\", then the path is +# absolute. Otherwise, it is relative. +# space_quota_dir=system\\quota\\space +# +# For Linux platform +# If its prefix is "/", then the path is absolute. Otherwise, it is relative. +# space_quota_dir=system/quota/space \ No newline at end of file
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/conf/IoTDBConstant.java b/node-commons/src/main/java/org/apache/iotdb/commons/conf/IoTDBConstant.java index f485208..da3ad97 100644 --- a/node-commons/src/main/java/org/apache/iotdb/commons/conf/IoTDBConstant.java +++ b/node-commons/src/main/java/org/apache/iotdb/commons/conf/IoTDBConstant.java
@@ -257,6 +257,7 @@ // quota public static final String SPACE_QUOTA_DISK = "disk"; + public static final String SPACE_QUOTA_UNLIMITED = "unlimited"; // client version number public enum ClientVersion {
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/enums/SpaceQuotaType.java b/node-commons/src/main/java/org/apache/iotdb/commons/enums/SpaceQuotaType.java index 36b23e4..bfbb656 100644 --- a/node-commons/src/main/java/org/apache/iotdb/commons/enums/SpaceQuotaType.java +++ b/node-commons/src/main/java/org/apache/iotdb/commons/enums/SpaceQuotaType.java
@@ -20,7 +20,7 @@ package org.apache.iotdb.commons.enums; public enum SpaceQuotaType { - DISK, - DEVICE_NUMBER, - TIMESERIES_NUMBER + diskSize, + deviceNum, + timeSeriesNum }
diff --git a/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClient.java b/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClient.java index 463afad..b35d51c 100644 --- a/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClient.java +++ b/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClient.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.common.rpc.thrift.TEndPoint; import org.apache.iotdb.common.rpc.thrift.TFlushReq; import org.apache.iotdb.common.rpc.thrift.TSStatus; +import org.apache.iotdb.common.rpc.thrift.TSetSpaceQuotaReq; import org.apache.iotdb.common.rpc.thrift.TSetTTLReq; import org.apache.iotdb.commons.client.BaseClientFactory; import org.apache.iotdb.commons.client.ClientFactoryProperty; @@ -91,7 +92,6 @@ import org.apache.iotdb.confignode.rpc.thrift.TSetDataReplicationFactorReq; import org.apache.iotdb.confignode.rpc.thrift.TSetSchemaReplicationFactorReq; import org.apache.iotdb.confignode.rpc.thrift.TSetSchemaTemplateReq; -import org.apache.iotdb.confignode.rpc.thrift.TSetSpaceQuotaReq; import org.apache.iotdb.confignode.rpc.thrift.TSetStorageGroupReq; import org.apache.iotdb.confignode.rpc.thrift.TSetTimePartitionIntervalReq; import org.apache.iotdb.confignode.rpc.thrift.TShowCQResp; @@ -102,6 +102,7 @@ import org.apache.iotdb.confignode.rpc.thrift.TShowPipeResp; import org.apache.iotdb.confignode.rpc.thrift.TShowRegionReq; import org.apache.iotdb.confignode.rpc.thrift.TShowRegionResp; +import org.apache.iotdb.confignode.rpc.thrift.TShowSpaceQuotaResp; import org.apache.iotdb.confignode.rpc.thrift.TShowStorageGroupResp; import org.apache.iotdb.confignode.rpc.thrift.TStorageGroupSchemaResp; import org.apache.iotdb.confignode.rpc.thrift.TUnsetSchemaTemplateReq; @@ -1753,6 +1754,22 @@ throw new TException(MSG_RECONNECTION_FAIL); } + @Override + public TShowSpaceQuotaResp showSpaceQuota(List<String> storageGroups) throws TException { + for (int i = 0; i < RETRY_NUM; i++) { + try { + TShowSpaceQuotaResp resp = client.showSpaceQuota(storageGroups); + if (!updateConfigNodeLeader(resp.getStatus())) { + return resp; + } + } catch (TException e) { + configLeader = null; + } + reconnect(); + } + throw new TException(MSG_RECONNECTION_FAIL); + } + public static class Factory extends BaseClientFactory<ConfigNodeRegionId, ConfigNodeClient> { public 
Factory( ClientManager<ConfigNodeRegionId, ConfigNodeClient> clientManager,
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/common/header/ColumnHeaderConstant.java b/server/src/main/java/org/apache/iotdb/db/mpp/common/header/ColumnHeaderConstant.java index 1d93103..ba7f10f 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/common/header/ColumnHeaderConstant.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/common/header/ColumnHeaderConstant.java
@@ -123,6 +123,12 @@ public static final String CQID = "CQId"; public static final String QUERY = "Query"; + // column names for show space quota + public static final String COLUMN_DATABASE = "database"; + public static final String COLUMN_QUOTA_TYPE = "quotaType"; + public static final String COLUMN_LIMIT = "limit"; + public static final String COLUMN_USED = "used"; + public static final List<ColumnHeader> lastQueryColumnHeaders = ImmutableList.of( new ColumnHeader(TIMESERIES, TSDataType.TEXT), @@ -325,4 +331,10 @@ new ColumnHeader(CQID, TSDataType.TEXT), new ColumnHeader(QUERY, TSDataType.TEXT), new ColumnHeader(STATE, TSDataType.TEXT)); + + public static final List<ColumnHeader> showSpaceQuotaColumnHeaders = + ImmutableList.of( + new ColumnHeader(COLUMN_DATABASE, TSDataType.TEXT), + new ColumnHeader(COLUMN_QUOTA_TYPE, TSDataType.TEXT), + new ColumnHeader(COLUMN_LIMIT, TSDataType.TEXT)); }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/common/header/DatasetHeaderFactory.java b/server/src/main/java/org/apache/iotdb/db/mpp/common/header/DatasetHeaderFactory.java index 00d4f7a..f3343f0 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/common/header/DatasetHeaderFactory.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/common/header/DatasetHeaderFactory.java
@@ -154,4 +154,8 @@ public static DatasetHeader getShowContinuousQueriesHeader() { return new DatasetHeader(ColumnHeaderConstant.showContinuousQueriesColumnHeaders, true); } + + public static DatasetHeader getShowSpaceQuotaHeader() { + return new DatasetHeader(ColumnHeaderConstant.showSpaceQuotaColumnHeaders, true); + } }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/constant/StatementType.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/constant/StatementType.java index 7e926f1..c179719 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/constant/StatementType.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/constant/StatementType.java
@@ -152,4 +152,5 @@ DEACTIVATE_TEMPLATE, SET_SPACE_QUOTA, + SHOW_SPACE_QUOTA, }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/ConfigTaskVisitor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/ConfigTaskVisitor.java index 43c8610..e11fc1f 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/ConfigTaskVisitor.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/ConfigTaskVisitor.java
@@ -59,6 +59,7 @@ import org.apache.iotdb.db.mpp.plan.execution.config.sys.MergeTask; import org.apache.iotdb.db.mpp.plan.execution.config.sys.SetSystemStatusTask; import org.apache.iotdb.db.mpp.plan.execution.config.sys.quota.SetSpaceQuotaTask; +import org.apache.iotdb.db.mpp.plan.execution.config.sys.quota.ShowSpaceQuotaTask; import org.apache.iotdb.db.mpp.plan.execution.config.sys.sync.CreatePipeSinkTask; import org.apache.iotdb.db.mpp.plan.execution.config.sys.sync.CreatePipeTask; import org.apache.iotdb.db.mpp.plan.execution.config.sys.sync.DropPipeSinkTask; @@ -109,6 +110,7 @@ import org.apache.iotdb.db.mpp.plan.statement.sys.MergeStatement; import org.apache.iotdb.db.mpp.plan.statement.sys.SetSystemStatusStatement; import org.apache.iotdb.db.mpp.plan.statement.sys.quota.SetSpaceQuotaStatement; +import org.apache.iotdb.db.mpp.plan.statement.sys.quota.ShowSpaceQuotaStatement; import org.apache.iotdb.db.mpp.plan.statement.sys.sync.CreatePipeSinkStatement; import org.apache.iotdb.db.mpp.plan.statement.sys.sync.CreatePipeStatement; import org.apache.iotdb.db.mpp.plan.statement.sys.sync.DropPipeSinkStatement; @@ -307,6 +309,12 @@ } @Override + public IConfigTask visitShowSpaceQuota( + ShowSpaceQuotaStatement showSpaceQuotaStatement, TaskContext context) { + return new ShowSpaceQuotaTask(showSpaceQuotaStatement); + } + + @Override public IConfigTask visitShowDataNodes( ShowDataNodesStatement showDataNodesStatement, TaskContext context) { return new ShowDataNodesTask(showDataNodesStatement);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/ClusterConfigTaskExecutor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/ClusterConfigTaskExecutor.java index 66c720b..0431c8c 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/ClusterConfigTaskExecutor.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/ClusterConfigTaskExecutor.java
@@ -21,6 +21,7 @@ import org.apache.iotdb.common.rpc.thrift.TFlushReq; import org.apache.iotdb.common.rpc.thrift.TSStatus; +import org.apache.iotdb.common.rpc.thrift.TSetSpaceQuotaReq; import org.apache.iotdb.common.rpc.thrift.TSetTTLReq; import org.apache.iotdb.common.rpc.thrift.TSpaceQuota; import org.apache.iotdb.commons.client.IClientManager; @@ -58,7 +59,6 @@ import org.apache.iotdb.confignode.rpc.thrift.TGetTriggerTableResp; import org.apache.iotdb.confignode.rpc.thrift.TGetUDFTableResp; import org.apache.iotdb.confignode.rpc.thrift.TPipeSinkInfo; -import org.apache.iotdb.confignode.rpc.thrift.TSetSpaceQuotaReq; import org.apache.iotdb.confignode.rpc.thrift.TSetStorageGroupReq; import org.apache.iotdb.confignode.rpc.thrift.TShowCQResp; import org.apache.iotdb.confignode.rpc.thrift.TShowClusterResp; @@ -69,6 +69,7 @@ import org.apache.iotdb.confignode.rpc.thrift.TShowPipeResp; import org.apache.iotdb.confignode.rpc.thrift.TShowRegionReq; import org.apache.iotdb.confignode.rpc.thrift.TShowRegionResp; +import org.apache.iotdb.confignode.rpc.thrift.TShowSpaceQuotaResp; import org.apache.iotdb.confignode.rpc.thrift.TShowStorageGroupResp; import org.apache.iotdb.confignode.rpc.thrift.TStorageGroupSchema; import org.apache.iotdb.confignode.rpc.thrift.TStorageGroupSchemaResp; @@ -99,6 +100,7 @@ import org.apache.iotdb.db.mpp.plan.execution.config.metadata.template.ShowNodesInSchemaTemplateTask; import org.apache.iotdb.db.mpp.plan.execution.config.metadata.template.ShowPathSetTemplateTask; import org.apache.iotdb.db.mpp.plan.execution.config.metadata.template.ShowSchemaTemplateTask; +import org.apache.iotdb.db.mpp.plan.execution.config.sys.quota.ShowSpaceQuotaTask; import org.apache.iotdb.db.mpp.plan.execution.config.sys.sync.ShowPipeSinkTask; import org.apache.iotdb.db.mpp.plan.execution.config.sys.sync.ShowPipeTask; import org.apache.iotdb.db.mpp.plan.statement.metadata.CountStorageGroupStatement; @@ -126,6 +128,7 @@ import 
org.apache.iotdb.db.mpp.plan.statement.metadata.template.ShowSchemaTemplateStatement; import org.apache.iotdb.db.mpp.plan.statement.metadata.template.UnsetSchemaTemplateStatement; import org.apache.iotdb.db.mpp.plan.statement.sys.quota.SetSpaceQuotaStatement; +import org.apache.iotdb.db.mpp.plan.statement.sys.quota.ShowSpaceQuotaStatement; import org.apache.iotdb.db.mpp.plan.statement.sys.sync.CreatePipeSinkStatement; import org.apache.iotdb.db.mpp.plan.statement.sys.sync.CreatePipeStatement; import org.apache.iotdb.db.mpp.plan.statement.sys.sync.DropPipeSinkStatement; @@ -161,6 +164,7 @@ import java.nio.ByteBuffer; import java.nio.file.Files; import java.nio.file.Paths; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.HashMap; @@ -1096,10 +1100,10 @@ TSpaceQuota spaceQuota = new TSpaceQuota(); spaceQuota.setDeviceNum(setSpaceQuotaStatement.getDeviceNum()); spaceQuota.setTimeserieNum(setSpaceQuotaStatement.getTimeSeriesNum()); - spaceQuota.setDisk(setSpaceQuotaStatement.getDiskSize()); + spaceQuota.setDiskSize(setSpaceQuotaStatement.getDiskSize()); req.setSpaceLimit(spaceQuota); try (ConfigNodeClient client = - CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.partitionRegionId)) { + CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.configNodeRegionId)) { // Send request to some API server tsStatus = client.setSpaceQuota(req); } catch (IOException | TException e) { @@ -1114,6 +1118,29 @@ } @Override + public SettableFuture<ConfigTaskResult> showSpaceQuota( + ShowSpaceQuotaStatement showSpaceQuotaStatement) { + SettableFuture<ConfigTaskResult> future = SettableFuture.create(); + + try (ConfigNodeClient configNodeClient = + CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.configNodeRegionId)) { + List<String> storageGroups = new ArrayList<>(); + if (showSpaceQuotaStatement.getStorageGroups() != null) { + showSpaceQuotaStatement + .getStorageGroups() + .forEach(storageGroup -> 
storageGroups.add(storageGroup.toString())); + } + // Send request to some API server + TShowSpaceQuotaResp showSpaceQuotaResp = configNodeClient.showSpaceQuota(storageGroups); + // build TSBlock + ShowSpaceQuotaTask.buildTSBlock(showSpaceQuotaResp, future); + } catch (Exception e) { + future.setException(e); + } + return future; + } + + @Override public SettableFuture<ConfigTaskResult> createPipeSink( CreatePipeSinkStatement createPipeSinkStatement) { SettableFuture<ConfigTaskResult> future = SettableFuture.create();
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/IConfigTaskExecutor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/IConfigTaskExecutor.java index 7b2ffa5..f182a77 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/IConfigTaskExecutor.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/IConfigTaskExecutor.java
@@ -47,6 +47,7 @@ import org.apache.iotdb.db.mpp.plan.statement.metadata.template.ShowSchemaTemplateStatement; import org.apache.iotdb.db.mpp.plan.statement.metadata.template.UnsetSchemaTemplateStatement; import org.apache.iotdb.db.mpp.plan.statement.sys.quota.SetSpaceQuotaStatement; +import org.apache.iotdb.db.mpp.plan.statement.sys.quota.ShowSpaceQuotaStatement; import org.apache.iotdb.db.mpp.plan.statement.sys.sync.CreatePipeSinkStatement; import org.apache.iotdb.db.mpp.plan.statement.sys.sync.CreatePipeStatement; import org.apache.iotdb.db.mpp.plan.statement.sys.sync.DropPipeSinkStatement; @@ -132,6 +133,8 @@ SettableFuture<ConfigTaskResult> setSpaceQuota(SetSpaceQuotaStatement setSpaceQuotaStatement); + SettableFuture<ConfigTaskResult> showSpaceQuota(ShowSpaceQuotaStatement showSpaceQuotaStatement); + SettableFuture<ConfigTaskResult> createPipeSink(CreatePipeSinkStatement createPipeSinkStatement); SettableFuture<ConfigTaskResult> dropPipeSink(DropPipeSinkStatement dropPipeSinkStatement);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/StandaloneConfigTaskExecutor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/StandaloneConfigTaskExecutor.java index 38f2eba..8acee47 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/StandaloneConfigTaskExecutor.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/StandaloneConfigTaskExecutor.java
@@ -65,6 +65,7 @@ import org.apache.iotdb.db.mpp.plan.statement.metadata.template.ShowSchemaTemplateStatement; import org.apache.iotdb.db.mpp.plan.statement.metadata.template.UnsetSchemaTemplateStatement; import org.apache.iotdb.db.mpp.plan.statement.sys.quota.SetSpaceQuotaStatement; +import org.apache.iotdb.db.mpp.plan.statement.sys.quota.ShowSpaceQuotaStatement; import org.apache.iotdb.db.mpp.plan.statement.sys.sync.CreatePipeSinkStatement; import org.apache.iotdb.db.mpp.plan.statement.sys.sync.CreatePipeStatement; import org.apache.iotdb.db.mpp.plan.statement.sys.sync.DropPipeSinkStatement; @@ -535,7 +536,18 @@ SettableFuture<ConfigTaskResult> future = SettableFuture.create(); future.setException( new IoTDBException( - "Executing unset schema template is not supported", + "Executing set space quota is not supported", + TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode())); + return future; + } + + @Override + public SettableFuture<ConfigTaskResult> showSpaceQuota( + ShowSpaceQuotaStatement showSpaceQuotaStatement) { + SettableFuture<ConfigTaskResult> future = SettableFuture.create(); + future.setException( + new IoTDBException( + "Executing show space quota is not supported", TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode())); return future; }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/sys/quota/ShowSpaceQuotaTask.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/sys/quota/ShowSpaceQuotaTask.java new file mode 100644 index 0000000..7d77972 --- /dev/null +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/sys/quota/ShowSpaceQuotaTask.java
@@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.mpp.plan.execution.config.sys.quota; + +import org.apache.iotdb.common.rpc.thrift.TSpaceQuota; +import org.apache.iotdb.commons.enums.SpaceQuotaType; +import org.apache.iotdb.confignode.rpc.thrift.TShowSpaceQuotaResp; +import org.apache.iotdb.db.mpp.common.header.ColumnHeader; +import org.apache.iotdb.db.mpp.common.header.ColumnHeaderConstant; +import org.apache.iotdb.db.mpp.common.header.DatasetHeader; +import org.apache.iotdb.db.mpp.common.header.DatasetHeaderFactory; +import org.apache.iotdb.db.mpp.plan.execution.config.ConfigTaskResult; +import org.apache.iotdb.db.mpp.plan.execution.config.IConfigTask; +import org.apache.iotdb.db.mpp.plan.execution.config.executor.IConfigTaskExecutor; +import org.apache.iotdb.db.mpp.plan.statement.sys.quota.ShowSpaceQuotaStatement; +import org.apache.iotdb.rpc.TSStatusCode; +import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; +import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder; +import org.apache.iotdb.tsfile.utils.Binary; + +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.SettableFuture; + +import 
java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +public class ShowSpaceQuotaTask implements IConfigTask { + + private final ShowSpaceQuotaStatement showSpaceQuotaStatement; + + public ShowSpaceQuotaTask(ShowSpaceQuotaStatement showSpaceQuotaStatement) { + this.showSpaceQuotaStatement = showSpaceQuotaStatement; + } + + @Override + public ListenableFuture<ConfigTaskResult> execute(IConfigTaskExecutor configTaskExecutor) + throws InterruptedException { + return configTaskExecutor.showSpaceQuota(showSpaceQuotaStatement); + } + + public static void buildTSBlock( + TShowSpaceQuotaResp resp, SettableFuture<ConfigTaskResult> future) { + List<TSDataType> outputDataTypes = + ColumnHeaderConstant.showSpaceQuotaColumnHeaders.stream() + .map(ColumnHeader::getColumnType) + .collect(Collectors.toList()); + TsBlockBuilder builder = new TsBlockBuilder(outputDataTypes); + if (resp.getSpaceQuota() != null) { + for (Map.Entry<String, TSpaceQuota> spaceQuotaEntry : resp.getSpaceQuota().entrySet()) { + if (spaceQuotaEntry.getValue().getDiskSize() != -1) { + builder.getTimeColumnBuilder().writeLong(0L); + builder.getColumnBuilder(0).writeBinary(Binary.valueOf(spaceQuotaEntry.getKey())); + builder.getColumnBuilder(1).writeBinary(Binary.valueOf(SpaceQuotaType.diskSize.name())); + builder + .getColumnBuilder(2) + .writeBinary( + Binary.valueOf( + spaceQuotaEntry.getValue().getDiskSize() == 0 + ? "unlimited" + : spaceQuotaEntry.getValue().getDiskSize() + "M")); + builder.declarePosition(); + } + if (spaceQuotaEntry.getValue().getDeviceNum() != -1) { + builder.getTimeColumnBuilder().writeLong(0L); + builder.getColumnBuilder(0).writeBinary(Binary.valueOf(spaceQuotaEntry.getKey())); + builder.getColumnBuilder(1).writeBinary(Binary.valueOf(SpaceQuotaType.deviceNum.name())); + builder + .getColumnBuilder(2) + .writeBinary( + Binary.valueOf( + spaceQuotaEntry.getValue().getDeviceNum() == 0 + ? 
"unlimited" + : spaceQuotaEntry.getValue().getDeviceNum() + "")); + builder.declarePosition(); + } + if (spaceQuotaEntry.getValue().getTimeserieNum() != -1) { + builder.getTimeColumnBuilder().writeLong(0L); + builder.getColumnBuilder(0).writeBinary(Binary.valueOf(spaceQuotaEntry.getKey())); + builder + .getColumnBuilder(1) + .writeBinary(Binary.valueOf(SpaceQuotaType.timeSeriesNum.name())); + builder + .getColumnBuilder(2) + .writeBinary( + Binary.valueOf( + spaceQuotaEntry.getValue().getTimeserieNum() == 0 + ? "unlimited" + : spaceQuotaEntry.getValue().getTimeserieNum() + "")); + builder.declarePosition(); + } + } + } + DatasetHeader datasetHeader = DatasetHeaderFactory.getShowSpaceQuotaHeader(); + future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS, builder.build(), datasetHeader)); + } +}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/ASTVisitor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/ASTVisitor.java index f04e01e..001949e 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/ASTVisitor.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/ASTVisitor.java
@@ -147,6 +147,7 @@ import org.apache.iotdb.db.mpp.plan.statement.sys.SetSystemStatusStatement; import org.apache.iotdb.db.mpp.plan.statement.sys.ShowVersionStatement; import org.apache.iotdb.db.mpp.plan.statement.sys.quota.SetSpaceQuotaStatement; +import org.apache.iotdb.db.mpp.plan.statement.sys.quota.ShowSpaceQuotaStatement; import org.apache.iotdb.db.mpp.plan.statement.sys.sync.CreatePipeSinkStatement; import org.apache.iotdb.db.mpp.plan.statement.sys.sync.CreatePipeStatement; import org.apache.iotdb.db.mpp.plan.statement.sys.sync.DropPipeSinkStatement; @@ -2169,6 +2170,10 @@ // Quota @Override public Statement visitSetSpaceQuota(IoTDBSqlParser.SetSpaceQuotaContext ctx) { + if (!IoTDBDescriptor.getInstance().getConfig().isQuotaEnable()) { + throw new SQLParserException("Limit configuration is not enabled, please enable it first."); + } + SetSpaceQuotaStatement setSpaceQuotaStatement = new SetSpaceQuotaStatement(); List<IoTDBSqlParser.PrefixPathContext> prefixPathContexts = ctx.prefixPath(); List<String> paths = new ArrayList<>(); @@ -2185,35 +2190,73 @@ } if (quotas.containsKey(IoTDBConstant.COLUMN_DEVICES)) { - setSpaceQuotaStatement.setDeviceNum( - Integer.parseInt(quotas.get(IoTDBConstant.COLUMN_DEVICES))); + if (quotas.get(IoTDBConstant.COLUMN_DEVICES).equals(IoTDBConstant.SPACE_QUOTA_UNLIMITED)) { + setSpaceQuotaStatement.setDeviceNum(0); + } else if (Integer.parseInt(quotas.get(IoTDBConstant.COLUMN_DEVICES)) <= 0) { + throw new SQLParserException("Please set the number of devices greater than 0"); + } else { + setSpaceQuotaStatement.setDeviceNum( + Integer.parseInt(quotas.get(IoTDBConstant.COLUMN_DEVICES))); + } } if (quotas.containsKey(IoTDBConstant.COLUMN_TIMESERIES)) { - setSpaceQuotaStatement.setTimeSeriesNum( - Integer.parseInt(quotas.get(IoTDBConstant.COLUMN_TIMESERIES))); + if (quotas.get(IoTDBConstant.COLUMN_TIMESERIES).equals(IoTDBConstant.SPACE_QUOTA_UNLIMITED)) { + setSpaceQuotaStatement.setTimeSeriesNum(0); + } else if 
(Integer.parseInt(quotas.get(IoTDBConstant.COLUMN_TIMESERIES)) <= 0) { + throw new SQLParserException("Please set the number of timeseries greater than 0"); + } else { + setSpaceQuotaStatement.setTimeSeriesNum( + Integer.parseInt(quotas.get(IoTDBConstant.COLUMN_TIMESERIES))); + } } if (quotas.containsKey(IoTDBConstant.SPACE_QUOTA_DISK)) { - setSpaceQuotaStatement.setDiskSize(parseUnit(quotas.get(IoTDBConstant.SPACE_QUOTA_DISK))); + if (quotas.get(IoTDBConstant.SPACE_QUOTA_DISK).equals(IoTDBConstant.SPACE_QUOTA_UNLIMITED)) { + setSpaceQuotaStatement.setDiskSize(0); + } else { + setSpaceQuotaStatement.setDiskSize(parseUnit(quotas.get(IoTDBConstant.SPACE_QUOTA_DISK))); + } } return setSpaceQuotaStatement; } + @Override + public Statement visitShowSpaceQuota(IoTDBSqlParser.ShowSpaceQuotaContext ctx) { + if (!IoTDBDescriptor.getInstance().getConfig().isQuotaEnable()) { + throw new SQLParserException("Limit configuration is not enabled, please enable it first."); + } + ShowSpaceQuotaStatement showSpaceQuotaStatement = new ShowSpaceQuotaStatement(); + List<PartialPath> storageGroups = null; + if (ctx.prefixPath() != null) { + storageGroups = new ArrayList<>(); + for (IoTDBSqlParser.PrefixPathContext prefixPathContext : ctx.prefixPath()) { + storageGroups.add(parsePrefixPath(prefixPathContext)); + } + showSpaceQuotaStatement.setStorageGroups(storageGroups); + } else { + showSpaceQuotaStatement.setStorageGroups(null); + } + return showSpaceQuotaStatement; + } + private long parseUnit(String data) { String unit = data.substring(data.length() - 1); long disk = Long.parseLong(data.substring(0, data.length() - 1)); + if (disk <= 0) { + throw new SQLParserException("Please set the disk size greater than 0"); + } switch (unit) { case "M": case "m": - return disk * 1024; + return disk; case "G": case "g": - return disk * 1024 * 1024; + return disk * 1024; case "T": case "t": - return disk * 1024 * 1024 * 1024; + return disk * 1024 * 1024; case "P": case "p": - return disk * 1024 
* 1024 * 1024 * 1024; + return disk * 1024 * 1024 * 1024; default: throw new SQLParserException( "When setting the disk size, the unit is incorrect. Please use 'M', 'G', 'P', 'T' as the unit");
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/StatementVisitor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/StatementVisitor.java index 29ac69a..dbdb04a 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/StatementVisitor.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/StatementVisitor.java
@@ -85,6 +85,7 @@ import org.apache.iotdb.db.mpp.plan.statement.sys.SetSystemStatusStatement; import org.apache.iotdb.db.mpp.plan.statement.sys.ShowVersionStatement; import org.apache.iotdb.db.mpp.plan.statement.sys.quota.SetSpaceQuotaStatement; +import org.apache.iotdb.db.mpp.plan.statement.sys.quota.ShowSpaceQuotaStatement; import org.apache.iotdb.db.mpp.plan.statement.sys.sync.CreatePipeSinkStatement; import org.apache.iotdb.db.mpp.plan.statement.sys.sync.CreatePipeStatement; import org.apache.iotdb.db.mpp.plan.statement.sys.sync.DropPipeSinkStatement; @@ -450,4 +451,8 @@ public R visitSetSpaceQuota(SetSpaceQuotaStatement setSpaceQuotaStatement, C context) { return visitStatement(setSpaceQuotaStatement, context); } + + public R visitShowSpaceQuota(ShowSpaceQuotaStatement showSpaceQuotaStatement, C context) { + return visitStatement(showSpaceQuotaStatement, context); + } }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/sys/quota/ShowSpaceQuotaStatement.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/sys/quota/ShowSpaceQuotaStatement.java new file mode 100644 index 0000000..8cf304f --- /dev/null +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/sys/quota/ShowSpaceQuotaStatement.java
@@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.mpp.plan.statement.sys.quota; + +import org.apache.iotdb.commons.path.PartialPath; +import org.apache.iotdb.db.mpp.plan.analyze.QueryType; +import org.apache.iotdb.db.mpp.plan.constant.StatementType; +import org.apache.iotdb.db.mpp.plan.statement.IConfigStatement; +import org.apache.iotdb.db.mpp.plan.statement.Statement; +import org.apache.iotdb.db.mpp.plan.statement.StatementVisitor; + +import java.util.List; + +public class ShowSpaceQuotaStatement extends Statement implements IConfigStatement { + + private List<PartialPath> storageGroups; + + public ShowSpaceQuotaStatement() { + super(); + statementType = StatementType.SHOW_SPACE_QUOTA; + } + + public List<PartialPath> getStorageGroups() { + return storageGroups; + } + + public void setStorageGroups(List<PartialPath> storageGroups) { + this.storageGroups = storageGroups; + } + + @Override + public QueryType getQueryType() { + return QueryType.READ; + } + + @Override + public List<PartialPath> getPaths() { + return storageGroups; + } + + @Override + public <R, C> R accept(StatementVisitor<R, C> visitor, C context) { + return visitor.visitShowSpaceQuota(this, 
context); + } +}
diff --git a/server/src/main/java/org/apache/iotdb/db/quotas/DataNodeSpaceQuotaManager.java b/server/src/main/java/org/apache/iotdb/db/quotas/DataNodeSpaceQuotaManager.java index 776080b..ee8dced 100644 --- a/server/src/main/java/org/apache/iotdb/db/quotas/DataNodeSpaceQuotaManager.java +++ b/server/src/main/java/org/apache/iotdb/db/quotas/DataNodeSpaceQuotaManager.java
@@ -19,16 +19,48 @@ package org.apache.iotdb.db.quotas; +import org.apache.iotdb.common.rpc.thrift.TSStatus; +import org.apache.iotdb.common.rpc.thrift.TSetSpaceQuotaReq; import org.apache.iotdb.common.rpc.thrift.TSpaceQuota; +import org.apache.iotdb.rpc.RpcUtils; +import org.apache.iotdb.rpc.TSStatusCode; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.HashMap; import java.util.Map; // TODO: Store quota information of each sg public class DataNodeSpaceQuotaManager { + private static final Logger LOGGER = LoggerFactory.getLogger(DataNodeSpaceQuotaManager.class); + private Map<String, TSpaceQuota> spaceQuotaLimit; private Map<String, TSpaceQuota> useSpaceQuota; private Map<Integer, Integer> regionDisk; - public DataNodeSpaceQuotaManager() {} + public DataNodeSpaceQuotaManager() { + spaceQuotaLimit = new HashMap<>(); + useSpaceQuota = new HashMap<>(); + regionDisk = new HashMap<>(); + } + + /** Holder for the shared instance returned by getInstance() */ + private static class DataNodeSpaceQuotaManagerHolder { + private static final DataNodeSpaceQuotaManager INSTANCE = new DataNodeSpaceQuotaManager(); + + private DataNodeSpaceQuotaManagerHolder() {} + } + + public static DataNodeSpaceQuotaManager getInstance() { + return DataNodeSpaceQuotaManager.DataNodeSpaceQuotaManagerHolder.INSTANCE; + } + + public TSStatus setSpaceQuota(TSetSpaceQuotaReq req) { + for (String storageGroup : req.getStorageGroup()) { + spaceQuotaLimit.put(storageGroup, req.getSpaceLimit()); + } + return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS); + } }
diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
index f691488..ca737e1 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
@@ -25,6 +25,7 @@
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.common.rpc.thrift.TFlushReq;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.common.rpc.thrift.TSetSpaceQuotaReq;
 import org.apache.iotdb.common.rpc.thrift.TSetTTLReq;
 import org.apache.iotdb.commons.cluster.NodeStatus;
 import org.apache.iotdb.commons.conf.CommonConfig;
@@ -109,6 +110,7 @@
 import org.apache.iotdb.db.query.control.SessionManager;
 import org.apache.iotdb.db.query.control.clientsession.IClientSession;
 import org.apache.iotdb.db.query.control.clientsession.InternalClientSession;
+import org.apache.iotdb.db.quotas.DataNodeSpaceQuotaManager;
 import org.apache.iotdb.db.service.DataNode;
 import org.apache.iotdb.db.service.RegionMigrateService;
 import org.apache.iotdb.db.sync.SyncService;
@@ -225,6 +227,9 @@
 
   private final DataNodeRegionManager regionManager = DataNodeRegionManager.getInstance();
 
+  private final DataNodeSpaceQuotaManager spaceQuotaManager =
+      DataNodeSpaceQuotaManager.getInstance();
+
   public DataNodeInternalRPCServiceImpl() {
     super();
     if (config.isClusterMode()) {
@@ -905,6 +910,12 @@
     }
   }
 
+  /** Hands the request to the space quota manager and returns the status it produces. */
+  @Override
+  public TSStatus setSpaceQuota(TSetSpaceQuotaReq req) throws TException {
+    return spaceQuotaManager.setSpaceQuota(req);
+  }
+
   private PathPatternTree filterPathPatternTree(PathPatternTree patternTree, String storageGroup) {
     PathPatternTree filteredPatternTree = new PathPatternTree();
     try {
diff --git a/thrift-commons/src/main/thrift/common.thrift b/thrift-commons/src/main/thrift/common.thrift
index 87800d9..c1b0f35 100644
--- a/thrift-commons/src/main/thrift/common.thrift
+++ b/thrift-commons/src/main/thrift/common.thrift
@@ -125,7 +125,13 @@
 
 // quota
 struct TSpaceQuota {
-  1: optional i64 disk
+  1: optional i64 diskSize
   2: optional i32 deviceNum
   3: optional i32 timeserieNum
 }
+
+// Request that applies one space quota (spaceLimit) to every listed storage group
+struct TSetSpaceQuotaReq {
+  1: required list<string> storageGroup
+  2: required TSpaceQuota spaceLimit
+}
diff --git a/thrift-confignode/src/main/thrift/confignode.thrift b/thrift-confignode/src/main/thrift/confignode.thrift
index e2fcb0e..66ac2d9 100644
--- a/thrift-confignode/src/main/thrift/confignode.thrift
+++ b/thrift-confignode/src/main/thrift/confignode.thrift
@@ -557,14 +557,6 @@
 }
 
 // ====================================================
-// Quota
-// ====================================================
-struct TSetSpaceQuotaReq {
-  1: required list<string> storageGroup
-  2: required common.TSpaceQuota spaceLimit
-}
-
-// ====================================================
 // CQ
 // ====================================================
 struct TCreateCQReq {
@@ -608,6 +600,14 @@
   3: required string path
 }
 
+// ====================================================
+// Quota
+// ====================================================
+struct TShowSpaceQuotaResp {
+  1: required common.TSStatus status
+  2: optional map<string, common.TSpaceQuota> spaceQuota
+}
+
 service IConfigNodeRPCService {
 
   // ======================================================
@@ -1086,6 +1086,14 @@
    */
   TShowCQResp showCQ()
 
-  common.TSStatus setSpaceQuota(TSetSpaceQuotaReq req)
+  /**
+   * Set the space quota of the storage groups listed in the request
+   **/
+  common.TSStatus setSpaceQuota(common.TSetSpaceQuotaReq req)
+
+  /**
+   * Show the space quota of the given storage groups
+   **/
+  TShowSpaceQuotaResp showSpaceQuota(list<string> storageGroups)
 }
 
diff --git a/thrift/src/main/thrift/datanode.thrift b/thrift/src/main/thrift/datanode.thrift index 174847c..c2926c0 100644 --- a/thrift/src/main/thrift/datanode.thrift +++ b/thrift/src/main/thrift/datanode.thrift
@@ -639,6 +639,11 @@ * Execute CQ on DataNode */ common.TSStatus executeCQ(TExecuteCQ req) + + /** + * Set the space quota of the storage groups listed in the request + **/ + common.TSStatus setSpaceQuota(common.TSetSpaceQuotaReq req) } service MPPDataExchangeService {