Fixed multiple config bugs
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigRegionStateMachine.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigRegionStateMachine.java
index effb1c4..48deaef 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigRegionStateMachine.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigRegionStateMachine.java
@@ -92,8 +92,8 @@
   private static final long LOG_FILE_MAX_SIZE =
       CONF.getConfigNodeSimpleConsensusLogSegmentSizeMax();
   private final TEndPoint currentNodeTEndPoint;
-  private static Pattern LOG_INPROGRESS_PATTERN = Pattern.compile("\\d+");
-  private static Pattern LOG_PATTERN = Pattern.compile("(?<=_)(\\d+)$");
+  private static final Pattern LOG_INPROGRESS_PATTERN = Pattern.compile("log_inprogress_(\\d+)$");
+  private static final Pattern LOG_PATTERN = Pattern.compile("log_(\\d+)_(\\d+)$");
 
   public ConfigRegionStateMachine(ConfigManager configManager, ConfigPlanExecutor executor) {
     this.executor = executor;
@@ -121,6 +121,13 @@
 
   /** Transmit {@link ConfigPhysicalPlan} to {@link ConfigPlanExecutor} */
   protected TSStatus write(ConfigPhysicalPlan plan) {
+    if (ConsensusFactory.SIMPLE_CONSENSUS.equals(CONF.getConfigNodeConsensusProtocolClass())) {
+      final TSStatus persistStatus = persistPlanForSimpleConsensus(plan);
+      if (persistStatus.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+        return persistStatus;
+      }
+    }
+
     TSStatus result;
     try {
       result = executor.executeNonQueryPlan(plan);
@@ -129,10 +136,6 @@
       result = new TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
     }
 
-    if (ConsensusFactory.SIMPLE_CONSENSUS.equals(CONF.getConfigNodeConsensusProtocolClass())) {
-      writeLogForSimpleConsensus(plan);
-    }
-
     if (result.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
       PipeConfigNodeAgent.runtime().listener().tryListenToPlan(plan, false);
     }
@@ -197,7 +200,6 @@
         PipeConfigNodeAgent.runtime()
             .listener()
             .tryListenToSnapshots(ConfigNodeSnapshotParser.getSnapshots());
-        return true;
       } catch (IOException e) {
         if (PipeConfigNodeAgent.runtime().listener().isOpened()) {
           LOGGER.warn(
@@ -205,14 +207,18 @@
               e);
         }
       }
+      return true;
     }
     return false;
   }
 
   @Override
   public void loadSnapshot(final File latestSnapshotRootDir) {
+    if (!executor.loadSnapshot(latestSnapshotRootDir)) {
+      return;
+    }
+
     try {
-      executor.loadSnapshot(latestSnapshotRootDir);
       // We recompute the snapshot for pipe listener when loading snapshot
       // to recover the newest snapshot in cache
       PipeConfigNodeAgent.runtime()
@@ -342,6 +348,9 @@
 
   @Override
   public void stop() {
+    if (ConsensusFactory.SIMPLE_CONSENSUS.equals(CONF.getConfigNodeConsensusProtocolClass())) {
+      closeSimpleLogWriter();
+    }
     // Shutdown leader related service for config pipe
     PipeConfigNodeAgent.runtime().notifyLeaderUnavailable();
   }
@@ -351,56 +360,48 @@
     return CommonDescriptor.getInstance().getConfig().isReadOnly();
   }
 
-  private void writeLogForSimpleConsensus(ConfigPhysicalPlan plan) {
-    if (simpleLogFile.length() > LOG_FILE_MAX_SIZE) {
-      try {
-        simpleLogWriter.force();
-        File completedFilePath = new File(FILE_PATH + startIndex + "_" + endIndex);
-        Files.move(
-            simpleLogFile.toPath(), completedFilePath.toPath(), StandardCopyOption.ATOMIC_MOVE);
-      } catch (IOException e) {
-        LOGGER.error("Can't force logWriter for ConfigNode SimpleConsensus mode", e);
-      }
-      for (int retry = 0; retry < 5; retry++) {
-        try {
-          simpleLogWriter.close();
-        } catch (IOException e) {
-          LOGGER.warn(
-              "Can't close StandAloneLog for ConfigNode SimpleConsensus mode, "
-                  + "filePath: {}, retry: {}",
-              simpleLogFile.getAbsolutePath(),
-              retry);
-          try {
-            // Sleep 1s and retry
-            TimeUnit.SECONDS.sleep(1);
-          } catch (InterruptedException e2) {
-            Thread.currentThread().interrupt();
-            LOGGER.warn("Unexpected interruption during the close method of logWriter");
-          }
-          continue;
-        }
-        break;
-      }
-      startIndex = endIndex + 1;
-      createLogFile(startIndex);
-    }
-
+  private TSStatus persistPlanForSimpleConsensus(ConfigPhysicalPlan plan) {
     try {
+      if (simpleLogWriter == null || simpleLogFile == null) {
+        throw new IOException("SimpleConsensus log writer is not initialized.");
+      }
+
+      if (simpleLogFile.length() > LOG_FILE_MAX_SIZE) {
+        rollSimpleConsensusLogFile();
+      }
+
       ByteBuffer buffer = plan.serializeToByteBuffer();
       buffer.position(buffer.limit());
       simpleLogWriter.write(buffer);
+      simpleLogWriter.force();
 
       endIndex = endIndex + 1;
     } catch (Exception e) {
       LOGGER.error(
-          "Can't serialize current ConfigPhysicalPlan for ConfigNode SimpleConsensus mode", e);
+          "Persist current ConfigPhysicalPlan for ConfigNode SimpleConsensus mode failed", e);
+      return new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode())
+          .setMessage(
+              "Persist ConfigNode SimpleConsensus log failed: " + String.valueOf(e.getMessage()));
     }
+    return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
+  }
+
+  private void rollSimpleConsensusLogFile() throws IOException {
+    simpleLogWriter.force();
+    closeSimpleLogWriter();
+    Files.move(
+        simpleLogFile.toPath(),
+        new File(FILE_PATH + startIndex + "_" + endIndex).toPath(),
+        StandardCopyOption.ATOMIC_MOVE);
+    startIndex = endIndex + 1;
+    createLogFile(startIndex);
   }
 
   private void initStandAloneConfigNode() {
     File dir = new File(CURRENT_FILE_DIR);
     dir.mkdirs();
     String[] list = new File(CURRENT_FILE_DIR).list();
+    endIndex = 0;
     if (list != null && list.length != 0) {
       Arrays.sort(list, new FileComparator());
       for (String logFileName : list) {
@@ -417,7 +418,7 @@
           continue;
         }
 
-        startIndex = endIndex;
+        final int recoveredStartIndex = parseStartIndex(logFileName);
         while (logReader.hasNext()) {
           endIndex++;
           // Read and re-serialize the PhysicalPlan
@@ -435,13 +436,13 @@
           }
         }
         logReader.close();
+        if (isInProgressLogFile(logFileName)) {
+          sealRecoveredInProgressLogFile(logFile, recoveredStartIndex, endIndex);
+        }
       }
