// blob: 88fae00f1b4595ed0ee6b90622ff38633d8305d5 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler
.SchedulerDynamicEditException;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.AbstractCSQueue.CapacityConfigType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.queuemanagement.GuaranteedOrZeroCapacityOverTimePolicy;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica
.FiCaSchedulerApp;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
/**
* Auto Creation enabled Parent queue. This queue initially does not have any
* children to start with and all child
* leaf queues will be auto created. Currently this does not allow other
* pre-configured leaf or parent queues to
* co-exist along with auto-created leaf queues. The auto creation is limited
* to leaf queues currently.
*/
public class ManagedParentQueue extends AbstractManagedParentQueue {
/** Logger shared by all instances of this class. */
private static final Logger LOG =
    LoggerFactory.getLogger(ManagedParentQueue.class);

/**
 * Cached copy of the per-queue configuration flag. It is assigned in the
 * constructor and refreshed on every reinitialize; when set, reinitialize
 * and addChildQueue reject changes whose child capacities go beyond this
 * queue's capacity.
 */
private boolean shouldFailAutoCreationWhenGuaranteedCapacityExceeded;
public ManagedParentQueue(final CapacitySchedulerContext cs,
final String queueName, final CSQueue parent, final CSQueue old)
throws IOException {
super(cs, queueName, parent, old);
shouldFailAutoCreationWhenGuaranteedCapacityExceeded =
csContext.getConfiguration()
.getShouldFailAutoQueueCreationWhenGuaranteedCapacityExceeded(
getQueuePath());
leafQueueTemplate = initializeLeafQueueConfigs().build();
LOG.info(
"Created Managed Parent Queue: [{}] with capacity: [{}]"
+ " with max capacity: [{}]",
queueName, super.getCapacity(), super.getMaximumCapacity());
initializeQueueManagementPolicy();
}
/**
 * Refreshes this queue and its existing children while holding the write
 * lock.
 * <p>
 * Steps, in order: validate the incoming queue, refresh the
 * fail-on-exceeding-capacity flag, optionally reject the change when the
 * summed child capacity is above this queue's capacity, rebuild the leaf
 * queue template, delegate to the superclass, reinitialize every child,
 * reset the management policy and finally compute and apply the policy's
 * queue management changes.
 *
 * @param newlyParsedQueue queue object to reinitialize from; must pass
 *          {@code validate}
 * @param clusterResource cluster resource forwarded to the superclass and
 *          to each child queue
 * @throws IOException if validation or the capacity check fails, or if a
 *           YarnException is raised (it is logged and wrapped as the cause)
 */
@Override
public void reinitialize(CSQueue newlyParsedQueue, Resource clusterResource)
    throws IOException {
  writeLock.lock();
  try {
    validate(newlyParsedQueue);

    final CapacitySchedulerConfiguration schedulerConf =
        csContext.getConfiguration();
    shouldFailAutoCreationWhenGuaranteedCapacityExceeded =
        schedulerConf
            .getShouldFailAutoQueueCreationWhenGuaranteedCapacityExceeded(
                getQueuePath());

    // Only enforced when the flag is on: children must fit in this queue.
    if (shouldFailAutoCreationWhenGuaranteedCapacityExceeded) {
      final float childCapacityTotal = sumOfChildCapacities();
      if (getCapacity() < childCapacityTotal) {
        final String message =
            "Total of Auto Created leaf queues guaranteed capacity : "
                + childCapacityTotal + " exceeds Parent queue's "
                + getQueuePath() + " guaranteed capacity " + getCapacity()
                + ".Cannot enforce policy to auto"
                + " create queues beyond parent queue's capacity";
        throw new IOException(message);
      }
    }

    leafQueueTemplate = initializeLeafQueueConfigs().build();

    super.reinitialize(newlyParsedQueue, clusterResource);

    // Each existing child is reinitialized from itself so that its absolute
    // capacities get recomputed.
    for (CSQueue child : this.getChildQueues()) {
      child.reinitialize(child, clusterResource);
    }

    // Reset the policy state, then let it redistribute capacities.
    reinitializeQueueManagementPolicy();
    final List<QueueManagementChange> policyChanges =
        queueManagementPolicy.computeQueueManagementChanges();
    validateAndApplyQueueManagementChanges(policyChanges);

    LOG.info("Reinitialized Managed Parent Queue: [{}] with capacity [{}]"
        + " with max capacity [{}]", queueName, super.getCapacity(),
        super.getMaximumCapacity());
  } catch (YarnException ye) {
    LOG.error("Exception while computing policy changes for leaf queue : "
        + getQueuePath(), ye);
    throw new IOException(ye);
  } finally {
    writeLock.unlock();
  }
}
/**
 * Obtains the queue management policy configured for this queue path,
 * stores it in {@code queueManagementPolicy} and initializes it against
 * this queue.
 *
 * @throws IOException if obtaining or initializing the policy fails
 */
private void initializeQueueManagementPolicy() throws IOException {
  final CapacitySchedulerConfiguration schedulerConf =
      csContext.getConfiguration();
  // The field is assigned before init() runs, exactly as callers expect.
  queueManagementPolicy =
      schedulerConf.getAutoCreatedQueueManagementPolicyClass(getQueuePath());
  queueManagementPolicy.init(csContext, this);
}
/**
 * Re-reads the configured queue management policy. If its class matches the
 * class of the policy currently in use, the current policy is reinitialized
 * in place; otherwise the newly obtained policy replaces it and is
 * initialized from scratch.
 *
 * @throws IOException if obtaining, initializing or reinitializing the
 *           policy fails
 */
private void reinitializeQueueManagementPolicy() throws IOException {
  final AutoCreatedQueueManagementPolicy configuredPolicy =
      csContext.getConfiguration().getAutoCreatedQueueManagementPolicyClass(
          getQueuePath());

  final boolean samePolicyClass = configuredPolicy.getClass().equals(
      this.queueManagementPolicy.getClass());
  if (samePolicyClass) {
    queueManagementPolicy.reinitialize(csContext, this);
    return;
  }

  // The configured policy type changed: switch over to the new instance.
  queueManagementPolicy = configuredPolicy;
  queueManagementPolicy.init(csContext, this);
}
/**
 * Assembles the builder for the auto-created leaf queue template: loads the
 * template configuration, checks that a percentage-configured parent does
 * not carry a template with a minimum resource set, and loads the template
 * capacities per node label.
 *
 * @return a builder populated with the template configuration and the
 *         template capacities
 * @throws IOException if this queue uses the PERCENTAGE config type while
 *           the template defines a minimum resource for some node label, or
 *           if loading the template configuration fails
 */
protected AutoCreatedLeafQueueConfig.Builder initializeLeafQueueConfigs()
    throws IOException {
  final CapacitySchedulerConfiguration schedulerConf =
      csContext.getConfiguration();
  final String templatePath =
      schedulerConf.getAutoCreatedQueueTemplateConfPrefix(getQueuePath());

  final AutoCreatedLeafQueueConfig.Builder templateBuilder =
      new AutoCreatedLeafQueueConfig.Builder();

  // Template configuration.
  final CapacitySchedulerConfiguration templateConf =
      super.initializeLeafQueueConfigs(
          getLeafQueueConfigPrefix(schedulerConf));
  templateBuilder.configuration(templateConf);

  for (String nodeLabel : templateConf.getConfiguredNodeLabels(
      templatePath)) {
    final Resource templateMinResource =
        templateConf.getMinimumResourceRequirement(nodeLabel, templatePath,
            resourceTypes);
    if (this.capacityConfigType.equals(CapacityConfigType.PERCENTAGE)
        && !templateMinResource.equals(Resources.none())) {
      throw new IOException("Managed Parent Queue " + this.getQueuePath()
          + " config type is different from leaf queue template config type");
    }
  }

  // Template capacities.
  final QueueCapacities templateCapacities = new QueueCapacities(false);
  CSQueueUtils.loadCapacitiesByLabelsFromConf(templatePath,
      templateCapacities, schedulerConf);

  // When the parent is configured in ABSOLUTE_RESOURCE, the template
  // capacities are populated from the configured absolute resources.
  if (this.capacityConfigType.equals(CapacityConfigType.ABSOLUTE_RESOURCE)) {
    updateQueueCapacities(templateCapacities);
  }
  templateBuilder.capacities(templateCapacities);

  return templateBuilder;
}
/**
 * Recomputes, for every node label already present in
 * {@code queueCapacities}, the capacity, maximum capacity and their absolute
 * counterparts from the leaf queue template's configured minimum/maximum
 * resources and this queue's configured resource quotas.
 *
 * @param queueCapacities capacities object updated in place
 */
private void updateQueueCapacities(QueueCapacities queueCapacities) {
  final CapacitySchedulerConfiguration schedulerConf =
      this.csContext.getConfiguration();
  final String templatePath =
      schedulerConf.getAutoCreatedQueueTemplateConfPrefix(getQueuePath());

  for (String label : queueCapacities.getExistingNodeLabels()) {
    final Resource clusterResource = this.csContext.getClusterResource();

    // Capacity: template minimum divided by this queue's configured minimum.
    final Resource templateMinResource =
        schedulerConf.getMinimumResourceRequirement(label, templatePath,
            resourceTypes);
    final Resource parentMinResource =
        getQueueResourceQuotas().getConfiguredMinResource(label);
    queueCapacities.setCapacity(label,
        this.csContext.getResourceCalculator().divide(clusterResource,
            templateMinResource, parentMinResource));

    // Maximum capacity: the template maximum (or this queue's maximum when
    // the template defines none), capped at this queue's maximum.
    final Resource templateMaxResource =
        schedulerConf.getMaximumResourceRequirement(label, templatePath,
            resourceTypes);
    final Resource parentMaxResource =
        getQueueResourceQuotas().getConfiguredMaxResource(label);
    final Resource requestedMaxResource;
    if (templateMaxResource.equals(Resources.none())) {
      requestedMaxResource = parentMaxResource;
    } else {
      requestedMaxResource = templateMaxResource;
    }
    final Resource effectiveMaxResource = Resources.min(
        this.csContext.getResourceCalculator(), clusterResource,
        requestedMaxResource, parentMaxResource);
    queueCapacities.setMaximumCapacity(label,
        this.csContext.getResourceCalculator().divide(clusterResource,
            effectiveMaxResource, parentMaxResource));

    // Absolute values scale the relative ones by this queue's absolutes.
    queueCapacities.setAbsoluteCapacity(label,
        queueCapacities.getCapacity(label)
            * getQueueCapacities().getAbsoluteCapacity(label));
    queueCapacities.setAbsoluteMaximumCapacity(label,
        queueCapacities.getMaximumCapacity(label)
            * getQueueCapacities().getAbsoluteMaximumCapacity(label));
  }
}
/**
 * Sanity check used before reinitializing: the supplied queue must be a
 * ManagedParentQueue whose queue path equals this queue's path.
 *
 * @param newlyParsedQueue queue this instance is about to be reinitialized
 *          from
 * @throws IOException if the queue has a different type or a different path
 */
protected void validate(final CSQueue newlyParsedQueue) throws IOException {
  final boolean sameManagedQueue =
      newlyParsedQueue instanceof ManagedParentQueue
          && newlyParsedQueue.getQueuePath().equals(getQueuePath());
  if (sameManagedQueue) {
    return;
  }
  throw new IOException(
      "Trying to reinitialize " + getQueuePath() + " from "
          + newlyParsedQueue.getQueuePath());
}
/**
 * Adds an auto-created leaf queue under this parent while holding the write
 * lock.
 * <p>
 * The child is rejected when it is not an AutoCreatedLeafQueue, when it has
 * no parent, when the configured maximum number of child queues has been
 * reached, or (only if the fail-on-exceeding-capacity flag is set) when the
 * template's absolute capacity plus the children's summed absolute capacity
 * is above the parent's absolute capacity. After the child is added it is
 * reinitialized from the policy's initial leaf queue configuration and one
 * cluster resource update is triggered.
 *
 * @param childQueue queue to add; must be a non-null AutoCreatedLeafQueue
 * @throws SchedulerDynamicEditException if one of the checks above fails
 * @throws IOException if adding or reinitializing the child fails
 */
@Override
public void addChildQueue(CSQueue childQueue)
    throws SchedulerDynamicEditException, IOException {
  writeLock.lock();
  try {
    // instanceof is false for null, so this also rejects a null child.
    if (!(childQueue instanceof AutoCreatedLeafQueue)) {
      throw new SchedulerDynamicEditException(
          "Expected child queue to be an instance of AutoCreatedLeafQueue");
    }
    AutoCreatedLeafQueue leafQueue = (AutoCreatedLeafQueue) childQueue;

    CapacitySchedulerConfiguration conf = csContext.getConfiguration();
    ManagedParentQueue parentQueue =
        (ManagedParentQueue) childQueue.getParent();

    if (parentQueue == null) {
      throw new SchedulerDynamicEditException(
          "Parent Queue is null, should not add child queue!");
    }

    String leafQueuePath = childQueue.getQueuePath();
    int maxQueues = conf.getAutoCreatedQueuesMaxChildQueuesLimit(
        parentQueue.getQueuePath());

    if (parentQueue.getChildQueues().size() >= maxQueues) {
      throw new SchedulerDynamicEditException(
          "Cannot auto create leaf queue " + leafQueuePath + ".Max Child "
              + "Queue limit exceeded which is configured as : " + maxQueues
              + " and number of child queues is : " + parentQueue
              .getChildQueues().size());
    }

    if (shouldFailAutoCreationWhenGuaranteedCapacityExceeded) {
      if (getLeafQueueTemplate().getQueueCapacities().getAbsoluteCapacity()
          + parentQueue.sumOfChildAbsCapacities() > parentQueue
          .getAbsoluteCapacity()) {
        throw new SchedulerDynamicEditException(
            "Cannot auto create leaf queue " + leafQueuePath + ". Child "
                + "queues capacities have reached parent queue : "
                + parentQueue.getQueuePath() + "'s guaranteed capacity");
      }
    }

    // The management policy class is configurable, so only call the
    // GuaranteedOrZeroCapacityOverTimePolicy-specific hook when that policy
    // is actually in use; an unconditional cast would fail with a
    // ClassCastException for any other policy implementation.
    if (queueManagementPolicy
        instanceof GuaranteedOrZeroCapacityOverTimePolicy) {
      ((GuaranteedOrZeroCapacityOverTimePolicy) queueManagementPolicy)
          .updateTemplateAbsoluteCapacities(parentQueue.getQueueCapacities());
    }

    super.addChildQueue(leafQueue);

    /* Below is to avoid Setting Queue Capacity to NaN when ClusterResource
       is zero during RM Startup with DominantResourceCalculator */
    if (this.capacityConfigType.equals(
        CapacityConfigType.ABSOLUTE_RESOURCE)) {
      QueueCapacities queueCapacities =
          getLeafQueueTemplate().getQueueCapacities();
      updateQueueCapacities(queueCapacities);
    }

    final AutoCreatedLeafQueueConfig initialLeafQueueTemplate =
        queueManagementPolicy.getInitialLeafQueueConfiguration(leafQueue);

    leafQueue.reinitializeFromTemplate(initialLeafQueueTemplate);

    // Do one update cluster resource call to make sure all absolute resources
    // effective resources are updated.
    updateClusterResource(this.csContext.getClusterResource(),
        new ResourceLimits(this.csContext.getClusterResource()));
  } finally {
    writeLock.unlock();
  }
}
/**
 * Collects, under the read lock, the result of {@code getApplications()}
 * from every child queue.
 *
 * @return an unmodifiable list holding the applications of all children
 */
public List<FiCaSchedulerApp> getScheduleableApplications() {
  readLock.lock();
  try {
    final List<FiCaSchedulerApp> collected = new ArrayList<>();
    for (CSQueue child : getChildQueues()) {
      final LeafQueue leaf = (LeafQueue) child;
      collected.addAll(leaf.getApplications());
    }
    return Collections.unmodifiableList(collected);
  } finally {
    readLock.unlock();
  }
}
/**
 * Collects, under the read lock, the result of
 * {@code getPendingApplications()} from every child queue.
 *
 * @return an unmodifiable list holding the pending applications of all
 *         children
 */
public List<FiCaSchedulerApp> getPendingApplications() {
  readLock.lock();
  try {
    final List<FiCaSchedulerApp> collected = new ArrayList<>();
    for (CSQueue child : getChildQueues()) {
      final LeafQueue leaf = (LeafQueue) child;
      collected.addAll(leaf.getPendingApplications());
    }
    return Collections.unmodifiableList(collected);
  } finally {
    readLock.unlock();
  }
}
/**
 * Collects, under the read lock, the result of {@code getAllApplications()}
 * from every child queue.
 *
 * @return an unmodifiable list holding all applications of all children
 */
public List<FiCaSchedulerApp> getAllApplications() {
  readLock.lock();
  try {
    final List<FiCaSchedulerApp> collected = new ArrayList<>();
    for (CSQueue child : getChildQueues()) {
      final LeafQueue leaf = (LeafQueue) child;
      collected.addAll(leaf.getAllApplications());
    }
    return Collections.unmodifiableList(collected);
  } finally {
    readLock.unlock();
  }
}
/**
 * Builds the configuration key prefix of this queue's leaf queue template:
 * the scheduler-wide {@code PREFIX} followed by the template prefix that the
 * given configuration derives from this queue's path.
 *
 * @param conf configuration used to derive the template prefix
 * @return the full configuration prefix of the leaf queue template
 */
public String getLeafQueueConfigPrefix(CapacitySchedulerConfiguration conf) {
  final String templatePath =
      conf.getAutoCreatedQueueTemplateConfPrefix(getQueuePath());
  return CapacitySchedulerConfiguration.PREFIX + templatePath;
}
/**
 * Returns the cached fail-on-exceeding-capacity flag, as last read from the
 * configuration by the constructor or by reinitialize.
 *
 * @return the current value of the flag
 */
public boolean shouldFailAutoCreationWhenGuaranteedCapacityExceeded() {
  return this.shouldFailAutoCreationWhenGuaranteedCapacityExceeded;
}
/**
 * Validates the given queue management changes, applies them to the child
 * queues and then commits them to the queue management policy, all while
 * holding this queue's write lock. Per the original author's note this is
 * called asynchronously from the scheduler; it is also invoked from
 * reinitialize.
 *
 * @param queueManagementChanges changes to validate, apply and commit
 * @throws IOException if applying or committing the changes fails
 * @throws SchedulerDynamicEditException if a change fails validation
 */
public void validateAndApplyQueueManagementChanges(
    List<QueueManagementChange> queueManagementChanges)
    throws IOException, SchedulerDynamicEditException {
  writeLock.lock();
  try {
    validateQueueManagementChanges(queueManagementChanges);
    applyQueueManagementChanges(queueManagementChanges);

    // Original note: the commit acquires the write lock on the policy.
    getAutoCreatedQueueManagementPolicy()
        .commitQueueManagementChanges(queueManagementChanges);
  } finally {
    writeLock.unlock();
  }
}
/**
 * Checks every queue management change before it is applied: the target
 * must be an AutoCreatedLeafQueue whose parent is an
 * AbstractManagedParentQueue, and for UPDATE_QUEUE actions the updated
 * template must pass the leaf queue's own configuration validation.
 * <p>
 * A change whose queue, or whose queue's parent, is {@code null} is reported
 * through SchedulerDynamicEditException rather than surfacing as a
 * NullPointerException.
 *
 * @param queueManagementChanges changes to validate
 * @throws SchedulerDynamicEditException if any change targets an unexpected
 *           queue or carries a template that fails validation
 */
public void validateQueueManagementChanges(
    List<QueueManagementChange> queueManagementChanges)
    throws SchedulerDynamicEditException {
  for (QueueManagementChange queueManagementChange : queueManagementChanges) {
    CSQueue childQueue = queueManagementChange.getQueue();

    if (!(childQueue instanceof AutoCreatedLeafQueue)) {
      // childQueue may be null here; do not dereference it for the message.
      throw new SchedulerDynamicEditException(
          "queue should be " + "AutoCreatedLeafQueue. Found "
              + (childQueue == null ? null : childQueue.getClass()));
    }

    // instanceof is equivalent to the isAssignableFrom(getClass()) test for
    // non-null parents and additionally rejects a null parent cleanly.
    if (!(childQueue.getParent() instanceof AbstractManagedParentQueue)) {
      LOG.error("Queue {} is not an instance of PlanQueue or"
          + " ManagedParentQueue. Ignoring update {}", getQueuePath(),
          queueManagementChanges);
      throw new SchedulerDynamicEditException(
          "Queue " + getQueuePath() + " is not a AutoEnabledParentQueue."
              + " Ignoring update " + queueManagementChanges);
    }

    switch (queueManagementChange.getQueueAction()){
    case UPDATE_QUEUE:
      AutoCreatedLeafQueueConfig template =
          queueManagementChange.getUpdatedQueueTemplate();
      ((AutoCreatedLeafQueue) childQueue).validateConfigurations(template);
      break;
    default:
      // Other actions need no additional validation.
      break;
    }
  }
}
/**
 * Applies already-validated queue management changes: for each UPDATE_QUEUE
 * action the targeted auto-created leaf queue is reinitialized from the
 * change's updated template. Other actions are ignored.
 *
 * @param queueManagementChanges changes to apply
 * @throws SchedulerDynamicEditException if reinitializing a leaf queue fails
 * @throws IOException if reinitializing a leaf queue fails
 */
private void applyQueueManagementChanges(
    List<QueueManagementChange> queueManagementChanges)
    throws SchedulerDynamicEditException, IOException {
  for (QueueManagementChange change : queueManagementChanges) {
    switch (change.getQueueAction()) {
    case UPDATE_QUEUE: {
      final AutoCreatedLeafQueue targetLeaf =
          (AutoCreatedLeafQueue) change.getQueue();
      // Original note: this acquires the write lock on the leaf queue.
      targetLeaf.reinitializeFromTemplate(change.getUpdatedQueueTemplate());
      break;
    }
    default:
      break;
    }
  }
}
/**
 * Derives the configuration of a leaf queue from the configuration held by
 * the current leaf queue template.
 *
 * @param leafQueueName name substituted for the template token in each
 *          configuration key
 * @return a new configuration holding the rewritten template entries
 */
public CapacitySchedulerConfiguration getLeafQueueConfigs(
    String leafQueueName) {
  final CapacitySchedulerConfiguration templateConf =
      getLeafQueueTemplate().getLeafQueueConfigs();
  return getLeafQueueConfigs(templateConf, leafQueueName);
}
/**
 * Copies every entry of {@code templateConfig} into a fresh configuration,
 * replacing the first occurrence of the leaf queue template token
 * ({@code AUTO_CREATED_LEAF_QUEUE_TEMPLATE_PREFIX}) in each key with
 * {@code leafQueueName}.
 * <p>
 * Both the token and the queue name are treated as literal text: the token
 * is not interpreted as a regular expression, and characters such as
 * {@code $} or {@code \} in the queue name are inserted verbatim instead of
 * being read as group references or escapes.
 *
 * @param templateConfig configuration whose entries are copied
 * @param leafQueueName name inserted in place of the template token
 * @return a new configuration holding the rewritten entries
 */
public CapacitySchedulerConfiguration getLeafQueueConfigs(
    CapacitySchedulerConfiguration templateConfig, String leafQueueName) {
  CapacitySchedulerConfiguration leafQueueConfigTemplate = new
      CapacitySchedulerConfiguration(new Configuration(false), false);

  // Compile the literal token once instead of once per entry.
  final Pattern templateToken = Pattern.compile(Pattern.quote(
      CapacitySchedulerConfiguration
          .AUTO_CREATED_LEAF_QUEUE_TEMPLATE_PREFIX));

  for (final Iterator<Map.Entry<String, String>> iterator =
       templateConfig.iterator(); iterator.hasNext(); ) {
    Map.Entry<String, String> confKeyValuePair = iterator.next();
    final String name = templateToken.matcher(confKeyValuePair.getKey())
        .replaceFirst(Matcher.quoteReplacement(leafQueueName));
    leafQueueConfigTemplate.set(name, confKeyValuePair.getValue());
  }
  return leafQueueConfigTemplate;
}
}