[IOTDB-4419] Maintain RegionStatus through cluster heartbeat (#7345)

diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/DataNodeHeartbeatHandler.java b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/DataNodeHeartbeatHandler.java
index 890b448..2ed85ef 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/DataNodeHeartbeatHandler.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/DataNodeHeartbeatHandler.java
@@ -21,7 +21,6 @@
 import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
 import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
 import org.apache.iotdb.confignode.manager.load.heartbeat.DataNodeHeartbeatCache;
-import org.apache.iotdb.confignode.manager.load.heartbeat.IRegionGroupCache;
 import org.apache.iotdb.confignode.manager.load.heartbeat.NodeHeartbeatSample;
 import org.apache.iotdb.confignode.manager.load.heartbeat.RegionGroupCache;
 import org.apache.iotdb.confignode.manager.load.heartbeat.RegionHeartbeatSample;
@@ -36,12 +35,12 @@
   // Update DataNodeHeartbeatCache when success
   private final TDataNodeLocation dataNodeLocation;
   private final DataNodeHeartbeatCache dataNodeHeartbeatCache;
-  private final Map<TConsensusGroupId, IRegionGroupCache> regionGroupCacheMap;
+  private final Map<TConsensusGroupId, RegionGroupCache> regionGroupCacheMap;
 
   public DataNodeHeartbeatHandler(
       TDataNodeLocation dataNodeLocation,
       DataNodeHeartbeatCache dataNodeHeartbeatCache,
-      Map<TConsensusGroupId, IRegionGroupCache> regionGroupCacheMap) {
+      Map<TConsensusGroupId, RegionGroupCache> regionGroupCacheMap) {
     this.dataNodeLocation = dataNodeLocation;
     this.dataNodeHeartbeatCache = dataNodeHeartbeatCache;
     this.regionGroupCacheMap = regionGroupCacheMap;
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
index ec119bf..33fb5ed 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
@@ -21,7 +21,6 @@
 
 import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation;
 import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
-import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
 import org.apache.iotdb.common.rpc.thrift.TDataNodeConfiguration;
 import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
 import org.apache.iotdb.common.rpc.thrift.TFlushReq;
@@ -29,7 +28,6 @@
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot;
 import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
-import org.apache.iotdb.commons.cluster.RegionRoleType;
 import org.apache.iotdb.commons.conf.CommonDescriptor;
 import org.apache.iotdb.commons.conf.IoTDBConstant;
 import org.apache.iotdb.commons.exception.IllegalPathException;
@@ -80,14 +78,12 @@
 import org.apache.iotdb.confignode.persistence.schema.ClusterSchemaInfo;
 import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRegisterReq;
 import org.apache.iotdb.confignode.rpc.thrift.TCreateSchemaTemplateReq;
-import org.apache.iotdb.confignode.rpc.thrift.TDataNodeInfo;
 import org.apache.iotdb.confignode.rpc.thrift.TDataPartitionTableResp;
 import org.apache.iotdb.confignode.rpc.thrift.TDeleteTimeSeriesReq;
 import org.apache.iotdb.confignode.rpc.thrift.TGetAllTemplatesResp;
 import org.apache.iotdb.confignode.rpc.thrift.TGetPathsSetTemplatesResp;
 import org.apache.iotdb.confignode.rpc.thrift.TGetTemplateResp;
 import org.apache.iotdb.confignode.rpc.thrift.TPermissionInfoResp;
-import org.apache.iotdb.confignode.rpc.thrift.TRegionInfo;
 import org.apache.iotdb.confignode.rpc.thrift.TRegionMigrateResultReportReq;
 import org.apache.iotdb.confignode.rpc.thrift.TRegionRouteMapResp;
 import org.apache.iotdb.confignode.rpc.thrift.TSchemaNodeManagementResp;
@@ -102,7 +98,6 @@
 import org.apache.iotdb.rpc.RpcUtils;
 import org.apache.iotdb.rpc.TSStatusCode;
 
-import org.apache.commons.collections4.CollectionUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -117,7 +112,6 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
 
 import static org.apache.iotdb.commons.conf.IoTDBConstant.MULTI_LEVEL_PATH_WILDCARD;
@@ -792,24 +786,7 @@
   public RegionInfoListResp showRegion(GetRegionInfoListPlan getRegionInfoListPlan) {
     TSStatus status = confirmLeader();
     if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
-      RegionInfoListResp regionInfoListResp =
-          (RegionInfoListResp) partitionManager.getRegionInfoList(getRegionInfoListPlan);
-      regionInfoListResp
-          .getRegionInfoList()
-          .forEach(
-              regionInfo -> {
-                Map<TConsensusGroupId, Integer> allLeadership =
-                    getPartitionManager().getAllLeadership();
-                if (!allLeadership.isEmpty()) {
-                  String regionType =
-                      regionInfo.getDataNodeId()
-                              == allLeadership.getOrDefault(regionInfo.getConsensusGroupId(), -1)
-                          ? RegionRoleType.Leader.toString()
-                          : RegionRoleType.Follower.toString();
-                  regionInfo.setRoleType(regionType);
-                }
-              });
-      return regionInfoListResp;
+      return (RegionInfoListResp) partitionManager.getRegionInfoList(getRegionInfoListPlan);
     } else {
       RegionInfoListResp regionResp = new RegionInfoListResp();
       regionResp.setStatus(status);
@@ -822,48 +799,8 @@
     TSStatus status = confirmLeader();
     TShowDataNodesResp resp = new TShowDataNodesResp();
     if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
-      List<TDataNodeInfo> registeredDataNodesInfoList = nodeManager.getRegisteredDataNodeInfoList();
-
-      // Map<DataNodeId, DataRegionNum>
-      Map<Integer, AtomicInteger> dataRegionNumMap = new HashMap<>();
-      // Map<DataNodeId, SchemaRegionNum>
-      Map<Integer, AtomicInteger> schemaRegionNumMap = new HashMap<>();
-
-      List<TRegionInfo> regionInfoList =
-          ((RegionInfoListResp) partitionManager.getRegionInfoList(new GetRegionInfoListPlan()))
-              .getRegionInfoList();
-      if (CollectionUtils.isNotEmpty(regionInfoList)) {
-
-        regionInfoList.forEach(
-            (regionInfo) -> {
-              int dataNodeId = regionInfo.getDataNodeId();
-              int regionTypeValue = regionInfo.getConsensusGroupId().getType().getValue();
-              int dataRegionNum =
-                  regionTypeValue == TConsensusGroupType.DataRegion.getValue() ? 1 : 0;
-              int schemaRegionNum =
-                  regionTypeValue == TConsensusGroupType.SchemaRegion.getValue() ? 1 : 0;
-              dataRegionNumMap
-                  .computeIfAbsent(dataNodeId, key -> new AtomicInteger())
-                  .addAndGet(dataRegionNum);
-              schemaRegionNumMap
-                  .computeIfAbsent(dataNodeId, key -> new AtomicInteger())
-                  .addAndGet(schemaRegionNum);
-            });
-
-        registeredDataNodesInfoList.forEach(
-            (dataNodesInfo -> {
-              if (dataRegionNumMap.containsKey(dataNodesInfo.getDataNodeId())) {
-                dataNodesInfo.setDataRegionNum(
-                    dataRegionNumMap.get(dataNodesInfo.getDataNodeId()).get());
-              }
-              if (schemaRegionNumMap.containsKey(dataNodesInfo.getDataNodeId())) {
-                dataNodesInfo.setSchemaRegionNum(
-                    schemaRegionNumMap.get(dataNodesInfo.getDataNodeId()).get());
-              }
-            }));
-      }
-
-      return resp.setDataNodesInfoList(registeredDataNodesInfoList).setStatus(StatusUtils.OK);
+      return resp.setDataNodesInfoList(nodeManager.getRegisteredDataNodeInfoList())
+          .setStatus(StatusUtils.OK);
     } else {
       return resp.setStatus(status);
     }
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/NodeManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/NodeManager.java
index 218bbc2..89197da 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/NodeManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/NodeManager.java
@@ -23,6 +23,7 @@
 import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.common.rpc.thrift.TFlushReq;
+import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.cluster.NodeStatus;
 import org.apache.iotdb.commons.cluster.RegionRoleType;
@@ -70,6 +71,7 @@
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.Comparator;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
@@ -275,6 +277,43 @@
             dataNodeInfoList.add(info);
           });
     }
