| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; |
| |
| import java.io.IOException; |
| import java.util.*; |
| import java.util.Map.Entry; |
| import java.util.concurrent.ConcurrentHashMap; |
| import java.util.concurrent.ConcurrentMap; |
| |
| import org.apache.commons.lang3.StringUtils; |
| import org.apache.commons.lang3.time.DateUtils; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| import org.apache.hadoop.classification.InterfaceAudience.Private; |
| import org.apache.hadoop.classification.InterfaceStability.Unstable; |
| import org.apache.hadoop.security.AccessControlException; |
| import org.apache.hadoop.security.UserGroupInformation; |
| import org.apache.hadoop.security.authorize.AccessControlList; |
| import org.apache.hadoop.util.Time; |
| import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; |
| import org.apache.hadoop.yarn.api.records.ApplicationId; |
| import org.apache.hadoop.yarn.api.records.Container; |
| import org.apache.hadoop.yarn.api.records.ContainerExitStatus; |
| import org.apache.hadoop.yarn.api.records.ContainerStatus; |
| import org.apache.hadoop.yarn.api.records.ExecutionType; |
| import org.apache.hadoop.yarn.api.records.Priority; |
| import org.apache.hadoop.yarn.api.records.QueueACL; |
| import org.apache.hadoop.yarn.api.records.QueueInfo; |
| import org.apache.hadoop.yarn.api.records.QueueState; |
| import org.apache.hadoop.yarn.api.records.QueueUserACLInfo; |
| import org.apache.hadoop.yarn.api.records.Resource; |
| import org.apache.hadoop.yarn.factories.RecordFactory; |
| import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; |
| import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager; |
| import org.apache.hadoop.yarn.security.AccessType; |
| import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; |
| import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; |
| import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType; |
| import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState; |
| import org.apache.hadoop.yarn.server.resourcemanager.scheduler.*; |
| import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivityDiagnosticConstant; |
| import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivitiesLogger; |
| import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivityState; |
| import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt.AMState; |
| import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.UsersManager.User; |
| import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.KillableContainer; |
| import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ContainerAllocationProposal; |
| import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ResourceCommitRequest; |
| import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.SchedulerContainer; |
| import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; |
| import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; |
| import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.CandidateNodeSet; |
| import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.CandidateNodeSetUtils; |
| import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.FifoOrderingPolicyForPendingApps; |
| import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.IteratorSelector; |
| import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.OrderingPolicy; |
| import org.apache.hadoop.yarn.server.utils.Lock; |
| import org.apache.hadoop.yarn.server.utils.Lock.NoLock; |
| import org.apache.hadoop.yarn.util.SystemClock; |
| import org.apache.hadoop.yarn.util.resource.Resources; |
| |
| import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting; |
| import org.apache.hadoop.thirdparty.com.google.common.collect.Sets; |
| |
@Private
@Unstable
public class LeafQueue extends AbstractCSQueue {
  private static final Logger LOG =
      LoggerFactory.getLogger(LeafQueue.class);

  // Fraction of the whole cluster currently used by this queue.
  private float absoluteUsedCapacity = 0.0f;

  // Maximum number of applications allowed in this queue (pending + active).
  protected int maxApplications;
  // Per-user application cap; volatile so readers see refreshed config.
  protected volatile int maxApplicationsPerUser;

  // Maximum fraction of the queue's resources usable by application masters.
  private float maxAMResourcePerQueuePercent;

  // Locality-delay scheduling knobs, refreshed from scheduler-wide config
  // in setupQueueConfigs.
  private volatile int nodeLocalityDelay;
  private volatile int rackLocalityAdditionalDelay;
  private volatile boolean rackLocalityFullReset;

  // All application attempts currently tracked by this queue.
  Map<ApplicationAttemptId, FiCaSchedulerApp> applicationAttemptMap =
      new ConcurrentHashMap<>();

  // Priority assigned to applications submitted without an explicit one.
  private Priority defaultAppPriorityPerQueue;

  // Ordering of pending (not yet activated) applications; FIFO and
  // initialized once (static policy).
  private final OrderingPolicy<FiCaSchedulerApp> pendingOrderingPolicy;

  // (maximumAllocation - minimumAllocation) / maximumAllocation,
  // recomputed in setupQueueConfigs.
  private volatile float minimumAllocationFactor;

  private final RecordFactory recordFactory =
      RecordFactoryProvider.getRecordFactory(null);

  private CapacitySchedulerContext scheduler;

  // Tracks per-user state (usage, weights, user limits) for this queue.
  private final UsersManager usersManager;

  // cache last cluster resource to compute actual capacity
  private Resource lastClusterResource = Resources.none();

  private final QueueResourceLimitsInfo queueResourceLimitsInfo =
      new QueueResourceLimitsInfo();

  private volatile ResourceLimits cachedResourceLimitsForHeadroom = null;

  // Ordering of active applications; set from configuration in
  // setupQueueConfigs.
  private volatile OrderingPolicy<FiCaSchedulerApp> orderingPolicy = null;

  // Map<Partition, Map<SchedulingMode, Map<User, CachedUserLimit>>>
  // Not thread safe: only the last level is a ConcurrentMap
  @VisibleForTesting
  Map<String, Map<SchedulingMode, ConcurrentMap<String, CachedUserLimit>>>
      userLimitsCache = new HashMap<>();

  // Not thread safe
  @VisibleForTesting
  long currentUserLimitCacheVersion = 0;

  // record all ignore partition exclusivityRMContainer, this will be used to do
  // preemption, key is the partition of the RMContainer allocated on
  private Map<String, TreeSet<RMContainer>> ignorePartitionExclusivityRMContainers =
      new ConcurrentHashMap<>();

  // Priority ACL groups controlling who may submit at which priority;
  // refreshed from configuration in setupQueueConfigs.
  List<AppPriorityACLGroup> priorityAcls =
      new ArrayList<AppPriorityACLGroup>();

