blob: 5773e68524682b5622c97d3721f787c94187134a [file] [log] [blame]
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
package accord.utils.async;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.function.BiConsumer;
import java.util.function.Function;
import accord.api.VisibleForImplementation;
import accord.utils.Invariants;
public class AsyncResults
public static final AsyncResult<Void> SUCCESS_VOID = success(null);
private AsyncResults() {}
private static class Result<V>
final V value;
final Throwable failure;
public Result(V value, Throwable failure)
this.value = value;
this.failure = failure;
static class AbstractResult<V> implements AsyncResult<V>
private static final AtomicReferenceFieldUpdater<AbstractResult, Object> STATE = AtomicReferenceFieldUpdater.newUpdater(AbstractResult.class, Object.class, "state");
private volatile Object state;
private static class Listener<V>
final BiConsumer<? super V, Throwable> callback;
Listener<V> next;
public Listener(BiConsumer<? super V, Throwable> callback)
this.callback = callback;
private void notify(Listener<V> listener, Result<V> result)
List<Throwable> failures = null;
while (listener != null)
listener.callback.accept(result.value, result.failure);
catch (Throwable t)
if (failures == null)
failures = new ArrayList<>();
listener =;
if (failures != null)
IllegalStateException f = new IllegalStateException("Callbacks threw");
throw f;
boolean trySetResult(Result<V> result)
while (true)
Object current = state;
if (current instanceof Result)
return false;
Listener<V> listener = (Listener<V>) current;
if (STATE.compareAndSet(this, current, result))
notify(listener, result);
return true;
protected boolean trySetResult(V result, Throwable failure)
return trySetResult(new Result<>(result, failure));
private AsyncChain<V> newChain()
return new AsyncChains.Head<V>()
protected void start(BiConsumer<? super V, Throwable> callback)
void setResult(V result, Throwable failure)
if (!trySetResult(result, failure))
IllegalStateException f = new IllegalStateException("Result has already been set on " + this);
if (failure != null)
throw f;
public <T> AsyncChain<T> map(Function<? super V, ? extends T> mapper)
return newChain().map(mapper);
public <T> AsyncChain<T> flatMap(Function<? super V, ? extends AsyncChain<T>> mapper)
return newChain().flatMap(mapper);
public AsyncResult<V> addCallback(BiConsumer<? super V, Throwable> callback)
Listener<V> listener = null;
while (true)
Object current = state;
if (current instanceof Result)
Result<V> result = (Result<V>) current;
callback.accept(result.value, result.failure);
return this;
if (listener == null)
listener = new Listener<>(callback); = (Listener<V>) current;
if (STATE.compareAndSet(this, current, listener))
return this;
public boolean isDone()
return state instanceof Result;
public boolean isSuccess()
Object current = state;
return current instanceof Result && ((Result) current).failure == null;
private Result<V> getResult()
Object current = state;
Invariants.checkState(current instanceof Result);
return (Result<V>) current;
public V result()
Result<V> result = getResult();
if (result.failure != null)
throw new IllegalStateException("Result failed", result.failure);
return result.value;
public Throwable failure()
Result<V> result = getResult();
if (result.failure == null)
throw new IllegalStateException("Result succeeded");
return result.failure;
public String toString()
return getClass().getSimpleName() + "{status=" + (isDone() ? isSuccess() ? "success" : "failure" : "pending") + "}";
static class Chain<V> extends AbstractResult<V>
public Chain(AsyncChain<V> chain)
public static class SettableResult<V> extends AbstractResult<V> implements AsyncResult.Settable<V>
public boolean trySuccess(V value)
return trySetResult(value, null);
public boolean tryFailure(Throwable throwable)
return trySetResult(null, throwable);
static class Immediate<V> implements AsyncResult<V>
private final V value;
private final Throwable failure;
Immediate(V value)
this.value = value;
this.failure = null;
Immediate(Throwable failure)
this.value = null;
this.failure = failure;
private AsyncChain<V> newChain()
return new AsyncChains.Head<V>()
protected void start(BiConsumer<? super V, Throwable> callback)
public <T> AsyncChain<T> map(Function<? super V, ? extends T> mapper)
return newChain().map(mapper);
public <T> AsyncChain<T> flatMap(Function<? super V, ? extends AsyncChain<T>> mapper)
return newChain().flatMap(mapper);
public AsyncResult<V> addCallback(BiConsumer<? super V, Throwable> callback)
callback.accept(value, failure);
return this;
public boolean isDone()
return true;
public boolean isSuccess()
return failure == null;
* Creates an AsyncResult for the given chain. This calls begin on the supplied chain
public static <V> AsyncResult<V> forChain(AsyncChain<V> chain)
return new Chain<>(chain);
public static <V> AsyncResult<V> success(V value)
return new Immediate<>(value);
public static <V> AsyncResult<V> failure(Throwable failure)
return new Immediate<>(failure);
public static <V> AsyncResult.Settable<V> settable()
return new SettableResult<>();
* An AsyncResult that also implements Runnable
* @param <V>
public static class RunnableResult<V> extends AbstractResult<V> implements Runnable
protected final Callable<V> callable;
public RunnableResult(Callable<V> callable)
this.callable = callable;
public void run()
// There are two different type of exceptions: user function throws, listener throws. To make sure this is clear,
// make sure to catch the exception from the user function and set as failed, and let the listener exceptions bubble up.
V call;
call =;
catch (Throwable t)
trySetResult(null, t);
trySetResult(call, null);
public static <V> RunnableResult<V> runnableResult(Callable<V> callable)
return new RunnableResult<>(callable);
public static RunnableResult<Void> runnableResult(Runnable runnable)
return new RunnableResult<>(() -> {;
return null;