blob: f05885f6576251c5a6317136c4b7095c4bd6e2bc [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapreduce.v2.app2.job.impl;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.WrappedProgressSplitsBlock;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.JobCounter;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TaskCounter;
import org.apache.hadoop.mapreduce.TypeConverter;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
import org.apache.hadoop.mapreduce.jobhistory.MapAttemptFinishedEvent;
import org.apache.hadoop.mapreduce.jobhistory.ReduceAttemptFinishedEvent;
import org.apache.hadoop.mapreduce.jobhistory.TaskAttemptStartedEvent;
import org.apache.hadoop.mapreduce.jobhistory.TaskAttemptUnsuccessfulCompletionEvent;
import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.api.records.Phase;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptReport;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState;
import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
import org.apache.hadoop.mapreduce.v2.app2.AppContext;
import org.apache.hadoop.mapreduce.v2.app2.TaskAttemptListener;
import org.apache.hadoop.mapreduce.v2.app2.TaskHeartbeatHandler;
import org.apache.hadoop.mapreduce.v2.app2.job.TaskAttempt;
import org.apache.hadoop.mapreduce.v2.app2.job.TaskAttemptStateInternal;
import org.apache.hadoop.mapreduce.v2.app2.job.event.JobCounterUpdateEvent;
import org.apache.hadoop.mapreduce.v2.app2.job.event.JobDiagnosticsUpdateEvent;
import org.apache.hadoop.mapreduce.v2.app2.job.event.JobEvent;
import org.apache.hadoop.mapreduce.v2.app2.job.event.JobEventType;
import org.apache.hadoop.mapreduce.v2.app2.job.event.JobTaskAttemptFetchFailureEvent;
import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskAttemptDiagnosticsUpdateEvent;
import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskAttemptEvent;
import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskAttemptEventContainerTerminated;
import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskAttemptEventContainerTerminating;
import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskAttemptEventNodeFailed;
import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskAttemptEventType;
import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskAttemptRemoteStartEvent;
import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskAttemptScheduleEvent;
import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskAttemptStatusUpdateEvent;
import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskAttemptStatusUpdateEvent.TaskAttemptStatus;
import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskEventType;
import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskTAttemptEvent;
import org.apache.hadoop.mapreduce.v2.app2.rm.AMSchedulerTALaunchRequestEvent;
import org.apache.hadoop.mapreduce.v2.app2.rm.AMSchedulerEventTAEnded;
import org.apache.hadoop.mapreduce.v2.app2.speculate.SpeculatorEvent;
import org.apache.hadoop.mapreduce.v2.app2.taskclean.TaskCleanupEvent;
import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.Clock;
import org.apache.hadoop.yarn.YarnException;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.event.Event;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.state.InvalidStateTransitonException;
import org.apache.hadoop.yarn.state.SingleArcTransition;
import org.apache.hadoop.yarn.state.StateMachine;
import org.apache.hadoop.yarn.state.StateMachineFactory;
import org.apache.hadoop.yarn.util.BuilderUtils;
import org.apache.hadoop.yarn.util.RackResolver;
import org.apache.hadoop.yarn.util.Records;
import com.google.common.annotations.VisibleForTesting;
public abstract class TaskAttemptImpl implements TaskAttempt,
EventHandler<TaskAttemptEvent> {
private static final Log LOG = LogFactory.getLog(TaskAttemptImpl.class);
// Platform line separator, used when joining the diagnostics list for reports.
private static final String LINE_SEPARATOR = System
.getProperty("line.separator");
// Shared empty Counters instance returned when nothing has been reported yet.
static final Counters EMPTY_COUNTERS = new Counters();
// Divisor converting memory-byte counters to KB for the progress-split blocks.
private static final long MEMORY_SPLITS_RESOLUTION = 1024; //TODO Make configurable?
protected final JobConf conf;
protected final Path jobFile;
protected final int partition;
@SuppressWarnings("rawtypes")
protected EventHandler eventHandler;
private final TaskAttemptId attemptId;
private final TaskId taskId;
private final JobId jobId;
private final Clock clock;
// private final TaskAttemptListener taskAttemptListener;
private final OutputCommitter committer;
// Container resource (memory) computed from conf for this attempt's task type.
private final Resource resourceCapability;
private final String[] dataLocalHosts;
private final List<String> diagnostics = new ArrayList<String>();
// readLock/writeLock come from a single ReentrantReadWriteLock and guard all
// mutable state of this attempt, including the state machine.
private final Lock readLock;
private final Lock writeLock;
private final AppContext appContext;
private final TaskHeartbeatHandler taskHeartbeatHandler;
private Credentials credentials;
private Token<JobTokenIdentifier> jobToken;
// 0 means "not yet launched" / "not yet finished" respectively.
private long launchTime = 0;
private long finishTime = 0;
private WrappedProgressSplitsBlock progressSplitBlock;
private int shufflePort = -1;
private String trackerName;
private int httpPort;
// TODO Can these be replaced by the container object ?
private ContainerId containerId;
private NodeId containerNodeId;
private String containerMgrAddress;
private String nodeHttpAddress;
private String nodeRackName;
// Last status reported for this attempt via TA_STATUS_UPDATE events.
private TaskAttemptStatus reportedStatus;
// Whether a +1 container request was sent to the speculator, so the matching
// -1 release is sent at most once.
private boolean speculatorContainerRequestSent = false;
// Static transition table shared by all attempts; built lazily by the first
// constructed instance (see the constructor and initStateMachine()).
private static StateMachineFactory
<TaskAttemptImpl, TaskAttemptStateInternal, TaskAttemptEventType, TaskAttemptEvent>
stateMachineFactory
= new StateMachineFactory
<TaskAttemptImpl, TaskAttemptStateInternal, TaskAttemptEventType, TaskAttemptEvent>
(TaskAttemptStateInternal.NEW);
// Assigned once, by the first instance, before initStateMachine() runs.
private static SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent> DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION;
private static boolean stateMachineInited = false;
// Per-instance state machine built from the shared factory.
private final StateMachine
<TaskAttemptStateInternal, TaskAttemptEventType, TaskAttemptEvent> stateMachine;
// TODO XXX: Ensure MAPREDUCE-4457 is factored in. Also MAPREDUCE-4068.
// TODO XXX: Rename all CONTAINER_COMPLETED transitions to TERMINATED.
// Builds the shared transition table. Called exactly once, by the first
// constructed instance, after DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION is set.
// Layout: transitions grouped by source state (NEW, START_WAIT, RUNNING,
// COMMIT_PENDING, KILL/FAIL_IN_PROGRESS, KILLED/FAILED, SUCCEEDED). The
// EnumSet overloads declare events that are ignored (self-loop, no hook).
private void initStateMachine() {
stateMachineFactory =
stateMachineFactory
.addTransition(TaskAttemptStateInternal.NEW, TaskAttemptStateInternal.START_WAIT, TaskAttemptEventType.TA_SCHEDULE, createScheduleTransition())
.addTransition(TaskAttemptStateInternal.NEW, TaskAttemptStateInternal.NEW, TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE, DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION)
.addTransition(TaskAttemptStateInternal.NEW, TaskAttemptStateInternal.FAILED, TaskAttemptEventType.TA_FAIL_REQUEST, createFailRequestTransition())
.addTransition(TaskAttemptStateInternal.NEW, TaskAttemptStateInternal.KILLED, TaskAttemptEventType.TA_KILL_REQUEST, createKillRequestTransition())
.addTransition(TaskAttemptStateInternal.START_WAIT, TaskAttemptStateInternal.RUNNING, TaskAttemptEventType.TA_STARTED_REMOTELY, createStartedTransition())
.addTransition(TaskAttemptStateInternal.START_WAIT, TaskAttemptStateInternal.START_WAIT, TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE, DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION)
.addTransition(TaskAttemptStateInternal.START_WAIT, TaskAttemptStateInternal.FAIL_IN_PROGRESS, TaskAttemptEventType.TA_FAIL_REQUEST, createFailRequestBeforeRunningTransition())
.addTransition(TaskAttemptStateInternal.START_WAIT, TaskAttemptStateInternal.KILL_IN_PROGRESS, TaskAttemptEventType.TA_KILL_REQUEST, createKillRequestBeforeRunningTransition())
.addTransition(TaskAttemptStateInternal.START_WAIT, TaskAttemptStateInternal.KILL_IN_PROGRESS, TaskAttemptEventType.TA_NODE_FAILED, createNodeFailedBeforeRunningTransition())
.addTransition(TaskAttemptStateInternal.START_WAIT, TaskAttemptStateInternal.FAIL_IN_PROGRESS, TaskAttemptEventType.TA_CONTAINER_TERMINATING, createContainerTerminatingBeforeRunningTransition())
.addTransition(TaskAttemptStateInternal.START_WAIT, TaskAttemptStateInternal.FAILED, TaskAttemptEventType.TA_CONTAINER_TERMINATED, createContainerCompletedBeforeRunningTransition())
.addTransition(TaskAttemptStateInternal.RUNNING, TaskAttemptStateInternal.RUNNING, TaskAttemptEventType.TA_STATUS_UPDATE, createStatusUpdateTransition())
.addTransition(TaskAttemptStateInternal.RUNNING, TaskAttemptStateInternal.RUNNING, TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE, DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION)
.addTransition(TaskAttemptStateInternal.RUNNING, TaskAttemptStateInternal.COMMIT_PENDING, TaskAttemptEventType.TA_COMMIT_PENDING, createCommitPendingTransition())
.addTransition(TaskAttemptStateInternal.RUNNING, TaskAttemptStateInternal.SUCCEEDED, TaskAttemptEventType.TA_DONE, createSucceededTransition())
.addTransition(TaskAttemptStateInternal.RUNNING, TaskAttemptStateInternal.FAIL_IN_PROGRESS, TaskAttemptEventType.TA_FAILED, createFailRequestWhileRunningTransition())
.addTransition(TaskAttemptStateInternal.RUNNING, TaskAttemptStateInternal.FAIL_IN_PROGRESS, TaskAttemptEventType.TA_TIMED_OUT, createFailRequestWhileRunningTransition())
.addTransition(TaskAttemptStateInternal.RUNNING, TaskAttemptStateInternal.FAIL_IN_PROGRESS, TaskAttemptEventType.TA_FAIL_REQUEST, createFailRequestWhileRunningTransition())
.addTransition(TaskAttemptStateInternal.RUNNING, TaskAttemptStateInternal.KILL_IN_PROGRESS, TaskAttemptEventType.TA_KILL_REQUEST, createKillRequestWhileRunningTransition())
.addTransition(TaskAttemptStateInternal.RUNNING, TaskAttemptStateInternal.KILL_IN_PROGRESS, TaskAttemptEventType.TA_NODE_FAILED, createNodeFailedWhileRunningTransition())
.addTransition(TaskAttemptStateInternal.RUNNING, TaskAttemptStateInternal.RUNNING, TaskAttemptEventType.TA_CONTAINER_TERMINATING, createContainerTerminatingWhileRunningTransition())
.addTransition(TaskAttemptStateInternal.RUNNING, TaskAttemptStateInternal.FAILED, TaskAttemptEventType.TA_CONTAINER_TERMINATED, createContainerCompletedWhileRunningTransition())
// XXX Maybe move getMessage / getDiagnosticInfo into the base TaskAttemptEvent ?
.addTransition(TaskAttemptStateInternal.COMMIT_PENDING, TaskAttemptStateInternal.COMMIT_PENDING, TaskAttemptEventType.TA_STATUS_UPDATE, createStatusUpdateTransition())
.addTransition(TaskAttemptStateInternal.COMMIT_PENDING, TaskAttemptStateInternal.COMMIT_PENDING, TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE, DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION)
.addTransition(TaskAttemptStateInternal.COMMIT_PENDING, TaskAttemptStateInternal.COMMIT_PENDING, TaskAttemptEventType.TA_COMMIT_PENDING) // TODO ensure this is an ignorable event.
.addTransition(TaskAttemptStateInternal.COMMIT_PENDING, TaskAttemptStateInternal.SUCCEEDED, TaskAttemptEventType.TA_DONE, createSucceededTransition())
.addTransition(TaskAttemptStateInternal.COMMIT_PENDING, TaskAttemptStateInternal.FAIL_IN_PROGRESS, TaskAttemptEventType.TA_FAILED, createFailRequestWhileRunningTransition())
.addTransition(TaskAttemptStateInternal.COMMIT_PENDING, TaskAttemptStateInternal.FAIL_IN_PROGRESS, TaskAttemptEventType.TA_TIMED_OUT, createFailRequestWhileRunningTransition())
.addTransition(TaskAttemptStateInternal.COMMIT_PENDING, TaskAttemptStateInternal.FAIL_IN_PROGRESS, TaskAttemptEventType.TA_FAIL_REQUEST, createFailRequestWhileRunningTransition())
.addTransition(TaskAttemptStateInternal.COMMIT_PENDING, TaskAttemptStateInternal.KILL_IN_PROGRESS, TaskAttemptEventType.TA_KILL_REQUEST, createKillRequestWhileRunningTransition())
.addTransition(TaskAttemptStateInternal.COMMIT_PENDING, TaskAttemptStateInternal.KILL_IN_PROGRESS, TaskAttemptEventType.TA_NODE_FAILED, createNodeFailedWhileRunningTransition())
.addTransition(TaskAttemptStateInternal.COMMIT_PENDING, TaskAttemptStateInternal.RUNNING, TaskAttemptEventType.TA_CONTAINER_TERMINATING, createContainerTerminatingWhileRunningTransition())
.addTransition(TaskAttemptStateInternal.COMMIT_PENDING, TaskAttemptStateInternal.FAILED, TaskAttemptEventType.TA_CONTAINER_TERMINATED, createContainerCompletedWhileRunningTransition())
.addTransition(TaskAttemptStateInternal.KILL_IN_PROGRESS, TaskAttemptStateInternal.KILLED, TaskAttemptEventType.TA_CONTAINER_TERMINATED, createTerminatedTransition())
.addTransition(TaskAttemptStateInternal.KILL_IN_PROGRESS, TaskAttemptStateInternal.KILL_IN_PROGRESS, TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE, DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION)
.addTransition(TaskAttemptStateInternal.KILL_IN_PROGRESS, TaskAttemptStateInternal.KILL_IN_PROGRESS, EnumSet.of(TaskAttemptEventType.TA_STARTED_REMOTELY, TaskAttemptEventType.TA_STATUS_UPDATE, TaskAttemptEventType.TA_COMMIT_PENDING, TaskAttemptEventType.TA_DONE, TaskAttemptEventType.TA_FAILED, TaskAttemptEventType.TA_TIMED_OUT, TaskAttemptEventType.TA_FAIL_REQUEST, TaskAttemptEventType.TA_KILL_REQUEST, TaskAttemptEventType.TA_NODE_FAILED, TaskAttemptEventType.TA_CONTAINER_TERMINATING))
.addTransition(TaskAttemptStateInternal.FAIL_IN_PROGRESS, TaskAttemptStateInternal.FAILED, TaskAttemptEventType.TA_CONTAINER_TERMINATED, createTerminatedTransition())
.addTransition(TaskAttemptStateInternal.FAIL_IN_PROGRESS, TaskAttemptStateInternal.FAIL_IN_PROGRESS, TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE, DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION)
.addTransition(TaskAttemptStateInternal.FAIL_IN_PROGRESS, TaskAttemptStateInternal.FAIL_IN_PROGRESS, EnumSet.of(TaskAttemptEventType.TA_STARTED_REMOTELY, TaskAttemptEventType.TA_STATUS_UPDATE, TaskAttemptEventType.TA_COMMIT_PENDING, TaskAttemptEventType.TA_DONE, TaskAttemptEventType.TA_FAILED, TaskAttemptEventType.TA_TIMED_OUT, TaskAttemptEventType.TA_FAIL_REQUEST, TaskAttemptEventType.TA_KILL_REQUEST, TaskAttemptEventType.TA_NODE_FAILED, TaskAttemptEventType.TA_CONTAINER_TERMINATING))
.addTransition(TaskAttemptStateInternal.KILLED, TaskAttemptStateInternal.KILLED, TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE, DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION)
.addTransition(TaskAttemptStateInternal.KILLED, TaskAttemptStateInternal.KILLED, EnumSet.of(TaskAttemptEventType.TA_STARTED_REMOTELY, TaskAttemptEventType.TA_STATUS_UPDATE, TaskAttemptEventType.TA_COMMIT_PENDING, TaskAttemptEventType.TA_DONE, TaskAttemptEventType.TA_FAILED, TaskAttemptEventType.TA_FAIL_REQUEST, TaskAttemptEventType.TA_KILL_REQUEST, TaskAttemptEventType.TA_NODE_FAILED, TaskAttemptEventType.TA_CONTAINER_TERMINATING, TaskAttemptEventType.TA_CONTAINER_TERMINATED))
.addTransition(TaskAttemptStateInternal.FAILED, TaskAttemptStateInternal.FAILED, TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE, DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION)
.addTransition(TaskAttemptStateInternal.FAILED, TaskAttemptStateInternal.FAILED, EnumSet.of(TaskAttemptEventType.TA_STARTED_REMOTELY, TaskAttemptEventType.TA_STATUS_UPDATE, TaskAttemptEventType.TA_COMMIT_PENDING, TaskAttemptEventType.TA_DONE, TaskAttemptEventType.TA_FAILED, TaskAttemptEventType.TA_FAIL_REQUEST, TaskAttemptEventType.TA_KILL_REQUEST, TaskAttemptEventType.TA_NODE_FAILED, TaskAttemptEventType.TA_CONTAINER_TERMINATING, TaskAttemptEventType.TA_CONTAINER_TERMINATED))
// TODO XXX: FailRequest / KillRequest at SUCCEEDED need to consider Map / Reduce task.
.addTransition(TaskAttemptStateInternal.SUCCEEDED, TaskAttemptStateInternal.SUCCEEDED, TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE, DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION)
.addTransition(TaskAttemptStateInternal.SUCCEEDED, TaskAttemptStateInternal.FAILED, TaskAttemptEventType.TA_FAIL_REQUEST, createFailRequestAfterSuccessTransition())
.addTransition(TaskAttemptStateInternal.SUCCEEDED, TaskAttemptStateInternal.KILLED, TaskAttemptEventType.TA_KILL_REQUEST, createKillRequestAfterSuccessTransition())
.addTransition(TaskAttemptStateInternal.SUCCEEDED, TaskAttemptStateInternal.KILLED, TaskAttemptEventType.TA_NODE_FAILED, createNodeFailedAfterSuccessTransition())
.addTransition(TaskAttemptStateInternal.SUCCEEDED, TaskAttemptStateInternal.FAILED, TaskAttemptEventType.TA_TOO_MANY_FETCH_FAILURES, createTooManyFetchFailuresTransition())
.addTransition(TaskAttemptStateInternal.SUCCEEDED, TaskAttemptStateInternal.SUCCEEDED, EnumSet.of(TaskAttemptEventType.TA_CONTAINER_TERMINATED, TaskAttemptEventType.TA_TIMED_OUT, TaskAttemptEventType.TA_CONTAINER_TERMINATING))
.installTopology();
}
// TODO Remove TaskAttemptListener from the constructor.
@SuppressWarnings("rawtypes")
public TaskAttemptImpl(TaskId taskId, int i, EventHandler eventHandler,
TaskAttemptListener tal, Path jobFile, int partition, JobConf conf,
String[] dataLocalHosts, OutputCommitter committer,
Token<JobTokenIdentifier> jobToken, Credentials credentials, Clock clock,
TaskHeartbeatHandler taskHeartbeatHandler, AppContext appContext) {
ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
this.readLock = rwLock.readLock();
this.writeLock = rwLock.writeLock();
this.taskId = taskId;
this.jobId = taskId.getJobId();
this.attemptId = MRBuilderUtils.newTaskAttemptId(taskId, i);
this.eventHandler = eventHandler;
//Reported status
this.jobFile = jobFile;
this.partition = partition;
this.conf = conf;
this.dataLocalHosts = dataLocalHosts;
this.committer = committer;
this.jobToken = jobToken;
this.credentials = credentials;
this.clock = clock;
this.taskHeartbeatHandler = taskHeartbeatHandler;
this.appContext = appContext;
this.resourceCapability = BuilderUtils.newResource(getMemoryRequired(conf,
taskId.getTaskType()));
this.reportedStatus = new TaskAttemptStatus();
initTaskAttemptStatus(reportedStatus);
RackResolver.init(conf);
synchronized(stateMachineFactory) {
if (!stateMachineInited) {
LOG.info("XXX: Initializing State Machine Factory");
DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION = createDiagnosticUpdateTransition();
initStateMachine();
stateMachineInited = true;
}
}
this.stateMachine = stateMachineFactory.make(this);
}
/** @return this attempt's id; immutable after construction, so no lock. */
@Override
public TaskAttemptId getID() {
  return this.attemptId;
}
protected abstract org.apache.hadoop.mapred.Task createRemoteTask();
/**
 * Builds a consistent snapshot report of this attempt under the read lock.
 * State comes from the local state machine, not the remotely reported status.
 */
@Override
public TaskAttemptReport getReport() {
  TaskAttemptReport report = Records.newRecord(TaskAttemptReport.class);
  readLock.lock();
  try {
    report.setTaskAttemptId(attemptId);
    // Take the LOCAL state of the attempt — do NOT take it from
    // reportedStatus.
    report.setTaskAttemptState(getState());
    report.setProgress(reportedStatus.progress);
    report.setStartTime(launchTime);
    report.setFinishTime(finishTime);
    report.setShuffleFinishTime(reportedStatus.shuffleFinishTime);
    report.setDiagnosticInfo(StringUtils.join(LINE_SEPARATOR, getDiagnostics()));
    report.setPhase(reportedStatus.phase);
    report.setStateString(reportedStatus.stateString);
    report.setCounters(TypeConverter.toYarn(getCounters()));
    report.setContainerId(getAssignedContainerID());
    report.setNodeManagerHost(trackerName);
    report.setNodeManagerHttpPort(httpPort);
    NodeId nodeId = containerNodeId;
    if (nodeId != null) {
      report.setNodeManagerPort(nodeId.getPort());
    }
    return report;
  } finally {
    readLock.unlock();
  }
}
/** @return a defensive copy of the diagnostics accumulated so far. */
@Override
public List<String> getDiagnostics() {
  readLock.lock();
  try {
    // Copy under the lock so callers never see later mutations.
    return new ArrayList<String>(diagnostics);
  } finally {
    readLock.unlock();
  }
}
/** @return the last reported counters, or the shared empty set if none yet. */
@Override
public Counters getCounters() {
  readLock.lock();
  try {
    Counters reported = reportedStatus.counters;
    return (reported != null) ? reported : EMPTY_COUNTERS;
  } finally {
    readLock.unlock();
  }
}
/** @return last reported progress in [0,1]. */
@Override
public float getProgress() {
  final float progress;
  readLock.lock();
  try {
    progress = reportedStatus.progress;
  } finally {
    readLock.unlock();
  }
  return progress;
}
/** @return the externally-visible state derived from the state machine. */
@Override
public TaskAttemptState getState() {
  readLock.lock();
  try {
    TaskAttemptStateInternal internal = stateMachine.getCurrentState();
    return getExternalState(internal);
  } finally {
    readLock.unlock();
  }
}
/**
 * @return true once the attempt has reached a terminal or terminating state
 * (succeeded, failed/failing, killed/being killed).
 */
@Override
public boolean isFinished() {
  readLock.lock();
  try {
    // TODO: Use stateMachine level method?
    switch (getInternalState()) {
    case SUCCEEDED:
    case FAILED:
    case FAIL_IN_PROGRESS:
    case KILLED:
    case KILL_IN_PROGRESS:
      return true;
    default:
      return false;
    }
  } finally {
    readLock.unlock();
  }
}
/** @return id of the assigned container, or null before assignment. */
@Override
public ContainerId getAssignedContainerID() {
  final ContainerId id;
  readLock.lock();
  try {
    id = containerId;
  } finally {
    readLock.unlock();
  }
  return id;
}
/** @return container-manager address of the assigned node, or null. */
@Override
public String getAssignedContainerMgrAddress() {
  final String address;
  readLock.lock();
  try {
    address = containerMgrAddress;
  } finally {
    readLock.unlock();
  }
  return address;
}
/** @return NodeId of the assigned container's node, or null. */
@Override
public NodeId getNodeId() {
  final NodeId nodeId;
  readLock.lock();
  try {
    nodeId = containerNodeId;
  } finally {
    readLock.unlock();
  }
  return nodeId;
}
/**
 * If a container is assigned, returns the node's HTTP address; otherwise null.
 */
@Override
public String getNodeHttpAddress() {
  final String address;
  readLock.lock();
  try {
    address = nodeHttpAddress;
  } finally {
    readLock.unlock();
  }
  return address;
}
/**
 * If a container is assigned, returns the node's rack name; otherwise null.
 */
@Override
public String getNodeRackName() {
  final String rack;
  readLock.lock();
  try {
    rack = nodeRackName;
  } finally {
    readLock.unlock();
  }
  return rack;
}
/** @return launch timestamp, or 0 if the attempt has not launched. */
@Override
public long getLaunchTime() {
  final long time;
  readLock.lock();
  try {
    time = launchTime;
  } finally {
    readLock.unlock();
  }
  return time;
}
/** @return finish timestamp, or 0 if the attempt has not finished. */
@Override
public long getFinishTime() {
  final long time;
  readLock.lock();
  try {
    time = finishTime;
  } finally {
    readLock.unlock();
  }
  return time;
}
/** @return reported shuffle-finish timestamp (reduce attempts). */
@Override
public long getShuffleFinishTime() {
  final long time;
  readLock.lock();
  try {
    time = reportedStatus.shuffleFinishTime;
  } finally {
    readLock.unlock();
  }
  return time;
}
/** @return reported sort-finish timestamp (reduce attempts). */
@Override
public long getSortFinishTime() {
  final long time;
  readLock.lock();
  try {
    time = reportedStatus.sortFinishTime;
  } finally {
    readLock.unlock();
  }
  return time;
}
/** @return shuffle port reported for this attempt, or -1 if unknown. */
@Override
public int getShufflePort() {
  final int port;
  readLock.lock();
  try {
    port = shufflePort;
  } finally {
    readLock.unlock();
  }
  return port;
}
/**
 * Feeds an event through the attempt's state machine under the write lock.
 * An invalid transition is not rethrown: it is logged, reported to the job as
 * a diagnostic, and escalated via a JobEventType.INTERNAL_ERROR event.
 * Removed the leftover unconditional "XXX" LOG.info line that duplicated the
 * guarded debug message and spammed INFO on every event.
 */
@SuppressWarnings("unchecked")
@Override
public void handle(TaskAttemptEvent event) {
  if (LOG.isDebugEnabled()) {
    LOG.debug("Processing " + event.getTaskAttemptID() + " of type "
        + event.getType());
  }
  writeLock.lock();
  try {
    final TaskAttemptStateInternal oldState = getInternalState();
    try {
      stateMachine.doTransition(event.getType(), event);
    } catch (InvalidStateTransitonException e) {
      LOG.error("Can't handle this event at current state for "
          + this.attemptId, e);
      eventHandler.handle(new JobDiagnosticsUpdateEvent(
          this.attemptId.getTaskId().getJobId(), "Invalid event " + event.getType() +
          " on TaskAttempt " + this.attemptId));
      eventHandler.handle(new JobEvent(this.attemptId.getTaskId().getJobId(),
          JobEventType.INTERNAL_ERROR));
    }
    if (oldState != getInternalState()) {
      LOG.info(attemptId + " TaskAttempt Transitioned from "
          + oldState + " to "
          + getInternalState());
    }
  } finally {
    writeLock.unlock();
  }
}
/** @return the raw state-machine state (test-visible). */
@VisibleForTesting
public TaskAttemptStateInternal getInternalState() {
  final TaskAttemptStateInternal current;
  readLock.lock();
  try {
    current = stateMachine.getCurrentState();
  } finally {
    readLock.unlock();
  }
  return current;
}
/**
 * Maps an internal state-machine state onto the coarser externally-reported
 * TaskAttemptState. In-progress kill/fail collapse onto KILLED/FAILED.
 */
private static TaskAttemptState getExternalState(
    TaskAttemptStateInternal smState) {
  switch (smState) {
  case NEW:
  case START_WAIT:
    return TaskAttemptState.STARTING;
  case RUNNING:
    return TaskAttemptState.RUNNING;
  case COMMIT_PENDING:
    return TaskAttemptState.COMMIT_PENDING;
  case FAIL_IN_PROGRESS:
  case FAILED:
    return TaskAttemptState.FAILED;
  case KILL_IN_PROGRESS:
  case KILLED:
    return TaskAttemptState.KILLED;
  case SUCCEEDED:
    return TaskAttemptState.SUCCEEDED;
  default:
    throw new YarnException("Attempt to convert invalid "
        + "stateMachineTaskAttemptState to externalTaskAttemptState: "
        + smState);
  }
}
/** Convenience wrapper that forwards an event to the raw-typed handler. */
@SuppressWarnings("unchecked")
private void sendEvent(Event<?> event) {
  eventHandler.handle(event);
}
/**
 * Returns the configured container memory (MB) for the given task type;
 * falls back to 1024 for any other type.
 */
private int getMemoryRequired(Configuration conf, TaskType taskType) {
  switch (taskType) {
  case MAP:
    return conf.getInt(MRJobConfig.MAP_MEMORY_MB,
        MRJobConfig.DEFAULT_MAP_MEMORY_MB);
  case REDUCE:
    return conf.getInt(MRJobConfig.REDUCE_MEMORY_MB,
        MRJobConfig.DEFAULT_REDUCE_MEMORY_MB);
  default:
    return 1024;
  }
}
// always called in write lock
private void setFinishTime() {
  // A finish time is only meaningful once the attempt actually launched.
  if (launchTime == 0) {
    return;
  }
  finishTime = clock.getTime();
}
// TODO Merge some of these JobCounter events.
/** Builds the +1 TOTAL_LAUNCHED_{MAPS,REDUCES} counter update for a launch. */
private static JobCounterUpdateEvent createJobCounterUpdateEventTALaunched(
    TaskAttemptImpl ta) {
  JobCounter launched;
  if (ta.taskId.getTaskType() == TaskType.MAP) {
    launched = JobCounter.TOTAL_LAUNCHED_MAPS;
  } else {
    launched = JobCounter.TOTAL_LAUNCHED_REDUCES;
  }
  JobCounterUpdateEvent jce = new JobCounterUpdateEvent(ta.jobId);
  jce.addCounterUpdate(launched, 1);
  return jce;
}
/** Builds the SLOTS_MILLIS_{MAPS,REDUCES} update for this attempt's runtime. */
private static JobCounterUpdateEvent createJobCounterUpdateEventSlotMillis(
    TaskAttemptImpl ta) {
  long slotMillis = computeSlotMillis(ta);
  JobCounter slotsCounter;
  if (ta.taskId.getTaskType() == TaskType.MAP) {
    slotsCounter = JobCounter.SLOTS_MILLIS_MAPS;
  } else {
    slotsCounter = JobCounter.SLOTS_MILLIS_REDUCES;
  }
  JobCounterUpdateEvent jce = new JobCounterUpdateEvent(ta.jobId);
  jce.addCounterUpdate(slotsCounter, slotMillis);
  return jce;
}
/**
 * Builds the counter updates for a terminated attempt: +1 to the failed or
 * killed counter for the task type, plus the slot-millis charge unless the
 * task already completed (to avoid double-counting elapsed time).
 */
private static JobCounterUpdateEvent createJobCounterUpdateEventTATerminated(
    TaskAttemptImpl taskAttempt, boolean taskAlreadyCompleted,
    TaskAttemptStateInternal taState) {
  JobCounterUpdateEvent jce = new JobCounterUpdateEvent(taskAttempt.getID()
      .getTaskId().getJobId());
  long slotMillisIncrement = computeSlotMillis(taskAttempt);
  boolean isMap =
      taskAttempt.getID().getTaskId().getTaskType() == TaskType.MAP;
  if (taState == TaskAttemptStateInternal.FAILED) {
    jce.addCounterUpdate(isMap ? JobCounter.NUM_FAILED_MAPS
        : JobCounter.NUM_FAILED_REDUCES, 1);
  } else if (taState == TaskAttemptStateInternal.KILLED) {
    jce.addCounterUpdate(isMap ? JobCounter.NUM_KILLED_MAPS
        : JobCounter.NUM_KILLED_REDUCES, 1);
  }
  if (!taskAlreadyCompleted) {
    // dont double count the elapsed time
    jce.addCounterUpdate(isMap ? JobCounter.SLOTS_MILLIS_MAPS
        : JobCounter.SLOTS_MILLIS_REDUCES, slotMillisIncrement);
  }
  return jce;
}
/**
 * Charges elapsed wall-clock time (finish - launch) multiplied by the number
 * of minimum-size cluster slots this attempt's memory request occupies.
 * Returns 0 slots when the minimum container size is unknown (0).
 */
private static long computeSlotMillis(TaskAttemptImpl taskAttempt) {
  TaskType taskType = taskAttempt.getID().getTaskId().getTaskType();
  int memoryRequired =
      taskAttempt.getMemoryRequired(taskAttempt.conf, taskType);
  int minSlotMemSize =
      taskAttempt.appContext.getClusterInfo().getMinContainerCapability()
          .getMemory();
  int slots = (minSlotMemSize == 0)
      ? 0
      : (int) Math.ceil((float) memoryRequired / minSlotMemSize);
  return slots * (taskAttempt.getFinishTime() - taskAttempt.getLaunchTime());
}
// TODO Change to return a JobHistoryEvent.
/**
 * Builds the history event for an unsuccessful (failed/killed) attempt,
 * substituting "UNKNOWN"/-1 for node details when no container was assigned.
 */
private static
TaskAttemptUnsuccessfulCompletionEvent
createTaskAttemptUnsuccessfulCompletionEvent(TaskAttemptImpl taskAttempt,
    TaskAttemptStateInternal attemptState) {
  NodeId nodeId = taskAttempt.containerNodeId;
  String host = (nodeId == null) ? "UNKNOWN" : nodeId.getHost();
  int port = (nodeId == null) ? -1 : nodeId.getPort();
  String rack = (taskAttempt.nodeRackName == null) ? "UNKNOWN"
      : taskAttempt.nodeRackName;
  return new TaskAttemptUnsuccessfulCompletionEvent(
      TypeConverter.fromYarn(taskAttempt.attemptId),
      TypeConverter.fromYarn(taskAttempt.attemptId.getTaskId().getTaskType()),
      attemptState.toString(),
      taskAttempt.finishTime,
      host, port, rack,
      StringUtils.join(LINE_SEPARATOR, taskAttempt.getDiagnostics()),
      taskAttempt.getProgressSplitBlock().burst());
}
/** Wraps this attempt's launch details in a job-history started event. */
private JobHistoryEvent createTaskAttemptStartedEvent() {
  TaskAttemptStartedEvent startedEvent = new TaskAttemptStartedEvent(
      TypeConverter.fromYarn(attemptId),
      TypeConverter.fromYarn(taskId.getTaskType()),
      launchTime, trackerName, httpPort, shufflePort, containerId);
  return new JobHistoryEvent(jobId, startedEvent);
}
/**
 * Lazily creates and returns the progress-splits block for this attempt.
 * The previous code performed the lazy write while holding only the READ
 * lock, mutating shared state concurrently with other readers. A
 * ReentrantReadWriteLock cannot upgrade read to write, so the fast path
 * checks under the read lock, releases it, then takes the write lock and
 * double-checks before creating.
 */
private WrappedProgressSplitsBlock getProgressSplitBlock() {
  readLock.lock();
  try {
    if (progressSplitBlock != null) {
      return progressSplitBlock;
    }
  } finally {
    readLock.unlock();
  }
  writeLock.lock();
  try {
    // Double-check: another thread may have created it between the locks.
    if (progressSplitBlock == null) {
      progressSplitBlock = new WrappedProgressSplitsBlock(conf.getInt(
          MRJobConfig.MR_AM_NUM_PROGRESS_SPLITS,
          MRJobConfig.DEFAULT_MR_AM_NUM_PROGRESS_SPLITS));
    }
    return progressSplitBlock;
  } finally {
    writeLock.unlock();
  }
}
/**
 * Folds the latest reported progress and resource counters into the
 * progress-splits block used by history/diagnostics. No-op when no counters
 * have been reported yet or the attempt has not launched. Removed the
 * redundant {@code start != 0} re-check that followed the early return.
 */
private void updateProgressSplits() {
  // Clamp reported progress into [0, 1].
  double newProgress = reportedStatus.progress;
  newProgress = Math.max(Math.min(newProgress, 1.0D), 0.0D);
  Counters counters = reportedStatus.counters;
  if (counters == null)
    return;
  WrappedProgressSplitsBlock splitsBlock = getProgressSplitBlock();
  if (splitsBlock != null) {
    long now = clock.getTime();
    long start = getLaunchTime();
    if (start == 0)
      return;
    // Guard against overflow when narrowing the elapsed time to int.
    if (now - start <= Integer.MAX_VALUE) {
      splitsBlock.getProgressWallclockTime().extend(newProgress,
          (int) (now - start));
    }
    Counter cpuCounter = counters.findCounter(TaskCounter.CPU_MILLISECONDS);
    if (cpuCounter != null && cpuCounter.getValue() <= Integer.MAX_VALUE) {
      splitsBlock.getProgressCPUTime().extend(newProgress,
          (int) cpuCounter.getValue()); // long to int? TODO: FIX. Same below
    }
    Counter virtualBytes = counters
        .findCounter(TaskCounter.VIRTUAL_MEMORY_BYTES);
    if (virtualBytes != null) {
      // Scale bytes to KB before recording.
      splitsBlock.getProgressVirtualMemoryKbytes().extend(newProgress,
          (int) (virtualBytes.getValue() / (MEMORY_SPLITS_RESOLUTION)));
    }
    Counter physicalBytes = counters
        .findCounter(TaskCounter.PHYSICAL_MEMORY_BYTES);
    if (physicalBytes != null) {
      splitsBlock.getProgressPhysicalMemoryKbytes().extend(newProgress,
          (int) (physicalBytes.getValue() / (MEMORY_SPLITS_RESOLUTION)));
    }
  }
}
/**
 * Emits the map- or reduce-specific finished history event for this attempt.
 * No-op if the attempt never launched. Node details fall back to
 * "UNKNOWN"/-1 when no container was assigned.
 */
@SuppressWarnings({ "unchecked" })
private void logAttemptFinishedEvent(TaskAttemptStateInternal state) {
  // Log finished events only if an attempt started.
  if (getLaunchTime() == 0) {
    return;
  }
  // Node details are identical for both branches; compute them once.
  String host = (containerNodeId == null) ? "UNKNOWN"
      : containerNodeId.getHost();
  int port = (containerNodeId == null) ? -1 : containerNodeId.getPort();
  String rack = (nodeRackName == null) ? "UNKNOWN" : nodeRackName;
  if (attemptId.getTaskId().getTaskType() == TaskType.MAP) {
    MapAttemptFinishedEvent mfe = new MapAttemptFinishedEvent(
        TypeConverter.fromYarn(attemptId),
        TypeConverter.fromYarn(attemptId.getTaskId().getTaskType()),
        state.toString(),
        reportedStatus.mapFinishTime,
        finishTime, host, port, rack,
        reportedStatus.stateString,
        getCounters(),
        getProgressSplitBlock().burst());
    eventHandler.handle(
        new JobHistoryEvent(attemptId.getTaskId().getJobId(), mfe));
  } else {
    ReduceAttemptFinishedEvent rfe = new ReduceAttemptFinishedEvent(
        TypeConverter.fromYarn(attemptId),
        TypeConverter.fromYarn(attemptId.getTaskId().getTaskType()),
        state.toString(),
        reportedStatus.shuffleFinishTime,
        reportedStatus.sortFinishTime,
        finishTime, host, port, rack,
        reportedStatus.stateString,
        getCounters(),
        getProgressSplitBlock().burst());
    eventHandler.handle(
        new JobHistoryEvent(attemptId.getTaskId().getJobId(), rfe));
  }
}
/**
 * Asks the speculator for one container (+1) on behalf of this attempt,
 * at most once; subsequent calls are no-ops until the request is released
 * via {@link #maybeSendSpeculatorContainerRelease()}.
 */
private void maybeSendSpeculatorContainerRequest() {
  if (speculatorContainerRequestSent) {
    return; // A +1 request is already outstanding.
  }
  sendEvent(new SpeculatorEvent(taskId, +1));
  speculatorContainerRequestSent = true;
}
/**
 * Releases (-1) the speculator container request previously made by
 * {@link #maybeSendSpeculatorContainerRequest()}; a no-op if no request
 * is outstanding.
 */
private void maybeSendSpeculatorContainerRelease() {
  if (!speculatorContainerRequestSent) {
    return; // Nothing outstanding to release.
  }
  sendEvent(new SpeculatorEvent(taskId, -1));
  speculatorContainerRequestSent = false;
}
/**
 * Emits a {@link TaskCleanupEvent} so the output committer can clean up any
 * partial output this attempt may have produced.
 */
private void sendTaskAttemptCleanupEvent() {
  TaskAttemptContext cleanupContext = new TaskAttemptContextImpl(this.conf,
      TypeConverter.fromYarn(this.attemptId));
  sendEvent(
      new TaskCleanupEvent(this.attemptId, this.committer, cleanupContext));
}
/** Factory for the schedule transition; overridable for testing. */
protected SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent>
createScheduleTransition() {
return new ScheduleTaskattempt();
}
/**
 * Handles the schedule request for a task attempt: registers interest with
 * the speculator, builds the engine-level remote task, and sends a container
 * launch request to the scheduler — with node/rack locality hints unless the
 * attempt is a reschedule. (NOTE(review): class name has a casing typo —
 * "Taskattempt" — kept since the protected class is part of the interface.)
 */
protected static class ScheduleTaskattempt implements
    SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent> {
  @Override
  public void transition(TaskAttemptImpl ta, TaskAttemptEvent event) {
    TaskAttemptScheduleEvent scheduleEvent = (TaskAttemptScheduleEvent) event;

    // Event to speculator - containerNeeded++
    // TODO How does the speculator handle this.. should it be going out from
    // the container instead.
    ta.maybeSendSpeculatorContainerRequest();

    // TODO XXX: Creating the remote task here may not be required in case of
    // recovery. Could be solved by having the Scheduler pull the RemoteTask.
    org.apache.hadoop.mapred.Task remoteTask = ta.createRemoteTask();

    boolean rescheduled = scheduleEvent.isRescheduled();
    String[] hosts;
    String[] racks;
    if (rescheduled) {
      // A rescheduled attempt carries no locality preference.
      hosts = new String[0];
      racks = new String[0];
    } else {
      // Ask for node locality on the input hosts, and rack locality on the
      // racks those hosts resolve to.
      Set<String> rackSet = new HashSet<String>();
      for (String host : ta.dataLocalHosts) {
        rackSet.add(RackResolver.resolve(host).getNetworkLocation());
      }
      hosts = TaskAttemptImplHelpers.resolveHosts(ta.dataLocalHosts);
      racks = rackSet.toArray(new String[rackSet.size()]);
    }

    // Hand the launch request to the scheduler.
    ta.sendEvent(new AMSchedulerTALaunchRequestEvent(ta.attemptId,
        rescheduled, ta.resourceCapability, remoteTask, ta, ta.credentials,
        ta.jobToken, hosts, racks));
  }
}
/** Factory for the diagnostic-update transition; overridable for testing. */
protected SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent>
createDiagnosticUpdateTransition() {
return new DiagnosticInformationUpdater();
}
/**
 * Appends diagnostic text reported for this attempt to its diagnostics list.
 */
protected static class DiagnosticInformationUpdater implements
    SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent> {
  @Override
  public void transition(TaskAttemptImpl ta, TaskAttemptEvent event) {
    String diagnosticInfo =
        ((TaskAttemptDiagnosticsUpdateEvent) event).getDiagnosticInfo();
    if (LOG.isDebugEnabled()) {
      LOG.debug("Diagnostics update for " + ta.attemptId + ": "
          + diagnosticInfo);
    }
    ta.addDiagnosticInfo(diagnosticInfo);
  }
}
/**
 * Records a diagnostic message for this attempt.
 *
 * @param diag diagnostic text; {@code null} or empty strings are ignored
 */
private void addDiagnosticInfo(String diag) {
  // isEmpty() is the idiomatic (and cheaper) form of !diag.equals("").
  if (diag != null && !diag.isEmpty()) {
    diagnostics.add(diag);
  }
}
/** Factory for the fail-request transition; overridable for testing. */
protected SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent>
createFailRequestTransition() {
return new FailRequest();
}
// TODO XXX: FailRequest == KillRequest.
/**
 * Base handling for a fail request: stamps the finish time, updates job
 * counters, logs an unsuccessful-completion history event (only if the
 * attempt actually launched), and notifies the owning Task.
 */
protected static class FailRequest implements
    SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent> {
  @Override
  public void transition(TaskAttemptImpl ta, TaskAttemptEvent event) {
    // TODO XXX -> Remove this comment after getting to CompletedEvent processing.
    // TODO XXX move this out into a helper method. CompletedEvents should not
    // be extending Failed events.
    ta.setFinishTime();
    ta.sendEvent(createJobCounterUpdateEventTATerminated(ta, false,
        TaskAttemptStateInternal.FAILED));
    // A history finish event is only meaningful when a start event exists,
    // i.e. when the attempt launched.
    if (ta.getLaunchTime() == 0) {
      if (LOG.isDebugEnabled()) {
        LOG.debug("Not generating HistoryFinish event since start event not " +
            "generated for taskAttempt: " + ta.getID());
      }
    } else {
      // TODO XXX: For cases like this, recovery goes for a toss, since the the attempt will not exist in the history file.
      ta.sendEvent(new JobHistoryEvent(ta.jobId,
          createTaskAttemptUnsuccessfulCompletionEvent(ta,
              TaskAttemptStateInternal.FAILED)));
    }
    // Tell the owning Task that this attempt failed.
    ta.sendEvent(new TaskTAttemptEvent(ta.attemptId,
        TaskEventType.T_ATTEMPT_FAILED));
  }
}
/** Factory for the kill-request transition; overridable for testing. */
protected SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent>
createKillRequestTransition() {
return new KillRequest();
}
// TODO: Identical to TAFailRequest except for the states.. Merge together.
/**
 * Base handling for a kill request: stamps the finish time, updates job
 * counters, logs an unsuccessful-completion history event (only if the
 * attempt actually launched), and notifies the owning Task.
 */
protected static class KillRequest implements
    SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent> {
  @Override
  public void transition(TaskAttemptImpl ta, TaskAttemptEvent event) {
    ta.setFinishTime();
    ta.sendEvent(createJobCounterUpdateEventTATerminated(ta, false,
        TaskAttemptStateInternal.KILLED));
    // A history finish event is only meaningful when a start event exists,
    // i.e. when the attempt launched.
    if (ta.getLaunchTime() == 0) {
      if (LOG.isDebugEnabled()) {
        LOG.debug("Not generating HistoryFinish event since start event not " +
            "generated for taskAttempt: " + ta.getID());
      }
    } else {
      ta.sendEvent(new JobHistoryEvent(ta.jobId,
          createTaskAttemptUnsuccessfulCompletionEvent(ta,
              TaskAttemptStateInternal.KILLED)));
    }
    // Tell the owning Task that this attempt was killed.
    ta.sendEvent(new TaskTAttemptEvent(ta.attemptId,
        TaskEventType.T_ATTEMPT_KILLED));
  }
}
/** Factory for the started transition; overridable for testing. */
protected SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent>
createStartedTransition() {
return new Started();
}
/**
 * Handles the remote-start notification: records container/node bookkeeping
 * on the attempt, stamps the launch time, then (in order) updates job
 * counters, logs the history started event, releases the speculator container
 * request, reports the start time to the speculator, notifies the owning
 * Task, and registers with the heartbeat monitor. The field-assignment and
 * event order below is deliberate — do not reorder casually.
 */
protected static class Started implements
SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent> {
@Override
public void transition(TaskAttemptImpl ta, TaskAttemptEvent origEvent) {
TaskAttemptRemoteStartEvent event = (TaskAttemptRemoteStartEvent) origEvent;
// TODO XXX What all changes here after CLC construction is done. Remove TODOs after that.
// Look up the container this attempt was launched in.
Container container = ta.appContext.getAllContainers()
.get(event.getContainerId()).getContainer();
// Cache container / node coordinates on the attempt.
ta.containerId = event.getContainerId();
ta.containerNodeId = container.getNodeId();
ta.containerMgrAddress = ta.containerNodeId.toString();
ta.nodeHttpAddress = container.getNodeHttpAddress();
ta.nodeRackName = RackResolver.resolve(ta.containerNodeId.getHost())
.getNetworkLocation();
// TODO ContainerToken not required in TA.
// TODO assignedCapability not required in TA.
// TODO jvmId only required if TAL registration happens here.
// TODO Anything to be done with the TaskAttemptListener ? or is that in
// the Container.
ta.launchTime = ta.clock.getTime();
ta.shufflePort = event.getShufflePort();
// Derive tracker host / HTTP port from the node's HTTP address.
// TODO Resolve to host / IP in case of a local address.
InetSocketAddress nodeHttpInetAddr = NetUtils
.createSocketAddr(ta.nodeHttpAddress); // TODO: Costly?
ta.trackerName = nodeHttpInetAddr.getHostName();
ta.httpPort = nodeHttpInetAddr.getPort();
ta.sendEvent(createJobCounterUpdateEventTALaunched(ta));
LOG.info("TaskAttempt: [" + ta.attemptId+ "] started."
+ " Is using containerId: [" + ta.containerId + "]"
+ " on NM: [" + ta.containerMgrAddress + "]");
// JobHistoryEvent
ta.sendEvent(ta.createTaskAttemptStartedEvent());
// Inform the speculator about the container assignment.
ta.maybeSendSpeculatorContainerRelease();
// Inform speculator about startTime
ta.sendEvent(new SpeculatorEvent(ta.attemptId, true, ta.launchTime));
// Inform the Task
ta.sendEvent(new TaskTAttemptEvent(ta.attemptId,
TaskEventType.T_ATTEMPT_LAUNCHED));
// Begin liveness monitoring of the attempt.
ta.taskHeartbeatHandler.register(ta.attemptId);
}
}
/** Factory for the fail-request-before-running transition; overridable for testing. */
protected SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent>
createFailRequestBeforeRunningTransition() {
return new FailRequestBeforeRunning();
}
// TODO XXX: Rename FailRequest / KillRequest*BEFORE*Running
// TODO Again, can failReqeust and KillRequest be merged ?
/**
 * Fail request arriving before the attempt is running: performs the base
 * FailRequest handling, then tells the scheduler the attempt ended and
 * releases the speculator container request (no container work in flight).
 */
protected static class FailRequestBeforeRunning extends FailRequest {
@Override
public void transition(TaskAttemptImpl ta, TaskAttemptEvent event) {
super.transition(ta, event);
// Inform the scheduler
ta.sendEvent(new AMSchedulerEventTAEnded(ta.attemptId,
TaskAttemptState.FAILED));
// Decrement speculator container request.
ta.maybeSendSpeculatorContainerRelease();
// TODO XXX. AnyCounterUpdates ? KILL/FAIL count. Slot_Millis, etc
}
}
/** Factory for the kill-request-before-running transition; overridable for testing. */
protected SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent>
createKillRequestBeforeRunningTransition() {
return new KillRequestBeforeRunning();
}
// TODO XXX: Identical to FailRequestWhileRunning except for states.
/**
 * Kill request arriving before the attempt is running: performs the base
 * KillRequest handling, then tells the scheduler the attempt ended and
 * releases the speculator container request (no container work in flight).
 */
protected static class KillRequestBeforeRunning extends KillRequest {
@Override
public void transition(TaskAttemptImpl ta, TaskAttemptEvent event) {
// super takes care of finish time, history and the Task notification.
super.transition(ta, event);
// Inform the scheduler
ta.sendEvent(new AMSchedulerEventTAEnded(ta.attemptId,
TaskAttemptState.KILLED));
// Decrement speculator container request.
ta.maybeSendSpeculatorContainerRelease();
// TODO XXX. AnyCounterUpdates ? KILL/FAIL count. Slot_Millis, etc
}
}
/** Factory for the node-failed-before-running transition; overridable for testing. */
protected SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent>
createNodeFailedBeforeRunningTransition() {
return new NodeFailedBeforeRunning();
}
/**
 * Node failure before the attempt is running: records the node-failure
 * diagnostics, then performs the full kill-before-running handling.
 */
protected static class NodeFailedBeforeRunning extends
    KillRequestBeforeRunning {
  @Override
  public void transition(TaskAttemptImpl ta, TaskAttemptEvent event) {
    TaskAttemptEventNodeFailed nfEvent = (TaskAttemptEventNodeFailed) event;
    // Record diagnostics first so the kill handling sees them.
    ta.addDiagnosticInfo(nfEvent.getDiagnosticInfo());
    // BUG FIX: super.transition() was never invoked, so the kill handling
    // (finish time, counters, history event, T_ATTEMPT_KILLED, scheduler /
    // speculator notifications) was silently skipped. Pattern matches
    // TooManyFetchFailures: add diagnostics, then delegate to super.
    super.transition(ta, event);
  }
}
/** Factory for the container-terminating-before-running transition; overridable for testing. */
protected SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent>
createContainerTerminatingBeforeRunningTransition() {
return new ContainerTerminatingBeforeRunning();
}
/**
 * Container began terminating before the attempt ran: performs the
 * fail-before-running handling, then records the termination diagnostics.
 */
protected static class ContainerTerminatingBeforeRunning extends
    FailRequestBeforeRunning {
  @Override
  public void transition(TaskAttemptImpl ta, TaskAttemptEvent event) {
    super.transition(ta, event);
    TaskAttemptEventContainerTerminating terminatingEvent =
        (TaskAttemptEventContainerTerminating) event;
    ta.addDiagnosticInfo(terminatingEvent.getDiagnosticInfo());
  }
}
/** Factory for the container-completed-before-running transition; overridable for testing. */
protected SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent>
createContainerCompletedBeforeRunningTransition() {
return new ContainerCompletedBeforeRunning();
}
/**
 * Container completed before the attempt ran: performs the
 * fail-before-running handling, triggers output cleanup, and records the
 * termination diagnostics.
 */
protected static class ContainerCompletedBeforeRunning extends
    FailRequestBeforeRunning {
  @Override
  public void transition(TaskAttemptImpl ta, TaskAttemptEvent event) {
    super.transition(ta, event);
    ta.sendTaskAttemptCleanupEvent();
    TaskAttemptEventContainerTerminated terminatedEvent =
        (TaskAttemptEventContainerTerminated) event;
    ta.addDiagnosticInfo(terminatedEvent.getDiagnosticInfo());
    // TODO XXX: Maybe other counters: Failed, Killed, etc.
  }
}
/** Factory for the status-update transition; overridable for testing. */
protected static SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent>
createStatusUpdateTransition() {
return new StatusUpdater();
}
/**
 * Applies a progress/status report from the running attempt: replaces the
 * cached status, forwards it to the speculator, refreshes the progress
 * splits, and reports any fetch failures to the job.
 */
protected static class StatusUpdater implements
    SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent> {
  @Override
  public void transition(TaskAttemptImpl ta, TaskAttemptEvent event) {
    TaskAttemptStatusUpdateEvent statusEvent =
        (TaskAttemptStatusUpdateEvent) event;
    // Replace the cached status wholesale; the reported state is overridden
    // with the state-machine's view of the attempt.
    ta.reportedStatus = statusEvent.getReportedTaskAttemptStatus();
    ta.reportedStatus.taskState = ta.getState();
    // Inform speculator of status.
    ta.sendEvent(new SpeculatorEvent(ta.reportedStatus, ta.clock.getTime()));
    ta.updateProgressSplits();
    // Inform the job about fetch failures if they exist.
    if (ta.reportedStatus.fetchFailedMaps != null
        && !ta.reportedStatus.fetchFailedMaps.isEmpty()) {
      ta.sendEvent(new JobTaskAttemptFetchFailureEvent(ta.attemptId,
          ta.reportedStatus.fetchFailedMaps));
    }
    // TODO at some point. Nodes may be interested in FetchFailure info.
    // Can be used to blacklist nodes.
  }
}
/** Factory for the commit-pending transition; overridable for testing. */
protected SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent>
createCommitPendingTransition() {
return new CommitPendingHandler();
}
/**
 * Forwards the attempt's commit request to the owning Task, which arbitrates
 * which attempt (if speculating) may commit.
 */
protected static class CommitPendingHandler implements
SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent> {
@Override
public void transition(TaskAttemptImpl ta, TaskAttemptEvent event) {
// Inform the task that the attempt wants to commit.
ta.sendEvent(new TaskTAttemptEvent(ta.attemptId,
TaskEventType.T_ATTEMPT_COMMIT_PENDING));
}
}
/** Factory for the succeeded transition; overridable for testing. */
protected SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent>
createSucceededTransition() {
return new Succeeded();
}
/**
 * Handles successful completion: stamps the finish time, then (in order)
 * informs the speculator, logs the history finished event, updates slot
 * counters, notifies the scheduler and the owning Task, and stops heartbeat
 * monitoring. The event order is deliberate — do not reorder casually.
 */
protected static class Succeeded implements
SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent> {
@Override
public void transition(TaskAttemptImpl ta, TaskAttemptEvent event) {
//Inform the speculator. Generate history event. Update counters.
ta.setFinishTime();
ta.sendEvent(new SpeculatorEvent(ta.reportedStatus, ta.finishTime));
ta.logAttemptFinishedEvent(TaskAttemptStateInternal.SUCCEEDED);
ta.sendEvent(createJobCounterUpdateEventSlotMillis(ta));
// Inform the Scheduler.
ta.sendEvent(new AMSchedulerEventTAEnded(ta.attemptId,
TaskAttemptState.SUCCEEDED));
// Inform the task.
ta.sendEvent(new TaskTAttemptEvent(ta.attemptId,
TaskEventType.T_ATTEMPT_SUCCEEDED));
//Unregister from the TaskHeartbeatHandler.
ta.taskHeartbeatHandler.unregister(ta.attemptId);
// TODO maybe. For reuse ... Stacking pulls for a reduce task, even if the
// TA finishes independently. // Will likely be the Job's responsibility.
}
}
/** Factory for the fail-request-while-running transition; overridable for testing. */
protected SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent>
createFailRequestWhileRunningTransition() {
return new FailRequestWhileRunning();
}
/**
 * Fail request while the attempt is running: performs the
 * fail-before-running handling, then stops heartbeat monitoring since the
 * attempt was actively being tracked.
 */
protected static class FailRequestWhileRunning extends
FailRequestBeforeRunning {
@Override
public void transition(TaskAttemptImpl ta, TaskAttemptEvent event) {
super.transition(ta, event);
ta.taskHeartbeatHandler.unregister(ta.attemptId);
// TODO Speculator does not need to go out. maybeSend... will take care of this for now.
}
}
// TODO XXX: Remove and merge with FailRequestWhileRunning
/** Factory for the kill-request-while-running transition; overridable for testing. */
protected SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent>
createKillRequestWhileRunningTransition() {
return new KillRequestWhileRunning();
}
/**
 * Kill request while the attempt is running: performs the
 * kill-before-running handling, then stops heartbeat monitoring since the
 * attempt was actively being tracked.
 */
protected static class KillRequestWhileRunning extends
KillRequestBeforeRunning {
@Override
public void transition(TaskAttemptImpl ta, TaskAttemptEvent event) {
super.transition(ta, event);
ta.taskHeartbeatHandler.unregister(ta.attemptId);
// TODO Speculator does not need to go out. maybeSend... will take care of this for now.
}
}
/** Factory for the node-failed-while-running transition; overridable for testing. */
protected SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent>
createNodeFailedWhileRunningTransition() {
return new NodeFailedWhileRunning();
}
/**
 * Node failure while the attempt is running: records the node-failure
 * diagnostics, then performs the full fail-while-running handling.
 */
protected static class NodeFailedWhileRunning extends FailRequestWhileRunning {
  @Override
  public void transition(TaskAttemptImpl ta, TaskAttemptEvent event) {
    TaskAttemptEventNodeFailed nfEvent = (TaskAttemptEventNodeFailed) event;
    // Record diagnostics first so the fail handling sees them.
    ta.addDiagnosticInfo(nfEvent.getDiagnosticInfo());
    // BUG FIX: super.transition() was never invoked, so the fail handling
    // (finish time, counters, history event, T_ATTEMPT_FAILED, scheduler /
    // heartbeat-unregister) was silently skipped. Pattern matches
    // TooManyFetchFailures: add diagnostics, then delegate to super.
    super.transition(ta, event);
  }
}
/** Factory for the container-terminating-while-running transition; overridable for testing. */
protected SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent>
createContainerTerminatingWhileRunningTransition() {
return new ContainerTerminatingWhileRunning();
}
/**
 * Container began terminating while the attempt was running: performs the
 * fail-while-running handling, then records the termination diagnostics.
 */
protected static class ContainerTerminatingWhileRunning extends
    FailRequestWhileRunning {
  @Override
  public void transition(TaskAttemptImpl ta, TaskAttemptEvent event) {
    super.transition(ta, event);
    TaskAttemptEventContainerTerminating terminatingEvent =
        (TaskAttemptEventContainerTerminating) event;
    ta.addDiagnosticInfo(terminatingEvent.getDiagnosticInfo());
  }
}
/**
 * Factory for the container-completed-while-running transition; overridable
 * for testing. NOTE(review): the class name is misspelled ("Contaienr");
 * renaming a protected class would change the extension interface, so it is
 * left as-is here.
 */
protected SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent>
createContainerCompletedWhileRunningTransition() {
return new ContaienrCompletedWhileRunning();
}
/**
 * Container completed while the attempt was running: performs the
 * fail-while-running handling, triggers output cleanup, and records the
 * termination diagnostics. (Name misspelling kept: protected class name is
 * part of the extension interface.)
 */
protected static class ContaienrCompletedWhileRunning extends
    FailRequestWhileRunning {
  @Override
  public void transition(TaskAttemptImpl ta, TaskAttemptEvent event) {
    super.transition(ta, event);
    ta.sendTaskAttemptCleanupEvent();
    TaskAttemptEventContainerTerminated terminatedEvent =
        (TaskAttemptEventContainerTerminated) event;
    ta.addDiagnosticInfo(terminatedEvent.getDiagnosticInfo());
  }
}
/** Factory for the terminated transition; overridable for testing. */
protected SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent>
createTerminatedTransition() {
return new Terminated();
}
/**
 * Container terminated after the attempt already reached a terminal state:
 * triggers output cleanup and records the termination diagnostics; no
 * further state-change bookkeeping is needed.
 */
protected static class Terminated implements
    SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent> {
  @Override
  public void transition(TaskAttemptImpl ta, TaskAttemptEvent event) {
    ta.sendTaskAttemptCleanupEvent();
    TaskAttemptEventContainerTerminated terminatedEvent =
        (TaskAttemptEventContainerTerminated) event;
    ta.addDiagnosticInfo(terminatedEvent.getDiagnosticInfo());
  }
}
/** Factory for the fail-request-after-success transition; overridable for testing. */
protected SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent>
createFailRequestAfterSuccessTransition() {
return new FailRequestAfterSuccess();
}
/**
 * Fail request after the attempt already succeeded (e.g. fetch failures):
 * performs the base FailRequest handling, then triggers output cleanup.
 */
protected static class FailRequestAfterSuccess extends FailRequest {
@Override
public void transition(TaskAttemptImpl ta, TaskAttemptEvent event) {
super.transition(ta, event);
ta.sendTaskAttemptCleanupEvent();
// XXX: This may need some additional handling.
// TaskAttempt failed after SUCCESS -> the container should also be STOPPED if it's still RUNNING.
// Inform the Scheduler.
// XXX: Also maybe change the ougoing Task Attempt to be FETCH_FAILURE related.
// TODO XXX: Any counter updates.
}
}
/** Factory for the kill-request-after-success transition; overridable for testing. */
protected SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent>
createKillRequestAfterSuccessTransition() {
return new KillRequestAfterSuccess();
}
/**
 * Kill request after the attempt already succeeded: performs the base
 * KillRequest handling, then triggers output cleanup.
 */
protected static class KillRequestAfterSuccess extends KillRequest {
@Override
public void transition(TaskAttemptImpl ta, TaskAttemptEvent event) {
super.transition(ta, event);
// TODO XXX (Comments Not really required) Check for this being a MAP task only. Otherwise ignore it.
//... It may be possible for the event to come in for a REDUCE task, since
// this event and the DONE event are generated in separate threads. Ignore
// in that case.
// TODO Handle diagnostics info.
ta.sendTaskAttemptCleanupEvent();
}
}
/** Factory for the node-failed-after-success transition; overridable for testing. */
protected SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent>
createNodeFailedAfterSuccessTransition() {
return new NodeFailedAfterSuccess();
}
/**
 * Node failure after the attempt succeeded (a succeeded map's output lives
 * on the failed node): records the diagnostics, then performs the full
 * kill-after-success handling.
 */
protected static class NodeFailedAfterSuccess extends KillRequestAfterSuccess {
  @Override
  public void transition(TaskAttemptImpl ta, TaskAttemptEvent event) {
    TaskAttemptEventNodeFailed nfEvent = (TaskAttemptEventNodeFailed) event;
    // Record diagnostics first so the kill handling sees them.
    ta.addDiagnosticInfo(nfEvent.getDiagnosticInfo());
    // BUG FIX: super.transition() was never invoked, so the kill-after-success
    // handling (finish time, counters, history event, T_ATTEMPT_KILLED,
    // cleanup) was silently skipped. Pattern matches TooManyFetchFailures:
    // add diagnostics, then delegate to super.
    super.transition(ta, event);
  }
}
/** Factory for the too-many-fetch-failures transition; overridable for testing. */
protected SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent>
createTooManyFetchFailuresTransition() {
return new TooManyFetchFailures();
}
/**
 * Reducers reported too many fetch failures against this succeeded map
 * attempt: records an explanatory diagnostic, then performs the full
 * fail-after-success handling.
 */
protected static class TooManyFetchFailures extends FailRequestAfterSuccess {
@Override
public void transition(TaskAttemptImpl ta, TaskAttemptEvent event) {
// TODO Maybe change this to send out a TaskEvent.TOO_MANY_FETCH_FAILURES.
// Diagnostics are added before super so the fail handling sees them.
ta.addDiagnosticInfo("Too many fetch failures. Failing the attempt");
super.transition(ta, event);
}
}
/**
 * Seeds a freshly created status object with the values of a NEW attempt:
 * zero progress, STARTING phase, NEW state, and empty counters.
 */
private void initTaskAttemptStatus(TaskAttemptStatus result) {
  result.progress = 0.0f;
  result.phase = Phase.STARTING;
  result.stateString = "NEW";
  result.taskState = TaskAttemptState.NEW;
  // NOTE(review): EMPTY_COUNTERS is a shared instance; assumes callers never
  // mutate result.counters in place — confirm.
  result.counters = EMPTY_COUNTERS;
}
// TODO Consolidate all the Failed / Killed methods, which differ only in state.
// Move some of the functionality out to helpers instead of extending unrelated event classes.
// TODO The transition classes / methods may need to be public for testing.
// Leaving the return type as SingleArcTransition so that extension is not required when testing.
// Extension doesn't help anything in any case.
// TODO Can all these create* methods be made more generic...
}