blob: 2ec3cda7ff7957a2c280012a2f632ef36cde4176 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.slider.core.launch;
import com.google.common.base.Preconditions;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LogAggregationContext;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.util.Records;
import org.apache.slider.api.ResourceKeys;
import org.apache.slider.api.RoleKeys;
import org.apache.slider.common.SliderKeys;
import org.apache.slider.common.tools.CoreFileSystem;
import org.apache.slider.common.tools.SliderUtils;
import org.apache.slider.core.conf.MapOperations;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.lang.reflect.Method;
import java.lang.reflect.InvocationTargetException;
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* Launcher of applications: base class
*/
public abstract class AbstractLauncher extends Configured {
private static final Logger log =
LoggerFactory.getLogger(AbstractLauncher.class);
public static final String CLASSPATH = "CLASSPATH";
/**
* Filesystem to use for the launch
*/
protected final CoreFileSystem coreFileSystem;
/**
* Env vars; set up at final launch stage
*/
protected final Map<String, String> envVars = new HashMap<>();
protected final MapOperations env = new MapOperations("env", envVars);
protected final ContainerLaunchContext containerLaunchContext =
Records.newRecord(ContainerLaunchContext.class);
protected final List<String> commands = new ArrayList<>(20);
protected final Map<String, LocalResource> localResources = new HashMap<>();
private final Map<String, ByteBuffer> serviceData = new HashMap<>();
// security
protected final Credentials credentials;
protected LogAggregationContext logAggregationContext;
/**
* Create instance.
* @param conf configuration
* @param coreFileSystem filesystem
* @param credentials initial set of credentials -null is permitted
*/
protected AbstractLauncher(Configuration conf,
CoreFileSystem coreFileSystem,
Credentials credentials) {
super(conf);
this.coreFileSystem = coreFileSystem;
this.credentials = credentials != null ? credentials: new Credentials();
}
/**
* Get the container. Until "completed", this isn't valid to launch.
* @return the container to launch
*/
public ContainerLaunchContext getContainerLaunchContext() {
return containerLaunchContext;
}
/**
* Get the env vars to work on
* @return env vars
*/
public MapOperations getEnv() {
return env;
}
/**
* Get the launch commands.
* @return the live list of commands
*/
public List<String> getCommands() {
return commands;
}
/**
* Get the map of local resources.
* @return the live map of local resources.
*/
public Map<String, LocalResource> getLocalResources() {
return localResources;
}
public void addLocalResource(String subpath, LocalResource resource) {
localResources.put(subpath, resource);
}
/**
* Add a set of local resources
* @param resourceMap map of name:resource to add
*/
public void addLocalResources(Map<String, LocalResource> resourceMap) {
localResources.putAll(resourceMap);
}
public Map<String, ByteBuffer> getServiceData() {
return serviceData;
}
/**
* Accessor to the credentials
* @return the credentials associated with this launcher
*/
public Credentials getCredentials() {
return credentials;
}
/**
* Add a command line. It is converted to a single command before being
* added.
* @param cmd
*/
public void addCommandLine(CommandLineBuilder cmd) {
commands.add(cmd.build());
}
public void addCommand(String cmd) {
commands.add(cmd);
}
/**
* Add a list of commands. Each element in the list becomes a single command
* @param commandList list of commands
*/
public void addCommands(List<String> commandList) {
commands.addAll(commandList);
}
/**
* Get all commands as a string, separated by ";". This is for diagnostics
* @return a string description of the commands
*/
public String getCommandsAsString() {
return SliderUtils.join(getCommands(), "; ");
}
/**
* Complete the launch context (copy in env vars, etc).
* @return the container to launch
*/
public ContainerLaunchContext completeContainerLaunch() throws IOException {
String cmdStr = SliderUtils.join(commands, " ", false);
log.debug("Completed setting up container command {}", cmdStr);
containerLaunchContext.setCommands(commands);
//env variables
if (log.isDebugEnabled()) {
log.debug("Environment variables");
for (Map.Entry<String, String> envPair : envVars.entrySet()) {
log.debug(" \"{}\"=\"{}\"", envPair.getKey(), envPair.getValue());
}
}
containerLaunchContext.setEnvironment(env);
//service data
if (log.isDebugEnabled()) {
log.debug("Service Data size");
for (Map.Entry<String, ByteBuffer> entry : serviceData.entrySet()) {
log.debug("\"{}\"=> {} bytes of data", entry.getKey(),
entry.getValue().array().length);
}
}
containerLaunchContext.setServiceData(serviceData);
// resources
dumpLocalResources();
containerLaunchContext.setLocalResources(localResources);
//tokens
log.debug("{} tokens", credentials.numberOfTokens());
containerLaunchContext.setTokens(CredentialUtils.marshallCredentials(
credentials));
return containerLaunchContext;
}
/**
* Dump local resources at debug level
*/
private void dumpLocalResources() {
if (log.isDebugEnabled()) {
log.debug("{} resources: ", localResources.size());
for (Map.Entry<String, LocalResource> entry : localResources.entrySet()) {
String key = entry.getKey();
LocalResource val = entry.getValue();
log.debug(key + "=" + SliderUtils.stringify(val.getResource()));
}
}
}
/**
* This is critical for an insecure cluster -it passes
* down the username to YARN, and so gives the code running
* in containers the rights it needs to work with
* data.
* @throws IOException problems working with current user
*/
protected void propagateUsernameInInsecureCluster() throws IOException {
//insecure cluster: propagate user name via env variable
String userName = UserGroupInformation.getCurrentUser().getUserName();
env.put(SliderKeys.HADOOP_USER_NAME, userName);
}
/**
* Extract any resource requirements from this component's settings.
* All fields that are set will override the existing values -if
* unset that resource field will be left unchanged.
*
* Important: the configuration must already be fully resolved
* in order to pick up global options.
* @param resource resource to configure
* @param map map of options
*/
public void extractResourceRequirements(Resource resource,
Map<String, String> map) {
if (map != null) {
MapOperations options = new MapOperations("", map);
resource.setMemory(options.getOptionInt(ResourceKeys.YARN_MEMORY,
resource.getMemory()));
resource.setVirtualCores(options.getOptionInt(ResourceKeys.YARN_CORES,
resource.getVirtualCores()));
}
}
/**
* Extract the value for option
* {@code yarn.resourcemanager.am.retry-count-window-ms}
* and set it on the ApplicationSubmissionContext. Use the default value
* if option is not set.
*
* @param submissionContext
* @param map
*/
public void extractAmRetryCount(ApplicationSubmissionContext submissionContext,
Map<String, String> map) {
if (map != null) {
MapOperations options = new MapOperations("", map);
long amRetryCountWindow = options.getOptionLong(ResourceKeys
.YARN_RESOURCEMANAGER_AM_RETRY_COUNT_WINDOW_MS,
ResourceKeys.DEFAULT_AM_RETRY_COUNT_WINDOW_MS);
log.info("Setting {} to {}",
ResourceKeys.YARN_RESOURCEMANAGER_AM_RETRY_COUNT_WINDOW_MS,
amRetryCountWindow);
submissionContext.setAttemptFailuresValidityInterval(amRetryCountWindow);
}
}
public void extractLogAggregationContext(Map<String, String> map) {
if (map != null) {
String logPatternSepStr = "\\|";
String logPatternJoinStr = "|";
MapOperations options = new MapOperations("", map);
List<String> logIncludePatterns = new ArrayList<>();
String includePatternExpression = options.getOption(
ResourceKeys.YARN_LOG_INCLUDE_PATTERNS, "").trim();
if (!includePatternExpression.isEmpty()) {
String[] includePatterns = includePatternExpression
.split(logPatternSepStr);
for (String includePattern : includePatterns) {
String trimmedIncludePattern = includePattern.trim();
if (!trimmedIncludePattern.isEmpty()) {
logIncludePatterns.add(trimmedIncludePattern);
}
}
}
String logIncludePattern = StringUtils.join(logIncludePatterns,
logPatternJoinStr);
log.info("Log include patterns: {}", logIncludePattern);
List<String> logExcludePatterns = new ArrayList<>();
String excludePatternExpression = options.getOption(
ResourceKeys.YARN_LOG_EXCLUDE_PATTERNS, "").trim();
if (!excludePatternExpression.isEmpty()) {
String[] excludePatterns = excludePatternExpression
.split(logPatternSepStr);
for (String excludePattern : excludePatterns) {
String trimmedExcludePattern = excludePattern.trim();
if (!trimmedExcludePattern.isEmpty()) {
logExcludePatterns.add(trimmedExcludePattern);
}
}
}
String logExcludePattern = StringUtils.join(logExcludePatterns,
logPatternJoinStr);
log.info("Log exclude patterns: {}", logExcludePattern);
// SLIDER-810/YARN-3154 - hadoop 2.7.0 onwards a new instance method has
// been added for log aggregation for LRS. Existing newInstance method's
// behavior has changed and is used for log aggregation only after the
// application has finished. This forces Slider users to move to hadoop
// 2.7.0+ just for log aggregation, which is not very desirable. So we
// decided to use reflection here to find out if the new 2.7.0 newInstance
// method is available. If yes, then we use it, so log aggregation will
// work in hadoop 2.7.0+ env. If no, then we fallback to the pre-2.7.0
// newInstance method, which means log aggregation will work as expected
// in hadoop 2.6 as well.
// TODO: At some point, say 2-3 Slider releases down, when most users are
// running hadoop 2.7.0, we should get rid of the reflection code here.
try {
Method logAggregationContextMethod = LogAggregationContext.class
.getMethod("newInstance", String.class, String.class, String.class,
String.class);
// Need to set include/exclude patterns appropriately since by default
// rolled log aggregation is not done for any files, so defaults are
// - include pattern set to ""
// - exclude pattern set to "*"
// For Slider we want all logs to be uploaded if include/exclude
// patterns are left empty by the app owner in resources file
if (StringUtils.isEmpty(logIncludePattern)
&& StringUtils.isEmpty(logExcludePattern)) {
logIncludePattern = ".*";
logExcludePattern = "";
} else if (StringUtils.isEmpty(logIncludePattern)
&& StringUtils.isNotEmpty(logExcludePattern)) {
logIncludePattern = ".*";
} else if (StringUtils.isNotEmpty(logIncludePattern)
&& StringUtils.isEmpty(logExcludePattern)) {
logExcludePattern = "";
}
log.debug("LogAggregationContext newInstance method for rolled logs "
+ "include/exclude patterns is available");
log.info("Modified log include patterns: {}", logIncludePattern);
log.info("Modified log exclude patterns: {}", logExcludePattern);
logAggregationContext = (LogAggregationContext) logAggregationContextMethod
.invoke(null, null, null, logIncludePattern, logExcludePattern);
} catch (NoSuchMethodException | SecurityException
| IllegalAccessException | IllegalArgumentException
| InvocationTargetException e) {
log.debug("LogAggregationContext newInstance method for rolled logs "
+ "include/exclude patterns is not available - fallback to old one");
log.debug(e.toString());
logAggregationContext = LogAggregationContext.newInstance(
logIncludePattern, logExcludePattern);
}
}
}
/**
* Utility method to set up the classpath
* @param classpath classpath to use
*/
public void setClasspath(ClasspathConstructor classpath) {
setEnv(CLASSPATH, classpath.buildClasspath());
}
/**
* Set an environment variable in the launch context
* @param var variable name
* @param value value (must be non null)
*/
public void setEnv(String var, String value) {
Preconditions.checkArgument(var != null, "null variable name");
Preconditions.checkArgument(value != null, "null value");
env.put(var, value);
}
/**
* Set an environment variable if its value is non-null.
* @param var variable name
* @param value value (may be null)
*/
public void maybeSetEnv(String var, String value) {
if (value != null) {
setEnv(var, value);
}
}
public void putEnv(Map<String, String> map) {
env.putAll(map);
}
/**
* Important: the configuration must already be fully resolved
* in order to pick up global options
* Copy env vars into the launch context.
*/
public boolean copyEnvVars(MapOperations options) {
if (options == null) {
return false;
}
for (Map.Entry<String, String> entry : options.entrySet()) {
String key = entry.getKey();
if (key.startsWith(RoleKeys.ENV_PREFIX)) {
key = key.substring(RoleKeys.ENV_PREFIX.length());
env.put(key, entry.getValue());
}
}
return true;
}
public String[] dumpEnvToString() {
List<String> nodeEnv = new ArrayList<>();
for (Map.Entry<String, String> entry : env.entrySet()) {
String envElt = String.format("%s=\"%s\"",
entry.getKey(),
entry.getValue());
log.debug(envElt);
nodeEnv.add(envElt);
}
String[] envDescription = nodeEnv.toArray(new String[nodeEnv.size()]);
return envDescription;
}
/**
* Submit an entire directory
* @param srcDir src path in filesystem
* @param destRelativeDir relative path under destination local dir
* @throws IOException IO problems
*/
public void submitDirectory(Path srcDir, String destRelativeDir)
throws IOException {
//add the configuration resources
Map<String, LocalResource> confResources;
confResources = coreFileSystem.submitDirectory(
srcDir,
destRelativeDir);
addLocalResources(confResources);
}
/**
* Return the label expression and if not set null
* @param map map to look up
* @return extracted label or null
*/
public String extractLabelExpression(Map<String, String> map) {
if (map != null) {
MapOperations options = new MapOperations("", map);
return options.getOption(ResourceKeys.YARN_LABEL_EXPRESSION, null);
}
return null;
}
}