blob: 228ae24a955d4eb079dc83f37522ca309245cd21 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapreduce.v2.app.job.impl;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.EnumSet;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.MRConfig;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.TypeConverter;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskAttemptInfo;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo;
import org.apache.hadoop.mapreduce.jobhistory.TaskFailedEvent;
import org.apache.hadoop.mapreduce.jobhistory.TaskFinishedEvent;
import org.apache.hadoop.mapreduce.jobhistory.TaskStartedEvent;
import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
import org.apache.hadoop.mapreduce.v2.api.records.Avataar;
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptCompletionEvent;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptCompletionEventStatus;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState;
import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskReport;
import org.apache.hadoop.mapreduce.v2.api.records.TaskState;
import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
import org.apache.hadoop.mapreduce.v2.app.AppContext;
import org.apache.hadoop.mapreduce.v2.app.TaskAttemptListener;
import org.apache.hadoop.mapreduce.v2.app.job.Task;
import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
import org.apache.hadoop.mapreduce.v2.app.job.TaskStateInternal;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobDiagnosticsUpdateEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobEventType;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobMapTaskRescheduledEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobTaskAttemptCompletedEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobTaskEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptKillEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptRecoverEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEventType;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskRecoverEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskTAttemptEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskTAttemptKilledEvent;
import org.apache.hadoop.mapreduce.v2.app.metrics.MRAppMetrics;
import org.apache.hadoop.mapreduce.v2.app.rm.ContainerFailedEvent;
import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.StringInterner;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.state.InvalidStateTransitionException;
import org.apache.hadoop.yarn.state.MultipleArcTransition;
import org.apache.hadoop.yarn.state.SingleArcTransition;
import org.apache.hadoop.yarn.state.StateMachine;
import org.apache.hadoop.yarn.state.StateMachineFactory;
import org.apache.hadoop.yarn.util.Clock;
import com.google.common.annotations.VisibleForTesting;
/**
* Implementation of Task interface.
*/
@SuppressWarnings({ "rawtypes", "unchecked" })
public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
// Logger shared by all TaskImpl instances.
private static final Log LOG = LogFactory.getLog(TaskImpl.class);
// Prefix of the diagnostic message attached to kill requests that are sent
// to attempts which lost out to another attempt of the same task.
private static final String SPECULATION = "Speculation: ";
protected final JobConf conf;
protected final Path jobFile;
protected final int partition;
protected final TaskAttemptListener taskAttemptListener;
protected final EventHandler eventHandler;
private final TaskId taskId;
// All attempts created for this task, keyed by attempt id. Starts as an
// immutable empty map and is replaced by larger maps as attempts are added.
private Map<TaskAttemptId, TaskAttempt> attempts;
// Value of getMaxAttempts() captured at construction time.
private final int maxAttempts;
protected final Clock clock;
// Read/write halves of a single ReentrantReadWriteLock guarding this task.
private final Lock readLock;
private final Lock writeLock;
private final MRAppMetrics metrics;
protected final AppContext appContext;
// Time at which the task was scheduled (or, after recovery, its recovered
// start time); used as the launch time when no attempt has launched yet.
private long scheduledTime;
private final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
protected boolean encryptedShuffle;
protected Credentials credentials;
protected Token<JobTokenIdentifier> jobToken;
// The first attempt that reported COMMIT_PENDING; it is the only attempt
// for which canCommit() answers true.
private TaskAttemptId commitAttempt;
// The attempt whose success completed this task, or null if there is none.
private TaskAttemptId successfulAttempt;
// Ids of the attempts that ended in failure.
private final Set<TaskAttemptId> failedAttempts;
// Track the finished attempts - successful, failed and killed
private final Set<TaskAttemptId> finishedAttempts;
// Tracks the attempts that are either running or in a state where
// they will come to be running when they get a Container
private final Set<TaskAttemptId> inProgressAttempts;
// Set once the TaskStartedEvent has been written to job history; finish
// events are only emitted when this is true.
private boolean historyTaskStartGenerated = false;
// Launch time reported in history events.
private long launchTime;
// Stateless transition instances that are shared by several arcs of the
// state machine defined below.
private static final SingleArcTransition<TaskImpl, TaskEvent>
ATTEMPT_KILLED_TRANSITION = new AttemptKilledTransition();
private static final SingleArcTransition<TaskImpl, TaskEvent>
KILL_TRANSITION = new KillTransition();
// Shared, immutable description of the task state machine. Every TaskImpl
// instance obtains its own StateMachine from this factory in the constructor.
// Arcs registered without a transition object only change (or keep) the
// state; arcs with an EnumSet of target states let the transition pick the
// resulting state at run time.
private static final StateMachineFactory
<TaskImpl, TaskStateInternal, TaskEventType, TaskEvent>
stateMachineFactory
= new StateMachineFactory<TaskImpl, TaskStateInternal, TaskEventType, TaskEvent>
(TaskStateInternal.NEW)
// define the state machine of Task
// Transitions from NEW state
.addTransition(TaskStateInternal.NEW, TaskStateInternal.SCHEDULED,
TaskEventType.T_SCHEDULE, new InitialScheduleTransition())
.addTransition(TaskStateInternal.NEW, TaskStateInternal.KILLED,
TaskEventType.T_KILL, new KillNewTransition())
.addTransition(TaskStateInternal.NEW,
EnumSet.of(TaskStateInternal.FAILED,
TaskStateInternal.KILLED,
TaskStateInternal.RUNNING,
TaskStateInternal.SUCCEEDED),
TaskEventType.T_RECOVER, new RecoverTransition())
// Transitions from SCHEDULED state
// When the first attempt is launched, the task state is set to RUNNING.
.addTransition(TaskStateInternal.SCHEDULED, TaskStateInternal.RUNNING,
TaskEventType.T_ATTEMPT_LAUNCHED, new LaunchTransition())
.addTransition(TaskStateInternal.SCHEDULED, TaskStateInternal.KILL_WAIT,
TaskEventType.T_KILL, KILL_TRANSITION)
.addTransition(TaskStateInternal.SCHEDULED, TaskStateInternal.SCHEDULED,
TaskEventType.T_ATTEMPT_KILLED, ATTEMPT_KILLED_TRANSITION)
.addTransition(TaskStateInternal.SCHEDULED,
EnumSet.of(TaskStateInternal.SCHEDULED, TaskStateInternal.FAILED),
TaskEventType.T_ATTEMPT_FAILED,
new AttemptFailedTransition())
// Transitions from RUNNING state
.addTransition(TaskStateInternal.RUNNING, TaskStateInternal.RUNNING,
TaskEventType.T_ATTEMPT_LAUNCHED) //more attempts may start later
.addTransition(TaskStateInternal.RUNNING, TaskStateInternal.RUNNING,
TaskEventType.T_ATTEMPT_COMMIT_PENDING,
new AttemptCommitPendingTransition())
.addTransition(TaskStateInternal.RUNNING, TaskStateInternal.RUNNING,
TaskEventType.T_ADD_SPEC_ATTEMPT, new RedundantScheduleTransition())
.addTransition(TaskStateInternal.RUNNING, TaskStateInternal.SUCCEEDED,
TaskEventType.T_ATTEMPT_SUCCEEDED,
new AttemptSucceededTransition())
.addTransition(TaskStateInternal.RUNNING, TaskStateInternal.RUNNING,
TaskEventType.T_ATTEMPT_KILLED,
ATTEMPT_KILLED_TRANSITION)
.addTransition(TaskStateInternal.RUNNING,
EnumSet.of(TaskStateInternal.RUNNING, TaskStateInternal.FAILED),
TaskEventType.T_ATTEMPT_FAILED,
new AttemptFailedTransition())
.addTransition(TaskStateInternal.RUNNING, TaskStateInternal.KILL_WAIT,
TaskEventType.T_KILL, KILL_TRANSITION)
// Transitions from KILL_WAIT state
.addTransition(TaskStateInternal.KILL_WAIT,
EnumSet.of(TaskStateInternal.KILL_WAIT, TaskStateInternal.KILLED),
TaskEventType.T_ATTEMPT_KILLED,
new KillWaitAttemptKilledTransition())
.addTransition(TaskStateInternal.KILL_WAIT,
EnumSet.of(TaskStateInternal.KILL_WAIT, TaskStateInternal.KILLED),
TaskEventType.T_ATTEMPT_SUCCEEDED,
new KillWaitAttemptSucceededTransition())
.addTransition(TaskStateInternal.KILL_WAIT,
EnumSet.of(TaskStateInternal.KILL_WAIT, TaskStateInternal.KILLED),
TaskEventType.T_ATTEMPT_FAILED,
new KillWaitAttemptFailedTransition())
// Ignore-able transitions.
.addTransition(
TaskStateInternal.KILL_WAIT,
TaskStateInternal.KILL_WAIT,
EnumSet.of(TaskEventType.T_KILL,
TaskEventType.T_ATTEMPT_LAUNCHED,
TaskEventType.T_ATTEMPT_COMMIT_PENDING,
TaskEventType.T_ADD_SPEC_ATTEMPT))
// Transitions from SUCCEEDED state
.addTransition(TaskStateInternal.SUCCEEDED,
EnumSet.of(TaskStateInternal.SCHEDULED, TaskStateInternal.SUCCEEDED, TaskStateInternal.FAILED),
TaskEventType.T_ATTEMPT_FAILED, new RetroactiveFailureTransition())
.addTransition(TaskStateInternal.SUCCEEDED,
EnumSet.of(TaskStateInternal.SCHEDULED, TaskStateInternal.SUCCEEDED),
TaskEventType.T_ATTEMPT_KILLED, new RetroactiveKilledTransition())
.addTransition(TaskStateInternal.SUCCEEDED, TaskStateInternal.SUCCEEDED,
TaskEventType.T_ATTEMPT_SUCCEEDED,
new AttemptSucceededAtSucceededTransition())
// Ignore-able transitions.
.addTransition(
TaskStateInternal.SUCCEEDED, TaskStateInternal.SUCCEEDED,
EnumSet.of(TaskEventType.T_ADD_SPEC_ATTEMPT,
TaskEventType.T_ATTEMPT_COMMIT_PENDING,
TaskEventType.T_ATTEMPT_LAUNCHED,
TaskEventType.T_KILL))
// Transitions from FAILED state
// FAILED is terminal: every listed event is accepted and ignored.
.addTransition(TaskStateInternal.FAILED, TaskStateInternal.FAILED,
EnumSet.of(TaskEventType.T_KILL,
TaskEventType.T_ADD_SPEC_ATTEMPT,
TaskEventType.T_ATTEMPT_COMMIT_PENDING,
TaskEventType.T_ATTEMPT_FAILED,
TaskEventType.T_ATTEMPT_KILLED,
TaskEventType.T_ATTEMPT_LAUNCHED,
TaskEventType.T_ATTEMPT_SUCCEEDED))
// Transitions from KILLED state
// There could be a race condition where TaskImpl might receive
// T_ATTEMPT_SUCCEEDED followed by T_ATTEMPT_KILLED for the same attempt.
// a. The task is in KILL_WAIT.
// b. Before the TA transitions to SUCCEEDED state, Task sends TA_KILL event.
// c. The TA transitions to SUCCEEDED state and thus sends
// T_ATTEMPT_SUCCEEDED to the task. The task transitions to KILLED state.
// d. The TA processes TA_KILL event and sends T_ATTEMPT_KILLED to the task.
.addTransition(TaskStateInternal.KILLED, TaskStateInternal.KILLED,
EnumSet.of(TaskEventType.T_KILL,
TaskEventType.T_SCHEDULE,
TaskEventType.T_ATTEMPT_KILLED,
TaskEventType.T_ADD_SPEC_ATTEMPT))
// create the topology tables
.installTopology();
// Per-instance state machine created from stateMachineFactory in the
// constructor.
private final StateMachine<TaskStateInternal, TaskEventType, TaskEvent>
stateMachine;
// By default, the next TaskAttempt number is zero. The constructor offsets it
// by the application attempt id, and it is changed temporarily during recovery.
protected int nextAttemptNumber = 0;
// Orders task attempts by ascending finish time (used to replay recovered
// attempts in the order they completed). The two timestamps are compared
// directly rather than by subtraction, so the result stays correct even for
// values whose difference would overflow a long.
private static final Comparator<TaskAttemptInfo> TA_INFO_COMPARATOR =
    new Comparator<TaskAttemptInfo>() {
      @Override
      public int compare(TaskAttemptInfo a, TaskAttemptInfo b) {
        long finishA = a.getFinishTime();
        long finishB = b.getFinishTime();
        if (finishA < finishB) {
          return -1;
        }
        return finishA > finishB ? 1 : 0;
      }
    };
