// blob: db972f06ad1b582ef74c37180815508d4c52e431 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.geode.test.junit.rules;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.assertj.core.api.Assertions.assertThat;

import java.lang.reflect.Field;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;

import com.google.common.base.Stopwatch;
import org.junit.rules.ErrorCollector;

import org.apache.geode.test.junit.rules.serializable.SerializableExternalResource;
/**
* A rule for testing using multiple threads. This rule should not be used as a class rule. This
* rule accepts threads to be run, runs them in series or parallel, and throws exceptions if any of
* the threads threw an unexpected exception or returned an incorrect value.
*
* Basic Steps for Usage:
* 1. Declare the rule as a test rule with the @Rule annotation
* 2. Create a Callable, or create a Runnable and use toCallable(runnable) to convert it
* 3. Add the Callable to the rule using add(callable)
* 4. (Optional) Add expectations for the outcome of running the Callable (values or exceptions),
* and/or any repetition of threads (for N iterations or for a duration)
* 5. Run all submitted callables in series or in parallel
* 6. Before re-executing within the same test, use clear() to empty the list of callables and
* errors
*
* Example Usage:
*
* <pre>
* <code>
*
* {@literal @}Rule
* public ConcurrencyRule concurrencyRule = new ConcurrencyRule(); // step 1
*
* {@literal @}Test
* public void testName() {
* Callable<String> c1 = () -> {
* return "some Value";
* }; // step 2
*
* concurrencyRule.add(c1).expectValue("some Value").repeatForIterations(3); // steps 3&4
* concurrencyRule.executeInParallel(); // step 5
* concurrencyRule.clear(); // step 6
* // keep using the rule as above, or ConcurrencyRule.after() will be called for cleanup
* }
*
* </code>
* </pre>
*/
public class ConcurrencyRule extends SerializableExternalResource {

  private final ExecutorService threadPool = Executors.newCachedThreadPool();
  private final Collection<ConcurrentOperation<?>> toInvoke;
  private final Collection<Future<Void>> futures;
  private ProtectedErrorCollector errorCollector;
  private Duration timeout;
  // true when every operation added so far has been submitted by one of the execute methods
  private final AtomicBoolean allThreadsExecuted = new AtomicBoolean(false);

  /**
   * A default constructor that sets the timeout to a default of 300 seconds
   */
  public ConcurrencyRule() {
    this(Duration.ofSeconds(300));
  }

  /**
   * A non-default constructor that sets the timeout to the given duration
   *
   * @param timeout the maximum duration that threads should execute, given that the submitted
   *        tasks respond to cancellation signals.
   */
  public ConcurrencyRule(Duration timeout) {
    toInvoke = new ArrayList<>();
    futures = new ArrayList<>();
    this.timeout = timeout;
    errorCollector = new ProtectedErrorCollector();
    // nothing has been added yet, so there is nothing waiting to be executed
    allThreadsExecuted.set(true);
  }

  /**
   * Verifies that every added operation was executed, then clears the rule and shuts down the
   * thread pool. The cleanup runs even when the verification fails, so the pool is never left
   * running.
   *
   * @throws IllegalStateException if operations were added but never executed
   */
  @Override
  protected void after() throws IllegalStateException {
    try {
      if (!allThreadsExecuted.get()) {
        throw new IllegalStateException("Threads have been added that have not been executed.");
      }
    } finally {
      clear();
      stopThreadPool();
    }
  }

  /**
   * Adds a Callable to the concurrency rule to be run. Expectations for return values and thrown
   * exceptions, as well as any repetition of the thread should be added using ConcurrentOperation.
   *
   * @param callable a Callable to be run. If the Callable throws an exception that is not expected
   *        it will be thrown up to the test that the threads are run from.
   * @return concurrentOperation, the ConcurrentOperation that has been added to the rule
   */
  public <T> ConcurrentOperation<T> add(Callable<T> callable) {
    ConcurrentOperation<T> concurrentOperation = new ConcurrentOperation<>(callable);
    toInvoke.add(concurrentOperation);
    allThreadsExecuted.set(false);
    return concurrentOperation;
  }

