blob: 0ef90a1d72f7be824627f86ca850aabd63d21a59 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies;
import java.io.Serializable;
import java.util.Collection;
import java.util.Comparator;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.Schedulable;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.SchedulingPolicy;
import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.apache.hadoop.yarn.util.resource.Resources;
import com.google.common.annotations.VisibleForTesting;
/**
* Makes scheduling decisions by trying to equalize shares of memory.
*/
@Private
@Unstable
public class FairSharePolicy extends SchedulingPolicy {
private static final Log LOG = LogFactory.getLog(FairSharePolicy.class);
@VisibleForTesting
public static final String NAME = "fair";
private static final DefaultResourceCalculator RESOURCE_CALCULATOR =
new DefaultResourceCalculator();
private static final FairShareComparator COMPARATOR =
new FairShareComparator();
@Override
public String getName() {
return NAME;
}
/**
* Compare Schedulables mainly via fair share usage to meet fairness.
* Specifically, it goes through following four steps.
*
* 1. Compare demands. Schedulables without resource demand get lower priority
* than ones who have demands.
*
* 2. Compare min share usage. Schedulables below their min share are compared
* by how far below it they are as a ratio. For example, if job A has 8 out
* of a min share of 10 tasks and job B has 50 out of a min share of 100,
* then job B is scheduled next, because B is at 50% of its min share and A
* is at 80% of its min share.
*
* 3. Compare fair share usage. Schedulables above their min share are
* compared by fair share usage by checking (resource usage / weight).
* If all weights are equal, slots are given to the job with the fewest tasks;
* otherwise, jobs with more weight get proportionally more slots. If weight
* equals to 0, we can't compare Schedulables by (resource usage/weight).
* There are two situations: 1)All weights equal to 0, slots are given
* to one with less resource usage. 2)Only one of weight equals to 0, slots
* are given to the one with non-zero weight.
*
* 4. Break the tie by compare submit time and job name.
*/
private static class FairShareComparator implements Comparator<Schedulable>,
Serializable {
private static final long serialVersionUID = 5564969375856699313L;
private static final Resource ONE = Resources.createResource(1);
@Override
public int compare(Schedulable s1, Schedulable s2) {
int res = compareDemand(s1, s2);
// Pre-compute resource usages to avoid duplicate calculation
Resource resourceUsage1 = s1.getResourceUsage();
Resource resourceUsage2 = s2.getResourceUsage();
if (res == 0) {
res = compareMinShareUsage(s1, s2, resourceUsage1, resourceUsage2);
}
if (res == 0) {
res = compareFairShareUsage(s1, s2, resourceUsage1, resourceUsage2);
}
// Break the tie by submit time
if (res == 0) {
res = (int) Math.signum(s1.getStartTime() - s2.getStartTime());
}
// Break the tie by job name
if (res == 0) {
res = s1.getName().compareTo(s2.getName());
}
return res;
}
private int compareDemand(Schedulable s1, Schedulable s2) {
int res = 0;
Resource demand1 = s1.getDemand();
Resource demand2 = s2.getDemand();
if (demand1.equals(Resources.none()) && Resources.greaterThan(
RESOURCE_CALCULATOR, null, demand2, Resources.none())) {
res = 1;
} else if (demand2.equals(Resources.none()) && Resources.greaterThan(
RESOURCE_CALCULATOR, null, demand1, Resources.none())) {
res = -1;
}
return res;
}
private int compareMinShareUsage(Schedulable s1, Schedulable s2,
Resource resourceUsage1, Resource resourceUsage2) {
int res;
Resource minShare1 = Resources.min(RESOURCE_CALCULATOR, null,
s1.getMinShare(), s1.getDemand());
Resource minShare2 = Resources.min(RESOURCE_CALCULATOR, null,
s2.getMinShare(), s2.getDemand());
boolean s1Needy = Resources.lessThan(RESOURCE_CALCULATOR, null,
resourceUsage1, minShare1);
boolean s2Needy = Resources.lessThan(RESOURCE_CALCULATOR, null,
resourceUsage2, minShare2);
if (s1Needy && !s2Needy) {
res = -1;
} else if (s2Needy && !s1Needy) {
res = 1;
} else if (s1Needy && s2Needy) {
double minShareRatio1 = (double) resourceUsage1.getMemorySize() /
Resources.max(RESOURCE_CALCULATOR, null, minShare1, ONE)
.getMemorySize();
double minShareRatio2 = (double) resourceUsage2.getMemorySize() /
Resources.max(RESOURCE_CALCULATOR, null, minShare2, ONE)
.getMemorySize();
res = (int) Math.signum(minShareRatio1 - minShareRatio2);
} else {
res = 0;
}
return res;
}
/**
* To simplify computation, use weights instead of fair shares to calculate
* fair share usage.
*/
private int compareFairShareUsage(Schedulable s1, Schedulable s2,
Resource resourceUsage1, Resource resourceUsage2) {
double weight1 = s1.getWeights().getWeight(ResourceType.MEMORY);
double weight2 = s2.getWeights().getWeight(ResourceType.MEMORY);
double useToWeightRatio1;
double useToWeightRatio2;
if (weight1 > 0.0 && weight2 > 0.0) {
useToWeightRatio1 = resourceUsage1.getMemorySize() / weight1;
useToWeightRatio2 = resourceUsage2.getMemorySize() / weight2;
} else { // Either weight1 or weight2 equals to 0
if (weight1 == weight2) {
// If they have same weight, just compare usage
useToWeightRatio1 = resourceUsage1.getMemorySize();
useToWeightRatio2 = resourceUsage2.getMemorySize();
} else {
// By setting useToWeightRatios to negative weights, we give the
// zero-weight one less priority, so the non-zero weight one will
// be given slots.
useToWeightRatio1 = -weight1;
useToWeightRatio2 = -weight2;
}
}
return (int) Math.signum(useToWeightRatio1 - useToWeightRatio2);
}
}
@Override
public Comparator<Schedulable> getComparator() {
return COMPARATOR;
}
@Override
public ResourceCalculator getResourceCalculator() {
return RESOURCE_CALCULATOR;
}
@Override
public Resource getHeadroom(Resource queueFairShare,
Resource queueUsage, Resource maxAvailable) {
long queueAvailableMemory = Math.max(
queueFairShare.getMemorySize() - queueUsage.getMemorySize(), 0);
Resource headroom = Resources.createResource(
Math.min(maxAvailable.getMemorySize(), queueAvailableMemory),
maxAvailable.getVirtualCores());
return headroom;
}
@Override
public void computeShares(Collection<? extends Schedulable> schedulables,
Resource totalResources) {
ComputeFairShares.computeShares(schedulables, totalResources, ResourceType.MEMORY);
}
@Override
public void computeSteadyShares(Collection<? extends FSQueue> queues,
Resource totalResources) {
ComputeFairShares.computeSteadyShares(queues, totalResources,
ResourceType.MEMORY);
}
@Override
public boolean checkIfUsageOverFairShare(Resource usage, Resource fairShare) {
return Resources.greaterThan(RESOURCE_CALCULATOR, null, usage, fairShare);
}
@Override
public boolean isChildPolicyAllowed(SchedulingPolicy childPolicy) {
if (childPolicy instanceof DominantResourceFairnessPolicy) {
LOG.error("Queue policy can't be " + DominantResourceFairnessPolicy.NAME
+ " if the parent policy is " + getName() + ". Choose " +
getName() + " or " + FifoPolicy.NAME + " for child queues instead."
+ " Please note that " + FifoPolicy.NAME
+ " is only for leaf queues.");
return false;
}
return true;
}
}