blob: 58443e8eb1e8bacd773a53e9a805688147e2bda9 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.tez.mapreduce.examples;
import java.io.IOException;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.StringTokenizer;
import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileAlreadyExistsException;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.mapreduce.security.TokenCache;
import org.apache.hadoop.mapreduce.split.TezGroupedSplitsInputFormat;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.tez.client.AMConfiguration;
import org.apache.tez.client.TezClient;
import org.apache.tez.client.TezClientUtils;
import org.apache.tez.client.TezSession;
import org.apache.tez.client.TezSessionConfiguration;
import org.apache.tez.client.TezSessionStatus;
import org.apache.tez.dag.api.GroupInputEdge;
import org.apache.tez.dag.api.VertexGroup;
import org.apache.tez.dag.api.DAG;
import org.apache.tez.dag.api.Edge;
import org.apache.tez.dag.api.EdgeProperty;
import org.apache.tez.dag.api.InputDescriptor;
import org.apache.tez.dag.api.OutputDescriptor;
import org.apache.tez.dag.api.ProcessorDescriptor;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.TezException;
import org.apache.tez.dag.api.TezUncheckedException;
import org.apache.tez.dag.api.Vertex;
import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
import org.apache.tez.dag.api.EdgeProperty.DataSourceType;
import org.apache.tez.dag.api.EdgeProperty.SchedulingType;
import org.apache.tez.dag.api.client.DAGClient;
import org.apache.tez.dag.api.client.DAGStatus;
import org.apache.tez.dag.api.client.StatusGetOpts;
import org.apache.tez.mapreduce.committer.MROutputCommitter;
import org.apache.tez.mapreduce.common.MRInputAMSplitGenerator;
import org.apache.tez.mapreduce.hadoop.MRHelpers;
import org.apache.tez.mapreduce.hadoop.MRJobConfig;
import org.apache.tez.mapreduce.hadoop.MultiStageMRConfToTezTranslator;
import org.apache.tez.mapreduce.input.MRInput;
import org.apache.tez.mapreduce.output.MROutput;
import org.apache.tez.runtime.api.Event;
import org.apache.tez.runtime.api.LogicalIOProcessor;
import org.apache.tez.runtime.api.LogicalInput;
import org.apache.tez.runtime.api.LogicalOutput;
import org.apache.tez.runtime.api.TezProcessorContext;
import org.apache.tez.runtime.api.TezRootInputInitializer;
import org.apache.tez.runtime.library.api.KeyValueReader;
import org.apache.tez.runtime.library.api.KeyValueWriter;
import org.apache.tez.runtime.library.api.KeyValuesReader;
import org.apache.tez.runtime.library.input.ConcatenatedMergedKeyValuesInput;
import org.apache.tez.runtime.library.input.ShuffledMergedInput;
import org.apache.tez.runtime.library.output.OnFileSortedOutput;
import com.google.common.base.Preconditions;
import com.google.common.collect.Maps;
public class UnionExample {
/**
 * Processor that splits every input value into tokens and emits a
 * {@code (token, 1)} pair per token.
 *
 * <p>Each pair is always written to the output named {@code "checker"}.
 * Tasks of every vertex except the one named {@code "map3"} are treated as
 * members of the union: they must also have an output named {@code "parts"},
 * receive a copy of each pair on it, and commit it once the context allows.
 */
public static class TokenProcessor implements LogicalIOProcessor {
  TezProcessorContext context;
  IntWritable one = new IntWritable(1);
  Text word = new Text();

  /** Remembers the processor context for use in {@link #run}. */
  @Override
  public void initialize(TezProcessorContext processorContext)
      throws Exception {
    context = processorContext;
  }

  /** Events are ignored by this processor. */
  @Override
  public void handleEvents(List<Event> processorEvents) {
  }

  /** Nothing to release. */
  @Override
  public void close() throws Exception {
  }

