blob: 9eda46cd8c216f75549fc592e14d3cc706cf40a5 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Evolving;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.DominantResourceFairnessPolicy;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.FairSharePolicy;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.FifoPolicy;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import java.util.Collection;
import java.util.Comparator;
import java.util.concurrent.ConcurrentHashMap;
@Public
@Evolving
public abstract class SchedulingPolicy {
private static final ConcurrentHashMap<Class<? extends SchedulingPolicy>, SchedulingPolicy> instances =
new ConcurrentHashMap<Class<? extends SchedulingPolicy>, SchedulingPolicy>();
public static final SchedulingPolicy DEFAULT_POLICY =
getInstance(FairSharePolicy.class);
public static final byte DEPTH_LEAF = (byte) 1;
public static final byte DEPTH_INTERMEDIATE = (byte) 2;
public static final byte DEPTH_ROOT = (byte) 4;
public static final byte DEPTH_PARENT = (byte) 6; // Root and Intermediate
public static final byte DEPTH_ANY = (byte) 7;
/**
* Returns a {@link SchedulingPolicy} instance corresponding to the passed clazz
*/
public static SchedulingPolicy getInstance(Class<? extends SchedulingPolicy> clazz) {
SchedulingPolicy policy = ReflectionUtils.newInstance(clazz, null);
SchedulingPolicy policyRet = instances.putIfAbsent(clazz, policy);
if(policyRet != null) {
return policyRet;
}
return policy;
}
/**
* Returns {@link SchedulingPolicy} instance corresponding to the
* {@link SchedulingPolicy} passed as a string. The policy can be "fair" for
* FairSharePolicy, "fifo" for FifoPolicy, or "drf" for
* DominantResourceFairnessPolicy. For a custom
* {@link SchedulingPolicy}s in the RM classpath, the policy should be
* canonical class name of the {@link SchedulingPolicy}.
*
* @param policy canonical class name or "drf" or "fair" or "fifo"
* @throws AllocationConfigurationException
*/
@SuppressWarnings("unchecked")
public static SchedulingPolicy parse(String policy)
throws AllocationConfigurationException {
@SuppressWarnings("rawtypes")
Class clazz;
String text = StringUtils.toLowerCase(policy);
if (text.equalsIgnoreCase(FairSharePolicy.NAME)) {
clazz = FairSharePolicy.class;
} else if (text.equalsIgnoreCase(FifoPolicy.NAME)) {
clazz = FifoPolicy.class;
} else if (text.equalsIgnoreCase(DominantResourceFairnessPolicy.NAME)) {
clazz = DominantResourceFairnessPolicy.class;
} else {
try {
clazz = Class.forName(policy);
} catch (ClassNotFoundException cnfe) {
throw new AllocationConfigurationException(policy
+ " SchedulingPolicy class not found!");
}
}
if (!SchedulingPolicy.class.isAssignableFrom(clazz)) {
throw new AllocationConfigurationException(policy
+ " does not extend SchedulingPolicy");
}
return getInstance(clazz);
}
public void initialize(Resource clusterCapacity) {}
/**
* The {@link ResourceCalculator} returned by this method should be used
* for any calculations involving resources.
*
* @return ResourceCalculator instance to use
*/
public abstract ResourceCalculator getResourceCalculator();
/**
* @return returns the name of {@link SchedulingPolicy}
*/
public abstract String getName();
/**
* Specifies the depths in the hierarchy, this {@link SchedulingPolicy}
* applies to
*
* @return depth equal to one of fields {@link SchedulingPolicy}#DEPTH_*
*/
public abstract byte getApplicableDepth();
/**
* Checks if the specified {@link SchedulingPolicy} can be used for a queue at
* the specified depth in the hierarchy
*
* @param policy {@link SchedulingPolicy} we are checking the
* depth-applicability for
* @param depth queue's depth in the hierarchy
* @return true if policy is applicable to passed depth, false otherwise
*/
public static boolean isApplicableTo(SchedulingPolicy policy, byte depth) {
return ((policy.getApplicableDepth() & depth) == depth) ? true : false;
}
/**
* The comparator returned by this method is to be used for sorting the
* {@link Schedulable}s in that queue.
*
* @return the comparator to sort by
*/
public abstract Comparator<Schedulable> getComparator();
/**
* Computes and updates the shares of {@link Schedulable}s as per
* the {@link SchedulingPolicy}, to be used later for scheduling decisions.
* The shares computed are instantaneous and only consider queues with
* running applications.
*
* @param schedulables {@link Schedulable}s whose shares are to be updated
* @param totalResources Total {@link Resource}s in the cluster
*/
public abstract void computeShares(
Collection<? extends Schedulable> schedulables, Resource totalResources);
/**
* Computes and updates the steady shares of {@link FSQueue}s as per the
* {@link SchedulingPolicy}. The steady share does not differentiate
* between queues with and without running applications under them. The
* steady share is not used for scheduling, it is displayed on the Web UI
* for better visibility.
*
* @param queues {@link FSQueue}s whose shares are to be updated
* @param totalResources Total {@link Resource}s in the cluster
*/
public abstract void computeSteadyShares(
Collection<? extends FSQueue> queues, Resource totalResources);
/**
* Check if the resource usage is over the fair share under this policy
*
* @param usage {@link Resource} the resource usage
* @param fairShare {@link Resource} the fair share
* @return true if check passes (is over) or false otherwise
*/
public abstract boolean checkIfUsageOverFairShare(
Resource usage, Resource fairShare);
/**
* Get headroom by calculating the min of <code>clusterAvailable</code> and
* (<code>queueFairShare</code> - <code>queueUsage</code>) resources that are
* applicable to this policy. For eg if only memory then leave other
* resources such as CPU to same as clusterAvailable.
*
* @param queueFairShare fairshare in the queue
* @param queueUsage resources used in the queue
* @param maxAvailable available resource in cluster for this queue
* @return calculated headroom
*/
public abstract Resource getHeadroom(Resource queueFairShare,
Resource queueUsage, Resource maxAvailable);
}