blob: 5953f8f54b1f7db248d22dd1e1a431820e572cd1 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.pig.test.pigmix.datagen;
import java.io.*;
import java.lang.SecurityException;
import java.text.ParseException;
import java.util.*;
import sdsu.algorithms.data.Zipf;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRConfiguration;
import org.apache.pig.tools.cmdline.CmdLineParser;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.util.*;
/**
* A tool to generate data for performance testing.
*/
public class DataGenerator extends Configured implements Tool {
// Column specifications, one per generated column; filled from the remaining
// command line arguments in run() or from the column config file in the mapper.
ColSpec[] colSpecs;
// Seed for the random number generator; -1 means "not specified", in which
// case run() substitutes the current time.
long seed = -1;
// Number of rows to generate (-r); mutually exclusive with inFile.
long numRows = -1;
// Number of mappers (-m); a value <= 0 makes go() generate data locally.
int numMappers = -1;
// Output file (-f).
String outputFile;
// Optional input file (-i) whose lines get the generated columns appended.
String inFile;
// Field separator (-s); defaults to Ctrl-A.
char separator = '\u0001' ;
// Shared source of randomness for all generators of this instance.
Random rand;
// Keys used, in order, for the entries of generated map columns.
private String[] mapkey = { "a", "b", "c", "d", "e", "f", "g", "h", "i",
"j", "k", "l", "m", "n", "o", "p", "q", "r", "s", "t", "u", "v", "w",
"x", "y", "z"};
/**
 * Command line entry point: parses the arguments through {@link ToolRunner}
 * and then generates the data.
 *
 * @param args command line arguments, see {@code usage()}
 * @throws Exception any argument-parsing failure is rethrown wrapped in an
 *         {@link IOException}; generation failures propagate as thrown
 */
public static void main(String[] args) throws Exception {
    final DataGenerator generator = new DataGenerator();
    try {
        ToolRunner.run(new Configuration(), generator, args);
    } catch (Exception cause) {
        throw new IOException(cause);
    }
    generator.go();
}
/**
 * Creates a generator whose random source is seeded with the given value.
 *
 * @param seed seed handed to {@link Random}
 */
protected DataGenerator(long seed) {
    System.out.println("Using seed " + seed);
    this.rand = new Random(seed);
}

/** Creates an unconfigured generator; {@code run} performs the configuration. */
protected DataGenerator() {
}

/**
 * Creates an unconfigured generator.
 *
 * @param args ignored by this constructor
 */
protected DataGenerator(String[] args) {
}
/**
 * Parses the command line and configures this generator.
 *
 * Options: -e seed, -f output file, -r rows, -s separator, -i input file,
 * -m mappers. Every remaining argument is parsed as a column specification.
 * Invalid combinations or unparsable values print the usage text, which
 * terminates by throwing a {@link RuntimeException}.
 *
 * @param args command line arguments
 * @return always 0
 * @throws Exception if the arguments are invalid
 */
public int run(String[] args) throws Exception {
    CmdLineParser opts = new CmdLineParser(args);
    opts.registerOpt('e', "seed", CmdLineParser.ValueExpected.REQUIRED);
    opts.registerOpt('f', "file", CmdLineParser.ValueExpected.REQUIRED);
    opts.registerOpt('r', "rows", CmdLineParser.ValueExpected.REQUIRED);
    opts.registerOpt('s', "separator", CmdLineParser.ValueExpected.REQUIRED);
    opts.registerOpt('i', "input", CmdLineParser.ValueExpected.REQUIRED);
    opts.registerOpt('m', "mappers", CmdLineParser.ValueExpected.OPTIONAL);
    char opt;
    try {
        while ((opt = opts.getNextOpt()) != CmdLineParser.EndOfOpts) {
            switch (opt) {
            case 'e':
                seed = Long.parseLong(opts.getValStr());
                break;
            case 'f':
                outputFile = opts.getValStr();
                break;
            case 'i':
                inFile = opts.getValStr();
                break;
            case 'r':
                numRows = Long.parseLong(opts.getValStr());
                break;
            case 's':
                String sep = opts.getValStr();
                // An empty value would otherwise fail in charAt(0).
                if (sep == null || sep.length() == 0) {
                    System.err.println("Separator must be a single character");
                    usage();
                }
                separator = sep.charAt(0);
                break;
            case 'm':
                // -m is registered with an optional value; a missing value
                // surfaces as a NumberFormatException handled below.
                numMappers = Integer.parseInt(opts.getValStr());
                break;
            default:
                usage();
                break;
            }
        }
    } catch (ParseException pe) {
        System.err.println("Couldn't parse the command line arguments, " +
            pe.getMessage());
        usage();
    } catch (NumberFormatException nfe) {
        // Report malformed numbers like any other bad argument instead of
        // letting the raw exception escape.
        System.err.println("Couldn't parse the command line arguments, " +
            nfe.getMessage());
        usage();
    }
    // Exactly one of -r and -i must be given.
    if (numRows < 1 && inFile == null) usage();
    if (numRows > 0 && inFile != null) usage();
    // Mappers derive their own seeds, so -m and -e are mutually exclusive.
    if (numMappers > 0 && seed != -1) usage();
    if (seed == -1) {
        seed = System.currentTimeMillis();
    }
    String remainders[] = opts.getRemainingArgs();
    colSpecs = new ColSpec[remainders.length];
    for (int i = 0; i < remainders.length; i++) {
        colSpecs[i] = new ColSpec(remainders[i]);
    }
    System.err.println("Using seed " + seed);
    rand = new Random(seed);
    return 0;
}
/**
 * Generates the data, locally when no mappers were requested and through a
 * Hadoop job otherwise, and reports the elapsed time.
 *
 * @throws IOException if generation fails
 */
private void go() throws IOException {
    final long startMillis = System.currentTimeMillis();
    final boolean runLocally = numMappers <= 0;
    if (runLocally) {
        System.out.println("Generate data in local mode.");
        goLocal();
    } else {
        System.out.println("Generate data in hadoop mode.");
        new HadoopRunner().goHadoop();
    }
    final long elapsedSeconds = (System.currentTimeMillis() - startMillis) / 1000;
    System.out.println("Job is successful! It took " + elapsedSeconds + " seconds.");
}
/**
 * Generates the data in the local process and writes it to {@code outputFile}.
 *
 * With {@code numRows > 0} that many rows are generated; otherwise every line
 * of {@code inFile} is copied and the generated columns are appended to it.
 * Failures to open either file are reported on stderr and end the method
 * without an exception.
 *
 * @throws IOException if reading the input file fails
 */
public void goLocal() throws IOException {
    PrintWriter out;
    try {
        out = new PrintWriter(outputFile);
    } catch (FileNotFoundException fnfe) {
        System.err.println("Could not find file " + outputFile +
            ", " + fnfe.getMessage());
        return;
    } catch (SecurityException se) {
        System.err.println("Could not write to file " + outputFile +
            ", " + se.getMessage());
        return;
    }
    BufferedReader in = null;
    try {
        if (inFile != null) {
            try {
                in = new BufferedReader(new FileReader(inFile));
            } catch (FileNotFoundException fnfe) {
                System.err.println("Unable to find input file " + inFile);
                return;
            }
        }
        if (numRows > 0) {
            // numRows is a long; an int counter would wrap around and never
            // reach values above Integer.MAX_VALUE.
            for (long i = 0; i < numRows; i++) {
                writeLine(out);
                out.println();
            }
        } else if (in != null) {
            String line;
            while ((line = in.readLine()) != null) {
                out.print(line);
                writeLine(out);
                out.println();
            }
        }
    } finally {
        // Release both streams on every exit path, including early returns
        // and read failures.
        out.close();
        if (in != null) {
            in.close();
        }
    }
}
/**
 * Writes one value per column specification to {@code out}, separated by
 * {@code separator}. A column is left empty when the random draw falls below
 * its null percentage.
 *
 * @param out destination for the generated fields; no line terminator is written
 */
protected void writeLine(PrintWriter out) {
    int column = 0;
    for (ColSpec spec : colSpecs) {
        if (column++ > 0) {
            out.print(separator);
        }
        // One draw per column decides whether the field is emitted at all.
        final boolean emitValue = rand.nextInt(100) >= spec.pctNull;
        if (emitValue) {
            writeCol(spec, out);
        }
    }
}
private void writeCol(ColSpec colspec, PrintWriter out) {
switch (colspec.datatype) {
case INT:
out.print(colspec.nextInt());
break;
case LONG:
out.print(colspec.nextLong());
break;
case FLOAT:
out.print(colspec.nextFloat());
break;
case DOUBLE:
out.print(colspec.nextDouble());
break;
case STRING:
out.print(colspec.nextString());
break;
case MAP:
int len = rand.nextInt(20) + 6;
for (int k = 0; k < len; k++) {
if (k != 0) out.print('');
out.print(mapkey[k] + '');
out.print(colspec.gen.randomString());
}
break;
case BAG:
int numElements = rand.nextInt(5) + 5;
for (int i = 0; i < numElements; i++) {
if (i != 0) out.print('');
switch(colspec.contained.datatype) {
case INT: out.print("i"); break;
case LONG: out.print("l"); break;
case FLOAT: out.print("f"); break;
case DOUBLE: out.print("d"); break;
case STRING: out.print("s"); break;
case MAP: out.print("m"); break;
case BAG: out.print("b"); break;
default: throw new RuntimeException("should never be here");
}
writeCol(colspec.contained, out);
}
}
}
/**
 * Prints the command line help to stderr and aborts by throwing.
 *
 * @throws RuntimeException always, after the help text has been printed
 */
private void usage() {
    final String[] helpLines = {
        "Usage: datagen -rows numrows [options] colspec ...",
        "\tOptions:",
        "\t-e -seed seed value for random numbers",
        "\t-f -file output file, default is stdout",
        "\t-i -input input file, lines will be read from",
        "\t\tthe file and additional columns appended.",
        "\t\tMutually exclusive with -r.",
        "\t-r -rows number of rows to output",
        "\t-s -separator character, default is ^A",
        "\t-m -number of mappers to run concurrently to generate data. " +
            "If not specified, DataGenerator runs locally. This option can NOT be used with -e.",
        "",
        "\tcolspec: columntype:average_size:cardinality:" + "distribution_type:percent_null",
        "\tcolumntype:",
        "\t\ti = int",
        "\t\tl = long",
        "\t\tf = float",
        "\t\td = double",
        "\t\ts = string",
        "\t\tm = map",
        "\t\tbx = bag of x, where x is a columntype",
        "\tdistribution_type:",
        "\t\tu = uniform",
        "\t\tz = zipf",
    };
    for (int i = 0; i < helpLines.length; i++) {
        System.err.println(helpLines[i]);
    }
    throw new RuntimeException();
}
/** Column data types that can be generated. */
enum Datatype {
    INT,
    LONG,
    FLOAT,
    DOUBLE,
    STRING,
    MAP,
    BAG
}
/** Distributions from which column values can be drawn. */
enum DistributionType {
    UNIFORM,
    ZIPF
}
/**
 * Specification of one generated column, parsed from a string of the form
 * {@code columntype:average_size:cardinality:distribution_type:percent_null},
 * optionally followed by {@code :mapfile}.
 */
protected class ColSpec {
    // The raw specification string this instance was parsed from.
    String arg;
    Datatype datatype;
    DistributionType distype;
    int avgsz;
    int card;
    RandomGenerator gen;
    int pctNull;
    // For bags: specification of the contained element type, otherwise null.
    ColSpec contained;
    // Path of the pre-generated mapping file, or null when none was given.
    String mapfile;
    // Cache from drawn number to the field value it stands for.
    Map<Integer, Object> map;

