// blob: 4db109b51f7bb76e6f425034a9105a1746bf124c [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.accumulo.testing.performance.tests;
import java.text.DecimalFormat;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.TimeUnit;
import org.apache.accumulo.core.client.BatchScanner;
import org.apache.accumulo.core.client.BatchWriter;
import org.apache.accumulo.core.client.BatchWriterConfig;
import org.apache.accumulo.core.client.ConditionalWriter;
import org.apache.accumulo.core.client.ConditionalWriterConfig;
import org.apache.accumulo.core.client.MutationsRejectedException;
import org.apache.accumulo.core.client.TableNotFoundException;
import org.apache.accumulo.core.client.admin.NewTableConfiguration;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.data.Condition;
import org.apache.accumulo.core.data.ConditionalMutation;
import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.accumulo.testing.performance.Environment;
import org.apache.accumulo.testing.performance.PerformanceTest;
import org.apache.accumulo.testing.performance.Report;
import org.apache.accumulo.testing.performance.SystemConfiguration;
import org.apache.hadoop.io.Text;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.hash.Hashing;
/**
 * Performance test that times batches of conditional mutations, and one batch write/scan
 * scenario, against a scratch table and reports the average rate of each scenario.
 *
 * <p>
 * Four scenarios run in order, each on a freshly created table: sequential rows, a single row
 * with shuffled column qualifiers, a plain batch write followed by a batch scan of non-existent
 * columns, and random conditions against a table created with an 8K compressed block size.
 */
public class ConditionalMutationsPT implements PerformanceTest {

  /** Number of conditions (or scan ranges) issued by each timed pass of the sequence tests. */
  private static final int CONDITIONS_PER_PASS = 10000;

  /** Number of conditional mutations written by each timed pass of the block size test. */
  private static final int BLOCK_TEST_MUTATIONS = 3000;

  /** Number of conditions attached to every mutation of the block size test. */
  private static final int BLOCK_TEST_CONDITIONS = 10;

  /**
   * Builds the system configuration for this test.
   *
   * @return a configuration whose Accumulo settings set the table block cache property to "true"
   */
  @Override
  public SystemConfiguration getSystemConfig() {
    Map<String,String> siteCfg = new HashMap<>();
    siteCfg.put(Property.TABLE_BLOCKCACHE_ENABLED.getKey(), "true");
    return new SystemConfiguration().setAccumuloConfig(siteCfg);
  }

  /**
   * Runs the four scenarios against the table "foo" and collects their results in one report.
   *
   * @param env
   *          environment providing the Accumulo client
   * @return the report containing every scenario's results and parameters
   * @throws Exception
   *           if any table operation, write or scan fails
   */
  @Override
  public Report runTest(Environment env) throws Exception {
    String tableName = "foo";

    Report.Builder reportBuilder = Report.builder();
    reportBuilder.id("Conditional Mutations");
    // The trailing space after the comma keeps the two concatenated halves from running together.
    reportBuilder.description("Runs conditional mutations tests with and without randomization, "
        + "setting of block size, and with and without randomization of batch writing/reading.");

    runConditionalMutationsTest(env, tableName, reportBuilder);
    runRandomizeConditionalMutationsTest(env, tableName, reportBuilder);
    runRandomizeBatchScanAndWriteTest(env, tableName, reportBuilder);
    runSetBlockSizeTest(env, tableName, reportBuilder);

    return reportBuilder.build();
  }

