blob: 68efc64eaa5b09434a4d4897d821c43190c98282 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapred;
import org.apache.hadoop.mapreduce.TaskType;
import java.util.Iterator;
import java.util.Map;
import java.util.HashMap;
import java.util.Set;
/**
* ********************************************************************
* Keeps track of scheduling information for queues.
* <p/>
* Maintains information specific to each kind of task, Map or Reduce
* (number of running tasks, pending tasks, etc.).
* <p/>
* This scheduling information is used to decide how to allocate
* tasks, redistribute capacity, etc.
* <p/>
* A TaskSchedulingContext object represents scheduling
* information for a particular kind of task (Map or Reduce).
* <p/>
* ********************************************************************
*/
public class TaskSchedulingContext {

  /**
   * The actual capacity, in slots, which depends on how many slots are
   * available in the cluster at any given time.
   */
  private int capacity = 0;

  /** Number of running tasks. */
  private int numRunningTasks = 0;

  /** Number of slots occupied by running tasks. */
  private int numSlotsOccupied = 0;

  /**
   * The actual capacity stretch, which depends on how many slots are
   * available in the cluster at any given time. Defaults to -1;
   * {@link #toString()} reports it only when it is non-negative.
   */
  private int maxCapacity = -1;

  /**
   * For each user, the number of slots occupied by that user's running
   * tasks, keyed by user name.
   */
  private Map<String, Integer> numSlotsOccupiedByUser =
      new HashMap<String, Integer>();

  /**
   * Resets the variables associated with tasks: the running-task count and
   * the occupied-slot count become zero, and every user currently present in
   * the per-user map has its slot count set to zero. Users are not removed
   * from the map.
   */
  void resetTaskVars() {
    setNumRunningTasks(0);
    setNumSlotsOccupied(0);
    // Overwrite values in place; the key set is left untouched.
    for (Map.Entry<String, Integer> entry
        : getNumSlotsOccupiedByUser().entrySet()) {
      entry.setValue(Integer.valueOf(0));
    }
  }

  /**
   * Returns the capacity of the queue as a number of slots.
   *
   * @return the capacity, in slots
   */
  int getCapacity() {
    return capacity;
  }

  /**
   * Mutator method for capacity.
   *
   * @param capacity the new capacity, in slots
   */
  void setCapacity(int capacity) {
    this.capacity = capacity;
  }

  /**
   * Returns a multi-line, human-readable summary of this context: capacity,
   * maximum capacity (only when non-negative), used capacity, running tasks
   * and, when any slot is occupied, the share held by each active user.
   *
   * @return information about the tasks
   */
  @Override
  public String toString() {
    int slotsInUse = getNumSlotsOccupied();
    // Guard against division by zero when no capacity has been assigned.
    float occupiedSlotsAsPercent =
        getCapacity() != 0
            ? ((float) slotsInUse * 100 / getCapacity()) : 0;

    StringBuilder sb = new StringBuilder();
    sb.append("Capacity: ").append(getCapacity()).append(" slots\n");
    if (getMaxCapacity() >= 0) {
      sb.append("Maximum capacity: ").append(getMaxCapacity())
          .append(" slots\n");
    }
    sb.append(
        String.format(
            "Used capacity: %d (%.1f%% of Capacity)\n",
            Integer.valueOf(slotsInUse),
            Float.valueOf(occupiedSlotsAsPercent)));
    sb.append(
        String.format(
            "Running tasks: %d\n",
            Integer.valueOf(getNumRunningTasks())));

    // include info on active users
    if (slotsInUse != 0) {
      sb.append("Active users:\n");
      for (Map.Entry<String, Integer> entry
          : getNumSlotsOccupiedByUser().entrySet()) {
        Integer slots = entry.getValue();
        if (slots == null || slots.intValue() <= 0) {
          // user has no tasks running
          continue;
        }
        int numSlotsOccupiedByThisUser = slots.intValue();
        float p = (float) numSlotsOccupiedByThisUser * 100 / slotsInUse;
        sb.append("User '").append(entry.getKey()).append("': ");
        sb.append(
            String.format(
                "%d (%.1f%% of used capacity)\n",
                Long.valueOf(numSlotsOccupiedByThisUser),
                Float.valueOf(p)));
      }
    }
    return sb.toString();
  }

  /**
   * @return the number of running tasks
   */
  int getNumRunningTasks() {
    return numRunningTasks;
  }

  /**
   * @param numRunningTasks the new number of running tasks
   */
  void setNumRunningTasks(int numRunningTasks) {
    this.numRunningTasks = numRunningTasks;
  }

  /**
   * @return the number of slots occupied by running tasks
   */
  int getNumSlotsOccupied() {
    return numSlotsOccupied;
  }

  /**
   * @param numSlotsOccupied the new number of slots occupied by running tasks
   */
  void setNumSlotsOccupied(int numSlotsOccupied) {
    this.numSlotsOccupied = numSlotsOccupied;
  }

  /**
   * Returns the per-user slot map. This is the backing map itself, not a
   * copy, so changes made through it are visible to this context.
   *
   * @return map from user name to the number of slots that user occupies
   */
  Map<String, Integer> getNumSlotsOccupiedByUser() {
    return numSlotsOccupiedByUser;
  }

  /**
   * Replaces the per-user slot map. The given map is stored as is, without
   * copying.
   *
   * @param numSlotsOccupiedByUser map from user name to occupied slots
   */
  void setNumSlotsOccupiedByUser(
      Map<String, Integer> numSlotsOccupiedByUser) {
    this.numSlotsOccupiedByUser = numSlotsOccupiedByUser;
  }

  /**
   * @return the maximum capacity, in slots
   */
  int getMaxCapacity() {
    return maxCapacity;
  }

  /**
   * @param maxCapacity the new maximum capacity, in slots
   */
  void setMaxCapacity(int maxCapacity) {
    this.maxCapacity = maxCapacity;
  }

  /**
   * Adds the statistics of another context to this one: occupied slots,
   * running tasks and the per-user slot counts. Capacity and maximum
   * capacity are not touched.
   *
   * @param tc the context whose statistics are added to this one
   */
  void update(TaskSchedulingContext tc) {
    this.numSlotsOccupied += tc.numSlotsOccupied;
    this.numRunningTasks += tc.numRunningTasks;
    updateNoOfSlotsOccupiedByUser(tc.numSlotsOccupiedByUser);
  }

  /**
   * Adds the given per-user slot counts to this context's per-user map.
   * <p/>
   * A user that is not yet present in this context (or is present with a
   * null count) starts from zero, so that the per-user counts stay
   * consistent with the total accumulated by
   * {@link #update(TaskSchedulingContext)}. Entries whose incoming count is
   * null are skipped, matching {@link #toString()}, which also tolerates
   * null counts.
   *
   * @param nou map from user name to the number of slots to add
   */
  private void updateNoOfSlotsOccupiedByUser(Map<String, Integer> nou) {
    for (Map.Entry<String, Integer> entry : nou.entrySet()) {
      Integer delta = entry.getValue();
      if (delta == null) {
        // nothing recorded for this user; unboxing null would throw
        continue;
      }
      String user = entry.getKey();
      Integer currentVal = numSlotsOccupiedByUser.get(user);
      int base = (currentVal == null) ? 0 : currentVal.intValue();
      numSlotsOccupiedByUser.put(
          user, Integer.valueOf(base + delta.intValue()));
    }
  }
}