blob: a925137f97f3627bc883d73400e9442d2e6234b3 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.tez.examples;
import org.apache.tez.common.Preconditions;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.ToolRunner;
import org.apache.tez.client.TezClient;
import org.apache.tez.dag.api.DAG;
import org.apache.tez.dag.api.Edge;
import org.apache.tez.dag.api.EdgeManagerPluginDescriptor;
import org.apache.tez.dag.api.EdgeProperty;
import org.apache.tez.dag.api.ProcessorDescriptor;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.UserPayload;
import org.apache.tez.dag.api.Vertex;
import org.apache.tez.dag.api.VertexManagerPluginDescriptor;
import org.apache.tez.mapreduce.input.MRInput;
import org.apache.tez.mapreduce.output.MROutput;
import org.apache.tez.mapreduce.processor.SimpleMRProcessor;
import org.apache.tez.runtime.api.ProcessorContext;
import org.apache.tez.runtime.library.api.KeyValueReader;
import org.apache.tez.runtime.library.api.KeyValueWriter;
import org.apache.tez.runtime.library.api.Partitioner;
import org.apache.tez.runtime.library.cartesianproduct.CartesianProductConfig;
import org.apache.tez.runtime.library.cartesianproduct.CartesianProductEdgeManager;
import org.apache.tez.runtime.library.cartesianproduct.CartesianProductVertexManager;
import org.apache.tez.runtime.library.conf.UnorderedKVEdgeConfig;
import org.apache.tez.runtime.library.conf.UnorderedPartitionedKVEdgeConfig;
import org.apache.tez.runtime.library.processor.SimpleProcessor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.StringTokenizer;
/**
* This DAG does cartesian product of two text inputs and then filters results according to the
* third text input.
*
* V1 V2 V3
* \ | /
* CP\ CP| / Broadcast
* \ | /
* Vertex 4
*
* Vertex 1~3 are tokenizers and each of them tokenizes input from one directory. In partitioned
* case, CustomPartitioner separates tokens into 2 partitions according to the parity of token's
* first char. Vertex 4 does cartesian product of input from vertex1 and vertex2, and generates
* KV pairs where keys are vertex 1 tokens and values are vertex 2 tokens. Then vertex 4 outputs KV
* pairs whose keys appears in vertex 3 tokens.
*/
public class CartesianProduct extends TezExampleBase {
private static final String INPUT = "Input1";
private static final String OUTPUT = "Output";
private static final String VERTEX1 = "Vertex1";
private static final String VERTEX2 = "Vertex2";
private static final String VERTEX3 = "Vertex3";
private static final String VERTEX4 = "Vertex4";
private static final String PARTITIONED = "-partitioned";
private static final String UNPARTITIONED = "-unpartitioned";
private static final Logger LOG = LoggerFactory.getLogger(CartesianProduct.class);
private static final int numPartition = 2;
private static final String[] cpSources = new String[] {VERTEX1, VERTEX2};
public static class TokenProcessor extends SimpleProcessor {
public TokenProcessor(ProcessorContext context) {
super(context);
}
@Override
public void run() throws Exception {
Preconditions.checkArgument(getInputs().size() == 1);
Preconditions.checkArgument(getOutputs().size() == 1);
KeyValueReader kvReader = (KeyValueReader) getInputs().get(INPUT).getReader();
KeyValueWriter kvWriter = (KeyValueWriter) getOutputs().get(VERTEX4).getWriter();
while (kvReader.next()) {
StringTokenizer itr = new StringTokenizer(kvReader.getCurrentValue().toString());
while (itr.hasMoreTokens()) {
kvWriter.write(new Text(itr.nextToken()), new IntWritable(1));
}
}
}
}
public static class JoinProcessor extends SimpleMRProcessor {
public JoinProcessor(ProcessorContext context) {
super(context);
}
@Override
public void run() throws Exception {
KeyValueWriter kvWriter = (KeyValueWriter) getOutputs().get(OUTPUT).getWriter();
KeyValueReader kvReader1 = (KeyValueReader) getInputs().get(VERTEX1).getReader();
KeyValueReader kvReader2 = (KeyValueReader) getInputs().get(VERTEX2).getReader();
KeyValueReader kvReader3 = (KeyValueReader) getInputs().get(VERTEX3).getReader();
Set<String> v2TokenSet = new HashSet<>();
Set<String> v3TokenSet = new HashSet<>();
while (kvReader2.next()) {
v2TokenSet.add(kvReader2.getCurrentKey().toString());
}
while (kvReader3.next()) {
v3TokenSet.add(kvReader3.getCurrentKey().toString());
}
while (kvReader1.next()) {
String left = kvReader1.getCurrentKey().toString();
if (v3TokenSet.contains(left)) {
for (String right : v2TokenSet) {
kvWriter.write(left, right);
}
}
}
}
}
public static class CustomPartitioner implements Partitioner {
@Override
public int getPartition(Object key, Object value, int numPartitions) {
return key.toString().charAt(0) % numPartition;
}
}
private DAG createDAG(TezConfiguration tezConf, String inputPath1, String inputPath2,
String inputPath3, String outputPath, boolean isPartitioned)
throws IOException {
Vertex v1 = Vertex.create(VERTEX1, ProcessorDescriptor.create(TokenProcessor.class.getName()));
// turn off groupSplit so that each input file incurs one task
v1.addDataSource(INPUT,
MRInput.createConfigBuilder(new Configuration(tezConf), TextInputFormat.class, inputPath1)
.groupSplits(false).build());
Vertex v2 = Vertex.create(VERTEX2, ProcessorDescriptor.create(TokenProcessor.class.getName()));
v2.addDataSource(INPUT,
MRInput.createConfigBuilder(new Configuration(tezConf), TextInputFormat.class, inputPath2)
.groupSplits(false).build());
Vertex v3 = Vertex.create(VERTEX3, ProcessorDescriptor.create(TokenProcessor.class.getName()));
v3.addDataSource(INPUT,
MRInput.createConfigBuilder(new Configuration(tezConf), TextInputFormat.class, inputPath3)
.groupSplits(false).build());
CartesianProductConfig cartesianProductConfig;
if (isPartitioned) {
Map<String, Integer> vertexPartitionMap = new HashMap<>();
for (String vertex : cpSources) {
vertexPartitionMap.put(vertex, numPartition);
}
cartesianProductConfig = new CartesianProductConfig(vertexPartitionMap);
} else {
cartesianProductConfig = new CartesianProductConfig(Arrays.asList(cpSources));
}
UserPayload userPayload = cartesianProductConfig.toUserPayload(tezConf);
Vertex v4 = Vertex.create(VERTEX4, ProcessorDescriptor.create(JoinProcessor.class.getName()));
v4.addDataSink(OUTPUT,
MROutput.createConfigBuilder(new Configuration(tezConf), TextOutputFormat.class, outputPath)
.build());
v4.setVertexManagerPlugin(
VertexManagerPluginDescriptor.create(CartesianProductVertexManager.class.getName())
.setUserPayload(userPayload));
EdgeManagerPluginDescriptor cpEdgeManager =
EdgeManagerPluginDescriptor.create(CartesianProductEdgeManager.class.getName());
cpEdgeManager.setUserPayload(userPayload);
EdgeProperty cpEdgeProperty;
if (isPartitioned) {
UnorderedPartitionedKVEdgeConfig cpEdgeConf =
UnorderedPartitionedKVEdgeConfig.newBuilder(Text.class.getName(),
IntWritable.class.getName(), CustomPartitioner.class.getName()).build();
cpEdgeProperty = cpEdgeConf.createDefaultCustomEdgeProperty(cpEdgeManager);
} else {
UnorderedKVEdgeConfig edgeConf =
UnorderedKVEdgeConfig.newBuilder(Text.class.getName(), IntWritable.class.getName()).build();
cpEdgeProperty = edgeConf.createDefaultCustomEdgeProperty(cpEdgeManager);
}
EdgeProperty broadcastEdgeProperty;
UnorderedKVEdgeConfig broadcastEdgeConf =
UnorderedKVEdgeConfig.newBuilder(Text.class.getName(), IntWritable.class.getName()).build();
broadcastEdgeProperty = broadcastEdgeConf.createDefaultBroadcastEdgeProperty();
return DAG.create("CartesianProduct")
.addVertex(v1).addVertex(v2).addVertex(v3).addVertex(v4)
.addEdge(Edge.create(v1, v4, cpEdgeProperty))
.addEdge(Edge.create(v2, v4, cpEdgeProperty))
.addEdge(Edge.create(v3, v4, broadcastEdgeProperty));
}
@Override
protected void printUsage() {
System.err.println("Usage: args: ["+PARTITIONED + "|" + UNPARTITIONED
+ " <input_dir1> <input_dir2> <input_dir3> <output_dir>");
}
@Override
protected int validateArgs(String[] otherArgs) {
return (otherArgs.length != 5 || (!otherArgs[0].equals(PARTITIONED)
&& !otherArgs[0].equals(UNPARTITIONED))) ? -1 : 0;
}
@Override
protected int runJob(String[] args, TezConfiguration tezConf,
TezClient tezClient) throws Exception {
DAG dag = createDAG(tezConf, args[1], args[2],
args[3], args[4], args[0].equals(PARTITIONED));
return runDag(dag, isCountersLog(), LOG);
}
public static void main(String[] args) throws Exception {
int res = ToolRunner.run(new Configuration(), new CartesianProduct(), args);
System.exit(res);
}
}