blob: d096f32158e14bdaf5c9bedd58e4749887772667 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.tez.examples;
import java.io.DataInputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Random;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.ToolRunner;
import org.apache.tez.client.TezClient;
import org.apache.tez.dag.api.DAG;
import org.apache.tez.dag.api.ProcessorDescriptor;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.UserPayload;
import org.apache.tez.dag.api.Vertex;
import org.apache.tez.common.io.NonSyncByteArrayInputStream;
import org.apache.tez.common.io.NonSyncByteArrayOutputStream;
import org.apache.tez.common.io.NonSyncDataOutputStream;
import org.apache.tez.mapreduce.output.MROutput;
import org.apache.tez.mapreduce.processor.SimpleMRProcessor;
import org.apache.tez.runtime.api.ProcessorContext;
import org.apache.tez.runtime.library.api.KeyValueWriter;
import org.apache.tez.common.Preconditions;
/**
 * Generates the input data sets consumed by the Tez join examples: a large "stream" file, a
 * smaller "hash" file, and a file holding the expected join result (the keys common to both).
 *
 * <p>Usage: {@code joindatagen <outPath1> <path1Size> <outPath2> <path2Size>
 * <expectedResultPath> <numTasks>}. Whichever of the two requested sizes is larger becomes the
 * stream file; the other becomes the hash file.
 */
public class JoinDataGen extends TezExampleBase {

  private static final Logger LOG = LoggerFactory.getLogger(JoinDataGen.class);

  // Logical names of the three data sinks attached to the single generator vertex.
  private static final String STREAM_OUTPUT_NAME = "streamoutput";
  private static final String HASH_OUTPUT_NAME = "hashoutput";
  private static final String EXPECTED_OUTPUT_NAME = "expectedoutput";

  public static void main(String[] args) throws Exception {
    JoinDataGen dataGen = new JoinDataGen();
    int status = ToolRunner.run(new Configuration(), dataGen, args);
    System.exit(status);
  }

  @Override
  protected void printUsage() {
    System.err
        .println("Usage: "
            + "joindatagen <outPath1> <path1Size> <outPath2> <path2Size> <expectedResultPath> <numTasks>");
    ToolRunner.printGenericCommandUsage(System.err);
  }

  /**
   * Parses the CLI arguments, validates the output locations, builds the generator DAG and runs
   * it on the supplied client.
   *
   * @param args positional arguments: outPath1, path1Size, outPath2, path2Size,
   *        expectedResultPath, numTasks (length already validated by {@link #validateArgs})
   * @return 0 on success, 3 if any output path already exists, 4 for an invalid task count,
   *         otherwise the exit status of the DAG execution
   * @throws Exception on DAG submission/execution failure
   */
  @Override
  protected int runJob(String[] args, TezConfiguration tezConf,
      TezClient tezClient) throws Exception {

    LOG.info("Running JoinDataGen");

    String outDir1 = args[0];
    long outDir1Size = Long.parseLong(args[1]);
    String outDir2 = args[2];
    long outDir2Size = Long.parseLong(args[3]);
    String expectedOutputDir = args[4];
    int numTasks = Integer.parseInt(args[5]);

    // The larger requested output becomes the stream side of the join, the smaller the hash side.
    Path largeOutPath = null;
    Path smallOutPath = null;
    long largeOutSize = 0;
    long smallOutSize = 0;

    if (outDir1Size >= outDir2Size) {
      largeOutPath = new Path(outDir1);
      largeOutSize = outDir1Size;
      smallOutPath = new Path(outDir2);
      smallOutSize = outDir2Size;
    } else {
      largeOutPath = new Path(outDir2);
      largeOutSize = outDir2Size;
      smallOutPath = new Path(outDir1);
      smallOutSize = outDir1Size;
    }

    Path expectedOutputPath = new Path(expectedOutputDir);

    // Refuse to run if any of the three output directories already exists.
    int res = 0;
    res = checkOutputDirectory(tezConf, largeOutPath)
        + checkOutputDirectory(tezConf, smallOutPath)
        + checkOutputDirectory(tezConf, expectedOutputPath);
    if (res != 0) {
      return 3;
    }

    if (numTasks <= 0) {
      System.err.println("NumTasks must be > 0");
      return 4;
    }

    DAG dag = createDag(tezConf, largeOutPath, smallOutPath, expectedOutputPath, numTasks,
        largeOutSize, smallOutSize);

    return runDag(dag, isCountersLog(), LOG);
  }

