// blob: a245e3b9d06a6231d81524aa9044df9976a497a2 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Evolving;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authorize.AccessControlList;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.QueueACL;
import org.apache.hadoop.yarn.api.records.QueueInfo;
import org.apache.hadoop.yarn.api.records.QueueState;
import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.exceptions.InvalidResourceRequestException;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.security.AccessType;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.*;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivityDiagnosticConstant;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivitiesLogger;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivityState;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.AllocationState;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
import org.apache.hadoop.yarn.util.resource.Resources;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
/**
 * A non-leaf queue in the CapacityScheduler queue hierarchy.
 *
 * <p>A {@code ParentQueue} owns a set of child queues and checks that the
 * children's configured capacities add up to 100% (within a small tolerance).
 * It aggregates the resource usage and application count of its subtree and
 * hands container allocation on a node down to its children. Applications can
 * never be submitted directly to a parent queue.
 *
 * <p>Locking: most methods that read or modify queue state synchronize on
 * this instance. Methods that forward an event to {@code parent} (for example
 * {@link #submitApplication}, {@link #finishApplication} and
 * {@link #recoverContainer}) make that call after leaving their own
 * synchronized section.
 */
@Private
@Evolving
public class ParentQueue extends AbstractCSQueue {

  private static final Log LOG = LogFactory.getLog(ParentQueue.class);

  /** Tolerance (0.05%) used when checking that child capacities sum to 1.0. */
  private static final float PRECISION = 0.0005f;

  /**
   * Children of this queue. The set is a {@link TreeSet} ordered by
   * {@link #nonPartitionedQueueComparator}.
   */
  protected final Set<CSQueue> childQueues;

  /** True when this queue was created without a parent (the root queue). */
  private final boolean rootQueue;

  /** Orders children when allocating on default-partition (no-label) nodes. */
  final Comparator<CSQueue> nonPartitionedQueueComparator;

  /** Orders children when allocating on nodes of a labeled partition. */
  final PartitionedQueueComparator partitionQueueComparator;

  /**
   * Applications submitted through this queue that have not yet finished.
   * Incremented in {@code addApplication} and decremented in
   * {@code removeApplication}.
   */
  volatile int numApplications;

  private final CapacitySchedulerContext scheduler;

  /**
   * Set when a child's usage changed without that child being re-inserted
   * into {@link #childQueues}. The ordered set must then be rebuilt before the
   * next default-partition allocation.
   */
  private boolean needToResortQueuesAtNextAllocation = false;

  private final RecordFactory recordFactory =
      RecordFactoryProvider.getRecordFactory(null);

  /**
   * Creates a parent queue and applies its configuration.
   *
   * @param cs scheduler context supplying configuration, cluster resource and
   *     the child-queue comparators
   * @param queueName short name of this queue
   * @param parent parent queue, or {@code null} for the root queue
   * @param old previous instance of this queue, passed to the superclass
   *     constructor
   * @throws IOException if applying the queue configuration fails
   * @throws IllegalArgumentException if this is the root queue and its
   *     configured capacity differs from
   *     {@code CapacitySchedulerConfiguration.MAXIMUM_CAPACITY_VALUE}
   */
  public ParentQueue(CapacitySchedulerContext cs,
      String queueName, CSQueue parent, CSQueue old) throws IOException {
    super(cs, queueName, parent, old);
    this.scheduler = cs;
    this.nonPartitionedQueueComparator = cs.getNonPartitionedQueueComparator();
    this.partitionQueueComparator = cs.getPartitionedQueueComparator();

    this.rootQueue = (parent == null);

    float rawCapacity =
        cs.getConfiguration().getNonLabeledQueueCapacity(getQueuePath());

    // The root queue must always own the whole cluster.
    if (rootQueue &&
        (rawCapacity != CapacitySchedulerConfiguration.MAXIMUM_CAPACITY_VALUE)) {
      throw new IllegalArgumentException("Illegal " +
          "capacity of " + rawCapacity + " for queue " + queueName +
          ". Must be " + CapacitySchedulerConfiguration.MAXIMUM_CAPACITY_VALUE);
    }

    this.childQueues = new TreeSet<CSQueue>(nonPartitionedQueueComparator);

    setupQueueConfigs(cs.getClusterResource());

    LOG.info("Initialized parent-queue " + queueName +
        " name=" + queueName +
        ", fullname=" + getQueuePath());
  }

  /**
   * Applies this queue's configuration through the superclass implementation
   * and logs the resulting capacities, state, ACLs and accessible labels.
   *
   * @param clusterResource current total cluster resource
   * @throws IOException if the superclass fails to apply the configuration
   */
  synchronized void setupQueueConfigs(Resource clusterResource)
      throws IOException {
    super.setupQueueConfigs(clusterResource);
    StringBuilder aclsString = new StringBuilder();
    for (Map.Entry<AccessType, AccessControlList> e : acls.entrySet()) {
      aclsString.append(e.getKey() + ":" + e.getValue().getAclString());
    }

    StringBuilder labelStrBuilder = new StringBuilder();
    if (accessibleLabels != null) {
      for (String s : accessibleLabels) {
        labelStrBuilder.append(s);
        labelStrBuilder.append(",");
      }
    }

    LOG.info(queueName +
        ", capacity=" + this.queueCapacities.getCapacity() +
        ", absoluteCapacity=" + this.queueCapacities.getAbsoluteCapacity() +
        ", maxCapacity=" + this.queueCapacities.getMaximumCapacity() +
        ", absoluteMaxCapacity="
        + this.queueCapacities.getAbsoluteMaximumCapacity() +
        ", state=" + state +
        ", acls=" + aclsString +
        ", labels=" + labelStrBuilder.toString() + "\n" +
        ", reservationsContinueLooking=" + reservationsContinueLooking);
  }

