Merge branch '2.1'
diff --git a/bin/cingest b/bin/cingest
index 4ef312c..26b20d4 100755
--- a/bin/cingest
+++ b/bin/cingest
@@ -39,6 +39,7 @@
     manysplits    Repeatedly lowers the split threshold on a table to create
                   many splits in order to test split performance
     bulk          Create RFiles in a Map Reduce job and calls importDirectory if successful
+    corrupt       Corrupts the first entry after the minimum row.  Use -o test.ci.ingest.row.min to change the minimum.
 EOF
 }
 
@@ -81,6 +82,9 @@
     fi
     ci_main="${ci_package}.BulkIngest"
     ;;
+  corrupt)
+     ci_main="${ci_package}.CorruptEntry"
+    ;;
   *)
     echo "Unknown application: $1"
     print_usage
diff --git a/src/main/java/org/apache/accumulo/testing/continuous/ContinuousWalk.java b/src/main/java/org/apache/accumulo/testing/continuous/ContinuousWalk.java
index 94a53bb..0dc13f5 100644
--- a/src/main/java/org/apache/accumulo/testing/continuous/ContinuousWalk.java
+++ b/src/main/java/org/apache/accumulo/testing/continuous/ContinuousWalk.java
@@ -46,6 +46,9 @@
       super(msg);
     }
 
+    public BadChecksumException(String msg, Exception nfe) {
+      super(msg, nfe);
+    }
   }
 
   public static void main(String[] args) throws Exception {
@@ -154,7 +157,7 @@
     return null;
   }
 
