blob: cb060b4bbfa4a2b9315763066e52300871915c60 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapred;
import org.apache.hadoop.mapreduce.TaskType;
import java.util.Map;
import java.util.HashMap;
/**
* ********************************************************************
* Keeps track of scheduling information for queues.
* <p>
* We need to maintain scheduling information relevant to a queue (its
* name, capacity, etc.), along with information specific to
* each kind of task, Map or Reduce (number of running tasks, pending
* tasks, etc.).
* <p>
* This scheduling information is used to decide how to allocate
* tasks, redistribute capacity, etc.
* <p>
* A QueueSchedulingContext (qsc) object represents scheduling information
* for a queue.
* ********************************************************************
*/
public class QueueSchedulingContext {

  /** Name of this queue. */
  private String queueName;

  /**
   * Map capacity most recently passed to
   * {@link #updateContext(int, int)} as {@code mapClusterCapacity}.
   * The capacity percentages of this queue are applied to this value.
   */
  private int mapCapacity;

  /**
   * Reduce capacity most recently passed to
   * {@link #updateContext(int, int)} as {@code reduceClusterCapacity}.
   * The capacity percentages of this queue are applied to this value.
   */
  private int reduceCapacity;

  /**
   * capacity(%) is set in the config as
   * {@code mapred.capacity-scheduler.queue.<queue-name>.capacity}.
   * Percentage of the number of slots in the cluster that are
   * to be available for jobs in this queue.
   */
  private float capacityPercent = 0;

  /**
   * maxCapacityPercent(%) is set in the config as
   * {@code mapred.capacity-scheduler.queue.<queue-name>.maximum-capacity}.
   * It defines a limit beyond which a sub-queue cannot use the capacity
   * of its parent queue. Defaults to -1; {@link #updateContext(int, int)}
   * only applies it when it is greater than 0.
   */
  private float maxCapacityPercent = -1;

  /**
   * To handle user limits, we need to know how many users have jobs in
   * the queue. The number of entries in this map is what
   * {@link #toString()} reports as the number of users who have
   * submitted jobs.
   */
  private Map<String, Integer> numJobsByUser = new HashMap<String, Integer>();

  /** Min value of user limit (same for all users). */
  private int ulMin;

  /** Whether the queue supports priorities; default is false. */
  private boolean supportsPriorities = false;

  /** Number of waiting jobs. */
  private int numOfWaitingJobs = 0;

  /** Value of mapCapacity at the end of the previous updateContext call. */
  private int prevMapCapacity = 0;

  /** Value of reduceCapacity at the end of the previous updateContext call. */
  private int prevReduceCapacity = 0;

  /**
   * We keep a TaskSchedulingContext object for each kind of task we support.
   */
  private TaskSchedulingContext mapTSC;
  private TaskSchedulingContext reduceTSC;

  /**
   * Creates the scheduling context for a queue, with fresh map and reduce
   * task scheduling contexts.
   *
   * @param queueName name of the queue; must not be null
   * @param capacityPercent capacity of the queue, as a percentage
   * @param maxCapacityPercent maximum capacity of the queue, as a
   *        percentage; values that are not greater than 0 are ignored by
   *        {@link #updateContext(int, int)}
   * @param ulMin min value of user limit
   * @throws NullPointerException if queueName is null
   */
  QueueSchedulingContext(
      String queueName, float capacityPercent, float maxCapacityPercent,
      int ulMin) {
    // Fields are assigned directly rather than through the setters: the
    // setters are overridable and must not run on a partially built object.
    if (queueName == null) {
      throw new NullPointerException("queueName must not be null");
    }
    this.queueName = queueName;
    this.capacityPercent = capacityPercent;
    this.maxCapacityPercent = maxCapacityPercent;
    this.ulMin = ulMin;
    this.mapTSC = new TaskSchedulingContext(TaskType.MAP);
    this.reduceTSC = new TaskSchedulingContext(TaskType.REDUCE);
  }

  /**
   * return information about the queue
   *
   * @return a String representing the information about the queue.
   */
  @Override
  public String toString() {
    // We print out the queue information first, followed by info
    // on map and reduce tasks and job info
    final String separator = "-------------\n";
    StringBuilder sb = new StringBuilder();
    sb.append("Queue configuration\n");
    sb.append("Capacity Percentage: ");
    sb.append(getCapacityPercent());
    sb.append("%\n");
    sb.append(String.format("User Limit: %d%%\n", getUlMin()));
    sb.append(
        String.format(
            "Priority Supported: %s\n", supportsPriorities() ? "YES" : "NO"));
    sb.append(separator);
    sb.append("Map tasks\n");
    // append(Object) tolerates a null context instead of throwing.
    sb.append(getMapTSC());
    sb.append(separator);
    sb.append("Reduce tasks\n");
    sb.append(getReduceTSC());
    sb.append(separator);
    sb.append("Job info\n");
    sb.append(
        String.format(
            "Number of Waiting Jobs: %d\n", getNumOfWaitingJobs()));
    sb.append(
        String.format(
            "Number of users who have submitted jobs: %d\n",
            getNumJobsByUser().size()));
    return sb.toString();
  }

  /** @return the name of this queue */
  String getQueueName() {
    return queueName;
  }

