blob: 66bb88bf16c8157774f9de5aa1d8f68f29fc2e16 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies;
import java.io.Serializable;
import java.util.Collection;
import java.util.Comparator;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.Schedulable;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.SchedulingPolicy;
import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
import org.apache.hadoop.yarn.util.resource.Resources;
import com.google.common.annotations.VisibleForTesting;
/**
* Makes scheduling decisions by trying to equalize shares of memory.
*/
@Private
@Unstable
public class FairSharePolicy extends SchedulingPolicy {
@VisibleForTesting
public static final String NAME = "fair";
private static final DefaultResourceCalculator RESOURCE_CALCULATOR =
new DefaultResourceCalculator();
private FairShareComparator comparator = new FairShareComparator();
@Override
public String getName() {
return NAME;
}
/**
* Compare Schedulables via weighted fair sharing. In addition, Schedulables
* below their min share get priority over those whose min share is met.
*
* Schedulables below their min share are compared by how far below it they
* are as a ratio. For example, if job A has 8 out of a min share of 10 tasks
* and job B has 50 out of a min share of 100, then job B is scheduled next,
* because B is at 50% of its min share and A is at 80% of its min share.
*
* Schedulables above their min share are compared by (runningTasks / weight).
* If all weights are equal, slots are given to the job with the fewest tasks;
* otherwise, jobs with more weight get proportionally more slots.
*/
private static class FairShareComparator implements Comparator<Schedulable>,
Serializable {
private static final long serialVersionUID = 5564969375856699313L;
private static final Resource ONE = Resources.createResource(1);
@Override
public int compare(Schedulable s1, Schedulable s2) {
double minShareRatio1, minShareRatio2;
double useToWeightRatio1, useToWeightRatio2;
Resource minShare1 = Resources.min(RESOURCE_CALCULATOR, null,
s1.getMinShare(), s1.getDemand());
Resource minShare2 = Resources.min(RESOURCE_CALCULATOR, null,
s2.getMinShare(), s2.getDemand());
boolean s1Needy = Resources.lessThan(RESOURCE_CALCULATOR, null,
s1.getResourceUsage(), minShare1);
boolean s2Needy = Resources.lessThan(RESOURCE_CALCULATOR, null,
s2.getResourceUsage(), minShare2);
minShareRatio1 = (double) s1.getResourceUsage().getMemory()
/ Resources.max(RESOURCE_CALCULATOR, null, minShare1, ONE).getMemory();
minShareRatio2 = (double) s2.getResourceUsage().getMemory()
/ Resources.max(RESOURCE_CALCULATOR, null, minShare2, ONE).getMemory();
useToWeightRatio1 = s1.getResourceUsage().getMemory() /
s1.getWeights().getWeight(ResourceType.MEMORY);
useToWeightRatio2 = s2.getResourceUsage().getMemory() /
s2.getWeights().getWeight(ResourceType.MEMORY);
int res = 0;
if (s1Needy && !s2Needy)
res = -1;
else if (s2Needy && !s1Needy)
res = 1;
else if (s1Needy && s2Needy)
res = (int) Math.signum(minShareRatio1 - minShareRatio2);
else
// Neither schedulable is needy
res = (int) Math.signum(useToWeightRatio1 - useToWeightRatio2);
if (res == 0) {
// Apps are tied in fairness ratio. Break the tie by submit time and job
// name to get a deterministic ordering, which is useful for unit tests.
res = (int) Math.signum(s1.getStartTime() - s2.getStartTime());
if (res == 0)
res = s1.getName().compareTo(s2.getName());
}
return res;
}
}
@Override
public Comparator<Schedulable> getComparator() {
return comparator;
}
@Override
public void computeShares(Collection<? extends Schedulable> schedulables,
Resource totalResources) {
ComputeFairShares.computeShares(schedulables, totalResources, ResourceType.MEMORY);
}
@Override
public void computeSteadyShares(Collection<? extends FSQueue> queues,
Resource totalResources) {
ComputeFairShares.computeSteadyShares(queues, totalResources,
ResourceType.MEMORY);
}
@Override
public boolean checkIfUsageOverFairShare(Resource usage, Resource fairShare) {
return Resources.greaterThan(RESOURCE_CALCULATOR, null, usage, fairShare);
}
@Override
public boolean checkIfAMResourceUsageOverLimit(Resource usage, Resource maxAMResource) {
return usage.getMemory() > maxAMResource.getMemory();
}
@Override
public byte getApplicableDepth() {
return SchedulingPolicy.DEPTH_ANY;
}
}