[IOTDB-6111] Fix Region creation bugs (#10844) (#10858)

diff --git a/integration-test/src/test/java/org/apache/iotdb/confignode/it/partition/IoTDBPartitionCreationIT.java b/integration-test/src/test/java/org/apache/iotdb/confignode/it/partition/IoTDBPartitionCreationIT.java
index dba7b81..df83e00 100644
--- a/integration-test/src/test/java/org/apache/iotdb/confignode/it/partition/IoTDBPartitionCreationIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/confignode/it/partition/IoTDBPartitionCreationIT.java
@@ -114,7 +114,10 @@
   public void testPartitionAllocation() throws Exception {
     try (SyncConfigNodeIServiceClient client =
         (SyncConfigNodeIServiceClient) EnvFactory.getEnv().getLeaderConfigNodeConnection()) {
-      // Status: Running, Running, Running, Region: [0], [0], [0]
+      // Current cluster: 1C3D
+      // Create 1 DataPartition to extend 1 DataRegionGroup
+      // DataNode status: Running, Running, Running
+      // Region distribution: [0], [0], [0]
       Map<String, Map<TSeriesPartitionSlot, TTimeSlotList>> partitionSlotsMap =
           ConfigNodeTestUtils.constructPartitionSlotsMap(
               sg,
@@ -153,7 +156,10 @@
           testTimePartitionInterval,
           dataPartitionTableResp.getDataPartitionTable());
 
-      // Status: Running, Running, Removing, Region: [0], [0], [0]
+      // Current cluster: 1C3D
+      // Set 1 DataNode to Removing status
+      // DataNode status: Running, Running, Removing
+      // Region distribution: [0], [0], [0]
       TSetDataNodeStatusReq setDataNodeStatusReq = new TSetDataNodeStatusReq();
       DataNodeWrapper dataNodeWrapper = EnvFactory.getEnv().getDataNodeWrapper(2);
       setDataNodeStatusReq.setTargetDataNode(
@@ -183,7 +189,11 @@
         TimeUnit.SECONDS.sleep(1);
       }
 
-      // Status: Running, Running, Removing, Running, RegionGroup: [0, 1], [0, 1], [0], [1]
+      // Register 1 DataNode and Create 1 DataPartition to extend 1 DataRegionGroup
+      // The new DataRegions wouldn't be allocated to the Removing DataNode
+      // Current cluster: 1C4D
+      // DataNode status: Running, Running, Removing, Running
+      // Region distribution: [0, 1], [0, 1], [0], [1]
       EnvFactory.getEnv().registerNewDataNode(true);
       partitionSlotsMap =
           ConfigNodeTestUtils.constructPartitionSlotsMap(
@@ -222,8 +232,10 @@
           testTimePartitionInterval,
           dataPartitionTableResp.getDataPartitionTable());
 
-      // Status: Running, Running, Removing, ReadOnly, RegionGroup: [0, 1], [0, 1],
-      // [0], [1]
+      // Current cluster: 1C4D
+      // Set 1 DataNode to ReadOnly status
+      // DataNode status: Running, Running, Removing, ReadOnly
+      // Region distribution: [0, 1], [0, 1], [0], [1]
       setDataNodeStatusReq = new TSetDataNodeStatusReq();
       dataNodeWrapper = EnvFactory.getEnv().getDataNodeWrapper(3);
       setDataNodeStatusReq.setTargetDataNode(
@@ -253,8 +265,11 @@
         TimeUnit.SECONDS.sleep(1);
       }
 
-      // Status: Running, Running, Removing, ReadOnly, Running, RegionGroup: [0, 1, 2], [0, 1, 2],
-      // [0], [1], [2]
+      // Register 1 DataNode and Create 1 DataPartition to extend 1 DataRegionGroup
+      // The new DataRegions wouldn't be allocated to the Removing and ReadOnly DataNodes
+      // Current cluster: 1C5D
+      // DataNode status: Running, Running, Removing, ReadOnly, Running
+      // Region distribution: [0, 1, 2], [0, 1, 2], [0], [1], [2]
       EnvFactory.getEnv().registerNewDataNode(true);
       partitionSlotsMap =
           ConfigNodeTestUtils.constructPartitionSlotsMap(
@@ -293,8 +308,10 @@
           testTimePartitionInterval,
           dataPartitionTableResp.getDataPartitionTable());
 
-      // Status: Running, Running, Removing, ReadOnly, Unknown, RegionGroup:[0, 1, 2], [0, 1, 2],
-      // [0], [1], [2]
+      // Shutdown 1 DataNode
+      // Current cluster: 1C5D
+      // DataNode status: Running, Running, Removing, ReadOnly, Unknown
+      // Region distribution: [0, 1, 2], [0, 1, 2], [0], [1], [2]
       EnvFactory.getEnv().shutdownDataNode(4);
       // Wait for shutdown check
       while (true) {
@@ -315,10 +332,13 @@
         TimeUnit.SECONDS.sleep(1);
       }
 
-      // Status: Running, Running, Removing, ReadOnly, Unknown, Running,
-      // RegionGroup: [0, 1, 2, 3], [0, 1, 2, 3], [0], [1], [2], [3]
+      // Register 1 DataNode and Create 1 DataPartition to extend 1 DataRegionGroup
+      // The new DataRegions wouldn't be allocated to the Removing and ReadOnly DataNodes
+      // But the new DataRegion can be allocated to the Unknown DataNode
+      // Current cluster: 1C6D
+      // DataNode status: Running, Running, Removing, ReadOnly, Unknown, Running
+      // Region distribution: [0, 1, 2, 3], [0, 1, 2], [0], [1], [2, 3], [3]
       EnvFactory.getEnv().registerNewDataNode(false);
-
       // Use thread sleep to replace verifying because the Unknown DataNode can not pass the
       // connection check
       TimeUnit.SECONDS.sleep(25);
@@ -378,47 +398,60 @@
           readOnlyCnt += 1;
         }
       }
-      Assert.assertEquals(9, runningCnt);
+      Assert.assertEquals(8, runningCnt);
       Assert.assertEquals(1, removingCnt);
       Assert.assertEquals(1, readOnlyCnt);
-      Assert.assertEquals(1, unknownCnt);
+      Assert.assertEquals(2, unknownCnt);
 
-      partitionSlotsMap =
-          ConfigNodeTestUtils.constructPartitionSlotsMap(
-              sg,
-              4,
-              4 + testSeriesPartitionBatchSize,
-              4,
-              4 + testTimePartitionBatchSize,
-              testTimePartitionInterval);
-      dataPartitionReq = new TDataPartitionReq(partitionSlotsMap);
-      for (int retry = 0; retry < 5; retry++) {
-        // Build new Client since it's unstable in Win8 environment
-        try (SyncConfigNodeIServiceClient configNodeClient =
-            (SyncConfigNodeIServiceClient) EnvFactory.getEnv().getLeaderConfigNodeConnection()) {
-          dataPartitionTableResp = configNodeClient.getOrCreateDataPartitionTable(dataPartitionReq);
-          if (dataPartitionTableResp != null) {
-            break;
-          }
-        } catch (Exception e) {
-          // Retry sometimes in order to avoid request timeout
-          LOGGER.error(e.getMessage());
-          TimeUnit.SECONDS.sleep(1);
+      // Restart 1 DataNode
+      // Current cluster: 1C6D
+      // DataNode status: Running, Running, Removing, ReadOnly, Running, Running
+      // Region distribution: [0, 1, 2, 3], [0, 1, 2], [0], [1], [2, 3], [3]
+      EnvFactory.getEnv().startDataNode(4);
+      // Wait for restart check
+      while (true) {
+        AtomicBoolean containUnknown = new AtomicBoolean(false);
+        TShowDataNodesResp showDataNodesResp = client.showDataNodes();
+        showDataNodesResp
+            .getDataNodesInfoList()
+            .forEach(
+                dataNodeInfo -> {
+                  if (NodeStatus.Unknown.getStatus().equals(dataNodeInfo.getStatus())) {
+                    containUnknown.set(true);
+                  }
+                });
+
+        if (!containUnknown.get()) {
+          break;
         }
+        TimeUnit.SECONDS.sleep(1);
       }
-      Assert.assertNotNull(dataPartitionTableResp);
-      Assert.assertEquals(
-          TSStatusCode.SUCCESS_STATUS.getStatusCode(),
-          dataPartitionTableResp.getStatus().getCode());
-      Assert.assertNotNull(dataPartitionTableResp.getDataPartitionTable());
-      ConfigNodeTestUtils.checkDataPartitionTable(
-          sg,
-          4,
-          4 + testSeriesPartitionBatchSize,
-          4,
-          4 + testTimePartitionBatchSize,
-          testTimePartitionInterval,
-          dataPartitionTableResp.getDataPartitionTable());
+      // Check Region count and status
+      for (int i = 0; i < 10; i++) {
+        runningCnt = 0;
+        unknownCnt = 0;
+        readOnlyCnt = 0;
+        removingCnt = 0;
+        showRegionResp = client.showRegion(new TShowRegionReq());
+        Assert.assertEquals(
+            TSStatusCode.SUCCESS_STATUS.getStatusCode(), showRegionResp.getStatus().getCode());
+        for (TRegionInfo regionInfo : showRegionResp.getRegionInfoList()) {
+          if (RegionStatus.Running.getStatus().equals(regionInfo.getStatus())) {
+            runningCnt += 1;
+          } else if (RegionStatus.Unknown.getStatus().equals(regionInfo.getStatus())) {
+            unknownCnt += 1;
+          } else if (RegionStatus.Removing.getStatus().equals(regionInfo.getStatus())) {
+            removingCnt += 1;
+          } else if (RegionStatus.ReadOnly.getStatus().equals(regionInfo.getStatus())) {
+            readOnlyCnt += 1;
+          }
+        }
+        if (runningCnt == 10 && unknownCnt == 0 && readOnlyCnt == 1 && removingCnt == 1) {
+          return;
+        }
+        TimeUnit.SECONDS.sleep(1);
+      }
+      Assert.fail("Region status is not correct after 10s of recovery");
     }
   }
 }
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RegionBalancer.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RegionBalancer.java
index 31cb976..c9965fe 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RegionBalancer.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RegionBalancer.java
@@ -74,18 +74,15 @@
       Map<String, Integer> allotmentMap, TConsensusGroupType consensusGroupType)
       throws NotEnoughDataNodeException, DatabaseNotExistsException {
 
-    // The new RegionGroups will occupy online DataNodes firstly
-    List<TDataNodeConfiguration> onlineDataNodes =
-        getNodeManager().filterDataNodeThroughStatus(NodeStatus.Running);
     // Some new RegionGroups will have to occupy unknown DataNodes
     // if the number of online DataNodes is insufficient
     List<TDataNodeConfiguration> availableDataNodes =
         getNodeManager().filterDataNodeThroughStatus(NodeStatus.Running, NodeStatus.Unknown);
 
     // Make sure the number of available DataNodes is enough for allocating new RegionGroups
-    for (String storageGroup : allotmentMap.keySet()) {
+    for (String database : allotmentMap.keySet()) {
       int replicationFactor =
-          getClusterSchemaManager().getReplicationFactor(storageGroup, consensusGroupType);
+          getClusterSchemaManager().getReplicationFactor(database, consensusGroupType);
       if (availableDataNodes.size() < replicationFactor) {
         throw new NotEnoughDataNodeException();
       }
@@ -97,18 +94,16 @@
         getPartitionManager().getAllReplicaSets(consensusGroupType);
 
     for (Map.Entry<String, Integer> entry : allotmentMap.entrySet()) {
-      String storageGroup = entry.getKey();
+      String database = entry.getKey();
       int allotment = entry.getValue();
       int replicationFactor =
-          getClusterSchemaManager().getReplicationFactor(storageGroup, consensusGroupType);
-      List<TDataNodeConfiguration> targetDataNodes =
-          onlineDataNodes.size() >= replicationFactor ? onlineDataNodes : availableDataNodes;
+          getClusterSchemaManager().getReplicationFactor(database, consensusGroupType);
 
       for (int i = 0; i < allotment; i++) {
         // Prepare input data
         Map<Integer, TDataNodeConfiguration> availableDataNodeMap = new ConcurrentHashMap<>();
         Map<Integer, Double> freeDiskSpaceMap = new ConcurrentHashMap<>();
-        targetDataNodes.forEach(
+        availableDataNodes.forEach(
             dataNodeConfiguration -> {
               int dataNodeId = dataNodeConfiguration.getLocation().getDataNodeId();
               availableDataNodeMap.put(dataNodeId, dataNodeConfiguration);
@@ -124,7 +119,7 @@
                 replicationFactor,
                 new TConsensusGroupId(
                     consensusGroupType, getPartitionManager().generateNextRegionGroupId()));
-        createRegionGroupsPlan.addRegionGroup(storageGroup, newRegionGroup);
+        createRegionGroupsPlan.addRegionGroup(database, newRegionGroup);
 
         // Mark the new RegionGroup as allocated
         allocatedRegionGroups.add(newRegionGroup);
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/region/RegionCache.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/region/RegionCache.java
index a395c10..0bce99e 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/region/RegionCache.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/region/RegionCache.java
@@ -59,7 +59,10 @@
 
     // TODO: Optimize judge logic
     RegionStatus status;
-    if (System.currentTimeMillis() - lastSample.getSendTimestamp() > HEARTBEAT_TIMEOUT_TIME) {
+    if (RegionStatus.Removing.equals(lastSample.getStatus())) {
+      status = RegionStatus.Removing;
+    } else if (System.currentTimeMillis() - lastSample.getSendTimestamp()
+        > HEARTBEAT_TIMEOUT_TIME) {
       status = RegionStatus.Unknown;
     } else {
       status = lastSample.getStatus();
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeRegionManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeRegionManager.java
index b2a6cb4..55c4e3b 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeRegionManager.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeRegionManager.java
@@ -32,6 +32,7 @@
 import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.consensus.common.Peer;
 import org.apache.iotdb.consensus.common.response.ConsensusGenericResponse;
+import org.apache.iotdb.consensus.exception.ConsensusGroupAlreadyExistException;
 import org.apache.iotdb.db.consensus.DataRegionConsensusImpl;
 import org.apache.iotdb.db.consensus.SchemaRegionConsensusImpl;
 import org.apache.iotdb.db.exception.DataRegionException;
@@ -121,6 +122,11 @@
           SchemaRegionConsensusImpl.getInstance().createPeer(schemaRegionId, peers);
       if (consensusGenericResponse.isSuccess()) {
         tsStatus = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
+      } else if (consensusGenericResponse.getException()
+          instanceof ConsensusGroupAlreadyExistException) {
+        tsStatus = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
+        tsStatus.setMessage(
+            String.format("SchemaRegion %d already exists.", schemaRegionId.getId()));
       } else {
         tsStatus = new TSStatus(TSStatusCode.CREATE_REGION_ERROR.getStatusCode());
         tsStatus.setMessage(consensusGenericResponse.getException().getMessage());
@@ -157,6 +163,10 @@
           DataRegionConsensusImpl.getInstance().createPeer(dataRegionId, peers);
       if (consensusGenericResponse.isSuccess()) {
         tsStatus = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
+      } else if (consensusGenericResponse.getException()
+          instanceof ConsensusGroupAlreadyExistException) {
+        tsStatus = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
+        tsStatus.setMessage(String.format("DataRegion %d already exists.", dataRegionId.getId()));
       } else {
         tsStatus = new TSStatus(TSStatusCode.CREATE_REGION_ERROR.getStatusCode());
         tsStatus.setMessage(consensusGenericResponse.getException().getMessage());