[IOTDB-5570] Move heartbeat thread and statistics thread to load manager (#9608)

diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/heartbeat/ConfigNodeHeartbeatHandler.java b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/heartbeat/ConfigNodeHeartbeatHandler.java
index c97f3d8..c5a7257 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/heartbeat/ConfigNodeHeartbeatHandler.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/heartbeat/ConfigNodeHeartbeatHandler.java
@@ -18,24 +18,25 @@
  */
 package org.apache.iotdb.confignode.client.async.handlers.heartbeat;
 
-import org.apache.iotdb.confignode.manager.node.heartbeat.ConfigNodeHeartbeatCache;
-import org.apache.iotdb.confignode.manager.node.heartbeat.NodeHeartbeatSample;
+import org.apache.iotdb.confignode.manager.load.cache.LoadCache;
+import org.apache.iotdb.confignode.manager.load.cache.node.NodeHeartbeatSample;
 
 import org.apache.thrift.async.AsyncMethodCallback;
 
 public class ConfigNodeHeartbeatHandler implements AsyncMethodCallback<Long> {
 
-  // Update ConfigNodeHeartbeatCache when success
-  private final ConfigNodeHeartbeatCache configNodeHeartbeatCache;
+  private final int nodeId;
+  private final LoadCache cache;
 
-  public ConfigNodeHeartbeatHandler(ConfigNodeHeartbeatCache configNodeHeartbeatCache) {
-    this.configNodeHeartbeatCache = configNodeHeartbeatCache;
+  public ConfigNodeHeartbeatHandler(int nodeId, LoadCache cache) {
+    this.nodeId = nodeId;
+    this.cache = cache;
   }
 
   @Override
   public void onComplete(Long timestamp) {
-    configNodeHeartbeatCache.cacheHeartbeatSample(
-        new NodeHeartbeatSample(timestamp, System.currentTimeMillis()));
+    long receiveTime = System.currentTimeMillis();
+    cache.cacheConfigNodeHeartbeatSample(nodeId, new NodeHeartbeatSample(timestamp, receiveTime));
   }
 
   @Override
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/heartbeat/DataNodeHeartbeatHandler.java b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/heartbeat/DataNodeHeartbeatHandler.java
index fe1af63..60ea48b 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/heartbeat/DataNodeHeartbeatHandler.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/heartbeat/DataNodeHeartbeatHandler.java
@@ -18,14 +18,11 @@
  */
 package org.apache.iotdb.confignode.client.async.handlers.heartbeat;
 
-import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
-import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
 import org.apache.iotdb.commons.cluster.RegionStatus;
 import org.apache.iotdb.confignode.manager.load.balancer.RouteBalancer;
-import org.apache.iotdb.confignode.manager.node.heartbeat.DataNodeHeartbeatCache;
-import org.apache.iotdb.confignode.manager.node.heartbeat.NodeHeartbeatSample;
-import org.apache.iotdb.confignode.manager.partition.heartbeat.RegionGroupCache;
-import org.apache.iotdb.confignode.manager.partition.heartbeat.RegionHeartbeatSample;
+import org.apache.iotdb.confignode.manager.load.cache.LoadCache;
+import org.apache.iotdb.confignode.manager.load.cache.node.NodeHeartbeatSample;
+import org.apache.iotdb.confignode.manager.load.cache.region.RegionHeartbeatSample;
 import org.apache.iotdb.mpp.rpc.thrift.THeartbeatResp;
 import org.apache.iotdb.tsfile.utils.Pair;
 
@@ -35,26 +32,25 @@
 
 public class DataNodeHeartbeatHandler implements AsyncMethodCallback<THeartbeatResp> {
 
-  // Update DataNodeHeartbeatCache when success
-  private final TDataNodeLocation dataNodeLocation;
-  private final DataNodeHeartbeatCache dataNodeHeartbeatCache;
-  private final Map<TConsensusGroupId, RegionGroupCache> regionGroupCacheMap;
+  private final int nodeId;
+
+  private final LoadCache loadCache;
   private final RouteBalancer routeBalancer;
+
   private final Map<Integer, Long> deviceNum;
   private final Map<Integer, Long> timeSeriesNum;
   private final Map<Integer, Long> regionDisk;
 
   public DataNodeHeartbeatHandler(
-      TDataNodeLocation dataNodeLocation,
-      DataNodeHeartbeatCache dataNodeHeartbeatCache,
-      Map<TConsensusGroupId, RegionGroupCache> regionGroupCacheMap,
+      int nodeId,
+      LoadCache loadCache,
       RouteBalancer routeBalancer,
       Map<Integer, Long> deviceNum,
       Map<Integer, Long> timeSeriesNum,
       Map<Integer, Long> regionDisk) {
-    this.dataNodeLocation = dataNodeLocation;
-    this.dataNodeHeartbeatCache = dataNodeHeartbeatCache;
-    this.regionGroupCacheMap = regionGroupCacheMap;
+
+    this.nodeId = nodeId;
+    this.loadCache = loadCache;
     this.routeBalancer = routeBalancer;
     this.deviceNum = deviceNum;
     this.timeSeriesNum = timeSeriesNum;
@@ -66,31 +62,30 @@
     long receiveTime = System.currentTimeMillis();
 
     // Update NodeCache
-    dataNodeHeartbeatCache.cacheHeartbeatSample(
-        new NodeHeartbeatSample(heartbeatResp, receiveTime));
+    loadCache.cacheDataNodeHeartbeatSample(
+        nodeId, new NodeHeartbeatSample(heartbeatResp, receiveTime));
 
-    // Update RegionGroupCache And leaderCache
     heartbeatResp
         .getJudgedLeaders()
         .forEach(
             (regionGroupId, isLeader) -> {
-              regionGroupCacheMap
-                  .computeIfAbsent(regionGroupId, empty -> new RegionGroupCache(regionGroupId))
-                  .cacheHeartbeatSample(
-                      dataNodeLocation.getDataNodeId(),
-                      new RegionHeartbeatSample(
-                          heartbeatResp.getHeartbeatTimestamp(),
-                          receiveTime,
-                          // Region will inherit DataNode's status
-                          RegionStatus.parse(heartbeatResp.getStatus())));
+              // Update RegionGroupCache
+              loadCache.cacheRegionHeartbeatSample(
+                  regionGroupId,
+                  nodeId,
+                  new RegionHeartbeatSample(
+                      heartbeatResp.getHeartbeatTimestamp(),
+                      receiveTime,
+                      // Region will inherit DataNode's status
+                      RegionStatus.parse(heartbeatResp.getStatus())));
 
               if (isLeader) {
+                // Update leaderCache
                 routeBalancer.cacheLeaderSample(
-                    regionGroupId,
-                    new Pair<>(
-                        heartbeatResp.getHeartbeatTimestamp(), dataNodeLocation.getDataNodeId()));
+                    regionGroupId, new Pair<>(heartbeatResp.getHeartbeatTimestamp(), nodeId));
               }
             });
+
     if (heartbeatResp.getDeviceNum() != null) {
       deviceNum.putAll(heartbeatResp.getDeviceNum());
     }
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigRegionStateMachine.java b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigRegionStateMachine.java
index 9e93061..f6b3f25 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigRegionStateMachine.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigRegionStateMachine.java
@@ -204,15 +204,13 @@
           newLeaderId,
           currentNodeTEndPoint);
 
-      // Always initiate all kinds of HeartbeatCache first
-      configManager.getLoadManager().initHeartbeatCache();
+      // Always start load services first
+      configManager.getLoadManager().startLoadServices();
 
       // Start leader scheduling services
       configManager.getProcedureManager().shiftExecutor(true);
-      configManager.getLoadManager().startLoadStatisticsService();
       configManager.getLoadManager().getRouteBalancer().startRouteBalancingService();
       configManager.getRetryFailedTasksThread().startRetryFailedTasksService();
-      configManager.getNodeManager().startHeartbeatService();
       configManager.getPartitionManager().startRegionCleaner();
 
       // we do cq recovery async for two reasons:
@@ -229,11 +227,10 @@
           newLeaderId);
 
       // Stop leader scheduling services
+      configManager.getLoadManager().stopLoadServices();
       configManager.getProcedureManager().shiftExecutor(false);
-      configManager.getLoadManager().stopLoadStatisticsService();
       configManager.getLoadManager().getRouteBalancer().stopRouteBalancingService();
       configManager.getRetryFailedTasksThread().stopRetryFailedTasksService();
-      configManager.getNodeManager().stopHeartbeatService();
       configManager.getPartitionManager().stopRegionCleaner();
       configManager.getCQManager().stopCQScheduler();
     }
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
index 69cf4a4..c6ac131 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
@@ -82,10 +82,10 @@
 import org.apache.iotdb.confignode.manager.consensus.ConsensusManager;
 import org.apache.iotdb.confignode.manager.cq.CQManager;
 import org.apache.iotdb.confignode.manager.load.LoadManager;
+import org.apache.iotdb.confignode.manager.load.cache.node.NodeHeartbeatSample;
 import org.apache.iotdb.confignode.manager.node.ClusterNodeStartUtils;
 import org.apache.iotdb.confignode.manager.node.NodeManager;
 import org.apache.iotdb.confignode.manager.node.NodeMetrics;
-import org.apache.iotdb.confignode.manager.node.heartbeat.NodeHeartbeatSample;
 import org.apache.iotdb.confignode.manager.partition.PartitionManager;
 import org.apache.iotdb.confignode.manager.partition.PartitionMetrics;
 import org.apache.iotdb.confignode.manager.pipe.PipeManager;
@@ -387,10 +387,11 @@
     TSStatus status = confirmLeader();
     if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
       // Force updating the target DataNode's status to Unknown
-      getNodeManager()
-          .getNodeCacheMap()
-          .get(dataNodeLocation.getDataNodeId())
-          .forceUpdate(NodeHeartbeatSample.generateDefaultSample(NodeStatus.Unknown));
+      getLoadManager()
+          .forceUpdateNodeCache(
+              NodeType.DataNode,
+              dataNodeLocation.getDataNodeId(),
+              NodeHeartbeatSample.generateDefaultSample(NodeStatus.Unknown));
       LOGGER.info(
           "[ShutdownHook] The DataNode-{} will be shutdown soon, mark it as Unknown",
           dataNodeLocation.getDataNodeId());
@@ -431,12 +432,7 @@
               .map(TDataNodeConfiguration::getLocation)
               .sorted(Comparator.comparingInt(TDataNodeLocation::getDataNodeId))
               .collect(Collectors.toList());
-      Map<Integer, String> nodeStatus = new HashMap<>();
-      getNodeManager()
-          .getNodeCacheMap()
-          .forEach(
-              (nodeId, heartbeatCache) ->
-                  nodeStatus.put(nodeId, heartbeatCache.getNodeStatusWithReason()));
+      Map<Integer, String> nodeStatus = getLoadManager().getNodeStatusWithReason();
       return new TShowClusterResp(status, configNodeLocations, dataNodeInfoLocations, nodeStatus);
     } else {
       return new TShowClusterResp(status, new ArrayList<>(), new ArrayList<>(), new HashMap<>());
@@ -1131,10 +1127,11 @@
     TSStatus status = confirmLeader();
     if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
       // Force updating the target ConfigNode's status to Unknown
-      getNodeManager()
-          .getNodeCacheMap()
-          .get(configNodeLocation.getConfigNodeId())
-          .forceUpdate(NodeHeartbeatSample.generateDefaultSample(NodeStatus.Unknown));
+      getLoadManager()
+          .forceUpdateNodeCache(
+              NodeType.ConfigNode,
+              configNodeLocation.getConfigNodeId(),
+              NodeHeartbeatSample.generateDefaultSample(NodeStatus.Unknown));
       LOGGER.info(
           "[ShutdownHook] The ConfigNode-{} will be shutdown soon, mark it as Unknown",
           configNodeLocation.getConfigNodeId());
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
index 6b03eea..93d73b5 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
@@ -40,7 +40,6 @@
 import org.apache.iotdb.confignode.consensus.request.write.procedure.UpdateProcedurePlan;
 import org.apache.iotdb.confignode.consensus.request.write.region.CreateRegionGroupsPlan;
 import org.apache.iotdb.confignode.manager.partition.PartitionManager;
-import org.apache.iotdb.confignode.manager.partition.heartbeat.RegionGroupCache;
 import org.apache.iotdb.confignode.persistence.ProcedureInfo;
 import org.apache.iotdb.confignode.procedure.Procedure;
 import org.apache.iotdb.confignode.procedure.ProcedureExecutor;
@@ -93,7 +92,6 @@
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
-import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
@@ -346,13 +344,33 @@
   }
 
   public TSStatus migrateRegion(TMigrateRegionReq migrateRegionReq) {
-    // TODO: Whether to guarantee the check high consistency, i.e, use consensus read to check
-    Map<TConsensusGroupId, RegionGroupCache> regionReplicaMap =
-        configManager.getPartitionManager().getRegionGroupCacheMap();
-    Optional<TConsensusGroupId> regionId =
-        regionReplicaMap.keySet().stream()
-            .filter(id -> id.getId() == migrateRegionReq.getRegionId())
-            .findAny();
+    TConsensusGroupId regionGroupId;
+    if (configManager
+        .getPartitionManager()
+        .isRegionGroupExists(
+            new TConsensusGroupId(
+                TConsensusGroupType.SchemaRegion, migrateRegionReq.getRegionId()))) {
+      regionGroupId =
+          new TConsensusGroupId(TConsensusGroupType.SchemaRegion, migrateRegionReq.getRegionId());
+    } else if (configManager
+        .getPartitionManager()
+        .isRegionGroupExists(
+            new TConsensusGroupId(
+                TConsensusGroupType.DataRegion, migrateRegionReq.getRegionId()))) {
+      regionGroupId =
+          new TConsensusGroupId(TConsensusGroupType.DataRegion, migrateRegionReq.getRegionId());
+    } else {
+      LOGGER.warn(
+          "Submit RegionMigrateProcedure failed, because RegionGroup: {} doesn't exist",
+          migrateRegionReq.getRegionId());
+      TSStatus status = new TSStatus(TSStatusCode.MIGRATE_REGION_ERROR.getStatusCode());
+      status.setMessage(
+          String.format(
+              "Submit RegionMigrateProcedure failed, because RegionGroup: %s doesn't exist",
+              migrateRegionReq.getRegionId()));
+      return status;
+    }
+
     TDataNodeLocation originalDataNode =
         configManager
             .getNodeManager()
@@ -363,18 +381,7 @@
             .getNodeManager()
             .getRegisteredDataNode(migrateRegionReq.getToId())
             .getLocation();
-    if (!regionId.isPresent()) {
-      LOGGER.warn(
-          "Submit RegionMigrateProcedure failed, because no Region {}",
-          migrateRegionReq.getRegionId());
-      TSStatus status = new TSStatus(TSStatusCode.MIGRATE_REGION_ERROR.getStatusCode());
-      status.setMessage(
-          "Submit RegionMigrateProcedure failed, because no region Group "
-              + migrateRegionReq.getRegionId());
-      return status;
-    }
-    Set<Integer> dataNodesInRegion =
-        regionReplicaMap.get(regionId.get()).getStatistics().getRegionStatisticsMap().keySet();
+
     if (originalDataNode == null) {
       LOGGER.warn(
           "Submit RegionMigrateProcedure failed, because no original DataNode {}",
@@ -393,7 +400,9 @@
           "Submit RegionMigrateProcedure failed, because no target DataNode "
               + migrateRegionReq.getToId());
       return status;
-    } else if (!dataNodesInRegion.contains(migrateRegionReq.getFromId())) {
+    } else if (configManager.getPartitionManager()
+        .getAllReplicaSets(originalDataNode.getDataNodeId()).stream()
+        .noneMatch(replicaSet -> replicaSet.getRegionId().equals(regionGroupId))) {
       LOGGER.warn(
           "Submit RegionMigrateProcedure failed, because the original DataNode {} doesn't contain Region {}",
           migrateRegionReq.getFromId(),
@@ -405,7 +414,9 @@
               + " doesn't contain Region "
               + migrateRegionReq.getRegionId());
       return status;
-    } else if (dataNodesInRegion.contains(migrateRegionReq.getToId())) {
+    } else if (configManager.getPartitionManager().getAllReplicaSets(destDataNode.getDataNodeId())
+        .stream()
+        .anyMatch(replicaSet -> replicaSet.getRegionId().equals(regionGroupId))) {
       LOGGER.warn(
           "Submit RegionMigrateProcedure failed, because the target DataNode {} already contains Region {}",
           migrateRegionReq.getToId(),
@@ -425,10 +436,8 @@
             .map(TDataNodeConfiguration::getLocation)
             .map(TDataNodeLocation::getDataNodeId)
             .collect(Collectors.toSet());
-    if (configManager
-        .getNodeManager()
-        .getNodeStatusByNodeId(migrateRegionReq.getFromId())
-        .equals(NodeStatus.Unknown)) {
+    if (NodeStatus.Unknown.equals(
+        configManager.getLoadManager().getNodeStatus(migrateRegionReq.getFromId()))) {
       LOGGER.warn(
           "Submit RegionMigrateProcedure failed, because the sourceDataNode {} is Unknown.",
           migrateRegionReq.getFromId());
@@ -439,18 +448,8 @@
               + " is Unknown.");
       return status;
     }
-    dataNodesInRegion.retainAll(aliveDataNodes);
-    if (dataNodesInRegion.isEmpty()) {
-      LOGGER.warn(
-          "Submit RegionMigrateProcedure failed, because all of the DataNodes in Region Group {} is unavailable.",
-          migrateRegionReq.getRegionId());
-      TSStatus status = new TSStatus(TSStatusCode.MIGRATE_REGION_ERROR.getStatusCode());
-      status.setMessage(
-          "Submit RegionMigrateProcedure failed, because all of the DataNodes in Region Group "
-              + migrateRegionReq.getRegionId()
-              + " are unavailable.");
-      return status;
-    } else if (!aliveDataNodes.contains(migrateRegionReq.getToId())) {
+
+    if (!aliveDataNodes.contains(migrateRegionReq.getToId())) {
       LOGGER.warn(
           "Submit RegionMigrateProcedure failed, because the destDataNode {} is ReadOnly or Unknown.",
           migrateRegionReq.getToId());
@@ -462,7 +461,7 @@
       return status;
     }
     this.executor.submitProcedure(
-        new RegionMigrateProcedure(regionId.get(), originalDataNode, destDataNode));
+        new RegionMigrateProcedure(regionGroupId, originalDataNode, destDataNode));
     LOGGER.info(
         "Submit RegionMigrateProcedure successfully, Region: {}, From: {}, To: {}",
         migrateRegionReq.getRegionId(),
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/RetryFailedTasksThread.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/RetryFailedTasksThread.java
index c9ffdd6..021ef30 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/RetryFailedTasksThread.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/RetryFailedTasksThread.java
@@ -25,8 +25,8 @@
 import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil;
 import org.apache.iotdb.confignode.conf.ConfigNodeConfig;
 import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
+import org.apache.iotdb.confignode.manager.load.LoadManager;
 import org.apache.iotdb.confignode.manager.node.NodeManager;
-import org.apache.iotdb.confignode.manager.node.heartbeat.BaseNodeCache;
 import org.apache.iotdb.rpc.TSStatusCode;
 
 import org.slf4j.Logger;
@@ -46,12 +46,15 @@
  */
 public class RetryFailedTasksThread {
 
+  // TODO: Replace this class by cluster events
+
   private static final Logger LOGGER = LoggerFactory.getLogger(RetryFailedTasksThread.class);
 
   private static final ConfigNodeConfig CONF = ConfigNodeDescriptor.getInstance().getConf();
   private static final long HEARTBEAT_INTERVAL = CONF.getHeartbeatIntervalInMs();
   private final IManager configManager;
   private final NodeManager nodeManager;
+  private final LoadManager loadManager;
   private final ScheduledExecutorService retryFailTasksExecutor =
       IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor("Cluster-RetryFailedTasks-Service");
   private final Object scheduleMonitor = new Object();
@@ -63,6 +66,7 @@
   public RetryFailedTasksThread(IManager configManager) {
     this.configManager = configManager;
     this.nodeManager = configManager.getNodeManager();
+    this.loadManager = configManager.getLoadManager();
     this.oldUnknownNodes = new HashSet<>();
   }
 
@@ -113,15 +117,12 @@
         .forEach(
             DataNodeConfiguration -> {
               TDataNodeLocation dataNodeLocation = DataNodeConfiguration.getLocation();
-              BaseNodeCache newestNodeInformation =
-                  nodeManager.getNodeCacheMap().get(dataNodeLocation.dataNodeId);
-              if (newestNodeInformation != null) {
-                if (newestNodeInformation.getNodeStatus() == NodeStatus.Running) {
-                  oldUnknownNodes.remove(dataNodeLocation);
-                } else if (!oldUnknownNodes.contains(dataNodeLocation)
-                    && newestNodeInformation.getNodeStatus() == NodeStatus.Unknown) {
-                  newUnknownNodes.add(dataNodeLocation);
-                }
+              NodeStatus nodeStatus = loadManager.getNodeStatus(dataNodeLocation.getDataNodeId());
+              if (nodeStatus == NodeStatus.Running) {
+                oldUnknownNodes.remove(dataNodeLocation);
+              } else if (!oldUnknownNodes.contains(dataNodeLocation)
+                  && nodeStatus == NodeStatus.Unknown) {
+                newUnknownNodes.add(dataNodeLocation);
               }
             });
 
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java
index 932f403..2aaa730 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java
@@ -16,24 +16,18 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 package org.apache.iotdb.confignode.manager.load;
 
 import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
 import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
-import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
 import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
-import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot;
 import org.apache.iotdb.commons.cluster.NodeStatus;
-import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
-import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil;
+import org.apache.iotdb.commons.cluster.NodeType;
+import org.apache.iotdb.commons.cluster.RegionStatus;
 import org.apache.iotdb.commons.partition.DataPartitionTable;
 import org.apache.iotdb.commons.partition.SchemaPartitionTable;
-import org.apache.iotdb.confignode.client.DataNodeRequestType;
-import org.apache.iotdb.confignode.client.async.AsyncDataNodeClientPool;
-import org.apache.iotdb.confignode.client.async.handlers.AsyncClientHandler;
-import org.apache.iotdb.confignode.conf.ConfigNodeConfig;
-import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
 import org.apache.iotdb.confignode.consensus.request.write.region.CreateRegionGroupsPlan;
 import org.apache.iotdb.confignode.exception.DatabaseNotExistsException;
 import org.apache.iotdb.confignode.exception.NoAvailableRegionGroupException;
@@ -42,31 +36,21 @@
 import org.apache.iotdb.confignode.manager.load.balancer.PartitionBalancer;
 import org.apache.iotdb.confignode.manager.load.balancer.RegionBalancer;
 import org.apache.iotdb.confignode.manager.load.balancer.RouteBalancer;
-import org.apache.iotdb.confignode.manager.load.balancer.router.RegionRouteMap;
-import org.apache.iotdb.confignode.manager.node.NodeManager;
-import org.apache.iotdb.confignode.manager.node.heartbeat.NodeStatistics;
-import org.apache.iotdb.confignode.manager.observer.NodeStatisticsEvent;
-import org.apache.iotdb.confignode.manager.partition.PartitionManager;
-import org.apache.iotdb.confignode.manager.partition.heartbeat.RegionGroupStatistics;
-import org.apache.iotdb.confignode.manager.partition.heartbeat.RegionStatistics;
+import org.apache.iotdb.confignode.manager.load.cache.LoadCache;
+import org.apache.iotdb.confignode.manager.load.cache.node.NodeHeartbeatSample;
+import org.apache.iotdb.confignode.manager.load.cache.region.RegionHeartbeatSample;
+import org.apache.iotdb.confignode.manager.load.service.HeartbeatService;
+import org.apache.iotdb.confignode.manager.load.service.StatisticsService;
+import org.apache.iotdb.confignode.manager.partition.RegionGroupStatus;
 import org.apache.iotdb.confignode.rpc.thrift.TTimeSlotList;
-import org.apache.iotdb.mpp.rpc.thrift.TRegionRouteReq;
-import org.apache.iotdb.tsfile.utils.Pair;
 
 import com.google.common.eventbus.AsyncEventBus;
 import com.google.common.eventbus.EventBus;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
-import java.util.stream.Collectors;
 
 /**
  * The LoadManager at ConfigNodeGroup-Leader is active. It proactively implements the cluster
@@ -74,25 +58,19 @@
  */
 public class LoadManager {
 
-  private static final Logger LOGGER = LoggerFactory.getLogger(LoadManager.class);
-
-  private static final ConfigNodeConfig CONF = ConfigNodeDescriptor.getInstance().getConf();
-  private static final long HEARTBEAT_INTERVAL = CONF.getHeartbeatIntervalInMs();
-
   private final IManager configManager;
 
-  /** Balancers */
+  /** Balancers. */
   private final RegionBalancer regionBalancer;
 
   private final PartitionBalancer partitionBalancer;
   private final RouteBalancer routeBalancer;
 
-  /** Load statistics executor service */
-  private Future<?> currentLoadStatisticsFuture;
+  /** Cluster load services. */
+  private final LoadCache loadCache;
 
-  private final ScheduledExecutorService loadStatisticsExecutor =
-      IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor("Cluster-LoadStatistics-Service");
-  private final Object scheduleMonitor = new Object();
+  private final HeartbeatService heartbeatService;
+  private final StatisticsService statisticsService;
 
   private final EventBus eventBus =
       new AsyncEventBus("LoadManager-EventBus", Executors.newFixedThreadPool(5));
@@ -104,11 +82,16 @@
     this.partitionBalancer = new PartitionBalancer(configManager);
     this.routeBalancer = new RouteBalancer(configManager);
 
+    this.loadCache = new LoadCache();
+    this.heartbeatService = new HeartbeatService(configManager, loadCache);
+    this.statisticsService =
+        new StatisticsService(configManager, routeBalancer, loadCache, eventBus);
+
     eventBus.register(configManager.getClusterSchemaManager());
   }
 
   /**
-   * Generate an optimal CreateRegionGroupsPlan
+   * Generate an optimal CreateRegionGroupsPlan.
    *
    * @param allotmentMap Map<StorageGroupName, Region allotment>
    * @param consensusGroupType TConsensusGroupType of RegionGroup to be allocated
@@ -123,7 +106,7 @@
   }
 
   /**
-   * Allocate SchemaPartitions
+   * Allocate SchemaPartitions.
    *
    * @param unassignedSchemaPartitionSlotsMap SchemaPartitionSlots that should be assigned
    * @return Map<StorageGroupName, SchemaPartitionTable>, the allocating result
@@ -135,7 +118,7 @@
   }
 
   /**
-   * Allocate DataPartitions
+   * Allocate DataPartitions.
    *
    * @param unassignedDataPartitionSlotsMap DataPartitionSlots that should be assigned
    * @return Map<StorageGroupName, DataPartitionTable>, the allocating result
@@ -182,182 +165,197 @@
     return routeBalancer.getLatestRegionPriorityMap();
   }
 
-  /** Start the load statistics service */
-  public void startLoadStatisticsService() {
-    synchronized (scheduleMonitor) {
-      if (currentLoadStatisticsFuture == null) {
-        currentLoadStatisticsFuture =
-            ScheduledExecutorUtil.safelyScheduleWithFixedDelay(
-                loadStatisticsExecutor,
-                this::updateLoadStatistics,
-                0,
-                HEARTBEAT_INTERVAL,
-                TimeUnit.MILLISECONDS);
-        LOGGER.info("LoadStatistics service is started successfully.");
-      }
-    }
-  }
-
-  /** Stop the load statistics service */
-  public void stopLoadStatisticsService() {
-    synchronized (scheduleMonitor) {
-      if (currentLoadStatisticsFuture != null) {
-        currentLoadStatisticsFuture.cancel(false);
-        currentLoadStatisticsFuture = null;
-        LOGGER.info("LoadStatistics service is stopped successfully.");
-      }
-    }
-  }
-
-  private void updateLoadStatistics() {
-    // Broadcast the RegionRouteMap if some LoadStatistics has changed
-    boolean isNeedBroadcast = false;
-
-    // Update NodeStatistics:
-    // Pair<NodeStatistics, NodeStatistics>:left one means the current NodeStatistics, right one
-    // means the previous NodeStatistics
-    Map<Integer, Pair<NodeStatistics, NodeStatistics>> differentNodeStatisticsMap =
-        new ConcurrentHashMap<>();
-    getNodeManager()
-        .getNodeCacheMap()
-        .forEach(
-            (nodeId, nodeCache) -> {
-              NodeStatistics preNodeStatistics = nodeCache.getPreviousStatistics().deepCopy();
-              if (nodeCache.periodicUpdate()) {
-                // Update and record the changed NodeStatistics
-                differentNodeStatisticsMap.put(
-                    nodeId, new Pair<>(nodeCache.getStatistics(), preNodeStatistics));
-              }
-            });
-    if (!differentNodeStatisticsMap.isEmpty()) {
-      isNeedBroadcast = true;
-      recordNodeStatistics(differentNodeStatisticsMap);
-      eventBus.post(new NodeStatisticsEvent(differentNodeStatisticsMap));
-    }
-
-    // Update RegionGroupStatistics
-    Map<TConsensusGroupId, RegionGroupStatistics> differentRegionGroupStatisticsMap =
-        new ConcurrentHashMap<>();
-    getPartitionManager()
-        .getRegionGroupCacheMap()
-        .forEach(
-            (regionGroupId, regionGroupCache) -> {
-              if (regionGroupCache.periodicUpdate()) {
-                // Update and record the changed RegionGroupStatistics
-                differentRegionGroupStatisticsMap.put(
-                    regionGroupId, regionGroupCache.getStatistics());
-              }
-            });
-    if (!differentRegionGroupStatisticsMap.isEmpty()) {
-      isNeedBroadcast = true;
-      recordRegionGroupStatistics(differentRegionGroupStatisticsMap);
-    }
-
-    // Update RegionRouteMap
-    if (routeBalancer.updateRegionRouteMap()) {
-      isNeedBroadcast = true;
-      recordRegionRouteMap(routeBalancer.getRegionRouteMap());
-    }
-
-    if (isNeedBroadcast) {
-      broadcastLatestRegionRouteMap();
-    }
-  }
-
-  private void recordNodeStatistics(
-      Map<Integer, Pair<NodeStatistics, NodeStatistics>> differentNodeStatisticsMap) {
-    LOGGER.info("[UpdateLoadStatistics] NodeStatisticsMap: ");
-    for (Map.Entry<Integer, Pair<NodeStatistics, NodeStatistics>> nodeCacheEntry :
-        differentNodeStatisticsMap.entrySet()) {
-      LOGGER.info(
-          "[UpdateLoadStatistics]\t {}={}",
-          "nodeId{" + nodeCacheEntry.getKey() + "}",
-          nodeCacheEntry.getValue().left);
-    }
-  }
-
-  private void recordRegionGroupStatistics(
-      Map<TConsensusGroupId, RegionGroupStatistics> differentRegionGroupStatisticsMap) {
-    LOGGER.info("[UpdateLoadStatistics] RegionGroupStatisticsMap: ");
-    for (Map.Entry<TConsensusGroupId, RegionGroupStatistics> regionGroupStatisticsEntry :
-        differentRegionGroupStatisticsMap.entrySet()) {
-      LOGGER.info("[UpdateLoadStatistics]\t RegionGroup: {}", regionGroupStatisticsEntry.getKey());
-      LOGGER.info("[UpdateLoadStatistics]\t {}", regionGroupStatisticsEntry.getValue());
-      for (Map.Entry<Integer, RegionStatistics> regionStatisticsEntry :
-          regionGroupStatisticsEntry.getValue().getRegionStatisticsMap().entrySet()) {
-        LOGGER.info(
-            "[UpdateLoadStatistics]\t dataNodeId{}={}",
-            regionStatisticsEntry.getKey(),
-            regionStatisticsEntry.getValue());
-      }
-    }
-  }
-
-  private void recordRegionRouteMap(RegionRouteMap regionRouteMap) {
-    LOGGER.info("[UpdateLoadStatistics] RegionLeaderMap: ");
-    for (Map.Entry<TConsensusGroupId, Integer> regionLeaderEntry :
-        regionRouteMap.getRegionLeaderMap().entrySet()) {
-      LOGGER.info(
-          "[UpdateLoadStatistics]\t {}={}",
-          regionLeaderEntry.getKey(),
-          regionLeaderEntry.getValue());
-    }
-
-    LOGGER.info("[UpdateLoadStatistics] RegionPriorityMap: ");
-    for (Map.Entry<TConsensusGroupId, TRegionReplicaSet> regionPriorityEntry :
-        regionRouteMap.getRegionPriorityMap().entrySet()) {
-      LOGGER.info(
-          "[UpdateLoadStatistics]\t {}={}",
-          regionPriorityEntry.getKey(),
-          regionPriorityEntry.getValue().getDataNodeLocations().stream()
-              .map(TDataNodeLocation::getDataNodeId)
-              .collect(Collectors.toList()));
-    }
-  }
-
   public void broadcastLatestRegionRouteMap() {
-    Map<TConsensusGroupId, TRegionReplicaSet> latestRegionRouteMap = getLatestRegionRouteMap();
-    Map<Integer, TDataNodeLocation> dataNodeLocationMap = new ConcurrentHashMap<>();
-    // Broadcast the RegionRouteMap to all DataNodes except the unknown ones
-    getNodeManager()
-        .filterDataNodeThroughStatus(NodeStatus.Running, NodeStatus.Removing, NodeStatus.ReadOnly)
-        .forEach(
-            onlineDataNode ->
-                dataNodeLocationMap.put(
-                    onlineDataNode.getLocation().getDataNodeId(), onlineDataNode.getLocation()));
-
-    LOGGER.info("[UpdateLoadStatistics] Begin to broadcast RegionRouteMap:");
-    long broadcastTime = System.currentTimeMillis();
-
-    AsyncClientHandler<TRegionRouteReq, TSStatus> clientHandler =
-        new AsyncClientHandler<>(
-            DataNodeRequestType.UPDATE_REGION_ROUTE_MAP,
-            new TRegionRouteReq(broadcastTime, latestRegionRouteMap),
-            dataNodeLocationMap);
-    AsyncDataNodeClientPool.getInstance().sendAsyncRequestToDataNodeWithRetry(clientHandler);
-    LOGGER.info("[UpdateLoadStatistics] Broadcast the latest RegionRouteMap finished.");
+    statisticsService.broadcastLatestRegionRouteMap();
   }
 
-  /** Initialize all kinds of the HeartbeatCache when the ConfigNode-Leader is switched */
-  public void initHeartbeatCache() {
-    getNodeManager().initNodeHeartbeatCache();
-    getPartitionManager().initRegionGroupHeartbeatCache();
+  public void startLoadServices() {
+    loadCache.initHeartbeatCache(configManager);
     routeBalancer.initRegionRouteMap();
+    heartbeatService.startHeartbeatService();
+    statisticsService.startLoadStatisticsService();
+  }
+
+  public void stopLoadServices() {
+    heartbeatService.stopHeartbeatService();
+    statisticsService.stopLoadStatisticsService();
+    loadCache.clearHeartbeatCache();
   }
 
   public RouteBalancer getRouteBalancer() {
     return routeBalancer;
   }
 
-  private NodeManager getNodeManager() {
-    return configManager.getNodeManager();
+  /**
+   * Safely get NodeStatus by NodeId.
+   *
+   * @param nodeId The specified NodeId
+   * @return NodeStatus of the specified Node. Unknown if cache doesn't exist.
+   */
+  public NodeStatus getNodeStatus(int nodeId) {
+    return loadCache.getNodeStatus(nodeId);
   }
 
-  private PartitionManager getPartitionManager() {
-    return configManager.getPartitionManager();
+  /**
+   * Safely get the specified Node's current status with reason.
+   *
+   * @param nodeId The specified NodeId
+   * @return The specified Node's current status if the nodeCache contains it, Unknown otherwise
+   */
+  public String getNodeStatusWithReason(int nodeId) {
+    return loadCache.getNodeStatusWithReason(nodeId);
   }
 
-  public EventBus getEventBus() {
-    return eventBus;
+  /**
+   * Get all Node's current status with reason.
+   *
+   * @return Map<NodeId, NodeStatus with reason>
+   */
+  public Map<Integer, String> getNodeStatusWithReason() {
+    return loadCache.getNodeStatusWithReason();
+  }
+
+  /**
+   * Filter ConfigNodes through the specified NodeStatus.
+   *
+   * @param status The specified NodeStatus
+   * @return Filtered ConfigNodes with the specified NodeStatus
+   */
+  public List<Integer> filterConfigNodeThroughStatus(NodeStatus... status) {
+    return loadCache.filterConfigNodeThroughStatus(status);
+  }
+
+  /**
+   * Filter DataNodes through the specified NodeStatus.
+   *
+   * @param status The specified NodeStatus
+   * @return Filtered DataNodes with the specified NodeStatus
+   */
+  public List<Integer> filterDataNodeThroughStatus(NodeStatus... status) {
+    return loadCache.filterDataNodeThroughStatus(status);
+  }
+
+  /**
+   * Get the free disk space of the specified DataNode.
+   *
+   * @param dataNodeId The index of the specified DataNode
+   * @return The free disk space that sample through heartbeat, 0 if no heartbeat received
+   */
+  public double getFreeDiskSpace(int dataNodeId) {
+    return loadCache.getFreeDiskSpace(dataNodeId);
+  }
+
+  /**
+   * Get the loadScore of each DataNode.
+   *
+   * @return Map<DataNodeId, loadScore>
+   */
+  public Map<Integer, Long> getAllDataNodeLoadScores() {
+    return loadCache.getAllDataNodeLoadScores();
+  }
+
+  /**
+   * Get the lowest loadScore DataNode.
+   *
+   * @return The index of the lowest loadScore DataNode. -1 if no DataNode heartbeat received.
+   */
+  public int getLowestLoadDataNode() {
+    return loadCache.getLowestLoadDataNode();
+  }
+
+  /**
+   * Get the lowest loadScore DataNode from the specified DataNodes.
+   *
+   * @param dataNodeIds The specified DataNodes
+   * @return The index of the lowest loadScore DataNode. -1 if no DataNode heartbeat received.
+   */
+  public int getLowestLoadDataNode(List<Integer> dataNodeIds) {
+    return loadCache.getLowestLoadDataNode(dataNodeIds);
+  }
+
+  /**
+   * Force update the specified Node's cache.
+   *
+   * @param nodeType Specified NodeType
+   * @param nodeId Specified NodeId
+   * @param heartbeatSample Specified NodeHeartbeatSample
+   */
+  public void forceUpdateNodeCache(
+      NodeType nodeType, int nodeId, NodeHeartbeatSample heartbeatSample) {
+    loadCache.forceUpdateNodeCache(nodeType, nodeId, heartbeatSample);
+  }
+
+  /** Remove the specified Node's cache. */
+  public void removeNodeCache(int nodeId) {
+    loadCache.removeNodeCache(nodeId);
+  }
+
+  /**
+   * Safely get RegionStatus.
+   *
+   * @param consensusGroupId Specified RegionGroupId
+   * @param dataNodeId Specified RegionReplicaId
+   * @return Corresponding RegionStatus if cache exists, Unknown otherwise
+   */
+  public RegionStatus getRegionStatus(TConsensusGroupId consensusGroupId, int dataNodeId) {
+    return loadCache.getRegionStatus(consensusGroupId, dataNodeId);
+  }
+
+  /**
+   * Safely get RegionGroupStatus.
+   *
+   * @param consensusGroupId Specified RegionGroupId
+   * @return Corresponding RegionGroupStatus if cache exists, Disabled otherwise
+   */
+  public RegionGroupStatus getRegionGroupStatus(TConsensusGroupId consensusGroupId) {
+    return loadCache.getRegionGroupStatus(consensusGroupId);
+  }
+
+  /**
+   * Safely get RegionGroupStatus.
+   *
+   * @param consensusGroupIds Specified RegionGroupIds
+   * @return Corresponding RegionGroupStatus if cache exists, Disabled otherwise
+   */
+  public Map<TConsensusGroupId, RegionGroupStatus> getRegionGroupStatus(
+      List<TConsensusGroupId> consensusGroupIds) {
+    return loadCache.getRegionGroupStatus(consensusGroupIds);
+  }
+
+  /**
+   * Filter the RegionGroups through the RegionGroupStatus.
+   *
+   * @param status The specified RegionGroupStatus
+   * @return Filtered RegionGroups with the specified RegionGroupStatus
+   */
+  public List<TConsensusGroupId> filterRegionGroupThroughStatus(RegionGroupStatus... status) {
+    return loadCache.filterRegionGroupThroughStatus(status);
+  }
+
+  /**
+   * Count the number of cluster Regions with specified RegionStatus.
+   *
+   * @param type The specified RegionGroupType
+   * @param status The specified statues
+   * @return The number of cluster Regions with specified RegionStatus
+   */
+  public int countRegionWithSpecifiedStatus(TConsensusGroupType type, RegionStatus... status) {
+    return loadCache.countRegionWithSpecifiedStatus(type, status);
+  }
+
+  /**
+   * Force update the specified RegionGroup's cache.
+   *
+   * @param regionGroupId Specified RegionGroupId
+   * @param heartbeatSampleMap Specified RegionHeartbeatSampleMap
+   */
+  public void forceUpdateRegionGroupCache(
+      TConsensusGroupId regionGroupId, Map<Integer, RegionHeartbeatSample> heartbeatSampleMap) {
+    loadCache.forceUpdateRegionGroupCache(regionGroupId, heartbeatSampleMap);
+  }
+
+  /** Remove the specified RegionGroup's cache. */
+  public void removeRegionGroupCache(TConsensusGroupId consensusGroupId) {
+    loadCache.removeRegionGroupCache(consensusGroupId);
   }
 }
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RegionBalancer.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RegionBalancer.java
index 882a5ee..f6d1524 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RegionBalancer.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RegionBalancer.java
@@ -29,6 +29,7 @@
 import org.apache.iotdb.confignode.exception.NotEnoughDataNodeException;
 import org.apache.iotdb.confignode.manager.ClusterSchemaManager;
 import org.apache.iotdb.confignode.manager.IManager;
+import org.apache.iotdb.confignode.manager.load.LoadManager;
 import org.apache.iotdb.confignode.manager.load.balancer.region.CopySetRegionGroupAllocator;
 import org.apache.iotdb.confignode.manager.load.balancer.region.GreedyRegionGroupAllocator;
 import org.apache.iotdb.confignode.manager.load.balancer.region.IRegionGroupAllocator;
@@ -111,7 +112,7 @@
             dataNodeConfiguration -> {
               int dataNodeId = dataNodeConfiguration.getLocation().getDataNodeId();
               availableDataNodeMap.put(dataNodeId, dataNodeConfiguration);
-              freeDiskSpaceMap.put(dataNodeId, getNodeManager().getFreeDiskSpace(dataNodeId));
+              freeDiskSpaceMap.put(dataNodeId, getLoadManager().getFreeDiskSpace(dataNodeId));
             });
 
         // Generate allocation plan
@@ -145,6 +146,10 @@
     return configManager.getPartitionManager();
   }
 
+  private LoadManager getLoadManager() {
+    return configManager.getLoadManager();
+  }
+
   public enum RegionGroupAllocatePolicy {
     COPY_SET,
     GREEDY
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RouteBalancer.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RouteBalancer.java
index 31f2839..a87cb05 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RouteBalancer.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RouteBalancer.java
@@ -33,6 +33,7 @@
 import org.apache.iotdb.confignode.conf.ConfigNodeConfig;
 import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
 import org.apache.iotdb.confignode.manager.IManager;
+import org.apache.iotdb.confignode.manager.load.LoadManager;
 import org.apache.iotdb.confignode.manager.load.balancer.router.RegionRouteMap;
 import org.apache.iotdb.confignode.manager.load.balancer.router.leader.GreedyLeaderBalancer;
 import org.apache.iotdb.confignode.manager.load.balancer.router.leader.ILeaderBalancer;
@@ -91,14 +92,14 @@
   private static final boolean IS_DATA_REGION_IOT_CONSENSUS =
       ConsensusFactory.IOT_CONSENSUS.equals(DATA_REGION_CONSENSUS_PROTOCOL_CLASS);
 
-  private final IManager configManager;
-
   // Key: RegionGroupId
   // Value: Pair<Timestamp, LeaderDataNodeId>, where
   // the left value stands for sampling timestamp
   // and the right value stands for the index of DataNode that leader resides.
   private final Map<TConsensusGroupId, Pair<Long, Integer>> leaderCache;
 
+  private final IManager configManager;
+
   /** RegionRouteMap */
   private final RegionRouteMap regionRouteMap;
   // For generating optimal RegionLeaderMap
@@ -107,6 +108,7 @@
   private final IPriorityBalancer priorityRouter;
 
   /** Leader Balancing service */
+  // TODO: leader balancing should be triggered by cluster events
   private Future<?> currentLeaderBalancingFuture;
 
   private final ScheduledExecutorService leaderBalancingExecutor =
@@ -115,9 +117,8 @@
 
   public RouteBalancer(IManager configManager) {
     this.configManager = configManager;
-
-    this.leaderCache = new ConcurrentHashMap<>();
     this.regionRouteMap = new RegionRouteMap();
+    this.leaderCache = new ConcurrentHashMap<>();
 
     switch (CONF.getLeaderDistributionPolicy()) {
       case ILeaderBalancer.GREEDY_POLICY:
@@ -141,10 +142,10 @@
   }
 
   /**
-   * Cache the newest leaderHeartbeatSample
+   * Cache the latest leader of a RegionGroup.
    *
-   * @param regionGroupId Corresponding RegionGroup's index
-   * @param leaderSample <Sample timestamp, leaderDataNodeId>, The newest HeartbeatSample
+   * @param regionGroupId the id of the RegionGroup
+   * @param leaderSample the latest leader of a RegionGroup
    */
   public void cacheLeaderSample(TConsensusGroupId regionGroupId, Pair<Long, Integer> leaderSample) {
     if (TConsensusGroupType.DataRegion.equals(regionGroupId.getType())
@@ -193,7 +194,7 @@
 
   private boolean updateRegionPriorityMap() {
     Map<TConsensusGroupId, Integer> regionLeaderMap = regionRouteMap.getRegionLeaderMap();
-    Map<Integer, Long> dataNodeLoadScoreMap = getNodeManager().getAllLoadScores();
+    Map<Integer, Long> dataNodeLoadScoreMap = getLoadManager().getAllDataNodeLoadScores();
 
     // Balancing region priority in each SchemaRegionGroup
     Map<TConsensusGroupId, TRegionReplicaSet> latestRegionPriorityMap =
@@ -412,4 +413,8 @@
   private PartitionManager getPartitionManager() {
     return configManager.getPartitionManager();
   }
+
+  private LoadManager getLoadManager() {
+    return configManager.getLoadManager();
+  }
 }
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/LoadCache.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/LoadCache.java
new file mode 100644
index 0000000..0b2fd71
--- /dev/null
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/LoadCache.java
@@ -0,0 +1,452 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.manager.load.cache;
+
+import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation;
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
+import org.apache.iotdb.common.rpc.thrift.TDataNodeConfiguration;
+import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
+import org.apache.iotdb.commons.cluster.NodeStatus;
+import org.apache.iotdb.commons.cluster.NodeType;
+import org.apache.iotdb.commons.cluster.RegionStatus;
+import org.apache.iotdb.confignode.manager.IManager;
+import org.apache.iotdb.confignode.manager.load.cache.node.BaseNodeCache;
+import org.apache.iotdb.confignode.manager.load.cache.node.ConfigNodeHeartbeatCache;
+import org.apache.iotdb.confignode.manager.load.cache.node.DataNodeHeartbeatCache;
+import org.apache.iotdb.confignode.manager.load.cache.node.NodeHeartbeatSample;
+import org.apache.iotdb.confignode.manager.load.cache.node.NodeStatistics;
+import org.apache.iotdb.confignode.manager.load.cache.region.RegionGroupCache;
+import org.apache.iotdb.confignode.manager.load.cache.region.RegionGroupStatistics;
+import org.apache.iotdb.confignode.manager.load.cache.region.RegionHeartbeatSample;
+import org.apache.iotdb.confignode.manager.partition.RegionGroupStatus;
+import org.apache.iotdb.tsfile.utils.Pair;
+
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+
+/** Maintain all kinds of heartbeat samples. */
+public class LoadCache {
+
+  // Map<NodeId, INodeCache>
+  private final Map<Integer, BaseNodeCache> nodeCacheMap;
+  // Map<RegionGroupId, RegionGroupCache>
+  private final Map<TConsensusGroupId, RegionGroupCache> regionGroupCacheMap;
+
+  public LoadCache() {
+    this.nodeCacheMap = new ConcurrentHashMap<>();
+    this.regionGroupCacheMap = new ConcurrentHashMap<>();
+  }
+
+  public void initHeartbeatCache(IManager configManager) {
+    initNodeHeartbeatCache(
+        configManager.getNodeManager().getRegisteredConfigNodes(),
+        configManager.getNodeManager().getRegisteredDataNodes());
+    initRegionGroupHeartbeatCache(configManager.getPartitionManager().getAllReplicaSets());
+  }
+
+  /** Initialize the nodeCacheMap when the ConfigNode-Leader is switched. */
+  private void initNodeHeartbeatCache(
+      List<TConfigNodeLocation> registeredConfigNodes,
+      List<TDataNodeConfiguration> registeredDataNodes) {
+
+    final int CURRENT_NODE_ID = ConfigNodeHeartbeatCache.CURRENT_NODE_ID;
+    nodeCacheMap.clear();
+
+    // Init ConfigNodeHeartbeatCache
+    registeredConfigNodes.forEach(
+        configNodeLocation -> {
+          int configNodeId = configNodeLocation.getConfigNodeId();
+          if (configNodeId != CURRENT_NODE_ID) {
+            nodeCacheMap.put(configNodeId, new ConfigNodeHeartbeatCache(configNodeId));
+          }
+        });
+    // Force set itself and never update
+    nodeCacheMap.put(
+        ConfigNodeHeartbeatCache.CURRENT_NODE_ID,
+        new ConfigNodeHeartbeatCache(
+            CURRENT_NODE_ID, ConfigNodeHeartbeatCache.CURRENT_NODE_STATISTICS));
+
+    // Init DataNodeHeartbeatCache
+    registeredDataNodes.forEach(
+        dataNodeConfiguration -> {
+          int dataNodeId = dataNodeConfiguration.getLocation().getDataNodeId();
+          nodeCacheMap.put(dataNodeId, new DataNodeHeartbeatCache(dataNodeId));
+        });
+  }
+
+  /** Initialize the regionGroupCacheMap when the ConfigNode-Leader is switched. */
+  private void initRegionGroupHeartbeatCache(List<TRegionReplicaSet> regionReplicaSets) {
+    regionGroupCacheMap.clear();
+    regionReplicaSets.forEach(
+        regionReplicaSet ->
+            regionGroupCacheMap.put(
+                regionReplicaSet.getRegionId(),
+                new RegionGroupCache(regionReplicaSet.getRegionId())));
+  }
+
+  public void clearHeartbeatCache() {
+    nodeCacheMap.clear();
+    regionGroupCacheMap.clear();
+  }
+
+  /**
+   * Cache the latest heartbeat sample of a ConfigNode.
+   *
+   * @param nodeId the id of the ConfigNode
+   * @param sample the latest heartbeat sample
+   */
+  public void cacheConfigNodeHeartbeatSample(int nodeId, NodeHeartbeatSample sample) {
+    nodeCacheMap
+        .computeIfAbsent(nodeId, empty -> new ConfigNodeHeartbeatCache(nodeId))
+        .cacheHeartbeatSample(sample);
+  }
+
+  /**
+   * Cache the latest heartbeat sample of a DataNode.
+   *
+   * @param nodeId the id of the DataNode
+   * @param sample the latest heartbeat sample
+   */
+  public void cacheDataNodeHeartbeatSample(int nodeId, NodeHeartbeatSample sample) {
+    nodeCacheMap
+        .computeIfAbsent(nodeId, empty -> new DataNodeHeartbeatCache(nodeId))
+        .cacheHeartbeatSample(sample);
+  }
+
+  /**
+   * Cache the latest heartbeat sample of a RegionGroup.
+   *
+   * @param regionGroupId the id of the RegionGroup
+   * @param nodeId the id of the DataNode where specified Region resides
+   * @param sample the latest heartbeat sample
+   */
+  public void cacheRegionHeartbeatSample(
+      TConsensusGroupId regionGroupId, int nodeId, RegionHeartbeatSample sample) {
+    regionGroupCacheMap
+        .computeIfAbsent(regionGroupId, empty -> new RegionGroupCache(regionGroupId))
+        .cacheHeartbeatSample(nodeId, sample);
+  }
+
+  /**
+   * Periodic invoke to update the NodeStatistics of all Nodes.
+   *
+   * @return a map of changed NodeStatistics
+   */
+  public Map<Integer, Pair<NodeStatistics, NodeStatistics>> updateNodeStatistics() {
+    Map<Integer, Pair<NodeStatistics, NodeStatistics>> differentNodeStatisticsMap =
+        new ConcurrentHashMap<>();
+    nodeCacheMap.forEach(
+        (nodeId, nodeCache) -> {
+          NodeStatistics preNodeStatistics = nodeCache.getPreviousStatistics().deepCopy();
+          if (nodeCache.periodicUpdate()) {
+            // Update and record the changed NodeStatistics
+            differentNodeStatisticsMap.put(
+                nodeId, new Pair<>(nodeCache.getStatistics(), preNodeStatistics));
+          }
+        });
+    return differentNodeStatisticsMap;
+  }
+
+  /**
+   * Periodic invoke to update the RegionGroupStatistics of all RegionGroups.
+   *
+   * @return a map of changed RegionGroupStatistics
+   */
+  public Map<TConsensusGroupId, RegionGroupStatistics> updateRegionGroupStatistics() {
+    Map<TConsensusGroupId, RegionGroupStatistics> differentRegionGroupStatisticsMap =
+        new ConcurrentHashMap<>();
+    regionGroupCacheMap.forEach(
+        (regionGroupId, regionGroupCache) -> {
+          if (regionGroupCache.periodicUpdate()) {
+            // Update and record the changed RegionGroupStatistics
+            differentRegionGroupStatisticsMap.put(regionGroupId, regionGroupCache.getStatistics());
+          }
+        });
+    return differentRegionGroupStatisticsMap;
+  }
+
+  /**
+   * Safely get NodeStatus by NodeId.
+   *
+   * @param nodeId The specified NodeId
+   * @return NodeStatus of the specified Node. Unknown if cache doesn't exist.
+   */
+  public NodeStatus getNodeStatus(int nodeId) {
+    BaseNodeCache nodeCache = nodeCacheMap.get(nodeId);
+    return nodeCache == null ? NodeStatus.Unknown : nodeCache.getNodeStatus();
+  }
+
+  /**
+   * Safely get the specified Node's current status with reason.
+   *
+   * @param nodeId The specified NodeId
+   * @return The specified Node's current status if the nodeCache contains it, Unknown otherwise
+   */
+  public String getNodeStatusWithReason(int nodeId) {
+    BaseNodeCache nodeCache = nodeCacheMap.get(nodeId);
+    return nodeCache == null
+        ? NodeStatus.Unknown.getStatus() + "(NoHeartbeat)"
+        : nodeCache.getNodeStatusWithReason();
+  }
+
+  /**
+   * Get all Node's current status with reason.
+   *
+   * @return Map<NodeId, NodeStatus with reason>
+   */
+  public Map<Integer, String> getNodeStatusWithReason() {
+    return nodeCacheMap.entrySet().stream()
+        .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().getNodeStatusWithReason()));
+  }
+
+  /**
+   * Filter ConfigNodes through the specified NodeStatus.
+   *
+   * @param status The specified NodeStatus
+   * @return Filtered ConfigNodes with the specified NodeStatus
+   */
+  public List<Integer> filterConfigNodeThroughStatus(NodeStatus... status) {
+    return nodeCacheMap.entrySet().stream()
+        .filter(
+            nodeCacheEntry ->
+                nodeCacheEntry.getValue() instanceof ConfigNodeHeartbeatCache
+                    && Arrays.stream(status)
+                        .anyMatch(s -> s.equals(nodeCacheEntry.getValue().getNodeStatus())))
+        .map(Map.Entry::getKey)
+        .collect(Collectors.toList());
+  }
+
+  /**
+   * Filter DataNodes through the specified NodeStatus.
+   *
+   * @param status The specified NodeStatus
+   * @return Filtered DataNodes with the specified NodeStatus
+   */
+  public List<Integer> filterDataNodeThroughStatus(NodeStatus... status) {
+    return nodeCacheMap.entrySet().stream()
+        .filter(
+            nodeCacheEntry ->
+                nodeCacheEntry.getValue() instanceof DataNodeHeartbeatCache
+                    && Arrays.stream(status)
+                        .anyMatch(s -> s.equals(nodeCacheEntry.getValue().getNodeStatus())))
+        .map(Map.Entry::getKey)
+        .collect(Collectors.toList());
+  }
+
+  /**
+   * Get the free disk space of the specified DataNode.
+   *
+   * @param dataNodeId The index of the specified DataNode
+   * @return The free disk space that sample through heartbeat, 0 if no heartbeat received
+   */
+  public double getFreeDiskSpace(int dataNodeId) {
+    DataNodeHeartbeatCache dataNodeHeartbeatCache =
+        (DataNodeHeartbeatCache) nodeCacheMap.get(dataNodeId);
+    return dataNodeHeartbeatCache == null ? 0d : dataNodeHeartbeatCache.getFreeDiskSpace();
+  }
+
+  /**
+   * Get the loadScore of each DataNode.
+   *
+   * @return Map<DataNodeId, loadScore>
+   */
+  public Map<Integer, Long> getAllDataNodeLoadScores() {
+    Map<Integer, Long> result = new ConcurrentHashMap<>();
+    nodeCacheMap.forEach(
+        (dataNodeId, heartbeatCache) -> {
+          if (heartbeatCache instanceof DataNodeHeartbeatCache) {
+            result.put(dataNodeId, heartbeatCache.getLoadScore());
+          }
+        });
+    return result;
+  }
+
+  /**
+   * Get the lowest loadScore DataNode.
+   *
+   * @return The index of the lowest loadScore DataNode. -1 if no DataNode heartbeat received.
+   */
+  public int getLowestLoadDataNode() {
+    return nodeCacheMap.entrySet().stream()
+        .filter(nodeCacheEntry -> nodeCacheEntry.getValue() instanceof DataNodeHeartbeatCache)
+        .min(Comparator.comparingLong(nodeCacheEntry -> nodeCacheEntry.getValue().getLoadScore()))
+        .map(Map.Entry::getKey)
+        .orElse(-1);
+  }
+
+  /**
+   * Get the lowest loadScore DataNode from the specified DataNodes.
+   *
+   * @param dataNodeIds The specified DataNodes
+   * @return The index of the lowest loadScore DataNode. -1 if no DataNode heartbeat received.
+   */
+  public int getLowestLoadDataNode(List<Integer> dataNodeIds) {
+    return dataNodeIds.stream()
+        .map(nodeCacheMap::get)
+        .filter(Objects::nonNull)
+        .min(Comparator.comparingLong(BaseNodeCache::getLoadScore))
+        .map(BaseNodeCache::getNodeId)
+        .orElse(-1);
+  }
+
+  /**
+   * Force update the specified Node's cache.
+   *
+   * @param nodeType Specified NodeType
+   * @param nodeId Specified NodeId
+   * @param heartbeatSample Specified NodeHeartbeatSample
+   */
+  public void forceUpdateNodeCache(
+      NodeType nodeType, int nodeId, NodeHeartbeatSample heartbeatSample) {
+    switch (nodeType) {
+      case ConfigNode:
+        nodeCacheMap
+            .computeIfAbsent(nodeId, empty -> new ConfigNodeHeartbeatCache(nodeId))
+            .forceUpdate(heartbeatSample);
+        break;
+      case DataNode:
+      default:
+        nodeCacheMap
+            .computeIfAbsent(nodeId, empty -> new DataNodeHeartbeatCache(nodeId))
+            .forceUpdate(heartbeatSample);
+        break;
+    }
+  }
+
+  /** Remove the specified Node's cache. */
+  public void removeNodeCache(int nodeId) {
+    nodeCacheMap.remove(nodeId);
+  }
+
+  /**
+   * Safely get RegionStatus.
+   *
+   * @param consensusGroupId Specified RegionGroupId
+   * @param dataNodeId Specified RegionReplicaId
+   * @return Corresponding RegionStatus if cache exists, Unknown otherwise
+   */
+  public RegionStatus getRegionStatus(TConsensusGroupId consensusGroupId, int dataNodeId) {
+    return regionGroupCacheMap.containsKey(consensusGroupId)
+        ? regionGroupCacheMap.get(consensusGroupId).getStatistics().getRegionStatus(dataNodeId)
+        : RegionStatus.Unknown;
+  }
+
+  /**
+   * Safely get RegionGroupStatus.
+   *
+   * @param consensusGroupId Specified RegionGroupId
+   * @return Corresponding RegionGroupStatus if cache exists, Disabled otherwise
+   */
+  public RegionGroupStatus getRegionGroupStatus(TConsensusGroupId consensusGroupId) {
+    return regionGroupCacheMap.containsKey(consensusGroupId)
+        ? regionGroupCacheMap.get(consensusGroupId).getStatistics().getRegionGroupStatus()
+        : RegionGroupStatus.Disabled;
+  }
+
+  /**
+   * Safely get RegionGroupStatus.
+   *
+   * @param consensusGroupIds Specified RegionGroupIds
+   * @return Corresponding RegionGroupStatus if cache exists, Disabled otherwise
+   */
+  public Map<TConsensusGroupId, RegionGroupStatus> getRegionGroupStatus(
+      List<TConsensusGroupId> consensusGroupIds) {
+    Map<TConsensusGroupId, RegionGroupStatus> regionGroupStatusMap = new ConcurrentHashMap<>();
+    for (TConsensusGroupId consensusGroupId : consensusGroupIds) {
+      regionGroupStatusMap.put(consensusGroupId, getRegionGroupStatus(consensusGroupId));
+    }
+    return regionGroupStatusMap;
+  }
+
+  /**
+   * Filter the RegionGroups through the RegionGroupStatus.
+   *
+   * @param status The specified RegionGroupStatus
+   * @return Filtered RegionGroups with the specified RegionGroupStatus
+   */
+  public List<TConsensusGroupId> filterRegionGroupThroughStatus(RegionGroupStatus... status) {
+    return regionGroupCacheMap.entrySet().stream()
+        .filter(
+            regionGroupCacheEntry ->
+                Arrays.stream(status)
+                    .anyMatch(
+                        s ->
+                            s.equals(
+                                regionGroupCacheEntry
+                                    .getValue()
+                                    .getStatistics()
+                                    .getRegionGroupStatus())))
+        .map(Map.Entry::getKey)
+        .collect(Collectors.toList());
+  }
+
+  /**
+   * Count the number of cluster Regions with specified RegionStatus.
+   *
+   * @param type The specified RegionGroupType
+   * @param status The specified statues
+   * @return The number of cluster Regions with specified RegionStatus
+   */
+  public int countRegionWithSpecifiedStatus(TConsensusGroupType type, RegionStatus... status) {
+    AtomicInteger result = new AtomicInteger(0);
+    regionGroupCacheMap.forEach(
+        (regionGroupId, regionGroupCache) -> {
+          if (type.equals(regionGroupId.getType())) {
+            regionGroupCache
+                .getStatistics()
+                .getRegionStatisticsMap()
+                .values()
+                .forEach(
+                    regionStatistics -> {
+                      if (Arrays.stream(status)
+                          .anyMatch(s -> s.equals(regionStatistics.getRegionStatus()))) {
+                        result.getAndIncrement();
+                      }
+                    });
+          }
+        });
+    return result.get();
+  }
+
+  /**
+   * Force update the specified RegionGroup's cache.
+   *
+   * @param regionGroupId Specified RegionGroupId
+   * @param heartbeatSampleMap Specified RegionHeartbeatSampleMap
+   */
+  public void forceUpdateRegionGroupCache(
+      TConsensusGroupId regionGroupId, Map<Integer, RegionHeartbeatSample> heartbeatSampleMap) {
+    regionGroupCacheMap
+        .computeIfAbsent(regionGroupId, empty -> new RegionGroupCache(regionGroupId))
+        .forceUpdate(heartbeatSampleMap);
+  }
+
+  /** Remove the specified RegionGroup's cache. */
+  public void removeRegionGroupCache(TConsensusGroupId consensusGroupId) {
+    regionGroupCacheMap.remove(consensusGroupId);
+  }
+}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/heartbeat/BaseNodeCache.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/node/BaseNodeCache.java
similarity index 74%
rename from confignode/src/main/java/org/apache/iotdb/confignode/manager/node/heartbeat/BaseNodeCache.java
rename to confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/node/BaseNodeCache.java
index 24f9a9e..65a1ccc 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/heartbeat/BaseNodeCache.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/node/BaseNodeCache.java
@@ -16,13 +16,15 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.confignode.manager.node.heartbeat;
+
+package org.apache.iotdb.confignode.manager.load.cache.node;
 
 import org.apache.iotdb.commons.cluster.NodeStatus;
 
 import java.util.LinkedList;
+import java.util.concurrent.atomic.AtomicReference;
 
-/** All the statistic interfaces that provided by HeartbeatCache */
+/** All the statistic interfaces that provided by HeartbeatCache. */
 public abstract class BaseNodeCache {
 
   // When the response time of heartbeat is more than 20s, the Node is considered as down
@@ -31,23 +33,26 @@
   // Max heartbeat cache samples store size
   public static final int MAXIMUM_WINDOW_SIZE = 100;
 
+  protected final int nodeId;
+
   // SlidingWindow stores the heartbeat sample data
   protected final LinkedList<NodeHeartbeatSample> slidingWindow = new LinkedList<>();
 
   // The previous NodeStatistics, used for comparing with
   // the current NodeStatistics to initiate notification when they are different
-  protected volatile NodeStatistics previousStatistics;
+  protected AtomicReference<NodeStatistics> previousStatistics;
   // The current NodeStatistics, used for providing statistics to other services
-  protected volatile NodeStatistics currentStatistics;
+  protected AtomicReference<NodeStatistics> currentStatistics;
 
-  /** Constructor for NodeCache with default NodeStatistics */
-  protected BaseNodeCache() {
-    this.previousStatistics = NodeStatistics.generateDefaultNodeStatistics();
-    this.currentStatistics = NodeStatistics.generateDefaultNodeStatistics();
+  /** Constructor for NodeCache with default NodeStatistics. */
+  protected BaseNodeCache(int nodeId) {
+    this.nodeId = nodeId;
+    this.previousStatistics = new AtomicReference<>(NodeStatistics.generateDefaultNodeStatistics());
+    this.currentStatistics = new AtomicReference<>(NodeStatistics.generateDefaultNodeStatistics());
   }
 
   /**
-   * Cache the newest NodeHeartbeatSample
+   * Cache the newest NodeHeartbeatSample.
    *
    * @param newHeartbeatSample The newest NodeHeartbeatSample
    */
@@ -69,15 +74,15 @@
   /**
    * Invoking periodically in the Cluster-LoadStatistics-Service to update currentStatistics and
    * compare with the previousStatistics, in order to detect whether the Node's statistics has
-   * changed
+   * changed.
    *
    * @return True if the currentStatistics has changed recently(compare with the
    *     previousStatistics), false otherwise
    */
   public boolean periodicUpdate() {
     updateCurrentStatistics();
-    if (!currentStatistics.equals(previousStatistics)) {
-      previousStatistics = currentStatistics.deepCopy();
+    if (!currentStatistics.get().equals(previousStatistics.get())) {
+      previousStatistics.set(currentStatistics.get());
       return true;
     } else {
       return false;
@@ -103,42 +108,42 @@
   }
 
   /**
-   * Update currentStatistics based on recent NodeHeartbeatSamples that cached in the slidingWindow
+   * Update currentStatistics based on recent NodeHeartbeatSamples that cached in the slidingWindow.
    */
   protected abstract void updateCurrentStatistics();
 
+  public int getNodeId() {
+    return nodeId;
+  }
+
   /**
    * TODO: The loadScore of each Node will be changed to Double
    *
    * @return The latest load score of a node, the higher the score the higher the load
    */
   public long getLoadScore() {
-    return currentStatistics.getLoadScore();
+    return currentStatistics.get().getLoadScore();
   }
 
-  /** @return The current status of the Node */
+  /** @return The current status of the Node. */
   public NodeStatus getNodeStatus() {
     // Return a copy of status
-    return NodeStatus.parse(currentStatistics.getStatus().getStatus());
+    return NodeStatus.parse(currentStatistics.get().getStatus().getStatus());
   }
 
-  /** @return The reason why lead to current NodeStatus */
+  /** @return The reason why lead to current NodeStatus. */
   public String getNodeStatusWithReason() {
-    if (currentStatistics.getStatusReason() == null) {
-      return currentStatistics.getStatus().getStatus();
-    } else {
-      return currentStatistics.getStatus().getStatus()
-          + "("
-          + currentStatistics.getStatusReason()
-          + ")";
-    }
+    NodeStatistics statistics = this.currentStatistics.get();
+    return statistics.getStatusReason() == null
+        ? statistics.getStatus().getStatus()
+        : statistics.getStatus().getStatus() + "(" + statistics.getStatusReason() + ")";
   }
 
   public NodeStatistics getStatistics() {
-    return currentStatistics;
+    return currentStatistics.get();
   }
 
   public NodeStatistics getPreviousStatistics() {
-    return previousStatistics;
+    return previousStatistics.get();
   }
 }
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/heartbeat/ConfigNodeHeartbeatCache.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/node/ConfigNodeHeartbeatCache.java
similarity index 80%
rename from confignode/src/main/java/org/apache/iotdb/confignode/manager/node/heartbeat/ConfigNodeHeartbeatCache.java
rename to confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/node/ConfigNodeHeartbeatCache.java
index 17fc9b1..4e7ee07 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/heartbeat/ConfigNodeHeartbeatCache.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/node/ConfigNodeHeartbeatCache.java
@@ -16,40 +16,39 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.confignode.manager.node.heartbeat;
+
+package org.apache.iotdb.confignode.manager.load.cache.node;
 
 import org.apache.iotdb.commons.cluster.NodeStatus;
 import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
 
+import java.util.concurrent.atomic.AtomicReference;
+
 public class ConfigNodeHeartbeatCache extends BaseNodeCache {
 
-  /** Only get CURRENT_NODE_ID here due to initialization order */
+  /** Only get CURRENT_NODE_ID here due to initialization order. */
   public static final int CURRENT_NODE_ID =
       ConfigNodeDescriptor.getInstance().getConf().getConfigNodeId();
 
   public static final NodeStatistics CURRENT_NODE_STATISTICS =
       new NodeStatistics(0, NodeStatus.Running, null);
 
-  private final int configNodeId;
-
-  /** Constructor for create ConfigNodeHeartbeatCache with default NodeStatistics */
+  /** Constructor for create ConfigNodeHeartbeatCache with default NodeStatistics. */
   public ConfigNodeHeartbeatCache(int configNodeId) {
-    super();
-    this.configNodeId = configNodeId;
+    super(configNodeId);
   }
 
-  /** Constructor only for ConfigNode-leader */
+  /** Constructor only for ConfigNode-leader. */
   public ConfigNodeHeartbeatCache(int configNodeId, NodeStatistics statistics) {
-    super();
-    this.configNodeId = configNodeId;
-    this.previousStatistics = statistics;
-    this.currentStatistics = statistics;
+    super(configNodeId);
+    this.previousStatistics = new AtomicReference<>(statistics);
+    this.currentStatistics = new AtomicReference<>(statistics);
   }
 
   @Override
   protected void updateCurrentStatistics() {
     // Skip itself
-    if (configNodeId == CURRENT_NODE_ID) {
+    if (nodeId == CURRENT_NODE_ID) {
       return;
     }
 
@@ -76,9 +75,9 @@
     long loadScore = NodeStatus.isNormalStatus(status) ? 0 : Long.MAX_VALUE;
 
     NodeStatistics newStatistics = new NodeStatistics(loadScore, status, null);
-    if (!currentStatistics.equals(newStatistics)) {
+    if (!currentStatistics.get().equals(newStatistics)) {
       // Update the current NodeStatistics if necessary
-      currentStatistics = newStatistics;
+      currentStatistics.set(newStatistics);
     }
   }
 }
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/heartbeat/DataNodeHeartbeatCache.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/node/DataNodeHeartbeatCache.java
similarity index 80%
rename from confignode/src/main/java/org/apache/iotdb/confignode/manager/node/heartbeat/DataNodeHeartbeatCache.java
rename to confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/node/DataNodeHeartbeatCache.java
index 5754d27..c53f6dd 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/heartbeat/DataNodeHeartbeatCache.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/node/DataNodeHeartbeatCache.java
@@ -16,20 +16,23 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.confignode.manager.node.heartbeat;
+
+package org.apache.iotdb.confignode.manager.load.cache.node;
 
 import org.apache.iotdb.commons.cluster.NodeStatus;
 import org.apache.iotdb.mpp.rpc.thrift.TLoadSample;
 
-/** DataNodeHeartbeatCache caches and maintains all the heartbeat data */
+import java.util.concurrent.atomic.AtomicReference;
+
+/** DataNodeHeartbeatCache caches and maintains all the heartbeat data. */
 public class DataNodeHeartbeatCache extends BaseNodeCache {
 
-  private volatile TLoadSample latestLoadSample;
+  private final AtomicReference<TLoadSample> latestLoadSample;
 
-  /** Constructor for create DataNodeHeartbeatCache with default NodeStatistics */
-  public DataNodeHeartbeatCache() {
-    super();
-    this.latestLoadSample = new TLoadSample();
+  /** Constructor for create DataNodeHeartbeatCache with default NodeStatistics. */
+  public DataNodeHeartbeatCache(int dataNodeId) {
+    super(dataNodeId);
+    this.latestLoadSample = new AtomicReference<>(new TLoadSample());
   }
 
   @Override
@@ -44,7 +47,7 @@
 
     /* Update load sample */
     if (lastSample != null && lastSample.isSetLoadSample()) {
-      latestLoadSample = lastSample.getLoadSample();
+      latestLoadSample.set(lastSample.getLoadSample());
     }
 
     /* Update Node status */
@@ -64,13 +67,13 @@
     long loadScore = NodeStatus.isNormalStatus(status) ? 0 : Long.MAX_VALUE;
 
     NodeStatistics newStatistics = new NodeStatistics(loadScore, status, statusReason);
-    if (!currentStatistics.equals(newStatistics)) {
+    if (!currentStatistics.get().equals(newStatistics)) {
       // Update the current NodeStatistics if necessary
-      currentStatistics = newStatistics;
+      currentStatistics.set(newStatistics);
     }
   }
 
   public double getFreeDiskSpace() {
-    return latestLoadSample.getFreeDiskSpace();
+    return latestLoadSample.get().getFreeDiskSpace();
   }
 }
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/heartbeat/NodeHeartbeatSample.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/node/NodeHeartbeatSample.java
similarity index 94%
rename from confignode/src/main/java/org/apache/iotdb/confignode/manager/node/heartbeat/NodeHeartbeatSample.java
rename to confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/node/NodeHeartbeatSample.java
index dceff72..c188978 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/heartbeat/NodeHeartbeatSample.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/node/NodeHeartbeatSample.java
@@ -16,7 +16,8 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.confignode.manager.node.heartbeat;
+
+package org.apache.iotdb.confignode.manager.load.cache.node;
 
 import org.apache.iotdb.commons.cluster.NodeStatus;
 import org.apache.iotdb.mpp.rpc.thrift.THeartbeatResp;
@@ -33,7 +34,7 @@
 
   private TLoadSample loadSample = null;
 
-  /** Constructor for ConfigNode sample */
+  /** Constructor for ConfigNode sample. */
   public NodeHeartbeatSample(long sendTimestamp, long receiveTimestamp) {
     this.sendTimestamp = sendTimestamp;
     this.receiveTimestamp = receiveTimestamp;
@@ -41,7 +42,7 @@
     this.statusReason = null;
   }
 
-  /** Constructor for DataNode sample */
+  /** Constructor for DataNode sample. */
   public NodeHeartbeatSample(THeartbeatResp heartbeatResp, long receiveTimestamp) {
     this.sendTimestamp = heartbeatResp.getHeartbeatTimestamp();
     this.receiveTimestamp = receiveTimestamp;
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/heartbeat/NodeStatistics.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/node/NodeStatistics.java
similarity index 95%
rename from confignode/src/main/java/org/apache/iotdb/confignode/manager/node/heartbeat/NodeStatistics.java
rename to confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/node/NodeStatistics.java
index 627ac00..10399b8 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/heartbeat/NodeStatistics.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/node/NodeStatistics.java
@@ -16,7 +16,8 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.confignode.manager.node.heartbeat;
+
+package org.apache.iotdb.confignode.manager.load.cache.node;
 
 import org.apache.iotdb.commons.cluster.NodeStatus;
 import org.apache.iotdb.mpp.rpc.thrift.THeartbeatResp;
@@ -111,8 +112,12 @@
 
   @Override
   public boolean equals(Object o) {
-    if (this == o) return true;
-    if (o == null || getClass() != o.getClass()) return false;
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
     NodeStatistics that = (NodeStatistics) o;
     return loadScore == that.loadScore
         && status == that.status
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/heartbeat/RegionCache.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/region/RegionCache.java
similarity index 89%
rename from confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/heartbeat/RegionCache.java
rename to confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/region/RegionCache.java
index 706b40e..a395c10 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/heartbeat/RegionCache.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/region/RegionCache.java
@@ -16,7 +16,8 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.confignode.manager.partition.heartbeat;
+
+package org.apache.iotdb.confignode.manager.load.cache.region;
 
 import org.apache.iotdb.commons.cluster.RegionStatus;
 
@@ -24,8 +25,8 @@
 import java.util.LinkedList;
 import java.util.List;
 
-import static org.apache.iotdb.confignode.manager.node.heartbeat.BaseNodeCache.HEARTBEAT_TIMEOUT_TIME;
-import static org.apache.iotdb.confignode.manager.node.heartbeat.BaseNodeCache.MAXIMUM_WINDOW_SIZE;
+import static org.apache.iotdb.confignode.manager.load.cache.node.BaseNodeCache.HEARTBEAT_TIMEOUT_TIME;
+import static org.apache.iotdb.confignode.manager.load.cache.node.BaseNodeCache.MAXIMUM_WINDOW_SIZE;
 
 public class RegionCache {
 
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/heartbeat/RegionGroupCache.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/region/RegionGroupCache.java
similarity index 86%
rename from confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/heartbeat/RegionGroupCache.java
rename to confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/region/RegionGroupCache.java
index 7e49067..a531e44 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/heartbeat/RegionGroupCache.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/region/RegionGroupCache.java
@@ -16,7 +16,8 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.confignode.manager.partition.heartbeat;
+
+package org.apache.iotdb.confignode.manager.load.cache.region;
 
 import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
 import org.apache.iotdb.commons.cluster.RegionStatus;
@@ -25,6 +26,7 @@
 import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicReference;
 
 public class RegionGroupCache {
 
@@ -35,21 +37,23 @@
 
   // The previous RegionGroupStatistics, used for comparing with
   // the current RegionGroupStatistics to initiate notification when they are different
-  protected volatile RegionGroupStatistics previousStatistics;
+  protected AtomicReference<RegionGroupStatistics> previousStatistics;
   // The current RegionGroupStatistics, used for providing statistics to other services
-  private volatile RegionGroupStatistics currentStatistics;
+  private final AtomicReference<RegionGroupStatistics> currentStatistics;
 
-  /** Constructor for create RegionGroupCache with default RegionGroupStatistics */
+  /** Constructor for create RegionGroupCache with default RegionGroupStatistics. */
   public RegionGroupCache(TConsensusGroupId consensusGroupId) {
     this.consensusGroupId = consensusGroupId;
     this.regionCacheMap = new ConcurrentHashMap<>();
 
-    this.previousStatistics = RegionGroupStatistics.generateDefaultRegionGroupStatistics();
-    this.currentStatistics = RegionGroupStatistics.generateDefaultRegionGroupStatistics();
+    this.previousStatistics =
+        new AtomicReference<>(RegionGroupStatistics.generateDefaultRegionGroupStatistics());
+    this.currentStatistics =
+        new AtomicReference<>(RegionGroupStatistics.generateDefaultRegionGroupStatistics());
   }
 
   /**
-   * Cache the newest RegionHeartbeatSample
+   * Cache the newest RegionHeartbeatSample.
    *
    * @param dataNodeId Where the specified Region resides
    * @param newHeartbeatSample The newest RegionHeartbeatSample
@@ -63,15 +67,15 @@
   /**
    * Invoking periodically in the Cluster-LoadStatistics-Service to update currentStatistics and
    * compare with the previousStatistics, in order to detect whether the RegionGroup's statistics
-   * has changed
+   * has changed.
    *
    * @return True if the currentStatistics has changed recently(compare with the
    *     previousStatistics), false otherwise
    */
   public boolean periodicUpdate() {
     updateCurrentStatistics();
-    if (!currentStatistics.equals(previousStatistics)) {
-      previousStatistics = currentStatistics.deepCopy();
+    if (!currentStatistics.get().equals(previousStatistics.get())) {
+      previousStatistics.set(currentStatistics.get());
       return true;
     } else {
       return false;
@@ -99,7 +103,7 @@
   }
 
   /**
-   * Update currentStatistics based on recent NodeHeartbeatSamples that cached in the slidingWindow
+   * Update currentStatistics based on recent NodeHeartbeatSamples that cached in the slidingWindow.
    */
   protected void updateCurrentStatistics() {
     Map<Integer, RegionStatistics> regionStatisticsMap = new HashMap<>();
@@ -114,9 +118,9 @@
 
     RegionGroupStatistics newRegionGroupStatistics =
         new RegionGroupStatistics(status, regionStatisticsMap);
-    if (!currentStatistics.equals(newRegionGroupStatistics)) {
+    if (!currentStatistics.get().equals(newRegionGroupStatistics)) {
       // Update RegionGroupStatistics if necessary
-      currentStatistics = newRegionGroupStatistics;
+      currentStatistics.set(newRegionGroupStatistics);
     }
   }
 
@@ -156,11 +160,7 @@
     }
   }
 
-  public void removeCacheIfExists(int dataNodeId) {
-    regionCacheMap.remove(dataNodeId);
-  }
-
   public RegionGroupStatistics getStatistics() {
-    return currentStatistics;
+    return currentStatistics.get();
   }
 }
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/heartbeat/RegionGroupStatistics.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/region/RegionGroupStatistics.java
similarity index 94%
rename from confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/heartbeat/RegionGroupStatistics.java
rename to confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/region/RegionGroupStatistics.java
index d36175b..8dc7a4c 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/heartbeat/RegionGroupStatistics.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/region/RegionGroupStatistics.java
@@ -16,7 +16,8 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.confignode.manager.partition.heartbeat;
+
+package org.apache.iotdb.confignode.manager.load.cache.region;
 
 import org.apache.iotdb.commons.cluster.RegionStatus;
 import org.apache.iotdb.confignode.manager.partition.RegionGroupStatus;
@@ -32,8 +33,7 @@
 
 public class RegionGroupStatistics {
 
-  private RegionGroupStatus regionGroupStatus;
-
+  private volatile RegionGroupStatus regionGroupStatus;
   private final Map<Integer, RegionStatistics> regionStatisticsMap;
 
   public RegionGroupStatistics() {
@@ -51,7 +51,7 @@
   }
 
   /**
-   * Get the specified Region's status
+   * Get the specified Region's status.
    *
    * @param dataNodeId Where the Region resides
    * @return Region's latest status if received heartbeat recently, Unknown otherwise
@@ -116,8 +116,12 @@
 
   @Override
   public boolean equals(Object o) {
-    if (this == o) return true;
-    if (o == null || getClass() != o.getClass()) return false;
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
     RegionGroupStatistics that = (RegionGroupStatistics) o;
     return regionGroupStatus == that.regionGroupStatus
         && regionStatisticsMap.equals(that.regionStatisticsMap);
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/heartbeat/RegionHeartbeatSample.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/region/RegionHeartbeatSample.java
similarity index 95%
rename from confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/heartbeat/RegionHeartbeatSample.java
rename to confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/region/RegionHeartbeatSample.java
index 8de58a5..e55497b 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/heartbeat/RegionHeartbeatSample.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/region/RegionHeartbeatSample.java
@@ -16,7 +16,8 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.confignode.manager.partition.heartbeat;
+
+package org.apache.iotdb.confignode.manager.load.cache.region;
 
 import org.apache.iotdb.commons.cluster.RegionStatus;
 
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/heartbeat/RegionStatistics.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/region/RegionStatistics.java
similarity index 86%
rename from confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/heartbeat/RegionStatistics.java
rename to confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/region/RegionStatistics.java
index d30bf43..c8d5106 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/heartbeat/RegionStatistics.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/region/RegionStatistics.java
@@ -16,7 +16,8 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.confignode.manager.partition.heartbeat;
+
+package org.apache.iotdb.confignode.manager.load.cache.region;
 
 import org.apache.iotdb.commons.cluster.RegionStatus;
 import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
@@ -47,11 +48,6 @@
     return new RegionStatistics(regionStatus);
   }
 
-  public RegionHeartbeatSample convertToRegionHeartbeatSample() {
-    long currentTime = System.currentTimeMillis();
-    return new RegionHeartbeatSample(currentTime, currentTime, regionStatus);
-  }
-
   public void serialize(OutputStream stream) throws IOException {
     ReadWriteIOUtils.write(regionStatus.getStatus(), stream);
   }
@@ -68,8 +64,12 @@
 
   @Override
   public boolean equals(Object o) {
-    if (this == o) return true;
-    if (o == null || getClass() != o.getClass()) return false;
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
     RegionStatistics that = (RegionStatistics) o;
     return regionStatus == that.regionStatus;
   }
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/HeartbeatService.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/HeartbeatService.java
new file mode 100644
index 0000000..18c3915
--- /dev/null
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/HeartbeatService.java
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.manager.load.service;
+
+import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation;
+import org.apache.iotdb.common.rpc.thrift.TDataNodeConfiguration;
+import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
+import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil;
+import org.apache.iotdb.confignode.client.async.AsyncConfigNodeHeartbeatClientPool;
+import org.apache.iotdb.confignode.client.async.AsyncDataNodeHeartbeatClientPool;
+import org.apache.iotdb.confignode.client.async.handlers.heartbeat.ConfigNodeHeartbeatHandler;
+import org.apache.iotdb.confignode.client.async.handlers.heartbeat.DataNodeHeartbeatHandler;
+import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
+import org.apache.iotdb.confignode.manager.IManager;
+import org.apache.iotdb.confignode.manager.consensus.ConsensusManager;
+import org.apache.iotdb.confignode.manager.load.cache.LoadCache;
+import org.apache.iotdb.confignode.manager.load.cache.node.ConfigNodeHeartbeatCache;
+import org.apache.iotdb.confignode.manager.node.NodeManager;
+import org.apache.iotdb.mpp.rpc.thrift.THeartbeatReq;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/** Maintain the Cluster-Heartbeat-Service. */
+public class HeartbeatService {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(HeartbeatService.class);
+
+  private static final long HEARTBEAT_INTERVAL =
+      ConfigNodeDescriptor.getInstance().getConf().getHeartbeatIntervalInMs();
+
+  private final IManager configManager;
+  private final LoadCache loadCache;
+
+  /** Heartbeat executor service. */
+  // Monitor for leadership change
+  private final Object heartbeatScheduleMonitor = new Object();
+
+  private Future<?> currentHeartbeatFuture;
+  private final ScheduledExecutorService heartBeatExecutor =
+      IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor("Cluster-Heartbeat-Service");
+  private final AtomicInteger heartbeatCounter = new AtomicInteger(0);
+
+  public HeartbeatService(IManager configManager, LoadCache loadCache) {
+    this.configManager = configManager;
+    this.loadCache = loadCache;
+  }
+
+  /** Start the heartbeat service. */
+  public void startHeartbeatService() {
+    synchronized (heartbeatScheduleMonitor) {
+      if (currentHeartbeatFuture == null) {
+        currentHeartbeatFuture =
+            ScheduledExecutorUtil.safelyScheduleWithFixedDelay(
+                heartBeatExecutor,
+                this::heartbeatLoopBody,
+                0,
+                HEARTBEAT_INTERVAL,
+                TimeUnit.MILLISECONDS);
+        LOGGER.info("Heartbeat service is started successfully.");
+      }
+    }
+  }
+
+  /** Stop the heartbeat service. */
+  public void stopHeartbeatService() {
+    synchronized (heartbeatScheduleMonitor) {
+      if (currentHeartbeatFuture != null) {
+        currentHeartbeatFuture.cancel(false);
+        currentHeartbeatFuture = null;
+        LOGGER.info("Heartbeat service is stopped successfully.");
+      }
+    }
+  }
+
+  /** loop body of the heartbeat thread. */
+  private void heartbeatLoopBody() {
+    // The consensusManager of configManager may not be fully initialized at this time
+    Optional.ofNullable(getConsensusManager())
+        .ifPresent(
+            consensusManager -> {
+              if (getConsensusManager().isLeader()) {
+                // Generate HeartbeatReq
+                THeartbeatReq heartbeatReq = genHeartbeatReq();
+                // Send heartbeat requests to all the registered ConfigNodes
+                pingRegisteredConfigNodes(
+                    heartbeatReq, getNodeManager().getRegisteredConfigNodes());
+                // Send heartbeat requests to all the registered DataNodes
+                pingRegisteredDataNodes(heartbeatReq, getNodeManager().getRegisteredDataNodes());
+              }
+            });
+  }
+
+  private THeartbeatReq genHeartbeatReq() {
+    /* Generate heartbeat request */
+    THeartbeatReq heartbeatReq = new THeartbeatReq();
+    heartbeatReq.setHeartbeatTimestamp(System.currentTimeMillis());
+    // Always sample RegionGroups' leadership as the Region heartbeat
+    heartbeatReq.setNeedJudgeLeader(true);
+    // We sample DataNode's load in every 10 heartbeat loop
+    heartbeatReq.setNeedSamplingLoad(heartbeatCounter.get() % 10 == 0);
+
+    /* Update heartbeat counter */
+    heartbeatCounter.getAndUpdate(x -> (x + 1) % 10);
+    if (!configManager.getClusterQuotaManager().hasSpaceQuotaLimit()) {
+      heartbeatReq.setSchemaRegionIds(configManager.getClusterQuotaManager().getSchemaRegionIds());
+      heartbeatReq.setDataRegionIds(configManager.getClusterQuotaManager().getDataRegionIds());
+      heartbeatReq.setSpaceQuotaUsage(configManager.getClusterQuotaManager().getSpaceQuotaUsage());
+    }
+    return heartbeatReq;
+  }
+
+  /**
+   * Send heartbeat requests to all the Registered ConfigNodes.
+   *
+   * @param registeredConfigNodes ConfigNodes that registered in cluster
+   */
+  private void pingRegisteredConfigNodes(
+      THeartbeatReq heartbeatReq, List<TConfigNodeLocation> registeredConfigNodes) {
+    // Send heartbeat requests
+    for (TConfigNodeLocation configNodeLocation : registeredConfigNodes) {
+      if (configNodeLocation.getConfigNodeId() == ConfigNodeHeartbeatCache.CURRENT_NODE_ID) {
+        // Skip itself
+        continue;
+      }
+
+      ConfigNodeHeartbeatHandler handler =
+          new ConfigNodeHeartbeatHandler(configNodeLocation.getConfigNodeId(), loadCache);
+      AsyncConfigNodeHeartbeatClientPool.getInstance()
+          .getConfigNodeHeartBeat(
+              configNodeLocation.getInternalEndPoint(),
+              heartbeatReq.getHeartbeatTimestamp(),
+              handler);
+    }
+  }
+
+  /**
+   * Send heartbeat requests to all the Registered DataNodes.
+   *
+   * @param registeredDataNodes DataNodes that registered in cluster
+   */
+  private void pingRegisteredDataNodes(
+      THeartbeatReq heartbeatReq, List<TDataNodeConfiguration> registeredDataNodes) {
+    // Send heartbeat requests
+    for (TDataNodeConfiguration dataNodeInfo : registeredDataNodes) {
+      DataNodeHeartbeatHandler handler =
+          new DataNodeHeartbeatHandler(
+              dataNodeInfo.getLocation().getDataNodeId(),
+              loadCache,
+              configManager.getLoadManager().getRouteBalancer(),
+              configManager.getClusterQuotaManager().getDeviceNum(),
+              configManager.getClusterQuotaManager().getTimeSeriesNum(),
+              configManager.getClusterQuotaManager().getRegionDisk());
+      configManager.getClusterQuotaManager().updateSpaceQuotaUsage();
+      AsyncDataNodeHeartbeatClientPool.getInstance()
+          .getDataNodeHeartBeat(
+              dataNodeInfo.getLocation().getInternalEndPoint(), heartbeatReq, handler);
+    }
+  }
+
+  private ConsensusManager getConsensusManager() {
+    return configManager.getConsensusManager();
+  }
+
+  private NodeManager getNodeManager() {
+    return configManager.getNodeManager();
+  }
+}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/StatisticsService.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/StatisticsService.java
new file mode 100644
index 0000000..9e51b15
--- /dev/null
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/StatisticsService.java
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.manager.load.service;
+
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
+import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
+import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.cluster.NodeStatus;
+import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
+import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil;
+import org.apache.iotdb.confignode.client.DataNodeRequestType;
+import org.apache.iotdb.confignode.client.async.AsyncDataNodeClientPool;
+import org.apache.iotdb.confignode.client.async.handlers.AsyncClientHandler;
+import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
+import org.apache.iotdb.confignode.manager.IManager;
+import org.apache.iotdb.confignode.manager.load.balancer.RouteBalancer;
+import org.apache.iotdb.confignode.manager.load.balancer.router.RegionRouteMap;
+import org.apache.iotdb.confignode.manager.load.cache.LoadCache;
+import org.apache.iotdb.confignode.manager.load.cache.node.NodeStatistics;
+import org.apache.iotdb.confignode.manager.load.cache.region.RegionGroupStatistics;
+import org.apache.iotdb.confignode.manager.load.cache.region.RegionStatistics;
+import org.apache.iotdb.confignode.manager.observer.NodeStatisticsEvent;
+import org.apache.iotdb.mpp.rpc.thrift.TRegionRouteReq;
+import org.apache.iotdb.tsfile.utils.Pair;
+
+import com.google.common.eventbus.EventBus;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+public class StatisticsService {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(StatisticsService.class);
+
+  private static final long HEARTBEAT_INTERVAL =
+      ConfigNodeDescriptor.getInstance().getConf().getHeartbeatIntervalInMs();
+
+  private final IManager configManager;
+  private final RouteBalancer routeBalancer;
+  private final LoadCache loadCache;
+  private final EventBus eventBus;
+
+  public StatisticsService(
+      IManager configManager, RouteBalancer routeBalancer, LoadCache loadCache, EventBus eventBus) {
+    this.configManager = configManager;
+    this.routeBalancer = routeBalancer;
+    this.loadCache = loadCache;
+    this.eventBus = eventBus;
+  }
+
+  /** Load statistics executor service. */
+  private final Object statisticsScheduleMonitor = new Object();
+
+  private Future<?> currentLoadStatisticsFuture;
+  private final ScheduledExecutorService loadStatisticsExecutor =
+      IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor("Cluster-LoadStatistics-Service");
+
+  /** Start the load statistics service. */
+  public void startLoadStatisticsService() {
+    synchronized (statisticsScheduleMonitor) {
+      if (currentLoadStatisticsFuture == null) {
+        currentLoadStatisticsFuture =
+            ScheduledExecutorUtil.safelyScheduleWithFixedDelay(
+                loadStatisticsExecutor,
+                this::updateLoadStatistics,
+                0,
+                HEARTBEAT_INTERVAL,
+                TimeUnit.MILLISECONDS);
+        LOGGER.info("LoadStatistics service is started successfully.");
+      }
+    }
+  }
+
+  /** Stop the load statistics service. */
+  public void stopLoadStatisticsService() {
+    synchronized (statisticsScheduleMonitor) {
+      if (currentLoadStatisticsFuture != null) {
+        currentLoadStatisticsFuture.cancel(false);
+        currentLoadStatisticsFuture = null;
+        LOGGER.info("LoadStatistics service is stopped successfully.");
+      }
+    }
+  }
+
+  private void updateLoadStatistics() {
+    // Broadcast the RegionRouteMap if some LoadStatistics has changed
+    boolean isNeedBroadcast = false;
+
+    // Update NodeStatistics:
+    // Pair<NodeStatistics, NodeStatistics>:left one means the current NodeStatistics, right one
+    // means the previous NodeStatistics
+    Map<Integer, Pair<NodeStatistics, NodeStatistics>> differentNodeStatisticsMap =
+        loadCache.updateNodeStatistics();
+    if (!differentNodeStatisticsMap.isEmpty()) {
+      isNeedBroadcast = true;
+      recordNodeStatistics(differentNodeStatisticsMap);
+      eventBus.post(new NodeStatisticsEvent(differentNodeStatisticsMap));
+    }
+
+    // Update RegionGroupStatistics
+    Map<TConsensusGroupId, RegionGroupStatistics> differentRegionGroupStatisticsMap =
+        loadCache.updateRegionGroupStatistics();
+    if (!differentRegionGroupStatisticsMap.isEmpty()) {
+      isNeedBroadcast = true;
+      recordRegionGroupStatistics(differentRegionGroupStatisticsMap);
+    }
+
+    // Update RegionRouteMap
+    if (routeBalancer.updateRegionRouteMap()) {
+      isNeedBroadcast = true;
+      recordRegionRouteMap(routeBalancer.getRegionRouteMap());
+    }
+
+    if (isNeedBroadcast) {
+      broadcastLatestRegionRouteMap();
+    }
+  }
+
+  private void recordNodeStatistics(
+      Map<Integer, Pair<NodeStatistics, NodeStatistics>> differentNodeStatisticsMap) {
+    LOGGER.info("[UpdateLoadStatistics] NodeStatisticsMap: ");
+    for (Map.Entry<Integer, Pair<NodeStatistics, NodeStatistics>> nodeCacheEntry :
+        differentNodeStatisticsMap.entrySet()) {
+      LOGGER.info(
+          "[UpdateLoadStatistics]\t {}={}",
+          "nodeId{" + nodeCacheEntry.getKey() + "}",
+          nodeCacheEntry.getValue().left);
+    }
+  }
+
+  private void recordRegionGroupStatistics(
+      Map<TConsensusGroupId, RegionGroupStatistics> differentRegionGroupStatisticsMap) {
+    LOGGER.info("[UpdateLoadStatistics] RegionGroupStatisticsMap: ");
+    for (Map.Entry<TConsensusGroupId, RegionGroupStatistics> regionGroupStatisticsEntry :
+        differentRegionGroupStatisticsMap.entrySet()) {
+      LOGGER.info("[UpdateLoadStatistics]\t RegionGroup: {}", regionGroupStatisticsEntry.getKey());
+      LOGGER.info("[UpdateLoadStatistics]\t {}", regionGroupStatisticsEntry.getValue());
+      for (Map.Entry<Integer, RegionStatistics> regionStatisticsEntry :
+          regionGroupStatisticsEntry.getValue().getRegionStatisticsMap().entrySet()) {
+        LOGGER.info(
+            "[UpdateLoadStatistics]\t dataNodeId{}={}",
+            regionStatisticsEntry.getKey(),
+            regionStatisticsEntry.getValue());
+      }
+    }
+  }
+
+  private void recordRegionRouteMap(RegionRouteMap regionRouteMap) {
+    LOGGER.info("[UpdateLoadStatistics] RegionLeaderMap: ");
+    for (Map.Entry<TConsensusGroupId, Integer> regionLeaderEntry :
+        regionRouteMap.getRegionLeaderMap().entrySet()) {
+      LOGGER.info(
+          "[UpdateLoadStatistics]\t {}={}",
+          regionLeaderEntry.getKey(),
+          regionLeaderEntry.getValue());
+    }
+
+    LOGGER.info("[UpdateLoadStatistics] RegionPriorityMap: ");
+    for (Map.Entry<TConsensusGroupId, TRegionReplicaSet> regionPriorityEntry :
+        regionRouteMap.getRegionPriorityMap().entrySet()) {
+      LOGGER.info(
+          "[UpdateLoadStatistics]\t {}={}",
+          regionPriorityEntry.getKey(),
+          regionPriorityEntry.getValue().getDataNodeLocations().stream()
+              .map(TDataNodeLocation::getDataNodeId)
+              .collect(Collectors.toList()));
+    }
+  }
+
+  public void broadcastLatestRegionRouteMap() {
+    Map<TConsensusGroupId, TRegionReplicaSet> latestRegionRouteMap =
+        routeBalancer.getLatestRegionPriorityMap();
+    Map<Integer, TDataNodeLocation> dataNodeLocationMap = new ConcurrentHashMap<>();
+    // Broadcast the RegionRouteMap to all DataNodes except the unknown ones
+    configManager
+        .getNodeManager()
+        .filterDataNodeThroughStatus(NodeStatus.Running, NodeStatus.Removing, NodeStatus.ReadOnly)
+        .forEach(
+            onlineDataNode ->
+                dataNodeLocationMap.put(
+                    onlineDataNode.getLocation().getDataNodeId(), onlineDataNode.getLocation()));
+
+    LOGGER.info("[UpdateLoadStatistics] Begin to broadcast RegionRouteMap:");
+    long broadcastTime = System.currentTimeMillis();
+
+    AsyncClientHandler<TRegionRouteReq, TSStatus> clientHandler =
+        new AsyncClientHandler<>(
+            DataNodeRequestType.UPDATE_REGION_ROUTE_MAP,
+            new TRegionRouteReq(broadcastTime, latestRegionRouteMap),
+            dataNodeLocationMap);
+    AsyncDataNodeClientPool.getInstance().sendAsyncRequestToDataNodeWithRetry(clientHandler);
+    LOGGER.info("[UpdateLoadStatistics] Broadcast the latest RegionRouteMap finished.");
+  }
+}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java
index bcf1167..4cdcf6b 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java
@@ -27,18 +27,12 @@
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.cluster.NodeStatus;
 import org.apache.iotdb.commons.cluster.RegionRoleType;
-import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
-import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil;
 import org.apache.iotdb.commons.conf.CommonConfig;
 import org.apache.iotdb.commons.conf.CommonDescriptor;
 import org.apache.iotdb.commons.consensus.ConsensusGroupId;
 import org.apache.iotdb.confignode.client.DataNodeRequestType;
-import org.apache.iotdb.confignode.client.async.AsyncConfigNodeHeartbeatClientPool;
 import org.apache.iotdb.confignode.client.async.AsyncDataNodeClientPool;
-import org.apache.iotdb.confignode.client.async.AsyncDataNodeHeartbeatClientPool;
 import org.apache.iotdb.confignode.client.async.handlers.AsyncClientHandler;
-import org.apache.iotdb.confignode.client.async.handlers.heartbeat.ConfigNodeHeartbeatHandler;
-import org.apache.iotdb.confignode.client.async.handlers.heartbeat.DataNodeHeartbeatHandler;
 import org.apache.iotdb.confignode.client.sync.SyncDataNodeClientPool;
 import org.apache.iotdb.confignode.conf.ConfigNodeConfig;
 import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
@@ -52,7 +46,6 @@
 import org.apache.iotdb.confignode.consensus.response.datanode.DataNodeConfigurationResp;
 import org.apache.iotdb.confignode.consensus.response.datanode.DataNodeRegisterResp;
 import org.apache.iotdb.confignode.consensus.response.datanode.DataNodeToStatusResp;
-import org.apache.iotdb.confignode.manager.ClusterQuotaManager;
 import org.apache.iotdb.confignode.manager.ClusterSchemaManager;
 import org.apache.iotdb.confignode.manager.ConfigManager;
 import org.apache.iotdb.confignode.manager.IManager;
@@ -60,9 +53,7 @@
 import org.apache.iotdb.confignode.manager.UDFManager;
 import org.apache.iotdb.confignode.manager.consensus.ConsensusManager;
 import org.apache.iotdb.confignode.manager.load.LoadManager;
-import org.apache.iotdb.confignode.manager.node.heartbeat.BaseNodeCache;
-import org.apache.iotdb.confignode.manager.node.heartbeat.ConfigNodeHeartbeatCache;
-import org.apache.iotdb.confignode.manager.node.heartbeat.DataNodeHeartbeatCache;
+import org.apache.iotdb.confignode.manager.load.cache.node.ConfigNodeHeartbeatCache;
 import org.apache.iotdb.confignode.manager.partition.PartitionManager;
 import org.apache.iotdb.confignode.manager.partition.PartitionMetrics;
 import org.apache.iotdb.confignode.manager.pipe.PipeManager;
@@ -81,7 +72,6 @@
 import org.apache.iotdb.consensus.common.DataSet;
 import org.apache.iotdb.consensus.common.Peer;
 import org.apache.iotdb.consensus.common.response.ConsensusGenericResponse;
-import org.apache.iotdb.mpp.rpc.thrift.THeartbeatReq;
 import org.apache.iotdb.rpc.RpcUtils;
 import org.apache.iotdb.rpc.TSStatusCode;
 
@@ -89,22 +79,15 @@
 import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Comparator;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
-import java.util.Random;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.Future;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.ReentrantLock;
-import java.util.stream.Collectors;
 
 /** NodeManager manages cluster node addition and removal requests */
 public class NodeManager {
@@ -119,24 +102,10 @@
 
   private final ReentrantLock removeConfigNodeLock;
 
-  /** Heartbeat executor service */
-  // Monitor for leadership change
-  private final Object scheduleMonitor = new Object();
-  // Map<NodeId, INodeCache>
-  private final Map<Integer, BaseNodeCache> nodeCacheMap;
-  private final AtomicInteger heartbeatCounter = new AtomicInteger(0);
-  private Future<?> currentHeartbeatFuture;
-  private final ScheduledExecutorService heartBeatExecutor =
-      IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor("Cluster-Heartbeat-Service");
-
-  private final Random random;
-
   public NodeManager(IManager configManager, NodeInfo nodeInfo) {
     this.configManager = configManager;
     this.nodeInfo = nodeInfo;
     this.removeConfigNodeLock = new ReentrantLock();
-    this.nodeCacheMap = new ConcurrentHashMap<>();
-    this.random = new Random(System.currentTimeMillis());
   }
 
   /**
@@ -391,15 +360,6 @@
   /**
    * Only leader use this interface
    *
-   * @return The number of total cpu cores in online DataNodes
-   */
-  public int getTotalCpuCoreCount() {
-    return nodeInfo.getTotalCpuCoreCount();
-  }
-
-  /**
-   * Only leader use this interface
-   *
    * @return All registered DataNodes
    */
   public List<TDataNodeConfiguration> getRegisteredDataNodes() {
@@ -440,7 +400,7 @@
             TDataNodeInfo dataNodeInfo = new TDataNodeInfo();
             int dataNodeId = registeredDataNode.getLocation().getDataNodeId();
             dataNodeInfo.setDataNodeId(dataNodeId);
-            dataNodeInfo.setStatus(getNodeStatusWithReason(dataNodeId));
+            dataNodeInfo.setStatus(getLoadManager().getNodeStatusWithReason(dataNodeId));
             dataNodeInfo.setRpcAddresss(
                 registeredDataNode.getLocation().getClientRpcEndPoint().getIp());
             dataNodeInfo.setRpcPort(
@@ -504,7 +464,7 @@
             TConfigNodeInfo info = new TConfigNodeInfo();
             int configNodeId = configNodeLocation.getConfigNodeId();
             info.setConfigNodeId(configNodeId);
-            info.setStatus(getNodeStatusWithReason(configNodeId));
+            info.setStatus(getLoadManager().getNodeStatusWithReason(configNodeId));
             info.setInternalAddress(configNodeLocation.getInternalEndPoint().getIp());
             info.setInternalPort(configNodeLocation.getInternalEndPoint().getPort());
             info.setRoleType(
@@ -689,218 +649,25 @@
     }
   }
 
-  /** Start the heartbeat service */
-  public void startHeartbeatService() {
-    synchronized (scheduleMonitor) {
-      if (currentHeartbeatFuture == null) {
-        currentHeartbeatFuture =
-            ScheduledExecutorUtil.safelyScheduleWithFixedDelay(
-                heartBeatExecutor,
-                this::heartbeatLoopBody,
-                0,
-                HEARTBEAT_INTERVAL,
-                TimeUnit.MILLISECONDS);
-        LOGGER.info("Heartbeat service is started successfully.");
-      }
-    }
-  }
-
-  /** loop body of the heartbeat thread */
-  private void heartbeatLoopBody() {
-    // The consensusManager of configManager may not be fully initialized at this time
-    Optional.ofNullable(getConsensusManager())
-        .ifPresent(
-            consensusManager -> {
-              if (getConsensusManager().isLeader()) {
-                // Generate HeartbeatReq
-                THeartbeatReq heartbeatReq = genHeartbeatReq();
-                // Send heartbeat requests to all the registered DataNodes
-                pingRegisteredDataNodes(heartbeatReq, getRegisteredDataNodes());
-                // Send heartbeat requests to all the registered ConfigNodes
-                pingRegisteredConfigNodes(heartbeatReq, getRegisteredConfigNodes());
-              }
-            });
-  }
-
-  private THeartbeatReq genHeartbeatReq() {
-    /* Generate heartbeat request */
-    THeartbeatReq heartbeatReq = new THeartbeatReq();
-    heartbeatReq.setHeartbeatTimestamp(System.currentTimeMillis());
-    // Always sample RegionGroups' leadership as the Region heartbeat
-    heartbeatReq.setNeedJudgeLeader(true);
-    // We sample DataNode's load in every 10 heartbeat loop
-    heartbeatReq.setNeedSamplingLoad(heartbeatCounter.get() % 10 == 0);
-
-    /* Update heartbeat counter */
-    heartbeatCounter.getAndUpdate((x) -> (x + 1) % 10);
-    if (!getClusterQuotaManager().hasSpaceQuotaLimit()) {
-      heartbeatReq.setSchemaRegionIds(getClusterQuotaManager().getSchemaRegionIds());
-      heartbeatReq.setDataRegionIds(getClusterQuotaManager().getDataRegionIds());
-      heartbeatReq.setSpaceQuotaUsage(getClusterQuotaManager().getSpaceQuotaUsage());
-    }
-    return heartbeatReq;
-  }
-
   /**
-   * Send heartbeat requests to all the Registered DataNodes
+   * Filter ConfigNodes through the specified NodeStatus
    *
-   * @param registeredDataNodes DataNodes that registered in cluster
-   */
-  private void pingRegisteredDataNodes(
-      THeartbeatReq heartbeatReq, List<TDataNodeConfiguration> registeredDataNodes) {
-    // Send heartbeat requests
-    for (TDataNodeConfiguration dataNodeInfo : registeredDataNodes) {
-      DataNodeHeartbeatHandler handler =
-          new DataNodeHeartbeatHandler(
-              dataNodeInfo.getLocation(),
-              (DataNodeHeartbeatCache)
-                  nodeCacheMap.computeIfAbsent(
-                      dataNodeInfo.getLocation().getDataNodeId(),
-                      empty -> new DataNodeHeartbeatCache()),
-              getPartitionManager().getRegionGroupCacheMap(),
-              getLoadManager().getRouteBalancer(),
-              getClusterQuotaManager().getDeviceNum(),
-              getClusterQuotaManager().getTimeSeriesNum(),
-              getClusterQuotaManager().getRegionDisk());
-      getClusterQuotaManager().updateSpaceQuotaUsage();
-      AsyncDataNodeHeartbeatClientPool.getInstance()
-          .getDataNodeHeartBeat(
-              dataNodeInfo.getLocation().getInternalEndPoint(), heartbeatReq, handler);
-    }
-  }
-
-  /**
-   * Send heartbeat requests to all the Registered ConfigNodes
-   *
-   * @param registeredConfigNodes ConfigNodes that registered in cluster
-   */
-  private void pingRegisteredConfigNodes(
-      THeartbeatReq heartbeatReq, List<TConfigNodeLocation> registeredConfigNodes) {
-    // Send heartbeat requests
-    for (TConfigNodeLocation configNodeLocation : registeredConfigNodes) {
-      if (configNodeLocation.getConfigNodeId() == ConfigNodeHeartbeatCache.CURRENT_NODE_ID) {
-        // Skip itself
-        continue;
-      }
-
-      ConfigNodeHeartbeatHandler handler =
-          new ConfigNodeHeartbeatHandler(
-              (ConfigNodeHeartbeatCache)
-                  nodeCacheMap.computeIfAbsent(
-                      configNodeLocation.getConfigNodeId(),
-                      empty -> new ConfigNodeHeartbeatCache(configNodeLocation.getConfigNodeId())));
-      AsyncConfigNodeHeartbeatClientPool.getInstance()
-          .getConfigNodeHeartBeat(
-              configNodeLocation.getInternalEndPoint(),
-              heartbeatReq.getHeartbeatTimestamp(),
-              handler);
-    }
-  }
-
-  /** Stop the heartbeat service */
-  public void stopHeartbeatService() {
-    synchronized (scheduleMonitor) {
-      if (currentHeartbeatFuture != null) {
-        currentHeartbeatFuture.cancel(false);
-        currentHeartbeatFuture = null;
-        nodeCacheMap.clear();
-        LOGGER.info("Heartbeat service is stopped successfully.");
-      }
-    }
-  }
-
-  public Map<Integer, BaseNodeCache> getNodeCacheMap() {
-    return nodeCacheMap;
-  }
-
-  public void removeNodeCache(int nodeId) {
-    nodeCacheMap.remove(nodeId);
-  }
-
-  /**
-   * Safely get the specific Node's current status for showing cluster
-   *
-   * @param nodeId The specific Node's index
-   * @return The specific Node's current status if the nodeCache contains it, Unknown otherwise
-   */
-  private String getNodeStatusWithReason(int nodeId) {
-    BaseNodeCache nodeCache = nodeCacheMap.get(nodeId);
-    return nodeCache == null
-        ? NodeStatus.Unknown.getStatus() + "(NoHeartbeat)"
-        : nodeCache.getNodeStatusWithReason();
-  }
-
-  /**
-   * Filter the registered ConfigNodes through the specific NodeStatus
-   *
-   * @param status The specific NodeStatus
-   * @return Filtered ConfigNodes with the specific NodeStatus
+   * @param status The specified NodeStatus
+   * @return Filtered ConfigNodes with the specified NodeStatus
    */
   public List<TConfigNodeLocation> filterConfigNodeThroughStatus(NodeStatus... status) {
-    return getRegisteredConfigNodes().stream()
-        .filter(
-            registeredConfigNode -> {
-              int configNodeId = registeredConfigNode.getConfigNodeId();
-              return nodeCacheMap.containsKey(configNodeId)
-                  && Arrays.stream(status)
-                      .anyMatch(s -> s.equals(nodeCacheMap.get(configNodeId).getNodeStatus()));
-            })
-        .collect(Collectors.toList());
+    return nodeInfo.getRegisteredConfigNodes(
+        getLoadManager().filterConfigNodeThroughStatus(status));
   }
 
   /**
-   * Get NodeStatus by nodeId
+   * Filter DataNodes through the specified NodeStatus
    *
-   * @param nodeId The specific NodeId
-   * @return NodeStatus of the specific node. If node does not exist, return null.
-   */
-  public NodeStatus getNodeStatusByNodeId(int nodeId) {
-    BaseNodeCache baseNodeCache = nodeCacheMap.get(nodeId);
-    return baseNodeCache == null ? null : baseNodeCache.getNodeStatus();
-  }
-
-  /**
-   * Filter the registered DataNodes through the specific NodeStatus
-   *
-   * @param status The specific NodeStatus
-   * @return Filtered DataNodes with the specific NodeStatus
+   * @param status The specified NodeStatus
+   * @return Filtered DataNodes with the specified NodeStatus
    */
   public List<TDataNodeConfiguration> filterDataNodeThroughStatus(NodeStatus... status) {
-    return getRegisteredDataNodes().stream()
-        .filter(
-            registeredDataNode -> {
-              int dataNodeId = registeredDataNode.getLocation().getDataNodeId();
-              return nodeCacheMap.containsKey(dataNodeId)
-                  && Arrays.stream(status)
-                      .anyMatch(s -> s.equals(nodeCacheMap.get(dataNodeId).getNodeStatus()));
-            })
-        .collect(Collectors.toList());
-  }
-
-  /**
-   * Get the loadScore of each DataNode
-   *
-   * @return Map<DataNodeId, loadScore>
-   */
-  public Map<Integer, Long> getAllLoadScores() {
-    Map<Integer, Long> result = new ConcurrentHashMap<>();
-
-    nodeCacheMap.forEach(
-        (dataNodeId, heartbeatCache) -> result.put(dataNodeId, heartbeatCache.getLoadScore()));
-
-    return result;
-  }
-
-  /**
-   * Get the free disk space of the specified DataNode
-   *
-   * @param dataNodeId The index of the specified DataNode
-   * @return The free disk space that sample through heartbeat, 0 if no heartbeat received
-   */
-  public double getFreeDiskSpace(int dataNodeId) {
-    DataNodeHeartbeatCache dataNodeHeartbeatCache =
-        (DataNodeHeartbeatCache) nodeCacheMap.get(dataNodeId);
-    return dataNodeHeartbeatCache == null ? 0d : dataNodeHeartbeatCache.getFreeDiskSpace();
+    return nodeInfo.getRegisteredDataNodes(getLoadManager().filterDataNodeThroughStatus(status));
   }
 
   /**
@@ -910,15 +677,10 @@
    */
   public Optional<TDataNodeLocation> getLowestLoadDataNode() {
     // TODO get real lowest load data node after scoring algorithm being implemented
-    List<TDataNodeConfiguration> targetDataNodeList =
-        filterDataNodeThroughStatus(NodeStatus.Running);
-
-    if (targetDataNodeList == null || targetDataNodeList.isEmpty()) {
-      return Optional.empty();
-    } else {
-      int index = random.nextInt(targetDataNodeList.size());
-      return Optional.of(targetDataNodeList.get(index).location);
-    }
+    int dataNodeId = getLoadManager().getLowestLoadDataNode();
+    return dataNodeId < 0
+        ? Optional.empty()
+        : Optional.of(getRegisteredDataNode(dataNodeId).getLocation());
   }
 
   /**
@@ -927,52 +689,8 @@
    * @return TDataNodeLocation with the lowest loadScore
    */
   public TDataNodeLocation getLowestLoadDataNode(Set<Integer> nodes) {
-    AtomicInteger result = new AtomicInteger();
-    AtomicLong lowestLoadScore = new AtomicLong(Long.MAX_VALUE);
-
-    nodes.forEach(
-        nodeID -> {
-          BaseNodeCache cache = nodeCacheMap.get(nodeID);
-          long score = (cache == null) ? Long.MAX_VALUE : cache.getLoadScore();
-          if (score < lowestLoadScore.get()) {
-            result.set(nodeID);
-            lowestLoadScore.set(score);
-          }
-        });
-
-    LOGGER.info(
-        "get the lowest load DataNode, NodeID: [{}], LoadScore: [{}]", result, lowestLoadScore);
-    return configManager.getNodeManager().getRegisteredDataNodeLocations().get(result.get());
-  }
-
-  /** Initialize the nodeCacheMap when the ConfigNode-Leader is switched */
-  public void initNodeHeartbeatCache() {
-    final int CURRENT_NODE_ID = ConfigNodeHeartbeatCache.CURRENT_NODE_ID;
-    nodeCacheMap.clear();
-
-    // Init ConfigNodeHeartbeatCache
-    getRegisteredConfigNodes()
-        .forEach(
-            configNodeLocation -> {
-              if (configNodeLocation.getConfigNodeId() != CURRENT_NODE_ID) {
-                nodeCacheMap.put(
-                    configNodeLocation.getConfigNodeId(),
-                    new ConfigNodeHeartbeatCache(configNodeLocation.getConfigNodeId()));
-              }
-            });
-    // Force set itself and never update
-    nodeCacheMap.put(
-        ConfigNodeHeartbeatCache.CURRENT_NODE_ID,
-        new ConfigNodeHeartbeatCache(
-            CURRENT_NODE_ID, ConfigNodeHeartbeatCache.CURRENT_NODE_STATISTICS));
-
-    // Init DataNodeHeartbeatCache
-    getRegisteredDataNodes()
-        .forEach(
-            dataNodeConfiguration ->
-                nodeCacheMap.put(
-                    dataNodeConfiguration.getLocation().getDataNodeId(),
-                    new DataNodeHeartbeatCache()));
+    int dataNodeId = getLoadManager().getLowestLoadDataNode(new ArrayList<>(nodes));
+    return getRegisteredDataNode(dataNodeId).getLocation();
   }
 
   private ConsensusManager getConsensusManager() {
@@ -1002,8 +720,4 @@
   private UDFManager getUDFManager() {
     return configManager.getUDFManager();
   }
-
-  private ClusterQuotaManager getClusterQuotaManager() {
-    return configManager.getClusterQuotaManager();
-  }
 }
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/observer/NodeStatisticsEvent.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/observer/NodeStatisticsEvent.java
index a38adaf..50e9023 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/observer/NodeStatisticsEvent.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/observer/NodeStatisticsEvent.java
@@ -18,7 +18,7 @@
  */
 package org.apache.iotdb.confignode.manager.observer;
 
-import org.apache.iotdb.confignode.manager.node.heartbeat.NodeStatistics;
+import org.apache.iotdb.confignode.manager.load.cache.node.NodeStatistics;
 import org.apache.iotdb.tsfile.utils.Pair;
 
 import java.util.Map;
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java
index 223bee1..f15978d 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java
@@ -26,7 +26,6 @@
 import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot;
 import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
 import org.apache.iotdb.commons.cluster.RegionRoleType;
-import org.apache.iotdb.commons.cluster.RegionStatus;
 import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
 import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil;
 import org.apache.iotdb.commons.partition.DataPartitionTable;
@@ -68,7 +67,6 @@
 import org.apache.iotdb.confignode.manager.ProcedureManager;
 import org.apache.iotdb.confignode.manager.consensus.ConsensusManager;
 import org.apache.iotdb.confignode.manager.load.LoadManager;
-import org.apache.iotdb.confignode.manager.partition.heartbeat.RegionGroupCache;
 import org.apache.iotdb.confignode.persistence.partition.PartitionInfo;
 import org.apache.iotdb.confignode.persistence.partition.maintainer.RegionCreateTask;
 import org.apache.iotdb.confignode.persistence.partition.maintainer.RegionDeleteTask;
@@ -88,7 +86,6 @@
 import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.LinkedList;
@@ -101,7 +98,6 @@
 import java.util.concurrent.Future;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
 
 /** The PartitionManager Manages cluster PartitionTable read and write requests. */
@@ -128,15 +124,11 @@
   private final ScheduledExecutorService regionMaintainer;
   private Future<?> currentRegionMaintainerFuture;
 
-  // Map<RegionId, RegionGroupCache>
-  private final Map<TConsensusGroupId, RegionGroupCache> regionGroupCacheMap;
-
   public PartitionManager(IManager configManager, PartitionInfo partitionInfo) {
     this.configManager = configManager;
     this.partitionInfo = partitionInfo;
     this.regionMaintainer =
         IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor("IoTDB-Region-Maintainer");
-    this.regionGroupCacheMap = new ConcurrentHashMap<>();
     setSeriesPartitionExecutor();
   }
 
@@ -593,13 +585,35 @@
    * Only leader use this interface.
    *
    * @param database The specified Database
-   * @return All Regions' RegionReplicaSet of the specified StorageGroup
+   * @return All Regions' RegionReplicaSet of the specified Database
    */
   public List<TRegionReplicaSet> getAllReplicaSets(String database) {
     return partitionInfo.getAllReplicaSets(database);
   }
 
   /**
+   * Get all RegionGroups currently owned by the specified Database
+   *
+   * @param dataNodeId The specified dataNodeId
+   * @return Deep copy of all RegionGroups' RegionReplicaSet with the specified dataNodeId
+   */
+  public List<TRegionReplicaSet> getAllReplicaSets(int dataNodeId) {
+    return partitionInfo.getAllReplicaSets(dataNodeId);
+  }
+
+  /**
+   * Only leader use this interface.
+   *
+   * @param database The specified Database
+   * @param regionGroupIds The specified RegionGroupIds
+   * @return All Regions' RegionReplicaSet of the specified Database
+   */
+  public List<TRegionReplicaSet> getReplicaSets(
+      String database, List<TConsensusGroupId> regionGroupIds) {
+    return partitionInfo.getReplicaSets(database, regionGroupIds);
+  }
+
+  /**
    * Only leader use this interface.
    *
    * <p>Get the number of Regions currently owned by the specified DataNode
@@ -671,22 +685,22 @@
   /**
    * Only leader use this interface.
    *
-   * @param storageGroup StorageGroupName
+   * @param database DatabaseName
    * @param type SchemaRegion or DataRegion
    * @return The specific StorageGroup's Regions that sorted by the number of allocated slots
    * @throws NoAvailableRegionGroupException When all RegionGroups within the specified StorageGroup
    *     are unavailable currently
    */
   public List<Pair<Long, TConsensusGroupId>> getSortedRegionGroupSlotsCounter(
-      String storageGroup, TConsensusGroupType type) throws NoAvailableRegionGroupException {
+      String database, TConsensusGroupType type) throws NoAvailableRegionGroupException {
     // Collect static data
     List<Pair<Long, TConsensusGroupId>> regionGroupSlotsCounter =
-        partitionInfo.getRegionGroupSlotsCounter(storageGroup, type);
+        partitionInfo.getRegionGroupSlotsCounter(database, type);
 
     // Filter RegionGroups that have Disabled status
     List<Pair<Long, TConsensusGroupId>> result = new ArrayList<>();
     for (Pair<Long, TConsensusGroupId> slotsCounter : regionGroupSlotsCounter) {
-      RegionGroupStatus status = getRegionGroupStatus(slotsCounter.getRight());
+      RegionGroupStatus status = getLoadManager().getRegionGroupStatus(slotsCounter.getRight());
       if (!RegionGroupStatus.Disabled.equals(status)) {
         result.add(slotsCounter);
       }
@@ -696,6 +710,9 @@
       throw new NoAvailableRegionGroupException(type);
     }
 
+    Map<TConsensusGroupId, RegionGroupStatus> regionGroupStatusMap =
+        getLoadManager()
+            .getRegionGroupStatus(result.stream().map(Pair::getRight).collect(Collectors.toList()));
     result.sort(
         (o1, o2) -> {
           // Use the number of partitions as the first priority
@@ -705,8 +722,9 @@
             return 1;
           } else {
             // Use RegionGroup status as second priority, Running > Available > Discouraged
-            return getRegionGroupStatus(o1.getRight())
-                .compareTo(getRegionGroupStatus(o2.getRight()));
+            return regionGroupStatusMap
+                .get(o1.getRight())
+                .compare(regionGroupStatusMap.get(o2.getRight()));
           }
         });
     return result;
@@ -764,7 +782,8 @@
         .forEach(
             regionInfo -> {
               regionInfo.setStatus(
-                  getRegionStatus(regionInfo.getConsensusGroupId(), regionInfo.getDataNodeId())
+                  getLoadManager()
+                      .getRegionStatus(regionInfo.getConsensusGroupId(), regionInfo.getDataNodeId())
                       .getStatus());
 
               String regionType =
@@ -779,19 +798,21 @@
   }
 
   /**
+   * Check if the specified RegionGroup exists.
+   *
+   * @param regionGroupId The specified RegionGroup
+   */
+  public boolean isRegionGroupExists(TConsensusGroupId regionGroupId) {
+    return partitionInfo.isRegionGroupExisted(regionGroupId);
+  }
+
+  /**
    * update region location
    *
    * @param req UpdateRegionLocationReq
    * @return TSStatus
    */
   public TSStatus updateRegionLocation(UpdateRegionLocationPlan req) {
-    // Remove heartbeat cache if exists
-    if (regionGroupCacheMap.containsKey(req.getRegionId())) {
-      regionGroupCacheMap
-          .get(req.getRegionId())
-          .removeCacheIfExists(req.getOldNode().getDataNodeId());
-    }
-
     return getConsensusManager().write(req).getStatus();
   }
 
@@ -1064,108 +1085,23 @@
         /* Stop the RegionCleaner service */
         currentRegionMaintainerFuture.cancel(false);
         currentRegionMaintainerFuture = null;
-        regionGroupCacheMap.clear();
         LOGGER.info("RegionCleaner is stopped successfully.");
       }
     }
   }
 
-  public Map<TConsensusGroupId, RegionGroupCache> getRegionGroupCacheMap() {
-    return regionGroupCacheMap;
-  }
-
-  public void removeRegionGroupCache(TConsensusGroupId consensusGroupId) {
-    regionGroupCacheMap.remove(consensusGroupId);
-  }
-
   /**
-   * Filter the RegionGroups in the specified StorageGroup through the RegionGroupStatus
+   * Filter the RegionGroups in the specified Database through the RegionGroupStatus
    *
-   * @param storageGroup The specified StorageGroup
+   * @param database The specified Database
    * @param status The specified RegionGroupStatus
-   * @return Filtered RegionGroups with the specific RegionGroupStatus
+   * @return Filtered RegionGroups with the specified RegionGroupStatus
    */
   public List<TRegionReplicaSet> filterRegionGroupThroughStatus(
-      String storageGroup, RegionGroupStatus... status) {
-    return getAllReplicaSets(storageGroup).stream()
-        .filter(
-            regionReplicaSet -> {
-              TConsensusGroupId regionGroupId = regionReplicaSet.getRegionId();
-              return regionGroupCacheMap.containsKey(regionGroupId)
-                  && Arrays.stream(status)
-                      .anyMatch(
-                          s ->
-                              s.equals(
-                                  regionGroupCacheMap
-                                      .get(regionGroupId)
-                                      .getStatistics()
-                                      .getRegionGroupStatus()));
-            })
-        .collect(Collectors.toList());
-  }
-
-  /**
-   * Count the number of cluster Regions with specified RegionStatus
-   *
-   * @param type The specified RegionGroupType
-   * @param status The specified statues
-   * @return The number of cluster Regions with specified RegionStatus
-   */
-  public int countRegionWithSpecifiedStatus(TConsensusGroupType type, RegionStatus... status) {
-    AtomicInteger result = new AtomicInteger(0);
-    regionGroupCacheMap.forEach(
-        (regionGroupId, regionGroupCache) -> {
-          if (type.equals(regionGroupId.getType())) {
-            regionGroupCache
-                .getStatistics()
-                .getRegionStatisticsMap()
-                .values()
-                .forEach(
-                    regionStatistics -> {
-                      if (Arrays.stream(status)
-                          .anyMatch(s -> s.equals(regionStatistics.getRegionStatus()))) {
-                        result.getAndIncrement();
-                      }
-                    });
-          }
-        });
-    return result.get();
-  }
-
-  /**
-   * Safely get RegionStatus.
-   *
-   * @param consensusGroupId Specified RegionGroupId
-   * @param dataNodeId Specified RegionReplicaId
-   * @return Corresponding RegionStatus if cache exists, Unknown otherwise
-   */
-  public RegionStatus getRegionStatus(TConsensusGroupId consensusGroupId, int dataNodeId) {
-    return regionGroupCacheMap.containsKey(consensusGroupId)
-        ? regionGroupCacheMap.get(consensusGroupId).getStatistics().getRegionStatus(dataNodeId)
-        : RegionStatus.Unknown;
-  }
-
-  /**
-   * Safely get RegionGroupStatus.
-   *
-   * @param consensusGroupId Specified RegionGroupId
-   * @return Corresponding RegionGroupStatus if cache exists, Disabled otherwise
-   */
-  public RegionGroupStatus getRegionGroupStatus(TConsensusGroupId consensusGroupId) {
-    return regionGroupCacheMap.containsKey(consensusGroupId)
-        ? regionGroupCacheMap.get(consensusGroupId).getStatistics().getRegionGroupStatus()
-        : RegionGroupStatus.Disabled;
-  }
-
-  /** Initialize the regionGroupCacheMap when the ConfigNode-Leader is switched. */
-  public void initRegionGroupHeartbeatCache() {
-    regionGroupCacheMap.clear();
-    getAllReplicaSets()
-        .forEach(
-            regionReplicaSet ->
-                regionGroupCacheMap.put(
-                    regionReplicaSet.getRegionId(),
-                    new RegionGroupCache(regionReplicaSet.getRegionId())));
+      String database, RegionGroupStatus... status) {
+    List<TConsensusGroupId> matchedRegionGroups =
+        getLoadManager().filterRegionGroupThroughStatus(status);
+    return getReplicaSets(database, matchedRegionGroups);
   }
 
   public void getSchemaRegionIds(
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionMetrics.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionMetrics.java
index 21909d6..f9eff7b 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionMetrics.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionMetrics.java
@@ -72,10 +72,9 @@
       metricService.createAutoGauge(
           Metric.REGION_NUM.toString(),
           MetricLevel.CORE,
-          getPartitionManager(),
-          partitionManager ->
-              partitionManager.countRegionWithSpecifiedStatus(
-                  TConsensusGroupType.SchemaRegion, status),
+          getLoadManager(),
+          loadManager ->
+              loadManager.countRegionWithSpecifiedStatus(TConsensusGroupType.SchemaRegion, status),
           Tag.TYPE.toString(),
           TConsensusGroupType.SchemaRegion.toString(),
           Tag.STATUS.toString(),
@@ -85,10 +84,9 @@
       metricService.createAutoGauge(
           Metric.REGION_NUM.toString(),
           MetricLevel.CORE,
-          getPartitionManager(),
-          partitionManager ->
-              partitionManager.countRegionWithSpecifiedStatus(
-                  TConsensusGroupType.DataRegion, status),
+          getLoadManager(),
+          loadManager ->
+              loadManager.countRegionWithSpecifiedStatus(TConsensusGroupType.DataRegion, status),
           Tag.TYPE.toString(),
           TConsensusGroupType.DataRegion.toString(),
           Tag.STATUS.toString(),
@@ -330,8 +328,8 @@
     return configManager.getClusterSchemaManager();
   }
 
-  private PartitionManager getPartitionManager() {
-    return configManager.getPartitionManager();
+  private LoadManager getLoadManager() {
+    return configManager.getLoadManager();
   }
 
   @Override
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/RegionGroupStatus.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/RegionGroupStatus.java
index 9e58ae6..b4cf1e0 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/RegionGroupStatus.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/RegionGroupStatus.java
@@ -21,20 +21,20 @@
 public enum RegionGroupStatus {
 
   /** All Regions in RegionGroup are in the Running status */
-  Running("Running"),
+  Running("Running", 1),
 
   /**
    * All Regions in RegionGroup are in the Running or Unknown status, and the number of Regions in
    * the Unknown status is less than half
    */
-  Available("Available"),
+  Available("Available", 2),
 
   /**
    * All Regions in RegionGroup are in the Running, Unknown or ReadOnly status, and at least 1 node
    * is in ReadOnly status, the number of Regions in the Unknown or ReadOnly status is less than
    * half
    */
-  Discouraged("Discouraged"),
+  Discouraged("Discouraged", 3),
 
   /**
    * The following cases will lead to Disabled RegionGroup:
@@ -43,12 +43,14 @@
    *
    * <p>2. More than half of the Regions are in Unknown or ReadOnly status
    */
-  Disabled("Disabled");
+  Disabled("Disabled", 4);
 
   private final String status;
+  private final int weight;
 
-  RegionGroupStatus(String status) {
+  RegionGroupStatus(String status, int weight) {
     this.status = status;
+    this.weight = weight;
   }
 
   public String getStatus() {
@@ -63,4 +65,13 @@
     }
     throw new RuntimeException(String.format("RegionGroupStatus %s doesn't exist.", status));
   }
+
+  /**
+   * Compare the weight of two RegionGroupStatus.
+   *
+   * <p>Running > Available > Discouraged > Disabled
+   */
+  public int compare(RegionGroupStatus other) {
+    return Integer.compare(this.weight, other.weight);
+  }
 }
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/node/NodeInfo.java b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/node/NodeInfo.java
index 67c5526..4c0b176 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/node/NodeInfo.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/node/NodeInfo.java
@@ -221,18 +221,6 @@
     return result;
   }
 
-  /** Return the number of registered ConfigNodes */
-  public int getRegisteredConfigNodeCount() {
-    int result;
-    configNodeInfoReadWriteLock.readLock().lock();
-    try {
-      result = registeredConfigNodes.size();
-    } finally {
-      configNodeInfoReadWriteLock.readLock().unlock();
-    }
-    return result;
-  }
-
   /** Return the number of total cpu cores in online DataNodes */
   public int getTotalCpuCoreCount() {
     int result = 0;
@@ -269,6 +257,23 @@
     }
   }
 
+  /** @return The specified registered DataNodes */
+  public List<TDataNodeConfiguration> getRegisteredDataNodes(List<Integer> dataNodeIds) {
+    List<TDataNodeConfiguration> result = new ArrayList<>();
+    dataNodeInfoReadWriteLock.readLock().lock();
+    try {
+      dataNodeIds.forEach(
+          dataNodeId -> {
+            if (registeredDataNodes.containsKey(dataNodeId)) {
+              result.add(registeredDataNodes.get(dataNodeId).deepCopy());
+            }
+          });
+    } finally {
+      dataNodeInfoReadWriteLock.readLock().unlock();
+    }
+    return result;
+  }
+
   /**
    * Update ConfigNodeList both in memory and confignode-system.properties file
    *
@@ -336,6 +341,7 @@
     return status;
   }
 
+  /** @return All registered ConfigNodes */
   public List<TConfigNodeLocation> getRegisteredConfigNodes() {
     List<TConfigNodeLocation> result;
     configNodeInfoReadWriteLock.readLock().lock();
@@ -347,6 +353,23 @@
     return result;
   }
 
+  /** @return The specified registered ConfigNode */
+  public List<TConfigNodeLocation> getRegisteredConfigNodes(List<Integer> configNodeIds) {
+    List<TConfigNodeLocation> result = new ArrayList<>();
+    configNodeInfoReadWriteLock.readLock().lock();
+    try {
+      configNodeIds.forEach(
+          configNodeId -> {
+            if (registeredConfigNodes.containsKey(configNodeId)) {
+              result.add(registeredConfigNodes.get(configNodeId).deepCopy());
+            }
+          });
+    } finally {
+      configNodeInfoReadWriteLock.readLock().unlock();
+    }
+    return result;
+  }
+
   public int generateNextNodeId() {
     return nextNodeId.incrementAndGet();
   }
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/DatabasePartitionTable.java b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/DatabasePartitionTable.java
index b27a4b5..736083f 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/DatabasePartitionTable.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/DatabasePartitionTable.java
@@ -89,7 +89,7 @@
   }
 
   /**
-   * Cache allocation result of new RegionGroups
+   * Cache allocation result of new RegionGroups.
    *
    * @param replicaSets List<TRegionReplicaSet>
    */
@@ -101,7 +101,7 @@
   }
 
   /**
-   * Delete RegionGroups' cache
+   * Delete RegionGroups' cache.
    *
    * @param replicaSets List<TRegionReplicaSet>
    */
@@ -109,7 +109,7 @@
     replicaSets.forEach(replicaSet -> regionGroupMap.remove(replicaSet.getRegionId()));
   }
 
-  /** @return Deep copy of all Regions' RegionReplicaSet within one StorageGroup */
+  /** @return Deep copy of all Regions' RegionReplicaSet within one StorageGroup. */
   public List<TRegionReplicaSet> getAllReplicaSets() {
     List<TRegionReplicaSet> result = new ArrayList<>();
 
@@ -119,8 +119,9 @@
 
     return result;
   }
+
   /**
-   * Get all RegionGroups currently owned by this StorageGroup
+   * Get all RegionGroups currently owned by this Database.
    *
    * @param type The specified TConsensusGroupType
    * @return Deep copy of all Regions' RegionReplicaSet with the specified TConsensusGroupType
@@ -138,6 +139,37 @@
   }
 
   /**
+   * Get all RegionGroups currently owned by the specified Database.
+   *
+   * @param dataNodeId The specified dataNodeId
+   * @return Deep copy of all RegionGroups' RegionReplicaSet with the specified dataNodeId
+   */
+  public List<TRegionReplicaSet> getAllReplicaSets(int dataNodeId) {
+    return regionGroupMap.values().stream()
+        .filter(regionGroup -> regionGroup.belongsToDataNode(dataNodeId))
+        .map(RegionGroup::getReplicaSet)
+        .collect(Collectors.toList());
+  }
+
+  /**
+   * Get the RegionGroups with the specified RegionGroupIds.
+   *
+   * @param regionGroupIds The specified RegionGroupIds
+   * @return Deep copy of the RegionGroups with the specified RegionGroupIds
+   */
+  public List<TRegionReplicaSet> getReplicaSets(List<TConsensusGroupId> regionGroupIds) {
+    List<TRegionReplicaSet> result = new ArrayList<>();
+
+    for (TConsensusGroupId regionGroupId : regionGroupIds) {
+      if (regionGroupMap.containsKey(regionGroupId)) {
+        result.add(regionGroupMap.get(regionGroupId).getReplicaSet());
+      }
+    }
+
+    return result;
+  }
+
+  /**
    * Only leader use this interface.
    *
    * <p>Get the number of Regions currently owned by the specified DataNode
@@ -168,7 +200,7 @@
   }
 
   /**
-   * Get the number of RegionGroups currently owned by this StorageGroup
+   * Get the number of RegionGroups currently owned by this StorageGroup.
    *
    * @param type SchemaRegion or DataRegion
    * @return The number of Regions currently owned by this StorageGroup
@@ -193,7 +225,7 @@
   }
 
   /**
-   * Thread-safely get SchemaPartition within the specific StorageGroup
+   * Thread-safely get SchemaPartition within the specific StorageGroup.
    *
    * @param partitionSlots SeriesPartitionSlots
    * @param schemaPartition Where the results are stored
@@ -205,7 +237,7 @@
   }
 
   /**
-   * Thread-safely get DataPartition within the specific StorageGroup
+   * Thread-safely get DataPartition within the specific StorageGroup.
    *
    * @param partitionSlots SeriesPartitionSlots and TimePartitionSlots
    * @param dataPartition Where the results are stored
@@ -217,7 +249,7 @@
   }
 
   /**
-   * Checks whether the specified DataPartition has a predecessor and returns if it does
+   * Checks whether the specified DataPartition has a predecessor and returns if it does.
    *
    * @param seriesPartitionSlot Corresponding SeriesPartitionSlot
    * @param timePartitionSlot Corresponding TimePartitionSlot
@@ -233,7 +265,7 @@
   }
 
   /**
-   * Create SchemaPartition within the specific StorageGroup
+   * Create SchemaPartition within the specific StorageGroup.
    *
    * @param assignedSchemaPartition Assigned result
    */
@@ -250,7 +282,7 @@
   }
 
   /**
-   * Create DataPartition within the specific StorageGroup
+   * Create DataPartition within the specific StorageGroup.
    *
    * @param assignedDataPartition Assigned result
    */
@@ -268,7 +300,7 @@
 
   /**
    * Only Leader use this interface. Filter unassigned SchemaPartitionSlots within the specific
-   * StorageGroup
+   * StorageGroup.
    *
    * @param partitionSlots List<TSeriesPartitionSlot>
    * @return Unassigned PartitionSlots
@@ -279,7 +311,7 @@
   }
 
   /**
-   * Get the DataNodes who contain the specific StorageGroup's Schema or Data
+   * Get the DataNodes who contain the specific StorageGroup's Schema or Data.
    *
    * @param type SchemaRegion or DataRegion
    * @return Set<TDataNodeLocation>, the related DataNodes
@@ -297,7 +329,7 @@
 
   /**
    * Only Leader use this interface. Filter unassigned DataPartitionSlots within the specific
-   * StorageGroup
+   * StorageGroup.
    *
    * @param partitionSlots List<TSeriesPartitionSlot>
    * @return Unassigned PartitionSlots
@@ -436,7 +468,7 @@
     }
   }
   /**
-   * update region location
+   * update region location.
    *
    * @param regionId regionId
    * @param oldNode old location, will remove it
@@ -496,7 +528,7 @@
    * @param regionId TConsensusGroupId
    * @return True if contains.
    */
-  public boolean containRegion(TConsensusGroupId regionId) {
+  public boolean containRegionGroup(TConsensusGroupId regionId) {
     return regionGroupMap.containsKey(regionId);
   }
 
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/PartitionInfo.java b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/PartitionInfo.java
index b19ac83..799773a 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/PartitionInfo.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/PartitionInfo.java
@@ -489,6 +489,17 @@
   }
 
   /**
+   * Check if the specified RegionGroup exists.
+   *
+   * @param regionGroupId The specified RegionGroup
+   */
+  public boolean isRegionGroupExisted(TConsensusGroupId regionGroupId) {
+    return databasePartitionTables.values().stream()
+        .anyMatch(
+            databasePartitionTable -> databasePartitionTable.containRegionGroup(regionGroupId));
+  }
+
+  /**
    * Update the location info of given regionId
    *
    * @param req UpdateRegionLocationReq
@@ -500,9 +511,10 @@
     TDataNodeLocation oldNode = req.getOldNode();
     TDataNodeLocation newNode = req.getNewNode();
     databasePartitionTables.values().stream()
-        .filter(sgPartitionTable -> sgPartitionTable.containRegion(regionId))
+        .filter(databasePartitionTable -> databasePartitionTable.containRegionGroup(regionId))
         .forEach(
-            sgPartitionTable -> sgPartitionTable.updateRegionLocation(regionId, oldNode, newNode));
+            databasePartitionTable ->
+                databasePartitionTable.updateRegionLocation(regionId, oldNode, newNode));
 
     return status;
   }
@@ -516,7 +528,7 @@
   public String getRegionStorageGroup(TConsensusGroupId regionId) {
     Optional<DatabasePartitionTable> sgPartitionTableOptional =
         databasePartitionTables.values().stream()
-            .filter(s -> s.containRegion(regionId))
+            .filter(s -> s.containRegionGroup(regionId))
             .findFirst();
     return sgPartitionTableOptional.map(DatabasePartitionTable::getDatabaseName).orElse(null);
   }
@@ -620,6 +632,38 @@
   }
 
   /**
+   * Get all RegionGroups currently owned by the specified Database.
+   *
+   * @param dataNodeId The specified dataNodeId
+   * @return Deep copy of all RegionGroups' RegionReplicaSet with the specified dataNodeId
+   */
+  public List<TRegionReplicaSet> getAllReplicaSets(int dataNodeId) {
+    List<TRegionReplicaSet> result = new ArrayList<>();
+    databasePartitionTables
+        .values()
+        .forEach(
+            databasePartitionTable ->
+                result.addAll(databasePartitionTable.getAllReplicaSets(dataNodeId)));
+    return result;
+  }
+
+  /**
+   * Only leader use this interface.
+   *
+   * @param database The specified Database
+   * @param regionGroupIds The specified RegionGroupIds
+   * @return All Regions' RegionReplicaSet of the specified Database
+   */
+  public List<TRegionReplicaSet> getReplicaSets(
+      String database, List<TConsensusGroupId> regionGroupIds) {
+    if (databasePartitionTables.containsKey(database)) {
+      return databasePartitionTables.get(database).getReplicaSets(regionGroupIds);
+    } else {
+      return new ArrayList<>();
+    }
+  }
+
+  /**
    * Only leader use this interface.
    *
    * <p>Get the number of Regions currently owned by the specified DataNode
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/RegionGroup.java b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/RegionGroup.java
index 4c4c7ff..a555c23 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/RegionGroup.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/RegionGroup.java
@@ -71,7 +71,7 @@
   }
 
   public TRegionReplicaSet getReplicaSet() {
-    return replicaSet;
+    return replicaSet.deepCopy();
   }
 
   /** @param deltaMap Map<TSeriesPartitionSlot, Delta TTimePartitionSlot Count> */
@@ -93,6 +93,17 @@
     return totalTimeSlotCount.get();
   }
 
+  /**
+   * Check if the RegionGroup belongs to the specified DataNode.
+   *
+   * @param dataNodeId The specified DataNodeId.
+   * @return True if the RegionGroup belongs to the specified DataNode.
+   */
+  public boolean belongsToDataNode(int dataNodeId) {
+    return replicaSet.getDataNodeLocations().stream()
+        .anyMatch(dataNodeLocation -> dataNodeLocation.getDataNodeId() == dataNodeId);
+  }
+
   public void serialize(OutputStream outputStream, TProtocol protocol)
       throws IOException, TException {
     ReadWriteIOUtils.write(createTime, outputStream);
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java
index d722d37..46a4c7a 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java
@@ -27,6 +27,7 @@
 import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.cluster.NodeStatus;
+import org.apache.iotdb.commons.cluster.NodeType;
 import org.apache.iotdb.commons.cluster.RegionStatus;
 import org.apache.iotdb.commons.pipe.plugin.meta.PipePluginMeta;
 import org.apache.iotdb.commons.pipe.task.meta.PipeMeta;
@@ -49,11 +50,10 @@
 import org.apache.iotdb.confignode.manager.ConfigManager;
 import org.apache.iotdb.confignode.manager.consensus.ConsensusManager;
 import org.apache.iotdb.confignode.manager.load.LoadManager;
+import org.apache.iotdb.confignode.manager.load.cache.node.NodeHeartbeatSample;
+import org.apache.iotdb.confignode.manager.load.cache.region.RegionHeartbeatSample;
 import org.apache.iotdb.confignode.manager.node.NodeManager;
-import org.apache.iotdb.confignode.manager.node.heartbeat.NodeHeartbeatSample;
 import org.apache.iotdb.confignode.manager.partition.PartitionManager;
-import org.apache.iotdb.confignode.manager.partition.heartbeat.RegionGroupCache;
-import org.apache.iotdb.confignode.manager.partition.heartbeat.RegionHeartbeatSample;
 import org.apache.iotdb.confignode.persistence.node.NodeInfo;
 import org.apache.iotdb.confignode.procedure.exception.ProcedureException;
 import org.apache.iotdb.confignode.procedure.scheduler.LockQueue;
@@ -148,8 +148,7 @@
    * @throws TException Thrift IOE
    */
   public boolean invalidateCache(String storageGroupName) throws IOException, TException {
-    NodeManager nodeManager = configManager.getNodeManager();
-    List<TDataNodeConfiguration> allDataNodes = nodeManager.getRegisteredDataNodes();
+    List<TDataNodeConfiguration> allDataNodes = getNodeManager().getRegisteredDataNodes();
     TInvalidateCacheReq invalidateCacheReq = new TInvalidateCacheReq();
     invalidateCacheReq.setStorageGroup(true);
     invalidateCacheReq.setFullPath(storageGroupName);
@@ -157,14 +156,14 @@
       int dataNodeId = dataNodeConfiguration.getLocation().getDataNodeId();
 
       // if the node is not alive, sleep 1 second and try again
-      NodeStatus nodeStatus = nodeManager.getNodeStatusByNodeId(dataNodeId);
+      NodeStatus nodeStatus = getLoadManager().getNodeStatus(dataNodeId);
       if (nodeStatus == NodeStatus.Unknown) {
         try {
           TimeUnit.MILLISECONDS.sleep(1000);
         } catch (InterruptedException e) {
           LOG.error("Sleep failed in ConfigNodeProcedureEnv: ", e);
         }
-        nodeStatus = nodeManager.getNodeStatusByNodeId(dataNodeId);
+        nodeStatus = getLoadManager().getNodeStatus(dataNodeId);
       }
 
       if (nodeStatus == NodeStatus.Running) {
@@ -205,14 +204,11 @@
   }
 
   public boolean doubleCheckReplica(TDataNodeLocation removedDatanode) {
-    return configManager
-                .getNodeManager()
+    return getNodeManager()
                 .filterDataNodeThroughStatus(NodeStatus.Running, NodeStatus.ReadOnly)
                 .size()
             - Boolean.compare(
-                configManager
-                        .getNodeManager()
-                        .getNodeStatusByNodeId(removedDatanode.getDataNodeId())
+                getLoadManager().getNodeStatus(removedDatanode.getDataNodeId())
                     != NodeStatus.Unknown,
                 false)
         >= NodeInfo.getMinimumDataNode();
@@ -308,8 +304,6 @@
    * @throws ProcedureException if failed status
    */
   public void stopConfigNode(TConfigNodeLocation tConfigNodeLocation) throws ProcedureException {
-    getNodeManager().removeNodeCache(tConfigNodeLocation.getConfigNodeId());
-
     TSStatus tsStatus =
         (TSStatus)
             SyncConfigNodeClientPool.getInstance()
@@ -321,6 +315,8 @@
     if (tsStatus.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
       throw new ProcedureException(tsStatus.getMessage());
     }
+
+    getLoadManager().removeNodeCache(tConfigNodeLocation.getConfigNodeId());
   }
 
   /**
@@ -374,8 +370,7 @@
    */
   public void markDataNodeAsRemovingAndBroadcast(TDataNodeLocation dataNodeLocation) {
     // Send request to update NodeStatus on the DataNode to be removed
-    if (configManager.getNodeManager().getNodeStatusByNodeId(dataNodeLocation.getDataNodeId())
-        == NodeStatus.Unknown) {
+    if (getLoadManager().getNodeStatus(dataNodeLocation.getDataNodeId()) == NodeStatus.Unknown) {
       SyncDataNodeClientPool.getInstance()
           .sendSyncRequestToDataNodeWithGivenRetry(
               dataNodeLocation.getInternalEndPoint(),
@@ -391,10 +386,11 @@
     }
 
     // Force updating NodeStatus to Removing
-    getNodeManager()
-        .getNodeCacheMap()
-        .get(dataNodeLocation.getDataNodeId())
-        .forceUpdate(NodeHeartbeatSample.generateDefaultSample(NodeStatus.Removing));
+    getLoadManager()
+        .forceUpdateNodeCache(
+            NodeType.DataNode,
+            dataNodeLocation.getDataNodeId(),
+            NodeHeartbeatSample.generateDefaultSample(NodeStatus.Removing));
   }
 
   /**
@@ -542,10 +538,7 @@
         (dataNodeId, regionStatus) ->
             heartbeatSampleMap.put(
                 dataNodeId, new RegionHeartbeatSample(currentTime, currentTime, regionStatus)));
-    getPartitionManager()
-        .getRegionGroupCacheMap()
-        .computeIfAbsent(regionGroupId, empty -> new RegionGroupCache(regionGroupId))
-        .forceUpdate(heartbeatSampleMap);
+    getLoadManager().forceUpdateRegionGroupCache(regionGroupId, heartbeatSampleMap);
 
     // Select leader greedily for iot consensus protocol
     if (TConsensusGroupType.DataRegion.equals(regionGroupId.getType())
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/DataNodeRemoveHandler.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/DataNodeRemoveHandler.java
index afa2d3c..c75fd41 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/DataNodeRemoveHandler.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/DataNodeRemoveHandler.java
@@ -34,7 +34,6 @@
 import org.apache.iotdb.confignode.consensus.request.write.partition.UpdateRegionLocationPlan;
 import org.apache.iotdb.confignode.consensus.response.datanode.DataNodeToStatusResp;
 import org.apache.iotdb.confignode.manager.ConfigManager;
-import org.apache.iotdb.confignode.manager.node.heartbeat.BaseNodeCache;
 import org.apache.iotdb.confignode.manager.partition.PartitionMetrics;
 import org.apache.iotdb.confignode.persistence.node.NodeInfo;
 import org.apache.iotdb.confignode.procedure.scheduler.LockQueue;
@@ -328,7 +327,7 @@
     TMaintainPeerReq maintainPeerReq = new TMaintainPeerReq(regionId, originalDataNode);
 
     status =
-        configManager.getNodeManager().getNodeStatusByNodeId(originalDataNode.getDataNodeId())
+        configManager.getLoadManager().getNodeStatus(originalDataNode.getDataNodeId())
                 == NodeStatus.Unknown
             ? SyncDataNodeClientPool.getInstance()
                 .sendSyncRequestToDataNodeWithGivenRetry(
@@ -375,6 +374,9 @@
         getIdWithRpcEndpoint(originalDataNode),
         getIdWithRpcEndpoint(destDataNode));
 
+    // Remove the RegionGroupCache of the regionId
+    configManager.getLoadManager().removeRegionGroupCache(regionId);
+
     // Broadcast the latest RegionRouteMap when Region migration finished
     configManager.getLoadManager().broadcastLatestRegionRouteMap();
   }
@@ -427,7 +429,7 @@
         SyncDataNodeClientPool.getInstance()
             .sendSyncRequestToDataNodeWithGivenRetry(
                 dataNode.getInternalEndPoint(), dataNode, DataNodeRequestType.STOP_DATA_NODE, 2);
-    configManager.getNodeManager().removeNodeCache(dataNode.getDataNodeId());
+    configManager.getLoadManager().removeNodeCache(dataNode.getDataNodeId());
     LOGGER.info(
         "{}, Stop Data Node result: {}, stoppedDataNode: {}",
         REMOVE_DATANODE_PROCESS,
@@ -509,9 +511,8 @@
     if (CONF.getSchemaReplicationFactor() == 1 || CONF.getDataReplicationFactor() == 1) {
       for (TDataNodeLocation dataNodeLocation : removedDataNodes) {
         // check whether removed data node is in running state
-        BaseNodeCache nodeCache =
-            configManager.getNodeManager().getNodeCacheMap().get(dataNodeLocation.getDataNodeId());
-        if (!NodeStatus.Running.equals(nodeCache.getNodeStatus())) {
+        if (!NodeStatus.Running.equals(
+            configManager.getLoadManager().getNodeStatus(dataNodeLocation.getDataNodeId()))) {
           removedDataNodes.remove(dataNodeLocation);
           LOGGER.error(
               "Failed to remove data node {} because it is not in running and the configuration of cluster is one replication",
@@ -530,7 +531,7 @@
             removeDataNodePlan.getDataNodeLocations().stream()
                 .filter(
                     x ->
-                        configManager.getNodeManager().getNodeStatusByNodeId(x.getDataNodeId())
+                        configManager.getLoadManager().getNodeStatus(x.getDataNodeId())
                             != NodeStatus.Unknown)
                 .count();
     if (availableDatanodeSize - removedDataNodeSize < NodeInfo.getMinimumDataNode()) {
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/DeleteDatabaseProcedure.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/DeleteDatabaseProcedure.java
index f1271c2..43c3404 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/DeleteDatabaseProcedure.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/DeleteDatabaseProcedure.java
@@ -120,7 +120,7 @@
               regionReplicaSet -> {
                 // Clear heartbeat cache along the way
                 env.getConfigManager()
-                    .getPartitionManager()
+                    .getLoadManager()
                     .removeRegionGroupCache(regionReplicaSet.getRegionId());
                 env.getConfigManager()
                     .getLoadManager()
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/statemachine/RegionMigrateProcedure.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/statemachine/RegionMigrateProcedure.java
index 46b5839..e1f02cf 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/statemachine/RegionMigrateProcedure.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/statemachine/RegionMigrateProcedure.java
@@ -47,6 +47,9 @@
 /** Region migrate procedure */
 public class RegionMigrateProcedure
     extends StateMachineProcedure<ConfigNodeProcedureEnv, RegionTransitionState> {
+
+  // TODO: Reach an agreement on RegionMigrateProcedure
+
   private static final Logger LOG = LoggerFactory.getLogger(RegionMigrateProcedure.class);
   private static final int RETRY_THRESHOLD = 5;
 
diff --git a/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/priority/GreedyPriorityTest.java b/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/priority/GreedyPriorityTest.java
index cb617bf..64eae9a 100644
--- a/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/priority/GreedyPriorityTest.java
+++ b/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/priority/GreedyPriorityTest.java
@@ -24,9 +24,9 @@
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
 import org.apache.iotdb.commons.cluster.NodeStatus;
-import org.apache.iotdb.confignode.manager.node.heartbeat.BaseNodeCache;
-import org.apache.iotdb.confignode.manager.node.heartbeat.DataNodeHeartbeatCache;
-import org.apache.iotdb.confignode.manager.node.heartbeat.NodeHeartbeatSample;
+import org.apache.iotdb.confignode.manager.load.cache.node.BaseNodeCache;
+import org.apache.iotdb.confignode.manager.load.cache.node.DataNodeHeartbeatCache;
+import org.apache.iotdb.confignode.manager.load.cache.node.NodeHeartbeatSample;
 import org.apache.iotdb.mpp.rpc.thrift.THeartbeatResp;
 
 import org.junit.Assert;
@@ -64,7 +64,7 @@
           NodeStatus.Running, NodeStatus.Unknown, NodeStatus.Running, NodeStatus.ReadOnly
         };
     for (int i = 0; i < 4; i++) {
-      nodeCacheMap.put(i, new DataNodeHeartbeatCache());
+      nodeCacheMap.put(i, new DataNodeHeartbeatCache(i));
       nodeCacheMap
           .get(i)
           .cacheHeartbeatSample(
diff --git a/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/priority/LeaderPriorityBalancerTest.java b/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/priority/LeaderPriorityBalancerTest.java
index a028219..99d0e88 100644
--- a/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/priority/LeaderPriorityBalancerTest.java
+++ b/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/priority/LeaderPriorityBalancerTest.java
@@ -24,9 +24,9 @@
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
 import org.apache.iotdb.commons.cluster.NodeStatus;
-import org.apache.iotdb.confignode.manager.node.heartbeat.BaseNodeCache;
-import org.apache.iotdb.confignode.manager.node.heartbeat.DataNodeHeartbeatCache;
-import org.apache.iotdb.confignode.manager.node.heartbeat.NodeHeartbeatSample;
+import org.apache.iotdb.confignode.manager.load.cache.node.BaseNodeCache;
+import org.apache.iotdb.confignode.manager.load.cache.node.DataNodeHeartbeatCache;
+import org.apache.iotdb.confignode.manager.load.cache.node.NodeHeartbeatSample;
 import org.apache.iotdb.mpp.rpc.thrift.THeartbeatResp;
 
 import org.junit.Assert;
@@ -61,7 +61,7 @@
     long currentTimeMillis = System.currentTimeMillis();
     Map<Integer, BaseNodeCache> nodeCacheMap = new HashMap<>();
     for (int i = 0; i < 6; i++) {
-      nodeCacheMap.put(i, new DataNodeHeartbeatCache());
+      nodeCacheMap.put(i, new DataNodeHeartbeatCache(i));
       if (i != 2 && i != 5) {
         nodeCacheMap
             .get(i)
diff --git a/confignode/src/test/java/org/apache/iotdb/confignode/manager/node/NodeCacheTest.java b/confignode/src/test/java/org/apache/iotdb/confignode/manager/node/NodeCacheTest.java
index 4d93b63..46ffbe2 100644
--- a/confignode/src/test/java/org/apache/iotdb/confignode/manager/node/NodeCacheTest.java
+++ b/confignode/src/test/java/org/apache/iotdb/confignode/manager/node/NodeCacheTest.java
@@ -19,8 +19,8 @@
 package org.apache.iotdb.confignode.manager.node;
 
 import org.apache.iotdb.commons.cluster.NodeStatus;
-import org.apache.iotdb.confignode.manager.node.heartbeat.DataNodeHeartbeatCache;
-import org.apache.iotdb.confignode.manager.node.heartbeat.NodeHeartbeatSample;
+import org.apache.iotdb.confignode.manager.load.cache.node.DataNodeHeartbeatCache;
+import org.apache.iotdb.confignode.manager.load.cache.node.NodeHeartbeatSample;
 import org.apache.iotdb.mpp.rpc.thrift.THeartbeatResp;
 
 import org.junit.Assert;
@@ -30,7 +30,7 @@
 
   @Test
   public void forceUpdateTest() {
-    DataNodeHeartbeatCache dataNodeHeartbeatCache = new DataNodeHeartbeatCache();
+    DataNodeHeartbeatCache dataNodeHeartbeatCache = new DataNodeHeartbeatCache(1);
 
     // Test default
     Assert.assertEquals(NodeStatus.Unknown, dataNodeHeartbeatCache.getNodeStatus());
@@ -55,7 +55,7 @@
 
   @Test
   public void periodicUpdateTest() {
-    DataNodeHeartbeatCache dataNodeHeartbeatCache = new DataNodeHeartbeatCache();
+    DataNodeHeartbeatCache dataNodeHeartbeatCache = new DataNodeHeartbeatCache(1);
     long currentTime = System.currentTimeMillis();
     dataNodeHeartbeatCache.cacheHeartbeatSample(
         new NodeHeartbeatSample(
diff --git a/confignode/src/test/java/org/apache/iotdb/confignode/manager/partition/RegionGroupCacheTest.java b/confignode/src/test/java/org/apache/iotdb/confignode/manager/partition/RegionGroupCacheTest.java
index 3c3cf48..1dac94e 100644
--- a/confignode/src/test/java/org/apache/iotdb/confignode/manager/partition/RegionGroupCacheTest.java
+++ b/confignode/src/test/java/org/apache/iotdb/confignode/manager/partition/RegionGroupCacheTest.java
@@ -21,8 +21,8 @@
 import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
 import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
 import org.apache.iotdb.commons.cluster.RegionStatus;
-import org.apache.iotdb.confignode.manager.partition.heartbeat.RegionGroupCache;
-import org.apache.iotdb.confignode.manager.partition.heartbeat.RegionHeartbeatSample;
+import org.apache.iotdb.confignode.manager.load.cache.region.RegionGroupCache;
+import org.apache.iotdb.confignode.manager.load.cache.region.RegionHeartbeatSample;
 
 import org.junit.Assert;
 import org.junit.Test;
diff --git a/confignode/src/test/java/org/apache/iotdb/confignode/persistence/node/NodeStatisticsTest.java b/confignode/src/test/java/org/apache/iotdb/confignode/persistence/node/NodeStatisticsTest.java
index 4181fa9..7d71f58 100644
--- a/confignode/src/test/java/org/apache/iotdb/confignode/persistence/node/NodeStatisticsTest.java
+++ b/confignode/src/test/java/org/apache/iotdb/confignode/persistence/node/NodeStatisticsTest.java
@@ -19,7 +19,7 @@
 package org.apache.iotdb.confignode.persistence.node;
 
 import org.apache.iotdb.commons.cluster.NodeStatus;
-import org.apache.iotdb.confignode.manager.node.heartbeat.NodeStatistics;
+import org.apache.iotdb.confignode.manager.load.cache.node.NodeStatistics;
 import org.apache.iotdb.tsfile.utils.PublicBAOS;
 
 import org.junit.Assert;
diff --git a/confignode/src/test/java/org/apache/iotdb/confignode/persistence/partition/statistics/RegionGroupStatisticsTest.java b/confignode/src/test/java/org/apache/iotdb/confignode/persistence/partition/statistics/RegionGroupStatisticsTest.java
index cce3ed2..4d1642b 100644
--- a/confignode/src/test/java/org/apache/iotdb/confignode/persistence/partition/statistics/RegionGroupStatisticsTest.java
+++ b/confignode/src/test/java/org/apache/iotdb/confignode/persistence/partition/statistics/RegionGroupStatisticsTest.java
@@ -19,9 +19,9 @@
 package org.apache.iotdb.confignode.persistence.partition.statistics;
 
 import org.apache.iotdb.commons.cluster.RegionStatus;
+import org.apache.iotdb.confignode.manager.load.cache.region.RegionGroupStatistics;
+import org.apache.iotdb.confignode.manager.load.cache.region.RegionStatistics;
 import org.apache.iotdb.confignode.manager.partition.RegionGroupStatus;
-import org.apache.iotdb.confignode.manager.partition.heartbeat.RegionGroupStatistics;
-import org.apache.iotdb.confignode.manager.partition.heartbeat.RegionStatistics;
 import org.apache.iotdb.tsfile.utils.PublicBAOS;
 
 import org.junit.Assert;
diff --git a/confignode/src/test/java/org/apache/iotdb/confignode/persistence/partition/statistics/RegionStatisticsTest.java b/confignode/src/test/java/org/apache/iotdb/confignode/persistence/partition/statistics/RegionStatisticsTest.java
index 8ccf87c..b5935cc 100644
--- a/confignode/src/test/java/org/apache/iotdb/confignode/persistence/partition/statistics/RegionStatisticsTest.java
+++ b/confignode/src/test/java/org/apache/iotdb/confignode/persistence/partition/statistics/RegionStatisticsTest.java
@@ -19,7 +19,7 @@
 package org.apache.iotdb.confignode.persistence.partition.statistics;
 
 import org.apache.iotdb.commons.cluster.RegionStatus;
-import org.apache.iotdb.confignode.manager.partition.heartbeat.RegionStatistics;
+import org.apache.iotdb.confignode.manager.load.cache.region.RegionStatistics;
 import org.apache.iotdb.tsfile.utils.PublicBAOS;
 
 import org.junit.Assert;