blob: 375ae354ad9cef6990df58d4b783faab149b7777 [file] [log] [blame]
/**
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License. See accompanying LICENSE file.
*/
package org.apache.hadoop.yarn.submarine.common.resource;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.submarine.common.exception.SubmarineRuntimeException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;
/**
* This class implements some methods with almost the same logic as
* org.apache.hadoop.yarn.util.resource.ResourceUtils of hadoop 3.3.
* If the hadoop dependencies are upgraded to 3.3, this class can be refactored
* with org.apache.hadoop.yarn.util.resource.ResourceUtils.
*/
public final class ResourceUtils {

  /** Shape of a single {@code key=amount[unit]} pair; compiled once. */
  private static final Pattern RES_PATTERN =
      Pattern.compile("^[^=]+=\\d+\\s?\\w*$");

  private static final String SET_RESOURCE_VALUE_METHOD = "setResourceValue";
  private static final String SET_MEMORY_SIZE_METHOD = "setMemorySize";
  private static final String DEPRECATED_SET_MEMORY_SIZE_METHOD =
      "setMemory";
  private static final String GET_MEMORY_SIZE_METHOD = "getMemorySize";
  private static final String DEPRECATED_GET_MEMORY_SIZE_METHOD =
      "getMemory";
  private static final String GET_RESOURCE_VALUE_METHOD = "getResourceValue";
  private static final String GET_RESOURCE_TYPE_METHOD =
      "getResourcesTypeInfo";
  private static final String REINITIALIZE_RESOURCES_METHOD =
      "reinitializeResources";

  private static final String RESOURCE_NOT_FOUND_EXCEPTION =
      "org.apache.hadoop.yarn.exceptions.ResourceNotFoundException";
  private static final String RESOURCE_TYPE_INFO_CLASS =
      "org.apache.hadoop.yarn.api.records.ResourceTypeInfo";
  private static final String YARN_RESOURCE_UTILS_CLASS =
      "org.apache.hadoop.yarn.util.resource.ResourceUtils";

  public static final String MEMORY_URI = "memory-mb";
  public static final String VCORES_URI = "vcores";
  public static final String GPU_URI = "yarn.io/gpu";
  public static final String FPGA_URI = "yarn.io/fpga";

  private static final Logger LOG =
      LoggerFactory.getLogger(ResourceUtils.class);

  private ResourceUtils() {}

  /**
   * Build a {@link Resource} from a comma separated list of
   * {@code key=amount[unit]} pairs, e.g. {@code memory=4G,vcores=2,gpu=1}.
   *
   * @param resourceStr the resource specification to parse
   * @return a resource carrying every parsed amount
   * @throws IllegalArgumentException if a pair is malformed or uses a unit
   *         other than M/G
   * @throws SubmarineRuntimeException if a reflective YARN setter is missing
   *         or fails
   */
  public static Resource createResourceFromString(String resourceStr) {
    Map<String, Long> typeToValue = parseResourcesString(resourceStr);
    Resource resource = Resource.newInstance(0, 0);
    for (Map.Entry<String, Long> entry : typeToValue.entrySet()) {
      String type = entry.getKey();
      if (VCORES_URI.equals(type)) {
        resource.setVirtualCores(entry.getValue().intValue());
      } else if (MEMORY_URI.equals(type)) {
        setMemorySize(resource, entry.getValue());
      } else {
        // Pass the full long through; narrowing to int here would silently
        // truncate large amounts although the YARN setter accepts a long.
        setResourceValue(resource, type, entry.getValue());
      }
    }
    return resource;
  }