  /**
   * Runs all callables in the rule in parallel and fails if threads' conditions were not met. All
   * callables share one timeout window that starts once they have been submitted; a callable that
   * is still running when the window closes is cancelled (its thread is interrupted) and a
   * TimeoutException is recorded for it. This method will not return until all threads have
   * completed or been cancelled.
   *
   * @throws RuntimeException wrapping any collected failure that is not itself an Error or
   *         RuntimeException; with multiple failures the cause is expected to be JUnit's
   *         MultipleFailureException listing all of them.
   * @throws AssertionError if a single thread's expectations are not met
   */
  public void executeInParallel() {
    for (ConcurrentOperation<?> op : toInvoke) {
      futures.add(threadPool.submit(op));
    }
    allThreadsExecuted.set(true);
    awaitFutures();
    errorCollector.verify();
  }

  /**
   * Runs all callables in the rule in the order that they were added and fails if threads'
   * conditions are not met. Each callable gets the full timeout; a callable that exceeds it is
   * cancelled (its thread is interrupted) and a TimeoutException is recorded for it before the
   * next callable is started. This method will not return until all threads have completed or
   * been cancelled.
   *
   * @throws RuntimeException wrapping any collected failure that is not itself an Error or
   *         RuntimeException; with multiple failures the cause is expected to be JUnit's
   *         MultipleFailureException listing all of them.
   * @throws AssertionError if a single thread's expectations are not met
   */
  public void executeInSeries() {
    for (ConcurrentOperation<?> op : toInvoke) {
      awaitFuture(threadPool.submit(op), timeout.toMillis());
    }
    allThreadsExecuted.set(true);
    errorCollector.verify();
  }

  /**
   * Clears the lists of callables, futures, and errors. Use between calls to execute methods to
   * avoid rerunning and rethrowing callables from the previous executions.
   */
  public void clear() {
    toInvoke.clear();
    futures.clear();
    errorCollector = new ProtectedErrorCollector();
    allThreadsExecuted.set(true);
  }

  /**
   * Shuts down the thread pool. Does not need to be called if the rule's after is called
   */
  public void stopThreadPool() {
    threadPool.shutdownNow();
  }

  /**
   * Set the timeout for the threads. Once the timeout is reached while an execute method is
   * waiting, the still-running task is cancelled with interruption and a TimeoutException is
   * recorded as its failure.
   */
  public void setTimeout(Duration timeout) {
    this.timeout = timeout;
  }

  /**
   * Turns a Runnable into a Void Callable in order to submit it to the rule for execution
   *
   * @param runnable a Runnable to convert to a Callable
   * @return a Callable with Void return type
   */
  public static Callable<Void> toCallable(Runnable runnable) {
    return () -> {
      runnable.run();
      return null;
    };
  }

  /**
   * Waits for every outstanding future against one shared deadline, so that a slow task does not
   * extend the time granted to the tasks awaited after it, then drops the finished futures.
   */
  private void awaitFutures() {
    final long timeoutMillis = timeout.toMillis();
    final long startNanos = System.nanoTime();
    for (Future<Void> future : futures) {
      long elapsedMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
      awaitFuture(future, Math.max(0L, timeoutMillis - elapsedMillis));
    }
    clearCompletedFutures();
  }

