/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.accumulo.testing.performance.tests;

import java.util.HashMap;
import java.util.LongSummaryStatistics;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Random;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.accumulo.core.client.AccumuloClient;
import org.apache.accumulo.core.client.IteratorSetting;
import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.client.TableNotFoundException;
import org.apache.accumulo.core.client.admin.NewTableConfiguration;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.accumulo.testing.performance.Environment;
import org.apache.accumulo.testing.performance.PerformanceTest;
import org.apache.accumulo.testing.performance.Report;
import org.apache.accumulo.testing.performance.SystemConfiguration;
import org.apache.accumulo.testing.performance.util.TestData;
import org.apache.accumulo.testing.performance.util.TestExecutor;
import org.apache.hadoop.io.Text;

import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;

public class YieldingScanExecutorPT implements PerformanceTest {
private static final int NUM_SHORT_SCANS_THREADS = 5;
private static final int NUM_LONG_SCANS = 50;
private static final int NUM_ROWS = 100000;
private static final int NUM_FAMS = 10;
private static final int NUM_QUALS = 10;
private static final String SCAN_EXECUTOR_THREADS = "2";
private static final String TEST_DESC = "Scan Executor Test. Tests running lots of short scans "
+ "while many filters that return little data are running in the background. If nothing is "
+ "done, these filters will prevent the short scans from running. This test configures "
+ "Accumulo so that the short scans should get a chance to run. It does three things to "
+ "facilitate the short scans: the filters yield, there are two scan executors, and a scan "
+ "dispatcher sends long running scans to the second executor. If yielding and dispatching "
+ "are working correctly then the short scans should have very short response times, because "
+ "the filters should end up in a separate thread pool from the short scans.";
private static final String FILTER_PROBABILITY = "0.000001";
private static final String FILTER_YIELD_TIME = "1000";
private static final String QUICK_SCAN_TIME = "500";
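
// Configures two server-side scan executors (se1 and se2), each with SCAN_EXECUTOR_THREADS
// threads, so that long filter scans and short scans can end up in separate thread pools.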
@Override
public SystemConfiguration getSystemConfig() {
Map<String,String> siteCfg = new HashMap<>();
siteCfg.put(Property.TSERV_SCAN_MAX_OPENFILES.getKey(), "200");
siteCfg.put(Property.TSERV_MINTHREADS.getKey(), "200");
siteCfg.put(Property.TSERV_SCAN_EXECUTORS_PREFIX.getKey() + "se1.threads",
SCAN_EXECUTOR_THREADS);
siteCfg.put(Property.TSERV_SCAN_EXECUTORS_PREFIX.getKey() + "se2.threads",
SCAN_EXECUTOR_THREADS);
return new SystemConfiguration().setAccumuloConfig(siteCfg);
}
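
// Creates a table whose scan dispatcher routes quick scans to se1 and long scans to se2, writes
// and compacts test data, starts the background filter scans, runs two rounds of short scans,
// and reports timing statistics for both.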
@Override
public Report runTest(Environment env) throws Exception {
String tableName = "scept";
Map<String,String> props = new HashMap<>();
// set up a scan dispatcher that sends long running scans (> 500ms) to the second executor
props.put(Property.TABLE_SCAN_DISPATCHER.getKey(), TimedScanDispatcher.class.getName());
props.put(Property.TABLE_SCAN_DISPATCHER_OPTS.getKey() + "quick.executor", "se1");
props.put(Property.TABLE_SCAN_DISPATCHER_OPTS.getKey() + "quick.time.ms", QUICK_SCAN_TIME);
props.put(Property.TABLE_SCAN_DISPATCHER_OPTS.getKey() + "long.executor", "se2");
env.getClient().tableOperations().create(tableName,
new NewTableConfiguration().setProperties(props));
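
// write the test data and then force a full compaction before timing any scans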
long t1 = System.currentTimeMillis();
TestData.generate(env.getClient(), tableName, NUM_ROWS, NUM_FAMS, NUM_QUALS);
long t2 = System.currentTimeMillis();
env.getClient().tableOperations().compact(tableName, null, null, true, true);
long t3 = System.currentTimeMillis();
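
// start the long filter scans in the background, then time two rounds of short scans while they run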
AtomicBoolean stop = new AtomicBoolean(false);
TestExecutor<Long> longScans = startLongScans(env, tableName, stop);
LongSummaryStatistics shortStats1 = runShortScans(env, tableName, 50000);
LongSummaryStatistics shortStats2 = runShortScans(env, tableName, 100000);
stop.set(true);
long t4 = System.currentTimeMillis();
LongSummaryStatistics longStats = longScans.stream().mapToLong(l -> l).summaryStatistics();
longScans.close();
Report.Builder builder = Report.builder();
builder.id("yfexec").description(TEST_DESC);
builder.info("write", NUM_ROWS * NUM_FAMS * NUM_QUALS, t2 - t1, "Data write rate entries/sec ");
builder.info("compact", NUM_ROWS * NUM_FAMS * NUM_QUALS, t3 - t2, "Compact rate entries/sec ");
builder.info("short_times1", shortStats1, "Times in ms for each short scan. First run.");
builder.info("short_times2", shortStats2, "Times in ms for each short scan. Second run.");
builder.result("short", shortStats2.getAverage(),
"Average times in ms for short scans from 2nd run.");
builder.info("long_counts", longStats, "Entries read by each of the filter threads");
builder.info("long", longStats.getSum(), (t4 - t3),
"Combined rate in entries/second of all long scans. This should be low but non-zero.");
builder.parameter("short_threads", NUM_SHORT_SCANS_THREADS, "Threads used to run short scans.");
builder.parameter("long_threads", NUM_LONG_SCANS,
"Threads running long fileter scans. Each thread repeatedly scans entire table for "
+ "duration of test randomly returning a few of the keys.");
builder.parameter("rows", NUM_ROWS, "Rows in test table");
builder.parameter("familes", NUM_FAMS, "Families per row in test table");
builder.parameter("qualifiers", NUM_QUALS, "Qualifiers per family in test table");
builder.parameter("server_scan_threads", SCAN_EXECUTOR_THREADS,
"Server side scan handler threads that each executor has. There are 2 executors.");
builder.parameter("filter_probability", FILTER_PROBABILITY, "The chance that one of the long "
+ "filter scans will return any key it sees.");
builder.parameter("filter_yield_time", FILTER_YIELD_TIME, "The time in ms after which one of "
+ "the long filter scans will yield.");
builder.parameter("quick_scan_time", QUICK_SCAN_TIME, "The threshold time in ms for deciding "
+ "what is a quick vs long scan. Times less than this are sent to one executor and longer "
+ "times are sent to another.");
return builder.build();
}
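
// Scans a single row/family, verifies the expected number of qualifiers, and returns the
// elapsed time in milliseconds.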
private static long scan(String tableName, AccumuloClient c, byte[] row, byte[] fam,
Map<String,String> hints) throws TableNotFoundException {
long t1 = System.currentTimeMillis();
int count = 0;
try (Scanner scanner = c.createScanner(tableName, Authorizations.EMPTY)) {
// scanner.setExecutionHints(hints);
scanner.setRange(Range.exact(new Text(row), new Text(fam)));
count = Iterables.size(scanner);
if (count != NUM_QUALS) {
throw new RuntimeException("bad count " + count);
}
}
return System.currentTimeMillis() - t1;
}
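
// Repeatedly scans the entire table through the yielding ProbabilityFilter until stop is set,
// returning the number of entries that made it back to the client.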
private long scan(String tableName, AccumuloClient c, AtomicBoolean stop, Map<String,String> hints)
throws TableNotFoundException {
long count = 0;
while (!stop.get()) {
try (Scanner scanner = c.createScanner(tableName, Authorizations.EMPTY)) {
IteratorSetting is = new IteratorSetting(30, ProbabilityFilter.class);
is.addOption("probability", FILTER_PROBABILITY);
is.addOption("yieldTimeMS", FILTER_YIELD_TIME);
scanner.addScanIterator(is);
// scanner.setExecutionHints(hints);
for (Entry<Key,Value> entry : scanner) {
count++;
if (stop.get()) {
return count;
}
}
}
}
return count;
}
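
// Submits numScans random single-family lookups to a small thread pool and returns summary
// statistics of their individual scan times in milliseconds.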
private LongSummaryStatistics runShortScans(Environment env, String tableName, int numScans)
throws InterruptedException, ExecutionException {
Map<String,String> execHints = ImmutableMap.of("executor", "se2");
Map<String,String> prioHints = ImmutableMap.of("priority", "1");
try (TestExecutor<Long> executor = new TestExecutor<>(NUM_SHORT_SCANS_THREADS)) {
Random rand = new Random();
for (int i = 0; i < numScans; i++) {
byte[] row = TestData.row(rand.nextInt(NUM_ROWS));
byte[] fam = TestData.fam(rand.nextInt(NUM_FAMS));
// scans have a 20% chance of getting dedicated thread pool and 80% chance of getting high
// priority
Map<String,String> hints = rand.nextInt(10) <= 1 ? execHints : prioHints;
executor.submit(() -> scan(tableName, env.getClient(), row, fam, hints));
}
return executor.stream().mapToLong(l -> l).summaryStatistics();
}
}
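
// Starts NUM_LONG_SCANS background threads that each run the long filter scan until the stop
// flag is set.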
private TestExecutor<Long> startLongScans(Environment env, String tableName, AtomicBoolean stop) {
Map<String,String> hints = ImmutableMap.of("priority", "2");
TestExecutor<Long> longScans = new TestExecutor<>(NUM_LONG_SCANS);
for (int i = 0; i < NUM_LONG_SCANS; i++) {
longScans.submit(() -> scan(tableName, env.getClient(), stop, hints));
}
return longScans;
}
}