  private final List<FiCaSchedulerApp> runnableApps = new ArrayList<>();
  private final List<FiCaSchedulerApp> nonRunnableApps = new ArrayList<>();
| |
  /**
   * Convenience constructor that reads queue settings from the scheduler
   * context's current configuration.
   */
  @SuppressWarnings({ "unchecked", "rawtypes" })
  public LeafQueue(CapacitySchedulerContext cs,
      String queueName, CSQueue parent, CSQueue old) throws IOException {
    this(cs, cs.getConfiguration(), queueName, parent, old);
  }
| |
  /**
   * Builds the leaf queue, wires its {@link UsersManager} and pending-app
   * ordering policy, then applies the given configuration.
   *
   * @param cs scheduler context providing cluster state and configuration
   * @param configuration configuration to read queue settings from
   * @param queueName short name of this queue
   * @param parent parent queue (may be null for root-level setups)
   * @param old previous incarnation of this queue, if reinitializing
   * @throws IOException if the configuration is invalid
   */
  public LeafQueue(CapacitySchedulerContext cs,
      CapacitySchedulerConfiguration configuration,
      String queueName, CSQueue parent, CSQueue old) throws
      IOException {
    super(cs, configuration, queueName, parent, old);
    this.scheduler = cs;

    this.usersManager = new UsersManager(metrics, this, labelManager, scheduler,
        resourceCalculator);

    // One time initialization is enough since it is static ordering policy
    this.pendingOrderingPolicy = new FifoOrderingPolicyForPendingApps();

    LOG.debug("LeafQueue: name={}, fullname={}", queueName, getQueuePath());

    setupQueueConfigs(cs.getClusterResource(), configuration);

  }
| |
| @SuppressWarnings("checkstyle:nowhitespaceafter") |
| protected void setupQueueConfigs(Resource clusterResource, |
| CapacitySchedulerConfiguration conf) throws |
| IOException { |
| writeLock.lock(); |
| try { |
| CapacitySchedulerConfiguration schedConf = csContext.getConfiguration(); |
| super.setupQueueConfigs(clusterResource, conf); |
| |
| this.lastClusterResource = clusterResource; |
| |
| this.cachedResourceLimitsForHeadroom = new ResourceLimits( |
| clusterResource); |
| |
| // Initialize headroom info, also used for calculating application |
| // master resource limits. Since this happens during queue initialization |
| // and all queues may not be realized yet, we'll use (optimistic) |
| // absoluteMaxCapacity (it will be replaced with the more accurate |
| // absoluteMaxAvailCapacity during headroom/userlimit/allocation events) |
| setQueueResourceLimitsInfo(clusterResource); |
| |
| setOrderingPolicy( |
| conf.<FiCaSchedulerApp>getAppOrderingPolicy(getQueuePath())); |
| |
| usersManager.setUserLimit(conf.getUserLimit(getQueuePath())); |
| usersManager.setUserLimitFactor(conf.getUserLimitFactor(getQueuePath())); |
| |
| maxAMResourcePerQueuePercent = |
| conf.getMaximumApplicationMasterResourcePerQueuePercent( |
| getQueuePath()); |
| |
| maxApplications = conf.getMaximumApplicationsPerQueue(getQueuePath()); |
| if (maxApplications < 0) { |
| int maxGlobalPerQueueApps = |
| csContext.getConfiguration().getGlobalMaximumApplicationsPerQueue(); |
| if (maxGlobalPerQueueApps > 0) { |
| maxApplications = maxGlobalPerQueueApps; |
| } |
| } |
| |
| priorityAcls = conf.getPriorityAcls(getQueuePath(), |
| scheduler.getMaxClusterLevelAppPriority()); |
| |
| if (!SchedulerUtils.checkQueueLabelExpression(this.accessibleLabels, |
| this.defaultLabelExpression, null)) { |
| throw new IOException( |
| "Invalid default label expression of " + " queue=" + getQueuePath() |
| + " doesn't have permission to access all labels " |
| + "in default label expression. labelExpression of resource request=" |
| + (this.defaultLabelExpression == null ? |
| "" : |
| this.defaultLabelExpression) + ". Queue labels=" + ( |
| getAccessibleNodeLabels() == null ? |
| "" : |
| StringUtils |
| .join(getAccessibleNodeLabels().iterator(), ','))); |
| } |
| |
| nodeLocalityDelay = schedConf.getNodeLocalityDelay(); |
| rackLocalityAdditionalDelay = schedConf |
| .getRackLocalityAdditionalDelay(); |
| rackLocalityFullReset = schedConf |
| .getRackLocalityFullReset(); |
| |
| // re-init this since max allocation could have changed |
| this.minimumAllocationFactor = Resources.ratio(resourceCalculator, |
| Resources.subtract(maximumAllocation, minimumAllocation), |
| maximumAllocation); |
| |
| StringBuilder aclsString = new StringBuilder(); |
| for (Map.Entry<AccessType, AccessControlList> e : acls.entrySet()) { |
| aclsString.append(e.getKey() + ":" + e.getValue().getAclString()); |
| } |
| |
| StringBuilder labelStrBuilder = new StringBuilder(); |
| if (accessibleLabels != null) { |
| for (String s : accessibleLabels) { |
| labelStrBuilder.append(s) |
| .append(","); |
| } |
| } |
| |
| defaultAppPriorityPerQueue = Priority.newInstance( |
| conf.getDefaultApplicationPriorityConfPerQueue(getQueuePath())); |
| |
| // Validate leaf queue's user's weights. |
| int queueUL = Math.min(100, conf.getUserLimit(getQueuePath())); |
| for (Entry<String, Float> e : getUserWeights().entrySet()) { |
| float val = e.getValue().floatValue(); |
| if (val < 0.0f || val > (100.0f / queueUL)) { |
| throw new IOException("Weight (" + val + ") for user \"" + e.getKey() |
| + "\" must be between 0 and" + " 100 / " + queueUL + " (= " + |
| 100.0f/queueUL + ", the number of concurrent active users in " |
| + getQueuePath() + ")"); |
| } |
| } |
| |
| usersManager.updateUserWeights(); |
| |
| LOG.info( |
| "Initializing " + getQueuePath() + "\n" + |
| getExtendedCapacityOrWeightString() + "\n" |
| + "absoluteCapacity = " + queueCapacities.getAbsoluteCapacity() |
| + " [= parentAbsoluteCapacity * capacity ]" + "\n" |
| + "maxCapacity = " + queueCapacities.getMaximumCapacity() |
| + " [= configuredMaxCapacity ]" + "\n" + "absoluteMaxCapacity = " |
| + queueCapacities.getAbsoluteMaximumCapacity() |
| + " [= 1.0 maximumCapacity undefined, " |
| + "(parentAbsoluteMaxCapacity * maximumCapacity) / 100 otherwise ]" |
| + "\n" + "effectiveMinResource=" + |
| getEffectiveCapacity(CommonNodeLabelsManager.NO_LABEL) + "\n" |
| + " , effectiveMaxResource=" + |
| getEffectiveMaxCapacity(CommonNodeLabelsManager.NO_LABEL) |
| + "\n" + "userLimit = " + usersManager.getUserLimit() |
| + " [= configuredUserLimit ]" + "\n" + "userLimitFactor = " |
| + usersManager.getUserLimitFactor() |
| + " [= configuredUserLimitFactor ]" + "\n" + "maxApplications = " |
| + maxApplications |
| + " [= configuredMaximumSystemApplicationsPerQueue or" |
| + " (int)(configuredMaximumSystemApplications * absoluteCapacity)]" |
| + "\n" + "maxApplicationsPerUser = " + maxApplicationsPerUser |
| + " [= (int)(maxApplications * (userLimit / 100.0f) * " |
| + "userLimitFactor) ]" + "\n" |
| + "maxParallelApps = " + getMaxParallelApps() + "\n" |
| + "usedCapacity = " + |
| + queueCapacities.getUsedCapacity() + " [= usedResourcesMemory / " |
| + "(clusterResourceMemory * absoluteCapacity)]" + "\n" |
| + "absoluteUsedCapacity = " + absoluteUsedCapacity |
| + " [= usedResourcesMemory / clusterResourceMemory]" + "\n" |
| + "maxAMResourcePerQueuePercent = " + maxAMResourcePerQueuePercent |
| + " [= configuredMaximumAMResourcePercent ]" + "\n" |
| + "minimumAllocationFactor = " + minimumAllocationFactor |
| + " [= (float)(maximumAllocationMemory - minimumAllocationMemory) / " |
| + "maximumAllocationMemory ]" + "\n" + "maximumAllocation = " |
| + maximumAllocation + " [= configuredMaxAllocation ]" + "\n" |
| + "numContainers = " + numContainers |
| + " [= currentNumContainers ]" + "\n" + "state = " + getState() |
| + " [= configuredState ]" + "\n" + "acls = " + aclsString |
| + " [= configuredAcls ]" + "\n" |
| + "nodeLocalityDelay = " + nodeLocalityDelay + "\n" |
| + "rackLocalityAdditionalDelay = " |
| + rackLocalityAdditionalDelay + "\n" |
| + "labels=" + labelStrBuilder.toString() + "\n" |
| + "reservationsContinueLooking = " |
| + reservationsContinueLooking + "\n" + "preemptionDisabled = " |
| + getPreemptionDisabled() + "\n" + "defaultAppPriorityPerQueue = " |
| + defaultAppPriorityPerQueue + "\npriority = " + priority |
| + "\nmaxLifetime = " + getMaximumApplicationLifetime() |
| + " seconds" + "\ndefaultLifetime = " |
| + getDefaultApplicationLifetime() + " seconds"); |
| } finally { |
| writeLock.unlock(); |
| } |
| } |
| |
  /**
   * Used only by tests.
   *
   * @return (maximumAllocation - minimumAllocation) / maximumAllocation,
   *         as recomputed in setupQueueConfigs
   */
  @Private
  public float getMinimumAllocationFactor() {
    return minimumAllocationFactor;
  }
| |
  /**
   * Used only by tests.
   *
   * @return configured maximum AM resource fraction for this queue
   */
  @Private
  public float getMaxAMResourcePerQueuePercent() {
    return maxAMResourcePerQueuePercent;
  }
| |
  /** @return maximum number of applications this queue may hold. */
  public int getMaxApplications() {
    return maxApplications;
  }
| |
  /** @return maximum number of applications any single user may hold here. */
  public int getMaxApplicationsPerUser() {
    return maxApplicationsPerUser;
  }
| |
  /**
   * @return UsersManager instance tracking per-user state for this queue.
   */
  public UsersManager getUsersManager() {
    return usersManager;
  }
| |
  /** @return the same UsersManager, exposed through the abstract interface. */
  @Override
  public AbstractUsersManager getAbstractUsersManager() {
    return usersManager;
  }
| |
  /**
   * Leaf queues have no children; callers are expected to handle the null.
   */
  @Override
  public List<CSQueue> getChildQueues() {
    return null;
  }
| |
  /**
   * Set user limit - used only for testing.
   * @param userLimit new user limit
   */
  @VisibleForTesting
  void setUserLimit(int userLimit) {
    usersManager.setUserLimit(userLimit);
    // Force recomputation so the new limit takes effect immediately.
    usersManager.userLimitNeedsRecompute();
  }
| |
  /**
   * Set user limit factor - used only for testing.
   * @param userLimitFactor new user limit factor
   */
  @VisibleForTesting
  void setUserLimitFactor(float userLimitFactor) {
    usersManager.setUserLimitFactor(userLimitFactor);
    // Force recomputation so the new factor takes effect immediately.
    usersManager.userLimitNeedsRecompute();
  }
| |
  /**
   * Total applications in this queue: pending + active + non-runnable.
   * Taken under the read lock so the three counters are read consistently.
   */
  @Override
  public int getNumApplications() {
    readLock.lock();
    try {
      return getNumPendingApplications() + getNumActiveApplications() +
          getNumNonRunnableApps();
    } finally {
      readLock.unlock();
    }
  }
| |
  /** @return number of applications waiting to be activated. */
  public int getNumPendingApplications() {
    readLock.lock();
    try {
      return pendingOrderingPolicy.getNumSchedulableEntities();
    } finally {
      readLock.unlock();
    }
  }
| |
  /** @return number of activated (schedulable) applications. */
  public int getNumActiveApplications() {
    readLock.lock();
    try {
      return orderingPolicy.getNumSchedulableEntities();
    } finally {
      readLock.unlock();
    }
  }
| |
| @Private |
| public int getNumPendingApplications(String user) { |
| readLock.lock(); |
| try { |
| User u = getUser(user); |
| if (null == u) { |
| return 0; |
| } |
| return u.getPendingApplications(); |
| } finally { |
| readLock.unlock(); |
| } |
| } |
| |
| @Private |
| public int getNumActiveApplications(String user) { |
| readLock.lock(); |
| try { |
| User u = getUser(user); |
| if (null == u) { |
| return 0; |
| } |
| return u.getActiveApplications(); |
| } finally { |
| readLock.unlock(); |
| } |
| } |
| |
  /** @return the configured user limit percentage for this queue. */
  @Private
  public int getUserLimit() {
    return usersManager.getUserLimit();
  }
| |
  /** @return the configured user limit factor for this queue. */
  @Private
  public float getUserLimitFactor() {
    return usersManager.getUserLimitFactor();
  }
| |
| @Override |
| public QueueInfo getQueueInfo( |
| boolean includeChildQueues, boolean recursive) { |
| QueueInfo queueInfo = getQueueInfo(); |
| return queueInfo; |
| } |
| |
| @Override |
| public List<QueueUserACLInfo> |
| getQueueUserAclInfo(UserGroupInformation user) { |
| readLock.lock(); |
| try { |
| QueueUserACLInfo userAclInfo = recordFactory.newRecordInstance( |
| QueueUserACLInfo.class); |
| List<QueueACL> operations = new ArrayList<>(); |
| for (QueueACL operation : QueueACL.values()) { |
| if (hasAccess(operation, user)) { |
| operations.add(operation); |
| } |
| } |
| |
| userAclInfo.setQueueName(getQueuePath()); |
| userAclInfo.setUserAcls(operations); |
| return Collections.singletonList(userAclInfo); |
| } finally { |
| readLock.unlock(); |
| } |
| |
| } |
| |
| public String toString() { |
| readLock.lock(); |
| try { |
| return getQueuePath() + ": " + getCapacityOrWeightString() |
| + ", " + "absoluteCapacity=" + queueCapacities.getAbsoluteCapacity() |
| + ", " + "usedResources=" + queueUsage.getUsed() + ", " |
| + "usedCapacity=" + getUsedCapacity() + ", " + "absoluteUsedCapacity=" |
| + getAbsoluteUsedCapacity() + ", " + "numApps=" + getNumApplications() |
| + ", " + "numContainers=" + getNumContainers() + ", " |
| + "effectiveMinResource=" + |
| getEffectiveCapacity(CommonNodeLabelsManager.NO_LABEL) + |
| " , effectiveMaxResource=" + |
| getEffectiveMaxCapacity(CommonNodeLabelsManager.NO_LABEL); |
| } finally { |
| readLock.unlock(); |
| } |
| } |
| |
| protected String getExtendedCapacityOrWeightString() { |
| if (queueCapacities.getWeight() != -1) { |
| return "weight = " + queueCapacities.getWeight() |
| + " [= (float) configuredCapacity (with w suffix)] " + "\n" |
| + "normalizedWeight = " + queueCapacities.getNormalizedWeight() |
| + " [= (float) configuredCapacity / sum(configuredCapacity of " + |
| "all queues under the parent)]"; |
| } else { |
| return "capacity = " + queueCapacities.getCapacity() |
| + " [= (float) configuredCapacity / 100 ]"; |
| } |
| } |
| |
  /** @return the tracked User for userName, or null if not present. */
  @VisibleForTesting
  public User getUser(String userName) {
    return usersManager.getUser(userName);
  }
| |
  /**
   * @return a defensive copy of this queue's priority ACL groups, taken
   *         under the read lock.
   */
  @Private
  public List<AppPriorityACLGroup> getPriorityACLs() {
    readLock.lock();
    try {
      return new ArrayList<>(priorityAcls);
    } finally {
      readLock.unlock();
    }
  }
| |
  /**
   * Reinitializes this queue from a newly parsed queue definition: validates
   * the queue identity, rejects shrinking the maximum allocation (running
   * AMs were already told the old size), then reapplies configuration.
   *
   * @param newlyParsedQueue queue object carrying the new settings
   * @param clusterResource current total cluster resource
   * @param configuration configuration to apply
   * @throws IOException on identity mismatch or decreased max allocation
   */
  protected void reinitialize(
      CSQueue newlyParsedQueue, Resource clusterResource,
      CapacitySchedulerConfiguration configuration) throws
      IOException {

    writeLock.lock();
    try {
      // We skip reinitialize for dynamic queues, when this is called, and
      // new queue is different from this queue, we will make this queue to be
      // static queue.
      if (newlyParsedQueue != this) {
        this.setDynamicQueue(false);
      }

      // Sanity check
      if (!(newlyParsedQueue instanceof LeafQueue) || !newlyParsedQueue
          .getQueuePath().equals(getQueuePath())) {
        throw new IOException(
            "Trying to reinitialize " + getQueuePath() + " from "
                + newlyParsedQueue.getQueuePath());
      }

      LeafQueue newlyParsedLeafQueue = (LeafQueue) newlyParsedQueue;

      // don't allow the maximum allocation to be decreased in size
      // since we have already told running AM's the size
      Resource oldMax = getMaximumAllocation();
      Resource newMax = newlyParsedLeafQueue.getMaximumAllocation();

      if (!Resources.fitsIn(oldMax, newMax)) {
        throw new IOException("Trying to reinitialize " + getQueuePath()
            + " the maximum allocation size can not be decreased!"
            + " Current setting: " + oldMax + ", trying to set it to: "
            + newMax);
      }

      setupQueueConfigs(clusterResource, configuration);
    } finally {
      writeLock.unlock();
    }
  }
| |
  /**
   * Reinitializes from the scheduler context's current configuration.
   */
  @Override
  public void reinitialize(
      CSQueue newlyParsedQueue, Resource clusterResource)
      throws IOException {
    reinitialize(newlyParsedQueue, clusterResource,
        csContext.getConfiguration());
  }
| |
  /**
   * Submits an application attempt as a regular (non-move) submission.
   */
  @Override
  public void submitApplicationAttempt(FiCaSchedulerApp application,
      String userName) {
    submitApplicationAttempt(application, userName, false);
  }
| |
  /**
   * Registers an application attempt with this queue and its user, updates
   * metrics (unless the attempt arrives via a queue move), and notifies the
   * parent queue outside the lock.
   *
   * @param application the attempt being submitted
   * @param userName submitting user
   * @param isMoveApp true when the attempt is being moved between queues
   */
  @Override
  public void submitApplicationAttempt(FiCaSchedulerApp application,
      String userName, boolean isMoveApp) {
    // Careful! Locking order is important!
    writeLock.lock();
    try {
      // TODO, should use getUser, use this method just to avoid UT failure
      // which is caused by wrong invoking order, will fix UT separately
      User user = usersManager.getUserAndAddIfAbsent(userName);

      // Add the attempt to our data-structures
      addApplicationAttempt(application, user);
    } finally {
      writeLock.unlock();
    }

    // We don't want to update metrics for move app
    if (!isMoveApp) {
      metrics.submitAppAttempt(userName);
    }

    getParent().submitApplicationAttempt(application, userName);
  }
| |
  /**
   * Validates the submission against this queue's limits, then forwards it
   * up the hierarchy. Rejections from the parent are logged and rethrown.
   *
   * @throws AccessControlException if this queue or an ancestor rejects
   *         the submission
   */
  @Override
  public void submitApplication(ApplicationId applicationId, String userName,
      String queue) throws AccessControlException {
    // Careful! Locking order is important!
    validateSubmitApplication(applicationId, userName, queue);

    // Inform the parent queue
    try {
      getParent().submitApplication(applicationId, userName, queue);
    } catch (AccessControlException ace) {
      LOG.info("Failed to submit application to parent-queue: " +
          getParent().getQueuePath(), ace);
      throw ace;
    }

  }
| |
| public void validateSubmitApplication(ApplicationId applicationId, |
| String userName, String queue) throws AccessControlException { |
| writeLock.lock(); |
| try { |
| // Check if the queue is accepting jobs |
| if (getState() != QueueState.RUNNING) { |
| String msg = "Queue " + getQueuePath() |
| + " is STOPPED. Cannot accept submission of application: " |
| + applicationId; |
| LOG.info(msg); |
| throw new AccessControlException(msg); |
| } |
| |
| // Check submission limits for queues |
| //TODO recalculate max applications because they can depend on capacity |
| if (getNumApplications() >= getMaxApplications() && !(this instanceof AutoCreatedLeafQueue)) { |
| String msg = |
| "Queue " + getQueuePath() + " already has " + getNumApplications() |
| + " applications," |
| + " cannot accept submission of application: " + applicationId; |
| LOG.info(msg); |
| throw new AccessControlException(msg); |
| } |
| |
| // Check submission limits for the user on this queue |
| User user = usersManager.getUserAndAddIfAbsent(userName); |
| //TODO recalculate max applications because they can depend on capacity |
| if (user.getTotalApplications() >= getMaxApplicationsPerUser() && !(this instanceof AutoCreatedLeafQueue)) { |
| String msg = "Queue " + getQueuePath() + " already has " + user |
| .getTotalApplications() + " applications from user " + userName |
| + " cannot accept submission of application: " + applicationId; |
| LOG.info(msg); |
| throw new AccessControlException(msg); |
| } |
| } finally { |
| writeLock.unlock(); |
| } |
| |
| try { |
| getParent().validateSubmitApplication(applicationId, userName, queue); |
| } catch (AccessControlException ace) { |
| LOG.info("Failed to submit application to parent-queue: " + |
| getParent().getQueuePath(), ace); |
| throw ace; |
| } |
| } |
| |
  /** @return the last computed AM resource limit (default partition). */
  public Resource getAMResourceLimit() {
    return queueUsage.getAMLimit();
  }
| |
  /** @return the last computed AM resource limit for the given partition. */
  public Resource getAMResourceLimitPerPartition(String nodePartition) {
    return queueUsage.getAMLimit(nodePartition);
  }
| |
  /** Recomputes and returns the AM resource limit for the default partition. */
  @VisibleForTesting
  public Resource calculateAndGetAMResourceLimit() {
    return calculateAndGetAMResourceLimitPerPartition(
        RMNodeLabelsManager.NO_LABEL);
  }
| |
  /** Per-user AM limit on the default partition with no specific user. */
  @VisibleForTesting
  public Resource getUserAMResourceLimit() {
    return getUserAMResourceLimitPerPartition(RMNodeLabelsManager.NO_LABEL,
        null);
  }
| |
| public Resource getUserAMResourceLimitPerPartition( |
| String nodePartition, String userName) { |
| float userWeight = 1.0f; |
| if (userName != null && getUser(userName) != null) { |
| userWeight = getUser(userName).getWeight(); |
| } |
| |
| readLock.lock(); |
| try { |
| /* |
| * The user am resource limit is based on the same approach as the user |
| * limit (as it should represent a subset of that). This means that it uses |
| * the absolute queue capacity (per partition) instead of the max and is |
| * modified by the userlimit and the userlimit factor as is the userlimit |
| */ |
| float effectiveUserLimit = Math.max(usersManager.getUserLimit() / 100.0f, |
| 1.0f / Math.max(getAbstractUsersManager().getNumActiveUsers(), 1)); |
| float preWeightedUserLimit = effectiveUserLimit; |
| effectiveUserLimit = Math.min(effectiveUserLimit * userWeight, 1.0f); |
| |
| Resource queuePartitionResource = getEffectiveCapacity(nodePartition); |
| |
| Resource userAMLimit = Resources.multiplyAndNormalizeUp( |
| resourceCalculator, queuePartitionResource, |
| queueCapacities.getMaxAMResourcePercentage(nodePartition) |
| * effectiveUserLimit * usersManager.getUserLimitFactor(), |
| minimumAllocation); |
| |
| if (getUserLimitFactor() == -1) { |
| userAMLimit = Resources.multiplyAndNormalizeUp( |
| resourceCalculator, queuePartitionResource, |
| queueCapacities.getMaxAMResourcePercentage(nodePartition), |
| minimumAllocation); |
| } |
| |
| userAMLimit = |
| Resources.min(resourceCalculator, lastClusterResource, |
| userAMLimit, |
| Resources.clone(getAMResourceLimitPerPartition(nodePartition))); |
| |
| Resource preWeighteduserAMLimit = |
| Resources.multiplyAndNormalizeUp( |
| resourceCalculator, queuePartitionResource, |
| queueCapacities.getMaxAMResourcePercentage(nodePartition) |
| * preWeightedUserLimit * usersManager.getUserLimitFactor(), |
| minimumAllocation); |
| |
| if (getUserLimitFactor() == -1) { |
| preWeighteduserAMLimit = Resources.multiplyAndNormalizeUp( |
| resourceCalculator, queuePartitionResource, |
| queueCapacities.getMaxAMResourcePercentage(nodePartition), |
| minimumAllocation); |
| } |
| |
| preWeighteduserAMLimit = |
| Resources.min(resourceCalculator, lastClusterResource, |
| preWeighteduserAMLimit, |
| Resources.clone(getAMResourceLimitPerPartition(nodePartition))); |
| queueUsage.setUserAMLimit(nodePartition, preWeighteduserAMLimit); |
| |
| LOG.debug("Effective user AM limit for \"{}\":{}. Effective weighted" |
| + " user AM limit: {}. User weight: {}", userName, |
| preWeighteduserAMLimit, userAMLimit, userWeight); |
| return userAMLimit; |
| } finally { |
| readLock.unlock(); |
| } |
| |
| } |
| |
  /**
   * Recomputes the queue-level AM resource limit for a partition, records it
   * into metrics and queueUsage, and returns it.
   *
   * @param nodePartition node label (partition) to compute the limit for
   * @return the freshly computed AM resource limit for the partition
   */
  public Resource calculateAndGetAMResourceLimitPerPartition(
      String nodePartition) {
    writeLock.lock();
    try {
      /*
       * For non-labeled partition, get the max value from resources currently
       * available to the queue and the absolute resources guaranteed for the
       * partition in the queue. For labeled partition, consider only the absolute
       * resources guaranteed. Multiply this value (based on labeled/
       * non-labeled), * with per-partition am-resource-percent to get the max am
       * resource limit for this queue and partition.
       */
      Resource queuePartitionResource = getEffectiveCapacity(nodePartition);

      Resource queueCurrentLimit = Resources.none();
      // For non-labeled partition, we need to consider the current queue
      // usage limit.
      if (nodePartition.equals(RMNodeLabelsManager.NO_LABEL)) {
        // queueResourceLimitsInfo is shared with headroom updates; guard it.
        synchronized (queueResourceLimitsInfo){
          queueCurrentLimit = queueResourceLimitsInfo.getQueueCurrentLimit();
        }
      }

      float amResourcePercent = queueCapacities.getMaxAMResourcePercentage(
          nodePartition);

      // Current usable resource for this queue and partition is the max of
      // queueCurrentLimit and queuePartitionResource.
      Resource queuePartitionUsableResource = Resources.max(resourceCalculator,
          lastClusterResource, queueCurrentLimit, queuePartitionResource);

      Resource amResouceLimit = Resources.multiplyAndNormalizeUp(
          resourceCalculator, queuePartitionUsableResource, amResourcePercent,
          minimumAllocation);

      metrics.setAMResouceLimit(nodePartition, amResouceLimit);
      queueUsage.setAMLimit(nodePartition, amResouceLimit);
      LOG.debug("Queue: {}, node label : {}, queue partition resource : {},"
          + " queue current limit : {}, queue partition usable resource : {},"
          + " amResourceLimit : {}", getQueuePath(), nodePartition,
          queuePartitionResource, queueCurrentLimit,
          queuePartitionUsableResource, amResouceLimit);
      return amResouceLimit;
    } finally {
      writeLock.unlock();
    }
  }
| |
  /**
   * Walks the pending-application ordering policy and activates every
   * application whose AM container fits under both the queue-level and the
   * per-user AM-resource limits for its AM node-partition. Activated
   * applications are moved from the pending ordering policy to the active
   * ordering policy, and AM-resource accounting (queue usage, per-user usage,
   * metrics) is updated. Runs entirely under the queue write lock.
   */
  protected void activateApplications() {
    writeLock.lock();
    try {
      // limit of allowed resource usage for application masters
      Map<String, Resource> userAmPartitionLimit =
          new HashMap<String, Resource>();

      // AM Resource Limit for accessible labels can be pre-calculated.
      // This will help in updating AMResourceLimit for all labels when queue
      // is initialized for the first time (when no applications are present).
      for (String nodePartition : getNodeLabelsForQueue()) {
        calculateAndGetAMResourceLimitPerPartition(nodePartition);
      }

      for (Iterator<FiCaSchedulerApp> fsApp =
          getPendingAppsOrderingPolicy()
              .getAssignmentIterator(IteratorSelector.EMPTY_ITERATOR_SELECTOR);
          fsApp.hasNext(); ) {
        FiCaSchedulerApp application = fsApp.next();
        ApplicationId applicationId = application.getApplicationId();

        // Get the am-node-partition associated with each application
        // and calculate max-am resource limit for this partition.
        String partitionName = application.getAppAMNodePartitionName();

        Resource amLimit = getAMResourceLimitPerPartition(partitionName);
        // Verify whether we already calculated am-limit for this label.
        if (amLimit == null) {
          amLimit = calculateAndGetAMResourceLimitPerPartition(partitionName);
        }
        // Check am resource limit: what the queue's AM usage would be for this
        // partition if this application's AM were started now.
        Resource amIfStarted = Resources.add(
            application.getAMResource(partitionName),
            queueUsage.getAMUsed(partitionName));

        if (LOG.isDebugEnabled()) {
          LOG.debug("application " + application.getId() + " AMResource "
              + application.getAMResource(partitionName)
              + " maxAMResourcePerQueuePercent " + maxAMResourcePerQueuePercent
              + " amLimit " + amLimit + " lastClusterResource "
              + lastClusterResource + " amIfStarted " + amIfStarted
              + " AM node-partition name " + partitionName);
        }

        // Queue-level AM limit check. If the limit would be exceeded, the app
        // stays pending — unless no app is active yet (or nothing is counted
        // against the partition), in which case enforcement is skipped so that
        // at least one application can always start.
        if (!resourceCalculator.fitsIn(amIfStarted, amLimit)) {
          if (getNumActiveApplications() < 1 || (Resources.lessThanOrEqual(
              resourceCalculator, lastClusterResource,
              queueUsage.getAMUsed(partitionName), Resources.none()))) {
            LOG.warn("maximum-am-resource-percent is insufficient to start a"
                + " single application in queue, it is likely set too low."
                + " skipping enforcement to allow at least one application"
                + " to start");
          } else{
            application.updateAMContainerDiagnostics(AMState.INACTIVATED,
                CSAMContainerLaunchDiagnosticsConstants.QUEUE_AM_RESOURCE_LIMIT_EXCEED);
            LOG.debug("Not activating application {} as  amIfStarted: {}"
                + " exceeds amLimit: {}", applicationId, amIfStarted, amLimit);
            continue;
          }
        }

        // Check user am resource limit
        User user = getUser(application.getUser());
        Resource userAMLimit = userAmPartitionLimit.get(partitionName);

        // Verify whether we already calculated user-am-limit for this label.
        if (userAMLimit == null) {
          userAMLimit = getUserAMResourceLimitPerPartition(partitionName,
              application.getUser());
          userAmPartitionLimit.put(partitionName, userAMLimit);
        }

        Resource userAmIfStarted = Resources.add(
            application.getAMResource(partitionName),
            user.getConsumedAMResources(partitionName));

        // User-level AM limit check, mirroring the queue-level check above
        // (including the "allow at least one application" escape hatch).
        if (!resourceCalculator.fitsIn(userAmIfStarted, userAMLimit)) {
          if (getNumActiveApplications() < 1 || (Resources.lessThanOrEqual(
              resourceCalculator, lastClusterResource,
              queueUsage.getAMUsed(partitionName), Resources.none()))) {
            LOG.warn("maximum-am-resource-percent is insufficient to start a"
                + " single application in queue for user, it is likely set too"
                + " low. skipping enforcement to allow at least one application"
                + " to start");
          } else{
            application.updateAMContainerDiagnostics(AMState.INACTIVATED,
                CSAMContainerLaunchDiagnosticsConstants.USER_AM_RESOURCE_LIMIT_EXCEED);
            LOG.debug("Not activating application {} for user: {} as"
                + " userAmIfStarted: {} exceeds userAmLimit: {}",
                applicationId, user, userAmIfStarted, userAMLimit);
            continue;
          }
        }
        // Both limits satisfied: activate the application and account for its
        // AM resource at queue, user, and metrics level.
        user.activateApplication();
        orderingPolicy.addSchedulableEntity(application);
        application.updateAMContainerDiagnostics(AMState.ACTIVATED, null);

        queueUsage.incAMUsed(partitionName,
            application.getAMResource(partitionName));
        user.getResourceUsage().incAMUsed(partitionName,
            application.getAMResource(partitionName));
        user.getResourceUsage().setAMLimit(partitionName, userAMLimit);
        metrics.incAMUsed(partitionName, application.getUser(),
            application.getAMResource(partitionName));
        metrics.setAMResouceLimitForUser(partitionName,
            application.getUser(), userAMLimit);
        // Remove from the pending iterator only after successful activation.
        fsApp.remove();
        LOG.info("Application " + applicationId + " from user: " + application
            .getUser() + " activated in queue: " + getQueuePath());
      }
    } finally {
      writeLock.unlock();
    }
  }
| |
  /**
   * Registers a new application attempt with this queue. Runnable attempts
   * are submitted to the user and placed in the pending ordering policy
   * (followed by an activation pass); non-runnable attempts (parallel-app
   * limit reached) are parked in {@code nonRunnableApps}. Runs under the
   * queue write lock.
   *
   * @param application the attempt being added
   * @param user the queue-level user record for the submitting user
   */
  private void addApplicationAttempt(FiCaSchedulerApp application,
      User user) {
    writeLock.lock();
    try {
      applicationAttemptMap.put(application.getApplicationAttemptId(),
          application);

      if (application.isRunnable()) {
        runnableApps.add(application);
        LOG.debug("Adding runnable application: {}",
            application.getApplicationAttemptId());
      } else {
        nonRunnableApps.add(application);
        LOG.info("Application attempt {} is not runnable,"
            + " parallel limit reached", application.getApplicationAttemptId());
        // Non-runnable attempts are not submitted or activated yet.
        return;
      }

      // Accept
      user.submitApplication();
      getPendingAppsOrderingPolicy().addSchedulableEntity(application);

      // Activate applications only if the cluster currently has any resource
      // (lastClusterResource > none); otherwise record why activation skipped.
      if (Resources.greaterThan(resourceCalculator, lastClusterResource,
          lastClusterResource, Resources.none())) {
        activateApplications();
      } else {
        application.updateAMContainerDiagnostics(AMState.INACTIVATED,
            CSAMContainerLaunchDiagnosticsConstants.CLUSTER_RESOURCE_EMPTY);
        LOG.info("Skipping activateApplications for "
            + application.getApplicationAttemptId()
            + " since cluster resource is " + Resources.none());
      }

      LOG.info(
          "Application added -" + " appId: " + application.getApplicationId()
              + " user: " + application.getUser() + "," + " leaf-queue: "
              + getQueuePath() + " #user-pending-applications: " + user
              .getPendingApplications() + " #user-active-applications: " + user
              .getActiveApplications() + " #queue-pending-applications: "
              + getNumPendingApplications() + " #queue-active-applications: "
              + getNumActiveApplications()
              + " #queue-nonrunnable-applications: "
              + getNumNonRunnableApps());
    } finally {
      writeLock.unlock();
    }
  }
| |
  /**
   * Handles completion of an application in this queue: deactivates it in the
   * UsersManager, updates queue-level finished-app bookkeeping, and then
   * notifies the parent queue. Call order is significant.
   *
   * @param application id of the finished application
   * @param user name of the submitting user
   */
  @Override
  public void finishApplication(ApplicationId application, String user) {
    // Inform the activeUsersManager
    usersManager.deactivateApplication(user, application);

    appFinished();

    // Inform the parent queue
    getParent().finishApplication(application, user);
  }
| |
  /**
   * Handles completion of an application attempt: removes it from this queue
   * first, then notifies the parent queue.
   *
   * @param application the finished attempt
   * @param queue name of the queue the attempt belonged to
   */
  @Override
  public void finishApplicationAttempt(FiCaSchedulerApp application, String queue) {
    // Careful! Locking order is important!
    removeApplicationAttempt(application, application.getUser());
    getParent().finishApplicationAttempt(application, queue);
  }
| |
  /**
   * Removes an application attempt from this queue: drops it from the
   * runnable/non-runnable sets and from whichever ordering policy held it,
   * releases its AM-resource accounting if it was active, updates the user
   * record (removing the user entirely when it has no applications left),
   * and finally re-runs activation to let another pending app in. Runs under
   * the queue write lock.
   *
   * @param application the attempt to remove
   * @param userName name of the submitting user
   */
  private void removeApplicationAttempt(
      FiCaSchedulerApp application, String userName) {

    writeLock.lock();
    try {
      // TODO, should use getUser, use this method just to avoid UT failure
      // which is caused by wrong invoking order, will fix UT separately
      User user = usersManager.getUserAndAddIfAbsent(userName);

      boolean runnable = runnableApps.remove(application);
      if (!runnable) {
        // removeNonRunnableApp acquires the write lock again, which is fine
        if (!removeNonRunnableApp(application)) {
          LOG.error("Given app to remove " + application +
              " does not exist in queue " + getQueuePath());
        }
      }

      String partitionName = application.getAppAMNodePartitionName();
      // An attempt lives in exactly one of the two ordering policies:
      // active (orderingPolicy) or pending (pendingOrderingPolicy).
      boolean wasActive = orderingPolicy.removeSchedulableEntity(application);
      if (!wasActive) {
        pendingOrderingPolicy.removeSchedulableEntity(application);
      } else{
        // Only active attempts have AM resource charged; release it here.
        queueUsage.decAMUsed(partitionName,
            application.getAMResource(partitionName));
        user.getResourceUsage().decAMUsed(partitionName,
            application.getAMResource(partitionName));
        metrics.decAMUsed(partitionName, application.getUser(),
            application.getAMResource(partitionName));
      }
      applicationAttemptMap.remove(application.getApplicationAttemptId());

      user.finishApplication(wasActive);
      if (user.getTotalApplications() == 0) {
        usersManager.removeUser(application.getUser());
      }

      // Check if we can activate more applications
      activateApplications();

      LOG.info(
          "Application removed -" + " appId: " + application.getApplicationId()
              + " user: " + application.getUser() + " queue: " + getQueuePath()
              + " #user-pending-applications: " + user.getPendingApplications()
              + " #user-active-applications: " + user.getActiveApplications()
              + " #queue-pending-applications: " + getNumPendingApplications()
              + " #queue-active-applications: " + getNumActiveApplications());
    } finally {
      writeLock.unlock();
    }
  }
| |
  /**
   * Looks up the application attempt registered with this queue, or
   * {@code null} if the attempt id is unknown.
   *
   * @param applicationAttemptId id of the attempt to look up
   * @return the attempt, or {@code null} if not present
   */
  private FiCaSchedulerApp getApplication(
      ApplicationAttemptId applicationAttemptId) {
    return applicationAttemptMap.get(applicationAttemptId);
  }
| |
| private void setPreemptionAllowed(ResourceLimits limits, String nodePartition) { |
| // Set preemption-allowed: |
| // For leaf queue, only under-utilized queue is allowed to preempt resources from other queues |
| if (!queueResourceQuotas.getEffectiveMinResource(nodePartition) |
| .equals(Resources.none())) { |
| limits.setIsAllowPreemption(Resources.lessThan(resourceCalculator, |
| csContext.getClusterResource(), queueUsage.getUsed(nodePartition), |
| queueResourceQuotas.getEffectiveMinResource(nodePartition))); |
| return; |
| } |
| |
| float usedCapacity = queueCapacities.getAbsoluteUsedCapacity(nodePartition); |
| float guaranteedCapacity = queueCapacities.getAbsoluteCapacity(nodePartition); |
| limits.setIsAllowPreemption(usedCapacity < guaranteedCapacity); |
| } |
| |
  /**
   * Attempts allocation for a container already RESERVED on the single
   * candidate node, if any. Returns {@code null} when the candidate set is
   * not a single node, the node has no reserved container, or the owning
   * application is no longer registered with this queue.
   *
   * @param clusterResource total cluster resource
   * @param candidates candidate node set for this scheduling cycle
   * @param currentResourceLimits limits to respect during assignment
   * @param schedulingMode partition-exclusivity mode
   * @return the assignment from the reserved container, or {@code null}
   */
  private CSAssignment allocateFromReservedContainer(Resource clusterResource,
      CandidateNodeSet<FiCaSchedulerNode> candidates,
      ResourceLimits currentResourceLimits, SchedulingMode schedulingMode) {

    // Irrespective of Single / Multi Node Placement, the allocate from
    // Reserved Container has to happen only for the single node which
    // CapacityScheduler#allocateFromReservedContainer invokes with.
    // Else In Multi Node Placement, there won't be any Allocation or
    // Reserve of new containers when there is a RESERVED container on
    // a node which is full.
    FiCaSchedulerNode node = CandidateNodeSetUtils.getSingleNode(candidates);
    if (node != null) {
      RMContainer reservedContainer = node.getReservedContainer();
      if (reservedContainer != null) {
        FiCaSchedulerApp application = getApplication(
            reservedContainer.getApplicationAttemptId());

        if (null != application) {
          ActivitiesLogger.APP.startAppAllocationRecording(activitiesManager,
              node, SystemClock.getInstance().getTime(), application);
          CSAssignment assignment = application.assignContainers(
              clusterResource, candidates, currentResourceLimits,
              schedulingMode, reservedContainer);
          return assignment;
        }
      }
    }

    return null;
  }
| |
| private ConcurrentMap<String, CachedUserLimit> getUserLimitCache( |
| String partition, |
| SchedulingMode schedulingMode) { |
| synchronized (userLimitsCache) { |
| long latestVersion = usersManager.getLatestVersionOfUsersState(); |
| |
| if (latestVersion != this.currentUserLimitCacheVersion) { |
| // User limits cache needs invalidating |
| this.currentUserLimitCacheVersion = latestVersion; |
| userLimitsCache.clear(); |
| |
| Map<SchedulingMode, ConcurrentMap<String, CachedUserLimit>> |
| uLCByPartition = new HashMap<>(); |
| userLimitsCache.put(partition, uLCByPartition); |
| |
| ConcurrentMap<String, CachedUserLimit> uLCBySchedulingMode = |
| new ConcurrentHashMap<>(); |
| uLCByPartition.put(schedulingMode, uLCBySchedulingMode); |
| |
| return uLCBySchedulingMode; |
| } |
| |
| // User limits cache does not need invalidating |
| Map<SchedulingMode, ConcurrentMap<String, CachedUserLimit>> |
| uLCByPartition = userLimitsCache.get(partition); |
| if (uLCByPartition == null) { |
| uLCByPartition = new HashMap<>(); |
| userLimitsCache.put(partition, uLCByPartition); |
| } |
| |
| ConcurrentMap<String, CachedUserLimit> uLCBySchedulingMode = |
| uLCByPartition.get(schedulingMode); |
| if (uLCBySchedulingMode == null) { |
| uLCBySchedulingMode = new ConcurrentHashMap<>(); |
| uLCByPartition.put(schedulingMode, uLCBySchedulingMode); |
| } |
| |
| return uLCBySchedulingMode; |
| } |
| } |
| |
  /**
   * Main per-queue scheduling entry point. Tries, in order: allocation from
   * an existing reserved container; then iterates active applications in
   * ordering-policy order, enforcing queue max-capacity and per-user limits
   * (with a per-cycle user-limit cache), and returns the first non-empty
   * assignment. Returns {@link CSAssignment#NULL_ASSIGNMENT} when nothing
   * can be allocated or the queue/partition is not eligible.
   *
   * @param clusterResource total cluster resource
   * @param candidates candidate node set for this scheduling cycle
   * @param currentResourceLimits limits imposed by the parent queue
   * @param schedulingMode partition-exclusivity mode
   * @return the resulting assignment (possibly NULL_ASSIGNMENT)
   */
  @Override
  public CSAssignment assignContainers(Resource clusterResource,
      CandidateNodeSet<FiCaSchedulerNode> candidates,
      ResourceLimits currentResourceLimits, SchedulingMode schedulingMode) {
    updateCurrentResourceLimits(currentResourceLimits, clusterResource);
    FiCaSchedulerNode node = CandidateNodeSetUtils.getSingleNode(candidates);

    if (LOG.isDebugEnabled()) {
      LOG.debug("assignContainers: partition=" + candidates.getPartition()
          + " #applications=" + orderingPolicy.getNumSchedulableEntities());
    }

    setPreemptionAllowed(currentResourceLimits, candidates.getPartition());

    // Check for reserved resources, try to allocate reserved container first.
    CSAssignment assignment = allocateFromReservedContainer(clusterResource,
        candidates, currentResourceLimits, schedulingMode);
    if (null != assignment) {
      return assignment;
    }

    // if our queue cannot access this node, just return
    if (schedulingMode == SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY
        && !accessibleToPartition(candidates.getPartition())) {
      ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node,
          getParent().getQueuePath(), getQueuePath(), ActivityState.REJECTED,
          ActivityDiagnosticConstant.QUEUE_NOT_ABLE_TO_ACCESS_PARTITION);
      return CSAssignment.NULL_ASSIGNMENT;
    }