  /**
   * Times conditional mutations spread over many rows, before and after a table flush.
   */
  private static void runConditionalMutationsTest(Environment env, String tableName,
      Report.Builder reportBuilder) throws Exception {
    deleteTableIfExists(env, tableName);
    env.getClient().tableOperations().create(tableName);

    try (ConditionalWriter cw = env.getClient().createConditionalWriter(tableName,
        new ConditionalWriterConfig())) {
      // Untimed pass with seq 0: writes the first value that the seq 1 conditions check for.
      conditionalMutationsTime(cw, 0);

      double rateSum = 0.0;
      int passes = 0;
      for (long seq = 1; seq < 20; seq++) {
        rateSum += conditionalMutationsTime(cw, seq);
        passes++;
      }
      // Divide by the number of passes actually run (19), not a hard-coded 20.
      reportBuilder.result("avgRate: 1-19", roundToHundredths(rateSum / passes),
          "ConditionalMutationsTest: average rate (conditions/sec) to run sequence 1-19");

      env.getClient().tableOperations().flush(tableName, null, null, true);

      rateSum = 0.0;
      passes = 0;
      for (long seq = 20; seq < 40; seq++) {
        rateSum += conditionalMutationsTime(cw, seq);
        passes++;
      }
      reportBuilder.result("avgRate: 20-39", roundToHundredths(rateSum / passes),
          "ConditionalMutationsTest: average rate (conditions/sec) to run sequence 20-39");
    }
  }

  /**
   * Writes one conditional mutation to each of 10000 rows and returns the achieved rate.
   *
   * <p>
   * Every mutation requires column meta:seq to be absent when {@code seq} is 0, or to equal
   * {@code seq} otherwise, and then sets it to {@code seq + 1}. Calls therefore have to be made
   * with consecutive sequence numbers starting at 0.
   *
   * @param cw
   *          writer used to submit the mutations
   * @param seq
   *          sequence number the rows are expected to hold, 0 for rows that do not exist yet
   * @return conditions processed per second
   * @throws Exception
   *           if the write fails, a mutation is not accepted, or results are missing
   */
  public static double conditionalMutationsTime(ConditionalWriter cw, long seq) throws Exception {
    // For seq 0 this is "1", which matches the value the original special case produced.
    String nextValue = Long.toString(seq + 1);

    ArrayList<ConditionalMutation> cmuts = new ArrayList<>(CONDITIONS_PER_PASS);
    for (int i = 0; i < CONDITIONS_PER_PASS; i++) {
      Condition cond = new Condition("meta", "seq");
      if (seq != 0) {
        cond.setValue(Long.toString(seq));
      }
      ConditionalMutation cm = new ConditionalMutation(String.format("r%07d", i), cond);
      cm.put("meta", "seq", nextValue);
      cmuts.add(cm);
    }

    long elapsedNanos = writeAndVerifyAccepted(cw, cmuts);
    return ratePerSecond(CONDITIONS_PER_PASS, elapsedNanos);
  }

  /**
   * Times a single mutation carrying 10000 shuffled conditions, before and after a table flush.
   */
  private static void runRandomizeConditionalMutationsTest(Environment env, String tableName,
      Report.Builder reportBuilder) throws Exception {
    deleteTableIfExists(env, tableName);
    env.getClient().tableOperations().create(tableName);

    try (ConditionalWriter cw = env.getClient().createConditionalWriter(tableName,
        new ConditionalWriterConfig())) {
      // Untimed pass with seq 0: writes the first value that the seq 1 conditions check for.
      randomizeConditionalMutationsTime(cw, 0);

      double rateSum = 0.0;
      int passes = 0;
      for (long seq = 1; seq < 20; seq++) {
        rateSum += randomizeConditionalMutationsTime(cw, seq);
        passes++;
      }
      reportBuilder.result("avgRate: 1-19", roundToHundredths(rateSum / passes),
          "RandomizeConditionalMutationsTest: average rate (conditions/sec) to run sequence 1-19");

      env.getClient().tableOperations().flush(tableName, null, null, true);

      rateSum = 0.0;
      passes = 0;
      for (long seq = 20; seq < 40; seq++) {
        rateSum += randomizeConditionalMutationsTime(cw, seq);
        passes++;
      }
      reportBuilder.result("avgRate: 20-39", roundToHundredths(rateSum / passes),
          "RandomizeConditionalMutationsTest: average rate (conditions/sec) to run sequence 20-39");
    }
  }

