blob: 490ba686598f0a7b5a6a91a87a6e9ea754c4ee48 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
import javax.xml.parsers.ParserConfigurationException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.xml.sax.SAXException;
/**
* Maintains a list of queues as well as scheduling parameters for each queue,
* such as guaranteed share allocations, from the fair scheduler config file.
*
*/
@Private
@Unstable
public class QueueManager {
// Logger for this class, looked up by the fully qualified class name.
public static final Log LOG = LogFactory.getLog(
QueueManager.class.getName());
// Name of the root queue; ensureRootPrefix() prepends it (plus ".") to any
// queue name that does not already start with it.
public static final String ROOT_QUEUE = "root";
// Scheduler handed to every queue created here; also the source of the
// allocation configuration and cluster resource used by this class.
private final FairScheduler scheduler;
// All leaf queues currently known; appended to in createQueue() and
// pruned in removeQueue().
private final Collection<FSLeafQueue> leafQueues =
new CopyOnWriteArrayList<FSLeafQueue>();
// Every known queue (leaf and parent), keyed by the value of getName().
// The lookup methods in this class synchronize on this map.
private final Map<String, FSQueue> queues = new HashMap<String, FSQueue>();
// Root of the queue tree; assigned in initialize(), null before that.
private FSParentQueue rootQueue;
/**
 * Creates a queue manager bound to the given scheduler. No queues exist
 * until {@link #initialize(Configuration)} is called.
 *
 * @param scheduler the scheduler passed to every queue this manager creates
 */
public QueueManager(FairScheduler scheduler) {
this.scheduler = scheduler;
}
/**
 * Returns the root of the queue tree.
 *
 * @return the root queue, or null if {@link #initialize(Configuration)} has
 * not been called yet
 */
public FSParentQueue getRootQueue() {
return rootQueue;
}
/**
 * Builds the initial queue tree: the root queue plus the default leaf queue
 * underneath it.
 *
 * @param conf configuration object; not read by this method
 */
public void initialize(Configuration conf) throws IOException,
    SAXException, AllocationConfigurationException, ParserConfigurationException {
  FSParentQueue root = new FSParentQueue(ROOT_QUEUE, scheduler, null);
  rootQueue = root;
  queues.put(root.getName(), root);
  // The default queue must exist from the start.
  getLeafQueue(YarnConfiguration.DEFAULT_QUEUE_NAME, true);
}
/**
 * Looks up a leaf queue by name, optionally creating it (and any missing
 * ancestors) when it does not exist yet.
 *
 * The leading "root." is optional: a queue directly under the root named
 * "queue1" may be given as "queue1", and a queue "queue2" under "parent1"
 * under the root may be given as "parent1.queue2".
 *
 * @param name queue name, with or without the root prefix
 * @param create whether to create the queue if it is missing
 * @return the leaf queue, or null if it does not exist and was not created,
 * if the name refers to an existing parent queue, or if an ancestor in the
 * name is already a leaf queue
 */
public FSLeafQueue getLeafQueue(String name, boolean create) {
  FSQueue found = getQueue(name, create, FSQueueType.LEAF);
  // A parent queue under this name means the request cannot be satisfied.
  return (found instanceof FSParentQueue) ? null : (FSLeafQueue) found;
}
/**
 * Looks up a parent queue by name, optionally creating it (and any missing
 * ancestors) when it does not exist yet.
 *
 * The leading "root." is optional: a queue directly under the root named
 * "queue1" may be given as "queue1", and a queue "queue2" under "parent1"
 * under the root may be given as "parent1.queue2".
 *
 * @param name queue name, with or without the root prefix
 * @param create whether to create the queue if it is missing
 * @return the parent queue, or null if it does not exist and was not
 * created, if the name refers to an existing leaf queue, or if an ancestor
 * in the name is already a leaf queue
 */
public FSParentQueue getParentQueue(String name, boolean create) {
  FSQueue found = getQueue(name, create, FSQueueType.PARENT);
  // A leaf queue under this name means the request cannot be satisfied.
  return (found instanceof FSLeafQueue) ? null : (FSParentQueue) found;
}
/**
 * Looks up a queue by name while holding the lock on the queue map,
 * creating it with the requested type when it is missing and creation is
 * allowed. Steady fair shares are recomputed from the root after a
 * successful creation.
 *
 * @param name queue name, with or without the root prefix
 * @param create whether to create the queue if it is missing
 * @param queueType type to create when the queue is missing
 * @return the existing or newly created queue, or null if it does not exist
 * and was not (or could not be) created
 */
private FSQueue getQueue(String name, boolean create, FSQueueType queueType) {
  final String fullName = ensureRootPrefix(name);
  synchronized (queues) {
    FSQueue existing = queues.get(fullName);
    if (existing != null || !create) {
      return existing;
    }
    // The queue is missing and the caller asked for it to be created.
    FSQueue created = createQueue(fullName, queueType);
    if (created != null) {
      // A new queue changes the steady fair share of every queue.
      rootQueue.recomputeSteadyShares();
    }
    return created;
  }
}
/**
 * Creates a leaf or parent queue based on what is specified in 'queueType'
 * and places it in the tree. Creates any parents that don't already exist.
 *
 * @param name fully qualified name of the queue to create
 * @param queueType type of the queue named by 'name'; any value other than
 * PARENT produces a leaf queue. Missing ancestors are always created as
 * parent queues.
 * @return
 * the created queue, if successful. null if not allowed (one of the parent
 * queues in the queue name is already a leaf queue), or if no existing
 * ancestor queue could be found to attach the new queue to
 */
private FSQueue createQueue(String name, FSQueueType queueType) {
  List<String> newQueueNames = new ArrayList<String>();
  newQueueNames.add(name);
  FSParentQueue parent = null;

  // Move up the queue tree until we reach an ancestor that exists. The
  // separator index is tested before it is used, so a name with no existing
  // ancestor can never reach substring() with a negative index.
  for (int sepIndex = name.lastIndexOf('.'); sepIndex != -1;
      sepIndex = name.lastIndexOf('.', sepIndex - 1)) {
    String curName = name.substring(0, sepIndex);
    FSQueue queue = queues.get(curName);
    if (queue == null) {
      newQueueNames.add(curName);
    } else if (queue instanceof FSParentQueue) {
      parent = (FSParentQueue) queue;
      break;
    } else {
      // An ancestor already exists as a non-parent queue.
      return null;
    }
  }

  if (parent == null) {
    // Nothing to attach the new queue to (e.g. the root queue is not
    // registered); report failure instead of dereferencing null below.
    LOG.warn("Cannot create queue " + name
        + ": no existing ancestor queue was found.");
    return null;
  }

  // At this point, parent refers to the deepest existing parent of the
  // queue to create.
  // Now that we know everything worked out, make all the queues
  // and add them to the map, outermost ancestor first.
  AllocationConfiguration queueConf = scheduler.getAllocationConfiguration();
  for (int i = newQueueNames.size() - 1; i >= 0; i--) {
    String queueName = newQueueNames.get(i);
    if (i == 0 && queueType != FSQueueType.PARENT) {
      FSLeafQueue leafQueue = new FSLeafQueue(queueName, scheduler, parent);
      applyDefaultPolicy(leafQueue, queueConf, "leaf");
      parent.addChildQueue(leafQueue);
      queues.put(leafQueue.getName(), leafQueue);
      leafQueues.add(leafQueue);
      return leafQueue;
    }
    FSParentQueue newParent = new FSParentQueue(queueName, scheduler, parent);
    applyDefaultPolicy(newParent, queueConf, "parent");
    parent.addChildQueue(newParent);
    queues.put(newParent.getName(), newParent);
    parent = newParent;
  }
  return parent;
}

/**
 * Sets the configured default scheduling policy on a newly created queue,
 * logging a warning (and leaving the queue otherwise untouched) on failure.
 */
private void applyDefaultPolicy(FSQueue queue,
    AllocationConfiguration queueConf, String kind) {
  try {
    queue.setPolicy(queueConf.getDefaultSchedulingPolicy());
  } catch (AllocationConfigurationException ex) {
    LOG.warn("Failed to set default scheduling policy "
        + queueConf.getDefaultSchedulingPolicy() + " on new " + kind
        + " queue.", ex);
  }
}
/**
 * Make way for the given queue if possible, by removing incompatible
 * queues with no apps in them. Incompatibility could be due to
 * (1) queueToCreate being currently a parent but needs to change to leaf
 * (2) queueToCreate being currently a leaf but needs to change to parent
 * (3) an existing leaf queue in the ancestry of queueToCreate.
 *
 * The root queue, and any queue with the default queue in its ancestry,
 * is always refused.
 *
 * @param queueToCreate name of the queue about to be created, with or
 * without the root prefix
 * @param queueType the type the queue is meant to have
 * @return true if we can create queueToCreate or it already exists.
 */
private boolean removeEmptyIncompatibleQueues(String queueToCreate,
    FSQueueType queueType) {
  queueToCreate = ensureRootPrefix(queueToCreate);

  // Refuse the root itself and anything nested below the default queue.
  final String belowDefaultQueue =
      ROOT_QUEUE + "." + YarnConfiguration.DEFAULT_QUEUE_NAME + ".";
  if (queueToCreate.equals(ROOT_QUEUE)
      || queueToCreate.startsWith(belowDefaultQueue)) {
    return false;
  }

  FSQueue existing = queues.get(queueToCreate);
  if (existing != null) {
    // The queue exists already: fine if it has the wanted type, otherwise
    // it has to go (which only works when it is empty).
    FSQueueType existingType = (existing instanceof FSLeafQueue)
        ? FSQueueType.LEAF : FSQueueType.PARENT;
    if (queueType == existingType) {
      return true;
    }
    return removeQueueIfEmpty(existing);
  }

  // The queue does not exist. Walk up its ancestry; the first existing leaf
  // queue found is in the way and must be removed.
  for (int sep = queueToCreate.lastIndexOf('.'); sep != -1;
      sep = queueToCreate.lastIndexOf('.', sep - 1)) {
    FSQueue ancestor = queues.get(queueToCreate.substring(0, sep));
    if (ancestor instanceof FSLeafQueue) {
      return removeQueueIfEmpty(ancestor);
    }
  }
  return true;
}
/**
 * Removes the queue, together with its descendants, provided none of them
 * holds any application.
 *
 * @param queue the queue to remove
 * @return true if removed, false otherwise
 */
private boolean removeQueueIfEmpty(FSQueue queue) {
  if (!isEmpty(queue)) {
    return false;
  }
  removeQueue(queue);
  return true;
}
/**
 * Remove a queue and all its descendants: each one is dropped from the
 * queue map, from the leaf-queue collection when it is a leaf, and from its
 * parent's child list.
 *
 * @param queue the queue to remove; {@code queue.getParent()} is
 * dereferenced unconditionally, so the queue must have a parent
 */
private void removeQueue(FSQueue queue) {
if (queue instanceof FSLeafQueue) {
leafQueues.remove(queue);
} else {
List<FSQueue> childQueues = queue.getChildQueues();
// Each recursive call detaches the child from its parent's child list (last
// statement of this method), which is what shrinks childQueues here.
// NOTE(review): this loop terminates only if getChildQueues() returns the
// live backing list rather than a copy -- confirm against FSQueue.
while (!childQueues.isEmpty()) {
removeQueue(childQueues.get(0));
}
}
queues.remove(queue.getName());
queue.getParent().getChildQueues().remove(queue);
}
/**
 * Returns true if there are no applications, running or not, in the given
 * queue or any of its descendants.
 *
 * @param queue the queue (leaf or parent) to inspect
 * @return true if neither the queue nor any descendant holds an application
 */
protected boolean isEmpty(FSQueue queue) {
  if (!(queue instanceof FSLeafQueue)) {
    // Not a leaf: empty only when every child is empty.
    for (FSQueue child : queue.getChildQueues()) {
      if (!isEmpty(child)) {
        return false;
      }
    }
    return true;
  }
  FSLeafQueue leaf = (FSLeafQueue) queue;
  if (queue.getNumRunnableApps() != 0) {
    return false;
  }
  return leaf.getNonRunnableAppSchedulables().isEmpty();
}
/**
 * Gets a queue by name.
 *
 * @param name queue name, with or without the root prefix
 * @return the queue registered under that name, or null if there is none
 */
public FSQueue getQueue(String name) {
  final String fullName = ensureRootPrefix(name);
  synchronized (queues) {
    FSQueue found = queues.get(fullName);
    return found;
  }
}
/**
 * Return whether a queue exists already.
 *
 * @param name queue name, with or without the root prefix
 * @return true if a queue is registered under that name
 */
public boolean exists(String name) {
  final String fullName = ensureRootPrefix(name);
  synchronized (queues) {
    boolean known = queues.containsKey(fullName);
    return known;
  }
}
/**
 * Get a collection of all leaf queues.
 *
 * @return the internal leaf-queue collection itself, not a copy
 */
public Collection<FSLeafQueue> getLeafQueues() {
  final Collection<FSLeafQueue> result;
  synchronized (queues) {
    result = leafQueues;
  }
  return result;
}
/**
 * Get a collection of all queues.
 *
 * The result is a snapshot taken while holding the lock on the queue map,
 * so callers can iterate it safely even if queues are created or removed
 * concurrently; later changes to the set of queues are not reflected in it.
 *
 * @return a new collection holding every queue known at the time of the call
 */
public Collection<FSQueue> getQueues() {
  synchronized (queues) {
    // HashMap.values() is a live, unsynchronized view whose iterators fail
    // fast on structural modification, so hand out a copy instead.
    return new ArrayList<FSQueue>(queues.values());
  }
}
/**
 * Returns the fully qualified form of a queue name: the name itself when it
 * is the root queue or already starts with the root prefix, otherwise the
 * name with the root queue name and a '.' prepended.
 *
 * @param name queue name, with or without the root prefix
 * @return the name including the root prefix
 */
private String ensureRootPrefix(String name) {
  boolean alreadyRooted =
      name.equals(ROOT_QUEUE) || name.startsWith(ROOT_QUEUE + ".");
  return alreadyRooted ? name : ROOT_QUEUE + "." + name;
}
/**
 * Reconciles the queue tree with a newly loaded allocation configuration:
 * creates every configured leaf and parent queue (after removing empty
 * queues that are in the way), then refreshes min/max share metrics and the
 * scheduling policy of every queue, and finally recomputes steady fair
 * shares from the root.
 *
 * Removal and creation of queues happen while holding the lock on the queue
 * map, the same lock the lookup methods of this class take, so concurrent
 * lookups never observe the map in the middle of a structural change.
 *
 * @param queueConf the allocation configuration to apply
 */
public void updateAllocationConfiguration(AllocationConfiguration queueConf) {
  final List<FSQueue> allQueues;
  synchronized (queues) {
    // Create leaf queues and the parent queues in a leaf's ancestry if they
    // do not exist. The lock is reentrant, so the nested getLeafQueue /
    // getParentQueue calls may take it again.
    for (String name : queueConf.getConfiguredQueues().get(FSQueueType.LEAF)) {
      if (removeEmptyIncompatibleQueues(name, FSQueueType.LEAF)) {
        getLeafQueue(name, true);
      }
    }
    // At this point all leaves and 'parents with at least one child' would
    // have been created. Now create parents with no configured leaf.
    for (String name : queueConf.getConfiguredQueues().get(
        FSQueueType.PARENT)) {
      if (removeEmptyIncompatibleQueues(name, FSQueueType.PARENT)) {
        getParentQueue(name, true);
      }
    }
    // Snapshot the queues so the per-queue work below neither iterates the
    // live map view nor runs while the lock is held.
    allQueues = new ArrayList<FSQueue>(queues.values());
  }

  for (FSQueue queue : allQueues) {
    // Update queue metrics
    FSQueueMetrics queueMetrics = queue.getMetrics();
    queueMetrics.setMinShare(queue.getMinShare());
    queueMetrics.setMaxShare(queue.getMaxShare());
    // Set scheduling policies
    try {
      SchedulingPolicy policy = queueConf.getSchedulingPolicy(queue.getName());
      policy.initialize(scheduler.getClusterResource());
      queue.setPolicy(policy);
    } catch (AllocationConfigurationException ex) {
      LOG.warn("Cannot apply configured scheduling policy to queue "
          + queue.getName(), ex);
    }
  }

  // Update steady fair shares for all queues
  rootQueue.recomputeSteadyShares();
}
}