    /**
     * Parses a column specification. Any format error prints the usage text,
     * which aborts by throwing a {@link RuntimeException}.
     *
     * @param arg the specification string
     */
    public ColSpec(String arg) {
        this.arg = arg;
        String[] parts = arg.split(":");
        if (parts.length != 5 && parts.length != 6) {
            System.err.println("Colspec [" + arg + "] format incorrect");
            usage();
        }
        // An empty type field (e.g. a bare "b" bag prefix) would otherwise
        // fail in charAt(0) below.
        if (parts[0].length() == 0) {
            System.err.println("Colspec [" + arg + "] format incorrect");
            usage();
        }
        switch (parts[0].charAt(0)) {
        case 'i': datatype = Datatype.INT; break;
        case 'l': datatype = Datatype.LONG; break;
        case 'f': datatype = Datatype.FLOAT; break;
        case 'd': datatype = Datatype.DOUBLE; break;
        case 's': datatype = Datatype.STRING; break;
        case 'm': datatype = Datatype.MAP; break;
        case 'b':
            // A bag only wraps the element spec that follows the 'b' prefix;
            // the remaining fields belong to the contained column.
            datatype = Datatype.BAG;
            contained = new ColSpec(arg.substring(1));
            return;
        default:
            System.err.println("Don't know column type " +
                parts[0].charAt(0));
            usage();
            break;
        }
        avgsz = Integer.parseInt(parts[1]);
        card = Integer.parseInt(parts[2]);
        switch (parts[3].charAt(0)) {
        case 'u':
            gen = new UniformRandomGenerator(avgsz, card);
            distype = DistributionType.UNIFORM;
            break;
        case 'z':
            gen = new ZipfRandomGenerator(avgsz, card);
            distype = DistributionType.ZIPF;
            break;
        default:
            System.err.println("Don't know generator type " +
                parts[3].charAt(0));
            usage();
            break;
        }
        pctNull = Integer.parseInt(parts[4]);
        if (pctNull < 0 || pctNull > 100) {
            System.err.println("Percentage null must be between 0-100, "
                + "you gave " + pctNull);
            usage();
        }
        contained = null;
        // if config has 6 columns, the last col is the file name
        // of the mapping file from random number to field value
        if (parts.length == 6) {
            mapfile = parts[5];
            gen.hasMapFile = true;
        }
        map = new HashMap<Integer, Object>();
    }

