blob: 0da405a52348c26f517c67be994b2ccf2f0e76b9 [file] [log] [blame]
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.jackrabbit.oak.benchmark;
import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import javax.jcr.Credentials;
import javax.jcr.GuestCredentials;
import javax.jcr.Repository;
import javax.jcr.RepositoryException;
import javax.jcr.Session;
import javax.jcr.SimpleCredentials;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.commons.math.stat.descriptive.DescriptiveStatistics;
import org.apache.commons.math.stat.descriptive.SynchronizedDescriptiveStatistics;
import org.apache.jackrabbit.oak.commons.Profiler;
import org.apache.jackrabbit.oak.fixture.RepositoryFixture;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Abstract base class for individual performance benchmarks.
 */
public abstract class AbstractTest<T> extends Benchmark implements CSVResultGenerator {
// T is the type of the optional per-thread context object returned by
// prepareThreadExecutionContext() and passed to the context-aware hooks.
/**
 * A random string to guarantee concurrently running tests don't overwrite
 * each other's changes (for example in a cluster).
 * <p>
 * The probability of duplicates, for 50 concurrent processes, is less than
 * 1 in 1 million.
 */
// Hex form of a random int, computed once when this class is initialised.
static final String TEST_ID = Integer.toHexString(new Random().nextInt());
// Counter consumed by nextNodeName(); every call takes the next value.
static AtomicInteger nodeNameCounter = new AtomicInteger();
/**
 * A node name that is guaranteed to be unique within the current JVM.
 */
static String nextNodeName() {
// "n" followed by the hexadecimal form of the next counter value.
return "n" + Integer.toHexString(nodeNameCounter.getAndIncrement());
// Fixed admin/admin credentials; used by runTest(...) for setUp and by loginAdministrative().
private static final Credentials CREDENTIALS = new SimpleCredentials("admin", "admin".toCharArray());
// Warm-up duration in milliseconds; the "warmup" system property is given in seconds (default 5).
private static final long WARMUP = TimeUnit.SECONDS.toMillis(Long.getLong("warmup", 5));
// Measured run duration in milliseconds; the "runtime" system property is given in seconds (default 60).
private static final long RUNTIME = TimeUnit.SECONDS.toMillis(Long.getLong("runtime", 60));
// Value of the "skipWarmup" system property.
// NOTE(review): no use of this constant is visible in this view; the name also looks misspelled (WARMPUP).
private static final boolean SKIP_WARMPUP = Boolean.getBoolean("skipWarmup");
// Value of the "profile" system property; when true, setUp() starts a Profiler.
private static final boolean PROFILE = Boolean.getBoolean("profile");
private static final Logger LOG = LoggerFactory.getLogger(AbstractTest.class);
// Repository and credentials handed to setUp(); both are reset to null in tearDown().
private Repository repository;
private Credentials credentials;
// Session bookkeeping list; the login/logout helpers synchronize on it.
private List<Session> sessions;
// Thread bookkeeping list; iterated by tearDown().
private List<Thread> threads;
// Set to true in setUp() and to false in tearDown(); polled by background job threads.
private volatile boolean running;
// Only assigned when the "profile" system property is set (see setUp()).
private Profiler profiler;
// Optional stream configured through setPrintStream(); null-checked before use.
private PrintStream out;
// Fixture currently being processed by run(); returned by getCurrentFixture().
private RepositoryFixture currentFixture;
/**
 * <p>
 * Signals {@link #runTest(int)} whether it should stop running further test iterations. If set
 * to true, the loop exits without performing any more tests.
 * </p>
 * <p>
 * Useful when running the benchmark only makes sense for as long as other processes have not
 * completed.
 * </p>
 * <p>
 * Set this variable from within the benchmark itself by using {@link #issueHaltRequest(String)}.
 * </p>
 * <p>
 * <strong>It works only for a concurrency level of 1 ({@code --concurrency 1}, the
 * default).</strong>
 * </p>
 */
// Reset to false by setUp(); set to true by issueHaltRequest(String).
private boolean haltRequested;
* If concurrency level is 1 ({@code --concurrency 1}, the default) it will issue a request to
* halt any future runs of a single benchmark. Useful when the benchmark makes sense only if run
* in conjunction of any other parallel operations.
* @param message an optional message that can be provided. It will logged at info level.
protected void issueHaltRequest(@Nullable final String message) {
String m = message == null ? "" : message;"halt requested. {}", m);
haltRequested = true;
/**
 * <p>
 * This method will be called during {@link #tearDown()}, before {@link #afterSuite()}.
 * Override it if you have background processes you wish to stop.
 * </p>
 * <p>
 * For example, in case of big imports, the suite could keep running for as long as the import
 * is running, even if the tests are actually no longer executed.
 * </p>
 */
protected void issueHaltChildThreads() {
// Extension hook: no statements are visible for this method in this view.
/**
 * Stores the stream that the reporting code null-checks before producing the
 * comma-separated variant of the report.
 *
 * @param out the stream to remember; may be {@code null}
 */
public void setPrintStream(PrintStream out) {
this.out = out;
/**
 * Resolves the benchmark scale from the {@code scale} system property.
 *
 * @param def value to use when the property is missing, not a valid integer, or 0
 * @return the configured scale, or {@code def}
 */
protected static int getScale(int def) {
// A missing or unparsable property reads as 0, which is treated as "not configured".
int scale = Integer.getInteger("scale", 0);
if (scale == 0) {
scale = def;
return scale;
/**
 * Prepares this performance benchmark.
 *
 * @param repository the repository to use
 * @param credentials credentials of a user with write access
 * @throws Exception if the benchmark can not be prepared
 */
public void setUp(Repository repository, Credentials credentials)
throws Exception {
this.repository = repository;
this.credentials = credentials;
// Fresh bookkeeping lists for the sessions and threads of this run.
this.sessions = new LinkedList<Session>();
this.threads = new LinkedList<Thread>();
this.running = true;
// Every run starts with no halt request pending.
haltRequested = false;
// Profiling is opt-in through the "profile" system property.
if (PROFILE) {
profiler = new Profiler().startCollecting();
/**
 * Runs this benchmark against the given fixtures without explicit concurrency levels.
 *
 * @param fixtures the repository fixtures to run against
 */
public void run(Iterable<RepositoryFixture> fixtures) {
// Passing null lets the private runTest(...) fall back to a single concurrency level of 1.
run(fixtures, null);
/**
 * Runs this benchmark against every given fixture. For each fixture a repository array is
 * obtained from {@link #createRepository(RepositoryFixture)} and its first element is handed
 * to the private {@code runTest(fixture, repository, concurrencyLevels)}.
 *
 * @param fixtures the repository fixtures to run against
 * @param concurrencyLevels the concurrency levels to test; {@code null} is accepted
 */
public void run(Iterable<RepositoryFixture> fixtures, List<Integer> concurrencyLevels) {
// Header for the space-separated report, followed by any extra stat names.
// NOTE(review): the call that consumes this format string is not visible in this view.
"# %-26.26s C min 10%% 50%% 90%% max N mean%s%n",
toString(), statsNamesJoined(false));
// The comma-separated header only applies when a print stream was configured.
if (out != null) {
"# %-26.26s, C, min, 10%%, 50%%, 90%%, max, N mean%s%n",
toString(), statsNamesJoined(true));
for (RepositoryFixture fixture : fixtures) {
// Remember the fixture so getCurrentFixture() can return it.
currentFixture = fixture;
try {
Repository[] cluster = createRepository(fixture);
try {
// Only the first repository of the array is benchmarked.
runTest(fixture, cluster[0], concurrencyLevels);
// NOTE(review): the bodies of the finally and catch clauses are not visible in this view.
} finally {
} catch (Exception e) {
/**
 * Sets the benchmark up on the given repository, runs the warm-up loop and then one measured
 * run per concurrency level. A result row is formatted only for levels that produced at least
 * one sample.
 *
 * @param fixture the fixture the repository belongs to
 * @param repository the repository to run against
 * @param concurrencyLevels levels to test; {@code null} or empty means a single level of 1
 * @throws Exception if set-up or a test run fails
 */
private void runTest(RepositoryFixture fixture, Repository repository, List<Integer> concurrencyLevels) throws Exception {
setUp(repository, CREDENTIALS);
try {
// Run a few iterations to warm up the system
long warmupEnd = System.currentTimeMillis() + WARMUP;
boolean stop = false;
while (System.currentTimeMillis() < warmupEnd && !stop) {
// we want to execute this at least once. after that we consider the
// `haltRequested` flag.
// NOTE(review): the statement executed on each warm-up pass is not visible in this view.
stop = haltRequested;
// Default to a single run with one thread when no levels were supplied.
if (concurrencyLevels == null || concurrencyLevels.isEmpty()) {
concurrencyLevels = Arrays.asList(1);
for (Integer concurrency: concurrencyLevels) {
// Run the test
DescriptiveStatistics statistics = runTest(concurrency);
// NOTE(review): the elements of this array initialiser are not visible in this view.
Object[] defaultStats = new Object[] {
// Benchmark-specific stat values follow the default columns.
Object[] statsArg = ArrayUtils.addAll(defaultStats, statsValues());
String comment = comment();
// An optional trailing comment becomes the last format argument.
if (comment != null) {
statsArg = ArrayUtils.add(statsArg, comment);
// Rows are only produced when at least one sample was recorded.
if (statistics.getN() > 0) {
"%-28.28s %6d %6.0f %6.0f %6.0f %6.0f %6.0f %6d %6.0f"+statsFormatsJoined(false)+"%n",
if (out != null) {
// NOTE(review): this comma-separated row requests statsFormatsJoined(false), while the
// comma-separated header in run(...) uses statsNamesJoined(true) — confirm whether
// `true` was intended here.
"%-28.28s, %6d, %6.0f, %6.0f, %6.0f, %6.0f, %6.0f, %6d, %6.0f"+statsFormatsJoined(false)+"%n",
// NOTE(review): the body of this finally clause is not visible in this view.
} finally {
/**
 * Joins the additional format patterns from statsFormats(), plus a pattern for the optional
 * comment, into one string.
 *
 * @param commaSeparated {@code true} to join with ',', {@code false} to join with a space
 * @return the joined format patterns
 */
private String statsFormatsJoined(boolean commaSeparated) {
String comment = comment();
String[] formatPattern = statsFormats();
// When a comment is present it gets its own "#"-prefixed placeholder at the end.
if (comment != null){
String commentPattern = commaSeparated ? "#%s" : " #%s";
formatPattern = (String[])ArrayUtils.add(formatPattern, commentPattern);
Joiner joiner = commaSeparated ? Joiner.on(',') : Joiner.on(" ");
return joiner.join(formatPattern);
/**
 * Joins the additional column names from statsNames() into one string.
 *
 * @param commaSeparated {@code true} to join with ',', {@code false} to join with a space
 * @return the joined names; the space-separated form gets a leading space
 */
private String statsNamesJoined(boolean commaSeparated) {
Joiner joiner = commaSeparated ? Joiner.on(',') : Joiner.on(" ");
String names = joiner.join(statsNames());
// The space-separated variant is prefixed with a separator of its own.
if (!commaSeparated) {
names = " " + names;
return names;
// Worker thread created by runTest(int) when the concurrency level is other than 1.
private class Executor extends Thread {
// Collector passed in by runTest(int); all workers of one run receive the same instance.
private final SynchronizedDescriptiveStatistics statistics;
// Set to false by runTest(int) to ask the worker loop to finish.
// NOTE(review): this flag is written from another thread but is not volatile — confirm the
// update is guaranteed to become visible to the worker.
private boolean running = true;
private Executor(String name, SynchronizedDescriptiveStatistics statistics) {
// NOTE(review): no use of the `name` parameter is visible in this view.
this.statistics = statistics;
public void run() {
try {
// One context object per worker thread; may be null (see prepareThreadExecutionContext()).
T context = prepareThreadExecutionContext();
try {
// NOTE(review): the loop body and the finally/catch bodies are not visible in this view.
while (running) {
} finally {
} catch (Exception e) {
/**
 * Runs the benchmark for the RUNTIME window at the given concurrency level.
 * Level 1 loops on the calling thread; any other level creates that many Executor threads.
 *
 * @param concurrencyLevel number of concurrent executors to use
 * @return the statistics object created for this run
 * @throws Exception if the run fails
 */
private DescriptiveStatistics runTest(int concurrencyLevel) throws Exception {
final SynchronizedDescriptiveStatistics statistics = new SynchronizedDescriptiveStatistics();
if (concurrencyLevel == 1) {
// Run test iterations, and capture the execution times
long runtimeEnd = System.currentTimeMillis() + RUNTIME;
boolean stop = false;
while (System.currentTimeMillis() < runtimeEnd && !stop) {
// we want to execute this at least once. after that we consider the
// `haltRequested` flag.
// NOTE(review): the statement that runs and records an iteration is not visible in this view.
stop = haltRequested;
} else {
// This local list shadows the `threads` field of the enclosing class.
List<Executor> threads = new LinkedList<Executor>();
for (int n=0; n<concurrencyLevel; n++) {
threads.add(new Executor("Background job " + n, statistics));
// start threads
for (Thread t: threads) {
//System.out.printf("Started %d threads%n", threads.size());
// Run test iterations, and capture the execution times
long runtimeEnd = System.currentTimeMillis() + RUNTIME;
while (System.currentTimeMillis() < runtimeEnd) {
// NOTE(review): the clock is read again here; if the deadline passes between the loop
// check and this call the argument is negative, which Thread.sleep rejects — confirm.
Thread.sleep(runtimeEnd - System.currentTimeMillis());
// stop threads
for (Executor e: threads) {
e.running = false;
// wait for threads
for (Executor e: threads) {
return statistics;
/**
 * Executes a single iteration of this test.
 *
 * @return number of milliseconds spent in this iteration
 * @throws Exception if an error occurs
 */
public long execute() throws Exception {
try {
long start = System.currentTimeMillis();
// NOTE(review): the statement(s) timed between the two clock reads are not visible in this view.
long timeTaken = System.currentTimeMillis() - start;
LOG.trace("Time taken for test iteration run - " + timeTaken);
return timeTaken;
// NOTE(review): the body of this finally clause is not visible in this view.
} finally {
/**
 * Executes a single iteration using the given per-thread context.
 *
 * @param executionContext the thread's context; {@code null} delegates to {@link #execute()}
 * @return number of milliseconds spent in this iteration
 * @throws Exception if an error occurs
 */
private long execute(T executionContext) throws Exception {
// Without a context this is the same as the context-free variant.
if(executionContext == null){
return execute();
try {
long start = System.currentTimeMillis();
// System.out.println("execute " + this);
// NOTE(review): the statement(s) timed between the two clock reads are not visible in this view.
return System.currentTimeMillis() - start;
} finally {
/**
 * Cleans up after this performance benchmark.
 *
 * @throws Exception if the benchmark can not be cleaned up
 */
public void tearDown() throws Exception {
// Clearing the flag lets loops that poll `running` terminate.
this.running = false;
// NOTE(review): the bodies of the loops and conditionals below are not visible in this view.
for (Thread thread : threads) {
if (profiler != null) {
profiler = null;
for (Session session : sessions) {
if (session.isLive()) {
// Drop all per-run state.
this.threads = null;
this.sessions = null;
this.credentials = null;
this.repository = null;
/**
 * Names of additional stats which the benchmark wants to be reported as part of the
 * default report. Add the required padding to the names to account for the size of the
 * stats values.
 */
protected String[] statsNames(){
// Default: no additional columns.
return new String[0];
/**
 * Format strings used for the additional stats, as per {@link java.util.Formatter}.
 * Example: [ "%6d" , "%6.0f" ]
 */
protected String[] statsFormats(){
// Default: no additional format patterns.
return new String[0];
/**
 * Stats values which need to be included in the report.
 */
protected Object[] statsValues(){
// Default: no additional values.
return new Object[0];
/**
 * Optional comment for the report; when non-null it is appended to the stat arguments and
 * given a "#"-prefixed format placeholder.
 *
 * @return the comment, or {@code null} (the default) for none
 */
protected String comment(){
return null;
/**
 * Run before any iterations of this test get executed. Subclasses can
 * override this method to set up static test content.
 *
 * @throws Exception if an error occurs
 */
protected void beforeSuite() throws Exception {
// Hook with no visible statements; presumably runs around each iteration — the call sites
// are not visible in this view, TODO confirm.
protected void beforeTest() throws Exception {
// The benchmark body that every concrete subclass has to provide.
protected abstract void runTest() throws Exception;
// Hook with no visible statements; presumably the counterpart of beforeTest() — TODO confirm.
protected void afterTest() throws Exception {
/**
 * Run after all iterations of this test have been executed. Subclasses can
 * override this method to clean up static test content.
 *
 * @throws Exception if an error occurs
 */
protected void afterSuite() throws Exception {
// Extension hook: no statements are visible for this method in this view.
/**
 * Invoked before the thread starts. If the test later requires
 * some thread-local context, e.g. a JCR session per thread, then
 * subclasses can return a context instance. That instance is
 * passed as part of the runTest call.
 *
 * @return context instance to be used for the runTest call for the
 * current thread
 */
protected T prepareThreadExecutionContext() throws Exception{
// Default: no context; execute(T) then falls back to the context-free execute().
return null;
// Hook receiving a context object; no statements are visible for it in this view.
protected void disposeThreadExecutionContext(T context) throws Exception{
// Context-aware hook; no statements are visible for it in this view.
protected void afterTest(T executionContext) {
// Context-aware benchmark body. The default always throws, so a subclass that supplies a
// context has to override it.
protected void runTest(T executionContext) throws Exception {
throw new IllegalStateException("If thread execution context is used then subclass must " +
"override this method");
// Context-aware hook; no statements are visible for it in this view.
protected void beforeTest(T executionContext) {
/**
 * Refuses to run on the listed repository versions.
 *
 * @param versions version prefixes to reject
 * @throws RepositoryException if the repository version starts with one of the given prefixes
 */
protected void failOnRepositoryVersions(String... versions)
throws RepositoryException {
// NOTE(review): the initialiser expression of this variable is not visible in this view.
String repositoryVersion =
for (String version : versions) {
// Prefix match, so "1.0" also rejects "1.0.3".
if (repositoryVersion.startsWith(version)) {
throw new RepositoryException(
"Unable to run " + getClass().getName()
+ " on repository version " + version);
// Returns the repository stored by setUp(); null after tearDown().
protected Repository getRepository() {
return repository;
// Returns the credentials stored by setUp(); null after tearDown().
protected Credentials getCredentials() {
return credentials;
// Returns the fixture most recently assigned by run(...).
protected RepositoryFixture getCurrentFixture() {
return currentFixture;
/**
 * Returns a new reader session that will be automatically closed once
 * all the iterations of this test have been executed.
 *
 * @return reader session
 */
protected Session loginAnonymous() {
// Guest credentials are created fresh for every call.
return login(new GuestCredentials());
/**
 * Returns a new admin session that will be automatically closed once
 * all the iterations of this test have been executed.
 *
 * @return admin session
 */
protected Session loginAdministrative() {
// Always uses the class-level admin credentials, not the ones passed to setUp().
return login(CREDENTIALS);
/**
 * Returns a new session for the given user
 * that will be automatically closed once
 * all the iterations of this test have been executed.
 *
 * @param credentials the user credentials
 * @return user session
 */
protected Session login(Credentials credentials) {
try {
Session session = repository.login(credentials);
// NOTE(review): the statement inside this synchronized block is not visible in this view.
synchronized (sessions) {
return session;
// Checked repository failures are rethrown unchecked with the original as cause.
} catch (RepositoryException e) {
throw new RuntimeException(e);
/**
 * Logs in through {@link #loginSubject(Subject)} using {@code SystemSubject.INSTANCE}.
 *
 * @return the session obtained for that subject
 */
protected Session systemLogin() {
return loginSubject(SystemSubject.INSTANCE);
/**
 * Logs in to the repository while running as the given subject.
 *
 * @param subject the subject to run the login as
 * @return the session returned by the repository
 * @throws RuntimeException wrapping any exception raised during the login
 */
protected Session loginSubject(@NotNull Subject subject) {
try {
return Subject.doAsPrivileged(subject, new PrivilegedExceptionAction<Session>() {
public Session run() throws Exception {
// No credentials or workspace name are passed; both arguments are null.
return getRepository().login(null, null);
}, null);
} catch (Exception e) {
throw new RuntimeException(e);
/**
 * Logs out and removes the session from the internal pool.
 *
 * @param session the session to logout
 */
protected void logout(Session session) {
// A null session is ignored.
if (session != null) {
// NOTE(review): the statements that log out and unregister the session are not visible in this view.
synchronized (sessions) {
/**
 * Returns a new writer session that will be automatically closed once
 * all the iterations of this test have been executed.
 *
 * @return writer session
 */
protected Session loginWriter() {
try {
LOG.trace("Creating new session");
// Uses the credentials that were passed to setUp().
Session session = repository.login(credentials);
// NOTE(review): the statement inside this synchronized block is not visible in this view.
synchronized (sessions) {
return session;
// Checked repository failures are rethrown unchecked with the original as cause.
} catch (RepositoryException e) {
throw new RuntimeException(e);
/**
 * Adds a background thread that repeatedly executes the given job
 * until all the iterations of this test have been executed.
 *
 * @param job background job
 */
protected void addBackgroundJob(final Runnable job) {
// The thread name embeds the job's string form.
Thread thread = new Thread("Background job " + job) {
public void run() {
// Loops for as long as the enclosing test's `running` flag is set (cleared by tearDown()).
// NOTE(review): the loop body and the code that starts and registers the thread are not
// visible in this view.
while (running) {;
/**
 * Customizes the repository creation process through custom fixture handling.
 */
protected Repository[] createRepository(RepositoryFixture fixture) throws Exception {
// Default: ask the fixture for a cluster of size 1.
return fixture.setUpCluster(1);