+
+    // Map<DataNodeId, DataRegionNum>
+    Map<Integer, AtomicInteger> dataRegionNumMap = new HashMap<>();
+    // Map<DataNodeId, SchemaRegionNum>
+    Map<Integer, AtomicInteger> schemaRegionNumMap = new HashMap<>();
+    List<TRegionReplicaSet> regionReplicaSets = getPartitionManager().getAllReplicaSets();
+    regionReplicaSets.forEach(
+        regionReplicaSet ->
+            regionReplicaSet
+                .getDataNodeLocations()
+                .forEach(
+                    dataNodeLocation -> {
+                      switch (regionReplicaSet.getRegionId().getType()) {
+                        case SchemaRegion:
+                          schemaRegionNumMap
+                              .computeIfAbsent(
+                                  dataNodeLocation.getDataNodeId(), key -> new AtomicInteger())
+                              .getAndIncrement();
+                          break;
+                        case DataRegion:
+                        default:
+                          dataRegionNumMap
+                              .computeIfAbsent(
+                                  dataNodeLocation.getDataNodeId(), key -> new AtomicInteger())
+                              .getAndIncrement();
+                      }
+                    }));
+    AtomicInteger zero = new AtomicInteger(0);
+    dataNodeInfoList.forEach(
+        (dataNodesInfo -> {
+          dataNodesInfo.setSchemaRegionNum(
+              schemaRegionNumMap.getOrDefault(dataNodesInfo.getDataNodeId(), zero).get());
+          dataNodesInfo.setDataRegionNum(
+              dataRegionNumMap.getOrDefault(dataNodesInfo.getDataNodeId(), zero).get());
+        }));
+
+    dataNodeInfoList.sort(Comparator.comparingInt(TDataNodeInfo::getDataNodeId));
     return dataNodeInfoList;
   }
 
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/PartitionManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/PartitionManager.java
index cffdc97..1d54772 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/PartitionManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/PartitionManager.java
@@ -25,6 +25,7 @@
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot;
 import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