    /** @return the next int value drawn from this column's generator */
    public int nextInt() {
        return gen.nextInt(map);
    }

    /**
     * @return the next long value; this delegates to the generator's
     *         {@code nextInt}, so results stay within the int range
     */
    public long nextLong() {
        return gen.nextInt(map);
    }

    /** @return the next double value drawn from this column's generator */
    public double nextDouble() {
        return gen.nextDouble(map);
    }

    /** @return the next float value drawn from this column's generator */
    public float nextFloat() {
        return gen.nextFloat(map);
    }

    /** @return the next string value drawn from this column's generator */
    public String nextString() {
        return gen.nextString(map);
    }
}
/**
 * Base class of the value generators. Subclasses choose which number is
 * drawn; this class supplies the random payload values themselves.
 */
abstract class RandomGenerator {
    protected int avgsz;
    // true when a map file from random number to field value is pre-defined
    protected boolean hasMapFile;

    abstract public int nextInt(Map<Integer, Object> map);
    abstract public long nextLong(Map<Integer, Object> map);
    abstract public float nextFloat(Map<Integer, Object> map);
    abstract public double nextDouble(Map<Integer, Object> map);
    abstract public String nextString(Map<Integer, Object> map);

    /**
     * Builds a random string whose length varies around {@code avgsz} by
     * roughly 30%, using characters with codes 65 (inclusive) to 122
     * (exclusive).
     *
     * @return the generated string
     */
    public String randomString() {
        int spread = (int) ((double) avgsz * 0.3);
        final StringBuilder text = new StringBuilder(avgsz + spread);
        if (spread < 1) {
            spread = 1;
        }
        final int length = rand.nextInt(2 * spread) + avgsz - spread;
        int written = 0;
        while (written < length) {
            text.append((char) (rand.nextInt(122 - 65) + 65));
            written++;
        }
        return text.toString();
    }

