blob: ecaeae35ca8ef08f1c42473841cd80a77a37c8b0 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
import javax.xml.parsers.ParserConfigurationException;
import com.google.common.collect.ImmutableList;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.FifoPolicy;
import org.xml.sax.SAXException;
import com.google.common.base.CharMatcher;
import com.google.common.annotations.VisibleForTesting;
import java.util.Iterator;
import java.util.Set;
/**
 * Maintains a list of queues as well as scheduling parameters for each queue,
 * such as guaranteed share allocations, from the fair scheduler config file.
 *
 * <p>Queues are addressed by fully qualified, dot-separated names rooted at
 * {@link #ROOT_QUEUE}; callers may omit the "root." prefix. Mutations of the
 * queue map are serialized by synchronizing on the map itself.
 */
@Private
@Unstable
public class QueueManager {
  public static final Log LOG = LogFactory.getLog(
      QueueManager.class.getName());

  public static final String ROOT_QUEUE = "root";

  private final FairScheduler scheduler;

  // Every leaf queue currently in the tree. A CopyOnWriteArrayList, so the
  // collection handed out by getLeafQueues() can be iterated while queues are
  // being added or removed.
  private final Collection<FSLeafQueue> leafQueues =
      new CopyOnWriteArrayList<FSLeafQueue>();
  // Fully qualified queue name -> queue (both leaf and parent queues).
  // Intended to be accessed while holding this map's monitor.
  private final Map<String, FSQueue> queues = new HashMap<String, FSQueue>();
  private FSParentQueue rootQueue;

  public QueueManager(FairScheduler scheduler) {
    this.scheduler = scheduler;
  }

  /**
   * @return the root of the queue tree; null until {@link #initialize} runs
   */
  public FSParentQueue getRootQueue() {
    return rootQueue;
  }

  /**
   * Create the root queue and the default leaf queue, then reinitialize the
   * tree so queue properties propagate.
   *
   * @param conf the configuration (not read by this method)
   */
  public void initialize(Configuration conf) throws IOException,
      SAXException, AllocationConfigurationException, ParserConfigurationException {
    // Policies of root and default queue are set to
    // SchedulingPolicy.DEFAULT_POLICY since the allocation file hasn't been
    // loaded yet.
    rootQueue = new FSParentQueue(ROOT_QUEUE, scheduler, null);
    queues.put(rootQueue.getName(), rootQueue);

    // Create the default queue
    getLeafQueue(YarnConfiguration.DEFAULT_QUEUE_NAME, true);
    // Recursively reinitialize to propagate queue properties
    rootQueue.reinit(true);
  }

  /**
   * Get a leaf queue by name, creating it if the create param is true and is necessary.
   * If the queue is not or can not be a leaf queue, i.e. it already exists as a
   * parent queue, or one of the parents in its name is already a leaf queue,
   * null is returned.
   *
   * The root part of the name is optional, so a queue underneath the root
   * named "queue1" could be referred to as just "queue1", and a queue named
   * "queue2" underneath a parent named "parent1" that is underneath the root
   * could be referred to as just "parent1.queue2".
   */
  public FSLeafQueue getLeafQueue(String name, boolean create) {
    return getLeafQueue(name, create, true);
  }

  /**
   * Same as {@link #getLeafQueue(String, boolean)}, but lets the caller skip
   * the steady fair share recomputation that follows a queue creation.
   *
   * @param name the queue name; the "root." prefix is optional
   * @param create whether to create the queue (and missing ancestors)
   * @param recomputeSteadyShares whether to recompute steady shares from the
   *          root after attempting a creation
   * @return the leaf queue, or null if it is missing or not a leaf
   */
  public FSLeafQueue getLeafQueue(
      String name,
      boolean create,
      boolean recomputeSteadyShares) {
    FSQueue queue = getQueue(
        name,
        create,
        FSQueueType.LEAF,
        recomputeSteadyShares
    );
    if (queue instanceof FSParentQueue) {
      return null;
    }
    return (FSLeafQueue) queue;
  }

