blob: a7a321c5f9823091622e9959f05d2d1e28f37664 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
import java.io.Serializable;
import java.util.Collection;
import java.util.Comparator;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.records.Resource;
/**
* Utility class containing scheduling algorithms used in the fair scheduler.
*/
@Private
@Unstable
class SchedulingAlgorithms {
public static final Log LOG = LogFactory.getLog(
SchedulingAlgorithms.class.getName());
/**
 * Orders Schedulables the way the default FIFO scheduler in Hadoop does:
 * by priority first, then by start time, and finally by name.
 */
public static class FifoComparator implements Comparator<Schedulable>, Serializable {
  private static final long serialVersionUID = -5905036205491177060L;

  /**
   * @param s1 the first Schedulable
   * @param s2 the second Schedulable
   * @return a negative, zero or positive value when s1 sorts before, equal
   *         to, or after s2
   */
  @Override
  public int compare(Schedulable s1, Schedulable s2) {
    int byPriority = s1.getPriority().compareTo(s2.getPriority());
    if (byPriority != 0) {
      return byPriority;
    }
    int byStartTime = (int) Math.signum(s1.getStartTime() - s2.getStartTime());
    if (byStartTime != 0) {
      return byStartTime;
    }
    // Same priority and same start time (rare): fall back to the name (which
    // will be the JobID) so the ordering stays deterministic and we do not
    // alternately launch tasks from different jobs.
    return s1.getName().compareTo(s2.getName());
  }
}
/**
 * Compare Schedulables via weighted fair sharing. In addition, Schedulables
 * below their min share get priority over those whose min share is met.
 *
 * The effective min share of a Schedulable is the smaller of its configured
 * min share and its demand. A Schedulable whose resource usage is below that
 * value is "needy".
 *
 * Needy Schedulables are compared by how far below their min share they are,
 * as a ratio of memory used to min share memory. For example, if job A has 8
 * out of a min share of 10 and job B has 50 out of a min share of 100, then
 * job B is scheduled next, because B is at 50% of its min share and A is at
 * 80% of its min share.
 *
 * Schedulables that are not needy are compared by (memory used / weight). If
 * all weights are equal, the Schedulable using the least memory goes first;
 * otherwise, Schedulables with more weight get proportionally more.
 *
 * Ties are broken by start time and then by name, which gives a
 * deterministic ordering.
 */
public static class FairShareComparator implements Comparator<Schedulable>, Serializable {
  private static final long serialVersionUID = 5564969375856699313L;

