blob: 3915b1e782dc2eb37c1b34942bffef11090f230f [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Sets;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authorize.AccessControlList;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.QueueACL;
import org.apache.hadoop.yarn.api.records.QueueInfo;
import org.apache.hadoop.yarn.api.records.QueueState;
import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.exceptions.InvalidResourceRequestException;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
import org.apache.hadoop.yarn.security.AccessType;
import org.apache.hadoop.yarn.server.resourcemanager.RMServerUtils;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedContainerChangeRequest;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt.AMState;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerHealth;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.KillableContainer;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.FifoOrderingPolicyForPendingApps;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.OrderingPolicy;
import org.apache.hadoop.yarn.server.utils.Lock;
import org.apache.hadoop.yarn.server.utils.Lock.NoLock;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.apache.hadoop.yarn.util.resource.Resources;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
@Private
@Unstable
public class LeafQueue extends AbstractCSQueue {
private static final Log LOG = LogFactory.getLog(LeafQueue.class);

// Fraction of the entire cluster's resource currently used by this queue.
private float absoluteUsedCapacity = 0.0f;

// Per-queue user-limit settings, re-read in setupQueueConfigs().
private int userLimit;
private float userLimitFactor;

// Caps on concurrently tracked applications (queue-wide and per-user),
// recomputed from configuration in setupQueueConfigs().
protected int maxApplications;
protected int maxApplicationsPerUser;

// Max fraction of the queue's capacity usable by application masters.
private float maxAMResourcePerQueuePercent;

// Locality-scheduling knobs; volatile because they are read without
// holding the queue lock.
private volatile int nodeLocalityDelay;
private volatile boolean rackLocalityFullReset;

// All attempts (pending and active) currently owned by this queue.
Map<ApplicationAttemptId, FiCaSchedulerApp> applicationAttemptMap =
new HashMap<ApplicationAttemptId, FiCaSchedulerApp>();

private Priority defaultAppPriorityPerQueue;

// Ordering of not-yet-activated applications (static FIFO policy,
// initialized once in the constructor).
private OrderingPolicy<FiCaSchedulerApp> pendingOrderingPolicy = null;

private volatile float minimumAllocationFactor;

// Per-user bookkeeping keyed by user name; entries are created lazily in
// getUser() and removed when a user's last application finishes.
private Map<String, User> users = new HashMap<String, User>();

private final RecordFactory recordFactory =
RecordFactoryProvider.getRecordFactory(null);
private CapacitySchedulerContext scheduler;
private final ActiveUsersManager activeUsersManager;

// cache last cluster resource to compute actual capacity
private Resource lastClusterResource = Resources.none();

private final QueueResourceLimitsInfo queueResourceLimitsInfo =
new QueueResourceLimitsInfo();
private volatile ResourceLimits cachedResourceLimitsForHeadroom = null;

// Ordering of activated (schedulable) applications.
private OrderingPolicy<FiCaSchedulerApp> orderingPolicy = null;

// Summation of consumed ratios for all users in queue
private float totalUserConsumedRatio = 0;
private UsageRatios qUsageRatios;

// record all ignore partition exclusivityRMContainer, this will be used to do
// preemption, key is the partition of the RMContainer allocated on
private Map<String, TreeSet<RMContainer>> ignorePartitionExclusivityRMContainers =
new HashMap<>();

// Snapshot of users with active applications plus the weighted active-user
// count; refreshed from ActiveUsersManager when it reports a change.
private Set<String> activeUsersSet =
Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>());
private float activeUsersTimesWeights = 0.0f;
@SuppressWarnings({ "unchecked", "rawtypes" })
/**
 * Builds a leaf queue from configuration.
 *
 * @param cs scheduler context supplying configuration and cluster state
 * @param queueName short name of this queue
 * @param parent parent queue in the hierarchy
 * @param old previous incarnation of this queue (on reinitialize), or null
 * @throws IOException if the queue configuration is invalid
 */
@SuppressWarnings({ "unchecked", "rawtypes" })
public LeafQueue(CapacitySchedulerContext cs,
    String queueName, CSQueue parent, CSQueue old) throws IOException {
  super(cs, queueName, parent, old);
  this.scheduler = cs;
  this.activeUsersManager = new ActiveUsersManager(metrics);
  // One time initialization is enough since it is static ordering policy
  this.pendingOrderingPolicy = new FifoOrderingPolicyForPendingApps();
  qUsageRatios = new UsageRatios();
  if(LOG.isDebugEnabled()) {
    LOG.debug("LeafQueue:" + " name=" + queueName
        + ", fullname=" + getQueuePath());
  }
  setupQueueConfigs(cs.getClusterResource());
}
/**
 * (Re-)reads all of this leaf queue's settings from the scheduler
 * configuration: ordering policy, user limits, application caps,
 * AM-resource percentage, locality delays, default app priority and user
 * weights. Called from the constructor and from reinitialize().
 *
 * Fix: the startup log line previously printed "asboluteCapacity"; the
 * field name is now spelled correctly ("absoluteCapacity").
 *
 * @param clusterResource current total cluster resource, cached for later
 *                        capacity/limit computations
 * @throws IOException if the default label expression or a user weight is
 *                     invalid
 */
protected synchronized void setupQueueConfigs(Resource clusterResource)
    throws IOException {
  super.setupQueueConfigs(clusterResource);
  this.lastClusterResource = clusterResource;
  this.cachedResourceLimitsForHeadroom = new ResourceLimits(clusterResource);
  // Initialize headroom info, also used for calculating application
  // master resource limits. Since this happens during queue initialization
  // and all queues may not be realized yet, we'll use (optimistic)
  // absoluteMaxCapacity (it will be replaced with the more accurate
  // absoluteMaxAvailCapacity during headroom/userlimit/allocation events)
  setQueueResourceLimitsInfo(clusterResource);
  CapacitySchedulerConfiguration conf = csContext.getConfiguration();
  setOrderingPolicy(conf.<FiCaSchedulerApp>getOrderingPolicy(getQueuePath()));
  userLimit = conf.getUserLimit(getQueuePath());
  userLimitFactor = conf.getUserLimitFactor(getQueuePath());
  maxApplications = conf.getMaximumApplicationsPerQueue(getQueuePath());
  if (maxApplications < 0) {
    // No explicit per-queue cap configured: derive one from the
    // system-wide cap scaled by this queue's absolute capacity.
    int maxSystemApps = conf.getMaximumSystemApplications();
    maxApplications =
        (int) (maxSystemApps * queueCapacities.getAbsoluteCapacity());
  }
  maxApplicationsPerUser = Math.min(maxApplications,
      (int)(maxApplications * (userLimit / 100.0f) * userLimitFactor));
  maxAMResourcePerQueuePercent =
      conf.getMaximumApplicationMasterResourcePerQueuePercent(getQueuePath());
  if (!SchedulerUtils.checkQueueLabelExpression(
      this.accessibleLabels, this.defaultLabelExpression, null)) {
    throw new IOException("Invalid default label expression of "
        + " queue="
        + getQueueName()
        + " doesn't have permission to access all labels "
        + "in default label expression. labelExpression of resource request="
        + (this.defaultLabelExpression == null ? ""
            : this.defaultLabelExpression)
        + ". Queue labels="
        + (getAccessibleNodeLabels() == null ? "" : StringUtils.join(
            getAccessibleNodeLabels().iterator(), ',')));
  }
  nodeLocalityDelay = conf.getNodeLocalityDelay();
  rackLocalityFullReset = conf.getRackLocalityFullReset();
  // re-init this since max allocation could have changed
  this.minimumAllocationFactor =
      Resources.ratio(resourceCalculator,
          Resources.subtract(maximumAllocation, minimumAllocation),
          maximumAllocation);
  StringBuilder aclsString = new StringBuilder();
  for (Map.Entry<AccessType, AccessControlList> e : acls.entrySet()) {
    aclsString.append(e.getKey() + ":" + e.getValue().getAclString());
  }
  StringBuilder labelStrBuilder = new StringBuilder();
  if (accessibleLabels != null) {
    for (String s : accessibleLabels) {
      labelStrBuilder.append(s);
      labelStrBuilder.append(",");
    }
  }
  defaultAppPriorityPerQueue = Priority.newInstance(conf
      .getDefaultApplicationPriorityConfPerQueue(getQueuePath()));
  // Validate leaf queue's user's weights.
  int queueUL = Math.min(100, conf.getUserLimit(getQueuePath()));
  for (Entry<String, Float> e : getUserWeights().entrySet()) {
    float val = e.getValue().floatValue();
    if (val < 0.0f || val > (100.0f / queueUL)) {
      throw new IOException("Weight (" + val + ") for user \"" + e.getKey()
          + "\" must be between 0 and" + " 100 / " + queueUL + " (= " +
          100.0f/queueUL + ", the number of concurrent active users in "
          + getQueuePath() + ")");
    }
  }
  updateUserWeights();
  LOG.info("Initializing " + queueName + "\n" +
      "capacity = " + queueCapacities.getCapacity() +
      " [= (float) configuredCapacity / 100 ]" + "\n" +
      "absoluteCapacity = " + queueCapacities.getAbsoluteCapacity() +
      " [= parentAbsoluteCapacity * capacity ]" + "\n" +
      "maxCapacity = " + queueCapacities.getMaximumCapacity() +
      " [= configuredMaxCapacity ]" + "\n" +
      "absoluteMaxCapacity = " + queueCapacities.getAbsoluteMaximumCapacity() +
      " [= 1.0 maximumCapacity undefined, " +
      "(parentAbsoluteMaxCapacity * maximumCapacity) / 100 otherwise ]" +
      "\n" +
      "userLimit = " + userLimit +
      " [= configuredUserLimit ]" + "\n" +
      "userLimitFactor = " + userLimitFactor +
      " [= configuredUserLimitFactor ]" + "\n" +
      "maxApplications = " + maxApplications +
      " [= configuredMaximumSystemApplicationsPerQueue or" +
      " (int)(configuredMaximumSystemApplications * absoluteCapacity)]" +
      "\n" +
      "maxApplicationsPerUser = " + maxApplicationsPerUser +
      " [= (int)(maxApplications * (userLimit / 100.0f) * " +
      "userLimitFactor) ]" + "\n" +
      "usedCapacity = " + queueCapacities.getUsedCapacity() +
      " [= usedResourcesMemory / " +
      "(clusterResourceMemory * absoluteCapacity)]" + "\n" +
      "absoluteUsedCapacity = " + absoluteUsedCapacity +
      " [= usedResourcesMemory / clusterResourceMemory]" + "\n" +
      "maxAMResourcePerQueuePercent = " + maxAMResourcePerQueuePercent +
      " [= configuredMaximumAMResourcePercent ]" + "\n" +
      "minimumAllocationFactor = " + minimumAllocationFactor +
      " [= (float)(maximumAllocationMemory - minimumAllocationMemory) / " +
      "maximumAllocationMemory ]" + "\n" +
      "maximumAllocation = " + maximumAllocation +
      " [= configuredMaxAllocation ]" + "\n" +
      "numContainers = " + numContainers +
      " [= currentNumContainers ]" + "\n" +
      "state = " + state +
      " [= configuredState ]" + "\n" +
      "acls = " + aclsString +
      " [= configuredAcls ]" + "\n" +
      "nodeLocalityDelay = " + nodeLocalityDelay + "\n" +
      "labels=" + labelStrBuilder.toString() + "\n" +
      "reservationsContinueLooking = " +
      reservationsContinueLooking + "\n" +
      "preemptionDisabled = " + getPreemptionDisabled() + "\n" +
      "defaultAppPriorityPerQueue = " + defaultAppPriorityPerQueue);
}
// This must be called from a synchronized method.
// Refreshes the cached active-user snapshot, re-applies configured weights
// to every known user, and recomputes the weighted active-user count used
// by the user AM-resource-limit calculation.
private void updateUserWeights() {
  activeUsersSet = activeUsersManager.getActiveUsersSet();
  for (Map.Entry<String, User> ue : users.entrySet()) {
    ue.getValue().setWeight(getUserWeightFromQueue(ue.getKey()));
  }
  activeUsersTimesWeights = sumActiveUsersTimesWeights();
}
/**
 * Used only by tests.
 */
@Private
public float getMinimumAllocationFactor() {
  return minimumAllocationFactor;
}

/**
 * Used only by tests.
 */
@Private
public float getMaxAMResourcePerQueuePercent() {
  return maxAMResourcePerQueuePercent;
}

/** @return maximum number of applications this queue may track at once. */
public int getMaxApplications() {
  return maxApplications;
}

/** @return per-user application cap derived from the user-limit settings. */
public synchronized int getMaxApplicationsPerUser() {
  return maxApplicationsPerUser;
}

@Override
public ActiveUsersManager getActiveUsersManager() {
  return activeUsersManager;
}

/** Leaf queues never have children, so this always returns null. */
@Override
public List<CSQueue> getChildQueues() {
  return null;
}
/**
 * Set user limit - used only for testing.
 * @param userLimit new user limit
 */
synchronized void setUserLimit(int userLimit) {
  this.userLimit = userLimit;
}

/**
 * Set user limit factor - used only for testing.
 * @param userLimitFactor new user limit factor
 */
synchronized void setUserLimitFactor(float userLimitFactor) {
  this.userLimitFactor = userLimitFactor;
}
/** @return total applications (pending + active) in this queue. */
@Override
public synchronized int getNumApplications() {
  return getNumPendingApplications() + getNumActiveApplications();
}

/** @return number of submitted-but-not-yet-activated applications. */
public synchronized int getNumPendingApplications() {
  return pendingOrderingPolicy.getNumSchedulableEntities();
}

/** @return number of activated (schedulable) applications. */
public synchronized int getNumActiveApplications() {
  return orderingPolicy.getNumSchedulableEntities();
}

/** @return total applications owned by the given user in this queue. */
@Private
public synchronized int getNumApplications(String user) {
  return getUser(user).getTotalApplications();
}

/** @return pending applications owned by the given user in this queue. */
@Private
public synchronized int getNumPendingApplications(String user) {
  return getUser(user).getPendingApplications();
}

/** @return active applications owned by the given user in this queue. */
@Private
public synchronized int getNumActiveApplications(String user) {
  return getUser(user).getActiveApplications();
}

/** @return the queue's configured state (RUNNING or STOPPED). */
@Override
public synchronized QueueState getState() {
  return state;
}

/** @return configured minimum user-limit percent for this queue. */
@Private
public synchronized int getUserLimit() {
  return userLimit;
}

/** @return configured user-limit factor for this queue. */
@Private
public synchronized float getUserLimitFactor() {
  return userLimitFactor;
}
/**
 * Returns a QueueInfo snapshot for this queue. A leaf queue has no
 * children, so both flags are irrelevant here.
 */
@Override
public QueueInfo getQueueInfo(
    boolean includeChildQueues, boolean recursive) {
  return getQueueInfo();
}
/**
 * Builds the ACL summary for the given user on this queue: every
 * {@link QueueACL} operation the user is entitled to perform.
 *
 * @param user the user whose access is being queried
 * @return a singleton list with this queue's ACL info for the user
 */
@Override
public synchronized List<QueueUserACLInfo>
    getQueueUserAclInfo(UserGroupInformation user) {
  // Collect every operation the caller has access to.
  List<QueueACL> allowedOperations = new ArrayList<QueueACL>();
  for (QueueACL acl : QueueACL.values()) {
    if (hasAccess(acl, user)) {
      allowedOperations.add(acl);
    }
  }
  QueueUserACLInfo aclInfo =
      recordFactory.newRecordInstance(QueueUserACLInfo.class);
  aclInfo.setQueueName(getQueueName());
  aclInfo.setUserAcls(allowedOperations);
  return Collections.singletonList(aclInfo);
}
/** One-line human-readable summary of the queue's capacity and usage. */
public String toString() {
  // Built with an explicit StringBuilder; output is identical to the
  // concatenation form.
  StringBuilder sb = new StringBuilder();
  sb.append(queueName).append(": ");
  sb.append("capacity=").append(queueCapacities.getCapacity()).append(", ");
  sb.append("absoluteCapacity=")
      .append(queueCapacities.getAbsoluteCapacity()).append(", ");
  sb.append("usedResources=").append(queueUsage.getUsed()).append(", ");
  sb.append("usedCapacity=").append(getUsedCapacity()).append(", ");
  sb.append("absoluteUsedCapacity=")
      .append(getAbsoluteUsedCapacity()).append(", ");
  sb.append("numApps=").append(getNumApplications()).append(", ");
  sb.append("numContainers=").append(getNumContainers());
  return sb.toString();
}
// Test hook: injects a node-label manager in place of the real one.
@VisibleForTesting
public synchronized void setNodeLabelManager(RMNodeLabelsManager mgr) {
  this.labelManager = mgr;
}
/**
 * Returns the bookkeeping record for the given user, creating and
 * registering a new weighted record on first sight of the user.
 *
 * @param userName user to look up
 * @return the (possibly freshly created) User record
 */
@VisibleForTesting
public synchronized User getUser(String userName) {
  User existing = users.get(userName);
  if (existing != null) {
    return existing;
  }
  // First time this user appears in the queue: create, register and
  // apply the configured weight.
  User created = new User(userName);
  users.put(userName, created);
  created.setWeight(getUserWeightFromQueue(userName));
  return created;
}
/**
 * Looks up the configured weight for a user on this queue; users without
 * an explicit weight default to 1.0.
 */
private float getUserWeightFromQueue(String userName) {
  Float configuredWeight = getUserWeights().get(userName);
  if (configuredWeight == null) {
    return 1.0f;
  }
  return configuredWeight.floatValue();
}
/**
 * @return an ArrayList of UserInfo objects who are active in this queue
 */
public synchronized ArrayList<UserInfo> getUsers() {
  ArrayList<UserInfo> result = new ArrayList<UserInfo>();
  for (Map.Entry<String, User> entry : users.entrySet()) {
    String name = entry.getKey();
    User user = entry.getValue();
    // Clone the mutable Resource values so callers get a stable snapshot.
    UserInfo info = new UserInfo(name,
        Resources.clone(user.getAllUsed()),
        user.getActiveApplications(),
        user.getPendingApplications(),
        Resources.clone(user.getConsumedAMResources()),
        Resources.clone(user.getUserResourceLimit()),
        user.getResourceUsage(),
        user.getWeight(),
        activeUsersSet.contains(user.userName));
    result.add(info);
  }
  return result;
}
/**
 * Re-applies configuration from a freshly parsed queue of the same path.
 * Rejects a mismatched queue path and any attempt to shrink the maximum
 * allocation (running AMs were already told the old size), then re-reads
 * settings and tries to activate pending applications.
 *
 * @param newlyParsedQueue freshly parsed queue carrying the new settings
 * @param clusterResource current total cluster resource
 * @throws IOException on path mismatch or shrinking max allocation
 */
@Override
public synchronized void reinitialize(
    CSQueue newlyParsedQueue, Resource clusterResource)
    throws IOException {
  // Sanity check
  if (!(newlyParsedQueue instanceof LeafQueue) ||
      !newlyParsedQueue.getQueuePath().equals(getQueuePath())) {
    throw new IOException("Trying to reinitialize " + getQueuePath() +
        " from " + newlyParsedQueue.getQueuePath());
  }
  LeafQueue newlyParsedLeafQueue = (LeafQueue)newlyParsedQueue;
  // don't allow the maximum allocation to be decreased in size
  // since we have already told running AM's the size
  Resource oldMax = getMaximumAllocation();
  Resource newMax = newlyParsedLeafQueue.getMaximumAllocation();
  if (newMax.getMemorySize() < oldMax.getMemorySize()
      || newMax.getVirtualCores() < oldMax.getVirtualCores()) {
    throw new IOException(
        "Trying to reinitialize "
            + getQueuePath()
            + " the maximum allocation size can not be decreased!"
            + " Current setting: " + oldMax
            + ", trying to set it to: " + newMax);
  }
  setupQueueConfigs(clusterResource);
  // queue metrics are updated, more resource may be available
  // activate the pending applications if possible
  activateApplications();
}
/**
 * Registers a new application attempt with this queue and propagates the
 * submission up the hierarchy. Metrics are only updated for genuinely
 * pending attempts, not for attempts arriving via a queue move.
 */
@Override
public void submitApplicationAttempt(FiCaSchedulerApp application,
    String userName) {
  // Careful! Locking order is important!
  synchronized (this) {
    User user = getUser(userName);
    // Add the attempt to our data-structures
    addApplicationAttempt(application, user);
  }
  // We don't want to update metrics for move app
  if (application.isPending()) {
    metrics.submitAppAttempt(userName);
  }
  // Parent is notified outside the queue lock (see locking-order note).
  getParent().submitApplicationAttempt(application, userName);
}
/**
 * Validates an application submission against this queue: the user must
 * hold SUBMIT_APPLICATIONS or ADMINISTER_QUEUE, the queue must be RUNNING,
 * and neither the queue-wide nor the per-user application cap may be
 * exceeded. On success the submission is forwarded to the parent queue.
 *
 * @throws AccessControlException if any check fails (also used to signal
 *         a stopped queue or an exceeded cap, not just ACL denial)
 */
@Override
public void submitApplication(ApplicationId applicationId, String userName,
    String queue) throws AccessControlException {
  // Careful! Locking order is important!
  // Check queue ACLs
  UserGroupInformation userUgi = UserGroupInformation.createRemoteUser(userName);
  if (!hasAccess(QueueACL.SUBMIT_APPLICATIONS, userUgi)
      && !hasAccess(QueueACL.ADMINISTER_QUEUE, userUgi)) {
    throw new AccessControlException("User " + userName + " cannot submit" +
        " applications to queue " + getQueuePath());
  }
  User user = null;
  synchronized (this) {
    // Check if the queue is accepting jobs
    if (getState() != QueueState.RUNNING) {
      String msg = "Queue " + getQueuePath() +
          " is STOPPED. Cannot accept submission of application: " + applicationId;
      LOG.info(msg);
      throw new AccessControlException(msg);
    }
    // Check submission limits for queues
    if (getNumApplications() >= getMaxApplications()) {
      String msg = "Queue " + getQueuePath() +
          " already has " + getNumApplications() + " applications," +
          " cannot accept submission of application: " + applicationId;
      LOG.info(msg);
      throw new AccessControlException(msg);
    }
    // Check submission limits for the user on this queue
    user = getUser(userName);
    if (user.getTotalApplications() >= getMaxApplicationsPerUser()) {
      String msg = "Queue " + getQueuePath() +
          " already has " + user.getTotalApplications() +
          " applications from user " + userName +
          " cannot accept submission of application: " + applicationId;
      LOG.info(msg);
      throw new AccessControlException(msg);
    }
  }
  // Inform the parent queue
  try {
    getParent().submitApplication(applicationId, userName, queue);
  } catch (AccessControlException ace) {
    LOG.info("Failed to submit application to parent-queue: " +
        getParent().getQueuePath(), ace);
    throw ace;
  }
}
/** @return the last computed AM resource limit for the default partition. */
public Resource getAMResourceLimit() {
  return queueUsage.getAMLimit();
}

/** @return the last computed AM resource limit for the given partition. */
public Resource getAMResourceLimitPerPartition(String nodePartition) {
  return queueUsage.getAMLimit(nodePartition);
}

/** Recomputes and returns the AM limit for the default (no-label) partition. */
public synchronized Resource calculateAndGetAMResourceLimit() {
  return calculateAndGetAMResourceLimitPerPartition(
      RMNodeLabelsManager.NO_LABEL);
}

/** Test hook: per-user AM limit on the default partition, no specific user. */
@VisibleForTesting
public synchronized Resource getUserAMResourceLimit() {
  return getUserAMResourceLimitPerPartition(RMNodeLabelsManager.NO_LABEL,
      null);
}
/**
 * Computes the per-user AM resource limit for a partition: the queue's
 * guaranteed partition capacity, scaled by the effective user limit
 * (weighted by the user's configured weight and the user-limit factor)
 * and the partition's max-AM-resource percentage, and finally capped by
 * the queue-level AM limit for that partition.
 *
 * Improvement: the queue-level AM limit is now fetched once into a local
 * instead of calling getAMResourceLimitPerPartition() twice.
 *
 * @param nodePartition partition whose limit is being computed
 * @param userName user whose weight should apply, or null for weight 1.0
 * @return the user's AM resource limit for the partition
 */
public synchronized Resource getUserAMResourceLimitPerPartition(
    String nodePartition, String userName) {
  float userWeight = 1.0f;
  if (userName != null && getUser(userName) != null) {
    userWeight = getUser(userName).getWeight();
  }
  // Refresh the cached active-user snapshot lazily, only when the
  // ActiveUsersManager reports a change.
  if (activeUsersManager.getActiveUsersChanged()) {
    activeUsersSet = activeUsersManager.getActiveUsersSet();
    activeUsersTimesWeights = sumActiveUsersTimesWeights();
    activeUsersManager.clearActiveUsersChanged();
  }
  /*
   * The user am resource limit is based on the same approach as the user
   * limit (as it should represent a subset of that). This means that it uses
   * the absolute queue capacity (per partition) instead of the max and is
   * modified by the userlimit and the userlimit factor as is the userlimit
   */
  float effectiveUserLimit;
  if (activeUsersTimesWeights > 0.0f) {
    effectiveUserLimit = Math.max(userLimit / 100.0f,
        1.0f / activeUsersTimesWeights);
  } else {
    // No weighted active-user information yet: fall back to the raw
    // active-user count (at least 1 to avoid division by zero).
    effectiveUserLimit = Math.max(userLimit / 100.0f,
        1.0f / Math.max(getActiveUsersManager().getNumActiveUsers(), 1));
  }
  effectiveUserLimit = Math.min(effectiveUserLimit * userWeight, 1.0f);
  Resource queuePartitionResource = Resources.multiplyAndNormalizeUp(
      resourceCalculator,
      labelManager.getResourceByLabel(nodePartition, lastClusterResource),
      queueCapacities.getAbsoluteCapacity(nodePartition), minimumAllocation);
  Resource userAMLimit = Resources.multiplyAndNormalizeUp(resourceCalculator,
      queuePartitionResource,
      queueCapacities.getMaxAMResourcePercentage(nodePartition)
          * effectiveUserLimit * userLimitFactor, minimumAllocation);
  // Never exceed the queue-level AM limit for this partition.
  Resource queueAMLimit = getAMResourceLimitPerPartition(nodePartition);
  return Resources.lessThanOrEqual(resourceCalculator, lastClusterResource,
      userAMLimit, queueAMLimit)
      ? userAMLimit
      : queueAMLimit;
}
/**
 * Recomputes, records and returns the queue-level AM resource limit for a
 * partition, updating both metrics and queue usage as a side effect.
 *
 * @param nodePartition partition whose AM limit is being recomputed
 * @return the newly computed AM resource limit
 */
public synchronized Resource calculateAndGetAMResourceLimitPerPartition(
    String nodePartition) {
  /*
   * For non-labeled partition, get the max value from resources currently
   * available to the queue and the absolute resources guaranteed for the
   * partition in the queue. For labeled partition, consider only the absolute
   * resources guaranteed. Multiply this value (based on labeled/
   * non-labeled), * with per-partition am-resource-percent to get the max am
   * resource limit for this queue and partition.
   */
  Resource queuePartitionResource = Resources.multiplyAndNormalizeUp(
      resourceCalculator,
      labelManager.getResourceByLabel(nodePartition, lastClusterResource),
      queueCapacities.getAbsoluteCapacity(nodePartition), minimumAllocation);

  // For non-labeled partition, we need to consider the current queue
  // usage limit.
  Resource queueCurrentLimit = Resources.none();
  if (nodePartition.equals(RMNodeLabelsManager.NO_LABEL)) {
    synchronized (queueResourceLimitsInfo) {
      queueCurrentLimit = queueResourceLimitsInfo.getQueueCurrentLimit();
    }
  }

  // Current usable resource for this queue and partition is the max of
  // queueCurrentLimit and queuePartitionResource.
  Resource queuePartitionUsableResource = Resources.max(resourceCalculator,
      lastClusterResource, queueCurrentLimit, queuePartitionResource);

  float amResourcePercent = queueCapacities
      .getMaxAMResourcePercentage(nodePartition);
  Resource amResourceLimit = Resources.multiplyAndNormalizeUp(
      resourceCalculator, queuePartitionUsableResource, amResourcePercent,
      minimumAllocation);

  // Note: the metrics setter name is misspelled upstream; it is an
  // external API and intentionally left unchanged.
  metrics.setAMResouceLimit(nodePartition, amResourceLimit);
  queueUsage.setAMLimit(nodePartition, amResourceLimit);
  return amResourceLimit;
}
/**
 * Walks the pending-application list in assignment order and activates
 * every application whose AM resource fits under both the queue-level and
 * the per-user AM limits for its AM node partition. Activated applications
 * are moved from the pending ordering policy into the active one and their
 * AM resource is charged to queue usage, user usage and metrics.
 *
 * If the queue has no active applications (or no AM resource charged for
 * the partition), the limit check is deliberately skipped so that at
 * least one application can always start even when
 * maximum-am-resource-percent is configured too low.
 */
private synchronized void activateApplications() {
  // limit of allowed resource usage for application masters
  Map<String, Resource> userAmPartitionLimit =
      new HashMap<String, Resource>();

  // AM Resource Limit for accessible labels can be pre-calculated.
  // This will help in updating AMResourceLimit for all labels when queue
  // is initialized for the first time (when no applications are present).
  for (String nodePartition : getNodeLabelsForQueue()) {
    calculateAndGetAMResourceLimitPerPartition(nodePartition);
  }

  // Iterator-with-remove: fsApp.remove() below pulls activated apps out
  // of the pending policy while we keep iterating.
  for (Iterator<FiCaSchedulerApp> fsApp =
      getPendingAppsOrderingPolicy().getAssignmentIterator();
      fsApp.hasNext();) {
    FiCaSchedulerApp application = fsApp.next();
    ApplicationId applicationId = application.getApplicationId();

    // Get the am-node-partition associated with each application
    // and calculate max-am resource limit for this partition.
    String partitionName = application.getAppAMNodePartitionName();

    Resource amLimit = getAMResourceLimitPerPartition(partitionName);
    // Verify whether we already calculated am-limit for this label.
    if (amLimit == null) {
      amLimit = calculateAndGetAMResourceLimitPerPartition(partitionName);
    }
    // Check am resource limit.
    Resource amIfStarted = Resources.add(
        application.getAMResource(partitionName),
        queueUsage.getAMUsed(partitionName));

    if (LOG.isDebugEnabled()) {
      LOG.debug("application "+application.getId() +" AMResource "
          + application.getAMResource(partitionName)
          + " maxAMResourcePerQueuePercent " + maxAMResourcePerQueuePercent
          + " amLimit " + amLimit + " lastClusterResource "
          + lastClusterResource + " amIfStarted " + amIfStarted
          + " AM node-partition name " + partitionName);
    }

    if (!Resources.lessThanOrEqual(resourceCalculator, lastClusterResource,
        amIfStarted, amLimit)) {
      // Over the queue AM limit; only enforce it if something is already
      // running, otherwise let one application through regardless.
      if (getNumActiveApplications() < 1
          || (Resources.lessThanOrEqual(resourceCalculator,
              lastClusterResource, queueUsage.getAMUsed(partitionName),
              Resources.none()))) {
        LOG.warn("maximum-am-resource-percent is insufficient to start a"
            + " single application in queue, it is likely set too low."
            + " skipping enforcement to allow at least one application"
            + " to start");
      } else {
        application.updateAMContainerDiagnostics(AMState.INACTIVATED,
            CSAMContainerLaunchDiagnosticsConstants.QUEUE_AM_RESOURCE_LIMIT_EXCEED);
        if (LOG.isDebugEnabled()) {
          LOG.debug("Not activating application " + applicationId
              + " as amIfStarted: " + amIfStarted + " exceeds amLimit: "
              + amLimit);
        }
        continue;
      }
    }

    // Check user am resource limit
    User user = getUser(application.getUser());
    Resource userAMLimit = userAmPartitionLimit.get(partitionName);

    // Verify whether we already calculated user-am-limit for this label.
    if (userAMLimit == null) {
      userAMLimit = getUserAMResourceLimitPerPartition(partitionName,
          application.getUser());
      userAmPartitionLimit.put(partitionName, userAMLimit);
    }

    Resource userAmIfStarted = Resources.add(
        application.getAMResource(partitionName),
        user.getConsumedAMResources(partitionName));

    if (!Resources.lessThanOrEqual(resourceCalculator, lastClusterResource,
        userAmIfStarted, userAMLimit)) {
      // Same first-application escape hatch as the queue-level check.
      if (getNumActiveApplications() < 1
          || (Resources.lessThanOrEqual(resourceCalculator,
              lastClusterResource, queueUsage.getAMUsed(partitionName),
              Resources.none()))) {
        LOG.warn("maximum-am-resource-percent is insufficient to start a"
            + " single application in queue for user, it is likely set too"
            + " low. skipping enforcement to allow at least one application"
            + " to start");
      } else {
        application.updateAMContainerDiagnostics(AMState.INACTIVATED,
            CSAMContainerLaunchDiagnosticsConstants.USER_AM_RESOURCE_LIMIT_EXCEED);
        if (LOG.isDebugEnabled()) {
          LOG.debug("Not activating application " + applicationId
              + " for user: " + user + " as userAmIfStarted: "
              + userAmIfStarted + " exceeds userAmLimit: " + userAMLimit);
        }
        continue;
      }
    }

    // Passed both checks: activate and charge the AM resource to
    // queue/user/metrics accounting.
    user.activateApplication();
    orderingPolicy.addSchedulableEntity(application);
    application.updateAMContainerDiagnostics(AMState.ACTIVATED, null);
    queueUsage.incAMUsed(partitionName,
        application.getAMResource(partitionName));
    user.getResourceUsage().incAMUsed(partitionName,
        application.getAMResource(partitionName));
    user.getResourceUsage().setAMLimit(partitionName, userAMLimit);
    metrics.incAMUsed(partitionName, application.getUser(),
        application.getAMResource(partitionName));
    metrics.setAMResouceLimitForUser(partitionName,
        application.getUser(), userAMLimit);
    fsApp.remove();
    LOG.info("Application " + applicationId + " from user: "
        + application.getUser() + " activated in queue: " + getQueueName());
  }
}
/**
 * Records a new attempt in the pending ordering policy and the attempt map,
 * then tries to activate pending applications — unless the cluster has no
 * resource yet, in which case activation is deferred and diagnostics are
 * recorded on the attempt.
 */
private synchronized void addApplicationAttempt(FiCaSchedulerApp application,
    User user) {
  // Accept
  user.submitApplication();
  getPendingAppsOrderingPolicy().addSchedulableEntity(application);
  applicationAttemptMap.put(application.getApplicationAttemptId(), application);

  // Activate applications
  // (only if the cluster actually has resource: lastClusterResource > none)
  if (Resources.greaterThan(resourceCalculator, lastClusterResource,
      lastClusterResource, Resources.none())) {
    activateApplications();
  } else {
    application.updateAMContainerDiagnostics(AMState.INACTIVATED,
        CSAMContainerLaunchDiagnosticsConstants.CLUSTER_RESOURCE_EMPTY);
    LOG.info("Skipping activateApplications for "
        + application.getApplicationAttemptId()
        + " since cluster resource is " + Resources.none());
  }

  LOG.info("Application added -" +
      " appId: " + application.getApplicationId() +
      " user: " + application.getUser() + "," +
      " leaf-queue: " + getQueueName() +
      " #user-pending-applications: " + user.getPendingApplications() +
      " #user-active-applications: " + user.getActiveApplications() +
      " #queue-pending-applications: " + getNumPendingApplications() +
      " #queue-active-applications: " + getNumActiveApplications()
      );
}
/**
 * Notifies the active-users manager that the application finished and
 * propagates the completion to the parent queue.
 */
@Override
public void finishApplication(ApplicationId application, String user) {
  // Inform the activeUsersManager
  activeUsersManager.deactivateApplication(user, application);
  // Inform the parent queue
  getParent().finishApplication(application, user);
}
/**
 * Removes a finished attempt from this queue's bookkeeping (under the
 * queue lock) and then, outside the lock, notifies the parent queue.
 */
@Override
public void finishApplicationAttempt(FiCaSchedulerApp application, String queue) {
  // Careful! Locking order is important!
  synchronized (this) {
    removeApplicationAttempt(application, getUser(application.getUser()));
  }
  getParent().finishApplicationAttempt(application, queue);
}
/**
 * Removes an attempt from whichever ordering policy holds it. Active
 * attempts also get their AM resource charged back to queue usage, user
 * usage and metrics; pending attempts carry no AM charge. Finally the
 * user's record is pruned if this was their last application, and pending
 * applications are re-examined since capacity may have been freed.
 *
 * Fix: metrics.decAMUsed now passes the partition-scoped AM resource
 * (getAMResource(partitionName)), mirroring the incAMUsed call in
 * activateApplications(); the previous default-partition value made the
 * AM-used metric drift for applications on labeled partitions.
 *
 * @param application the attempt being removed
 * @param user the owning user's bookkeeping record
 */
public synchronized void removeApplicationAttempt(
    FiCaSchedulerApp application, User user) {
  String partitionName = application.getAppAMNodePartitionName();
  boolean wasActive =
      orderingPolicy.removeSchedulableEntity(application);
  if (!wasActive) {
    pendingOrderingPolicy.removeSchedulableEntity(application);
  } else {
    queueUsage.decAMUsed(partitionName,
        application.getAMResource(partitionName));
    user.getResourceUsage().decAMUsed(partitionName,
        application.getAMResource(partitionName));
    metrics.decAMUsed(partitionName,
        application.getUser(), application.getAMResource(partitionName));
  }
  applicationAttemptMap.remove(application.getApplicationAttemptId());

  user.finishApplication(wasActive);
  if (user.getTotalApplications() == 0) {
    users.remove(application.getUser());
  }

  // Check if we can activate more applications
  activateApplications();

  LOG.info("Application removed -" +
      " appId: " + application.getApplicationId() +
      " user: " + application.getUser() +
      " queue: " + getQueueName() +
      " #user-pending-applications: " + user.getPendingApplications() +
      " #user-active-applications: " + user.getActiveApplications() +
      " #queue-pending-applications: " + getNumPendingApplications() +
      " #queue-active-applications: " + getNumActiveApplications()
      );
}
// Looks up a tracked attempt; returns null if this queue does not own it.
private synchronized FiCaSchedulerApp getApplication(
    ApplicationAttemptId applicationAttemptId) {
  return applicationAttemptMap.get(applicationAttemptId);
}
/**
 * Releases an "excess" reservation left behind by an assignment: either an
 * increase-reservation (unreserved via the increase path) or a plain
 * reservation (completed with an UNRESERVED abnormal status). Clears the
 * excess-reservation field on the assignment afterwards. No-op when the
 * assignment carries no excess reservation.
 */
private void handleExcessReservedContainer(Resource clusterResource,
    CSAssignment assignment, FiCaSchedulerNode node, FiCaSchedulerApp app) {
  RMContainer excess = assignment.getExcessReservation();
  if (excess == null) {
    return;
  }
  if (excess.hasIncreaseReservation()) {
    // Reservation made for a container-size increase: roll it back.
    unreserveIncreasedContainer(clusterResource, app, node, excess);
  } else {
    // Plain reservation: complete it on the node it was reserved on.
    completedContainer(clusterResource, assignment.getApplication(),
        scheduler.getNode(excess.getAllocatedNode()),
        excess,
        SchedulerUtils.createAbnormalContainerStatus(
            excess.getContainerId(),
            SchedulerUtils.UNRESERVED_CONTAINER),
        RMContainerEventType.RELEASED, null, false);
  }
  assignment.setExcessReservation(null);
}
/**
 * Kills the containers an assignment marked for preemption. Each victim is
 * completed on its own leaf queue with a PREEMPTED status and a KILL event,
 * and a single INFO line summarizing all kills is logged. No-op when the
 * assignment carries no kill list.
 */
private void killToPreemptContainers(Resource clusterResource,
    FiCaSchedulerNode node,
    CSAssignment assignment) {
  if (assignment.getContainersToKill() == null) {
    return;
  }
  StringBuilder sb = new StringBuilder("Killing containers: [");
  for (RMContainer victim : assignment.getContainersToKill()) {
    // Route the completion through the victim's own leaf queue so its
    // accounting is updated, not ours.
    FiCaSchedulerApp victimApp = csContext.getApplicationAttempt(
        victim.getApplicationAttemptId());
    LeafQueue victimQueue = victimApp.getCSLeafQueue();
    victimQueue.completedContainer(clusterResource, victimApp, node, victim,
        SchedulerUtils.createPreemptedContainerStatus(victim.getContainerId(),
            SchedulerUtils.PREEMPTED_CONTAINER),
        RMContainerEventType.KILL, null, false);
    sb.append("(container=")
        .append(victim.getContainerId())
        .append(" resource=")
        .append(victim.getAllocatedResource())
        .append(")");
  }
  sb.append("] for container=")
      .append(assignment.getAssignmentInformation()
          .getFirstAllocatedOrReservedContainerId())
      .append(" resource=")
      .append(assignment.getResource());
  LOG.info(sb.toString());
}
/**
 * Marks whether this queue may preempt resources from other queues on the
 * given partition: a leaf queue is allowed to preempt only while its absolute
 * used capacity is still below its absolute guaranteed capacity.
 */
private void setPreemptionAllowed(ResourceLimits limits, String nodePartition) {
  limits.setIsAllowPreemption(
      queueCapacities.getAbsoluteUsedCapacity(nodePartition)
          < queueCapacities.getAbsoluteCapacity(nodePartition));
}
/**
 * Main allocation entry point for this leaf queue on a node heartbeat.
 *
 * Two-phase locking protocol: phase one (under the queue lock) handles any
 * reservation already sitting on the node; the resulting completedContainer
 * calls happen OUTSIDE the lock to avoid deadlock with other queues. Phase
 * two (re-acquiring the queue lock) walks applications in ordering-policy
 * order, checks queue- and user-limits (user limits are memoized per user in
 * {@code userLimits} for the duration of this call), and lets the first
 * eligible application try to allocate or reserve on the node.
 *
 * @return the assignment made, or {@code CSAssignment.NULL_ASSIGNMENT}
 */
@Override
public CSAssignment assignContainers(Resource clusterResource,
    FiCaSchedulerNode node, ResourceLimits currentResourceLimits,
    SchedulingMode schedulingMode) {
  FiCaSchedulerApp reservedApp = null;
  CSAssignment reservedCSAssignment = null;
  synchronized (this) {
    updateCurrentResourceLimits(currentResourceLimits, clusterResource);
    if (LOG.isDebugEnabled()) {
      LOG.debug(
          "assignContainers: node=" + node.getNodeName() + " #applications="
              + orderingPolicy.getNumSchedulableEntities());
    }
    setPreemptionAllowed(currentResourceLimits, node.getPartition());
    // Check for reserved resources
    RMContainer reservedContainer = node.getReservedContainer();
    if (reservedContainer != null) {
      // A container is already reserved on this node: give the reserving
      // application first shot at fulfilling (or releasing) it.
      reservedApp = getApplication(
          reservedContainer.getApplicationAttemptId());
      synchronized (reservedApp) {
        reservedCSAssignment = reservedApp.assignContainers(
            clusterResource, node, currentResourceLimits, schedulingMode,
            reservedContainer);
      }
    }
  }
  // Handle possible completedContainer out of synchronized lock to avoid
  // deadlock.
  if (reservedCSAssignment != null) {
    handleExcessReservedContainer(clusterResource, reservedCSAssignment, node,
        reservedApp);
    killToPreemptContainers(clusterResource, node, reservedCSAssignment);
    return reservedCSAssignment;
  }
  synchronized (this) {
    // if our queue cannot access this node, just return
    if (schedulingMode == SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY
        && !accessibleToPartition(node.getPartition())) {
      return CSAssignment.NULL_ASSIGNMENT;
    }
    // Check if this queue need more resource, simply skip allocation if this
    // queue doesn't need more resources.
    if (!hasPendingResourceRequest(node.getPartition(), clusterResource,
        schedulingMode)) {
      if (LOG.isDebugEnabled()) {
        LOG.debug("Skip this queue=" + getQueuePath()
            + ", because it doesn't need more resource, schedulingMode="
            + schedulingMode.name() + " node-partition=" + node
            .getPartition());
      }
      return CSAssignment.NULL_ASSIGNMENT;
    }
    // Per-user limit cache, valid only within this single invocation.
    Map<String, CachedUserLimit> userLimits = new HashMap<>();
    boolean needAssignToQueueCheck = true;
    for (Iterator<FiCaSchedulerApp> assignmentIterator =
        orderingPolicy.getAssignmentIterator(); assignmentIterator
        .hasNext(); ) {
      FiCaSchedulerApp application = assignmentIterator.next();
      // Check queue max-capacity limit
      Resource appReserved = application.getCurrentReservation();
      if (needAssignToQueueCheck) {
        if (!super.canAssignToThisQueue(clusterResource, node.getPartition(),
            currentResourceLimits, appReserved, schedulingMode)) {
          return CSAssignment.NULL_ASSIGNMENT;
        }
        // If there was no reservation and canAssignToThisQueue returned
        // true, there is no reason to check further.
        if (!this.reservationsContinueLooking
            || appReserved.equals(Resources.none()) || !node.getPartition()
            .equals(CommonNodeLabelsManager.NO_LABEL)) {
          needAssignToQueueCheck = false;
        }
      }
      CachedUserLimit cul = userLimits.get(application.getUser());
      Resource cachedUserLimit = null;
      if (cul != null) {
        cachedUserLimit = cul.userLimit;
      }
      Resource userLimit =
          computeUserLimitAndSetHeadroom(application, clusterResource,
              node.getPartition(), schedulingMode, cachedUserLimit);
      if (cul == null) {
        cul = new CachedUserLimit(userLimit);
        userLimits.put(application.getUser(), cul);
      }
      // Check user limit
      boolean userAssignable = true;
      if (!cul.canAssign && Resources.fitsIn(appReserved, cul.reservation)) {
        // A previous app of this user already failed with a reservation at
        // least this large — skip the recomputation.
        userAssignable = false;
      } else {
        userAssignable =
            canAssignToUser(clusterResource, application.getUser(), userLimit,
                appReserved, node.getPartition(), currentResourceLimits);
        if (!userAssignable && Resources.fitsIn(cul.reservation, appReserved)) {
          // Remember the largest reservation that failed for this user.
          cul.canAssign = false;
          cul.reservation = appReserved;
        }
      }
      if (!userAssignable) {
        application.updateAMContainerDiagnostics(AMState.ACTIVATED,
            "User capacity has reached its maximum limit.");
        continue;
      }
      // Try to schedule
      CSAssignment assignment =
          application.assignContainers(clusterResource, node,
              currentResourceLimits, schedulingMode, null);
      if (LOG.isDebugEnabled()) {
        LOG.debug("post-assignContainers for application "
            + application.getApplicationId());
        application.showRequests();
      }
      // Did we schedule or reserve a container?
      Resource assigned = assignment.getResource();
      handleExcessReservedContainer(clusterResource, assignment, node,
          application);
      killToPreemptContainers(clusterResource, node, assignment);
      if (Resources.greaterThan(resourceCalculator, clusterResource, assigned,
          Resources.none())) {
        // Get reserved or allocated container from application
        RMContainer reservedOrAllocatedRMContainer =
            application.getRMContainer(assignment.getAssignmentInformation()
                .getFirstAllocatedOrReservedContainerId());
        // Book-keeping
        // Note: Update headroom to account for current allocation too...
        allocateResource(clusterResource, application, assigned,
            node.getPartition(), reservedOrAllocatedRMContainer,
            assignment.isIncreasedAllocation());
        // Update reserved metrics
        Resource reservedRes = assignment.getAssignmentInformation()
            .getReserved();
        if (reservedRes != null && !reservedRes.equals(Resources.none())) {
          incReservedResource(node.getPartition(), reservedRes);
        }
        // Done
        return assignment;
      } else if (assignment.getSkippedType()
          == CSAssignment.SkippedType.OTHER) {
        // Skipped for an app-internal reason: try the next application.
        application.updateNodeInfoForAMDiagnostics(node);
      } else if(assignment.getSkippedType()
          == CSAssignment.SkippedType.QUEUE_LIMIT) {
        return assignment;
      } else {
        // If we don't allocate anything, and it is not skipped by application,
        // we will return to respect FIFO of applications
        return CSAssignment.NULL_ASSIGNMENT;
      }
    }
    return CSAssignment.NULL_ASSIGNMENT;
  }
}
/**
 * Headroom for the default (no-label) partition; delegates to the
 * partition-aware overload.
 */
protected Resource getHeadroom(User user, Resource queueCurrentLimit,
    Resource clusterResource, FiCaSchedulerApp application) {
  return getHeadroom(user, queueCurrentLimit, clusterResource, application,
      RMNodeLabelsManager.NO_LABEL);
}
/**
 * Headroom for a specific partition: recomputes the user limit (active-user
 * semantics, RESPECT_PARTITION_EXCLUSIVITY) and delegates to the core
 * headroom calculation.
 */
protected Resource getHeadroom(User user, Resource queueCurrentLimit,
    Resource clusterResource, FiCaSchedulerApp application,
    String partition) {
  return getHeadroom(user, queueCurrentLimit, clusterResource,
      computeUserLimit(application.getUser(), clusterResource, user,
          partition, SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY, true),
      partition);
}
/**
 * Core headroom computation for one user on one partition.
 *
 * @param user                          user whose headroom is computed
 * @param currentPartitionResourceLimit queue limit for the default partition
 *                                      (replaced by the partition max for
 *                                      labeled partitions)
 * @param clusterResource               total cluster resource
 * @param userLimitResource             precomputed user limit
 * @param partition                     node partition (label)
 * @return normalized headroom, never exceeding the partition's free resource
 */
private Resource getHeadroom(User user,
    Resource currentPartitionResourceLimit, Resource clusterResource,
    Resource userLimitResource, String partition) {
  /**
   * Headroom is:
   *    min(
   *        min(userLimit, queueMaxCap) - userConsumed,
   *        queueMaxCap - queueUsedResources
   *       )
   *
   * ( which can be expressed as,
   *  min (userLimit - userConsumed, queueMaxCap - userConsumed,
   *    queueMaxCap - queueUsedResources)
   * )
   *
   * given that queueUsedResources >= userConsumed, this simplifies to
   *
   * >> min (userlimit - userConsumed, queueMaxCap - queueUsedResources) <<
   *
   * sum of queue max capacities of multiple queue's will be greater than the
   * actual capacity of a given partition, hence we need to ensure that the
   * headroom is not greater than the available resource for a given partition
   *
   * headroom = min (unused resourcelimit of a label, calculated headroom )
   */
  // For labeled partitions the passed-in limit does not apply; use the
  // queue's max resource on that partition instead.
  currentPartitionResourceLimit =
      partition.equals(RMNodeLabelsManager.NO_LABEL)
          ? currentPartitionResourceLimit
          : getQueueMaxResource(partition, clusterResource);
  Resource headroom = Resources.componentwiseMin(
      Resources.subtract(userLimitResource, user.getUsed(partition)),
      Resources.subtract(currentPartitionResourceLimit,
          queueUsage.getUsed(partition)));
  // Normalize it before return
  headroom =
      Resources.roundDown(resourceCalculator, headroom, minimumAllocation);
  //headroom = min (unused resourcelimit of a label, calculated headroom )
  Resource clusterPartitionResource =
      labelManager.getResourceByLabel(partition, clusterResource);
  Resource clusterFreePartitionResource =
      Resources.subtract(clusterPartitionResource,
          csContext.getClusterResourceUsage().getUsed(partition));
  headroom = Resources.min(resourceCalculator, clusterPartitionResource,
      clusterFreePartitionResource, headroom);
  return headroom;
}
/**
 * Publishes the current queue limit and cluster resource into the shared
 * queueResourceLimitsInfo object (used by headroom providers). Both fields
 * are set atomically under the info object's own monitor.
 */
private void setQueueResourceLimitsInfo(
    Resource clusterResource) {
  synchronized (queueResourceLimitsInfo) {
    queueResourceLimitsInfo.setQueueCurrentLimit(cachedResourceLimitsForHeadroom
        .getLimit());
    queueResourceLimitsInfo.setClusterResource(clusterResource);
  }
}
/**
 * Computes (or reuses a cached) user limit for the application's user and
 * installs a headroom provider on the application so later headroom reads
 * reflect up-to-date queue limits.
 *
 * @param userLimit previously computed limit to reuse, or null to recompute
 * @return the user limit that was used
 */
@Lock({LeafQueue.class, FiCaSchedulerApp.class})
Resource computeUserLimitAndSetHeadroom(FiCaSchedulerApp application,
    Resource clusterResource, String nodePartition,
    SchedulingMode schedulingMode, Resource userLimit) {
  String user = application.getUser();
  User queueUser = getUser(user);
  // Compute user limit respect requested labels,
  // TODO, need consider headroom respect labels also
  if (userLimit == null) {
    userLimit =
        computeUserLimit(application.getUser(), clusterResource, queueUser,
            nodePartition, schedulingMode, true);
  }
  setQueueResourceLimitsInfo(clusterResource);
  // No user metrics yet means no headroom to report; use none().
  Resource headroom =
      metrics.getUserMetrics(user) == null ? Resources.none() :
          getHeadroom(queueUser, cachedResourceLimitsForHeadroom.getLimit(),
              clusterResource, userLimit, nodePartition);
  if (LOG.isDebugEnabled()) {
    LOG.debug("Headroom calculation for user " + user + ": " +
        " userLimit=" + userLimit +
        " queueMaxAvailRes=" + cachedResourceLimitsForHeadroom.getLimit() +
        " consumed=" + queueUser.getUsed());
  }
  // The provider recomputes headroom lazily from the shared limits info.
  CapacityHeadroomProvider headroomProvider = new CapacityHeadroomProvider(
      queueUser, this, application, queueResourceLimitsInfo);
  application.setHeadroomProvider(headroomProvider);
  metrics.setAvailableResourcesToUser(nodePartition, user, headroom);
  return userLimit;
}
/** @return the configured node-locality delay (scheduling opportunities). */
@Lock(NoLock.class)
public int getNodeLocalityDelay() {
  return nodeLocalityDelay;
}
/** @return whether rack-local assignments fully reset scheduling opportunities. */
@Lock(NoLock.class)
public boolean getRackLocalityFullReset() {
  return rackLocalityFullReset;
}
/**
 * Computes the per-user resource limit for this queue on a partition.
 *
 * The limit is max(currentCapacity / sum-of-active-user-weights,
 * currentCapacity * user-limit%), capped by queueCapacity *
 * user-limit-factor (or by the whole partition in
 * IGNORE_PARTITION_EXCLUSIVITY mode), then scaled by the user's weight.
 *
 * @param userName       user the limit is computed for
 * @param clusterResource total cluster resource
 * @param user           user bookkeeping object (may be null; see weight
 *                       handling at the end)
 * @param nodePartition  partition (label) being scheduled
 * @param schedulingMode exclusivity mode, selects the cap
 * @param forActive      true when computing for assignment; false for the
 *                       preemption path, which pretends the user is active
 * @return the (possibly weight-scaled) user limit
 */
@Lock(NoLock.class)
private Resource computeUserLimit(String userName,
    Resource clusterResource, User user,
    String nodePartition, SchedulingMode schedulingMode, boolean forActive) {
  Resource partitionResource = labelManager.getResourceByLabel(nodePartition,
      clusterResource);
  // What is our current capacity?
  // * It is equal to the max(required, queue-capacity) if
  //   we're running below capacity. The 'max' ensures that jobs in queues
  //   with miniscule capacity (< 1 slot) make progress
  // * If we're running over capacity, then its
  //   (usedResources + required) (which extra resources we are allocating)
  Resource queueCapacity =
      Resources.multiplyAndNormalizeUp(resourceCalculator,
          partitionResource,
          queueCapacities.getAbsoluteCapacity(nodePartition),
          minimumAllocation);
  // Assume we have required resource equals to minimumAllocation, this can
  // make sure user limit can continuously increase till queueMaxResource
  // reached.
  Resource required = minimumAllocation;
  // Allow progress for queues with miniscule capacity
  queueCapacity =
      Resources.max(
          resourceCalculator, partitionResource,
          queueCapacity,
          required);
  /* We want to base the userLimit calculation on
   * max(queueCapacity, usedResources+required). However, we want
   * usedResources to be based on the combined ratios of all the users in the
   * queue so we use consumedRatio to calculate such.
   * The calculation is dependent on how the resourceCalculator calculates the
   * ratio between two Resources. DRF Example: If usedResources is
   * greater than queueCapacity and users have the following [mem,cpu] usages:
   * User1: [10%,20%] - Dominant resource is 20%
   * User2: [30%,10%] - Dominant resource is 30%
   * Then total consumedRatio is then 20+30=50%. Yes, this value can be
   * larger than 100% but for the purposes of making sure all users are
   * getting their fair share, it works.
   */
  Resource consumed = Resources.multiplyAndNormalizeUp(resourceCalculator,
      partitionResource, qUsageRatios.getUsageRatio(nodePartition),
      minimumAllocation);
  Resource currentCapacity =
      Resources.lessThan(resourceCalculator, partitionResource, consumed,
          queueCapacity) ? queueCapacity : Resources.add(consumed, required);
  // Never allow a single user to take more than the
  // queue's configured capacity * user-limit-factor.
  // Also, the queue's configured capacity should be higher than
  // queue-hard-limit * ulMin
  // Refresh the cached active-user set/weights only when it changed.
  if (activeUsersManager.getActiveUsersChanged()) {
    activeUsersSet = activeUsersManager.getActiveUsersSet();
    activeUsersTimesWeights = sumActiveUsersTimesWeights();
    activeUsersManager.clearActiveUsersChanged();
  }
  float usersSummedByWeight = activeUsersTimesWeights;
  // Align the preemption algorithm with the assignment algorithm.
  // If calculating for preemption and the user is not active, calculate the
  // limit as if the user will be preempted (since that will make it active).
  // NOTE(review): this path dereferences 'user' without the null guard used
  // at the weight-scaling step below — presumably the preemption caller
  // always passes a non-null user; confirm against callers.
  if (!forActive && !activeUsersSet.contains(userName)) {
    usersSummedByWeight = activeUsersTimesWeights + user.getWeight();
  }
  // User limit resource is determined by:
  // max(currentCapacity / #activeUsers, currentCapacity *
  // user-limit-percentage%)
  Resource userLimitResource = Resources.max(
      resourceCalculator, partitionResource,
      Resources.divideAndCeil(
          resourceCalculator, currentCapacity, usersSummedByWeight),
      Resources.divideAndCeil(
          resourceCalculator,
          Resources.multiplyAndRoundDown(
              currentCapacity, userLimit),
          100)
      );
  // User limit is capped by maxUserLimit
  // - maxUserLimit = queueCapacity * user-limit-factor (RESPECT_PARTITION_EXCLUSIVITY)
  // - maxUserLimit = total-partition-resource (IGNORE_PARTITION_EXCLUSIVITY)
  //
  // In IGNORE_PARTITION_EXCLUSIVITY mode, if a queue cannot access a
  // partition, its guaranteed resource on that partition is 0. And
  // user-limit-factor computation is based on queue's guaranteed capacity. So
  // we will not cap user-limit as well as used resource when doing
  // IGNORE_PARTITION_EXCLUSIVITY allocation.
  Resource maxUserLimit = Resources.none();
  if (schedulingMode == SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY) {
    maxUserLimit =
        Resources.multiplyAndRoundDown(queueCapacity, userLimitFactor);
  } else if (schedulingMode == SchedulingMode.IGNORE_PARTITION_EXCLUSIVITY) {
    maxUserLimit = partitionResource;
  }
  // Cap final user limit with maxUserLimit
  userLimitResource =
      Resources.roundUp(
          resourceCalculator,
          Resources.min(
              resourceCalculator, partitionResource,
              userLimitResource,
              maxUserLimit
              ),
          minimumAllocation);
  if (LOG.isDebugEnabled()) {
    LOG.debug("User limit computation for " + userName +
        " in queue " + getQueueName() +
        " userLimitPercent=" + userLimit +
        " userLimitFactor=" + userLimitFactor +
        " required: " + required +
        " consumed: " + consumed +
        " user-limit-resource: " + userLimitResource +
        " queueCapacity: " + queueCapacity +
        " qconsumed: " + queueUsage.getUsed() +
        " consumedRatio: " + totalUserConsumedRatio +
        " currentCapacity: " + currentCapacity +
        " activeUsers: " + usersSummedByWeight +
        " clusterCapacity: " + clusterResource +
        " resourceByLabel: " + partitionResource +
        " usageratio: " + qUsageRatios.getUsageRatio(nodePartition) +
        " Partition: " + nodePartition +
        " maxUserLimit=" + maxUserLimit +
        " userWeight=" + ((user != null) ? user.getWeight() : 1.0f)
        );
  }
  // Apply user's weight.
  float weight = (user == null) ? 1.0f : user.getWeight();
  userLimitResource =
      Resources.multiplyAndNormalizeDown(resourceCalculator,
          userLimitResource, weight, minimumAllocation);
  // Only persist the limit on the user object when computing for assignment.
  if (forActive) {
    user.setUserResourceLimit(userLimitResource);
  }
  return userLimitResource;
}
/**
 * Sums the weights of all currently-active users. Users that have already
 * been removed from the {@code users} map contribute nothing. Reads the map
 * directly (instead of getUser) to stay lock-free.
 *
 * @return total weight of active users
 */
float sumActiveUsersTimesWeights() {
  float total = 0.0f;
  for (String name : activeUsersSet) {
    // Do the following instead of calling getUser to avoid synchronization.
    User activeUser = users.get(name);
    if (activeUser != null) {
      total += activeUser.getWeight();
    }
  }
  return total;
}
/**
 * Decides whether the given user may receive more resource on a partition.
 *
 * Returns true when the user is within its limit. If the user is over the
 * limit only because of outstanding reservations (and
 * reservations-continue-looking is enabled on the default partition), it
 * still returns true but records how much must be unreserved first in
 * {@code currentResourceLimits}.
 *
 * @param rsrv the user's currently reserved resource
 * @return true if assignment may proceed (possibly after unreserving)
 */
@Private
protected synchronized boolean canAssignToUser(Resource clusterResource,
    String userName, Resource limit, Resource rsrv,
    String nodePartition, ResourceLimits currentResourceLimits) {
  User user = getUser(userName);
  Resource used = user.getUsed(nodePartition);
  currentResourceLimits.setAmountNeededUnreserve(Resources.none());
  // Note: We aren't considering the current request since there is a fixed
  // overhead of the AM, but it's a > check, not a >= check, so...
  if (Resources
      .greaterThan(resourceCalculator, clusterResource,
          used,
          limit)) {
    // if enabled, check to see if could we potentially use this node instead
    // of a reserved node if the application has reserved containers
    if (this.reservationsContinueLooking && !rsrv.equals(Resources.none())
        && nodePartition.equals(CommonNodeLabelsManager.NO_LABEL)) {
      // Would the user fit under the limit if its reservations were
      // released?
      if (Resources.lessThanOrEqual(
          resourceCalculator,
          clusterResource,
          Resources.subtract(used,
              rsrv), limit)) {
        if (LOG.isDebugEnabled()) {
          LOG.debug("User " + userName + " in queue " + getQueueName()
              + " will exceed limit based on reservations - " + " consumed: "
              + used + " reserved: "
              + rsrv + " limit: " + limit);
        }
        Resource amountNeededToUnreserve =
            Resources.subtract(used, limit);
        // we can only acquire a new container if we unreserve first to
        // respect user-limit
        currentResourceLimits.setAmountNeededUnreserve(amountNeededToUnreserve);
        return true;
      }
    }
    if (LOG.isDebugEnabled()) {
      LOG.debug("User " + userName + " in queue " + getQueueName()
          + " will exceed limit - " + " consumed: "
          + used + " limit: " + limit);
    }
    return false;
  }
  return true;
}
/**
 * Rolls back a reservation that was made for a container-size increase.
 * The unreserve and resource release happen under the leaf-queue lock; the
 * parent queue is notified only afterwards, outside the lock, to preserve
 * the queue-hierarchy locking order.
 */
@Override
public void unreserveIncreasedContainer(Resource clusterResource,
    FiCaSchedulerApp app, FiCaSchedulerNode node, RMContainer rmContainer) {
  boolean removed = false;
  Priority priority = null;
  synchronized (this) {
    // Guard against an RMContainer whose Container has already been cleared.
    if (rmContainer.getContainer() != null) {
      priority = rmContainer.getContainer().getPriority();
    }
    if (null != priority) {
      removed = app.unreserve(rmContainer.getContainer().getPriority(), node,
          rmContainer);
    }
    if (removed) {
      // Inform the ordering policy
      orderingPolicy.containerReleased(app, rmContainer);
      releaseResource(clusterResource, app, rmContainer.getReservedResource(),
          node.getPartition(), rmContainer, true);
    }
  }
  if (removed) {
    // Parent notified outside the leaf-queue lock.
    getParent().unreserveIncreasedContainer(clusterResource, app, node,
        rmContainer);
  }
}
/**
 * Resets and recomputes each user's usage ratio against the partition's
 * total resource, returning the queue-wide sum of the recomputed ratios.
 *
 * @param clusterResource total cluster resource
 * @param nodePartition   partition (label) to compute ratios for
 * @return sum of all users' usage ratios on the partition
 */
private synchronized float calculateUserUsageRatio(Resource clusterResource,
    String nodePartition) {
  Resource resourceByLabel =
      labelManager.getResourceByLabel(nodePartition, clusterResource);
  float consumed = 0;
  // Iterate values directly — the original looped over entrySet() through a
  // method-scope mutable variable although the keys were never used.
  for (User user : users.values()) {
    consumed += user.resetAndUpdateUsageRatio(resourceCalculator,
        resourceByLabel, nodePartition);
  }
  return consumed;
}
/**
 * Recomputes the queue's per-partition usage ratios. With a specific
 * partition, only that partition is recomputed; with {@code null}, every
 * partition known to either the capacities or the resource usage is
 * recomputed.
 */
private synchronized void recalculateQueueUsageRatio(Resource clusterResource,
    String nodePartition) {
  ResourceUsage queueResourceUsage = this.getQueueResourceUsage();
  if (nodePartition != null) {
    // Single-partition refresh.
    qUsageRatios.setUsageRatio(nodePartition,
        calculateUserUsageRatio(clusterResource, nodePartition));
    return;
  }
  // Full refresh across the union of configured and in-use partitions.
  for (String partition : Sets.union(queueCapacities.getNodePartitionsSet(),
      queueResourceUsage.getNodePartitionsSet())) {
    qUsageRatios.setUsageRatio(partition,
        calculateUserUsageRatio(clusterResource, partition));
  }
}
/** Applies an incremental delta to the queue's usage ratio on a partition. */
private synchronized void updateQueueUsageRatio(String nodePartition,
    float delta) {
  qUsageRatios.incUsageRatio(nodePartition, delta);
}
/**
 * Records a completed container in the scheduler-health tracker: preempted
 * containers bump the preemption counters; everything else is logged as a
 * release. Silently does nothing when no health tracker is configured.
 */
private void updateSchedulerHealthForCompletedContainer(
    RMContainer rmContainer, ContainerStatus containerStatus) {
  SchedulerHealth schedulerHealth = csContext.getSchedulerHealth();
  if (schedulerHealth == null) {
    // Health tracking is optional; nothing to record.
    return;
  }
  boolean preempted =
      containerStatus.getExitStatus() == ContainerExitStatus.PREEMPTED;
  if (preempted) {
    schedulerHealth.updatePreemption(Time.now(), rmContainer.getAllocatedNode(),
        rmContainer.getContainerId(), getQueuePath());
    schedulerHealth.updateSchedulerPreemptionCounts(1);
  } else {
    schedulerHealth.updateRelease(csContext.getLastNodeUpdateTime(),
        rmContainer.getAllocatedNode(), rmContainer.getContainerId(),
        getQueuePath());
  }
}
/**
 * Handles completion of a container belonging to this queue: updates
 * scheduler health, rolls back any increase reservation, unreserves or
 * completes the container under the leaf-queue lock, releases its resource,
 * and finally notifies the parent queue outside the lock. Also clears the
 * container from the preemption manager's killable set.
 */
@Override
public void completedContainer(Resource clusterResource,
    FiCaSchedulerApp application, FiCaSchedulerNode node, RMContainer rmContainer,
    ContainerStatus containerStatus, RMContainerEventType event, CSQueue childQueue,
    boolean sortQueues) {
  // Update SchedulerHealth for released / preempted container
  updateSchedulerHealthForCompletedContainer(rmContainer, containerStatus);
  if (application != null) {
    // unreserve container increase request if it previously reserved.
    if (rmContainer.hasIncreaseReservation()) {
      unreserveIncreasedContainer(clusterResource, application, node,
          rmContainer);
    }
    // Remove container increase request if it exists
    application.removeIncreaseRequest(node.getNodeID(),
        rmContainer.getAllocatedPriority(), rmContainer.getContainerId());
    boolean removed = false;
    // Careful! Locking order is important!
    synchronized (this) {
      Container container = rmContainer.getContainer();
      // Inform the application & the node
      // Note: It's safe to assume that all state changes to RMContainer
      // happen under scheduler's lock...
      // So, this is, in effect, a transaction across application & node
      if (rmContainer.getState() == RMContainerState.RESERVED) {
        removed = application.unreserve(rmContainer.getReservedPriority(),
            node, rmContainer);
      } else {
        removed =
            application.containerCompleted(rmContainer, containerStatus,
                event, node.getPartition());
        node.releaseContainer(rmContainer.getContainerId(), false);
      }
      // Book-keeping
      if (removed) {
        // Inform the ordering policy
        orderingPolicy.containerReleased(application, rmContainer);
        releaseResource(clusterResource, application, container.getResource(),
            node.getPartition(), rmContainer, false);
      }
    }
    if (removed) {
      // Inform the parent queue _outside_ of the leaf-queue lock
      getParent().completedContainer(clusterResource, application, node,
          rmContainer, null, event, this, sortQueues);
    }
  }
  // Notify PreemptionManager
  csContext.getPreemptionManager().removeKillableContainer(
      new KillableContainer(rmContainer, node.getPartition(), queueName));
}
/**
 * Accounts a new allocation (or allocation increase) against this queue:
 * updates the parent's accounting via super, tracks containers that ignored
 * partition exclusivity, updates the user's usage/ratios, and adjusts the
 * application's headroom and user metrics.
 */
synchronized void allocateResource(Resource clusterResource,
    SchedulerApplicationAttempt application, Resource resource,
    String nodePartition, RMContainer rmContainer,
    boolean isIncreasedAllocation) {
  super.allocateResource(clusterResource, resource, nodePartition,
      isIncreasedAllocation);
  Resource resourceByLabel = labelManager.getResourceByLabel(nodePartition,
      clusterResource);
  // handle ignore exclusivity container
  // A container with an empty label expression placed on a labeled partition
  // got there via IGNORE_PARTITION_EXCLUSIVITY; track it so it can be found
  // for preemption.
  if (null != rmContainer && rmContainer.getNodeLabelExpression().equals(
      RMNodeLabelsManager.NO_LABEL)
      && !nodePartition.equals(RMNodeLabelsManager.NO_LABEL)) {
    TreeSet<RMContainer> rmContainers = null;
    if (null == (rmContainers =
        ignorePartitionExclusivityRMContainers.get(nodePartition))) {
      rmContainers = new TreeSet<>();
      ignorePartitionExclusivityRMContainers.put(nodePartition, rmContainers);
    }
    rmContainers.add(rmContainer);
  }
  // Update user metrics
  String userName = application.getUser();
  User user = getUser(userName);
  user.assignContainer(resource, nodePartition);
  // Update usage ratios
  updateQueueUsageRatio(nodePartition,
      user.updateUsageRatio(resourceCalculator, resourceByLabel,
          nodePartition));
  // Note this is a bit unconventional since it gets the object and modifies
  // it here, rather then using set routine
  Resources.subtractFrom(application.getHeadroom(), resource); // headroom
  metrics.setAvailableResourcesToUser(nodePartition,
      userName, application.getHeadroom());
  if (LOG.isDebugEnabled()) {
    LOG.debug(getQueueName() +
        " user=" + userName +
        " used=" + queueUsage.getUsed() + " numContainers=" + numContainers +
        " headroom = " + application.getHeadroom() +
        " user-resources=" + user.getUsed()
        );
  }
}
/**
 * Accounts a released container (or decreased allocation) against this
 * queue: updates the parent's accounting via super, untracks
 * ignore-partition-exclusivity containers, updates the user's usage/ratios,
 * and refreshes the user's available-resource metric.
 */
synchronized void releaseResource(Resource clusterResource,
    FiCaSchedulerApp application, Resource resource, String nodePartition,
    RMContainer rmContainer, boolean isChangeResource) {
  super.releaseResource(clusterResource, resource, nodePartition,
      isChangeResource);
  Resource resourceByLabel = labelManager.getResourceByLabel(nodePartition,
      clusterResource);
  // handle ignore exclusivity container
  if (null != rmContainer && rmContainer.getNodeLabelExpression().equals(
      RMNodeLabelsManager.NO_LABEL)
      && !nodePartition.equals(RMNodeLabelsManager.NO_LABEL)) {
    // Single get() instead of containsKey()+get() (redundant double lookup).
    Set<RMContainer> rmContainers =
        ignorePartitionExclusivityRMContainers.get(nodePartition);
    if (rmContainers != null) {
      rmContainers.remove(rmContainer);
      // Drop the per-partition set once it is empty.
      if (rmContainers.isEmpty()) {
        ignorePartitionExclusivityRMContainers.remove(nodePartition);
      }
    }
  }
  // Update user metrics
  String userName = application.getUser();
  User user = getUser(userName);
  user.releaseContainer(resource, nodePartition);
  // Update usage ratios
  updateQueueUsageRatio(nodePartition,
      user.updateUsageRatio(resourceCalculator, resourceByLabel,
          nodePartition));
  metrics.setAvailableResourcesToUser(nodePartition,
      userName, application.getHeadroom());
  if (LOG.isDebugEnabled()) {
    LOG.debug(getQueueName() +
        " used=" + queueUsage.getUsed() + " numContainers=" + numContainers +
        " user=" + userName + " user-resources=" + user.getUsed());
  }
}
/**
 * Caches the resource limit used for headroom, capped by this queue's
 * absolute maximum capacity on the default partition. The cap is needed
 * because the reserved-container allocation path does not apply the child
 * max-capacity limit that ParentQueue normally would.
 */
private void updateCurrentResourceLimits(
    ResourceLimits currentResourceLimits, Resource clusterResource) {
  // TODO: need consider non-empty node labels when resource limits supports
  // node labels
  // Even if ParentQueue will set limits respect child's max queue capacity,
  // but when allocating reserved container, CapacityScheduler doesn't do
  // this. So need cap limits by queue's max capacity here.
  this.cachedResourceLimitsForHeadroom =
      new ResourceLimits(currentResourceLimits.getLimit());
  Resource queueMaxResource =
      Resources.multiplyAndNormalizeDown(resourceCalculator, labelManager
          .getResourceByLabel(RMNodeLabelsManager.NO_LABEL, clusterResource),
          queueCapacities
              .getAbsoluteMaximumCapacity(RMNodeLabelsManager.NO_LABEL),
          minimumAllocation);
  this.cachedResourceLimitsForHeadroom.setLimit(Resources.min(
      resourceCalculator, clusterResource, queueMaxResource,
      currentResourceLimits.getLimit()));
}
/**
 * Reacts to a cluster resource change: refreshes cached limits, usage
 * ratios and queue statistics, activates newly-eligible pending
 * applications, and recomputes each active application's user limit and
 * headroom (each under the application's own lock).
 */
@Override
public synchronized void updateClusterResource(Resource clusterResource,
    ResourceLimits currentResourceLimits) {
  updateCurrentResourceLimits(currentResourceLimits, clusterResource);
  lastClusterResource = clusterResource;
  // Update headroom info based on new cluster resource value
  // absoluteMaxCapacity now, will be replaced with absoluteMaxAvailCapacity
  // during allocation
  setQueueResourceLimitsInfo(clusterResource);
  // Update user consumedRatios
  recalculateQueueUsageRatio(clusterResource, null);
  // Update metrics
  CSQueueUtils.updateQueueStatistics(resourceCalculator, clusterResource,
      minimumAllocation, this, labelManager, null);
  // queue metrics are updated, more resource may be available
  // activate the pending applications if possible
  activateApplications();
  // Update application properties
  for (FiCaSchedulerApp application :
      orderingPolicy.getSchedulableEntities()) {
    synchronized (application) {
      computeUserLimitAndSetHeadroom(application, clusterResource,
          RMNodeLabelsManager.NO_LABEL,
          SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY, null);
    }
  }
}
/**
 * Adds used resource for the submitting user on the given label, then lets
 * the superclass update queue-level accounting.
 */
@Override
public void incUsedResource(String nodeLabel, Resource resourceToInc,
    SchedulerApplicationAttempt application) {
  // User-level accounting first, then queue-level via super.
  User user = getUser(application.getUser());
  user.getResourceUsage().incUsed(nodeLabel, resourceToInc);
  super.incUsedResource(nodeLabel, resourceToInc, application);
}
/**
 * Subtracts used resource for the submitting user on the given label, then
 * lets the superclass update queue-level accounting.
 */
@Override
public void decUsedResource(String nodeLabel, Resource resourceToDec,
    SchedulerApplicationAttempt application) {
  // User-level accounting first, then queue-level via super.
  User user = getUser(application.getUser());
  user.getResourceUsage().decUsed(nodeLabel, resourceToDec);
  super.decUsedResource(nodeLabel, resourceToDec, application);
}
/**
 * Adds AM resource usage for the submitting user and for the queue on the
 * given label.
 */
public void incAMUsedResource(String nodeLabel, Resource resourceToInc,
    SchedulerApplicationAttempt application) {
  User user = getUser(application.getUser());
  user.getResourceUsage().incAMUsed(nodeLabel, resourceToInc);
  // ResourceUsage has its own lock, so no additional locking is needed here.
  queueUsage.incAMUsed(nodeLabel, resourceToInc);
}
/**
 * Subtracts AM resource usage for the submitting user and for the queue on
 * the given label.
 */
public void decAMUsedResource(String nodeLabel, Resource resourceToDec,
    SchedulerApplicationAttempt application) {
  User user = getUser(application.getUser());
  user.getResourceUsage().decAMUsed(nodeLabel, resourceToDec);
  // ResourceUsage has its own lock, so no additional locking is needed here.
  queueUsage.decAMUsed(nodeLabel, resourceToDec);
}
/*
* Usage Ratio
*/
/**
 * Thread-safe per-label usage-ratio map, guarded by a read/write lock.
 * Missing labels read as 0.
 */
static private class UsageRatios {
  private Map<String, Float> usageRatios;
  private ReadLock readLock;
  private WriteLock writeLock;
  public UsageRatios() {
    ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    readLock = lock.readLock();
    writeLock = lock.writeLock();
    usageRatios = new HashMap<String, Float>();
  }
  /** Adds {@code delta} to the ratio stored for {@code label}. */
  private void incUsageRatio(String label, float delta) {
    try {
      writeLock.lock();
      Float current = usageRatios.get(label);
      // Autoboxing instead of the deprecated new Float(...) constructors.
      float updated = (current == null ? 0.0f : current) + delta;
      usageRatios.put(label, updated);
    } finally {
      writeLock.unlock();
    }
  }
  /** @return the ratio stored for {@code label}, or 0 when absent. */
  float getUsageRatio(String label) {
    try {
      readLock.lock();
      Float f = usageRatios.get(label);
      if (null == f) {
        return 0.0f;
      }
      return f;
    } finally {
      readLock.unlock();
    }
  }
  /** Overwrites the ratio stored for {@code label}. */
  private void setUsageRatio(String label, float ratio) {
    try {
      writeLock.lock();
      // Autoboxing instead of the deprecated new Float(ratio).
      usageRatios.put(label, ratio);
    } finally {
      writeLock.unlock();
    }
  }
}
/** Test hook: exposes the queue's usage ratio for a label. */
@VisibleForTesting
public float getUsageRatio(String label) {
  return qUsageRatios.getUsageRatio(label);
}
/**
 * Per-user bookkeeping within a leaf queue: resource usage, application
 * counts, usage ratios, computed resource limit and scheduling weight.
 *
 * Mutators are synchronized; several read accessors are not — callers
 * presumably tolerate slightly stale counts (NOTE(review): confirm).
 */
@VisibleForTesting
public static class User {
  // Aggregate resource usage for this user (has its own internal locking).
  ResourceUsage userResourceUsage = new ResourceUsage();
  // Last computed user limit; volatile so readers see fresh values.
  volatile Resource userResourceLimit = Resource.newInstance(0, 0);
  // Applications submitted but not yet activated.
  int pendingApplications = 0;
  // Applications currently activated.
  int activeApplications = 0;
  // Per-partition usage ratios for this user.
  private UsageRatios userUsageRatios = new UsageRatios();
  String userName;
  // Scheduling weight; defaults to 1 (equal share).
  float weight = 1.0f;
  public User(String name) {
    this.userName = name;
  }
  public ResourceUsage getResourceUsage() {
    return userResourceUsage;
  }
  /**
   * Zeroes the stored ratio for the partition, then recomputes it; returns
   * the delta relative to zero (i.e. the new ratio).
   */
  public synchronized float resetAndUpdateUsageRatio(
      ResourceCalculator resourceCalculator,
      Resource resource, String nodePartition) {
    userUsageRatios.setUsageRatio(nodePartition, 0);
    return updateUsageRatio(resourceCalculator, resource, nodePartition);
  }
  /**
   * Recomputes this user's usage ratio on the partition and returns the
   * change relative to the previously stored value.
   */
  public synchronized float updateUsageRatio(
      ResourceCalculator resourceCalculator,
      Resource resource, String nodePartition) {
    float delta;
    float newRatio =
        Resources.ratio(resourceCalculator, getUsed(nodePartition), resource);
    delta = newRatio - userUsageRatios.getUsageRatio(nodePartition);
    userUsageRatios.setUsageRatio(nodePartition, newRatio);
    return delta;
  }
  public Resource getUsed() {
    return userResourceUsage.getUsed();
  }
  public Resource getAllUsed() {
    return userResourceUsage.getAllUsed();
  }
  public Resource getUsed(String label) {
    return userResourceUsage.getUsed(label);
  }
  public int getPendingApplications() {
    return pendingApplications;
  }
  public int getActiveApplications() {
    return activeApplications;
  }
  public Resource getConsumedAMResources() {
    return userResourceUsage.getAMUsed();
  }
  public Resource getConsumedAMResources(String label) {
    return userResourceUsage.getAMUsed(label);
  }
  /** @return pending + active application count. */
  public int getTotalApplications() {
    return getPendingApplications() + getActiveApplications();
  }
  public synchronized void submitApplication() {
    ++pendingApplications;
  }
  /** Moves one application from pending to active. */
  public synchronized void activateApplication() {
    --pendingApplications;
    ++activeApplications;
  }
  /** Decrements the counter matching the application's state at removal. */
  public synchronized void finishApplication(boolean wasActive) {
    if (wasActive) {
      --activeApplications;
    }
    else {
      --pendingApplications;
    }
  }
  public void assignContainer(Resource resource, String nodePartition) {
    userResourceUsage.incUsed(nodePartition, resource);
  }
  public void releaseContainer(Resource resource, String nodePartition) {
    userResourceUsage.decUsed(nodePartition, resource);
  }
  public Resource getUserResourceLimit() {
    return userResourceLimit;
  }
  public void setUserResourceLimit(Resource userResourceLimit) {
    this.userResourceLimit = userResourceLimit;
  }
  public String getUserName() {
    return this.userName;
  }
  @VisibleForTesting
  public void setResourceUsage(ResourceUsage resourceUsage) {
    this.userResourceUsage = resourceUsage;
  }
  /**
   * @return the weight
   */
  public float getWeight() {
    return weight;
  }
  /**
   * @param weight the weight to set
   */
  public void setWeight(float weight) {
    this.weight = weight;
  }
}
@Override
public void recoverContainer(Resource clusterResource,
    SchedulerApplicationAttempt attempt, RMContainer rmContainer) {
  // Nothing to account for a container that has already completed.
  if (rmContainer.getState().equals(RMContainerState.COMPLETED)) {
    return;
  }
  // Careful! Locking order is important!
  synchronized (this) {
    Container container = rmContainer.getContainer();
    FiCaSchedulerNode node = scheduler.getNode(container.getNodeId());
    allocateResource(clusterResource, attempt, container.getResource(),
        node.getPartition(), rmContainer, false);
  }
  // Inform the parent queue outside the leaf queue lock.
  getParent().recoverContainer(clusterResource, attempt, rmContainer);
}
/**
 * Obtain (read-only) collection of pending applications.
 * Synchronized for a consistent view, matching the sibling accessors
 * {@code getApplications()} and {@code getAllApplications()}.
 */
public synchronized Collection<FiCaSchedulerApp> getPendingApplications() {
  return Collections.unmodifiableCollection(pendingOrderingPolicy
      .getSchedulableEntities());
}
/**
 * Obtain (read-only) collection of active applications.
 * Unmodifiable wrapper over the ordering policy's live entity set.
 */
public synchronized Collection<FiCaSchedulerApp> getApplications() {
  return Collections.unmodifiableCollection(orderingPolicy
      .getSchedulableEntities());
}
/**
 * Obtain (read-only) collection of all applications, pending plus active.
 */
public synchronized Collection<FiCaSchedulerApp> getAllApplications() {
  // Union of both policies' entity sets; HashSet de-duplicates.
  Set<FiCaSchedulerApp> union = new HashSet<FiCaSchedulerApp>();
  union.addAll(pendingOrderingPolicy.getSchedulableEntities());
  union.addAll(orderingPolicy.getSchedulableEntities());
  return Collections.unmodifiableCollection(union);
}
// Consider the headroom for each user in the queue.
// Total pending for the queue =
//   sum(for each user(min((user's headroom), sum(user's pending requests))))
// NOTE: Used for calculating pending resources in the preemption monitor.
public synchronized Resource getTotalPendingResourcesConsideringUserLimit(
    Resource resources, String partition) {
  // Remaining headroom per user; the stored Resource is mutated in place
  // as each of the user's apps consumes a share of it.
  Map<String, Resource> userNameToHeadroom = new HashMap<String, Resource>();
  Resource pendingConsideringUserLimit = Resource.newInstance(0, 0);
  for (FiCaSchedulerApp app : getApplications()) {
    String userName = app.getUser();
    // Single lookup instead of containsKey() followed by get().
    Resource headroom = userNameToHeadroom.get(userName);
    if (headroom == null) {
      User user = getUser(userName);
      headroom = Resources.subtract(
          computeUserLimit(userName, resources, user, partition,
              SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY, true),
          user.getUsed(partition));
      // Make sure headroom is not negative.
      headroom = Resources.componentwiseMax(headroom, Resources.none());
      userNameToHeadroom.put(userName, headroom);
    }
    // Each app can contribute at most min(remaining headroom, its pending).
    Resource minPendingConsideringUserLimit =
        Resources.componentwiseMin(headroom,
            app.getAppAttemptResourceUsage().getPending(partition));
    Resources.addTo(pendingConsideringUserLimit,
        minPendingConsideringUserLimit);
    // Shrinks the user's remaining headroom for subsequent apps (mutates
    // the same Resource object held in the map).
    Resources.subtractFrom(headroom, minPendingConsideringUserLimit);
  }
  return pendingConsideringUserLimit;
}
/**
 * Compute the resource limit for {@code userName} on {@code partition}
 * under RESPECT_PARTITION_EXCLUSIVITY scheduling mode.
 * @return the computed user limit
 */
public synchronized Resource getUserLimitPerUser(String userName,
    Resource resources, String partition) {
  // Check user resource limit
  User user = getUser(userName);
  return computeUserLimit(userName, resources, user, partition,
      SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY, true);
}
@Override
public synchronized void collectSchedulerApplications(
    Collection<ApplicationAttemptId> apps) {
  // Gather attempt ids from the pending policy first, then the active one.
  for (FiCaSchedulerApp pending
      : pendingOrderingPolicy.getSchedulableEntities()) {
    apps.add(pending.getApplicationAttemptId());
  }
  for (FiCaSchedulerApp active : orderingPolicy.getSchedulableEntities()) {
    apps.add(active.getApplicationAttemptId());
  }
}
@Override
public void attachContainer(Resource clusterResource,
    FiCaSchedulerApp application, RMContainer rmContainer) {
  // Guard clause: nothing to attach without an application.
  if (application == null) {
    return;
  }
  Container container = rmContainer.getContainer();
  FiCaSchedulerNode node = scheduler.getNode(container.getNodeId());
  allocateResource(clusterResource, application, container.getResource(),
      node.getPartition(), rmContainer, false);
  LOG.info("movedContainer" + " container=" + container
      + " resource=" + container.getResource()
      + " queueMoveIn=" + this + " usedCapacity=" + getUsedCapacity()
      + " absoluteUsedCapacity=" + getAbsoluteUsedCapacity() + " used="
      + queueUsage.getUsed() + " cluster=" + clusterResource);
  // Inform the parent queue
  getParent().attachContainer(clusterResource, application, rmContainer);
}
@Override
public void detachContainer(Resource clusterResource,
    FiCaSchedulerApp application, RMContainer rmContainer) {
  // Guard clause: nothing to detach without an application.
  if (application == null) {
    return;
  }
  Container container = rmContainer.getContainer();
  FiCaSchedulerNode node = scheduler.getNode(container.getNodeId());
  releaseResource(clusterResource, application, container.getResource(),
      node.getPartition(), rmContainer, false);
  LOG.info("movedContainer" + " container=" + container
      + " resource=" + container.getResource()
      + " queueMoveOut=" + this + " usedCapacity=" + getUsedCapacity()
      + " absoluteUsedCapacity=" + getAbsoluteUsedCapacity() + " used="
      + queueUsage.getUsed() + " cluster=" + clusterResource);
  // Inform the parent queue
  getParent().detachContainer(clusterResource, application, rmContainer);
}
/**
 * @return all ignored partition exclusivity RMContainers in the LeafQueue;
 *         this will be used by the preemption policy. Any use of the
 *         returned map must be protected by the LeafQueue's synchronized
 *         lock.
 */
public synchronized Map<String, TreeSet<RMContainer>>
getIgnoreExclusivityRMContainers() {
  return ignorePartitionExclusivityRMContainers;
}
/** Set this queue's configured capacity (delegates to queueCapacities). */
public void setCapacity(float capacity) {
  queueCapacities.setCapacity(capacity);
}
/** Set this queue's absolute capacity (delegates to queueCapacities). */
public void setAbsoluteCapacity(float absoluteCapacity) {
  queueCapacities.setAbsoluteCapacity(absoluteCapacity);
}
/** Set the maximum number of applications allowed in this queue. */
public void setMaxApplications(int maxApplications) {
  this.maxApplications = maxApplications;
}
/** @return the ordering policy holding this queue's active applications */
public synchronized OrderingPolicy<FiCaSchedulerApp>
getOrderingPolicy() {
  return orderingPolicy;
}
/**
 * Replace the active-application ordering policy, migrating every
 * application tracked by the previous policy into the new one.
 */
public synchronized void setOrderingPolicy(
    OrderingPolicy<FiCaSchedulerApp> orderingPolicy) {
  OrderingPolicy<FiCaSchedulerApp> previous = this.orderingPolicy;
  if (previous != null) {
    // Carry the currently tracked apps over to the replacement policy.
    orderingPolicy.addAllSchedulableEntities(
        previous.getSchedulableEntities());
  }
  this.orderingPolicy = orderingPolicy;
}
/** @return this queue's default application priority */
@Override
public Priority getDefaultApplicationPriority() {
  return defaultAppPriorityPerQueue;
}
/**
 * Decrease the resource of a container. If the container currently holds
 * an increase reservation, that reservation is unreserved first.
 * Validation and queue-side accounting run under the leaf queue lock;
 * the parent queue is notified outside the lock (locking order: child
 * before parent).
 *
 * @param clusterResource Total cluster resource
 * @param decreaseRequest The decrease request
 * @param app The application of interest
 * @throws InvalidResourceRequestException if the container-change sanity
 *         check rejects the request
 */
@Override
public void decreaseContainer(Resource clusterResource,
    SchedContainerChangeRequest decreaseRequest,
    FiCaSchedulerApp app) throws InvalidResourceRequestException {
  // If the container being decreased is reserved, we need to unreserve it
  // first.
  RMContainer rmContainer = decreaseRequest.getRMContainer();
  if (rmContainer.hasIncreaseReservation()) {
    unreserveIncreasedContainer(clusterResource, app,
        (FiCaSchedulerNode)decreaseRequest.getSchedulerNode(), rmContainer);
  }
  boolean resourceDecreased = false;
  Resource resourceBeforeDecrease;
  // Grab queue lock to avoid race condition when getting container resource
  synchronized (this) {
    // Make sure the decrease request is valid in terms of current resource
    // and target resource. This must be done under the leaf queue lock.
    // Throws exception if the check fails.
    RMServerUtils.checkSchedContainerChangeRequest(decreaseRequest, false);
    // Save resource before decrease for debug log
    resourceBeforeDecrease =
        Resources.clone(rmContainer.getAllocatedResource());
    // Do we have increase request for the same container? If so, remove it
    boolean hasIncreaseRequest =
        app.removeIncreaseRequest(decreaseRequest.getNodeId(),
            decreaseRequest.getPriority(), decreaseRequest.getContainerId());
    if (hasIncreaseRequest) {
      if (LOG.isDebugEnabled()) {
        LOG.debug("While processing decrease requests, found an increase"
            + " request for the same container "
            + decreaseRequest.getContainerId()
            + ", removed the increase request");
      }
    }
    // Delta capacity is negative when it's a decrease request
    Resource absDelta = Resources.negate(decreaseRequest.getDeltaCapacity());
    if (Resources.equals(absDelta, Resources.none())) {
      // If delta capacity of this decrease request is 0, this decrease
      // request serves the purpose of cancelling an existing increase request
      // if any
      if (LOG.isDebugEnabled()) {
        LOG.debug("Decrease target resource equals to existing resource for"
            + " container:" + decreaseRequest.getContainerId()
            + " ignore this decrease request.");
      }
    } else {
      // Release the delta resource from queue/user accounting
      releaseResource(clusterResource, app, absDelta,
          decreaseRequest.getNodePartition(),
          decreaseRequest.getRMContainer(),
          true);
      // Notify application
      app.decreaseContainer(decreaseRequest);
      // Notify node
      decreaseRequest.getSchedulerNode()
          .decreaseContainer(decreaseRequest.getContainerId(), absDelta);
      resourceDecreased = true;
    }
  }
  if (resourceDecreased) {
    // Notify parent queue outside of leaf queue lock
    getParent().decreaseContainer(clusterResource, decreaseRequest, app);
    LOG.info("Application attempt " + app.getApplicationAttemptId()
        + " decreased container:" + decreaseRequest.getContainerId()
        + " from " + resourceBeforeDecrease + " to "
        + decreaseRequest.getTargetCapacity());
  }
}
/** @return the ordering policy holding this queue's pending applications */
public synchronized OrderingPolicy<FiCaSchedulerApp>
getPendingAppsOrderingPolicy() {
  return pendingOrderingPolicy;
}
/**
 * Holds shared values used by all applications in the queue to calculate
 * headroom on demand.
 */
static class QueueResourceLimitsInfo {
  // Current resource limit applied to the queue.
  private Resource queueCurrentLimit;
  // Last known total cluster resource.
  private Resource clusterResource;
  public void setQueueCurrentLimit(Resource currentLimit) {
    this.queueCurrentLimit = currentLimit;
  }
  public Resource getQueueCurrentLimit() {
    return queueCurrentLimit;
  }
  public void setClusterResource(Resource clusterResource) {
    this.clusterResource = clusterResource;
  }
  public Resource getClusterResource() {
    return clusterResource;
  }
}
/**
 * Cached per-user values. NOTE(review): appears to cache the computed user
 * limit so it is not recomputed per application of the same user within a
 * single allocation pass — confirm at call sites.
 */
static class CachedUserLimit {
  // Computed user limit; fixed for the lifetime of this cache entry.
  final Resource userLimit;
  // Whether further assignment is allowed for this user.
  boolean canAssign = true;
  // Resource reservation associated with this user.
  Resource reservation = Resources.none();
  CachedUserLimit(Resource userLimit) {
    this.userLimit = userLimit;
  }
}
/**
 * Get all valid users in this queue.
 * @return an unmodifiable view of the user names; it tracks later changes
 *         to the queue's user map but cannot be used to mutate it
 */
public Set<String> getAllUsers() {
  // Returning users.keySet() directly would let callers remove users from
  // the queue's internal map; expose a read-only view instead.
  return Collections.unmodifiableSet(this.users.keySet());
}
/**
 * Compute the user limit for {@code userName} on the given partition under
 * the supplied scheduling mode.
 * NOTE(review): passes {@code false} for the final flag, unlike
 * getUserLimitPerUser which passes {@code true}; the flag's semantics are
 * defined in computeUserLimit — confirm there.
 */
public synchronized Resource getResourceLimitForAllUsers(String userName,
    Resource clusterResource, String partition, SchedulingMode schedulingMode)
{
  return computeUserLimit(userName, clusterResource, getUser(userName),
      partition, schedulingMode, false);
}
}