  /**
   * Tokenizes the single input and writes the word pairs.
   *
   * @param inputs must contain exactly one input, which is cast to {@link MRInput}
   * @param outputs must contain {@code "checker"}; union members must
   *     additionally contain {@code "parts"} (two outputs in total)
   * @throws IllegalArgumentException if the input/output counts or names do
   *     not match the expectations above
   */
  @Override
  public void run(Map<String, LogicalInput> inputs,
      Map<String, LogicalOutput> outputs) throws Exception {
    Preconditions.checkArgument(inputs.size() == 1);
    for (LogicalInput source : inputs.values()) {
      source.start();
    }
    for (LogicalOutput sink : outputs.values()) {
      sink.start();
    }

    // Only the "map3" vertex stands outside the union group.
    final boolean inUnion = !context.getTaskVertexName().equals("map3");
    final int expectedOutputs = inUnion ? 2 : 1;
    Preconditions.checkArgument(outputs.size() == expectedOutputs);
    Preconditions.checkArgument(outputs.containsKey("checker"));

    MRInput mrInput = (MRInput) inputs.values().iterator().next();
    KeyValueReader lineReader = mrInput.getReader();
    OnFileSortedOutput checkerOutput = (OnFileSortedOutput) outputs.get("checker");
    KeyValueWriter checkerWriter = checkerOutput.getWriter();

    MROutput parts = null;
    KeyValueWriter partsWriter = null;
    if (inUnion) {
      parts = (MROutput) outputs.get("parts");
      partsWriter = parts.getWriter();
    }

    while (lineReader.next()) {
      String line = lineReader.getCurrentValue().toString();
      for (StringTokenizer tokens = new StringTokenizer(line); tokens.hasMoreTokens();) {
        word.set(tokens.nextToken());
        checkerWriter.write(word, one);
        if (inUnion) {
          partsWriter.write(word, one);
        }
      }
    }

    if (inUnion) {
      commitWhenAllowed(parts);
    }
  }

  /**
   * Commits {@code output} if it reports that a commit is required, polling
   * the context every 100 ms until the commit is permitted.
   */
  private void commitWhenAllowed(MROutput output) throws Exception {
    if (!output.isCommitRequired()) {
      return;
    }
    while (!context.canCommit()) {
      Thread.sleep(100);
    }
    output.commit();
  }
}
/**
 * Processor that cross-checks the words arriving on the {@code "union"}
 * input against the words arriving on the {@code "map3"} input.
 *
 * <p>For every key on {@code "union"} the first value is added to a per-word
 * tally (and the word is re-emitted that many times on the
 * {@code "all-parts"} output). For every key on {@code "map3"} twice its
 * first value is subtracted from the tally. Every tally must end at zero;
 * otherwise a {@link TezUncheckedException} is thrown. Finally the number of
 * distinct words is written under the key {@code "Union"} to the
 * {@code "union"} output and both outputs are committed.
 */
public static class UnionProcessor implements LogicalIOProcessor {
  TezProcessorContext context;
  IntWritable one = new IntWritable(1);

  /** Remembers the processor context for use in {@link #run}. */
  @Override
  public void initialize(TezProcessorContext processorContext)
      throws Exception {
    context = processorContext;
  }

  /** Events are ignored by this processor. */
  @Override
  public void handleEvents(List<Event> processorEvents) {
  }

  /** Nothing to release. */
  @Override
  public void close() throws Exception {
  }

  /**
   * Performs the tally check described on the class.
   *
   * @param inputs must contain exactly two inputs, looked up as
   *     {@code "union"} and {@code "map3"}
   * @param outputs must contain exactly two outputs, looked up as
   *     {@code "union"} and {@code "all-parts"}
   * @throws TezUncheckedException if {@code "map3"} carries a word never
   *     seen on {@code "union"}, or if any tally is non-zero at the end
   */
  @Override
  public void run(Map<String, LogicalInput> inputs,
      Map<String, LogicalOutput> outputs) throws Exception {
    Preconditions.checkArgument(inputs.size() == 2);
    Preconditions.checkArgument(outputs.size() == 2);
    for (LogicalInput source : inputs.values()) {
      source.start();
    }
    for (LogicalOutput sink : outputs.values()) {
      sink.start();
    }

    MROutput unionOutput = (MROutput) outputs.get("union");
    MROutput allPartsOutput = (MROutput) outputs.get("all-parts");
    KeyValueWriter summaryWriter = unionOutput.getWriter();
    KeyValueWriter allPartsWriter = allPartsOutput.getWriter();

    Map<String, AtomicInteger> tallies = new HashMap<String, AtomicInteger>();

    // Pass 1: accumulate counts coming from the vertex group.
    KeyValuesReader groupReader = (KeyValuesReader) inputs.get("union").getReader();
    while (groupReader.next()) {
      String token = ((Text) groupReader.getCurrentKey()).toString();
      int count = firstValue(groupReader);
      for (int emitted = 0; emitted < count; emitted++) {
        allPartsWriter.write(token, one);
      }
      AtomicInteger tally = tallies.get(token);
      if (tally != null) {
        tally.addAndGet(count);
      } else {
        tallies.put(token, new AtomicInteger(count));
      }
    }

    // Pass 2: cancel the tallies out with twice the map3 counts.
    KeyValuesReader map3Reader = (KeyValuesReader) inputs.get("map3").getReader();
    while (map3Reader.next()) {
      String token = ((Text) map3Reader.getCurrentKey()).toString();
      int count = firstValue(map3Reader);
      AtomicInteger tally = tallies.get(token);
      if (tally == null) {
        throw new TezUncheckedException("Expected to exist: " + token);
      }
      tally.addAndGet(count * -2);
    }

    for (AtomicInteger tally : tallies.values()) {
      if (tally.get() != 0) {
        throw new TezUncheckedException("Unexpected non-zero value");
      }
    }

    summaryWriter.write("Union", new IntWritable(tallies.size()));
    commitWhenAllowed(unionOutput);
    commitWhenAllowed(allPartsOutput);
  }

