[IOTDB-4919] add space quota snapshot and show space quota (#7968)

diff --git a/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4 b/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4
index 63b717c..f0296d5 100644
--- a/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4
+++ b/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4
@@ -45,6 +45,7 @@
     | showFunctions | showTriggers | showContinuousQueries | showTTL | showAllTTL | showCluster | showClusterDetails | showRegion | showDataNodes | showConfigNodes
     | showSchemaTemplates | showNodesInSchemaTemplate
     | showPathsUsingSchemaTemplate | showPathsSetSchemaTemplate
+    | showSpaceQuota
     | countStorageGroup | countDevices | countTimeseries | countNodes
     | getRegionId | getTimeSlotList | getSeriesSlotList
     ;
@@ -384,6 +385,11 @@
     : SHOW PATHS prefixPath? USING SCHEMA? TEMPLATE templateName=identifier
     ;
 
+// Show Space Quota
+showSpaceQuota
+    : SHOW SPACE QUOTA (prefixPath (COMMA prefixPath)*)?
+    ;
+
 // Count Storage Group
 countStorageGroup
     : COUNT (STORAGE GROUP | DATABASES) prefixPath?
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/client/DataNodeRequestType.java b/confignode/src/main/java/org/apache/iotdb/confignode/client/DataNodeRequestType.java
index ff236ce..096b818 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/client/DataNodeRequestType.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/client/DataNodeRequestType.java
@@ -85,5 +85,8 @@
   CONSTRUCT_SCHEMA_BLACK_LIST_WITH_TEMPLATE,
   ROLLBACK_SCHEMA_BLACK_LIST_WITH_TEMPLATE,
   DEACTIVATE_TEMPLATE,
-  COUNT_PATHS_USING_TEMPLATE
+  COUNT_PATHS_USING_TEMPLATE,
+
+  /** Quota */
+  SET_SPACE_QUOTA
 }
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncDataNodeClientPool.java b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncDataNodeClientPool.java
index bfd686a..a94565d 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncDataNodeClientPool.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncDataNodeClientPool.java
@@ -21,6 +21,7 @@
 import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.common.rpc.thrift.TFlushReq;
+import org.apache.iotdb.common.rpc.thrift.TSetSpaceQuotaReq;
 import org.apache.iotdb.common.rpc.thrift.TSetTTLReq;
 import org.apache.iotdb.commons.client.ClientPoolFactory;
 import org.apache.iotdb.commons.client.IClientManager;
@@ -316,6 +317,12 @@
               (CountPathsUsingTemplateRPCHandler)
                   clientHandler.createAsyncRPCHandler(requestId, targetDataNode));
           break;
+        case SET_SPACE_QUOTA:
+          client.setSpaceQuota(
+              (TSetSpaceQuotaReq) clientHandler.getRequest(requestId),
+              (AsyncTSStatusRPCHandler)
+                  clientHandler.createAsyncRPCHandler(requestId, targetDataNode));
+          break;
         default:
           LOGGER.error(
               "Unexpected DataNode Request Type: {} when sendAsyncRequestToDataNode",
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java b/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java
index 6240d49..c594782 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java
@@ -346,6 +346,11 @@
 
     loadRatisConsensusConfig(properties);
     loadCQConfig(properties);
+    loadQuotaConfig(properties);
+  }
+
+  private void loadQuotaConfig(Properties properties) {
+    conf.setSpaceQuotaDir(properties.getProperty("space_quota_dir", conf.getSpaceQuotaDir()));
   }
 
   private void loadRatisConsensusConfig(Properties properties) {
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/quota/SetSpaceQuotaPlan.java b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/quota/SetSpaceQuotaPlan.java
index 0ec2f38..5a5ab0e 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/quota/SetSpaceQuotaPlan.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/quota/SetSpaceQuotaPlan.java
@@ -67,7 +67,7 @@
     BasicStructureSerDeUtil.write(prefixPathList, stream);
     BasicStructureSerDeUtil.write(spaceLimit.getDeviceNum(), stream);
     BasicStructureSerDeUtil.write(spaceLimit.getTimeserieNum(), stream);
-    BasicStructureSerDeUtil.write(spaceLimit.getDisk(), stream);
+    BasicStructureSerDeUtil.write(spaceLimit.getDiskSize(), stream);
   }
 
   @Override
@@ -80,7 +80,7 @@
     TSpaceQuota spaceLimit = new TSpaceQuota();
     spaceLimit.setDeviceNum(deviceNum);
     spaceLimit.setTimeserieNum(timeserieNum);
-    spaceLimit.setDisk(disk);
+    spaceLimit.setDiskSize(disk);
     this.spaceLimit = spaceLimit;
   }
 
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ClusterQuotaManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ClusterQuotaManager.java
index a7c4939..d1a15e9 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ClusterQuotaManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ClusterQuotaManager.java
@@ -18,16 +18,27 @@
  */
 package org.apache.iotdb.confignode.manager;
 
+import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.common.rpc.thrift.TSetSpaceQuotaReq;
+import org.apache.iotdb.common.rpc.thrift.TSpaceQuota;
+import org.apache.iotdb.confignode.client.DataNodeRequestType;
+import org.apache.iotdb.confignode.client.async.AsyncDataNodeClientPool;
+import org.apache.iotdb.confignode.client.async.handlers.AsyncClientHandler;
 import org.apache.iotdb.confignode.consensus.request.write.quota.SetSpaceQuotaPlan;
-import org.apache.iotdb.confignode.persistence.QuotaInfo;
-import org.apache.iotdb.confignode.rpc.thrift.TSetSpaceQuotaReq;
+import org.apache.iotdb.confignode.persistence.quota.QuotaInfo;
+import org.apache.iotdb.confignode.rpc.thrift.TShowSpaceQuotaResp;
 import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse;
+import org.apache.iotdb.rpc.RpcUtils;
 import org.apache.iotdb.rpc.TSStatusCode;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
 // TODO: Manage quotas for storage groups
 public class ClusterQuotaManager {
 
@@ -47,6 +58,14 @@
             .getConsensusManager()
             .write(new SetSpaceQuotaPlan(req.getStorageGroup(), req.getSpaceLimit()));
     if (response.getStatus() != null) {
+      if (response.getStatus().getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+        Map<Integer, TDataNodeLocation> dataNodeLocationMap =
+            configManager.getNodeManager().getRegisteredDataNodeLocations();
+        AsyncClientHandler<TSetSpaceQuotaReq, TSStatus> clientHandler =
+            new AsyncClientHandler<>(DataNodeRequestType.SET_SPACE_QUOTA, req, dataNodeLocationMap);
+        AsyncDataNodeClientPool.getInstance().sendAsyncRequestToDataNodeWithRetry(clientHandler);
+        return RpcUtils.squashResponseStatusList(clientHandler.getResponseList());
+      }
       return response.getStatus();
     } else {
       LOGGER.warn(
@@ -59,4 +78,21 @@
       return res;
     }
   }
+
+  public TShowSpaceQuotaResp showSpaceQuota(List<String> storageGroups) {
+    TShowSpaceQuotaResp showSpaceQuotaResp = new TShowSpaceQuotaResp();
+    if (storageGroups.isEmpty()) {
+      showSpaceQuotaResp.setSpaceQuota(quotaInfo.getSpaceQuotaLimit());
+    } else if (!quotaInfo.getSpaceQuotaLimit().isEmpty()) {
+      Map<String, TSpaceQuota> spaceQuotaMap = new HashMap<>();
+      for (String storageGroup : storageGroups) {
+        if (quotaInfo.getSpaceQuotaLimit().containsKey(storageGroup)) {
+          spaceQuotaMap.put(storageGroup, quotaInfo.getSpaceQuotaLimit().get(storageGroup));
+        }
+      }
+      showSpaceQuotaResp.setSpaceQuota(spaceQuotaMap);
+    }
+    showSpaceQuotaResp.setStatus(RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS));
+    return showSpaceQuotaResp;
+  }
 }
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
index dd98b90..0daa51c 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
@@ -27,6 +27,7 @@
 import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot;
+import org.apache.iotdb.common.rpc.thrift.TSetSpaceQuotaReq;
 import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
 import org.apache.iotdb.commons.cluster.NodeStatus;
 import org.apache.iotdb.commons.conf.CommonDescriptor;
@@ -82,13 +83,13 @@
 import org.apache.iotdb.confignode.manager.partition.PartitionManager;
 import org.apache.iotdb.confignode.persistence.AuthorInfo;
 import org.apache.iotdb.confignode.persistence.ProcedureInfo;