    // Check if this queue need more resource, simply skip allocation if this
    // queue doesn't need more resources.
    if (!hasPendingResourceRequest(candidates.getPartition(), clusterResource,
        schedulingMode)) {
      if (LOG.isDebugEnabled()) {
        LOG.debug("Skip this queue=" + getQueuePath()
            + ", because it doesn't need more resource, schedulingMode="
            + schedulingMode.name() + " node-partition=" + candidates
            .getPartition());
      }
      ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node,
          getParent().getQueuePath(), getQueuePath(), ActivityState.SKIPPED,
          ActivityDiagnosticConstant.QUEUE_DO_NOT_NEED_MORE_RESOURCE);
      return CSAssignment.NULL_ASSIGNMENT;
    }

    // Per-cycle cache of user limits, shared across applications of the same
    // user to avoid recomputation.
    ConcurrentMap<String, CachedUserLimit> userLimits =
        this.getUserLimitCache(candidates.getPartition(), schedulingMode);
    boolean needAssignToQueueCheck = true;
    IteratorSelector sel = new IteratorSelector();
    sel.setPartition(candidates.getPartition());
    for (Iterator<FiCaSchedulerApp> assignmentIterator =
         orderingPolicy.getAssignmentIterator(sel);
         assignmentIterator.hasNext(); ) {
      FiCaSchedulerApp application = assignmentIterator.next();

      ActivitiesLogger.APP.startAppAllocationRecording(activitiesManager,
          node, SystemClock.getInstance().getTime(), application);

      // Check queue max-capacity limit
      Resource appReserved = application.getCurrentReservation();
      if (needAssignToQueueCheck) {
        if (!super.canAssignToThisQueue(clusterResource,
            candidates.getPartition(), currentResourceLimits, appReserved,
            schedulingMode)) {
          ActivitiesLogger.APP.recordRejectedAppActivityFromLeafQueue(
              activitiesManager, node, application, application.getPriority(),
              ActivityDiagnosticConstant.QUEUE_HIT_MAX_CAPACITY_LIMIT);
          ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node,
              getParent().getQueuePath(), getQueuePath(),
              ActivityState.REJECTED,
              ActivityDiagnosticConstant.QUEUE_HIT_MAX_CAPACITY_LIMIT);
          return CSAssignment.NULL_ASSIGNMENT;
        }
        // If there was no reservation and canAssignToThisQueue returned
        // true, there is no reason to check further.
        if (!this.reservationsContinueLooking
            || appReserved.equals(Resources.none())) {
          needAssignToQueueCheck = false;
        }
      }

      // Look up (or compute and cache) this user's limit for the cycle.
      CachedUserLimit cul = userLimits.get(application.getUser());
      Resource cachedUserLimit = null;
      if (cul != null) {
        cachedUserLimit = cul.userLimit;
      }
      Resource userLimit = computeUserLimitAndSetHeadroom(application,
          clusterResource, candidates.getPartition(), schedulingMode,
          cachedUserLimit);
      if (cul == null) {
        cul = new CachedUserLimit(userLimit);
        CachedUserLimit retVal =
            userLimits.putIfAbsent(application.getUser(), cul);
        if (retVal != null) {
          // another thread updated the user limit cache before us
          cul = retVal;
          userLimit = cul.userLimit;
        }
      }
      // Check user limit. A cached negative result (canAssign == false) is
      // reused only when this app's reservation fits under the reservation
      // recorded with that result.
      boolean userAssignable = true;
      if (!cul.canAssign && Resources.fitsIn(appReserved, cul.reservation)) {
        userAssignable = false;
      } else {
        userAssignable = canAssignToUser(clusterResource, application.getUser(),
            userLimit, application, candidates.getPartition(),
            currentResourceLimits);
        if (!userAssignable && Resources.fitsIn(cul.reservation, appReserved)) {
          cul.canAssign = false;
          cul.reservation = appReserved;
        }
      }
      if (!userAssignable) {
        application.updateAMContainerDiagnostics(AMState.ACTIVATED,
            "User capacity has reached its maximum limit.");
        ActivitiesLogger.APP.recordRejectedAppActivityFromLeafQueue(
            activitiesManager, node, application, application.getPriority(),
            ActivityDiagnosticConstant.QUEUE_HIT_USER_MAX_CAPACITY_LIMIT);
        continue;
      }

      // Try to schedule
      assignment = application.assignContainers(clusterResource,
          candidates, currentResourceLimits, schedulingMode, null);

      if (LOG.isDebugEnabled()) {
        LOG.debug("post-assignContainers for application " + application
            .getApplicationId());
        application.showRequests();
      }

      // Did we schedule or reserve a container?
      Resource assigned = assignment.getResource();

      if (Resources.greaterThan(resourceCalculator, clusterResource, assigned,
          Resources.none())) {
        // Something was allocated or reserved — return it immediately.
        ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node,
            getParent().getQueuePath(), getQueuePath(),
            ActivityState.ACCEPTED, ActivityDiagnosticConstant.EMPTY);
        return assignment;
      } else if (assignment.getSkippedType()
          == CSAssignment.SkippedType.OTHER) {
        // Application skipped itself; move on to the next application.
        ActivitiesLogger.APP.finishSkippedAppAllocationRecording(
            activitiesManager, application.getApplicationId(),
            ActivityState.SKIPPED, ActivityDiagnosticConstant.EMPTY);
        application.updateNodeInfoForAMDiagnostics(node);
      } else if (assignment.getSkippedType()
          == CSAssignment.SkippedType.QUEUE_LIMIT) {
        // Queue headroom exhausted — stop iterating and bubble up the result.
        ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node,
            getParent().getQueuePath(), getQueuePath(), ActivityState.REJECTED,
            () -> ActivityDiagnosticConstant.QUEUE_DO_NOT_HAVE_ENOUGH_HEADROOM
                + " from " + application.getApplicationId());
        return assignment;
      } else{
        // If we don't allocate anything, and it is not skipped by application,
        // we will return to respect FIFO of applications
        ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node,
            getParent().getQueuePath(), getQueuePath(), ActivityState.SKIPPED,
            ActivityDiagnosticConstant.QUEUE_SKIPPED_TO_RESPECT_FIFO);
        ActivitiesLogger.APP.finishSkippedAppAllocationRecording(
            activitiesManager, application.getApplicationId(),
            ActivityState.SKIPPED, ActivityDiagnosticConstant.EMPTY);
        return CSAssignment.NULL_ASSIGNMENT;
      }
    }
    ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node,
        getParent().getQueuePath(), getQueuePath(), ActivityState.SKIPPED,
        ActivityDiagnosticConstant.EMPTY);

    return CSAssignment.NULL_ASSIGNMENT;
  }