  /**
   * Replaces the children of this queue after validating their capacities.
   *
   * <p>The check covers the default partition and every node label present
   * in this queue's capacities. When this queue's capacity is positive, the
   * children's capacities must sum to 1.0 (within {@link #PRECISION}). When
   * this queue's capacity is zero, they must sum to zero.
   *
   * @param childQueues the new children of this queue
   * @throws IllegalArgumentException if the children's capacities are invalid
   */
  synchronized void setChildQueues(Collection<CSQueue> childQueues) {
    // Validate
    float childCapacities = 0;
    for (CSQueue queue : childQueues) {
      childCapacities += queue.getCapacity();
    }
    float delta = Math.abs(1.0f - childCapacities);  // crude way to check
    // allow capacities being set to 0, and enforce child 0 if parent is 0
    if (((queueCapacities.getCapacity() > 0) && (delta > PRECISION)) ||
        ((queueCapacities.getCapacity() == 0) && (childCapacities > 0))) {
      throw new IllegalArgumentException("Illegal" +
          " capacity of " + childCapacities +
          " for children of queue " + queueName);
    }
    // check label capacities
    for (String nodeLabel : queueCapacities.getExistingNodeLabels()) {
      float capacityByLabel = queueCapacities.getCapacity(nodeLabel);
      // check children's labels
      float sum = 0;
      for (CSQueue queue : childQueues) {
        sum += queue.getQueueCapacities().getCapacity(nodeLabel);
      }
      // Same rule as above, per label. Parenthesized explicitly; the result
      // is identical to relying on && binding tighter than ||.
      if ((capacityByLabel > 0 && Math.abs(1.0f - sum) > PRECISION)
          || (capacityByLabel == 0 && sum > 0)) {
        throw new IllegalArgumentException("Illegal" + " capacity of "
            + sum + " for children of queue " + queueName
            + " for label=" + nodeLabel);
      }
    }

    this.childQueues.clear();
    this.childQueues.addAll(childQueues);
    if (LOG.isDebugEnabled()) {
      LOG.debug("setChildQueues: " + getChildQueuesToPrint());
    }
  }

  /**
   * Returns the dot-separated path of this queue: the parent's path, a dot,
   * and this queue's name. The root queue returns just its name.
   */
  @Override
  public String getQueuePath() {
    String parentPath = ((parent == null) ? "" : (parent.getQueuePath() + "."));
    return parentPath + getQueueName();
  }

  /**
   * Returns this queue's {@link QueueInfo}, optionally with child queue info.
   *
   * @param includeChildQueues whether to fill in the direct children's info
   * @param recursive whether each child should also include its own children,
   *     recursively
   */
  @Override
  public synchronized QueueInfo getQueueInfo(
      boolean includeChildQueues, boolean recursive) {
    QueueInfo queueInfo = getQueueInfo();

    List<QueueInfo> childQueuesInfo = new ArrayList<QueueInfo>();
    if (includeChildQueues) {
      for (CSQueue child : childQueues) {
        // Get queue information recursively?
        childQueuesInfo.add(
            child.getQueueInfo(recursive, recursive));
      }
    }
    queueInfo.setChildQueues(childQueuesInfo);

    return queueInfo;
  }

  /**
   * Builds the ACL info of this queue alone: the {@link QueueACL}s for which
   * {@code hasAccess(operation, user)} is true.
   */
  private synchronized QueueUserACLInfo getUserAclInfo(
      UserGroupInformation user) {
    QueueUserACLInfo userAclInfo =
        recordFactory.newRecordInstance(QueueUserACLInfo.class);
    List<QueueACL> operations = new ArrayList<QueueACL>();
    for (QueueACL operation : QueueACL.values()) {
      if (hasAccess(operation, user)) {
        operations.add(operation);
      }
    }

    userAclInfo.setQueueName(getQueueName());
    userAclInfo.setUserAcls(operations);
    return userAclInfo;
  }

  /**
   * Returns the ACL info of {@code user} for this queue followed by the ACL
   * info reported by every child queue.
   */
  @Override
  public synchronized List<QueueUserACLInfo> getQueueUserAclInfo(
      UserGroupInformation user) {
    List<QueueUserACLInfo> userAcls = new ArrayList<QueueUserACLInfo>();

    // Add parent queue acls
    userAcls.add(getUserAclInfo(user));

    // Add children queue acls
    for (CSQueue child : childQueues) {
      userAcls.addAll(child.getQueueUserAclInfo(user));
    }

    return userAcls;
  }

  /**
   * Returns a one-line summary of this queue's capacities and usage.
   */
  @Override
  public String toString() {
    return queueName + ": " +
        "numChildQueue= " + childQueues.size() + ", " +
        "capacity=" + queueCapacities.getCapacity() + ", " +
        "absoluteCapacity=" + queueCapacities.getAbsoluteCapacity() + ", " +
        "usedResources=" + queueUsage.getUsed() + ", " +
        "usedCapacity=" + getUsedCapacity() + ", " +
        "numApps=" + getNumApplications() + ", " +
        "numContainers=" + getNumContainers();
  }

