blob: 0ef35d5dde3ac3cac6bb1cfca9f2e08e8924ae2f [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.utils.concurrent;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.function.Function;
import javax.annotation.Nullable;
import com.google.common.util.concurrent.AsyncFunction;
import com.google.common.util.concurrent.ListenableFuture; // checkstyle: permit this import
import io.netty.util.concurrent.GenericFutureListener;
/**
* Our default {@link Future} implementation, with all state being managed without locks (except those used by the JVM).
*
* Some implementation comments versus Netty's default promise:
* - We permit efficient initial state declaration, avoiding unnecessary CAS or lock acquisitions when mutating
* a Promise we are ourselves constructing (and can easily add more; only those we use have been added)
* - We guarantee the order of invocation of listeners (and callbacks etc, and with respect to each other)
* - We save some space when registering listeners, especially if there is only one listener, as we perform no
* extra allocations in this case.
* - We implement our invocation list as a concurrent stack, that is cleared on notification
* - We handle special values slightly differently.
* - We do not use a special value for null, instead using a special value to indicate the result has not been set.
* This means that once isSuccess() holds, the result must be a correctly typed object (modulo generics pitfalls).
* - All special values are also instances of FailureHolder, which simplifies a number of the logical conditions.
*/
public class AsyncFuture<V> extends AbstractFuture<V>
{
@SuppressWarnings({ "rawtypes" })
private static final AtomicReferenceFieldUpdater<AsyncFuture, WaitQueue> waitingUpdater = AtomicReferenceFieldUpdater.newUpdater(AsyncFuture.class, WaitQueue.class, "waiting");
@SuppressWarnings({ "unused" })
private volatile WaitQueue waiting;
public AsyncFuture()
{
super();
}
protected AsyncFuture(V immediateSuccess)
{
super(immediateSuccess);
}
protected AsyncFuture(Throwable immediateFailure)
{
super(immediateFailure);
}
protected AsyncFuture(FailureHolder initialState)
{
super(initialState);
}
protected AsyncFuture(GenericFutureListener<? extends io.netty.util.concurrent.Future<? super V>> listener)
{
super(listener);
}
protected AsyncFuture(FailureHolder initialState, GenericFutureListener<? extends io.netty.util.concurrent.Future<? super V>> listener)
{
super(initialState, listener);
}
/**
* Shared implementation of various promise completion methods.
* Updates the result if it is possible to do so, returning success/failure.
*
* If the promise is UNSET the new value will succeed;
* if it is UNCANCELLABLE it will succeed only if the new value is not CANCELLED
* otherwise it will fail, as isDone() is implied
*
* If the update succeeds, and the new state implies isDone(), any listeners and waiters will be notified
*/
boolean trySet(Object v)
{
while (true)
{
Object current = result;
if (isDone(current) || (current == UNCANCELLABLE && (v == CANCELLED || v == UNCANCELLABLE)))
return false;
if (resultUpdater.compareAndSet(this, current, v))
{
if (v != UNCANCELLABLE)
{
ListenerList.notify(listenersUpdater, this);
AsyncAwaitable.signalAll(waitingUpdater, this);
}
return true;
}
}
}
/**
* Logically append {@code newListener} to {@link #listeners}
* (at this stage it is a stack, so we actually prepend)
*
* @param newListener must be either a {@link ListenerList} or {@link GenericFutureListener}
*/
void appendListener(ListenerList<V> newListener)
{
ListenerList.push(listenersUpdater, this, newListener);
if (isDone())
ListenerList.notify(listenersUpdater, this);
}
/**
* Support {@link com.google.common.util.concurrent.Futures#transform} natively
*
* See {@link #addListener(GenericFutureListener)} for ordering semantics.
*/
@Override
public <T> Future<T> map(Function<? super V, ? extends T> mapper, Executor executor)
{
return map(new AsyncFuture<>(), mapper, executor);
}
/**
* Support {@link com.google.common.util.concurrent.Futures#transformAsync(ListenableFuture, AsyncFunction, Executor)} natively
*
* See {@link #addListener(GenericFutureListener)} for ordering semantics.
*/
@Override
public <T> Future<T> flatMap(Function<? super V, ? extends Future<T>> flatMapper, @Nullable Executor executor)
{
return flatMap(new AsyncFuture<>(), flatMapper, executor);
}
/**
* Wait for this future to complete {@link Awaitable#await()}
*/
@Override
public AsyncFuture<V> await() throws InterruptedException
{
//noinspection unchecked
return AsyncAwaitable.await(waitingUpdater, Future::isDone, this);
}
@Override
public boolean awaitUntil(long nanoTimeDeadline) throws InterruptedException
{
return AsyncAwaitable.awaitUntil(waitingUpdater, Future::isDone, this, nanoTimeDeadline);
}
}