  /**
   * Waits up to {@code waitMillis} for the future and records any failure in the error collector.
   * A future that times out, or whose wait is interrupted, is cancelled so its task does not keep
   * running after the execute method returns.
   */
  private void awaitFuture(Future<Void> future, long waitMillis) {
    try {
      future.get(waitMillis, TimeUnit.MILLISECONDS);
    } catch (ExecutionException e) {
      // report what the task itself threw rather than the wrapper
      errorCollector.addError(e.getCause());
    } catch (TimeoutException e) {
      future.cancel(true);
      errorCollector.addError(e);
    } catch (InterruptedException e) {
      future.cancel(true);
      // keep the interrupt visible to the caller instead of silently consuming it
      Thread.currentThread().interrupt();
      errorCollector.addError(e);
    } catch (Exception e) {
      errorCollector.addError(e);
    }
  }

  /**
   * Reflectively reads the named declared field of {@code targetClass} from the given instance.
   *
   * @throws Error if the field does not exist or cannot be accessed
   */
  private static Object readField(final Class<?> targetClass, final Object targetInstance,
      final String fieldName) {
    try {
      Field field = targetClass.getDeclaredField(fieldName);
      field.setAccessible(true);
      return field.get(targetInstance);
    } catch (IllegalAccessException | NoSuchFieldException e) {
      throw new Error(e);
    }
  }

  private void clearCompletedFutures() {
    futures.removeIf(future -> future.isCancelled() || future.isDone());
  }

  /**
   * A Callable wrapper that runs the wrapped callable one or more times and validates each outcome
   * against the configured expectation (a value, an eventually-reached value, or an exception).
   *
   * <p>
   * The fluent methods keep their raw {@code ConcurrentOperation} return type so that existing
   * call chains continue to compile unchanged.
   */
  public static class ConcurrentOperation<T> implements Callable<Void> {
    private static final int DEFAULT_ITERATIONS = 1;
    private static final Duration DEFAULT_DURATION = Duration.ofSeconds(300);

    private final Callable<T> callable;
    private int iterations = DEFAULT_ITERATIONS;
    private Duration duration = DEFAULT_DURATION;
    // set only by repeatForDuration; avoids identity comparison against DEFAULT_DURATION
    private boolean durationIsSet;
    private boolean expectedResultIsSet;
    private T expectedValue;
    private boolean eventualExpectedValueSet;
    private Throwable expectedException;
    private Class expectedExceptionType;
    private Class expectedExceptionCauseType;

    /**
     * Creates an operation without a callable. Such an operation cannot be run successfully
     * because there is nothing to call.
     */
    public ConcurrentOperation() {
      this(null);
    }

    /**
     * Creates an operation that runs the given callable once, with no expectations set.
     *
     * @param toAdd the callable to run
     */
    public ConcurrentOperation(Callable<T> toAdd) {
      this.callable = toAdd;
    }

    /**
     * Sets a callable to be repeated the given number of times. If there is also an expected result
     * for the callable, that expectation must be met for each iteration of the callable.
     *
     * @param iterations the number of times to run the callable
     * @return this, the ConcurrentOperation (containing a callable) that has been set to repeat
     * @throws IllegalArgumentException if a duration has already been specified
     */
    public ConcurrentOperation repeatForIterations(int iterations) {
      if (durationIsSet) {
        throw new IllegalArgumentException("Specify only Duration or Iterations");
      }
      this.iterations = iterations;
      return this;
    }

    /**
     * Sets a callable to be repeated for some duration. If there is also an expected result
     * for the callable, that expectation must be met for each iteration of the callable. The
     * callable will not be restarted after the duration has been met, however the current
     * iteration will be allowed to continue until the timeout is reached.
     *
     * @param duration the Duration for which to repeat the callable
     * @return this, the ConcurrentOperation (containing a callable) that has been set to repeat
     * @throws IllegalArgumentException if a number of iterations has already been specified
     */
    public ConcurrentOperation repeatForDuration(Duration duration) {
      if (iterations != DEFAULT_ITERATIONS) {
        throw new IllegalArgumentException("Specify only Duration or Iterations");
      }
      this.duration = duration;
      this.durationIsSet = true;
      return this;
    }