-    } else {
-      startIndex = 0;
-      endIndex = 0;
     }
-    startIndex = startIndex + 1;
-    createLogFile(endIndex);
+    startIndex = endIndex + 1;
+    createLogFile(startIndex);
 
     ScheduledExecutorService simpleConsensusThread =
         IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor(
@@ -482,26 +483,72 @@
     }
   }
 
+  private void sealRecoveredInProgressLogFile(
+      File logFile, int recoveredStartIndex, int recoveredEndIndex) {
+    try {
+      if (recoveredStartIndex > recoveredEndIndex) {
+        Files.deleteIfExists(logFile.toPath());
+        return;
+      }
+      Files.move(
+          logFile.toPath(),
+          new File(FILE_PATH + recoveredStartIndex + "_" + recoveredEndIndex).toPath(),
+          StandardCopyOption.ATOMIC_MOVE);
+    } catch (IOException e) {
+      LOGGER.warn("Seal recovered ConfigNode SimpleConsensus log failed: {}", logFile, e);
+    }
+  }
+
+  private boolean isInProgressLogFile(String filename) {
+    return filename.startsWith("log_inprogress_");
+  }
+
+  private void closeSimpleLogWriter() {
+    if (simpleLogWriter == null) {
+      return;
+    }
+    for (int retry = 0; retry < 5; retry++) {
+      try {
+        simpleLogWriter.close();
+        simpleLogWriter = null;
+        return;
+      } catch (IOException e) {
+        LOGGER.warn(
+            "Can't close StandAloneLog for ConfigNode SimpleConsensus mode, "
+                + "filePath: {}, retry: {}",
+            simpleLogFile == null ? null : simpleLogFile.getAbsolutePath(),
+            retry);
+        try {
+          TimeUnit.SECONDS.sleep(1);
+        } catch (InterruptedException e2) {
+          Thread.currentThread().interrupt();
+          LOGGER.warn("Unexpected interruption during the close method of logWriter");
+          break;
+        }
+      }
+    }
+  }
+
   static class FileComparator implements Comparator<String> {
 
     @Override
     public int compare(String filename1, String filename2) {
-      long id1 = parseEndIndex(filename1);
-      long id2 = parseEndIndex(filename2);
+      long id1 = parseStartIndex(filename1);
+      long id2 = parseStartIndex(filename2);
       return Long.compare(id1, id2);
     }
   }
 
