/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.policy.PriorityUtilizationQueueOrderingPolicy;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.apache.hadoop.yarn.util.resource.Resources;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.Iterator;
import java.util.PriorityQueue;
/**
 * Calculates how many resources need to be preempted for each queue;
 * used by {@link PreemptionCandidatesSelector}.
 */
public class AbstractPreemptableResourceCalculator {
protected final CapacitySchedulerPreemptionContext context;
protected final ResourceCalculator rc;
private boolean isReservedPreemptionCandidatesSelector;
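  /**
   * Orders TempQueuePerPartition objects from most under-guaranteed to most
   * over-guaranteed, comparing each queue's idealAssigned as a fraction of
   * its guaranteed capacity together with the queues' relative priorities
   * (see PriorityUtilizationQueueOrderingPolicy#compare).
   */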
static class TQComparator implements Comparator<TempQueuePerPartition> {
private ResourceCalculator rc;
private Resource clusterRes;
TQComparator(ResourceCalculator rc, Resource clusterRes) {
this.rc = rc;
this.clusterRes = clusterRes;
}
@Override
public int compare(TempQueuePerPartition tq1, TempQueuePerPartition tq2) {
double assigned1 = getIdealPctOfGuaranteed(tq1);
double assigned2 = getIdealPctOfGuaranteed(tq2);
return PriorityUtilizationQueueOrderingPolicy.compare(assigned1,
assigned2, tq1.relativePriority, tq2.relativePriority);
}
// Calculates idealAssigned / guaranteed
// TempQueues with 0 guarantees are always considered the most over
// capacity and therefore considered last for resources.
private double getIdealPctOfGuaranteed(TempQueuePerPartition q) {
double pctOver = Integer.MAX_VALUE;
if (q != null && Resources.greaterThan(rc, clusterRes, q.getGuaranteed(),
Resources.none())) {
pctOver = Resources.divide(rc, clusterRes, q.idealAssigned,
q.getGuaranteed());
}
return (pctOver);
}
}
  /**
   * AbstractPreemptableResourceCalculator constructor.
   *
   * @param preemptionContext
   *          preemption context
   * @param isReservedPreemptionCandidatesSelector
   *          set by the different candidate selector implementations; refer
   *          to TempQueuePerPartition#offer for details.
   */
public AbstractPreemptableResourceCalculator(
CapacitySchedulerPreemptionContext preemptionContext,
boolean isReservedPreemptionCandidatesSelector) {
context = preemptionContext;
rc = preemptionContext.getResourceCalculator();
this.isReservedPreemptionCandidatesSelector =
isReservedPreemptionCandidatesSelector;
}
  /**
   * Given a set of queues, compute the fix-point distribution of unassigned
   * resources among them. As the pending requests of a queue are exhausted,
   * the queue is removed from the set and the remaining capacity is
   * redistributed among the remaining queues. The distribution is weighted
   * based on guaranteed capacity, unless asked to ignoreGuarantee, in which
   * case resources are distributed uniformly.
   *
   * @param totGuarant
   *          total guaranteed resource
   * @param qAlloc
   *          List of child queues
   * @param unassigned
   *          total unassigned resources to distribute
   * @param ignoreGuarantee
   *          ignore guarantee per queue.
   */
protected void computeFixpointAllocation(Resource totGuarant,
Collection<TempQueuePerPartition> qAlloc, Resource unassigned,
boolean ignoreGuarantee) {
// Prior to assigning the unused resources, process each queue as follows:
// If current > guaranteed, idealAssigned = guaranteed + untouchable extra
// Else idealAssigned = current;
// Subtract idealAssigned resources from unassigned.
// If the queue has all of its needs met (that is, if
// idealAssigned >= current + pending), remove it from consideration.
// Sort queues from most under-guaranteed to most over-guaranteed.
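    // Illustrative example (hypothetical numbers, not taken from the code):
    // with totGuarant = <100GB, 100 vcores> shared by qA (guaranteed 60%,
    // using 20%, pending 50%) and qB (guaranteed 40%, using 70%), qA starts
    // at idealAssigned = its current usage (20%) while qB starts at its
    // guarantee plus untouchableExtra; the unassigned remainder is then
    // offered to qA first, since it is the most under-guaranteed queue.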
TQComparator tqComparator = new TQComparator(rc, totGuarant);
PriorityQueue<TempQueuePerPartition> orderedByNeed = new PriorityQueue<>(10,
tqComparator);
for (Iterator<TempQueuePerPartition> i = qAlloc.iterator(); i.hasNext();) {
TempQueuePerPartition q = i.next();
Resource used = q.getUsed();
if (Resources.greaterThan(rc, totGuarant, used, q.getGuaranteed())) {
q.idealAssigned = Resources.add(q.getGuaranteed(), q.untouchableExtra);
} else {
q.idealAssigned = Resources.clone(used);
}
Resources.subtractFrom(unassigned, q.idealAssigned);
      // If idealAssigned < (used + pending), q needs more resources, so add
      // it to the list of underserved queues, ordered by need.
Resource curPlusPend = Resources.add(q.getUsed(), q.pending);
if (Resources.lessThan(rc, totGuarant, q.idealAssigned, curPlusPend)) {
orderedByNeed.add(q);
}
}
// assign all cluster resources until no more demand, or no resources are
// left
while (!orderedByNeed.isEmpty() && Resources.greaterThan(rc, totGuarant,
unassigned, Resources.none())) {
Resource wQassigned = Resource.newInstance(0, 0);
      // We compute the normalizedGuarantee capacity based on the currently
      // active queues.
resetCapacity(unassigned, orderedByNeed, ignoreGuarantee);
// For each underserved queue (or set of queues if multiple are equally
// underserved), offer its share of the unassigned resources based on its
// normalized guarantee. After the offer, if the queue is not satisfied,
// place it back in the ordered list of queues, recalculating its place
// in the order of most under-guaranteed to most over-guaranteed. In this
// way, the most underserved queue(s) are always given resources first.
Collection<TempQueuePerPartition> underserved = getMostUnderservedQueues(
orderedByNeed, tqComparator);
for (Iterator<TempQueuePerPartition> i = underserved.iterator(); i
.hasNext();) {
TempQueuePerPartition sub = i.next();
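        // wQavail: this queue's share of the unassigned resources,
        // proportional to its normalized guarantee and rounded up to whole
        // memory/vcore units. wQidle is the part of that offer the queue
        // could not use; wQdone is what it actually accepted.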
Resource wQavail = Resources.multiplyAndNormalizeUp(rc, unassigned,
sub.normalizedGuarantee, Resource.newInstance(1, 1));
Resource wQidle = sub.offer(wQavail, rc, totGuarant,
isReservedPreemptionCandidatesSelector);
Resource wQdone = Resources.subtract(wQavail, wQidle);
if (Resources.greaterThan(rc, totGuarant, wQdone, Resources.none())) {
          // The queue accepted part of the offer and may still need more.
          // Put it back in the priority queue, recalculating its order
          // based on need.
orderedByNeed.add(sub);
}
Resources.addTo(wQassigned, wQdone);
}
Resources.subtractFrom(unassigned, wQassigned);
}
    // Sometimes it is possible that all queues are properly served, so
    // intra-queue preemption will not attempt any preemption. However, there
    // may still be imbalances within a queue, so make sure all queues are
    // added to the under-served list.
while (!orderedByNeed.isEmpty()) {
TempQueuePerPartition q1 = orderedByNeed.remove();
context.addPartitionToUnderServedQueues(q1.queueName, q1.partition);
}
}
  /**
   * Computes the normalizedGuarantee capacity for each of the active queues.
   *
   * @param clusterResource
   *          the total amount of resources in the cluster
   * @param queues
   *          the list of queues to consider
   * @param ignoreGuar
   *          ignore guarantee.
   */
private void resetCapacity(Resource clusterResource,
Collection<TempQueuePerPartition> queues, boolean ignoreGuar) {
Resource activeCap = Resource.newInstance(0, 0);
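    // When guarantees are ignored, every active queue gets an equal share;
    // otherwise each queue is weighted by its guarantee relative to the
    // total guarantee of all active queues.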
if (ignoreGuar) {
for (TempQueuePerPartition q : queues) {
q.normalizedGuarantee = 1.0f / queues.size();
}
} else {
for (TempQueuePerPartition q : queues) {
Resources.addTo(activeCap, q.getGuaranteed());
}
for (TempQueuePerPartition q : queues) {
q.normalizedGuarantee = Resources.divide(rc, clusterResource,
q.getGuaranteed(), activeCap);
}
}
}
  // Take the most underserved TempQueue (the one at the head). Collect and
  // return the list of all queues that have the same idealAssigned
  // percentage of guaranteed.
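  // For example (hypothetical numbers): if the ordered queues have
  // idealAssigned/guaranteed ratios of 0.2, 0.2 and 0.5, the first call
  // returns the two queues at 0.2 and leaves the queue at 0.5 for a later
  // pass.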
private Collection<TempQueuePerPartition> getMostUnderservedQueues(
PriorityQueue<TempQueuePerPartition> orderedByNeed,
TQComparator tqComparator) {
ArrayList<TempQueuePerPartition> underserved = new ArrayList<>();
while (!orderedByNeed.isEmpty()) {
TempQueuePerPartition q1 = orderedByNeed.remove();
underserved.add(q1);
// Add underserved queues in order for later uses
context.addPartitionToUnderServedQueues(q1.queueName, q1.partition);
TempQueuePerPartition q2 = orderedByNeed.peek();
// q1's pct of guaranteed won't be larger than q2's. If it's less, then
// return what has already been collected. Otherwise, q1's pct of
// guaranteed == that of q2, so add q2 to underserved list during the
// next pass.
if (q2 == null || tqComparator.compare(q1, q2) < 0) {
if (null != q2) {
context.addPartitionToUnderServedQueues(q2.queueName, q2.partition);
}
return underserved;
}
}
return underserved;
}
}