    /** @return a random float scaled by a random int */
    public float randomFloat() {
        return rand.nextFloat() * rand.nextInt();
    }

    /** @return a random double scaled by a random int */
    public double randomDouble() {
        return rand.nextDouble() * rand.nextInt();
    }
}
/**
 * Generator that draws numbers uniformly from {@code [0, card)} and, for
 * float/double/string columns, maps each number to a value that is generated
 * on first use and then cached.
 */
class UniformRandomGenerator extends RandomGenerator {
    int card;

    /**
     * @param a average size of generated strings
     * @param c cardinality, i.e. the number of distinct values
     */
    public UniformRandomGenerator(int a, int c) {
        avgsz = a;
        card = c;
    }

    public int nextInt(Map<Integer, Object> map) {
        return rand.nextInt(card);
    }

    /**
     * @return a value in {@code [0, card)}; the remainder of a negative
     *         random long is shifted up so the range matches {@code nextInt}
     */
    public long nextLong(Map<Integer, Object> map) {
        long value = rand.nextLong() % card;
        return value < 0 ? value + card : value;
    }

    /**
     * @throws IllegalStateException if a map file is in use and lacks the
     *         drawn number
     */
    public float nextFloat(Map<Integer, Object> map) {
        int seed = rand.nextInt(card);
        Float f = (Float) map.get(seed);
        if (f == null) {
            if (hasMapFile) {
                throw new IllegalStateException("Number " + seed + " is not found in map file");
            }
            f = randomFloat();
            map.put(seed, f);
        }
        return f;
    }

