blob: a53fe4a391b72aaa58edace639b5a8bf8eb1fd4f [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity
.queuemanagement;
import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueueUtils;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity
.QueueManagementDynamicEditPolicy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler
.SchedulerDynamicEditException;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity
.AbstractAutoCreatedLeafQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity
.AutoCreatedLeafQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity
.AutoCreatedLeafQueueConfig;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity
.AutoCreatedQueueManagementPolicy;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity
.CapacitySchedulerContext;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity
.LeafQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity
.ManagedParentQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity
.ParentQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity
.QueueCapacities;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity
.QueueManagementChange;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica
.FiCaSchedulerApp;
import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.MonotonicClock;
import org.apache.hadoop.yarn.util.resource.Resources;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler
.capacity.CSQueueUtils.EPSILON;
/**
* Capacity Management policy for auto created leaf queues
* <p>
* Assigns capacity if available to leaf queues based on application
* submission order i.e leaf queues are assigned capacity in FCFS order based
* on application submission time. Updates leaf queue capacities to 0 when
* there are no pending or running apps under that queue.
*/
public class GuaranteedOrZeroCapacityOverTimePolicy
implements AutoCreatedQueueManagementPolicy {
private CapacitySchedulerContext scheduler;
private ManagedParentQueue managedParentQueue;
private static final Logger LOG =
LoggerFactory.getLogger(GuaranteedOrZeroCapacityOverTimePolicy.class);
private ReentrantReadWriteLock.WriteLock writeLock;
private ReentrantReadWriteLock.ReadLock readLock;
private ParentQueueState parentQueueState = new ParentQueueState();
private AutoCreatedLeafQueueConfig leafQueueTemplate;
private QueueCapacities leafQueueTemplateCapacities;
private Set<String> leafQueueTemplateNodeLabels;
private LeafQueueState leafQueueState = new LeafQueueState();
private Clock clock = new MonotonicClock();
private class LeafQueueState {
//map of partition-> queueName->{leaf queue's state}
private Map<String, Map<String, LeafQueueStatePerPartition>>
leafQueueStateMap = new HashMap<>();
public boolean containsLeafQueue(String leafQueueName, String partition) {
if (leafQueueStateMap.containsKey(partition)) {
return leafQueueStateMap.get(partition).containsKey(leafQueueName);
}
return false;
}
private boolean containsPartition(String partition) {
if (leafQueueStateMap.containsKey(partition)) {
return true;
}
return false;
}
private boolean addLeafQueueStateIfNotExists(String leafQueuePath,
String partition, LeafQueueStatePerPartition leafQueueState) {
if (!containsPartition(partition)) {
leafQueueStateMap.put(partition, new HashMap<>());
}
if (!containsLeafQueue(leafQueuePath, partition)) {
leafQueueStateMap.get(partition).put(leafQueuePath, leafQueueState);
return true;
}
return false;
}
public boolean createLeafQueueStateIfNotExists(LeafQueue leafQueue,
String partition) {
return addLeafQueueStateIfNotExists(leafQueue.getQueuePath(), partition,
new LeafQueueStatePerPartition());
}
public LeafQueueStatePerPartition getLeafQueueStatePerPartition(
String leafQueuePath, String partition) {
if (leafQueueStateMap.get(partition) != null) {
return leafQueueStateMap.get(partition).get(leafQueuePath);
}
return null;
}
public Map<String, Map<String, LeafQueueStatePerPartition>>
getLeafQueueStateMap() {
return leafQueueStateMap;
}
private void clear() {
leafQueueStateMap.clear();
}
}
private class LeafQueueStatePerPartition {
private AtomicBoolean isActive = new AtomicBoolean(false);
private long mostRecentActivationTime;
private long mostRecentDeactivationTime;
public long getMostRecentActivationTime() {
return mostRecentActivationTime;
}
public long getMostRecentDeactivationTime() {
return mostRecentDeactivationTime;
}
/**
* Is the queue currently active or deactivated?
*
* @return true if Active else false
*/
public boolean isActive() {
return isActive.get();
}
private boolean activate() {
boolean ret = isActive.compareAndSet(false, true);
mostRecentActivationTime = clock.getTime();
return ret;
}
private boolean deactivate() {
boolean ret = isActive.compareAndSet(true, false);
mostRecentDeactivationTime = clock.getTime();
return ret;
}
}
private class ParentQueueState {
private Map<String, Float> totalAbsoluteActivatedChildQueueCapacityByLabel =
new HashMap<String, Float>();
private float getAbsoluteActivatedChildQueueCapacity(String nodeLabel) {
readLock.lock();
try {
Float totalActivatedCapacity = getAbsActivatedChildQueueCapacityByLabel(
nodeLabel);
if (totalActivatedCapacity != null) {
return totalActivatedCapacity;
} else{
return 0;
}
} finally {
readLock.unlock();
}
}
private void incAbsoluteActivatedChildCapacity(String nodeLabel,
float childQueueCapacity) {
writeLock.lock();
try {
Float activatedChildCapacity = getAbsActivatedChildQueueCapacityByLabel(
nodeLabel);
if (activatedChildCapacity != null) {
setAbsActivatedChildQueueCapacityByLabel(nodeLabel,
activatedChildCapacity + childQueueCapacity);
} else{
setAbsActivatedChildQueueCapacityByLabel(nodeLabel,
childQueueCapacity);
}
} finally {
writeLock.unlock();
}
}
private void decAbsoluteActivatedChildCapacity(String nodeLabel,
float childQueueCapacity) {
writeLock.lock();
try {
Float activatedChildCapacity = getAbsActivatedChildQueueCapacityByLabel(
nodeLabel);
if (activatedChildCapacity != null) {
setAbsActivatedChildQueueCapacityByLabel(nodeLabel,
activatedChildCapacity - childQueueCapacity);
} else{
setAbsActivatedChildQueueCapacityByLabel(nodeLabel,
childQueueCapacity);
}
} finally {
writeLock.unlock();
}
}
Float getAbsActivatedChildQueueCapacityByLabel(String label) {
return totalAbsoluteActivatedChildQueueCapacityByLabel.get(label);
}
Float setAbsActivatedChildQueueCapacityByLabel(String label, float val) {
return totalAbsoluteActivatedChildQueueCapacityByLabel.put(label, val);
}
void clear() {
totalAbsoluteActivatedChildQueueCapacityByLabel.clear();
}
}
/**
* Comparator that orders applications by their submit time
*/
private class PendingApplicationComparator
implements Comparator<FiCaSchedulerApp> {
@Override
public int compare(FiCaSchedulerApp app1, FiCaSchedulerApp app2) {
RMApp rmApp1 = scheduler.getRMContext().getRMApps().get(
app1.getApplicationId());
RMApp rmApp2 = scheduler.getRMContext().getRMApps().get(
app2.getApplicationId());
if (rmApp1 != null && rmApp2 != null) {
return Long.compare(rmApp1.getSubmitTime(), rmApp2.getSubmitTime());
} else if (rmApp1 != null) {
return -1;
} else if (rmApp2 != null) {
return 1;
} else{
return 0;
}
}
}
private PendingApplicationComparator applicationComparator =
new PendingApplicationComparator();
@Override
public void init(final CapacitySchedulerContext schedulerContext,
final ParentQueue parentQueue) throws IOException {
this.scheduler = schedulerContext;
ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
readLock = lock.readLock();
writeLock = lock.writeLock();
if (!(parentQueue instanceof ManagedParentQueue)) {
throw new IllegalArgumentException(
"Expected instance of type " + ManagedParentQueue.class);
}
this.managedParentQueue = (ManagedParentQueue) parentQueue;
initializeLeafQueueTemplate(this.managedParentQueue);
LOG.info(
"Initialized queue management policy for parent queue " + parentQueue
.getQueuePath() + " with leaf queue template capacities : ["
+ leafQueueTemplate.getQueueCapacities() + "]");
}
private void initializeLeafQueueTemplate(ManagedParentQueue parentQueue)
throws IOException {
leafQueueTemplate = parentQueue.getLeafQueueTemplate();
leafQueueTemplateCapacities = leafQueueTemplate.getQueueCapacities();
Set<String> parentQueueLabels = parentQueue.getNodeLabelsForQueue();
for (String nodeLabel : leafQueueTemplateCapacities
.getExistingNodeLabels()) {
if (!parentQueueLabels.contains(nodeLabel)) {
LOG.error("Invalid node label " + nodeLabel
+ " on configured leaf template on parent" + " queue " + parentQueue
.getQueuePath());
throw new IOException("Invalid node label " + nodeLabel
+ " on configured leaf template on parent" + " queue " + parentQueue
.getQueuePath());
}
}
leafQueueTemplateNodeLabels =
leafQueueTemplateCapacities.getExistingNodeLabels();
}
/**
* Compute/Adjust child queue capacities
* for auto created leaf queues
* This computes queue entitlements but does not update LeafQueueState or
* queue capacities. Scheduler calls commitQueueManagemetChanges after
* validation after applying queue changes and commits to LeafQueueState
* are done in commitQueueManagementChanges.
*
* @return List of Queue Management change suggestions which could potentially
* be committed/rejected by the scheduler due to validation failures
* @throws SchedulerDynamicEditException
*/
@Override
public List<QueueManagementChange> computeQueueManagementChanges()
throws SchedulerDynamicEditException {
// Update template absolute capacities as the capacities could have changed
// in weight mode
updateTemplateAbsoluteCapacities(managedParentQueue.getQueueCapacities(),
(GuaranteedOrZeroCapacityOverTimePolicy)
managedParentQueue.getAutoCreatedQueueManagementPolicy());
//TODO : Add support for node labels on leaf queue template configurations
//synch/add missing leaf queue(s) if any to state
updateLeafQueueState();
readLock.lock();
try {
List<QueueManagementChange> queueManagementChanges = new ArrayList<>();
List<FiCaSchedulerApp> pendingApps = getSortedPendingApplications();
//Map of LeafQueue->QueueCapacities - keep adding the computed
// entitlements to this map and finally
// build the leaf queue configuration Template for all identified leaf
// queues
Map<String, QueueCapacities> leafQueueEntitlements = new HashMap<>();
for (String nodeLabel : leafQueueTemplateNodeLabels) {
// check if any leaf queues need to be deactivated based on pending
// applications
float parentAbsoluteCapacity =
managedParentQueue.getQueueCapacities().getAbsoluteCapacity(
nodeLabel);
float leafQueueTemplateAbsoluteCapacity =
leafQueueTemplateCapacities.getAbsoluteCapacity(nodeLabel);
Map<String, QueueCapacities> deactivatedLeafQueues =
deactivateLeafQueuesIfInActive(managedParentQueue, nodeLabel,
leafQueueEntitlements);
if (LOG.isDebugEnabled()) {
if ( deactivatedLeafQueues.size() > 0) {
LOG.debug("Parent queue = {}, " +
", nodeLabel = {}, deactivated leaf queues = [{}] ",
managedParentQueue.getQueuePath(), nodeLabel,
deactivatedLeafQueues.size() > 25 ? deactivatedLeafQueues
.size() : deactivatedLeafQueues);
}
}
float deactivatedCapacity = getTotalDeactivatedCapacity(
deactivatedLeafQueues, nodeLabel);
float sumOfChildQueueActivatedCapacity = parentQueueState.
getAbsoluteActivatedChildQueueCapacity(nodeLabel);
//Check if we need to activate anything at all?
float availableCapacity =
parentAbsoluteCapacity - sumOfChildQueueActivatedCapacity
+ deactivatedCapacity + EPSILON;
if (LOG.isDebugEnabled()) {
LOG.debug("Parent queue = " + managedParentQueue.getQueuePath()
+ ", nodeLabel = " + nodeLabel + ", absCapacity = "
+ parentAbsoluteCapacity + ", leafQueueAbsoluteCapacity = "
+ leafQueueTemplateAbsoluteCapacity + ", deactivatedCapacity = "
+ deactivatedCapacity + " , absChildActivatedCapacity = "
+ sumOfChildQueueActivatedCapacity + ", availableCapacity = "
+ availableCapacity);
}
if (availableCapacity >= leafQueueTemplateAbsoluteCapacity) {
//sort applications across leaf queues by submit time
if (pendingApps.size() > 0) {
int maxLeafQueuesTobeActivated = getMaxLeavesToBeActivated(
availableCapacity, leafQueueTemplateAbsoluteCapacity,
pendingApps.size());
if (LOG.isDebugEnabled()) {
LOG.debug("Parent queue = " + managedParentQueue.getQueuePath()
+ " : Found " + maxLeafQueuesTobeActivated + " leaf queues"
+ " to be activated with " + pendingApps.size() + " apps ");
}
LinkedHashSet<String> leafQueuesToBeActivated = getSortedLeafQueues(
nodeLabel, pendingApps, maxLeafQueuesTobeActivated,
deactivatedLeafQueues.keySet());
//Compute entitlement changes for the identified leaf queues
// which is appended to the List of computedEntitlements
updateLeafQueueCapacitiesByLabel(nodeLabel, leafQueuesToBeActivated,
leafQueueEntitlements);
if (LOG.isDebugEnabled()) {
if (leafQueuesToBeActivated.size() > 0) {
LOG.debug("Activated leaf queues : [{}]",
leafQueuesToBeActivated.size() < 25 ?
leafQueuesToBeActivated : leafQueuesToBeActivated.size());
}
}
}
}
}
//Populate new entitlements
for (final Iterator<Map.Entry<String, QueueCapacities>> iterator =
leafQueueEntitlements.entrySet().iterator(); iterator.hasNext(); ) {
Map.Entry<String, QueueCapacities> queueCapacities = iterator.next();
String leafQueueName = queueCapacities.getKey();
AutoCreatedLeafQueue leafQueue =
(AutoCreatedLeafQueue) scheduler.getCapacitySchedulerQueueManager()
.getQueue(leafQueueName);
AutoCreatedLeafQueueConfig newTemplate = buildTemplate(
queueCapacities.getValue());
queueManagementChanges.add(
new QueueManagementChange.UpdateQueue(leafQueue, newTemplate));
}
return queueManagementChanges;
} finally {
readLock.unlock();
}
}
private void updateTemplateAbsoluteCapacities(QueueCapacities parentQueueCapacities,
GuaranteedOrZeroCapacityOverTimePolicy policy) {
writeLock.lock();
try {
CSQueueUtils.updateAbsoluteCapacitiesByNodeLabels(
policy.leafQueueTemplate.getQueueCapacities(),
parentQueueCapacities, policy.leafQueueTemplateNodeLabels);
policy.leafQueueTemplateCapacities =
policy.leafQueueTemplate.getQueueCapacities();
} finally {
writeLock.unlock();
}
}
public void updateTemplateAbsoluteCapacities(QueueCapacities queueCapacities) {
updateTemplateAbsoluteCapacities(queueCapacities, this);
}
private float getTotalDeactivatedCapacity(
Map<String, QueueCapacities> deactivatedLeafQueues, String nodeLabel) {
float deactivatedCapacity = 0;
for (Iterator<Map.Entry<String, QueueCapacities>> iterator =
deactivatedLeafQueues.entrySet().iterator(); iterator.hasNext(); ) {
Map.Entry<String, QueueCapacities> deactivatedQueueCapacity =
iterator.next();
deactivatedCapacity +=
deactivatedQueueCapacity.getValue().getAbsoluteCapacity(nodeLabel);
}
return deactivatedCapacity;
}
@VisibleForTesting
void updateLeafQueueState() {
writeLock.lock();
try {
Set<String> newPartitions = new HashSet<>();
Set<String> newQueues = new HashSet<>();
for (CSQueue newQueue : managedParentQueue.getChildQueues()) {
if (newQueue instanceof LeafQueue) {
for (String nodeLabel : leafQueueTemplateNodeLabels) {
leafQueueState.createLeafQueueStateIfNotExists((LeafQueue) newQueue,
nodeLabel);
newPartitions.add(nodeLabel);
}
newQueues.add(newQueue.getQueuePath());
}
}
for (Iterator<Map.Entry<String, Map<String, LeafQueueStatePerPartition>>>
itr = leafQueueState.getLeafQueueStateMap().entrySet().iterator();
itr.hasNext(); ) {
Map.Entry<String, Map<String, LeafQueueStatePerPartition>> e =
itr.next();
String partition = e.getKey();
if (!newPartitions.contains(partition)) {
itr.remove();
LOG.info(managedParentQueue.getQueuePath() +
" : Removed partition " + partition + " from leaf queue " +
"state");
} else{
Map<String, LeafQueueStatePerPartition> queues = e.getValue();
for (
Iterator<Map.Entry<String, LeafQueueStatePerPartition>> queueItr =
queues.entrySet().iterator(); queueItr.hasNext(); ) {
String queue = queueItr.next().getKey();
if (!newQueues.contains(queue)) {
queueItr.remove();
LOG.info(managedParentQueue.getQueuePath() + " : Removed queue"
+ queue + " from "
+ "leaf queue "
+ "state from partition " + partition);
}
}
}
}
} finally {
writeLock.unlock();
}
}
private LinkedHashSet<String> getSortedLeafQueues(String nodeLabel,
final List<FiCaSchedulerApp> pendingApps, int leafQueuesNeeded,
Set<String> deactivatedQueues) throws SchedulerDynamicEditException {
LinkedHashSet<String> leafQueues = new LinkedHashSet<>(leafQueuesNeeded);
int ctr = 0;
for (FiCaSchedulerApp app : pendingApps) {
AutoCreatedLeafQueue leafQueue =
(AutoCreatedLeafQueue) app.getCSLeafQueue();
String leafQueueName = leafQueue.getQueuePath();
//Check if leafQueue is not active already and has any pending apps
if (ctr < leafQueuesNeeded) {
if (!isActive(leafQueue, nodeLabel)) {
if (!deactivatedQueues.contains(leafQueueName)) {
if (addLeafQueueIfNotExists(leafQueues, leafQueueName)) {
ctr++;
}
}
}
} else{
break;
}
}
return leafQueues;
}
private boolean addLeafQueueIfNotExists(Set<String> leafQueues,
String leafQueueName) {
boolean ret = false;
if (!leafQueues.contains(leafQueueName)) {
ret = leafQueues.add(leafQueueName);
}
return ret;
}
@VisibleForTesting
public boolean isActive(final AutoCreatedLeafQueue leafQueue,
String nodeLabel) throws SchedulerDynamicEditException {
readLock.lock();
try {
LeafQueueStatePerPartition leafQueueStatus = getLeafQueueState(leafQueue,
nodeLabel);
return leafQueueStatus.isActive();
} finally {
readLock.unlock();
}
}
private Map<String, QueueCapacities> deactivateLeafQueuesIfInActive(
ParentQueue parentQueue, String nodeLabel,
Map<String, QueueCapacities> leafQueueEntitlements)
throws SchedulerDynamicEditException {
Map<String, QueueCapacities> deactivatedQueues = new HashMap<>();
for (CSQueue childQueue : parentQueue.getChildQueues()) {
AutoCreatedLeafQueue leafQueue = (AutoCreatedLeafQueue) childQueue;
if (leafQueue != null) {
if (isActive(leafQueue, nodeLabel) && !hasPendingApps(leafQueue)) {
if (!leafQueueEntitlements.containsKey(leafQueue.getQueuePath())) {
leafQueueEntitlements.put(leafQueue.getQueuePath(),
new QueueCapacities(false));
}
QueueCapacities capacities = leafQueueEntitlements.get(
leafQueue.getQueuePath());
updateToZeroCapacity(capacities, nodeLabel, (LeafQueue)childQueue);
deactivatedQueues.put(leafQueue.getQueuePath(),
leafQueueTemplateCapacities);
}
} else{
LOG.warn("Could not find queue in scheduler while trying" + " to "
+ "deactivate for " + parentQueue);
}
}
return deactivatedQueues;
}
private void updateLeafQueueCapacitiesByLabel(String nodeLabel,
Set<String> leafQueuesToBeActivated,
Map<String, QueueCapacities> leafQueueEntitlements) {
for (String curLeafQueue : leafQueuesToBeActivated) {
if (!leafQueueEntitlements.containsKey(curLeafQueue)) {
leafQueueEntitlements.put(curLeafQueue, new QueueCapacities(false));
// Activate queues if capacity is available
}
QueueCapacities capacities = leafQueueEntitlements.get(curLeafQueue);
updateCapacityFromTemplate(capacities, nodeLabel);
}
}
@VisibleForTesting
public int getMaxLeavesToBeActivated(float availableCapacity,
float childQueueAbsoluteCapacity, int numPendingApps)
throws SchedulerDynamicEditException {
if (childQueueAbsoluteCapacity > 0) {
int numLeafQueuesNeeded = (int) Math.floor(
availableCapacity / childQueueAbsoluteCapacity);
return Math.min(numLeafQueuesNeeded, numPendingApps);
}
return 0;
}
/**
* Commit queue management changes - which involves updating required state
* on parent/underlying leaf queues
*
* @param queueManagementChanges Queue Management changes to commit
* @throws SchedulerDynamicEditException when validation fails
*/
@Override
public void commitQueueManagementChanges(
List<QueueManagementChange> queueManagementChanges)
throws SchedulerDynamicEditException {
writeLock.lock();
try {
for (QueueManagementChange queueManagementChange :
queueManagementChanges) {
AutoCreatedLeafQueueConfig updatedQueueTemplate =
queueManagementChange.getUpdatedQueueTemplate();
CSQueue queue = queueManagementChange.getQueue();
if (!(queue instanceof AutoCreatedLeafQueue)) {
throw new SchedulerDynamicEditException(
"Expected queue management change for AutoCreatedLeafQueue. "
+ "Found " + queue.getClass().getName());
}
AutoCreatedLeafQueue leafQueue = (AutoCreatedLeafQueue) queue;
for (String nodeLabel : updatedQueueTemplate.getQueueCapacities()
.getExistingNodeLabels()) {
if (updatedQueueTemplate.getQueueCapacities().
getCapacity(nodeLabel) > 0) {
if (isActive(leafQueue, nodeLabel)) {
LOG.debug("Queue is already active. Skipping activation : {}",
leafQueue.getQueuePath());
} else{
activate(leafQueue, nodeLabel);
}
} else{
if (!isActive(leafQueue, nodeLabel)) {
LOG.debug("Queue is already de-activated. Skipping "
+ "de-activation : {}", leafQueue.getQueuePath());
} else{
/**
* While deactivating queues of type ABSOLUTE_RESOURCE, configured
* min resource has to be set based on updated capacity (which is
* again based on updated queue entitlements). Otherwise,
* ParentQueue#calculateEffectiveResourcesAndCapacity calculations
* leads to incorrect results.
*/
leafQueue
.mergeCapacities(updatedQueueTemplate.getQueueCapacities());
leafQueue.getQueueResourceQuotas()
.setConfiguredMinResource(Resources.multiply(
this.scheduler.getClusterResource(), updatedQueueTemplate
.getQueueCapacities().getCapacity(nodeLabel)));
deactivate(leafQueue, nodeLabel);
}
}
}
}
} finally {
writeLock.unlock();
}
}
private void activate(final AbstractAutoCreatedLeafQueue leafQueue,
String nodeLabel) throws SchedulerDynamicEditException {
writeLock.lock();
try {
getLeafQueueState(leafQueue, nodeLabel).activate();
parentQueueState.incAbsoluteActivatedChildCapacity(nodeLabel,
leafQueueTemplateCapacities.getAbsoluteCapacity(nodeLabel));
} finally {
writeLock.unlock();
}
}
private void deactivate(final AbstractAutoCreatedLeafQueue leafQueue,
String nodeLabel) throws SchedulerDynamicEditException {
writeLock.lock();
try {
getLeafQueueState(leafQueue, nodeLabel).deactivate();
parentQueueState.decAbsoluteActivatedChildCapacity(nodeLabel,
leafQueueTemplateCapacities.getAbsoluteCapacity(nodeLabel));
} finally {
writeLock.unlock();
}
}
public boolean hasPendingApps(final AutoCreatedLeafQueue leafQueue) {
return leafQueue.getNumApplications() > 0;
}
@Override
public void reinitialize(CapacitySchedulerContext schedulerContext,
final ParentQueue parentQueue) throws IOException {
if (!(parentQueue instanceof ManagedParentQueue)) {
throw new IllegalStateException(
"Expected instance of type " + ManagedParentQueue.class + " found "
+ " : " + parentQueue.getClass());
}
if (this.managedParentQueue != null && !parentQueue.getQueuePath().equals(
this.managedParentQueue.getQueuePath())) {
throw new IllegalStateException(
"Expected parent queue path to match " + this.managedParentQueue
.getQueuePath() + " found : " + parentQueue.getQueuePath());
}
this.managedParentQueue = (ManagedParentQueue) parentQueue;
initializeLeafQueueTemplate(this.managedParentQueue);
//clear state
parentQueueState.clear();
leafQueueState.clear();
LOG.info(
"Reinitialized queue management policy for parent queue " + parentQueue
.getQueuePath() + " with leaf queue template " + "capacities : ["
+ leafQueueTemplate.getQueueCapacities() + "]");
}
@Override
public AutoCreatedLeafQueueConfig getInitialLeafQueueConfiguration(
AbstractAutoCreatedLeafQueue leafQueue)
throws SchedulerDynamicEditException {
AutoCreatedLeafQueueConfig template;
if (!(leafQueue instanceof AutoCreatedLeafQueue)) {
throw new SchedulerDynamicEditException(
"Not an instance of " + "AutoCreatedLeafQueue : " + leafQueue
.getClass());
}
writeLock.lock();
try {
QueueCapacities capacities = new QueueCapacities(false);
for (String nodeLabel : leafQueueTemplateNodeLabels) {
if (!leafQueueState.createLeafQueueStateIfNotExists(leafQueue,
nodeLabel)) {
String message =
"Leaf queue already exists in state : " + getLeafQueueState(
leafQueue, nodeLabel);
LOG.error(message);
}
float availableCapacity = managedParentQueue.getQueueCapacities().
getAbsoluteCapacity(nodeLabel) - parentQueueState.
getAbsoluteActivatedChildQueueCapacity(nodeLabel) + EPSILON;
if (availableCapacity >= leafQueueTemplateCapacities
.getAbsoluteCapacity(nodeLabel)) {
updateCapacityFromTemplate(capacities, nodeLabel);
activate(leafQueue, nodeLabel);
} else{
updateToZeroCapacity(capacities, nodeLabel, leafQueue);
}
}
template = buildTemplate(capacities);
} finally {
writeLock.unlock();
}
return template;
}
private void updateToZeroCapacity(QueueCapacities capacities,
String nodeLabel, LeafQueue leafQueue) {
capacities.setCapacity(nodeLabel, 0.0f);
capacities.setMaximumCapacity(nodeLabel,
leafQueueTemplateCapacities.getMaximumCapacity(nodeLabel));
leafQueue.getQueueResourceQuotas().
setConfiguredMinResource(nodeLabel, Resource.newInstance(0, 0));
}
private void updateCapacityFromTemplate(QueueCapacities capacities,
String nodeLabel) {
capacities.setCapacity(nodeLabel,
leafQueueTemplateCapacities.getCapacity(nodeLabel));
capacities.setMaximumCapacity(nodeLabel,
leafQueueTemplateCapacities.getMaximumCapacity(nodeLabel));
capacities.setAbsoluteCapacity(nodeLabel,
leafQueueTemplateCapacities.getAbsoluteCapacity(nodeLabel));
capacities.setAbsoluteMaximumCapacity(nodeLabel,
leafQueueTemplateCapacities.getAbsoluteMaximumCapacity(nodeLabel));
}
@VisibleForTesting
LeafQueueStatePerPartition getLeafQueueState(LeafQueue queue,
String partition) throws SchedulerDynamicEditException {
readLock.lock();
try {
String queuePath = queue.getQueuePath();
if (!leafQueueState.containsLeafQueue(queuePath, partition)) {
throw new SchedulerDynamicEditException(
"Could not find leaf queue in " + "state " + queuePath);
} else{
return leafQueueState.
getLeafQueueStatePerPartition(queuePath, partition);
}
} finally {
readLock.unlock();
}
}
@VisibleForTesting
public float getAbsoluteActivatedChildQueueCapacity(String nodeLabel) {
return parentQueueState.getAbsoluteActivatedChildQueueCapacity(nodeLabel);
}
private List<FiCaSchedulerApp> getSortedPendingApplications() {
List<FiCaSchedulerApp> apps = new ArrayList<>(
managedParentQueue.getAllApplications());
Collections.sort(apps, applicationComparator);
return apps;
}
private AutoCreatedLeafQueueConfig buildTemplate(QueueCapacities capacities) {
AutoCreatedLeafQueueConfig.Builder templateBuilder =
new AutoCreatedLeafQueueConfig.Builder();
templateBuilder.capacities(capacities);
return new AutoCreatedLeafQueueConfig(templateBuilder);
}
}