/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.tez.mapreduce.examples;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.StringTokenizer;
import java.util.TreeMap;
import org.apache.commons.cli.ParseException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileAlreadyExistsException;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.mapreduce.security.TokenCache;
import org.apache.hadoop.mapreduce.split.TezGroupedSplitsInputFormat;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.tez.client.PreWarmContext;
import org.apache.tez.client.TezClient;
import org.apache.tez.client.TezClientUtils;
import org.apache.tez.dag.api.DAG;
import org.apache.tez.dag.api.Edge;
import org.apache.tez.dag.api.ProcessorDescriptor;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.TezException;
import org.apache.tez.dag.api.Vertex;
import org.apache.tez.dag.api.VertexLocationHint;
import org.apache.tez.dag.api.client.DAGClient;
import org.apache.tez.dag.api.client.DAGStatus;
import org.apache.tez.dag.api.client.StatusGetOpts;
import org.apache.tez.mapreduce.common.MRInputAMSplitGenerator;
import org.apache.tez.mapreduce.examples.helpers.SplitsInClientOptionParser;
import org.apache.tez.mapreduce.hadoop.InputSplitInfo;
import org.apache.tez.mapreduce.hadoop.MRHelpers;
import org.apache.tez.mapreduce.hadoop.MRJobConfig;
import org.apache.tez.mapreduce.processor.map.MapProcessor;
import org.apache.tez.mapreduce.processor.reduce.ReduceProcessor;
import org.apache.tez.runtime.api.TezRootInputInitializer;
import org.apache.tez.runtime.library.conf.OrderedPartitionedKVEdgeConfigurer;
import org.apache.tez.runtime.library.partitioner.HashPartitioner;
import org.apache.tez.runtime.library.processor.SleepProcessor;

/**
 * An MRR job built on top of word count that returns words sorted by their
 * frequency of occurrence.
 *
 * Use -DUSE_TEZ_SESSION=true to run the jobs in session mode.
 * If multiple input/output pairs are provided, this job processes each pair
 * as a separate DAG, sequentially.
 * Use -DINTER_JOB_SLEEP_INTERVAL=<N>, where N is the sleep interval in
 * seconds between the sequential DAGs.
 */
public class OrderedWordCount extends Configured implements Tool {

  private static final Log LOG = LogFactory.getLog(OrderedWordCount.class);

  public static class TokenizerMapper
      extends Mapper<Object, Text, Text, IntWritable> {
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();
public void map(Object key, Text value, Context context
) throws IOException, InterruptedException {
StringTokenizer itr = new StringTokenizer(value.toString());
while (itr.hasMoreTokens()) {
word.set(itr.nextToken());
context.write(word, one);
}
}
}
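
  /**
   * Sums the occurrence counts for each word and emits (count, word), so the
   * shuffle into the next stage orders the words by their frequency.
   */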
  public static class IntSumReducer
      extends Reducer<Text, IntWritable, IntWritable, Text> {
private IntWritable result = new IntWritable();
public void reduce(Text key, Iterable<IntWritable> values,
Context context
) throws IOException, InterruptedException {
int sum = 0;
for (IntWritable val : values) {
sum += val.get();
}
result.set(sum);
context.write(result, key);
}
}

  /**
   * The shuffle into this stage already orders the records by word count, so
   * the final reducer is a no-op that simply emits each word together with
   * its count.
   */
public static class MyOrderByNoOpReducer
extends Reducer<IntWritable, Text, Text, IntWritable> {
public void reduce(IntWritable key, Iterable<Text> values,
Context context
) throws IOException, InterruptedException {
for (Text word : values) {
context.write(word, key);
}
}
}
private Credentials credentials = new Credentials();
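
  /**
   * Builds a three-vertex DAG: "initialmap" (tokenizing mapper) ->
   * "intermediate_reducer" (per-word count summation) -> "finalreduce"
   * (no-op emitter), connected by ordered, partitioned shuffle edges.
   */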
private DAG createDAG(FileSystem fs, Configuration conf,
Map<String, LocalResource> commonLocalResources, Path stagingDir,
int dagIndex, String inputPath, String outputPath,
boolean generateSplitsInClient) throws Exception {
Configuration mapStageConf = new JobConf(conf);
mapStageConf.set(MRJobConfig.MAP_CLASS_ATTR,
TokenizerMapper.class.getName());
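    // Splits can be generated here in the client (plain TextInputFormat) or
    // later in the AM (TezGroupedSplitsInputFormat, with the
    // MRInputAMSplitGenerator initializer set further below).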
if (generateSplitsInClient) {
mapStageConf.set(MRJobConfig.INPUT_FORMAT_CLASS_ATTR,
TextInputFormat.class.getName());
} else {
mapStageConf.set(MRJobConfig.INPUT_FORMAT_CLASS_ATTR,
TezGroupedSplitsInputFormat.class.getName());
}
mapStageConf.set(FileInputFormat.INPUT_DIR, inputPath);
mapStageConf.setBoolean("mapred.mapper.new-api", true);
InputSplitInfo inputSplitInfo = null;
if (generateSplitsInClient) {
inputSplitInfo = MRHelpers.generateInputSplits(mapStageConf, stagingDir);
mapStageConf.setInt(MRJobConfig.NUM_MAPS, inputSplitInfo.getNumTasks());
}
MRHelpers.translateVertexConfToTez(mapStageConf);
Configuration iReduceStageConf = new JobConf(conf);
iReduceStageConf.setInt(MRJobConfig.NUM_REDUCES, 2); // TODO NEWTEZ - NOT NEEDED NOW???
iReduceStageConf.set(MRJobConfig.REDUCE_CLASS_ATTR,
IntSumReducer.class.getName());
iReduceStageConf.set(MRJobConfig.MAP_OUTPUT_KEY_CLASS,
IntWritable.class.getName());
iReduceStageConf.set(MRJobConfig.MAP_OUTPUT_VALUE_CLASS,
Text.class.getName());
iReduceStageConf.setBoolean("mapred.mapper.new-api", true);
MRHelpers.translateVertexConfToTez(iReduceStageConf);
Configuration finalReduceConf = new JobConf(conf);
finalReduceConf.setInt(MRJobConfig.NUM_REDUCES, 1);
finalReduceConf.set(MRJobConfig.REDUCE_CLASS_ATTR,
MyOrderByNoOpReducer.class.getName());
finalReduceConf.set(MRJobConfig.OUTPUT_FORMAT_CLASS_ATTR,
TextOutputFormat.class.getName());
finalReduceConf.set(FileOutputFormat.OUTDIR, outputPath);
finalReduceConf.setBoolean("mapred.mapper.new-api", true);
MRHelpers.translateVertexConfToTez(finalReduceConf);
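
    // Apply the usual MR job-client side setup to each stage's configuration
    // before it is serialized into the DAG.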
MRHelpers.doJobClientMagic(mapStageConf);
MRHelpers.doJobClientMagic(iReduceStageConf);
MRHelpers.doJobClientMagic(finalReduceConf);
List<Vertex> vertices = new ArrayList<Vertex>();
ByteArrayOutputStream outputStream = new ByteArrayOutputStream(4096);
mapStageConf.writeXml(outputStream);
String mapStageHistoryText = new String(outputStream.toByteArray(), "UTF-8");
byte[] mapPayload = MRHelpers.createUserPayloadFromConf(mapStageConf);
byte[] mapInputPayload = MRHelpers.createMRInputPayloadWithGrouping(mapPayload,
TextInputFormat.class.getName());
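    // With AM-side split generation the parallelism is not known yet; -1 lets
    // the AM set the task count once the splits have been generated and grouped.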
int numMaps = generateSplitsInClient ? inputSplitInfo.getNumTasks() : -1;
Vertex mapVertex = new Vertex("initialmap", new ProcessorDescriptor(
MapProcessor.class.getName()).setUserPayload(mapPayload)
.setHistoryText(mapStageHistoryText),
numMaps, MRHelpers.getMapResource(mapStageConf));
if (generateSplitsInClient) {
mapVertex.setTaskLocationsHint(inputSplitInfo.getTaskLocationHints());
Map<String, LocalResource> mapLocalResources =
new HashMap<String, LocalResource>();
mapLocalResources.putAll(commonLocalResources);
MRHelpers.updateLocalResourcesForInputSplits(fs, inputSplitInfo,
mapLocalResources);
mapVertex.setTaskLocalFiles(mapLocalResources);
} else {
mapVertex.setTaskLocalFiles(commonLocalResources);
}
Class<? extends TezRootInputInitializer> initializerClazz = generateSplitsInClient ? null
: MRInputAMSplitGenerator.class;
MRHelpers.addMRInput(mapVertex, mapInputPayload, initializerClazz);
vertices.add(mapVertex);
ByteArrayOutputStream iROutputStream = new ByteArrayOutputStream(4096);
iReduceStageConf.writeXml(iROutputStream);
String iReduceStageHistoryText = new String(iROutputStream.toByteArray(), "UTF-8");
Vertex ivertex = new Vertex("intermediate_reducer", new ProcessorDescriptor(
ReduceProcessor.class.getName())
.setUserPayload(MRHelpers.createUserPayloadFromConf(iReduceStageConf))
.setHistoryText(iReduceStageHistoryText),
2, MRHelpers.getReduceResource(iReduceStageConf));
ivertex.setTaskLocalFiles(commonLocalResources);
vertices.add(ivertex);
ByteArrayOutputStream finalReduceOutputStream = new ByteArrayOutputStream(4096);
finalReduceConf.writeXml(finalReduceOutputStream);
String finalReduceStageHistoryText = new String(finalReduceOutputStream.toByteArray(), "UTF-8");
byte[] finalReducePayload = MRHelpers.createUserPayloadFromConf(finalReduceConf);
Vertex finalReduceVertex = new Vertex("finalreduce",
new ProcessorDescriptor(
ReduceProcessor.class.getName())
.setUserPayload(finalReducePayload)
.setHistoryText(finalReduceStageHistoryText), 1,
MRHelpers.getReduceResource(finalReduceConf));
finalReduceVertex.setTaskLocalFiles(commonLocalResources);
MRHelpers.addMROutputLegacy(finalReduceVertex, finalReducePayload);
vertices.add(finalReduceVertex);
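
    // Connect consecutive vertices with ordered, partitioned key-value edges
    // (a legacy shuffle), so each downstream stage sees its input sorted by key.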
OrderedPartitionedKVEdgeConfigurer edgeConf = OrderedPartitionedKVEdgeConfigurer
.newBuilder(IntWritable.class.getName(), Text.class.getName(),
HashPartitioner.class.getName(), null).configureInput().useLegacyInput().done().build();
DAG dag = new DAG("OrderedWordCount" + dagIndex);
for (int i = 0; i < vertices.size(); ++i) {
dag.addVertex(vertices.get(i));
if (i != 0) {
dag.addEdge(
new Edge(vertices.get(i - 1), vertices.get(i), edgeConf.createDefaultEdgeProperty()));
}
}
return dag;
}
private static void printUsage() {
String options = " [-generateSplitsInClient true/<false>]";
System.err.println("Usage: orderedwordcount <in> <out>" + options);
System.err.println("Usage (In Session Mode):"
+ " orderedwordcount <in1> <out1> ... <inN> <outN>" + options);
ToolRunner.printGenericCommandUsage(System.err);
}
@Override
public int run(String[] args) throws Exception {
Configuration conf = getConf();
String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
boolean generateSplitsInClient;
SplitsInClientOptionParser splitCmdLineParser = new SplitsInClientOptionParser();
try {
generateSplitsInClient = splitCmdLineParser.parse(otherArgs, false);
otherArgs = splitCmdLineParser.getRemainingArgs();
} catch (ParseException e1) {
System.err.println("Invalid options");
printUsage();
return 2;
}
boolean useTezSession = conf.getBoolean("USE_TEZ_SESSION", true);
long interJobSleepTimeout = conf.getInt("INTER_JOB_SLEEP_INTERVAL", 0)
* 1000;
boolean retainStagingDir = conf.getBoolean("RETAIN_STAGING_DIR", false);
if (((otherArgs.length%2) != 0)
|| (!useTezSession && otherArgs.length != 2)) {
printUsage();
return 2;
}
List<String> inputPaths = new ArrayList<String>();
List<String> outputPaths = new ArrayList<String>();
for (int i = 0; i < otherArgs.length; i+=2) {
inputPaths.add(otherArgs[i]);
outputPaths.add(otherArgs[i+1]);
}
UserGroupInformation.setConfiguration(conf);
TezConfiguration tezConf = new TezConfiguration(conf);
OrderedWordCount instance = new OrderedWordCount();
FileSystem fs = FileSystem.get(conf);
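
    // Create a unique, per-run staging directory under the configured Tez AM
    // staging dir.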
String stagingDirStr = conf.get(TezConfiguration.TEZ_AM_STAGING_DIR,
TezConfiguration.TEZ_AM_STAGING_DIR_DEFAULT) + Path.SEPARATOR +
Long.toString(System.currentTimeMillis());
Path stagingDir = new Path(stagingDirStr);
FileSystem pathFs = stagingDir.getFileSystem(tezConf);
pathFs.mkdirs(new Path(stagingDirStr));
tezConf.set(TezConfiguration.TEZ_AM_STAGING_DIR, stagingDirStr);
stagingDir = pathFs.makeQualified(new Path(stagingDirStr));
TokenCache.obtainTokensForNamenodes(instance.credentials, new Path[] {stagingDir}, conf);
TezClientUtils.ensureStagingDirExists(tezConf, stagingDir);
    // No need to add the jar containing this class, as it is assumed to be
    // part of the Tez jars.

    // TEZ-674 Obtain tokens based on the input/output paths. For now, assume
    // the staging dir is on the same filesystem as the one used for
    // input/output.
    if (useTezSession) {
      LOG.info("Creating Tez Session");
    }
    tezConf.setBoolean(TezConfiguration.TEZ_AM_SESSION_MODE, useTezSession);
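    // A single TezClient serves both modes; in session mode it is reused
    // across all submitted DAGs.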
TezClient tezSession = new TezClient("OrderedWordCountSession", tezConf,
null, instance.credentials);
tezSession.start();
DAGStatus dagStatus = null;
DAGClient dagClient = null;
String[] vNames = { "initialmap", "intermediate_reducer",
"finalreduce" };
Set<StatusGetOpts> statusGetOpts = EnumSet.of(StatusGetOpts.GET_COUNTERS);
try {
for (int dagIndex = 1; dagIndex <= inputPaths.size(); ++dagIndex) {
if (dagIndex != 1
&& interJobSleepTimeout > 0) {
try {
LOG.info("Sleeping between jobs, sleepInterval="
+ (interJobSleepTimeout/1000));
Thread.sleep(interJobSleepTimeout);
} catch (InterruptedException e) {
LOG.info("Main thread interrupted. Breaking out of job loop");
break;
}
}
String inputPath = inputPaths.get(dagIndex-1);
String outputPath = outputPaths.get(dagIndex-1);
if (fs.exists(new Path(outputPath))) {
throw new FileAlreadyExistsException("Output directory "
+ outputPath + " already exists");
}
LOG.info("Running OrderedWordCount DAG"
+ ", dagIndex=" + dagIndex
+ ", inputPath=" + inputPath
+ ", outputPath=" + outputPath);
Map<String, LocalResource> localResources =
new TreeMap<String, LocalResource>();
DAG dag = instance.createDAG(fs, conf, localResources,
stagingDir, dagIndex, inputPath, outputPath,
generateSplitsInClient);
boolean doPreWarm = dagIndex == 1 && useTezSession
&& conf.getBoolean("PRE_WARM_SESSION", true);
int preWarmNumContainers = 0;
if (doPreWarm) {
preWarmNumContainers = conf.getInt("PRE_WARM_NUM_CONTAINERS", 0);
if (preWarmNumContainers <= 0) {
doPreWarm = false;
}
}
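      // Optionally pre-warm the session before the first DAG: launch
      // containers running a short SleepProcessor so they are already
      // available when the real DAG is submitted.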
if (doPreWarm) {
LOG.info("Pre-warming Session");
VertexLocationHint vertexLocationHint =
new VertexLocationHint(null);
ProcessorDescriptor sleepProcDescriptor =
new ProcessorDescriptor(SleepProcessor.class.getName());
SleepProcessor.SleepProcessorConfig sleepProcessorConfig =
new SleepProcessor.SleepProcessorConfig(4000);
sleepProcDescriptor.setUserPayload(
sleepProcessorConfig.toUserPayload());
PreWarmContext context = new PreWarmContext(sleepProcDescriptor,
dag.getVertex("initialmap").getTaskResource(), preWarmNumContainers,
vertexLocationHint);
Map<String, LocalResource> contextLocalRsrcs =
new TreeMap<String, LocalResource>();
contextLocalRsrcs.putAll(
dag.getVertex("initialmap").getTaskLocalFiles());
Map<String, String> contextEnv = new TreeMap<String, String>();
contextEnv.putAll(dag.getVertex("initialmap").getTaskEnvironment());
String contextJavaOpts =
dag.getVertex("initialmap").getTaskLaunchCmdOpts();
context
.setLocalResources(contextLocalRsrcs)
.setJavaOpts(contextJavaOpts)
.setEnvironment(contextEnv);
tezSession.preWarm(context);
}
if (useTezSession) {
LOG.info("Waiting for TezSession to get into ready state");
waitForTezSessionReady(tezSession);
LOG.info("Submitting DAG to Tez Session, dagIndex=" + dagIndex);
dagClient = tezSession.submitDAG(dag);
LOG.info("Submitted DAG to Tez Session, dagIndex=" + dagIndex);
} else {
LOG.info("Submitting DAG as a new Tez Application");
dagClient = tezSession.submitDAG(dag);
}
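      // Poll until the DAG reaches a known state, then keep polling (and
      // printing progress while RUNNING) until it terminates.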
while (true) {
dagStatus = dagClient.getDAGStatus(statusGetOpts);
if (dagStatus.getState() == DAGStatus.State.RUNNING ||
dagStatus.getState() == DAGStatus.State.SUCCEEDED ||
dagStatus.getState() == DAGStatus.State.FAILED ||
dagStatus.getState() == DAGStatus.State.KILLED ||
dagStatus.getState() == DAGStatus.State.ERROR) {
break;
}
        try {
          Thread.sleep(500);
        } catch (InterruptedException e) {
          // Ignore the interruption and poll again.
        }
}
while (dagStatus.getState() != DAGStatus.State.SUCCEEDED &&
dagStatus.getState() != DAGStatus.State.FAILED &&
dagStatus.getState() != DAGStatus.State.KILLED &&
dagStatus.getState() != DAGStatus.State.ERROR) {
if (dagStatus.getState() == DAGStatus.State.RUNNING) {
ExampleDriver.printDAGStatus(dagClient, vNames);
}
try {
try {
Thread.sleep(1000);
          } catch (InterruptedException e) {
            // Ignore the interruption and poll again.
          }
dagStatus = dagClient.getDAGStatus(statusGetOpts);
} catch (TezException e) {
LOG.fatal("Failed to get application progress. Exiting");
return -1;
}
}
      ExampleDriver.printDAGStatus(dagClient, vNames, true, true);
LOG.info("DAG " + dagIndex + " completed. "
+ "FinalState=" + dagStatus.getState());
if (dagStatus.getState() != DAGStatus.State.SUCCEEDED) {
LOG.info("DAG " + dagIndex + " diagnostics: "
+ dagStatus.getDiagnostics());
}
}
} catch (Exception e) {
LOG.error("Error occurred when submitting/running DAGs", e);
throw e;
} finally {
if (!retainStagingDir) {
pathFs.delete(stagingDir, true);
}
LOG.info("Shutting down session");
tezSession.stop();
}
if (!useTezSession) {
ExampleDriver.printDAGStatus(dagClient, vNames);
LOG.info("Application completed. " + "FinalState=" + dagStatus.getState());
return dagStatus.getState() == DAGStatus.State.SUCCEEDED ? 0 : 1;
}
return 0;
}
private static void waitForTezSessionReady(TezClient tezSession)
throws IOException, TezException {
tezSession.waitTillReady();
}
public static void main(String[] args) throws Exception {
int res = ToolRunner.run(new Configuration(), new OrderedWordCount(), args);
System.exit(res);
}
}