/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableSet;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingEditPolicy;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.ParentQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueueCapacities;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.PreemptableQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerPreemptEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType;
import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.SystemClock;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.apache.hadoop.yarn.util.resource.Resources;
import java.io.IOException;
import java.text.MessageFormat;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
/**
* This class implements a {@link SchedulingEditPolicy} that is designed to be
* paired with the {@code CapacityScheduler}. At every invocation of {@code
* editSchedule()} it computes the ideal amount of resources assigned to each
* queue (for each queue in the hierarchy), and determines whether preemption
* is needed. Overcapacity is distributed among queues in a weighted fair manner,
* where the weight is the amount of guaranteed capacity for the queue.
* Based on this ideal assignment it determines whether preemption is required
* and selects a set of containers from each application that would be killed if
* the corresponding amount of resources is not freed up by the application.
*
* If not in {@code observeOnly} mode, it triggers preemption requests via a
* {@link ContainerPreemptEvent} that the {@code ResourceManager} will
* deliver to the application (or execute directly).
*
* If the deficit of resources persists over a long enough period of time,
* this policy triggers forced termination of containers (again by
* generating {@link ContainerPreemptEvent}s).
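*
* <p>A minimal sketch of how this policy is typically enabled (assuming the
* standard scheduler-monitor keys in {@code YarnConfiguration}):
*
* <pre>{@code
* Configuration conf = new YarnConfiguration();
* conf.setBoolean(YarnConfiguration.RM_SCHEDULER_ENABLE_MONITORS, true);
* conf.set(YarnConfiguration.RM_SCHEDULER_MONITOR_POLICIES,
*     ProportionalCapacityPreemptionPolicy.class.getCanonicalName());
* }</pre>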
*/
public class ProportionalCapacityPreemptionPolicy
implements SchedulingEditPolicy, CapacitySchedulerPreemptionContext {
/**
* IntraQueuePreemptionOrderPolicy defines the priority orders for
* intra-queue preemption that can be configured by an admin.
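*
* <p>A minimal sketch of selecting an order (the value is parsed
* case-insensitively when the configuration is loaded):
*
* <pre>{@code
* CapacitySchedulerConfiguration csConf =
*     new CapacitySchedulerConfiguration();
* csConf.set(
*     CapacitySchedulerConfiguration.INTRAQUEUE_PREEMPTION_ORDER_POLICY,
*     "priority_first");
* }</pre>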
*/
@Unstable
public enum IntraQueuePreemptionOrderPolicy {
PRIORITY_FIRST, USERLIMIT_FIRST;
}
private static final Log LOG =
LogFactory.getLog(ProportionalCapacityPreemptionPolicy.class);
private final Clock clock;
// Configurable fields
private double maxIgnoredOverCapacity;
private long maxWaitTime;
private long monitoringInterval;
private float percentageClusterPreemptionAllowed;
private double naturalTerminationFactor;
private boolean observeOnly;
private boolean lazyPreemptionEnabled;
private float maxAllowableLimitForIntraQueuePreemption;
private float minimumThresholdForIntraQueuePreemption;
private IntraQueuePreemptionOrderPolicy intraQueuePreemptionOrderPolicy;
// Current configuration
private CapacitySchedulerConfiguration csConfig;
// Pointer to other RM components
private RMContext rmContext;
private ResourceCalculator rc;
private CapacityScheduler scheduler;
private RMNodeLabelsManager nlm;
// Internal properties to make decisions of what to preempt
private final Map<RMContainer,Long> preemptionCandidates =
new HashMap<>();
private Map<String, Map<String, TempQueuePerPartition>> queueToPartitions =
new HashMap<>();
private Map<String, LinkedHashSet<String>> partitionToUnderServedQueues =
new HashMap<>();
private List<PreemptionCandidatesSelector> candidatesSelectionPolicies;
private Set<String> allPartitions;
private Set<String> leafQueueNames;
// Preemptable Entities, synced from scheduler at every run
private Map<String, PreemptableQueue> preemptableQueues;
private Set<ContainerId> killableContainers;
public ProportionalCapacityPreemptionPolicy() {
clock = SystemClock.getInstance();
allPartitions = Collections.emptySet();
leafQueueNames = Collections.emptySet();
preemptableQueues = Collections.emptyMap();
}
@VisibleForTesting
public ProportionalCapacityPreemptionPolicy(RMContext context,
CapacityScheduler scheduler, Clock clock) {
init(context.getYarnConfiguration(), context, scheduler);
this.clock = clock;
allPartitions = Collections.emptySet();
leafQueueNames = Collections.emptySet();
preemptableQueues = Collections.emptyMap();
}
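/**
* Wires this policy to the RM: verifies that the configured scheduler is a
* {@link CapacityScheduler}, caches pointers to the resource calculator and
* node labels manager, and loads the preemption-related configuration.
*/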
public void init(Configuration config, RMContext context,
ResourceScheduler sched) {
LOG.info("Preemption monitor: " + this.getClass().getCanonicalName());
assert null == scheduler : "Unexpected duplicate call to init";
if (!(sched instanceof CapacityScheduler)) {
throw new YarnRuntimeException("Class " +
sched.getClass().getCanonicalName() + " not instance of " +
CapacityScheduler.class.getCanonicalName());
}
rmContext = context;
scheduler = (CapacityScheduler) sched;
rc = scheduler.getResourceCalculator();
nlm = scheduler.getRMContext().getNodeLabelManager();
updateConfigIfNeeded();
}
private void updateConfigIfNeeded() {
CapacitySchedulerConfiguration config = scheduler.getConfiguration();
if (config == csConfig) {
return;
}
maxIgnoredOverCapacity = config.getDouble(
CapacitySchedulerConfiguration.PREEMPTION_MAX_IGNORED_OVER_CAPACITY,
CapacitySchedulerConfiguration.DEFAULT_PREEMPTION_MAX_IGNORED_OVER_CAPACITY);
naturalTerminationFactor = config.getDouble(
CapacitySchedulerConfiguration.PREEMPTION_NATURAL_TERMINATION_FACTOR,
CapacitySchedulerConfiguration.DEFAULT_PREEMPTION_NATURAL_TERMINATION_FACTOR);
maxWaitTime = config.getLong(
CapacitySchedulerConfiguration.PREEMPTION_WAIT_TIME_BEFORE_KILL,
CapacitySchedulerConfiguration.DEFAULT_PREEMPTION_WAIT_TIME_BEFORE_KILL);
monitoringInterval = config.getLong(
CapacitySchedulerConfiguration.PREEMPTION_MONITORING_INTERVAL,
CapacitySchedulerConfiguration.DEFAULT_PREEMPTION_MONITORING_INTERVAL);
percentageClusterPreemptionAllowed = config.getFloat(
CapacitySchedulerConfiguration.TOTAL_PREEMPTION_PER_ROUND,
CapacitySchedulerConfiguration.DEFAULT_TOTAL_PREEMPTION_PER_ROUND);
observeOnly = config.getBoolean(
CapacitySchedulerConfiguration.PREEMPTION_OBSERVE_ONLY,
CapacitySchedulerConfiguration.DEFAULT_PREEMPTION_OBSERVE_ONLY);
lazyPreemptionEnabled = config.getBoolean(
CapacitySchedulerConfiguration.LAZY_PREEMPTION_ENABLED,
CapacitySchedulerConfiguration.DEFAULT_LAZY_PREEMPTION_ENABLED);
maxAllowableLimitForIntraQueuePreemption = config.getFloat(
CapacitySchedulerConfiguration.
INTRAQUEUE_PREEMPTION_MAX_ALLOWABLE_LIMIT,
CapacitySchedulerConfiguration.
DEFAULT_INTRAQUEUE_PREEMPTION_MAX_ALLOWABLE_LIMIT);
minimumThresholdForIntraQueuePreemption = config.getFloat(
CapacitySchedulerConfiguration.
INTRAQUEUE_PREEMPTION_MINIMUM_THRESHOLD,
CapacitySchedulerConfiguration.
DEFAULT_INTRAQUEUE_PREEMPTION_MINIMUM_THRESHOLD);
intraQueuePreemptionOrderPolicy = IntraQueuePreemptionOrderPolicy
.valueOf(config
.get(
CapacitySchedulerConfiguration.INTRAQUEUE_PREEMPTION_ORDER_POLICY,
CapacitySchedulerConfiguration.DEFAULT_INTRAQUEUE_PREEMPTION_ORDER_POLICY)
.toUpperCase());
candidatesSelectionPolicies = new ArrayList<>();
// Do we need the queue-priority preemption policy?
boolean isQueuePriorityPreemptionEnabled =
config.getPUOrderingPolicyUnderUtilizedPreemptionEnabled();
if (isQueuePriorityPreemptionEnabled) {
candidatesSelectionPolicies.add(
new QueuePriorityContainerCandidateSelector(this));
}
// Do we need to specially consider reserved containers?
boolean selectCandidatesForReservedContainers = config.getBoolean(
CapacitySchedulerConfiguration.
PREEMPTION_SELECT_CANDIDATES_FOR_RESERVED_CONTAINERS,
CapacitySchedulerConfiguration.
DEFAULT_PREEMPTION_SELECT_CANDIDATES_FOR_RESERVED_CONTAINERS);
if (selectCandidatesForReservedContainers) {
candidatesSelectionPolicies
.add(new ReservedContainerCandidatesSelector(this));
}
boolean additionalPreemptionBasedOnReservedResource = config.getBoolean(
CapacitySchedulerConfiguration.ADDITIONAL_RESOURCE_BALANCE_BASED_ON_RESERVED_CONTAINERS,
CapacitySchedulerConfiguration.DEFAULT_ADDITIONAL_RESOURCE_BALANCE_BASED_ON_RESERVED_CONTAINERS);
// The FIFO candidates selection policy is always enabled
candidatesSelectionPolicies.add(new FifoCandidatesSelector(this,
additionalPreemptionBasedOnReservedResource));
// Do we need to specially consider intra-queue preemption?
boolean isIntraQueuePreemptionEnabled = config.getBoolean(
CapacitySchedulerConfiguration.INTRAQUEUE_PREEMPTION_ENABLED,
CapacitySchedulerConfiguration.DEFAULT_INTRAQUEUE_PREEMPTION_ENABLED);
if (isIntraQueuePreemptionEnabled) {
candidatesSelectionPolicies.add(new IntraQueueCandidatesSelector(this));
}
LOG.info("Capacity Scheduler configuration changed, updated preemption " +
"properties to:\n" +
"max_ignored_over_capacity = " + maxIgnoredOverCapacity + "\n" +
"natural_termination_factor = " + naturalTerminationFactor + "\n" +
"max_wait_before_kill = " + maxWaitTime + "\n" +
"monitoring_interval = " + monitoringInterval + "\n" +
"total_preemption_per_round = " + percentageClusterPreemptionAllowed +
"\n" +
"observe_only = " + observeOnly + "\n" +
"lazy-preemption-enabled = " + lazyPreemptionEnabled + "\n" +
"intra-queue-preemption.enabled = " + isIntraQueuePreemptionEnabled +
"\n" +
"intra-queue-preemption.max-allowable-limit = " +
maxAllowableLimitForIntraQueuePreemption + "\n" +
"intra-queue-preemption.minimum-threshold = " +
minimumThresholdForIntraQueuePreemption + "\n" +
"intra-queue-preemption.preemption-order-policy = " +
intraQueuePreemptionOrderPolicy + "\n" +
"priority-utilization.underutilized-preemption.enabled = " +
isQueuePriorityPreemptionEnabled + "\n" +
"select_based_on_reserved_containers = " +
selectCandidatesForReservedContainers + "\n" +
"additional_res_balance_based_on_reserved_containers = " +
additionalPreemptionBasedOnReservedResource);
csConfig = config;
}
@Override
public ResourceCalculator getResourceCalculator() {
return rc;
}
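/**
* Invoked by the monitoring thread every {@code monitoringInterval}
* milliseconds: refreshes the preemption configuration if the scheduler
* configuration has changed, snapshots the cluster resources, and runs one
* preemption pass over the queue hierarchy.
*/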
@Override
public synchronized void editSchedule() {
updateConfigIfNeeded();
long startTs = clock.getTime();
CSQueue root = scheduler.getRootQueue();
Resource clusterResources = Resources.clone(scheduler.getClusterResource());
containerBasedPreemptOrKill(root, clusterResources);
if (LOG.isDebugEnabled()) {
LOG.debug("Total time used=" + (clock.getTime() - startTs) + " ms.");
}
}
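/**
* Applies a two-stage protocol to each selected container: on the first
* pass it is marked for preemption (MARK_CONTAINER_FOR_PREEMPTION) and its
* selection time is recorded; if it is still selected once
* {@code maxWaitTime} has elapsed, it is marked killable
* (MARK_CONTAINER_FOR_KILLABLE).
*/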
@SuppressWarnings("unchecked")
private void preemptOrKillSelectedContainerAfterWait(
Map<ApplicationAttemptId, Set<RMContainer>> selectedCandidates,
long currentTime) {
if (LOG.isDebugEnabled()) {
LOG.debug("Starting to preempt containers; number of app attempts with"
+ " selected candidates: " + selectedCandidates.size());
}
// preempt (or kill) the selected containers
for (Map.Entry<ApplicationAttemptId, Set<RMContainer>> e : selectedCandidates
.entrySet()) {
ApplicationAttemptId appAttemptId = e.getKey();
if (LOG.isDebugEnabled()) {
LOG.debug("Send to scheduler: in app=" + appAttemptId
+ " #containers-to-be-preempted=" + e.getValue().size());
}
for (RMContainer container : e.getValue()) {
// if we tried to preempt this for more than maxWaitTime
if (preemptionCandidates.get(container) != null
&& preemptionCandidates.get(container)
+ maxWaitTime <= currentTime) {
// kill it
rmContext.getDispatcher().getEventHandler().handle(
new ContainerPreemptEvent(appAttemptId, container,
SchedulerEventType.MARK_CONTAINER_FOR_KILLABLE));
preemptionCandidates.remove(container);
} else {
if (preemptionCandidates.get(container) != null) {
// We already notified the scheduler about this container earlier;
// there is no need to raise another event.
continue;
}
// otherwise just send preemption events
rmContext.getDispatcher().getEventHandler().handle(
new ContainerPreemptEvent(appAttemptId, container,
SchedulerEventType.MARK_CONTAINER_FOR_PREEMPTION));
preemptionCandidates.put(container, currentTime);
}
}
}
}
private void syncKillableContainersFromScheduler() {
// sync preemptable entities from scheduler
preemptableQueues =
scheduler.getPreemptionManager().getShallowCopyOfPreemptableQueues();
killableContainers = new HashSet<>();
for (Map.Entry<String, PreemptableQueue> entry : preemptableQueues
.entrySet()) {
PreemptableQueue entity = entry.getValue();
for (Map<ContainerId, RMContainer> map : entity.getKillableContainers()
.values()) {
killableContainers.addAll(map.keySet());
}
}
}
private void cleanupStalePreemptionCandidates(long currentTime) {
// Keep the preemptionCandidates list clean
for (Iterator<Map.Entry<RMContainer, Long>> i =
preemptionCandidates.entrySet().iterator(); i.hasNext();) {
// Garbage-collect entries that are no longer relevant for preemption,
// while keeping candidates selected during this execution (or very
// recently) so they are not dropped prematurely.
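// For example (illustrative numbers): with maxWaitTime = 15000 ms, a
// candidate recorded at time 100000 is removed once currentTime exceeds
// 100000 + 2 * 15000 = 130000.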
if (i.next().getValue() + 2 * maxWaitTime < currentTime) {
i.remove();
}
}
}
private Set<String> getLeafQueueNames(TempQueuePerPartition q) {
if (q.children == null || q.children.isEmpty()) {
return ImmutableSet.of(q.queueName);
}
Set<String> leafQueueNames = new HashSet<>();
for (TempQueuePerPartition child : q.children) {
leafQueueNames.addAll(getLeafQueueNames(child));
}
return leafQueueNames;
}
/**
* This method selects and tracks containers to be preempted. If a container
* is in the target list for more than maxWaitTime it is killed.
*
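* Candidate selectors run in the order in which they were registered in
* updateConfigIfNeeded() (queue-priority, reserved-container, FIFO, then
* intra-queue, as enabled), each refining the shared candidate map.
*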
* @param root the root of the CapacityScheduler queue hierarchy
* @param clusterResources the total amount of resources in the cluster
*/
private void containerBasedPreemptOrKill(CSQueue root,
Resource clusterResources) {
// Sync killable containers from scheduler when lazy preemption is enabled
if (lazyPreemptionEnabled) {
syncKillableContainersFromScheduler();
}
// All partitions to look at
Set<String> partitions = new HashSet<>();
partitions.addAll(scheduler.getRMContext()
.getNodeLabelManager().getClusterNodeLabelNames());
partitions.add(RMNodeLabelsManager.NO_LABEL);
this.allPartitions = ImmutableSet.copyOf(partitions);
// extract a summary of the queues from scheduler
synchronized (scheduler) {
queueToPartitions.clear();
for (String partitionToLookAt : allPartitions) {
cloneQueues(root, Resources
.clone(nlm.getResourceByLabel(partitionToLookAt, clusterResources)),
partitionToLookAt);
}
// Update effective priority of queues
}
this.leafQueueNames = ImmutableSet.copyOf(getLeafQueueNames(
getQueueByPartition(CapacitySchedulerConfiguration.ROOT,
RMNodeLabelsManager.NO_LABEL)));
// compute total preemption allowed
Resource totalPreemptionAllowed = Resources.multiply(clusterResources,
percentageClusterPreemptionAllowed);
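// For example, with total_preemption_per_round = 0.1 and cluster resources
// of <memory: 100 GB, vCores: 100>, at most <10 GB, 10 vCores> worth of
// containers may be selected for preemption in a single pass.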
//clear under served queues for every run
partitionToUnderServedQueues.clear();
// based on ideal allocation select containers to be preempted from each
// queue and each application
Map<ApplicationAttemptId, Set<RMContainer>> toPreempt =
new HashMap<>();
for (PreemptionCandidatesSelector selector :
candidatesSelectionPolicies) {
long startTime = 0;
if (LOG.isDebugEnabled()) {
LOG.debug(MessageFormat
.format("Trying to use {0} to select preemption candidates",
selector.getClass().getName()));
startTime = clock.getTime();
}
toPreempt = selector.selectCandidates(toPreempt,
clusterResources, totalPreemptionAllowed);
if (LOG.isDebugEnabled()) {
LOG.debug(MessageFormat
.format("{0} took {1} milliseconds to run",
selector.getClass().getName(), clock.getTime() - startTime));
int totalSelected = 0;
for (Set<RMContainer> set : toPreempt.values()) {
totalSelected += set.size();
}
LOG.debug(MessageFormat
.format("So far, a total of {0} containers have been selected for preemption",
totalSelected));
}
}
if (LOG.isDebugEnabled()) {
logToCSV(new ArrayList<>(leafQueueNames));
}
// if we are in observeOnly mode return before any action is taken
if (observeOnly) {
return;
}
// TODO: consider reverting killable containers when there is no more demand,
// since several selectors can make decisions concurrently and the computed
// ideal allocation varies between selectors.
//
// We may need to "score" killable containers and revert the most preferred
// ones first. The bottom line is that we should not preempt from a queue
// that is already below its guaranteed resource.
long currentTime = clock.getTime();
// preempt (or kill) the selected containers
preemptOrKillSelectedContainerAfterWait(toPreempt, currentTime);
// cleanup stale preemption candidates
cleanupStalePreemptionCandidates(currentTime);
}
@Override
public long getMonitoringInterval() {
return monitoringInterval;
}
@Override
public String getPolicyName() {
return "ProportionalCapacityPreemptionPolicy";
}
@VisibleForTesting
public Map<RMContainer, Long> getToPreemptContainers() {
return preemptionCandidates;
}
/**
* This method walks a tree of CSQueue and clones the portion of the state
* relevant for preemption into TempQueuePerPartition(s). It also maintains
* a pointer to the leaves. Finally, it aggregates pending resources in each
* queue and rolls them up to higher levels.
*
* @param curQueue the queue currently being examined
* @param partitionResource the total amount of resources in the cluster
* @param partitionToLookAt the node partition being cloned
* @return the root of the cloned queue hierarchy
*/
private TempQueuePerPartition cloneQueues(CSQueue curQueue,
Resource partitionResource, String partitionToLookAt) {
TempQueuePerPartition ret;
ReadLock readLock = curQueue.getReadLock();
try {
// Acquire a read lock from Parent/LeafQueue.
readLock.lock();
String queueName = curQueue.getQueueName();
QueueCapacities qc = curQueue.getQueueCapacities();
float absCap = qc.getAbsoluteCapacity(partitionToLookAt);
float absMaxCap = qc.getAbsoluteMaximumCapacity(partitionToLookAt);
boolean preemptionDisabled = curQueue.getPreemptionDisabled();
Resource current = Resources
.clone(curQueue.getQueueResourceUsage().getUsed(partitionToLookAt));
Resource killable = Resources.none();
Resource reserved = Resources.clone(
curQueue.getQueueResourceUsage().getReserved(partitionToLookAt));
if (null != preemptableQueues.get(queueName)) {
killable = Resources.clone(preemptableQueues.get(queueName)
.getKillableResource(partitionToLookAt));
}
// when partition is a non-exclusive partition, the actual maxCapacity
// could be more than the specified maxCapacity
try {
if (!scheduler.getRMContext().getNodeLabelManager()
.isExclusiveNodeLabel(partitionToLookAt)) {
absMaxCap = 1.0f;
}
} catch (IOException e) {
// This may be caused by the partition being removed while the capacity
// monitor is running; ignore the error, as it will be corrected on the
// next check.
}
ret = new TempQueuePerPartition(queueName, current, preemptionDisabled,
partitionToLookAt, killable, absCap, absMaxCap, partitionResource,
reserved, curQueue);
if (curQueue instanceof ParentQueue) {
String configuredOrderingPolicy =
((ParentQueue) curQueue).getQueueOrderingPolicy().getConfigName();
// Recursively add children
for (CSQueue c : curQueue.getChildQueues()) {
TempQueuePerPartition subq = cloneQueues(c, partitionResource,
partitionToLookAt);
// If the configured ordering policy respects priority
if (StringUtils.equals(
CapacitySchedulerConfiguration.QUEUE_PRIORITY_UTILIZATION_ORDERING_POLICY,
configuredOrderingPolicy)) {
subq.relativePriority = c.getPriority().getPriority();
}
ret.addChild(subq);
subq.parent = ret;
}
}
} finally {
readLock.unlock();
}
addTempQueuePartition(ret);
return ret;
}
// simple printout function that reports internal queue state (useful for
// plotting)
private void logToCSV(List<String> leafQueueNames) {
Collections.sort(leafQueueNames);
String queueState = " QUEUESTATE: " + clock.getTime();
StringBuilder sb = new StringBuilder();
sb.append(queueState);
for (String queueName : leafQueueNames) {
TempQueuePerPartition tq =
getQueueByPartition(queueName, RMNodeLabelsManager.NO_LABEL);
sb.append(", ");
tq.appendLogString(sb);
}
LOG.debug(sb.toString());
}
private void addTempQueuePartition(TempQueuePerPartition queuePartition) {
String queueName = queuePartition.queueName;
Map<String, TempQueuePerPartition> queuePartitions = queueToPartitions
.computeIfAbsent(queueName, k -> new HashMap<>());
queuePartitions.put(queuePartition.partition, queuePartition);
}
/**
* Get the queue partition for the given queue name and partition name.
*/
@Override
public TempQueuePerPartition getQueueByPartition(String queueName,
String partition) {
Map<String, TempQueuePerPartition> partitionToQueues;
if (null == (partitionToQueues = queueToPartitions.get(queueName))) {
throw new YarnRuntimeException("This shouldn't happen, cannot find "
+ "TempQueuePerPartition for queueName=" + queueName);
}
return partitionToQueues.get(partition);
}
/**
* Get all queue partitions for the given queue name.
*/
@Override
public Collection<TempQueuePerPartition> getQueuePartitions(String queueName) {
if (!queueToPartitions.containsKey(queueName)) {
throw new YarnRuntimeException("This shouldn't happen, cannot find "
+ "TempQueuePerPartition collection for queueName=" + queueName);
}
return queueToPartitions.get(queueName).values();
}
@Override
public CapacityScheduler getScheduler() {
return scheduler;
}
@Override
public RMContext getRMContext() {
return rmContext;
}
@Override
public boolean isObserveOnly() {
return observeOnly;
}
@Override
public Set<ContainerId> getKillableContainers() {
return killableContainers;
}
@Override
public double getMaxIgnoreOverCapacity() {
return maxIgnoredOverCapacity;
}
@Override
public double getNaturalTerminationFactor() {
return naturalTerminationFactor;
}
@Override
public Set<String> getLeafQueueNames() {
return leafQueueNames;
}
@Override
public Set<String> getAllPartitions() {
return allPartitions;
}
@VisibleForTesting
Map<String, Map<String, TempQueuePerPartition>> getQueuePartitions() {
return queueToPartitions;
}
@Override
public int getClusterMaxApplicationPriority() {
return scheduler.getMaxClusterLevelAppPriority().getPriority();
}
@Override
public float getMaxAllowableLimitForIntraQueuePreemption() {
return maxAllowableLimitForIntraQueuePreemption;
}
@Override
public float getMinimumThresholdForIntraQueuePreemption() {
return minimumThresholdForIntraQueuePreemption;
}
@Override
public Resource getPartitionResource(String partition) {
return Resources.clone(nlm.getResourceByLabel(partition,
Resources.clone(scheduler.getClusterResource())));
}
public LinkedHashSet<String> getUnderServedQueuesPerPartition(
String partition) {
return partitionToUnderServedQueues.get(partition);
}
public void addPartitionToUnderServedQueues(String queueName,
String partition) {
partitionToUnderServedQueues
.computeIfAbsent(partition, k -> new LinkedHashSet<>())
.add(queueName);
}
@Override
public IntraQueuePreemptionOrderPolicy getIntraQueuePreemptionOrderPolicy() {
return intraQueuePreemptionOrderPolicy;
}
}