/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.tez.examples;

import java.io.IOException;
import java.util.HashSet;
import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.tez.client.TezClient;
import org.apache.tez.dag.api.DAG;
import org.apache.tez.dag.api.Edge;
import org.apache.tez.dag.api.EdgeProperty;
import org.apache.tez.dag.api.ProcessorDescriptor;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.TezException;
import org.apache.tez.dag.api.Vertex;
import org.apache.tez.dag.api.client.DAGClient;
import org.apache.tez.dag.api.client.DAGStatus;
import org.apache.tez.mapreduce.input.MRInput;
import org.apache.tez.mapreduce.output.MROutput;
import org.apache.tez.mapreduce.processor.SimpleMRProcessor;
import org.apache.tez.runtime.api.LogicalInput;
import org.apache.tez.runtime.api.LogicalOutput;
import org.apache.tez.runtime.api.ProcessorContext;
import org.apache.tez.runtime.api.Reader;
import org.apache.tez.runtime.library.api.KeyValueReader;
import org.apache.tez.runtime.library.api.KeyValueWriter;
import org.apache.tez.runtime.library.conf.UnorderedKVEdgeConfig;
import org.apache.tez.runtime.library.conf.UnorderedPartitionedKVEdgeConfig;
import org.apache.tez.runtime.library.partitioner.HashPartitioner;
import org.apache.tez.runtime.library.processor.SimpleProcessor;
import com.google.common.base.Preconditions;

/**
* Simple example of joining 2 data sets.
* <br>The example shows a vertex with multiple inputs that represent the two
* data sets that need to be joined.
* <br>The join can be performed using a broadcast (or replicate-fragment) join in
* which the small side of the join is broadcast in total to fragments of the
* larger side. Each fragment of the larger side can perform the join operation
* independently using the full data of the smaller side. This shows the usage
* of the broadcast edge property in Tez.
* <br>The join can be performed using the regular repartition join where both
* sides are partitioned according to the same scheme into the same number of
* fragments. Then the keys in the same fragment are joined with each other. This
* is the default join strategy.
*
*/
public class JoinExample extends Configured implements Tool {
private static final Log LOG = LogFactory.getLog(JoinExample.class);
private static final String broadcastOption = "doBroadcast";
private static final String streamingSide = "streamingSide";
private static final String hashSide = "hashSide";
private static final String inputFile = "inputFile";
private static final String joiner = "joiner";
private static final String joinOutput = "joinOutput";
public static void main(String[] args) throws Exception {
JoinExample job = new JoinExample();
int status = ToolRunner.run(new Configuration(), job, args);
System.exit(status);
}
private static void printUsage() {
System.err.println("Usage: " + "joinexample <file1> <file2> <numPartitions> <outPath> ["
+ broadcastOption + "(default false)]");
ToolRunner.printGenericCommandUsage(System.err);
}
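  // Example invocation (jar name and paths below are illustrative, not part of
  // this example):
  //   yarn jar tez-examples.jar joinexample /in/stream /in/hash 2 /out/join doBroadcast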
@Override
public int run(String[] args) throws Exception {
Configuration conf = getConf();
String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
int result = validateArgs(otherArgs);
if (result != 0) {
return result;
}
return execute(otherArgs);
}
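  // Overload that reuses an externally managed TezClient (for example, a shared
  // session in tests) instead of creating and stopping a client per invocation.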
public int run(Configuration conf, String[] args, TezClient tezClient) throws Exception {
setConf(conf);
String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
int result = validateArgs(otherArgs);
if (result != 0) {
return result;
}
return execute(otherArgs, tezClient);
}
private int validateArgs(String[] otherArgs) {
if (!(otherArgs.length == 4 || otherArgs.length == 5)) {
printUsage();
return 2;
}
return 0;
}
private int execute(String[] args) throws TezException, IOException, InterruptedException {
TezConfiguration tezConf = new TezConfiguration(getConf());
TezClient tezClient = null;
try {
tezClient = createTezClient(tezConf);
return execute(args, tezConf, tezClient);
} finally {
if (tezClient != null) {
tezClient.stop();
}
}
}
private int execute(String[] args, TezClient tezClient) throws IOException, TezException,
InterruptedException {
TezConfiguration tezConf = new TezConfiguration(getConf());
return execute(args, tezConf, tezClient);
}
private TezClient createTezClient(TezConfiguration tezConf) throws TezException, IOException {
TezClient tezClient = TezClient.create("JoinExample", tezConf);
tezClient.start();
return tezClient;
}
private int execute(String[] args, TezConfiguration tezConf, TezClient tezClient)
throws IOException, TezException, InterruptedException {
    boolean doBroadcast = args.length == 5 && args[4].equals(broadcastOption);
LOG.info("Running JoinExample" + (doBroadcast ? "-WithBroadcast" : ""));
UserGroupInformation.setConfiguration(tezConf);
String streamInputDir = args[0];
String hashInputDir = args[1];
int numPartitions = Integer.parseInt(args[2]);
String outputDir = args[3];
Path streamInputPath = new Path(streamInputDir);
Path hashInputPath = new Path(hashInputDir);
Path outputPath = new Path(outputDir);
    // Fail fast if the output path already exists
FileSystem fs = FileSystem.get(tezConf);
if (fs.exists(outputPath)) {
System.err.println("Output directory: " + outputDir + " already exists");
return 3;
}
if (numPartitions <= 0) {
System.err.println("NumPartitions must be > 0");
return 4;
}
DAG dag = createDag(tezConf, streamInputPath, hashInputPath, outputPath, numPartitions, doBroadcast);
tezClient.waitTillReady();
DAGClient dagClient = tezClient.submitDAG(dag);
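    // Blocks until the DAG completes, printing periodic progress; the null argument
    // means no extra StatusGetOpts (e.g. counters) are fetched with each update.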
DAGStatus dagStatus = dagClient.waitForCompletionWithStatusUpdates(null);
if (dagStatus.getState() != DAGStatus.State.SUCCEEDED) {
LOG.info("DAG diagnostics: " + dagStatus.getDiagnostics());
return -1;
}
return 0;
}
private DAG createDag(TezConfiguration tezConf, Path streamPath, Path hashPath, Path outPath,
int numPartitions, boolean doBroadcast) throws IOException {
DAG dag = DAG.create("JoinExample" + (doBroadcast ? "-WithBroadcast" : ""));
/**
* This vertex represents the side of the join that will be accumulated in a hash
* table in order to join it against the other side. It reads text data using the
* TextInputFormat. ForwardingProcessor simply forwards the data downstream as is.
*/
Vertex hashFileVertex = Vertex.create(hashSide, ProcessorDescriptor.create(
ForwardingProcessor.class.getName())).addDataSource(
inputFile,
MRInput
.createConfigBuilder(new Configuration(tezConf), TextInputFormat.class,
hashPath.toUri().toString()).groupSplits(false).build());
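    // Note: groupSplits(false) disables Tez's grouping of input splits into fewer
    // tasks, so each split runs in its own task and parallelism stays predictable.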
/**
     * This vertex represents the side of the data that will be streamed and joined
* against the other side that has been accumulated into a hash table. It reads
* text data using the TextInputFormat. ForwardingProcessor simply forwards the data
* downstream as is.
*/
Vertex streamFileVertex = Vertex.create(streamingSide, ProcessorDescriptor.create(
ForwardingProcessor.class.getName())).addDataSource(
inputFile,
MRInput
.createConfigBuilder(new Configuration(tezConf), TextInputFormat.class,
streamPath.toUri().toString()).groupSplits(false).build());
/**
* This vertex represents the join operation. It writes the join output as text using
     * the TextOutputFormat. The JoinProcessor performs the join of the streaming
     * side and the hash side. The work is load balanced across numPartitions tasks.
*/
Vertex joinVertex = Vertex.create(joiner, ProcessorDescriptor.create(
JoinProcessor.class.getName()), numPartitions).addDataSink(joinOutput,
MROutput.createConfigBuilder(new Configuration(tezConf),
TextOutputFormat.class, outPath.toUri().toString()).build());
/**
* The streamed side will be partitioned into fragments with the same keys going to
* the same fragments using hash partitioning. The data to be joined is the key itself
* and so the value is null. The number of fragments is initially inferred from the
* number of tasks running in the join vertex because each task will be handling one
* fragment.
*/
UnorderedPartitionedKVEdgeConfig streamConf =
UnorderedPartitionedKVEdgeConfig
.newBuilder(Text.class.getName(), NullWritable.class.getName(),
HashPartitioner.class.getName()).setFromConfiguration(tezConf).build();
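    /**
     * setFromConfiguration(tezConf) above picks up relevant tez.runtime.* settings
     * from the configuration, so edge behavior can be tuned without code changes.
     */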
/**
* Connect the join vertex with the stream side
*/
Edge e1 = Edge.create(streamFileVertex, joinVertex, streamConf.createDefaultEdgeProperty());
EdgeProperty hashSideEdgeProperty = null;
if (doBroadcast) {
/**
       * This option can be used when the hash side is small. We can broadcast the
       * entire data to all fragments of the stream side. This avoids re-partitioning
       * the fragments of the stream side to match the partitioning scheme of the hash
       * side and avoids costly network data transfer. However, in this example the
       * stream side is partitioned in both cases for brevity of code. Each join task
       * can then join its fragment of keys with all the keys of the hash side.
       * An unpartitioned edge is used to transfer the complete output of the hash side
       * so it can be broadcast to all fragments of the streamed side. Again, since the
       * data is the key, the value is null.
*/
UnorderedKVEdgeConfig broadcastConf = UnorderedKVEdgeConfig.newBuilder(Text.class.getName(),
NullWritable.class.getName()).setFromConfiguration(tezConf).build();
hashSideEdgeProperty = broadcastConf.createDefaultBroadcastEdgeProperty();
} else {
/**
* The hash side is also being partitioned into fragments with the same key going to the same
* fragment using hash partitioning. This way all keys with the same hash value will go to the
* same fragment from both sides. Thus the join task handling that fragment can join both data
* set fragments.
*/
hashSideEdgeProperty = streamConf.createDefaultEdgeProperty();
}
/**
* Connect the join vertex to the hash side.
* The join vertex is connected with 2 upstream vertices that provide it with inputs
*/
Edge e2 = Edge.create(hashFileVertex, joinVertex, hashSideEdgeProperty);
/**
* Connect everything up by adding them to the DAG
*/
dag.addVertex(streamFileVertex).addVertex(hashFileVertex).addVertex(joinVertex)
.addEdge(e1).addEdge(e2);
return dag;
}
/**
* Reads key-values from the source and forwards the value as the key for the output
*/
public static class ForwardingProcessor extends SimpleProcessor {
public ForwardingProcessor(ProcessorContext context) {
super(context);
}
@Override
public void run() throws Exception {
Preconditions.checkState(getInputs().size() == 1);
Preconditions.checkState(getOutputs().size() == 1);
// not looking up inputs and outputs by name because there is just one
// instance and this processor is used in many different DAGs with
// different names for inputs and outputs
LogicalInput input = getInputs().values().iterator().next();
Reader rawReader = input.getReader();
Preconditions.checkState(rawReader instanceof KeyValueReader);
LogicalOutput output = getOutputs().values().iterator().next();
KeyValueReader reader = (KeyValueReader) rawReader;
KeyValueWriter writer = (KeyValueWriter) output.getWriter();
while (reader.next()) {
Object val = reader.getCurrentValue();
// The data value itself is the join key. Simply write it out as the key.
// The output value is null.
writer.write(val, NullWritable.get());
}
}
}
public static class JoinProcessor extends SimpleMRProcessor {
public JoinProcessor(ProcessorContext context) {
super(context);
}
@Override
public void run() throws Exception {
Preconditions.checkState(getInputs().size() == 2);
Preconditions.checkState(getOutputs().size() == 1);
// Get the input data for the 2 sides of the join from the 2 inputs
LogicalInput streamInput = getInputs().get(streamingSide);
LogicalInput hashInput = getInputs().get(hashSide);
Reader rawStreamReader = streamInput.getReader();
Reader rawHashReader = hashInput.getReader();
Preconditions.checkState(rawStreamReader instanceof KeyValueReader);
Preconditions.checkState(rawHashReader instanceof KeyValueReader);
LogicalOutput lo = getOutputs().get(joinOutput);
Preconditions.checkState(lo.getWriter() instanceof KeyValueWriter);
KeyValueWriter writer = (KeyValueWriter) lo.getWriter();
      // Build a hash set of the keys from the hash side to join against
KeyValueReader hashKvReader = (KeyValueReader) rawHashReader;
Set<Text> keySet = new HashSet<Text>();
while (hashKvReader.next()) {
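        // Copy the key: the reader may reuse the same underlying Text object
        // across next() calls.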
keySet.add(new Text((Text) hashKvReader.getCurrentKey()));
}
// read the stream side and join it using the hash table
KeyValueReader streamKvReader = (KeyValueReader) rawStreamReader;
while (streamKvReader.next()) {
Text key = (Text) streamKvReader.getCurrentKey();
if (keySet.contains(key)) {
writer.write(key, NullWritable.get());
}
}
}
}
}