/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapred;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Set;
import java.util.TreeMap;
import java.util.Map.Entry;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.mapred.JobQueueJobInProgressListener.JobSchedulingInfo;
import org.apache.hadoop.util.StringUtils;
/**
* This class asynchronously initializes jobs submitted to the
* Map/Reduce cluster running with the {@link CapacityTaskScheduler}.
*
* <p>
* The class comprises a main poller thread and a set of worker threads
* that together initialize the jobs. The poller thread periodically looks
* at jobs submitted to the scheduler and selects a set of them to be
* initialized. It passes these to the worker threads for initialization.
* Each worker thread is configured to look at jobs submitted to a fixed
* set of queues. It initializes jobs in a round-robin manner, selecting
* the first job ready to be initialized from each queue in turn.
* </p>
*
* <p>
* An initialized job occupies memory resources on the Job Tracker. Hence,
* the poller limits the number of jobs initialized at any given time to
* a configured limit. The limit is specified per user per queue.
* </p>
*
* <p>
* However, since a job needs to be initialized before the scheduler can
* select tasks from it to run, the poller tries to keep a backlog of
* initialized jobs so that the scheduler does not have to wait and let
* empty slots go to waste. The core logic of the poller is to pick jobs
* that have a good chance of being run next by the scheduler. To do this,
* it picks jobs submitted across users and across queues to account
* both for guaranteed capacities and user limits. It also always initializes
* high-priority jobs whenever they need to be initialized, even if this
* means going over the limit for initialized jobs.
* </p>
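* <p>
* For illustration, the poller's behaviour is driven by capacity-scheduler
* configuration such as the polling interval, the size of the worker-thread
* pool, and the per-queue user and job limits. A sketch of the relevant
* settings is shown below; the property names follow the usual
* capacity-scheduler convention and are given only as an example, the
* authoritative names being defined by {@link CapacitySchedulerConf}:
* <pre>
* &lt;!-- How often the poller wakes up, in milliseconds --&gt;
* &lt;property&gt;
*   &lt;name&gt;mapred.capacity-scheduler.init-poll-interval&lt;/name&gt;
*   &lt;value&gt;5000&lt;/value&gt;
* &lt;/property&gt;
* &lt;!-- Upper bound on the number of init worker threads --&gt;
* &lt;property&gt;
*   &lt;name&gt;mapred.capacity-scheduler.init-worker-threads&lt;/name&gt;
*   &lt;value&gt;5&lt;/value&gt;
* &lt;/property&gt;
* &lt;!-- Per-user cap on initialized jobs for a hypothetical queue "research" --&gt;
* &lt;property&gt;
*   &lt;name&gt;mapred.capacity-scheduler.queue.research.maximum-initialized-jobs-per-user&lt;/name&gt;
*   &lt;value&gt;2&lt;/value&gt;
* &lt;/property&gt;
* </pre>
* </p>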
*/
public class JobInitializationPoller extends Thread {
private static final Log LOG = LogFactory
.getLog(JobInitializationPoller.class.getName());
/*
* The poller picks up jobs across users to initialize based on user limits.
* Suppose the user limit for a queue is 25%; this means at most 4 users' jobs
* can run together. However, in order to account for jobs from a user that
* might complete faster than others, it initializes jobs from an additional
* number of users as a backlog. This variable defines the additional
* number of users whose jobs can be considered for initializing.
*/
private static final int MAX_ADDITIONAL_USERS_TO_INIT = 2;
private JobQueuesManager jobQueueManager;
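// Interval, in milliseconds, for which the poller and worker threads sleep
// between successive polls.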
private long sleepInterval;
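// Maximum number of worker threads; capped at the number of configured queues.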
private int poolSize;
/**
* A worker thread that initializes jobs in one or more queues assigned to
* it.
*
* Jobs are initialized in a round robin fashion one from each queue at a
* time.
*/
class JobInitializationThread extends Thread {
private JobInProgress initializingJob;
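// Set to false by terminate() to make this worker thread exit its run loop.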
private volatile boolean startIniting;
private AtomicInteger currentJobCount = new AtomicInteger(0); // number of jobs to initialize
/**
* Map from queue name to the jobs waiting to be initialized for that
* queue.
*/
private HashMap<String, TreeMap<JobSchedulingInfo, JobInProgress>> jobsPerQueue;
public JobInitializationThread() {
startIniting = true;
jobsPerQueue = new HashMap<String, TreeMap<JobSchedulingInfo, JobInProgress>>();
}
@Override
public void run() {
while (startIniting) {
initializeJobs();
try {
if (startIniting) {
Thread.sleep(sleepInterval);
} else {
break;
}
} catch (Throwable t) {
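// Ignore interrupts and other errors here; terminate() clears startIniting
// and interrupts the sleep, and the loop condition is re-checked above.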
}
}
}
// The key method that initializes jobs from queues
// This method is package-private to allow test cases to call it
// synchronously in a controlled manner.
void initializeJobs() {
// while there are more jobs to initialize...
while (currentJobCount.get() > 0) {
Set<String> queues = jobsPerQueue.keySet();
for (String queue : queues) {
JobInProgress job = getFirstJobInQueue(queue);
if (job == null) {
continue;
}
LOG.info("Initializing job : " + job.getJobID() + " in AbstractQueue "
+ job.getProfile().getQueueName() + " For user : "
+ job.getProfile().getUser());
if (startIniting) {
setInitializingJob(job);
ttm.initJob(job);
setInitializingJob(null);
} else {
break;
}
}
}
}
/**
* Removes and returns the first job in the given queue.
*
* @param queue
* queue name
* @return the first job in the queue, or <code>null</code> if the queue is empty
*/
private JobInProgress getFirstJobInQueue(String queue) {
TreeMap<JobSchedulingInfo, JobInProgress> jobsList = jobsPerQueue
.get(queue);
synchronized (jobsList) {
if (jobsList.isEmpty()) {
return null;
}
Iterator<JobInProgress> jobIterator = jobsList.values().iterator();
JobInProgress job = jobIterator.next();
jobIterator.remove();
currentJobCount.getAndDecrement();
return job;
}
}
/*
* Test method that returns the job this thread is currently initializing, if any
*/
synchronized JobInProgress getInitializingJob() {
return this.initializingJob;
}
synchronized void setInitializingJob(JobInProgress job) {
this.initializingJob = job;
}
void terminate() {
startIniting = false;
}
void addJobsToQueue(String queue, JobInProgress job) {
TreeMap<JobSchedulingInfo, JobInProgress> jobs = jobsPerQueue
.get(queue);
if (jobs == null) {
LOG.error("Invalid queue passed to the thread : " + queue
+ " For job :: " + job.getJobID());
// Bail out to avoid a NullPointerException on the synchronized block below.
return;
}
synchronized (jobs) {
JobSchedulingInfo schedInfo = new JobSchedulingInfo(job);
jobs.put(schedInfo, job);
currentJobCount.getAndIncrement();
}
}
void addQueue(String queue) {
TreeMap<JobSchedulingInfo, JobInProgress> jobs = new TreeMap<JobSchedulingInfo, JobInProgress>(
jobQueueManager.getComparator(queue));
jobsPerQueue.put(queue, jobs);
}
}
/**
* The queue information class maintains the following information per queue:
* the maximum number of users allowed to have initialized jobs in the queue,
* and the maximum number of jobs allowed to be initialized per user in the queue.
*
*/
private class QueueInfo {
String queue;
int maxUsersAllowedToInitialize;
int maxJobsPerUserToInitialize;
public QueueInfo(String queue, int maxUsersAllowedToInitialize,
int maxJobsPerUserToInitialize) {
this.queue = queue;
this.maxJobsPerUserToInitialize = maxJobsPerUserToInitialize;
this.maxUsersAllowedToInitialize = maxUsersAllowedToInitialize;
}
}
/**
* Map from queue name to the configuration used for initializing jobs
* in that queue.
*/
private HashMap<String, QueueInfo> jobQueues;
/**
* Jobs which have already been passed to the initialization threads.
* This is maintained so that we don't call initTasks() for the same job twice.
*/
private HashMap<JobID,JobInProgress> initializedJobs;
private volatile boolean running;
private TaskTrackerManager ttm;
/**
* Map from queue name to the worker thread that should be used to
* initialize jobs for that queue.
*/
private HashMap<String, JobInitializationThread> threadsToQueueMap;
public JobInitializationPoller(
JobQueuesManager mgr,
TaskTrackerManager ttm) {
initializedJobs = new HashMap<JobID,JobInProgress>();
jobQueues = new HashMap<String, QueueInfo>();
this.jobQueueManager = mgr;
threadsToQueueMap = new HashMap<String, JobInitializationThread>();
super.setName("JobInitializationPollerThread");
running = true;
this.ttm = ttm;
}
/*
* Reads all configuration values required by the initialization poller,
* assigns queues to worker threads, and starts the threads.
*/
void init(Set<String> queues,
CapacitySchedulerConf capacityConf) {
setupJobInitializerConfiguration(queues, capacityConf);
assignThreadsToQueues();
Collection<JobInitializationThread> threads = threadsToQueueMap.values();
for (JobInitializationThread t : threads) {
if (!t.isAlive()) {
t.setDaemon(true);
t.start();
}
}
}
/**
* Initialize the configuration of the JobInitializer as well as of the specific
* queues.
*
* @param queues
* @param schedulerConf
*/
private void setupJobInitializerConfiguration(Set<String> queues,
CapacitySchedulerConf schedulerConf) {
for (String queue : queues) {
int maxUsersToInitialize = getMaxUsersToInit(schedulerConf, queue);
int maxJobsPerUserToInitialize =
schedulerConf.getMaxJobsPerUserToInitialize(queue);
QueueInfo qi =
new QueueInfo(queue, maxUsersToInitialize,
maxJobsPerUserToInitialize);
jobQueues.put(queue, qi);
}
sleepInterval = schedulerConf.getSleepInterval();
poolSize = schedulerConf.getMaxWorkerThreads();
if (poolSize > queues.size()) {
poolSize = queues.size();
}
}
/**
* Computes the maximum number of users whose jobs should be kept
* initialized for a queue: the number of concurrent users the queue's
* minimum user limit percent allows, plus a small fixed backlog.
*
* @param schedulerConf scheduler configuration to read the user limit from
* @param queue name of the queue
* @return maximum number of users whose jobs may be initialized for the queue
*/
private int getMaxUsersToInit(CapacitySchedulerConf schedulerConf,
String queue) {
int userlimit = schedulerConf.getMinimumUserLimitPercent(queue);
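// For example, a 25% user limit allows 100 / 25 = 4 users to run
// concurrently; with the backlog of MAX_ADDITIONAL_USERS_TO_INIT (2),
// jobs from up to 6 users are kept initialized.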
return (100 / userlimit) + MAX_ADDITIONAL_USERS_TO_INIT;
}
/**
* Refresh the Scheduler configuration cached with the initializer. This
* should be called only by
* {@link CapacityTaskScheduler.CapacitySchedulerQueueRefresher#refreshQueues()}
* . The cached configuration currently is only used by the main thread in the
* initializer. So, any updates are picked up automatically by subsequent
* iterations of the main thread.
*/
void refreshQueueInfo(CapacitySchedulerConf schedulerConf) {
for (String queue : jobQueues.keySet()) {
QueueInfo queueInfo = jobQueues.get(queue);
synchronized (queueInfo) {
queueInfo.maxUsersAllowedToInitialize =
getMaxUsersToInit(schedulerConf, queue);
queueInfo.maxJobsPerUserToInitialize =
schedulerConf.getMaxJobsPerUserToInitialize(queue);
}
}
}
/**
* This is the main loop of the initialization poller. On each iteration
* it does the following:
*
* <ol>
* <li> Clean up the list of initialized jobs that the poller maintains.
* </li>
* <li> Select jobs to initialize in this polling interval.</li>
* </ol>
*/
public void run() {
while (running) {
try {
cleanUpInitializedJobsList();
selectJobsToInitialize();
if (!this.isInterrupted()) {
Thread.sleep(sleepInterval);
}
} catch (InterruptedException e) {
LOG.error("Job Initialization poller interrupted"
+ StringUtils.stringifyException(e));
}
}
}
/**
* The key method that selects the jobs to be initialized across queues
* and assigns those jobs to their appropriate init-worker threads.
* <br/>
* This method is overridden in test cases that exercise the job
* initialization poller.
*
*/
void selectJobsToInitialize() {
for (String queue : jobQueues.keySet()) {
ArrayList<JobInProgress> jobsToInitialize = getJobsToInitialize(queue);
printJobs(jobsToInitialize);
JobInitializationThread t = threadsToQueueMap.get(queue);
for (JobInProgress job : jobsToInitialize) {
t.addJobsToQueue(queue, job);
}
}
}
/**
* Method used to print log statements about which jobs are being
* passed to init-threads.
*
* @param jobsToInitialize list of jobs which are being passed to the
* init-threads.
*/
private void printJobs(ArrayList<JobInProgress> jobsToInitialize) {
for (JobInProgress job : jobsToInitialize) {
LOG.info("Passing to Initializer Job Id :" + job.getJobID()
+ " User: " + job.getProfile().getUser() + " AbstractQueue : "
+ job.getProfile().getQueueName());
}
}
/**
* This method exists to be overridden by test cases that wish to
* create a test-friendly worker thread which can be controlled
* synchronously.
*
* @return a new instance of the worker init-thread.
*/
JobInitializationThread createJobInitializationThread() {
return new JobInitializationThread();
}
/**
* Method used by the poller to assign a worker thread to each queue.
* The number of threads is always less than or equal to the number of
* queues in the system; if more threads are configured than there are
* queues, the poller does not create more threads than there are queues.
*
*/
private void assignThreadsToQueues() {
int countOfQueues = jobQueues.size();
String[] queues = (String[]) jobQueues.keySet().toArray(
new String[countOfQueues]);
int numberOfQueuesPerThread = countOfQueues / poolSize;
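// Example: with 5 queues and a pool of 2 threads, each thread is first
// assigned 5 / 2 = 2 queues; the remaining queue is handed out in the
// round-robin pass below, so the first thread ends up with 3 queues.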
int numberOfQueuesAssigned = 0;
for (int i = 0; i < poolSize; i++) {
JobInitializationThread initializer = createJobInitializationThread();
int batch = (i * numberOfQueuesPerThread);
for (int j = batch; j < (batch + numberOfQueuesPerThread); j++) {
initializer.addQueue(queues[j]);
threadsToQueueMap.put(queues[j], initializer);
numberOfQueuesAssigned++;
}
}
if (numberOfQueuesAssigned < countOfQueues) {
// Assign the remaining queues to the worker threads in round-robin fashion
int startIndex = 0;
for (int i = numberOfQueuesAssigned; i < countOfQueues; i++) {
JobInitializationThread t = threadsToQueueMap
.get(queues[startIndex]);
t.addQueue(queues[i]);
threadsToQueueMap.put(queues[i], t);
startIndex++;
}
}
}
/**
*
* Method used to select jobs to be initialized for a given queue. <br/>
*
* We want to ensure that enough jobs have been initialized, so that when the
* Scheduler wants to consider a new job to run, it's ready. We clearly don't
* want to initialize too many jobs as each initialized job has a memory
* footprint, sometimes significant.
*
* Number of jobs to be initialized is restricted by two values: - Maximum
* number of users whose jobs we want to initialize, which is equal to
* the number of concurrent users the queue can support. - Maximum number
* of initialized jobs per user. The product of these two gives us the
* total number of initialized jobs.
*
* Note that this is a rough number, meant for decreasing extra memory
* footprint. It's OK if we go over it once in a while, if we have to.
*
* This can happen as follows. Suppose we have initialized 3 jobs for a
* user. Now, suppose the user submits a job whose priority is higher than
* that of the 3 jobs initialized. This job needs to be initialized, since it
* will run earlier than the 3 jobs. We'll now have 4 initialized jobs for the
* user. If memory becomes a problem, we should ideally un-initialize one of
* the 3 jobs, to keep the count of initialized jobs at 3, but that's
* something we don't do for now. This situation can also arise when a new
* user submits a high-priority job, thus superseding a user whose jobs have
* already been initialized. The latter user's initialized jobs are redundant,
* but we'll leave them initialized.
*
* @param queue name of the queue to pick the jobs to initialize.
* @return list of jobs to be initialized for the queue. An empty list is
* returned if no jobs are found.
*/
ArrayList<JobInProgress> getJobsToInitialize(String queue) {
QueueInfo qi = jobQueues.get(queue);
ArrayList<JobInProgress> jobsToInitialize = new ArrayList<JobInProgress>();
// use the configuration parameter which is configured for the particular
// queue.
int maximumUsersAllowedToInitialize;
int maxJobsPerUserAllowedToInitialize;
synchronized (qi) {
maximumUsersAllowedToInitialize = qi.maxUsersAllowedToInitialize;
maxJobsPerUserAllowedToInitialize = qi.maxJobsPerUserToInitialize;
}
int maxJobsPerQueueToInitialize = maximumUsersAllowedToInitialize
* maxJobsPerUserAllowedToInitialize;
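// For example, with 6 users allowed to have initialized jobs and 2 jobs
// per user, at most 6 * 2 = 12 jobs are kept initialized for this queue.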
int countOfJobsInitialized = 0;
HashMap<String, Integer> userJobsInitialized = new HashMap<String, Integer>();
Collection<JobInProgress> jobs = jobQueueManager.getJobQueue(queue).getWaitingJobs();
/*
* Walk through the collection of waiting jobs.
* We maintain a map of jobs that have already been initialized. If a
* job exists in that map, increment the count for that job's user
* and move on to the next job.
*
* If the job doesn't exist, see whether we want to initialize it.
* We initialize it if: - at least one job of the user has already
* been initialized, but the user's total initialized jobs are below
* the limit, OR - this is a new user, and we haven't reached the limit
* for the number of users whose jobs we want to initialize. We break
* when we've reached the limit of maximum jobs to initialize.
*/
for (JobInProgress job : jobs) {
String user = job.getProfile().getUser();
int numberOfJobs = userJobsInitialized.get(user) == null ? 0
: userJobsInitialized.get(user);
// If the job is already initialized, count it against its user and
// continue with the next job.
if (initializedJobs.containsKey(job.getJobID())) {
userJobsInitialized.put(user, Integer.valueOf(numberOfJobs + 1));
countOfJobsInitialized++;
continue;
}
boolean isUserPresent = userJobsInitialized.containsKey(user);
if (!isUserPresent
&& userJobsInitialized.size() < maximumUsersAllowedToInitialize) {
// this is a new user being considered and the number of users
// is within limits.
userJobsInitialized.put(user, Integer.valueOf(numberOfJobs + 1));
jobsToInitialize.add(job);
initializedJobs.put(job.getJobID(),job);
countOfJobsInitialized++;
} else if (isUserPresent
&& numberOfJobs < maxJobsPerUserAllowedToInitialize) {
userJobsInitialized.put(user, Integer.valueOf(numberOfJobs + 1));
jobsToInitialize.add(job);
initializedJobs.put(job.getJobID(),job);
countOfJobsInitialized++;
}
/*
* If the maximum number of jobs to initialize for a queue is reached,
* we stop looking at further jobs. The jobs beyond this number will be
* considered in a later polling cycle.
*/
if(countOfJobsInitialized > maxJobsPerQueueToInitialize) {
break;
}
}
return jobsToInitialize;
}
/**
* Method used internally to clean up the initialized-jobs data structure,
* which the job initialization poller uses to check whether a job has
* already been initialized.
*
* The clean-up algorithm is as follows:
*
* <ul>
* <li> For each job in the <b>initializedJobs</b> list: </li>
* <ul>
* <li> If the job is running</li>
* <ul>
* <li> If the job has been scheduled, remove it from the waiting queue
* of the scheduler and from <b>initializedJobs</b>.<br/>
* Whether a job has been scheduled is decided by the following
* formula:<br/>
* if pending <i>tasks</i> &lt; desired <i>tasks</i> then scheduled,
* else not scheduled.<br/>
* The formula treats a job as <i>scheduled</i> once at least one of its
* tasks has run or failed; if task attempts have failed but not enough
* to mark the task itself as failed, the job is treated as
* <i>not scheduled</i>.
* </li>
* </ul>
*
* <li> If the job is complete, remove it from <b>initializedJobs</b>.
* </li>
*
* </ul>
* </ul>
*
*/
void cleanUpInitializedJobsList() {
Iterator<Entry<JobID, JobInProgress>> jobsIterator =
initializedJobs.entrySet().iterator();
while(jobsIterator.hasNext()) {
Entry<JobID,JobInProgress> entry = jobsIterator.next();
JobInProgress job = entry.getValue();
if (job.getStatus().getRunState() == JobStatus.RUNNING) {
if (isScheduled(job)) {
LOG.info("Removing scheduled jobs from waiting queue"
+ job.getJobID());
jobsIterator.remove();
jobQueueManager.getJobQueue(job).removeWaitingJob(new JobSchedulingInfo(job));
continue;
}
}
if(job.isComplete()) {
LOG.info("Removing killed/completed job from initalized jobs " +
"list : "+ job.getJobID());
jobsIterator.remove();
}
}
}
/**
* Convenience method to check whether a job has been scheduled or not.
*
* The method may return false for a job that has had task-attempt failures
* but has not yet failed the TIP.
* @param job the job to check
* @return true if at least one map or reduce task of the job is no longer
* pending, false otherwise
*/
private boolean isScheduled(JobInProgress job) {
return ((job.pendingMaps() < job.desiredMaps())
|| (job.pendingReduces() < job.desiredReduces()));
}
void terminate() {
running = false;
for (Entry<String, JobInitializationThread> entry : threadsToQueueMap
.entrySet()) {
JobInitializationThread t = entry.getValue();
if (t.isAlive()) {
t.terminate();
t.interrupt();
}
}
}
/*
* Test method that returns the job currently being initialized for the given queue.
*/
JobInProgress getInitializingJob(String queue) {
JobInitializationThread t = threadsToQueueMap.get(queue);
if (t == null) {
return null;
} else {
return t.getInitializingJob();
}
}
Set<JobID> getInitializedJobList() {
return initializedJobs.keySet();
}
}