blob: df19534bfcb3fbe96b5838e119efaed8fa31413b [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.tez.dag.app.rm.node;
import java.util.EnumSet;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
import org.apache.tez.dag.app.dag.DAG;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.event.Event;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.state.InvalidStateTransitonException;
import org.apache.hadoop.yarn.state.MultipleArcTransition;
import org.apache.hadoop.yarn.state.SingleArcTransition;
import org.apache.hadoop.yarn.state.StateMachine;
import org.apache.hadoop.yarn.state.StateMachineFactory;
import org.apache.tez.dag.app.AppContext;
import org.apache.tez.dag.app.rm.AMSchedulerEventNodeBlacklistUpdate;
import org.apache.tez.dag.app.rm.container.AMContainerEvent;
import org.apache.tez.dag.app.rm.container.AMContainerEventNodeFailed;
import org.apache.tez.dag.app.rm.container.AMContainerEventType;
import org.apache.tez.dag.records.TezTaskAttemptID;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Sets;
public class AMNodeImpl implements AMNode {
private static final Logger LOG = LoggerFactory.getLogger(AMNodeImpl.class);

// Read and write views of the single ReentrantReadWriteLock created in the constructor.
private final ReadLock readLock;
private final WriteLock writeLock;

private final NodeId nodeId;
private final int schedulerId;
private final AppContext appContext;
// Threshold that numFailedTAs is compared against in qualifiesForBlacklisting().
private final int maxTaskFailuresPerNode;
// Set once by the constructor and never reassigned, hence final.
private final boolean blacklistingEnabled;
// Written by the IgnoreBlacklisting* transitions; read when the node turns healthy again.
private boolean ignoreBlacklisting = false;
// Set once by the constructor and never reassigned, hence final.
private final boolean nodeUpdatesRescheduleEnabled;
// Attempt ids already counted as failures; used to ignore duplicate failure events.
private final Set<TezTaskAttemptID> failedAttemptIds = Sets.newHashSet();

@SuppressWarnings("rawtypes")
protected EventHandler eventHandler;

// Containers recorded for this node by ContainerAllocatedTransition (insertion-ordered).
@VisibleForTesting
final Set<ContainerId> containers = new LinkedHashSet<>();
// Container ids recorded by ContainerCompletedTransition; drained in dagComplete().
final Set<ContainerId> completedContainers = new HashSet<>();
// Counters maintained by the task-attempt transitions.
int numFailedTAs = 0;
int numSuccessfulTAs = 0;

// Stateless transition shared by every state that accepts N_CONTAINER_COMPLETED.
// Must be declared before stateMachineFactory, whose initializer references it.
private static final ContainerCompletedTransition CONTAINER_COMPLETED_TRANSITION =
    new ContainerCompletedTransition();

// Per-instance state machine created from stateMachineFactory in the constructor.
private final StateMachine<AMNodeState, AMNodeEventType, AMNodeEvent> stateMachine;
/**
 * Transition table shared by all {@link AMNodeImpl} instances; each instance
 * obtains its own state machine from it via {@code make(this)} in the
 * constructor. The factory is created with {@link AMNodeState#ACTIVE} as its
 * initial state.
 *
 * <p>The field is {@code final} because the table is built exactly once at
 * class-initialization time and is never reassigned.
 */
private static final StateMachineFactory
    <AMNodeImpl, AMNodeState, AMNodeEventType, AMNodeEvent>
    stateMachineFactory =
    new StateMachineFactory<AMNodeImpl, AMNodeState, AMNodeEventType, AMNodeEvent>(
        AMNodeState.ACTIVE)
        // Transitions from ACTIVE state.
        .addTransition(AMNodeState.ACTIVE, AMNodeState.ACTIVE,
            AMNodeEventType.N_CONTAINER_ALLOCATED,
            new ContainerAllocatedTransition())
        .addTransition(AMNodeState.ACTIVE, AMNodeState.ACTIVE,
            AMNodeEventType.N_TA_SUCCEEDED, new TaskAttemptSucceededTransition())
        .addTransition(AMNodeState.ACTIVE,
            EnumSet.of(AMNodeState.ACTIVE, AMNodeState.BLACKLISTED),
            AMNodeEventType.N_TA_ENDED, new TaskAttemptFailedTransition())
        .addTransition(AMNodeState.ACTIVE, AMNodeState.UNHEALTHY,
            AMNodeEventType.N_TURNED_UNHEALTHY,
            new NodeTurnedUnhealthyTransition())
        .addTransition(AMNodeState.ACTIVE,
            EnumSet.of(AMNodeState.ACTIVE, AMNodeState.BLACKLISTED),
            AMNodeEventType.N_IGNORE_BLACKLISTING_DISABLED,
            new IgnoreBlacklistingDisabledTransition())
        .addTransition(AMNodeState.ACTIVE, AMNodeState.FORCED_ACTIVE,
            AMNodeEventType.N_IGNORE_BLACKLISTING_ENABLED,
            new IgnoreBlacklistingStateChangeTransition(true))
        // A healthy report while already ACTIVE is accepted without any action.
        .addTransition(AMNodeState.ACTIVE, AMNodeState.ACTIVE,
            AMNodeEventType.N_TURNED_HEALTHY)
        .addTransition(AMNodeState.ACTIVE, AMNodeState.ACTIVE,
            AMNodeEventType.N_CONTAINER_COMPLETED, CONTAINER_COMPLETED_TRANSITION)

        // Transitions from BLACKLISTED state.
        .addTransition(AMNodeState.BLACKLISTED, AMNodeState.BLACKLISTED,
            AMNodeEventType.N_CONTAINER_ALLOCATED,
            new ContainerAllocatedWhileBlacklistedTransition())
        .addTransition(AMNodeState.BLACKLISTED,
            EnumSet.of(AMNodeState.BLACKLISTED, AMNodeState.ACTIVE),
            AMNodeEventType.N_TA_SUCCEEDED,
            new TaskAttemptSucceededWhileBlacklistedTransition())
        .addTransition(AMNodeState.BLACKLISTED, AMNodeState.BLACKLISTED,
            AMNodeEventType.N_TA_ENDED, new CountFailedTaskAttemptTransition())
        .addTransition(AMNodeState.BLACKLISTED, AMNodeState.UNHEALTHY,
            AMNodeEventType.N_TURNED_UNHEALTHY,
            new NodeTurnedUnhealthyTransition())
        .addTransition(AMNodeState.BLACKLISTED, AMNodeState.FORCED_ACTIVE,
            AMNodeEventType.N_IGNORE_BLACKLISTING_ENABLED,
            new IgnoreBlacklistingStateChangeTransition(true))
        .addTransition(AMNodeState.BLACKLISTED, AMNodeState.BLACKLISTED,
            AMNodeEventType.N_CONTAINER_COMPLETED, CONTAINER_COMPLETED_TRANSITION)
        // Events that are not expected in this state are logged and ignored.
        .addTransition(
            AMNodeState.BLACKLISTED,
            AMNodeState.BLACKLISTED,
            EnumSet.of(AMNodeEventType.N_TURNED_HEALTHY,
                AMNodeEventType.N_IGNORE_BLACKLISTING_DISABLED),
            new GenericErrorTransition())

        // Transitions from FORCED_ACTIVE state.
        .addTransition(AMNodeState.FORCED_ACTIVE, AMNodeState.FORCED_ACTIVE,
            AMNodeEventType.N_CONTAINER_ALLOCATED,
            new ContainerAllocatedTransition())
        .addTransition(AMNodeState.FORCED_ACTIVE, AMNodeState.FORCED_ACTIVE,
            AMNodeEventType.N_TA_SUCCEEDED, new TaskAttemptSucceededTransition())
        .addTransition(AMNodeState.FORCED_ACTIVE, AMNodeState.FORCED_ACTIVE,
            AMNodeEventType.N_TA_ENDED, new CountFailedTaskAttemptTransition())
        .addTransition(AMNodeState.FORCED_ACTIVE, AMNodeState.UNHEALTHY,
            AMNodeEventType.N_TURNED_UNHEALTHY,
            new NodeTurnedUnhealthyTransition())
        .addTransition(AMNodeState.FORCED_ACTIVE,
            EnumSet.of(AMNodeState.BLACKLISTED, AMNodeState.ACTIVE),
            AMNodeEventType.N_IGNORE_BLACKLISTING_DISABLED,
            new IgnoreBlacklistingDisabledTransition())
        .addTransition(AMNodeState.FORCED_ACTIVE, AMNodeState.FORCED_ACTIVE,
            AMNodeEventType.N_CONTAINER_COMPLETED, CONTAINER_COMPLETED_TRANSITION)
        // Events that are not expected in this state are logged and ignored.
        .addTransition(
            AMNodeState.FORCED_ACTIVE,
            AMNodeState.FORCED_ACTIVE,
            EnumSet.of(AMNodeEventType.N_TURNED_HEALTHY,
                AMNodeEventType.N_IGNORE_BLACKLISTING_ENABLED),
            new GenericErrorTransition())

        // Transitions from UNHEALTHY state.
        .addTransition(AMNodeState.UNHEALTHY, AMNodeState.UNHEALTHY,
            AMNodeEventType.N_CONTAINER_ALLOCATED,
            new ContainerAllocatedWhileUnhealthyTransition())
        // Task attempt outcomes are accepted but not acted upon while UNHEALTHY.
        .addTransition(
            AMNodeState.UNHEALTHY,
            AMNodeState.UNHEALTHY,
            EnumSet
                .of(AMNodeEventType.N_TA_SUCCEEDED, AMNodeEventType.N_TA_ENDED))
        .addTransition(AMNodeState.UNHEALTHY, AMNodeState.UNHEALTHY,
            AMNodeEventType.N_IGNORE_BLACKLISTING_DISABLED,
            new IgnoreBlacklistingStateChangeTransition(false))
        .addTransition(AMNodeState.UNHEALTHY, AMNodeState.UNHEALTHY,
            AMNodeEventType.N_IGNORE_BLACKLISTING_ENABLED,
            new IgnoreBlacklistingStateChangeTransition(true))
        .addTransition(AMNodeState.UNHEALTHY,
            EnumSet.of(AMNodeState.ACTIVE, AMNodeState.FORCED_ACTIVE),
            AMNodeEventType.N_TURNED_HEALTHY, new NodeTurnedHealthyTransition())
        .addTransition(AMNodeState.UNHEALTHY, AMNodeState.UNHEALTHY,
            AMNodeEventType.N_CONTAINER_COMPLETED, CONTAINER_COMPLETED_TRANSITION)
        .addTransition(AMNodeState.UNHEALTHY, AMNodeState.UNHEALTHY,
            AMNodeEventType.N_TURNED_UNHEALTHY, new GenericErrorTransition())

        .installTopology();
@SuppressWarnings("rawtypes")
public AMNodeImpl(NodeId nodeId, int schedulerId, int maxTaskFailuresPerNode,
EventHandler eventHandler, boolean blacklistingEnabled,
boolean rescheduleOnUnhealthyNode, AppContext appContext) {
ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
this.readLock = rwLock.readLock();
this.writeLock = rwLock.writeLock();
this.nodeId = nodeId;
this.schedulerId = schedulerId;
this.appContext = appContext;
this.eventHandler = eventHandler;
this.blacklistingEnabled = blacklistingEnabled;
this.nodeUpdatesRescheduleEnabled = rescheduleOnUnhealthyNode;
this.maxTaskFailuresPerNode = maxTaskFailuresPerNode;
this.stateMachine = stateMachineFactory.make(this);
// TODO Handle the case where a node is created due to the RM reporting it's
// state as UNHEALTHY
}
/**
 * @return the id of the node this object tracks, as supplied to the constructor
 */
@Override
public NodeId getNodeId() {
  return nodeId;
}
/**
 * Reads the current state of the node's state machine while holding the read lock.
 *
 * @return the state the state machine is currently in
 */
@Override
public AMNodeState getState() {
  readLock.lock();
  try {
    final AMNodeState current = stateMachine.getCurrentState();
    return current;
  } finally {
    readLock.unlock();
  }
}
/**
 * Returns a snapshot of the containers currently recorded for this node.
 *
 * @return a new list holding a copy of the tracked container ids; later changes
 *         to the node are not reflected in it
 */
@Override
public List<ContainerId> getContainers() {
  readLock.lock();
  try {
    // Copy under the read lock so callers never see the live set.
    return new LinkedList<>(containers);
  } finally {
    readLock.unlock();
  }
}
/**
 * Feeds an event into the node's state machine while holding the write lock.
 * An event that is invalid for the current state is logged and otherwise
 * ignored; a resulting state change is logged at INFO level.
 *
 * @param event the node event to process
 */
@Override
public void handle(AMNodeEvent event) {
  if (LOG.isDebugEnabled()) {
    LOG.debug("Processing AMNodeEvent " + event.getNodeId()
        + " of type " + event.getType() + " while in state: " + getState()
        + ". Event: " + event);
  }
  writeLock.lock();
  try {
    final AMNodeState stateBefore = getState();
    try {
      stateMachine.doTransition(event.getType(), event);
    } catch (InvalidStateTransitonException e) {
      LOG.error("Can't handle event " + event.getType()
          + " at current state " + stateBefore + " for NodeId " + this.nodeId, e);
      // TODO Should this fail the job ?
    }
    // The write lock is still held, so the state cannot change between the
    // comparison and the log statement.
    final AMNodeState stateAfter = getState();
    if (stateAfter != stateBefore) {
      LOG.info("AMNode " + this.nodeId + " transitioned from " + stateBefore
          + " to " + stateAfter);
    }
  } finally {
    writeLock.unlock();
  }
}
/**
 * Checks, using node-local information only, whether this node is a candidate
 * for blacklisting.
 *
 * @return {@code true} when blacklisting is enabled and the number of failed
 *         task attempts has reached {@code maxTaskFailuresPerNode}
 */
protected boolean qualifiesForBlacklisting() {
  if (!blacklistingEnabled) {
    return false;
  }
  return numFailedTAs >= maxTaskFailuresPerNode;
}
/**
 * Reports this node to the node tracker obtained from the application context
 * and relays the tracker's answer.
 *
 * @return the tracker's decision on whether this node should be blacklisted
 */
protected boolean registerBadNodeAndShouldBlacklist() {
  return appContext
      .getNodeTracker()
      .registerBadNodeAndShouldBlacklist(this, schedulerId);
}
/**
 * Performs the side effects of blacklisting: sends a node-failed event for every
 * tracked container, forgets those containers, and then sends a blacklist
 * update for this node.
 */
protected void blacklistSelf() {
  for (ContainerId containerId : containers) {
    AMContainerEventNodeFailed nodeFailed =
        new AMContainerEventNodeFailed(containerId, "Node blacklisted");
    sendEvent(nodeFailed);
  }
  // The containers are of no further use once the node is blacklisted.
  containers.clear();
  sendEvent(new AMSchedulerEventNodeBlacklistUpdate(getNodeId(), true, schedulerId));
}
/**
 * Hands an event to the configured event handler.
 *
 * @param event the event to dispatch
 */
@SuppressWarnings("unchecked")
private void sendEvent(Event<?> event) {
  // eventHandler is a raw type, hence the unchecked call.
  eventHandler.handle(event);
}
//////////////////////////////////////////////////////////////////////////////
// Start of Transition Classes //
//////////////////////////////////////////////////////////////////////////////
/**
 * Records a newly allocated container in the node's set of tracked containers.
 * Expects an {@link AMNodeEventContainerAllocated} event.
 */
protected static class ContainerAllocatedTransition implements
    SingleArcTransition<AMNodeImpl, AMNodeEvent> {
  @Override
  public void transition(AMNodeImpl node, AMNodeEvent nEvent) {
    ContainerId allocated = ((AMNodeEventContainerAllocated) nEvent).getContainerId();
    node.containers.add(allocated);
  }
}
/**
 * Counts a successful task attempt on the node.
 */
protected static class TaskAttemptSucceededTransition implements
    SingleArcTransition<AMNodeImpl, AMNodeEvent> {
  @Override
  public void transition(AMNodeImpl node, AMNodeEvent nEvent) {
    node.numSuccessfulTAs += 1;
  }
}
/**
 * Handles the end of a task attempt. A failed attempt that has not been seen
 * before increments the failure counter; if the node then qualifies for
 * blacklisting and the node tracker agrees, the node blacklists itself.
 * Returns {@code BLACKLISTED} in that case and {@code ACTIVE} otherwise.
 */
protected static class TaskAttemptFailedTransition implements
    MultipleArcTransition<AMNodeImpl, AMNodeEvent, AMNodeState> {
  @Override
  public AMNodeState transition(AMNodeImpl node, AMNodeEvent nEvent) {
    AMNodeEventTaskAttemptEnded event = (AMNodeEventTaskAttemptEnded) nEvent;
    // Logged before any counting, so numFailedTAs is the value prior to this event.
    LOG.info("Attempt " + (event.failed() ? "failed" : "killed") + " on node: " + node.getNodeId()
        + " TA: " + event.getTaskAttemptId()
        + ", container: " + event.getContainerId() + ", numFailedTAs: "
        + node.numFailedTAs);

    // Attempts that did not fail are not counted.
    if (!event.failed()) {
      return AMNodeState.ACTIVE;
    }
    // Ignore duplicate attempt ids.
    if (!node.failedAttemptIds.add(event.getTaskAttemptId())) {
      return AMNodeState.ACTIVE;
    }

    // New failed attempt on this node.
    node.numFailedTAs++;
    // The tracker is consulted only when the node-local check passes.
    if (node.qualifiesForBlacklisting() && node.registerBadNodeAndShouldBlacklist()) {
      LOG.info("Too many task attempt failures. " +
          "Blacklisting node: " + node.getNodeId());
      node.blacklistSelf();
      return AMNodeState.BLACKLISTED;
    }
    // Stay in ACTIVE state. Move to FORCED_ACTIVE only when an explicit message is received.
    return AMNodeState.ACTIVE;
  }
}
/**
 * Runs when the node is reported unhealthy. If rescheduling on node updates is
 * enabled, every tracked container receives a node-failed event and the
 * success/failure counters are reset, forgetting past errors; otherwise nothing
 * is done.
 */
protected static class NodeTurnedUnhealthyTransition implements
    SingleArcTransition<AMNodeImpl, AMNodeEvent> {
  @Override
  public void transition(AMNodeImpl node, AMNodeEvent nEvent) {
    if (!node.nodeUpdatesRescheduleEnabled) {
      return;
    }
    for (ContainerId containerId : node.containers) {
      node.sendEvent(new AMContainerEventNodeFailed(containerId, "Node failed"));
    }
    // Resetting counters.
    node.numSuccessfulTAs = 0;
    node.numFailedTAs = 0;
  }
}
/**
 * Runs when "ignore blacklisting" is switched off. Clears the ignore flag and
 * re-evaluates the failures counted so far: the node blacklists itself and
 * moves to {@code BLACKLISTED} if it qualifies and the node tracker agrees,
 * otherwise it ends up in {@code ACTIVE}.
 */
protected static class IgnoreBlacklistingDisabledTransition implements
    MultipleArcTransition<AMNodeImpl, AMNodeEvent, AMNodeState> {
  @Override
  public AMNodeState transition(AMNodeImpl node, AMNodeEvent nEvent) {
    node.ignoreBlacklisting = false;

    // The tracker is consulted only when the node-local check passes.
    final boolean shouldBlacklist =
        node.qualifiesForBlacklisting() && node.registerBadNodeAndShouldBlacklist();
    if (!shouldBlacklist) {
      // Stay in ACTIVE state. Move to FORCED_ACTIVE only when an explicit message is received.
      return AMNodeState.ACTIVE;
    }

    LOG.info("Too many previous task failures after blacklisting re-enabled. " +
        "Blacklisting node: " + node.getNodeId());
    node.blacklistSelf();
    return AMNodeState.BLACKLISTED;
  }
}
/**
 * Records a change of the "ignore blacklisting" setting on the node. If the
 * node is in {@code BLACKLISTED} state when the transition runs, a
 * blacklist update with the flag set to {@code false} is sent to the scheduler
 * ({@code blacklistSelf()} sends the same event with {@code true}).
 *
 * <p>Instances are registered in the static transition table and therefore
 * shared by all nodes, so the configured value is held in a {@code final} field.
 */
protected static class IgnoreBlacklistingStateChangeTransition implements
    SingleArcTransition<AMNodeImpl, AMNodeEvent> {

  // Value written to AMNodeImpl#ignoreBlacklisting each time the transition runs.
  private final boolean ignore;

  /**
   * @param ignore the value to store in the node's ignore-blacklisting flag
   */
  public IgnoreBlacklistingStateChangeTransition(boolean ignore) {
    this.ignore = ignore;
  }

  @Override
  public void transition(AMNodeImpl node, AMNodeEvent nEvent) {
    node.ignoreBlacklisting = ignore;
    // getState() still reports the state the node is leaving at this point.
    if (node.getState() == AMNodeState.BLACKLISTED) {
      node.sendEvent(new AMSchedulerEventNodeBlacklistUpdate(node.getNodeId(), false, node.schedulerId));
    }
  }
}
/**
 * Handles a container allocated on a blacklisted node by sending that
 * container a {@code C_STOP_REQUEST} event. The container is not added to the
 * node's tracked set.
 */
protected static class ContainerAllocatedWhileBlacklistedTransition implements
    SingleArcTransition<AMNodeImpl, AMNodeEvent> {
  @Override
  public void transition(AMNodeImpl node, AMNodeEvent nEvent) {
    AMNodeEventContainerAllocated allocation = (AMNodeEventContainerAllocated) nEvent;
    AMContainerEvent stopRequest = new AMContainerEvent(allocation.getContainerId(),
        AMContainerEventType.C_STOP_REQUEST);
    node.sendEvent(stopRequest);
  }
}
/**
 * Counts a successful task attempt reported while the node is blacklisted.
 * The node always remains {@code BLACKLISTED}.
 */
protected static class TaskAttemptSucceededWhileBlacklistedTransition
    implements MultipleArcTransition<AMNodeImpl, AMNodeEvent, AMNodeState> {
  @Override
  public AMNodeState transition(AMNodeImpl node, AMNodeEvent nEvent) {
    node.numSuccessfulTAs += 1;
    // For now, always blacklisted. May change at a later point to re-enable
    // the node.
    return AMNodeState.BLACKLISTED;
  }
}
/**
 * Counts a failed task attempt without changing the node's state.
 *
 * <p>Each attempt id is counted at most once, matching
 * {@code TaskAttemptFailedTransition}: the id is recorded in
 * {@code failedAttemptIds}, so a duplicate failure event for the same attempt
 * does not inflate {@code numFailedTAs}, whichever state the node was in when
 * the failure was first seen. Attempts that ended without failing are ignored.
 */
protected static class CountFailedTaskAttemptTransition implements
    SingleArcTransition<AMNodeImpl, AMNodeEvent> {
  @Override
  public void transition(AMNodeImpl node, AMNodeEvent nEvent) {
    AMNodeEventTaskAttemptEnded event = (AMNodeEventTaskAttemptEnded) nEvent;
    // add() returns false for an id that was already counted.
    if (event.failed() && node.failedAttemptIds.add(event.getTaskAttemptId())) {
      node.numFailedTAs++;
    }
  }
}
/**
 * Handles an event that is not expected in the node's current state by logging
 * a warning; the event has no other effect.
 */
protected static class GenericErrorTransition implements
    SingleArcTransition<AMNodeImpl, AMNodeEvent> {
  @Override
  public void transition(AMNodeImpl node, AMNodeEvent nEvent) {
    final String message = "Invalid event: " + nEvent.getType() + " while in state: "
        + node.getState() + ". Ignoring." + " Event: " + nEvent;
    LOG.warn(message);
  }
}
/**
 * Handles a container allocated while the node is UNHEALTHY by logging the
 * allocation and sending the container a node-failed event. The container is
 * not added to the node's tracked set.
 */
protected static class ContainerAllocatedWhileUnhealthyTransition implements
    SingleArcTransition<AMNodeImpl, AMNodeEvent> {
  @Override
  public void transition(AMNodeImpl node, AMNodeEvent nEvent) {
    AMNodeEventContainerAllocated event = (AMNodeEventContainerAllocated) nEvent;
    // Log message previously misspelled "container" as "contaienr".
    LOG.info("Node: {} got allocated a container with id: {}"
        + " while in UNHEALTHY state. Releasing it.",
        node.getNodeId(), event.getContainerId());
    node.sendEvent(new AMContainerEventNodeFailed(event.getContainerId(),
        "new container assigned on failed node " + node.getNodeId()));
  }
}
/**
 * Runs when an UNHEALTHY node is reported healthy again. Clears the tracked
 * containers and returns {@code FORCED_ACTIVE} when blacklisting is currently
 * being ignored, {@code ACTIVE} otherwise.
 */
protected static class NodeTurnedHealthyTransition implements
    MultipleArcTransition<AMNodeImpl, AMNodeEvent, AMNodeState> {
  @Override
  public AMNodeState transition(AMNodeImpl node, AMNodeEvent nEvent) {
    node.containers.clear();
    return node.ignoreBlacklisting ? AMNodeState.FORCED_ACTIVE : AMNodeState.ACTIVE;
  }
}
/**
 * Records a completed container id in the node's {@code completedContainers}
 * set. Expects an {@link AMNodeEventContainerCompleted} event.
 */
protected static class ContainerCompletedTransition
    implements SingleArcTransition<AMNodeImpl, AMNodeEvent> {
  @Override
  public void transition(AMNodeImpl amNode, AMNodeEvent amNodeEvent) {
    ContainerId completed = ((AMNodeEventContainerCompleted) amNodeEvent).getContainerId();
    amNode.completedContainers.add(completed);
  }
}
/**
 * @return {@code true} when the node is currently in the UNHEALTHY state
 */
@Override
public boolean isUnhealthy() {
  final boolean unhealthy;
  readLock.lock();
  try {
    unhealthy = AMNodeState.UNHEALTHY == getState();
  } finally {
    readLock.unlock();
  }
  return unhealthy;
}
/**
 * @return {@code true} when the node is currently in the BLACKLISTED state
 */
@Override
public boolean isBlacklisted() {
  final boolean blacklisted;
  readLock.lock();
  try {
    blacklisted = AMNodeState.BLACKLISTED == getState();
  } finally {
    readLock.unlock();
  }
  return blacklisted;
}
/**
 * Reports whether the node can currently be used, i.e. it is neither
 * UNHEALTHY nor BLACKLISTED.
 *
 * <p>The state is read exactly once. Checking {@code isUnhealthy()} and
 * {@code isBlacklisted()} separately takes the lock twice, so a
 * BLACKLISTED to UNHEALTHY transition between the two reads could make both
 * checks return {@code false} and report an unusable node as usable.
 *
 * @return {@code true} when the node is neither UNHEALTHY nor BLACKLISTED
 */
@Override
public boolean isUsable() {
  final AMNodeState state = getState();
  return state != AMNodeState.UNHEALTHY && state != AMNodeState.BLACKLISTED;
}
/**
 * Cleans up container bookkeeping when a DAG completes: every container
 * recorded as completed is dropped from the tracked set and the
 * completed-container set is emptied. Runs under the write lock.
 *
 * @param dag the DAG that completed; not used by this implementation
 */
@Override
public void dagComplete(DAG dag) {
  writeLock.lock();
  try {
    // Sizes captured before the cleanup, for the debug message only.
    final int trackedBefore = containers.size();
    final int completedCount = completedContainers.size();

    // Actual functionality.
    containers.removeAll(completedContainers);
    completedContainers.clear();

    if (LOG.isDebugEnabled()) {
      LOG.debug(
          "Node {}, cleaning up knownContainers. current={}, completed={}, postCleanup={}",
          getNodeId(), trackedBefore, completedCount, containers.size());
    }
  } finally {
    writeLock.unlock();
  }
}
}