    /**
     * @throws IllegalStateException if a map file is in use and lacks the
     *         drawn number
     */
    public double nextDouble(Map<Integer, Object> map) {
        int seed = rand.nextInt(card);
        Double d = (Double) map.get(seed);
        if (d == null) {
            if (hasMapFile) {
                throw new IllegalStateException("Number " + seed + " is not found in map file");
            }
            d = randomDouble();
            map.put(seed, d);
        }
        return d;
    }

    /**
     * @throws IllegalStateException if a map file is in use and lacks the
     *         drawn number
     */
    public String nextString(Map<Integer, Object> map) {
        int seed = rand.nextInt(card);
        String s = (String) map.get(seed);
        if (s == null) {
            if (hasMapFile) {
                throw new IllegalStateException("Number " + seed + " is not found in map file");
            }
            s = randomString();
            map.put(seed, s);
        }
        return s;
    }
}
/**
 * Generator that draws numbers from a Zipf distribution and, for
 * float/double/string columns, maps each number to a cached value.
 */
class ZipfRandomGenerator extends RandomGenerator {
    Zipf z;

    /**
     * @param a average size of generated strings
     * @param c cardinality passed to the Zipf distribution
     */
    public ZipfRandomGenerator(int a, int c) {
        avgsz = a;
        z = new Zipf(c);
    }

    // The Zipf library returns a random number in [1..cardinality], so we
    // subtract 1 to get [0..cardinality). The number is an integer, but the
    // library hands it back as a double.
    private double next() {
        return z.nextElement() - 1;
    }

    public int nextInt(Map<Integer, Object> map) {
        return (int) next();
    }

    public long nextLong(Map<Integer, Object> map) {
        return (long) next();
    }

    public float nextFloat(Map<Integer, Object> map) {
        final int key = (int) next();
        final Float known = (Float) map.get(key);
        if (known != null) {
            return known;
        }
        if (hasMapFile) {
            throw new IllegalStateException("Number " + key + " is not found in map file");
        }
        final Float fresh = randomFloat();
        map.put(key, fresh);
        return fresh;
    }

    public double nextDouble(Map<Integer, Object> map) {
        final int key = (int) next();
        final Double known = (Double) map.get(key);
        if (known != null) {
            return known;
        }
        if (hasMapFile) {
            throw new IllegalStateException("Number " + key + " is not found in map file");
        }
        final Double fresh = randomDouble();
        map.put(key, fresh);
        return fresh;
    }

