/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.tez.mapreduce.hadoop;
import java.io.File;
import java.io.IOException;
import java.net.InetAddress;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Vector;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.JobSubmissionFiles;
import org.apache.hadoop.mapreduce.split.JobSplitWriter;
import org.apache.hadoop.mapreduce.v2.util.MRApps;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.yarn.ContainerLogAppender;
import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LocalResourceType;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.util.Apps;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.tez.common.TezJobConfig;
import org.apache.tez.common.TezUtils;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.VertexLocationHint.TaskLocationHint;
import org.apache.tez.mapreduce.combine.MRCombiner;
import org.apache.tez.mapreduce.partition.MRPartitioner;
public class MRHelpers {
private static final Log LOG = LogFactory.getLog(MRHelpers.class);
static final String JOB_SPLIT_RESOURCE_NAME = "job.split";
static final String JOB_SPLIT_METAINFO_RESOURCE_NAME =
"job.splitmetainfo";
/**
* Comparator for org.apache.hadoop.mapreduce.InputSplit; orders splits by
* descending length so that the largest splits come first.
*/
private static class InputSplitComparator
implements Comparator<org.apache.hadoop.mapreduce.InputSplit> {
@Override
public int compare(org.apache.hadoop.mapreduce.InputSplit o1,
org.apache.hadoop.mapreduce.InputSplit o2) {
try {
long len1 = o1.getLength();
long len2 = o2.getLength();
if (len1 < len2) {
return 1;
} else if (len1 == len2) {
return 0;
} else {
return -1;
}
} catch (IOException ie) {
throw new RuntimeException("exception in InputSplit compare", ie);
} catch (InterruptedException ie) {
throw new RuntimeException("exception in InputSplit compare", ie);
}
}
}
/**
* Comparator for org.apache.hadoop.mapred.InputSplit; orders splits by
* descending length so that the largest splits come first.
*/
private static class OldInputSplitComparator
implements Comparator<org.apache.hadoop.mapred.InputSplit> {
@Override
public int compare(org.apache.hadoop.mapred.InputSplit o1,
org.apache.hadoop.mapred.InputSplit o2) {
try {
long len1 = o1.getLength();
long len2 = o2.getLength();
if (len1 < len2) {
return 1;
} else if (len1 == len2) {
return 0;
} else {
return -1;
}
} catch (IOException ie) {
throw new RuntimeException("Problem getting input split size", ie);
}
}
}
/**
* Generate new-api mapreduce InputFormat splits
* @param jobContext JobContext required by InputFormat
* @param inputSplitDir Directory in which to generate splits information
*
* @return InputSplitInfo containing the split files' information and the
* location hints for each generated split, used to determine the parallelism of
* the map stage.
*
* @throws IOException
* @throws InterruptedException
* @throws ClassNotFoundException
*/
private static InputSplitInfo writeNewSplits(JobContext jobContext,
Path inputSplitDir) throws IOException, InterruptedException,
ClassNotFoundException {
Configuration conf = jobContext.getConfiguration();
InputFormat<?, ?> input =
ReflectionUtils.newInstance(jobContext.getInputFormatClass(), conf);
List<org.apache.hadoop.mapreduce.InputSplit> array =
input.getSplits(jobContext);
org.apache.hadoop.mapreduce.InputSplit[] splits =
(org.apache.hadoop.mapreduce.InputSplit[])
array.toArray(
new org.apache.hadoop.mapreduce.InputSplit[array.size()]);
// sort the splits into order based on size, so that the biggest
// go first
Arrays.sort(splits, new InputSplitComparator());
JobSplitWriter.createSplitFiles(inputSplitDir, conf,
inputSplitDir.getFileSystem(conf), splits);
List<TaskLocationHint> locationHints =
new ArrayList<TaskLocationHint>(splits.length);
for (int i = 0; i < splits.length; ++i) {
locationHints.add(
new TaskLocationHint(new HashSet<String>(
Arrays.asList(splits[i].getLocations())), null));
}
return new InputSplitInfo(
JobSubmissionFiles.getJobSplitFile(inputSplitDir),
JobSubmissionFiles.getJobSplitMetaFile(inputSplitDir),
splits.length, locationHints);
}
/**
* Generate old-api mapred InputFormat splits
* @param jobConf JobConf required by InputFormat class
* @param inputSplitDir Directory in which to generate splits information
*
* @return InputSplitInfo containing the split files' information and the
* number of splits generated, used to determine the parallelism of
* the map stage.
*
* @throws IOException
*/
private static InputSplitInfo writeOldSplits(JobConf jobConf,
Path inputSplitDir) throws IOException {
org.apache.hadoop.mapred.InputSplit[] splits =
jobConf.getInputFormat().getSplits(jobConf, jobConf.getNumMapTasks());
// sort the splits into order based on size, so that the biggest
// go first
Arrays.sort(splits, new OldInputSplitComparator());
JobSplitWriter.createSplitFiles(inputSplitDir, jobConf,
inputSplitDir.getFileSystem(jobConf), splits);
List<TaskLocationHint> locationHints =
new ArrayList<TaskLocationHint>(splits.length);
for (int i = 0; i < splits.length; ++i) {
locationHints.add(
new TaskLocationHint(new HashSet<String>(
Arrays.asList(splits[i].getLocations())), null));
}
return new InputSplitInfo(
JobSubmissionFiles.getJobSplitFile(inputSplitDir),
JobSubmissionFiles.getJobSplitMetaFile(inputSplitDir),
splits.length, locationHints);
}
/**
* Helper API to generate splits
* @param conf Configuration with all necessary information set to generate
* splits. The following are required at a minimum:
*
* - mapred.mapper.new-api: determines whether mapred.InputFormat or
* mapreduce.InputFormat is to be used
* - mapred.input.format.class or mapreduce.job.inputformat.class:
* determines the InputFormat class to be used
*
* In addition to this, all the configs needed by the InputFormat class also
* have to be set. For example, FileInputFormat needs the input directory
* paths to be set in the config.
*
* @param inputSplitsDir Directory in which the splits file and meta info file
* will be generated. job.split and job.splitmetainfo files in this directory
* will be overwritten. Should be a fully-qualified path.
*
* @return InputSplitInfo containing the split files' information and the
* number of splits generated, used to determine the parallelism of
* the map stage.
*
* @throws IOException
* @throws InterruptedException
* @throws ClassNotFoundException
*/
public static InputSplitInfo generateInputSplits(Configuration conf,
Path inputSplitsDir) throws IOException, InterruptedException,
ClassNotFoundException {
Job job = Job.getInstance(conf);
JobConf jobConf = new JobConf(conf);
if (jobConf.getUseNewMapper()) {
LOG.info("Generating new input splits"
+ ", splitsDir=" + inputSplitsDir.toString());
return writeNewSplits(job, inputSplitsDir);
} else {
LOG.info("Generating old input splits"
+ ", splitsDir=" + inputSplitsDir.toString());
return writeOldSplits(jobConf, inputSplitsDir);
}
}
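/*
* Illustrative usage sketch (not part of this class): generating splits for a
* FileInputFormat-based job via the new API. The input path and the splits
* directory below are hypothetical placeholders.
*
*   Configuration conf = new Configuration();
*   conf.setBoolean("mapred.mapper.new-api", true);
*   conf.set(MRJobConfig.INPUT_FORMAT_CLASS_ATTR,
*       org.apache.hadoop.mapreduce.lib.input.TextInputFormat.class.getName());
*   conf.set(org.apache.hadoop.mapreduce.lib.input.FileInputFormat.INPUT_DIR,
*       "/user/example/input"); // hypothetical input path
*   Path splitsDir = FileSystem.get(conf).makeQualified(
*       new Path("/tmp/example-splits")); // hypothetical, fully-qualified dir
*   InputSplitInfo splitInfo = MRHelpers.generateInputSplits(conf, splitsDir);
*/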
private static String getChildLogLevel(Configuration conf, boolean isMap) {
if (isMap) {
return conf.get(
MRJobConfig.MAP_LOG_LEVEL,
JobConf.DEFAULT_LOG_LEVEL.toString()
);
} else {
return conf.get(
MRJobConfig.REDUCE_LOG_LEVEL,
JobConf.DEFAULT_LOG_LEVEL.toString()
);
}
}
private static String getLog4jCmdLineProperties(Configuration conf,
boolean isMap) {
Vector<String> logProps = new Vector<String>(4);
addLog4jSystemProperties(getChildLogLevel(conf, isMap), logProps);
StringBuilder sb = new StringBuilder();
for (String str : logProps) {
sb.append(str).append(" ");
}
return sb.toString();
}
/**
* Add the JVM system properties necessary to configure
* {@link ContainerLogAppender}.
*
* @param logLevel
* the desired log level (e.g. INFO/WARN/DEBUG)
* @param vargs
* the argument list to append to
*/
private static void addLog4jSystemProperties(String logLevel,
List<String> vargs) {
vargs.add("-Dlog4j.configuration="
+ TezConfiguration.TEZ_CONTAINER_LOG4J_PROPERTIES_FILE);
vargs.add("-D" + YarnConfiguration.YARN_APP_CONTAINER_LOG_DIR + "="
+ ApplicationConstants.LOG_DIR_EXPANSION_VAR);
vargs.add("-D" + TezConfiguration.TEZ_ROOT_LOGGER_NAME + "=" + logLevel
+ "," + TezConfiguration.TEZ_CONTAINER_LOGGER_NAME);
}
/**
* Generate JVM options to be used to launch map tasks
*
* Uses mapreduce.admin.map.child.java.opts, mapreduce.map.java.opts and
* mapreduce.map.log.level from config to generate the opts.
*
* @param conf Configuration to be used to extract JVM opts specific info
* @return JAVA_OPTS string to be used in launching the JVM
*/
@SuppressWarnings("deprecation")
public static String getMapJavaOpts(Configuration conf) {
String adminOpts = conf.get(
MRJobConfig.MAPRED_MAP_ADMIN_JAVA_OPTS,
MRJobConfig.DEFAULT_MAPRED_ADMIN_JAVA_OPTS);
String userOpts = conf.get(
MRJobConfig.MAP_JAVA_OPTS,
conf.get(
JobConf.MAPRED_TASK_JAVA_OPTS,
JobConf.DEFAULT_MAPRED_TASK_JAVA_OPTS));
return adminOpts.trim() + " " + userOpts.trim() + " "
+ getLog4jCmdLineProperties(conf, true);
}
/**
* Generate JVM options to be used to launch reduce tasks
*
* Uses mapreduce.admin.reduce.child.java.opts, mapreduce.reduce.java.opts
* and mapreduce.reduce.log.level from config to generate the opts.
*
* @param conf Configuration to be used to extract JVM opts specific info
* @return JAVA_OPTS string to be used in launching the JVM
*/
@SuppressWarnings("deprecation")
public static String getReduceJavaOpts(Configuration conf) {
String adminOpts = conf.get(
MRJobConfig.MAPRED_REDUCE_ADMIN_JAVA_OPTS,
MRJobConfig.DEFAULT_MAPRED_ADMIN_JAVA_OPTS);
String userOpts = conf.get(
MRJobConfig.REDUCE_JAVA_OPTS,
conf.get(
JobConf.MAPRED_TASK_JAVA_OPTS,
JobConf.DEFAULT_MAPRED_TASK_JAVA_OPTS));
return adminOpts.trim() + " " + userOpts.trim() + " "
+ getLog4jCmdLineProperties(conf, false);
}
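/*
* Illustrative sketch (not part of this class): how the opts are composed. With
* a hypothetical configuration such as
*
*   Configuration conf = new Configuration();
*   conf.set(MRJobConfig.MAP_JAVA_OPTS, "-Xmx512m");
*   String mapOpts = MRHelpers.getMapJavaOpts(conf);
*
* mapOpts contains the admin opts (or their default), followed by "-Xmx512m",
* followed by the log4j -D properties produced by getLog4jCmdLineProperties().
*/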
/**
* Sets up parameters which used to be set by the MR JobClient. Includes
* setting whether to use the new API or the old API. Note: Must be called
* before generating InputSplits.
*
* @param conf
* configuration for the vertex.
*/
public static void doJobClientMagic(Configuration conf) throws IOException {
setUseNewAPI(conf);
// TODO Maybe add functionality to check output specifications - e.g. fail
// early if the output directory exists.
InetAddress ip = InetAddress.getLocalHost();
if (ip != null) {
String submitHostAddress = ip.getHostAddress();
String submitHostName = ip.getHostName();
conf.set(MRJobConfig.JOB_SUBMITHOST, submitHostName);
conf.set(MRJobConfig.JOB_SUBMITHOSTADDR, submitHostAddress);
}
// conf.set("hadoop.http.filter.initializers",
// "org.apache.hadoop.yarn.server.webproxy.amfilter.AmFilterInitializer");
// Skipping setting JOB_DIR - not used by AM.
// Maybe generate SHUFFLE secret. The AM uses the job token generated in
// the AM anyway.
// TODO eventually ACLs
conf.set(TezJobConfig.TEZ_RUNTIME_PARTITIONER_CLASS, MRPartitioner.class.getName());
boolean useNewApi = conf.getBoolean("mapred.mapper.new-api", false);
if (useNewApi) {
if (conf.get(MRJobConfig.COMBINE_CLASS_ATTR) != null) {
conf.set(TezJobConfig.TEZ_RUNTIME_COMBINER_CLASS, MRCombiner.class.getName());
}
} else {
if (conf.get("mapred.combiner.class") != null) {
conf.set(TezJobConfig.TEZ_RUNTIME_COMBINER_CLASS, MRCombiner.class.getName());
}
}
setWorkingDirectory(conf);
}
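/*
* Illustrative call-ordering sketch (not part of this class): doJobClientMagic()
* must run before split generation so that the new-api/old-api decision is
* already recorded in the configuration. The splits directory is hypothetical.
*
*   MRHelpers.doJobClientMagic(conf);
*   InputSplitInfo splitInfo = MRHelpers.generateInputSplits(conf,
*       new Path("hdfs:///tmp/example-splits")); // hypothetical splits dir
*/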
private static void setWorkingDirectory(Configuration conf) {
String name = conf.get(JobContext.WORKING_DIR);
if (name == null) {
try {
Path dir = FileSystem.get(conf).getWorkingDirectory();
conf.set(JobContext.WORKING_DIR, dir.toString());
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}
/**
* Default to the new APIs unless they are explicitly set or the old mapper or
* reducer attributes are used.
*
* @throws IOException
* if the configuration is inconsistent
*/
private static void setUseNewAPI(Configuration conf) throws IOException {
int numReduces = conf.getInt(MRJobConfig.NUM_REDUCES, 1);
String oldMapperClass = "mapred.mapper.class";
String oldReduceClass = "mapred.reducer.class";
conf.setBooleanIfUnset("mapred.mapper.new-api",
conf.get(oldMapperClass) == null);
if (conf.getBoolean("mapred.mapper.new-api", false)) {
String mode = "new map API";
ensureNotSet(conf, "mapred.input.format.class", mode);
ensureNotSet(conf, oldMapperClass, mode);
if (numReduces != 0) {
ensureNotSet(conf, "mapred.partitioner.class", mode);
} else {
ensureNotSet(conf, "mapred.output.format.class", mode);
}
} else {
String mode = "map compatability";
ensureNotSet(conf, MRJobConfig.INPUT_FORMAT_CLASS_ATTR, mode);
ensureNotSet(conf, MRJobConfig.MAP_CLASS_ATTR, mode);
if (numReduces != 0) {
ensureNotSet(conf, MRJobConfig.PARTITIONER_CLASS_ATTR, mode);
} else {
ensureNotSet(conf, MRJobConfig.OUTPUT_FORMAT_CLASS_ATTR, mode);
}
}
if (numReduces != 0) {
conf.setBooleanIfUnset("mapred.reducer.new-api",
conf.get(oldReduceClass) == null);
if (conf.getBoolean("mapred.reducer.new-api", false)) {
String mode = "new reduce API";
ensureNotSet(conf, "mapred.output.format.class", mode);
ensureNotSet(conf, oldReduceClass, mode);
} else {
String mode = "reduce compatability";
ensureNotSet(conf, MRJobConfig.OUTPUT_FORMAT_CLASS_ATTR, mode);
ensureNotSet(conf, MRJobConfig.REDUCE_CLASS_ATTR, mode);
}
}
}
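/*
* Example of the defaulting behaviour above (illustrative): a configuration that
* sets "mapred.mapper.class" but leaves "mapred.mapper.new-api" unset is treated
* as an old-API job, while a configuration that sets neither defaults to the new
* API. Setting both "mapred.mapper.class" and MRJobConfig.MAP_CLASS_ATTR would
* make ensureNotSet() below throw an IOException.
*/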
private static void ensureNotSet(Configuration conf, String attr, String msg)
throws IOException {
if (conf.get(attr) != null) {
throw new IOException(attr + " is incompatible with " + msg + " mode.");
}
}
@LimitedPrivate("Hive, Pig")
@Unstable
public static byte[] createUserPayloadFromConf(Configuration conf)
throws IOException {
return TezUtils.createUserPayloadFromConf(conf);
}
@LimitedPrivate("Hive, Pig")
@Unstable
public static Configuration createConfFromUserPayload(byte[] bb)
throws IOException {
return TezUtils.createConfFromUserPayload(bb);
}
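/*
* Illustrative round-trip sketch (not part of this class): a Configuration is
* serialized into a byte[] payload and later reconstructed, e.g. when passing
* MR configuration to a Tez vertex as a user payload.
*
*   byte[] payload = MRHelpers.createUserPayloadFromConf(conf);
*   Configuration restored = MRHelpers.createConfFromUserPayload(payload);
*/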
/**
* Update the provided localResources collection with the local resources
* required by MapReduce tasks with respect to input splits.
*
* @param fs FileSystem instance used to access the status of the split-related files
* @param inputSplitInfo Information on the location of the split files
* @param localResources LocalResources collection to be updated
* @throws IOException
*/
public static void updateLocalResourcesForInputSplits(
FileSystem fs,
InputSplitInfo inputSplitInfo,
Map<String, LocalResource> localResources) throws IOException {
if (localResources.containsKey(JOB_SPLIT_RESOURCE_NAME)) {
throw new RuntimeException("LocalResources already contains a"
+ " resource named " + JOB_SPLIT_RESOURCE_NAME);
}
if (localResources.containsKey(JOB_SPLIT_METAINFO_RESOURCE_NAME)) {
throw new RuntimeException("LocalResources already contains a"
+ " resource named " + JOB_SPLIT_METAINFO_RESOURCE_NAME);
}
FileStatus splitFileStatus =
fs.getFileStatus(inputSplitInfo.getSplitsFile());
FileStatus metaInfoFileStatus =
fs.getFileStatus(inputSplitInfo.getSplitsMetaInfoFile());
localResources.put(JOB_SPLIT_RESOURCE_NAME,
LocalResource.newInstance(
ConverterUtils.getYarnUrlFromPath(inputSplitInfo.getSplitsFile()),
LocalResourceType.FILE,
LocalResourceVisibility.APPLICATION,
splitFileStatus.getLen(), splitFileStatus.getModificationTime()));
localResources.put(JOB_SPLIT_METAINFO_RESOURCE_NAME,
LocalResource.newInstance(
ConverterUtils.getYarnUrlFromPath(
inputSplitInfo.getSplitsMetaInfoFile()),
LocalResourceType.FILE,
LocalResourceVisibility.APPLICATION,
metaInfoFileStatus.getLen(),
metaInfoFileStatus.getModificationTime()));
}
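/*
* Illustrative usage sketch (not part of this class): after generating splits,
* publish the job.split and job.splitmetainfo files as local resources for the
* tasks. The variable names are hypothetical.
*
*   FileSystem fs = FileSystem.get(conf);
*   Map<String, LocalResource> localResources =
*       new java.util.HashMap<String, LocalResource>();
*   MRHelpers.updateLocalResourcesForInputSplits(fs, splitInfo, localResources);
*/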
/**
* Extract the map task's container resource requirements from the
* provided configuration.
*
* Uses mapreduce.map.memory.mb and mapreduce.map.cpu.vcores from the
* provided configuration.
*
* @param conf Configuration with MR-specific settings used to extract
* information from
*
* @return Resource object used to define requirements for containers
* running Map tasks
*/
public static Resource getMapResource(Configuration conf) {
return Resource.newInstance(conf.getInt(
MRJobConfig.MAP_MEMORY_MB, MRJobConfig.DEFAULT_MAP_MEMORY_MB),
conf.getInt(MRJobConfig.MAP_CPU_VCORES,
MRJobConfig.DEFAULT_MAP_CPU_VCORES));
}
/**
* Extract the reduce task's container resource requirements from the
* provided configuration.
*
* Uses mapreduce.reduce.memory.mb and mapreduce.reduce.cpu.vcores from the
* provided configuration.
*
* @param conf Configuration with MR-specific settings used to extract
* information from
*
* @return Resource object used to define requirements for containers
* running Reduce tasks
*/
public static Resource getReduceResource(Configuration conf) {
return Resource.newInstance(conf.getInt(
MRJobConfig.REDUCE_MEMORY_MB, MRJobConfig.DEFAULT_REDUCE_MEMORY_MB),
conf.getInt(MRJobConfig.REDUCE_CPU_VCORES,
MRJobConfig.DEFAULT_REDUCE_CPU_VCORES));
}
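/*
* Illustrative sketch (not part of this class): deriving container resource
* requests from MR settings. The memory/vcore values are hypothetical.
*
*   conf.setInt(MRJobConfig.MAP_MEMORY_MB, 1024);
*   conf.setInt(MRJobConfig.MAP_CPU_VCORES, 1);
*   Resource mapTaskResource = MRHelpers.getMapResource(conf);       // 1024 MB, 1 vcore
*   Resource reduceTaskResource = MRHelpers.getReduceResource(conf); // defaults
*/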
/**
* Set up the classpath and other environment variables for MR tasks
* @param conf Configuration to retrieve settings from
* @param environment Environment to update
* @param isMap Whether the task is a map task or a reduce task
*/
public static void updateEnvironmentForMRTasks(Configuration conf,
Map<String, String> environment, boolean isMap) {
boolean isMiniCluster =
conf.getBoolean(YarnConfiguration.IS_MINI_YARN_CLUSTER, false);
if (isMiniCluster) {
Apps.addToEnvironment(environment, Environment.CLASSPATH.name(),
System.getProperty("java.class.path"));
}
// TEZ jars and deps will be localized by the TezClient submission layer
// Assumption is that MR is also in TEZ dependencies
Apps.addToEnvironment(environment,
Environment.CLASSPATH.name(),
Environment.PWD.$());
// Add YARN/COMMON/HDFS jars to path
if (!isMiniCluster) {
for (String c : conf.getStrings(
YarnConfiguration.YARN_APPLICATION_CLASSPATH,
YarnConfiguration.DEFAULT_YARN_APPLICATION_CLASSPATH)) {
Apps.addToEnvironment(environment, Environment.CLASSPATH.name(),
c.trim());
}
}
Apps.addToEnvironment(environment,
Environment.CLASSPATH.name(),
Environment.PWD.$() + File.separator + "*");
// Shell
environment.put(Environment.SHELL.name(), conf.get(
MRJobConfig.MAPRED_ADMIN_USER_SHELL, MRJobConfig.DEFAULT_SHELL));
// Add pwd to LD_LIBRARY_PATH, add this before adding anything else
Apps.addToEnvironment(environment, Environment.LD_LIBRARY_PATH.name(),
Environment.PWD.$());
// Add the env variables passed by the admin
Apps.setEnvFromInputString(environment, conf.get(
MRJobConfig.MAPRED_ADMIN_USER_ENV,
MRJobConfig.DEFAULT_MAPRED_ADMIN_USER_ENV));
// Add the env variables passed by the user
String mapredChildEnv = (isMap ?
conf.get(MRJobConfig.MAP_ENV, "")
: conf.get(MRJobConfig.REDUCE_ENV, ""));
Apps.setEnvFromInputString(environment, mapredChildEnv);
// Set logging level in the environment.
environment.put(
"HADOOP_ROOT_LOGGER",
getChildLogLevel(conf, isMap) + ",CLA");
}
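/*
* Illustrative usage sketch (not part of this class): building the container
* environment for map tasks. The resulting map is passed to the container
* launch context by the caller.
*
*   Map<String, String> mapTaskEnv = new java.util.HashMap<String, String>();
*   MRHelpers.updateEnvironmentForMRTasks(conf, mapTaskEnv, true); // isMap=true
*/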
private static Configuration getBaseJobConf(Configuration conf) {
if (conf != null) {
return new JobConf(conf);
} else {
return new JobConf();
}
}
/**
* Get a default-initialized, JobConf-based configuration
* @param conf Configuration to initialize the JobConf with.
* @return Base configuration for MR-based jobs
*/
public static Configuration getBaseMRConfiguration(Configuration conf) {
return getBaseJobConf(conf);
}
/**
* Get a default-initialized, JobConf-based configuration
* @return Base configuration for MR-based jobs
*/
public static Configuration getBaseMRConfiguration() {
return getBaseJobConf(null);
}
/**
* Set up the environment for the AM based on the MR configuration
* @param conf Configuration from which to extract information
* @param environment Environment map to update
*/
public static void updateEnvironmentForMRAM(Configuration conf,
Map<String, String> environment) {
MRApps.setEnvFromInputString(environment,
conf.get(MRJobConfig.MR_AM_ADMIN_USER_ENV));
MRApps.setEnvFromInputString(environment,
conf.get(MRJobConfig.MR_AM_ENV));
}
/**
* Extract the Java opts for the AM from the MR-based configuration
* @param conf Configuration from which to extract information
* @return Java opts for the AM
*/
public static String getMRAMJavaOpts(Configuration conf) {
// Admin opts
String mrAppMasterAdminOptions = conf.get(
MRJobConfig.MR_AM_ADMIN_COMMAND_OPTS,
MRJobConfig.DEFAULT_MR_AM_ADMIN_COMMAND_OPTS);
// Add AM user command opts
String mrAppMasterUserOptions = conf.get(MRJobConfig.MR_AM_COMMAND_OPTS,
MRJobConfig.DEFAULT_MR_AM_COMMAND_OPTS);
return mrAppMasterAdminOptions.trim()
+ " " + mrAppMasterUserOptions.trim();
}
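/*
* Illustrative usage sketch (not part of this class): assembling the AM's
* environment and Java opts from MR configuration, typically when building the
* application submission context.
*
*   Map<String, String> amEnv = new java.util.HashMap<String, String>();
*   MRHelpers.updateEnvironmentForMRAM(conf, amEnv);
*   String amOpts = MRHelpers.getMRAMJavaOpts(conf);
*/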
}