Update japi readme and master config (#55)

* Update japi readme and master config

* Update japi-accumulo-master.xml

Remove old jar
diff --git a/README.md b/README.md
index f9acb26..463c52c 100644
--- a/README.md
+++ b/README.md
@@ -165,6 +165,12 @@
 wiping, and configuring an Accumulo instance.  This script should define the following functions.
 
 ```bash
+
+function get_hadoop_client {
+  # TODO return hadoop client libs in a form suitable for appending to a classpath
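+  # e.g. the uno implementation below echoes "$HADOOP_HOME/share/hadoop/client/*"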
+}
+
 function get_version {
   case $1 in
     ACCUMULO)
diff --git a/bin/performance b/bin/performance
index d062ecf..20d8e8d 100755
--- a/bin/performance
+++ b/bin/performance
@@ -28,12 +28,13 @@
   run <output dir> [filter]        Runs performance tests.
  compare <result 1> <result 2>    Compares results of two tests.
   csv {files}                      Converts results to CSV
+  list                             Lists the performance tests
 EOF
 }
 
 
 function build_shade_jar() {
-  at_shaded_jar="$at_home/core/target/accumulo-testing-$at_version-shaded.jar"
+  at_shaded_jar="$at_home/target/accumulo-testing-shaded.jar"
   if [ ! -f "$at_shaded_jar" ]; then
     echo "Building $at_shaded_jar"
     cd "$at_home" || exit 1
@@ -58,7 +59,7 @@
 
 . $at_home/conf/cluster-control.sh
 build_shade_jar
-CP="$at_home/core/target/accumulo-testing-$at_version-shaded.jar"
+CP="$at_home/target/accumulo-testing-shaded.jar:$(get_hadoop_client)"
 perf_pkg="org.apache.accumulo.testing.performance.impl"
 case "$1" in
   run)
@@ -77,7 +78,7 @@
         get_config_file accumulo.properties "$pt_tmp"
         CLASSPATH="$CP" java -Dlog4j.configuration="file:$log4j_config"  ${perf_pkg}.MergeSiteConfig "$test_class" "$pt_tmp"
         put_config_file "$pt_tmp/accumulo.properties"
-        put_server_code "$at_home/core/target/accumulo-testing-core-$at_version.jar"
+        put_server_code "$at_home/target/accumulo-testing-$at_version.jar"
         start_accumulo
         get_config_file accumulo-client.properties "$pt_tmp"
         CLASSPATH="$CP" java -Dlog4j.configuration="file:$log4j_config"  ${perf_pkg}.PerfTestRunner "$pt_tmp/accumulo-client.properties" "$test_class" "$(get_version 'ACCUMULO')" "$2"
@@ -91,6 +92,9 @@
   csv)
     CLASSPATH="$CP" java -Dlog4j.configuration="file:$log4j_config"  ${perf_pkg}.Csv "${@:2}"
     ;;
+  list)
+    CLASSPATH="$CP" java -Dlog4j.configuration="file:$log4j_config"  ${perf_pkg}.ListTests
+    ;;
   *)
     echo "Unknown command : $1"
     print_usage
diff --git a/conf/cluster-control.sh.uno b/conf/cluster-control.sh.uno
index 678ee35..a98d520 100644
--- a/conf/cluster-control.sh.uno
+++ b/conf/cluster-control.sh.uno
@@ -22,9 +22,13 @@
   echo "$($UNO env | grep ACCUMULO_HOME | sed 's/export ACCUMULO_HOME=//' | sed 's/"//g')"
 }
 
-
 # functions required for accumulo testing cluster control
 
+function get_hadoop_client {
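+  # prints the hadoop client libs as a glob suitable for appending to a classpath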
+  echo "$($UNO env | grep HADOOP_HOME | sed 's/export HADOOP_HOME=//' | sed 's/"//g')/share/hadoop/client/*"
+}
+
 function get_version {
   case $1 in
     ACCUMULO)
diff --git a/contrib/import-control.xml b/contrib/import-control.xml
index ce449fc..af247c4 100644
--- a/contrib/import-control.xml
+++ b/contrib/import-control.xml
@@ -29,6 +29,9 @@
     <allow pkg="org.apache.accumulo.minicluster"/>
     <allow pkg="org.apache.accumulo.hadoop.mapreduce"/>
 
+    <!-- SPI package -->
+    <allow pkg="org.apache.accumulo.core.spi"/>
+
     <!-- exceptions for testing -->
     <allow pkg="org.apache.accumulo.core.conf"/>
 
diff --git a/pom.xml b/pom.xml
index a1a07be..b1a606c 100644
--- a/pom.xml
+++ b/pom.xml
@@ -33,7 +33,7 @@
   <description>Testing tools for Apache Accumulo</description>
 
   <properties>
-    <accumulo.version>2.0.0-SNAPSHOT</accumulo.version>
+    <accumulo.version>2.0.0-alpha-2</accumulo.version>
     <hadoop.version>3.0.3</hadoop.version>
     <zookeeper.version>3.4.9</zookeeper.version>
     <slf4j.version>1.7.21</slf4j.version>
diff --git a/src/main/java/org/apache/accumulo/testing/performance/tests/ProbabilityFilter.java b/src/main/java/org/apache/accumulo/testing/performance/tests/ProbabilityFilter.java
new file mode 100644
index 0000000..aa807e0
--- /dev/null
+++ b/src/main/java/org/apache/accumulo/testing/performance/tests/ProbabilityFilter.java
@@ -0,0 +1,33 @@
+package org.apache.accumulo.testing.performance.tests;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.Random;
+import java.util.function.BiPredicate;
+
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.iterators.IteratorEnvironment;
+import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
+
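+/**
+ * A yielding filter that matches each key/value pair with the probability given by the
+ * "probability" option.  Scans using it do lots of work but return few entries.
+ */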
+public class ProbabilityFilter extends YieldingFilter {
+
+  private double matchProbability;
+
+  @Override
+  public void init(SortedKeyValueIterator<Key,Value> source, Map<String,String> options,
+      IteratorEnvironment env) throws IOException {
+    super.init(source, options, env);
+    this.matchProbability = Double.parseDouble(options.get("probability"));
+  }
+
+  @Override
+  protected BiPredicate<Key,Value> createPredicate() {
+    Random rand = new Random();
+    return (k,v) -> rand.nextDouble() < matchProbability;
+  }
+}
diff --git a/src/main/java/org/apache/accumulo/testing/performance/tests/TimedScanDispatcher.java b/src/main/java/org/apache/accumulo/testing/performance/tests/TimedScanDispatcher.java
new file mode 100644
index 0000000..7058bf9
--- /dev/null
+++ b/src/main/java/org/apache/accumulo/testing/performance/tests/TimedScanDispatcher.java
@@ -0,0 +1,34 @@
+package org.apache.accumulo.testing.performance.tests;
+
+import org.apache.accumulo.core.spi.scan.ScanDispatcher;
+import org.apache.accumulo.core.spi.scan.ScanInfo;
+
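+/**
+ * A scan dispatcher that sends a scan to the quick executor until its cumulative run time
+ * exceeds quick.time.ms, after which the scan is dispatched to the long executor.
+ */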
+public class TimedScanDispatcher implements ScanDispatcher {
+
+  String quickExecutor;
+  long quickTime;
+
+  String longExecutor;
+
+  @Override
+  public void init(InitParameters params) {
+    quickExecutor = params.getOptions().get("quick.executor");
+    quickTime = Long.parseLong(params.getOptions().get("quick.time.ms"));
+
+    longExecutor = params.getOptions().get("long.executor");
+  }
+
+  @Override
+  public String dispatch(DispatchParmaters params) {
+    ScanInfo scanInfo = params.getScanInfo();
+
+    if (scanInfo.getRunTimeStats().sum() < quickTime)
+      return quickExecutor;
+
+    return longExecutor;
+  }
+}
diff --git a/src/main/java/org/apache/accumulo/testing/performance/tests/YieldingFilter.java b/src/main/java/org/apache/accumulo/testing/performance/tests/YieldingFilter.java
new file mode 100644
index 0000000..f514059
--- /dev/null
+++ b/src/main/java/org/apache/accumulo/testing/performance/tests/YieldingFilter.java
@@ -0,0 +1,89 @@
+package org.apache.accumulo.testing.performance.tests;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Map;
+import java.util.function.BiPredicate;
+
+import org.apache.accumulo.core.data.ByteSequence;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.iterators.IteratorEnvironment;
+import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
+import org.apache.accumulo.core.iterators.YieldCallback;
+
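+/**
+ * A filter that yields (gives up its scan thread) after searching for a match for more than
+ * yieldTimeMS, so that long running scans do not monopolize scan threads.
+ */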
+public abstract class YieldingFilter implements SortedKeyValueIterator<Key,Value> {
+
+  private SortedKeyValueIterator<Key,Value> source;
+  private BiPredicate<Key,Value> predicate;
+  private YieldCallback<Key> yield;
+  private long yieldTime;
+
+  protected abstract BiPredicate<Key,Value> createPredicate();
+
+  @Override
+  public void init(SortedKeyValueIterator<Key,Value> source, Map<String,String> options,
+      IteratorEnvironment env) throws IOException {
+    this.source = source;
+    this.predicate = createPredicate();
+    this.yieldTime = Long.parseLong(options.getOrDefault("yieldTimeMS", "100"));
+  }
+
+  protected void findTop() throws IOException {
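+    // skip non-matching entries, yielding the scan thread if this takes longer than yieldTime ms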
+    long start = System.nanoTime();
+    while (source.hasTop() && !source.getTopKey().isDeleted()
+        && !predicate.test(source.getTopKey(), source.getTopValue())) {
+      long duration = (System.nanoTime() - start) / 1000000;
+      if (duration > yieldTime) {
+        yield.yield(source.getTopKey());
+        break;
+      }
+
+      source.next();
+    }
+  }
+
+  @Override
+  public boolean hasTop() {
+    return !yield.hasYielded() && source.hasTop();
+  }
+
+  @Override
+  public void next() throws IOException {
+    source.next();
+    findTop();
+  }
+
+  @Override
+  public void seek(Range range, Collection<ByteSequence> columnFamilies, boolean inclusive)
+      throws IOException {
+    source.seek(range, columnFamilies, inclusive);
+    findTop();
+  }
+
+  @Override
+  public Key getTopKey() {
+    return source.getTopKey();
+  }
+
+  @Override
+  public Value getTopValue() {
+    return source.getTopValue();
+  }
+
+  @Override
+  public SortedKeyValueIterator<Key,Value> deepCopy(IteratorEnvironment env) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void enableYielding(YieldCallback<Key> callback) {
+    this.yield = callback;
+  }
+}
diff --git a/src/main/java/org/apache/accumulo/testing/performance/tests/YieldingScanExecutorPT.java b/src/main/java/org/apache/accumulo/testing/performance/tests/YieldingScanExecutorPT.java
new file mode 100644
index 0000000..dd94d37
--- /dev/null
+++ b/src/main/java/org/apache/accumulo/testing/performance/tests/YieldingScanExecutorPT.java
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.accumulo.testing.performance.tests;
+
+import java.util.HashMap;
+import java.util.LongSummaryStatistics;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Random;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.accumulo.core.client.AccumuloClient;
+import org.apache.accumulo.core.client.IteratorSetting;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.client.admin.NewTableConfiguration;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.testing.performance.Environment;
+import org.apache.accumulo.testing.performance.PerformanceTest;
+import org.apache.accumulo.testing.performance.Report;
+import org.apache.accumulo.testing.performance.SystemConfiguration;
+import org.apache.accumulo.testing.performance.util.TestData;
+import org.apache.accumulo.testing.performance.util.TestExecutor;
+import org.apache.hadoop.io.Text;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Iterables;
+
+public class YieldingScanExecutorPT implements PerformanceTest {
+
+  private static final int NUM_SHORT_SCANS_THREADS = 5;
+  private static final int NUM_LONG_SCANS = 50;
+
+  private static final int NUM_ROWS = 100000;
+  private static final int NUM_FAMS = 10;
+  private static final int NUM_QUALS = 10;
+
+  private static final String SCAN_EXECUTOR_THREADS = "2";
+
+  private static final String TEST_DESC = "Scan Executor Test.  Tests running lots of short scans "
+      + "while many filters that return little data are running in the background.  If nothing is "
+      + "done, these filters will prevent the short scans from running.  This test configures "
+      + "Accumulo so that the short scans should get a chance to run.  It does three things to "
+      + "facilitate the short scans: the filters yield, there are two scan executors, and a scan "
+      + "dispatcher sends long running scans to the second executor.  If yielding and dispatching "
+      + "are working correctly then the short scans should have very short response times.  This "
+      + "happens because the filters should end up in a separate thread pool from the short scans.";
+
+  private static final String FILTER_PROBABILITY = "0.000001";
+  private static final String FILTER_YIELD_TIME = "1000";
+
+  private static final String QUICK_SCAN_TIME = "500";
+
+  @Override
+  public SystemConfiguration getSystemConfig() {
+    Map<String,String> siteCfg = new HashMap<>();
+
+    siteCfg.put(Property.TSERV_SCAN_MAX_OPENFILES.getKey(), "200");
+    siteCfg.put(Property.TSERV_MINTHREADS.getKey(), "200");
+    siteCfg.put(Property.TSERV_SCAN_EXECUTORS_PREFIX.getKey() + "se1.threads",
+        SCAN_EXECUTOR_THREADS);
+    siteCfg.put(Property.TSERV_SCAN_EXECUTORS_PREFIX.getKey() + "se2.threads",
+        SCAN_EXECUTOR_THREADS);
+    return new SystemConfiguration().setAccumuloConfig(siteCfg);
+  }
+
+  @Override
+  public Report runTest(Environment env) throws Exception {
+
+    String tableName = "scept";
+
+    Map<String,String> props = new HashMap<>();
+    // set up a scan dispatcher that sends long running scans (> 500ms) to the second executor
+    props.put(Property.TABLE_SCAN_DISPATCHER.getKey(), TimedScanDispatcher.class.getName());
+    props.put(Property.TABLE_SCAN_DISPATCHER_OPTS.getKey() + "quick.executor", "se1");
+    props.put(Property.TABLE_SCAN_DISPATCHER_OPTS.getKey() + "quick.time.ms", QUICK_SCAN_TIME);
+    props.put(Property.TABLE_SCAN_DISPATCHER_OPTS.getKey() + "long.executor", "se2");
+
+    env.getClient().tableOperations().create(tableName,
+        new NewTableConfiguration().setProperties(props));
+
+    long t1 = System.currentTimeMillis();
+    TestData.generate(env.getClient(), tableName, NUM_ROWS, NUM_FAMS, NUM_QUALS);
+    long t2 = System.currentTimeMillis();
+    env.getClient().tableOperations().compact(tableName, null, null, true, true);
+    long t3 = System.currentTimeMillis();
+
+    AtomicBoolean stop = new AtomicBoolean(false);
+
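+    // start many long running filter scans that return little data; the dispatcher should move
+    // these to the second executor once they run longer than the quick scan time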
+    TestExecutor<Long> longScans = startLongScans(env, tableName, stop);
+
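+    // run short scans while the filter scans are running; if yielding and dispatching work,
+    // these should see short response times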
+    LongSummaryStatistics shortStats1 = runShortScans(env, tableName, 50000);
+    LongSummaryStatistics shortStats2 = runShortScans(env, tableName, 100000);
+
+    stop.set(true);
+    long t4 = System.currentTimeMillis();
+
+    LongSummaryStatistics longStats = longScans.stream().mapToLong(l -> l).summaryStatistics();
+
+    longScans.close();
+
+    Report.Builder builder = Report.builder();
+
+    builder.id("yfexec").description(TEST_DESC);
+    builder.info("write", NUM_ROWS * NUM_FAMS * NUM_QUALS, t2 - t1, "Data write rate entries/sec ");
+    builder.info("compact", NUM_ROWS * NUM_FAMS * NUM_QUALS, t3 - t2, "Compact rate entries/sec ");
+    builder.info("short_times1", shortStats1, "Times in ms for each short scan.  First run.");
+    builder.info("short_times2", shortStats2, "Times in ms for each short scan. Second run.");
+    builder.result("short", shortStats2.getAverage(),
+        "Average times in ms for short scans from 2nd run.");
+    builder.info("long_counts", longStats, "Entries read by each of the filter threads");
+    builder.info("long", longStats.getSum(), (t4 - t3),
+        "Combined rate in entries/second of all long scans.  This should be low but non-zero.");
+    builder.parameter("short_threads", NUM_SHORT_SCANS_THREADS, "Threads used to run short scans.");
+    builder.parameter("long_threads", NUM_LONG_SCANS,
+        "Threads running long fileter scans.  Each thread repeatedly scans entire table for "
+        + "duration of test randomly returning a few of the keys.");
+    builder.parameter("rows", NUM_ROWS, "Rows in test table");
+    builder.parameter("familes", NUM_FAMS, "Families per row in test table");
+    builder.parameter("qualifiers", NUM_QUALS, "Qualifiers per family in test table");
+    builder.parameter("server_scan_threads", SCAN_EXECUTOR_THREADS,
+        "Server side scan handler threads that each executor has.  There are 2 executors.");
+
+    builder.parameter("filter_probability", FILTER_PROBABILITY, "The chance that one of the long "
+        + "filter scans will return any key it sees.");
+    builder.parameter("filter_yield_time", FILTER_YIELD_TIME, "The time in ms after which one of "
+        + "the long filter scans will yield.");
+    builder.parameter("quick_scan_time", QUICK_SCAN_TIME, "The threshold time in ms for deciding "
+        + "what is a quick vs long scan.  Times less than this are sent to one executor and longer "
+        + "times are sent to another.");
+
+    return builder.build();
+  }
+
+  private static long scan(String tableName, AccumuloClient c, byte[] row, byte[] fam,
+      Map<String,String> hints) throws TableNotFoundException {
+    long t1 = System.currentTimeMillis();
+    try (Scanner scanner = c.createScanner(tableName, Authorizations.EMPTY)) {
+      // scanner.setExecutionHints(hints);
+      scanner.setRange(Range.exact(new Text(row), new Text(fam)));
+      int count = Iterables.size(scanner);
+      if (count != NUM_QUALS) {
+        throw new RuntimeException("bad count " + count);
+      }
+    }
+
+    return System.currentTimeMillis() - t1;
+  }
+
+  private long scan(String tableName, AccumuloClient c, AtomicBoolean stop, Map<String,String> hints)
+      throws TableNotFoundException {
+    long count = 0;
+    while (!stop.get()) {
+      try (Scanner scanner = c.createScanner(tableName, Authorizations.EMPTY)) {
+
+        IteratorSetting is = new IteratorSetting(30, ProbabilityFilter.class);
+        is.addOption("probability", FILTER_PROBABILITY);
+        is.addOption("yieldTimeMS", FILTER_YIELD_TIME);
+
+        scanner.addScanIterator(is);
+
+        // scanner.setExecutionHints(hints);
+        for (Entry<Key,Value> entry : scanner) {
+          count++;
+          if (stop.get()) {
+            return count;
+          }
+        }
+      }
+    }
+
+    return count;
+  }
+
+  private LongSummaryStatistics runShortScans(Environment env, String tableName, int numScans)
+      throws InterruptedException, ExecutionException {
+
+    Map<String,String> execHints = ImmutableMap.of("executor", "se2");
+    Map<String,String> prioHints = ImmutableMap.of("priority", "1");
+
+    try (TestExecutor<Long> executor = new TestExecutor<>(NUM_SHORT_SCANS_THREADS)) {
+      Random rand = new Random();
+
+      for (int i = 0; i < numScans; i++) {
+        byte[] row = TestData.row(rand.nextInt(NUM_ROWS));
+        byte[] fam = TestData.fam(rand.nextInt(NUM_FAMS));
+        // scans have a 20% chance of getting dedicated thread pool and 80% chance of getting high
+        // priority
+        Map<String,String> hints = rand.nextInt(10) <= 1 ? execHints : prioHints;
+        executor.submit(() -> scan(tableName, env.getClient(), row, fam, hints));
+      }
+
+      return executor.stream().mapToLong(l -> l).summaryStatistics();
+    }
+  }
+
+  private TestExecutor<Long> startLongScans(Environment env, String tableName, AtomicBoolean stop) {
+    Map<String,String> hints = ImmutableMap.of("priority", "2");
+
+    TestExecutor<Long> longScans = new TestExecutor<>(NUM_LONG_SCANS);
+
+    for (int i = 0; i < NUM_LONG_SCANS; i++) {
+      longScans.submit(() -> scan(tableName, env.getClient(), stop, hints));
+    }
+    return longScans;
+  }
+}