/**
 * Returns the externally visible state of this task, derived from the
 * internal state-machine state while the read lock is held.
 */
@Override
public TaskState getState() {
  readLock.lock();
  try {
    final TaskStateInternal internalState = getInternalState();
    return getExternalState(internalState);
  } finally {
    readLock.unlock();
  }
}
/**
 * Creates a task in the NEW state with no attempts.
 *
 * @param jobId id of the job this task belongs to
 * @param taskType type of the task, used to build the task id
 * @param partition partition number, used to build the task id
 * @param eventHandler handler that receives every event this task emits
 * @param remoteJobConfFile stored as {@code jobFile}
 * @param conf job configuration; also read for the encrypted-shuffle flag
 * @param taskAttemptListener stored for use by subclasses
 * @param jobToken stored for use by subclasses
 * @param credentials stored for use by subclasses
 * @param clock time source for scheduling and finish timestamps
 * @param appAttemptId application attempt number; determines the first
 *          attempt number handed out by this task instance
 * @param metrics metrics sink updated on task state changes
 * @param appContext stored for use by subclasses
 */
public TaskImpl(JobId jobId, TaskType taskType, int partition,
EventHandler eventHandler, Path remoteJobConfFile, JobConf conf,
TaskAttemptListener taskAttemptListener,
Token<JobTokenIdentifier> jobToken,
Credentials credentials, Clock clock,
int appAttemptId, MRAppMetrics metrics, AppContext appContext) {
this.conf = conf;
this.clock = clock;
this.jobFile = remoteJobConfFile;
// A single read/write lock guards all mutable state of the task.
ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
readLock = readWriteLock.readLock();
writeLock = readWriteLock.writeLock();
this.attempts = Collections.emptyMap();
this.finishedAttempts = new HashSet<TaskAttemptId>(2);
this.failedAttempts = new HashSet<TaskAttemptId>(2);
this.inProgressAttempts = new HashSet<TaskAttemptId>(2);
// This overridable method call is okay in a constructor because we
// have a convention that none of the overrides depends on any
// fields that need initialization.
// NOTE(review): only the fields assigned above are set at this point, so
// keep this call after them in case an override reads one -- confirm
// against the subclasses.
maxAttempts = getMaxAttempts();
taskId = MRBuilderUtils.newTaskId(jobId, partition, taskType);
this.partition = partition;
this.taskAttemptListener = taskAttemptListener;
this.eventHandler = eventHandler;
this.credentials = credentials;
this.jobToken = jobToken;
this.metrics = metrics;
this.appContext = appContext;
this.encryptedShuffle = conf.getBoolean(MRConfig.SHUFFLE_SSL_ENABLED_KEY,
MRConfig.SHUFFLE_SSL_ENABLED_DEFAULT);
// This "this leak" is okay because the retained pointer is in an
// instance variable.
stateMachine = stateMachineFactory.make(this);
// All the new TaskAttemptIDs are generated based on MR
// ApplicationAttemptID so that attempts from previous lives don't
// over-step the current one. This assumes that a task won't have more
// than 1000 attempts in its single generation, which is very reasonable.
nextAttemptNumber = (appAttemptId - 1) * 1000;
}
/**
 * Returns the attempts of this task keyed by attempt id.
 *
 * With at most one attempt the backing map (an empty or singleton map from
 * {@link Collections}) is returned directly; with more attempts an
 * insertion-ordered copy is returned instead.
 */
