[IOTDB-4378] Negative feedback Region extension policy (#7400)

diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/DataNodeHeartbeatHandler.java b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/DataNodeHeartbeatHandler.java
index 7f017c6..ded73de 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/DataNodeHeartbeatHandler.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/DataNodeHeartbeatHandler.java
@@ -20,6 +20,7 @@
 
 import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
 import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
+import org.apache.iotdb.commons.cluster.RegionStatus;
 import org.apache.iotdb.confignode.manager.node.DataNodeHeartbeatCache;
 import org.apache.iotdb.confignode.manager.node.NodeHeartbeatSample;
 import org.apache.iotdb.confignode.manager.partition.RegionGroupCache;
@@ -68,7 +69,9 @@
                               heartbeatResp.getHeartbeatTimestamp(),
                               receiveTime,
                               dataNodeLocation.getDataNodeId(),
-                              isLeader)));
+                              isLeader,
+                              // Region will inherit DataNode's status
+                              RegionStatus.parse(heartbeatResp.getStatus()))));
     }
   }
 
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/exception/NotAvailableRegionGroupException.java b/confignode/src/main/java/org/apache/iotdb/confignode/exception/NotAvailableRegionGroupException.java
new file mode 100644
index 0000000..a9687a8
--- /dev/null
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/exception/NotAvailableRegionGroupException.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.confignode.exception;
+
+public class NotAvailableRegionGroupException extends ConfigNodeException {
+
+  public NotAvailableRegionGroupException() {
+    super(
+        "There are no available RegionGroups currently, please check the status of cluster DataNodes");
+  }
+}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ClusterSchemaManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ClusterSchemaManager.java
index e950b9a..257a68d 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ClusterSchemaManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ClusterSchemaManager.java
@@ -346,6 +346,23 @@
   }
 
   /**
+   * Only leader use this interface
+   *
+   * @param storageGroup StorageGroupName
+   * @param consensusGroupType SchemaRegion for SchemaReplicationFactor and DataRegion for
+   *     DataReplicationFactor
+   * @return SchemaReplicationFactor or DataReplicationFactor
+   * @throws StorageGroupNotExistsException When the specific StorageGroup doesn't exist
+   */
+  public int getReplicationFactor(String storageGroup, TConsensusGroupType consensusGroupType)
+      throws StorageGroupNotExistsException {
+    TStorageGroupSchema storageGroupSchema = getStorageGroupSchemaByName(storageGroup);
+    return consensusGroupType == TConsensusGroupType.SchemaRegion
+        ? storageGroupSchema.getSchemaReplicationFactor()
+        : storageGroupSchema.getDataReplicationFactor();
+  }
+
+  /**
    * Only leader use this interface.
    *
    * @param rawPathList List<StorageGroupName>
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
index 5e3d112..63be337 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
@@ -706,7 +706,7 @@
     }
     if (req.getDiskSpaceWarningThreshold()
         != CommonDescriptor.getInstance().getConfig().getDiskSpaceWarningThreshold()) {
-      return errorStatus.setMessage(errorPrefix + "disk_full_threshold" + errorSuffix);
+      return errorStatus.setMessage(errorPrefix + "disk_space_warning_threshold" + errorSuffix);
     }
     return null;
   }
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java
index 1ef7b97..7f01dcd 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java
@@ -34,6 +34,7 @@
 import org.apache.iotdb.confignode.conf.ConfigNodeConfig;
 import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
 import org.apache.iotdb.confignode.consensus.request.write.region.CreateRegionGroupsPlan;
+import org.apache.iotdb.confignode.exception.NotAvailableRegionGroupException;
 import org.apache.iotdb.confignode.exception.NotEnoughDataNodeException;
 import org.apache.iotdb.confignode.exception.StorageGroupNotExistsException;
 import org.apache.iotdb.confignode.manager.ClusterSchemaManager;
@@ -118,7 +119,8 @@
    * @return Map<StorageGroupName, SchemaPartitionTable>, the allocating result
    */
   public Map<String, SchemaPartitionTable> allocateSchemaPartition(
-      Map<String, List<TSeriesPartitionSlot>> unassignedSchemaPartitionSlotsMap) {
+      Map<String, List<TSeriesPartitionSlot>> unassignedSchemaPartitionSlotsMap)
+      throws NotAvailableRegionGroupException {
     return partitionBalancer.allocateSchemaPartition(unassignedSchemaPartitionSlotsMap);
   }
 
@@ -130,7 +132,8 @@
    */
   public Map<String, DataPartitionTable> allocateDataPartition(
       Map<String, Map<TSeriesPartitionSlot, List<TTimePartitionSlot>>>
-          unassignedDataPartitionSlotsMap) {
+          unassignedDataPartitionSlotsMap)
+      throws NotAvailableRegionGroupException {
     return partitionBalancer.allocateDataPartition(unassignedDataPartitionSlotsMap);
   }
 
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/PartitionBalancer.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/PartitionBalancer.java
index 2c71f84..f51b0db 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/PartitionBalancer.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/PartitionBalancer.java
@@ -22,6 +22,7 @@
 import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
 import org.apache.iotdb.commons.partition.DataPartitionTable;
 import org.apache.iotdb.commons.partition.SchemaPartitionTable;
+import org.apache.iotdb.confignode.exception.NotAvailableRegionGroupException;
 import org.apache.iotdb.confignode.manager.IManager;
 import org.apache.iotdb.confignode.manager.load.balancer.partition.GreedyPartitionAllocator;
 import org.apache.iotdb.confignode.manager.load.balancer.partition.IPartitionAllocator;
@@ -48,7 +49,8 @@
    * @return Map<StorageGroupName, SchemaPartitionTable>, the allocating result
    */
   public Map<String, SchemaPartitionTable> allocateSchemaPartition(
-      Map<String, List<TSeriesPartitionSlot>> unassignedSchemaPartitionSlotsMap) {
+      Map<String, List<TSeriesPartitionSlot>> unassignedSchemaPartitionSlotsMap)
+      throws NotAvailableRegionGroupException {
     return genPartitionAllocator().allocateSchemaPartition(unassignedSchemaPartitionSlotsMap);
   }
 
@@ -60,7 +62,8 @@
    */
   public Map<String, DataPartitionTable> allocateDataPartition(
       Map<String, Map<TSeriesPartitionSlot, List<TTimePartitionSlot>>>
-          unassignedDataPartitionSlotsMap) {
+          unassignedDataPartitionSlotsMap)
+      throws NotAvailableRegionGroupException {
     return genPartitionAllocator().allocateDataPartition(unassignedDataPartitionSlotsMap);
   }
 
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RegionBalancer.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RegionBalancer.java
index 010b2ac..43dcf46 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RegionBalancer.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RegionBalancer.java
@@ -35,7 +35,6 @@
 import org.apache.iotdb.confignode.manager.load.balancer.region.IRegionAllocator;
 import org.apache.iotdb.confignode.manager.node.NodeManager;
 import org.apache.iotdb.confignode.manager.partition.PartitionManager;
-import org.apache.iotdb.confignode.rpc.thrift.TStorageGroupSchema;
 
 import java.util.List;
 import java.util.Map;
@@ -55,7 +54,7 @@
   }
 
   /**
-   * Generate a Regions allocation plan(CreateRegionsPlan)
+   * Generate a Region allocation plan (CreateRegionGroupsPlan)
    *
    * @param allotmentMap Map<StorageGroupName, Region allotment>
    * @param consensusGroupType TConsensusGroupType of the new Regions
@@ -66,11 +65,27 @@
   public CreateRegionGroupsPlan genRegionsAllocationPlan(
       Map<String, Integer> allotmentMap, TConsensusGroupType consensusGroupType)
       throws NotEnoughDataNodeException, StorageGroupNotExistsException {
-    CreateRegionGroupsPlan createRegionGroupsPlan = new CreateRegionGroupsPlan();
-    IRegionAllocator regionAllocator = genRegionAllocator();
 
+    // The new Regions will occupy online DataNodes first
     List<TDataNodeConfiguration> onlineDataNodes =
         getNodeManager().filterDataNodeThroughStatus(NodeStatus.Running);
+    // Some new Regions will have to occupy unknown DataNodes
+    // if the number of online DataNodes is insufficient
+    List<TDataNodeConfiguration> availableDataNodes =
+        getNodeManager().filterDataNodeThroughStatus(NodeStatus.Running, NodeStatus.Unknown);
+
+    // Make sure the number of available DataNodes is enough for allocating new Regions
+    for (String storageGroup : allotmentMap.keySet()) {
+      int replicationFactor =
+          getClusterSchemaManager().getReplicationFactor(storageGroup, consensusGroupType);
+      if (availableDataNodes.size() < replicationFactor) {
+        throw new NotEnoughDataNodeException();
+      }
+    }
+
+    CreateRegionGroupsPlan createRegionGroupsPlan = new CreateRegionGroupsPlan();
+    IRegionAllocator regionAllocator = genRegionAllocator();
+    // Only considering the specified ConsensusGroupType when doing allocation
     List<TRegionReplicaSet> allocatedRegions = getPartitionManager().getAllReplicaSets();
     allocatedRegions.removeIf(
         allocateRegion -> allocateRegion.getRegionId().getType() != consensusGroupType);
@@ -78,25 +93,16 @@
     for (Map.Entry<String, Integer> entry : allotmentMap.entrySet()) {
       String storageGroup = entry.getKey();
       int allotment = entry.getValue();
-
-      // Get schema
-      TStorageGroupSchema storageGroupSchema =
-          getClusterSchemaManager().getStorageGroupSchemaByName(storageGroup);
       int replicationFactor =
-          consensusGroupType == TConsensusGroupType.SchemaRegion
-              ? storageGroupSchema.getSchemaReplicationFactor()
-              : storageGroupSchema.getDataReplicationFactor();
-
-      // Check validity
-      if (onlineDataNodes.size() < replicationFactor) {
-        throw new NotEnoughDataNodeException();
-      }
+          getClusterSchemaManager().getReplicationFactor(storageGroup, consensusGroupType);
+      List<TDataNodeConfiguration> targetDataNodes =
+          onlineDataNodes.size() >= replicationFactor ? onlineDataNodes : availableDataNodes;
 
       for (int i = 0; i < allotment; i++) {
         // Generate allocation plan
         TRegionReplicaSet newRegion =
             regionAllocator.allocateRegion(
-                onlineDataNodes,
+                targetDataNodes,
                 allocatedRegions,
                 replicationFactor,
                 new TConsensusGroupId(
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/partition/GreedyPartitionAllocator.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/partition/GreedyPartitionAllocator.java
index 0800c97..73b5263 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/partition/GreedyPartitionAllocator.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/partition/GreedyPartitionAllocator.java
@@ -26,6 +26,7 @@
 import org.apache.iotdb.commons.partition.SchemaPartitionTable;
 import org.apache.iotdb.commons.partition.SeriesPartitionTable;
 import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
+import org.apache.iotdb.confignode.exception.NotAvailableRegionGroupException;
 import org.apache.iotdb.confignode.manager.IManager;
 import org.apache.iotdb.confignode.manager.partition.PartitionManager;
 import org.apache.iotdb.tsfile.utils.Pair;
@@ -50,27 +51,30 @@
 
   @Override
   public Map<String, SchemaPartitionTable> allocateSchemaPartition(
-      Map<String, List<TSeriesPartitionSlot>> unassignedSchemaPartitionSlotsMap) {
+      Map<String, List<TSeriesPartitionSlot>> unassignedSchemaPartitionSlotsMap)
+      throws NotAvailableRegionGroupException {
     Map<String, SchemaPartitionTable> result = new ConcurrentHashMap<>();
 
-    unassignedSchemaPartitionSlotsMap.forEach(
-        (storageGroup, unassignedPartitionSlots) -> {
-          // List<Pair<allocatedSlotsNum, TConsensusGroupId>>
-          List<Pair<Long, TConsensusGroupId>> regionSlotsCounter =
-              getPartitionManager()
-                  .getSortedRegionSlotsCounter(storageGroup, TConsensusGroupType.SchemaRegion);
+    for (Map.Entry<String, List<TSeriesPartitionSlot>> slotsMapEntry :
+        unassignedSchemaPartitionSlotsMap.entrySet()) {
+      final String storageGroup = slotsMapEntry.getKey();
+      final List<TSeriesPartitionSlot> unassignedPartitionSlots = slotsMapEntry.getValue();
 
-          // Enumerate SeriesPartitionSlot
-          Map<TSeriesPartitionSlot, TConsensusGroupId> schemaPartitionMap =
-              new ConcurrentHashMap<>();
-          for (TSeriesPartitionSlot seriesPartitionSlot : unassignedPartitionSlots) {
-            // Greedy allocation
-            schemaPartitionMap.put(seriesPartitionSlot, regionSlotsCounter.get(0).getRight());
-            // Bubble sort
-            bubbleSort(regionSlotsCounter.get(0).getRight(), regionSlotsCounter);
-          }
-          result.put(storageGroup, new SchemaPartitionTable(schemaPartitionMap));
-        });
+      // List<Pair<allocatedSlotsNum, TConsensusGroupId>>
+      List<Pair<Long, TConsensusGroupId>> regionSlotsCounter =
+          getPartitionManager()
+              .getSortedRegionGroupSlotsCounter(storageGroup, TConsensusGroupType.SchemaRegion);
+
+      // Enumerate SeriesPartitionSlot
+      Map<TSeriesPartitionSlot, TConsensusGroupId> schemaPartitionMap = new ConcurrentHashMap<>();
+      for (TSeriesPartitionSlot seriesPartitionSlot : unassignedPartitionSlots) {
+        // Greedy allocation
+        schemaPartitionMap.put(seriesPartitionSlot, regionSlotsCounter.get(0).getRight());
+        // Bubble sort
+        bubbleSort(regionSlotsCounter.get(0).getRight(), regionSlotsCounter);
+      }
+      result.put(storageGroup, new SchemaPartitionTable(schemaPartitionMap));
+    }
 
     return result;
   }
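
As a reading aid for the greedy step above, here is a hypothetical standalone version of one assignment iteration; a plain re-sort stands in for the bubbleSort helper, which is not part of this diff:

// One greedy assignment: the head of the ascending counter list is the RegionGroup with the
// fewest allocated slots, so the unassigned slot goes there, its counter is incremented, and
// ascending order is restored.
private static void greedyAssign(
    TSeriesPartitionSlot slot,
    Map<TSeriesPartitionSlot, TConsensusGroupId> schemaPartitionMap,
    List<Pair<Long, TConsensusGroupId>> regionSlotsCounter) {
  Pair<Long, TConsensusGroupId> head = regionSlotsCounter.get(0);
  schemaPartitionMap.put(slot, head.getRight());
  regionSlotsCounter.set(0, new Pair<>(head.getLeft() + 1, head.getRight()));
  regionSlotsCounter.sort(Comparator.comparingLong(Pair::getLeft));
}
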
@@ -78,74 +82,79 @@
   @Override
   public Map<String, DataPartitionTable> allocateDataPartition(
       Map<String, Map<TSeriesPartitionSlot, List<TTimePartitionSlot>>>
-          unassignedDataPartitionSlotsMap) {
+          unassignedDataPartitionSlotsMap)
+      throws NotAvailableRegionGroupException {
     Map<String, DataPartitionTable> result = new ConcurrentHashMap<>();
 
-    unassignedDataPartitionSlotsMap.forEach(
-        (storageGroup, unassignedPartitionSlotsMap) -> {
-          // List<Pair<allocatedSlotsNum, TConsensusGroupId>>
-          List<Pair<Long, TConsensusGroupId>> regionSlotsCounter =
-              getPartitionManager()
-                  .getSortedRegionSlotsCounter(storageGroup, TConsensusGroupType.DataRegion);
+    for (Map.Entry<String, Map<TSeriesPartitionSlot, List<TTimePartitionSlot>>> slotsMapEntry :
+        unassignedDataPartitionSlotsMap.entrySet()) {
+      final String storageGroup = slotsMapEntry.getKey();
+      final Map<TSeriesPartitionSlot, List<TTimePartitionSlot>> unassignedPartitionSlotsMap =
+          slotsMapEntry.getValue();
 
-          DataPartitionTable dataPartitionTable = new DataPartitionTable();
+      // List<Pair<allocatedSlotsNum, TConsensusGroupId>>
+      List<Pair<Long, TConsensusGroupId>> regionSlotsCounter =
+          getPartitionManager()
+              .getSortedRegionGroupSlotsCounter(storageGroup, TConsensusGroupType.DataRegion);
 
-          // Enumerate SeriesPartitionSlot
-          for (Map.Entry<TSeriesPartitionSlot, List<TTimePartitionSlot>> seriesPartitionEntry :
-              unassignedPartitionSlotsMap.entrySet()) {
-            SeriesPartitionTable seriesPartitionTable = new SeriesPartitionTable();
+      DataPartitionTable dataPartitionTable = new DataPartitionTable();
 
-            // Enumerate TimePartitionSlot in ascending order
-            List<TTimePartitionSlot> timePartitionSlots = seriesPartitionEntry.getValue();
-            timePartitionSlots.sort(Comparator.comparingLong(TTimePartitionSlot::getStartTime));
-            for (TTimePartitionSlot timePartitionSlot : timePartitionSlots) {
+      // Enumerate SeriesPartitionSlot
+      for (Map.Entry<TSeriesPartitionSlot, List<TTimePartitionSlot>> seriesPartitionEntry :
+          unassignedPartitionSlotsMap.entrySet()) {
+        SeriesPartitionTable seriesPartitionTable = new SeriesPartitionTable();
 
-              /* Check if the current DataPartition has predecessor firstly, and inherit it if exists */
+        // Enumerate TimePartitionSlot in ascending order
+        List<TTimePartitionSlot> timePartitionSlots = seriesPartitionEntry.getValue();
+        timePartitionSlots.sort(Comparator.comparingLong(TTimePartitionSlot::getStartTime));
+        for (TTimePartitionSlot timePartitionSlot : timePartitionSlots) {
 
-              // Check if the current Partition's predecessor is allocated
-              // in the same batch of Partition creation
-              TConsensusGroupId predecessor =
-                  seriesPartitionTable.getPrecededDataPartition(
-                      timePartitionSlot, TIME_PARTITION_INTERVAL);
-              if (predecessor != null) {
-                seriesPartitionTable
-                    .getSeriesPartitionMap()
-                    .put(timePartitionSlot, Collections.singletonList(predecessor));
-                bubbleSort(predecessor, regionSlotsCounter);
-                continue;
-              }
+          /* Check if the current DataPartition has predecessor firstly, and inherit it if exists */
 
-              // Check if the current Partition's predecessor was allocated
-              // in the former Partition creation
-              predecessor =
-                  getPartitionManager()
-                      .getPrecededDataPartition(
-                          storageGroup,
-                          seriesPartitionEntry.getKey(),
-                          timePartitionSlot,
-                          TIME_PARTITION_INTERVAL);
-              if (predecessor != null) {
-                seriesPartitionTable
-                    .getSeriesPartitionMap()
-                    .put(timePartitionSlot, Collections.singletonList(predecessor));
-                bubbleSort(predecessor, regionSlotsCounter);
-                continue;
-              }
-
-              /* Greedy allocation */
-              seriesPartitionTable
-                  .getSeriesPartitionMap()
-                  .put(
-                      timePartitionSlot,
-                      Collections.singletonList(regionSlotsCounter.get(0).getRight()));
-              bubbleSort(regionSlotsCounter.get(0).getRight(), regionSlotsCounter);
-            }
-            dataPartitionTable
-                .getDataPartitionMap()
-                .put(seriesPartitionEntry.getKey(), seriesPartitionTable);
+          // Check if the current Partition's predecessor is allocated
+          // in the same batch of Partition creation
+          TConsensusGroupId predecessor =
+              seriesPartitionTable.getPrecededDataPartition(
+                  timePartitionSlot, TIME_PARTITION_INTERVAL);
+          if (predecessor != null) {
+            seriesPartitionTable
+                .getSeriesPartitionMap()
+                .put(timePartitionSlot, Collections.singletonList(predecessor));
+            bubbleSort(predecessor, regionSlotsCounter);
+            continue;
           }
-          result.put(storageGroup, dataPartitionTable);
-        });
+
+          // Check if the current Partition's predecessor was allocated
+          // in the former Partition creation
+          predecessor =
+              getPartitionManager()
+                  .getPrecededDataPartition(
+                      storageGroup,
+                      seriesPartitionEntry.getKey(),
+                      timePartitionSlot,
+                      TIME_PARTITION_INTERVAL);
+          if (predecessor != null) {
+            seriesPartitionTable
+                .getSeriesPartitionMap()
+                .put(timePartitionSlot, Collections.singletonList(predecessor));
+            bubbleSort(predecessor, regionSlotsCounter);
+            continue;
+          }
+
+          /* Greedy allocation */
+          seriesPartitionTable
+              .getSeriesPartitionMap()
+              .put(
+                  timePartitionSlot,
+                  Collections.singletonList(regionSlotsCounter.get(0).getRight()));
+          bubbleSort(regionSlotsCounter.get(0).getRight(), regionSlotsCounter);
+        }
+        dataPartitionTable
+            .getDataPartitionMap()
+            .put(seriesPartitionEntry.getKey(), seriesPartitionTable);
+      }
+      result.put(storageGroup, dataPartitionTable);
+    }
 
     return result;
   }
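
The two predecessor checks above implement a locality rule; the sketch below condenses them, with variable names following the loop above and surrounding code omitted:

// A new time partition is first placed in the RegionGroup of the time partition that
// immediately precedes it (whether created in this batch or already persisted), so consecutive
// time partitions of one series stay on the same replica set; only when no predecessor exists
// does the greedy counter-based assignment run.
TConsensusGroupId predecessor =
    seriesPartitionTable.getPrecededDataPartition(timePartitionSlot, TIME_PARTITION_INTERVAL);
if (predecessor == null) {
  predecessor =
      getPartitionManager()
          .getPrecededDataPartition(
              storageGroup,
              seriesPartitionEntry.getKey(),
              timePartitionSlot,
              TIME_PARTITION_INTERVAL);
}
// predecessor == null at this point means the slot falls through to the greedy allocation
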
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/partition/IPartitionAllocator.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/partition/IPartitionAllocator.java
index 93d0a40..f957c0b 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/partition/IPartitionAllocator.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/partition/IPartitionAllocator.java
@@ -22,6 +22,7 @@
 import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
 import org.apache.iotdb.commons.partition.DataPartitionTable;
 import org.apache.iotdb.commons.partition.SchemaPartitionTable;
+import org.apache.iotdb.confignode.exception.NotAvailableRegionGroupException;
 
 import java.util.List;
 import java.util.Map;
@@ -39,7 +40,8 @@
    * @return Map<StorageGroupName, SchemaPartitionTable>, the allocating result
    */
   Map<String, SchemaPartitionTable> allocateSchemaPartition(
-      Map<String, List<TSeriesPartitionSlot>> unassignedSchemaPartitionSlotsMap);
+      Map<String, List<TSeriesPartitionSlot>> unassignedSchemaPartitionSlotsMap)
+      throws NotAvailableRegionGroupException;
 
   /**
    * Allocate DataPartitions
@@ -49,5 +51,6 @@
    */
   Map<String, DataPartitionTable> allocateDataPartition(
       Map<String, Map<TSeriesPartitionSlot, List<TTimePartitionSlot>>>
-          unassignedDataPartitionSlotsMap);
+          unassignedDataPartitionSlotsMap)
+      throws NotAvailableRegionGroupException;
 }
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/region/CopySetRegionAllocator.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/region/CopySetRegionAllocator.java
index 990a08c..2259717 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/region/CopySetRegionAllocator.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/region/CopySetRegionAllocator.java
@@ -50,14 +50,14 @@
 
   @Override
   public TRegionReplicaSet allocateRegion(
-      List<TDataNodeConfiguration> onlineDataNodes,
+      List<TDataNodeConfiguration> targetDataNodes,
       List<TRegionReplicaSet> allocatedRegions,
       int replicationFactor,
       TConsensusGroupId consensusGroupId) {
     TRegionReplicaSet result = null;
 
     // Build weightList for weighted random
-    buildWeightList(onlineDataNodes, allocatedRegions);
+    buildWeightList(targetDataNodes, allocatedRegions);
 
     boolean accepted = false;
     while (true) {
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/region/GreedyRegionAllocator.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/region/GreedyRegionAllocator.java
index bbdab4c..97ac37f 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/region/GreedyRegionAllocator.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/region/GreedyRegionAllocator.java
@@ -37,12 +37,12 @@
 
   @Override
   public TRegionReplicaSet allocateRegion(
-      List<TDataNodeConfiguration> onlineDataNodes,
+      List<TDataNodeConfiguration> targetDataNodes,
       List<TRegionReplicaSet> allocatedRegions,
       int replicationFactor,
       TConsensusGroupId consensusGroupId) {
     // Build weightList order by number of regions allocated asc
-    List<TDataNodeLocation> weightList = buildWeightList(onlineDataNodes, allocatedRegions);
+    List<TDataNodeLocation> weightList = buildWeightList(targetDataNodes, allocatedRegions);
     return new TRegionReplicaSet(
         consensusGroupId,
         weightList.stream().limit(replicationFactor).collect(Collectors.toList()));
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/region/IRegionAllocator.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/region/IRegionAllocator.java
index aa397c9..315ef74 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/region/IRegionAllocator.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/region/IRegionAllocator.java
@@ -34,14 +34,14 @@
    * Calculating the next optimal TRegionReplicaSet based on the current online DataNodes and
    * allocated Regions
    *
-   * @param onlineDataNodes DataNodes that currently communicable
+   * @param targetDataNodes DataNodes that can be used for allocation
    * @param allocatedRegions Allocated Regions
    * @param replicationFactor Replication factor of TRegionReplicaSet
    * @param consensusGroupId TConsensusGroupId of result TRegionReplicaSet
    * @return The optimal TRegionReplicaSet derived by the specific algorithm
    */
   TRegionReplicaSet allocateRegion(
-      List<TDataNodeConfiguration> onlineDataNodes,
+      List<TDataNodeConfiguration> targetDataNodes,
       List<TRegionReplicaSet> allocatedRegions,
       int replicationFactor,
       TConsensusGroupId consensusGroupId);
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java
index 23c16a4..2f2f658 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java
@@ -517,7 +517,7 @@
 
   /** loop body of the heartbeat thread */
   private void heartbeatLoopBody() {
-    // the consensusManager of configManager may not be fully initialized at this time
+    // The consensusManager of configManager may not be fully initialized at this time
     Optional.ofNullable(getConsensusManager())
         .ifPresent(
             consensusManager -> {
@@ -638,13 +638,14 @@
    * @param status The specific NodeStatus
    * @return Filtered ConfigNodes with the specific NodeStatus
    */
-  public List<TConfigNodeLocation> filterConfigNodeThroughStatus(NodeStatus status) {
+  public List<TConfigNodeLocation> filterConfigNodeThroughStatus(NodeStatus... status) {
     return getRegisteredConfigNodes().stream()
         .filter(
             registeredConfigNode -> {
               int configNodeId = registeredConfigNode.getConfigNodeId();
               return nodeCacheMap.containsKey(configNodeId)
-                  && status.equals(nodeCacheMap.get(configNodeId).getNodeStatus());
+                  && Arrays.stream(status)
+                      .anyMatch(s -> s.equals(nodeCacheMap.get(configNodeId).getNodeStatus()));
             })
         .collect(Collectors.toList());
   }
@@ -659,10 +660,10 @@
     return getRegisteredDataNodes().stream()
         .filter(
             registeredDataNode -> {
-              int id = registeredDataNode.getLocation().getDataNodeId();
-              return nodeCacheMap.containsKey(id)
+              int dataNodeId = registeredDataNode.getLocation().getDataNodeId();
+              return nodeCacheMap.containsKey(dataNodeId)
                   && Arrays.stream(status)
-                      .anyMatch(s -> s.equals(nodeCacheMap.get(id).getNodeStatus()));
+                      .anyMatch(s -> s.equals(nodeCacheMap.get(dataNodeId).getNodeStatus()));
             })
         .collect(Collectors.toList());
   }
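
Usage note: with the varargs signature, callers can match several NodeStatus values in a single call, as RegionBalancer now does (nodeManager below stands for a NodeManager reference):

// DataNodes that are either Running or Unknown, collected in one call
List<TDataNodeConfiguration> availableDataNodes =
    nodeManager.filterDataNodeThroughStatus(NodeStatus.Running, NodeStatus.Unknown);
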
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java
index 5a81ffd..0c04b34 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java
@@ -26,6 +26,7 @@
 import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot;
 import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
 import org.apache.iotdb.commons.cluster.RegionRoleType;
+import org.apache.iotdb.commons.cluster.RegionStatus;
 import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
 import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil;
 import org.apache.iotdb.commons.partition.DataPartitionTable;
@@ -51,6 +52,7 @@
 import org.apache.iotdb.confignode.consensus.response.RegionInfoListResp;
 import org.apache.iotdb.confignode.consensus.response.SchemaNodeManagementResp;
 import org.apache.iotdb.confignode.consensus.response.SchemaPartitionResp;
+import org.apache.iotdb.confignode.exception.NotAvailableRegionGroupException;
 import org.apache.iotdb.confignode.exception.NotEnoughDataNodeException;
 import org.apache.iotdb.confignode.exception.StorageGroupNotExistsException;
 import org.apache.iotdb.confignode.manager.ClusterSchemaManager;
@@ -76,6 +78,9 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
@@ -84,6 +89,7 @@
 import java.util.concurrent.Future;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
 
 /** The PartitionManager Manages cluster PartitionTable read and write requests. */
 public class PartitionManager {
@@ -201,8 +207,18 @@
       } else {
         // Allocate SchemaPartitions only if
         // the current ConfigNode still holds its leadership
-        Map<String, SchemaPartitionTable> assignedSchemaPartition =
-            getLoadManager().allocateSchemaPartition(unassignedSchemaPartitionSlotsMap);
+        Map<String, SchemaPartitionTable> assignedSchemaPartition;
+        try {
+          assignedSchemaPartition =
+              getLoadManager().allocateSchemaPartition(unassignedSchemaPartitionSlotsMap);
+        } catch (NotAvailableRegionGroupException e) {
+          LOGGER.error(e.getMessage());
+          resp.setStatus(
+              new TSStatus(TSStatusCode.NOT_AVAILABLE_REGION_GROUP.getStatusCode())
+                  .setMessage(e.getMessage()));
+          return resp;
+        }
+
         // Cache allocating result
         CreateSchemaPartitionPlan createPlan = new CreateSchemaPartitionPlan();
         createPlan.setAssignedSchemaPartition(assignedSchemaPartition);
@@ -268,8 +284,18 @@
       } else {
         // Allocate DataPartitions only if
         // the current ConfigNode still holds its leadership
-        Map<String, DataPartitionTable> assignedDataPartition =
-            getLoadManager().allocateDataPartition(unassignedDataPartitionSlotsMap);
+        Map<String, DataPartitionTable> assignedDataPartition;
+        try {
+          assignedDataPartition =
+              getLoadManager().allocateDataPartition(unassignedDataPartitionSlotsMap);
+        } catch (NotAvailableRegionGroupException e) {
+          LOGGER.error(e.getMessage());
+          resp.setStatus(
+              new TSStatus(TSStatusCode.NOT_AVAILABLE_REGION_GROUP.getStatusCode())
+                  .setMessage(e.getMessage()));
+          return resp;
+        }
+
         // Cache allocating result
         CreateDataPartitionPlan createPlan = new CreateDataPartitionPlan();
         createPlan.setAssignedDataPartition(assignedDataPartition);
@@ -302,18 +328,20 @@
       Map<String, Integer> allotmentMap = new ConcurrentHashMap<>();
 
       for (Map.Entry<String, Integer> entry : unassignedPartitionSlotsCountMap.entrySet()) {
-        float allocatedRegionCount =
-            partitionInfo.getRegionCount(entry.getKey(), consensusGroupType);
+        final String storageGroup = entry.getKey();
+        final int unassignedPartitionSlotsCount = entry.getValue();
+
+        float allocatedRegionCount = partitionInfo.getRegionCount(storageGroup, consensusGroupType);
         // The slotCount equals to the sum of assigned slot count and unassigned slot count
         float slotCount =
-            (float) partitionInfo.getAssignedSeriesPartitionSlotsCount(entry.getKey())
-                + entry.getValue();
+            (float) partitionInfo.getAssignedSeriesPartitionSlotsCount(storageGroup)
+                + unassignedPartitionSlotsCount;
         float maxRegionCount =
-            getClusterSchemaManager().getMaxRegionGroupCount(entry.getKey(), consensusGroupType);
+            getClusterSchemaManager().getMaxRegionGroupCount(storageGroup, consensusGroupType);
         float maxSlotCount =
             ConfigNodeDescriptor.getInstance().getConf().getSeriesPartitionSlotNum();
 
-        /* Region extension is required in the following two cases  */
+        /* Region extension is required in the following cases */
         // 1. There are no Region has been created for the current StorageGroup
         if (allocatedRegionCount == 0) {
           // The delta is equal to the smallest integer solution that satisfies the inequality:
@@ -322,7 +350,7 @@
               Math.min(
                   (int) maxRegionCount,
                   Math.max(1, (int) Math.ceil(slotCount * maxRegionCount / maxSlotCount)));
-          allotmentMap.put(entry.getKey(), delta);
+          allotmentMap.put(storageGroup, delta);
           continue;
         }
 
@@ -340,7 +368,15 @@
                       (int)
                           Math.ceil(
                               slotCount * maxRegionCount / maxSlotCount - allocatedRegionCount)));
-          allotmentMap.put(entry.getKey(), delta);
+          allotmentMap.put(storageGroup, delta);
+          continue;
+        }
+
+        // 3. All RegionGroups in the specified StorageGroup are currently Disabled
+        if (allocatedRegionCount
+                == filterRegionGroupThroughStatus(storageGroup, RegionGroupStatus.Disabled).size()
+            && allocatedRegionCount < maxRegionCount) {
+          allotmentMap.put(storageGroup, 1);
         }
       }
 
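To make the arithmetic concrete, here is a small worked example of the delta formula shared by cases 1 and 2 (all numbers are illustrative, not defaults):

// With maxSlotCount = 10000 SeriesPartitionSlots, maxRegionGroupCount = 6 and
// slotCount = 3000 assigned + unassigned slots, the StorageGroup needs
// ceil(3000 * 6 / 10000) = 2 RegionGroups in total; case 2 requests the difference
// between that and allocatedRegionCount.
float maxRegionCount = 6f;
float maxSlotCount = 10000f;
float slotCount = 3000f;
int needed =
    Math.min(
        (int) maxRegionCount,
        Math.max(1, (int) Math.ceil(slotCount * maxRegionCount / maxSlotCount)));
// needed == 2

Case 3 is the new negative-feedback path: when every existing RegionGroup of the StorageGroup is Disabled and the maxRegionGroupCount quota is not yet exhausted, one extra RegionGroup is requested so that partition allocation still has an available RegionGroup to target.
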
@@ -400,7 +436,7 @@
   /**
    * Only leader use this interface
    *
-   * @return All Regions' RegionReplicaSet
+   * @return Deep copy of all Regions' RegionReplicaSet
    */
   public List<TRegionReplicaSet> getAllReplicaSets() {
     return partitionInfo.getAllReplicaSets();
@@ -436,10 +472,31 @@
    * @param storageGroup StorageGroupName
    * @param type SchemaRegion or DataRegion
    * @return The specific StorageGroup's Regions that sorted by the number of allocated slots
+   * @throws NotAvailableRegionGroupException When all RegionGroups within the specified
+   *     StorageGroup are currently unavailable
    */
-  public List<Pair<Long, TConsensusGroupId>> getSortedRegionSlotsCounter(
-      String storageGroup, TConsensusGroupType type) {
-    return partitionInfo.getSortedRegionSlotsCounter(storageGroup, type);
+  public List<Pair<Long, TConsensusGroupId>> getSortedRegionGroupSlotsCounter(
+      String storageGroup, TConsensusGroupType type) throws NotAvailableRegionGroupException {
+    // Collect the slot counters (static data)
+    List<Pair<Long, TConsensusGroupId>> regionGroupSlotsCounter =
+        partitionInfo.getRegionGroupSlotsCounter(storageGroup, type);
+
+    // Filter RegionGroups that have Disabled status
+    List<Pair<Long, TConsensusGroupId>> result = new ArrayList<>();
+    for (Pair<Long, TConsensusGroupId> slotsCounter : regionGroupSlotsCounter) {
+      // Use Running or Available RegionGroups
+      RegionGroupStatus status = getRegionGroupStatus(slotsCounter.getRight());
+      if (RegionGroupStatus.Running.equals(status) || RegionGroupStatus.Available.equals(status)) {
+        result.add(slotsCounter);
+      }
+    }
+
+    if (result.isEmpty()) {
+      throw new NotAvailableRegionGroupException();
+    }
+
+    result.sort(Comparator.comparingLong(Pair::getLeft));
+    return result;
   }
 
   /**
@@ -498,9 +555,7 @@
         .forEach(
             regionInfo -> {
               regionInfo.setStatus(
-                  regionGroupCacheMap
-                      .get(regionInfo.getConsensusGroupId())
-                      .getRegionStatus(regionInfo.getDataNodeId())
+                  getRegionStatus(regionInfo.getConsensusGroupId(), regionInfo.getDataNodeId())
                       .getStatus());
 
               String regionType =
@@ -709,6 +764,62 @@
     return result;
   }
 
+  /**
+   * Filter the RegionGroups in the specified StorageGroup through the RegionGroupStatus
+   *
+   * @param storageGroup The specified StorageGroup
+   * @param status The specified RegionGroupStatus
+   * @return Filtered RegionGroups with the specific RegionGroupStatus
+   */
+  public List<TRegionReplicaSet> filterRegionGroupThroughStatus(
+      String storageGroup, RegionGroupStatus... status) {
+    return getAllReplicaSets(storageGroup).stream()
+        .filter(
+            regionReplicaSet -> {
+              TConsensusGroupId regionGroupId = regionReplicaSet.getRegionId();
+              return regionGroupCacheMap.containsKey(regionGroupId)
+                  && Arrays.stream(status)
+                      .anyMatch(
+                          s ->
+                              s.equals(
+                                  regionGroupCacheMap.get(regionGroupId).getRegionGroupStatus()));
+            })
+        .collect(Collectors.toList());
+  }
+
+  /**
+   * Safely get RegionStatus
+   *
+   * @param consensusGroupId Specified RegionGroupId
+   * @param dataNodeId Specified RegionReplicaId
+   * @return Corresponding RegionStatus if cache exists, Unknown otherwise
+   */
+  public RegionStatus getRegionStatus(TConsensusGroupId consensusGroupId, int dataNodeId) {
+    return regionGroupCacheMap.containsKey(consensusGroupId)
+        ? regionGroupCacheMap.get(consensusGroupId).getRegionStatus(dataNodeId)
+        : RegionStatus.Unknown;
+  }
+
+  /**
+   * Safely get RegionGroupStatus
+   *
+   * @param consensusGroupId Specified RegionGroupId
+   * @return Corresponding RegionGroupStatus if cache exists, Disabled otherwise
+   */
+  public RegionGroupStatus getRegionGroupStatus(TConsensusGroupId consensusGroupId) {
+    return regionGroupCacheMap.containsKey(consensusGroupId)
+        ? regionGroupCacheMap.get(consensusGroupId).getRegionGroupStatus()
+        : RegionGroupStatus.Disabled;
+  }
+
+  public void cacheHeartbeatSample(
+      TConsensusGroupId regionGroupId, RegionHeartbeatSample regionHeartbeatSample) {
+    regionGroupCacheMap
+        .computeIfAbsent(regionGroupId, empty -> new RegionGroupCache(regionGroupId))
+        .cacheHeartbeatSample(regionHeartbeatSample);
+    regionGroupCacheMap.get(regionGroupId).updateRegionStatistics();
+  }
+
   public ScheduledExecutorService getRegionMaintainer() {
     return regionMaintainer;
   }
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/RegionCache.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/RegionCache.java
index 8f99211..c537b33 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/RegionCache.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/RegionCache.java
@@ -66,12 +66,13 @@
       if (lastSample.getSendTimestamp() > versionTimestamp) {
         versionTimestamp = lastSample.getSendTimestamp();
         isLeader = lastSample.isLeader();
+        status = lastSample.getStatus();
       }
+    }
 
-      status =
-          System.currentTimeMillis() - versionTimestamp > HEARTBEAT_TIMEOUT_TIME
-              ? RegionStatus.Unknown
-              : RegionStatus.Running;
+    // TODO: Optimize this judgement logic
+    if (System.currentTimeMillis() - versionTimestamp > HEARTBEAT_TIMEOUT_TIME) {
+      status = RegionStatus.Unknown;
     }
   }
 
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/RegionGroupCache.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/RegionGroupCache.java
index 4afef5f..c8140fc 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/RegionGroupCache.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/RegionGroupCache.java
@@ -85,12 +85,37 @@
     return leaderDataNodeId;
   }
 
+  /**
+   * Get the specified Region's status
+   *
+   * @param dataNodeId Where the Region resides
+   * @return The Region's latest status if a heartbeat was received recently, Unknown otherwise
+   */
   public RegionStatus getRegionStatus(int dataNodeId) {
     return regionCacheMap.containsKey(dataNodeId)
         ? regionCacheMap.get(dataNodeId).getStatus()
         : RegionStatus.Unknown;
   }
 
+  public RegionGroupStatus getRegionGroupStatus() {
+    int unknownCount = 0;
+    for (RegionCache regionCache : regionCacheMap.values()) {
+      if (RegionStatus.ReadOnly.equals(regionCache.getStatus())
+          || RegionStatus.Removing.equals(regionCache.getStatus())) {
+        return RegionGroupStatus.Disabled;
+      }
+      unknownCount += RegionStatus.Unknown.equals(regionCache.getStatus()) ? 1 : 0;
+    }
+
+    if (unknownCount == 0) {
+      return RegionGroupStatus.Running;
+    } else {
+      return unknownCount <= ((regionCacheMap.size() - 1) / 2)
+          ? RegionGroupStatus.Available
+          : RegionGroupStatus.Disabled;
+    }
+  }
+
   public TConsensusGroupId getConsensusGroupId() {
     return consensusGroupId;
   }
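
To illustrate the majority rule in getRegionGroupStatus, a sketch modeled on the test usage later in this diff (it is not an actual test in this PR, and the group id construction is assumed):

// With 3 replicas, (3 - 1) / 2 = 1, so one Unknown replica still leaves the group Available,
// while two Unknown replicas, or any ReadOnly/Removing replica, make it Disabled.
TConsensusGroupId groupId = new TConsensusGroupId(TConsensusGroupType.DataRegion, 0);
RegionGroupCache groupCache = new RegionGroupCache(groupId);
long now = System.currentTimeMillis();
groupCache.cacheHeartbeatSample(new RegionHeartbeatSample(now, now, 0, true, RegionStatus.Running));
groupCache.cacheHeartbeatSample(new RegionHeartbeatSample(now, now, 1, false, RegionStatus.Running));
groupCache.cacheHeartbeatSample(new RegionHeartbeatSample(now, now, 2, false, RegionStatus.Unknown));
groupCache.updateRegionStatistics();
// groupCache.getRegionGroupStatus() is expected to be RegionGroupStatus.Available here
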
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/RegionGroupStatus.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/RegionGroupStatus.java
new file mode 100644
index 0000000..73b8f55
--- /dev/null
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/RegionGroupStatus.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.confignode.manager.partition;
+
+public enum RegionGroupStatus {
+
+  /** All Regions in RegionGroup are in the Running status */
+  Running("Running"),
+
+  /**
+   * All Regions in RegionGroup are in the Running or Unknown status, and the number of Regions in
+   * the Unknown status is less than half
+   */
+  Available("Available"),
+
+  /**
+   * The following cases will lead to Disabled RegionGroup:
+   *
+   * <p>1. There is a Region in ReadOnly or Removing status
+   *
+   * <p>2. More than half of the Regions are in Unknown status
+   */
+  Disabled("Disabled");
+
+  private final String status;
+
+  RegionGroupStatus(String status) {
+    this.status = status;
+  }
+
+  public String getStatus() {
+    return status;
+  }
+
+  public static RegionGroupStatus parse(String status) {
+    for (RegionGroupStatus regionGroupStatus : RegionGroupStatus.values()) {
+      if (regionGroupStatus.status.equals(status)) {
+        return regionGroupStatus;
+      }
+    }
+    throw new RuntimeException(String.format("RegionGroupStatus %s doesn't exist.", status));
+  }
+}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/RegionHeartbeatSample.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/RegionHeartbeatSample.java
index cff2fa7..f8685f2 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/RegionHeartbeatSample.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/RegionHeartbeatSample.java
@@ -18,6 +18,8 @@
  */
 package org.apache.iotdb.confignode.manager.partition;
 
+import org.apache.iotdb.commons.cluster.RegionStatus;
+
 public class RegionHeartbeatSample {
 
   // Unit: ms
@@ -26,16 +28,22 @@
 
   private final int belongedDataNodeId;
   private final boolean isLeader;
+  private final RegionStatus status;
 
   // TODO: Add load sample
 
   public RegionHeartbeatSample(
-      long sendTimestamp, long receiveTimestamp, int belongedDataNodeId, boolean isLeader) {
+      long sendTimestamp,
+      long receiveTimestamp,
+      int belongedDataNodeId,
+      boolean isLeader,
+      RegionStatus status) {
     this.sendTimestamp = sendTimestamp;
     this.receiveTimestamp = receiveTimestamp;
 
     this.belongedDataNodeId = belongedDataNodeId;
     this.isLeader = isLeader;
+    this.status = status;
   }
 
   public long getSendTimestamp() {
@@ -53,4 +61,8 @@
   public boolean isLeader() {
     return isLeader;
   }
+
+  public RegionStatus getStatus() {
+    return status;
+  }
 }
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/PartitionInfo.java b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/PartitionInfo.java
index 0432dae..c94a903 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/PartitionInfo.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/PartitionInfo.java
@@ -286,6 +286,8 @@
                     // Remove empty Map
                     schemaPartition.remove(storageGroup);
                   }
+                } else {
+                  isAllPartitionsExist.set(false);
                 }
               });
     }
@@ -323,6 +325,8 @@
                   // Remove empty Map
                   dataPartition.remove(storageGroup);
                 }
+              } else {
+                isAllPartitionsExist.set(false);
               }
             });
 
@@ -550,7 +554,7 @@
   /**
    * Only leader use this interface.
    *
-   * @return All Regions' RegionReplicaSet
+   * @return Deep copy of all Regions' RegionReplicaSet
    */
   public List<TRegionReplicaSet> getAllReplicaSets() {
     List<TRegionReplicaSet> result = new ArrayList<>();
@@ -615,11 +619,12 @@
    *
    * @param storageGroup StorageGroupName
    * @param type SchemaRegion or DataRegion
-   * @return The specific StorageGroup's Regions that sorted by the number of allocated slots
+   * @return The specified StorageGroup's RegionGroups of the given type and the number of slots
+   *     allocated to each of them
    */
-  public List<Pair<Long, TConsensusGroupId>> getSortedRegionSlotsCounter(
+  public List<Pair<Long, TConsensusGroupId>> getRegionGroupSlotsCounter(
       String storageGroup, TConsensusGroupType type) {
-    return storageGroupPartitionTables.get(storageGroup).getSortedRegionGroupSlotsCounter(type);
+    return storageGroupPartitionTables.get(storageGroup).getRegionGroupSlotsCounter(type);
   }
 
   /**
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/StorageGroupPartitionTable.java b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/StorageGroupPartitionTable.java
index e45c7dd..5ba8345 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/StorageGroupPartitionTable.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/StorageGroupPartitionTable.java
@@ -41,7 +41,6 @@
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.util.ArrayList;
-import java.util.Comparator;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -102,7 +101,7 @@
     replicaSets.forEach(replicaSet -> regionGroupMap.remove(replicaSet.getRegionId()));
   }
 
-  /** @return All Regions' RegionReplicaSet within one StorageGroup */
+  /** @return Deep copy of all Regions' RegionReplicaSet within one StorageGroup */
   public List<TRegionReplicaSet> getAllReplicaSets() {
     List<TRegionReplicaSet> result = new ArrayList<>();
 
@@ -276,20 +275,22 @@
    * Only leader use this interface.
    *
    * @param type SchemaRegion or DataRegion
-   * @return RegionGroups' indexes that sorted by the number of allocated slots
+   * @return Each RegionGroup's allocated slot count and its TConsensusGroupId
    */
-  public List<Pair<Long, TConsensusGroupId>> getSortedRegionGroupSlotsCounter(
-      TConsensusGroupType type) {
+  public List<Pair<Long, TConsensusGroupId>> getRegionGroupSlotsCounter(TConsensusGroupType type) {
     List<Pair<Long, TConsensusGroupId>> result = new Vector<>();
 
     regionGroupMap.forEach(
         (consensusGroupId, regionGroup) -> {
-          if (consensusGroupId.getType().equals(type)) {
-            result.add(new Pair<>(regionGroup.getSeriesSlotCount(), consensusGroupId));
+          if (type.equals(consensusGroupId.getType())) {
+            long slotCount =
+                type.equals(TConsensusGroupType.SchemaRegion)
+                    ? regionGroup.getSeriesSlotCount()
+                    : regionGroup.getTimeSlotCount();
+            result.add(new Pair<>(slotCount, consensusGroupId));
           }
         });
 
-    result.sort(Comparator.comparingLong(Pair::getLeft));
     return result;
   }
 
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java
index dfee4dd..82dffe0 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java
@@ -25,6 +25,7 @@
 import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
 import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.cluster.RegionStatus;
 import org.apache.iotdb.confignode.client.ConfigNodeRequestType;
 import org.apache.iotdb.confignode.client.DataNodeRequestType;
 import org.apache.iotdb.confignode.client.async.datanode.AsyncDataNodeClientPool;
@@ -43,6 +44,7 @@
 import org.apache.iotdb.confignode.manager.load.LoadManager;
 import org.apache.iotdb.confignode.manager.node.NodeManager;
 import org.apache.iotdb.confignode.manager.partition.PartitionManager;
+import org.apache.iotdb.confignode.manager.partition.RegionHeartbeatSample;
 import org.apache.iotdb.confignode.procedure.exception.ProcedureException;
 import org.apache.iotdb.confignode.procedure.scheduler.LockQueue;
 import org.apache.iotdb.confignode.procedure.scheduler.ProcedureScheduler;
@@ -343,6 +345,19 @@
     getLoadManager().broadcastLatestRegionRouteMap();
   }
 
+  public void buildRegionGroupCache(
+      TConsensusGroupId regionGroupId, Map<Integer, RegionStatus> regionStatusMap) {
+    long currentTime = System.currentTimeMillis();
+    regionStatusMap.forEach(
+        (dataNodeId, regionStatus) ->
+            getPartitionManager()
+                .cacheHeartbeatSample(
+                    regionGroupId,
+                    new RegionHeartbeatSample(
+                        currentTime, currentTime, dataNodeId, false, regionStatus)));
+    getPartitionManager().getRegionGroupCacheMap().get(regionGroupId).updateRegionStatistics();
+  }
+
   public List<TRegionReplicaSet> getAllReplicaSets(String storageGroup) {
     return getPartitionManager().getAllReplicaSets(storageGroup);
   }
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/CreateRegionGroupsProcedure.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/CreateRegionGroupsProcedure.java
index 4329b5f..4e7bf3a 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/CreateRegionGroupsProcedure.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/CreateRegionGroupsProcedure.java
@@ -21,6 +21,7 @@
 import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
 import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
 import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
+import org.apache.iotdb.commons.cluster.RegionStatus;
 import org.apache.iotdb.commons.utils.ThriftCommonsSerDeUtils;
 import org.apache.iotdb.confignode.consensus.request.write.region.CreateRegionGroupsPlan;
 import org.apache.iotdb.confignode.consensus.request.write.region.OfferRegionMaintainTasksPlan;
@@ -41,6 +42,7 @@
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
 
 public class CreateRegionGroupsProcedure
     extends StateMachineProcedure<ConfigNodeProcedureEnv, CreateRegionGroupsState> {
@@ -139,6 +141,43 @@
 
         env.persistAndBroadcastRegionGroup(persistPlan);
         env.getConfigManager().getConsensusManager().write(offerPlan);
+        setNextState(CreateRegionGroupsState.BUILD_REGION_GROUP_CACHE);
+        break;
+      case BUILD_REGION_GROUP_CACHE:
+        // Build RegionGroupCaches immediately to make the successfully built RegionGroups available
+        createRegionGroupsPlan
+            .getRegionGroupMap()
+            .forEach(
+                (storageGroup, regionReplicaSets) ->
+                    regionReplicaSets.forEach(
+                        regionReplicaSet -> {
+                          Map<Integer, RegionStatus> statusMap = new ConcurrentHashMap<>();
+                          regionReplicaSet
+                              .getDataNodeLocations()
+                              .forEach(
+                                  dataNodeLocation ->
+                                      statusMap.put(
+                                          dataNodeLocation.getDataNodeId(), RegionStatus.Running));
+
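+                          // Cache the RegionGroup as fully Running when none of its replicas
+                          // failed; when at most floor((replicaNum - 1) / 2) replicas failed,
+                          // cache it with those replicas marked as Unknown; otherwise leave it
+                          // uncached because a majority of its replicas failed to be created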
+                          if (!failedRegionReplicaSets.containsKey(
+                              regionReplicaSet.getRegionId())) {
+                            env.buildRegionGroupCache(regionReplicaSet.getRegionId(), statusMap);
+                          } else {
+                            TRegionReplicaSet failedRegionReplicas =
+                                failedRegionReplicaSets.get(regionReplicaSet.getRegionId());
+                            if (failedRegionReplicas.getDataNodeLocationsSize()
+                                <= (regionReplicaSet.getDataNodeLocationsSize() - 1) / 2) {
+                              failedRegionReplicas
+                                  .getDataNodeLocations()
+                                  .forEach(
+                                      dataNodeLocation ->
+                                          statusMap.replace(
+                                              dataNodeLocation.getDataNodeId(),
+                                              RegionStatus.Unknown));
+                              env.buildRegionGroupCache(regionReplicaSet.getRegionId(), statusMap);
+                            }
+                          }
+                        }));
         setNextState(CreateRegionGroupsState.CREATE_REGION_GROUPS_FINISH);
         break;
       case CREATE_REGION_GROUPS_FINISH:
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/CreateRegionGroupsState.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/CreateRegionGroupsState.java
index 87a1a84..a332f0c 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/CreateRegionGroupsState.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/CreateRegionGroupsState.java
@@ -27,5 +27,6 @@
   // RegionReplicas created successfully on the same RegionGroup
   // 3. Delete redundant RegionReplicas in contrast to case 2.
   SHUNT_REGION_REPLICAS,
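+  // Cache the initial RegionStatus of each successfully created RegionGroup
+  // so that it becomes available immediately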
+  BUILD_REGION_GROUP_CACHE,
   CREATE_REGION_GROUPS_FINISH
 }
diff --git a/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/LeaderRouterTest.java b/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/LeaderRouterTest.java
index 82bb08b..ef040a1 100644
--- a/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/LeaderRouterTest.java
+++ b/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/LeaderRouterTest.java
@@ -24,6 +24,7 @@
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
 import org.apache.iotdb.commons.cluster.NodeStatus;
+import org.apache.iotdb.commons.cluster.RegionStatus;
 import org.apache.iotdb.confignode.manager.node.BaseNodeCache;
 import org.apache.iotdb.confignode.manager.node.DataNodeHeartbeatCache;
 import org.apache.iotdb.confignode.manager.node.NodeHeartbeatSample;
@@ -102,22 +103,22 @@
     /* Simulate ratis consensus protocol(only one leader) */
     regionGroupCacheMap
         .get(groupId1)
-        .cacheHeartbeatSample(new RegionHeartbeatSample(10, 10, 0, false));
+        .cacheHeartbeatSample(new RegionHeartbeatSample(10, 10, 0, false, RegionStatus.Running));
     regionGroupCacheMap
         .get(groupId1)
-        .cacheHeartbeatSample(new RegionHeartbeatSample(11, 11, 1, true));
+        .cacheHeartbeatSample(new RegionHeartbeatSample(11, 11, 1, true, RegionStatus.Running));
     regionGroupCacheMap
         .get(groupId1)
-        .cacheHeartbeatSample(new RegionHeartbeatSample(12, 12, 2, false));
+        .cacheHeartbeatSample(new RegionHeartbeatSample(12, 12, 2, false, RegionStatus.Running));
     regionGroupCacheMap
         .get(groupId2)
-        .cacheHeartbeatSample(new RegionHeartbeatSample(13, 13, 3, false));
+        .cacheHeartbeatSample(new RegionHeartbeatSample(13, 13, 3, false, RegionStatus.Running));
     regionGroupCacheMap
         .get(groupId2)
-        .cacheHeartbeatSample(new RegionHeartbeatSample(14, 14, 4, true));
+        .cacheHeartbeatSample(new RegionHeartbeatSample(14, 14, 4, true, RegionStatus.Running));
     regionGroupCacheMap
         .get(groupId2)
-        .cacheHeartbeatSample(new RegionHeartbeatSample(15, 15, 5, false));
+        .cacheHeartbeatSample(new RegionHeartbeatSample(15, 15, 5, false, RegionStatus.Running));
 
     // Get leaderMap
     Map<TConsensusGroupId, Integer> leaderMap = new HashMap<>();
@@ -148,22 +149,28 @@
     for (int i = 2; i <= 1000; i++) {
       regionGroupCacheMap
           .get(groupId1)
-          .cacheHeartbeatSample(new RegionHeartbeatSample(i * 10, i * 10, 0, true));
+          .cacheHeartbeatSample(
+              new RegionHeartbeatSample(i * 10, i * 10, 0, true, RegionStatus.Running));
       regionGroupCacheMap
           .get(groupId1)
-          .cacheHeartbeatSample(new RegionHeartbeatSample(i * 10 + 1, i * 10 + 1, 1, true));
+          .cacheHeartbeatSample(
+              new RegionHeartbeatSample(i * 10 + 1, i * 10 + 1, 1, true, RegionStatus.Running));
       regionGroupCacheMap
           .get(groupId1)
-          .cacheHeartbeatSample(new RegionHeartbeatSample(i * 10 + 2, i * 10 + 2, 2, true));
+          .cacheHeartbeatSample(
+              new RegionHeartbeatSample(i * 10 + 2, i * 10 + 2, 2, true, RegionStatus.Running));
       regionGroupCacheMap
           .get(groupId2)
-          .cacheHeartbeatSample(new RegionHeartbeatSample(i * 10 + 3, i * 10 + 3, 3, true));
+          .cacheHeartbeatSample(
+              new RegionHeartbeatSample(i * 10 + 3, i * 10 + 3, 3, true, RegionStatus.Running));
       regionGroupCacheMap
           .get(groupId2)
-          .cacheHeartbeatSample(new RegionHeartbeatSample(i * 10 + 4, i * 10 + 4, 4, true));
+          .cacheHeartbeatSample(
+              new RegionHeartbeatSample(i * 10 + 4, i * 10 + 4, 4, true, RegionStatus.Running));
       regionGroupCacheMap
           .get(groupId2)
-          .cacheHeartbeatSample(new RegionHeartbeatSample(i * 10 + 5, i * 10 + 5, 5, true));
+          .cacheHeartbeatSample(
+              new RegionHeartbeatSample(i * 10 + 5, i * 10 + 5, 5, true, RegionStatus.Running));
 
       // Get leaderMap
       leaderMap.clear();
@@ -191,16 +198,20 @@
     /* Simulate multiLeader consensus protocol with a DataNode fails down */
     regionGroupCacheMap
         .get(groupId1)
-        .cacheHeartbeatSample(new RegionHeartbeatSample(10030, 10030, 0, true));
+        .cacheHeartbeatSample(
+            new RegionHeartbeatSample(10030, 10030, 0, true, RegionStatus.Running));
     regionGroupCacheMap
         .get(groupId1)
-        .cacheHeartbeatSample(new RegionHeartbeatSample(10031, 10031, 1, true));
+        .cacheHeartbeatSample(
+            new RegionHeartbeatSample(10031, 10031, 1, true, RegionStatus.Running));
     regionGroupCacheMap
         .get(groupId2)
-        .cacheHeartbeatSample(new RegionHeartbeatSample(10033, 10033, 3, true));
+        .cacheHeartbeatSample(
+            new RegionHeartbeatSample(10033, 10033, 3, true, RegionStatus.Running));
     regionGroupCacheMap
         .get(groupId2)
-        .cacheHeartbeatSample(new RegionHeartbeatSample(10034, 10034, 4, true));
+        .cacheHeartbeatSample(
+            new RegionHeartbeatSample(10034, 10034, 4, true, RegionStatus.Running));
 
     // Get leaderMap
     leaderMap.clear();
diff --git a/confignode/src/test/java/org/apache/iotdb/confignode/manager/partition/RegionGroupCacheTest.java b/confignode/src/test/java/org/apache/iotdb/confignode/manager/partition/RegionGroupCacheTest.java
new file mode 100644
index 0000000..6f30202
--- /dev/null
+++ b/confignode/src/test/java/org/apache/iotdb/confignode/manager/partition/RegionGroupCacheTest.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.confignode.manager.partition;
+
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
+import org.apache.iotdb.commons.cluster.RegionStatus;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+public class RegionGroupCacheTest {
+
+  @Test
+  public void getLeaderDataNodeIdTest() {
+    final int leaderId = 1;
+    long currentTime = System.currentTimeMillis();
+    RegionGroupCache regionGroupCache =
+        new RegionGroupCache(new TConsensusGroupId(TConsensusGroupType.DataRegion, 0));
+    for (int i = 0; i < 3; i++) {
+      regionGroupCache.cacheHeartbeatSample(
+          new RegionHeartbeatSample(
+              currentTime, currentTime, i, i == leaderId, RegionStatus.Running));
+    }
+    Assert.assertTrue(regionGroupCache.updateRegionStatistics());
+    Assert.assertEquals(leaderId, regionGroupCache.getLeaderDataNodeId());
+  }
+
+  @Test
+  public void getRegionStatusTest() {
+    long currentTime = System.currentTimeMillis();
+    RegionGroupCache regionGroupCache =
+        new RegionGroupCache(new TConsensusGroupId(TConsensusGroupType.SchemaRegion, 1));
+    regionGroupCache.cacheHeartbeatSample(
+        new RegionHeartbeatSample(currentTime, currentTime, 0, false, RegionStatus.Running));
+    regionGroupCache.cacheHeartbeatSample(
+        new RegionHeartbeatSample(currentTime, currentTime, 1, false, RegionStatus.Unknown));
+    regionGroupCache.cacheHeartbeatSample(
+        new RegionHeartbeatSample(currentTime, currentTime, 2, false, RegionStatus.Removing));
+    regionGroupCache.cacheHeartbeatSample(
+        new RegionHeartbeatSample(currentTime, currentTime, 3, false, RegionStatus.ReadOnly));
+    regionGroupCache.updateRegionStatistics();
+
+    Assert.assertEquals(RegionStatus.Running, regionGroupCache.getRegionStatus(0));
+    Assert.assertEquals(RegionStatus.Unknown, regionGroupCache.getRegionStatus(1));
+    Assert.assertEquals(RegionStatus.Removing, regionGroupCache.getRegionStatus(2));
+    Assert.assertEquals(RegionStatus.ReadOnly, regionGroupCache.getRegionStatus(3));
+  }
+
+  @Test
+  public void getRegionGroupStatusTest() {
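+    // The assertions below encode the expected mapping: Running when every replica is Running,
+    // Available when only a minority of replicas are Unknown, and Disabled when any replica is
+    // ReadOnly or Removing or when the Unknown replicas outnumber the Running ones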
+    long currentTime = System.currentTimeMillis();
+    RegionGroupCache runningRegionGroup =
+        new RegionGroupCache(new TConsensusGroupId(TConsensusGroupType.DataRegion, 0));
+    runningRegionGroup.cacheHeartbeatSample(
+        new RegionHeartbeatSample(currentTime, currentTime, 0, false, RegionStatus.Running));
+    runningRegionGroup.cacheHeartbeatSample(
+        new RegionHeartbeatSample(currentTime, currentTime, 1, false, RegionStatus.Running));
+    runningRegionGroup.cacheHeartbeatSample(
+        new RegionHeartbeatSample(currentTime, currentTime, 2, false, RegionStatus.Running));
+    runningRegionGroup.updateRegionStatistics();
+    Assert.assertEquals(RegionGroupStatus.Running, runningRegionGroup.getRegionGroupStatus());
+
+    RegionGroupCache availableRegionGroup =
+        new RegionGroupCache(new TConsensusGroupId(TConsensusGroupType.DataRegion, 1));
+    availableRegionGroup.cacheHeartbeatSample(
+        new RegionHeartbeatSample(currentTime, currentTime, 0, false, RegionStatus.Running));
+    availableRegionGroup.cacheHeartbeatSample(
+        new RegionHeartbeatSample(currentTime, currentTime, 1, false, RegionStatus.Unknown));
+    availableRegionGroup.cacheHeartbeatSample(
+        new RegionHeartbeatSample(currentTime, currentTime, 2, false, RegionStatus.Running));
+    availableRegionGroup.updateRegionStatistics();
+    Assert.assertEquals(RegionGroupStatus.Available, availableRegionGroup.getRegionGroupStatus());
+
+    RegionGroupCache disabledRegionGroup0 =
+        new RegionGroupCache(new TConsensusGroupId(TConsensusGroupType.SchemaRegion, 2));
+    disabledRegionGroup0.cacheHeartbeatSample(
+        new RegionHeartbeatSample(currentTime, currentTime, 0, false, RegionStatus.Running));
+    disabledRegionGroup0.cacheHeartbeatSample(
+        new RegionHeartbeatSample(currentTime, currentTime, 1, false, RegionStatus.ReadOnly));
+    disabledRegionGroup0.cacheHeartbeatSample(
+        new RegionHeartbeatSample(currentTime, currentTime, 2, false, RegionStatus.Running));
+    disabledRegionGroup0.updateRegionStatistics();
+    Assert.assertEquals(RegionGroupStatus.Disabled, disabledRegionGroup0.getRegionGroupStatus());
+
+    RegionGroupCache disabledRegionGroup1 =
+        new RegionGroupCache(new TConsensusGroupId(TConsensusGroupType.SchemaRegion, 3));
+    disabledRegionGroup1.cacheHeartbeatSample(
+        new RegionHeartbeatSample(currentTime, currentTime, 0, false, RegionStatus.Running));
+    disabledRegionGroup1.cacheHeartbeatSample(
+        new RegionHeartbeatSample(currentTime, currentTime, 1, false, RegionStatus.Unknown));
+    disabledRegionGroup1.cacheHeartbeatSample(
+        new RegionHeartbeatSample(currentTime, currentTime, 2, false, RegionStatus.Unknown));
+    disabledRegionGroup1.updateRegionStatistics();
+    Assert.assertEquals(RegionGroupStatus.Disabled, disabledRegionGroup1.getRegionGroupStatus());
+
+    RegionGroupCache disabledRegionGroup2 =
+        new RegionGroupCache(new TConsensusGroupId(TConsensusGroupType.DataRegion, 4));
+    disabledRegionGroup2.cacheHeartbeatSample(
+        new RegionHeartbeatSample(currentTime, currentTime, 0, false, RegionStatus.Running));
+    disabledRegionGroup2.cacheHeartbeatSample(
+        new RegionHeartbeatSample(currentTime, currentTime, 1, false, RegionStatus.Running));
+    disabledRegionGroup2.cacheHeartbeatSample(
+        new RegionHeartbeatSample(currentTime, currentTime, 2, false, RegionStatus.Removing));
+    disabledRegionGroup2.updateRegionStatistics();
+    Assert.assertEquals(RegionGroupStatus.Disabled, disabledRegionGroup2.getRegionGroupStatus());
+  }
+}
diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/AbstractEnv.java b/integration-test/src/main/java/org/apache/iotdb/it/env/AbstractEnv.java
index 9f8a839..b68c004 100644
--- a/integration-test/src/main/java/org/apache/iotdb/it/env/AbstractEnv.java
+++ b/integration-test/src/main/java/org/apache/iotdb/it/env/AbstractEnv.java
@@ -388,4 +388,13 @@
     }
     throw new IOException("Failed to get config node connection");
   }
+
+  @Override
+  public void restartDataNode(int index) {
+    dataNodeWrapperList.get(index).start();
+  }
+
+  @Override
+  public void shutdownDataNode(int index) {
+    dataNodeWrapperList.get(index).stop();
+  }
 }
diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/MppConfig.java b/integration-test/src/main/java/org/apache/iotdb/it/env/MppConfig.java
index c35c3c4..1c6f257 100644
--- a/integration-test/src/main/java/org/apache/iotdb/it/env/MppConfig.java
+++ b/integration-test/src/main/java/org/apache/iotdb/it/env/MppConfig.java
@@ -228,6 +228,20 @@
   }
 
   @Override
+  public BaseConfig setSchemaReplicationFactor(int schemaReplicationFactor) {
+    confignodeProperties.setProperty(
+        "schema_replication_factor", String.valueOf(schemaReplicationFactor));
+    return this;
+  }
+
+  @Override
+  public BaseConfig setDataReplicationFactor(int dataReplicationFactor) {
+    confignodeProperties.setProperty(
+        "data_replication_factor", String.valueOf(dataReplicationFactor));
+    return this;
+  }
+
+  @Override
   public BaseConfig setTimePartitionInterval(long timePartitionInterval) {
     confignodeProperties.setProperty(
         "time_partition_interval", String.valueOf(timePartitionInterval));
diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/RemoteServerEnv.java b/integration-test/src/main/java/org/apache/iotdb/it/env/RemoteServerEnv.java
index e086ce4..a85905b 100644
--- a/integration-test/src/main/java/org/apache/iotdb/it/env/RemoteServerEnv.java
+++ b/integration-test/src/main/java/org/apache/iotdb/it/env/RemoteServerEnv.java
@@ -152,4 +152,14 @@
     session.open();
     return session;
   }
+
+  @Override
+  public void restartDataNode(int index) {
+    getDataNodeWrapperList().get(index).start();
+  }
+
+  @Override
+  public void shutdownDataNode(int index) {
+    getDataNodeWrapperList().get(index).stop();
+  }
 }
diff --git a/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseConfig.java b/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseConfig.java
index 6ecaad0..c1273b3 100644
--- a/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseConfig.java
+++ b/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseConfig.java
@@ -231,6 +231,22 @@
     return "org.apache.iotdb.consensus.standalone.StandAloneConsensus";
   }
 
+  default BaseConfig setSchemaReplicationFactor(int schemaReplicationFactor) {
+    return this;
+  }
+
+  default int getSchemaReplicationFactor() {
+    return 1;
+  }
+
+  default BaseConfig setDataReplicationFactor(int dataReplicationFactor) {
+    return this;
+  }
+
+  default int getDataReplicationFactor() {
+    return 1;
+  }
+
   default BaseConfig setTimePartitionInterval(long timePartitionInterval) {
     return this;
   }
diff --git a/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseEnv.java b/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseEnv.java
index 35ff793..79550d2 100644
--- a/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseEnv.java
+++ b/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseEnv.java
@@ -129,4 +129,8 @@
     session.open();
     return session;
   }
+
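+  /** Restart the DataNode at the given index of the DataNode wrapper list. */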
+  void restartDataNode(int index);
+
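+  /** Shut down the DataNode at the given index of the DataNode wrapper list. */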
+  void shutdownDataNode(int index);
 }
diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/confignode/IoTDBClusterPartitionTableTest.java b/integration-test/src/test/java/org/apache/iotdb/confignode/IoTDBClusterPartitionIT.java
similarity index 61%
rename from integration-test/src/test/java/org/apache/iotdb/db/it/confignode/IoTDBClusterPartitionTableTest.java
rename to integration-test/src/test/java/org/apache/iotdb/confignode/IoTDBClusterPartitionIT.java
index 39907d1..fff68f9 100644
--- a/integration-test/src/test/java/org/apache/iotdb/db/it/confignode/IoTDBClusterPartitionTableTest.java
+++ b/integration-test/src/test/java/org/apache/iotdb/confignode/IoTDBClusterPartitionIT.java
@@ -16,21 +16,26 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.db.it.confignode;
+package org.apache.iotdb.confignode;
 
 import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
+import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot;
 import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
 import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient;
+import org.apache.iotdb.commons.cluster.NodeStatus;
+import org.apache.iotdb.commons.cluster.RegionStatus;
 import org.apache.iotdb.commons.exception.IllegalPathException;
 import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.commons.path.PathPatternTree;
 import org.apache.iotdb.confignode.rpc.thrift.TDataPartitionReq;
 import org.apache.iotdb.confignode.rpc.thrift.TDataPartitionTableResp;
+import org.apache.iotdb.confignode.rpc.thrift.TRegionInfo;
 import org.apache.iotdb.confignode.rpc.thrift.TSchemaPartitionReq;
 import org.apache.iotdb.confignode.rpc.thrift.TSchemaPartitionTableResp;
 import org.apache.iotdb.confignode.rpc.thrift.TSetStorageGroupReq;
+import org.apache.iotdb.confignode.rpc.thrift.TShowClusterResp;
 import org.apache.iotdb.confignode.rpc.thrift.TShowRegionReq;
 import org.apache.iotdb.confignode.rpc.thrift.TShowRegionResp;
 import org.apache.iotdb.confignode.rpc.thrift.TStorageGroupSchema;
@@ -48,6 +53,8 @@
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.junit.runner.RunWith;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
@@ -55,24 +62,31 @@
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.TimeUnit;
 
 @RunWith(IoTDBTestRunner.class)
 @Category({ClusterIT.class})
-public class IoTDBClusterPartitionTableTest {
+public class IoTDBClusterPartitionIT {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(IoTDBClusterPartitionIT.class);
 
   protected static String originalConfigNodeConsensusProtocolClass;
   protected static String originalSchemaRegionConsensusProtocolClass;
   protected static String originalDataRegionConsensusProtocolClass;
+  private static final String testConsensusProtocolClass =
+      "org.apache.iotdb.consensus.ratis.RatisConsensus";
+
+  protected static int originSchemaReplicationFactor;
+  protected static int originalDataReplicationFactor;
+  private static final int testReplicationFactor = 3;
 
   protected static long originalTimePartitionInterval;
-
   private static final long testTimePartitionInterval = 86400;
+
   private static final String sg = "root.sg";
   private static final int storageGroupNum = 5;
   private static final int seriesPartitionSlotsNum = 10000;
-  private static final int seriesPartitionBatchSize = 1000;
   private static final int timePartitionSlotsNum = 10;
-  private static final int timePartitionBatchSize = 10;
 
   @Before
   public void setUp() throws Exception {
@@ -82,14 +96,16 @@
         ConfigFactory.getConfig().getSchemaRegionConsensusProtocolClass();
     originalDataRegionConsensusProtocolClass =
         ConfigFactory.getConfig().getDataRegionConsensusProtocolClass();
-    originalTimePartitionInterval = ConfigFactory.getConfig().getTimePartitionInterval();
+    ConfigFactory.getConfig().setConfigNodeConsesusProtocolClass(testConsensusProtocolClass);
+    ConfigFactory.getConfig().setSchemaRegionConsensusProtocolClass(testConsensusProtocolClass);
+    ConfigFactory.getConfig().setDataRegionConsensusProtocolClass(testConsensusProtocolClass);
 
-    ConfigFactory.getConfig()
-        .setConfigNodeConsesusProtocolClass("org.apache.iotdb.consensus.ratis.RatisConsensus");
-    ConfigFactory.getConfig()
-        .setSchemaRegionConsensusProtocolClass("org.apache.iotdb.consensus.ratis.RatisConsensus");
-    ConfigFactory.getConfig()
-        .setDataRegionConsensusProtocolClass("org.apache.iotdb.consensus.ratis.RatisConsensus");
+    originSchemaReplicationFactor = ConfigFactory.getConfig().getSchemaReplicationFactor();
+    originalDataReplicationFactor = ConfigFactory.getConfig().getDataReplicationFactor();
+    ConfigFactory.getConfig().setSchemaReplicationFactor(testReplicationFactor);
+    ConfigFactory.getConfig().setDataReplicationFactor(testReplicationFactor);
+
+    originalTimePartitionInterval = ConfigFactory.getConfig().getTimePartitionInterval();
     ConfigFactory.getConfig().setTimePartitionInterval(testTimePartitionInterval);
 
     EnvFactory.getEnv().initBeforeClass();
@@ -243,14 +259,14 @@
     Assert.assertTrue(dataPartitionTable.containsKey(storageGroup));
     Map<TSeriesPartitionSlot, Map<TTimePartitionSlot, List<TConsensusGroupId>>>
         seriesPartitionTable = dataPartitionTable.get(storageGroup);
-    Assert.assertEquals(seriesPartitionBatchSize, seriesPartitionTable.size());
+    Assert.assertEquals(seriesSlotEnd - seriesSlotStart, seriesPartitionTable.size());
 
     for (int i = seriesSlotStart; i < seriesSlotEnd; i++) {
       TSeriesPartitionSlot seriesPartitionSlot = new TSeriesPartitionSlot(i);
       Assert.assertTrue(seriesPartitionTable.containsKey(seriesPartitionSlot));
       Map<TTimePartitionSlot, List<TConsensusGroupId>> timePartitionTable =
           seriesPartitionTable.get(seriesPartitionSlot);
-      Assert.assertEquals(timePartitionBatchSize, timePartitionTable.size());
+      Assert.assertEquals(timeSlotEnd - timeSlotStart, timePartitionTable.size());
 
       for (long j = timeSlotStart; j < timeSlotEnd; j++) {
         TTimePartitionSlot timePartitionSlot =
@@ -269,6 +285,9 @@
 
   @Test
   public void testGetAndCreateDataPartition() throws TException, IOException {
+    final int seriesPartitionBatchSize = 100;
+    final int timePartitionBatchSize = 10;
+
     try (SyncConfigNodeIServiceClient client =
         (SyncConfigNodeIServiceClient) EnvFactory.getEnv().getConfigNodeConnection()) {
       TSStatus status;
@@ -349,4 +368,193 @@
               });
     }
   }
+
+  @Test
+  public void testPartitionDurable() throws IOException, TException, InterruptedException {
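+    // Scenario: shut down one DataNode, check that DataPartitions can still be allocated and
+    // that the Regions on that DataNode are reported as Unknown, then restart the DataNode and
+    // check that every Region returns to Running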
+    final int testDataNodeId = 0;
+    final int seriesPartitionBatchSize = 10;
+    final int timePartitionBatchSize = 10;
+
+    // Shutdown the first DataNode
+    EnvFactory.getEnv().shutdownDataNode(testDataNodeId);
+
+    try (SyncConfigNodeIServiceClient client =
+        (SyncConfigNodeIServiceClient) EnvFactory.getEnv().getConfigNodeConnection()) {
+      final String sg0 = sg + 0;
+      final String sg1 = sg + 1;
+
+      // Set StorageGroup, the result should be success
+      TSetStorageGroupReq setStorageGroupReq =
+          new TSetStorageGroupReq(new TStorageGroupSchema(sg0));
+      TSStatus status = client.setStorageGroup(setStorageGroupReq);
+      Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode());
+      setStorageGroupReq = new TSetStorageGroupReq(new TStorageGroupSchema(sg1));
+      status = client.setStorageGroup(setStorageGroupReq);
+      Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode());
+
+      // Test getOrCreateDataPartition, ConfigNode should create DataPartition and return
+      Map<String, Map<TSeriesPartitionSlot, List<TTimePartitionSlot>>> partitionSlotsMap =
+          constructPartitionSlotsMap(sg0, 0, seriesPartitionBatchSize, 0, timePartitionBatchSize);
+      TDataPartitionReq dataPartitionReq = new TDataPartitionReq(partitionSlotsMap);
+      TDataPartitionTableResp dataPartitionTableResp = null;
+      for (int retry = 0; retry < 5; retry++) {
+        // Build a new client for each retry since connections are unstable in the Win8 environment
+        try (SyncConfigNodeIServiceClient configNodeClient =
+            (SyncConfigNodeIServiceClient) EnvFactory.getEnv().getConfigNodeConnection()) {
+          dataPartitionTableResp = configNodeClient.getOrCreateDataPartitionTable(dataPartitionReq);
+          if (dataPartitionTableResp != null) {
+            break;
+          }
+        } catch (Exception e) {
+          // Retry a few times to tolerate request timeouts
+          LOGGER.error(e.getMessage());
+          TimeUnit.SECONDS.sleep(1);
+        }
+      }
+      Assert.assertNotNull(dataPartitionTableResp);
+      Assert.assertEquals(
+          TSStatusCode.SUCCESS_STATUS.getStatusCode(),
+          dataPartitionTableResp.getStatus().getCode());
+      Assert.assertNotNull(dataPartitionTableResp.getDataPartitionTable());
+      checkDataPartitionMap(
+          sg0,
+          0,
+          seriesPartitionBatchSize,
+          0,
+          timePartitionBatchSize,
+          dataPartitionTableResp.getDataPartitionTable());
+
+      // Check Region count
+      int runningCnt = 0;
+      int unknownCnt = 0;
+      TShowRegionResp showRegionResp = client.showRegion(new TShowRegionReq());
+      Assert.assertEquals(
+          TSStatusCode.SUCCESS_STATUS.getStatusCode(), showRegionResp.getStatus().getCode());
+      for (TRegionInfo regionInfo : showRegionResp.getRegionInfoList()) {
+        if (RegionStatus.Running.getStatus().equals(regionInfo.getStatus())) {
+          runningCnt += 1;
+        } else if (RegionStatus.Unknown.getStatus().equals(regionInfo.getStatus())) {
+          unknownCnt += 1;
+        }
+      }
+      // The runningCnt should be exactly twice the unknownCnt
+      // since the replication factor is 3 and exactly one DataNode is shut down
+      Assert.assertEquals(unknownCnt * 2, runningCnt);
+
+      // Wait until the heartbeat check marks the stopped DataNode as Unknown
+      TShowClusterResp showClusterResp;
+      while (true) {
+        boolean containUnknown = false;
+        showClusterResp = client.showCluster();
+        for (TDataNodeLocation dataNodeLocation : showClusterResp.getDataNodeList()) {
+          if (NodeStatus.Unknown.getStatus()
+              .equals(showClusterResp.getNodeStatus().get(dataNodeLocation.getDataNodeId()))) {
+            containUnknown = true;
+            break;
+          }
+        }
+        if (containUnknown) {
+          break;
+        }
+        // Poll at one-second intervals to avoid hammering the ConfigNode
+        TimeUnit.SECONDS.sleep(1);
+      }
+      runningCnt = 0;
+      unknownCnt = 0;
+      showClusterResp = client.showCluster();
+      for (TDataNodeLocation dataNodeLocation : showClusterResp.getDataNodeList()) {
+        if (NodeStatus.Running.getStatus()
+            .equals(showClusterResp.getNodeStatus().get(dataNodeLocation.getDataNodeId()))) {
+          runningCnt += 1;
+        } else if (NodeStatus.Unknown.getStatus()
+            .equals(showClusterResp.getNodeStatus().get(dataNodeLocation.getDataNodeId()))) {
+          unknownCnt += 1;
+        }
+      }
+      Assert.assertEquals(2, runningCnt);
+      Assert.assertEquals(1, unknownCnt);
+
+      // Test getOrCreateDataPartition, ConfigNode should create DataPartition and return
+      partitionSlotsMap =
+          constructPartitionSlotsMap(sg1, 0, seriesPartitionBatchSize, 0, timePartitionBatchSize);
+      dataPartitionReq = new TDataPartitionReq(partitionSlotsMap);
+      for (int retry = 0; retry < 5; retry++) {
+        // Build a new client for each retry since connections are unstable in the Win8 environment
+        try (SyncConfigNodeIServiceClient configNodeClient =
+            (SyncConfigNodeIServiceClient) EnvFactory.getEnv().getConfigNodeConnection()) {
+          dataPartitionTableResp = configNodeClient.getOrCreateDataPartitionTable(dataPartitionReq);
+          if (dataPartitionTableResp != null) {
+            break;
+          }
+        } catch (Exception e) {
+          // Retry a few times to tolerate request timeouts
+          LOGGER.error(e.getMessage());
+          TimeUnit.SECONDS.sleep(1);
+        }
+      }
+      Assert.assertNotNull(dataPartitionTableResp);
+      Assert.assertEquals(
+          TSStatusCode.SUCCESS_STATUS.getStatusCode(),
+          dataPartitionTableResp.getStatus().getCode());
+      Assert.assertNotNull(dataPartitionTableResp.getDataPartitionTable());
+      checkDataPartitionMap(
+          sg1,
+          0,
+          seriesPartitionBatchSize,
+          0,
+          timePartitionBatchSize,
+          dataPartitionTableResp.getDataPartitionTable());
+
+      // Check Region count and status
+      runningCnt = 0;
+      unknownCnt = 0;
+      showRegionResp = client.showRegion(new TShowRegionReq());
+      Assert.assertEquals(
+          TSStatusCode.SUCCESS_STATUS.getStatusCode(), showRegionResp.getStatus().getCode());
+      for (TRegionInfo regionInfo : showRegionResp.getRegionInfoList()) {
+        if (RegionStatus.Running.getStatus().equals(regionInfo.getStatus())) {
+          runningCnt += 1;
+        } else if (RegionStatus.Unknown.getStatus().equals(regionInfo.getStatus())) {
+          unknownCnt += 1;
+        }
+      }
+      // The runningCnt should be exactly twice the unknownCnt
+      // since the replication factor is 3 and exactly one DataNode is shut down
+      Assert.assertEquals(unknownCnt * 2, runningCnt);
+
+      EnvFactory.getEnv().restartDataNode(testDataNodeId);
+      // Wait until the heartbeat check reports no Unknown DataNodes after the restart
+      while (true) {
+        boolean containUnknown = false;
+        showClusterResp = client.showCluster();
+        for (TDataNodeLocation dataNodeLocation : showClusterResp.getDataNodeList()) {
+          if (NodeStatus.Unknown.getStatus()
+              .equals(showClusterResp.getNodeStatus().get(dataNodeLocation.getDataNodeId()))) {
+            containUnknown = true;
+            break;
+          }
+        }
+        if (!containUnknown) {
+          break;
+        }
+        // Poll at one-second intervals to avoid hammering the ConfigNode
+        TimeUnit.SECONDS.sleep(1);
+      }
+
+      // All Regions should return to Running after the testDataNode is restarted
+      boolean allRunning = true;
+      for (int retry = 0; retry < 30; retry++) {
+        allRunning = true;
+        showRegionResp = client.showRegion(new TShowRegionReq());
+        for (TRegionInfo regionInfo : showRegionResp.getRegionInfoList()) {
+          if (!RegionStatus.Running.getStatus().equals(regionInfo.getStatus())) {
+            allRunning = false;
+            break;
+          }
+        }
+        if (allRunning) {
+          break;
+        }
+
+        TimeUnit.SECONDS.sleep(1);
+      }
+      Assert.assertTrue(allRunning);
+    }
+  }
 }
diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/confignode/IoTDBConfigNodeIT.java b/integration-test/src/test/java/org/apache/iotdb/confignode/IoTDBConfigNodeIT.java
similarity index 99%
rename from integration-test/src/test/java/org/apache/iotdb/db/it/confignode/IoTDBConfigNodeIT.java
rename to integration-test/src/test/java/org/apache/iotdb/confignode/IoTDBConfigNodeIT.java
index ebdc7f8..aba7da3 100644
--- a/integration-test/src/test/java/org/apache/iotdb/db/it/confignode/IoTDBConfigNodeIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/confignode/IoTDBConfigNodeIT.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.db.it.confignode;
+package org.apache.iotdb.confignode;
 
 import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation;
 import org.apache.iotdb.common.rpc.thrift.TDataNodeConfiguration;
diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/env/StandaloneEnv.java b/integration-test/src/test/java/org/apache/iotdb/db/it/env/StandaloneEnv.java
index 70f4cb2..4c45167 100644
--- a/integration-test/src/test/java/org/apache/iotdb/db/it/env/StandaloneEnv.java
+++ b/integration-test/src/test/java/org/apache/iotdb/db/it/env/StandaloneEnv.java
@@ -172,4 +172,14 @@
     session.open();
     return session;
   }
+
+  @Override
+  public void restartDataNode(int index) {
+    // Do nothing
+  }
+
+  @Override
+  public void shutdownDataNode(int index) {
+    // Do nothing
+  }
 }
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/cluster/RegionStatus.java b/node-commons/src/main/java/org/apache/iotdb/commons/cluster/RegionStatus.java
index 6d60ec6..bc81946 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/cluster/RegionStatus.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/cluster/RegionStatus.java
@@ -25,7 +25,13 @@
   Running("Running"),
 
   /** Region connection failure */
-  Unknown("Unknown");
+  Unknown("Unknown"),
+
+  /** Region is being removed */
+  Removing("Removing"),
+
+  /** Only query statements are permitted */
+  ReadOnly("ReadOnly");
 
   private final String status;
 
@@ -36,4 +42,13 @@
   public String getStatus() {
     return status;
   }
+
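+  /** Parse a RegionStatus from its String form; throws RuntimeException if the status is unknown. */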
+  public static RegionStatus parse(String status) {
+    for (RegionStatus regionStatus : RegionStatus.values()) {
+      if (regionStatus.status.equals(status)) {
+        return regionStatus;
+      }
+    }
+    throw new RuntimeException(String.format("RegionStatus %s doesn't exist.", status));
+  }
 }
diff --git a/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java b/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
index 5e1afe6..1bca6b5 100644
--- a/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
+++ b/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
@@ -164,7 +164,8 @@
   DATANODE_STOP_ERROR(917),
   REGION_LEADER_CHANGE_FAILED(918),
   REMOVE_DATANODE_FAILED(919),
-  OVERLAP_WITH_EXISTING_DELETE_TIMESERIES_TASK(920);
+  OVERLAP_WITH_EXISTING_DELETE_TIMESERIES_TASK(920),
+  NOT_AVAILABLE_REGION_GROUP(921);
 
   private int statusCode;