  /**
   * @param s1 the first Schedulable
   * @param s2 the second Schedulable
   * @return a negative, zero or positive value when s1 should be scheduled
   *         before, equally with, or after s2
   */
  @Override
  public int compare(Schedulable s1, Schedulable s2) {
    Resource minShare1 = Resources.min(s1.getMinShare(), s1.getDemand());
    Resource minShare2 = Resources.min(s2.getMinShare(), s2.getDemand());
    boolean s1Needy = Resources.lessThan(s1.getResourceUsage(), minShare1);
    boolean s2Needy = Resources.lessThan(s2.getResourceUsage(), minShare2);

    int res;
    if (s1Needy && !s2Needy) {
      res = -1;
    } else if (s2Needy && !s1Needy) {
      res = 1;
    } else if (s1Needy && s2Needy) {
      // Clamp the denominator to at least 1 so the ratio is never a division
      // by zero.
      Resource one = Resources.createResource(1);
      double minShareRatio1 = (double) s1.getResourceUsage().getMemory()
          / Resources.max(minShare1, one).getMemory();
      double minShareRatio2 = (double) s2.getResourceUsage().getMemory()
          / Resources.max(minShare2, one).getMemory();
      res = Double.compare(minShareRatio1, minShareRatio2);
    } else {
      // Neither schedulable is needy.
      // Double.compare is used instead of the sign of the difference: the
      // difference of two infinite or NaN ratios is NaN, which would be
      // reported as a tie against every other value and break the
      // transitivity the Comparator contract requires.
      // NOTE(review): such ratios would come from a zero weight - confirm
      // whether zero weights can actually reach this comparator.
      double useToWeightRatio1 =
          (double) s1.getResourceUsage().getMemory() / s1.getWeight();
      double useToWeightRatio2 =
          (double) s2.getResourceUsage().getMemory() / s2.getWeight();
      res = Double.compare(useToWeightRatio1, useToWeightRatio2);
    }

    if (res == 0) {
      // Apps are tied in fairness ratio. Break the tie by submit time and job
      // name to get a deterministic ordering, which is useful for unit tests.
      res = (int) Math.signum(s1.getStartTime() - s2.getStartTime());
      if (res == 0) {
        res = s1.getName().compareTo(s2.getName());
      }
    }
    return res;
  }
}
/**
* Number of iterations for the binary search in computeFairShares. This is
* equivalent to the number of bits of precision in the output. 25 iterations
* gives precision better than 0.1 slots in clusters with one million slots.
*/
private static final int COMPUTE_FAIR_SHARES_ITERATIONS = 25;
/**
 * Given a set of Schedulables and a total amount of resources, compute their
 * weighted fair shares. The min shares and demands of the Schedulables are
 * assumed to be set beforehand. We compute the fairest possible allocation of
 * shares to the Schedulables that respects their min shares and demands.
 *
 * To understand what this method does, we must first define what weighted
 * fair sharing means in the presence of minimum shares and demands. If there
 * were no minimum shares and every Schedulable had an infinite demand, then
 * weighted fair sharing would be achieved if the ratio of
 * resourcesAssigned / weight was equal for each Schedulable and all resources
 * were assigned. Minimum shares and demands add two further twists:
 * - Some Schedulables may not have enough demand to fill all their share.
 * - Some Schedulables may have a min share higher than their assigned share.
 *
 * To deal with these possibilities, we define an assignment of resources as
 * being fair if there exists a ratio R such that:
 * - Schedulables S where S.demand < R * S.weight are assigned share S.demand
 * - Schedulables S where S.minShare > R * S.weight are given share S.minShare
 * - All other Schedulables S are assigned share R * S.weight
 * - The sum of all the shares is the total amount of resources.
 *
 * We call R the weight-to-resource ratio because it converts a Schedulable's
 * weight to the amount of resources it is assigned.
 *
 * We compute a fair allocation by finding a suitable ratio R using binary
 * search. Given a ratio R, we compute the resources that would be used in
 * total with this ratio (the sum of the shares computed using the conditions
 * above). If this is less than the target, then R is too small and more
 * resources could be assigned. Otherwise R is large enough. The target is the
 * smaller of the total demand and totalResources.
 *
 * We begin the binary search with a lower bound on R of 0 (which means that
 * all Schedulables are only given their minShare, capped by their demand) and
 * an upper bound found by starting at R = 1 and doubling R until the target
 * is reached. The doubling stops while R is still finite, so the search
 * terminates even if no ratio can reach the target. The helper method
 * resUsedWithWeightToResRatio computes the total resources used with a given
 * value of R.
 *
 * Each evaluation of resUsedWithWeightToResRatio is linear in the number of
 * Schedulables, and the number of iterations of binary search is a constant
 * (dependent on desired precision).
 *
 * @param schedulables the Schedulables whose fair shares are set via
 *                     setFairShare
 * @param totalResources the total resources to divide among them
 */
public static void computeFairShares(
    Collection<? extends Schedulable> schedulables, Resource totalResources) {
  // The shares can never usefully add up to more than what is demanded or
  // what exists, so the search target is the smaller of the two.
  Resource totalDemand = Resources.createResource(0);
  for (Schedulable sched : schedulables) {
    Resources.addTo(totalDemand, sched.getDemand());
  }
  Resource cap = Resources.min(totalDemand, totalResources);

  // Find an upper bound on R that we can use in our binary search. We start
  // at R = 1 and double it until the shares reach the target. The doubling is
  // bounded so that R stays finite: without the bound the loop would never
  // exit if the shares cannot reach the target for any R.
  final double largestDoublableRatio = Double.MAX_VALUE / 2.0;
  double rMax = 1.0;
  while (rMax < largestDoublableRatio
      && Resources.lessThan(resUsedWithWeightToResRatio(rMax, schedulables), cap)) {
    rMax *= 2.0;
  }

  // Perform the binary search for up to COMPUTE_FAIR_SHARES_ITERATIONS steps
  double left = 0;
  double right = rMax;
  for (int i = 0; i < COMPUTE_FAIR_SHARES_ITERATIONS; i++) {
    double mid = (left + right) / 2.0;
    if (Resources.lessThan(resUsedWithWeightToResRatio(mid, schedulables), cap)) {
      left = mid;
    } else {
      right = mid;
    }
  }

  // Set the fair shares based on the value of R we've converged to
  for (Schedulable sched : schedulables) {
    sched.setFairShare(computeShare(sched, right));
  }
}
/**
 * Sum the shares that every given Schedulable would receive under the ratio
 * w2sRatio, for use in the algorithm described in
 * {@link SchedulingAlgorithms#computeFairShares(Collection, Resource)}.
 *
 * @param w2sRatio the ratio passed on to computeShare for each Schedulable
 * @param schedulables the Schedulables whose shares are added up
 * @return the total of the individual shares
 */
private static Resource resUsedWithWeightToResRatio(double w2sRatio,
    Collection<? extends Schedulable> schedulables) {
  Resource total = Resources.createResource(0);
  for (Schedulable schedulable : schedulables) {
    Resources.addTo(total, computeShare(schedulable, w2sRatio));
  }
  return total;
}
/**
 * Compute the resources assigned to a Schedulable for a given ratio
 * r2sRatio, for use in the algorithm described in
 * {@link SchedulingAlgorithms#computeFairShares(Collection, Resource)}.
 *
 * The share is weight * r2sRatio, raised to the memory of the min share if
 * it is lower, and then capped at the memory of the demand.
 *
 * @param sched the Schedulable to compute a share for
 * @param r2sRatio the ratio that the Schedulable's weight is multiplied by
 * @return a Resource created from the resulting share, truncated to an int
 */
private static Resource computeShare(Schedulable sched, double r2sRatio) {
  double weighted = sched.getWeight() * r2sRatio;
  double atLeastMinShare = Math.max(weighted, sched.getMinShare().getMemory());
  // The demand cap is applied last, so it wins over the min share.
  double cappedByDemand = Math.min(atLeastMinShare, sched.getDemand().getMemory());
  return Resources.createResource((int) cappedByDemand);
}
}