  /** Returns the first value of the reader's current key as an int. */
  private static int firstValue(KeyValuesReader reader) throws Exception {
    IntWritable first = (IntWritable) reader.getCurrentValues().iterator().next();
    return first.get();
  }

  /**
   * Commits {@code output} if it reports that a commit is required, polling
   * the context every 100 ms until the commit is permitted.
   */
  private void commitWhenAllowed(MROutput output) throws Exception {
    if (!output.isCommitRequired()) {
      return;
    }
    while (!context.canCommit()) {
      Thread.sleep(100);
    }
    output.commit();
  }
}
/**
 * Assembles the "UnionExample" DAG.
 *
 * <p>Three tokenizer vertices ({@code map1}, {@code map2}, {@code map3}) run
 * {@link TokenProcessor} over the same input. {@code map1} and {@code map2}
 * form the vertex group {@code "union"}, which owns the {@code "parts"}
 * output ({@code outputPath + "-parts"}). A single-task {@code checker}
 * vertex running {@link UnionProcessor} consumes the group and {@code map3}
 * over scatter-gather edges and owns the {@code "union"} output
 * ({@code outputPath}) and the {@code "all-parts"} output
 * ({@code outputPath + "-all-parts"}).
 *
 * @param fs not used by this method
 * @param tezConf base configuration copied into each stage configuration
 * @param localResources not used by this method
 * @param stagingDir not used by this method
 * @param inputPath input directory read by all three tokenizer vertices
 * @param outputPath base output directory
 * @return the fully wired DAG
 * @throws IOException propagated from the MR helper calls
 */
private DAG createDAG(FileSystem fs, TezConfiguration tezConf,
    Map<String, LocalResource> localResources, Path stagingDir,
    String inputPath, String outputPath) throws IOException {
  final String keyClass = Text.class.getName();
  final String valueClass = IntWritable.class.getName();

  // Configuration for the tokenizer ("map") stage.
  Configuration mapStageConf = new JobConf((Configuration) tezConf);
  mapStageConf.set(MRJobConfig.MAP_OUTPUT_KEY_CLASS, keyClass);
  mapStageConf.set(MRJobConfig.MAP_OUTPUT_VALUE_CLASS, valueClass);
  mapStageConf.set(MRJobConfig.INPUT_FORMAT_CLASS_ATTR,
      TezGroupedSplitsInputFormat.class.getName());
  mapStageConf.set(FileInputFormat.INPUT_DIR, inputPath);
  mapStageConf.setBoolean("mapred.mapper.new-api", true);
  MultiStageMRConfToTezTranslator.translateVertexConfToTez(mapStageConf, null);

  // Configuration for the checker ("final reduce") stage.
  Configuration finalReduceConf = new JobConf((Configuration) tezConf);
  finalReduceConf.set(MRJobConfig.MAP_OUTPUT_KEY_CLASS, keyClass);
  finalReduceConf.set(MRJobConfig.MAP_OUTPUT_VALUE_CLASS, valueClass);
  finalReduceConf.set(MRJobConfig.OUTPUT_FORMAT_CLASS_ATTR,
      TextOutputFormat.class.getName());
  finalReduceConf.set(FileOutputFormat.OUTDIR, outputPath);
  finalReduceConf.setBoolean("mapred.mapper.new-api", true);
  MultiStageMRConfToTezTranslator.translateVertexConfToTez(finalReduceConf, mapStageConf);

  MRHelpers.doJobClientMagic(mapStageConf);
  MRHelpers.doJobClientMagic(finalReduceConf);

  byte[] mapPayload = MRHelpers.createUserPayloadFromConf(mapStageConf);
  byte[] mapInputPayload = MRHelpers.createMRInputPayloadWithGrouping(mapPayload,
      TextInputFormat.class.getName());

  // All three tokenizer vertices share one input descriptor, one split
  // generator and (as in the original wiring) one environment map instance.
  InputDescriptor mrInputDescriptor =
      new InputDescriptor(MRInput.class.getName()).setUserPayload(mapInputPayload);
  Class<? extends TezRootInputInitializer> splitGenerator = MRInputAMSplitGenerator.class;
  Map<String, String> mapEnv = new HashMap<String, String>();
  Vertex mapVertex1 =
      newTokenVertex("map1", mapStageConf, mapEnv, mrInputDescriptor, splitGenerator);
  Vertex mapVertex2 =
      newTokenVertex("map2", mapStageConf, mapEnv, mrInputDescriptor, splitGenerator);
  Vertex mapVertex3 =
      newTokenVertex("map3", mapStageConf, mapEnv, mrInputDescriptor, splitGenerator);

  // Single-task checker vertex.
  byte[] finalReducePayload = MRHelpers.createUserPayloadFromConf(finalReduceConf);
  ProcessorDescriptor checkerProcessor =
      new ProcessorDescriptor(UnionProcessor.class.getName())
          .setUserPayload(finalReducePayload);
  Vertex checkerVertex = new Vertex("checker", checkerProcessor, 1,
      MRHelpers.getReduceResource(finalReduceConf));
  checkerVertex.setJavaOpts(MRHelpers.getReduceJavaOpts(finalReduceConf));
  Map<String, String> reduceEnv = new HashMap<String, String>();
  MRHelpers.updateEnvironmentForMRTasks(finalReduceConf, reduceEnv, false);
  checkerVertex.setTaskEnvironment(reduceEnv);
  checkerVertex.addOutput("union", newMROutputDescriptor(finalReducePayload),
      MROutputCommitter.class);

  byte[] partsPayload = payloadForOutputDir(finalReduceConf, outputPath + "-parts");

  DAG dag = new DAG("UnionExample");
  VertexGroup unionVertex = dag.createVertexGroup("union", mapVertex1, mapVertex2);
  OutputDescriptor partsDescriptor = newMROutputDescriptor(partsPayload);

  byte[] allPartsPayload = payloadForOutputDir(finalReduceConf, outputPath + "-all-parts");
  OutputDescriptor allPartsDescriptor = newMROutputDescriptor(allPartsPayload);

  unionVertex.addOutput("parts", partsDescriptor, MROutputCommitter.class);
  checkerVertex.addOutput("all-parts", allPartsDescriptor, MROutputCommitter.class);

  dag.addVertex(mapVertex1)
      .addVertex(mapVertex2)
      .addVertex(mapVertex3)
      .addVertex(checkerVertex)
      .addEdge(new Edge(mapVertex3, checkerVertex,
          newSortedShuffleEdge(mapPayload, finalReducePayload)))
      .addEdge(new GroupInputEdge(unionVertex, checkerVertex,
          newSortedShuffleEdge(mapPayload, finalReducePayload),
          new InputDescriptor(ConcatenatedMergedKeyValuesInput.class.getName())));
  return dag;
}

/**
 * Creates one tokenizer vertex running {@link TokenProcessor} with
 * parallelism -1 and an "MRInput" root input.
 *
 * <p>{@code sharedEnv} is updated in place and then handed to the vertex, so
 * successive calls with the same map see the entries added by earlier calls.
 */
private static Vertex newTokenVertex(String vertexName, Configuration mapStageConf,
    Map<String, String> sharedEnv, InputDescriptor inputDescriptor,
    Class<? extends TezRootInputInitializer> initializerClazz) throws IOException {
  int numMaps = -1;
  Vertex vertex = new Vertex(vertexName,
      new ProcessorDescriptor(TokenProcessor.class.getName()),
      numMaps, MRHelpers.getMapResource(mapStageConf));
  vertex.setJavaOpts(MRHelpers.getMapJavaOpts(mapStageConf));
  MRHelpers.updateEnvironmentForMRTasks(mapStageConf, sharedEnv, true);
  vertex.setTaskEnvironment(sharedEnv);
  vertex.addInput("MRInput", inputDescriptor, initializerClazz);
  return vertex;
}

/** Creates an {@link MROutput} descriptor carrying the given user payload. */
private static OutputDescriptor newMROutputDescriptor(byte[] payload) {
  return new OutputDescriptor(MROutput.class.getName()).setUserPayload(payload);
}

/**
 * Copies {@code baseConf}, points the copy's output directory at
 * {@code outputDir} and serializes it into a user payload.
 */
private static byte[] payloadForOutputDir(Configuration baseConf, String outputDir)
    throws IOException {
  Configuration outputConf = new Configuration(baseConf);
  outputConf.set(FileOutputFormat.OUTDIR, outputDir);
  return MRHelpers.createUserPayloadFromConf(outputConf);
}

/**
 * Builds the scatter-gather / persisted / sequential edge property used for
 * both edges into the checker vertex: {@link OnFileSortedOutput} on the
 * producer side and {@link ShuffledMergedInput} on the consumer side.
 */
private static EdgeProperty newSortedShuffleEdge(byte[] producerPayload,
    byte[] consumerPayload) {
  return new EdgeProperty(
      DataMovementType.SCATTER_GATHER, DataSourceType.PERSISTED,
      SchedulingType.SEQUENTIAL,
      new OutputDescriptor(OnFileSortedOutput.class.getName())
          .setUserPayload(producerPayload),
      new InputDescriptor(ShuffledMergedInput.class.getName())
          .setUserPayload(consumerPayload));
}
/**
 * Blocks until the given session reports {@link TezSessionStatus#READY},
 * polling its status every 100 ms.
 *
 * @param tezSession the session to wait for
 * @throws RuntimeException if the session reports {@link TezSessionStatus#SHUTDOWN}
 * @throws IOException if the status lookup fails, or if the waiting thread is
 *     interrupted before the session became ready (the interrupt flag is
 *     restored and the {@link InterruptedException} is attached as the cause)
 * @throws TezException if the status lookup fails
 */
private static void waitForTezSessionReady(TezSession tezSession)
    throws IOException, TezException {
  while (true) {
    TezSessionStatus status = tezSession.getSessionStatus();
    if (status.equals(TezSessionStatus.SHUTDOWN)) {
      throw new RuntimeException("TezSession has already shutdown");
    }
    if (status.equals(TezSessionStatus.READY)) {
      return;
    }
    try {
      Thread.sleep(100);
    } catch (InterruptedException e) {
      // Returning normally here would tell the caller the session is ready
      // when it is not; surface the interruption instead.
      Thread.currentThread().interrupt();
      throw new IOException(
          "Interrupted while waiting for TezSession to become ready", e);
    }
  }
}
/** Prints the command-line usage line to standard error. */
private static void printUsage() {
  String usageLine = "Usage: " + " unionexample <in1> <out1>";
  System.err.println(usageLine);
}
/**
 * Credentials populated by {@code TokenCache.obtainTokensForNamenodes} for
 * the staging directory and passed to the {@link AMConfiguration}.
 */
private final Credentials credentials = new Credentials();

/**
 * Runs the union example: starts a Tez session, submits the DAG built by
 * {@code createDAG}, reports progress until the DAG leaves the RUNNING state
 * and prints the final status.
 *
 * <p>The staging directory is deleted and the session is stopped on every
 * exit path from the monitoring section, including failures while polling.
 *
 * @param inputPath input directory for the tokenizer vertices
 * @param outputPath base output directory; must not exist yet
 * @param conf base configuration, or {@code null} to use a default
 *     {@link TezConfiguration}
 * @return {@code true} if the DAG finished in state SUCCEEDED, {@code false}
 *     for any other final state
 * @throws FileAlreadyExistsException if {@code outputPath} already exists
 * @throws Exception on any failure while setting up, submitting or
 *     monitoring the DAG (including {@link TezException} from status polling)
 */
public boolean run(String inputPath, String outputPath, Configuration conf) throws Exception {
  System.out.println("Running UnionExample");
  // conf and UGI
  TezConfiguration tezConf;
  if (conf != null) {
    tezConf = new TezConfiguration(conf);
  } else {
    tezConf = new TezConfiguration();
  }
  UserGroupInformation.setConfiguration(tezConf);
  String user = UserGroupInformation.getCurrentUser().getShortUserName();

  TezClient tezClient = new TezClient(tezConf);
  ApplicationId appId = tezClient.createApplication();

  // staging dir
  // NOTE(review): the doubled separator before the application id looks
  // unintentional; kept as-is — confirm that Path normalizes it.
  FileSystem fs = FileSystem.get(tezConf);
  String stagingDirStr = Path.SEPARATOR + "user" + Path.SEPARATOR
      + user + Path.SEPARATOR + ".staging" + Path.SEPARATOR
      + Path.SEPARATOR + appId.toString();
  Path stagingDir = new Path(stagingDirStr);
  tezConf.set(TezConfiguration.TEZ_AM_STAGING_DIR, stagingDirStr);
  stagingDir = fs.makeQualified(stagingDir);

  // security
  TokenCache.obtainTokensForNamenodes(credentials, new Path[] {stagingDir}, tezConf);
  TezClientUtils.ensureStagingDirExists(tezConf, stagingDir);

  tezConf.set(TezConfiguration.TEZ_AM_JAVA_OPTS,
      MRHelpers.getMRAMJavaOpts(tezConf));

  // No need to add jar containing this class as assumed to be part of
  // the tez jars.

  // TEZ-674 Obtain tokens based on the Input / Output paths. For now assuming staging dir
  // is the same filesystem as the one used for Input/Output.

  AMConfiguration amConfig = new AMConfiguration(null,
      null, tezConf, credentials);
  TezSessionConfiguration sessionConfig =
      new TezSessionConfiguration(amConfig, tezConf);
  TezSession tezSession = new TezSession("UnionExampleSession", appId,
      sessionConfig);
  tezSession.start();

  String[] vNames = { "map1", "map2", "map3", "checker" };
  Set<StatusGetOpts> statusGetOpts = EnumSet.of(StatusGetOpts.GET_COUNTERS);

  try {
    if (fs.exists(new Path(outputPath))) {
      throw new FileAlreadyExistsException("Output directory "
          + outputPath + " already exists");
    }

    Map<String, LocalResource> localResources =
        new TreeMap<String, LocalResource>();

    DAG dag = createDAG(fs, tezConf, localResources,
        stagingDir, inputPath, outputPath);

    waitForTezSessionReady(tezSession);
    DAGClient dagClient = tezSession.submitDAG(dag);

    // monitoring: first wait until the DAG is running or already finished
    DAGStatus dagStatus;
    while (true) {
      dagStatus = dagClient.getDAGStatus(statusGetOpts);
      DAGStatus.State state = dagStatus.getState();
      if (state == DAGStatus.State.RUNNING
          || state == DAGStatus.State.SUCCEEDED
          || state == DAGStatus.State.FAILED
          || state == DAGStatus.State.KILLED
          || state == DAGStatus.State.ERROR) {
        break;
      }
      try {
        Thread.sleep(500);
      } catch (InterruptedException ignored) {
        // deliberately keep polling
      }
    }

    // Report progress while running. Failures while polling propagate to the
    // caller instead of calling System.exit, so that the finally block below
    // still removes the staging directory and stops the session.
    while (dagStatus.getState() == DAGStatus.State.RUNNING) {
      ExampleDriver.printDAGStatus(dagClient, vNames);
      try {
        Thread.sleep(1000);
      } catch (InterruptedException ignored) {
        // deliberately keep polling
      }
      dagStatus = dagClient.getDAGStatus(statusGetOpts);
    }

    ExampleDriver.printDAGStatus(dagClient, vNames,
        true, true);
    System.out.println("DAG completed. " + "FinalState=" + dagStatus.getState());
    if (dagStatus.getState() != DAGStatus.State.SUCCEEDED) {
      System.out.println("DAG diagnostics: " + dagStatus.getDiagnostics());
      return false;
    }
    return true;
  } finally {
    // Stop the session even when deleting the staging directory fails.
    try {
      fs.delete(stagingDir, true);
    } finally {
      tezSession.stop();
    }
  }
}
/**
 * Command-line entry point: {@code unionexample <in1> <out1>}.
 *
 * <p>Exits with status 2 when the argument count is not exactly two, and
 * with status 1 when the DAG does not succeed.
 *
 * @param args the input path followed by the output path
 * @throws Exception if running the example fails
 */
public static void main(String[] args) throws Exception {
  // Exactly one <in> <out> pair is consumed below; the former "even count"
  // test let an empty argument list through and crashed on args[0].
  if (args.length != 2) {
    printUsage();
    System.exit(2);
  }
  UnionExample job = new UnionExample();
  boolean succeeded = job.run(args[0], args[1], null);
  if (!succeeded) {
    // Make a failed DAG visible to the calling shell/script.
    System.exit(1);
  }
}
}