Improvements made for 2.0.0-alpha-2 CI testing (#51)

* Improvements made for 2.0.0-alpha-2 CI testing

Added the ability to set test options on the command line. This makes it easy to run
different types of ingest from the same docker image. Without this change, a new
docker image would need to be created for each ingest type.
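
For example, the same docker image can now run different ingest configurations purely
through `-o` overrides. A minimal sketch (the table name `ci_pause` is only an
illustrative value and assumes that table was already created with `cingest createtable`):

```bash
# run ingest against a non-default table with pausing enabled, no image rebuild needed
docker run --network="host" accumulo-testing cingest ingest \
   -o test.ci.common.accumulo.table=ci_pause \
   -o test.ci.ingest.pause.enabled=true
```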

Documented commands for running a long-running ingest test.

Made the ingest flush size configurable. This makes it possible to write very small
amounts of data and then pause, enabling tests for the problem in apache/accumulo#854.
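
Combining a small flush size with short pauses spreads a little data across many write
ahead logs. A sketch using the new properties (the values are only examples; the full
recipe is in the docs/ingest-test.md file added below):

```bash
# flush tiny batches, then pause 1-3 seconds between them
./bin/cingest ingest \
   -o test.ci.ingest.pause.enabled=true \
   -o test.ci.ingest.pause.wait.min=1 \
   -o test.ci.ingest.pause.wait.max=3 \
   -o test.ci.ingest.entries.flush=101
```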

Added docs for how to copy the docker image to other nodes if you don't have a
registry handy.
diff --git a/README.md b/README.md
index 1a758a1..f9acb26 100644
--- a/README.md
+++ b/README.md
@@ -66,13 +66,18 @@
 3. Multiple containers can also be run (if you have [Docker Swarm] enabled):
 
    ```bash
+   # the following can be used to get the image on all nodes if you do not have a registry.
+   for HOST in node1 node2 node3; do
+     docker save accumulo-testing | ssh -C $HOST docker load &
+   done
+
    docker service create --network="host" --replicas 2 --name ci accumulo-testing cingest ingest
    ```
 
 ## Random walk test
 
 The random walk test generates client behavior on an Apache Accumulo instance by randomly walking a
-graph of client operations. 
+graph of client operations.
 
 Before running random walk, review the `test.common.*` properties in `accumulo-testing.properties`
 file. A test module must also be specified. See the [modules] directory for a list of available ones.
@@ -101,12 +106,12 @@
 table is set by the property `test.ci.common.accumulo.table` (its value defaults to `ci`) in the file
 `accumulo-testing.properties`:
 
-          ./bin/cingest createtable
+          ./bin/cingest createtable {-o test.<prop>=<value>}
 
 The continuous ingest tests have several applications that start a local application which will run
 continuously until you stop using `ctrl-c`:
 
-          ./bin/cingest <application>
+          ./bin/cingest <application> {-o test.<prop>=<value>}
 
 Below is a list of available continuous ingest applications. You should run the `ingest` application
 first to add data to your table.
@@ -131,6 +136,9 @@
 created by the MapReduce job itself). Stop ingest before running this MapReduce job. Do not run more
 than one instance of this MapReduce job concurrently against a table.
 
+Check out [ingest-test.md](docs/ingest-test.md) for pointers on running a
+long-running ingest and verification test.
+
 ## Agitator
 
 The agitator will periodically kill the Accumulo master, tablet server, and Hadoop data node
@@ -208,7 +216,7 @@
 ```
 
 An example script for [Uno] is provided.  To use this do the following and set
-`UNO_HOME` after copying. 
+`UNO_HOME` after copying.
 
     cp conf/cluster-control.sh.uno conf/cluster-control.sh
 
diff --git a/bin/agitator b/bin/agitator
index 8933b7f..bd2b445 100755
--- a/bin/agitator
+++ b/bin/agitator
@@ -29,10 +29,10 @@
 EOF
 }
 
-if [ -f "$at_home/conf/accumulo-testing-env.sh" ]; then
-  . "$at_home"/conf/accumulo-testing-env.sh
+if [ -f "$at_home/conf/env.sh" ]; then
+  . "$at_home"/conf/env.sh
 else
-  . "$at_home"/conf/accumulo-testing-env.sh.example
+  . "$at_home"/conf/env.sh.example
 fi
 
 function start_agitator() {
diff --git a/bin/cingest b/bin/cingest
index 6cbbb40..39e6d33 100755
--- a/bin/cingest
+++ b/bin/cingest
@@ -21,7 +21,7 @@
 function print_usage() {
   cat <<EOF
 
-Usage: cingest <application>
+Usage: cingest <application> {-o test.<prop>=<value>}
 
 Available applications:
 
@@ -83,12 +83,12 @@
   verify|moru)
     if [ ! -z $HADOOP_HOME ]; then
       export HADOOP_USE_CLIENT_CLASSLOADER=true
-      "$HADOOP_HOME"/bin/yarn jar "$TEST_JAR_PATH" "$ci_main" "$TEST_PROPS" "$ACCUMULO_CLIENT_PROPS"
+      "$HADOOP_HOME"/bin/yarn jar "$TEST_JAR_PATH" "$ci_main" "${@:2}" "$TEST_PROPS" "$ACCUMULO_CLIENT_PROPS"
     else
       echo "Hadoop must be installed and HADOOP_HOME must be set!"
       exit 1
     fi
     ;;
   *)
-    java -Dlog4j.configuration="file:$TEST_LOG4J" "$ci_main" "$TEST_PROPS" "$ACCUMULO_CLIENT_PROPS"
+    java -Dlog4j.configuration="file:$TEST_LOG4J" "$ci_main" "${@:2}" "$TEST_PROPS" "$ACCUMULO_CLIENT_PROPS"
 esac
diff --git a/conf/accumulo-testing.properties.example b/conf/accumulo-testing.properties.example
index b084c4c..b64a86e 100644
--- a/conf/accumulo-testing.properties.example
+++ b/conf/accumulo-testing.properties.example
@@ -44,6 +44,8 @@
 # ------
 # Number of entries each ingest client should write
 test.ci.ingest.client.entries=9223372036854775807
+# Flush batch writer after this many entries.
+test.ci.ingest.entries.flush=1000000
 # Minimum random row to generate
 test.ci.ingest.row.min=0
 # Maximum random row to generate
@@ -57,7 +59,8 @@
 test.ci.ingest.visibilities=
 # Checksums will be generated during ingest if set to true
 test.ci.ingest.checksum=true
-# Enables periodic pausing of ingest
+# Enables periodic pausing of ingest. Pause checks are only done after a flush. To write small
+# amounts of data and then pause, set pause.wait.max and entries.flush small.
 test.ci.ingest.pause.enabled=false
 # Minimum wait between ingest pauses (in seconds)
 test.ci.ingest.pause.wait.min=120
@@ -90,7 +93,7 @@
 # Verify
 # -----
 # Maximum number of mapreduce mappers
-test.ci.verify.max.maps=64
+test.ci.verify.max.maps=4096
 # Number of mapreduce reducers
 test.ci.verify.reducers=64
 # Perform the verification directly on the files while the table is offline
diff --git a/docs/ingest-test.md b/docs/ingest-test.md
new file mode 100644
index 0000000..fcec446
--- /dev/null
+++ b/docs/ingest-test.md
@@ -0,0 +1,85 @@
+# Running a long continuous ingest test
+
+Running continuous ingest for long periods on a cluster is a good way to
+validate an Accumulo release.  This document outlines one possible way to do
+that.  The commands below show how to start different types of ingest into
+multiple tables.  These commands assume the accumulo-testing docker image was
+built and that docker swarm is available on the cluster.
+
+```bash
+# Number of nodes in cluster
+NUM_NODES=10
+
+# Start lots of processes writing to a single table as fast as possible
+docker run --network="host" accumulo-testing cingest createtable
+docker service create --network="host" --replicas $NUM_NODES --name ci \
+   accumulo-testing cingest ingest \
+   -o test.ci.ingest.pause.enabled=false
+
+# Write data with pauses.  Should cause tablets to not have data in some write
+# ahead logs.  But there is still enough write pressure to cause minor
+# compactions.
+#
+# Some of the write ahead log recovery bugs fixed in 1.9.1 and 1.9.2 were not
+# seen with continuous writes.
+#
+for i in $(seq 1 $NUM_NODES); do
+  TABLE="cip_$i"
+  docker run --network="host" accumulo-testing cingest createtable \
+     -o test.ci.common.accumulo.table=$TABLE \
+     -o test.ci.common.accumulo.num.tablets=$(( $NUM_NODES * 4 ))
+  docker service create --network="host" --replicas 1 --name $TABLE \
+     accumulo-testing cingest ingest \
+     -o test.ci.common.accumulo.table=$TABLE \
+     -o test.ci.ingest.pause.enabled=true
+done
+
+# Write very small amounts of data with long pauses in between.  Should cause
+# data to be spread across lots of write ahead logs.  So little data is written
+# that minor compactions may not happen because of writes.  If Accumulo does
+# nothing then this will result in tablet servers having lots of write ahead
+# logs.
+#
+# https://github.com/apache/accumulo/issues/854
+#
+for FLUSH_SIZE in 97 101 997 1009 ; do
+  TABLE="cip_small_$FLUSH_SIZE"
+  docker run --network="host" accumulo-testing cingest createtable \
+     -o test.ci.common.accumulo.table=$TABLE \
+     -o test.ci.common.accumulo.num.tablets=$(( $NUM_NODES * 2 ))
+  docker service create --network="host" --replicas 1 --name $TABLE \
+     accumulo-testing cingest ingest \
+     -o test.ci.common.accumulo.table=$TABLE \
+     -o test.ci.ingest.pause.enabled=true \
+     -o test.ci.ingest.pause.wait.min=1 \
+     -o test.ci.ingest.pause.wait.max=3 \
+     -o test.ci.ingest.entries.flush=$FLUSH_SIZE
+done
+```
+
+After starting the ingest, consider starting the agitator.  Testing with and
+without agitation is valuable.  Let the ingest run for a period of 12 to 36
+hours. Then stop it as follows.
+
+
+```bash
+# stop the agitator if started
+
+# stop all docker services (assuming only the ingest started above is running, otherwise do not run this)
+docker service rm $(docker service ls -q)
+```
+
+After ingest stops, verify the data.
+
+```bash
+# run verification map reduce jobs
+mkdir -p logs
+accumulo shell -u root -p secret -e tables | grep ci | while read table ; do
+  nohup ./bin/cingest verify \
+      -o test.ci.common.accumulo.table=$table \
+      -o test.ci.verify.output.dir=/tmp/$table-verify \
+          &> logs/verify_$table.log &
+done
+```
+
+
diff --git a/src/main/java/org/apache/accumulo/testing/TestEnv.java b/src/main/java/org/apache/accumulo/testing/TestEnv.java
index 601db3c..d36fa1a 100644
--- a/src/main/java/org/apache/accumulo/testing/TestEnv.java
+++ b/src/main/java/org/apache/accumulo/testing/TestEnv.java
@@ -1,8 +1,10 @@
 package org.apache.accumulo.testing;
 
-import static java.util.Objects.requireNonNull;
-
 import java.lang.management.ManagementFactory;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 import java.util.Properties;
 
 import org.apache.accumulo.core.client.Accumulo;
@@ -19,12 +21,36 @@
   private AccumuloClient client = null;
   private Configuration hadoopConfig = null;
 
-  public TestEnv(String testPropsPath, String clientPropsPath) {
-    requireNonNull(testPropsPath);
-    requireNonNull(clientPropsPath);
+  public TestEnv(String[] args) {
+
+    Map<String, String> options = new HashMap<>();
+    List<String> arguments = new ArrayList<>();
+
+    for (int i = 0; i < args.length; i++) {
+      if(args[i].equals("-o")) {
+        i++;
+        String[] tokens = args[i].split("=",2);
+        options.put(tokens[0], tokens[1]);
+      } else {
+        arguments.add(args[i]);
+      }
+    }
+
+    if(arguments.size() != 2) {
+      throw new IllegalArgumentException("Expected <testPropsPath> <clientPropsPath> arguments.");
+    }
+
+    String testPropsPath = arguments.get(0);
+    clientPropsPath = arguments.get(1);
+
     this.testProps = TestProps.loadFromFile(testPropsPath);
-    this.clientPropsPath = clientPropsPath;
     this.clientProps = Accumulo.newClientProperties().from(clientPropsPath).build();
+
+    options.forEach((k,v) -> testProps.setProperty(k, v));
+  }
+
+  public TestEnv(String testPropsPath, String clientPropsPath) {
+    this(new String[] {testPropsPath, clientPropsPath});
   }
 
   private Properties copyProperties(Properties props) {
diff --git a/src/main/java/org/apache/accumulo/testing/TestProps.java b/src/main/java/org/apache/accumulo/testing/TestProps.java
index f8eb640..ad49fd3 100644
--- a/src/main/java/org/apache/accumulo/testing/TestProps.java
+++ b/src/main/java/org/apache/accumulo/testing/TestProps.java
@@ -82,6 +82,8 @@
   public static final String CI_INGEST_PAUSE_DURATION_MIN = CI_INGEST + "pause.duration.min";
   // Maximum pause duration (in seconds)
   public static final String CI_INGEST_PAUSE_DURATION_MAX = CI_INGEST + "pause.duration.max";
+  // Amount of data to write before flushing. Pause checks are only done after flush.
+  public static final String CI_INGEST_FLUSH_ENTRIES = CI_INGEST + "entries.flush";
 
   /** Batch Walker **/
   // Sleep time between batch scans (in ms)
diff --git a/src/main/java/org/apache/accumulo/testing/continuous/ContinuousBatchWalker.java b/src/main/java/org/apache/accumulo/testing/continuous/ContinuousBatchWalker.java
index 1716bdf..6c02f7f 100644
--- a/src/main/java/org/apache/accumulo/testing/continuous/ContinuousBatchWalker.java
+++ b/src/main/java/org/apache/accumulo/testing/continuous/ContinuousBatchWalker.java
@@ -16,6 +16,8 @@
  */
 package org.apache.accumulo.testing.continuous;
 
+import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly;
+
 import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.Iterator;
@@ -35,40 +37,34 @@
 import org.apache.accumulo.testing.TestProps;
 import org.apache.hadoop.io.Text;
 
-import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly;
-
 public class ContinuousBatchWalker {
 
   public static void main(String[] args) throws Exception {
 
-    if (args.length != 2) {
-      System.err.println("Usage: ContinuousBatchWalker <testPropsPath> <clientPropsPath>");
-      System.exit(-1);
-    }
-    ContinuousEnv env = new ContinuousEnv(args[0], args[1]);
+    try (ContinuousEnv env = new ContinuousEnv(args)) {
+      Authorizations auths = env.getRandomAuthorizations();
+      AccumuloClient client = env.getAccumuloClient();
+      Scanner scanner = ContinuousUtil.createScanner(client, env.getAccumuloTableName(), auths);
+      int scanBatchSize = Integer.parseInt(env.getTestProperty(TestProps.CI_BW_BATCH_SIZE));
+      scanner.setBatchSize(scanBatchSize);
 
-    Authorizations auths = env.getRandomAuthorizations();
-    AccumuloClient client = env.getAccumuloClient();
-    Scanner scanner = ContinuousUtil.createScanner(client, env.getAccumuloTableName(), auths);
-    int scanBatchSize = Integer.parseInt(env.getTestProperty(TestProps.CI_BW_BATCH_SIZE));
-    scanner.setBatchSize(scanBatchSize);
+      Random r = new Random();
 
-    Random r = new Random();
+      while (true) {
+        BatchScanner bs = client.createBatchScanner(env.getAccumuloTableName(), auths);
 
-    while (true) {
-      BatchScanner bs = client.createBatchScanner(env.getAccumuloTableName(), auths);
+        Set<Text> batch = getBatch(scanner, env.getRowMin(), env.getRowMax(), scanBatchSize, r);
+        List<Range> ranges = new ArrayList<>(batch.size());
 
-      Set<Text> batch = getBatch(scanner, env.getRowMin(), env.getRowMax(), scanBatchSize, r);
-      List<Range> ranges = new ArrayList<>(batch.size());
+        for (Text row : batch) {
+          ranges.add(new Range(row));
+        }
 
-      for (Text row : batch) {
-        ranges.add(new Range(row));
+        runBatchScan(scanBatchSize, bs, batch, ranges);
+
+        int bwSleepMs = Integer.parseInt(env.getTestProperty(TestProps.CI_BW_SLEEP_MS));
+        sleepUninterruptibly(bwSleepMs, TimeUnit.MILLISECONDS);
       }
-
-      runBatchScan(scanBatchSize, bs, batch, ranges);
-
-      int bwSleepMs = Integer.parseInt(env.getTestProperty(TestProps.CI_BW_SLEEP_MS));
-      sleepUninterruptibly(bwSleepMs, TimeUnit.MILLISECONDS);
     }
   }
 
diff --git a/src/main/java/org/apache/accumulo/testing/continuous/ContinuousEnv.java b/src/main/java/org/apache/accumulo/testing/continuous/ContinuousEnv.java
index 9642cf3..7d9f89c 100644
--- a/src/main/java/org/apache/accumulo/testing/continuous/ContinuousEnv.java
+++ b/src/main/java/org/apache/accumulo/testing/continuous/ContinuousEnv.java
@@ -13,8 +13,8 @@
 
   private List<Authorizations> authList;
 
-  ContinuousEnv(String testPropsPath, String clientPropsPath) {
-    super(testPropsPath, clientPropsPath);
+  ContinuousEnv(String[] args) {
+    super(args);
   }
 
   /**
diff --git a/src/main/java/org/apache/accumulo/testing/continuous/ContinuousIngest.java b/src/main/java/org/apache/accumulo/testing/continuous/ContinuousIngest.java
index a62ed13..e4a5935 100644
--- a/src/main/java/org/apache/accumulo/testing/continuous/ContinuousIngest.java
+++ b/src/main/java/org/apache/accumulo/testing/continuous/ContinuousIngest.java
@@ -84,6 +84,10 @@
     return (rand.nextInt(durationMax - durationMin) + durationMin);
   }
 
+  private static int getFlushEntries(Properties props) {
+    return Integer.parseInt(props.getProperty(TestProps.CI_INGEST_FLUSH_ENTRIES, "1000000"));
+  }
+
   private static void pauseCheck(Properties props, Random rand) throws InterruptedException {
     if (pauseEnabled(props)) {
       long elapsedNano = System.nanoTime() - lastPauseNs;
@@ -100,110 +104,88 @@
 
   public static void main(String[] args) throws Exception {
 
-    if (args.length != 2) {
-      System.err.println("Usage: ContinuousIngest <testPropsPath> <clientPropsPath>");
-      System.exit(-1);
-    }
+    try (ContinuousEnv env = new ContinuousEnv(args)) {
 
-    ContinuousEnv env = new ContinuousEnv(args[0], args[1]);
-
-    String vis = env.getTestProperty(TestProps.CI_INGEST_VISIBILITIES);
-    if (vis == null) {
-      visibilities = Collections.singletonList(new ColumnVisibility());
-    } else {
-      visibilities = new ArrayList<>();
-      for (String v : vis.split(",")) {
-        visibilities.add(new ColumnVisibility(v.trim()));
-      }
-    }
-
-    long rowMin = env.getRowMin();
-    long rowMax = env.getRowMax();
-    if (rowMin < 0 || rowMax < 0 || rowMax <= rowMin) {
-      throw new IllegalArgumentException("bad min and max");
-    }
-
-    AccumuloClient client = env.getAccumuloClient();
-    String tableName = env.getAccumuloTableName();
-    if (!client.tableOperations().exists(tableName)) {
-      throw new TableNotFoundException(null, tableName,
-          "Consult the README and create the table before starting ingest.");
-    }
-
-    BatchWriter bw = client.createBatchWriter(tableName);
-    bw = Trace.wrapAll(bw, TraceSamplers.countSampler(1024));
-
-    Random r = new Random();
-
-    byte[] ingestInstanceId = UUID.randomUUID().toString().getBytes(UTF_8);
-
-    log.info(String.format("UUID %d %s", System.currentTimeMillis(), new String(ingestInstanceId,
-        UTF_8)));
-
-    long count = 0;
-    final int flushInterval = 1000000;
-    final int maxDepth = 25;
-
-    // always want to point back to flushed data. This way the previous item
-    // should
-    // always exist in accumulo when verifying data. To do this make insert
-    // N point
-    // back to the row from insert (N - flushInterval). The array below is
-    // used to keep
-    // track of this.
-    long prevRows[] = new long[flushInterval];
-    long firstRows[] = new long[flushInterval];
-    int firstColFams[] = new int[flushInterval];
-    int firstColQuals[] = new int[flushInterval];
-
-    long lastFlushTime = System.currentTimeMillis();
-
-    int maxColF = env.getMaxColF();
-    int maxColQ = env.getMaxColQ();
-    boolean checksum = Boolean.parseBoolean(env.getTestProperty(TestProps.CI_INGEST_CHECKSUM));
-    long numEntries = Long.parseLong(env.getTestProperty(TestProps.CI_INGEST_CLIENT_ENTRIES));
-
-    Properties testProps = env.getTestProperties();
-    if (pauseEnabled(testProps)) {
-      lastPauseNs = System.nanoTime();
-      pauseWaitSec = getPauseWaitSec(testProps, r);
-      log.info("PAUSING enabled");
-      log.info("INGESTING for " + pauseWaitSec + "s");
-    }
-
-    out: while (true) {
-      // generate first set of nodes
-      ColumnVisibility cv = getVisibility(r);
-
-      for (int index = 0; index < flushInterval; index++) {
-        long rowLong = genLong(rowMin, rowMax, r);
-        prevRows[index] = rowLong;
-        firstRows[index] = rowLong;
-
-        int cf = r.nextInt(maxColF);
-        int cq = r.nextInt(maxColQ);
-
-        firstColFams[index] = cf;
-        firstColQuals[index] = cq;
-
-        Mutation m = genMutation(rowLong, cf, cq, cv, ingestInstanceId, count, null, checksum);
-        count++;
-        bw.addMutation(m);
+      String vis = env.getTestProperty(TestProps.CI_INGEST_VISIBILITIES);
+      if (vis == null) {
+        visibilities = Collections.singletonList(new ColumnVisibility());
+      } else {
+        visibilities = new ArrayList<>();
+        for (String v : vis.split(",")) {
+          visibilities.add(new ColumnVisibility(v.trim()));
+        }
       }
 
-      lastFlushTime = flush(bw, count, flushInterval, lastFlushTime);
-      if (count >= numEntries)
-        break out;
+      long rowMin = env.getRowMin();
+      long rowMax = env.getRowMax();
+      if (rowMin < 0 || rowMax < 0 || rowMax <= rowMin) {
+        throw new IllegalArgumentException("bad min and max");
+      }
 
-      // generate subsequent sets of nodes that link to previous set of
-      // nodes
-      for (int depth = 1; depth < maxDepth; depth++) {
+      AccumuloClient client = env.getAccumuloClient();
+      String tableName = env.getAccumuloTableName();
+      if (!client.tableOperations().exists(tableName)) {
+        throw new TableNotFoundException(null, tableName,
+            "Consult the README and create the table before starting ingest.");
+      }
+
+      BatchWriter bw = client.createBatchWriter(tableName);
+      bw = Trace.wrapAll(bw, TraceSamplers.countSampler(1024));
+
+      Random r = new Random();
+
+      byte[] ingestInstanceId = UUID.randomUUID().toString().getBytes(UTF_8);
+
+      log.info(String.format("UUID %d %s", System.currentTimeMillis(), new String(ingestInstanceId,
+          UTF_8)));
+
+      long count = 0;
+      final int flushInterval = getFlushEntries(env.getTestProperties());
+      final int maxDepth = 25;
+
+      // always want to point back to flushed data. This way the previous item
+      // should
+      // always exist in accumulo when verifying data. To do this make insert
+      // N point
+      // back to the row from insert (N - flushInterval). The array below is
+      // used to keep
+      // track of this.
+      long prevRows[] = new long[flushInterval];
+      long firstRows[] = new long[flushInterval];
+      int firstColFams[] = new int[flushInterval];
+      int firstColQuals[] = new int[flushInterval];
+
+      long lastFlushTime = System.currentTimeMillis();
+
+      int maxColF = env.getMaxColF();
+      int maxColQ = env.getMaxColQ();
+      boolean checksum = Boolean.parseBoolean(env.getTestProperty(TestProps.CI_INGEST_CHECKSUM));
+      long numEntries = Long.parseLong(env.getTestProperty(TestProps.CI_INGEST_CLIENT_ENTRIES));
+
+      Properties testProps = env.getTestProperties();
+      if (pauseEnabled(testProps)) {
+        lastPauseNs = System.nanoTime();
+        pauseWaitSec = getPauseWaitSec(testProps, r);
+        log.info("PAUSING enabled");
+        log.info("INGESTING for " + pauseWaitSec + "s");
+      }
+
+      out: while (true) {
+        // generate first set of nodes
+        ColumnVisibility cv = getVisibility(r);
+
         for (int index = 0; index < flushInterval; index++) {
           long rowLong = genLong(rowMin, rowMax, r);
-          byte[] prevRow = genRow(prevRows[index]);
           prevRows[index] = rowLong;
-          Mutation m = genMutation(rowLong, r.nextInt(maxColF), r.nextInt(maxColQ), cv,
-              ingestInstanceId, count, prevRow, checksum);
+          firstRows[index] = rowLong;
+
+          int cf = r.nextInt(maxColF);
+          int cq = r.nextInt(maxColQ);
+
+          firstColFams[index] = cf;
+          firstColQuals[index] = cq;
+
+          Mutation m = genMutation(rowLong, cf, cq, cv, ingestInstanceId, count, null, checksum);
           count++;
           bw.addMutation(m);
         }
@@ -211,23 +193,41 @@
         lastFlushTime = flush(bw, count, flushInterval, lastFlushTime);
         if (count >= numEntries)
           break out;
+
+        // generate subsequent sets of nodes that link to previous set of
+        // nodes
+        for (int depth = 1; depth < maxDepth; depth++) {
+          for (int index = 0; index < flushInterval; index++) {
+            long rowLong = genLong(rowMin, rowMax, r);
+            byte[] prevRow = genRow(prevRows[index]);
+            prevRows[index] = rowLong;
+            Mutation m = genMutation(rowLong, r.nextInt(maxColF), r.nextInt(maxColQ), cv,
+                ingestInstanceId, count, prevRow, checksum);
+            count++;
+            bw.addMutation(m);
+          }
+
+          lastFlushTime = flush(bw, count, flushInterval, lastFlushTime);
+          if (count >= numEntries)
+            break out;
+          pauseCheck(testProps, r);
+        }
+
+        // create one big linked list, this makes all of the first inserts
+        // point to something
+        for (int index = 0; index < flushInterval - 1; index++) {
+          Mutation m = genMutation(firstRows[index], firstColFams[index], firstColQuals[index], cv,
+              ingestInstanceId, count, genRow(prevRows[index + 1]), checksum);
+          count++;
+          bw.addMutation(m);
+        }
+        lastFlushTime = flush(bw, count, flushInterval, lastFlushTime);
+        if (count >= numEntries)
+          break out;
         pauseCheck(testProps, r);
       }
-
-      // create one big linked list, this makes all of the first inserts
-      // point to something
-      for (int index = 0; index < flushInterval - 1; index++) {
-        Mutation m = genMutation(firstRows[index], firstColFams[index], firstColQuals[index], cv,
-            ingestInstanceId, count, genRow(prevRows[index + 1]), checksum);
-        count++;
-        bw.addMutation(m);
-      }
-      lastFlushTime = flush(bw, count, flushInterval, lastFlushTime);
-      if (count >= numEntries)
-        break out;
-      pauseCheck(testProps, r);
+      bw.close();
     }
-    bw.close();
   }
 
   private static long flush(BatchWriter bw, long count, final int flushInterval, long lastFlushTime)
diff --git a/src/main/java/org/apache/accumulo/testing/continuous/ContinuousMoru.java b/src/main/java/org/apache/accumulo/testing/continuous/ContinuousMoru.java
index 663ed33..c7d102c 100644
--- a/src/main/java/org/apache/accumulo/testing/continuous/ContinuousMoru.java
+++ b/src/main/java/org/apache/accumulo/testing/continuous/ContinuousMoru.java
@@ -23,13 +23,13 @@
 import java.util.Set;
 import java.util.UUID;
 
-import org.apache.accumulo.hadoop.mapreduce.AccumuloInputFormat;
-import org.apache.accumulo.hadoop.mapreduce.AccumuloOutputFormat;
 import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.Mutation;
 import org.apache.accumulo.core.data.Range;
 import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.core.security.ColumnVisibility;
+import org.apache.accumulo.hadoop.mapreduce.AccumuloInputFormat;
+import org.apache.accumulo.hadoop.mapreduce.AccumuloOutputFormat;
 import org.apache.accumulo.testing.TestProps;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configured;
@@ -113,11 +113,7 @@
   @Override
   public int run(String[] args) throws Exception {
 
-    if (args.length != 2) {
-      System.err.println("Usage: ContinuousMoru <testPropsPath> <clientPropsPath>");
-      System.exit(-1);
-    }
-    ContinuousEnv env = new ContinuousEnv(args[0], args[1]);
+    ContinuousEnv env = new ContinuousEnv(args);
 
     Job job = Job.getInstance(getConf(),
         this.getClass().getSimpleName() + "_" + System.currentTimeMillis());
@@ -151,11 +147,7 @@
   }
 
   public static void main(String[] args) throws Exception {
-    if (args.length != 2) {
-      System.err.println("Usage: ContinuousMoru <testPropsPath> <clientPropsPath>");
-      System.exit(-1);
-    }
-    ContinuousEnv env = new ContinuousEnv(args[0], args[1]);
+    ContinuousEnv env = new ContinuousEnv(args);
     int res = ToolRunner.run(env.getHadoopConfiguration(), new ContinuousMoru(), args);
     if (res != 0)
       System.exit(res);
diff --git a/src/main/java/org/apache/accumulo/testing/continuous/ContinuousScanner.java b/src/main/java/org/apache/accumulo/testing/continuous/ContinuousScanner.java
index e4bab76..9fd4a80 100644
--- a/src/main/java/org/apache/accumulo/testing/continuous/ContinuousScanner.java
+++ b/src/main/java/org/apache/accumulo/testing/continuous/ContinuousScanner.java
@@ -16,6 +16,7 @@
  */
 package org.apache.accumulo.testing.continuous;
 
+import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly;
 import static java.nio.charset.StandardCharsets.UTF_8;
 
 import java.util.Iterator;
@@ -32,75 +33,70 @@
 import org.apache.accumulo.testing.TestProps;
 import org.apache.hadoop.io.Text;
 
-import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly;
-
 public class ContinuousScanner {
 
   public static void main(String[] args) throws Exception {
-    if (args.length != 2) {
-      System.err.println("Usage: ContinuousScanner <testPropsPath> <clientPropsPath>");
-      System.exit(-1);
-    }
-    ContinuousEnv env = new ContinuousEnv(args[0], args[1]);
 
-    Random r = new Random();
+    try (ContinuousEnv env = new ContinuousEnv(args)) {
 
-    long distance = 1000000000000l;
+      Random r = new Random();
 
-    AccumuloClient client = env.getAccumuloClient();
-    Authorizations auths = env.getRandomAuthorizations();
-    Scanner scanner = ContinuousUtil.createScanner(client, env.getAccumuloTableName(), auths);
+      long distance = 1000000000000l;
 
-    int numToScan = Integer.parseInt(env.getTestProperty(TestProps.CI_SCANNER_ENTRIES));
-    int scannerSleepMs = Integer.parseInt(env.getTestProperty(TestProps.CI_SCANNER_SLEEP_MS));
+      AccumuloClient client = env.getAccumuloClient();
+      Authorizations auths = env.getRandomAuthorizations();
+      Scanner scanner = ContinuousUtil.createScanner(client, env.getAccumuloTableName(), auths);
 
-    double delta = Math.min(.05, .05 / (numToScan / 1000.0));
+      int numToScan = Integer.parseInt(env.getTestProperty(TestProps.CI_SCANNER_ENTRIES));
+      int scannerSleepMs = Integer.parseInt(env.getTestProperty(TestProps.CI_SCANNER_SLEEP_MS));
 
-    while (true) {
-      long startRow = ContinuousIngest.genLong(env.getRowMin(), env.getRowMax() - distance, r);
-      byte[] scanStart = ContinuousIngest.genRow(startRow);
-      byte[] scanStop = ContinuousIngest.genRow(startRow + distance);
+      double delta = Math.min(.05, .05 / (numToScan / 1000.0));
 
-      scanner.setRange(new Range(new Text(scanStart), new Text(scanStop)));
+      while (true) {
+        long startRow = ContinuousIngest.genLong(env.getRowMin(), env.getRowMax() - distance, r);
+        byte[] scanStart = ContinuousIngest.genRow(startRow);
+        byte[] scanStop = ContinuousIngest.genRow(startRow + distance);
 
-      int count = 0;
-      Iterator<Entry<Key,Value>> iter = scanner.iterator();
+        scanner.setRange(new Range(new Text(scanStart), new Text(scanStop)));
 
-      long t1 = System.currentTimeMillis();
+        int count = 0;
+        Iterator<Entry<Key,Value>> iter = scanner.iterator();
 
-      while (iter.hasNext()) {
-        Entry<Key,Value> entry = iter.next();
-        ContinuousWalk.validate(entry.getKey(), entry.getValue());
-        count++;
-      }
+        long t1 = System.currentTimeMillis();
 
-      long t2 = System.currentTimeMillis();
-
-      // System.out.println("P1 " +count +" "+((1-delta) *
-      // numToScan)+" "+((1+delta) * numToScan)+" "+numToScan);
-
-      if (count < (1 - delta) * numToScan || count > (1 + delta) * numToScan) {
-        if (count == 0) {
-          distance = distance * 10;
-          if (distance < 0)
-            distance = 1000000000000l;
-        } else {
-          double ratio = (double) numToScan / count;
-          // move ratio closer to 1 to make change slower
-          ratio = ratio - (ratio - 1.0) * (2.0 / 3.0);
-          distance = (long) (ratio * distance);
+        while (iter.hasNext()) {
+          Entry<Key,Value> entry = iter.next();
+          ContinuousWalk.validate(entry.getKey(), entry.getValue());
+          count++;
         }
 
-        // System.out.println("P2 "+delta
-        // +" "+numToScan+" "+distance+"  "+((double)numToScan/count ));
-      }
+        long t2 = System.currentTimeMillis();
 
-      System.out.printf("SCN %d %s %d %d%n", t1, new String(scanStart, UTF_8), (t2 - t1), count);
+        // System.out.println("P1 " +count +" "+((1-delta) *
+        // numToScan)+" "+((1+delta) * numToScan)+" "+numToScan);
 
-      if (scannerSleepMs > 0) {
-        sleepUninterruptibly(scannerSleepMs, TimeUnit.MILLISECONDS);
+        if (count < (1 - delta) * numToScan || count > (1 + delta) * numToScan) {
+          if (count == 0) {
+            distance = distance * 10;
+            if (distance < 0)
+              distance = 1000000000000l;
+          } else {
+            double ratio = (double) numToScan / count;
+            // move ratio closer to 1 to make change slower
+            ratio = ratio - (ratio - 1.0) * (2.0 / 3.0);
+            distance = (long) (ratio * distance);
+          }
+
+          // System.out.println("P2 "+delta
+          // +" "+numToScan+" "+distance+"  "+((double)numToScan/count ));
+        }
+
+        System.out.printf("SCN %d %s %d %d%n", t1, new String(scanStart, UTF_8), (t2 - t1), count);
+
+        if (scannerSleepMs > 0) {
+          sleepUninterruptibly(scannerSleepMs, TimeUnit.MILLISECONDS);
+        }
       }
     }
-
   }
 }
diff --git a/src/main/java/org/apache/accumulo/testing/continuous/ContinuousVerify.java b/src/main/java/org/apache/accumulo/testing/continuous/ContinuousVerify.java
index e862d66..d94764e 100644
--- a/src/main/java/org/apache/accumulo/testing/continuous/ContinuousVerify.java
+++ b/src/main/java/org/apache/accumulo/testing/continuous/ContinuousVerify.java
@@ -26,10 +26,10 @@
 import java.util.Set;
 
 import org.apache.accumulo.core.client.AccumuloClient;
-import org.apache.accumulo.hadoop.mapreduce.AccumuloInputFormat;
 import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.Range;
 import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.hadoop.mapreduce.AccumuloInputFormat;
 import org.apache.accumulo.testing.TestProps;
 import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.fs.Path;
@@ -140,21 +140,19 @@
 
   @Override
   public int run(String[] args) throws Exception {
-    if (args.length != 2) {
-      System.err.println("Usage: ContinuousVerify <testPropsPath> <clientPropsPath>");
-      System.exit(-1);
-    }
-    ContinuousEnv env = new ContinuousEnv(args[0], args[1]);
+
+    ContinuousEnv env = new ContinuousEnv(args);
+
+    String tableName = env.getAccumuloTableName();
 
     Job job = Job.getInstance(getConf(),
-        this.getClass().getSimpleName() + "_" + System.currentTimeMillis());
+        this.getClass().getSimpleName() + "_" + tableName + "_" + System.currentTimeMillis());
     job.setJarByClass(this.getClass());
 
     job.setInputFormatClass(AccumuloInputFormat.class);
 
     boolean scanOffline = Boolean.parseBoolean(env
         .getTestProperty(TestProps.CI_VERIFY_SCAN_OFFLINE));
-    String tableName = env.getAccumuloTableName();
     int maxMaps = Integer.parseInt(env.getTestProperty(TestProps.CI_VERIFY_MAX_MAPS));
     int reducers = Integer.parseInt(env.getTestProperty(TestProps.CI_VERIFY_REDUCERS));
     String outputDir = env.getTestProperty(TestProps.CI_VERIFY_OUTPUT_DIR);
@@ -202,11 +200,7 @@
   }
 
   public static void main(String[] args) throws Exception {
-    if (args.length != 2) {
-      System.err.println("Usage: ContinuousVerify <testPropsPath> <clientPropsPath>");
-      System.exit(-1);
-    }
-    ContinuousEnv env = new ContinuousEnv(args[0], args[1]);
+    ContinuousEnv env = new ContinuousEnv(args);
 
     int res = ToolRunner.run(env.getHadoopConfiguration(), new ContinuousVerify(), args);
     if (res != 0)
diff --git a/src/main/java/org/apache/accumulo/testing/continuous/ContinuousWalk.java b/src/main/java/org/apache/accumulo/testing/continuous/ContinuousWalk.java
index 7f8a024..2094ec9 100644
--- a/src/main/java/org/apache/accumulo/testing/continuous/ContinuousWalk.java
+++ b/src/main/java/org/apache/accumulo/testing/continuous/ContinuousWalk.java
@@ -45,58 +45,56 @@
   }
 
   public static void main(String[] args) throws Exception {
-    if (args.length != 2) {
-      System.err.println("Usage: ContinuousWalk <testPropsPath> <clientPropsPath>");
-      System.exit(-1);
-    }
-    ContinuousEnv env = new ContinuousEnv(args[0], args[1]);
 
-    AccumuloClient client = env.getAccumuloClient();
+    try (ContinuousEnv env = new ContinuousEnv(args)) {
 
-    Random r = new Random();
+      AccumuloClient client = env.getAccumuloClient();
 
-    ArrayList<Value> values = new ArrayList<>();
+      Random r = new Random();
 
-    int sleepTime = Integer.parseInt(env.getTestProperty(TestProps.CI_WALKER_SLEEP_MS));
+      ArrayList<Value> values = new ArrayList<>();
 
-    while (true) {
-      Scanner scanner = ContinuousUtil.createScanner(client, env.getAccumuloTableName(),
-          env.getRandomAuthorizations());
-      String row = findAStartRow(env.getRowMin(), env.getRowMax(), scanner, r);
+      int sleepTime = Integer.parseInt(env.getTestProperty(TestProps.CI_WALKER_SLEEP_MS));
 
-      while (row != null) {
+      while (true) {
+        Scanner scanner = ContinuousUtil.createScanner(client, env.getAccumuloTableName(),
+            env.getRandomAuthorizations());
+        String row = findAStartRow(env.getRowMin(), env.getRowMax(), scanner, r);
 
-        values.clear();
+        while (row != null) {
 
-        long t1 = System.currentTimeMillis();
-        Span span = Trace.on("walk");
-        try {
-          scanner.setRange(new Range(new Text(row)));
-          for (Entry<Key,Value> entry : scanner) {
-            validate(entry.getKey(), entry.getValue());
-            values.add(entry.getValue());
+          values.clear();
+
+          long t1 = System.currentTimeMillis();
+          Span span = Trace.on("walk");
+          try {
+            scanner.setRange(new Range(new Text(row)));
+            for (Entry<Key,Value> entry : scanner) {
+              validate(entry.getKey(), entry.getValue());
+              values.add(entry.getValue());
+            }
+          } finally {
+            span.stop();
           }
-        } finally {
-          span.stop();
-        }
-        long t2 = System.currentTimeMillis();
+          long t2 = System.currentTimeMillis();
 
-        System.out.printf("SRQ %d %s %d %d%n", t1, row, (t2 - t1), values.size());
+          System.out.printf("SRQ %d %s %d %d%n", t1, row, (t2 - t1), values.size());
 
-        if (values.size() > 0) {
-          row = getPrevRow(values.get(r.nextInt(values.size())));
-        } else {
-          System.out.printf("MIS %d %s%n", t1, row);
-          System.err.printf("MIS %d %s%n", t1, row);
-          row = null;
+          if (values.size() > 0) {
+            row = getPrevRow(values.get(r.nextInt(values.size())));
+          } else {
+            System.out.printf("MIS %d %s%n", t1, row);
+            System.err.printf("MIS %d %s%n", t1, row);
+            row = null;
+          }
+
+          if (sleepTime > 0)
+            Thread.sleep(sleepTime);
         }
 
         if (sleepTime > 0)
           Thread.sleep(sleepTime);
       }
-
-      if (sleepTime > 0)
-        Thread.sleep(sleepTime);
     }
   }
 
diff --git a/src/main/java/org/apache/accumulo/testing/continuous/CreateTable.java b/src/main/java/org/apache/accumulo/testing/continuous/CreateTable.java
index fb4eeea..9558ef9 100644
--- a/src/main/java/org/apache/accumulo/testing/continuous/CreateTable.java
+++ b/src/main/java/org/apache/accumulo/testing/continuous/CreateTable.java
@@ -27,47 +27,44 @@
 
   public static void main(String[] args) throws Exception {
 
-    if (args.length != 2) {
-      System.err.println("Usage: CreateTable <testPropsPath> <clientPropsPath>");
-      System.exit(-1);
-    }
-    ContinuousEnv env = new ContinuousEnv(args[0], args[1]);
+    try (ContinuousEnv env = new ContinuousEnv(args)) {
 
-    AccumuloClient client = env.getAccumuloClient();
-    String tableName = env.getAccumuloTableName();
-    if (client.tableOperations().exists(tableName)) {
-      System.err.println("ERROR: Accumulo table '" + tableName + "' already exists");
-      System.exit(-1);
-    }
-
-    int numTablets = Integer
-        .parseInt(env.getTestProperty(TestProps.CI_COMMON_ACCUMULO_NUM_TABLETS));
-    if (numTablets < 1) {
-      System.err.println("ERROR: numTablets < 1");
-      System.exit(-1);
-    }
-    if (env.getRowMin() >= env.getRowMax()) {
-      System.err.println("ERROR: min >= max");
-      System.exit(-1);
-    }
-
-    client.tableOperations().create(tableName);
-
-    SortedSet<Text> splits = new TreeSet<>();
-    int numSplits = numTablets - 1;
-    long distance = ((env.getRowMax() - env.getRowMin()) / numTablets) + 1;
-    long split = distance;
-    for (int i = 0; i < numSplits; i++) {
-      String s = String.format("%016x", split + env.getRowMin());
-      while (s.charAt(s.length() - 1) == '0') {
-        s = s.substring(0, s.length() - 1);
+      AccumuloClient client = env.getAccumuloClient();
+      String tableName = env.getAccumuloTableName();
+      if (client.tableOperations().exists(tableName)) {
+        System.err.println("ERROR: Accumulo table '" + tableName + "' already exists");
+        System.exit(-1);
       }
-      splits.add(new Text(s));
-      split += distance;
-    }
 
-    client.tableOperations().addSplits(tableName, splits);
-    System.out
-        .println("Created Accumulo table '" + tableName + "' with " + numTablets + " tablets");
+      int numTablets = Integer.parseInt(env
+          .getTestProperty(TestProps.CI_COMMON_ACCUMULO_NUM_TABLETS));
+      if (numTablets < 1) {
+        System.err.println("ERROR: numTablets < 1");
+        System.exit(-1);
+      }
+      if (env.getRowMin() >= env.getRowMax()) {
+        System.err.println("ERROR: min >= max");
+        System.exit(-1);
+      }
+
+      client.tableOperations().create(tableName);
+
+      SortedSet<Text> splits = new TreeSet<>();
+      int numSplits = numTablets - 1;
+      long distance = ((env.getRowMax() - env.getRowMin()) / numTablets) + 1;
+      long split = distance;
+      for (int i = 0; i < numSplits; i++) {
+        String s = String.format("%016x", split + env.getRowMin());
+        while (s.charAt(s.length() - 1) == '0') {
+          s = s.substring(0, s.length() - 1);
+        }
+        splits.add(new Text(s));
+        split += distance;
+      }
+
+      client.tableOperations().addSplits(tableName, splits);
+      System.out.println("Created Accumulo table '" + tableName + "' with " + numTablets
+          + " tablets");
+    }
   }
 }