| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.converter; |
| |
| import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.PREFIX; |
| |
| import java.util.List; |
| import java.util.stream.Collectors; |
| |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.yarn.api.records.Resource; |
| import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.ConfigurableResource; |
| import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSLeafQueue; |
| import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSQueue; |
| import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.converter.weightconversion.CapacityConverter; |
| import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.converter.weightconversion.CapacityConverterFactory; |
| import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.DominantResourceFairnessPolicy; |
| import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.FairSharePolicy; |
| import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.FifoPolicy; |
| import org.apache.hadoop.yarn.util.resource.Resources; |
| |
| /** |
| * Converts a Fair Schedule queue hierarchy to Capacity Scheduler |
| * configuration. |
| * |
| */ |
| public class FSQueueConverter { |
| public static final float QUEUE_MAX_AM_SHARE_DISABLED = -1.0f; |
| private static final int MAX_RUNNING_APPS_UNSET = Integer.MAX_VALUE; |
| private static final String FAIR_POLICY = "fair"; |
| private static final String FIFO_POLICY = "fifo"; |
| |
| private final FSConfigToCSConfigRuleHandler ruleHandler; |
| private Configuration capacitySchedulerConfig; |
| private final boolean preemptionEnabled; |
| private final boolean sizeBasedWeight; |
| @SuppressWarnings("unused") |
| private final Resource clusterResource; |
| private final float queueMaxAMShareDefault; |
| private final int queueMaxAppsDefault; |
| private final boolean drfUsed; |
| private final boolean usePercentages; |
| |
| private ConversionOptions conversionOptions; |
| |
| public FSQueueConverter(FSQueueConverterBuilder builder) { |
| this.ruleHandler = builder.ruleHandler; |
| this.capacitySchedulerConfig = builder.capacitySchedulerConfig; |
| this.preemptionEnabled = builder.preemptionEnabled; |
| this.sizeBasedWeight = builder.sizeBasedWeight; |
| this.clusterResource = builder.clusterResource; |
| this.queueMaxAMShareDefault = builder.queueMaxAMShareDefault; |
| this.queueMaxAppsDefault = builder.queueMaxAppsDefault; |
| this.conversionOptions = builder.conversionOptions; |
| this.drfUsed = builder.drfUsed; |
| this.usePercentages = builder.usePercentages; |
| } |
| |
| public void convertQueueHierarchy(FSQueue queue) { |
| List<FSQueue> children = queue.getChildQueues(); |
| final String queueName = queue.getName(); |
| |
| emitChildQueues(queueName, children); |
| emitMaxAMShare(queueName, queue); |
| emitMaxParallelApps(queueName, queue); |
| emitMaxAllocations(queueName, queue); |
| emitPreemptionDisabled(queueName, queue); |
| |
| emitChildCapacity(queue); |
| emitMaximumCapacity(queueName, queue); |
| emitSizeBasedWeight(queueName); |
| emitOrderingPolicy(queueName, queue); |
| checkMaxChildCapacitySetting(queue); |
| |
| for (FSQueue childQueue : children) { |
| convertQueueHierarchy(childQueue); |
| } |
| } |
| |
| /** |
| * Generates yarn.scheduler.capacity.<queue-name>.queues. |
| * @param queueName |
| * @param children |
| */ |
| private void emitChildQueues(String queueName, List<FSQueue> children) { |
| ruleHandler.handleChildQueueCount(queueName, children.size()); |
| |
| if (children.size() > 0) { |
| String childQueues = children.stream() |
| .map(child -> getQueueShortName(child.getName())) |
| .collect(Collectors.joining(",")); |
| |
| capacitySchedulerConfig.set(PREFIX + queueName + ".queues", childQueues); |
| } |
| } |
| |
| /** |
| * <maxAMShare> |
| * ==> yarn.scheduler.capacity.<queue-name>.maximum-am-resource-percent. |
| * @param queueName |
| * @param queue |
| */ |
| private void emitMaxAMShare(String queueName, FSQueue queue) { |
| float queueMaxAmShare = queue.getMaxAMShare(); |
| |
| // Direct floating point comparison is OK here |
| if (queueMaxAmShare != 0.0f |
| && queueMaxAmShare != queueMaxAMShareDefault |
| && queueMaxAmShare != QUEUE_MAX_AM_SHARE_DISABLED) { |
| capacitySchedulerConfig.setFloat(PREFIX + queueName + |
| ".maximum-am-resource-percent", queueMaxAmShare); |
| } |
| |
| if (queueMaxAmShare == QUEUE_MAX_AM_SHARE_DISABLED |
| && queueMaxAmShare != queueMaxAMShareDefault) { |
| capacitySchedulerConfig.setFloat(PREFIX + queueName + |
| ".maximum-am-resource-percent", 1.0f); |
| } |
| } |
| |
| /** |
| * <maxRunningApps> |
| * ==> yarn.scheduler.capacity.<queue-name>.max-parallel-apps. |
| * @param queueName |
| * @param queue |
| */ |
| private void emitMaxParallelApps(String queueName, FSQueue queue) { |
| if (queue.getMaxRunningApps() != MAX_RUNNING_APPS_UNSET |
| && queue.getMaxRunningApps() != queueMaxAppsDefault) { |
| capacitySchedulerConfig.set(PREFIX + queueName + ".max-parallel-apps", |
| String.valueOf(queue.getMaxRunningApps())); |
| } |
| } |
| |
| /** |
| * <maxResources> |
| * ==> yarn.scheduler.capacity.<queue-name>.maximum-capacity. |
| * @param queueName |
| * @param queue |
| */ |
| private void emitMaximumCapacity(String queueName, FSQueue queue) { |
| ConfigurableResource rawMaxShare = queue.getRawMaxShare(); |
| final Resource maxResource = rawMaxShare.getResource(); |
| |
| if ((maxResource == null && rawMaxShare.getPercentages() != null) |
| || isNotUnboundedResource(maxResource)) { |
| ruleHandler.handleMaxResources(); |
| } |
| |
| capacitySchedulerConfig.set(PREFIX + queueName + ".maximum-capacity", |
| "100"); |
| } |
| |
| /** |
| * <maxContainerAllocation> |
| * ==> yarn.scheduler.capacity.<queue-name>.maximum-allocation-mb |
| * / vcores. |
| * @param queueName |
| * @param queue |
| */ |
| private void emitMaxAllocations(String queueName, FSQueue queue) { |
| Resource maxAllocation = queue.getMaximumContainerAllocation(); |
| |
| if (isNotUnboundedResource(maxAllocation)) { |
| long parentMaxVcores = Integer.MIN_VALUE; |
| long parentMaxMemory = Integer.MIN_VALUE; |
| |
| if (queue.getParent() != null) { |
| FSQueue parent = queue.getParent(); |
| Resource parentMaxAllocation = parent.getMaximumContainerAllocation(); |
| if (isNotUnboundedResource(parentMaxAllocation)) { |
| parentMaxVcores = parentMaxAllocation.getVirtualCores(); |
| parentMaxMemory = parentMaxAllocation.getMemorySize(); |
| } |
| } |
| |
| long maxVcores = maxAllocation.getVirtualCores(); |
| long maxMemory = maxAllocation.getMemorySize(); |
| |
| // only emit max allocation if it differs from the parent's setting |
| if (maxVcores != parentMaxVcores || maxMemory != parentMaxMemory) { |
| capacitySchedulerConfig.set(PREFIX + queueName + |
| ".maximum-allocation-mb", String.valueOf(maxMemory)); |
| |
| capacitySchedulerConfig.set(PREFIX + queueName + |
| ".maximum-allocation-vcores", String.valueOf(maxVcores)); |
| } |
| } |
| } |
| |
| /** |
| * <allowPreemptionFrom> |
| * ==> yarn.scheduler.capacity.<queue-name>.disable_preemption. |
| * @param queueName |
| * @param queue |
| */ |
| private void emitPreemptionDisabled(String queueName, FSQueue queue) { |
| if (preemptionEnabled && !queue.isPreemptable()) { |
| capacitySchedulerConfig.set(PREFIX + queueName + ".disable_preemption", |
| "true"); |
| } |
| } |
| |
| /** |
| * yarn.scheduler.fair.sizebasedweight ==> |
| * yarn.scheduler.capacity.<queue-path> |
| * .ordering-policy.fair.enable-size-based-weight. |
| * @param queueName |
| */ |
| private void emitSizeBasedWeight(String queueName) { |
| if (sizeBasedWeight) { |
| capacitySchedulerConfig.setBoolean(PREFIX + queueName + |
| ".ordering-policy.fair.enable-size-based-weight", true); |
| } |
| } |
| |
| /** |
| * <schedulingPolicy> |
| * ==> yarn.scheduler.capacity.<queue-path>.ordering-policy. |
| * @param queueName |
| * @param queue |
| */ |
| private void emitOrderingPolicy(String queueName, FSQueue queue) { |
| if (queue instanceof FSLeafQueue) { |
| String policy = queue.getPolicy().getName(); |
| |
| switch (policy) { |
| case DominantResourceFairnessPolicy.NAME: |
| capacitySchedulerConfig.set(PREFIX + queueName |
| + ".ordering-policy", FAIR_POLICY); |
| break; |
| case FairSharePolicy.NAME: |
| capacitySchedulerConfig.set(PREFIX + queueName |
| + ".ordering-policy", FAIR_POLICY); |
| if (drfUsed) { |
| ruleHandler.handleFairAsDrf(queueName); |
| } |
| break; |
| case FifoPolicy.NAME: |
| capacitySchedulerConfig.set(PREFIX + queueName |
| + ".ordering-policy", FIFO_POLICY); |
| break; |
| default: |
| String msg = String.format("Unexpected ordering policy " + |
| "on queue %s: %s", queue, policy); |
| conversionOptions.handleConversionError(msg); |
| } |
| } |
| } |
| |
| /** |
| * weight + minResources |
| * ==> yarn.scheduler.capacity.<queue-name>.capacity. |
| * @param queue |
| */ |
| private void emitChildCapacity(FSQueue queue) { |
| CapacityConverter converter = |
| CapacityConverterFactory.getConverter(usePercentages); |
| |
| converter.convertWeightsForChildQueues(queue, |
| capacitySchedulerConfig); |
| |
| if (Resources.none().compareTo(queue.getMinShare()) != 0) { |
| ruleHandler.handleMinResources(); |
| } |
| } |
| |
| /** |
| * Missing feature, "leaf-queue-template.capacity" only accepts a single |
| * pct value. |
| * @param queue |
| */ |
| private void checkMaxChildCapacitySetting(FSQueue queue) { |
| if (queue.getMaxChildQueueResource() != null) { |
| Resource resource = queue.getMaxChildQueueResource().getResource(); |
| |
| if ((resource != null && isNotUnboundedResource(resource)) |
| || queue.getMaxChildQueueResource().getPercentages() != null) { |
| // Maximum child resource is defined |
| ruleHandler.handleMaxChildCapacity(); |
| } |
| } |
| } |
| |
| private String getQueueShortName(String queueName) { |
| int lastDot = queueName.lastIndexOf("."); |
| return queueName.substring(lastDot + 1); |
| } |
| |
| private boolean isNotUnboundedResource(Resource res) { |
| return Resources.unbounded().compareTo(res) != 0; |
| } |
| } |