| |
  /**
   * Validates a proposed allocation/reservation against the per-user limit
   * (net of resources the request will release) before delegating the rest
   * of the validation to the parent class. Limit checks are skipped when the
   * proposal allocates from an existing reserved container.
   *
   * @param cluster total cluster resource
   * @param request the commit request being validated
   * @return true if the request passes this queue's checks and the parent's
   */
  @Override
  public boolean accept(Resource cluster,
      ResourceCommitRequest<FiCaSchedulerApp, FiCaSchedulerNode> request) {
    ContainerAllocationProposal<FiCaSchedulerApp, FiCaSchedulerNode> allocation =
        request.getFirstAllocatedOrReservedContainer();
    SchedulerContainer<FiCaSchedulerApp, FiCaSchedulerNode> schedulerContainer =
        allocation.getAllocatedOrReservedContainer();

    // Do not check limits when allocation from a reserved container
    if (allocation.getAllocateFromReservedContainer() == null) {
      readLock.lock();
      try {
        FiCaSchedulerApp app =
            schedulerContainer.getSchedulerApplicationAttempt();
        String username = app.getUser();
        String p = schedulerContainer.getNodePartition();

        // check user-limit
        Resource userLimit = computeUserLimitAndSetHeadroom(app, cluster, p,
            allocation.getSchedulingMode(), null);

        // Deduct resources that we can release
        User user = getUser(username);
        if (user == null) {
          // User may have been removed concurrently; reject the proposal.
          LOG.debug("User {} has been removed!", username);
          return false;
        }
        Resource usedResource = Resources.clone(user.getUsed(p));
        Resources.subtractFrom(usedResource,
            request.getTotalReleasedResource());

        if (Resources.greaterThan(resourceCalculator, cluster, usedResource,
            userLimit)) {
          LOG.debug("Used resource={} exceeded user-limit={}",
              usedResource, userLimit);
          return false;
        }
      } finally {
        readLock.unlock();
      }
    }

    return super.accept(cluster, request);
  }