  /**
   * Parse a comma separated list of {@code key=amount[unit]} pairs into a map
   * from resource name to amount. The short names {@code memory}, {@code gpu}
   * and {@code fpga} are mapped to {@link #MEMORY_URI}, {@link #GPU_URI} and
   * {@link #FPGA_URI}.
   *
   * @param resourcesStr the resource specification to parse
   * @return resource name to amount
   * @throws IllegalArgumentException if a pair does not match the expected
   *         shape or uses a unit other than M/G
   */
  private static Map<String, Long> parseResourcesString(String resourcesStr) {
    Map<String, Long> resources = new HashMap<>();
    for (String pair : resourcesStr.trim().split(",")) {
      String resource = pair.trim();
      if (!RES_PATTERN.matcher(resource).matches()) {
        throw new IllegalArgumentException("\"" + resource + "\" is not a "
            + "valid resource type/amount pair. "
            + "Please provide key=amount pairs separated by commas.");
      }
      String[] splits = resource.split("=");
      String key = splits[0];
      String value = splits[1];
      String units = getUnits(value);
      String valueWithoutUnit = value.substring(0,
          value.length() - units.length()).trim();
      long resourceValue = Long.parseLong(valueWithoutUnit);

      // Convert commandline unit to standard YARN unit.
      if (units.equals("M") || units.equals("m")) {
        units = "Mi";
      } else if (units.equals("G") || units.equals("g")) {
        units = "Gi";
      } else if (!units.isEmpty()) {
        throw new IllegalArgumentException("Acceptable units are M/G or empty");
      }

      // "memory-mb" is only converted when a unit was given explicitly.
      if (key.equals(MEMORY_URI)) {
        if (!units.isEmpty()) {
          resourceValue = UnitsConversionUtil.convert(units, "Mi",
              resourceValue);
        }
      }
      // "memory" is always converted to Mi, even when the unit is empty.
      if (key.equals("memory")) {
        key = MEMORY_URI;
        resourceValue = UnitsConversionUtil.convert(units, "Mi",
            resourceValue);
      }
      // special handle gpu
      if (key.equals("gpu")) {
        key = GPU_URI;
      }
      // special handle fpga
      if (key.equals("fpga")) {
        key = FPGA_URI;
      }
      resources.put(key, resourceValue);
    }
    return resources;
  }

  /**
   * As hadoop 2.9.2 and lower don't support resources except cpu and memory.
   * Use reflection to set GPU or other resources for compatibility with
   * hadoop 2.9.2
   *
   * @param resource the resource to update
   * @param resourceName name of the resource type to set
   * @param resourceValue amount to set
   * @throws SubmarineRuntimeException if the setter is missing or fails
   */
  public static void setResource(Resource resource, String resourceName,
      int resourceValue) {
    setResourceValue(resource, resourceName, resourceValue);
  }

  /**
   * Reflectively call {@code setResourceValue(String, long)} on the given
   * resource.
   */
  private static void setResourceValue(Resource resource, String resourceName,
      long resourceValue) {
    try {
      Method method = resource.getClass().getMethod(SET_RESOURCE_VALUE_METHOD,
          String.class, long.class);
      method.invoke(resource, resourceName, resourceValue);
    } catch (NoSuchMethodException e) {
      LOG.error("There is no '{}' API in this version of YARN",
          SET_RESOURCE_VALUE_METHOD, e);
      throw reflectionFailure("There is no '" + SET_RESOURCE_VALUE_METHOD
          + "' API in this version of YARN", e);
    } catch (IllegalAccessException | InvocationTargetException e) {
      LOG.error("Failed to invoke '{}' method to set resource {}",
          SET_RESOURCE_VALUE_METHOD, resourceName, e);
      throw reflectionFailure("Failed to invoke '" + SET_RESOURCE_VALUE_METHOD
          + "' method to set resource " + resourceName, e);
    }
  }

  /**
   * Set the memory of a resource, using {@code setMemorySize(long)} when it
   * exists and falling back to {@code setMemory(int)} otherwise.
   *
   * @param resource the resource to update
   * @param memorySize the memory amount to set
   * @throws SubmarineRuntimeException if neither setter exists or the call
   *         fails
   */
  public static void setMemorySize(Resource resource, Long memorySize) {
    // For hadoop 2.9.2 and above
    try {
      Method method = resource.getClass().getMethod(SET_MEMORY_SIZE_METHOD,
          long.class);
      method.setAccessible(true);
      method.invoke(resource, memorySize);
      return;
    } catch (NoSuchMethodException e) {
      LOG.info("There is no '{}(long)' API in this version of YARN",
          SET_MEMORY_SIZE_METHOD);
    } catch (IllegalAccessException | InvocationTargetException e) {
      LOG.error("Failed to invoke '{}' method", SET_MEMORY_SIZE_METHOD, e);
      throw reflectionFailure("Failed to invoke '" + SET_MEMORY_SIZE_METHOD
          + "' method", e);
    }

    // For hadoop 2.7.3
    try {
      LOG.info("Trying to use '{}(int)' API for this version of YARN",
          DEPRECATED_SET_MEMORY_SIZE_METHOD);
      Method method = resource.getClass().getMethod(
          DEPRECATED_SET_MEMORY_SIZE_METHOD, int.class);
      method.invoke(resource, memorySize.intValue());
    } catch (NoSuchMethodException e) {
      LOG.error("There is no '{}(int)' API in this version of YARN",
          DEPRECATED_SET_MEMORY_SIZE_METHOD, e);
      throw reflectionFailure("There is no '"
          + DEPRECATED_SET_MEMORY_SIZE_METHOD
          + "(int)' API in this version of YARN", e);
    } catch (IllegalAccessException | InvocationTargetException e) {
      LOG.error("Failed to invoke '{}' method",
          DEPRECATED_SET_MEMORY_SIZE_METHOD, e);
      throw reflectionFailure("Failed to invoke '"
          + DEPRECATED_SET_MEMORY_SIZE_METHOD + "' method", e);
    }
  }