-import org.apache.iotdb.confignode.persistence.QuotaInfo;
 import org.apache.iotdb.confignode.persistence.TriggerInfo;
 import org.apache.iotdb.confignode.persistence.UDFInfo;
 import org.apache.iotdb.confignode.persistence.cq.CQInfo;
 import org.apache.iotdb.confignode.persistence.executor.ConfigPlanExecutor;
 import org.apache.iotdb.confignode.persistence.node.NodeInfo;
 import org.apache.iotdb.confignode.persistence.partition.PartitionInfo;
+import org.apache.iotdb.confignode.persistence.quota.QuotaInfo;
 import org.apache.iotdb.confignode.persistence.schema.ClusterSchemaInfo;
 import org.apache.iotdb.confignode.persistence.sync.ClusterSyncInfo;
 import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRegisterReq;
@@ -124,13 +125,13 @@
 import org.apache.iotdb.confignode.rpc.thrift.TSchemaPartitionTableResp;
 import org.apache.iotdb.confignode.rpc.thrift.TSetDataNodeStatusReq;
 import org.apache.iotdb.confignode.rpc.thrift.TSetSchemaTemplateReq;
-import org.apache.iotdb.confignode.rpc.thrift.TSetSpaceQuotaReq;
 import org.apache.iotdb.confignode.rpc.thrift.TShowCQResp;
 import org.apache.iotdb.confignode.rpc.thrift.TShowClusterResp;
 import org.apache.iotdb.confignode.rpc.thrift.TShowConfigNodesResp;
 import org.apache.iotdb.confignode.rpc.thrift.TShowDataNodesResp;
 import org.apache.iotdb.confignode.rpc.thrift.TShowPipeReq;
 import org.apache.iotdb.confignode.rpc.thrift.TShowPipeResp;
+import org.apache.iotdb.confignode.rpc.thrift.TShowSpaceQuotaResp;
 import org.apache.iotdb.confignode.rpc.thrift.TShowStorageGroupResp;
 import org.apache.iotdb.confignode.rpc.thrift.TStorageGroupSchema;
 import org.apache.iotdb.confignode.rpc.thrift.TUnsetSchemaTemplateReq;
@@ -1332,6 +1333,13 @@
         : status;
   }
 