-  static long parseEndIndex(String filename) {
+  static int parseStartIndex(String filename) {
     if (filename.startsWith("log_inprogress_")) {
       Matcher matcher = LOG_INPROGRESS_PATTERN.matcher(filename);
       if (matcher.find()) {
-        return Long.parseLong(matcher.group());
+        return Integer.parseInt(matcher.group(1));
       }
     } else {
       Matcher matcher = LOG_PATTERN.matcher(filename);
       if (matcher.find()) {
-        return Long.parseLong(matcher.group());
+        return Integer.parseInt(matcher.group(1));
       }
     }
     return 0;
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java
index 2e50c6b..abd89ff 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java
@@ -20,6 +20,7 @@
 package org.apache.iotdb.confignode.manager.node;
 
 import org.apache.iotdb.common.rpc.thrift.TAINodeConfiguration;
+import org.apache.iotdb.common.rpc.thrift.TAINodeLocation;
 import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation;
 import org.apache.iotdb.common.rpc.thrift.TDataNodeConfiguration;
 import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
@@ -46,6 +47,7 @@
 import org.apache.iotdb.confignode.client.sync.SyncDataNodeClientPool;
 import org.apache.iotdb.confignode.conf.ConfigNodeConfig;
 import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
+import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlan;
 import org.apache.iotdb.confignode.consensus.request.read.ainode.GetAINodeConfigurationPlan;
 import org.apache.iotdb.confignode.consensus.request.read.datanode.GetDataNodeConfigurationPlan;
 import org.apache.iotdb.confignode.consensus.request.write.ainode.RegisterAINodePlan;
@@ -321,30 +323,33 @@
     DataNodeRegisterResp resp = new DataNodeRegisterResp();
     resp.setConfigNodeList(getRegisteredConfigNodes());
 
-    // Create a new DataNodeHeartbeatCache and force update NodeStatus
     int dataNodeId = nodeInfo.generateNextNodeId();
-    getLoadManager().getLoadCache().createNodeHeartbeatCache(NodeType.DataNode, dataNodeId);
-    // TODO: invoke a force heartbeat to update new DataNode's status immediately
 
     RegisterDataNodePlan registerDataNodePlan =
         new RegisterDataNodePlan(req.getDataNodeConfiguration());
     // Register new DataNode
     registerDataNodePlan.getDataNodeConfiguration().getLocation().setDataNodeId(dataNodeId);
-    try {
-      getConsensusManager().write(registerDataNodePlan);
-    } catch (ConsensusException e) {
-      LOGGER.warn(CONSENSUS_WRITE_ERROR, e);
+    TSStatus registerStatus = writeConfigPhysicalPlan(registerDataNodePlan);
+    if (!isConsensusWriteSuccessful(registerStatus)) {
+      resp.setStatus(registerStatus);
+      return resp;
     }
 
     // update datanode's versionInfo
     UpdateVersionInfoPlan updateVersionInfoPlan =
         new UpdateVersionInfoPlan(req.getVersionInfo(), dataNodeId);
-    try {
-      getConsensusManager().write(updateVersionInfoPlan);
-    } catch (ConsensusException e) {
-      LOGGER.warn(CONSENSUS_WRITE_ERROR, e);
+    TSStatus updateVersionStatus = writeConfigPhysicalPlan(updateVersionInfoPlan);
+    if (!isConsensusWriteSuccessful(updateVersionStatus)) {
+      resp.setStatus(
+          rollbackDataNodeRegistration(
+              registerDataNodePlan.getDataNodeConfiguration().getLocation(), updateVersionStatus));
+      return resp;
     }
 
+    // Create a new DataNodeHeartbeatCache and force update NodeStatus
+    getLoadManager().getLoadCache().createNodeHeartbeatCache(NodeType.DataNode, dataNodeId);
+    // TODO: invoke a force heartbeat to update new DataNode's status immediately
+
     // Bind DataNode metrics
     PartitionMetrics.bindDataNodePartitionMetricsWhenUpdate(
         MetricService.getInstance(), configManager, dataNodeId);
@@ -352,7 +357,10 @@
     // Adjust the maximum RegionGroup number of each Database
     getClusterSchemaManager().adjustMaxRegionGroupNum();
 
-    resp.setStatus(ClusterNodeStartUtils.ACCEPT_NODE_REGISTRATION);
+    resp.setStatus(
+        buildSuccessStatus(
+            ClusterNodeStartUtils.ACCEPT_NODE_REGISTRATION.getMessage(),
+            registerStatus.getMessage()));
     resp.setDataNodeId(
         registerDataNodePlan.getDataNodeConfiguration().getLocation().getDataNodeId());
     resp.setRuntimeConfiguration(getRuntimeConfiguration(dataNodeId));
@@ -380,10 +388,10 @@
       // Update DataNodeConfiguration when modified during restart
       UpdateDataNodePlan updateDataNodePlan =
           new UpdateDataNodePlan(req.getDataNodeConfiguration());
-      try {
-        getConsensusManager().write(updateDataNodePlan);
-      } catch (ConsensusException e) {
-        LOGGER.warn(CONSENSUS_WRITE_ERROR, e);
+      TSStatus updateStatus = writeConfigPhysicalPlan(updateDataNodePlan);
+      if (!isConsensusWriteSuccessful(updateStatus)) {
+        resp.setStatus(updateStatus);
+        return resp;
       }
     }
     TNodeVersionInfo versionInfo = nodeInfo.getVersionInfo(nodeId);
@@ -391,14 +399,14 @@
       // Update versionInfo when modified during restart
       UpdateVersionInfoPlan updateVersionInfoPlan =
           new UpdateVersionInfoPlan(req.getVersionInfo(), nodeId);
-      try {
-        getConsensusManager().write(updateVersionInfoPlan);
-      } catch (ConsensusException e) {
-        LOGGER.warn(CONSENSUS_WRITE_ERROR, e);
+      TSStatus updateStatus = writeConfigPhysicalPlan(updateVersionInfoPlan);
+      if (!isConsensusWriteSuccessful(updateStatus)) {
+        resp.setStatus(updateStatus);
+        return resp;
       }
     }
 
-    resp.setStatus(ClusterNodeStartUtils.ACCEPT_NODE_RESTART);
+    resp.setStatus(buildSuccessStatus(ClusterNodeStartUtils.ACCEPT_NODE_RESTART.getMessage()));
     resp.setRuntimeConfiguration(getRuntimeConfiguration(nodeId));
 
     resp.setCorrectConsensusGroups(getPartitionManager().getAllReplicaSets(nodeId));
@@ -476,13 +484,12 @@
       // Update versionInfo when modified during restart
       UpdateVersionInfoPlan updateConfigNodePlan =
           new UpdateVersionInfoPlan(versionInfo, configNodeId);
-      try {
-        return getConsensusManager().write(updateConfigNodePlan);
-      } catch (ConsensusException e) {
-        return new TSStatus(TSStatusCode.CONSENSUS_NOT_INITIALIZED.getStatusCode());
+      TSStatus updateStatus = writeConfigPhysicalPlan(updateConfigNodePlan);
+      if (!isConsensusWriteSuccessful(updateStatus)) {
+        return updateStatus;
       }
     }
-    return ClusterNodeStartUtils.ACCEPT_NODE_RESTART;
+    return buildSuccessStatus(ClusterNodeStartUtils.ACCEPT_NODE_RESTART.getMessage());
   }
 
   public List<TAINodeInfo> getRegisteredAINodeInfoList() {
@@ -528,27 +535,37 @@
     }
 
     int aiNodeId = nodeInfo.generateNextNodeId();
-    getLoadManager().getLoadCache().createNodeHeartbeatCache(NodeType.AINode, aiNodeId);
     RegisterAINodePlan registerAINodePlan = new RegisterAINodePlan(req.getAiNodeConfiguration());
     // Register new DataNode
     registerAINodePlan.getAINodeConfiguration().getLocation().setAiNodeId(aiNodeId);
-    try {
-      getConsensusManager().write(registerAINodePlan);
-    } catch (ConsensusException e) {
-      LOGGER.warn(CONSENSUS_WRITE_ERROR, e);
+    TSStatus registerStatus = writeConfigPhysicalPlan(registerAINodePlan);
+    if (!isConsensusWriteSuccessful(registerStatus)) {
+      AINodeRegisterResp resp = new AINodeRegisterResp();
+      resp.setConfigNodeList(getRegisteredConfigNodes());
+      resp.setStatus(registerStatus);
+      return resp;
     }
 
     // update datanode's versionInfo
     UpdateVersionInfoPlan updateVersionInfoPlan =
         new UpdateVersionInfoPlan(req.getVersionInfo(), aiNodeId);
-    try {
-      getConsensusManager().write(updateVersionInfoPlan);
-    } catch (ConsensusException e) {
-      LOGGER.warn(CONSENSUS_WRITE_ERROR, e);
+    TSStatus updateVersionStatus = writeConfigPhysicalPlan(updateVersionInfoPlan);
+    if (!isConsensusWriteSuccessful(updateVersionStatus)) {
+      AINodeRegisterResp resp = new AINodeRegisterResp();
+      resp.setConfigNodeList(getRegisteredConfigNodes());
+      resp.setStatus(
+          rollbackAINodeRegistration(
+              registerAINodePlan.getAINodeConfiguration().getLocation(), updateVersionStatus));
+      return resp;
     }
 
+    getLoadManager().getLoadCache().createNodeHeartbeatCache(NodeType.AINode, aiNodeId);
+
     AINodeRegisterResp resp = new AINodeRegisterResp();
-    resp.setStatus(ClusterNodeStartUtils.ACCEPT_NODE_REGISTRATION);
+    resp.setStatus(
+        buildSuccessStatus(
+            ClusterNodeStartUtils.ACCEPT_NODE_REGISTRATION.getMessage(),
+            registerStatus.getMessage()));
     resp.setConfigNodeList(getRegisteredConfigNodes());
     resp.setAINodeId(registerAINodePlan.getAINodeConfiguration().getLocation().getAiNodeId());
     return resp;
@@ -586,10 +603,12 @@
     if (!req.getAiNodeConfiguration().equals(aiNodeConfiguration)) {
       // Update AINodeConfiguration when modified during restart
       UpdateAINodePlan updateAINodePlan = new UpdateAINodePlan(req.getAiNodeConfiguration());
-      try {
-        getConsensusManager().write(updateAINodePlan);
-      } catch (ConsensusException e) {
-        LOGGER.warn(CONSENSUS_WRITE_ERROR, e);
+      TSStatus updateStatus = writeConfigPhysicalPlan(updateAINodePlan);
+      if (!isConsensusWriteSuccessful(updateStatus)) {
+        TAINodeRestartResp resp = new TAINodeRestartResp();
+        resp.setConfigNodeList(getRegisteredConfigNodes());
+        resp.setStatus(updateStatus);
+        return resp;
       }
     }
     TNodeVersionInfo versionInfo = nodeInfo.getVersionInfo(nodeId);
@@ -597,15 +616,17 @@
       // Update versionInfo when modified during restart
       UpdateVersionInfoPlan updateVersionInfoPlan =
           new UpdateVersionInfoPlan(req.getVersionInfo(), nodeId);
-      try {
-        getConsensusManager().write(updateVersionInfoPlan);
-      } catch (ConsensusException e) {
-        LOGGER.warn(CONSENSUS_WRITE_ERROR, e);
+      TSStatus updateStatus = writeConfigPhysicalPlan(updateVersionInfoPlan);
+      if (!isConsensusWriteSuccessful(updateStatus)) {
+        TAINodeRestartResp resp = new TAINodeRestartResp();
+        resp.setConfigNodeList(getRegisteredConfigNodes());
+        resp.setStatus(updateStatus);
+        return resp;
       }
     }
 
     TAINodeRestartResp resp = new TAINodeRestartResp();
-    resp.setStatus(ClusterNodeStartUtils.ACCEPT_NODE_RESTART);
+    resp.setStatus(buildSuccessStatus(ClusterNodeStartUtils.ACCEPT_NODE_RESTART.getMessage()));
     resp.setConfigNodeList(getRegisteredConfigNodes());
     return resp;
   }
@@ -890,17 +911,15 @@
   public void applyConfigNode(
       TConfigNodeLocation configNodeLocation, TNodeVersionInfo versionInfo) {
     ApplyConfigNodePlan applyConfigNodePlan = new ApplyConfigNodePlan(configNodeLocation);
-    try {
-      getConsensusManager().write(applyConfigNodePlan);
-    } catch (ConsensusException e) {
-      LOGGER.warn(CONSENSUS_WRITE_ERROR, e);
-    }
+    ensureConsensusWriteSuccessful(
+        writeConfigPhysicalPlan(applyConfigNodePlan),
+        String.format("apply ConfigNode %s", configNodeLocation));
     UpdateVersionInfoPlan updateVersionInfoPlan =
         new UpdateVersionInfoPlan(versionInfo, configNodeLocation.getConfigNodeId());
-    try {
-      getConsensusManager().write(updateVersionInfoPlan);
-    } catch (ConsensusException e) {
-      LOGGER.warn(CONSENSUS_WRITE_ERROR, e);
+    final TSStatus updateStatus = writeConfigPhysicalPlan(updateVersionInfoPlan);
+    if (!isConsensusWriteSuccessful(updateStatus)) {
+      throw new IllegalStateException(
+          rollbackConfigNodeRegistration(configNodeLocation, updateStatus).getMessage());
     }
   }
 
@@ -1303,6 +1322,144 @@
     return getRegisteredDataNode(dataNodeId).getLocation();
   }
 
+  private TSStatus writeConfigPhysicalPlan(ConfigPhysicalPlan plan) {
+    try {
+      return getConsensusManager().write(plan);
+    } catch (ConsensusException e) {
+      LOGGER.warn(CONSENSUS_WRITE_ERROR, e);
+      return new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode())
+          .setMessage(e.getMessage());
+    }
+  }
+
+  private boolean isConsensusWriteSuccessful(TSStatus status) {
+    return status != null && status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode();
+  }
+
+  private TSStatus rollbackDataNodeRegistration(
+      TDataNodeLocation dataNodeLocation, TSStatus versionUpdateStatus) {
+    final TSStatus rollbackStatus =
+        writeConfigPhysicalPlan(
+            new RemoveDataNodePlan(Collections.singletonList(dataNodeLocation)));
+    final String failureMessage =
+        String.format(
+            "Failed to persist version info for DataNode %d: %s",
+            dataNodeLocation.getDataNodeId(), describeStatus(versionUpdateStatus));
+    if (isConsensusWriteSuccessful(rollbackStatus)) {
+      return buildStatus(
+          versionUpdateStatus.getCode(),
+          failureMessage,
+          "The registration has been rolled back. Please retry the registration.");
+    }
+
+    LOGGER.error(
+        "Failed to roll back DataNode registration {} after version info persistence failure. "
+            + "versionUpdateStatus: {}, rollbackStatus: {}",
+        dataNodeLocation,
+        versionUpdateStatus,
+        rollbackStatus);
+    return buildStatus(
+        rollbackStatus.getCode(),
+        failureMessage,
+        String.format("The registration rollback also failed: %s", describeStatus(rollbackStatus)),
+        "Manual cleanup may be required before retrying the registration.");
+  }
+
+  private TSStatus rollbackAINodeRegistration(
+      TAINodeLocation aiNodeLocation, TSStatus versionUpdateStatus) {
+    final TSStatus rollbackStatus = writeConfigPhysicalPlan(new RemoveAINodePlan(aiNodeLocation));
+    final String failureMessage =
+        String.format(
+            "Failed to persist version info for AINode %d: %s",
+            aiNodeLocation.getAiNodeId(), describeStatus(versionUpdateStatus));
+    if (isConsensusWriteSuccessful(rollbackStatus)) {
+      return buildStatus(
+          versionUpdateStatus.getCode(),
+          failureMessage,
+          "The registration has been rolled back. Please retry the registration.");
+    }
+
+    LOGGER.error(
+        "Failed to roll back AINode registration {} after version info persistence failure. "
+            + "versionUpdateStatus: {}, rollbackStatus: {}",
+        aiNodeLocation,
+        versionUpdateStatus,
+        rollbackStatus);
+    return buildStatus(
+        rollbackStatus.getCode(),
+        failureMessage,
+        String.format("The registration rollback also failed: %s", describeStatus(rollbackStatus)),
+        "Manual cleanup may be required before retrying the registration.");
+  }
+
+  private TSStatus rollbackConfigNodeRegistration(
+      TConfigNodeLocation configNodeLocation, TSStatus versionUpdateStatus) {
+    final TSStatus rollbackStatus =
+        writeConfigPhysicalPlan(new RemoveConfigNodePlan(configNodeLocation));
+    final String failureMessage =
+        String.format(
+            "Failed to persist version info for ConfigNode %d: %s",
+            configNodeLocation.getConfigNodeId(), describeStatus(versionUpdateStatus));
+    if (isConsensusWriteSuccessful(rollbackStatus)) {
+      return buildStatus(
+          versionUpdateStatus.getCode(),
+          failureMessage,
+          "The ConfigNode registration has been rolled back.");
+    }
+
+    LOGGER.error(
+        "Failed to roll back ConfigNode registration {} after version info persistence failure. "
+            + "versionUpdateStatus: {}, rollbackStatus: {}",
+        configNodeLocation,
+        versionUpdateStatus,
+        rollbackStatus);
+    return buildStatus(
+        rollbackStatus.getCode(),
+        failureMessage,
+        String.format("The registration rollback also failed: %s", describeStatus(rollbackStatus)),
+        "Manual cleanup may be required before retrying the registration.");
+  }
+
+  private TSStatus buildStatus(int statusCode, String... messages) {
+    final TSStatus status = new TSStatus(statusCode);
+    final StringBuilder builder = new StringBuilder();
+    for (String message : messages) {
+      if (message == null || message.isEmpty()) {
+        continue;
+      }
+      if (builder.length() > 0) {
+        builder.append(' ');
+      }
+      builder.append(message);
+    }
+    if (builder.length() > 0) {
+      status.setMessage(builder.toString());
+    }
+    return status;
+  }
+
+  private TSStatus buildSuccessStatus(String... messages) {
+    return buildStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode(), messages);
+  }
+
+  private String describeStatus(TSStatus status) {
+    if (status == null) {
+      return "unknown error";
+    }
+    if (status.getMessage() != null && !status.getMessage().isEmpty()) {
+      return status.getMessage();
+    }
+    return "status code " + status.getCode();
+  }
+
+  private void ensureConsensusWriteSuccessful(TSStatus status, String action) {
+    if (isConsensusWriteSuccessful(status)) {
+      return;
+    }
+    throw new IllegalStateException(
+        String.format("Failed to %s through consensus layer: %s", action, status));
+  }
+
   private ConsensusManager getConsensusManager() {
     return configManager.getConsensusManager();
   }
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java
index b2bae24..fe1a608 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java
@@ -760,12 +760,12 @@
     return result.get();
   }
 
-  public void loadSnapshot(final File latestSnapshotRootDir) {
+  public boolean loadSnapshot(final File latestSnapshotRootDir) {
     if (!latestSnapshotRootDir.exists()) {
       LOGGER.error(
           "snapshot directory [{}] is not exist, can not load snapshot with this directory.",
           latestSnapshotRootDir.getAbsolutePath());
-      return;
+      return false;
     }
 
     final AtomicBoolean result = new AtomicBoolean(true);
@@ -793,6 +793,7 @@
           "[ConfigNodeSnapshot] Load snapshot success, latestSnapshotRootDir: {}",
           latestSnapshotRootDir);
     }
+    return result.get();
   }
 
   private DataSet getSchemaNodeManagementPartition(ConfigPhysicalPlan req) {
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/DatabasePartitionTable.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/DatabasePartitionTable.java
index 5a80364..e8b3cb3 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/DatabasePartitionTable.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/DatabasePartitionTable.java
@@ -243,7 +243,7 @@
                 result.getAndIncrement();
               }
             });