| |
  /**
   * Completes a container that a commit request asked to release. Same-queue
   * RESERVED containers are unreserved (reservation exchange); containers
   * belonging to a different leaf queue are killed via that queue (lazy
   * preemption).
   *
   * @param clusterResource total cluster resource
   * @param schedulerContainer wrapper for the container to release
   */
  private void internalReleaseContainer(Resource clusterResource,
      SchedulerContainer<FiCaSchedulerApp, FiCaSchedulerNode> schedulerContainer) {
    RMContainer rmContainer = schedulerContainer.getRmContainer();

    LeafQueue targetLeafQueue =
        schedulerContainer.getSchedulerApplicationAttempt().getCSLeafQueue();

    if (targetLeafQueue == this) {
      // When trying to preempt containers from the same queue
      if (rmContainer.getState() == RMContainerState.RESERVED) {
        // For other reserved containers
        // This is a reservation exchange, complete previous reserved container
        completedContainer(clusterResource,
            schedulerContainer.getSchedulerApplicationAttempt(),
            schedulerContainer.getSchedulerNode(), rmContainer, SchedulerUtils
                .createAbnormalContainerStatus(rmContainer.getContainerId(),
                    SchedulerUtils.UNRESERVED_CONTAINER),
            RMContainerEventType.RELEASED, null, false);
      }
    } else{
      // When trying to preempt containers from different queue -- this
      // is for lazy preemption feature (kill preemption candidate in scheduling
      // cycle).
      targetLeafQueue.completedContainer(clusterResource,
          schedulerContainer.getSchedulerApplicationAttempt(),
          schedulerContainer.getSchedulerNode(),
          schedulerContainer.getRmContainer(), SchedulerUtils
              .createPreemptedContainerStatus(rmContainer.getContainerId(),
                  SchedulerUtils.PREEMPTED_CONTAINER),
          RMContainerEventType.KILL, null, false);
    }
  }
| |
| private void releaseContainers(Resource clusterResource, |
| ResourceCommitRequest<FiCaSchedulerApp, FiCaSchedulerNode> request) { |
| for (SchedulerContainer<FiCaSchedulerApp, FiCaSchedulerNode> c : request |
| .getContainersToRelease()) { |
| internalReleaseContainer(clusterResource, c); |
| } |
| |
| // Handle container reservation looking, or lazy preemption case: |
| if (null != request.getContainersToAllocate() && !request |
| .getContainersToAllocate().isEmpty()) { |
| for (ContainerAllocationProposal<FiCaSchedulerApp, FiCaSchedulerNode> context : request |
| .getContainersToAllocate()) { |
| if (null != context.getToRelease()) { |
| for (SchedulerContainer<FiCaSchedulerApp, FiCaSchedulerNode> c : context |
| .getToRelease()) { |
| internalReleaseContainer(clusterResource, c); |
| } |
| } |
| } |
| } |
| } |
| |
  /**
   * Applies an accepted commit request to this queue: releases requested
   * containers, then (for a genuinely new allocation/reservation, i.e. not
   * one served from an existing reserved container) books the allocated
   * resource against the queue and notifies the ordering policy, finally
   * propagating to the parent queue outside the write lock.
   *
   * @param cluster total cluster resource
   * @param request the commit request to apply
   */
  public void apply(Resource cluster,
      ResourceCommitRequest<FiCaSchedulerApp, FiCaSchedulerNode> request) {
    // Do we need to call parent queue's apply?
    boolean applyToParentQueue = false;

    releaseContainers(cluster, request);

    writeLock.lock();
    try {
      if (request.anythingAllocatedOrReserved()) {
        ContainerAllocationProposal<FiCaSchedulerApp, FiCaSchedulerNode>
            allocation = request.getFirstAllocatedOrReservedContainer();
        SchedulerContainer<FiCaSchedulerApp, FiCaSchedulerNode>
            schedulerContainer = allocation.getAllocatedOrReservedContainer();

        // Do not modify queue when allocation from reserved container
        if (allocation.getAllocateFromReservedContainer() == null) {
          // Only invoke apply() of ParentQueue when new allocation /
          // reservation happen.
          applyToParentQueue = true;
          // Book-keeping
          // Note: Update headroom to account for current allocation too...
          allocateResource(cluster,
              schedulerContainer.getSchedulerApplicationAttempt(),
              allocation.getAllocatedOrReservedResource(),
              schedulerContainer.getNodePartition(),
              schedulerContainer.getRmContainer());
          orderingPolicy.containerAllocated(
              schedulerContainer.getSchedulerApplicationAttempt(),
              schedulerContainer.getRmContainer());
        }

        // Update reserved resource
        if (Resources.greaterThan(resourceCalculator, cluster,
            request.getTotalReservedResource(), Resources.none())) {
          incReservedResource(schedulerContainer.getNodePartition(),
              request.getTotalReservedResource());
        }
      }
    } finally {
      writeLock.unlock();
    }

    // Propagate after dropping our lock to respect lock ordering.
    if (parent != null && applyToParentQueue) {
      parent.apply(cluster, request);
    }
  }
