blob: b66ab85733eda44c0f04fd6de0363b5fb23a4d57 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
import org.apache.hadoop.ipc.WeightedTimeCostProvider;
import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.thirdparty.com.google.common.base.Strings;
import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableSet;
import org.apache.hadoop.yarn.server.resourcemanager.placement.MappingRule;
import org.apache.hadoop.yarn.server.resourcemanager.placement.QueuePlacementRuleUtils;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.placement.MappingRuleCreator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.authorize.AccessControlList;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.QueueACL;
import org.apache.hadoop.yarn.api.records.QueueState;
import org.apache.hadoop.yarn.api.records.ReservationACL;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceInformation;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
import org.apache.hadoop.yarn.security.AccessType;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.placement.QueueMapping;
import org.apache.hadoop.yarn.server.resourcemanager.placement.QueueMapping.QueueMappingBuilder;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSchedulerConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.AppPriorityACLConfigurationParser.AppPriorityACLKeyType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.WorkflowPriorityMappingsManager.WorkflowPriorityMapping;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.policy.PriorityUtilizationQueueOrderingPolicy;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.policy.QueueOrderingPolicy;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.MultiNodeLookupPolicy;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.MultiNodePolicySpec;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.FairOrderingPolicy;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.FifoOrderingPolicy;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.FifoOrderingPolicyForPendingApps;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.FifoOrderingPolicyWithExclusivePartitions;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.OrderingPolicy;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.SchedulableEntity;
import org.apache.hadoop.yarn.util.UnitsConversionUtil;
import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.apache.hadoop.yarn.util.resource.ResourceUtils;
import org.apache.hadoop.yarn.util.resource.Resources;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.Set;
public class CapacitySchedulerConfiguration extends ReservationSchedulerConfiguration {
private static final Logger LOG =
LoggerFactory.getLogger(CapacitySchedulerConfiguration.class);
private static final String CS_CONFIGURATION_FILE = "capacity-scheduler.xml";
@Private
public static final String PREFIX = "yarn.scheduler.capacity.";
@Private
public static final String DOT = ".";
@Private
public static final String MAXIMUM_APPLICATIONS_SUFFIX =
"maximum-applications";
@Private
public static final String MAXIMUM_SYSTEM_APPLICATIONS =
PREFIX + MAXIMUM_APPLICATIONS_SUFFIX;
@Private
public static final String MAXIMUM_AM_RESOURCE_SUFFIX =
"maximum-am-resource-percent";
@Private
public static final String MAXIMUM_APPLICATION_MASTERS_RESOURCE_PERCENT =
PREFIX + MAXIMUM_AM_RESOURCE_SUFFIX;
@Private
public static final String QUEUES = "queues";
@Private
public static final String CAPACITY = "capacity";
@Private
public static final String MAXIMUM_CAPACITY = "maximum-capacity";
@Private
public static final String USER_LIMIT = "minimum-user-limit-percent";
@Private
public static final String USER_LIMIT_FACTOR = "user-limit-factor";
@Private
public static final String USER_WEIGHT = "weight";
@Private
public static final String USER_SETTINGS = "user-settings";
@Private
public static final float DEFAULT_USER_WEIGHT = 1.0f;
@Private
public static final String STATE = "state";
@Private
public static final String ACCESSIBLE_NODE_LABELS = "accessible-node-labels";
@Private
public static final String DEFAULT_NODE_LABEL_EXPRESSION =
"default-node-label-expression";
public static final String RESERVE_CONT_LOOK_ALL_NODES = PREFIX
+ "reservations-continue-look-all-nodes";
@Private
public static final boolean DEFAULT_RESERVE_CONT_LOOK_ALL_NODES = true;
@Private
public static final String MAXIMUM_ALLOCATION = "maximum-allocation";
@Private
public static final String MAXIMUM_ALLOCATION_MB = "maximum-allocation-mb";
@Private
public static final String MAXIMUM_ALLOCATION_VCORES =
"maximum-allocation-vcores";
/**
* Ordering policy of queues
*/
public static final String ORDERING_POLICY = "ordering-policy";
/*
* Ordering policy inside a leaf queue to sort apps
*/
public static final String FIFO_APP_ORDERING_POLICY = "fifo";
public static final String FAIR_APP_ORDERING_POLICY = "fair";
public static final String FIFO_WITH_PARTITIONS_APP_ORDERING_POLICY
= "fifo-with-partitions";
public static final String FIFO_FOR_PENDING_APPS
= "fifo-for-pending-apps";
public static final String DEFAULT_APP_ORDERING_POLICY =
FIFO_APP_ORDERING_POLICY;
@Private
public static final int DEFAULT_MAXIMUM_SYSTEM_APPLICATIIONS = 10000;
@Private
public static final float
DEFAULT_MAXIMUM_APPLICATIONMASTERS_RESOURCE_PERCENT = 0.1f;
@Private
public static final float UNDEFINED = -1;
@Private
public static final float MINIMUM_CAPACITY_VALUE = 0;
@Private
public static final float MAXIMUM_CAPACITY_VALUE = 100;
@Private
public static final float DEFAULT_MAXIMUM_CAPACITY_VALUE = -1.0f;
@Private
public static final int DEFAULT_USER_LIMIT = 100;
@Private
public static final float DEFAULT_USER_LIMIT_FACTOR = 1.0f;
@Private
public static final String ALL_ACL = "*";
@Private
public static final String NONE_ACL = " ";
@Private public static final String ENABLE_USER_METRICS =
PREFIX +"user-metrics.enable";
@Private public static final boolean DEFAULT_ENABLE_USER_METRICS = false;
/** ResourceComparator for scheduling. */
@Private public static final String RESOURCE_CALCULATOR_CLASS =
PREFIX + "resource-calculator";
@Private public static final Class<? extends ResourceCalculator>
DEFAULT_RESOURCE_CALCULATOR_CLASS = DefaultResourceCalculator.class;
@Private
public static final String ROOT = "root";
@Private
public static final String NODE_LOCALITY_DELAY =
PREFIX + "node-locality-delay";
@Private
public static final int DEFAULT_NODE_LOCALITY_DELAY = 40;
@Private
public static final String RACK_LOCALITY_ADDITIONAL_DELAY =
PREFIX + "rack-locality-additional-delay";
@Private
public static final int DEFAULT_RACK_LOCALITY_ADDITIONAL_DELAY = -1;
@Private
public static final String RACK_LOCALITY_FULL_RESET =
PREFIX + "rack-locality-full-reset";
@Private
public static final int DEFAULT_OFFSWITCH_PER_HEARTBEAT_LIMIT = 1;
@Private
public static final String OFFSWITCH_PER_HEARTBEAT_LIMIT =
PREFIX + "per-node-heartbeat.maximum-offswitch-assignments";
@Private
public static final boolean DEFAULT_RACK_LOCALITY_FULL_RESET = true;
@Private
public static final String SCHEDULE_ASYNCHRONOUSLY_PREFIX =
PREFIX + "schedule-asynchronously";
@Private
public static final String SCHEDULE_ASYNCHRONOUSLY_ENABLE =
SCHEDULE_ASYNCHRONOUSLY_PREFIX + ".enable";
@Private
public static final String SCHEDULE_ASYNCHRONOUSLY_MAXIMUM_THREAD =
SCHEDULE_ASYNCHRONOUSLY_PREFIX + ".maximum-threads";
@Private
public static final String SCHEDULE_ASYNCHRONOUSLY_MAXIMUM_PENDING_BACKLOGS =
SCHEDULE_ASYNCHRONOUSLY_PREFIX + ".maximum-pending-backlogs";
@Private
public static final String APP_FAIL_FAST = PREFIX + "application.fail-fast";
@Private
public static final boolean DEFAULT_APP_FAIL_FAST = false;
@Private
public static final Integer
DEFAULT_SCHEDULE_ASYNCHRONOUSLY_MAXIMUM_PENDING_BACKLOGS = 100;
@Private
public static final boolean DEFAULT_SCHEDULE_ASYNCHRONOUSLY_ENABLE = false;
@Private
public static final String QUEUE_MAPPING = PREFIX + "queue-mappings";
@Private
public static final String QUEUE_MAPPING_NAME =
YarnConfiguration.QUEUE_PLACEMENT_RULES + ".app-name";
@Private
public static final String ENABLE_QUEUE_MAPPING_OVERRIDE = QUEUE_MAPPING + "-override.enable";
@Private
public static final boolean DEFAULT_ENABLE_QUEUE_MAPPING_OVERRIDE = false;
@Private
public static final String WORKFLOW_PRIORITY_MAPPINGS =
PREFIX + "workflow-priority-mappings";
@Private
public static final String ENABLE_WORKFLOW_PRIORITY_MAPPINGS_OVERRIDE =
WORKFLOW_PRIORITY_MAPPINGS + "-override.enable";
@Private
public static final boolean DEFAULT_ENABLE_WORKFLOW_PRIORITY_MAPPINGS_OVERRIDE = false;
@Private
public static final String QUEUE_PREEMPTION_DISABLED = "disable_preemption";
@Private
public static final String DEFAULT_APPLICATION_PRIORITY = "default-application-priority";
@Private
public static final Integer DEFAULT_CONFIGURATION_APPLICATION_PRIORITY = 0;
@Private
public static final String AVERAGE_CAPACITY = "average-capacity";
@Private
public static final String IS_RESERVABLE = "reservable";
@Private
public static final String RESERVATION_WINDOW = "reservation-window";
@Private
public static final String INSTANTANEOUS_MAX_CAPACITY =
"instantaneous-max-capacity";
@Private
public static final String RESERVATION_ADMISSION_POLICY =
"reservation-policy";
@Private
public static final String RESERVATION_AGENT_NAME = "reservation-agent";
@Private
public static final String RESERVATION_SHOW_RESERVATION_AS_QUEUE =
"show-reservations-as-queues";
@Private
public static final String RESERVATION_PLANNER_NAME = "reservation-planner";
@Private
public static final String RESERVATION_MOVE_ON_EXPIRY =
"reservation-move-on-expiry";
@Private
public static final String RESERVATION_ENFORCEMENT_WINDOW =
"reservation-enforcement-window";
@Private
public static final String LAZY_PREEMPTION_ENABLED =
PREFIX + "lazy-preemption-enabled";
@Private
public static final boolean DEFAULT_LAZY_PREEMPTION_ENABLED = false;
@Private
public static final String ASSIGN_MULTIPLE_ENABLED = PREFIX
+ "per-node-heartbeat.multiple-assignments-enabled";
@Private
public static final boolean DEFAULT_ASSIGN_MULTIPLE_ENABLED = true;
/** Maximum number of containers to assign on each check-in. */
@Private
public static final String MAX_ASSIGN_PER_HEARTBEAT = PREFIX
+ "per-node-heartbeat.maximum-container-assignments";
/**
* Avoid potential risk that greedy assign multiple may involve
* */
@Private
public static final int DEFAULT_MAX_ASSIGN_PER_HEARTBEAT = 100;
/** Configuring absolute min/max resources in a queue. **/
@Private
public static final String MINIMUM_RESOURCE = "min-resource";
@Private
public static final String MAXIMUM_RESOURCE = "max-resource";
public static final String DEFAULT_RESOURCE_TYPES = "memory,vcores";
public static final String PATTERN_FOR_ABSOLUTE_RESOURCE = "^\\[[\\w\\.,\\-_=\\ /]+\\]$";
public static final Pattern RESOURCE_PATTERN = Pattern.compile(PATTERN_FOR_ABSOLUTE_RESOURCE);
private static final String WEIGHT_SUFFIX = "w";
public static final String MAX_PARALLEL_APPLICATIONS = "max-parallel-apps";
public static final int DEFAULT_MAX_PARALLEL_APPLICATIONS = Integer.MAX_VALUE;
public static final String ALLOW_ZERO_CAPACITY_SUM =
"allow-zero-capacity-sum";
public static final boolean DEFAULT_ALLOW_ZERO_CAPACITY_SUM = false;
public static final String MAPPING_RULE_FORMAT =
PREFIX + "mapping-rule-format";
public static final String MAPPING_RULE_JSON =
PREFIX + "mapping-rule-json";
public static final String MAPPING_RULE_JSON_FILE =
PREFIX + "mapping-rule-json-file";
public static final String MAPPING_RULE_FORMAT_LEGACY = "legacy";
public static final String MAPPING_RULE_FORMAT_JSON = "json";
public static final String MAPPING_RULE_FORMAT_DEFAULT =
MAPPING_RULE_FORMAT_LEGACY;
/**
* Different resource types supported.
*/
public enum AbsoluteResourceType {
MEMORY, VCORES;
}
AppPriorityACLConfigurationParser priorityACLConfig = new AppPriorityACLConfigurationParser();
public CapacitySchedulerConfiguration() {
this(new Configuration());
}
public CapacitySchedulerConfiguration(Configuration configuration) {
this(configuration, true);
}
public CapacitySchedulerConfiguration(Configuration configuration,
boolean useLocalConfigurationProvider) {
super(configuration);
if (useLocalConfigurationProvider) {
addResource(CS_CONFIGURATION_FILE);
}
}
public static String getQueuePrefix(String queue) {
String queueName = PREFIX + queue + DOT;
return queueName;
}
static String getQueueOrderingPolicyPrefix(String queue) {
String queueName = PREFIX + queue + DOT + ORDERING_POLICY + DOT;
return queueName;
}
static String getUserPrefix(String user) {
return PREFIX + "user." + user + DOT;
}
private String getNodeLabelPrefix(String queue, String label) {
if (label.equals(CommonNodeLabelsManager.NO_LABEL)) {
return getQueuePrefix(queue);
}
return getQueuePrefix(queue) + ACCESSIBLE_NODE_LABELS + DOT + label + DOT;
}
public int getMaximumSystemApplications() {
int maxApplications =
getInt(MAXIMUM_SYSTEM_APPLICATIONS, DEFAULT_MAXIMUM_SYSTEM_APPLICATIIONS);
return maxApplications;
}
public float getMaximumApplicationMasterResourcePercent() {
return getFloat(MAXIMUM_APPLICATION_MASTERS_RESOURCE_PERCENT,
DEFAULT_MAXIMUM_APPLICATIONMASTERS_RESOURCE_PERCENT);
}
/**
* Get the maximum applications per queue setting.
* @param queue name of the queue
* @return setting specified or -1 if not set
*/
public int getMaximumApplicationsPerQueue(String queue) {
int maxApplicationsPerQueue =
getInt(getQueuePrefix(queue) + MAXIMUM_APPLICATIONS_SUFFIX,
(int)UNDEFINED);
return maxApplicationsPerQueue;
}
/**
* Get the maximum am resource percent per queue setting.
* @param queue name of the queue
* @return per queue setting or defaults to the global am-resource-percent
* setting if per queue setting not present
*/
public float getMaximumApplicationMasterResourcePerQueuePercent(String queue) {
return getFloat(getQueuePrefix(queue) + MAXIMUM_AM_RESOURCE_SUFFIX,
getMaximumApplicationMasterResourcePercent());
}
public void setMaximumApplicationMasterResourcePerQueuePercent(String queue,
float percent) {
setFloat(getQueuePrefix(queue) + MAXIMUM_AM_RESOURCE_SUFFIX, percent);
}
private void throwExceptionForUnexpectedWeight(float weight, String queue,
String label) {
if ((weight < -1e-6 && Math.abs(weight + 1) > 1e-6) || weight > 10000) {
throw new IllegalArgumentException(
"Illegal " + "weight=" + weight + " for queue=" + queue + "label="
+ label
+ ". Acceptable values: [0, 10000], -1 is same as not set");
}
}
public float getNonLabeledQueueWeight(String queue) {
String configuredValue = get(getQueuePrefix(queue) + CAPACITY);
float weight = extractFloatValueFromWeightConfig(configuredValue);
throwExceptionForUnexpectedWeight(weight, queue, "");
return weight;
}
public void setNonLabeledQueueWeight(String queue, float weight) {
set(getQueuePrefix(queue) + CAPACITY, weight + WEIGHT_SUFFIX);
}
public void setLabeledQueueWeight(String queue, String label, float weight) {
set(getNodeLabelPrefix(queue, label) + CAPACITY, weight + WEIGHT_SUFFIX);
}
public float getLabeledQueueWeight(String queue, String label) {
String configuredValue = get(getNodeLabelPrefix(queue, label) + CAPACITY);
float weight = extractFloatValueFromWeightConfig(configuredValue);
throwExceptionForUnexpectedWeight(weight, queue, label);
return weight;
}
public float getNonLabeledQueueCapacity(String queue) {
String configuredCapacity = get(getQueuePrefix(queue) + CAPACITY);
boolean absoluteResourceConfigured = (configuredCapacity != null)
&& RESOURCE_PATTERN.matcher(configuredCapacity).find();
if (absoluteResourceConfigured || configuredWeightAsCapacity(
configuredCapacity)) {
// Return capacity in percentage as 0 for non-root queues and 100 for
// root.From AbstractCSQueue, absolute resource will be parsed and
// updated. Once nodes are added/removed in cluster, capacity in
// percentage will also be re-calculated.
return queue.equals("root") ? 100.0f : 0f;
}
float capacity = queue.equals("root")
? 100.0f
: (configuredCapacity == null)
? 0f
: Float.parseFloat(configuredCapacity);
if (capacity < MINIMUM_CAPACITY_VALUE
|| capacity > MAXIMUM_CAPACITY_VALUE) {
throw new IllegalArgumentException(
"Illegal " + "capacity of " + capacity + " for queue " + queue);
}
LOG.debug("CSConf - getCapacity: queuePrefix={}, capacity={}",
getQueuePrefix(queue), capacity);
return capacity;
}
public void setCapacity(String queue, float capacity) {
if (queue.equals("root")) {
throw new IllegalArgumentException(
"Cannot set capacity, root queue has a fixed capacity of 100.0f");
}
setFloat(getQueuePrefix(queue) + CAPACITY, capacity);
LOG.debug("CSConf - setCapacity: queuePrefix={}, capacity={}",
getQueuePrefix(queue), capacity);
}
@VisibleForTesting
public void setCapacity(String queue, String absoluteResourceCapacity) {
if (queue.equals("root")) {
throw new IllegalArgumentException(
"Cannot set capacity, root queue has a fixed capacity");
}
set(getQueuePrefix(queue) + CAPACITY, absoluteResourceCapacity);
LOG.debug("CSConf - setCapacity: queuePrefix={}, capacity={}",
getQueuePrefix(queue), absoluteResourceCapacity);
}
public float getNonLabeledQueueMaximumCapacity(String queue) {
String configuredCapacity = get(getQueuePrefix(queue) + MAXIMUM_CAPACITY);
boolean matcher = (configuredCapacity != null)
&& RESOURCE_PATTERN.matcher(configuredCapacity).find();
if (matcher) {
// Return capacity in percentage as 0 for non-root queues and 100 for
// root.From AbstractCSQueue, absolute resource will be parsed and
// updated. Once nodes are added/removed in cluster, capacity in
// percentage will also be re-calculated.
return 100.0f;
}
float maxCapacity = (configuredCapacity == null)
? MAXIMUM_CAPACITY_VALUE
: Float.parseFloat(configuredCapacity);
maxCapacity = (maxCapacity == DEFAULT_MAXIMUM_CAPACITY_VALUE)
? MAXIMUM_CAPACITY_VALUE
: maxCapacity;
return maxCapacity;
}
public void setMaximumCapacity(String queue, float maxCapacity) {
if (maxCapacity > MAXIMUM_CAPACITY_VALUE) {
throw new IllegalArgumentException("Illegal " +
"maximum-capacity of " + maxCapacity + " for queue " + queue);
}
setFloat(getQueuePrefix(queue) + MAXIMUM_CAPACITY, maxCapacity);
LOG.debug("CSConf - setMaxCapacity: queuePrefix={}, maxCapacity={}",
getQueuePrefix(queue), maxCapacity);
}
public void setCapacityByLabel(String queue, String label, float capacity) {
setFloat(getNodeLabelPrefix(queue, label) + CAPACITY, capacity);
}
@VisibleForTesting
public void setCapacityByLabel(String queue, String label,
String absoluteResourceCapacity) {
set(getNodeLabelPrefix(queue, label) + CAPACITY, absoluteResourceCapacity);
}
public void setMaximumCapacityByLabel(String queue, String label,
float capacity) {
setFloat(getNodeLabelPrefix(queue, label) + MAXIMUM_CAPACITY, capacity);
}
public void setMaximumCapacityByLabel(String queue, String label,
String absoluteResourceCapacity) {
set(getNodeLabelPrefix(queue, label) + MAXIMUM_CAPACITY,
absoluteResourceCapacity);
}
public int getUserLimit(String queue) {
int userLimit = getInt(getQueuePrefix(queue) + USER_LIMIT,
DEFAULT_USER_LIMIT);
return userLimit;
}
// TODO (wangda): We need to better distinguish app ordering policy and queue
// ordering policy's classname / configuration options, etc. And dedup code
// if possible.
@SuppressWarnings("unchecked")
public <S extends SchedulableEntity> OrderingPolicy<S> getAppOrderingPolicy(
String queue) {
String policyType = get(getQueuePrefix(queue) + ORDERING_POLICY,
DEFAULT_APP_ORDERING_POLICY);
OrderingPolicy<S> orderingPolicy;
if (policyType.trim().equals(FIFO_APP_ORDERING_POLICY)) {
policyType = FifoOrderingPolicy.class.getName();
}
if (policyType.trim().equals(FAIR_APP_ORDERING_POLICY)) {
policyType = FairOrderingPolicy.class.getName();
}
if (policyType.trim().equals(FIFO_WITH_PARTITIONS_APP_ORDERING_POLICY)) {
policyType = FifoOrderingPolicyWithExclusivePartitions.class.getName();
}
if (policyType.trim().equals(FIFO_FOR_PENDING_APPS)) {
policyType = FifoOrderingPolicyForPendingApps.class.getName();
}
try {
orderingPolicy = (OrderingPolicy<S>)
Class.forName(policyType).newInstance();
} catch (Exception e) {
String message = "Unable to construct ordering policy for: " + policyType + ", " + e.getMessage();
throw new RuntimeException(message, e);
}
Map<String, String> config = new HashMap<String, String>();
String confPrefix = getQueuePrefix(queue) + ORDERING_POLICY + ".";
for (Map.Entry<String, String> kv : this) {
if (kv.getKey().startsWith(confPrefix)) {
config.put(kv.getKey().substring(confPrefix.length()), kv.getValue());
}
}
orderingPolicy.configure(config);
return orderingPolicy;
}
public void setUserLimit(String queue, int userLimit) {
setInt(getQueuePrefix(queue) + USER_LIMIT, userLimit);
LOG.debug("here setUserLimit: queuePrefix={}, userLimit={}",
getQueuePrefix(queue), getUserLimit(queue));
}
public float getUserLimitFactor(String queue) {
float userLimitFactor =
getFloat(getQueuePrefix(queue) + USER_LIMIT_FACTOR,
DEFAULT_USER_LIMIT_FACTOR);
return userLimitFactor;
}
public void setUserLimitFactor(String queue, float userLimitFactor) {
setFloat(getQueuePrefix(queue) + USER_LIMIT_FACTOR, userLimitFactor);
}
public QueueState getConfiguredState(String queue) {
String state = get(getQueuePrefix(queue) + STATE);
if (state == null) {
return null;
} else {
return QueueState.valueOf(StringUtils.toUpperCase(state));
}
}
public QueueState getState(String queue) {
QueueState state = getConfiguredState(queue);
return (state == null) ? QueueState.RUNNING : state;
}
@Private
@VisibleForTesting
public void setState(String queue, QueueState state) {
set(getQueuePrefix(queue) + STATE, state.name());
}
public void setAccessibleNodeLabels(String queue, Set<String> labels) {
if (labels == null) {
return;
}
String str = StringUtils.join(",", labels);
set(getQueuePrefix(queue) + ACCESSIBLE_NODE_LABELS, str);
}
public Set<String> getAccessibleNodeLabels(String queue) {
String accessibleLabelStr =
get(getQueuePrefix(queue) + ACCESSIBLE_NODE_LABELS);
// When accessible-label is null,
if (accessibleLabelStr == null) {
// Only return null when queue is not ROOT
if (!queue.equals(ROOT)) {
return null;
}
} else {
// print a warning when accessibleNodeLabel specified in config and queue
// is ROOT
if (queue.equals(ROOT)) {
LOG.warn("Accessible node labels for root queue will be ignored,"
+ " it will be automatically set to \"*\".");
}
}
// always return ANY for queue root
if (queue.equals(ROOT)) {
return ImmutableSet.of(RMNodeLabelsManager.ANY);
}
// In other cases, split the accessibleLabelStr by ","
Set<String> set = new HashSet<String>();
for (String str : accessibleLabelStr.split(",")) {
if (!str.trim().isEmpty()) {
set.add(str.trim());
}
}
// if labels contains "*", only keep ANY behind
if (set.contains(RMNodeLabelsManager.ANY)) {
set.clear();
set.add(RMNodeLabelsManager.ANY);
}
return Collections.unmodifiableSet(set);
}
private boolean configuredWeightAsCapacity(String configureValue) {
if (configureValue == null) {
return false;
}
return configureValue.endsWith(WEIGHT_SUFFIX);
}
private float extractFloatValueFromWeightConfig(String configureValue) {
if (!configuredWeightAsCapacity(configureValue)) {
return -1f;
} else {
return Float.valueOf(
configureValue.substring(0, configureValue.indexOf(WEIGHT_SUFFIX)));
}
}
private float internalGetLabeledQueueCapacity(String queue, String label,
String suffix, float defaultValue) {
String capacityPropertyName = getNodeLabelPrefix(queue, label) + suffix;
String configuredCapacity = get(capacityPropertyName);
boolean absoluteResourceConfigured =
(configuredCapacity != null) && RESOURCE_PATTERN.matcher(
configuredCapacity).find();
if (absoluteResourceConfigured || configuredWeightAsCapacity(
configuredCapacity)) {
// Return capacity in percentage as 0 for non-root queues and 100 for
// root.From AbstractCSQueue, absolute resource, and weight will be parsed
// and updated separately. Once nodes are added/removed in cluster,
// capacity is percentage will also be re-calculated.
return defaultValue;
}
float capacity = getFloat(capacityPropertyName, defaultValue);
if (capacity < MINIMUM_CAPACITY_VALUE
|| capacity > MAXIMUM_CAPACITY_VALUE) {
throw new IllegalArgumentException(
"Illegal capacity of " + capacity + " for node-label=" + label
+ " in queue=" + queue
+ ", valid capacity should in range of [0, 100].");
}
if (LOG.isDebugEnabled()) {
LOG.debug(
"CSConf - getCapacityOfLabel: prefix=" + getNodeLabelPrefix(queue,
label) + ", capacity=" + capacity);
}
return capacity;
}
public float getLabeledQueueCapacity(String queue, String label) {
return internalGetLabeledQueueCapacity(queue, label, CAPACITY, 0f);
}
public float getLabeledQueueMaximumCapacity(String queue, String label) {
return internalGetLabeledQueueCapacity(queue, label, MAXIMUM_CAPACITY, 100f);
}
public String getDefaultNodeLabelExpression(String queue) {
String defaultLabelExpression = get(getQueuePrefix(queue)
+ DEFAULT_NODE_LABEL_EXPRESSION);
if (defaultLabelExpression == null) {
return null;
}
return defaultLabelExpression.trim();
}
public void setDefaultNodeLabelExpression(String queue, String exp) {
set(getQueuePrefix(queue) + DEFAULT_NODE_LABEL_EXPRESSION, exp);
}
public float getMaximumAMResourcePercentPerPartition(String queue,
String label) {
// If per-partition max-am-resource-percent is not configured,
// use default value as max-am-resource-percent for this queue.
return getFloat(getNodeLabelPrefix(queue, label)
+ MAXIMUM_AM_RESOURCE_SUFFIX,
getMaximumApplicationMasterResourcePerQueuePercent(queue));
}
public void setMaximumAMResourcePercentPerPartition(String queue,
String label, float percent) {
setFloat(getNodeLabelPrefix(queue, label)
+ MAXIMUM_AM_RESOURCE_SUFFIX, percent);
}
/*
* Returns whether we should continue to look at all heart beating nodes even
* after the reservation limit was hit. The node heart beating in could
* satisfy the request thus could be a better pick then waiting for the
* reservation to be fullfilled. This config is refreshable.
*/
public boolean getReservationContinueLook() {
return getBoolean(RESERVE_CONT_LOOK_ALL_NODES,
DEFAULT_RESERVE_CONT_LOOK_ALL_NODES);
}
private static String getAclKey(QueueACL acl) {
return "acl_" + StringUtils.toLowerCase(acl.toString());
}
public AccessControlList getAcl(String queue, QueueACL acl) {
String queuePrefix = getQueuePrefix(queue);
// The root queue defaults to all access if not defined
// Sub queues inherit access if not defined
String defaultAcl = queue.equals(ROOT) ? ALL_ACL : NONE_ACL;
String aclString = get(queuePrefix + getAclKey(acl), defaultAcl);
return new AccessControlList(aclString);
}
public void setAcl(String queue, QueueACL acl, String aclString) {
String queuePrefix = getQueuePrefix(queue);
set(queuePrefix + getAclKey(acl), aclString);
}
private static String getAclKey(ReservationACL acl) {
return "acl_" + StringUtils.toLowerCase(acl.toString());
}
private static String getAclKey(AccessType acl) {
return "acl_" + StringUtils.toLowerCase(acl.toString());
}
@Override
public Map<ReservationACL, AccessControlList> getReservationAcls(String
queue) {
Map<ReservationACL, AccessControlList> resAcls = new HashMap<>();
for (ReservationACL acl : ReservationACL.values()) {
resAcls.put(acl, getReservationAcl(queue, acl));
}
return resAcls;
}
private AccessControlList getReservationAcl(String queue, ReservationACL
acl) {
String queuePrefix = getQueuePrefix(queue);
// The root queue defaults to all access if not defined
// Sub queues inherit access if not defined
String defaultAcl = ALL_ACL;
String aclString = get(queuePrefix + getAclKey(acl), defaultAcl);
return new AccessControlList(aclString);
}
private void setAcl(String queue, ReservationACL acl, String aclString) {
String queuePrefix = getQueuePrefix(queue);
set(queuePrefix + getAclKey(acl), aclString);
}
private void setAcl(String queue, AccessType acl, String aclString) {
String queuePrefix = getQueuePrefix(queue);
set(queuePrefix + getAclKey(acl), aclString);
}
public Map<AccessType, AccessControlList> getAcls(String queue) {
Map<AccessType, AccessControlList> acls =
new HashMap<AccessType, AccessControlList>();
for (QueueACL acl : QueueACL.values()) {
acls.put(SchedulerUtils.toAccessType(acl), getAcl(queue, acl));
}
return acls;
}
public void setAcls(String queue, Map<QueueACL, AccessControlList> acls) {
for (Map.Entry<QueueACL, AccessControlList> e : acls.entrySet()) {
setAcl(queue, e.getKey(), e.getValue().getAclString());
}
}
@VisibleForTesting
public void setReservationAcls(String queue,
Map<ReservationACL, AccessControlList> acls) {
for (Map.Entry<ReservationACL, AccessControlList> e : acls.entrySet()) {
setAcl(queue, e.getKey(), e.getValue().getAclString());
}
}
@VisibleForTesting
public void setPriorityAcls(String queue, Priority priority,
Priority defaultPriority, String[] acls) {
StringBuilder aclString = new StringBuilder();
StringBuilder userAndGroup = new StringBuilder();
for (int i = 0; i < acls.length; i++) {
userAndGroup.append(AppPriorityACLKeyType.values()[i] + "=" + acls[i].trim())
.append(" ");
}
aclString.append("[" + userAndGroup.toString().trim() + " "
+ "max_priority=" + priority.getPriority() + " " + "default_priority="
+ defaultPriority.getPriority() + "]");
setAcl(queue, AccessType.APPLICATION_MAX_PRIORITY, aclString.toString());
}
public List<AppPriorityACLGroup> getPriorityAcls(String queue,
Priority clusterMaxPriority) {
String queuePrefix = getQueuePrefix(queue);
String defaultAcl = ALL_ACL;
String aclString = get(
queuePrefix + getAclKey(AccessType.APPLICATION_MAX_PRIORITY),
defaultAcl);
return priorityACLConfig.getPriorityAcl(clusterMaxPriority, aclString);
}
public String[] getQueues(String queue) {
LOG.debug("CSConf - getQueues called for: queuePrefix={}",
getQueuePrefix(queue));
String[] queues = getStrings(getQueuePrefix(queue) + QUEUES);
List<String> trimmedQueueNames = new ArrayList<String>();
if (null != queues) {
for (String s : queues) {
trimmedQueueNames.add(s.trim());
}
queues = trimmedQueueNames.toArray(new String[0]);
}
LOG.debug("CSConf - getQueues: queuePrefix={}, queues={}",
getQueuePrefix(queue),
((queues == null) ? "" : StringUtils.arrayToString(queues)));
return queues;
}
public void setQueues(String queue, String[] subQueues) {
set(getQueuePrefix(queue) + QUEUES, StringUtils.arrayToString(subQueues));
LOG.debug("CSConf - setQueues: qPrefix={}, queues={}",
getQueuePrefix(queue), StringUtils.arrayToString(subQueues));
}
public Resource getMinimumAllocation() {
int minimumMemory = getInt(
YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB);
int minimumCores = getInt(
YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES,
YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES);
return Resources.createResource(minimumMemory, minimumCores);
}
@Private
public Priority getQueuePriority(String queue) {
String queuePolicyPrefix = getQueuePrefix(queue);
Priority pri = Priority.newInstance(
getInt(queuePolicyPrefix + "priority", 0));
return pri;
}
@Private
public void setQueuePriority(String queue, int priority) {
String queuePolicyPrefix = getQueuePrefix(queue);
setInt(queuePolicyPrefix + "priority", priority);
}
/**
* Get maximum_allocation setting for the specified queue from the
* configuration.
*
* @param queue
* name of the queue
* @return Resource object or Resource.none if not set
*/
public Resource getQueueMaximumAllocation(String queue) {
String queuePrefix = getQueuePrefix(queue);
String rawQueueMaxAllocation = get(queuePrefix + MAXIMUM_ALLOCATION, null);
if (Strings.isNullOrEmpty(rawQueueMaxAllocation)) {
return Resources.none();
} else {
return ResourceUtils.createResourceFromString(rawQueueMaxAllocation,
ResourceUtils.getResourcesTypeInfo());
}
}
public long getQueueMaximumAllocationMb(String queue) {
String queuePrefix = getQueuePrefix(queue);
return getInt(queuePrefix + MAXIMUM_ALLOCATION_MB, (int)UNDEFINED);
}
public int getQueueMaximumAllocationVcores(String queue) {
String queuePrefix = getQueuePrefix(queue);
return getInt(queuePrefix + MAXIMUM_ALLOCATION_VCORES, (int)UNDEFINED);
}
public boolean getEnableUserMetrics() {
return getBoolean(ENABLE_USER_METRICS, DEFAULT_ENABLE_USER_METRICS);
}
public int getOffSwitchPerHeartbeatLimit() {
int limit = getInt(OFFSWITCH_PER_HEARTBEAT_LIMIT,
DEFAULT_OFFSWITCH_PER_HEARTBEAT_LIMIT);
if (limit < 1) {
LOG.warn(OFFSWITCH_PER_HEARTBEAT_LIMIT + "(" + limit + ") < 1. Using 1.");
limit = 1;
}
return limit;
}
public void setOffSwitchPerHeartbeatLimit(int limit) {
setInt(OFFSWITCH_PER_HEARTBEAT_LIMIT, limit);
}
public int getNodeLocalityDelay() {
return getInt(NODE_LOCALITY_DELAY, DEFAULT_NODE_LOCALITY_DELAY);
}
@VisibleForTesting
public void setNodeLocalityDelay(int nodeLocalityDelay) {
setInt(NODE_LOCALITY_DELAY, nodeLocalityDelay);
}
public int getRackLocalityAdditionalDelay() {
return getInt(RACK_LOCALITY_ADDITIONAL_DELAY,
DEFAULT_RACK_LOCALITY_ADDITIONAL_DELAY);
}
public boolean getRackLocalityFullReset() {
return getBoolean(RACK_LOCALITY_FULL_RESET,
DEFAULT_RACK_LOCALITY_FULL_RESET);
}
public ResourceCalculator getResourceCalculator() {
return ReflectionUtils.newInstance(
getClass(
RESOURCE_CALCULATOR_CLASS,
DEFAULT_RESOURCE_CALCULATOR_CLASS,
ResourceCalculator.class),
this);
}
public boolean getUsePortForNodeName() {
return getBoolean(YarnConfiguration.RM_SCHEDULER_INCLUDE_PORT_IN_NODE_NAME,
YarnConfiguration.DEFAULT_RM_SCHEDULER_USE_PORT_FOR_NODE_NAME);
}
public void setResourceComparator(
Class<? extends ResourceCalculator> resourceCalculatorClass) {
setClass(
RESOURCE_CALCULATOR_CLASS,
resourceCalculatorClass,
ResourceCalculator.class);
}
public boolean getScheduleAynschronously() {
return getBoolean(SCHEDULE_ASYNCHRONOUSLY_ENABLE,
DEFAULT_SCHEDULE_ASYNCHRONOUSLY_ENABLE);
}
public void setScheduleAynschronously(boolean async) {
setBoolean(SCHEDULE_ASYNCHRONOUSLY_ENABLE, async);
}
public boolean getOverrideWithQueueMappings() {
return getBoolean(ENABLE_QUEUE_MAPPING_OVERRIDE,
DEFAULT_ENABLE_QUEUE_MAPPING_OVERRIDE);
}
@Private
@VisibleForTesting
public void setOverrideWithQueueMappings(boolean overrideWithQueueMappings) {
setBoolean(ENABLE_QUEUE_MAPPING_OVERRIDE, overrideWithQueueMappings);
}
public List<QueueMapping> getQueueMappingEntity(
String queueMappingSuffix) {
String queueMappingName = buildQueueMappingRuleProperty(queueMappingSuffix);
List<QueueMapping> mappings =
new ArrayList<QueueMapping>();
Collection<String> mappingsString =
getTrimmedStringCollection(queueMappingName);
for (String mappingValue : mappingsString) {
String[] mapping =
StringUtils.getTrimmedStringCollection(mappingValue, ":")
.toArray(new String[] {});
if (mapping.length != 2 || mapping[1].length() == 0) {
throw new IllegalArgumentException(
"Illegal queue mapping " + mappingValue);
}
//Mappings should be consistent, and have the parent path parsed
// from the beginning
QueueMapping m = QueueMapping.QueueMappingBuilder.create()
.type(QueueMapping.MappingType.APPLICATION)
.source(mapping[0])
.parsePathString(mapping[1])
.build();
mappings.add(m);
}
return mappings;
}
private String buildQueueMappingRuleProperty (String queueMappingSuffix) {
StringBuilder queueMapping = new StringBuilder();
queueMapping.append(YarnConfiguration.QUEUE_PLACEMENT_RULES)
.append(".").append(queueMappingSuffix);
return queueMapping.toString();
}
@VisibleForTesting
public void setQueueMappingEntities(List<QueueMapping> queueMappings,
String queueMappingSuffix) {
if (queueMappings == null) {
return;
}
List<String> queueMappingStrs = new ArrayList<>();
for (QueueMapping mapping : queueMappings) {
queueMappingStrs.add(mapping.toTypelessString());
}
String mappingRuleProp = buildQueueMappingRuleProperty(queueMappingSuffix);
setStrings(mappingRuleProp, StringUtils.join(",", queueMappingStrs));
}
public boolean getOverrideWithWorkflowPriorityMappings() {
return getBoolean(ENABLE_WORKFLOW_PRIORITY_MAPPINGS_OVERRIDE,
DEFAULT_ENABLE_WORKFLOW_PRIORITY_MAPPINGS_OVERRIDE);
}
public Collection<String> getWorkflowPriorityMappings() {
return getTrimmedStringCollection(WORKFLOW_PRIORITY_MAPPINGS);
}
/**
* Get user/group mappings to queues.
*
* @return user/groups mappings or null on illegal configs
*/
public List<QueueMapping> getQueueMappings() {
List<QueueMapping> mappings =
new ArrayList<QueueMapping>();
Collection<String> mappingsString =
getTrimmedStringCollection(QUEUE_MAPPING);
for (String mappingValue : mappingsString) {
String[] mapping =
StringUtils.getTrimmedStringCollection(mappingValue, ":")
.toArray(new String[] {});
if (mapping.length != 3 || mapping[1].length() == 0
|| mapping[2].length() == 0) {
throw new IllegalArgumentException(
"Illegal queue mapping " + mappingValue);
}
QueueMapping m;
try {
QueueMapping.MappingType mappingType;
if (mapping[0].equals("u")) {
mappingType = QueueMapping.MappingType.USER;
} else if (mapping[0].equals("g")) {
mappingType = QueueMapping.MappingType.GROUP;
} else {
throw new IllegalArgumentException(
"unknown mapping prefix " + mapping[0]);
}
//forcing the queue path to be split to parent and leafQueue, to make
//queue mapping parentPath and queueName consistent
m = QueueMappingBuilder.create()
.type(mappingType)
.source(mapping[1])
.parsePathString(mapping[2])
.build();
} catch (Throwable t) {
throw new IllegalArgumentException(
"Illegal queue mapping " + mappingValue);
}
if (m != null) {
mappings.add(m);
}
}
return mappings;
}
public List<MappingRule> parseLegacyMappingRules() {
List<MappingRule> mappings = new ArrayList<MappingRule>();
Collection<String> mappingsString =
getTrimmedStringCollection(QUEUE_MAPPING);
for (String mappingValue : mappingsString) {
String[] mapping =
StringUtils.getTrimmedStringCollection(mappingValue, ":")
.toArray(new String[] {});
if (mapping.length != 3 || mapping[1].length() == 0
|| mapping[2].length() == 0) {
throw new IllegalArgumentException(
"Illegal queue mapping " + mappingValue);
}
if (mapping[0].equals("u") || mapping[0].equals("g")) {
mappings.add(MappingRule.createLegacyRule(
mapping[0], mapping[1], mapping[2]));
} else {
throw new IllegalArgumentException(
"unknown mapping prefix " + mapping[0]);
}
}
mappingsString = getTrimmedStringCollection(QUEUE_MAPPING_NAME);
for (String mappingValue : mappingsString) {
String[] mapping =
StringUtils.getTrimmedStringCollection(mappingValue, ":")
.toArray(new String[] {});
if (mapping.length != 2 || mapping[1].length() == 0) {
throw new IllegalArgumentException(
"Illegal queue mapping " + mappingValue);
}
mappings.add(MappingRule.createLegacyRule(mapping[0], mapping[1]));
}
return mappings;
}
public List<MappingRule> parseJSONMappingRules() throws IOException {
String mappingJson = get(MAPPING_RULE_JSON, "");
String mappingJsonFile = get(MAPPING_RULE_JSON_FILE, "");
MappingRuleCreator creator = new MappingRuleCreator();
if (!mappingJson.equals("")) {
LOG.info("Reading mapping rules from provided inline JSON '{}'.",
mappingJson);
try {
return creator.getMappingRulesFromString(mappingJson);
} catch (IOException e) {
LOG.error("Error parsing mapping rule inline JSON.");
throw e;
}
} else if (!mappingJsonFile.equals("")) {
LOG.info("Reading mapping rules from JSON file '{}'.",
mappingJsonFile);
try {
return creator.getMappingRulesFromFile(mappingJsonFile.trim());
} catch (IOException e) {
LOG.error("Error reading or parsing mapping rule JSON file '{}'.",
mappingJsonFile);
throw e;
}
} else {
LOG.warn("Mapping rule is set to JSON, but no inline JSON nor a JSON " +
"file was provided! Starting with no mapping rules!");
}
return new ArrayList<>();
}
public List<MappingRule> getMappingRules() throws IOException {
String mappingFormat =
get(MAPPING_RULE_FORMAT, MAPPING_RULE_FORMAT_DEFAULT);
if (mappingFormat.equals(MAPPING_RULE_FORMAT_LEGACY)) {
return parseLegacyMappingRules();
} else if (mappingFormat.equals(MAPPING_RULE_FORMAT_JSON)) {
return parseJSONMappingRules();
} else {
throw new IllegalArgumentException(
"Illegal queue mapping format '" + mappingFormat + "' please use '" +
MAPPING_RULE_FORMAT_LEGACY + "' or '" + MAPPING_RULE_FORMAT_JSON +
"'");
}
}
@Private
@VisibleForTesting
public void setQueuePlacementRules(Collection<String> queuePlacementRules) {
if (queuePlacementRules == null) {
return;
}
String str = StringUtils.join(",", queuePlacementRules);
setStrings(YarnConfiguration.QUEUE_PLACEMENT_RULES, str);
}
@Private
@VisibleForTesting
public void setQueueMappings(List<QueueMapping> queueMappings) {
if (queueMappings == null) {
return;
}
List<String> queueMappingStrs = new ArrayList<>();
for (QueueMapping mapping : queueMappings) {
queueMappingStrs.add(mapping.toString());
}
setStrings(QUEUE_MAPPING, StringUtils.join(",", queueMappingStrs));
}
@Private
@VisibleForTesting
public void setAppNameMappings(List<QueueMapping> queueMappings) {
if (queueMappings == null) {
return;
}
List<String> queueMappingStrs = new ArrayList<>();
for (QueueMapping mapping : queueMappings) {
String rule = mapping.toString();
String[] parts = rule.split(":");
queueMappingStrs.add(parts[1] + ":" + parts[2]);
}
setStrings(QUEUE_MAPPING_NAME, StringUtils.join(",", queueMappingStrs));
}
@Private
@VisibleForTesting
void setWorkflowPriorityMappings(
List<WorkflowPriorityMapping> workflowPriorityMappings) {
setStrings(WORKFLOW_PRIORITY_MAPPINGS, WorkflowPriorityMappingsManager
.getWorkflowPriorityMappingStr(workflowPriorityMappings));
}
public boolean isReservable(String queue) {
boolean isReservable =
getBoolean(getQueuePrefix(queue) + IS_RESERVABLE, false);
return isReservable;
}
public void setReservable(String queue, boolean isReservable) {
setBoolean(getQueuePrefix(queue) + IS_RESERVABLE, isReservable);
LOG.debug("here setReservableQueue: queuePrefix={}, isReservableQueue={}",
getQueuePrefix(queue), isReservable(queue));
}
@Override
public long getReservationWindow(String queue) {
long reservationWindow =
getLong(getQueuePrefix(queue) + RESERVATION_WINDOW,
DEFAULT_RESERVATION_WINDOW);
return reservationWindow;
}
@Override
public float getAverageCapacity(String queue) {
float avgCapacity =
getFloat(getQueuePrefix(queue) + AVERAGE_CAPACITY,
MAXIMUM_CAPACITY_VALUE);
return avgCapacity;
}
@Override
public float getInstantaneousMaxCapacity(String queue) {
float instMaxCapacity =
getFloat(getQueuePrefix(queue) + INSTANTANEOUS_MAX_CAPACITY,
MAXIMUM_CAPACITY_VALUE);
return instMaxCapacity;
}
public void setInstantaneousMaxCapacity(String queue, float instMaxCapacity) {
setFloat(getQueuePrefix(queue) + INSTANTANEOUS_MAX_CAPACITY,
instMaxCapacity);
}
public void setReservationWindow(String queue, long reservationWindow) {
setLong(getQueuePrefix(queue) + RESERVATION_WINDOW, reservationWindow);
}
public void setAverageCapacity(String queue, float avgCapacity) {
setFloat(getQueuePrefix(queue) + AVERAGE_CAPACITY, avgCapacity);
}
@Override
public String getReservationAdmissionPolicy(String queue) {
String reservationPolicy =
get(getQueuePrefix(queue) + RESERVATION_ADMISSION_POLICY,
DEFAULT_RESERVATION_ADMISSION_POLICY);
return reservationPolicy;
}
public void setReservationAdmissionPolicy(String queue,
String reservationPolicy) {
set(getQueuePrefix(queue) + RESERVATION_ADMISSION_POLICY, reservationPolicy);
}
@Override
public String getReservationAgent(String queue) {
String reservationAgent =
get(getQueuePrefix(queue) + RESERVATION_AGENT_NAME,
DEFAULT_RESERVATION_AGENT_NAME);
return reservationAgent;
}
public void setReservationAgent(String queue, String reservationPolicy) {
set(getQueuePrefix(queue) + RESERVATION_AGENT_NAME, reservationPolicy);
}
@Override
public boolean getShowReservationAsQueues(String queuePath) {
boolean showReservationAsQueues =
getBoolean(getQueuePrefix(queuePath)
+ RESERVATION_SHOW_RESERVATION_AS_QUEUE,
DEFAULT_SHOW_RESERVATIONS_AS_QUEUES);
return showReservationAsQueues;
}
@Override
public String getReplanner(String queue) {
String replanner =
get(getQueuePrefix(queue) + RESERVATION_PLANNER_NAME,
DEFAULT_RESERVATION_PLANNER_NAME);
return replanner;
}
@Override
public boolean getMoveOnExpiry(String queue) {
boolean killOnExpiry =
getBoolean(getQueuePrefix(queue) + RESERVATION_MOVE_ON_EXPIRY,
DEFAULT_RESERVATION_MOVE_ON_EXPIRY);
return killOnExpiry;
}
@Override
public long getEnforcementWindow(String queue) {
long enforcementWindow =
getLong(getQueuePrefix(queue) + RESERVATION_ENFORCEMENT_WINDOW,
DEFAULT_RESERVATION_ENFORCEMENT_WINDOW);
return enforcementWindow;
}
/**
* Sets the <em>disable_preemption</em> property in order to indicate
* whether or not container preemption will be disabled for the specified
* queue.
*
* @param queue queue path
* @param preemptionDisabled true if preemption is disabled on queue
*/
public void setPreemptionDisabled(String queue, boolean preemptionDisabled) {
setBoolean(getQueuePrefix(queue) + QUEUE_PREEMPTION_DISABLED,
preemptionDisabled);
}
/**
* Indicates whether preemption is disabled on the specified queue.
*
* @param queue queue path to query
* @param defaultVal used as default if the <em>disable_preemption</em>
* is not set in the configuration
* @return true if preemption is disabled on <em>queue</em>, false otherwise
*/
public boolean getPreemptionDisabled(String queue, boolean defaultVal) {
boolean preemptionDisabled =
getBoolean(getQueuePrefix(queue) + QUEUE_PREEMPTION_DISABLED,
defaultVal);
return preemptionDisabled;
}
/**
* Indicates whether intra-queue preemption is disabled on the specified queue
*
* @param queue queue path to query
* @param defaultVal used as default if the property is not set in the
* configuration
* @return true if preemption is disabled on queue, false otherwise
*/
public boolean getIntraQueuePreemptionDisabled(String queue,
boolean defaultVal) {
return
getBoolean(getQueuePrefix(queue) + INTRA_QUEUE_PREEMPTION_CONFIG_PREFIX
+ QUEUE_PREEMPTION_DISABLED, defaultVal);
}
/**
* Get configured node labels in a given queuePath
*/
public Set<String> getConfiguredNodeLabels(String queuePath) {
Set<String> configuredNodeLabels = new HashSet<String>();
Entry<String, String> e = null;
Iterator<Entry<String, String>> iter = iterator();
while (iter.hasNext()) {
e = iter.next();
String key = e.getKey();
if (key.startsWith(getQueuePrefix(queuePath) + ACCESSIBLE_NODE_LABELS
+ DOT)) {
// Find <label-name> in
// <queue-path>.accessible-node-labels.<label-name>.property
int labelStartIdx =
key.indexOf(ACCESSIBLE_NODE_LABELS)
+ ACCESSIBLE_NODE_LABELS.length() + 1;
int labelEndIndx = key.indexOf('.', labelStartIdx);
String labelName = key.substring(labelStartIdx, labelEndIndx);
configuredNodeLabels.add(labelName);
}
}
// always add NO_LABEL
configuredNodeLabels.add(RMNodeLabelsManager.NO_LABEL);
return configuredNodeLabels;
}
public Integer getDefaultApplicationPriorityConfPerQueue(String queue) {
Integer defaultPriority = getInt(getQueuePrefix(queue)
+ DEFAULT_APPLICATION_PRIORITY,
DEFAULT_CONFIGURATION_APPLICATION_PRIORITY);
return defaultPriority;
}
@VisibleForTesting
public void setOrderingPolicy(String queue, String policy) {
set(getQueuePrefix(queue) + ORDERING_POLICY, policy);
}
@VisibleForTesting
public void setOrderingPolicyParameter(String queue,
String parameterKey, String parameterValue) {
set(getQueuePrefix(queue) + ORDERING_POLICY + "." + parameterKey,
parameterValue);
}
public boolean getLazyPreemptionEnabled() {
return getBoolean(LAZY_PREEMPTION_ENABLED, DEFAULT_LAZY_PREEMPTION_ENABLED);
}
public boolean shouldAppFailFast(Configuration conf) {
return conf.getBoolean(APP_FAIL_FAST, DEFAULT_APP_FAIL_FAST);
}
public Integer getMaxParallelAppsForQueue(String queue) {
int defaultMaxParallelAppsForQueue =
getInt(PREFIX + MAX_PARALLEL_APPLICATIONS,
DEFAULT_MAX_PARALLEL_APPLICATIONS);
String maxParallelAppsForQueue = get(getQueuePrefix(queue)
+ MAX_PARALLEL_APPLICATIONS);
return (maxParallelAppsForQueue != null) ?
Integer.parseInt(maxParallelAppsForQueue)
: defaultMaxParallelAppsForQueue;
}
public Integer getMaxParallelAppsForUser(String user) {
int defaultMaxParallelAppsForUser =
getInt(PREFIX + "user." + MAX_PARALLEL_APPLICATIONS,
DEFAULT_MAX_PARALLEL_APPLICATIONS);
String maxParallelAppsForUser = get(getUserPrefix(user)
+ MAX_PARALLEL_APPLICATIONS);
return (maxParallelAppsForUser != null) ?
Integer.parseInt(maxParallelAppsForUser)
: defaultMaxParallelAppsForUser;
}
public boolean getAllowZeroCapacitySum(String queue) {
return getBoolean(getQueuePrefix(queue)
+ ALLOW_ZERO_CAPACITY_SUM, DEFAULT_ALLOW_ZERO_CAPACITY_SUM);
}
public void setAllowZeroCapacitySum(String queue, boolean value) {
setBoolean(getQueuePrefix(queue)
+ ALLOW_ZERO_CAPACITY_SUM, value);
}
private static final String PREEMPTION_CONFIG_PREFIX =
"yarn.resourcemanager.monitor.capacity.preemption.";
private static final String INTRA_QUEUE_PREEMPTION_CONFIG_PREFIX =
"intra-queue-preemption.";
/** If true, run the policy but do not affect the cluster with preemption and
* kill events. */
public static final String PREEMPTION_OBSERVE_ONLY =
PREEMPTION_CONFIG_PREFIX + "observe_only";
public static final boolean DEFAULT_PREEMPTION_OBSERVE_ONLY = false;
/** Time in milliseconds between invocations of this policy */
public static final String PREEMPTION_MONITORING_INTERVAL =
PREEMPTION_CONFIG_PREFIX + "monitoring_interval";
public static final long DEFAULT_PREEMPTION_MONITORING_INTERVAL = 3000L;
/** Time in milliseconds between requesting a preemption from an application
* and killing the container. */
public static final String PREEMPTION_WAIT_TIME_BEFORE_KILL =
PREEMPTION_CONFIG_PREFIX + "max_wait_before_kill";
public static final long DEFAULT_PREEMPTION_WAIT_TIME_BEFORE_KILL = 15000L;
/** Maximum percentage of resources preemptionCandidates in a single round. By
* controlling this value one can throttle the pace at which containers are
* reclaimed from the cluster. After computing the total desired preemption,
* the policy scales it back within this limit. */
public static final String TOTAL_PREEMPTION_PER_ROUND =
PREEMPTION_CONFIG_PREFIX + "total_preemption_per_round";
public static final float DEFAULT_TOTAL_PREEMPTION_PER_ROUND = 0.1f;
/** Maximum amount of resources above the target capacity ignored for
* preemption. This defines a deadzone around the target capacity that helps
* prevent thrashing and oscillations around the computed target balance.
* High values would slow the time to capacity and (absent natural
* completions) it might prevent convergence to guaranteed capacity. */
public static final String PREEMPTION_MAX_IGNORED_OVER_CAPACITY =
PREEMPTION_CONFIG_PREFIX + "max_ignored_over_capacity";
public static final double DEFAULT_PREEMPTION_MAX_IGNORED_OVER_CAPACITY = 0.1;
/**
* Given a computed preemption target, account for containers naturally
* expiring and preempt only this percentage of the delta. This determines
* the rate of geometric convergence into the deadzone ({@link
* #PREEMPTION_MAX_IGNORED_OVER_CAPACITY}). For example, a termination factor of 0.5
* will reclaim almost 95% of resources within 5 * {@link
* #PREEMPTION_WAIT_TIME_BEFORE_KILL}, even absent natural termination. */
public static final String PREEMPTION_NATURAL_TERMINATION_FACTOR =
PREEMPTION_CONFIG_PREFIX + "natural_termination_factor";
public static final double DEFAULT_PREEMPTION_NATURAL_TERMINATION_FACTOR =
0.2;
/**
* By default, reserved resource will be excluded while balancing capacities
* of queues.
*
* Why doing this? In YARN-4390, we added preemption-based-on-reserved-container
* Support. To reduce unnecessary preemption for large containers. We will
* not include reserved resources while calculating ideal-allocation in
* FifoCandidatesSelector.
*
* Changes in YARN-4390 will significantly reduce number of containers preempted
* When cluster has heterogeneous container requests. (Please check test
* report: https://issues.apache.org/jira/secure/attachment/12796197/YARN-4390-test-results.pdf
*
* However, on the other hand, in some corner cases, especially for
* fragmented cluster. It could lead to preemption cannot kick in in some
* cases. Please see YARN-5731.
*
* So to solve the problem, make this change to be configurable, and please
* note that it is an experimental option.
*/
public static final String
ADDITIONAL_RESOURCE_BALANCE_BASED_ON_RESERVED_CONTAINERS =
PREEMPTION_CONFIG_PREFIX
+ "additional_res_balance_based_on_reserved_containers";
public static final boolean
DEFAULT_ADDITIONAL_RESOURCE_BALANCE_BASED_ON_RESERVED_CONTAINERS = false;
/**
* When calculating which containers to be preempted, we will try to preempt
* containers for reserved containers first. By default is false.
*/
public static final String PREEMPTION_SELECT_CANDIDATES_FOR_RESERVED_CONTAINERS =
PREEMPTION_CONFIG_PREFIX + "select_based_on_reserved_containers";
public static final boolean DEFAULT_PREEMPTION_SELECT_CANDIDATES_FOR_RESERVED_CONTAINERS =
false;
/**
* For intra-queue preemption, priority/user-limit/fairness based selectors
* can help to preempt containers.
*/
public static final String INTRAQUEUE_PREEMPTION_ENABLED =
PREEMPTION_CONFIG_PREFIX +
INTRA_QUEUE_PREEMPTION_CONFIG_PREFIX + "enabled";
public static final boolean DEFAULT_INTRAQUEUE_PREEMPTION_ENABLED = false;
/**
* For intra-queue preemption, consider those queues which are above used cap
* limit.
*/
public static final String INTRAQUEUE_PREEMPTION_MINIMUM_THRESHOLD =
PREEMPTION_CONFIG_PREFIX +
INTRA_QUEUE_PREEMPTION_CONFIG_PREFIX + "minimum-threshold";
public static final float DEFAULT_INTRAQUEUE_PREEMPTION_MINIMUM_THRESHOLD =
0.5f;
/**
* For intra-queue preemption, allowable maximum-preemptable limit per queue.
*/
public static final String INTRAQUEUE_PREEMPTION_MAX_ALLOWABLE_LIMIT =
PREEMPTION_CONFIG_PREFIX +
INTRA_QUEUE_PREEMPTION_CONFIG_PREFIX + "max-allowable-limit";
public static final float DEFAULT_INTRAQUEUE_PREEMPTION_MAX_ALLOWABLE_LIMIT =
0.2f;
/**
* For intra-queue preemption, enforce a preemption order such as
* "userlimit_first" or "priority_first".
*/
public static final String INTRAQUEUE_PREEMPTION_ORDER_POLICY = PREEMPTION_CONFIG_PREFIX
+ INTRA_QUEUE_PREEMPTION_CONFIG_PREFIX + "preemption-order-policy";
public static final String DEFAULT_INTRAQUEUE_PREEMPTION_ORDER_POLICY = "userlimit_first";
/**
* Flag to determine whether or not to preempt containers from apps where some
* used resources are less than the user's user limit.
*/
public static final String CROSS_QUEUE_PREEMPTION_CONSERVATIVE_DRF =
PREEMPTION_CONFIG_PREFIX + "conservative-drf";
public static final Boolean DEFAULT_CROSS_QUEUE_PREEMPTION_CONSERVATIVE_DRF =
false;
public static final String IN_QUEUE_PREEMPTION_CONSERVATIVE_DRF =
PREEMPTION_CONFIG_PREFIX + INTRA_QUEUE_PREEMPTION_CONFIG_PREFIX +
"conservative-drf";
public static final Boolean DEFAULT_IN_QUEUE_PREEMPTION_CONSERVATIVE_DRF =
true;
/**
* Should we allow queues continue grow after all queue reaches their
* guaranteed capacity.
*/
public static final String PREEMPTION_TO_BALANCE_QUEUES_BEYOND_GUARANTEED =
PREEMPTION_CONFIG_PREFIX + "preemption-to-balance-queue-after-satisfied.enabled";
public static final boolean DEFAULT_PREEMPTION_TO_BALANCE_QUEUES_BEYOND_GUARANTEED = false;
/**
* How long we will wait to balance queues, by default it is 5 mins.
*/
public static final String MAX_WAIT_BEFORE_KILL_FOR_QUEUE_BALANCE_PREEMPTION =
PREEMPTION_CONFIG_PREFIX + "preemption-to-balance-queue-after-satisfied.max-wait-before-kill";
public static final long
DEFAULT_MAX_WAIT_BEFORE_KILL_FOR_QUEUE_BALANCE_PREEMPTION =
300 * 1000;
/**
* Maximum application for a queue to be used when application per queue is
* not defined.To be consistent with previous version the default value is set
* as UNDEFINED.
*/
@Private
public static final String QUEUE_GLOBAL_MAX_APPLICATION =
PREFIX + "global-queue-max-application";
public int getGlobalMaximumApplicationsPerQueue() {
int maxApplicationsPerQueue =
getInt(QUEUE_GLOBAL_MAX_APPLICATION, (int) UNDEFINED);
return maxApplicationsPerQueue;
}
public void setGlobalMaximumApplicationsPerQueue(int val) {
setInt(QUEUE_GLOBAL_MAX_APPLICATION, val);
}
/**
* Ordering policy inside a parent queue to sort queues
*/
/**
* Less relative usage queue can get next resource, this is default
*/
public static final String QUEUE_UTILIZATION_ORDERING_POLICY = "utilization";
/**
* Combination of relative usage and priority
*/
public static final String QUEUE_PRIORITY_UTILIZATION_ORDERING_POLICY =
"priority-utilization";
public static final String DEFAULT_QUEUE_ORDERING_POLICY =
QUEUE_UTILIZATION_ORDERING_POLICY;
@Private
public void setQueueOrderingPolicy(String queue, String policy) {
set(getQueuePrefix(queue) + ORDERING_POLICY, policy);
}
@Private
public QueueOrderingPolicy getQueueOrderingPolicy(String queue,
String parentPolicy) {
String defaultPolicy = parentPolicy;
if (null == defaultPolicy) {
defaultPolicy = DEFAULT_QUEUE_ORDERING_POLICY;
}
String policyType = get(getQueuePrefix(queue) + ORDERING_POLICY,
defaultPolicy).trim();
QueueOrderingPolicy qop;
if (policyType.equals(QUEUE_UTILIZATION_ORDERING_POLICY)) {
// Doesn't respect priority
qop = new PriorityUtilizationQueueOrderingPolicy(false);
} else if (policyType.equals(
QUEUE_PRIORITY_UTILIZATION_ORDERING_POLICY)) {
qop = new PriorityUtilizationQueueOrderingPolicy(true);
} else {
try {
qop = (QueueOrderingPolicy) Class.forName(policyType).newInstance();
} catch (Exception e) {
String message = "Unable to construct queue ordering policy="
+ policyType + " queue=" + queue;
throw new YarnRuntimeException(message, e);
}
}
return qop;
}
/*
* Get global configuration for ordering policies
*/
private String getOrderingPolicyGlobalConfigKey(String orderPolicyName,
String configKey) {
return PREFIX + ORDERING_POLICY + DOT + orderPolicyName + DOT + configKey;
}
/**
* Global configurations of queue-priority-utilization ordering policy
*/
private static final String UNDER_UTILIZED_PREEMPTION_ENABLED =
"underutilized-preemption.enabled";
/**
* Do we allow under-utilized queue with higher priority to preempt queue
* with lower priority *even if queue with lower priority is not satisfied*.
*
* For example, two queues, a and b
* a.priority = 1, (a.used-capacity - a.reserved-capacity) = 40%
* b.priority = 0, b.used-capacity = 30%
*
* Set this configuration to true to allow queue-a to preempt container from
* queue-b.
*
* (The reason why deduct reserved-capacity from used-capacity for queue with
* higher priority is: the reserved-capacity is just scheduler's internal
* implementation to allocate large containers, it is not possible for
* application to use such reserved-capacity. It is possible that a queue with
* large container requests have a large number of containers but cannot
* allocate from any of them. But scheduler will make sure a satisfied queue
* will not preempt resource from any other queues. A queue is considered to
* be satisfied when queue's used-capacity - reserved-capacity ≥
* guaranteed-capacity.)
*
* @return allowed or not
*/
public boolean getPUOrderingPolicyUnderUtilizedPreemptionEnabled() {
return getBoolean(getOrderingPolicyGlobalConfigKey(
QUEUE_PRIORITY_UTILIZATION_ORDERING_POLICY,
UNDER_UTILIZED_PREEMPTION_ENABLED), false);
}
@VisibleForTesting
public void setPUOrderingPolicyUnderUtilizedPreemptionEnabled(
boolean enabled) {
setBoolean(getOrderingPolicyGlobalConfigKey(
QUEUE_PRIORITY_UTILIZATION_ORDERING_POLICY,
UNDER_UTILIZED_PREEMPTION_ENABLED), enabled);
}
private static final String UNDER_UTILIZED_PREEMPTION_DELAY =
"underutilized-preemption.reserved-container-delay-ms";
/**
* When a reserved container of an underutilized queue is created. Preemption
* will kick in after specified delay (in ms).
*
* The total time to preempt resources for a reserved container from higher
* priority queue will be: reserved-container-delay-ms +
* {@link CapacitySchedulerConfiguration#PREEMPTION_WAIT_TIME_BEFORE_KILL}.
*
* This parameter is added to make preemption from lower priority queue which
* is underutilized to be more careful. This parameter takes effect when
* underutilized-preemption.enabled set to true.
*
* @return delay
*/
public long getPUOrderingPolicyUnderUtilizedPreemptionDelay() {
return getLong(getOrderingPolicyGlobalConfigKey(
QUEUE_PRIORITY_UTILIZATION_ORDERING_POLICY,
UNDER_UTILIZED_PREEMPTION_DELAY), 60000L);
}
@VisibleForTesting
public void setPUOrderingPolicyUnderUtilizedPreemptionDelay(
long timeout) {
setLong(getOrderingPolicyGlobalConfigKey(
QUEUE_PRIORITY_UTILIZATION_ORDERING_POLICY,
UNDER_UTILIZED_PREEMPTION_DELAY), timeout);
}
private static final String UNDER_UTILIZED_PREEMPTION_MOVE_RESERVATION =
"underutilized-preemption.allow-move-reservation";
/**
* When doing preemption from under-satisfied queues for priority queue.
* Do we allow move reserved container from one host to another?
*
* @return allow or not
*/
public boolean getPUOrderingPolicyUnderUtilizedPreemptionMoveReservation() {
return getBoolean(getOrderingPolicyGlobalConfigKey(
QUEUE_PRIORITY_UTILIZATION_ORDERING_POLICY,
UNDER_UTILIZED_PREEMPTION_MOVE_RESERVATION), false);
}
@VisibleForTesting
public void setPUOrderingPolicyUnderUtilizedPreemptionMoveReservation(
boolean allowMoveReservation) {
setBoolean(getOrderingPolicyGlobalConfigKey(
QUEUE_PRIORITY_UTILIZATION_ORDERING_POLICY,
UNDER_UTILIZED_PREEMPTION_MOVE_RESERVATION), allowMoveReservation);
}
/**
* Get the weights of all users at this queue level from the configuration.
* Used in computing user-specific user limit, relative to other users.
* @param queuePath full queue path
* @return map of user weights, if they exists. Otherwise, return empty map.
*/
public Map<String, Float> getAllUserWeightsForQueue(String queuePath) {
Map <String, Float> userWeights = new HashMap <String, Float>();
String qPathPlusPrefix =
getQueuePrefix(queuePath).replaceAll("\\.", "\\\\.")
+ USER_SETTINGS + "\\.";
String weightKeyRegex =
qPathPlusPrefix + "\\w+\\." + USER_WEIGHT;
Map<String, String> props = getValByRegex(weightKeyRegex);
for (Entry<String, String> e : props.entrySet()) {
String userName =
e.getKey().replaceFirst(qPathPlusPrefix, "")
.replaceFirst("\\." + USER_WEIGHT, "");
if (userName != null && !userName.isEmpty()) {
userWeights.put(userName, new Float(e.getValue()));
}
}
return userWeights;
}
public boolean getAssignMultipleEnabled() {
return getBoolean(ASSIGN_MULTIPLE_ENABLED, DEFAULT_ASSIGN_MULTIPLE_ENABLED);
}
public int getMaxAssignPerHeartbeat() {
return getInt(MAX_ASSIGN_PER_HEARTBEAT, DEFAULT_MAX_ASSIGN_PER_HEARTBEAT);
}
public static final String MAXIMUM_LIFETIME_SUFFIX =
"maximum-application-lifetime";
public static final String DEFAULT_LIFETIME_SUFFIX =
"default-application-lifetime";
public long getMaximumLifetimePerQueue(String queue) {
long maximumLifetimePerQueue = getLong(
getQueuePrefix(queue) + MAXIMUM_LIFETIME_SUFFIX, (long) UNDEFINED);
return maximumLifetimePerQueue;
}
public void setMaximumLifetimePerQueue(String queue, long maximumLifetime) {
setLong(getQueuePrefix(queue) + MAXIMUM_LIFETIME_SUFFIX, maximumLifetime);
}
public long getDefaultLifetimePerQueue(String queue) {
long maximumLifetimePerQueue = getLong(
getQueuePrefix(queue) + DEFAULT_LIFETIME_SUFFIX, (long) UNDEFINED);
return maximumLifetimePerQueue;
}
public void setDefaultLifetimePerQueue(String queue, long defaultLifetime) {
setLong(getQueuePrefix(queue) + DEFAULT_LIFETIME_SUFFIX, defaultLifetime);
}
@Private
public static final boolean DEFAULT_AUTO_CREATE_CHILD_QUEUE_ENABLED = false;
@Private
private static final String AUTO_CREATE_CHILD_QUEUE_PREFIX =
"auto-create-child-queue.";
@Private
public static final String AUTO_CREATE_CHILD_QUEUE_ENABLED =
AUTO_CREATE_CHILD_QUEUE_PREFIX + "enabled";
@Private
private static final String AUTO_QUEUE_CREATION_V2_PREFIX =
"auto-queue-creation-v2.";
@Private
public static final String AUTO_QUEUE_CREATION_V2_ENABLED =
AUTO_QUEUE_CREATION_V2_PREFIX + "enabled";
@Private
public static final String AUTO_QUEUE_CREATION_V2_MAX_QUEUES =
AUTO_QUEUE_CREATION_V2_PREFIX + "max-queues";
@Private
public static final int
DEFAULT_AUTO_QUEUE_CREATION_V2_MAX_QUEUES = 1000;
@Private
public static final boolean DEFAULT_AUTO_QUEUE_CREATION_ENABLED = false;
@Private
public static final String AUTO_CREATED_LEAF_QUEUE_TEMPLATE_PREFIX =
"leaf-queue-template";
@Private
public static final String AUTO_CREATE_QUEUE_MAX_QUEUES =
"auto-create-child-queue.max-queues";
@Private
public static final int DEFAULT_AUTO_CREATE_QUEUE_MAX_QUEUES = 1000;
/**
* If true, this queue will be created as a Parent Queue which Auto Created
* leaf child queues
*
* @param queuePath The queues path
* @return true if auto create is enabled for child queues else false. Default
* is false
*/
@Private
public boolean isAutoCreateChildQueueEnabled(String queuePath) {
boolean isAutoCreateEnabled = getBoolean(
getQueuePrefix(queuePath) + AUTO_CREATE_CHILD_QUEUE_ENABLED,
DEFAULT_AUTO_CREATE_CHILD_QUEUE_ENABLED);
return isAutoCreateEnabled;
}
@Private
@VisibleForTesting
public void setAutoCreateChildQueueEnabled(String queuePath,
boolean autoCreationEnabled) {
setBoolean(getQueuePrefix(queuePath) +
AUTO_CREATE_CHILD_QUEUE_ENABLED,
autoCreationEnabled);
}
public void setAutoQueueCreationV2Enabled(String queuePath,
boolean autoQueueCreation) {
setBoolean(
getQueuePrefix(queuePath) + AUTO_QUEUE_CREATION_V2_ENABLED,
autoQueueCreation);
}
public boolean isAutoQueueCreationV2Enabled(String queuePath) {
boolean isAutoQueueCreation = getBoolean(
getQueuePrefix(queuePath) + AUTO_QUEUE_CREATION_V2_ENABLED,
DEFAULT_AUTO_QUEUE_CREATION_ENABLED);
return isAutoQueueCreation;
}
/**
* Get the auto created leaf queue's template configuration prefix
* Leaf queue's template capacities are configured at the parent queue
*
* @param queuePath parent queue's path
* @return Config prefix for leaf queue template configurations
*/
@Private
public String getAutoCreatedQueueTemplateConfPrefix(String queuePath) {
return queuePath + DOT + AUTO_CREATED_LEAF_QUEUE_TEMPLATE_PREFIX;
}
@Private
public static final String FAIL_AUTO_CREATION_ON_EXCEEDING_CAPACITY =
"auto-create-child-queue.fail-on-exceeding-parent-capacity";
@Private
public static final boolean DEFAULT_FAIL_AUTO_CREATION_ON_EXCEEDING_CAPACITY =
false;
/**
* Fail further auto leaf queue creation when parent's guaranteed capacity is
* exceeded.
*
* @param queuePath the parent queue's path
* @return true if configured to fail else false
*/
@Private
public boolean getShouldFailAutoQueueCreationWhenGuaranteedCapacityExceeded(
String queuePath) {
boolean shouldFailAutoQueueCreationOnExceedingGuaranteedCapacity =
getBoolean(getQueuePrefix(queuePath)
+ FAIL_AUTO_CREATION_ON_EXCEEDING_CAPACITY,
DEFAULT_FAIL_AUTO_CREATION_ON_EXCEEDING_CAPACITY);
return shouldFailAutoQueueCreationOnExceedingGuaranteedCapacity;
}
@VisibleForTesting
@Private
public void setShouldFailAutoQueueCreationWhenGuaranteedCapacityExceeded(
String queuePath, boolean autoCreationEnabled) {
setBoolean(
getQueuePrefix(queuePath) +
FAIL_AUTO_CREATION_ON_EXCEEDING_CAPACITY,
autoCreationEnabled);
}
/**
* Get the max number of leaf queues that are allowed to be created under
* a parent queue
*
* @param queuePath the paret queue's path
* @return the max number of leaf queues allowed to be auto created
*/
@Private
public int getAutoCreatedQueuesMaxChildQueuesLimit(String queuePath) {
return getInt(getQueuePrefix(queuePath) +
AUTO_CREATE_QUEUE_MAX_QUEUES,
DEFAULT_AUTO_CREATE_QUEUE_MAX_QUEUES);
}
/**
* Get the max number of queues that are allowed to be created under
* a parent queue which allowed auto creation v2.
*
* @param queuePath the parent queue's path
* @return the max number of queues allowed to be auto created,
* in new auto created.
*/
@Private
public int getAutoCreatedQueuesV2MaxChildQueuesLimit(String queuePath) {
return getInt(getQueuePrefix(queuePath) +
AUTO_QUEUE_CREATION_V2_MAX_QUEUES,
DEFAULT_AUTO_QUEUE_CREATION_V2_MAX_QUEUES);
}
@VisibleForTesting
public void setAutoCreatedQueuesV2MaxChildQueuesLimit(String queuePath,
int maxQueues) {
setInt(getQueuePrefix(queuePath) +
AUTO_QUEUE_CREATION_V2_MAX_QUEUES, maxQueues);
}
@Private
public static final String AUTO_CREATED_QUEUE_MANAGEMENT_POLICY =
AUTO_CREATE_CHILD_QUEUE_PREFIX + "management-policy";
@Private
public static final String DEFAULT_AUTO_CREATED_QUEUE_MANAGEMENT_POLICY =
"org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity"
+ ".queuemanagement."
+ "GuaranteedOrZeroCapacityOverTimePolicy";
@Private
private static final String QUEUE_MANAGEMENT_CONFIG_PREFIX =
"yarn.resourcemanager.monitor.capacity.queue-management.";
/**
* Time in milliseconds between invocations of this policy
*/
@Private
public static final String QUEUE_MANAGEMENT_MONITORING_INTERVAL =
QUEUE_MANAGEMENT_CONFIG_PREFIX + "monitoring-interval";
@Private
public static final long DEFAULT_QUEUE_MANAGEMENT_MONITORING_INTERVAL =
1500L;
/**
* Time in milliseconds between invocations
* of QueueConfigurationAutoRefreshPolicy.
*/
@Private
public static final String QUEUE_AUTO_REFRESH_MONITORING_INTERVAL =
PREFIX + "queue.auto.refresh.monitoring-interval";
@Private
public static final long DEFAULT_QUEUE_AUTO_REFRESH_MONITORING_INTERVAL =
5000L;
/**
* Queue Management computation policy for Auto Created queues
* @param queue The queue's path
* @return Configured policy class name
*/
@Private
public String getAutoCreatedQueueManagementPolicy(String queue) {
String autoCreatedQueueManagementPolicy =
get(getQueuePrefix(queue) + AUTO_CREATED_QUEUE_MANAGEMENT_POLICY,
DEFAULT_AUTO_CREATED_QUEUE_MANAGEMENT_POLICY);
return autoCreatedQueueManagementPolicy;
}
/**
* Get The policy class configured to manage capacities for auto created leaf
* queues under the specified parent
*
* @param queueName The parent queue's name
* @return The policy class configured to manage capacities for auto created
* leaf queues under the specified parent queue
*/
@Private
protected AutoCreatedQueueManagementPolicy
getAutoCreatedQueueManagementPolicyClass(
String queueName) {
String queueManagementPolicyClassName =
getAutoCreatedQueueManagementPolicy(queueName);
LOG.info("Using Auto Created Queue Management Policy: "
+ queueManagementPolicyClassName + " for queue: " + queueName);
try {
Class<?> queueManagementPolicyClazz = getClassByName(
queueManagementPolicyClassName);
if (AutoCreatedQueueManagementPolicy.class.isAssignableFrom(
queueManagementPolicyClazz)) {
return (AutoCreatedQueueManagementPolicy) ReflectionUtils.newInstance(
queueManagementPolicyClazz, this);
} else{
throw new YarnRuntimeException(
"Class: " + queueManagementPolicyClassName + " not instance of "
+ AutoCreatedQueueManagementPolicy.class.getCanonicalName());
}
} catch (ClassNotFoundException e) {
throw new YarnRuntimeException(
"Could not instantiate " + "AutoCreatedQueueManagementPolicy: "
+ queueManagementPolicyClassName + " for queue: " + queueName,
e);
}
}
@VisibleForTesting
@Private
public void setAutoCreatedLeafQueueConfigCapacity(String queuePath,
float val) {
String leafQueueConfPrefix = getAutoCreatedQueueTemplateConfPrefix(
queuePath);
setCapacity(leafQueueConfPrefix, val);
}
@VisibleForTesting
@Private
public void setAutoCreatedLeafQueueTemplateCapacityByLabel(String queuePath,
String label, float val) {
String leafQueueConfPrefix =
getAutoCreatedQueueTemplateConfPrefix(queuePath);
setCapacityByLabel(leafQueueConfPrefix, label, val);
}
@VisibleForTesting
@Private
public void setAutoCreatedLeafQueueTemplateCapacityByLabel(String queuePath,
String label, Resource resource) {
String leafQueueConfPrefix =
getAutoCreatedQueueTemplateConfPrefix(queuePath);
StringBuilder resourceString = new StringBuilder();
resourceString
.append("[" + AbsoluteResourceType.MEMORY.toString().toLowerCase() + "="
+ resource.getMemorySize() + ","
+ AbsoluteResourceType.VCORES.toString().toLowerCase() + "="
+ resource.getVirtualCores() + "]");
setCapacityByLabel(leafQueueConfPrefix, label, resourceString.toString());
}
@Private
@VisibleForTesting
public void setAutoCreatedLeafQueueConfigMaxCapacity(String queuePath,
float val) {
String leafQueueConfPrefix = getAutoCreatedQueueTemplateConfPrefix(
queuePath);
setMaximumCapacity(leafQueueConfPrefix, val);
}
@Private
@VisibleForTesting
public void setAutoCreatedLeafQueueTemplateMaxCapacity(String queuePath,
String label, float val) {
String leafQueueConfPrefix = getAutoCreatedQueueTemplateConfPrefix(
queuePath);
setMaximumCapacityByLabel(leafQueueConfPrefix, label, val);
}
@Private
@VisibleForTesting
public void setAutoCreatedLeafQueueTemplateMaxCapacity(String queuePath,
String label, Resource resource) {
String leafQueueConfPrefix = getAutoCreatedQueueTemplateConfPrefix(
queuePath);
StringBuilder resourceString = new StringBuilder();
resourceString
.append("[" + AbsoluteResourceType.MEMORY.toString().toLowerCase() + "="
+ resource.getMemorySize() + ","
+ AbsoluteResourceType.VCORES.toString().toLowerCase() + "="
+ resource.getVirtualCores() + "]");
setMaximumCapacityByLabel(leafQueueConfPrefix, label, resourceString.toString());
}
@VisibleForTesting
@Private
public void setAutoCreatedLeafQueueConfigUserLimit(String queuePath,
int val) {
String leafQueueConfPrefix = getAutoCreatedQueueTemplateConfPrefix(
queuePath);
setUserLimit(leafQueueConfPrefix, val);
}
@VisibleForTesting
@Private
public void setAutoCreatedLeafQueueConfigUserLimitFactor(String queuePath,
float val) {
String leafQueueConfPrefix = getAutoCreatedQueueTemplateConfPrefix(
queuePath);
setUserLimitFactor(leafQueueConfPrefix, val);
}
@Private
@VisibleForTesting
public void setAutoCreatedLeafQueueConfigDefaultNodeLabelExpression(String
queuePath,
String expression) {
String leafQueueConfPrefix = getAutoCreatedQueueTemplateConfPrefix(
queuePath);
setDefaultNodeLabelExpression(leafQueueConfPrefix, expression);
}
public static String getUnits(String resourceValue) {
String units;
for (int i = 0; i < resourceValue.length(); i++) {
if (Character.isAlphabetic(resourceValue.charAt(i))) {
units = resourceValue.substring(i);
if (StringUtils.isAlpha(units)) {
return units;
}
}
}
return "";
}
/**
* Get absolute minimum resource requirement for a queue.
*
* @param label
* NodeLabel
* @param queue
* queue path
* @param resourceTypes
* Resource types
* @return ResourceInformation
*/
public Resource getMinimumResourceRequirement(String label, String queue,
Set<String> resourceTypes) {
return internalGetLabeledResourceRequirementForQueue(queue, label,
resourceTypes, CAPACITY);
}
/**
* Get absolute maximum resource requirement for a queue.
*
* @param label
* NodeLabel
* @param queue
* queue path
* @param resourceTypes
* Resource types
* @return Resource
*/
public Resource getMaximumResourceRequirement(String label, String queue,
Set<String> resourceTypes) {
return internalGetLabeledResourceRequirementForQueue(queue, label,
resourceTypes, MAXIMUM_CAPACITY);
}
@VisibleForTesting
public void setMinimumResourceRequirement(String label, String queue,
Resource resource) {
updateMinMaxResourceToConf(label, queue, resource, CAPACITY);
}
@VisibleForTesting
public void setMaximumResourceRequirement(String label, String queue,
Resource resource) {
updateMinMaxResourceToConf(label, queue, resource, MAXIMUM_CAPACITY);
}
private void updateMinMaxResourceToConf(String label, String queue,
Resource resource, String type) {
if (queue.equals("root")) {
throw new IllegalArgumentException(
"Cannot set resource, root queue will take 100% of cluster capacity");
}
StringBuilder resourceString = new StringBuilder();
resourceString
.append("[" + AbsoluteResourceType.MEMORY.toString().toLowerCase() + "="
+ resource.getMemorySize() + ","
+ AbsoluteResourceType.VCORES.toString().toLowerCase() + "="
+ resource.getVirtualCores() + "]");
String prefix = getQueuePrefix(queue) + type;
if (!label.isEmpty()) {
prefix = getQueuePrefix(queue) + ACCESSIBLE_NODE_LABELS + DOT + label
+ DOT + type;
}
set(prefix, resourceString.toString());
}
public boolean checkConfigTypeIsAbsoluteResource(String label, String queue,
Set<String> resourceTypes) {
String propertyName = getNodeLabelPrefix(queue, label) + CAPACITY;
String resourceString = get(propertyName);
if (resourceString == null || resourceString.isEmpty()) {
return false;
}
Matcher matcher = RESOURCE_PATTERN.matcher(resourceString);
if (matcher.find()) {
return true;
}
return false;
}
private Resource internalGetLabeledResourceRequirementForQueue(String queue,
String label, Set<String> resourceTypes, String suffix) {
String propertyName = getNodeLabelPrefix(queue, label) + suffix;
String resourceString = get(propertyName);
if (resourceString == null || resourceString.isEmpty()) {
return Resources.none();
}
// Define resource here.
Resource resource = Resource.newInstance(0L, 0);
Matcher matcher = RESOURCE_PATTERN.matcher(resourceString);
/*
* Absolute resource configuration for a queue will be grouped by "[]".
* Syntax of absolute resource config could be like below
* "memory=4Gi vcores=2". Ideally this means "4GB of memory and 2 vcores".
*/
if (matcher.find()) {
// Get the sub-group.
String subGroup = matcher.group(0);
if (subGroup.trim().isEmpty()) {
return Resources.none();
}
subGroup = subGroup.substring(1, subGroup.length() - 1);
for (String kvPair : subGroup.trim().split(",")) {
String[] splits = kvPair.split("=");
// Ensure that each sub string is key value pair separated by '='.
if (splits != null && splits.length > 1) {
updateResourceValuesFromConfig(resourceTypes, resource, splits);
}
}
}
// Memory has to be configured always.
if (resource.getMemorySize() == 0L) {
return Resources.none();
}
if (LOG.isDebugEnabled()) {
LOG.debug("CSConf - getAbsolueResourcePerQueue: prefix="
+ getNodeLabelPrefix(queue, label) + ", capacity=" + resource);
}
return resource;
}
private void updateResourceValuesFromConfig(Set<String> resourceTypes,
Resource resource, String[] splits) {
// If key is not a valid type, skip it.
if (!resourceTypes.contains(splits[0])) {
return;
}
String units = getUnits(splits[1]);
Long resourceValue = Long
.valueOf(splits[1].substring(0, splits[1].length() - units.length()));
// Convert all incoming units to MB if units is configured.
if (!units.isEmpty()) {
resourceValue = UnitsConversionUtil.convert(units, "Mi", resourceValue);
}
// map it based on key.
AbsoluteResourceType resType = AbsoluteResourceType
.valueOf(StringUtils.toUpperCase(splits[0].trim()));
switch (resType) {
case MEMORY :
resource.setMemorySize(resourceValue);
break;
case VCORES :
resource.setVirtualCores(resourceValue.intValue());
break;
default :
resource.setResourceInformation(splits[0].trim(), ResourceInformation
.newInstance(splits[0].trim(), units, resourceValue));
break;
}
}
@Private public static final String MULTI_NODE_SORTING_POLICIES =
PREFIX + "multi-node-sorting.policy.names";
@Private public static final String MULTI_NODE_SORTING_POLICY_NAME =
PREFIX + "multi-node-sorting.policy";
/**
* resource usage based node sorting algorithm.
*/
public static final String DEFAULT_NODE_SORTING_POLICY = "default";
public static final String DEFAULT_NODE_SORTING_POLICY_CLASSNAME
= "org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.ResourceUsageMultiNodeLookupPolicy";
public static final long DEFAULT_MULTI_NODE_SORTING_INTERVAL = 1000L;
@Private
public static final String MULTI_NODE_PLACEMENT_ENABLED = PREFIX
+ "multi-node-placement-enabled";
@Private
public static final boolean DEFAULT_MULTI_NODE_PLACEMENT_ENABLED = false;
public String getMultiNodesSortingAlgorithmPolicy(
String queue) {
String policyName = get(
getQueuePrefix(queue) + "multi-node-sorting.policy");
if (policyName == null) {
policyName = get(MULTI_NODE_SORTING_POLICY_NAME);
}
// If node sorting policy is not configured in queue and in cluster level,
// it is been assumed that this queue is not enabled with multi-node lookup.
if (policyName == null || policyName.isEmpty()) {
return null;
}
String policyClassName = get(MULTI_NODE_SORTING_POLICY_NAME + DOT
+ policyName.trim() + DOT + "class");
if (policyClassName == null || policyClassName.isEmpty()) {
throw new YarnRuntimeException(
policyName.trim() + " Class is not configured or not an instance of "
+ MultiNodeLookupPolicy.class.getCanonicalName());
}
return normalizePolicyName(policyClassName.trim());
}
public boolean getMultiNodePlacementEnabled() {
return getBoolean(MULTI_NODE_PLACEMENT_ENABLED,
DEFAULT_MULTI_NODE_PLACEMENT_ENABLED);
}
public Set<MultiNodePolicySpec> getMultiNodePlacementPolicies() {
String[] policies = getTrimmedStrings(MULTI_NODE_SORTING_POLICIES);
// In other cases, split the accessibleLabelStr by ","
Set<MultiNodePolicySpec> set = new HashSet<MultiNodePolicySpec>();
for (String str : policies) {
if (!str.trim().isEmpty()) {
String policyClassName = get(
MULTI_NODE_SORTING_POLICY_NAME + DOT + str.trim() + DOT + "class");
if (str.trim().equals(DEFAULT_NODE_SORTING_POLICY)) {
policyClassName = get(
MULTI_NODE_SORTING_POLICY_NAME + DOT + str.trim() + DOT + "class",
DEFAULT_NODE_SORTING_POLICY_CLASSNAME);
}
// This check is needed as default class name is loaded only for
// DEFAULT_NODE_SORTING_POLICY.
if (policyClassName == null) {
throw new YarnRuntimeException(
str.trim() + " Class is not configured or not an instance of "
+ MultiNodeLookupPolicy.class.getCanonicalName());
}
policyClassName = normalizePolicyName(policyClassName.trim());
long policySortingInterval = getLong(
MULTI_NODE_SORTING_POLICY_NAME + DOT + str.trim()
+ DOT + "sorting-interval.ms",
DEFAULT_MULTI_NODE_SORTING_INTERVAL);
if (policySortingInterval < 0) {
throw new YarnRuntimeException(
str.trim()
+ " multi-node policy is configured with invalid"
+ " sorting-interval:" + policySortingInterval);
}
set.add(
new MultiNodePolicySpec(policyClassName, policySortingInterval));
}
}
return Collections.unmodifiableSet(set);
}
private String normalizePolicyName(String policyName) {
// Ensure that custom node sorting algorithm class is valid.
try {
Class<?> nodeSortingPolicyClazz = getClassByName(policyName);
if (MultiNodeLookupPolicy.class
.isAssignableFrom(nodeSortingPolicyClazz)) {
return policyName;
} else {
throw new YarnRuntimeException(
"Class: " + policyName + " not instance of "
+ MultiNodeLookupPolicy.class.getCanonicalName());
}
} catch (ClassNotFoundException e) {
throw new YarnRuntimeException(
"Could not instantiate " + "NodesSortingPolicy: " + policyName, e);
}
}
}