| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.accumulo.testing.performance.tests; |
| |
| import java.util.HashMap; |
| import java.util.LongSummaryStatistics; |
| import java.util.Map; |
| import java.util.Map.Entry; |
| import java.util.Random; |
| import java.util.concurrent.ExecutionException; |
| import java.util.concurrent.atomic.AtomicBoolean; |
| |
| import org.apache.accumulo.core.client.AccumuloClient; |
| import org.apache.accumulo.core.client.IteratorSetting; |
| import org.apache.accumulo.core.client.Scanner; |
| import org.apache.accumulo.core.client.TableNotFoundException; |
| import org.apache.accumulo.core.client.admin.NewTableConfiguration; |
| import org.apache.accumulo.core.conf.Property; |
| import org.apache.accumulo.core.data.Key; |
| import org.apache.accumulo.core.data.Range; |
| import org.apache.accumulo.core.data.Value; |
| import org.apache.accumulo.core.security.Authorizations; |
| import org.apache.accumulo.testing.performance.Environment; |
| import org.apache.accumulo.testing.performance.PerformanceTest; |
| import org.apache.accumulo.testing.performance.Report; |
| import org.apache.accumulo.testing.performance.SystemConfiguration; |
| import org.apache.accumulo.testing.performance.util.TestData; |
| import org.apache.accumulo.testing.performance.util.TestExecutor; |
| import org.apache.hadoop.io.Text; |
| |
| import com.google.common.collect.ImmutableMap; |
| import com.google.common.collect.Iterables; |
| |
| public class YieldingScanExecutorPT implements PerformanceTest { |
| |
| private static final int NUM_SHORT_SCANS_THREADS = 5; |
| private static final int NUM_LONG_SCANS = 50; |
| |
| private static final int NUM_ROWS = 100000; |
| private static final int NUM_FAMS = 10; |
| private static final int NUM_QUALS = 10; |
| |
| private static final String SCAN_EXECUTOR_THREADS = "2"; |
| |
| private static final String TEST_DESC = "Scan Executor Test. Test running lots of short scans " |
| + "while many filters that return little data are running in the background. If nothing is " |
| + "done these filters will prevent the short scans from running. This test configures " |
| + "Accumulo so that the short scans should get a chance to run. It does three things to " |
| + "facilitate the short scans : the filters yield, there a two scan executors, and a scan " |
| + "dispatcher sends long running scans to the second executor. If yielding and dispatching " |
| + "are working correctly then the short scans should have very short response times. This " |
| + "happens because the filters should end up in a separate thread pool than the short scan."; |
| |
| private static final String FILTER_PROBABILITY = "0.000001"; |
| private static final String FILTER_YIELD_TIME = "1000"; |
| |
| private static final String QUICK_SCAN_TIME = "500"; |
| |
| @Override |
| public SystemConfiguration getSystemConfig() { |
| Map<String,String> siteCfg = new HashMap<>(); |
| |
| siteCfg.put(Property.TSERV_SCAN_MAX_OPENFILES.getKey(), "200"); |
| siteCfg.put(Property.TSERV_MINTHREADS.getKey(), "200"); |
| siteCfg.put(Property.TSERV_SCAN_EXECUTORS_PREFIX.getKey() + "se1.threads", |
| SCAN_EXECUTOR_THREADS); |
| siteCfg.put(Property.TSERV_SCAN_EXECUTORS_PREFIX.getKey() + "se2.threads", |
| SCAN_EXECUTOR_THREADS); |
| return new SystemConfiguration().setAccumuloConfig(siteCfg); |
| } |
| |
| @Override |
| public Report runTest(Environment env) throws Exception { |
| |
| String tableName = "scept"; |
| |
| Map<String,String> props = new HashMap<>(); |
| // set up a scan dispatcher that send long runnning scans (> 500ms) to the second executor |
| props.put(Property.TABLE_SCAN_DISPATCHER.getKey(), TimedScanDispatcher.class.getName()); |
| props.put(Property.TABLE_SCAN_DISPATCHER_OPTS.getKey() + "quick.executor", "se1"); |
| props.put(Property.TABLE_SCAN_DISPATCHER_OPTS.getKey() + "quick.time.ms", QUICK_SCAN_TIME); |
| props.put(Property.TABLE_SCAN_DISPATCHER_OPTS.getKey() + "long.executor", "se2"); |
| |
| env.getClient().tableOperations().create(tableName, |
| new NewTableConfiguration().setProperties(props)); |
| |
| long t1 = System.currentTimeMillis(); |
| TestData.generate(env.getClient(), tableName, NUM_ROWS, NUM_FAMS, NUM_QUALS); |
| long t2 = System.currentTimeMillis(); |
| env.getClient().tableOperations().compact(tableName, null, null, true, true); |
| long t3 = System.currentTimeMillis(); |
| |
| AtomicBoolean stop = new AtomicBoolean(false); |
| |
| TestExecutor<Long> longScans = startLongScans(env, tableName, stop); |
| |
| LongSummaryStatistics shortStats1 = runShortScans(env, tableName, 50000); |
| LongSummaryStatistics shortStats2 = runShortScans(env, tableName, 100000); |
| |
| stop.set(true); |
| long t4 = System.currentTimeMillis(); |
| |
| LongSummaryStatistics longStats = longScans.stream().mapToLong(l -> l).summaryStatistics(); |
| |
| longScans.close(); |
| |
| Report.Builder builder = Report.builder(); |
| |
| builder.id("yfexec").description(TEST_DESC); |
| builder.info("write", NUM_ROWS * NUM_FAMS * NUM_QUALS, t2 - t1, "Data write rate entries/sec "); |
| builder.info("compact", NUM_ROWS * NUM_FAMS * NUM_QUALS, t3 - t2, "Compact rate entries/sec "); |
| builder.info("short_times1", shortStats1, "Times in ms for each short scan. First run."); |
| builder.info("short_times2", shortStats2, "Times in ms for each short scan. Second run."); |
| builder.result("short", shortStats2.getAverage(), |
| "Average times in ms for short scans from 2nd run."); |
| builder.info("long_counts", longStats, "Entries read by each of the filter threads"); |
| builder.info("long", longStats.getSum(), (t4 - t3), |
| "Combined rate in entries/second of all long scans. This should be low but non-zero."); |
| builder.parameter("short_threads", NUM_SHORT_SCANS_THREADS, "Threads used to run short scans."); |
| builder.parameter("long_threads", NUM_LONG_SCANS, |
| "Threads running long fileter scans. Each thread repeatedly scans entire table for " |
| + "duration of test randomly returning a few of the keys."); |
| builder.parameter("rows", NUM_ROWS, "Rows in test table"); |
| builder.parameter("familes", NUM_FAMS, "Families per row in test table"); |
| builder.parameter("qualifiers", NUM_QUALS, "Qualifiers per family in test table"); |
| builder.parameter("server_scan_threads", SCAN_EXECUTOR_THREADS, |
| "Server side scan handler threads that each executor has. There are 2 executors."); |
| |
| builder.parameter("filter_probability", FILTER_PROBABILITY, "The chance that one of the long " |
| + "filter scans will return any key it sees."); |
| builder.parameter("filter_yield_time", FILTER_YIELD_TIME, "The time in ms after which one of " |
| + "the long filter scans will yield."); |
| builder.parameter("quick_scan_time", QUICK_SCAN_TIME, "The threshold time in ms for deciding " |
| + "what is a quick vs long scan. Times less than this are sent to one executor and longer " |
| + "times are sent to another."); |
| |
| return builder.build(); |
| } |
| |
| private static long scan(String tableName, AccumuloClient c, byte[] row, byte[] fam, |
| Map<String,String> hints) throws TableNotFoundException { |
| long t1 = System.currentTimeMillis(); |
| int count = 0; |
| try (Scanner scanner = c.createScanner(tableName, Authorizations.EMPTY)) { |
| // scanner.setExecutionHints(hints); |
| scanner.setRange(Range.exact(new Text(row), new Text(fam))); |
| if (Iterables.size(scanner) != NUM_QUALS) { |
| throw new RuntimeException("bad count " + count); |
| } |
| } |
| |
| return System.currentTimeMillis() - t1; |
| } |
| |
| private long scan(String tableName, AccumuloClient c, AtomicBoolean stop, Map<String,String> hints) |
| throws TableNotFoundException { |
| long count = 0; |
| while (!stop.get()) { |
| try (Scanner scanner = c.createScanner(tableName, Authorizations.EMPTY)) { |
| |
| IteratorSetting is = new IteratorSetting(30, ProbabilityFilter.class); |
| is.addOption("probability", FILTER_PROBABILITY); |
| is.addOption("yieldTimeMS", FILTER_YIELD_TIME); |
| |
| scanner.addScanIterator(is); |
| |
| // scanner.setExecutionHints(hints); |
| for (Entry<Key,Value> entry : scanner) { |
| count++; |
| if (stop.get()) { |
| return count; |
| } |
| } |
| } |
| } |
| |
| return count; |
| } |
| |
| private LongSummaryStatistics runShortScans(Environment env, String tableName, int numScans) |
| throws InterruptedException, ExecutionException { |
| |
| Map<String,String> execHints = ImmutableMap.of("executor", "se2"); |
| Map<String,String> prioHints = ImmutableMap.of("priority", "1"); |
| |
| try (TestExecutor<Long> executor = new TestExecutor<>(NUM_SHORT_SCANS_THREADS)) { |
| Random rand = new Random(); |
| |
| for (int i = 0; i < numScans; i++) { |
| byte[] row = TestData.row(rand.nextInt(NUM_ROWS)); |
| byte[] fam = TestData.fam(rand.nextInt(NUM_FAMS)); |
| // scans have a 20% chance of getting dedicated thread pool and 80% chance of getting high |
| // priority |
| Map<String,String> hints = rand.nextInt(10) <= 1 ? execHints : prioHints; |
| executor.submit(() -> scan(tableName, env.getClient(), row, fam, hints)); |
| } |
| |
| return executor.stream().mapToLong(l -> l).summaryStatistics(); |
| } |
| } |
| |
| private TestExecutor<Long> startLongScans(Environment env, String tableName, AtomicBoolean stop) { |
| Map<String,String> hints = ImmutableMap.of("priority", "2"); |
| |
| TestExecutor<Long> longScans = new TestExecutor<>(NUM_LONG_SCANS); |
| |
| for (int i = 0; i < NUM_LONG_SCANS; i++) { |
| longScans.submit(() -> scan(tableName, env.getClient(), stop, hints)); |
| } |
| return longScans; |
| } |
| } |