| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.hadoop.yarn.util.resource; |
| |
| import com.google.common.annotations.VisibleForTesting; |
| import org.apache.commons.lang.StringUtils; |
| import org.apache.commons.logging.Log; |
| import org.apache.commons.logging.LogFactory; |
| import org.apache.hadoop.classification.InterfaceAudience; |
| import org.apache.hadoop.classification.InterfaceStability; |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.yarn.api.protocolrecords.ResourceTypes; |
| import org.apache.hadoop.yarn.api.records.Resource; |
| import org.apache.hadoop.yarn.api.records.ResourceInformation; |
| import org.apache.hadoop.yarn.api.records.ResourceTypeInfo; |
| import org.apache.hadoop.yarn.conf.ConfigurationProvider; |
| import org.apache.hadoop.yarn.conf.ConfigurationProviderFactory; |
| import org.apache.hadoop.yarn.conf.YarnConfiguration; |
| import org.apache.hadoop.yarn.exceptions.YarnException; |
| import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; |
| |
| import java.io.FileNotFoundException; |
| import java.io.IOException; |
| import java.io.InputStream; |
| import java.util.ArrayList; |
| import java.util.Collection; |
| import java.util.Collections; |
| import java.util.HashMap; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.concurrent.ConcurrentHashMap; |
| |
| /** |
| * Helper class to read the resource-types to be supported by the system. |
| */ |
| public class ResourceUtils { |
| |
| public static final String UNITS = ".units"; |
| public static final String TYPE = ".type"; |
| public static final String MINIMUM_ALLOCATION = ".minimum-allocation"; |
| public static final String MAXIMUM_ALLOCATION = ".maximum-allocation"; |
| |
| private static final String MEMORY = ResourceInformation.MEMORY_MB.getName(); |
| private static final String VCORES = ResourceInformation.VCORES.getName(); |
| |
| private static volatile boolean initializedResources = false; |
| private static final Map<String, Integer> RESOURCE_NAME_TO_INDEX = |
| new ConcurrentHashMap<String, Integer>(); |
| private static volatile Map<String, ResourceInformation> resourceTypes; |
| private static volatile ResourceInformation[] resourceTypesArray; |
| private static volatile boolean initializedNodeResources = false; |
| private static volatile Map<String, ResourceInformation> readOnlyNodeResources; |
| private static volatile int numKnownResourceTypes = -1; |
| |
| static final Log LOG = LogFactory.getLog(ResourceUtils.class); |
| |
| private ResourceUtils() { |
| } |
| |
| private static void checkMandatoryResources( |
| Map<String, ResourceInformation> resourceInformationMap) |
| throws YarnRuntimeException { |
| /* |
| * Supporting 'memory' also as invalid resource name, in addition to |
| * 'MEMORY' for historical reasons |
| */ |
| String key = "memory"; |
| if (resourceInformationMap.containsKey(key)) { |
| LOG.warn("Attempt to define resource '" + key + |
| "', but it is not allowed."); |
| throw new YarnRuntimeException("Attempt to re-define mandatory resource '" |
| + key + "'."); |
| } |
| |
| if (resourceInformationMap.containsKey(MEMORY)) { |
| ResourceInformation memInfo = resourceInformationMap.get(MEMORY); |
| String memUnits = ResourceInformation.MEMORY_MB.getUnits(); |
| ResourceTypes memType = ResourceInformation.MEMORY_MB.getResourceType(); |
| if (!memInfo.getUnits().equals(memUnits) || !memInfo.getResourceType() |
| .equals(memType)) { |
| throw new YarnRuntimeException( |
| "Attempt to re-define mandatory resource 'memory-mb'. It can only" |
| + " be of type 'COUNTABLE' and have units 'Mi'."); |
| } |
| } |
| |
| if (resourceInformationMap.containsKey(VCORES)) { |
| ResourceInformation vcoreInfo = resourceInformationMap.get(VCORES); |
| String vcoreUnits = ResourceInformation.VCORES.getUnits(); |
| ResourceTypes vcoreType = ResourceInformation.VCORES.getResourceType(); |
| if (!vcoreInfo.getUnits().equals(vcoreUnits) || !vcoreInfo |
| .getResourceType().equals(vcoreType)) { |
| throw new YarnRuntimeException( |
| "Attempt to re-define mandatory resource 'vcores'. It can only be" |
| + " of type 'COUNTABLE' and have units ''(no units)."); |
| } |
| } |
| } |
| |
| private static void addMandatoryResources( |
| Map<String, ResourceInformation> res) { |
| ResourceInformation ri; |
| if (!res.containsKey(MEMORY)) { |
| LOG.info("Adding resource type - name = " + MEMORY + ", units = " |
| + ResourceInformation.MEMORY_MB.getUnits() + ", type = " |
| + ResourceTypes.COUNTABLE); |
| ri = ResourceInformation |
| .newInstance(MEMORY, |
| ResourceInformation.MEMORY_MB.getUnits()); |
| res.put(MEMORY, ri); |
| } |
| if (!res.containsKey(VCORES)) { |
| LOG.info("Adding resource type - name = " + VCORES + ", units = , type = " |
| + ResourceTypes.COUNTABLE); |
| ri = |
| ResourceInformation.newInstance(VCORES); |
| res.put(VCORES, ri); |
| } |
| } |
| |
| private static void setMinimumAllocationForMandatoryResources( |
| Map<String, ResourceInformation> res, Configuration conf) { |
| String[][] resourceTypesKeys = { |
| {ResourceInformation.MEMORY_MB.getName(), |
| YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, |
| String.valueOf( |
| YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB), |
| ResourceInformation.MEMORY_MB.getName()}, |
| {ResourceInformation.VCORES.getName(), |
| YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES, |
| String.valueOf( |
| YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES), |
| ResourceInformation.VCORES.getName()}}; |
| for (String[] arr : resourceTypesKeys) { |
| String resourceTypesKey = |
| YarnConfiguration.RESOURCE_TYPES + "." + arr[0] + MINIMUM_ALLOCATION; |
| long minimumResourceTypes = conf.getLong(resourceTypesKey, -1); |
| long minimumConf = conf.getLong(arr[1], -1); |
| long minimum; |
| if (minimumResourceTypes != -1) { |
| minimum = minimumResourceTypes; |
| if (minimumConf != -1) { |
| LOG.warn("Using minimum allocation for memory specified in " |
| + "resource-types config file with key " |
| + minimumResourceTypes + ", ignoring minimum specified using " |
| + arr[1]); |
| } |
| } else { |
| minimum = conf.getLong(arr[1], Long.parseLong(arr[2])); |
| } |
| ResourceInformation ri = res.get(arr[3]); |
| ri.setMinimumAllocation(minimum); |
| } |
| } |
| |
| private static void setMaximumAllocationForMandatoryResources( |
| Map<String, ResourceInformation> res, Configuration conf) { |
| String[][] resourceTypesKeys = { |
| {ResourceInformation.MEMORY_MB.getName(), |
| YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB, |
| String.valueOf( |
| YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB), |
| ResourceInformation.MEMORY_MB.getName()}, |
| {ResourceInformation.VCORES.getName(), |
| YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES, |
| String.valueOf( |
| YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES), |
| ResourceInformation.VCORES.getName()}}; |
| for (String[] arr : resourceTypesKeys) { |
| String resourceTypesKey = |
| YarnConfiguration.RESOURCE_TYPES + "." + arr[0] + MAXIMUM_ALLOCATION; |
| long maximumResourceTypes = conf.getLong(resourceTypesKey, -1); |
| long maximumConf = conf.getLong(arr[1], -1); |
| long maximum; |
| if (maximumResourceTypes != -1) { |
| maximum = maximumResourceTypes; |
| if (maximumConf != -1) { |
| LOG.warn("Using maximum allocation for memory specified in " |
| + "resource-types config file with key " |
| + maximumResourceTypes + ", ignoring maximum specified using " |
| + arr[1]); |
| } |
| } else { |
| maximum = conf.getLong(arr[1], Long.parseLong(arr[2])); |
| } |
| ResourceInformation ri = res.get(arr[3]); |
| ri.setMaximumAllocation(maximum); |
| } |
| } |
| |
| @VisibleForTesting |
| static void initializeResourcesMap(Configuration conf) { |
| |
| Map<String, ResourceInformation> resourceInformationMap = new HashMap<>(); |
| String[] resourceNames = conf.getStrings(YarnConfiguration.RESOURCE_TYPES); |
| |
| if (resourceNames != null && resourceNames.length != 0) { |
| for (String resourceName : resourceNames) { |
| String resourceUnits = conf.get( |
| YarnConfiguration.RESOURCE_TYPES + "." + resourceName + UNITS, ""); |
| String resourceTypeName = conf.get( |
| YarnConfiguration.RESOURCE_TYPES + "." + resourceName + TYPE, |
| ResourceTypes.COUNTABLE.toString()); |
| Long minimumAllocation = conf.getLong( |
| YarnConfiguration.RESOURCE_TYPES + "." + resourceName |
| + MINIMUM_ALLOCATION, 0L); |
| Long maximumAllocation = conf.getLong( |
| YarnConfiguration.RESOURCE_TYPES + "." + resourceName |
| + MAXIMUM_ALLOCATION, Long.MAX_VALUE); |
| if (resourceName == null || resourceName.isEmpty() |
| || resourceUnits == null || resourceTypeName == null) { |
| throw new YarnRuntimeException( |
| "Incomplete configuration for resource type '" + resourceName |
| + "'. One of name, units or type is configured incorrectly."); |
| } |
| ResourceTypes resourceType = ResourceTypes.valueOf(resourceTypeName); |
| LOG.info("Adding resource type - name = " + resourceName + ", units = " |
| + resourceUnits + ", type = " + resourceTypeName); |
| if (resourceInformationMap.containsKey(resourceName)) { |
| throw new YarnRuntimeException( |
| "Error in config, key '" + resourceName + "' specified twice"); |
| } |
| resourceInformationMap.put(resourceName, ResourceInformation |
| .newInstance(resourceName, resourceUnits, 0L, resourceType, |
| minimumAllocation, maximumAllocation)); |
| } |
| } |
| |
| checkMandatoryResources(resourceInformationMap); |
| addMandatoryResources(resourceInformationMap); |
| |
| setMinimumAllocationForMandatoryResources(resourceInformationMap, conf); |
| setMaximumAllocationForMandatoryResources(resourceInformationMap, conf); |
| |
| initializeResourcesFromResourceInformationMap(resourceInformationMap); |
| } |
| |
| /** |
| * This method is visible for testing, unit test can construct a |
| * resourceInformationMap and pass it to this method to initialize multiple resources. |
| * @param resourceInformationMap constructed resource information map. |
| */ |
| @VisibleForTesting |
| public static void initializeResourcesFromResourceInformationMap( |
| Map<String, ResourceInformation> resourceInformationMap) { |
| resourceTypes = Collections.unmodifiableMap(resourceInformationMap); |
| updateKnownResources(); |
| updateResourceTypeIndex(); |
| initializedResources = true; |
| } |
| |
| private static void updateKnownResources() { |
| // Update resource names. |
| resourceTypesArray = new ResourceInformation[resourceTypes.size()]; |
| |
| int index = 2; |
| for (ResourceInformation resInfo : resourceTypes.values()) { |
| if (resInfo.getName().equals(MEMORY)) { |
| resourceTypesArray[0] = ResourceInformation |
| .newInstance(resourceTypes.get(MEMORY)); |
| } else if (resInfo.getName().equals(VCORES)) { |
| resourceTypesArray[1] = ResourceInformation |
| .newInstance(resourceTypes.get(VCORES)); |
| } else { |
| resourceTypesArray[index] = ResourceInformation.newInstance(resInfo); |
| index++; |
| } |
| } |
| } |
| |
| private static void updateResourceTypeIndex() { |
| RESOURCE_NAME_TO_INDEX.clear(); |
| |
| for (int index = 0; index < resourceTypesArray.length; index++) { |
| ResourceInformation resInfo = resourceTypesArray[index]; |
| RESOURCE_NAME_TO_INDEX.put(resInfo.getName(), index); |
| } |
| } |
| |
| /** |
| * Get associate index of resource types such memory, cpu etc. |
| * This could help to access each resource types in a resource faster. |
| * @return Index map for all Resource Types. |
| */ |
| public static Map<String, Integer> getResourceTypeIndex() { |
| return RESOURCE_NAME_TO_INDEX; |
| } |
| |
| /** |
| * Get the resource types to be supported by the system. |
| * @return A map of the resource name to a ResouceInformation object |
| * which contains details such as the unit. |
| */ |
| public static Map<String, ResourceInformation> getResourceTypes() { |
| return getResourceTypes(null, |
| YarnConfiguration.RESOURCE_TYPES_CONFIGURATION_FILE); |
| } |
| |
| public static ResourceInformation[] getResourceTypesArray() { |
| initializeResourceTypesIfNeeded(null, |
| YarnConfiguration.RESOURCE_TYPES_CONFIGURATION_FILE); |
| return resourceTypesArray; |
| } |
| |
| public static int getNumberOfKnownResourceTypes() { |
| if (numKnownResourceTypes < 0) { |
| initializeResourceTypesIfNeeded(null, |
| YarnConfiguration.RESOURCE_TYPES_CONFIGURATION_FILE); |
| } |
| return numKnownResourceTypes; |
| } |
| |
| private static Map<String, ResourceInformation> getResourceTypes( |
| Configuration conf) { |
| return getResourceTypes(conf, |
| YarnConfiguration.RESOURCE_TYPES_CONFIGURATION_FILE); |
| } |
| |
| private static void initializeResourceTypesIfNeeded(Configuration conf, |
| String resourceFile) { |
| if (!initializedResources) { |
| synchronized (ResourceUtils.class) { |
| if (!initializedResources) { |
| if (conf == null) { |
| conf = new YarnConfiguration(); |
| } |
| try { |
| addResourcesFileToConf(resourceFile, conf); |
| LOG.debug("Found " + resourceFile + ", adding to configuration"); |
| } catch (FileNotFoundException fe) { |
| LOG.info("Unable to find '" + resourceFile |
| + "'. Falling back to memory and vcores as resources."); |
| } |
| initializeResourcesMap(conf); |
| |
| } |
| } |
| } |
| numKnownResourceTypes = resourceTypes.size(); |
| } |
| |
| private static Map<String, ResourceInformation> getResourceTypes( |
| Configuration conf, String resourceFile) { |
| initializeResourceTypesIfNeeded(conf, resourceFile); |
| return resourceTypes; |
| } |
| |
| private static InputStream getConfInputStream(String resourceFile, |
| Configuration conf) throws IOException, YarnException { |
| |
| ConfigurationProvider provider = |
| ConfigurationProviderFactory.getConfigurationProvider(conf); |
| try { |
| provider.init(conf); |
| } catch (Exception e) { |
| throw new IOException(e); |
| } |
| |
| InputStream ris = provider.getConfigurationInputStream(conf, resourceFile); |
| if (ris == null) { |
| if (conf.getResource(resourceFile) == null) { |
| throw new FileNotFoundException("Unable to find " + resourceFile); |
| } |
| throw new IOException( |
| "Unable to open resource types file '" + resourceFile |
| + "'. Using provider " + provider); |
| } |
| return ris; |
| } |
| |
| private static void addResourcesFileToConf(String resourceFile, |
| Configuration conf) throws FileNotFoundException { |
| try { |
| InputStream ris = getConfInputStream(resourceFile, conf); |
| LOG.debug("Found " + resourceFile + ", adding to configuration"); |
| conf.addResource(ris); |
| } catch (FileNotFoundException fe) { |
| throw fe; |
| } catch (IOException ie) { |
| LOG.fatal("Exception trying to read resource types configuration '" |
| + resourceFile + "'.", ie); |
| throw new YarnRuntimeException(ie); |
| } catch (YarnException ye) { |
| LOG.fatal("YARN Exception trying to read resource types configuration '" |
| + resourceFile + "'.", ye); |
| throw new YarnRuntimeException(ye); |
| } |
| } |
| |
| @VisibleForTesting |
| synchronized static void resetResourceTypes() { |
| initializedResources = false; |
| } |
| |
| @VisibleForTesting |
| public static Map<String, ResourceInformation> |
| resetResourceTypes(Configuration conf) { |
| synchronized (ResourceUtils.class) { |
| initializedResources = false; |
| } |
| return getResourceTypes(conf); |
| } |
| |
| public static String getUnits(String resourceValue) { |
| String units; |
| for (int i = 0; i < resourceValue.length(); i++) { |
| if (Character.isAlphabetic(resourceValue.charAt(i))) { |
| units = resourceValue.substring(i); |
| if (StringUtils.isAlpha(units)) { |
| return units; |
| } |
| } |
| } |
| return ""; |
| } |
| |
| /** |
| * Function to get the resources for a node. This function will look at the |
| * file {@link YarnConfiguration#NODE_RESOURCES_CONFIGURATION_FILE} to |
| * determine the node resources. |
| * |
| * @param conf configuration file |
| * @return a map to resource name to the ResourceInformation object. The map |
| * is guaranteed to have entries for memory and vcores |
| */ |
| public static Map<String, ResourceInformation> getNodeResourceInformation( |
| Configuration conf) { |
| if (!initializedNodeResources) { |
| synchronized (ResourceUtils.class) { |
| if (!initializedNodeResources) { |
| Map<String, ResourceInformation> nodeResources = initializeNodeResourceInformation( |
| conf); |
| addMandatoryResources(nodeResources); |
| checkMandatoryResources(nodeResources); |
| setMinimumAllocationForMandatoryResources(nodeResources, conf); |
| setMaximumAllocationForMandatoryResources(nodeResources, conf); |
| readOnlyNodeResources = Collections.unmodifiableMap(nodeResources); |
| initializedNodeResources = true; |
| } |
| } |
| } |
| return readOnlyNodeResources; |
| } |
| |
| private static Map<String, ResourceInformation> initializeNodeResourceInformation( |
| Configuration conf) { |
| Map<String, ResourceInformation> nodeResources = new HashMap<>(); |
| try { |
| addResourcesFileToConf( |
| YarnConfiguration.NODE_RESOURCES_CONFIGURATION_FILE, conf); |
| for (Map.Entry<String, String> entry : conf) { |
| String key = entry.getKey(); |
| String value = entry.getValue(); |
| if (key.startsWith(YarnConfiguration.NM_RESOURCES_PREFIX)) { |
| addResourceInformation(key, value, nodeResources); |
| } |
| } |
| } catch (FileNotFoundException fe) { |
| LOG.info("Couldn't find node resources file"); |
| } |
| return nodeResources; |
| } |
| |
| private static void addResourceInformation(String prop, String value, |
| Map<String, ResourceInformation> nodeResources) { |
| String[] parts = prop.split("\\."); |
| LOG.info("Found resource entry " + prop); |
| if (parts.length == 4) { |
| String resourceType = parts[3]; |
| if (!nodeResources.containsKey(resourceType)) { |
| nodeResources |
| .put(resourceType, ResourceInformation.newInstance(resourceType)); |
| } |
| String units = getUnits(value); |
| Long resourceValue = |
| Long.valueOf(value.substring(0, value.length() - units.length())); |
| nodeResources.get(resourceType).setValue(resourceValue); |
| nodeResources.get(resourceType).setUnits(units); |
| LOG.debug("Setting value for resource type " + resourceType + " to " |
| + resourceValue + " with units " + units); |
| } |
| } |
| |
| @VisibleForTesting |
| synchronized public static void resetNodeResources() { |
| initializedNodeResources = false; |
| } |
| |
| public static Resource getResourceTypesMinimumAllocation() { |
| Resource ret = Resource.newInstance(0, 0); |
| for (ResourceInformation entry : resourceTypesArray) { |
| String name = entry.getName(); |
| if (name.equals(ResourceInformation.MEMORY_MB.getName())) { |
| ret.setMemorySize(entry.getMinimumAllocation()); |
| } else if (name.equals(ResourceInformation.VCORES.getName())) { |
| Long tmp = entry.getMinimumAllocation(); |
| if (tmp > Integer.MAX_VALUE) { |
| tmp = (long) Integer.MAX_VALUE; |
| } |
| ret.setVirtualCores(tmp.intValue()); |
| } else { |
| ret.setResourceValue(name, entry.getMinimumAllocation()); |
| } |
| } |
| return ret; |
| } |
| |
| /** |
| * Get a Resource object with for the maximum allocation possible. |
| * @return a Resource object with the maximum allocation for the scheduler |
| */ |
| public static Resource getResourceTypesMaximumAllocation() { |
| Resource ret = Resource.newInstance(0, 0); |
| for (ResourceInformation entry : resourceTypesArray) { |
| String name = entry.getName(); |
| if (name.equals(ResourceInformation.MEMORY_MB.getName())) { |
| ret.setMemorySize(entry.getMaximumAllocation()); |
| } else if (name.equals(ResourceInformation.VCORES.getName())) { |
| Long tmp = entry.getMaximumAllocation(); |
| if (tmp > Integer.MAX_VALUE) { |
| tmp = (long) Integer.MAX_VALUE; |
| } |
| ret.setVirtualCores(tmp.intValue()); |
| continue; |
| } else { |
| ret.setResourceValue(name, entry.getMaximumAllocation()); |
| } |
| } |
| return ret; |
| } |
| |
| /** |
| * Get default unit by given resource type. |
| * @param resourceType resourceType |
| * @return default unit |
| */ |
| public static String getDefaultUnit(String resourceType) { |
| ResourceInformation ri = getResourceTypes().get(resourceType); |
| if (ri != null) { |
| return ri.getUnits(); |
| } |
| return ""; |
| } |
| |
| /** |
| * Get all resource types information from known resource types. |
| * @return List of ResourceTypeInfo |
| */ |
| public static List<ResourceTypeInfo> getResourcesTypeInfo() { |
| List<ResourceTypeInfo> array = new ArrayList<>(); |
| // Add all resource types |
| Collection<ResourceInformation> resourcesInfo = |
| ResourceUtils.getResourceTypes().values(); |
| for (ResourceInformation resourceInfo : resourcesInfo) { |
| array.add(ResourceTypeInfo |
| .newInstance(resourceInfo.getName(), resourceInfo.getUnits(), |
| resourceInfo.getResourceType())); |
| } |
| return array; |
| } |
| } |