| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.tez.mapreduce.examples; |
| |
| import java.io.ByteArrayOutputStream; |
| import java.io.IOException; |
| import java.util.ArrayList; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.StringTokenizer; |
| import java.util.TreeMap; |
| |
| import org.apache.commons.logging.Log; |
| import org.apache.commons.logging.LogFactory; |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.conf.Configured; |
| import org.apache.hadoop.fs.FileAlreadyExistsException; |
| import org.apache.hadoop.fs.FileSystem; |
| import org.apache.hadoop.fs.Path; |
| import org.apache.hadoop.io.IntWritable; |
| import org.apache.hadoop.io.Text; |
| import org.apache.hadoop.mapred.JobConf; |
| import org.apache.hadoop.mapreduce.Mapper; |
| import org.apache.hadoop.mapreduce.Reducer; |
| import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; |
| import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; |
| import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; |
| import org.apache.hadoop.security.UserGroupInformation; |
| import org.apache.hadoop.util.GenericOptionsParser; |
| import org.apache.hadoop.util.Tool; |
| import org.apache.hadoop.util.ToolRunner; |
| import org.apache.hadoop.yarn.api.records.LocalResource; |
| import org.apache.tez.client.TezClient; |
| import org.apache.tez.common.TezUtils; |
| import org.apache.tez.dag.api.DAG; |
| import org.apache.tez.dag.api.DataSourceDescriptor; |
| import org.apache.tez.dag.api.Edge; |
| import org.apache.tez.dag.api.ProcessorDescriptor; |
| import org.apache.tez.dag.api.TezConfiguration; |
| import org.apache.tez.dag.api.UserPayload; |
| import org.apache.tez.dag.api.Vertex; |
| import org.apache.tez.dag.api.client.DAGClient; |
| import org.apache.tez.dag.api.client.DAGStatus; |
| import org.apache.tez.mapreduce.hadoop.MRHelpers; |
| import org.apache.tez.mapreduce.hadoop.MRInputHelpers; |
| import org.apache.tez.mapreduce.hadoop.MRJobConfig; |
| import org.apache.tez.mapreduce.output.MROutputLegacy; |
| import org.apache.tez.mapreduce.processor.map.MapProcessor; |
| import org.apache.tez.mapreduce.processor.reduce.ReduceProcessor; |
| import org.apache.tez.runtime.library.api.TezRuntimeConfiguration; |
| import org.apache.tez.runtime.library.conf.OrderedPartitionedKVEdgeConfig; |
| import org.apache.tez.runtime.library.partitioner.HashPartitioner; |
| |
| /** |
| * Simple example that does a GROUP BY ORDER BY in an MRR job |
| * Consider a query such as |
| * Select DeptName, COUNT(*) as cnt FROM EmployeeTable |
| * GROUP BY DeptName ORDER BY cnt; |
| * |
| * i.e. List all departments with count of employees in each department |
| * and ordered based on department's employee count. |
| * |
| * Requires an Input file containing 2 strings per line in format of |
| * <EmployeeName> <DeptName> |
| * |
| * For example, use the following: |
| * |
| * #!/bin/bash |
| * |
| * i=1000000 |
| * j=1000 |
| * |
| * id=0 |
| * while [[ "$id" -ne "$i" ]] |
| * do |
| * id=`expr $id + 1` |
| * deptId=`expr $RANDOM % $j + 1` |
| * deptName=`echo "ibase=10;obase=16;$deptId" | bc` |
| * echo "$id O$deptName" |
| * done |
| * |
| */ |
public class GroupByOrderByMRRTest extends Configured implements Tool {

  private static final Log LOG = LogFactory.getLog(GroupByOrderByMRRTest.class);

