blob: 48c289f0cde744927aeca10e33607e07ec533da1 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ha.HAServiceProtocol;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.QueueState;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.security.Permission;
import org.apache.hadoop.yarn.security.YarnAuthorizationProvider;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationConstants;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueStateManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerDynamicEditException;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerQueueManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.QueueEntitlement;
import org.apache.hadoop.yarn.server.resourcemanager.security.AppPriorityACLsManager;
import com.google.common.annotations.VisibleForTesting;
/**
*
* Context of the Queues in Capacity Scheduler.
*
*/
@Private
@Unstable
public class CapacitySchedulerQueueManager implements SchedulerQueueManager<
CSQueue, CapacitySchedulerConfiguration>{
// Class-wide logger, named after CapacitySchedulerQueueManager.
private static final Log LOG = LogFactory.getLog(
CapacitySchedulerQueueManager.class);
/**
 * Orders queues by ascending used capacity; queues with equal used capacity
 * are ordered by the natural String ordering of their queue paths.
 */
static final Comparator<CSQueue> NON_PARTITIONED_QUEUE_COMPARATOR =
    new Comparator<CSQueue>() {
      @Override
      public int compare(CSQueue q1, CSQueue q2) {
        final int byUsage =
            Float.compare(q1.getUsedCapacity(), q2.getUsedCapacity());
        if (byUsage != 0) {
          // Normalise to exactly -1 / 1 regardless of the magnitude
          // returned by Float.compare.
          return (byUsage < 0) ? -1 : 1;
        }
        // Equal usage: fall back to the queue path as a tie-breaker.
        return q1.getQueuePath().compareTo(q2.getQueuePath());
      }
    };
/**
 * Interception point applied by {@code parseQueue} to the leaf and parent
 * queues it creates. This base implementation is the identity: it hands the
 * queue back unchanged.
 */
static class QueueHook {
  /**
   * @param queue the freshly created queue
   * @return the queue to use in its place; here, the same instance
   */
  public CSQueue hook(final CSQueue queue) {
    return queue;
  }
}
// Default hook: returns every queue unchanged.
private static final QueueHook NOOP = new QueueHook();
// Scheduler context; injected through setCapacitySchedulerContext().
private CapacitySchedulerContext csContext;
// Authorization provider obtained from the configuration in the constructor.
private final YarnAuthorizationProvider authorizer;
// Registry of known queues, keyed by the name they were added under.
private final Map<String, CSQueue> queues = new ConcurrentHashMap<>();
// Root of the queue hierarchy; assigned by initializeQueues().
private CSQueue root;
// Receives the queue -> accessible-labels map after (re)initialization.
private final RMNodeLabelsManager labelManager;
// Holds per-leaf-queue application priority ACLs (see setQueueAcls()).
private AppPriorityACLsManager appPriorityACLManager;
// Created in the constructor and (re)initialized with this manager whenever
// the queues are (re)initialized.
private QueueStateManager<CSQueue, CapacitySchedulerConfiguration>
queueStateManager;
/**
 * Creates a queue manager. The authorization provider is looked up from the
 * supplied configuration and a fresh {@link QueueStateManager} is created;
 * the two managers passed in are stored as-is.
 *
 * @param conf the configuration used to obtain the authorization provider
 * @param labelManager the node labels manager
 * @param appPriorityACLManager the application priority ACL manager
 */
public CapacitySchedulerQueueManager(Configuration conf,
    RMNodeLabelsManager labelManager,
    AppPriorityACLsManager appPriorityACLManager) {
  this.labelManager = labelManager;
  this.appPriorityACLManager = appPriorityACLManager;
  this.queueStateManager = new QueueStateManager<>();
  this.authorizer = YarnAuthorizationProvider.getInstance(conf);
}
/**
 * @return the current root queue, or {@code null} if none has been
 *         assigned yet
 */
@Override
public CSQueue getRootQueue() {
  return root;
}
/**
 * @return the live queue registry itself (not a copy)
 */
@Override
public Map<String, CSQueue> getQueues() {
  return this.queues;
}
/**
 * Drops the registry entry stored under the given name, if any.
 *
 * @param queueName the key of the entry to remove
 */
@Override
public void removeQueue(String queueName) {
  queues.remove(queueName);
}
/**
 * Registers a queue under the given name, replacing any entry already
 * stored under that name.
 *
 * @param queueName the key to register the queue under
 * @param queue the queue to register
 */
@Override
public void addQueue(String queueName, CSQueue queue) {
  queues.put(queueName, queue);
}
/**
 * Looks up a queue in the registry.
 *
 * @param queueName the name the queue was registered under
 * @return the registered queue, or {@code null} when there is no such entry
 */
@Override
public CSQueue getQueue(String queueName) {
  return this.queues.get(queueName);
}
/**
 * Injects the scheduler context that is later handed to queue parsing and
 * consulted during re-initialization.
 *
 * @param capacitySchedulerContext the CapacitySchedulerContext to use
 */
public void setCapacitySchedulerContext(
    CapacitySchedulerContext capacitySchedulerContext) {
  csContext = capacitySchedulerContext;
}
/**
 * Initializes the queues: parses the hierarchy starting at the configured
 * root into the registry, applies the queue ACLs, pushes the queue-to-labels
 * mapping to the label manager and initializes the queue state manager.
 *
 * @param conf the CapacitySchedulerConfiguration
 * @throws IOException if fails to initialize queues
 */
public void initializeQueues(CapacitySchedulerConfiguration conf)
    throws IOException {
  // On first initialization the registry serves both as the target map and
  // as the "old queues" map.
  this.root = parseQueue(csContext, conf, null,
      CapacitySchedulerConfiguration.ROOT, queues, queues, NOOP);
  setQueueAcls(authorizer, appPriorityACLManager, queues);

  final Map<String, Set<String>> labelsByQueue = getQueueToLabels();
  labelManager.reinitializeQueueLabels(labelsByQueue);

  queueStateManager.initialize(this);
  LOG.info("Initialized root queue " + root);
}
/**
 * Re-reads the queue hierarchy from a new configuration and applies it:
 * parse, (conditionally) validate against the current hierarchy, merge the
 * registry, re-configure the root, then refresh ACLs, cluster resource,
 * labels and queue state.
 *
 * @param newConf the new CapacitySchedulerConfiguration
 * @throws IOException if parsing, validation or re-configuration fails
 */
@Override
public void reinitializeQueues(CapacitySchedulerConfiguration newConf)
    throws IOException {
  // Parse the new hierarchy into a scratch map; the live registry is only
  // consulted as the source of "old" queues.
  final Map<String, CSQueue> parsedQueues = new HashMap<>();
  final CSQueue parsedRoot = parseQueue(this.csContext, newConf, null,
      CapacitySchedulerConfiguration.ROOT, parsedQueues, queues, NOOP);

  // When failing over with a mutable configuration store, queues can be
  // removed without being STOPPED, so hierarchy validation is skipped.
  final boolean standbyWithMutableConf = csContext.isConfigurationMutable()
      && csContext.getRMContext().getHAServiceState()
          == HAServiceProtocol.HAServiceState.STANDBY;
  if (!standbyWithMutableConf) {
    // Ensure the queue hierarchy in the new configuration is proper.
    validateQueueHierarchy(queues, parsedQueues);
  }

  // Add new queues and delete old queues only after validation.
  updateQueues(queues, parsedQueues);

  // Re-configure the existing queue objects from the newly parsed ones.
  root.reinitialize(parsedRoot, this.csContext.getClusterResource());
  setQueueAcls(authorizer, appPriorityACLManager, queues);

  // Re-calculate headroom for active applications.
  final Resource clusterResource = this.csContext.getClusterResource();
  final ResourceLimits limits = new ResourceLimits(clusterResource);
  root.updateClusterResource(clusterResource, limits);

  final Map<String, Set<String>> labelsByQueue = getQueueToLabels();
  labelManager.reinitializeQueueLabels(labelsByQueue);
  queueStateManager.initialize(this);
}
/**
 * Recursively builds the queue named {@code queueName} (and its whole
 * subtree) from the configuration and records every created queue in
 * {@code queues}.
 *
 * <p>A queue with configured children becomes a {@link ParentQueue}; a
 * queue without children becomes a {@link PlanQueue} when it is reservable
 * (together with its default {@link ReservationQueue}) and a
 * {@link LeafQueue} otherwise. Parent and leaf queues are passed through
 * {@code hook}; plan queues are not.
 *
 * @param csContext the CapacitySchedulerContext
 * @param conf the CapacitySchedulerConfiguration
 * @param parent the parent queue, or {@code null} for the root
 * @param queueName the (short) queue name
 * @param queues the map that receives every parsed queue
 * @param oldQueues the previously known queues, looked up by name and
 *          handed to the new queue's constructor
 * @param hook the queue hook
 * @return the CSQueue created for {@code queueName}
 * @throws IOException if two leaf queues share the same name (queue
 *           construction may presumably also raise it; not visible here)
 * @throws IllegalStateException if the root has no child queues, if a
 *           queue with children is marked reservable, or if the default
 *           reservation queue's entitlement cannot be set
 */
static CSQueue parseQueue(
    CapacitySchedulerContext csContext,
    CapacitySchedulerConfiguration conf,
    CSQueue parent, String queueName, Map<String, CSQueue> queues,
    Map<String, CSQueue> oldQueues,
    QueueHook hook) throws IOException {
  final String queuePath = (parent != null)
      ? (parent.getQueuePath() + "." + queueName)
      : queueName;
  final String[] childNames = conf.getQueues(queuePath);
  final boolean reservable = conf.isReservable(queuePath);
  final boolean hasChildren = childNames != null && childNames.length != 0;

  final CSQueue parsed;
  if (hasChildren) {
    if (reservable) {
      throw new IllegalStateException(
          "Only Leaf Queues can be reservable for " + queueName);
    }
    final ParentQueue parentQueue =
        new ParentQueue(csContext, queueName, parent,
            oldQueues.get(queueName));
    // Used only for unit tests
    parsed = hook.hook(parentQueue);

    // Children are parsed against the hooked queue, but attached to the
    // ParentQueue instance created above.
    final List<CSQueue> parsedChildren = new ArrayList<>();
    for (String childName : childNames) {
      parsedChildren.add(parseQueue(csContext, conf, parsed, childName,
          queues, oldQueues, hook));
    }
    parentQueue.setChildQueues(parsedChildren);
  } else {
    if (parent == null) {
      throw new IllegalStateException(
          "Queue configuration missing child queue names for " + queueName);
    }
    if (reservable) {
      // The queue will be dynamically managed by the Reservation system.
      final PlanQueue planQueue =
          new PlanQueue(csContext, queueName, parent,
              oldQueues.get(queueName));

      // Initializing the "internal" default queue, for SLS compatibility.
      final String defReservationId =
          queueName + ReservationConstants.DEFAULT_QUEUE_SUFFIX;
      final ReservationQueue defaultReservation =
          new ReservationQueue(csContext, defReservationId, planQueue);
      try {
        defaultReservation.setEntitlement(new QueueEntitlement(1.0f, 1.0f));
      } catch (SchedulerDynamicEditException e) {
        throw new IllegalStateException(e);
      }

      final List<CSQueue> planChildren = new ArrayList<>();
      planChildren.add(defaultReservation);
      planQueue.setChildQueues(planChildren);
      queues.put(defReservationId, defaultReservation);
      parsed = planQueue;
    } else {
      final CSQueue leaf =
          new LeafQueue(csContext, queueName, parent,
              oldQueues.get(queueName));
      // Used only for unit tests
      parsed = hook.hook(leaf);
    }
  }

  // A missing entry yields null, and null is never an instance of
  // LeafQueue, so no separate containsKey check is needed.
  if (parsed instanceof LeafQueue
      && queues.get(queueName) instanceof LeafQueue) {
    throw new IOException("Two leaf queues were named " + queueName
        + ". Leaf queue names must be distinct");
  }
  queues.put(queueName, parsed);
  LOG.info("Initialized queue: " + parsed);
  return parsed;
}
/**
 * Checks that the new hierarchy is a legal successor of the existing one.
 * For every existing queue that is not a {@link ReservationQueue}:
 * <ul>
 *   <li>it may be absent from the new queues only if it is STOPPED;</li>
 *   <li>its queue path must be unchanged (queues cannot be moved);</li>
 *   <li>a leaf queue may become a parent queue only if it is STOPPED;</li>
 *   <li>a parent queue becoming a leaf queue is accepted and logged.</li>
 * </ul>
 *
 * @param queues existing queues
 * @param newQueues new queues
 * @throws IOException if any of the rules above is violated
 */
private void validateQueueHierarchy(Map<String, CSQueue> queues,
    Map<String, CSQueue> newQueues) throws IOException {
  for (Map.Entry<String, CSQueue> entry : queues.entrySet()) {
    final CSQueue current = entry.getValue();
    if (current instanceof ReservationQueue) {
      // Only static queues have to be present in the new queues.
      continue;
    }
    final String name = entry.getKey();
    final CSQueue replacement = newQueues.get(name);

    if (replacement == null) {
      // The old queue doesn't exist in the new XML.
      if (current.getState() != QueueState.STOPPED) {
        throw new IOException(current.getQueuePath() + " is deleted from"
            + " the new capacity scheduler configuration, but the"
            + " queue is not yet in stopped state. "
            + "Current State : " + current.getState());
      }
      LOG.info("Deleting Queue " + name + ", as it is not"
          + " present in the modified capacity configuration xml");
      continue;
    }

    if (!current.getQueuePath().equals(replacement.getQueuePath())) {
      // Queues cannot be moved from one hierarchy to another.
      throw new IOException(name + " is moved from:"
          + current.getQueuePath() + " to:" + replacement.getQueuePath()
          + " after refresh, which is not allowed.");
    }

    if (current instanceof LeafQueue && replacement instanceof ParentQueue) {
      if (current.getState() != QueueState.STOPPED) {
        throw new IOException("Can not convert the leaf queue: "
            + current.getQueuePath() + " to parent queue since "
            + "it is not yet in stopped state. Current State : "
            + current.getState());
      }
      LOG.info("Converting the leaf queue: " + current.getQueuePath()
          + " to parent queue.");
    } else if (current instanceof ParentQueue
        && replacement instanceof LeafQueue) {
      LOG.info("Converting the parent queue: " + current.getQueuePath()
          + " to leaf queue.");
    }
  }
}
/**
 * Brings the existing queue map in line with the newly parsed one: names
 * that only exist in {@code newQueues} are added, names that no longer
 * exist there are removed, and entries present in both keep their existing
 * queue object (they are never overwritten).
 *
 * @param existingQueues the existing queues; modified in place
 * @param newQueues the new queues based on new XML
 */
private void updateQueues(Map<String, CSQueue> existingQueues,
    Map<String, CSQueue> newQueues) {
  // Additions: only names that are not known yet.
  for (Map.Entry<String, CSQueue> candidate : newQueues.entrySet()) {
    final String name = candidate.getKey();
    if (existingQueues.containsKey(name)) {
      continue;
    }
    existingQueues.put(name, candidate.getValue());
  }
  // Removals: keep only the names that are still present in the new queues.
  existingQueues.keySet().retainAll(newQueues.keySet());
}
/**
 * Set the acls for the queues.
 *
 * <p>One {@link Permission} is built per queue from its privileged entity
 * and ACLs, and all of them are handed to the authorizer in a single call
 * on behalf of the current user. For every {@link LeafQueue} the priority
 * ACLs registered under the queue's name are cleared and then re-added.
 *
 * @param authorizer the yarnAuthorizationProvider
 * @param appPriorityACLManager the manager holding the per-queue
 *          application priority ACLs
 * @param queues the queues; every value is cast to AbstractCSQueue
 * @throws IOException if fails to set queue acls
 */
@VisibleForTesting
public static void setQueueAcls(YarnAuthorizationProvider authorizer,
    AppPriorityACLsManager appPriorityACLManager, Map<String, CSQueue> queues)
    throws IOException {
  List<Permission> permissions = new ArrayList<>();
  for (CSQueue queue : queues.values()) {
    AbstractCSQueue csQueue = (AbstractCSQueue) queue;
    permissions.add(
        new Permission(csQueue.getPrivilegedEntity(), csQueue.getACLs()));

    if (queue instanceof LeafQueue) {
      LeafQueue lQueue = (LeafQueue) queue;

      // Clear Priority ACLs first since reinitialize also call same.
      appPriorityACLManager.clearPriorityACLs(lQueue.getQueueName());
      appPriorityACLManager.addPrioirityACLs(lQueue.getPriorityACLs(),
          lQueue.getQueueName());
    }
  }
  authorizer.setPermission(permissions,
      UserGroupInformation.getCurrentUser());
}
/**
 * Resolves a queue name to a registered {@link LeafQueue}.
 *
 * @param queue the queue name
 * @return the LeafQueue registered under that name
 * @throws YarnException if the queue does not exist or the queue
 *           is not the type of LeafQueue.
 */
public LeafQueue getAndCheckLeafQueue(String queue) throws YarnException {
  final CSQueue found = this.getQueue(queue);
  if (found instanceof LeafQueue) {
    return (LeafQueue) found;
  }
  // Not a leaf queue: tell "unknown queue" apart from "wrong queue type".
  if (found == null) {
    throw new YarnException("The specified Queue: " + queue
        + " doesn't exist");
  }
  throw new YarnException("The specified Queue: " + queue
      + " is not a Leaf Queue.");
}
/**
 * Get the default priority of the queue.
 *
 * <p>When the queue is unknown, or has no default application priority,
 * the scheduler-wide default from the configuration constants is used.
 * A new {@link Priority} instance is returned in either case.
 *
 * @param queueName the queue name
 * @return the default priority of the queue
 */
public Priority getDefaultPriorityForQueue(String queueName) {
  final Queue target = getQueue(queueName);
  final boolean hasQueueDefault =
      target != null && target.getDefaultApplicationPriority() != null;
  if (hasQueueDefault) {
    return Priority.newInstance(
        target.getDefaultApplicationPriority().getPriority());
  }
  // Return with default application priority
  return Priority.newInstance(
      CapacitySchedulerConfiguration
          .DEFAULT_CONFIGURATION_APPLICATION_PRIORITY);
}
/**
 * Builds a fresh map from each registered queue's name (as reported by the
 * queue itself) to that queue's accessible node labels.
 *
 * @return the map of queueToLabels
 */
private Map<String, Set<String>> getQueueToLabels() {
  final Map<String, Set<String>> labelsByQueue = new HashMap<>();
  for (CSQueue each : getQueues().values()) {
    final String name = each.getQueueName();
    final Set<String> labels = each.getAccessibleNodeLabels();
    labelsByQueue.put(name, labels);
  }
  return labelsByQueue;
}
/**
 * @return the queue state manager created by this queue manager
 */
@Private
public QueueStateManager<CSQueue, CapacitySchedulerConfiguration>
    getQueueStateManager() {
  return queueStateManager;
}
}