+import org.apache.iotdb.commons.cluster.RegionRoleType;
 import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
 import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil;
 import org.apache.iotdb.commons.partition.DataPartitionTable;
@@ -45,12 +46,13 @@
 import org.apache.iotdb.confignode.consensus.request.write.PreDeleteStorageGroupPlan;
 import org.apache.iotdb.confignode.consensus.request.write.UpdateRegionLocationPlan;
 import org.apache.iotdb.confignode.consensus.response.DataPartitionResp;
+import org.apache.iotdb.confignode.consensus.response.RegionInfoListResp;
 import org.apache.iotdb.confignode.consensus.response.SchemaNodeManagementResp;
 import org.apache.iotdb.confignode.consensus.response.SchemaPartitionResp;
 import org.apache.iotdb.confignode.exception.NotEnoughDataNodeException;
 import org.apache.iotdb.confignode.exception.StorageGroupNotExistsException;
 import org.apache.iotdb.confignode.manager.load.LoadManager;
-import org.apache.iotdb.confignode.manager.load.heartbeat.IRegionGroupCache;
+import org.apache.iotdb.confignode.manager.load.heartbeat.RegionGroupCache;
 import org.apache.iotdb.confignode.persistence.metric.PartitionInfoMetrics;
 import org.apache.iotdb.confignode.persistence.partition.PartitionInfo;
 import org.apache.iotdb.consensus.ConsensusFactory;
