Refactored RowHash and TeraSortIngest (#68)

diff --git a/bin/mapred b/bin/mapred
new file mode 100755
index 0000000..f943b45
--- /dev/null
+++ b/bin/mapred
@@ -0,0 +1,67 @@
+#! /usr/bin/env bash
+
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+bin_dir=$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )
+at_home=$( cd "$( dirname "$bin_dir" )" && pwd )
+
+function print_usage() {
+  cat <<EOF
+
+Usage: mapred <application> {-o test.<prop>=<value>}
+
+Available applications:
+
+    terasort    Run Terasort
+    rowhash     Run RowHash
+EOF
+}
+
+if [ -f "$at_home/conf/env.sh" ]; then
+  . "$at_home"/conf/env.sh
+else
+  . "$at_home"/conf/env.sh.example
+fi
+
+if [ -z "$1" ]; then
+  echo "ERROR: <application> needs to be set"
+  print_usage
+  exit 1
+fi
+
+mr_package="org.apache.accumulo.testing.mapreduce"
+case "$1" in
+  terasort)
+    mr_main="${mr_package}.TeraSortIngest"
+    ;;
+  rowhash)
+    mr_main="${mr_package}.RowHash"
+    ;;
+  *)
+    echo "Unknown application: $1"
+    print_usage
+    exit 1
+esac
+
+export CLASSPATH="$TEST_JAR_PATH:$HADOOP_API_JAR:$HADOOP_RUNTIME_JAR:$CLASSPATH"
+
+if [ ! -z $HADOOP_HOME ]; then
+  export HADOOP_USE_CLIENT_CLASSLOADER=true
+  "$HADOOP_HOME"/bin/yarn jar "$TEST_JAR_PATH" "$mr_main" "$TEST_PROPS" "$ACCUMULO_CLIENT_PROPS" "${@:2}" 
+else
+  echo "Hadoop must be installed and HADOOP_HOME must be set!"
+  exit 1
+fi
diff --git a/conf/accumulo-testing.properties.example b/conf/accumulo-testing.properties.example
index 502bcde..9dbe4e0 100644
--- a/conf/accumulo-testing.properties.example
+++ b/conf/accumulo-testing.properties.example
@@ -119,3 +119,32 @@
 # produce a bulk import file.
 test.ci.bulk.reducers.max=1024
 
+#################
+# MapReduce Tests
+#################
+
+# RowHash test
+# ------------
+# Table containing input data
+test.rowhash.input.table = terasort
+# Table where data will be output to
+test.rowhash.output.table = rowhash
+# Column that is fetched in input table
+test.rowhash.column = c
+
+# TeraSort ingest
+# ---------------
+# Table to ingest into
+test.terasort.table = terasort
+# Number of rows to ingest
+test.terasort.num.rows = 10000
+# Minimum key size
+test.terasort.min.keysize = 10
+# Maximum key size
+test.terasort.max.keysize = 10
+# Minimum value size
+test.terasort.min.valuesize = 78
+# Maximum value size
+test.terasort.max.valuesize = 78
+# Number of table splits
+test.terasort.num.splits = 4
diff --git a/src/main/java/org/apache/accumulo/testing/TestProps.java b/src/main/java/org/apache/accumulo/testing/TestProps.java
index 3f2ca15..49ea718 100644
--- a/src/main/java/org/apache/accumulo/testing/TestProps.java
+++ b/src/main/java/org/apache/accumulo/testing/TestProps.java
@@ -33,6 +33,8 @@
   private static final String CI_SCANNER = CI + "scanner.";
   private static final String CI_VERIFY = CI + "verify.";
   private static final String CI_BULK = CI + "bulk.";
+  public static final String TERASORT = PREFIX + "terasort.";
+  public static final String ROWHASH = PREFIX + "rowhash.";
 
   /** Common properties **/
   // HDFS root path. Should match 'fs.defaultFS' property in Hadoop's core-site.xml
@@ -122,6 +124,20 @@
   public static final String CI_BULK_MAP_NODES = CI_BULK + "map.nodes";
   public static final String CI_BULK_REDUCERS = CI_BULK + "reducers.max";
 
