blob: 97bb4c56175fb0856e3ed1f13d956d434d7dd644 [file] [log] [blame]
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies;
import java.util.ArrayList;
import java.util.Collection;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.Schedulable;
import static java.lang.Math.addExact;
/**
 * Contains logic for computing the fair shares. A {@link Schedulable}'s fair
 * share is the {@link Resource} it is entitled to, independent of the current
 * demands and allocations on the cluster. A {@link Schedulable} whose resource
 * consumption lies at or below its fair share will never have its containers
 * preempted.
 */
public final class ComputeFairShares {
/**
 * Upper limit on the number of binary-search steps performed while
 * converging on the weight-to-resource ratio.
 */
private static final int COMPUTE_FAIR_SHARES_ITERATIONS = 25;

/**
 * Utility class exposing only static methods; not meant to be instantiated.
 */
private ComputeFairShares() {
}
* Compute fair share of the given schedulables.Fair share is an allocation of
* shares considering only active schedulables ie schedulables which have
* running apps.
* @param schedulables
* @param totalResources
* @param type
public static void computeShares(
Collection<? extends Schedulable> schedulables, Resource totalResources,
String type) {
computeSharesInternal(schedulables, totalResources, type, false);
* Compute the steady fair share of the given queues. The steady fair
* share is an allocation of shares considering all queues, i.e.,
* active and inactive.
* @param queues
* @param totalResources
* @param type
public static void computeSteadyShares(
Collection<? extends FSQueue> queues, Resource totalResources,
String type) {
computeSharesInternal(queues, totalResources, type, true);
* Given a set of Schedulables and a number of slots, compute their weighted
* fair shares. The min and max shares and of the Schedulables are assumed to
* be set beforehand. We compute the fairest possible allocation of shares to
* the Schedulables that respects their min and max shares.
* <p>
* To understand what this method does, we must first define what weighted
* fair sharing means in the presence of min and max shares. If there
* were no minimum or maximum shares, then weighted fair sharing would be
* achieved if the ratio of slotsAssigned / weight was equal for each
* Schedulable and all slots were assigned. Minimum and maximum shares add a
* further twist - Some Schedulables may have a min share higher than their
* assigned share or a max share lower than their assigned share.
* <p>
* To deal with these possibilities, we define an assignment of slots as being
* fair if there exists a ratio R such that: Schedulables S where S.minShare
* {@literal >} R * S.weight are given share S.minShare - Schedulables S
* where S.maxShare {@literal <} R * S.weight are given S.maxShare -
* All other Schedulables S are assigned share R * S.weight -
* The sum of all the shares is totalSlots.
* <p>
* We call R the weight-to-slots ratio because it converts a Schedulable's
* weight to the number of slots it is assigned.
* <p>
* We compute a fair allocation by finding a suitable weight-to-slot ratio R.
* To do this, we use binary search. Given a ratio R, we compute the number of
* slots that would be used in total with this ratio (the sum of the shares
* computed using the conditions above). If this number of slots is less than
* totalSlots, then R is too small and more slots could be assigned. If the
* number of slots is more than totalSlots, then R is too large.
* <p>
* We begin the binary search with a lower bound on R of 0 (which means that
* all Schedulables are only given their minShare) and an upper bound computed
* to be large enough that too many slots are given (by doubling R until we
* use more than totalResources resources). The helper method
* resourceUsedWithWeightToResourceRatio computes the total resources used
* with a given value of R.
* <p>
* The running time of this algorithm is linear in the number of Schedulables,
* because resourceUsedWithWeightToResourceRatio is linear-time and the
* number of iterations of binary search is a constant (dependent on desired
* precision).
private static void computeSharesInternal(
Collection<? extends Schedulable> allSchedulables,
Resource totalResources, String type, boolean isSteadyShare) {
Collection<Schedulable> schedulables = new ArrayList<>();
long takenResources = handleFixedFairShares(
allSchedulables, schedulables, isSteadyShare, type);
if (schedulables.isEmpty()) {
// Find an upper bound on R that we can use in our binary search. We start
// at R = 1 and double it until we have either used all the resources or we
// have met all Schedulables' max shares.
long totalMaxShare = 0;
for (Schedulable sched : schedulables) {
long maxShare = sched.getMaxShare().getResourceValue(type);
totalMaxShare = safeAdd(maxShare, totalMaxShare);
if (totalMaxShare == Long.MAX_VALUE) {
long totalResource = Math.max((totalResources.getResourceValue(type) -
takenResources), 0);
totalResource = Math.min(totalMaxShare, totalResource);
double rMax = 1.0;
while (resourceUsedWithWeightToResourceRatio(rMax, schedulables, type)
< totalResource) {
rMax *= 2.0;
// Perform the binary search for up to COMPUTE_FAIR_SHARES_ITERATIONS steps
double left = 0;
double right = rMax;
for (int i = 0; i < COMPUTE_FAIR_SHARES_ITERATIONS; i++) {
double mid = (left + right) / 2.0;
long plannedResourceUsed = resourceUsedWithWeightToResourceRatio(
mid, schedulables, type);
if (plannedResourceUsed == totalResource) {
right = mid;
} else if (plannedResourceUsed < totalResource) {
left = mid;
} else {
right = mid;
// Set the fair shares based on the value of R we've converged to
for (Schedulable sched : schedulables) {
Resource target;
if (isSteadyShare) {
target = ((FSQueue) sched).getSteadyFairShare();
} else {
target = sched.getFairShare();
target.setResourceValue(type, computeShare(sched, right, type));
* Compute the resources that would be used given a weight-to-resource ratio
* w2rRatio, for use in the computeFairShares algorithm as described in
* {@link #computeSharesInternal}.
private static long resourceUsedWithWeightToResourceRatio(double w2rRatio,
Collection<? extends Schedulable> schedulables, String type) {
long resourcesTaken = 0;
for (Schedulable sched : schedulables) {
long share = computeShare(sched, w2rRatio, type);
resourcesTaken = safeAdd(resourcesTaken, share);
if (resourcesTaken == Long.MAX_VALUE) {
return resourcesTaken;
* Compute the resources assigned to a Schedulable given a particular
* weight-to-resource ratio w2rRatio.
private static long computeShare(Schedulable sched, double w2rRatio,
String type) {
double share = sched.getWeight() * w2rRatio;
share = Math.max(share, sched.getMinShare().getResourceValue(type));
share = Math.min(share, sched.getMaxShare().getResourceValue(type));
return (long) share;
* Helper method to handle Schedulabes with fixed fairshares.
* Returns the resources taken by fixed fairshare schedulables,
* and adds the remaining to the passed nonFixedSchedulables.
private static long handleFixedFairShares(
Collection<? extends Schedulable> schedulables,
Collection<Schedulable> nonFixedSchedulables,
boolean isSteadyShare, String type) {
long totalResource = 0;
for (Schedulable sched : schedulables) {
long fixedShare = getFairShareIfFixed(sched, isSteadyShare, type);
if (fixedShare < 0) {
} else {
Resource target;
if (isSteadyShare) {
target = ((FSQueue)sched).getSteadyFairShare();
} else {
target = sched.getFairShare();
target.setResourceValue(type, fixedShare);
totalResource = safeAdd(totalResource, fixedShare);
return totalResource;
* Get the fairshare for the {@link Schedulable} if it is fixed,
* -1 otherwise.
* The fairshare is fixed if either the maxShare is 0, weight is 0,
* or the Schedulable is not active for instantaneous fairshare.
private static long getFairShareIfFixed(Schedulable sched,
boolean isSteadyShare, String type) {
// Check if maxShare is 0
if (sched.getMaxShare().getResourceValue(type) <= 0) {
return 0;
// For instantaneous fairshares, check if queue is active
if (!isSteadyShare &&
(sched instanceof FSQueue) && !((FSQueue)sched).isActive()) {
return 0;
// Check if weight is 0
if (sched.getWeight() <= 0) {
long minShare = sched.getMinShare().getResourceValue(type);
return (minShare <= 0) ? 0 : minShare;
return -1;
* Safely add two long values. The result will always be a valid long value.
* If the addition caused an overflow the return value will be set to
* <code>Long.MAX_VALUE</code>.
* @param a first long to add
* @param b second long to add
* @return result of the addition
private static long safeAdd(long a, long b) {
try {
return addExact(a, b);
} catch (ArithmeticException ae) {
return Long.MAX_VALUE;