    public String nextString(Map<Integer, Object> map) {
        final int key = (int) next();
        final String known = (String) map.get(key);
        if (known != null) {
            return known;
        }
        if (hasMapFile) {
            throw new IllegalStateException("Number " + key + " is not found in map file");
        }
        final String fresh = randomString();
        map.put(key, fresh);
        return fresh;
    }
}
/**
 * Launches the Hadoop job that generates the data. Mapping files, the column
 * config file and, when no input file is given, the per-mapper input files
 * are written into a temporary directory that is removed when the job ends.
 */
class HadoopRunner {
    Random r;
    FileSystem fs;
    Path tmpHome;

    public HadoopRunner() {
        r = new Random();
    }

    /**
     * Configures and runs the data generation job, waiting for completion.
     *
     * @throws IOException if preparing the job fails or the job is unsuccessful
     */
    public void goHadoop() throws IOException {
        // Configuration processed by ToolRunner
        Configuration conf = getConf();
        // Create a JobConf using the processed conf
        JobConf job = new JobConf(conf);
        fs = FileSystem.get(job);
        tmpHome = createTempDir(null);
        boolean succeeded = false;
        try {
            String config = genMapFiles().toUri().getRawPath();
            // set config properties into job conf
            job.set("fieldconfig", config);
            job.set("separator", String.valueOf((int) separator));
            job.setJobName("data-gen");
            job.setNumMapTasks(numMappers);
            job.setNumReduceTasks(0);
            job.setMapperClass(DataGenMapper.class);
            job.setJarByClass(DataGenMapper.class);
            // if inFile is specified, use it as input
            if (inFile != null) {
                FileInputFormat.setInputPaths(job, inFile);
                job.set("hasinput", "true");
            } else {
                job.set("hasinput", "false");
                Path input = genInputFiles();
                FileInputFormat.setInputPaths(job, input);
            }
            FileOutputFormat.setOutputPath(job, new Path(outputFile));
            // Submit the job, then poll for progress until the job is complete
            System.out.println("Submit hadoop job...");
            RunningJob j = JobClient.runJob(job);
            if (!j.isSuccessful()) {
                throw new IOException("Job failed");
            }
            succeeded = true;
        } finally {
            // Clean up on failure as well, so aborted runs leave no temp files.
            removeTempHome(succeeded);
        }
    }

    // Deletes the temporary directory. A cleanup failure is only propagated
    // when the job itself succeeded, so it never hides the original error.
    private void removeTempHome(boolean propagateFailure) throws IOException {
        try {
            if (fs.exists(tmpHome)) {
                fs.delete(tmpHome, true);
            }
        } catch (IOException e) {
            if (propagateFailure) {
                throw e;
            }
            System.err.println("Could not remove temporary directory " + tmpHome +
                ", " + e.getMessage());
        }
    }

    // Creates one input file per mapper, each holding the number of rows
    // that mapper has to generate; returns the directory containing them.
    private Path genInputFiles() throws IOException {
        long avgRows = numRows / numMappers;
        // create a temp directory as mappers input
        Path input = createTempDir(tmpHome);
        System.out.println("Generating input files into " + input.toString());
        long rowsLeft = numRows;
        for (int i = 0; i < numMappers; i++) {
            Object[] tmp = createTempFile(input, false);
            PrintWriter pw = new PrintWriter((OutputStream) tmp[1]);
            try {
                if (i < numMappers - 1) {
                    pw.println(avgRows);
                } else {
                    // last mapper takes all the rows left
                    pw.println(rowsLeft);
                }
            } finally {
                pw.close();
            }
            rowsLeft -= avgRows;
        }
        return input;
    }