@Override
public Map<TaskAttemptId, TaskAttempt> getAttempts() {
  readLock.lock();
  try {
    if (attempts.size() > 1) {
      return new LinkedHashMap<TaskAttemptId, TaskAttempt>(attempts);
    }
    return attempts;
  } finally {
    readLock.unlock();
  }
}
/**
 * Looks up a single attempt of this task under the read lock.
 *
 * @param attemptID id of the attempt to look up
 * @return the attempt registered under that id, or null if there is none
 */
@Override
public TaskAttempt getAttempt(TaskAttemptId attemptID) {
  readLock.lock();
  try {
    final TaskAttempt found = attempts.get(attemptID);
    return found;
  } finally {
    readLock.unlock();
  }
}
/**
 * Returns the id of this task. The id is assigned once in the constructor,
 * so no locking is needed.
 */
@Override
public TaskId getID() {
return taskId;
}
/**
 * Reports whether the task has reached one of the internal states
 * SUCCEEDED, FAILED or KILLED.
 */
@Override
public boolean isFinished() {
  readLock.lock();
  try {
    // TODO: Use stateMachine level method?
    // The state cannot change while the read lock is held, so it is read once.
    final TaskStateInternal current = getInternalState();
    if (current == TaskStateInternal.SUCCEEDED) {
      return true;
    }
    if (current == TaskStateInternal.FAILED) {
      return true;
    }
    return current == TaskStateInternal.KILLED;
  } finally {
    readLock.unlock();
  }
}
/**
 * Builds a snapshot report of this task under the read lock: times, state,
 * progress and status of the best attempt, the ids of running attempts, the
 * successful attempt, per-attempt diagnostics and the best attempt's counters.
 */
@Override
public TaskReport getReport() {
  final TaskReport taskReport = recordFactory.newRecordInstance(TaskReport.class);
  readLock.lock();
  try {
    final TaskAttempt best = selectBestAttempt();
    taskReport.setTaskId(taskId);
    taskReport.setStartTime(getLaunchTime());
    taskReport.setFinishTime(getFinishTime());
    taskReport.setTaskState(getState());

    // Without any usable attempt the report shows zero progress and no status.
    if (best == null) {
      taskReport.setProgress(0f);
      taskReport.setStatus("");
    } else {
      taskReport.setProgress(best.getProgress());
      taskReport.setStatus(best.getReport().getStateString());
    }

    for (TaskAttempt candidate : attempts.values()) {
      if (candidate.getState() == TaskAttemptState.RUNNING) {
        taskReport.addRunningAttempt(candidate.getID());
      }
    }
    taskReport.setSuccessfulAttempt(successfulAttempt);

    // Every diagnostic line is tagged with the attempt it came from.
    for (TaskAttempt source : attempts.values()) {
      final String tag = "AttemptID:" + source.getID() + " Info:";
      for (CharSequence line : source.getDiagnostics()) {
        taskReport.addDiagnostics(tag + line);
      }
    }

    // Add a copy of counters as the last step so that their lifetime on heap
    // is as small as possible.
    taskReport.setCounters(TypeConverter.toYarn(best == null
        ? TaskAttemptImpl.EMPTY_COUNTERS
        : best.getCounters()));
    return taskReport;
  } finally {
    readLock.unlock();
  }
}
/**
 * Returns the counters of the best attempt, or the shared empty counters
 * when no attempt qualifies.
 */
@Override
public Counters getCounters() {
  readLock.lock();
  try {
    final TaskAttempt best = selectBestAttempt();
    if (best == null) {
      return TaskAttemptImpl.EMPTY_COUNTERS;
    }
    return best.getCounters();
  } finally {
    readLock.unlock();
  }
}
/**
 * Returns the progress of the best attempt, or 0 when no attempt qualifies.
 */
@Override
public float getProgress() {
  readLock.lock();
  try {
    final TaskAttempt best = selectBestAttempt();
    return best == null ? 0f : best.getProgress();
  } finally {
    readLock.unlock();
  }
}
/**
 * Returns the current state of the task's state machine, read under the
 * read lock. Unlike {@link #getState()} this exposes KILL_WAIT as is.
 */
@VisibleForTesting
public TaskStateInternal getInternalState() {
  readLock.lock();
  try {
    final TaskStateInternal current = stateMachine.getCurrentState();
    return current;
  } finally {
    readLock.unlock();
  }
}
/**
 * Maps an internal state to the state shown to clients: KILL_WAIT is
 * reported as KILLED, every other state maps to the constant of the same name.
 */
private static TaskState getExternalState(TaskStateInternal smState) {
  return smState == TaskStateInternal.KILL_WAIT
      ? TaskState.KILLED
      : TaskState.valueOf(smState.name());
}
/**
 * Returns the earliest non-zero launch time among all attempts, or the
 * task's scheduled time when no attempt reports a launch time yet.
 * Always called with the read or write lock held.
 */
private long getLaunchTime() {
  long earliest = 0;
  boolean found = false;
  for (TaskAttempt candidate : attempts.values()) {
    final long launched = candidate.getLaunchTime();
    if (launched == 0) {
      // A launch time of zero means "not launched"; it never counts.
      continue;
    }
    if (!found || launched < earliest) {
      earliest = launched;
      found = true;
    }
  }
  return found ? earliest : this.scheduledTime;
}
/**
 * Returns the latest finish time among all attempts, or 0 while the task
 * is not finished. Always called with the read or write lock held.
 */
//TODO: verify behaviour if the task is killed (no finished attempt)
private long getFinishTime() {
  if (!isFinished()) {
    return 0;
  }
  long latest = 0;
  for (TaskAttempt candidate : attempts.values()) {
    final long finished = candidate.getFinishTime();
    if (finished > latest) {
      latest = finished;
    }
  }
  return latest;
}
/**
 * Returns the finish time to report for the given attempt.
 *
 * @param taId id of the attempt, or null when no particular attempt is
 *          responsible
 * @return the current clock time when {@code taId} is null, the attempt's
 *         finish time when it is one of this task's attempts, and 0 otherwise
 */
private long getFinishTime(TaskAttemptId taId) {
  if (taId == null) {
    return clock.getTime();
  }
  // The attempts map is keyed by attempt id, so look the attempt up directly
  // instead of scanning every value.
  TaskAttempt attempt = attempts.get(taId);
  return attempt == null ? 0 : attempt.getFinishTime();
}
/**
 * Updates the running-task metric if the task is currently RUNNING and
 * hands back the given final state unchanged.
 */
