| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.tez.examples; |
| |
| import java.io.IOException; |
| |
| import org.apache.commons.logging.Log; |
| import org.apache.commons.logging.LogFactory; |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.conf.Configured; |
| import org.apache.hadoop.fs.FileSystem; |
| import org.apache.hadoop.fs.Path; |
| import org.apache.hadoop.io.NullWritable; |
| import org.apache.hadoop.io.Text; |
| import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; |
| import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; |
| import org.apache.hadoop.security.UserGroupInformation; |
| import org.apache.hadoop.util.GenericOptionsParser; |
| import org.apache.hadoop.util.Tool; |
| import org.apache.hadoop.util.ToolRunner; |
| import org.apache.tez.client.TezClient; |
| import org.apache.tez.dag.api.DAG; |
| import org.apache.tez.dag.api.Edge; |
| import org.apache.tez.dag.api.ProcessorDescriptor; |
| import org.apache.tez.dag.api.TezConfiguration; |
| import org.apache.tez.dag.api.TezException; |
| import org.apache.tez.dag.api.Vertex; |
| import org.apache.tez.dag.api.client.DAGClient; |
| import org.apache.tez.dag.api.client.DAGStatus; |
| import org.apache.tez.dag.library.vertexmanager.ShuffleVertexManager; |
| import org.apache.tez.examples.HashJoinExample.ForwardingProcessor; |
| import org.apache.tez.mapreduce.input.MRInput; |
| import org.apache.tez.mapreduce.output.MROutput; |
| import org.apache.tez.mapreduce.processor.SimpleMRProcessor; |
| import org.apache.tez.runtime.api.LogicalInput; |
| import org.apache.tez.runtime.api.LogicalOutput; |
| import org.apache.tez.runtime.api.ProcessorContext; |
| import org.apache.tez.runtime.api.Reader; |
| import org.apache.tez.runtime.library.api.KeyValueWriter; |
| import org.apache.tez.runtime.library.api.KeyValuesReader; |
| import org.apache.tez.runtime.library.conf.OrderedPartitionedKVEdgeConfig; |
| import org.apache.tez.runtime.library.partitioner.HashPartitioner; |
| |
| import com.google.common.base.Preconditions; |
| |
/**
 * Simple example of joining 2 data sets using <a
 * href="http://en.wikipedia.org/wiki/Sort-merge_join">Sort-Merge Join</a>.<br>
 * There are 2 differences between {@link SortMergeJoinExample} and
 * {@link HashJoinExample}. <li>{@link HashJoinExample} always loads one data
 * set (hashFile) into memory, which requires that data set (hashFile) to be
 * small enough to fit into memory, while {@link SortMergeJoinExample} does not
 * load either data set into memory; it just sorts the output of the data sets
 * before feeding it to {@link SortMergeJoinProcessor}, just like the sort
 * phase before reduce in traditional MapReduce. We can then move the iterators
 * of the two inputs forward in {@link SortMergeJoinProcessor} to find the
 * joined keys, since both inputs are already sorted. <br> <li>Because of the
 * sort implementation difference described above, the data requirements also
 * differ for these 2 join algorithms. {@link HashJoinExample} requires that
 * keys in the hashFile are unique, while {@link SortMergeJoinExample} requires
 * that keys in both data sets are unique.
 */
public class SortMergeJoinExample extends Configured implements Tool {

  private static final Log LOG = LogFactory.getLog(SortMergeJoinExample.class);

  // DAG wiring constants. The vertex names "input1"/"input2" double as the
  // logical-input names that SortMergeJoinProcessor uses to look up its two
  // sides of the join; inputFile/joinOutput name the MRInput data source and
  // MROutput data sink on their respective vertices.
  private static final String input1 = "input1";
  private static final String input2 = "input2";
  private static final String inputFile = "inputFile";
  private static final String joiner = "joiner";
  private static final String joinOutput = "joinOutput";

  public static void main(String[] args) throws Exception {
    SortMergeJoinExample job = new SortMergeJoinExample();
    int status = ToolRunner.run(new Configuration(), job, args);
    System.exit(status);
  }