  /**
   * Remove a leaf queue if it exists and is empty.
   *
   * <p>Only the named queue is considered. If it does not exist, or exists as
   * a parent queue, nothing is removed and {@code false} is returned.
   *
   * @param name name of the queue; the "root." prefix is optional
   * @return true if the leaf queue was removed, false otherwise
   */
  public boolean removeLeafQueue(String name) {
    name = ensureRootPrefix(name);
    synchronized (queues) {
      FSQueue queue = queues.get(name);
      // Never touch parent queues or unrelated ancestors from here: this
      // method only removes the named leaf.
      if (!(queue instanceof FSLeafQueue)) {
        return false;
      }
      return removeQueueIfEmpty(queue);
    }
  }

  /**
   * Get a parent queue by name, creating it if the create param is true and is necessary.
   * If the queue is not or can not be a parent queue, i.e. it already exists as a
   * leaf queue, or one of the parents in its name is already a leaf queue,
   * null is returned.
   *
   * The root part of the name is optional, so a queue underneath the root
   * named "queue1" could be referred to as just "queue1", and a queue named
   * "queue2" underneath a parent named "parent1" that is underneath the root
   * could be referred to as just "parent1.queue2".
   */
  public FSParentQueue getParentQueue(String name, boolean create) {
    return getParentQueue(name, create, true);
  }

  /**
   * Same as {@link #getParentQueue(String, boolean)}, but lets the caller skip
   * the steady fair share recomputation that follows a queue creation.
   *
   * @param name the queue name; the "root." prefix is optional
   * @param create whether to create the queue (and missing ancestors)
   * @param recomputeSteadyShares whether to recompute steady shares from the
   *          root after attempting a creation
   * @return the parent queue, or null if it is missing or not a parent
   */
  public FSParentQueue getParentQueue(
      String name,
      boolean create,
      boolean recomputeSteadyShares) {
    FSQueue queue = getQueue(
        name,
        create,
        FSQueueType.PARENT,
        recomputeSteadyShares
    );
    if (queue instanceof FSLeafQueue) {
      return null;
    }
    return (FSParentQueue) queue;
  }

  /**
   * Look up a queue, optionally creating it with the given type.
   *
   * <p>Steady shares are recomputed (outside the lock) whenever a creation was
   * attempted. A failed creation can still leave newly created ancestors
   * behind (see {@link #createNewQueues}), so the recomputation is not limited
   * to fully successful creations.
   */
  private FSQueue getQueue(
      String name,
      boolean create,
      FSQueueType queueType,
      boolean recomputeSteadyShares) {
    boolean recompute = recomputeSteadyShares;
    name = ensureRootPrefix(name);
    FSQueue queue;
    synchronized (queues) {
      queue = queues.get(name);
      if (queue == null && create) {
        // if the queue doesn't exist, create it and return
        queue = createQueue(name, queueType);
      } else {
        recompute = false;
      }
    }
    if (recompute) {
      rootQueue.recomputeSteadyShares();
    }
    return queue;
  }

  /**
   * Create a leaf or parent queue based on what is specified in
   * {@code queueType} and place it in the tree. Create any parents that don't
   * already exist.
   *
   * @return the created queue, if successful or null if not allowed (one of the
   * parent queues in the queue name is already a leaf queue)
   * @throws InvalidQueueNameException if a component of the name that still
   *           has to be created is empty or has surrounding whitespace
   */
  @VisibleForTesting
  FSQueue createQueue(String name, FSQueueType queueType) {
    List<String> newQueueNames = new ArrayList<>();
    FSParentQueue parent = buildNewQueueList(name, newQueueNames);
    FSQueue queue = null;

    if (parent != null) {
      // The deepest existing ancestor is a parent queue, so the missing
      // queues can be created beneath it and added to the map.
      queue = createNewQueues(queueType, parent, newQueueNames);
    }

    return queue;
  }