private TaskStateInternal finished(TaskStateInternal finalState) {
  final boolean wasRunning = getInternalState() == TaskStateInternal.RUNNING;
  if (wasRunning) {
    metrics.endRunningTask(this);
  }
  return finalState;
}
/**
 * Picks the attempt that represents this task best: the successful attempt
 * if there is one, otherwise the attempt with the highest progress among
 * those that are neither FAILED nor KILLED. Returns null if no attempt
 * qualifies. Always called inside the read lock.
 */
private TaskAttempt selectBestAttempt() {
  if (successfulAttempt != null) {
    return attempts.get(successfulAttempt);
  }
  TaskAttempt best = null;
  float bestProgress = 0f;
  for (TaskAttempt candidate : attempts.values()) {
    final TaskAttemptState state = candidate.getState();
    // ignore all failed and killed task attempts
    if (state == TaskAttemptState.FAILED || state == TaskAttemptState.KILLED) {
      continue;
    }
    final float candidateProgress = candidate.getProgress();
    final boolean ahead = candidateProgress > bestProgress;
    // The first eligible attempt is taken even when it has made no progress.
    if (best == null || ahead) {
      best = candidate;
    }
    if (ahead) {
      bestProgress = candidateProgress;
    }
  }
  return best;
}
/**
 * Tells an attempt whether it is the one allowed to commit its output.
 *
 * @param taskAttemptID the attempt asking for permission
 * @return true only if a commit attempt has been chosen and it equals
 *         {@code taskAttemptID}
 */
@Override
public boolean canCommit(TaskAttemptId taskAttemptID) {
  readLock.lock();
  try {
    if (commitAttempt == null) {
      // Nobody has been chosen yet, so nobody may commit.
      return false;
    }
    final boolean granted = taskAttemptID.equals(commitAttempt);
    LOG.info("Result of canCommit for " + taskAttemptID + ":" + granted);
    return granted;
  } finally {
    readLock.unlock();
  }
}
/**
 * Creates a new attempt for this task. Called whenever the task needs to
 * add an attempt; the caller registers the result in the attempts map.
 */
protected abstract TaskAttemptImpl createAttempt();
/**
 * Returns the maximum number of attempts for this task. The value is read
 * once, from the constructor.
 */
// No override of this method may require that the subclass be initialized.
protected abstract int getMaxAttempts();
/**
 * Returns the attempt recorded as successful, or null if no attempt has
 * succeeded.
 */
protected TaskAttempt getSuccessfulAttempt() {
  readLock.lock();
  try {
    return successfulAttempt == null ? null : attempts.get(successfulAttempt);
  } finally {
    readLock.unlock();
  }
}
/**
 * Adds a new attempt and schedules it without forcing a reschedule.
 * Always called with the write lock held.
 */
private void addAndScheduleAttempt(Avataar avataar) {
  final boolean forceReschedule = false;
  addAndScheduleAttempt(avataar, forceReschedule);
}
/**
 * Adds a new attempt, marks it as in progress and asks it to start.
 * The attempt receives TA_RESCHEDULE when {@code reschedule} is set or an
 * earlier attempt has failed, and TA_SCHEDULE otherwise.
 * Always called with the write lock held.
 */
private void addAndScheduleAttempt(Avataar avataar, boolean reschedule) {
  final TaskAttempt created = addAttempt(avataar);
  final TaskAttemptId createdId = created.getID();
  inProgressAttempts.add(createdId);
  final TaskAttemptEventType startEvent =
      (reschedule || !failedAttempts.isEmpty())
          ? TaskAttemptEventType.TA_RESCHEDULE
          : TaskAttemptEventType.TA_SCHEDULE;
  eventHandler.handle(new TaskAttemptEvent(createdId, startEvent));
}
/**
 * Creates an attempt, registers it in the attempts map and advances the
 * attempt number. The map grows from an immutable singleton map to a
 * LinkedHashMap when the second attempt is added.
 */
private TaskAttemptImpl addAttempt(Avataar avataar) {
  final TaskAttemptImpl created = createAttempt();
  created.setAvataar(avataar);
  if (LOG.isDebugEnabled()) {
    LOG.debug("Created attempt " + created.getID());
  }
  final int existing = attempts.size();
  if (existing == 0) {
    attempts = Collections.singletonMap(created.getID(),
        (TaskAttempt) created);
  } else {
    if (existing == 1) {
      // The singleton map cannot be extended; switch to a mutable map first.
      final Map<TaskAttemptId, TaskAttempt> grown
          = new LinkedHashMap<TaskAttemptId, TaskAttempt>(maxAttempts);
      grown.putAll(attempts);
      attempts = grown;
    }
    attempts.put(created.getID(), created);
  }
  nextAttemptNumber++;
  return created;
}
/**
 * Feeds an event into the task's state machine while holding the write lock.
 *
 * An event that is not valid in the current state is logged and reported
 * through {@link #internalError(TaskEventType)}; it is not rethrown.
 *
 * @param event the task event to process
 */
@Override
public void handle(TaskEvent event) {
  if (LOG.isDebugEnabled()) {
    LOG.debug("Processing " + event.getTaskID() + " of type "
        + event.getType());
  }
  // Acquire the lock before entering the try block so that the finally
  // clause only ever unlocks a lock this thread actually holds.
  writeLock.lock();
  try {
    TaskStateInternal oldState = getInternalState();
    try {
      stateMachine.doTransition(event.getType(), event);
    } catch (InvalidStateTransitionException e) {
      LOG.error("Can't handle this event at current state for "
          + this.taskId, e);
      internalError(event.getType());
    }
    if (oldState != getInternalState()) {
      LOG.info(taskId + " Task Transitioned from " + oldState + " to "
          + getInternalState());
    }
  } finally {
    writeLock.unlock();
  }
}
/**
 * Reports an event that the task could not handle: logs it, attaches a
 * diagnostic message to the job and raises INTERNAL_ERROR on the job.
 *
 * @param type type of the offending event
 */
protected void internalError(TaskEventType type) {
  final String message = "Invalid event " + type + " on Task " + this.taskId;
  LOG.error(message);
  final JobId jobId = this.taskId.getJobId();
  eventHandler.handle(new JobDiagnosticsUpdateEvent(jobId, message));
  eventHandler.handle(new JobEvent(this.taskId.getJobId(),
      JobEventType.INTERNAL_ERROR));
}
/**
 * Sends a task-attempt completion event for the given attempt to the job.
 * Nothing is sent when the attempt has no node HTTP address.
 * Always called inside a transition, which in turn runs inside the write lock.
 *
 * @param attemptId id of the attempt that completed
 * @param status completion status to report
 */
private void handleTaskAttemptCompletion(TaskAttemptId attemptId,
    TaskAttemptCompletionEventStatus status) {
  final TaskAttempt attempt = attempts.get(attemptId);
  // raise the completion event only if a container was assigned to the attempt
  final String nodeHttpAddress = attempt.getNodeHttpAddress();
  if (nodeHttpAddress == null) {
    return;
  }
  final TaskAttemptCompletionEvent completion = recordFactory
      .newRecordInstance(TaskAttemptCompletionEvent.class);
  completion.setEventId(-1);

  // The map output server is addressed by the node's host name and the
  // attempt's shuffle port.
  final String scheme = encryptedShuffle ? "https://" : "http://";
  final String host = nodeHttpAddress.split(":")[0];
  completion.setMapOutputServerAddress(StringInterner.weakIntern(
      scheme + host + ":" + attempt.getShufflePort()));
  completion.setStatus(status);
  completion.setAttemptId(attempt.getID());

  // The run time stays 0 unless both timestamps are known.
  final long finished = attempt.getFinishTime();
  final long launched = attempt.getLaunchTime();
  final int runTime =
      (finished != 0 && launched != 0) ? (int) (finished - launched) : 0;
  completion.setAttemptRunTime(runTime);

  // raise the event to job so that it adds the completion event to its
  // data structures
  eventHandler.handle(new JobTaskAttemptCompletedEvent(completion));
}
/**
 * Records the task's launch time, writes a TaskStartedEvent to job history
 * and remembers that the start event has been generated.
 */
