blob: 18c3fce6c15ce4c5dcdadec0248df432a5e91f34 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p/>
* http://www.apache.org/licenses/LICENSE-2.0
* <p/>
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.myriad.scheduler;
import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.myriad.configuration.MyriadConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Implementation assumes NM binaries already deployed
*/
public class NMExecutorCLGenImpl implements ExecutorCommandLineGenerator {
private static final Logger LOGGER = LoggerFactory.getLogger(NMExecutorCLGenImpl.class);
public static final String ENV_YARN_NODEMANAGER_OPTS = "YARN_NODEMANAGER_OPTS";
public static final String KEY_YARN_NM_CGROUPS_PATH = "yarn.nodemanager.cgroups.path";
public static final String KEY_YARN_RM_HOSTNAME = "yarn.resourcemanager.hostname";
/**
* YARN class to help handle LCE resources
*/
// TODO (mohit): Should it be configurable ?
public static final String KEY_YARN_NM_LCE_CGROUPS_HIERARCHY = "yarn.nodemanager.linux-container-executor.cgroups.hierarchy";
public static final String KEY_YARN_HOME = "yarn.home";
public static final String KEY_NM_RESOURCE_CPU_VCORES = "nodemanager.resource.cpu-vcores";
public static final String KEY_NM_RESOURCE_MEM_MB = "nodemanager.resource.memory-mb";
public static final String YARN_NM_CMD = " $YARN_HOME/bin/yarn nodemanager";
public static final String KEY_NM_ADDRESS = "myriad.yarn.nodemanager.address";
public static final String KEY_NM_LOCALIZER_ADDRESS = "myriad.yarn.nodemanager.localizer.address";
public static final String KEY_NM_WEBAPP_ADDRESS = "myriad.yarn.nodemanager.webapp.address";
public static final String KEY_NM_SHUFFLE_PORT = "myriad.mapreduce.shuffle.port";
private static final String ALL_LOCAL_IPV4ADDR = "0.0.0.0:";
private static final String PROPERTY_FORMAT = "-D%s=%s";
private Map<String, String> environment = new HashMap<>();
protected MyriadConfiguration cfg;
protected YarnConfiguration conf = new YarnConfiguration();
public NMExecutorCLGenImpl(MyriadConfiguration cfg) {
this.cfg = cfg;
}
@Override
public String generateCommandLine(ServiceResourceProfile profile, Ports ports) {
StringBuilder cmdLine = new StringBuilder();
generateEnvironment(profile, (NMPorts) ports);
appendCgroupsCmds(cmdLine);
appendYarnHomeExport(cmdLine);
appendEnvForNM(cmdLine);
cmdLine.append(YARN_NM_CMD);
return cmdLine.toString();
}
protected void generateEnvironment(ServiceResourceProfile profile, NMPorts ports) {
//yarnEnvironemnt configuration from yaml file
Map<String, String> yarnEnvironmentMap = cfg.getYarnEnvironment();
if (yarnEnvironmentMap != null) {
environment.putAll(yarnEnvironmentMap);
}
String rmHostName = System.getProperty(KEY_YARN_RM_HOSTNAME);
if (rmHostName != null && !rmHostName.isEmpty()) {
addYarnNodemanagerOpt(KEY_YARN_RM_HOSTNAME, rmHostName);
}
if (cfg.getNodeManagerConfiguration().getCgroups().or(Boolean.FALSE)) {
addYarnNodemanagerOpt(KEY_YARN_NM_LCE_CGROUPS_HIERARCHY, "mesos/$TASK_DIR");
if (environment.containsKey("YARN_HOME")) {
addYarnNodemanagerOpt(KEY_YARN_HOME, environment.get("YARN_HOME"));
}
}
addYarnNodemanagerOpt(KEY_NM_RESOURCE_CPU_VCORES, Integer.toString(profile.getCpus().intValue()));
addYarnNodemanagerOpt(KEY_NM_RESOURCE_MEM_MB, Integer.toString(profile.getMemory().intValue()));
addYarnNodemanagerOpt(KEY_NM_ADDRESS, ALL_LOCAL_IPV4ADDR + Long.valueOf(ports.getRpcPort()).toString());
addYarnNodemanagerOpt(KEY_NM_LOCALIZER_ADDRESS, ALL_LOCAL_IPV4ADDR + Long.valueOf(ports.getLocalizerPort()).toString());
addYarnNodemanagerOpt(KEY_NM_WEBAPP_ADDRESS, ALL_LOCAL_IPV4ADDR + Long.valueOf(ports.getWebAppHttpPort()).toString());
addYarnNodemanagerOpt(KEY_NM_SHUFFLE_PORT, Long.valueOf(ports.getShufflePort()).toString());
}
protected void appendEnvForNM(StringBuilder cmdLine) {
cmdLine.append(" env ");
for (Map.Entry<String, String> env : environment.entrySet()) {
cmdLine.append(env.getKey()).append("=").append("\"").append(env.getValue()).append("\" ");
}
}
protected void appendCgroupsCmds(StringBuilder cmdLine) {
if (cfg.getFrameworkSuperUser().isPresent()) {
cmdLine.append(" export TASK_DIR=`basename $PWD`&&");
//The container executor script expects mount-path to exist and owned by the yarn user
//See: https://hadoop.apache.org/docs/stable/hadoop-yarn/hadoop-yarn-site/NodeManagerCgroups.html
//If YARN ever moves to cgroup/mem it will be necessary to add a mem version.
appendSudo(cmdLine);
cmdLine.append("chown " + cfg.getFrameworkUser().get() + " ");
cmdLine.append(cfg.getCGroupPath());
cmdLine.append("/cpu/mesos/$TASK_DIR &&");
} else {
LOGGER.info("frameworkSuperUser not enabled ignoring cgroup configuration");
}
}
protected void appendYarnHomeExport(StringBuilder cmdLine) {
if (environment.containsKey("YARN_HOME")) {
cmdLine.append(" export YARN_HOME=");
cmdLine.append(environment.get("YARN_HOME"));
cmdLine.append(";");
}
}
protected void appendSudo(StringBuilder cmdLine) {
if (cfg.getFrameworkSuperUser().isPresent()) {
cmdLine.append(" sudo ");
}
}
protected void appendUserSudo(StringBuilder cmdLine) {
if (cfg.getFrameworkSuperUser().isPresent()) {
cmdLine.append(" sudo -E -u ");
cmdLine.append(cfg.getFrameworkUser().get());
cmdLine.append(" -H ");
}
}
protected void addYarnNodemanagerOpt(String propertyName, String propertyValue) {
String envOpt = String.format(PROPERTY_FORMAT, propertyName, propertyValue);
if (environment.containsKey(ENV_YARN_NODEMANAGER_OPTS)) {
String existingOpts = environment.get(ENV_YARN_NODEMANAGER_OPTS);
environment.put(ENV_YARN_NODEMANAGER_OPTS, existingOpts + " " + envOpt);
} else {
environment.put(ENV_YARN_NODEMANAGER_OPTS, envOpt);
}
}
@Override
public String getConfigurationUrl() {
String httpPolicy = conf.get(TaskFactory.YARN_HTTP_POLICY);
if (httpPolicy != null && httpPolicy.equals(TaskFactory.YARN_HTTP_POLICY_HTTPS_ONLY)) {
String address = conf.get(TaskFactory.YARN_RESOURCEMANAGER_WEBAPP_HTTPS_ADDRESS);
if (address == null || address.isEmpty()) {
address = conf.get(TaskFactory.YARN_RESOURCEMANAGER_HOSTNAME) + ":8090";
}
return "https://" + address + "/conf";
} else {
String address = conf.get(TaskFactory.YARN_RESOURCEMANAGER_WEBAPP_ADDRESS);
if (address == null || address.isEmpty()) {
address = conf.get(TaskFactory.YARN_RESOURCEMANAGER_HOSTNAME) + ":8088";
}
return "http://" + address + "/conf";
}
}
}