  /** @param queueName the new name of this queue */
  void setQueueName(String queueName) {
    this.queueName = queueName;
  }

  /** @return the map capacity last stored for this queue */
  int getMapCapacity() {
    return mapCapacity;
  }

  /** @param mapCapacity the map capacity to store */
  void setMapCapacity(int mapCapacity) {
    this.mapCapacity = mapCapacity;
  }

  /** @return the reduce capacity last stored for this queue */
  int getReduceCapacity() {
    return reduceCapacity;
  }

  /** @param reduceCapacity the reduce capacity to store */
  void setReduceCapacity(int reduceCapacity) {
    this.reduceCapacity = reduceCapacity;
  }

  /** @return the capacity of this queue, as a percentage */
  float getCapacityPercent() {
    return capacityPercent;
  }

  /** @param capacityPercent the capacity of this queue, as a percentage */
  void setCapacityPercent(float capacityPercent) {
    this.capacityPercent = capacityPercent;
  }

  /**
   * @return the live (not copied) per-user job count map of this queue
   */
  Map<String, Integer> getNumJobsByUser() {
    return numJobsByUser;
  }

  /** @param numJobsByUser the per-user job count map to use from now on */
  void setNumJobsByUser(Map<String, Integer> numJobsByUser) {
    this.numJobsByUser = numJobsByUser;
  }

  /** @return the min value of user limit */
  int getUlMin() {
    return ulMin;
  }

  /** @param ulMin the min value of user limit */
  void setUlMin(int ulMin) {
    this.ulMin = ulMin;
  }

  /** @return the scheduling context for map tasks */
  TaskSchedulingContext getMapTSC() {
    return mapTSC;
  }

  /** @param mapTSC the scheduling context for map tasks */
  void setMapTSC(TaskSchedulingContext mapTSC) {
    this.mapTSC = mapTSC;
  }

  /** @return the scheduling context for reduce tasks */
  TaskSchedulingContext getReduceTSC() {
    return reduceTSC;
  }

  /** @param reduceTSC the scheduling context for reduce tasks */
  void setReduceTSC(TaskSchedulingContext reduceTSC) {
    this.reduceTSC = reduceTSC;
  }

  /** @return whether the queue supports priorities */
  boolean supportsPriorities() {
    return supportsPriorities;
  }

  /** @param supportsPriorities whether the queue supports priorities */
  void setSupportsPriorities(boolean supportsPriorities) {
    this.supportsPriorities = supportsPriorities;
  }

  /** @return the number of waiting jobs */
  int getNumOfWaitingJobs() {
    return numOfWaitingJobs;
  }

  /** @param numOfWaitingJobs the number of waiting jobs */
  void setNumOfWaitingJobs(int numOfWaitingJobs) {
    this.numOfWaitingJobs = numOfWaitingJobs;
  }

  /** @return the maximum capacity of this queue, as a percentage */
  float getMaxCapacityPercent() {
    return maxCapacityPercent;
  }

  /**
   * @param maxCapacityPercent the maximum capacity of this queue, as a
   *        percentage
   */
  void setMaxCapacityPercent(float maxCapacityPercent) {
    this.maxCapacityPercent = maxCapacityPercent;
  }

  /**
   * Stores the given cluster capacities, recomputes the map/reduce
   * capacities of this queue when the corresponding cluster capacity
   * differs from the one seen by the previous call, and resets the task
   * variables of both task scheduling contexts.
   * <p>
   * Note that capacities are recomputed only when the cluster capacity
   * changes; changing a percentage through its setter does not by itself
   * trigger a recomputation here.
   *
   * @param mapClusterCapacity map capacity to apply the percentages to
   * @param reduceClusterCapacity reduce capacity to apply the percentages to
   */
  void updateContext(int mapClusterCapacity, int reduceClusterCapacity) {
    setMapCapacity(mapClusterCapacity);
    setReduceCapacity(reduceClusterCapacity);

    // Recompute capacities only if the number of slots has changed since
    // the last call.
    if (getMapCapacity() != prevMapCapacity) {
      applyClusterCapacity(getMapTSC(), getMapCapacity());
    }
    if (getReduceCapacity() != prevReduceCapacity) {
      applyClusterCapacity(getReduceTSC(), getReduceCapacity());
    }

    // reset running/pending tasks, tasks per user
    getMapTSC().resetTaskVars();
    getReduceTSC().resetTaskVars();

    // remember the capacities this update was computed against
    prevMapCapacity = getMapCapacity();
    prevReduceCapacity = getReduceCapacity();
  }

  /**
   * Sets the capacity of the given task scheduling context to
   * capacityPercent of clusterCapacity and, when a max capacity percent
   * greater than 0 is configured, its max capacity likewise. Results are
   * truncated to int.
   */
  private void applyClusterCapacity(
      TaskSchedulingContext tsc, int clusterCapacity) {
    tsc.setCapacity((int) (getCapacityPercent() * clusterCapacity / 100));
    // Max capacity is only set when a max capacity percent is configured.
    if (getMaxCapacityPercent() > 0) {
      tsc.setMaxCapacity(
          (int) (getMaxCapacityPercent() * clusterCapacity / 100));
    }
  }
}