    // generate map files for all the fields that need to pre-generate map files
    // return a config file which contains config info for each field, including
    // the path to their map file
    private Path genMapFiles() throws IOException {
        Object[] tmp = createTempFile(tmpHome, false);
        System.out.println("Generating column config file in " + tmp[0].toString());
        PrintWriter pw = new PrintWriter((OutputStream) tmp[1]);
        try {
            for (int i = 0; i < colSpecs.length; i++) {
                DataGenerator.Datatype datatype = colSpecs[i].datatype;
                pw.print(colSpecs[i].arg);
                if (datatype == DataGenerator.Datatype.FLOAT || datatype == DataGenerator.Datatype.DOUBLE ||
                        datatype == DataGenerator.Datatype.STRING) {
                    Path p = genMapFile(colSpecs[i]);
                    pw.print(':');
                    pw.print(p.toUri().getRawPath());
                }
                pw.println();
            }
        } finally {
            pw.close();
        }
        return (Path) tmp[0];
    }

    // generate a map file from random number to field value
    // return the path of the map file
    private Path genMapFile(DataGenerator.ColSpec col) throws IOException {
        // Other types would never produce a value and the uniqueness loop
        // below would spin forever.
        if (col.datatype != DataGenerator.Datatype.DOUBLE
                && col.datatype != DataGenerator.Datatype.FLOAT
                && col.datatype != DataGenerator.Datatype.STRING) {
            throw new IllegalArgumentException("No map file can be generated for column " + col.arg);
        }
        int card = col.card;
        Object[] tmp = createTempFile(tmpHome, false);
        System.out.println("Generating mapping file for column " + col.arg + " into " + tmp[0].toString());
        PrintWriter pw = new PrintWriter((OutputStream) tmp[1]);
        try {
            HashSet<Object> hash = new HashSet<Object>(card);
            for (int i = 0; i < card; i++) {
                pw.print(i);
                pw.print("\t");
                Object next = null;
                // Redraw until the value is distinct from all earlier ones.
                do {
                    if (col.datatype == DataGenerator.Datatype.DOUBLE) {
                        next = col.gen.randomDouble();
                    } else if (col.datatype == DataGenerator.Datatype.FLOAT) {
                        next = col.gen.randomFloat();
                    } else if (col.datatype == DataGenerator.Datatype.STRING) {
                        next = col.gen.randomString();
                    }
                } while (hash.contains(next));
                hash.add(next);
                pw.println(next);
                if ((i > 0 && i % 300000 == 0) || i == card - 1) {
                    // Widen before multiplying: i * 100 overflows int for
                    // large cardinalities.
                    System.out.println("processed " + ((long) i * 100 / card) + "%.");
                    pw.flush();
                }
            }
        } finally {
            pw.close();
        }
        return (Path) tmp[0];
    }

    // Creates a uniquely named temporary directory below parentDir.
    private Path createTempDir(Path parentDir) throws IOException {
        Object[] obj = createTempFile(parentDir, true);
        return (Path) obj[0];
    }

    // Creates a uniquely named temporary file or directory below parentDir
    // (or below <home>/tmp when parentDir is null).
    // Returns a 2-element array: the first element is the Path, the second
    // the OutputStream of the new file (null for directories).
    private Object[] createTempFile(Path parentDir, boolean isDir) throws IOException {
        Path tmp_home = parentDir;
        if (tmp_home == null) {
            tmp_home = new Path(fs.getHomeDirectory(), "tmp");
        }
        if (!fs.exists(tmp_home)) {
            fs.mkdirs(tmp_home);
        }
        int id = r.nextInt();
        Path f = new Path(tmp_home, "tmp" + id);
        while (fs.exists(f)) {
            id = r.nextInt();
            f = new Path(tmp_home, "tmp" + id);
        }
        Object[] result = new Object[2];
        result[0] = f;
        if (!isDir) {
            result[1] = fs.create(f);
        } else {
            fs.mkdirs(f);
        }
        return result;
    }
}
/**
 * Mapper that generates the rows. Without an input file every input record
 * holds the number of rows to generate; with an input file the generated
 * columns are appended to each input line.
 */
public static class DataGenMapper extends MapReduceBase implements Mapper<LongWritable, Text, String, String> {
    private static final long MILLIS_PER_DAY = 24L * 3600L * 1000L;

    private JobConf jobConf;
    private DataGenerator dg;
    private boolean hasInput;

