/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdds.scm.node;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState;
import org.apache.hadoop.hdds.scm.HddsServerUtil;
import org.apache.hadoop.hdds.scm.container.ContainerID;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
import org.apache.hadoop.hdds.scm.events.SCMEvents;
import org.apache.hadoop.hdds.scm.node.states.Node2PipelineMap;
import org.apache.hadoop.hdds.scm.node.states.NodeAlreadyExistsException;
import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException;
import org.apache.hadoop.hdds.scm.node.states.NodeStateMap;
import org.apache.hadoop.hdds.server.events.Event;
import org.apache.hadoop.hdds.server.events.EventPublisher;
import org.apache.hadoop.ozone.common.statemachine
.InvalidStateTransitionException;
import org.apache.hadoop.ozone.common.statemachine.StateMachine;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.util.concurrent.HadoopExecutors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.Closeable;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys
.OZONE_SCM_DEADNODE_INTERVAL;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys
.OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys
.OZONE_SCM_STALENODE_INTERVAL;
/**
* NodeStateManager maintains the state of all the datanodes in the cluster.
* All node state changes should happen only via NodeStateManager. It also
* runs a heartbeat thread which periodically updates the node state.
* <p>
* The getNode(byState) functions make a copy of the node map and then create
* a list based on that copy. It should be assumed that these get functions
* always report *stale* information. For example, getting the deadNodeCount
* followed by getNodes(DEAD) could very well produce a totally different
* count. Similarly, getNodeCount(HEALTHY) + getNodeCount(DEAD) +
* getNodeCount(STALE) is not guaranteed to add up to the total number of
* nodes that we know of. Please treat all get functions in this file as a
* snapshot of information that is inconsistent as soon as you read it.
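* <p>
* A minimal sketch of why callers must treat these reads as a snapshot
* (illustrative only, not taken from a real test):
* <pre>{@code
*   int dead = nodeStateManager.getDeadNodeCount();
*   // The heartbeat thread may move nodes between states here, so:
*   List<DatanodeInfo> deadNodes = nodeStateManager.getNodes(NodeState.DEAD);
*   // dead == deadNodes.size() is NOT guaranteed to hold.
* }</pre>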
*/
public class NodeStateManager implements Runnable, Closeable {
/**
* Node's life cycle events.
*/
private enum NodeLifeCycleEvent {
TIMEOUT, RESTORE, RESURRECT, DECOMMISSION, DECOMMISSIONED
}
private static final Logger LOG = LoggerFactory
.getLogger(NodeStateManager.class);
/**
* StateMachine for node lifecycle.
*/
private final StateMachine<NodeState, NodeLifeCycleEvent> stateMachine;
/**
* This is the map which maintains the current state of all datanodes.
*/
private final NodeStateMap nodeStateMap;
/**
* Maintains the mapping from node to pipelines a node is part of.
*/
private final Node2PipelineMap node2PipelineMap;
/**
* Used for publishing node state change events.
*/
private final EventPublisher eventPublisher;
/**
* Maps the event to be triggered when a node state is updated.
*/
private final Map<NodeState, Event<DatanodeDetails>> state2EventMap;
/**
* ExecutorService used for scheduling heartbeat processing thread.
*/
private final ScheduledExecutorService executorService;
/**
* The frequency at which the heartbeat processing thread runs.
*/
private final long heartbeatCheckerIntervalMs;
/**
* The timeout value which will be used for marking a datanode as stale.
*/
private final long staleNodeIntervalMs;
/**
* The timeout value which will be used for marking a datanode as dead.
*/
private final long deadNodeIntervalMs;
/**
* The future is used to pause/unpause the scheduled checks.
*/
private ScheduledFuture<?> healthCheckFuture;
/**
* Test utility - tracks whether the health check has been paused (unit tests).
*/
private boolean checkPaused;
/**
* Timestamp of the last run of the heartbeat check.
*/
private long lastHealthCheck;
/**
* Number of times the heartbeat check was skipped.
*/
private long skippedHealthChecks;
/**
* Constructs a NodeStateManager instance with the given configuration.
*
* @param conf Configuration
* @param eventPublisher EventPublisher used to fire node state change events
*/
public NodeStateManager(Configuration conf, EventPublisher eventPublisher) {
this.nodeStateMap = new NodeStateMap();
this.node2PipelineMap = new Node2PipelineMap();
this.eventPublisher = eventPublisher;
this.state2EventMap = new HashMap<>();
initialiseState2EventMap();
Set<NodeState> finalStates = new HashSet<>();
finalStates.add(NodeState.DECOMMISSIONED);
this.stateMachine = new StateMachine<>(NodeState.HEALTHY, finalStates);
initializeStateMachine();
heartbeatCheckerIntervalMs = HddsServerUtil
.getScmheartbeatCheckerInterval(conf);
staleNodeIntervalMs = HddsServerUtil.getStaleNodeInterval(conf);
deadNodeIntervalMs = HddsServerUtil.getDeadNodeInterval(conf);
Preconditions.checkState(heartbeatCheckerIntervalMs > 0,
OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL + " should be greater than 0.");
Preconditions.checkState(staleNodeIntervalMs < deadNodeIntervalMs,
OZONE_SCM_STALENODE_INTERVAL + " should be less than " +
OZONE_SCM_DEADNODE_INTERVAL);
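// Illustrative example (hypothetical values, not necessarily the shipped
// defaults): a 3 second heartbeat check interval with a 90 second
// stale-node interval and a 10 minute dead-node interval satisfies both
// preconditions above.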
executorService = HadoopExecutors.newScheduledThreadPool(1,
new ThreadFactoryBuilder().setDaemon(true)
.setNameFormat("SCM Heartbeat Processing Thread - %d").build());
skippedHealthChecks = 0;
checkPaused = false; // accessed only from test functions
scheduleNextHealthCheck();
}
/**
* Populates state2event map.
*/
private void initialiseState2EventMap() {
state2EventMap.put(NodeState.STALE, SCMEvents.STALE_NODE);
state2EventMap.put(NodeState.DEAD, SCMEvents.DEAD_NODE);
state2EventMap
.put(NodeState.HEALTHY, SCMEvents.NON_HEALTHY_TO_HEALTHY_NODE);
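// Note: state2EventMap intentionally has no entries for DECOMMISSIONING
// or DECOMMISSIONED, so transitions into those states fire no event in
// updateNodeState.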
}
/*
*
* Node and State Transition Mapping:
*
* State: HEALTHY          -------------------> STALE
* Event:                        TIMEOUT
*
* State: STALE            -------------------> DEAD
* Event:                        TIMEOUT
*
* State: STALE            -------------------> HEALTHY
* Event:                        RESTORE
*
* State: DEAD             -------------------> HEALTHY
* Event:                       RESURRECT
*
* State: HEALTHY          -------------------> DECOMMISSIONING
* Event:                      DECOMMISSION
*
* State: STALE            -------------------> DECOMMISSIONING
* Event:                      DECOMMISSION
*
* State: DEAD             -------------------> DECOMMISSIONING
* Event:                      DECOMMISSION
*
* State: DECOMMISSIONING  -------------------> DECOMMISSIONED
* Event:                      DECOMMISSIONED
*
* Node State Flow
*
*  +-----------------------------------------------------------+
*  |                       (RESURRECT)                          |
*  |   +---------------------------+                            |
*  |   |         (RESTORE)         |                            |
*  |   |                           |                            |
*  V   V                           |                            |
* [HEALTHY]------------------->[STALE]------------------->[DEAD]
*     |          (TIMEOUT)         |        (TIMEOUT)          |
*     |                            |                           |
*     | (DECOMMISSION)             | (DECOMMISSION)            | (DECOMMISSION)
*     |                            V                           |
*     +------------------->[DECOMMISSIONING]<-----------------+
*                                  |
*                                  | (DECOMMISSIONED)
*                                  |
*                                  V
*                         [DECOMMISSIONED]
*
*/
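/*
* A minimal sketch of how these transitions are exercised through the
* StateMachine API (illustrative only; getNextState is the same call made
* by updateNodeState below):
*
*   // HEALTHY --(TIMEOUT)--> STALE
*   NodeState next = stateMachine.getNextState(
*       NodeState.HEALTHY, NodeLifeCycleEvent.TIMEOUT);
*   // next == NodeState.STALE
*
*   // An undefined transition, e.g. any event on DECOMMISSIONED (a final
*   // state), throws InvalidStateTransitionException.
*/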
/**
* Initializes the lifecycle of node state machine.
*/
private void initializeStateMachine() {
stateMachine.addTransition(
NodeState.HEALTHY, NodeState.STALE, NodeLifeCycleEvent.TIMEOUT);
stateMachine.addTransition(
NodeState.STALE, NodeState.DEAD, NodeLifeCycleEvent.TIMEOUT);
stateMachine.addTransition(
NodeState.STALE, NodeState.HEALTHY, NodeLifeCycleEvent.RESTORE);
stateMachine.addTransition(
NodeState.DEAD, NodeState.HEALTHY, NodeLifeCycleEvent.RESURRECT);
stateMachine.addTransition(
NodeState.HEALTHY, NodeState.DECOMMISSIONING,
NodeLifeCycleEvent.DECOMMISSION);
stateMachine.addTransition(
NodeState.STALE, NodeState.DECOMMISSIONING,
NodeLifeCycleEvent.DECOMMISSION);
stateMachine.addTransition(
NodeState.DEAD, NodeState.DECOMMISSIONING,
NodeLifeCycleEvent.DECOMMISSION);
stateMachine.addTransition(
NodeState.DECOMMISSIONING, NodeState.DECOMMISSIONED,
NodeLifeCycleEvent.DECOMMISSIONED);
}
/**
* Adds a new node to the state manager.
*
* @param datanodeDetails DatanodeDetails
*
* @throws NodeAlreadyExistsException if the node is already present
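* <p>
* Illustrative call site (registration normally flows through SCM's node
* manager when a datanode registers; sketch only):
* <pre>{@code
*   nodeStateManager.addNode(datanodeDetails);
*   // The node enters the state machine in its initial state, HEALTHY,
*   // and a NEW_NODE event is fired.
* }</pre>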
*/
public void addNode(DatanodeDetails datanodeDetails)
throws NodeAlreadyExistsException {
nodeStateMap.addNode(datanodeDetails, stateMachine.getInitialState());
eventPublisher.fireEvent(SCMEvents.NEW_NODE, datanodeDetails);
}
/**
* Adds a pipeline to the node2PipelineMap.
* @param pipeline - Pipeline to be added
*/
public void addPipeline(Pipeline pipeline) {
node2PipelineMap.addPipeline(pipeline);
}
/**
* Get information about the node.
*
* @param datanodeDetails DatanodeDetails
*
* @return DatanodeInfo
*
* @throws NodeNotFoundException if the node is not present
*/
public DatanodeInfo getNode(DatanodeDetails datanodeDetails)
throws NodeNotFoundException {
return nodeStateMap.getNodeInfo(datanodeDetails.getUuid());
}
/**
* Updates the last heartbeat time of the node.
*
* @param datanodeDetails DatanodeDetails of the node
*
* @throws NodeNotFoundException if the node is not present
*/
public void updateLastHeartbeatTime(DatanodeDetails datanodeDetails)
throws NodeNotFoundException {
nodeStateMap.getNodeInfo(datanodeDetails.getUuid())
.updateLastHeartbeatTime();
}
/**
* Returns the current state of the node.
*
* @param datanodeDetails DatanodeDetails
*
* @return NodeState
*
* @throws NodeNotFoundException if the node is not present
*/
public NodeState getNodeState(DatanodeDetails datanodeDetails)
throws NodeNotFoundException {
return nodeStateMap.getNodeState(datanodeDetails.getUuid());
}
/**
* Returns all the nodes which are in healthy state.
*
* @return list of healthy nodes
*/
public List<DatanodeInfo> getHealthyNodes() {
return getNodes(NodeState.HEALTHY);
}
/**
* Returns all the nodes which are in stale state.
*
* @return list of stale nodes
*/
public List<DatanodeInfo> getStaleNodes() {
return getNodes(NodeState.STALE);
}
/**
* Returns all the nodes which are in dead state.
*
* @return list of dead nodes
*/
public List<DatanodeInfo> getDeadNodes() {
return getNodes(NodeState.DEAD);
}
/**
* Returns all the nodes which are in the specified state.
*
* @param state NodeState
*
* @return list of nodes
*/
public List<DatanodeInfo> getNodes(NodeState state) {
List<DatanodeInfo> nodes = new ArrayList<>();
nodeStateMap.getNodes(state).forEach(
uuid -> {
try {
nodes.add(nodeStateMap.getNodeInfo(uuid));
} catch (NodeNotFoundException e) {
// This should not happen unless someone else other than
// NodeStateManager is directly modifying NodeStateMap and removed
// the node entry after we got the list of UUIDs.
LOG.error("Inconsistent NodeStateMap! " + nodeStateMap);
}
});
return nodes;
}
/**
* Returns all the nodes which have registered to NodeStateManager.
*
* @return all the managed nodes
*/
public List<DatanodeInfo> getAllNodes() {
List<DatanodeInfo> nodes = new ArrayList<>();
nodeStateMap.getAllNodes().forEach(
uuid -> {
try {
nodes.add(nodeStateMap.getNodeInfo(uuid));
} catch (NodeNotFoundException e) {
// This should not happen unless someone else other than
// NodeStateManager is directly modifying NodeStateMap and removed
// the node entry after we got the list of UUIDs.
LOG.error("Inconsistent NodeStateMap! " + nodeStateMap);
}
});
return nodes;
}
/**
* Gets set of pipelineID a datanode belongs to.
* @param dnId - Datanode ID
* @return Set of PipelineID
*/
public Set<PipelineID> getPipelineByDnID(UUID dnId) {
return node2PipelineMap.getPipelines(dnId);
}
/**
* Returns the count of healthy nodes.
*
* @return healthy node count
*/
public int getHealthyNodeCount() {
return getNodeCount(NodeState.HEALTHY);
}
/**
* Returns the count of stale nodes.
*
* @return stale node count
*/
public int getStaleNodeCount() {
return getNodeCount(NodeState.STALE);
}
/**
* Returns the count of dead nodes.
*
* @return dead node count
*/
public int getDeadNodeCount() {
return getNodeCount(NodeState.DEAD);
}
/**
* Returns the count of nodes in specified state.
*
* @param state NodeState
*
* @return node count
*/
public int getNodeCount(NodeState state) {
return nodeStateMap.getNodeCount(state);
}
/**
* Returns the count of all nodes managed by NodeStateManager.
*
* @return node count
*/
public int getTotalNodeCount() {
return nodeStateMap.getTotalNodeCount();
}
/**
* Removes a pipeline from the node2PipelineMap.
* @param pipeline - Pipeline to be removed
*/
public void removePipeline(Pipeline pipeline) {
node2PipelineMap.removePipeline(pipeline);
}
/**
* Adds the given container to the specified datanode.
*
* @param uuid - datanode uuid
* @param containerId - containerID
* @throws NodeNotFoundException - if datanode is not known. For new datanode
* use addDatanodeInContainerMap call.
*/
public void addContainer(final UUID uuid,
final ContainerID containerId)
throws NodeNotFoundException {
nodeStateMap.addContainer(uuid, containerId);
}
/**
* Update set of containers available on a datanode.
* @param uuid - DatanodeID
* @param containerIds - Set of containerIDs
* @throws NodeNotFoundException - if datanode is not known.
*/
public void setContainers(UUID uuid, Set<ContainerID> containerIds)
throws NodeNotFoundException {
nodeStateMap.setContainers(uuid, containerIds);
}
/**
* Returns the set of containerIDs available on a datanode.
* @param uuid - DatanodeID
* @return - set of containerIDs
* @throws NodeNotFoundException - if datanode is not known.
*/
public Set<ContainerID> getContainers(UUID uuid)
throws NodeNotFoundException {
return nodeStateMap.getContainers(uuid);
}
/**
* Moves stale or dead nodes back to healthy if we have received a
* heartbeat from them. Moves healthy nodes to stale and stale nodes to
* dead as needed.
*
* @see Thread#run()
*/
@Override
public void run() {
if (shouldSkipCheck()) {
skippedHealthChecks++;
LOG.info("Detected long delay in scheduling HB processing thread. "
+ "Skipping heartbeat checks for one iteration.");
} else {
checkNodesHealth();
}
// We purposefully make this non-deterministic. Instead of using
// scheduleAtFixedRate we just go to sleep and wake up at the next
// rendezvous point, which is currentTime + heartbeatCheckerIntervalMs.
// As a result we are heartbeat-checking not at a fixed cadence, but at
// clock tick + time taken to do the work.
//
// This time taken to work can skew the heartbeat processor thread.
// We don't care, for the following reasons:
//
// 1. checkerInterval is generally many magnitudes faster than the
// datanode HB frequency.
//
// 2. If we have too many nodes, the SCM would be doing only HB
// processing, which could lead to SCM CPU starvation. With this
// approach we always guarantee that the HB thread sleeps for a little
// while.
//
// 3. It is possible that we will never finish processing the HBs in the
// thread. But that means we have a mis-configured system. We will warn
// the users by logging that information.
//
// 4. And the most important reason: heartbeats are not blocked even if
// this thread does not run; they will go into the processing queue.
scheduleNextHealthCheck();
}
private void checkNodesHealth() {
/*
*
*         staleNodeDeadline                healthyNodeDeadline
*                 |                                  |
*   Dead          |            Stale                 |   Healthy
*   Node          |            Node                  |   Node
*   Window        |            Window                |   Window
* ----------------+----------------------------------+------------------->
*                       >>-->> time-line >>-->>
*
* Here is the logic of computing the health of a node.
*
* 1. We get the current time and look back to the time when we last got
*    a heartbeat from the node.
*
* 2. If the last heartbeat was within the healthy node window, we mark
*    the node as healthy.
*
* 3. If the last HB timestamp is older and falls within the stale node
*    window, we mark the node as stale.
*
* 4. If the last HB time is older than the stale window, the node is
*    marked as dead.
*
* The processing starts from the current time and looks backwards in
* time.
*/
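// Worked example (hypothetical intervals): with staleNodeIntervalMs =
// 90000 and deadNodeIntervalMs = 600000, at processingStartTime T a node
// whose last heartbeat is newer than T - 90s is healthy, one whose last
// heartbeat falls between T - 600s and T - 90s is stale, and anything
// older than T - 600s is dead.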
long processingStartTime = Time.monotonicNow();
// Nodes whose last heartbeat is older than this time are considered stale.
long healthyNodeDeadline = processingStartTime - staleNodeIntervalMs;
// Nodes whose last heartbeat is older than this time are considered dead.
long staleNodeDeadline = processingStartTime - deadNodeIntervalMs;
Predicate<Long> healthyNodeCondition =
(lastHbTime) -> lastHbTime >= healthyNodeDeadline;
// staleNodeCondition is a superset: it matches stale as well as dead nodes.
Predicate<Long> staleNodeCondition =
(lastHbTime) -> lastHbTime < healthyNodeDeadline;
Predicate<Long> deadNodeCondition =
(lastHbTime) -> lastHbTime < staleNodeDeadline;
try {
for (NodeState state : NodeState.values()) {
List<UUID> nodes = nodeStateMap.getNodes(state);
for (UUID id : nodes) {
DatanodeInfo node = nodeStateMap.getNodeInfo(id);
switch (state) {
case HEALTHY:
// Move the node to STALE if the last heartbeat is older than the
// configured stale-node interval.
updateNodeState(node, staleNodeCondition, state,
NodeLifeCycleEvent.TIMEOUT);
break;
case STALE:
// Move the node to DEAD if the last heartbeat is older than the
// configured dead-node interval.
updateNodeState(node, deadNodeCondition, state,
NodeLifeCycleEvent.TIMEOUT);
// Restore the node if we have received a heartbeat within the
// configured stale-node interval.
updateNodeState(node, healthyNodeCondition, state,
NodeLifeCycleEvent.RESTORE);
break;
case DEAD:
// Resurrect the node if we have received a heartbeat within the
// configured stale-node interval.
updateNodeState(node, healthyNodeCondition, state,
NodeLifeCycleEvent.RESURRECT);
break;
// We don't do anything for DECOMMISSIONING and DECOMMISSIONED in
// heartbeat processing.
case DECOMMISSIONING:
case DECOMMISSIONED:
default:
}
}
}
} catch (NodeNotFoundException e) {
// This should not happen unless someone else other than
// NodeStateManager is directly modifying NodeStateMap and removed
// the node entry after we got the list of UUIDs.
LOG.error("Inconsistent NodeStateMap! " + nodeStateMap);
}
long processingEndTime = Time.monotonicNow();
// If we have taken too much time for HB processing, log that information.
if ((processingEndTime - processingStartTime) >
heartbeatCheckerIntervalMs) {
LOG.error("Total time spent processing datanode HBs is greater than " +
"the configured datanode heartbeat interval. Please adjust the" +
" heartbeat configs. Time spent on HB processing: {} seconds." +
" Datanode heartbeat interval: {} seconds.",
TimeUnit.MILLISECONDS
.toSeconds(processingEndTime - processingStartTime),
TimeUnit.MILLISECONDS.toSeconds(heartbeatCheckerIntervalMs));
}
}
private void scheduleNextHealthCheck() {
if (!Thread.currentThread().isInterrupted() &&
!executorService.isShutdown()) {
//BUGBUG: The returned future needs to be checked here to make sure the
// exceptions are handled correctly.
healthCheckFuture = executorService.schedule(this,
heartbeatCheckerIntervalMs, TimeUnit.MILLISECONDS);
} else {
LOG.warn("Current Thread is interrupted, shutting down HB processing " +
"thread for Node Manager.");
}
lastHealthCheck = Time.monotonicNow();
}
/**
* If the time since the last check exceeds the smaller of the stale and
* dead node intervals, skip the check. Such long delays might be caused
* by a JVM pause; SCM cannot make reliable conclusions about datanode
* health in such situations.
*
* @return true if heartbeat checks should be skipped for this iteration
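* <p>
* Worked example (hypothetical values): with staleNodeIntervalMs = 90000
* and deadNodeIntervalMs = 600000, minInterval is 90000 ms; if more than
* 90 seconds have elapsed since lastHealthCheck (for instance after a
* long GC pause), the next check is skipped.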
*/
private boolean shouldSkipCheck() {
long currentTime = Time.monotonicNow();
long minInterval = Math.min(staleNodeIntervalMs, deadNodeIntervalMs);
return ((currentTime - lastHealthCheck) >= minInterval);
}
/**
* Updates the node state if the condition is satisfied.
*
* @param node DatanodeInfo
* @param condition condition to check
* @param state current state of node
* @param lifeCycleEvent NodeLifeCycleEvent to be applied if condition
* matches
*
* @throws NodeNotFoundException if the node is not present
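* <p>
* Illustrative invocation (hypothetical timing): for a HEALTHY node whose
* last heartbeat is older than the stale deadline,
* {@code updateNodeState(node, staleNodeCondition, NodeState.HEALTHY,
* NodeLifeCycleEvent.TIMEOUT)} moves the node to STALE in nodeStateMap and
* fires SCMEvents.STALE_NODE through the eventPublisher.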
*/
private void updateNodeState(DatanodeInfo node, Predicate<Long> condition,
NodeState state, NodeLifeCycleEvent lifeCycleEvent)
throws NodeNotFoundException {
try {
if (condition.test(node.getLastHeartbeatTime())) {
NodeState newState = stateMachine.getNextState(state, lifeCycleEvent);
nodeStateMap.updateNodeState(node.getUuid(), state, newState);
if (state2EventMap.containsKey(newState)) {
eventPublisher.fireEvent(state2EventMap.get(newState), node);
}
}
} catch (InvalidStateTransitionException e) {
LOG.warn("Invalid state transition of node {}." +
" Current state: {}, life cycle event: {}",
node, state, lifeCycleEvent);
}
}
@Override
public void close() {
executorService.shutdown();
try {
if (!executorService.awaitTermination(5, TimeUnit.SECONDS)) {
executorService.shutdownNow();
}
if (!executorService.awaitTermination(5, TimeUnit.SECONDS)) {
LOG.error("Unable to shutdown NodeStateManager properly.");
}
} catch (InterruptedException e) {
executorService.shutdownNow();
Thread.currentThread().interrupt();
}
}
/**
* Test utility: returns the number of times the heartbeat check was skipped.
* @return count of times the HB check was skipped
*/
@VisibleForTesting
long getSkippedHealthChecks() {
return skippedHealthChecks;
}
/**
* Test utility: pauses the periodic node HB check.
* @return ScheduledFuture for the scheduled check that got cancelled, or
* null if the executor is shut down or the check is already paused.
*/
@VisibleForTesting
ScheduledFuture<?> pause() {
if (executorService.isShutdown() || checkPaused) {
return null;
}
checkPaused = healthCheckFuture.cancel(false);
return healthCheckFuture;
}
/**
* Test utility: unpauses the periodic node HB check.
* @return ScheduledFuture for the next scheduled check, or null if the
* executor is shut down.
*/
@VisibleForTesting
ScheduledFuture<?> unpause() {
if (executorService.isShutdown()) {
return null;
}
if (checkPaused) {
Preconditions.checkState(((healthCheckFuture == null)
|| healthCheckFuture.isCancelled()
|| healthCheckFuture.isDone()));
checkPaused = false;
/*
* We do not call scheduleNextHealthCheck because we are
* not updating the lastHealthCheck timestamp.
*/
healthCheckFuture = executorService.schedule(this,
heartbeatCheckerIntervalMs, TimeUnit.MILLISECONDS);
}
return healthCheckFuture;
}
}