[IOTDB-5570] Move heartbeat thread and statistics thread to load manager (#9608)
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/heartbeat/ConfigNodeHeartbeatHandler.java b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/heartbeat/ConfigNodeHeartbeatHandler.java
index c97f3d8..c5a7257 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/heartbeat/ConfigNodeHeartbeatHandler.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/heartbeat/ConfigNodeHeartbeatHandler.java
@@ -18,24 +18,25 @@
*/
package org.apache.iotdb.confignode.client.async.handlers.heartbeat;
-import org.apache.iotdb.confignode.manager.node.heartbeat.ConfigNodeHeartbeatCache;
-import org.apache.iotdb.confignode.manager.node.heartbeat.NodeHeartbeatSample;
+import org.apache.iotdb.confignode.manager.load.cache.LoadCache;
+import org.apache.iotdb.confignode.manager.load.cache.node.NodeHeartbeatSample;
import org.apache.thrift.async.AsyncMethodCallback;
public class ConfigNodeHeartbeatHandler implements AsyncMethodCallback<Long> {
- // Update ConfigNodeHeartbeatCache when success
- private final ConfigNodeHeartbeatCache configNodeHeartbeatCache;
+ private final int nodeId;
+ private final LoadCache cache;
- public ConfigNodeHeartbeatHandler(ConfigNodeHeartbeatCache configNodeHeartbeatCache) {
- this.configNodeHeartbeatCache = configNodeHeartbeatCache;
+ public ConfigNodeHeartbeatHandler(int nodeId, LoadCache cache) {
+ this.nodeId = nodeId;
+ this.cache = cache;
}
@Override
public void onComplete(Long timestamp) {
- configNodeHeartbeatCache.cacheHeartbeatSample(
- new NodeHeartbeatSample(timestamp, System.currentTimeMillis()));
+ long receiveTime = System.currentTimeMillis();
+ cache.cacheConfigNodeHeartbeatSample(nodeId, new NodeHeartbeatSample(timestamp, receiveTime));
}
@Override
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/heartbeat/DataNodeHeartbeatHandler.java b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/heartbeat/DataNodeHeartbeatHandler.java
index fe1af63..60ea48b 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/heartbeat/DataNodeHeartbeatHandler.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/heartbeat/DataNodeHeartbeatHandler.java
@@ -18,14 +18,11 @@
*/
package org.apache.iotdb.confignode.client.async.handlers.heartbeat;
-import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
-import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
import org.apache.iotdb.commons.cluster.RegionStatus;
import org.apache.iotdb.confignode.manager.load.balancer.RouteBalancer;
-import org.apache.iotdb.confignode.manager.node.heartbeat.DataNodeHeartbeatCache;
-import org.apache.iotdb.confignode.manager.node.heartbeat.NodeHeartbeatSample;
-import org.apache.iotdb.confignode.manager.partition.heartbeat.RegionGroupCache;
-import org.apache.iotdb.confignode.manager.partition.heartbeat.RegionHeartbeatSample;
+import org.apache.iotdb.confignode.manager.load.cache.LoadCache;
+import org.apache.iotdb.confignode.manager.load.cache.node.NodeHeartbeatSample;
+import org.apache.iotdb.confignode.manager.load.cache.region.RegionHeartbeatSample;
import org.apache.iotdb.mpp.rpc.thrift.THeartbeatResp;
import org.apache.iotdb.tsfile.utils.Pair;
@@ -35,26 +32,25 @@
public class DataNodeHeartbeatHandler implements AsyncMethodCallback<THeartbeatResp> {
- // Update DataNodeHeartbeatCache when success
- private final TDataNodeLocation dataNodeLocation;
- private final DataNodeHeartbeatCache dataNodeHeartbeatCache;
- private final Map<TConsensusGroupId, RegionGroupCache> regionGroupCacheMap;
+ private final int nodeId;
+
+ private final LoadCache loadCache;
private final RouteBalancer routeBalancer;
+
private final Map<Integer, Long> deviceNum;
private final Map<Integer, Long> timeSeriesNum;
private final Map<Integer, Long> regionDisk;
public DataNodeHeartbeatHandler(
- TDataNodeLocation dataNodeLocation,
- DataNodeHeartbeatCache dataNodeHeartbeatCache,
- Map<TConsensusGroupId, RegionGroupCache> regionGroupCacheMap,
+ int nodeId,
+ LoadCache loadCache,
RouteBalancer routeBalancer,
Map<Integer, Long> deviceNum,
Map<Integer, Long> timeSeriesNum,
Map<Integer, Long> regionDisk) {
- this.dataNodeLocation = dataNodeLocation;
- this.dataNodeHeartbeatCache = dataNodeHeartbeatCache;
- this.regionGroupCacheMap = regionGroupCacheMap;
+
+ this.nodeId = nodeId;
+ this.loadCache = loadCache;
this.routeBalancer = routeBalancer;
this.deviceNum = deviceNum;
this.timeSeriesNum = timeSeriesNum;
@@ -66,31 +62,30 @@
long receiveTime = System.currentTimeMillis();
// Update NodeCache
- dataNodeHeartbeatCache.cacheHeartbeatSample(
- new NodeHeartbeatSample(heartbeatResp, receiveTime));
+ loadCache.cacheDataNodeHeartbeatSample(
+ nodeId, new NodeHeartbeatSample(heartbeatResp, receiveTime));
- // Update RegionGroupCache And leaderCache
heartbeatResp
.getJudgedLeaders()
.forEach(
(regionGroupId, isLeader) -> {
- regionGroupCacheMap
- .computeIfAbsent(regionGroupId, empty -> new RegionGroupCache(regionGroupId))
- .cacheHeartbeatSample(
- dataNodeLocation.getDataNodeId(),
- new RegionHeartbeatSample(
- heartbeatResp.getHeartbeatTimestamp(),
- receiveTime,
- // Region will inherit DataNode's status
- RegionStatus.parse(heartbeatResp.getStatus())));
+ // Update RegionGroupCache
+ loadCache.cacheRegionHeartbeatSample(
+ regionGroupId,
+ nodeId,
+ new RegionHeartbeatSample(
+ heartbeatResp.getHeartbeatTimestamp(),
+ receiveTime,
+ // Region will inherit DataNode's status
+ RegionStatus.parse(heartbeatResp.getStatus())));
if (isLeader) {
+ // Update leaderCache
routeBalancer.cacheLeaderSample(
- regionGroupId,
- new Pair<>(
- heartbeatResp.getHeartbeatTimestamp(), dataNodeLocation.getDataNodeId()));
+ regionGroupId, new Pair<>(heartbeatResp.getHeartbeatTimestamp(), nodeId));
}
});
+
if (heartbeatResp.getDeviceNum() != null) {
deviceNum.putAll(heartbeatResp.getDeviceNum());
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigRegionStateMachine.java b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigRegionStateMachine.java
index 9e93061..f6b3f25 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigRegionStateMachine.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigRegionStateMachine.java
@@ -204,15 +204,13 @@
newLeaderId,
currentNodeTEndPoint);
- // Always initiate all kinds of HeartbeatCache first
- configManager.getLoadManager().initHeartbeatCache();
+ // Always start load services first
+ configManager.getLoadManager().startLoadServices();
// Start leader scheduling services
configManager.getProcedureManager().shiftExecutor(true);
- configManager.getLoadManager().startLoadStatisticsService();
configManager.getLoadManager().getRouteBalancer().startRouteBalancingService();
configManager.getRetryFailedTasksThread().startRetryFailedTasksService();
- configManager.getNodeManager().startHeartbeatService();
configManager.getPartitionManager().startRegionCleaner();
// we do cq recovery async for two reasons:
@@ -229,11 +227,10 @@
newLeaderId);
// Stop leader scheduling services
+ configManager.getLoadManager().stopLoadServices();
configManager.getProcedureManager().shiftExecutor(false);
- configManager.getLoadManager().stopLoadStatisticsService();
configManager.getLoadManager().getRouteBalancer().stopRouteBalancingService();
configManager.getRetryFailedTasksThread().stopRetryFailedTasksService();
- configManager.getNodeManager().stopHeartbeatService();
configManager.getPartitionManager().stopRegionCleaner();
configManager.getCQManager().stopCQScheduler();
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
index 69cf4a4..c6ac131 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
@@ -82,10 +82,10 @@
import org.apache.iotdb.confignode.manager.consensus.ConsensusManager;
import org.apache.iotdb.confignode.manager.cq.CQManager;
import org.apache.iotdb.confignode.manager.load.LoadManager;
+import org.apache.iotdb.confignode.manager.load.cache.node.NodeHeartbeatSample;
import org.apache.iotdb.confignode.manager.node.ClusterNodeStartUtils;
import org.apache.iotdb.confignode.manager.node.NodeManager;
import org.apache.iotdb.confignode.manager.node.NodeMetrics;
-import org.apache.iotdb.confignode.manager.node.heartbeat.NodeHeartbeatSample;
import org.apache.iotdb.confignode.manager.partition.PartitionManager;
import org.apache.iotdb.confignode.manager.partition.PartitionMetrics;
import org.apache.iotdb.confignode.manager.pipe.PipeManager;
@@ -387,10 +387,11 @@
TSStatus status = confirmLeader();
if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
// Force updating the target DataNode's status to Unknown
- getNodeManager()
- .getNodeCacheMap()
- .get(dataNodeLocation.getDataNodeId())
- .forceUpdate(NodeHeartbeatSample.generateDefaultSample(NodeStatus.Unknown));
+ getLoadManager()
+ .forceUpdateNodeCache(
+ NodeType.DataNode,
+ dataNodeLocation.getDataNodeId(),
+ NodeHeartbeatSample.generateDefaultSample(NodeStatus.Unknown));
LOGGER.info(
"[ShutdownHook] The DataNode-{} will be shutdown soon, mark it as Unknown",
dataNodeLocation.getDataNodeId());
@@ -431,12 +432,7 @@
.map(TDataNodeConfiguration::getLocation)
.sorted(Comparator.comparingInt(TDataNodeLocation::getDataNodeId))
.collect(Collectors.toList());
- Map<Integer, String> nodeStatus = new HashMap<>();
- getNodeManager()
- .getNodeCacheMap()
- .forEach(
- (nodeId, heartbeatCache) ->
- nodeStatus.put(nodeId, heartbeatCache.getNodeStatusWithReason()));
+ Map<Integer, String> nodeStatus = getLoadManager().getNodeStatusWithReason();
return new TShowClusterResp(status, configNodeLocations, dataNodeInfoLocations, nodeStatus);
} else {
return new TShowClusterResp(status, new ArrayList<>(), new ArrayList<>(), new HashMap<>());
@@ -1131,10 +1127,11 @@
TSStatus status = confirmLeader();
if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
// Force updating the target ConfigNode's status to Unknown
- getNodeManager()
- .getNodeCacheMap()
- .get(configNodeLocation.getConfigNodeId())
- .forceUpdate(NodeHeartbeatSample.generateDefaultSample(NodeStatus.Unknown));
+ getLoadManager()
+ .forceUpdateNodeCache(
+ NodeType.ConfigNode,
+ configNodeLocation.getConfigNodeId(),
+ NodeHeartbeatSample.generateDefaultSample(NodeStatus.Unknown));
LOGGER.info(
"[ShutdownHook] The ConfigNode-{} will be shutdown soon, mark it as Unknown",
configNodeLocation.getConfigNodeId());
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
index 6b03eea..93d73b5 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
@@ -40,7 +40,6 @@
import org.apache.iotdb.confignode.consensus.request.write.procedure.UpdateProcedurePlan;
import org.apache.iotdb.confignode.consensus.request.write.region.CreateRegionGroupsPlan;
import org.apache.iotdb.confignode.manager.partition.PartitionManager;
-import org.apache.iotdb.confignode.manager.partition.heartbeat.RegionGroupCache;
import org.apache.iotdb.confignode.persistence.ProcedureInfo;
import org.apache.iotdb.confignode.procedure.Procedure;
import org.apache.iotdb.confignode.procedure.ProcedureExecutor;
@@ -93,7 +92,6 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
-import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
@@ -346,13 +344,33 @@
}
public TSStatus migrateRegion(TMigrateRegionReq migrateRegionReq) {
- // TODO: Whether to guarantee the check high consistency, i.e, use consensus read to check
- Map<TConsensusGroupId, RegionGroupCache> regionReplicaMap =
- configManager.getPartitionManager().getRegionGroupCacheMap();
- Optional<TConsensusGroupId> regionId =
- regionReplicaMap.keySet().stream()
- .filter(id -> id.getId() == migrateRegionReq.getRegionId())
- .findAny();
+ TConsensusGroupId regionGroupId;
+ if (configManager
+ .getPartitionManager()
+ .isRegionGroupExists(
+ new TConsensusGroupId(
+ TConsensusGroupType.SchemaRegion, migrateRegionReq.getRegionId()))) {
+ regionGroupId =
+ new TConsensusGroupId(TConsensusGroupType.SchemaRegion, migrateRegionReq.getRegionId());
+ } else if (configManager
+ .getPartitionManager()
+ .isRegionGroupExists(
+ new TConsensusGroupId(
+ TConsensusGroupType.DataRegion, migrateRegionReq.getRegionId()))) {
+ regionGroupId =
+ new TConsensusGroupId(TConsensusGroupType.DataRegion, migrateRegionReq.getRegionId());
+ } else {
+ LOGGER.warn(
+ "Submit RegionMigrateProcedure failed, because RegionGroup: {} doesn't exist",
+ migrateRegionReq.getRegionId());
+ TSStatus status = new TSStatus(TSStatusCode.MIGRATE_REGION_ERROR.getStatusCode());
+ status.setMessage(
+ String.format(
+ "Submit RegionMigrateProcedure failed, because RegionGroup: %s doesn't exist",
+ migrateRegionReq.getRegionId()));
+ return status;
+ }
+
TDataNodeLocation originalDataNode =
configManager
.getNodeManager()
@@ -363,18 +381,7 @@
.getNodeManager()
.getRegisteredDataNode(migrateRegionReq.getToId())
.getLocation();
- if (!regionId.isPresent()) {
- LOGGER.warn(
- "Submit RegionMigrateProcedure failed, because no Region {}",
- migrateRegionReq.getRegionId());
- TSStatus status = new TSStatus(TSStatusCode.MIGRATE_REGION_ERROR.getStatusCode());
- status.setMessage(
- "Submit RegionMigrateProcedure failed, because no region Group "
- + migrateRegionReq.getRegionId());
- return status;
- }
- Set<Integer> dataNodesInRegion =
- regionReplicaMap.get(regionId.get()).getStatistics().getRegionStatisticsMap().keySet();
+
if (originalDataNode == null) {
LOGGER.warn(
"Submit RegionMigrateProcedure failed, because no original DataNode {}",
@@ -393,7 +400,9 @@
"Submit RegionMigrateProcedure failed, because no target DataNode "
+ migrateRegionReq.getToId());
return status;
- } else if (!dataNodesInRegion.contains(migrateRegionReq.getFromId())) {
+ } else if (configManager.getPartitionManager()
+ .getAllReplicaSets(originalDataNode.getDataNodeId()).stream()
+ .noneMatch(replicaSet -> replicaSet.getRegionId().equals(regionGroupId))) {
LOGGER.warn(
"Submit RegionMigrateProcedure failed, because the original DataNode {} doesn't contain Region {}",
migrateRegionReq.getFromId(),
@@ -405,7 +414,9 @@
+ " doesn't contain Region "
+ migrateRegionReq.getRegionId());
return status;
- } else if (dataNodesInRegion.contains(migrateRegionReq.getToId())) {
+ } else if (configManager.getPartitionManager().getAllReplicaSets(destDataNode.getDataNodeId())
+ .stream()
+ .anyMatch(replicaSet -> replicaSet.getRegionId().equals(regionGroupId))) {
LOGGER.warn(
"Submit RegionMigrateProcedure failed, because the target DataNode {} already contains Region {}",
migrateRegionReq.getToId(),
@@ -425,10 +436,8 @@
.map(TDataNodeConfiguration::getLocation)
.map(TDataNodeLocation::getDataNodeId)
.collect(Collectors.toSet());
- if (configManager
- .getNodeManager()
- .getNodeStatusByNodeId(migrateRegionReq.getFromId())
- .equals(NodeStatus.Unknown)) {
+ if (NodeStatus.Unknown.equals(
+ configManager.getLoadManager().getNodeStatus(migrateRegionReq.getFromId()))) {
LOGGER.warn(
"Submit RegionMigrateProcedure failed, because the sourceDataNode {} is Unknown.",
migrateRegionReq.getFromId());
@@ -439,18 +448,8 @@
+ " is Unknown.");
return status;
}
- dataNodesInRegion.retainAll(aliveDataNodes);
- if (dataNodesInRegion.isEmpty()) {
- LOGGER.warn(
- "Submit RegionMigrateProcedure failed, because all of the DataNodes in Region Group {} is unavailable.",
- migrateRegionReq.getRegionId());
- TSStatus status = new TSStatus(TSStatusCode.MIGRATE_REGION_ERROR.getStatusCode());
- status.setMessage(
- "Submit RegionMigrateProcedure failed, because all of the DataNodes in Region Group "
- + migrateRegionReq.getRegionId()
- + " are unavailable.");
- return status;
- } else if (!aliveDataNodes.contains(migrateRegionReq.getToId())) {
+
+ if (!aliveDataNodes.contains(migrateRegionReq.getToId())) {
LOGGER.warn(
"Submit RegionMigrateProcedure failed, because the destDataNode {} is ReadOnly or Unknown.",
migrateRegionReq.getToId());
@@ -462,7 +461,7 @@
return status;
}
this.executor.submitProcedure(
- new RegionMigrateProcedure(regionId.get(), originalDataNode, destDataNode));
+ new RegionMigrateProcedure(regionGroupId, originalDataNode, destDataNode));
LOGGER.info(
"Submit RegionMigrateProcedure successfully, Region: {}, From: {}, To: {}",
migrateRegionReq.getRegionId(),
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/RetryFailedTasksThread.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/RetryFailedTasksThread.java
index c9ffdd6..021ef30 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/RetryFailedTasksThread.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/RetryFailedTasksThread.java
@@ -25,8 +25,8 @@
import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil;
import org.apache.iotdb.confignode.conf.ConfigNodeConfig;
import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
+import org.apache.iotdb.confignode.manager.load.LoadManager;
import org.apache.iotdb.confignode.manager.node.NodeManager;
-import org.apache.iotdb.confignode.manager.node.heartbeat.BaseNodeCache;
import org.apache.iotdb.rpc.TSStatusCode;
import org.slf4j.Logger;
@@ -46,12 +46,15 @@
*/
public class RetryFailedTasksThread {
+ // TODO: Replace this class with cluster events
+
private static final Logger LOGGER = LoggerFactory.getLogger(RetryFailedTasksThread.class);
private static final ConfigNodeConfig CONF = ConfigNodeDescriptor.getInstance().getConf();
private static final long HEARTBEAT_INTERVAL = CONF.getHeartbeatIntervalInMs();
private final IManager configManager;
private final NodeManager nodeManager;
+ private final LoadManager loadManager;
private final ScheduledExecutorService retryFailTasksExecutor =
IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor("Cluster-RetryFailedTasks-Service");
private final Object scheduleMonitor = new Object();
@@ -63,6 +66,7 @@
public RetryFailedTasksThread(IManager configManager) {
this.configManager = configManager;
this.nodeManager = configManager.getNodeManager();
+ this.loadManager = configManager.getLoadManager();
this.oldUnknownNodes = new HashSet<>();
}
@@ -113,15 +117,12 @@
.forEach(
DataNodeConfiguration -> {
TDataNodeLocation dataNodeLocation = DataNodeConfiguration.getLocation();
- BaseNodeCache newestNodeInformation =
- nodeManager.getNodeCacheMap().get(dataNodeLocation.dataNodeId);
- if (newestNodeInformation != null) {
- if (newestNodeInformation.getNodeStatus() == NodeStatus.Running) {
- oldUnknownNodes.remove(dataNodeLocation);
- } else if (!oldUnknownNodes.contains(dataNodeLocation)
- && newestNodeInformation.getNodeStatus() == NodeStatus.Unknown) {
- newUnknownNodes.add(dataNodeLocation);
- }
+ NodeStatus nodeStatus = loadManager.getNodeStatus(dataNodeLocation.getDataNodeId());
+ if (nodeStatus == NodeStatus.Running) {
+ oldUnknownNodes.remove(dataNodeLocation);
+ } else if (!oldUnknownNodes.contains(dataNodeLocation)
+ && nodeStatus == NodeStatus.Unknown) {
+ newUnknownNodes.add(dataNodeLocation);
}
});
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java
index 932f403..2aaa730 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java
@@ -16,24 +16,18 @@
* specific language governing permissions and limitations
* under the License.
*/
+
package org.apache.iotdb.confignode.manager.load;
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
-import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
-import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot;
import org.apache.iotdb.commons.cluster.NodeStatus;
-import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
-import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil;
+import org.apache.iotdb.commons.cluster.NodeType;
+import org.apache.iotdb.commons.cluster.RegionStatus;
import org.apache.iotdb.commons.partition.DataPartitionTable;
import org.apache.iotdb.commons.partition.SchemaPartitionTable;
-import org.apache.iotdb.confignode.client.DataNodeRequestType;
-import org.apache.iotdb.confignode.client.async.AsyncDataNodeClientPool;
-import org.apache.iotdb.confignode.client.async.handlers.AsyncClientHandler;
-import org.apache.iotdb.confignode.conf.ConfigNodeConfig;
-import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
import org.apache.iotdb.confignode.consensus.request.write.region.CreateRegionGroupsPlan;
import org.apache.iotdb.confignode.exception.DatabaseNotExistsException;
import org.apache.iotdb.confignode.exception.NoAvailableRegionGroupException;
@@ -42,31 +36,21 @@
import org.apache.iotdb.confignode.manager.load.balancer.PartitionBalancer;
import org.apache.iotdb.confignode.manager.load.balancer.RegionBalancer;
import org.apache.iotdb.confignode.manager.load.balancer.RouteBalancer;
-import org.apache.iotdb.confignode.manager.load.balancer.router.RegionRouteMap;
-import org.apache.iotdb.confignode.manager.node.NodeManager;
-import org.apache.iotdb.confignode.manager.node.heartbeat.NodeStatistics;
-import org.apache.iotdb.confignode.manager.observer.NodeStatisticsEvent;
-import org.apache.iotdb.confignode.manager.partition.PartitionManager;
-import org.apache.iotdb.confignode.manager.partition.heartbeat.RegionGroupStatistics;
-import org.apache.iotdb.confignode.manager.partition.heartbeat.RegionStatistics;
+import org.apache.iotdb.confignode.manager.load.cache.LoadCache;
+import org.apache.iotdb.confignode.manager.load.cache.node.NodeHeartbeatSample;
+import org.apache.iotdb.confignode.manager.load.cache.region.RegionHeartbeatSample;
+import org.apache.iotdb.confignode.manager.load.service.HeartbeatService;
+import org.apache.iotdb.confignode.manager.load.service.StatisticsService;
+import org.apache.iotdb.confignode.manager.partition.RegionGroupStatus;
import org.apache.iotdb.confignode.rpc.thrift.TTimeSlotList;
-import org.apache.iotdb.mpp.rpc.thrift.TRegionRouteReq;
-import org.apache.iotdb.tsfile.utils.Pair;
import com.google.common.eventbus.AsyncEventBus;
import com.google.common.eventbus.EventBus;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.util.List;
import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
-import java.util.stream.Collectors;
/**
* The LoadManager at ConfigNodeGroup-Leader is active. It proactively implements the cluster
@@ -74,25 +58,19 @@
*/
public class LoadManager {
- private static final Logger LOGGER = LoggerFactory.getLogger(LoadManager.class);
-
- private static final ConfigNodeConfig CONF = ConfigNodeDescriptor.getInstance().getConf();
- private static final long HEARTBEAT_INTERVAL = CONF.getHeartbeatIntervalInMs();
-
private final IManager configManager;
- /** Balancers */
+ /** Balancers. */
private final RegionBalancer regionBalancer;
private final PartitionBalancer partitionBalancer;
private final RouteBalancer routeBalancer;
- /** Load statistics executor service */
- private Future<?> currentLoadStatisticsFuture;
+ /** Cluster load services. */
+ private final LoadCache loadCache;
- private final ScheduledExecutorService loadStatisticsExecutor =
- IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor("Cluster-LoadStatistics-Service");
- private final Object scheduleMonitor = new Object();
+ private final HeartbeatService heartbeatService;
+ private final StatisticsService statisticsService;
private final EventBus eventBus =
new AsyncEventBus("LoadManager-EventBus", Executors.newFixedThreadPool(5));
@@ -104,11 +82,16 @@
this.partitionBalancer = new PartitionBalancer(configManager);
this.routeBalancer = new RouteBalancer(configManager);
+ this.loadCache = new LoadCache();
+ this.heartbeatService = new HeartbeatService(configManager, loadCache);
+ this.statisticsService =
+ new StatisticsService(configManager, routeBalancer, loadCache, eventBus);
+
eventBus.register(configManager.getClusterSchemaManager());
}
/**
- * Generate an optimal CreateRegionGroupsPlan
+ * Generate an optimal CreateRegionGroupsPlan.
*
* @param allotmentMap Map<StorageGroupName, Region allotment>
* @param consensusGroupType TConsensusGroupType of RegionGroup to be allocated
@@ -123,7 +106,7 @@
}
/**
- * Allocate SchemaPartitions
+ * Allocate SchemaPartitions.
*
* @param unassignedSchemaPartitionSlotsMap SchemaPartitionSlots that should be assigned
* @return Map<StorageGroupName, SchemaPartitionTable>, the allocating result
@@ -135,7 +118,7 @@
}
/**
- * Allocate DataPartitions
+ * Allocate DataPartitions.
*
* @param unassignedDataPartitionSlotsMap DataPartitionSlots that should be assigned
* @return Map<StorageGroupName, DataPartitionTable>, the allocating result
@@ -182,182 +165,197 @@
return routeBalancer.getLatestRegionPriorityMap();
}
- /** Start the load statistics service */
- public void startLoadStatisticsService() {
- synchronized (scheduleMonitor) {
- if (currentLoadStatisticsFuture == null) {
- currentLoadStatisticsFuture =
- ScheduledExecutorUtil.safelyScheduleWithFixedDelay(
- loadStatisticsExecutor,
- this::updateLoadStatistics,
- 0,
- HEARTBEAT_INTERVAL,
- TimeUnit.MILLISECONDS);
- LOGGER.info("LoadStatistics service is started successfully.");
- }
- }
- }
-
- /** Stop the load statistics service */
- public void stopLoadStatisticsService() {
- synchronized (scheduleMonitor) {
- if (currentLoadStatisticsFuture != null) {
- currentLoadStatisticsFuture.cancel(false);
- currentLoadStatisticsFuture = null;
- LOGGER.info("LoadStatistics service is stopped successfully.");
- }
- }
- }
-
- private void updateLoadStatistics() {
- // Broadcast the RegionRouteMap if some LoadStatistics has changed
- boolean isNeedBroadcast = false;
-
- // Update NodeStatistics:
- // Pair<NodeStatistics, NodeStatistics>:left one means the current NodeStatistics, right one
- // means the previous NodeStatistics
- Map<Integer, Pair<NodeStatistics, NodeStatistics>> differentNodeStatisticsMap =
- new ConcurrentHashMap<>();
- getNodeManager()
- .getNodeCacheMap()
- .forEach(
- (nodeId, nodeCache) -> {
- NodeStatistics preNodeStatistics = nodeCache.getPreviousStatistics().deepCopy();
- if (nodeCache.periodicUpdate()) {
- // Update and record the changed NodeStatistics
- differentNodeStatisticsMap.put(
- nodeId, new Pair<>(nodeCache.getStatistics(), preNodeStatistics));
- }
- });
- if (!differentNodeStatisticsMap.isEmpty()) {
- isNeedBroadcast = true;
- recordNodeStatistics(differentNodeStatisticsMap);
- eventBus.post(new NodeStatisticsEvent(differentNodeStatisticsMap));
- }
-
- // Update RegionGroupStatistics
- Map<TConsensusGroupId, RegionGroupStatistics> differentRegionGroupStatisticsMap =
- new ConcurrentHashMap<>();
- getPartitionManager()
- .getRegionGroupCacheMap()
- .forEach(
- (regionGroupId, regionGroupCache) -> {
- if (regionGroupCache.periodicUpdate()) {
- // Update and record the changed RegionGroupStatistics
- differentRegionGroupStatisticsMap.put(
- regionGroupId, regionGroupCache.getStatistics());
- }
- });
- if (!differentRegionGroupStatisticsMap.isEmpty()) {
- isNeedBroadcast = true;
- recordRegionGroupStatistics(differentRegionGroupStatisticsMap);
- }
-
- // Update RegionRouteMap
- if (routeBalancer.updateRegionRouteMap()) {
- isNeedBroadcast = true;
- recordRegionRouteMap(routeBalancer.getRegionRouteMap());
- }
-
- if (isNeedBroadcast) {
- broadcastLatestRegionRouteMap();
- }
- }
-
- private void recordNodeStatistics(
- Map<Integer, Pair<NodeStatistics, NodeStatistics>> differentNodeStatisticsMap) {
- LOGGER.info("[UpdateLoadStatistics] NodeStatisticsMap: ");
- for (Map.Entry<Integer, Pair<NodeStatistics, NodeStatistics>> nodeCacheEntry :
- differentNodeStatisticsMap.entrySet()) {
- LOGGER.info(
- "[UpdateLoadStatistics]\t {}={}",
- "nodeId{" + nodeCacheEntry.getKey() + "}",
- nodeCacheEntry.getValue().left);
- }
- }
-
- private void recordRegionGroupStatistics(
- Map<TConsensusGroupId, RegionGroupStatistics> differentRegionGroupStatisticsMap) {
- LOGGER.info("[UpdateLoadStatistics] RegionGroupStatisticsMap: ");
- for (Map.Entry<TConsensusGroupId, RegionGroupStatistics> regionGroupStatisticsEntry :
- differentRegionGroupStatisticsMap.entrySet()) {
- LOGGER.info("[UpdateLoadStatistics]\t RegionGroup: {}", regionGroupStatisticsEntry.getKey());
- LOGGER.info("[UpdateLoadStatistics]\t {}", regionGroupStatisticsEntry.getValue());
- for (Map.Entry<Integer, RegionStatistics> regionStatisticsEntry :
- regionGroupStatisticsEntry.getValue().getRegionStatisticsMap().entrySet()) {
- LOGGER.info(
- "[UpdateLoadStatistics]\t dataNodeId{}={}",
- regionStatisticsEntry.getKey(),
- regionStatisticsEntry.getValue());
- }
- }
- }
-
- private void recordRegionRouteMap(RegionRouteMap regionRouteMap) {
- LOGGER.info("[UpdateLoadStatistics] RegionLeaderMap: ");
- for (Map.Entry<TConsensusGroupId, Integer> regionLeaderEntry :
- regionRouteMap.getRegionLeaderMap().entrySet()) {
- LOGGER.info(
- "[UpdateLoadStatistics]\t {}={}",
- regionLeaderEntry.getKey(),
- regionLeaderEntry.getValue());
- }
-
- LOGGER.info("[UpdateLoadStatistics] RegionPriorityMap: ");
- for (Map.Entry<TConsensusGroupId, TRegionReplicaSet> regionPriorityEntry :
- regionRouteMap.getRegionPriorityMap().entrySet()) {
- LOGGER.info(
- "[UpdateLoadStatistics]\t {}={}",
- regionPriorityEntry.getKey(),
- regionPriorityEntry.getValue().getDataNodeLocations().stream()
- .map(TDataNodeLocation::getDataNodeId)
- .collect(Collectors.toList()));
- }
- }
-
public void broadcastLatestRegionRouteMap() {
- Map<TConsensusGroupId, TRegionReplicaSet> latestRegionRouteMap = getLatestRegionRouteMap();
- Map<Integer, TDataNodeLocation> dataNodeLocationMap = new ConcurrentHashMap<>();
- // Broadcast the RegionRouteMap to all DataNodes except the unknown ones
- getNodeManager()
- .filterDataNodeThroughStatus(NodeStatus.Running, NodeStatus.Removing, NodeStatus.ReadOnly)
- .forEach(
- onlineDataNode ->
- dataNodeLocationMap.put(
- onlineDataNode.getLocation().getDataNodeId(), onlineDataNode.getLocation()));
-
- LOGGER.info("[UpdateLoadStatistics] Begin to broadcast RegionRouteMap:");
- long broadcastTime = System.currentTimeMillis();
-
- AsyncClientHandler<TRegionRouteReq, TSStatus> clientHandler =
- new AsyncClientHandler<>(
- DataNodeRequestType.UPDATE_REGION_ROUTE_MAP,
- new TRegionRouteReq(broadcastTime, latestRegionRouteMap),
- dataNodeLocationMap);
- AsyncDataNodeClientPool.getInstance().sendAsyncRequestToDataNodeWithRetry(clientHandler);
- LOGGER.info("[UpdateLoadStatistics] Broadcast the latest RegionRouteMap finished.");
+ statisticsService.broadcastLatestRegionRouteMap();
}
- /** Initialize all kinds of the HeartbeatCache when the ConfigNode-Leader is switched */
- public void initHeartbeatCache() {
- getNodeManager().initNodeHeartbeatCache();
- getPartitionManager().initRegionGroupHeartbeatCache();
+ public void startLoadServices() {
+ loadCache.initHeartbeatCache(configManager);
routeBalancer.initRegionRouteMap();
+ heartbeatService.startHeartbeatService();
+ statisticsService.startLoadStatisticsService();
+ }
+
+ public void stopLoadServices() {
+ heartbeatService.stopHeartbeatService();
+ statisticsService.stopLoadStatisticsService();
+ loadCache.clearHeartbeatCache();
}
public RouteBalancer getRouteBalancer() {
return routeBalancer;
}
- private NodeManager getNodeManager() {
- return configManager.getNodeManager();
+ /**
+ * Safely get NodeStatus by NodeId.
+ *
+ * @param nodeId The specified NodeId
+ * @return NodeStatus of the specified Node. Unknown if cache doesn't exist.
+ */
+ public NodeStatus getNodeStatus(int nodeId) {
+ return loadCache.getNodeStatus(nodeId);
}
- private PartitionManager getPartitionManager() {
- return configManager.getPartitionManager();
+ /**
+ * Safely get the specified Node's current status with reason.
+ *
+ * @param nodeId The specified NodeId
+ * @return The specified Node's current status if the nodeCache contains it, Unknown otherwise
+ */
+ public String getNodeStatusWithReason(int nodeId) {
+ return loadCache.getNodeStatusWithReason(nodeId);
}
- public EventBus getEventBus() {
- return eventBus;
+ /**
+ * Get all Node's current status with reason.
+ *
+ * @return Map<NodeId, NodeStatus with reason>
+ */
+ public Map<Integer, String> getNodeStatusWithReason() {
+ return loadCache.getNodeStatusWithReason();
+ }
+
+ /**
+ * Filter ConfigNodes through the specified NodeStatus.
+ *
+ * @param status The specified NodeStatus
+ * @return Filtered ConfigNodes with the specified NodeStatus
+ */
+ public List<Integer> filterConfigNodeThroughStatus(NodeStatus... status) {
+ return loadCache.filterConfigNodeThroughStatus(status);
+ }
+
+ /**
+ * Filter DataNodes through the specified NodeStatus.
+ *
+ * @param status The specified NodeStatus
+ * @return Filtered DataNodes with the specified NodeStatus
+ */
+ public List<Integer> filterDataNodeThroughStatus(NodeStatus... status) {
+ return loadCache.filterDataNodeThroughStatus(status);
+ }
+
+ /**
+ * Get the free disk space of the specified DataNode.
+ *
+ * @param dataNodeId The index of the specified DataNode
+ * @return The free disk space sampled through heartbeat, 0 if no heartbeat received
+ */
+ public double getFreeDiskSpace(int dataNodeId) {
+ return loadCache.getFreeDiskSpace(dataNodeId);
+ }
+
+ /**
+ * Get the loadScore of each DataNode.
+ *
+ * @return Map<DataNodeId, loadScore>
+ */
+ public Map<Integer, Long> getAllDataNodeLoadScores() {
+ return loadCache.getAllDataNodeLoadScores();
+ }
+
+ /**
+ * Get the lowest loadScore DataNode.
+ *
+ * @return The index of the lowest loadScore DataNode. -1 if no DataNode heartbeat received.
+ */
+ public int getLowestLoadDataNode() {
+ return loadCache.getLowestLoadDataNode();
+ }
+
+ /**
+ * Get the lowest loadScore DataNode from the specified DataNodes.
+ *
+ * @param dataNodeIds The specified DataNodes
+ * @return The index of the lowest loadScore DataNode. -1 if no DataNode heartbeat received.
+ */
+ public int getLowestLoadDataNode(List<Integer> dataNodeIds) {
+ return loadCache.getLowestLoadDataNode(dataNodeIds);
+ }
+
+ /**
+ * Force update the specified Node's cache.
+ *
+ * @param nodeType Specified NodeType
+ * @param nodeId Specified NodeId
+ * @param heartbeatSample Specified NodeHeartbeatSample
+ */
+ public void forceUpdateNodeCache(
+ NodeType nodeType, int nodeId, NodeHeartbeatSample heartbeatSample) {
+ loadCache.forceUpdateNodeCache(nodeType, nodeId, heartbeatSample);
+ }
+
+ /** Remove the specified Node's cache. */
+ public void removeNodeCache(int nodeId) {
+ loadCache.removeNodeCache(nodeId);
+ }
+
+ /**
+ * Safely get RegionStatus.
+ *
+ * @param consensusGroupId Specified RegionGroupId
+ * @param dataNodeId Specified RegionReplicaId
+ * @return Corresponding RegionStatus if cache exists, Unknown otherwise
+ */
+ public RegionStatus getRegionStatus(TConsensusGroupId consensusGroupId, int dataNodeId) {
+ return loadCache.getRegionStatus(consensusGroupId, dataNodeId);
+ }
+
+ /**
+ * Safely get RegionGroupStatus.
+ *
+ * @param consensusGroupId Specified RegionGroupId
+ * @return Corresponding RegionGroupStatus if cache exists, Disabled otherwise
+ */
+ public RegionGroupStatus getRegionGroupStatus(TConsensusGroupId consensusGroupId) {
+ return loadCache.getRegionGroupStatus(consensusGroupId);
+ }
+
+ /**
+ * Safely get RegionGroupStatus.
+ *
+ * @param consensusGroupIds Specified RegionGroupIds
+ * @return Corresponding RegionGroupStatus if cache exists, Disabled otherwise
+ */
+ public Map<TConsensusGroupId, RegionGroupStatus> getRegionGroupStatus(
+ List<TConsensusGroupId> consensusGroupIds) {
+ return loadCache.getRegionGroupStatus(consensusGroupIds);
+ }
+
+ /**
+ * Filter the RegionGroups through the RegionGroupStatus.
+ *
+ * @param status The specified RegionGroupStatus
+ * @return Filtered RegionGroups with the specified RegionGroupStatus
+ */
+ public List<TConsensusGroupId> filterRegionGroupThroughStatus(RegionGroupStatus... status) {
+ return loadCache.filterRegionGroupThroughStatus(status);
+ }
+
+ /**
+ * Count the number of cluster Regions with specified RegionStatus.
+ *
+ * @param type The specified RegionGroupType
+ * @param status The specified statuses
+ * @return The number of cluster Regions with specified RegionStatus
+ */
+ public int countRegionWithSpecifiedStatus(TConsensusGroupType type, RegionStatus... status) {
+ return loadCache.countRegionWithSpecifiedStatus(type, status);
+ }
+
+ /**
+ * Force update the specified RegionGroup's cache.
+ *
+ * @param regionGroupId Specified RegionGroupId
+ * @param heartbeatSampleMap Specified RegionHeartbeatSampleMap
+ */
+ public void forceUpdateRegionGroupCache(
+ TConsensusGroupId regionGroupId, Map<Integer, RegionHeartbeatSample> heartbeatSampleMap) {
+ loadCache.forceUpdateRegionGroupCache(regionGroupId, heartbeatSampleMap);
+ }
+
+ /** Remove the specified RegionGroup's cache. */
+ public void removeRegionGroupCache(TConsensusGroupId consensusGroupId) {
+ loadCache.removeRegionGroupCache(consensusGroupId);
}
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RegionBalancer.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RegionBalancer.java
index 882a5ee..f6d1524 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RegionBalancer.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RegionBalancer.java
@@ -29,6 +29,7 @@
import org.apache.iotdb.confignode.exception.NotEnoughDataNodeException;
import org.apache.iotdb.confignode.manager.ClusterSchemaManager;
import org.apache.iotdb.confignode.manager.IManager;
+import org.apache.iotdb.confignode.manager.load.LoadManager;
import org.apache.iotdb.confignode.manager.load.balancer.region.CopySetRegionGroupAllocator;
import org.apache.iotdb.confignode.manager.load.balancer.region.GreedyRegionGroupAllocator;
import org.apache.iotdb.confignode.manager.load.balancer.region.IRegionGroupAllocator;
@@ -111,7 +112,7 @@
dataNodeConfiguration -> {
int dataNodeId = dataNodeConfiguration.getLocation().getDataNodeId();
availableDataNodeMap.put(dataNodeId, dataNodeConfiguration);
- freeDiskSpaceMap.put(dataNodeId, getNodeManager().getFreeDiskSpace(dataNodeId));
+ freeDiskSpaceMap.put(dataNodeId, getLoadManager().getFreeDiskSpace(dataNodeId));
});
// Generate allocation plan
@@ -145,6 +146,10 @@
return configManager.getPartitionManager();
}
+ private LoadManager getLoadManager() {
+ return configManager.getLoadManager();
+ }
+
public enum RegionGroupAllocatePolicy {
COPY_SET,
GREEDY
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RouteBalancer.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RouteBalancer.java
index 31f2839..a87cb05 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RouteBalancer.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RouteBalancer.java
@@ -33,6 +33,7 @@
import org.apache.iotdb.confignode.conf.ConfigNodeConfig;
import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
import org.apache.iotdb.confignode.manager.IManager;
+import org.apache.iotdb.confignode.manager.load.LoadManager;
import org.apache.iotdb.confignode.manager.load.balancer.router.RegionRouteMap;
import org.apache.iotdb.confignode.manager.load.balancer.router.leader.GreedyLeaderBalancer;
import org.apache.iotdb.confignode.manager.load.balancer.router.leader.ILeaderBalancer;
@@ -91,14 +92,14 @@
private static final boolean IS_DATA_REGION_IOT_CONSENSUS =
ConsensusFactory.IOT_CONSENSUS.equals(DATA_REGION_CONSENSUS_PROTOCOL_CLASS);
- private final IManager configManager;
-
// Key: RegionGroupId
// Value: Pair<Timestamp, LeaderDataNodeId>, where
// the left value stands for sampling timestamp
// and the right value stands for the index of DataNode that leader resides.
private final Map<TConsensusGroupId, Pair<Long, Integer>> leaderCache;
+ private final IManager configManager;
+
/** RegionRouteMap */
private final RegionRouteMap regionRouteMap;
// For generating optimal RegionLeaderMap
@@ -107,6 +108,7 @@
private final IPriorityBalancer priorityRouter;
/** Leader Balancing service */
+ // TODO: leader balancing should be triggered by cluster events
private Future<?> currentLeaderBalancingFuture;
private final ScheduledExecutorService leaderBalancingExecutor =
@@ -115,9 +117,8 @@
public RouteBalancer(IManager configManager) {
this.configManager = configManager;
-
- this.leaderCache = new ConcurrentHashMap<>();
this.regionRouteMap = new RegionRouteMap();
+ this.leaderCache = new ConcurrentHashMap<>();
switch (CONF.getLeaderDistributionPolicy()) {
case ILeaderBalancer.GREEDY_POLICY:
@@ -141,10 +142,10 @@
}
/**
- * Cache the newest leaderHeartbeatSample
+ * Cache the latest leader of a RegionGroup.
*
- * @param regionGroupId Corresponding RegionGroup's index
- * @param leaderSample <Sample timestamp, leaderDataNodeId>, The newest HeartbeatSample
+ * @param regionGroupId the id of the RegionGroup
+ * @param leaderSample the latest leader of a RegionGroup
*/
public void cacheLeaderSample(TConsensusGroupId regionGroupId, Pair<Long, Integer> leaderSample) {
if (TConsensusGroupType.DataRegion.equals(regionGroupId.getType())
@@ -193,7 +194,7 @@
private boolean updateRegionPriorityMap() {
Map<TConsensusGroupId, Integer> regionLeaderMap = regionRouteMap.getRegionLeaderMap();
- Map<Integer, Long> dataNodeLoadScoreMap = getNodeManager().getAllLoadScores();
+ Map<Integer, Long> dataNodeLoadScoreMap = getLoadManager().getAllDataNodeLoadScores();
// Balancing region priority in each SchemaRegionGroup
Map<TConsensusGroupId, TRegionReplicaSet> latestRegionPriorityMap =
@@ -412,4 +413,8 @@
private PartitionManager getPartitionManager() {
return configManager.getPartitionManager();
}
+
+ private LoadManager getLoadManager() {
+ return configManager.getLoadManager();
+ }
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/LoadCache.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/LoadCache.java
new file mode 100644
index 0000000..0b2fd71
--- /dev/null
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/LoadCache.java
@@ -0,0 +1,452 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.manager.load.cache;
+
+import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation;
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
+import org.apache.iotdb.common.rpc.thrift.TDataNodeConfiguration;
+import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
+import org.apache.iotdb.commons.cluster.NodeStatus;
+import org.apache.iotdb.commons.cluster.NodeType;
+import org.apache.iotdb.commons.cluster.RegionStatus;
+import org.apache.iotdb.confignode.manager.IManager;
+import org.apache.iotdb.confignode.manager.load.cache.node.BaseNodeCache;
+import org.apache.iotdb.confignode.manager.load.cache.node.ConfigNodeHeartbeatCache;
+import org.apache.iotdb.confignode.manager.load.cache.node.DataNodeHeartbeatCache;
+import org.apache.iotdb.confignode.manager.load.cache.node.NodeHeartbeatSample;
+import org.apache.iotdb.confignode.manager.load.cache.node.NodeStatistics;
+import org.apache.iotdb.confignode.manager.load.cache.region.RegionGroupCache;
+import org.apache.iotdb.confignode.manager.load.cache.region.RegionGroupStatistics;
+import org.apache.iotdb.confignode.manager.load.cache.region.RegionHeartbeatSample;
+import org.apache.iotdb.confignode.manager.partition.RegionGroupStatus;
+import org.apache.iotdb.tsfile.utils.Pair;
+
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+
+/** Maintain all kinds of heartbeat samples. */
+public class LoadCache {
+
+ // Map<NodeId, BaseNodeCache>
+ private final Map<Integer, BaseNodeCache> nodeCacheMap;
+ // Map<RegionGroupId, RegionGroupCache>
+ private final Map<TConsensusGroupId, RegionGroupCache> regionGroupCacheMap;
+
+ public LoadCache() {
+ this.nodeCacheMap = new ConcurrentHashMap<>();
+ this.regionGroupCacheMap = new ConcurrentHashMap<>();
+ }
+
+ public void initHeartbeatCache(IManager configManager) {
+ initNodeHeartbeatCache(
+ configManager.getNodeManager().getRegisteredConfigNodes(),
+ configManager.getNodeManager().getRegisteredDataNodes());
+ initRegionGroupHeartbeatCache(configManager.getPartitionManager().getAllReplicaSets());
+ }
+
+ /** Initialize the nodeCacheMap when the ConfigNode-Leader is switched. */
+ private void initNodeHeartbeatCache(
+ List<TConfigNodeLocation> registeredConfigNodes,
+ List<TDataNodeConfiguration> registeredDataNodes) {
+
+ final int CURRENT_NODE_ID = ConfigNodeHeartbeatCache.CURRENT_NODE_ID;
+ nodeCacheMap.clear();
+
+ // Init ConfigNodeHeartbeatCache
+ registeredConfigNodes.forEach(
+ configNodeLocation -> {
+ int configNodeId = configNodeLocation.getConfigNodeId();
+ if (configNodeId != CURRENT_NODE_ID) {
+ nodeCacheMap.put(configNodeId, new ConfigNodeHeartbeatCache(configNodeId));
+ }
+ });
+ // Force set itself and never update
+ nodeCacheMap.put(
+ ConfigNodeHeartbeatCache.CURRENT_NODE_ID,
+ new ConfigNodeHeartbeatCache(
+ CURRENT_NODE_ID, ConfigNodeHeartbeatCache.CURRENT_NODE_STATISTICS));
+
+ // Init DataNodeHeartbeatCache
+ registeredDataNodes.forEach(
+ dataNodeConfiguration -> {
+ int dataNodeId = dataNodeConfiguration.getLocation().getDataNodeId();
+ nodeCacheMap.put(dataNodeId, new DataNodeHeartbeatCache(dataNodeId));
+ });
+ }
+
+ /** Initialize the regionGroupCacheMap when the ConfigNode-Leader is switched. */
+ private void initRegionGroupHeartbeatCache(List<TRegionReplicaSet> regionReplicaSets) {
+ regionGroupCacheMap.clear();
+ regionReplicaSets.forEach(
+ regionReplicaSet ->
+ regionGroupCacheMap.put(
+ regionReplicaSet.getRegionId(),
+ new RegionGroupCache(regionReplicaSet.getRegionId())));
+ }
+
+ public void clearHeartbeatCache() {
+ nodeCacheMap.clear();
+ regionGroupCacheMap.clear();
+ }
+
+ /**
+ * Cache the latest heartbeat sample of a ConfigNode.
+ *
+ * @param nodeId the id of the ConfigNode
+ * @param sample the latest heartbeat sample
+ */
+ public void cacheConfigNodeHeartbeatSample(int nodeId, NodeHeartbeatSample sample) {
+ nodeCacheMap
+ .computeIfAbsent(nodeId, empty -> new ConfigNodeHeartbeatCache(nodeId))
+ .cacheHeartbeatSample(sample);
+ }
+
+ /**
+ * Cache the latest heartbeat sample of a DataNode.
+ *
+ * @param nodeId the id of the DataNode
+ * @param sample the latest heartbeat sample
+ */
+ public void cacheDataNodeHeartbeatSample(int nodeId, NodeHeartbeatSample sample) {
+ nodeCacheMap
+ .computeIfAbsent(nodeId, empty -> new DataNodeHeartbeatCache(nodeId))
+ .cacheHeartbeatSample(sample);
+ }
+
+ /**
+ * Cache the latest heartbeat sample of a RegionGroup.
+ *
+ * @param regionGroupId the id of the RegionGroup
+ * @param nodeId the id of the DataNode where specified Region resides
+ * @param sample the latest heartbeat sample
+ */
+ public void cacheRegionHeartbeatSample(
+ TConsensusGroupId regionGroupId, int nodeId, RegionHeartbeatSample sample) {
+ regionGroupCacheMap
+ .computeIfAbsent(regionGroupId, empty -> new RegionGroupCache(regionGroupId))
+ .cacheHeartbeatSample(nodeId, sample);
+ }
+
+ /**
+ * Periodic invoke to update the NodeStatistics of all Nodes.
+ *
+ * @return a map of changed NodeStatistics
+ */
+ public Map<Integer, Pair<NodeStatistics, NodeStatistics>> updateNodeStatistics() {
+ Map<Integer, Pair<NodeStatistics, NodeStatistics>> differentNodeStatisticsMap =
+ new ConcurrentHashMap<>();
+ nodeCacheMap.forEach(
+ (nodeId, nodeCache) -> {
+ NodeStatistics preNodeStatistics = nodeCache.getPreviousStatistics().deepCopy();
+ if (nodeCache.periodicUpdate()) {
+ // Update and record the changed NodeStatistics
+ differentNodeStatisticsMap.put(
+ nodeId, new Pair<>(nodeCache.getStatistics(), preNodeStatistics));
+ }
+ });
+ return differentNodeStatisticsMap;
+ }
+
+ /**
+ * Periodic invoke to update the RegionGroupStatistics of all RegionGroups.
+ *
+ * @return a map of changed RegionGroupStatistics
+ */
+ public Map<TConsensusGroupId, RegionGroupStatistics> updateRegionGroupStatistics() {
+ Map<TConsensusGroupId, RegionGroupStatistics> differentRegionGroupStatisticsMap =
+ new ConcurrentHashMap<>();
+ regionGroupCacheMap.forEach(
+ (regionGroupId, regionGroupCache) -> {
+ if (regionGroupCache.periodicUpdate()) {
+ // Update and record the changed RegionGroupStatistics
+ differentRegionGroupStatisticsMap.put(regionGroupId, regionGroupCache.getStatistics());
+ }
+ });
+ return differentRegionGroupStatisticsMap;
+ }
+
+ /**
+ * Safely get NodeStatus by NodeId.
+ *
+ * @param nodeId The specified NodeId
+ * @return NodeStatus of the specified Node. Unknown if cache doesn't exist.
+ */
+ public NodeStatus getNodeStatus(int nodeId) {
+ BaseNodeCache nodeCache = nodeCacheMap.get(nodeId);
+ return nodeCache == null ? NodeStatus.Unknown : nodeCache.getNodeStatus();
+ }
+
+ /**
+ * Safely get the specified Node's current status with reason.
+ *
+ * @param nodeId The specified NodeId
+ * @return The specified Node's current status if the nodeCache contains it, Unknown otherwise
+ */
+ public String getNodeStatusWithReason(int nodeId) {
+ BaseNodeCache nodeCache = nodeCacheMap.get(nodeId);
+ return nodeCache == null
+ ? NodeStatus.Unknown.getStatus() + "(NoHeartbeat)"
+ : nodeCache.getNodeStatusWithReason();
+ }
+
+ /**
+ * Get all Node's current status with reason.
+ *
+ * @return Map<NodeId, NodeStatus with reason>
+ */
+ public Map<Integer, String> getNodeStatusWithReason() {
+ return nodeCacheMap.entrySet().stream()
+ .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().getNodeStatusWithReason()));
+ }
+
+ /**
+ * Filter ConfigNodes through the specified NodeStatus.
+ *
+ * @param status The specified NodeStatus
+ * @return Filtered ConfigNodes with the specified NodeStatus
+ */
+ public List<Integer> filterConfigNodeThroughStatus(NodeStatus... status) {
+ return nodeCacheMap.entrySet().stream()
+ .filter(
+ nodeCacheEntry ->
+ nodeCacheEntry.getValue() instanceof ConfigNodeHeartbeatCache
+ && Arrays.stream(status)
+ .anyMatch(s -> s.equals(nodeCacheEntry.getValue().getNodeStatus())))
+ .map(Map.Entry::getKey)
+ .collect(Collectors.toList());
+ }
+
+ /**
+ * Filter DataNodes through the specified NodeStatus.
+ *
+ * @param status The specified NodeStatus
+ * @return Filtered DataNodes with the specified NodeStatus
+ */
+ public List<Integer> filterDataNodeThroughStatus(NodeStatus... status) {
+ return nodeCacheMap.entrySet().stream()
+ .filter(
+ nodeCacheEntry ->
+ nodeCacheEntry.getValue() instanceof DataNodeHeartbeatCache
+ && Arrays.stream(status)
+ .anyMatch(s -> s.equals(nodeCacheEntry.getValue().getNodeStatus())))
+ .map(Map.Entry::getKey)
+ .collect(Collectors.toList());
+ }
+
+ /**
+ * Get the free disk space of the specified DataNode.
+ *
+ * @param dataNodeId The index of the specified DataNode
+ * @return The free disk space sampled through heartbeat, 0 if the id has no DataNode cache
+ */
+ public double getFreeDiskSpace(int dataNodeId) {
+ BaseNodeCache nodeCache = nodeCacheMap.get(dataNodeId);
+ boolean isDataNode = nodeCache instanceof DataNodeHeartbeatCache;
+ return isDataNode ? ((DataNodeHeartbeatCache) nodeCache).getFreeDiskSpace() : 0d;
+ }
+
+ /**
+ * Get the loadScore of each DataNode.
+ *
+ * @return Map<DataNodeId, loadScore>
+ */
+ public Map<Integer, Long> getAllDataNodeLoadScores() {
+ Map<Integer, Long> result = new ConcurrentHashMap<>();
+ nodeCacheMap.forEach(
+ (dataNodeId, heartbeatCache) -> {
+ if (heartbeatCache instanceof DataNodeHeartbeatCache) {
+ result.put(dataNodeId, heartbeatCache.getLoadScore());
+ }
+ });
+ return result;
+ }
+
+ /**
+ * Get the lowest loadScore DataNode.
+ *
+ * @return The index of the lowest loadScore DataNode. -1 if no DataNode heartbeat received.
+ */
+ public int getLowestLoadDataNode() {
+ return nodeCacheMap.entrySet().stream()
+ .filter(nodeCacheEntry -> nodeCacheEntry.getValue() instanceof DataNodeHeartbeatCache)
+ .min(Comparator.comparingLong(nodeCacheEntry -> nodeCacheEntry.getValue().getLoadScore()))
+ .map(Map.Entry::getKey)
+ .orElse(-1);
+ }
+
+ /**
+ * Get the lowest loadScore DataNode from the specified DataNodes.
+ *
+ * @param dataNodeIds The specified DataNodes
+ * @return The index of the lowest loadScore DataNode. -1 if no DataNode heartbeat received.
+ */
+ public int getLowestLoadDataNode(List<Integer> dataNodeIds) {
+ return dataNodeIds.stream()
+ .map(nodeCacheMap::get)
+ .filter(Objects::nonNull)
+ .min(Comparator.comparingLong(BaseNodeCache::getLoadScore))
+ .map(BaseNodeCache::getNodeId)
+ .orElse(-1);
+ }
+
+ /**
+ * Force update the specified Node's cache.
+ *
+ * @param nodeType Specified NodeType
+ * @param nodeId Specified NodeId
+ * @param heartbeatSample Specified NodeHeartbeatSample
+ */
+ public void forceUpdateNodeCache(
+ NodeType nodeType, int nodeId, NodeHeartbeatSample heartbeatSample) {
+ switch (nodeType) {
+ case ConfigNode:
+ nodeCacheMap
+ .computeIfAbsent(nodeId, empty -> new ConfigNodeHeartbeatCache(nodeId))
+ .forceUpdate(heartbeatSample);
+ break;
+ case DataNode:
+ default:
+ nodeCacheMap
+ .computeIfAbsent(nodeId, empty -> new DataNodeHeartbeatCache(nodeId))
+ .forceUpdate(heartbeatSample);
+ break;
+ }
+ }
+
+ /** Remove the specified Node's cache. */
+ public void removeNodeCache(int nodeId) {
+ nodeCacheMap.remove(nodeId);
+ }
+
+ /**
+ * Safely get RegionStatus.
+ *
+ * @param consensusGroupId Specified RegionGroupId
+ * @param dataNodeId Specified RegionReplicaId
+ * @return Corresponding RegionStatus if cache exists, Unknown otherwise
+ */
+ public RegionStatus getRegionStatus(TConsensusGroupId consensusGroupId, int dataNodeId) {
+ // Look up once: a containsKey/get pair can hit null if the entry is removed in between
+ RegionGroupCache cache = regionGroupCacheMap.get(consensusGroupId);
+ return cache == null ? RegionStatus.Unknown : cache.getStatistics().getRegionStatus(dataNodeId);
+ }
+
+ /**
+ * Safely get RegionGroupStatus, reading the cache map once so a concurrent removal is harmless.
+ *
+ * @param consensusGroupId Specified RegionGroupId
+ * @return Corresponding RegionGroupStatus if cache exists, Disabled otherwise
+ */
+ public RegionGroupStatus getRegionGroupStatus(TConsensusGroupId consensusGroupId) {
+ RegionGroupCache cache = regionGroupCacheMap.get(consensusGroupId);
+ return cache == null
+ ? RegionGroupStatus.Disabled : cache.getStatistics().getRegionGroupStatus();
+ }
+
+ /**
+ * Safely get RegionGroupStatus.
+ *
+ * @param consensusGroupIds Specified RegionGroupIds
+ * @return Corresponding RegionGroupStatus if cache exists, Disabled otherwise
+ */
+ public Map<TConsensusGroupId, RegionGroupStatus> getRegionGroupStatus(
+ List<TConsensusGroupId> consensusGroupIds) {
+ Map<TConsensusGroupId, RegionGroupStatus> regionGroupStatusMap = new ConcurrentHashMap<>();
+ for (TConsensusGroupId consensusGroupId : consensusGroupIds) {
+ regionGroupStatusMap.put(consensusGroupId, getRegionGroupStatus(consensusGroupId));
+ }
+ return regionGroupStatusMap;
+ }
+
+ /**
+ * Filter the RegionGroups through the RegionGroupStatus.
+ *
+ * @param status The specified RegionGroupStatus
+ * @return Filtered RegionGroups with the specified RegionGroupStatus
+ */
+ public List<TConsensusGroupId> filterRegionGroupThroughStatus(RegionGroupStatus... status) {
+ return regionGroupCacheMap.entrySet().stream()
+ .filter(
+ regionGroupCacheEntry ->
+ Arrays.stream(status)
+ .anyMatch(
+ s ->
+ s.equals(
+ regionGroupCacheEntry
+ .getValue()
+ .getStatistics()
+ .getRegionGroupStatus())))
+ .map(Map.Entry::getKey)
+ .collect(Collectors.toList());
+ }
+
+ /**
+ * Count the number of cluster Regions with specified RegionStatus.
+ *
+ * @param type The specified RegionGroupType
+ * @param status The specified statuses
+ * @return The number of cluster Regions with specified RegionStatus
+ */
+ public int countRegionWithSpecifiedStatus(TConsensusGroupType type, RegionStatus... status) {
+ AtomicInteger result = new AtomicInteger(0);
+ regionGroupCacheMap.forEach(
+ (regionGroupId, regionGroupCache) -> {
+ if (type.equals(regionGroupId.getType())) {
+ regionGroupCache
+ .getStatistics()
+ .getRegionStatisticsMap()
+ .values()
+ .forEach(
+ regionStatistics -> {
+ if (Arrays.stream(status)
+ .anyMatch(s -> s.equals(regionStatistics.getRegionStatus()))) {
+ result.getAndIncrement();
+ }
+ });
+ }
+ });
+ return result.get();
+ }
+
+ /**
+ * Force update the specified RegionGroup's cache.
+ *
+ * @param regionGroupId Specified RegionGroupId
+ * @param heartbeatSampleMap Specified RegionHeartbeatSampleMap
+ */
+ public void forceUpdateRegionGroupCache(
+ TConsensusGroupId regionGroupId, Map<Integer, RegionHeartbeatSample> heartbeatSampleMap) {
+ regionGroupCacheMap
+ .computeIfAbsent(regionGroupId, empty -> new RegionGroupCache(regionGroupId))
+ .forceUpdate(heartbeatSampleMap);
+ }
+
+ /** Remove the specified RegionGroup's cache. */
+ public void removeRegionGroupCache(TConsensusGroupId consensusGroupId) {
+ regionGroupCacheMap.remove(consensusGroupId);
+ }
+}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/heartbeat/BaseNodeCache.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/node/BaseNodeCache.java
similarity index 74%
rename from confignode/src/main/java/org/apache/iotdb/confignode/manager/node/heartbeat/BaseNodeCache.java
rename to confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/node/BaseNodeCache.java
index 24f9a9e..65a1ccc 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/heartbeat/BaseNodeCache.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/node/BaseNodeCache.java
@@ -16,13 +16,15 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.confignode.manager.node.heartbeat;
+
+package org.apache.iotdb.confignode.manager.load.cache.node;
import org.apache.iotdb.commons.cluster.NodeStatus;
import java.util.LinkedList;
+import java.util.concurrent.atomic.AtomicReference;
-/** All the statistic interfaces that provided by HeartbeatCache */
+/** All the statistic interfaces that are provided by HeartbeatCache. */
public abstract class BaseNodeCache {
// When the response time of heartbeat is more than 20s, the Node is considered as down
@@ -31,23 +33,26 @@
// Max heartbeat cache samples store size
public static final int MAXIMUM_WINDOW_SIZE = 100;
+ protected final int nodeId;
+
// SlidingWindow stores the heartbeat sample data
protected final LinkedList<NodeHeartbeatSample> slidingWindow = new LinkedList<>();
// The previous NodeStatistics, used for comparing with
// the current NodeStatistics to initiate notification when they are different
- protected volatile NodeStatistics previousStatistics;
+ protected AtomicReference<NodeStatistics> previousStatistics;
// The current NodeStatistics, used for providing statistics to other services
- protected volatile NodeStatistics currentStatistics;
+ protected AtomicReference<NodeStatistics> currentStatistics;
- /** Constructor for NodeCache with default NodeStatistics */
- protected BaseNodeCache() {
- this.previousStatistics = NodeStatistics.generateDefaultNodeStatistics();
- this.currentStatistics = NodeStatistics.generateDefaultNodeStatistics();
+ /** Constructor for NodeCache with default NodeStatistics. */
+ protected BaseNodeCache(int nodeId) {
+ this.nodeId = nodeId;
+ this.previousStatistics = new AtomicReference<>(NodeStatistics.generateDefaultNodeStatistics());
+ this.currentStatistics = new AtomicReference<>(NodeStatistics.generateDefaultNodeStatistics());
}
/**
- * Cache the newest NodeHeartbeatSample
+ * Cache the newest NodeHeartbeatSample.
*
* @param newHeartbeatSample The newest NodeHeartbeatSample
*/
@@ -69,15 +74,15 @@
/**
* Invoking periodically in the Cluster-LoadStatistics-Service to update currentStatistics and
* compare with the previousStatistics, in order to detect whether the Node's statistics has
- * changed
+ * changed.
*
* @return True if the currentStatistics has changed recently(compare with the
* previousStatistics), false otherwise
*/
public boolean periodicUpdate() {
updateCurrentStatistics();
- if (!currentStatistics.equals(previousStatistics)) {
- previousStatistics = currentStatistics.deepCopy();
+ if (!currentStatistics.get().equals(previousStatistics.get())) {
+ previousStatistics.set(currentStatistics.get());
return true;
} else {
return false;
@@ -103,42 +108,42 @@
}
/**
- * Update currentStatistics based on recent NodeHeartbeatSamples that cached in the slidingWindow
+ * Update currentStatistics based on recent NodeHeartbeatSamples that cached in the slidingWindow.
*/
protected abstract void updateCurrentStatistics();
+ public int getNodeId() {
+ return nodeId;
+ }
+
/**
* TODO: The loadScore of each Node will be changed to Double
*
* @return The latest load score of a node, the higher the score the higher the load
*/
public long getLoadScore() {
- return currentStatistics.getLoadScore();
+ return currentStatistics.get().getLoadScore();
}
- /** @return The current status of the Node */
+ /** @return The current status of the Node. */
public NodeStatus getNodeStatus() {
// Return a copy of status
- return NodeStatus.parse(currentStatistics.getStatus().getStatus());
+ return NodeStatus.parse(currentStatistics.get().getStatus().getStatus());
}
- /** @return The reason why lead to current NodeStatus */
+ /** @return The current NodeStatus, followed by the reason for it in parentheses if any. */
public String getNodeStatusWithReason() {
- if (currentStatistics.getStatusReason() == null) {
- return currentStatistics.getStatus().getStatus();
- } else {
- return currentStatistics.getStatus().getStatus()
- + "("
- + currentStatistics.getStatusReason()
- + ")";
- }
+ NodeStatistics statistics = this.currentStatistics.get();
+ return statistics.getStatusReason() == null
+ ? statistics.getStatus().getStatus()
+ : statistics.getStatus().getStatus() + "(" + statistics.getStatusReason() + ")";
}
public NodeStatistics getStatistics() {
- return currentStatistics;
+ return currentStatistics.get();
}
public NodeStatistics getPreviousStatistics() {
- return previousStatistics;
+ return previousStatistics.get();
}
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/heartbeat/ConfigNodeHeartbeatCache.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/node/ConfigNodeHeartbeatCache.java
similarity index 80%
rename from confignode/src/main/java/org/apache/iotdb/confignode/manager/node/heartbeat/ConfigNodeHeartbeatCache.java
rename to confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/node/ConfigNodeHeartbeatCache.java
index 17fc9b1..4e7ee07 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/heartbeat/ConfigNodeHeartbeatCache.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/node/ConfigNodeHeartbeatCache.java
@@ -16,40 +16,39 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.confignode.manager.node.heartbeat;
+
+package org.apache.iotdb.confignode.manager.load.cache.node;
import org.apache.iotdb.commons.cluster.NodeStatus;
import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
+import java.util.concurrent.atomic.AtomicReference;
+
public class ConfigNodeHeartbeatCache extends BaseNodeCache {
- /** Only get CURRENT_NODE_ID here due to initialization order */
+ /** Only get CURRENT_NODE_ID here due to initialization order. */
public static final int CURRENT_NODE_ID =
ConfigNodeDescriptor.getInstance().getConf().getConfigNodeId();
public static final NodeStatistics CURRENT_NODE_STATISTICS =
new NodeStatistics(0, NodeStatus.Running, null);
- private final int configNodeId;
-
- /** Constructor for create ConfigNodeHeartbeatCache with default NodeStatistics */
+ /** Constructor for creating a ConfigNodeHeartbeatCache with default NodeStatistics. */
public ConfigNodeHeartbeatCache(int configNodeId) {
- super();
- this.configNodeId = configNodeId;
+ super(configNodeId);
}
- /** Constructor only for ConfigNode-leader */
+ /** Constructor only for ConfigNode-leader. */
public ConfigNodeHeartbeatCache(int configNodeId, NodeStatistics statistics) {
- super();
- this.configNodeId = configNodeId;
- this.previousStatistics = statistics;
- this.currentStatistics = statistics;
+ super(configNodeId);
+ this.previousStatistics = new AtomicReference<>(statistics);
+ this.currentStatistics = new AtomicReference<>(statistics);
}
@Override
protected void updateCurrentStatistics() {
// Skip itself
- if (configNodeId == CURRENT_NODE_ID) {
+ if (nodeId == CURRENT_NODE_ID) {
return;
}
@@ -76,9 +75,9 @@
long loadScore = NodeStatus.isNormalStatus(status) ? 0 : Long.MAX_VALUE;
NodeStatistics newStatistics = new NodeStatistics(loadScore, status, null);
- if (!currentStatistics.equals(newStatistics)) {
+ if (!currentStatistics.get().equals(newStatistics)) {
// Update the current NodeStatistics if necessary
- currentStatistics = newStatistics;
+ currentStatistics.set(newStatistics);
}
}
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/heartbeat/DataNodeHeartbeatCache.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/node/DataNodeHeartbeatCache.java
similarity index 80%
rename from confignode/src/main/java/org/apache/iotdb/confignode/manager/node/heartbeat/DataNodeHeartbeatCache.java
rename to confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/node/DataNodeHeartbeatCache.java
index 5754d27..c53f6dd 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/heartbeat/DataNodeHeartbeatCache.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/node/DataNodeHeartbeatCache.java
@@ -16,20 +16,23 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.confignode.manager.node.heartbeat;
+
+package org.apache.iotdb.confignode.manager.load.cache.node;
import org.apache.iotdb.commons.cluster.NodeStatus;
import org.apache.iotdb.mpp.rpc.thrift.TLoadSample;
-/** DataNodeHeartbeatCache caches and maintains all the heartbeat data */
+import java.util.concurrent.atomic.AtomicReference;
+
+/** DataNodeHeartbeatCache caches and maintains all the heartbeat data. */
public class DataNodeHeartbeatCache extends BaseNodeCache {
- private volatile TLoadSample latestLoadSample;
+ private final AtomicReference<TLoadSample> latestLoadSample;
- /** Constructor for create DataNodeHeartbeatCache with default NodeStatistics */
- public DataNodeHeartbeatCache() {
- super();
- this.latestLoadSample = new TLoadSample();
+ /** Constructor for creating a DataNodeHeartbeatCache with default NodeStatistics. */
+ public DataNodeHeartbeatCache(int dataNodeId) {
+ super(dataNodeId);
+ this.latestLoadSample = new AtomicReference<>(new TLoadSample());
}
@Override
@@ -44,7 +47,7 @@
/* Update load sample */
if (lastSample != null && lastSample.isSetLoadSample()) {
- latestLoadSample = lastSample.getLoadSample();
+ latestLoadSample.set(lastSample.getLoadSample());
}
/* Update Node status */
@@ -64,13 +67,13 @@
long loadScore = NodeStatus.isNormalStatus(status) ? 0 : Long.MAX_VALUE;
NodeStatistics newStatistics = new NodeStatistics(loadScore, status, statusReason);
- if (!currentStatistics.equals(newStatistics)) {
+ if (!currentStatistics.get().equals(newStatistics)) {
// Update the current NodeStatistics if necessary
- currentStatistics = newStatistics;
+ currentStatistics.set(newStatistics);
}
}
public double getFreeDiskSpace() {
- return latestLoadSample.getFreeDiskSpace();
+ return latestLoadSample.get().getFreeDiskSpace();
}
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/heartbeat/NodeHeartbeatSample.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/node/NodeHeartbeatSample.java
similarity index 94%
rename from confignode/src/main/java/org/apache/iotdb/confignode/manager/node/heartbeat/NodeHeartbeatSample.java
rename to confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/node/NodeHeartbeatSample.java
index dceff72..c188978 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/heartbeat/NodeHeartbeatSample.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/node/NodeHeartbeatSample.java
@@ -16,7 +16,8 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.confignode.manager.node.heartbeat;
+
+package org.apache.iotdb.confignode.manager.load.cache.node;
import org.apache.iotdb.commons.cluster.NodeStatus;
import org.apache.iotdb.mpp.rpc.thrift.THeartbeatResp;
@@ -33,7 +34,7 @@
private TLoadSample loadSample = null;
- /** Constructor for ConfigNode sample */
+ /** Constructor for ConfigNode sample. */
public NodeHeartbeatSample(long sendTimestamp, long receiveTimestamp) {
this.sendTimestamp = sendTimestamp;
this.receiveTimestamp = receiveTimestamp;
@@ -41,7 +42,7 @@
this.statusReason = null;
}
- /** Constructor for DataNode sample */
+ /** Constructor for DataNode sample. */
public NodeHeartbeatSample(THeartbeatResp heartbeatResp, long receiveTimestamp) {
this.sendTimestamp = heartbeatResp.getHeartbeatTimestamp();
this.receiveTimestamp = receiveTimestamp;
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/heartbeat/NodeStatistics.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/node/NodeStatistics.java
similarity index 95%
rename from confignode/src/main/java/org/apache/iotdb/confignode/manager/node/heartbeat/NodeStatistics.java
rename to confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/node/NodeStatistics.java
index 627ac00..10399b8 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/heartbeat/NodeStatistics.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/node/NodeStatistics.java
@@ -16,7 +16,8 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.confignode.manager.node.heartbeat;
+
+package org.apache.iotdb.confignode.manager.load.cache.node;
import org.apache.iotdb.commons.cluster.NodeStatus;
import org.apache.iotdb.mpp.rpc.thrift.THeartbeatResp;
@@ -111,8 +112,12 @@
@Override
public boolean equals(Object o) {
- if (this == o) return true;
- if (o == null || getClass() != o.getClass()) return false;
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
NodeStatistics that = (NodeStatistics) o;
return loadScore == that.loadScore
&& status == that.status
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/heartbeat/RegionCache.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/region/RegionCache.java
similarity index 89%
rename from confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/heartbeat/RegionCache.java
rename to confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/region/RegionCache.java
index 706b40e..a395c10 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/heartbeat/RegionCache.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/region/RegionCache.java
@@ -16,7 +16,8 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.confignode.manager.partition.heartbeat;
+
+package org.apache.iotdb.confignode.manager.load.cache.region;
import org.apache.iotdb.commons.cluster.RegionStatus;
@@ -24,8 +25,8 @@
import java.util.LinkedList;
import java.util.List;
-import static org.apache.iotdb.confignode.manager.node.heartbeat.BaseNodeCache.HEARTBEAT_TIMEOUT_TIME;
-import static org.apache.iotdb.confignode.manager.node.heartbeat.BaseNodeCache.MAXIMUM_WINDOW_SIZE;
+import static org.apache.iotdb.confignode.manager.load.cache.node.BaseNodeCache.HEARTBEAT_TIMEOUT_TIME;
+import static org.apache.iotdb.confignode.manager.load.cache.node.BaseNodeCache.MAXIMUM_WINDOW_SIZE;
public class RegionCache {
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/heartbeat/RegionGroupCache.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/region/RegionGroupCache.java
similarity index 86%
rename from confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/heartbeat/RegionGroupCache.java
rename to confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/region/RegionGroupCache.java
index 7e49067..a531e44 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/heartbeat/RegionGroupCache.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/region/RegionGroupCache.java
@@ -16,7 +16,8 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.confignode.manager.partition.heartbeat;
+
+package org.apache.iotdb.confignode.manager.load.cache.region;
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
import org.apache.iotdb.commons.cluster.RegionStatus;
@@ -25,6 +26,7 @@
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicReference;
public class RegionGroupCache {
@@ -35,21 +37,23 @@
// The previous RegionGroupStatistics, used for comparing with
// the current RegionGroupStatistics to initiate notification when they are different
- protected volatile RegionGroupStatistics previousStatistics;
+ protected AtomicReference<RegionGroupStatistics> previousStatistics;
// The current RegionGroupStatistics, used for providing statistics to other services
- private volatile RegionGroupStatistics currentStatistics;
+ private final AtomicReference<RegionGroupStatistics> currentStatistics;
- /** Constructor for create RegionGroupCache with default RegionGroupStatistics */
+ /** Constructor for creating a RegionGroupCache with default RegionGroupStatistics. */
public RegionGroupCache(TConsensusGroupId consensusGroupId) {
this.consensusGroupId = consensusGroupId;
this.regionCacheMap = new ConcurrentHashMap<>();
- this.previousStatistics = RegionGroupStatistics.generateDefaultRegionGroupStatistics();
- this.currentStatistics = RegionGroupStatistics.generateDefaultRegionGroupStatistics();
+ this.previousStatistics =
+ new AtomicReference<>(RegionGroupStatistics.generateDefaultRegionGroupStatistics());
+ this.currentStatistics =
+ new AtomicReference<>(RegionGroupStatistics.generateDefaultRegionGroupStatistics());
}
/**
- * Cache the newest RegionHeartbeatSample
+ * Cache the newest RegionHeartbeatSample.
*
* @param dataNodeId Where the specified Region resides
* @param newHeartbeatSample The newest RegionHeartbeatSample
@@ -63,15 +67,15 @@
/**
* Invoking periodically in the Cluster-LoadStatistics-Service to update currentStatistics and
* compare with the previousStatistics, in order to detect whether the RegionGroup's statistics
- * has changed
+ * has changed.
*
* @return True if the currentStatistics has changed recently(compare with the
* previousStatistics), false otherwise
*/
public boolean periodicUpdate() {
updateCurrentStatistics();
- if (!currentStatistics.equals(previousStatistics)) {
- previousStatistics = currentStatistics.deepCopy();
+ if (!currentStatistics.get().equals(previousStatistics.get())) {
+ previousStatistics.set(currentStatistics.get());
return true;
} else {
return false;
@@ -99,7 +103,7 @@
}
/**
- * Update currentStatistics based on recent NodeHeartbeatSamples that cached in the slidingWindow
+ * Update currentStatistics based on recent NodeHeartbeatSamples that cached in the slidingWindow.
*/
protected void updateCurrentStatistics() {
Map<Integer, RegionStatistics> regionStatisticsMap = new HashMap<>();
@@ -114,9 +118,9 @@
RegionGroupStatistics newRegionGroupStatistics =
new RegionGroupStatistics(status, regionStatisticsMap);
- if (!currentStatistics.equals(newRegionGroupStatistics)) {
+ if (!currentStatistics.get().equals(newRegionGroupStatistics)) {
// Update RegionGroupStatistics if necessary
- currentStatistics = newRegionGroupStatistics;
+ currentStatistics.set(newRegionGroupStatistics);
}
}
@@ -156,11 +160,7 @@
}
}
- public void removeCacheIfExists(int dataNodeId) {
- regionCacheMap.remove(dataNodeId);
- }
-
public RegionGroupStatistics getStatistics() {
- return currentStatistics;
+ return currentStatistics.get();
}
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/heartbeat/RegionGroupStatistics.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/region/RegionGroupStatistics.java
similarity index 94%
rename from confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/heartbeat/RegionGroupStatistics.java
rename to confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/region/RegionGroupStatistics.java
index d36175b..8dc7a4c 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/heartbeat/RegionGroupStatistics.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/region/RegionGroupStatistics.java
@@ -16,7 +16,8 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.confignode.manager.partition.heartbeat;
+
+package org.apache.iotdb.confignode.manager.load.cache.region;
import org.apache.iotdb.commons.cluster.RegionStatus;
import org.apache.iotdb.confignode.manager.partition.RegionGroupStatus;
@@ -32,8 +33,7 @@
public class RegionGroupStatistics {
- private RegionGroupStatus regionGroupStatus;
-
+ private volatile RegionGroupStatus regionGroupStatus;
private final Map<Integer, RegionStatistics> regionStatisticsMap;
public RegionGroupStatistics() {
@@ -51,7 +51,7 @@
}
/**
- * Get the specified Region's status
+ * Get the specified Region's status.
*
* @param dataNodeId Where the Region resides
* @return Region's latest status if received heartbeat recently, Unknown otherwise
@@ -116,8 +116,12 @@
@Override
public boolean equals(Object o) {
- if (this == o) return true;
- if (o == null || getClass() != o.getClass()) return false;
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
RegionGroupStatistics that = (RegionGroupStatistics) o;
return regionGroupStatus == that.regionGroupStatus
&& regionStatisticsMap.equals(that.regionStatisticsMap);
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/heartbeat/RegionHeartbeatSample.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/region/RegionHeartbeatSample.java
similarity index 95%
rename from confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/heartbeat/RegionHeartbeatSample.java
rename to confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/region/RegionHeartbeatSample.java
index 8de58a5..e55497b 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/heartbeat/RegionHeartbeatSample.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/region/RegionHeartbeatSample.java
@@ -16,7 +16,8 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.confignode.manager.partition.heartbeat;
+
+package org.apache.iotdb.confignode.manager.load.cache.region;
import org.apache.iotdb.commons.cluster.RegionStatus;
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/heartbeat/RegionStatistics.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/region/RegionStatistics.java
similarity index 86%
rename from confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/heartbeat/RegionStatistics.java
rename to confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/region/RegionStatistics.java
index d30bf43..c8d5106 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/heartbeat/RegionStatistics.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/region/RegionStatistics.java
@@ -16,7 +16,8 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.confignode.manager.partition.heartbeat;
+
+package org.apache.iotdb.confignode.manager.load.cache.region;
import org.apache.iotdb.commons.cluster.RegionStatus;
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
@@ -47,11 +48,6 @@
return new RegionStatistics(regionStatus);
}
- public RegionHeartbeatSample convertToRegionHeartbeatSample() {
- long currentTime = System.currentTimeMillis();
- return new RegionHeartbeatSample(currentTime, currentTime, regionStatus);
- }
-
public void serialize(OutputStream stream) throws IOException {
ReadWriteIOUtils.write(regionStatus.getStatus(), stream);
}
@@ -68,8 +64,12 @@
@Override
public boolean equals(Object o) {
- if (this == o) return true;
- if (o == null || getClass() != o.getClass()) return false;
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
RegionStatistics that = (RegionStatistics) o;
return regionStatus == that.regionStatus;
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/HeartbeatService.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/HeartbeatService.java
new file mode 100644
index 0000000..18c3915
--- /dev/null
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/HeartbeatService.java
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.manager.load.service;
+
+import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation;
+import org.apache.iotdb.common.rpc.thrift.TDataNodeConfiguration;
+import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
+import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil;
+import org.apache.iotdb.confignode.client.async.AsyncConfigNodeHeartbeatClientPool;
+import org.apache.iotdb.confignode.client.async.AsyncDataNodeHeartbeatClientPool;
+import org.apache.iotdb.confignode.client.async.handlers.heartbeat.ConfigNodeHeartbeatHandler;
+import org.apache.iotdb.confignode.client.async.handlers.heartbeat.DataNodeHeartbeatHandler;
+import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
+import org.apache.iotdb.confignode.manager.IManager;
+import org.apache.iotdb.confignode.manager.consensus.ConsensusManager;
+import org.apache.iotdb.confignode.manager.load.cache.LoadCache;
+import org.apache.iotdb.confignode.manager.load.cache.node.ConfigNodeHeartbeatCache;
+import org.apache.iotdb.confignode.manager.node.NodeManager;
+import org.apache.iotdb.mpp.rpc.thrift.THeartbeatReq;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Maintain the Cluster-Heartbeat-Service.
+ *
+ * <p>In each loop the ConfigNode-leader sends a heartbeat request to every other registered
+ * ConfigNode and to every registered DataNode.
+ */
+public class HeartbeatService {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(HeartbeatService.class);
+
+  private static final long HEARTBEAT_INTERVAL =
+      ConfigNodeDescriptor.getInstance().getConf().getHeartbeatIntervalInMs();
+
+  /** DataNodes' load is sampled once in every LOAD_SAMPLING_PERIOD heartbeat loops. */
+  private static final int LOAD_SAMPLING_PERIOD = 10;
+
+  private final IManager configManager;
+  private final LoadCache loadCache;
+
+  /** Monitor for leadership change: guards starting and stopping the heartbeat schedule. */
+  private final Object heartbeatScheduleMonitor = new Object();
+
+  /** Handle of the scheduled heartbeat loop, null while the service is not running. */
+  private Future<?> currentHeartbeatFuture;
+
+  /** Heartbeat executor service. */
+  private final ScheduledExecutorService heartBeatExecutor =
+      IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor("Cluster-Heartbeat-Service");
+
+  /** Index of the current loop inside the sampling period, in [0, LOAD_SAMPLING_PERIOD). */
+  private final AtomicInteger heartbeatCounter = new AtomicInteger(0);
+
+  public HeartbeatService(IManager configManager, LoadCache loadCache) {
+    this.configManager = configManager;
+    this.loadCache = loadCache;
+  }
+
+  /** Start the heartbeat service. Does nothing if it is already running. */
+  public void startHeartbeatService() {
+    synchronized (heartbeatScheduleMonitor) {
+      if (currentHeartbeatFuture == null) {
+        currentHeartbeatFuture =
+            ScheduledExecutorUtil.safelyScheduleWithFixedDelay(
+                heartBeatExecutor,
+                this::heartbeatLoopBody,
+                0,
+                HEARTBEAT_INTERVAL,
+                TimeUnit.MILLISECONDS);
+        LOGGER.info("Heartbeat service is started successfully.");
+      }
+    }
+  }
+
+  /** Stop the heartbeat service. Does nothing if it is not running. */
+  public void stopHeartbeatService() {
+    synchronized (heartbeatScheduleMonitor) {
+      if (currentHeartbeatFuture != null) {
+        // cancel(false): do not interrupt a loop that is currently executing
+        currentHeartbeatFuture.cancel(false);
+        currentHeartbeatFuture = null;
+        LOGGER.info("Heartbeat service is stopped successfully.");
+      }
+    }
+  }
+
+  /** Loop body of the heartbeat thread. Only the ConfigNode-leader sends heartbeat requests. */
+  private void heartbeatLoopBody() {
+    // The consensusManager of configManager may not be fully initialized at this time.
+    // It is fetched once, so the null check and the leadership check see the same instance.
+    boolean isLeader =
+        Optional.ofNullable(getConsensusManager()).map(ConsensusManager::isLeader).orElse(false);
+    if (!isLeader) {
+      return;
+    }
+
+    // Generate HeartbeatReq
+    THeartbeatReq heartbeatReq = genHeartbeatReq();
+    // Send heartbeat requests to all the registered ConfigNodes
+    pingRegisteredConfigNodes(heartbeatReq, getNodeManager().getRegisteredConfigNodes());
+    // Send heartbeat requests to all the registered DataNodes
+    pingRegisteredDataNodes(heartbeatReq, getNodeManager().getRegisteredDataNodes());
+  }
+
+  /**
+   * Generate the heartbeat request that is shared by all pings of the current loop.
+   *
+   * @return a request stamped with the current time
+   */
+  private THeartbeatReq genHeartbeatReq() {
+    /* Generate heartbeat request */
+    THeartbeatReq heartbeatReq = new THeartbeatReq();
+    heartbeatReq.setHeartbeatTimestamp(System.currentTimeMillis());
+    // Always sample RegionGroups' leadership as the Region heartbeat
+    heartbeatReq.setNeedJudgeLeader(true);
+
+    /* Read and advance the heartbeat counter in one atomic step */
+    int loopIndex = heartbeatCounter.getAndUpdate(x -> (x + 1) % LOAD_SAMPLING_PERIOD);
+    // We sample DataNode's load in every 10 heartbeat loop
+    heartbeatReq.setNeedSamplingLoad(loopIndex % LOAD_SAMPLING_PERIOD == 0);
+
+    // NOTE(review): the quota fields are attached when hasSpaceQuotaLimit() returns false;
+    // confirm that this matches the method's actual semantics
+    if (!configManager.getClusterQuotaManager().hasSpaceQuotaLimit()) {
+      heartbeatReq.setSchemaRegionIds(configManager.getClusterQuotaManager().getSchemaRegionIds());
+      heartbeatReq.setDataRegionIds(configManager.getClusterQuotaManager().getDataRegionIds());
+      heartbeatReq.setSpaceQuotaUsage(configManager.getClusterQuotaManager().getSpaceQuotaUsage());
+    }
+    return heartbeatReq;
+  }
+
+  /**
+   * Send heartbeat requests to all the Registered ConfigNodes.
+   *
+   * @param heartbeatReq the request whose timestamp is sent to each ConfigNode
+   * @param registeredConfigNodes ConfigNodes that registered in cluster
+   */
+  private void pingRegisteredConfigNodes(
+      THeartbeatReq heartbeatReq, List<TConfigNodeLocation> registeredConfigNodes) {
+    // Send heartbeat requests
+    for (TConfigNodeLocation configNodeLocation : registeredConfigNodes) {
+      int configNodeId = configNodeLocation.getConfigNodeId();
+      if (configNodeId == ConfigNodeHeartbeatCache.CURRENT_NODE_ID) {
+        // Skip itself
+        continue;
+      }
+
+      ConfigNodeHeartbeatHandler handler = new ConfigNodeHeartbeatHandler(configNodeId, loadCache);
+      AsyncConfigNodeHeartbeatClientPool.getInstance()
+          .getConfigNodeHeartBeat(
+              configNodeLocation.getInternalEndPoint(),
+              heartbeatReq.getHeartbeatTimestamp(),
+              handler);
+    }
+  }
+
+  /**
+   * Send heartbeat requests to all the Registered DataNodes.
+   *
+   * @param heartbeatReq the request sent to each DataNode
+   * @param registeredDataNodes DataNodes that registered in cluster
+   */
+  private void pingRegisteredDataNodes(
+      THeartbeatReq heartbeatReq, List<TDataNodeConfiguration> registeredDataNodes) {
+    // Send heartbeat requests
+    for (TDataNodeConfiguration dataNodeInfo : registeredDataNodes) {
+      DataNodeHeartbeatHandler handler =
+          new DataNodeHeartbeatHandler(
+              dataNodeInfo.getLocation().getDataNodeId(),
+              loadCache,
+              configManager.getLoadManager().getRouteBalancer(),
+              configManager.getClusterQuotaManager().getDeviceNum(),
+              configManager.getClusterQuotaManager().getTimeSeriesNum(),
+              configManager.getClusterQuotaManager().getRegionDisk());
+      // NOTE(review): runs once per DataNode; confirm whether once per loop would be enough
+      configManager.getClusterQuotaManager().updateSpaceQuotaUsage();
+      AsyncDataNodeHeartbeatClientPool.getInstance()
+          .getDataNodeHeartBeat(
+              dataNodeInfo.getLocation().getInternalEndPoint(), heartbeatReq, handler);
+    }
+  }
+
+  private ConsensusManager getConsensusManager() {
+    return configManager.getConsensusManager();
+  }
+
+  private NodeManager getNodeManager() {
+    return configManager.getNodeManager();
+  }
+}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/StatisticsService.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/StatisticsService.java
new file mode 100644
index 0000000..9e51b15
--- /dev/null
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/StatisticsService.java
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.manager.load.service;
+
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
+import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
+import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.cluster.NodeStatus;
+import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
+import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil;
+import org.apache.iotdb.confignode.client.DataNodeRequestType;
+import org.apache.iotdb.confignode.client.async.AsyncDataNodeClientPool;
+import org.apache.iotdb.confignode.client.async.handlers.AsyncClientHandler;
+import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
+import org.apache.iotdb.confignode.manager.IManager;
+import org.apache.iotdb.confignode.manager.load.balancer.RouteBalancer;
+import org.apache.iotdb.confignode.manager.load.balancer.router.RegionRouteMap;
+import org.apache.iotdb.confignode.manager.load.cache.LoadCache;
+import org.apache.iotdb.confignode.manager.load.cache.node.NodeStatistics;
+import org.apache.iotdb.confignode.manager.load.cache.region.RegionGroupStatistics;
+import org.apache.iotdb.confignode.manager.load.cache.region.RegionStatistics;
+import org.apache.iotdb.confignode.manager.observer.NodeStatisticsEvent;
+import org.apache.iotdb.mpp.rpc.thrift.TRegionRouteReq;
+import org.apache.iotdb.tsfile.utils.Pair;
+
+import com.google.common.eventbus.EventBus;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+public class StatisticsService {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(StatisticsService.class);
+
+ private static final long HEARTBEAT_INTERVAL =
+ ConfigNodeDescriptor.getInstance().getConf().getHeartbeatIntervalInMs();
+
+ private final IManager configManager;
+ private final RouteBalancer routeBalancer;
+ private final LoadCache loadCache;
+ private final EventBus eventBus;
+
+ public StatisticsService(
+ IManager configManager, RouteBalancer routeBalancer, LoadCache loadCache, EventBus eventBus) {
+ this.configManager = configManager;
+ this.routeBalancer = routeBalancer;
+ this.loadCache = loadCache;
+ this.eventBus = eventBus;
+ }
+
+ /** Monitor that guards starting and stopping the load statistics service. */
+ private final Object statisticsScheduleMonitor = new Object();
+
+ private Future<?> currentLoadStatisticsFuture;
+ private final ScheduledExecutorService loadStatisticsExecutor =
+ IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor("Cluster-LoadStatistics-Service");
+
+ /** Start the load statistics service. */
+ public void startLoadStatisticsService() {
+ synchronized (statisticsScheduleMonitor) {
+ if (currentLoadStatisticsFuture == null) {
+ currentLoadStatisticsFuture =
+ ScheduledExecutorUtil.safelyScheduleWithFixedDelay(
+ loadStatisticsExecutor,
+ this::updateLoadStatistics,
+ 0,
+ HEARTBEAT_INTERVAL,
+ TimeUnit.MILLISECONDS);
+ LOGGER.info("LoadStatistics service is started successfully.");
+ }
+ }
+ }
+
+ /** Stop the load statistics service. */
+ public void stopLoadStatisticsService() {
+ synchronized (statisticsScheduleMonitor) {
+ if (currentLoadStatisticsFuture != null) {
+ currentLoadStatisticsFuture.cancel(false);
+ currentLoadStatisticsFuture = null;
+ LOGGER.info("LoadStatistics service is stopped successfully.");
+ }
+ }
+ }
+
+ private void updateLoadStatistics() {
+ // Broadcast the RegionRouteMap if some LoadStatistics has changed
+ boolean isNeedBroadcast = false;
+
+ // Update NodeStatistics:
+ // Pair<NodeStatistics, NodeStatistics>: left is the current NodeStatistics, right is the
+ // previous NodeStatistics
+ Map<Integer, Pair<NodeStatistics, NodeStatistics>> differentNodeStatisticsMap =
+ loadCache.updateNodeStatistics();
+ if (!differentNodeStatisticsMap.isEmpty()) {
+ isNeedBroadcast = true;
+ recordNodeStatistics(differentNodeStatisticsMap);
+ eventBus.post(new NodeStatisticsEvent(differentNodeStatisticsMap));
+ }
+
+ // Update RegionGroupStatistics
+ Map<TConsensusGroupId, RegionGroupStatistics> differentRegionGroupStatisticsMap =
+ loadCache.updateRegionGroupStatistics();
+ if (!differentRegionGroupStatisticsMap.isEmpty()) {
+ isNeedBroadcast = true;
+ recordRegionGroupStatistics(differentRegionGroupStatisticsMap);
+ }
+
+ // Update RegionRouteMap
+ if (routeBalancer.updateRegionRouteMap()) {
+ isNeedBroadcast = true;
+ recordRegionRouteMap(routeBalancer.getRegionRouteMap());
+ }
+
+ if (isNeedBroadcast) {
+ broadcastLatestRegionRouteMap();
+ }
+ }
+
+ private void recordNodeStatistics(
+ Map<Integer, Pair<NodeStatistics, NodeStatistics>> differentNodeStatisticsMap) {
+ LOGGER.info("[UpdateLoadStatistics] NodeStatisticsMap: ");
+ for (Map.Entry<Integer, Pair<NodeStatistics, NodeStatistics>> nodeCacheEntry :
+ differentNodeStatisticsMap.entrySet()) {
+ LOGGER.info(
+ "[UpdateLoadStatistics]\t {}={}",
+ "nodeId{" + nodeCacheEntry.getKey() + "}",
+ nodeCacheEntry.getValue().left);
+ }
+ }
+
+ private void recordRegionGroupStatistics(
+ Map<TConsensusGroupId, RegionGroupStatistics> differentRegionGroupStatisticsMap) {
+ LOGGER.info("[UpdateLoadStatistics] RegionGroupStatisticsMap: ");
+ for (Map.Entry<TConsensusGroupId, RegionGroupStatistics> regionGroupStatisticsEntry :
+ differentRegionGroupStatisticsMap.entrySet()) {
+ LOGGER.info("[UpdateLoadStatistics]\t RegionGroup: {}", regionGroupStatisticsEntry.getKey());
+ LOGGER.info("[UpdateLoadStatistics]\t {}", regionGroupStatisticsEntry.getValue());
+ for (Map.Entry<Integer, RegionStatistics> regionStatisticsEntry :
+ regionGroupStatisticsEntry.getValue().getRegionStatisticsMap().entrySet()) {
+ LOGGER.info(
+ "[UpdateLoadStatistics]\t dataNodeId{}={}",
+ regionStatisticsEntry.getKey(),
+ regionStatisticsEntry.getValue());
+ }
+ }
+ }
+
+ private void recordRegionRouteMap(RegionRouteMap regionRouteMap) {
+ LOGGER.info("[UpdateLoadStatistics] RegionLeaderMap: ");
+ for (Map.Entry<TConsensusGroupId, Integer> regionLeaderEntry :
+ regionRouteMap.getRegionLeaderMap().entrySet()) {
+ LOGGER.info(
+ "[UpdateLoadStatistics]\t {}={}",
+ regionLeaderEntry.getKey(),
+ regionLeaderEntry.getValue());
+ }
+
+ LOGGER.info("[UpdateLoadStatistics] RegionPriorityMap: ");
+ for (Map.Entry<TConsensusGroupId, TRegionReplicaSet> regionPriorityEntry :
+ regionRouteMap.getRegionPriorityMap().entrySet()) {
+ LOGGER.info(
+ "[UpdateLoadStatistics]\t {}={}",
+ regionPriorityEntry.getKey(),
+ regionPriorityEntry.getValue().getDataNodeLocations().stream()
+ .map(TDataNodeLocation::getDataNodeId)
+ .collect(Collectors.toList()));
+ }
+ }
+
+ public void broadcastLatestRegionRouteMap() {
+ Map<TConsensusGroupId, TRegionReplicaSet> latestRegionRouteMap =
+ routeBalancer.getLatestRegionPriorityMap();
+ Map<Integer, TDataNodeLocation> dataNodeLocationMap = new ConcurrentHashMap<>();
+ // Broadcast the RegionRouteMap to all DataNodes except the unknown ones
+ configManager
+ .getNodeManager()
+ .filterDataNodeThroughStatus(NodeStatus.Running, NodeStatus.Removing, NodeStatus.ReadOnly)
+ .forEach(
+ onlineDataNode ->
+ dataNodeLocationMap.put(
+ onlineDataNode.getLocation().getDataNodeId(), onlineDataNode.getLocation()));
+
+ LOGGER.info("[UpdateLoadStatistics] Begin to broadcast RegionRouteMap:");
+ long broadcastTime = System.currentTimeMillis();
+
+ AsyncClientHandler<TRegionRouteReq, TSStatus> clientHandler =
+ new AsyncClientHandler<>(
+ DataNodeRequestType.UPDATE_REGION_ROUTE_MAP,
+ new TRegionRouteReq(broadcastTime, latestRegionRouteMap),
+ dataNodeLocationMap);
+ AsyncDataNodeClientPool.getInstance().sendAsyncRequestToDataNodeWithRetry(clientHandler);
+ LOGGER.info("[UpdateLoadStatistics] Broadcast the latest RegionRouteMap finished.");
+ }
+}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java
index bcf1167..4cdcf6b 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java
@@ -27,18 +27,12 @@
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.cluster.NodeStatus;
import org.apache.iotdb.commons.cluster.RegionRoleType;
-import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
-import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil;
import org.apache.iotdb.commons.conf.CommonConfig;
import org.apache.iotdb.commons.conf.CommonDescriptor;
import org.apache.iotdb.commons.consensus.ConsensusGroupId;
import org.apache.iotdb.confignode.client.DataNodeRequestType;
-import org.apache.iotdb.confignode.client.async.AsyncConfigNodeHeartbeatClientPool;
import org.apache.iotdb.confignode.client.async.AsyncDataNodeClientPool;
-import org.apache.iotdb.confignode.client.async.AsyncDataNodeHeartbeatClientPool;
import org.apache.iotdb.confignode.client.async.handlers.AsyncClientHandler;
-import org.apache.iotdb.confignode.client.async.handlers.heartbeat.ConfigNodeHeartbeatHandler;
-import org.apache.iotdb.confignode.client.async.handlers.heartbeat.DataNodeHeartbeatHandler;
import org.apache.iotdb.confignode.client.sync.SyncDataNodeClientPool;
import org.apache.iotdb.confignode.conf.ConfigNodeConfig;
import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
@@ -52,7 +46,6 @@
import org.apache.iotdb.confignode.consensus.response.datanode.DataNodeConfigurationResp;
import org.apache.iotdb.confignode.consensus.response.datanode.DataNodeRegisterResp;
import org.apache.iotdb.confignode.consensus.response.datanode.DataNodeToStatusResp;
-import org.apache.iotdb.confignode.manager.ClusterQuotaManager;
import org.apache.iotdb.confignode.manager.ClusterSchemaManager;
import org.apache.iotdb.confignode.manager.ConfigManager;
import org.apache.iotdb.confignode.manager.IManager;
@@ -60,9 +53,7 @@
import org.apache.iotdb.confignode.manager.UDFManager;
import org.apache.iotdb.confignode.manager.consensus.ConsensusManager;
import org.apache.iotdb.confignode.manager.load.LoadManager;
-import org.apache.iotdb.confignode.manager.node.heartbeat.BaseNodeCache;
-import org.apache.iotdb.confignode.manager.node.heartbeat.ConfigNodeHeartbeatCache;
-import org.apache.iotdb.confignode.manager.node.heartbeat.DataNodeHeartbeatCache;
+import org.apache.iotdb.confignode.manager.load.cache.node.ConfigNodeHeartbeatCache;
import org.apache.iotdb.confignode.manager.partition.PartitionManager;
import org.apache.iotdb.confignode.manager.partition.PartitionMetrics;
import org.apache.iotdb.confignode.manager.pipe.PipeManager;
@@ -81,7 +72,6 @@
import org.apache.iotdb.consensus.common.DataSet;
import org.apache.iotdb.consensus.common.Peer;
import org.apache.iotdb.consensus.common.response.ConsensusGenericResponse;
-import org.apache.iotdb.mpp.rpc.thrift.THeartbeatReq;
import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.TSStatusCode;
@@ -89,22 +79,15 @@
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
-import java.util.Random;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.Future;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantLock;
-import java.util.stream.Collectors;
/** NodeManager manages cluster node addition and removal requests */
public class NodeManager {
@@ -119,24 +102,10 @@
private final ReentrantLock removeConfigNodeLock;
- /** Heartbeat executor service */
- // Monitor for leadership change
- private final Object scheduleMonitor = new Object();
- // Map<NodeId, INodeCache>
- private final Map<Integer, BaseNodeCache> nodeCacheMap;
- private final AtomicInteger heartbeatCounter = new AtomicInteger(0);
- private Future<?> currentHeartbeatFuture;
- private final ScheduledExecutorService heartBeatExecutor =
- IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor("Cluster-Heartbeat-Service");
-
- private final Random random;
-
public NodeManager(IManager configManager, NodeInfo nodeInfo) {
this.configManager = configManager;
this.nodeInfo = nodeInfo;
this.removeConfigNodeLock = new ReentrantLock();
- this.nodeCacheMap = new ConcurrentHashMap<>();
- this.random = new Random(System.currentTimeMillis());
}
/**
@@ -391,15 +360,6 @@
/**
* Only leader use this interface
*
- * @return The number of total cpu cores in online DataNodes
- */
- public int getTotalCpuCoreCount() {
- return nodeInfo.getTotalCpuCoreCount();
- }
-
- /**
- * Only leader use this interface
- *
* @return All registered DataNodes
*/
public List<TDataNodeConfiguration> getRegisteredDataNodes() {
@@ -440,7 +400,7 @@
TDataNodeInfo dataNodeInfo = new TDataNodeInfo();
int dataNodeId = registeredDataNode.getLocation().getDataNodeId();
dataNodeInfo.setDataNodeId(dataNodeId);
- dataNodeInfo.setStatus(getNodeStatusWithReason(dataNodeId));
+ dataNodeInfo.setStatus(getLoadManager().getNodeStatusWithReason(dataNodeId));
dataNodeInfo.setRpcAddresss(
registeredDataNode.getLocation().getClientRpcEndPoint().getIp());
dataNodeInfo.setRpcPort(
@@ -504,7 +464,7 @@
TConfigNodeInfo info = new TConfigNodeInfo();
int configNodeId = configNodeLocation.getConfigNodeId();
info.setConfigNodeId(configNodeId);
- info.setStatus(getNodeStatusWithReason(configNodeId));
+ info.setStatus(getLoadManager().getNodeStatusWithReason(configNodeId));
info.setInternalAddress(configNodeLocation.getInternalEndPoint().getIp());
info.setInternalPort(configNodeLocation.getInternalEndPoint().getPort());
info.setRoleType(
@@ -689,218 +649,25 @@
}
}
- /** Start the heartbeat service */
- public void startHeartbeatService() {
- synchronized (scheduleMonitor) {
- if (currentHeartbeatFuture == null) {
- currentHeartbeatFuture =
- ScheduledExecutorUtil.safelyScheduleWithFixedDelay(
- heartBeatExecutor,
- this::heartbeatLoopBody,
- 0,
- HEARTBEAT_INTERVAL,
- TimeUnit.MILLISECONDS);
- LOGGER.info("Heartbeat service is started successfully.");
- }
- }
- }
-
- /** loop body of the heartbeat thread */
- private void heartbeatLoopBody() {
- // The consensusManager of configManager may not be fully initialized at this time
- Optional.ofNullable(getConsensusManager())
- .ifPresent(
- consensusManager -> {
- if (getConsensusManager().isLeader()) {
- // Generate HeartbeatReq
- THeartbeatReq heartbeatReq = genHeartbeatReq();
- // Send heartbeat requests to all the registered DataNodes
- pingRegisteredDataNodes(heartbeatReq, getRegisteredDataNodes());
- // Send heartbeat requests to all the registered ConfigNodes
- pingRegisteredConfigNodes(heartbeatReq, getRegisteredConfigNodes());
- }
- });
- }
-
- private THeartbeatReq genHeartbeatReq() {
- /* Generate heartbeat request */
- THeartbeatReq heartbeatReq = new THeartbeatReq();
- heartbeatReq.setHeartbeatTimestamp(System.currentTimeMillis());
- // Always sample RegionGroups' leadership as the Region heartbeat
- heartbeatReq.setNeedJudgeLeader(true);
- // We sample DataNode's load in every 10 heartbeat loop
- heartbeatReq.setNeedSamplingLoad(heartbeatCounter.get() % 10 == 0);
-
- /* Update heartbeat counter */
- heartbeatCounter.getAndUpdate((x) -> (x + 1) % 10);
- if (!getClusterQuotaManager().hasSpaceQuotaLimit()) {
- heartbeatReq.setSchemaRegionIds(getClusterQuotaManager().getSchemaRegionIds());
- heartbeatReq.setDataRegionIds(getClusterQuotaManager().getDataRegionIds());
- heartbeatReq.setSpaceQuotaUsage(getClusterQuotaManager().getSpaceQuotaUsage());
- }
- return heartbeatReq;
- }
-
/**
- * Send heartbeat requests to all the Registered DataNodes
+ * Filter ConfigNodes through the specified NodeStatus
*
- * @param registeredDataNodes DataNodes that registered in cluster
- */
- private void pingRegisteredDataNodes(
- THeartbeatReq heartbeatReq, List<TDataNodeConfiguration> registeredDataNodes) {
- // Send heartbeat requests
- for (TDataNodeConfiguration dataNodeInfo : registeredDataNodes) {
- DataNodeHeartbeatHandler handler =
- new DataNodeHeartbeatHandler(
- dataNodeInfo.getLocation(),
- (DataNodeHeartbeatCache)
- nodeCacheMap.computeIfAbsent(
- dataNodeInfo.getLocation().getDataNodeId(),
- empty -> new DataNodeHeartbeatCache()),
- getPartitionManager().getRegionGroupCacheMap(),
- getLoadManager().getRouteBalancer(),
- getClusterQuotaManager().getDeviceNum(),
- getClusterQuotaManager().getTimeSeriesNum(),
- getClusterQuotaManager().getRegionDisk());
- getClusterQuotaManager().updateSpaceQuotaUsage();
- AsyncDataNodeHeartbeatClientPool.getInstance()
- .getDataNodeHeartBeat(
- dataNodeInfo.getLocation().getInternalEndPoint(), heartbeatReq, handler);
- }
- }
-
- /**
- * Send heartbeat requests to all the Registered ConfigNodes
- *
- * @param registeredConfigNodes ConfigNodes that registered in cluster
- */
- private void pingRegisteredConfigNodes(
- THeartbeatReq heartbeatReq, List<TConfigNodeLocation> registeredConfigNodes) {
- // Send heartbeat requests
- for (TConfigNodeLocation configNodeLocation : registeredConfigNodes) {
- if (configNodeLocation.getConfigNodeId() == ConfigNodeHeartbeatCache.CURRENT_NODE_ID) {
- // Skip itself
- continue;
- }
-
- ConfigNodeHeartbeatHandler handler =
- new ConfigNodeHeartbeatHandler(
- (ConfigNodeHeartbeatCache)
- nodeCacheMap.computeIfAbsent(
- configNodeLocation.getConfigNodeId(),
- empty -> new ConfigNodeHeartbeatCache(configNodeLocation.getConfigNodeId())));
- AsyncConfigNodeHeartbeatClientPool.getInstance()
- .getConfigNodeHeartBeat(
- configNodeLocation.getInternalEndPoint(),
- heartbeatReq.getHeartbeatTimestamp(),
- handler);
- }
- }
-
- /** Stop the heartbeat service */
- public void stopHeartbeatService() {
- synchronized (scheduleMonitor) {
- if (currentHeartbeatFuture != null) {
- currentHeartbeatFuture.cancel(false);
- currentHeartbeatFuture = null;
- nodeCacheMap.clear();
- LOGGER.info("Heartbeat service is stopped successfully.");
- }
- }
- }
-
- public Map<Integer, BaseNodeCache> getNodeCacheMap() {
- return nodeCacheMap;
- }
-
- public void removeNodeCache(int nodeId) {
- nodeCacheMap.remove(nodeId);
- }
-
- /**
- * Safely get the specific Node's current status for showing cluster
- *
- * @param nodeId The specific Node's index
- * @return The specific Node's current status if the nodeCache contains it, Unknown otherwise
- */
- private String getNodeStatusWithReason(int nodeId) {
- BaseNodeCache nodeCache = nodeCacheMap.get(nodeId);
- return nodeCache == null
- ? NodeStatus.Unknown.getStatus() + "(NoHeartbeat)"
- : nodeCache.getNodeStatusWithReason();
- }
-
- /**
- * Filter the registered ConfigNodes through the specific NodeStatus
- *
- * @param status The specific NodeStatus
- * @return Filtered ConfigNodes with the specific NodeStatus
+ * @param status The specified NodeStatus
+ * @return Filtered ConfigNodes with the specified NodeStatus
*/
public List<TConfigNodeLocation> filterConfigNodeThroughStatus(NodeStatus... status) {
- return getRegisteredConfigNodes().stream()
- .filter(
- registeredConfigNode -> {
- int configNodeId = registeredConfigNode.getConfigNodeId();
- return nodeCacheMap.containsKey(configNodeId)
- && Arrays.stream(status)
- .anyMatch(s -> s.equals(nodeCacheMap.get(configNodeId).getNodeStatus()));
- })
- .collect(Collectors.toList());
+ return nodeInfo.getRegisteredConfigNodes(
+ getLoadManager().filterConfigNodeThroughStatus(status));
}
/**
- * Get NodeStatus by nodeId
+ * Filter DataNodes through the specified NodeStatus
*
- * @param nodeId The specific NodeId
- * @return NodeStatus of the specific node. If node does not exist, return null.
- */
- public NodeStatus getNodeStatusByNodeId(int nodeId) {
- BaseNodeCache baseNodeCache = nodeCacheMap.get(nodeId);
- return baseNodeCache == null ? null : baseNodeCache.getNodeStatus();
- }
-
- /**
- * Filter the registered DataNodes through the specific NodeStatus
- *
- * @param status The specific NodeStatus
- * @return Filtered DataNodes with the specific NodeStatus
+ * @param status The specified NodeStatus
+ * @return Filtered DataNodes with the specified NodeStatus
*/
public List<TDataNodeConfiguration> filterDataNodeThroughStatus(NodeStatus... status) {
- return getRegisteredDataNodes().stream()
- .filter(
- registeredDataNode -> {
- int dataNodeId = registeredDataNode.getLocation().getDataNodeId();
- return nodeCacheMap.containsKey(dataNodeId)
- && Arrays.stream(status)
- .anyMatch(s -> s.equals(nodeCacheMap.get(dataNodeId).getNodeStatus()));
- })
- .collect(Collectors.toList());
- }
-
- /**
- * Get the loadScore of each DataNode
- *
- * @return Map<DataNodeId, loadScore>
- */
- public Map<Integer, Long> getAllLoadScores() {
- Map<Integer, Long> result = new ConcurrentHashMap<>();
-
- nodeCacheMap.forEach(
- (dataNodeId, heartbeatCache) -> result.put(dataNodeId, heartbeatCache.getLoadScore()));
-
- return result;
- }
-
- /**
- * Get the free disk space of the specified DataNode
- *
- * @param dataNodeId The index of the specified DataNode
- * @return The free disk space that sample through heartbeat, 0 if no heartbeat received
- */
- public double getFreeDiskSpace(int dataNodeId) {
- DataNodeHeartbeatCache dataNodeHeartbeatCache =
- (DataNodeHeartbeatCache) nodeCacheMap.get(dataNodeId);
- return dataNodeHeartbeatCache == null ? 0d : dataNodeHeartbeatCache.getFreeDiskSpace();
+ return nodeInfo.getRegisteredDataNodes(getLoadManager().filterDataNodeThroughStatus(status));
}
/**
@@ -910,15 +677,10 @@
*/
public Optional<TDataNodeLocation> getLowestLoadDataNode() {
// TODO get real lowest load data node after scoring algorithm being implemented
- List<TDataNodeConfiguration> targetDataNodeList =
- filterDataNodeThroughStatus(NodeStatus.Running);
-
- if (targetDataNodeList == null || targetDataNodeList.isEmpty()) {
- return Optional.empty();
- } else {
- int index = random.nextInt(targetDataNodeList.size());
- return Optional.of(targetDataNodeList.get(index).location);
- }
+ int dataNodeId = getLoadManager().getLowestLoadDataNode();
+ return dataNodeId < 0
+ ? Optional.empty()
+ : Optional.of(getRegisteredDataNode(dataNodeId).getLocation());
}
/**
@@ -927,52 +689,8 @@
* @return TDataNodeLocation with the lowest loadScore
*/
public TDataNodeLocation getLowestLoadDataNode(Set<Integer> nodes) {
- AtomicInteger result = new AtomicInteger();
- AtomicLong lowestLoadScore = new AtomicLong(Long.MAX_VALUE);
-
- nodes.forEach(
- nodeID -> {
- BaseNodeCache cache = nodeCacheMap.get(nodeID);
- long score = (cache == null) ? Long.MAX_VALUE : cache.getLoadScore();
- if (score < lowestLoadScore.get()) {
- result.set(nodeID);
- lowestLoadScore.set(score);
- }
- });
-
- LOGGER.info(
- "get the lowest load DataNode, NodeID: [{}], LoadScore: [{}]", result, lowestLoadScore);
- return configManager.getNodeManager().getRegisteredDataNodeLocations().get(result.get());
- }
-
- /** Initialize the nodeCacheMap when the ConfigNode-Leader is switched */
- public void initNodeHeartbeatCache() {
- final int CURRENT_NODE_ID = ConfigNodeHeartbeatCache.CURRENT_NODE_ID;
- nodeCacheMap.clear();
-
- // Init ConfigNodeHeartbeatCache
- getRegisteredConfigNodes()
- .forEach(
- configNodeLocation -> {
- if (configNodeLocation.getConfigNodeId() != CURRENT_NODE_ID) {
- nodeCacheMap.put(
- configNodeLocation.getConfigNodeId(),
- new ConfigNodeHeartbeatCache(configNodeLocation.getConfigNodeId()));
- }
- });
- // Force set itself and never update
- nodeCacheMap.put(
- ConfigNodeHeartbeatCache.CURRENT_NODE_ID,
- new ConfigNodeHeartbeatCache(
- CURRENT_NODE_ID, ConfigNodeHeartbeatCache.CURRENT_NODE_STATISTICS));
-
- // Init DataNodeHeartbeatCache
- getRegisteredDataNodes()
- .forEach(
- dataNodeConfiguration ->
- nodeCacheMap.put(
- dataNodeConfiguration.getLocation().getDataNodeId(),
- new DataNodeHeartbeatCache()));
+ int dataNodeId = getLoadManager().getLowestLoadDataNode(new ArrayList<>(nodes));
+ return getRegisteredDataNode(dataNodeId).getLocation();
}
private ConsensusManager getConsensusManager() {
@@ -1002,8 +720,4 @@
private UDFManager getUDFManager() {
return configManager.getUDFManager();
}
-
- private ClusterQuotaManager getClusterQuotaManager() {
- return configManager.getClusterQuotaManager();
- }
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/observer/NodeStatisticsEvent.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/observer/NodeStatisticsEvent.java
index a38adaf..50e9023 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/observer/NodeStatisticsEvent.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/observer/NodeStatisticsEvent.java
@@ -18,7 +18,7 @@
*/
package org.apache.iotdb.confignode.manager.observer;
-import org.apache.iotdb.confignode.manager.node.heartbeat.NodeStatistics;
+import org.apache.iotdb.confignode.manager.load.cache.node.NodeStatistics;
import org.apache.iotdb.tsfile.utils.Pair;
import java.util.Map;
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java
index 223bee1..f15978d 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java
@@ -26,7 +26,6 @@
import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot;
import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
import org.apache.iotdb.commons.cluster.RegionRoleType;
-import org.apache.iotdb.commons.cluster.RegionStatus;
import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil;
import org.apache.iotdb.commons.partition.DataPartitionTable;
@@ -68,7 +67,6 @@
import org.apache.iotdb.confignode.manager.ProcedureManager;
import org.apache.iotdb.confignode.manager.consensus.ConsensusManager;
import org.apache.iotdb.confignode.manager.load.LoadManager;
-import org.apache.iotdb.confignode.manager.partition.heartbeat.RegionGroupCache;
import org.apache.iotdb.confignode.persistence.partition.PartitionInfo;
import org.apache.iotdb.confignode.persistence.partition.maintainer.RegionCreateTask;
import org.apache.iotdb.confignode.persistence.partition.maintainer.RegionDeleteTask;
@@ -88,7 +86,6 @@
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
@@ -101,7 +98,6 @@
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
/** The PartitionManager Manages cluster PartitionTable read and write requests. */
@@ -128,15 +124,11 @@
private final ScheduledExecutorService regionMaintainer;
private Future<?> currentRegionMaintainerFuture;
- // Map<RegionId, RegionGroupCache>
- private final Map<TConsensusGroupId, RegionGroupCache> regionGroupCacheMap;
-
public PartitionManager(IManager configManager, PartitionInfo partitionInfo) {
this.configManager = configManager;
this.partitionInfo = partitionInfo;
this.regionMaintainer =
IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor("IoTDB-Region-Maintainer");
- this.regionGroupCacheMap = new ConcurrentHashMap<>();
setSeriesPartitionExecutor();
}
@@ -593,13 +585,35 @@
* Only leader use this interface.
*
* @param database The specified Database
- * @return All Regions' RegionReplicaSet of the specified StorageGroup
+ * @return All Regions' RegionReplicaSet of the specified Database
*/
public List<TRegionReplicaSet> getAllReplicaSets(String database) {
return partitionInfo.getAllReplicaSets(database);
}
/**
+ * Get all RegionGroups currently owned by the specified DataNode
+ *
+ * @param dataNodeId The specified dataNodeId
+ * @return Deep copy of all RegionGroups' RegionReplicaSet with the specified dataNodeId
+ */
+ public List<TRegionReplicaSet> getAllReplicaSets(int dataNodeId) {
+ return partitionInfo.getAllReplicaSets(dataNodeId);
+ }
+
+ /**
+ * Only leader use this interface.
+ *
+ * @param database The specified Database
+ * @param regionGroupIds The specified RegionGroupIds
+ * @return RegionReplicaSets of the specified RegionGroups within the specified Database
+ */
+ public List<TRegionReplicaSet> getReplicaSets(
+ String database, List<TConsensusGroupId> regionGroupIds) {
+ return partitionInfo.getReplicaSets(database, regionGroupIds);
+ }
+
+ /**
* Only leader use this interface.
*
* <p>Get the number of Regions currently owned by the specified DataNode
@@ -671,22 +685,22 @@
/**
* Only leader use this interface.
*
- * @param storageGroup StorageGroupName
+ * @param database DatabaseName
* @param type SchemaRegion or DataRegion
* @return The specific StorageGroup's Regions that sorted by the number of allocated slots
* @throws NoAvailableRegionGroupException When all RegionGroups within the specified StorageGroup
* are unavailable currently
*/
public List<Pair<Long, TConsensusGroupId>> getSortedRegionGroupSlotsCounter(
- String storageGroup, TConsensusGroupType type) throws NoAvailableRegionGroupException {
+ String database, TConsensusGroupType type) throws NoAvailableRegionGroupException {
// Collect static data
List<Pair<Long, TConsensusGroupId>> regionGroupSlotsCounter =
- partitionInfo.getRegionGroupSlotsCounter(storageGroup, type);
+ partitionInfo.getRegionGroupSlotsCounter(database, type);
// Filter RegionGroups that have Disabled status
List<Pair<Long, TConsensusGroupId>> result = new ArrayList<>();
for (Pair<Long, TConsensusGroupId> slotsCounter : regionGroupSlotsCounter) {
- RegionGroupStatus status = getRegionGroupStatus(slotsCounter.getRight());
+ RegionGroupStatus status = getLoadManager().getRegionGroupStatus(slotsCounter.getRight());
if (!RegionGroupStatus.Disabled.equals(status)) {
result.add(slotsCounter);
}
@@ -696,6 +710,9 @@
throw new NoAvailableRegionGroupException(type);
}
+ Map<TConsensusGroupId, RegionGroupStatus> regionGroupStatusMap =
+ getLoadManager()
+ .getRegionGroupStatus(result.stream().map(Pair::getRight).collect(Collectors.toList()));
result.sort(
(o1, o2) -> {
// Use the number of partitions as the first priority
@@ -705,8 +722,9 @@
return 1;
} else {
// Use RegionGroup status as second priority, Running > Available > Discouraged
- return getRegionGroupStatus(o1.getRight())
- .compareTo(getRegionGroupStatus(o2.getRight()));
+ return regionGroupStatusMap
+ .get(o1.getRight())
+ .compare(regionGroupStatusMap.get(o2.getRight()));
}
});
return result;
@@ -764,7 +782,8 @@
.forEach(
regionInfo -> {
regionInfo.setStatus(
- getRegionStatus(regionInfo.getConsensusGroupId(), regionInfo.getDataNodeId())
+ getLoadManager()
+ .getRegionStatus(regionInfo.getConsensusGroupId(), regionInfo.getDataNodeId())
.getStatus());
String regionType =
@@ -779,19 +798,21 @@
}
/**
+ * Check if the specified RegionGroup exists.
+ *
+ * @param regionGroupId The specified RegionGroup
+ */
+ public boolean isRegionGroupExists(TConsensusGroupId regionGroupId) {
+ return partitionInfo.isRegionGroupExisted(regionGroupId);
+ }
+
+ /**
* update region location
*
* @param req UpdateRegionLocationReq
* @return TSStatus
*/
public TSStatus updateRegionLocation(UpdateRegionLocationPlan req) {
- // Remove heartbeat cache if exists
- if (regionGroupCacheMap.containsKey(req.getRegionId())) {
- regionGroupCacheMap
- .get(req.getRegionId())
- .removeCacheIfExists(req.getOldNode().getDataNodeId());
- }
-
return getConsensusManager().write(req).getStatus();
}
@@ -1064,108 +1085,23 @@
/* Stop the RegionCleaner service */
currentRegionMaintainerFuture.cancel(false);
currentRegionMaintainerFuture = null;
- regionGroupCacheMap.clear();
LOGGER.info("RegionCleaner is stopped successfully.");
}
}
}
- public Map<TConsensusGroupId, RegionGroupCache> getRegionGroupCacheMap() {
- return regionGroupCacheMap;
- }
-
- public void removeRegionGroupCache(TConsensusGroupId consensusGroupId) {
- regionGroupCacheMap.remove(consensusGroupId);
- }
-
/**
- * Filter the RegionGroups in the specified StorageGroup through the RegionGroupStatus
+ * Filter the RegionGroups in the specified Database through the RegionGroupStatus
*
- * @param storageGroup The specified StorageGroup
+ * @param database The specified Database
* @param status The specified RegionGroupStatus
- * @return Filtered RegionGroups with the specific RegionGroupStatus
+ * @return Filtered RegionGroups with the specified RegionGroupStatus
*/
public List<TRegionReplicaSet> filterRegionGroupThroughStatus(
- String storageGroup, RegionGroupStatus... status) {
- return getAllReplicaSets(storageGroup).stream()
- .filter(
- regionReplicaSet -> {
- TConsensusGroupId regionGroupId = regionReplicaSet.getRegionId();
- return regionGroupCacheMap.containsKey(regionGroupId)
- && Arrays.stream(status)
- .anyMatch(
- s ->
- s.equals(
- regionGroupCacheMap
- .get(regionGroupId)
- .getStatistics()
- .getRegionGroupStatus()));
- })
- .collect(Collectors.toList());
- }
-
- /**
- * Count the number of cluster Regions with specified RegionStatus
- *
- * @param type The specified RegionGroupType
- * @param status The specified statues
- * @return The number of cluster Regions with specified RegionStatus
- */
- public int countRegionWithSpecifiedStatus(TConsensusGroupType type, RegionStatus... status) {
- AtomicInteger result = new AtomicInteger(0);
- regionGroupCacheMap.forEach(
- (regionGroupId, regionGroupCache) -> {
- if (type.equals(regionGroupId.getType())) {
- regionGroupCache
- .getStatistics()
- .getRegionStatisticsMap()
- .values()
- .forEach(
- regionStatistics -> {
- if (Arrays.stream(status)
- .anyMatch(s -> s.equals(regionStatistics.getRegionStatus()))) {
- result.getAndIncrement();
- }
- });
- }
- });
- return result.get();
- }
-
- /**
- * Safely get RegionStatus.
- *
- * @param consensusGroupId Specified RegionGroupId
- * @param dataNodeId Specified RegionReplicaId
- * @return Corresponding RegionStatus if cache exists, Unknown otherwise
- */
- public RegionStatus getRegionStatus(TConsensusGroupId consensusGroupId, int dataNodeId) {
- return regionGroupCacheMap.containsKey(consensusGroupId)
- ? regionGroupCacheMap.get(consensusGroupId).getStatistics().getRegionStatus(dataNodeId)
- : RegionStatus.Unknown;
- }
-
- /**
- * Safely get RegionGroupStatus.
- *
- * @param consensusGroupId Specified RegionGroupId
- * @return Corresponding RegionGroupStatus if cache exists, Disabled otherwise
- */
- public RegionGroupStatus getRegionGroupStatus(TConsensusGroupId consensusGroupId) {
- return regionGroupCacheMap.containsKey(consensusGroupId)
- ? regionGroupCacheMap.get(consensusGroupId).getStatistics().getRegionGroupStatus()
- : RegionGroupStatus.Disabled;
- }
-
- /** Initialize the regionGroupCacheMap when the ConfigNode-Leader is switched. */
- public void initRegionGroupHeartbeatCache() {
- regionGroupCacheMap.clear();
- getAllReplicaSets()
- .forEach(
- regionReplicaSet ->
- regionGroupCacheMap.put(
- regionReplicaSet.getRegionId(),
- new RegionGroupCache(regionReplicaSet.getRegionId())));
+ String database, RegionGroupStatus... status) {
+ List<TConsensusGroupId> matchedRegionGroups =
+ getLoadManager().filterRegionGroupThroughStatus(status);
+ return getReplicaSets(database, matchedRegionGroups);
}
public void getSchemaRegionIds(
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionMetrics.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionMetrics.java
index 21909d6..f9eff7b 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionMetrics.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionMetrics.java
@@ -72,10 +72,9 @@
metricService.createAutoGauge(
Metric.REGION_NUM.toString(),
MetricLevel.CORE,
- getPartitionManager(),
- partitionManager ->
- partitionManager.countRegionWithSpecifiedStatus(
- TConsensusGroupType.SchemaRegion, status),
+ getLoadManager(),
+ loadManager ->
+ loadManager.countRegionWithSpecifiedStatus(TConsensusGroupType.SchemaRegion, status),
Tag.TYPE.toString(),
TConsensusGroupType.SchemaRegion.toString(),
Tag.STATUS.toString(),
@@ -85,10 +84,9 @@
metricService.createAutoGauge(
Metric.REGION_NUM.toString(),
MetricLevel.CORE,
- getPartitionManager(),
- partitionManager ->
- partitionManager.countRegionWithSpecifiedStatus(
- TConsensusGroupType.DataRegion, status),
+ getLoadManager(),
+ loadManager ->
+ loadManager.countRegionWithSpecifiedStatus(TConsensusGroupType.DataRegion, status),
Tag.TYPE.toString(),
TConsensusGroupType.DataRegion.toString(),
Tag.STATUS.toString(),
@@ -330,8 +328,8 @@
return configManager.getClusterSchemaManager();
}
- private PartitionManager getPartitionManager() {
- return configManager.getPartitionManager();
+ private LoadManager getLoadManager() {
+ return configManager.getLoadManager();
}
@Override
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/RegionGroupStatus.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/RegionGroupStatus.java
index 9e58ae6..b4cf1e0 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/RegionGroupStatus.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/RegionGroupStatus.java
@@ -21,20 +21,20 @@
public enum RegionGroupStatus {
/** All Regions in RegionGroup are in the Running status */
- Running("Running"),
+ Running("Running", 1),
/**
* All Regions in RegionGroup are in the Running or Unknown status, and the number of Regions in
* the Unknown status is less than half
*/
- Available("Available"),
+ Available("Available", 2),
/**
* All Regions in RegionGroup are in the Running, Unknown or ReadOnly status, and at least 1 node
* is in ReadOnly status, the number of Regions in the Unknown or ReadOnly status is less than
* half
*/
- Discouraged("Discouraged"),
+ Discouraged("Discouraged", 3),
/**
* The following cases will lead to Disabled RegionGroup:
@@ -43,12 +43,14 @@
*
* <p>2. More than half of the Regions are in Unknown or ReadOnly status
*/
- Disabled("Disabled");
+ Disabled("Disabled", 4);
private final String status;
+ private final int weight;
- RegionGroupStatus(String status) {
+ RegionGroupStatus(String status, int weight) {
this.status = status;
+ this.weight = weight;
}
public String getStatus() {
@@ -63,4 +65,13 @@
}
throw new RuntimeException(String.format("RegionGroupStatus %s doesn't exist.", status));
}
+
+ /**
+ * Compare the weight of two RegionGroupStatus.
+ *
+ * <p>Priority: Running > Available > Discouraged > Disabled (higher priority sorts first)
+ */
+ public int compare(RegionGroupStatus other) {
+ return Integer.compare(this.weight, other.weight);
+ }
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/node/NodeInfo.java b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/node/NodeInfo.java
index 67c5526..4c0b176 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/node/NodeInfo.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/node/NodeInfo.java
@@ -221,18 +221,6 @@
return result;
}
- /** Return the number of registered ConfigNodes */
- public int getRegisteredConfigNodeCount() {
- int result;
- configNodeInfoReadWriteLock.readLock().lock();
- try {
- result = registeredConfigNodes.size();
- } finally {
- configNodeInfoReadWriteLock.readLock().unlock();
- }
- return result;
- }
-
/** Return the number of total cpu cores in online DataNodes */
public int getTotalCpuCoreCount() {
int result = 0;
@@ -269,6 +257,23 @@
}
}
+ /** @return The specified registered DataNodes */
+ public List<TDataNodeConfiguration> getRegisteredDataNodes(List<Integer> dataNodeIds) {
+ List<TDataNodeConfiguration> result = new ArrayList<>();
+ dataNodeInfoReadWriteLock.readLock().lock();
+ try {
+ dataNodeIds.forEach(
+ dataNodeId -> {
+ if (registeredDataNodes.containsKey(dataNodeId)) {
+ result.add(registeredDataNodes.get(dataNodeId).deepCopy());
+ }
+ });
+ } finally {
+ dataNodeInfoReadWriteLock.readLock().unlock();
+ }
+ return result;
+ }
+
/**
* Update ConfigNodeList both in memory and confignode-system.properties file
*
@@ -336,6 +341,7 @@
return status;
}
+ /** @return All registered ConfigNodes */
public List<TConfigNodeLocation> getRegisteredConfigNodes() {
List<TConfigNodeLocation> result;
configNodeInfoReadWriteLock.readLock().lock();
@@ -347,6 +353,23 @@
return result;
}
+ /** @return The specified registered ConfigNodes */
+ public List<TConfigNodeLocation> getRegisteredConfigNodes(List<Integer> configNodeIds) {
+ List<TConfigNodeLocation> result = new ArrayList<>();
+ configNodeInfoReadWriteLock.readLock().lock();
+ try {
+ configNodeIds.forEach(
+ configNodeId -> {
+ if (registeredConfigNodes.containsKey(configNodeId)) {
+ result.add(registeredConfigNodes.get(configNodeId).deepCopy());
+ }
+ });
+ } finally {
+ configNodeInfoReadWriteLock.readLock().unlock();
+ }
+ return result;
+ }
+
public int generateNextNodeId() {
return nextNodeId.incrementAndGet();
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/DatabasePartitionTable.java b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/DatabasePartitionTable.java
index b27a4b5..736083f 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/DatabasePartitionTable.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/DatabasePartitionTable.java
@@ -89,7 +89,7 @@
}
/**
- * Cache allocation result of new RegionGroups
+ * Cache allocation result of new RegionGroups.
*
* @param replicaSets List<TRegionReplicaSet>
*/
@@ -101,7 +101,7 @@
}
/**
- * Delete RegionGroups' cache
+ * Delete RegionGroups' cache.
*
* @param replicaSets List<TRegionReplicaSet>
*/
@@ -109,7 +109,7 @@
replicaSets.forEach(replicaSet -> regionGroupMap.remove(replicaSet.getRegionId()));
}
- /** @return Deep copy of all Regions' RegionReplicaSet within one StorageGroup */
+ /** @return Deep copy of all Regions' RegionReplicaSet within one StorageGroup. */
public List<TRegionReplicaSet> getAllReplicaSets() {
List<TRegionReplicaSet> result = new ArrayList<>();
@@ -119,8 +119,9 @@
return result;
}
+
/**
- * Get all RegionGroups currently owned by this StorageGroup
+ * Get all RegionGroups currently owned by this Database.
*
* @param type The specified TConsensusGroupType
* @return Deep copy of all Regions' RegionReplicaSet with the specified TConsensusGroupType
@@ -138,6 +139,37 @@
}
/**
+ * Get all RegionGroups that have a replica on the specified DataNode.
+ *
+ * @param dataNodeId The specified dataNodeId
+ * @return Deep copy of all RegionGroups' RegionReplicaSet with the specified dataNodeId
+ */
+ public List<TRegionReplicaSet> getAllReplicaSets(int dataNodeId) {
+ return regionGroupMap.values().stream()
+ .filter(regionGroup -> regionGroup.belongsToDataNode(dataNodeId))
+ .map(RegionGroup::getReplicaSet)
+ .collect(Collectors.toList());
+ }
+
+ /**
+ * Get the RegionGroups with the specified RegionGroupIds.
+ *
+ * @param regionGroupIds The specified RegionGroupIds
+ * @return Deep copy of the RegionGroups with the specified RegionGroupIds
+ */
+ public List<TRegionReplicaSet> getReplicaSets(List<TConsensusGroupId> regionGroupIds) {
+ List<TRegionReplicaSet> result = new ArrayList<>();
+
+ for (TConsensusGroupId regionGroupId : regionGroupIds) {
+ if (regionGroupMap.containsKey(regionGroupId)) {
+ result.add(regionGroupMap.get(regionGroupId).getReplicaSet());
+ }
+ }
+
+ return result;
+ }
+
+ /**
* Only leader use this interface.
*
* <p>Get the number of Regions currently owned by the specified DataNode
@@ -168,7 +200,7 @@
}
/**
- * Get the number of RegionGroups currently owned by this StorageGroup
+ * Get the number of RegionGroups currently owned by this StorageGroup.
*
* @param type SchemaRegion or DataRegion
* @return The number of Regions currently owned by this StorageGroup
@@ -193,7 +225,7 @@
}
/**
- * Thread-safely get SchemaPartition within the specific StorageGroup
+ * Thread-safely get SchemaPartition within the specific StorageGroup.
*
* @param partitionSlots SeriesPartitionSlots
* @param schemaPartition Where the results are stored
@@ -205,7 +237,7 @@
}
/**
- * Thread-safely get DataPartition within the specific StorageGroup
+ * Thread-safely get DataPartition within the specific StorageGroup.
*
* @param partitionSlots SeriesPartitionSlots and TimePartitionSlots
* @param dataPartition Where the results are stored
@@ -217,7 +249,7 @@
}
/**
- * Checks whether the specified DataPartition has a predecessor and returns if it does
+ * Checks whether the specified DataPartition has a predecessor and returns if it does.
*
* @param seriesPartitionSlot Corresponding SeriesPartitionSlot
* @param timePartitionSlot Corresponding TimePartitionSlot
@@ -233,7 +265,7 @@
}
/**
- * Create SchemaPartition within the specific StorageGroup
+ * Create SchemaPartition within the specific StorageGroup.
*
* @param assignedSchemaPartition Assigned result
*/
@@ -250,7 +282,7 @@
}
/**
- * Create DataPartition within the specific StorageGroup
+ * Create DataPartition within the specific StorageGroup.
*
* @param assignedDataPartition Assigned result
*/
@@ -268,7 +300,7 @@
/**
* Only Leader use this interface. Filter unassigned SchemaPartitionSlots within the specific
- * StorageGroup
+ * StorageGroup.
*
* @param partitionSlots List<TSeriesPartitionSlot>
* @return Unassigned PartitionSlots
@@ -279,7 +311,7 @@
}
/**
- * Get the DataNodes who contain the specific StorageGroup's Schema or Data
+ * Get the DataNodes who contain the specific StorageGroup's Schema or Data.
*
* @param type SchemaRegion or DataRegion
* @return Set<TDataNodeLocation>, the related DataNodes
@@ -297,7 +329,7 @@
/**
* Only Leader use this interface. Filter unassigned DataPartitionSlots within the specific
- * StorageGroup
+ * StorageGroup.
*
* @param partitionSlots List<TSeriesPartitionSlot>
* @return Unassigned PartitionSlots
@@ -436,7 +468,7 @@
}
}
/**
- * update region location
+ * update region location.
*
* @param regionId regionId
* @param oldNode old location, will remove it
@@ -496,7 +528,7 @@
* @param regionId TConsensusGroupId
* @return True if contains.
*/
- public boolean containRegion(TConsensusGroupId regionId) {
+ public boolean containRegionGroup(TConsensusGroupId regionId) {
return regionGroupMap.containsKey(regionId);
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/PartitionInfo.java b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/PartitionInfo.java
index b19ac83..799773a 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/PartitionInfo.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/PartitionInfo.java
@@ -489,6 +489,17 @@
}
/**
+ * Check if the specified RegionGroup exists.
+ *
+ * @param regionGroupId The specified RegionGroup
+ */
+ public boolean isRegionGroupExisted(TConsensusGroupId regionGroupId) {
+ return databasePartitionTables.values().stream()
+ .anyMatch(
+ databasePartitionTable -> databasePartitionTable.containRegionGroup(regionGroupId));
+ }
+
+ /**
* Update the location info of given regionId
*
* @param req UpdateRegionLocationReq
@@ -500,9 +511,10 @@
TDataNodeLocation oldNode = req.getOldNode();
TDataNodeLocation newNode = req.getNewNode();
databasePartitionTables.values().stream()
- .filter(sgPartitionTable -> sgPartitionTable.containRegion(regionId))
+ .filter(databasePartitionTable -> databasePartitionTable.containRegionGroup(regionId))
.forEach(
- sgPartitionTable -> sgPartitionTable.updateRegionLocation(regionId, oldNode, newNode));
+ databasePartitionTable ->
+ databasePartitionTable.updateRegionLocation(regionId, oldNode, newNode));
return status;
}
@@ -516,7 +528,7 @@
public String getRegionStorageGroup(TConsensusGroupId regionId) {
Optional<DatabasePartitionTable> sgPartitionTableOptional =
databasePartitionTables.values().stream()
- .filter(s -> s.containRegion(regionId))
+ .filter(s -> s.containRegionGroup(regionId))
.findFirst();
return sgPartitionTableOptional.map(DatabasePartitionTable::getDatabaseName).orElse(null);
}
@@ -620,6 +632,38 @@
}
/**
+ * Get all RegionGroups that have a replica on the specified DataNode.
+ *
+ * @param dataNodeId The specified dataNodeId
+ * @return Deep copy of all RegionGroups' RegionReplicaSet with the specified dataNodeId
+ */
+ public List<TRegionReplicaSet> getAllReplicaSets(int dataNodeId) {
+ List<TRegionReplicaSet> result = new ArrayList<>();
+ databasePartitionTables
+ .values()
+ .forEach(
+ databasePartitionTable ->
+ result.addAll(databasePartitionTable.getAllReplicaSets(dataNodeId)));
+ return result;
+ }
+
+ /**
+ * Only leader use this interface.
+ *
+ * @param database The specified Database
+ * @param regionGroupIds The specified RegionGroupIds
+ * @return The specified RegionGroups' RegionReplicaSets within the specified Database
+ */
+ public List<TRegionReplicaSet> getReplicaSets(
+ String database, List<TConsensusGroupId> regionGroupIds) {
+ if (databasePartitionTables.containsKey(database)) {
+ return databasePartitionTables.get(database).getReplicaSets(regionGroupIds);
+ } else {
+ return new ArrayList<>();
+ }
+ }
+
+ /**
* Only leader use this interface.
*
* <p>Get the number of Regions currently owned by the specified DataNode
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/RegionGroup.java b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/RegionGroup.java
index 4c4c7ff..a555c23 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/RegionGroup.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/RegionGroup.java
@@ -71,7 +71,7 @@
}
public TRegionReplicaSet getReplicaSet() {
- return replicaSet;
+ return replicaSet.deepCopy();
}
/** @param deltaMap Map<TSeriesPartitionSlot, Delta TTimePartitionSlot Count> */
@@ -93,6 +93,17 @@
return totalTimeSlotCount.get();
}
+ /**
+ * Check if the RegionGroup belongs to the specified DataNode.
+ *
+ * @param dataNodeId The specified DataNodeId.
+ * @return True if the RegionGroup belongs to the specified DataNode.
+ */
+ public boolean belongsToDataNode(int dataNodeId) {
+ return replicaSet.getDataNodeLocations().stream()
+ .anyMatch(dataNodeLocation -> dataNodeLocation.getDataNodeId() == dataNodeId);
+ }
+
public void serialize(OutputStream outputStream, TProtocol protocol)
throws IOException, TException {
ReadWriteIOUtils.write(createTime, outputStream);
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java
index d722d37..46a4c7a 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java
@@ -27,6 +27,7 @@
import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.cluster.NodeStatus;
+import org.apache.iotdb.commons.cluster.NodeType;
import org.apache.iotdb.commons.cluster.RegionStatus;
import org.apache.iotdb.commons.pipe.plugin.meta.PipePluginMeta;
import org.apache.iotdb.commons.pipe.task.meta.PipeMeta;
@@ -49,11 +50,10 @@
import org.apache.iotdb.confignode.manager.ConfigManager;
import org.apache.iotdb.confignode.manager.consensus.ConsensusManager;
import org.apache.iotdb.confignode.manager.load.LoadManager;
+import org.apache.iotdb.confignode.manager.load.cache.node.NodeHeartbeatSample;
+import org.apache.iotdb.confignode.manager.load.cache.region.RegionHeartbeatSample;
import org.apache.iotdb.confignode.manager.node.NodeManager;
-import org.apache.iotdb.confignode.manager.node.heartbeat.NodeHeartbeatSample;
import org.apache.iotdb.confignode.manager.partition.PartitionManager;
-import org.apache.iotdb.confignode.manager.partition.heartbeat.RegionGroupCache;
-import org.apache.iotdb.confignode.manager.partition.heartbeat.RegionHeartbeatSample;
import org.apache.iotdb.confignode.persistence.node.NodeInfo;
import org.apache.iotdb.confignode.procedure.exception.ProcedureException;
import org.apache.iotdb.confignode.procedure.scheduler.LockQueue;
@@ -148,8 +148,7 @@
* @throws TException Thrift IOE
*/
public boolean invalidateCache(String storageGroupName) throws IOException, TException {
- NodeManager nodeManager = configManager.getNodeManager();
- List<TDataNodeConfiguration> allDataNodes = nodeManager.getRegisteredDataNodes();
+ List<TDataNodeConfiguration> allDataNodes = getNodeManager().getRegisteredDataNodes();
TInvalidateCacheReq invalidateCacheReq = new TInvalidateCacheReq();
invalidateCacheReq.setStorageGroup(true);
invalidateCacheReq.setFullPath(storageGroupName);
@@ -157,14 +156,14 @@
int dataNodeId = dataNodeConfiguration.getLocation().getDataNodeId();
// if the node is not alive, sleep 1 second and try again
- NodeStatus nodeStatus = nodeManager.getNodeStatusByNodeId(dataNodeId);
+ NodeStatus nodeStatus = getLoadManager().getNodeStatus(dataNodeId);
if (nodeStatus == NodeStatus.Unknown) {
try {
TimeUnit.MILLISECONDS.sleep(1000);
} catch (InterruptedException e) {
LOG.error("Sleep failed in ConfigNodeProcedureEnv: ", e);
}
- nodeStatus = nodeManager.getNodeStatusByNodeId(dataNodeId);
+ nodeStatus = getLoadManager().getNodeStatus(dataNodeId);
}
if (nodeStatus == NodeStatus.Running) {
@@ -205,14 +204,11 @@
}
public boolean doubleCheckReplica(TDataNodeLocation removedDatanode) {
- return configManager
- .getNodeManager()
+ return getNodeManager()
.filterDataNodeThroughStatus(NodeStatus.Running, NodeStatus.ReadOnly)
.size()
- Boolean.compare(
- configManager
- .getNodeManager()
- .getNodeStatusByNodeId(removedDatanode.getDataNodeId())
+ getLoadManager().getNodeStatus(removedDatanode.getDataNodeId())
!= NodeStatus.Unknown,
false)
>= NodeInfo.getMinimumDataNode();
@@ -308,8 +304,6 @@
* @throws ProcedureException if failed status
*/
public void stopConfigNode(TConfigNodeLocation tConfigNodeLocation) throws ProcedureException {
- getNodeManager().removeNodeCache(tConfigNodeLocation.getConfigNodeId());
-
TSStatus tsStatus =
(TSStatus)
SyncConfigNodeClientPool.getInstance()
@@ -321,6 +315,8 @@
if (tsStatus.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
throw new ProcedureException(tsStatus.getMessage());
}
+
+ getLoadManager().removeNodeCache(tConfigNodeLocation.getConfigNodeId());
}
/**
@@ -374,8 +370,7 @@
*/
public void markDataNodeAsRemovingAndBroadcast(TDataNodeLocation dataNodeLocation) {
// Send request to update NodeStatus on the DataNode to be removed
- if (configManager.getNodeManager().getNodeStatusByNodeId(dataNodeLocation.getDataNodeId())
- == NodeStatus.Unknown) {
+ if (getLoadManager().getNodeStatus(dataNodeLocation.getDataNodeId()) == NodeStatus.Unknown) {
SyncDataNodeClientPool.getInstance()
.sendSyncRequestToDataNodeWithGivenRetry(
dataNodeLocation.getInternalEndPoint(),
@@ -391,10 +386,11 @@
}
// Force updating NodeStatus to Removing
- getNodeManager()
- .getNodeCacheMap()
- .get(dataNodeLocation.getDataNodeId())
- .forceUpdate(NodeHeartbeatSample.generateDefaultSample(NodeStatus.Removing));
+ getLoadManager()
+ .forceUpdateNodeCache(
+ NodeType.DataNode,
+ dataNodeLocation.getDataNodeId(),
+ NodeHeartbeatSample.generateDefaultSample(NodeStatus.Removing));
}
/**
@@ -542,10 +538,7 @@
(dataNodeId, regionStatus) ->
heartbeatSampleMap.put(
dataNodeId, new RegionHeartbeatSample(currentTime, currentTime, regionStatus)));
- getPartitionManager()
- .getRegionGroupCacheMap()
- .computeIfAbsent(regionGroupId, empty -> new RegionGroupCache(regionGroupId))
- .forceUpdate(heartbeatSampleMap);
+ getLoadManager().forceUpdateRegionGroupCache(regionGroupId, heartbeatSampleMap);
// Select leader greedily for iot consensus protocol
if (TConsensusGroupType.DataRegion.equals(regionGroupId.getType())
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/DataNodeRemoveHandler.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/DataNodeRemoveHandler.java
index afa2d3c..c75fd41 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/DataNodeRemoveHandler.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/DataNodeRemoveHandler.java
@@ -34,7 +34,6 @@
import org.apache.iotdb.confignode.consensus.request.write.partition.UpdateRegionLocationPlan;
import org.apache.iotdb.confignode.consensus.response.datanode.DataNodeToStatusResp;
import org.apache.iotdb.confignode.manager.ConfigManager;
-import org.apache.iotdb.confignode.manager.node.heartbeat.BaseNodeCache;
import org.apache.iotdb.confignode.manager.partition.PartitionMetrics;
import org.apache.iotdb.confignode.persistence.node.NodeInfo;
import org.apache.iotdb.confignode.procedure.scheduler.LockQueue;
@@ -328,7 +327,7 @@
TMaintainPeerReq maintainPeerReq = new TMaintainPeerReq(regionId, originalDataNode);
status =
- configManager.getNodeManager().getNodeStatusByNodeId(originalDataNode.getDataNodeId())
+ configManager.getLoadManager().getNodeStatus(originalDataNode.getDataNodeId())
== NodeStatus.Unknown
? SyncDataNodeClientPool.getInstance()
.sendSyncRequestToDataNodeWithGivenRetry(
@@ -375,6 +374,9 @@
getIdWithRpcEndpoint(originalDataNode),
getIdWithRpcEndpoint(destDataNode));
+ // Remove the RegionGroupCache of the regionId
+ configManager.getLoadManager().removeRegionGroupCache(regionId);
+
// Broadcast the latest RegionRouteMap when Region migration finished
configManager.getLoadManager().broadcastLatestRegionRouteMap();
}
@@ -427,7 +429,7 @@
SyncDataNodeClientPool.getInstance()
.sendSyncRequestToDataNodeWithGivenRetry(
dataNode.getInternalEndPoint(), dataNode, DataNodeRequestType.STOP_DATA_NODE, 2);
- configManager.getNodeManager().removeNodeCache(dataNode.getDataNodeId());
+ configManager.getLoadManager().removeNodeCache(dataNode.getDataNodeId());
LOGGER.info(
"{}, Stop Data Node result: {}, stoppedDataNode: {}",
REMOVE_DATANODE_PROCESS,
@@ -509,9 +511,8 @@
if (CONF.getSchemaReplicationFactor() == 1 || CONF.getDataReplicationFactor() == 1) {
for (TDataNodeLocation dataNodeLocation : removedDataNodes) {
// check whether removed data node is in running state
- BaseNodeCache nodeCache =
- configManager.getNodeManager().getNodeCacheMap().get(dataNodeLocation.getDataNodeId());
- if (!NodeStatus.Running.equals(nodeCache.getNodeStatus())) {
+ if (!NodeStatus.Running.equals(
+ configManager.getLoadManager().getNodeStatus(dataNodeLocation.getDataNodeId()))) {
removedDataNodes.remove(dataNodeLocation);
LOGGER.error(
"Failed to remove data node {} because it is not in running and the configuration of cluster is one replication",
@@ -530,7 +531,7 @@
removeDataNodePlan.getDataNodeLocations().stream()
.filter(
x ->
- configManager.getNodeManager().getNodeStatusByNodeId(x.getDataNodeId())
+ configManager.getLoadManager().getNodeStatus(x.getDataNodeId())
!= NodeStatus.Unknown)
.count();
if (availableDatanodeSize - removedDataNodeSize < NodeInfo.getMinimumDataNode()) {
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/DeleteDatabaseProcedure.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/DeleteDatabaseProcedure.java
index f1271c2..43c3404 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/DeleteDatabaseProcedure.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/DeleteDatabaseProcedure.java
@@ -120,7 +120,7 @@
regionReplicaSet -> {
// Clear heartbeat cache along the way
env.getConfigManager()
- .getPartitionManager()
+ .getLoadManager()
.removeRegionGroupCache(regionReplicaSet.getRegionId());
env.getConfigManager()
.getLoadManager()
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/statemachine/RegionMigrateProcedure.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/statemachine/RegionMigrateProcedure.java
index 46b5839..e1f02cf 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/statemachine/RegionMigrateProcedure.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/statemachine/RegionMigrateProcedure.java
@@ -47,6 +47,9 @@
/** Region migrate procedure */
public class RegionMigrateProcedure
extends StateMachineProcedure<ConfigNodeProcedureEnv, RegionTransitionState> {
+
+ // TODO: Reach an agreement on RegionMigrateProcedure
+
private static final Logger LOG = LoggerFactory.getLogger(RegionMigrateProcedure.class);
private static final int RETRY_THRESHOLD = 5;
diff --git a/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/priority/GreedyPriorityTest.java b/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/priority/GreedyPriorityTest.java
index cb617bf..64eae9a 100644
--- a/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/priority/GreedyPriorityTest.java
+++ b/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/priority/GreedyPriorityTest.java
@@ -24,9 +24,9 @@
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
import org.apache.iotdb.commons.cluster.NodeStatus;
-import org.apache.iotdb.confignode.manager.node.heartbeat.BaseNodeCache;
-import org.apache.iotdb.confignode.manager.node.heartbeat.DataNodeHeartbeatCache;
-import org.apache.iotdb.confignode.manager.node.heartbeat.NodeHeartbeatSample;
+import org.apache.iotdb.confignode.manager.load.cache.node.BaseNodeCache;
+import org.apache.iotdb.confignode.manager.load.cache.node.DataNodeHeartbeatCache;
+import org.apache.iotdb.confignode.manager.load.cache.node.NodeHeartbeatSample;
import org.apache.iotdb.mpp.rpc.thrift.THeartbeatResp;
import org.junit.Assert;
@@ -64,7 +64,7 @@
NodeStatus.Running, NodeStatus.Unknown, NodeStatus.Running, NodeStatus.ReadOnly
};
for (int i = 0; i < 4; i++) {
- nodeCacheMap.put(i, new DataNodeHeartbeatCache());
+ nodeCacheMap.put(i, new DataNodeHeartbeatCache(i));
nodeCacheMap
.get(i)
.cacheHeartbeatSample(
diff --git a/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/priority/LeaderPriorityBalancerTest.java b/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/priority/LeaderPriorityBalancerTest.java
index a028219..99d0e88 100644
--- a/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/priority/LeaderPriorityBalancerTest.java
+++ b/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/priority/LeaderPriorityBalancerTest.java
@@ -24,9 +24,9 @@
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
import org.apache.iotdb.commons.cluster.NodeStatus;
-import org.apache.iotdb.confignode.manager.node.heartbeat.BaseNodeCache;
-import org.apache.iotdb.confignode.manager.node.heartbeat.DataNodeHeartbeatCache;
-import org.apache.iotdb.confignode.manager.node.heartbeat.NodeHeartbeatSample;
+import org.apache.iotdb.confignode.manager.load.cache.node.BaseNodeCache;
+import org.apache.iotdb.confignode.manager.load.cache.node.DataNodeHeartbeatCache;
+import org.apache.iotdb.confignode.manager.load.cache.node.NodeHeartbeatSample;
import org.apache.iotdb.mpp.rpc.thrift.THeartbeatResp;
import org.junit.Assert;
@@ -61,7 +61,7 @@
long currentTimeMillis = System.currentTimeMillis();
Map<Integer, BaseNodeCache> nodeCacheMap = new HashMap<>();
for (int i = 0; i < 6; i++) {
- nodeCacheMap.put(i, new DataNodeHeartbeatCache());
+ nodeCacheMap.put(i, new DataNodeHeartbeatCache(i));
if (i != 2 && i != 5) {
nodeCacheMap
.get(i)
diff --git a/confignode/src/test/java/org/apache/iotdb/confignode/manager/node/NodeCacheTest.java b/confignode/src/test/java/org/apache/iotdb/confignode/manager/node/NodeCacheTest.java
index 4d93b63..46ffbe2 100644
--- a/confignode/src/test/java/org/apache/iotdb/confignode/manager/node/NodeCacheTest.java
+++ b/confignode/src/test/java/org/apache/iotdb/confignode/manager/node/NodeCacheTest.java
@@ -19,8 +19,8 @@
package org.apache.iotdb.confignode.manager.node;
import org.apache.iotdb.commons.cluster.NodeStatus;
-import org.apache.iotdb.confignode.manager.node.heartbeat.DataNodeHeartbeatCache;
-import org.apache.iotdb.confignode.manager.node.heartbeat.NodeHeartbeatSample;
+import org.apache.iotdb.confignode.manager.load.cache.node.DataNodeHeartbeatCache;
+import org.apache.iotdb.confignode.manager.load.cache.node.NodeHeartbeatSample;
import org.apache.iotdb.mpp.rpc.thrift.THeartbeatResp;
import org.junit.Assert;
@@ -30,7 +30,7 @@
@Test
public void forceUpdateTest() {
- DataNodeHeartbeatCache dataNodeHeartbeatCache = new DataNodeHeartbeatCache();
+ DataNodeHeartbeatCache dataNodeHeartbeatCache = new DataNodeHeartbeatCache(1);
// Test default
Assert.assertEquals(NodeStatus.Unknown, dataNodeHeartbeatCache.getNodeStatus());
@@ -55,7 +55,7 @@
@Test
public void periodicUpdateTest() {
- DataNodeHeartbeatCache dataNodeHeartbeatCache = new DataNodeHeartbeatCache();
+ DataNodeHeartbeatCache dataNodeHeartbeatCache = new DataNodeHeartbeatCache(1);
long currentTime = System.currentTimeMillis();
dataNodeHeartbeatCache.cacheHeartbeatSample(
new NodeHeartbeatSample(
diff --git a/confignode/src/test/java/org/apache/iotdb/confignode/manager/partition/RegionGroupCacheTest.java b/confignode/src/test/java/org/apache/iotdb/confignode/manager/partition/RegionGroupCacheTest.java
index 3c3cf48..1dac94e 100644
--- a/confignode/src/test/java/org/apache/iotdb/confignode/manager/partition/RegionGroupCacheTest.java
+++ b/confignode/src/test/java/org/apache/iotdb/confignode/manager/partition/RegionGroupCacheTest.java
@@ -21,8 +21,8 @@
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
import org.apache.iotdb.commons.cluster.RegionStatus;
-import org.apache.iotdb.confignode.manager.partition.heartbeat.RegionGroupCache;
-import org.apache.iotdb.confignode.manager.partition.heartbeat.RegionHeartbeatSample;
+import org.apache.iotdb.confignode.manager.load.cache.region.RegionGroupCache;
+import org.apache.iotdb.confignode.manager.load.cache.region.RegionHeartbeatSample;
import org.junit.Assert;
import org.junit.Test;
diff --git a/confignode/src/test/java/org/apache/iotdb/confignode/persistence/node/NodeStatisticsTest.java b/confignode/src/test/java/org/apache/iotdb/confignode/persistence/node/NodeStatisticsTest.java
index 4181fa9..7d71f58 100644
--- a/confignode/src/test/java/org/apache/iotdb/confignode/persistence/node/NodeStatisticsTest.java
+++ b/confignode/src/test/java/org/apache/iotdb/confignode/persistence/node/NodeStatisticsTest.java
@@ -19,7 +19,7 @@
package org.apache.iotdb.confignode.persistence.node;
import org.apache.iotdb.commons.cluster.NodeStatus;
-import org.apache.iotdb.confignode.manager.node.heartbeat.NodeStatistics;
+import org.apache.iotdb.confignode.manager.load.cache.node.NodeStatistics;
import org.apache.iotdb.tsfile.utils.PublicBAOS;
import org.junit.Assert;
diff --git a/confignode/src/test/java/org/apache/iotdb/confignode/persistence/partition/statistics/RegionGroupStatisticsTest.java b/confignode/src/test/java/org/apache/iotdb/confignode/persistence/partition/statistics/RegionGroupStatisticsTest.java
index cce3ed2..4d1642b 100644
--- a/confignode/src/test/java/org/apache/iotdb/confignode/persistence/partition/statistics/RegionGroupStatisticsTest.java
+++ b/confignode/src/test/java/org/apache/iotdb/confignode/persistence/partition/statistics/RegionGroupStatisticsTest.java
@@ -19,9 +19,9 @@
package org.apache.iotdb.confignode.persistence.partition.statistics;
import org.apache.iotdb.commons.cluster.RegionStatus;
+import org.apache.iotdb.confignode.manager.load.cache.region.RegionGroupStatistics;
+import org.apache.iotdb.confignode.manager.load.cache.region.RegionStatistics;
import org.apache.iotdb.confignode.manager.partition.RegionGroupStatus;
-import org.apache.iotdb.confignode.manager.partition.heartbeat.RegionGroupStatistics;
-import org.apache.iotdb.confignode.manager.partition.heartbeat.RegionStatistics;
import org.apache.iotdb.tsfile.utils.PublicBAOS;
import org.junit.Assert;
diff --git a/confignode/src/test/java/org/apache/iotdb/confignode/persistence/partition/statistics/RegionStatisticsTest.java b/confignode/src/test/java/org/apache/iotdb/confignode/persistence/partition/statistics/RegionStatisticsTest.java
index 8ccf87c..b5935cc 100644
--- a/confignode/src/test/java/org/apache/iotdb/confignode/persistence/partition/statistics/RegionStatisticsTest.java
+++ b/confignode/src/test/java/org/apache/iotdb/confignode/persistence/partition/statistics/RegionStatisticsTest.java
@@ -19,7 +19,7 @@
package org.apache.iotdb.confignode.persistence.partition.statistics;
import org.apache.iotdb.commons.cluster.RegionStatus;
-import org.apache.iotdb.confignode.manager.partition.heartbeat.RegionStatistics;
+import org.apache.iotdb.confignode.manager.load.cache.region.RegionStatistics;
import org.apache.iotdb.tsfile.utils.PublicBAOS;
import org.junit.Assert;