  /**
   * Re-applies configuration and merges in the children of a freshly parsed
   * copy of this queue.
   *
   * <p>Existing children are reinitialized from their newly parsed
   * counterparts. Children that exist only in the new copy are adopted, with
   * their parent set to this queue. Children missing from the new copy stay
   * in place.
   *
   * @param newlyParsedQueue newly parsed queue with the same path
   * @param clusterResource current total cluster resource
   * @throws IOException if {@code newlyParsedQueue} is not a
   *     {@code ParentQueue} with this queue's path, or if reconfiguration fails
   */
  @Override
  public synchronized void reinitialize(CSQueue newlyParsedQueue,
      Resource clusterResource) throws IOException {
    // Sanity check
    if (!(newlyParsedQueue instanceof ParentQueue) ||
        !newlyParsedQueue.getQueuePath().equals(getQueuePath())) {
      throw new IOException("Trying to reinitialize " + getQueuePath() +
          " from " + newlyParsedQueue.getQueuePath());
    }

    ParentQueue newlyParsedParentQueue = (ParentQueue) newlyParsedQueue;

    // Set new configs
    setupQueueConfigs(clusterResource);

    // Re-configure existing child queues and add new ones
    // The CS has already checked to ensure all existing child queues are present!
    Map<String, CSQueue> currentChildQueues = getQueues(childQueues);
    Map<String, CSQueue> newChildQueues =
        getQueues(newlyParsedParentQueue.childQueues);
    for (Map.Entry<String, CSQueue> e : newChildQueues.entrySet()) {
      String newChildQueueName = e.getKey();
      CSQueue newChildQueue = e.getValue();

      CSQueue childQueue = currentChildQueues.get(newChildQueueName);

      // Check if the child-queue already exists
      if (childQueue != null) {
        // Re-init existing child queues
        childQueue.reinitialize(newChildQueue, clusterResource);
        LOG.info(getQueueName() + ": re-configured queue: " + childQueue);
      } else {
        // New child queue, do not re-init

        // Set parent to 'this'
        newChildQueue.setParent(this);

        // Save in list of current child queues
        currentChildQueues.put(newChildQueueName, newChildQueue);

        LOG.info(getQueueName() + ": added new child queue: " + newChildQueue);
      }
    }

    // Re-sort all queues
    childQueues.clear();
    childQueues.addAll(currentChildQueues.values());
  }

  /**
   * Indexes {@code queues} by short queue name. If two queues share a name,
   * the later one in iteration order wins.
   */
  Map<String, CSQueue> getQueues(Set<CSQueue> queues) {
    Map<String, CSQueue> queuesMap = new HashMap<String, CSQueue>();
    for (CSQueue queue : queues) {
      queuesMap.put(queue.getQueueName(), queue);
    }
    return queuesMap;
  }

  /**
   * Records an application submitted to a queue below this one, then
   * forwards the submission to the parent queue.
   *
   * <p>If the parent rejects the submission, the local count is rolled back
   * and the parent's exception is rethrown.
   *
   * @throws AccessControlException if {@code queue} names this queue, if
   *     this queue is not {@link QueueState#RUNNING}, or if the parent rejects
   *     the submission
   */
  @Override
  public void submitApplication(ApplicationId applicationId, String user,
      String queue) throws AccessControlException {

    synchronized (this) {
      // Sanity check
      if (queue.equals(queueName)) {
        throw new AccessControlException("Cannot submit application " +
            "to non-leaf queue: " + queueName);
      }

      if (state != QueueState.RUNNING) {
        throw new AccessControlException("Queue " + getQueuePath() +
            " is STOPPED. Cannot accept submission of application: " +
            applicationId);
      }

      addApplication(applicationId, user);
    }

    // Inform the parent queue (outside of this queue's lock)
    if (parent != null) {
      try {
        parent.submitApplication(applicationId, user, queue);
      } catch (AccessControlException ace) {
        LOG.info("Failed to submit application to parent-queue: " +
            parent.getQueuePath(), ace);
        removeApplication(applicationId, user);
        throw ace;
      }
    }
  }

  /** No-op: a parent queue does not track individual application attempts. */
  @Override
  public void submitApplicationAttempt(FiCaSchedulerApp application,
      String userName) {
    // submit attempt logic.
  }

  /** No-op: a parent queue does not track individual application attempts. */
  @Override
  public void finishApplicationAttempt(FiCaSchedulerApp application,
      String queue) {
    // finish attempt logic.
  }

  /** Increments {@link #numApplications} and logs the new count. */
  private synchronized void addApplication(ApplicationId applicationId,
      String user) {

    ++numApplications;

    LOG.info("Application added -" +
        " appId: " + applicationId +
        " user: " + user +
        " leaf-queue of parent: " + getQueueName() +
        " #applications: " + getNumApplications());
  }

  /**
   * Removes a finished application from this queue's count, then forwards
   * the event to the parent queue outside of this queue's lock.
   */
  @Override
  public void finishApplication(ApplicationId application, String user) {

    synchronized (this) {
      removeApplication(application, user);
    }

    // Inform the parent queue
    if (parent != null) {
      parent.finishApplication(application, user);
    }
  }

  /** Decrements {@link #numApplications} and logs the new count. */
  private synchronized void removeApplication(ApplicationId applicationId,
      String user) {

    --numApplications;

    LOG.info("Application removed -" +
        " appId: " + applicationId +
        " user: " + user +
        " leaf-queue of parent: " + getQueueName() +
        " #applications: " + getNumApplications());
  }

  /** Returns the parent's short name, or "" for the root queue. */
  private String getParentName() {
    return getParent() != null ? getParent().getQueueName() : "";
  }