  /**
   * Compile a list of all parent queues of the given queue name that do not
   * already exist. The queue names will be added to the {@code newQueueNames}
   * list. The list will be in order of increasing queue depth. The first
   * element of the list will be the parent closest to the root. The last
   * element added will be the queue to be created. This method returns the
   * deepest parent that does exist.
   *
   * @param name the fully qualified name of the queue to create
   * @param newQueueNames the list to which to add non-existent queues
   * @return the deepest existing parent queue, or null if the deepest
   *         existing ancestor is a leaf queue
   * @throws InvalidQueueNameException if a name component examined on the way
   *           up is empty or has surrounding whitespace
   */
  private FSParentQueue buildNewQueueList(String name,
      List<String> newQueueNames) {
    newQueueNames.add(name);
    int sepIndex = name.length();
    FSParentQueue parent = null;

    // Move up the queue tree until we reach one that exists.
    while (sepIndex != -1) {
      int prevSepIndex = sepIndex;
      sepIndex = name.lastIndexOf('.', sepIndex-1);
      String node = name.substring(sepIndex+1, prevSepIndex);
      if (!isQueueNameValid(node)) {
        throw new InvalidQueueNameException("Illegal node name at offset " +
            (sepIndex+1) + " for queue name " + name);
      }

      String curName = name.substring(0, sepIndex);
      FSQueue queue = queues.get(curName);

      if (queue == null) {
        newQueueNames.add(0, curName);
      } else {
        if (queue instanceof FSParentQueue) {
          parent = (FSParentQueue)queue;
        }

        // If the queue isn't a parent queue, parent will still be null when
        // we break
        break;
      }
    }

    return parent;
  }

  /**
   * Create all queues in the {@code newQueueNames} list. The list must be in
   * order of increasing depth. All but the last element in the list will be
   * created as parent queues. The last element will be created as the type
   * specified by the {@code queueType} parameter. The first queue will be
   * created as a child of the {@code topParent} queue. All subsequent queues
   * will be created as a child of the previously created queue.
   *
   * <p>NOTE(review): the policy checks run per level, after the shallower
   * levels have already been added to the tree. When a check fails and null
   * is returned, the queues created in earlier iterations remain in place.
   *
   * @param queueType the type of the last queue to create
   * @param topParent the parent of the first queue to create
   * @param newQueueNames the list of queues to create
   * @return the last queue created, or null if a scheduling policy check
   *         rejected one of the queues
   */
  private FSQueue createNewQueues(FSQueueType queueType,
      FSParentQueue topParent, List<String> newQueueNames) {
    // Use a single allocation configuration for the whole creation so the
    // policy checks and the resource limits agree with each other.
    AllocationConfiguration queueConf = scheduler.getAllocationConfiguration();
    Iterator<String> i = newQueueNames.iterator();
    FSParentQueue parent = topParent;
    FSQueue queue = null;

    while (i.hasNext()) {
      FSParentQueue newParent = null;
      String queueName = i.next();

      // Check if child policy is allowed
      SchedulingPolicy childPolicy = queueConf.getSchedulingPolicy(queueName);
      if (!parent.getPolicy().isChildPolicyAllowed(childPolicy)) {
        LOG.error("Can't create queue '" + queueName + "'.");
        return null;
      }

      // Only create a leaf queue at the very end
      if (!i.hasNext() && (queueType != FSQueueType.PARENT)) {
        FSLeafQueue leafQueue = new FSLeafQueue(queueName, scheduler, parent);
        leafQueues.add(leafQueue);
        queue = leafQueue;
      } else {
        if (childPolicy instanceof FifoPolicy) {
          LOG.error("Can't create queue '" + queueName + "', since "
              + FifoPolicy.NAME + " is only for leaf queues.");
          return null;
        }
        newParent = new FSParentQueue(queueName, scheduler, parent);
        queue = newParent;
      }

      parent.addChildQueue(queue);
      setChildResourceLimits(parent, queue, queueConf);
      queues.put(queue.getName(), queue);

      // If we just created a leaf node, the newParent is null, but that's OK
      // because we only create a leaf node in the very last iteration.
      parent = newParent;
    }

    return queue;
  }

