| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.accumulo.testing.core.performance.tests; |
| |
| import static java.util.stream.Collectors.toList; |
| |
| import java.util.ArrayList; |
| import java.util.HashMap; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Random; |
| import java.util.SortedSet; |
| import java.util.TreeSet; |
| import java.util.concurrent.ExecutorService; |
| import java.util.concurrent.Executors; |
| import java.util.concurrent.Future; |
| import java.util.stream.Stream; |
| |
| import org.apache.accumulo.core.client.AccumuloClient; |
| import org.apache.accumulo.core.client.BatchWriter; |
| import org.apache.accumulo.core.client.BatchWriterConfig; |
| import org.apache.accumulo.core.client.Scanner; |
| import org.apache.accumulo.core.client.TableExistsException; |
| import org.apache.accumulo.core.client.TableNotFoundException; |
| import org.apache.accumulo.core.client.admin.CompactionConfig; |
| import org.apache.accumulo.core.client.admin.NewTableConfiguration; |
| import org.apache.accumulo.core.conf.Property; |
| import org.apache.accumulo.core.data.Mutation; |
| import org.apache.accumulo.core.data.Range; |
| import org.apache.accumulo.core.security.Authorizations; |
| import org.apache.accumulo.testing.core.performance.Environment; |
| import org.apache.accumulo.testing.core.performance.PerformanceTest; |
| import org.apache.accumulo.testing.core.performance.Report; |
| import org.apache.accumulo.testing.core.performance.Report.Builder; |
| import org.apache.accumulo.testing.core.performance.SystemConfiguration; |
| import org.apache.hadoop.io.Text; |
| |
| import com.google.common.base.Strings; |
| import com.google.common.collect.Iterables; |
| |
| public class RandomCachedLookupsPT implements PerformanceTest { |
| |
| private static final int NUM_LOOKUPS_PER_THREAD = 25000; |
| private static final int NUM_ROWS = 100000; |
| |
| @Override |
| public SystemConfiguration getSystemConfig() { |
| Map<String,String> siteCfg = new HashMap<>(); |
| |
| siteCfg.put(Property.TSERV_SCAN_MAX_OPENFILES.getKey(), "1000"); |
| siteCfg.put(Property.TSERV_MINTHREADS.getKey(), "256"); |
| siteCfg.put(Property.TSERV_SCAN_EXECUTORS_DEFAULT_THREADS.getKey(), "32"); |
| siteCfg.put(Property.TABLE_DURABILITY.getKey(), "flush"); |
| siteCfg.put(Property.TSERV_DATACACHE_SIZE.getKey(), "2G"); |
| siteCfg.put(Property.TSERV_INDEXCACHE_SIZE.getKey(), "1G"); |
| |
| // TODO it would be good if this could request a minimum amount of tserver memory |
| |
| return new SystemConfiguration().setAccumuloConfig(siteCfg); |
| } |
| |
| @Override |
| public Report runTest(Environment env) throws Exception { |
| Builder reportBuilder = Report.builder(); |
| |
| writeData(reportBuilder, env.getClient(), NUM_ROWS); |
| |
| long warmup = doLookups(env.getClient(), 32, NUM_LOOKUPS_PER_THREAD); |
| |
| long d1 = doLookups(env.getClient(), 1, NUM_LOOKUPS_PER_THREAD); |
| long d4 = doLookups(env.getClient(), 4, NUM_LOOKUPS_PER_THREAD); |
| long d8 = doLookups(env.getClient(), 8, NUM_LOOKUPS_PER_THREAD); |
| long d16 = doLookups(env.getClient(), 16, NUM_LOOKUPS_PER_THREAD); |
| long d32 = doLookups(env.getClient(), 32, NUM_LOOKUPS_PER_THREAD); |
| long d64 = doLookups(env.getClient(), 64, NUM_LOOKUPS_PER_THREAD); |
| long d128 = doLookups(env.getClient(), 128, NUM_LOOKUPS_PER_THREAD); |
| |
| reportBuilder.id("smalls"); |
| reportBuilder.description("Runs multiple threads each doing lots of small random scans. For this test data and index cache are enabled."); |
| reportBuilder.info("warmup", 32 * NUM_LOOKUPS_PER_THREAD, warmup, "Random lookup per sec for 32 threads"); |
| reportBuilder.info("lookups_1", NUM_LOOKUPS_PER_THREAD, d1, "Random lookup per sec rate for 1 thread"); |
| reportBuilder.info("lookups_4", 4 * NUM_LOOKUPS_PER_THREAD, d4, "Random lookup per sec rate for 4 threads"); |
| reportBuilder.info("lookups_8", 8 * NUM_LOOKUPS_PER_THREAD, d8, "Random lookup per sec rate for 8 threads"); |
| reportBuilder.info("lookups_16", 16 * NUM_LOOKUPS_PER_THREAD, d16, "Random lookup per sec rate for 16 threads"); |
| reportBuilder.info("lookups_32", 32 * NUM_LOOKUPS_PER_THREAD, d32, "Random lookup per sec rate for 32 threads"); |
| reportBuilder.info("lookups_64", 64 * NUM_LOOKUPS_PER_THREAD, d64, "Random lookup per sec rate for 64 threads"); |
| reportBuilder.info("lookups_128", 128 * NUM_LOOKUPS_PER_THREAD, d128, "Random lookup per sec rate for 128 threads"); |
| |
| reportBuilder.result("avg_1", d1 / (double) NUM_LOOKUPS_PER_THREAD, "Average milliseconds per lookup for 1 thread"); |
| reportBuilder.result("avg_4", d4 / (double) NUM_LOOKUPS_PER_THREAD, "Average milliseconds per lookup for 4 threads"); |
| reportBuilder.result("avg_8", d8 / (double) NUM_LOOKUPS_PER_THREAD, "Average milliseconds per lookup for 8 threads"); |
| reportBuilder.result("avg_16", d16 / (double) NUM_LOOKUPS_PER_THREAD, "Average milliseconds per lookup for 16 threads"); |
| reportBuilder.result("avg_32", d32 / (double) NUM_LOOKUPS_PER_THREAD, "Average milliseconds per lookup for 32 threads"); |
| reportBuilder.result("avg_64", d64 / (double) NUM_LOOKUPS_PER_THREAD, "Average milliseconds per lookup for 64 threads"); |
| reportBuilder.result("avg_128", d128 / (double) NUM_LOOKUPS_PER_THREAD, "Average milliseconds per lookup for 128 threads"); |
| |
| return reportBuilder.build(); |
| } |
| |
| public static void writeData(Builder reportBuilder, AccumuloClient client, int numRows) throws Exception { |
| |
| reportBuilder.parameter("rows", numRows, "Number of random rows written. Each row has 4 columns."); |
| |
| NewTableConfiguration ntc = new NewTableConfiguration(); |
| Map<String,String> props = new HashMap<>(); |
| props.put("table.file.compress.blocksize.index", "256K"); |
| props.put("table.file.compress.blocksize", "8K"); |
| props.put("table.cache.index.enable", "true"); |
| props.put("table.cache.block.enable", "true"); |
| ntc.setProperties(props); |
| |
| long t1 = System.currentTimeMillis(); |
| try { |
| client.tableOperations().create("scanpt", ntc); |
| } catch (TableExistsException tee) { |
| client.tableOperations().delete("scanpt"); |
| client.tableOperations().create("scanpt", ntc); |
| } |
| |
| long t2 = System.currentTimeMillis(); |
| |
| SortedSet<Text> partitionKeys = new TreeSet<>( |
| Stream.of("1", "2", "3", "4", "5", "6", "7", "8", "9", "a", "b", "c", "d", "e", "f").map(Text::new).collect(toList())); |
| client.tableOperations().addSplits("scanpt", partitionKeys); |
| |
| long t3 = System.currentTimeMillis(); |
| |
| BatchWriter bw = client.createBatchWriter("scanpt", new BatchWriterConfig()); |
| |
| Random rand = new Random(); |
| |
| for (int i = 0; i < numRows; i++) { |
| Mutation m = new Mutation(toHex(rand.nextLong())); |
| int c1 = rand.nextInt(1 << 10); |
| int c2 = rand.nextInt(1 << 10); |
| while (c1 == c2) |
| c2 = rand.nextInt(1 << 10); |
| int c3 = rand.nextInt(1 << 10); |
| while (c1 == c3 || c2 == c3) |
| c3 = rand.nextInt(1 << 10); |
| int c4 = rand.nextInt(1 << 10); |
| while (c1 == c4 || c2 == c4 || c3 == c4) |
| c4 = rand.nextInt(1 << 10); |
| m.put("fam1", toHex(c1, 3), toHex(rand.nextLong())); |
| m.put("fam1", toHex(c2, 3), toHex(rand.nextLong())); |
| m.put("fam1", toHex(c3, 3), toHex(rand.nextLong())); |
| m.put("fam1", toHex(c4, 3), toHex(rand.nextLong())); |
| bw.addMutation(m); |
| } |
| |
| bw.close(); |
| |
| long t4 = System.currentTimeMillis(); |
| |
| client.tableOperations().compact("scanpt", new CompactionConfig().setFlush(true).setWait(true)); |
| |
| long t5 = System.currentTimeMillis(); |
| |
| try (Scanner scanner = client.createScanner("scanpt", Authorizations.EMPTY)) { |
| // scan entire table to bring it into cache |
| Iterables.size(scanner); |
| } |
| |
| long t6 = System.currentTimeMillis(); |
| |
| reportBuilder.info("create", t2 - t1, "Time to create table in ms"); |
| reportBuilder.info("split", t3 - t2, "Time to split table in ms"); |
| reportBuilder.info("write", 4 * numRows, t4 - t3, "Rate to write data in entries/sec"); |
| reportBuilder.info("compact", 4 * numRows, t5 - t4, "Rate to compact table in entries/sec"); |
| reportBuilder.info("fullScan", 4 * numRows, t6 - t5, "Rate to do full table scan in entries/sec"); |
| } |
| |
| private static long doLookups(AccumuloClient client, int numThreads, int numScansPerThread) throws Exception { |
| |
| ExecutorService es = Executors.newFixedThreadPool(numThreads); |
| |
| List<Future<?>> futures = new ArrayList<>(numThreads); |
| |
| long t1 = System.currentTimeMillis(); |
| |
| for (int i = 0; i < numThreads; i++) { |
| futures.add(es.submit(() -> doLookups(client, numScansPerThread))); |
| } |
| |
| for (Future<?> future : futures) { |
| future.get(); |
| } |
| |
| long t2 = System.currentTimeMillis(); |
| |
| es.shutdown(); |
| |
| return t2 -t1; |
| } |
| |
| private static void doLookups(AccumuloClient client, int numScans) { |
| try { |
| Random rand = new Random(); |
| |
| for (int i = 0; i < numScans; i++) { |
| Scanner scanner = client.createScanner("scanpt", Authorizations.EMPTY); |
| |
| scanner.setRange(new Range(toHex(rand.nextLong()))); |
| |
| Iterables.size(scanner); |
| |
| scanner.close(); |
| } |
| } catch (TableNotFoundException e) { |
| e.printStackTrace(); |
| } |
| } |
| |
| public static String toHex(long l) { |
| String s = Long.toHexString(l); |
| return Strings.padStart(s, 16, '0'); |
| } |
| |
| public static String toHex(int i) { |
| String s = Integer.toHexString(i); |
| return Strings.padStart(s, 8, '0'); |
| } |
| |
| public static String toHex(int i, int len) { |
| String s = Integer.toHexString(i); |
| return Strings.padStart(s, len, '0'); |
| } |
| } |