    /**
     * Sets the expected result of running the thread to be a value eventually matching the given
     * value
     *
     * @param expectedValue the value expected to be returned from the thread. The value must
     *        implement equals
     * @return this, the ConcurrentOperation (containing a callable) that has been set to repeat
     * @throws IllegalArgumentException if an expected outcome has already been specified
     */
    public ConcurrentOperation repeatUntilValue(T expectedValue) {
      if (this.expectedResultIsSet) {
        throw new IllegalArgumentException("Specify only one expected outcome.");
      }
      this.eventualExpectedValueSet = true;
      this.expectedValue = expectedValue;
      this.expectedResultIsSet = true;
      return this;
    }

    /**
     * Sets the expected result of running the thread to be an exception matching the given
     * exception
     *
     * @param expectedException the expected exception. The thrown exception must be an instance of
     *        its class. If its message is null the thrown exception's message is not checked,
     *        otherwise the messages must match. If it has a cause, the thrown exception's cause is
     *        checked against it in the same way.
     * @return this, the ConcurrentOperation (containing a callable) that has been set to repeat
     * @throws IllegalArgumentException if an expected outcome has already been specified
     */
    public ConcurrentOperation expectException(Throwable expectedException) {
      if (expectedResultIsSet) {
        throw new IllegalArgumentException("Specify only one expected outcome.");
      }
      this.expectedException = expectedException;
      this.expectedResultIsSet = true;
      return this;
    }

    /**
     * Sets the expected result of running the thread to be an exception that is an instance of the
     * given class. May be combined with {@link #expectExceptionCauseType(Class)}.
     *
     * @param expectedExceptionType the class of the expected exception. Causes will not be checked.
     * @return this, the ConcurrentOperation (containing a callable) that has been set to repeat
     * @throws IllegalArgumentException if an expected exception instance or expected value has
     *         already been specified
     */
    public ConcurrentOperation expectExceptionType(Class expectedExceptionType) {
      if (expectedException != null || expectedValue != null) {
        throw new IllegalArgumentException("Specify only one expected outcome.");
      }
      this.expectedExceptionType = expectedExceptionType;
      this.expectedResultIsSet = true;
      return this;
    }

    /**
     * Sets the expected result of running the thread to be an exception with a cause that is an
     * instance of the given class. May be combined with {@link #expectExceptionType(Class)}.
     *
     * @param expectedExceptionCauseType the class of the expected exception cause. The exception
     *        itself will not be checked.
     * @return this, the ConcurrentOperation (containing a callable) that has been set to repeat
     * @throws IllegalArgumentException if an expected exception instance or expected value has
     *         already been specified
     */
    public ConcurrentOperation expectExceptionCauseType(Class expectedExceptionCauseType) {
      if (expectedException != null || expectedValue != null) {
        throw new IllegalArgumentException("Specify only one expected outcome.");
      }
      this.expectedExceptionCauseType = expectedExceptionCauseType;
      this.expectedResultIsSet = true;
      return this;
    }

    /**
     * Sets the expected result of running the thread to be a value matching the given value
     *
     * @param expectedValue the value expected to be returned from the thread. The value must
     *        implement equals
     * @return this, the ConcurrentOperation (containing a callable) that has been set to repeat
     * @throws IllegalArgumentException if an expected outcome has already been specified
     */
    public ConcurrentOperation expectValue(T expectedValue) {
      if (this.expectedResultIsSet) {
        throw new IllegalArgumentException("Specify only one expected outcome.");
      }
      this.expectedValue = expectedValue;
      this.expectedResultIsSet = true;
      return this;
    }

    /**
     * Runs and validates the callable at least once, repeating while the configured number of
     * iterations or the configured duration has not been exhausted. Repetition stops early if the
     * running thread has been interrupted, which is how a cancelled task is told to stop.
     *
     * @return always null
     * @throws Exception if the callable throws an exception that was not expected
     */
    @Override
    public Void call() throws Exception {
      Stopwatch timeRun = durationIsSet ? Stopwatch.createStarted() : null;
      int numRuns = 0;
      do {
        numRuns++;
        callAndValidate();
      } while (!Thread.currentThread().isInterrupted()
          && ((iterations != DEFAULT_ITERATIONS && numRuns < iterations)
              || (durationIsSet && withinDuration(timeRun))));
      return null;
    }

