| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair; |
| |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.Map; |
| import java.util.Set; |
| |
| import org.apache.commons.logging.Log; |
| import org.apache.commons.logging.LogFactory; |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.security.UserGroupInformation; |
| import org.apache.hadoop.security.authorize.AccessControlList; |
| import org.apache.hadoop.yarn.api.records.QueueACL; |
| import org.apache.hadoop.yarn.api.records.ReservationACL; |
| import org.apache.hadoop.yarn.api.records.Resource; |
| import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSchedulerConfiguration; |
| import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceWeights; |
| import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator; |
| import org.apache.hadoop.yarn.util.resource.ResourceCalculator; |
| import org.apache.hadoop.yarn.util.resource.Resources; |
| |
| import com.google.common.annotations.VisibleForTesting; |
| |
| public class AllocationConfiguration extends ReservationSchedulerConfiguration { |
| private static final Log LOG = LogFactory.getLog(FSQueue.class.getName()); |
| private static final AccessControlList EVERYBODY_ACL = new AccessControlList("*"); |
| private static final AccessControlList NOBODY_ACL = new AccessControlList(" "); |
| private static final ResourceCalculator RESOURCE_CALCULATOR = |
| new DefaultResourceCalculator(); |
| // Minimum resource allocation for each queue |
| private final Map<String, Resource> minQueueResources; |
| // Maximum amount of resources per queue |
| @VisibleForTesting |
| final Map<String, Resource> maxQueueResources; |
| // Maximum amount of resources for each queue's ad hoc children |
| private final Map<String, Resource> maxChildQueueResources; |
| // Sharing weights for each queue |
| private final Map<String, ResourceWeights> queueWeights; |
| |
| // Max concurrent running applications for each queue and for each user; in addition, |
| // for users that have no max specified, we use the userMaxJobsDefault. |
| @VisibleForTesting |
| final Map<String, Integer> queueMaxApps; |
| @VisibleForTesting |
| final Map<String, Integer> userMaxApps; |
| private final int userMaxAppsDefault; |
| private final int queueMaxAppsDefault; |
| private final Resource queueMaxResourcesDefault; |
| |
| // Maximum resource share for each leaf queue that can be used to run AMs |
| final Map<String, Float> queueMaxAMShares; |
| private final float queueMaxAMShareDefault; |
| |
| // ACL's for each queue. Only specifies non-default ACL's from configuration. |
| private final Map<String, Map<QueueACL, AccessControlList>> queueAcls; |
| |
| // Reservation ACL's for each queue. Only specifies non-default ACL's from |
| // configuration. |
| private final Map<String, Map<ReservationACL, AccessControlList>> resAcls; |
| |
| // Min share preemption timeout for each queue in seconds. If a job in the queue |
| // waits this long without receiving its guaranteed share, it is allowed to |
| // preempt other jobs' tasks. |
| private final Map<String, Long> minSharePreemptionTimeouts; |
| |
| // Fair share preemption timeout for each queue in seconds. If a job in the |
| // queue waits this long without receiving its fair share threshold, it is |
| // allowed to preempt other jobs' tasks. |
| private final Map<String, Long> fairSharePreemptionTimeouts; |
| |
| // The fair share preemption threshold for each queue. If a queue waits |
| // fairSharePreemptionTimeout without receiving |
| // fairshare * fairSharePreemptionThreshold resources, it is allowed to |
| // preempt other queues' tasks. |
| private final Map<String, Float> fairSharePreemptionThresholds; |
| |
| private final Set<String> reservableQueues; |
| |
| private final Map<String, SchedulingPolicy> schedulingPolicies; |
| |
| private final SchedulingPolicy defaultSchedulingPolicy; |
| |
| // Policy for mapping apps to queues |
| @VisibleForTesting |
| QueuePlacementPolicy placementPolicy; |
| |
| //Configured queues in the alloc xml |
| @VisibleForTesting |
| Map<FSQueueType, Set<String>> configuredQueues; |
| |
| // Reservation system configuration |
| private ReservationQueueConfiguration globalReservationQueueConfig; |
| |
| private final Set<String> nonPreemptableQueues; |
| |
| public AllocationConfiguration(Map<String, Resource> minQueueResources, |
| Map<String, Resource> maxQueueResources, |
| Map<String, Resource> maxChildQueueResources, |
| Map<String, Integer> queueMaxApps, Map<String, Integer> userMaxApps, |
| Map<String, ResourceWeights> queueWeights, |
| Map<String, Float> queueMaxAMShares, int userMaxAppsDefault, |
| int queueMaxAppsDefault, Resource queueMaxResourcesDefault, |
| float queueMaxAMShareDefault, |
| Map<String, SchedulingPolicy> schedulingPolicies, |
| SchedulingPolicy defaultSchedulingPolicy, |
| Map<String, Long> minSharePreemptionTimeouts, |
| Map<String, Long> fairSharePreemptionTimeouts, |
| Map<String, Float> fairSharePreemptionThresholds, |
| Map<String, Map<QueueACL, AccessControlList>> queueAcls, |
| Map<String, Map<ReservationACL, AccessControlList>> resAcls, |
| QueuePlacementPolicy placementPolicy, |
| Map<FSQueueType, Set<String>> configuredQueues, |
| ReservationQueueConfiguration globalReservationQueueConfig, |
| Set<String> reservableQueues, |
| Set<String> nonPreemptableQueues) { |
| this.minQueueResources = minQueueResources; |
| this.maxQueueResources = maxQueueResources; |
| this.maxChildQueueResources = maxChildQueueResources; |
| this.queueMaxApps = queueMaxApps; |
| this.userMaxApps = userMaxApps; |
| this.queueMaxAMShares = queueMaxAMShares; |
| this.queueWeights = queueWeights; |
| this.userMaxAppsDefault = userMaxAppsDefault; |
| this.queueMaxResourcesDefault = queueMaxResourcesDefault; |
| this.queueMaxAppsDefault = queueMaxAppsDefault; |
| this.queueMaxAMShareDefault = queueMaxAMShareDefault; |
| this.defaultSchedulingPolicy = defaultSchedulingPolicy; |
| this.schedulingPolicies = schedulingPolicies; |
| this.minSharePreemptionTimeouts = minSharePreemptionTimeouts; |
| this.fairSharePreemptionTimeouts = fairSharePreemptionTimeouts; |
| this.fairSharePreemptionThresholds = fairSharePreemptionThresholds; |
| this.queueAcls = queueAcls; |
| this.resAcls = resAcls; |
| this.reservableQueues = reservableQueues; |
| this.globalReservationQueueConfig = globalReservationQueueConfig; |
| this.placementPolicy = placementPolicy; |
| this.configuredQueues = configuredQueues; |
| this.nonPreemptableQueues = nonPreemptableQueues; |
| } |
| |
| public AllocationConfiguration(Configuration conf) { |
| minQueueResources = new HashMap<>(); |
| maxChildQueueResources = new HashMap<>(); |
| maxQueueResources = new HashMap<>(); |
| queueWeights = new HashMap<>(); |
| queueMaxApps = new HashMap<>(); |
| userMaxApps = new HashMap<>(); |
| queueMaxAMShares = new HashMap<>(); |
| userMaxAppsDefault = Integer.MAX_VALUE; |
| queueMaxAppsDefault = Integer.MAX_VALUE; |
| queueMaxResourcesDefault = Resources.unbounded(); |
| queueMaxAMShareDefault = 0.5f; |
| queueAcls = new HashMap<>(); |
| resAcls = new HashMap<>(); |
| minSharePreemptionTimeouts = new HashMap<>(); |
| fairSharePreemptionTimeouts = new HashMap<>(); |
| fairSharePreemptionThresholds = new HashMap<>(); |
| schedulingPolicies = new HashMap<>(); |
| defaultSchedulingPolicy = SchedulingPolicy.DEFAULT_POLICY; |
| reservableQueues = new HashSet<>(); |
| configuredQueues = new HashMap<>(); |
| for (FSQueueType queueType : FSQueueType.values()) { |
| configuredQueues.put(queueType, new HashSet<>()); |
| } |
| placementPolicy = |
| QueuePlacementPolicy.fromConfiguration(conf, configuredQueues); |
| nonPreemptableQueues = new HashSet<>(); |
| } |
| |
| /** |
| * Get the ACLs associated with this queue. If a given ACL is not explicitly |
| * configured, include the default value for that ACL. The default for the |
| * root queue is everybody ("*") and the default for all other queues is |
| * nobody ("") |
| */ |
| public AccessControlList getQueueAcl(String queue, QueueACL operation) { |
| Map<QueueACL, AccessControlList> queueAcls = this.queueAcls.get(queue); |
| if (queueAcls != null) { |
| AccessControlList operationAcl = queueAcls.get(operation); |
| if (operationAcl != null) { |
| return operationAcl; |
| } |
| } |
| return (queue.equals("root")) ? EVERYBODY_ACL : NOBODY_ACL; |
| } |
| |
| @Override |
| /** |
| * Get the map of reservation ACLs to {@link AccessControlList} for the |
| * specified queue. |
| */ |
| public Map<ReservationACL, AccessControlList> getReservationAcls(String |
| queue) { |
| return this.resAcls.get(queue); |
| } |
| |
| /** |
| * Get a queue's min share preemption timeout configured in the allocation |
| * file, in milliseconds. Return -1 if not set. |
| */ |
| public long getMinSharePreemptionTimeout(String queueName) { |
| Long minSharePreemptionTimeout = minSharePreemptionTimeouts.get(queueName); |
| return (minSharePreemptionTimeout == null) ? -1 : minSharePreemptionTimeout; |
| } |
| |
| /** |
| * Get a queue's fair share preemption timeout configured in the allocation |
| * file, in milliseconds. Return -1 if not set. |
| */ |
| public long getFairSharePreemptionTimeout(String queueName) { |
| Long fairSharePreemptionTimeout = fairSharePreemptionTimeouts.get(queueName); |
| return (fairSharePreemptionTimeout == null) ? |
| -1 : fairSharePreemptionTimeout; |
| } |
| |
| /** |
| * Get a queue's fair share preemption threshold in the allocation file. |
| * Return -1f if not set. |
| */ |
| public float getFairSharePreemptionThreshold(String queueName) { |
| Float fairSharePreemptionThreshold = |
| fairSharePreemptionThresholds.get(queueName); |
| return (fairSharePreemptionThreshold == null) ? |
| -1f : fairSharePreemptionThreshold; |
| } |
| |
| public boolean isPreemptable(String queueName) { |
| return !nonPreemptableQueues.contains(queueName); |
| } |
| |
| private ResourceWeights getQueueWeight(String queue) { |
| ResourceWeights weight = queueWeights.get(queue); |
| return (weight == null) ? ResourceWeights.NEUTRAL : weight; |
| } |
| |
| public int getUserMaxApps(String user) { |
| Integer maxApps = userMaxApps.get(user); |
| return (maxApps == null) ? userMaxAppsDefault : maxApps; |
| } |
| |
| @VisibleForTesting |
| int getQueueMaxApps(String queue) { |
| Integer maxApps = queueMaxApps.get(queue); |
| return (maxApps == null) ? queueMaxAppsDefault : maxApps; |
| } |
| |
| @VisibleForTesting |
| float getQueueMaxAMShare(String queue) { |
| Float maxAMShare = queueMaxAMShares.get(queue); |
| return (maxAMShare == null) ? queueMaxAMShareDefault : maxAMShare; |
| } |
| |
| /** |
| * Get the minimum resource allocation for the given queue. |
| * |
| * @param queue the target queue's name |
| * @return the min allocation on this queue or {@link Resources#none} |
| * if not set |
| */ |
| @VisibleForTesting |
| Resource getMinResources(String queue) { |
| Resource minQueueResource = minQueueResources.get(queue); |
| return (minQueueResource == null) ? Resources.none() : minQueueResource; |
| } |
| |
| /** |
| * Get the maximum resource allocation for the given queue. If the max in not |
| * set, return the larger of the min and the default max. |
| * |
| * @param queue the target queue's name |
| * @return the max allocation on this queue |
| */ |
| @VisibleForTesting |
| Resource getMaxResources(String queue) { |
| Resource maxQueueResource = maxQueueResources.get(queue); |
| if (maxQueueResource == null) { |
| Resource minQueueResource = minQueueResources.get(queue); |
| if (minQueueResource != null && |
| Resources.greaterThan(RESOURCE_CALCULATOR, Resources.unbounded(), |
| minQueueResource, queueMaxResourcesDefault)) { |
| return minQueueResource; |
| } else { |
| return queueMaxResourcesDefault; |
| } |
| } else { |
| return maxQueueResource; |
| } |
| } |
| |
| /** |
| * Get the maximum resource allocation for children of the given queue. |
| * |
| * @param queue the target queue's name |
| * @return the max allocation on this queue or null if not set |
| */ |
| @VisibleForTesting |
| Resource getMaxChildResources(String queue) { |
| return maxChildQueueResources.get(queue); |
| } |
| |
| public boolean hasAccess(String queueName, QueueACL acl, |
| UserGroupInformation user) { |
| int lastPeriodIndex = queueName.length(); |
| while (lastPeriodIndex != -1) { |
| String queue = queueName.substring(0, lastPeriodIndex); |
| if (getQueueAcl(queue, acl).isUserAllowed(user)) { |
| return true; |
| } |
| |
| lastPeriodIndex = queueName.lastIndexOf('.', lastPeriodIndex - 1); |
| } |
| |
| return false; |
| } |
| |
| @VisibleForTesting |
| SchedulingPolicy getSchedulingPolicy(String queueName) { |
| SchedulingPolicy policy = schedulingPolicies.get(queueName); |
| return (policy == null) ? defaultSchedulingPolicy : policy; |
| } |
| |
| public SchedulingPolicy getDefaultSchedulingPolicy() { |
| return defaultSchedulingPolicy; |
| } |
| |
| public Map<FSQueueType, Set<String>> getConfiguredQueues() { |
| return configuredQueues; |
| } |
| |
| public QueuePlacementPolicy getPlacementPolicy() { |
| return placementPolicy; |
| } |
| |
| @Override |
| public boolean isReservable(String queue) { |
| return reservableQueues.contains(queue); |
| } |
| |
| @Override |
| public long getReservationWindow(String queue) { |
| return globalReservationQueueConfig.getReservationWindowMsec(); |
| } |
| |
| @Override |
| public float getAverageCapacity(String queue) { |
| return globalReservationQueueConfig.getAvgOverTimeMultiplier() * 100; |
| } |
| |
| @Override |
| public float getInstantaneousMaxCapacity(String queue) { |
| return globalReservationQueueConfig.getMaxOverTimeMultiplier() * 100; |
| } |
| |
| @Override |
| public String getReservationAdmissionPolicy(String queue) { |
| return globalReservationQueueConfig.getReservationAdmissionPolicy(); |
| } |
| |
| @Override |
| public String getReservationAgent(String queue) { |
| return globalReservationQueueConfig.getReservationAgent(); |
| } |
| |
| @Override |
| public boolean getShowReservationAsQueues(String queue) { |
| return globalReservationQueueConfig.shouldShowReservationAsQueues(); |
| } |
| |
| @Override |
| public String getReplanner(String queue) { |
| return globalReservationQueueConfig.getPlanner(); |
| } |
| |
| @Override |
| public boolean getMoveOnExpiry(String queue) { |
| return globalReservationQueueConfig.shouldMoveOnExpiry(); |
| } |
| |
| @Override |
| public long getEnforcementWindow(String queue) { |
| return globalReservationQueueConfig.getEnforcementWindowMsec(); |
| } |
| |
| @VisibleForTesting |
| public void setReservationWindow(long window) { |
| globalReservationQueueConfig.setReservationWindow(window); |
| } |
| |
| @VisibleForTesting |
| public void setAverageCapacity(int avgCapacity) { |
| globalReservationQueueConfig.setAverageCapacity(avgCapacity); |
| } |
| |
| /** |
| * Initialize a {@link FSQueue} with queue-specific properties and its |
| * metrics. |
| * @param queue the FSQueue needed to be initialized |
| * @param scheduler the scheduler which the queue belonged to |
| */ |
| public void initFSQueue(FSQueue queue, FairScheduler scheduler){ |
| // Set queue-specific properties. |
| String name = queue.getName(); |
| queue.setWeights(getQueueWeight(name)); |
| queue.setMinShare(getMinResources(name)); |
| queue.setMaxShare(getMaxResources(name)); |
| queue.setMaxRunningApps(getQueueMaxApps(name)); |
| queue.setMaxAMShare(getQueueMaxAMShare(name)); |
| queue.setMaxChildQueueResource(getMaxChildResources(name)); |
| try { |
| SchedulingPolicy policy = getSchedulingPolicy(name); |
| policy.initialize(scheduler.getClusterResource()); |
| queue.setPolicy(policy); |
| } catch (AllocationConfigurationException ex) { |
| LOG.warn("Failed to set the scheduling policy " |
| + getDefaultSchedulingPolicy(), ex); |
| } |
| |
| // Set queue metrics. |
| queue.getMetrics().setMinShare(getMinResources(name)); |
| queue.getMetrics().setMaxShare(getMaxResources(name)); |
| queue.getMetrics().setMaxApps(getQueueMaxApps(name)); |
| queue.getMetrics().setSchedulingPolicy(getSchedulingPolicy(name).getName()); |
| } |
| } |