  /**
   * Read the memory of a resource, using {@code getMemorySize()} when it
   * exists and falling back to {@code getMemory()} otherwise.
   *
   * @param resource the resource to read
   * @return the memory amount reported by the resource
   * @throws SubmarineRuntimeException if neither getter exists or the call
   *         fails
   */
  public static long getMemorySize(Resource resource) {
    // For hadoop 2.9.2 and above
    try {
      Method method = resource.getClass().getMethod(GET_MEMORY_SIZE_METHOD);
      method.setAccessible(true);
      return (long) method.invoke(resource);
    } catch (NoSuchMethodException e) {
      LOG.info("There is no '{}' API in this version of YARN",
          GET_MEMORY_SIZE_METHOD);
    } catch (IllegalAccessException | InvocationTargetException e) {
      LOG.error("Failed to invoke '{}' method", GET_MEMORY_SIZE_METHOD, e);
      throw reflectionFailure("Failed to invoke '" + GET_MEMORY_SIZE_METHOD
          + "' method", e);
    }

    // For hadoop 2.7.3
    try {
      LOG.info("Trying to use '{}' API for this version of YARN",
          DEPRECATED_GET_MEMORY_SIZE_METHOD);
      Method method = resource.getClass().getMethod(
          DEPRECATED_GET_MEMORY_SIZE_METHOD);
      method.setAccessible(true);
      return ((Integer) method.invoke(resource)).longValue();
    } catch (NoSuchMethodException e) {
      LOG.error("There is no '{}' API in this version of YARN",
          DEPRECATED_GET_MEMORY_SIZE_METHOD, e);
      throw reflectionFailure("There is no '"
          + DEPRECATED_GET_MEMORY_SIZE_METHOD
          + "' API in this version of YARN", e);
    } catch (IllegalAccessException | InvocationTargetException e) {
      LOG.error("Failed to invoke '{}' method",
          DEPRECATED_GET_MEMORY_SIZE_METHOD, e);
      throw reflectionFailure("Failed to invoke '"
          + DEPRECATED_GET_MEMORY_SIZE_METHOD + "' method", e);
    }
  }

  /**
   * As hadoop 2.9.2 and lower don't support resources except cpu and memory.
   * Use reflection to get GPU or other resources for compatibility with
   * hadoop 2.9.2
   *
   * @param resource the resource to read
   * @param resourceName name of the resource type to look up
   * @return the amount of the named resource, or 0 when the getter does not
   *         exist or reports the resource as not found
   * @throws SubmarineRuntimeException if the getter fails for any other
   *         reason
   */
  public static long getResourceValue(Resource resource, String resourceName) {
    try {
      Method method = resource.getClass().getMethod(GET_RESOURCE_VALUE_METHOD,
          String.class);
      return (long) method.invoke(resource, resourceName);
    } catch (NoSuchMethodException e) {
      LOG.info("There is no '{}' API in this version of YARN",
          GET_RESOURCE_VALUE_METHOD);
    } catch (InvocationTargetException e) {
      Throwable target = e.getTargetException();
      // The exception class is compared by name because it may not exist on
      // the classpath of older YARN versions.
      if (target != null && RESOURCE_NOT_FOUND_EXCEPTION.equals(
          target.getClass().getName())) {
        LOG.info("Not found resource " + resourceName);
      } else {
        LOG.error("Failed to invoke '{}' method to get resource {}",
            GET_RESOURCE_VALUE_METHOD, resourceName, e);
        throw reflectionFailure("Failed to invoke '"
            + GET_RESOURCE_VALUE_METHOD + "' method to get resource "
            + resourceName, e);
      }
    } catch (IllegalAccessException | ClassCastException e) {
      LOG.error("Failed to invoke '{}' method to get resource {}",
          GET_RESOURCE_VALUE_METHOD, resourceName, e);
      throw reflectionFailure("Failed to invoke '"
          + GET_RESOURCE_VALUE_METHOD + "' method to get resource "
          + resourceName, e);
    }
    return 0;
  }

