| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair; |
| |
| import static org.apache.hadoop.yarn.util.resource.ResourceUtils.RESOURCE_REQUEST_VALUE_PATTERN; |
| |
| import java.util.HashMap; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.regex.Matcher; |
| import java.util.regex.Pattern; |
| |
| import org.apache.hadoop.thirdparty.com.google.common.collect.Lists; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| import org.apache.hadoop.classification.InterfaceAudience.Private; |
| import org.apache.hadoop.classification.InterfaceStability.Evolving; |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.util.StringUtils; |
| import org.apache.hadoop.yarn.api.records.Resource; |
| import org.apache.hadoop.yarn.api.records.ResourceInformation; |
| import org.apache.hadoop.yarn.conf.YarnConfiguration; |
| import org.apache.hadoop.yarn.exceptions.ResourceNotFoundException; |
| import org.apache.hadoop.yarn.server.utils.BuilderUtils; |
| import org.apache.hadoop.yarn.util.UnitsConversionUtil; |
| import org.apache.hadoop.yarn.util.resource.ResourceUtils; |
| import org.apache.hadoop.yarn.util.resource.Resources; |
| |
| @Private |
| @Evolving |
| public class FairSchedulerConfiguration extends Configuration { |
| |
| public static final Logger LOG = LoggerFactory.getLogger( |
| FairSchedulerConfiguration.class.getName()); |
| |
| /** |
| * Resource Increment request grant-able by the FairScheduler. |
| * This property is looked up in the yarn-site.xml. |
| * @deprecated The preferred way to configure the increment is by using the |
| * yarn.resource-types.{RESOURCE_NAME}.increment-allocation property, |
| * for memory: yarn.resource-types.memory-mb.increment-allocation |
| */ |
| @Deprecated |
| public static final String RM_SCHEDULER_INCREMENT_ALLOCATION_MB = |
| YarnConfiguration.YARN_PREFIX + "scheduler.increment-allocation-mb"; |
| @Deprecated |
| public static final int DEFAULT_RM_SCHEDULER_INCREMENT_ALLOCATION_MB = 1024; |
| /** |
| * Resource Increment request grant-able by the FairScheduler. |
| * This property is looked up in the yarn-site.xml. |
| * @deprecated The preferred way to configure the increment is by using the |
| * yarn.resource-types.{RESOURCE_NAME}.increment-allocation property, |
| * for CPU: yarn.resource-types.vcores.increment-allocation |
| */ |
| @Deprecated |
| public static final String RM_SCHEDULER_INCREMENT_ALLOCATION_VCORES = |
| YarnConfiguration.YARN_PREFIX + "scheduler.increment-allocation-vcores"; |
| @Deprecated |
| public static final int DEFAULT_RM_SCHEDULER_INCREMENT_ALLOCATION_VCORES = 1; |
| |
| /** Threshold for container size for making a container reservation as a |
| * multiple of increment allocation. Only container sizes above this are |
| * allowed to reserve a node */ |
| public static final String |
| RM_SCHEDULER_RESERVATION_THRESHOLD_INCREMENT_MULTIPLE = |
| YarnConfiguration.YARN_PREFIX + |
| "scheduler.reservation-threshold.increment-multiple"; |
| public static final float |
| DEFAULT_RM_SCHEDULER_RESERVATION_THRESHOLD_INCREMENT_MULTIPLE = 2f; |
| |
| private static final String CONF_PREFIX = "yarn.scheduler.fair."; |
| |
| /** |
| * Used during FS->CS conversion. When enabled, background threads are |
| * not started. This property should NOT be used by end-users! |
| */ |
| public static final String MIGRATION_MODE = CONF_PREFIX + "migration.mode"; |
| |
| /** |
| * Disables checking whether a placement rule is terminal or not. Only |
| * used during migration mode. This property should NOT be used by end users! |
| */ |
| public static final String NO_TERMINAL_RULE_CHECK = CONF_PREFIX + |
| "no-terminal-rule.check"; |
| |
| public static final String ALLOCATION_FILE = CONF_PREFIX + "allocation.file"; |
| protected static final String DEFAULT_ALLOCATION_FILE = "fair-scheduler.xml"; |
| |
| /** Whether pools can be created that were not specified in the FS configuration file |
| */ |
| public static final String ALLOW_UNDECLARED_POOLS = CONF_PREFIX + |
| "allow-undeclared-pools"; |
| public static final boolean DEFAULT_ALLOW_UNDECLARED_POOLS = true; |
| |
| /** Whether to use the user name as the queue name (instead of "default") if |
| * the request does not specify a queue. */ |
| public static final String USER_AS_DEFAULT_QUEUE = CONF_PREFIX + |
| "user-as-default-queue"; |
| public static final boolean DEFAULT_USER_AS_DEFAULT_QUEUE = true; |
| |
| protected static final float DEFAULT_LOCALITY_THRESHOLD = -1.0f; |
| |
| /** Cluster threshold for node locality. */ |
| public static final String LOCALITY_THRESHOLD_NODE = CONF_PREFIX + |
| "locality.threshold.node"; |
| public static final float DEFAULT_LOCALITY_THRESHOLD_NODE = |
| DEFAULT_LOCALITY_THRESHOLD; |
| |
| /** Cluster threshold for rack locality. */ |
| public static final String LOCALITY_THRESHOLD_RACK = CONF_PREFIX + |
| "locality.threshold.rack"; |
| public static final float DEFAULT_LOCALITY_THRESHOLD_RACK = |
| DEFAULT_LOCALITY_THRESHOLD; |
| |
| /** |
| * Delay for node locality. |
| * @deprecated Continuous scheduling is known to cause locking issue inside |
| * Only used when {@link #CONTINUOUS_SCHEDULING_ENABLED} is enabled |
| */ |
| @Deprecated |
| protected static final String LOCALITY_DELAY_NODE_MS = CONF_PREFIX + |
| "locality-delay-node-ms"; |
| @Deprecated |
| protected static final long DEFAULT_LOCALITY_DELAY_NODE_MS = -1L; |
| |
| /** |
| * Delay for rack locality. |
| * @deprecated Continuous scheduling is known to cause locking issue inside |
| * Only used when {@link #CONTINUOUS_SCHEDULING_ENABLED} is enabled |
| */ |
| @Deprecated |
| protected static final String LOCALITY_DELAY_RACK_MS = CONF_PREFIX + |
| "locality-delay-rack-ms"; |
| @Deprecated |
| protected static final long DEFAULT_LOCALITY_DELAY_RACK_MS = -1L; |
| |
| /** |
| * Enable continuous scheduling or not. |
| * @deprecated Continuous scheduling is known to cause locking issue inside |
| * the scheduler in larger cluster, more than 100 nodes, use |
| * {@link #ASSIGN_MULTIPLE} to improve container allocation ramp up. |
| */ |
| @Deprecated |
| public static final String CONTINUOUS_SCHEDULING_ENABLED = CONF_PREFIX + |
| "continuous-scheduling-enabled"; |
| @Deprecated |
| public static final boolean DEFAULT_CONTINUOUS_SCHEDULING_ENABLED = false; |
| |
| /** |
| * Sleep time of each pass in continuous scheduling (5ms in default). |
| * @deprecated Continuous scheduling is known to cause locking issue inside |
| * Only used when {@link #CONTINUOUS_SCHEDULING_ENABLED} is enabled |
| */ |
| @Deprecated |
| public static final String CONTINUOUS_SCHEDULING_SLEEP_MS = CONF_PREFIX + |
| "continuous-scheduling-sleep-ms"; |
| @Deprecated |
| public static final int DEFAULT_CONTINUOUS_SCHEDULING_SLEEP_MS = 5; |
| |
| /** Whether preemption is enabled. */ |
| public static final String PREEMPTION = CONF_PREFIX + "preemption"; |
| public static final boolean DEFAULT_PREEMPTION = false; |
| |
| protected static final String AM_PREEMPTION = |
| CONF_PREFIX + "am.preemption"; |
| protected static final String AM_PREEMPTION_PREFIX = |
| CONF_PREFIX + "am.preemption."; |
| protected static final boolean DEFAULT_AM_PREEMPTION = true; |
| |
| protected static final String PREEMPTION_THRESHOLD = |
| CONF_PREFIX + "preemption.cluster-utilization-threshold"; |
| protected static final float DEFAULT_PREEMPTION_THRESHOLD = 0.8f; |
| |
| public static final String WAIT_TIME_BEFORE_KILL = CONF_PREFIX + |
| "waitTimeBeforeKill"; |
| public static final int DEFAULT_WAIT_TIME_BEFORE_KILL = 15000; |
| |
| /** |
| * Postfix for resource allocation increments in the |
| * yarn.resource-types.{RESOURCE_NAME}.increment-allocation property. |
| */ |
| static final String INCREMENT_ALLOCATION = ".increment-allocation"; |
| |
| /** |
| * Configurable delay (ms) before an app's starvation is considered after |
| * it is identified. This is to give the scheduler enough time to |
| * allocate containers post preemption. This delay is added to the |
| * {@link #WAIT_TIME_BEFORE_KILL} and enough heartbeats. |
| * |
| * This is intended to be a backdoor on production clusters, and hence |
| * intentionally not documented. |
| */ |
| public static final String WAIT_TIME_BEFORE_NEXT_STARVATION_CHECK_MS = |
| CONF_PREFIX + "waitTimeBeforeNextStarvationCheck"; |
| public static final long |
| DEFAULT_WAIT_TIME_BEFORE_NEXT_STARVATION_CHECK_MS = 10000; |
| |
| /** Whether to assign multiple containers in one check-in. */ |
| public static final String ASSIGN_MULTIPLE = CONF_PREFIX + "assignmultiple"; |
| public static final boolean DEFAULT_ASSIGN_MULTIPLE = false; |
| |
| /** Whether to give more weight to apps requiring many resources. */ |
| public static final String SIZE_BASED_WEIGHT = CONF_PREFIX + |
| "sizebasedweight"; |
| public static final boolean DEFAULT_SIZE_BASED_WEIGHT = false; |
| |
| /** Maximum number of containers to assign on each check-in. */ |
| public static final String DYNAMIC_MAX_ASSIGN = |
| CONF_PREFIX + "dynamic.max.assign"; |
| private static final boolean DEFAULT_DYNAMIC_MAX_ASSIGN = true; |
| |
| /** |
| * Specify exact number of containers to assign on each heartbeat, if dynamic |
| * max assign is turned off. |
| */ |
| public static final String MAX_ASSIGN = CONF_PREFIX + "max.assign"; |
| public static final int DEFAULT_MAX_ASSIGN = -1; |
| |
| /** The update interval for calculating resources in FairScheduler .*/ |
| public static final String UPDATE_INTERVAL_MS = |
| CONF_PREFIX + "update-interval-ms"; |
| public static final int DEFAULT_UPDATE_INTERVAL_MS = 500; |
| |
| /** Ratio of nodes available for an app to make an reservation on. */ |
| public static final String RESERVABLE_NODES = |
| CONF_PREFIX + "reservable-nodes"; |
| public static final float RESERVABLE_NODES_DEFAULT = 0.05f; |
| |
| private static final String INVALID_RESOURCE_DEFINITION_PREFIX = |
| "Error reading resource config--invalid resource definition: "; |
| private static final String RESOURCE_PERCENTAGE_PATTERN = |
| "^(-?(\\d+)(\\.\\d*)?)\\s*%\\s*"; |
| private static final String RESOURCE_VALUE_PATTERN = |
| "^(-?\\d+)(\\.\\d*)?\\s*"; |
| /** |
| * For resources separated by spaces instead of a comma. |
| */ |
| private static final String RESOURCES_WITH_SPACES_PATTERN = |
| "-?\\d+(?:\\.\\d*)?\\s*[a-z]+\\s*"; |
| |
| public FairSchedulerConfiguration() { |
| super(); |
| } |
| |
| public FairSchedulerConfiguration(Configuration conf) { |
| super(conf); |
| } |
| |
| public Resource getMinimumAllocation() { |
| int mem = getInt( |
| YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, |
| YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB); |
| int cpu = getInt( |
| YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES, |
| YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES); |
| return Resources.createResource(mem, cpu); |
| } |
| |
| public Resource getMaximumAllocation() { |
| int mem = getInt( |
| YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB, |
| YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB); |
| int cpu = getInt( |
| YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES, |
| YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES); |
| return Resources.createResource(mem, cpu); |
| } |
| |
| public Resource getIncrementAllocation() { |
| Long memory = null; |
| Integer vCores = null; |
| Map<String, Long> others = new HashMap<>(); |
| ResourceInformation[] resourceTypes = ResourceUtils.getResourceTypesArray(); |
| for (int i=0; i < resourceTypes.length; ++i) { |
| String name = resourceTypes[i].getName(); |
| String propertyKey = getAllocationIncrementPropKey(name); |
| String propValue = get(propertyKey); |
| if (propValue != null) { |
| Matcher matcher = RESOURCE_REQUEST_VALUE_PATTERN.matcher(propValue); |
| if (matcher.matches()) { |
| long value = Long.parseLong(matcher.group(1)); |
| String unit = matcher.group(2); |
| long valueInDefaultUnits = getValueInDefaultUnits(value, unit, name); |
| others.put(name, valueInDefaultUnits); |
| } else { |
| throw new IllegalArgumentException("Property " + propertyKey + |
| " is not in \"value [unit]\" format: " + propValue); |
| } |
| } |
| } |
| if (others.containsKey(ResourceInformation.MEMORY_MB.getName())) { |
| memory = others.get(ResourceInformation.MEMORY_MB.getName()); |
| if (get(RM_SCHEDULER_INCREMENT_ALLOCATION_MB) != null) { |
| String overridingKey = getAllocationIncrementPropKey( |
| ResourceInformation.MEMORY_MB.getName()); |
| LOG.warn("Configuration " + overridingKey + "=" + get(overridingKey) + |
| " is overriding the " + RM_SCHEDULER_INCREMENT_ALLOCATION_MB + |
| "=" + get(RM_SCHEDULER_INCREMENT_ALLOCATION_MB) + " property"); |
| } |
| others.remove(ResourceInformation.MEMORY_MB.getName()); |
| } else { |
| memory = getLong( |
| RM_SCHEDULER_INCREMENT_ALLOCATION_MB, |
| DEFAULT_RM_SCHEDULER_INCREMENT_ALLOCATION_MB); |
| } |
| if (others.containsKey(ResourceInformation.VCORES.getName())) { |
| vCores = others.get(ResourceInformation.VCORES.getName()).intValue(); |
| if (get(RM_SCHEDULER_INCREMENT_ALLOCATION_VCORES) != null) { |
| String overridingKey = getAllocationIncrementPropKey( |
| ResourceInformation.VCORES.getName()); |
| LOG.warn("Configuration " + overridingKey + "=" + get(overridingKey) + |
| " is overriding the " + RM_SCHEDULER_INCREMENT_ALLOCATION_VCORES + |
| "=" + get(RM_SCHEDULER_INCREMENT_ALLOCATION_VCORES) + " property"); |
| } |
| others.remove(ResourceInformation.VCORES.getName()); |
| } else { |
| vCores = getInt( |
| RM_SCHEDULER_INCREMENT_ALLOCATION_VCORES, |
| DEFAULT_RM_SCHEDULER_INCREMENT_ALLOCATION_VCORES); |
| } |
| return Resource.newInstance(memory, vCores, others); |
| } |
| |
| private long getValueInDefaultUnits(long value, String unit, |
| String resourceName) { |
| return unit.isEmpty() ? value : UnitsConversionUtil.convert(unit, |
| ResourceUtils.getDefaultUnit(resourceName), value); |
| } |
| |
| private String getAllocationIncrementPropKey(String resourceName) { |
| return YarnConfiguration.RESOURCE_TYPES + "." + resourceName + |
| INCREMENT_ALLOCATION; |
| } |
| |
| public float getReservationThresholdIncrementMultiple() { |
| return getFloat( |
| RM_SCHEDULER_RESERVATION_THRESHOLD_INCREMENT_MULTIPLE, |
| DEFAULT_RM_SCHEDULER_RESERVATION_THRESHOLD_INCREMENT_MULTIPLE); |
| } |
| |
| public float getLocalityThresholdNode() { |
| return getFloat(LOCALITY_THRESHOLD_NODE, DEFAULT_LOCALITY_THRESHOLD_NODE); |
| } |
| |
| public float getLocalityThresholdRack() { |
| return getFloat(LOCALITY_THRESHOLD_RACK, DEFAULT_LOCALITY_THRESHOLD_RACK); |
| } |
| |
| /** |
| * Whether continuous scheduling is turned on. |
| * @deprecated use {@link #ASSIGN_MULTIPLE} to improve container allocation |
| * ramp up. |
| * @return whether continuous scheduling is enabled |
| */ |
| @Deprecated |
| public boolean isContinuousSchedulingEnabled() { |
| return getBoolean(CONTINUOUS_SCHEDULING_ENABLED, |
| DEFAULT_CONTINUOUS_SCHEDULING_ENABLED); |
| } |
| |
| /** |
| * The sleep time of the continuous scheduler thread. |
| * @deprecated linked to {@link #CONTINUOUS_SCHEDULING_ENABLED} deprecation |
| * @return sleep time in ms |
| */ |
| @Deprecated |
| public int getContinuousSchedulingSleepMs() { |
| return getInt(CONTINUOUS_SCHEDULING_SLEEP_MS, |
| DEFAULT_CONTINUOUS_SCHEDULING_SLEEP_MS); |
| } |
| |
| /** |
| * Delay in milliseconds for locality fallback node to rack. |
| * @deprecated linked to {@link #CONTINUOUS_SCHEDULING_ENABLED} deprecation |
| * @return delay in ms |
| */ |
| @Deprecated |
| public long getLocalityDelayNodeMs() { |
| return getLong(LOCALITY_DELAY_NODE_MS, DEFAULT_LOCALITY_DELAY_NODE_MS); |
| } |
| |
| /** |
| * Delay in milliseconds for locality fallback rack to other. |
| * @deprecated linked to {@link #CONTINUOUS_SCHEDULING_ENABLED} deprecation |
| * @return delay in ms |
| */ |
| @Deprecated |
| public long getLocalityDelayRackMs() { |
| return getLong(LOCALITY_DELAY_RACK_MS, DEFAULT_LOCALITY_DELAY_RACK_MS); |
| } |
| |
| public boolean getPreemptionEnabled() { |
| return getBoolean(PREEMPTION, DEFAULT_PREEMPTION); |
| } |
| |
| public boolean getAMPreemptionEnabled(String queueName) { |
| String propertyName = AM_PREEMPTION_PREFIX + queueName; |
| |
| if (get(propertyName) != null) { |
| boolean amPreemptionEnabled = |
| getBoolean(propertyName, DEFAULT_AM_PREEMPTION); |
| LOG.debug("AM preemption enabled for queue {}: {}", |
| queueName, amPreemptionEnabled); |
| return amPreemptionEnabled; |
| } |
| |
| return getBoolean(AM_PREEMPTION, DEFAULT_AM_PREEMPTION); |
| } |
| |
| public float getPreemptionUtilizationThreshold() { |
| return getFloat(PREEMPTION_THRESHOLD, DEFAULT_PREEMPTION_THRESHOLD); |
| } |
| |
| public boolean getAssignMultiple() { |
| return getBoolean(ASSIGN_MULTIPLE, DEFAULT_ASSIGN_MULTIPLE); |
| } |
| |
| public boolean isMaxAssignDynamic() { |
| return getBoolean(DYNAMIC_MAX_ASSIGN, DEFAULT_DYNAMIC_MAX_ASSIGN); |
| } |
| |
| public int getMaxAssign() { |
| return getInt(MAX_ASSIGN, DEFAULT_MAX_ASSIGN); |
| } |
| |
| public boolean getSizeBasedWeight() { |
| return getBoolean(SIZE_BASED_WEIGHT, DEFAULT_SIZE_BASED_WEIGHT); |
| } |
| |
| public long getWaitTimeBeforeNextStarvationCheck() { |
| return getLong(WAIT_TIME_BEFORE_NEXT_STARVATION_CHECK_MS, |
| DEFAULT_WAIT_TIME_BEFORE_NEXT_STARVATION_CHECK_MS); |
| } |
| |
| public int getWaitTimeBeforeKill() { |
| return getInt(WAIT_TIME_BEFORE_KILL, DEFAULT_WAIT_TIME_BEFORE_KILL); |
| } |
| |
| public boolean getUsePortForNodeName() { |
| return getBoolean(YarnConfiguration.RM_SCHEDULER_INCLUDE_PORT_IN_NODE_NAME, |
| YarnConfiguration.DEFAULT_RM_SCHEDULER_USE_PORT_FOR_NODE_NAME); |
| } |
| |
| public float getReservableNodes() { |
| return getFloat(RESERVABLE_NODES, RESERVABLE_NODES_DEFAULT); |
| } |
| |
| /** |
| * Parses a resource config value in one of three forms: |
| * <ol> |
| * <li>Percentage: "50%" or "40% memory, 60% cpu"</li> |
| * <li>New style resources: "vcores=10, memory-mb=1024" |
| * or "vcores=60%, memory-mb=40%"</li> |
| * <li>Old style resources: "1024 mb, 10 vcores"</li> |
| * </ol> |
| * In new style resources, any resource that is not specified will be |
| * set to {@link Long#MAX_VALUE} or 100%, as appropriate. Also, in the new |
| * style resources, units are not allowed. Units are assumed from the resource |
| * manager's settings for the resources when the value isn't a percentage. |
| * |
| * @param value the resource definition to parse |
| * @return a {@link ConfigurableResource} that represents the parsed value |
| * @throws AllocationConfigurationException if the raw value is not a valid |
| * resource definition |
| */ |
| public static ConfigurableResource parseResourceConfigValue(String value) |
| throws AllocationConfigurationException { |
| return parseResourceConfigValue(value, Long.MAX_VALUE); |
| } |
| |
| /** |
| * Parses a resource config value in one of three forms: |
| * <ol> |
| * <li>Percentage: "50%" or "40% memory, 60% cpu"</li> |
| * <li>New style resources: "vcores=10, memory-mb=1024" |
| * or "vcores=60%, memory-mb=40%"</li> |
| * <li>Old style resources: "1024 mb, 10 vcores"</li> |
| * </ol> |
| * In new style resources, any resource that is not specified will be |
| * set to {@code missing} or 0%, as appropriate. Also, in the new style |
| * resources, units are not allowed. Units are assumed from the resource |
| * manager's settings for the resources when the value isn't a percentage. |
| * |
| * The {@code missing} parameter is only used in the case of new style |
| * resources without percentages. With new style resources with percentages, |
| * any missing resources will be assumed to be 100% because percentages are |
| * only used with maximum resource limits. |
| * |
| * @param value the resource definition to parse |
| * @param missing the value to use for any unspecified resources |
| * @return a {@link ConfigurableResource} that represents the parsed value |
| * @throws AllocationConfigurationException if the raw value is not a valid |
| * resource definition |
| */ |
| public static ConfigurableResource parseResourceConfigValue(String value, |
| long missing) throws AllocationConfigurationException { |
| ConfigurableResource configurableResource; |
| |
| if (value.trim().isEmpty()) { |
| throw new AllocationConfigurationException("Error reading resource " |
| + "config--the resource string is empty."); |
| } |
| |
| try { |
| if (value.contains("=")) { |
| configurableResource = parseNewStyleResource(value, missing); |
| } else if (value.contains("%")) { |
| configurableResource = parseOldStyleResourceAsPercentage(value); |
| } else { |
| configurableResource = parseOldStyleResource(value); |
| } |
| } catch (RuntimeException ex) { |
| throw new AllocationConfigurationException( |
| "Error reading resource config", ex); |
| } |
| |
| return configurableResource; |
| } |
| |
| private static ConfigurableResource parseNewStyleResource(String value, |
| long missing) throws AllocationConfigurationException { |
| |
| final ConfigurableResource configurableResource; |
| boolean asPercent = value.contains("%"); |
| if (asPercent) { |
| configurableResource = new ConfigurableResource(); |
| } else { |
| configurableResource = new ConfigurableResource(missing); |
| } |
| |
| String[] resources = value.split(","); |
| for (String resource : resources) { |
| String[] parts = resource.split("="); |
| |
| if (parts.length != 2) { |
| throw createConfigException(value, |
| "Every resource must be of the form: name=value."); |
| } |
| |
| String resourceName = parts[0].trim(); |
| String resourceValue = parts[1].trim(); |
| try { |
| if (asPercent) { |
| double percentage = parseNewStyleResourceAsPercentage(value, |
| resourceName, resourceValue); |
| configurableResource.setPercentage(resourceName, percentage); |
| } else { |
| long parsedValue = parseNewStyleResourceAsAbsoluteValue(value, |
| resourceValue, resourceName); |
| configurableResource.setValue(resourceName, parsedValue); |
| } |
| } catch (ResourceNotFoundException ex) { |
| throw createConfigException(value, "The " |
| + "resource name, \"" + resourceName + "\" was not " |
| + "recognized. Please check the value of " |
| + YarnConfiguration.RESOURCE_TYPES + " in the Resource " |
| + "Manager's configuration files.", ex); |
| } |
| } |
| return configurableResource; |
| } |
| |
| private static double parseNewStyleResourceAsPercentage( |
| String value, String resource, String resourceValue) |
| throws AllocationConfigurationException { |
| try { |
| return findPercentage(resourceValue, resource); |
| } catch (AllocationConfigurationException ex) { |
| throw createConfigException(value, |
| "The resource values must all be percentages. \"" |
| + resourceValue + "\" is either not a non-negative number " + |
| "or does not include the '%' symbol.", ex); |
| } |
| } |
| |
| private static long parseNewStyleResourceAsAbsoluteValue(String value, |
| String resourceValue, String resourceName) |
| throws AllocationConfigurationException { |
| final long parsedValue; |
| try { |
| parsedValue = Long.parseLong(resourceValue); |
| } catch (NumberFormatException e) { |
| throw createConfigException(value, "The " |
| + "resource values must all be integers. \"" + resourceValue |
| + "\" is not an integer.", e); |
| } |
| if (parsedValue < 0) { |
| throw new AllocationConfigurationException( |
| "Invalid value of " + resourceName + |
| ": " + parsedValue + ", value should not be negative!"); |
| } |
| return parsedValue; |
| } |
| |
| private static ConfigurableResource parseOldStyleResourceAsPercentage( |
| String value) throws AllocationConfigurationException { |
| return new ConfigurableResource( |
| getResourcePercentage(StringUtils.toLowerCase(value))); |
| } |
| |
| private static ConfigurableResource parseOldStyleResource(String input) |
| throws AllocationConfigurationException { |
| final String lowerCaseInput = StringUtils.toLowerCase(input); |
| String[] resources = lowerCaseInput.split(","); |
| |
| if (resources.length != 2) { |
| resources = findOldStyleResourcesInSpaceSeparatedInput(lowerCaseInput); |
| if (resources.length != 2) { |
| throw new AllocationConfigurationException( |
| "Cannot parse resource values from input: " + input); |
| } |
| } |
| final int memory = parseOldStyleResourceMemory(resources); |
| final int vcores = parseOldStyleResourceVcores(resources); |
| return new ConfigurableResource( |
| BuilderUtils.newResource(memory, vcores)); |
| } |
| |
| private static String[] findOldStyleResourcesInSpaceSeparatedInput( |
| String input) { |
| final Pattern pattern = Pattern.compile(RESOURCES_WITH_SPACES_PATTERN); |
| final Matcher matcher = pattern.matcher(input); |
| |
| List<String> resources = Lists.newArrayList(); |
| while (matcher.find()) { |
| resources.add(matcher.group(0)); |
| } |
| return resources.toArray(new String[0]); |
| } |
| |
| private static int parseOldStyleResourceMemory(String[] resources) |
| throws AllocationConfigurationException { |
| final int memory = findResource(resources, "mb"); |
| |
| if (memory < 0) { |
| throw new AllocationConfigurationException( |
| "Invalid value of memory: " + memory + |
| ", value should not be negative!"); |
| } |
| return memory; |
| } |
| |
| private static int parseOldStyleResourceVcores(String[] resources) |
| throws AllocationConfigurationException { |
| final int vcores = findResource(resources, "vcores"); |
| |
| if (vcores < 0) { |
| throw new AllocationConfigurationException( |
| "Invalid value of vcores: " + vcores + |
| ", value should not be negative!"); |
| } |
| return vcores; |
| } |
| |
| private static double[] getResourcePercentage(String val) |
| throws AllocationConfigurationException { |
| int numberOfKnownResourceTypes = ResourceUtils |
| .getNumberOfCountableResourceTypes(); |
| double[] resourcePercentage = new double[numberOfKnownResourceTypes]; |
| String[] values = val.split(","); |
| |
| if (values.length == 1) { |
| double percentage = findPercentage(values, ""); |
| for (int i = 0; i < numberOfKnownResourceTypes; i++) { |
| resourcePercentage[i] = percentage; |
| } |
| } else { |
| resourcePercentage[0] = findPercentage(values, "memory"); |
| resourcePercentage[1] = findPercentage(values, "cpu"); |
| } |
| |
| return resourcePercentage; |
| } |
| |
| private static double findPercentage(String resourceValue, String resource) |
| throws AllocationConfigurationException { |
| return findPercentageInternal(resource, resourceValue, false); |
| } |
| |
| private static double findPercentage(String[] resourceValues, String resource) |
| throws AllocationConfigurationException { |
| String resourceValue = findResourceFromValues(resourceValues, resource); |
| return findPercentageInternal(resource, resourceValue, true); |
| } |
| |
| private static double findPercentageInternal(String resource, |
| String resourceValue, boolean includeResourceInPattern) |
| throws AllocationConfigurationException { |
| final Pattern pattern; |
| if (includeResourceInPattern) { |
| pattern = Pattern.compile(RESOURCE_PERCENTAGE_PATTERN + resource); |
| } else { |
| pattern = Pattern.compile(RESOURCE_PERCENTAGE_PATTERN); |
| } |
| |
| Matcher matcher = pattern.matcher(resourceValue); |
| if (!matcher.matches()) { |
| if (resource.equals("")) { |
| throw new AllocationConfigurationException("Invalid percentage: " + |
| resourceValue); |
| } else { |
| throw new AllocationConfigurationException("Invalid percentage of " + |
| resource + ": " + resourceValue); |
| } |
| } |
| double percentage = Double.parseDouble(matcher.group(1)) / 100.0; |
| |
| if (percentage < 0) { |
| throw new AllocationConfigurationException("Invalid percentage: " + |
| resourceValue + ", percentage should not be negative!"); |
| } |
| |
| return percentage; |
| } |
| |
| private static AllocationConfigurationException createConfigException( |
| String value, String message) { |
| return createConfigException(value, message, null); |
| } |
| |
| private static AllocationConfigurationException createConfigException( |
| String value, String message, Throwable t) { |
| String msg = INVALID_RESOURCE_DEFINITION_PREFIX + value + ". " + message; |
| if (t != null) { |
| return new AllocationConfigurationException(msg, t); |
| } else { |
| return new AllocationConfigurationException(msg); |
| } |
| } |
| |
| public long getUpdateInterval() { |
| return getLong(UPDATE_INTERVAL_MS, DEFAULT_UPDATE_INTERVAL_MS); |
| } |
| |
| private static int findResource(String[] resourceValues, String resource) |
| throws AllocationConfigurationException { |
| String resourceValue = findResourceFromValues(resourceValues, resource); |
| final Pattern pattern = Pattern.compile(RESOURCE_VALUE_PATTERN + |
| resource); |
| Matcher matcher = pattern.matcher(resourceValue); |
| if (!matcher.find()) { |
| throw new AllocationConfigurationException("Invalid value of " + |
| (resource.equals("mb") ? "memory" : resource) + ": " + resourceValue); |
| } |
| return Integer.parseInt(matcher.group(1)); |
| } |
| |
| private static String findResourceFromValues(String[] resourceValues, |
| String resource) throws AllocationConfigurationException { |
| for (String resourceValue : resourceValues) { |
| if (resourceValue.contains(resource)) { |
| return resourceValue.trim(); |
| } |
| } |
| throw new AllocationConfigurationException("Missing resource: " + resource); |
| } |
| } |