Cleanup Continuous Ingest (#71)

* Add SLF4J logging
* Some minor cleanup
diff --git a/src/main/java/org/apache/accumulo/testing/continuous/ContinuousBatchWalker.java b/src/main/java/org/apache/accumulo/testing/continuous/ContinuousBatchWalker.java
index 6c02f7f..fe6cfcf 100644
--- a/src/main/java/org/apache/accumulo/testing/continuous/ContinuousBatchWalker.java
+++ b/src/main/java/org/apache/accumulo/testing/continuous/ContinuousBatchWalker.java
@@ -36,8 +36,11 @@
 import org.apache.accumulo.core.security.Authorizations;
 import org.apache.accumulo.testing.TestProps;
 import org.apache.hadoop.io.Text;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class ContinuousBatchWalker {
+  private static final Logger log = LoggerFactory.getLogger(ContinuousBatchWalker.class);
 
   public static void main(String[] args) throws Exception {
 
@@ -98,13 +101,13 @@
       copy1.removeAll(batch);
       copy2.removeAll(rowsSeen);
 
-      System.out.printf("DIF %d %d %d%n", t1, copy1.size(), copy2.size());
-      System.err.printf("DIF %d %d %d%n", t1, copy1.size(), copy2.size());
-      System.err.println("Extra seen : " + copy1);
-      System.err.println("Not seen   : " + copy2);
+      log.info("DIF {} {} {}", t1, copy1.size(), copy2.size());
+      log.info("DIF {} {} {}", t1, copy1.size(), copy2.size());
+      log.info("Extra seen : {}", copy1);
+      log.info("Not seen   : {}", copy2);
     } else {
-      System.out.printf("BRQ %d %d %d %d %d%n", t1, (t2 - t1), rowsSeen.size(), count,
-          (int) (rowsSeen.size() / ((t2 - t1) / 1000.0)));
+      log.info("BRQ {} {} {} {} {}", t1, (t2 - t1), rowsSeen.size(), count,
+          (rowsSeen.size() / ((t2 - t1) / 1000.0)));
     }
   }
 
@@ -143,7 +146,7 @@
 
       long t2 = System.currentTimeMillis();
 
-      System.out.println("FSB " + t1 + " " + (t2 - t1) + " " + count);
+      log.info("FSB {} {} {}", t1, (t2 - t1), count);
 
       sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
     }
diff --git a/src/main/java/org/apache/accumulo/testing/continuous/ContinuousIngest.java b/src/main/java/org/apache/accumulo/testing/continuous/ContinuousIngest.java
index 65779d9..c5229f7 100644
--- a/src/main/java/org/apache/accumulo/testing/continuous/ContinuousIngest.java
+++ b/src/main/java/org/apache/accumulo/testing/continuous/ContinuousIngest.java
@@ -17,6 +17,10 @@
 package org.apache.accumulo.testing.continuous;
 
 import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.apache.accumulo.testing.TestProps.CI_INGEST_PAUSE_DURATION_MAX;
+import static org.apache.accumulo.testing.TestProps.CI_INGEST_PAUSE_DURATION_MIN;
+import static org.apache.accumulo.testing.TestProps.CI_INGEST_PAUSE_WAIT_MAX;
+import static org.apache.accumulo.testing.TestProps.CI_INGEST_PAUSE_WAIT_MIN;
 
 import java.util.ArrayList;
 import java.util.Collections;
@@ -34,8 +38,6 @@
 import org.apache.accumulo.core.client.TableNotFoundException;
 import org.apache.accumulo.core.data.Mutation;
 import org.apache.accumulo.core.security.ColumnVisibility;
-// import org.apache.accumulo.core.trace.Trace;
-// import org.apache.accumulo.core.trace.TraceSamplers;
 import org.apache.accumulo.core.util.FastFormat;
 import org.apache.accumulo.testing.TestProps;
 import org.slf4j.Logger;
@@ -62,24 +64,14 @@
     return Boolean.parseBoolean(value);
   }
 
