blob: 53b6deb9b751ad5e5866d97ad83ae2aaac35d940 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.common.util;
import java.time.Duration;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;
/**
* This class is aimed at simplifying work with {@code CompletableFuture}.
*/
public class FutureUtil {

    /**
     * Returns a future that represents the completion of all futures in the provided list.
     *
     * <p>This is a thin wrapper around {@link CompletableFuture#allOf(CompletableFuture[])}; the
     * results of the individual futures are not carried over to the returned future.
     *
     * @param futures the futures to wait for
     * @param <T> the result type of the futures in the list
     * @return a future that is completed when all of the given futures have completed
     */
    public static <T> CompletableFuture<Void> waitForAll(List<CompletableFuture<T>> futures) {
        return CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[0]));
    }

    /**
     * Creates a future that is already completed exceptionally with the given throwable.
     *
     * @param t the exception to complete the future with
     * @param <T> type parameter for the future
     * @return a new, exceptionally completed {@link CompletableFuture}
     */
    public static <T> CompletableFuture<T> failedFuture(Throwable t) {
        CompletableFuture<T> future = new CompletableFuture<>();
        future.completeExceptionally(t);
        return future;
    }

    /**
     * Strips any number of nested {@link CompletionException} wrappers from the given throwable.
     *
     * <p>If a {@link CompletionException} without a cause is encountered, that exception itself is
     * returned, so a non-null input never yields {@code null}.
     *
     * @param t the throwable to unwrap, may be {@code null}
     * @return the first throwable in the cause chain that is not a {@link CompletionException}, or
     *         the innermost {@link CompletionException} when it has no cause; {@code null} only if
     *         {@code t} is {@code null}
     */
    public static Throwable unwrapCompletionException(Throwable t) {
        Throwable current = t;
        // Only descend while there is a cause to descend into; otherwise the wrapper is all we have.
        while (current instanceof CompletionException && current.getCause() != null) {
            current = current.getCause();
        }
        return current;
    }

    /**
     * Creates a new {@link CompletableFuture} instance with timeout handling.
     *
     * @param timeout the duration of the timeout
     * @param executor the executor to use for scheduling the timeout
     * @param exceptionSupplier the supplier for creating the exception
     * @param <T> type parameter for the future
     * @return the new {@link CompletableFuture} instance
     */
    public static <T> CompletableFuture<T> createFutureWithTimeout(Duration timeout,
                                                                   ScheduledExecutorService executor,
                                                                   Supplier<Throwable> exceptionSupplier) {
        return addTimeoutHandling(new CompletableFuture<>(), timeout, executor, exceptionSupplier);
    }

    /**
     * Adds timeout handling to an existing {@link CompletableFuture}.
     *
     * <p>A task is scheduled on the executor which completes the future exceptionally once the
     * timeout has elapsed, unless the future is already done by then. The scheduled task is
     * cancelled as soon as the future completes. The timeout is scheduled with millisecond
     * precision.
     *
     * <p>If the exception supplier itself throws a {@link RuntimeException}, the future is completed
     * with that exception; if it returns {@code null}, the future is completed with a
     * {@link TimeoutException}. This guarantees that the future does not stay pending forever
     * because of a misbehaving supplier.
     *
     * @param future the target future
     * @param timeout the duration of the timeout
     * @param executor the executor to use for scheduling the timeout
     * @param exceptionSupplier the supplier for creating the exception
     * @param <T> type parameter for the future
     * @return returns the original target future
     */
    public static <T> CompletableFuture<T> addTimeoutHandling(CompletableFuture<T> future, Duration timeout,
                                                              ScheduledExecutorService executor,
                                                              Supplier<Throwable> exceptionSupplier) {
        ScheduledFuture<?> timeoutTask = executor.schedule(() -> {
            if (future.isDone()) {
                // Completed in the meantime: skip invoking the supplier altogether.
                return;
            }
            Throwable failure;
            try {
                failure = exceptionSupplier.get();
            } catch (RuntimeException e) {
                // An exception thrown here would otherwise vanish inside the scheduled task and
                // leave the future pending, so surface it through the future instead.
                failure = e;
            }
            if (failure == null) {
                // completeExceptionally rejects null, which would also leave the future pending.
                failure = new TimeoutException("Future timed out after " + timeout);
            }
            future.completeExceptionally(failure);
        }, timeout.toMillis(), TimeUnit.MILLISECONDS);
        // The timeout task is no longer needed once the future has completed, whatever the outcome.
        future.whenComplete((res, exception) -> timeoutTask.cancel(false));
        return future;
    }

    /**
     * Creates a low-overhead timeout exception which is performance optimized to minimize allocations
     * and cpu consumption. It sets the stacktrace of the exception to the given source class and
     * source method name. The instances of this class can be cached or stored as constants and reused
     * multiple times.
     *
     * @param message exception message
     * @param sourceClass source class for manually filled in stacktrace
     * @param sourceMethod source method name for manually filled in stacktrace
     * @return new TimeoutException instance
     */
    public static TimeoutException createTimeoutException(String message, Class<?> sourceClass, String sourceMethod) {
        return new LowOverheadTimeoutException(message, sourceClass, sourceMethod);
    }

    /**
     * A {@link TimeoutException} that does not capture the real call stack. Its stack trace consists
     * of a single, manually created element pointing at the given class and method.
     */
    private static class LowOverheadTimeoutException extends TimeoutException {
        private static final long serialVersionUID = 1L;

        LowOverheadTimeoutException(String message, Class<?> sourceClass, String sourceMethod) {
            super(message);
            // No file name (null) and no line number (-1) are available for the synthetic frame.
            setStackTrace(new StackTraceElement[]{new StackTraceElement(sourceClass.getName(), sourceMethod,
                    null, -1)});
        }

        /**
         * Intentionally does nothing, so that no stack walk is performed when the exception is
         * constructed.
         */
        @Override
        public synchronized Throwable fillInStackTrace() {
            return this;
        }
    }
}