private void sendTaskStartedEvent() {
  launchTime = getLaunchTime();
  final TaskStartedEvent started = new TaskStartedEvent(
      TypeConverter.fromYarn(taskId),
      launchTime,
      TypeConverter.fromYarn(taskId.getTaskType()),
      getSplitsAsString());
  final JobHistoryEvent historyEvent =
      new JobHistoryEvent(taskId.getJobId(), started);
  eventHandler.handle(historyEvent);
  historyTaskStartGenerated = true;
}
/**
 * Builds the history event that records the end of a task, using the
 * task's successful attempt, counters and recorded launch time.
 *
 * @param task the task that finished
 * @param taskState the state to record in the event
 */
private static TaskFinishedEvent createTaskFinishedEvent(TaskImpl task,
    TaskStateInternal taskState) {
  return new TaskFinishedEvent(
      TypeConverter.fromYarn(task.taskId),
      TypeConverter.fromYarn(task.successfulAttempt),
      task.getFinishTime(task.successfulAttempt),
      TypeConverter.fromYarn(task.taskId.getTaskType()),
      taskState.toString(),
      task.getCounters(),
      task.launchTime);
}
/**
 * Builds the history event that records a failed or killed task.
 *
 * @param task the task that ended
 * @param diag diagnostic messages, may be null; each message is appended
 *          to the error text preceded by ", "
 * @param taskState the state to record in the event
 * @param taId the attempt that caused the task to end, may be null
 */
private static TaskFailedEvent createTaskFailedEvent(TaskImpl task,
    List<String> diag, TaskStateInternal taskState, TaskAttemptId taId) {
  final List<String> messages =
      (diag == null) ? Collections.<String>emptyList() : diag;
  final StringBuilder error = new StringBuilder();
  for (String message : messages) {
    error.append(", ");
    error.append(message);
  }
  return new TaskFailedEvent(
      TypeConverter.fromYarn(task.taskId),
      // Hack since getFinishTime needs isFinished to be true and that
      // doesn't happen till after the transition.
      task.getFinishTime(taId),
      TypeConverter.fromYarn(task.getType()),
      error.toString(),
      taskState.toString(),
      taId == null ? null : TypeConverter.fromYarn(taId),
      task.getCounters(), task.launchTime);
}
/**
 * Forgets both the successful attempt and the commit attempt of a task.
 */
private static void unSucceed(TaskImpl task) {
  task.successfulAttempt = null;
  task.commitAttempt = null;
}
/**
 * Tells the job that this task succeeded and, if the task's start was
 * written to job history, writes the matching finish event as well.
 */
private void sendTaskSucceededEvents() {
  eventHandler.handle(new JobTaskEvent(taskId, TaskState.SUCCEEDED));
  LOG.info("Task succeeded with attempt " + successfulAttempt);
  if (!historyTaskStartGenerated) {
    // No start event in history, so no finish event either.
    return;
  }
  final TaskFinishedEvent finishedEvent =
      createTaskFinishedEvent(this, TaskStateInternal.SUCCEEDED);
  eventHandler.handle(new JobHistoryEvent(taskId.getJobId(), finishedEvent));
}
/**
 * Returns a String representation of the splits of this task; the value is
 * written into the TaskStartedEvent. This base implementation returns an
 * empty string.
 *
 * Subclasses can override this method to provide their own representations
 * of splits (if any).
 *
 * @return a String representation of the splits
 */
protected String getSplitsAsString(){
return "";
}
/**
 * Recover a completed task from a previous application attempt.
 *
 * The recovered attempts are re-created in the order they finished and are
 * recovered inline, then the task-level history and job events matching the
 * recovered task status are emitted.
 *
 * @param taskInfo recovered info about the task
 * @param committer output committer passed on to each recovered attempt
 * @param recoverTaskOutput whether to recover task outputs
 * @return state of the task after recovery
 */
private TaskStateInternal recover(TaskInfo taskInfo,
OutputCommitter committer, boolean recoverTaskOutput) {
LOG.info("Recovering task " + taskId
+ " from prior app attempt, status was " + taskInfo.getTaskStatus());
scheduledTime = taskInfo.getStartTime();
sendTaskStartedEvent();
Collection<TaskAttemptInfo> attemptInfos =
taskInfo.getAllTaskAttempts().values();
if (attemptInfos.size() > 0) {
metrics.launchedTask(this);
}
// recover the attempts for this task in the order they finished
// so task attempt completion events are ordered properly
// nextAttemptNumber is overwritten per recovered attempt below and restored
// after the loop.
int savedNextAttemptNumber = nextAttemptNumber;
ArrayList<TaskAttemptInfo> taInfos =
new ArrayList<TaskAttemptInfo>(taskInfo.getAllTaskAttempts().values());
Collections.sort(taInfos, TA_INFO_COMPARATOR);
for (TaskAttemptInfo taInfo : taInfos) {
// NOTE(review): presumably createAttempt() derives the attempt id from
// nextAttemptNumber, so this reproduces the recovered id -- confirm in the
// subclasses.
nextAttemptNumber = taInfo.getAttemptId().getId();
TaskAttemptImpl attempt = addAttempt(Avataar.VIRGIN);
// handle the recovery inline so attempts complete before task does
attempt.handle(new TaskAttemptRecoverEvent(attempt.getID(), taInfo,
committer, recoverTaskOutput));
finishedAttempts.add(attempt.getID());
// Translate the recovered attempt state into a completion event status.
TaskAttemptCompletionEventStatus taces = null;
TaskAttemptState attemptState = attempt.getState();
switch (attemptState) {
case FAILED:
taces = TaskAttemptCompletionEventStatus.FAILED;
break;
case KILLED:
taces = TaskAttemptCompletionEventStatus.KILLED;
break;
case SUCCEEDED:
taces = TaskAttemptCompletionEventStatus.SUCCEEDED;
break;
default:
throw new IllegalStateException(
"Unexpected attempt state during recovery: " + attemptState);
}
if (attemptState == TaskAttemptState.FAILED) {
failedAttempts.add(attempt.getID());
// The failure that reaches the attempt limit is reported as TIPFAILED.
if (failedAttempts.size() >= maxAttempts) {
taces = TaskAttemptCompletionEventStatus.TIPFAILED;
}
}
// don't clobber the successful attempt completion event
// TODO: this shouldn't be necessary after MAPREDUCE-4330
if (successfulAttempt == null) {
handleTaskAttemptCompletion(attempt.getID(), taces);
if (attemptState == TaskAttemptState.SUCCEEDED) {
successfulAttempt = attempt.getID();
}
}
}
nextAttemptNumber = savedNextAttemptNumber;
TaskStateInternal taskState = TaskStateInternal.valueOf(
taskInfo.getTaskStatus());
switch (taskState) {
case SUCCEEDED:
if (successfulAttempt != null) {
sendTaskSucceededEvents();
} else {
LOG.info("Missing successful attempt for task " + taskId
+ ", recovering as RUNNING");
// there must have been a fetch failure and the retry wasn't complete
taskState = TaskStateInternal.RUNNING;
metrics.runningTask(this);
addAndScheduleAttempt(Avataar.VIRGIN);
}
break;
case FAILED:
case KILLED:
{
if (taskState == TaskStateInternal.KILLED && attemptInfos.size() == 0) {
metrics.endWaitingTask(this);
}
// Replay the recorded failure into job history and notify the job.
TaskFailedEvent tfe = new TaskFailedEvent(taskInfo.getTaskId(),
taskInfo.getFinishTime(), taskInfo.getTaskType(),
taskInfo.getError(), taskInfo.getTaskStatus(),
taskInfo.getFailedDueToAttemptId(), taskInfo.getCounters(),
launchTime);
eventHandler.handle(new JobHistoryEvent(taskId.getJobId(), tfe));
eventHandler.handle(
new JobTaskEvent(taskId, getExternalState(taskState)));
break;
}
default:
throw new java.lang.AssertionError("Unexpected recovered task state: "
+ taskState);
}
return taskState;
}
/**
 * Handles T_RECOVER in the NEW state by delegating to
 * {@code TaskImpl.recover}; the state returned by recovery becomes the new
 * task state.
 */