  /**
   * Mapper takes in a single line as input containing
   * employee name and department name and then
   * emits department name with count of 1
   */
  public static class MyMapper
      extends Mapper<Object, Text, Text, IntWritable> {

    // Reusable output objects (standard MapReduce object-reuse idiom: the
    // framework serializes them on write, so mutating them between writes is safe).
    // NOTE(review): declared static, so they are shared by all mapper instances
    // in one JVM; fine here because each task JVM runs a single map thread.
    private final static IntWritable one = new IntWritable(1);
    private final static Text word = new Text();

    public void map(Object key, Text value, Context context
                    ) throws IOException, InterruptedException {
      // Each input line is expected to be "<EmployeeName> <DeptName>";
      // tokenize on whitespace.
      StringTokenizer itr = new StringTokenizer(value.toString());
      String empName;
      String deptName = "";
      if (itr.hasMoreTokens()) {
        empName = itr.nextToken();
        if (itr.hasMoreTokens()) {
          deptName = itr.nextToken();
        }
        // Emit (deptName, 1) only when both fields are present and non-empty;
        // malformed lines are silently skipped. Extra tokens after the second
        // are ignored.
        if (!empName.isEmpty()
            && !deptName.isEmpty()) {
          word.set(deptName);
          context.write(word, one);
        }
      }
    }
  }

  /**
   * Intermediate reducer aggregates the total count per department.
   * It takes department name and count as input and emits the final
   * count per department name.
   */
  public static class MyGroupByReducer
      extends Reducer<Text, IntWritable, IntWritable, Text> {
    private IntWritable result = new IntWritable();

    public void reduce(Text key, Iterable<IntWritable> values,
        Context context
        ) throws IOException, InterruptedException {

      // Sum the 1-counts emitted by MyMapper for this department.
      int sum = 0;
      for (IntWritable val : values) {
        sum += val.get();
      }
      result.set(sum);
      // Emit (count, deptName) — key and value are deliberately swapped so the
      // next shuffle stage sorts by employee count, implementing the ORDER BY.
      context.write(result, key);
    }
  }

  /**
   * Shuffle ensures ordering based on count of employees per department
   * hence the final reducer is a no-op and just emits the department name
   * with the employee count per department.
   */
  public static class MyOrderByNoOpReducer
      extends Reducer<IntWritable, Text, Text, IntWritable> {

    public void reduce(IntWritable key, Iterable<Text> values,
        Context context
        ) throws IOException, InterruptedException {
      // Swap back to (deptName, count) for the final output; input arrives
      // already sorted by count courtesy of the ordered edge.
      for (Text word : values) {
        context.write(word, key);
      }
    }
  }

