/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.allocator;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppUtils;
import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivityDiagnosticConstant;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivitiesLogger;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivitiesManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivityState;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSAMContainerLaunchDiagnosticsConstants;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSAssignment;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.PlacementSet;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.PlacementSetUtils;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.SchedulingPlacementSet;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.PendingAsk;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.apache.hadoop.yarn.util.resource.Resources;
/**
* Allocates normal (new) containers, taking locality, node labels, etc. into
* account. Uses the delay scheduling mechanism to achieve better locality.
*/
public class RegularContainerAllocator extends AbstractContainerAllocator {
private static final Log LOG = LogFactory.getLog(RegularContainerAllocator.class);
public RegularContainerAllocator(FiCaSchedulerApp application,
ResourceCalculator rc, RMContext rmContext,
ActivitiesManager activitiesManager) {
super(application, rc, rmContext, activitiesManager);
}
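/**
* Check whether the queue's headroom (plus any resource that could be
* unreserved when reservation continue-looking is enabled on the default
* partition) is enough to satisfy the required resource.
*/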
private boolean checkHeadroom(Resource clusterResource,
ResourceLimits currentResourceLimits, Resource required,
String nodePartition) {
// If headroom + currentReservation < required, we cannot allocate this
// request.
Resource resourceCouldBeUnReserved = application.getCurrentReservation();
if (!application.getCSLeafQueue().getReservationContinueLooking()
|| !nodePartition.equals(RMNodeLabelsManager.NO_LABEL)) {
// If reservation continue-looking is disabled, OR we're looking at a
// non-default node partition, we don't allow unreserving before
// allocation.
resourceCouldBeUnReserved = Resources.none();
}
return Resources.greaterThanOrEqual(rc, clusterResource, Resources.add(
currentResourceLimits.getHeadroom(), resourceCouldBeUnReserved),
required);
}
/*
* Pre-check whether the pending resource request for the given schedulerKey
* can be allocated to the given PlacementSet.
* We consider factors such as exclusivity, pending resources, node partition,
* headroom, etc.
*/
private ContainerAllocation preCheckForPlacementSet(Resource clusterResource,
PlacementSet<FiCaSchedulerNode> ps, SchedulingMode schedulingMode,
ResourceLimits resourceLimits, SchedulerRequestKey schedulerKey) {
Priority priority = schedulerKey.getPriority();
FiCaSchedulerNode node = PlacementSetUtils.getSingleNode(ps);
PendingAsk offswitchPendingAsk = application.getPendingAsk(schedulerKey,
ResourceRequest.ANY);
if (offswitchPendingAsk.getCount() <= 0) {
ActivitiesLogger.APP.recordSkippedAppActivityWithoutAllocation(
activitiesManager, node, application, priority,
ActivityDiagnosticConstant.PRIORITY_SKIPPED_BECAUSE_NULL_ANY_REQUEST);
return ContainerAllocation.PRIORITY_SKIPPED;
}
// Required resource
Resource required = offswitchPendingAsk.getPerAllocationResource();
// Do we need containers at this 'priority'?
if (application.getOutstandingAsksCount(schedulerKey) <= 0) {
ActivitiesLogger.APP.recordSkippedAppActivityWithoutAllocation(
activitiesManager, node, application, priority,
ActivityDiagnosticConstant.APPLICATION_PRIORITY_DO_NOT_NEED_RESOURCE);
return ContainerAllocation.PRIORITY_SKIPPED;
}
// AM container allocation doesn't support non-exclusive allocation,
// to avoid the pain of preempting an AM container.
if (schedulingMode == SchedulingMode.IGNORE_PARTITION_EXCLUSIVITY) {
if (application.isWaitingForAMContainer()) {
if (LOG.isDebugEnabled()) {
LOG.debug("Skip allocating AM container to app_attempt="
+ application.getApplicationAttemptId()
+ ", don't allow to allocate AM container in non-exclusive mode");
}
application.updateAppSkipNodeDiagnostics(
"Skipping assigning to Node in Ignore Exclusivity mode. ");
ActivitiesLogger.APP.recordSkippedAppActivityWithoutAllocation(
activitiesManager, node, application, priority,
ActivityDiagnosticConstant.SKIP_IN_IGNORE_EXCLUSIVITY_MODE);
return ContainerAllocation.APP_SKIPPED;
}
}
// Does the nodePartition of the pending request match the node's partition?
// If not, jump to the next priority.
if (!appInfo.acceptNodePartition(schedulerKey, node.getPartition(),
schedulingMode)) {
ActivitiesLogger.APP.recordSkippedAppActivityWithoutAllocation(
activitiesManager, node, application, priority,
ActivityDiagnosticConstant.
PRIORITY_SKIPPED_BECAUSE_NODE_PARTITION_DOES_NOT_MATCH_REQUEST);
return ContainerAllocation.PRIORITY_SKIPPED;
}
if (!application.getCSLeafQueue().getReservationContinueLooking()) {
if (!shouldAllocOrReserveNewContainer(schedulerKey, required)) {
if (LOG.isDebugEnabled()) {
LOG.debug("doesn't need containers based on reservation algo!");
}
ActivitiesLogger.APP.recordSkippedAppActivityWithoutAllocation(
activitiesManager, node, application, priority,
ActivityDiagnosticConstant.DO_NOT_NEED_ALLOCATIONATTEMPTINFOS);
return ContainerAllocation.PRIORITY_SKIPPED;
}
}
if (!checkHeadroom(clusterResource, resourceLimits, required,
ps.getPartition())) {
if (LOG.isDebugEnabled()) {
LOG.debug("cannot allocate required resource=" + required
+ " because of headroom");
}
ActivitiesLogger.APP.recordSkippedAppActivityWithoutAllocation(
activitiesManager, node, application, priority,
ActivityDiagnosticConstant.QUEUE_SKIPPED_HEADROOM);
return ContainerAllocation.QUEUE_SKIPPED;
}
// Increase the missed-non-partitioned-resource-request opportunity count.
// This makes sure non-partitioned resource requests are preferentially
// allocated to non-partitioned nodes.
int missedNonPartitionedRequestSchedulingOpportunity = 0;
// Only do this when the request associated with the given scheduler key
// accepts NO_LABEL under RESPECT_EXCLUSIVITY mode.
if (StringUtils.equals(RMNodeLabelsManager.NO_LABEL,
appInfo.getSchedulingPlacementSet(schedulerKey)
.getPrimaryRequestedNodePartition())) {
missedNonPartitionedRequestSchedulingOpportunity =
application.addMissedNonPartitionedRequestSchedulingOpportunity(
schedulerKey);
}
if (schedulingMode == SchedulingMode.IGNORE_PARTITION_EXCLUSIVITY) {
// Before doing the allocation, check the scheduling opportunity to make
// sure non-partitioned resource requests are scheduled to the
// non-partitioned partition first.
if (missedNonPartitionedRequestSchedulingOpportunity < rmContext
.getScheduler().getNumClusterNodes()) {
if (LOG.isDebugEnabled()) {
LOG.debug("Skip app_attempt=" + application.getApplicationAttemptId()
+ " priority=" + schedulerKey.getPriority()
+ " because missed-non-partitioned-resource-request"
+ " opportunity under required:" + " Now="
+ missedNonPartitionedRequestSchedulingOpportunity + " required="
+ rmContext.getScheduler().getNumClusterNodes());
}
ActivitiesLogger.APP.recordSkippedAppActivityWithoutAllocation(
activitiesManager, node, application, priority,
ActivityDiagnosticConstant.NON_PARTITIONED_PARTITION_FIRST);
return ContainerAllocation.APP_SKIPPED;
}
}
return null;
}
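/**
* Check whether the node is blacklisted for this application; if so, record
* the skipped activity and return APP_SKIPPED, otherwise return null.
*/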
private ContainerAllocation checkIfNodeBlackListed(FiCaSchedulerNode node,
SchedulerRequestKey schedulerKey) {
Priority priority = schedulerKey.getPriority();
if (SchedulerAppUtils.isPlaceBlacklisted(application, node, LOG)) {
application.updateAppSkipNodeDiagnostics(
CSAMContainerLaunchDiagnosticsConstants.SKIP_AM_ALLOCATION_IN_BLACK_LISTED_NODE);
ActivitiesLogger.APP.recordSkippedAppActivityWithoutAllocation(
activitiesManager, node, application, priority,
ActivityDiagnosticConstant.SKIP_BLACK_LISTED_NODE);
return ContainerAllocation.APP_SKIPPED;
}
return null;
}
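/**
* Try to allocate the request identified by schedulerKey on a single node:
* run sanity checks, record a scheduling opportunity, then delegate to
* assignContainersOnNode. For non-reserved allocations, a PRIORITY_SKIPPED
* result does not consume the scheduling opportunity.
*/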
ContainerAllocation tryAllocateOnNode(Resource clusterResource,
FiCaSchedulerNode node, SchedulingMode schedulingMode,
ResourceLimits resourceLimits, SchedulerRequestKey schedulerKey,
RMContainer reservedContainer) {
ContainerAllocation result;
// Sanity checks before assigning to this node
result = checkIfNodeBlackListed(node, schedulerKey);
if (null != result) {
return result;
}
// Inform the application that it is about to get a scheduling opportunity.
// TODO: revisit whether the scheduling opportunity should really be added
// here.
application.addSchedulingOpportunity(schedulerKey);
// Try to allocate containers on node
result =
assignContainersOnNode(clusterResource, node, schedulerKey,
reservedContainer, schedulingMode, resourceLimits);
if (null == reservedContainer) {
if (result.state == AllocationState.PRIORITY_SKIPPED) {
// Don't count 'skipped nodes' as a scheduling opportunity!
application.subtractSchedulingOpportunity(schedulerKey);
}
}
return result;
}
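/**
* Fraction of the cluster that should be scanned before relaxing locality,
* based on how many unique locations this request asks for. For example
* (illustrative numbers), a request with 5 unique location asks in a
* 100-node cluster yields min((5 - 1) / 100, 1.0) = 0.04.
*/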
public float getLocalityWaitFactor(
SchedulerRequestKey schedulerKey, int clusterNodes) {
// Estimate: Required unique resources (i.e. hosts + racks)
int requiredResources = Math.max(
application.getSchedulingPlacementSet(schedulerKey)
.getUniqueLocationAsks() - 1, 0);
// waitFactor can't be more than '1',
// i.e. there is no point in skipping more than cluster-size opportunities.
return Math.min(((float)requiredResources / clusterNodes), 1.0f);
}
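/** Node-locality delay configured on the queue, capped by the cluster size. */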
private int getActualNodeLocalityDelay() {
return Math.min(rmContext.getScheduler().getNumClusterNodes(), application
.getCSLeafQueue().getNodeLocalityDelay());
}
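/**
* Rack-locality delay: the node-locality delay plus the additional rack
* delay, capped by the cluster size.
*/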
private int getActualRackLocalityDelay() {
return Math.min(rmContext.getScheduler().getNumClusterNodes(),
application.getCSLeafQueue().getNodeLocalityDelay()
+ application.getCSLeafQueue().getRackLocalityAdditionalDelay());
}
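/**
* Decide whether a container of the given locality type may be assigned on
* this node now, applying the delay-scheduling rules for off-switch and
* rack-local requests.
*/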
private boolean canAssign(SchedulerRequestKey schedulerKey,
FiCaSchedulerNode node, NodeType type, RMContainer reservedContainer) {
// Clearly we need containers for this application...
if (type == NodeType.OFF_SWITCH) {
if (reservedContainer != null) {
return true;
}
// If there are no nodes in the cluster, return false.
if (rmContext.getScheduler().getNumClusterNodes() == 0) {
return false;
}
// If we have only ANY requests for this schedulerKey, we should not
// delay its scheduling.
if (application.getSchedulingPlacementSet(schedulerKey)
.getUniqueLocationAsks() == 1) {
return true;
}
// 'Delay' off-switch
long missedOpportunities =
application.getSchedulingOpportunities(schedulerKey);
// If rack locality additional delay parameter is enabled.
if (application.getCSLeafQueue().getRackLocalityAdditionalDelay() > -1) {
return missedOpportunities > getActualRackLocalityDelay();
} else {
long requiredContainers = application.getOutstandingAsksCount(
schedulerKey);
float localityWaitFactor = getLocalityWaitFactor(schedulerKey,
rmContext.getScheduler().getNumClusterNodes());
// Cap the delay by the number of nodes in the cluster.
return (Math.min(rmContext.getScheduler().getNumClusterNodes(),
(requiredContainers * localityWaitFactor)) < missedOpportunities);
}
}
// Check if we need containers on this rack
if (application.getOutstandingAsksCount(schedulerKey, node.getRackName())
<= 0) {
return false;
}
// If we are here, we do need containers on this rack for a RACK_LOCAL request.
if (type == NodeType.RACK_LOCAL) {
// 'Delay' rack-local just a little bit...
long missedOpportunities =
application.getSchedulingOpportunities(schedulerKey);
return getActualNodeLocalityDelay() < missedOpportunities;
}
// Check if we need containers on this host
if (type == NodeType.NODE_LOCAL) {
// Now check if we need containers on this host...
return application.getOutstandingAsksCount(schedulerKey,
node.getNodeName()) > 0;
}
return false;
}
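/**
* Assign a node-local container if delay scheduling allows it; otherwise
* record the skip and fall through to the rack-local request.
*/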
private ContainerAllocation assignNodeLocalContainers(
Resource clusterResource, PendingAsk nodeLocalAsk,
FiCaSchedulerNode node, SchedulerRequestKey schedulerKey,
RMContainer reservedContainer, SchedulingMode schedulingMode,
ResourceLimits currentResoureLimits) {
if (canAssign(schedulerKey, node, NodeType.NODE_LOCAL, reservedContainer)) {
return assignContainer(clusterResource, node, schedulerKey,
nodeLocalAsk, NodeType.NODE_LOCAL, reservedContainer,
schedulingMode, currentResoureLimits);
}
// Skip node-local request, go to rack-local request
ActivitiesLogger.APP.recordSkippedAppActivityWithoutAllocation(
activitiesManager, node, application, schedulerKey.getPriority(),
ActivityDiagnosticConstant.SKIP_NODE_LOCAL_REQUEST);
return ContainerAllocation.LOCALITY_SKIPPED;
}
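/**
* Assign a rack-local container if delay scheduling allows it; otherwise
* record the skip and fall through to the off-switch request.
*/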
private ContainerAllocation assignRackLocalContainers(
Resource clusterResource, PendingAsk rackLocalAsk,
FiCaSchedulerNode node, SchedulerRequestKey schedulerKey,
RMContainer reservedContainer, SchedulingMode schedulingMode,
ResourceLimits currentResoureLimits) {
if (canAssign(schedulerKey, node, NodeType.RACK_LOCAL, reservedContainer)) {
return assignContainer(clusterResource, node, schedulerKey,
rackLocalAsk, NodeType.RACK_LOCAL, reservedContainer,
schedulingMode, currentResoureLimits);
}
// Skip rack-local request, go to off-switch request
ActivitiesLogger.APP.recordSkippedAppActivityWithoutAllocation(
activitiesManager, node, application, schedulerKey.getPriority(),
ActivityDiagnosticConstant.SKIP_RACK_LOCAL_REQUEST);
return ContainerAllocation.LOCALITY_SKIPPED;
}
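/**
* Assign an off-switch container if delay scheduling allows it; otherwise
* record the skip and give up on this application for now.
*/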
private ContainerAllocation assignOffSwitchContainers(
Resource clusterResource, PendingAsk offSwitchAsk,
FiCaSchedulerNode node, SchedulerRequestKey schedulerKey,
RMContainer reservedContainer, SchedulingMode schedulingMode,
ResourceLimits currentResoureLimits) {
if (canAssign(schedulerKey, node, NodeType.OFF_SWITCH, reservedContainer)) {
return assignContainer(clusterResource, node, schedulerKey,
offSwitchAsk, NodeType.OFF_SWITCH, reservedContainer,
schedulingMode, currentResoureLimits);
}
application.updateAppSkipNodeDiagnostics(
CSAMContainerLaunchDiagnosticsConstants.SKIP_AM_ALLOCATION_DUE_TO_LOCALITY);
ActivitiesLogger.APP.recordSkippedAppActivityWithoutAllocation(
activitiesManager, node, application, schedulerKey.getPriority(),
ActivityDiagnosticConstant.SKIP_OFF_SWITCH_REQUEST);
return ContainerAllocation.APP_SKIPPED;
}
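/**
* Walk the request from node-local to rack-local to off-switch for the given
* schedulerKey on this node, honoring relax-locality settings, and return
* the first allocation that actually assigns or reserves resources.
*/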
private ContainerAllocation assignContainersOnNode(Resource clusterResource,
FiCaSchedulerNode node, SchedulerRequestKey schedulerKey,
RMContainer reservedContainer, SchedulingMode schedulingMode,
ResourceLimits currentResoureLimits) {
Priority priority = schedulerKey.getPriority();
ContainerAllocation allocation;
NodeType requestLocalityType = null;
// Data-local
PendingAsk nodeLocalAsk =
application.getPendingAsk(schedulerKey, node.getNodeName());
if (nodeLocalAsk.getCount() > 0) {
requestLocalityType = NodeType.NODE_LOCAL;
allocation =
assignNodeLocalContainers(clusterResource, nodeLocalAsk,
node, schedulerKey, reservedContainer, schedulingMode,
currentResoureLimits);
if (Resources.greaterThan(rc, clusterResource,
allocation.getResourceToBeAllocated(), Resources.none())) {
allocation.requestLocalityType = requestLocalityType;
return allocation;
}
}
// Rack-local
PendingAsk rackLocalAsk =
application.getPendingAsk(schedulerKey, node.getRackName());
if (rackLocalAsk.getCount() > 0) {
if (!appInfo.canDelayTo(schedulerKey, node.getRackName())) {
ActivitiesLogger.APP.recordSkippedAppActivityWithoutAllocation(
activitiesManager, node, application, priority,
ActivityDiagnosticConstant.SKIP_PRIORITY_BECAUSE_OF_RELAX_LOCALITY);
return ContainerAllocation.PRIORITY_SKIPPED;
}
requestLocalityType = requestLocalityType == null ?
NodeType.RACK_LOCAL :
requestLocalityType;
allocation =
assignRackLocalContainers(clusterResource, rackLocalAsk,
node, schedulerKey, reservedContainer, schedulingMode,
currentResoureLimits);
if (Resources.greaterThan(rc, clusterResource,
allocation.getResourceToBeAllocated(), Resources.none())) {
allocation.requestLocalityType = requestLocalityType;
return allocation;
}
}
// Off-switch
PendingAsk offSwitchAsk =
application.getPendingAsk(schedulerKey, ResourceRequest.ANY);
if (offSwitchAsk.getCount() > 0) {
if (!appInfo.canDelayTo(schedulerKey, ResourceRequest.ANY)) {
ActivitiesLogger.APP.recordSkippedAppActivityWithoutAllocation(
activitiesManager, node, application, priority,
ActivityDiagnosticConstant.SKIP_PRIORITY_BECAUSE_OF_RELAX_LOCALITY);
return ContainerAllocation.PRIORITY_SKIPPED;
}
requestLocalityType = requestLocalityType == null ?
NodeType.OFF_SWITCH :
requestLocalityType;
allocation =
assignOffSwitchContainers(clusterResource, offSwitchAsk,
node, schedulerKey, reservedContainer, schedulingMode,
currentResoureLimits);
// If the returned allocation is LOCALITY_SKIPPED, then since we are
// already at the off-switch request, skip this app for this priority.
if (allocation.state == AllocationState.LOCALITY_SKIPPED) {
allocation.state = AllocationState.APP_SKIPPED;
}
allocation.requestLocalityType = requestLocalityType;
return allocation;
}
ActivitiesLogger.APP.recordSkippedAppActivityWithoutAllocation(
activitiesManager, node, application, priority,
ActivityDiagnosticConstant.PRIORITY_SKIPPED);
return ContainerAllocation.PRIORITY_SKIPPED;
}
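/**
* Try to place a single container for the given pending ask on this node:
* allocate it if the node has space (possibly after unreserving or marking
* killable containers), otherwise reserve it when allowed, otherwise skip
* this locality level.
*/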
private ContainerAllocation assignContainer(Resource clusterResource,
FiCaSchedulerNode node, SchedulerRequestKey schedulerKey,
PendingAsk pendingAsk, NodeType type, RMContainer rmContainer,
SchedulingMode schedulingMode, ResourceLimits currentResoureLimits) {
Priority priority = schedulerKey.getPriority();
if (LOG.isDebugEnabled()) {
LOG.debug("assignContainers: node=" + node.getNodeName()
+ " application=" + application.getApplicationId()
+ " priority=" + schedulerKey.getPriority()
+ " pendingAsk=" + pendingAsk + " type=" + type);
}
Resource capability = pendingAsk.getPerAllocationResource();
Resource available = node.getUnallocatedResource();
Resource totalResource = node.getTotalResource();
if (!Resources.lessThanOrEqual(rc, clusterResource,
capability, totalResource)) {
LOG.warn("Node : " + node.getNodeID()
+ " does not have sufficient resource for ask : " + pendingAsk
+ " node total capability : " + node.getTotalResource());
// Skip this locality request
ActivitiesLogger.APP.recordSkippedAppActivityWithoutAllocation(
activitiesManager, node, application, priority,
ActivityDiagnosticConstant.NOT_SUFFICIENT_RESOURCE);
return ContainerAllocation.LOCALITY_SKIPPED;
}
boolean shouldAllocOrReserveNewContainer = shouldAllocOrReserveNewContainer(
schedulerKey, capability);
// Can we allocate a container on this node?
long availableContainers =
rc.computeAvailableContainers(available, capability);
// The amount we need to unreserve equals:
// max(required - headroom, amountNeededUnreserve)
Resource resourceNeedToUnReserve =
Resources.max(rc, clusterResource,
Resources.subtract(capability, currentResoureLimits.getHeadroom()),
currentResoureLimits.getAmountNeededUnreserve());
boolean needToUnreserve =
Resources.greaterThan(rc, clusterResource,
resourceNeedToUnReserve, Resources.none());
RMContainer unreservedContainer = null;
boolean reservationsContinueLooking =
application.getCSLeafQueue().getReservationContinueLooking();
// Check if we need to kill some containers to allocate this one
List<RMContainer> toKillContainers = null;
if (availableContainers == 0 && currentResoureLimits.isAllowPreemption()) {
Resource availableAndKillable = Resources.clone(available);
for (RMContainer killableContainer : node
.getKillableContainers().values()) {
if (null == toKillContainers) {
toKillContainers = new ArrayList<>();
}
toKillContainers.add(killableContainer);
Resources.addTo(availableAndKillable,
killableContainer.getAllocatedResource());
if (Resources.fitsIn(rc,
clusterResource,
capability,
availableAndKillable)) {
// Stop once we have found enough space.
availableContainers = 1;
break;
}
}
}
if (availableContainers > 0) {
// Allocate...
// We only do reservation continue-looking when this allocation is not
// from a reserved container.
if (rmContainer == null && reservationsContinueLooking
&& node.getLabels().isEmpty()) {
// when reservationsContinueLooking is set, we may need to unreserve
// some containers to meet this queue, its parents', or the users'
// resource limits.
// TODO: this needs to change when we want to support reservation
// continue-looking for labeled partitions.
if (!shouldAllocOrReserveNewContainer || needToUnreserve) {
if (!needToUnreserve) {
// If we shouldn't allocate/reserve a new container, then we should
// unreserve one of the same size we are asking for, since
// currentResoureLimits.getAmountNeededUnreserve() could be zero. If
// the limit was hit, use the amount we need to unreserve to get back
// under the limit.
resourceNeedToUnReserve = capability;
}
unreservedContainer =
application.findNodeToUnreserve(clusterResource, node,
schedulerKey, resourceNeedToUnReserve);
// Here either minimum-unreserved-resource > 0 OR we cannot allocate a
// new/reserved container, which means we *have to* unreserve some
// resource to continue. If we failed to unreserve any resource, we
// can't continue.
if (null == unreservedContainer) {
// Skip the locality request
ActivitiesLogger.APP.recordSkippedAppActivityWithoutAllocation(
activitiesManager, node, application, priority,
ActivityDiagnosticConstant.LOCALITY_SKIPPED);
return ContainerAllocation.LOCALITY_SKIPPED;
}
}
}
ContainerAllocation result = new ContainerAllocation(unreservedContainer,
pendingAsk.getPerAllocationResource(), AllocationState.ALLOCATED);
result.containerNodeType = type;
result.setToKillContainers(toKillContainers);
return result;
} else {
// If we are allowed to allocate but this node doesn't have enough space,
// reserve it; or if this was already a reserved container, reserve it again.
if (shouldAllocOrReserveNewContainer || rmContainer != null) {
if (reservationsContinueLooking && rmContainer == null) {
// We could possibly be ignoring queue capacity or user limits when
// reservationsContinueLooking is set. Make sure we don't need to
// unreserve first.
if (needToUnreserve) {
if (LOG.isDebugEnabled()) {
LOG.debug("we needed to unreserve to be able to allocate");
}
// Skip the locality request
ActivitiesLogger.APP.recordSkippedAppActivityWithoutAllocation(
activitiesManager, node, application, priority,
ActivityDiagnosticConstant.LOCALITY_SKIPPED);
return ContainerAllocation.LOCALITY_SKIPPED;
}
}
ContainerAllocation result = new ContainerAllocation(null,
pendingAsk.getPerAllocationResource(), AllocationState.RESERVED);
result.containerNodeType = type;
result.setToKillContainers(null);
return result;
}
// Skip the locality request
ActivitiesLogger.APP.recordSkippedAppActivityWithoutAllocation(
activitiesManager, node, application, priority,
ActivityDiagnosticConstant.LOCALITY_SKIPPED);
return ContainerAllocation.LOCALITY_SKIPPED;
}
}
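/**
* Decide whether a new container should be allocated or reserved for this
* schedulerKey. Re-reservations add a "starvation" bonus so that repeatedly
* re-reserved requests eventually get another chance to allocate or reserve.
* For example (illustrative numbers): with 4 re-reservations, 2 reserved
* containers, nodeFactor = 0.5 and minimumAllocationFactor = 0.25,
* starvation = (int) ((4 / 2.0) * (1.0 - 0.25)) = 1.
*/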
boolean shouldAllocOrReserveNewContainer(
SchedulerRequestKey schedulerKey, Resource required) {
int requiredContainers =
application.getOutstandingAsksCount(schedulerKey);
int reservedContainers = application.getNumReservedContainers(schedulerKey);
int starvation = 0;
if (reservedContainers > 0) {
float nodeFactor = Resources.ratio(
rc, required, application.getCSLeafQueue().getMaximumAllocation());
// Use percentage of node required to bias against large containers...
// Protect against corner case where you need the whole node with
// Math.min(nodeFactor, minimumAllocationFactor)
starvation =
(int) ((application.getReReservations(schedulerKey) /
(float) reservedContainers) * (1.0f - (Math.min(
nodeFactor, application.getCSLeafQueue()
.getMinimumAllocationFactor()))));
if (LOG.isDebugEnabled()) {
LOG.debug("needsContainers:" + " app.#re-reserve="
+ application.getReReservations(schedulerKey) + " reserved="
+ reservedContainers + " nodeFactor=" + nodeFactor
+ " minAllocFactor="
+ application.getCSLeafQueue().getMinimumAllocationFactor()
+ " starvation=" + starvation);
}
}
return (((starvation + requiredContainers) - reservedContainers) > 0);
}
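/** Reuse the reserved container's record if present, otherwise create a new one. */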
private Container getContainer(RMContainer rmContainer,
FiCaSchedulerNode node, Resource capability,
SchedulerRequestKey schedulerKey) {
return (rmContainer != null) ? rmContainer.getContainer()
: createContainer(node, capability, schedulerKey);
}
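/**
* Build a new Container for this node and ask; the containerId is left null
* until the allocation is confirmed (see the comment in the method body).
*/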
private Container createContainer(FiCaSchedulerNode node, Resource capability,
SchedulerRequestKey schedulerKey) {
NodeId nodeId = node.getRMNode().getNodeID();
// Create the container.
// Set the containerId to null for now, because the container may be
// rejected due to concurrent resource allocation. A new containerId will
// be generated and assigned to the container once it is confirmed.
return BuilderUtils.newContainer(null, nodeId,
node.getRMNode().getHttpAddress(), capability,
schedulerKey.getPriority(), null,
schedulerKey.getAllocationRequestId());
}
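/**
* Inform the application of the new allocation; if the application no longer
* needs the resource, roll the result back to APP_SKIPPED and record the
* rejection.
*/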
private ContainerAllocation handleNewContainerAllocation(
ContainerAllocation allocationResult, FiCaSchedulerNode node,
SchedulerRequestKey schedulerKey, Container container) {
// Inform the application
RMContainer allocatedContainer = application.allocate(node, schedulerKey,
container);
allocationResult.updatedContainer = allocatedContainer;
// Does the application need this resource?
if (allocatedContainer == null) {
// Skip this app if we failed to allocate.
ContainerAllocation ret =
new ContainerAllocation(allocationResult.containerToBeUnreserved,
null, AllocationState.APP_SKIPPED);
ActivitiesLogger.APP.recordAppActivityWithoutAllocation(activitiesManager,
node, application, schedulerKey.getPriority(),
ActivityDiagnosticConstant.FAIL_TO_ALLOCATE, ActivityState.REJECTED);
return ret;
}
return allocationResult;
}
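/**
* Finalize an ALLOCATED or RESERVED result: create or reuse the container,
* update the application, and reset scheduling opportunities where
* appropriate so that locality delays behave correctly for future requests.
*/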
ContainerAllocation doAllocation(ContainerAllocation allocationResult,
FiCaSchedulerNode node, SchedulerRequestKey schedulerKey,
RMContainer reservedContainer) {
// Create the container if necessary
Container container =
getContainer(reservedContainer, node,
allocationResult.getResourceToBeAllocated(), schedulerKey);
// something went wrong getting/creating the container
if (container == null) {
application
.updateAppSkipNodeDiagnostics("Scheduling of container failed. ");
LOG.warn("Couldn't get container for allocation!");
ActivitiesLogger.APP.recordAppActivityWithoutAllocation(activitiesManager,
node, application, schedulerKey.getPriority(),
ActivityDiagnosticConstant.COULD_NOT_GET_CONTAINER,
ActivityState.REJECTED);
return ContainerAllocation.APP_SKIPPED;
}
if (allocationResult.getAllocationState() == AllocationState.ALLOCATED) {
// When allocating container
allocationResult = handleNewContainerAllocation(allocationResult, node,
schedulerKey, container);
} else {
// When reserving container
RMContainer updatedContainer = reservedContainer;
if (updatedContainer == null) {
SchedulingPlacementSet<FiCaSchedulerNode> ps =
application.getAppSchedulingInfo()
.getSchedulingPlacementSet(schedulerKey);
if (null == ps) {
LOG.warn("Failed to get " + SchedulingPlacementSet.class.getName()
+ " for application=" + application.getApplicationId()
+ " schedulerRequestKey=" + schedulerKey);
ActivitiesLogger.APP
.recordAppActivityWithoutAllocation(activitiesManager, node,
application, schedulerKey.getPriority(),
ActivityDiagnosticConstant.
PRIORITY_SKIPPED_BECAUSE_NULL_ANY_REQUEST,
ActivityState.REJECTED);
return ContainerAllocation.PRIORITY_SKIPPED;
}
updatedContainer = new RMContainerImpl(container, schedulerKey,
application.getApplicationAttemptId(), node.getNodeID(),
application.getAppSchedulingInfo().getUser(), rmContext,
ps.getPrimaryRequestedNodePartition());
}
allocationResult.updatedContainer = updatedContainer;
}
// Only reset opportunities when we FIRST allocate the container. (In other
// words, when reservedContainer != null, it's not the first time.)
if (reservedContainer == null) {
// Don't reset scheduling opportunities for off-switch assignments
// otherwise the app will be delayed for each non-local assignment.
// This helps apps with many off-cluster requests schedule faster.
if (allocationResult.containerNodeType != NodeType.OFF_SWITCH) {
if (LOG.isDebugEnabled()) {
LOG.debug("Resetting scheduling opportunities");
}
// Only reset scheduling opportunities for RACK_LOCAL if configured
// to do so. Not resetting means we will continue to schedule
// RACK_LOCAL without delay.
if (allocationResult.containerNodeType == NodeType.NODE_LOCAL
|| application.getCSLeafQueue().getRackLocalityFullReset()) {
application.resetSchedulingOpportunities(schedulerKey);
}
}
// The non-exclusive scheduling opportunity is different: we need to reset
// it when the container is allocated on the default partition.
//
// This makes sure non-labeled resource requests are most likely to be
// allocated on non-labeled nodes first.
if (StringUtils.equals(node.getPartition(),
RMNodeLabelsManager.NO_LABEL)) {
application
.resetMissedNonPartitionedRequestSchedulingOpportunity(schedulerKey);
}
}
return allocationResult;
}
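/**
* Allocate for one schedulerKey over the preferred nodes of its placement
* set: run the pre-checks, then try nodes in order until an allocation or
* reservation succeeds.
*/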
private ContainerAllocation allocate(Resource clusterResource,
PlacementSet<FiCaSchedulerNode> ps, SchedulingMode schedulingMode,
ResourceLimits resourceLimits, SchedulerRequestKey schedulerKey,
RMContainer reservedContainer) {
// Do checks before determining which node to allocate on.
// Return directly if any check fails.
ContainerAllocation result;
if (reservedContainer == null) {
result = preCheckForPlacementSet(clusterResource, ps, schedulingMode,
resourceLimits, schedulerKey);
if (null != result) {
return result;
}
} else {
// pre-check when allocating reserved container
if (application.getOutstandingAsksCount(schedulerKey) == 0) {
// Release
return new ContainerAllocation(reservedContainer, null,
AllocationState.QUEUE_SKIPPED);
}
}
SchedulingPlacementSet<FiCaSchedulerNode> schedulingPS =
application.getAppSchedulingInfo().getSchedulingPlacementSet(
schedulerKey);
result = ContainerAllocation.PRIORITY_SKIPPED;
Iterator<FiCaSchedulerNode> iter = schedulingPS.getPreferredNodeIterator(
ps);
while (iter.hasNext()) {
FiCaSchedulerNode node = iter.next();
result = tryAllocateOnNode(clusterResource, node, schedulingMode,
resourceLimits, schedulerKey, reservedContainer);
if (AllocationState.ALLOCATED == result.state
|| AllocationState.RESERVED == result.state) {
result = doAllocation(result, node, schedulerKey, reservedContainer);
break;
}
}
return result;
}
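/**
* Entry point from the queue: either try a fresh assignment over the
* application's scheduler keys in priority order, or re-attempt the
* allocation of an existing reserved container.
*/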
@Override
public CSAssignment assignContainers(Resource clusterResource,
PlacementSet<FiCaSchedulerNode> ps, SchedulingMode schedulingMode,
ResourceLimits resourceLimits,
RMContainer reservedContainer) {
FiCaSchedulerNode node = PlacementSetUtils.getSingleNode(ps);
if (reservedContainer == null) {
// Check if the application needs more resources; skip it if it doesn't.
if (!application.hasPendingResourceRequest(rc,
ps.getPartition(), clusterResource, schedulingMode)) {
if (LOG.isDebugEnabled()) {
LOG.debug("Skip app_attempt=" + application.getApplicationAttemptId()
+ ", because it doesn't need more resource, schedulingMode="
+ schedulingMode.name() + " node-label=" + ps.getPartition());
}
ActivitiesLogger.APP.recordSkippedAppActivityWithoutAllocation(
activitiesManager, node, application, application.getPriority(),
ActivityDiagnosticConstant.APPLICATION_DO_NOT_NEED_RESOURCE);
return CSAssignment.SKIP_ASSIGNMENT;
}
// Schedule in priority order
for (SchedulerRequestKey schedulerKey : application.getSchedulerKeys()) {
ContainerAllocation result =
allocate(clusterResource, ps, schedulingMode, resourceLimits,
schedulerKey, null);
AllocationState allocationState = result.getAllocationState();
if (allocationState == AllocationState.PRIORITY_SKIPPED) {
continue;
}
return getCSAssignmentFromAllocateResult(clusterResource, result,
null, node);
}
// We will reach here if we skipped all priorities of the app, so we will
// skip the app.
ActivitiesLogger.APP.recordSkippedAppActivityWithoutAllocation(
activitiesManager, node, application, application.getPriority(),
ActivityDiagnosticConstant.SKIPPED_ALL_PRIORITIES);
return CSAssignment.SKIP_ASSIGNMENT;
} else {
ContainerAllocation result =
allocate(clusterResource, ps, schedulingMode, resourceLimits,
reservedContainer.getReservedSchedulerKey(), reservedContainer);
return getCSAssignmentFromAllocateResult(clusterResource, result,
reservedContainer, node);
}
}
}