+  public TShowSpaceQuotaResp showSpaceQuotaResp(List<String> storageGroups) {
+    TSStatus status = confirmLeader();
+    return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
+        ? clusterQuotaManager.showSpaceQuota(storageGroups)
+        : new TShowSpaceQuotaResp(status);
+  }
+
   /** Get all related schemaRegion which may contains the timeSeries matched by given patternTree */
   public Map<TConsensusGroupId, TRegionReplicaSet> getRelatedSchemaRegionGroup(
       PathPatternTree patternTree) {
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java
index c71a18f..8344d68 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java
@@ -22,6 +22,7 @@
 import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
 import org.apache.iotdb.common.rpc.thrift.TFlushReq;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.common.rpc.thrift.TSetSpaceQuotaReq;
 import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.commons.path.PathPatternTree;
 import org.apache.iotdb.confignode.consensus.request.auth.AuthorPlan;
@@ -82,7 +83,6 @@
 import org.apache.iotdb.confignode.rpc.thrift.TSchemaPartitionTableResp;
 import org.apache.iotdb.confignode.rpc.thrift.TSetDataNodeStatusReq;
 import org.apache.iotdb.confignode.rpc.thrift.TSetSchemaTemplateReq;
-import org.apache.iotdb.confignode.rpc.thrift.TSetSpaceQuotaReq;
 import org.apache.iotdb.confignode.rpc.thrift.TShowCQResp;
 import org.apache.iotdb.confignode.rpc.thrift.TShowClusterResp;
 import org.apache.iotdb.confignode.rpc.thrift.TShowConfigNodesResp;
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/QuotaInfo.java b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/QuotaInfo.java
deleted file mode 100644
index 026d11e..0000000
--- a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/QuotaInfo.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.confignode.persistence;
-
-import org.apache.iotdb.common.rpc.thrift.TSStatus;
-import org.apache.iotdb.common.rpc.thrift.TSpaceQuota;
-import org.apache.iotdb.commons.snapshot.SnapshotProcessor;
-import org.apache.iotdb.confignode.consensus.request.write.quota.SetSpaceQuotaPlan;
-import org.apache.iotdb.rpc.RpcUtils;
-import org.apache.iotdb.rpc.TSStatusCode;
-
-import org.apache.thrift.TException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
-
-// TODO: Store quota information of each sg
-public class QuotaInfo implements SnapshotProcessor {
-
-  private static final Logger logger = LoggerFactory.getLogger(QuotaInfo.class);
-  private final Map<String, TSpaceQuota> spaceQuotaLimit;
-  private final Map<String, TSpaceQuota> useSpaceQuota;
-  private final Map<Integer, Integer> regionDisk;
-
-  public QuotaInfo() {
-    spaceQuotaLimit = new HashMap<>();
-    useSpaceQuota = new HashMap<>();
-    regionDisk = new HashMap<>();
-  }
-
-  public TSStatus setSpaceQuota(SetSpaceQuotaPlan setSpaceQuotaPlan) {
-    for (String storageGroup : setSpaceQuotaPlan.getPrefixPathList()) {
-      spaceQuotaLimit.put(storageGroup, setSpaceQuotaPlan.getSpaceLimit());
-    }
-    return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
-  }
-
-  // TODO: add Snapshot
-  @Override
-  public boolean processTakeSnapshot(File snapshotDir) throws TException, IOException {
-    return false;
-  }
-
-  @Override
-  public void processLoadSnapshot(File snapshotDir) throws TException, IOException {}
-}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java
index 886362b..df9fffc 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java
@@ -93,12 +93,12 @@
 import org.apache.iotdb.confignode.exception.physical.UnknownPhysicalPlanTypeException;
 import org.apache.iotdb.confignode.persistence.AuthorInfo;
 import org.apache.iotdb.confignode.persistence.ProcedureInfo;
-import org.apache.iotdb.confignode.persistence.QuotaInfo;
 import org.apache.iotdb.confignode.persistence.TriggerInfo;
 import org.apache.iotdb.confignode.persistence.UDFInfo;
 import org.apache.iotdb.confignode.persistence.cq.CQInfo;
 import org.apache.iotdb.confignode.persistence.node.NodeInfo;
 import org.apache.iotdb.confignode.persistence.partition.PartitionInfo;
+import org.apache.iotdb.confignode.persistence.quota.QuotaInfo;
 import org.apache.iotdb.confignode.persistence.schema.ClusterSchemaInfo;
 import org.apache.iotdb.confignode.persistence.sync.ClusterSyncInfo;
 import org.apache.iotdb.confignode.rpc.thrift.TShowRegionReq;
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/quota/QuotaInfo.java b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/quota/QuotaInfo.java
new file mode 100644
index 0000000..6f9d32a
--- /dev/null
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/quota/QuotaInfo.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.persistence.quota;
+
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.common.rpc.thrift.TSpaceQuota;
+import org.apache.iotdb.commons.snapshot.SnapshotProcessor;
+import org.apache.iotdb.confignode.consensus.request.write.quota.SetSpaceQuotaPlan;
+import org.apache.iotdb.rpc.RpcUtils;
+import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+public class QuotaInfo implements SnapshotProcessor {
+
+  private static final Logger logger = LoggerFactory.getLogger(QuotaInfo.class);
+
+  private final ReentrantReadWriteLock spaceQuotaReadWriteLock;
+  private final Map<String, TSpaceQuota> spaceQuotaLimit;
+  private final Map<String, TSpaceQuota> useSpaceQuota;
+  private final Map<Integer, Integer> regionDisk;
+
+  private final String snapshotFileName = "quota_info.bin";
+
+  public QuotaInfo() {
+    spaceQuotaReadWriteLock = new ReentrantReadWriteLock();
+    spaceQuotaLimit = new HashMap<>();
+    useSpaceQuota = new HashMap<>();
+    regionDisk = new HashMap<>();
+  }
+
+  public TSStatus setSpaceQuota(SetSpaceQuotaPlan setSpaceQuotaPlan) {
+    for (String storageGroup : setSpaceQuotaPlan.getPrefixPathList()) {
+      TSpaceQuota spaceQuota = setSpaceQuotaPlan.getSpaceLimit();
+      // “0” means that the user has not reset the value of the space quota type
+      // So the old values are still used
+      if (spaceQuotaLimit.containsKey(storageGroup)) {
+        if (spaceQuota.getDeviceNum() == 0) {
+          spaceQuota.setDeviceNum(spaceQuotaLimit.get(storageGroup).getDeviceNum());
+        }
+        if (spaceQuota.getTimeserieNum() == 0) {
+          spaceQuota.setTimeserieNum(spaceQuotaLimit.get(storageGroup).getTimeserieNum());
+        }
+        if (spaceQuota.getDiskSize() == 0) {
+          spaceQuota.setDiskSize(spaceQuotaLimit.get(storageGroup).getDiskSize());
+        }
+      }
+      spaceQuotaLimit.put(storageGroup, spaceQuota);
+    }
+    return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
+  }
+
+  public Map<String, TSpaceQuota> getSpaceQuotaLimit() {
+    return spaceQuotaLimit;
+  }
+
+  // TODO: add Snapshot
+  @Override
+  public boolean processTakeSnapshot(File snapshotDir) throws TException, IOException {
+    File snapshotFile = new File(snapshotDir, snapshotFileName);
+    if (snapshotFile.exists() && snapshotFile.isFile()) {
+      logger.error(
+          "Failed to take snapshot, because snapshot file [{}] is already exist.",
+          snapshotFile.getAbsolutePath());
+      return false;
+    }
+
+    spaceQuotaReadWriteLock.writeLock().lock();
+    try (FileOutputStream fileOutputStream = new FileOutputStream(snapshotFile)) {
+      serializeSpaceQuotaLimit(fileOutputStream);
+    } finally {
+      spaceQuotaReadWriteLock.writeLock().unlock();
+    }
+    return true;
+  }
+
+  private void serializeSpaceQuotaLimit(FileOutputStream fileOutputStream) throws IOException {
+    ReadWriteIOUtils.write(spaceQuotaLimit.size(), fileOutputStream);
+    for (Map.Entry<String, TSpaceQuota> spaceQuotaEntry : spaceQuotaLimit.entrySet()) {
+      ReadWriteIOUtils.write(spaceQuotaEntry.getKey(), fileOutputStream);
+      ReadWriteIOUtils.write(spaceQuotaEntry.getValue().getDeviceNum(), fileOutputStream);
+      ReadWriteIOUtils.write(spaceQuotaEntry.getValue().getTimeserieNum(), fileOutputStream);
+      ReadWriteIOUtils.write(spaceQuotaEntry.getValue().getDiskSize(), fileOutputStream);
+    }
+  }
+
+  @Override
+  public void processLoadSnapshot(File snapshotDir) throws TException, IOException {
+    File snapshotFile = new File(snapshotDir, snapshotFileName);
+    if (!snapshotFile.exists() || !snapshotFile.isFile()) {
+      logger.error(
+          "Failed to load snapshot,snapshot file [{}] is not exist.",
+          snapshotFile.getAbsolutePath());
+      return;
+    }
+    spaceQuotaReadWriteLock.writeLock().lock();
+    try (FileInputStream fileInputStream = new FileInputStream(snapshotFile)) {
+      clear();
+      deserializeSpaceQuotaLimit(fileInputStream);
+    } finally {
+      spaceQuotaReadWriteLock.writeLock().unlock();
+    }
+  }
+
+  private void deserializeSpaceQuotaLimit(FileInputStream fileInputStream) throws IOException {
+    int size = ReadWriteIOUtils.readInt(fileInputStream);
+    while (size > 0) {
+      String path = ReadWriteIOUtils.readString(fileInputStream);
+      TSpaceQuota spaceQuota = new TSpaceQuota();
+      spaceQuota.setDeviceNum(ReadWriteIOUtils.readInt(fileInputStream));
+      spaceQuota.setTimeserieNum(ReadWriteIOUtils.readInt(fileInputStream));
+      spaceQuota.setDiskSize(ReadWriteIOUtils.readLong(fileInputStream));
+      spaceQuotaLimit.put(path, spaceQuota);
+      size--;
+    }
+  }
+
+  public void clear() {
+    spaceQuotaLimit.clear();
+  }
+}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java b/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
index 1da0aac..a4196a4 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
@@ -22,6 +22,7 @@
 import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
 import org.apache.iotdb.common.rpc.thrift.TFlushReq;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.common.rpc.thrift.TSetSpaceQuotaReq;
 import org.apache.iotdb.common.rpc.thrift.TSetTTLReq;
 import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
 import org.apache.iotdb.commons.auth.AuthException;
@@ -124,7 +125,6 @@
 import org.apache.iotdb.confignode.rpc.thrift.TSetDataReplicationFactorReq;
 import org.apache.iotdb.confignode.rpc.thrift.TSetSchemaReplicationFactorReq;
 import org.apache.iotdb.confignode.rpc.thrift.TSetSchemaTemplateReq;
-import org.apache.iotdb.confignode.rpc.thrift.TSetSpaceQuotaReq;
 import org.apache.iotdb.confignode.rpc.thrift.TSetStorageGroupReq;
 import org.apache.iotdb.confignode.rpc.thrift.TSetTimePartitionIntervalReq;
 import org.apache.iotdb.confignode.rpc.thrift.TShowCQResp;
@@ -135,6 +135,7 @@
 import org.apache.iotdb.confignode.rpc.thrift.TShowPipeResp;
 import org.apache.iotdb.confignode.rpc.thrift.TShowRegionReq;
 import org.apache.iotdb.confignode.rpc.thrift.TShowRegionResp;
+import org.apache.iotdb.confignode.rpc.thrift.TShowSpaceQuotaResp;
 import org.apache.iotdb.confignode.rpc.thrift.TShowStorageGroupResp;
 import org.apache.iotdb.confignode.rpc.thrift.TStorageGroupSchema;
 import org.apache.iotdb.confignode.rpc.thrift.TStorageGroupSchemaResp;
@@ -779,4 +780,9 @@
   public TSStatus setSpaceQuota(TSetSpaceQuotaReq req) throws TException {
     return configManager.setSpaceQuota(req);
   }
+
+  @Override
+  public TShowSpaceQuotaResp showSpaceQuota(List<String> storageGroups) throws TException {
+    return configManager.showSpaceQuotaResp(storageGroups);
+  }
 }
diff --git a/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanSerDeTest.java b/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanSerDeTest.java
index 3bfc22e..7234469 100644
--- a/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanSerDeTest.java
+++ b/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanSerDeTest.java
@@ -1312,7 +1312,7 @@
     TSpaceQuota spaceQuota = new TSpaceQuota();
     spaceQuota.setDeviceNum(2);
     spaceQuota.setTimeserieNum(3);
-    spaceQuota.setDisk(1024);
+    spaceQuota.setDiskSize(1024);
     SetSpaceQuotaPlan plan =
         new SetSpaceQuotaPlan(Collections.singletonList("root.sg"), spaceQuota);
     SetSpaceQuotaPlan deserializedPlan =
diff --git a/confignode/src/test/java/org/apache/iotdb/confignode/persistence/QuotaInfoTest.java b/confignode/src/test/java/org/apache/iotdb/confignode/persistence/QuotaInfoTest.java
new file mode 100644
index 0000000..075237e
--- /dev/null
+++ b/confignode/src/test/java/org/apache/iotdb/confignode/persistence/QuotaInfoTest.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.persistence;
+
+import org.apache.iotdb.common.rpc.thrift.TSpaceQuota;
+import org.apache.iotdb.confignode.consensus.request.write.quota.SetSpaceQuotaPlan;
+import org.apache.iotdb.confignode.persistence.quota.QuotaInfo;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.thrift.TException;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.iotdb.db.constant.TestConstant.BASE_OUTPUT_PATH;
+
+public class QuotaInfoTest {
+
+  private QuotaInfo quotaInfo;
+  private static final File snapshotDir = new File(BASE_OUTPUT_PATH, "snapshot");
+
+  @Before
+  public void setup() throws IOException {
+    quotaInfo = new QuotaInfo();
+    if (!snapshotDir.exists()) {
+      snapshotDir.mkdirs();
+    }
+  }
+
+  @After
+  public void cleanup() throws IOException {
+    if (snapshotDir.exists()) {
+      FileUtils.deleteDirectory(snapshotDir);
+    }
+  }
+
+  private void prepareQuotaInfo() {
+    List<String> prefixPathList = new ArrayList<>();
+    prefixPathList.add("root.sg");
+    prefixPathList.add("root.ln");
+    TSpaceQuota spaceQuota = new TSpaceQuota();
+    spaceQuota.setTimeserieNum(10000);
+    spaceQuota.setDeviceNum(100);
+    spaceQuota.setDiskSize(512);
+    SetSpaceQuotaPlan setSpaceQuotaPlan = new SetSpaceQuotaPlan(prefixPathList, spaceQuota);
+    quotaInfo.setSpaceQuota(setSpaceQuotaPlan);
+  }
+
+  @Test
+  public void testSnapshot() throws TException, IOException {
+    prepareQuotaInfo();
+
+    quotaInfo.processTakeSnapshot(snapshotDir);
+    QuotaInfo quotaInfo2 = new QuotaInfo();
+    quotaInfo2.processLoadSnapshot(snapshotDir);
+
+    Assert.assertEquals(quotaInfo.getSpaceQuotaLimit(), quotaInfo2.getSpaceQuotaLimit());
+  }
+}
diff --git a/confignode/src/test/resources/confignode1conf/iotdb-common.properties b/confignode/src/test/resources/confignode1conf/iotdb-common.properties
index caf36cc..442e767 100644
--- a/confignode/src/test/resources/confignode1conf/iotdb-common.properties
+++ b/confignode/src/test/resources/confignode1conf/iotdb-common.properties
@@ -24,4 +24,6 @@
 data_replication_factor=3
 udf_lib_dir=target/confignode1/ext/udf
 trigger_lib_dir=target/confignode1/ext/trigger