private static class RecoverTransition
    implements MultipleArcTransition<TaskImpl, TaskEvent, TaskStateInternal> {
  @Override
  public TaskStateInternal transition(TaskImpl task, TaskEvent event) {
    final TaskRecoverEvent recoverEvent = (TaskRecoverEvent) event;
    final TaskInfo recoveredInfo = recoverEvent.getTaskInfo();
    final OutputCommitter committer = recoverEvent.getOutputCommitter();
    final boolean recoverOutput = recoverEvent.getRecoverTaskOutput();
    return task.recover(recoveredInfo, committer, recoverOutput);
  }
}
/**
 * Handles T_SCHEDULE in the NEW state: creates and schedules the first
 * attempt, records the scheduling time and writes the task start event to
 * job history.
 */
private static class InitialScheduleTransition
implements SingleArcTransition<TaskImpl, TaskEvent> {
@Override
public void transition(TaskImpl task, TaskEvent event) {
task.addAndScheduleAttempt(Avataar.VIRGIN);
// scheduledTime must be set before the start event is sent: it is the
// fallback launch time while no attempt has launched.
task.scheduledTime = task.clock.getTime();
task.sendTaskStartedEvent();
}
}
/**
 * Handles T_ADD_SPEC_ATTEMPT in the RUNNING state by adding and scheduling
 * one more attempt, marked as SPECULATIVE.
 */
// Used when creating a new attempt while one is already running.
// Currently we do this for speculation. In the future we may do this
// for tasks that failed in a way that might indicate application code
// problems, so we can take later failures in parallel and flush the
// job quickly when this happens.
private static class RedundantScheduleTransition
implements SingleArcTransition<TaskImpl, TaskEvent> {
@Override
public void transition(TaskImpl task, TaskEvent event) {
LOG.info("Scheduling a redundant attempt for task " + task.taskId);
task.addAndScheduleAttempt(Avataar.SPECULATIVE);
}
}
/**
 * Handles T_ATTEMPT_COMMIT_PENDING in the RUNNING state. The first attempt
 * to ask becomes the commit attempt; any later attempt is sent a kill event
 * so that it discards its output.
 */
private static class AttemptCommitPendingTransition
    implements SingleArcTransition<TaskImpl, TaskEvent> {
  @Override
  public void transition(TaskImpl task, TaskEvent event) {
    final TaskAttemptId requester =
        ((TaskTAttemptEvent) event).getTaskAttemptID();
    if (task.commitAttempt != null) {
      // Don't think this can be a pluggable decision, so simply raise an
      // event for the TaskAttempt to delete its output.
      LOG.info(task.commitAttempt
          + " already given a go for committing the task output, so killing "
          + requester);
      task.eventHandler.handle(new TaskAttemptKillEvent(requester,
          SPECULATION + task.commitAttempt + " committed first!"));
      return;
    }
    // TODO: validate attemptID
    task.commitAttempt = requester;
    LOG.info(requester + " given a go for committing the task output.");
  }
}
/**
 * Handles T_ATTEMPT_SUCCEEDED in the RUNNING state: records the successful
 * attempt, emits the completion and success events, and sends a kill event
 * to every other attempt that has not finished yet.
 */
private static class AttemptSucceededTransition
    implements SingleArcTransition<TaskImpl, TaskEvent> {
  @Override
  public void transition(TaskImpl task, TaskEvent event) {
    TaskTAttemptEvent taskTAttemptEvent = (TaskTAttemptEvent) event;
    TaskAttemptId taskAttemptId = taskTAttemptEvent.getTaskAttemptID();
    task.handleTaskAttemptCompletion(
        taskAttemptId,
        TaskAttemptCompletionEventStatus.SUCCEEDED);
    task.finishedAttempts.add(taskAttemptId);
    task.inProgressAttempts.remove(taskAttemptId);
    task.successfulAttempt = taskAttemptId;
    task.sendTaskSucceededEvents();
    for (TaskAttempt attempt : task.attempts.values()) {
      // Compare ids by value: the id carried by the event need not be the
      // same object as the one held by the attempt.
      if (!attempt.getID().equals(task.successfulAttempt) &&
          // This is okay because it can only talk us out of sending a
          // TA_KILL message to an attempt that doesn't need one for
          // other reasons.
          !attempt.isFinished()) {
        LOG.info("Issuing kill to other attempt " + attempt.getID());
        task.eventHandler.handle(new TaskAttemptKillEvent(attempt.getID(),
            SPECULATION + task.successfulAttempt + " succeeded first!"));
      }
    }
    // Updates the running-task metric; the target state itself is fixed by
    // the state machine arc.
    task.finished(TaskStateInternal.SUCCEEDED);
  }
}
/**
 * Transition run when an attempt of the task has been killed.
 *
 * <p>Reports the attempt's completion as KILLED and moves it from the
 * in-progress set to the finished set. If the task has no successful attempt
 * yet, a replacement attempt is added and scheduled; the reschedule flag is
 * taken from the event when it is a {@code TaskTAttemptKilledEvent} and is
 * {@code false} otherwise. If the killed attempt was the recorded commit
 * attempt, the commit attempt is cleared.
 */
private static class AttemptKilledTransition implements
    SingleArcTransition<TaskImpl, TaskEvent> {
  @Override
  public void transition(TaskImpl task, TaskEvent event) {
    TaskAttemptId taskAttemptId =
        ((TaskTAttemptEvent) event).getTaskAttemptID();
    task.handleTaskAttemptCompletion(
        taskAttemptId,
        TaskAttemptCompletionEventStatus.KILLED);
    task.finishedAttempts.add(taskAttemptId);
    task.inProgressAttempts.remove(taskAttemptId);
    if (task.successfulAttempt == null) {
      boolean rescheduleNewAttempt = false;
      if (event instanceof TaskTAttemptKilledEvent) {
        rescheduleNewAttempt =
            ((TaskTAttemptKilledEvent) event).getRescheduleAttempt();
      }
      task.addAndScheduleAttempt(Avataar.VIRGIN, rescheduleNewAttempt);
    }
    // Compare by value, not by reference: the ID carried by the event is not
    // guaranteed to be the same object as the one stored in commitAttempt,
    // and a stale commitAttempt would cause later commit-pending attempts to
    // be killed. This mirrors the equals() check in AttemptFailedTransition.
    if (task.commitAttempt != null
        && task.commitAttempt.equals(taskAttemptId)) {
      task.commitAttempt = null;
    }
  }
}
/**
 * Transition run for an attempt-completion event received while the task is
 * waiting for its attempts to finish after a kill.
 *
 * <p>Each invocation reports the attempt's completion with the status given
 * at construction and adds the attempt to the finished set. Once the number
 * of finished attempts equals the number of attempts, the task-level events
 * are emitted and {@link #finalState} is returned; until then the task's
 * current internal state is returned unchanged.
 */
