/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
import java.io.IOException;
import java.util.*;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.time.DateUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authorize.AccessControlList;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.ExecutionType;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.QueueACL;
import org.apache.hadoop.yarn.api.records.QueueInfo;
import org.apache.hadoop.yarn.api.records.QueueState;
import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
import org.apache.hadoop.yarn.security.AccessType;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.*;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivityDiagnosticConstant;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivitiesLogger;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivityState;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt.AMState;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.UsersManager.User;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.KillableContainer;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ContainerAllocationProposal;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ResourceCommitRequest;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.SchedulerContainer;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.CandidateNodeSet;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.CandidateNodeSetUtils;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.FifoOrderingPolicyForPendingApps;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.IteratorSelector;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.OrderingPolicy;
import org.apache.hadoop.yarn.server.utils.Lock;
import org.apache.hadoop.yarn.server.utils.Lock.NoLock;
import org.apache.hadoop.yarn.util.SystemClock;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.thirdparty.com.google.common.collect.Sets;
@Private
@Unstable
public class LeafQueue extends AbstractCSQueue {
private static final Logger LOG =
LoggerFactory.getLogger(LeafQueue.class);
private float absoluteUsedCapacity = 0.0f;
protected int maxApplications;
protected volatile int maxApplicationsPerUser;
private float maxAMResourcePerQueuePercent;
private volatile int nodeLocalityDelay;
private volatile int rackLocalityAdditionalDelay;
private volatile boolean rackLocalityFullReset;
Map<ApplicationAttemptId, FiCaSchedulerApp> applicationAttemptMap =
new ConcurrentHashMap<>();
private Priority defaultAppPriorityPerQueue;
private final OrderingPolicy<FiCaSchedulerApp> pendingOrderingPolicy;
private volatile float minimumAllocationFactor;
private final RecordFactory recordFactory =
RecordFactoryProvider.getRecordFactory(null);
private CapacitySchedulerContext scheduler;
private final UsersManager usersManager;
// cache last cluster resource to compute actual capacity
private Resource lastClusterResource = Resources.none();
private final QueueResourceLimitsInfo queueResourceLimitsInfo =
new QueueResourceLimitsInfo();
private volatile ResourceLimits cachedResourceLimitsForHeadroom = null;
private volatile OrderingPolicy<FiCaSchedulerApp> orderingPolicy = null;
// Map<Partition, Map<SchedulingMode, Map<User, CachedUserLimit>>>
// Not thread safe: only the last level is a ConcurrentMap
@VisibleForTesting
Map<String, Map<SchedulingMode, ConcurrentMap<String, CachedUserLimit>>>
userLimitsCache = new HashMap<>();
// Not thread safe
@VisibleForTesting
long currentUserLimitCacheVersion = 0;
// Records all RMContainers that ignore partition exclusivity; these are
// used for preemption. The key is the partition the RMContainer is
// allocated on.
private Map<String, TreeSet<RMContainer>> ignorePartitionExclusivityRMContainers =
new ConcurrentHashMap<>();
List<AppPriorityACLGroup> priorityAcls =
new ArrayList<AppPriorityACLGroup>();
private final List<FiCaSchedulerApp> runnableApps = new ArrayList<>();
private final List<FiCaSchedulerApp> nonRunnableApps = new ArrayList<>();
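/**
 * Constructs a LeafQueue using the scheduler context's current
 * configuration.
 */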
@SuppressWarnings({ "unchecked", "rawtypes" })
public LeafQueue(CapacitySchedulerContext cs,
String queueName, CSQueue parent, CSQueue old) throws IOException {
this(cs, cs.getConfiguration(), queueName, parent, old);
}
public LeafQueue(CapacitySchedulerContext cs,
CapacitySchedulerConfiguration configuration,
String queueName, CSQueue parent, CSQueue old) throws
IOException {
super(cs, configuration, queueName, parent, old);
this.scheduler = cs;
this.usersManager = new UsersManager(metrics, this, labelManager, scheduler,
resourceCalculator);
// One-time initialization is enough since this is a static ordering policy
this.pendingOrderingPolicy = new FifoOrderingPolicyForPendingApps();
LOG.debug("LeafQueue: name={}, fullname={}", queueName, getQueuePath());
setupQueueConfigs(cs.getClusterResource(), configuration);
}
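/**
 * (Re)applies the queue's configuration: user limits, AM resource
 * percentages, application limits, priority ACLs, locality delays and the
 * ordering policy. Called from the constructor and on reinitialize.
 */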
@SuppressWarnings("checkstyle:nowhitespaceafter")
protected void setupQueueConfigs(Resource clusterResource,
CapacitySchedulerConfiguration conf) throws
IOException {
writeLock.lock();
try {
CapacitySchedulerConfiguration schedConf = csContext.getConfiguration();
super.setupQueueConfigs(clusterResource, conf);
this.lastClusterResource = clusterResource;
this.cachedResourceLimitsForHeadroom = new ResourceLimits(
clusterResource);
// Initialize headroom info, also used for calculating application
// master resource limits. Since this happens during queue initialization
// and all queues may not be realized yet, we'll use (optimistic)
// absoluteMaxCapacity (it will be replaced with the more accurate
// absoluteMaxAvailCapacity during headroom/userlimit/allocation events)
setQueueResourceLimitsInfo(clusterResource);
setOrderingPolicy(
conf.<FiCaSchedulerApp>getAppOrderingPolicy(getQueuePath()));
usersManager.setUserLimit(conf.getUserLimit(getQueuePath()));
usersManager.setUserLimitFactor(conf.getUserLimitFactor(getQueuePath()));
maxAMResourcePerQueuePercent =
conf.getMaximumApplicationMasterResourcePerQueuePercent(
getQueuePath());
maxApplications = conf.getMaximumApplicationsPerQueue(getQueuePath());
if (maxApplications < 0) {
int maxGlobalPerQueueApps =
csContext.getConfiguration().getGlobalMaximumApplicationsPerQueue();
if (maxGlobalPerQueueApps > 0) {
maxApplications = maxGlobalPerQueueApps;
}
}
priorityAcls = conf.getPriorityAcls(getQueuePath(),
scheduler.getMaxClusterLevelAppPriority());
if (!SchedulerUtils.checkQueueLabelExpression(this.accessibleLabels,
this.defaultLabelExpression, null)) {
throw new IOException(
"Invalid default label expression for queue=" + getQueuePath()
+ ": the queue doesn't have permission to access all labels "
+ "in the default label expression. labelExpression of resource request="
+ (this.defaultLabelExpression == null ?
"" :
this.defaultLabelExpression) + ". Queue labels=" + (
getAccessibleNodeLabels() == null ?
"" :
StringUtils
.join(getAccessibleNodeLabels().iterator(), ',')));
}
nodeLocalityDelay = schedConf.getNodeLocalityDelay();
rackLocalityAdditionalDelay = schedConf
.getRackLocalityAdditionalDelay();
rackLocalityFullReset = schedConf
.getRackLocalityFullReset();
// re-init this since max allocation could have changed
this.minimumAllocationFactor = Resources.ratio(resourceCalculator,
Resources.subtract(maximumAllocation, minimumAllocation),
maximumAllocation);
StringBuilder aclsString = new StringBuilder();
for (Map.Entry<AccessType, AccessControlList> e : acls.entrySet()) {
aclsString.append(e.getKey() + ":" + e.getValue().getAclString());
}
StringBuilder labelStrBuilder = new StringBuilder();
if (accessibleLabels != null) {
for (String s : accessibleLabels) {
labelStrBuilder.append(s)
.append(",");
}
}
defaultAppPriorityPerQueue = Priority.newInstance(
conf.getDefaultApplicationPriorityConfPerQueue(getQueuePath()));
// Validate the leaf queue's per-user weights.
int queueUL = Math.min(100, conf.getUserLimit(getQueuePath()));
for (Entry<String, Float> e : getUserWeights().entrySet()) {
float val = e.getValue().floatValue();
if (val < 0.0f || val > (100.0f / queueUL)) {
throw new IOException("Weight (" + val + ") for user \"" + e.getKey()
+ "\" must be between 0 and" + " 100 / " + queueUL + " (= " +
100.0f/queueUL + ", the number of concurrent active users in "
+ getQueuePath() + ")");
}
}
usersManager.updateUserWeights();
LOG.info(
"Initializing " + getQueuePath() + "\n" +
getExtendedCapacityOrWeightString() + "\n"
+ "absoluteCapacity = " + queueCapacities.getAbsoluteCapacity()
+ " [= parentAbsoluteCapacity * capacity ]" + "\n"
+ "maxCapacity = " + queueCapacities.getMaximumCapacity()
+ " [= configuredMaxCapacity ]" + "\n" + "absoluteMaxCapacity = "
+ queueCapacities.getAbsoluteMaximumCapacity()
+ " [= 1.0 maximumCapacity undefined, "
+ "(parentAbsoluteMaxCapacity * maximumCapacity) / 100 otherwise ]"
+ "\n" + "effectiveMinResource=" +
getEffectiveCapacity(CommonNodeLabelsManager.NO_LABEL) + "\n"
+ " , effectiveMaxResource=" +
getEffectiveMaxCapacity(CommonNodeLabelsManager.NO_LABEL)
+ "\n" + "userLimit = " + usersManager.getUserLimit()
+ " [= configuredUserLimit ]" + "\n" + "userLimitFactor = "
+ usersManager.getUserLimitFactor()
+ " [= configuredUserLimitFactor ]" + "\n" + "maxApplications = "
+ maxApplications
+ " [= configuredMaximumSystemApplicationsPerQueue or"
+ " (int)(configuredMaximumSystemApplications * absoluteCapacity)]"
+ "\n" + "maxApplicationsPerUser = " + maxApplicationsPerUser
+ " [= (int)(maxApplications * (userLimit / 100.0f) * "
+ "userLimitFactor) ]" + "\n"
+ "maxParallelApps = " + getMaxParallelApps() + "\n"
+ "usedCapacity = " +
+ queueCapacities.getUsedCapacity() + " [= usedResourcesMemory / "
+ "(clusterResourceMemory * absoluteCapacity)]" + "\n"
+ "absoluteUsedCapacity = " + absoluteUsedCapacity
+ " [= usedResourcesMemory / clusterResourceMemory]" + "\n"
+ "maxAMResourcePerQueuePercent = " + maxAMResourcePerQueuePercent
+ " [= configuredMaximumAMResourcePercent ]" + "\n"
+ "minimumAllocationFactor = " + minimumAllocationFactor
+ " [= (float)(maximumAllocationMemory - minimumAllocationMemory) / "
+ "maximumAllocationMemory ]" + "\n" + "maximumAllocation = "
+ maximumAllocation + " [= configuredMaxAllocation ]" + "\n"
+ "numContainers = " + numContainers
+ " [= currentNumContainers ]" + "\n" + "state = " + getState()
+ " [= configuredState ]" + "\n" + "acls = " + aclsString
+ " [= configuredAcls ]" + "\n"
+ "nodeLocalityDelay = " + nodeLocalityDelay + "\n"
+ "rackLocalityAdditionalDelay = "
+ rackLocalityAdditionalDelay + "\n"
+ "labels=" + labelStrBuilder.toString() + "\n"
+ "reservationsContinueLooking = "
+ reservationsContinueLooking + "\n" + "preemptionDisabled = "
+ getPreemptionDisabled() + "\n" + "defaultAppPriorityPerQueue = "
+ defaultAppPriorityPerQueue + "\npriority = " + priority
+ "\nmaxLifetime = " + getMaximumApplicationLifetime()
+ " seconds" + "\ndefaultLifetime = "
+ getDefaultApplicationLifetime() + " seconds");
} finally {
writeLock.unlock();
}
}
/**
* Used only by tests.
*/
@Private
public float getMinimumAllocationFactor() {
return minimumAllocationFactor;
}
/**
* Used only by tests.
*/
@Private
public float getMaxAMResourcePerQueuePercent() {
return maxAMResourcePerQueuePercent;
}
public int getMaxApplications() {
return maxApplications;
}
public int getMaxApplicationsPerUser() {
return maxApplicationsPerUser;
}
/**
 * @return the {@code UsersManager} instance for this queue.
 */
public UsersManager getUsersManager() {
return usersManager;
}
@Override
public AbstractUsersManager getAbstractUsersManager() {
return usersManager;
}
@Override
public List<CSQueue> getChildQueues() {
return null;
}
/**
* Set user limit - used only for testing.
* @param userLimit new user limit
*/
@VisibleForTesting
void setUserLimit(int userLimit) {
usersManager.setUserLimit(userLimit);
usersManager.userLimitNeedsRecompute();
}
/**
* Set user limit factor - used only for testing.
* @param userLimitFactor new user limit factor
*/
@VisibleForTesting
void setUserLimitFactor(float userLimitFactor) {
usersManager.setUserLimitFactor(userLimitFactor);
usersManager.userLimitNeedsRecompute();
}
@Override
public int getNumApplications() {
readLock.lock();
try {
return getNumPendingApplications() + getNumActiveApplications() +
getNumNonRunnableApps();
} finally {
readLock.unlock();
}
}
public int getNumPendingApplications() {
readLock.lock();
try {
return pendingOrderingPolicy.getNumSchedulableEntities();
} finally {
readLock.unlock();
}
}
public int getNumActiveApplications() {
readLock.lock();
try {
return orderingPolicy.getNumSchedulableEntities();
} finally {
readLock.unlock();
}
}
@Private
public int getNumPendingApplications(String user) {
readLock.lock();
try {
User u = getUser(user);
if (null == u) {
return 0;
}
return u.getPendingApplications();
} finally {
readLock.unlock();
}
}
@Private
public int getNumActiveApplications(String user) {
readLock.lock();
try {
User u = getUser(user);
if (null == u) {
return 0;
}
return u.getActiveApplications();
} finally {
readLock.unlock();
}
}
@Private
public int getUserLimit() {
return usersManager.getUserLimit();
}
@Private
public float getUserLimitFactor() {
return usersManager.getUserLimitFactor();
}
@Override
public QueueInfo getQueueInfo(
boolean includeChildQueues, boolean recursive) {
// Leaf queues have no children, so both flags are ignored.
return getQueueInfo();
}
@Override
public List<QueueUserACLInfo>
getQueueUserAclInfo(UserGroupInformation user) {
readLock.lock();
try {
QueueUserACLInfo userAclInfo = recordFactory.newRecordInstance(
QueueUserACLInfo.class);
List<QueueACL> operations = new ArrayList<>();
for (QueueACL operation : QueueACL.values()) {
if (hasAccess(operation, user)) {
operations.add(operation);
}
}
userAclInfo.setQueueName(getQueuePath());
userAclInfo.setUserAcls(operations);
return Collections.singletonList(userAclInfo);
} finally {
readLock.unlock();
}
}
@Override
public String toString() {
readLock.lock();
try {
return getQueuePath() + ": " + getCapacityOrWeightString()
+ ", " + "absoluteCapacity=" + queueCapacities.getAbsoluteCapacity()
+ ", " + "usedResources=" + queueUsage.getUsed() + ", "
+ "usedCapacity=" + getUsedCapacity() + ", " + "absoluteUsedCapacity="
+ getAbsoluteUsedCapacity() + ", " + "numApps=" + getNumApplications()
+ ", " + "numContainers=" + getNumContainers() + ", "
+ "effectiveMinResource=" +
getEffectiveCapacity(CommonNodeLabelsManager.NO_LABEL) +
" , effectiveMaxResource=" +
getEffectiveMaxCapacity(CommonNodeLabelsManager.NO_LABEL);
} finally {
readLock.unlock();
}
}
protected String getExtendedCapacityOrWeightString() {
if (queueCapacities.getWeight() != -1) {
return "weight = " + queueCapacities.getWeight()
+ " [= (float) configuredCapacity (with w suffix)] " + "\n"
+ "normalizedWeight = " + queueCapacities.getNormalizedWeight()
+ " [= (float) configuredCapacity / sum(configuredCapacity of " +
"all queues under the parent)]";
} else {
return "capacity = " + queueCapacities.getCapacity()
+ " [= (float) configuredCapacity / 100 ]";
}
}
@VisibleForTesting
public User getUser(String userName) {
return usersManager.getUser(userName);
}
@Private
public List<AppPriorityACLGroup> getPriorityACLs() {
readLock.lock();
try {
return new ArrayList<>(priorityAcls);
} finally {
readLock.unlock();
}
}
protected void reinitialize(
CSQueue newlyParsedQueue, Resource clusterResource,
CapacitySchedulerConfiguration configuration) throws
IOException {
writeLock.lock();
try {
// Reinitialize is normally skipped for dynamic queues. When this is
// called and the newly parsed queue is a different object from this
// queue, the queue is now defined in the configuration, so make it a
// static queue.
if (newlyParsedQueue != this) {
this.setDynamicQueue(false);
}
// Sanity check
if (!(newlyParsedQueue instanceof LeafQueue) || !newlyParsedQueue
.getQueuePath().equals(getQueuePath())) {
throw new IOException(
"Trying to reinitialize " + getQueuePath() + " from "
+ newlyParsedQueue.getQueuePath());
}
LeafQueue newlyParsedLeafQueue = (LeafQueue) newlyParsedQueue;
// Don't allow the maximum allocation to be decreased in size,
// since we have already told running AMs the size
Resource oldMax = getMaximumAllocation();
Resource newMax = newlyParsedLeafQueue.getMaximumAllocation();
if (!Resources.fitsIn(oldMax, newMax)) {
throw new IOException("Trying to reinitialize " + getQueuePath()
+ " the maximum allocation size can not be decreased!"
+ " Current setting: " + oldMax + ", trying to set it to: "
+ newMax);
}
setupQueueConfigs(clusterResource, configuration);
} finally {
writeLock.unlock();
}
}
@Override
public void reinitialize(
CSQueue newlyParsedQueue, Resource clusterResource)
throws IOException {
reinitialize(newlyParsedQueue, clusterResource,
csContext.getConfiguration());
}
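/**
 * Adds an application attempt to this queue's data structures and
 * propagates the submission to the parent queue; metrics are skipped for
 * moved applications.
 */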
@Override
public void submitApplicationAttempt(FiCaSchedulerApp application,
String userName) {
submitApplicationAttempt(application, userName, false);
}
@Override
public void submitApplicationAttempt(FiCaSchedulerApp application,
String userName, boolean isMoveApp) {
// Careful! Locking order is important!
writeLock.lock();
try {
// TODO: should use getUser; getUserAndAddIfAbsent is used here only to
// avoid a UT failure caused by a wrong invocation order. Fix the UT
// separately.
User user = usersManager.getUserAndAddIfAbsent(userName);
// Add the attempt to our data-structures
addApplicationAttempt(application, user);
} finally {
writeLock.unlock();
}
// We don't want to update metrics for move app
if (!isMoveApp) {
metrics.submitAppAttempt(userName);
}
getParent().submitApplicationAttempt(application, userName);
}
@Override
public void submitApplication(ApplicationId applicationId, String userName,
String queue) throws AccessControlException {
// Careful! Locking order is important!
validateSubmitApplication(applicationId, userName, queue);
// Inform the parent queue
try {
getParent().submitApplication(applicationId, userName, queue);
} catch (AccessControlException ace) {
LOG.info("Failed to submit application to parent-queue: " +
getParent().getQueuePath(), ace);
throw ace;
}
}
public void validateSubmitApplication(ApplicationId applicationId,
String userName, String queue) throws AccessControlException {
writeLock.lock();
try {
// Check if the queue is accepting jobs
if (getState() != QueueState.RUNNING) {
String msg = "Queue " + getQueuePath()
+ " is STOPPED. Cannot accept submission of application: "
+ applicationId;
LOG.info(msg);
throw new AccessControlException(msg);
}
// Check submission limits for queues
//TODO recalculate max applications because they can depend on capacity
if (getNumApplications() >= getMaxApplications() && !(this instanceof AutoCreatedLeafQueue)) {
String msg =
"Queue " + getQueuePath() + " already has " + getNumApplications()
+ " applications,"
+ " cannot accept submission of application: " + applicationId;
LOG.info(msg);
throw new AccessControlException(msg);
}
// Check submission limits for the user on this queue
User user = usersManager.getUserAndAddIfAbsent(userName);
//TODO recalculate max applications because they can depend on capacity
if (user.getTotalApplications() >= getMaxApplicationsPerUser() && !(this instanceof AutoCreatedLeafQueue)) {
String msg = "Queue " + getQueuePath() + " already has " + user
.getTotalApplications() + " applications from user " + userName
+ " cannot accept submission of application: " + applicationId;
LOG.info(msg);
throw new AccessControlException(msg);
}
} finally {
writeLock.unlock();
}
try {
getParent().validateSubmitApplication(applicationId, userName, queue);
} catch (AccessControlException ace) {
LOG.info("Failed to submit application to parent-queue: " +
getParent().getQueuePath(), ace);
throw ace;
}
}
public Resource getAMResourceLimit() {
return queueUsage.getAMLimit();
}
public Resource getAMResourceLimitPerPartition(String nodePartition) {
return queueUsage.getAMLimit(nodePartition);
}
@VisibleForTesting
public Resource calculateAndGetAMResourceLimit() {
return calculateAndGetAMResourceLimitPerPartition(
RMNodeLabelsManager.NO_LABEL);
}
@VisibleForTesting
public Resource getUserAMResourceLimit() {
return getUserAMResourceLimitPerPartition(RMNodeLabelsManager.NO_LABEL,
null);
}
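/**
 * Computes the AM resource limit for a single user on the given
 * partition, scaled by the user's weight and capped by the queue-level AM
 * limit for that partition.
 */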
public Resource getUserAMResourceLimitPerPartition(
String nodePartition, String userName) {
float userWeight = 1.0f;
if (userName != null && getUser(userName) != null) {
userWeight = getUser(userName).getWeight();
}
readLock.lock();
try {
/*
* The user AM resource limit is based on the same approach as the user
* limit (it should represent a subset of it). This means it uses the
* absolute queue capacity (per partition) instead of the max capacity,
* and is modified by the user limit and the user limit factor, just as
* the user limit is.
*/
float effectiveUserLimit = Math.max(usersManager.getUserLimit() / 100.0f,
1.0f / Math.max(getAbstractUsersManager().getNumActiveUsers(), 1));
float preWeightedUserLimit = effectiveUserLimit;
effectiveUserLimit = Math.min(effectiveUserLimit * userWeight, 1.0f);
Resource queuePartitionResource = getEffectiveCapacity(nodePartition);
Resource userAMLimit = Resources.multiplyAndNormalizeUp(
resourceCalculator, queuePartitionResource,
queueCapacities.getMaxAMResourcePercentage(nodePartition)
* effectiveUserLimit * usersManager.getUserLimitFactor(),
minimumAllocation);
if (getUserLimitFactor() == -1) {
userAMLimit = Resources.multiplyAndNormalizeUp(
resourceCalculator, queuePartitionResource,
queueCapacities.getMaxAMResourcePercentage(nodePartition),
minimumAllocation);
}
userAMLimit =
Resources.min(resourceCalculator, lastClusterResource,
userAMLimit,
Resources.clone(getAMResourceLimitPerPartition(nodePartition)));
Resource preWeighteduserAMLimit =
Resources.multiplyAndNormalizeUp(
resourceCalculator, queuePartitionResource,
queueCapacities.getMaxAMResourcePercentage(nodePartition)
* preWeightedUserLimit * usersManager.getUserLimitFactor(),
minimumAllocation);
if (getUserLimitFactor() == -1) {
preWeighteduserAMLimit = Resources.multiplyAndNormalizeUp(
resourceCalculator, queuePartitionResource,
queueCapacities.getMaxAMResourcePercentage(nodePartition),
minimumAllocation);
}
preWeighteduserAMLimit =
Resources.min(resourceCalculator, lastClusterResource,
preWeighteduserAMLimit,
Resources.clone(getAMResourceLimitPerPartition(nodePartition)));
queueUsage.setUserAMLimit(nodePartition, preWeighteduserAMLimit);
LOG.debug("Effective user AM limit for \"{}\":{}. Effective weighted"
+ " user AM limit: {}. User weight: {}", userName,
preWeighteduserAMLimit, userAMLimit, userWeight);
return userAMLimit;
} finally {
readLock.unlock();
}
}
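/**
 * Calculates the queue-level AM resource limit for the given partition,
 * records it in the queue usage and metrics, and returns it.
 */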
public Resource calculateAndGetAMResourceLimitPerPartition(
String nodePartition) {
writeLock.lock();
try {
/*
* For the non-labeled partition, take the max of the resources currently
* available to the queue and the absolute resources guaranteed for the
* partition in the queue. For a labeled partition, consider only the
* absolute resources guaranteed. Multiply this value (labeled or
* non-labeled) by the per-partition am-resource-percent to get the max
* AM resource limit for this queue and partition.
*/
Resource queuePartitionResource = getEffectiveCapacity(nodePartition);
Resource queueCurrentLimit = Resources.none();
// For non-labeled partition, we need to consider the current queue
// usage limit.
if (nodePartition.equals(RMNodeLabelsManager.NO_LABEL)) {
synchronized (queueResourceLimitsInfo){
queueCurrentLimit = queueResourceLimitsInfo.getQueueCurrentLimit();
}
}
float amResourcePercent = queueCapacities.getMaxAMResourcePercentage(
nodePartition);
// Current usable resource for this queue and partition is the max of
// queueCurrentLimit and queuePartitionResource.
Resource queuePartitionUsableResource = Resources.max(resourceCalculator,
lastClusterResource, queueCurrentLimit, queuePartitionResource);
Resource amResouceLimit = Resources.multiplyAndNormalizeUp(
resourceCalculator, queuePartitionUsableResource, amResourcePercent,
minimumAllocation);
metrics.setAMResouceLimit(nodePartition, amResouceLimit);
queueUsage.setAMLimit(nodePartition, amResouceLimit);
LOG.debug("Queue: {}, node label : {}, queue partition resource : {},"
+ " queue current limit : {}, queue partition usable resource : {},"
+ " amResourceLimit : {}", getQueuePath(), nodePartition,
queuePartitionResource, queueCurrentLimit,
queuePartitionUsableResource, amResouceLimit);
return amResouceLimit;
} finally {
writeLock.unlock();
}
}
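/**
 * Moves pending applications to the active (schedulable) state as long as
 * both the queue-level and the per-user AM resource limits permit.
 */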
protected void activateApplications() {
writeLock.lock();
try {
// limit of allowed resource usage for application masters
Map<String, Resource> userAmPartitionLimit =
new HashMap<String, Resource>();
// AM Resource Limit for accessible labels can be pre-calculated.
// This will help in updating AMResourceLimit for all labels when queue
// is initialized for the first time (when no applications are present).
for (String nodePartition : getNodeLabelsForQueue()) {
calculateAndGetAMResourceLimitPerPartition(nodePartition);
}
for (Iterator<FiCaSchedulerApp> fsApp =
getPendingAppsOrderingPolicy()
.getAssignmentIterator(IteratorSelector.EMPTY_ITERATOR_SELECTOR);
fsApp.hasNext(); ) {
FiCaSchedulerApp application = fsApp.next();
ApplicationId applicationId = application.getApplicationId();
// Get the am-node-partition associated with each application
// and calculate max-am resource limit for this partition.
String partitionName = application.getAppAMNodePartitionName();
Resource amLimit = getAMResourceLimitPerPartition(partitionName);
// Verify whether we already calculated am-limit for this label.
if (amLimit == null) {
amLimit = calculateAndGetAMResourceLimitPerPartition(partitionName);
}
// Check am resource limit.
Resource amIfStarted = Resources.add(
application.getAMResource(partitionName),
queueUsage.getAMUsed(partitionName));
if (LOG.isDebugEnabled()) {
LOG.debug("application " + application.getId() + " AMResource "
+ application.getAMResource(partitionName)
+ " maxAMResourcePerQueuePercent " + maxAMResourcePerQueuePercent
+ " amLimit " + amLimit + " lastClusterResource "
+ lastClusterResource + " amIfStarted " + amIfStarted
+ " AM node-partition name " + partitionName);
}
if (!resourceCalculator.fitsIn(amIfStarted, amLimit)) {
if (getNumActiveApplications() < 1 || (Resources.lessThanOrEqual(
resourceCalculator, lastClusterResource,
queueUsage.getAMUsed(partitionName), Resources.none()))) {
LOG.warn("maximum-am-resource-percent is insufficient to start a"
+ " single application in queue, it is likely set too low."
+ " skipping enforcement to allow at least one application"
+ " to start");
} else{
application.updateAMContainerDiagnostics(AMState.INACTIVATED,
CSAMContainerLaunchDiagnosticsConstants.QUEUE_AM_RESOURCE_LIMIT_EXCEED);
LOG.debug("Not activating application {} as amIfStarted: {}"
+ " exceeds amLimit: {}", applicationId, amIfStarted, amLimit);
continue;
}
}
// Check user am resource limit
User user = getUser(application.getUser());
Resource userAMLimit = userAmPartitionLimit.get(partitionName);
// Verify whether we already calculated user-am-limit for this label.
if (userAMLimit == null) {
userAMLimit = getUserAMResourceLimitPerPartition(partitionName,
application.getUser());
userAmPartitionLimit.put(partitionName, userAMLimit);
}
Resource userAmIfStarted = Resources.add(
application.getAMResource(partitionName),
user.getConsumedAMResources(partitionName));
if (!resourceCalculator.fitsIn(userAmIfStarted, userAMLimit)) {
if (getNumActiveApplications() < 1 || (Resources.lessThanOrEqual(
resourceCalculator, lastClusterResource,
queueUsage.getAMUsed(partitionName), Resources.none()))) {
LOG.warn("maximum-am-resource-percent is insufficient to start a"
+ " single application in queue for user, it is likely set too"
+ " low. skipping enforcement to allow at least one application"
+ " to start");
} else{
application.updateAMContainerDiagnostics(AMState.INACTIVATED,
CSAMContainerLaunchDiagnosticsConstants.USER_AM_RESOURCE_LIMIT_EXCEED);
LOG.debug("Not activating application {} for user: {} as"
+ " userAmIfStarted: {} exceeds userAmLimit: {}",
applicationId, user, userAmIfStarted, userAMLimit);
continue;
}
}
user.activateApplication();
orderingPolicy.addSchedulableEntity(application);
application.updateAMContainerDiagnostics(AMState.ACTIVATED, null);
queueUsage.incAMUsed(partitionName,
application.getAMResource(partitionName));
user.getResourceUsage().incAMUsed(partitionName,
application.getAMResource(partitionName));
user.getResourceUsage().setAMLimit(partitionName, userAMLimit);
metrics.incAMUsed(partitionName, application.getUser(),
application.getAMResource(partitionName));
metrics.setAMResouceLimitForUser(partitionName,
application.getUser(), userAMLimit);
fsApp.remove();
LOG.info("Application " + applicationId + " from user: " + application
.getUser() + " activated in queue: " + getQueuePath());
}
} finally {
writeLock.unlock();
}
}
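/**
 * Registers an application attempt with this queue and, if it is runnable
 * and the cluster has resources, attempts to activate pending
 * applications.
 */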
private void addApplicationAttempt(FiCaSchedulerApp application,
User user) {
writeLock.lock();
try {
applicationAttemptMap.put(application.getApplicationAttemptId(),
application);
if (application.isRunnable()) {
runnableApps.add(application);
LOG.debug("Adding runnable application: {}",
application.getApplicationAttemptId());
} else {
nonRunnableApps.add(application);
LOG.info("Application attempt {} is not runnable,"
+ " parallel limit reached", application.getApplicationAttemptId());
return;
}
// Accept
user.submitApplication();
getPendingAppsOrderingPolicy().addSchedulableEntity(application);
// Activate applications
if (Resources.greaterThan(resourceCalculator, lastClusterResource,
lastClusterResource, Resources.none())) {
activateApplications();
} else {
application.updateAMContainerDiagnostics(AMState.INACTIVATED,
CSAMContainerLaunchDiagnosticsConstants.CLUSTER_RESOURCE_EMPTY);
LOG.info("Skipping activateApplications for "
+ application.getApplicationAttemptId()
+ " since cluster resource is " + Resources.none());
}
LOG.info(
"Application added -" + " appId: " + application.getApplicationId()
+ " user: " + application.getUser() + "," + " leaf-queue: "
+ getQueuePath() + " #user-pending-applications: " + user
.getPendingApplications() + " #user-active-applications: " + user
.getActiveApplications() + " #queue-pending-applications: "
+ getNumPendingApplications() + " #queue-active-applications: "
+ getNumActiveApplications()
+ " #queue-nonrunnable-applications: "
+ getNumNonRunnableApps());
} finally {
writeLock.unlock();
}
}
@Override
public void finishApplication(ApplicationId application, String user) {
// Inform the activeUsersManager
usersManager.deactivateApplication(user, application);
appFinished();
// Inform the parent queue
getParent().finishApplication(application, user);
}
@Override
public void finishApplicationAttempt(FiCaSchedulerApp application, String queue) {
// Careful! Locking order is important!
removeApplicationAttempt(application, application.getUser());
getParent().finishApplicationAttempt(application, queue);
}
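/**
 * Removes an application attempt from this queue, releases its AM
 * resource accounting if it was active, and re-runs activation for the
 * remaining pending applications.
 */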
private void removeApplicationAttempt(
FiCaSchedulerApp application, String userName) {
writeLock.lock();
try {
// TODO: should use getUser; getUserAndAddIfAbsent is used here only to
// avoid a UT failure caused by a wrong invocation order. Fix the UT
// separately.
User user = usersManager.getUserAndAddIfAbsent(userName);
boolean runnable = runnableApps.remove(application);
if (!runnable) {
// removeNonRunnableApp acquires the write lock again, which is fine
if (!removeNonRunnableApp(application)) {
LOG.error("Given app to remove " + application +
" does not exist in queue " + getQueuePath());
}
}
String partitionName = application.getAppAMNodePartitionName();
boolean wasActive = orderingPolicy.removeSchedulableEntity(application);
if (!wasActive) {
pendingOrderingPolicy.removeSchedulableEntity(application);
} else{
queueUsage.decAMUsed(partitionName,
application.getAMResource(partitionName));
user.getResourceUsage().decAMUsed(partitionName,
application.getAMResource(partitionName));
metrics.decAMUsed(partitionName, application.getUser(),
application.getAMResource(partitionName));
}
applicationAttemptMap.remove(application.getApplicationAttemptId());
user.finishApplication(wasActive);
if (user.getTotalApplications() == 0) {
usersManager.removeUser(application.getUser());
}
// Check if we can activate more applications
activateApplications();
LOG.info(
"Application removed -" + " appId: " + application.getApplicationId()
+ " user: " + application.getUser() + " queue: " + getQueuePath()
+ " #user-pending-applications: " + user.getPendingApplications()
+ " #user-active-applications: " + user.getActiveApplications()
+ " #queue-pending-applications: " + getNumPendingApplications()
+ " #queue-active-applications: " + getNumActiveApplications());
} finally {
writeLock.unlock();
}
}
private FiCaSchedulerApp getApplication(
ApplicationAttemptId applicationAttemptId) {
return applicationAttemptMap.get(applicationAttemptId);
}
private void setPreemptionAllowed(ResourceLimits limits, String nodePartition) {
// Set preemption-allowed:
// For a leaf queue, only an under-utilized queue is allowed to preempt
// resources from other queues.
if (!queueResourceQuotas.getEffectiveMinResource(nodePartition)
.equals(Resources.none())) {
limits.setIsAllowPreemption(Resources.lessThan(resourceCalculator,
csContext.getClusterResource(), queueUsage.getUsed(nodePartition),
queueResourceQuotas.getEffectiveMinResource(nodePartition)));
return;
}
float usedCapacity = queueCapacities.getAbsoluteUsedCapacity(nodePartition);
float guaranteedCapacity = queueCapacities.getAbsoluteCapacity(nodePartition);
limits.setIsAllowPreemption(usedCapacity < guaranteedCapacity);
}
private CSAssignment allocateFromReservedContainer(Resource clusterResource,
CandidateNodeSet<FiCaSchedulerNode> candidates,
ResourceLimits currentResourceLimits, SchedulingMode schedulingMode) {
// Irrespective of single- or multi-node placement, allocating from a
// reserved container must happen only for the single node that
// CapacityScheduler#allocateFromReservedContainer was invoked with.
// Otherwise, in multi-node placement, no new containers would be
// allocated or reserved while a RESERVED container sits on a node
// that is full.
FiCaSchedulerNode node = CandidateNodeSetUtils.getSingleNode(candidates);
if (node != null) {
RMContainer reservedContainer = node.getReservedContainer();
if (reservedContainer != null) {
FiCaSchedulerApp application = getApplication(
reservedContainer.getApplicationAttemptId());
if (null != application) {
ActivitiesLogger.APP.startAppAllocationRecording(activitiesManager,
node, SystemClock.getInstance().getTime(), application);
CSAssignment assignment = application.assignContainers(
clusterResource, candidates, currentResourceLimits,
schedulingMode, reservedContainer);
return assignment;
}
}
}
return null;
}
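/**
 * Returns the per-user limit cache for the given partition and scheduling
 * mode, invalidating the whole cache first if the UsersManager state
 * version has advanced.
 */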
private ConcurrentMap<String, CachedUserLimit> getUserLimitCache(
String partition,
SchedulingMode schedulingMode) {
synchronized (userLimitsCache) {
long latestVersion = usersManager.getLatestVersionOfUsersState();
if (latestVersion != this.currentUserLimitCacheVersion) {
// User limits cache needs invalidating
this.currentUserLimitCacheVersion = latestVersion;
userLimitsCache.clear();
Map<SchedulingMode, ConcurrentMap<String, CachedUserLimit>>
uLCByPartition = new HashMap<>();
userLimitsCache.put(partition, uLCByPartition);
ConcurrentMap<String, CachedUserLimit> uLCBySchedulingMode =
new ConcurrentHashMap<>();
uLCByPartition.put(schedulingMode, uLCBySchedulingMode);
return uLCBySchedulingMode;
}
// User limits cache does not need invalidating
Map<SchedulingMode, ConcurrentMap<String, CachedUserLimit>>
uLCByPartition = userLimitsCache.get(partition);
if (uLCByPartition == null) {
uLCByPartition = new HashMap<>();
userLimitsCache.put(partition, uLCByPartition);
}
ConcurrentMap<String, CachedUserLimit> uLCBySchedulingMode =
uLCByPartition.get(schedulingMode);
if (uLCBySchedulingMode == null) {
uLCBySchedulingMode = new ConcurrentHashMap<>();
uLCByPartition.put(schedulingMode, uLCBySchedulingMode);
}
return uLCBySchedulingMode;
}
}
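/**
 * Main allocation entry point for the leaf queue: tries reserved
 * containers first, then iterates over active applications in
 * ordering-policy order, enforcing queue max-capacity and user limits
 * before delegating to each application's assignContainers.
 */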
@Override
public CSAssignment assignContainers(Resource clusterResource,
CandidateNodeSet<FiCaSchedulerNode> candidates,
ResourceLimits currentResourceLimits, SchedulingMode schedulingMode) {
updateCurrentResourceLimits(currentResourceLimits, clusterResource);
FiCaSchedulerNode node = CandidateNodeSetUtils.getSingleNode(candidates);
if (LOG.isDebugEnabled()) {
LOG.debug("assignContainers: partition=" + candidates.getPartition()
+ " #applications=" + orderingPolicy.getNumSchedulableEntities());
}
setPreemptionAllowed(currentResourceLimits, candidates.getPartition());
// Check for reserved resources, try to allocate reserved container first.
CSAssignment assignment = allocateFromReservedContainer(clusterResource,
candidates, currentResourceLimits, schedulingMode);
if (null != assignment) {
return assignment;
}
// if our queue cannot access this node, just return
if (schedulingMode == SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY
&& !accessibleToPartition(candidates.getPartition())) {
ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node,
getParent().getQueuePath(), getQueuePath(), ActivityState.REJECTED,
ActivityDiagnosticConstant.QUEUE_NOT_ABLE_TO_ACCESS_PARTITION);
return CSAssignment.NULL_ASSIGNMENT;
}
// Check if this queue needs more resources; simply skip allocation if
// it doesn't.
if (!hasPendingResourceRequest(candidates.getPartition(), clusterResource,
schedulingMode)) {
if (LOG.isDebugEnabled()) {
LOG.debug("Skip this queue=" + getQueuePath()
+ ", because it doesn't need more resource, schedulingMode="
+ schedulingMode.name() + " node-partition=" + candidates
.getPartition());
}
ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node,
getParent().getQueuePath(), getQueuePath(), ActivityState.SKIPPED,
ActivityDiagnosticConstant.QUEUE_DO_NOT_NEED_MORE_RESOURCE);
return CSAssignment.NULL_ASSIGNMENT;
}
ConcurrentMap<String, CachedUserLimit> userLimits =
this.getUserLimitCache(candidates.getPartition(), schedulingMode);
boolean needAssignToQueueCheck = true;
IteratorSelector sel = new IteratorSelector();
sel.setPartition(candidates.getPartition());
for (Iterator<FiCaSchedulerApp> assignmentIterator =
orderingPolicy.getAssignmentIterator(sel);
assignmentIterator.hasNext(); ) {
FiCaSchedulerApp application = assignmentIterator.next();
ActivitiesLogger.APP.startAppAllocationRecording(activitiesManager,
node, SystemClock.getInstance().getTime(), application);
// Check queue max-capacity limit
Resource appReserved = application.getCurrentReservation();
if (needAssignToQueueCheck) {
if (!super.canAssignToThisQueue(clusterResource,
candidates.getPartition(), currentResourceLimits, appReserved,
schedulingMode)) {
ActivitiesLogger.APP.recordRejectedAppActivityFromLeafQueue(
activitiesManager, node, application, application.getPriority(),
ActivityDiagnosticConstant.QUEUE_HIT_MAX_CAPACITY_LIMIT);
ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node,
getParent().getQueuePath(), getQueuePath(),
ActivityState.REJECTED,
ActivityDiagnosticConstant.QUEUE_HIT_MAX_CAPACITY_LIMIT);
return CSAssignment.NULL_ASSIGNMENT;
}
// If there was no reservation and canAssignToThisQueue returned
// true, there is no reason to check further.
if (!this.reservationsContinueLooking
|| appReserved.equals(Resources.none())) {
needAssignToQueueCheck = false;
}
}
CachedUserLimit cul = userLimits.get(application.getUser());
Resource cachedUserLimit = null;
if (cul != null) {
cachedUserLimit = cul.userLimit;
}
Resource userLimit = computeUserLimitAndSetHeadroom(application,
clusterResource, candidates.getPartition(), schedulingMode,
cachedUserLimit);
if (cul == null) {
cul = new CachedUserLimit(userLimit);
CachedUserLimit retVal =
userLimits.putIfAbsent(application.getUser(), cul);
if (retVal != null) {
// another thread updated the user limit cache before us
cul = retVal;
userLimit = cul.userLimit;
}
}
// Check user limit
boolean userAssignable = true;
if (!cul.canAssign && Resources.fitsIn(appReserved, cul.reservation)) {
userAssignable = false;
} else {
userAssignable = canAssignToUser(clusterResource, application.getUser(),
userLimit, application, candidates.getPartition(),
currentResourceLimits);
if (!userAssignable && Resources.fitsIn(cul.reservation, appReserved)) {
cul.canAssign = false;
cul.reservation = appReserved;
}
}
if (!userAssignable) {
application.updateAMContainerDiagnostics(AMState.ACTIVATED,
"User capacity has reached its maximum limit.");
ActivitiesLogger.APP.recordRejectedAppActivityFromLeafQueue(
activitiesManager, node, application, application.getPriority(),
ActivityDiagnosticConstant.QUEUE_HIT_USER_MAX_CAPACITY_LIMIT);
continue;
}
// Try to schedule
assignment = application.assignContainers(clusterResource,
candidates, currentResourceLimits, schedulingMode, null);
if (LOG.isDebugEnabled()) {
LOG.debug("post-assignContainers for application " + application
.getApplicationId());
application.showRequests();
}
// Did we schedule or reserve a container?
Resource assigned = assignment.getResource();
if (Resources.greaterThan(resourceCalculator, clusterResource, assigned,
Resources.none())) {
ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node,
getParent().getQueuePath(), getQueuePath(),
ActivityState.ACCEPTED, ActivityDiagnosticConstant.EMPTY);
return assignment;
} else if (assignment.getSkippedType()
== CSAssignment.SkippedType.OTHER) {
ActivitiesLogger.APP.finishSkippedAppAllocationRecording(
activitiesManager, application.getApplicationId(),
ActivityState.SKIPPED, ActivityDiagnosticConstant.EMPTY);
application.updateNodeInfoForAMDiagnostics(node);
} else if (assignment.getSkippedType()
== CSAssignment.SkippedType.QUEUE_LIMIT) {
ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node,
getParent().getQueuePath(), getQueuePath(), ActivityState.REJECTED,
() -> ActivityDiagnosticConstant.QUEUE_DO_NOT_HAVE_ENOUGH_HEADROOM
+ " from " + application.getApplicationId());
return assignment;
} else{
// If we didn't allocate anything and the assignment was not skipped by
// the application, return to respect the FIFO order of applications
ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node,
getParent().getQueuePath(), getQueuePath(), ActivityState.SKIPPED,
ActivityDiagnosticConstant.QUEUE_SKIPPED_TO_RESPECT_FIFO);
ActivitiesLogger.APP.finishSkippedAppAllocationRecording(
activitiesManager, application.getApplicationId(),
ActivityState.SKIPPED, ActivityDiagnosticConstant.EMPTY);
return CSAssignment.NULL_ASSIGNMENT;
}
}
ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node,
getParent().getQueuePath(), getQueuePath(), ActivityState.SKIPPED,
ActivityDiagnosticConstant.EMPTY);
return CSAssignment.NULL_ASSIGNMENT;
}
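/**
 * Validates a proposed allocation/reservation against the user limit
 * (unless it allocates from a reserved container) before delegating to
 * the parent class.
 */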
@Override
public boolean accept(Resource cluster,
ResourceCommitRequest<FiCaSchedulerApp, FiCaSchedulerNode> request) {
ContainerAllocationProposal<FiCaSchedulerApp, FiCaSchedulerNode> allocation =
request.getFirstAllocatedOrReservedContainer();
SchedulerContainer<FiCaSchedulerApp, FiCaSchedulerNode> schedulerContainer =
allocation.getAllocatedOrReservedContainer();
// Do not check limits when allocating from a reserved container
if (allocation.getAllocateFromReservedContainer() == null) {
readLock.lock();
try {
FiCaSchedulerApp app =
schedulerContainer.getSchedulerApplicationAttempt();
String username = app.getUser();
String p = schedulerContainer.getNodePartition();
// check user-limit
Resource userLimit = computeUserLimitAndSetHeadroom(app, cluster, p,
allocation.getSchedulingMode(), null);
// Deduct resources that we can release
User user = getUser(username);
if (user == null) {
LOG.debug("User {} has been removed!", username);
return false;
}
Resource usedResource = Resources.clone(user.getUsed(p));
Resources.subtractFrom(usedResource,
request.getTotalReleasedResource());
if (Resources.greaterThan(resourceCalculator, cluster, usedResource,
userLimit)) {
LOG.debug("Used resource={} exceeded user-limit={}",
usedResource, userLimit);
return false;
}
} finally {
readLock.unlock();
}
}
return super.accept(cluster, request);
}
private void internalReleaseContainer(Resource clusterResource,
SchedulerContainer<FiCaSchedulerApp, FiCaSchedulerNode> schedulerContainer) {
RMContainer rmContainer = schedulerContainer.getRmContainer();
LeafQueue targetLeafQueue =
schedulerContainer.getSchedulerApplicationAttempt().getCSLeafQueue();
if (targetLeafQueue == this) {
// When trying to preempt containers from the same queue
if (rmContainer.getState() == RMContainerState.RESERVED) {
// For other reserved containers
// This is a reservation exchange, complete previous reserved container
completedContainer(clusterResource,
schedulerContainer.getSchedulerApplicationAttempt(),
schedulerContainer.getSchedulerNode(), rmContainer, SchedulerUtils
.createAbnormalContainerStatus(rmContainer.getContainerId(),
SchedulerUtils.UNRESERVED_CONTAINER),
RMContainerEventType.RELEASED, null, false);
}
} else{
// When trying to preempt containers from different queue -- this
// is for lazy preemption feature (kill preemption candidate in scheduling
// cycle).
targetLeafQueue.completedContainer(clusterResource,
schedulerContainer.getSchedulerApplicationAttempt(),
schedulerContainer.getSchedulerNode(),
schedulerContainer.getRmContainer(), SchedulerUtils
.createPreemptedContainerStatus(rmContainer.getContainerId(),
SchedulerUtils.PREEMPTED_CONTAINER),
RMContainerEventType.KILL, null, false);
}
}
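// Releases all to-release containers in the request, including those
// attached to individual allocation proposals.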
private void releaseContainers(Resource clusterResource,
ResourceCommitRequest<FiCaSchedulerApp, FiCaSchedulerNode> request) {
for (SchedulerContainer<FiCaSchedulerApp, FiCaSchedulerNode> c : request
.getContainersToRelease()) {
internalReleaseContainer(clusterResource, c);
}
// Handle the reservations-continue-looking and lazy-preemption cases:
if (null != request.getContainersToAllocate() && !request
.getContainersToAllocate().isEmpty()) {
for (ContainerAllocationProposal<FiCaSchedulerApp, FiCaSchedulerNode> context : request
.getContainersToAllocate()) {
if (null != context.getToRelease()) {
for (SchedulerContainer<FiCaSchedulerApp, FiCaSchedulerNode> c : context
.getToRelease()) {
internalReleaseContainer(clusterResource, c);
}
}
}
}
}
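/**
 * Commits an accepted allocation/reservation: releases displaced
 * containers, updates queue book-keeping, and propagates the apply to the
 * parent queue for new allocations or reservations.
 */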
public void apply(Resource cluster,
ResourceCommitRequest<FiCaSchedulerApp, FiCaSchedulerNode> request) {
// Do we need to call parent queue's apply?
boolean applyToParentQueue = false;
releaseContainers(cluster, request);
writeLock.lock();
try {
if (request.anythingAllocatedOrReserved()) {
ContainerAllocationProposal<FiCaSchedulerApp, FiCaSchedulerNode>
allocation = request.getFirstAllocatedOrReservedContainer();
SchedulerContainer<FiCaSchedulerApp, FiCaSchedulerNode>
schedulerContainer = allocation.getAllocatedOrReservedContainer();
// Do not modify the queue when allocating from a reserved container
if (allocation.getAllocateFromReservedContainer() == null) {
// Only invoke apply() of ParentQueue when new allocation /
// reservation happen.
applyToParentQueue = true;
// Book-keeping
// Note: Update headroom to account for current allocation too...
allocateResource(cluster,
schedulerContainer.getSchedulerApplicationAttempt(),
allocation.getAllocatedOrReservedResource(),
schedulerContainer.getNodePartition(),
schedulerContainer.getRmContainer());
orderingPolicy.containerAllocated(
schedulerContainer.getSchedulerApplicationAttempt(),
schedulerContainer.getRmContainer());
}
// Update reserved resource
if (Resources.greaterThan(resourceCalculator, cluster,
request.getTotalReservedResource(), Resources.none())) {
incReservedResource(schedulerContainer.getNodePartition(),
request.getTotalReservedResource());
}
}
} finally {
writeLock.unlock();
}
if (parent != null && applyToParentQueue) {
parent.apply(cluster, request);
}
}
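// Headroom helpers: compute the remaining resources a user's application
// may still request, bounded by the user limit, the queue limit and the
// partition's free resources.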
protected Resource getHeadroom(User user, Resource queueCurrentLimit,
Resource clusterResource, FiCaSchedulerApp application) {
return getHeadroom(user, queueCurrentLimit, clusterResource, application,
RMNodeLabelsManager.NO_LABEL);
}
protected Resource getHeadroom(User user, Resource queueCurrentLimit,
Resource clusterResource, FiCaSchedulerApp application,
String partition) {
return getHeadroom(user, queueCurrentLimit, clusterResource,
getResourceLimitForActiveUsers(application.getUser(), clusterResource,
partition, SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY),
partition);
}
private Resource getHeadroom(User user,
Resource currentPartitionResourceLimit, Resource clusterResource,
Resource userLimitResource, String partition) {
/**
* Headroom is:
* min(
* min(userLimit, queueMaxCap) - userConsumed,
* queueMaxCap - queueUsedResources
* )
*
* ( which can be expressed as,
* min (userLimit - userConsumed, queueMaxCap - userConsumed,
* queueMaxCap - queueUsedResources)
* )
*
* given that queueUsedResources >= userConsumed, this simplifies to
*
* >> min (userLimit - userConsumed, queueMaxCap - queueUsedResources) <<
*
* The sum of the max capacities of multiple queues can be greater than
* the actual capacity of a given partition, hence we need to ensure that
* the headroom is not greater than the available resources of that
* partition:
*
* headroom = min (unused resource limit of the label, calculated headroom)
*/
currentPartitionResourceLimit =
partition.equals(RMNodeLabelsManager.NO_LABEL)
? currentPartitionResourceLimit
: getQueueMaxResource(partition);
Resource headroom = Resources.componentwiseMin(
Resources.subtractNonNegative(userLimitResource,
user.getUsed(partition)),
Resources.subtractNonNegative(currentPartitionResourceLimit,
queueUsage.getUsed(partition)));
// Normalize it before return
headroom =
Resources.roundDown(resourceCalculator, headroom, minimumAllocation);
//headroom = min (unused resourcelimit of a label, calculated headroom )
Resource clusterPartitionResource =
labelManager.getResourceByLabel(partition, clusterResource);
Resource clusterFreePartitionResource =
Resources.subtract(clusterPartitionResource,
csContext.getClusterResourceUsage().getUsed(partition));
headroom = Resources.min(resourceCalculator, clusterPartitionResource,
clusterFreePartitionResource, headroom);
return headroom;
}
private void setQueueResourceLimitsInfo(
Resource clusterResource) {
synchronized (queueResourceLimitsInfo) {
queueResourceLimitsInfo.setQueueCurrentLimit(cachedResourceLimitsForHeadroom
.getLimit());
queueResourceLimitsInfo.setClusterResource(clusterResource);
}
}
// It is not necessary to hold the application's lock here.
@Lock({LeafQueue.class})
Resource computeUserLimitAndSetHeadroom(FiCaSchedulerApp application,
Resource clusterResource, String nodePartition,
SchedulingMode schedulingMode, Resource userLimit) {
String user = application.getUser();
User queueUser = getUser(user);
if (queueUser == null) {
LOG.debug("User {} has been removed!", user);
return Resources.none();
}
// Compute user limit respect requested labels,
// TODO, need consider headroom respect labels also
if (userLimit == null) {
userLimit = getResourceLimitForActiveUsers(application.getUser(),
clusterResource, nodePartition, schedulingMode);
}
setQueueResourceLimitsInfo(clusterResource);
Resource headroom =
metrics.getUserMetrics(user) == null ? Resources.none() :
getHeadroom(queueUser, cachedResourceLimitsForHeadroom.getLimit(),
clusterResource, userLimit, nodePartition);
if (LOG.isDebugEnabled()) {
LOG.debug("Headroom calculation for user " + user + ": " + " userLimit="
+ userLimit + " queueMaxAvailRes="
+ cachedResourceLimitsForHeadroom.getLimit() + " consumed="
+ queueUser.getUsed() + " partition="
+ nodePartition);
}
CapacityHeadroomProvider headroomProvider = new CapacityHeadroomProvider(
queueUser, this, application, queueResourceLimitsInfo);
application.setHeadroomProvider(headroomProvider);
metrics.setAvailableResourcesToUser(nodePartition, user, headroom);
return userLimit;
}
@Lock(NoLock.class)
public int getNodeLocalityDelay() {
return nodeLocalityDelay;
}
@Lock(NoLock.class)
public int getRackLocalityAdditionalDelay() {
return rackLocalityAdditionalDelay;
}
@Lock(NoLock.class)
public boolean getRackLocalityFullReset() {
return rackLocalityFullReset;
}
/**
* @param userName
* name of a user who has submitted one or more apps to this queue
* @param clusterResource
* total cluster resource
* @param nodePartition
* partition name
* @param schedulingMode
* scheduling mode:
* RESPECT_PARTITION_EXCLUSIVITY / IGNORE_PARTITION_EXCLUSIVITY
* @return the computed user limit for active users
*/
public Resource getResourceLimitForActiveUsers(String userName,
Resource clusterResource, String nodePartition,
SchedulingMode schedulingMode) {
return usersManager.getComputedResourceLimitForActiveUsers(userName,
clusterResource, nodePartition, schedulingMode);
}
/**
* @param userName
* name of a user who has submitted one or more apps to this queue
* @param clusterResource
* total cluster resource
* @param nodePartition
* partition name
* @param schedulingMode
* scheduling mode:
* RESPECT_PARTITION_EXCLUSIVITY / IGNORE_PARTITION_EXCLUSIVITY
* @return the computed user limit for all users
*/
public Resource getResourceLimitForAllUsers(String userName,
Resource clusterResource, String nodePartition,
SchedulingMode schedulingMode) {
return usersManager.getComputedResourceLimitForAllUsers(userName,
clusterResource, nodePartition, schedulingMode);
}
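/**
 * Checks whether the user stays within the given limit; with
 * reservations-continue-looking enabled, an over-limit user may still
 * assign if unreserving existing containers would bring usage back under
 * the limit.
 */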
@Private
protected boolean canAssignToUser(Resource clusterResource,
String userName, Resource limit, FiCaSchedulerApp application,
String nodePartition, ResourceLimits currentResourceLimits) {
readLock.lock();
try {
User user = getUser(userName);
if (user == null) {
LOG.debug("User {} has been removed!", userName);
return false;
}
currentResourceLimits.setAmountNeededUnreserve(Resources.none());
// Note: We aren't considering the current request since there is a fixed
// overhead of the AM, but it's a > check, not a >= check, so...
if (Resources.greaterThan(resourceCalculator, clusterResource,
user.getUsed(nodePartition), limit)) {
// if enabled, check to see if could we potentially use this node instead
// of a reserved node if the application has reserved containers
if (this.reservationsContinueLooking) {
if (Resources.lessThanOrEqual(resourceCalculator, clusterResource,
Resources.subtract(user.getUsed(),
application.getCurrentReservation()), limit)) {
          if (LOG.isDebugEnabled()) {
            LOG.debug("User " + userName + " in queue " + getQueuePath()
                + " will exceed limit based on reservations - consumed: "
                + user.getUsed() + " reserved: "
                + application.getCurrentReservation() + " limit: " + limit);
          }
Resource amountNeededToUnreserve = Resources.subtract(
user.getUsed(nodePartition), limit);
// we can only acquire a new container if we unreserve first to
// respect user-limit
currentResourceLimits.setAmountNeededUnreserve(
amountNeededToUnreserve);
return true;
}
}
        if (LOG.isDebugEnabled()) {
          LOG.debug("User " + userName + " in queue " + getQueuePath()
              + " will exceed limit - consumed: "
              + user.getUsed(nodePartition) + " limit: " + limit);
        }
return false;
}
return true;
} finally {
readLock.unlock();
}
}
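  /**
   * Record a released or preempted container in SchedulerHealth, when health
   * tracking is available.
   */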
private void updateSchedulerHealthForCompletedContainer(
RMContainer rmContainer, ContainerStatus containerStatus) {
// Update SchedulerHealth for released / preempted container
SchedulerHealth schedulerHealth = csContext.getSchedulerHealth();
if (null == schedulerHealth) {
// Only do update if we have schedulerHealth
return;
}
if (containerStatus.getExitStatus() == ContainerExitStatus.PREEMPTED) {
schedulerHealth.updatePreemption(Time.now(), rmContainer.getAllocatedNode(),
rmContainer.getContainerId(), getQueuePath());
schedulerHealth.updateSchedulerPreemptionCounts(1);
} else {
schedulerHealth.updateRelease(csContext.getLastNodeUpdateTime(),
rmContainer.getAllocatedNode(), rmContainer.getContainerId(),
getQueuePath());
}
}
/**
* Recalculate QueueUsage Ratio.
*
* @param clusterResource
* Total Cluster Resource
* @param nodePartition
* Partition
*/
public void recalculateQueueUsageRatio(Resource clusterResource,
String nodePartition) {
writeLock.lock();
try {
ResourceUsage queueResourceUsage = getQueueResourceUsage();
if (nodePartition == null) {
for (String partition : Sets.union(
getQueueCapacities().getNodePartitionsSet(),
queueResourceUsage.getNodePartitionsSet())) {
usersManager.updateUsageRatio(partition, clusterResource);
}
} else {
usersManager.updateUsageRatio(nodePartition, clusterResource);
}
} finally {
writeLock.unlock();
}
}
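  /**
   * Handle a completed container: update the application and node under the
   * queue's write lock, release the queue's resources, then inform the parent
   * queue and the preemption manager outside of the leaf-queue lock.
   */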
@Override
public void completedContainer(Resource clusterResource,
FiCaSchedulerApp application, FiCaSchedulerNode node, RMContainer rmContainer,
ContainerStatus containerStatus, RMContainerEventType event, CSQueue childQueue,
boolean sortQueues) {
// Update SchedulerHealth for released / preempted container
updateSchedulerHealthForCompletedContainer(rmContainer, containerStatus);
if (application != null) {
boolean removed = false;
// Careful! Locking order is important!
writeLock.lock();
try {
Container container = rmContainer.getContainer();
// Inform the application & the node
// Note: It's safe to assume that all state changes to RMContainer
// happen under scheduler's lock...
// So, this is, in effect, a transaction across application & node
if (rmContainer.getState() == RMContainerState.RESERVED) {
removed = application.unreserve(rmContainer.getReservedSchedulerKey(),
node, rmContainer);
        } else {
removed = application.containerCompleted(rmContainer, containerStatus,
event, node.getPartition());
node.releaseContainer(rmContainer.getContainerId(), false);
}
// Book-keeping
if (removed) {
// Inform the ordering policy
orderingPolicy.containerReleased(application, rmContainer);
releaseResource(clusterResource, application, container.getResource(),
node.getPartition(), rmContainer);
}
} finally {
writeLock.unlock();
}
if (removed) {
// Inform the parent queue _outside_ of the leaf-queue lock
getParent().completedContainer(clusterResource, application, node,
rmContainer, null, event, this, sortQueues);
}
}
// Notify PreemptionManager
csContext.getPreemptionManager().removeKillableContainer(
new KillableContainer(
rmContainer,
node.getPartition(),
getQueuePath()));
// Update preemption metrics if exit status is PREEMPTED
if (containerStatus != null
&& ContainerExitStatus.PREEMPTED == containerStatus.getExitStatus()) {
updateQueuePreemptionMetrics(rmContainer);
}
}
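  /**
   * Account for a newly allocated container: charge the queue and the user's
   * resource usage for the node partition, track containers that ignore
   * partition exclusivity, and refresh the user's headroom metrics.
   */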
void allocateResource(Resource clusterResource,
SchedulerApplicationAttempt application, Resource resource,
String nodePartition, RMContainer rmContainer) {
writeLock.lock();
try {
super.allocateResource(clusterResource, resource, nodePartition);
// handle ignore exclusivity container
if (null != rmContainer && rmContainer.getNodeLabelExpression().equals(
RMNodeLabelsManager.NO_LABEL) && !nodePartition.equals(
RMNodeLabelsManager.NO_LABEL)) {
        TreeSet<RMContainer> rmContainers = ignorePartitionExclusivityRMContainers
            .computeIfAbsent(nodePartition, k -> new TreeSet<>());
        rmContainers.add(rmContainer);
}
// Update user metrics
String userName = application.getUser();
// Increment user's resource usage.
User user = usersManager.updateUserResourceUsage(userName, resource,
nodePartition, true);
Resource partitionHeadroom = Resources.createResource(0, 0);
if (metrics.getUserMetrics(userName) != null) {
partitionHeadroom = getHeadroom(user,
cachedResourceLimitsForHeadroom.getLimit(), clusterResource,
getResourceLimitForActiveUsers(userName, clusterResource,
nodePartition, SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY),
nodePartition);
}
metrics.setAvailableResourcesToUser(nodePartition, userName,
partitionHeadroom);
if (LOG.isDebugEnabled()) {
LOG.debug(getQueuePath() + " user=" + userName + " used="
+ queueUsage.getUsed(nodePartition) + " numContainers="
+ numContainers + " headroom = " + application.getHeadroom()
+ " user-resources=" + user.getUsed());
}
} finally {
writeLock.unlock();
}
}
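  /**
   * Reverse the accounting done in allocateResource: release the queue's and
   * the user's resource usage for the node partition, drop the container from
   * the ignore-partition-exclusivity tracking, and refresh the user's
   * headroom metrics.
   */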
void releaseResource(Resource clusterResource,
FiCaSchedulerApp application, Resource resource, String nodePartition,
RMContainer rmContainer) {
writeLock.lock();
try {
super.releaseResource(clusterResource, resource, nodePartition);
// handle ignore exclusivity container
if (null != rmContainer && rmContainer.getNodeLabelExpression().equals(
RMNodeLabelsManager.NO_LABEL) && !nodePartition.equals(
RMNodeLabelsManager.NO_LABEL)) {
if (ignorePartitionExclusivityRMContainers.containsKey(nodePartition)) {
Set<RMContainer> rmContainers =
ignorePartitionExclusivityRMContainers.get(nodePartition);
rmContainers.remove(rmContainer);
if (rmContainers.isEmpty()) {
ignorePartitionExclusivityRMContainers.remove(nodePartition);
}
}
}
// Update user metrics
String userName = application.getUser();
User user = usersManager.updateUserResourceUsage(userName, resource,
nodePartition, false);
Resource partitionHeadroom = Resources.createResource(0, 0);
if (metrics.getUserMetrics(userName) != null) {
partitionHeadroom = getHeadroom(user,
cachedResourceLimitsForHeadroom.getLimit(), clusterResource,
getResourceLimitForActiveUsers(userName, clusterResource,
nodePartition, SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY),
nodePartition);
}
metrics.setAvailableResourcesToUser(nodePartition, userName,
partitionHeadroom);
if (LOG.isDebugEnabled()) {
LOG.debug(
getQueuePath() + " used=" + queueUsage.getUsed() + " numContainers="
+ numContainers + " user=" + userName + " user-resources="
+ user.getUsed());
}
} finally {
writeLock.unlock();
}
}
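  /**
   * Cache the resource limits used for headroom calculations, capped by this
   * queue's effective maximum capacity.
   */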
private void updateCurrentResourceLimits(
ResourceLimits currentResourceLimits, Resource clusterResource) {
    // TODO: need to consider non-empty node labels once resource limits
    // support node labels
    // Even though ParentQueue sets limits that respect the child's max queue
    // capacity, CapacityScheduler does not do so when allocating a reserved
    // container, so the limits must be capped by the queue's max capacity here.
this.cachedResourceLimitsForHeadroom =
new ResourceLimits(currentResourceLimits.getLimit());
Resource queueMaxResource = getEffectiveMaxCapacityDown(
RMNodeLabelsManager.NO_LABEL, minimumAllocation);
this.cachedResourceLimitsForHeadroom.setLimit(Resources.min(
resourceCalculator, clusterResource, queueMaxResource,
currentResourceLimits.getLimit()));
}
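  /**
   * Recompute the queue's absolute capacities and derive maxApplications and
   * maxApplicationsPerUser from them.
   */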
private void updateAbsoluteCapacitiesAndRelatedFields() {
updateAbsoluteCapacities();
CapacitySchedulerConfiguration schedulerConf = csContext.getConfiguration();
    // If maxApplications is not set, derive it from the system-wide maximum,
    // scaled by the queue's newly calculated absolute capacity.
if (maxApplications <= 0) {
int maxSystemApps = schedulerConf.
getMaximumSystemApplications();
maxApplications =
(int) (maxSystemApps * queueCapacities.getAbsoluteCapacity());
}
maxApplicationsPerUser =
Math.min(maxApplications,
(int) (maxApplications * (usersManager.getUserLimit() / 100.0f)
* usersManager.getUserLimitFactor()));
if (getUserLimitFactor() == -1) {
maxApplicationsPerUser = maxApplications;
}
}
@Override
public void updateClusterResource(Resource clusterResource,
ResourceLimits currentResourceLimits) {
writeLock.lock();
try {
lastClusterResource = clusterResource;
updateAbsoluteCapacitiesAndRelatedFields();
super.updateEffectiveResources(clusterResource);
updateCurrentResourceLimits(currentResourceLimits, clusterResource);
      // Update headroom info based on the new cluster resource value; this
      // uses absoluteMaxCapacity for now and will be replaced with
      // absoluteMaxAvailCapacity during allocation
setQueueResourceLimitsInfo(clusterResource);
// Update user consumedRatios
recalculateQueueUsageRatio(clusterResource, null);
// Update metrics
CSQueueUtils.updateQueueStatistics(resourceCalculator, clusterResource,
this, labelManager, null);
// Update configured capacity/max-capacity for default partition only
CSQueueUtils.updateConfiguredCapacityMetrics(resourceCalculator,
labelManager.getResourceByLabel(null, clusterResource),
RMNodeLabelsManager.NO_LABEL, this);
// queue metrics are updated, more resource may be available
// activate the pending applications if possible
activateApplications();
      // Any change in cluster resource invalidates the cached user-limit, so
      // mark it for recomputation.
usersManager.userLimitNeedsRecompute();
// Update application properties
for (FiCaSchedulerApp application : orderingPolicy
.getSchedulableEntities()) {
computeUserLimitAndSetHeadroom(application, clusterResource,
RMNodeLabelsManager.NO_LABEL,
SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY, null);
}
} finally {
writeLock.unlock();
}
}
@Override
public void incUsedResource(String nodeLabel, Resource resourceToInc,
SchedulerApplicationAttempt application) {
usersManager.updateUserResourceUsage(application.getUser(), resourceToInc,
nodeLabel, true);
super.incUsedResource(nodeLabel, resourceToInc, application);
}
@Override
public void decUsedResource(String nodeLabel, Resource resourceToDec,
SchedulerApplicationAttempt application) {
usersManager.updateUserResourceUsage(application.getUser(), resourceToDec,
nodeLabel, false);
super.decUsedResource(nodeLabel, resourceToDec, application);
}
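  /**
   * Increase the AM resource usage accounted to the application's user and to
   * this queue for the given node label.
   */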
public void incAMUsedResource(String nodeLabel, Resource resourceToInc,
SchedulerApplicationAttempt application) {
getUser(application.getUser()).getResourceUsage().incAMUsed(nodeLabel,
resourceToInc);
    // ResourceUsage has its own lock; no additional lock is needed here.
queueUsage.incAMUsed(nodeLabel, resourceToInc);
}
public void decAMUsedResource(String nodeLabel, Resource resourceToDec,
SchedulerApplicationAttempt application) {
getUser(application.getUser()).getResourceUsage().decAMUsed(nodeLabel,
resourceToDec);
    // ResourceUsage has its own lock; no additional lock is needed here.
queueUsage.decAMUsed(nodeLabel, resourceToDec);
}
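  /**
   * Re-account a container recovered after a ResourceManager restart.
   * Completed and non-GUARANTEED (opportunistic) containers are skipped;
   * everything else is charged back to the queue and the user before the
   * parent queue is informed.
   */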
@Override
public void recoverContainer(Resource clusterResource,
SchedulerApplicationAttempt attempt, RMContainer rmContainer) {
if (rmContainer.getState().equals(RMContainerState.COMPLETED)) {
return;
}
if (rmContainer.getExecutionType() != ExecutionType.GUARANTEED) {
return;
}
// Careful! Locking order is important!
writeLock.lock();
try {
FiCaSchedulerNode node = scheduler.getNode(
rmContainer.getContainer().getNodeId());
allocateResource(clusterResource, attempt,
rmContainer.getContainer().getResource(), node.getPartition(),
rmContainer);
} finally {
writeLock.unlock();
}
getParent().recoverContainer(clusterResource, attempt, rmContainer);
}
/**
* Obtain (read-only) collection of pending applications.
*/
public Collection<FiCaSchedulerApp> getPendingApplications() {
return Collections.unmodifiableCollection(pendingOrderingPolicy
.getSchedulableEntities());
}
/**
* Obtain (read-only) collection of active applications.
*/
public Collection<FiCaSchedulerApp> getApplications() {
return Collections.unmodifiableCollection(orderingPolicy
.getSchedulableEntities());
}
/**
* Obtain (read-only) collection of all applications.
*/
public Collection<FiCaSchedulerApp> getAllApplications() {
Collection<FiCaSchedulerApp> apps = new HashSet<FiCaSchedulerApp>(
pendingOrderingPolicy.getSchedulableEntities());
apps.addAll(orderingPolicy.getSchedulableEntities());
return Collections.unmodifiableCollection(apps);
}
  /**
   * Get the total pending resource for the leaf queue, considering the user
   * limit. This is used by the preemption monitor when calculating pending
   * resources.
   *
   * The headroom of each user in the queue is taken into account:
   * Total pending for the queue =
   * sum(for each user(min((user's headroom), sum(user's pending requests))))
   * For example, a user with 4GB of headroom and 7GB of pending requests
   * contributes min(4GB, 7GB) = 4GB to the queue's total.
   *
   * @param clusterResources cluster resource
   * @param partition node partition
   * @param deductReservedFromPending whether to deduct reserved resource from
   *          pending: when a container is reserved in CS, pending resource is
   *          not deducted, which can lead to double accounting during
   *          preemption. In the normal case, reserved resource should be
   *          deducted from pending to avoid excessive preemption.
   * @return total pending resource considering the user limit
   */
public Resource getTotalPendingResourcesConsideringUserLimit(
Resource clusterResources, String partition,
boolean deductReservedFromPending) {
readLock.lock();
try {
Map<String, Resource> userNameToHeadroom =
new HashMap<>();
Resource totalPendingConsideringUserLimit = Resource.newInstance(0, 0);
for (FiCaSchedulerApp app : getApplications()) {
String userName = app.getUser();
if (!userNameToHeadroom.containsKey(userName)) {
User user = getUser(userName);
Resource headroom = Resources.subtract(
getResourceLimitForActiveUsers(app.getUser(), clusterResources,
partition, SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY),
user.getUsed(partition));
// Make sure headroom is not negative.
headroom = Resources.componentwiseMax(headroom, Resources.none());
userNameToHeadroom.put(userName, headroom);
}
// Check if we need to deduct reserved from pending
Resource pending = app.getAppAttemptResourceUsage().getPending(
partition);
if (deductReservedFromPending) {
pending = Resources.subtract(pending,
app.getAppAttemptResourceUsage().getReserved(partition));
}
pending = Resources.componentwiseMax(pending, Resources.none());
        Resource minPendingConsideringUserLimit = Resources.componentwiseMin(
            userNameToHeadroom.get(userName), pending);
        Resources.addTo(totalPendingConsideringUserLimit,
            minPendingConsideringUserLimit);
        Resources.subtractFrom(userNameToHeadroom.get(userName),
            minPendingConsideringUserLimit);
}
return totalPendingConsideringUserLimit;
} finally {
readLock.unlock();
}
}
@Override
public void collectSchedulerApplications(
Collection<ApplicationAttemptId> apps) {
readLock.lock();
try {
for (FiCaSchedulerApp pendingApp : pendingOrderingPolicy
.getSchedulableEntities()) {
apps.add(pendingApp.getApplicationAttemptId());
}
for (FiCaSchedulerApp app : orderingPolicy.getSchedulableEntities()) {
apps.add(app.getApplicationAttemptId());
}
} finally {
readLock.unlock();
}
}
@Override
public void attachContainer(Resource clusterResource,
FiCaSchedulerApp application, RMContainer rmContainer) {
if (application != null && rmContainer != null
&& rmContainer.getExecutionType() == ExecutionType.GUARANTEED) {
FiCaSchedulerNode node =
scheduler.getNode(rmContainer.getContainer().getNodeId());
allocateResource(clusterResource, application, rmContainer.getContainer()
.getResource(), node.getPartition(), rmContainer);
LOG.info("movedContainer" + " container=" + rmContainer.getContainer()
+ " containerState="+ rmContainer.getState()
+ " resource=" + rmContainer.getContainer().getResource()
+ " queueMoveIn=" + this + " usedCapacity=" + getUsedCapacity()
+ " absoluteUsedCapacity=" + getAbsoluteUsedCapacity() + " used="
+ queueUsage.getUsed() + " cluster=" + clusterResource);
// Inform the parent queue
getParent().attachContainer(clusterResource, application, rmContainer);
}
}
@Override
public void detachContainer(Resource clusterResource,
FiCaSchedulerApp application, RMContainer rmContainer) {
if (application != null && rmContainer != null
&& rmContainer.getExecutionType() == ExecutionType.GUARANTEED) {
FiCaSchedulerNode node =
scheduler.getNode(rmContainer.getContainer().getNodeId());
releaseResource(clusterResource, application, rmContainer.getContainer()
.getResource(), node.getPartition(), rmContainer);
LOG.info("movedContainer" + " container=" + rmContainer.getContainer()
+ " containerState="+ rmContainer.getState()
+ " resource=" + rmContainer.getContainer().getResource()
+ " queueMoveOut=" + this + " usedCapacity=" + getUsedCapacity()
+ " absoluteUsedCapacity=" + getAbsoluteUsedCapacity() + " used="
+ queueUsage.getUsed() + " cluster=" + clusterResource);
// Inform the parent queue
getParent().detachContainer(clusterResource, application, rmContainer);
}
}
  /**
   * @return all ignored partition exclusivity RMContainers in the LeafQueue;
   *         this will be used by the preemption policy.
   */
public Map<String, TreeSet<RMContainer>>
getIgnoreExclusivityRMContainers() {
Map<String, TreeSet<RMContainer>> clonedMap = new HashMap<>();
readLock.lock();
try {
for (Map.Entry<String, TreeSet<RMContainer>> entry : ignorePartitionExclusivityRMContainers
.entrySet()) {
clonedMap.put(entry.getKey(), new TreeSet<>(entry.getValue()));
}
return clonedMap;
} finally {
readLock.unlock();
}
}
public void setCapacity(float capacity) {
queueCapacities.setCapacity(capacity);
}
public void setCapacity(String nodeLabel, float capacity) {
queueCapacities.setCapacity(nodeLabel, capacity);
}
public void setAbsoluteCapacity(float absoluteCapacity) {
queueCapacities.setAbsoluteCapacity(absoluteCapacity);
}
public void setAbsoluteCapacity(String nodeLabel, float absoluteCapacity) {
queueCapacities.setAbsoluteCapacity(nodeLabel, absoluteCapacity);
}
public void setMaxApplicationsPerUser(int maxApplicationsPerUser) {
this.maxApplicationsPerUser = maxApplicationsPerUser;
}
public void setMaxApplications(int maxApplications) {
this.maxApplications = maxApplications;
}
public void setMaxAMResourcePerQueuePercent(
float maxAMResourcePerQueuePercent) {
this.maxAMResourcePerQueuePercent = maxAMResourcePerQueuePercent;
}
public OrderingPolicy<FiCaSchedulerApp>
getOrderingPolicy() {
return orderingPolicy;
}
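  /**
   * Replace the ordering policy, migrating all currently tracked applications
   * from the old policy to the new one.
   */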
void setOrderingPolicy(
OrderingPolicy<FiCaSchedulerApp> orderingPolicy) {
writeLock.lock();
try {
if (null != this.orderingPolicy) {
orderingPolicy.addAllSchedulableEntities(
this.orderingPolicy.getSchedulableEntities());
}
this.orderingPolicy = orderingPolicy;
} finally {
writeLock.unlock();
}
}
@Override
public Priority getDefaultApplicationPriority() {
return defaultAppPriorityPerQueue;
}
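  /**
   * Update an application's priority by removing it from its ordering policy
   * (active or pending), setting the new priority, and re-adding it so its
   * ordering reflects the change.
   */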
public void updateApplicationPriority(SchedulerApplication<FiCaSchedulerApp> app,
Priority newAppPriority) {
writeLock.lock();
try {
FiCaSchedulerApp attempt = app.getCurrentAppAttempt();
boolean isActive = orderingPolicy.removeSchedulableEntity(attempt);
if (!isActive) {
pendingOrderingPolicy.removeSchedulableEntity(attempt);
}
// Update new priority in SchedulerApplication
attempt.setPriority(newAppPriority);
if (isActive) {
orderingPolicy.addSchedulableEntity(attempt);
} else {
pendingOrderingPolicy.addSchedulableEntity(attempt);
}
} finally {
writeLock.unlock();
}
}
public OrderingPolicy<FiCaSchedulerApp>
getPendingAppsOrderingPolicy() {
return pendingOrderingPolicy;
}
/*
* Holds shared values used by all applications in
* the queue to calculate headroom on demand
*/
static class QueueResourceLimitsInfo {
private Resource queueCurrentLimit;
private Resource clusterResource;
public void setQueueCurrentLimit(Resource currentLimit) {
this.queueCurrentLimit = currentLimit;
}
public Resource getQueueCurrentLimit() {
return queueCurrentLimit;
}
public void setClusterResource(Resource clusterResource) {
this.clusterResource = clusterResource;
}
public Resource getClusterResource() {
return clusterResource;
}
}
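  /**
   * Stop the queue: move to DRAINING while applications are still present,
   * otherwise directly to STOPPED.
   */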
@Override
public void stopQueue() {
writeLock.lock();
try {
if (getNumApplications() > 0) {
updateQueueState(QueueState.DRAINING);
} else {
updateQueueState(QueueState.STOPPED);
}
} finally {
writeLock.unlock();
}
}
/**
* Get all valid users in this queue.
* @return user list
*/
public Set<String> getAllUsers() {
return this.getUsersManager().getUsers().keySet();
}
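  // Per-user limit cached during container assignment, presumably so the
  // user limit need not be recomputed for every application of the same
  // user within one allocation pass.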
static class CachedUserLimit {
final Resource userLimit;
volatile boolean canAssign = true;
volatile Resource reservation = Resources.none();
CachedUserLimit(Resource userLimit) {
this.userLimit = userLimit;
}
}
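  /**
   * Update queue preemption metrics for a preempted container, including the
   * memory-seconds and vcore-seconds consumed over the container's lifetime.
   */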
private void updateQueuePreemptionMetrics(RMContainer rmc) {
final long usedMillis = rmc.getFinishTime() - rmc.getCreationTime();
final long usedSeconds = usedMillis / DateUtils.MILLIS_PER_SECOND;
Resource containerResource = rmc.getAllocatedResource();
metrics.preemptContainer();
long mbSeconds = (containerResource.getMemorySize() * usedMillis)
/ DateUtils.MILLIS_PER_SECOND;
long vcSeconds = (containerResource.getVirtualCores() * usedMillis)
/ DateUtils.MILLIS_PER_SECOND;
metrics.updatePreemptedMemoryMBSeconds(mbSeconds);
metrics.updatePreemptedVcoreSeconds(vcSeconds);
metrics.updatePreemptedResources(containerResource);
metrics.updatePreemptedSecondsForCustomResources(containerResource,
usedSeconds);
metrics.updatePreemptedForCustomResources(containerResource);
}
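  // Bookkeeping for runnable vs. non-runnable applications, presumably
  // maintained by the scheduler's max-parallel-apps enforcement.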
@Override
int getNumRunnableApps() {
readLock.lock();
try {
return runnableApps.size();
} finally {
readLock.unlock();
}
}
int getNumNonRunnableApps() {
readLock.lock();
try {
return nonRunnableApps.size();
} finally {
readLock.unlock();
}
}
boolean removeNonRunnableApp(FiCaSchedulerApp app) {
writeLock.lock();
try {
return nonRunnableApps.remove(app);
} finally {
writeLock.unlock();
}
}
List<FiCaSchedulerApp> getCopyOfNonRunnableAppSchedulables() {
List<FiCaSchedulerApp> appsToReturn = new ArrayList<>();
readLock.lock();
try {
appsToReturn.addAll(nonRunnableApps);
} finally {
readLock.unlock();
}
return appsToReturn;
}
}