-config_node_ratis_log_appender_buffer_size_max = 14194304
\ No newline at end of file
+config_node_ratis_log_appender_buffer_size_max = 14194304
+space_quota_dir=target/confignode1/system/quota/space
+
diff --git a/confignode/src/test/resources/confignode2conf/iotdb-common.properties b/confignode/src/test/resources/confignode2conf/iotdb-common.properties
index 2f25f57..a001002 100644
--- a/confignode/src/test/resources/confignode2conf/iotdb-common.properties
+++ b/confignode/src/test/resources/confignode2conf/iotdb-common.properties
@@ -24,4 +24,6 @@
 data_replication_factor=3
 udf_lib_dir=target/confignode2/ext/udf
 trigger_lib_dir=target/confignode2/ext/trigger
-config_node_ratis_log_appender_buffer_size_max = 14194304
\ No newline at end of file
+config_node_ratis_log_appender_buffer_size_max = 14194304
+space_quota_dir=target/confignode2/system/quota/space
+
diff --git a/confignode/src/test/resources/confignode3conf/iotdb-common.properties b/confignode/src/test/resources/confignode3conf/iotdb-common.properties
index 208d039..fe9eb1d 100644
--- a/confignode/src/test/resources/confignode3conf/iotdb-common.properties
+++ b/confignode/src/test/resources/confignode3conf/iotdb-common.properties
@@ -24,4 +24,5 @@
 data_replication_factor=3
 udf_lib_dir=target/confignode3/ext/udf
 trigger_lib_dir=target/confignode3/ext/trigger
-config_node_ratis_log_appender_buffer_size_max = 14194304
\ No newline at end of file
+config_node_ratis_log_appender_buffer_size_max = 14194304
+space_quota_dir=target/confignode3/system/quota/space
diff --git a/node-commons/src/assembly/resources/conf/iotdb-common.properties b/node-commons/src/assembly/resources/conf/iotdb-common.properties
index f41b0e6..b32508a 100644
--- a/node-commons/src/assembly/resources/conf/iotdb-common.properties
+++ b/node-commons/src/assembly/resources/conf/iotdb-common.properties
@@ -1111,3 +1111,22 @@
 # If enabled, we can set different quotas for storage groups
 # Datatype: boolean
 # quota_enable=false
+
+# Space quota dir
+# If this property is unset, system will save the data in the default relative path directory under
+# the space quota folder(i.e., %CONFIGNODE_HOME%/system/quota/space).
+#
+# If it is absolute, system will save the data in exact location it points to.
+# If it is relative, system will save the data in the relative path directory it indicates under the
+# Space quota folder.
+# Note: If data_dir is assigned an empty string(i.e.,zero-size), it will be handled as a relative
+# path.
+#
+# For Window platform
+# If its prefix is a drive specifier followed by "\\", or if its prefix is "\\\\", then the path is
+# absolute. Otherwise, it is relative.
+# space_quota_dir=system\\quota\\space
+#
+# For Linux platform
+# If its prefix is "/", then the path is absolute. Otherwise, it is relative.
+# space_quota_dir=system/quota/space
\ No newline at end of file
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/conf/IoTDBConstant.java b/node-commons/src/main/java/org/apache/iotdb/commons/conf/IoTDBConstant.java
index f485208..da3ad97 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/conf/IoTDBConstant.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/conf/IoTDBConstant.java
@@ -257,6 +257,7 @@
 
   // quota
   public static final String SPACE_QUOTA_DISK = "disk";