  /**
   * Writes one conditional mutation to row r01 with 10000 conditions added in shuffled qualifier
   * order, and returns the achieved rate in conditions per second.
   */
  private static double randomizeConditionalMutationsTime(ConditionalWriter cw, long seq)
      throws Exception {
    String nextValue = Long.toString(seq + 1);

    ConditionalMutation cm = new ConditionalMutation("r01");
    for (String qual : shuffledQualifiers()) {
      Condition cond = new Condition("seq", qual);
      if (seq != 0) {
        cond.setValue(Long.toString(seq));
      }
      cm.addCondition(cond);
      cm.put("seq", qual, nextValue);
    }

    ArrayList<ConditionalMutation> cmuts = new ArrayList<>();
    cmuts.add(cm);

    long elapsedNanos = writeAndVerifyAccepted(cw, cmuts);
    return ratePerSecond(CONDITIONS_PER_PASS, elapsedNanos);
  }

  /**
   * Times batch scans of 10000 shuffled, non-matching ranges after each batch write, before and
   * after a table flush.
   */
  private static void runRandomizeBatchScanAndWriteTest(Environment env, String tableName,
      Report.Builder reportBuilder) throws Exception {
    deleteTableIfExists(env, tableName);
    env.getClient().tableOperations().create(tableName);

    try (
        BatchWriter bw = env.getClient().createBatchWriter(tableName, new BatchWriterConfig());
        BatchScanner bs = env.getClient().createBatchScanner(tableName, Authorizations.EMPTY,
            1)) {
      // Untimed first pass; its result is intentionally discarded.
      randomizeBatchWriteAndScanTime(bw, bs, 0);

      double rateSum = 0.0;
      int passes = 0;
      for (long seq = 1; seq < 20; seq++) {
        rateSum += randomizeBatchWriteAndScanTime(bw, bs, seq);
        passes++;
      }
      reportBuilder.result("avgRate: 1-19", roundToHundredths(rateSum / passes),
          "RandomizeBatchScanAndWriteTest: average rate (conditions/sec) to write and scan "
              + "sequence 1-19");

      env.getClient().tableOperations().flush(tableName, null, null, true);

      rateSum = 0.0;
      passes = 0;
      for (long seq = 20; seq < 40; seq++) {
        rateSum += randomizeBatchWriteAndScanTime(bw, bs, seq);
        passes++;
      }
      reportBuilder.result("avgRate: 20-39", roundToHundredths(rateSum / passes),
          "RandomizeBatchScanAndWriteTest: average rate (conditions/sec) to write and scan "
              + "sequence 20-39 post flush");
    }
  }

  /**
   * Writes 10000 columns to row r01 in shuffled order, flushes the writer, then times a batch
   * scan over one exact range per written column. Only the scan is timed.
   *
   * @return scanned ranges per second
   */
  private static double randomizeBatchWriteAndScanTime(BatchWriter bw, BatchScanner bs, long seq)
      throws Exception {
    String nextValue = Long.toString(seq + 1);

    ArrayList<Range> ranges = new ArrayList<>(CONDITIONS_PER_PASS);
    Mutation m = new Mutation("r01");
    for (String qual : shuffledQualifiers()) {
      m.put("seq", qual, nextValue);
      // look between existing values
      ranges.add(Range.exact("r01", "seq", qual + ".a"));
    }

    bw.addMutation(m);
    bw.flush();

    long start = System.nanoTime();
    bs.setRanges(ranges);
    int count = Iterables.size(bs);
    long elapsedNanos = System.nanoTime() - start;

    // No range matches a written qualifier, so any returned entry is an error.
    if (0 != count) {
      throw new RuntimeException("count = " + count);
    }

    return ratePerSecond(CONDITIONS_PER_PASS, elapsedNanos);
  }