  /**
   * For the given child queue, set the max resources based on the
   * parent queue's default child resource settings. This method assumes that
   * the child queue is ad hoc and hence does not do any safety checks around
   * overwriting existing max resource settings.
   *
   * <p>Queues listed in the allocation configuration (as leaf or parent) are
   * left untouched.
   *
   * @param parent the parent queue
   * @param child the child queue
   * @param queueConf the {@link AllocationConfiguration}
   */
  void setChildResourceLimits(FSParentQueue parent, FSQueue child,
      AllocationConfiguration queueConf) {
    Map<FSQueueType, Set<String>> configuredQueues =
        queueConf.getConfiguredQueues();

    // Ad hoc queues do not exist in the configured queues map
    if (!configuredQueues.get(FSQueueType.LEAF).contains(child.getName()) &&
        !configuredQueues.get(FSQueueType.PARENT).contains(child.getName())) {
      // For ad hoc queues, set their max resource allocations based on
      // their parents' default child settings.
      ConfigurableResource maxChild = parent.getMaxChildQueueResource();

      if (maxChild != null) {
        child.setMaxShare(maxChild);
      }
    }
  }

  /**
   * Make way for the given queue if possible, by removing incompatible
   * queues with no apps in them. Incompatibility could be due to
   * (1) queueToCreate being currently a parent but needs to change to leaf
   * (2) queueToCreate being currently a leaf but needs to change to parent
   * (3) an existing leaf queue in the ancestry of queueToCreate.
   *
   * The root queue is never removed, and nothing is made room for beneath the
   * default queue (so the default queue is never removed to make way for a
   * descendant). The default queue itself can still be removed, if it is
   * empty, when it is requested as a parent queue.
   *
   * This method reads the queue map without locking; callers are expected to
   * hold the lock on it.
   *
   * @return true if we can create queueToCreate or it already exists.
   */
  private boolean removeEmptyIncompatibleQueues(String queueToCreate,
      FSQueueType queueType) {
    queueToCreate = ensureRootPrefix(queueToCreate);

    // Ensure queueToCreate is not root and doesn't have the default queue in its
    // ancestry.
    if (queueToCreate.equals(ROOT_QUEUE) ||
        queueToCreate.startsWith(
            ROOT_QUEUE + "." + YarnConfiguration.DEFAULT_QUEUE_NAME + ".")) {
      return false;
    }

    FSQueue queue = queues.get(queueToCreate);
    // Queue exists already.
    if (queue != null) {
      if (queue instanceof FSLeafQueue) {
        if (queueType == FSQueueType.LEAF) {
          // if queue is already a leaf then return true
          return true;
        }
        // remove incompatibility since queue is a leaf currently
        // needs to change to a parent.
        return removeQueueIfEmpty(queue);
      } else {
        if (queueType == FSQueueType.PARENT) {
          return true;
        }
        // If it's an existing parent queue and needs to change to leaf,
        // remove it if it's empty.
        return removeQueueIfEmpty(queue);
      }
    }

    // Queue doesn't exist already. Check if the new queue would be created
    // under an existing leaf queue. If so, try removing that leaf queue.
    // Ancestors are checked from the deepest one upwards.
    int sepIndex = queueToCreate.lastIndexOf('.');
    while (sepIndex != -1) {
      FSQueue prefixQueue = queues.get(queueToCreate.substring(0, sepIndex));
      if (prefixQueue instanceof FSLeafQueue) {
        return removeQueueIfEmpty(prefixQueue);
      }
      sepIndex = queueToCreate.lastIndexOf('.', sepIndex - 1);
    }
    return true;
  }

  /**
   * Remove the queue if it and its descendants are all empty.
   * @param queue the queue to remove; must not be the root queue
   * @return true if removed, false otherwise
   */
  private boolean removeQueueIfEmpty(FSQueue queue) {
    if (isEmpty(queue)) {
      removeQueue(queue);
      return true;
    }
    return false;
  }

  /**
   * Remove a queue and all its descendants from the tree, the queue map and
   * (for leaves) the leaf queue list. The queue must have a parent.
   */
  private void removeQueue(FSQueue queue) {
    synchronized (queues) {
      if (queue instanceof FSLeafQueue) {
        leafQueues.remove(queue);
      } else {
        // Iterate over a snapshot: each recursive call detaches the child from
        // this queue via removeChildQueue, which must not disturb the
        // iteration in case getChildQueues() returns the live child list.
        List<FSQueue> children = new ArrayList<>(queue.getChildQueues());
        for (FSQueue childQueue : children) {
          removeQueue(childQueue);
        }
      }
      queues.remove(queue.getName());
      FSParentQueue parent = queue.getParent();
      parent.removeChildQueue(queue);
    }
  }