| |
| |
  /**
   * Convenience overload: headroom for the default (no-label) partition.
   *
   * @param user the queue-level user record
   * @param queueCurrentLimit current queue resource limit
   * @param clusterResource total cluster resource
   * @param application application to compute headroom for
   * @return the computed headroom
   */
  protected Resource getHeadroom(User user, Resource queueCurrentLimit,
      Resource clusterResource, FiCaSchedulerApp application) {
    return getHeadroom(user, queueCurrentLimit, clusterResource, application,
        RMNodeLabelsManager.NO_LABEL);
  }
| |
  /**
   * Convenience overload: computes the user limit for the application's user
   * (RESPECT_PARTITION_EXCLUSIVITY mode) before delegating to the full
   * headroom calculation.
   *
   * @param user the queue-level user record
   * @param queueCurrentLimit current queue resource limit
   * @param clusterResource total cluster resource
   * @param application application to compute headroom for
   * @param partition node partition
   * @return the computed headroom
   */
  protected Resource getHeadroom(User user, Resource queueCurrentLimit,
      Resource clusterResource, FiCaSchedulerApp application,
      String partition) {
    return getHeadroom(user, queueCurrentLimit, clusterResource,
        getResourceLimitForActiveUsers(application.getUser(), clusterResource,
            partition, SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY),
        partition);
  }
| |
  /**
   * Core headroom computation for a user on a partition; see the derivation
   * in the comment below. The result is rounded down to a multiple of the
   * minimum allocation and capped by the partition's free resource.
   *
   * @param user the queue-level user record
   * @param currentPartitionResourceLimit queue limit for the default
   *          partition (replaced by the queue max resource for labeled ones)
   * @param clusterResource total cluster resource
   * @param userLimitResource the precomputed user limit
   * @param partition node partition
   * @return the computed headroom
   */
  private Resource getHeadroom(User user,
      Resource currentPartitionResourceLimit, Resource clusterResource,
      Resource userLimitResource, String partition) {
    /**
     * Headroom is:
     *    min(
     *        min(userLimit, queueMaxCap) - userConsumed,
     *        queueMaxCap - queueUsedResources
     *       )
     *
     * ( which can be expressed as,
     *  min (userLimit - userConsumed, queueMaxCap - userConsumed,
     *    queueMaxCap - queueUsedResources)
     * )
     *
     * given that queueUsedResources >= userConsumed, this simplifies to
     *
     * >> min (userLimit - userConsumed, queueMaxCap - queueUsedResources) <<
     *
     * sum of queue max capacities of multiple queue's will be greater than the
     * actual capacity of a given partition, hence we need to ensure that the
     * headroom is not greater than the available resource for a given partition
     *
     * headroom = min (unused resource limit of a label, calculated headroom )
     */
    // For labeled partitions the passed-in limit does not apply; use the
    // queue's max resource for that partition instead.
    currentPartitionResourceLimit =
        partition.equals(RMNodeLabelsManager.NO_LABEL)
            ? currentPartitionResourceLimit
            : getQueueMaxResource(partition);

    Resource headroom = Resources.componentwiseMin(
        Resources.subtractNonNegative(userLimitResource,
            user.getUsed(partition)),
        Resources.subtractNonNegative(currentPartitionResourceLimit,
            queueUsage.getUsed(partition)));
    // Normalize it before return
    headroom =
        Resources.roundDown(resourceCalculator, headroom, minimumAllocation);

    //headroom = min (unused resource limit of a label, calculated headroom )
    Resource clusterPartitionResource =
        labelManager.getResourceByLabel(partition, clusterResource);
    Resource clusterFreePartitionResource =
        Resources.subtract(clusterPartitionResource,
            csContext.getClusterResourceUsage().getUsed(partition));
    headroom = Resources.min(resourceCalculator, clusterPartitionResource,
        clusterFreePartitionResource, headroom);
    return headroom;
  }
| |
  /**
   * Publishes the queue's current limit (from the cached headroom limits)
   * and the cluster resource into the shared {@code queueResourceLimitsInfo}
   * object, under its own monitor since it is read by headroom providers.
   *
   * @param clusterResource total cluster resource to record
   */
  private void setQueueResourceLimitsInfo(
      Resource clusterResource) {
    synchronized (queueResourceLimitsInfo) {
      queueResourceLimitsInfo.setQueueCurrentLimit(cachedResourceLimitsForHeadroom
          .getLimit());
      queueResourceLimitsInfo.setClusterResource(clusterResource);
    }
  }
| |
  /**
   * Computes (or reuses) the user limit for the application's user on the
   * given partition, refreshes the shared queue-limit info, calculates the
   * user's headroom, and attaches a {@link CapacityHeadroomProvider} to the
   * application so its headroom stays current. Returns {@code Resources.none()}
   * if the user has already been removed from the queue.
   *
   * @param application application whose headroom is being set
   * @param clusterResource total cluster resource
   * @param nodePartition node partition
   * @param schedulingMode partition-exclusivity mode
   * @param userLimit precomputed user limit to reuse, or {@code null} to
   *          compute it here
   * @return the user limit that was used
   */
  // It doesn't necessarily to hold application's lock here.
  @Lock({LeafQueue.class})
  Resource computeUserLimitAndSetHeadroom(FiCaSchedulerApp application,
      Resource clusterResource, String nodePartition,
      SchedulingMode schedulingMode, Resource userLimit) {
    String user = application.getUser();
    User queueUser = getUser(user);
    if (queueUser == null) {
      LOG.debug("User {} has been removed!", user);
      return Resources.none();
    }

    // Compute user limit respect requested labels,
    // TODO, need consider headroom respect labels also
    if (userLimit == null) {
      userLimit = getResourceLimitForActiveUsers(application.getUser(),
          clusterResource, nodePartition, schedulingMode);
    }
    setQueueResourceLimitsInfo(clusterResource);

    // No user metrics registered means nothing to report headroom against.
    Resource headroom =
        metrics.getUserMetrics(user) == null ? Resources.none() :
        getHeadroom(queueUser, cachedResourceLimitsForHeadroom.getLimit(),
            clusterResource, userLimit, nodePartition);

    if (LOG.isDebugEnabled()) {
      LOG.debug("Headroom calculation for user " + user + ": " + " userLimit="
          + userLimit + " queueMaxAvailRes="
          + cachedResourceLimitsForHeadroom.getLimit() + " consumed="
          + queueUser.getUsed() + " partition="
          + nodePartition);
    }

    CapacityHeadroomProvider headroomProvider = new CapacityHeadroomProvider(
        queueUser, this, application, queueResourceLimitsInfo);

    application.setHeadroomProvider(headroomProvider);

    metrics.setAvailableResourcesToUser(nodePartition, user, headroom);

    return userLimit;
  }
| |
  /** @return the configured node-locality scheduling delay for this queue. */
  @Lock(NoLock.class)
  public int getNodeLocalityDelay() {
    return nodeLocalityDelay;
  }
| |
  /** @return the configured additional rack-locality delay for this queue. */
  @Lock(NoLock.class)
  public int getRackLocalityAdditionalDelay() {
    return rackLocalityAdditionalDelay;
  }
| |
  /** @return whether the rack-locality counter fully resets after allocation. */
  @Lock(NoLock.class)
  public boolean getRackLocalityFullReset() {
    return rackLocalityFullReset;
  }
| |
| /** |
| * |
| * @param userName |
| * Name of user who has submitted one/more app to given queue. |
| * @param clusterResource |
| * total cluster resource |
| * @param nodePartition |
| * partition name |
| * @param schedulingMode |
| * scheduling mode |
| * RESPECT_PARTITION_EXCLUSIVITY/IGNORE_PARTITION_EXCLUSIVITY |
| * @return Computed User Limit |
| */ |
| public Resource getResourceLimitForActiveUsers(String userName, |
| Resource clusterResource, String nodePartition, |
| SchedulingMode schedulingMode) { |
| return usersManager.getComputedResourceLimitForActiveUsers(userName, |
| clusterResource, nodePartition, schedulingMode); |
| } |
| |
| /** |
| * |
| * @param userName |
| * Name of user who has submitted one/more app to given queue. |
| * @param clusterResource |
| * total cluster resource |
| * @param nodePartition |
| * partition name |
| * @param schedulingMode |
| * scheduling mode |
| * RESPECT_PARTITION_EXCLUSIVITY/IGNORE_PARTITION_EXCLUSIVITY |
| * @return Computed User Limit |
| */ |
| public Resource getResourceLimitForAllUsers(String userName, |
| Resource clusterResource, String nodePartition, |
| SchedulingMode schedulingMode) { |
| return usersManager.getComputedResourceLimitForAllUsers(userName, |
| clusterResource, nodePartition, schedulingMode); |
| } |
| |
  /**
   * Checks whether a user is still under its resource limit for a partition.
   * When the user is over the limit but could get back under it by
   * unreserving some of the application's reserved containers (and
   * reservations-continue-looking is enabled), the method records the amount
   * that must be unreserved in {@code currentResourceLimits} and still
   * returns {@code true}.
   *
   * @param clusterResource total cluster resource
   * @param userName user being checked
   * @param limit the user's resource limit
   * @param application application requesting the assignment
   * @param nodePartition node partition
   * @param currentResourceLimits out-parameter for amount-needed-unreserve
   * @return true if the user can be assigned more resources
   */
  @Private
  protected boolean canAssignToUser(Resource clusterResource,
      String userName, Resource limit, FiCaSchedulerApp application,
      String nodePartition, ResourceLimits currentResourceLimits) {

    readLock.lock();
    try {
      User user = getUser(userName);
      if (user == null) {
        LOG.debug("User {} has been removed!", userName);
        return false;
      }

      currentResourceLimits.setAmountNeededUnreserve(Resources.none());

      // Note: We aren't considering the current request since there is a fixed
      // overhead of the AM, but it's a > check, not a >= check, so...
      if (Resources.greaterThan(resourceCalculator, clusterResource,
          user.getUsed(nodePartition), limit)) {
        // if enabled, check to see if could we potentially use this node instead
        // of a reserved node if the application has reserved containers
        if (this.reservationsContinueLooking) {
          // NOTE(review): this comparison uses user.getUsed() (aggregate over
          // all partitions) while the check above and the limit are
          // per-partition — confirm whether that is intentional.
          if (Resources.lessThanOrEqual(resourceCalculator, clusterResource,
              Resources.subtract(user.getUsed(),
                  application.getCurrentReservation()), limit)) {

            if (LOG.isDebugEnabled()) {
              LOG.debug("User " + userName + " in queue " + getQueuePath()
                  + " will exceed limit based on reservations - "
                  + " consumed: " + user.getUsed() + " reserved: " + application
                  .getCurrentReservation() + " limit: " + limit);
            }
            Resource amountNeededToUnreserve = Resources.subtract(
                user.getUsed(nodePartition), limit);
            // we can only acquire a new container if we unreserve first to
            // respect user-limit
            currentResourceLimits.setAmountNeededUnreserve(
                amountNeededToUnreserve);
            return true;
          }
        }
        if (LOG.isDebugEnabled()) {
          LOG.debug("User " + userName + " in queue " + getQueuePath()
              + " will exceed limit - " + " consumed: " + user
              .getUsed(nodePartition) + " limit: " + limit);
        }
        return false;
      }
      return true;
    } finally {
      readLock.unlock();
    }
  }
