blob: 74d25c5a40ea75742912951f179959ddda307ab3 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.pig.backend.hadoop.executionengine.tez.util;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.JobSubmissionFiles;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LocalResourceType;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRConfiguration;
import org.apache.pig.classification.InterfaceAudience;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.Vertex;
import org.apache.tez.dag.library.vertexmanager.ShuffleVertexManager;
import org.apache.tez.mapreduce.hadoop.DeprecatedKeys;
import org.apache.tez.mapreduce.hadoop.InputSplitInfo;
import org.apache.tez.mapreduce.hadoop.InputSplitInfoDisk;
import org.apache.tez.mapreduce.hadoop.InputSplitInfoMem;
import org.apache.tez.mapreduce.protos.MRRuntimeProtos.MRSplitsProto;
@InterfaceAudience.Private
public class MRToTezHelper {
private static final Log LOG = LogFactory.getLog(MRToTezHelper.class);
private static final String JOB_SPLIT_RESOURCE_NAME = MRJobConfig.JOB_SPLIT;
private static final String JOB_SPLIT_METAINFO_RESOURCE_NAME = MRJobConfig.JOB_SPLIT_METAINFO;
private static Map<String, String> mrAMParamToTezAMParamMap = new HashMap<String, String>();
private static Map<String, String> mrMapParamToTezVertexParamMap = new HashMap<String, String>();
private static Map<String, String> mrReduceParamToTezVertexParamMap = new HashMap<String, String>();
private static List<String> mrSettingsToRetain = new ArrayList<String>();
private static List<String> mrSettingsToRemove = new ArrayList<String>();
private MRToTezHelper() {
}
static {
populateMRToTezParamsMap();
populateMRSettingsToRetain();
populateMRSettingsToRemove();
}
private static void populateMRToTezParamsMap() {
//AM settings
mrAMParamToTezAMParamMap.put(MRJobConfig.MR_AM_VMEM_MB, TezConfiguration.TEZ_AM_RESOURCE_MEMORY_MB);
mrAMParamToTezAMParamMap.put(MRJobConfig.MR_AM_CPU_VCORES, TezConfiguration.TEZ_AM_RESOURCE_CPU_VCORES);
mrAMParamToTezAMParamMap.put(MRJobConfig.MR_AM_MAX_ATTEMPTS, TezConfiguration.TEZ_AM_MAX_APP_ATTEMPTS);
mrAMParamToTezAMParamMap.put(MRConfiguration.JOB_CREDENTIALS_BINARY, TezConfiguration.TEZ_CREDENTIALS_PATH);
mrAMParamToTezAMParamMap.put(MRJobConfig.JOB_CANCEL_DELEGATION_TOKEN, TezConfiguration.TEZ_CANCEL_DELEGATION_TOKENS_ON_COMPLETION);
// Map settings for Scope.VERTEX
mrMapParamToTezVertexParamMap.put(MRJobConfig.MAP_MAX_ATTEMPTS, TezConfiguration.TEZ_AM_TASK_MAX_FAILED_ATTEMPTS);
mrMapParamToTezVertexParamMap.put(MRJobConfig.MAP_SPECULATIVE, TezConfiguration.TEZ_AM_SPECULATION_ENABLED);
mrMapParamToTezVertexParamMap.put(MRJobConfig.MAP_LOG_LEVEL, TezConfiguration.TEZ_TASK_LOG_LEVEL);
// TezConfiguration.TEZ_AM_VERTEX_MAX_TASK_CONCURRENCY TEZ-2914 in Tez 0.8
mrMapParamToTezVertexParamMap.put("mapreduce.job.running.map.limit", "tez.am.vertex.max-task-concurrency");
// TezConfiguration.TEZ_VERTEX_FAILURES_MAXPERCENT TEZ-3271 in Tez 0.8.4
mrMapParamToTezVertexParamMap.put(MRJobConfig.MAP_FAILURES_MAX_PERCENT, "tez.vertex.failures.maxpercent");
// Reduce settings for Scope.VERTEX
mrReduceParamToTezVertexParamMap.put(MRJobConfig.REDUCE_MAX_ATTEMPTS, TezConfiguration.TEZ_AM_TASK_MAX_FAILED_ATTEMPTS);
mrReduceParamToTezVertexParamMap.put(MRJobConfig.REDUCE_SPECULATIVE, TezConfiguration.TEZ_AM_SPECULATION_ENABLED);
mrReduceParamToTezVertexParamMap.put(MRJobConfig.REDUCE_LOG_LEVEL, TezConfiguration.TEZ_TASK_LOG_LEVEL);
mrReduceParamToTezVertexParamMap.put("mapreduce.job.running.reduce.limit", "tez.am.vertex.max-task-concurrency");
mrReduceParamToTezVertexParamMap.put(MRJobConfig.REDUCE_FAILURES_MAXPERCENT, "tez.vertex.failures.maxpercent");
try {
Class.forName("org.apache.tez.common.ProgressHelper");
// TezConfiguration.TEZ_TASK_PROGRESS_STUCK_INTERVAL_MS TEZ-808 in Tez 0.8
mrMapParamToTezVertexParamMap.put(MRJobConfig.TASK_TIMEOUT, "tez.task.progress.stuck.interval-ms");
mrReduceParamToTezVertexParamMap.put(MRJobConfig.TASK_TIMEOUT, "tez.task.progress.stuck.interval-ms");
}
catch (ClassNotFoundException e) {
// Not translating before Tez 0.8.5 due to TEZ-3549
}
}
private static void populateMRSettingsToRetain() {
// FileInputFormat
mrSettingsToRetain.add(FileInputFormat.INPUT_DIR);
mrSettingsToRetain.add(FileInputFormat.SPLIT_MAXSIZE);
mrSettingsToRetain.add(FileInputFormat.SPLIT_MINSIZE);
mrSettingsToRetain.add(FileInputFormat.PATHFILTER_CLASS);
mrSettingsToRetain.add(FileInputFormat.NUM_INPUT_FILES);
mrSettingsToRetain.add(FileInputFormat.INPUT_DIR_RECURSIVE);
// FileOutputFormat
mrSettingsToRetain.add(MRConfiguration.OUTPUT_BASENAME);
mrSettingsToRetain.add(FileOutputFormat.COMPRESS);
mrSettingsToRetain.add(FileOutputFormat.COMPRESS_CODEC);
mrSettingsToRetain.add(FileOutputFormat.COMPRESS_TYPE);
mrSettingsToRetain.add(FileOutputFormat.OUTDIR);
mrSettingsToRetain.add(FileOutputCommitter.SUCCESSFUL_JOB_OUTPUT_DIR_MARKER);
}
private static void populateMRSettingsToRemove() {
// FileInputFormat.listStatus() on a task can cause job failure when run from Oozie
mrSettingsToRemove.add(MRJobConfig.MAPREDUCE_JOB_CREDENTIALS_BINARY);
mrSettingsToRemove.add(MRJobConfig.CACHE_ARCHIVES);
mrSettingsToRemove.add(MRJobConfig.CACHE_ARCHIVES_SIZES);
mrSettingsToRemove.add(MRJobConfig.CACHE_ARCHIVES_TIMESTAMPS);
mrSettingsToRemove.add(MRJobConfig.CACHE_ARCHIVES_VISIBILITIES);
mrSettingsToRemove.add(MRJobConfig.CACHE_FILES);
mrSettingsToRemove.add(MRJobConfig.CACHE_FILES_SIZES);
mrSettingsToRemove.add(MRJobConfig.CACHE_FILE_TIMESTAMPS);
mrSettingsToRemove.add(MRJobConfig.CACHE_FILE_VISIBILITIES);
mrSettingsToRemove.add(MRJobConfig.CLASSPATH_FILES);
}
private static void removeUnwantedSettings(Configuration tezConf, boolean isAMConf) {
// It is good to clean up as much of the unapplicable settings as possible.
// Tez has configs set on multiple places AM, DAG, Vertex, VertexManager
// Plugin, Tasks (Processor, Edge, every input and output, combiner)
// If conf size is bigger, it places heavy pressurce on AM memory and is
// inefficient while sending over RPC to tasks
for (String mrSetting : mrSettingsToRemove) {
tezConf.unset(mrSetting);
}
Iterator<Entry<String, String>> iter = new Configuration(tezConf).iterator();
while (iter.hasNext()) {
String key = iter.next().getKey();
if (!isAMConf) {
// Keep the setting in AM conf to be able to connect back to the
// Oozie launcher job and look at the parameter values passed,
// but get rid of for others
if (key.startsWith("oozie.")) {
tezConf.unset(key);
continue;
}
}
if (key.startsWith("yarn.nodemanager")) {
tezConf.unset(key);
} else if (key.startsWith("mapreduce.jobhistory")) {
tezConf.unset(key);
} else if (key.startsWith("mapreduce.jobtracker")) {
tezConf.unset(key);
} else if (key.startsWith("mapreduce.tasktracker")) {
tezConf.unset(key);
}
}
}
public static void translateMRSettingsForTezAM(TezConfiguration dagAMConf) {
convertMRToTezConf(dagAMConf, dagAMConf, DeprecatedKeys.getMRToDAGParamMap());
convertMRToTezConf(dagAMConf, dagAMConf, mrAMParamToTezAMParamMap);
String env = dagAMConf.get(MRJobConfig.MR_AM_ADMIN_USER_ENV);
if (dagAMConf.get(MRJobConfig.MR_AM_ENV) != null) {
env = (env == null) ? dagAMConf.get(MRJobConfig.MR_AM_ENV)
: env + "," + dagAMConf.get(MRJobConfig.MR_AM_ENV);
}
if (env != null) {
dagAMConf.setIfUnset(TezConfiguration.TEZ_AM_LAUNCH_ENV, env);
}
dagAMConf.setIfUnset(TezConfiguration.TEZ_AM_LAUNCH_CMD_OPTS,
org.apache.tez.mapreduce.hadoop.MRHelpers
.getJavaOptsForMRAM(dagAMConf));
String queueName = dagAMConf.get(JobContext.QUEUE_NAME,
YarnConfiguration.DEFAULT_QUEUE_NAME);
dagAMConf.setIfUnset(TezConfiguration.TEZ_QUEUE_NAME, queueName);
dagAMConf.setIfUnset(TezConfiguration.TEZ_AM_VIEW_ACLS,
dagAMConf.get(MRJobConfig.JOB_ACL_VIEW_JOB, MRJobConfig.DEFAULT_JOB_ACL_VIEW_JOB));
dagAMConf.setIfUnset(TezConfiguration.TEZ_AM_MODIFY_ACLS,
dagAMConf.get(MRJobConfig.JOB_ACL_MODIFY_JOB, MRJobConfig.DEFAULT_JOB_ACL_MODIFY_JOB));
// Hardcoding at AM level instead of setting per vertex till TEZ-2710 is available
dagAMConf.setIfUnset(TezConfiguration.TEZ_TASK_SCALE_MEMORY_RESERVE_FRACTION, "0.5");
removeUnwantedSettings(dagAMConf, true);
}
/**
* Set config with Scope.Vertex in TezConfiguration on the vertex
*
* @param vertex Vertex on which config is to be set
* @param isMapVertex Whether map or reduce vertex. i.e root or intermediate/leaf vertex
* @param conf Config that contains the tez or equivalent mapreduce settings.
*/
public static void setVertexConfig(Vertex vertex, boolean isMapVertex,
Configuration conf) {
Map<String, String> configMapping = isMapVertex ? mrMapParamToTezVertexParamMap
: mrReduceParamToTezVertexParamMap;
for (Entry<String, String> dep : configMapping.entrySet()) {
String value = conf.get(dep.getValue(), conf.get(dep.getKey()));
if (value != null) {
vertex.setConf(dep.getValue(), value);
LOG.debug("Setting " + dep.getValue() + " to " + value
+ " for the vertex " + vertex.getName());
}
}
}
/**
* Process the mapreduce configuration settings and
* - copy as is the still required ones (like those used by FileInputFormat/FileOutputFormat)
* - convert and set equivalent tez runtime settings
* - handle compression related settings
*
* @param tezConf Configuration on which the mapreduce settings will have to be transferred
* @param mrConf Configuration that contains mapreduce settings
*/
public static void processMRSettings(Configuration tezConf, Configuration mrConf) {
for (String mrSetting : mrSettingsToRetain) {
if (mrConf.get(mrSetting) != null) {
tezConf.set(mrSetting, mrConf.get(mrSetting));
}
}
JobControlCompiler.configureCompression(tezConf);
convertMRToTezConf(tezConf, mrConf, DeprecatedKeys.getMRToTezRuntimeParamMap());
removeUnwantedSettings(tezConf, false);
// ShuffleVertexManager Plugin settings
// DeprecatedKeys.getMRToTezRuntimeParamMap() only translates min and not max
String slowStartFraction = mrConf.get(MRJobConfig.COMPLETED_MAPS_FOR_REDUCE_SLOWSTART);
if (slowStartFraction != null) {
tezConf.setIfUnset(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_MIN_SRC_FRACTION, slowStartFraction);
tezConf.setIfUnset(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_MAX_SRC_FRACTION, slowStartFraction);
}
}
/**
* Convert MR settings to Tez settings and set on conf.
*
* @param tezConf Configuration on which MR equivalent Tez settings should be set
* @param mrConf Configuration that contains MR settings
* @param mrToTezConfigMapping Mapping of MR config to equivalent Tez config
*/
private static void convertMRToTezConf(Configuration tezConf, Configuration mrConf, Map<String, String> mrToTezConfigMapping) {
for (Entry<String, String> dep : mrToTezConfigMapping.entrySet()) {
if (mrConf.get(dep.getKey()) != null) {
if (tezConf.get(dep.getValue()) == null) {
if (LOG.isDebugEnabled()) {
LOG.debug("Setting " + dep.getValue() + " to "
+ mrConf.get(dep.getKey()) + " from MR setting "
+ dep.getKey());
}
tezConf.set(dep.getValue(), mrConf.get(dep.getKey()));
}
tezConf.unset(dep.getKey());
}
}
}
/**
* Write input splits (job.split and job.splitmetainfo) to disk. It uses already
* serialized splits from given MRSplitsProto
* @param infoMem
* @param inputSplitsDir
* @param jobConf
* @param fs
* @param splitsProto MRSplitsProto containing already serialized splits
* @return
* @throws IOException
* @throws InterruptedException
*/
public static InputSplitInfoDisk writeInputSplitInfoToDisk(
InputSplitInfoMem infoMem, Path inputSplitsDir, JobConf jobConf,
FileSystem fs, MRSplitsProto splitsProto) throws IOException, InterruptedException {
InputSplit[] splits = infoMem.getNewFormatSplits();
TezJobSplitWriter.createSplitFiles(inputSplitsDir, jobConf, fs, splits, splitsProto);
return new InputSplitInfoDisk(
JobSubmissionFiles.getJobSplitFile(inputSplitsDir),
JobSubmissionFiles.getJobSplitMetaFile(inputSplitsDir),
splits.length, infoMem.getTaskLocationHints(),
jobConf.getCredentials());
}
/**
* Exact copy of private method from from org.apache.tez.mapreduce.hadoop.MRInputHelpers
*
* Update provided localResources collection with the required local
* resources needed by MapReduce tasks with respect to Input splits.
*
* @param fs Filesystem instance to access status of splits related files
* @param inputSplitInfo Information on location of split files
* @param localResources LocalResources collection to be updated
* @throws IOException
*/
public static void updateLocalResourcesForInputSplits(
FileSystem fs,
InputSplitInfo inputSplitInfo,
Map<String, LocalResource> localResources) throws IOException {
if (localResources.containsKey(JOB_SPLIT_RESOURCE_NAME)) {
throw new RuntimeException("LocalResources already contains a"
+ " resource named " + JOB_SPLIT_RESOURCE_NAME);
}
if (localResources.containsKey(JOB_SPLIT_METAINFO_RESOURCE_NAME)) {
throw new RuntimeException("LocalResources already contains a"
+ " resource named " + JOB_SPLIT_METAINFO_RESOURCE_NAME);
}
FileStatus splitFileStatus =
fs.getFileStatus(inputSplitInfo.getSplitsFile());
FileStatus metaInfoFileStatus =
fs.getFileStatus(inputSplitInfo.getSplitsMetaInfoFile());
localResources.put(JOB_SPLIT_RESOURCE_NAME,
LocalResource.newInstance(
ConverterUtils.getYarnUrlFromPath(inputSplitInfo.getSplitsFile()),
LocalResourceType.FILE,
LocalResourceVisibility.APPLICATION,
splitFileStatus.getLen(), splitFileStatus.getModificationTime()));
localResources.put(JOB_SPLIT_METAINFO_RESOURCE_NAME,
LocalResource.newInstance(
ConverterUtils.getYarnUrlFromPath(
inputSplitInfo.getSplitsMetaInfoFile()),
LocalResourceType.FILE,
LocalResourceVisibility.APPLICATION,
metaInfoFileStatus.getLen(),
metaInfoFileStatus.getModificationTime()));
}
}