| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.hadoop.yarn.server.resourcemanager.scheduler; |
| |
| import java.io.IOException; |
| import java.util.Set; |
| |
| import org.apache.commons.lang.StringUtils; |
| import org.apache.hadoop.classification.InterfaceAudience.Private; |
| import org.apache.hadoop.classification.InterfaceStability.Unstable; |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.yarn.api.records.ContainerExitStatus; |
| import org.apache.hadoop.yarn.api.records.ContainerId; |
| import org.apache.hadoop.yarn.api.records.ContainerState; |
| import org.apache.hadoop.yarn.api.records.ContainerStatus; |
| import org.apache.hadoop.yarn.api.records.QueueACL; |
| import org.apache.hadoop.yarn.api.records.QueueInfo; |
| import org.apache.hadoop.yarn.api.records.Resource; |
| import org.apache.hadoop.yarn.api.records.ResourceRequest; |
| import org.apache.hadoop.yarn.api.records.AbstractResourceRequest; |
| import org.apache.hadoop.yarn.conf.YarnConfiguration; |
| import org.apache.hadoop.yarn.exceptions.InvalidLabelResourceRequestException; |
| import org.apache.hadoop.yarn.exceptions.InvalidResourceRequestException; |
| import org.apache.hadoop.yarn.exceptions.YarnException; |
| import org.apache.hadoop.yarn.factories.RecordFactory; |
| import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; |
| import org.apache.hadoop.yarn.security.AccessType; |
| import org.apache.hadoop.yarn.server.resourcemanager.RMContext; |
| import org.apache.hadoop.yarn.server.resourcemanager.RMServerUtils; |
| import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; |
| import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode; |
| import org.apache.hadoop.yarn.util.resource.ResourceCalculator; |
| import org.apache.hadoop.yarn.util.resource.Resources; |
| |
| /** |
| * Utilities shared by schedulers. |
| */ |
| @Private |
| @Unstable |
| public class SchedulerUtils { |
| |
| private static final RecordFactory recordFactory = |
| RecordFactoryProvider.getRecordFactory(null); |
| |
| public static final String RELEASED_CONTAINER = |
| "Container released by application"; |
| |
| public static final String LOST_CONTAINER = |
| "Container released on a *lost* node"; |
| |
| public static final String PREEMPTED_CONTAINER = |
| "Container preempted by scheduler"; |
| |
| public static final String COMPLETED_APPLICATION = |
| "Container of a completed application"; |
| |
| public static final String EXPIRED_CONTAINER = |
| "Container expired since it was unused"; |
| |
| public static final String UNRESERVED_CONTAINER = |
| "Container reservation no longer required."; |
| |
| /** |
| * Utility to create a {@link ContainerStatus} during exceptional |
| * circumstances. |
| * |
| * @param containerId {@link ContainerId} of returned/released/lost container. |
| * @param diagnostics diagnostic message |
| * @return <code>ContainerStatus</code> for an returned/released/lost |
| * container |
| */ |
| public static ContainerStatus createAbnormalContainerStatus( |
| ContainerId containerId, String diagnostics) { |
| return createAbnormalContainerStatus(containerId, |
| ContainerExitStatus.ABORTED, diagnostics); |
| } |
| |
| /** |
| * Utility to create a {@link ContainerStatus} during exceptional |
| * circumstances. |
| * |
| * @param containerId {@link ContainerId} of returned/released/lost container. |
| * @param diagnostics diagnostic message |
| * @return <code>ContainerStatus</code> for an returned/released/lost |
| * container |
| */ |
| public static ContainerStatus createPreemptedContainerStatus( |
| ContainerId containerId, String diagnostics) { |
| return createAbnormalContainerStatus(containerId, |
| ContainerExitStatus.PREEMPTED, diagnostics); |
| } |
| |
| /** |
| * Utility to create a {@link ContainerStatus} during exceptional |
| * circumstances. |
| * |
| * @param containerId {@link ContainerId} of returned/released/lost container. |
| * @param diagnostics diagnostic message |
| * @return <code>ContainerStatus</code> for an returned/released/lost |
| * container |
| */ |
| private static ContainerStatus createAbnormalContainerStatus( |
| ContainerId containerId, int exitStatus, String diagnostics) { |
| ContainerStatus containerStatus = |
| recordFactory.newRecordInstance(ContainerStatus.class); |
| containerStatus.setContainerId(containerId); |
| containerStatus.setDiagnostics(diagnostics); |
| containerStatus.setExitStatus(exitStatus); |
| containerStatus.setState(ContainerState.COMPLETE); |
| return containerStatus; |
| } |
| |
| /** |
| * Utility method to normalize a resource request, by insuring that the |
| * requested memory is a multiple of minMemory and is not zero. |
| */ |
| public static void normalizeRequest( |
| ResourceRequest ask, |
| ResourceCalculator resourceCalculator, |
| Resource minimumResource, |
| Resource maximumResource) { |
| normalizeRequest(ask, resourceCalculator, |
| minimumResource, maximumResource, minimumResource); |
| } |
| |
| /** |
| * Utility method to normalize a resource request, by insuring that the |
| * requested memory is a multiple of increment resource and is not zero. |
| */ |
| public static void normalizeRequest( |
| AbstractResourceRequest ask, |
| ResourceCalculator resourceCalculator, |
| Resource minimumResource, |
| Resource maximumResource, |
| Resource incrementResource) { |
| Resource normalized = Resources.normalize( |
| resourceCalculator, ask.getCapability(), minimumResource, |
| maximumResource, incrementResource); |
| ask.setCapability(normalized); |
| } |
| |
| private static void normalizeNodeLabelExpressionInRequest( |
| ResourceRequest resReq, QueueInfo queueInfo) { |
| |
| String labelExp = resReq.getNodeLabelExpression(); |
| |
| // if queue has default label expression, and RR doesn't have, use the |
| // default label expression of queue |
| if (labelExp == null && queueInfo != null && ResourceRequest.ANY |
| .equals(resReq.getResourceName())) { |
| labelExp = queueInfo.getDefaultNodeLabelExpression(); |
| } |
| |
| // If labelExp still equals to null, set it to be NO_LABEL |
| if (labelExp == null) { |
| labelExp = RMNodeLabelsManager.NO_LABEL; |
| } |
| resReq.setNodeLabelExpression(labelExp); |
| } |
| |
| public static void normalizeAndValidateRequest(ResourceRequest resReq, |
| Resource maximumResource, String queueName, YarnScheduler scheduler, |
| boolean isRecovery, RMContext rmContext) |
| throws InvalidResourceRequestException { |
| normalizeAndValidateRequest(resReq, maximumResource, queueName, scheduler, |
| isRecovery, rmContext, null); |
| } |
| |
| public static void normalizeAndValidateRequest(ResourceRequest resReq, |
| Resource maximumResource, String queueName, YarnScheduler scheduler, |
| boolean isRecovery, RMContext rmContext, QueueInfo queueInfo) |
| throws InvalidResourceRequestException { |
| Configuration conf = rmContext.getYarnConfiguration(); |
| // If Node label is not enabled throw exception |
| if (null != conf && !YarnConfiguration.areNodeLabelsEnabled(conf)) { |
| String labelExp = resReq.getNodeLabelExpression(); |
| if (!(RMNodeLabelsManager.NO_LABEL.equals(labelExp) |
| || null == labelExp)) { |
| throw new InvalidLabelResourceRequestException( |
| "Invalid resource request, node label not enabled " |
| + "but request contains label expression"); |
| } |
| } |
| if (null == queueInfo) { |
| try { |
| queueInfo = scheduler.getQueueInfo(queueName, false, false); |
| } catch (IOException e) { |
| // it is possible queue cannot get when queue mapping is set, just ignore |
| // the queueInfo here, and move forward |
| } |
| } |
| SchedulerUtils.normalizeNodeLabelExpressionInRequest(resReq, queueInfo); |
| if (!isRecovery) { |
| validateResourceRequest(resReq, maximumResource, queueInfo, rmContext); |
| } |
| } |
| |
| public static void normalizeAndvalidateRequest(ResourceRequest resReq, |
| Resource maximumResource, String queueName, YarnScheduler scheduler, |
| RMContext rmContext) |
| throws InvalidResourceRequestException { |
| normalizeAndvalidateRequest(resReq, maximumResource, queueName, scheduler, |
| rmContext, null); |
| } |
| |
| public static void normalizeAndvalidateRequest(ResourceRequest resReq, |
| Resource maximumResource, String queueName, YarnScheduler scheduler, |
| RMContext rmContext, QueueInfo queueInfo) |
| throws InvalidResourceRequestException { |
| normalizeAndValidateRequest(resReq, maximumResource, queueName, scheduler, |
| false, rmContext, queueInfo); |
| } |
| |
| /** |
| * Utility method to validate a resource request, by insuring that the |
| * requested memory/vcore is non-negative and not greater than max |
| * |
| * @throws InvalidResourceRequestException when there is invalid request |
| */ |
| private static void validateResourceRequest(ResourceRequest resReq, |
| Resource maximumResource, QueueInfo queueInfo, RMContext rmContext) |
| throws InvalidResourceRequestException { |
| try { |
| RMServerUtils.convertProfileToResourceCapability(resReq, |
| rmContext.getYarnConfiguration(), |
| rmContext.getResourceProfilesManager()); |
| } catch (YarnException ye) { |
| throw new InvalidResourceRequestException(ye); |
| } |
| |
| if (resReq.getCapability().getMemorySize() < 0 || |
| resReq.getCapability().getMemorySize() > maximumResource.getMemorySize()) { |
| throw new InvalidResourceRequestException("Invalid resource request" |
| + ", requested memory < 0" |
| + ", or requested memory > max configured" |
| + ", requestedMemory=" + resReq.getCapability().getMemorySize() |
| + ", maxMemory=" + maximumResource.getMemorySize()); |
| } |
| if (resReq.getCapability().getVirtualCores() < 0 || |
| resReq.getCapability().getVirtualCores() > |
| maximumResource.getVirtualCores()) { |
| throw new InvalidResourceRequestException("Invalid resource request" |
| + ", requested virtual cores < 0" |
| + ", or requested virtual cores > max configured" |
| + ", requestedVirtualCores=" |
| + resReq.getCapability().getVirtualCores() |
| + ", maxVirtualCores=" + maximumResource.getVirtualCores()); |
| } |
| String labelExp = resReq.getNodeLabelExpression(); |
| // we don't allow specify label expression other than resourceName=ANY now |
| if (!ResourceRequest.ANY.equals(resReq.getResourceName()) |
| && labelExp != null && !labelExp.trim().isEmpty()) { |
| throw new InvalidLabelResourceRequestException( |
| "Invalid resource request, queue=" + queueInfo.getQueueName() |
| + " specified node label expression in a " |
| + "resource request has resource name = " |
| + resReq.getResourceName()); |
| } |
| |
| // we don't allow specify label expression with more than one node labels now |
| if (labelExp != null && labelExp.contains("&&")) { |
| throw new InvalidLabelResourceRequestException( |
| "Invailid resource request, queue=" + queueInfo.getQueueName() |
| + " specified more than one node label " |
| + "in a node label expression, node label expression = " |
| + labelExp); |
| } |
| |
| if (labelExp != null && !labelExp.trim().isEmpty() && queueInfo != null) { |
| if (!checkQueueLabelExpression(queueInfo.getAccessibleNodeLabels(), |
| labelExp, rmContext)) { |
| throw new InvalidLabelResourceRequestException( |
| "Invalid resource request" + ", queue=" + queueInfo.getQueueName() |
| + " doesn't have permission to access all labels " |
| + "in resource request. labelExpression of resource request=" |
| + labelExp + ". Queue labels=" |
| + (queueInfo.getAccessibleNodeLabels() == null ? "" |
| : StringUtils.join( |
| queueInfo.getAccessibleNodeLabels().iterator(), ','))); |
| } else { |
| checkQueueLabelInLabelManager(labelExp, rmContext); |
| } |
| } |
| } |
| |
| private static void checkQueueLabelInLabelManager(String labelExpression, |
| RMContext rmContext) throws InvalidLabelResourceRequestException { |
| // check node label manager contains this label |
| if (null != rmContext) { |
| RMNodeLabelsManager nlm = rmContext.getNodeLabelManager(); |
| if (nlm != null && !nlm.containsNodeLabel(labelExpression)) { |
| throw new InvalidLabelResourceRequestException( |
| "Invalid label resource request, cluster do not contain " |
| + ", label= " + labelExpression); |
| } |
| } |
| } |
| |
| /** |
| * Check queue label expression, check if node label in queue's |
| * node-label-expression existed in clusterNodeLabels if rmContext != null |
| */ |
| public static boolean checkQueueLabelExpression(Set<String> queueLabels, |
| String labelExpression, RMContext rmContext) { |
| // if label expression is empty, we can allocate container on any node |
| if (labelExpression == null) { |
| return true; |
| } |
| for (String str : labelExpression.split("&&")) { |
| str = str.trim(); |
| if (!str.trim().isEmpty()) { |
| // check queue label |
| if (queueLabels == null) { |
| return false; |
| } else { |
| if (!queueLabels.contains(str) |
| && !queueLabels.contains(RMNodeLabelsManager.ANY)) { |
| return false; |
| } |
| } |
| } |
| } |
| return true; |
| } |
| |
| |
| public static AccessType toAccessType(QueueACL acl) { |
| switch (acl) { |
| case ADMINISTER_QUEUE: |
| return AccessType.ADMINISTER_QUEUE; |
| case SUBMIT_APPLICATIONS: |
| return AccessType.SUBMIT_APP; |
| } |
| return null; |
| } |
| |
| public static boolean checkResourceRequestMatchingNodePartition( |
| String requestedPartition, String nodePartition, |
| SchedulingMode schedulingMode) { |
| // We will only look at node label = nodeLabelToLookAt according to |
| // schedulingMode and partition of node. |
| String nodePartitionToLookAt = null; |
| if (schedulingMode == SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY) { |
| nodePartitionToLookAt = nodePartition; |
| } else { |
| nodePartitionToLookAt = RMNodeLabelsManager.NO_LABEL; |
| } |
| |
| if (null == requestedPartition) { |
| requestedPartition = RMNodeLabelsManager.NO_LABEL; |
| } |
| return requestedPartition.equals(nodePartitionToLookAt); |
| } |
| |
| private static boolean hasPendingResourceRequest(ResourceCalculator rc, |
| ResourceUsage usage, String partitionToLookAt, Resource cluster) { |
| if (Resources.greaterThan(rc, cluster, |
| usage.getPending(partitionToLookAt), Resources.none())) { |
| return true; |
| } |
| return false; |
| } |
| |
| @Private |
| public static boolean hasPendingResourceRequest(ResourceCalculator rc, |
| ResourceUsage usage, String nodePartition, Resource cluster, |
| SchedulingMode schedulingMode) { |
| String partitionToLookAt = nodePartition; |
| if (schedulingMode == SchedulingMode.IGNORE_PARTITION_EXCLUSIVITY) { |
| partitionToLookAt = RMNodeLabelsManager.NO_LABEL; |
| } |
| return hasPendingResourceRequest(rc, usage, partitionToLookAt, cluster); |
| } |
| } |