  /**
   * Builds the three-vertex MRR DAG:
   * <pre>
   *   initialmap (MyMapper) --ordered edge--> ireduce1 (MyGroupByReducer)
   *       --ordered edge--> finalreduce (MyOrderByNoOpReducer) --> MROutput
   * </pre>
   * Each stage gets its own cloned {@link JobConf}, translated to Tez config
   * and serialized into the vertex's user payload.
   *
   * @param conf base configuration cloned for every stage
   * @param commonLocalResources local resources attached to every vertex
   * @param stagingDir staging directory used for legacy (client-side) split generation
   * @param inputPath input directory consumed by the map vertex via MRInput
   * @param outputPath output directory written by the final reduce vertex
   * @param useMRSettings when true, task resources and java opts are derived
   *        from the MR settings in each stage's conf; otherwise Tez defaults apply
   * @return the assembled (not yet submitted) DAG
   */
  private static DAG createDAG(Configuration conf, Map<String, LocalResource> commonLocalResources,
      Path stagingDir, String inputPath, String outputPath, boolean useMRSettings)
      throws Exception {

    // --- Stage 1: map ---
    Configuration mapStageConf = new JobConf(conf);
    mapStageConf.set(MRJobConfig.MAP_CLASS_ATTR,
        MyMapper.class.getName());

    MRHelpers.translateMRConfToTez(mapStageConf);

    // --- Stage 2: intermediate (GROUP BY) reduce ---
    Configuration iReduceStageConf = new JobConf(conf);
    // TODO replace with auto-reduce parallelism
    iReduceStageConf.setInt(MRJobConfig.NUM_REDUCES, 2);
    iReduceStageConf.set(MRJobConfig.REDUCE_CLASS_ATTR,
        MyGroupByReducer.class.getName());
    // Intermediate output of this stage is (Text dept, IntWritable count).
    iReduceStageConf.set(TezRuntimeConfiguration.TEZ_RUNTIME_KEY_CLASS, Text.class.getName());
    iReduceStageConf.set(TezRuntimeConfiguration.TEZ_RUNTIME_VALUE_CLASS,
        IntWritable.class.getName());
    // NOTE(review): this sets the *mapper* new-api flag on a reduce-stage conf;
    // it appears intentional for ReduceProcessor's handling of the new MR API,
    // but verify against the Tez ReduceProcessor expectations.
    iReduceStageConf.setBoolean("mapred.mapper.new-api", true);
    MRHelpers.translateMRConfToTez(iReduceStageConf);

    // --- Stage 3: final (ORDER BY pass-through) reduce ---
    Configuration finalReduceConf = new JobConf(conf);
    finalReduceConf.setInt(MRJobConfig.NUM_REDUCES, 1);
    finalReduceConf.set(MRJobConfig.REDUCE_CLASS_ATTR,
        MyOrderByNoOpReducer.class.getName());
    // Keys here are the counts, so shuffle ordering yields the ORDER BY cnt.
    finalReduceConf.set(TezRuntimeConfiguration.TEZ_RUNTIME_KEY_CLASS, IntWritable.class.getName());
    finalReduceConf.set(TezRuntimeConfiguration.TEZ_RUNTIME_VALUE_CLASS, Text.class.getName());
    MRHelpers.translateMRConfToTez(finalReduceConf);

    MRHelpers.configureMRApiUsage(mapStageConf);
    MRHelpers.configureMRApiUsage(iReduceStageConf);
    MRHelpers.configureMRApiUsage(finalReduceConf);

    List<Vertex> vertices = new ArrayList<Vertex>();

    // Snapshot the map conf as XML for history/UI display. Note this snapshot
    // is taken *before* the input-format keys below are added to the conf.
    ByteArrayOutputStream outputStream = new ByteArrayOutputStream(4096);
    mapStageConf.writeXml(outputStream);
    String mapStageHistoryText = new String(outputStream.toByteArray(), "UTF-8");
    mapStageConf.set(MRJobConfig.INPUT_FORMAT_CLASS_ATTR,
        TextInputFormat.class.getName());
    mapStageConf.set(FileInputFormat.INPUT_DIR, inputPath);
    mapStageConf.setBoolean("mapred.mapper.new-api", true);
    // Legacy split generation: splits are computed on the client and written
    // to the staging dir, rather than generated in the AM.
    DataSourceDescriptor dsd = MRInputHelpers.configureMRInputWithLegacySplitGeneration(
        mapStageConf, stagingDir, true);

    Vertex mapVertex;
    ProcessorDescriptor mapProcessorDescriptor =
        ProcessorDescriptor.create(MapProcessor.class.getName())
            .setUserPayload(
                TezUtils.createUserPayloadFromConf(mapStageConf))
            .setHistoryText(mapStageHistoryText);
    if (!useMRSettings) {
      mapVertex = Vertex.create("initialmap", mapProcessorDescriptor);
    } else {
      // -1 parallelism: task count is determined by the generated input splits.
      mapVertex = Vertex.create("initialmap", mapProcessorDescriptor, -1,
          MRHelpers.getResourceForMRMapper(mapStageConf));
      mapVertex.setTaskLaunchCmdOpts(MRHelpers.getJavaOptsForMRMapper(mapStageConf));
    }
    mapVertex.addTaskLocalFiles(commonLocalResources)
        .addDataSource("MRInput", dsd);
    vertices.add(mapVertex);

    // Snapshot the intermediate-reduce conf for history/UI display.
    ByteArrayOutputStream iROutputStream = new ByteArrayOutputStream(4096);
    iReduceStageConf.writeXml(iROutputStream);
    String iReduceStageHistoryText = new String(iROutputStream.toByteArray(), "UTF-8");

    ProcessorDescriptor iReduceProcessorDescriptor = ProcessorDescriptor.create(
        ReduceProcessor.class.getName())
        .setUserPayload(TezUtils.createUserPayloadFromConf(iReduceStageConf))
        .setHistoryText(iReduceStageHistoryText);

    Vertex intermediateVertex;
    // NOTE(review): parallelism is hard-coded to 1 here even though the conf
    // above requests NUM_REDUCES = 2 — confirm which is intended.
    if (!useMRSettings) {
      intermediateVertex = Vertex.create("ireduce1", iReduceProcessorDescriptor, 1);
    } else {
      intermediateVertex = Vertex.create("ireduce1", iReduceProcessorDescriptor,
          1, MRHelpers.getResourceForMRReducer(iReduceStageConf));
      intermediateVertex.setTaskLaunchCmdOpts(MRHelpers.getJavaOptsForMRReducer(iReduceStageConf));
    }
    intermediateVertex.addTaskLocalFiles(commonLocalResources);
    vertices.add(intermediateVertex);

    // Snapshot the final-reduce conf for history/UI display.
    ByteArrayOutputStream finalReduceOutputStream = new ByteArrayOutputStream(4096);
    finalReduceConf.writeXml(finalReduceOutputStream);
    String finalReduceStageHistoryText = new String(finalReduceOutputStream.toByteArray(), "UTF-8");
    UserPayload finalReducePayload = TezUtils.createUserPayloadFromConf(finalReduceConf);
    Vertex finalReduceVertex;

    ProcessorDescriptor finalReduceProcessorDescriptor =
        ProcessorDescriptor.create(
            ReduceProcessor.class.getName())
            .setUserPayload(finalReducePayload)
            .setHistoryText(finalReduceStageHistoryText);
    if (!useMRSettings) {
      finalReduceVertex = Vertex.create("finalreduce", finalReduceProcessorDescriptor, 1);
    } else {
      finalReduceVertex = Vertex.create("finalreduce", finalReduceProcessorDescriptor, 1,
          MRHelpers.getResourceForMRReducer(finalReduceConf));
      finalReduceVertex.setTaskLaunchCmdOpts(MRHelpers.getJavaOptsForMRReducer(finalReduceConf));
    }
    finalReduceVertex.addTaskLocalFiles(commonLocalResources);
    // Final vertex writes its output via the legacy MROutput using TextOutputFormat.
    finalReduceVertex.addDataSink("MROutput",
        MROutputLegacy.createConfigBuilder(finalReduceConf, TextOutputFormat.class, outputPath)
            .build());
    vertices.add(finalReduceVertex);

    DAG dag = DAG.create("groupbyorderbymrrtest");
    for (Vertex v : vertices) {
      dag.addVertex(v);
    }

    // Edge 1: map -> ireduce1, partitioned and sorted on (Text dept) keys.
    // useLegacyInput() selects the ShuffledMergedInputLegacy expected by
    // ReduceProcessor.
    OrderedPartitionedKVEdgeConfig edgeConf1 = OrderedPartitionedKVEdgeConfig
        .newBuilder(Text.class.getName(), IntWritable.class.getName(),
            HashPartitioner.class.getName()).setFromConfiguration(conf)
        .configureInput().useLegacyInput().done().build();
    dag.addEdge(
        Edge.create(dag.getVertex("initialmap"), dag.getVertex("ireduce1"),
            edgeConf1.createDefaultEdgeProperty()));

    // Edge 2: ireduce1 -> finalreduce, keyed on the IntWritable counts so the
    // sorted shuffle realizes the ORDER BY cnt.
    OrderedPartitionedKVEdgeConfig edgeConf2 = OrderedPartitionedKVEdgeConfig
        .newBuilder(IntWritable.class.getName(), Text.class.getName(),
            HashPartitioner.class.getName()).setFromConfiguration(conf)
        .configureInput().useLegacyInput().done().build();
    dag.addEdge(
        Edge.create(dag.getVertex("ireduce1"), dag.getVertex("finalreduce"),
            edgeConf2.createDefaultEdgeProperty()));

    return dag;
  }