  /**
   * As hadoop 2.9.2 and lower don't support resources except cpu and memory.
   * Use reflection to add GPU or other resources for compatibility with
   * hadoop 2.9.2
   *
   * @param resrouceName name of the resource type to register
   * @throws SubmarineRuntimeException if the YARN classes are missing or a
   *         reflective call fails; a missing method is only logged
   */
  public static void configureResourceType(String resrouceName) {
    try {
      Class<?> resourceTypeInfo = Class.forName(RESOURCE_TYPE_INFO_CLASS);
      Class<?> resourceUtils = Class.forName(YARN_RESOURCE_UTILS_CLASS);

      Method getTypes = resourceUtils.getMethod(GET_RESOURCE_TYPE_METHOD);
      // Copy instead of casting to ArrayList so the concrete list type
      // returned by YARN does not matter.
      List<Object> resTypes = new ArrayList<>((List<?>) getTypes.invoke(null));

      Method newInstance = resourceTypeInfo.getMethod("newInstance",
          String.class, String.class);
      resTypes.add(newInstance.invoke(null, resrouceName, ""));

      Method reinitialize = resourceUtils.getMethod(
          REINITIALIZE_RESOURCES_METHOD, List.class);
      reinitialize.invoke(null, resTypes);
    } catch (ClassNotFoundException e) {
      LOG.info("There is no specified class API in this version of YARN");
      LOG.info(e.getMessage());
      throw reflectionFailure(
          "There is no specified class API in this version of YARN", e);
    } catch (NoSuchMethodException e) {
      // Best effort: older YARN versions lack these APIs, so only log.
      LOG.info("There is no resource type configuration API in this version"
          + " of YARN: {}", e.getMessage());
    } catch (IllegalAccessException | InvocationTargetException e) {
      LOG.error("Failed to invoke 'configureResourceType' method ", e);
      throw reflectionFailure(
          "Failed to invoke 'configureResourceType' method", e);
    }
  }

  /**
   * Wrap a reflection failure without losing the original exception. For an
   * {@link InvocationTargetException} the exception thrown by the invoked
   * method becomes the cause; otherwise the given exception itself does.
   *
   * @param context description of the operation that failed
   * @param e the exception raised by the reflective call
   * @return the exception to throw
   */
  private static SubmarineRuntimeException reflectionFailure(String context,
      Exception e) {
    Throwable cause = e;
    if (e instanceof InvocationTargetException && e.getCause() != null) {
      cause = e.getCause();
    }
    String detail = cause.getMessage();
    String message = detail == null ? context : context + ": " + detail;
    return new SubmarineRuntimeException(message, cause);
  }

  private static String getUnits(String resourceValue) {
    return parseResourceValue(resourceValue)[0];
  }

  /**
   * Extract unit and actual value from resource value.
   * @param resourceValue Value of the resource
   * @return Array containing unit and value. [0]=unit, [1]=value
   * @throws IllegalArgumentException if units contain non alpha characters
   */
  private static String[] parseResourceValue(String resourceValue) {
    // The unit is everything from the first alphabetic character onwards.
    int i = 0;
    while (i < resourceValue.length()
        && !Character.isAlphabetic(resourceValue.charAt(i))) {
      i++;
    }
    String units = resourceValue.substring(i);
    if (StringUtils.isAlpha(units) || units.equals("")) {
      return new String[] {units, resourceValue.substring(0, i)};
    }
    throw new IllegalArgumentException("Units '" + units + "'"
        + " contains non alphabet characters, which is not allowed.");
  }
}