  /** Prints command-line usage, including the generic Hadoop tool options. */
  private static void printUsage() {
    System.err.println("Usage: "
        + "sortmergejoin <file1> <file2> <numPartitions> <outPath>");
    ToolRunner.printGenericCommandUsage(System.err);
  }

  /**
   * {@link Tool} entry point: strips generic Hadoop options, validates the
   * remaining positional arguments, and runs the job with an internally
   * created (and internally stopped) TezClient.
   *
   * @return 0 on success; a non-zero code on validation or job failure
   */
  @Override
  public int run(String[] args) throws Exception {
    Configuration conf = getConf();
    String[] otherArgs =
        new GenericOptionsParser(conf, args).getRemainingArgs();
    int result = validateArgs(otherArgs);
    if (result != 0) {
      return result;
    }
    return execute(otherArgs);
  }

  /**
   * Variant of {@link #run(String[])} that reuses an externally managed
   * TezClient. The caller is responsible for having started the client and
   * for stopping it afterwards; this method never stops it.
   *
   * @return 0 on success; a non-zero code on validation or job failure
   */
  public int run(Configuration conf, String[] args, TezClient tezClient)
      throws Exception {
    setConf(conf);
    String[] otherArgs =
        new GenericOptionsParser(conf, args).getRemainingArgs();
    int result = validateArgs(otherArgs);
    if (result != 0) {
      return result;
    }
    return execute(otherArgs, tezClient);
  }

  /**
   * Requires exactly 4 positional arguments (file1, file2, numPartitions,
   * outPath).
   *
   * @return 0 when valid; 2 (after printing usage) otherwise
   */
  private int validateArgs(String[] otherArgs) {
    if (otherArgs.length != 4) {
      printUsage();
      return 2;
    }
    return 0;
  }

  /**
   * Runs the job with a locally created TezClient, which is always stopped
   * before returning, even on failure.
   */
  private int execute(String[] args) throws TezException, IOException,
      InterruptedException {
    TezConfiguration tezConf = new TezConfiguration(getConf());
    TezClient tezClient = null;
    try {
      tezClient = createTezClient(tezConf);
      return execute(args, tezConf, tezClient);
    } finally {
      if (tezClient != null) {
        tezClient.stop();
      }
    }
  }

  /** Runs the job with a caller-supplied TezClient (caller stops it). */
  private int execute(String[] args, TezClient tezClient) throws IOException,
      TezException, InterruptedException {
    TezConfiguration tezConf = new TezConfiguration(getConf());
    return execute(args, tezConf, tezClient);
  }

  /** Creates and starts a TezClient for this example. */
  private TezClient createTezClient(TezConfiguration tezConf)
      throws TezException, IOException {
    TezClient tezClient = TezClient.create("SortMergeJoinExample", tezConf);
    tezClient.start();
    return tezClient;
  }

  /**
   * Validates the paths and partition count, builds the join DAG, submits it
   * and waits for completion, logging diagnostics on failure.
   *
   * @return 0 on success; 3 if the output path already exists; 4 if
   *         numPartitions is not positive; -1 if the DAG did not succeed
   */
  private int execute(String[] args, TezConfiguration tezConf,
      TezClient tezClient) throws IOException, TezException,
      InterruptedException {
    LOG.info("Running SortMergeJoinExample");

    UserGroupInformation.setConfiguration(tezConf);

    String inputDir1 = args[0];
    String inputDir2 = args[1];
    int numPartitions = Integer.parseInt(args[2]);
    String outputDir = args[3];

    Path inputPath1 = new Path(inputDir1);
    Path inputPath2 = new Path(inputDir2);
    Path outputPath = new Path(outputDir);

    // Refuse to clobber an existing output directory.
    FileSystem fs = FileSystem.get(tezConf);
    if (fs.exists(outputPath)) {
      System.err.println("Output directory: " + outputDir + " already exists");
      return 3;
    }
    if (numPartitions <= 0) {
      System.err.println("NumPartitions must be > 0");
      return 4;
    }

    DAG dag =
        createDag(tezConf, inputPath1, inputPath2, outputPath, numPartitions);

    // Block until the client can accept a DAG, then run it to completion,
    // printing periodic status updates (null = default console handling).
    tezClient.waitTillReady();
    DAGClient dagClient = tezClient.submitDAG(dag);
    DAGStatus dagStatus = dagClient.waitForCompletionWithStatusUpdates(null);
    if (dagStatus.getState() != DAGStatus.State.SUCCEEDED) {
      LOG.info("DAG diagnostics: " + dagStatus.getDiagnostics());
      return -1;
    }
    return 0;

  }