    /**
     * Reports whether the stopwatch is still within the configured duration, compared with
     * millisecond precision so that sub-second durations are honored.
     */
    private boolean withinDuration(Stopwatch timeRun) {
      return timeRun.elapsed(TimeUnit.MILLISECONDS) <= duration.toMillis();
    }

    /**
     * Invokes the callable (repeatedly when an eventual value is awaited) and checks the outcome
     * against the configured expectation.
     *
     * @throws Exception the callable's exception, when no exception was expected
     */
    private void callAndValidate() throws Exception {
      Exception exception = null;
      T retVal;
      int numRuns = 0;
      Stopwatch timeRun = Stopwatch.createStarted();
      try {
        // NOTE(review): when an eventual value is awaited, the iteration bound is OR-ed with the
        // duration bound (300 seconds by default), so the duration dominates — confirm intent.
        do {
          numRuns++;
          retVal = this.callable.call();
          // null-safe: the callable may legitimately return null until the value appears
          if (eventualExpectedValueSet && Objects.equals(retVal, expectedValue)) {
            return;
          }
        } while (eventualExpectedValueSet
            && !Thread.currentThread().isInterrupted()
            && ((iterations != DEFAULT_ITERATIONS && numRuns < iterations)
                || withinDuration(timeRun)));

        // an eventual value that was never reached must fail even when that value is null
        if (eventualExpectedValueSet || this.expectedValue != null) {
          assertThat(retVal).isEqualTo(this.expectedValue);
        }
      } catch (Exception e) {
        exception = e;
      }

      if (this.expectedExceptionCauseType != null && this.expectedExceptionType != null) {
        assertThat(exception).isInstanceOf(this.expectedExceptionType)
            .hasCauseInstanceOf(this.expectedExceptionCauseType);
      } else if (this.expectedExceptionType != null) {
        assertThat(exception).isInstanceOf(this.expectedExceptionType);
      } else if (this.expectedExceptionCauseType != null) {
        assertThat(exception).hasCauseInstanceOf(this.expectedExceptionCauseType);
      } else if (this.expectedException != null) {
        checkThrown(exception, this.expectedException);
      } else {
        if (exception != null) {
          throw exception; // rethrow if we weren't expecting any exception and got one
        }
      }
    }

    /**
     * Asserts that {@code actual} is an instance of the expected exception's class, has the same
     * message when the expected message is non-null, and recursively matches the expected cause
     * when one is present.
     */
    private void checkThrown(Throwable actual, Throwable expected) {
      assertThat(actual).isInstanceOf(expected.getClass());

      if (expected.getMessage() != null) {
        assertThat(actual).hasMessage(expected.getMessage());
      }

      if (expected.getCause() != null) {
        checkThrown(actual.getCause(), expected.getCause());
      }
    }
  }

  /**
   * An ErrorCollector whose verify can be called by the rule and never throws a checked exception:
   * Errors and RuntimeExceptions propagate unchanged, anything else is wrapped in a
   * RuntimeException.
   */
  private static class ProtectedErrorCollector extends ErrorCollector {

    @Override
    protected void verify() {
      try {
        super.verify();
      } catch (Error | RuntimeException e) {
        throw e;
      } catch (Throwable t) {
        throw new RuntimeException(t);
      }
    }

    /**
     * Reads the collector's {@code errors} field reflectively.
     */
    @SuppressWarnings("unchecked")
    List<Throwable> getErrors() {
      return (List<Throwable>) readField(ErrorCollector.class, this, "errors");
    }
  }
}