blob: 67a7d84b26fe437cd3397f218b2e8ae040ed086f [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.util;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.BiConsumer;
import org.apache.hadoop.hbase.exceptions.TimeoutIOException;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Helper class for processing futures.
*/
@InterfaceAudience.Private
public final class FutureUtils {
private static final Logger LOG = LoggerFactory.getLogger(FutureUtils.class);
private FutureUtils() {
}
/**
* This is method is used when you just want to add a listener to the given future. We will call
* {@link CompletableFuture#whenComplete(BiConsumer)} to register the {@code action} to the
* {@code future}. Ignoring the return value of a Future is considered as a bad practice as it may
* suppress exceptions thrown from the code that completes the future, and this method will catch
* all the exception thrown from the {@code action} to catch possible code bugs.
* <p/>
* And the error phone check will always report FutureReturnValueIgnored because every method in
* the {@link CompletableFuture} class will return a new {@link CompletableFuture}, so you always
* have one future that has not been checked. So we introduce this method and add a suppress
* warnings annotation here.
*/
@SuppressWarnings("FutureReturnValueIgnored")
public static <T> void addListener(CompletableFuture<T> future,
BiConsumer<? super T, ? super Throwable> action) {
future.whenComplete((resp, error) -> {
try {
// See this post on stack overflow(shorten since the url is too long),
// https://s.apache.org/completionexception
// For a chain of CompleableFuture, only the first child CompletableFuture can get the
// original exception, others will get a CompletionException, which wraps the original
// exception. So here we unwrap it before passing it to the callback action.
action.accept(resp, unwrapCompletionException(error));
} catch (Throwable t) {
LOG.error("Unexpected error caught when processing CompletableFuture", t);
}
});
}
/**
* Almost the same with {@link #addListener(CompletableFuture, BiConsumer)} method above, the only
* exception is that we will call
* {@link CompletableFuture#whenCompleteAsync(BiConsumer, Executor)}.
* @see #addListener(CompletableFuture, BiConsumer)
*/
@SuppressWarnings("FutureReturnValueIgnored")
public static <T> void addListener(CompletableFuture<T> future,
BiConsumer<? super T, ? super Throwable> action, Executor executor) {
future.whenCompleteAsync((resp, error) -> {
try {
action.accept(resp, unwrapCompletionException(error));
} catch (Throwable t) {
LOG.error("Unexpected error caught when processing CompletableFuture", t);
}
}, executor);
}
/**
* Return a {@link CompletableFuture} which is same with the given {@code future}, but execute all
* the callbacks in the given {@code executor}.
*/
public static <T> CompletableFuture<T> wrapFuture(CompletableFuture<T> future,
Executor executor) {
CompletableFuture<T> wrappedFuture = new CompletableFuture<>();
addListener(future, (r, e) -> {
if (e != null) {
wrappedFuture.completeExceptionally(e);
} else {
wrappedFuture.complete(r);
}
}, executor);
return wrappedFuture;
}
/**
* Get the cause of the {@link Throwable} if it is a {@link CompletionException}.
*/
public static Throwable unwrapCompletionException(Throwable error) {
if (error instanceof CompletionException) {
Throwable cause = error.getCause();
if (cause != null) {
return cause;
}
}
return error;
}
// This method is used to record the stack trace that calling the FutureUtils.get method. As in
// async client, the retry will be done in the retry timer thread, so the exception we get from
// the CompletableFuture will have a stack trace starting from the root of the retry timer. If we
// just throw this exception out when calling future.get(by unwrapping the ExecutionException),
// the upper layer even can not know where is the exception thrown...
// See HBASE-22316.
private static void setStackTrace(Throwable error) {
StackTraceElement[] localStackTrace = Thread.currentThread().getStackTrace();
StackTraceElement[] originalStackTrace = error.getStackTrace();
StackTraceElement[] newStackTrace =
new StackTraceElement[localStackTrace.length + originalStackTrace.length + 1];
System.arraycopy(localStackTrace, 0, newStackTrace, 0, localStackTrace.length);
newStackTrace[localStackTrace.length] =
new StackTraceElement("--------Future", "get--------", null, -1);
System.arraycopy(originalStackTrace, 0, newStackTrace, localStackTrace.length + 1,
originalStackTrace.length);
error.setStackTrace(newStackTrace);
}
/**
* If we could propagate the given {@code error} directly, we will fill the stack trace with the
* current thread's stack trace so it is easier to trace where is the exception thrown. If not, we
* will just create a new IOException and then throw it.
*/
public static IOException rethrow(Throwable error) throws IOException {
if (error instanceof IOException) {
setStackTrace(error);
throw (IOException) error;
} else if (error instanceof RuntimeException) {
setStackTrace(error);
throw (RuntimeException) error;
} else if (error instanceof Error) {
setStackTrace(error);
throw (Error) error;
} else {
throw new IOException(error);
}
}
/**
* A helper class for getting the result of a Future, and convert the error to an
* {@link IOException}.
*/
public static <T> T get(Future<T> future) throws IOException {
try {
return future.get();
} catch (InterruptedException e) {
throw (IOException) new InterruptedIOException().initCause(e);
} catch (ExecutionException e) {
throw rethrow(e.getCause());
}
}
/**
* A helper class for getting the result of a Future with timeout, and convert the error to an
* {@link IOException}.
*/
public static <T> T get(Future<T> future, long timeout, TimeUnit unit) throws IOException {
try {
return future.get(timeout, unit);
} catch (InterruptedException e) {
throw (IOException) new InterruptedIOException().initCause(e);
} catch (ExecutionException e) {
throw rethrow(e.getCause());
} catch (TimeoutException e) {
throw new TimeoutIOException(e);
}
}
/**
* Returns a CompletableFuture that is already completed exceptionally with the given exception.
*/
public static <T> CompletableFuture<T> failedFuture(Throwable e) {
CompletableFuture<T> future = new CompletableFuture<>();
future.completeExceptionally(e);
return future;
}
}