-  private static int getPauseWaitSec(Properties props, Random rand) {
-    int waitMin = Integer.parseInt(props.getProperty(TestProps.CI_INGEST_PAUSE_WAIT_MIN));
-    int waitMax = Integer.parseInt(props.getProperty(TestProps.CI_INGEST_PAUSE_WAIT_MAX));
-    Preconditions.checkState(waitMax >= waitMin && waitMin > 0);
-    if (waitMax == waitMin) {
-      return waitMin;
+  private static int getPause(Properties props, Random rand, String minProp, String maxProp) {
+    int min = Integer.parseInt(props.getProperty(minProp));
+    int max = Integer.parseInt(props.getProperty(maxProp));
+    Preconditions.checkState(max >= min && min > 0);
+    if (max == min) {
+      return min;
     }
-    return (rand.nextInt(waitMax - waitMin) + waitMin);
-  }
-
-  private static int getPauseDurationSec(Properties props, Random rand) {
-    int durationMin = Integer.parseInt(props.getProperty(TestProps.CI_INGEST_PAUSE_DURATION_MIN));
-    int durationMax = Integer.parseInt(props.getProperty(TestProps.CI_INGEST_PAUSE_DURATION_MAX));
-    Preconditions.checkState(durationMax >= durationMin && durationMin > 0);
-    if (durationMax == durationMin) {
-      return durationMin;
-    }
-    return (rand.nextInt(durationMax - durationMin) + durationMin);
+    return (rand.nextInt(max - min) + min);
   }
 
   private static int getFlushEntries(Properties props) {
@@ -90,11 +82,12 @@
     if (pauseEnabled(props)) {
       long elapsedNano = System.nanoTime() - lastPauseNs;
       if (elapsedNano > (TimeUnit.SECONDS.toNanos(pauseWaitSec))) {
-        long pauseDurationSec = getPauseDurationSec(props, rand);
+        long pauseDurationSec = getPause(props, rand, CI_INGEST_PAUSE_DURATION_MIN,
+            CI_INGEST_PAUSE_DURATION_MAX);
         log.info("PAUSING for " + pauseDurationSec + "s");
         Thread.sleep(TimeUnit.SECONDS.toMillis(pauseDurationSec));
         lastPauseNs = System.nanoTime();
-        pauseWaitSec = getPauseWaitSec(props, rand);
+        pauseWaitSec = getPause(props, rand, CI_INGEST_PAUSE_WAIT_MIN, CI_INGEST_PAUSE_WAIT_MAX);
         log.info("INGESTING for " + pauseWaitSec + "s");
       }
     }
@@ -128,7 +121,6 @@
       }
 
       BatchWriter bw = client.createBatchWriter(tableName);
-      // bw = Trace.wrapAll(bw, TraceSamplers.countSampler(1024));
 
       Random r = new Random();
 
@@ -141,12 +133,9 @@
       final int flushInterval = getFlushEntries(env.getTestProperties());
       final int maxDepth = 25;
 
-      // always want to point back to flushed data. This way the previous item
-      // should
-      // always exist in accumulo when verifying data. To do this make insert
-      // N point
-      // back to the row from insert (N - flushInterval). The array below is
-      // used to keep
+      // always want to point back to flushed data. This way the previous item should
+      // always exist in accumulo when verifying data. To do this make insert N point
+      // back to the row from insert (N - flushInterval). The array below is used to keep
       // track of this.
       long[] prevRows = new long[flushInterval];
       long[] firstRows = new long[flushInterval];
@@ -163,7 +152,7 @@
       Properties testProps = env.getTestProperties();
       if (pauseEnabled(testProps)) {
         lastPauseNs = System.nanoTime();
-        pauseWaitSec = getPauseWaitSec(testProps, r);
+        pauseWaitSec = getPause(testProps, r, CI_INGEST_PAUSE_WAIT_MIN, CI_INGEST_PAUSE_WAIT_MAX);
         log.info("PAUSING enabled");
         log.info("INGESTING for " + pauseWaitSec + "s");
       }
@@ -192,8 +181,7 @@
         if (count >= numEntries)
           break out;
 
-        // generate subsequent sets of nodes that link to previous set of
-        // nodes
+        // generate subsequent sets of nodes that link to previous set of nodes
         for (int depth = 1; depth < maxDepth; depth++) {
           for (int index = 0; index < flushInterval; index++) {
             long rowLong = genLong(rowMin, rowMax, r);
@@ -211,8 +199,7 @@
           pauseCheck(testProps, r);
         }
 
-        // create one big linked list, this makes all of the first inserts
-        // point to something
+        // create one big linked list, this makes all of the first inserts point to something
         for (int index = 0; index < flushInterval - 1; index++) {
           Mutation m = genMutation(firstRows[index], firstColFams[index], firstColQuals[index], cv,
               ingestInstanceId, count, genRow(prevRows[index + 1]), checksum);
@@ -306,9 +293,6 @@
       cksum.getValue();
       FastFormat.toZeroPaddedString(val, index, cksum.getValue(), 8, 16, EMPTY_BYTES);
     }
-
-    // System.out.println("val "+new String(val));
-
     return val;
   }
 }
diff --git a/src/main/java/org/apache/accumulo/testing/continuous/ContinuousScanner.java b/src/main/java/org/apache/accumulo/testing/continuous/ContinuousScanner.java
index d453ec7..27ab4c4 100644
--- a/src/main/java/org/apache/accumulo/testing/continuous/ContinuousScanner.java
+++ b/src/main/java/org/apache/accumulo/testing/continuous/ContinuousScanner.java
@@ -32,8 +32,11 @@
 import org.apache.accumulo.core.security.Authorizations;
 import org.apache.accumulo.testing.TestProps;
 import org.apache.hadoop.io.Text;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class ContinuousScanner {
+  private static final Logger log = LoggerFactory.getLogger(ContinuousScanner.class);
 
   public static void main(String[] args) throws Exception {
 
@@ -72,9 +75,6 @@
 
         long t2 = System.currentTimeMillis();
 
-        // System.out.println("P1 " +count +" "+((1-delta) *
-        // numToScan)+" "+((1+delta) * numToScan)+" "+numToScan);
-
         if (count < (1 - delta) * numToScan || count > (1 + delta) * numToScan) {
           if (count == 0) {
             distance = distance * 10;
@@ -86,12 +86,9 @@
             ratio = ratio - (ratio - 1.0) * (2.0 / 3.0);
             distance = (long) (ratio * distance);
           }
-
-          // System.out.println("P2 "+delta
-          // +" "+numToScan+" "+distance+" "+((double)numToScan/count ));
         }
 
-        System.out.printf("SCN %d %s %d %d%n", t1, new String(scanStart, UTF_8), (t2 - t1), count);
+        log.debug("SCN {} {} {} {}", t1, new String(scanStart, UTF_8), (t2 - t1), count);
 
         if (scannerSleepMs > 0) {
           sleepUninterruptibly(scannerSleepMs, TimeUnit.MILLISECONDS);
diff --git a/src/main/java/org/apache/accumulo/testing/continuous/ContinuousVerify.java b/src/main/java/org/apache/accumulo/testing/continuous/ContinuousVerify.java
index 880643d..14015e3 100644
--- a/src/main/java/org/apache/accumulo/testing/continuous/ContinuousVerify.java
+++ b/src/main/java/org/apache/accumulo/testing/continuous/ContinuousVerify.java
@@ -50,7 +50,6 @@
  * referenced nodes are defined.
  */
 public class ContinuousVerify extends Configured implements Tool {
-
   public static final VLongWritable DEF = new VLongWritable(-1);
 
   public static class CMapper extends Mapper<Key,Value,LongWritable,VLongWritable> {
@@ -75,7 +74,7 @@
         if (corrupt < 1000) {
           log.error("Bad checksum : " + key);
         } else if (corrupt == 1000) {
-          System.out.println("Too many bad checksums, not printing anymore!");
+          log.error("Too many bad checksums, not printing anymore!");
         }
         corrupt++;
         return;
diff --git a/src/main/java/org/apache/accumulo/testing/continuous/ContinuousWalk.java b/src/main/java/org/apache/accumulo/testing/continuous/ContinuousWalk.java
index 11291b1..54f9786 100644
--- a/src/main/java/org/apache/accumulo/testing/continuous/ContinuousWalk.java
+++ b/src/main/java/org/apache/accumulo/testing/continuous/ContinuousWalk.java
@@ -28,12 +28,13 @@
 import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.Range;
 import org.apache.accumulo.core.data.Value;
-// import org.apache.accumulo.core.trace.Span;
-// import org.apache.accumulo.core.trace.Trace;
 import org.apache.accumulo.testing.TestProps;
 import org.apache.hadoop.io.Text;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class ContinuousWalk {
+  private static final Logger log = LoggerFactory.getLogger(ContinuousWalk.class);
 
   static class BadChecksumException extends RuntimeException {
     private static final long serialVersionUID = 1L;
@@ -66,25 +67,22 @@
           values.clear();
 
           long t1 = System.currentTimeMillis();
-          // Span span = Trace.on("walk");
-          try {
-            scanner.setRange(new Range(new Text(row)));
-            for (Entry<Key,Value> entry : scanner) {
-              validate(entry.getKey(), entry.getValue());
-              values.add(entry.getValue());
-            }
-          } finally {
-            // span.stop();
+
+          scanner.setRange(new Range(new Text(row)));
+          for (Entry<Key,Value> entry : scanner) {
+            validate(entry.getKey(), entry.getValue());
+            values.add(entry.getValue());
           }
+
           long t2 = System.currentTimeMillis();
 
-          System.out.printf("SRQ %d %s %d %d%n", t1, row, (t2 - t1), values.size());
+          log.debug("SRQ {} {} {} {}", t1, row, (t2 - t1), values.size());
 
           if (values.size() > 0) {
             row = getPrevRow(values.get(r.nextInt(values.size())));
           } else {
-            System.out.printf("MIS %d %s%n", t1, row);
-            System.err.printf("MIS %d %s%n", t1, row);
+            log.debug("MIS {} {}", t1, row);
+            log.debug("MIS {} {}", t1, row);
             row = null;
           }
 
diff --git a/src/main/java/org/apache/accumulo/testing/continuous/CreateTable.java b/src/main/java/org/apache/accumulo/testing/continuous/CreateTable.java
index fc3440a..a3f1457 100644
--- a/src/main/java/org/apache/accumulo/testing/continuous/CreateTable.java
+++ b/src/main/java/org/apache/accumulo/testing/continuous/CreateTable.java
@@ -16,6 +16,8 @@
  */
 package org.apache.accumulo.testing.continuous;
 
+import static org.apache.accumulo.testing.TestProps.CI_COMMON_ACCUMULO_NUM_TABLETS;
+
 import java.util.HashMap;
 import java.util.Map;
 import java.util.SortedSet;
@@ -25,8 +27,11 @@
 import org.apache.accumulo.core.client.admin.NewTableConfiguration;
 import org.apache.accumulo.testing.TestProps;
 import org.apache.hadoop.io.Text;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class CreateTable {
+  private static final Logger log = LoggerFactory.getLogger(CreateTable.class);
 
   public static void main(String[] args) throws Exception {
 
@@ -35,19 +40,18 @@
       AccumuloClient client = env.getAccumuloClient();
       String tableName = env.getAccumuloTableName();
       if (client.tableOperations().exists(tableName)) {
-        System.err.println("ERROR: Accumulo table '" + tableName + "' already exists");
+        log.error("Accumulo table {} already exists", tableName);
         System.exit(-1);
       }
 
-      int numTablets = Integer
-          .parseInt(env.getTestProperty(TestProps.CI_COMMON_ACCUMULO_NUM_TABLETS));
+      int numTablets = Integer.parseInt(env.getTestProperty(CI_COMMON_ACCUMULO_NUM_TABLETS));
 
       if (numTablets < 1) {
-        System.err.println("ERROR: numTablets < 1");
+        log.error("numTablets < 1");
         System.exit(-1);
       }
       if (env.getRowMin() >= env.getRowMax()) {
-        System.err.println("ERROR: min >= max");
+        log.error("min >= max");
         System.exit(-1);
       }
 
@@ -70,8 +74,7 @@
 
       client.tableOperations().create(tableName, ntc);
 
-      System.out
-          .println("Created Accumulo table '" + tableName + "' with " + numTablets + " tablets");
+      log.info("Created Accumulo table {} with {} tablets", tableName, numTablets);
     }
   }
 
@@ -79,7 +82,7 @@
     String[] props = env.getTestProperty(TestProps.CI_COMMON_ACCUMULO_TABLE_PROPS).split(" ");
     Map<String,String> tableProps = new HashMap<>();
     for (String prop : props) {
-      System.out.println("prop" + prop);
+      log.debug("prop: {}", prop);
       String[] kv = prop.split("=");
       tableProps.put(kv[0], kv[1]);
     }
diff --git a/src/main/java/org/apache/accumulo/testing/continuous/TimeBinner.java b/src/main/java/org/apache/accumulo/testing/continuous/TimeBinner.java
index f231294..33ea7ef 100644
--- a/src/main/java/org/apache/accumulo/testing/continuous/TimeBinner.java
+++ b/src/main/java/org/apache/accumulo/testing/continuous/TimeBinner.java
@@ -29,10 +29,13 @@
 
 import org.apache.accumulo.testing.cli.ClientOpts.TimeConverter;
 import org.apache.accumulo.testing.cli.Help;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import com.beust.jcommander.Parameter;
 
 public class TimeBinner {
+  private static final Logger log = LoggerFactory.getLogger(TimeBinner.class);
 
   enum Operation {
     AVG,
@@ -149,7 +152,7 @@
         }
 
       } catch (Exception e) {
-        System.err.println("Failed to process line : " + line + "  " + e.getMessage());
+        log.error("Failed to process line: {} {}", line, e.getMessage());
       }
     }
 
@@ -183,7 +186,7 @@
           value = "" + entry.getValue().d;
       }
 
-      System.out.println(sdf.format(new Date(entry.getKey())) + " " + value);
+      log.info(sdf.format(new Date(entry.getKey())) + " " + value);
     }
 
   }
diff --git a/src/main/java/org/apache/accumulo/testing/continuous/UndefinedAnalyzer.java b/src/main/java/org/apache/accumulo/testing/continuous/UndefinedAnalyzer.java
index ebf42dc..61d28ea 100644
--- a/src/main/java/org/apache/accumulo/testing/continuous/UndefinedAnalyzer.java
+++ b/src/main/java/org/apache/accumulo/testing/continuous/UndefinedAnalyzer.java
@@ -41,6 +41,8 @@
 import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.testing.cli.ClientOpts;
 import org.apache.hadoop.io.Text;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import com.beust.jcommander.Parameter;
 
@@ -51,6 +53,7 @@
  * have old dates in them.
  */
 public class UndefinedAnalyzer {
+  private static final Logger logger = LoggerFactory.getLogger(UndefinedAnalyzer.class);
 
   static class UndefinedNode {
 
@@ -90,7 +93,7 @@
           String uuid = tokens[2];
 
           if (flushes.containsKey(uuid)) {
-            System.err.println("WARN Duplicate uuid " + log);
+            logger.error("WARN Duplicate uuid " + log);
             return;
           }
 
@@ -101,7 +104,7 @@
 
         }
         if (tm == null) {
-          System.err.println("WARN Bad ingest log " + log);
+          logger.error("WARN Bad ingest log " + log);
           return;
         }
 
@@ -203,16 +206,13 @@
                     : tablet.substring(pos1 + 1, pos2);
                 String prevEndRow = tablet.charAt(pos2) == '<' ? "" : tablet.substring(pos2 + 1);
                 if (tid.equals(tableId)) {
-                  // System.out.println(" "+server+" "+tid+" "+endRow+" "+prevEndRow);
                   Date date = sdf.parse(day + " " + time);
-                  // System.out.println(" "+date);
-
                   assignments.add(
                       new TabletAssignment(tablet, endRow, prevEndRow, server, date.getTime()));
 
                 }
               } else if (!tablet.startsWith("!0")) {
-                System.err.println("Cannot parse tablet " + tablet);
+                logger.error("Cannot parse tablet {}", tablet);
               }
             }
           }
@@ -317,15 +317,14 @@
             }
 
             if (ta == null)
-              System.out.println(
-                  undefinedNode.undef + " " + undefinedNode.ref + " " + uuid + " " + t1 + " " + t2);
+              logger.debug("{} {} {} {} {}", undefinedNode.undef, undefinedNode.ref, uuid, t1, t2);
             else
-              System.out.println(undefinedNode.undef + " " + undefinedNode.ref + " " + ta.tablet
-                  + " " + ta.server + " " + uuid + " " + t1 + " " + t2);
+              logger.debug("{} {} {} {} {}", undefinedNode.undef, undefinedNode.ref, ta.tablet,
+                  ta.server, uuid, t1, t2);
 
           }
         } else {
-          System.out.println(undefinedNode.undef + " " + undefinedNode.ref);
+          logger.debug("{} {}", undefinedNode.undef, undefinedNode.ref);
         }
       }
     }