blob: c13324c4341108779b12d79bb2446bdd1abd67f4 [file] [log] [blame]
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
package org.apache.hadoop.mapred;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapred.AbstractQueue.AbstractQueueComparator;
import org.apache.hadoop.mapred.JobTracker.IllegalStateException;
import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.mapreduce.server.jobtracker.TaskTracker;
import org.apache.hadoop.util.StringUtils;
* A {@link TaskScheduler} that implements the requirements in HADOOP-3421
* and provides a HOD-less way to share large clusters. This scheduler
* provides the following features:
* * support for queues, where a job is submitted to a queue.
* * Queues are assigned a fraction of the capacity of the grid (their
* 'capacity') in the sense that a certain capacity of resources
* will be at their disposal. All jobs submitted to the queues of an Org
* will have access to the capacity of the Org.
* * Free resources can be allocated to any queue beyond its
* capacity.
* * Queues optionally support job priorities (disabled by default).
* * Within a queue, jobs with higher priority will have access to the
* queue's resources before jobs with lower priority. However, once a job
* is running, it will not be preempted for a higher priority job.
* * In order to prevent one or more users from monopolizing its resources,
* each queue enforces a limit on the percentage of resources allocated to a
* user at any given time, if there is competition for them.
class CapacityTaskScheduler extends TaskScheduler {
// Map and reduce tasks are handed out from a hierarchy of queues whose top
// node is 'root'; per-task-type logic lives in the nested TaskSchedulingMgr
// subclasses (see mapScheduler / reduceScheduler).
/**
 * Quick way to get the qsc object given a queue name. Keyed by
 * QueueSchedulingContext.getQueueName(); written by addToQueueInfoMap() and
 * read by getDisplayInfo(), both of which are synchronized on the scheduler.
 */
private Map<String, QueueSchedulingContext> queueInfoMap =
new HashMap<String, QueueSchedulingContext>();
//Root level queue . It has all the
//cluster capacity at its disposal.
//Queues declared by users would
//be children of this queue.
//CS would have handle to root.
// Null until initializeQueues() (or setRoot() in tests) assigns it;
// TaskSchedulingMgr.getOrderedJobQueues() dereferences it without a check.
private AbstractQueue root = null;
* This class captures scheduling information we want to display or log.
private static class SchedulingDisplayInfo {
// Registered with QueueManager.setSchedulerInfo() by createDisplayInfo();
// the text shown for the queue is whatever toString() below returns.
private String queueName;
CapacityTaskScheduler scheduler;
SchedulingDisplayInfo(String queueName, CapacityTaskScheduler scheduler) {
this.queueName = queueName;
this.scheduler = scheduler;
public String toString(){
// note that we do not call updateContextObjects() here for performance
// reasons. This means that the data we print out may be slightly
// stale. This data is updated whenever assignTasks() is called
// If this doesn't happen, the data gets stale. If we see
// this often, we may need to detect this situation and call
// updateContextObjects(), or just call it each time.
// getDisplayInfo() is synchronized, so this call takes the scheduler's
// monitor. It returns null when no context is registered for queueName.
return scheduler.getDisplayInfo(queueName);
// this class encapsulates the result of a task lookup
// A lookup status plus, for TASK_FOUND, the Task that was found. Instances
// are obtained only through the static factory methods below.
private static class TaskLookupResult {
static enum LookUpStatus {
// constant TaskLookupResult objects. Should not be accessed directly.
// Both shared constants carry a null task, so the "no task" and
// "memory mismatch" outcomes do not allocate a new object per call.
private static final TaskLookupResult NoTaskLookupResult =
new TaskLookupResult(null, TaskLookupResult.LookUpStatus.NO_TASK_FOUND);
private static final TaskLookupResult MemFailedLookupResult =
new TaskLookupResult(null,
private LookUpStatus lookUpStatus;
private Task task;
// should not call this constructor directly. use static factory methods.
private TaskLookupResult(Task t, LookUpStatus lUStatus) {
this.task = t;
this.lookUpStatus = lUStatus;
// Wraps a task that was found with status TASK_FOUND.
// NOTE(review): the debug string is built even when debug logging is off;
// consider guarding with LOG.isDebugEnabled().
static TaskLookupResult getTaskFoundResult(Task t) {
LOG.debug("Returning task " + t);
return new TaskLookupResult(t, LookUpStatus.TASK_FOUND);
// Shared result: nothing schedulable was found.
static TaskLookupResult getNoTaskFoundResult() {
return NoTaskLookupResult;
// Shared result for the memory-mismatch / reserved-tracker case; the
// per-type assignTasks() returns it to its caller without searching further.
static TaskLookupResult getMemFailedResult() {
return MemFailedLookupResult;
// Null for the two shared constant results.
Task getTask() {
return task;
LookUpStatus getLookUpStatus() {
return lookUpStatus;
* This class handles the scheduling algorithms.
* The algos are the same for both Map and Reduce tasks.
* Map- and reduce-specific behavior lives in the MapSchedulingMgr and
* ReduceSchedulingMgr subclasses, which implement the abstract hooks below.
private static abstract class TaskSchedulingMgr {
// Subclass constructors assign 'type' and 'queueComparator'.
/** our TaskScheduler object */
protected CapacityTaskScheduler scheduler;
// TaskType.MAP or TaskType.REDUCE, set by the subclass constructor.
protected TaskType type = null;
// Asks job for a new task of this manager's type for taskTracker; may
// return null, which callers check for.
abstract Task obtainNewTask(TaskTrackerStatus taskTracker,
JobInProgress job) throws IOException;
// Cluster-wide slot count for this manager's task type.
abstract int getClusterCapacity();
// Selects the map- or reduce-side TaskSchedulingContext of qsc.
abstract TaskSchedulingContext getTSC(
QueueSchedulingContext qsc);
* To check if job has a speculative task on the particular tracker.
* @param job job to check for speculative tasks.
* @param tts task tracker on which speculative task would run.
* @return true if there is a speculative task to run on the tracker.
abstract boolean hasSpeculativeTask(JobInProgress job,
TaskTrackerStatus tts);
* Comparator to sort queues.
* For maps, we need to sort on QueueSchedulingContext.mapTSC. For
* reducers, we use reduceTSC. So we'll need separate comparators.
private static abstract class QueueComparator
implements Comparator<AbstractQueue> {
abstract TaskSchedulingContext getTSC(
QueueSchedulingContext qsi);
// Orders queues by (slots occupied / capacity) ascending, so the least
// filled queue sorts first.
public int compare(AbstractQueue q1, AbstractQueue q2) {
TaskSchedulingContext t1 = getTSC(q1.getQueueSchedulingContext());
TaskSchedulingContext t2 = getTSC(q2.getQueueSchedulingContext());
// look at how much capacity they've filled. Treat a queue with
// capacity=0 equivalent to a queue running at capacity
// (the 1.0f literal is widened to double).
double r1 = (0 == t1.getCapacity())? 1.0f:
(double) t1.getNumSlotsOccupied() /(double) t1.getCapacity();
double r2 = (0 == t2.getCapacity())? 1.0f:
(double) t2.getNumSlotsOccupied() /(double) t2.getCapacity();
if (r1<r2) return -1;
else if (r1>r2) return 1;
else return 0;
// subclass for map and reduce comparators
private static final class MapQueueComparator extends QueueComparator {
TaskSchedulingContext getTSC(QueueSchedulingContext qsi) {
return qsi.getMapTSC();
private static final class ReduceQueueComparator extends QueueComparator {
TaskSchedulingContext getTSC(QueueSchedulingContext qsi) {
return qsi.getReduceTSC();
// these are our comparator instances
// (stateless, so a single shared instance of each is enough)
protected final static MapQueueComparator mapComparator =
new MapQueueComparator();
protected final static ReduceQueueComparator reduceComparator =
new ReduceQueueComparator();
// and this is the comparator to use
// NOTE(review): assigned by the subclass constructors but not read by
// getOrderedJobQueues(); confirm whether it is still used anywhere.
protected QueueComparator queueComparator;
// Returns the queues in the order produced by getOrderedJobQueues(), as a
// String array (presumably their names).
// Mainly for testing purposes.
String[] getOrderedQueues() {
List<AbstractQueue> queueList = getOrderedJobQueues();
List<String> queues = new ArrayList<String>(queueList.size());
for (AbstractQueue q : queueList) {
return queues.toArray(new String[queues.size()]);
* Return an ordered list of {@link JobQueue}s wrapped as
* {@link AbstractQueue}s. Ordering is according to {@link QueueComparator}.
* To reflect the true ordering of the JobQueues, the complete hierarchy is
* sorted such that {@link AbstractQueue}s are ordered according to their
* needs at each level in the hierarchy, after which only the leaf level
* {@link JobQueue}s are returned.
* @return a list of {@link JobQueue}s wrapped as {@link AbstractQueue}s
* sorted by their needs.
List<AbstractQueue> getOrderedJobQueues() {
// No sorting happens in this method: the order is whatever the hierarchy
// under scheduler.root currently holds. Throws NullPointerException if
// root has not been initialized yet.
return scheduler.root.getDescendentJobQueues();
// Binds this manager to its owning scheduler; subclasses then set 'type'
// and 'queueComparator'.
TaskSchedulingMgr(CapacityTaskScheduler sched) {
scheduler = sched;
// Returns true when j's user already occupies at least the per-user limit
// of this manager's slot type in qsc, where
// limit = max(ceil(currentCapacity / #users in qsc.getNumJobsByUser()),
// ceil(qsc.getUlMin() * currentCapacity / 100)).
private boolean isUserOverLimit(JobInProgress j,
QueueSchedulingContext qsc) {
// what is our current capacity? It is equal to the queue-capacity if
// we're running below capacity. If we're running over capacity, then its
// #running plus slotPerTask of the job (which is the number of extra
// slots we're getting).
int currentCapacity;
TaskSchedulingContext tsi = getTSC(qsc);
if (tsi.getNumSlotsOccupied() < tsi.getCapacity()) {
currentCapacity = tsi.getCapacity();
else {
currentCapacity =
tsi.getNumSlotsOccupied() +
// Even share across users with jobs, floored by the queue's minimum
// user-limit percentage. With no users in getNumJobsByUser() the first
// term is a floating-point division by zero.
int limit = Math.max((int)(Math.ceil((double)currentCapacity/
(double) qsc.getNumJobsByUser().size())),
(int)(Math.ceil((double)(qsc.getUlMin() *currentCapacity)/100.0)));
String user = j.getProfile().getUser();
// NOTE(review): the map value is unboxed for the >= comparison, so a user
// with no entry in getNumSlotsOccupiedByUser() would raise a
// NullPointerException; confirm every user with jobs has an entry.
if (tsi.getNumSlotsOccupiedByUser().get(user) >= limit) {
LOG.debug("User " + user + " is over limit, num slots occupied = " +
tsi.getNumSlotsOccupiedByUser().get(user) + ", limit = " + limit);
return true;
else {
return false;
* This is the central scheduling method.
* It tries to get a task from jobs in a single queue.
* Always return a TaskLookupResult object. Don't return null.
private TaskLookupResult getTaskFromQueue(TaskTracker taskTracker,
QueueSchedulingContext qsi)
throws IOException {
// Two passes over the queue's running jobs. Pass 1 honors user limits
// and may reserve this tracker for a job whose memory requirements it
// can't meet. Pass 2 ignores user limits and never reserves.
TaskTrackerStatus taskTrackerStatus = taskTracker.getStatus();
// we only look at jobs in the running queues, as these are the ones
// who have been potentially initialized
for (JobInProgress j :
.getRunningJobs()) {
// only look at jobs that can be run. We ignore jobs that haven't
// initialized, or have completed but haven't been removed from the
// running queue.
//Check queue for maximum capacity .
// (uses this job's slots-per-task, so large-slot jobs can be skipped
// while smaller ones still fit)
if(areTasksInQueueOverMaxCapacity(qsi,j.getNumSlotsPerTask(type))) {
if (j.getStatus().getRunState() != JobStatus.RUNNING) {
// check if the job's user is over limit
if (isUserOverLimit(j, qsi)) {
//If this job meets memory requirements. Ask the JobInProgress for
//a task to be scheduled on the task tracker.
//if we find a job then we pass it on.
if (scheduler.memoryMatcher.matchesMemoryRequirements(j, type,
taskTrackerStatus)) {
// We found a suitable job. Get task from it.
Task t = obtainNewTask(taskTrackerStatus, j);
//if there is a task return it immediately.
if (t != null) {
// we're successful in getting a task
return TaskLookupResult.getTaskFoundResult(t);
} else {
//skip to the next job in the queue.
LOG.debug("Job " + j.getJobID().toString()
+ " returned no tasks of type " + type);
} else {
// if memory requirements don't match then we check if the job has
// pending tasks and has insufficient number of 'reserved'
// tasktrackers to cover all pending tasks. If so we reserve the
// current tasktracker for this job so that high memory jobs are not
// starved
TaskDataView view = TaskDataView.getTaskDataView(type);
if ((view.getPendingTasks(j) != 0 &&
!view.hasSufficientReservedTaskTrackers(j))) {
// Reserve all available slots on this tasktracker + ": Reserving "
+ taskTracker.getTrackerName()
+ " since memory-requirements don't match");
taskTracker.reserveSlots(type, j, taskTracker
// Block
// Returning the mem-failed result stops the caller from trying
// other queues on this heartbeat.
return TaskLookupResult.getMemFailedResult();
}//end of memory check block
// if we're here, this job has no task to run. Look at the next job.
}//end of for loop
// if we're here, we haven't found any task to run among all jobs in
// the queue. This could be because there is nothing to run, or that
// the user limit for some user is too strict, i.e., there's at least
// one user who doesn't have enough tasks to satisfy his limit. If
// it's the latter case, re-look at jobs without considering user
// limits, and get a task from the first eligible job; however
// we do not 'reserve' slots on tasktrackers anymore since the user is
// already over the limit
// Note: some of the code from above is repeated here. This is on
// purpose as it improves overall readability.
// Note: we walk through jobs again. Some of these jobs, which weren't
// considered in the first pass, shouldn't be considered here again,
// but we still check for their viability to keep the code simple. In
// some cases, for high mem jobs that have nothing to run, we call
// obtainNewTask() unnecessarily. Should this be a problem, we can
// create a list of jobs to look at (those whose users were over
// limit) in the first pass and walk through that list only.
for (JobInProgress j :
.getRunningJobs()) {
if (j.getStatus().getRunState() != JobStatus.RUNNING) {
//Check for the maximum-capacity.
if(areTasksInQueueOverMaxCapacity(qsi,j.getNumSlotsPerTask(type))) {
if (scheduler.memoryMatcher.matchesMemoryRequirements(j, type,
taskTrackerStatus)) {
// We found a suitable job. Get task from it.
Task t = obtainNewTask(taskTrackerStatus, j);
//if there is a task return it immediately.
if (t != null) {
// we're successful in getting a task
return TaskLookupResult.getTaskFoundResult(t);
} else {
} else {
//if memory requirements don't match then we check if the
//job has either pending or speculative task. If the job
//has pending or speculative task we block till this job
//tasks get scheduled, so that high memory jobs are not
// (unlike pass 1, speculative work also blocks, and no slots are
// reserved on the tracker)
if (TaskDataView.getTaskDataView(type).getPendingTasks(j) != 0 ||
hasSpeculativeTask(j, taskTrackerStatus)) {
return TaskLookupResult.getMemFailedResult();
}//end of memory check block
}//end of for loop
// found nothing for this queue, look at the next one.
String msg = "Found no task from the queue " + qsi.getQueueName();
return TaskLookupResult.getNoTaskFoundResult();
// Always return a TaskLookupResult object. Don't return null.
// The caller is responsible for ensuring that the QSC objects and the
// collections are up-to-date.
// First services a reservation this tracker holds for a job (if any), then
// walks the leaf queues in getOrderedJobQueues() order.
private TaskLookupResult assignTasks(TaskTracker taskTracker)
throws IOException {
TaskTrackerStatus taskTrackerStatus = taskTracker.getStatus();
// Check if this tasktracker has been reserved for a job...
JobInProgress job = taskTracker.getJobForFallowSlot(type);
if (job != null) {
int availableSlots = taskTracker.getAvailableSlots(type);
if (LOG.isDebugEnabled()) {
LOG.debug(job.getJobID() + ": Checking 'reserved' tasktracker " +
taskTracker.getTrackerName() + " with " + availableSlots +
" '" + type + "' slots");
// Enough free slots now for one of the reserving job's tasks.
if (availableSlots >= job.getNumSlotsPerTask(type)) {
// Unreserve
taskTracker.unreserveSlots(type, job);
// We found a suitable job. Get task from it.
Task t = obtainNewTask(taskTrackerStatus, job);
//if there is a task return it immediately.
if (t != null) {
if (LOG.isDebugEnabled()) { + ": Got " + t.getTaskID() +
" for reserved tasktracker " +
// we're successful in getting a task
return TaskLookupResult.getTaskFoundResult(t);
} else {
// NOTE(review): presumably the branch where availableSlots is still
// below job.getNumSlotsPerTask(type); the tracker stays reserved and
// the search stops for this heartbeat.
// Re-reserve the current tasktracker
taskTracker.reserveSlots(type, job, availableSlots);
if (LOG.isDebugEnabled()) {
LOG.debug(job.getJobID() + ": Re-reserving " +
return TaskLookupResult.getMemFailedResult();
for (AbstractQueue q : getOrderedJobQueues()) {
QueueSchedulingContext qsc = q.getQueueSchedulingContext();
// we may have queues with capacity=0. We shouldn't look at jobs from
// these queues
if (0 == getTSC(qsc).getCapacity()) {
//This call is important for optimization purposes , if we
//have reached the limit already no need for traversing the queue.
// (checks with a single slot; getTaskFromQueue re-checks per job using
// that job's slots-per-task)
if(this.areTasksInQueueOverMaxCapacity(qsc,1)) {
TaskLookupResult tlr = getTaskFromQueue(taskTracker, qsc);
TaskLookupResult.LookUpStatus lookUpStatus = tlr.getLookUpStatus();
if (lookUpStatus == TaskLookupResult.LookUpStatus.NO_TASK_FOUND) {
continue; // Look in other queues.
// if we find a task, return
if (lookUpStatus == TaskLookupResult.LookUpStatus.TASK_FOUND) {
return tlr;
// if there was a memory mismatch, return
else if (lookUpStatus ==
return tlr;
// nothing to give
return TaskLookupResult.getNoTaskFoundResult();
* Check if maximum-capacity is set for this queue.
* If set (the code treats any value >= 0 as set),
* check if numofslotsoccupied+numSlotsPerTask is greater than
* maximum-Capacity; if yes, this queue is over limit.
* In case noOfSlotsOccupied is less than maximum-capacity, but
* numOfSlotsOccupied+noSlotsPerTask is more than maximum-capacity, we still
* don't assign the task. This may lead to under-utilization of a very small
* set of slots. But this is ok, as we strictly respect the maximum-capacity.
* NOTE(review): earlier wording said "greater than 0", but a maximum
* capacity of exactly 0 makes every request over limit; confirm intent.
* @param qsc queue whose TaskSchedulingContext (for this type) is checked
* @param noOfSlotsPerTask slots the candidate task would occupy
* @return true if queue is over maximum-capacity
private boolean areTasksInQueueOverMaxCapacity(
QueueSchedulingContext qsc,int noOfSlotsPerTask) {
TaskSchedulingContext tsi = getTSC(qsc);
//check for maximum-capacity
if(tsi.getMaxCapacity() >= 0) {
if ((tsi.getNumSlotsOccupied() + noOfSlotsPerTask) >
tsi.getMaxCapacity()) {
if (LOG.isDebugEnabled()) {
"Queue " + qsc.getQueueName() + " " + "has reached its max " +
type + "Capacity");
// NOTE(review): labelled "running tasks" but logs getCapacity();
// getNumRunningTasks() or getNumSlotsOccupied() was probably intended.
LOG.debug("Current running tasks " + tsi.getCapacity());
return true;
return false;
// for debugging.
// Builds a per-queue summary (running tasks, occupied slots, capacity,
// running jobs, max capacity) for this manager's type; does nothing unless
// debug logging is enabled.
private void printQSCs() {
if (LOG.isDebugEnabled()) {
// NOTE(review): method-local, so StringBuilder would suffice.
StringBuffer s = new StringBuffer();
for (AbstractQueue aq: getOrderedJobQueues()) {
QueueSchedulingContext qsi = aq.getQueueSchedulingContext();
TaskSchedulingContext tsi = getTSC(qsi);
Collection<JobInProgress> runJobs =
" Queue '%s'(%s): runningTasks=%d, "
+ "occupiedSlots=%d, capacity=%d, runJobs=%d maximumCapacity=%d ",
this.type, tsi.getNumRunningTasks(),
tsi.getNumSlotsOccupied(), tsi.getCapacity(), (runJobs.size()),
* Check if one of the tasks have a speculative task to execute on the
* particular task tracker.
* @param tips tasks of a job
* @param tts task tracker status for which we are asking speculative tip
* @return true if job has a speculative task to run on particular TT.
boolean hasSpeculativeTask(
TaskInProgress[] tips,
TaskTrackerStatus tts) {
// Returns true at the first tip that passes the check, which includes:
// it has not run on tts's host/tracker and canBeSpeculated(now).
// NOTE(review): uses System.currentTimeMillis() directly rather than the
// scheduler's mockable Clock.
long currentTime = System.currentTimeMillis();
for(TaskInProgress tip : tips) {
&& !(tip.hasRunOnMachine(tts.getHost(), tts.getTrackerName()))
&& tip.canBeSpeculated(currentTime)) {
return true;
return false;
* The scheduling algorithms for map tasks.
private static class MapSchedulingMgr extends TaskSchedulingMgr {
// Map-side hooks:
// - obtainNewTask: asks the job for a map task, passing the cluster's
//   task-tracker count;
// - getClusterCapacity: the cluster's max map tasks;
// - getTSC: the queue's map TaskSchedulingContext;
// - hasSpeculativeTask: only when map speculative execution is enabled.
MapSchedulingMgr(CapacityTaskScheduler schedulr) {
type = TaskType.MAP;
queueComparator = mapComparator;
Task obtainNewTask(TaskTrackerStatus taskTracker, JobInProgress job)
throws IOException {
ClusterStatus clusterStatus =
int numTaskTrackers = clusterStatus.getTaskTrackers();
return job.obtainNewMapTask(taskTracker, numTaskTrackers,
int getClusterCapacity() {
return scheduler.taskTrackerManager.getClusterStatus().getMaxMapTasks();
TaskSchedulingContext getTSC(QueueSchedulingContext qsi) {
return qsi.getMapTSC();
boolean hasSpeculativeTask(JobInProgress job, TaskTrackerStatus tts) {
//Check if job supports speculative map execution first then
//check if job has speculative maps.
return (job.getJobConf().getMapSpeculativeExecution())&& (
* The scheduling algorithms for reduce tasks.
private static class ReduceSchedulingMgr extends TaskSchedulingMgr {
// Reduce-side hooks:
// - obtainNewTask: asks the job for a reduce task, passing the cluster's
//   task-tracker count;
// - getClusterCapacity: read from the cluster status;
// - getTSC: the queue's reduce TaskSchedulingContext;
// - hasSpeculativeTask: only when reduce speculative execution is enabled.
ReduceSchedulingMgr(CapacityTaskScheduler schedulr) {
type = TaskType.REDUCE;
queueComparator = reduceComparator;
Task obtainNewTask(TaskTrackerStatus taskTracker, JobInProgress job)
throws IOException {
ClusterStatus clusterStatus =
int numTaskTrackers = clusterStatus.getTaskTrackers();
return job.obtainNewReduceTask(taskTracker, numTaskTrackers,
int getClusterCapacity() {
return scheduler.taskTrackerManager.getClusterStatus()
TaskSchedulingContext getTSC(QueueSchedulingContext qsi) {
return qsi.getReduceTSC();
boolean hasSpeculativeTask(JobInProgress job, TaskTrackerStatus tts) {
//check if the job supports reduce speculative execution first then
//check if the job has speculative tasks.
return (job.getJobConf().getReduceSpeculativeExecution()) && (
/** the scheduling mgrs for Map and Reduce tasks */
// 'this' escapes during field initialization; the managers' constructors
// only store the reference.
protected TaskSchedulingMgr mapScheduler = new MapSchedulingMgr(this);
protected TaskSchedulingMgr reduceScheduler = new ReduceSchedulingMgr(this);
// Consulted via matchesMemoryRequirements() in getTaskFromQueue().
MemoryMatcher memoryMatcher = new MemoryMatcher();
static final Log LOG = LogFactory.getLog(CapacityTaskScheduler.class);
// Created in the constructor.
protected JobQueuesManager jobQueuesManager;
/**
 * Whether the scheduler has started. Written by start() and terminate()
 * (both synchronized) and read by CapacitySchedulerQueueRefresher.
 */
private boolean started = false;
* A clock class - can be mocked out for testing.
static class Clock {
// Current wall-clock time in milliseconds.
long getTime() {
return System.currentTimeMillis();
// Supplied through the constructor.
private Clock clock;
// Created by start() unless already injected via setInitializationPoller().
private JobInitializationPoller initializationPoller;
class CapacitySchedulerQueueRefresher extends QueueRefresher {
// Rebuilds the queue hierarchy from newRootQueues using a freshly created
// CapacitySchedulerConf. Fails with IOException if the scheduler has not
// been started.
// NOTE(review): reads the non-volatile 'started' flag without holding the
// scheduler lock; visibility relies on the caller's synchronization.
void refreshQueues(List<JobQueueInfo> newRootQueues)
throws Throwable {
if (!started) {
String msg =
"Capacity Scheduler is not in the 'started' state."
+ " Cannot refresh queues.";
throw new IOException(msg);
CapacitySchedulerConf schedConf = new CapacitySchedulerConf();
initializeQueues(newRootQueues, schedConf, true);
// Production constructor: uses the real wall-clock Clock.
public CapacityTaskScheduler() {
this(new Clock());
// for testing
// Allows a mock Clock; also creates the JobQueuesManager.
public CapacityTaskScheduler(Clock clock) {
this.jobQueuesManager = new JobQueuesManager();
this.clock = clock;
// Returns a new refresher bound to this scheduler on every call.
QueueRefresher getQueueRefresher() {
return new CapacitySchedulerQueueRefresher();
* Only for testing.
* @param type TaskType.MAP or TaskType.REDUCE
* @return the queues in scheduling order for that type, or null for any
* other TaskType
String[] getOrderedQueues(TaskType type) {
if (type == TaskType.MAP) {
return mapScheduler.getOrderedQueues();
} else if (type == TaskType.REDUCE) {
return reduceScheduler.getOrderedQueues();
return null;
// Starts the scheduler once; later calls return immediately while started.
// Builds the queue hierarchy from CapacitySchedulerConf, then creates
// (unless one was injected) and initializes the JobInitializationPoller.
// Any Throwable from queue initialization is logged and rethrown as an
// IOException whose cause is the original Throwable.
public synchronized void start() throws IOException {
if (started) return;
// Initialize MemoryMatcher
// read queue info from config file
QueueManager queueManager = taskTrackerManager.getQueueManager();
// initialize our queues from the config settings
CapacitySchedulerConf schedConf = new CapacitySchedulerConf();
try {
schedConf, false);
// NOTE(review): the log text below misspells "exception" ("excecption").
} catch (Throwable e) {
LOG.error("Couldn't initialize queues because of the excecption : "
+ StringUtils.stringifyException(e));
throw new IOException(e);
// Queues are ready. Now register jobQueuesManager with the JobTracker so as
// to listen to job changes
//Start thread for initialization
if (initializationPoller == null) {
this.initializationPoller = new JobInitializationPoller(
jobQueuesManager, taskTrackerManager);
initializationPoller.init(jobQueuesManager.getJobQueueNames(), schedConf);
started = true;"Capacity scheduler started successfully");
* Read the configuration and initialize the queues. This operation should be
* done only when either the scheduler is starting or a request is received
* from {@link QueueManager} to refresh the queue configuration.
* <p>
* Even in case of refresh, we do not explicitly destroy AbstractQueue items,
* or the info maps, they will be automatically garbage-collected.
* <p>
* We don't explicitly lock the scheduler completely. This method is called at
* two times. 1) When the scheduler is starting. During this time, the lock
* sequence is JT->scheduler and so we don't need any more locking here. 2)
* When refresh is issued to {@link QueueManager}. When this happens, parallel
* refreshes are guarded by {@link QueueManager} itself by taking its lock.
* @param newRootQueues top-level queue definitions; must be non-null and
* non-empty
* @param schedConf scheduler configuration handed to QueueHierarchyBuilder
* @param refreshingQueues true for a refresh (commit happens under the
* scheduler lock), false when called from start()
* @throws Throwable IOException for null input, JobTracker's
* IllegalStateException for an empty list, or whatever
* QueueHierarchyBuilder.createHierarchy throws (logged, then rethrown as-is)
private void initializeQueues(List<JobQueueInfo> newRootQueues,
CapacitySchedulerConf schedConf, boolean refreshingQueues)
throws Throwable {
if (newRootQueues == null) {
throw new IOException(
"Cannot initialize the queues with null root-queues!");
// Sanity check: there should be at least one queue.
if (0 == newRootQueues.size()) {
// Resolves to JobTracker.IllegalStateException via this file's imports,
// not java.lang.IllegalStateException.
throw new IllegalStateException("System has no queue configured!");
// Create a new queue-hierarchy builder and try loading the complete
// hierarchy of queues.
AbstractQueue newRootAbstractQueue;
try {
newRootAbstractQueue =
new QueueHierarchyBuilder().createHierarchy(newRootQueues, schedConf);
// NOTE(review): the log text below misspells "trying" ("tryign").
} catch (Throwable e) {
LOG.error("Exception while tryign to (re)initializing queues : "
+ StringUtils.stringifyException(e));"(Re)initializing the queues with the new configuration "
+ "failed, so keeping the old configuration.");
// Rethrow unchanged; start() wraps it in an IOException.
throw e;
// New configuration is successfully validated and applied, set the new
// configuration to the current queue-hierarchy.
if (refreshingQueues) {
// Scheduler is being refreshed.
// Going to commit the changes to the hierarchy. Lock the scheduler.
synchronized (this) {
AbstractQueueComparator comparator = new AbstractQueueComparator();
} else {
// Scheduler is just starting.
this.root = newRootAbstractQueue;
// JobQueue objects are created. Inform the JobQueuesManager so that it
// can track the running/waiting jobs. JobQueuesManager is still not added
// as a listener to JobTracker, so no locking needed.
List<AbstractQueue> allQueues = new ArrayList<AbstractQueue>();
for (AbstractQueue queue : allQueues) {
if (!refreshingQueues) {
// Scheduler is just starting, create the display info also
createDisplayInfo(taskTrackerManager.getQueueManager(), queue.getName());
// QueueSchedulingContext objects are created/have changed. Put them
// (back) in the queue-info so as to be consumed by the UI.
* Inform the {@link JobQueuesManager} about the newly constructed
* {@link JobQueue}s.
private void addJobQueuesToJobQueuesManager() {
// Iterates the leaf job queues under the current root (getRoot()).
List<AbstractQueue> allJobQueues = getRoot().getDescendentJobQueues();
for (AbstractQueue jobQ : allJobQueues) {
/**
 * Mostly for testing purposes. Has an effect on startup only if called
 * before start(), which creates a poller only when none is set.
 */
void setInitializationPoller(JobInitializationPoller p) {
this.initializationPoller = p;
// No-op unless started. Clears 'started', so a later start() call is not
// short-circuited by its 'if (started) return' guard.
public synchronized void terminate() throws IOException {
if (!started) return;
if (jobQueuesManager != null) {
started = false;
// Configuration setter, presumably overriding TaskScheduler's; synchronized
// on the scheduler.
public synchronized void setConf(Configuration conf) {
* provided for the test classes
* lets you update the QSI objects and sorted collections
void updateContextInfoForTests() {
// Same capacity lookup that the public assignTasks() does before it calls
// updateContextObjects().
ClusterStatus c = taskTrackerManager.getClusterStatus();
int mapClusterCapacity = c.getMaxMapTasks();
int reduceClusterCapacity = c.getMaxReduceTasks();
// update the QSI objects
updateContextObjects(mapClusterCapacity, reduceClusterCapacity);
* Update individual QSC objects.
* We don't need exact information for all variables, just enough for us
* to make scheduling decisions. For example, we don't need an exact count
* of numRunningTasks. Once we count up to the grid capacity, any
* number beyond that will make no difference.
private synchronized void updateContextObjects(int mapClusterCapacity,
int reduceClusterCapacity) {
// Called on every heartbeat from assignTasks() and from
// updateContextInfoForTests().
* The grand plan for assigning a task.
* Assigns at most 1 reduce and 1 map per call, if sufficient slots are
* available for each of the types.
* If not, then whichever type of slots are available, that type of task is
* assigned.
* Next, pick a queue. We only look at queues that need a slot. Among these,
* we first look at queues whose (# of running tasks)/capacity is the least.
* Next, pick a job in a queue. we pick the job at the front of the queue
* unless its user is over the user limit.
* Finally, given a job, pick a task from the job.
public synchronized List<Task> assignTasks(TaskTracker taskTracker)
throws IOException {
// Refreshes the scheduling contexts, then tries one reduce (if this
// tracker has a free reduce slot) and then one map (if it has a free map
// slot). Returns null rather than an empty list when nothing is assigned.
TaskLookupResult tlr;
TaskTrackerStatus taskTrackerStatus = taskTracker.getStatus();
List<Task> result = new ArrayList<Task>();
* If TT has Map and Reduce slot free, we assign 1 map and 1 reduce
* We base decision on how much is needed
* versus how much is used
ClusterStatus c = taskTrackerManager.getClusterStatus();
int mapClusterCapacity = c.getMaxMapTasks();
int reduceClusterCapacity = c.getMaxReduceTasks();
int maxMapSlots = taskTrackerStatus.getMaxMapSlots();
int currentMapSlots = taskTrackerStatus.countOccupiedMapSlots();
int maxReduceSlots = taskTrackerStatus.getMaxReduceSlots();
int currentReduceSlots = taskTrackerStatus.countOccupiedReduceSlots();
// NOTE(review): this message is concatenated on every heartbeat; consider
// an isDebugEnabled() guard.
LOG.debug("TT asking for task, max maps="
+ taskTrackerStatus.getMaxMapSlots() +
", run maps=" + taskTrackerStatus.countMapTasks() + ", max reds=" +
taskTrackerStatus.getMaxReduceSlots() + ", run reds=" +
taskTrackerStatus.countReduceTasks() + ", map cap=" +
mapClusterCapacity + ", red cap = " +
* update all our QSC objects.
* This involves updating each qsC structure. This operation depends
* on the number of running jobs in a queue, and some waiting jobs. If it
* becomes expensive, do it once every few heartbeats only.
updateContextObjects(mapClusterCapacity, reduceClusterCapacity);
// make sure we get our map or reduce scheduling object to update its
// collection of QSC objects too.
if (maxReduceSlots > currentReduceSlots) {
//reduce slot available , try to get a
//reduce task
tlr = reduceScheduler.assignTasks(taskTracker);
if (TaskLookupResult.LookUpStatus.TASK_FOUND ==
tlr.getLookUpStatus()) {
if(maxMapSlots > currentMapSlots) {
//map slot available , try to get a map task
tlr = mapScheduler.assignTasks(taskTracker);
if (TaskLookupResult.LookUpStatus.TASK_FOUND ==
tlr.getLookUpStatus()) {
// Callers must handle a null return when no task was assigned.
return (result.isEmpty()) ? null : result;
// Returns a newly allocated collection gathering the named queue's running
// and waiting jobs. Returns an empty collection when jobQueuesManager knows
// no such queue.
public synchronized Collection<JobInProgress> getJobs(String queueName) {
Collection<JobInProgress> jobCollection = new ArrayList<JobInProgress>();
JobQueue jobQueue = jobQueuesManager.getJobQueue(queueName);
if (jobQueue == null) {
return jobCollection;
Collection<JobInProgress> runningJobs =
if (runningJobs != null) {
Collection<JobInProgress> waitingJobs =
Collection<JobInProgress> tempCollection = new ArrayList<JobInProgress>();
if(waitingJobs != null) {
if(!tempCollection.isEmpty()) {
return jobCollection;
// Returns the poller created by start() or injected by
// setInitializationPoller(); null before either happens.
JobInitializationPoller getInitializationPoller() {
return initializationPoller;
// Returns the toString() of the QueueSchedulingContext registered for
// queueName, or null when none is registered.
private synchronized String getDisplayInfo(String queueName) {
QueueSchedulingContext qsi = queueInfoMap.get(queueName);
if (null == qsi) {
return null;
return qsi.toString();
// Registers qsc under its queue name, replacing any earlier entry.
private synchronized void addToQueueInfoMap(QueueSchedulingContext qsc) {
queueInfoMap.put(qsc.getQueueName(), qsc);
* Create the scheduler information and set it in the {@link QueueManager}.
* This should only be called when the scheduler is starting.
* @param queueManager manager to register with; nothing happens if null
* @param queueName queue whose display info is registered
private void createDisplayInfo(QueueManager queueManager, String queueName) {
if (queueManager != null) {
SchedulingDisplayInfo schedulingInfo =
new SchedulingDisplayInfo(queueName, this);
queueManager.setSchedulerInfo(queueName, schedulingInfo);
* Use for testing purposes.
* returns the root
* (also used internally by addJobQueuesToJobQueuesManager()).
* @return the current root queue; null before the queues are initialized
AbstractQueue getRoot() {
return this.root;
* This is used for testing purpose only
* Don't use this method.
* @param rt the queue to install as the root of the hierarchy
void setRoot(AbstractQueue rt) {
this.root = rt;