+  public static final String SPACE_QUOTA_UNLIMITED = "unlimited";
 
   // client version number
   public enum ClientVersion {
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/enums/SpaceQuotaType.java b/node-commons/src/main/java/org/apache/iotdb/commons/enums/SpaceQuotaType.java
index 36b23e4..bfbb656 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/enums/SpaceQuotaType.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/enums/SpaceQuotaType.java
@@ -20,7 +20,7 @@
 package org.apache.iotdb.commons.enums;
 
 public enum SpaceQuotaType {
-  DISK,
-  DEVICE_NUMBER,
-  TIMESERIES_NUMBER
+  diskSize,
+  deviceNum,
+  timeSeriesNum
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClient.java b/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClient.java
index 463afad..b35d51c 100644
--- a/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClient.java
+++ b/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClient.java
@@ -23,6 +23,7 @@
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.common.rpc.thrift.TFlushReq;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.common.rpc.thrift.TSetSpaceQuotaReq;
 import org.apache.iotdb.common.rpc.thrift.TSetTTLReq;
 import org.apache.iotdb.commons.client.BaseClientFactory;
 import org.apache.iotdb.commons.client.ClientFactoryProperty;
@@ -91,7 +92,6 @@
 import org.apache.iotdb.confignode.rpc.thrift.TSetDataReplicationFactorReq;
 import org.apache.iotdb.confignode.rpc.thrift.TSetSchemaReplicationFactorReq;
 import org.apache.iotdb.confignode.rpc.thrift.TSetSchemaTemplateReq;
-import org.apache.iotdb.confignode.rpc.thrift.TSetSpaceQuotaReq;
 import org.apache.iotdb.confignode.rpc.thrift.TSetStorageGroupReq;
 import org.apache.iotdb.confignode.rpc.thrift.TSetTimePartitionIntervalReq;
 import org.apache.iotdb.confignode.rpc.thrift.TShowCQResp;
@@ -102,6 +102,7 @@
 import org.apache.iotdb.confignode.rpc.thrift.TShowPipeResp;
 import org.apache.iotdb.confignode.rpc.thrift.TShowRegionReq;
 import org.apache.iotdb.confignode.rpc.thrift.TShowRegionResp;
+import org.apache.iotdb.confignode.rpc.thrift.TShowSpaceQuotaResp;
 import org.apache.iotdb.confignode.rpc.thrift.TShowStorageGroupResp;
 import org.apache.iotdb.confignode.rpc.thrift.TStorageGroupSchemaResp;
 import org.apache.iotdb.confignode.rpc.thrift.TUnsetSchemaTemplateReq;
@@ -1753,6 +1754,22 @@
     throw new TException(MSG_RECONNECTION_FAIL);
   }
 
+  @Override
+  public TShowSpaceQuotaResp showSpaceQuota(List<String> storageGroups) throws TException {
+    for (int i = 0; i < RETRY_NUM; i++) {
+      try {
+        TShowSpaceQuotaResp resp = client.showSpaceQuota(storageGroups);
+        if (!updateConfigNodeLeader(resp.getStatus())) {
+          return resp;
+        }
+      } catch (TException e) {
+        configLeader = null;
+      }
+      reconnect();
+    }
+    throw new TException(MSG_RECONNECTION_FAIL);
+  }
+
   public static class Factory extends BaseClientFactory<ConfigNodeRegionId, ConfigNodeClient> {
     public Factory(
         ClientManager<ConfigNodeRegionId, ConfigNodeClient> clientManager,
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/common/header/ColumnHeaderConstant.java b/server/src/main/java/org/apache/iotdb/db/mpp/common/header/ColumnHeaderConstant.java
index 1d93103..ba7f10f 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/common/header/ColumnHeaderConstant.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/common/header/ColumnHeaderConstant.java
@@ -123,6 +123,12 @@
   public static final String CQID = "CQId";
   public static final String QUERY = "Query";
 
+  // column names for show space quota
+  public static final String COLUMN_DATABASE = "database";
+  public static final String COLUMN_QUOTA_TYPE = "quotaType";
+  public static final String COLUMN_LIMIT = "limit";
+  public static final String COLUMN_USED = "used";
+
   public static final List<ColumnHeader> lastQueryColumnHeaders =
       ImmutableList.of(
           new ColumnHeader(TIMESERIES, TSDataType.TEXT),
@@ -325,4 +331,10 @@
           new ColumnHeader(CQID, TSDataType.TEXT),
           new ColumnHeader(QUERY, TSDataType.TEXT),
           new ColumnHeader(STATE, TSDataType.TEXT));
+
+  public static final List<ColumnHeader> showSpaceQuotaColumnHeaders =
+      ImmutableList.of(
+          new ColumnHeader(COLUMN_DATABASE, TSDataType.TEXT),
+          new ColumnHeader(COLUMN_QUOTA_TYPE, TSDataType.TEXT),
+          new ColumnHeader(COLUMN_LIMIT, TSDataType.TEXT));
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/common/header/DatasetHeaderFactory.java b/server/src/main/java/org/apache/iotdb/db/mpp/common/header/DatasetHeaderFactory.java
index 00d4f7a..f3343f0 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/common/header/DatasetHeaderFactory.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/common/header/DatasetHeaderFactory.java
@@ -154,4 +154,8 @@
   public static DatasetHeader getShowContinuousQueriesHeader() {
     return new DatasetHeader(ColumnHeaderConstant.showContinuousQueriesColumnHeaders, true);
   }
+
+  public static DatasetHeader getShowSpaceQuotaHeader() {
+    return new DatasetHeader(ColumnHeaderConstant.showSpaceQuotaColumnHeaders, true);
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/constant/StatementType.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/constant/StatementType.java
index 7e926f1..c179719 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/constant/StatementType.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/constant/StatementType.java
@@ -152,4 +152,5 @@
   DEACTIVATE_TEMPLATE,
 
   SET_SPACE_QUOTA,
+  SHOW_SPACE_QUOTA,
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/ConfigTaskVisitor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/ConfigTaskVisitor.java
index 43c8610..e11fc1f 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/ConfigTaskVisitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/ConfigTaskVisitor.java
@@ -59,6 +59,7 @@
 import org.apache.iotdb.db.mpp.plan.execution.config.sys.MergeTask;
 import org.apache.iotdb.db.mpp.plan.execution.config.sys.SetSystemStatusTask;
 import org.apache.iotdb.db.mpp.plan.execution.config.sys.quota.SetSpaceQuotaTask;
+import org.apache.iotdb.db.mpp.plan.execution.config.sys.quota.ShowSpaceQuotaTask;
 import org.apache.iotdb.db.mpp.plan.execution.config.sys.sync.CreatePipeSinkTask;
 import org.apache.iotdb.db.mpp.plan.execution.config.sys.sync.CreatePipeTask;
 import org.apache.iotdb.db.mpp.plan.execution.config.sys.sync.DropPipeSinkTask;
@@ -109,6 +110,7 @@
 import org.apache.iotdb.db.mpp.plan.statement.sys.MergeStatement;
 import org.apache.iotdb.db.mpp.plan.statement.sys.SetSystemStatusStatement;
 import org.apache.iotdb.db.mpp.plan.statement.sys.quota.SetSpaceQuotaStatement;
+import org.apache.iotdb.db.mpp.plan.statement.sys.quota.ShowSpaceQuotaStatement;
 import org.apache.iotdb.db.mpp.plan.statement.sys.sync.CreatePipeSinkStatement;
 import org.apache.iotdb.db.mpp.plan.statement.sys.sync.CreatePipeStatement;
 import org.apache.iotdb.db.mpp.plan.statement.sys.sync.DropPipeSinkStatement;
@@ -307,6 +309,12 @@
   }
 
   @Override
+  public IConfigTask visitShowSpaceQuota(
+      ShowSpaceQuotaStatement showSpaceQuotaStatement, TaskContext context) {
+    return new ShowSpaceQuotaTask(showSpaceQuotaStatement);
+  }
+
+  @Override
   public IConfigTask visitShowDataNodes(
       ShowDataNodesStatement showDataNodesStatement, TaskContext context) {
     return new ShowDataNodesTask(showDataNodesStatement);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/ClusterConfigTaskExecutor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/ClusterConfigTaskExecutor.java
index 66c720b..0431c8c 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/ClusterConfigTaskExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/ClusterConfigTaskExecutor.java
@@ -21,6 +21,7 @@
 
 import org.apache.iotdb.common.rpc.thrift.TFlushReq;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.common.rpc.thrift.TSetSpaceQuotaReq;
 import org.apache.iotdb.common.rpc.thrift.TSetTTLReq;
 import org.apache.iotdb.common.rpc.thrift.TSpaceQuota;
 import org.apache.iotdb.commons.client.IClientManager;
@@ -58,7 +59,6 @@
 import org.apache.iotdb.confignode.rpc.thrift.TGetTriggerTableResp;
 import org.apache.iotdb.confignode.rpc.thrift.TGetUDFTableResp;
 import org.apache.iotdb.confignode.rpc.thrift.TPipeSinkInfo;
-import org.apache.iotdb.confignode.rpc.thrift.TSetSpaceQuotaReq;
 import org.apache.iotdb.confignode.rpc.thrift.TSetStorageGroupReq;
 import org.apache.iotdb.confignode.rpc.thrift.TShowCQResp;
 import org.apache.iotdb.confignode.rpc.thrift.TShowClusterResp;
@@ -69,6 +69,7 @@
 import org.apache.iotdb.confignode.rpc.thrift.TShowPipeResp;
 import org.apache.iotdb.confignode.rpc.thrift.TShowRegionReq;
 import org.apache.iotdb.confignode.rpc.thrift.TShowRegionResp;
+import org.apache.iotdb.confignode.rpc.thrift.TShowSpaceQuotaResp;
 import org.apache.iotdb.confignode.rpc.thrift.TShowStorageGroupResp;
 import org.apache.iotdb.confignode.rpc.thrift.TStorageGroupSchema;
 import org.apache.iotdb.confignode.rpc.thrift.TStorageGroupSchemaResp;
@@ -99,6 +100,7 @@
 import org.apache.iotdb.db.mpp.plan.execution.config.metadata.template.ShowNodesInSchemaTemplateTask;
 import org.apache.iotdb.db.mpp.plan.execution.config.metadata.template.ShowPathSetTemplateTask;
 import org.apache.iotdb.db.mpp.plan.execution.config.metadata.template.ShowSchemaTemplateTask;
+import org.apache.iotdb.db.mpp.plan.execution.config.sys.quota.ShowSpaceQuotaTask;
 import org.apache.iotdb.db.mpp.plan.execution.config.sys.sync.ShowPipeSinkTask;
 import org.apache.iotdb.db.mpp.plan.execution.config.sys.sync.ShowPipeTask;
 import org.apache.iotdb.db.mpp.plan.statement.metadata.CountStorageGroupStatement;
@@ -126,6 +128,7 @@
 import org.apache.iotdb.db.mpp.plan.statement.metadata.template.ShowSchemaTemplateStatement;
 import org.apache.iotdb.db.mpp.plan.statement.metadata.template.UnsetSchemaTemplateStatement;
 import org.apache.iotdb.db.mpp.plan.statement.sys.quota.SetSpaceQuotaStatement;
+import org.apache.iotdb.db.mpp.plan.statement.sys.quota.ShowSpaceQuotaStatement;
 import org.apache.iotdb.db.mpp.plan.statement.sys.sync.CreatePipeSinkStatement;
 import org.apache.iotdb.db.mpp.plan.statement.sys.sync.CreatePipeStatement;
 import org.apache.iotdb.db.mpp.plan.statement.sys.sync.DropPipeSinkStatement;
@@ -161,6 +164,7 @@
 import java.nio.ByteBuffer;
 import java.nio.file.Files;
 import java.nio.file.Paths;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
@@ -1096,10 +1100,10 @@
     TSpaceQuota spaceQuota = new TSpaceQuota();
     spaceQuota.setDeviceNum(setSpaceQuotaStatement.getDeviceNum());
     spaceQuota.setTimeserieNum(setSpaceQuotaStatement.getTimeSeriesNum());
-    spaceQuota.setDisk(setSpaceQuotaStatement.getDiskSize());
+    spaceQuota.setDiskSize(setSpaceQuotaStatement.getDiskSize());
     req.setSpaceLimit(spaceQuota);
     try (ConfigNodeClient client =
-        CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.partitionRegionId)) {
+        CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.configNodeRegionId)) {
       // Send request to some API server
       tsStatus = client.setSpaceQuota(req);
     } catch (IOException | TException e) {
@@ -1114,6 +1118,29 @@
   }
 
   @Override
+  public SettableFuture<ConfigTaskResult> showSpaceQuota(
+      ShowSpaceQuotaStatement showSpaceQuotaStatement) {
+    SettableFuture<ConfigTaskResult> future = SettableFuture.create();
+
+    try (ConfigNodeClient configNodeClient =
+        CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.configNodeRegionId)) {
+      List<String> storageGroups = new ArrayList<>();
+      if (showSpaceQuotaStatement.getStorageGroups() != null) {
+        showSpaceQuotaStatement
+            .getStorageGroups()
+            .forEach(storageGroup -> storageGroups.add(storageGroup.toString()));
+      }
+      // Send request to some API server
+      TShowSpaceQuotaResp showSpaceQuotaResp = configNodeClient.showSpaceQuota(storageGroups);
+      // build TSBlock
+      ShowSpaceQuotaTask.buildTSBlock(showSpaceQuotaResp, future);
+    } catch (Exception e) {
+      future.setException(e);
+    }
+    return future;
+  }
+
+  @Override
   public SettableFuture<ConfigTaskResult> createPipeSink(
       CreatePipeSinkStatement createPipeSinkStatement) {
     SettableFuture<ConfigTaskResult> future = SettableFuture.create();
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/IConfigTaskExecutor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/IConfigTaskExecutor.java
index 7b2ffa5..f182a77 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/IConfigTaskExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/IConfigTaskExecutor.java
@@ -47,6 +47,7 @@
 import org.apache.iotdb.db.mpp.plan.statement.metadata.template.ShowSchemaTemplateStatement;
 import org.apache.iotdb.db.mpp.plan.statement.metadata.template.UnsetSchemaTemplateStatement;
 import org.apache.iotdb.db.mpp.plan.statement.sys.quota.SetSpaceQuotaStatement;
+import org.apache.iotdb.db.mpp.plan.statement.sys.quota.ShowSpaceQuotaStatement;
 import org.apache.iotdb.db.mpp.plan.statement.sys.sync.CreatePipeSinkStatement;
 import org.apache.iotdb.db.mpp.plan.statement.sys.sync.CreatePipeStatement;
 import org.apache.iotdb.db.mpp.plan.statement.sys.sync.DropPipeSinkStatement;
@@ -132,6 +133,8 @@
 
   SettableFuture<ConfigTaskResult> setSpaceQuota(SetSpaceQuotaStatement setSpaceQuotaStatement);
 
+  SettableFuture<ConfigTaskResult> showSpaceQuota(ShowSpaceQuotaStatement showSpaceQuotaStatement);
+
   SettableFuture<ConfigTaskResult> createPipeSink(CreatePipeSinkStatement createPipeSinkStatement);
 
   SettableFuture<ConfigTaskResult> dropPipeSink(DropPipeSinkStatement dropPipeSinkStatement);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/StandaloneConfigTaskExecutor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/StandaloneConfigTaskExecutor.java
index 38f2eba..8acee47 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/StandaloneConfigTaskExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/StandaloneConfigTaskExecutor.java
@@ -65,6 +65,7 @@
 import org.apache.iotdb.db.mpp.plan.statement.metadata.template.ShowSchemaTemplateStatement;
 import org.apache.iotdb.db.mpp.plan.statement.metadata.template.UnsetSchemaTemplateStatement;
 import org.apache.iotdb.db.mpp.plan.statement.sys.quota.SetSpaceQuotaStatement;
+import org.apache.iotdb.db.mpp.plan.statement.sys.quota.ShowSpaceQuotaStatement;
 import org.apache.iotdb.db.mpp.plan.statement.sys.sync.CreatePipeSinkStatement;
 import org.apache.iotdb.db.mpp.plan.statement.sys.sync.CreatePipeStatement;
 import org.apache.iotdb.db.mpp.plan.statement.sys.sync.DropPipeSinkStatement;
@@ -535,7 +536,18 @@
     SettableFuture<ConfigTaskResult> future = SettableFuture.create();
     future.setException(
         new IoTDBException(
-            "Executing unset schema template is not supported",
+            "Executing set space quota is not supported",
+            TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode()));
+    return future;
+  }
+
+  @Override
+  public SettableFuture<ConfigTaskResult> showSpaceQuota(
+      ShowSpaceQuotaStatement showSpaceQuotaStatement) {
+    SettableFuture<ConfigTaskResult> future = SettableFuture.create();
+    future.setException(
+        new IoTDBException(
+            "Executing show space quota is not supported",
             TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode()));
     return future;
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/sys/quota/ShowSpaceQuotaTask.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/sys/quota/ShowSpaceQuotaTask.java
new file mode 100644
index 0000000..7d77972
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/sys/quota/ShowSpaceQuotaTask.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.plan.execution.config.sys.quota;
+
+import org.apache.iotdb.common.rpc.thrift.TSpaceQuota;
+import org.apache.iotdb.commons.enums.SpaceQuotaType;
+import org.apache.iotdb.confignode.rpc.thrift.TShowSpaceQuotaResp;
+import org.apache.iotdb.db.mpp.common.header.ColumnHeader;
+import org.apache.iotdb.db.mpp.common.header.ColumnHeaderConstant;
+import org.apache.iotdb.db.mpp.common.header.DatasetHeader;
+import org.apache.iotdb.db.mpp.common.header.DatasetHeaderFactory;
+import org.apache.iotdb.db.mpp.plan.execution.config.ConfigTaskResult;
+import org.apache.iotdb.db.mpp.plan.execution.config.IConfigTask;
+import org.apache.iotdb.db.mpp.plan.execution.config.executor.IConfigTaskExecutor;
+import org.apache.iotdb.db.mpp.plan.statement.sys.quota.ShowSpaceQuotaStatement;
+import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
+import org.apache.iotdb.tsfile.utils.Binary;
+
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
+
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+public class ShowSpaceQuotaTask implements IConfigTask {
+
+  private final ShowSpaceQuotaStatement showSpaceQuotaStatement;
+
+  public ShowSpaceQuotaTask(ShowSpaceQuotaStatement showSpaceQuotaStatement) {
+    this.showSpaceQuotaStatement = showSpaceQuotaStatement;
+  }
+
+  @Override
+  public ListenableFuture<ConfigTaskResult> execute(IConfigTaskExecutor configTaskExecutor)
+      throws InterruptedException {
+    return configTaskExecutor.showSpaceQuota(showSpaceQuotaStatement);
+  }
+
+  public static void buildTSBlock(
+      TShowSpaceQuotaResp resp, SettableFuture<ConfigTaskResult> future) {
+    List<TSDataType> outputDataTypes =
+        ColumnHeaderConstant.showSpaceQuotaColumnHeaders.stream()
+            .map(ColumnHeader::getColumnType)
+            .collect(Collectors.toList());
+    TsBlockBuilder builder = new TsBlockBuilder(outputDataTypes);
+    if (resp.getSpaceQuota() != null) {
+      for (Map.Entry<String, TSpaceQuota> spaceQuotaEntry : resp.getSpaceQuota().entrySet()) {
+        if (spaceQuotaEntry.getValue().getDiskSize() != -1) {
+          builder.getTimeColumnBuilder().writeLong(0L);
+          builder.getColumnBuilder(0).writeBinary(Binary.valueOf(spaceQuotaEntry.getKey()));
+          builder.getColumnBuilder(1).writeBinary(Binary.valueOf(SpaceQuotaType.diskSize.name()));
+          builder
+              .getColumnBuilder(2)
+              .writeBinary(
+                  Binary.valueOf(
+                      spaceQuotaEntry.getValue().getDiskSize() == 0
+                          ? "unlimited"
+                          : spaceQuotaEntry.getValue().getDiskSize() + "M"));
+          builder.declarePosition();
+        }
+        if (spaceQuotaEntry.getValue().getDeviceNum() != -1) {
+          builder.getTimeColumnBuilder().writeLong(0L);
+          builder.getColumnBuilder(0).writeBinary(Binary.valueOf(spaceQuotaEntry.getKey()));
+          builder.getColumnBuilder(1).writeBinary(Binary.valueOf(SpaceQuotaType.deviceNum.name()));
+          builder
+              .getColumnBuilder(2)
+              .writeBinary(
+                  Binary.valueOf(
+                      spaceQuotaEntry.getValue().getDeviceNum() == 0
+                          ? "unlimited"
+                          : spaceQuotaEntry.getValue().getDeviceNum() + ""));
+          builder.declarePosition();
+        }
+        if (spaceQuotaEntry.getValue().getTimeserieNum() != -1) {
+          builder.getTimeColumnBuilder().writeLong(0L);
+          builder.getColumnBuilder(0).writeBinary(Binary.valueOf(spaceQuotaEntry.getKey()));
+          builder
+              .getColumnBuilder(1)
+              .writeBinary(Binary.valueOf(SpaceQuotaType.timeSeriesNum.name()));
+          builder
+              .getColumnBuilder(2)
+              .writeBinary(
+                  Binary.valueOf(
+                      spaceQuotaEntry.getValue().getTimeserieNum() == 0
+                          ? "unlimited"
+                          : spaceQuotaEntry.getValue().getTimeserieNum() + ""));
+          builder.declarePosition();
+        }
+      }
+    }
+    DatasetHeader datasetHeader = DatasetHeaderFactory.getShowSpaceQuotaHeader();
+    future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS, builder.build(), datasetHeader));
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/ASTVisitor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/ASTVisitor.java
index f04e01e..001949e 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/ASTVisitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/ASTVisitor.java
@@ -147,6 +147,7 @@
 import org.apache.iotdb.db.mpp.plan.statement.sys.SetSystemStatusStatement;
 import org.apache.iotdb.db.mpp.plan.statement.sys.ShowVersionStatement;
 import org.apache.iotdb.db.mpp.plan.statement.sys.quota.SetSpaceQuotaStatement;
+import org.apache.iotdb.db.mpp.plan.statement.sys.quota.ShowSpaceQuotaStatement;
 import org.apache.iotdb.db.mpp.plan.statement.sys.sync.CreatePipeSinkStatement;
 import org.apache.iotdb.db.mpp.plan.statement.sys.sync.CreatePipeStatement;
 import org.apache.iotdb.db.mpp.plan.statement.sys.sync.DropPipeSinkStatement;
@@ -2169,6 +2170,10 @@
   // Quota
   @Override
   public Statement visitSetSpaceQuota(IoTDBSqlParser.SetSpaceQuotaContext ctx) {
+    if (!IoTDBDescriptor.getInstance().getConfig().isQuotaEnable()) {
+      throw new SQLParserException("Limit configuration is not enabled, please enable it first.");
+    }
+
     SetSpaceQuotaStatement setSpaceQuotaStatement = new SetSpaceQuotaStatement();
     List<IoTDBSqlParser.PrefixPathContext> prefixPathContexts = ctx.prefixPath();
     List<String> paths = new ArrayList<>();
@@ -2185,35 +2190,73 @@
     }
 
     if (quotas.containsKey(IoTDBConstant.COLUMN_DEVICES)) {
-      setSpaceQuotaStatement.setDeviceNum(
-          Integer.parseInt(quotas.get(IoTDBConstant.COLUMN_DEVICES)));
+      if (quotas.get(IoTDBConstant.COLUMN_DEVICES).equals(IoTDBConstant.SPACE_QUOTA_UNLIMITED)) {
+        setSpaceQuotaStatement.setTimeSeriesNum(0);
+      } else if (Integer.parseInt(quotas.get(IoTDBConstant.COLUMN_DEVICES)) <= 0) {
+        throw new SQLParserException("Please set the number of devices greater than 0");
+      } else {
+        setSpaceQuotaStatement.setDeviceNum(
+            Integer.parseInt(quotas.get(IoTDBConstant.COLUMN_DEVICES)));
+      }
     }
     if (quotas.containsKey(IoTDBConstant.COLUMN_TIMESERIES)) {
-      setSpaceQuotaStatement.setTimeSeriesNum(
-          Integer.parseInt(quotas.get(IoTDBConstant.COLUMN_TIMESERIES)));
+      if (quotas.get(IoTDBConstant.COLUMN_TIMESERIES).equals(IoTDBConstant.SPACE_QUOTA_UNLIMITED)) {
+        setSpaceQuotaStatement.setDiskSize(0);
+      } else if (Integer.parseInt(quotas.get(IoTDBConstant.COLUMN_TIMESERIES)) <= 0) {
+        throw new SQLParserException("Please set the number of timeseries greater than 0");
+      } else {
+        setSpaceQuotaStatement.setTimeSeriesNum(
+            Integer.parseInt(quotas.get(IoTDBConstant.COLUMN_TIMESERIES)));
+      }
     }
     if (quotas.containsKey(IoTDBConstant.SPACE_QUOTA_DISK)) {
-      setSpaceQuotaStatement.setDiskSize(parseUnit(quotas.get(IoTDBConstant.SPACE_QUOTA_DISK)));
+      if (quotas.get(IoTDBConstant.SPACE_QUOTA_DISK).equals(IoTDBConstant.SPACE_QUOTA_UNLIMITED)) {
+        setSpaceQuotaStatement.setDeviceNum(0);
+      } else {
+        setSpaceQuotaStatement.setDiskSize(parseUnit(quotas.get(IoTDBConstant.SPACE_QUOTA_DISK)));
+      }
     }
     return setSpaceQuotaStatement;
   }
 
+  @Override
+  public Statement visitShowSpaceQuota(IoTDBSqlParser.ShowSpaceQuotaContext ctx) {
+    if (!IoTDBDescriptor.getInstance().getConfig().isQuotaEnable()) {
+      throw new SQLParserException("Limit configuration is not enabled, please enable it first.");
+    }
+    ShowSpaceQuotaStatement showSpaceQuotaStatement = new ShowSpaceQuotaStatement();
+    List<PartialPath> storageGroups = null;
+    if (ctx.prefixPath() != null) {
+      storageGroups = new ArrayList<>();
+      for (IoTDBSqlParser.PrefixPathContext prefixPathContext : ctx.prefixPath()) {
+        storageGroups.add(parsePrefixPath(prefixPathContext));
+      }
+      showSpaceQuotaStatement.setStorageGroups(storageGroups);
+    } else {
+      showSpaceQuotaStatement.setStorageGroups(null);
+    }
+    return showSpaceQuotaStatement;
+  }
+
   private long parseUnit(String data) {
     String unit = data.substring(data.length() - 1);
     long disk = Long.parseLong(data.substring(0, data.length() - 1));
+    if (disk <= 0) {
+      throw new SQLParserException("Please set the disk size greater than 0");
+    }
     switch (unit) {
       case "M":
       case "m":
-        return disk * 1024;
+        return disk;
       case "G":
       case "g":
-        return disk * 1024 * 1024;
+        return disk * 1024;
       case "T":
       case "t":
-        return disk * 1024 * 1024 * 1024;
+        return disk * 1024 * 1024;
       case "P":
       case "p":
-        return disk * 1024 * 1024 * 1024 * 1024;
+        return disk * 1024 * 1024 * 1024;
       default:
         throw new SQLParserException(
             "When setting the disk size, the unit is incorrect. Please use 'M', 'G', 'P', 'T' as the unit");
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/StatementVisitor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/StatementVisitor.java
index 29ac69a..dbdb04a 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/StatementVisitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/StatementVisitor.java
@@ -85,6 +85,7 @@
 import org.apache.iotdb.db.mpp.plan.statement.sys.SetSystemStatusStatement;
 import org.apache.iotdb.db.mpp.plan.statement.sys.ShowVersionStatement;
 import org.apache.iotdb.db.mpp.plan.statement.sys.quota.SetSpaceQuotaStatement;
+import org.apache.iotdb.db.mpp.plan.statement.sys.quota.ShowSpaceQuotaStatement;
 import org.apache.iotdb.db.mpp.plan.statement.sys.sync.CreatePipeSinkStatement;
 import org.apache.iotdb.db.mpp.plan.statement.sys.sync.CreatePipeStatement;
 import org.apache.iotdb.db.mpp.plan.statement.sys.sync.DropPipeSinkStatement;
@@ -450,4 +451,8 @@
   public R visitSetSpaceQuota(SetSpaceQuotaStatement setSpaceQuotaStatement, C context) {
     return visitStatement(setSpaceQuotaStatement, context);
   }
+
+  public R visitShowSpaceQuota(ShowSpaceQuotaStatement showSpaceQuotaStatement, C context) {
+    return visitStatement(showSpaceQuotaStatement, context);
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/sys/quota/ShowSpaceQuotaStatement.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/sys/quota/ShowSpaceQuotaStatement.java
new file mode 100644
index 0000000..8cf304f
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/sys/quota/ShowSpaceQuotaStatement.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.plan.statement.sys.quota;
+
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.db.mpp.plan.analyze.QueryType;
+import org.apache.iotdb.db.mpp.plan.constant.StatementType;
+import org.apache.iotdb.db.mpp.plan.statement.IConfigStatement;
+import org.apache.iotdb.db.mpp.plan.statement.Statement;
+import org.apache.iotdb.db.mpp.plan.statement.StatementVisitor;
+
+import java.util.List;
+
+public class ShowSpaceQuotaStatement extends Statement implements IConfigStatement {
+
+  private List<PartialPath> storageGroups;
+
+  public ShowSpaceQuotaStatement() {
+    super();
+    statementType = StatementType.SHOW_SPACE_QUOTA;
+  }
+
+  public List<PartialPath> getStorageGroups() {
+    return storageGroups;
+  }
+
+  public void setStorageGroups(List<PartialPath> storageGroups) {
+    this.storageGroups = storageGroups;
+  }
+
+  @Override
+  public QueryType getQueryType() {
+    return QueryType.READ;
+  }
+
+  @Override
+  public List<PartialPath> getPaths() {
+    return storageGroups;
+  }
+
+  @Override
+  public <R, C> R accept(StatementVisitor<R, C> visitor, C context) {
+    return visitor.visitShowSpaceQuota(this, context);
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/quotas/DataNodeSpaceQuotaManager.java b/server/src/main/java/org/apache/iotdb/db/quotas/DataNodeSpaceQuotaManager.java
index 776080b..ee8dced 100644
--- a/server/src/main/java/org/apache/iotdb/db/quotas/DataNodeSpaceQuotaManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/quotas/DataNodeSpaceQuotaManager.java
@@ -19,16 +19,48 @@
 
 package org.apache.iotdb.db.quotas;
 
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.common.rpc.thrift.TSetSpaceQuotaReq;
 import org.apache.iotdb.common.rpc.thrift.TSpaceQuota;
+import org.apache.iotdb.rpc.RpcUtils;
+import org.apache.iotdb.rpc.TSStatusCode;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
 import java.util.Map;
 
 // TODO: Store quota information of each sg
 public class DataNodeSpaceQuotaManager {
 
+  private static final Logger LOGGER = LoggerFactory.getLogger(DataNodeSpaceQuotaManager.class);
+
   private Map<String, TSpaceQuota> spaceQuotaLimit;
   private Map<String, TSpaceQuota> useSpaceQuota;
   private Map<Integer, Integer> regionDisk;
 
-  public DataNodeSpaceQuotaManager() {}
+  public DataNodeSpaceQuotaManager() {
+    spaceQuotaLimit = new HashMap<>();
+    useSpaceQuota = new HashMap<>();
+    regionDisk = new HashMap<>();
+  }
+
+  /** SingleTone */
+  private static class DataNodeSpaceQuotaManagerHolder {
+    private static final DataNodeSpaceQuotaManager INSTANCE = new DataNodeSpaceQuotaManager();
+
+    private DataNodeSpaceQuotaManagerHolder() {}
+  }
+
+  public static DataNodeSpaceQuotaManager getInstance() {
+    return DataNodeSpaceQuotaManager.DataNodeSpaceQuotaManagerHolder.INSTANCE;
+  }
+
+  public TSStatus setSpaceQuota(TSetSpaceQuotaReq req) {
+    for (String storageGroup : req.getStorageGroup()) {
+      spaceQuotaLimit.put(storageGroup, req.getSpaceLimit());
+    }
+    return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
index f691488..ca737e1 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
@@ -25,6 +25,7 @@
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.common.rpc.thrift.TFlushReq;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.common.rpc.thrift.TSetSpaceQuotaReq;
 import org.apache.iotdb.common.rpc.thrift.TSetTTLReq;
 import org.apache.iotdb.commons.cluster.NodeStatus;
 import org.apache.iotdb.commons.conf.CommonConfig;
@@ -109,6 +110,7 @@
 import org.apache.iotdb.db.query.control.SessionManager;
 import org.apache.iotdb.db.query.control.clientsession.IClientSession;
 import org.apache.iotdb.db.query.control.clientsession.InternalClientSession;
+import org.apache.iotdb.db.quotas.DataNodeSpaceQuotaManager;
 import org.apache.iotdb.db.service.DataNode;
 import org.apache.iotdb.db.service.RegionMigrateService;
 import org.apache.iotdb.db.sync.SyncService;
@@ -225,6 +227,9 @@
 
   private final DataNodeRegionManager regionManager = DataNodeRegionManager.getInstance();
 
+  private final DataNodeSpaceQuotaManager spaceQuotaManager =
+      DataNodeSpaceQuotaManager.getInstance();
+
   public DataNodeInternalRPCServiceImpl() {
     super();
     if (config.isClusterMode()) {
@@ -905,6 +910,11 @@
     }
   }
 
+  @Override
+  public TSStatus setSpaceQuota(TSetSpaceQuotaReq req) throws TException {
+    return spaceQuotaManager.setSpaceQuota(req);
+  }
+
   private PathPatternTree filterPathPatternTree(PathPatternTree patternTree, String storageGroup) {
     PathPatternTree filteredPatternTree = new PathPatternTree();
     try {
diff --git a/thrift-commons/src/main/thrift/common.thrift b/thrift-commons/src/main/thrift/common.thrift
index 87800d9..c1b0f35 100644
--- a/thrift-commons/src/main/thrift/common.thrift
+++ b/thrift-commons/src/main/thrift/common.thrift
@@ -125,7 +125,12 @@
 
 // quota
 struct TSpaceQuota {
-  1: optional i64 disk
+  1: optional i64 diskSize
   2: optional i32 deviceNum
   3: optional i32 timeserieNum
 }
+
+struct TSetSpaceQuotaReq {
+  1: required list<string> storageGroup
+  2: required TSpaceQuota spaceLimit
+}
diff --git a/thrift-confignode/src/main/thrift/confignode.thrift b/thrift-confignode/src/main/thrift/confignode.thrift
index e2fcb0e..66ac2d9 100644
--- a/thrift-confignode/src/main/thrift/confignode.thrift
+++ b/thrift-confignode/src/main/thrift/confignode.thrift
@@ -557,14 +557,6 @@
 }
 
 // ====================================================
-// Quota
-// ====================================================
-struct TSetSpaceQuotaReq {
-  1: required list<string> storageGroup
-  2: required common.TSpaceQuota spaceLimit
-}
-
-// ====================================================
 // CQ
 // ====================================================
 struct TCreateCQReq {
@@ -608,6 +600,14 @@
   3: required string path
 }
 
+// ====================================================
+// Quota
+// ====================================================
+struct TShowSpaceQuotaResp{
+  1: required common.TSStatus status
+  2: optional map<string, common.TSpaceQuota> spaceQuota
+}
+
 service IConfigNodeRPCService {
 
   // ======================================================
@@ -1086,6 +1086,11 @@
    */
   TShowCQResp showCQ()
 
-  common.TSStatus setSpaceQuota(TSetSpaceQuotaReq req)
+  /**
+   * Set space quota
+   **/
+  common.TSStatus setSpaceQuota(common.TSetSpaceQuotaReq req)
+
+  TShowSpaceQuotaResp showSpaceQuota(list<string> storageGroups);
 }
 
diff --git a/thrift/src/main/thrift/datanode.thrift b/thrift/src/main/thrift/datanode.thrift
index 174847c..c2926c0 100644
--- a/thrift/src/main/thrift/datanode.thrift
+++ b/thrift/src/main/thrift/datanode.thrift
@@ -639,6 +639,11 @@
   * Execute CQ on DataNode
   */
   common.TSStatus executeCQ(TExecuteCQ req)
+
+  /**
+   * Set space quota
+   **/
+  common.TSStatus setSpaceQuota(common.TSetSpaceQuotaReq req)
 }
 
 service MPPDataExchangeService {