/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.tez.mapreduce.examples;

import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.TypeConverter;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.tez.client.TezClient;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.TezException;
import org.apache.tez.dag.api.client.DAGClient;
import org.apache.tez.dag.api.client.DAGStatus;
import org.apache.tez.mapreduce.hadoop.MRJobConfig;
import org.apache.tez.mapreduce.hadoop.MultiStageMRConfigUtil;

/**
 * Simple example that does a GROUP BY ORDER BY in an MRR job.
 * Consider a query such as:
 *
 *   SELECT DeptName, COUNT(*) AS cnt FROM EmployeeTable
 *   GROUP BY DeptName ORDER BY cnt;
 *
 * i.e., list all departments with the number of employees in each
 * department, ordered by that employee count.
 *
 * Requires an input file containing two whitespace-separated strings per
 * line, in the format:
 *   <EmployeeName> <DeptName>
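 *
 * As a made-up illustration, an input of:
 *
 *   alice Engineering
 *   bob Sales
 *   carol Engineering
 *
 * would produce output along the lines of:
 *
 *   Sales        1
 *   Engineering  2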
*
 * Such an input file can be generated with a script like the following:
*
 * #!/bin/bash
*
* i=1000000
* j=1000
*
* id=0
* while [[ "$id" -ne "$i" ]]
* do
 *   id=`expr $id + 1`
 *   deptId=`expr $RANDOM % $j + 1`
 *   deptName=`echo "ibase=10;obase=16;$deptId" | bc`
 *   echo "$id O$deptName"
* done
*
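 * To run the example (the jar name below is illustrative; use whichever
 * jar packages these examples in your build):
 *
 *   hadoop jar tez-mapreduce-examples.jar \
 *       org.apache.tez.mapreduce.examples.GroupByOrderByMRRTest <in> <out>
 *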
*/
public class GroupByOrderByMRRTest {
private static final Log LOG = LogFactory.getLog(GroupByOrderByMRRTest.class);

/**
 * Mapper takes a single line of input containing an employee name and a
 * department name, and emits the department name with a count of 1.
 */
public static class MyMapper
extends Mapper<Object, Text, Text, IntWritable> {
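// Output key/value objects are reused across map() calls to avoid a
// per-record allocation.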
private static final IntWritable one = new IntWritable(1);
private static final Text word = new Text();
@Override
public void map(Object key, Text value, Context context)
throws IOException, InterruptedException {
StringTokenizer itr = new StringTokenizer(value.toString());
String empName = "";
String deptName = "";
if (itr.hasMoreTokens()) {
empName = itr.nextToken();
if (itr.hasMoreTokens()) {
deptName = itr.nextToken();
}
if (!empName.isEmpty()
&& !deptName.isEmpty()) {
word.set(deptName);
context.write(word, one);
}
}
}
}

/**
 * Intermediate reducer aggregates the total count per department. It
 * takes a department name and its per-record counts as input, and emits
 * the total count as the key with the department name as the value, so
 * that the shuffle into the final stage sorts by count.
 */
public static class MyGroupByReducer
extends Reducer<Text, IntWritable, IntWritable, Text> {
private IntWritable result = new IntWritable();
@Override
public void reduce(Text key, Iterable<IntWritable> values,
Context context) throws IOException, InterruptedException {
int sum = 0;
for (IntWritable val : values) {
sum += val.get();
}
result.set(sum);
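// Swap key and value: the count becomes the key so that the next
// shuffle sorts departments by employee count.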
context.write(result, key);
}
}

/**
 * The shuffle into this stage already orders records by employee count,
 * so the final reducer is a pass-through: it simply emits each
 * department name with its employee count.
 */
public static class MyOrderByNoOpReducer
extends Reducer<IntWritable, Text, Text, IntWritable> {
@Override
public void reduce(IntWritable key, Iterable<Text> values,
Context context) throws IOException, InterruptedException {
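// Swap back to (deptName, count) for the final output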
for (Text word : values) {
context.write(word, key);
}
}
}

public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
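// Lay out the job as MRR: a map stage, one intermediate reduce stage
// (the GROUP BY), and a final reduce stage (the ORDER BY).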
// Configure intermediate reduces
conf.setInt(MRJobConfig.MRR_INTERMEDIATE_STAGES, 1);
// Set reducer class for intermediate reduce
conf.setClass(MultiStageMRConfigUtil.getPropertyNameForIntermediateStage(1,
"mapreduce.job.reduce.class"), MyGroupByReducer.class, Reducer.class);
// Set reducer output key class
conf.setClass(MultiStageMRConfigUtil.getPropertyNameForIntermediateStage(1,
"mapreduce.map.output.key.class"), IntWritable.class, Object.class);
// Set reducer output value class
conf.setClass(MultiStageMRConfigUtil.getPropertyNameForIntermediateStage(1,
"mapreduce.map.output.value.class"), Text.class, Object.class);
conf.setInt(MultiStageMRConfigUtil.getPropertyNameForIntermediateStage(1,
"mapreduce.job.reduces"), 2);
String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
if (otherArgs.length != 2) {
System.err.println("Usage: groupbyorderbymrrtest <in> <out>");
System.exit(2);
}
@SuppressWarnings("deprecation")
Job job = new Job(conf, "groupbyorderbymrrtest");
job.setJarByClass(GroupByOrderByMRRTest.class);
// Configure map
job.setMapperClass(MyMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
// Configure reduce
job.setReducerClass(MyOrderByNoOpReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
job.setNumReduceTasks(1);
FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
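// Submit through the MapReduce client; Tez runs the MRR job as a DAG.
// The YARN ApplicationId derived from the JobID is used to attach a
// DAGClient for monitoring.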
TezClient tezClient = new TezClient(new TezConfiguration(conf));
job.submit();
JobID jobId = job.getJobID();
ApplicationId appId = TypeConverter.toYarn(jobId).getAppId();
DAGClient dagClient = tezClient.getDAGClient(appId);
DAGStatus dagStatus;
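// Vertex names of the three stages, used when printing DAG status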
String[] vNames = { "initialmap", "ireduce1", "finalreduce" };
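// Poll until the DAG reaches RUNNING or a terminal state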
while (true) {
dagStatus = dagClient.getDAGStatus(null);
if (dagStatus.getState() == DAGStatus.State.RUNNING ||
dagStatus.getState() == DAGStatus.State.SUCCEEDED ||
dagStatus.getState() == DAGStatus.State.FAILED ||
dagStatus.getState() == DAGStatus.State.KILLED ||
dagStatus.getState() == DAGStatus.State.ERROR) {
break;
}
try {
Thread.sleep(500);
} catch (InterruptedException e) {
// ignore the interrupt and poll again
}
}
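// While the DAG is running, periodically print per-vertex progress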
while (dagStatus.getState() == DAGStatus.State.RUNNING) {
try {
ExampleDriver.printDAGStatus(dagClient, vNames);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
// ignore the interrupt and poll again
}
dagStatus = dagClient.getDAGStatus(null);
} catch (TezException e) {
LOG.fatal("Failed to get application progress. Exiting");
System.exit(-1);
}
}
ExampleDriver.printDAGStatus(dagClient, vNames);
LOG.info("Application completed. " + "FinalState=" + dagStatus.getState());
System.exit(dagStatus.getState() == DAGStatus.State.SUCCEEDED ? 0 : 1);
}
}