/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.tez.mapreduce.hadoop;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import com.google.common.base.Function;
import com.google.common.base.Strings;
import org.apache.tez.common.Preconditions;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.protobuf.ByteString;
import org.apache.tez.runtime.api.InputContext;
import org.apache.tez.runtime.api.events.InputDataInformationEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.serializer.Deserializer;
import org.apache.hadoop.io.serializer.SerializationFactory;
import org.apache.hadoop.io.serializer.Serializer;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.JobSubmissionFiles;
import org.apache.hadoop.mapreduce.split.JobSplitWriter;
import org.apache.hadoop.mapreduce.split.TezGroupedSplit;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LocalResourceType;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.tez.common.TezUtils;
import org.apache.tez.common.io.NonSyncDataOutputStream;
import org.apache.tez.dag.api.DataSourceDescriptor;
import org.apache.tez.dag.api.InputDescriptor;
import org.apache.tez.dag.api.TaskLocationHint;
import org.apache.tez.dag.api.TezUncheckedException;
import org.apache.tez.dag.api.UserPayload;
import org.apache.tez.dag.api.VertexLocationHint;
import org.apache.tez.mapreduce.input.MRInput;
import org.apache.tez.mapreduce.input.MRInputLegacy;
import org.apache.tez.mapreduce.protos.MRRuntimeProtos;
import org.apache.tez.mapreduce.protos.MRRuntimeProtos.MRSplitProto;
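/**
* Helper methods for configuring {@link org.apache.tez.mapreduce.input.MRInput}: generating
* MapReduce input splits (in memory or on disk), serializing splits to and from their
* protobuf representations, and reading MRInput-related context from a Configuration.
*/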
@Public
@Unstable
public class MRInputHelpers {
private static final Logger LOG = LoggerFactory.getLogger(MRInputHelpers.class);
private static final int SPLIT_SERIALIZED_LENGTH_ESTIMATE = 40;
static final String JOB_SPLIT_RESOURCE_NAME = "job.split";
static final String JOB_SPLIT_METAINFO_RESOURCE_NAME = "job.splitmetainfo";
protected MRInputHelpers() {}
/**
* Set up split generation on the client, with the splits distributed to tasks via the
* traditional MapReduce mechanism of the Distributed Cache.
* <p/>
* Usage of this technique for handling splits is not advised. Instead, splits should be either
* generated in the AM, or generated in the client and distributed via the AM. See {@link
* org.apache.tez.mapreduce.input.MRInput.MRInputConfigBuilder}
* <p/>
* Note: Attempting to use this method to add multiple Inputs to a Vertex is not supported.
*
* This mechanism of propagating splits may be removed in a subsequent release.
*
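* A minimal usage sketch (the splits path and vertex wiring are illustrative):
* <pre>{@code
* Configuration conf = new Configuration(); // assumed fully configured for the InputFormat
* DataSourceDescriptor dsd = MRInputHelpers.configureMRInputWithLegacySplitGeneration(
*     conf, new Path("/tmp/app-splits"), false);
* vertex.addDataSource("MRInput", dsd);
* }</pre>
*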
* @param conf configuration to be used by {@link org.apache.tez.mapreduce.input.MRInput}.
* This is expected to be fully configured.
* @param splitsDir the path to which splits will be generated.
* @param useLegacyInput whether to use {@link org.apache.tez.mapreduce.input.MRInputLegacy} or
* {@link org.apache.tez.mapreduce.input.MRInput}
* @return an instance of {@link org.apache.tez.dag.api.DataSourceDescriptor} which can be added
* as a data source to a {@link org.apache.tez.dag.api.Vertex}
*/
@InterfaceStability.Unstable
@InterfaceAudience.LimitedPrivate({"hive", "pig"})
public static DataSourceDescriptor configureMRInputWithLegacySplitGeneration(Configuration conf,
Path splitsDir,
boolean useLegacyInput) {
InputSplitInfo inputSplitInfo;
try {
inputSplitInfo = generateInputSplits(conf, splitsDir);
InputDescriptor inputDescriptor = InputDescriptor.create(useLegacyInput ? MRInputLegacy.class
.getName() : MRInput.class.getName())
.setUserPayload(createMRInputPayload(conf, null, false, true));
Map<String, LocalResource> additionalLocalResources = new HashMap<String, LocalResource>();
updateLocalResourcesForInputSplits(conf, inputSplitInfo,
additionalLocalResources);
return DataSourceDescriptor.create(inputDescriptor, null, inputSplitInfo.getNumTasks(),
inputSplitInfo.getCredentials(),
VertexLocationHint.create(inputSplitInfo.getTaskLocationHints()),
additionalLocalResources);
} catch (IOException | InterruptedException | ClassNotFoundException e) {
throw new TezUncheckedException("Failed to generate InputSplits", e);
}
}
/**
* Parse the user payload used by {@link org.apache.tez.mapreduce.input.MRInput}.
*
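* For example, an initializer holding the payload of an MRInput descriptor might recover the
* embedded configuration (a sketch; {@code userPayload} is assumed to come from such a
* descriptor):
* <pre>{@code
* MRRuntimeProtos.MRInputUserPayloadProto proto =
*     MRInputHelpers.parseMRInputPayload(userPayload);
* Configuration conf = TezUtils.createConfFromByteString(proto.getConfigurationBytes());
* }</pre>
*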
* @param payload the {@link org.apache.tez.dag.api.UserPayload} instance
* @return an instance of {@link org.apache.tez.mapreduce.protos.MRRuntimeProtos.MRInputUserPayloadProto},
* which provides access to the underlying configuration bytes
*/
@InterfaceStability.Evolving
@InterfaceAudience.LimitedPrivate({"hive", "pig"})
public static MRRuntimeProtos.MRInputUserPayloadProto parseMRInputPayload(UserPayload payload)
throws IOException {
return MRRuntimeProtos.MRInputUserPayloadProto.parseFrom(ByteString.copyFrom(payload.getPayload()));
}
/**
* Create an instance of {@link org.apache.hadoop.mapred.InputSplit} from the {@link
* org.apache.tez.mapreduce.input.MRInput} representation of a split.
*
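* A round-trip sketch ({@code conf} and {@code oldSplit} are assumed to exist):
* <pre>{@code
* MRRuntimeProtos.MRSplitProto proto = MRInputHelpers.createSplitProto(oldSplit);
* SerializationFactory factory = new SerializationFactory(conf);
* InputSplit restored = MRInputHelpers.createOldFormatSplitFromUserPayload(proto, factory);
* }</pre>
*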
* @param splitProto The {@link org.apache.tez.mapreduce.protos.MRRuntimeProtos.MRSplitProto}
* instance representing the split
* @param serializationFactory the serialization mechanism used to write out the split
* @return an instance of the split
*/
@SuppressWarnings("unchecked")
@InterfaceStability.Evolving
@InterfaceAudience.LimitedPrivate({"hive", "pig"})
public static InputSplit createOldFormatSplitFromUserPayload(
MRRuntimeProtos.MRSplitProto splitProto, SerializationFactory serializationFactory)
throws IOException {
// This may not need to use serialization factory, since OldFormat
// always uses Writable to write splits.
Objects.requireNonNull(splitProto, "splitProto cannot be null");
String className = splitProto.getSplitClassName();
Class<InputSplit> clazz;
try {
clazz = (Class<InputSplit>) Class.forName(className);
} catch (ClassNotFoundException e) {
throw new IOException("Failed to load InputSplit class: [" + className + "]", e);
}
Deserializer<InputSplit> deserializer = serializationFactory
.getDeserializer(clazz);
deserializer.open(splitProto.getSplitBytes().newInput());
InputSplit inputSplit = deserializer.deserialize(null);
deserializer.close();
return inputSplit;
}
/**
* Create an instance of {@link org.apache.hadoop.mapreduce.InputSplit} from the {@link
* org.apache.tez.mapreduce.input.MRInput} representation of a split.
*
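* A round-trip sketch ({@code conf} and {@code newSplit} are assumed to exist):
* <pre>{@code
* SerializationFactory factory = new SerializationFactory(conf);
* MRRuntimeProtos.MRSplitProto proto = MRInputHelpers.createSplitProto(newSplit, factory);
* org.apache.hadoop.mapreduce.InputSplit restored =
*     MRInputHelpers.createNewFormatSplitFromUserPayload(proto, factory);
* }</pre>
*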
* @param splitProto The {@link org.apache.tez.mapreduce.protos.MRRuntimeProtos.MRSplitProto}
* instance representing the split
* @param serializationFactory the serialization mechanism used to write out the split
* @return an instance of the split
*/
@InterfaceStability.Evolving
@SuppressWarnings("unchecked")
public static org.apache.hadoop.mapreduce.InputSplit createNewFormatSplitFromUserPayload(
MRRuntimeProtos.MRSplitProto splitProto, SerializationFactory serializationFactory)
throws IOException {
Objects.requireNonNull(splitProto, "splitProto must be specified");
String className = splitProto.getSplitClassName();
Class<org.apache.hadoop.mapreduce.InputSplit> clazz;
try {
clazz = (Class<org.apache.hadoop.mapreduce.InputSplit>) Class
.forName(className);
} catch (ClassNotFoundException e) {
throw new IOException("Failed to load InputSplit class: [" + className + "]", e);
}
Deserializer<org.apache.hadoop.mapreduce.InputSplit> deserializer = serializationFactory
.getDeserializer(clazz);
deserializer.open(splitProto.getSplitBytes().newInput());
org.apache.hadoop.mapreduce.InputSplit inputSplit = deserializer
.deserialize(null);
deserializer.close();
return inputSplit;
}
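/**
* Serialize a new-api {@link org.apache.hadoop.mapreduce.InputSplit} into an
* {@link org.apache.tez.mapreduce.protos.MRRuntimeProtos.MRSplitProto}, recording the split
* class name alongside the serialized bytes so the split can be reconstructed later.
*
* @param newSplit the split to serialize
* @param serializationFactory the serialization mechanism used to write out the split
* @return the proto representation of the split
*/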
@InterfaceStability.Evolving
public static <T extends org.apache.hadoop.mapreduce.InputSplit> MRRuntimeProtos.MRSplitProto createSplitProto(
T newSplit, SerializationFactory serializationFactory)
throws IOException {
MRRuntimeProtos.MRSplitProto.Builder builder = MRRuntimeProtos.MRSplitProto
.newBuilder();
builder.setSplitClassName(newSplit.getClass().getName());
@SuppressWarnings("unchecked")
Serializer<T> serializer = serializationFactory
.getSerializer((Class<T>) newSplit.getClass());
ByteString.Output out = ByteString
.newOutput(SPLIT_SERIALIZED_LENGTH_ESTIMATE);
serializer.open(out);
serializer.serialize(newSplit);
// Close the serializer to flush any buffered bytes before snapshotting the output.
serializer.close();
// TODO MR Compat: Check against max block locations per split.
ByteString splitBs = out.toByteString();
builder.setSplitBytes(splitBs);
return builder.build();
}
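/**
* Serialize an old-api {@link org.apache.hadoop.mapred.InputSplit} into an
* {@link org.apache.tez.mapreduce.protos.MRRuntimeProtos.MRSplitProto}. Old-format splits are
* always Writable, so they are written directly without a serialization factory.
*
* @param oldSplit the split to serialize
* @return the proto representation of the split
*/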
@InterfaceStability.Evolving
@InterfaceAudience.LimitedPrivate({"hive", "pig"})
public static MRRuntimeProtos.MRSplitProto createSplitProto(
org.apache.hadoop.mapred.InputSplit oldSplit) throws IOException {
MRRuntimeProtos.MRSplitProto.Builder builder = MRRuntimeProtos.MRSplitProto.newBuilder();
builder.setSplitClassName(oldSplit.getClass().getName());
ByteString.Output os = ByteString
.newOutput(SPLIT_SERIALIZED_LENGTH_ESTIMATE);
oldSplit.write(new NonSyncDataOutputStream(os));
ByteString splitBs = os.toByteString();
builder.setSplitBytes(splitBs);
return builder.build();
}
/**
* Generates Input splits and stores them in a {@link org.apache.tez.mapreduce.protos.MRRuntimeProtos.MRSplitsProto} instance.
*
* Returns an instance of {@link InputSplitInfoMem}
*
* With grouping enabled, the eventual configuration used by the tasks will have
* the user-specified InputFormat replaced by either {@link org.apache.hadoop.mapred.split.TezGroupedSplitsInputFormat}
* or {@link org.apache.hadoop.mapreduce.split.TezGroupedSplitsInputFormat}
*
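* A minimal sketch of in-memory split generation with grouping ({@code conf} is assumed to
* be configured, and the target task count is illustrative):
* <pre>{@code
* InputSplitInfoMem splitInfo = MRInputHelpers.generateInputSplitsToMem(conf, true, 10);
* int parallelism = splitInfo.getNumTasks();
* }</pre>
*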
* @param conf
* an instance of Configuration which is used to determine whether
* the mapred or mapreduce API is being used. This Configuration
* instance should also contain adequate information to be able to
* generate splits - like the InputFormat being used and related
* configuration.
* @param groupSplits whether to group the splits or not
* @param targetTasks the number of target tasks if grouping is enabled. Specify as 0 otherwise.
* @return an instance of {@link InputSplitInfoMem} which supports a subset of
* the APIs defined on {@link InputSplitInfo}
*/
@InterfaceStability.Unstable
@InterfaceAudience.LimitedPrivate({"hive", "pig"})
public static InputSplitInfoMem generateInputSplitsToMem(Configuration conf,
boolean groupSplits, int targetTasks)
throws IOException, ClassNotFoundException, InterruptedException {
return generateInputSplitsToMem(conf, groupSplits, true, targetTasks);
}
/**
* Generates Input splits and stores them in a {@link org.apache.tez.mapreduce.protos.MRRuntimeProtos.MRSplitsProto} instance.
*
* Returns an instance of {@link InputSplitInfoMem}
*
* With grouping enabled, the eventual configuration used by the tasks will have
* the user-specified InputFormat replaced by either {@link org.apache.hadoop.mapred.split.TezGroupedSplitsInputFormat}
* or {@link org.apache.hadoop.mapreduce.split.TezGroupedSplitsInputFormat}
*
* @param conf
* an instance of Configuration which is used to determine whether
* the mapred or mapreduce API is being used. This Configuration
* instance should also contain adequate information to be able to
* generate splits - like the InputFormat being used and related
* configuration.
* @param groupSplits whether to group the splits or not
* @param sortSplits whether to sort the splits or not
* @param targetTasks the number of target tasks if grouping is enabled. Specify as 0 otherwise.
* @return an instance of {@link InputSplitInfoMem} which supports a subset of
* the APIs defined on {@link InputSplitInfo}
*/
@InterfaceStability.Unstable
public static InputSplitInfoMem generateInputSplitsToMem(Configuration conf,
boolean groupSplits, boolean sortSplits, int targetTasks)
throws IOException, ClassNotFoundException, InterruptedException {
InputSplitInfoMem splitInfoMem;
JobConf jobConf = new JobConf(conf);
if (jobConf.getUseNewMapper()) {
LOG.debug("Generating mapreduce api input splits");
Job job = Job.getInstance(conf);
org.apache.hadoop.mapreduce.InputSplit[] splits =
generateNewSplits(job, groupSplits, sortSplits, targetTasks);
splitInfoMem = new InputSplitInfoMem(splits, createTaskLocationHintsFromSplits(splits),
splits.length, job.getCredentials(), job.getConfiguration());
} else {
LOG.debug("Generating mapred api input splits");
org.apache.hadoop.mapred.InputSplit[] splits =
generateOldSplits(jobConf, groupSplits, sortSplits, targetTasks);
splitInfoMem = new InputSplitInfoMem(splits, createTaskLocationHintsFromSplits(splits),
splits.length, jobConf.getCredentials(), jobConf);
}
LOG.info("NumSplits: " + splitInfoMem.getNumTasks() + ", SerializedSize: "
+ splitInfoMem.getSplitsProto().getSerializedSize());
return splitInfoMem;
}
private static List<TaskLocationHint> createTaskLocationHintsFromSplits(
org.apache.hadoop.mapreduce.InputSplit[] newFormatSplits) {
Iterable<TaskLocationHint> iterable = Iterables
.transform(Arrays.asList(newFormatSplits),
new Function<org.apache.hadoop.mapreduce.InputSplit, TaskLocationHint>() {
@Override
public TaskLocationHint apply(
org.apache.hadoop.mapreduce.InputSplit input) {
try {
if (input instanceof TezGroupedSplit) {
String rack =
((org.apache.hadoop.mapreduce.split.TezGroupedSplit) input).getRack();
if (rack == null) {
if (input.getLocations() != null) {
return TaskLocationHint.createTaskLocationHint(
new HashSet<>(Arrays.asList(input.getLocations())), null);
} else {
return TaskLocationHint.createTaskLocationHint(null, null);
}
} else {
return TaskLocationHint.createTaskLocationHint(null,
Collections.singleton(rack));
}
} else {
return TaskLocationHint.createTaskLocationHint(
new HashSet<>(Arrays.asList(input.getLocations())), null);
}
} catch (IOException | InterruptedException e) {
  throw new RuntimeException(e);
}
}
});
return Lists.newArrayList(iterable);
}
private static List<TaskLocationHint> createTaskLocationHintsFromSplits(
org.apache.hadoop.mapred.InputSplit[] oldFormatSplits) {
Iterable<TaskLocationHint> iterable = Iterables.transform(Arrays.asList(oldFormatSplits),
new Function<org.apache.hadoop.mapred.InputSplit, TaskLocationHint>() {
@Override
public TaskLocationHint apply(org.apache.hadoop.mapred.InputSplit input) {
try {
if (input instanceof org.apache.hadoop.mapred.split.TezGroupedSplit) {
String rack = ((org.apache.hadoop.mapred.split.TezGroupedSplit) input).getRack();
if (rack == null) {
if (input.getLocations() != null) {
return TaskLocationHint.createTaskLocationHint(
    new HashSet<>(Arrays.asList(input.getLocations())), null);
} else {
return TaskLocationHint.createTaskLocationHint(null, null);
}
} else {
return TaskLocationHint.createTaskLocationHint(null, Collections.singleton(rack));
}
} else {
return TaskLocationHint.createTaskLocationHint(
new HashSet<>(Arrays.asList(input.getLocations())),
null);
}
} catch (IOException e) {
throw new RuntimeException(e);
}
}
});
return Lists.newArrayList(iterable);
}
@SuppressWarnings({ "rawtypes", "unchecked" })
private static org.apache.hadoop.mapreduce.InputSplit[] generateNewSplits(
JobContext jobContext, boolean groupSplits, boolean sortSplits,
int numTasks) throws IOException,
InterruptedException {
Configuration conf = jobContext.getConfiguration();
// This is the real input format.
org.apache.hadoop.mapreduce.InputFormat<?, ?> inputFormat;
try {
inputFormat = ReflectionUtils.newInstance(jobContext.getInputFormatClass(), conf);
} catch (ClassNotFoundException e) {
throw new TezUncheckedException(e);
}
org.apache.hadoop.mapreduce.InputFormat<?, ?> finalInputFormat;
// For grouping, the underlying InputFormat class is passed in as a parameter,
// while the JobContext has its InputFormat set up as TezGroupedSplitsInputFormat.
if (groupSplits) {
org.apache.hadoop.mapreduce.split.TezGroupedSplitsInputFormat groupedFormat =
new org.apache.hadoop.mapreduce.split.TezGroupedSplitsInputFormat();
groupedFormat.setConf(conf);
groupedFormat.setInputFormat(inputFormat);
groupedFormat.setDesiredNumberOfSplits(numTasks);
finalInputFormat = groupedFormat;
} else {
finalInputFormat = inputFormat;
}
List<org.apache.hadoop.mapreduce.InputSplit> array = finalInputFormat
.getSplits(jobContext);
org.apache.hadoop.mapreduce.InputSplit[] splits = array
.toArray(new org.apache.hadoop.mapreduce.InputSplit[array.size()]);
if (sortSplits) {
// sort the splits into order based on size, so that the biggest
// go first
Arrays.sort(splits, new InputSplitComparator());
} else {
Collections.shuffle(Arrays.asList(splits));
}
return splits;
}
@SuppressWarnings({ "rawtypes", "unchecked" })
private static org.apache.hadoop.mapred.InputSplit[] generateOldSplits(
JobConf jobConf, boolean groupSplits, boolean sortSplits, int numTasks)
throws IOException {
// This is the real InputFormat
org.apache.hadoop.mapred.InputFormat inputFormat;
try {
inputFormat = jobConf.getInputFormat();
} catch (Exception e) {
throw new TezUncheckedException(e);
}
org.apache.hadoop.mapred.InputFormat finalInputFormat;
if (groupSplits) {
org.apache.hadoop.mapred.split.TezGroupedSplitsInputFormat groupedFormat =
new org.apache.hadoop.mapred.split.TezGroupedSplitsInputFormat();
groupedFormat.setConf(jobConf);
groupedFormat.setInputFormat(inputFormat);
groupedFormat.setDesiredNumberOfSplits(numTasks);
finalInputFormat = groupedFormat;
} else {
finalInputFormat = inputFormat;
}
org.apache.hadoop.mapred.InputSplit[] splits = finalInputFormat
.getSplits(jobConf, jobConf.getNumMapTasks());
if (sortSplits) {
// sort the splits into order based on size, so that the biggest
// go first
Arrays.sort(splits, new OldInputSplitComparator());
}
return splits;
}
/**
* Comparator for org.apache.hadoop.mapreduce.InputSplit
*/
private static class InputSplitComparator
implements Comparator<org.apache.hadoop.mapreduce.InputSplit> {
@Override
public int compare(org.apache.hadoop.mapreduce.InputSplit o1,
org.apache.hadoop.mapreduce.InputSplit o2) {
try {
long len1 = o1.getLength();
long len2 = o2.getLength();
return Long.compare(len2, len1);
} catch (IOException | InterruptedException ie) {
throw new RuntimeException("exception in InputSplit compare", ie);
}
}
}
/**
* Comparator for org.apache.hadoop.mapred.InputSplit
*/
private static class OldInputSplitComparator
implements Comparator<org.apache.hadoop.mapred.InputSplit> {
@Override
public int compare(org.apache.hadoop.mapred.InputSplit o1,
org.apache.hadoop.mapred.InputSplit o2) {
try {
long len1 = o1.getLength();
long len2 = o2.getLength();
return Long.compare(len2, len1);
} catch (IOException ie) {
throw new RuntimeException("Problem getting input split size", ie);
}
}
}
/**
* Generate new-api mapreduce InputFormat splits
* @param jobContext JobContext required by InputFormat
* @param inputSplitDir Directory in which to generate splits information
*
* @return InputSplitInfo containing the split files' information and the
* location hints for each generated split, used to determine the parallelism of
* the map stage.
*/
private static InputSplitInfoDisk writeNewSplits(JobContext jobContext,
Path inputSplitDir) throws IOException, InterruptedException,
ClassNotFoundException {
org.apache.hadoop.mapreduce.InputSplit[] splits =
generateNewSplits(jobContext, false, true, 0);
Configuration conf = jobContext.getConfiguration();
JobSplitWriter.createSplitFiles(inputSplitDir, conf,
inputSplitDir.getFileSystem(conf), splits);
List<TaskLocationHint> locationHints =
new ArrayList<TaskLocationHint>(splits.length);
for (org.apache.hadoop.mapreduce.InputSplit split : splits) {
locationHints.add(
TaskLocationHint.createTaskLocationHint(new HashSet<String>(
Arrays.asList(split.getLocations())), null)
);
}
return new InputSplitInfoDisk(
JobSubmissionFiles.getJobSplitFile(inputSplitDir),
JobSubmissionFiles.getJobSplitMetaFile(inputSplitDir),
splits.length, locationHints, jobContext.getCredentials());
}
/**
* Generate old-api mapred InputFormat splits
* @param jobConf JobConf required by InputFormat class
* @param inputSplitDir Directory in which to generate splits information
*
* @return InputSplitInfo containing the split files' information and the
* number of splits generated, used to determine the parallelism of
* the map stage.
*/
private static InputSplitInfoDisk writeOldSplits(JobConf jobConf,
Path inputSplitDir) throws IOException {
org.apache.hadoop.mapred.InputSplit[] splits =
generateOldSplits(jobConf, false, true, 0);
JobSplitWriter.createSplitFiles(inputSplitDir, jobConf,
inputSplitDir.getFileSystem(jobConf), splits);
List<TaskLocationHint> locationHints =
new ArrayList<>(splits.length);
for (InputSplit split : splits) {
locationHints.add(
TaskLocationHint.createTaskLocationHint(new HashSet<>(
Arrays.asList(split.getLocations())), null)
);
}
return new InputSplitInfoDisk(
JobSubmissionFiles.getJobSplitFile(inputSplitDir),
JobSubmissionFiles.getJobSplitMetaFile(inputSplitDir),
splits.length, locationHints, jobConf.getCredentials());
}
/**
* Helper API to generate splits.
* @param conf Configuration with all necessary information set to generate
* splits. The following are required at a minimum:
*
* - mapred.mapper.new-api: determines whether mapred.InputFormat or
* mapreduce.InputFormat is to be used
* - mapred.input.format.class or mapreduce.job.inputformat.class:
* determines the InputFormat class to be used
*
* In addition to this, all the configs needed by the InputFormat class also
* have to be set. For example, FileInputFormat needs the input directory
* paths to be set in the config.
*
* @param inputSplitsDir Directory in which the splits file and meta info file
* will be generated. job.split and job.splitmetainfo files in this directory
* will be overwritten. Should be a fully-qualified path.
*
* @return InputSplitInfo containing the split files' information and the
* number of splits generated, used to determine the parallelism of
* the map stage.
*/
private static InputSplitInfoDisk generateInputSplits(Configuration conf,
Path inputSplitsDir) throws IOException, InterruptedException,
ClassNotFoundException {
Job job = Job.getInstance(conf);
JobConf jobConf = new JobConf(conf);
conf.setBoolean(MRJobConfig.MR_TEZ_SPLITS_VIA_EVENTS, false);
if (jobConf.getUseNewMapper()) {
LOG.info("Generating new input splits"
+ ", splitsDir=" + inputSplitsDir.toString());
return writeNewSplits(job, inputSplitsDir);
} else {
LOG.info("Generating old input splits"
+ ", splitsDir=" + inputSplitsDir.toString());
return writeOldSplits(jobConf, inputSplitsDir);
}
}
/**
* Update the provided localResources collection with the local resources
* required by MapReduce tasks to read the input splits.
*
* @param conf Configuration
* @param inputSplitInfo Information on location of split files
* @param localResources LocalResources collection to be updated
*/
private static void updateLocalResourcesForInputSplits(
Configuration conf,
InputSplitInfo inputSplitInfo,
Map<String, LocalResource> localResources) throws IOException {
if (localResources.containsKey(JOB_SPLIT_RESOURCE_NAME)) {
throw new RuntimeException("LocalResources already contains a"
+ " resource named " + JOB_SPLIT_RESOURCE_NAME);
}
if (localResources.containsKey(JOB_SPLIT_METAINFO_RESOURCE_NAME)) {
throw new RuntimeException("LocalResources already contains a"
+ " resource named " + JOB_SPLIT_METAINFO_RESOURCE_NAME);
}
FileSystem splitsFS = inputSplitInfo.getSplitsFile().getFileSystem(conf);
FileStatus splitFileStatus =
splitsFS.getFileStatus(inputSplitInfo.getSplitsFile());
FileStatus metaInfoFileStatus =
splitsFS.getFileStatus(inputSplitInfo.getSplitsMetaInfoFile());
localResources.put(JOB_SPLIT_RESOURCE_NAME,
LocalResource.newInstance(
ConverterUtils.getYarnUrlFromPath(inputSplitInfo.getSplitsFile()),
LocalResourceType.FILE,
LocalResourceVisibility.APPLICATION,
splitFileStatus.getLen(), splitFileStatus.getModificationTime()));
localResources.put(JOB_SPLIT_METAINFO_RESOURCE_NAME,
LocalResource.newInstance(
ConverterUtils.getYarnUrlFromPath(
inputSplitInfo.getSplitsMetaInfoFile()),
LocalResourceType.FILE,
LocalResourceVisibility.APPLICATION,
metaInfoFileStatus.getLen(),
metaInfoFileStatus.getModificationTime()));
}
/**
* Called to specify that grouping of input splits be performed by Tez. The conf should
* have the real input format class configured; with grouping enabled, it will be
* replaced at runtime by a Tez grouping input format that wraps it.
* <p/>
* With grouping enabled, the eventual configuration used by the tasks will have
* the user-specified InputFormat replaced by either {@link org.apache.hadoop.mapred.split.TezGroupedSplitsInputFormat}
* or {@link org.apache.hadoop.mapreduce.split.TezGroupedSplitsInputFormat}
*/
@InterfaceAudience.Private
protected static UserPayload createMRInputPayloadWithGrouping(Configuration conf) throws IOException {
Preconditions
.checkArgument(conf != null, "Configuration must be specified");
return createMRInputPayload(TezUtils.createByteStringFromConf(conf),
null, true, true);
}
@InterfaceAudience.Private
protected static UserPayload createMRInputPayload(Configuration conf,
MRRuntimeProtos.MRSplitsProto mrSplitsProto) throws
IOException {
return createMRInputPayload(conf, mrSplitsProto, false, true);
}
/**
* When isGrouped is true, it specifies that grouping of input splits be performed by Tez.
* The conf should have the real input format class configured; with grouping enabled, it
* will be replaced at runtime by a Tez grouping input format that wraps it.
* <p/>
* With grouping enabled, the eventual configuration used by the tasks will have
* the user-specified InputFormat replaced by either {@link org.apache.hadoop.mapred.split.TezGroupedSplitsInputFormat}
* or {@link org.apache.hadoop.mapreduce.split.TezGroupedSplitsInputFormat}
*/
@InterfaceAudience.Private
protected static UserPayload createMRInputPayload(Configuration conf,
MRRuntimeProtos.MRSplitsProto mrSplitsProto, boolean isGrouped,
boolean isSorted) throws
IOException {
Preconditions
.checkArgument(conf != null, "Configuration must be specified");
return createMRInputPayload(TezUtils.createByteStringFromConf(conf),
mrSplitsProto, isGrouped, isSorted);
}
private static UserPayload createMRInputPayload(ByteString bytes,
MRRuntimeProtos.MRSplitsProto mrSplitsProto,
boolean isGrouped, boolean isSorted) {
MRRuntimeProtos.MRInputUserPayloadProto.Builder userPayloadBuilder =
MRRuntimeProtos.MRInputUserPayloadProto
.newBuilder();
userPayloadBuilder.setConfigurationBytes(bytes);
if (mrSplitsProto != null) {
userPayloadBuilder.setSplits(mrSplitsProto);
}
userPayloadBuilder.setGroupingEnabled(isGrouped);
userPayloadBuilder.setSortSplitsEnabled(isSorted);
return UserPayload.create(userPayloadBuilder.build().
toByteString().asReadOnlyByteBuffer());
}
private static String getStringProperty(Configuration conf, String propertyName) {
Objects.requireNonNull(conf, "Configuration must be provided");
Objects.requireNonNull(propertyName, "Property name must be provided");
return conf.get(propertyName);
}
private static int getIntProperty(Configuration conf, String propertyName) {
return Integer.parseInt(getStringProperty(conf, propertyName));
}
/**
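* Returns the DAG index from a configuration seen by an MRInput-based task. For example, a
* task-side consumer might read its coordinates (a sketch; {@code conf} is assumed to be the
* task configuration):
* <pre>{@code
* int dagIndex = MRInputHelpers.getDagIndex(conf);
* int vertexIndex = MRInputHelpers.getVertexIndex(conf);
* int taskIndex = MRInputHelpers.getTaskIndex(conf);
* }</pre>
*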
* @see InputContext#getDagIdentifier()
* @param conf configuration instance
* @return dag index
*/
@Public
public static int getDagIndex(Configuration conf) {
return getIntProperty(conf, MRInput.TEZ_MAPREDUCE_DAG_INDEX);
}
/**
* Returns string representation of full DAG identifier
* @param conf configuration instance
* @return dag identifier
*/
@Public
public static String getDagIdString(Configuration conf) {
return getStringProperty(conf, MRInput.TEZ_MAPREDUCE_DAG_ID);
}
/**
* @see InputContext#getTaskVertexIndex
* @param conf configuration instance
* @return vertex index
*/
@Public
public static int getVertexIndex(Configuration conf) {
return getIntProperty(conf, MRInput.TEZ_MAPREDUCE_VERTEX_INDEX);
}
/**
* Returns string representation of full vertex identifier
* @param conf configuration instance
* @return vertex identifier
*/
@Public
public static String getVertexIdString(Configuration conf) {
return getStringProperty(conf, MRInput.TEZ_MAPREDUCE_VERTEX_ID);
}
/**
* @see InputContext#getTaskIndex
* @param conf configuration instance
* @return task index
*/
@Public
public static int getTaskIndex(Configuration conf) {
return getIntProperty(conf, MRInput.TEZ_MAPREDUCE_TASK_INDEX);
}
/**
* Returns string representation of full task identifier
* @param conf configuration instance
* @return task identifier
*/
@Public
public static String getTaskIdString(Configuration conf) {
return getStringProperty(conf, MRInput.TEZ_MAPREDUCE_TASK_ID);
}
/**
* @see InputContext#getTaskAttemptNumber
* @param conf configuration instance
* @return task attempt index
*/
@Public
public static int getTaskAttemptIndex(Configuration conf) {
return getIntProperty(conf, MRInput.TEZ_MAPREDUCE_TASK_ATTEMPT_INDEX);
}
/**
* Returns string representation of full task attempt identifier
* @param conf configuration instance
* @return task attempt identifier
*/
@Public
public static String getTaskAttemptIdString(Configuration conf) {
return getStringProperty(conf, MRInput.TEZ_MAPREDUCE_TASK_ATTEMPT_ID);
}
/**
* @see InputContext#getInputIndex
* @param conf configuration instance
* @return input index
*/
@Public
public static int getInputIndex(Configuration conf) {
return getIntProperty(conf, MRInput.TEZ_MAPREDUCE_INPUT_INDEX);
}
/**
* @see InputContext#getDAGName
* @param conf configuration instance
* @return dag name
*/
@Public
public static String getDagName(Configuration conf) {
return getStringProperty(conf, MRInput.TEZ_MAPREDUCE_DAG_NAME);
}
/**
* @see InputContext#getTaskVertexName
* @param conf configuration instance
* @return vertex name
*/
@Public
public static String getVertexName(Configuration conf) {
return getStringProperty(conf, MRInput.TEZ_MAPREDUCE_VERTEX_NAME);
}
/**
* @see InputContext#getSourceVertexName
* @param conf configuration instance
* @return source name
*/
@Public
public static String getInputName(Configuration conf) {
return getStringProperty(conf, MRInput.TEZ_MAPREDUCE_INPUT_NAME);
}
/**
* @see InputContext#getApplicationId
* @param conf configuration instance
* @return applicationId as a string
*/
@Public
public static String getApplicationIdString(Configuration conf) {
return getStringProperty(conf, MRInput.TEZ_MAPREDUCE_APPLICATION_ID);
}
/**
* @see InputContext#getUniqueIdentifier
* @param conf configuration instance
* @return unique identifier for the input
*/
@Public
public static String getUniqueIdentifier(Configuration conf) {
return getStringProperty(conf, MRInput.TEZ_MAPREDUCE_UNIQUE_IDENTIFIER);
}
/**
* @see InputContext#getDAGAttemptNumber
* @param conf configuration instance
* @return attempt number
*/
@Public
public static int getDagAttemptNumber(Configuration conf) {
return getIntProperty(conf, MRInput.TEZ_MAPREDUCE_DAG_ATTEMPT_NUMBER);
}
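/**
* Resolve the {@link MRSplitProto} for an {@link InputDataInformationEvent}: if the event
* carries a serialized path, the proto is read from (and then deleted at) that filesystem
* location; otherwise it is parsed directly from the event's byte payload.
*/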
public static MRSplitProto getProto(InputDataInformationEvent initEvent, JobConf jobConf) throws IOException {
return Strings.isNullOrEmpty(initEvent.getSerializedPath()) ? readProtoFromPayload(initEvent)
: readProtoFromFs(initEvent, jobConf);
}
private static MRSplitProto readProtoFromFs(InputDataInformationEvent initEvent, JobConf jobConf) throws IOException {
String serializedPath = initEvent.getSerializedPath();
Path filePath = new Path(serializedPath);
LOG.info("Reading InputDataInformationEvent from path: {}", filePath);
MRSplitProto splitProto = null;
FileSystem fs = filePath.getFileSystem(jobConf);
try (FSDataInputStream in = fs.open(filePath)) {
splitProto = MRSplitProto.parseFrom(in);
fs.delete(filePath, false);
}
return splitProto;
}
private static MRSplitProto readProtoFromPayload(InputDataInformationEvent initEvent) throws IOException {
ByteBuffer payload = initEvent.getUserPayload();
LOG.info("Reading InputDataInformationEvent from payload: {}", payload);
return MRSplitProto.parseFrom(ByteString.copyFrom(payload));
}
}