  /**
   * Times random conditional mutations against a table created with an 8K compressed block size
   * and a locality group for the ntfy family: right after loading, after a flush, and after a
   * compaction.
   */
  private static void runSetBlockSizeTest(Environment env, String tableName,
      Report.Builder reportBuilder) throws Exception {
    deleteTableIfExists(env, tableName);

    env.getClient().tableOperations().create(tableName, new NewTableConfiguration().setProperties(
        Collections.singletonMap(Property.TABLE_FILE_COMPRESSED_BLOCK_SIZE.getKey(), "8K")));
    env.getClient().tableOperations().setLocalityGroups(tableName,
        ImmutableMap.of("lg1", ImmutableSet.of(new Text("ntfy"))));

    int numRows = 100000;
    int numCols = 100;
    int numTest = 10;

    writeData(env, tableName, numRows, numCols);
    writeLgData(env, tableName, numRows);

    try (ConditionalWriter cw = env.getClient().createConditionalWriter(tableName,
        new ConditionalWriterConfig())) {
      double rateSum = 0.0;
      for (int i = 0; i < numTest; i++) {
        rateSum += setBlockSizeTime(cw, numRows, numCols);
      }
      reportBuilder.result("avgRate1", roundToHundredths(rateSum / numTest),
          "SetBlockSizeTest: average rate in conditions/sec");

      env.getClient().tableOperations().flush(tableName, null, null, true);

      rateSum = 0.0;
      for (int i = 0; i < numTest; i++) {
        rateSum += setBlockSizeTime(cw, numRows, numCols);
      }
      reportBuilder.result("avgRate2", roundToHundredths(rateSum / numTest),
          "SetBlockSizeTest: average rate in conditions/sec post flush");

      env.getClient().tableOperations().compact(tableName, null, null, true, true);

      rateSum = 0.0;
      for (int i = 0; i < numTest; i++) {
        rateSum += setBlockSizeTime(cw, numRows, numCols);
      }
      // Average over numTest like the two trials above; a fixed 20 halved this result.
      reportBuilder.result("avgRate3", roundToHundredths(rateSum / numTest),
          "SetBlockSizeTest: average rate in conditions/sec post compaction");
    }

    reportBuilder.parameter("numRows", numRows, "SetBlockSizeTest: The number of rows");
    reportBuilder.parameter("numCols", numCols, "SetBlockSizeTest: The number of columns");
    reportBuilder.parameter("numTest", numTest,
        "SetBlockSizeTest: The number of tests ran per trial");
  }

  /**
   * Writes {@code numRows} rows, each with {@code numCols} columns in the data family.
   */
  private static void writeData(Environment env, String tableName, int numRows, int numCols)
      throws TableNotFoundException, MutationsRejectedException {
    // try-with-resources closes the writer even when adding a mutation fails.
    try (BatchWriter bw = env.getClient().createBatchWriter(tableName, new BatchWriterConfig())) {
      for (int row = 0; row < numRows; row++) {
        bw.addMutation(genRow(row, numCols));
      }
    }
  }

  /**
   * Builds the mutation for one data row: hashed row id, hashed column qualifiers, value "1".
   */
  private static Mutation genRow(int row, int numCols) {
    Mutation m = new Mutation(hashedRow(row));
    for (int col = 0; col < numCols; col++) {
      m.put("data", hashedColumn(col), "1");
    }
    return m;
  }

  /**
   * Writes ten times {@code numRows} rows that each hold a single empty-valued column in the ntfy
   * family.
   */
  private static void writeLgData(Environment env, String tableName, int numRows)
      throws TableNotFoundException, MutationsRejectedException {
    int lgRows = numRows * 10;
    try (BatchWriter bw = env.getClient().createBatchWriter(tableName, new BatchWriterConfig())) {
      for (int row = 0; row < lgRows; row++) {
        Mutation m = new Mutation(hashedRow(row));
        m.put("ntfy", "absfasf3", "");
        bw.addMutation(m);
      }
    }
  }

  /**
   * Picks the row id of a random row index in {@code [0, numRows)}.
   */
  private static String randRow(Random rand, int numRows) {
    return hashedRow(rand.nextInt(numRows));
  }

  /**
   * Picks {@code num} distinct column qualifiers from random column indexes in
   * {@code [0, numCols)}. Loops until enough distinct qualifiers are found, so {@code num} must
   * not exceed the number of distinct qualifiers available.
   */
  private static Collection<String> randCols(Random rand, int num, int numCols) {
    HashSet<String> cols = new HashSet<>();
    while (cols.size() < num) {
      cols.add(hashedColumn(rand.nextInt(numCols)));
    }
    return cols;
  }

