| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.hadoop.hdds.scm.node; |
| |
| import java.io.Closeable; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Set; |
| import java.util.UUID; |
| import java.util.concurrent.ScheduledExecutorService; |
| import java.util.concurrent.ScheduledFuture; |
| import java.util.concurrent.TimeUnit; |
| import java.util.function.Predicate; |
| |
| import org.apache.hadoop.hdds.conf.ConfigurationSource; |
| import org.apache.hadoop.hdds.protocol.DatanodeDetails; |
| import org.apache.hadoop.hdds.protocol.proto.HddsProtos; |
| import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState; |
| import org.apache.hadoop.hdds.protocol.proto |
| .StorageContainerDatanodeProtocolProtos.LayoutVersionProto; |
| import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState; |
| import org.apache.hadoop.hdds.scm.container.ContainerID; |
| import org.apache.hadoop.hdds.scm.events.SCMEvents; |
| import org.apache.hadoop.hdds.scm.ha.SCMContext; |
| import org.apache.hadoop.hdds.scm.node.states.Node2PipelineMap; |
| import org.apache.hadoop.hdds.scm.node.states.NodeAlreadyExistsException; |
| import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException; |
| import org.apache.hadoop.hdds.scm.node.states.NodeStateMap; |
| import org.apache.hadoop.hdds.scm.pipeline.Pipeline; |
| import org.apache.hadoop.hdds.scm.pipeline.PipelineID; |
| import org.apache.hadoop.hdds.scm.server.upgrade.FinalizationCheckpoint; |
| import org.apache.hadoop.hdds.server.events.Event; |
| import org.apache.hadoop.hdds.server.events.EventPublisher; |
| import org.apache.hadoop.hdds.utils.HddsServerUtil; |
| import org.apache.hadoop.ozone.common.statemachine.InvalidStateTransitionException; |
| import org.apache.hadoop.ozone.common.statemachine.StateMachine; |
| import org.apache.hadoop.ozone.upgrade.LayoutVersionManager; |
| import org.apache.hadoop.util.Time; |
| import org.apache.hadoop.util.concurrent.HadoopExecutors; |
| |
| import com.google.common.annotations.VisibleForTesting; |
| import com.google.common.base.Preconditions; |
| import com.google.common.util.concurrent.ThreadFactoryBuilder; |
| |
| import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState.DEAD; |
| import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState.HEALTHY; |
| import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState.HEALTHY_READONLY; |
| import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState.STALE; |
| import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_DEADNODE_INTERVAL; |
| import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL; |
| import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_STALENODE_INTERVAL; |
| import static org.apache.hadoop.hdds.scm.events.SCMEvents.HEALTHY_READONLY_NODE; |
| |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
/**
 * NodeStateManager maintains the state of all the datanodes in the cluster. All
 * node state changes should happen only via NodeStateManager. It also
 * runs a heartbeat thread which periodically updates the node state.
 * <p>
 * The getNode(byState) functions make a copy of the node maps and then create
 * a list based on that. It should be assumed that these get functions always
 * report *stale* information. For example, getting the deadNodeCount followed
 * by getNodes(DEAD) could very well produce a totally different count. Also
 * getNodeCount(HEALTHY) + getNodeCount(DEAD) + getNodeCount(STALE) is not
 * guaranteed to add up to the total number of nodes that we know of. Please
 * treat all get functions in this file as a snapshot of information that is
 * inconsistent as soon as you read it.
 */
