blob: 03bb0519a07a7ecfe01fec392b57c1fe21a029ee [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.tez.mapreduce.examples;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Random;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.util.ToolRunner;
import org.apache.tez.client.TezClient;
import org.apache.tez.dag.api.DAG;
import org.apache.tez.dag.api.Edge;
import org.apache.tez.dag.api.ProcessorDescriptor;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.TezException;
import org.apache.tez.dag.api.UserPayload;
import org.apache.tez.dag.api.Vertex;
import org.apache.tez.examples.TezExampleBase;
import org.apache.tez.runtime.api.LogicalOutput;
import org.apache.tez.runtime.api.ProcessorContext;
import org.apache.tez.runtime.library.api.KeyValueReader;
import org.apache.tez.runtime.library.api.KeyValueWriter;
import org.apache.tez.runtime.library.conf.UnorderedKVEdgeConfig;
import org.apache.tez.runtime.library.output.UnorderedKVOutput;
import org.apache.tez.runtime.library.processor.SimpleProcessor;
import org.apache.tez.common.Preconditions;
public class BroadcastLoadGen extends TezExampleBase {
/** Class-wide logger, named after this class so its output is attributed to BroadcastLoadGen. */
private static final Logger LOG = LoggerFactory.getLogger(BroadcastLoadGen.class);
/**
 * Processor that emits random integers to its single output.
 *
 * <p>The amount of data to produce is taken from the user payload: the int stored at
 * offset 0 of the payload buffer is interpreted as a byte budget.
 */
public static class InputGenProcessor extends SimpleProcessor {

  /** Byte budget read from the user payload at construction time. */
  final int bytesToGenerate;

  /**
   * @param context processor context whose user payload carries the byte budget as an int
   *                at offset 0
   */
  public InputGenProcessor(ProcessorContext context) {
    super(context);
    ByteBuffer rawPayload = context.getUserPayload().getPayload();
    bytesToGenerate = rawPayload.getInt(0);
  }

  /**
   * Writes (NullWritable, random IntWritable) pairs to the only configured output.
   * Nothing is written when that output is not an {@link UnorderedKVOutput}.
   *
   * @throws IllegalArgumentException presumably, via the precondition check, when the
   *         processor does not have exactly one output
   */
  @Override
  public void run() throws Exception {
    Preconditions.checkArgument(getOutputs().size() == 1);
    LogicalOutput soleOutput = getOutputs().values().iterator().next();
    if (!(soleOutput instanceof UnorderedKVOutput)) {
      return;
    }

    KeyValueWriter writer = ((UnorderedKVOutput) soleOutput).getWriter();
    Random rng = new Random();
    // NOTE(review): the divisor 6 looks like an estimated per-record size in bytes — confirm.
    int remaining = bytesToGenerate / 6;
    while (remaining > 0) {
      writer.write(NullWritable.get(), new IntWritable(rng.nextInt()));
      remaining--;
    }
  }
}
/**
 * Processor that drains its single key-value input, counting the records and summing
 * their IntWritable values, then reports both figures on standard error.
 */
public static class InputFetchProcessor extends SimpleProcessor {

  /**
   * @param context processor context handed to the base class
   */
  public InputFetchProcessor(ProcessorContext context) {
    super(context);
  }

  /**
   * Reads every record from the only input and prints the task index, the record count
   * and the sum of the values.
   */
  @Override
  public void run() throws Exception {
    Preconditions.checkArgument(inputs.size() == 1);
    KeyValueReader reader =
        (KeyValueReader) getInputs().values().iterator().next().getReader();

    long total = 0;
    int records = 0;
    for (; reader.next(); records++) {
      IntWritable current = (IntWritable) reader.getCurrentValue();
      total += current.get();
    }

    StringBuilder report = new StringBuilder("Count = ");
    report.append(getContext().getTaskIndex())
        .append(" * ")
        .append(records)
        .append(", Sum=")
        .append(total);
    System.err.println(report.toString());
  }
}
/**
 * Builds the two-vertex DAG: a "DataGen" vertex running {@link InputGenProcessor} and a
 * "FetchVertex" vertex running {@link InputFetchProcessor}, joined by a broadcast edge
 * carrying NullWritable keys and IntWritable values.
 *
 * @param numGenTasks         number of generator tasks; must be positive because the total
 *                            data size is divided by it
 * @param totalSourceDataSize total number of bytes to generate across all generator tasks
 * @param numFetcherTasks     number of tasks in the fetching vertex
 * @return the assembled DAG
 * @throws IllegalArgumentException if {@code numGenTasks} is not positive
 */
private DAG createDAG(int numGenTasks, int totalSourceDataSize, int numFetcherTasks) {
  // Fail with a descriptive message instead of an ArithmeticException from the division below.
  if (numGenTasks <= 0) {
    throw new IllegalArgumentException(
        "numGenTasks must be greater than 0, but was " + numGenTasks);
  }

  // Integer division: any remainder of totalSourceDataSize is dropped.
  int bytesPerSource = totalSourceDataSize / numGenTasks;
  LOG.info("DataPerSourceTask(bytes)=" + bytesPerSource);

  // The generator reads this value back with getInt(0), so write it at absolute offset 0.
  ByteBuffer payload = ByteBuffer.allocate(4);
  payload.putInt(0, bytesPerSource);

  Vertex broadcastVertex = Vertex.create("DataGen",
      ProcessorDescriptor.create(InputGenProcessor.class.getName())
          .setUserPayload(UserPayload.create(payload)), numGenTasks);
  Vertex fetchVertex = Vertex.create("FetchVertex",
      ProcessorDescriptor.create(InputFetchProcessor.class.getName()), numFetcherTasks);

  UnorderedKVEdgeConfig edgeConf = UnorderedKVEdgeConfig
      .newBuilder(NullWritable.class.getName(), IntWritable.class.getName())
      .setCompression(false, null, null)
      .build();

  DAG dag = DAG.create("BroadcastLoadGen");
  dag.addVertex(broadcastVertex).addVertex(fetchVertex).addEdge(
      Edge.create(broadcastVertex, fetchVertex, edgeConf.createDefaultBroadcastEdgeProperty()));
  return dag;
}
/**
 * Parses the three positional arguments, builds the load-generation DAG and runs it.
 *
 * @param args      {@code [numSourceTasks, totalSourceData, numFetcherTasks]}, each parsed
 *                  with {@link Integer#parseInt(String)}
 * @param tezConf   Tez configuration supplied by the caller (not read directly here)
 * @param tezClient Tez client supplied by the caller (not read directly here)
 * @return the value returned by {@code runDag}
 * @throws NumberFormatException if any argument is not a valid integer
 */
@Override
protected final int runJob(String[] args, TezConfiguration tezConf, TezClient tezClient) throws
    TezException, InterruptedException, IOException {
  // Keep a separator between the class name and the arguments so the line stays readable.
  LOG.info("Running: {} {}", this.getClass().getSimpleName(), StringUtils.join(args, " "));

  int numSourceTasks = Integer.parseInt(args[0]);
  int totalSourceData = Integer.parseInt(args[1]);
  int numFetcherTasks = Integer.parseInt(args[2]);
  LOG.info("Parameters: numSourceTasks={}, totalSourceDataSize(bytes)={}, numFetcherTasks={}",
      numSourceTasks, totalSourceData, numFetcherTasks);

  DAG dag = createDAG(numSourceTasks, totalSourceData, numFetcherTasks);
  return runDag(dag, false, LOG);
}
/**
 * Prints the expected command line for this example, followed by the generic command
 * usage, to standard error.
 */
@Override
protected void printUsage() {
  final String usageLine =
      "Usage: BroadcastLoadGen <num_source_tasks> <total_source_data> <num_destination_tasks>";
  System.err.println(usageLine);
  ToolRunner.printGenericCommandUsage(System.err);
}
/**
 * Checks the positional arguments before the job is started.
 *
 * <p>Exactly three arguments are required, each must parse as an integer, and the first
 * (the source task count) must be positive because createDAG divides the total data
 * size by it.
 *
 * @param otherArgs the positional arguments
 * @return 0 when the arguments are acceptable, 2 otherwise
 */
@Override
protected final int validateArgs(String[] otherArgs) {
  if (otherArgs == null || otherArgs.length != 3) {
    return 2;
  }
  try {
    int numSourceTasks = Integer.parseInt(otherArgs[0]);
    // Parsed only to confirm they are integers; runJob parses them again when it runs.
    Integer.parseInt(otherArgs[1]);
    Integer.parseInt(otherArgs[2]);
    return numSourceTasks > 0 ? 0 : 2;
  } catch (NumberFormatException ignored) {
    // A non-numeric argument is a usage error, reported through the return code.
    return 2;
  }
}
/**
 * Command-line entry point: runs this example through {@link ToolRunner} with a fresh
 * {@link Configuration} and exits the JVM with the returned status code.
 *
 * @param args command-line arguments passed through to the tool
 */
public static void main(String[] args) throws Exception {
  System.exit(ToolRunner.run(new Configuration(), new BroadcastLoadGen(), args));
}
}