blob: 97d9933109a0599e429c8ac6112facfa939c8b25 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf;
import java.io.IOException;
import java.io.InputStream;
import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import com.google.common.annotations.VisibleForTesting;
import com.google.gson.GsonBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.records.Version;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
/**
* A filesystem implementation of {@link YarnConfigurationStore}. Offer
* configuration storage in FileSystem
*/
public class FSSchedulerConfigurationStore extends YarnConfigurationStore {
public static final Logger LOG = LoggerFactory.getLogger(
FSSchedulerConfigurationStore.class);
@VisibleForTesting
protected static final Version CURRENT_VERSION_INFO
= Version.newInstance(0, 1);
private static final String TMP = ".tmp";
private int maxVersion;
private Path schedulerConfDir;
private FileSystem fileSystem;
private PathFilter configFilePathFilter;
private volatile Configuration schedConf;
private volatile Configuration oldConf;
private Path tempConfigPath;
private Path configVersionFile;
@Override
public void initialize(Configuration fsConf, Configuration vSchedConf,
RMContext rmContext) throws Exception {
this.configFilePathFilter = new PathFilter() {
@Override
public boolean accept(Path path) {
if (path == null) {
return false;
}
String pathName = path.getName();
return pathName.startsWith(YarnConfiguration.CS_CONFIGURATION_FILE)
&& !pathName.endsWith(TMP);
}
};
Configuration conf = new Configuration(fsConf);
String schedulerConfPathStr = conf.get(
YarnConfiguration.SCHEDULER_CONFIGURATION_FS_PATH);
if (schedulerConfPathStr == null || schedulerConfPathStr.isEmpty()) {
throw new IOException(
YarnConfiguration.SCHEDULER_CONFIGURATION_FS_PATH
+ " must be set");
}
this.schedulerConfDir = new Path(schedulerConfPathStr);
String scheme = schedulerConfDir.toUri().getScheme();
if (scheme == null) {
scheme = FileSystem.getDefaultUri(conf).getScheme();
}
if (scheme != null) {
String disableCacheName = String.format("fs.%s.impl.disable.cache",
scheme);
conf.setBoolean(disableCacheName, true);
}
this.fileSystem = this.schedulerConfDir.getFileSystem(conf);
this.maxVersion = conf.getInt(
YarnConfiguration.SCHEDULER_CONFIGURATION_FS_MAX_VERSION,
YarnConfiguration.DEFAULT_SCHEDULER_CONFIGURATION_FS_MAX_VERSION);
LOG.info("schedulerConfDir=" + schedulerConfPathStr);
LOG.info("capacity scheduler file max version = " + maxVersion);
if (!fileSystem.exists(schedulerConfDir)) {
if (!fileSystem.mkdirs(schedulerConfDir)) {
throw new IOException("mkdir " + schedulerConfPathStr + " failed");
}
}
this.configVersionFile = new Path(schedulerConfPathStr, "ConfigVersion");
if (!fileSystem.exists(configVersionFile)) {
fileSystem.createNewFile(configVersionFile);
writeConfigVersion(0L);
}
// create capacity-schedule.xml.ts file if not existing
if (this.getConfigFileInputStream() == null) {
writeConfigurationToFileSystem(vSchedConf);
long configVersion = getConfigVersion() + 1L;
writeConfigVersion(configVersion);
}
this.schedConf = this.getConfigurationFromFileSystem();
}
/**
* Update and persist latest configuration in temp file.
* @param logMutation configuration change to be persisted in write ahead log
* @throws IOException throw IOE when write temp configuration file fail
*/
@Override
public void logMutation(LogMutation logMutation) throws IOException {
LOG.info(new GsonBuilder().serializeNulls().create().toJson(logMutation));
oldConf = new Configuration(schedConf);
Map<String, String> mutations = logMutation.getUpdates();
for (Map.Entry<String, String> kv : mutations.entrySet()) {
if (kv.getValue() == null) {
this.schedConf.unset(kv.getKey());
} else {
this.schedConf.set(kv.getKey(), kv.getValue());
}
}
tempConfigPath = writeTmpConfig(schedConf);
}
/**
* @param pendingMutation the log mutation to apply
* @param isValid if true, finalize temp configuration file
* if false, remove temp configuration file and rollback
* @throws Exception throw IOE when write temp configuration file fail
*/
@Override
public void confirmMutation(LogMutation pendingMutation,
boolean isValid) throws Exception {
if (pendingMutation == null || tempConfigPath == null) {
LOG.warn("pendingMutation or tempConfigPath is null, do nothing");
return;
}
if (isValid) {
finalizeFileSystemFile();
long configVersion = getConfigVersion() + 1L;
writeConfigVersion(configVersion);
} else {
schedConf = oldConf;
removeTmpConfigFile();
}
tempConfigPath = null;
}
private void finalizeFileSystemFile() throws IOException {
// call confirmMutation() make sure tempConfigPath is not null
Path finalConfigPath = getFinalConfigPath(tempConfigPath);
fileSystem.rename(tempConfigPath, finalConfigPath);
LOG.info("finalize temp configuration file successfully, finalConfigPath="
+ finalConfigPath);
}
@Override
public void format() throws Exception {
FileStatus[] fileStatuses = fileSystem.listStatus(this.schedulerConfDir,
this.configFilePathFilter);
if (fileStatuses == null) {
return;
}
for (int i = 0; i < fileStatuses.length; i++) {
fileSystem.delete(fileStatuses[i].getPath(), false);
LOG.info("delete config file " + fileStatuses[i].getPath());
}
}
private Path getFinalConfigPath(Path tempPath) {
String tempConfigPathStr = tempPath.getName();
if (!tempConfigPathStr.endsWith(TMP)) {
LOG.warn(tempPath + " does not end with '"
+ TMP + "' return null");
return null;
}
String finalConfigPathStr = tempConfigPathStr.substring(0,
(tempConfigPathStr.length() - TMP.length()));
return new Path(tempPath.getParent(), finalConfigPathStr);
}
private void removeTmpConfigFile() throws IOException {
// call confirmMutation() make sure tempConfigPath is not null
fileSystem.delete(tempConfigPath, true);
LOG.info("delete temp configuration file: " + tempConfigPath);
}
private Configuration getConfigurationFromFileSystem() throws IOException {
long start = Time.monotonicNow();
Configuration conf = new Configuration(false);
InputStream configInputStream = getConfigFileInputStream();
if (configInputStream == null) {
throw new IOException(
"no capacity scheduler file in " + this.schedulerConfDir);
}
conf.addResource(configInputStream);
Configuration result = new Configuration(false);
for (Map.Entry<String, String> entry : conf) {
result.set(entry.getKey(), entry.getValue());
}
LOG.info("upload conf from fileSystem took "
+ (Time.monotonicNow() - start) + " ms");
//for ha transition, local schedConf may be old one.
this.schedConf = result;
return result;
}
private InputStream getConfigFileInputStream() throws IOException {
Path lastestConfigPath = getLatestConfigPath();
if (lastestConfigPath == null) {
return null;
}
return fileSystem.open(lastestConfigPath);
}
private Path getLatestConfigPath() throws IOException {
FileStatus[] fileStatuses = fileSystem.listStatus(this.schedulerConfDir,
this.configFilePathFilter);
if (fileStatuses == null || fileStatuses.length == 0) {
return null;
}
Arrays.sort(fileStatuses);
return fileStatuses[fileStatuses.length - 1].getPath();
}
private void writeConfigVersion(long configVersion) throws IOException {
try (FSDataOutputStream out = fileSystem.create(configVersionFile, true)) {
out.writeLong(configVersion);
} catch (IOException e) {
LOG.info("Failed to write config version at {}", configVersionFile, e);
throw e;
}
}
@Override
public long getConfigVersion() throws Exception {
try (FSDataInputStream in = fileSystem.open(configVersionFile)) {
return in.readLong();
} catch (IOException e) {
LOG.info("Failed to read config version at {}", configVersionFile, e);
throw e;
}
}
@VisibleForTesting
private Path writeTmpConfig(Configuration vSchedConf) throws IOException {
long start = Time.monotonicNow();
String tempSchedulerConfigFile = YarnConfiguration.CS_CONFIGURATION_FILE
+ "." + System.currentTimeMillis() + TMP;
Path tempSchedulerConfigPath = new Path(this.schedulerConfDir,
tempSchedulerConfigFile);
try (FSDataOutputStream outputStream = fileSystem.create(
tempSchedulerConfigPath)) {
//clean configuration file when num exceed maxVersion
cleanConfigurationFile();
vSchedConf.writeXml(outputStream);
LOG.info(
"write temp capacity configuration successfully, schedulerConfigFile="
+ tempSchedulerConfigPath);
} catch (IOException e) {
LOG.info("write temp capacity configuration fail, schedulerConfigFile="
+ tempSchedulerConfigPath, e);
throw e;
}
LOG.info("write temp configuration to fileSystem took "
+ (Time.monotonicNow() - start) + " ms");
return tempSchedulerConfigPath;
}
@VisibleForTesting
void writeConfigurationToFileSystem(Configuration vSchedConf)
throws IOException {
tempConfigPath = writeTmpConfig(vSchedConf);
finalizeFileSystemFile();
}
private void cleanConfigurationFile() throws IOException {
FileStatus[] fileStatuses = fileSystem.listStatus(this.schedulerConfDir,
this.configFilePathFilter);
if (fileStatuses == null || fileStatuses.length <= this.maxVersion) {
return;
}
Arrays.sort(fileStatuses);
int configFileNum = fileStatuses.length;
if (fileStatuses.length > this.maxVersion) {
for (int i = 0; i < configFileNum - this.maxVersion; i++) {
fileSystem.delete(fileStatuses[i].getPath(), false);
LOG.info("delete config file " + fileStatuses[i].getPath());
}
}
}
@Override
public Configuration retrieve() throws IOException {
return getConfigurationFromFileSystem();
}
@Override
public List<LogMutation> getConfirmedConfHistory(long fromId) {
// Unimplemented.
return null;
}
@Override
protected LinkedList<LogMutation> getLogs() {
// Unimplemented.
return null;
}
@Override
protected Version getConfStoreVersion() throws Exception {
return null;
}
@Override
protected void storeVersion() throws Exception {
}
@Override
protected Version getCurrentVersion() {
return CURRENT_VERSION_INFO;
}
public void close() throws IOException {
if (fileSystem != null) {
fileSystem.close();
}
}
}