private static class KillWaitAttemptKilledTransition implements
    MultipleArcTransition<TaskImpl, TaskEvent, TaskStateInternal> {

  /** State returned once every attempt has finished. */
  protected TaskStateInternal finalState = TaskStateInternal.KILLED;

  /** Completion status reported for each attempt handled here. */
  protected final TaskAttemptCompletionEventStatus taCompletionEventStatus;

  /** Creates a transition that reports attempts as KILLED. */
  public KillWaitAttemptKilledTransition() {
    this(TaskAttemptCompletionEventStatus.KILLED);
  }

  /**
   * @param taCompletionEventStatus completion status to report for each
   *        attempt handled by this transition
   */
  public KillWaitAttemptKilledTransition(
      TaskAttemptCompletionEventStatus taCompletionEventStatus) {
    this.taCompletionEventStatus = taCompletionEventStatus;
  }

  @Override
  public TaskStateInternal transition(TaskImpl task, TaskEvent event) {
    TaskTAttemptEvent attemptEvent = (TaskTAttemptEvent) event;
    TaskAttemptId doneId = attemptEvent.getTaskAttemptID();
    task.handleTaskAttemptCompletion(doneId, taCompletionEventStatus);
    task.finishedAttempts.add(doneId);

    // Still waiting on at least one attempt: stay where we are.
    boolean attemptsOutstanding =
        task.finishedAttempts.size() != task.attempts.size();
    if (attemptsOutstanding) {
      return task.getInternalState();
    }

    if (!task.historyTaskStartGenerated) {
      LOG.debug("Not generating HistoryFinish event since start event not" +
          " generated for task: " + task.getID());
    } else {
      // TODO JH verify failedAttempt null
      TaskFailedEvent historyEvent =
          createTaskFailedEvent(task, null, finalState, null);
      task.eventHandler.handle(
          new JobHistoryEvent(task.taskId.getJobId(), historyEvent));
    }

    task.eventHandler.handle(
        new JobTaskEvent(task.taskId, getExternalState(finalState)));
    return finalState;
  }
}
/**
 * Variant of {@code KillWaitAttemptKilledTransition} whose constructor passes
 * {@code TaskAttemptCompletionEventStatus.SUCCEEDED} to the parent as the
 * completion status; all other behavior is inherited.
 */
private static class KillWaitAttemptSucceededTransition extends
KillWaitAttemptKilledTransition {
/** Configures the parent transition with the SUCCEEDED completion status. */
public KillWaitAttemptSucceededTransition() {
super(TaskAttemptCompletionEventStatus.SUCCEEDED);
}
}
/**
 * Variant of {@code KillWaitAttemptKilledTransition} whose constructor passes
 * {@code TaskAttemptCompletionEventStatus.FAILED} to the parent as the
 * completion status; all other behavior is inherited.
 */
private static class KillWaitAttemptFailedTransition extends
KillWaitAttemptKilledTransition {
/** Configures the parent transition with the FAILED completion status. */
public KillWaitAttemptFailedTransition() {
super(TaskAttemptCompletionEventStatus.FAILED);
}
}
/**
 * Transition run when an attempt of the task has failed.
 *
 * <p>The attempt is added to the failed and finished sets, the commit attempt
 * is cleared if it was this attempt, and a {@code ContainerFailedEvent} is
 * raised when the attempt reports an assigned container manager address.
 *
 * <p>While the number of failed attempts is below {@code task.maxAttempts}
 * the attempt's completion is reported as FAILED and, if the task has no
 * successful attempt and no in-progress attempt with a container assigned, a
 * new attempt is scheduled; the state from {@link #getDefaultState} is
 * returned. Otherwise the completion is reported as TIPFAILED, all unfinished
 * attempts are killed, and the task finishes as FAILED.
 */
private static class AttemptFailedTransition implements
    MultipleArcTransition<TaskImpl, TaskEvent, TaskStateInternal> {

  @Override
  public TaskStateInternal transition(TaskImpl task, TaskEvent event) {
    TaskAttemptId failedId = ((TaskTAttemptEvent) event).getTaskAttemptID();
    task.failedAttempts.add(failedId);
    if (failedId.equals(task.commitAttempt)) {
      task.commitAttempt = null;
    }

    TaskAttempt failedAttempt = task.attempts.get(failedId);
    if (failedAttempt.getAssignedContainerMgrAddress() != null) {
      // container was assigned
      task.eventHandler.handle(new ContainerFailedEvent(
          failedAttempt.getID(),
          failedAttempt.getAssignedContainerMgrAddress()));
    }
    task.finishedAttempts.add(failedId);

    if (task.failedAttempts.size() >= task.maxAttempts) {
      // Out of retries: the whole task fails.
      task.handleTaskAttemptCompletion(
          failedId,
          TaskAttemptCompletionEventStatus.TIPFAILED);
      // issue kill to all non finished attempts
      for (TaskAttempt other : task.attempts.values()) {
        task.killUnfinishedAttempt(other, "Task has failed. Killing attempt!");
      }
      task.inProgressAttempts.clear();

      if (!task.historyTaskStartGenerated) {
        LOG.debug("Not generating HistoryFinish event since start event not" +
            " generated for task: " + task.getID());
      } else {
        TaskFailedEvent historyEvent = createTaskFailedEvent(task,
            failedAttempt.getDiagnostics(), TaskStateInternal.FAILED,
            failedId);
        task.eventHandler.handle(
            new JobHistoryEvent(task.taskId.getJobId(), historyEvent));
      }
      task.eventHandler.handle(
          new JobTaskEvent(task.taskId, TaskState.FAILED));
      return task.finished(TaskStateInternal.FAILED);
    }

    // Retries remain.
    task.handleTaskAttemptCompletion(
        failedId,
        TaskAttemptCompletionEventStatus.FAILED);
    task.inProgressAttempts.remove(failedId);
    // we don't need a new attempt if we already have a spare
    if (task.successfulAttempt == null && !hasSpareWithContainer(task)) {
      task.addAndScheduleAttempt(Avataar.VIRGIN);
    }
    return getDefaultState(task);
  }

  /**
   * Returns {@code true} if at least one in-progress attempt has a container
   * assigned, i.e. not all in-progress attempts are still waiting for
   * resources.
   */
  private static boolean hasSpareWithContainer(TaskImpl task) {
    for (TaskAttemptId runningId : task.inProgressAttempts) {
      TaskAttemptImpl running = (TaskAttemptImpl) task.getAttempt(runningId);
      if (running.isContainerAssigned()) {
        return true;
      }
    }
    return false;
  }

  /**
   * State returned when the task has not exhausted its attempts; this
   * implementation returns the task's current internal state.
   */
  protected TaskStateInternal getDefaultState(TaskImpl task) {
    return task.getInternalState();
  }
}
/**
 * Failure transition for an attempt that fails after the task has already
 * been considered complete.
 *
 * <p>If the task is SUCCEEDED and the failing attempt is not the successful
 * one, the attempt is only moved to the finished set and the task stays
 * SUCCEEDED. Otherwise the job is told the map task is being rescheduled,
 * the success is undone via {@code unSucceed}, and the regular
 * {@code AttemptFailedTransition} logic is run.
 */
