| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair; |
| |
| import org.apache.hadoop.classification.InterfaceAudience.Public; |
| import org.apache.hadoop.classification.InterfaceStability.Evolving; |
| import org.apache.hadoop.util.ReflectionUtils; |
| import org.apache.hadoop.util.StringUtils; |
| import org.apache.hadoop.yarn.api.records.Resource; |
| import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.DominantResourceFairnessPolicy; |
| import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.FairSharePolicy; |
| import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.FifoPolicy; |
| |
| |
| import org.apache.hadoop.yarn.util.resource.ResourceCalculator; |
| |
| import java.util.Collection; |
| import java.util.Comparator; |
| import java.util.concurrent.ConcurrentHashMap; |
| |
| /** |
| * The SchedulingPolicy is used by the fair scheduler mainly to determine |
| * what a queue's fair share and steady fair share should be as well as |
| * calculating available headroom. This determines how resources can be |
| * shared between running applications within a queue. |
| * <p> |
| * Every queue has a policy, including parents and children. If a child |
| * queue doesn't specify one, it inherits the parent's policy. |
| * The policy for a child queue must be compatible with the policy of |
| * the parent queue; there are some combinations that aren't allowed. |
| * See {@link SchedulingPolicy#isChildPolicyAllowed(SchedulingPolicy)}. |
| * The policy for a queue is specified by setting property |
| * <i>schedulingPolicy</i> in the fair scheduler configuration file. |
| * The default policy is {@link FairSharePolicy} if not specified. |
| */ |
| @Public |
| @Evolving |
| public abstract class SchedulingPolicy { |
| private static final ConcurrentHashMap<Class<? extends SchedulingPolicy>, SchedulingPolicy> instances = |
| new ConcurrentHashMap<Class<? extends SchedulingPolicy>, SchedulingPolicy>(); |
| |
| public static final SchedulingPolicy DEFAULT_POLICY = |
| getInstance(FairSharePolicy.class); |
| |
| /** |
| * Returns a {@link SchedulingPolicy} instance corresponding |
| * to the passed clazz. |
| * |
| * @param clazz a class that extends {@link SchedulingPolicy} |
| * @return a {@link SchedulingPolicy} instance |
| */ |
| public static SchedulingPolicy getInstance( |
| Class<? extends SchedulingPolicy> clazz) { |
| SchedulingPolicy policy = ReflectionUtils.newInstance(clazz, null); |
| SchedulingPolicy policyRet = instances.putIfAbsent(clazz, policy); |
| if(policyRet != null) { |
| return policyRet; |
| } |
| return policy; |
| } |
| |
| /** |
| * Returns {@link SchedulingPolicy} instance corresponding to the |
| * {@link SchedulingPolicy} passed as a string. The policy can be "fair" for |
| * FairSharePolicy, "fifo" for FifoPolicy, or "drf" for |
| * DominantResourceFairnessPolicy. For a custom |
| * {@link SchedulingPolicy}s in the RM classpath, the policy should be |
| * canonical class name of the {@link SchedulingPolicy}. |
| * |
| * @param policy canonical class name or "drf" or "fair" or "fifo" |
| * @return a {@link SchedulingPolicy} instance parsed from given policy |
| * @throws AllocationConfigurationException |
| * |
| */ |
| @SuppressWarnings("unchecked") |
| public static SchedulingPolicy parse(String policy) |
| throws AllocationConfigurationException { |
| @SuppressWarnings("rawtypes") |
| Class clazz; |
| String text = StringUtils.toLowerCase(policy); |
| if (text.equalsIgnoreCase(FairSharePolicy.NAME)) { |
| clazz = FairSharePolicy.class; |
| } else if (text.equalsIgnoreCase(FifoPolicy.NAME)) { |
| clazz = FifoPolicy.class; |
| } else if (text.equalsIgnoreCase(DominantResourceFairnessPolicy.NAME)) { |
| clazz = DominantResourceFairnessPolicy.class; |
| } else { |
| try { |
| clazz = Class.forName(policy); |
| } catch (ClassNotFoundException cnfe) { |
| throw new AllocationConfigurationException(policy |
| + " SchedulingPolicy class not found!"); |
| } |
| } |
| if (!SchedulingPolicy.class.isAssignableFrom(clazz)) { |
| throw new AllocationConfigurationException(policy |
| + " does not extend SchedulingPolicy"); |
| } |
| return getInstance(clazz); |
| } |
| |
| /** |
| * Initialize the scheduling policy with cluster resources. |
| * @deprecated Since it doesn't track cluster resource changes, replaced by |
| * {@link #initialize(FSContext)}. |
| * |
| * @param clusterCapacity cluster resources |
| */ |
| @Deprecated |
| public void initialize(Resource clusterCapacity) {} |
| |
| /** |
| * Initialize the scheduling policy with a {@link FSContext} object, which has |
| * a pointer to the cluster resources among other information. |
| * |
| * @param fsContext a {@link FSContext} object which has a pointer to the |
| * cluster resources |
| */ |
| public void initialize(FSContext fsContext) {} |
| |
| /** |
| * The {@link ResourceCalculator} returned by this method should be used |
| * for any calculations involving resources. |
| * |
| * @return ResourceCalculator instance to use |
| */ |
| public abstract ResourceCalculator getResourceCalculator(); |
| |
| /** |
| * @return returns the name of {@link SchedulingPolicy} |
| */ |
| public abstract String getName(); |
| |
| /** |
| * The comparator returned by this method is to be used for sorting the |
| * {@link Schedulable}s in that queue. |
| * |
| * @return the comparator to sort by |
| */ |
| public abstract Comparator<Schedulable> getComparator(); |
| |
| /** |
| * Computes and updates the shares of {@link Schedulable}s as per |
| * the {@link SchedulingPolicy}, to be used later for scheduling decisions. |
| * The shares computed are instantaneous and only consider queues with |
| * running applications. |
| * |
| * @param schedulables {@link Schedulable}s whose shares are to be updated |
| * @param totalResources Total {@link Resource}s in the cluster |
| */ |
| public abstract void computeShares( |
| Collection<? extends Schedulable> schedulables, Resource totalResources); |
| |
| /** |
| * Computes and updates the steady shares of {@link FSQueue}s as per the |
| * {@link SchedulingPolicy}. The steady share does not differentiate |
| * between queues with and without running applications under them. The |
| * steady share is not used for scheduling, it is displayed on the Web UI |
| * for better visibility. |
| * |
| * @param queues {@link FSQueue}s whose shares are to be updated |
| * @param totalResources Total {@link Resource}s in the cluster |
| */ |
| public abstract void computeSteadyShares( |
| Collection<? extends FSQueue> queues, Resource totalResources); |
| |
| /** |
| * Check if the resource usage is over the fair share under this policy. |
| * |
| * @param usage {@link Resource} the resource usage |
| * @param fairShare {@link Resource} the fair share |
| * @return true if check passes (is over) or false otherwise |
| */ |
| public abstract boolean checkIfUsageOverFairShare( |
| Resource usage, Resource fairShare); |
| |
| /** |
| * Get headroom by calculating the min of {@code clusterAvailable} and |
| * ({@code queueFairShare} - {@code queueUsage}) resources that are |
| * applicable to this policy. For eg if only memory then leave other |
| * resources such as CPU to same as {@code clusterAvailable}. |
| * |
| * @param queueFairShare fairshare in the queue |
| * @param queueUsage resources used in the queue |
| * @param maxAvailable available resource in cluster for this queue |
| * @return calculated headroom |
| */ |
| public abstract Resource getHeadroom(Resource queueFairShare, |
| Resource queueUsage, Resource maxAvailable); |
| |
| /** |
| * Check whether the policy of a child queue is allowed. |
| * |
| * @param childPolicy the policy of child queue |
| * @return true if the child policy is allowed; false otherwise |
| */ |
| public boolean isChildPolicyAllowed(SchedulingPolicy childPolicy) { |
| return true; |
| } |
| } |