blob: 4e02cba52cf86baa2b9f7acd29f2cab6ddb3ad25 [file] [log] [blame]
package accord.utils;
import com.google.common.base.Preconditions;
import org.apache.cassandra.utils.concurrent.AsyncPromise;
import org.apache.cassandra.utils.concurrent.Future;
import java.util.List;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.function.BiFunction;
public class ReducingFuture<V> extends AsyncPromise<V>
{
private static final AtomicIntegerFieldUpdater<ReducingFuture> PENDING_UPDATER = AtomicIntegerFieldUpdater.newUpdater(ReducingFuture.class, "pending");
private final List<? extends Future<V>> futures;
private final BiFunction<V, V, V> reducer;
private volatile int pending;
private ReducingFuture(List<? extends Future<V>> futures, BiFunction<V, V, V> reducer)
{
this.futures = futures;
this.reducer = reducer;
this.pending = futures.size();
futures.forEach(f -> f.addListener(this::operationComplete));
}
private <F extends io.netty.util.concurrent.Future<?>> void operationComplete(F future) throws Exception
{
if (isDone())
return;
if (!future.isSuccess())
{
tryFailure(future.cause());
}
else if (PENDING_UPDATER.decrementAndGet(this) == 0)
{
V result = futures.get(0).getNow();
for (int i=1, mi=futures.size(); i<mi; i++)
result = reducer.apply(result, futures.get(i).getNow());
trySuccess(result);
}
}
public static <T> Future<T> reduce(List<? extends Future<T>> futures, BiFunction<T, T, T> reducer)
{
Preconditions.checkArgument(!futures.isEmpty(), "future list is empty");
if (futures.size() == 1)
return futures.get(0);
return new ReducingFuture<>(futures, reducer);
}
}