@@ -93,7 +95,7 @@
   private Future<?> currentRegionCleanerFuture;
 
   // Map<RegionId, RegionGroupCache>
-  private final Map<TConsensusGroupId, IRegionGroupCache> regionGroupCacheMap;
+  private final Map<TConsensusGroupId, RegionGroupCache> regionGroupCacheMap;
 
   public PartitionManager(IManager configManager, PartitionInfo partitionInfo) {
     this.configManager = configManager;
@@ -460,7 +462,31 @@
   }
 
   public DataSet getRegionInfoList(GetRegionInfoListPlan req) {
-    return getConsensusManager().read(req).getDataset();
+    // Get static result
+    RegionInfoListResp regionInfoListResp =
+        (RegionInfoListResp) getConsensusManager().read(req).getDataset();
+    Map<TConsensusGroupId, Integer> allLeadership = getAllLeadership();
+
+    // Get cached result
+    regionInfoListResp
+        .getRegionInfoList()
+        .forEach(
+            regionInfo -> {
+              regionInfo.setStatus(
+                  regionGroupCacheMap
+                      .get(regionInfo.getConsensusGroupId())
+                      .getRegionStatus(regionInfo.getDataNodeId())
+                      .getStatus());
+
+              String regionType =
+                  regionInfo.getDataNodeId()
+                          == allLeadership.getOrDefault(regionInfo.getConsensusGroupId(), -1)
+                      ? RegionRoleType.Leader.toString()
+                      : RegionRoleType.Follower.toString();
+              regionInfo.setRoleType(regionType);
+            });
+
+    return regionInfoListResp;
   }
 
   /**
@@ -540,7 +566,7 @@
     }
   }
 
-  public Map<TConsensusGroupId, IRegionGroupCache> getRegionGroupCacheMap() {
+  public Map<TConsensusGroupId, RegionGroupCache> getRegionGroupCacheMap() {
     return regionGroupCacheMap;
   }
 
@@ -549,12 +575,16 @@
   }
 
   /**
-   * Get the leadership of each RegionGroup If a node is in unknown or removing status, this node
-   * can't be leader
+   * Get the leadership of each RegionGroup.
    *
-   * @return Map<RegionGroupId, leader location>
+   * @return Map<RegionGroupId, DataNodeId where the leader located>
+   *     <p>Some RegionGroups that supposed to be occurred in the result map might be nonexistent
+   *     and some leaderId might be -1(leader unknown yet) due to heartbeat latency
    */
   public Map<TConsensusGroupId, Integer> getAllLeadership() {
+
+    // TODO: Will be optimized by IOTDB-4341
+
     Map<TConsensusGroupId, Integer> result = new ConcurrentHashMap<>();
     if (ConfigNodeDescriptor.getInstance()
         .getConf()
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java
index 63ab1e5..072e575 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java
@@ -196,7 +196,7 @@
         .values()
         .forEach(
             regionGroupCache -> {
-              boolean updateResult = regionGroupCache.updateLoadStatistic();
+              boolean updateResult = regionGroupCache.updateRegionStatistics();
               switch (regionGroupCache.getConsensusGroupId().getType()) {
                   // Check if some RegionGroups change their leader
                 case SchemaRegion:
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/BaseNodeCache.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/BaseNodeCache.java
index 6bb0cb2..7e70717 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/BaseNodeCache.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/BaseNodeCache.java
@@ -26,10 +26,10 @@
 public abstract class BaseNodeCache {
 
   /** When the response time of heartbeat is more than 20s, the node is considered as down */
-  static final int HEARTBEAT_TIMEOUT_TIME = 20_000;
+  public static final int HEARTBEAT_TIMEOUT_TIME = 20_000;
 
   /** Max heartbeat cache samples store size */
-  static final int MAXIMUM_WINDOW_SIZE = 100;
+  public static final int MAXIMUM_WINDOW_SIZE = 100;
 
   /** SlidingWindow stores the heartbeat sample data */
   final LinkedList<NodeHeartbeatSample> slidingWindow = new LinkedList<>();
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/IRegionGroupCache.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/IRegionGroupCache.java
deleted file mode 100644
index 19968a7..0000000
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/IRegionGroupCache.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb.confignode.manager.load.heartbeat;
-
-import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
-
-public interface IRegionGroupCache {
-
-  /**
-   * Cache the newest HeartbeatSample
-   *
-   * @param newHeartbeatSample The newest HeartbeatSample
-   */
-  void cacheHeartbeatSample(RegionHeartbeatSample newHeartbeatSample);
-
-  /**
-   * Remove the specific sample cache if exists
-   *
-   * @param dataNodeId DataNodeId
-   */
-  void removeCacheIfExists(Integer dataNodeId);
-
-  /**
-   * Invoking periodically to update RegionGroups' load statistics
-   *
-   * @return true if some load statistic changed
-   */
-  boolean updateLoadStatistic();
-
-  /**
-   * Get RegionGroup's latest leader
-   *
-   * @return The DataNodeId of the latest leader
-   */
-  int getLeaderDataNodeId();
-
-  /**
-   * Get RegionGroup's ConsensusGroupId
-   *
-   * @return TConsensusGroupId
-   */
-  TConsensusGroupId getConsensusGroupId();
-}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/RegionCache.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/RegionCache.java
new file mode 100644
index 0000000..aea08a7
--- /dev/null
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/RegionCache.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.confignode.manager.load.heartbeat;
+
+import org.apache.iotdb.commons.cluster.RegionStatus;
+import org.apache.iotdb.tsfile.utils.Pair;
+
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
+
+import static org.apache.iotdb.confignode.manager.load.heartbeat.BaseNodeCache.HEARTBEAT_TIMEOUT_TIME;
+import static org.apache.iotdb.confignode.manager.load.heartbeat.BaseNodeCache.MAXIMUM_WINDOW_SIZE;
+
+public class RegionCache {
+
+  private final List<RegionHeartbeatSample> slidingWindow;
+
+  // Indicates the version of the statistics
+  private volatile long versionTimestamp;
+  private volatile RegionStatus status;
+  private volatile boolean isLeader;
+
+  public RegionCache() {
+    this.slidingWindow = Collections.synchronizedList(new LinkedList<>());
+
+    this.versionTimestamp = 0;
+    this.status = RegionStatus.Unknown;
+    this.isLeader = false;
+  }
+
+  public void cacheHeartbeatSample(RegionHeartbeatSample newHeartbeatSample) {
+    synchronized (slidingWindow) {
+      // Only sequential HeartbeatSamples are accepted.
+      // And un-sequential HeartbeatSamples will be discarded.
+      if (slidingWindow.size() == 0
+          || getLastSample().getSendTimestamp() < newHeartbeatSample.getSendTimestamp()) {
+        slidingWindow.add(newHeartbeatSample);
+      }
+
+      if (slidingWindow.size() > MAXIMUM_WINDOW_SIZE) {
+        slidingWindow.remove(0);
+      }
+    }
+  }
+
+  public void updateStatistics() {
+    synchronized (slidingWindow) {
+      RegionHeartbeatSample lastSample = getLastSample();
+      if (lastSample.getSendTimestamp() > versionTimestamp) {
+        versionTimestamp = lastSample.getSendTimestamp();
+        isLeader = lastSample.isLeader();
+      }
+
+      status =
+          System.currentTimeMillis() - versionTimestamp > HEARTBEAT_TIMEOUT_TIME
+              ? RegionStatus.Unknown
+              : RegionStatus.Running;
+    }
+  }
+
+  public RegionStatus getStatus() {
+    return status;
+  }
+
+  /** @return Pair<Last update time, is leader> */
+  public Pair<Long, Boolean> isLeader() {
+    return new Pair<>(versionTimestamp, isLeader);
+  }
+
+  private RegionHeartbeatSample getLastSample() {
+    return slidingWindow.get(slidingWindow.size() - 1);
+  }
+}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/RegionGroupCache.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/RegionGroupCache.java
index 8cfde19..d770d21 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/RegionGroupCache.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/RegionGroupCache.java
@@ -19,103 +19,78 @@
 package org.apache.iotdb.confignode.manager.load.heartbeat;
 
 import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
+import org.apache.iotdb.commons.cluster.RegionStatus;
+import org.apache.iotdb.tsfile.utils.Pair;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.LinkedList;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
 
-public class RegionGroupCache implements IRegionGroupCache {
+public class RegionGroupCache {
 
   private static final Logger LOGGER = LoggerFactory.getLogger(RegionGroupCache.class);
 
-  // TODO: This class might be split into SchemaRegionGroupCache and DataRegionGroupCache
-
-  private static final int maximumWindowSize = 100;
-
   private final TConsensusGroupId consensusGroupId;
 
-  // Map<DataNodeId(where a RegionReplica resides), LinkedList<RegionHeartbeatSample>>
-  private final Map<Integer, LinkedList<RegionHeartbeatSample>> slidingWindow;
+  // Map<DataNodeId(where a RegionReplica resides), RegionCache>
+  private final Map<Integer, RegionCache> regionCacheMap;
 
-  // Indicates the version of the statistics
-  private final AtomicLong versionTimestamp;
   // The DataNode where the leader resides
-  private final AtomicInteger leaderDataNodeId;
+  private volatile int leaderDataNodeId;
 
   public RegionGroupCache(TConsensusGroupId consensusGroupId) {
     this.consensusGroupId = consensusGroupId;
-
-    this.slidingWindow = new ConcurrentHashMap<>();
-
-    this.versionTimestamp = new AtomicLong(0);
-    this.leaderDataNodeId = new AtomicInteger(-1);
+    this.regionCacheMap = new ConcurrentHashMap<>();
+    this.leaderDataNodeId = -1;
   }
 
-  @Override
   public void cacheHeartbeatSample(RegionHeartbeatSample newHeartbeatSample) {
-    slidingWindow.putIfAbsent(newHeartbeatSample.getBelongedDataNodeId(), new LinkedList<>());
-    synchronized (slidingWindow.get(newHeartbeatSample.getBelongedDataNodeId())) {
-      LinkedList<RegionHeartbeatSample> samples =
-          slidingWindow.get(newHeartbeatSample.getBelongedDataNodeId());
-
-      // Only sequential HeartbeatSamples are accepted.
-      // And un-sequential HeartbeatSamples will be discarded.
-      if (samples.size() == 0
-          || samples.getLast().getSendTimestamp() < newHeartbeatSample.getSendTimestamp()) {
-        samples.add(newHeartbeatSample);
-      }
-
-      if (samples.size() > maximumWindowSize) {
-        samples.removeFirst();
-      }
-    }
+    regionCacheMap
+        .computeIfAbsent(newHeartbeatSample.getBelongedDataNodeId(), empty -> new RegionCache())
+        .cacheHeartbeatSample(newHeartbeatSample);
   }
 
-  @Override
-  public boolean updateLoadStatistic() {
+  /**
+   * Update RegionReplicas' statistics, including:
+   *
+   * <p>1. RegionStatus
+   *
+   * <p>2. Leadership
+   *
+   * @return True if the leader changed, false otherwise
+   */
+  public boolean updateRegionStatistics() {
     long updateVersion = Long.MIN_VALUE;
-    int updateLeaderDataNodeId = -1;
-    int originLeaderDataNodeId = leaderDataNodeId.get();
+    int originLeaderDataNodeId = leaderDataNodeId;
 
-    synchronized (slidingWindow) {
-      for (LinkedList<RegionHeartbeatSample> samples : slidingWindow.values()) {
-        if (samples.size() > 0) {
-          RegionHeartbeatSample lastSample = samples.getLast();
-          if (lastSample.getSendTimestamp() > updateVersion && lastSample.isLeader()) {
-            updateVersion = lastSample.getSendTimestamp();
-            updateLeaderDataNodeId = lastSample.getBelongedDataNodeId();
-          }
-        }
+    for (Map.Entry<Integer, RegionCache> cacheEntry : regionCacheMap.entrySet()) {
+      cacheEntry.getValue().updateStatistics();
+      Pair<Long, Boolean> isLeader = cacheEntry.getValue().isLeader();
+      if (isLeader.getLeft() > updateVersion && isLeader.getRight()) {
+        updateVersion = isLeader.getLeft();
+        leaderDataNodeId = cacheEntry.getKey();
       }
     }
 
-    if (updateVersion > versionTimestamp.get()) {
-      // Only update when the leadership information is latest
-      versionTimestamp.set(updateVersion);
-      leaderDataNodeId.set(updateLeaderDataNodeId);
-    }
-
-    return originLeaderDataNodeId != leaderDataNodeId.get();
+    return originLeaderDataNodeId != leaderDataNodeId;
   }
 
-  @Override
-  public void removeCacheIfExists(Integer dataNodeId) {
-    synchronized (slidingWindow) {
-      slidingWindow.remove(dataNodeId);
-    }
+  public void removeCacheIfExists(int dataNodeId) {
+    regionCacheMap.remove(dataNodeId);
   }
 
-  @Override
   public int getLeaderDataNodeId() {
-    return leaderDataNodeId.get();
+    return leaderDataNodeId;
   }
 
-  @Override
+  public RegionStatus getRegionStatus(int dataNodeId) {
+    return regionCacheMap.containsKey(dataNodeId)
+        ? regionCacheMap.get(dataNodeId).getStatus()
+        : RegionStatus.Unknown;
+  }
+
   public TConsensusGroupId getConsensusGroupId() {
     return consensusGroupId;
   }
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/PartitionInfo.java b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/PartitionInfo.java
index 7cd539b..84abd26 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/PartitionInfo.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/PartitionInfo.java
@@ -71,7 +71,6 @@
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
-import java.util.Comparator;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -459,7 +458,10 @@
           regionInfoList.addAll(storageGroupPartitionTable.getRegionInfoList(regionsInfoPlan));
         });
     regionInfoList.sort(
-        Comparator.comparingInt(regionId -> regionId.getConsensusGroupId().getId()));
+        (o1, o2) ->
+            o1.getConsensusGroupId().getId() != o2.getConsensusGroupId().getId()
+                ? o1.getConsensusGroupId().getId() - o2.getConsensusGroupId().getId()
+                : o1.getDataNodeId() - o2.getDataNodeId());
     regionResp.setRegionInfoList(regionInfoList);
     regionResp.setStatus(RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS));
     return regionResp;
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/StorageGroupPartitionTable.java b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/StorageGroupPartitionTable.java
index f527ea1..e45c7dd 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/StorageGroupPartitionTable.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/StorageGroupPartitionTable.java
@@ -24,7 +24,6 @@
 import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
 import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot;
 import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
-import org.apache.iotdb.commons.cluster.RegionStatus;
 import org.apache.iotdb.commons.partition.DataPartitionTable;
 import org.apache.iotdb.commons.partition.SchemaPartitionTable;
 import org.apache.iotdb.confignode.consensus.request.read.GetRegionInfoListPlan;
@@ -300,9 +299,9 @@
 
     regionGroupMap.forEach(
         (consensusGroupId, regionGroup) -> {
-          if (showRegionReq == null || showRegionReq.getConsensusGroupType() == null) {
-            regionInfoList.addAll(buildRegionInfoList(regionGroup));
-          } else if (showRegionReq.getConsensusGroupType().equals(regionGroup.getId().getType())) {
+          if (showRegionReq == null
+              || showRegionReq.getConsensusGroupType() == null
+              || showRegionReq.getConsensusGroupType().equals(regionGroup.getId().getType())) {
             regionInfoList.addAll(buildRegionInfoList(regionGroup));
           }
         });
@@ -327,8 +326,6 @@
               regionInfo.setDataNodeId(dataNodeLocation.getDataNodeId());
               regionInfo.setClientRpcIp(dataNodeLocation.getClientRpcEndPoint().getIp());
               regionInfo.setClientRpcPort(dataNodeLocation.getClientRpcEndPoint().getPort());
-              // TODO: Maintain Region status
-              regionInfo.setStatus(RegionStatus.Up.getStatus());
               regionInfoList.add(regionInfo);
             });
 
diff --git a/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/LeaderRouterTest.java b/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/LeaderRouterTest.java
index de50103..97de8d4 100644
--- a/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/LeaderRouterTest.java
+++ b/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/LeaderRouterTest.java
@@ -26,7 +26,6 @@
 import org.apache.iotdb.commons.cluster.NodeStatus;
 import org.apache.iotdb.confignode.manager.load.heartbeat.BaseNodeCache;
 import org.apache.iotdb.confignode.manager.load.heartbeat.DataNodeHeartbeatCache;
-import org.apache.iotdb.confignode.manager.load.heartbeat.IRegionGroupCache;
 import org.apache.iotdb.confignode.manager.load.heartbeat.NodeHeartbeatSample;
 import org.apache.iotdb.confignode.manager.load.heartbeat.RegionGroupCache;
 import org.apache.iotdb.confignode.manager.load.heartbeat.RegionHeartbeatSample;
@@ -96,7 +95,7 @@
     List<TRegionReplicaSet> regionReplicaSets = Arrays.asList(regionReplicaSet1, regionReplicaSet2);
 
     // Build regionGroupCacheMap
-    Map<TConsensusGroupId, IRegionGroupCache> regionGroupCacheMap = new HashMap<>();
+    Map<TConsensusGroupId, RegionGroupCache> regionGroupCacheMap = new HashMap<>();
     regionGroupCacheMap.put(groupId1, new RegionGroupCache(groupId1));
     regionGroupCacheMap.put(groupId2, new RegionGroupCache(groupId2));
 
@@ -124,7 +123,7 @@
     Map<TConsensusGroupId, Integer> leaderMap = new HashMap<>();
     regionGroupCacheMap
         .values()
-        .forEach(regionGroupCache -> Assert.assertTrue(regionGroupCache.updateLoadStatistic()));
+        .forEach(regionGroupCache -> Assert.assertTrue(regionGroupCache.updateRegionStatistics()));
     regionGroupCacheMap.forEach(
         (groupId, regionGroupCache) ->
             leaderMap.put(groupId, regionGroupCache.getLeaderDataNodeId()));
@@ -168,7 +167,7 @@
 
       // Get leaderMap
       leaderMap.clear();
-      regionGroupCacheMap.values().forEach(IRegionGroupCache::updateLoadStatistic);
+      regionGroupCacheMap.values().forEach(RegionGroupCache::updateRegionStatistics);
       regionGroupCacheMap.forEach(
           (groupId, regionGroupCache) ->
               leaderMap.put(groupId, regionGroupCache.getLeaderDataNodeId()));
@@ -207,7 +206,7 @@
     leaderMap.clear();
     regionGroupCacheMap
         .values()
-        .forEach(regionGroupCache -> Assert.assertTrue(regionGroupCache.updateLoadStatistic()));
+        .forEach(regionGroupCache -> Assert.assertTrue(regionGroupCache.updateRegionStatistics()));
     regionGroupCacheMap.forEach(
         (groupId, regionGroupCache) ->
             leaderMap.put(groupId, regionGroupCache.getLeaderDataNodeId()));
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/cluster/RegionStatus.java b/node-commons/src/main/java/org/apache/iotdb/commons/cluster/RegionStatus.java
index ca1282b..6d60ec6 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/cluster/RegionStatus.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/cluster/RegionStatus.java
@@ -19,10 +19,13 @@
 
 package org.apache.iotdb.commons.cluster;
 
-/** Node status for showing regions */
+/** Region status for showing regions */
 public enum RegionStatus {
-  // Node running properly
-  Up("Up");
+  /** Region running properly */
+  Running("Running"),
+
+  /** Region connection failure */
+  Unknown("Unknown");
 
   private final String status;