blob: 4bf3e131638c0587d26bbad9d5be7e76bbc4be84 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
import java.io.IOException;
import java.util.Set;
import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.QueueACL;
import org.apache.hadoop.yarn.api.records.QueueInfo;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceInformation;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.InvalidLabelResourceRequestException;
import org.apache.hadoop.yarn.exceptions.InvalidResourceRequestException;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.security.AccessType;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode;
import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
import org.apache.hadoop.yarn.util.UnitsConversionUtil;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.apache.hadoop.yarn.util.resource.ResourceUtils;
import org.apache.hadoop.yarn.util.resource.Resources;
/**
* Utilities shared by schedulers.
*/
@Private
@Unstable
public class SchedulerUtils {
private static final Log LOG = LogFactory.getLog(SchedulerUtils.class);
private static final RecordFactory recordFactory =
RecordFactoryProvider.getRecordFactory(null);
public static final String RELEASED_CONTAINER =
"Container released by application";
public static final String UPDATED_CONTAINER =
"Temporary container killed by application for ExeutionType update";
public static final String LOST_CONTAINER =
"Container released on a *lost* node";
public static final String PREEMPTED_CONTAINER =
"Container preempted by scheduler";
public static final String COMPLETED_APPLICATION =
"Container of a completed application";
public static final String EXPIRED_CONTAINER =
"Container expired since it was unused";
public static final String UNRESERVED_CONTAINER =
"Container reservation no longer required.";
/**
* Utility to create a {@link ContainerStatus} during exceptional
* circumstances.
*
* @param containerId {@link ContainerId} of returned/released/lost container.
* @param diagnostics diagnostic message
* @return <code>ContainerStatus</code> for an returned/released/lost
* container
*/
public static ContainerStatus createAbnormalContainerStatus(
ContainerId containerId, String diagnostics) {
return createAbnormalContainerStatus(containerId,
ContainerExitStatus.ABORTED, diagnostics);
}
/**
* Utility to create a {@link ContainerStatus} for killed containers.
* @param containerId {@link ContainerId} of the killed container.
* @param diagnostics diagnostic message
* @return <code>ContainerStatus</code> for a killed container
*/
public static ContainerStatus createKilledContainerStatus(
ContainerId containerId, String diagnostics) {
return createAbnormalContainerStatus(containerId,
ContainerExitStatus.KILLED_BY_RESOURCEMANAGER, diagnostics);
}
/**
* Utility to create a {@link ContainerStatus} during exceptional
* circumstances.
*
* @param containerId {@link ContainerId} of returned/released/lost container.
* @param diagnostics diagnostic message
* @return <code>ContainerStatus</code> for an returned/released/lost
* container
*/
public static ContainerStatus createPreemptedContainerStatus(
ContainerId containerId, String diagnostics) {
return createAbnormalContainerStatus(containerId,
ContainerExitStatus.PREEMPTED, diagnostics);
}
/**
* Utility to create a {@link ContainerStatus} during exceptional
* circumstances.
*
* @param containerId {@link ContainerId} of returned/released/lost container.
* @param diagnostics diagnostic message
* @return <code>ContainerStatus</code> for an returned/released/lost
* container
*/
private static ContainerStatus createAbnormalContainerStatus(
ContainerId containerId, int exitStatus, String diagnostics) {
ContainerStatus containerStatus =
recordFactory.newRecordInstance(ContainerStatus.class);
containerStatus.setContainerId(containerId);
containerStatus.setDiagnostics(diagnostics);
containerStatus.setExitStatus(exitStatus);
containerStatus.setState(ContainerState.COMPLETE);
return containerStatus;
}
/**
* Utility method to normalize a resource request, by insuring that the
* requested memory is a multiple of minMemory and is not zero.
*/
@VisibleForTesting
public static void normalizeRequest(
ResourceRequest ask,
ResourceCalculator resourceCalculator,
Resource minimumResource,
Resource maximumResource) {
ask.setCapability(
getNormalizedResource(ask.getCapability(), resourceCalculator,
minimumResource, maximumResource, minimumResource));
}
/**
* Utility method to normalize a resource request, by insuring that the
* requested memory is a multiple of increment resource and is not zero.
*
* @return normalized resource
*/
public static Resource getNormalizedResource(
Resource ask,
ResourceCalculator resourceCalculator,
Resource minimumResource,
Resource maximumResource,
Resource incrementResource) {
Resource normalized = Resources.normalize(
resourceCalculator, ask, minimumResource,
maximumResource, incrementResource);
return normalized;
}
private static void normalizeNodeLabelExpressionInRequest(
ResourceRequest resReq, QueueInfo queueInfo) {
String labelExp = resReq.getNodeLabelExpression();
// if queue has default label expression, and RR doesn't have, use the
// default label expression of queue
if (labelExp == null && queueInfo != null && ResourceRequest.ANY
.equals(resReq.getResourceName())) {
labelExp = queueInfo.getDefaultNodeLabelExpression();
}
// If labelExp still equals to null, set it to be NO_LABEL
if (labelExp == null) {
labelExp = RMNodeLabelsManager.NO_LABEL;
}
resReq.setNodeLabelExpression(labelExp);
}
public static void normalizeAndValidateRequest(ResourceRequest resReq,
Resource maximumResource, String queueName, YarnScheduler scheduler,
boolean isRecovery, RMContext rmContext)
throws InvalidResourceRequestException {
normalizeAndValidateRequest(resReq, maximumResource, queueName, scheduler,
isRecovery, rmContext, null);
}
public static void normalizeAndValidateRequest(ResourceRequest resReq,
Resource maximumResource, String queueName, YarnScheduler scheduler,
boolean isRecovery, RMContext rmContext, QueueInfo queueInfo)
throws InvalidResourceRequestException {
Configuration conf = rmContext.getYarnConfiguration();
// If Node label is not enabled throw exception
if (null != conf && !YarnConfiguration.areNodeLabelsEnabled(conf)) {
String labelExp = resReq.getNodeLabelExpression();
if (!(RMNodeLabelsManager.NO_LABEL.equals(labelExp)
|| null == labelExp)) {
String message = "NodeLabel is not enabled in cluster, but resource"
+ " request contains a label expression.";
LOG.warn(message);
if (!isRecovery) {
throw new InvalidLabelResourceRequestException(
"Invalid resource request, node label not enabled "
+ "but request contains label expression");
}
}
}
if (null == queueInfo) {
try {
queueInfo = scheduler.getQueueInfo(queueName, false, false);
} catch (IOException e) {
// it is possible queue cannot get when queue mapping is set, just ignore
// the queueInfo here, and move forward
}
}
SchedulerUtils.normalizeNodeLabelExpressionInRequest(resReq, queueInfo);
if (!isRecovery) {
validateResourceRequest(resReq, maximumResource, queueInfo, rmContext);
}
}
public static void normalizeAndvalidateRequest(ResourceRequest resReq,
Resource maximumResource, String queueName, YarnScheduler scheduler,
RMContext rmContext)
throws InvalidResourceRequestException {
normalizeAndvalidateRequest(resReq, maximumResource, queueName, scheduler,
rmContext, null);
}
public static void normalizeAndvalidateRequest(ResourceRequest resReq,
Resource maximumResource, String queueName, YarnScheduler scheduler,
RMContext rmContext, QueueInfo queueInfo)
throws InvalidResourceRequestException {
normalizeAndValidateRequest(resReq, maximumResource, queueName, scheduler,
false, rmContext, queueInfo);
}
/**
* If RM should enforce partition exclusivity for enforced partition "x":
* 1) If request is "x" and app label is not "x",
* override request to app's label.
* 2) If app label is "x", ensure request is "x".
* @param resReq resource request
* @param enforcedPartitions list of exclusive enforced partitions
* @param appLabel app's node label expression
*/
public static void enforcePartitionExclusivity(ResourceRequest resReq,
Set<String> enforcedPartitions, String appLabel) {
if (enforcedPartitions == null || enforcedPartitions.isEmpty()) {
return;
}
if (!enforcedPartitions.contains(appLabel)
&& enforcedPartitions.contains(resReq.getNodeLabelExpression())) {
resReq.setNodeLabelExpression(appLabel);
}
if (enforcedPartitions.contains(appLabel)) {
resReq.setNodeLabelExpression(appLabel);
}
}
/**
* Utility method to validate a resource request, by insuring that the
* requested memory/vcore is non-negative and not greater than max
*
* @throws InvalidResourceRequestException when there is invalid request
*/
private static void validateResourceRequest(ResourceRequest resReq,
Resource maximumResource, QueueInfo queueInfo, RMContext rmContext)
throws InvalidResourceRequestException {
final Resource requestedResource = resReq.getCapability();
checkResourceRequestAgainstAvailableResource(requestedResource,
maximumResource);
String labelExp = resReq.getNodeLabelExpression();
// we don't allow specify label expression other than resourceName=ANY now
if (!ResourceRequest.ANY.equals(resReq.getResourceName())
&& labelExp != null && !labelExp.trim().isEmpty()) {
throw new InvalidLabelResourceRequestException(
"Invalid resource request, queue=" + queueInfo.getQueueName()
+ " specified node label expression in a "
+ "resource request has resource name = "
+ resReq.getResourceName());
}
// we don't allow specify label expression with more than one node labels now
if (labelExp != null && labelExp.contains("&&")) {
throw new InvalidLabelResourceRequestException(
"Invalid resource request, queue=" + queueInfo.getQueueName()
+ " specified more than one node label "
+ "in a node label expression, node label expression = "
+ labelExp);
}
if (labelExp != null && !labelExp.trim().isEmpty() && queueInfo != null) {
if (!checkQueueLabelExpression(queueInfo.getAccessibleNodeLabels(),
labelExp, rmContext)) {
throw new InvalidLabelResourceRequestException(
"Invalid resource request" + ", queue=" + queueInfo.getQueueName()
+ " doesn't have permission to access all labels "
+ "in resource request. labelExpression of resource request="
+ labelExp + ". Queue labels="
+ (queueInfo.getAccessibleNodeLabels() == null ? ""
: StringUtils.join(
queueInfo.getAccessibleNodeLabels().iterator(), ',')));
} else {
checkQueueLabelInLabelManager(labelExp, rmContext);
}
}
}
@Private
@VisibleForTesting
static void checkResourceRequestAgainstAvailableResource(Resource reqResource,
Resource availableResource) throws InvalidResourceRequestException {
for (int i = 0; i < ResourceUtils.getNumberOfKnownResourceTypes(); i++) {
final ResourceInformation requestedRI =
reqResource.getResourceInformation(i);
final String reqResourceName = requestedRI.getName();
if (requestedRI.getValue() < 0) {
throwInvalidResourceException(reqResource, availableResource,
reqResourceName);
}
final ResourceInformation availableRI =
availableResource.getResourceInformation(reqResourceName);
long requestedResourceValue = requestedRI.getValue();
long availableResourceValue = availableRI.getValue();
int unitsRelation = UnitsConversionUtil
.compareUnits(requestedRI.getUnits(), availableRI.getUnits());
if (LOG.isDebugEnabled()) {
LOG.debug("Requested resource information: " + requestedRI);
LOG.debug("Available resource information: " + availableRI);
LOG.debug("Relation of units: " + unitsRelation);
}
// requested resource unit is less than available resource unit
// e.g. requestedUnit: "m", availableUnit: "K")
if (unitsRelation < 0) {
availableResourceValue =
UnitsConversionUtil.convert(availableRI.getUnits(),
requestedRI.getUnits(), availableRI.getValue());
// requested resource unit is greater than available resource unit
// e.g. requestedUnit: "G", availableUnit: "M")
} else if (unitsRelation > 0) {
requestedResourceValue =
UnitsConversionUtil.convert(requestedRI.getUnits(),
availableRI.getUnits(), requestedRI.getValue());
}
if (LOG.isDebugEnabled()) {
LOG.debug("Requested resource value after conversion: " +
requestedResourceValue);
LOG.info("Available resource value after conversion: " +
availableResourceValue);
}
if (requestedResourceValue > availableResourceValue) {
throwInvalidResourceException(reqResource, availableResource,
reqResourceName);
}
}
}
private static void throwInvalidResourceException(Resource reqResource,
Resource availableResource, String reqResourceName)
throws InvalidResourceRequestException {
throw new InvalidResourceRequestException(
"Invalid resource request, requested resource type=[" + reqResourceName
+ "] < 0 or greater than maximum allowed allocation. Requested "
+ "resource=" + reqResource + ", maximum allowed allocation="
+ availableResource
+ ", please note that maximum allowed allocation is calculated "
+ "by scheduler based on maximum resource of registered "
+ "NodeManagers, which might be less than configured "
+ "maximum allocation="
+ ResourceUtils.getResourceTypesMaximumAllocation());
}
private static void checkQueueLabelInLabelManager(String labelExpression,
RMContext rmContext) throws InvalidLabelResourceRequestException {
// check node label manager contains this label
if (null != rmContext) {
RMNodeLabelsManager nlm = rmContext.getNodeLabelManager();
if (nlm != null && !nlm.containsNodeLabel(labelExpression)) {
throw new InvalidLabelResourceRequestException(
"Invalid label resource request, cluster do not contain "
+ ", label= " + labelExpression);
}
}
}
/**
* Check queue label expression, check if node label in queue's
* node-label-expression existed in clusterNodeLabels if rmContext != null
*/
public static boolean checkQueueLabelExpression(Set<String> queueLabels,
String labelExpression, RMContext rmContext) {
// if label expression is empty, we can allocate container on any node
if (labelExpression == null) {
return true;
}
for (String str : labelExpression.split("&&")) {
str = str.trim();
if (!str.trim().isEmpty()) {
// check queue label
if (queueLabels == null) {
return false;
} else {
if (!queueLabels.contains(str)
&& !queueLabels.contains(RMNodeLabelsManager.ANY)) {
return false;
}
}
}
}
return true;
}
public static AccessType toAccessType(QueueACL acl) {
switch (acl) {
case ADMINISTER_QUEUE:
return AccessType.ADMINISTER_QUEUE;
case SUBMIT_APPLICATIONS:
return AccessType.SUBMIT_APP;
}
return null;
}
private static boolean hasPendingResourceRequest(ResourceCalculator rc,
ResourceUsage usage, String partitionToLookAt, Resource cluster) {
if (Resources.greaterThan(rc, cluster,
usage.getPending(partitionToLookAt), Resources.none())) {
return true;
}
return false;
}
@Private
public static boolean hasPendingResourceRequest(ResourceCalculator rc,
ResourceUsage usage, String nodePartition, Resource cluster,
SchedulingMode schedulingMode) {
String partitionToLookAt = nodePartition;
if (schedulingMode == SchedulingMode.IGNORE_PARTITION_EXCLUSIVITY) {
partitionToLookAt = RMNodeLabelsManager.NO_LABEL;
}
return hasPendingResourceRequest(rc, usage, partitionToLookAt, cluster);
}
public static RMContainer createOpportunisticRmContainer(RMContext rmContext,
Container container, boolean isRemotelyAllocated) {
SchedulerApplicationAttempt appAttempt =
((AbstractYarnScheduler) rmContext.getScheduler())
.getCurrentAttemptForContainer(container.getId());
RMContainer rmContainer = new RMContainerImpl(container,
SchedulerRequestKey.extractFrom(container),
appAttempt.getApplicationAttemptId(), container.getNodeId(),
appAttempt.getUser(), rmContext, isRemotelyAllocated);
appAttempt.addRMContainer(container.getId(), rmContainer);
((AbstractYarnScheduler) rmContext.getScheduler()).getNode(
container.getNodeId()).allocateContainer(rmContainer);
return rmContainer;
}
}