| |
| private void updateSchedulerHealthForCompletedContainer( |
| RMContainer rmContainer, ContainerStatus containerStatus) { |
| // Update SchedulerHealth for released / preempted container |
| SchedulerHealth schedulerHealth = csContext.getSchedulerHealth(); |
| if (null == schedulerHealth) { |
| // Only do update if we have schedulerHealth |
| return; |
| } |
| |
| if (containerStatus.getExitStatus() == ContainerExitStatus.PREEMPTED) { |
| schedulerHealth.updatePreemption(Time.now(), rmContainer.getAllocatedNode(), |
| rmContainer.getContainerId(), getQueuePath()); |
| schedulerHealth.updateSchedulerPreemptionCounts(1); |
| } else { |
| schedulerHealth.updateRelease(csContext.getLastNodeUpdateTime(), |
| rmContainer.getAllocatedNode(), rmContainer.getContainerId(), |
| getQueuePath()); |
| } |
| } |
| |
| /** |
| * Recalculate QueueUsage Ratio. |
| * |
| * @param clusterResource |
| * Total Cluster Resource |
| * @param nodePartition |
| * Partition |
| */ |
| public void recalculateQueueUsageRatio(Resource clusterResource, |
| String nodePartition) { |
| writeLock.lock(); |
| try { |
| ResourceUsage queueResourceUsage = getQueueResourceUsage(); |
| |
| if (nodePartition == null) { |
| for (String partition : Sets.union( |
| getQueueCapacities().getNodePartitionsSet(), |
| queueResourceUsage.getNodePartitionsSet())) { |
| usersManager.updateUsageRatio(partition, clusterResource); |
| } |
| } else { |
| usersManager.updateUsageRatio(nodePartition, clusterResource); |
| } |
| } finally { |
| writeLock.unlock(); |
| } |
| } |
| |
  /**
   * Handles completion (release, preemption, unreservation) of a container:
   * updates the application, the node, queue accounting, scheduler health,
   * the preemption manager and, when the container was actually removed,
   * propagates the completion to the parent queue.
   */
  @Override
  public void completedContainer(Resource clusterResource,
      FiCaSchedulerApp application, FiCaSchedulerNode node, RMContainer rmContainer,
      ContainerStatus containerStatus, RMContainerEventType event, CSQueue childQueue,
      boolean sortQueues) {
    // Update SchedulerHealth for released / preempted container
    updateSchedulerHealthForCompletedContainer(rmContainer, containerStatus);

    if (application != null) {
      boolean removed = false;

      // Careful! Locking order is important!
      writeLock.lock();
      try {
        Container container = rmContainer.getContainer();

        // Inform the application & the node
        // Note: It's safe to assume that all state changes to RMContainer
        // happen under scheduler's lock...
        // So, this is, in effect, a transaction across application & node
        if (rmContainer.getState() == RMContainerState.RESERVED) {
          // A reserved container never ran; just drop the reservation.
          removed = application.unreserve(rmContainer.getReservedSchedulerKey(),
              node, rmContainer);
        } else{
          removed = application.containerCompleted(rmContainer, containerStatus,
              event, node.getPartition());

          node.releaseContainer(rmContainer.getContainerId(), false);
        }

        // Book-keeping
        if (removed) {

          // Inform the ordering policy
          orderingPolicy.containerReleased(application, rmContainer);

          releaseResource(clusterResource, application, container.getResource(),
              node.getPartition(), rmContainer);
        }
      } finally {
        writeLock.unlock();
      }


      if (removed) {
        // Inform the parent queue _outside_ of the leaf-queue lock
        // (note: containerStatus is deliberately passed as null upwards)
        getParent().completedContainer(clusterResource, application, node,
            rmContainer, null, event, this, sortQueues);
      }
    }

    // Notify PreemptionManager
    // NOTE(review): node is dereferenced here even when application == null;
    // confirm callers never pass a null node.
    csContext.getPreemptionManager().removeKillableContainer(
        new KillableContainer(
            rmContainer,
            node.getPartition(),
            getQueuePath()));

    // Update preemption metrics if exit status is PREEMPTED
    if (containerStatus != null
        && ContainerExitStatus.PREEMPTED == containerStatus.getExitStatus()) {
      updateQueuePreemptionMetrics(rmContainer);
    }
  }
| |
| void allocateResource(Resource clusterResource, |
| SchedulerApplicationAttempt application, Resource resource, |
| String nodePartition, RMContainer rmContainer) { |
| writeLock.lock(); |
| try { |
| super.allocateResource(clusterResource, resource, nodePartition); |
| |
| // handle ignore exclusivity container |
| if (null != rmContainer && rmContainer.getNodeLabelExpression().equals( |
| RMNodeLabelsManager.NO_LABEL) && !nodePartition.equals( |
| RMNodeLabelsManager.NO_LABEL)) { |
| TreeSet<RMContainer> rmContainers = null; |
| if (null == (rmContainers = ignorePartitionExclusivityRMContainers.get( |
| nodePartition))) { |
| rmContainers = new TreeSet<>(); |
| ignorePartitionExclusivityRMContainers.put(nodePartition, |
| rmContainers); |
| } |
| rmContainers.add(rmContainer); |
| } |
| |
| // Update user metrics |
| String userName = application.getUser(); |
| |
| // Increment user's resource usage. |
| User user = usersManager.updateUserResourceUsage(userName, resource, |
| nodePartition, true); |
| |
| Resource partitionHeadroom = Resources.createResource(0, 0); |
| if (metrics.getUserMetrics(userName) != null) { |
| partitionHeadroom = getHeadroom(user, |
| cachedResourceLimitsForHeadroom.getLimit(), clusterResource, |
| getResourceLimitForActiveUsers(userName, clusterResource, |
| nodePartition, SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), |
| nodePartition); |
| } |
| metrics.setAvailableResourcesToUser(nodePartition, userName, |
| partitionHeadroom); |
| |
| if (LOG.isDebugEnabled()) { |
| LOG.debug(getQueuePath() + " user=" + userName + " used=" |
| + queueUsage.getUsed(nodePartition) + " numContainers=" |
| + numContainers + " headroom = " + application.getHeadroom() |
| + " user-resources=" + user.getUsed()); |
| } |
| } finally { |
| writeLock.unlock(); |
| } |
| } |
| |
| void releaseResource(Resource clusterResource, |
| FiCaSchedulerApp application, Resource resource, String nodePartition, |
| RMContainer rmContainer) { |
| writeLock.lock(); |
| try { |
| super.releaseResource(clusterResource, resource, nodePartition); |
| |
| // handle ignore exclusivity container |
| if (null != rmContainer && rmContainer.getNodeLabelExpression().equals( |
| RMNodeLabelsManager.NO_LABEL) && !nodePartition.equals( |
| RMNodeLabelsManager.NO_LABEL)) { |
| if (ignorePartitionExclusivityRMContainers.containsKey(nodePartition)) { |
| Set<RMContainer> rmContainers = |
| ignorePartitionExclusivityRMContainers.get(nodePartition); |
| rmContainers.remove(rmContainer); |
| if (rmContainers.isEmpty()) { |
| ignorePartitionExclusivityRMContainers.remove(nodePartition); |
| } |
| } |
| } |
| |
| // Update user metrics |
| String userName = application.getUser(); |
| User user = usersManager.updateUserResourceUsage(userName, resource, |
| nodePartition, false); |
| |
| Resource partitionHeadroom = Resources.createResource(0, 0); |
| if (metrics.getUserMetrics(userName) != null) { |
| partitionHeadroom = getHeadroom(user, |
| cachedResourceLimitsForHeadroom.getLimit(), clusterResource, |
| getResourceLimitForActiveUsers(userName, clusterResource, |
| nodePartition, SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), |
| nodePartition); |
| } |
| metrics.setAvailableResourcesToUser(nodePartition, userName, |
| partitionHeadroom); |
| |
| if (LOG.isDebugEnabled()) { |
| LOG.debug( |
| getQueuePath() + " used=" + queueUsage.getUsed() + " numContainers=" |
| + numContainers + " user=" + userName + " user-resources=" |
| + user.getUsed()); |
| } |
| } finally { |
| writeLock.unlock(); |
| } |
| } |
| |
| private void updateCurrentResourceLimits( |
| ResourceLimits currentResourceLimits, Resource clusterResource) { |
| // TODO: need consider non-empty node labels when resource limits supports |
| // node labels |
| // Even if ParentQueue will set limits respect child's max queue capacity, |
| // but when allocating reserved container, CapacityScheduler doesn't do |
| // this. So need cap limits by queue's max capacity here. |
| this.cachedResourceLimitsForHeadroom = |
| new ResourceLimits(currentResourceLimits.getLimit()); |
| Resource queueMaxResource = getEffectiveMaxCapacityDown( |
| RMNodeLabelsManager.NO_LABEL, minimumAllocation); |
| this.cachedResourceLimitsForHeadroom.setLimit(Resources.min( |
| resourceCalculator, clusterResource, queueMaxResource, |
| currentResourceLimits.getLimit())); |
| } |
| |
| private void updateAbsoluteCapacitiesAndRelatedFields() { |
| updateAbsoluteCapacities(); |
| CapacitySchedulerConfiguration schedulerConf = csContext.getConfiguration(); |
| |
| // If maxApplications not set, use the system total max app, apply newly |
| // calculated abs capacity of the queue. |
| if (maxApplications <= 0) { |
| int maxSystemApps = schedulerConf. |
| getMaximumSystemApplications(); |
| maxApplications = |
| (int) (maxSystemApps * queueCapacities.getAbsoluteCapacity()); |
| } |
| maxApplicationsPerUser = |
| Math.min(maxApplications, |
| (int) (maxApplications * (usersManager.getUserLimit() / 100.0f) |
| * usersManager.getUserLimitFactor())); |
| |
| if (getUserLimitFactor() == -1) { |
| maxApplicationsPerUser = maxApplications; |
| } |
| } |
| |
  /**
   * Reacts to a cluster resource change: recomputes capacities, limits,
   * usage ratios and metrics, then re-activates pending applications and
   * refreshes every active application's headroom. The update steps below
   * are order-sensitive.
   */
  @Override
  public void updateClusterResource(Resource clusterResource,
      ResourceLimits currentResourceLimits) {
    writeLock.lock();
    try {
      lastClusterResource = clusterResource;

      updateAbsoluteCapacitiesAndRelatedFields();

      super.updateEffectiveResources(clusterResource);

      updateCurrentResourceLimits(currentResourceLimits, clusterResource);

      // Update headroom info based on new cluster resource value
      // absoluteMaxCapacity now, will be replaced with absoluteMaxAvailCapacity
      // during allocation
      setQueueResourceLimitsInfo(clusterResource);

      // Update user consumedRatios (null partition = all partitions)
      recalculateQueueUsageRatio(clusterResource, null);

      // Update metrics
      CSQueueUtils.updateQueueStatistics(resourceCalculator, clusterResource,
          this, labelManager, null);
      // Update configured capacity/max-capacity for default partition only
      CSQueueUtils.updateConfiguredCapacityMetrics(resourceCalculator,
          labelManager.getResourceByLabel(null, clusterResource),
          RMNodeLabelsManager.NO_LABEL, this);

      // queue metrics are updated, more resource may be available
      // activate the pending applications if possible
      activateApplications();

      // In case of any resource change, invalidate recalculateULCount to clear
      // the computed user-limit.
      usersManager.userLimitNeedsRecompute();

      // Update application properties
      for (FiCaSchedulerApp application : orderingPolicy
          .getSchedulableEntities()) {
        computeUserLimitAndSetHeadroom(application, clusterResource,
            RMNodeLabelsManager.NO_LABEL,
            SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY, null);
      }
    } finally {
      writeLock.unlock();
    }
  }
