blob: 143b3a28050f4e644ce9146a14ffaffc137488c8 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapred;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
import org.apache.hadoop.mapreduce.server.jobtracker.TaskTracker;
/**
* A {@link TaskScheduler} that limits the maximum number of tasks
* running for a job. The limit is set by means of the
* {@link JTConfig#JT_RUNNINGTASKS_PER_JOB} property.
*/
class LimitTasksPerJobTaskScheduler extends JobQueueTaskScheduler {

  // NOTE(review): the logger name is a legacy literal
  // ("TaskLimitedJobQueueTaskScheduler") that does not match this class's
  // name — presumably kept so existing log4j configurations keep working;
  // confirm before renaming.
  private static final Log LOG = LogFactory.getLog(
    "org.apache.hadoop.mapred.TaskLimitedJobQueueTaskScheduler");

  // Per-job cap on concurrently running tasks (maps + reduces combined).
  // Long.MAX_VALUE means "no limit configured"; assigned in setConf().
  private long maxTasksPerJob;

  public LimitTasksPerJobTaskScheduler() {
    super();
  }

  /**
   * Starts the underlying {@link JobQueueTaskScheduler} and publishes the
   * configured per-job task limit as the scheduler info of the job queue.
   *
   * @throws IOException if the superclass fails to start
   */
  @Override
  public synchronized void start() throws IOException {
    super.start();
    QueueManager queueManager = taskTrackerManager.getQueueManager();
    // Exactly one queue is assumed: index 0 is used unconditionally.
    // NOTE(review): this would throw if no queues are configured — presumably
    // the JobQueueTaskScheduler guarantees at least one; confirm.
    String queueName = queueManager.getJobQueueInfos()[0].getQueueName();
    queueManager.setSchedulerInfo(queueName
        ,"Maximum Tasks Per Job :: " + String.valueOf(maxTasksPerJob));
  }

  /**
   * Reads {@link JTConfig#JT_RUNNINGTASKS_PER_JOB} (defaulting to
   * {@link Long#MAX_VALUE}, i.e. unlimited) and validates it.
   *
   * @param conf the scheduler configuration
   * @throws RuntimeException if the configured limit is zero or negative
   */
  @Override
  public synchronized void setConf(Configuration conf) {
    super.setConf(conf);
    maxTasksPerJob =
      conf.getLong(JTConfig.JT_RUNNINGTASKS_PER_JOB, Long.MAX_VALUE);
    if (maxTasksPerJob <= 0) {
      String msg = JTConfig.JT_RUNNINGTASKS_PER_JOB +
        " is set to zero or a negative value. Aborting.";
      LOG.fatal(msg);
      throw new RuntimeException (msg);
    }
  }

