blob: 4ef37731a4eb72d2fd3a37df8090f62a8951e5a0 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapred;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.TreeSet;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.mapred.SortedRanges.Range;
import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.net.Node;
/*************************************************************
* TaskInProgress maintains all the info needed for a
* Task in the lifetime of its owning Job. A given Task
* might be speculatively executed or reexecuted, so we
* need a level of indirection above the running-id itself.
* <br>
* A given TaskInProgress contains multiple taskids,
* 0 or more of which might be executing at any one time.
* (That's what allows speculative execution.) A taskid
* is now *never* recycled. A TIP allocates enough taskids
* to account for all the speculation and failures it will
* ever have to handle. Once those are up, the TIP is dead.
* **************************************************************
*/
class TaskInProgress {
static final int MAX_TASK_EXECS = 1;
int maxTaskAttempts = 4;
static final double SPECULATIVE_GAP = 0.2;
static final long SPECULATIVE_LAG = 60 * 1000;
private static final int NUM_ATTEMPTS_PER_RESTART = 1000;
public static final Log LOG = LogFactory.getLog(TaskInProgress.class);
// Defines the TIP
private String jobFile = null;
private final TaskSplitMetaInfo splitInfo;
private int numMaps;
private int partition;
private JobTracker jobtracker;
private TaskID id;
private JobInProgress job;
private final int numSlotsRequired;
// Status of the TIP
private int successEventNumber = -1;
private int numTaskFailures = 0;
private int numKilledTasks = 0;
private double progress = 0;
private String state = "";
private long startTime = 0;
private long execStartTime = 0;
private long execFinishTime = 0;
private int completes = 0;
private boolean failed = false;
private boolean killed = false;
private long maxSkipRecords = 0;
private FailedRanges failedRanges = new FailedRanges();
private volatile boolean skipping = false;
private boolean jobCleanup = false;
private boolean jobSetup = false;
// The 'next' usable taskid of this tip
int nextTaskId = 0;
// The taskid that took this TIP to SUCCESS
private TaskAttemptID successfulTaskId;
// The first taskid of this tip
private TaskAttemptID firstTaskId;
// Map from task Id -> TaskTracker Id, contains tasks that are
// currently runnings
private TreeMap<TaskAttemptID, String> activeTasks = new TreeMap<TaskAttemptID, String>();
// All attempt Ids of this TIP
private TreeSet<TaskAttemptID> tasks = new TreeSet<TaskAttemptID>();
private JobConf conf;
private Map<TaskAttemptID,List<String>> taskDiagnosticData =
new TreeMap<TaskAttemptID,List<String>>();
/**
* Map from taskId -> TaskStatus
*/
private TreeMap<TaskAttemptID,TaskStatus> taskStatuses =
new TreeMap<TaskAttemptID,TaskStatus>();
// Map from taskId -> TaskTracker Id,
// contains cleanup attempts and where they ran, if any
private TreeMap<TaskAttemptID, String> cleanupTasks =
new TreeMap<TaskAttemptID, String>();
private TreeSet<String> machinesWhereFailed = new TreeSet<String>();
private TreeSet<TaskAttemptID> tasksReportedClosed = new TreeSet<TaskAttemptID>();
//list of tasks to kill, <taskid> -> <shouldFail>
private TreeMap<TaskAttemptID, Boolean> tasksToKill = new TreeMap<TaskAttemptID, Boolean>();
//task to commit, <taskattemptid>
private TaskAttemptID taskToCommit;
private Counters counters = new Counters();
private String user;
/**
* Constructor for MapTask
*/
public TaskInProgress(JobID jobid, String jobFile,
TaskSplitMetaInfo split,
JobTracker jobtracker, JobConf conf,
JobInProgress job, int partition,
int numSlotsRequired) {
this.jobFile = jobFile;
this.splitInfo = split;
this.jobtracker = jobtracker;
this.job = job;
this.conf = conf;
this.partition = partition;
this.maxSkipRecords = SkipBadRecords.getMapperMaxSkipRecords(conf);
this.numSlotsRequired = numSlotsRequired;
setMaxTaskAttempts();
init(jobid);
}
/**
* Constructor for ReduceTask
*/
public TaskInProgress(JobID jobid, String jobFile,
int numMaps,
int partition, JobTracker jobtracker, JobConf conf,
JobInProgress job, int numSlotsRequired) {
this.jobFile = jobFile;
this.splitInfo = null;
this.numMaps = numMaps;
this.partition = partition;
this.jobtracker = jobtracker;
this.job = job;
this.conf = conf;
this.maxSkipRecords = SkipBadRecords.getReducerMaxSkipGroups(conf);
this.numSlotsRequired = numSlotsRequired;
setMaxTaskAttempts();
init(jobid);
}
/**
* Set the max number of attempts before we declare a TIP as "failed"
*/
private void setMaxTaskAttempts() {
if (isMapTask()) {
this.maxTaskAttempts = conf.getMaxMapAttempts();
} else {
this.maxTaskAttempts = conf.getMaxReduceAttempts();
}
this.user = job.getUser();
}
/**
* Return the index of the tip within the job, so
* "task_200707121733_1313_0002_m_012345" would return 12345;
* @return int the tip index
*/
public int idWithinJob() {
return partition;
}
public boolean isJobCleanupTask() {
return jobCleanup;
}
public void setJobCleanupTask() {
jobCleanup = true;
}
public boolean isJobSetupTask() {
return jobSetup;
}
public void setJobSetupTask() {
jobSetup = true;
}
public boolean isOnlyCommitPending() {
for (TaskStatus t : taskStatuses.values()) {
if (t.getRunState() == TaskStatus.State.COMMIT_PENDING) {
return true;
}
}
return false;
}
public boolean isCommitPending(TaskAttemptID taskId) {
TaskStatus t = taskStatuses.get(taskId);
if (t == null) {
return false;
}
return t.getRunState() == TaskStatus.State.COMMIT_PENDING;
}
/**
* Initialization common to Map and Reduce
*/
void init(JobID jobId) {
this.startTime = jobtracker.getClock().getTime();
this.id = new TaskID(jobId, isMapTask(), partition);
this.skipping = startSkipping();
}
////////////////////////////////////
// Accessors, info, profiles, etc.
////////////////////////////////////
/**
* Return the start time
*/
public long getStartTime() {
return startTime;
}
/**
* Return the exec start time
*/
public long getExecStartTime() {
return execStartTime;
}
/**
* Set the exec start time
*/
public void setExecStartTime(long startTime) {
execStartTime = startTime;
}
/**
* Return the exec finish time
*/
public long getExecFinishTime() {
return execFinishTime;
}
/**
* Set the exec finish time
*/
public void setExecFinishTime(long finishTime) {
execFinishTime = finishTime;
JobHistory.Task.logUpdates(id, execFinishTime); // log the update
}
/**
* Return the parent job
*/
public JobInProgress getJob() {
return job;
}
/**
* Return an ID for this task, not its component taskid-threads
*/
public TaskID getTIPId() {
return this.id;
}
/**
* Whether this is a map task
*/
public boolean isMapTask() {
return splitInfo != null;
}
/**
* Returns the type of the {@link TaskAttemptID} passed.
* The type of an attempt is determined by the nature of the task and not its
* id.
* For example,
* - Attempt 'attempt_123_01_m_01_0' might be a job-setup task even though it
* has a _m_ in its id. Hence the task type of this attempt is JOB_SETUP
* instead of MAP.
* - Similarly reduce attempt 'attempt_123_01_r_01_0' might have failed and is
* now supposed to do the task-level cleanup. In such a case this attempt
* will be of type TASK_CLEANUP instead of REDUCE.
*/
TaskType getAttemptType (TaskAttemptID id) {
if (isCleanupAttempt(id)) {
return TaskType.TASK_CLEANUP;
} else if (isJobSetupTask()) {
return TaskType.JOB_SETUP;
} else if (isJobCleanupTask()) {
return TaskType.JOB_CLEANUP;
} else if (isMapTask()) {
return TaskType.MAP;
} else {
return TaskType.REDUCE;
}
}
TaskType getFirstTaskType() {
assert firstTaskId != null : "got first task";
return getAttemptType(firstTaskId);
}
/**
* Is the Task associated with taskid is the first attempt of the tip?
* @param taskId
* @return Returns true if the Task is the first attempt of the tip
*/
public boolean isFirstAttempt(TaskAttemptID taskId) {
return firstTaskId == null ? false : firstTaskId.equals(taskId);
}
/**
* Is this tip currently running any tasks?
* @return true if any tasks are running
*/
public boolean isRunning() {
return !activeTasks.isEmpty();
}
/**
* Is this attempt currently running ?
* @param taskId task attempt id.
* @return true if attempt taskId is running
*/
boolean isAttemptRunning(TaskAttemptID taskId) {
return activeTasks.containsKey(taskId);
}
TaskAttemptID getSuccessfulTaskid() {
return successfulTaskId;
}
private void setSuccessfulTaskid(TaskAttemptID successfulTaskId) {
this.successfulTaskId = successfulTaskId;
}
private void resetSuccessfulTaskid() {
this.successfulTaskId = null;
}
/**
* Is this tip complete?
*
* @return <code>true</code> if the tip is complete, else <code>false</code>
*/
public synchronized boolean isComplete() {
return (completes > 0);
}
/**
* Is the given taskid the one that took this tip to completion?
*
* @param taskid taskid of attempt to check for completion
* @return <code>true</code> if taskid is complete, else <code>false</code>
*/
public boolean isComplete(TaskAttemptID taskid) {
return ((completes > 0)
&& taskid.equals(getSuccessfulTaskid()));
}
/**
* Is the tip a failure?
*
* @return <code>true</code> if tip has failed, else <code>false</code>
*/
public boolean isFailed() {
return failed;
}
/**
* Number of times the TaskInProgress has failed.
*/
public int numTaskFailures() {
return numTaskFailures;
}
/**
* Number of times the TaskInProgress has been killed by the framework.
*/
public int numKilledTasks() {
return numKilledTasks;
}
/**
* Get the overall progress (from 0 to 1.0) for this TIP
*/
public double getProgress() {
return progress;
}
/**
* Get the task's counters
*/
public Counters getCounters() {
return counters;
}
/**
* Returns whether a component task-thread should be
* closed because the containing JobInProgress has completed
* or the task is killed by the user
*/
public boolean shouldClose(TaskAttemptID taskid) {
/**
* If the task hasn't been closed yet, and it belongs to a completed
* TaskInProgress close it.
*
* However, for completed map tasks we do not close the task which
* actually was the one responsible for _completing_ the TaskInProgress.
*/
boolean close = false;
TaskStatus ts = taskStatuses.get(taskid);
if ((ts != null) &&
(!tasksReportedClosed.contains(taskid)) &&
((this.failed) ||
((job.getStatus().getRunState() != JobStatus.RUNNING &&
(job.getStatus().getRunState() != JobStatus.PREP))))) {
tasksReportedClosed.add(taskid);
close = true;
} else if (isComplete() &&
!(isMapTask() && !jobSetup &&
!jobCleanup && isComplete(taskid)) &&
!tasksReportedClosed.contains(taskid)) {
tasksReportedClosed.add(taskid);
close = true;
} else if (isCommitPending(taskid) && !shouldCommit(taskid) &&
!tasksReportedClosed.contains(taskid)) {
tasksReportedClosed.add(taskid);
close = true;
} else {
close = tasksToKill.keySet().contains(taskid);
}
return close;
}
/**
* Commit this task attempt for the tip.
* @param taskid
*/
public void doCommit(TaskAttemptID taskid) {
taskToCommit = taskid;
}
/**
* Returns whether the task attempt should be committed or not
*/
public boolean shouldCommit(TaskAttemptID taskid) {
return !isComplete() && isCommitPending(taskid) &&
taskToCommit.equals(taskid);
}
/**
* Creates a "status report" for this task. Includes the
* task ID and overall status, plus reports for all the
* component task-threads that have ever been started.
*/
synchronized TaskReport generateSingleReport() {
ArrayList<String> diagnostics = new ArrayList<String>();
for (List<String> l : taskDiagnosticData.values()) {
diagnostics.addAll(l);
}
TIPStatus currentStatus = null;
if (isRunning() && !isComplete()) {
currentStatus = TIPStatus.RUNNING;
} else if (isComplete()) {
currentStatus = TIPStatus.COMPLETE;
} else if (wasKilled()) {
currentStatus = TIPStatus.KILLED;
} else if (isFailed()) {
currentStatus = TIPStatus.FAILED;
} else if (!(isComplete() || isRunning() || wasKilled())) {
currentStatus = TIPStatus.PENDING;
}
TaskReport report = new TaskReport
(getTIPId(), (float)progress, state,
diagnostics.toArray(new String[diagnostics.size()]),
currentStatus, execStartTime, execFinishTime, counters);
if (currentStatus == TIPStatus.RUNNING) {
report.setRunningTaskAttempts(activeTasks.keySet());
} else if (currentStatus == TIPStatus.COMPLETE) {
report.setSuccessfulAttempt(getSuccessfulTaskid());
}
return report;
}
/**
* Get the diagnostic messages for a given task within this tip.
*
* @param taskId the id of the required task
* @return the list of diagnostics for that task
*/
synchronized List<String> getDiagnosticInfo(TaskAttemptID taskId) {
return taskDiagnosticData.get(taskId);
}
////////////////////////////////////////////////
// Update methods, usually invoked by the owning
// job.
////////////////////////////////////////////////
/**
* Save diagnostic information for a given task.
*
* @param taskId id of the task
* @param diagInfo diagnostic information for the task
*/
public void addDiagnosticInfo(TaskAttemptID taskId, String diagInfo) {
List<String> diagHistory = taskDiagnosticData.get(taskId);
if (diagHistory == null) {
diagHistory = new ArrayList<String>();
taskDiagnosticData.put(taskId, diagHistory);
}
diagHistory.add(diagInfo);
}
/**
* A status message from a client has arrived.
* It updates the status of a single component-thread-task,
* which might result in an overall TaskInProgress status update.
* @return has the task changed its state noticably?
*/
synchronized boolean updateStatus(TaskStatus status) {
TaskAttemptID taskid = status.getTaskID();
String diagInfo = status.getDiagnosticInfo();
TaskStatus oldStatus = taskStatuses.get(taskid);
boolean changed = true;
if (diagInfo != null && diagInfo.length() > 0) {
LOG.info("Error from "+taskid+": "+diagInfo);
addDiagnosticInfo(taskid, diagInfo);
}
if(skipping) {
failedRanges.updateState(status);
}
if (oldStatus != null) {
TaskStatus.State oldState = oldStatus.getRunState();
TaskStatus.State newState = status.getRunState();
// We should never recieve a duplicate success/failure/killed
// status update for the same taskid! This is a safety check,
// and is addressed better at the TaskTracker to ensure this.
// @see {@link TaskTracker.transmitHeartbeat()}
if ((newState != TaskStatus.State.RUNNING &&
newState != TaskStatus.State.COMMIT_PENDING &&
newState != TaskStatus.State.FAILED_UNCLEAN &&
newState != TaskStatus.State.KILLED_UNCLEAN &&
newState != TaskStatus.State.UNASSIGNED) &&
(oldState == newState)) {
LOG.warn("Recieved duplicate status update of '" + newState +
"' for '" + taskid + "' of TIP '" + getTIPId() + "'" +
"oldTT=" + oldStatus.getTaskTracker() +
" while newTT=" + status.getTaskTracker());
return false;
}
// The task is not allowed to move from completed back to running.
// We have seen out of order status messagesmoving tasks from complete
// to running. This is a spot fix, but it should be addressed more
// globally.
if ((newState == TaskStatus.State.RUNNING ||
newState == TaskStatus.State.UNASSIGNED) &&
(oldState == TaskStatus.State.FAILED ||
oldState == TaskStatus.State.KILLED ||
oldState == TaskStatus.State.FAILED_UNCLEAN ||
oldState == TaskStatus.State.KILLED_UNCLEAN ||
oldState == TaskStatus.State.SUCCEEDED ||
oldState == TaskStatus.State.COMMIT_PENDING)) {
return false;
}
//Do not accept any status once the task is marked FAILED/KILLED
//This is to handle the case of the JobTracker timing out a task
//due to launch delay, but the TT comes back with any state or
//TT got expired
if (oldState == TaskStatus.State.FAILED ||
oldState == TaskStatus.State.KILLED) {
tasksToKill.put(taskid, true);
return false;
}
changed = oldState != newState;
}
// if task is a cleanup attempt, do not replace the complete status,
// update only specific fields.
// For example, startTime should not be updated,
// but finishTime has to be updated.
if (!isCleanupAttempt(taskid)) {
taskStatuses.put(taskid, status);
} else {
taskStatuses.get(taskid).statusUpdate(status.getRunState(),
status.getProgress(), status.getStateString(), status.getPhase(),
status.getFinishTime());
}
// Recompute progress
recomputeProgress();
return changed;
}
/**
* Indicate that one of the taskids in this TaskInProgress
* has failed.
*/
public void incompleteSubTask(TaskAttemptID taskid,
JobStatus jobStatus) {
//
// Note the failure and its location
//
TaskStatus status = taskStatuses.get(taskid);
String trackerName;
String trackerHostName = null;
TaskStatus.State taskState = TaskStatus.State.FAILED;
if (status != null) {
trackerName = status.getTaskTracker();
trackerHostName =
JobInProgress.convertTrackerNameToHostName(trackerName);
// Check if the user manually KILLED/FAILED this task-attempt...
Boolean shouldFail = tasksToKill.remove(taskid);
if (shouldFail != null) {
if (status.getRunState() == TaskStatus.State.FAILED ||
status.getRunState() == TaskStatus.State.KILLED) {
taskState = (shouldFail) ? TaskStatus.State.FAILED :
TaskStatus.State.KILLED;
} else {
taskState = (shouldFail) ? TaskStatus.State.FAILED_UNCLEAN :
TaskStatus.State.KILLED_UNCLEAN;
}
status.setRunState(taskState);
addDiagnosticInfo(taskid, "Task has been " + taskState + " by the user" );
}
taskState = status.getRunState();
if (taskState != TaskStatus.State.FAILED &&
taskState != TaskStatus.State.KILLED &&
taskState != TaskStatus.State.FAILED_UNCLEAN &&
taskState != TaskStatus.State.KILLED_UNCLEAN) {
LOG.info("Task '" + taskid + "' running on '" + trackerName +
"' in state: '" + taskState + "' being failed!");
status.setRunState(TaskStatus.State.FAILED);
taskState = TaskStatus.State.FAILED;
}
// tasktracker went down and failed time was not reported.
if (0 == status.getFinishTime()){
status.setFinishTime(jobtracker.getClock().getTime());
}
}
this.activeTasks.remove(taskid);
// Since we do not fail completed reduces (whose outputs go to hdfs), we
// should note this failure only for completed maps, only if this taskid;
// completed this map. however if the job is done, there is no need to
// manipulate completed maps
if (this.isMapTask() && !jobSetup && !jobCleanup && isComplete(taskid) &&
jobStatus.getRunState() != JobStatus.SUCCEEDED) {
this.completes--;
// Reset the successfulTaskId since we don't have a SUCCESSFUL task now
resetSuccessfulTaskid();
}
// Note that there can be failures of tasks that are hosted on a machine
// that has not yet registered with restarted jobtracker
// recalculate the counts only if its a genuine failure
if (tasks.contains(taskid)) {
if (taskState == TaskStatus.State.FAILED) {
numTaskFailures++;
machinesWhereFailed.add(trackerHostName);
if(maxSkipRecords>0) {
//skipping feature enabled
if(LOG.isDebugEnabled()) {
LOG.debug("TaskInProgress adding" + status.getNextRecordRange());
}
failedRanges.add(status.getNextRecordRange());
skipping = startSkipping();
}
} else if (taskState == TaskStatus.State.KILLED) {
numKilledTasks++;
}
}
if (numTaskFailures >= maxTaskAttempts) {
LOG.info("TaskInProgress " + getTIPId() + " has failed " + numTaskFailures + " times.");
kill();
}
this.user = job.getUser();
}
/**
* Get whether to start skipping mode.
*/
private boolean startSkipping() {
if(maxSkipRecords>0 &&
numTaskFailures>=SkipBadRecords.getAttemptsToStartSkipping(conf)) {
return true;
}
return false;
}
/**
* Finalize the <b>completed</b> task; note that this might not be the first
* task-attempt of the {@link TaskInProgress} and hence might be declared
* {@link TaskStatus.State.SUCCEEDED} or {@link TaskStatus.State.KILLED}
*
* @param taskId id of the completed task-attempt
* @param finalTaskState final {@link TaskStatus.State} of the task-attempt
*/
private void completedTask(TaskAttemptID taskId, TaskStatus.State finalTaskState) {
TaskStatus status = taskStatuses.get(taskId);
status.setRunState(finalTaskState);
activeTasks.remove(taskId);
}
/**
* Indicate that one of the taskids in this already-completed
* TaskInProgress has successfully completed; hence we mark this
* taskid as {@link TaskStatus.State.KILLED}.
*/
void alreadyCompletedTask(TaskAttemptID taskid) {
// 'KILL' the task
completedTask(taskid, TaskStatus.State.KILLED);
// Note the reason for the task being 'KILLED'
addDiagnosticInfo(taskid, "Already completed TIP");
LOG.info("Already complete TIP " + getTIPId() +
" has completed task " + taskid);
}
/**
* Indicate that one of the taskids in this TaskInProgress
* has successfully completed!
*/
public void completed(TaskAttemptID taskid) {
//
// Record that this taskid is complete
//
completedTask(taskid, TaskStatus.State.SUCCEEDED);
// Note the successful taskid
setSuccessfulTaskid(taskid);
//
// Now that the TIP is complete, the other speculative
// subtasks will be closed when the owning tasktracker
// reports in and calls shouldClose() on this object.
//
this.completes++;
this.execFinishTime = jobtracker.getClock().getTime();
recomputeProgress();
}
/**
* Get the split locations
*/
public String[] getSplitLocations() {
if (isMapTask() && !jobSetup && !jobCleanup) {
return splitInfo.getLocations();
}
return new String[0];
}
/**
* Get the Status of the tasks managed by this TIP
*/
public TaskStatus[] getTaskStatuses() {
return taskStatuses.values().toArray(new TaskStatus[taskStatuses.size()]);
}
/**
* Get all the {@link TaskAttemptID}s in this {@link TaskInProgress}
*/
TaskAttemptID[] getAllTaskAttemptIDs() {
return tasks.toArray(new TaskAttemptID[tasks.size()]);
}
/**
* Get the status of the specified task
* @param taskid
* @return
*/
public TaskStatus getTaskStatus(TaskAttemptID taskid) {
return taskStatuses.get(taskid);
}
/**
* The TIP's been ordered kill()ed.
*/
public void kill() {
if (isComplete() || failed) {
return;
}
this.failed = true;
killed = true;
this.execFinishTime = jobtracker.getClock().getTime();
recomputeProgress();
}
/**
* Was the task killed?
* @return true if the task killed
*/
public boolean wasKilled() {
return killed;
}
/**
* Kill the given task
*/
boolean killTask(TaskAttemptID taskId, boolean shouldFail) {
TaskStatus st = taskStatuses.get(taskId);
if(st != null && (st.getRunState() == TaskStatus.State.RUNNING
|| st.getRunState() == TaskStatus.State.COMMIT_PENDING ||
st.inTaskCleanupPhase() ||
st.getRunState() == TaskStatus.State.UNASSIGNED)
&& tasksToKill.put(taskId, shouldFail) == null ) {
String logStr = "Request received to " + (shouldFail ? "fail" : "kill")
+ " task '" + taskId + "' by user";
addDiagnosticInfo(taskId, logStr);
LOG.info(logStr);
return true;
}
return false;
}
/**
* This method is called whenever there's a status change
* for one of the TIP's sub-tasks. It recomputes the overall
* progress for the TIP. We examine all sub-tasks and find
* the one that's most advanced (and non-failed).
*/
void recomputeProgress() {
if (isComplete()) {
this.progress = 1;
// update the counters and the state
TaskStatus completedStatus = taskStatuses.get(getSuccessfulTaskid());
this.counters = completedStatus.getCounters();
this.state = completedStatus.getStateString();
} else if (failed) {
this.progress = 0;
// reset the counters and the state
this.state = "";
this.counters = new Counters();
} else {
double bestProgress = 0;
String bestState = "";
Counters bestCounters = new Counters();
for (Iterator<TaskAttemptID> it = taskStatuses.keySet().iterator(); it.hasNext();) {
TaskAttemptID taskid = it.next();
TaskStatus status = taskStatuses.get(taskid);
if (status.getRunState() == TaskStatus.State.SUCCEEDED) {
bestProgress = 1;
bestState = status.getStateString();
bestCounters = status.getCounters();
break;
} else if (status.getRunState() == TaskStatus.State.COMMIT_PENDING) {
//for COMMIT_PENDING, we take the last state that we recorded
//when the task was RUNNING
bestProgress = this.progress;
bestState = this.state;
bestCounters = this.counters;
} else if (status.getRunState() == TaskStatus.State.RUNNING) {
if (status.getProgress() >= bestProgress) {
bestProgress = status.getProgress();
bestState = status.getStateString();
if (status.getIncludeCounters()) {
bestCounters = status.getCounters();
} else {
bestCounters = this.counters;
}
}
}
}
this.progress = bestProgress;
this.state = bestState;
this.counters = bestCounters;
}
}
/////////////////////////////////////////////////
// "Action" methods that actually require the TIP
// to do something.
/////////////////////////////////////////////////
/**
* Return whether this TIP still needs to run
*/
boolean isRunnable() {
return !failed && (completes == 0);
}
/**
* Return whether the TIP has a speculative task to run. We
* only launch a speculative task if the current TIP is really
* far behind, and has been behind for a non-trivial amount of
* time.
*/
boolean hasSpeculativeTask(long currentTime, double averageProgress) {
//
// REMIND - mjc - these constants should be examined
// in more depth eventually...
//
if (!skipping && activeTasks.size() <= MAX_TASK_EXECS &&
(averageProgress - progress >= SPECULATIVE_GAP) &&
(currentTime - startTime >= SPECULATIVE_LAG)
&& completes == 0 && !isOnlyCommitPending()) {
return true;
}
return false;
}
/**
* Return a Task that can be sent to a TaskTracker for execution.
*/
public Task getTaskToRun(String taskTracker) throws IOException {
if (0 == execStartTime){
// assume task starts running now
execStartTime = jobtracker.getClock().getTime();
}
// Create the 'taskid'; do not count the 'killed' tasks against the job!
TaskAttemptID taskid = null;
if (nextTaskId < (MAX_TASK_EXECS + maxTaskAttempts + numKilledTasks)) {
// Make sure that the attempts are unqiue across restarts
int attemptId = job.getNumRestarts() * NUM_ATTEMPTS_PER_RESTART + nextTaskId;
taskid = new TaskAttemptID( id, attemptId);
++nextTaskId;
} else {
LOG.warn("Exceeded limit of " + (MAX_TASK_EXECS + maxTaskAttempts) +
" (plus " + numKilledTasks + " killed)" +
" attempts for the tip '" + getTIPId() + "'");
return null;
}
return addRunningTask(taskid, taskTracker);
}
public Task addRunningTask(TaskAttemptID taskid, String taskTracker) {
return addRunningTask(taskid, taskTracker, false);
}
String getUser() {
return user;
}
void setUser(String user) {
this.user = user;
}
/**
* Adds a previously running task to this tip. This is used in case of
* jobtracker restarts.
*/
public Task addRunningTask(TaskAttemptID taskid,
String taskTracker,
boolean taskCleanup) {
// 1 slot is enough for taskCleanup task
int numSlotsNeeded = taskCleanup ? 1 : numSlotsRequired;
// create the task
Task t = null;
if (isMapTask()) {
if(LOG.isDebugEnabled()) {
LOG.debug("attempt " + numTaskFailures + " sending skippedRecords "
+ failedRanges.getIndicesCount());
}
t = new MapTask(jobFile, taskid, partition, splitInfo.getSplitIndex(),
numSlotsNeeded);
} else {
t = new ReduceTask(jobFile, taskid, partition, numMaps,
numSlotsNeeded);
}
if (jobCleanup) {
t.setJobCleanupTask();
}
if (jobSetup) {
t.setJobSetupTask();
}
if (taskCleanup) {
t.setTaskCleanupTask();
t.setState(taskStatuses.get(taskid).getRunState());
cleanupTasks.put(taskid, taskTracker);
}
t.setConf(conf);
t.setUser(getUser());
if (LOG.isDebugEnabled()) {
LOG.debug("Launching task with skipRanges:"+failedRanges.getSkipRanges());
}
t.setSkipRanges(failedRanges.getSkipRanges());
t.setSkipping(skipping);
if(failedRanges.isTestAttempt()) {
t.setWriteSkipRecs(false);
}
activeTasks.put(taskid, taskTracker);
tasks.add(taskid);
// Ask JobTracker to note that the task exists
jobtracker.createTaskEntry(taskid, taskTracker, this);
// check and set the first attempt
if (firstTaskId == null) {
firstTaskId = taskid;
}
return t;
}
boolean isRunningTask(TaskAttemptID taskid) {
TaskStatus status = taskStatuses.get(taskid);
return status != null && status.getRunState() == TaskStatus.State.RUNNING;
}
boolean isCleanupAttempt(TaskAttemptID taskid) {
return cleanupTasks.containsKey(taskid);
}
String machineWhereCleanupRan(TaskAttemptID taskid) {
return cleanupTasks.get(taskid);
}
String machineWhereTaskRan(TaskAttemptID taskid) {
return taskStatuses.get(taskid).getTaskTracker();
}
boolean wasKilled(TaskAttemptID taskid) {
return tasksToKill.containsKey(taskid);
}
/**
* Has this task already failed on this machine?
* @param trackerHost The task tracker hostname
* @return Has it failed?
*/
public boolean hasFailedOnMachine(String trackerHost) {
return machinesWhereFailed.contains(trackerHost);
}
/**
* Was this task ever scheduled to run on this machine?
* @param trackerHost The task tracker hostname
* @param trackerName The tracker name
* @return Was task scheduled on the tracker?
*/
public boolean hasRunOnMachine(String trackerHost, String trackerName) {
return this.activeTasks.values().contains(trackerName) ||
hasFailedOnMachine(trackerHost);
}
/**
* Get the number of machines where this task has failed.
* @return the size of the failed machine set
*/
public int getNumberOfFailedMachines() {
return machinesWhereFailed.size();
}
/**
* Get the id of this map or reduce task.
* @return The index of this tip in the maps/reduces lists.
*/
public int getIdWithinJob() {
return partition;
}
/**
* Set the event number that was raised for this tip
*/
public void setSuccessEventNumber(int eventNumber) {
successEventNumber = eventNumber;
}
/**
* Get the event number that was raised for this tip
*/
public int getSuccessEventNumber() {
return successEventNumber;
}
/**
* Gets the Node list of input split locations sorted in rack order.
*/
public String getSplitNodes() {
if (!isMapTask() || jobSetup || jobCleanup) {
return "";
}
String[] splits = splitInfo.getLocations();
Node[] nodes = new Node[splits.length];
for (int i = 0; i < splits.length; i++) {
nodes[i] = jobtracker.getNode(splits[i]);
}
// sort nodes on rack location
Arrays.sort(nodes, new Comparator<Node>() {
public int compare(Node a, Node b) {
String left = a.getNetworkLocation();
String right = b.getNetworkLocation();
return left.compareTo(right);
}
});
return nodeToString(nodes);
}
private static String nodeToString(Node[] nodes) {
if (nodes == null || nodes.length == 0) {
return "";
}
StringBuffer ret = new StringBuffer(nodes[0].toString());
for(int i = 1; i < nodes.length;i++) {
ret.append(",");
ret.append(nodes[i].toString());
}
return ret.toString();
}
public long getMapInputSize() {
if(isMapTask() && !jobSetup && !jobCleanup) {
return splitInfo.getInputDataLength();
} else {
return 0;
}
}
/**
* This class keeps the records to be skipped during further executions
* based on failed records from all the previous attempts.
* It also narrow down the skip records if it is more than the
* acceptable value by dividing the failed range into half. In this case one
* half is executed in the next attempt (test attempt).
* In the test attempt, only the test range gets executed, others get skipped.
* Based on the success/failure of the test attempt, the range is divided
* further.
*/
private class FailedRanges {
private SortedRanges skipRanges = new SortedRanges();
private Divide divide;
synchronized SortedRanges getSkipRanges() {
if(divide!=null) {
return divide.skipRange;
}
return skipRanges;
}
synchronized boolean isTestAttempt() {
return divide!=null;
}
synchronized long getIndicesCount() {
if(isTestAttempt()) {
return divide.skipRange.getIndicesCount();
}
return skipRanges.getIndicesCount();
}
synchronized void updateState(TaskStatus status){
if (isTestAttempt() &&
(status.getRunState() == TaskStatus.State.SUCCEEDED)) {
divide.testPassed = true;
//since it was the test attempt we need to set it to failed
//as it worked only on the test range
status.setRunState(TaskStatus.State.FAILED);
}
}
synchronized void add(Range failedRange) {
LOG.warn("FailedRange:"+ failedRange);
if(divide!=null) {
LOG.warn("FailedRange:"+ failedRange +" test:"+divide.test +
" pass:"+divide.testPassed);
if(divide.testPassed) {
//test range passed
//other range would be bad. test it
failedRange = divide.other;
}
else {
//test range failed
//other range would be good.
failedRange = divide.test;
}
//reset
divide = null;
}
if(maxSkipRecords==0 || failedRange.getLength()<=maxSkipRecords) {
skipRanges.add(failedRange);
} else {
//start dividing the range to narrow down the skipped
//records until maxSkipRecords are met OR all attempts
//get exhausted
divide = new Divide(failedRange);
}
}
class Divide {
private final SortedRanges skipRange;
private final Range test;
private final Range other;
private boolean testPassed;
Divide(Range range){
long half = range.getLength()/2;
test = new Range(range.getStartIndex(), half);
other = new Range(test.getEndIndex(), range.getLength()-half);
//construct the skip range from the skipRanges
skipRange = new SortedRanges();
for(Range r : skipRanges.getRanges()) {
skipRange.add(r);
}
skipRange.add(new Range(0,test.getStartIndex()));
skipRange.add(new Range(test.getEndIndex(),
(Long.MAX_VALUE-test.getEndIndex())));
}
}
}
TreeMap<TaskAttemptID, String> getActiveTasks() {
return activeTasks;
}
int getNumSlotsRequired() {
return numSlotsRequired;
}
}