| |
| @Override |
| public void incUsedResource(String nodeLabel, Resource resourceToInc, |
| SchedulerApplicationAttempt application) { |
| usersManager.updateUserResourceUsage(application.getUser(), resourceToInc, |
| nodeLabel, true); |
| super.incUsedResource(nodeLabel, resourceToInc, application); |
| } |
| |
| @Override |
| public void decUsedResource(String nodeLabel, Resource resourceToDec, |
| SchedulerApplicationAttempt application) { |
| usersManager.updateUserResourceUsage(application.getUser(), resourceToDec, |
| nodeLabel, false); |
| super.decUsedResource(nodeLabel, resourceToDec, application); |
| } |
| |
| public void incAMUsedResource(String nodeLabel, Resource resourceToInc, |
| SchedulerApplicationAttempt application) { |
| getUser(application.getUser()).getResourceUsage().incAMUsed(nodeLabel, |
| resourceToInc); |
| // ResourceUsage has its own lock, no addition lock needs here. |
| queueUsage.incAMUsed(nodeLabel, resourceToInc); |
| } |
| |
| public void decAMUsedResource(String nodeLabel, Resource resourceToDec, |
| SchedulerApplicationAttempt application) { |
| getUser(application.getUser()).getResourceUsage().decAMUsed(nodeLabel, |
| resourceToDec); |
| // ResourceUsage has its own lock, no addition lock needs here. |
| queueUsage.decAMUsed(nodeLabel, resourceToDec); |
| } |
| |
| @Override |
| public void recoverContainer(Resource clusterResource, |
| SchedulerApplicationAttempt attempt, RMContainer rmContainer) { |
| if (rmContainer.getState().equals(RMContainerState.COMPLETED)) { |
| return; |
| } |
| if (rmContainer.getExecutionType() != ExecutionType.GUARANTEED) { |
| return; |
| } |
| // Careful! Locking order is important! |
| writeLock.lock(); |
| try { |
| FiCaSchedulerNode node = scheduler.getNode( |
| rmContainer.getContainer().getNodeId()); |
| allocateResource(clusterResource, attempt, |
| rmContainer.getContainer().getResource(), node.getPartition(), |
| rmContainer); |
| } finally { |
| writeLock.unlock(); |
| } |
| |
| getParent().recoverContainer(clusterResource, attempt, rmContainer); |
| } |
| |
| /** |
| * Obtain (read-only) collection of pending applications. |
| */ |
| public Collection<FiCaSchedulerApp> getPendingApplications() { |
| return Collections.unmodifiableCollection(pendingOrderingPolicy |
| .getSchedulableEntities()); |
| } |
| |
| /** |
| * Obtain (read-only) collection of active applications. |
| */ |
| public Collection<FiCaSchedulerApp> getApplications() { |
| return Collections.unmodifiableCollection(orderingPolicy |
| .getSchedulableEntities()); |
| } |
| |
| /** |
| * Obtain (read-only) collection of all applications. |
| */ |
| public Collection<FiCaSchedulerApp> getAllApplications() { |
| Collection<FiCaSchedulerApp> apps = new HashSet<FiCaSchedulerApp>( |
| pendingOrderingPolicy.getSchedulableEntities()); |
| apps.addAll(orderingPolicy.getSchedulableEntities()); |
| |
| return Collections.unmodifiableCollection(apps); |
| } |
| |
| /** |
| * Get total pending resource considering user limit for the leaf queue. This |
| * will be used for calculating pending resources in the preemption monitor. |
| * |
| * Consider the headroom for each user in the queue. |
| * Total pending for the queue = |
| * sum(for each user(min((user's headroom), sum(user's pending requests)))) |
| * NOTE: |
| |
| * @param clusterResources clusterResource |
| * @param partition node partition |
| * @param deductReservedFromPending When a container is reserved in CS, |
| * pending resource will not be deducted. |
| * This could lead to double accounting when |
| * doing preemption: |
| * In normal cases, we should deduct reserved |
| * resource from pending to avoid |
| * excessive preemption. |
| * @return Total pending resource considering user limit |
| */ |
| public Resource getTotalPendingResourcesConsideringUserLimit( |
| Resource clusterResources, String partition, |
| boolean deductReservedFromPending) { |
| readLock.lock(); |
| try { |
| Map<String, Resource> userNameToHeadroom = |
| new HashMap<>(); |
| Resource totalPendingConsideringUserLimit = Resource.newInstance(0, 0); |
| for (FiCaSchedulerApp app : getApplications()) { |
| String userName = app.getUser(); |
| if (!userNameToHeadroom.containsKey(userName)) { |
| User user = getUser(userName); |
| Resource headroom = Resources.subtract( |
| getResourceLimitForActiveUsers(app.getUser(), clusterResources, |
| partition, SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), |
| user.getUsed(partition)); |
| // Make sure headroom is not negative. |
| headroom = Resources.componentwiseMax(headroom, Resources.none()); |
| userNameToHeadroom.put(userName, headroom); |
| } |
| |
| // Check if we need to deduct reserved from pending |
| Resource pending = app.getAppAttemptResourceUsage().getPending( |
| partition); |
| if (deductReservedFromPending) { |
| pending = Resources.subtract(pending, |
| app.getAppAttemptResourceUsage().getReserved(partition)); |
| } |
| pending = Resources.componentwiseMax(pending, Resources.none()); |
| |
| Resource minpendingConsideringUserLimit = Resources.componentwiseMin( |
| userNameToHeadroom.get(userName), pending); |
| Resources.addTo(totalPendingConsideringUserLimit, |
| minpendingConsideringUserLimit); |
| Resources.subtractFrom(userNameToHeadroom.get(userName), |
| minpendingConsideringUserLimit); |
| } |
| return totalPendingConsideringUserLimit; |
| } finally { |
| readLock.unlock(); |
| } |
| |
| } |
| |
| @Override |
| public void collectSchedulerApplications( |
| Collection<ApplicationAttemptId> apps) { |
| readLock.lock(); |
| try { |
| for (FiCaSchedulerApp pendingApp : pendingOrderingPolicy |
| .getSchedulableEntities()) { |
| apps.add(pendingApp.getApplicationAttemptId()); |
| } |
| for (FiCaSchedulerApp app : orderingPolicy.getSchedulableEntities()) { |
| apps.add(app.getApplicationAttemptId()); |
| } |
| } finally { |
| readLock.unlock(); |
| } |
| |
| } |
| |
| @Override |
| public void attachContainer(Resource clusterResource, |
| FiCaSchedulerApp application, RMContainer rmContainer) { |
| if (application != null && rmContainer != null |
| && rmContainer.getExecutionType() == ExecutionType.GUARANTEED) { |
| FiCaSchedulerNode node = |
| scheduler.getNode(rmContainer.getContainer().getNodeId()); |
| allocateResource(clusterResource, application, rmContainer.getContainer() |
| .getResource(), node.getPartition(), rmContainer); |
| LOG.info("movedContainer" + " container=" + rmContainer.getContainer() |
| + " containerState="+ rmContainer.getState() |
| + " resource=" + rmContainer.getContainer().getResource() |
| + " queueMoveIn=" + this + " usedCapacity=" + getUsedCapacity() |
| + " absoluteUsedCapacity=" + getAbsoluteUsedCapacity() + " used=" |
| + queueUsage.getUsed() + " cluster=" + clusterResource); |
| // Inform the parent queue |
| getParent().attachContainer(clusterResource, application, rmContainer); |
| } |
| } |
| |
| @Override |
| public void detachContainer(Resource clusterResource, |
| FiCaSchedulerApp application, RMContainer rmContainer) { |
| if (application != null && rmContainer != null |
| && rmContainer.getExecutionType() == ExecutionType.GUARANTEED) { |
| FiCaSchedulerNode node = |
| scheduler.getNode(rmContainer.getContainer().getNodeId()); |
| releaseResource(clusterResource, application, rmContainer.getContainer() |
| .getResource(), node.getPartition(), rmContainer); |
| LOG.info("movedContainer" + " container=" + rmContainer.getContainer() |
| + " containerState="+ rmContainer.getState() |
| + " resource=" + rmContainer.getContainer().getResource() |
| + " queueMoveOut=" + this + " usedCapacity=" + getUsedCapacity() |
| + " absoluteUsedCapacity=" + getAbsoluteUsedCapacity() + " used=" |
| + queueUsage.getUsed() + " cluster=" + clusterResource); |
| // Inform the parent queue |
| getParent().detachContainer(clusterResource, application, rmContainer); |
| } |
| } |
| |
| /** |
| * @return all ignored partition exclusivity RMContainers in the LeafQueue, |
| * this will be used by preemption policy. |
| */ |
| public Map<String, TreeSet<RMContainer>> |
| getIgnoreExclusivityRMContainers() { |
| Map<String, TreeSet<RMContainer>> clonedMap = new HashMap<>(); |
| |
| readLock.lock(); |
| try { |
| for (Map.Entry<String, TreeSet<RMContainer>> entry : ignorePartitionExclusivityRMContainers |
| .entrySet()) { |
| clonedMap.put(entry.getKey(), new TreeSet<>(entry.getValue())); |
| } |
| |
| return clonedMap; |
| |
| } finally { |
| readLock.unlock(); |
| } |
| } |
| |
  /** Sets configured capacity for the default partition. */
  public void setCapacity(float capacity) {
    queueCapacities.setCapacity(capacity);
  }

  /** Sets configured capacity for the given node label. */
  public void setCapacity(String nodeLabel, float capacity) {
    queueCapacities.setCapacity(nodeLabel, capacity);
  }

  /** Sets absolute capacity for the default partition. */
  public void setAbsoluteCapacity(float absoluteCapacity) {
    queueCapacities.setAbsoluteCapacity(absoluteCapacity);
  }

  /** Sets absolute capacity for the given node label. */
  public void setAbsoluteCapacity(String nodeLabel, float absoluteCapacity) {
    queueCapacities.setAbsoluteCapacity(nodeLabel, absoluteCapacity);
  }

  /** Sets the maximum number of concurrent applications per user. */
  public void setMaxApplicationsPerUser(int maxApplicationsPerUser) {
    this.maxApplicationsPerUser = maxApplicationsPerUser;
  }

  /** Sets the maximum number of concurrent applications for the queue. */
  public void setMaxApplications(int maxApplications) {
    this.maxApplications = maxApplications;
  }

  /** Sets the max fraction of queue resource usable by application masters. */
  public void setMaxAMResourcePerQueuePercent(
      float maxAMResourcePerQueuePercent) {
    this.maxAMResourcePerQueuePercent = maxAMResourcePerQueuePercent;
  }
| |
  /** @return the ordering policy governing active applications. */
  public OrderingPolicy<FiCaSchedulerApp>
      getOrderingPolicy() {
    return orderingPolicy;
  }
| |
  /**
   * Replaces the active-application ordering policy, migrating all
   * schedulable entities from the old policy into the new one so no
   * application is lost in the swap.
   */
  void setOrderingPolicy(
      OrderingPolicy<FiCaSchedulerApp> orderingPolicy) {
    writeLock.lock();
    try {
      if (null != this.orderingPolicy) {
        // Move the currently tracked apps into the incoming policy.
        orderingPolicy.addAllSchedulableEntities(
            this.orderingPolicy.getSchedulableEntities());
      }
      this.orderingPolicy = orderingPolicy;
    } finally {
      writeLock.unlock();
    }
  }
| |
  /** @return the default priority assigned to applications in this queue. */
  @Override
  public Priority getDefaultApplicationPriority() {
    return defaultAppPriorityPerQueue;
  }
| |
| public void updateApplicationPriority(SchedulerApplication<FiCaSchedulerApp> app, |
| Priority newAppPriority) { |
| writeLock.lock(); |
| try { |
| FiCaSchedulerApp attempt = app.getCurrentAppAttempt(); |
| boolean isActive = orderingPolicy.removeSchedulableEntity(attempt); |
| if (!isActive) { |
| pendingOrderingPolicy.removeSchedulableEntity(attempt); |
| } |
| // Update new priority in SchedulerApplication |
| attempt.setPriority(newAppPriority); |
| |
| if (isActive) { |
| orderingPolicy.addSchedulableEntity(attempt); |
| } else { |
| pendingOrderingPolicy.addSchedulableEntity(attempt); |
| } |
| } finally { |
| writeLock.unlock(); |
| } |
| } |
| |
  /** @return the ordering policy governing pending (not yet active) apps. */
  public OrderingPolicy<FiCaSchedulerApp>
      getPendingAppsOrderingPolicy() {
    return pendingOrderingPolicy;
  }
| |
| /* |
| * Holds shared values used by all applications in |
| * the queue to calculate headroom on demand |
| */ |
| static class QueueResourceLimitsInfo { |
| private Resource queueCurrentLimit; |
| private Resource clusterResource; |
| |
| public void setQueueCurrentLimit(Resource currentLimit) { |
| this.queueCurrentLimit = currentLimit; |
| } |
| |
| public Resource getQueueCurrentLimit() { |
| return queueCurrentLimit; |
| } |
| |
| public void setClusterResource(Resource clusterResource) { |
| this.clusterResource = clusterResource; |
| } |
| |
| public Resource getClusterResource() { |
| return clusterResource; |
| } |
| } |
| |
| @Override |
| public void stopQueue() { |
| writeLock.lock(); |
| try { |
| if (getNumApplications() > 0) { |
| updateQueueState(QueueState.DRAINING); |
| } else { |
| updateQueueState(QueueState.STOPPED); |
| } |
| } finally { |
| writeLock.unlock(); |
| } |
| } |
| |
| /** |
| * Get all valid users in this queue. |
| * @return user list |
| */ |
| public Set<String> getAllUsers() { |
| return this.getUsersManager().getUsers().keySet(); |
| } |
| |
  /**
   * Per-user cache entry holding a computed user limit plus mutable
   * assignment state; the volatile fields are updated without a lock.
   */
  static class CachedUserLimit {
    // Immutable computed limit for the user.
    final Resource userLimit;
    // Whether the user can currently be assigned more resource.
    volatile boolean canAssign = true;
    // Resource the user would need to unreserve; Resources.none() if none.
    volatile Resource reservation = Resources.none();

    CachedUserLimit(Resource userLimit) {
      this.userLimit = userLimit;
    }
  }
| |
| private void updateQueuePreemptionMetrics(RMContainer rmc) { |
| final long usedMillis = rmc.getFinishTime() - rmc.getCreationTime(); |
| final long usedSeconds = usedMillis / DateUtils.MILLIS_PER_SECOND; |
| Resource containerResource = rmc.getAllocatedResource(); |
| metrics.preemptContainer(); |
| long mbSeconds = (containerResource.getMemorySize() * usedMillis) |
| / DateUtils.MILLIS_PER_SECOND; |
| long vcSeconds = (containerResource.getVirtualCores() * usedMillis) |
| / DateUtils.MILLIS_PER_SECOND; |
| metrics.updatePreemptedMemoryMBSeconds(mbSeconds); |
| metrics.updatePreemptedVcoreSeconds(vcSeconds); |
| metrics.updatePreemptedResources(containerResource); |
| metrics.updatePreemptedSecondsForCustomResources(containerResource, |
| usedSeconds); |
| metrics.updatePreemptedForCustomResources(containerResource); |
| } |
| |
| @Override |
| int getNumRunnableApps() { |
| readLock.lock(); |
| try { |
| return runnableApps.size(); |
| } finally { |
| readLock.unlock(); |
| } |
| } |
| |
| int getNumNonRunnableApps() { |
| readLock.lock(); |
| try { |
| return nonRunnableApps.size(); |
| } finally { |
| readLock.unlock(); |
| } |
| } |
| |
| boolean removeNonRunnableApp(FiCaSchedulerApp app) { |
| writeLock.lock(); |
| try { |
| return nonRunnableApps.remove(app); |
| } finally { |
| writeLock.unlock(); |
| } |
| } |
| |
| List<FiCaSchedulerApp> getCopyOfNonRunnableAppSchedulables() { |
| List<FiCaSchedulerApp> appsToReturn = new ArrayList<>(); |
| readLock.lock(); |
| try { |
| appsToReturn.addAll(nonRunnableApps); |
| } finally { |
| readLock.unlock(); |
| } |
| return appsToReturn; |
| } |
| } |