/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapred;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.mapred.JobInProgress;
import org.apache.hadoop.mapred.JobStatus;
import org.apache.hadoop.mapred.TaskStatus;
import org.apache.hadoop.mapred.JobStatusChangeEvent.EventType;
import org.apache.hadoop.mapreduce.server.jobtracker.TaskTracker;
import org.apache.hadoop.tools.rumen.JobStory;
import org.apache.hadoop.tools.rumen.TaskAttemptInfo;
import org.apache.hadoop.mapred.SimulatorJobInProgress;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.security.Credentials;
/**
* {@link SimulatorJobTracker} extends {@link JobTracker}. It implements the
* {@link InterTrackerProtocol} protocol.
*/
@SuppressWarnings("deprecation")
public class SimulatorJobTracker extends JobTracker {
// A queue for cleaning up jobs from memory. The length of this queue
// is kept at or below the constant JOBS_IN_MUMAK_MEMORY.
private LinkedList<JobID> cleanupQueue;
// The simulatorClock maintains the current simulation time
// and should always be synchronized with the time maintained by the engine.
private static SimulatorClock clock = null;
public static final Log LOG = LogFactory.getLog(SimulatorJobTracker.class);
// This constant is used to specify how many jobs should be maintained in the
// memory of the mumak simulator.
private static final int JOBS_IN_MUMAK_MEMORY = 50;
// The SimulatorEngine is the engine that drives the simulation.
private static SimulatorEngine engine = null;
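// Atomically (re)binds the static engine and clock references shared by
// the SimulatorJobTracker.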
private static synchronized void resetEngineClock(SimulatorEngine engine, SimulatorClock clock) {
SimulatorJobTracker.engine = engine;
SimulatorJobTracker.clock = clock;
}
/**
* In addition to the standard JobConf object, the constructor for SimulatorJobTracker requires a
* SimulatorClock and a reference to the SimulatorEngine object. The JobTracker's
* clock is set to this simulator clock.
* @param conf the starting configuration of the SimulatorJobTracker.
* @param clock the SimulatorClock object that we use to maintain simulator time.
* @param simulatorEngine the SimulatorEngine that is running the simulation.
*/
SimulatorJobTracker(JobConf conf, SimulatorClock clock,
SimulatorEngine simulatorEngine)
throws IOException {
// Invoke the super constructor with a flag that
// indicates simulation
super(conf, clock, true);
resetEngineClock(simulatorEngine, clock);
cleanupQueue = new LinkedList<JobID>();
}
/**
* Starts the JobTracker with the given configuration and start time. It also
* starts the JobEndNotifier thread.
* @param conf the starting configuration of the SimulatorJobTracker.
* @param startTime the starting time of simulation -- this is used to
* initialize the clock.
* @param engine the SimulatorEngine that we talk to.
* @return the started SimulatorJobTracker, or null if startup failed.
* @throws IOException
*/
public static SimulatorJobTracker startTracker(JobConf conf, long startTime, SimulatorEngine engine)
throws IOException {
SimulatorJobTracker result = null;
try {
SimulatorClock simClock = new SimulatorClock(startTime);
result = new SimulatorJobTracker(conf, simClock, engine);
result.taskScheduler.setTaskTrackerManager(result);
} catch (IOException e) {
LOG.warn("Error starting tracker: "
+ StringUtils.stringifyException(e));
}
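// start the JobEndNotifier thread that delivers job-end notifications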
if (result != null) {
JobEndNotifier.startNotifier();
}
return result;
}
/**
* Starts the SimulatorJobTracker with the given configuration after
* creating its own SimulatorEngine. Used mostly for debugging.
* @param conf the starting configuration of the SimulatorJobTracker
* @param startTime the starting time of simulation
* @return the started SimulatorJobTracker
* @throws IOException
* @throws InterruptedException
*/
public static SimulatorJobTracker startTracker(JobConf conf, long startTime)
throws IOException, InterruptedException {
return startTracker(conf, startTime, new SimulatorEngine());
}
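/**
* Starts the task scheduler and marks the JobTracker state as RUNNING.
* All other activity is driven by the simulation engine.
*/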
@Override
public void offerService() throws InterruptedException, IOException {
taskScheduler.start();
LOG.info("started taskScheduler...");
synchronized (this) {
state = State.RUNNING;
}
}
/**
* Returns the simulatorClock, which is a static object in SimulatorJobTracker.
*
* @return SimulatorClock object.
*/
static Clock getClock() {
assert(engine.getCurrentTime() == clock.getTime()):
" Engine time = " + engine.getCurrentTime() +
" JobTracker time = " + clock.getTime();
return clock;
}
/**
* Overrides the getCleanupTaskReports method of the
* original JobTracker since we do not simulate setup and cleanup
* tasks.
* @param jobid JobID for which we desire cleanup reports.
*/
@Override
public synchronized TaskReport[] getCleanupTaskReports(JobID jobid) {
return null;
}
/**
* Overriding since we do not support queue ACLs.
*/
@Override
public QueueAclsInfo[] getQueueAclsForCurrentUser() throws IOException {
return null;
}
/**
* Overriding since we do not simulate setup/cleanup tasks.
*/
@Override
public synchronized TaskReport[] getSetupTaskReports(JobID jobid) {
return null;
}
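/**
* Accepts a job submission. The job must already be registered in the
* SimulatorJobCache; its recorded submission time is used to advance the
* simulation clock before the job is added.
* @param jobId the ID of the job being submitted.
* @param jobSubmitDir the job's submission directory.
* @param ts the credentials of the job.
* @return the status of the submitted (or already running) job.
*/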
@Override
public synchronized JobStatus submitJob(
JobID jobId, String jobSubmitDir, Credentials ts)
throws IOException {
boolean loggingEnabled = LOG.isDebugEnabled();
if (loggingEnabled) {
LOG.debug("submitJob for jobname = " + jobId);
}
if (jobs.containsKey(jobId)) {
// job already running, don't start twice
if (loggingEnabled) {
LOG.debug("Job '" + jobId.getId() + "' already present ");
}
return jobs.get(jobId).getStatus();
}
JobStory jobStory = SimulatorJobCache.get(jobId);
if (jobStory == null) {
throw new IllegalArgumentException("Job not found in SimulatorJobCache: "+jobId);
}
validateAndSetClock(jobStory.getSubmissionTime());
SimulatorJobInProgress job = new SimulatorJobInProgress(jobId, jobSubmitDir, this,
this.conf,
jobStory);
// Check whether the queue information provided is valid
try {
checkQueueValidity(job);
} catch(IOException ioe) {
LOG.warn("Queue given for job " + job.getJobID() + " is not valid:"
+ ioe);
throw ioe;
}
// Check whether the job can run in the cluster given its memory
// requirements.
try {
checkMemoryRequirements(job);
} catch (IOException ioe) {
LOG.warn("Exception in checking Memory requirements of jobId: " + jobId
+ ioe);
//throw ioe;
}
return addJob(jobId, job);
}
/**
* Returns the SimulatorJobInProgress object for a given JobID.
* @param jobid the ID of the job to look up.
* @return the corresponding SimulatorJobInProgress, or null if not present.
*/
private SimulatorJobInProgress getSimulatorJob(JobID jobid) {
return (SimulatorJobInProgress)jobs.get(jobid);
}
/**
* Safely clean-up all data structures at the end of the
* job (success/failure/killed). In addition to performing the tasks that the
* original finalizeJob does, we also inform the SimulatorEngine about the
* completion of this job.
*
* @param job completed job.
*/
@Override
synchronized void finalizeJob(JobInProgress job) {
// Let the SimulatorEngine know that the job is done
JobStatus cloneStatus = (JobStatus)job.getStatus().clone();
engine.markCompletedJob(cloneStatus,
SimulatorJobTracker.getClock().getTime());
JobID jobId = job.getStatus().getJobID();
LOG.info("Finished job " + jobId + " endtime = " +
getClock().getTime() + " with status: " +
JobStatus.getJobRunState(job.getStatus().getRunState()));
// for updating the metrics and JobHistory, invoke the original
// finalizeJob.
super.finalizeJob(job);
// now placing this job in queue for future nuking
cleanupJob(job);
}
/**
* The cleanupJob method maintains the cleanupQueue. When a job is finalized,
* it is added to the cleanupQueue. Jobs are retired from the cleanupQueue
* so that its size does not exceed
* JOBS_IN_MUMAK_MEMORY.
* @param job the JobInProgress object that was just finalized and is
* going to be added to the cleanupQueue.
*/
private void cleanupJob(JobInProgress job) {
cleanupQueue.add(job.getJobID());
while (cleanupQueue.size() > JOBS_IN_MUMAK_MEMORY) {
JobID removedJob = cleanupQueue.poll();
retireJob(removedJob, "");
}
}
// //////////////////////////////////////////////////
// InterTrackerProtocol
// //////////////////////////////////////////////////
@Override
synchronized boolean processHeartbeat(TaskTrackerStatus trackerStatus,
boolean initialContact) {
boolean loggingEnabled = LOG.isDebugEnabled();
String trackerName = trackerStatus.getTrackerName();
boolean seenBefore = updateTaskTrackerStatus(trackerName, trackerStatus);
TaskTracker taskTracker = getTaskTracker(trackerName);
// update the status of the task tracker. Also updates all aggregate
// statistics
if (loggingEnabled) {
LOG.debug("processing heartbeat for " + trackerName);
LOG.debug("updating TaskTracker status for " + trackerName);
}
if (initialContact) {
// If it's first contact, then clear out
// any state hanging around
if (seenBefore) {
lostTaskTracker(taskTracker);
}
} else {
// If not first contact, there should be some record of the tracker
if (!seenBefore) {
LOG.warn("Status from unknown Tracker : " + trackerName);
updateTaskTrackerStatus(trackerName, null);
return false;
}
}
if (initialContact) {
if (loggingEnabled) {
LOG.debug("adding new tracker name = " + trackerName);
}
addNewTracker(taskTracker);
}
if (loggingEnabled) {
LOG.debug("updating TaskStatus for " + trackerName);
}
// update status of all tasks from heartbeat
updateTaskStatuses(trackerStatus);
return true;
}
/**
* Utility to validate the new simulation time and advance the clock to it.
* @param newSimulationTime the simulation time reported by the tracker.
*/
private void validateAndSetClock(long newSimulationTime) {
// We do not use the getClock routine here as
// the Engine and JobTracker clocks are different at
// this point.
long currentSimulationTime = clock.getTime();
if (newSimulationTime < currentSimulationTime) {
// time has gone backwards
throw new IllegalArgumentException("Time has gone backwards! " +
"newSimulationTime: " + newSimulationTime +
" while currentTime: " +
currentSimulationTime);
}
// the simulation time should also match that in the engine
assert(newSimulationTime == engine.getCurrentTime()) :
" newTime =" + newSimulationTime +
" engineTime = " + engine.getCurrentTime();
// set the current simulation time
clock.setTime(newSimulationTime);
}
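/**
* Processes a heartbeat from a simulated task tracker. The simulation time
* reported by the tracker is validated and used to advance the clock, and
* launch, kill, commit and map-completion actions are collected into the
* response.
*/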
@Override
public synchronized HeartbeatResponse heartbeat(TaskTrackerStatus status,
boolean restarted, boolean initialContact, boolean acceptNewTasks,
short responseId) throws IOException {
boolean loggingEnabled = LOG.isDebugEnabled();
if (loggingEnabled) {
LOG.debug("Got heartbeat from: " + status.getTrackerName()
+ " (restarted: " + restarted + " initialContact: " + initialContact
+ " acceptNewTasks: " + acceptNewTasks + ")" + " with responseId: "
+ responseId);
}
if (!(status instanceof SimulatorTaskTrackerStatus)) {
throw new IllegalArgumentException(
"Expecting SimulatorTaskTrackerStatus, but got " + status.getClass());
}
SimulatorTaskTrackerStatus taskTrackerStatus =
(SimulatorTaskTrackerStatus) status;
String trackerName = taskTrackerStatus.getTrackerName();
// validate and set the simulation time
// according to the time sent by the tracker
validateAndSetClock(taskTrackerStatus.getCurrentSimulationTime());
HeartbeatResponse prevHeartbeatResponse =
trackerToHeartbeatResponseMap.get(trackerName);
if (!initialContact) {
// If this isn't the 'initial contact' from the tasktracker,
// there is something seriously wrong if the JobTracker has
// no record of the 'previous heartbeat'; if so, ask the
// tasktracker to re-initialize itself.
if (prevHeartbeatResponse == null) {
// This is the first heartbeat from the old tracker to the newly
// started JobTracker
// Jobtracker might have restarted but no recovery is needed
// otherwise this code should not be reached
LOG.warn("Serious problem, cannot find record of 'previous' " +
" heartbeat for '" + trackerName +
"'; reinitializing the tasktracker");
return new HeartbeatResponse(responseId,
new TaskTrackerAction[] { new ReinitTrackerAction() });
} else {
// It is completely safe to not process a 'duplicate' heartbeat from a
// {@link TaskTracker} since it resends the heartbeat when rpcs are
// lost; see {@link TaskTracker.transmitHeartbeat()}.
// Acknowledge it by re-sending the previous response to let the
// {@link TaskTracker} go forward.
if (prevHeartbeatResponse.getResponseId() != responseId) {
if (loggingEnabled) {
LOG.debug("Ignoring 'duplicate' heartbeat from '" + trackerName
+ "'; resending the previous 'lost' response");
}
return prevHeartbeatResponse;
}
}
}
if (loggingEnabled) {
LOG.debug("processed heartbeat for responseId = " + responseId);
}
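// bump the response id for this round of the heartbeat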
short newResponseId = (short) (responseId + 1);
status.setLastSeen(getClock().getTime());
// process the incoming heartbeat
if (!processHeartbeat(taskTrackerStatus, initialContact)) {
if (prevHeartbeatResponse != null) {
trackerToHeartbeatResponseMap.remove(trackerName);
}
return new HeartbeatResponse(newResponseId,
new TaskTrackerAction[] { new ReinitTrackerAction() });
}
// Initialize the response to be sent for the heartbeat
HeartbeatResponse response = new HeartbeatResponse(newResponseId, null);
List<TaskTrackerAction> actions = new ArrayList<TaskTrackerAction>();
if (acceptNewTasks) {
TaskTracker taskTracker = getTaskTracker(trackerName);
// get the list of tasks to be executed on this tasktracker
List<Task> tasks = taskScheduler.assignTasks(taskTracker);
if (tasks != null) {
if (loggingEnabled && tasks.size()>0) {
LOG.debug("Tasks found from TaskScheduler: number = " + tasks.size());
}
for (Task task : tasks) {
TaskAttemptID taskAttemptID = task.getTaskID();
// get the JobID and the JIP object for this taskAttempt
JobID jobID = taskAttemptID.getJobID();
SimulatorJobInProgress job = getSimulatorJob(jobID);
if (job == null) {
LOG.error("Getting taskAttemptId=" + taskAttemptID +
" for job " + jobID +
" not present in SimulatorJobTracker");
throw new IOException("Getting taskAttemptId=" + taskAttemptID +
" for job " + jobID +
" not present in SimulatorJobTracker");
}
// add the launch task action to the list
if (loggingEnabled) {
LOG.debug("Getting taskAttemptInfo for '" + taskAttemptID
+ "' for tracker '" + trackerName + "'");
}
TaskAttemptInfo taskAttemptInfo =
job.getTaskAttemptInfo(taskTracker, taskAttemptID);
if (taskAttemptInfo == null) {
throw new RuntimeException("Empty taskAttemptInfo: " +
"task information missing");
}
// create the SLTA object using the task attempt information
if (loggingEnabled) {
LOG.debug("Adding LaunchTaskAction for '" + taskAttemptID
+ "' for tracker " + trackerName + " taskType="
+ taskAttemptID.getTaskType() + " time="
+ getClock().getTime());
}
SimulatorLaunchTaskAction newlaunchTask =
new SimulatorLaunchTaskAction(task, taskAttemptInfo);
if (loggingEnabled) {
LOG.debug("Job " + jobID + " launched taskattempt " +
taskAttemptID + " at " + getClock().getTime());
}
actions.add(newlaunchTask);
}
}
}
// Check for tasks to be killed
// also get the attemptIDs in a separate set for quick lookup
// during the MapCompletion generation
List<TaskTrackerAction> killTasksList = getTasksToKill(trackerName);
if (killTasksList != null) {
if (loggingEnabled) {
for (TaskTrackerAction ttAction : killTasksList) {
LOG.debug("Time =" + getClock().getTime() + " tracker=" + trackerName
+ " KillTaskAction for:"
+ ((KillTaskAction) ttAction).getTaskID());
}
}
actions.addAll(killTasksList);
}
// Check for tasks whose outputs can be saved
// this is currently a no-op
List<TaskTrackerAction> commitTasksList = getTasksToSave(status);
if (commitTasksList != null) {
actions.addAll(commitTasksList);
}
// check the reduce tasks in this task-tracker, and add in the
// AllMapTasksCompletedTaskAction for each of the reduce tasks
// this enables the reduce tasks to move from shuffle to reduce phase
List<TaskTrackerAction> mapCompletionTasks =
getMapCompletionTasks(taskTrackerStatus, killTasksList);
if (mapCompletionTasks != null) {
actions.addAll(mapCompletionTasks);
}
if (loggingEnabled) {
LOG.debug("Done with collection actions for tracker " + trackerName
+ " for responseId " + responseId);
}
// calculate next heartbeat interval and put in heartbeat response
int nextInterval = getNextHeartbeatInterval();
response.setHeartbeatInterval(nextInterval);
response.setActions(actions.toArray(new TaskTrackerAction[actions
.size()]));
if (loggingEnabled) {
LOG.debug("Nextinterval for tracker " + trackerName + " is "
+ nextInterval);
}
// Update the trackerToHeartbeatResponseMap
trackerToHeartbeatResponseMap.put(trackerName, response);
// Done processing the heartbeat, now remove 'marked' tasks
removeMarkedTasks(trackerName);
return response;
}
/**
* The getMapCompletionTasks method is intended to inform task trackers when to change the status
* of reduce tasks from "shuffle" to "reduce".
* For all reduce tasks in this TaskTracker that are
* in the shuffle phase, getMapCompletionTasks finds the number of finished maps for
* this job from the jobInProgress object. If this
* number equals the number of desired maps for this job, then it adds an
* AllMapsCompletedTaskAction for this reduce task-attempt.
*
* @param status
* The status of the task tracker
* @param tasksToKill
* The kill actions already scheduled for this tracker in this heartbeat
* @return List of TaskTrackerActions
*/
private List<TaskTrackerAction> getMapCompletionTasks(
TaskTrackerStatus status,
List<TaskTrackerAction> tasksToKill) {
boolean loggingEnabled = LOG.isDebugEnabled();
// Build up the list of tasks about to be killed
Set<TaskAttemptID> killedTasks = new HashSet<TaskAttemptID>();
if (tasksToKill != null) {
for (TaskTrackerAction taskToKill : tasksToKill) {
killedTasks.add(((KillTaskAction)taskToKill).getTaskID());
}
}
String trackerName = status.getTrackerName();
List<TaskTrackerAction> actions = new ArrayList<TaskTrackerAction>();
// loop through the list of task statuses
for (TaskStatus report : status.getTaskReports()) {
TaskAttemptID taskAttemptId = report.getTaskID();
SimulatorJobInProgress job = getSimulatorJob(taskAttemptId.getJobID());
if (job == null) {
// This job has completed before,
// and this is a zombie reduce-task.
Set<JobID> jobsToCleanup = trackerToJobsToCleanup.get(trackerName);
if (jobsToCleanup == null) {
jobsToCleanup = new HashSet<JobID>();
trackerToJobsToCleanup.put(trackerName, jobsToCleanup);
}
jobsToCleanup.add(taskAttemptId.getJobID());
continue;
}
JobStatus jobStatus = job.getStatus();
TaskInProgress tip = taskidToTIPMap.get(taskAttemptId);
// Send AllMapsCompleted only if the job is running, the attempt is
// running, no KillTask is being sent for this attempt, and the task is
// a reduce attempt in the shuffle phase. Checking against killedTasks
// precludes sending both KillTask and AllMapsCompleted for the same
// reduce-attempt.
if (jobStatus.getRunState() == JobStatus.RUNNING &&
tip.isRunningTask(taskAttemptId) &&
!killedTasks.contains(taskAttemptId) &&
!report.getIsMap() &&
report.getPhase() == TaskStatus.Phase.SHUFFLE) {
if (loggingEnabled) {
LOG.debug("Need map-completion information for REDUCEattempt "
+ taskAttemptId + " in tracker " + trackerName);
LOG.debug("getMapCompletion: job=" + job.getJobID() + " pendingMaps="
+ job.pendingMaps());
}
// Check whether the number of finishedMaps equals the
// number of maps
boolean canSendMapCompletion =
(job.finishedMaps() == job.desiredMaps());
if (canSendMapCompletion) {
if (loggingEnabled) {
LOG.debug("Adding MapCompletion for taskAttempt " + taskAttemptId
+ " in tracker " + trackerName);
LOG.debug("FinishedMaps for job:" + job.getJobID() + " is = "
+ job.finishedMaps() + "/" + job.desiredMaps());
LOG.debug("AllMapsCompleted for task " + taskAttemptId + " time="
+ getClock().getTime());
}
actions.add(new AllMapsCompletedTaskAction(taskAttemptId));
}
}
}
return actions;
}
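/**
* Updates the status of all tasks reported by the given tracker and fires a
* JobStatusChangeEvent whenever a job's run state changes. Reports for jobs
* that have already been retired are recorded so the tracker can be asked to
* clean them up.
*/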
@Override
void updateTaskStatuses(TaskTrackerStatus status) {
boolean loggingEnabled = LOG.isDebugEnabled();
String trackerName = status.getTrackerName();
// loop through the list of task statuses
if (loggingEnabled) {
LOG.debug("Updating task statuses for tracker " + trackerName);
}
for (TaskStatus report : status.getTaskReports()) {
report.setTaskTracker(trackerName);
TaskAttemptID taskAttemptId = report.getTaskID();
JobID jobid = taskAttemptId.getJobID();
if (loggingEnabled) {
LOG.debug("Updating status for job " + jobid + " for task = "
+ taskAttemptId + " status=" + report.getProgress()
+ " for tracker: " + trackerName);
}
SimulatorJobInProgress job =
getSimulatorJob(taskAttemptId.getJobID());
if (job == null) {
// This job has completed before.
Set<JobID> jobsToCleanup = trackerToJobsToCleanup.get(trackerName);
if (jobsToCleanup == null) {
jobsToCleanup = new HashSet<JobID>();
trackerToJobsToCleanup.put(trackerName, jobsToCleanup);
}
jobsToCleanup.add(taskAttemptId.getJobID());
continue;
}
TaskInProgress tip = taskidToTIPMap.get(taskAttemptId);
JobStatus prevStatus = (JobStatus) job.getStatus().clone();
job.updateTaskStatus(tip, (TaskStatus) report.clone());
JobStatus newStatus = (JobStatus) job.getStatus().clone();
if (tip.isComplete()) {
if (loggingEnabled) {
LOG.debug("Completed task attempt " + taskAttemptId + " tracker:"
+ trackerName + " time=" + getClock().getTime());
}
}
if (prevStatus.getRunState() != newStatus.getRunState()) {
if (loggingEnabled) {
LOG.debug("Informing Listeners of job " + jobid + " of newStatus "
+ JobStatus.getJobRunState(newStatus.getRunState()));
}
JobStatusChangeEvent event = new JobStatusChangeEvent(job,
EventType.RUN_STATE_CHANGED, prevStatus, newStatus);
updateJobInProgressListeners(event);
}
}
}
}