/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.accumulo.examples.mapreduce;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.examples.cli.ClientOpts;
import org.apache.accumulo.hadoop.mapreduce.AccumuloOutputFormat;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.beust.jcommander.Parameter;
/**
* Generate the *almost* official terasort input data set. (See below.) The user specifies the
* number of rows and the target Accumulo table, and this class runs a map/reduce program to
* generate the data. The format of the data is:
* <ul>
* <li>(10 bytes key) (10 bytes rowid) (78 bytes filler) \r \n
* <li>The keys are random characters from the set ' ' .. '~'.
* <li>The rowid is the right justified row id as an int.
* <li>The filler consists of 7 runs of 10 characters from 'A' to 'Z'.
* </ul>
*
* This TeraSort is slightly modified to allow for variable length keys and values. The rowid
* length is not variable. To generate a terabyte of data the same way TeraSort does, use
* 10000000000 rows, a 10/10 byte key length, and a 78/78 byte value length. Along with the 10 byte
* rowid and \r\n, this gives a 100 byte row * 10000000000 rows = 1 TB. The min/max bounds for the
* key and value length parameters are both inclusive.
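*
* <p>
* A hypothetical example invocation (the Accumulo client connection options inherited from
* ClientOpts are omitted, and the table name is made up):
*
* <pre>
* TeraSortIngest -t terasort --count 1000000 -nk 10 -xk 10 -nv 78 -xv 78 --splits 4
* </pre>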
*/
public class TeraSortIngest {
private static final Logger log = LoggerFactory.getLogger(TeraSortIngest.class);
/**
* An input format that assigns ranges of longs to each mapper.
*/
static class RangeInputFormat extends InputFormat<LongWritable,NullWritable> {
/**
* An input split consisting of a range of numbers.
*/
static class RangeInputSplit extends InputSplit implements Writable {
long firstRow;
long rowCount;
public RangeInputSplit() {}
public RangeInputSplit(long offset, long length) {
firstRow = offset;
rowCount = length;
}
@Override
public long getLength() throws IOException {
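// splits are sized by row count rather than bytes, so no meaningful byte length is reported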
return 0;
}
@Override
public String[] getLocations() throws IOException {
return new String[] {};
}
@Override
public void readFields(DataInput in) throws IOException {
firstRow = WritableUtils.readVLong(in);
rowCount = WritableUtils.readVLong(in);
}
@Override
public void write(DataOutput out) throws IOException {
WritableUtils.writeVLong(out, firstRow);
WritableUtils.writeVLong(out, rowCount);
}
}
/**
* A record reader that will generate a range of numbers.
*/
static class RangeRecordReader extends RecordReader<LongWritable,NullWritable> {
long startRow;
long finishedRows;
long totalRows;
public RangeRecordReader(RangeInputSplit split) {
startRow = split.firstRow;
finishedRows = 0;
totalRows = split.rowCount;
}
@Override
public void close() throws IOException {}
@Override
public float getProgress() throws IOException {
return finishedRows / (float) totalRows;
}
@Override
public LongWritable getCurrentKey() throws IOException, InterruptedException {
return new LongWritable(startRow + finishedRows);
}
@Override
public NullWritable getCurrentValue() throws IOException, InterruptedException {
return NullWritable.get();
}
@Override
public void initialize(InputSplit split, TaskAttemptContext context)
throws IOException, InterruptedException {}
@Override
public boolean nextKeyValue() throws IOException, InterruptedException {
if (finishedRows < totalRows) {
++finishedRows;
return true;
}
return false;
}
}
@Override
public RecordReader<LongWritable,NullWritable> createRecordReader(InputSplit split,
TaskAttemptContext context) throws IOException {
return new RangeRecordReader((RangeInputSplit) split);
}
/**
* Create the desired number of splits, dividing the number of rows between the mappers.
*/
@Override
public List<InputSplit> getSplits(JobContext job) {
long totalRows = job.getConfiguration().getLong(NUMROWS, 0);
int numSplits = job.getConfiguration().getInt(NUMSPLITS, 1);
long rowsPerSplit = totalRows / numSplits;
log.info(
"Generating " + totalRows + " rows using " + numSplits + " maps with step of " + rowsPerSplit);
ArrayList<InputSplit> splits = new ArrayList<>(numSplits);
long currentRow = 0;
for (int split = 0; split < numSplits - 1; ++split) {
splits.add(new RangeInputSplit(currentRow, rowsPerSplit));
currentRow += rowsPerSplit;
}
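// the last split picks up the remainder left over from the integer division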
splits.add(new RangeInputSplit(currentRow, totalRows - currentRow));
log.info("Done Generating.");
return splits;
}
}
private static final String NUMSPLITS = "terasort.overridesplits";
private static final String NUMROWS = "terasort.numrows";
static class RandomGenerator {
private long seed = 0;
private static final long mask32 = (1l << 32) - 1;
/**
* The number of iterations separating the precomputed seeds.
*/
private static final int seedSkip = 128 * 1024 * 1024;
/**
* The precomputed seed values after every seedSkip iterations. There should be enough values so
* that all 2**32 iterations are covered.
*/
private static final long[] seeds = new long[] {0L, 4160749568L, 4026531840L, 3892314112L,
3758096384L, 3623878656L, 3489660928L, 3355443200L, 3221225472L, 3087007744L, 2952790016L,
2818572288L, 2684354560L, 2550136832L, 2415919104L, 2281701376L, 2147483648L, 2013265920L,
1879048192L, 1744830464L, 1610612736L, 1476395008L, 1342177280L, 1207959552L, 1073741824L,
939524096L, 805306368L, 671088640L, 536870912L, 402653184L, 268435456L, 134217728L,};
/**
* Start the random number generator on the given iteration.
*
* @param initialIteration
*          the iteration number to start on
*/
RandomGenerator(long initialIteration) {
int baseIndex = (int) ((initialIteration & mask32) / seedSkip);
seed = seeds[baseIndex];
for (int i = 0; i < initialIteration % seedSkip; ++i) {
next();
}
}
RandomGenerator() {
this(0);
}
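/**
* Advance the generator one step of its 32-bit linear congruential recurrence and return the new
* seed value.
*/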
long next() {
seed = (seed * 3141592621L + 663896637) & mask32;
return seed;
}
}
/**
* The Mapper class that, given a row number, will generate the appropriate output line.
*/
public static class SortGenMapper extends Mapper<LongWritable,NullWritable,Text,Mutation> {
private Text tableName = null;
private int minkeylength = 0;
private int maxkeylength = 0;
private int minvaluelength = 0;
private int maxvaluelength = 0;
private final Text key = new Text();
private final Text value = new Text();
private RandomGenerator rand;
private byte[] keyBytes;
private final String spaces = " ";
private final byte[][] filler = new byte[26][];
{
for (int i = 0; i < 26; ++i) {
filler[i] = new byte[10];
for (int j = 0; j < 10; ++j) {
filler[i][j] = (byte) ('A' + i);
}
}
}
private final Random random = new Random();
/**
* Add a random key to the text.
*/
private void addKey() {
int range = random.nextInt(maxkeylength - minkeylength + 1);
int keylen = range + minkeylength;
int keyceil = keylen + (4 - (keylen % 4));
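// the buffer is rounded up to a whole number of 4-byte groups (one group per random draw);
// only the first keylen bytes are actually used for the key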
keyBytes = new byte[keyceil];
long temp = 0;
for (int i = 0; i < keyceil / 4; i++) {
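// each 32-bit draw is decomposed into four base-95 digits, mapped to the printable
// characters ' ' .. '~'; the least significant digit fills the last byte of the group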
temp = rand.next() / 52;
keyBytes[3 + 4 * i] = (byte) (' ' + (temp % 95));
temp /= 95;
keyBytes[2 + 4 * i] = (byte) (' ' + (temp % 95));
temp /= 95;
keyBytes[1 + 4 * i] = (byte) (' ' + (temp % 95));
temp /= 95;
keyBytes[4 * i] = (byte) (' ' + (temp % 95));
}
key.set(keyBytes, 0, keylen);
}
/**
* Return the rowid as a right justified, space padded, 10 character string.
*/
private String getRowIdString(long rowId) {
StringBuilder paddedRowIdString = new StringBuilder();
String rowid = Long.toString(rowId);
int padSpace = 10 - rowid.length();
if (padSpace > 0) {
paddedRowIdString.append(spaces, 0, padSpace);
}
paddedRowIdString.append(rowid, 0, Math.min(rowid.length(), 10));
return paddedRowIdString.toString();
}
/**
* Add the required filler bytes. In the classic 78 byte value this is 7 blocks of 10 characters
* and 1 block of 8 characters; here the filler length is the randomly chosen value length.
*
* @param rowId
* the current row number
*/
private void addFiller(long rowId) {
int base = (int) ((rowId * 8) % 26);
// derive the value length deterministically from the row's generator state
Random random = new Random(rand.seed);
int range = random.nextInt(maxvaluelength - minvaluelength + 1);
int valuelen = range + minvaluelength;
while (valuelen > 10) {
value.append(filler[(base + valuelen) % 26], 0, 10);
valuelen -= 10;
}
if (valuelen > 0)
value.append(filler[(base + valuelen) % 26], 0, valuelen);
}
@Override
public void map(LongWritable row, NullWritable ignored, Context context)
throws IOException, InterruptedException {
context.setStatus("Entering");
long rowId = row.get();
if (rand == null) {
// we use 3 random numbers per row
rand = new RandomGenerator(rowId * 3);
}
addKey();
value.clear();
addFiller(rowId);
// the rowid is emitted as the column qualifier rather than embedded in the value
Mutation m = new Mutation(key);
m.put("c", // column family
getRowIdString(rowId), // column qual
new Value(value.toString().getBytes(StandardCharsets.UTF_8))); // data
context.setStatus("About to add to accumulo");
context.write(tableName, m);
context.setStatus("Added to accumulo " + key.toString());
}
@Override
public void setup(Context job) {
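// read the cloudgen.* parameters that main() stored in the job configuration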
minkeylength = job.getConfiguration().getInt("cloudgen.minkeylength", 0);
maxkeylength = job.getConfiguration().getInt("cloudgen.maxkeylength", 0);
minvaluelength = job.getConfiguration().getInt("cloudgen.minvaluelength", 0);
maxvaluelength = job.getConfiguration().getInt("cloudgen.maxvaluelength", 0);
tableName = new Text(job.getConfiguration().get("cloudgen.tablename"));
}
}
static class Opts extends ClientOpts {
@Parameter(names = {"-t", "--table"}, required = true, description = "table to use")
String tableName;
@Parameter(names = "--count", description = "number of rows to ingest", required = true)
long numRows;
@Parameter(names = {"-nk", "--minKeySize"}, description = "minimum key size", required = true)
int minKeyLength;
@Parameter(names = {"-xk", "--maxKeySize"}, description = "maximum key size", required = true)
int maxKeyLength;
@Parameter(names = {"-nv", "--minValueSize"}, description = "minimum value size", required = true)
int minValueLength;
@Parameter(names = {"-xv", "--maxValueSize"}, description = "maximum value size", required = true)
int maxValueLength;
@Parameter(names = "--splits", description = "number of input splits (map tasks) to use")
int splits = 0;
}
public static void main(String[] args) throws Exception {
Opts opts = new Opts();
opts.parseArgs(TeraSortIngest.class.getName(), args);
Job job = Job.getInstance(opts.getHadoopConfig());
job.setJobName(TeraSortIngest.class.getName());
job.setJarByClass(TeraSortIngest.class);
job.setInputFormatClass(RangeInputFormat.class);
job.setMapperClass(SortGenMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Mutation.class);
job.setNumReduceTasks(0);
job.setOutputFormatClass(AccumuloOutputFormat.class);
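// mutations are written directly to Accumulo; createTables(true) below creates the target
// table if it does not already exist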
AccumuloOutputFormat.configure().clientProperties(opts.getClientProperties())
.defaultTable(opts.tableName).createTables(true).store(job);
Configuration conf = job.getConfiguration();
conf.setLong(NUMROWS, opts.numRows);
conf.setInt("cloudgen.minkeylength", opts.minKeyLength);
conf.setInt("cloudgen.maxkeylength", opts.maxKeyLength);
conf.setInt("cloudgen.minvaluelength", opts.minValueLength);
conf.setInt("cloudgen.maxvaluelength", opts.maxValueLength);
conf.set("cloudgen.tablename", opts.tableName);
if (opts.splits != 0)
conf.setInt(NUMSPLITS, opts.splits);
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}