Fixes #59 Create herding performance test (#93)
* Add HeardingPT.java in order to test herding performance.
* Use Result.result() instead of Result.parameter() when reporting
resulting values so that they will be put in the appropriate section
when the JSON files are written.
Related: #53
diff --git a/src/main/java/org/apache/accumulo/testing/performance/tests/HerdingPT.java b/src/main/java/org/apache/accumulo/testing/performance/tests/HerdingPT.java
new file mode 100644
index 0000000..9f644fd
--- /dev/null
+++ b/src/main/java/org/apache/accumulo/testing/performance/tests/HerdingPT.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.accumulo.testing.performance.tests;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+import org.apache.accumulo.core.client.AccumuloClient;
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.MutationsRejectedException;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.TableExistsException;
+import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.testing.performance.Environment;
+import org.apache.accumulo.testing.performance.PerformanceTest;
+import org.apache.accumulo.testing.performance.Report;
+import org.apache.accumulo.testing.performance.SystemConfiguration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class HerdingPT implements PerformanceTest {
+
+ private static final Logger LOG = LoggerFactory.getLogger(HerdingPT.class);
+
+ private static final byte[] COL_FAM = "pinky".getBytes();
+
+ private static final int NUM_ROWS = 1_000_000;
+ private static final int NUM_COLS = 10;
+ private static final int NUM_THREADS = 32;
+ private static final int NUM_LOAD_ATTEMPTS = 1_000;
+
+ private static final String TABLE_NAME = "herd";
+ private static final String DESCRIPTION = "Test herding performance with " + NUM_THREADS
+ + " threads attempting to " + "simultaneously load the same block " + NUM_LOAD_ATTEMPTS
+ + " times.";
+
+ private final Random random = new Random();
+
+ @Override
+ public SystemConfiguration getSystemConfig() {
+ Map<String,String> config = new HashMap<>();
+
+ config.put(Property.TABLE_BLOCKCACHE_ENABLED.getKey(), "true");
+
+ return new SystemConfiguration().setAccumuloConfig(config);
+ }
+
+ @Override
+ public Report runTest(final Environment env) throws Exception {
+ Report.Builder reportBuilder = Report.builder().id("herdingPT").description(DESCRIPTION)
+ .parameter("table_name", TABLE_NAME, "The name of the table.")
+ .parameter("num_threads", NUM_THREADS, "The number of threads.")
+ .parameter("num_load_attempts", NUM_LOAD_ATTEMPTS,
+ "The number of times the threads will attempt to load the same block.")
+ .parameter("num_rows", NUM_ROWS, "The number of rows that will be loaded into the table.");
+
+ final AccumuloClient client = env.getClient();
+ initTable(client);
+ long herdTime = getHerdingDuration(client);
+ reportBuilder.result("herd_time", herdTime, "The time (in ms) it took herding to complete.");
+
+ return reportBuilder.build();
+ }
+
+ private void initTable(final AccumuloClient client) throws TableExistsException,
+ AccumuloSecurityException, AccumuloException, TableNotFoundException {
+ client.tableOperations().create(TABLE_NAME);
+ writeEntries(client);
+ client.tableOperations().flush(TABLE_NAME, null, null, true);
+ }
+
+ private void writeEntries(final AccumuloClient client)
+ throws TableNotFoundException, MutationsRejectedException {
+ try (BatchWriter bw = client.createBatchWriter(TABLE_NAME)) {
+ for (int row = 0; row < NUM_ROWS; row++) {
+ bw.addMutation(createMutation(row));
+ }
+ }
+ }
+
+ private Mutation createMutation(final int rowNum) {
+ byte[] row = toZeroPaddedString(rowNum, 8);
+ Mutation mutation = new Mutation(row);
+ for (int col = 0; col < NUM_COLS; col++) {
+ byte[] qualifer = toZeroPaddedString(col, 4);
+ byte[] value = new byte[32];
+ random.nextBytes(value);
+ mutation.put(COL_FAM, qualifer, value);
+ }
+ return mutation;
+ }
+
+ private long getHerdingDuration(final AccumuloClient client)
+ throws ExecutionException, InterruptedException {
+ final ExecutorService pool = Executors.newFixedThreadPool(NUM_THREADS);
+ long start = System.currentTimeMillis();
+ final List<Future<?>> futures = addThreadsToPool(pool, client);
+ for (Future<?> future : futures) {
+ future.get();
+ }
+ long duration = System.currentTimeMillis() - start;
+ pool.shutdown();
+ return duration;
+ }
+
+ private List<Future<?>> addThreadsToPool(final ExecutorService pool,
+ final AccumuloClient client) {
+ final CyclicBarrier barrier = new CyclicBarrier(NUM_THREADS);
+ List<Future<?>> futures = new ArrayList<>();
+ for (int i = 0; i < NUM_THREADS; i++) {
+ futures.add(pool.submit(createThread(client, barrier)));
+ }
+ return futures;
+ }
+
+ private Runnable createThread(final AccumuloClient client, final CyclicBarrier barrier) {
+ return () -> {
+ try {
+ Scanner scanner = client.createScanner(TABLE_NAME, Authorizations.EMPTY);
+ for (int numAttempt = 0; numAttempt < NUM_LOAD_ATTEMPTS; numAttempt++) {
+ barrier.await();
+ byte[] row = toZeroPaddedString(numAttempt * NUM_LOAD_ATTEMPTS, 8);
+ scanner.setRange(Range.exact(new String(row)));
+ // Scan each entry in the range, but don't do anything with them.
+ scanner.forEach(e -> {});
+ }
+ } catch (Exception e) {
+ LOG.error("Error occurred during scan", e);
+ }
+ };
+ }
+
+ private byte[] toZeroPaddedString(final long num, final int width) {
+ return new byte[Math.max(Long.toString(num, 16).length(), width)];
+ }
+}