  /**
   * Tool entry point: parses generic options, validates arguments, prepares a
   * timestamped staging directory, then submits the DAG through a TezClient
   * and blocks until completion.
   *
   * @param args expects exactly two remaining args: input path and output path
   * @return 0 on success, 2 on usage error, -1 if the DAG did not succeed
   */
  @Override
  public int run(String[] args) throws Exception {
    Configuration conf = getConf();

    String[] otherArgs = new GenericOptionsParser(conf, args).
        getRemainingArgs();
    if (otherArgs.length != 2) {
      System.err.println("Usage: groupbyorderbymrrtest <in> <out>");
      ToolRunner.printGenericCommandUsage(System.err);
      return 2;
    }

    String inputPath = otherArgs[0];
    String outputPath = otherArgs[1];

    UserGroupInformation.setConfiguration(conf);

    TezConfiguration tezConf = new TezConfiguration(conf);
    FileSystem fs = FileSystem.get(conf);

    // Fail fast if the output directory already exists (mirrors MR semantics).
    if (fs.exists(new Path(outputPath))) {
      throw new FileAlreadyExistsException("Output directory "
          + outputPath + " already exists");
    }

    Map<String, LocalResource> localResources =
        new TreeMap<String, LocalResource>();

    // Unique per-run staging dir under the configured Tez staging root,
    // keyed by the current timestamp.
    String stagingDirStr = conf.get(TezConfiguration.TEZ_AM_STAGING_DIR,
        TezConfiguration.TEZ_AM_STAGING_DIR_DEFAULT) + Path.SEPARATOR +
        Long.toString(System.currentTimeMillis());
    Path stagingDir = new Path(stagingDirStr);
    FileSystem pathFs = stagingDir.getFileSystem(tezConf);
    pathFs.mkdirs(new Path(stagingDirStr));

    tezConf.set(TezConfiguration.TEZ_AM_STAGING_DIR, stagingDirStr);
    stagingDir = pathFs.makeQualified(new Path(stagingDirStr));

    TezClient tezClient = TezClient.create("groupbyorderbymrrtest", tezConf);
    tezClient.start();

    LOG.info("Submitting groupbyorderbymrrtest DAG as a new Tez Application");

    try {
      DAG dag = createDAG(conf, localResources, stagingDir, inputPath, outputPath, true);

      // Wait for the AM session to be ready before submitting.
      tezClient.waitTillReady();

      DAGClient dagClient = tezClient.submitDAG(dag);

      // Block until the DAG finishes, printing periodic status updates.
      DAGStatus dagStatus = dagClient.waitForCompletionWithStatusUpdates(null);
      if (dagStatus.getState() != DAGStatus.State.SUCCEEDED) {
        LOG.error("groupbyorderbymrrtest failed, state=" + dagStatus.getState()
            + ", diagnostics=" + dagStatus.getDiagnostics());
        return -1;
      }
      LOG.info("Application completed. " + "FinalState=" + dagStatus.getState());
      return 0;
    } finally {
      // Always stop the client so the AM session is released.
      tezClient.stop();
    }
  }

  /**
   * Command-line entry point; delegates to {@link ToolRunner} so generic
   * Hadoop options (-D, -conf, ...) are handled before {@link #run(String[])}.
   */
  public static void main(String[] args) throws Exception {
    Configuration configuration = new Configuration();
    GroupByOrderByMRRTest groupByOrderByMRRTest = new GroupByOrderByMRRTest();
    int status = ToolRunner.run(configuration, groupByOrderByMRRTest, args);
    System.exit(status);
  }
}