blob: 375c69fd0caf906162a509692bad91a53247224a [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapred;
import java.net.InetSocketAddress;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Vector;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.TaskLog.LogName;
import org.apache.hadoop.mapreduce.ID;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.v2.util.MRApps;
import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.util.Apps;
@SuppressWarnings("deprecation")
public class MapReduceChildJVM {
private static String getTaskLogFile(LogName filter) {
return ApplicationConstants.LOG_DIR_EXPANSION_VAR + Path.SEPARATOR +
filter.toString();
}
private static String getChildEnv(JobConf jobConf, boolean isMap) {
if (isMap) {
return jobConf.get(JobConf.MAPRED_MAP_TASK_ENV,
jobConf.get(JobConf.MAPRED_TASK_ENV));
}
return jobConf.get(JobConf.MAPRED_REDUCE_TASK_ENV,
jobConf.get(JobConf.MAPRED_TASK_ENV));
}
private static String getChildLogLevel(JobConf conf, boolean isMap) {
if (isMap) {
return conf.get(
MRJobConfig.MAP_LOG_LEVEL,
JobConf.DEFAULT_LOG_LEVEL.toString()
);
} else {
return conf.get(
MRJobConfig.REDUCE_LOG_LEVEL,
JobConf.DEFAULT_LOG_LEVEL.toString()
);
}
}
public static void setVMEnv(Map<String, String> environment,
Task task) {
JobConf conf = task.conf;
// Add the env variables passed by the user
String mapredChildEnv = getChildEnv(conf, task.isMapTask());
Apps.setEnvFromInputString(environment, mapredChildEnv);
// Set logging level in the environment.
// This is so that, if the child forks another "bin/hadoop" (common in
// streaming) it will have the correct loglevel.
environment.put(
"HADOOP_ROOT_LOGGER",
getChildLogLevel(conf, task.isMapTask()) + ",CLA");
// TODO: The following is useful for instance in streaming tasks. Should be
// set in ApplicationMaster's env by the RM.
String hadoopClientOpts = System.getenv("HADOOP_CLIENT_OPTS");
if (hadoopClientOpts == null) {
hadoopClientOpts = "";
} else {
hadoopClientOpts = hadoopClientOpts + " ";
}
// FIXME: don't think this is also needed given we already set java
// properties.
long logSize = TaskLog.getTaskLogLength(conf);
Vector<String> logProps = new Vector<String>(4);
setupLog4jProperties(task, logProps, logSize);
Iterator<String> it = logProps.iterator();
StringBuffer buffer = new StringBuffer();
while (it.hasNext()) {
buffer.append(" " + it.next());
}
hadoopClientOpts = hadoopClientOpts + buffer.toString();
environment.put("HADOOP_CLIENT_OPTS", hadoopClientOpts);
// Add stdout/stderr env
environment.put(
MRJobConfig.STDOUT_LOGFILE_ENV,
getTaskLogFile(TaskLog.LogName.STDOUT)
);
environment.put(
MRJobConfig.STDERR_LOGFILE_ENV,
getTaskLogFile(TaskLog.LogName.STDERR)
);
environment.put(MRJobConfig.APPLICATION_ATTEMPT_ID_ENV,
conf.get(MRJobConfig.APPLICATION_ATTEMPT_ID).toString());
}
private static String getChildJavaOpts(JobConf jobConf, boolean isMapTask) {
String userClasspath = "";
String adminClasspath = "";
if (isMapTask) {
userClasspath =
jobConf.get(
JobConf.MAPRED_MAP_TASK_JAVA_OPTS,
jobConf.get(
JobConf.MAPRED_TASK_JAVA_OPTS,
JobConf.DEFAULT_MAPRED_TASK_JAVA_OPTS)
);
adminClasspath =
jobConf.get(
MRJobConfig.MAPRED_MAP_ADMIN_JAVA_OPTS,
MRJobConfig.DEFAULT_MAPRED_ADMIN_JAVA_OPTS);
} else {
userClasspath =
jobConf.get(
JobConf.MAPRED_REDUCE_TASK_JAVA_OPTS,
jobConf.get(
JobConf.MAPRED_TASK_JAVA_OPTS,
JobConf.DEFAULT_MAPRED_TASK_JAVA_OPTS)
);
adminClasspath =
jobConf.get(
MRJobConfig.MAPRED_REDUCE_ADMIN_JAVA_OPTS,
MRJobConfig.DEFAULT_MAPRED_ADMIN_JAVA_OPTS);
}
// Add admin classpath first so it can be overridden by user.
return adminClasspath + " " + userClasspath;
}
private static void setupLog4jProperties(Task task,
Vector<String> vargs,
long logSize) {
String logLevel = getChildLogLevel(task.conf, task.isMapTask());
MRApps.addLog4jSystemProperties(logLevel, logSize, vargs);
}
public static List<String> getVMCommand(
InetSocketAddress taskAttemptListenerAddr, Task task,
ID jvmID) {
TaskAttemptID attemptID = task.getTaskID();
JobConf conf = task.conf;
Vector<String> vargs = new Vector<String>(8);
vargs.add("exec");
vargs.add(Environment.JAVA_HOME.$() + "/bin/java");
// Add child (task) java-vm options.
//
// The following symbols if present in mapred.{map|reduce}.child.java.opts
// value are replaced:
// + @taskid@ is interpolated with value of TaskID.
// Other occurrences of @ will not be altered.
//
// Example with multiple arguments and substitutions, showing
// jvm GC logging, and start of a passwordless JVM JMX agent so can
// connect with jconsole and the likes to watch child memory, threads
// and get thread dumps.
//
// <property>
// <name>mapred.map.child.java.opts</name>
// <value>-Xmx 512M -verbose:gc -Xloggc:/tmp/@taskid@.gc \
// -Dcom.sun.management.jmxremote.authenticate=false \
// -Dcom.sun.management.jmxremote.ssl=false \
// </value>
// </property>
//
// <property>
// <name>mapred.reduce.child.java.opts</name>
// <value>-Xmx 1024M -verbose:gc -Xloggc:/tmp/@taskid@.gc \
// -Dcom.sun.management.jmxremote.authenticate=false \
// -Dcom.sun.management.jmxremote.ssl=false \
// </value>
// </property>
//
String javaOpts = getChildJavaOpts(conf, task.isMapTask());
javaOpts = javaOpts.replace("@taskid@", attemptID.toString());
String [] javaOptsSplit = javaOpts.split(" ");
for (int i = 0; i < javaOptsSplit.length; i++) {
vargs.add(javaOptsSplit[i]);
}
Path childTmpDir = new Path(Environment.PWD.$(),
YarnConfiguration.DEFAULT_CONTAINER_TEMP_DIR);
vargs.add("-Djava.io.tmpdir=" + childTmpDir);
// Setup the log4j prop
long logSize = TaskLog.getTaskLogLength(conf);
setupLog4jProperties(task, vargs, logSize);
if (conf.getProfileEnabled()) {
if (conf.getProfileTaskRange(task.isMapTask()
).isIncluded(task.getPartition())) {
vargs.add(
String.format(
conf.getProfileParams(),
getTaskLogFile(TaskLog.LogName.PROFILE)
)
);
if (task.isMapTask()) {
vargs.add(conf.get(MRJobConfig.TASK_MAP_PROFILE_PARAMS, ""));
}
else {
vargs.add(conf.get(MRJobConfig.TASK_REDUCE_PROFILE_PARAMS, ""));
}
}
}
// Add main class and its arguments
vargs.add(YarnChild.class.getName()); // main of Child
// pass TaskAttemptListener's address
vargs.add(taskAttemptListenerAddr.getAddress().getHostAddress());
vargs.add(Integer.toString(taskAttemptListenerAddr.getPort()));
vargs.add(attemptID.toString()); // pass task identifier
// Finally add the jvmID
vargs.add(String.valueOf(jvmID.getId()));
vargs.add("1>" + getTaskLogFile(TaskLog.LogName.STDOUT));
vargs.add("2>" + getTaskLogFile(TaskLog.LogName.STDERR));
// Final commmand
StringBuilder mergedCommand = new StringBuilder();
for (CharSequence str : vargs) {
mergedCommand.append(str).append(" ");
}
Vector<String> vargsFinal = new Vector<String>(1);
vargsFinal.add(mergedCommand.toString());
return vargsFinal;
}
}