blob: 36bc304a6b0846d6099d30aa6efd0cab656a5271 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.net;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.netty.util.concurrent.EventExecutor;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
import io.netty.util.concurrent.Promise;
import io.netty.util.internal.PlatformDependent;
import io.netty.util.internal.ThrowableUtil;
import org.apache.cassandra.utils.concurrent.WaitQueue;
import static java.util.concurrent.atomic.AtomicReferenceFieldUpdater.*;
/**
* Netty's DefaultPromise uses a mutex to coordinate notifiers AND waiters between the eventLoop and the other threads.
* Since we register cross-thread listeners, this has the potential to block internode messaging for an unknown
* number of threads for an unknown period of time, if we are unlucky with the scheduler (which will certainly
* happen, just with some unknown but low periodicity)
*
* At the same time, we manage some other efficiencies:
* - We save some space when registering listeners, especially if there is only one listener, as we perform no
* extra allocations in this case.
* - We permit efficient initial state declaration, avoiding unnecessary CAS or lock acquisitions when mutating
* a Promise we are ourselves constructing (and can easily add more; only those we use have been added)
*
* We can also make some guarantees about our behaviour here, although we primarily mirror Netty.
* Specifically, we can guarantee that notifiers are always invoked in the order they are added (which may be true
* for netty, but was unclear and is not declared). This is useful for ensuring the correctness of some of our
* behaviours in OutboundConnection without having to jump through extra hoops.
*
* The implementation loosely follows that of Netty's DefaultPromise, with some slight changes; notably that we have
* no synchronisation on our listeners, instead using a CoW list that is cleared each time we notify listeners.
*
* We handle special values slightly differently. We do not use a special value for null, instead using
* a special value to indicate the result has not been set yet. This means that once isSuccess() holds,
* the result must be a correctly typed object (modulo generics pitfalls).
* All special values are also instances of FailureHolder, which simplifies a number of the logical conditions.
*
* @param <V>
*/
public class AsyncPromise<V> implements Promise<V>
{
private static final Logger logger = LoggerFactory.getLogger(AsyncPromise.class);
private final EventExecutor executor;
private volatile Object result;
private volatile GenericFutureListener<? extends Future<? super V>> listeners;
private volatile WaitQueue waiting;
private static final AtomicReferenceFieldUpdater<AsyncPromise, Object> resultUpdater = newUpdater(AsyncPromise.class, Object.class, "result");
private static final AtomicReferenceFieldUpdater<AsyncPromise, GenericFutureListener> listenersUpdater = newUpdater(AsyncPromise.class, GenericFutureListener.class, "listeners");
private static final AtomicReferenceFieldUpdater<AsyncPromise, WaitQueue> waitingUpdater = newUpdater(AsyncPromise.class, WaitQueue.class, "waiting");
private static final FailureHolder UNSET = new FailureHolder(null);
private static final FailureHolder UNCANCELLABLE = new FailureHolder(null);
private static final FailureHolder CANCELLED = new FailureHolder(ThrowableUtil.unknownStackTrace(new CancellationException(), AsyncPromise.class, "cancel(...)"));
private static final DeferredGenericFutureListener NOTIFYING = future -> {};
private static interface DeferredGenericFutureListener<F extends Future<?>> extends GenericFutureListener<F> {}
private static final class FailureHolder
{
final Throwable cause;
private FailureHolder(Throwable cause)
{
this.cause = cause;
}
}
public AsyncPromise(EventExecutor executor)
{
this(executor, UNSET);
}
private AsyncPromise(EventExecutor executor, FailureHolder initialState)
{
this.executor = executor;
this.result = initialState;
}
public AsyncPromise(EventExecutor executor, GenericFutureListener<? extends Future<? super V>> listener)
{
this(executor);
this.listeners = listener;
}
AsyncPromise(EventExecutor executor, FailureHolder initialState, GenericFutureListener<? extends Future<? super V>> listener)
{
this(executor, initialState);
this.listeners = listener;
}
public static <V> AsyncPromise<V> uncancellable(EventExecutor executor)
{
return new AsyncPromise<>(executor, UNCANCELLABLE);
}
public static <V> AsyncPromise<V> uncancellable(EventExecutor executor, GenericFutureListener<? extends Future<? super V>> listener)
{
return new AsyncPromise<>(executor, UNCANCELLABLE);
}
public Promise<V> setSuccess(V v)
{
if (!trySuccess(v))
throw new IllegalStateException("complete already: " + this);
return this;
}
public Promise<V> setFailure(Throwable throwable)
{
if (!tryFailure(throwable))
throw new IllegalStateException("complete already: " + this);
return this;
}
public boolean trySuccess(V v)
{
return trySet(v);
}
public boolean tryFailure(Throwable throwable)
{
return trySet(new FailureHolder(throwable));
}
public boolean setUncancellable()
{
if (trySet(UNCANCELLABLE))
return true;
return result == UNCANCELLABLE;
}
public boolean cancel(boolean b)
{
return trySet(CANCELLED);
}
/**
* Shared implementation of various promise completion methods.
* Updates the result if it is possible to do so, returning success/failure.
*
* If the promise is UNSET the new value will succeed;
* if it is UNCANCELLABLE it will succeed only if the new value is not CANCELLED
* otherwise it will fail, as isDone() is implied
*
* If the update succeeds, and the new state implies isDone(), any listeners and waiters will be notified
*/
private boolean trySet(Object v)
{
while (true)
{
Object current = result;
if (isDone(current) || (current == UNCANCELLABLE && v == CANCELLED))
return false;
if (resultUpdater.compareAndSet(this, current, v))
{
if (v != UNCANCELLABLE)
{
notifyListeners();
notifyWaiters();
}
return true;
}
}
}
public boolean isSuccess()
{
return isSuccess(result);
}
private static boolean isSuccess(Object result)
{
return !(result instanceof FailureHolder);
}
public boolean isCancelled()
{
return isCancelled(result);
}
private static boolean isCancelled(Object result)
{
return result == CANCELLED;
}
public boolean isDone()
{
return isDone(result);
}
private static boolean isDone(Object result)
{
return result != UNSET && result != UNCANCELLABLE;
}
public boolean isCancellable()
{
Object result = this.result;
return result == UNSET;
}
public Throwable cause()
{
Object result = this.result;
if (result instanceof FailureHolder)
return ((FailureHolder) result).cause;
return null;
}
/**
* if isSuccess(), returns the value, otherwise returns null
*/
@SuppressWarnings("unchecked")
public V getNow()
{
Object result = this.result;
if (isSuccess(result))
return (V) result;
return null;
}
public V get() throws InterruptedException, ExecutionException
{
await();
return getWhenDone();
}
public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException
{
if (!await(timeout, unit))
throw new TimeoutException();
return getWhenDone();
}
/**
* Shared implementation of get() after suitable await(); assumes isDone(), and returns
* either the success result or throws the suitable exception under failure
*/
@SuppressWarnings("unchecked")
private V getWhenDone() throws ExecutionException
{
Object result = this.result;
if (isSuccess(result))
return (V) result;
if (result == CANCELLED)
throw new CancellationException();
throw new ExecutionException(((FailureHolder) result).cause);
}
/**
* waits for completion; in case of failure rethrows the original exception without a new wrapping exception
* so may cause problems for reporting stack traces
*/
public Promise<V> sync() throws InterruptedException
{
await();
rethrowIfFailed();
return this;
}
/**
* waits for completion; in case of failure rethrows the original exception without a new wrapping exception
* so may cause problems for reporting stack traces
*/
public Promise<V> syncUninterruptibly()
{
awaitUninterruptibly();
rethrowIfFailed();
return this;
}
private void rethrowIfFailed()
{
Throwable cause = this.cause();
if (cause != null)
{
PlatformDependent.throwException(cause);
}
}
public Promise<V> addListener(GenericFutureListener<? extends Future<? super V>> listener)
{
listenersUpdater.accumulateAndGet(this, listener, AsyncPromise::appendListener);
if (isDone())
notifyListeners();
return this;
}
public Promise<V> addListeners(GenericFutureListener<? extends Future<? super V>> ... listeners)
{
// this could be more efficient if we cared, but we do not
return addListener(future -> {
for (GenericFutureListener<? extends Future<? super V>> listener : listeners)
AsyncPromise.invokeListener((GenericFutureListener<Future<? super V>>)listener, future);
});
}
public Promise<V> removeListener(GenericFutureListener<? extends Future<? super V>> listener)
{
throw new UnsupportedOperationException();
}
public Promise<V> removeListeners(GenericFutureListener<? extends Future<? super V>> ... listeners)
{
throw new UnsupportedOperationException();
}
@SuppressWarnings("unchecked")
private void notifyListeners()
{
if (!executor.inEventLoop())
{
// submit this method, to guarantee we invoke in the submitted order
executor.execute(this::notifyListeners);
return;
}
if (listeners == null || listeners instanceof DeferredGenericFutureListener<?>)
return; // either no listeners, or we are already notifying listeners, so we'll get to the new one when ready
// first run our notifiers
while (true)
{
GenericFutureListener listeners = listenersUpdater.getAndSet(this, NOTIFYING);
if (listeners != null)
invokeListener(listeners, this);
if (listenersUpdater.compareAndSet(this, NOTIFYING, null))
return;
}
}
private static <F extends Future<?>> void invokeListener(GenericFutureListener<F> listener, F future)
{
try
{
listener.operationComplete(future);
}
catch (Throwable t)
{
logger.error("Failed to invoke listener {} to {}", listener, future, t);
}
}
private static <F extends Future<?>> GenericFutureListener<F> appendListener(GenericFutureListener<F> prevListener, GenericFutureListener<F> newListener)
{
GenericFutureListener<F> result = newListener;
if (prevListener != null && prevListener != NOTIFYING)
{
result = future -> {
invokeListener(prevListener, future);
// we will wrap the outer invocation with invokeListener, so no need to do it here too
newListener.operationComplete(future);
};
}
if (prevListener instanceof DeferredGenericFutureListener<?>)
{
GenericFutureListener<F> wrap = result;
result = (DeferredGenericFutureListener<F>) wrap::operationComplete;
}
return result;
}
public Promise<V> await() throws InterruptedException
{
await(0L, (signal, nanos) -> { signal.await(); return true; } );
return this;
}
public Promise<V> awaitUninterruptibly()
{
await(0L, (signal, nanos) -> { signal.awaitUninterruptibly(); return true; } );
return this;
}
public boolean await(long timeout, TimeUnit unit) throws InterruptedException
{
return await(unit.toNanos(timeout),
(signal, nanos) -> signal.awaitUntil(nanos + System.nanoTime()));
}
public boolean await(long timeoutMillis) throws InterruptedException
{
return await(timeoutMillis, TimeUnit.MILLISECONDS);
}
public boolean awaitUninterruptibly(long timeout, TimeUnit unit)
{
return await(unit.toNanos(timeout),
(signal, nanos) -> signal.awaitUntilUninterruptibly(nanos + System.nanoTime()));
}
public boolean awaitUninterruptibly(long timeoutMillis)
{
return awaitUninterruptibly(timeoutMillis, TimeUnit.MILLISECONDS);
}
interface Awaiter<T extends Throwable>
{
boolean await(WaitQueue.Signal value, long nanos) throws T;
}
/**
* A clean way to implement each variant of await using lambdas; we permit a nanos parameter
* so that we can implement this without any unnecessary lambda allocations, although not
* all implementations need the nanos parameter (i.e. those that wait indefinitely)
*/
private <T extends Throwable> boolean await(long nanos, Awaiter<T> awaiter) throws T
{
if (isDone())
return true;
WaitQueue.Signal await = registerToWait();
if (null != await)
return awaiter.await(await, nanos);
return true;
}
/**
* Register a signal that will be notified when the promise is completed;
* if the promise becomes completed before this signal is registered, null is returned
*/
private WaitQueue.Signal registerToWait()
{
WaitQueue waiting = this.waiting;
if (waiting == null && !waitingUpdater.compareAndSet(this, null, waiting = new WaitQueue()))
waiting = this.waiting;
assert waiting != null;
WaitQueue.Signal signal = waiting.register();
if (!isDone())
return signal;
signal.cancel();
return null;
}
private void notifyWaiters()
{
WaitQueue waiting = this.waiting;
if (waiting != null)
waiting.signalAll();
}
public String toString()
{
Object result = this.result;
if (isSuccess(result))
return "(success: " + result + ')';
if (result == UNCANCELLABLE)
return "(uncancellable)";
if (result == CANCELLED)
return "(cancelled)";
if (isDone(result))
return "(failure: " + ((FailureHolder) result).cause + ')';
return "(incomplete)";
}
}