    /**
     * Builds the per-task generator: derives a task-specific seed and loads
     * the column config file and the mapping files named in it.
     *
     * @param jobconf the job configuration
     * @throws RuntimeException if the config or a mapping file cannot be read
     */
    public void configure(JobConf jobconf) {
        this.jobConf = jobconf;
        int id = Integer.parseInt(jobconf.get(MRConfiguration.TASK_PARTITION));
        // Use long arithmetic throughout: the int product overflows for
        // larger task ids, and an int shifted by 48 is really shifted by 16.
        long seed = (System.currentTimeMillis() - 2L * id * MILLIS_PER_DAY) | ((long) id << 48);
        dg = new DataGenerator(seed);
        dg.separator = (char) Integer.parseInt(jobConf.get("separator"));
        hasInput = "true".equals(jobConf.get("hasinput"));
        String config = jobConf.get("fieldconfig");
        try {
            FileSystem fs = FileSystem.get(jobconf);
            dg.colSpecs = loadColSpecs(fs, config);
            // load in mapping files
            for (int i = 0; i < dg.colSpecs.length; i++) {
                if (dg.colSpecs[i].mapfile != null) {
                    loadMapFile(fs, dg.colSpecs[i]);
                }
            }
        } catch (IOException e) {
            throw new RuntimeException("Failed to load config file. " + e, e);
        }
    }

    // Reads the column config file, one column specification per line.
    private DataGenerator.ColSpec[] loadColSpecs(FileSystem fs, String config) throws IOException {
        List<DataGenerator.ColSpec> cols = new ArrayList<DataGenerator.ColSpec>();
        BufferedReader reader = new BufferedReader(new InputStreamReader(fs.open(new Path(config))));
        try {
            String line;
            while ((line = reader.readLine()) != null) {
                cols.add(dg.new ColSpec(line));
            }
        } finally {
            reader.close();
        }
        return cols.toArray(new DataGenerator.ColSpec[0]);
    }

    // Fills the column's number-to-value map from its tab-separated map file.
    private void loadMapFile(FileSystem fs, DataGenerator.ColSpec col) throws IOException {
        BufferedReader reader = new BufferedReader(new InputStreamReader(fs.open(new Path(col.mapfile))));
        try {
            Map<Integer, Object> map = col.map;
            String line;
            while ((line = reader.readLine()) != null) {
                // Limit 2 keeps a trailing empty value, which a plain split
                // would drop and turn into an index error.
                String[] fields = line.split("\t", 2);
                int key = Integer.parseInt(fields[0]);
                String value = fields.length > 1 ? fields[1] : "";
                if (col.datatype == DataGenerator.Datatype.DOUBLE) {
                    map.put(key, Double.parseDouble(value));
                } else if (col.datatype == DataGenerator.Datatype.FLOAT) {
                    map.put(key, Float.parseFloat(value));
                } else {
                    map.put(key, value);
                }
            }
        } finally {
            reader.close();
        }
    }

    /**
     * Emits generated rows with a null key.
     *
     * @param key ignored
     * @param value the number of rows to generate, or the input line to extend
     * @param output collector receiving one value per generated row
     * @param reporter used to report progress every 10000 rows
     * @throws IOException if collecting the output fails
     */
    public void map(LongWritable key, Text value, OutputCollector<String, String> output, Reporter reporter) throws IOException {
        int intialsz = dg.colSpecs.length * 50;
        if (!hasInput) {
            long numRows = Long.parseLong(value.toString().trim());
            dg.numRows = numRows;
            // The counter must be a long to match numRows.
            for (long i = 0; i < numRows; i++) {
                StringWriter str = new StringWriter(intialsz);
                PrintWriter pw = new PrintWriter(str);
                dg.writeLine(pw);
                output.collect(null, str.toString());
                if ((i + 1) % 10000 == 0) {
                    reporter.progress();
                    reporter.setStatus("" + (i + 1) + " tuples generated.");
                }
            }
        } else {
            StringWriter str = new StringWriter(intialsz);
            PrintWriter pw = new PrintWriter(str);
            pw.write(value.toString());
            dg.writeLine(pw);
            output.collect(null, str.toString());
        }
    }
}
}