Added perf test for yielding+scan dispatching (#56)

Test running lots of short scans while many filters that return little
data are running in the background.  If nothing is done these filters
will prevent the short scans from running.  This test configures
Accumulo so that the short scans should get a chance to run.  It does
three things to facilitate the short scans : the filters yield, there
are two scan executors, and a scan dispatcher sends long running scans
to the second executor. If yielding and dispatching are working
correctly then the short scans should have very short response times.
This happens because the filters should end up in a separate thread pool
than the short scan.

Also fixed some bugs with the performance test framework.
diff --git a/README.md b/README.md
index f9acb26..463c52c 100644
--- a/README.md
+++ b/README.md
@@ -165,6 +165,11 @@
 wiping, and confguring an Accumulo instance.  This script should define the following functions.
 
 ```bash
+
+function get_hadoop_client {
+  # TODO return hadoop client libs in a form suitable for appending to a classpath
+}
+
 function get_version {
   case $1 in
     ACCUMULO)
diff --git a/bin/performance b/bin/performance
index d062ecf..20d8e8d 100755
--- a/bin/performance
+++ b/bin/performance
@@ -28,12 +28,13 @@
   run <output dir> [filter]        Runs performance tests.
   compare <result 1> <result 2>    Compares results of two test.
   csv {files}                      Converts results to CSV
+  list                             List the performance test
 EOF
 }
 
 
 function build_shade_jar() {
-  at_shaded_jar="$at_home/core/target/accumulo-testing-$at_version-shaded.jar"
+  at_shaded_jar="$at_home/target/accumulo-testing-shaded.jar"
   if [ ! -f "$at_shaded_jar" ]; then
     echo "Building $at_shaded_jar"
     cd "$at_home" || exit 1
@@ -58,7 +59,7 @@
 
 . $at_home/conf/cluster-control.sh
 build_shade_jar
-CP="$at_home/core/target/accumulo-testing-$at_version-shaded.jar"
+CP="$at_home/target/accumulo-testing-shaded.jar:$(get_hadoop_client)"
 perf_pkg="org.apache.accumulo.testing.performance.impl"
 case "$1" in
   run)
@@ -77,7 +78,7 @@
         get_config_file accumulo.properties "$pt_tmp"
         CLASSPATH="$CP" java -Dlog4j.configuration="file:$log4j_config"  ${perf_pkg}.MergeSiteConfig "$test_class" "$pt_tmp"
         put_config_file "$pt_tmp/accumulo.properties"
-        put_server_code "$at_home/core/target/accumulo-testing-core-$at_version.jar"
+        put_server_code "$at_home/target/accumulo-testing-$at_version.jar"
         start_accumulo
         get_config_file accumulo-client.properties "$pt_tmp"
         CLASSPATH="$CP" java -Dlog4j.configuration="file:$log4j_config"  ${perf_pkg}.PerfTestRunner "$pt_tmp/accumulo-client.properties" "$test_class" "$(get_version 'ACCUMULO')" "$2"
@@ -91,6 +92,9 @@
   csv)
     CLASSPATH="$CP" java -Dlog4j.configuration="file:$log4j_config"  ${perf_pkg}.Csv "${@:2}"
     ;;
+  list)
+    CLASSPATH="$CP" java -Dlog4j.configuration="file:$log4j_config"  ${perf_pkg}.ListTests
+    ;;
   *)
     echo "Unknown command : $1"
     print_usage
diff --git a/conf/cluster-control.sh.uno b/conf/cluster-control.sh.uno
index 678ee35..a98d520 100644
--- a/conf/cluster-control.sh.uno
+++ b/conf/cluster-control.sh.uno
@@ -22,9 +22,13 @@
   echo "$($UNO env | grep ACCUMULO_HOME | sed 's/export ACCUMULO_HOME=//' | sed 's/"//g')"
 }
 
-
 # functions required for accumulo testing cluster control
 
+function get_hadoop_client {
+  echo "$($UNO env | grep HADOOP_HOME | sed 's/export HADOOP_HOME=//' | sed 's/"//g')/share/hadoop/client/*"
+}
+
+
 function get_version {
   case $1 in
     ACCUMULO)
diff --git a/contrib/import-control.xml b/contrib/import-control.xml
index ce449fc..af247c4 100644
--- a/contrib/import-control.xml
+++ b/contrib/import-control.xml
@@ -29,6 +29,10 @@
     <allow pkg="org.apache.accumulo.minicluster"/>
     <allow pkg="org.apache.accumulo.hadoop.mapreduce"/>
 
+    <!-- SPI package -->
+    <allow pkg="org.apache.accumulo.core.spi"/>
+
+
     <!-- exceptions for testing -->
     <allow pkg="org.apache.accumulo.core.conf"/>
 
diff --git a/pom.xml b/pom.xml
index a1a07be..b1a606c 100644
--- a/pom.xml
+++ b/pom.xml
@@ -33,7 +33,7 @@
   <description>Testing tools for Apache Accumulo</description>
 
   <properties>
-    <accumulo.version>2.0.0-SNAPSHOT</accumulo.version>
+    <accumulo.version>2.0.0-alpha-2</accumulo.version>
     <hadoop.version>3.0.3</hadoop.version>
     <zookeeper.version>3.4.9</zookeeper.version>
     <slf4j.version>1.7.21</slf4j.version>
diff --git a/src/main/java/org/apache/accumulo/testing/performance/tests/ProbabilityFilter.java b/src/main/java/org/apache/accumulo/testing/performance/tests/ProbabilityFilter.java
new file mode 100644
index 0000000..aa807e0
--- /dev/null
+++ b/src/main/java/org/apache/accumulo/testing/performance/tests/ProbabilityFilter.java
@@ -0,0 +1,29 @@
+package org.apache.accumulo.testing.performance.tests;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.Random;
+import java.util.function.BiPredicate;
+
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.iterators.IteratorEnvironment;
+import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
+
+public class ProbabilityFilter extends YieldingFilter {
+
+  private double matchProbability;
+
+  @Override
+  public void init(SortedKeyValueIterator<Key,Value> source, Map<String,String> options,
+      IteratorEnvironment env) throws IOException {
+    super.init(source, options, env);
+    this.matchProbability = Double.parseDouble(options.get("probability"));
+  }
+
+  @Override
+  protected BiPredicate<Key, Value> createPredicate() {
+    Random rand = new Random();
+    return (k,v) -> rand.nextDouble() < matchProbability;
+  }
+}
diff --git a/src/main/java/org/apache/accumulo/testing/performance/tests/TimedScanDispatcher.java b/src/main/java/org/apache/accumulo/testing/performance/tests/TimedScanDispatcher.java
new file mode 100644
index 0000000..7058bf9
--- /dev/null
+++ b/src/main/java/org/apache/accumulo/testing/performance/tests/TimedScanDispatcher.java
@@ -0,0 +1,29 @@
+package org.apache.accumulo.testing.performance.tests;
+
+import org.apache.accumulo.core.spi.scan.ScanDispatcher;
+import org.apache.accumulo.core.spi.scan.ScanInfo;
+
+public class TimedScanDispatcher implements ScanDispatcher {
+
+  String quickExecutor;
+  long quickTime;
+
+  String longExectuor;
+
+  public void init(InitParameters params) {
+    quickExecutor = params.getOptions().get("quick.executor");
+    quickTime = Long.parseLong(params.getOptions().get("quick.time.ms"));
+
+    longExectuor = params.getOptions().get("long.executor");
+  }
+
+  @Override
+  public String dispatch(DispatchParmaters params) {
+    ScanInfo scanInfo = params.getScanInfo();
+
+    if (scanInfo.getRunTimeStats().sum() < quickTime)
+      return quickExecutor;
+
+    return longExectuor;
+  }
+}
diff --git a/src/main/java/org/apache/accumulo/testing/performance/tests/YieldingFilter.java b/src/main/java/org/apache/accumulo/testing/performance/tests/YieldingFilter.java
new file mode 100644
index 0000000..f514059
--- /dev/null
+++ b/src/main/java/org/apache/accumulo/testing/performance/tests/YieldingFilter.java
@@ -0,0 +1,85 @@
+package org.apache.accumulo.testing.performance.tests;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Map;
+import java.util.function.BiPredicate;
+
+import org.apache.accumulo.core.data.ByteSequence;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.iterators.IteratorEnvironment;
+import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
+import org.apache.accumulo.core.iterators.YieldCallback;
+import org.slf4j.LoggerFactory;
+
+public abstract class YieldingFilter implements SortedKeyValueIterator<Key,Value> {
+
+  private SortedKeyValueIterator<Key,Value> source;
+  private BiPredicate<Key,Value> predicate;
+  private YieldCallback<Key> yield;
+  private long yieldTime;
+
+  protected abstract BiPredicate<Key,Value> createPredicate();
+
+  @Override
+  public void init(SortedKeyValueIterator<Key,Value> source, Map<String,String> options,
+      IteratorEnvironment env) throws IOException {
+    this.source = source;
+    this.predicate = createPredicate();
+    this.yieldTime = Long.parseLong(options.getOrDefault("yieldTimeMS", "100"));
+  }
+
+  protected void findTop() throws IOException {
+    long start = System.nanoTime();
+    while (source.hasTop() && !source.getTopKey().isDeleted()
+        && !predicate.test(source.getTopKey(), source.getTopValue())) {
+      long duration = (System.nanoTime() - start) / 1000000;
+      if (duration > yieldTime) {
+        yield.yield(source.getTopKey());
+        break;
+      }
+
+      source.next();
+    }
+  }
+
+  @Override
+  public boolean hasTop() {
+    return !yield.hasYielded() && source.hasTop();
+  }
+
+  @Override
+  public void next() throws IOException {
+    source.next();
+    findTop();
+  }
+
+  @Override
+  public void seek(Range range, Collection<ByteSequence> columnFamilies, boolean inclusive)
+      throws IOException {
+    source.seek(range, columnFamilies, inclusive);
+    findTop();
+  }
+
+  @Override
+  public Key getTopKey() {
+    return source.getTopKey();
+  }
+
+  @Override
+  public Value getTopValue() {
+    return source.getTopValue();
+  }
+
+  @Override
+  public SortedKeyValueIterator<Key,Value> deepCopy(IteratorEnvironment env) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void enableYielding(YieldCallback<Key> callback) {
+    this.yield = callback;
+  }
+}
diff --git a/src/main/java/org/apache/accumulo/testing/performance/tests/YieldingScanExecutorPT.java b/src/main/java/org/apache/accumulo/testing/performance/tests/YieldingScanExecutorPT.java
new file mode 100644
index 0000000..dd94d37
--- /dev/null
+++ b/src/main/java/org/apache/accumulo/testing/performance/tests/YieldingScanExecutorPT.java
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.accumulo.testing.performance.tests;
+
+import java.util.HashMap;
+import java.util.LongSummaryStatistics;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Random;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.accumulo.core.client.AccumuloClient;
+import org.apache.accumulo.core.client.IteratorSetting;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.client.admin.NewTableConfiguration;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.testing.performance.Environment;
+import org.apache.accumulo.testing.performance.PerformanceTest;
+import org.apache.accumulo.testing.performance.Report;
+import org.apache.accumulo.testing.performance.SystemConfiguration;
+import org.apache.accumulo.testing.performance.util.TestData;
+import org.apache.accumulo.testing.performance.util.TestExecutor;
+import org.apache.hadoop.io.Text;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Iterables;
+
+public class YieldingScanExecutorPT implements PerformanceTest {
+
+  private static final int NUM_SHORT_SCANS_THREADS = 5;
+  private static final int NUM_LONG_SCANS = 50;
+
+  private static final int NUM_ROWS = 100000;
+  private static final int NUM_FAMS = 10;
+  private static final int NUM_QUALS = 10;
+
+  private static final String SCAN_EXECUTOR_THREADS = "2";
+
+  private static final String TEST_DESC = "Scan Executor Test.  Test running lots of short scans "
+      + "while many filters that return little data are running in the background.  If nothing is "
+      + "done these filters will prevent the short scans from running.  This test configures "
+      + "Accumulo so that the short scans should get a chance to run.  It does three things to "
+      + "facilitate the short scans : the filters yield, there a two scan executors, and a scan "
+      + "dispatcher sends long running scans to the second executor. If yielding and dispatching "
+      + "are working correctly then the short scans should have very short response times.  This "
+      + "happens because the filters should end up in a separate thread pool than the short scan.";
+
+  private static final String FILTER_PROBABILITY = "0.000001";
+  private static final String FILTER_YIELD_TIME = "1000";
+
+  private static final String QUICK_SCAN_TIME = "500";
+
+  @Override
+  public SystemConfiguration getSystemConfig() {
+    Map<String,String> siteCfg = new HashMap<>();
+
+    siteCfg.put(Property.TSERV_SCAN_MAX_OPENFILES.getKey(), "200");
+    siteCfg.put(Property.TSERV_MINTHREADS.getKey(), "200");
+    siteCfg.put(Property.TSERV_SCAN_EXECUTORS_PREFIX.getKey() + "se1.threads",
+        SCAN_EXECUTOR_THREADS);
+    siteCfg.put(Property.TSERV_SCAN_EXECUTORS_PREFIX.getKey() + "se2.threads",
+        SCAN_EXECUTOR_THREADS);
+    return new SystemConfiguration().setAccumuloConfig(siteCfg);
+  }
+
+  @Override
+  public Report runTest(Environment env) throws Exception {
+
+    String tableName = "scept";
+
+    Map<String,String> props = new HashMap<>();
+    // set up a scan dispatcher that send long runnning scans (> 500ms) to the second executor
+    props.put(Property.TABLE_SCAN_DISPATCHER.getKey(), TimedScanDispatcher.class.getName());
+    props.put(Property.TABLE_SCAN_DISPATCHER_OPTS.getKey() + "quick.executor", "se1");
+    props.put(Property.TABLE_SCAN_DISPATCHER_OPTS.getKey() + "quick.time.ms", QUICK_SCAN_TIME);
+    props.put(Property.TABLE_SCAN_DISPATCHER_OPTS.getKey() + "long.executor", "se2");
+
+    env.getClient().tableOperations().create(tableName,
+        new NewTableConfiguration().setProperties(props));
+
+    long t1 = System.currentTimeMillis();
+    TestData.generate(env.getClient(), tableName, NUM_ROWS, NUM_FAMS, NUM_QUALS);
+    long t2 = System.currentTimeMillis();
+    env.getClient().tableOperations().compact(tableName, null, null, true, true);
+    long t3 = System.currentTimeMillis();
+
+    AtomicBoolean stop = new AtomicBoolean(false);
+
+    TestExecutor<Long> longScans = startLongScans(env, tableName, stop);
+
+    LongSummaryStatistics shortStats1 = runShortScans(env, tableName, 50000);
+    LongSummaryStatistics shortStats2 = runShortScans(env, tableName, 100000);
+
+    stop.set(true);
+    long t4 = System.currentTimeMillis();
+
+    LongSummaryStatistics longStats = longScans.stream().mapToLong(l -> l).summaryStatistics();
+
+    longScans.close();
+
+    Report.Builder builder = Report.builder();
+
+    builder.id("yfexec").description(TEST_DESC);
+    builder.info("write", NUM_ROWS * NUM_FAMS * NUM_QUALS, t2 - t1, "Data write rate entries/sec ");
+    builder.info("compact", NUM_ROWS * NUM_FAMS * NUM_QUALS, t3 - t2, "Compact rate entries/sec ");
+    builder.info("short_times1", shortStats1, "Times in ms for each short scan.  First run.");
+    builder.info("short_times2", shortStats2, "Times in ms for each short scan. Second run.");
+    builder.result("short", shortStats2.getAverage(),
+        "Average times in ms for short scans from 2nd run.");
+    builder.info("long_counts", longStats, "Entries read by each of the filter threads");
+    builder.info("long", longStats.getSum(), (t4 - t3),
+        "Combined rate in entries/second of all long scans.  This should be low but non-zero.");
+    builder.parameter("short_threads", NUM_SHORT_SCANS_THREADS, "Threads used to run short scans.");
+    builder.parameter("long_threads", NUM_LONG_SCANS,
+        "Threads running long fileter scans.  Each thread repeatedly scans entire table for "
+        + "duration of test randomly returning a few of the keys.");
+    builder.parameter("rows", NUM_ROWS, "Rows in test table");
+    builder.parameter("familes", NUM_FAMS, "Families per row in test table");
+    builder.parameter("qualifiers", NUM_QUALS, "Qualifiers per family in test table");
+    builder.parameter("server_scan_threads", SCAN_EXECUTOR_THREADS,
+        "Server side scan handler threads that each executor has.  There are 2 executors.");
+
+    builder.parameter("filter_probability", FILTER_PROBABILITY, "The chance that one of the long "
+        + "filter scans will return any key it sees.");
+    builder.parameter("filter_yield_time", FILTER_YIELD_TIME, "The time in ms after which one of "
+        + "the long filter scans will yield.");
+    builder.parameter("quick_scan_time", QUICK_SCAN_TIME, "The threshold time in ms for deciding "
+        + "what is a quick vs long scan.  Times less than this are sent to one executor and longer "
+        + "times are sent to another.");
+
+    return builder.build();
+  }
+
+  private static long scan(String tableName, AccumuloClient c, byte[] row, byte[] fam,
+      Map<String,String> hints) throws TableNotFoundException {
+    long t1 = System.currentTimeMillis();
+    int count = 0;
+    try (Scanner scanner = c.createScanner(tableName, Authorizations.EMPTY)) {
+      // scanner.setExecutionHints(hints);
+      scanner.setRange(Range.exact(new Text(row), new Text(fam)));
+      if (Iterables.size(scanner) != NUM_QUALS) {
+        throw new RuntimeException("bad count " + count);
+      }
+    }
+
+    return System.currentTimeMillis() - t1;
+  }
+
+  private long scan(String tableName, AccumuloClient c, AtomicBoolean stop, Map<String,String> hints)
+      throws TableNotFoundException {
+    long count = 0;
+    while (!stop.get()) {
+      try (Scanner scanner = c.createScanner(tableName, Authorizations.EMPTY)) {
+
+        IteratorSetting is = new IteratorSetting(30, ProbabilityFilter.class);
+        is.addOption("probability", FILTER_PROBABILITY);
+        is.addOption("yieldTimeMS", FILTER_YIELD_TIME);
+
+        scanner.addScanIterator(is);
+
+        // scanner.setExecutionHints(hints);
+        for (Entry<Key,Value> entry : scanner) {
+          count++;
+          if (stop.get()) {
+            return count;
+          }
+        }
+      }
+    }
+
+    return count;
+  }
+
+  private LongSummaryStatistics runShortScans(Environment env, String tableName, int numScans)
+      throws InterruptedException, ExecutionException {
+
+    Map<String,String> execHints = ImmutableMap.of("executor", "se2");
+    Map<String,String> prioHints = ImmutableMap.of("priority", "1");
+
+    try (TestExecutor<Long> executor = new TestExecutor<>(NUM_SHORT_SCANS_THREADS)) {
+      Random rand = new Random();
+
+      for (int i = 0; i < numScans; i++) {
+        byte[] row = TestData.row(rand.nextInt(NUM_ROWS));
+        byte[] fam = TestData.fam(rand.nextInt(NUM_FAMS));
+        // scans have a 20% chance of getting dedicated thread pool and 80% chance of getting high
+        // priority
+        Map<String,String> hints = rand.nextInt(10) <= 1 ? execHints : prioHints;
+        executor.submit(() -> scan(tableName, env.getClient(), row, fam, hints));
+      }
+
+      return executor.stream().mapToLong(l -> l).summaryStatistics();
+    }
+  }
+
+  private TestExecutor<Long> startLongScans(Environment env, String tableName, AtomicBoolean stop) {
+    Map<String,String> hints = ImmutableMap.of("priority", "2");
+
+    TestExecutor<Long> longScans = new TestExecutor<>(NUM_LONG_SCANS);
+
+    for (int i = 0; i < NUM_LONG_SCANS; i++) {
+      longScans.submit(() -> scan(tableName, env.getClient(), stop, hints));
+    }
+    return longScans;
+  }
+}