  /**
   * Tries to allocate containers on {@code node} through the children of
   * this queue.
   *
   * <p>This method returns {@link CSAssignment#NULL_ASSIGNMENT} immediately in
   * two cases:
   * <ul>
   *   <li>in {@link SchedulingMode#RESPECT_PARTITION_EXCLUSIVITY} mode, when
   *       this queue cannot access the node's partition;</li>
   *   <li>when this queue has no pending resource request for that
   *       partition.</li>
   * </ul>
   *
   * <p>Otherwise it repeatedly asks its children for an assignment while the
   * node can take more containers and this queue stays within its limits.
   * Each successful child assignment is charged to this queue's usage and
   * accumulated into the returned assignment. Only the root queue loops more
   * than once, and it stops after an off-switch assignment. Activity events
   * are recorded for every outcome.
   *
   * @param clusterResource current total cluster resource
   * @param node node to allocate on
   * @param resourceLimits limits imposed by the parent queue
   * @param schedulingMode whether partition exclusivity is respected
   * @return the accumulated assignment produced by this call
   */
  @Override
  public synchronized CSAssignment assignContainers(Resource clusterResource,
      FiCaSchedulerNode node, ResourceLimits resourceLimits,
      SchedulingMode schedulingMode) {
    // if our queue cannot access this node, just return
    if (schedulingMode == SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY
        && !accessibleToPartition(node.getPartition())) {
      if (LOG.isDebugEnabled()) {
        LOG.debug("Skip this queue=" + getQueuePath()
            + ", because it is not able to access partition=" + node
            .getPartition());
      }

      ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node,
          getParentName(), getQueueName(), ActivityState.REJECTED,
          ActivityDiagnosticConstant.NOT_ABLE_TO_ACCESS_PARTITION + node
              .getPartition());
      if (rootQueue) {
        ActivitiesLogger.NODE.finishSkippedNodeAllocation(activitiesManager,
            node);
      }

      return CSAssignment.NULL_ASSIGNMENT;
    }

    // Check if this queue need more resource, simply skip allocation if this
    // queue doesn't need more resources.
    if (!super.hasPendingResourceRequest(node.getPartition(),
        clusterResource, schedulingMode)) {
      if (LOG.isDebugEnabled()) {
        LOG.debug("Skip this queue=" + getQueuePath()
            + ", because it doesn't need more resource, schedulingMode="
            + schedulingMode.name() + " node-partition=" + node.getPartition());
      }

      ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node,
          getParentName(), getQueueName(), ActivityState.SKIPPED,
          ActivityDiagnosticConstant.QUEUE_DO_NOT_NEED_MORE_RESOURCE);
      if (rootQueue) {
        ActivitiesLogger.NODE.finishSkippedNodeAllocation(activitiesManager,
            node);
      }