  /**
   * Returns true if there are no applications, running or not, in the given
   * queue or any of its descendants.
   */
  protected boolean isEmpty(FSQueue queue) {
    if (queue instanceof FSLeafQueue) {
      FSLeafQueue leafQueue = (FSLeafQueue)queue;
      return queue.getNumRunnableApps() == 0 &&
          leafQueue.getNumNonRunnableApps() == 0;
    } else {
      for (FSQueue child : queue.getChildQueues()) {
        if (!isEmpty(child)) {
          return false;
        }
      }
      return true;
    }
  }

  /**
   * Gets a queue by name. The "root." prefix is optional.
   *
   * @return the queue, or null if it does not exist
   */
  public FSQueue getQueue(String name) {
    name = ensureRootPrefix(name);
    synchronized (queues) {
      return queues.get(name);
    }
  }

  /**
   * Return whether a queue exists already. The "root." prefix is optional.
   */
  public boolean exists(String name) {
    name = ensureRootPrefix(name);
    synchronized (queues) {
      return queues.containsKey(name);
    }
  }

  /**
   * Get a collection of all leaf queues.
   *
   * <p>This is the manager's own internal collection, not a copy.
   */
  public Collection<FSLeafQueue> getLeafQueues() {
    synchronized (queues) {
      return leafQueues;
    }
  }

  /**
   * Get an immutable snapshot of all queues (leaf and parent).
   */
  public Collection<FSQueue> getQueues() {
    synchronized (queues) {
      return ImmutableList.copyOf(queues.values());
    }
  }

  /**
   * Prefix the name with "root." unless it already is the root queue or
   * starts with "root.".
   */
  private String ensureRootPrefix(String name) {
    if (!name.startsWith(ROOT_QUEUE + ".") && !name.equals(ROOT_QUEUE)) {
      name = ROOT_QUEUE + "." + name;
    }
    return name;
  }

  /**
   * Apply a newly loaded allocation configuration. Scheduling policies of
   * existing queues are verified and set first. Then every configured leaf
   * queue and every configured parent queue is created, after empty queues
   * that conflict with it have been removed. Finally the tree is reinitialized
   * and steady fair shares are recomputed.
   *
   * @param queueConf the allocation configuration to apply
   */
  public void updateAllocationConfiguration(AllocationConfiguration queueConf) {
    // Create leaf queues and the parent queues in a leaf's ancestry if they do not exist
    synchronized (queues) {
      // Verify and set scheduling policies for existing queues before creating
      // any queue, since we need parent policies to determine if we can create
      // its children.
      if (!rootQueue.verifyAndSetPolicyFromConf(queueConf)) {
        LOG.error("Setting scheduling policies for existing queues failed!");
      }

      for (String name : queueConf.getConfiguredQueues().get(
              FSQueueType.LEAF)) {
        if (removeEmptyIncompatibleQueues(name, FSQueueType.LEAF)) {
          getLeafQueue(name, true, false);
        }
      }

      // At this point all leaves and 'parents with
      // at least one child' would have been created.
      // Now create parents with no configured leaf.
      for (String name : queueConf.getConfiguredQueues().get(
          FSQueueType.PARENT)) {
        if (removeEmptyIncompatibleQueues(name, FSQueueType.PARENT)) {
          getParentQueue(name, true, false);
        }
      }
    }

    // Initialize all queues recursively
    rootQueue.reinit(true);
    // Update steady fair shares for all queues
    rootQueue.recomputeSteadyShares();
  }

  /**
   * Check whether a single queue name component is valid: it must be
   * non-empty and have no leading or trailing whitespace.
   *
   * @return true if it is valid, otherwise false
   */
  @VisibleForTesting
  boolean isQueueNameValid(String node) {
    // Use the same whitespace trim as in QueueMetrics(), otherwise things fail:
    // Guava uses a different definition of whitespace than Java.
    return !node.isEmpty() &&
        node.equals(CharMatcher.WHITESPACE.trimFrom(node));
  }
}