| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.tez.dag.app.rm.container; |
| |
| import java.util.Collections; |
| import java.util.EnumSet; |
| import java.util.HashMap; |
| import java.util.LinkedList; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.concurrent.locks.ReentrantReadWriteLock; |
| import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock; |
| import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock; |
| |
| import org.apache.commons.logging.Log; |
| import org.apache.commons.logging.LogFactory; |
| import org.apache.hadoop.security.Credentials; |
| import org.apache.hadoop.yarn.api.records.Container; |
| import org.apache.hadoop.yarn.api.records.ContainerExitStatus; |
| import org.apache.hadoop.yarn.api.records.ContainerId; |
| import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; |
| import org.apache.hadoop.yarn.api.records.LocalResource; |
| import org.apache.hadoop.yarn.event.Event; |
| import org.apache.hadoop.yarn.event.EventHandler; |
| import org.apache.hadoop.yarn.state.InvalidStateTransitonException; |
| import org.apache.hadoop.yarn.state.MultipleArcTransition; |
| import org.apache.hadoop.yarn.state.SingleArcTransition; |
| import org.apache.hadoop.yarn.state.StateMachine; |
| import org.apache.hadoop.yarn.state.StateMachineFactory; |
| import org.apache.hadoop.yarn.util.Clock; |
| import org.apache.tez.dag.app.AppContext; |
| import org.apache.tez.dag.app.ContainerHeartbeatHandler; |
| import org.apache.tez.dag.app.ContainerContext; |
| import org.apache.tez.dag.app.TaskAttemptListener; |
| import org.apache.tez.dag.app.dag.event.DiagnosableEvent; |
| import org.apache.tez.dag.app.dag.event.TaskAttemptEventContainerTerminatedBySystem; |
| import org.apache.tez.dag.app.dag.event.TaskAttemptEventContainerTerminated; |
| import org.apache.tez.dag.app.dag.event.TaskAttemptEventContainerTerminating; |
| import org.apache.tez.dag.app.dag.event.TaskAttemptEventNodeFailed; |
| import org.apache.tez.dag.app.rm.AMSchedulerEventDeallocateContainer; |
| import org.apache.tez.dag.app.rm.NMCommunicatorLaunchRequestEvent; |
| import org.apache.tez.dag.app.rm.NMCommunicatorStopRequestEvent; |
| import org.apache.tez.dag.history.DAGHistoryEvent; |
| import org.apache.tez.dag.history.HistoryEventHandler; |
| import org.apache.tez.dag.history.events.ContainerStoppedEvent; |
| import org.apache.tez.dag.records.TaskAttemptTerminationCause; |
| import org.apache.tez.dag.records.TezDAGID; |
| import org.apache.tez.dag.records.TezTaskAttemptID; |
| //import org.apache.tez.dag.app.dag.event.TaskAttemptEventDiagnosticsUpdate; |
| import org.apache.tez.runtime.api.impl.TaskSpec; |
| |
| import com.google.common.annotations.VisibleForTesting; |
| import com.google.common.base.Preconditions; |
| |
| @SuppressWarnings("rawtypes") |
| public class AMContainerImpl implements AMContainer { |
| |
| private static final Log LOG = LogFactory.getLog(AMContainerImpl.class); |
| |
| private final ReadLock readLock; |
| private final WriteLock writeLock; |
| private final ContainerId containerId; |
| // Container to be used for getters on capability, locality etc. |
| private final Container container; |
| private final AppContext appContext; |
| private final ContainerHeartbeatHandler containerHeartbeatHandler; |
| private final TaskAttemptListener taskAttemptListener; |
| protected final EventHandler eventHandler; |
| private final ContainerSignatureMatcher signatureMatcher; |
| |
| private final List<TezTaskAttemptID> completedAttempts = |
| new LinkedList<TezTaskAttemptID>(); |
| |
| // TODO Maybe this should be pulled from the TaskAttempt.s |
| private final Map<TezTaskAttemptID, TaskSpec> remoteTaskMap = |
| new HashMap<TezTaskAttemptID, TaskSpec>(); |
| |
| // TODO ?? Convert to list and hash. |
| |
| private long idleTimeBetweenTasks = 0; |
| private long lastTaskFinishTime; |
| |
| private TezDAGID lastTaskDAGID; |
| |
| // An assign can happen even during wind down. e.g. NodeFailure caused the |
| // wind down, and an allocation was pending in the AMScheduler. This could |
| // be modelled as a separate state. |
| private boolean nodeFailed = false; |
| |
| private TezTaskAttemptID pendingAttempt; |
| private TezTaskAttemptID runningAttempt; |
| private List<TezTaskAttemptID> failedAssignments; |
| private TezTaskAttemptID pullAttempt; |
| |
| private AMContainerTask noAllocationContainerTask; |
| |
| private static final AMContainerTask NO_MORE_TASKS = new AMContainerTask( |
| true, null, null, null, false); |
| private static final AMContainerTask WAIT_TASK = new AMContainerTask(false, |
| null, null, null, false); |
| |
| private boolean inError = false; |
| |
| @VisibleForTesting |
| Map<String, LocalResource> containerLocalResources; |
| @VisibleForTesting |
| Map<String, LocalResource> additionalLocalResources; |
| |
| private Credentials credentials; |
| private boolean credentialsChanged = false; |
| |
| // TODO Consider registering with the TAL, instead of the TAL pulling. |
| // Possibly after splitting TAL and ContainerListener. |
| |
| // TODO What should be done with pendingAttempts. Nullify when handled ? |
| // Add them to failed ta list ? Some historic information should be maintained. |
| |
| // TODO Create a generic ERROR state. Container tries informing relevant components in this case. |
| |
| |
| private final StateMachine<AMContainerState, AMContainerEventType, AMContainerEvent> stateMachine; |
| private static final StateMachineFactory |
| <AMContainerImpl, AMContainerState, AMContainerEventType, AMContainerEvent> |
| stateMachineFactory = |
| new StateMachineFactory<AMContainerImpl, AMContainerState, AMContainerEventType, AMContainerEvent>( |
| AMContainerState.ALLOCATED) |
| |
| .addTransition(AMContainerState.ALLOCATED, AMContainerState.LAUNCHING, |
| AMContainerEventType.C_LAUNCH_REQUEST, new LaunchRequestTransition()) |
| .addTransition(AMContainerState.ALLOCATED, AMContainerState.COMPLETED, |
| AMContainerEventType.C_ASSIGN_TA, |
| new AssignTaskAttemptAtAllocatedTransition()) |
| .addTransition(AMContainerState.ALLOCATED, AMContainerState.COMPLETED, |
| AMContainerEventType.C_COMPLETED, |
| new CompletedAtAllocatedTransition()) |
| .addTransition(AMContainerState.ALLOCATED, AMContainerState.COMPLETED, |
| AMContainerEventType.C_STOP_REQUEST, |
| new StopRequestAtAllocatedTransition()) |
| .addTransition(AMContainerState.ALLOCATED, AMContainerState.COMPLETED, |
| AMContainerEventType.C_NODE_FAILED, |
| new NodeFailedAtAllocatedTransition()) |
| .addTransition( |
| AMContainerState.ALLOCATED, |
| AMContainerState.COMPLETED, |
| EnumSet.of(AMContainerEventType.C_LAUNCHED, |
| AMContainerEventType.C_LAUNCH_FAILED, |
| AMContainerEventType.C_PULL_TA, |
| AMContainerEventType.C_TA_SUCCEEDED, |
| AMContainerEventType.C_NM_STOP_SENT, |
| AMContainerEventType.C_NM_STOP_FAILED, |
| AMContainerEventType.C_TIMED_OUT), new ErrorTransition()) |
| |
| .addTransition( |
| AMContainerState.LAUNCHING, |
| EnumSet.of(AMContainerState.LAUNCHING, |
| AMContainerState.STOP_REQUESTED), |
| AMContainerEventType.C_ASSIGN_TA, new AssignTaskAttemptTransition()) |
| .addTransition(AMContainerState.LAUNCHING, AMContainerState.IDLE, |
| AMContainerEventType.C_LAUNCHED, new LaunchedTransition()) |
| .addTransition(AMContainerState.LAUNCHING, AMContainerState.STOPPING, |
| AMContainerEventType.C_LAUNCH_FAILED, new LaunchFailedTransition()) |
| // TODO CREUSE : Maybe, consider sending back an attempt if the container |
| // asks for one in this state. Waiting for a LAUNCHED event from the |
| // NMComm may delay the task allocation. |
| .addTransition(AMContainerState.LAUNCHING, AMContainerState.LAUNCHING, |
| AMContainerEventType.C_PULL_TA) |
| // Is assuming the pullAttempt will be null. |
| .addTransition(AMContainerState.LAUNCHING, AMContainerState.COMPLETED, |
| AMContainerEventType.C_COMPLETED, |
| new CompletedAtLaunchingTransition()) |
| .addTransition(AMContainerState.LAUNCHING, |
| AMContainerState.STOP_REQUESTED, AMContainerEventType.C_STOP_REQUEST, |
| new StopRequestAtLaunchingTransition()) |
| .addTransition(AMContainerState.LAUNCHING, AMContainerState.STOPPING, |
| AMContainerEventType.C_NODE_FAILED, |
| new NodeFailedAtLaunchingTransition()) |
| .addTransition( |
| AMContainerState.LAUNCHING, |
| AMContainerState.STOP_REQUESTED, |
| EnumSet.of(AMContainerEventType.C_LAUNCH_REQUEST, |
| AMContainerEventType.C_TA_SUCCEEDED, |
| AMContainerEventType.C_NM_STOP_SENT, |
| AMContainerEventType.C_NM_STOP_FAILED, |
| AMContainerEventType.C_TIMED_OUT), |
| new ErrorAtLaunchingTransition()) |
| |
| .addTransition(AMContainerState.IDLE, |
| EnumSet.of(AMContainerState.IDLE, AMContainerState.STOP_REQUESTED), |
| AMContainerEventType.C_ASSIGN_TA, |
| new AssignTaskAttemptAtIdleTransition()) |
| .addTransition(AMContainerState.IDLE, |
| EnumSet.of(AMContainerState.RUNNING, AMContainerState.IDLE), |
| AMContainerEventType.C_PULL_TA, new PullTAAtIdleTransition()) |
| .addTransition(AMContainerState.IDLE, AMContainerState.COMPLETED, |
| AMContainerEventType.C_COMPLETED, new CompletedAtIdleTransition()) |
| .addTransition(AMContainerState.IDLE, AMContainerState.STOP_REQUESTED, |
| AMContainerEventType.C_STOP_REQUEST, |
| new StopRequestAtIdleTransition()) |
| .addTransition(AMContainerState.IDLE, AMContainerState.STOP_REQUESTED, |
| AMContainerEventType.C_TIMED_OUT, new TimedOutAtIdleTransition()) |
| .addTransition(AMContainerState.IDLE, AMContainerState.STOPPING, |
| AMContainerEventType.C_NODE_FAILED, new NodeFailedAtIdleTransition()) |
| .addTransition( |
| AMContainerState.IDLE, |
| AMContainerState.STOP_REQUESTED, |
| EnumSet.of(AMContainerEventType.C_LAUNCH_REQUEST, |
| AMContainerEventType.C_LAUNCHED, |
| AMContainerEventType.C_LAUNCH_FAILED, |
| AMContainerEventType.C_TA_SUCCEEDED, |
| AMContainerEventType.C_NM_STOP_SENT, |
| AMContainerEventType.C_NM_STOP_FAILED), |
| new ErrorAtIdleTransition()) |
| |
| .addTransition(AMContainerState.RUNNING, AMContainerState.STOP_REQUESTED, |
| AMContainerEventType.C_ASSIGN_TA, |
| new AssignTaskAttemptAtRunningTransition()) |
| .addTransition(AMContainerState.RUNNING, AMContainerState.RUNNING, |
| AMContainerEventType.C_PULL_TA) |
| .addTransition(AMContainerState.RUNNING, AMContainerState.IDLE, |
| AMContainerEventType.C_TA_SUCCEEDED, |
| new TASucceededAtRunningTransition()) |
| .addTransition(AMContainerState.RUNNING, AMContainerState.COMPLETED, |
| AMContainerEventType.C_COMPLETED, new CompletedAtRunningTransition()) |
| .addTransition(AMContainerState.RUNNING, AMContainerState.STOP_REQUESTED, |
| AMContainerEventType.C_STOP_REQUEST, |
| new StopRequestAtRunningTransition()) |
| .addTransition(AMContainerState.RUNNING, AMContainerState.STOP_REQUESTED, |
| AMContainerEventType.C_TIMED_OUT, new TimedOutAtRunningTransition()) |
| .addTransition(AMContainerState.RUNNING, AMContainerState.STOPPING, |
| AMContainerEventType.C_NODE_FAILED, |
| new NodeFailedAtRunningTransition()) |
| .addTransition( |
| AMContainerState.RUNNING, |
| AMContainerState.STOP_REQUESTED, |
| EnumSet.of(AMContainerEventType.C_LAUNCH_REQUEST, |
| AMContainerEventType.C_LAUNCHED, |
| AMContainerEventType.C_LAUNCH_FAILED, |
| AMContainerEventType.C_NM_STOP_SENT, |
| AMContainerEventType.C_NM_STOP_FAILED), |
| new ErrorAtRunningTransition()) |
| |
| .addTransition(AMContainerState.STOP_REQUESTED, |
| AMContainerState.STOP_REQUESTED, AMContainerEventType.C_ASSIGN_TA, |
| new AssignTAAtWindDownTransition()) |
| .addTransition(AMContainerState.STOP_REQUESTED, |
| AMContainerState.STOP_REQUESTED, AMContainerEventType.C_PULL_TA, |
| new PullTAAfterStopTransition()) |
| .addTransition(AMContainerState.STOP_REQUESTED, |
| AMContainerState.COMPLETED, AMContainerEventType.C_COMPLETED, |
| new CompletedAtWindDownTransition()) |
| .addTransition(AMContainerState.STOP_REQUESTED, |
| AMContainerState.STOPPING, AMContainerEventType.C_NM_STOP_SENT) |
| .addTransition(AMContainerState.STOP_REQUESTED, |
| AMContainerState.STOPPING, AMContainerEventType.C_NM_STOP_FAILED, |
| new NMStopRequestFailedTransition()) |
| .addTransition(AMContainerState.STOP_REQUESTED, |
| AMContainerState.STOPPING, AMContainerEventType.C_NODE_FAILED, |
| new NodeFailedAtNMStopRequestedTransition()) |
| .addTransition( |
| AMContainerState.STOP_REQUESTED, |
| AMContainerState.STOP_REQUESTED, |
| EnumSet.of(AMContainerEventType.C_LAUNCHED, |
| AMContainerEventType.C_LAUNCH_FAILED, |
| AMContainerEventType.C_TA_SUCCEEDED, |
| AMContainerEventType.C_STOP_REQUEST, |
| AMContainerEventType.C_TIMED_OUT)) |
| .addTransition(AMContainerState.STOP_REQUESTED, |
| AMContainerState.STOP_REQUESTED, |
| AMContainerEventType.C_LAUNCH_REQUEST, |
| new ErrorAtNMStopRequestedTransition()) |
| |
| .addTransition(AMContainerState.STOPPING, AMContainerState.STOPPING, |
| AMContainerEventType.C_ASSIGN_TA, new AssignTAAtWindDownTransition()) |
| .addTransition(AMContainerState.STOPPING, AMContainerState.STOPPING, |
| AMContainerEventType.C_PULL_TA, new PullTAAfterStopTransition()) |
| // TODO This transition is wrong. Should be a noop / error. |
| .addTransition(AMContainerState.STOPPING, AMContainerState.COMPLETED, |
| AMContainerEventType.C_COMPLETED, new CompletedAtWindDownTransition()) |
| .addTransition(AMContainerState.STOPPING, AMContainerState.STOPPING, |
| AMContainerEventType.C_NODE_FAILED, new NodeFailedBaseTransition()) |
| .addTransition( |
| AMContainerState.STOPPING, |
| AMContainerState.STOPPING, |
| EnumSet.of(AMContainerEventType.C_LAUNCHED, |
| AMContainerEventType.C_LAUNCH_FAILED, |
| AMContainerEventType.C_TA_SUCCEEDED, |
| AMContainerEventType.C_STOP_REQUEST, |
| AMContainerEventType.C_NM_STOP_SENT, |
| AMContainerEventType.C_NM_STOP_FAILED, |
| AMContainerEventType.C_TIMED_OUT)) |
| .addTransition(AMContainerState.STOPPING, AMContainerState.STOPPING, |
| AMContainerEventType.C_LAUNCH_REQUEST, |
| new ErrorAtStoppingTransition()) |
| |
| .addTransition(AMContainerState.COMPLETED, AMContainerState.COMPLETED, |
| AMContainerEventType.C_ASSIGN_TA, new AssignTAAtCompletedTransition()) |
| .addTransition(AMContainerState.COMPLETED, AMContainerState.COMPLETED, |
| AMContainerEventType.C_PULL_TA, new PullTAAfterStopTransition()) |
| .addTransition(AMContainerState.COMPLETED, AMContainerState.COMPLETED, |
| AMContainerEventType.C_NODE_FAILED, new NodeFailedBaseTransition()) |
| .addTransition( |
| AMContainerState.COMPLETED, |
| AMContainerState.COMPLETED, |
| EnumSet.of(AMContainerEventType.C_LAUNCH_REQUEST, |
| AMContainerEventType.C_LAUNCHED, |
| AMContainerEventType.C_LAUNCH_FAILED, |
| AMContainerEventType.C_TA_SUCCEEDED, |
| AMContainerEventType.C_COMPLETED, |
| AMContainerEventType.C_STOP_REQUEST, |
| AMContainerEventType.C_NM_STOP_SENT, |
| AMContainerEventType.C_NM_STOP_FAILED, |
| AMContainerEventType.C_TIMED_OUT)) |
| |
| .installTopology(); |
| |
| // Note: Containers will not reach their final state if the RM link is broken, |
| // AM shutdown should not wait for this. |
| |
| // Attempting to use a container based purely on reosurces required, etc needs |
| // additional change - JvmID, YarnChild, etc depend on TaskType. |
| public AMContainerImpl(Container container, ContainerHeartbeatHandler chh, |
| TaskAttemptListener tal, ContainerSignatureMatcher signatureMatcher, |
| AppContext appContext) { |
| ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock(); |
| this.readLock = rwLock.readLock(); |
| this.writeLock = rwLock.writeLock(); |
| this.container = container; |
| this.containerId = container.getId(); |
| this.eventHandler = appContext.getEventHandler(); |
| this.signatureMatcher = signatureMatcher; |
| this.appContext = appContext; |
| this.containerHeartbeatHandler = chh; |
| this.taskAttemptListener = tal; |
| this.failedAssignments = new LinkedList<TezTaskAttemptID>(); |
| this.noAllocationContainerTask = WAIT_TASK; |
| this.stateMachine = stateMachineFactory.make(this); |
| } |
| |
| @Override |
| public AMContainerState getState() { |
| readLock.lock(); |
| try { |
| return stateMachine.getCurrentState(); |
| } finally { |
| readLock.unlock(); |
| } |
| } |
| |
| @Override |
| public ContainerId getContainerId() { |
| return this.containerId; |
| } |
| |
| @Override |
| public Container getContainer() { |
| return this.container; |
| } |
| |
| @Override |
| public List<TezTaskAttemptID> getAllTaskAttempts() { |
| readLock.lock(); |
| try { |
| List<TezTaskAttemptID> allAttempts = new LinkedList<TezTaskAttemptID>(); |
| allAttempts.addAll(this.completedAttempts); |
| allAttempts.addAll(this.failedAssignments); |
| if (this.pendingAttempt != null) { |
| allAttempts.add(this.pendingAttempt); |
| } |
| if (this.runningAttempt != null) { |
| allAttempts.add(this.runningAttempt); |
| } |
| return allAttempts; |
| } finally { |
| readLock.unlock(); |
| } |
| } |
| |
| @Override |
| public List<TezTaskAttemptID> getQueuedTaskAttempts() { |
| readLock.lock(); |
| try { |
| if (pendingAttempt != null) { |
| return Collections.singletonList(this.pendingAttempt); |
| } else { |
| return Collections.emptyList(); |
| } |
| } finally { |
| readLock.unlock(); |
| } |
| } |
| |
| @Override |
| public TezTaskAttemptID getRunningTaskAttempt() { |
| readLock.lock(); |
| try { |
| return this.runningAttempt; |
| } finally { |
| readLock.unlock(); |
| } |
| } |
| |
| public boolean isInErrorState() { |
| return inError; |
| } |
| |
| @Override |
| public void handle(AMContainerEvent event) { |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("Processing AMContainerEvent " + event.getContainerId() |
| + " of type " + event.getType() + " while in state: " + getState() |
| + ". Event: " + event); |
| } |
| this.writeLock.lock(); |
| try { |
| final AMContainerState oldState = getState(); |
| try { |
| stateMachine.doTransition(event.getType(), event); |
| } catch (InvalidStateTransitonException e) { |
| LOG.error("Can't handle event " + event.getType() |
| + " at current state " + oldState + " for ContainerId " |
| + this.containerId, e); |
| inError = true; |
| // TODO Can't set state to COMPLETED. Add a default error state. |
| } |
| if (oldState != getState()) { |
| LOG.info("AMContainer " + this.containerId + " transitioned from " |
| + oldState + " to " + getState() |
| + " via event " + event.getType()); |
| } |
| } finally { |
| writeLock.unlock(); |
| } |
| } |
| |
| @SuppressWarnings("unchecked") |
| private void sendEvent(Event<?> event) { |
| this.eventHandler.handle(event); |
| } |
| |
| // Push the TaskAttempt to the TAL, instead of the TAL pulling when a JVM asks |
| // for a TaskAttempt. |
| public AMContainerTask pullTaskContext() { |
| this.writeLock.lock(); |
| try { |
| this.handle( |
| new AMContainerEvent(containerId, AMContainerEventType.C_PULL_TA)); |
| if (pullAttempt == null) { |
| // As a later optimization, it should be possible for a running container to localize |
| // additional resources before a task is assigned to the container. |
| return noAllocationContainerTask; |
| } else { |
| // Avoid sending credentials if credentials have not changed. |
| AMContainerTask amContainerTask = new AMContainerTask(false, |
| remoteTaskMap.remove(pullAttempt), this.additionalLocalResources, |
| this.credentialsChanged ? this.credentials : null, this.credentialsChanged); |
| this.additionalLocalResources = null; |
| this.credentialsChanged = false; |
| return amContainerTask; |
| } |
| } finally { |
| this.pullAttempt = null; |
| this.writeLock.unlock(); |
| } |
| } |
| |
| ////////////////////////////////////////////////////////////////////////////// |
| // Start of Transition Classes // |
| ////////////////////////////////////////////////////////////////////////////// |
| |
| protected static class LaunchRequestTransition implements |
| SingleArcTransition<AMContainerImpl, AMContainerEvent> { |
| @Override |
| public void transition(AMContainerImpl container, AMContainerEvent cEvent) { |
| AMContainerEventLaunchRequest event = (AMContainerEventLaunchRequest) cEvent; |
| |
| ContainerContext containerContext = event.getContainerContext(); |
| // Clone - don't use the object that is passed in, since this is likely to |
| // be modified here. |
| container.containerLocalResources = new HashMap<String, LocalResource>( |
| containerContext.getLocalResources()); |
| container.credentials = containerContext.getCredentials(); |
| container.credentialsChanged = true; |
| |
| TezDAGID dagId = null; |
| Map<String, LocalResource> dagLocalResources = null; |
| if (container.appContext.getCurrentDAG() != null) { |
| dagId = container.appContext.getCurrentDAG().getID(); |
| dagLocalResources = container.appContext.getCurrentDAG().getLocalResources(); |
| } |
| ContainerLaunchContext clc = AMContainerHelpers.createContainerLaunchContext( |
| dagId, dagLocalResources, |
| container.appContext.getApplicationACLs(), |
| container.getContainerId(), |
| containerContext.getLocalResources(), |
| containerContext.getEnvironment(), |
| containerContext.getJavaOpts(), |
| container.taskAttemptListener.getAddress(), containerContext.getCredentials(), |
| container.appContext, container.container.getResource(), |
| container.appContext.getAMConf()); |
| |
| // Registering now, so that in case of delayed NM response, the child |
| // task is not told to die since the TAL does not know about the container. |
| container.registerWithTAListener(); |
| container.sendStartRequestToNM(clc); |
| LOG.info("Sending Launch Request for Container with id: " + |
| container.container.getId()); |
| } |
| } |
| |
| protected static class AssignTaskAttemptAtAllocatedTransition implements |
| SingleArcTransition<AMContainerImpl, AMContainerEvent> { |
| @Override |
| public void transition(AMContainerImpl container, AMContainerEvent cEvent) { |
| AMContainerEventAssignTA event = (AMContainerEventAssignTA) cEvent; |
| container.inError = true; |
| container.registerFailedAttempt(event.getTaskAttemptId()); |
| container.maybeSendNodeFailureForFailedAssignment(event |
| .getTaskAttemptId()); |
| container.sendTerminatedToTaskAttempt(event.getTaskAttemptId(), |
| "AMScheduler Error: TaskAttempt allocated to unlaunched container: " + |
| container.getContainerId(), TaskAttemptTerminationCause.FRAMEWORK_ERROR); |
| container.deAllocate(); |
| LOG.warn("Unexpected TA Assignment: TAId: " + event.getTaskAttemptId() + |
| " for ContainerId: " + container.getContainerId() + |
| " while in state: " + container.getState()); |
| } |
| } |
| |
| protected static class CompletedAtAllocatedTransition implements |
| SingleArcTransition<AMContainerImpl, AMContainerEvent> { |
| @Override |
| public void transition(AMContainerImpl container, AMContainerEvent cEvent) { |
| AMContainerEventCompleted event = (AMContainerEventCompleted)cEvent; |
| String diag = event.getDiagnostics(); |
| if (!(diag == null || diag.equals(""))) { |
| LOG.info("Container " + container.getContainerId() |
| + " exited with diagnostics set to " + diag); |
| } |
| } |
| } |
| |
| protected static class StopRequestAtAllocatedTransition implements |
| SingleArcTransition<AMContainerImpl, AMContainerEvent> { |
| @Override |
| public void transition(AMContainerImpl container, AMContainerEvent cEvent) { |
| container.deAllocate(); |
| } |
| } |
| |
| protected static class NodeFailedAtAllocatedTransition extends |
| NodeFailedBaseTransition { |
| @Override |
| public void transition(AMContainerImpl container, AMContainerEvent cEvent) { |
| super.transition(container, cEvent); |
| container.deAllocate(); |
| } |
| } |
| |
| protected static class ErrorTransition extends ErrorBaseTransition { |
| @Override |
| public void transition(AMContainerImpl container, AMContainerEvent cEvent) { |
| super.transition(container, cEvent); |
| container.deAllocate(); |
| LOG.info( |
| "Unexpected event type: " + cEvent.getType() + " while in state: " + |
| container.getState() + ". Event: " + cEvent); |
| |
| } |
| } |
| |
| protected static class AssignTaskAttemptTransition implements |
| MultipleArcTransition<AMContainerImpl, AMContainerEvent, AMContainerState> { |
| |
| @Override |
| public AMContainerState transition( |
| AMContainerImpl container, AMContainerEvent cEvent) { |
| AMContainerEventAssignTA event = (AMContainerEventAssignTA) cEvent; |
| if (container.pendingAttempt != null) { |
| // This may include a couple of additional (harmless) unregister calls |
| // to the taskAttemptListener and containerHeartbeatHandler - in case |
| // of assign at any state prior to IDLE. |
| container.handleExtraTAAssign(event, container.pendingAttempt); |
| // TODO XXX: Verify that it's ok to send in a NM_STOP_REQUEST. The |
| // NMCommunicator should be able to handle this. The STOP_REQUEST would |
| // only go out after the START_REQUEST. |
| return AMContainerState.STOP_REQUESTED; |
| } |
| |
| Map<String, LocalResource> taskLocalResources = event.getRemoteTaskLocalResources(); |
| Preconditions.checkState(container.additionalLocalResources == null, |
| "No additional resources should be pending when assigning a new task"); |
| container.additionalLocalResources = container.signatureMatcher.getAdditionalResources( |
| container.containerLocalResources, taskLocalResources); |
| // Register the additional resources back for this container. |
| container.containerLocalResources.putAll(container.additionalLocalResources); |
| container.pendingAttempt = event.getTaskAttemptId(); |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("AssignTA: attempt: " + event.getRemoteTaskSpec()); |
| LOG.debug("AdditionalLocalResources: " + container.additionalLocalResources); |
| } |
| |
| TezDAGID currentDAGID = container.appContext.getCurrentDAGID(); |
| if (!currentDAGID.equals(container.lastTaskDAGID)) { |
| // Will be null for the first task. |
| container.credentialsChanged = true; |
| container.credentials = event.getCredentials(); |
| container.lastTaskDAGID = currentDAGID; |
| } else { |
| container.credentialsChanged = false; |
| } |
| |
| container.remoteTaskMap |
| .put(event.getTaskAttemptId(), event.getRemoteTaskSpec()); |
| return container.getState(); |
| } |
| } |
| |
| protected static class LaunchedTransition implements |
| SingleArcTransition<AMContainerImpl, AMContainerEvent> { |
| @Override |
| public void transition(AMContainerImpl container, AMContainerEvent cEvent) { |
| container.registerWithContainerListener(); |
| } |
| } |
| |
| protected static class LaunchFailedTransition implements |
| SingleArcTransition<AMContainerImpl, AMContainerEvent> { |
| @Override |
| public void transition(AMContainerImpl container, AMContainerEvent cEvent) { |
| if (container.pendingAttempt != null) { |
| AMContainerEventLaunchFailed event = (AMContainerEventLaunchFailed) cEvent; |
| // for a properly setup cluster this should almost always be an app error |
| // need to differentiate between launch failed due to framework/cluster or app |
| container.sendTerminatingToTaskAttempt(container.pendingAttempt, |
| event.getMessage(), TaskAttemptTerminationCause.CONTAINER_LAUNCH_FAILED); |
| } |
| container.unregisterFromTAListener(); |
| container.deAllocate(); |
| } |
| } |
| |
| protected static class CompletedAtLaunchingTransition |
| implements SingleArcTransition<AMContainerImpl, AMContainerEvent> { |
| @Override |
| public void transition(AMContainerImpl container, AMContainerEvent cEvent) { |
| AMContainerEventCompleted event = (AMContainerEventCompleted) cEvent; |
| if (container.pendingAttempt != null) { |
| String errorMessage = getMessage(container, event); |
| if (event.isClusterAction()) { |
| container.sendContainerTerminatedBySystemToTaskAttempt(container.pendingAttempt, |
| errorMessage, event.getTerminationCause()); |
| } else { |
| container |
| .sendTerminatedToTaskAttempt( |
| container.pendingAttempt, |
| errorMessage, |
| // if termination cause is generic exited then replace with specific |
| (event.getTerminationCause() == TaskAttemptTerminationCause.CONTAINER_EXITED ? |
| TaskAttemptTerminationCause.CONTAINER_LAUNCH_FAILED : event.getTerminationCause())); |
| } |
| container.registerFailedAttempt(container.pendingAttempt); |
| container.pendingAttempt = null; |
| LOG.warn(errorMessage); |
| } |
| container.containerLocalResources = null; |
| container.additionalLocalResources = null; |
| container.unregisterFromTAListener(); |
| String diag = event.getDiagnostics(); |
| if (!(diag == null || diag.equals(""))) { |
| LOG.info("Container " + container.getContainerId() |
| + " exited with diagnostics set to " + diag); |
| } |
| container.logStopped(event.getContainerExitStatus()); |
| } |
| |
| public String getMessage(AMContainerImpl container, |
| AMContainerEventCompleted event) { |
| return "Container" + container.getContainerId() |
| + " finished while trying to launch. Diagnostics: [" |
| + event.getDiagnostics() +"]"; |
| } |
| } |
| |
| protected static class StopRequestAtLaunchingTransition |
| implements SingleArcTransition<AMContainerImpl, AMContainerEvent> { |
| |
| @Override |
| public void transition(AMContainerImpl container, AMContainerEvent cEvent) { |
| if (container.pendingAttempt != null) { |
| container.sendTerminatingToTaskAttempt(container.pendingAttempt, |
| getMessage(container, cEvent), TaskAttemptTerminationCause.CONTAINER_STOPPED); |
| } |
| container.unregisterFromTAListener(); |
| container.logStopped(container.pendingAttempt == null ? |
| ContainerExitStatus.SUCCESS |
| : ContainerExitStatus.INVALID); |
| container.sendStopRequestToNM(); |
| } |
| |
| public String getMessage( |
| AMContainerImpl container, AMContainerEvent event) { |
| return "Container " + container.getContainerId() + |
| " received a STOP_REQUEST"; |
| } |
| } |
| |
| protected static class NodeFailedBaseTransition |
| implements SingleArcTransition<AMContainerImpl, AMContainerEvent> { |
| @Override |
| public void transition(AMContainerImpl container, AMContainerEvent cEvent) { |
| |
| if (container.nodeFailed) { |
| // ignore duplicates |
| return; |
| } |
| container.nodeFailed = true; |
| String errorMessage = "Node " + container.getContainer().getNodeId() + " failed. "; |
| if (cEvent instanceof DiagnosableEvent) { |
| errorMessage += ((DiagnosableEvent) cEvent).getDiagnosticInfo(); |
| } |
| |
| for (TezTaskAttemptID taId : container.failedAssignments) { |
| container.sendNodeFailureToTA(taId, errorMessage, TaskAttemptTerminationCause.NODE_FAILED); |
| } |
| for (TezTaskAttemptID taId : container.completedAttempts) { |
| container.sendNodeFailureToTA(taId, errorMessage, TaskAttemptTerminationCause.NODE_FAILED); |
| } |
| |
| if (container.pendingAttempt != null) { |
| // Will be null in COMPLETED state. |
| container.sendNodeFailureToTA(container.pendingAttempt, errorMessage, |
| TaskAttemptTerminationCause.NODE_FAILED); |
| container.sendTerminatingToTaskAttempt(container.pendingAttempt, errorMessage, |
| TaskAttemptTerminationCause.NODE_FAILED); |
| } |
| if (container.runningAttempt != null) { |
| // Will be null in COMPLETED state. |
| container.sendNodeFailureToTA(container.runningAttempt, errorMessage, |
| TaskAttemptTerminationCause.NODE_FAILED); |
| container.sendTerminatingToTaskAttempt(container.runningAttempt, errorMessage, |
| TaskAttemptTerminationCause.NODE_FAILED); |
| } |
| container.logStopped(ContainerExitStatus.ABORTED); |
| } |
| } |
| |
| protected static class NodeFailedAtLaunchingTransition |
| extends NodeFailedBaseTransition { |
| @Override |
| public void transition(AMContainerImpl container, AMContainerEvent cEvent) { |
| super.transition(container, cEvent); |
| container.unregisterFromTAListener(); |
| container.deAllocate(); |
| } |
| } |
| |
| protected static class ErrorAtLaunchingTransition |
| extends ErrorBaseTransition { |
| @Override |
| public void transition(AMContainerImpl container, AMContainerEvent cEvent) { |
| super.transition(container, cEvent); |
| if (container.pendingAttempt != null) { |
| container.sendTerminatingToTaskAttempt(container.pendingAttempt, |
| "Container " + container.getContainerId() + |
| " hit an invalid transition - " + cEvent.getType() + " at " + |
| container.getState(), TaskAttemptTerminationCause.FRAMEWORK_ERROR); |
| } |
| container.logStopped(ContainerExitStatus.ABORTED); |
| container.sendStopRequestToNM(); |
| container.unregisterFromTAListener(); |
| } |
| } |
| |
| protected static class AssignTaskAttemptAtIdleTransition |
| extends AssignTaskAttemptTransition { |
| @Override |
| public AMContainerState transition( |
| AMContainerImpl container, AMContainerEvent cEvent) { |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("AssignTAAtIdle: attempt: " + |
| ((AMContainerEventAssignTA) cEvent).getRemoteTaskSpec()); |
| } |
| return super.transition(container, cEvent); |
| } |
| } |
| |
| protected static class PullTAAtIdleTransition implements |
| MultipleArcTransition<AMContainerImpl, AMContainerEvent, AMContainerState> { |
| |
| @Override |
| public AMContainerState transition( |
| AMContainerImpl container, AMContainerEvent cEvent) { |
| if (container.pendingAttempt != null) { |
| // This will be invoked as part of the PULL_REQUEST - so pullAttempt pullAttempt |
| // should ideally only end up being populated during the duration of this call, |
| // which is in a write lock. pullRequest() should move this to the running state. |
| container.pullAttempt = container.pendingAttempt; |
| container.runningAttempt = container.pendingAttempt; |
| container.pendingAttempt = null; |
| if (container.lastTaskFinishTime != 0) { |
| long idleTimeDiff = |
| System.currentTimeMillis() - container.lastTaskFinishTime; |
| container.idleTimeBetweenTasks += idleTimeDiff; |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("Computing idle time for container: " + |
| container.getContainerId() + ", lastFinishTime: " + |
| container.lastTaskFinishTime + ", Incremented by: " + |
| idleTimeDiff); |
| } |
| } |
| LOG.info("Assigned taskAttempt + [" + container.runningAttempt + |
| "] to container: [" + container.getContainerId() + "]"); |
| return AMContainerState.RUNNING; |
| } else { |
| return AMContainerState.IDLE; |
| } |
| } |
| } |
| |
| protected static class CompletedAtIdleTransition |
| extends CompletedAtLaunchingTransition { |
| @Override |
| public void transition(AMContainerImpl container, AMContainerEvent cEvent) { |
| super.transition(container, cEvent); |
| container.unregisterFromContainerListener(); |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("TotalIdleTimeBetweenTasks for container: " |
| + container.getContainerId() + " = " |
| + container.idleTimeBetweenTasks); |
| } |
| } |
| |
| @Override |
| public String getMessage( |
| AMContainerImpl container, AMContainerEventCompleted event) { |
| return "Container " + container.getContainerId() |
| + " finished with diagnostics set to [" |
| + event.getDiagnostics() + "]"; |
| } |
| } |
| |
| protected static class StopRequestAtIdleTransition |
| extends StopRequestAtLaunchingTransition { |
| |
| @Override |
| public void transition(AMContainerImpl container, AMContainerEvent cEvent) { |
| super.transition(container, cEvent); |
| container.unregisterFromContainerListener(); |
| } |
| } |
| |
| protected static class TimedOutAtIdleTransition |
| extends StopRequestAtIdleTransition { |
| |
| public String getMessage( |
| AMContainerImpl container, AMContainerEvent event) { |
| return "Container " + container.getContainerId() + |
| " timed out"; |
| } |
| } |
| |
| protected static class NodeFailedAtIdleTransition |
| extends NodeFailedAtLaunchingTransition { |
| |
| @Override |
| public void transition(AMContainerImpl container, AMContainerEvent cEvent) { |
| super.transition(container, cEvent); |
| container.unregisterFromContainerListener(); |
| } |
| } |
| |
| protected static class ErrorAtIdleTransition |
| extends ErrorAtLaunchingTransition { |
| @Override |
| public void transition(AMContainerImpl container, AMContainerEvent cEvent) { |
| super.transition(container, cEvent); |
| container.unregisterFromContainerListener(); |
| } |
| } |
| |
| protected static class AssignTaskAttemptAtRunningTransition implements |
| SingleArcTransition<AMContainerImpl, AMContainerEvent> { |
| @Override |
| public void transition(AMContainerImpl container, AMContainerEvent cEvent) { |
| |
| AMContainerEventAssignTA event = (AMContainerEventAssignTA) cEvent; |
| container.unregisterAttemptFromListener(container.runningAttempt); |
| container.handleExtraTAAssign(event, container.runningAttempt); |
| } |
| } |
| |
| protected static class TASucceededAtRunningTransition |
| implements SingleArcTransition<AMContainerImpl, AMContainerEvent> { |
| @Override |
| public void transition(AMContainerImpl container, AMContainerEvent cEvent) { |
| container.lastTaskFinishTime = System.currentTimeMillis(); |
| container.completedAttempts.add(container.runningAttempt); |
| container.unregisterAttemptFromListener(container.runningAttempt); |
| container.runningAttempt = null; |
| } |
| } |
| |
| protected static class CompletedAtRunningTransition |
| extends CompletedAtIdleTransition { |
| @Override |
| public void transition(AMContainerImpl container, AMContainerEvent cEvent) { |
| AMContainerEventCompleted event = (AMContainerEventCompleted) cEvent; |
| if (event.isClusterAction()) { |
| container.sendContainerTerminatedBySystemToTaskAttempt(container.runningAttempt, |
| getMessage(container, event), event.getTerminationCause()); |
| } else { |
| container.sendTerminatedToTaskAttempt(container.runningAttempt, |
| getMessage(container, event), event.getTerminationCause()); |
| } |
| container.unregisterAttemptFromListener(container.runningAttempt); |
| container.registerFailedAttempt(container.runningAttempt); |
| container.runningAttempt = null; |
| super.transition(container, cEvent); |
| } |
| } |
| |
| protected static class StopRequestAtRunningTransition |
| extends StopRequestAtIdleTransition { |
| public void transition(AMContainerImpl container, AMContainerEvent cEvent) { |
| |
| container.unregisterAttemptFromListener(container.runningAttempt); |
| container.sendTerminatingToTaskAttempt(container.runningAttempt, |
| " Container" + container.getContainerId() + " received a STOP_REQUEST", |
| TaskAttemptTerminationCause.CONTAINER_STOPPED); |
| super.transition(container, cEvent); |
| } |
| } |
| |
| protected static class TimedOutAtRunningTransition |
| extends StopRequestAtRunningTransition { |
| @Override |
| public String getMessage( |
| AMContainerImpl container, AMContainerEvent event) { |
| return "Container " + container.getContainerId() + |
| " timed out"; |
| } |
| } |
| |
| protected static class NodeFailedAtRunningTransition |
| extends NodeFailedAtIdleTransition { |
| |
| @Override |
| public void transition(AMContainerImpl container, AMContainerEvent cEvent) { |
| super.transition(container, cEvent); |
| container.unregisterAttemptFromListener(container.runningAttempt); |
| } |
| } |
| |
| protected static class ErrorAtRunningTransition |
| extends ErrorAtIdleTransition { |
| @Override |
| public void transition(AMContainerImpl container, AMContainerEvent cEvent) { |
| super.transition(container, cEvent); |
| container.unregisterAttemptFromListener(container.runningAttempt); |
| container.sendTerminatingToTaskAttempt(container.runningAttempt, |
| "Container " + container.getContainerId() + |
| " hit an invalid transition - " + cEvent.getType() + " at " + |
| container.getState(), TaskAttemptTerminationCause.FRAMEWORK_ERROR); |
| } |
| } |
| |
| protected static class AssignTAAtWindDownTransition |
| implements SingleArcTransition<AMContainerImpl, AMContainerEvent> { |
| @Override |
| public void transition(AMContainerImpl container, AMContainerEvent cEvent) { |
| AMContainerEventAssignTA event = (AMContainerEventAssignTA) cEvent; |
| container.inError = true; |
| String errorMessage = "AttemptId: " + event.getTaskAttemptId() + |
| " cannot be allocated to container: " + container.getContainerId() + |
| " in " + container.getState() + " state"; |
| container.maybeSendNodeFailureForFailedAssignment(event.getTaskAttemptId()); |
| container.sendTerminatingToTaskAttempt(event.getTaskAttemptId(), errorMessage, |
| TaskAttemptTerminationCause.CONTAINER_EXITED); |
| container.registerFailedAttempt(event.getTaskAttemptId()); |
| } |
| } |
| |
| // Hack to some extent. This allocation should be done while entering one of |
| // the post-running states, insetad of being a transition on the post stop |
| // states. |
| protected static class PullTAAfterStopTransition |
| implements SingleArcTransition<AMContainerImpl, AMContainerEvent> { |
| @Override |
| public void transition(AMContainerImpl container, AMContainerEvent cEvent) { |
| container.noAllocationContainerTask = NO_MORE_TASKS; |
| } |
| } |
| |
| protected static class CompletedAtWindDownTransition implements |
| SingleArcTransition<AMContainerImpl, AMContainerEvent> { |
| @Override |
| public void transition(AMContainerImpl container, AMContainerEvent cEvent) { |
| AMContainerEventCompleted event = (AMContainerEventCompleted) cEvent; |
| String diag = event.getDiagnostics(); |
| for (TezTaskAttemptID taId : container.failedAssignments) { |
| container.sendTerminatedToTaskAttempt(taId, diag, |
| TaskAttemptTerminationCause.CONTAINER_EXITED); |
| } |
| if (container.pendingAttempt != null) { |
| container.sendTerminatedToTaskAttempt(container.pendingAttempt, diag, |
| TaskAttemptTerminationCause.CONTAINER_EXITED); |
| container.registerFailedAttempt(container.pendingAttempt); |
| container.pendingAttempt = null; |
| } |
| if (container.runningAttempt != null) { |
| container.sendTerminatedToTaskAttempt(container.runningAttempt, diag, |
| TaskAttemptTerminationCause.CONTAINER_EXITED); |
| container.registerFailedAttempt(container.runningAttempt); |
| container.runningAttempt = null; |
| } |
| if (!(diag == null || diag.equals(""))) { |
| LOG.info("Container " + container.getContainerId() |
| + " exited with diagnostics set to " + diag); |
| } |
| container.containerLocalResources = null; |
| container.additionalLocalResources = null; |
| } |
| } |
| |
| protected static class NMStopRequestFailedTransition |
| implements SingleArcTransition<AMContainerImpl, AMContainerEvent> { |
| @Override |
| public void transition(AMContainerImpl container, AMContainerEvent cEvent) { |
| container.deAllocate(); |
| } |
| } |
| |
| protected static class NodeFailedAtNMStopRequestedTransition |
| extends NodeFailedBaseTransition { |
| public void transition(AMContainerImpl container, AMContainerEvent cEvent) { |
| super.transition(container, cEvent); |
| container.deAllocate(); |
| } |
| } |
| |
| protected static class ErrorAtNMStopRequestedTransition |
| extends ErrorBaseTransition { |
| public void transition(AMContainerImpl container, AMContainerEvent cEvent) { |
| super.transition(container, cEvent); |
| } |
| } |
| |
| protected static class ErrorAtStoppingTransition |
| extends ErrorBaseTransition { |
| public void transition(AMContainerImpl container, AMContainerEvent cEvent) { |
| super.transition(container, cEvent); |
| } |
| } |
| |
| protected static class ErrorBaseTransition implements |
| SingleArcTransition<AMContainerImpl, AMContainerEvent> { |
| |
| @Override |
| public void transition(AMContainerImpl container, AMContainerEvent cEvent) { |
| container.inError = true; |
| } |
| } |
| |
| protected static class AssignTAAtCompletedTransition implements |
| SingleArcTransition<AMContainerImpl, AMContainerEvent> { |
| |
| @Override |
| public void transition(AMContainerImpl container, AMContainerEvent cEvent) { |
| // TODO CREUSE CRITICAL: This is completely incorrect. COMPLETED comes |
| // from RMComm directly to the container. Meanwhile, the scheduler may |
| // think the container is still around and assign a task to it. The task |
| // ends up getting a CONTAINER_KILLED message. Task could handle this by |
| // asking for a reschedule in this case. Will end up FAILING the task instead of KILLING it. |
| container.inError = true; |
| AMContainerEventAssignTA event = (AMContainerEventAssignTA) cEvent; |
| String errorMessage = "AttemptId: " + event.getTaskAttemptId() |
| + " cannot be allocated to container: " + container.getContainerId() |
| + " in COMPLETED state"; |
| container.maybeSendNodeFailureForFailedAssignment(event.getTaskAttemptId()); |
| container.sendTerminatedToTaskAttempt(event.getTaskAttemptId(), |
| errorMessage, TaskAttemptTerminationCause.FRAMEWORK_ERROR); |
| container.registerFailedAttempt(event.getTaskAttemptId()); |
| } |
| } |
| |
| private void handleExtraTAAssign( |
| AMContainerEventAssignTA event, TezTaskAttemptID currentTaId) { |
| this.inError = true; |
| String errorMessage = "AMScheduler Error: Multiple simultaneous " + |
| "taskAttempt allocations to: " + this.getContainerId() + |
| ". Attempts: " + currentTaId + ", " + event.getTaskAttemptId() + |
| ". Current state: " + this.getState(); |
| this.maybeSendNodeFailureForFailedAssignment(event.getTaskAttemptId()); |
| this.sendTerminatingToTaskAttempt(event.getTaskAttemptId(), errorMessage, |
| TaskAttemptTerminationCause.FRAMEWORK_ERROR); |
| this.sendTerminatingToTaskAttempt(currentTaId, errorMessage, |
| TaskAttemptTerminationCause.FRAMEWORK_ERROR); |
| this.registerFailedAttempt(event.getTaskAttemptId()); |
| LOG.warn(errorMessage); |
| this.logStopped(ContainerExitStatus.INVALID); |
| this.sendStopRequestToNM(); |
| this.unregisterFromTAListener(); |
| this.unregisterFromContainerListener(); |
| } |
| |
| protected void registerFailedAttempt(TezTaskAttemptID taId) { |
| failedAssignments.add(taId); |
| } |
| |
| private void logStopped(int exitStatus) { |
| final Clock clock = appContext.getClock(); |
| final HistoryEventHandler historyHandler = appContext.getHistoryHandler(); |
| ContainerStoppedEvent lEvt = new ContainerStoppedEvent(containerId, |
| clock.getTime(), |
| exitStatus, |
| appContext.getApplicationAttemptId()); |
| historyHandler.handle( |
| new DAGHistoryEvent(appContext.getCurrentDAGID(),lEvt)); |
| } |
| |
| protected void deAllocate() { |
| sendEvent(new AMSchedulerEventDeallocateContainer(containerId)); |
| } |
| |
| protected void sendTerminatedToTaskAttempt( |
| TezTaskAttemptID taId, String message, TaskAttemptTerminationCause errCause) { |
| sendEvent(new TaskAttemptEventContainerTerminated(taId, message, errCause)); |
| } |
| |
| protected void sendContainerTerminatedBySystemToTaskAttempt( |
| TezTaskAttemptID taId, String message, TaskAttemptTerminationCause errorCause) { |
| sendEvent(new TaskAttemptEventContainerTerminatedBySystem(taId, message, errorCause)); |
| } |
| |
| protected void sendTerminatingToTaskAttempt(TezTaskAttemptID taId, |
| String message, TaskAttemptTerminationCause errorCause) { |
| sendEvent(new TaskAttemptEventContainerTerminating(taId, message, errorCause)); |
| } |
| |
| protected void maybeSendNodeFailureForFailedAssignment(TezTaskAttemptID taId) { |
| if (this.nodeFailed) { |
| this.sendNodeFailureToTA(taId, "Node Failed", TaskAttemptTerminationCause.NODE_FAILED); |
| } |
| } |
| |
| protected void sendNodeFailureToTA(TezTaskAttemptID taId, String message, |
| TaskAttemptTerminationCause errorCause) { |
| sendEvent(new TaskAttemptEventNodeFailed(taId, message, errorCause)); |
| } |
| |
| protected void sendStartRequestToNM(ContainerLaunchContext clc) { |
| sendEvent(new NMCommunicatorLaunchRequestEvent(clc, container)); |
| } |
| |
| protected void sendStopRequestToNM() { |
| sendEvent(new NMCommunicatorStopRequestEvent(containerId, |
| container.getNodeId(), container.getContainerToken())); |
| } |
| |
| protected void unregisterAttemptFromListener(TezTaskAttemptID attemptId) { |
| taskAttemptListener.unregisterTaskAttempt(attemptId); |
| } |
| |
| protected void registerWithTAListener() { |
| taskAttemptListener.registerRunningContainer(containerId); |
| } |
| |
| protected void unregisterFromTAListener() { |
| this.taskAttemptListener.unregisterRunningContainer(containerId); |
| } |
| |
| |
| protected void registerWithContainerListener() { |
| this.containerHeartbeatHandler.register(this.containerId); |
| } |
| |
| protected void unregisterFromContainerListener() { |
| this.containerHeartbeatHandler.unregister(this.containerId); |
| } |
| |
| |
| |
| } |