  @Override
  protected int validateArgs(String[] otherArgs) {
    if (otherArgs.length != 6) {
      return 2;
    }
    return 0;
  }

  /**
   * Builds a single-vertex DAG whose tasks each generate a proportional share of the requested
   * output sizes; the per-task targets are passed to the processor via its user payload.
   *
   * @param numTasks degree of parallelism for the generator vertex (must be > 0)
   */
  private DAG createDag(TezConfiguration tezConf, Path largeOutPath, Path smallOutPath,
      Path expectedOutputPath, int numTasks, long largeOutSize, long smallOutSize)
      throws IOException {

    // Each task generates an equal slice of the total requested bytes.
    long largeOutSizePerTask = largeOutSize / numTasks;
    long smallOutSizePerTask = smallOutSize / numTasks;

    DAG dag = DAG.create("JoinDataGen");

    Vertex genDataVertex = Vertex.create("datagen", ProcessorDescriptor.create(
        GenDataProcessor.class.getName()).setUserPayload(
        UserPayload.create(ByteBuffer.wrap(GenDataProcessor.createConfiguration(largeOutSizePerTask,
            smallOutSizePerTask)))), numTasks);
    genDataVertex.addDataSink(STREAM_OUTPUT_NAME,
        MROutput.createConfigBuilder(new Configuration(tezConf),
            TextOutputFormat.class, largeOutPath.toUri().toString()).build());
    genDataVertex.addDataSink(HASH_OUTPUT_NAME,
        MROutput.createConfigBuilder(new Configuration(tezConf),
            TextOutputFormat.class, smallOutPath.toUri().toString()).build());
    genDataVertex.addDataSink(EXPECTED_OUTPUT_NAME,
        MROutput.createConfigBuilder(new Configuration(tezConf),
            TextOutputFormat.class, expectedOutputPath.toUri().toString()).build());
    dag.addVertex(genDataVertex);

    return dag;
  }

  /**
   * Processor that writes random keys to the stream sink until its per-task byte budget is met,
   * mirroring a fraction of those keys (plus, when needed, extra non-overlapping keys) into the
   * hash sink, and recording every shared key in the expected-result sink.
   */
  public static class GenDataProcessor extends SimpleMRProcessor {

    private static final Logger LOG = LoggerFactory.getLogger(GenDataProcessor.class);

    // Per-task target sizes in bytes, decoded from the vertex user payload in initialize().
    long streamOutputFileSize;
    long hashOutputFileSize;
    // Approximate fraction of stream-file keys that also appear in the hash file.
    float overlapApprox = 0.2f;
    // Single RNG reused for every generated key instead of allocating one per call.
    private final Random random = new Random();

    public GenDataProcessor(ProcessorContext context) {
      super(context);
    }

    /**
     * Serializes the per-task target sizes into a byte array suitable for use as the vertex
     * user payload; decoded by {@link #initialize()}.
     */
    public static byte[] createConfiguration(long streamOutputFileSize, long hashOutputFileSize)
        throws IOException {
      NonSyncByteArrayOutputStream bos = new NonSyncByteArrayOutputStream();
      NonSyncDataOutputStream dos = new NonSyncDataOutputStream(bos);
      dos.writeLong(streamOutputFileSize);
      dos.writeLong(hashOutputFileSize);
      dos.close();
      bos.close();
      return bos.toByteArray();
    }

    /** Decodes the two per-task size targets written by {@link #createConfiguration}. */
    @Override
    public void initialize() throws Exception {
      byte[] payload = getContext().getUserPayload().deepCopyAsArray();
      NonSyncByteArrayInputStream bis = new NonSyncByteArrayInputStream(payload);
      DataInputStream dis = new DataInputStream(bis);
      streamOutputFileSize = dis.readLong();
      hashOutputFileSize = dis.readLong();
      LOG.info("Initialized with largeFileTargetSize={}, smallFileTargetSize={}",
          streamOutputFileSize, hashOutputFileSize);
      dis.close();
      bis.close();
    }