  /**
   * Builds the 3-vertex join DAG: <br>
   * v1 v2 <br>
   * \ / <br>
   * v3 <br>
   *
   * @param tezConf base configuration cloned into the input/output configs
   * @param inputPath1 path of the first data set
   * @param inputPath2 path of the second data set
   * @param outPath output path for the joined keys
   * @param numPartitions initial task parallelism of the join vertex
   * @return the assembled (not yet submitted) DAG
   * @throws IOException if building the MRInput/MROutput configs fails
   */
  private DAG createDag(TezConfiguration tezConf, Path inputPath1,
      Path inputPath2, Path outPath, int numPartitions) throws IOException {
    DAG dag = DAG.create("SortMergeJoinExample");

    /**
     * This vertex represents one side of the join. It reads text data using
     * the TextInputFormat. ForwardingProcessor simply forwards the data
     * downstream as is.
     */
    Vertex inputVertex1 =
        Vertex.create("input1",
            ProcessorDescriptor.create(ForwardingProcessor.class.getName()))
            .addDataSource(
                inputFile,
                MRInput
                    .createConfigBuilder(new Configuration(tezConf),
                        TextInputFormat.class, inputPath1.toUri().toString())
                    .groupSplits(false).build());

    /**
     * The other vertex represents the other side of the join. It reads text
     * data using the TextInputFormat. ForwardingProcessor simply forwards the
     * data downstream as is.
     */
    Vertex inputVertex2 =
        Vertex.create("input2",
            ProcessorDescriptor.create(ForwardingProcessor.class.getName()))
            .addDataSource(
                inputFile,
                MRInput
                    .createConfigBuilder(new Configuration(tezConf),
                        TextInputFormat.class, inputPath2.toUri().toString())
                    .groupSplits(false).build());

    /**
     * This vertex represents the join operation. It writes the join output as
     * text using the TextOutputFormat. The SortMergeJoinProcessor performs
     * the join of the two sorted outputs from inputVertex1 and inputVertex2.
     * It is load balanced across numPartitions; the ShuffleVertexManager may
     * additionally reduce that parallelism at runtime (auto-reduce enabled).
     */
    Vertex joinVertex = Vertex
        .create(joiner, ProcessorDescriptor.create(SortMergeJoinProcessor.class.getName()),
            numPartitions)
        .setVertexManagerPlugin(
            ShuffleVertexManager.createConfigBuilder(tezConf).setAutoReduceParallelism(true)
                .build())
        .addDataSink(
            joinOutput,
            MROutput.createConfigBuilder(new Configuration(tezConf), TextOutputFormat.class,
                outPath.toUri().toString()).build());

    /**
     * The output of inputVertex1 and inputVertex2 will be partitioned into
     * fragments with the same keys going to the same fragments using hash
     * partitioning. The data to be joined is the key itself and so the value
     * is null. These outputs will be sorted before being fed to
     * SortMergeJoinProcessor. The number of fragments is initially inferred
     * from the number of tasks running in the join vertex because each task
     * will be handling one fragment.
     * Edge config options are derived from client-side tez-site.xml (recommended). Optionally
     * invoke setFromConfiguration to override these config options via commandline arguments.
     */
    OrderedPartitionedKVEdgeConfig edgeConf =
        OrderedPartitionedKVEdgeConfig
            .newBuilder(Text.class.getName(), NullWritable.class.getName(),
                HashPartitioner.class.getName()).setFromConfiguration(tezConf)
            .build();

    /**
     * Connect the join vertex with inputVertex1 with the EdgeProperty created
     * from {@link OrderedPartitionedKVEdgeConfig} so that the output of
     * inputVertex1 is sorted before feeding it to SortMergeJoinProcessor.
     */
    Edge e1 =
        Edge.create(inputVertex1, joinVertex,
            edgeConf.createDefaultEdgeProperty());
    /**
     * Connect the join vertex with inputVertex2 with the EdgeProperty created
     * from {@link OrderedPartitionedKVEdgeConfig} so that the output of
     * inputVertex2 is sorted before feeding it to SortMergeJoinProcessor.
     */
    Edge e2 =
        Edge.create(inputVertex2, joinVertex,
            edgeConf.createDefaultEdgeProperty());

    dag.addVertex(inputVertex1).addVertex(inputVertex2).addVertex(joinVertex)
        .addEdge(e1).addEdge(e2);
    return dag;
  }

