blob: 4731a249c515150d35ce2bdc44cce1fd5f520d78 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.tez.dag.app.rm.container;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.event.Event;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.state.InvalidStateTransitonException;
import org.apache.hadoop.yarn.state.MultipleArcTransition;
import org.apache.hadoop.yarn.state.SingleArcTransition;
import org.apache.hadoop.yarn.state.StateMachine;
import org.apache.hadoop.yarn.state.StateMachineFactory;
import org.apache.tez.dag.app.AppContext;
import org.apache.tez.dag.app.ContainerHeartbeatHandler;
import org.apache.tez.dag.app.ContainerContext;
import org.apache.tez.dag.app.TaskAttemptListener;
import org.apache.tez.dag.app.dag.event.DiagnosableEvent;
import org.apache.tez.dag.app.dag.event.TaskAttemptEventContainerTerminated;
import org.apache.tez.dag.app.dag.event.TaskAttemptEventContainerTerminating;
import org.apache.tez.dag.app.dag.event.TaskAttemptEventNodeFailed;
import org.apache.tez.dag.app.rm.AMSchedulerEventContainerCompleted;
import org.apache.tez.dag.app.rm.AMSchedulerEventDeallocateContainer;
import org.apache.tez.dag.app.rm.NMCommunicatorLaunchRequestEvent;
import org.apache.tez.dag.app.rm.NMCommunicatorStopRequestEvent;
import org.apache.tez.dag.records.TezTaskAttemptID;
//import org.apache.tez.dag.app.dag.event.TaskAttemptEventDiagnosticsUpdate;
import org.apache.tez.runtime.api.impl.TaskSpec;
@SuppressWarnings("rawtypes")
public class AMContainerImpl implements AMContainer {
private static final Log LOG = LogFactory.getLog(AMContainerImpl.class);
private final ReadLock readLock;
private final WriteLock writeLock;
private final ContainerId containerId;
// Container to be used for getters on capability, locality etc.
private final Container container;
private final AppContext appContext;
private final ContainerHeartbeatHandler containerHeartbeatHandler;
private final TaskAttemptListener taskAttemptListener;
protected final EventHandler eventHandler;
private final List<TezTaskAttemptID> completedAttempts = new LinkedList<TezTaskAttemptID>();
// TODO Maybe this should be pulled from the TaskAttempt.s
private final Map<TezTaskAttemptID, TaskSpec> remoteTaskMap =
new HashMap<TezTaskAttemptID, TaskSpec>();
// TODO ?? Convert to list and hash.
private long idleTimeBetweenTasks = 0;
private long lastTaskFinishTime;
// An assign can happen even during wind down. e.g. NodeFailure caused the
// wind down, and an allocation was pending in the AMScheduler. This could
// be modelled as a separate state.
private boolean nodeFailed = false;
private String nodeFailedMessage;
private TezTaskAttemptID pendingAttempt;
private TezTaskAttemptID runningAttempt;
private List<TezTaskAttemptID> failedAssignments;
private TezTaskAttemptID pullAttempt;
private AMContainerTask noAllocationContainerTask;
private static final AMContainerTask NO_MORE_TASKS = new AMContainerTask(
true, null);
private static final AMContainerTask WAIT_TASK = new AMContainerTask(false,
null);
private boolean inError = false;
private ContainerLaunchContext clc;
// TODO Consider registering with the TAL, instead of the TAL pulling.
// Possibly after splitting TAL and ContainerListener.
// TODO What should be done with pendingAttempts. Nullify when handled ?
// Add them to failed ta list ? Some historic information should be maintained.
// TODO Create a generic ERROR state. Container tries informing relevant components in this case.
private final StateMachine<AMContainerState, AMContainerEventType, AMContainerEvent> stateMachine;
private static final StateMachineFactory
<AMContainerImpl, AMContainerState, AMContainerEventType, AMContainerEvent>
stateMachineFactory =
new StateMachineFactory<AMContainerImpl, AMContainerState, AMContainerEventType, AMContainerEvent>(
AMContainerState.ALLOCATED)
.addTransition(AMContainerState.ALLOCATED, AMContainerState.LAUNCHING, AMContainerEventType.C_LAUNCH_REQUEST, new LaunchRequestTransition())
.addTransition(AMContainerState.ALLOCATED, AMContainerState.COMPLETED, AMContainerEventType.C_ASSIGN_TA, new AssignTaskAttemptAtAllocatedTransition())
.addTransition(AMContainerState.ALLOCATED, AMContainerState.COMPLETED, AMContainerEventType.C_COMPLETED, new CompletedAtAllocatedTransition())
.addTransition(AMContainerState.ALLOCATED, AMContainerState.COMPLETED, AMContainerEventType.C_STOP_REQUEST, new StopRequestAtAllocatedTransition())
.addTransition(AMContainerState.ALLOCATED, AMContainerState.COMPLETED, AMContainerEventType.C_NODE_FAILED, new NodeFailedAtAllocatedTransition())
.addTransition(AMContainerState.ALLOCATED, AMContainerState.COMPLETED, EnumSet.of(AMContainerEventType.C_LAUNCHED, AMContainerEventType.C_LAUNCH_FAILED, AMContainerEventType.C_PULL_TA, AMContainerEventType.C_TA_SUCCEEDED, AMContainerEventType.C_NM_STOP_SENT, AMContainerEventType.C_NM_STOP_FAILED, AMContainerEventType.C_TIMED_OUT), new ErrorTransition())
.addTransition(AMContainerState.LAUNCHING, EnumSet.of(AMContainerState.LAUNCHING, AMContainerState.STOP_REQUESTED), AMContainerEventType.C_ASSIGN_TA, new AssignTaskAttemptTransition())
.addTransition(AMContainerState.LAUNCHING, AMContainerState.IDLE, AMContainerEventType.C_LAUNCHED, new LaunchedTransition())
.addTransition(AMContainerState.LAUNCHING, AMContainerState.STOPPING, AMContainerEventType.C_LAUNCH_FAILED, new LaunchFailedTransition())
// TODO CREUSE : Maybe, consider sending back an attempt if the container asks for one in this state. Waiting for a LAUNCHED event from the NMComm may delay the task allocation.
.addTransition(AMContainerState.LAUNCHING, AMContainerState.LAUNCHING, AMContainerEventType.C_PULL_TA) // Is assuming the pullAttempt will be null.
.addTransition(AMContainerState.LAUNCHING, AMContainerState.COMPLETED, AMContainerEventType.C_COMPLETED, new CompletedAtLaunchingTransition())
.addTransition(AMContainerState.LAUNCHING, AMContainerState.STOP_REQUESTED, AMContainerEventType.C_STOP_REQUEST, new StopRequestAtLaunchingTransition())
.addTransition(AMContainerState.LAUNCHING, AMContainerState.STOPPING, AMContainerEventType.C_NODE_FAILED, new NodeFailedAtLaunchingTransition())
.addTransition(AMContainerState.LAUNCHING, AMContainerState.STOP_REQUESTED, EnumSet.of(AMContainerEventType.C_LAUNCH_REQUEST, AMContainerEventType.C_TA_SUCCEEDED, AMContainerEventType.C_NM_STOP_SENT, AMContainerEventType.C_NM_STOP_FAILED, AMContainerEventType.C_TIMED_OUT), new ErrorAtLaunchingTransition())
.addTransition(AMContainerState.IDLE, EnumSet.of(AMContainerState.IDLE, AMContainerState.STOP_REQUESTED), AMContainerEventType.C_ASSIGN_TA, new AssignTaskAttemptAtIdleTransition())
.addTransition(AMContainerState.IDLE, EnumSet.of(AMContainerState.RUNNING, AMContainerState.IDLE), AMContainerEventType.C_PULL_TA, new PullTAAtIdleTransition())
.addTransition(AMContainerState.IDLE, AMContainerState.COMPLETED, AMContainerEventType.C_COMPLETED, new CompletedAtIdleTransition())
.addTransition(AMContainerState.IDLE, AMContainerState.STOP_REQUESTED, AMContainerEventType.C_STOP_REQUEST, new StopRequestAtIdleTransition())
.addTransition(AMContainerState.IDLE, AMContainerState.STOP_REQUESTED, AMContainerEventType.C_TIMED_OUT, new TimedOutAtIdleTransition())
.addTransition(AMContainerState.IDLE, AMContainerState.STOPPING, AMContainerEventType.C_NODE_FAILED, new NodeFailedAtIdleTransition())
.addTransition(AMContainerState.IDLE, AMContainerState.STOP_REQUESTED, EnumSet.of(AMContainerEventType.C_LAUNCH_REQUEST, AMContainerEventType.C_LAUNCHED, AMContainerEventType.C_LAUNCH_FAILED, AMContainerEventType.C_TA_SUCCEEDED, AMContainerEventType.C_NM_STOP_SENT, AMContainerEventType.C_NM_STOP_FAILED), new ErrorAtIdleTransition())
.addTransition(AMContainerState.RUNNING, AMContainerState.STOP_REQUESTED, AMContainerEventType.C_ASSIGN_TA, new AssignTaskAttemptAtRunningTransition())
.addTransition(AMContainerState.RUNNING, AMContainerState.RUNNING, AMContainerEventType.C_PULL_TA)
.addTransition(AMContainerState.RUNNING, AMContainerState.IDLE, AMContainerEventType.C_TA_SUCCEEDED, new TASucceededAtRunningTransition())
.addTransition(AMContainerState.RUNNING, AMContainerState.COMPLETED, AMContainerEventType.C_COMPLETED, new CompletedAtRunningTransition())
.addTransition(AMContainerState.RUNNING, AMContainerState.STOP_REQUESTED, AMContainerEventType.C_STOP_REQUEST, new StopRequestAtRunningTransition())
.addTransition(AMContainerState.RUNNING, AMContainerState.STOP_REQUESTED, AMContainerEventType.C_TIMED_OUT, new TimedOutAtRunningTransition())
.addTransition(AMContainerState.RUNNING, AMContainerState.STOPPING, AMContainerEventType.C_NODE_FAILED, new NodeFailedAtRunningTransition())
.addTransition(AMContainerState.RUNNING, AMContainerState.STOP_REQUESTED, EnumSet.of(AMContainerEventType.C_LAUNCH_REQUEST, AMContainerEventType.C_LAUNCHED, AMContainerEventType.C_LAUNCH_FAILED, AMContainerEventType.C_NM_STOP_SENT, AMContainerEventType.C_NM_STOP_FAILED), new ErrorAtRunningTransition())
.addTransition(AMContainerState.STOP_REQUESTED, AMContainerState.STOP_REQUESTED, AMContainerEventType.C_ASSIGN_TA, new AssignTAAtWindDownTransition())
.addTransition(AMContainerState.STOP_REQUESTED, AMContainerState.STOP_REQUESTED, AMContainerEventType.C_PULL_TA, new PullTAAfterStopTransition())
.addTransition(AMContainerState.STOP_REQUESTED, AMContainerState.COMPLETED, AMContainerEventType.C_COMPLETED, new CompletedAtWindDownTransition())
.addTransition(AMContainerState.STOP_REQUESTED, AMContainerState.STOPPING, AMContainerEventType.C_NM_STOP_SENT)
.addTransition(AMContainerState.STOP_REQUESTED, AMContainerState.STOPPING, AMContainerEventType.C_NM_STOP_FAILED, new NMStopRequestFailedTransition())
.addTransition(AMContainerState.STOP_REQUESTED, AMContainerState.STOPPING, AMContainerEventType.C_NODE_FAILED, new NodeFailedAtNMStopRequestedTransition())
.addTransition(AMContainerState.STOP_REQUESTED, AMContainerState.STOP_REQUESTED, EnumSet.of(AMContainerEventType.C_LAUNCHED, AMContainerEventType.C_LAUNCH_FAILED, AMContainerEventType.C_TA_SUCCEEDED, AMContainerEventType.C_STOP_REQUEST, AMContainerEventType.C_TIMED_OUT))
.addTransition(AMContainerState.STOP_REQUESTED, AMContainerState.STOP_REQUESTED, AMContainerEventType.C_LAUNCH_REQUEST, new ErrorAtNMStopRequestedTransition())
.addTransition(AMContainerState.STOPPING, AMContainerState.STOPPING, AMContainerEventType.C_ASSIGN_TA, new AssignTAAtWindDownTransition())
.addTransition(AMContainerState.STOPPING, AMContainerState.STOPPING, AMContainerEventType.C_PULL_TA, new PullTAAfterStopTransition())
// TODO This transition is wrong. Should be a noop / error.
.addTransition(AMContainerState.STOPPING, AMContainerState.COMPLETED, AMContainerEventType.C_COMPLETED, new CompletedAtWindDownTransition())
.addTransition(AMContainerState.STOPPING, AMContainerState.STOPPING, AMContainerEventType.C_NODE_FAILED, new NodeFailedBaseTransition())
.addTransition(AMContainerState.STOPPING, AMContainerState.STOPPING, EnumSet.of(AMContainerEventType.C_LAUNCHED, AMContainerEventType.C_LAUNCH_FAILED, AMContainerEventType.C_TA_SUCCEEDED, AMContainerEventType.C_STOP_REQUEST, AMContainerEventType.C_NM_STOP_SENT, AMContainerEventType.C_NM_STOP_FAILED, AMContainerEventType.C_TIMED_OUT))
.addTransition(AMContainerState.STOPPING, AMContainerState.STOPPING, AMContainerEventType.C_LAUNCH_REQUEST, new ErrorAtStoppingTransition())
.addTransition(AMContainerState.COMPLETED, AMContainerState.COMPLETED, AMContainerEventType.C_ASSIGN_TA, new AssignTAAtCompletedTransition())
.addTransition(AMContainerState.COMPLETED, AMContainerState.COMPLETED, AMContainerEventType.C_PULL_TA, new PullTAAfterStopTransition())
.addTransition(AMContainerState.COMPLETED, AMContainerState.COMPLETED, AMContainerEventType.C_NODE_FAILED, new NodeFailedBaseTransition())
.addTransition(AMContainerState.COMPLETED, AMContainerState.COMPLETED, EnumSet.of(AMContainerEventType.C_LAUNCH_REQUEST, AMContainerEventType.C_LAUNCHED, AMContainerEventType.C_LAUNCH_FAILED, AMContainerEventType.C_TA_SUCCEEDED, AMContainerEventType.C_COMPLETED, AMContainerEventType.C_STOP_REQUEST, AMContainerEventType.C_NM_STOP_SENT, AMContainerEventType.C_NM_STOP_FAILED, AMContainerEventType.C_TIMED_OUT))
.installTopology();
// Note: Containers will not reach their final state if the RM link is broken,
// AM shutdown should not wait for this.
// Attempting to use a container based purely on reosurces required, etc needs
// additional change - JvmID, YarnChild, etc depend on TaskType.
public AMContainerImpl(Container container, ContainerHeartbeatHandler chh,
TaskAttemptListener tal, AppContext appContext) {
ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
this.readLock = rwLock.readLock();
this.writeLock = rwLock.writeLock();
this.container = container;
this.containerId = container.getId();
this.eventHandler = appContext.getEventHandler();
this.appContext = appContext;
this.containerHeartbeatHandler = chh;
this.taskAttemptListener = tal;
this.failedAssignments = new LinkedList<TezTaskAttemptID>();
this.noAllocationContainerTask = WAIT_TASK;
this.stateMachine = stateMachineFactory.make(this);
}
@Override
public AMContainerState getState() {
readLock.lock();
try {
return stateMachine.getCurrentState();
} finally {
readLock.unlock();
}
}
@Override
public ContainerId getContainerId() {
return this.containerId;
}
@Override
public Container getContainer() {
return this.container;
}
@Override
public List<TezTaskAttemptID> getAllTaskAttempts() {
readLock.lock();
try {
List<TezTaskAttemptID> allAttempts = new LinkedList<TezTaskAttemptID>();
allAttempts.addAll(this.completedAttempts);
allAttempts.addAll(this.failedAssignments);
if (this.pendingAttempt != null) {
allAttempts.add(this.pendingAttempt);
}
if (this.runningAttempt != null) {
allAttempts.add(this.runningAttempt);
}
return allAttempts;
} finally {
readLock.unlock();
}
}
@Override
public List<TezTaskAttemptID> getQueuedTaskAttempts() {
readLock.lock();
try {
if (pendingAttempt != null) {
return Collections.singletonList(this.pendingAttempt);
} else {
return Collections.emptyList();
}
} finally {
readLock.unlock();
}
}
@Override
public TezTaskAttemptID getRunningTaskAttempt() {
readLock.lock();
try {
return this.runningAttempt;
} finally {
readLock.unlock();
}
}
public boolean isInErrorState() {
return inError;
}
@Override
public void handle(AMContainerEvent event) {
if (LOG.isDebugEnabled()) {
LOG.debug("Processing AMContainerEvent " + event.getContainerId()
+ " of type " + event.getType() + " while in state: " + getState()
+ ". Event: " + event);
}
this.writeLock.lock();
try {
final AMContainerState oldState = getState();
try {
stateMachine.doTransition(event.getType(), event);
} catch (InvalidStateTransitonException e) {
LOG.error("Can't handle event " + event.getType()
+ " at current state " + oldState + " for ContainerId "
+ this.containerId, e);
inError = true;
// TODO Can't set state to COMPLETED. Add a default error state.
}
if (oldState != getState()) {
LOG.info("AMContainer " + this.containerId + " transitioned from "
+ oldState + " to " + getState());
}
} finally {
writeLock.unlock();
}
}
@SuppressWarnings("unchecked")
private void sendEvent(Event<?> event) {
this.eventHandler.handle(event);
}
// Push the TaskAttempt to the TAL, instead of the TAL pulling when a JVM asks
// for a TaskAttempt.
public AMContainerTask pullTaskContext() {
this.writeLock.lock();
try {
this.handle(
new AMContainerEvent(containerId, AMContainerEventType.C_PULL_TA));
if (pullAttempt == null) {
return noAllocationContainerTask;
} else {
return new AMContainerTask(false, remoteTaskMap.remove(pullAttempt));
}
} finally {
this.pullAttempt = null;
this.writeLock.unlock();
}
}
//////////////////////////////////////////////////////////////////////////////
// Start of Transition Classes //
//////////////////////////////////////////////////////////////////////////////
protected static class LaunchRequestTransition implements
SingleArcTransition<AMContainerImpl, AMContainerEvent> {
@Override
public void transition(AMContainerImpl container, AMContainerEvent cEvent) {
AMContainerEventLaunchRequest event = (AMContainerEventLaunchRequest) cEvent;
ContainerContext containerContext = event.getContainerContext();
container.clc = AMContainerHelpers.createContainerLaunchContext(
container.appContext.getApplicationACLs(),
container.getContainerId(),
containerContext.getLocalResources(),
containerContext.getEnvironment(),
containerContext.getJavaOpts(),
container.taskAttemptListener, containerContext.getCredentials(),
event.shouldProfile(), container.appContext);
// Registering now, so that in case of delayed NM response, the child
// task is not told to die since the TAL does not know about the container.
container.registerWithTAListener();
container.sendStartRequestToNM();
LOG.info("Sending Launch Request for Container with id: " +
container.container.getId());
// Forget about the clc to save resources. At some point, part of the clc
// info may need to be exposed to the scheduler to figure out whether a
// container can be used for a specific TaskAttempt.
container.clc = null;
}
}
protected static class AssignTaskAttemptAtAllocatedTransition implements
SingleArcTransition<AMContainerImpl, AMContainerEvent> {
@Override
public void transition(AMContainerImpl container, AMContainerEvent cEvent) {
AMContainerEventAssignTA event = (AMContainerEventAssignTA) cEvent;
container.inError = true;
container.registerFailedAttempt(event.getTaskAttemptId());
container.maybeSendNodeFailureForFailedAssignment(event
.getTaskAttemptId());
container.sendTerminatedToTaskAttempt(event.getTaskAttemptId(),
"AMScheduler Error: TaskAttempt allocated to unlaunched container: " +
container.getContainerId());
container.sendCompletedToScheduler();
container.deAllocate();
LOG.warn("Unexpected TA Assignment: TAId: " + event.getTaskAttemptId() +
" for ContainerId: " + container.getContainerId() +
" while in state: " + container.getState());
}
}
protected static class CompletedAtAllocatedTransition implements
SingleArcTransition<AMContainerImpl, AMContainerEvent> {
@Override
public void transition(AMContainerImpl container, AMContainerEvent cEvent) {
AMContainerEventCompleted event = (AMContainerEventCompleted)cEvent;
container.sendCompletedToScheduler();
String diag = event.getContainerStatus().getDiagnostics();
if (!(diag == null || diag.equals(""))) {
LOG.info("Container " + container.getContainerId()
+ " exited with diagnostics set to " + diag);
}
}
}
protected static class StopRequestAtAllocatedTransition implements
SingleArcTransition<AMContainerImpl, AMContainerEvent> {
@Override
public void transition(AMContainerImpl container, AMContainerEvent cEvent) {
container.nodeFailed = true;
if (cEvent instanceof DiagnosableEvent) {
container.nodeFailedMessage = ((DiagnosableEvent) cEvent)
.getDiagnosticInfo();
}
// TODO why are these sent. no need to send these now.
container.sendCompletedToScheduler();
container.deAllocate();
}
}
protected static class NodeFailedAtAllocatedTransition extends
NodeFailedBaseTransition {
@Override
public void transition(AMContainerImpl container, AMContainerEvent cEvent) {
super.transition(container, cEvent);
container.sendCompletedToScheduler();
container.deAllocate();
}
}
protected static class ErrorTransition extends ErrorBaseTransition {
@Override
public void transition(AMContainerImpl container, AMContainerEvent cEvent) {
super.transition(container, cEvent);
container.sendCompletedToScheduler();
container.deAllocate();
LOG.info(
"Unexpected event type: " + cEvent.getType() + " while in state: " +
container.getState() + ". Event: " + cEvent);
}
}
protected static class AssignTaskAttemptTransition implements
MultipleArcTransition<AMContainerImpl, AMContainerEvent, AMContainerState> {
@Override
public AMContainerState transition(
AMContainerImpl container, AMContainerEvent cEvent) {
AMContainerEventAssignTA event = (AMContainerEventAssignTA) cEvent;
if (container.pendingAttempt != null) {
// This may include a couple of additional (harmless) unregister calls
// to the taskAttemptListener and containerHeartbeatHandler - in case
// of assign at any state prior to IDLE.
container.handleExtraTAAssign(event, container.pendingAttempt);
// TODO XXX: Verify that it's ok to send in a NM_STOP_REQUEST. The
// NMCommunicator should be able to handle this. The STOP_REQUEST would
// only go out after the START_REQUEST.
return AMContainerState.STOP_REQUESTED;
}
container.pendingAttempt = event.getTaskAttemptId();
if (LOG.isDebugEnabled()) {
LOG.debug("AssignTA: attempt: " + event.getRemoteTaskSpec());
}
container.remoteTaskMap
.put(event.getTaskAttemptId(), event.getRemoteTaskSpec());
return container.getState();
}
}
protected static class LaunchedTransition implements
SingleArcTransition<AMContainerImpl, AMContainerEvent> {
@Override
public void transition(AMContainerImpl container, AMContainerEvent cEvent) {
container.registerWithContainerListener();
}
}
protected static class LaunchFailedTransition implements
SingleArcTransition<AMContainerImpl, AMContainerEvent> {
@Override
public void transition(AMContainerImpl container, AMContainerEvent cEvent) {
if (container.pendingAttempt != null) {
AMContainerEventLaunchFailed event = (AMContainerEventLaunchFailed) cEvent;
container.sendTerminatingToTaskAttempt(container.pendingAttempt,
event.getMessage());
}
container.unregisterFromTAListener();
container.deAllocate();
}
}
protected static class CompletedAtLaunchingTransition
implements SingleArcTransition<AMContainerImpl, AMContainerEvent> {
@Override
public void transition(AMContainerImpl container, AMContainerEvent cEvent) {
AMContainerEventCompleted event = (AMContainerEventCompleted) cEvent;
if (container.pendingAttempt != null) {
String errorMessage = getMessage(container, event);
container.sendTerminatedToTaskAttempt(container.pendingAttempt,
errorMessage);
container.registerFailedAttempt(container.pendingAttempt);
container.pendingAttempt = null;
LOG.warn(errorMessage);
}
container.unregisterFromTAListener();
container.sendCompletedToScheduler();
String diag = event.getContainerStatus().getDiagnostics();
if (!(diag == null || diag.equals(""))) {
LOG.info("Container " + container.getContainerId()
+ " exited with diagnostics set to " + diag);
}
}
public String getMessage(AMContainerImpl container,
AMContainerEventCompleted event) {
return "Container" + container.getContainerId()
+ " COMPLETED while trying to launch. Diagnostics: ["
+ event.getContainerStatus().getDiagnostics() +"]";
}
}
protected static class StopRequestAtLaunchingTransition
implements SingleArcTransition<AMContainerImpl, AMContainerEvent> {
@Override
public void transition(AMContainerImpl container, AMContainerEvent cEvent) {
if (container.pendingAttempt != null) {
container.sendTerminatingToTaskAttempt(container.pendingAttempt,
getMessage(container, cEvent));
}
container.unregisterFromTAListener();
container.sendStopRequestToNM();
}
public String getMessage(
AMContainerImpl container, AMContainerEvent event) {
return "Container " + container.getContainerId() +
" received a STOP_REQUEST";
}
}
protected static class NodeFailedBaseTransition
implements SingleArcTransition<AMContainerImpl, AMContainerEvent> {
@Override
public void transition(AMContainerImpl container, AMContainerEvent cEvent) {
container.nodeFailed = true;
String errorMessage = null;
if (cEvent instanceof DiagnosableEvent) {
errorMessage = ((DiagnosableEvent) cEvent).getDiagnosticInfo();
}
for (TezTaskAttemptID taId : container.failedAssignments) {
container.sendNodeFailureToTA(taId, errorMessage);
}
for (TezTaskAttemptID taId : container.completedAttempts) {
// TODO XXX: Make sure TaskAttempt knows how to handle kills to REDUCEs.
container.sendNodeFailureToTA(taId, errorMessage);
}
if (container.pendingAttempt != null) {
// Will be null in COMPLETED state.
container.sendNodeFailureToTA(container.pendingAttempt, errorMessage);
container.sendTerminatingToTaskAttempt(container.pendingAttempt, "Node failure");
}
if (container.runningAttempt != null) {
// Will be null in COMPLETED state.
container.sendNodeFailureToTA(container.runningAttempt, errorMessage);
container.sendTerminatingToTaskAttempt(container.runningAttempt, "Node failure");
}
}
}
protected static class NodeFailedAtLaunchingTransition
extends NodeFailedBaseTransition {
@Override
public void transition(AMContainerImpl container, AMContainerEvent cEvent) {
super.transition(container, cEvent);
container.unregisterFromTAListener();
container.deAllocate();
}
}
protected static class ErrorAtLaunchingTransition
extends ErrorBaseTransition {
@Override
public void transition(AMContainerImpl container, AMContainerEvent cEvent) {
super.transition(container, cEvent);
if (container.pendingAttempt != null) {
container.sendTerminatingToTaskAttempt(container.pendingAttempt,
"Container " + container.getContainerId() +
" hit an invalid transition - " + cEvent.getType() + " at " +
container.getState());
}
container.sendStopRequestToNM();
container.unregisterFromTAListener();
}
}
protected static class AssignTaskAttemptAtIdleTransition
extends AssignTaskAttemptTransition {
@Override
public AMContainerState transition(
AMContainerImpl container, AMContainerEvent cEvent) {
if (LOG.isDebugEnabled()) {
LOG.debug("AssignTAAtIdle: attempt: " +
((AMContainerEventAssignTA) cEvent).getRemoteTaskSpec());
}
return super.transition(container, cEvent);
}
}
protected static class PullTAAtIdleTransition implements
MultipleArcTransition<AMContainerImpl, AMContainerEvent, AMContainerState> {
@Override
public AMContainerState transition(
AMContainerImpl container, AMContainerEvent cEvent) {
if (container.pendingAttempt != null) {
// This will be invoked as part of the PULL_REQUEST - so pullAttempt pullAttempt
// should ideally only end up being populated during the duration of this call,
// which is in a write lock. pullRequest() should move this to the running state.
container.pullAttempt = container.pendingAttempt;
container.runningAttempt = container.pendingAttempt;
container.pendingAttempt = null;
if (container.lastTaskFinishTime != 0) {
long idleTimeDiff =
System.currentTimeMillis() - container.lastTaskFinishTime;
container.idleTimeBetweenTasks += idleTimeDiff;
if (LOG.isDebugEnabled()) {
LOG.debug("Computing idle time for container: " +
container.getContainerId() + ", lastFinishTime: " +
container.lastTaskFinishTime + ", Incremented by: " +
idleTimeDiff);
}
}
LOG.info("Assigned taskAttempt + [" + container.runningAttempt +
"] to container: [" + container.getContainerId() + "]");
return AMContainerState.RUNNING;
} else {
return AMContainerState.IDLE;
}
}
}
protected static class CompletedAtIdleTransition
extends CompletedAtLaunchingTransition {
@Override
public void transition(AMContainerImpl container, AMContainerEvent cEvent) {
super.transition(container, cEvent);
container.unregisterFromContainerListener();
if (LOG.isDebugEnabled()) {
LOG.debug("TotalIdleTimeBetweenTasks for container: "
+ container.getContainerId() + " = "
+ container.idleTimeBetweenTasks);
}
}
@Override
public String getMessage(
AMContainerImpl container, AMContainerEventCompleted event) {
return "Container " + container.getContainerId() + " COMPLETED"
+ " with diagnostics set to ["
+ event.getContainerStatus().getDiagnostics() + "]";
}
}
protected static class StopRequestAtIdleTransition
extends StopRequestAtLaunchingTransition {
@Override
public void transition(AMContainerImpl container, AMContainerEvent cEvent) {
super.transition(container, cEvent);
container.unregisterFromContainerListener();
}
}
protected static class TimedOutAtIdleTransition
extends StopRequestAtIdleTransition {
public String getMessage(
AMContainerImpl container, AMContainerEvent event) {
return "Container " + container.getContainerId() +
" timed out";
}
}
protected static class NodeFailedAtIdleTransition
extends NodeFailedAtLaunchingTransition {
@Override
public void transition(AMContainerImpl container, AMContainerEvent cEvent) {
super.transition(container, cEvent);
container.unregisterFromContainerListener();
}
}
protected static class ErrorAtIdleTransition
extends ErrorAtLaunchingTransition {
@Override
public void transition(AMContainerImpl container, AMContainerEvent cEvent) {
super.transition(container, cEvent);
container.unregisterFromContainerListener();
}
}
protected static class AssignTaskAttemptAtRunningTransition implements
SingleArcTransition<AMContainerImpl, AMContainerEvent> {
@Override
public void transition(AMContainerImpl container, AMContainerEvent cEvent) {
AMContainerEventAssignTA event = (AMContainerEventAssignTA) cEvent;
container.unregisterAttemptFromListener(container.runningAttempt);
container.handleExtraTAAssign(event, container.runningAttempt);
}
}
protected static class TASucceededAtRunningTransition
implements SingleArcTransition<AMContainerImpl, AMContainerEvent> {
@Override
public void transition(AMContainerImpl container, AMContainerEvent cEvent) {
container.lastTaskFinishTime = System.currentTimeMillis();
container.completedAttempts.add(container.runningAttempt);
container.unregisterAttemptFromListener(container.runningAttempt);
container.runningAttempt = null;
}
}
protected static class CompletedAtRunningTransition
extends CompletedAtIdleTransition {
@Override
public void transition(AMContainerImpl container, AMContainerEvent cEvent) {
AMContainerEventCompleted event = (AMContainerEventCompleted) cEvent;
container.sendTerminatedToTaskAttempt(container.runningAttempt,
getMessage(container, event));
container.unregisterAttemptFromListener(container.runningAttempt);
container.registerFailedAttempt(container.runningAttempt);
container.runningAttempt = null;
super.transition(container, cEvent);
}
}
protected static class StopRequestAtRunningTransition
extends StopRequestAtIdleTransition {
public void transition(AMContainerImpl container, AMContainerEvent cEvent) {
container.unregisterAttemptFromListener(container.runningAttempt);
container.sendTerminatingToTaskAttempt(container.runningAttempt,
" Container" + container.getContainerId() +
" received a STOP_REQUEST");
super.transition(container, cEvent);
}
}
protected static class TimedOutAtRunningTransition
extends StopRequestAtRunningTransition {
@Override
public String getMessage(
AMContainerImpl container, AMContainerEvent event) {
return "Container " + container.getContainerId() +
" timed out";
}
}
protected static class NodeFailedAtRunningTransition
extends NodeFailedAtIdleTransition {
@Override
public void transition(AMContainerImpl container, AMContainerEvent cEvent) {
super.transition(container, cEvent);
container.unregisterAttemptFromListener(container.runningAttempt);
}
}
protected static class ErrorAtRunningTransition
extends ErrorAtIdleTransition {
@Override
public void transition(AMContainerImpl container, AMContainerEvent cEvent) {
super.transition(container, cEvent);
container.unregisterAttemptFromListener(container.runningAttempt);
container.sendTerminatingToTaskAttempt(container.runningAttempt,
"Container " + container.getContainerId() +
" hit an invalid transition - " + cEvent.getType() + " at " +
container.getState());
}
}
protected static class AssignTAAtWindDownTransition
implements SingleArcTransition<AMContainerImpl, AMContainerEvent> {
@Override
public void transition(AMContainerImpl container, AMContainerEvent cEvent) {
AMContainerEventAssignTA event = (AMContainerEventAssignTA) cEvent;
container.inError = true;
String errorMessage = "AttemptId: " + event.getTaskAttemptId() +
" cannot be allocated to container: " + container.getContainerId() +
" in " + container.getState() + " state";
container.maybeSendNodeFailureForFailedAssignment(event.getTaskAttemptId());
container.sendTerminatingToTaskAttempt(event.getTaskAttemptId(), errorMessage);
container.registerFailedAttempt(event.getTaskAttemptId());
}
}
// Hack to some extent. This allocation should be done while entering one of
// the post-running states, insetad of being a transition on the post stop
// states.
protected static class PullTAAfterStopTransition
implements SingleArcTransition<AMContainerImpl, AMContainerEvent> {
@Override
public void transition(AMContainerImpl container, AMContainerEvent cEvent) {
container.noAllocationContainerTask = NO_MORE_TASKS;
}
}
protected static class CompletedAtWindDownTransition implements
SingleArcTransition<AMContainerImpl, AMContainerEvent> {
@Override
public void transition(AMContainerImpl container, AMContainerEvent cEvent) {
AMContainerEventCompleted event = (AMContainerEventCompleted) cEvent;
String diag = event.getContainerStatus().getDiagnostics();
for (TezTaskAttemptID taId : container.failedAssignments) {
container.sendTerminatedToTaskAttempt(taId, diag);
}
if (container.pendingAttempt != null) {
container.sendTerminatedToTaskAttempt(container.pendingAttempt, diag);
container.registerFailedAttempt(container.pendingAttempt);
container.pendingAttempt = null;
}
if (container.runningAttempt != null) {
container.sendTerminatedToTaskAttempt(container.runningAttempt, diag);
container.registerFailedAttempt(container.runningAttempt);
container.runningAttempt = null;
}
if (!(diag == null || diag.equals(""))) {
LOG.info("Container " + container.getContainerId()
+ " exited with diagnostics set to " + diag);
}
container.sendCompletedToScheduler();
}
}
protected static class NMStopRequestFailedTransition
implements SingleArcTransition<AMContainerImpl, AMContainerEvent> {
@Override
public void transition(AMContainerImpl container, AMContainerEvent cEvent) {
container.deAllocate();
}
}
protected static class NodeFailedAtNMStopRequestedTransition
extends NodeFailedBaseTransition {
public void transition(AMContainerImpl container, AMContainerEvent cEvent) {
super.transition(container, cEvent);
container.deAllocate();
}
}
protected static class ErrorAtNMStopRequestedTransition
extends ErrorBaseTransition {
public void transition(AMContainerImpl container, AMContainerEvent cEvent) {
super.transition(container, cEvent);
}
}
protected static class ErrorAtStoppingTransition
extends ErrorBaseTransition {
public void transition(AMContainerImpl container, AMContainerEvent cEvent) {
super.transition(container, cEvent);
container.sendCompletedToScheduler();
}
}
protected static class ErrorBaseTransition implements
SingleArcTransition<AMContainerImpl, AMContainerEvent> {
@Override
public void transition(AMContainerImpl container, AMContainerEvent cEvent) {
container.inError = true;
}
}
protected static class AssignTAAtCompletedTransition implements
SingleArcTransition<AMContainerImpl, AMContainerEvent> {
@Override
public void transition(AMContainerImpl container, AMContainerEvent cEvent) {
// TODO CREUSE CRITICAL: This is completely incorrect. COMPLETED comes
// from RMComm directly to the container. Meanwhile, the scheduler may
// think the container is still around and assign a task to it. The task
// ends up getting a CONTAINER_KILLED message. Task could handle this by
// asking for a reschedule in this case. Will end up FAILING the task instead of KILLING it.
container.inError = true;
AMContainerEventAssignTA event = (AMContainerEventAssignTA) cEvent;
String errorMessage = "AttemptId: " + event.getTaskAttemptId()
+ " cannot be allocated to container: " + container.getContainerId()
+ " in COMPLETED state";
container.maybeSendNodeFailureForFailedAssignment(event.getTaskAttemptId());
container.sendTerminatedToTaskAttempt(event.getTaskAttemptId(),
errorMessage);
container.registerFailedAttempt(event.getTaskAttemptId());
}
}
private void handleExtraTAAssign(
AMContainerEventAssignTA event, TezTaskAttemptID currentTaId) {
this.inError = true;
String errorMessage = "AMScheduler Error: Multiple simultaneous " +
"taskAttempt allocations to: " + this.getContainerId() +
". Attempts: " + currentTaId + ", " + event.getTaskAttemptId() +
". Current state: " + this.getState();
this.maybeSendNodeFailureForFailedAssignment(event.getTaskAttemptId());
this.sendTerminatingToTaskAttempt(event.getTaskAttemptId(), errorMessage);
this.sendTerminatingToTaskAttempt(currentTaId, errorMessage);
this.registerFailedAttempt(event.getTaskAttemptId());
LOG.warn(errorMessage);
this.sendStopRequestToNM();
this.unregisterFromTAListener();
this.unregisterFromContainerListener();
}
protected void registerFailedAttempt(TezTaskAttemptID taId) {
failedAssignments.add(taId);
}
protected void deAllocate() {
sendEvent(new AMSchedulerEventDeallocateContainer(containerId));
}
protected void sendCompletedToScheduler() {
sendEvent(new AMSchedulerEventContainerCompleted(containerId));
}
protected void sendTerminatedToTaskAttempt(
TezTaskAttemptID taId, String message) {
sendEvent(new TaskAttemptEventContainerTerminated(taId, message));
}
protected void sendTerminatingToTaskAttempt(TezTaskAttemptID taId,
String message) {
sendEvent(new TaskAttemptEventContainerTerminating(taId, message));
}
protected void maybeSendNodeFailureForFailedAssignment(TezTaskAttemptID taId) {
if (this.nodeFailed) {
this.sendNodeFailureToTA(taId, nodeFailedMessage);
}
}
protected void sendNodeFailureToTA(TezTaskAttemptID taId, String message) {
sendEvent(new TaskAttemptEventNodeFailed(taId, message));
}
protected void sendStartRequestToNM() {
sendEvent(new NMCommunicatorLaunchRequestEvent(clc, container));
}
protected void sendStopRequestToNM() {
sendEvent(new NMCommunicatorStopRequestEvent(containerId,
container.getNodeId(), container.getContainerToken()));
}
protected void unregisterAttemptFromListener(TezTaskAttemptID attemptId) {
taskAttemptListener.unregisterTaskAttempt(attemptId);
}
protected void registerWithTAListener() {
taskAttemptListener.registerRunningContainer(containerId);
}
protected void unregisterFromTAListener() {
this.taskAttemptListener.unregisterRunningContainer(containerId);
}
protected void registerWithContainerListener() {
this.containerHeartbeatHandler.register(this.containerId);
}
protected void unregisterFromContainerListener() {
this.containerHeartbeatHandler.unregister(this.containerId);
}
}