blob: 19b3bef01232b78acd2e48a17060a6625d86ec1e [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapred;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.util.HashMap;
import java.util.Set;
import java.util.HashSet;
import java.util.Iterator;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.mapred.TaskStatus.Phase;
import org.apache.hadoop.mapred.TaskStatus.State;
import org.apache.hadoop.tools.rumen.TaskAttemptInfo;
import org.apache.hadoop.tools.rumen.ReduceTaskAttemptInfo;
// Explicitly use the new api, older o.a.h.mapred.TaskAttemptID is deprecated
import org.apache.hadoop.mapreduce.TaskAttemptID;
/**
* This class simulates a {@link TaskTracker}. Its main purpose is to call heartbeat()
* of the simulated Job Tracker with apropriately updated statuses of the
* tasks assigned to it.
*
* The events emitted and consumed are HeartbeatEvent and
* TaskAttemptCompletionEvent .
*
* Internal naming convention: accept() dispatches simulation events to
* process*Event() methods. heartbeat() dispactches task tracker actions to
* handle*Action() methods.
*/
public class SimulatorTaskTracker implements SimulatorEventListener {
/** The name of the task tracker. */
protected final String taskTrackerName;
/** The name of the host the task tracker is running on. */
protected final String hostName;
/** The http port the simulated task tracker reports to the jobtracker. */
protected final int httpPort = 80;
/** Number of map slots. */
protected final int maxMapSlots;
/** Number of reduce slots. */
protected final int maxReduceSlots;
/** The job tracker this task tracker is a slave of. */
protected final InterTrackerProtocol jobTracker;
/**
* State of and bookkeeping information for all tasks assigned to the task
* tracker. Contains all the information about running or completed but
* not yet reported tasks. We manage it in a mark & sweep garbage collector
* manner. We insert tasks on launch, mark them on completion, and remove
* completed tasks at heartbeat() reports.
*/
protected Map<TaskAttemptID, SimulatorTaskInProgress> tasks =
new HashMap<TaskAttemptID, SimulatorTaskInProgress>();
/**
* Number of map slots allocated to tasks in RUNNING state on this task
* tracker. Must be in sync with the tasks map above.
*/
private int usedMapSlots = 0;
/**
* Number of reduce slots allocated to tasks in RUNNING state on this task
* tracker. Must be in sync with the tasks map above.
*/
private int usedReduceSlots = 0;
/**
* True if the jobTracker.heartbeat() call to be made is the first one.
* We need this to mimick the InterTrackerProtocol properly.
*/
private boolean firstHeartbeat = true;
// last heartbeat response recieved
private short heartbeatResponseId = -1;
/**
* Task attempt ids for which TaskAttemptCompletionEvent was created but the
* task attempt got killed.
*/
private Set<TaskAttemptID> orphanTaskCompletions =
new HashSet<TaskAttemptID>();
/** The log object to send our messages to; only used for debugging. */
private static final Log LOG = LogFactory.getLog(SimulatorTaskTracker.class);
/**
* Constructs a task tracker.
*
* @param jobTracker the SimulatorJobTracker we talk to
* @param taskTrackerName the task tracker name to report, otherwise unused
* @param hostName the host name to report, otherwise unused
* @param maxMapTasks the number of map slots
* @param maxReduceTasks the number of reduce slots
*/
public SimulatorTaskTracker(InterTrackerProtocol jobTracker,
String taskTrackerName, String hostName,
int maxMapTasks, int maxReduceTasks) {
LOG.debug("SimulatorTaskTracker constructor, taskTrackerName=" +
taskTrackerName);
this.jobTracker = jobTracker;
this.taskTrackerName = taskTrackerName;
this.hostName = hostName;
this.maxMapSlots = maxMapTasks;
this.maxReduceSlots = maxReduceTasks;
}
/**
* Processes a simulation event.
*
* @param event the event to process, should be an instance of HeartbeatEvent
* or TaskAttemptCompletionEvent
* @return the list of new events generated in response
*/
@Override
public List<SimulatorEvent> accept(SimulatorEvent event) {
if (LOG.isDebugEnabled()) {
LOG.debug("Accepted event " + event);
}
if (event instanceof HeartbeatEvent) {
return processHeartbeatEvent((HeartbeatEvent)event);
} else if (event instanceof TaskAttemptCompletionEvent) {
return processTaskAttemptCompletionEvent((TaskAttemptCompletionEvent)
event);
} else {
throw new IllegalArgumentException("Unhandled event "+event);
}
}
/**
* Called once at the start of the simulation.
*
* @param when Time when the task tracker starts.
* @return the initial HeartbeatEvent for ourselves.
*/
public List<SimulatorEvent> init(long when) {
LOG.debug("TaskTracker starting up, current simulation time=" + when);
return Collections.<SimulatorEvent>singletonList(new HeartbeatEvent(this, when));
}
/**
* Stops running a task attempt on the task tracker. It also updates the
* number of available slots accordingly.
*
* @param finalStatus the TaskStatus containing the task id and final
* status of the task attempt. This rountine asserts a lot of the
* finalStatus params, in case it is coming from a task attempt
* completion event sent to ourselves. Only the run state, finish time,
* and progress fields of the task attempt are updated.
* @param now Current simulation time, used for assert only
*/
private void finishRunningTask(TaskStatus finalStatus, long now) {
TaskAttemptID taskId = finalStatus.getTaskID();
if (LOG.isDebugEnabled()) {
LOG.debug("Finishing running task id=" + taskId + ", now=" + now);
}
SimulatorTaskInProgress tip = tasks.get(taskId);
if (tip == null) {
throw new IllegalArgumentException("Unknown task attempt " + taskId
+ " completed");
}
TaskStatus currentStatus = tip.getTaskStatus();
if (currentStatus.getRunState() != State.RUNNING) {
throw new IllegalArgumentException(
"Task attempt to finish is not running: " + tip);
}
// Check that finalStatus describes a task attempt that has just been
// completed
State finalRunState = finalStatus.getRunState();
if (finalRunState != State.SUCCEEDED && finalRunState != State.FAILED
&& finalRunState != State.KILLED) {
throw new IllegalArgumentException(
"Final run state for completed task can't be : " + finalRunState
+ " " + tip);
}
if (now != finalStatus.getFinishTime()) {
throw new IllegalArgumentException(
"Current time does not match task finish time: now=" + now
+ ", finish=" + finalStatus.getFinishTime());
}
if (currentStatus.getIsMap() != finalStatus.getIsMap()
|| currentStatus.getNumSlots() != finalStatus.getNumSlots()
|| currentStatus.getPhase() != finalStatus.getPhase()
|| currentStatus.getStartTime() != finalStatus.getStartTime()) {
throw new IllegalArgumentException(
"Current status does not match final status");
}
// We can't assert getShuffleFinishTime() and getSortFinishTime() for
// reduces as those were unknown when the task attempt completion event
// was created. We have not called setMapFinishTime() for maps either.
// If we were really thorough we could update the progress of the task
// and check if it is consistent with finalStatus.
// If we've got this far it is safe to update the task status
currentStatus.setRunState(finalStatus.getRunState());
currentStatus.setFinishTime(finalStatus.getFinishTime());
currentStatus.setProgress(finalStatus.getProgress());
// and update the free slots
int numSlots = currentStatus.getNumSlots();
if (tip.isMapTask()) {
usedMapSlots -= numSlots;
if (usedMapSlots < 0) {
throw new IllegalStateException(
"TaskTracker reaches negative map slots: " + usedMapSlots);
}
} else {
usedReduceSlots -= numSlots;
if (usedReduceSlots < 0) {
throw new IllegalStateException(
"TaskTracker reaches negative reduce slots: " + usedReduceSlots);
}
}
}
/**
* Records that a task attempt has completed. Ignores the event for tasks
* that got killed after the creation of the completion event.
*
* @param event the TaskAttemptCompletionEvent the tracker sent to itself
* @return the list of response events, empty
*/
private List<SimulatorEvent> processTaskAttemptCompletionEvent(
TaskAttemptCompletionEvent event) {
if (LOG.isDebugEnabled()) {
LOG.debug("Processing task attempt completion event" + event);
}
long now = event.getTimeStamp();
TaskStatus finalStatus = event.getStatus();
TaskAttemptID taskID = finalStatus.getTaskID();
boolean killedEarlier = orphanTaskCompletions.remove(taskID);
if (!killedEarlier) {
finishRunningTask(finalStatus, now);
}
return SimulatorEngine.EMPTY_EVENTS;
}
/**
* Creates a signal for itself marking the completion of a task attempt.
* It assumes that the task attempt hasn't made any progress in the user
* space code so far, i.e. it is called right at launch for map tasks and
* immediately after all maps completed for reduce tasks.
*
* @param tip the simulator task in progress
* @param now the current simulation time
* @return the TaskAttemptCompletionEvent we are sending to ourselves
*/
private TaskAttemptCompletionEvent createTaskAttemptCompletionEvent(
SimulatorTaskInProgress tip, long now) {
// We need to clone() status as we modify and it goes into an Event
TaskStatus status = (TaskStatus)tip.getTaskStatus().clone();
long delta = tip.getUserSpaceRunTime();
assert delta >= 0 : "TaskAttempt " + tip.getTaskStatus().getTaskID()
+ " has negative UserSpaceRunTime = " + delta;
long finishTime = now + delta;
status.setFinishTime(finishTime);
if (tip.isMapTask()) {
status.setMapFinishTime(finishTime);
}
status.setProgress(1.0f);
status.setRunState(tip.getFinalRunState());
TaskAttemptCompletionEvent event =
new TaskAttemptCompletionEvent(this, status);
if (LOG.isDebugEnabled()) {
LOG.debug("Created task attempt completion event " + event);
}
return event;
}
/**
* Launches a task on the simulated task tracker.
*
* @param action SimulatorLaunchTaskAction sent by the job tracker
* @param now current simulation time
* @return new events generated, a TaskAttemptCompletionEvent for map
* tasks, empty otherwise
*/
private List<SimulatorEvent> handleSimulatorLaunchTaskAction(
SimulatorLaunchTaskAction action, long now) {
if (LOG.isDebugEnabled()) {
LOG.debug("Handling launch task action " + action);
}
// First, create statuses and update used slots for map and reduce
// task separately
Task task = action.getTask();
TaskAttemptID taskId = task.getTaskID();
if (tasks.containsKey(taskId)) {
throw new IllegalArgumentException("Multiple launch of task id =" + taskId);
}
// Ctor of MapTaskStatus and ReduceTaskStatus need deprecated
// o.a.h.mapred.TaskAttemptID, hence the downgrade
org.apache.hadoop.mapred.TaskAttemptID taskIdOldApi =
org.apache.hadoop.mapred.TaskAttemptID.downgrade(taskId);
TaskStatus status;
int numSlotsRequired = task.getNumSlotsRequired();
Counters emptyCounters = new Counters();
if (task.isMapTask()) {
status = new MapTaskStatus(taskIdOldApi, 0f, numSlotsRequired,
State.RUNNING, "", "", taskTrackerName,
Phase.MAP, emptyCounters);
usedMapSlots += numSlotsRequired;
if (usedMapSlots > maxMapSlots) {
throw new IllegalStateException("usedMapSlots exceeds maxMapSlots: " +
usedMapSlots + " > " + maxMapSlots);
}
} else {
status = new ReduceTaskStatus(taskIdOldApi, 0f, numSlotsRequired,
State.RUNNING, "", "", taskTrackerName,
Phase.SHUFFLE, emptyCounters);
usedReduceSlots += numSlotsRequired;
if (usedReduceSlots > maxReduceSlots) {
throw new IllegalStateException("usedReduceSlots exceeds usedReduceSlots: " +
usedReduceSlots + " > " + usedReduceSlots);
}
}
// Second, create and store a TIP
status.setStartTime(now);
SimulatorTaskInProgress tip =
new SimulatorTaskInProgress(action, status, now);
tasks.put(taskId, tip);
// Third, schedule events for ourselves
if (task.isMapTask()) {
// we know when this task attempts ends iff it is a map
TaskAttemptCompletionEvent e = createTaskAttemptCompletionEvent(tip, now);
return Collections.<SimulatorEvent>singletonList(e);
} else {
// reduce, completion time can only be determined when all maps are done
return SimulatorEngine.EMPTY_EVENTS;
}
}
/**
* Kills a task attempt.
*
* @param action contains the task attempt to kill
* @param now current simulation time
* @return new events generated in response, empty
*/
private List<SimulatorEvent> handleKillTaskAction(KillTaskAction action, long now) {
TaskAttemptID taskId = action.getTaskID();
// we don't have a nice(r) toString() in Hadoop's TaskActions
if (LOG.isDebugEnabled()) {
LOG.debug("Handling kill task action, taskId=" + taskId + ", now=" + now);
}
SimulatorTaskInProgress tip = tasks.get(taskId);
// Safety check: We might get a KillTaskAction even for completed reduces
if (tip == null) {
return SimulatorEngine.EMPTY_EVENTS;
}
progressTaskStatus(tip, now); // make progress up to date
TaskStatus finalStatus = (TaskStatus)tip.getTaskStatus().clone();
finalStatus.setFinishTime(now);
finalStatus.setRunState(State.KILLED);
finishRunningTask(finalStatus, now);
if (finalStatus.getIsMap() || finalStatus.getPhase() == Phase.REDUCE) {
// if we have already created a task attempt completion event we remember
// the task id, so that we can safely ignore the event when its delivered
orphanTaskCompletions.add(taskId);
}
return SimulatorEngine.EMPTY_EVENTS;
}
/**
* Starts "running" the REDUCE phase of reduce upon being notified that
* all map tasks are (successfully) done.
*
* @param action contains the notification for one of the reduce tasks
* @param now current simulation time
* @return new events generated, a single TaskAttemptCompletionEvent for the
* reduce
*/
private List<SimulatorEvent> handleAllMapsCompletedTaskAction(
AllMapsCompletedTaskAction action, long now) {
if (LOG.isDebugEnabled()) {
LOG.debug("Handling all maps completed task action " + action);
}
TaskAttemptID taskId = action.getTaskID();
SimulatorTaskInProgress tip = tasks.get(taskId);
// If tip is null here it is because the task attempt to be notified is
// unknown to this TaskTracker.
TaskStatus status = tip.getTaskStatus();
if (status.getIsMap()) {
throw new IllegalStateException(
"Invalid AllMapsCompletedTaskAction, task attempt "
+ "to be notified is a map: " + taskId + " " + status);
}
if (status.getPhase() != Phase.SHUFFLE) {
throw new IllegalArgumentException(
"Reducer task attempt already notified: " + taskId + " " + status);
}
// Warning: setPhase() uses System.currentTimeMillis() internally to
// set shuffle and sort times, but we overwrite that manually anyway
status.setPhase(Phase.REDUCE);
status.setShuffleFinishTime(now);
status.setSortFinishTime(now);
// Forecast the completion of this reduce
TaskAttemptCompletionEvent e = createTaskAttemptCompletionEvent(tip, now);
return Collections.<SimulatorEvent>singletonList(e);
}
/**
* Updates the progress indicator of a task if it is running.
*
* @param tip simulator task in progress whose progress is to be updated
* @param now current simulation time
*/
private void progressTaskStatus(SimulatorTaskInProgress tip, long now) {
TaskStatus status = tip.getTaskStatus();
if (status.getRunState() != State.RUNNING) {
return; // nothing to be done
}
boolean isMap = tip.isMapTask();
// Time when the user space code started
long startTime = -1;
// Time spent in map or just in the REDUCE phase of a reduce task
long runTime = tip.getUserSpaceRunTime();
float progress = 0.0f;
if (isMap) {
// We linearly estimate the progress of maps since their start
startTime = status.getStartTime();
progress = ((float)(now - startTime)) / runTime;
} else {
// We don't model reduce progress in the SHUFFLE or SORT phases
// We use linear estimate for the 3rd, REDUCE phase
Phase reducePhase = status.getPhase();
switch (reducePhase) {
case SHUFFLE:
progress = 0.0f; // 0 phase is done out of 3
break;
case SORT:
progress = 1.0f/3; // 1 phase is done out of 3
break;
case REDUCE: {
// REDUCE phase with the user code started when sort finished
startTime = status.getSortFinishTime();
// 0.66f : 2 phases are done out of of 3
progress = 2.0f/3 + (((float) (now - startTime)) / runTime) / 3.0f;
}
break;
default:
// should never get here
throw new IllegalArgumentException("Invalid reducePhase=" + reducePhase);
}
}
final float EPSILON = 0.0001f;
if (progress < -EPSILON || progress > 1 + EPSILON) {
throw new IllegalStateException("Task progress out of range: " + progress);
}
progress = Math.max(Math.min(1.0f, progress), 0.0f);
status.setProgress(progress);
if (LOG.isDebugEnabled()) {
LOG.debug("Updated task progress, taskId=" + status.getTaskID()
+ ", progress=" + status.getProgress());
}
}
/**
* Updates the progress indicator of all running tasks.
*
* @param now current simulation time
*/
private void progressTaskStatuses(long now) {
for (SimulatorTaskInProgress tip : tasks.values()) {
progressTaskStatus(tip, now);
}
}
/**
* Frees up bookkeping memory used by completed tasks.
* Has no effect on the events or logs produced by the SimulatorTaskTracker.
* We need this in order not to report completed task multiple times and
* to ensure that we do not run out of Java heap memory in larger
* simulations.
*/
private void garbageCollectCompletedTasks() {
for (Iterator<TaskAttemptID> iter = tasks.keySet().iterator();
iter.hasNext();) {
TaskAttemptID taskId = iter.next();
SimulatorTaskInProgress tip = tasks.get(taskId);
if (tip.getTaskStatus().getRunState() != State.RUNNING) {
iter.remove();
if (LOG.isDebugEnabled()) {
LOG.debug("Garbage collected SimulatorTIP, taskId=" + taskId);
}
// We don't have to / must not touch usedMapSlots and usedReduceSlots
// as those were already updated by processTaskAttemptCompletionEvent()
// when the task switched its state from running
}
}
}
/**
* Creates a list of task statuses suitable for transmission via heartbeat().
* The task statuses are cloned() so that the heartbeat() callee, the job
* tracker, can't mess up the SimulatorTaskTracker's internal data.
*
* @return the list of running and recently completed task statuses
* on the tracker
*/
private List<TaskStatus> collectAndCloneTaskStatuses() {
ArrayList<TaskStatus> statuses = new ArrayList<TaskStatus>();
for (SimulatorTaskInProgress tip : tasks.values()) {
statuses.add((TaskStatus)tip.getTaskStatus().clone());
}
return statuses;
}
/**
* Handles the HeartbeatResponse received from the job tracker upon
* heartbeat(). Dispatches to handle*Action() methods.
*
* @param response HeartbeatResponse received from the job tracker
* @param now current simulation time
* @return list of new events generated in response to the task actions
*/
private List<SimulatorEvent> handleHeartbeatResponse(HeartbeatResponse response,
long now) {
if (LOG.isDebugEnabled()) {
LOG.debug("Handling heartbeat response " + response);
}
List<SimulatorEvent> events = new ArrayList<SimulatorEvent>();
TaskTrackerAction[] actions = response.getActions();
for (TaskTrackerAction action : actions) {
List<SimulatorEvent> actionEvents;
if (action instanceof SimulatorLaunchTaskAction) {
actionEvents = handleSimulatorLaunchTaskAction(
(SimulatorLaunchTaskAction)action, now);
} else if(action instanceof KillTaskAction) {
actionEvents = handleKillTaskAction((KillTaskAction)action, now);
} else if(action instanceof AllMapsCompletedTaskAction) {
// our extra task action for notifying the reducers
actionEvents = handleAllMapsCompletedTaskAction(
(AllMapsCompletedTaskAction)action, now);
} else {
// Should never get here.
// CommitTaskAction is not implemented in the simulator
// LaunchTaskAction has to be SimulatorLaunchTaskAction
throw new UnsupportedOperationException("Unimplemented TaskAction: "
+ action);
}
events.addAll(actionEvents);
}
return events;
}
/**
* Transmits a heartbeat event to the jobtracker and processes the response.
*
* @param event HeartbeatEvent to process
* @return list of new events generated in response
*/
private List<SimulatorEvent> processHeartbeatEvent(HeartbeatEvent event) {
if (LOG.isDebugEnabled()) {
LOG.debug("Processing heartbeat event " + event);
}
long now = event.getTimeStamp();
// Create the TaskTrackerStatus to report
progressTaskStatuses(now);
List<TaskStatus> taskStatuses = collectAndCloneTaskStatuses();
boolean askForNewTask = (usedMapSlots < maxMapSlots ||
usedReduceSlots < maxReduceSlots);
// 0 means failures == 0 here. Undocumented in TaskTracker, but does not
// seem to be used at all in org.apache.hadoop.mapred .
TaskTrackerStatus taskTrackerStatus =
new SimulatorTaskTrackerStatus(taskTrackerName, hostName, httpPort,
taskStatuses, 0,
maxMapSlots, maxReduceSlots, now);
// This is the right, and only, place to release bookkeping memory held
// by completed tasks: after collectAndCloneTaskStatuses() and before
// heartbeat().
// The status of TIPs to be purged is already cloned & copied to
// taskStatuses for reporting
// We shouldn't run the gc after heartbeat() since KillTaskAction might
// produce new completed tasks that we have not yet reported back and
// don't want to purge immediately.
garbageCollectCompletedTasks();
// Transmit the heartbeat
HeartbeatResponse response = null;
try {
response =
jobTracker.heartbeat(taskTrackerStatus, false, firstHeartbeat,
askForNewTask, heartbeatResponseId);
} catch (IOException ioe) {
throw new IllegalStateException("Internal error", ioe);
}
firstHeartbeat = false;
// The heartbeat got through successfully!
heartbeatResponseId = response.getResponseId();
// Process the heartbeat response
List<SimulatorEvent> events = handleHeartbeatResponse(response, now);
// Next heartbeat
events.add(new HeartbeatEvent(this, now + response.getHeartbeatInterval()));
return events;
}
/**
* Internal helper class used for storing the current status and other
* auxilliary information associated with a task attempt assigned to
* a simulator task tracker.
* WARNING: This is a completely different inner class than the one with
* the same name in SimulatorJobTracker.
*/
static class SimulatorTaskInProgress {
/**
* Current status of the task attempt.
* We store the start time, the start time of reduce phases and the
* run state of the task in this object.
*/
private TaskStatus taskStatus;
/**
* Object storing the run time and the final state of the task attempt.
* It is never read directly by the SimulatorTaskTracker.
*/
private TaskAttemptInfo taskAttempInfo;
/**
* Runtime of the user-space code of the task attempt. This is the full
* runtime for map tasks, and only that of the REDUCE phase for reduce
* tasks.
*/
private final long userSpaceRunTime;
/**
* Constructs an object by copying most of the fields from a
* SimulatorTaskAction.
*/
public SimulatorTaskInProgress(SimulatorLaunchTaskAction action,
TaskStatus taskStatus, long now) {
this.taskStatus = taskStatus;
this.taskAttempInfo = action.getTaskAttemptInfo();
if (taskStatus.getIsMap()) {
this.userSpaceRunTime = taskAttempInfo.getRuntime();
} else {
this.userSpaceRunTime =
((ReduceTaskAttemptInfo)taskAttempInfo).getReduceRuntime();
}
}
/**
* Returns whether the task attempt is a map.
*
* @return true iff the task attempt is a map
*/
public boolean isMapTask() {
return taskStatus.getIsMap();
}
/*
* Returns the current status of the task attempt.
*
* @return current task status
*/
public TaskStatus getTaskStatus() {
return taskStatus;
}
/**
* Sets the status of the task attempt.
*
* @param status the new task status
*/
public void setTaskStatus(TaskStatus status) {
this.taskStatus = status;
}
/**
* Returns the final state of the completed task.
*
* @return the final state of the completed task;
* it is either State.SUCCEEDED or State.FAILED
*/
public State getFinalRunState() {
return taskAttempInfo.getRunState();
}
/**
* Gets the time spent in the user space code of the task attempt.
* This is the full runtime for map tasks, and only that of the REDUCE
* phase for reduce tasks.
*
* @return the user space runtime
*/
public long getUserSpaceRunTime() {
return userSpaceRunTime;
}
}
}