      return CSAssignment.NULL_ASSIGNMENT;
    }

    CSAssignment assignment =
        new CSAssignment(Resources.createResource(0, 0), NodeType.NODE_LOCAL);

    while (canAssign(clusterResource, node)) {
      if (LOG.isDebugEnabled()) {
        LOG.debug("Trying to assign containers to child-queue of "
            + getQueueName());
      }

      // Are we over maximum-capacity for this queue?
      // This will also consider parent's limits and also continuous reservation
      // looking
      if (!super.canAssignToThisQueue(clusterResource, node.getPartition(),
          resourceLimits, Resources.createResource(
              getMetrics().getReservedMB(), getMetrics()
                  .getReservedVirtualCores()), schedulingMode)) {

        ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node,
            getParentName(), getQueueName(), ActivityState.SKIPPED,
            ActivityDiagnosticConstant.QUEUE_MAX_CAPACITY_LIMIT);
        if (rootQueue) {
          ActivitiesLogger.NODE.finishSkippedNodeAllocation(activitiesManager,
              node);
        }

        break;
      }

      // Schedule
      CSAssignment assignedToChild =
          assignContainersToChildQueues(clusterResource, node, resourceLimits,
              schedulingMode);
      assignment.setType(assignedToChild.getType());

      // Done if no child-queue assigned anything
      if (Resources.greaterThan(
          resourceCalculator, clusterResource,
          assignedToChild.getResource(), Resources.none())) {

        ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node,
            getParentName(), getQueueName(), ActivityState.ACCEPTED,
            ActivityDiagnosticConstant.EMPTY);

        if (node.getReservedContainer() == null) {
          if (rootQueue) {
            ActivitiesLogger.NODE.finishAllocatedNodeAllocation(
                activitiesManager, node,
                assignedToChild.getAssignmentInformation()
                    .getFirstAllocatedOrReservedContainerId(),
                AllocationState.ALLOCATED);
          }
        } else {
          if (rootQueue) {
            ActivitiesLogger.NODE.finishAllocatedNodeAllocation(
                activitiesManager, node,
                assignedToChild.getAssignmentInformation()
                    .getFirstAllocatedOrReservedContainerId(),
                AllocationState.RESERVED);
          }
        }

        // Track resource utilization for the parent-queue
        allocateResource(clusterResource, assignedToChild.getResource(),
            node.getPartition(), assignedToChild.isIncreasedAllocation());

        // Track resource utilization in this pass of the scheduler
        Resources
            .addTo(assignment.getResource(), assignedToChild.getResource());
        Resources.addTo(assignment.getAssignmentInformation().getAllocated(),
            assignedToChild.getAssignmentInformation().getAllocated());
        Resources.addTo(assignment.getAssignmentInformation().getReserved(),
            assignedToChild.getAssignmentInformation().getReserved());
        assignment.getAssignmentInformation().incrAllocations(
            assignedToChild.getAssignmentInformation().getNumAllocations());
        assignment.getAssignmentInformation().incrReservations(
            assignedToChild.getAssignmentInformation().getNumReservations());
        assignment
            .getAssignmentInformation()
            .getAllocationDetails()
            .addAll(
                assignedToChild.getAssignmentInformation().getAllocationDetails());
        assignment
            .getAssignmentInformation()
            .getReservationDetails()
            .addAll(
                assignedToChild.getAssignmentInformation()
                    .getReservationDetails());
        assignment.setIncreasedAllocation(assignedToChild
            .isIncreasedAllocation());

        LOG.info("assignedContainer" +
            " queue=" + getQueueName() +
            " usedCapacity=" + getUsedCapacity() +
            " absoluteUsedCapacity=" + getAbsoluteUsedCapacity() +
            " used=" + queueUsage.getUsed() +
            " cluster=" + clusterResource);

      } else {
        assignment.setSkippedType(assignedToChild.getSkippedType());

        ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node,
            getParentName(), getQueueName(), ActivityState.SKIPPED,
            ActivityDiagnosticConstant.EMPTY);
        if (rootQueue) {
          ActivitiesLogger.NODE.finishSkippedNodeAllocation(activitiesManager,
              node);
        }

        break;
      }

      if (LOG.isDebugEnabled()) {
        LOG.debug("ParentQ=" + getQueueName()
            + " assignedSoFarInThisIteration=" + assignment.getResource()
            + " usedCapacity=" + getUsedCapacity()
            + " absoluteUsedCapacity=" + getAbsoluteUsedCapacity());
      }

      // Do not assign more than one container if this isn't the root queue
      // or if we've already assigned an off-switch container
      if (!rootQueue || assignment.getType() == NodeType.OFF_SWITCH) {
        if (LOG.isDebugEnabled()) {
          if (rootQueue && assignment.getType() == NodeType.OFF_SWITCH) {
            LOG.debug("Not assigning more than one off-switch container," +
                " assignments so far: " + assignment);
          }
        }
        break;
      }
    }

    return assignment;
  }

  /**
   * Returns whether another container may be attempted on {@code node}. Two
   * conditions must hold: the node has no reserved container, and its
   * unallocated plus killable resource is at least the minimum allocation.
   */
  private boolean canAssign(Resource clusterResource, FiCaSchedulerNode node) {
    // Two conditions need to meet when trying to allocate:
    // 1) Node doesn't have reserved container
    // 2) Node's available-resource + killable-resource should > 0
    return node.getReservedContainer() == null && Resources.greaterThanOrEqual(
        resourceCalculator, clusterResource, Resources
            .add(node.getUnallocatedResource(), node.getTotalKillableResources()),
        minimumAllocation);
  }

  /**
   * Computes the resource limit to pass to {@code child} for allocation in
   * {@code nodePartition}.
   *
   * <p>The limit is
   * {@code min(parentLimits - myUsed + myKillable + childUsed, childMax)},
   * where {@code childMax} is the partition resource scaled by the child's
   * absolute maximum capacity. The result is rounded down to a multiple of
   * the minimum allocation.
   */
  private ResourceLimits getResourceLimitsOfChild(CSQueue child,
      Resource clusterResource, Resource parentLimits,
      String nodePartition) {
    // Set resource-limit of a given child, child.limit =
    // min(my.limit - my.used + child.used, child.max)

    // Parent available resource = parent-limit - parent-used-resource
    Resource parentMaxAvailableResource = Resources.subtract(
        parentLimits, queueUsage.getUsed(nodePartition));
    // Deduct killable from used
    Resources.addTo(parentMaxAvailableResource,
        getTotalKillableResource(nodePartition));

    // Child's limit = parent-available-resource + child-used
    Resource childLimit = Resources.add(parentMaxAvailableResource,
        child.getQueueResourceUsage().getUsed(nodePartition));

    // Get child's max resource
    Resource childConfiguredMaxResource = Resources.multiplyAndNormalizeDown(
        resourceCalculator,
        labelManager.getResourceByLabel(nodePartition, clusterResource),
        child.getQueueCapacities().getAbsoluteMaximumCapacity(nodePartition),
        minimumAllocation);

    // Child's limit should be capped by child configured max resource
    childLimit =
        Resources.min(resourceCalculator, clusterResource, childLimit,
            childConfiguredMaxResource);

    // Normalize before return
    childLimit =
        Resources.roundDown(resourceCalculator, childLimit, minimumAllocation);

    return new ResourceLimits(childLimit);
  }

  /**
   * Returns the children in allocation order for {@code node}'s partition.
   *
   * <p>For the default partition this is the live iterator of
   * {@link #childQueues}, so callers may use {@link Iterator#remove()}. The
   * set is rebuilt first if an earlier release skipped re-sorting. For a
   * labeled partition it iterates over a snapshot sorted by
   * {@link #partitionQueueComparator}.
   */
  private Iterator<CSQueue> sortAndGetChildrenAllocationIterator(
      FiCaSchedulerNode node) {
    if (node.getPartition().equals(RMNodeLabelsManager.NO_LABEL)) {
      if (needToResortQueuesAtNextAllocation) {
        // If we skipped resort queues last time, we need to re-sort queue
        // before allocation
        List<CSQueue> childrenList = new ArrayList<>(childQueues);
        childQueues.clear();
        childQueues.addAll(childrenList);
        needToResortQueuesAtNextAllocation = false;
      }
      return childQueues.iterator();
    }

    partitionQueueComparator.setPartitionToLookAt(node.getPartition());
    List<CSQueue> childrenList = new ArrayList<>(childQueues);
    Collections.sort(childrenList, partitionQueueComparator);
    return childrenList.iterator();
  }

  /**
   * Offers {@code node} to each child in allocation order until one of them
   * assigns a positive resource.
   *
   * <p>On the default partition, the child that assigned is re-inserted into
   * {@link #childQueues} so that the set's order reflects its new usage. When
   * a child is skipped with {@code SkippedType.QUEUE_LIMIT}, the limit offered
   * to the remaining children is reduced by that child's non-negative
   * headroom.
   *
   * @return the first positive child assignment. If no child assigned
   *     anything, this is the first {@code QUEUE_LIMIT}-skipped child
   *     assignment, or {@link CSAssignment#NULL_ASSIGNMENT}.
   */
  private synchronized CSAssignment assignContainersToChildQueues(
      Resource cluster, FiCaSchedulerNode node, ResourceLimits limits,
      SchedulingMode schedulingMode) {
    CSAssignment assignment = CSAssignment.NULL_ASSIGNMENT;

    Resource parentLimits = limits.getLimit();
    printChildQueues();

    // Try to assign to most 'under-served' sub-queue
    for (Iterator<CSQueue> iter = sortAndGetChildrenAllocationIterator(node);
        iter.hasNext();) {
      CSQueue childQueue = iter.next();
      if (LOG.isDebugEnabled()) {
        LOG.debug("Trying to assign to queue: " + childQueue.getQueuePath()
            + " stats: " + childQueue);
      }

      // Get ResourceLimits of child queue before assign containers
      ResourceLimits childLimits =
          getResourceLimitsOfChild(childQueue, cluster, parentLimits,
              node.getPartition());

      CSAssignment childAssignment = childQueue.assignContainers(cluster, node,
          childLimits, schedulingMode);
      if (LOG.isDebugEnabled()) {
        LOG.debug("Assigned to queue: " + childQueue.getQueuePath() +
            " stats: " + childQueue + " --> " +
            childAssignment.getResource() + ", " + childAssignment.getType());
      }

      // If we do assign, remove the queue and re-insert in-order to re-sort
      if (Resources.greaterThan(
          resourceCalculator, cluster,
          childAssignment.getResource(), Resources.none())) {
        // Only update childQueues when we doing non-partitioned node
        // allocation.
        if (RMNodeLabelsManager.NO_LABEL.equals(node.getPartition())) {
          // Remove and re-insert to sort. Safe because we break right after;
          // the iterator is not used again once the set has been modified.
          iter.remove();
          LOG.info("Re-sorting assigned queue: " + childQueue.getQueuePath()
              + " stats: " + childQueue);
          childQueues.add(childQueue);
          if (LOG.isDebugEnabled()) {
            printChildQueues();
          }
        }
        assignment = childAssignment;
        break;
      } else if (childAssignment.getSkippedType() ==
          CSAssignment.SkippedType.QUEUE_LIMIT) {
        if (assignment.getSkippedType() !=
            CSAssignment.SkippedType.QUEUE_LIMIT) {
          assignment = childAssignment;
        }
        // The blocked child keeps its headroom; do not offer it to siblings.
        Resource resourceToSubtract = Resources.max(resourceCalculator,
            cluster, childLimits.getHeadroom(), Resources.none());
        if (LOG.isDebugEnabled()) {
          LOG.debug("Decrease parentLimits " + parentLimits +
              " for " + this.getQueueName() + " by " +
              resourceToSubtract + " as childQueue=" +
              childQueue.getQueueName() + " is blocked");
        }
        parentLimits = Resources.subtract(parentLimits,
            resourceToSubtract);
      }
    }

    return assignment;
  }

  /**
   * Returns a debug description of the children in their current order: path,
   * used capacity and accessible labels of each child.
   */
  String getChildQueuesToPrint() {
    StringBuilder sb = new StringBuilder();
    for (CSQueue q : childQueues) {
      sb.append(q.getQueuePath())
          .append(" usedCapacity=(").append(q.getUsedCapacity()).append("), ")
          .append("label=(")
          .append(StringUtils.join(q.getAccessibleNodeLabels().iterator(), ","))
          .append(") ");
    }
    return sb.toString();
  }

  /** Logs the children of this queue at debug level. */
  private void printChildQueues() {
    if (LOG.isDebugEnabled()) {
      LOG.debug("printChildQueues - queue: " + getQueuePath()
          + " child-queues: " + getChildQueuesToPrint());
    }
  }

  /**
   * Releases {@code releasedResource} from this queue's usage.
   *
   * <p>If {@code sortQueues} is true, {@code completedChildQueue} is
   * re-inserted into {@link #childQueues} so that its position reflects its
   * new usage. If {@code sortQueues} is false, the set is marked for a full
   * rebuild before the next default-partition allocation.
   */
  private synchronized void internalReleaseResource(Resource clusterResource,
      FiCaSchedulerNode node, Resource releasedResource, boolean changeResource,
      CSQueue completedChildQueue, boolean sortQueues) {
    super.releaseResource(clusterResource,
        releasedResource, node.getPartition(),
        changeResource);

    if (LOG.isDebugEnabled()) {
      LOG.debug("completedContainer " + this + ", cluster=" + clusterResource);
    }

    // Note that this is using an iterator on the childQueues so this can't
    // be called if already within an iterator for the childQueues. Like
    // from assignContainersToChildQueues.
    if (sortQueues) {
      // Reinsert the updated queue. A linear scan with equals() is used
      // because the queue's usage (and so its comparator position) has
      // already changed, so a comparator-based lookup could miss it.
      for (Iterator<CSQueue> iter = childQueues.iterator(); iter.hasNext();) {
        CSQueue csqueue = iter.next();
        if (csqueue.equals(completedChildQueue)) {
          iter.remove();
          if (LOG.isDebugEnabled()) {
            LOG.debug("Re-sorting completed queue: " + csqueue);
          }
          childQueues.add(csqueue);
          break;
        }
      }
    }

    // If we skipped sort queue this time, we need to resort queues to make
    // sure we allocate from least usage (or order defined by queue policy)
    // queues. Only ever set the flag here; re-sorting just
    // completedChildQueue does not fix children whose re-sort was skipped by
    // an earlier call. sortAndGetChildrenAllocationIterator clears the flag
    // after rebuilding the whole set.
    if (!sortQueues) {
      needToResortQueuesAtNextAllocation = true;
    }
  }

  /**
   * Releases the (negated) delta of a container decrease request from this
   * queue, then forwards the request to the parent queue.
   */
  @Override
  public void decreaseContainer(Resource clusterResource,
      SchedContainerChangeRequest decreaseRequest, FiCaSchedulerApp app)
      throws InvalidResourceRequestException {
    // delta capacity is negative when it's a decrease request
    Resource absDeltaCapacity =
        Resources.negate(decreaseRequest.getDeltaCapacity());

    internalReleaseResource(clusterResource,
        csContext.getNode(decreaseRequest.getNodeId()), absDeltaCapacity, false,
        null, false);

    // Inform the parent
    if (parent != null) {
      parent.decreaseContainer(clusterResource, decreaseRequest, app);
    }
  }

  /**
   * Releases the reserved resource of an increase reservation, then forwards
   * the event to the parent queue. Does nothing when {@code app} is null.
   */
  @Override
  public void unreserveIncreasedContainer(Resource clusterResource,
      FiCaSchedulerApp app, FiCaSchedulerNode node, RMContainer rmContainer) {
    if (app != null) {
      internalReleaseResource(clusterResource, node,
          rmContainer.getReservedResource(), false, null, false);

      // Inform the parent
      if (parent != null) {
        parent.unreserveIncreasedContainer(clusterResource, app, node,
            rmContainer);
      }
    }
  }

  /**
   * Releases a completed container's resource from this queue, then
   * reports the completion to the parent with this queue as the completed
   * child. Does nothing when {@code application} is null.
   */
  @Override
  public void completedContainer(Resource clusterResource,
      FiCaSchedulerApp application, FiCaSchedulerNode node,
      RMContainer rmContainer, ContainerStatus containerStatus,
      RMContainerEventType event, CSQueue completedChildQueue,
      boolean sortQueues) {
    if (application != null) {
      internalReleaseResource(clusterResource, node,
          rmContainer.getContainer().getResource(), false, completedChildQueue,
          sortQueues);

      // Inform the parent
      if (parent != null) {
        // complete my parent
        parent.completedContainer(clusterResource, application,
            node, rmContainer, null, event, this, sortQueues);
      }
    }
  }

  /**
   * Propagates a cluster-resource change to every child, using each child's
   * default-partition limit, and then refreshes this queue's statistics.
   */
  @Override
  public synchronized void updateClusterResource(Resource clusterResource,
      ResourceLimits resourceLimits) {
    // Update all children
    for (CSQueue childQueue : childQueues) {
      // Get ResourceLimits of child queue before assign containers
      ResourceLimits childLimits = getResourceLimitsOfChild(childQueue,
          clusterResource, resourceLimits.getLimit(),
          RMNodeLabelsManager.NO_LABEL);
      childQueue.updateClusterResource(clusterResource, childLimits);
    }

    CSQueueUtils.updateQueueStatistics(resourceCalculator, clusterResource,
        minimumAllocation, this, labelManager, null);
  }

  /** Returns a snapshot copy of the children, in their current order. */
  @Override
  public synchronized List<CSQueue> getChildQueues() {
    return new ArrayList<CSQueue>(childQueues);
  }

  /**
   * Re-charges a recovered, non-completed container to this queue, then
   * forwards the recovery to the parent outside of this queue's lock.
   */
  @Override
  public void recoverContainer(Resource clusterResource,
      SchedulerApplicationAttempt attempt, RMContainer rmContainer) {
    if (rmContainer.getState().equals(RMContainerState.COMPLETED)) {
      return;
    }
    // Careful! Locking order is important!
    synchronized (this) {
      FiCaSchedulerNode node =
          scheduler.getNode(rmContainer.getContainer().getNodeId());
      allocateResource(clusterResource,
          rmContainer.getContainer().getResource(), node.getPartition(), false);
    }
    if (parent != null) {
      parent.recoverContainer(clusterResource, attempt, rmContainer);
    }
  }

  /** Always returns null; applications are only submitted to leaf queues. */
  @Override
  public ActiveUsersManager getActiveUsersManager() {
    // Should never be called since all applications are submitted to LeafQueues
    return null;
  }

  /** Adds the application attempts of every child queue to {@code apps}. */
  @Override
  public synchronized void collectSchedulerApplications(
      Collection<ApplicationAttemptId> apps) {
    for (CSQueue queue : childQueues) {
      queue.collectSchedulerApplications(apps);
    }
  }

  /**
   * Charges a container moved into this subtree to this queue, then forwards
   * the event to the parent. Does nothing when {@code application} is null.
   */
  @Override
  public void attachContainer(Resource clusterResource,
      FiCaSchedulerApp application, RMContainer rmContainer) {
    if (application != null) {
      FiCaSchedulerNode node =
          scheduler.getNode(rmContainer.getContainer().getNodeId());
      allocateResource(clusterResource, rmContainer.getContainer()
          .getResource(), node.getPartition(), false);
      LOG.info("movedContainer" + " queueMoveIn=" + getQueueName()
          + " usedCapacity=" + getUsedCapacity() + " absoluteUsedCapacity="
          + getAbsoluteUsedCapacity() + " used=" + queueUsage.getUsed() + " cluster="
          + clusterResource);
      // Inform the parent
      if (parent != null) {
        parent.attachContainer(clusterResource, application, rmContainer);
      }
    }
  }

  /**
   * Releases a container moved out of this subtree from this queue, then
   * forwards the event to the parent. Does nothing when {@code application}
   * is null.
   */
  @Override
  public void detachContainer(Resource clusterResource,
      FiCaSchedulerApp application, RMContainer rmContainer) {
    if (application != null) {
      FiCaSchedulerNode node =
          scheduler.getNode(rmContainer.getContainer().getNodeId());
      super.releaseResource(clusterResource,
          rmContainer.getContainer().getResource(),
          node.getPartition(), false);
      LOG.info("movedContainer" + " queueMoveOut=" + getQueueName()
          + " usedCapacity=" + getUsedCapacity() + " absoluteUsedCapacity="
          + getAbsoluteUsedCapacity() + " used=" + queueUsage.getUsed() + " cluster="
          + clusterResource);
      // Inform the parent
      if (parent != null) {
        parent.detachContainer(clusterResource, application, rmContainer);
      }
    }
  }

  /** Returns the number of unfinished applications submitted through here. */
  public synchronized int getNumApplications() {
    return numApplications;
  }

  /**
   * Charges {@code resource} to this queue. If that pushes the absolute used
   * capacity of {@code nodePartition} above the absolute maximum capacity,
   * killable containers are killed until the limit holds again.
   *
   * <p>This can happen because killable resource is deducted from usage when
   * computing child limits from the root. For example:
   * <pre>
   *          Root
   *          /  \
   *         a    b
   *       /  \
   *      a1  a2
   * </pre>
   * with
   * <ul>
   *   <li>a: max=10G, used=10G, killable=2G</li>
   *   <li>a1: used=8G, killable=2G</li>
   *   <li>a2: used=2G, pending=2G, killable=0G</li>
   * </ul>
   *
   * <p>When queue a is asked to allocate, even though it has reached its max,
   * its used resource is reduced by its killable resource. It can therefore
   * allocate at most 2G, and the ResourceLimits passed to a2 have a 2G
   * headroom.
   *
   * <p>If the scheduler then finds 2G free in the cluster and assigns it to
   * a2, a2's used becomes 2G + 2G = 4G, and a's used becomes 8G + 4G = 12G,
   * which exceeds 10G. Killable containers (on the same or other nodes) are
   * then preempted so that the parent's max resource is not violated.
   */
  synchronized void allocateResource(Resource clusterResource,
      Resource resource, String nodePartition, boolean changeContainerResource) {
    super.allocateResource(clusterResource, resource, nodePartition,
        changeContainerResource);

    if (getQueueCapacities().getAbsoluteMaximumCapacity(nodePartition)
        < getQueueCapacities().getAbsoluteUsedCapacity(nodePartition)) {
      killContainersToEnforceMaxQueueCapacity(nodePartition, clusterResource);
    }
  }

  /**
   * Kills killable containers of {@code partition}, one at a time, until this
   * queue's used resource no longer exceeds its maximum resource or there is
   * nothing left to kill. A container whose attempt or node can no longer be
   * found is skipped.
   */
  private void killContainersToEnforceMaxQueueCapacity(String partition,
      Resource clusterResource) {
    Iterator<RMContainer> killableContainerIter = getKillableContainers(
        partition);
    if (!killableContainerIter.hasNext()) {
      return;
    }

    // Pass the real cluster resource, consistent with
    // getResourceLimitsOfChild, rather than null.
    Resource partitionResource = labelManager.getResourceByLabel(partition,
        clusterResource);
    Resource maxResource = Resources.multiply(partitionResource,
        getQueueCapacities().getAbsoluteMaximumCapacity(partition));

    while (Resources.greaterThan(resourceCalculator, partitionResource,
        queueUsage.getUsed(partition), maxResource)) {
      RMContainer toKillContainer = killableContainerIter.next();
      FiCaSchedulerApp attempt = csContext.getApplicationAttempt(
          toKillContainer.getContainerId().getApplicationAttemptId());
      FiCaSchedulerNode node = csContext.getNode(
          toKillContainer.getAllocatedNode());
      if (null != attempt && null != node) {
        LeafQueue lq = attempt.getCSLeafQueue();
        lq.completedContainer(clusterResource, attempt, node, toKillContainer,
            SchedulerUtils.createPreemptedContainerStatus(
                toKillContainer.getContainerId(),
                SchedulerUtils.PREEMPTED_CONTAINER), RMContainerEventType.KILL,
            null, false);
        LOG.info("Killed container=" + toKillContainer.getContainerId()
            + " from queue=" + lq.getQueueName() + " to make queue=" + this
            .getQueueName() + "'s max-capacity enforced");
      }

      if (!killableContainerIter.hasNext()) {
        break;
      }
    }
  }
}