-    return result.getAndIncrement();
+    return result.get();
   }
 
   /**
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/PartitionInfo.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/PartitionInfo.java
index 5c2c93d..f876d4d 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/PartitionInfo.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/PartitionInfo.java
@@ -243,7 +243,9 @@
    */
   public TSStatus pollRegionMaintainTask() {
     synchronized (regionMaintainTaskList) {
-      regionMaintainTaskList.remove(0);
+      if (!regionMaintainTaskList.isEmpty()) {
+        regionMaintainTaskList.remove(0);
+      }
       return RpcUtils.SUCCESS_STATUS;
     }
   }
@@ -1008,9 +1010,14 @@
         databasePartitionTableEntry.getValue().serialize(bufferedOutputStream, protocol);
       }
 
+      final List<RegionMaintainTask> copiedRegionMaintainTaskList;
+      synchronized (regionMaintainTaskList) {
+        copiedRegionMaintainTaskList = new ArrayList<>(regionMaintainTaskList);
+      }
+
       // serialize regionCleanList
-      ReadWriteIOUtils.write(regionMaintainTaskList.size(), bufferedOutputStream);
-      for (RegionMaintainTask task : regionMaintainTaskList) {
+      ReadWriteIOUtils.write(copiedRegionMaintainTaskList.size(), bufferedOutputStream);
+      for (RegionMaintainTask task : copiedRegionMaintainTaskList) {
         task.serialize(bufferedOutputStream, protocol);
       }
 
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/node/AddConfigNodeProcedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/node/AddConfigNodeProcedure.java
index b38696a..c0d45d0 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/node/AddConfigNodeProcedure.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/node/AddConfigNodeProcedure.java
@@ -80,9 +80,9 @@
           LOG.info("Successfully ADD_PEER {}", tConfigNodeLocation);
           break;
         case REGISTER_SUCCESS:
-          env.notifyRegisterSuccess(tConfigNodeLocation);
-          env.createConfigNodeHeartbeatCache(tConfigNodeLocation.getConfigNodeId());
           env.applyConfigNode(tConfigNodeLocation, versionInfo);
+          env.createConfigNodeHeartbeatCache(tConfigNodeLocation.getConfigNodeId());
+          env.notifyRegisterSuccess(tConfigNodeLocation);
           LOG.info("The ConfigNode: {} is successfully added to the cluster", tConfigNodeLocation);
           return Flow.NO_MORE_STATE;
       }
