blob: 54dd090617dafedd5c9d7636488a887641d52040 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.authorize.AccessControlList;
import org.apache.hadoop.yarn.api.records.ReservationACL;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.security.AccessType;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSchedulerConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.allocation.AllocationFileParser;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.allocation.QueueProperties;
import org.apache.hadoop.yarn.util.resource.Resources;
import com.google.common.annotations.VisibleForTesting;
public class AllocationConfiguration extends ReservationSchedulerConfiguration {
private static final AccessControlList EVERYBODY_ACL = new AccessControlList("*");
private static final AccessControlList NOBODY_ACL = new AccessControlList(" ");
// Minimum resource allocation for each queue
private final Map<String, Resource> minQueueResources;
// Maximum amount of resources per queue
@VisibleForTesting
final Map<String, ConfigurableResource> maxQueueResources;
// Maximum amount of resources for each queue's ad hoc children
private final Map<String, ConfigurableResource> maxChildQueueResources;
// Sharing weights for each queue
private final Map<String, Float> queueWeights;
// Max concurrent running applications for each queue and for each user; in addition,
// for users that have no max specified, we use the userMaxJobsDefault.
@VisibleForTesting
final Map<String, Integer> queueMaxApps;
@VisibleForTesting
final Map<String, Integer> userMaxApps;
private final int userMaxAppsDefault;
private final int queueMaxAppsDefault;
private final ConfigurableResource queueMaxResourcesDefault;
// Maximum resource share for each leaf queue that can be used to run AMs
final Map<String, Float> queueMaxAMShares;
private final float queueMaxAMShareDefault;
// ACL's for each queue. Only specifies non-default ACL's from configuration.
private final Map<String, Map<AccessType, AccessControlList>> queueAcls;
// Reservation ACL's for each queue. Only specifies non-default ACL's from
// configuration.
private final Map<String, Map<ReservationACL, AccessControlList>> resAcls;
// Min share preemption timeout for each queue in seconds. If a job in the queue
// waits this long without receiving its guaranteed share, it is allowed to
// preempt other jobs' tasks.
private final Map<String, Long> minSharePreemptionTimeouts;
// Fair share preemption timeout for each queue in seconds. If a job in the
// queue waits this long without receiving its fair share threshold, it is
// allowed to preempt other jobs' tasks.
private final Map<String, Long> fairSharePreemptionTimeouts;
// The fair share preemption threshold for each queue. If a queue waits
// fairSharePreemptionTimeout without receiving
// fairshare * fairSharePreemptionThreshold resources, it is allowed to
// preempt other queues' tasks.
private final Map<String, Float> fairSharePreemptionThresholds;
private final Set<String> reservableQueues;
private final Map<String, SchedulingPolicy> schedulingPolicies;
private final SchedulingPolicy defaultSchedulingPolicy;
// Policy for mapping apps to queues
@VisibleForTesting
QueuePlacementPolicy placementPolicy;
//Configured queues in the alloc xml
@VisibleForTesting
Map<FSQueueType, Set<String>> configuredQueues;
// Reservation system configuration
private ReservationQueueConfiguration globalReservationQueueConfig;
private final Set<String> nonPreemptableQueues;
public AllocationConfiguration(QueueProperties queueProperties,
AllocationFileParser allocationFileParser,
QueuePlacementPolicy newPlacementPolicy,
ReservationQueueConfiguration globalReservationQueueConfig)
throws AllocationConfigurationException {
this.minQueueResources = queueProperties.getMinQueueResources();
this.maxQueueResources = queueProperties.getMaxQueueResources();
this.maxChildQueueResources = queueProperties.getMaxChildQueueResources();
this.queueMaxApps = queueProperties.getQueueMaxApps();
this.userMaxApps = allocationFileParser.getUserMaxApps();
this.queueMaxAMShares = queueProperties.getQueueMaxAMShares();
this.queueWeights = queueProperties.getQueueWeights();
this.userMaxAppsDefault = allocationFileParser.getUserMaxAppsDefault();
this.queueMaxResourcesDefault =
allocationFileParser.getQueueMaxResourcesDefault();
this.queueMaxAppsDefault = allocationFileParser.getQueueMaxAppsDefault();
this.queueMaxAMShareDefault =
allocationFileParser.getQueueMaxAMShareDefault();
this.defaultSchedulingPolicy =
allocationFileParser.getDefaultSchedulingPolicy();
this.schedulingPolicies = queueProperties.getQueuePolicies();
this.minSharePreemptionTimeouts =
queueProperties.getMinSharePreemptionTimeouts();
this.fairSharePreemptionTimeouts =
queueProperties.getFairSharePreemptionTimeouts();
this.fairSharePreemptionThresholds =
queueProperties.getFairSharePreemptionThresholds();
this.queueAcls = queueProperties.getQueueAcls();
this.resAcls = queueProperties.getReservationAcls();
this.reservableQueues = queueProperties.getReservableQueues();
this.globalReservationQueueConfig = globalReservationQueueConfig;
this.placementPolicy = newPlacementPolicy;
this.configuredQueues = queueProperties.getConfiguredQueues();
this.nonPreemptableQueues = queueProperties.getNonPreemptableQueues();
}
public AllocationConfiguration(Configuration conf) {
minQueueResources = new HashMap<>();
maxChildQueueResources = new HashMap<>();
maxQueueResources = new HashMap<>();
queueWeights = new HashMap<>();
queueMaxApps = new HashMap<>();
userMaxApps = new HashMap<>();
queueMaxAMShares = new HashMap<>();
userMaxAppsDefault = Integer.MAX_VALUE;
queueMaxAppsDefault = Integer.MAX_VALUE;
queueMaxResourcesDefault = new ConfigurableResource(Resources.unbounded());
queueMaxAMShareDefault = 0.5f;
queueAcls = new HashMap<>();
resAcls = new HashMap<>();
minSharePreemptionTimeouts = new HashMap<>();
fairSharePreemptionTimeouts = new HashMap<>();
fairSharePreemptionThresholds = new HashMap<>();
schedulingPolicies = new HashMap<>();
defaultSchedulingPolicy = SchedulingPolicy.DEFAULT_POLICY;
reservableQueues = new HashSet<>();
configuredQueues = new HashMap<>();
for (FSQueueType queueType : FSQueueType.values()) {
configuredQueues.put(queueType, new HashSet<>());
}
placementPolicy =
QueuePlacementPolicy.fromConfiguration(conf, configuredQueues);
nonPreemptableQueues = new HashSet<>();
}
/**
* Get the map of ACLs of all queues.
* @return the map of ACLs of all queues
*/
public Map<String, Map<AccessType, AccessControlList>> getQueueAcls() {
return Collections.unmodifiableMap(this.queueAcls);
}
@Override
/**
* Get the map of reservation ACLs to {@link AccessControlList} for the
* specified queue.
*/
public Map<ReservationACL, AccessControlList> getReservationAcls(String
queue) {
return this.resAcls.get(queue);
}
/**
* Get a queue's min share preemption timeout configured in the allocation
* file, in milliseconds. Return -1 if not set.
*/
public long getMinSharePreemptionTimeout(String queueName) {
Long minSharePreemptionTimeout = minSharePreemptionTimeouts.get(queueName);
return (minSharePreemptionTimeout == null) ? -1 : minSharePreemptionTimeout;
}
/**
* Get a queue's fair share preemption timeout configured in the allocation
* file, in milliseconds. Return -1 if not set.
*/
public long getFairSharePreemptionTimeout(String queueName) {
Long fairSharePreemptionTimeout = fairSharePreemptionTimeouts.get(queueName);
return (fairSharePreemptionTimeout == null) ?
-1 : fairSharePreemptionTimeout;
}
/**
* Get a queue's fair share preemption threshold in the allocation file.
* Return -1f if not set.
*/
public float getFairSharePreemptionThreshold(String queueName) {
Float fairSharePreemptionThreshold =
fairSharePreemptionThresholds.get(queueName);
return (fairSharePreemptionThreshold == null) ?
-1f : fairSharePreemptionThreshold;
}
public boolean isPreemptable(String queueName) {
return !nonPreemptableQueues.contains(queueName);
}
private float getQueueWeight(String queue) {
Float weight = queueWeights.get(queue);
return (weight == null) ? 1.0f : weight;
}
public int getUserMaxApps(String user) {
Integer maxApps = userMaxApps.get(user);
return (maxApps == null) ? userMaxAppsDefault : maxApps;
}
@VisibleForTesting
int getQueueMaxApps(String queue) {
Integer maxApps = queueMaxApps.get(queue);
return (maxApps == null) ? queueMaxAppsDefault : maxApps;
}
@VisibleForTesting
float getQueueMaxAMShare(String queue) {
Float maxAMShare = queueMaxAMShares.get(queue);
return (maxAMShare == null) ? queueMaxAMShareDefault : maxAMShare;
}
/**
* Get the minimum resource allocation for the given queue.
*
* @param queue the target queue's name
* @return the min allocation on this queue or {@link Resources#none}
* if not set
*/
@VisibleForTesting
Resource getMinResources(String queue) {
Resource minQueueResource = minQueueResources.get(queue);
return (minQueueResource == null) ? Resources.none() : minQueueResource;
}
/**
* Get the maximum resource allocation for the given queue. If the max in not
* set, return the default max.
*
* @param queue the target queue's name
* @return the max allocation on this queue
*/
@VisibleForTesting
ConfigurableResource getMaxResources(String queue) {
ConfigurableResource maxQueueResource = maxQueueResources.get(queue);
if (maxQueueResource == null) {
maxQueueResource = queueMaxResourcesDefault;
}
return maxQueueResource;
}
/**
* Get the maximum resource allocation for children of the given queue.
*
* @param queue the target queue's name
* @return the max allocation on this queue or null if not set
*/
@VisibleForTesting
ConfigurableResource getMaxChildResources(String queue) {
return maxChildQueueResources.get(queue);
}
@VisibleForTesting
SchedulingPolicy getSchedulingPolicy(String queueName) {
SchedulingPolicy policy = schedulingPolicies.get(queueName);
return (policy == null) ? defaultSchedulingPolicy : policy;
}
public SchedulingPolicy getDefaultSchedulingPolicy() {
return defaultSchedulingPolicy;
}
public Map<FSQueueType, Set<String>> getConfiguredQueues() {
return configuredQueues;
}
public QueuePlacementPolicy getPlacementPolicy() {
return placementPolicy;
}
@Override
public boolean isReservable(String queue) {
return reservableQueues.contains(queue);
}
@Override
public long getReservationWindow(String queue) {
return globalReservationQueueConfig.getReservationWindowMsec();
}
@Override
public float getAverageCapacity(String queue) {
return globalReservationQueueConfig.getAvgOverTimeMultiplier() * 100;
}
@Override
public float getInstantaneousMaxCapacity(String queue) {
return globalReservationQueueConfig.getMaxOverTimeMultiplier() * 100;
}
@Override
public String getReservationAdmissionPolicy(String queue) {
return globalReservationQueueConfig.getReservationAdmissionPolicy();
}
@Override
public String getReservationAgent(String queue) {
return globalReservationQueueConfig.getReservationAgent();
}
@Override
public boolean getShowReservationAsQueues(String queue) {
return globalReservationQueueConfig.shouldShowReservationAsQueues();
}
@Override
public String getReplanner(String queue) {
return globalReservationQueueConfig.getPlanner();
}
@Override
public boolean getMoveOnExpiry(String queue) {
return globalReservationQueueConfig.shouldMoveOnExpiry();
}
@Override
public long getEnforcementWindow(String queue) {
return globalReservationQueueConfig.getEnforcementWindowMsec();
}
@VisibleForTesting
public void setReservationWindow(long window) {
globalReservationQueueConfig.setReservationWindow(window);
}
@VisibleForTesting
public void setAverageCapacity(int avgCapacity) {
globalReservationQueueConfig.setAverageCapacity(avgCapacity);
}
/**
* Initialize a {@link FSQueue} with queue-specific properties and its
* metrics.
* @param queue the FSQueue needed to be initialized
*/
public void initFSQueue(FSQueue queue){
// Set queue-specific properties.
String name = queue.getName();
queue.setWeights(getQueueWeight(name));
queue.setMinShare(getMinResources(name));
queue.setMaxShare(getMaxResources(name));
queue.setMaxRunningApps(getQueueMaxApps(name));
queue.setMaxAMShare(getQueueMaxAMShare(name));
queue.setMaxChildQueueResource(getMaxChildResources(name));
// Set queue metrics.
queue.getMetrics().setMinShare(queue.getMinShare());
queue.getMetrics().setMaxShare(queue.getMaxShare());
queue.getMetrics().setMaxApps(queue.getMaxRunningApps());
queue.getMetrics().setSchedulingPolicy(getSchedulingPolicy(name).getName());
}
}