| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; |
| |
| import java.util.Set; |
| |
| import org.apache.hadoop.yarn.api.records.Resource; |
| import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager; |
| import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; |
| import org.apache.hadoop.yarn.server.resourcemanager.placement.ApplicationPlacementContext; |
| import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage; |
| import org.apache.hadoop.yarn.server.utils.Lock; |
| import org.apache.hadoop.yarn.util.resource.ResourceCalculator; |
| import org.apache.hadoop.yarn.util.resource.Resources; |
| |
| import org.apache.hadoop.thirdparty.com.google.common.collect.Sets; |
| |
| public class CSQueueUtils { |
| |
| public final static float EPSILON = 0.0001f; |
| |
| /* |
| * Used only by tests |
| */ |
| public static void checkMaxCapacity(String queuePath, |
| float capacity, float maximumCapacity) { |
| if (maximumCapacity < 0.0f || maximumCapacity > 1.0f) { |
| throw new IllegalArgumentException( |
| "Illegal value of maximumCapacity " + maximumCapacity + |
| " used in call to setMaxCapacity for queue " + queuePath); |
| } |
| } |
| |
| /* |
| * Used only by tests |
| */ |
| public static void checkAbsoluteCapacity(String queuePath, |
| float absCapacity, float absMaxCapacity) { |
| if (absMaxCapacity < (absCapacity - EPSILON)) { |
| throw new IllegalArgumentException("Illegal call to setMaxCapacity. " |
| + "Queue '" + queuePath + "' has " |
| + "an absolute capacity (" + absCapacity |
| + ") greater than its absolute maximumCapacity (" + absMaxCapacity |
| + ")"); |
| } |
| } |
| |
| public static float computeAbsoluteMaximumCapacity( |
| float maximumCapacity, CSQueue parent) { |
| float parentAbsMaxCapacity = |
| (parent == null) ? 1.0f : parent.getAbsoluteMaximumCapacity(); |
| return (parentAbsMaxCapacity * maximumCapacity); |
| } |
| |
| public static void loadCapacitiesByLabelsFromConf(String queuePath, |
| QueueCapacities queueCapacities, CapacitySchedulerConfiguration csConf) { |
| queueCapacities.clearConfigurableFields(); |
| Set<String> configuredNodelabels = |
| csConf.getConfiguredNodeLabels(queuePath); |
| |
| for (String label : configuredNodelabels) { |
| if (label.equals(CommonNodeLabelsManager.NO_LABEL)) { |
| queueCapacities.setCapacity(label, |
| csConf.getNonLabeledQueueCapacity(queuePath) / 100); |
| queueCapacities.setMaximumCapacity(label, |
| csConf.getNonLabeledQueueMaximumCapacity(queuePath) / 100); |
| queueCapacities.setMaxAMResourcePercentage( |
| label, |
| csConf.getMaximumAMResourcePercentPerPartition(queuePath, label)); |
| queueCapacities.setWeight(label, |
| csConf.getNonLabeledQueueWeight(queuePath)); |
| } else{ |
| queueCapacities.setCapacity(label, |
| csConf.getLabeledQueueCapacity(queuePath, label) / 100); |
| queueCapacities.setMaximumCapacity(label, |
| csConf.getLabeledQueueMaximumCapacity(queuePath, label) / 100); |
| queueCapacities.setMaxAMResourcePercentage(label, |
| csConf.getMaximumAMResourcePercentPerPartition(queuePath, label)); |
| queueCapacities.setWeight(label, |
| csConf.getLabeledQueueWeight(queuePath, label)); |
| } |
| |
| /*float absCapacity = queueCapacities.getCapacity(label); |
| float absMaxCapacity = queueCapacities.getMaximumCapacity(label); |
| if (absCapacity > absMaxCapacity) { |
| throw new IllegalArgumentException("Illegal queue capacity setting " |
| + "(abs-capacity=" + absCapacity + ") > (abs-maximum-capacity=" |
| + absMaxCapacity + ") for queue=[" |
| + queuePath + "],label=[" + label + "]"); |
| }*/ |
| } |
| } |
| |
| /** |
| * Update partitioned resource usage, if nodePartition == null, will update |
| * used resource for all partitions of this queue. |
| */ |
| public static void updateUsedCapacity(final ResourceCalculator rc, |
| final Resource totalPartitionResource, String nodePartition, |
| AbstractCSQueue childQueue) { |
| QueueCapacities queueCapacities = childQueue.getQueueCapacities(); |
| CSQueueMetrics queueMetrics = childQueue.getMetrics(); |
| ResourceUsage queueResourceUsage = childQueue.getQueueResourceUsage(); |
| Resource minimumAllocation = childQueue.getMinimumAllocation(); |
| float absoluteUsedCapacity = 0.0f; |
| float usedCapacity = 0.0f; |
| float reservedCapacity = 0.0f; |
| float absoluteReservedCapacity = 0.0f; |
| |
| if (Resources.greaterThan(rc, totalPartitionResource, |
| totalPartitionResource, Resources.none())) { |
| |
| Resource queueGuaranteedResource = childQueue |
| .getEffectiveCapacity(nodePartition); |
| |
| //TODO : Modify below code to support Absolute Resource configurations |
| // (YARN-5881) for AutoCreatedLeafQueue |
| if (Float.compare(queueCapacities.getAbsoluteCapacity |
| (nodePartition), 0f) == 0 |
| && childQueue instanceof AutoCreatedLeafQueue) { |
| |
| //If absolute capacity is 0 for a leaf queue (could be a managed leaf |
| // queue, then use the leaf queue's template capacity to compute |
| // guaranteed resource for used capacity) |
| |
| // queueGuaranteed = totalPartitionedResource * |
| // absolute_capacity(partition) |
| ManagedParentQueue parentQueue = (ManagedParentQueue) |
| childQueue.getParent(); |
| QueueCapacities leafQueueTemplateCapacities = parentQueue |
| .getLeafQueueTemplate() |
| .getQueueCapacities(); |
| queueGuaranteedResource = Resources.multiply(totalPartitionResource, |
| leafQueueTemplateCapacities.getAbsoluteCapacity |
| (nodePartition)); |
| } |
| |
| // make queueGuranteed >= minimum_allocation to avoid divided by 0. |
| queueGuaranteedResource = |
| Resources.max(rc, totalPartitionResource, queueGuaranteedResource, |
| minimumAllocation); |
| |
| Resource usedResource = queueResourceUsage.getUsed(nodePartition); |
| absoluteUsedCapacity = |
| Resources.divide(rc, totalPartitionResource, usedResource, |
| totalPartitionResource); |
| usedCapacity = |
| Resources.divide(rc, totalPartitionResource, usedResource, |
| queueGuaranteedResource); |
| |
| Resource resResource = queueResourceUsage.getReserved(nodePartition); |
| reservedCapacity = |
| Resources.divide(rc, totalPartitionResource, resResource, |
| queueGuaranteedResource); |
| absoluteReservedCapacity = |
| Resources.divide(rc, totalPartitionResource, resResource, |
| totalPartitionResource); |
| } |
| |
| queueCapacities |
| .setAbsoluteUsedCapacity(nodePartition, absoluteUsedCapacity); |
| queueCapacities.setUsedCapacity(nodePartition, usedCapacity); |
| queueCapacities.setReservedCapacity(nodePartition, reservedCapacity); |
| queueCapacities |
| .setAbsoluteReservedCapacity(nodePartition, absoluteReservedCapacity); |
| |
| // QueueMetrics does not support per-label capacities, |
| // so we report values only for the default partition. |
| |
| queueMetrics.setUsedCapacity(nodePartition, |
| queueCapacities.getUsedCapacity(RMNodeLabelsManager.NO_LABEL)); |
| queueMetrics.setAbsoluteUsedCapacity(nodePartition, |
| queueCapacities.getAbsoluteUsedCapacity( |
| RMNodeLabelsManager.NO_LABEL)); |
| |
| } |
| |
| private static Resource getMaxAvailableResourceToQueuePartition( |
| final ResourceCalculator rc, CSQueue queue, |
| Resource cluster, String partition) { |
| // Calculate guaranteed resource for a label in a queue by below logic. |
| // (total label resource) * (absolute capacity of label in that queue) |
| Resource queueGuaranteedResource = queue.getEffectiveCapacity(partition); |
| |
| // Available resource in queue for a specific label will be calculated as |
| // {(guaranteed resource for a label in a queue) - |
| // (resource usage of that label in the queue)} |
| Resource available = (Resources.greaterThan(rc, cluster, |
| queueGuaranteedResource, |
| queue.getQueueResourceUsage().getUsed(partition))) ? Resources |
| .componentwiseMax(Resources.subtractFrom(queueGuaranteedResource, |
| queue.getQueueResourceUsage().getUsed(partition)), Resources |
| .none()) : Resources.none(); |
| |
| return available; |
| } |
| |
| /** |
| * <p> |
| * Update Queue Statistics: |
| * </p> |
| * |
| * <ul> |
| * <li>used-capacity/absolute-used-capacity by partition</li> |
| * <li>non-partitioned max-avail-resource to queue</li> |
| * </ul> |
| * |
| * <p> |
| * When nodePartition is null, all partition of |
| * used-capacity/absolute-used-capacity will be updated. |
| * </p> |
| */ |
| @Lock(CSQueue.class) |
| public static void updateQueueStatistics( |
| final ResourceCalculator rc, final Resource cluster, |
| final AbstractCSQueue childQueue, final RMNodeLabelsManager nlm, |
| final String nodePartition) { |
| QueueCapacities queueCapacities = childQueue.getQueueCapacities(); |
| ResourceUsage queueResourceUsage = childQueue.getQueueResourceUsage(); |
| |
| if (nodePartition == null) { |
| for (String partition : Sets.union(queueCapacities.getNodePartitionsSet(), |
| queueResourceUsage.getNodePartitionsSet())) { |
| updateUsedCapacity(rc, nlm.getResourceByLabel(partition, cluster), |
| partition, childQueue); |
| |
| // Update queue metrics w.r.t node labels. |
| // In QueueMetrics, null label is handled the same as NO_LABEL. |
| // This is because queue metrics for partitions are not tracked. |
| // In the future, will have to change this when/if queue metrics |
| // for partitions also get tracked. |
| childQueue.getMetrics().setAvailableResourcesToQueue( |
| partition, |
| getMaxAvailableResourceToQueuePartition(rc, childQueue, |
| cluster, partition)); |
| } |
| } else { |
| updateUsedCapacity(rc, nlm.getResourceByLabel(nodePartition, cluster), |
| nodePartition, childQueue); |
| |
| // Same as above. |
| childQueue.getMetrics().setAvailableResourcesToQueue( |
| nodePartition, |
| getMaxAvailableResourceToQueuePartition(rc, childQueue, |
| cluster, nodePartition)); |
| } |
| } |
| |
| /** |
| * Updated configured capacity/max-capacity for queue. |
| * @param rc resource calculator |
| * @param partitionResource total cluster resources for this partition |
| * @param partition partition being updated |
| * @param queue queue |
| */ |
| public static void updateConfiguredCapacityMetrics(ResourceCalculator rc, |
| Resource partitionResource, String partition, AbstractCSQueue queue) { |
| queue.getMetrics().setGuaranteedResources(partition, rc.multiplyAndNormalizeDown( |
| partitionResource, queue.getQueueCapacities().getAbsoluteCapacity(partition), |
| queue.getMinimumAllocation())); |
| queue.getMetrics().setMaxCapacityResources(partition, rc.multiplyAndNormalizeDown( |
| partitionResource, queue.getQueueCapacities().getAbsoluteMaximumCapacity(partition), |
| queue.getMinimumAllocation())); |
| queue.getMetrics().setGuaranteedCapacities(partition, |
| queue.getQueueCapacities().getCapacity(partition), |
| queue.getQueueCapacities().getAbsoluteCapacity(partition)); |
| queue.getMetrics().setMaxCapacities(partition, |
| queue.getQueueCapacities().getMaximumCapacity(partition), |
| queue.getQueueCapacities().getAbsoluteMaximumCapacity(partition)); |
| } |
| |
| public static void updateAbsoluteCapacitiesByNodeLabels(QueueCapacities queueCapacities, |
| QueueCapacities parentQueueCapacities, |
| Set<String> nodeLabels) { |
| for (String label : nodeLabels) { |
| // Weight will be normalized to queue.weight = |
| // queue.weight(sum({sibling-queues.weight})) |
| // When weight is set, capacity will be set to 0; |
| // When capacity is set, weight will be normalized to 0, |
| // So get larger from normalized_weight and capacity will make sure we do |
| // calculation correct |
| float capacity = Math.max( |
| queueCapacities.getCapacity(label), |
| queueCapacities |
| .getNormalizedWeight(label)); |
| if (capacity > 0f) { |
| queueCapacities.setAbsoluteCapacity(label, capacity * ( |
| parentQueueCapacities == null ? 1 : |
| parentQueueCapacities.getAbsoluteCapacity(label))); |
| } |
| |
| float maxCapacity = queueCapacities |
| .getMaximumCapacity(label); |
| if (maxCapacity > 0f) { |
| queueCapacities.setAbsoluteMaximumCapacity(label, maxCapacity * ( |
| parentQueueCapacities == null ? 1 : |
| parentQueueCapacities.getAbsoluteMaximumCapacity(label))); |
| } |
| } |
| } |
| |
| public static ApplicationPlacementContext extractQueuePath(String queuePath) { |
| int parentQueueNameEndIndex = queuePath.lastIndexOf("."); |
| if (parentQueueNameEndIndex > -1) { |
| String parent = queuePath.substring(0, parentQueueNameEndIndex).trim(); |
| String leaf = queuePath.substring(parentQueueNameEndIndex + 1).trim(); |
| return new ApplicationPlacementContext(leaf, parent); |
| } else{ |
| return new ApplicationPlacementContext(queuePath); |
| } |
| } |
| } |