blob: 572fb3d5773c96ea10ba846b74eb0c7573ab7a8d [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.Set;
import java.util.TreeSet;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.IntraQueueCandidatesSelector.TAFairOrderingComparator;
import org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.IntraQueueCandidatesSelector.TAPriorityComparator;
import org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.IntraQueuePreemptionOrderPolicy;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.FairOrderingPolicy;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.OrderingPolicy;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.apache.hadoop.yarn.util.resource.Resources;
/**
* FifoIntraQueuePreemptionPlugin will handle intra-queue preemption for
* priority and user-limit.
*/
public class FifoIntraQueuePreemptionPlugin
implements
IntraQueuePreemptionComputePlugin {
protected final CapacitySchedulerPreemptionContext context;
protected final ResourceCalculator rc;
private static final Log LOG =
LogFactory.getLog(FifoIntraQueuePreemptionPlugin.class);
/**
 * Creates the plugin and stores its two collaborators.
 *
 * @param rc resource calculator used for every resource comparison made
 *          by this plugin
 * @param preemptionContext preemption context this plugin queries for temp
 *          queues and for the intra-queue preemption order policy
 */
public FifoIntraQueuePreemptionPlugin(ResourceCalculator rc,
    CapacitySchedulerPreemptionContext preemptionContext) {
  this.rc = rc;
  this.context = preemptionContext;
}
/**
 * Returns the applications of the given queue/partition that have a
 * non-empty "actually to be preempted" resource.
 *
 * @param queueName name of the queue to inspect
 * @param partition node partition to inspect
 * @return apps whose actuallyToBePreempted differs from
 *         {@code Resources.none()}, in the iteration order of the temp
 *         queue's app collection
 */
@Override
public Collection<FiCaSchedulerApp> getPreemptableApps(String queueName,
    String partition) {
  TempQueuePerPartition tempQueue = context.getQueueByPartition(queueName,
      partition);
  List<FiCaSchedulerApp> preemptable = new ArrayList<FiCaSchedulerApp>();
  for (TempAppPerPartition candidate : tempQueue.getApps()) {
    // Apps that were not selected to give anything up are left out of the
    // preemption candidate selection.
    boolean nothingSelected = Resources.equals(
        candidate.getActuallyToBePreempted(), Resources.none());
    if (!nothingSelected) {
      preemptable.add(candidate.app);
    }
  }
  return preemptable;
}
/**
 * Sums the "actually to be preempted" resource of every app in the given
 * queue/partition.
 *
 * @param queueName name of the queue to inspect
 * @param partition node partition to inspect
 * @return a newly created map holding a single entry: {@code partition}
 *         mapped to the summed resource
 */
@Override
public Map<String, Resource> getResourceDemandFromAppsPerQueue(
    String queueName, String partition) {
  Map<String, Resource> resToObtainByPartition = new HashMap<>();
  TempQueuePerPartition tq = context
      .getQueueByPartition(queueName, partition);

  // The result map is created above and is therefore always empty here, so
  // the accumulator is registered directly instead of being looked up and
  // null-checked first.
  Resource actualPreemptNeeded = Resources.createResource(0, 0);
  resToObtainByPartition.put(partition, actualPreemptNeeded);

  for (TempAppPerPartition tmpApp : tq.getApps()) {
    Resources.addTo(actualPreemptNeeded, tmpApp.getActuallyToBePreempted());
  }

  if (LOG.isDebugEnabled()) {
    LOG.debug("Selected to preempt " + actualPreemptNeeded
        + " resource from partition:" + partition);
  }
  return resToObtainByPartition;
}
/**
 * Drives the intra-queue computation for one temp queue: builds the temp
 * app structures, computes each app's ideal assignment and to-be-preempted
 * resource, stores the apps on the temp queue and finally filters the
 * preemption demand.
 *
 * @param clusterResource total cluster resource, used for comparisons
 * @param tq temp queue (per partition) being processed; its leaf queue is
 *          expected to be non-null (validated by the caller)
 * @param selectedCandidates containers already selected for preemption,
 *          keyed by application attempt
 * @param totalPreemptedResourceAllowed per-round preemption limit
 * @param queueReassignableResource resource that may be redistributed
 *          inside the queue; it is modified in place by this method
 * @param maxAllowablePreemptLimit factor applied to the queue's guaranteed
 *          resource to obtain the intra-queue preemption limit
 */
@Override
public void computeAppsIdealAllocation(Resource clusterResource,
    TempQueuePerPartition tq,
    Map<ApplicationAttemptId, Set<RMContainer>> selectedCandidates,
    Resource totalPreemptedResourceAllowed,
    Resource queueReassignableResource, float maxAllowablePreemptLimit) {

  // Step 1: resources held by AM containers are treated as frozen, so they
  // are taken out of what the queue can hand around.
  Map<String, Resource> amUsedByUser = new HashMap<String, Resource>();
  Resources.subtractFrom(queueReassignableResource,
      calculateUsedAMResourcesPerQueue(tq.partition, tq.leafQueue,
          amUsedByUser));

  // Step 2: a queue running exactly one app has nothing to preempt.
  Collection<FiCaSchedulerApp> allApps = tq.leafQueue.getAllApplications();
  if (allApps.size() == 1) {
    return;
  }

  // Step 3: wrap every app in a temp structure, ordered from high to low
  // priority.
  PriorityQueue<TempAppPerPartition> highToLow =
      createTempAppForResCalculation(tq, allApps, clusterResource,
          amUsedByUser);

  // Step 4: hand out idealAssigned per app against the queue's
  // reassignable resource; the result is ordered from low to high
  // priority.
  TreeSet<TempAppPerPartition> lowToHigh =
      calculateIdealAssignedResourcePerApp(clusterResource, tq,
          selectedCandidates, queueReassignableResource, highToLow);

  // Step 5: the configurable intra-queue limit is a fraction of the
  // queue's guaranteed resource, reduced by what is already marked for
  // preemption on this queue.
  Resource intraQueueLimit = Resources.multiply(tq.getGuaranteed(),
      maxAllowablePreemptLimit);
  boolean limitExceedsMarked = Resources.greaterThan(rc, clusterResource,
      intraQueueLimit, tq.getActuallyToBePreempted());
  if (!limitExceedsMarked) {
    intraQueueLimit = Resource.newInstance(0, 0);
  } else {
    Resources.subtractFrom(intraQueueLimit, tq.getActuallyToBePreempted());
  }

  // Step 6: the effective limit is the smaller of the intra-queue limit
  // and the per-round limit.
  Resource effectiveLimit = Resources.min(rc, clusterResource,
      intraQueueLimit, totalPreemptedResourceAllowed);

  // Step 7: starting from the lowest priority app, compute toBePreempted
  // per app. A clone is passed because the callee consumes the limit.
  calculateToBePreemptedResourcePerApp(clusterResource, lowToHigh,
      Resources.clone(effectiveLimit));

  // Keep the apps (low to high) on the temp queue for later reference.
  tq.addAllApps(lowToHigh);

  // Step 8: demand coming from the same priority level must not trigger
  // preemption; filter such cases out.
  validateOutSameAppPriorityFromDemand(clusterResource, lowToHigh,
      tq.getUsersPerPartition(),
      context.getIntraQueuePreemptionOrderPolicy());

  if (LOG.isDebugEnabled()) {
    LOG.debug("Queue Name:" + tq.queueName + ", partition:" + tq.partition);
    for (TempAppPerPartition loggedApp : tq.getApps()) {
      LOG.debug(loggedApp);
    }
  }
}
/**
 * Sets {@code toBePreempted} on each app, visiting the apps in the
 * iteration order of {@code orderedApps} and consuming
 * {@code preemptionLimit} as it goes.
 *
 * Per app:
 * toBePreempted = min(max(surplus, 0), remaining limit), where surplus is
 * used - idealAssigned, further reduced (never below zero) by the app's
 * selected resource, its AM resource and, under USERLIMIT_FIRST, the leaf
 * queue's minimum allocation.
 *
 * @param clusterResource total cluster resource, used for comparisons
 * @param orderedApps apps to process
 * @param preemptionLimit remaining preemption budget; modified by this
 *          method
 */
private void calculateToBePreemptedResourcePerApp(Resource clusterResource,
    TreeSet<TempAppPerPartition> orderedApps, Resource preemptionLimit) {
  for (TempAppPerPartition candidate : orderedApps) {
    // Budget exhausted: nothing more can be taken from this app.
    if (Resources.lessThanOrEqual(rc, clusterResource, preemptionLimit,
        Resources.none())) {
      continue;
    }
    // The app uses nothing, so there is nothing to take.
    if (Resources.lessThanOrEqual(rc, clusterResource, candidate.getUsed(),
        Resources.none())) {
      continue;
    }

    Resource surplus = Resources.subtract(candidate.getUsed(),
        candidate.idealAssigned);
    Resources.subtractFromNonNegative(surplus, candidate.selected);
    Resources.subtractFromNonNegative(surplus, candidate.getAMUsed());

    IntraQueuePreemptionOrderPolicy orderPolicy =
        context.getIntraQueuePreemptionOrderPolicy();
    if (orderPolicy.equals(IntraQueuePreemptionOrderPolicy.USERLIMIT_FIRST)) {
      Resources.subtractFromNonNegative(surplus, candidate
          .getFiCaSchedulerApp().getCSLeafQueue().getMinimumAllocation());
    }

    // Clamp the surplus at zero, then cap it by the remaining budget. The
    // budget is cloned so the stored result does not alias the running
    // limit that is reduced right after.
    Resource clampedSurplus = Resources.max(rc, clusterResource, surplus,
        Resources.none());
    candidate.toBePreempted = Resources.min(rc, clusterResource,
        clampedSurplus, Resources.clone(preemptionLimit));

    preemptionLimit = Resources.subtractFromNonNegative(preemptionLimit,
        candidate.toBePreempted);
  }
}
/**
 * Computes {@code idealAssigned} for every app and returns the apps in the
 * reverse of the order used by {@code orderedByPriority}.
 *
 * <pre>
 * for app in orderedByPriority (drained head first):
 *   add app to the result set
 *   if Q.reassignable &lt;= 0 or any major resource of it is zero:
 *     skip (the app keeps its current idealAssigned)
 *   compute app.selected from the already selected containers
 *   demand = app.usedDeductAM + app.pending - app.selected
 *   if user.idealAssigned &lt; userLimit:
 *     ideal = min(demand, userLimit - user.idealAssigned)
 *     app.idealAssigned = min(Q.reassignable, ideal)
 *     user.idealAssigned += app.idealAssigned
 *   else:
 *     skip (user-limit reached)
 *   if app.idealAssigned &gt; app.usedDeductAM - app.selected:
 *     app.toBePreemptFromOther = the difference
 *   Q.reassignable -= app.idealAssigned   (never below zero)
 * </pre>
 *
 * @param clusterResource Cluster Resource
 * @param tq TempQueue
 * @param selectedCandidates Already Selected preemption candidates
 * @param queueReassignableResource Resource available for redistribution
 *          in the queue; reduced in place
 * @param orderedByPriority running apps; emptied by this method
 * @return temp apps ordered from low to high priority
 */
private TreeSet<TempAppPerPartition> calculateIdealAssignedResourcePerApp(
    Resource clusterResource, TempQueuePerPartition tq,
    Map<ApplicationAttemptId, Set<RMContainer>> selectedCandidates,
    Resource queueReassignableResource,
    PriorityQueue<TempAppPerPartition> orderedByPriority) {

  // Pick the forward comparator for this queue, then reverse it once so
  // the resulting set iterates in the opposite order.
  OrderingPolicy<FiCaSchedulerApp> queueOrderingPolicy =
      tq.leafQueue.getOrderingPolicy();
  Comparator<TempAppPerPartition> forwardComp;
  if (queueOrderingPolicy instanceof FairOrderingPolicy
      && (context.getIntraQueuePreemptionOrderPolicy()
          == IntraQueuePreemptionOrderPolicy.USERLIMIT_FIRST)) {
    forwardComp = new TAFairOrderingComparator(this.rc, clusterResource);
  } else {
    forwardComp = new TAPriorityComparator();
  }
  Comparator<TempAppPerPartition> reverseComp =
      Collections.reverseOrder(forwardComp);
  TreeSet<TempAppPerPartition> orderedApps = new TreeSet<>(reverseComp);

  String partition = tq.partition;
  Map<String, TempUserPerPartition> users = tq.getUsersPerPartition();

  while (!orderedByPriority.isEmpty()) {
    // Take the head of the queue and record it in the result regardless of
    // whether it receives an ideal assignment below.
    TempAppPerPartition current = orderedByPriority.remove();
    orderedApps.add(current);

    // Once the queue has nothing left to hand out, remaining apps are only
    // collected, not assigned.
    boolean nothingLeft = Resources.lessThanOrEqual(rc, clusterResource,
        queueReassignableResource, Resources.none())
        || Resources.isAnyMajorResourceZero(rc, queueReassignableResource);
    if (nothingLeft) {
      continue;
    }

    TempUserPerPartition owner = users.get(current.app.getUser());
    Resource ownerLimit = owner.getUserLimit();
    Resource ownerIdeal = owner.idealAssigned;

    // Total resource of this app's containers that earlier policies have
    // already selected.
    getAlreadySelectedPreemptionCandidatesResource(selectedCandidates,
        current, owner, partition);

    // used + pending is what the app would ideally hold; what it finally
    // gets is bounded by the queue's remaining quota, so lower priority
    // apps may end up with nothing.
    Resource demand = Resources.add(current.getUsedDeductAM(),
        current.getPending());
    Resources.subtractFrom(demand, current.selected);

    // The user already holds its full limit: leave this app untouched.
    if (!Resources.lessThan(rc, clusterResource, ownerIdeal, ownerLimit)) {
      continue;
    }

    Resource withinUserLimit = Resources.min(rc, clusterResource, demand,
        Resources.subtract(ownerLimit, ownerIdeal));
    current.idealAssigned = Resources.clone(Resources.min(rc,
        clusterResource, queueReassignableResource, withinUserLimit));
    Resources.addTo(ownerIdeal, current.idealAssigned);

    // Record how much this app still needs to obtain from other apps.
    Resource usedMinusSelected = Resources
        .subtract(current.getUsedDeductAM(), current.selected);
    if (Resources.greaterThan(rc, clusterResource, current.idealAssigned,
        usedMinusSelected)) {
      current.setToBePreemptFromOther(
          Resources.subtract(current.idealAssigned, usedMinusSelected));
    }

    Resources.subtractFromNonNegative(queueReassignableResource,
        current.idealAssigned);
  }
  return orderedApps;
}
/*
 * Earlier policies may already have selected some containers of this
 * application. Resets tmpApp.selected to zero and then adds up the
 * allocated resource of those selected containers whose node label
 * expression equals the given partition. The same amounts are also added
 * to tmpUser.selected, which is not reset here.
 */
private void getAlreadySelectedPreemptionCandidatesResource(
    Map<ApplicationAttemptId, Set<RMContainer>> selectedCandidates,
    TempAppPerPartition tmpApp, TempUserPerPartition tmpUser,
    String partition) {
  tmpApp.selected = Resources.createResource(0, 0);

  Set<RMContainer> alreadySelected = selectedCandidates
      .get(tmpApp.app.getApplicationAttemptId());
  if (alreadySelected != null) {
    for (RMContainer container : alreadySelected) {
      // Containers of other partitions do not count.
      if (!partition.equals(container.getNodeLabelExpression())) {
        continue;
      }
      Resources.addTo(tmpApp.selected, container.getAllocatedResource());
      Resources.addTo(tmpUser.selected, container.getAllocatedResource());
    }
  }
}
/**
 * Wraps every given app in a {@code TempAppPerPartition} and, for each user
 * not yet known to the temp queue, creates and registers a
 * {@code TempUserPerPartition}.
 *
 * @param tq temp queue whose leaf queue, partition and per-user map are
 *          used; newly created temp users are registered on it
 * @param apps applications to wrap
 * @param clusterResource total cluster resource, used by the fair ordering
 *          comparator and for the user-limit lookup
 * @param perUserAMUsed AM resource used per user name; users missing from
 *          the map are treated as using none
 * @return priority queue of temp apps ordered by the fair ordering
 *         comparator when the leaf queue uses a fair ordering policy and
 *         the preemption order is USERLIMIT_FIRST, otherwise by the
 *         priority comparator
 */
private PriorityQueue<TempAppPerPartition> createTempAppForResCalculation(
    TempQueuePerPartition tq, Collection<FiCaSchedulerApp> apps,
    Resource clusterResource,
    Map<String, Resource> perUserAMUsed) {
  Comparator<TempAppPerPartition> taComparator;
  OrderingPolicy<FiCaSchedulerApp> orderingPolicy =
      tq.leafQueue.getOrderingPolicy();
  if (orderingPolicy instanceof FairOrderingPolicy
      && (context.getIntraQueuePreemptionOrderPolicy()
          == IntraQueuePreemptionOrderPolicy.USERLIMIT_FIRST)) {
    taComparator = new TAFairOrderingComparator(this.rc, clusterResource);
  } else {
    taComparator = new TAPriorityComparator();
  }
  PriorityQueue<TempAppPerPartition> orderedByPriority = new PriorityQueue<>(
      100, taComparator);

  String partition = tq.partition;
  Map<String, TempUserPerPartition> usersPerPartition = tq
      .getUsersPerPartition();

  // have an internal temp app structure to store intermediate data(priority)
  for (FiCaSchedulerApp app : apps) {
    Resource used = app.getAppAttemptResourceUsage().getUsed(partition);
    Resource amUsed = null;
    // An app still waiting for its AM container contributes no AM usage.
    if (!app.isWaitingForAMContainer()) {
      amUsed = app.getAMResource(partition);
    }
    Resource pending = app.getTotalPendingRequestsPerPartition()
        .get(partition);
    Resource reserved = app.getAppAttemptResourceUsage()
        .getReserved(partition);

    // Missing values are treated as zero resources.
    used = (used == null) ? Resources.createResource(0, 0) : used;
    amUsed = (amUsed == null) ? Resources.createResource(0, 0) : amUsed;
    pending = (pending == null) ? Resources.createResource(0, 0) : pending;
    reserved = (reserved == null) ? Resources.createResource(0, 0) : reserved;

    // Create TempAppPerQueue for further calculation. Clones are stored so
    // later arithmetic does not touch the app's own resource objects.
    TempAppPerPartition tmpApp = new TempAppPerPartition(app,
        Resources.clone(used), Resources.clone(amUsed),
        Resources.clone(reserved), Resources.clone(pending));

    // Set ideal allocation of app as 0.
    tmpApp.idealAssigned = Resources.createResource(0, 0);

    // Create a TempUserPerPartition structure to hold more information
    // regarding each user's entities such as UserLimit etc. This could
    // be kept in a user to TempUserPerPartition map for further reference.
    String userName = app.getUser();
    TempUserPerPartition tmpUser = usersPerPartition.get(userName);
    if (tmpUser == null) {
      ResourceUsage userResourceUsage = tq.leafQueue.getUser(userName)
          .getResourceUsage();

      // perUserAMUsed was populated with running apps, now we are looping
      // through both running and pending apps.
      Resource userSpecificAmUsed = perUserAMUsed.get(userName);
      Resource userAmUsed = (userSpecificAmUsed == null)
          ? Resources.none() : userSpecificAmUsed;

      tmpUser = new TempUserPerPartition(
          tq.leafQueue.getUser(userName), tq.queueName,
          Resources.clone(userResourceUsage.getUsed(partition)),
          Resources.clone(userAmUsed),
          Resources.clone(userResourceUsage.getReserved(partition)),
          Resources.none());

      Resource userLimitResource = Resources.clone(
          tq.leafQueue.getResourceLimitForAllUsers(userName, clusterResource,
              partition, SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY));

      // Real AM used need not have to be considered for user-limit as well.
      userLimitResource = Resources.subtract(userLimitResource,
          tmpUser.amUsed);
      tmpUser.setUserLimit(userLimitResource);

      if (LOG.isDebugEnabled()) {
        LOG.debug("TempUser:" + tmpUser);
      }

      tmpUser.idealAssigned = Resources.createResource(0, 0);
      tq.addUserPerPartition(userName, tmpUser);
    }
    tmpApp.setTempUserPerPartition(tmpUser);
    orderedByPriority.add(tmpApp);
  }
  return orderedByPriority;
}
/**
 * Fifo+Priority based preemption policy need not have to preempt resources at
 * same priority level. Such cases will be validated out. But if the demand is
 * from an app of different user, force to preempt resources even if apps are
 * at same priority.
 *
 * For every app that still has a positive toBePreemptFromOther, this method
 * walks the apps from array index 0 upwards and moves resource from a
 * candidate's (toBePreempted - actuallyToBePreempted) into the candidate's
 * actuallyToBePreempted, reducing the demanding app's toBePreemptFromOther
 * by the same amount.
 *
 * @param cluster total cluster resource, used for comparisons
 * @param orderedApps apps to examine; the caller in this class passes them
 *          ordered from low to high priority
 * @param usersPerPartition temp user structures keyed by user name
 * @param intraQueuePreemptionOrder configured intra-queue preemption order
 */
public void validateOutSameAppPriorityFromDemand(Resource cluster,
TreeSet<TempAppPerPartition> orderedApps,
Map<String, TempUserPerPartition> usersPerPartition,
IntraQueuePreemptionOrderPolicy intraQueuePreemptionOrder) {
// Copy the set into an array so apps can be addressed by index from both
// ends.
TempAppPerPartition[] apps = orderedApps
.toArray(new TempAppPerPartition[orderedApps.size()]);
if (apps.length <= 0) {
return;
}
// Outer scan: from the last array element down to the first.
for (int hPriority = apps.length - 1; hPriority >= 0; hPriority--) {
// Check whether high priority app with demand needs resource from other
// user.
if (Resources.greaterThan(rc, cluster,
apps[hPriority].getToBePreemptFromOther(), Resources.none())) {
// Given we have a demand from a high priority app, we can do a reverse
// scan from lower priority apps to select resources.
// Since idealAssigned of each app has considered user-limit, this logic
// will provide eventual consistency w.r.t user-limit as well.
for (int lPriority = 0; lPriority < apps.length; lPriority++) {
// Only apps that have something marked as toBePreempted are candidates.
if (Resources.greaterThan(rc, cluster, apps[lPriority].toBePreempted,
Resources.none())) {
// Same user: skip unless the candidate's priority is strictly lower
// than the demanding app's priority.
if ((apps[hPriority].getUser().equals(apps[lPriority].getUser()))
&& (apps[lPriority].getPriority() >= apps[hPriority]
.getPriority())) {
continue;
}
// Skip when the candidate's toBePreempted is already fully accounted
// for in actuallyToBePreempted, or when the demand has been satisfied.
if (Resources.lessThanOrEqual(rc, cluster,
apps[lPriority].toBePreempted,
apps[lPriority].getActuallyToBePreempted())
|| Resources.equals(apps[hPriority].getToBePreemptFromOther(),
Resources.none())) {
continue;
}
// Ideally if any application has a higher priority, then it can
// force to preempt any lower priority app from any user. However
// if admin enforces user-limit over priority, preemption module
// will not choose lower priority apps from users who have not yet
// reached their user-limit.
TempUserPerPartition tmpUser = usersPerPartition
.get(apps[lPriority].getUser());
if ((!apps[hPriority].getUser().equals(apps[lPriority].getUser()))
&& (!tmpUser.isUserLimitReached(rc, cluster))
&& (intraQueuePreemptionOrder
.equals(IntraQueuePreemptionOrderPolicy.USERLIMIT_FIRST))) {
continue;
}
Resource toPreemptFromOther = apps[hPriority]
.getToBePreemptFromOther();
Resource actuallyToPreempt = apps[lPriority]
.getActuallyToBePreempted();
// A lower priority app could offer more resource to preempt, if
// multiple higher priority/under served users needs resources.
// After one iteration, we need to ensure that actuallyToPreempt is
// subtracted from the resource to preempt.
Resource preemptableFromLowerPriorityApp = Resources
.subtract(apps[lPriority].toBePreempted, actuallyToPreempt);
// In case of user-limit preemption, when app's are from different
// user and of same priority, we will do user-limit preemption if
// there is a demand from under UL quota app.
// However this under UL quota app's demand may be more.
// Still we should ensure that we are not doing over preemption such
// that only a maximum of (user's used - UL quota) could be
// preempted.
if ((!apps[hPriority].getUser().equals(apps[lPriority].getUser()))
&& (apps[lPriority].getPriority() == apps[hPriority]
.getPriority())
&& tmpUser.isUserLimitReached(rc, cluster)) {
// deltaULQuota = user's used (without AM) - user's selected - user limit.
Resource deltaULQuota = Resources
.subtract(tmpUser.getUsedDeductAM(), tmpUser.selected);
Resources.subtractFrom(deltaULQuota, tmpUser.getUserLimit());
// Once the user's delta has been handed out, nothing more is offered.
if (tmpUser.isPreemptionQuotaForULDeltaDone()) {
deltaULQuota = Resources.createResource(0, 0);
}
// Cap the offer at the delta and remember that the delta is now used.
if (Resources.lessThan(rc, cluster, deltaULQuota,
preemptableFromLowerPriorityApp)) {
tmpUser.updatePreemptionQuotaForULDeltaAsDone(true);
preemptableFromLowerPriorityApp = deltaULQuota;
}
}
// Transfer min(demand, offer) from the demanding app's outstanding
// demand to the candidate's actuallyToBePreempted.
if (Resources.greaterThan(rc, cluster,
preemptableFromLowerPriorityApp, Resources.none())) {
Resource toPreempt = Resources.min(rc, cluster,
toPreemptFromOther, preemptableFromLowerPriorityApp);
apps[hPriority].setToBePreemptFromOther(
Resources.subtract(toPreemptFromOther, toPreempt));
apps[lPriority].setActuallyToBePreempted(
Resources.add(actuallyToPreempt, toPreempt));
}
}
}
}
}
}
/**
 * Adds up the AM resource of the leaf queue's applications for the given
 * partition, both in total and per user.
 *
 * @param partition partition whose AM resource is read from each app
 * @param leafQueue queue whose applications are scanned; the scan runs
 *          while holding the queue's monitor
 * @param perUserAMUsed output map from user name to that user's summed AM
 *          resource; entries are created on demand and existing entries
 *          are added to
 * @return the summed AM resource of all scanned applications
 */
private Resource calculateUsedAMResourcesPerQueue(String partition,
    LeafQueue leafQueue, Map<String, Resource> perUserAMUsed) {
  Collection<FiCaSchedulerApp> runningApps = leafQueue.getApplications();
  Resource queueAmTotal = Resources.createResource(0, 0);

  synchronized (leafQueue) {
    for (FiCaSchedulerApp running : runningApps) {
      String owner = running.getUser();

      // Fetch the user's running total, creating it on first sight.
      Resource ownerAmTotal = perUserAMUsed.get(owner);
      if (ownerAmTotal == null) {
        ownerAmTotal = Resources.createResource(0, 0);
        perUserAMUsed.put(owner, ownerAmTotal);
      }

      Resources.addTo(ownerAmTotal, running.getAMResource(partition));
      Resources.addTo(queueAmTotal, running.getAMResource(partition));
    }
  }
  return queueAmTotal;
}
/**
 * Decides whether container {@code c} must be skipped by preemption
 * because taking it would bring its user's usage to or below the
 * user-limit.
 *
 * The check applies only when the preemption order is USERLIMIT_FIRST; for
 * any other order the method returns {@code false} immediately, without
 * looking up the scheduler node, temp queue or temp user.
 *
 * @param app application owning the container
 * @param clusterResource total cluster resource, used for comparisons
 * @param usedResource resource usage that the container's allocation is
 *          subtracted from before comparing against the user-limit
 * @param c container being considered for preemption
 * @return {@code true} if the order is USERLIMIT_FIRST, the user is known
 *         to the temp queue and {@code usedResource} minus the container's
 *         allocation is less than or equal to the user-limit (with AM
 *         usage added back); {@code false} otherwise
 */
@Override
public boolean skipContainerBasedOnIntraQueuePolicy(FiCaSchedulerApp app,
    Resource clusterResource, Resource usedResource, RMContainer c) {
  // 1. This check must be done only when preemption order is
  // USERLIMIT_FIRST. Testing it first avoids the lookups below (and any
  // failure they could raise) when the result would be false anyway.
  if (context.getIntraQueuePreemptionOrderPolicy()
      != IntraQueuePreemptionOrderPolicy.USERLIMIT_FIRST) {
    return false;
  }

  // 2. By selecting container "c", check whether this user's resource usage
  // is going below its user-limit.
  // 3. Used resource of user must be always greater than user-limit to
  // skip some containers as per this check. If used resource is under user
  // limit, then these containers of this user has to be preempted as demand
  // might be due to high priority apps running in same user.
  String partition = context.getScheduler()
      .getSchedulerNode(c.getAllocatedNode()).getPartition();
  TempQueuePerPartition tq = context.getQueueByPartition(app.getQueueName(),
      partition);
  TempUserPerPartition tmpUser = tq.getUsersPerPartition().get(app.getUser());

  // Given user is not present, skip the check.
  if (tmpUser == null) {
    return false;
  }

  // For ideal resource computations, user-limit got saved by subtracting am
  // used resource in TempUser. Hence it has to be added back here for
  // complete check.
  Resource userLimit = Resources.add(tmpUser.getUserLimit(), tmpUser.amUsed);

  return Resources.lessThanOrEqual(rc, clusterResource,
      Resources.subtract(usedResource, c.getAllocatedResource()), userLimit);
}
}