diff --git a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigRegionStateMachineTest.java b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigRegionStateMachineTest.java
new file mode 100644
index 0000000..7bd59c8
--- /dev/null
+++ b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigRegionStateMachineTest.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.consensus.statemachine;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+public class ConfigRegionStateMachineTest {
+
+  @Test
+  public void testParseStartIndex() {
+    Assert.assertEquals(1, ConfigRegionStateMachine.parseStartIndex("log_1_10"));
+    Assert.assertEquals(11, ConfigRegionStateMachine.parseStartIndex("log_11_20"));
+    Assert.assertEquals(21, ConfigRegionStateMachine.parseStartIndex("log_inprogress_21"));
+    Assert.assertEquals(0, ConfigRegionStateMachine.parseStartIndex("invalid"));
+  }
+
+  @Test
+  public void testFileComparatorSortsByStartIndex() {
+    List<String> filenames =
+        new ArrayList<>(Arrays.asList("log_inprogress_21", "log_11_20", "log_1_10"));
+
+    filenames.sort(new ConfigRegionStateMachine.FileComparator());
+
+    Assert.assertEquals(Arrays.asList("log_1_10", "log_11_20", "log_inprogress_21"), filenames);
+  }
+}
diff --git a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/node/NodeManagerTest.java b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/node/NodeManagerTest.java
new file mode 100644
index 0000000..247d745
--- /dev/null
+++ b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/node/NodeManagerTest.java
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.manager.node;
+
+import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation;
+import org.apache.iotdb.common.rpc.thrift.TDataNodeConfiguration;
+import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.cluster.NodeType;
+import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlan;
+import org.apache.iotdb.confignode.consensus.request.write.confignode.ApplyConfigNodePlan;
+import org.apache.iotdb.confignode.consensus.request.write.confignode.RemoveConfigNodePlan;
+import org.apache.iotdb.confignode.consensus.request.write.confignode.UpdateVersionInfoPlan;
+import org.apache.iotdb.confignode.consensus.request.write.datanode.RegisterDataNodePlan;
+import org.apache.iotdb.confignode.consensus.request.write.datanode.RemoveDataNodePlan;
+import org.apache.iotdb.confignode.consensus.response.datanode.DataNodeRegisterResp;
+import org.apache.iotdb.confignode.manager.ClusterManager;
+import org.apache.iotdb.confignode.manager.IManager;
+import org.apache.iotdb.confignode.manager.consensus.ConsensusManager;
+import org.apache.iotdb.confignode.manager.load.LoadManager;
+import org.apache.iotdb.confignode.manager.load.cache.LoadCache;
+import org.apache.iotdb.confignode.manager.schema.ClusterSchemaManager;
+import org.apache.iotdb.confignode.persistence.node.NodeInfo;
+import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRegisterReq;
+import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRestartReq;
+import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRestartResp;
+import org.apache.iotdb.confignode.rpc.thrift.TNodeVersionInfo;
+import org.apache.iotdb.consensus.exception.ConsensusException;
+import org.apache.iotdb.rpc.TSStatusCode;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mockito;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+public class NodeManagerTest {
+
+  private IManager configManager;
+  private ConsensusManager consensusManager;
+  private LoadManager loadManager;
+  private LoadCache loadCache;
+  private ClusterSchemaManager clusterSchemaManager;
+  private ClusterManager clusterManager;
+  private NodeInfo nodeInfo;
+  private NodeManager nodeManager;
+
+  @Before
+  public void setUp() {
+    configManager = Mockito.mock(IManager.class);
+    consensusManager = Mockito.mock(ConsensusManager.class);
+    loadManager = Mockito.mock(LoadManager.class);
+    loadCache = Mockito.mock(LoadCache.class);
+    clusterSchemaManager = Mockito.mock(ClusterSchemaManager.class);
+    clusterManager = Mockito.mock(ClusterManager.class);
+    nodeInfo = new NodeInfo();
+    nodeManager = new NodeManager(configManager, nodeInfo);
+
+    when(configManager.getConsensusManager()).thenReturn(consensusManager);
+    when(configManager.getLoadManager()).thenReturn(loadManager);
+    when(loadManager.getLoadCache()).thenReturn(loadCache);
+    when(configManager.getClusterSchemaManager()).thenReturn(clusterSchemaManager);
+    when(configManager.getClusterManager()).thenReturn(clusterManager);
+  }
+
+  @Test
+  public void testRegisterDataNodeStopsWhenRegisterWriteFails() throws ConsensusException {
+    TSStatus failureStatus =
+        new TSStatus(TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()).setMessage("redirect");
+    when(consensusManager.write(any())).thenReturn(failureStatus);
+
+    DataNodeRegisterResp resp =
+        (DataNodeRegisterResp) nodeManager.registerDataNode(generateDataNodeRegisterReq(1));
+
+    Assert.assertEquals(failureStatus, resp.getStatus());
+    verify(loadCache, never()).createNodeHeartbeatCache(eq(NodeType.DataNode), anyInt());
+    verify(clusterSchemaManager, never()).adjustMaxRegionGroupNum();
+  }
+
+  @Test
+  public void testRegisterDataNodeRollsBackWhenVersionWriteFails() throws ConsensusException {
+    TSStatus successStatus = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
+    TSStatus failureStatus =
+        new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode())
+            .setMessage("update failed");
+    when(consensusManager.write(any())).thenReturn(successStatus, failureStatus, successStatus);
+
+    DataNodeRegisterResp resp =
+        (DataNodeRegisterResp) nodeManager.registerDataNode(generateDataNodeRegisterReq(1));
+
+    Assert.assertEquals(
+        TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode(), resp.getStatus().getCode());
+    Assert.assertTrue(resp.getStatus().getMessage().contains("rolled back"));
+    verify(loadCache, never()).createNodeHeartbeatCache(eq(NodeType.DataNode), anyInt());
+    verify(clusterSchemaManager, never()).adjustMaxRegionGroupNum();
+
+    ArgumentCaptor<ConfigPhysicalPlan> planCaptor =
+        ArgumentCaptor.forClass(ConfigPhysicalPlan.class);
+    verify(consensusManager, Mockito.times(3)).write(planCaptor.capture());
+    Assert.assertTrue(planCaptor.getAllValues().get(0) instanceof RegisterDataNodePlan);
+    Assert.assertTrue(planCaptor.getAllValues().get(1) instanceof UpdateVersionInfoPlan);
+    Assert.assertTrue(planCaptor.getAllValues().get(2) instanceof RemoveDataNodePlan);
+  }
+
+  @Test
+  public void testRestartDataNodeReturnsFailureWhenUpdateWriteFails() throws ConsensusException {
+    final TDataNodeConfiguration registeredConfig = generateDataNodeConfiguration(1, "127.0.0.1");
+    nodeInfo.registerDataNode(new RegisterDataNodePlan(registeredConfig));
+    when(clusterManager.getClusterIdWithRetry(anyLong())).thenReturn("cluster");
+
+    TSStatus failureStatus =
+        new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode())
+            .setMessage("update failed");
+    when(consensusManager.write(any())).thenReturn(failureStatus);
+
+    final TDataNodeRestartReq req = new TDataNodeRestartReq();
+    req.setDataNodeConfiguration(generateDataNodeConfiguration(1, "127.0.0.2"));
+    req.setVersionInfo(new TNodeVersionInfo("version", "build"));
+
+    final TDataNodeRestartResp resp = nodeManager.updateDataNodeIfNecessary(req);
+
+    Assert.assertEquals(failureStatus, resp.getStatus());
+  }
+
+  @Test
+  public void testApplyConfigNodeRollsBackWhenVersionWriteFails() throws ConsensusException {
+    TSStatus successStatus = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
+    TSStatus failureStatus =
+        new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode())
+            .setMessage("apply failed");
+    when(consensusManager.write(any())).thenReturn(successStatus, failureStatus, successStatus);
+
+    try {
+      nodeManager.applyConfigNode(
+          new TConfigNodeLocation(
+              1, new TEndPoint("127.0.0.1", 10710), new TEndPoint("127.0.0.1", 10720)),
+          new TNodeVersionInfo("version", "build"));
+      Assert.fail("Expected applyConfigNode to fail fast");
+    } catch (IllegalStateException e) {
+      Assert.assertTrue(e.getMessage().contains("rolled back"));
+    }
+
+    ArgumentCaptor<ConfigPhysicalPlan> planCaptor =
+        ArgumentCaptor.forClass(ConfigPhysicalPlan.class);
+    verify(consensusManager, Mockito.times(3)).write(planCaptor.capture());
+    Assert.assertTrue(planCaptor.getAllValues().get(0) instanceof ApplyConfigNodePlan);
+    Assert.assertTrue(planCaptor.getAllValues().get(1) instanceof UpdateVersionInfoPlan);
+    Assert.assertTrue(planCaptor.getAllValues().get(2) instanceof RemoveConfigNodePlan);
+  }
+
+  private TDataNodeRegisterReq generateDataNodeRegisterReq(int dataNodeId) {
+    final TDataNodeRegisterReq req = new TDataNodeRegisterReq();
+    req.setDataNodeConfiguration(generateDataNodeConfiguration(dataNodeId, "127.0.0.1"));
+    req.setVersionInfo(new TNodeVersionInfo("version", "build"));
+    return req;
+  }
+
+  private TDataNodeConfiguration generateDataNodeConfiguration(int dataNodeId, String ip) {
+    final TDataNodeLocation dataNodeLocation = new TDataNodeLocation();
+    dataNodeLocation.setDataNodeId(dataNodeId);
+    dataNodeLocation.setClientRpcEndPoint(new TEndPoint(ip, 6667 + dataNodeId));
+    dataNodeLocation.setInternalEndPoint(new TEndPoint(ip, 10730 + dataNodeId));
+    dataNodeLocation.setMPPDataExchangeEndPoint(new TEndPoint(ip, 10740 + dataNodeId));
+    dataNodeLocation.setDataRegionConsensusEndPoint(new TEndPoint(ip, 10760 + dataNodeId));
+    dataNodeLocation.setSchemaRegionConsensusEndPoint(new TEndPoint(ip, 10750 + dataNodeId));
+
+    final TDataNodeConfiguration dataNodeConfiguration = new TDataNodeConfiguration();
+    dataNodeConfiguration.setLocation(dataNodeLocation);
+    return dataNodeConfiguration;
+  }
+}
diff --git a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/PartitionInfoTest.java b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/PartitionInfoTest.java
index afccb0c..eccddca 100644
--- a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/PartitionInfoTest.java
+++ b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/PartitionInfoTest.java
@@ -37,6 +37,7 @@
 import org.apache.iotdb.confignode.consensus.request.write.region.CreateRegionGroupsPlan;
 import org.apache.iotdb.confignode.consensus.request.write.region.OfferRegionMaintainTasksPlan;
 import org.apache.iotdb.confignode.consensus.response.partition.RegionInfoListResp;
