blob: dc07f8697535ad7c7d836c18de69119fa98adf8c [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.UnsupportedFileSystemException;
import org.apache.hadoop.security.authorize.AccessControlList;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.yarn.api.records.QueueACL;
import org.apache.hadoop.yarn.security.AccessType;
import org.apache.hadoop.yarn.security.Permission;
import org.apache.hadoop.yarn.security.PrivilegedEntity;
import org.apache.hadoop.yarn.security.PrivilegedEntity.EntityType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.allocation.AllocationFileParser;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.allocation.AllocationFileQueueParser;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.allocation.QueueProperties;
import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.SystemClock;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.w3c.dom.NodeList;
import org.xml.sax.SAXException;
import javax.xml.parsers.DocumentBuilder;
import javax.xml.parsers.DocumentBuilderFactory;
import javax.xml.parsers.ParserConfigurationException;
import java.io.IOException;
import java.net.URL;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.allocation.AllocationFileQueueParser.EVERYBODY_ACL;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.allocation.AllocationFileQueueParser.ROOT;
@Public
@Unstable
public class AllocationFileLoaderService extends AbstractService {
public static final Logger LOG = LoggerFactory.getLogger(
AllocationFileLoaderService.class.getName());
/** Time to wait between checks of the allocation file */
public static final long ALLOC_RELOAD_INTERVAL_MS = 10 * 1000;
/**
* Time to wait after the allocation has been modified before reloading it
* (this is done to prevent loading a file that hasn't been fully written).
*/
public static final long ALLOC_RELOAD_WAIT_MS = 5 * 1000;
public static final long THREAD_JOIN_TIMEOUT_MS = 1000;
//Permitted allocation file filesystems (case insensitive)
private static final String SUPPORTED_FS_REGEX =
"(?i)(hdfs)|(file)|(s3a)|(viewfs)";
private final Clock clock;
private final FairScheduler scheduler;
// Last time we successfully reloaded queues
private volatile long lastSuccessfulReload;
private volatile boolean lastReloadAttemptFailed = false;
// Path to XML file containing allocations.
private Path allocFile;
private FileSystem fs;
private Listener reloadListener;
@VisibleForTesting
long reloadIntervalMs = ALLOC_RELOAD_INTERVAL_MS;
private Thread reloadThread;
private volatile boolean running = true;
public AllocationFileLoaderService(FairScheduler scheduler) {
this(SystemClock.getInstance(), scheduler);
}
private List<Permission> defaultPermissions;
AllocationFileLoaderService(Clock clock, FairScheduler scheduler) {
super(AllocationFileLoaderService.class.getName());
this.scheduler = scheduler;
this.clock = clock;
}
@Override
public void serviceInit(Configuration conf) throws Exception {
this.allocFile = getAllocationFile(conf);
if (this.allocFile != null) {
this.fs = allocFile.getFileSystem(conf);
reloadThread = new Thread(() -> {
while (running) {
try {
synchronized (this) {
reloadListener.onCheck();
}
long time = clock.getTime();
long lastModified =
fs.getFileStatus(allocFile).getModificationTime();
if (lastModified > lastSuccessfulReload &&
time > lastModified + ALLOC_RELOAD_WAIT_MS) {
try {
reloadAllocations();
} catch (Exception ex) {
if (!lastReloadAttemptFailed) {
LOG.error("Failed to reload fair scheduler config file - " +
"will use existing allocations.", ex);
}
lastReloadAttemptFailed = true;
}
} else if (lastModified == 0l) {
if (!lastReloadAttemptFailed) {
LOG.warn("Failed to reload fair scheduler config file because" +
" last modified returned 0. File exists: "
+ fs.exists(allocFile));
}
lastReloadAttemptFailed = true;
}
} catch (IOException e) {
LOG.error("Exception while loading allocation file: " + e);
}
try {
Thread.sleep(reloadIntervalMs);
} catch (InterruptedException ex) {
LOG.info(
"Interrupted while waiting to reload alloc configuration");
}
}
});
reloadThread.setName("AllocationFileReloader");
reloadThread.setDaemon(true);
}
super.serviceInit(conf);
}
@Override
public void serviceStart() throws Exception {
if (reloadThread != null) {
reloadThread.start();
}
super.serviceStart();
}
@Override
public void serviceStop() throws Exception {
running = false;
if (reloadThread != null) {
reloadThread.interrupt();
try {
reloadThread.join(THREAD_JOIN_TIMEOUT_MS);
} catch (InterruptedException e) {
LOG.warn("reloadThread fails to join.");
}
}
super.serviceStop();
}
/**
* Path to XML file containing allocations. If the
* path is relative, it is searched for in the
* classpath, but loaded like a regular File.
*/
@VisibleForTesting
public Path getAllocationFile(Configuration conf)
throws UnsupportedFileSystemException {
String allocFilePath = conf.get(FairSchedulerConfiguration.ALLOCATION_FILE,
FairSchedulerConfiguration.DEFAULT_ALLOCATION_FILE);
Path allocPath = new Path(allocFilePath);
String allocPathScheme = allocPath.toUri().getScheme();
if(allocPathScheme != null && !allocPathScheme.matches(SUPPORTED_FS_REGEX)){
throw new UnsupportedFileSystemException("Allocation file "
+ allocFilePath + " uses an unsupported filesystem");
} else if (!allocPath.isAbsolute()) {
URL url = Thread.currentThread().getContextClassLoader()
.getResource(allocFilePath);
if (url == null) {
LOG.warn(allocFilePath + " not found on the classpath.");
allocPath = null;
} else if (!url.getProtocol().equalsIgnoreCase("file")) {
throw new RuntimeException("Allocation file " + url
+ " found on the classpath is not on the local filesystem.");
} else {
allocPath = new Path(url.getProtocol(), null, url.getPath());
}
} else if (allocPath.isAbsoluteAndSchemeAuthorityNull()){
allocPath = new Path("file", null, allocFilePath);
}
return allocPath;
}
public synchronized void setReloadListener(Listener reloadListener) {
this.reloadListener = reloadListener;
}
/**
* Updates the allocation list from the allocation config file. This file is
* expected to be in the XML format specified in the design doc.
*
* @throws IOException if the config file cannot be read.
* @throws AllocationConfigurationException if allocations are invalid.
* @throws ParserConfigurationException if XML parser is misconfigured.
* @throws SAXException if config file is malformed.
*/
public synchronized void reloadAllocations()
throws IOException, ParserConfigurationException, SAXException,
AllocationConfigurationException {
if (allocFile == null) {
reloadListener.onReload(null);
return;
}
LOG.info("Loading allocation file " + allocFile);
// Read and parse the allocations file.
DocumentBuilderFactory docBuilderFactory =
DocumentBuilderFactory.newInstance();
docBuilderFactory.setIgnoringComments(true);
DocumentBuilder builder = docBuilderFactory.newDocumentBuilder();
Document doc = builder.parse(fs.open(allocFile));
Element root = doc.getDocumentElement();
if (!"allocations".equals(root.getTagName())) {
throw new AllocationConfigurationException("Bad fair scheduler config "
+ "file: top-level element not <allocations>");
}
NodeList elements = root.getChildNodes();
AllocationFileParser allocationFileParser =
new AllocationFileParser(elements);
allocationFileParser.parse();
AllocationFileQueueParser queueParser =
new AllocationFileQueueParser(allocationFileParser.getQueueElements());
QueueProperties queueProperties = queueParser.parse();
// Load placement policy
getQueuePlacementPolicy(allocationFileParser);
setupRootQueueProperties(allocationFileParser, queueProperties);
ReservationQueueConfiguration globalReservationQueueConfig =
createReservationQueueConfig(allocationFileParser);
AllocationConfiguration info = new AllocationConfiguration(queueProperties,
allocationFileParser, globalReservationQueueConfig);
lastSuccessfulReload = clock.getTime();
lastReloadAttemptFailed = false;
reloadListener.onReload(info);
}
private void getQueuePlacementPolicy(
AllocationFileParser allocationFileParser)
throws AllocationConfigurationException {
if (allocationFileParser.getQueuePlacementPolicy().isPresent()) {
QueuePlacementPolicy.fromXml(
allocationFileParser.getQueuePlacementPolicy().get(),
scheduler);
} else {
QueuePlacementPolicy.fromConfiguration(scheduler);
}
}
private void setupRootQueueProperties(
AllocationFileParser allocationFileParser,
QueueProperties queueProperties) {
// Set the min/fair share preemption timeout for the root queue
if (!queueProperties.getMinSharePreemptionTimeouts()
.containsKey(QueueManager.ROOT_QUEUE)) {
queueProperties.getMinSharePreemptionTimeouts().put(
QueueManager.ROOT_QUEUE,
allocationFileParser.getDefaultMinSharePreemptionTimeout());
}
if (!queueProperties.getFairSharePreemptionTimeouts()
.containsKey(QueueManager.ROOT_QUEUE)) {
queueProperties.getFairSharePreemptionTimeouts().put(
QueueManager.ROOT_QUEUE,
allocationFileParser.getDefaultFairSharePreemptionTimeout());
}
// Set the fair share preemption threshold for the root queue
if (!queueProperties.getFairSharePreemptionThresholds()
.containsKey(QueueManager.ROOT_QUEUE)) {
queueProperties.getFairSharePreemptionThresholds().put(
QueueManager.ROOT_QUEUE,
allocationFileParser.getDefaultFairSharePreemptionThreshold());
}
}
private ReservationQueueConfiguration createReservationQueueConfig(
AllocationFileParser allocationFileParser) {
ReservationQueueConfiguration globalReservationQueueConfig =
new ReservationQueueConfiguration();
if (allocationFileParser.getReservationPlanner().isPresent()) {
globalReservationQueueConfig
.setPlanner(allocationFileParser.getReservationPlanner().get());
}
if (allocationFileParser.getReservationAdmissionPolicy().isPresent()) {
globalReservationQueueConfig.setReservationAdmissionPolicy(
allocationFileParser.getReservationAdmissionPolicy().get());
}
if (allocationFileParser.getReservationAgent().isPresent()) {
globalReservationQueueConfig.setReservationAgent(
allocationFileParser.getReservationAgent().get());
}
return globalReservationQueueConfig;
}
/**
* Returns the list of default permissions.
* The default permission for the root queue is everybody ("*")
* and the default permission for all other queues is nobody ("").
* The default permission list would be loaded before the permissions
* from allocation file.
* @return default permission list
*/
protected List<Permission> getDefaultPermissions() {
if (defaultPermissions == null) {
defaultPermissions = new ArrayList<>();
Map<AccessType, AccessControlList> acls =
new HashMap<>();
for (QueueACL acl : QueueACL.values()) {
acls.put(SchedulerUtils.toAccessType(acl), EVERYBODY_ACL);
}
defaultPermissions.add(new Permission(
new PrivilegedEntity(EntityType.QUEUE, ROOT), acls));
}
return defaultPermissions;
}
public interface Listener {
void onReload(AllocationConfiguration info) throws IOException;
default void onCheck() {
}
}
}