Miscellaneous improvements to continuous code (#212)

diff --git a/src/main/java/org/apache/accumulo/testing/TestEnv.java b/src/main/java/org/apache/accumulo/testing/TestEnv.java
index 18f6afc..6fd91fa 100644
--- a/src/main/java/org/apache/accumulo/testing/TestEnv.java
+++ b/src/main/java/org/apache/accumulo/testing/TestEnv.java
@@ -18,11 +18,13 @@
 package org.apache.accumulo.testing;
 
 import java.lang.management.ManagementFactory;
+import java.security.SecureRandom;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
+import java.util.Random;
 
 import org.apache.accumulo.core.client.Accumulo;
 import org.apache.accumulo.core.client.AccumuloClient;
@@ -37,6 +39,7 @@
   private final Properties clientProps;
   private AccumuloClient client = null;
   private Configuration hadoopConfig = null;
+  private final SecureRandom random = new SecureRandom();
 
   public TestEnv(String[] args) {
 
@@ -180,6 +183,10 @@
     return Accumulo.newClient().from(clientProps).as(principal, token).build();
   }
 
+  public Random getRandom() {
+    return random;
+  }
+
   @Override
   public void close() throws Exception {
     if (client != null) {
diff --git a/src/main/java/org/apache/accumulo/testing/continuous/BulkIngest.java b/src/main/java/org/apache/accumulo/testing/continuous/BulkIngest.java
index 685efa1..290b180 100644
--- a/src/main/java/org/apache/accumulo/testing/continuous/BulkIngest.java
+++ b/src/main/java/org/apache/accumulo/testing/continuous/BulkIngest.java
@@ -85,27 +85,25 @@
 
       // create splits file for KeyRangePartitioner
       String splitsFile = bulkDir + "/splits.txt";
-      try (AccumuloClient client = env.getAccumuloClient()) {
+      AccumuloClient client = env.getAccumuloClient();
 
-        // make sure splits file is closed before continuing
-        try (PrintStream out = new PrintStream(
-            new BufferedOutputStream(fs.create(fs.makeQualified(new Path(splitsFile)))))) {
-          Collection<Text> splits = client.tableOperations().listSplits(tableName,
-              env.getBulkReducers() - 1);
-          for (Text split : splits) {
-            out.println(Base64.getEncoder().encodeToString(split.copyBytes()));
-          }
-          job.setNumReduceTasks(splits.size() + 1);
-        }
-
-        job.setPartitionerClass(KeyRangePartitioner.class);
-        KeyRangePartitioner.setSplitFile(job, fs.makeQualified(new Path(splitsFile)).toString());
-
-        job.waitForCompletion(true);
-        boolean success = job.isSuccessful();
-
-        return success ? 0 : 1;
+      // make sure splits file is closed before continuing
+      try (PrintStream out = new PrintStream(
+          new BufferedOutputStream(fs.create(fs.makeQualified(new Path(splitsFile)))))) {
+        Collection<Text> splits = client.tableOperations().listSplits(tableName,
+            env.getBulkReducers() - 1);
+        splits.stream().map(Text::copyBytes).map(split -> Base64.getEncoder().encodeToString(split))
+            .forEach(out::println);
+        job.setNumReduceTasks(splits.size() + 1);
       }
+
+      job.setPartitionerClass(KeyRangePartitioner.class);
+      KeyRangePartitioner.setSplitFile(job, fs.makeQualified(new Path(splitsFile)).toString());
+
+      job.waitForCompletion(true);
+      boolean success = job.isSuccessful();
+
+      return success ? 0 : 1;
     }
   }
 
diff --git a/src/main/java/org/apache/accumulo/testing/continuous/ContinuousBatchWalker.java b/src/main/java/org/apache/accumulo/testing/continuous/ContinuousBatchWalker.java
index fe6cfcf..5d613b5 100644
--- a/src/main/java/org/apache/accumulo/testing/continuous/ContinuousBatchWalker.java
+++ b/src/main/java/org/apache/accumulo/testing/continuous/ContinuousBatchWalker.java
@@ -18,7 +18,7 @@
 
 import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly;
 
-import java.util.ArrayList;
+import java.time.Duration;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
@@ -26,6 +26,7 @@
 import java.util.Random;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
 
 import org.apache.accumulo.core.client.AccumuloClient;
 import org.apache.accumulo.core.client.BatchScanner;
@@ -39,6 +40,8 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.collect.Sets;
+
 public class ContinuousBatchWalker {
   private static final Logger log = LoggerFactory.getLogger(ContinuousBatchWalker.class);
 
@@ -47,26 +50,22 @@
     try (ContinuousEnv env = new ContinuousEnv(args)) {
       Authorizations auths = env.getRandomAuthorizations();
       AccumuloClient client = env.getAccumuloClient();
-      Scanner scanner = ContinuousUtil.createScanner(client, env.getAccumuloTableName(), auths);
-      int scanBatchSize = Integer.parseInt(env.getTestProperty(TestProps.CI_BW_BATCH_SIZE));
-      scanner.setBatchSize(scanBatchSize);
 
-      Random r = new Random();
-
-      while (true) {
-        BatchScanner bs = client.createBatchScanner(env.getAccumuloTableName(), auths);
-
-        Set<Text> batch = getBatch(scanner, env.getRowMin(), env.getRowMax(), scanBatchSize, r);
-        List<Range> ranges = new ArrayList<>(batch.size());
-
-        for (Text row : batch) {
-          ranges.add(new Range(row));
+      try (Scanner scanner = ContinuousUtil.createScanner(client, env.getAccumuloTableName(),
+          auths)) {
+        int scanBatchSize = Integer.parseInt(env.getTestProperty(TestProps.CI_BW_BATCH_SIZE));
+        scanner.setBatchSize(scanBatchSize);
+        Duration bwSleep = Duration
+            .ofMillis(Integer.parseInt(env.getTestProperty(TestProps.CI_BW_SLEEP_MS)));
+        while (true) {
+          try (BatchScanner bs = client.createBatchScanner(env.getAccumuloTableName(), auths)) {
+            Set<Text> batch = getBatch(scanner, env.getRowMin(), env.getRowMax(), scanBatchSize,
+                env.getRandom());
+            List<Range> ranges = batch.stream().map(Range::new).collect(Collectors.toList());
+            runBatchScan(scanBatchSize, bs, batch, ranges);
+          }
+          sleepUninterruptibly(bwSleep);
         }
-
-        runBatchScan(scanBatchSize, bs, batch, ranges);
-
-        int bwSleepMs = Integer.parseInt(env.getTestProperty(TestProps.CI_BW_SLEEP_MS));
-        sleepUninterruptibly(bwSleepMs, TimeUnit.MILLISECONDS);
       }
     }
   }
@@ -95,16 +94,13 @@
     long t2 = System.currentTimeMillis();
 
     if (!rowsSeen.equals(batch)) {
-      HashSet<Text> copy1 = new HashSet<>(rowsSeen);
-      HashSet<Text> copy2 = new HashSet<>(batch);
+      Set<Text> extraSeen = Sets.difference(rowsSeen, batch);
+      Set<Text> notSeen = Sets.difference(batch, rowsSeen);
 
-      copy1.removeAll(batch);
-      copy2.removeAll(rowsSeen);
-
-      log.info("DIF {} {} {}", t1, copy1.size(), copy2.size());
-      log.info("DIF {} {} {}", t1, copy1.size(), copy2.size());
-      log.info("Extra seen : {}", copy1);
-      log.info("Not seen   : {}", copy2);
+      log.info("DIF {} {} {}", t1, extraSeen.size(), notSeen.size());
+      log.info("DIF {} {} {}", t1, extraSeen.size(), notSeen.size());
+      log.info("Extra seen : {}", extraSeen);
+      log.info("Not seen   : {}", notSeen);
     } else {
       log.info("BRQ {} {} {} {} {}", t1, (t2 - t1), rowsSeen.size(), count,
           (rowsSeen.size() / ((t2 - t1) / 1000.0)));
@@ -124,7 +120,7 @@
     }
   }
 
-  private static HashSet<Text> rowsToQuery = new HashSet<>();
+  private static final HashSet<Text> rowsToQuery = new HashSet<>();
 
   private static Set<Text> getBatch(Scanner scanner, long min, long max, int batchSize, Random r) {
 
diff --git a/src/main/java/org/apache/accumulo/testing/continuous/ContinuousEnv.java b/src/main/java/org/apache/accumulo/testing/continuous/ContinuousEnv.java
index 4896026..ed36583 100644
--- a/src/main/java/org/apache/accumulo/testing/continuous/ContinuousEnv.java
+++ b/src/main/java/org/apache/accumulo/testing/continuous/ContinuousEnv.java
@@ -17,10 +17,10 @@
 
 package org.apache.accumulo.testing.continuous;
 
-import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
-import java.util.Random;
+import java.util.stream.Collectors;
 
 import org.apache.accumulo.core.security.Authorizations;
 import org.apache.accumulo.testing.TestEnv;
@@ -43,10 +43,8 @@
       if (authValue == null || authValue.trim().isEmpty()) {
         authList = Collections.singletonList(Authorizations.EMPTY);
       } else {
-        authList = new ArrayList<>();
-        for (String a : authValue.split("\\|")) {
-          authList.add(new Authorizations(a.split(",")));
-        }
+        authList = Arrays.stream(authValue.split("\\|")).map(a -> a.split(","))
+            .map(Authorizations::new).collect(Collectors.toList());
       }
     }
     return authList;
@@ -56,8 +54,7 @@
    * @return random authorization
    */
   Authorizations getRandomAuthorizations() {
-    Random r = new Random();
-    return getAuthList().get(r.nextInt(getAuthList().size()));
+    return getAuthList().get(this.getRandom().nextInt(getAuthList().size()));
   }
 
   public long getRowMin() {
diff --git a/src/main/java/org/apache/accumulo/testing/continuous/ContinuousIngest.java b/src/main/java/org/apache/accumulo/testing/continuous/ContinuousIngest.java
index 73eb545..4edd581 100644
--- a/src/main/java/org/apache/accumulo/testing/continuous/ContinuousIngest.java
+++ b/src/main/java/org/apache/accumulo/testing/continuous/ContinuousIngest.java
@@ -98,8 +98,9 @@
 
   public static void main(String[] args) throws Exception {
 
-    try (ContinuousEnv env = new ContinuousEnv(args);
-        AccumuloClient client = env.getAccumuloClient()) {
+    try (ContinuousEnv env = new ContinuousEnv(args)) {
+
+      AccumuloClient client = env.getAccumuloClient();
 
       final long rowMin = env.getRowMin();
       final long rowMax = env.getRowMax();
@@ -112,8 +113,6 @@
             "Consult the README and create the table before starting ingest.");
       }
 
-      Random rand = new Random();
-
       byte[] ingestInstanceId = UUID.randomUUID().toString().getBytes(UTF_8);
       log.info("Ingest instance ID: {} current time: {}ms", new String(ingestInstanceId, UTF_8),
           System.currentTimeMillis());
@@ -153,7 +152,7 @@
 
       if (pauseEnabled) {
         lastPauseNs = System.nanoTime();
-        pauseWaitSec = getPause(rand);
+        pauseWaitSec = getPause(env.getRandom());
         log.info("PAUSING enabled");
         log.info("INGESTING for {}s", pauseWaitSec);
       }
@@ -164,17 +163,17 @@
 
       try (BatchWriter bw = client.createBatchWriter(tableName)) {
         out: while (true) {
-          ColumnVisibility cv = getVisibility(rand);
+          ColumnVisibility cv = getVisibility(env.getRandom());
 
           // generate sets nodes that link to previous set of nodes
           for (int depth = 0; depth < maxDepth; depth++) {
             for (int index = 0; index < flushInterval; index++) {
-              long rowLong = genLong(rowMin, rowMax, rand);
+              long rowLong = genLong(rowMin, rowMax, env.getRandom());
 
               byte[] prevRow = depth == 0 ? null : genRow(nodeMap[depth - 1][index].row);
 
-              int cfInt = rand.nextInt(maxColF);
-              int cqInt = rand.nextInt(maxColQ);
+              int cfInt = env.getRandom().nextInt(maxColF);
+              int cqInt = env.getRandom().nextInt(maxColQ);
 
               nodeMap[depth][index] = new MutationInfo(rowLong, cfInt, cqInt);
               Mutation m = genMutation(rowLong, cfInt, cqInt, cv, ingestInstanceId, entriesWritten,
@@ -186,11 +185,11 @@
             lastFlushTime = flush(bw, entriesWritten, entriesDeleted, lastFlushTime);
             if (entriesWritten >= numEntries)
               break out;
-            pauseCheck(rand);
+            pauseCheck(env.getRandom());
           }
 
           // random chance that the entries will be deleted
-          final boolean delete = rand.nextFloat() < deleteProbability;
+          final boolean delete = env.getRandom().nextFloat() < deleteProbability;
 
           // if the previously written entries are scheduled to be deleted
           if (delete) {
@@ -205,7 +204,7 @@
                 bw.addMutation(m);
               }
               lastFlushTime = flush(bw, entriesWritten, entriesDeleted, lastFlushTime);
-              pauseCheck(rand);
+              pauseCheck(env.getRandom());
             }
           } else {
             // create one big linked list, this makes all the first inserts point to something
@@ -222,7 +221,7 @@
 
           if (entriesWritten >= numEntries)
             break out;
-          pauseCheck(rand);
+          pauseCheck(env.getRandom());
         }
       }
     }
diff --git a/src/main/java/org/apache/accumulo/testing/continuous/ContinuousInputFormat.java b/src/main/java/org/apache/accumulo/testing/continuous/ContinuousInputFormat.java
index 4e17bb7..2b63c2c 100644
--- a/src/main/java/org/apache/accumulo/testing/continuous/ContinuousInputFormat.java
+++ b/src/main/java/org/apache/accumulo/testing/continuous/ContinuousInputFormat.java
@@ -26,9 +26,10 @@
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
 import java.security.SecureRandom;
-import java.util.ArrayList;
 import java.util.List;
 import java.util.Random;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
 import java.util.zip.CRC32;
 import java.util.zip.Checksum;
 
@@ -81,11 +82,7 @@
   @Override
   public List<InputSplit> getSplits(JobContext jobContext) {
     int numTask = jobContext.getConfiguration().getInt(PROP_MAP_TASK, 1);
-    List<InputSplit> splits = new ArrayList<>();
-    for (int i = 0; i < numTask; i++) {
-      splits.add(new RandomSplit());
-    }
-    return splits;
+    return Stream.generate(RandomSplit::new).limit(numTask).collect(Collectors.toList());
   }
 
   public static void configure(Configuration conf, String uuid, ContinuousEnv env) {
diff --git a/src/main/java/org/apache/accumulo/testing/continuous/ContinuousMoru.java b/src/main/java/org/apache/accumulo/testing/continuous/ContinuousMoru.java
index 99e2fdf..12e86b7 100644
--- a/src/main/java/org/apache/accumulo/testing/continuous/ContinuousMoru.java
+++ b/src/main/java/org/apache/accumulo/testing/continuous/ContinuousMoru.java
@@ -63,7 +63,6 @@
     private short max_cf;
     private short max_cq;
     private Random random;
-    private String ingestInstanceId;
     private byte[] iiId;
     private long count;
 
@@ -81,7 +80,7 @@
       this.max_cq = (short) max_cq;
 
       random = new Random();
-      ingestInstanceId = context.getConfiguration().get(CI_ID);
+      final String ingestInstanceId = context.getConfiguration().get(CI_ID);
       iiId = ingestInstanceId.getBytes(UTF_8);
 
       count = 0;
diff --git a/src/main/java/org/apache/accumulo/testing/continuous/ContinuousScanner.java b/src/main/java/org/apache/accumulo/testing/continuous/ContinuousScanner.java
index 27ab4c4..808779f 100644
--- a/src/main/java/org/apache/accumulo/testing/continuous/ContinuousScanner.java
+++ b/src/main/java/org/apache/accumulo/testing/continuous/ContinuousScanner.java
@@ -19,17 +19,11 @@
 import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly;
 import static java.nio.charset.StandardCharsets.UTF_8;
 
-import java.util.Iterator;
-import java.util.Map.Entry;
-import java.util.Random;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.accumulo.core.client.AccumuloClient;
 import org.apache.accumulo.core.client.Scanner;
-import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.Range;
-import org.apache.accumulo.core.data.Value;
-import org.apache.accumulo.core.security.Authorizations;
 import org.apache.accumulo.testing.TestProps;
 import org.apache.hadoop.io.Text;
 import org.slf4j.Logger;
@@ -42,56 +36,50 @@
 
     try (ContinuousEnv env = new ContinuousEnv(args)) {
 
-      Random r = new Random();
-
-      long distance = 1000000000000l;
+      long distance = 1_000_000_000_000L;
 
       AccumuloClient client = env.getAccumuloClient();
-      Authorizations auths = env.getRandomAuthorizations();
-      Scanner scanner = ContinuousUtil.createScanner(client, env.getAccumuloTableName(), auths);
 
       int numToScan = Integer.parseInt(env.getTestProperty(TestProps.CI_SCANNER_ENTRIES));
       int scannerSleepMs = Integer.parseInt(env.getTestProperty(TestProps.CI_SCANNER_SLEEP_MS));
 
       double delta = Math.min(.05, .05 / (numToScan / 1000.0));
+      try (Scanner scanner = ContinuousUtil.createScanner(client, env.getAccumuloTableName(),
+          env.getRandomAuthorizations())) {
+        while (true) {
+          long startRow = ContinuousIngest.genLong(env.getRowMin(), env.getRowMax() - distance,
+              env.getRandom());
+          byte[] scanStart = ContinuousIngest.genRow(startRow);
+          byte[] scanStop = ContinuousIngest.genRow(startRow + distance);
 
-      while (true) {
-        long startRow = ContinuousIngest.genLong(env.getRowMin(), env.getRowMax() - distance, r);
-        byte[] scanStart = ContinuousIngest.genRow(startRow);
-        byte[] scanStop = ContinuousIngest.genRow(startRow + distance);
+          scanner.setRange(new Range(new Text(scanStart), new Text(scanStop)));
 
-        scanner.setRange(new Range(new Text(scanStart), new Text(scanStop)));
+          long t1 = System.currentTimeMillis();
 
-        int count = 0;
-        Iterator<Entry<Key,Value>> iter = scanner.iterator();
+          long count = scanner.stream()
+              .peek(entry -> ContinuousWalk.validate(entry.getKey(), entry.getValue())).count();
 
-        long t1 = System.currentTimeMillis();
+          long t2 = System.currentTimeMillis();
 
-        while (iter.hasNext()) {
-          Entry<Key,Value> entry = iter.next();
-          ContinuousWalk.validate(entry.getKey(), entry.getValue());
-          count++;
-        }
-
-        long t2 = System.currentTimeMillis();
-
-        if (count < (1 - delta) * numToScan || count > (1 + delta) * numToScan) {
-          if (count == 0) {
-            distance = distance * 10;
-            if (distance < 0)
-              distance = 1000000000000l;
-          } else {
-            double ratio = (double) numToScan / count;
-            // move ratio closer to 1 to make change slower
-            ratio = ratio - (ratio - 1.0) * (2.0 / 3.0);
-            distance = (long) (ratio * distance);
+          if (count < (1 - delta) * numToScan || count > (1 + delta) * numToScan) {
+            if (count == 0) {
+              distance = distance * 10;
+              if (distance < 0)
+                distance = 1_000_000_000_000L;
+            } else {
+              double ratio = (double) numToScan / count;
+              // move ratio closer to 1 to make change slower
+              ratio = ratio - (ratio - 1.0) * (2.0 / 3.0);
+              distance = (long) (ratio * distance);
+            }
           }
-        }
 
-        log.debug("SCN {} {} {} {}", t1, new String(scanStart, UTF_8), (t2 - t1), count);
+          log.debug("SCAN - start: {}ms, start row: {}, duration: {}ms, total scanned: {}", t1,
+              new String(scanStart, UTF_8), (t2 - t1), count);
 
-        if (scannerSleepMs > 0) {
-          sleepUninterruptibly(scannerSleepMs, TimeUnit.MILLISECONDS);
+          if (scannerSleepMs > 0) {
+            sleepUninterruptibly(scannerSleepMs, TimeUnit.MILLISECONDS);
+          }
         }
       }
     }
diff --git a/src/main/java/org/apache/accumulo/testing/continuous/ContinuousVerify.java b/src/main/java/org/apache/accumulo/testing/continuous/ContinuousVerify.java
index 10d224e..05ace17 100644
--- a/src/main/java/org/apache/accumulo/testing/continuous/ContinuousVerify.java
+++ b/src/main/java/org/apache/accumulo/testing/continuous/ContinuousVerify.java
@@ -22,8 +22,9 @@
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.Random;
+import java.util.List;
 import java.util.Set;
+import java.util.stream.Collectors;
 
 import org.apache.accumulo.core.client.AccumuloClient;
 import org.apache.accumulo.core.data.Key;
@@ -119,15 +120,11 @@
       }
 
       if (defCount == 0 && refs.size() > 0) {
-        StringBuilder sb = new StringBuilder();
-        String comma = "";
-        for (Long ref : refs) {
-          sb.append(comma);
-          comma = ",";
-          sb.append(new String(ContinuousIngest.genRow(ref), UTF_8));
-        }
+        List<String> rowList = refs.stream().map(ContinuousIngest::genRow)
+            .map(row -> new String(row, UTF_8)).collect(Collectors.toList());
+        String rows = String.join(",", rowList);
 
-        context.write(new Text(ContinuousIngest.genRow(key.get())), new Text(sb.toString()));
+        context.write(new Text(ContinuousIngest.genRow(key.get())), new Text(rows));
         context.getCounter(Counts.UNDEFINED).increment(1L);
 
       } else if (defCount > 0 && refs.size() == 0) {
@@ -164,8 +161,8 @@
       String table;
 
       if (scanOffline) {
-        Random random = new Random();
-        clone = tableName + "_" + String.format("%016x", (random.nextLong() & 0x7fffffffffffffffL));
+        clone = tableName + "_"
+            + String.format("%016x", (env.getRandom().nextLong() & 0x7fffffffffffffffL));
         client.tableOperations().clone(tableName, clone, true, new HashMap<>(), new HashSet<>());
         ranges = client.tableOperations().splitRangeByTablets(tableName, new Range(), maxMaps);
         client.tableOperations().offline(clone);
diff --git a/src/main/java/org/apache/accumulo/testing/continuous/ContinuousWalk.java b/src/main/java/org/apache/accumulo/testing/continuous/ContinuousWalk.java
index 54f9786..28446dd 100644
--- a/src/main/java/org/apache/accumulo/testing/continuous/ContinuousWalk.java
+++ b/src/main/java/org/apache/accumulo/testing/continuous/ContinuousWalk.java
@@ -51,47 +51,46 @@
 
       AccumuloClient client = env.getAccumuloClient();
 
-      Random r = new Random();
-
       ArrayList<Value> values = new ArrayList<>();
 
       int sleepTime = Integer.parseInt(env.getTestProperty(TestProps.CI_WALKER_SLEEP_MS));
 
       while (true) {
-        Scanner scanner = ContinuousUtil.createScanner(client, env.getAccumuloTableName(),
-            env.getRandomAuthorizations());
-        String row = findAStartRow(env.getRowMin(), env.getRowMax(), scanner, r);
+        try (Scanner scanner = ContinuousUtil.createScanner(client, env.getAccumuloTableName(),
+            env.getRandomAuthorizations())) {
+          String row = findAStartRow(env.getRowMin(), env.getRowMax(), scanner, env.getRandom());
 
-        while (row != null) {
+          while (row != null) {
 
-          values.clear();
+            values.clear();
 
-          long t1 = System.currentTimeMillis();
+            long t1 = System.currentTimeMillis();
 
-          scanner.setRange(new Range(new Text(row)));
-          for (Entry<Key,Value> entry : scanner) {
-            validate(entry.getKey(), entry.getValue());
-            values.add(entry.getValue());
-          }
+            scanner.setRange(new Range(new Text(row)));
+            for (Entry<Key,Value> entry : scanner) {
+              validate(entry.getKey(), entry.getValue());
+              values.add(entry.getValue());
+            }
 
-          long t2 = System.currentTimeMillis();
+            long t2 = System.currentTimeMillis();
 
-          log.debug("SRQ {} {} {} {}", t1, row, (t2 - t1), values.size());
+            log.debug("SRQ {} {} {} {}", t1, row, (t2 - t1), values.size());
 
-          if (values.size() > 0) {
-            row = getPrevRow(values.get(r.nextInt(values.size())));
-          } else {
-            log.debug("MIS {} {}", t1, row);
-            log.debug("MIS {} {}", t1, row);
-            row = null;
+            if (values.size() > 0) {
+              row = getPrevRow(values.get(env.getRandom().nextInt(values.size())));
+            } else {
+              log.debug("MIS {} {}", t1, row);
+              log.debug("MIS {} {}", t1, row);
+              row = null;
+            }
+
+            if (sleepTime > 0)
+              Thread.sleep(sleepTime);
           }
 
           if (sleepTime > 0)
             Thread.sleep(sleepTime);
         }
-
-        if (sleepTime > 0)
-          Thread.sleep(sleepTime);
       }
     }
   }
diff --git a/src/main/java/org/apache/accumulo/testing/continuous/CreateTable.java b/src/main/java/org/apache/accumulo/testing/continuous/CreateTable.java
index 831d315..13a118f 100644
--- a/src/main/java/org/apache/accumulo/testing/continuous/CreateTable.java
+++ b/src/main/java/org/apache/accumulo/testing/continuous/CreateTable.java
@@ -38,8 +38,8 @@
   public static void main(String[] args) throws Exception {
 
     try (ContinuousEnv env = new ContinuousEnv(args)) {
-
       AccumuloClient client = env.getAccumuloClient();
+
       String tableName = env.getAccumuloTableName();
       if (client.tableOperations().exists(tableName)) {
         log.error("Accumulo table {} already exists", tableName);
diff --git a/src/main/java/org/apache/accumulo/testing/continuous/TimeBinner.java b/src/main/java/org/apache/accumulo/testing/continuous/TimeBinner.java
index 33ea7ef..93b46da 100644
--- a/src/main/java/org/apache/accumulo/testing/continuous/TimeBinner.java
+++ b/src/main/java/org/apache/accumulo/testing/continuous/TimeBinner.java
@@ -23,9 +23,7 @@
 import java.text.SimpleDateFormat;
 import java.util.Date;
 import java.util.HashMap;
-import java.util.Map.Entry;
-import java.util.Set;
-import java.util.TreeMap;
+import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.accumulo.testing.cli.ClientOpts.TimeConverter;
 import org.apache.accumulo.testing.cli.Help;
@@ -156,39 +154,36 @@
       }
     }
 
-    TreeMap<Long,DoubleWrapper> sorted = new TreeMap<>(aggregation1);
-
-    Set<Entry<Long,DoubleWrapper>> es = sorted.entrySet();
-
-    double cumulative = 0;
-    for (Entry<Long,DoubleWrapper> entry : es) {
-      String value;
+    AtomicReference<Double> cumulative = new AtomicReference<>((double) 0);
+    aggregation1.entrySet().stream().sorted().forEach(entry -> {
+      final String value;
+      final var currentKey = entry.getKey();
+      final var currentValue = entry.getValue();
 
       switch (operation) {
         case AMM_HACK1:
         case AMM: {
-          DoubleWrapper countdw = aggregation2.get(entry.getKey());
-          value = "" + (entry.getValue().d / countdw.d) + " " + aggregation3.get(entry.getKey()).d
-              + " " + aggregation4.get(entry.getKey()).d;
+          DoubleWrapper countdw = aggregation2.get(currentKey);
+          value = "" + (currentValue.d / countdw.d) + " " + aggregation3.get(currentKey).d + " "
+              + aggregation4.get(currentKey).d;
           break;
         }
         case AVG: {
-          DoubleWrapper countdw = aggregation2.get(entry.getKey());
-          value = "" + (entry.getValue().d / countdw.d);
+          DoubleWrapper countdw = aggregation2.get(currentKey);
+          value = "" + (currentValue.d / countdw.d);
           break;
         }
         case CUMULATIVE: {
-          cumulative += entry.getValue().d;
+          cumulative.updateAndGet(v -> v + currentValue.d);
           value = "" + cumulative;
           break;
         }
         default:
-          value = "" + entry.getValue().d;
+          value = "" + currentValue.d;
       }
 
-      log.info(sdf.format(new Date(entry.getKey())) + " " + value);
-    }
-
+      log.info("{} {}", sdf.format(new Date(currentKey)), value);
+    });
   }
 
   private static void increment(long time, HashMap<Long,DoubleWrapper> aggregation, double amount) {
diff --git a/src/main/java/org/apache/accumulo/testing/continuous/UndefinedAnalyzer.java b/src/main/java/org/apache/accumulo/testing/continuous/UndefinedAnalyzer.java
index 00bbe56..0708e6f 100644
--- a/src/main/java/org/apache/accumulo/testing/continuous/UndefinedAnalyzer.java
+++ b/src/main/java/org/apache/accumulo/testing/continuous/UndefinedAnalyzer.java
@@ -25,13 +25,16 @@
 import java.text.SimpleDateFormat;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.Comparator;
 import java.util.Date;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.Optional;
 import java.util.TreeMap;
+import java.util.stream.Collectors;
 
 import org.apache.accumulo.core.client.Accumulo;
 import org.apache.accumulo.core.client.AccumuloClient;
@@ -172,67 +175,61 @@
 
       SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss,SSS");
 
-      if (masterLogs != null) {
-        for (File masterLog : masterLogs) {
+      if (masterLogs == null)
+        return;
 
-          String line;
-          try (BufferedReader reader = Files.newBufferedReader(masterLog.toPath())) {
-            while ((line = reader.readLine()) != null) {
-              String[] tokens = line.split("\\s+");
-              String day = tokens[0];
-              String time = tokens[1];
-              String tablet = tokens[2];
-              String server = tokens[3];
+      for (File masterLog : masterLogs) {
 
-              int pos1 = -1;
-              int pos2 = -1;
-              int pos3 = -1;
+        String line;
+        try (BufferedReader reader = Files.newBufferedReader(masterLog.toPath())) {
+          while ((line = reader.readLine()) != null) {
+            String[] tokens = line.split("\\s+");
+            String day = tokens[0];
+            String time = tokens[1];
+            String tablet = tokens[2];
+            String server = tokens[3];
 
-              for (int i = 0; i < tablet.length(); i++) {
-                if (tablet.charAt(i) == '<' || tablet.charAt(i) == ';') {
-                  if (pos1 == -1) {
-                    pos1 = i;
-                  } else if (pos2 == -1) {
-                    pos2 = i;
-                  } else {
-                    pos3 = i;
-                  }
+            int pos1 = -1;
+            int pos2 = -1;
+            int pos3 = -1;
+
+            for (int i = 0; i < tablet.length(); i++) {
+              if (tablet.charAt(i) == '<' || tablet.charAt(i) == ';') {
+                if (pos1 == -1) {
+                  pos1 = i;
+                } else if (pos2 == -1) {
+                  pos2 = i;
+                } else {
+                  pos3 = i;
                 }
               }
+            }
 
-              if (pos1 > 0 && pos2 > 0 && pos3 == -1) {
-                String tid = tablet.substring(0, pos1);
-                String endRow = tablet.charAt(pos1) == '<' ? "8000000000000000"
-                    : tablet.substring(pos1 + 1, pos2);
-                String prevEndRow = tablet.charAt(pos2) == '<' ? "" : tablet.substring(pos2 + 1);
-                if (tid.equals(tableId)) {
-                  Date date = sdf.parse(day + " " + time);
-                  assignments.add(
-                      new TabletAssignment(tablet, endRow, prevEndRow, server, date.getTime()));
+            if (pos1 > 0 && pos2 > 0 && pos3 == -1) {
+              String tid = tablet.substring(0, pos1);
+              String endRow = tablet.charAt(pos1) == '<' ? "8000000000000000"
+                  : tablet.substring(pos1 + 1, pos2);
+              String prevEndRow = tablet.charAt(pos2) == '<' ? "" : tablet.substring(pos2 + 1);
+              if (tid.equals(tableId)) {
+                Date date = sdf.parse(day + " " + time);
+                assignments
+                    .add(new TabletAssignment(tablet, endRow, prevEndRow, server, date.getTime()));
 
-                }
-              } else if (!tablet.startsWith("!0")) {
-                logger.error("Cannot parse tablet {}", tablet);
               }
+            } else if (!tablet.startsWith("!0")) {
+              logger.error("Cannot parse tablet {}", tablet);
             }
           }
         }
       }
     }
 
-    TabletAssignment findMostRecentAssignment(String row, long time1, long time2) {
-
-      long latest = Long.MIN_VALUE;
-      TabletAssignment ret = null;
-
-      for (TabletAssignment assignment : assignments) {
-        if (assignment.contains(row) && assignment.time <= time2 && assignment.time > latest) {
-          latest = assignment.time;
-          ret = assignment;
-        }
-      }
-
-      return ret;
+    TabletAssignment findMostRecentAssignment(String row, long time2) {
+      Optional<TabletAssignment> ret1 = assignments.stream()
+          .filter(assignment -> assignment.contains(row))
+          .filter(assignment -> assignment.time <= time2)
+          .max(Comparator.comparingLong(assignment -> assignment.time));
+      return ret1.orElse(null);
     }
   }
 
@@ -267,10 +264,9 @@
 
     try (AccumuloClient client = Accumulo.newClient().from(opts.getClientProps()).build();
         BatchScanner bscanner = client.createBatchScanner(opts.tableName, opts.auths)) {
-      List<Range> refs = new ArrayList<>();
 
-      for (UndefinedNode undefinedNode : undefs)
-        refs.add(new Range(new Text(undefinedNode.ref)));
+      List<Range> refs = undefs.stream().map(node -> node.ref).map(Text::new).map(Range::new)
+          .collect(Collectors.toList());
 
       bscanner.setRanges(refs);
 
@@ -292,11 +288,9 @@
 
         List<String> refVals = refInfo.get(undefinedNode.ref);
         if (refVals != null) {
-          for (String refVal : refVals) {
+          refVals.stream().map(refVal -> refVal.split(":")).forEach(tokens -> {
             TabletAssignment ta = null;
 
-            String[] tokens = refVal.split(":");
-
             String uuid = tokens[0];
             String count = tokens[1];
 
@@ -311,7 +305,7 @@
                 if (times.hasNext()) {
                   long time1 = times.next();
                   t1 = sdf.format(new Date(time1));
-                  ta = tabletHistory.findMostRecentAssignment(undefinedNode.undef, time1, time2);
+                  ta = tabletHistory.findMostRecentAssignment(undefinedNode.undef, time2);
                 }
               }
             }
@@ -322,7 +316,7 @@
               logger.debug("{} {} {} {} {} {} {}", undefinedNode.undef, undefinedNode.ref,
                   ta.tablet, ta.server, uuid, t1, t2);
 
-          }
+          });
         } else {
           logger.debug("{} {}", undefinedNode.undef, undefinedNode.ref);
         }