blob: 2f2f658e8b8ac92135ec3a423ec397abcd2e97e5 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.confignode.manager.node;
import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TDataNodeConfiguration;
import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.common.rpc.thrift.TFlushReq;
import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.cluster.NodeStatus;
import org.apache.iotdb.commons.cluster.RegionRoleType;
import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil;
import org.apache.iotdb.commons.conf.CommonConfig;
import org.apache.iotdb.commons.conf.CommonDescriptor;
import org.apache.iotdb.commons.consensus.ConsensusGroupId;
import org.apache.iotdb.confignode.client.DataNodeRequestType;
import org.apache.iotdb.confignode.client.async.confignode.AsyncConfigNodeHeartbeatClientPool;
import org.apache.iotdb.confignode.client.async.datanode.AsyncDataNodeClientPool;
import org.apache.iotdb.confignode.client.async.datanode.AsyncDataNodeHeartbeatClientPool;
import org.apache.iotdb.confignode.client.async.handlers.ConfigNodeHeartbeatHandler;
import org.apache.iotdb.confignode.client.async.handlers.DataNodeHeartbeatHandler;
import org.apache.iotdb.confignode.client.sync.datanode.SyncDataNodeClientPool;
import org.apache.iotdb.confignode.conf.ConfigNodeConfig;
import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
import org.apache.iotdb.confignode.consensus.request.read.GetDataNodeConfigurationPlan;
import org.apache.iotdb.confignode.consensus.request.write.RegisterDataNodePlan;
import org.apache.iotdb.confignode.consensus.request.write.RemoveDataNodePlan;
import org.apache.iotdb.confignode.consensus.request.write.confignode.ApplyConfigNodePlan;
import org.apache.iotdb.confignode.consensus.request.write.confignode.RemoveConfigNodePlan;
import org.apache.iotdb.confignode.consensus.response.DataNodeConfigurationResp;
import org.apache.iotdb.confignode.consensus.response.DataNodeRegisterResp;
import org.apache.iotdb.confignode.consensus.response.DataNodeToStatusResp;
import org.apache.iotdb.confignode.manager.ClusterSchemaManager;
import org.apache.iotdb.confignode.manager.ConfigManager;
import org.apache.iotdb.confignode.manager.ConsensusManager;
import org.apache.iotdb.confignode.manager.IManager;
import org.apache.iotdb.confignode.manager.load.LoadManager;
import org.apache.iotdb.confignode.manager.partition.PartitionManager;
import org.apache.iotdb.confignode.persistence.NodeInfo;
import org.apache.iotdb.confignode.persistence.metric.NodeInfoMetrics;
import org.apache.iotdb.confignode.procedure.env.DataNodeRemoveHandler;
import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeInfo;
import org.apache.iotdb.confignode.rpc.thrift.TDataNodeInfo;
import org.apache.iotdb.confignode.rpc.thrift.TGlobalConfig;
import org.apache.iotdb.confignode.rpc.thrift.TRatisConfig;
import org.apache.iotdb.consensus.common.DataSet;
import org.apache.iotdb.consensus.common.Peer;
import org.apache.iotdb.consensus.common.response.ConsensusGenericResponse;
import org.apache.iotdb.db.service.metrics.MetricService;
import org.apache.iotdb.mpp.rpc.thrift.THeartbeatReq;
import org.apache.iotdb.rpc.TSStatusCode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
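/**
 * NodeManager handles the registration and removal requests of cluster nodes (DataNodes and
 * ConfigNodes), and runs the heartbeat service that caches each node's status.
 */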
public class NodeManager {
private static final Logger LOGGER = LoggerFactory.getLogger(NodeManager.class);
// ConfigNode configuration; must be initialized before the constants below that read it
private static final ConfigNodeConfig CONF = ConfigNodeDescriptor.getInstance().getConf();
// Delay between two heartbeat loops; scheduled with TimeUnit.MILLISECONDS
public static final long HEARTBEAT_INTERVAL = CONF.getHeartbeatInterval();
// Internal endpoint of the ConfigNode this NodeManager runs on
public static final TEndPoint CURRENT_NODE =
new TEndPoint(CONF.getInternalAddress(), CONF.getInternalPort());
private final IManager configManager;
private final NodeInfo nodeInfo;
// Held while checking whether a ConfigNode can be removed
private final ReentrantLock removeConfigNodeLock;
// Monitor that guards starting and stopping the heartbeat service
private final Object scheduleMonitor = new Object();
// Map<NodeId, BaseNodeCache>, holds the heartbeat caches of both DataNodes and ConfigNodes
private final Map<Integer, BaseNodeCache> nodeCacheMap;
// Cycles through 0..9, used to decide which heartbeats carry extra work
private final AtomicInteger heartbeatCounter = new AtomicInteger(0);
// Handle of the scheduled heartbeat task, null while the heartbeat service is not running
private Future<?> currentHeartbeatFuture;
/** Heartbeat executor service */
// NOTE(review): the thread pool is named after LoadManager although it lives in NodeManager;
// presumably a leftover of an earlier refactoring - confirm before renaming
private final ScheduledExecutorService heartBeatExecutor =
IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor(LoadManager.class.getSimpleName());
/**
 * Creates a NodeManager with an empty node cache.
 *
 * @param configManager the manager used to reach the other managers of this ConfigNode
 * @param nodeInfo the persistence layer that stores the registered nodes
 */
public NodeManager(IManager configManager, NodeInfo nodeInfo) {
  // Internal state first, collaborators afterwards; the assignments are independent
  this.nodeCacheMap = new ConcurrentHashMap<>();
  this.removeConfigNodeLock = new ReentrantLock();
  this.nodeInfo = nodeInfo;
  this.configManager = configManager;
}
/**
 * Builds the TGlobalConfig from the ConfigNode and common configurations and attaches it to the
 * given register response.
 *
 * @param dataSet the register response that receives the global config
 */
private void setGlobalConfig(DataNodeRegisterResp dataSet) {
  final ConfigNodeConfig nodeConf = ConfigNodeDescriptor.getInstance().getConf();
  final CommonConfig sharedConf = CommonDescriptor.getInstance().getConfig();

  final TGlobalConfig global = new TGlobalConfig();

  // Consensus protocols
  global.setDataRegionConsensusProtocolClass(nodeConf.getDataRegionConsensusProtocolClass());
  global.setSchemaRegionConsensusProtocolClass(nodeConf.getSchemaRegionConsensusProtocolClass());

  // Partitioning
  global.setSeriesPartitionSlotNum(nodeConf.getSeriesPartitionSlotNum());
  global.setSeriesPartitionExecutorClass(nodeConf.getSeriesPartitionExecutorClass());
  global.setTimePartitionInterval(nodeConf.getTimePartitionInterval());

  // Read consistency and disk monitoring
  global.setReadConsistencyLevel(nodeConf.getReadConsistencyLevel());
  global.setDiskSpaceWarningThreshold(sharedConf.getDiskSpaceWarningThreshold());

  dataSet.setGlobalConfig(global);
}
/**
 * Builds the TRatisConfig from the ConfigNode configuration and attaches it to the given
 * register response.
 *
 * @param dataSet the register response that receives the ratis config
 */
private void setRatisConfig(DataNodeRegisterResp dataSet) {
  final ConfigNodeConfig nodeConf = ConfigNodeDescriptor.getInstance().getConf();
  final TRatisConfig ratis = new TRatisConfig();

  // Log related settings
  ratis.setAppenderBufferSize(nodeConf.getRatisConsensusLogAppenderBufferSize());
  ratis.setSnapshotTriggerThreshold(nodeConf.getRatisSnapshotTriggerThreshold());
  ratis.setLogUnsafeFlushEnable(nodeConf.isRatisLogUnsafeFlushEnable());
  ratis.setLogSegmentSizeMax(nodeConf.getRatisLogSegmentSizeMax());

  // RPC related settings
  ratis.setGrpcFlowControlWindow(nodeConf.getRatisGrpcFlowControlWindow());
  ratis.setLeaderElectionTimeoutMin(nodeConf.getRatisRpcLeaderElectionTimeoutMinMs());
  ratis.setLeaderElectionTimeoutMax(nodeConf.getRatisRpcLeaderElectionTimeoutMaxMs());

  dataSet.setRatisConfig(ratis);
}
/**
 * Register DataNode
 *
 * <p>Whatever the outcome, the response carries the DataNode id found in the plan, the
 * registered ConfigNodes, and the global and ratis configurations.
 *
 * @param registerDataNodePlan RegisterDataNodeReq
 * @return DataNodeRegisterResp. The TSStatus is set to SUCCESS_STATUS when a new DataNode is
 *     registered, and to DATANODE_ALREADY_REGISTERED when the DataNode already exists.
 */
public DataSet registerDataNode(RegisterDataNodePlan registerDataNodePlan) {
  final TSStatus status;
  if (nodeInfo.isRegisteredDataNode(registerDataNodePlan.getInfo().getLocation())) {
    status =
        new TSStatus(TSStatusCode.DATANODE_ALREADY_REGISTERED.getStatusCode())
            .setMessage("DataNode already registered.");
  } else if (registerDataNodePlan.getInfo().getLocation().getDataNodeId() < 0) {
    // A negative id marks a DataNode that has never been registered: allocate its id now
    registerDataNodePlan.getInfo().getLocation().setDataNodeId(nodeInfo.generateNextNodeId());
    // NOTE(review): the result of the consensus write is not inspected - confirm that a failed
    // write may really be reported as success
    getConsensusManager().write(registerDataNodePlan);
    // Adjust the maximum RegionGroup number of each StorageGroup
    getClusterSchemaManager().adjustMaxRegionGroupCount();
    status =
        new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode())
            .setMessage("registerDataNode success.");
  } else {
    // NOTE(review): an unregistered location that already carries a non-negative id ends up
    // with a status that has neither code nor message - confirm this is intended
    status = new TSStatus();
  }

  final DataNodeRegisterResp response = new DataNodeRegisterResp();
  response.setStatus(status);
  response.setDataNodeId(registerDataNodePlan.getInfo().getLocation().getDataNodeId());
  response.setConfigNodeList(getRegisteredConfigNodes());
  setGlobalConfig(response);
  setRatisConfig(response);
  return response;
}
/**
 * Remove DataNodes
 *
 * <p>The request is pre-checked first; when the check fails its result is returned directly.
 * Otherwise the removal is handed to the ProcedureManager and only its acceptance is reported.
 *
 * @param removeDataNodePlan RemoveDataNodeReq
 * @return DataNodeToStatusResp. The TSStatus is SUCCESS_STATUS when the request is accepted,
 *     NODE_DELETE_FAILED_ERROR when it is rejected, or the status of the failed pre-check.
 */
public DataSet removeDataNode(RemoveDataNodePlan removeDataNodePlan) {
  LOGGER.info("NodeManager start to remove DataNode {}", removeDataNodePlan);

  final DataNodeToStatusResp checkResult =
      new DataNodeRemoveHandler((ConfigManager) configManager)
          .checkRemoveDataNodeRequest(removeDataNodePlan);
  if (TSStatusCode.SUCCESS_STATUS.getStatusCode() != checkResult.getStatus().getCode()) {
    LOGGER.error(
        "the remove Data Node request check failed. req: {}, check result: {}",
        removeDataNodePlan,
        checkResult.getStatus());
    return checkResult;
  }

  // Submit the request; the client gets its answer as soon as the request is queued
  final DataNodeToStatusResp response = new DataNodeToStatusResp();
  final boolean accepted = configManager.getProcedureManager().removeDataNode(removeDataNodePlan);
  final TSStatus status =
      accepted
          ? new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode())
              .setMessage("Server accept the request")
          : new TSStatus(TSStatusCode.NODE_DELETE_FAILED_ERROR.getStatusCode())
              .setMessage("Server reject the request, maybe request is too much");
  response.setStatus(status);

  LOGGER.info("NodeManager finished to remove DataNode {}", removeDataNodePlan);
  return response;
}
/**
 * Get TDataNodeConfiguration
 *
 * <p>The plan is executed as a read through the ConsensusManager and the resulting dataset is
 * cast to DataNodeConfigurationResp.
 *
 * @param req GetDataNodeConfigurationPlan
 * @return The specific DataNode's configuration or all DataNodes' configuration if dataNodeId in
 *     GetDataNodeConfigurationPlan is -1
 */
public DataNodeConfigurationResp getDataNodeConfiguration(GetDataNodeConfigurationPlan req) {
return (DataNodeConfigurationResp) getConsensusManager().read(req).getDataset();
}
/**
 * Only the leader uses this interface. Delegates to NodeInfo.
 *
 * @return The number of registered DataNodes
 */
public int getRegisteredDataNodeCount() {
return nodeInfo.getRegisteredDataNodeCount();
}
/**
 * Only the leader uses this interface. Delegates to NodeInfo.
 *
 * @return The number of total cpu cores in online DataNodes
 */
public int getTotalCpuCoreCount() {
return nodeInfo.getTotalCpuCoreCount();
}
/**
 * Only the leader uses this interface. Delegates to NodeInfo.
 *
 * @return All registered DataNodes
 */
public List<TDataNodeConfiguration> getRegisteredDataNodes() {
return nodeInfo.getRegisteredDataNodes();
}
/**
 * Collects the locations of all registered DataNodes.
 *
 * @return a new Map<DataNodeId, TDataNodeLocation> built from the registered DataNodes
 */
public Map<Integer, TDataNodeLocation> getRegisteredDataNodeLocations() {
  final Map<Integer, TDataNodeLocation> locationsById = new ConcurrentHashMap<>();
  for (TDataNodeConfiguration registered : nodeInfo.getRegisteredDataNodes()) {
    locationsById.put(registered.getLocation().getDataNodeId(), registered.getLocation());
  }
  return locationsById;
}
/**
 * Builds the information of every registered DataNode for showing the cluster: id, cached
 * status, client RPC endpoint and the number of schema/data regions it hosts.
 *
 * @return the DataNode infos sorted by DataNode id
 */
public List<TDataNodeInfo> getRegisteredDataNodeInfoList() {
  final List<TDataNodeInfo> infos = new ArrayList<>();
  final List<TDataNodeConfiguration> registered = this.getRegisteredDataNodes();
  if (registered != null) {
    for (TDataNodeConfiguration configuration : registered) {
      final TDataNodeLocation location = configuration.getLocation();
      final int dataNodeId = location.getDataNodeId();
      final TDataNodeInfo info = new TDataNodeInfo();
      info.setDataNodeId(dataNodeId);
      info.setStatus(getNodeStatusWithReason(dataNodeId));
      info.setRpcAddresss(location.getClientRpcEndPoint().getIp());
      info.setRpcPort(location.getClientRpcEndPoint().getPort());
      infos.add(info);
    }
  }

  // Map<DataNodeId, SchemaRegionNum>
  final Map<Integer, Integer> schemaRegionCounts = new HashMap<>();
  // Map<DataNodeId, DataRegionNum>
  final Map<Integer, Integer> dataRegionCounts = new HashMap<>();
  for (TRegionReplicaSet replicaSet : getPartitionManager().getAllReplicaSets()) {
    for (TDataNodeLocation replica : replicaSet.getDataNodeLocations()) {
      switch (replicaSet.getRegionId().getType()) {
        case SchemaRegion:
          schemaRegionCounts.merge(replica.getDataNodeId(), 1, Integer::sum);
          break;
        case DataRegion:
        default:
          // Every type other than SchemaRegion is counted as a data region
          dataRegionCounts.merge(replica.getDataNodeId(), 1, Integer::sum);
      }
    }
  }

  // DataNodes that host no region of a kind report 0 for it
  for (TDataNodeInfo info : infos) {
    info.setSchemaRegionNum(schemaRegionCounts.getOrDefault(info.getDataNodeId(), 0));
    info.setDataRegionNum(dataRegionCounts.getOrDefault(info.getDataNodeId(), 0));
  }

  infos.sort(Comparator.comparingInt(TDataNodeInfo::getDataNodeId));
  return infos;
}
/**
 * Builds the information of every registered ConfigNode for showing the cluster: id, cached
 * status, internal endpoint and role. The ConfigNode whose internal endpoint equals
 * CURRENT_NODE is reported as Leader, every other one as Follower.
 *
 * @return the ConfigNode infos sorted by ConfigNode id
 */
public List<TConfigNodeInfo> getRegisteredConfigNodeInfoList() {
  final List<TConfigNodeInfo> infos = new ArrayList<>();
  final List<TConfigNodeLocation> registered = this.getRegisteredConfigNodes();
  if (registered != null) {
    for (TConfigNodeLocation location : registered) {
      final int configNodeId = location.getConfigNodeId();
      final TEndPoint internalEndPoint = location.getInternalEndPoint();

      final String role;
      if (internalEndPoint.equals(CURRENT_NODE)) {
        role = RegionRoleType.Leader.name();
      } else {
        role = RegionRoleType.Follower.name();
      }

      final TConfigNodeInfo info = new TConfigNodeInfo();
      info.setConfigNodeId(configNodeId);
      info.setStatus(getNodeStatusWithReason(configNodeId));
      info.setInternalAddress(internalEndPoint.getIp());
      info.setInternalPort(internalEndPoint.getPort());
      info.setRoleType(role);
      infos.add(info);
    }
  }
  infos.sort(Comparator.comparingInt(TConfigNodeInfo::getConfigNodeId));
  return infos;
}
/**
 * Only the leader uses this interface. Assigns the next node id to the new ConfigNode and
 * records it through a consensus write.
 *
 * @param configNodeLocation The new ConfigNode; its configNodeId is overwritten here
 */
public void applyConfigNode(TConfigNodeLocation configNodeLocation) {
  // The id must be set before the plan is written
  final int newConfigNodeId = nodeInfo.generateNextNodeId();
  configNodeLocation.setConfigNodeId(newConfigNodeId);
  getConsensusManager().write(new ApplyConfigNodePlan(configNodeLocation));
}
/** Registers a NodeInfoMetrics set, backed by this manager's NodeInfo, in the MetricService. */
public void addMetrics() {
MetricService.getInstance().addMetricSet(new NodeInfoMetrics(nodeInfo));
}
/**
 * Only the leader uses this interface. Checks whether the ConfigNode can be removed.
 *
 * <p>The checks run under removeConfigNodeLock. When the ConfigNode to be removed is the current
 * leader, the leadership is transferred and the result of that transfer is returned.
 *
 * @param removeConfigNodePlan RemoveConfigNodePlan
 * @return SUCCESS_STATUS when the ConfigNode can be removed; REMOVE_CONFIGNODE_FAILED when at
 *     most one ConfigNode is Running, when the ConfigNode is not registered, or when no leader
 *     is currently elected; otherwise the status produced by the leader transfer
 */
public TSStatus checkConfigNodeBeforeRemove(RemoveConfigNodePlan removeConfigNodePlan) {
  // Acquire the lock unconditionally and before entering the try block. The former
  // tryLock() ignored its result, so a thread that failed to get the lock still ran the checks
  // and then threw IllegalMonitorStateException from unlock() in the finally block.
  removeConfigNodeLock.lock();
  try {
    // Check OnlineConfigNodes number
    if (filterConfigNodeThroughStatus(NodeStatus.Running).size() <= 1) {
      return new TSStatus(TSStatusCode.REMOVE_CONFIGNODE_FAILED.getStatusCode())
          .setMessage(
              "Remove ConfigNode failed because there is only one ConfigNode in current Cluster.");
    }

    // Check whether the registeredConfigNodes contain the ConfigNode to be removed.
    if (!getRegisteredConfigNodes().contains(removeConfigNodePlan.getConfigNodeLocation())) {
      return new TSStatus(TSStatusCode.REMOVE_CONFIGNODE_FAILED.getStatusCode())
          .setMessage("Remove ConfigNode failed because the ConfigNode not in current Cluster.");
    }

    // Check whether the remove ConfigNode is leader
    TConfigNodeLocation leader = getConsensusManager().getLeader();
    if (leader == null) {
      return new TSStatus(TSStatusCode.REMOVE_CONFIGNODE_FAILED.getStatusCode())
          .setMessage(
              "Remove ConfigNode failed because the ConfigNodeGroup is on leader election, please retry.");
    }

    if (leader
        .getInternalEndPoint()
        .equals(removeConfigNodePlan.getConfigNodeLocation().getInternalEndPoint())) {
      // transfer leader
      return transferLeader(removeConfigNodePlan, getConsensusManager().getConsensusGroupId());
    }
  } finally {
    removeConfigNodeLock.unlock();
  }

  return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode())
      .setMessage("Success remove confignode.");
}
/**
 * Transfers the ConfigNode leadership away from the ConfigNode that is about to be removed.
 *
 * @param removeConfigNodePlan the plan that names the ConfigNode to be removed
 * @param groupId the consensus group whose leader is transferred
 * @return NEED_REDIRECTION pointing at the new leader when the transfer succeeds,
 *     REMOVE_CONFIGNODE_FAILED when no other Running ConfigNode is available or when the
 *     transfer itself fails
 */
private TSStatus transferLeader(
    RemoveConfigNodePlan removeConfigNodePlan, ConsensusGroupId groupId) {
  // Any Running ConfigNode other than the one being removed may take over
  Optional<TConfigNodeLocation> candidate =
      filterConfigNodeThroughStatus(NodeStatus.Running).stream()
          .filter(e -> !e.equals(removeConfigNodePlan.getConfigNodeLocation()))
          .findAny();
  if (!candidate.isPresent()) {
    // The node statuses come from the heartbeat cache and may change at any time, so report a
    // failure instead of throwing NoSuchElementException from an unchecked Optional.get()
    return new TSStatus(TSStatusCode.REMOVE_CONFIGNODE_FAILED.getStatusCode())
        .setMessage(
            "Remove ConfigNode failed because there is no other Running ConfigNode to become the leader.");
  }
  TConfigNodeLocation newLeader = candidate.get();

  ConsensusGenericResponse resp =
      getConsensusManager()
          .getConsensusImpl()
          .transferLeader(groupId, new Peer(groupId, newLeader.getConsensusEndPoint()));
  if (!resp.isSuccess()) {
    return new TSStatus(TSStatusCode.REMOVE_CONFIGNODE_FAILED.getStatusCode())
        .setMessage("Remove ConfigNode failed because transfer ConfigNode leader failed.");
  }

  // Tell the client to retry the removal against the new leader
  return new TSStatus(TSStatusCode.NEED_REDIRECTION.getStatusCode())
      .setRedirectNode(newLeader.getInternalEndPoint())
      .setMessage(
          "The ConfigNode to be removed is leader, already transfer Leader to "
              + newLeader
              + ".");
}
/**
 * Sends a MERGE request to all registered DataNodes.
 *
 * @return the list that collects the status returned by each DataNode
 */
public List<TSStatus> merge() {
  return broadcastToRegisteredDataNodes(null, DataNodeRequestType.MERGE);
}

/**
 * Sends a FLUSH request to all registered DataNodes.
 *
 * @param req the flush request forwarded to every DataNode
 * @return the list that collects the status returned by each DataNode
 */
public List<TSStatus> flush(TFlushReq req) {
  return broadcastToRegisteredDataNodes(req, DataNodeRequestType.FLUSH);
}

/**
 * Sends a CLEAR_CACHE request to all registered DataNodes.
 *
 * @return the list that collects the status returned by each DataNode
 */
public List<TSStatus> clearCache() {
  return broadcastToRegisteredDataNodes(null, DataNodeRequestType.CLEAR_CACHE);
}

/**
 * Sends a LOAD_CONFIGURATION request to all registered DataNodes.
 *
 * @return the list that collects the status returned by each DataNode
 */
public List<TSStatus> loadConfiguration() {
  return broadcastToRegisteredDataNodes(null, DataNodeRequestType.LOAD_CONFIGURATION);
}

/**
 * Sends a SET_SYSTEM_STATUS request to all registered DataNodes.
 *
 * @param status the system status forwarded to every DataNode
 * @return the list that collects the status returned by each DataNode
 */
public List<TSStatus> setSystemStatus(String status) {
  return broadcastToRegisteredDataNodes(status, DataNodeRequestType.SET_SYSTEM_STATUS);
}

/**
 * Sends one request of the given type to every registered DataNode through the
 * AsyncDataNodeClientPool. Shared implementation of the broadcast operations above.
 *
 * @param req the request payload, null for request types that carry none
 * @param requestType the type of the request to send
 * @return the synchronized list handed to the client pool for collecting the responses
 */
private List<TSStatus> broadcastToRegisteredDataNodes(
    Object req, DataNodeRequestType requestType) {
  Map<Integer, TDataNodeLocation> dataNodeLocationMap =
      configManager.getNodeManager().getRegisteredDataNodeLocations();
  // Synchronized because the list is handed to the asynchronous client pool
  // NOTE(review): presumably the pool has filled the list by the time the call returns - confirm
  List<TSStatus> dataNodeResponseStatus =
      Collections.synchronizedList(new ArrayList<>(dataNodeLocationMap.size()));
  AsyncDataNodeClientPool.getInstance()
      .sendAsyncRequestToDataNodeWithRetry(
          req, dataNodeLocationMap, requestType, dataNodeResponseStatus);
  return dataNodeResponseStatus;
}
/**
 * Start the heartbeat service. Does nothing when the service is already running; otherwise
 * schedules the heartbeat loop with a fixed delay of HEARTBEAT_INTERVAL milliseconds.
 */
public void startHeartbeatService() {
  synchronized (scheduleMonitor) {
    if (currentHeartbeatFuture != null) {
      // Already scheduled
      return;
    }
    currentHeartbeatFuture =
        ScheduledExecutorUtil.safelyScheduleWithFixedDelay(
            heartBeatExecutor,
            this::heartbeatLoopBody,
            0,
            HEARTBEAT_INTERVAL,
            TimeUnit.MILLISECONDS);
    LOGGER.info("Heartbeat service is started successfully.");
  }
}
/**
 * Loop body of the heartbeat thread. When this ConfigNode is the leader, one heartbeat request
 * is generated and sent to all registered DataNodes and ConfigNodes; otherwise nothing happens.
 */
private void heartbeatLoopBody() {
  // The consensusManager of configManager may not be fully initialized at this time.
  // Read it once: the former lambda ignored its null-checked argument and fetched the manager
  // again, so the null guard did not cover the instance that was actually used.
  final ConsensusManager consensusManager = getConsensusManager();
  if (consensusManager == null || !consensusManager.isLeader()) {
    return;
  }

  // Generate HeartbeatReq
  THeartbeatReq heartbeatReq = genHeartbeatReq();
  // Send heartbeat requests to all the registered DataNodes
  pingRegisteredDataNodes(heartbeatReq, getRegisteredDataNodes());
  // Send heartbeat requests to all the registered ConfigNodes
  pingRegisteredConfigNodes(heartbeatReq, getRegisteredConfigNodes());
}
/**
 * Generates the heartbeat request of the current loop and advances the heartbeat counter.
 *
 * @return a request stamped with the current time; needJudgeLeader is set in every 5th loop and
 *     needSamplingLoad in every 10th loop
 */
private THeartbeatReq genHeartbeatReq() {
  // Fetch the current tick and advance the counter (0..9) in one atomic step, so that both
  // flags are derived from the same value instead of from separate reads
  final int tick = heartbeatCounter.getAndUpdate(x -> (x + 1) % 10);

  /* Generate heartbeat request */
  THeartbeatReq heartbeatReq = new THeartbeatReq();
  heartbeatReq.setHeartbeatTimestamp(System.currentTimeMillis());
  // We update RegionGroups' leadership in every 5 heartbeat loops
  heartbeatReq.setNeedJudgeLeader(tick % 5 == 0);
  // We sample DataNode's load in every 10 heartbeat loops
  heartbeatReq.setNeedSamplingLoad(tick % 10 == 0);
  return heartbeatReq;
}
/**
 * Send heartbeat requests to all the Registered DataNodes. A heartbeat cache is created for each
 * DataNode that does not have one yet.
 *
 * @param heartbeatReq the heartbeat request sent to every DataNode
 * @param registeredDataNodes DataNodes that registered in cluster
 */
private void pingRegisteredDataNodes(
    THeartbeatReq heartbeatReq, List<TDataNodeConfiguration> registeredDataNodes) {
  for (TDataNodeConfiguration target : registeredDataNodes) {
    final TDataNodeLocation location = target.getLocation();
    // Reuse the DataNode's cache, creating it on the first heartbeat
    final DataNodeHeartbeatCache cache =
        (DataNodeHeartbeatCache)
            nodeCacheMap.computeIfAbsent(
                location.getDataNodeId(), absent -> new DataNodeHeartbeatCache());
    final DataNodeHeartbeatHandler responseHandler =
        new DataNodeHeartbeatHandler(
            location, cache, getPartitionManager().getRegionGroupCacheMap());
    AsyncDataNodeHeartbeatClientPool.getInstance()
        .getDataNodeHeartBeat(location.getInternalEndPoint(), heartbeatReq, responseHandler);
  }
}
/**
 * Send heartbeat requests to all the Registered ConfigNodes. The current ConfigNode does not
 * ping itself; it only makes sure that its own heartbeat cache exists.
 *
 * @param heartbeatReq the heartbeat request whose timestamp is sent to every ConfigNode
 * @param registeredConfigNodes ConfigNodes that registered in cluster
 */
private void pingRegisteredConfigNodes(
    THeartbeatReq heartbeatReq, List<TConfigNodeLocation> registeredConfigNodes) {
  for (TConfigNodeLocation target : registeredConfigNodes) {
    final TEndPoint endPoint = target.getInternalEndPoint();
    if (endPoint.equals(CURRENT_NODE)) {
      // Skip itself
      nodeCacheMap.putIfAbsent(target.getConfigNodeId(), new ConfigNodeHeartbeatCache(target));
    } else {
      // Reuse the ConfigNode's cache, creating it on the first heartbeat
      final ConfigNodeHeartbeatCache cache =
          (ConfigNodeHeartbeatCache)
              nodeCacheMap.computeIfAbsent(
                  target.getConfigNodeId(), absent -> new ConfigNodeHeartbeatCache(target));
      final ConfigNodeHeartbeatHandler responseHandler = new ConfigNodeHeartbeatHandler(cache);
      AsyncConfigNodeHeartbeatClientPool.getInstance()
          .getConfigNodeHeartBeat(
              endPoint, heartbeatReq.getHeartbeatTimestamp(), responseHandler);
    }
  }
}
/**
 * Stop the heartbeat service. Does nothing when the service is not running; otherwise cancels
 * the scheduled heartbeat task without interrupting it and clears all cached node statuses.
 */
public void stopHeartbeatService() {
  synchronized (scheduleMonitor) {
    if (currentHeartbeatFuture == null) {
      // Not running
      return;
    }
    currentHeartbeatFuture.cancel(false);
    currentHeartbeatFuture = null;
    nodeCacheMap.clear();
    LOGGER.info("Heartbeat service is stopped successfully.");
  }
}
/**
 * Returns the heartbeat cache map itself, not a copy.
 *
 * @return Map<NodeId, BaseNodeCache> of the nodes that are tracked by the heartbeat service
 */
public Map<Integer, BaseNodeCache> getNodeCacheMap() {
return nodeCacheMap;
}
/**
 * Removes the heartbeat cache of the specific Node, if there is one.
 *
 * @param nodeId The specific Node's index
 */
public void removeNodeCache(int nodeId) {
nodeCacheMap.remove(nodeId);
}
/**
 * Safely get the specific Node's current status, together with its reason, for showing cluster
 *
 * @param nodeId The specific Node's index
 * @return The status reported by the Node's cache if the nodeCacheMap contains it, Unknown
 *     followed by "(NoHeartbeat)" otherwise
 */
private String getNodeStatusWithReason(int nodeId) {
  final BaseNodeCache cached = nodeCacheMap.get(nodeId);
  if (cached != null) {
    return cached.getNodeStatusWithReason();
  }
  // No heartbeat has been recorded for this node yet
  return NodeStatus.Unknown.getStatus() + "(NoHeartbeat)";
}
/**
 * Filter the registered ConfigNodes through the specific NodeStatus
 *
 * <p>ConfigNodes without a heartbeat cache are never included.
 *
 * @param status The accepted NodeStatus values
 * @return Registered ConfigNodes whose cached status equals one of the given status
 */
public List<TConfigNodeLocation> filterConfigNodeThroughStatus(NodeStatus... status) {
  return getRegisteredConfigNodes().stream()
      .filter(
          registeredConfigNode -> {
            // Look the cache up once: the map is modified concurrently (removeNodeCache,
            // stopHeartbeatService), so containsKey() followed by get() could return null
            final BaseNodeCache nodeCache =
                nodeCacheMap.get(registeredConfigNode.getConfigNodeId());
            return nodeCache != null
                && Arrays.stream(status).anyMatch(s -> s.equals(nodeCache.getNodeStatus()));
          })
      .collect(Collectors.toList());
}
/**
 * Filter the registered DataNodes through the specific NodeStatus
 *
 * <p>DataNodes without a heartbeat cache are never included.
 *
 * @param status The accepted NodeStatus values
 * @return Registered DataNodes whose cached status equals one of the given status
 */
public List<TDataNodeConfiguration> filterDataNodeThroughStatus(NodeStatus... status) {
  return getRegisteredDataNodes().stream()
      .filter(
          registeredDataNode -> {
            // Look the cache up once: the map is modified concurrently (removeNodeCache,
            // stopHeartbeatService), so containsKey() followed by get() could return null
            final BaseNodeCache nodeCache =
                nodeCacheMap.get(registeredDataNode.getLocation().getDataNodeId());
            return nodeCache != null
                && Arrays.stream(status).anyMatch(s -> s.equals(nodeCache.getNodeStatus()));
          })
      .collect(Collectors.toList());
}
/**
 * Get the loadScore of each Node that currently has a heartbeat cache. The node cache holds
 * ConfigNode caches as well as DataNode caches, so both kinds appear in the result.
 *
 * @return Map<NodeId, loadScore>
 */
public Map<Integer, Long> getAllLoadScores() {
  final Map<Integer, Long> scoresById = new ConcurrentHashMap<>();
  for (Map.Entry<Integer, BaseNodeCache> cached : nodeCacheMap.entrySet()) {
    scoresById.put(cached.getKey(), cached.getValue().getLoadScore());
  }
  return scoresById;
}
/**
 * Checks whether the specific DataNode is cached with the Removing status.
 *
 * @param dataNodeId The specific DataNode's index
 * @return true if the DataNode has a heartbeat cache whose status is Removing, false otherwise
 */
public boolean isNodeRemoving(int dataNodeId) {
  final DataNodeHeartbeatCache cached =
      (DataNodeHeartbeatCache) configManager.getNodeManager().getNodeCacheMap().get(dataNodeId);
  return cached != null && NodeStatus.Removing.equals(cached.getNodeStatus());
}
/**
 * Marks the DataNode as Removing: first in its heartbeat cache, if it has one, and then on the
 * DataNode itself through a synchronous SET_SYSTEM_STATUS request.
 *
 * @param dataNodeLocation the DataNode that is going to be removed
 */
public void setNodeRemovingStatus(TDataNodeLocation dataNodeLocation) {
  final BaseNodeCache cached =
      configManager.getNodeManager().getNodeCacheMap().get(dataNodeLocation.getDataNodeId());
  if (cached != null) {
    ((DataNodeHeartbeatCache) cached).setRemoving();
  }

  // The DataNode is notified even when no cache exists for it
  SyncDataNodeClientPool.getInstance()
      .sendSyncRequestToDataNodeWithRetry(
          dataNodeLocation.getInternalEndPoint(),
          NodeStatus.Removing.getStatus(),
          DataNodeRequestType.SET_SYSTEM_STATUS);
}
/**
 * Delegates to NodeInfo.
 *
 * @return All registered ConfigNodes
 */
public List<TConfigNodeLocation> getRegisteredConfigNodes() {
return nodeInfo.getRegisteredConfigNodes();
}
/**
 * Fetches the ConsensusManager from the configManager on every call. heartbeatLoopBody guards
 * against a null result, so callers should not assume it is always available.
 */
private ConsensusManager getConsensusManager() {
return configManager.getConsensusManager();
}
/** Fetches the ClusterSchemaManager from the configManager on every call. */
private ClusterSchemaManager getClusterSchemaManager() {
return configManager.getClusterSchemaManager();
}
/** Fetches the PartitionManager from the configManager on every call. */
private PartitionManager getPartitionManager() {
return configManager.getPartitionManager();
}
}