blob: bed82433718dcfdba24ad3eb41c6e5e137c2bc70 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.geode.test.junit.rules;
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;
import java.lang.ref.WeakReference;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
import org.apache.geode.test.junit.rules.serializable.SerializableExternalResource;
/**
* Provides a reusable mechanism for executing tasks asynchronously in tests. This {@code Rule}
* creates an {@code ExecutorService} which is terminated after the scope of the {@code Rule}. This
* {@code Rule} can be used in tests for hangs, deadlocks, and infinite loops.
*
* <pre>
* private CountDownLatch hangLatch = new CountDownLatch(1);
*
* {@literal @}Rule
* public ExecutorServiceRule executorServiceRule = new ExecutorServiceRule();
*
* {@literal @}Test
* public void doTest() throws Exception {
* Future&lt;Void&gt; result = executorServiceRule.runAsync(() -> {
* try {
* hangLatch.await();
* } catch (InterruptedException e) {
* throw new RuntimeException(e);
* }
* });
*
* assertThatThrownBy(() -> result.get(1, MILLISECONDS)).isInstanceOf(TimeoutException.class);
* }
* </pre>
*
* <p>
* The {@code Rule} can be configured to await termination by specifying
* {@link Builder#awaitTermination(long, TimeUnit)}. If all tasks have not terminated by the
* specified timeout, then {@code TimeoutException} will be thrown. This has the potential to
* obscure any {@code Throwable}s thrown by the test itself.
*
* <p>
* Example with awaitTermination enabled. Awaits up to timeout for all submitted tasks to terminate.
* This causes the {@code Rule} to invoke awaitTermination during its tear down:
*
* <pre>
* private CountDownLatch hangLatch = new CountDownLatch(1);
*
* {@literal @}Rule
* public ExecutorServiceRule executorServiceRule = ExecutorServiceRule.builder().awaitTermination(10, SECONDS).build();
*
* {@literal @}Test
* public void doTest() throws Exception {
* for (int i = 0; i < 10; i++) {
* executorServiceRule.runAsync(() -> {
* hangLatch.await();
* });
* }
* }
* </pre>
*/
@SuppressWarnings("unused")
public class ExecutorServiceRule extends SerializableExternalResource {
protected final boolean enableAwaitTermination;
protected final long awaitTerminationTimeout;
protected final TimeUnit awaitTerminationTimeUnit;
protected final boolean awaitTerminationBeforeShutdown;
protected final boolean useShutdown;
protected final boolean useShutdownNow;
protected transient volatile DedicatedThreadFactory threadFactory;
protected transient volatile ExecutorService executor;
/**
* Returns a {@code Builder} to configure a new {@code ExecutorServiceRule}.
*/
public static Builder builder() {
return new Builder();
}
protected ExecutorServiceRule(Builder builder) {
enableAwaitTermination = builder.enableAwaitTermination;
awaitTerminationTimeout = builder.awaitTerminationTimeout;
awaitTerminationTimeUnit = builder.awaitTerminationTimeUnit;
awaitTerminationBeforeShutdown = builder.awaitTerminationBeforeShutdown;
useShutdown = builder.useShutdown;
useShutdownNow = builder.useShutdownNow;
}
/**
* Constructs a {@code ExecutorServiceRule} which invokes {@code ExecutorService.shutdownNow()}
* during {@code tearDown}.
*/
public ExecutorServiceRule() {
enableAwaitTermination = false;
awaitTerminationTimeout = 0;
awaitTerminationTimeUnit = TimeUnit.NANOSECONDS;
awaitTerminationBeforeShutdown = false;
useShutdown = false;
useShutdownNow = true;
}
@Override
public void before() {
threadFactory = new DedicatedThreadFactory();
executor = Executors.newCachedThreadPool(threadFactory);
}
@Override
public void after() {
if (awaitTerminationBeforeShutdown) {
enableAwaitTermination();
}
if (useShutdown) {
executor.shutdown();
} else if (useShutdownNow) {
executor.shutdownNow();
}
if (!awaitTerminationBeforeShutdown) {
enableAwaitTermination();
}
}
private void enableAwaitTermination() {
if (enableAwaitTermination) {
try {
executor.awaitTermination(awaitTerminationTimeout, awaitTerminationTimeUnit);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
/**
* Returns a direct reference to the underlying {@code ExecutorService}.
*/
public ExecutorService getExecutorService() {
return executor;
}
/**
* Executes the given command at some time in the future.
*
* @param command the runnable task
* @throws RejectedExecutionException if this task cannot be accepted for execution
* @throws NullPointerException if command is null
*/
public void execute(ThrowingRunnable command) {
executor.submit((Callable<Void>) () -> {
command.run();
return null;
});
}
/**
* Submits a value-returning task for execution and returns a Future representing the pending
* results of the task. The Future's {@code get} method will return the task's result upon
* successful completion.
*
* <p>
* If you would like to immediately block waiting for a task, you can use constructions of the
* form {@code result = exec.submit(aCallable).get();}
*
* @param task the task to submit
* @param <T> the type of the task's result
* @return a Future representing pending completion of the task
* @throws RejectedExecutionException if the task cannot be scheduled for execution
* @throws NullPointerException if the task is null
*/
public <T> Future<T> submit(Callable<T> task) {
return executor.submit(task);
}
/**
* Submits a Runnable task for execution and returns a Future representing that task. The Future's
* {@code get} method will return the given result upon successful completion.
*
* @param task the task to submit
* @param result the result to return
* @param <T> the type of the result
* @return a Future representing pending completion of the task
* @throws RejectedExecutionException if the task cannot be scheduled for execution
* @throws NullPointerException if the task is null
*/
public <T> Future<T> submit(ThrowingRunnable task, T result) {
FutureTask<T> futureTask = new FutureTask<>(() -> {
task.run();
return result;
});
executor.submit(futureTask);
return futureTask;
}
/**
* Submits a Runnable task for execution and returns a Future representing that task. The Future's
* {@code get} method will return {@code null} upon <em>successful</em> completion.
*
* @param task the task to submit
* @return a Future representing pending completion of the task
* @throws RejectedExecutionException if the task cannot be scheduled for execution
* @throws NullPointerException if the task is null
*/
public Future<Void> submit(ThrowingRunnable task) {
FutureTask<Void> futureTask = new FutureTask<>(() -> {
task.run();
return null;
});
executor.submit(futureTask);
return futureTask;
}
/**
* Returns a new CompletableFuture that is asynchronously completed by a task running in the
* dedicated executor after it runs the given action.
*
* @param runnable the action to run before completing the returned CompletableFuture
* @return the new CompletableFuture
*/
public CompletableFuture<Void> runAsync(Runnable runnable) {
return CompletableFuture.runAsync(runnable, executor);
}
/**
* Returns a new CompletableFuture that is asynchronously completed by a task running in the
* dedicated executor with the value obtained by calling the given Supplier.
*
* @param supplier a function returning the value to be used to complete the returned
* CompletableFuture
* @param <U> the function's return type
* @return the new CompletableFuture
*/
public <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {
return CompletableFuture.supplyAsync(supplier, executor);
}
/**
* Returns the {@code Thread}s that are directly in the {@code ExecutorService}'s
* {@code ThreadGroup} excluding subgroups.
*/
public Set<Thread> getThreads() {
return threadFactory.getThreads();
}
/**
* Returns an array of {@code Thread Ids} that are directly in the {@code ExecutorService}'s
* {@code ThreadGroup} excluding subgroups. {@code long[]} is returned to facilitate using JDK
* APIs such as {@code ThreadMXBean#getThreadInfo(long[], int)}.
*/
public long[] getThreadIds() {
Set<Thread> threads = getThreads();
long[] threadIds = new long[threads.size()];
int i = 0;
for (Thread thread : threads) {
threadIds[i++] = thread.getId();
}
return threadIds;
}
/**
* Returns thread dumps for the {@code Thread}s that are in the {@code ExecutorService}'s
* {@code ThreadGroup} excluding subgroups.
*/
public String dumpThreads() {
StringBuilder dumpWriter = new StringBuilder();
ThreadMXBean threadMXBean = ManagementFactory.getThreadMXBean();
ThreadInfo[] threadInfos = threadMXBean.getThreadInfo(getThreadIds(), true, true);
for (ThreadInfo threadInfo : threadInfos) {
if (threadInfo == null) {
// sometimes ThreadMXBean.getThreadInfo returns array with one or more null elements
continue;
}
// ThreadInfo toString includes monitor and synchronizer details
dumpWriter.append(threadInfo);
}
return dumpWriter.toString();
}
/**
* This interface replaces {@link Runnable} in cases when execution of {@link #run()} method may
* throw exception.
*
* <p>
* Useful for capturing lambdas that throw exceptions.
*/
@FunctionalInterface
public interface ThrowingRunnable {
/**
* @throws Exception The exception that may be thrown
* @see Runnable#run()
*/
void run() throws Exception;
}
/**
* Modified version of {@code java.util.concurrent.Executors$DefaultThreadFactory} that uses
* a {@code Set<WeakReference<Thread>>} to track the {@code Thread}s in the factory's
* {@code ThreadGroup} excluding subgroups.
*/
protected static class DedicatedThreadFactory implements ThreadFactory {
private static final AtomicInteger POOL_NUMBER = new AtomicInteger(1);
private final ThreadGroup group;
private final AtomicInteger threadNumber = new AtomicInteger(1);
private final String namePrefix;
private final Set<WeakReference<Thread>> directThreads = new HashSet<>();
protected DedicatedThreadFactory() {
group = new ThreadGroup(ExecutorServiceRule.class.getSimpleName() + "-ThreadGroup");
namePrefix = "pool-" + POOL_NUMBER.getAndIncrement() + "-thread-";
}
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(group, r, namePrefix + threadNumber.getAndIncrement(), 0);
if (t.isDaemon()) {
t.setDaemon(false);
}
if (t.getPriority() != Thread.NORM_PRIORITY) {
t.setPriority(Thread.NORM_PRIORITY);
}
directThreads.add(new WeakReference<>(t));
return t;
}
protected Set<Thread> getThreads() {
Set<Thread> value = new HashSet<>();
for (WeakReference<Thread> reference : directThreads) {
Thread thread = reference.get();
if (thread != null) {
value.add(thread);
}
}
return value;
}
}
public static class Builder {
protected boolean enableAwaitTermination;
protected long awaitTerminationTimeout;
protected TimeUnit awaitTerminationTimeUnit = TimeUnit.NANOSECONDS;
protected boolean awaitTerminationBeforeShutdown = true;
protected boolean useShutdown;
protected boolean useShutdownNow = true;
protected Builder() {
// nothing
}
/**
* Enables invocation of {@code awaitTermination} during {@code tearDown}. Default is disabled.
*
* @param timeout the maximum time to wait
* @param unit the time unit of the timeout argument
*/
public Builder awaitTermination(long timeout, TimeUnit unit) {
enableAwaitTermination = true;
awaitTerminationTimeout = timeout;
awaitTerminationTimeUnit = unit;
return this;
}
/**
* Enables invocation of {@code shutdown} during {@code tearDown}. Default is disabled.
*/
public Builder useShutdown() {
useShutdown = true;
useShutdownNow = false;
return this;
}
/**
* Enables invocation of {@code shutdownNow} during {@code tearDown}. Default is enabled.
*/
public Builder useShutdownNow() {
useShutdown = false;
useShutdownNow = true;
return this;
}
/**
* Specifies invocation of {@code awaitTermination} before {@code shutdown} or
* {@code shutdownNow}.
*/
public Builder awaitTerminationBeforeShutdown() {
awaitTerminationBeforeShutdown = true;
return this;
}
/**
* Specifies invocation of {@code awaitTermination} after {@code shutdown} or
* {@code shutdownNow}.
*/
public Builder awaitTerminationAfterShutdown() {
awaitTerminationBeforeShutdown = false;
return this;
}
/**
* Builds the instance of {@code ExecutorServiceRule}.
*/
public ExecutorServiceRule build() {
return new ExecutorServiceRule(this);
}
}
}