blob: 1faf6bc2bc5a7f0c048704e461a4adc05a058bf3 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
* file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
* to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package org.apache.tuweni.concurrent;
import static java.util.Objects.requireNonNull;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function;
import javax.annotation.Nullable;
import io.vertx.core.Vertx;
import io.vertx.core.WorkerExecutor;
final class DefaultCompletableAsyncResult<T> implements CompletableAsyncResult<T> {
private final CompletableFuture<T> future;
DefaultCompletableAsyncResult() {
this(new CompletableFuture<>());
}
DefaultCompletableAsyncResult(CompletableFuture<T> future) {
this.future = future;
}
@Override
public boolean complete(@Nullable T value) {
return future.complete(value);
}
@Override
public boolean completeExceptionally(Throwable ex) {
return future.completeExceptionally(ex);
}
@Override
public boolean cancel() {
return future.cancel(false);
}
@Override
public boolean isDone() {
return future.isDone();
}
@Override
public boolean isCompletedExceptionally() {
return future.isCompletedExceptionally();
}
@Override
public boolean isCancelled() {
return future.isCancelled();
}
@Override
@Nullable
public T get() throws CompletionException, InterruptedException {
try {
return get(10, TimeUnit.SECONDS);
} catch (TimeoutException ex) {
throw new RuntimeException("Default timeout triggered for blocking call to AsyncResult::get()", ex);
}
}
@Override
@Nullable
public T get(long timeout, TimeUnit unit) throws CompletionException, TimeoutException, InterruptedException {
requireNonNull(unit);
try {
return future.get(timeout, unit);
} catch (ExecutionException ex) {
throw new CompletionException(ex.getMessage(), ex.getCause());
}
}
@Override
public <U> AsyncResult<U> then(Function<? super T, ? extends AsyncResult<U>> fn) {
requireNonNull(fn);
CompletableAsyncResult<U> asyncResult = AsyncResult.incomplete();
future.whenComplete((t, ex1) -> {
if (ex1 == null) {
try {
fn.apply(t).whenComplete((u, ex3) -> {
if (ex3 == null) {
asyncResult.complete(u);
} else {
asyncResult.completeExceptionally(ex3);
}
});
} catch (Throwable ex2) {
asyncResult.completeExceptionally(ex2);
}
} else {
asyncResult.completeExceptionally(ex1);
}
});
return asyncResult;
}
@Override
public <U> AsyncResult<U> thenSchedule(Vertx vertx, Function<? super T, ? extends AsyncResult<U>> fn) {
requireNonNull(vertx);
requireNonNull(fn);
CompletableAsyncResult<U> asyncResult = AsyncResult.incomplete();
future.whenComplete((t, ex1) -> {
if (ex1 == null) {
try {
vertx.runOnContext(ev -> {
try {
fn.apply(t).whenComplete((u, ex4) -> {
if (ex4 == null) {
asyncResult.complete(u);
} else {
asyncResult.completeExceptionally(ex4);
}
});
} catch (Throwable ex3) {
asyncResult.completeExceptionally(ex3);
}
});
} catch (Throwable ex2) {
asyncResult.completeExceptionally(ex2);
}
} else {
asyncResult.completeExceptionally(ex1);
}
});
return asyncResult;
}
@Override
public AsyncCompletion thenCompose(Function<? super T, ? extends AsyncCompletion> fn) {
requireNonNull(fn);
CompletableAsyncCompletion completion = AsyncCompletion.incomplete();
future.whenComplete((t, ex1) -> {
if (ex1 == null) {
try {
fn.apply(t).whenComplete(ex3 -> {
if (ex3 == null) {
completion.complete();
} else {
completion.completeExceptionally(ex3);
}
});
} catch (Throwable ex2) {
completion.completeExceptionally(ex2);
}
} else {
completion.completeExceptionally(ex1);
}
});
return completion;
}
@Override
public AsyncCompletion thenRun(Runnable action) {
requireNonNull(action);
return new DefaultCompletableAsyncCompletion(future.thenRun(action));
}
@Override
public AsyncCompletion thenScheduleRun(Vertx vertx, Runnable runnable) {
requireNonNull(vertx);
requireNonNull(runnable);
CompletableAsyncCompletion completion = AsyncCompletion.incomplete();
future.whenComplete((t, ex1) -> {
if (ex1 == null) {
try {
vertx.runOnContext(ev -> {
try {
runnable.run();
} catch (Throwable ex3) {
completion.completeExceptionally(ex3);
return;
}
completion.complete();
});
} catch (Throwable ex2) {
completion.completeExceptionally(ex2);
}
} else {
completion.completeExceptionally(ex1);
}
});
return completion;
}
@Override
public AsyncCompletion thenScheduleBlockingRun(Vertx vertx, Runnable runnable) {
requireNonNull(vertx);
requireNonNull(runnable);
CompletableAsyncCompletion completion = AsyncCompletion.incomplete();
future.whenComplete((t, ex1) -> {
if (ex1 == null) {
try {
vertx.executeBlocking(vertxFuture -> {
runnable.run();
vertxFuture.complete(null);
}, false, res -> {
if (res.succeeded()) {
completion.complete();
} else {
completion.completeExceptionally(res.cause());
}
});
} catch (Throwable ex2) {
completion.completeExceptionally(ex2);
}
} else {
completion.completeExceptionally(ex1);
}
});
return completion;
}
@Override
public AsyncCompletion thenScheduleBlockingRun(WorkerExecutor executor, Runnable runnable) {
requireNonNull(executor);
requireNonNull(runnable);
CompletableAsyncCompletion completion = AsyncCompletion.incomplete();
future.whenComplete((t, ex1) -> {
if (ex1 == null) {
try {
executor.executeBlocking(vertxFuture -> {
runnable.run();
vertxFuture.complete(null);
}, false, res -> {
if (res.succeeded()) {
completion.complete();
} else {
completion.completeExceptionally(res.cause());
}
});
} catch (Throwable ex2) {
completion.completeExceptionally(ex2);
}
} else {
completion.completeExceptionally(ex1);
}
});
return completion;
}
@Override
public <U> AsyncResult<U> thenApply(Function<? super T, ? extends U> fn) {
requireNonNull(fn);
return new DefaultCompletableAsyncResult<>(future.thenApply(fn));
}
@Override
public <U> AsyncResult<U> thenScheduleApply(Vertx vertx, Function<? super T, ? extends U> fn) {
requireNonNull(vertx);
requireNonNull(fn);
CompletableAsyncResult<U> asyncResult = AsyncResult.incomplete();
future.whenComplete((t, ex1) -> {
if (ex1 == null) {
try {
vertx.runOnContext(ev -> {
try {
asyncResult.complete(fn.apply(t));
} catch (Throwable ex3) {
asyncResult.completeExceptionally(ex3);
}
});
} catch (Throwable ex2) {
asyncResult.completeExceptionally(ex2);
}
} else {
asyncResult.completeExceptionally(ex1);
}
});
return asyncResult;
}
@Override
public <U> AsyncResult<U> thenScheduleBlockingApply(Vertx vertx, Function<? super T, ? extends U> fn) {
requireNonNull(vertx);
requireNonNull(fn);
CompletableAsyncResult<U> asyncResult = AsyncResult.incomplete();
future.whenComplete((t, ex1) -> {
if (ex1 == null) {
try {
vertx.<U>executeBlocking(vertxFuture -> vertxFuture.complete(fn.apply(t)), false, res -> {
if (res.succeeded()) {
asyncResult.complete(res.result());
} else {
asyncResult.completeExceptionally(res.cause());
}
});
} catch (Throwable ex2) {
asyncResult.completeExceptionally(ex2);
}
} else {
asyncResult.completeExceptionally(ex1);
}
});
return asyncResult;
}
@Override
public <U> AsyncResult<U> thenScheduleBlockingApply(WorkerExecutor executor, Function<? super T, ? extends U> fn) {
requireNonNull(executor);
requireNonNull(fn);
CompletableAsyncResult<U> asyncResult = AsyncResult.incomplete();
future.whenComplete((t, ex1) -> {
if (ex1 == null) {
try {
executor.<U>executeBlocking(vertxFuture -> vertxFuture.complete(fn.apply(t)), false, res -> {
if (res.succeeded()) {
asyncResult.complete(res.result());
} else {
asyncResult.completeExceptionally(res.cause());
}
});
} catch (Throwable ex2) {
asyncResult.completeExceptionally(ex2);
}
} else {
asyncResult.completeExceptionally(ex1);
}
});
return asyncResult;
}
@Override
public AsyncCompletion thenAccept(Consumer<? super T> consumer) {
requireNonNull(consumer);
return new DefaultCompletableAsyncCompletion(future.thenAccept(consumer));
}
@Override
public <U> AsyncCompletion thenAcceptBoth(AsyncResult<? extends U> other, BiConsumer<? super T, ? super U> consumer) {
requireNonNull(other);
requireNonNull(consumer);
return new DefaultCompletableAsyncCompletion(future.thenAccept(t -> other.thenAccept(u -> consumer.accept(t, u))));
}
@Override
public <U, V> AsyncResult<V> thenCombine(
AsyncResult<? extends U> other,
BiFunction<? super T, ? super U, ? extends V> fn) {
requireNonNull(other);
requireNonNull(fn);
CompletableAsyncResult<V> asyncResult = AsyncResult.incomplete();
future.whenComplete((t, ex1) -> {
if (ex1 == null) {
try {
other.whenComplete((u, ex3) -> {
if (ex3 == null) {
asyncResult.complete(fn.apply(t, u));
} else {
asyncResult.completeExceptionally(ex3);
}
});
} catch (Throwable ex2) {
asyncResult.completeExceptionally(ex2);
}
} else {
asyncResult.completeExceptionally(ex1);
}
});
return asyncResult;
}
@Override
public AsyncResult<T> exceptionally(Function<Throwable, ? extends T> fn) {
requireNonNull(fn);
return new DefaultCompletableAsyncResult<>(future.exceptionally(fn::apply));
}
@Override
public AsyncResult<T> whenComplete(BiConsumer<? super T, ? super Throwable> action) {
requireNonNull(action);
return new DefaultCompletableAsyncResult<>(future.whenComplete(action::accept));
}
@Override
public <U> AsyncResult<U> handle(BiFunction<? super T, Throwable, ? extends U> fn) {
requireNonNull(fn);
return new DefaultCompletableAsyncResult<>(future.handle(fn::apply));
}
@Override
public AsyncCompletion accept(BiConsumer<? super T, Throwable> consumer) {
requireNonNull(consumer);
return new DefaultCompletableAsyncCompletion(future.handle((t, ex) -> {
consumer.accept(t, ex);
return null;
}));
}
}