| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair; |
| |
| import java.util.ArrayList; |
| import java.util.Collection; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.Iterator; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Optional; |
| import java.util.Set; |
| import java.util.concurrent.CopyOnWriteArrayList; |
| |
| import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableList; |
| import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableSet; |
| import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting; |
| |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| import org.apache.hadoop.classification.InterfaceAudience.Private; |
| import org.apache.hadoop.classification.InterfaceStability.Unstable; |
| import org.apache.hadoop.yarn.api.records.ApplicationId; |
| import org.apache.hadoop.yarn.conf.YarnConfiguration; |
| import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.FifoPolicy; |
| |
| /** |
| * Maintains a list of queues as well as scheduling parameters for each queue, |
| * such as guaranteed share allocations, from the fair scheduler config file. |
| */ |
| @Private |
| @Unstable |
| public class QueueManager { |
| private static final Logger LOG = |
| LoggerFactory.getLogger(QueueManager.class.getName()); |
| |
| private final class IncompatibleQueueRemovalTask { |
| |
| private final String queueToCreate; |
| private final FSQueueType queueType; |
| |
| private IncompatibleQueueRemovalTask(String queueToCreate, |
| FSQueueType queueType) { |
| this.queueToCreate = queueToCreate; |
| this.queueType = queueType; |
| } |
| |
| private void execute() { |
| Boolean removed = |
| removeEmptyIncompatibleQueues(queueToCreate, queueType).orElse(null); |
| if (Boolean.TRUE.equals(removed)) { |
| FSQueue queue = getQueue(queueToCreate, true, queueType, false, null); |
| if (queue != null && |
| // if queueToCreate is present in the allocation config, set it |
| // to static |
| scheduler.allocConf.configuredQueues.values().stream() |
| .anyMatch(s -> s.contains(queueToCreate))) { |
| queue.setDynamic(false); |
| } |
| } |
| if (!Boolean.FALSE.equals(removed)) { |
| incompatibleQueuesPendingRemoval.remove(this); |
| } |
| } |
| } |
| |
| public static final String ROOT_QUEUE = "root"; |
| |
| private final FairScheduler scheduler; |
| |
| private final Collection<FSLeafQueue> leafQueues = |
| new CopyOnWriteArrayList<>(); |
| private final Map<String, FSQueue> queues = new HashMap<>(); |
| private Set<IncompatibleQueueRemovalTask> incompatibleQueuesPendingRemoval = |
| new HashSet<>(); |
| private FSParentQueue rootQueue; |
| |
| public QueueManager(FairScheduler scheduler) { |
| this.scheduler = scheduler; |
| } |
| |
| public FSParentQueue getRootQueue() { |
| return rootQueue; |
| } |
| |
| public void initialize() { |
| // Policies of root and default queue are set to |
| // SchedulingPolicy.DEFAULT_POLICY since the allocation file hasn't been |
| // loaded yet. |
| rootQueue = new FSParentQueue("root", scheduler, null); |
| rootQueue.setDynamic(false); |
| queues.put(rootQueue.getName(), rootQueue); |
| |
| // Create the default queue |
| FSLeafQueue defaultQueue = |
| getLeafQueue(YarnConfiguration.DEFAULT_QUEUE_NAME, true); |
| defaultQueue.setDynamic(false); |
| // Recursively reinitialize to propagate queue properties |
| rootQueue.reinit(true); |
| } |
| |
| /** |
| * Get a leaf queue by name, creating it if the create param is |
| * <code>true</code> and the queue does not exist. |
| * If the queue is not or can not be a leaf queue, i.e. it already exists as |
| * a parent queue, or one of the parents in its name is already a leaf queue, |
| * <code>null</code> is returned. |
| * |
| * The root part of the name is optional, so a queue underneath the root |
| * named "queue1" could be referred to as just "queue1", and a queue named |
| * "queue2" underneath a parent named "parent1" that is underneath the root |
| * could be referred to as just "parent1.queue2". |
| * @param name name of the queue |
| * @param create <code>true</code> if the queue must be created if it does |
| * not exist, <code>false</code> otherwise |
| * @return the leaf queue or <code>null</code> if the queue cannot be found |
| */ |
| public FSLeafQueue getLeafQueue(String name, boolean create) { |
| return getLeafQueue(name, create, null, true); |
| } |
| |
| /** |
| * Get a leaf queue by name, creating it if the create param is |
| * <code>true</code> and the queue does not exist. |
| * If the queue is not or can not be a leaf queue, i.e. it already exists as |
| * a parent queue, or one of the parents in its name is already a leaf queue, |
| * <code>null</code> is returned. |
| * |
| * If the application will be assigned to the queue if the applicationId is |
| * not <code>null</code> |
| * @param name name of the queue |
| * @param create <code>true</code> if the queue must be created if it does |
| * not exist, <code>false</code> otherwise |
| * @param applicationId the application ID to assign to the queue |
| * @return the leaf queue or <code>null</code> if teh queue cannot be found |
| */ |
| public FSLeafQueue getLeafQueue(String name, boolean create, |
| ApplicationId applicationId) { |
| return getLeafQueue(name, create, applicationId, true); |
| } |
| |
| private FSLeafQueue getLeafQueue(String name, boolean create, |
| ApplicationId applicationId, |
| boolean recomputeSteadyShares) { |
| FSQueue queue = getQueue(name, create, FSQueueType.LEAF, |
| recomputeSteadyShares, applicationId); |
| if (queue instanceof FSParentQueue) { |
| return null; |
| } |
| return (FSLeafQueue) queue; |
| } |
| |
| /** |
| * Remove a leaf queue if empty. |
| * @param name name of the queue |
| * @return true if queue was removed or false otherwise |
| */ |
| public boolean removeLeafQueue(String name) { |
| name = ensureRootPrefix(name); |
| return !Boolean.FALSE.equals( |
| removeEmptyIncompatibleQueues(name, FSQueueType.PARENT).orElse(null)); |
| } |
| |
| |
| /** |
| * Get a parent queue by name, creating it if the create param is |
| * <code>true</code> and the queue does not exist. |
| * If the queue is not or can not be a parent queue, i.e. it already exists |
| * as a leaf queue, or one of the parents in its name is already a leaf |
| * queue, <code>null</code> is returned. |
| * |
| * The root part of the name is optional, so a queue underneath the root |
| * named "queue1" could be referred to as just "queue1", and a queue named |
| * "queue2" underneath a parent named "parent1" that is underneath the root |
| * could be referred to as just "parent1.queue2". |
| * @param name name of the queue |
| * @param create <code>true</code> if the queue must be created if it does |
| * not exist, <code>false</code> otherwise |
| * @return the parent queue or <code>null</code> if the queue cannot be found |
| */ |
| public FSParentQueue getParentQueue(String name, boolean create) { |
| return getParentQueue(name, create, true); |
| } |
| |
| /** |
| * Get a parent queue by name, creating it if the create param is |
| * <code>true</code> and the queue does not exist. |
| * If the queue is not or can not be a parent queue, i.e. it already exists |
| * as a leaf queue, or one of the parents in its name is already a leaf |
| * queue, <code>null</code> is returned. |
| * |
| * The root part of the name is optional, so a queue underneath the root |
| * named "queue1" could be referred to as just "queue1", and a queue named |
| * "queue2" underneath a parent named "parent1" that is underneath the root |
| * could be referred to as just "parent1.queue2". |
| * @param name name of the queue |
| * @param create <code>true</code> if the queue must be created if it does |
| * not exist, <code>false</code> otherwise |
| * @param recomputeSteadyShares <code>true</code> if the steady fair share |
| * should be recalculated when a queue is added, |
| * <code>false</code> otherwise |
| * @return the parent queue or <code>null</code> if the queue cannot be found |
| */ |
| public FSParentQueue getParentQueue(String name, boolean create, |
| boolean recomputeSteadyShares) { |
| FSQueue queue = getQueue(name, create, FSQueueType.PARENT, |
| recomputeSteadyShares, null); |
| if (queue instanceof FSLeafQueue) { |
| return null; |
| } |
| return (FSParentQueue) queue; |
| } |
| |
| private FSQueue getQueue(String name, boolean create, FSQueueType queueType, |
| boolean recomputeSteadyShares, ApplicationId applicationId) { |
| boolean recompute = recomputeSteadyShares; |
| name = ensureRootPrefix(name); |
| FSQueue queue; |
| synchronized (queues) { |
| queue = queues.get(name); |
| if (queue == null && create) { |
| // if the queue doesn't exist,create it and return |
| queue = createQueue(name, queueType); |
| } else { |
| recompute = false; |
| } |
| // At this point the queue exists and we need to assign the app if to the |
| // but only to a leaf queue |
| if (applicationId != null && queue instanceof FSLeafQueue) { |
| ((FSLeafQueue)queue).addAssignedApp(applicationId); |
| } |
| } |
| // Don't recompute if it is an existing queue or no change was made |
| if (recompute && queue != null) { |
| rootQueue.recomputeSteadyShares(); |
| } |
| return queue; |
| } |
| |
| /** |
| * Create a leaf or parent queue based on what is specified in |
| * {@code queueType} and place it in the tree. Create any parents that don't |
| * already exist. |
| * |
| * @return the created queue, if successful or null if not allowed (one of the |
| * parent queues in the queue name is already a leaf queue) |
| */ |
| @VisibleForTesting |
| FSQueue createQueue(String name, FSQueueType queueType) { |
| List<String> newQueueNames = new ArrayList<>(); |
| FSParentQueue parent = buildNewQueueList(name, newQueueNames); |
| FSQueue queue = null; |
| |
| if (parent != null) { |
| // Now that we know everything worked out, make all the queues |
| // and add them to the map. |
| queue = createNewQueues(queueType, parent, newQueueNames); |
| } |
| |
| return queue; |
| } |
| |
| /** |
| * Compile a list of all parent queues of the given queue name that do not |
| * already exist. The queue names will be added to the {@code newQueueNames} |
| * list. The list will be in order of increasing queue depth. The first |
| * element of the list will be the parent closest to the root. The last |
| * element added will be the queue to be created. This method returns the |
| * deepest parent that does exist. |
| * |
| * @param name the fully qualified name of the queue to create |
| * @param newQueueNames the list to which to add non-existent queues |
| * @return the deepest existing parent queue |
| */ |
| private FSParentQueue buildNewQueueList(String name, |
| List<String> newQueueNames) { |
| newQueueNames.add(name); |
| int sepIndex = name.length(); |
| FSParentQueue parent = null; |
| |
| // Move up the queue tree until we reach one that exists. |
| while (sepIndex != -1) { |
| int prevSepIndex = sepIndex; |
| sepIndex = name.lastIndexOf('.', sepIndex-1); |
| String node = name.substring(sepIndex+1, prevSepIndex); |
| if (!isQueueNameValid(node)) { |
| throw new InvalidQueueNameException("Illegal node name at offset " + |
| (sepIndex+1) + " for queue name " + name); |
| } |
| |
| String curName = name.substring(0, sepIndex); |
| FSQueue queue = queues.get(curName); |
| |
| if (queue == null) { |
| newQueueNames.add(0, curName); |
| } else { |
| if (queue instanceof FSParentQueue) { |
| parent = (FSParentQueue)queue; |
| } |
| |
| // If the queue isn't a parent queue, parent will still be null when |
| // we break |
| |
| break; |
| } |
| } |
| |
| return parent; |
| } |
| |
| /** |
| * Create all queues in the {@code newQueueNames} list. The list must be in |
| * order of increasing depth. All but the last element in the list will be |
| * created as parent queues. The last element will be created as the type |
| * specified by the {@code queueType} parameter. The first queue will be |
| * created as a child of the {@code topParent} queue. All subsequent queues |
| * will be created as a child of the previously created queue. |
| * |
| * @param queueType the type of the last queue to create |
| * @param topParent the parent of the first queue to create |
| * @param newQueueNames the list of queues to create |
| * @return the last queue created |
| */ |
| private FSQueue createNewQueues(FSQueueType queueType, |
| FSParentQueue topParent, List<String> newQueueNames) { |
| AllocationConfiguration queueConf = scheduler.getAllocationConfiguration(); |
| Iterator<String> i = newQueueNames.iterator(); |
| FSParentQueue parent = topParent; |
| FSQueue queue = null; |
| |
| while (i.hasNext()) { |
| FSParentQueue newParent = null; |
| String queueName = i.next(); |
| |
| // Check if child policy is allowed |
| SchedulingPolicy childPolicy = scheduler.getAllocationConfiguration(). |
| getSchedulingPolicy(queueName); |
| if (!parent.getPolicy().isChildPolicyAllowed(childPolicy)) { |
| LOG.error("Can't create queue '" + queueName + "'," + |
| "the child scheduling policy is not allowed by parent queue!"); |
| return null; |
| } |
| |
| // Only create a leaf queue at the very end |
| if (!i.hasNext() && (queueType != FSQueueType.PARENT)) { |
| FSLeafQueue leafQueue = new FSLeafQueue(queueName, scheduler, parent); |
| leafQueues.add(leafQueue); |
| queue = leafQueue; |
| } else { |
| if (childPolicy instanceof FifoPolicy) { |
| LOG.error("Can't create queue '" + queueName + "', since " |
| + FifoPolicy.NAME + " is only for leaf queues."); |
| return null; |
| } |
| newParent = new FSParentQueue(queueName, scheduler, parent); |
| queue = newParent; |
| } |
| |
| parent.addChildQueue(queue); |
| setChildResourceLimits(parent, queue, queueConf); |
| queues.put(queue.getName(), queue); |
| |
| // If we just created a leaf node, the newParent is null, but that's OK |
| // because we only create a leaf node in the very last iteration. |
| parent = newParent; |
| } |
| |
| return queue; |
| } |
| |
| /** |
| * For the given child queue, set the max resources based on the |
| * parent queue's default child resource settings. This method assumes that |
| * the child queue is ad hoc and hence does not do any safety checks around |
| * overwriting existing max resource settings. |
| * |
| * @param parent the parent queue |
| * @param child the child queue |
| * @param queueConf the {@link AllocationConfiguration} |
| */ |
| private void setChildResourceLimits(FSParentQueue parent, FSQueue child, |
| AllocationConfiguration queueConf) { |
| Map<FSQueueType, Set<String>> configuredQueues = |
| queueConf.getConfiguredQueues(); |
| |
| // Ad hoc queues do not exist in the configured queues map |
| if (!configuredQueues.get(FSQueueType.LEAF).contains(child.getName()) && |
| !configuredQueues.get(FSQueueType.PARENT).contains(child.getName())) { |
| // For ad hoc queues, set their max resource allocations based on |
| // their parents' default child settings. |
| ConfigurableResource maxChild = parent.getMaxChildQueueResource(); |
| |
| if (maxChild != null) { |
| child.setMaxShare(maxChild); |
| } |
| } |
| } |
| |
| /** |
| * Make way for the given queue if possible, by removing incompatible |
| * queues with no apps in them. Incompatibility could be due to |
| * (1) queueToCreate being currently a parent but needs to change to leaf |
| * (2) queueToCreate being currently a leaf but needs to change to parent |
| * (3) an existing leaf queue in the ancestry of queueToCreate. |
| * |
| * We will never remove the root queue or the default queue in this way. |
| * |
| * @return Optional.of(Boolean.TRUE) if there was an incompatible queue that |
| * has been removed, |
| * Optional.of(Boolean.FALSE) if there was an incompatible queue that |
| * have not be removed, |
| * Optional.empty() if there is no incompatible queue. |
| */ |
| private Optional<Boolean> removeEmptyIncompatibleQueues(String queueToCreate, |
| FSQueueType queueType) { |
| queueToCreate = ensureRootPrefix(queueToCreate); |
| |
| // Ensure queueToCreate is not root and doesn't |
| // have the default queue in its ancestry. |
| if (queueToCreate.equals(ROOT_QUEUE) || |
| queueToCreate.startsWith( |
| ROOT_QUEUE + "." + YarnConfiguration.DEFAULT_QUEUE_NAME + ".")) { |
| return Optional.empty(); |
| } |
| |
| FSQueue queue = queues.get(queueToCreate); |
| // Queue exists already. |
| if (queue != null) { |
| if (queue instanceof FSLeafQueue) { |
| if (queueType == FSQueueType.LEAF) { |
| return Optional.empty(); |
| } |
| // remove incompatibility since queue is a leaf currently |
| // needs to change to a parent. |
| return Optional.of(removeQueueIfEmpty(queue)); |
| } else { |
| if (queueType == FSQueueType.PARENT) { |
| return Optional.empty(); |
| } |
| // If it's an existing parent queue and needs to change to leaf, |
| // remove it if it's empty. |
| return Optional.of(removeQueueIfEmpty(queue)); |
| } |
| } |
| |
| // Queue doesn't exist already. Check if the new queue would be created |
| // under an existing leaf queue. If so, try removing that leaf queue. |
| int sepIndex = queueToCreate.length(); |
| sepIndex = queueToCreate.lastIndexOf('.', sepIndex-1); |
| while (sepIndex != -1) { |
| String prefixString = queueToCreate.substring(0, sepIndex); |
| FSQueue prefixQueue = queues.get(prefixString); |
| if (prefixQueue != null && prefixQueue instanceof FSLeafQueue) { |
| return Optional.of(removeQueueIfEmpty(prefixQueue)); |
| } |
| sepIndex = queueToCreate.lastIndexOf('.', sepIndex-1); |
| } |
| return Optional.empty(); |
| } |
| |
| /** |
| * Removes all empty dynamic queues (including empty dynamic parent queues). |
| */ |
| public void removeEmptyDynamicQueues() { |
| synchronized (queues) { |
| Set<FSParentQueue> parentQueuesToCheck = new HashSet<>(); |
| for (FSQueue queue : getQueues()) { |
| if (queue.isDynamic() && queue.getChildQueues().isEmpty()) { |
| boolean removed = removeQueueIfEmpty(queue); |
| if (removed && queue.getParent().isDynamic()) { |
| parentQueuesToCheck.add(queue.getParent()); |
| } |
| } |
| } |
| while (!parentQueuesToCheck.isEmpty()) { |
| FSParentQueue queue = parentQueuesToCheck.iterator().next(); |
| if (queue.isEmpty()) { |
| removeQueue(queue); |
| if (queue.getParent().isDynamic()) { |
| parentQueuesToCheck.add(queue.getParent()); |
| } |
| } |
| parentQueuesToCheck.remove(queue); |
| } |
| } |
| } |
| |
| /** |
| * Re-checking incompatible queues that could not be removed earlier due to |
| * not being empty, and removing those that became empty. |
| */ |
| public void removePendingIncompatibleQueues() { |
| synchronized (queues) { |
| for (IncompatibleQueueRemovalTask removalTask : |
| ImmutableSet.copyOf(incompatibleQueuesPendingRemoval)) { |
| removalTask.execute(); |
| } |
| } |
| } |
| |
| /** |
| * Remove the queue if it and its descendents are all empty. |
| * @param queue |
| * @return true if removed, false otherwise |
| */ |
| private boolean removeQueueIfEmpty(FSQueue queue) { |
| if (queue.isEmpty()) { |
| removeQueue(queue); |
| return true; |
| } |
| return false; |
| } |
| |
| /** |
| * Remove a queue and all its descendents. |
| */ |
| private void removeQueue(FSQueue queue) { |
| synchronized (queues) { |
| if (queue instanceof FSLeafQueue) { |
| leafQueues.remove(queue); |
| } else { |
| for (FSQueue childQueue:queue.getChildQueues()) { |
| removeQueue(childQueue); |
| } |
| } |
| queues.remove(queue.getName()); |
| FSParentQueue parent = queue.getParent(); |
| parent.removeChildQueue(queue); |
| } |
| } |
| |
| /** |
| * Gets a queue by name. |
| */ |
| public FSQueue getQueue(String name) { |
| name = ensureRootPrefix(name); |
| synchronized (queues) { |
| return queues.get(name); |
| } |
| } |
| |
| /** |
| * Return whether a queue exists already. |
| */ |
| public boolean exists(String name) { |
| name = ensureRootPrefix(name); |
| synchronized (queues) { |
| return queues.containsKey(name); |
| } |
| } |
| |
| /** |
| * Get a collection of all leaf queues. |
| */ |
| public Collection<FSLeafQueue> getLeafQueues() { |
| synchronized (queues) { |
| return leafQueues; |
| } |
| } |
| |
| /** |
| * Get a collection of all queues. |
| */ |
| public Collection<FSQueue> getQueues() { |
| synchronized (queues) { |
| return ImmutableList.copyOf(queues.values()); |
| } |
| } |
| |
| private static String ensureRootPrefix(String name) { |
| if (!name.startsWith(ROOT_QUEUE + ".") && !name.equals(ROOT_QUEUE)) { |
| name = ROOT_QUEUE + "." + name; |
| } |
| return name; |
| } |
| |
| public void updateAllocationConfiguration(AllocationConfiguration queueConf) { |
| // Create leaf queues and the parent queues in a leaf's |
| // ancestry if they do not exist |
| synchronized (queues) { |
| // Verify and set scheduling policies for existing queues before creating |
| // any queue, since we need parent policies to determine if we can create |
| // its children. |
| if (!rootQueue.verifyAndSetPolicyFromConf(queueConf)) { |
| LOG.error("Setting scheduling policies for existing queues failed!"); |
| } |
| |
| ensureQueueExistsAndIsCompatibleAndIsStatic(queueConf, FSQueueType.LEAF); |
| |
| // At this point all leaves and 'parents with |
| // at least one child' would have been created. |
| // Now create parents with no configured leaf. |
| ensureQueueExistsAndIsCompatibleAndIsStatic(queueConf, |
| FSQueueType.PARENT); |
| } |
| |
| // Initialize all queues recursively |
| rootQueue.reinit(true); |
| // Update steady fair shares for all queues |
| rootQueue.recomputeSteadyShares(); |
| } |
| |
| private void ensureQueueExistsAndIsCompatibleAndIsStatic( |
| AllocationConfiguration queueConf, FSQueueType queueType) { |
| for (String name : queueConf.getConfiguredQueues().get(queueType)) { |
| Boolean removed = |
| removeEmptyIncompatibleQueues(name, queueType).orElse(null); |
| if (Boolean.FALSE.equals(removed)) { |
| incompatibleQueuesPendingRemoval.add( |
| new IncompatibleQueueRemovalTask(name, queueType)); |
| } else { |
| FSQueue queue = getQueue(name, true, queueType, false, null); |
| if (queue != null) { |
| queue.setDynamic(false); |
| } |
| } |
| } |
| } |
| |
| /** |
| * Setting a set of queues to dynamic. |
| * @param queueNames The names of the queues to be set to dynamic |
| */ |
| protected void setQueuesToDynamic(Set<String> queueNames) { |
| synchronized (queues) { |
| for (String queueName : queueNames) { |
| queues.get(queueName).setDynamic(true); |
| } |
| } |
| } |
| |
| /** |
| * Check whether queue name is valid, |
| * return true if it is valid, otherwise return false. |
| */ |
| @VisibleForTesting |
| boolean isQueueNameValid(String node) { |
| // use the same white space trim as in QueueMetrics() otherwise things fail |
| // This needs to trim additional Unicode whitespace characters beyond what |
| // the built-in JDK methods consider whitespace. See YARN-5272. |
| return !node.isEmpty() && |
| node.equals(FairSchedulerUtilities.trimQueueName(node)); |
| } |
| } |