/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapred;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Comparator;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.mapreduce.server.jobtracker.TaskTracker;
/**
* A {@link TaskScheduler} that
* provides the following features:
* (1) allows continuous enforcement of user controlled dynamic queue shares,
* (2) preempts tasks exceeding their queue shares instantaneously when new
* jobs arrive,
* (3) is work conserving,
* (4) tracks queue usage to only charge when jobs are pending or running,
* (5) authorizes queue submissions based on symmetric private key HMAC/SHA1
* signatures.
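*
* A rough configuration sketch (illustrative only; the scheduler class wired
* into mapred.jobtracker.taskScheduler may in practice be a wrapper that
* supplies this class with its {@link QueueAllocator}):
* <pre>
*   mapred.jobtracker.taskScheduler         = org.apache.hadoop.mapred.PriorityScheduler
*   mapred.priority-scheduler.kill-interval = 60    seconds between preemption rounds,
*                                                   0 (the default) disables preemption
*   mapred.priority-scheduler.sort-tasks    = true  kill least-progressed, most recently
*                                                   started task attempts first
* </pre>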
*/
class PriorityScheduler extends QueueTaskScheduler {
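/**
* Initializes a newly submitted job on a separate thread so that
* {@link JobListener#jobAdded} does not block while job setup runs.
*/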
private class InitThread extends Thread {
JobInProgress job;
InitThread(JobInProgress job) {
this.job = job;
}
@Override
public void run() {
taskTrackerManager.initJob(job);
}
}
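/**
* Listens for job lifecycle events. An added job is initialized
* asynchronously and authorized against its queue; unauthorized jobs are
* killed, authorized jobs are added to the global job queue and to their
* per-queue job list. A removed job is dropped from both.
*/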
private class JobListener extends JobInProgressListener {
@Override
public void jobAdded(JobInProgress job) {
new InitThread(job).start();
synchronized (PriorityScheduler.this) {
String queue = authorize(job);
if (queue.equals("")) {
job.kill();
return;
}
jobQueue.add(job);
QueueJobs jobs = queueJobs.get(queue);
if (jobs == null) {
jobs = new QueueJobs(queue);
queueJobs.put(queue,jobs);
}
jobs.jobs.add(job);
if (debug) {
LOG.debug("Add job " + job.getProfile().getJobID());
}
}
}
@Override
public void jobRemoved(JobInProgress job) {
synchronized (PriorityScheduler.this) {
jobQueue.remove(job);
String queue = getQueue(job);
QueueJobs jobs = queueJobs.get(queue);
// a job that failed authorization was never added to any queue
if (jobs != null) {
jobs.jobs.remove(job);
}
}
}
@Override
public void jobUpdated(JobChangeEvent event) {
}
}
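/**
* Orders task attempts for preemption: least progress first and, on equal
* progress, the most recently started attempt first, so the least work is
* lost when tasks are killed.
*/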
static final Comparator<TaskInProgress> TASK_COMPARATOR
= new Comparator<TaskInProgress>() {
public int compare(TaskInProgress o1, TaskInProgress o2) {
int res = 0;
if (o1.getProgress() < o2.getProgress()) {
res = -1;
} else {
res = (o1.getProgress() == o2.getProgress() ? 0 : 1);
}
if (res == 0) {
if (o1.getExecStartTime() > o2.getExecStartTime()) {
res = -1;
} else {
res = (o1.getExecStartTime() == o2.getExecStartTime() ? 0 : 1);
}
}
return res;
}
};
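/**
* Orders {@link KillQueue} entries by descending job start time, so queues
* whose running jobs started most recently are preempted first.
*/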
static final Comparator<KillQueue> QUEUE_COMPARATOR
= new Comparator<KillQueue>() {
public int compare(KillQueue o1, KillQueue o2) {
if (o1.startTime < o2.startTime) {
return 1;
}
if (o1.startTime > o2.startTime) {
return -1;
}
return 0;
}
};
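/**
* The list of jobs currently submitted to a named queue.
*/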
class QueueJobs {
String name;
LinkedList<JobInProgress> jobs = new LinkedList<JobInProgress>();
QueueJobs(String name) {
this.name = name;
}
}
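/**
* Per-queue bookkeeping for one scheduling pass: the remaining slot quota,
* the queue's total map/reduce slot share, and its running and pending
* task counts.
*/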
class QueueQuota {
int quota;
int map_used;
int reduce_used;
int map_pending;
int reduce_pending;
int mappers;
int reducers;
String name;
QueueQuota(String name) {
this.name = name;
}
}
private QueueAllocator allocator;
private static final Log LOG =
LogFactory.getLog(PriorityScheduler.class);
static final boolean MAP = true;
static final boolean REDUCE = false;
private static final boolean FILL = true;
private static final boolean NO_FILL = false;
private JobListener jobListener = new JobListener();
private static final boolean debug = LOG.isDebugEnabled();
private boolean sortTasks = true;
private long lastKill = 0;
private long killInterval = 0;
private PriorityAuthorization auth = new PriorityAuthorization();
private LinkedList<JobInProgress> jobQueue =
new LinkedList<JobInProgress>();
private HashMap<String,QueueJobs> queueJobs =
new HashMap<String,QueueJobs>();
@Override
public void start() throws IOException {
taskTrackerManager.addJobInProgressListener(jobListener);
sortTasks = conf.getBoolean("mapred.priority-scheduler.sort-tasks", true);
killInterval = conf.getLong("mapred.priority-scheduler.kill-interval", 0);
auth.init(conf);
}
@Override
public void terminate() throws IOException {
}
@Override
public void setAllocator(QueueAllocator allocator) {
this.allocator = allocator;
}
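/**
* Tries to give the job one task of the requested type on this tracker,
* preferring node-local map tasks before non-local ones. A task is only
* assigned if the queue has quota left, unless fill mode is on, in which
* case spare slots are handed out regardless of quota. Returns true and
* charges the queue's quota if a task was assigned.
*/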
private boolean assignMapRedTask(JobInProgress job,
TaskTrackerStatus taskTracker, int numTrackers, List<Task> assignedTasks,
Map<String,QueueQuota> queueQuota, boolean fill, boolean map)
throws IOException {
String queue = getQueue(job);
QueueQuota quota = queueQuota.get(queue);
if (quota == null) {
LOG.error("Queue " + queue + " not configured properly");
return false;
}
if (quota.quota < 1 && !fill) {
return false;
}
Task t = null;
if (map) {
t = job.obtainNewLocalMapTask(taskTracker, numTrackers,
taskTrackerManager.getNumberOfUniqueHosts());
if (t != null) {
if (debug) {
LOG.debug("assigned local task for job " + job.getProfile().getJobID() +
" " + taskType(map) );
}
assignedTasks.add(t);
// this branch is only reached for local map tasks
quota.map_used++;
quota.quota--;
return true;
}
t = job.obtainNewNonLocalMapTask(taskTracker, numTrackers,
taskTrackerManager.getNumberOfUniqueHosts());
} else {
t = job.obtainNewReduceTask(taskTracker, numTrackers,
taskTrackerManager.getNumberOfUniqueHosts());
}
if (t != null) {
if (debug) {
LOG.debug("assigned remote task for job " + job.getProfile().getJobID() +
" " + taskType(map));
}
assignedTasks.add(t);
if (map) {
quota.map_used++;
} else {
quota.reduce_used++;
}
quota.quota--;
return true;
}
return false;
}
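/**
* Builds the per-queue quota map for the current cluster capacity: each
* queue's share of the map or reduce slots as reported by the allocator,
* minus the tasks it already has running, together with its used and
* pending task counts.
*/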
Map<String,QueueQuota> getQueueQuota(int maxMapTasks, int maxReduceTasks,
boolean map) {
if (debug) {
LOG.debug("max map tasks " + Integer.toString(maxMapTasks) + " " +
taskType(map));
LOG.debug("max reduce tasks " + Integer.toString(maxReduceTasks) + " " +
taskType(map));
}
Map<String,QueueAllocation> shares = allocator.getAllocation();
Map<String,QueueQuota> quotaMap = new HashMap<String,QueueQuota>();
for (QueueAllocation share: shares.values()) {
QueueQuota quota = new QueueQuota(share.getName());
quota.mappers = Math.round(share.getShare() * maxMapTasks);
quota.reducers = Math.round(share.getShare() * maxReduceTasks);
quota.quota = (map) ? quota.mappers : quota.reducers;
if (debug) {
LOG.debug("queue " + quota.name + " initial quota " +
Integer.toString(quota.quota) + " " + taskType(map));
}
quota.map_used = 0;
quota.reduce_used = 0;
quota.map_pending = 0;
quota.reduce_pending = 0;
Collection<JobInProgress> jobs = getJobs(quota.name);
for (JobInProgress job : jobs) {
quota.map_pending += job.pendingMaps();
quota.reduce_pending += job.pendingReduces();
int running = (map) ? job.runningMapTasks : job.runningReduceTasks;
quota.quota -= running;
quota.map_used += job.runningMapTasks;
quota.reduce_used += job.runningReduceTasks;
}
if (debug) {
LOG.debug("queue " + quota.name + " quota " +
Integer.toString(quota.quota) + " " + taskType(map));
}
quotaMap.put(quota.name,quota);
}
return quotaMap;
}
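/**
* Makes up to availableSlots passes over the job queue, assigning at most
* one task per pass to the first runnable job that accepts one. With
* NO_FILL only queues with remaining quota are served; with FILL leftover
* slots are granted regardless of quota to keep the cluster busy.
*/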
private void scheduleJobs(int availableSlots, boolean map, boolean fill,
TaskTrackerStatus taskTracker, int numTrackers, List<Task> assignedTasks,
Map<String,QueueQuota> queueQuota) throws IOException {
for (int i = 0; i < availableSlots; i++) {
for (JobInProgress job : jobQueue) {
if ((job.getStatus().getRunState() != JobStatus.RUNNING) ||
(!map && job.numReduceTasks == 0)) {
continue;
}
if (assignMapRedTask(job, taskTracker, numTrackers, assignedTasks,
queueQuota, fill, map)) {
break;
}
}
}
}
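/**
* Returns the number of tasks that should be preempted: for each queue,
* the smaller of its pending tasks and its unused quota, summed over all
* queues.
*/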
private int countTasksToKill(Map<String,QueueQuota> queueQuota, boolean map) {
int killTasks = 0;
for (QueueQuota quota : queueQuota.values()) {
killTasks += Math.min((map) ? quota.map_pending : quota.reduce_pending,
Math.max(quota.quota,0));
}
return killTasks;
}
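/**
* Reports each queue's slot usage and pending work to the allocator so
* queues are only charged while they have running or pending tasks.
*/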
protected void markIdle(Map<String, QueueQuota> queueQuota) {
for (QueueQuota quota: queueQuota.values()) {
allocator.setUsage(quota.name, Math.min(quota.map_used, quota.mappers) +
Math.min(quota.reduce_used, quota.reducers),
(quota.map_pending + quota.reduce_pending));
}
}
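/**
* Assigns tasks of one type (map or reduce) to the given tracker: first a
* quota-respecting pass, then a fill pass for leftover slots, followed by
* usage reporting and, at most once per kill interval, preemption of tasks
* from queues exceeding their share.
*/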
private synchronized void assignMapRedTasks(List<Task> assignedTasks,
TaskTrackerStatus taskTracker, int numTrackers, boolean map)
throws IOException {
int taskOffset = assignedTasks.size();
int maxTasks = (map) ? taskTracker.getMaxMapSlots() :
taskTracker.getMaxReduceSlots();
int countTasks = (map) ? taskTracker.countMapTasks() :
taskTracker.countReduceTasks();
int availableSlots = maxTasks - countTasks;
int map_capacity = 0;
int reduce_capacity = 0;
ClusterStatus status = taskTrackerManager.getClusterStatus();
if (status != null) {
map_capacity = status.getMaxMapTasks();
reduce_capacity = status.getMaxReduceTasks();
}
Map<String,QueueQuota> queueQuota = getQueueQuota(map_capacity,
reduce_capacity,map);
if (debug) {
LOG.debug("available slots " + Integer.toString(availableSlots) + " " +
taskType(map));
LOG.debug("queue size " + Integer.toString(jobQueue.size()));
LOG.debug("map capacity " + Integer.toString(map_capacity) + " " +
taskType(map));
LOG.debug("reduce capacity " + Integer.toString(reduce_capacity) + " " +
taskType(map));
}
scheduleJobs(availableSlots, map, NO_FILL, taskTracker, numTrackers,
assignedTasks, queueQuota);
// subtract only the tasks assigned by the quota-respecting pass above
availableSlots -= assignedTasks.size() - taskOffset;
scheduleJobs(availableSlots, map, FILL, taskTracker, numTrackers,
assignedTasks, queueQuota);
if (map) {
markIdle(queueQuota);
}
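// Throttle preemption: kill tasks at most once every killInterval seconds.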
long currentTime = System.currentTimeMillis()/1000;
if ((killInterval > 0) && (currentTime - lastKill > killInterval)) {
lastKill = currentTime;
} else {
return;
}
int killTasks = countTasksToKill(queueQuota, map);
if (debug) {
LOG.debug("trying to kill " + Integer.toString(killTasks) + " tasks " +
taskType(map));
}
killMapRedTasks(killTasks, queueQuota, map);
}
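/**
* A queue that is running more tasks than its share, keyed by the start
* time of one of its running jobs to decide preemption order.
*/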
class KillQueue {
String name;
long startTime;
QueueQuota quota;
}
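/**
* Collects one entry per running job of every queue whose quota has gone
* negative, ordered so that the most recently started jobs are preempted
* first.
*/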
private Collection<KillQueue> getKillQueues(Map<String,
QueueQuota> queueQuota) {
TreeMap<KillQueue, KillQueue> killQueues =
new TreeMap<KillQueue, KillQueue>(QUEUE_COMPARATOR);
for (QueueJobs queueJob : queueJobs.values()) {
QueueQuota quota = queueQuota.get(queueJob.name);
if (quota == null || quota.quota >= 0) {
continue;
}
for (JobInProgress job : queueJob.jobs) {
if (job.getStatus().getRunState() == JobStatus.RUNNING) {
KillQueue killQueue = new KillQueue();
killQueue.name = queueJob.name;
killQueue.startTime = job.getStartTime();
killQueue.quota = quota;
killQueues.put(killQueue, killQueue);
}
}
}
return killQueues.values();
}
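/**
* Preempts up to killTasks tasks, drawing from over-quota queues in the
* order produced by getKillQueues and never taking more from a queue than
* the amount by which it exceeds its quota.
*/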
private void killMapRedTasks(int killTasks, Map<String,QueueQuota> queueQuota,
boolean map) {
int killed = 0;
// sort queues exceeding quota in reverse order of time since starting
// a running job
Collection<KillQueue> killQueues = getKillQueues(queueQuota);
for (KillQueue killQueue : killQueues) {
if (killed == killTasks) {
return;
}
QueueQuota quota = killQueue.quota;
// don't kill more than needed and not more than quota exceeded
int toKill = Math.min(killTasks-killed,-quota.quota);
killQueueTasks(quota.name, toKill, map);
killed += toKill;
}
}
private String taskType(boolean map) {
return (map) ? "MAP" : "REDUCE";
}
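/**
* Kills up to killTasks running task attempts from the given queue. When
* task sorting is enabled, attempts with the least progress and the latest
* start time are killed first; attempts that are committing are skipped.
*/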
private void killQueueTasks(String queue, int killTasks, boolean map) {
if (killTasks == 0) {
return;
}
if (debug) {
LOG.debug("trying to kill " + Integer.toString(killTasks) +
" tasks from queue " + queue + " " + taskType(map));
}
int killed = 0;
Collection<JobInProgress> jobs = getJobs(queue);
if (debug) {
LOG.debug("total jobs to kill from " + Integer.toString(jobs.size()) +
" " + taskType(map));
}
for (JobInProgress job : jobs) {
TaskInProgress tasks[] = (map) ? job.maps.clone() :
job.reduces.clone();
if (sortTasks) {
Arrays.sort(tasks, TASK_COMPARATOR);
}
if (debug) {
LOG.debug("total tasks to kill from " +
Integer.toString(tasks.length) + " " + taskType(map));
}
for (int i=0; i < tasks.length; i++) {
if (debug) {
LOG.debug("total active tasks to kill from " +
Integer.toString(tasks[i].getActiveTasks().keySet().size()) +
" " + taskType(map));
}
for (TaskAttemptID id: tasks[i].getActiveTasks().keySet()) {
if (tasks[i].isCommitPending(id)) {
continue;
}
tasks[i].killTask(id, false);
if (debug) {
LOG.debug("killed task " + id + " progress " +
Double.toString(tasks[i].getProgress()) +
" start time " + Long.toString(tasks[i].getExecStartTime()) +
" " + taskType(map));
}
killed += 1;
if (killed == killTasks) {
return;
}
}
}
}
}
@Override
public List<Task> assignTasks(TaskTracker taskTracker)
throws IOException {
long millis = 0;
if (debug) {
millis = System.currentTimeMillis();
}
ClusterStatus clusterStatus = taskTrackerManager.getClusterStatus();
int numTrackers = clusterStatus.getTaskTrackers();
List<Task> assignedTasks = new ArrayList<Task>();
assignMapRedTasks(assignedTasks, taskTracker.getStatus(), numTrackers, MAP);
assignMapRedTasks(assignedTasks, taskTracker.getStatus(), numTrackers, REDUCE);
if (debug) {
long elapsed = System.currentTimeMillis() - millis;
LOG.debug("assigned total tasks: " +
Integer.toString(assignedTasks.size()) + " in " +
Long.toString(elapsed) + " ms");
}
return assignedTasks;
}
@Override
public Collection<JobInProgress> getJobs(String queueName) {
QueueJobs jobs = queueJobs.get(queueName);
if (jobs == null) {
return new ArrayList<JobInProgress>();
}
return jobs.jobs;
}
private String getQueue(JobInProgress job) {
JobConf conf = job.getJobConf();
return conf.getQueueName();
}
private String getUser(JobInProgress job) {
JobConf conf = job.getJobConf();
return conf.getUser();
}
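/**
* Verifies that the submitting user matches the target queue and that the
* submission carries a valid HMAC signature over the user and timestamp.
* Returns the queue name on success and the empty string otherwise.
*/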
private String authorize(JobInProgress job) {
JobConf conf = job.getJobConf();
String user = conf.getUser();
String queue = conf.getQueueName();
if (user == null || !user.equals(queue)) {
return "";
}
String timestamp = conf.get("mapred.job.timestamp");
String signature = conf.get("mapred.job.signature");
int role = auth.authorize("&user=" + user + "&timestamp=" + timestamp,
signature, user, timestamp);
if (role != PriorityAuthorization.NO_ACCESS) {
return queue;
}
return "";
}
}