blob: 12fd943afa9460e56cf91ee44941465772b2346c [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.tez.examples;
import java.io.IOException;
import org.apache.tez.client.CallerContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.ToolRunner;
import org.apache.tez.client.TezClient;
import org.apache.tez.dag.api.DAG;
import org.apache.tez.dag.api.DataSinkDescriptor;
import org.apache.tez.dag.api.DataSourceDescriptor;
import org.apache.tez.dag.api.Edge;
import org.apache.tez.dag.api.ProcessorDescriptor;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.Vertex;
import org.apache.tez.examples.WordCount.TokenProcessor;
import org.apache.tez.mapreduce.input.MRInput;
import org.apache.tez.mapreduce.output.MROutput;
import org.apache.tez.mapreduce.processor.SimpleMRProcessor;
import org.apache.tez.runtime.api.ProcessorContext;
import org.apache.tez.runtime.library.api.KeyValueWriter;
import org.apache.tez.runtime.library.api.KeyValuesReader;
import org.apache.tez.runtime.library.conf.OrderedPartitionedKVEdgeConfig;
import org.apache.tez.runtime.library.partitioner.HashPartitioner;
import org.apache.tez.runtime.library.processor.SimpleProcessor;
import org.apache.tez.common.Preconditions;
/**
 * Simple example that extends the WordCount example to show a chain of processing.
 * The example extends WordCount by sorting the words by their count.
 *
 * <p>The DAG built by {@link #createDAG} has three vertices connected by two ordered,
 * partitioned key-value edges: Tokenizer -&gt; Summation -&gt; Sorter. The tokenizer reads the
 * input data source, the summation vertex emits (count, word) pairs, and the single-task sorter
 * vertex writes (word, count) pairs to the output data sink.
 */
public class OrderedWordCount extends TezExampleBase {
  // Names of the data source/sink and vertices. They are also the keys under which the
  // processors below look up their logical inputs and outputs.
  private static final String INPUT = WordCount.INPUT;
  private static final String OUTPUT = WordCount.OUTPUT;
  private static final String TOKENIZER = WordCount.TOKENIZER;
  private static final String SUMMATION = WordCount.SUMMATION;
  private static final String SORTER = "Sorter";
  private static final Logger LOG = LoggerFactory.getLogger(OrderedWordCount.class);

  /**
   * SumProcessor similar to WordCount except that it writes the count as key and the
   * word as value. This is because we can use an ordered partitioned key value edge to group the
   * words with the same count (as key) and order the counts.
   */
  public static class SumProcessor extends SimpleProcessor {
    public SumProcessor(ProcessorContext context) {
      super(context);
    }

    /**
     * Sums the values of every word read from the tokenizer input and writes one
     * (sum, word) pair per word to the sorter output.
     *
     * @throws Exception if reading the input or writing the output fails
     */
    @Override
    public void run() throws Exception {
      Preconditions.checkArgument(getInputs().size() == 1);
      Preconditions.checkArgument(getOutputs().size() == 1);
      // the recommended approach is to cast the reader/writer to a specific type instead
      // of casting the input/output. This allows the actual input/output type to be replaced
      // without affecting the semantic guarantees of the data type that are represented by
      // the reader and writer.
      // The inputs/outputs are referenced via the names assigned in the DAG.
      KeyValueWriter kvWriter = (KeyValueWriter) getOutputs().get(SORTER).getWriter();
      KeyValuesReader kvReader = (KeyValuesReader) getInputs().get(TOKENIZER).getReader();
      while (kvReader.next()) {
        Text word = (Text) kvReader.getCurrentKey();
        int sum = 0;
        for (Object value : kvReader.getCurrentValues()) {
          sum += ((IntWritable) value).get();
        }
        // write the sum as the key and the word as the value
        kvWriter.write(new IntWritable(sum), word);
      }
    }
  }

  /**
   * No-op sorter processor. It does not need to apply any logic since the ordered partitioned edge
   * ensures that we get the data sorted and grouped by the sum key.
   */
  public static class NoOpSorter extends SimpleMRProcessor {
    public NoOpSorter(ProcessorContext context) {
      super(context);
    }

    /**
     * Reads (sum, words) groups from the summation input and writes one (word, sum) pair per
     * word to the final output, i.e. it swaps key and value back without reordering anything.
     *
     * @throws Exception if reading the input or writing the output fails
     */
    @Override
    public void run() throws Exception {
      Preconditions.checkArgument(getInputs().size() == 1);
      Preconditions.checkArgument(getOutputs().size() == 1);
      KeyValueWriter kvWriter = (KeyValueWriter) getOutputs().get(OUTPUT).getWriter();
      KeyValuesReader kvReader = (KeyValuesReader) getInputs().get(SUMMATION).getReader();
      while (kvReader.next()) {
        Object sum = kvReader.getCurrentKey();
        for (Object word : kvReader.getCurrentValues()) {
          kvWriter.write(word, sum);
        }
      }
      // deriving from SimpleMRProcessor takes care of committing the output
    }
  }

  /**
   * Builds the three-vertex ordered word count DAG.
   *
   * @param tezConf configuration copied into the data source/sink and used to override the
   *        edge configuration
   * @param inputPath path read through {@link TextInputFormat}
   * @param outputPath path written through {@link TextOutputFormat}
   * @param numPartitions number of tasks of the summation vertex
   * @param disableSplitGrouping when {@code true}, split grouping is turned off on the input
   * @param isGenerateSplitInClient when {@code true}, splits are not generated in the AM
   * @param dagName name given to the created DAG
   * @return the assembled DAG
   * @throws IOException declared for failures while building the input/output descriptors
   */
  public static DAG createDAG(TezConfiguration tezConf, String inputPath, String outputPath,
      int numPartitions, boolean disableSplitGrouping, boolean isGenerateSplitInClient,
      String dagName) throws IOException {
    DataSourceDescriptor dataSource = MRInput.createConfigBuilder(new Configuration(tezConf),
        TextInputFormat.class, inputPath).groupSplits(!disableSplitGrouping)
        .generateSplitsInAM(!isGenerateSplitInClient).build();

    DataSinkDescriptor dataSink = MROutput.createConfigBuilder(new Configuration(tezConf),
        TextOutputFormat.class, outputPath).build();

    Vertex tokenizerVertex = Vertex.create(TOKENIZER, ProcessorDescriptor.create(
        TokenProcessor.class.getName()));
    tokenizerVertex.addDataSource(INPUT, dataSource);

    // Use Text key and IntWritable value to bring counts for each word in the same partition
    // The setFromConfiguration call is optional and allows overriding the config options with
    // command line parameters.
    OrderedPartitionedKVEdgeConfig summationEdgeConf = OrderedPartitionedKVEdgeConfig
        .newBuilder(Text.class.getName(), IntWritable.class.getName(),
            HashPartitioner.class.getName())
        .setFromConfiguration(tezConf)
        .build();

    // This vertex will be reading intermediate data via an input edge and writing intermediate data
    // via an output edge.
    Vertex summationVertex = Vertex.create(SUMMATION, ProcessorDescriptor.create(
        SumProcessor.class.getName()), numPartitions);

    // Use IntWritable key and Text value to bring all words with the same count in the same
    // partition. The data will be ordered by count and words grouped by count. The
    // setFromConfiguration call is optional and allows overriding the config options with
    // command line parameters.
    OrderedPartitionedKVEdgeConfig sorterEdgeConf = OrderedPartitionedKVEdgeConfig
        .newBuilder(IntWritable.class.getName(), Text.class.getName(),
            HashPartitioner.class.getName())
        .setFromConfiguration(tezConf)
        .build();

    // Use 1 task to bring all the data in one place for global sorted order. Essentially the number
    // of partitions is 1. So the NoOpSorter can be used to produce the globally ordered output
    Vertex sorterVertex = Vertex.create(SORTER, ProcessorDescriptor.create(
        NoOpSorter.class.getName()), 1);
    sorterVertex.addDataSink(OUTPUT, dataSink);

    // No need to add jar containing this class as assumed to be part of the tez jars.
    DAG dag = DAG.create(dagName);
    dag.addVertex(tokenizerVertex)
        .addVertex(summationVertex)
        .addVertex(sorterVertex)
        .addEdge(
            Edge.create(tokenizerVertex, summationVertex,
                summationEdgeConf.createDefaultEdgeProperty()))
        .addEdge(
            Edge.create(summationVertex, sorterVertex, sorterEdgeConf.createDefaultEdgeProperty()));
    return dag;
  }

  /** Prints the command line usage of this example to standard error. */
  @Override
  protected void printUsage() {
    System.err.println("Usage: " + " orderedwordcount in out [numPartitions]");
  }

  /**
   * Checks the example-specific arguments: {@code in out [numPartitions]}.
   *
   * <p>Besides the argument count, the optional {@code numPartitions} value is checked here so
   * that a malformed value is reported as a usage error instead of surfacing later as a
   * {@link NumberFormatException} from {@link #runJob}.
   *
   * @param otherArgs the arguments to validate
   * @return 0 when the arguments are acceptable, 2 otherwise
   */
  @Override
  protected int validateArgs(String[] otherArgs) {
    if (otherArgs.length < 2 || otherArgs.length > 3) {
      return 2;
    }
    if (otherArgs.length == 3 && !isPositiveInteger(otherArgs[2])) {
      System.err.println("Invalid numPartitions: " + otherArgs[2]
          + " (expected a positive integer)");
      return 2;
    }
    return 0;
  }

  /** Returns true when {@code value} parses as a decimal integer greater than zero. */
  private static boolean isPositiveInteger(String value) {
    try {
      return Integer.parseInt(value) > 0;
    } catch (NumberFormatException e) {
      return false;
    }
  }

  /**
   * Creates the DAG from the given arguments and runs it.
   *
   * @param args {@code in out [numPartitions]}; the summation vertex uses 1 task when
   *        {@code numPartitions} is omitted
   * @param tezConf configuration handed to {@link #createDAG}
   * @param tezClient not used directly by this method
   * @return the result of {@code runDag}
   * @throws Exception if creating or running the DAG fails
   */
  @Override
  protected int runJob(String[] args, TezConfiguration tezConf,
      TezClient tezClient) throws Exception {
    int numPartitions = args.length == 3 ? Integer.parseInt(args[2]) : 1;
    DAG dag = createDAG(tezConf, args[0], args[1], numPartitions, isDisableSplitGrouping(),
        isGenerateSplitInClient(), "OrderedWordCount");
    LOG.info("Running OrderedWordCount");
    return runDag(dag, isCountersLog(), LOG);
  }

  /**
   * Command line entry point; runs the example through {@link ToolRunner} and exits with its
   * return code.
   */
  public static void main(String[] args) throws Exception {
    int res = ToolRunner.run(new Configuration(), new OrderedWordCount(), args);
    System.exit(res);
  }
}