  /**
   * Assigns at most one new task to the given tracker (the labeled
   * {@code break scheduleTasks} below exits after the first successful
   * assignment), choosing a map or reduce task from the first runnable job
   * in the queue that can accept one.
   *
   * @param taskTracker the tracker requesting work
   * @return a list containing zero or one {@link Task}
   * @throws IOException if obtaining a task from a job fails
   */
  @Override
  public synchronized List<Task> assignTasks(TaskTracker taskTracker)
      throws IOException {
    TaskTrackerStatus taskTrackerStatus = taskTracker.getStatus();
    final int numTaskTrackers =
      taskTrackerManager.getClusterStatus().getTaskTrackers();
    Collection<JobInProgress> jobQueue =
      jobQueueJobInProgressListener.getJobQueue();
    Task task;

    /* Stats about the current taskTracker */
    final int mapTasksNumber = taskTrackerStatus.countMapTasks();
    final int reduceTasksNumber = taskTrackerStatus.countReduceTasks();
    final int maximumMapTasksNumber = taskTrackerStatus.getMaxMapSlots();
    final int maximumReduceTasksNumber = taskTrackerStatus.getMaxReduceSlots();

    /*
     * Statistics about the whole cluster. Most are approximate because of
     * concurrency: they are read without holding any cluster-wide lock, so
     * they may be slightly stale by the time they are used below.
     */
    final int[] maxMapAndReduceLoad = getMaxMapAndReduceLoad(
        maximumMapTasksNumber, maximumReduceTasksNumber);
    final int maximumMapLoad = maxMapAndReduceLoad[0];
    final int maximumReduceLoad = maxMapAndReduceLoad[1];

    final int beginAtStep;
    /*
     * When step == 0, this loop starts as many map tasks it can wrt
     * maxTasksPerJob
     * When step == 1, this loop starts as many reduce tasks it can wrt
     * maxTasksPerJob
     * When step == 2, this loop starts as many map tasks it can
     * When step == 3, this loop starts as many reduce tasks it can
     *
     * It may seem that we would improve this loop by queuing jobs we cannot
     * start in steps 0 and 1 because of maxTasksPerJob, and using that queue
     * in step 2 and 3.
     * A first thing to notice is that the time with the current algorithm is
     * logarithmic, because it is the sum of (p^k) for k from 1 to N, were
     * N is the number of jobs and p is the probability for a job to not exceed
     * limits The probability for the cache to be useful would be similar to
     * p^N, that is 1/(e^N), whereas its size and the time spent to manage it
     * would be in ln(N).
     * So it is not a good idea.
     *
     * NOTE(review): when a limit IS configured (beginAtStep == 0) the loop
     * still proceeds through steps 2 and 3, which do not re-check
     * maxTasksPerJob — so a job at its limit can still receive a task once
     * the limited passes found nothing. This makes the limit effectively
     * soft; confirm whether that is intended.
     */
    if (maxTasksPerJob != Long.MAX_VALUE) {
      beginAtStep = 0;
    }
    else {
      // No limit configured: skip the per-job-limited passes entirely.
      beginAtStep = 2;
    }
    List<Task> assignedTasks = new ArrayList<Task>();
    scheduleTasks:
    for (int step = beginAtStep; step <= 3; ++step) {
      /* If we reached the maximum load for this step, go to the next */
      if ((step == 0 || step == 2) && mapTasksNumber >= maximumMapLoad ||
          (step == 1 || step == 3) && reduceTasksNumber >= maximumReduceLoad) {
        continue;
      }
      /* For each job, start its tasks */
      synchronized (jobQueue) {
        for (JobInProgress job : jobQueue) {
          /* Ignore non running jobs */
          if (job.getStatus().getRunState() != JobStatus.RUNNING) {
            continue;
          }
          /* Check that we're not exceeding the global limits */
          // Only the first two steps enforce the per-job running-task cap.
          if ((step == 0 || step == 1)
              && (job.runningMaps() + job.runningReduces() >= maxTasksPerJob)) {
            continue;
          }
          // Even steps (0, 2) hand out map tasks; odd steps (1, 3) reduces.
          if (step == 0 || step == 2) {
            task = job.obtainNewMapTask(taskTrackerStatus, numTaskTrackers,
                taskTrackerManager.getNumberOfUniqueHosts());
          }
          else {
            task = job.obtainNewReduceTask(taskTrackerStatus, numTaskTrackers,
                taskTrackerManager.getNumberOfUniqueHosts());
          }
          if (task != null) {
            // One task per heartbeat: stop scheduling as soon as a job
            // yields a task.
            assignedTasks.add(task);
            break scheduleTasks;
          }
        }
      }
    }
    return assignedTasks;
  }

  /**
   * Determine the maximum number of maps or reduces that we are willing to run
   * on a taskTracker which accept a maximum of localMaxMapLoad maps and
   * localMaxReduceLoad reduces. The cluster-wide demand (desired minus
   * finished tasks of all running jobs) is spread evenly over the trackers,
   * capped by the tracker's own slot counts.
   *
   * @param localMaxMapLoad The local maximum number of map tasks for a host
   * @param localMaxReduceLoad The local maximum number of reduce tasks for a
   * host
   * @return An array of the two maximums: map then reduce. Both are 0 when
   * the cluster reports no task trackers.
   */
  protected synchronized int[] getMaxMapAndReduceLoad(int localMaxMapLoad,
      int localMaxReduceLoad) {
    // Approximate because of concurrency
    final int numTaskTrackers =
      taskTrackerManager.getClusterStatus().getTaskTrackers();
    /* Hold the result */
    int maxMapLoad = 0;
    int maxReduceLoad = 0;
    int neededMaps = 0;
    int neededReduces = 0;
    Collection<JobInProgress> jobQueue =
      jobQueueJobInProgressListener.getJobQueue();
    synchronized (jobQueue) {
      for (JobInProgress job : jobQueue) {
        if (job.getStatus().getRunState() == JobStatus.RUNNING) {
          // Remaining demand = tasks the job wants minus tasks already done.
          neededMaps += job.desiredMaps() - job.finishedMaps();
          neededReduces += job.desiredReduces() - job.finishedReduces();
        }
      }
    }
    if (numTaskTrackers > 0) {
      // Evenly divide remaining demand across trackers (rounded up), but
      // never exceed this tracker's configured slot capacity.
      maxMapLoad = Math.min(localMaxMapLoad, (int) Math
          .ceil((double) neededMaps / numTaskTrackers));
      maxReduceLoad = Math.min(localMaxReduceLoad, (int) Math
          .ceil((double) neededReduces / numTaskTrackers));
    }
    return new int[] { maxMapLoad, maxReduceLoad };
  }
}