-  private static int getChecksumOffset(byte[] val) {
+  static int getChecksumOffset(byte[] val) {
     if (val[val.length - 1] != ':') {
       if (val[val.length - 9] != ':')
         throw new IllegalArgumentException(new String(val, UTF_8));
@@ -169,7 +172,12 @@
     if (ckOff < 0)
       return;
 
-    long storedCksum = Long.parseLong(new String(value.get(), ckOff, 8, UTF_8), 16);
+    long storedCksum;
+    try {
+      storedCksum = Long.parseLong(new String(value.get(), ckOff, 8, UTF_8), 16);
+    } catch (NumberFormatException nfe) {
+      throw new BadChecksumException("Checksum invalid " + key + " " + value, nfe);
+    }
 
     CRC32 cksum = new CRC32();
 
diff --git a/src/main/java/org/apache/accumulo/testing/continuous/CorruptEntry.java b/src/main/java/org/apache/accumulo/testing/continuous/CorruptEntry.java
new file mode 100644
index 0000000..bd71fa1
--- /dev/null
+++ b/src/main/java/org/apache/accumulo/testing/continuous/CorruptEntry.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.testing.continuous;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+import org.apache.accumulo.core.client.AccumuloClient;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Range;
+import org.apache.hadoop.io.Text;
+
+public class CorruptEntry {
+  public static void main(String[] args) throws Exception {
+    try (ContinuousEnv env = new ContinuousEnv(args);
+        AccumuloClient client = env.getAccumuloClient();
+        var scanner = client.createScanner(env.getAccumuloTableName());
+        var writer = client.createBatchWriter(env.getAccumuloTableName())) {
+      final long rowMin = env.getRowMin();
+
+      var startRow = ContinuousIngest.genRow(rowMin);
+      new Range(new Text(startRow), null);
+      scanner.setRange(new Range(new Text(startRow), null));
+      var iter = scanner.iterator();
+      if (iter.hasNext()) {
+        var entry = iter.next();
+        byte[] val = entry.getValue().get();
+        int offset = ContinuousWalk.getChecksumOffset(val);
+        if (offset >= 0) {
+          for (int i = 0; i < 8; i++) {
+            if (val[i + offset] == 'f' || val[i + offset] == 'F') {
+              // in the case of an f hex char, set the hex char to 0
+              val[i + offset] = '0';
+
+            } else {
+              // increment the hex char in the checksum
+              val[i + offset]++;
+            }
+          }
+          Key key = entry.getKey();
+          Mutation m = new Mutation(key.getRow());
+          m.at().family(key.getColumnFamily()).qualifier(key.getColumnQualifier())
+              .visibility(key.getColumnVisibility()).put(val);
+          writer.addMutation(m);
+          writer.flush();
+          System.out.println("Corrupted checksum value on key " + key);
+        }
+      } else {
+        throw new IllegalArgumentException("No entry found after " + new String(startRow, UTF_8));
+      }
+    }
+  }
+}
diff --git a/src/main/java/org/apache/accumulo/testing/continuous/FlakyBulkBatchWriter.java b/src/main/java/org/apache/accumulo/testing/continuous/FlakyBulkBatchWriter.java
index 281b183..9259fa0 100644
--- a/src/main/java/org/apache/accumulo/testing/continuous/FlakyBulkBatchWriter.java
+++ b/src/main/java/org/apache/accumulo/testing/continuous/FlakyBulkBatchWriter.java
@@ -187,7 +187,7 @@
       long t2 = System.nanoTime();
 
       log.debug("Bulk imported dir {} destinations:{} mutations:{} memUsed:{} time:{}ms", tmpDir,
-          loadPlan.getDestinations().size(), mutations.size(), memUsed,
+          loadPlan.getDestinations().size(), keysValues.size(), memUsed,
           TimeUnit.NANOSECONDS.toMillis(t2 - t1));
 
       fileSystem.delete(tmpDir, true);
diff --git a/src/main/java/org/apache/accumulo/testing/continuous/ValidatingIterator.java b/src/main/java/org/apache/accumulo/testing/continuous/ValidatingIterator.java
new file mode 100644
index 0000000..500e7a5
--- /dev/null
+++ b/src/main/java/org/apache/accumulo/testing/continuous/ValidatingIterator.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.testing.continuous;
+
+import java.io.IOException;
+import java.util.Collection;
+
+import org.apache.accumulo.core.data.ByteSequence;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.iterators.WrappingIterator;
+
+/**
+ * Validate the checksum on each entry read in the continuous ingest table
+ */
+public class ValidatingIterator extends WrappingIterator {
+
+  public void seek(Range range, Collection<ByteSequence> columnFamilies, boolean inclusive)
+      throws IOException {
+    super.seek(range, columnFamilies, inclusive);
+    if (super.hasTop()) {
+      ContinuousWalk.validate(super.getTopKey(), super.getTopValue());
+    }
+  }
+
+  public void next() throws IOException {
+    super.next();
+    if (super.hasTop()) {
+      ContinuousWalk.validate(super.getTopKey(), super.getTopValue());
+    }
+  }
+}
diff --git a/test/compaction-failures/README.md b/test/compaction-failures/README.md
new file mode 100644
index 0000000..1b48b29
--- /dev/null
+++ b/test/compaction-failures/README.md
@@ -0,0 +1,53 @@
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      https://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+-->
+
+Some scripts for testing different compaction failure scenarios.
+
+```bash
+# start compactors, half of which will always fail any compaction because they are missing an iterator class
+./setup-compactors.sh
+# starting ingest into table ci1
+./start-ingest.sh ci1 NORMAL
+# starting ingest into table ci2 with a non-existent compaction iterator configured, all compactions should fail on this table
+./start-ingest.sh ci2 BAD_ITER
+# starting ingest into table ci3 with a compaction service that has no compactors running, no compactions should ever run for this table
+./start-ingest.sh ci3 BAD_SERVICE
+# starting ingest into table ci4, corrupting data in a single tablet such that that tablet can never compact
+./start-ingest.sh ci4 BAD_TABLET
+```
+
+While tests are running, the following can be used to monitor files per tablet on a table.
+
+```
+$ accumulo jshell
+Preparing JShell for Apache Accumulo
+
+Use 'client' to interact with Accumulo
+
+|  Welcome to JShell -- Version 17.0.15
+|  For an introduction type: /help intro
+
+jshell> /open count-file-per-tablet.jshell
+
+jshell> CFPT.printStats(client, "ci1", 3000)
+  0 secs min:20 avg:30.37 max:35
+  3 secs min:20 avg:30.28 max:35
+```
diff --git a/test/compaction-failures/count-file-per-tablet.jshell b/test/compaction-failures/count-file-per-tablet.jshell
new file mode 100644
index 0000000..5f3cf40
--- /dev/null
+++ b/test/compaction-failures/count-file-per-tablet.jshell
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import com.google.common.collect.Iterators;
+import com.google.common.collect.Streams;
+import org.apache.accumulo.core.client.AccumuloClient;
+import org.apache.accumulo.core.client.RowIterator;
+import org.apache.accumulo.core.data.Range;
+
+// counts files per tablet for a table over time
+public class CFPT {
+
+    public static void printPerTabletFileStats(AccumuloClient client, String table, long startTime) throws Exception {
+        var tableId = client.tableOperations().tableIdMap().get(table);
+        try(var scanner = client.createScanner("accumulo.metadata")) {
+            scanner.setRange(new Range(tableId+":", tableId+"<"));
+            scanner.fetchColumnFamily("file");
+            RowIterator rowIterator = new RowIterator(scanner);
+            var stats = Streams.stream(rowIterator).mapToInt(Iterators::size).summaryStatistics();
+            long diff = (System.currentTimeMillis() -startTime)/1000;
+            System.out.printf("%3d secs min:%d avg:%.2f max:%d\n", diff, stats.getMin(),stats.getAverage(),stats.getMax() );
+        }
+    }
+
+    public static void printStats(AccumuloClient client, String table, long sleep) throws Exception {
+        long startTime = System.currentTimeMillis();
+        printPerTabletFileStats(client, table, startTime);
+
+        while(true) {
+            Thread.sleep(sleep);
+            printPerTabletFileStats(client, table, startTime);
+        }
+    }
+}
diff --git a/test/compaction-failures/setup-compactors.sh b/test/compaction-failures/setup-compactors.sh
new file mode 100755
index 0000000..8c28ae3
--- /dev/null
+++ b/test/compaction-failures/setup-compactors.sh
@@ -0,0 +1,60 @@
+#!/bin/bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   https://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+
+# configure compaction services
+accumulo shell -u root -p secret <<EOF
+config -s tserver.compaction.major.service.cs1.planner=org.apache.accumulo.core.spi.compaction.DefaultCompactionPlanner
+config -s tserver.compaction.major.service.cs1.planner.opts.executors=[{"name":"small","type":"external","maxSize":"128M","queue":"small"},{"name":"large","type":"external","queue":"large"}]
+config -s tserver.compaction.major.service.cs2.planner=org.apache.accumulo.core.spi.compaction.DefaultCompactionPlanner
+config -s tserver.compaction.major.service.cs2.planner.opts.executors=[{"name":"small","type":"external","maxSize":"128M","queue":"emptysmall"},{"name":"large","type":"external","queue":"emptylarge"}]
+EOF
+
+mkdir -p logs
+
+pkill -f "Main compactor"
+pkill -f "Main compaction-coord"
+
+ACCUMULO_SERVICE_INSTANCE=coord accumulo compaction-coordinator  &> logs/coord.out &
+
+# get the absolute path of the accumulo non-shaded test jar
+TEST_JAR=$(readlink -f $(ls ../../target/accumulo-testing-[0-9].*jar))
+
+# start 4 compactors with the test iterator on their classpath
+for i in $(seq 1 4); do
+	CLASSPATH=$TEST_JAR ACCUMULO_SERVICE_INSTANCE=compactor_small_$i accumulo compactor -q small &> logs/compactor-small-$i.out &
+done
+
+# start four compactors that do not have the test jar on their classpath.  Since
+# every table configures an iterator w/ the test jar, every compaction on these
+# compactors should fail
+for i in $(seq 5 8); do
+	ACCUMULO_SERVICE_INSTANCE=compactor_small_$i accumulo compactor -q small &> logs/compactor-small-$i.out &
+done
+
+# start 4 compactors for the large group w/ the test iterator on the classpath
+for i in $(seq 1 4); do
+	CLASSPATH=$TEST_JAR ACCUMULO_SERVICE_INSTANCE=compactor_large_$i accumulo compactor -q large &> logs/compactor-large-$i.out &
+done
+
+# start 4 compactors for the large group that are missing the iterator used by the test table
+for i in $(seq 5 8); do
+	ACCUMULO_SERVICE_INSTANCE=compactor_large_$i accumulo compactor -q large &> logs/compactor-large-$i.out &
+done
\ No newline at end of file
diff --git a/test/compaction-failures/start-ingest.sh b/test/compaction-failures/start-ingest.sh
new file mode 100755
index 0000000..9bebfb6
--- /dev/null
+++ b/test/compaction-failures/start-ingest.sh
@@ -0,0 +1,73 @@
+#!/bin/bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   https://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+
+if [ "$#" != "2" ]; then
+  echo "Usage $0 <tablename> BAD_ITER|BAD_TABLET|BAD_SERVICE|NORMAL"
+  exit 1
+fi
+
+#accumulo-testing directory
+ATD=../..
+
+table=$1
+test_type=$2
+
+$ATD/bin/cingest createtable -o test.ci.common.accumulo.table=$table
+
+# setup a compaction time iterator and point the tablet to compaction service w/ external compactors
+accumulo shell -u root -p secret <<EOF
+config -t $table -s table.iterator.majc.validate=100,org.apache.accumulo.testing.continuous.ValidatingIterator
+config -t $table -s table.compaction.dispatcher.opts.service=cs1
+EOF
+
+case $test_type in
+BAD_ITER) accumulo shell -u root -p secret <<EOF
+config -t $table -s table.iterator.majc.validate=100,org.apache.accumulo.testing.continuous.MissingIter
+EOF
+  ;;
+BAD_TABLET)
+  # write a little bit of data
+  $ATD/bin/cingest ingest -o test.ci.common.accumulo.table=$table -o test.ci.ingest.client.entries=100000
+  # corrupt a checksum in an entry, this should cause the ValidatingIterator to fail when compacting that tablet
+  $ATD/bin/cingest corrupt -o test.ci.common.accumulo.table=$table -o test.ci.ingest.row.min=1000000000000000
+  accumulo shell -u root -p secret -e "flush -t $table"
+  ;;
+BAD_SERVICE) accumulo shell -u root -p secret <<EOF
+config -t $table -s table.compaction.dispatcher.opts.service=cs2
+EOF
+  ;;
+NORMAL) ;;
+*)
+    echo "Usage $0 <tablename> BAD_ITER|BAD_TABLET|BAD_SERVICE|NORMAL"
+    exit 1
+esac
+
+mkdir -p logs
+
+if [ "$test_type" == "NORMAL" ]; then
+  # start unlimited ingest into the table
+  $ATD/bin/cingest ingest -o test.ci.ingest.bulk.workdir=/ci_bulk -o test.ci.common.accumulo.table=$table &> logs/bulk-$table.log &
+else
+  # limit the amount of data written since tablets cannot compact
+  # would not need to do this if the table.file.pause property existed in 2.1
+  $ATD/bin/cingest ingest -o test.ci.ingest.bulk.workdir=/ci_bulk -o test.ci.common.accumulo.table=$table -o test.ci.ingest.client.entries=10000000 &> logs/bulk-$table.log &
+fi
+