blob: d5d4d908152ff73381b8124360062d4ac6b663e2 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.converter;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.PREFIX;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.MAPPING_RULE_FORMAT;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.MAPPING_RULE_JSON;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.MAPPING_RULE_FORMAT_JSON;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.converter.FSQueueConverter.QUEUE_MAX_AM_SHARE_DISABLED;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.security.authorize.AccessControlList;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.security.AccessType;
import org.apache.hadoop.yarn.security.ConfiguredYarnAuthorizer;
import org.apache.hadoop.yarn.security.YarnAuthorizationProvider;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl;
import org.apache.hadoop.yarn.server.resourcemanager.placement.PlacementManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.placement.schema.MappingRulesDescription;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.AllocationConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.AllocationConfigurationException;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.ConfigurableResource;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSParentQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairSchedulerConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.DominantResourceFairnessPolicy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.core.util.DefaultPrettyPrinter;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.ObjectWriter;
import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
import com.fasterxml.jackson.core.JsonGenerator;
/**
* Converts Fair Scheduler configuration (site and fair-scheduler.xml)
* to Capacity Scheduler. The mapping is not 100% perfect due to
* feature gaps. These will be addressed in the future.
*/
public class FSConfigToCSConfigConverter {
public static final Logger LOG = LoggerFactory.getLogger(
FSConfigToCSConfigConverter.class.getName());
public static final String MAPPING_RULES_JSON =
"mapping-rules.json";
private static final String YARN_SITE_XML = "yarn-site.xml";
private static final String CAPACITY_SCHEDULER_XML =
"capacity-scheduler.xml";
private static final String FAIR_SCHEDULER_XML =
"fair-scheduler.xml";
private Resource clusterResource;
private boolean preemptionEnabled = false;
private int queueMaxAppsDefault;
private float queueMaxAMShareDefault;
private Map<String, Integer> userMaxApps;
private int userMaxAppsDefault;
private boolean sizeBasedWeight = false;
private ConversionOptions conversionOptions;
private boolean drfUsed = false;
private Configuration convertedYarnSiteConfig;
private CapacitySchedulerConfiguration capacitySchedulerConfig;
private FSConfigToCSConfigRuleHandler ruleHandler;
private QueuePlacementConverter placementConverter;
private OutputStream yarnSiteOutputStream;
private OutputStream capacitySchedulerOutputStream;
private OutputStream mappingRulesOutputStream;
private boolean consoleMode = false;
private boolean convertPlacementRules = true;
private String outputDirectory;
private boolean rulesToFile;
private boolean usePercentages;
private FSConfigToCSConfigConverterParams.
PreemptionMode preemptionMode;
public FSConfigToCSConfigConverter(FSConfigToCSConfigRuleHandler
ruleHandler, ConversionOptions conversionOptions) {
this.ruleHandler = ruleHandler;
this.conversionOptions = conversionOptions;
this.yarnSiteOutputStream = System.out;
this.capacitySchedulerOutputStream = System.out;
this.placementConverter = new QueuePlacementConverter();
}
public void convert(FSConfigToCSConfigConverterParams params)
throws Exception {
validateParams(params);
this.clusterResource = getClusterResource(params);
this.convertPlacementRules = params.isConvertPlacementRules();
this.outputDirectory = params.getOutputDirectory();
this.rulesToFile = params.isPlacementRulesToFile();
this.usePercentages = params.isUsePercentages();
this.preemptionMode = params.getPreemptionMode();
prepareOutputFiles(params.isConsole());
loadConversionRules(params.getConversionRulesConfig());
Configuration inputYarnSiteConfig = getInputYarnSiteConfig(params);
handleFairSchedulerConfig(params, inputYarnSiteConfig);
convert(inputYarnSiteConfig);
}
private void prepareOutputFiles(boolean console)
throws FileNotFoundException {
if (console) {
LOG.info("Console mode is enabled, {}, {} and {} will be only emitted " +
"to the console!",
YARN_SITE_XML, CAPACITY_SCHEDULER_XML, MAPPING_RULES_JSON);
this.consoleMode = true;
return;
}
File yarnSiteXmlOutput = new File(outputDirectory,
YARN_SITE_XML);
File schedulerXmlOutput = new File(outputDirectory,
CAPACITY_SCHEDULER_XML);
LOG.info("Output directory for " + YARN_SITE_XML + " and" +
" " + CAPACITY_SCHEDULER_XML + " is: {}", outputDirectory);
this.yarnSiteOutputStream = new FileOutputStream(yarnSiteXmlOutput);
this.capacitySchedulerOutputStream =
new FileOutputStream(schedulerXmlOutput);
}
private void validateParams(FSConfigToCSConfigConverterParams params) {
if (params.getYarnSiteXmlConfig() == null) {
throw new PreconditionException("" + YARN_SITE_XML + " configuration " +
"is not defined but it is mandatory!");
} else if (params.getOutputDirectory() == null && !params.isConsole()) {
throw new PreconditionException("Output directory configuration " +
"is not defined but it is mandatory!");
}
}
private Resource getClusterResource(
FSConfigToCSConfigConverterParams params) {
Resource resource = null;
if (params.getClusterResource() != null) {
ConfigurableResource configurableResource;
try {
configurableResource = FairSchedulerConfiguration
.parseResourceConfigValue(params.getClusterResource());
} catch (AllocationConfigurationException e) {
throw new ConversionException("Error while parsing resource.", e);
}
resource = configurableResource.getResource();
}
return resource;
}
private void loadConversionRules(String rulesFile) throws IOException {
if (rulesFile != null) {
LOG.info("Reading conversion rules file from: " + rulesFile);
ruleHandler.loadRulesFromFile(rulesFile);
} else {
LOG.info("Conversion rules file is not defined, " +
"using default conversion config!");
}
ruleHandler.initPropertyActions();
}
private Configuration getInputYarnSiteConfig(
FSConfigToCSConfigConverterParams params) {
Configuration conf = new YarnConfiguration();
conf.addResource(new Path(params.getYarnSiteXmlConfig()));
return conf;
}
private void handleFairSchedulerConfig(
FSConfigToCSConfigConverterParams params, Configuration conf) {
String fairSchedulerXmlConfig = params.getFairSchedulerXmlConfig();
// Don't override allocation file in conf yet, as it would ruin the second
// condition here
if (fairSchedulerXmlConfig != null) {
LOG.info("Using explicitly defined " + FAIR_SCHEDULER_XML);
} else if (conf.get(FairSchedulerConfiguration.ALLOCATION_FILE) != null) {
LOG.info("Using " + FAIR_SCHEDULER_XML + " defined in " +
YARN_SITE_XML + " by key: " +
FairSchedulerConfiguration.ALLOCATION_FILE);
} else {
throw new PreconditionException("" + FAIR_SCHEDULER_XML +
" is not defined neither in " + YARN_SITE_XML +
"(with property: " + FairSchedulerConfiguration.ALLOCATION_FILE +
") nor directly with its own parameter!");
}
// We can now safely override allocation file in conf
if (fairSchedulerXmlConfig != null) {
conf.set(FairSchedulerConfiguration.ALLOCATION_FILE,
params.getFairSchedulerXmlConfig());
}
}
@VisibleForTesting
void convert(Configuration inputYarnSiteConfig) throws Exception {
// initialize Fair Scheduler
RMContext ctx = new RMContextImpl();
PlacementManager placementManager = new PlacementManager();
ctx.setQueuePlacementManager(placementManager);
// Prepare a separate config for the FS instance
// to force the use of ConfiguredYarnAuthorizer, otherwise
// it might use that of Ranger
Configuration fsConfig = new Configuration(inputYarnSiteConfig);
fsConfig.setBoolean(FairSchedulerConfiguration.MIGRATION_MODE, true);
fsConfig.setBoolean(FairSchedulerConfiguration.NO_TERMINAL_RULE_CHECK,
conversionOptions.isNoRuleTerminalCheck());
fsConfig.setClass(YarnConfiguration.YARN_AUTHORIZATION_PROVIDER,
ConfiguredYarnAuthorizer.class, YarnAuthorizationProvider.class);
FairScheduler fs = new FairScheduler();
fs.setRMContext(ctx);
fs.init(fsConfig);
drfUsed = isDrfUsed(fs);
AllocationConfiguration allocConf = fs.getAllocationConfiguration();
queueMaxAppsDefault = allocConf.getQueueMaxAppsDefault();
userMaxAppsDefault = allocConf.getUserMaxAppsDefault();
userMaxApps = allocConf.getUserMaxApps();
queueMaxAMShareDefault = allocConf.getQueueMaxAMShareDefault();
convertedYarnSiteConfig = new Configuration(false);
capacitySchedulerConfig =
new CapacitySchedulerConfiguration(new Configuration(false));
convertYarnSiteXml(inputYarnSiteConfig);
convertCapacitySchedulerXml(fs);
if (convertPlacementRules) {
performRuleConversion(fs);
}
if (consoleMode) {
System.out.println("======= " + CAPACITY_SCHEDULER_XML + " =======");
}
capacitySchedulerConfig.writeXml(capacitySchedulerOutputStream);
if (consoleMode) {
System.out.println();
System.out.println("======= " + YARN_SITE_XML + " =======");
}
convertedYarnSiteConfig.writeXml(yarnSiteOutputStream);
}
private void convertYarnSiteXml(Configuration inputYarnSiteConfig) {
FSYarnSiteConverter siteConverter =
new FSYarnSiteConverter();
siteConverter.convertSiteProperties(inputYarnSiteConfig,
convertedYarnSiteConfig, drfUsed,
conversionOptions.isEnableAsyncScheduler(),
usePercentages, preemptionMode);
preemptionEnabled = siteConverter.isPreemptionEnabled();
sizeBasedWeight = siteConverter.isSizeBasedWeight();
checkReservationSystem(inputYarnSiteConfig);
}
private void convertCapacitySchedulerXml(FairScheduler fs) {
FSParentQueue rootQueue = fs.getQueueManager().getRootQueue();
emitDefaultQueueMaxParallelApplications();
emitDefaultUserMaxParallelApplications();
emitUserMaxParallelApplications();
emitDefaultMaxAMShare();
emitDisablePreemptionForObserveOnlyMode();
FSQueueConverter queueConverter = FSQueueConverterBuilder.create()
.withRuleHandler(ruleHandler)
.withCapacitySchedulerConfig(capacitySchedulerConfig)
.withPreemptionEnabled(preemptionEnabled)
.withSizeBasedWeight(sizeBasedWeight)
.withClusterResource(clusterResource)
.withQueueMaxAMShareDefault(queueMaxAMShareDefault)
.withQueueMaxAppsDefault(queueMaxAppsDefault)
.withConversionOptions(conversionOptions)
.withDrfUsed(drfUsed)
.withPercentages(usePercentages)
.build();
queueConverter.convertQueueHierarchy(rootQueue);
emitACLs(fs);
}
private void performRuleConversion(FairScheduler fs)
throws IOException {
LOG.info("Converting placement rules");
PlacementManager placementManager =
fs.getRMContext().getQueuePlacementManager();
if (placementManager.getPlacementRules().size() > 0) {
mappingRulesOutputStream = getOutputStreamForJson();
MappingRulesDescription desc =
placementConverter.convertPlacementPolicy(placementManager,
ruleHandler, capacitySchedulerConfig, usePercentages);
ObjectMapper mapper = new ObjectMapper();
// close output stream if we write to a file, leave it open otherwise
if (!consoleMode && rulesToFile) {
mapper.configure(JsonGenerator.Feature.AUTO_CLOSE_TARGET, true);
} else {
mapper.configure(JsonGenerator.Feature.AUTO_CLOSE_TARGET, false);
}
ObjectWriter writer = mapper.writer(new DefaultPrettyPrinter());
if (consoleMode && rulesToFile) {
System.out.println("======= " + MAPPING_RULES_JSON + " =======");
}
writer.writeValue(mappingRulesOutputStream, desc);
capacitySchedulerConfig.set(MAPPING_RULE_FORMAT,
MAPPING_RULE_FORMAT_JSON);
capacitySchedulerConfig.setOverrideWithQueueMappings(true);
if (!rulesToFile) {
String json =
((ByteArrayOutputStream)mappingRulesOutputStream)
.toString(StandardCharsets.UTF_8.displayName());
capacitySchedulerConfig.set(MAPPING_RULE_JSON, json);
}
} else {
LOG.info("No rules to convert");
}
}
/*
* Console RulesToFile OutputStream
* true true System.out / PrintStream
* true false ByteArrayOutputStream
* false true FileOutputStream
* false false ByteArrayOutputStream
*/
private OutputStream getOutputStreamForJson() throws FileNotFoundException {
if (consoleMode && rulesToFile) {
return System.out;
} else if (rulesToFile) {
File mappingRulesFile = new File(outputDirectory,
MAPPING_RULES_JSON);
return new FileOutputStream(mappingRulesFile);
} else {
return new ByteArrayOutputStream();
}
}
private void emitDefaultQueueMaxParallelApplications() {
if (queueMaxAppsDefault != Integer.MAX_VALUE) {
capacitySchedulerConfig.set(
PREFIX + "max-parallel-apps",
String.valueOf(queueMaxAppsDefault));
}
}
private void emitDefaultUserMaxParallelApplications() {
if (userMaxAppsDefault != Integer.MAX_VALUE) {
capacitySchedulerConfig.set(
PREFIX + "user.max-parallel-apps",
String.valueOf(userMaxAppsDefault));
}
}
private void emitUserMaxParallelApplications() {
userMaxApps
.forEach((user, apps) -> {
capacitySchedulerConfig.setInt(
PREFIX + "user." + user + ".max-parallel-apps", apps);
});
}
private void emitDefaultMaxAMShare() {
if (queueMaxAMShareDefault == QUEUE_MAX_AM_SHARE_DISABLED) {
capacitySchedulerConfig.setFloat(
CapacitySchedulerConfiguration.
MAXIMUM_APPLICATION_MASTERS_RESOURCE_PERCENT,
1.0f);
} else {
capacitySchedulerConfig.setFloat(
CapacitySchedulerConfiguration.
MAXIMUM_APPLICATION_MASTERS_RESOURCE_PERCENT,
queueMaxAMShareDefault);
}
}
private void emitDisablePreemptionForObserveOnlyMode() {
if (preemptionMode == FSConfigToCSConfigConverterParams
.PreemptionMode.OBSERVE_ONLY) {
capacitySchedulerConfig.
setBoolean(CapacitySchedulerConfiguration.
PREEMPTION_OBSERVE_ONLY, true);
}
}
private void emitACLs(FairScheduler fs) {
fs.getAllocationConfiguration().getQueueAcls()
.forEach(this::generateQueueAcl);
}
private void generateQueueAcl(String queue,
Map<AccessType, AccessControlList> access) {
AccessControlList submitAcls = access.get(AccessType.SUBMIT_APP);
AccessControlList adminAcls = access.get(AccessType.ADMINISTER_QUEUE);
if (!submitAcls.getGroups().isEmpty() ||
!submitAcls.getUsers().isEmpty()) {
capacitySchedulerConfig.set(PREFIX + queue + ".acl_submit_applications",
submitAcls.getAclString());
}
if (!adminAcls.getGroups().isEmpty() ||
!adminAcls.getUsers().isEmpty()) {
capacitySchedulerConfig.set(PREFIX + queue + ".acl_administer_queue",
adminAcls.getAclString());
}
}
private void checkReservationSystem(Configuration conf) {
if (conf.getBoolean(YarnConfiguration.RM_RESERVATION_SYSTEM_ENABLE,
YarnConfiguration.DEFAULT_RM_RESERVATION_SYSTEM_ENABLE)) {
ruleHandler.handleReservationSystem();
}
}
private boolean isDrfUsed(FairScheduler fs) {
FSQueue rootQueue = fs.getQueueManager().getRootQueue();
AllocationConfiguration allocConf = fs.getAllocationConfiguration();
String defaultPolicy = allocConf.getDefaultSchedulingPolicy().getName();
return DominantResourceFairnessPolicy.NAME.equals(defaultPolicy) ||
isDrfUsedOnQueueLevel(rootQueue);
}
private boolean isDrfUsedOnQueueLevel(FSQueue queue) {
String policy = queue.getPolicy().getName();
boolean usesDrf = DominantResourceFairnessPolicy.NAME.equals(policy);
if (usesDrf) {
return true;
} else {
List<FSQueue> children = queue.getChildQueues();
if (children != null) {
for (FSQueue child : children) {
usesDrf |= isDrfUsedOnQueueLevel(child);
}
}
return usesDrf;
}
}
@VisibleForTesting
Resource getClusterResource() {
return clusterResource;
}
@VisibleForTesting
void setClusterResource(Resource clusterResource) {
this.clusterResource = clusterResource;
}
@VisibleForTesting
FSConfigToCSConfigRuleHandler getRuleHandler() {
return ruleHandler;
}
@VisibleForTesting
Configuration getYarnSiteConfig() {
return convertedYarnSiteConfig;
}
@VisibleForTesting
Configuration getCapacitySchedulerConfig() {
return capacitySchedulerConfig;
}
@VisibleForTesting
void setConvertPlacementRules(boolean convertPlacementRules) {
this.convertPlacementRules = convertPlacementRules;
}
@VisibleForTesting
void setPlacementConverter(QueuePlacementConverter converter) {
this.placementConverter = converter;
}
@VisibleForTesting
void setConsoleMode(boolean console) {
this.consoleMode = console;
}
}