  /**
   * Joins 2 inputs which have already been sorted. Check the algorithm here
   * <a href="http://en.wikipedia.org/wiki/Sort-merge_join">Sort-Merge
   * Join</a>.<br>
   * It requires that the keys in both data sets are unique (the merge loop
   * below advances both cursors after a match, so duplicate keys would be
   * dropped). <br>
   * Disclaimer: The join code here is written as a tutorial for the APIs and
   * not for performance.
   */
  public static class SortMergeJoinProcessor extends SimpleMRProcessor {

    public SortMergeJoinProcessor(ProcessorContext context) {
      super(context);
    }

    /**
     * Validates the expected wiring (2 key-values inputs named "input1" and
     * "input2", 1 key-value output named "joinOutput") and performs the join.
     */
    @Override
    public void run() throws Exception {
      Preconditions.checkState(getInputs().size() == 2);
      Preconditions.checkState(getOutputs().size() == 1);
      // Get the input data for the 2 sides of the join from the 2 inputs
      LogicalInput logicalInput1 = getInputs().get(input1);
      LogicalInput logicalInput2 = getInputs().get(input2);
      Reader inputReader1 = logicalInput1.getReader();
      Reader inputReader2 = logicalInput2.getReader();
      Preconditions.checkState(inputReader1 instanceof KeyValuesReader);
      Preconditions.checkState(inputReader2 instanceof KeyValuesReader);
      LogicalOutput lo = getOutputs().get(joinOutput);
      Preconditions.checkState(lo.getWriter() instanceof KeyValueWriter);
      KeyValueWriter writer = (KeyValueWriter) lo.getWriter();

      join((KeyValuesReader) inputReader1, (KeyValuesReader) inputReader2,
          writer);
    }

    /**
     * Joins 2 sorted inputs, both from {@link KeyValuesReader}, and writes
     * each matched key (with a NullWritable value) using
     * {@link KeyValueWriter}. Both inputs must be sorted and have unique
     * keys; the values carried by the readers are ignored.
     *
     * @param inputReader1 sorted, unique-keyed left side of the join
     * @param inputReader2 sorted, unique-keyed right side of the join
     * @param writer sink for the matched keys
     * @throws IOException if reading an input or writing the output fails
     */
    private void join(KeyValuesReader inputReader1,
        KeyValuesReader inputReader2, KeyValueWriter writer) throws IOException {

      // Classic merge: each outer iteration advances both cursors past the
      // previously matched key (or starts both), then the inner loop advances
      // whichever side is behind until the keys are equal or a side is
      // exhausted.
      while (inputReader1.next() && inputReader2.next()) {
        Text value1 = (Text) inputReader1.getCurrentKey();
        Text value2 = (Text) inputReader2.getCurrentKey();
        boolean reachEnd = false;
        // move the cursor of 2 inputs forward until find the same values or one
        // of them reach the end.
        while (value1.compareTo(value2) != 0) {
          if (value1.compareTo(value2) > 0) {
            if (inputReader2.next()) {
              value2 = (Text) inputReader2.getCurrentKey();
            } else {
              reachEnd = true;
              break;
            }
          } else {
            if (inputReader1.next()) {
              value1 = (Text) inputReader1.getCurrentKey();
            } else {
              reachEnd = true;
              break;
            }
          }
        }

        if (reachEnd) {
          // One side is exhausted: no further matches are possible.
          break;
        } else {
          writer.write(value1, NullWritable.get());
        }
      }
    }
  }
}