| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.hadoop.yarn.server.resourcemanager.scheduler; |
| |
| import java.io.IOException; |
| import java.util.Arrays; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Set; |
| |
| import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting; |
| import org.apache.hadoop.thirdparty.com.google.common.collect.Lists; |
| import org.apache.hadoop.thirdparty.com.google.common.collect.Maps; |
| import org.apache.commons.lang3.StringUtils; |
| import org.apache.hadoop.util.Time; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| import org.apache.hadoop.classification.InterfaceAudience.Private; |
| import org.apache.hadoop.classification.InterfaceStability.Unstable; |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.yarn.api.records.Container; |
| import org.apache.hadoop.yarn.api.records.ContainerExitStatus; |
| import org.apache.hadoop.yarn.api.records.ContainerId; |
| import org.apache.hadoop.yarn.api.records.ContainerState; |
| import org.apache.hadoop.yarn.api.records.ContainerStatus; |
| import org.apache.hadoop.yarn.api.records.QueueACL; |
| import org.apache.hadoop.yarn.api.records.QueueInfo; |
| import org.apache.hadoop.yarn.api.records.Resource; |
| import org.apache.hadoop.yarn.api.records.ResourceInformation; |
| import org.apache.hadoop.yarn.api.records.ResourceRequest; |
| import org.apache.hadoop.yarn.exceptions.InvalidLabelResourceRequestException; |
| import org.apache.hadoop.yarn.exceptions.InvalidResourceRequestException; |
| import org.apache.hadoop.yarn.exceptions.InvalidResourceRequestException |
| .InvalidResourceType; |
| import org.apache.hadoop.yarn.exceptions |
| .SchedulerInvalidResoureRequestException; |
| import org.apache.hadoop.yarn.factories.RecordFactory; |
| import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; |
| import org.apache.hadoop.yarn.security.AccessType; |
| import org.apache.hadoop.yarn.server.resourcemanager.RMContext; |
| import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; |
| import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; |
| import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl; |
| import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode; |
| import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey; |
| import org.apache.hadoop.yarn.util.UnitsConversionUtil; |
| import org.apache.hadoop.yarn.util.resource.ResourceCalculator; |
| import org.apache.hadoop.yarn.util.resource.ResourceUtils; |
| import org.apache.hadoop.yarn.util.resource.Resources; |
| |
| import static org.apache.hadoop.yarn.exceptions |
| .InvalidResourceRequestException |
| .GREATER_THAN_MAX_RESOURCE_MESSAGE_TEMPLATE; |
| import static org.apache.hadoop.yarn.exceptions |
| .InvalidResourceRequestException |
| .LESS_THAN_ZERO_RESOURCE_MESSAGE_TEMPLATE; |
| import static org.apache.hadoop.yarn.exceptions |
| .InvalidResourceRequestException.UNKNOWN_REASON_MESSAGE_TEMPLATE; |
| |
| /** |
| * Utilities shared by schedulers. |
| */ |
| @Private |
| @Unstable |
| public class SchedulerUtils { |
| |
| /** |
| * This class contains invalid resource information along with its |
| * resource request. |
| */ |
| public static class MaxResourceValidationResult { |
| private ResourceRequest resourceRequest; |
| private List<ResourceInformation> invalidResources; |
| |
| MaxResourceValidationResult(ResourceRequest resourceRequest, |
| List<ResourceInformation> invalidResources) { |
| this.resourceRequest = resourceRequest; |
| this.invalidResources = invalidResources; |
| } |
| |
| public boolean isValid() { |
| return invalidResources.isEmpty(); |
| } |
| |
| @Override |
| public String toString() { |
| return "MaxResourceValidationResult{" + "resourceRequest=" |
| + resourceRequest + ", invalidResources=" + invalidResources + '}'; |
| } |
| } |
| |
| private static final Logger LOG = |
| LoggerFactory.getLogger(SchedulerUtils.class); |
| |
| private static final RecordFactory recordFactory = |
| RecordFactoryProvider.getRecordFactory(null); |
| |
| public static final String RELEASED_CONTAINER = |
| "Container released by application"; |
| |
| public static final String UPDATED_CONTAINER = |
| "Temporary container killed by application for ExeutionType update"; |
| |
| public static final String LOST_CONTAINER = |
| "Container released on a *lost* node"; |
| |
| public static final String PREEMPTED_CONTAINER = |
| "Container preempted by scheduler"; |
| |
| public static final String COMPLETED_APPLICATION = |
| "Container of a completed application"; |
| |
| public static final String EXPIRED_CONTAINER = |
| "Container expired since it was unused"; |
| |
| public static final String UNRESERVED_CONTAINER = |
| "Container reservation no longer required."; |
| |
| /** |
| * Utility to create a {@link ContainerStatus} during exceptional |
| * circumstances. |
| * |
| * @param containerId {@link ContainerId} of returned/released/lost container. |
| * @param diagnostics diagnostic message |
| * @return <code>ContainerStatus</code> for an returned/released/lost |
| * container |
| */ |
| public static ContainerStatus createAbnormalContainerStatus( |
| ContainerId containerId, String diagnostics) { |
| return createAbnormalContainerStatus(containerId, |
| ContainerExitStatus.ABORTED, diagnostics); |
| } |
| |
| |
| /** |
| * Utility to create a {@link ContainerStatus} for killed containers. |
| * @param containerId {@link ContainerId} of the killed container. |
| * @param diagnostics diagnostic message |
| * @return <code>ContainerStatus</code> for a killed container |
| */ |
| public static ContainerStatus createKilledContainerStatus( |
| ContainerId containerId, String diagnostics) { |
| return createAbnormalContainerStatus(containerId, |
| ContainerExitStatus.KILLED_BY_RESOURCEMANAGER, diagnostics); |
| } |
| |
| /** |
| * Utility to create a {@link ContainerStatus} during exceptional |
| * circumstances. |
| * |
| * @param containerId {@link ContainerId} of returned/released/lost container. |
| * @param diagnostics diagnostic message |
| * @return <code>ContainerStatus</code> for an returned/released/lost |
| * container |
| */ |
| public static ContainerStatus createPreemptedContainerStatus( |
| ContainerId containerId, String diagnostics) { |
| return createAbnormalContainerStatus(containerId, |
| ContainerExitStatus.PREEMPTED, diagnostics); |
| } |
| |
| /** |
| * Utility to create a {@link ContainerStatus} during exceptional |
| * circumstances. |
| * |
| * @param containerId {@link ContainerId} of returned/released/lost container. |
| * @param diagnostics diagnostic message |
| * @return <code>ContainerStatus</code> for an returned/released/lost |
| * container |
| */ |
| private static ContainerStatus createAbnormalContainerStatus( |
| ContainerId containerId, int exitStatus, String diagnostics) { |
| ContainerStatus containerStatus = |
| recordFactory.newRecordInstance(ContainerStatus.class); |
| containerStatus.setContainerId(containerId); |
| containerStatus.setDiagnostics(diagnostics); |
| containerStatus.setExitStatus(exitStatus); |
| containerStatus.setState(ContainerState.COMPLETE); |
| return containerStatus; |
| } |
| |
| /** |
| * Utility method to normalize a resource request, by ensuring that the |
| * requested memory is a multiple of minMemory and is not zero. |
| */ |
| @VisibleForTesting |
| public static void normalizeRequest( |
| ResourceRequest ask, |
| ResourceCalculator resourceCalculator, |
| Resource minimumResource, |
| Resource maximumResource) { |
| ask.setCapability( |
| getNormalizedResource(ask.getCapability(), resourceCalculator, |
| minimumResource, maximumResource, minimumResource)); |
| } |
| |
| /** |
| * Utility method to normalize a resource request, by ensuring that the |
| * requested memory is a multiple of increment resource and is not zero. |
| * |
| * @return normalized resource |
| */ |
| public static Resource getNormalizedResource( |
| Resource ask, |
| ResourceCalculator resourceCalculator, |
| Resource minimumResource, |
| Resource maximumResource, |
| Resource incrementResource) { |
| Resource normalized = Resources.normalize( |
| resourceCalculator, ask, minimumResource, |
| maximumResource, incrementResource); |
| return normalized; |
| } |
| |
| private static void normalizeNodeLabelExpressionInRequest( |
| ResourceRequest resReq, QueueInfo queueInfo) { |
| |
| String labelExp = resReq.getNodeLabelExpression(); |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("Requested Node Label Expression : " + labelExp); |
| LOG.debug("Queue Info : " + queueInfo); |
| } |
| |
| // if queue has default label expression, and RR doesn't have, use the |
| // default label expression of queue |
| if (labelExp == null && queueInfo != null && ResourceRequest.ANY |
| .equals(resReq.getResourceName())) { |
| LOG.debug("Setting default node label expression : {}", queueInfo |
| .getDefaultNodeLabelExpression()); |
| labelExp = queueInfo.getDefaultNodeLabelExpression(); |
| } |
| |
| // If labelExp still equals to null, it could either be a dynamic queue |
| // or the label is not configured |
| // set it to be NO_LABEL in case of a pre-configured queue. Dynamic |
| // queues are handled in RMAppAttemptImp.ScheduledTransition |
| if (labelExp == null && queueInfo != null) { |
| labelExp = RMNodeLabelsManager.NO_LABEL; |
| } |
| |
| if (labelExp != null) { |
| resReq.setNodeLabelExpression(labelExp); |
| } |
| } |
| |
| public static void normalizeAndValidateRequest(ResourceRequest resReq, |
| Resource maximumAllocation, String queueName, boolean isRecovery, |
| RMContext rmContext, QueueInfo queueInfo, boolean nodeLabelsEnabled) |
| throws InvalidResourceRequestException { |
| Configuration conf = rmContext.getYarnConfiguration(); |
| // If Node label is not enabled throw exception |
| if (null != conf && !nodeLabelsEnabled) { |
| String labelExp = resReq.getNodeLabelExpression(); |
| if (!(RMNodeLabelsManager.NO_LABEL.equals(labelExp) |
| || null == labelExp)) { |
| String message = "NodeLabel is not enabled in cluster, but resource" |
| + " request contains a label expression."; |
| LOG.warn(message); |
| if (!isRecovery) { |
| throw new InvalidLabelResourceRequestException( |
| "Invalid resource request, node label not enabled " |
| + "but request contains label expression"); |
| } |
| } |
| } |
| if (null == queueInfo) { |
| try { |
| queueInfo = rmContext.getScheduler().getQueueInfo(queueName, false, |
| false); |
| } catch (IOException e) { |
| //Queue may not exist since it could be auto-created in case of |
| // dynamic queues |
| } |
| } |
| SchedulerUtils.normalizeNodeLabelExpressionInRequest(resReq, queueInfo); |
| |
| if (!isRecovery) { |
| validateResourceRequest(resReq, maximumAllocation, queueInfo, rmContext); |
| } |
| } |
| |
| public static void normalizeAndValidateRequest(ResourceRequest resReq, |
| Resource maximumAllocation, String queueName, RMContext rmContext, |
| QueueInfo queueInfo, boolean nodeLabelsEnabled) |
| throws InvalidResourceRequestException { |
| normalizeAndValidateRequest(resReq, maximumAllocation, queueName, false, |
| rmContext, queueInfo, nodeLabelsEnabled); |
| } |
| |
| /** |
| * If RM should enforce partition exclusivity for enforced partition "x": |
| * 1) If request is "x" and app label is not "x", |
| * override request to app's label. |
| * 2) If app label is "x", ensure request is "x". |
| * @param resReq resource request |
| * @param enforcedPartitions list of exclusive enforced partitions |
| * @param appLabel app's node label expression |
| */ |
| public static void enforcePartitionExclusivity(ResourceRequest resReq, |
| Set<String> enforcedPartitions, String appLabel) { |
| if (enforcedPartitions == null || enforcedPartitions.isEmpty()) { |
| return; |
| } |
| if (!enforcedPartitions.contains(appLabel) |
| && enforcedPartitions.contains(resReq.getNodeLabelExpression())) { |
| resReq.setNodeLabelExpression(appLabel); |
| } |
| if (enforcedPartitions.contains(appLabel)) { |
| resReq.setNodeLabelExpression(appLabel); |
| } |
| } |
| |
| /** |
| * Utility method to validate a resource request, by ensuring that the |
| * requested memory/vcore is non-negative and not greater than max |
| * |
| * @throws InvalidResourceRequestException when there is invalid request |
| */ |
| private static void validateResourceRequest(ResourceRequest resReq, |
| Resource maximumAllocation, QueueInfo queueInfo, RMContext rmContext) |
| throws InvalidResourceRequestException { |
| final Resource requestedResource = resReq.getCapability(); |
| checkResourceRequestAgainstAvailableResource(requestedResource, |
| maximumAllocation); |
| |
| String labelExp = resReq.getNodeLabelExpression(); |
| // we don't allow specify label expression other than resourceName=ANY now |
| if (!ResourceRequest.ANY.equals(resReq.getResourceName()) |
| && labelExp != null && !labelExp.trim().isEmpty()) { |
| throw new InvalidLabelResourceRequestException( |
| "Invalid resource request, queue=" + queueInfo.getQueueName() |
| + " specified node label expression in a " |
| + "resource request has resource name = " |
| + resReq.getResourceName()); |
| } |
| |
| // we don't allow specify label expression with more than one node labels now |
| if (labelExp != null && labelExp.contains("&&")) { |
| throw new InvalidLabelResourceRequestException( |
| "Invalid resource request, queue=" + queueInfo.getQueueName() |
| + " specified more than one node label " |
| + "in a node label expression, node label expression = " |
| + labelExp); |
| } |
| |
| if (labelExp != null && !labelExp.trim().isEmpty() && queueInfo != null) { |
| if (!checkQueueLabelExpression(queueInfo.getAccessibleNodeLabels(), |
| labelExp, rmContext)) { |
| throw new InvalidLabelResourceRequestException( |
| "Invalid resource request" + ", queue=" + queueInfo.getQueueName() |
| + " doesn't have permission to access all labels " |
| + "in resource request. labelExpression of resource request=" |
| + labelExp + ". Queue labels=" |
| + (queueInfo.getAccessibleNodeLabels() == null ? "" |
| : StringUtils.join( |
| queueInfo.getAccessibleNodeLabels().iterator(), ','))); |
| } else { |
| checkQueueLabelInLabelManager(labelExp, rmContext); |
| } |
| } |
| } |
| |
| private static Map<String, ResourceInformation> getZeroResources( |
| Resource resource) { |
| Map<String, ResourceInformation> resourceInformations = Maps.newHashMap(); |
| int maxLength = ResourceUtils.getNumberOfCountableResourceTypes(); |
| |
| for (int i = 0; i < maxLength; i++) { |
| ResourceInformation resourceInformation = |
| resource.getResourceInformation(i); |
| if (resourceInformation.getValue() == 0L) { |
| resourceInformations.put(resourceInformation.getName(), |
| resourceInformation); |
| } |
| } |
| return resourceInformations; |
| } |
| |
| @Private |
| @VisibleForTesting |
| static void checkResourceRequestAgainstAvailableResource(Resource reqResource, |
| Resource availableResource) throws InvalidResourceRequestException { |
| for (int i = 0; i < ResourceUtils.getNumberOfCountableResourceTypes(); i++) { |
| final ResourceInformation requestedRI = |
| reqResource.getResourceInformation(i); |
| final String reqResourceName = requestedRI.getName(); |
| |
| if (requestedRI.getValue() < 0) { |
| throwInvalidResourceException(reqResource, availableResource, |
| reqResourceName, InvalidResourceType.LESS_THAN_ZERO); |
| } |
| |
| boolean valid = checkResource(requestedRI, availableResource); |
| if (!valid) { |
| throwInvalidResourceException(reqResource, availableResource, |
| reqResourceName, InvalidResourceType.GREATER_THEN_MAX_ALLOCATION); |
| } |
| } |
| } |
| |
| public static MaxResourceValidationResult |
| validateResourceRequestsAgainstQueueMaxResource( |
| ResourceRequest resReq, Resource availableResource) |
| throws SchedulerInvalidResoureRequestException { |
| final Resource reqResource = resReq.getCapability(); |
| Map<String, ResourceInformation> resourcesWithZeroAmount = |
| getZeroResources(availableResource); |
| |
| if (LOG.isTraceEnabled()) { |
| LOG.trace("Resources with zero amount: " |
| + Arrays.toString(resourcesWithZeroAmount.entrySet().toArray())); |
| } |
| |
| List<ResourceInformation> invalidResources = Lists.newArrayList(); |
| for (int i = 0; i < ResourceUtils.getNumberOfCountableResourceTypes(); i++) { |
| final ResourceInformation requestedRI = |
| reqResource.getResourceInformation(i); |
| final String reqResourceName = requestedRI.getName(); |
| |
| if (resourcesWithZeroAmount.containsKey(reqResourceName) |
| && requestedRI.getValue() > 0) { |
| invalidResources.add(requestedRI); |
| } |
| } |
| return new MaxResourceValidationResult(resReq, invalidResources); |
| } |
| |
| /** |
| * Checks requested ResouceInformation against available Resource. |
| * @param requestedRI |
| * @param availableResource |
| * @return true if request is valid, false otherwise. |
| */ |
| private static boolean checkResource( |
| ResourceInformation requestedRI, Resource availableResource) { |
| final ResourceInformation availableRI = |
| availableResource.getResourceInformation(requestedRI.getName()); |
| |
| long requestedResourceValue = requestedRI.getValue(); |
| long availableResourceValue = availableRI.getValue(); |
| int unitsRelation = UnitsConversionUtil.compareUnits(requestedRI.getUnits(), |
| availableRI.getUnits()); |
| |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("Requested resource information: " + requestedRI); |
| LOG.debug("Available resource information: " + availableRI); |
| LOG.debug("Relation of units: " + unitsRelation); |
| } |
| |
| // requested resource unit is less than available resource unit |
| // e.g. requestedUnit: "m", availableUnit: "K") |
| if (unitsRelation < 0) { |
| availableResourceValue = |
| UnitsConversionUtil.convert(availableRI.getUnits(), |
| requestedRI.getUnits(), availableRI.getValue()); |
| |
| // requested resource unit is greater than available resource unit |
| // e.g. requestedUnit: "G", availableUnit: "M") |
| } else if (unitsRelation > 0) { |
| requestedResourceValue = |
| UnitsConversionUtil.convert(requestedRI.getUnits(), |
| availableRI.getUnits(), requestedRI.getValue()); |
| } |
| |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("Requested resource value after conversion: " |
| + requestedResourceValue); |
| LOG.info("Available resource value after conversion: " |
| + availableResourceValue); |
| } |
| |
| return requestedResourceValue <= availableResourceValue; |
| } |
| |
| private static void throwInvalidResourceException(Resource reqResource, |
| Resource maxAllowedAllocation, String reqResourceName, |
| InvalidResourceType invalidResourceType) |
| throws InvalidResourceRequestException { |
| final String message; |
| |
| if (invalidResourceType == InvalidResourceType.LESS_THAN_ZERO) { |
| message = String.format(LESS_THAN_ZERO_RESOURCE_MESSAGE_TEMPLATE, |
| reqResourceName, reqResource); |
| } else if (invalidResourceType == |
| InvalidResourceType.GREATER_THEN_MAX_ALLOCATION) { |
| message = String.format(GREATER_THAN_MAX_RESOURCE_MESSAGE_TEMPLATE, |
| reqResourceName, reqResource, maxAllowedAllocation, |
| ResourceUtils.getResourceTypesMaximumAllocation()); |
| } else if (invalidResourceType == InvalidResourceType.UNKNOWN) { |
| message = String.format(UNKNOWN_REASON_MESSAGE_TEMPLATE, reqResourceName, |
| reqResource); |
| } else { |
| throw new IllegalArgumentException(String.format( |
| "InvalidResourceType argument should be either " + "%s, %s or %s", |
| InvalidResourceType.LESS_THAN_ZERO, |
| InvalidResourceType.GREATER_THEN_MAX_ALLOCATION, |
| InvalidResourceType.UNKNOWN)); |
| } |
| throw new InvalidResourceRequestException(message, invalidResourceType); |
| } |
| |
| private static void checkQueueLabelInLabelManager(String labelExpression, |
| RMContext rmContext) throws InvalidLabelResourceRequestException { |
| // check node label manager contains this label |
| if (null != rmContext) { |
| RMNodeLabelsManager nlm = rmContext.getNodeLabelManager(); |
| if (nlm != null && !nlm.containsNodeLabel(labelExpression)) { |
| throw new InvalidLabelResourceRequestException( |
| "Invalid label resource request, cluster do not contain " |
| + ", label= " + labelExpression); |
| } |
| } |
| } |
| |
| /** |
| * Check queue label expression, check if node label in queue's |
| * node-label-expression existed in clusterNodeLabels if rmContext != null |
| */ |
| public static boolean checkQueueLabelExpression(Set<String> queueLabels, |
| String labelExpression, RMContext rmContext) { |
| // if label expression is empty, we can allocate container on any node |
| if (labelExpression == null) { |
| return true; |
| } |
| for (String str : labelExpression.split("&&")) { |
| str = str.trim(); |
| if (!str.trim().isEmpty()) { |
| // check queue label |
| if (queueLabels == null) { |
| return false; |
| } else { |
| if (!queueLabels.contains(str) |
| && !queueLabels.contains(RMNodeLabelsManager.ANY)) { |
| return false; |
| } |
| } |
| } |
| } |
| return true; |
| } |
| |
| |
| public static AccessType toAccessType(QueueACL acl) { |
| switch (acl) { |
| case ADMINISTER_QUEUE: |
| return AccessType.ADMINISTER_QUEUE; |
| case SUBMIT_APPLICATIONS: |
| return AccessType.SUBMIT_APP; |
| } |
| return null; |
| } |
| |
| private static boolean hasPendingResourceRequest(ResourceCalculator rc, |
| ResourceUsage usage, String partitionToLookAt, Resource cluster) { |
| if (Resources.greaterThan(rc, cluster, |
| usage.getPending(partitionToLookAt), Resources.none())) { |
| return true; |
| } |
| return false; |
| } |
| |
| @Private |
| public static boolean hasPendingResourceRequest(ResourceCalculator rc, |
| ResourceUsage usage, String nodePartition, Resource cluster, |
| SchedulingMode schedulingMode) { |
| String partitionToLookAt = nodePartition; |
| if (schedulingMode == SchedulingMode.IGNORE_PARTITION_EXCLUSIVITY) { |
| partitionToLookAt = RMNodeLabelsManager.NO_LABEL; |
| } |
| return hasPendingResourceRequest(rc, usage, partitionToLookAt, cluster); |
| } |
| |
| public static RMContainer createOpportunisticRmContainer(RMContext rmContext, |
| Container container, boolean isRemotelyAllocated) { |
| SchedulerNode node = ((AbstractYarnScheduler) rmContext.getScheduler()) |
| .getNode(container.getNodeId()); |
| if (node == null) { |
| return null; |
| } |
| SchedulerApplicationAttempt appAttempt = |
| ((AbstractYarnScheduler) rmContext.getScheduler()) |
| .getCurrentAttemptForContainer(container.getId()); |
| RMContainer rmContainer = new RMContainerImpl(container, |
| SchedulerRequestKey.extractFrom(container), |
| appAttempt.getApplicationAttemptId(), container.getNodeId(), |
| appAttempt.getUser(), rmContext, isRemotelyAllocated); |
| appAttempt.addRMContainer(container.getId(), rmContainer); |
| node.allocateContainer(rmContainer); |
| return rmContainer; |
| } |
| |
| public static boolean isNodeHeartbeated(SchedulerNode node, |
| long skipNodeInterval) { |
| long timeElapsedFromLastHeartbeat = |
| Time.monotonicNow() - node.getLastHeartbeatMonotonicTime(); |
| return timeElapsedFromLastHeartbeat <= skipNodeInterval; |
| } |
| } |