blob: 6c3b45905c615770f06a9a437fa7415157257352 [file] [log] [blame]
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
package org.apache.geode.test.dunit;
import static org.apache.geode.test.awaitility.GeodeAwaitility.getTimeout;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.geode.SystemFailure;
* An {@code AsyncInvocation} represents the invocation of a remote invocation that executes
* asynchronously from its caller. An instance of {@code AsyncInvocation} provides information about
* the invocation such as any exception that it may have thrown.
* <p>
* {@code AsyncInvocation} can be used as follows:
* <pre>
* AsyncInvocation ai1 = vm.invokeAsync(() -> Test.method1());
* AsyncInvocation ai2 = vm.invokeAsync(() -> Test.method2());
* ai1.await();
* ai2.await();
* </pre>
* @param <V> The result type returned by this AsyncInvocation's {@code get} methods
* @see VM#invokeAsync(Class, String)
public class AsyncInvocation<V> implements Future<V> {
private static final long DEFAULT_JOIN_MILLIS = getTimeout().getValueInMS();
private final Thread thread;
private final AtomicReference<V> resultValue = new AtomicReference<>();
/** An exception thrown while this {@code AsyncInvocation} ran */
private final AtomicReference<Throwable> resultThrowable = new AtomicReference<>();
/** The object (or class) that is the target of this {@code AsyncInvocation} */
private Object target;
/** The name of the method being invoked */
private String methodName;
/** True if this {@code AsyncInvocation} has been cancelled */
private boolean cancelled;
* Creates a new {@code AsyncInvocation}.
* @param target The object or {@link Class} on which the remote method was invoked
* @param methodName The name of the method being invoked
* @param work The actual invocation of the method
public AsyncInvocation(final Object target, final String methodName, final Callable<V> work) { = target;
this.methodName = methodName;
thread =
new Thread(new AsyncInvocationGroup(), runnable(work), getName(target, methodName));
* Returns the target of this async method invocation.
* @deprecated This method is not required for anything.
public Object getTarget() {
return target;
* Returns the name of the method being invoked remotely.
* @deprecated This method is not required for anything.
public String getMethodName() {
return methodName;
* Returns whether or not an exception occurred during this async method invocation.
* @throws AssertionError if this {@code AsyncInvocation} is not done.
public boolean exceptionOccurred() {
return getException() != null;
* Returns the exception that was thrown during this async method invocation.
* @throws AssertionError if this {@code AsyncInvocation} is not done.
public Throwable getException() {
try {
checkIsDone("Exception status not available while thread is alive.");
} catch (IllegalStateException illegalStateException) {
throw new AssertionError(illegalStateException);
if (resultThrowable.get() instanceof RMIException) { // TODO: delete our RMIException
return resultThrowable.get().getCause();
} else {
return resultThrowable.get();
* Throws {@code AssertionError} wrapping any {@code Exception} thrown by this
* {@code AsyncInvocation}.
* @return this {@code AsyncInvocation}
* @throws AssertionError wrapping any {@code Exception} thrown by this {@code AsyncInvocation}.
public AsyncInvocation<V> checkException() {
if (resultThrowable.get() != null) {
throw new AssertionError("An exception occurred during asynchronous invocation.",
return this;
* Returns the result of this {@code AsyncInvocation}.
* @return the result of this {@code AsyncInvocation}
* @throws AssertionError wrapping any {@code Exception} thrown by this {@code AsyncInvocation}.
* @throws AssertionError wrapping a {@code TimeoutException} if this {@code AsyncInvocation}
* fails to complete within the default timeout of 60 seconds as defined by
* @throws InterruptedException if the current thread is interrupted.
* @deprecated Please use {@link #get()} instead.
public V getResult() throws InterruptedException {
checkIsDone("Return value not available while thread is alive.");
return resultValue.get();
* Returns the result of this {@code AsyncInvocation}.
* @param millis the time to wait in milliseconds
* @return the result of this {@code AsyncInvocation}
* @throws AssertionError wrapping any {@code Exception} thrown by this {@code AsyncInvocation}.
* @throws AssertionError wrapping a {@code TimeoutException} if this {@code AsyncInvocation}
* fails to complete within the specified timeout of {@code millis}.
* @throws InterruptedException if the current thread is interrupted.
* @deprecated Please use {@link #get(long, TimeUnit)} instead.
public V getResult(final long millis) throws InterruptedException {
try {
return get(millis, TimeUnit.MILLISECONDS);
} catch (ExecutionException executionException) {
throw new AssertionError(executionException);
} catch (TimeoutException timeoutException) {
throw new AssertionError(timeoutException);
* Returns the result of this {@code AsyncInvocation}.
* @return the result of this {@code AsyncInvocation}
* @throws AssertionError if this {@code AsyncInvocation} is not done.
* @deprecated Please use {@link #get()} instead.
public V getReturnValue() {
checkIsDone("Return value not available while thread is alive.");
return resultValue.get();
* Waits at most {@code millis} milliseconds for this {@code AsyncInvocation} to complete. A
* timeout of {@code 0} means to wait forever.
* @param millis the time to wait in milliseconds
* @return this {@code AsyncInvocation}
* @throws IllegalArgumentException if the value of {@code millis} is negative.
* @throws InterruptedException if the current thread is interrupted.
public synchronized AsyncInvocation<V> join(final long millis) throws InterruptedException {
return this;
* Waits at most {@code millis} milliseconds plus {@code nanos} nanoseconds for this
* {@code AsyncInvocation} to complete.
* @param millis the time to wait in milliseconds
* @param nanos {@code 0-999999} additional nanoseconds to wait
* @return this {@code AsyncInvocation}
* @throws IllegalArgumentException if the value of {@code millis} is negative, or the value of
* {@code nanos} is not in the range {@code 0-999999}.
* @throws InterruptedException if the current thread is interrupted.
public synchronized AsyncInvocation<V> join(final long millis, final int nanos)
throws InterruptedException {
thread.join(millis, nanos);
return this;
* Waits for this thread to die up to a default of 60 seconds as defined by
* @return this {@code AsyncInvocation}
* @throws InterruptedException if the current thread is interrupted.
public AsyncInvocation<V> join() throws InterruptedException {
// do NOT invoke Thread#join() without a timeout
return this;
* Start this {@code AsyncInvocation}.
* @return this {@code AsyncInvocation}
public synchronized AsyncInvocation<V> start() {
return this;
* Return this {@code AsyncInvocation}'s work thread.
* @return this {@code AsyncInvocation}'s work thread.
public synchronized Thread getThread() {
return thread;
* Tests if this {@code AsyncInvocation}'s thread is alive. A thread is alive if it has been
* started and has not yet died.
* @return {@code true} if this {@code AsyncInvocation}'s thread is alive; {@code false}
* otherwise.
public synchronized boolean isAlive() {
return thread.isAlive();
public synchronized boolean isCancelled() {
return cancelled;
public synchronized boolean isDone() {
return !thread.isAlive(); // state != NEW;
public synchronized boolean cancel(final boolean mayInterruptIfRunning) {
if (thread.isAlive()) {
if (mayInterruptIfRunning) {
cancelled = true;
return true;
return false;
* Waits if necessary for at most the given time for the computation to complete.
* @param timeout the maximum time to wait
* @param unit the time unit of the timeout argument
* @return this {@code AsyncInvocation}
* @throws AssertionError wrapping any {@code Exception} thrown by this {@code AsyncInvocation}.
* @throws CancellationException if the computation was cancelled
* @throws ExecutionException if the computation threw an exception
* @throws InterruptedException if the current thread is interrupted.
* @throws TimeoutException if the wait timed out
public AsyncInvocation<V> await(final long timeout, final TimeUnit unit)
throws ExecutionException, InterruptedException, TimeoutException {
long millis = unit.toMillis(timeout);
return this;
* Waits if necessary for at most the given time for the computation to complete.
* @return this {@code AsyncInvocation}
* @throws AssertionError wrapping any {@code Exception} thrown by this {@code AsyncInvocation}.
* @throws AssertionError wrapping a {@code TimeoutException} if this {@code AsyncInvocation}
* fails to complete within the default timeout of 60 seconds as defined by
* @throws CancellationException if the computation was cancelled
* @throws ExecutionException if the computation threw an exception
* @throws InterruptedException if the current thread is interrupted.
public AsyncInvocation<V> await() throws ExecutionException, InterruptedException {
try {
} catch (TimeoutException timeoutException) {
throw new AssertionError(timeoutException);
* Waits if necessary for the work to complete, and then returns the result of this
* {@code AsyncInvocation}.
* @return the result of this {@code AsyncInvocation}
* @throws AssertionError wrapping any {@code Exception} thrown by this {@code AsyncInvocation}.
* @throws AssertionError wrapping a {@code TimeoutException} if this {@code AsyncInvocation}
* fails to complete within the default timeout of 60 seconds as defined by
* @throws CancellationException if the computation was cancelled
* @throws ExecutionException if the computation threw an exception
* @throws InterruptedException if the current thread is interrupted.
public V get() throws ExecutionException, InterruptedException {
try {
} catch (TimeoutException timeoutException) {
throw new AssertionError(timeoutException);
* Waits if necessary for at most the given time for the computation to complete, and then
* retrieves its result, if available.
* @param timeout the maximum time to wait
* @param unit the time unit of the timeout argument
* @return the result of this {@code AsyncInvocation}
* @throws AssertionError wrapping any {@code Exception} thrown by this {@code AsyncInvocation}.
* @throws CancellationException if the computation was cancelled
* @throws ExecutionException if the computation threw an exception
* @throws InterruptedException if the current thread is interrupted.
* @throws TimeoutException if the wait timed out
public V get(final long timeout, final TimeUnit unit)
throws ExecutionException, InterruptedException, TimeoutException {
long millis = unit.toMillis(timeout);
return resultValue.get();
* Returns the identifier of this {@code AsyncInvocation}'s thread. The thread ID is a positive
* <tt>long</tt> number generated when this thread was created. The thread ID is unique and
* remains unchanged during its lifetime. When a thread is terminated, this thread ID may be
* reused.
* @return this {@code AsyncInvocation}'s thread's ID.
public long getId() {
return thread.getId();
public String toString() {
return "AsyncInvocation{" + "target=" + target + ", methodName='" + methodName + '\'' + '}';
* Throws {@code IllegalStateException} if this {@code AsyncInvocation} is not done.
* @param message The value to be used in constructing detail message
* @return this {@code AsyncInvocation}
* @throws IllegalStateException if this {@code AsyncInvocation} is not done.
private AsyncInvocation<V> checkIsDone(final String message) {
if (thread.isAlive()) {
throw new IllegalStateException(message);
return this;
* Throws {@code AssertionError} wrapping a {@code TimeoutException} if this
* {@code AsyncInvocation} fails to complete within the default timeout of 60 seconds as defined
* by {@link #DEFAULT_JOIN_MILLIS}.
* @return this {@code AsyncInvocation}
* @throws TimeoutException if this {@code AsyncInvocation} fails to complete within the default
* timeout of 60 seconds as defined by {@link #DEFAULT_JOIN_MILLIS}.
private AsyncInvocation<V> timeoutIfAlive(final long timeout) throws TimeoutException {
if (thread.isAlive()) {
throw new TimeoutException(
"Timed out waiting " + timeout + " milliseconds for AsyncInvocation to complete.");
return this;
private Runnable runnable(final Callable<V> work) {
return () -> {
try {
} catch (Throwable throwable) {
* Returns the name of a {@code AsyncInvocation} based on its {@code targetObject} and
* {@code methodName}.
private static String getName(final Object target, final String methodName) {
StringBuilder sb = new StringBuilder(methodName);
sb.append(" invoked on ");
if (target instanceof Class) {
sb.append("class ");
sb.append(((Class) target).getName());
} else {
sb.append("an instance of ");
return sb.toString();
* A {@code ThreadGroup} that notices when an exception occurs during an {@code AsyncInvocation}.
private class AsyncInvocationGroup extends ThreadGroup {
private AsyncInvocationGroup() {
super("Async Invocations");
public void uncaughtException(Thread thread, Throwable throwable) {
if (throwable instanceof VirtualMachineError) {
SystemFailure.setFailure((VirtualMachineError) throwable); // don't throw