| public class NodeStateManager implements Runnable, Closeable { |
| |
  /**
   * Events that drive the node health state machine; the permitted
   * transitions are registered in initializeStateMachines().
   */
  private enum NodeLifeCycleEvent {
    TIMEOUT, RESTORE, RESURRECT, LAYOUT_MISMATCH, LAYOUT_MATCH
  }

  private static final Logger LOG = LoggerFactory
      .getLogger(NodeStateManager.class);


  /**
   * StateMachine for node lifecycle (health state transitions).
   */
  private final StateMachine<NodeState, NodeLifeCycleEvent> nodeHealthSM;
  /**
   * This is the map which maintains the current state of all datanodes.
   */
  private final NodeStateMap nodeStateMap;
  /**
   * Maintains the mapping from node to pipelines a node is part of.
   */
  private final Node2PipelineMap node2PipelineMap;
  /**
   * Used for publishing node state change events.
   */
  private final EventPublisher eventPublisher;
  /**
   * Maps a node health state to the event that is fired for a node in that
   * state, e.g. right after its health state is updated.
   */
  private final Map<NodeState, Event<DatanodeDetails>> state2EventMap;
  /**
   * ExecutorService used for scheduling heartbeat processing thread.
   */
  private final ScheduledExecutorService executorService;
  /**
   * Delay, in milliseconds, between consecutive runs of the heartbeat
   * processing task.
   */
  private final long heartbeatCheckerIntervalMs;
  /**
   * Time since the last heartbeat, in milliseconds, after which a datanode
   * is marked as stale.
   */
  private final long staleNodeIntervalMs;
  /**
   * Time since the last heartbeat, in milliseconds, after which a datanode
   * is marked as dead.
   */
  private final long deadNodeIntervalMs;

  /**
   * Future of the most recently scheduled health check; used to
   * pause/unpause the scheduled checks.
   */
  private ScheduledFuture<?> healthCheckFuture;

  /**
   * Test utility - tracks if health check has been paused (unit tests).
   */
  private boolean checkPaused;

  /**
   * Monotonic timestamp (ms) taken each time the next health check is
   * scheduled; used to detect long scheduling delays such as JVM pauses.
   */
  private long lastHealthCheck;

  /**
   * Number of health check iterations skipped because of a long scheduling
   * delay.
   */
  private long skippedHealthChecks;

  /**
   * Supplies SCM's metadata and software layout versions, against which
   * datanode layout versions are compared.
   */
  private LayoutVersionManager layoutVersionManager;

  /**
   * Conditions to check whether a node's metadata layout version matches
   * that of SCM. layoutMatchCondition passes when both the metadata and the
   * software layout versions equal SCM's; layoutMisMatchCondition passes when
   * they do not match and SCM finalization has crossed the MLV_EQUALS_SLV
   * checkpoint.
   */
  private final Predicate<LayoutVersionProto> layoutMatchCondition;
  private final Predicate<LayoutVersionProto> layoutMisMatchCondition;
| |
| /** |
| * Constructs a NodeStateManager instance with the given configuration. |
| * |
| * @param conf Configuration |
| * @param eventPublisher event publisher |
| * @param layoutManager Layout version manager |
| */ |
| public NodeStateManager(ConfigurationSource conf, |
| EventPublisher eventPublisher, |
| LayoutVersionManager layoutManager, |
| SCMContext scmContext) { |
| this.layoutVersionManager = layoutManager; |
| this.nodeStateMap = new NodeStateMap(); |
| this.node2PipelineMap = new Node2PipelineMap(); |
| this.eventPublisher = eventPublisher; |
| this.state2EventMap = new HashMap<>(); |
| initialiseState2EventMap(); |
| Set<NodeState> finalStates = new HashSet<>(); |
| this.nodeHealthSM = new StateMachine<>(NodeState.HEALTHY, |
| finalStates); |
| initializeStateMachines(); |
| heartbeatCheckerIntervalMs = HddsServerUtil |
| .getScmheartbeatCheckerInterval(conf); |
| staleNodeIntervalMs = HddsServerUtil.getStaleNodeInterval(conf); |
| deadNodeIntervalMs = HddsServerUtil.getDeadNodeInterval(conf); |
| Preconditions.checkState(heartbeatCheckerIntervalMs > 0, |
| OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL + " should be greater than 0."); |
| Preconditions.checkState(staleNodeIntervalMs < deadNodeIntervalMs, |
| OZONE_SCM_STALENODE_INTERVAL + " should be less than" + |
| OZONE_SCM_DEADNODE_INTERVAL); |
| executorService = HadoopExecutors.newScheduledThreadPool(1, |
| new ThreadFactoryBuilder().setDaemon(true) |
| .setNameFormat("SCM Heartbeat Processing Thread - %d").build()); |
| |
| skippedHealthChecks = 0; |
| checkPaused = false; // accessed only from test functions |
| |
| // This will move a datanode out of healthy readonly state if passed. |
| layoutMatchCondition = (layout) -> |
| (layout.getMetadataLayoutVersion() == |
| layoutVersionManager.getMetadataLayoutVersion()) && |
| (layout.getSoftwareLayoutVersion() == |
| layoutVersionManager.getSoftwareLayoutVersion()); |
| |
| // This will move a datanode in to healthy readonly state if passed. |
| // When SCM finishes finalizing, it will automatically move all datanodes |
| // to healthy readonly as well. |
| // If nodes heartbeat while SCM is finalizing, they should not be moved |
| // to healthy readonly until SCM finishes updating its MLV, hence the |
| // checkpoint check here. |
| layoutMisMatchCondition = (layout) -> |
| scmContext.isFinalizationCheckpointCrossed( |
| FinalizationCheckpoint.MLV_EQUALS_SLV) && |
| !layoutMatchCondition.test(layout); |
| |
| scheduleNextHealthCheck(); |
| } |
| |
| /** |
| * Populates state2event map. |
| */ |
| private void initialiseState2EventMap() { |
| state2EventMap.put(STALE, SCMEvents.STALE_NODE); |
| state2EventMap.put(DEAD, SCMEvents.DEAD_NODE); |
| state2EventMap |
| .put(HEALTHY, SCMEvents.HEALTHY_READONLY_TO_HEALTHY_NODE); |
| state2EventMap |
| .put(NodeState.HEALTHY_READONLY, HEALTHY_READONLY_NODE); |
| } |
| |
| /* |
| * |
| * Node and State Transition Mapping: |
| * |
| * State: HEALTHY -------------------> STALE |
| * Event: TIMEOUT |
| * |
| * State: HEALTHY -------------------> HEALTHY_READONLY |
| * Event: LAYOUT_MISMATCH |
| * |
| * State: HEALTHY_READONLY -------------------> HEALTHY |
| * Event: LAYOUT_MATCH |
| * |
| * State: HEALTHY_READONLY -------------------> STALE |
| * Event: TIMEOUT |
| * |
| * State: STALE -------------------> HEALTHY_READONLY |
| * Event: RESTORE |
| * |
| * State: DEAD -------------------> HEALTHY_READONLY |
| * Event: RESURRECT |
| * |
| * State: STALE -------------------> DEAD |
| * Event: TIMEOUT |
| * |
| * Node State Flow |
| * |
| * +-----<---------<---+ |
| * | (RESURRECT) | |
| * +-->-----(LAYOUT_MISMATCH)-->--+ V | |
| * | | | ^ |
| * | | | | |
| * | V V | |
| * | +-----(LAYOUT_MATCH)--[HEALTHY_READONLY] | |
| * | | ^ | | |
| * | | | | ^ |
| * | | | |(TIMEOUT) | |
| * ^ | (RESTORE) | | | |
| * | V | V | |
| * [HEALTHY]---->----------------->[STALE]------->--------->[DEAD] |
| * (TIMEOUT) (TIMEOUT) |
| * |
| */ |
| |
| /** |
| * Initializes the lifecycle of node state machine. |
| */ |
| private void initializeStateMachines() { |
| nodeHealthSM.addTransition(HEALTHY_READONLY, HEALTHY, |
| NodeLifeCycleEvent.LAYOUT_MATCH); |
| nodeHealthSM.addTransition(HEALTHY_READONLY, STALE, |
| NodeLifeCycleEvent.TIMEOUT); |
| nodeHealthSM.addTransition(HEALTHY, STALE, NodeLifeCycleEvent.TIMEOUT); |
| nodeHealthSM.addTransition(HEALTHY, HEALTHY_READONLY, |
| NodeLifeCycleEvent.LAYOUT_MISMATCH); |
| nodeHealthSM.addTransition(STALE, DEAD, NodeLifeCycleEvent.TIMEOUT); |
| nodeHealthSM.addTransition(STALE, HEALTHY_READONLY, |
| NodeLifeCycleEvent.RESTORE); |
| nodeHealthSM.addTransition(DEAD, HEALTHY_READONLY, |
| NodeLifeCycleEvent.RESURRECT); |
| } |
| |
| /** |
| * Adds a new node to the state manager. |
| * |
| * @param datanodeDetails DatanodeDetails |
| * @param layoutInfo LayoutVersionProto |
| * |
| * @throws NodeAlreadyExistsException if the node is already present |
| */ |
| public void addNode(DatanodeDetails datanodeDetails, |
| LayoutVersionProto layoutInfo) throws NodeAlreadyExistsException { |
| NodeStatus newNodeStatus = newNodeStatus(datanodeDetails, layoutInfo); |
| nodeStateMap.addNode(datanodeDetails, newNodeStatus, layoutInfo); |
| UUID dnID = datanodeDetails.getUuid(); |
| try { |
| updateLastKnownLayoutVersion(datanodeDetails, layoutInfo); |
| } catch (NodeNotFoundException ex) { |
| LOG.error("Inconsistent NodeStateMap! Datanode with ID {} was " + |
| "added but not found in map: {}", dnID, nodeStateMap); |
| } |
| } |
| |
| /** |
| * When a node registers with SCM, the operational state stored on the |
| * datanode is the source of truth. Therefore, if the datanode reports |
| * anything other than IN_SERVICE on registration, the state in SCM should be |
| * updated to reflect the datanode state. |
| * @param dn DatanodeDetails reported by the datanode |
| */ |
| private NodeStatus newNodeStatus(DatanodeDetails dn, |
| LayoutVersionProto layoutInfo) { |
| HddsProtos.NodeOperationalState dnOpState = dn.getPersistedOpState(); |
| NodeState state = HEALTHY; |
| |
| if (layoutMisMatchCondition.test(layoutInfo)) { |
| state = HEALTHY_READONLY; |
| } |
| |
| if (dnOpState != NodeOperationalState.IN_SERVICE) { |
| LOG.info("Updating nodeOperationalState on registration as the " + |
| "datanode has a persisted state of {} and expiry of {}", |
| dnOpState, dn.getPersistedOpStateExpiryEpochSec()); |
| return new NodeStatus(dnOpState, state, |
| dn.getPersistedOpStateExpiryEpochSec()); |
| } else { |
| return new NodeStatus( |
| NodeOperationalState.IN_SERVICE, state); |
| } |
| } |
| |
| /** |
| * Adds a pipeline in the node2PipelineMap. |
| * @param pipeline - Pipeline to be added |
| */ |
| public void addPipeline(Pipeline pipeline) { |
| node2PipelineMap.addPipeline(pipeline); |
| } |
| |
| /** |
| * Get the count of pipelines associated to single datanode. |
| * @param datanodeDetails single datanode |
| * @return number of pipelines associated with it |
| */ |
| public int getPipelinesCount(DatanodeDetails datanodeDetails) { |
| return node2PipelineMap.getPipelinesCount(datanodeDetails.getUuid()); |
| } |
| |
| /** |
| * Get information about the node. |
| * |
| * @param datanodeDetails DatanodeDetails |
| * |
| * @return DatanodeInfo |
| * |
| * @throws NodeNotFoundException if the node is not present |
| */ |
| public DatanodeInfo getNode(DatanodeDetails datanodeDetails) |
| throws NodeNotFoundException { |
| return nodeStateMap.getNodeInfo(datanodeDetails.getUuid()); |
| } |
| |
| /** |
| * Updates the last heartbeat time of the node. |
| * |
| * @throws NodeNotFoundException if the node is not present |
| */ |
| public void updateLastHeartbeatTime(DatanodeDetails datanodeDetails) |
| throws NodeNotFoundException { |
| nodeStateMap.getNodeInfo(datanodeDetails.getUuid()) |
| .updateLastHeartbeatTime(); |
| } |
| |
| /** |
| * Updates the last known layout version of the node. |
| * @param datanodeDetails DataNode Details |
| * @param layoutInfo DataNode Layout Information |
| * |
| * @throws NodeNotFoundException if the node is not present |
| */ |
| public void updateLastKnownLayoutVersion(DatanodeDetails datanodeDetails, |
| LayoutVersionProto layoutInfo) |
| throws NodeNotFoundException { |
| nodeStateMap.getNodeInfo(datanodeDetails.getUuid()) |
| .updateLastKnownLayoutVersion(layoutInfo); |
| } |
| |
| /** |
| * Update node. |
| * |
| * @param datanodeDetails the datanode details |
| * @param layoutInfo the layoutInfo |
| * @throws NodeNotFoundException the node not found exception |
| */ |
| public void updateNode(DatanodeDetails datanodeDetails, |
| LayoutVersionProto layoutInfo) |
| throws NodeNotFoundException { |
| DatanodeInfo datanodeInfo = |
| nodeStateMap.getNodeInfo(datanodeDetails.getUuid()); |
| NodeStatus newNodeStatus = newNodeStatus(datanodeDetails, layoutInfo); |
| LOG.info("updating node {} from {} to {} with status {}", |
| datanodeDetails.getUuidString(), |
| datanodeInfo, |
| datanodeDetails, |
| newNodeStatus); |
| nodeStateMap.updateNode(datanodeDetails, newNodeStatus, layoutInfo); |
| updateLastKnownLayoutVersion(datanodeDetails, layoutInfo); |
| } |
| |
| /** |
| * Returns the current state of the node. |
| * |
| * @param datanodeDetails DatanodeDetails |
| * |
| * @return NodeState |
| * |
| * @throws NodeNotFoundException if the node is not present |
| */ |
| public NodeStatus getNodeStatus(DatanodeDetails datanodeDetails) |
| throws NodeNotFoundException { |
| return nodeStateMap.getNodeStatus(datanodeDetails.getUuid()); |
| } |
| |
| /** |
| * Returns all the node which are in healthy state, ignoring the operational |
| * state. |
| * |
| * @return list of healthy nodes |
| */ |
| public List<DatanodeInfo> getHealthyNodes() { |
| return getNodes(null, HEALTHY); |
| } |
| |
| /** |
| * Returns all the node which are in stale state, ignoring the operational |
| * state. |
| * |
| * @return list of stale nodes |
| */ |
| public List<DatanodeInfo> getStaleNodes() { |
| return getNodes(null, NodeState.STALE); |
| } |
| |
| /** |
| * Returns all the node which are in dead state, ignoring the operational |
| * state. |
| * |
| * @return list of dead nodes |
| */ |
| public List<DatanodeInfo> getDeadNodes() { |
| return getNodes(null, NodeState.DEAD); |
| } |
| |
| /** |
| * Returns all the nodes with the specified status. |
| * |
| * @param status NodeStatus |
| * |
| * @return list of nodes |
| */ |
| public List<DatanodeInfo> getNodes(NodeStatus status) { |
| return nodeStateMap.getDatanodeInfos(status); |
| } |
| |
| /** |
| * Returns all the nodes with the specified operationalState and health. |
| * |
| * @param opState The operationalState of the node |
| * @param health The node health |
| * |
| * @return list of nodes matching the passed states |
| */ |
| public List<DatanodeInfo> getNodes( |
| NodeOperationalState opState, NodeState health) { |
| return nodeStateMap.getDatanodeInfos(opState, health); |
| } |
| |
| /** |
| * Returns all the nodes which have registered to NodeStateManager. |
| * |
| * @return all the managed nodes |
| */ |
| public List<DatanodeInfo> getAllNodes() { |
| return nodeStateMap.getAllDatanodeInfos(); |
| } |
| |
| /** |
| * Sets the operational state of the given node. Intended to be called when |
| * a node is being decommissioned etc. |
| * |
| * @param dn The datanode having its state set |
| * @param newState The new operational State of the node. |
| */ |
| public void setNodeOperationalState(DatanodeDetails dn, |
| NodeOperationalState newState) throws NodeNotFoundException { |
| setNodeOperationalState(dn, newState, 0); |
| } |
| |
| /** |
| * Sets the operational state of the given node. Intended to be called when |
| * a node is being decommissioned etc. |
| * |
| * @param dn The datanode having its state set |
| * @param newState The new operational State of the node. |
| * @param stateExpiryEpochSec The number of seconds from the epoch when the |
| * operational state should expire. Passing zero |
| * indicates the state will never expire |
| */ |
| public void setNodeOperationalState(DatanodeDetails dn, |
| NodeOperationalState newState, |
| long stateExpiryEpochSec) throws NodeNotFoundException { |
| DatanodeInfo dni = nodeStateMap.getNodeInfo(dn.getUuid()); |
| NodeStatus oldStatus = dni.getNodeStatus(); |
| if (oldStatus.getOperationalState() != newState || |
| oldStatus.getOpStateExpiryEpochSeconds() != stateExpiryEpochSec) { |
| nodeStateMap.updateNodeOperationalState( |
| dn.getUuid(), newState, stateExpiryEpochSec); |
| // This will trigger an event based on the nodes health when the |
| // operational state changes. Eg a node that was IN_MAINTENANCE goes |
| // to IN_SERVICE + HEALTHY. This will trigger the HEALTHY node event to |
| // create new pipelines. OTH, if the nodes goes IN_MAINTENANCE to |
| // IN_SERVICE + DEAD, it will trigger the dead node handler to remove its |
| // container replicas. Sometimes the event will do nothing, but it will |
| // not do any harm either. Eg DECOMMISSIONING -> DECOMMISSIONED + HEALTHY |
| // but the pipeline creation logic will ignore decommissioning nodes. |
| if (oldStatus.getOperationalState() != newState) { |
| fireHealthStateEvent(oldStatus.getHealth(), dn); |
| } |
| } |
| } |
| |
| /** |
| * Gets set of pipelineID a datanode belongs to. |
| * @param dnId - Datanode ID |
| * @return Set of PipelineID |
| */ |
| public Set<PipelineID> getPipelineByDnID(UUID dnId) { |
| return node2PipelineMap.getPipelines(dnId); |
| } |
| |
| /** |
| * Returns the count of healthy nodes, ignoring operational state. |
| * |
| * @return healthy node count |
| */ |
| public int getHealthyNodeCount() { |
| return getHealthyNodes().size(); |
| } |
| |
| /** |
| * Returns the count of stale nodes, ignoring operational state. |
| * |
| * @return stale node count |
| */ |
| public int getStaleNodeCount() { |
| return getStaleNodes().size(); |
| } |
| |
| /** |
| * Returns the count of dead nodes, ignoring operational state. |
| * |
| * @return dead node count |
| */ |
| public int getDeadNodeCount() { |
| return getDeadNodes().size(); |
| } |
| |
| /** |
| * Returns the count of nodes in specified status. |
| * |
| * @param status NodeState |
| * |
| * @return node count |
| */ |
| public int getNodeCount(NodeStatus status) { |
| return nodeStateMap.getNodeCount(status); |
| } |
| |
| /** |
| * Returns the count of nodes in the specified states. |
| * |
| * @param opState The operational state of the node |
| * @param health The health of the node |
| * |
| * @return node count |
| */ |
| public int getNodeCount(NodeOperationalState opState, NodeState health) { |
| return nodeStateMap.getNodeCount(opState, health); |
| } |
| |
| /** |
| * Returns the count of all nodes managed by NodeStateManager. |
| * |
| * @return node count |
| */ |
| public int getTotalNodeCount() { |
| return nodeStateMap.getTotalNodeCount(); |
| } |
| |
| /** |
| * Removes a pipeline from the node2PipelineMap. |
| * @param pipeline - Pipeline to be removed |
| */ |
| public void removePipeline(Pipeline pipeline) { |
| node2PipelineMap.removePipeline(pipeline); |
| } |
| |
| /** |
| * Adds the given container to the specified datanode. |
| * |
| * @param uuid - datanode uuid |
| * @param containerId - containerID |
| * @throws NodeNotFoundException - if datanode is not known. For new datanode |
| * use addDatanodeInContainerMap call. |
| */ |
| public void addContainer(final UUID uuid, |
| final ContainerID containerId) |
| throws NodeNotFoundException { |
| nodeStateMap.addContainer(uuid, containerId); |
| } |
| |
| /** |
| * Removes the given container from the specified datanode. |
| * |
| * @param uuid - datanode uuid |
| * @param containerId - containerID |
| * @throws NodeNotFoundException - if datanode is not known. For new datanode |
| * use addDatanodeInContainerMap call. |
| */ |
| public void removeContainer(final UUID uuid, |
| final ContainerID containerId) |
| throws NodeNotFoundException { |
| nodeStateMap.removeContainer(uuid, containerId); |
| } |
| |
| /** |
| * Update set of containers available on a datanode. |
| * @param uuid - DatanodeID |
| * @param containerIds - Set of containerIDs |
| * @throws NodeNotFoundException - if datanode is not known. |
| */ |
| public void setContainers(UUID uuid, Set<ContainerID> containerIds) |
| throws NodeNotFoundException { |
| nodeStateMap.setContainers(uuid, containerIds); |
| } |
| |
| /** |
| * Return set of containerIDs available on a datanode. This is a copy of the |
| * set which resides inside NodeStateMap and hence can be modified without |
| * synchronization or side effects. |
| * @param uuid - DatanodeID |
| * @return - set of containerIDs |
| */ |
| public Set<ContainerID> getContainers(UUID uuid) |
| throws NodeNotFoundException { |
| return nodeStateMap.getContainers(uuid); |
| } |
| |
| /** |
| * Move Stale or Dead node to healthy if we got a heartbeat from them. |
| * Move healthy nodes to stale nodes if it is needed. |
| * Move Stales node to dead if needed. |
| * |
| * @see Thread#run() |
| */ |
| @Override |
| public void run() { |
| |
| if (shouldSkipCheck()) { |
| skippedHealthChecks++; |
| LOG.info("Detected long delay in scheduling HB processing thread. " |
| + "Skipping heartbeat checks for one iteration."); |
| } else { |
| checkNodesHealth(); |
| } |
| |
| // we purposefully make this non-deterministic. Instead of using a |
| // scheduleAtFixedFrequency we will just go to sleep |
| // and wake up at the next rendezvous point, which is currentTime + |
| // heartbeatCheckerIntervalMs. This leads to the issue that we are now |
| // heart beating not at a fixed cadence, but clock tick + time taken to |
| // work. |
| // |
| // This time taken to work can skew the heartbeat processor thread. |
| // The reason why we don't care is because of the following reasons. |
| // |
| // 1. checkerInterval is general many magnitudes faster than datanode HB |
| // frequency. |
| // |
| // 2. if we have too much nodes, the SCM would be doing only HB |
| // processing, this could lead to SCM's CPU starvation. With this |
| // approach we always guarantee that HB thread sleeps for a little while. |
| // |
| // 3. It is possible that we will never finish processing the HB's in the |
| // thread. But that means we have a mis-configured system. We will warn |
| // the users by logging that information. |
| // |
| // 4. And the most important reason, heartbeats are not blocked even if |
| // this thread does not run, they will go into the processing queue. |
| scheduleNextHealthCheck(); |
| } |
| |
| /** |
| * Upgrade finalization needs to move all nodes to a healthy readonly state |
| * when finalization finishes to make sure no nodes with metadata layout |
| * version older than SCM's are used in pipelines. Pipeline creation is |
| * still frozen at this point in the finalization flow. |
| * |
| * This method is synchronized to coordinate node state updates between |
| * the upgrade finalization thread which calls this method, and the |
| * node health processing thread that calls {@link this#checkNodesHealth}. |
| */ |
| public synchronized void forceNodesToHealthyReadOnly() { |
| try { |
| List<UUID> nodes = nodeStateMap.getNodes(null, HEALTHY); |
| for (UUID id : nodes) { |
| DatanodeInfo node = nodeStateMap.getNodeInfo(id); |
| nodeStateMap.updateNodeHealthState(node.getUuid(), |
| HEALTHY_READONLY); |
| if (state2EventMap.containsKey(HEALTHY_READONLY)) { |
| // At this point pipeline creation is already frozen and the node's |
| // state has been updated in nodeStateMap. This event should be a |
| // no-op aside from logging a message, so it is ok to complete |
| // asynchronously. |
| eventPublisher.fireEvent(state2EventMap.get(HEALTHY_READONLY), |
| node); |
| } |
| } |
| |
| } catch (NodeNotFoundException ex) { |
| LOG.error("Inconsistent NodeStateMap! {}", nodeStateMap); |
| } |
| } |
| |
| /** |
| * This method is synchronized to coordinate node state updates between |
| * the upgrade finalization thread which calls |
| * {@link this#forceNodesToHealthyReadOnly}, and the node health processing |
| * thread that calls this method. |
| */ |
| @VisibleForTesting |
| public synchronized void checkNodesHealth() { |
| |
| /* |
| * |
| * staleNodeDeadline healthyNodeDeadline |
| * | | |
| * Dead | Stale | Healthy |
| * Node | Node | Node |
| * Window | Window | Window |
| * ----------------+----------------------------------+-------------------> |
| * >>-->> time-line >>-->> |
| * |
| * Here is the logic of computing the health of a node. |
| * |
| * 1. We get the current time and look back that the time |
| *  when we got a heartbeat from a node. |
| * |
| * 2. If the last heartbeat was within the window of healthy node we mark |
| *  it as healthy. |
| * |
| * 3. If the last HB Time stamp is longer and falls within the window of |
| *  Stale Node time, we will mark it as Stale. |
| * |
| * 4. If the last HB time is older than the Stale Window, then the node is |
| * marked as dead. |
| * |
| * The Processing starts from current time and looks backwards in time. |
| */ |
| long processingStartTime = Time.monotonicNow(); |
| // After this time node is considered to be stale. |
| long healthyNodeDeadline = processingStartTime - staleNodeIntervalMs; |
| // After this time node is considered to be dead. |
| long staleNodeDeadline = processingStartTime - deadNodeIntervalMs; |
| |
| Predicate<Long> healthyNodeCondition = |
| (lastHbTime) -> lastHbTime >= healthyNodeDeadline; |
| // staleNodeCondition is superset of stale and dead node |
| Predicate<Long> staleNodeCondition = |
| (lastHbTime) -> lastHbTime < healthyNodeDeadline; |
| Predicate<Long> deadNodeCondition = |
| (lastHbTime) -> lastHbTime < staleNodeDeadline; |
| |
| try { |
| for (DatanodeInfo node : nodeStateMap.getAllDatanodeInfos()) { |
| NodeStatus status = nodeStateMap.getNodeStatus(node.getUuid()); |
| switch (status.getHealth()) { |
| case HEALTHY: |
| updateNodeLayoutVersionState(node, layoutMisMatchCondition, status, |
| NodeLifeCycleEvent.LAYOUT_MISMATCH); |
| // Move the node to STALE if the last heartbeat time is less than |
| // configured stale-node interval. |
| updateNodeState(node, staleNodeCondition, status, |
| NodeLifeCycleEvent.TIMEOUT); |
| break; |
| case HEALTHY_READONLY: |
| updateNodeLayoutVersionState(node, layoutMatchCondition, status, |
| NodeLifeCycleEvent.LAYOUT_MATCH); |
| // Move the node to STALE if the last heartbeat time is less than |
| // configured stale-node interval. |
| updateNodeState(node, staleNodeCondition, status, |
| NodeLifeCycleEvent.TIMEOUT); |
| break; |
| case STALE: |
| // Move the node to DEAD if the last heartbeat time is less than |
| // configured dead-node interval. |
| updateNodeState(node, deadNodeCondition, status, |
| NodeLifeCycleEvent.TIMEOUT); |
| // Restore the node if we have received heartbeat before configured |
| // stale-node interval. |
| updateNodeState(node, healthyNodeCondition, status, |
| NodeLifeCycleEvent.RESTORE); |
| break; |
| case DEAD: |
| // Resurrect the node if we have received heartbeat before |
| // configured stale-node interval. |
| updateNodeState(node, healthyNodeCondition, status, |
| NodeLifeCycleEvent.RESURRECT); |
| break; |
| default: |
| } |
| } |
| } catch (NodeNotFoundException e) { |
| // This should not happen unless someone else other than |
| // NodeStateManager is directly modifying NodeStateMap and removed |
| // the node entry after we got the list of UUIDs. |
| LOG.error("Inconsistent NodeStateMap! {}", nodeStateMap); |
| } |
| long processingEndTime = Time.monotonicNow(); |
| //If we have taken too much time for HB processing, log that information. |
| if ((processingEndTime - processingStartTime) > |
| heartbeatCheckerIntervalMs) { |
| LOG.error("Total time spend processing datanode HB's is greater than " + |
| "configured values for datanode heartbeats. Please adjust the" + |
| " heartbeat configs. Time Spend on HB processing: {} seconds " + |
| "Datanode heartbeat Interval: {} seconds.", |
| TimeUnit.MILLISECONDS |
| .toSeconds(processingEndTime - processingStartTime), |
| heartbeatCheckerIntervalMs); |
| } |
| |
| } |
| |
| private void scheduleNextHealthCheck() { |
| |
| if (!Thread.currentThread().isInterrupted() && |
| !executorService.isShutdown()) { |
| //BUGBUG: The return future needs to checked here to make sure the |
| // exceptions are handled correctly. |
| healthCheckFuture = executorService.schedule(this, |
| heartbeatCheckerIntervalMs, TimeUnit.MILLISECONDS); |
| } else { |
| LOG.warn("Current Thread is interrupted, shutting down HB processing " + |
| "thread for Node Manager."); |
| } |
| |
| lastHealthCheck = Time.monotonicNow(); |
| } |
| |
| /** |
| * if the time since last check exceeds the stale|dead node interval, skip. |
| * such long delays might be caused by a JVM pause. SCM cannot make reliable |
| * conclusions about datanode health in such situations. |
| * @return : true indicates skip HB checks |
| */ |
| private boolean shouldSkipCheck() { |
| |
| long currentTime = Time.monotonicNow(); |
| long minInterval = Math.min(staleNodeIntervalMs, deadNodeIntervalMs); |
| |
| return ((currentTime - lastHealthCheck) >= minInterval); |
| } |
| |
| /** |
| * Updates the node state if the condition satisfies. |
| * |
| * @param node DatanodeInfo |
| * @param condition condition to check |
| * @param status current status of node |
| * @param lifeCycleEvent NodeLifeCycleEvent to be applied if condition |
| * matches |
| * |
| * @throws NodeNotFoundException if the node is not present |
| */ |
| private void updateNodeState(DatanodeInfo node, Predicate<Long> condition, |
| NodeStatus status, NodeLifeCycleEvent lifeCycleEvent) |
| throws NodeNotFoundException { |
| try { |
| if (condition.test(node.getLastHeartbeatTime())) { |
| NodeState newHealthState = nodeHealthSM. |
| getNextState(status.getHealth(), lifeCycleEvent); |
| NodeStatus newStatus = |
| nodeStateMap.updateNodeHealthState(node.getUuid(), newHealthState); |
| fireHealthStateEvent(newStatus.getHealth(), node); |
| } |
| } catch (InvalidStateTransitionException e) { |
| LOG.warn("Invalid state transition of node {}." + |
| " Current state: {}, life cycle event: {}", |
| node, status.getHealth(), lifeCycleEvent); |
| } |
| } |
| |
| private void fireHealthStateEvent(HddsProtos.NodeState health, |
| DatanodeDetails node) { |
| Event<DatanodeDetails> event = state2EventMap.get(health); |
| if (event != null) { |
| eventPublisher.fireEvent(event, node); |
| } |
| } |
| |
| /** |
| * Updates the node state if the condition satisfies. |
| * |
| * @param node DatanodeInfo |
| * @param condition condition to check |
| * @param status current state of node |
| * @param lifeCycleEvent NodeLifeCycleEvent to be applied if condition |
| * matches |
| * |
| * @throws NodeNotFoundException if the node is not present |
| */ |
| private void updateNodeLayoutVersionState(DatanodeInfo node, |
| Predicate<LayoutVersionProto> condition, |
| NodeStatus status, |
| NodeLifeCycleEvent lifeCycleEvent) |
| throws NodeNotFoundException { |
| try { |
| if (condition.test(node.getLastKnownLayoutVersion())) { |
| NodeState newHealthState = nodeHealthSM.getNextState(status.getHealth(), |
| lifeCycleEvent); |
| NodeStatus newStatus = |
| nodeStateMap.updateNodeHealthState(node.getUuid(), newHealthState); |
| fireHealthStateEvent(newStatus.getHealth(), node); |
| } |
| } catch (InvalidStateTransitionException e) { |
| LOG.warn("Invalid state transition of node {}." + |
| " Current state: {}, life cycle event: {}", |
| node, status, lifeCycleEvent); |
| } |
| } |
| |
| @Override |
| public void close() { |
| executorService.shutdown(); |
| try { |
| if (!executorService.awaitTermination(5, TimeUnit.SECONDS)) { |
| executorService.shutdownNow(); |
| } |
| |
| if (!executorService.awaitTermination(5, TimeUnit.SECONDS)) { |
| LOG.error("Unable to shutdown NodeStateManager properly."); |
| } |
| } catch (InterruptedException e) { |
| executorService.shutdownNow(); |
| Thread.currentThread().interrupt(); |
| } |
| } |
| |
| /** |
| * Test Utility : return number of times heartbeat check was skipped. |
| * @return : count of times HB process was skipped |
| */ |
| @VisibleForTesting |
| long getSkippedHealthChecks() { |
| return skippedHealthChecks; |
| } |
| |
| /** |
| * Test Utility : Pause the periodic node hb check. |
| * @return ScheduledFuture for the scheduled check that got cancelled. |
| */ |
| @VisibleForTesting |
| ScheduledFuture pause() { |
| |
| if (executorService.isShutdown() || checkPaused) { |
| return null; |
| } |
| |
| checkPaused = healthCheckFuture.cancel(false); |
| |
| return healthCheckFuture; |
| } |
| |
| /** |
| * Test utility : unpause the periodic node hb check. |
| * @return ScheduledFuture for the next scheduled check |
| */ |
| @VisibleForTesting |
| ScheduledFuture unpause() { |
| |
| if (executorService.isShutdown()) { |
| return null; |
| } |
| |
| if (checkPaused) { |
| Preconditions.checkState(((healthCheckFuture == null) |
| || healthCheckFuture.isCancelled() |
| || healthCheckFuture.isDone())); |
| |
| checkPaused = false; |
| /** |
| * We do not call scheduleNextHealthCheck because we are |
| * not updating the lastHealthCheck timestamp. |
| */ |
| healthCheckFuture = executorService.schedule(this, |
| heartbeatCheckerIntervalMs, TimeUnit.MILLISECONDS); |
| } |
| |
| return healthCheckFuture; |
| } |
| } |