// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.kudu.mapreduce.tools;
import static org.apache.kudu.mapreduce.tools.BigLinkedListCommon.COLUMN_CLIENT;
import static org.apache.kudu.mapreduce.tools.BigLinkedListCommon.COLUMN_KEY_ONE;
import static org.apache.kudu.mapreduce.tools.BigLinkedListCommon.COLUMN_KEY_TWO;
import static org.apache.kudu.mapreduce.tools.BigLinkedListCommon.COLUMN_PREV_ONE;
import static org.apache.kudu.mapreduce.tools.BigLinkedListCommon.COLUMN_PREV_TWO;
import static org.apache.kudu.mapreduce.tools.BigLinkedListCommon.COLUMN_ROW_ID;
import static org.apache.kudu.mapreduce.tools.BigLinkedListCommon.COLUMN_UPDATE_COUNT;
import static org.apache.kudu.mapreduce.tools.BigLinkedListCommon.Counts;
import static org.apache.kudu.mapreduce.tools.BigLinkedListCommon.DEFAULT_HEADS_TABLE_NAME;
import static org.apache.kudu.mapreduce.tools.BigLinkedListCommon.DEFAULT_TABLE_NAME;
import static org.apache.kudu.mapreduce.tools.BigLinkedListCommon.HEADS_TABLE_NAME_KEY;
import static org.apache.kudu.mapreduce.tools.BigLinkedListCommon.TABLE_NAME_KEY;
import static org.apache.kudu.mapreduce.tools.BigLinkedListCommon.Xoroshiro128PlusRandom;
import static org.apache.kudu.mapreduce.tools.BigLinkedListCommon.getCreateTableOptions;
import static org.apache.kudu.mapreduce.tools.BigLinkedListCommon.getHeadsTableSchema;
import static org.apache.kudu.mapreduce.tools.BigLinkedListCommon.getTableSchema;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.UUID;
import com.google.common.base.Joiner;
import com.google.common.collect.ImmutableList;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.GnuParser;
import org.apache.commons.cli.HelpFormatter;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.CounterGroup;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.kudu.Schema;
import org.apache.kudu.client.AbstractKuduScannerBuilder;
import org.apache.kudu.client.Bytes;
import org.apache.kudu.client.CreateTableOptions;
import org.apache.kudu.client.KuduClient;
import org.apache.kudu.client.KuduException;
import org.apache.kudu.client.KuduScanner;
import org.apache.kudu.client.KuduSession;
import org.apache.kudu.client.KuduTable;
import org.apache.kudu.client.Operation;
import org.apache.kudu.client.PartialRow;
import org.apache.kudu.client.RowResult;
import org.apache.kudu.client.RowResultIterator;
import org.apache.kudu.client.SessionConfiguration;
import org.apache.kudu.client.Update;
import org.apache.kudu.mapreduce.CommandLineParser;
import org.apache.kudu.mapreduce.KuduTableMapReduceUtil;
import org.apache.kudu.util.Pair;
/**
* <p>
* This is an integration test borrowed from goraci, written by Keith Turner,
* which is in turn inspired by the Accumulo test called continuous ingest (ci).
* The original source code can be found here:
* </p>
* <ul>
* <li>
* <a href="https://github.com/keith-turner/goraci">https://github.com/keith-turner/goraci</a>
* </li>
* <li>
* <a href="https://github.com/enis/goraci/">https://github.com/enis/goraci/</a>
* </li>
* </ul>
*
* <p>
* Apache Accumulo has a simple test suite that verifies that data is not
* lost at scale. This test suite is called continuous ingest. This test runs
* many ingest clients that continually create linked lists containing 25
* million nodes. At some point the clients are stopped and a map reduce job is
* run to ensure no linked list has a hole. A hole indicates data was lost.
* </p>
*
* <p>
* The nodes in the linked list are random. This causes each linked list to
* spread across the table. Therefore if one part of a table loses data, then it
* will be detected by references in another part of the table.
* </p>
*
* <h3>
* THE ANATOMY OF THE TEST
* </h3>
*
* <p>
* Below is a rough sketch of how data is written. For specific details, look at
* the Generator code.
* </p>
* <ol>
* <li>
* Write out 1 million nodes
* </li>
* <li>
* Flush the client
* </li>
* <li>
* Write out 1 million nodes that reference the previous million
* </li>
* <li>
* If this is the 25th set of 1 million nodes, then update the 1st set of 1 million to point to the last
* </li>
* <li>
* Goto 1
* </li>
* </ol>
*
* <p>
* The key is that nodes only reference flushed nodes. Therefore a node should
* never reference a missing node, even if the ingest client is killed at any
* point in time.
* </p>
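*
* <p>
* As a rough pseudocode sketch of that loop (randomBatch, insertBatch, and
* closeCircle are illustrative helpers, not methods of this test; the real
* logic lives in the Generator's mapper):
* </p>
* <pre>{@code
* byte[][] first = null;
* byte[][] prev = null;
* while (moreNodesToWrite()) {
*   byte[][] current = randomBatch(WIDTH); // WIDTH random 16-byte keys
*   insertBatch(current, prev);            // node i references prev[i]
*   flush();                               // only flushed nodes get referenced
*   if (first == null) {
*     first = current;
*   }
*   prev = current;
*   if (nodesWrittenSinceLastWrap() == WIDTH * WRAP) {
*     closeCircle(first, prev);            // point the first batch at the last
*     first = null;
*     prev = null;
*   }
* }
* }</pre>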
*
* <p>
* When running this test suite with Accumulo, there is a script running in
* parallel called the Agitator that randomly and continuously kills server
* processes. The outcome was that many data loss bugs were found in Accumulo
* by doing this. This test suite can also help find bugs that impact uptime
* and stability when run for days or weeks.
* </p>
*
* <p>
* This test suite consists of the following:
* </p>
* <ul>
* <li>
* A few Java programs
* </li>
* <li>
* A little helper script to run the Java programs
* </li>
* <li>
* A maven script to build it.
* </li>
* </ul>
*
* <p>
* When generating data, it's best to have each map task generate a multiple of
* 25 million nodes, because a circular linked list is closed every 25M nodes
* (with the defaults, a width of 1,000,000 times a wrap multiplier of 25).
* Generating a count that is not a multiple of 25M will leave some nodes in the
* linked list unreferenced, and the loss of an unreferenced node cannot be
* detected.
* </p>
*
* <h3>
* Below is a description of the Java programs
* </h3>
*
* <ul>
* <li>
* Generator - A map only job that generates data. As stated previously,
* it's best to generate data in multiples of 25M.
* </li>
* <li>
* Verify - A map reduce job that looks for holes. Look at the counts after running: REFERENCED
* and UNREFERENCED are OK; any UNDEFINED counts are bad. Do not run it at the same
* time as the Generator.
* </li>
* <li>
* Print - A standalone program that prints nodes in the linked list
* </li>
* <li>
* Delete - Disabled. A standalone program that deletes a single node
* </li>
* <li>
* Walker - Disabled. A standalone program that starts following a linked list and emits timing
* info.
* </li>
* </ul>
*
* <h3>
* KUDU-SPECIFIC CHANGES
* </h3>
*
* <ul>
* <li>
* The 16-byte row key is divided into two 8-byte longs since we don't have a "bytes" type in
* Kudu. (Note that the C++ client can store bytes directly in string columns.) Using longs
* enables us to pretty-print human-readable keys that can then be passed back just as easily,
* as illustrated in the sketch below this list.
* </li>
* <li>
* The table can be pre-split when running the Generator. The row keys' first component will be
* spread over the Long.MIN_VALUE - Long.MAX_VALUE keyspace.
* </li>
* <li>
* The Walker and Deleter programs were disabled to save some time but they can be re-enabled
* and then ported to Kudu without too much effort.
* </li>
* </ul>
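*
* <p>
* For illustration, this is how a random 16-byte key round-trips through the two
* long key components; the sketch uses the same Bytes helpers the test itself uses:
* </p>
* <pre>{@code
* byte[] rowKey = new byte[16];
* new Xoroshiro128PlusRandom().nextBytes(rowKey);
* long keyOne = Bytes.getLong(rowKey);    // first 8 bytes
* long keyTwo = Bytes.getLong(rowKey, 8); // last 8 bytes
* // Stored in COLUMN_KEY_ONE / COLUMN_KEY_TWO, and printed as "keyOne,keyTwo",
* // a form that Print and Walker accept back on the command line.
* byte[] roundTrip = new byte[16];
* Bytes.setLong(roundTrip, keyOne);
* Bytes.setLong(roundTrip, keyTwo, 8);
* }</pre>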
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
public class IntegrationTestBigLinkedList extends Configured implements Tool {
private static final byte[] NO_KEY = new byte[1];
/** How many rows to write per map task. This has to be a multiple of 25M. */
private static final String GENERATOR_NUM_ROWS_PER_MAP_KEY
= "IntegrationTestBigLinkedList.generator.num_rows";
private static final String GENERATOR_NUM_MAPPERS_KEY
= "IntegrationTestBigLinkedList.generator.map.tasks";
private static final String GENERATOR_WIDTH_KEY
= "IntegrationTestBigLinkedList.generator.width";
private static final String GENERATOR_WRAP_KEY
= "IntegrationTestBigLinkedList.generator.wrap";
private static final int WIDTH_DEFAULT = 1000000;
private static final int WRAP_DEFAULT = 25;
private static final int ROWKEY_LENGTH = 16;
private String toRun;
private String[] otherArgs;
static class CINode {
String key;
String prev;
String client;
long rowId;
int updateCount;
}
/**
* A Map only job that generates random linked list and stores them.
*/
static class Generator extends Configured implements Tool {
private static final Logger LOG = LoggerFactory.getLogger(Generator.class);
private CommandLineParser parser;
private KuduClient client;
static class GeneratorInputFormat extends InputFormat<BytesWritable,NullWritable> {
static class GeneratorInputSplit extends InputSplit implements Writable {
@Override
public long getLength() throws IOException, InterruptedException {
return 1;
}
@Override
public String[] getLocations() throws IOException, InterruptedException {
return new String[0];
}
@Override
public void readFields(DataInput arg0) throws IOException {
}
@Override
public void write(DataOutput arg0) throws IOException {
}
}
static class GeneratorRecordReader extends RecordReader<BytesWritable,NullWritable> {
private long count;
private long numNodes;
private Xoroshiro128PlusRandom rand;
@Override
public void close() throws IOException {
}
@Override
public BytesWritable getCurrentKey() throws IOException, InterruptedException {
byte[] bytes = new byte[ROWKEY_LENGTH];
rand.nextBytes(bytes);
return new BytesWritable(bytes);
}
@Override
public NullWritable getCurrentValue() throws IOException, InterruptedException {
return NullWritable.get();
}
@Override
public float getProgress() throws IOException, InterruptedException {
return (float)(count / (double)numNodes);
}
@Override
public void initialize(InputSplit arg0, TaskAttemptContext context)
throws IOException, InterruptedException {
numNodes = context.getConfiguration().getLong(GENERATOR_NUM_ROWS_PER_MAP_KEY, 25000000);
// Use a well-seeded per-task generator to avoid the duplicate-key issue described in HBASE-13382.
rand = new Xoroshiro128PlusRandom();
}
@Override
public boolean nextKeyValue() throws IOException, InterruptedException {
return count++ < numNodes;
}
}
@Override
public RecordReader<BytesWritable,NullWritable> createRecordReader(
InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
GeneratorRecordReader rr = new GeneratorRecordReader();
rr.initialize(split, context);
return rr;
}
@Override
public List<InputSplit> getSplits(JobContext job) throws IOException, InterruptedException {
int numMappers = job.getConfiguration().getInt(GENERATOR_NUM_MAPPERS_KEY, 1);
ArrayList<InputSplit> splits = new ArrayList<InputSplit>(numMappers);
for (int i = 0; i < numMappers; i++) {
splits.add(new GeneratorInputSplit());
}
return splits;
}
}
/** Ensures each output file from the previous job is read whole by a single mapper in the current job. */
static class OneFilePerMapperSFIF<K, V> extends SequenceFileInputFormat<K, V> {
@Override
protected boolean isSplitable(JobContext context, Path filename) {
return false;
}
}
/**
* Some ASCII art time:
* [ . . . ] represents one batch of random longs of length WIDTH
*
* _________________________
* | ______ |
* | | ||
* __+_________________+_____ ||
* v v v |||
* first = [ . . . . . . . . . . . ] |||
* ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ |||
* | | | | | | | | | | | |||
* prev = [ . . . . . . . . . . . ] |||
* ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ |||
* | | | | | | | | | | | |||
* current = [ . . . . . . . . . . . ] |||
* |||
* ... |||
* |||
* last = [ . . . . . . . . . . . ] |||
* | | | | | | | | | | |-----|||
* | |--------||
* |___________________________|
*/
static class GeneratorMapper
extends Mapper<BytesWritable, NullWritable, NullWritable, NullWritable> {
private byte[][] first = null;
private byte[][] prev = null;
private byte[][] current = null;
private String id;
private long rowId = 0;
private int i;
private KuduClient client;
private KuduTable table;
private KuduSession session;
private KuduTable headsTable;
private long numNodes;
private long wrap;
private int width;
@Override
protected void setup(Context context) throws KuduException {
id = "Job: " + context.getJobID() + " Task: " + context.getTaskAttemptID();
Configuration conf = context.getConfiguration();
CommandLineParser parser = new CommandLineParser(conf);
client = parser.getClient();
table = client.openTable(getTableName(conf));
headsTable = client.openTable(getHeadsTable(conf));
session = client.newSession();
session.setFlushMode(SessionConfiguration.FlushMode.MANUAL_FLUSH);
session.setMutationBufferSpace(WIDTH_DEFAULT);
session.setIgnoreAllDuplicateRows(true);
this.width = context.getConfiguration().getInt(GENERATOR_WIDTH_KEY, WIDTH_DEFAULT);
current = new byte[this.width][];
int wrapMultiplier = context.getConfiguration().getInt(GENERATOR_WRAP_KEY, WRAP_DEFAULT);
this.wrap = (long)wrapMultiplier * width;
this.numNodes = context.getConfiguration().getLong(
GENERATOR_NUM_ROWS_PER_MAP_KEY, (long)WIDTH_DEFAULT * WRAP_DEFAULT);
if (this.numNodes < this.wrap) {
this.wrap = this.numNodes;
}
}
@Override
protected void cleanup(Context context) throws KuduException {
session.close();
client.shutdown();
}
@Override
protected void map(BytesWritable key, NullWritable value, Context output) throws IOException {
current[i] = new byte[key.getLength()];
System.arraycopy(key.getBytes(), 0, current[i], 0, key.getLength());
if (++i == current.length) {
persist(output, current, false);
i = 0;
// Keep track of the first row so that we can point to it at the end.
if (first == null) {
first = current;
}
prev = current;
current = new byte[this.width][];
rowId += current.length;
output.setStatus("Count " + rowId);
// Check if it's time to wrap up this batch.
if (rowId % wrap == 0) {
// this block of code turns the 1 million linked lists of length 25 into one giant
// circular linked list of 25 million nodes.
circularLeftShift(first);
persist(output, first, true);
Operation insert = headsTable.newInsert();
PartialRow row = insert.getRow();
row.addLong(COLUMN_KEY_ONE, Bytes.getLong(first[0]));
row.addLong(COLUMN_KEY_TWO, Bytes.getLong(first[0], 8));
session.apply(insert);
session.flush();
first = null;
prev = null;
}
}
}
private static <T> void circularLeftShift(T[] first) {
T ez = first[0];
System.arraycopy(first, 1, first, 0, first.length - 1);
first[first.length - 1] = ez;
}
private void persist(Context output, byte[][] data, boolean update) throws KuduException {
for (int i = 0; i < data.length; i++) {
Operation put = update ? table.newUpdate() : table.newInsert();
PartialRow row = put.getRow();
long keyOne = Bytes.getLong(data[i]);
long keyTwo = Bytes.getLong(data[i], 8);
row.addLong(COLUMN_KEY_ONE, keyOne);
row.addLong(COLUMN_KEY_TWO, keyTwo);
// prev is null for the first batch of rows; we'll update it at the end.
if (prev == null) {
row.setNull(COLUMN_PREV_ONE);
row.setNull(COLUMN_PREV_TWO);
} else {
row.addLong(COLUMN_PREV_ONE, Bytes.getLong(prev[i]));
row.addLong(COLUMN_PREV_TWO, Bytes.getLong(prev[i], 8));
}
if (!update) {
// Only set these columns on new inserts; the wrap-up updates leave them untouched.
row.addLong(COLUMN_ROW_ID, rowId + i);
row.addString(COLUMN_CLIENT, id);
row.addInt(COLUMN_UPDATE_COUNT, 0);
}
session.apply(put);
if (i % 1000 == 0) {
// Tickle progress every so often, else the map runner will think we're hung.
output.progress();
}
}
session.flush();
}
}
@Override
public int run(String[] args) throws Exception {
if (args.length < 4) {
System.out.println("Usage : " + Generator.class.getSimpleName() +
" <num mappers> <num nodes per map> <num_tablets> <tmp output dir> [<width> <wrap " +
"multiplier>]");
System.out.println(" where <num nodes per map> should be a multiple of " +
" width*wrap multiplier, 25M by default");
return 0;
}
int numMappers = Integer.parseInt(args[0]);
long numNodes = Long.parseLong(args[1]);
int numTablets = Integer.parseInt(args[2]);
Path tmpOutput = new Path(args[3]);
Integer width = (args.length < 5) ? null : Integer.parseInt(args[4]);
Integer wrapMultiplier = (args.length < 6) ? null : Integer.parseInt(args[5]);
return run(numMappers, numNodes, numTablets, tmpOutput, width, wrapMultiplier);
}
public int run(int numMappers, long numNodes, int numTablets, Path tmpOutput,
Integer width, Integer wrapMultiplier) throws Exception {
parser = new CommandLineParser(getConf());
client = parser.getClient();
try {
int ret = runRandomInputGenerator(numMappers, numNodes, tmpOutput, width, wrapMultiplier);
if (ret > 0) {
return ret;
}
return runGenerator(numMappers, numNodes, numTablets, tmpOutput, width, wrapMultiplier);
} finally {
client.close();
client = null;
}
}
private void createTables(int numTablets) throws Exception {
createSchema(getTableName(getConf()), getTableSchema(), numTablets);
createSchema(getHeadsTable(getConf()), getHeadsTableSchema(), numTablets);
}
private void createSchema(String tableName, Schema schema, int numTablets) throws Exception {
if (client.tableExists(tableName)) {
return;
}
CreateTableOptions builder = getCreateTableOptions(schema,
parser.getNumReplicas(),
numTablets, 1);
client.createTable(tableName, schema, builder);
}
public int runRandomInputGenerator(int numMappers, long numNodes, Path tmpOutput,
Integer width, Integer wrapMultiplier) throws Exception {
LOG.info("Running RandomInputGenerator with numMappers=" + numMappers +
", numNodes=" + numNodes);
Job job = new Job(getConf());
job.setJobName("Random Input Generator");
job.setNumReduceTasks(0);
job.setJarByClass(getClass());
job.setInputFormatClass(GeneratorInputFormat.class);
job.setOutputKeyClass(BytesWritable.class);
job.setOutputValueClass(NullWritable.class);
setJobConf(job, numMappers, numNodes, width, wrapMultiplier);
job.setMapperClass(Mapper.class); //identity mapper
FileOutputFormat.setOutputPath(job, tmpOutput);
job.setOutputFormatClass(SequenceFileOutputFormat.class);
boolean success = job.waitForCompletion(true);
return success ? 0 : 1;
}
public int runGenerator(int numMappers, long numNodes, int numTablets, Path tmpOutput,
Integer width, Integer wrapMultiplier) throws Exception {
LOG.info("Running Generator with numMappers=" + numMappers + ", numNodes=" + numNodes);
createTables(numTablets);
Job job = new Job(getConf());
job.setJobName("Link Generator");
job.setNumReduceTasks(0);
job.setJarByClass(getClass());
FileInputFormat.setInputPaths(job, tmpOutput);
job.setInputFormatClass(org.apache.kudu.mapreduce.tools.IntegrationTestBigLinkedList
.Generator.OneFilePerMapperSFIF.class);
job.setOutputKeyClass(NullWritable.class);
job.setOutputValueClass(NullWritable.class);
setJobConf(job, numMappers, numNodes, width, wrapMultiplier);
job.setMapperClass(GeneratorMapper.class);
job.setOutputFormatClass(NullOutputFormat.class);
job.getConfiguration().setBoolean("mapred.map.tasks.speculative.execution", false);
// If the task fails after flushing at least once, a retry would insert duplicate rows and
// fail again. Better to just have one try.
job.getConfiguration().setInt("mapreduce.map.maxattempts", 1);
// Lack of YARN-445 means we can't auto-jstack on timeout, so disabling the timeout gives
// us a chance to do it manually.
job.getConfiguration().setInt("mapreduce.task.timeout", 0);
KuduTableMapReduceUtil.addDependencyJars(job);
KuduTableMapReduceUtil.addCredentialsToJob(client, job);
boolean success = job.waitForCompletion(true);
return success ? 0 : 1;
}
}
/**
* A Map Reduce job that verifies that the linked lists generated by
* {@link Generator} do not have any holes.
*/
static class Verify extends Configured implements Tool {
private static final Logger LOG = LoggerFactory.getLogger(Verify.class);
private static final BytesWritable DEF = new BytesWritable(NO_KEY);
private static final Joiner COMMA_JOINER = Joiner.on(",");
private static final byte[] rowKey = new byte[ROWKEY_LENGTH];
private static final byte[] prev = new byte[ROWKEY_LENGTH];
private Job job;
public static class VerifyMapper extends Mapper<NullWritable, RowResult,
BytesWritable, BytesWritable> {
private BytesWritable row = new BytesWritable();
private BytesWritable ref = new BytesWritable();
@Override
protected void map(NullWritable key, RowResult value, Mapper.Context context)
throws IOException, InterruptedException {
Bytes.setLong(rowKey, value.getLong(0));
Bytes.setLong(rowKey, value.getLong(1), 8);
row.set(rowKey, 0, rowKey.length);
// Emit that the row is defined
context.write(row, DEF);
if (value.isNull(2)) {
LOG.warn(String.format("Prev is not set for: %s", Bytes.pretty(rowKey)));
} else {
Bytes.setLong(prev, value.getLong(2));
Bytes.setLong(prev, value.getLong(3), 8);
ref.set(prev, 0, prev.length);
// Emit which row is referenced by this row.
context.write(ref, row);
}
}
}
public static class VerifyReducer extends Reducer<BytesWritable,BytesWritable,Text,Text> {
private ArrayList<byte[]> refs = new ArrayList<byte[]>();
@Override
public void reduce(BytesWritable key, Iterable<BytesWritable> values, Context context)
throws IOException, InterruptedException {
int defCount = 0;
refs.clear();
// We only expect two values, a DEF and a reference, but there might be more.
for (BytesWritable type : values) {
if (type.getLength() == DEF.getLength()) {
defCount++;
} else {
byte[] bytes = new byte[type.getLength()];
System.arraycopy(type.getBytes(), 0, bytes, 0, type.getLength());
refs.add(bytes);
}
}
// TODO check for more than one def, should not happen
List<String> refsList = new ArrayList<>(refs.size());
String keyString = null;
if (defCount == 0 || refs.size() != 1) {
for (byte[] ref : refs) {
refsList.add(COMMA_JOINER.join(Bytes.getLong(ref), Bytes.getLong(ref, 8)));
}
keyString = COMMA_JOINER.join(Bytes.getLong(key.getBytes()),
Bytes.getLong(key.getBytes(), 8));
LOG.error("Linked List error: Key = " + keyString + " References = " + refsList);
}
if (defCount == 0 && refs.size() > 0) {
// this is bad, found a node that is referenced but not defined. It must have been
// lost, emit some info about this node for debugging purposes.
context.write(new Text(keyString), new Text(refsList.toString()));
context.getCounter(Counts.UNDEFINED).increment(1);
} else if (defCount > 0 && refs.size() == 0) {
// node is defined but not referenced
context.write(new Text(keyString), new Text("none"));
context.getCounter(Counts.UNREFERENCED).increment(1);
} else {
if (refs.size() > 1) {
if (refsList != null) {
context.write(new Text(keyString), new Text(refsList.toString()));
}
context.getCounter(Counts.EXTRAREFERENCES).increment(refs.size() - 1);
}
// node is defined and referenced
context.getCounter(Counts.REFERENCED).increment(1);
}
}
}
@Override
public int run(String[] args) throws Exception {
if (args.length != 2) {
System.out.println("Usage : " + Verify.class.getSimpleName() +
" <output dir> <num reducers>");
return 0;
}
String outputDir = args[0];
int numReducers = Integer.parseInt(args[1]);
return run(outputDir, numReducers);
}
public int run(String outputDir, int numReducers) throws Exception {
return run(new Path(outputDir), numReducers);
}
public int run(Path outputDir, int numReducers) throws Exception {
LOG.info("Running Verify with outputDir=" + outputDir + ", numReducers=" + numReducers);
job = new Job(getConf());
job.setJobName("Link Verifier");
job.setNumReduceTasks(numReducers);
job.setJarByClass(getClass());
Joiner columnsToQuery = Joiner.on(",");
new KuduTableMapReduceUtil.TableInputFormatConfiguratorWithCommandLineParser(
job, getTableName(getConf()),
columnsToQuery.join(COLUMN_KEY_ONE, COLUMN_KEY_TWO, COLUMN_PREV_ONE, COLUMN_PREV_TWO))
.configure();
job.setMapperClass(VerifyMapper.class);
job.setMapOutputKeyClass(BytesWritable.class);
job.setMapOutputValueClass(BytesWritable.class);
job.getConfiguration().setBoolean("mapred.map.tasks.speculative.execution", false);
job.setReducerClass(VerifyReducer.class);
job.setOutputFormatClass(TextOutputFormat.class);
TextOutputFormat.setOutputPath(job, outputDir);
boolean success = job.waitForCompletion(true);
return success ? 0 : 1;
}
@SuppressWarnings("deprecation")
public boolean verify(long expectedReferenced) throws Exception {
if (job == null) {
throw new IllegalStateException("You should call run() first");
}
Counters counters = job.getCounters();
Counter referenced = counters.findCounter(Counts.REFERENCED);
Counter unreferenced = counters.findCounter(Counts.UNREFERENCED);
Counter undefined = counters.findCounter(Counts.UNDEFINED);
Counter multiref = counters.findCounter(Counts.EXTRAREFERENCES);
boolean success = true;
//assert
if (expectedReferenced != referenced.getValue()) {
LOG.error("Expected referenced count does not match with actual referenced count. " +
"Expected referenced=" + expectedReferenced + ", actual=" + referenced.getValue());
success = false;
}
if (unreferenced.getValue() > 0) {
boolean couldBeMultiRef = (multiref.getValue() == unreferenced.getValue());
LOG.error("Unreferenced nodes were not expected. Unreferenced count=" +
unreferenced.getValue() +
(couldBeMultiRef ? "; could be due to duplicate random numbers" : ""));
success = false;
}
if (undefined.getValue() > 0) {
LOG.error("Found an undefined node. Undefined count=" + undefined.getValue());
success = false;
}
// TODO Add the rows' location on failure.
if (!success) {
//Configuration conf = job.getConfiguration();
//HConnection conn = HConnectionManager.getConnection(conf);
//TableName tableName = getTableName(conf);
CounterGroup g = counters.getGroup("undef");
Iterator<Counter> it = g.iterator();
while (it.hasNext()) {
String keyString = it.next().getName();
//byte[] key = Bytes.toBytes(keyString);
//HRegionLocation loc = conn.relocateRegion(tableName, key);
LOG.error("undefined row " + keyString /*+ ", " + loc*/);
}
g = counters.getGroup("unref");
it = g.iterator();
while (it.hasNext()) {
String keyString = it.next().getName();
//byte[] key = Bytes.toBytes(keyString);
//HRegionLocation loc = conn.relocateRegion(tableName, key);
LOG.error("unreferred row " + keyString /*+ ", " + loc*/);
}
}
return success;
}
}
/**
* Executes Generate and Verify in a loop. Data is not cleaned between runs, so each iteration
* adds more data.
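* For example, three iterations with 2 mappers writing 25M nodes each should leave
* 150M REFERENCED nodes by the final Verify pass.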
*/
static class Loop extends Configured implements Tool {
private static final Logger LOG = LoggerFactory.getLogger(Loop.class);
IntegrationTestBigLinkedList it;
FileSystem fs;
protected void runGenerator(int numMappers, long numNodes,
int numTablets, String outputDir,
Integer width, Integer wrapMultiplier) throws Exception {
Path outputPath = new Path(outputDir);
UUID uuid = UUID.randomUUID(); //create a random UUID.
Path generatorOutput = new Path(outputPath, uuid.toString());
Generator generator = new Generator();
generator.setConf(getConf());
int retCode = generator.run(numMappers, numNodes, numTablets, generatorOutput, width,
wrapMultiplier);
if (retCode > 0) {
throw new RuntimeException("Generator failed with return code: " + retCode);
}
fs.delete(generatorOutput, true);
}
protected void runVerify(String outputDir,
int numReducers,
long expectedNumNodes,
int retries) throws Exception {
// Kudu doesn't fully support snapshot consistency so we might start reading from a node that
// doesn't have all the data. This often happens under "chaos monkey"-type setups.
for (int i = 0; i < retries; i++) {
if (i > 0) {
long sleep = 60 * 1000;
LOG.info("Retrying in " + sleep + "ms");
Thread.sleep(sleep);
}
Path outputPath = new Path(outputDir);
UUID uuid = UUID.randomUUID(); //create a random UUID.
Path iterationOutput = new Path(outputPath, uuid.toString());
Verify verify = new Verify();
verify.setConf(getConf());
int retCode = verify.run(iterationOutput, numReducers);
if (retCode > 0) {
LOG.warn("Verify.run failed with return code: " + retCode);
} else if (!verify.verify(expectedNumNodes)) {
LOG.warn("Verify.verify failed");
} else {
fs.delete(iterationOutput, true);
LOG.info("Verify finished with success. Total nodes=" + expectedNumNodes);
return;
}
}
throw new RuntimeException("Ran out of retries to verify");
}
@Override
public int run(String[] args) throws Exception {
if (args.length < 6) {
System.err.println("Usage: Loop <num iterations> <num mappers> <num nodes per mapper> " +
"<num_tablets> <output dir> <num reducers> [<width> <wrap multiplier>" +
"<start expected nodes> <num_verify_retries>]");
return 1;
}
LOG.info("Running Loop with args:" + Arrays.deepToString(args));
int numIterations = Integer.parseInt(args[0]);
int numMappers = Integer.parseInt(args[1]);
long numNodes = Long.parseLong(args[2]);
int numTablets = Integer.parseInt(args[3]);
String outputDir = args[4];
int numReducers = Integer.parseInt(args[5]);
Integer width = (args.length < 7) ? null : Integer.parseInt(args[6]);
Integer wrapMultiplier = (args.length < 8) ? null : Integer.parseInt(args[7]);
long expectedNumNodes = (args.length < 9) ? 0 : Long.parseLong(args[8]);
int numVerifyRetries = (args.length < 10) ? 3 : Integer.parseInt(args[9]);
if (numIterations < 0) {
numIterations = Integer.MAX_VALUE; // run indefinitely (kind of)
}
fs = FileSystem.get(getConf());
for (int i = 0; i < numIterations; i++) {
LOG.info("Starting iteration = " + i);
runGenerator(numMappers, numNodes, numTablets, outputDir, width, wrapMultiplier);
expectedNumNodes += numMappers * numNodes;
runVerify(outputDir, numReducers, expectedNumNodes, numVerifyRetries);
}
return 0;
}
}
/**
* A stand alone program that prints out portions of a list created by {@link Generator}
*/
private static class Print extends Configured implements Tool {
@Override
public int run(String[] args) throws Exception {
Options options = new Options();
options.addOption("s", "start", true, "start key, only the first component");
options.addOption("e", "end", true, "end key (exclusive), only the first component");
options.addOption("l", "limit", true, "number to print");
GnuParser parser = new GnuParser();
CommandLine cmd = null;
try {
cmd = parser.parse(options, args);
if (cmd.getArgs().length != 0) {
throw new ParseException("Command takes no arguments");
}
} catch (ParseException e) {
System.err.println("Failed to parse command line " + e.getMessage());
System.err.println();
HelpFormatter formatter = new HelpFormatter();
formatter.printHelp(getClass().getSimpleName(), options);
System.exit(-1);
}
CommandLineParser cmdLineParser = new CommandLineParser(getConf());
long timeout = cmdLineParser.getOperationTimeoutMs();
KuduClient client = cmdLineParser.getClient();
KuduTable table = client.openTable(getTableName(getConf()));
KuduScanner.KuduScannerBuilder builder =
client.newScannerBuilder(table)
.scanRequestTimeout(timeout);
if (cmd.hasOption("s")) {
PartialRow row = table.getSchema().newPartialRow();
row.addLong(0, Long.parseLong(cmd.getOptionValue("s")));
builder.lowerBound(row);
}
if (cmd.hasOption("e")) {
PartialRow row = table.getSchema().newPartialRow();
row.addLong(0, Long.parseLong(cmd.getOptionValue("e")));
builder.exclusiveUpperBound(row);
}
int limit = cmd.hasOption("l") ? Integer.parseInt(cmd.getOptionValue("l")) : 100;
int count = 0;
KuduScanner scanner = builder.build();
while (scanner.hasMoreRows() && count < limit) {
RowResultIterator rowResults = scanner.nextRows();
count = printNodesAndGetNewCount(count, limit, rowResults);
}
RowResultIterator rowResults = scanner.close();
printNodesAndGetNewCount(count, limit, rowResults);
client.shutdown();
return 0;
}
private static int printNodesAndGetNewCount(int oldCount, int limit,
RowResultIterator rowResults) {
int newCount = oldCount;
if (rowResults == null) {
return newCount;
}
CINode node = new CINode();
for (RowResult result : rowResults) {
newCount++;
node = getCINode(result, node);
printCINodeString(node);
if (newCount == limit) {
break;
}
}
return newCount;
}
}
/**
* This tool needs to be run separately from the Generator-Verify loop. It can run while the
* other two are running or in between loops.
*
* Each mapper scans a "heads" table and, for each row, follows the circular linked list,
* incrementing each node's update count until it reaches the head of the list again. A
* completed pass leaves every node of a list with the same update count; any mismatch is
* flagged through the BAD_UPDATE_COUNTS counter.
*/
private static class Updater extends Configured implements Tool {
private static final Logger LOG = LoggerFactory.getLogger(Updater.class);
private static final String MAX_LINK_UPDATES_PER_MAPPER = "kudu.updates.per.mapper";
public enum Counts {
// Stats on what we're updating.
UPDATED_LINKS,
UPDATED_NODES,
FIRST_UPDATE,
SECOND_UPDATE,
THIRD_UPDATE,
FOURTH_UPDATE,
MORE_THAN_FOUR_UPDATES,
// Stats on what's broken.
BROKEN_LINKS,
BAD_UPDATE_COUNTS
}
public static class UpdaterMapper extends Mapper<NullWritable, RowResult,
NullWritable, NullWritable> {
private KuduClient client;
private KuduTable table;
private KuduSession session;
/**
* Schema we use when getting rows from the linked list, we only need the reference and
* its update count.
*/
private static final List<String> SCAN_COLUMN_NAMES =
ImmutableList.of(COLUMN_PREV_ONE, COLUMN_PREV_TWO, COLUMN_UPDATE_COUNT, COLUMN_CLIENT);
private int numUpdatesPerMapper;
/**
* Processing each linked list takes minutes, meaning that it's easily possible for our
* scanner to time out. Instead, we gather all the linked list heads that we need and
* process them all at once in the first map invocation.
*/
private List<Pair<Long, Long>> headsCache;
@Override
protected void setup(Context context) throws KuduException {
Configuration conf = context.getConfiguration();
CommandLineParser parser = new CommandLineParser(conf);
client = parser.getClient();
table = client.openTable(getTableName(conf));
session = client.newSession();
numUpdatesPerMapper = conf.getInt(MAX_LINK_UPDATES_PER_MAPPER, 1);
headsCache = new ArrayList<>(numUpdatesPerMapper);
}
@Override
protected void map(NullWritable key, RowResult value, Mapper.Context context)
throws IOException, InterruptedException {
// Add as many heads as we need, then we skip the rest.
do {
if (headsCache.size() < numUpdatesPerMapper) {
value = (RowResult)context.getCurrentValue();
headsCache.add(new Pair<Long, Long>(value.getLong(0), value.getLong(1)));
}
} while (context.nextKeyValue());
// At this point we've exhausted the scanner and hopefully gathered all the linked list
// heads we needed.
LOG.info("Processing " + headsCache.size() +
" linked lists, out of " + numUpdatesPerMapper);
processAllHeads(context);
}
private void processAllHeads(Mapper.Context context) throws IOException {
for (Pair<Long, Long> value : headsCache) {
processHead(value, context);
}
}
private void processHead(Pair<Long, Long> head, Mapper.Context context) throws IOException {
long headKeyOne = head.getFirst();
long headKeyTwo = head.getSecond();
long prevKeyOne = headKeyOne;
long prevKeyTwo = headKeyTwo;
int currentCount = -1;
int newCount = -1;
String client = null;
// Always print this out; it's really useful when debugging.
LOG.info("Head: " + getStringFromKeys(headKeyOne, headKeyTwo));
do {
RowResult prev = nextNode(prevKeyOne, prevKeyTwo);
if (prev == null) {
context.getCounter(Counts.BROKEN_LINKS).increment(1);
LOG.warn(getStringFromKeys(prevKeyOne, prevKeyTwo) + " doesn't exist");
break;
}
// Those columns may be null; let's not break trying to read them.
if (prev.isNull(0) || prev.isNull(1)) {
context.getCounter(Counts.BROKEN_LINKS).increment(1);
LOG.warn(getStringFromKeys(prevKeyOne, prevKeyTwo) + " isn't referencing anywhere");
break;
}
int prevCount = prev.getInt(2);
String prevClient = prev.getString(3);
if (currentCount == -1) {
// The first time through the loop we discover the current count and set the new one.
currentCount = prevCount;
newCount = currentCount + 1;
client = prevClient;
}
if (prevCount != currentCount) {
context.getCounter(Counts.BAD_UPDATE_COUNTS).increment(1);
LOG.warn(getStringFromKeys(prevKeyOne, prevKeyTwo) + " has a wrong updateCount, " +
prevCount + " instead of " + currentCount);
// Game over, there's corruption.
break;
}
if (!prevClient.equals(client)) {
context.getCounter(Counts.BROKEN_LINKS).increment(1);
LOG.warn(getStringFromKeys(prevKeyOne, prevKeyTwo) + " has the wrong client, " +
"bad reference? Bad client= " + prevClient);
break;
}
updateRow(prevKeyOne, prevKeyTwo, newCount);
context.getCounter(Counts.UPDATED_NODES).increment(1);
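// Tickle progress roughly every tenth node (the keys are random), else the
// framework may think the task is hung; mirrors the Generator's progress calls.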
if (prevKeyOne % 10 == 0) {
context.progress();
}
prevKeyOne = prev.getLong(0);
prevKeyTwo = prev.getLong(1);
// Loop until we're back at the head, i.e. until both key components match again.
} while (headKeyOne != prevKeyOne || headKeyTwo != prevKeyTwo);
updateStatCounters(context, newCount);
context.getCounter(Counts.UPDATED_LINKS).increment(1);
}
/**
* Finds the next node in the linked list.
*/
private RowResult nextNode(long prevKeyOne, long prevKeyTwo) throws KuduException {
KuduScanner.KuduScannerBuilder builder = client.newScannerBuilder(table)
.setProjectedColumnNames(SCAN_COLUMN_NAMES);
configureScannerForRandomRead(builder, table, prevKeyOne, prevKeyTwo);
return getOneRowResult(builder.build());
}
private void updateRow(long keyOne, long keyTwo, int newCount) throws IOException {
Update update = table.newUpdate();
PartialRow row = update.getRow();
row.addLong(COLUMN_KEY_ONE, keyOne);
row.addLong(COLUMN_KEY_TWO, keyTwo);
row.addInt(COLUMN_UPDATE_COUNT, newCount);
session.apply(update);
}
/**
* We keep some statistics about the linked list we update so that we can get a feel of
* what's being updated.
*/
private void updateStatCounters(Mapper.Context context, int newCount) {
switch (newCount) {
case -1:
case 0:
// TODO We didn't even get the first node?
break;
case 1:
context.getCounter(Counts.FIRST_UPDATE).increment(1);
break;
case 2:
context.getCounter(Counts.SECOND_UPDATE).increment(1);
break;
case 3:
context.getCounter(Counts.THIRD_UPDATE).increment(1);
break;
case 4:
context.getCounter(Counts.FOURTH_UPDATE).increment(1);
break;
default:
context.getCounter(Counts.MORE_THAN_FOUR_UPDATES).increment(1);
break;
}
}
@Override
protected void cleanup(Context context) throws KuduException {
session.close();
client.shutdown();
}
}
public int run(long maxLinkUpdatesPerMapper) throws Exception {
LOG.info("Running Updater with maxLinkUpdatesPerMapper=" + maxLinkUpdatesPerMapper);
Job job = new Job(getConf());
job.setJobName("Link Updater");
job.setNumReduceTasks(0);
job.setJarByClass(getClass());
Joiner columnsToQuery = Joiner.on(",");
new KuduTableMapReduceUtil.TableInputFormatConfiguratorWithCommandLineParser(
job, getHeadsTable(getConf()),
columnsToQuery.join(COLUMN_KEY_ONE, COLUMN_KEY_TWO))
.configure();
job.setMapperClass(UpdaterMapper.class);
job.setMapOutputKeyClass(BytesWritable.class);
job.setMapOutputValueClass(BytesWritable.class);
job.getConfiguration().setBoolean("mapred.map.tasks.speculative.execution", false);
// If something fails we want to exit ASAP.
job.getConfiguration().setInt("mapreduce.map.maxattempts", 1);
// Lack of YARN-445 means we can't auto-jstack on timeout, so disabling the timeout gives
// us a chance to do it manually.
job.getConfiguration().setInt("mapreduce.task.timeout", 0);
job.getConfiguration().setLong(MAX_LINK_UPDATES_PER_MAPPER, maxLinkUpdatesPerMapper);
job.setOutputKeyClass(NullWritable.class);
job.setOutputValueClass(NullWritable.class);
job.setOutputFormatClass(NullOutputFormat.class);
KuduTableMapReduceUtil.addDependencyJars(job);
boolean success = job.waitForCompletion(true);
Counters counters = job.getCounters();
if (success) {
// Let's not continue looping if we have broken linked lists.
Counter brokenLinks = counters.findCounter(Counts.BROKEN_LINKS);
Counter badUpdates = counters.findCounter(Counts.BAD_UPDATE_COUNTS);
if (brokenLinks.getValue() > 0 || badUpdates.getValue() > 0) {
LOG.error("Corruption was detected, see the job's counters. Ending the update loop.");
success = false;
}
}
return success ? 0 : 1;
}
@Override
public int run(String[] args) throws Exception {
if (args.length < 2) {
System.err.println("Usage: Update <num iterations> <max link updates per mapper>");
System.err.println(" where <num iterations> will be 'infinite' if passed a negative value" +
" or zero");
return 1;
}
LOG.info("Running Loop with args:" + Arrays.deepToString(args));
int numIterations = Integer.parseInt(args[0]);
long maxUpdates = Long.parseLong(args[1]);
if (numIterations <= 0) {
numIterations = Integer.MAX_VALUE;
}
if (maxUpdates < 1) {
maxUpdates = 1;
}
for (int i = 0; i < numIterations; i++) {
LOG.info("Starting iteration = " + i);
int ret = run(maxUpdates);
if (ret != 0) {
LOG.error("Can't continue updating, last run failed.");
return ret;
}
}
return 0;
}
}
/**
* A stand alone program that deletes a single node.
* TODO
*/
/*private static class Delete extends Configured implements Tool {
@Override
public int run(String[] args) throws Exception {
if (args.length != 1) {
System.out.println("Usage : " + Delete.class.getSimpleName() + " <node to delete>");
return 0;
}
byte[] val = Bytes.toBytesBinary(args[0]);
org.apache.hadoop.hbase.client.Delete delete
= new org.apache.hadoop.hbase.client.Delete(val);
HTable table = new HTable(getConf(), getTableName(getConf()));
table.delete(delete);
table.flushCommits();
table.close();
System.out.println("Delete successful");
return 0;
}
}*/
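// Below is a hedged sketch of what a Kudu port of Delete might look like; the usage
// string and key parsing are illustrative assumptions, and it is left disabled like
// the original.
/*private static class Delete extends Configured implements Tool {
@Override
public int run(String[] args) throws Exception {
if (args.length != 1) {
System.out.println("Usage : " + Delete.class.getSimpleName() + " <key1,key2 to delete>");
return 0;
}
String[] keys = args[0].split(",");
if (keys.length != 2) {
System.err.println("The row key must be formatted like key1,key2");
return 1;
}
CommandLineParser parser = new CommandLineParser(getConf());
KuduClient client = parser.getClient();
try {
KuduTable table = client.openTable(getTableName(getConf()));
KuduSession session = client.newSession();
org.apache.kudu.client.Delete delete = table.newDelete();
PartialRow row = delete.getRow();
row.addLong(COLUMN_KEY_ONE, Long.parseLong(keys[0]));
row.addLong(COLUMN_KEY_TWO, Long.parseLong(keys[1]));
session.apply(delete);
session.flush();
session.close();
System.out.println("Delete successful");
} finally {
client.shutdown();
}
return 0;
}
}*/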
/**
* A stand alone program that follows a linked list created by {@link Generator}
* and prints timing info.
*
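* <p>
* Example (with hypothetical key values): {@code Walker 12345,-6789 50} starts at
* node (12345,-6789) and prints at most 50 nodes.
* </p>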
*/
private static class Walker extends Configured implements Tool {
private KuduClient client;
private KuduTable table;
@Override
public int run(String[] args) throws IOException {
if (args.length < 1) {
System.err.println("Usage: Walker <start key> [<num nodes>]");
System.err.println(" where <num nodes> defaults to 100 nodes that will be printed out");
return 1;
}
int maxNumNodes = 100;
if (args.length == 2) {
maxNumNodes = Integer.parseInt(args[1]);
}
System.out.println("Running Walker with args:" + Arrays.deepToString(args));
String[] keys = args[0].split(",");
if (keys.length != 2) {
System.err.println("The row key must be formatted like key1,key2");
return 1;
}
long keyOne = Long.parseLong(keys[0]);
long keyTwo = Long.parseLong(keys[1]);
System.out.println("Walking with " + getStringFromKeys(keyOne, keyTwo));
walk(keyOne, keyTwo, maxNumNodes);
return 0;
}
private void walk(long headKeyOne, long headKeyTwo, int maxNumNodes) throws KuduException {
CommandLineParser parser = new CommandLineParser(getConf());
client = parser.getClient();
table = client.openTable(getTableName(getConf()));
long prevKeyOne = headKeyOne;
long prevKeyTwo = headKeyTwo;
CINode node = new CINode();
int nodesCount = 0;
do {
RowResult rr = nextNode(prevKeyOne, prevKeyTwo);
if (rr == null) {
System.err.println(getStringFromKeys(prevKeyOne, prevKeyTwo) + " doesn't exist!");
break;
}
getCINode(rr, node);
printCINodeString(node);
if (rr.isNull(2) || rr.isNull(3)) {
System.err.println("Last node didn't have a reference, breaking");
break;
}
prevKeyOne = rr.getLong(2);
prevKeyTwo = rr.getLong(3);
nodesCount++;
// Loop until we're back at the head (both key components match) or we hit maxNumNodes.
} while ((headKeyOne != prevKeyOne || headKeyTwo != prevKeyTwo) && (nodesCount <
maxNumNodes));
}
private RowResult nextNode(long keyOne, long keyTwo) throws KuduException {
KuduScanner.KuduScannerBuilder builder = client.newScannerBuilder(table);
configureScannerForRandomRead(builder, table, keyOne, keyTwo);
return getOneRowResult(builder.build());
}
}
private static void configureScannerForRandomRead(AbstractKuduScannerBuilder builder,
KuduTable table,
long keyOne,
long keyTwo) {
PartialRow lowerBound = table.getSchema().newPartialRow();
lowerBound.addLong(0, keyOne);
lowerBound.addLong(1, keyTwo);
builder.lowerBound(lowerBound);
PartialRow upperBound = table.getSchema().newPartialRow();
// Adding 1 since we want a single row, and the upper bound is exclusive.
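// Note: with a composite primary key this exclusive bound technically also admits
// rows (keyOne, x) with x > keyTwo, but the keys are random 64-bit pairs so such a
// first-component collision is vanishingly rare, and getOneRowResult() fails loudly
// if more than one row comes back.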
upperBound.addLong(0, keyOne + 1);
upperBound.addLong(1, keyTwo + 1);
builder.exclusiveUpperBound(upperBound);
}
private static String getTableName(Configuration conf) {
return conf.get(TABLE_NAME_KEY, DEFAULT_TABLE_NAME);
}
private static String getHeadsTable(Configuration conf) {
return conf.get(HEADS_TABLE_NAME_KEY, DEFAULT_HEADS_TABLE_NAME);
}
private static CINode getCINode(RowResult result, CINode node) {
node.key = getStringFromKeys(result.getLong(0), result.getLong(1));
if (result.isNull(2) || result.isNull(3)) {
node.prev = "NO_REFERENCE";
} else {
node.prev = getStringFromKeys(result.getLong(2), result.getLong(3));
}
node.rowId = result.getLong(4); // COLUMN_ROW_ID is written with addLong().
node.client = result.getString(5);
node.updateCount = result.getInt(6);
return node;
}
private static void printCINodeString(CINode node) {
System.out.printf("%s:%s:%012d:%s:%s\n", node.key, node.prev, node.rowId, node.client,
node.updateCount);
}
private static String getStringFromKeys(long key1, long key2) {
return new StringBuilder().append(key1).append(",").append(key2).toString();
}
private static RowResult getOneRowResult(KuduScanner scanner) throws KuduException {
RowResultIterator rowResults;
rowResults = scanner.nextRows();
if (rowResults.getNumRows() == 0) {
return null;
}
if (rowResults.getNumRows() > 1) {
throw new RuntimeException("Received too many rows from scanner " + scanner);
}
return rowResults.next();
}
private void usage() {
System.err.println("Usage: " + this.getClass().getSimpleName() + " COMMAND [COMMAND options]");
System.err.println(" where COMMAND is one of:");
System.err.println("");
System.err.println(" Generator A map only job that generates data.");
System.err.println(" Verify A map reduce job that looks for holes");
System.err.println(" Look at the counts after running");
System.err.println(" REFERENCED and UNREFERENCED are ok");
System.err.println(" any UNDEFINED counts are bad. Do not");
System.err.println(" run at the same time as the Generator.");
System.err.println(" Print A standalone program that prints nodes");
System.err.println(" in the linked list.");
System.err.println(" Loop A program to Loop through Generator and");
System.err.println(" Verify steps");
System.err.println(" Update A program to updade the nodes");
/* System.err.println(" Delete A standalone program that deletes a");
System.err.println(" single node.");*/
System.err.println(" Walker A standalong program that starts ");
System.err.println(" following a linked list");
System.err.println("\t ");
System.err.flush();
}
protected void processOptions(String[] args) {
//get the class, run with the conf
if (args.length < 1) {
usage();
throw new RuntimeException("Incorrect Number of args.");
}
toRun = args[0];
otherArgs = Arrays.copyOfRange(args, 1, args.length);
}
@Override
public int run(String[] args) throws Exception {
Tool tool;
processOptions(args);
if (toRun.equals("Generator")) {
tool = new Generator();
} else if (toRun.equals("Verify")) {
tool = new Verify();
} else if (toRun.equals("Loop")) {
Loop loop = new Loop();
loop.it = this;
tool = loop;
} else if (toRun.equals("Print")) {
tool = new Print();
} else if (toRun.equals("Update")) {
tool = new Updater();
} else if (toRun.equals("Walker")) {
tool = new Walker();
} else {
usage();
throw new RuntimeException("Unknown arg");
}
return ToolRunner.run(getConf(), tool, otherArgs);
}
private static void setJobConf(Job job, int numMappers, long numNodes,
Integer width, Integer wrapMultiplier) {
job.getConfiguration().setInt(GENERATOR_NUM_MAPPERS_KEY, numMappers);
job.getConfiguration().setLong(GENERATOR_NUM_ROWS_PER_MAP_KEY, numNodes);
if (width != null) {
job.getConfiguration().setInt(GENERATOR_WIDTH_KEY, width);
}
if (wrapMultiplier != null) {
job.getConfiguration().setInt(GENERATOR_WRAP_KEY, wrapMultiplier);
}
}
public static void main(String[] args) throws Exception {
int ret = ToolRunner.run(new IntegrationTestBigLinkedList(), args);
System.exit(ret);
}
}