blob: d9d5b1f307662a0b1726964224252f121bfbe8e4 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapred;
import java.io.File;
import java.net.InetSocketAddress;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Vector;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.mapred.TaskLog.LogName;
import org.apache.hadoop.mapreduce.ID;
import org.apache.hadoop.util.StringUtils;
public class MapReduceChildJVM {
private static final String SYSTEM_PATH_SEPARATOR =
System.getProperty("path.separator");
private static final Log LOG = LogFactory.getLog(MapReduceChildJVM.class);
private static File getTaskLogFile(String logDir, LogName filter) {
return new File(logDir, filter.toString());
}
private static String getChildEnv(JobConf jobConf, boolean isMap) {
if (isMap) {
return jobConf.get(JobConf.MAPRED_MAP_TASK_ENV,
jobConf.get(JobConf.MAPRED_TASK_ENV));
}
return jobConf.get(JobConf.MAPRED_REDUCE_TASK_ENV,
jobConf.get(jobConf.MAPRED_TASK_ENV));
}
public static void setVMEnv(Map<String, String> env,
List<String> classPaths, String pwd, String containerLogDir,
String nmLdLibraryPath, Task task, CharSequence applicationTokensFile) {
JobConf conf = task.conf;
// Add classpath.
CharSequence cp = env.get("CLASSPATH");
String classpath = StringUtils.join(SYSTEM_PATH_SEPARATOR, classPaths);
if (null == cp) {
env.put("CLASSPATH", classpath);
} else {
env.put("CLASSPATH", classpath + SYSTEM_PATH_SEPARATOR + cp);
}
/////// Environmental variable LD_LIBRARY_PATH
StringBuilder ldLibraryPath = new StringBuilder();
ldLibraryPath.append(nmLdLibraryPath);
ldLibraryPath.append(SYSTEM_PATH_SEPARATOR);
ldLibraryPath.append(pwd);
env.put("LD_LIBRARY_PATH", ldLibraryPath.toString());
/////// Environmental variable LD_LIBRARY_PATH
// for the child of task jvm, set hadoop.root.logger
env.put("HADOOP_ROOT_LOGGER", "DEBUG,CLA"); // TODO: Debug
// TODO: The following is useful for instance in streaming tasks. Should be
// set in ApplicationMaster's env by the RM.
String hadoopClientOpts = System.getenv("HADOOP_CLIENT_OPTS");
if (hadoopClientOpts == null) {
hadoopClientOpts = "";
} else {
hadoopClientOpts = hadoopClientOpts + " ";
}
// FIXME: don't think this is also needed given we already set java
// properties.
long logSize = TaskLog.getTaskLogLength(conf);
Vector<String> logProps = new Vector<String>(4);
setupLog4jProperties(logProps, logSize, containerLogDir);
Iterator<String> it = logProps.iterator();
StringBuffer buffer = new StringBuffer();
while (it.hasNext()) {
buffer.append(" " + it.next());
}
hadoopClientOpts = hadoopClientOpts + buffer.toString();
env.put("HADOOP_CLIENT_OPTS", hadoopClientOpts);
// add the env variables passed by the user
String mapredChildEnv = getChildEnv(conf, task.isMapTask());
if (mapredChildEnv != null && mapredChildEnv.length() > 0) {
String childEnvs[] = mapredChildEnv.split(",");
for (String cEnv : childEnvs) {
String[] parts = cEnv.split("="); // split on '='
String value = (String) env.get(parts[0]);
if (value != null) {
// replace $env with the child's env constructed by tt's
// example LD_LIBRARY_PATH=$LD_LIBRARY_PATH:/tmp
value = parts[1].replace("$" + parts[0], value);
} else {
// this key is not configured by the tt for the child .. get it
// from the tt's env
// example PATH=$PATH:/tmp
value = System.getenv(parts[0]); // Get from NM?
if (value != null) {
// the env key is present in the tt's env
value = parts[1].replace("$" + parts[0], value);
} else {
// the env key is note present anywhere .. simply set it
// example X=$X:/tmp or X=/tmp
value = parts[1].replace("$" + parts[0], "");
}
}
env.put(parts[0], value);
}
}
//This should not be set here (If an OS check is requied. moved to ContainerLuanch)
// env.put("JVM_PID", "`echo $$`");
env.put(Constants.STDOUT_LOGFILE_ENV,
getTaskLogFile(containerLogDir, TaskLog.LogName.STDOUT).toString());
env.put(Constants.STDERR_LOGFILE_ENV,
getTaskLogFile(containerLogDir, TaskLog.LogName.STDERR).toString());
}
private static String getChildJavaOpts(JobConf jobConf, boolean isMapTask) {
if (isMapTask) {
return jobConf.get(JobConf.MAPRED_MAP_TASK_JAVA_OPTS, jobConf.get(
JobConf.MAPRED_TASK_JAVA_OPTS,
JobConf.DEFAULT_MAPRED_TASK_JAVA_OPTS));
}
return jobConf
.get(JobConf.MAPRED_REDUCE_TASK_JAVA_OPTS, jobConf.get(
JobConf.MAPRED_TASK_JAVA_OPTS,
JobConf.DEFAULT_MAPRED_TASK_JAVA_OPTS));
}
private static void setupLog4jProperties(Vector<String> vargs,
long logSize, String containerLogDir) {
vargs.add("-Dlog4j.configuration=container-log4j.properties");
vargs.add("-Dhadoop.yarn.mr.containerLogDir=" + containerLogDir);
vargs.add("-Dhadoop.yarn.mr.totalLogFileSize=" + logSize);
}
public static List<String> getVMCommand(
InetSocketAddress taskAttemptListenerAddr, Task task, String javaHome,
String workDir, String logDir, String childTmpDir, ID jvmID) {
TaskAttemptID attemptID = task.getTaskID();
JobConf conf = task.conf;
Vector<String> vargs = new Vector<String>(8);
vargs.add("exec");
vargs.add(javaHome + "/bin/java");
// Add child (task) java-vm options.
//
// The following symbols if present in mapred.{map|reduce}.child.java.opts
// value are replaced:
// + @taskid@ is interpolated with value of TaskID.
// Other occurrences of @ will not be altered.
//
// Example with multiple arguments and substitutions, showing
// jvm GC logging, and start of a passwordless JVM JMX agent so can
// connect with jconsole and the likes to watch child memory, threads
// and get thread dumps.
//
// <property>
// <name>mapred.map.child.java.opts</name>
// <value>-Xmx 512M -verbose:gc -Xloggc:/tmp/@taskid@.gc \
// -Dcom.sun.management.jmxremote.authenticate=false \
// -Dcom.sun.management.jmxremote.ssl=false \
// </value>
// </property>
//
// <property>
// <name>mapred.reduce.child.java.opts</name>
// <value>-Xmx 1024M -verbose:gc -Xloggc:/tmp/@taskid@.gc \
// -Dcom.sun.management.jmxremote.authenticate=false \
// -Dcom.sun.management.jmxremote.ssl=false \
// </value>
// </property>
//
String javaOpts = getChildJavaOpts(conf, task.isMapTask());
javaOpts = javaOpts.replace("@taskid@", attemptID.toString());
String [] javaOptsSplit = javaOpts.split(" ");
// Add java.library.path; necessary for loading native libraries.
//
// 1. We add the 'cwd' of the task to it's java.library.path to help
// users distribute native libraries via the DistributedCache.
// 2. The user can also specify extra paths to be added to the
// java.library.path via mapred.{map|reduce}.child.java.opts.
//
String libraryPath = workDir;
boolean hasUserLDPath = false;
for(int i=0; i<javaOptsSplit.length ;i++) {
if(javaOptsSplit[i].startsWith("-Djava.library.path=")) {
// TODO: Does the above take care of escaped space chars
javaOptsSplit[i] += SYSTEM_PATH_SEPARATOR + libraryPath;
hasUserLDPath = true;
break;
}
}
if(!hasUserLDPath) {
vargs.add("-Djava.library.path=" + libraryPath);
}
for (int i = 0; i < javaOptsSplit.length; i++) {
vargs.add(javaOptsSplit[i]);
}
if (childTmpDir != null) {
vargs.add("-Djava.io.tmpdir=" + childTmpDir);
}
// Setup the log4j prop
long logSize = TaskLog.getTaskLogLength(conf);
setupLog4jProperties(vargs, logSize, logDir);
if (conf.getProfileEnabled()) {
if (conf.getProfileTaskRange(task.isMapTask()
).isIncluded(task.getPartition())) {
File prof = getTaskLogFile(logDir, TaskLog.LogName.PROFILE);
vargs.add(String.format(conf.getProfileParams(), prof.toString()));
}
}
// Add main class and its arguments
vargs.add(YarnChild.class.getName()); // main of Child
// pass TaskAttemptListener's address
vargs.add(taskAttemptListenerAddr.getAddress().getHostAddress());
vargs.add(Integer.toString(taskAttemptListenerAddr.getPort()));
vargs.add(attemptID.toString()); // pass task identifier
// Finally add the jvmID
vargs.add(String.valueOf(jvmID.getId()));
vargs.add("1>" + getTaskLogFile(logDir, TaskLog.LogName.STDERR));
vargs.add("2>" + getTaskLogFile(logDir, TaskLog.LogName.STDOUT));
// Final commmand
StringBuilder mergedCommand = new StringBuilder();
for (CharSequence str : vargs) {
mergedCommand.append(str).append(" ");
}
Vector<String> vargsFinal = new Vector<String>(1);
vargsFinal.add(mergedCommand.toString());
return vargsFinal;
}
}