+import org.apache.iotdb.confignode.exception.DatabaseNotExistsException;
 import org.apache.iotdb.confignode.persistence.partition.PartitionInfo;
 import org.apache.iotdb.confignode.persistence.partition.maintainer.RegionCreateTask;
 import org.apache.iotdb.confignode.persistence.partition.maintainer.RegionDeleteTask;
@@ -267,6 +268,34 @@
             });
   }
 
+  @Test
+  public void testRegionGroupCount() throws DatabaseNotExistsException {
+    partitionInfo.createDatabase(
+        new DatabaseSchemaPlan(
+            ConfigPhysicalPlanType.CreateDatabase, new TDatabaseSchema("root.region_count")));
+
+    CreateRegionGroupsPlan createRegionGroupsPlan = new CreateRegionGroupsPlan();
+    createRegionGroupsPlan.addRegionGroup(
+        "root.region_count",
+        generateTRegionReplicaSet(
+            testFlag.SchemaPartition.getFlag(),
+            generateTConsensusGroupId(
+                testFlag.SchemaPartition.getFlag(), TConsensusGroupType.SchemaRegion)));
+    createRegionGroupsPlan.addRegionGroup(
+        "root.region_count",
+        generateTRegionReplicaSet(
+            testFlag.DataPartition.getFlag(),
+            generateTConsensusGroupId(
+                testFlag.DataPartition.getFlag(), TConsensusGroupType.DataRegion)));
+    partitionInfo.createRegionGroups(createRegionGroupsPlan);
+
+    Assert.assertEquals(
+        1,
+        partitionInfo.getRegionGroupCount("root.region_count", TConsensusGroupType.SchemaRegion));
+    Assert.assertEquals(
+        1, partitionInfo.getRegionGroupCount("root.region_count", TConsensusGroupType.DataRegion));
+  }
+
   private TRegionReplicaSet generateTRegionReplicaSet(
       int startFlag, TConsensusGroupId tConsensusGroupId) {
     TRegionReplicaSet tRegionReplicaSet = new TRegionReplicaSet();