blob: 907785e437db3633a2c890fc9794976cef7e7826 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.apache.hadoop.yarn.util.resource.Resources;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
/**
 * Calculate how much resources need to be preempted for each queue,
 * will be used by {@link PreemptionCandidatesSelector}.
 *
 * <p>The entry point is {@link #computeIdealAllocation(Resource, Resource)}:
 * for every partition it walks the cloned queue hierarchy top-down to compute
 * each queue's ideal assignment, and then derives the amount that should
 * actually be preempted from every leaf queue partition.
 */
public class PreemptableResourceCalculator
    extends AbstractPreemptableResourceCalculator {
  private static final Log LOG =
      LogFactory.getLog(PreemptableResourceCalculator.class);

  /**
   * Whether this calculator serves the reserved-container candidates
   * selector. When true, the natural termination factor is NOT applied to
   * the per-queue preemption target (see
   * {@link #calculateResToObtainByPartitionForLeafQueues}).
   */
  private final boolean isReservedPreemptionCandidatesSelector;

  /**
   * PreemptableResourceCalculator constructor.
   *
   * @param preemptionContext context giving access to the cloned queues and
   *          the preemption settings
   * @param isReservedPreemptionCandidatesSelector this will be set by
   *          different implementation of candidate selectors, please refer to
   *          TempQueuePerPartition#offer for details.
   */
  public PreemptableResourceCalculator(
      CapacitySchedulerPreemptionContext preemptionContext,
      boolean isReservedPreemptionCandidatesSelector) {
    super(preemptionContext, isReservedPreemptionCandidatesSelector);
    // This class reads its own copy of the flag; without this assignment the
    // field would keep its default value (false) regardless of the argument,
    // and the natural termination factor would always be applied.
    this.isReservedPreemptionCandidatesSelector =
        isReservedPreemptionCandidatesSelector;
  }

  /**
   * This method computes (for a single level in the tree, passed as a {@code
   * List<TempQueuePerPartition>}) the ideal assignment of resources. Queues
   * with a non-zero guarantee are served first; whatever is left over is then
   * distributed among the zero-guarantee queues. Finally the overall
   * preemption demand is compared with the allowed amount and every queue is
   * asked to compute its own (possibly scaled-down) preemption.
   *
   * @param rc resource calculator
   * @param queues a list of cloned queues to be assigned capacity to (this is
   *          an out param)
   * @param totalPreemptionAllowed total amount of preemption we allow
   * @param tot_guarant the amount of capacity assigned to this pool of queues
   */
  private void computeIdealResourceDistribution(ResourceCalculator rc,
      List<TempQueuePerPartition> queues, Resource totalPreemptionAllowed,
      Resource tot_guarant) {
    // unassigned tracks how much resources are still to assign, initialized
    // with the total capacity for this set of queues
    Resource unassigned = Resources.clone(tot_guarant);

    // group queues based on whether they have non-zero guaranteed capacity
    Set<TempQueuePerPartition> nonZeroGuarQueues = new HashSet<>();
    Set<TempQueuePerPartition> zeroGuarQueues = new HashSet<>();
    for (TempQueuePerPartition q : queues) {
      if (Resources.greaterThan(rc, tot_guarant,
          q.getGuaranteed(), Resources.none())) {
        nonZeroGuarQueues.add(q);
      } else {
        zeroGuarQueues.add(q);
      }
    }

    // first compute the allocation as a fixpoint based on guaranteed capacity
    computeFixpointAllocation(tot_guarant, nonZeroGuarQueues, unassigned,
        false);

    // if any capacity is left unassigned, distribute it among zero-guarantee
    // queues uniformly (i.e., not based on guaranteed capacity, as this is
    // zero)
    if (!zeroGuarQueues.isEmpty()
        && Resources.greaterThan(rc, tot_guarant, unassigned,
            Resources.none())) {
      computeFixpointAllocation(tot_guarant, zeroGuarQueues, unassigned,
          true);
    }

    // based on ideal assignment computed above and current assignment we
    // derive how much preemption is required overall
    Resource totPreemptionNeeded = Resource.newInstance(0, 0);
    for (TempQueuePerPartition t : queues) {
      if (Resources.greaterThan(rc, tot_guarant,
          t.getUsed(), t.idealAssigned)) {
        Resources.addTo(totPreemptionNeeded,
            Resources.subtract(t.getUsed(), t.idealAssigned));
      }
    }

    // if we need to preempt more than is allowed, compute a factor (0<f<1)
    // that is used to scale down how much we ask back from each queue
    float scalingFactor = 1.0F;
    if (Resources.greaterThan(rc,
        tot_guarant, totPreemptionNeeded, totalPreemptionAllowed)) {
      scalingFactor = Resources.divide(rc, tot_guarant,
          totalPreemptionAllowed, totPreemptionNeeded);
    }

    // assign to each queue the amount of actual preemption based on local
    // information of ideal preemption and scaling factor
    for (TempQueuePerPartition t : queues) {
      t.assignPreemption(scalingFactor, rc, tot_guarant);
    }
  }

  /**
   * This method recursively computes the ideal assignment of resources to
   * each level of the hierarchy. This ensures that leafs that are
   * over-capacity but with parents within capacity will not be
   * preemption candidates. Preemptions are allowed within each subtree
   * according to local over/under capacity.
   *
   * <p>The children of {@code root} are updated in place; nothing is
   * returned. The same {@code totalPreemptionAllowed} is passed unchanged to
   * every level.
   *
   * @param root the root of the cloned queue hierarchy
   * @param totalPreemptionAllowed maximum amount of preemption allowed
   */
  private void recursivelyComputeIdealAssignment(
      TempQueuePerPartition root, Resource totalPreemptionAllowed) {
    if (root.getChildren() == null || root.getChildren().isEmpty()) {
      // leaf queue: nothing to distribute below this level
      return;
    }

    // compute ideal distribution at this level
    computeIdealResourceDistribution(rc, root.getChildren(),
        totalPreemptionAllowed, root.idealAssigned);

    // compute recursively for lower levels
    for (TempQueuePerPartition t : root.getChildren()) {
      recursivelyComputeIdealAssignment(t, totalPreemptionAllowed);
    }
  }

  /**
   * For every partition of every given leaf queue, decide how much resource
   * is actually to be preempted and store it on the queue partition via
   * {@code setActuallyToBePreempted}. Queues whose NO_LABEL partition has
   * preemption disabled are skipped entirely.
   *
   * @param leafQueueNames names of the leaf queues to process
   * @param clusterResource cluster resource used for resource comparisons
   */
  private void calculateResToObtainByPartitionForLeafQueues(
      Set<String> leafQueueNames, Resource clusterResource) {
    // Loop all leaf queues
    for (String queueName : leafQueueNames) {
      // check if preemption disabled for the queue
      if (context.getQueueByPartition(queueName,
          RMNodeLabelsManager.NO_LABEL).preemptionDisabled) {
        if (LOG.isDebugEnabled()) {
          LOG.debug("skipping from queue=" + queueName
              + " because it's a non-preemptable queue");
        }
        continue;
      }

      // compute resToObtainByPartition considered inter-queue preemption
      for (TempQueuePerPartition qT : context.getQueuePartitions(queueName)) {
        // we act only if we are violating balance by more than
        // maxIgnoredOverCapacity
        if (Resources.greaterThan(rc, clusterResource,
            qT.getUsed(), Resources
                .multiply(qT.getGuaranteed(),
                    1.0 + context.getMaxIgnoreOverCapacity()))) {
          /*
           * We introduce a dampening factor naturalTerminationFactor that
           * accounts for natural termination of containers.
           *
           * This is added to control pace of preemption, let's say:
           * If preemption policy calculated a queue *should be* preempted
           * 20 GB and the natural termination factor is set to 0.1, the
           * preemption policy will select 20 GB * 0.1 = 2 GB of containers
           * to be preempted.
           *
           * However, it doesn't work for YARN-4390:
           * For example, if a queue needs to be preempted 20 GB for *one
           * single* large container, preempting 10% of such resource isn't
           * useful. So to make it simple, only apply the natural termination
           * factor when the selector is not the
           * reservedPreemptionCandidatesSelector.
           */
          Resource resToObtain = qT.toBePreempted;
          if (!isReservedPreemptionCandidatesSelector) {
            resToObtain = Resources.multiply(qT.toBePreempted,
                context.getNaturalTerminationFactor());
          }

          // Only log resToObtain when it is strictly positive; the value is
          // recorded on the queue partition either way.
          if (LOG.isDebugEnabled()
              && Resources.greaterThan(rc, clusterResource, resToObtain,
                  Resources.none())) {
            LOG.debug("Queue=" + queueName + " partition=" + qT.partition
                + " resource-to-obtain=" + resToObtain);
          }
          qT.setActuallyToBePreempted(Resources.clone(resToObtain));
        } else {
          qT.setActuallyToBePreempted(Resources.none());
        }

        if (LOG.isDebugEnabled()) {
          LOG.debug(qT);
        }
      }
    }
  }

  /**
   * Post-order walk of the queue tree: updates the preemptable extras of all
   * descendants first and then of {@code cur} itself.
   *
   * @param cur the queue (sub)tree to update
   */
  private void updatePreemptableExtras(TempQueuePerPartition cur) {
    if (cur.children != null) {
      // children first, so the parent is updated after its subtree
      for (TempQueuePerPartition child : cur.children) {
        updatePreemptableExtras(child);
      }
    }
    cur.updatePreemptableExtras(rc);
  }

  /**
   * Computes, for every partition known to the preemption context, the ideal
   * resource assignment of each queue, and then the amount that is actually
   * to be preempted from each leaf queue partition.
   *
   * @param clusterResource cluster resource used for resource comparisons
   *          when deriving the per-leaf-queue preemption amounts
   * @param totalPreemptionAllowed maximum amount of preemption allowed
   */
  public void computeIdealAllocation(Resource clusterResource,
      Resource totalPreemptionAllowed) {
    for (String partition : context.getAllPartitions()) {
      TempQueuePerPartition tRoot = context.getQueueByPartition(
          CapacitySchedulerConfiguration.ROOT, partition);
      updatePreemptableExtras(tRoot);

      // compute the ideal distribution of resources among queues
      // updates cloned queues state accordingly.
      // Clone so that the root's idealAssigned never aliases the Resource
      // object returned by getGuaranteed().
      tRoot.idealAssigned = Resources.clone(tRoot.getGuaranteed());
      recursivelyComputeIdealAssignment(tRoot, totalPreemptionAllowed);
    }

    // based on ideal allocation, calculate resource-to-obtain by partition
    // for each leaf queue
    calculateResToObtainByPartitionForLeafQueues(context.getLeafQueueNames(),
        clusterResource);
  }
}