private static class RetroactiveFailureTransition
    extends AttemptFailedTransition {

  @Override
  public TaskStateInternal transition(TaskImpl task, TaskEvent event) {
    TaskAttemptId failedId = ((TaskTAttemptEvent) event).getTaskAttemptID();

    // don't allow a different task attempt to override a previous
    // succeeded state
    boolean succeededByAnotherAttempt =
        task.getInternalState() == TaskStateInternal.SUCCEEDED
            && !failedId.equals(task.successfulAttempt);
    if (succeededByAnotherAttempt) {
      task.finishedAttempts.add(failedId);
      task.inProgressAttempts.remove(failedId);
      return TaskStateInternal.SUCCEEDED;
    }

    // a successful REDUCE task should not be overridden
    // TODO: consider moving it to MapTaskImpl
    // NOTE(review): processing continues after internalError() below;
    // confirm that is intended.
    if (!TaskType.MAP.equals(task.getType())) {
      LOG.error("Unexpected event for REDUCE task " + event.getType());
      task.internalError(event.getType());
    }

    // tell the job about the rescheduling
    task.eventHandler.handle(new JobMapTaskRescheduledEvent(task.taskId));

    // super.transition is mostly coded for the case where an UNcompleted
    // task failed. When a COMPLETED task retroactively fails, we have to
    // let AttemptFailedTransition.transition believe that there's no
    // redundancy.
    unSucceed(task);
    // fake increase in Uncomplete attempts for super.transition
    task.inProgressAttempts.add(failedId);
    return super.transition(task, event);
  }

  /** After a retroactive failure the non-terminal outcome is SCHEDULED. */
  @Override
  protected TaskStateInternal getDefaultState(TaskImpl task) {
    return TaskStateInternal.SCHEDULED;
  }
}
/**
 * Kill transition for an attempt that is killed after the task has already
 * been considered complete.
 *
 * <p>If the event carries an attempt ID, the task is SUCCEEDED and the killed
 * attempt is not the successful one, the attempt is only moved to the
 * finished set and the task stays SUCCEEDED. Otherwise the success is undone
 * via {@code unSucceed}, the completion is reported as KILLED, the job is
 * told about the rescheduling, a new attempt is scheduled and SCHEDULED is
 * returned.
 */
private static class RetroactiveKilledTransition implements
    MultipleArcTransition<TaskImpl, TaskEvent, TaskStateInternal> {

  @Override
  public TaskStateInternal transition(TaskImpl task, TaskEvent event) {
    // Stays null when the event is not a TaskTAttemptEvent.
    TaskAttemptId killedId = null;
    if (event instanceof TaskTAttemptEvent) {
      killedId = ((TaskTAttemptEvent) event).getTaskAttemptID();
      // don't allow a different task attempt to override a previous
      // succeeded state
      boolean succeededByAnotherAttempt =
          task.getInternalState() == TaskStateInternal.SUCCEEDED
              && !killedId.equals(task.successfulAttempt);
      if (succeededByAnotherAttempt) {
        task.finishedAttempts.add(killedId);
        task.inProgressAttempts.remove(killedId);
        return TaskStateInternal.SUCCEEDED;
      }
    }

    // a successful REDUCE task should not be overridden
    // TODO: consider moving it to MapTaskImpl
    if (!TaskType.MAP.equals(task.getType())) {
      LOG.error("Unexpected event for REDUCE task " + event.getType());
      task.internalError(event.getType());
    }

    // successful attempt is now killed. reschedule
    // tell the job about the rescheduling
    unSucceed(task);
    task.handleTaskAttemptCompletion(killedId,
        TaskAttemptCompletionEventStatus.KILLED);
    task.eventHandler.handle(new JobMapTaskRescheduledEvent(task.taskId));

    // typically we are here because this map task was run on a bad node and
    // we want to reschedule it on a different node.
    // Depending on whether there are previous failed attempts or not this
    // can SCHEDULE or RESCHEDULE the container allocate request. If this
    // SCHEDULE's then the dataLocal hosts of this taskAttempt will be used
    // from the map splitInfo. So the bad node might be sent as a location
    // to the RM. But the RM would ignore that just like it would ignore
    // currently pending container requests affinitized to bad nodes.
    //
    // Only a TaskTAttemptKilledEvent can request a reschedule of the next
    // attempt. If true, this typically indicates that a successful map
    // attempt was killed on an unusable node being reported.
    boolean rescheduleNext = event instanceof TaskTAttemptKilledEvent
        && ((TaskTAttemptKilledEvent) event).getRescheduleAttempt();
    task.addAndScheduleAttempt(Avataar.VIRGIN, rescheduleNext);
    return TaskStateInternal.SCHEDULED;
  }
}
/**
 * Transition run when an attempt reports success while the task is already
 * in the succeeded state: the attempt is only moved from the in-progress set
 * to the finished set.
 */
private static class AttemptSucceededAtSucceededTransition
    implements SingleArcTransition<TaskImpl, TaskEvent> {

  @Override
  public void transition(TaskImpl task, TaskEvent event) {
    TaskAttemptId lateId = ((TaskTAttemptEvent) event).getTaskAttemptID();
    task.finishedAttempts.add(lateId);
    task.inProgressAttempts.remove(lateId);
  }
}
/**
 * Transition that marks the task as KILLED without touching any attempts.
 *
 * <p>Emits a job-history failure event with state KILLED when the history
 * start event was generated for this task, notifies the job of the task's
 * external KILLED state, and ends the task's waiting metric.
 */
private static class KillNewTransition
    implements SingleArcTransition<TaskImpl, TaskEvent> {

  @Override
  public void transition(TaskImpl task, TaskEvent event) {
    if (!task.historyTaskStartGenerated) {
      LOG.debug("Not generating HistoryFinish event since start event not" +
          " generated for task: " + task.getID());
    } else {
      // TODO Verify failedAttemptId is null
      TaskFailedEvent historyEvent = createTaskFailedEvent(task, null,
          TaskStateInternal.KILLED, null);
      task.eventHandler.handle(
          new JobHistoryEvent(task.taskId.getJobId(), historyEvent));
    }

    TaskState externalKilled = getExternalState(TaskStateInternal.KILLED);
    task.eventHandler.handle(new JobTaskEvent(task.taskId, externalKilled));
    task.metrics.endWaitingTask(task);
  }
}
/**
 * Sends a {@code TaskAttemptKillEvent} to the given attempt unless it is
 * {@code null} or already finished.
 *
 * @param attempt the attempt to kill; may be {@code null}
 * @param logMsg message passed along with the kill event
 */
private void killUnfinishedAttempt(TaskAttempt attempt, String logMsg) {
  if (attempt == null || attempt.isFinished()) {
    return;
  }
  eventHandler.handle(new TaskAttemptKillEvent(attempt.getID(), logMsg));
}
/**
 * Transition run when the task is asked to be killed: every attempt that is
 * not yet finished is sent a kill, and the in-progress set is emptied.
 */
private static class KillTransition
    implements SingleArcTransition<TaskImpl, TaskEvent> {

  @Override
  public void transition(TaskImpl task, TaskEvent event) {
    final String reason = "Task KILL is received. Killing attempt!";
    // issue kill to all non finished attempts
    for (TaskAttempt candidate : task.attempts.values()) {
      task.killUnfinishedAttempt(candidate, reason);
    }
    task.inProgressAttempts.clear();
  }
}
/**
 * Transition that updates the task's metrics on launch: it invokes
 * {@code launchedTask} and then {@code runningTask} on {@code task.metrics}.
 */
static class LaunchTransition
implements SingleArcTransition<TaskImpl, TaskEvent> {
@Override
public void transition(TaskImpl task, TaskEvent event) {
// Record the launch first, then mark the task as running.
task.metrics.launchedTask(task);
task.metrics.runningTask(task);
}
}
}