// blob: ee7198122c5f0faa85ea5aaa555f87317a3c027b [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
import java.io.File;
import java.io.IOException;
import java.net.URL;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import javax.xml.parsers.DocumentBuilder;
import javax.xml.parsers.DocumentBuilderFactory;
import javax.xml.parsers.ParserConfigurationException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.authorize.AccessControlList;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.yarn.api.records.QueueACL;
import org.apache.hadoop.yarn.api.records.ReservationACL;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceWeights;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.FifoPolicy;
import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.SystemClock;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.w3c.dom.Node;
import org.w3c.dom.NodeList;
import org.w3c.dom.Text;
import org.xml.sax.SAXException;
import com.google.common.base.CharMatcher;
import com.google.common.annotations.VisibleForTesting;
@Public
@Unstable
public class AllocationFileLoaderService extends AbstractService {
/** Logger shared by the loader service and its reload thread. */
public static final Log LOG = LogFactory.getLog(
AllocationFileLoaderService.class.getName());
/** Default time, in milliseconds, to wait between checks of the allocation file. */
public static final long ALLOC_RELOAD_INTERVAL_MS = 10 * 1000;
/**
* Time to wait after the allocation has been modified before reloading it
* (this is done to prevent loading a file that hasn't been fully written).
*/
public static final long ALLOC_RELOAD_WAIT_MS = 5 * 1000;
/**
* Maximum time, in milliseconds, that stopping the service waits for the
* reload thread to terminate.
*/
public static final long THREAD_JOIN_TIMEOUT_MS = 1000;
// Time source used to decide when to reload and to stamp successful reloads.
private final Clock clock;
private long lastSuccessfulReload; // Last time we successfully reloaded queues
// True once a reload attempt has failed; used to avoid logging the same
// failure on every polling iteration. Cleared by a successful reload.
private boolean lastReloadAttemptFailed = false;
// Path to XML file containing allocations.
private File allocFile;
// Receiver of each successfully parsed allocation configuration.
private Listener reloadListener;
// Sleep between polls of the allocation file; package-visible so tests can
// shorten it.
@VisibleForTesting
long reloadIntervalMs = ALLOC_RELOAD_INTERVAL_MS;
// Background poller; only created when an allocation file was resolved.
private Thread reloadThread;
// Loop condition of the reload thread; set to false when the service stops.
private volatile boolean running = true;
/**
* Creates a loader service that takes the current time from
* {@link SystemClock#getInstance()}.
*/
public AllocationFileLoaderService() {
this(SystemClock.getInstance());
}
/**
* Creates a loader service, named after this class, that uses the supplied
* clock.
*
* @param clock time source read by the reload thread and when recording the
* time of a successful reload
*/
public AllocationFileLoaderService(Clock clock) {
super(AllocationFileLoaderService.class.getName());
this.clock = clock;
}
/**
 * Resolves the allocation file from the configuration and, if one is found,
 * prepares (without starting) the daemon thread that polls the file and
 * reloads it after it has been modified.
 *
 * @param conf configuration used to locate the allocation file
 * @throws Exception if resolving the file or the superclass initialization
 *         fails
 */
@Override
public void serviceInit(Configuration conf) throws Exception {
  this.allocFile = getAllocationFile(conf);
  if (allocFile != null) {
    reloadThread = new Thread() {
      @Override
      public void run() {
        while (running) {
          reloadIfModified();
          pauseUntilNextCheck();
        }
      }

      /**
       * Reloads the allocations when the file changed after the last
       * successful reload and has been left untouched for at least
       * ALLOC_RELOAD_WAIT_MS. Failures are logged once per failure streak.
       */
      private void reloadIfModified() {
        final long now = clock.getTime();
        final long modifiedAt = allocFile.lastModified();
        final boolean changedSinceReload = modifiedAt > lastSuccessfulReload;
        final boolean writeHasSettled =
            now > modifiedAt + ALLOC_RELOAD_WAIT_MS;

        if (changedSinceReload && writeHasSettled) {
          try {
            reloadAllocations();
          } catch (Exception failure) {
            if (!lastReloadAttemptFailed) {
              LOG.error("Failed to reload fair scheduler config file - " +
                  "will use existing allocations.", failure);
            }
            lastReloadAttemptFailed = true;
          }
          return;
        }

        if (modifiedAt == 0L) {
          if (!lastReloadAttemptFailed) {
            LOG.warn("Failed to reload fair scheduler config file because" +
                " last modified returned 0. File exists: "
                + allocFile.exists());
          }
          lastReloadAttemptFailed = true;
        }
      }

      /** Sleeps for the polling interval; an interrupt only ends the sleep. */
      private void pauseUntilNextCheck() {
        try {
          Thread.sleep(reloadIntervalMs);
        } catch (InterruptedException interrupted) {
          LOG.info(
              "Interrupted while waiting to reload alloc configuration");
        }
      }
    };
    reloadThread.setName("AllocationFileReloader");
    reloadThread.setDaemon(true);
  }
  super.serviceInit(conf);
}
/**
 * Starts the reload thread, if one was created during initialization, and
 * then delegates to the superclass start logic.
 *
 * @throws Exception if the superclass start logic fails
 */
@Override
public void serviceStart() throws Exception {
  final Thread reloader = reloadThread;
  if (reloader != null) {
    reloader.start();
  }
  super.serviceStart();
}
/**
 * Signals the reload thread to stop, interrupts it so that a pending sleep
 * ends immediately, and waits up to {@link #THREAD_JOIN_TIMEOUT_MS} for it to
 * terminate before delegating to the superclass stop logic.
 *
 * <p>If the calling thread is interrupted while waiting, the wait is
 * abandoned and the caller's interrupt status is restored so that code
 * further up the stack can still observe the interruption.
 *
 * @throws Exception if the superclass stop logic fails
 */
@Override
public void serviceStop() throws Exception {
  running = false;
  if (reloadThread != null) {
    reloadThread.interrupt();
    try {
      reloadThread.join(THREAD_JOIN_TIMEOUT_MS);
    } catch (InterruptedException e) {
      LOG.warn("reloadThread fails to join.");
      // join() cleared the interrupt flag when it threw; put it back rather
      // than silently swallowing the caller's interruption.
      Thread.currentThread().interrupt();
    }
  }
  super.serviceStop();
}
/**
 * Path to XML file containing allocations. If the
 * path is relative, it is searched for in the
 * classpath, but loaded like a regular File.
 *
 * @param conf configuration holding the allocation file setting
 * @return the allocation file, or {@code null} if a relative path could not
 *         be found on the classpath
 * @throws RuntimeException if the classpath resource is not a {@code file:}
 *         URL, i.e. it does not live on the local filesystem
 */
public File getAllocationFile(Configuration conf) {
  String allocFilePath = conf.get(FairSchedulerConfiguration.ALLOCATION_FILE,
      FairSchedulerConfiguration.DEFAULT_ALLOCATION_FILE);
  File configuredFile = new File(allocFilePath);
  if (configuredFile.isAbsolute()) {
    return configuredFile;
  }

  URL url = Thread.currentThread().getContextClassLoader()
      .getResource(allocFilePath);
  if (url == null) {
    LOG.warn(allocFilePath + " not found on the classpath.");
    return null;
  }
  if (!url.getProtocol().equalsIgnoreCase("file")) {
    throw new RuntimeException("Allocation file " + url
        + " found on the classpath is not on the local filesystem.");
  }

  try {
    // URL.getPath() keeps percent-escapes (e.g. "%20" for a space), which
    // would yield a File that does not exist. Going through a URI decodes
    // them.
    return new File(url.toURI());
  } catch (java.net.URISyntaxException | IllegalArgumentException e) {
    // The URL is not a well-formed hierarchical file URI (for example it
    // contains raw spaces or an authority part); keep the raw-path behavior.
    return new File(url.getPath());
  }
}
/**
* Sets the listener that receives the parsed configuration at the end of
* each successful {@link #reloadAllocations()} call. Synchronized on this
* service, as is {@link #reloadAllocations()}.
*
* @param reloadListener listener to notify on reload
*/
public synchronized void setReloadListener(Listener reloadListener) {
this.reloadListener = reloadListener;
}
/**
 * Updates the allocation list from the allocation config file. This file is
 * expected to be in the XML format specified in the design doc.
 *
 * <p>Everything is parsed into temporary collections first. Only when the
 * whole file has been processed successfully is the reload time recorded and
 * the resulting {@link AllocationConfiguration} handed to the reload
 * listener. The method does nothing when no allocation file is configured.
 *
 * @throws IOException if the config file cannot be read.
 * @throws AllocationConfigurationException if allocations are invalid,
 *         including a top-level setting element that has no text value.
 * @throws ParserConfigurationException if XML parser is misconfigured.
 * @throws SAXException if config file is malformed.
 */
public synchronized void reloadAllocations() throws IOException,
    ParserConfigurationException, SAXException,
    AllocationConfigurationException {
  if (allocFile == null) {
    return;
  }
  LOG.info("Loading allocation file " + allocFile);

  // Create some temporary hashmaps to hold the new allocs, and we only save
  // them in our fields if we have parsed the entire allocs file successfully.
  Map<String, Resource> minQueueResources = new HashMap<>();
  Map<String, Resource> maxQueueResources = new HashMap<>();
  Map<String, Resource> maxChildQueueResources = new HashMap<>();
  Map<String, Integer> queueMaxApps = new HashMap<>();
  Map<String, Integer> userMaxApps = new HashMap<>();
  Map<String, Float> queueMaxAMShares = new HashMap<>();
  Map<String, ResourceWeights> queueWeights = new HashMap<>();
  Map<String, SchedulingPolicy> queuePolicies = new HashMap<>();
  Map<String, Long> minSharePreemptionTimeouts = new HashMap<>();
  Map<String, Long> fairSharePreemptionTimeouts = new HashMap<>();
  Map<String, Float> fairSharePreemptionThresholds = new HashMap<>();
  Map<String, Map<QueueACL, AccessControlList>> queueAcls = new HashMap<>();
  Map<String, Map<ReservationACL, AccessControlList>> reservationAcls =
      new HashMap<>();
  Set<String> reservableQueues = new HashSet<>();
  Set<String> nonPreemptableQueues = new HashSet<>();

  // Defaults that apply when the file does not override them.
  int userMaxAppsDefault = Integer.MAX_VALUE;
  int queueMaxAppsDefault = Integer.MAX_VALUE;
  Resource queueMaxResourcesDefault = Resources.unbounded();
  float queueMaxAMShareDefault = 0.5f;
  long defaultFairSharePreemptionTimeout = Long.MAX_VALUE;
  long defaultMinSharePreemptionTimeout = Long.MAX_VALUE;
  float defaultFairSharePreemptionThreshold = 0.5f;
  SchedulingPolicy defaultSchedPolicy = SchedulingPolicy.DEFAULT_POLICY;

  // Reservation global configuration knobs
  String planner = null;
  String reservationAgent = null;
  String reservationAdmissionPolicy = null;

  QueuePlacementPolicy newPlacementPolicy = null;

  // Remember all queue names so we can display them on web UI, etc.
  // configuredQueues is segregated based on whether it is a leaf queue
  // or a parent queue. This information is used for creating queues
  // and also for making queue placement decisions(QueuePlacementRule.java).
  Map<FSQueueType, Set<String>> configuredQueues = new HashMap<>();
  for (FSQueueType queueType : FSQueueType.values()) {
    configuredQueues.put(queueType, new HashSet<String>());
  }

  // Read and parse the allocations file.
  DocumentBuilderFactory docBuilderFactory =
      DocumentBuilderFactory.newInstance();
  docBuilderFactory.setIgnoringComments(true);
  DocumentBuilder builder = docBuilderFactory.newDocumentBuilder();
  Document doc = builder.parse(allocFile);
  Element root = doc.getDocumentElement();
  if (!"allocations".equals(root.getTagName())) {
    throw new AllocationConfigurationException("Bad fair scheduler config " +
        "file: top-level element not <allocations>");
  }

  // First pass over the top-level elements: global settings are applied
  // immediately, while queue and placement-policy elements are only
  // collected, because they are processed after all defaults are known.
  NodeList elements = root.getChildNodes();
  List<Element> queueElements = new ArrayList<Element>();
  Element placementPolicyElement = null;
  for (int i = 0; i < elements.getLength(); i++) {
    Node node = elements.item(i);
    if (!(node instanceof Element)) {
      continue;
    }
    Element element = (Element) node;
    String tagName = element.getTagName();
    if ("queue".equals(tagName) || "pool".equals(tagName)) {
      queueElements.add(element);
    } else if ("user".equals(tagName)) {
      String userName = element.getAttribute("name");
      NodeList fields = element.getChildNodes();
      for (int j = 0; j < fields.getLength(); j++) {
        Node fieldNode = fields.item(j);
        if (!(fieldNode instanceof Element)) {
          continue;
        }
        Element field = (Element) fieldNode;
        if ("maxRunningApps".equals(field.getTagName())) {
          userMaxApps.put(userName,
              Integer.parseInt(requireTrimmedText(field)));
        }
      }
    } else if ("queueMaxResourcesDefault".equals(tagName)) {
      queueMaxResourcesDefault =
          FairSchedulerConfiguration.parseResourceConfigValue(
              requireTrimmedText(element));
    } else if ("userMaxAppsDefault".equals(tagName)) {
      userMaxAppsDefault = Integer.parseInt(requireTrimmedText(element));
    } else if ("defaultFairSharePreemptionTimeout".equals(tagName)) {
      // Configured in seconds, stored in milliseconds.
      defaultFairSharePreemptionTimeout =
          Long.parseLong(requireTrimmedText(element)) * 1000L;
    } else if ("fairSharePreemptionTimeout".equals(tagName)) {
      // Alternative spelling of the default timeout; it is only honored
      // while the default still has its initial value.
      if (defaultFairSharePreemptionTimeout == Long.MAX_VALUE) {
        defaultFairSharePreemptionTimeout =
            Long.parseLong(requireTrimmedText(element)) * 1000L;
      }
    } else if ("defaultMinSharePreemptionTimeout".equals(tagName)) {
      defaultMinSharePreemptionTimeout =
          Long.parseLong(requireTrimmedText(element)) * 1000L;
    } else if ("defaultFairSharePreemptionThreshold".equals(tagName)) {
      float threshold = Float.parseFloat(requireTrimmedText(element));
      // Clamp into [0, 1].
      defaultFairSharePreemptionThreshold =
          Math.max(Math.min(threshold, 1.0f), 0.0f);
    } else if ("queueMaxAppsDefault".equals(tagName)) {
      queueMaxAppsDefault = Integer.parseInt(requireTrimmedText(element));
    } else if ("queueMaxAMShareDefault".equals(tagName)) {
      // Only capped from above; no lower bound is applied here.
      queueMaxAMShareDefault =
          Math.min(Float.parseFloat(requireTrimmedText(element)), 1.0f);
    } else if ("defaultQueueSchedulingPolicy".equals(tagName)
        || "defaultQueueSchedulingMode".equals(tagName)) {
      String text = requireTrimmedText(element);
      if (text.equalsIgnoreCase(FifoPolicy.NAME)) {
        throw new AllocationConfigurationException("Bad fair scheduler "
            + "config file: defaultQueueSchedulingPolicy or "
            + "defaultQueueSchedulingMode can't be FIFO.");
      }
      defaultSchedPolicy = SchedulingPolicy.parse(text);
    } else if ("queuePlacementPolicy".equals(tagName)) {
      placementPolicyElement = element;
    } else if ("reservation-planner".equals(tagName)) {
      planner = requireTrimmedText(element);
    } else if ("reservation-agent".equals(tagName)) {
      reservationAgent = requireTrimmedText(element);
    } else if ("reservation-policy".equals(tagName)) {
      reservationAdmissionPolicy = requireTrimmedText(element);
    } else {
      LOG.warn("Bad element in allocations file: " + tagName);
    }
  }

  // Load queue elements. A root queue can either be included or omitted. If
  // it's included, all other queues must be inside it.
  for (Element element : queueElements) {
    String parent = "root";
    if (element.getAttribute("name").equalsIgnoreCase("root")) {
      if (queueElements.size() > 1) {
        throw new AllocationConfigurationException("If configuring root queue,"
            + " no other queues can be placed alongside it.");
      }
      parent = null;
    }
    loadQueue(parent, element, minQueueResources, maxQueueResources,
        maxChildQueueResources, queueMaxApps, userMaxApps, queueMaxAMShares,
        queueWeights, queuePolicies, minSharePreemptionTimeouts,
        fairSharePreemptionTimeouts, fairSharePreemptionThresholds, queueAcls,
        reservationAcls, configuredQueues, reservableQueues,
        nonPreemptableQueues);
  }

  // Load placement policy and pass it configured queues
  Configuration conf = getConfig();
  if (placementPolicyElement != null) {
    newPlacementPolicy = QueuePlacementPolicy.fromXml(placementPolicyElement,
        configuredQueues, conf);
  } else {
    newPlacementPolicy = QueuePlacementPolicy.fromConfiguration(conf,
        configuredQueues);
  }

  // Set the min/fair share preemption timeout for the root queue
  if (!minSharePreemptionTimeouts.containsKey(QueueManager.ROOT_QUEUE)) {
    minSharePreemptionTimeouts.put(QueueManager.ROOT_QUEUE,
        defaultMinSharePreemptionTimeout);
  }
  if (!fairSharePreemptionTimeouts.containsKey(QueueManager.ROOT_QUEUE)) {
    fairSharePreemptionTimeouts.put(QueueManager.ROOT_QUEUE,
        defaultFairSharePreemptionTimeout);
  }

  // Set the fair share preemption threshold for the root queue
  if (!fairSharePreemptionThresholds.containsKey(QueueManager.ROOT_QUEUE)) {
    fairSharePreemptionThresholds.put(QueueManager.ROOT_QUEUE,
        defaultFairSharePreemptionThreshold);
  }

  ReservationQueueConfiguration globalReservationQueueConfig = new
      ReservationQueueConfiguration();
  if (planner != null) {
    globalReservationQueueConfig.setPlanner(planner);
  }
  if (reservationAdmissionPolicy != null) {
    globalReservationQueueConfig.setReservationAdmissionPolicy
        (reservationAdmissionPolicy);
  }
  if (reservationAgent != null) {
    globalReservationQueueConfig.setReservationAgent(reservationAgent);
  }

  AllocationConfiguration info =
      new AllocationConfiguration(minQueueResources, maxQueueResources,
          maxChildQueueResources, queueMaxApps, userMaxApps, queueWeights,
          queueMaxAMShares, userMaxAppsDefault, queueMaxAppsDefault,
          queueMaxResourcesDefault, queueMaxAMShareDefault, queuePolicies,
          defaultSchedPolicy, minSharePreemptionTimeouts,
          fairSharePreemptionTimeouts, fairSharePreemptionThresholds, queueAcls,
          reservationAcls, newPlacementPolicy, configuredQueues,
          globalReservationQueueConfig, reservableQueues, nonPreemptableQueues);

  lastSuccessfulReload = clock.getTime();
  lastReloadAttemptFailed = false;

  reloadListener.onReload(info);
}

/**
 * Returns the trimmed text held by the first child node of a setting
 * element.
 *
 * @param element element whose value is wanted
 * @return the first child's character data with surrounding whitespace
 *         removed
 * @throws AllocationConfigurationException if the element is empty or its
 *         first child is not a text node, so that a broken file is reported
 *         by element name instead of a bare NullPointerException or
 *         ClassCastException
 */
private static String requireTrimmedText(Element element)
    throws AllocationConfigurationException {
  Node firstChild = element.getFirstChild();
  if (!(firstChild instanceof Text)) {
    throw new AllocationConfigurationException("Bad fair scheduler config "
        + "file: element <" + element.getTagName()
        + "> must contain a text value.");
  }
  return ((Text) firstChild).getData().trim();
}
/**
 * Loads a queue from a queue element in the configuration file, recursing
 * into nested {@code <queue>} / {@code <pool>} elements.
 *
 * <p>The queue's full name is {@code parentName + "." + name}, or just the
 * name when {@code parentName} is {@code null}. Each recognized child
 * element is stored in the matching output collection under that full name;
 * unrecognized child elements are skipped. A queue is registered as a parent
 * when it has nested queues, a {@code <reservation>} element, or the
 * attribute {@code type="parent"}; otherwise it is registered as a leaf.
 *
 * @param parentName full name of the enclosing queue, or {@code null} if
 *        this element is the top of the hierarchy
 * @param element the {@code <queue>} or {@code <pool>} element to read
 * @param minQueueResources output: {@code minResources} per queue
 * @param maxQueueResources output: {@code maxResources} per queue
 * @param maxChildQueueResources output: {@code maxChildResources} per queue
 * @param queueMaxApps output: {@code maxRunningApps} per queue
 * @param userMaxApps passed through unchanged to nested queues
 * @param queueMaxAMShares output: {@code maxAMShare} per queue, capped at 1
 * @param queueWeights output: {@code weight} per queue
 * @param queuePolicies output: scheduling policy per queue
 * @param minSharePreemptionTimeouts output: timeout per queue, in ms
 * @param fairSharePreemptionTimeouts output: timeout per queue, in ms
 * @param fairSharePreemptionThresholds output: threshold per queue, clamped
 *        to [0, 1]
 * @param queueAcls output: queue ACLs per queue
 * @param resAcls output: reservation ACLs per queue
 * @param configuredQueues output: queue names grouped by queue type
 * @param reservableQueues output: queues containing {@code <reservation>}
 * @param nonPreemptableQueues output: queues whose
 *        {@code allowPreemptionFrom} is not {@code true}
 * @throws AllocationConfigurationException if the queue name is empty or
 *         contains a period, or if the queue combines {@code <reservation>}
 *         with {@code type="parent"}
 */
private void loadQueue(String parentName, Element element,
    Map<String, Resource> minQueueResources,
    Map<String, Resource> maxQueueResources,
    Map<String, Resource> maxChildQueueResources,
    Map<String, Integer> queueMaxApps,
    Map<String, Integer> userMaxApps,
    Map<String, Float> queueMaxAMShares,
    Map<String, ResourceWeights> queueWeights,
    Map<String, SchedulingPolicy> queuePolicies,
    Map<String, Long> minSharePreemptionTimeouts,
    Map<String, Long> fairSharePreemptionTimeouts,
    Map<String, Float> fairSharePreemptionThresholds,
    Map<String, Map<QueueACL, AccessControlList>> queueAcls,
    Map<String, Map<ReservationACL, AccessControlList>> resAcls,
    Map<FSQueueType, Set<String>> configuredQueues,
    Set<String> reservableQueues,
    Set<String> nonPreemptableQueues)
    throws AllocationConfigurationException {
  String queueName = CharMatcher.WHITESPACE.trimFrom(
      element.getAttribute("name"));

  if (queueName.contains(".")) {
    throw new AllocationConfigurationException("Bad fair scheduler config "
        + "file: queue name (" + queueName + ") shouldn't contain period.");
  }

  if (queueName.isEmpty()) {
    throw new AllocationConfigurationException("Bad fair scheduler config "
        + "file: queue name shouldn't be empty or "
        + "consist only of whitespace.");
  }

  if (parentName != null) {
    queueName = parentName + "." + queueName;
  }

  Map<QueueACL, AccessControlList> acls = new HashMap<>();
  Map<ReservationACL, AccessControlList> racls = new HashMap<>();
  NodeList fields = element.getChildNodes();
  boolean isLeaf = true;
  // Tracked separately from isLeaf so that the reservation/type="parent"
  // conflict below is not confused with simply having child queues.
  boolean isReservable = false;

  for (int j = 0; j < fields.getLength(); j++) {
    Node fieldNode = fields.item(j);
    if (!(fieldNode instanceof Element)) {
      continue;
    }
    Element field = (Element) fieldNode;
    String tagName = field.getTagName();
    if ("minResources".equals(tagName)) {
      String text = ((Text)field.getFirstChild()).getData().trim();
      Resource val =
          FairSchedulerConfiguration.parseResourceConfigValue(text);
      minQueueResources.put(queueName, val);
    } else if ("maxResources".equals(tagName)) {
      String text = ((Text)field.getFirstChild()).getData().trim();
      Resource val =
          FairSchedulerConfiguration.parseResourceConfigValue(text);
      maxQueueResources.put(queueName, val);
    } else if ("maxChildResources".equals(tagName)) {
      String text = ((Text)field.getFirstChild()).getData().trim();
      Resource val =
          FairSchedulerConfiguration.parseResourceConfigValue(text);
      maxChildQueueResources.put(queueName, val);
    } else if ("maxRunningApps".equals(tagName)) {
      String text = ((Text)field.getFirstChild()).getData().trim();
      int val = Integer.parseInt(text);
      queueMaxApps.put(queueName, val);
    } else if ("maxAMShare".equals(tagName)) {
      String text = ((Text)field.getFirstChild()).getData().trim();
      float val = Float.parseFloat(text);
      val = Math.min(val, 1.0f);
      queueMaxAMShares.put(queueName, val);
    } else if ("weight".equals(tagName)) {
      String text = ((Text)field.getFirstChild()).getData().trim();
      double val = Double.parseDouble(text);
      queueWeights.put(queueName, new ResourceWeights((float)val));
    } else if ("minSharePreemptionTimeout".equals(tagName)) {
      // Configured in seconds, stored in milliseconds.
      String text = ((Text)field.getFirstChild()).getData().trim();
      long val = Long.parseLong(text) * 1000L;
      minSharePreemptionTimeouts.put(queueName, val);
    } else if ("fairSharePreemptionTimeout".equals(tagName)) {
      String text = ((Text)field.getFirstChild()).getData().trim();
      long val = Long.parseLong(text) * 1000L;
      fairSharePreemptionTimeouts.put(queueName, val);
    } else if ("fairSharePreemptionThreshold".equals(tagName)) {
      String text = ((Text)field.getFirstChild()).getData().trim();
      float val = Float.parseFloat(text);
      val = Math.max(Math.min(val, 1.0f), 0.0f);
      fairSharePreemptionThresholds.put(queueName, val);
    } else if ("schedulingPolicy".equals(tagName)
        || "schedulingMode".equals(tagName)) {
      String text = ((Text)field.getFirstChild()).getData().trim();
      SchedulingPolicy policy = SchedulingPolicy.parse(text);
      queuePolicies.put(queueName, policy);
    } else if ("aclSubmitApps".equals(tagName)) {
      // ACL strings are passed on untrimmed.
      String text = ((Text)field.getFirstChild()).getData();
      acls.put(QueueACL.SUBMIT_APPLICATIONS, new AccessControlList(text));
    } else if ("aclAdministerApps".equals(tagName)) {
      String text = ((Text)field.getFirstChild()).getData();
      acls.put(QueueACL.ADMINISTER_QUEUE, new AccessControlList(text));
    } else if ("aclAdministerReservations".equals(tagName)) {
      String text = ((Text)field.getFirstChild()).getData();
      racls.put(ReservationACL.ADMINISTER_RESERVATIONS,
          new AccessControlList(text));
    } else if ("aclListReservations".equals(tagName)) {
      String text = ((Text)field.getFirstChild()).getData();
      racls.put(ReservationACL.LIST_RESERVATIONS, new AccessControlList(
          text));
    } else if ("aclSubmitReservations".equals(tagName)) {
      String text = ((Text)field.getFirstChild()).getData();
      racls.put(ReservationACL.SUBMIT_RESERVATIONS,
          new AccessControlList(text));
    } else if ("reservation".equals(tagName)) {
      isLeaf = false;
      isReservable = true;
      reservableQueues.add(queueName);
      configuredQueues.get(FSQueueType.PARENT).add(queueName);
    } else if ("allowPreemptionFrom".equals(tagName)) {
      String text = ((Text)field.getFirstChild()).getData().trim();
      if (!Boolean.parseBoolean(text)) {
        nonPreemptableQueues.add(queueName);
      }
    } else if ("queue".equals(tagName) || "pool".equals(tagName)) {
      // Exact match on purpose: a suffix test would also accept tags such
      // as <ue> or <e> and treat them as nested queues.
      loadQueue(queueName, field, minQueueResources, maxQueueResources,
          maxChildQueueResources, queueMaxApps, userMaxApps, queueMaxAMShares,
          queueWeights, queuePolicies, minSharePreemptionTimeouts,
          fairSharePreemptionTimeouts, fairSharePreemptionThresholds,
          queueAcls, resAcls, configuredQueues, reservableQueues,
          nonPreemptableQueues);
      isLeaf = false;
    }
  }

  boolean declaredParent = "parent".equals(element.getAttribute("type"));
  if (isLeaf) {
    // if a leaf in the alloc file is marked as type='parent'
    // then store it under 'parent'
    if (declaredParent) {
      configuredQueues.get(FSQueueType.PARENT).add(queueName);
    } else {
      configuredQueues.get(FSQueueType.LEAF).add(queueName);
    }
  } else {
    // Only the combination named in the message is rejected; a queue that
    // has child queues may redundantly carry type="parent".
    if (isReservable && declaredParent) {
      throw new AllocationConfigurationException("Both <reservation> and " +
          "type=\"parent\" found for queue " + queueName + " which is " +
          "unsupported");
    }
    configuredQueues.get(FSQueueType.PARENT).add(queueName);
  }

  queueAcls.put(queueName, acls);
  resAcls.put(queueName, racls);

  // A min larger than max is only reported, not rejected.
  if (maxQueueResources.containsKey(queueName) &&
      minQueueResources.containsKey(queueName)
      && !Resources.fitsIn(minQueueResources.get(queueName),
          maxQueueResources.get(queueName))) {
    LOG.warn(
        String.format("Queue %s has max resources %s less than "
            + "min resources %s", queueName, maxQueueResources.get(queueName),
            minQueueResources.get(queueName)));
  }
}
/**
 * Callback through which the loader service delivers a parsed allocation
 * configuration.
 */
public interface Listener {
  /**
   * Called at the end of a successful reload of the allocation file.
   *
   * @param info the configuration built from the allocation file
   */
  void onReload(AllocationConfiguration info);
}
}