blob: dceffa1b2deaa24f657d1717219ff2ac2ea05f57 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p/>
* http://www.apache.org/licenses/LICENSE-2.0
* <p/>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapred;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import java.util.List;
import java.util.Properties;
/**
* Hierarchy builder for the CapacityScheduler.
*
*/
class QueueHierarchyBuilder {
static final Log LOG = LogFactory.getLog(QueueHierarchyBuilder.class);
QueueHierarchyBuilder() {
}
/**
* Create a new {@link AbstractQueue}s-hierarchy and set the new queue
* properties in the passed {@link CapacitySchedulerConf}.
*
* @param rootChildren
* @param schedConf
* @return the root {@link AbstractQueue} of the newly created hierarchy.
*/
AbstractQueue createHierarchy(List<JobQueueInfo> rootChildren,
CapacitySchedulerConf schedConf) {
if (LOG.isDebugEnabled()) {
LOG.debug("Root queues defined : ");
for (JobQueueInfo q : rootChildren) {
LOG.debug(q.getQueueName());
}
}
// Create the root.
AbstractQueue newRootAbstractQueue = createRootAbstractQueue();
// Create the complete hierarchy rooted at newRootAbstractQueue
createHierarchy(newRootAbstractQueue, rootChildren, schedConf);
// Distribute any un-configured capacities
newRootAbstractQueue.distributeUnConfiguredCapacity();
return newRootAbstractQueue;
}
static final String TOTAL_CAPACITY_OVERFLOWN_MSG =
"The cumulative capacity for the queues (%s) at the same level "
+ "has overflown over 100%% at %f%%";
/**
* Recursively create a complete AbstractQueues-hierarchy. 'Parent' is the
* root of the hierarchy. 'Children' is the immediate children of the 'parent'
* and may in-turn be the parent of further child queues. Any JobQueueInfo
* which doesn't have any more children is used to create a JobQueue in the
* AbstractQueues-hierarchy and every other AbstractQueue is used to create a
* ContainerQueue.
*
* <p>
*
* While creating the hierarchy, we make sure at each level that the total
* capacity of all the children at that level doesn't cross 100%
*
* @param parent the queue that will be the root of the new hierarchy.
* @param children the immediate children of the 'parent'
* @param schedConfig Configuration object to which the new queue
* properties are set. The new queue properties are set with key
* names obtained by expanding the queue-names to reflect the whole
* hierarchy.
*/
private void createHierarchy(AbstractQueue parent,
List<JobQueueInfo> children, CapacitySchedulerConf schedConfig) {
//check if children have further childrens.
if (children != null && !children.isEmpty()) {
float totalCapacity = 0.0f;
for (JobQueueInfo qs : children) {
//Check if this child has any more children.
List<JobQueueInfo> childQueues = qs.getChildren();
if (childQueues != null && childQueues.size() > 0) {
//generate a new ContainerQueue and recursively
//create hierarchy.
AbstractQueue cq =
new ContainerQueue(parent, loadContext(qs.getProperties(),
qs.getQueueName(), schedConfig));
//update totalCapacity
totalCapacity += cq.qsc.getCapacityPercent();
LOG.info("Created a ContainerQueue " + qs.getQueueName()
+ " and added it as a child to " + parent.getName());
//create child hiearchy
createHierarchy(cq, childQueues, schedConfig);
} else {
//if not this is a JobQueue.
//create a JobQueue.
AbstractQueue jq =
new JobQueue(parent, loadContext(qs.getProperties(),
qs.getQueueName(), schedConfig));
totalCapacity += jq.qsc.getCapacityPercent();
LOG.info("Created a jobQueue " + qs.getQueueName()
+ " and added it as a child to " + parent.getName());
}
}
//check for totalCapacity at each level , the total for children
//shouldn't cross 100.
if (totalCapacity > 100.0) {
StringBuilder childQueueNames = new StringBuilder();
for (JobQueueInfo child : children) {
childQueueNames.append(child.getQueueName()).append(",");
}
throw new IllegalArgumentException(String.format(
TOTAL_CAPACITY_OVERFLOWN_MSG,
childQueueNames.toString().substring(0,
childQueueNames.toString().length() - 1),
Float.valueOf(totalCapacity)));
}
}
}
/**
* Create a new {@link QueueSchedulingContext} from the given props. Also set
* these properties in the passed scheduler configuration object.
*
* @param props Properties to be set in the {@link QueueSchedulingContext}
* @param queueName Queue name
* @param schedConf Scheduler configuration object to set the properties in.
* @return the generated {@link QueueSchedulingContext} object
*/
private QueueSchedulingContext loadContext(Properties props,
String queueName, CapacitySchedulerConf schedConf) {
schedConf.setProperties(queueName,props);
float capacity = schedConf.getCapacity(queueName);
float stretchCapacity = schedConf.getMaxCapacity(queueName);
if (capacity == -1.0) {
LOG.info("No capacity specified for queue " + queueName);
}
int ulMin = schedConf.getMinimumUserLimitPercent(queueName);
// create our QSC and add to our hashmap
QueueSchedulingContext qsi = new QueueSchedulingContext(
queueName, capacity, stretchCapacity, ulMin
);
qsi.setSupportsPriorities(
schedConf.isPrioritySupported(
queueName));
return qsi;
}
/**
* Create an {@link AbstractQueue} with an empty
* {@link QueueSchedulingContext}. Can be used to as the root queue to create
* {@link AbstractQueue} hierarchies.
*
* @return a root {@link AbstractQueue}
*/
static AbstractQueue createRootAbstractQueue() {
QueueSchedulingContext rootContext =
new QueueSchedulingContext("", 100, -1, -1);
AbstractQueue root = new ContainerQueue(null, rootContext);
return root;
}
}