    @Override
    public void run() throws Exception {
      Preconditions.checkState(getInputs().size() == 0);
      Preconditions.checkState(getOutputs().size() == 3);
      KeyValueWriter streamOutputWriter = (KeyValueWriter) getOutputs().get(STREAM_OUTPUT_NAME)
          .getWriter();
      KeyValueWriter hashOutputWriter = (KeyValueWriter) getOutputs().get(HASH_OUTPUT_NAME)
          .getWriter();
      KeyValueWriter expectedOutputWriter = (KeyValueWriter) getOutputs().get(EXPECTED_OUTPUT_NAME)
          .getWriter();

      // Ratio of the hash file to the stream file; NaN/0 ratios are rejected by the check below.
      float fileSizeFraction = hashOutputFileSize / (float) streamOutputFileSize;
      Preconditions.checkState(fileSizeFraction > 0.0f && fileSizeFraction <= 1.0f);
      int mod = 1;
      int extraKeysMod = 0;
      if (fileSizeFraction > overlapApprox) {
        // Common keys capped by overlap. Additional ones required in the hashFile.
        mod = (int) (1 / overlapApprox);
        extraKeysMod = (int) (1 / (fileSizeFraction - overlapApprox));
      } else {
        // All keys in hashFile must exist in stream file.
        mod = (int) (1 / fileSizeFraction);
      }
      LOG.info("Using mod={}, extraKeysMod={}", mod, extraKeysMod);

      long count = 0;
      long sizeLarge = 0;
      long sizeSmall = 0;
      long numLargeFileKeys = 0;
      long numSmallFileKeys = 0;
      long numExpectedKeys = 0;
      while (sizeLarge < streamOutputFileSize) {
        String str = createOverlapString(13, count);
        Text text = new Text(str);
        int size = text.getLength();
        streamOutputWriter.write(text, NullWritable.get());
        sizeLarge += size;
        numLargeFileKeys++;
        // Every mod-th key is shared: it goes to the hash file and the expected join result.
        if (count % mod == 0) {
          hashOutputWriter.write(text, NullWritable.get());
          sizeSmall += size;
          numSmallFileKeys++;
          expectedOutputWriter.write(text, NullWritable.get());
          numExpectedKeys++;
        }
        // Pad the hash file with keys that can never match the stream file (upper-case space).
        if (extraKeysMod != 0 && count % extraKeysMod == 0) {
          String nStr = createNonOverlapString(13, count);
          Text nText = new Text(nStr);
          hashOutputWriter.write(nText, NullWritable.get());
          sizeSmall += nText.getLength();
          numSmallFileKeys++;
        }
        count++;
      }
      LOG.info("OutputStats: largeFileNumKeys={}, smallFileNumKeys={}, expFileNumKeys={}, "
          + "largeFileSize={}, smallFileSize={}", numLargeFileKeys, numSmallFileKeys,
          numExpectedKeys, sizeLarge, sizeSmall);
    }

    /** Generates a random lower-case key; these may appear in all three outputs. */
    private String createOverlapString(int size, long count) {
      return createRandomString(size, count, 'a');
    }

    /** Generates a random upper-case key, disjoint from the overlap key space by construction. */
    private String createNonOverlapString(int size, long count) {
      return createRandomString(size, count, 'A');
    }

    /**
     * Builds {@code size} random letters starting at {@code base}, suffixed with the task index
     * and the running count so keys are unique across tasks and loop iterations.
     */
    private String createRandomString(int size, long count, char base) {
      StringBuilder sb = new StringBuilder();
      for (int i = 0; i < size; i++) {
        sb.append((char) (base + random.nextInt(26)));
      }
      sb.append("_").append(getContext().getTaskIndex()).append("_").append(count);
      return sb.toString();
    }
  }

  /**
   * Returns 0 if {@code path} does not yet exist on its filesystem, 2 (with a message on stderr)
   * if it does.
   */
  private int checkOutputDirectory(Configuration conf, Path path) throws IOException {
    FileSystem fs = path.getFileSystem(conf);
    path = fs.makeQualified(path);
    if (fs.exists(path)) {
      System.err.println("Output directory: " + path + " already exists");
      return 2;
    }
    return 0;
  }
}