+  /** TeraSort **/
+  public static final String TERASORT_TABLE = TERASORT + "table";
+  public static final String TERASORT_NUM_ROWS = TERASORT + "num.rows";
+  public static final String TERASORT_MIN_KEYSIZE = TERASORT + "min.keysize";
+  public static final String TERASORT_MAX_KEYSIZE = TERASORT + "max.keysize";
+  public static final String TERASORT_MIN_VALUESIZE = TERASORT + "min.valuesize";
+  public static final String TERASORT_MAX_VALUESIZE = TERASORT + "max.valuesize";
+  public static final String TERASORT_NUM_SPLITS = TERASORT + "num.splits";
+
+  /** RowHash **/
+  public static final String ROWHASH_INPUT_TABLE = ROWHASH + "input.table";
+  public static final String ROWHASH_OUTPUT_TABLE = ROWHASH + "output.table";
+  public static final String ROWHASH_COLUMN = ROWHASH + "column";
+
   public static Properties loadFromFile(String propsFilePath) {
     try {
       return loadFromStream(new FileInputStream(propsFilePath));
diff --git a/src/main/java/org/apache/accumulo/testing/mapreduce/RowHash.java b/src/main/java/org/apache/accumulo/testing/mapreduce/RowHash.java
index dca7128..4722799 100644
--- a/src/main/java/org/apache/accumulo/testing/mapreduce/RowHash.java
+++ b/src/main/java/org/apache/accumulo/testing/mapreduce/RowHash.java
@@ -20,6 +20,7 @@
 import java.util.Base64;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.Properties;
 
 import org.apache.accumulo.core.client.IteratorSetting;
 import org.apache.accumulo.core.data.Key;
@@ -27,8 +28,8 @@
 import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.hadoop.mapreduce.AccumuloInputFormat;
 import org.apache.accumulo.hadoop.mapreduce.AccumuloOutputFormat;
-import org.apache.accumulo.testing.cli.ClientOpts;
-import org.apache.hadoop.conf.Configuration;
+import org.apache.accumulo.testing.TestEnv;
+import org.apache.accumulo.testing.TestProps;
 import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.io.MD5Hash;
 import org.apache.hadoop.io.Text;
@@ -37,8 +38,6 @@
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
 
-import com.beust.jcommander.Parameter;
-
 public class RowHash extends Configured implements Tool {
   /**
    * The Mapper class that given a row number, will generate the appropriate output line.
@@ -57,23 +56,16 @@
     public void setup(Context job) {}
   }
 
-  private static class Opts extends ClientOpts {
-    @Parameter(names = "--column", required = true)
-    String column;
-    @Parameter(names = {"-t", "--table"}, required = true, description = "table to use")
-    String tableName;
-  }
-
   @Override
   public int run(String[] args) throws Exception {
+    TestEnv env = new TestEnv(args);
     Job job = Job.getInstance(getConf());
     job.setJobName(this.getClass().getName());
     job.setJarByClass(this.getClass());
-    Opts opts = new Opts();
-    opts.parseArgs(RowHash.class.getName(), args);
     job.setInputFormatClass(AccumuloInputFormat.class);
 
-    String col = opts.column;
+    Properties props = env.getTestProperties();
+    String col = props.getProperty(TestProps.ROWHASH_COLUMN);
     int idx = col.indexOf(":");
     Text cf = new Text(idx < 0 ? col : col.substring(0, idx));
     Text cq = idx < 0 ? null : new Text(col.substring(idx + 1));
@@ -81,12 +73,15 @@
     if (cf.getLength() > 0)
       cols = Collections.singleton(new IteratorSetting.Column(cf, cq));
 
-    AccumuloInputFormat.configure().clientProperties(opts.getClientProps()).table(opts.tableName)
-        .auths(opts.auths).fetchColumns(cols).store(job);
+    String inputTable = props.getProperty(TestProps.ROWHASH_INPUT_TABLE);
+    String outputTable = props.getProperty(TestProps.ROWHASH_OUTPUT_TABLE);
 
-    AccumuloOutputFormat.configure().clientProperties(opts.getClientProps())
-        .defaultTable(opts.tableName).createTables(true).store(job);
+    AccumuloInputFormat.configure().clientProperties(env.getClientProps()).table(inputTable)
+        .fetchColumns(cols).store(job);
+    AccumuloOutputFormat.configure().clientProperties(env.getClientProps())
+        .defaultTable(outputTable).createTables(true).store(job);
 
+    job.getConfiguration().set("mapreduce.job.classloader", "true");
     job.setMapperClass(HashDataMapper.class);
     job.setMapOutputKeyClass(Text.class);
     job.setMapOutputValueClass(Mutation.class);
@@ -100,6 +95,7 @@
   }
 
   public static void main(String[] args) throws Exception {
-    ToolRunner.run(new Configuration(), new RowHash(), args);
+    TestEnv env = new TestEnv(args);
+    ToolRunner.run(env.getHadoopConfiguration(), new RowHash(), args);
   }
 }
diff --git a/src/main/java/org/apache/accumulo/testing/mapreduce/TeraSortIngest.java b/src/main/java/org/apache/accumulo/testing/mapreduce/TeraSortIngest.java
index cd47e1e..ee2b5d0 100644
--- a/src/main/java/org/apache/accumulo/testing/mapreduce/TeraSortIngest.java
+++ b/src/main/java/org/apache/accumulo/testing/mapreduce/TeraSortIngest.java
@@ -23,12 +23,15 @@
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Objects;
+import java.util.Properties;
 import java.util.Random;
 
 import org.apache.accumulo.core.data.Mutation;
 import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.hadoop.mapreduce.AccumuloOutputFormat;
-import org.apache.accumulo.testing.cli.ClientOpts;
+import org.apache.accumulo.testing.TestEnv;
+import org.apache.accumulo.testing.TestProps;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.io.LongWritable;
@@ -46,8 +49,6 @@
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
 
-import com.beust.jcommander.Parameter;
-
 /**
  * Generate the *almost* official terasort input data set. (See below) The user specifies the number
  * of rows and the output directory and this class runs a map/reduce program to generate the data.
@@ -79,6 +80,8 @@
       long firstRow;
       long rowCount;
 
+      RangeInputSplit() {}
+
       public RangeInputSplit(long offset, long length) {
         firstRow = offset;
         rowCount = length;
@@ -165,8 +168,8 @@
      */
     @Override
     public List<InputSplit> getSplits(JobContext job) {
-      long totalRows = job.getConfiguration().getLong(NUMROWS, 0);
-      int numSplits = job.getConfiguration().getInt(NUMSPLITS, 1);
+      long totalRows = job.getConfiguration().getLong(TestProps.TERASORT_NUM_ROWS, 0);
+      int numSplits = job.getConfiguration().getInt(TestProps.TERASORT_NUM_SPLITS, 1);
       long rowsPerSplit = totalRows / numSplits;
       System.out.println(
           "Generating " + totalRows + " using " + numSplits + " maps with step of " + rowsPerSplit);
@@ -183,12 +186,9 @@
 
   }
 
-  private static String NUMSPLITS = "terasort.overridesplits";
-  private static String NUMROWS = "terasort.numrows";
-
   static class RandomGenerator {
     private long seed = 0;
-    private static final long mask32 = (1l << 32) - 1;
+    private static final long mask32 = (1L << 32) - 1;
     /**
      * The number of iterations separating the precomputed seeds.
      */
@@ -343,65 +343,49 @@
 
     @Override
     public void setup(Context job) {
-      minkeylength = job.getConfiguration().getInt("cloudgen.minkeylength", 0);
-      maxkeylength = job.getConfiguration().getInt("cloudgen.maxkeylength", 0);
-      minvaluelength = job.getConfiguration().getInt("cloudgen.minvaluelength", 0);
-      maxvaluelength = job.getConfiguration().getInt("cloudgen.maxvaluelength", 0);
-      tableName = new Text(job.getConfiguration().get("cloudgen.tablename"));
+      minkeylength = job.getConfiguration().getInt(TestProps.TERASORT_MIN_KEYSIZE, 0);
+      maxkeylength = job.getConfiguration().getInt(TestProps.TERASORT_MAX_KEYSIZE, 0);
+      minvaluelength = job.getConfiguration().getInt(TestProps.TERASORT_MIN_VALUESIZE, 0);
+      maxvaluelength = job.getConfiguration().getInt(TestProps.TERASORT_MAX_VALUESIZE, 0);
+      tableName = new Text(job.getConfiguration().get(TestProps.TERASORT_TABLE));
     }
   }
 
   public static void main(String[] args) throws Exception {
-    ToolRunner.run(new Configuration(), new TeraSortIngest(), args);
-  }
-
-  static class Opts extends ClientOpts {
-    @Parameter(names = {"-t", "--table"}, required = true, description = "table to use")
-    String tableName;
-    @Parameter(names = "--count", description = "number of rows to ingest", required = true)
-    long numRows;
-    @Parameter(names = {"-nk", "--minKeySize"}, description = "miniumum key size", required = true)
-    int minKeyLength;
-    @Parameter(names = {"-xk", "--maxKeySize"}, description = "maximum key size", required = true)
-    int maxKeyLength;
-    @Parameter(names = {"-nv", "--minValueSize"}, description = "minimum key size", required = true)
-    int minValueLength;
-    @Parameter(names = {"-xv", "--maxValueSize"}, description = "maximum key size", required = true)
-    int maxValueLength;
-    @Parameter(names = "--splits", description = "number of splits to create in the table")
-    int splits = 0;
+    TestEnv env = new TestEnv(args);
+    ToolRunner.run(env.getHadoopConfiguration(), new TeraSortIngest(), args);
   }
 
   @Override
   public int run(String[] args) throws Exception {
-    Job job = Job.getInstance(getConf());
-    job.setJobName("TeraSortCloud");
-    job.setJarByClass(this.getClass());
-    Opts opts = new Opts();
-    opts.parseArgs(TeraSortIngest.class.getName(), args);
 
+    TestEnv env = new TestEnv(args);
+
+    Job job = Job.getInstance(getConf());
+    job.setJobName("TeraSortIngest");
+    job.setJarByClass(this.getClass());
     job.setInputFormatClass(RangeInputFormat.class);
     job.setMapperClass(SortGenMapper.class);
     job.setMapOutputKeyClass(Text.class);
     job.setMapOutputValueClass(Mutation.class);
-
     job.setNumReduceTasks(0);
-
     job.setOutputFormatClass(AccumuloOutputFormat.class);
 
-    AccumuloOutputFormat.configure().clientProperties(opts.getClientProps()).createTables(true)
-        .defaultTable(opts.tableName);
+    Properties testProps = env.getTestProperties();
+    String tableName = testProps.getProperty(TestProps.TERASORT_TABLE);
+    Objects.requireNonNull(tableName);
+
+    AccumuloOutputFormat.configure().clientProperties(env.getClientProps()).createTables(true)
+        .defaultTable(tableName).store(job);
 
     Configuration conf = job.getConfiguration();
-    conf.setLong(NUMROWS, opts.numRows);
-    conf.setInt("cloudgen.minkeylength", opts.minKeyLength);
-    conf.setInt("cloudgen.maxkeylength", opts.maxKeyLength);
-    conf.setInt("cloudgen.minvaluelength", opts.minValueLength);
-    conf.setInt("cloudgen.maxvaluelength", opts.maxValueLength);
-    conf.set("cloudgen.tablename", opts.tableName);
-
-    if (args.length > 10)
-      conf.setInt(NUMSPLITS, opts.splits);
+    conf.set("mapreduce.job.classloader", "true");
+    for (Object keyObj : testProps.keySet()) {
+      String key = (String) keyObj;
+      if (key.startsWith(TestProps.TERASORT)) {
+        conf.set(key, testProps.getProperty(key));
+      }
+    }
 
     job.waitForCompletion(true);
     return job.isSuccessful() ? 0 : 1;
diff --git a/src/test/java/org/apache/accumulo/testing/randomwalk/ReplicationRandomWalkIT.java b/src/test/java/org/apache/accumulo/testing/randomwalk/ReplicationRandomWalkIT.java
index 6271fd4..972206a 100644
--- a/src/test/java/org/apache/accumulo/testing/randomwalk/ReplicationRandomWalkIT.java
+++ b/src/test/java/org/apache/accumulo/testing/randomwalk/ReplicationRandomWalkIT.java
@@ -50,5 +50,4 @@
 
     r.visit(null, env, null);
   }
-
 }