blob: 1da5d6afb08b991ee2add19647a3f3c5b6eb1651 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.util.resource;
import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.protocolrecords.ResourceTypes;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceInformation;
import org.apache.hadoop.yarn.api.records.ResourceTypeInfo;
import org.apache.hadoop.yarn.conf.ConfigurationProvider;
import org.apache.hadoop.yarn.conf.ConfigurationProviderFactory;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
* Helper class to read the resource-types to be supported by the system.
*/
public class ResourceUtils {
public static final String UNITS = ".units";
public static final String TYPE = ".type";
public static final String MINIMUM_ALLOCATION = ".minimum-allocation";
public static final String MAXIMUM_ALLOCATION = ".maximum-allocation";
private static final String MEMORY = ResourceInformation.MEMORY_MB.getName();
private static final String VCORES = ResourceInformation.VCORES.getName();
private static volatile boolean initializedResources = false;
private static final Map<String, Integer> RESOURCE_NAME_TO_INDEX =
new ConcurrentHashMap<String, Integer>();
private static volatile Map<String, ResourceInformation> resourceTypes;
private static volatile ResourceInformation[] resourceTypesArray;
private static volatile boolean initializedNodeResources = false;
private static volatile Map<String, ResourceInformation> readOnlyNodeResources;
private static volatile int numKnownResourceTypes = -1;
static final Log LOG = LogFactory.getLog(ResourceUtils.class);
private ResourceUtils() {
}
private static void checkMandatoryResources(
Map<String, ResourceInformation> resourceInformationMap)
throws YarnRuntimeException {
/*
* Supporting 'memory' also as invalid resource name, in addition to
* 'MEMORY' for historical reasons
*/
String key = "memory";
if (resourceInformationMap.containsKey(key)) {
LOG.warn("Attempt to define resource '" + key +
"', but it is not allowed.");
throw new YarnRuntimeException("Attempt to re-define mandatory resource '"
+ key + "'.");
}
if (resourceInformationMap.containsKey(MEMORY)) {
ResourceInformation memInfo = resourceInformationMap.get(MEMORY);
String memUnits = ResourceInformation.MEMORY_MB.getUnits();
ResourceTypes memType = ResourceInformation.MEMORY_MB.getResourceType();
if (!memInfo.getUnits().equals(memUnits) || !memInfo.getResourceType()
.equals(memType)) {
throw new YarnRuntimeException(
"Attempt to re-define mandatory resource 'memory-mb'. It can only"
+ " be of type 'COUNTABLE' and have units 'Mi'.");
}
}
if (resourceInformationMap.containsKey(VCORES)) {
ResourceInformation vcoreInfo = resourceInformationMap.get(VCORES);
String vcoreUnits = ResourceInformation.VCORES.getUnits();
ResourceTypes vcoreType = ResourceInformation.VCORES.getResourceType();
if (!vcoreInfo.getUnits().equals(vcoreUnits) || !vcoreInfo
.getResourceType().equals(vcoreType)) {
throw new YarnRuntimeException(
"Attempt to re-define mandatory resource 'vcores'. It can only be"
+ " of type 'COUNTABLE' and have units ''(no units).");
}
}
}
private static void addMandatoryResources(
Map<String, ResourceInformation> res) {
ResourceInformation ri;
if (!res.containsKey(MEMORY)) {
LOG.info("Adding resource type - name = " + MEMORY + ", units = "
+ ResourceInformation.MEMORY_MB.getUnits() + ", type = "
+ ResourceTypes.COUNTABLE);
ri = ResourceInformation
.newInstance(MEMORY,
ResourceInformation.MEMORY_MB.getUnits());
res.put(MEMORY, ri);
}
if (!res.containsKey(VCORES)) {
LOG.info("Adding resource type - name = " + VCORES + ", units = , type = "
+ ResourceTypes.COUNTABLE);
ri =
ResourceInformation.newInstance(VCORES);
res.put(VCORES, ri);
}
}
private static void setMinimumAllocationForMandatoryResources(
Map<String, ResourceInformation> res, Configuration conf) {
String[][] resourceTypesKeys = {
{ResourceInformation.MEMORY_MB.getName(),
YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
String.valueOf(
YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB),
ResourceInformation.MEMORY_MB.getName()},
{ResourceInformation.VCORES.getName(),
YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES,
String.valueOf(
YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES),
ResourceInformation.VCORES.getName()}};
for (String[] arr : resourceTypesKeys) {
String resourceTypesKey =
YarnConfiguration.RESOURCE_TYPES + "." + arr[0] + MINIMUM_ALLOCATION;
long minimumResourceTypes = conf.getLong(resourceTypesKey, -1);
long minimumConf = conf.getLong(arr[1], -1);
long minimum;
if (minimumResourceTypes != -1) {
minimum = minimumResourceTypes;
if (minimumConf != -1) {
LOG.warn("Using minimum allocation for memory specified in "
+ "resource-types config file with key "
+ minimumResourceTypes + ", ignoring minimum specified using "
+ arr[1]);
}
} else {
minimum = conf.getLong(arr[1], Long.parseLong(arr[2]));
}
ResourceInformation ri = res.get(arr[3]);
ri.setMinimumAllocation(minimum);
}
}
private static void setMaximumAllocationForMandatoryResources(
Map<String, ResourceInformation> res, Configuration conf) {
String[][] resourceTypesKeys = {
{ResourceInformation.MEMORY_MB.getName(),
YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB,
String.valueOf(
YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB),
ResourceInformation.MEMORY_MB.getName()},
{ResourceInformation.VCORES.getName(),
YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES,
String.valueOf(
YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES),
ResourceInformation.VCORES.getName()}};
for (String[] arr : resourceTypesKeys) {
String resourceTypesKey =
YarnConfiguration.RESOURCE_TYPES + "." + arr[0] + MAXIMUM_ALLOCATION;
long maximumResourceTypes = conf.getLong(resourceTypesKey, -1);
long maximumConf = conf.getLong(arr[1], -1);
long maximum;
if (maximumResourceTypes != -1) {
maximum = maximumResourceTypes;
if (maximumConf != -1) {
LOG.warn("Using maximum allocation for memory specified in "
+ "resource-types config file with key "
+ maximumResourceTypes + ", ignoring maximum specified using "
+ arr[1]);
}
} else {
maximum = conf.getLong(arr[1], Long.parseLong(arr[2]));
}
ResourceInformation ri = res.get(arr[3]);
ri.setMaximumAllocation(maximum);
}
}
@VisibleForTesting
static void initializeResourcesMap(Configuration conf) {
Map<String, ResourceInformation> resourceInformationMap = new HashMap<>();
String[] resourceNames = conf.getStrings(YarnConfiguration.RESOURCE_TYPES);
if (resourceNames != null && resourceNames.length != 0) {
for (String resourceName : resourceNames) {
String resourceUnits = conf.get(
YarnConfiguration.RESOURCE_TYPES + "." + resourceName + UNITS, "");
String resourceTypeName = conf.get(
YarnConfiguration.RESOURCE_TYPES + "." + resourceName + TYPE,
ResourceTypes.COUNTABLE.toString());
Long minimumAllocation = conf.getLong(
YarnConfiguration.RESOURCE_TYPES + "." + resourceName
+ MINIMUM_ALLOCATION, 0L);
Long maximumAllocation = conf.getLong(
YarnConfiguration.RESOURCE_TYPES + "." + resourceName
+ MAXIMUM_ALLOCATION, Long.MAX_VALUE);
if (resourceName == null || resourceName.isEmpty()
|| resourceUnits == null || resourceTypeName == null) {
throw new YarnRuntimeException(
"Incomplete configuration for resource type '" + resourceName
+ "'. One of name, units or type is configured incorrectly.");
}
ResourceTypes resourceType = ResourceTypes.valueOf(resourceTypeName);
LOG.info("Adding resource type - name = " + resourceName + ", units = "
+ resourceUnits + ", type = " + resourceTypeName);
if (resourceInformationMap.containsKey(resourceName)) {
throw new YarnRuntimeException(
"Error in config, key '" + resourceName + "' specified twice");
}
resourceInformationMap.put(resourceName, ResourceInformation
.newInstance(resourceName, resourceUnits, 0L, resourceType,
minimumAllocation, maximumAllocation));
}
}
checkMandatoryResources(resourceInformationMap);
addMandatoryResources(resourceInformationMap);
setMinimumAllocationForMandatoryResources(resourceInformationMap, conf);
setMaximumAllocationForMandatoryResources(resourceInformationMap, conf);
initializeResourcesFromResourceInformationMap(resourceInformationMap);
}
/**
* This method is visible for testing, unit test can construct a
* resourceInformationMap and pass it to this method to initialize multiple resources.
* @param resourceInformationMap constructed resource information map.
*/
@VisibleForTesting
public static void initializeResourcesFromResourceInformationMap(
Map<String, ResourceInformation> resourceInformationMap) {
resourceTypes = Collections.unmodifiableMap(resourceInformationMap);
updateKnownResources();
updateResourceTypeIndex();
initializedResources = true;
}
private static void updateKnownResources() {
// Update resource names.
resourceTypesArray = new ResourceInformation[resourceTypes.size()];
int index = 2;
for (ResourceInformation resInfo : resourceTypes.values()) {
if (resInfo.getName().equals(MEMORY)) {
resourceTypesArray[0] = ResourceInformation
.newInstance(resourceTypes.get(MEMORY));
} else if (resInfo.getName().equals(VCORES)) {
resourceTypesArray[1] = ResourceInformation
.newInstance(resourceTypes.get(VCORES));
} else {
resourceTypesArray[index] = ResourceInformation.newInstance(resInfo);
index++;
}
}
}
private static void updateResourceTypeIndex() {
RESOURCE_NAME_TO_INDEX.clear();
for (int index = 0; index < resourceTypesArray.length; index++) {
ResourceInformation resInfo = resourceTypesArray[index];
RESOURCE_NAME_TO_INDEX.put(resInfo.getName(), index);
}
}
/**
* Get associate index of resource types such memory, cpu etc.
* This could help to access each resource types in a resource faster.
* @return Index map for all Resource Types.
*/
public static Map<String, Integer> getResourceTypeIndex() {
return RESOURCE_NAME_TO_INDEX;
}
/**
* Get the resource types to be supported by the system.
* @return A map of the resource name to a ResouceInformation object
* which contains details such as the unit.
*/
public static Map<String, ResourceInformation> getResourceTypes() {
return getResourceTypes(null,
YarnConfiguration.RESOURCE_TYPES_CONFIGURATION_FILE);
}
public static ResourceInformation[] getResourceTypesArray() {
initializeResourceTypesIfNeeded(null,
YarnConfiguration.RESOURCE_TYPES_CONFIGURATION_FILE);
return resourceTypesArray;
}
public static int getNumberOfKnownResourceTypes() {
if (numKnownResourceTypes < 0) {
initializeResourceTypesIfNeeded(null,
YarnConfiguration.RESOURCE_TYPES_CONFIGURATION_FILE);
}
return numKnownResourceTypes;
}
private static Map<String, ResourceInformation> getResourceTypes(
Configuration conf) {
return getResourceTypes(conf,
YarnConfiguration.RESOURCE_TYPES_CONFIGURATION_FILE);
}
private static void initializeResourceTypesIfNeeded(Configuration conf,
String resourceFile) {
if (!initializedResources) {
synchronized (ResourceUtils.class) {
if (!initializedResources) {
if (conf == null) {
conf = new YarnConfiguration();
}
try {
addResourcesFileToConf(resourceFile, conf);
LOG.debug("Found " + resourceFile + ", adding to configuration");
} catch (FileNotFoundException fe) {
LOG.info("Unable to find '" + resourceFile
+ "'. Falling back to memory and vcores as resources.");
}
initializeResourcesMap(conf);
}
}
}
numKnownResourceTypes = resourceTypes.size();
}
private static Map<String, ResourceInformation> getResourceTypes(
Configuration conf, String resourceFile) {
initializeResourceTypesIfNeeded(conf, resourceFile);
return resourceTypes;
}
private static InputStream getConfInputStream(String resourceFile,
Configuration conf) throws IOException, YarnException {
ConfigurationProvider provider =
ConfigurationProviderFactory.getConfigurationProvider(conf);
try {
provider.init(conf);
} catch (Exception e) {
throw new IOException(e);
}
InputStream ris = provider.getConfigurationInputStream(conf, resourceFile);
if (ris == null) {
if (conf.getResource(resourceFile) == null) {
throw new FileNotFoundException("Unable to find " + resourceFile);
}
throw new IOException(
"Unable to open resource types file '" + resourceFile
+ "'. Using provider " + provider);
}
return ris;
}
private static void addResourcesFileToConf(String resourceFile,
Configuration conf) throws FileNotFoundException {
try {
InputStream ris = getConfInputStream(resourceFile, conf);
LOG.debug("Found " + resourceFile + ", adding to configuration");
conf.addResource(ris);
} catch (FileNotFoundException fe) {
throw fe;
} catch (IOException ie) {
LOG.fatal("Exception trying to read resource types configuration '"
+ resourceFile + "'.", ie);
throw new YarnRuntimeException(ie);
} catch (YarnException ye) {
LOG.fatal("YARN Exception trying to read resource types configuration '"
+ resourceFile + "'.", ye);
throw new YarnRuntimeException(ye);
}
}
@VisibleForTesting
synchronized static void resetResourceTypes() {
initializedResources = false;
}
@VisibleForTesting
public static Map<String, ResourceInformation>
resetResourceTypes(Configuration conf) {
synchronized (ResourceUtils.class) {
initializedResources = false;
}
return getResourceTypes(conf);
}
public static String getUnits(String resourceValue) {
String units;
for (int i = 0; i < resourceValue.length(); i++) {
if (Character.isAlphabetic(resourceValue.charAt(i))) {
units = resourceValue.substring(i);
if (StringUtils.isAlpha(units)) {
return units;
}
}
}
return "";
}
/**
* Function to get the resources for a node. This function will look at the
* file {@link YarnConfiguration#NODE_RESOURCES_CONFIGURATION_FILE} to
* determine the node resources.
*
* @param conf configuration file
* @return a map to resource name to the ResourceInformation object. The map
* is guaranteed to have entries for memory and vcores
*/
public static Map<String, ResourceInformation> getNodeResourceInformation(
Configuration conf) {
if (!initializedNodeResources) {
synchronized (ResourceUtils.class) {
if (!initializedNodeResources) {
Map<String, ResourceInformation> nodeResources = initializeNodeResourceInformation(
conf);
addMandatoryResources(nodeResources);
checkMandatoryResources(nodeResources);
setMinimumAllocationForMandatoryResources(nodeResources, conf);
setMaximumAllocationForMandatoryResources(nodeResources, conf);
readOnlyNodeResources = Collections.unmodifiableMap(nodeResources);
initializedNodeResources = true;
}
}
}
return readOnlyNodeResources;
}
private static Map<String, ResourceInformation> initializeNodeResourceInformation(
Configuration conf) {
Map<String, ResourceInformation> nodeResources = new HashMap<>();
try {
addResourcesFileToConf(
YarnConfiguration.NODE_RESOURCES_CONFIGURATION_FILE, conf);
for (Map.Entry<String, String> entry : conf) {
String key = entry.getKey();
String value = entry.getValue();
if (key.startsWith(YarnConfiguration.NM_RESOURCES_PREFIX)) {
addResourceInformation(key, value, nodeResources);
}
}
} catch (FileNotFoundException fe) {
LOG.info("Couldn't find node resources file");
}
return nodeResources;
}
private static void addResourceInformation(String prop, String value,
Map<String, ResourceInformation> nodeResources) {
String[] parts = prop.split("\\.");
LOG.info("Found resource entry " + prop);
if (parts.length == 4) {
String resourceType = parts[3];
if (!nodeResources.containsKey(resourceType)) {
nodeResources
.put(resourceType, ResourceInformation.newInstance(resourceType));
}
String units = getUnits(value);
Long resourceValue =
Long.valueOf(value.substring(0, value.length() - units.length()));
nodeResources.get(resourceType).setValue(resourceValue);
nodeResources.get(resourceType).setUnits(units);
LOG.debug("Setting value for resource type " + resourceType + " to "
+ resourceValue + " with units " + units);
}
}
@VisibleForTesting
synchronized public static void resetNodeResources() {
initializedNodeResources = false;
}
public static Resource getResourceTypesMinimumAllocation() {
Resource ret = Resource.newInstance(0, 0);
for (ResourceInformation entry : resourceTypesArray) {
String name = entry.getName();
if (name.equals(ResourceInformation.MEMORY_MB.getName())) {
ret.setMemorySize(entry.getMinimumAllocation());
} else if (name.equals(ResourceInformation.VCORES.getName())) {
Long tmp = entry.getMinimumAllocation();
if (tmp > Integer.MAX_VALUE) {
tmp = (long) Integer.MAX_VALUE;
}
ret.setVirtualCores(tmp.intValue());
} else {
ret.setResourceValue(name, entry.getMinimumAllocation());
}
}
return ret;
}
/**
* Get a Resource object with for the maximum allocation possible.
* @return a Resource object with the maximum allocation for the scheduler
*/
public static Resource getResourceTypesMaximumAllocation() {
Resource ret = Resource.newInstance(0, 0);
for (ResourceInformation entry : resourceTypesArray) {
String name = entry.getName();
if (name.equals(ResourceInformation.MEMORY_MB.getName())) {
ret.setMemorySize(entry.getMaximumAllocation());
} else if (name.equals(ResourceInformation.VCORES.getName())) {
Long tmp = entry.getMaximumAllocation();
if (tmp > Integer.MAX_VALUE) {
tmp = (long) Integer.MAX_VALUE;
}
ret.setVirtualCores(tmp.intValue());
continue;
} else {
ret.setResourceValue(name, entry.getMaximumAllocation());
}
}
return ret;
}
/**
* Get default unit by given resource type.
* @param resourceType resourceType
* @return default unit
*/
public static String getDefaultUnit(String resourceType) {
ResourceInformation ri = getResourceTypes().get(resourceType);
if (ri != null) {
return ri.getUnits();
}
return "";
}
/**
* Get all resource types information from known resource types.
* @return List of ResourceTypeInfo
*/
public static List<ResourceTypeInfo> getResourcesTypeInfo() {
List<ResourceTypeInfo> array = new ArrayList<>();
// Add all resource types
Collection<ResourceInformation> resourcesInfo =
ResourceUtils.getResourceTypes().values();
for (ResourceInformation resourceInfo : resourcesInfo) {
array.add(ResourceTypeInfo
.newInstance(resourceInfo.getName(), resourceInfo.getUnits(),
resourceInfo.getResourceType()));
}
return array;
}
}