  /**
   * Writes 3000 conditional mutations to random rows, each checking and rewriting 10 random data
   * columns with value "1", and returns the achieved rate in conditions per second.
   */
  private static double setBlockSizeTime(ConditionalWriter cw, int numRows, int numCols)
      throws Exception {
    Random rand = new Random();

    ArrayList<ConditionalMutation> cmuts = new ArrayList<>(BLOCK_TEST_MUTATIONS);
    for (int i = 0; i < BLOCK_TEST_MUTATIONS; i++) {
      ConditionalMutation cm = new ConditionalMutation(randRow(rand, numRows));
      for (String col : randCols(rand, BLOCK_TEST_CONDITIONS, numCols)) {
        cm.addCondition(new Condition("data", col).setValue("1"));
        cm.put("data", col, "1");
      }
      cmuts.add(cm);
    }

    long elapsedNanos = writeAndVerifyAccepted(cw, cmuts);
    return ratePerSecond((long) BLOCK_TEST_MUTATIONS * BLOCK_TEST_CONDITIONS, elapsedNanos);
  }

  /**
   * Deletes the table, doing nothing when it does not exist.
   */
  private static void deleteTableIfExists(Environment env, String tableName) throws Exception {
    try {
      env.getClient().tableOperations().delete(tableName);
    } catch (TableNotFoundException ignored) {
      // Nothing to delete; every scenario starts by creating the table anyway.
    }
  }

  /**
   * Submits the mutations, drains every result and checks that each one was accepted.
   *
   * @return nanoseconds spent submitting the mutations and draining their results
   * @throws IllegalStateException
   *           if a mutation is not accepted or the number of results differs from the number of
   *           mutations
   */
  private static long writeAndVerifyAccepted(ConditionalWriter cw,
      Collection<ConditionalMutation> cmuts) throws Exception {
    long start = System.nanoTime();

    int count = 0;
    Iterator<ConditionalWriter.Result> results = cw.write(cmuts.iterator());
    while (results.hasNext()) {
      ConditionalWriter.Status status = results.next().getStatus();
      if (ConditionalWriter.Status.ACCEPTED != status) {
        throw new IllegalStateException(
            "Conditional mutation was not accepted, status = " + status);
      }
      count++;
    }

    long elapsedNanos = System.nanoTime() - start;

    if (cmuts.size() != count) {
      throw new IllegalStateException(
          "Expected " + cmuts.size() + " results but received " + count);
    }
    return elapsedNanos;
  }

  /**
   * Returns the qualifiers q0000000 .. q0009999 in random order.
   */
  private static ArrayList<String> shuffledQualifiers() {
    ArrayList<Integer> ints = new ArrayList<>(CONDITIONS_PER_PASS);
    for (int i = 0; i < CONDITIONS_PER_PASS; i++) {
      ints.add(i);
    }
    Collections.shuffle(ints);

    ArrayList<String> quals = new ArrayList<>(CONDITIONS_PER_PASS);
    for (Integer n : ints) {
      quals.add(String.format("q%07d", n));
    }
    return quals;
  }

  /**
   * Formats the absolute murmur3 hash of a row index as 8 hex digits.
   */
  private static String hashedRow(int row) {
    return String.format("%08x", Math.abs(Hashing.murmur3_32().hashInt(row).asInt()));
  }

  /**
   * Formats the low 16 bits of the murmur3 hash of a column index as 4 hex digits.
   */
  private static String hashedColumn(int col) {
    return String.format("%04x", Math.abs(Hashing.murmur3_32().hashInt(col).asInt() & 0xffff));
  }

  /**
   * Converts an operation count and an elapsed time into operations per second.
   *
   * <p>
   * The division is done in floating point on nanoseconds. Converting the elapsed time to whole
   * seconds first would truncate it, yielding a division by zero for passes shorter than a second.
   */
  private static double ratePerSecond(long operations, long elapsedNanos) {
    // Guard against a zero reading so the rate stays finite.
    long safeNanos = Math.max(1L, elapsedNanos);
    return operations * (double) TimeUnit.SECONDS.toNanos(1) / safeNanos;
  }

  /**
   * Rounds a value to two decimal places without going through locale-dependent text formatting.
   */
  private static double roundToHundredths(double value) {
    return Math.round(value * 100.0) / 100.0;
  }
}