blob: 25bdf02778be7387a3622a424fe82a1ddaa4243d [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.utils.concurrent;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.function.Predicate;
import net.nicoulaj.compilecommand.annotations.Inline;
import org.apache.cassandra.utils.Shared;
import org.apache.cassandra.utils.Intercept;
import static org.apache.cassandra.utils.Clock.Global.nanoTime;
import static org.apache.cassandra.utils.concurrent.WaitQueue.newWaitQueue;
import static org.apache.cassandra.utils.Shared.Scope.SIMULATION;
/**
* A generic signal consumer, supporting all of the typical patterns used in Cassandra.
* All of the methods defined in {@link Awaitable} may be waited on without a loop,
* as this interface declares that there are no spurious wake-ups.
*/
@Shared(scope = SIMULATION)
public interface Awaitable
{
/**
* Await until the deadline (in nanoTime), throwing any interrupt.
* No spurious wakeups.
* @return true if we were signalled, false if the deadline elapsed
* @throws InterruptedException if interrupted
*/
boolean awaitUntil(long nanoTimeDeadline) throws InterruptedException;
/**
* Await until the deadline (in nanoTime), throwing any interrupt as an unchecked exception.
* No spurious wakeups.
* @return true if we were signalled, false if the deadline elapsed
* @throws UncheckedInterruptedException if interrupted
*/
boolean awaitUntilThrowUncheckedOnInterrupt(long nanoTimeDeadline) throws UncheckedInterruptedException;
/**
* Await until the deadline (in nanoTime), ignoring interrupts (but maintaining the interrupt flag on exit).
* No spurious wakeups.
* @return true if we were signalled, false if the deadline elapsed
*/
boolean awaitUntilUninterruptibly(long nanoTimeDeadline);
/**
* Await for the specified period, throwing any interrupt.
* No spurious wakeups.
* @return true if we were signalled, false if the timeout elapses
* @throws InterruptedException if interrupted
*/
boolean await(long time, TimeUnit units) throws InterruptedException;
/**
* Await for the specified period, throwing any interrupt as an unchecked exception.
* No spurious wakeups.
* @return true if we were signalled, false if the timeout elapses
* @throws UncheckedInterruptedException if interrupted
*/
boolean awaitThrowUncheckedOnInterrupt(long time, TimeUnit units) throws UncheckedInterruptedException;
/**
* Await until the deadline (in nanoTime), ignoring interrupts (but maintaining the interrupt flag on exit).
* No spurious wakeups.
* @return true if we were signalled, false if the timeout elapses
*/
boolean awaitUninterruptibly(long time, TimeUnit units);
/**
* Await indefinitely, throwing any interrupt.
* No spurious wakeups.
* @throws InterruptedException if interrupted
*/
Awaitable await() throws InterruptedException;
/**
* Await indefinitely, throwing any interrupt as an unchecked exception.
* No spurious wakeups.
* @throws UncheckedInterruptedException if interrupted
*/
Awaitable awaitThrowUncheckedOnInterrupt() throws UncheckedInterruptedException;
/**
* Await indefinitely, ignoring interrupts (but maintaining the interrupt flag on exit).
* No spurious wakeups.
*/
Awaitable awaitUninterruptibly();
// we must declare the static implementation methods outside of the interface,
// so that they can be loaded by different classloaders during simulation
class Defaults
{
public static boolean await(Awaitable await, long time, TimeUnit unit) throws InterruptedException
{
return await.awaitUntil(nanoTime() + unit.toNanos(time));
}
public static boolean awaitThrowUncheckedOnInterrupt(Awaitable await, long time, TimeUnit units) throws UncheckedInterruptedException
{
return awaitUntilThrowUncheckedOnInterrupt(await, nanoTime() + units.toNanos(time));
}
public static boolean awaitUninterruptibly(Awaitable await, long time, TimeUnit units)
{
return awaitUntilUninterruptibly(await, nanoTime() + units.toNanos(time));
}
public static <A extends Awaitable> A awaitThrowUncheckedOnInterrupt(A await) throws UncheckedInterruptedException
{
try
{
await.await();
}
catch (InterruptedException e)
{
throw new UncheckedInterruptedException();
}
return await;
}
public static boolean awaitUntilThrowUncheckedOnInterrupt(Awaitable await, long nanoTimeDeadline) throws UncheckedInterruptedException
{
try
{
return await.awaitUntil(nanoTimeDeadline);
}
catch (InterruptedException e)
{
throw new UncheckedInterruptedException();
}
}
/**
* {@link Awaitable#awaitUntilUninterruptibly(long)}
*/
public static boolean awaitUntilUninterruptibly(Awaitable await, long nanoTimeDeadline)
{
boolean interrupted = false;
boolean result;
while (true)
{
try
{
result = await.awaitUntil(nanoTimeDeadline);
break;
}
catch (InterruptedException e)
{
interrupted = true;
}
}
if (interrupted)
Thread.currentThread().interrupt();
return result;
}
/**
* {@link Awaitable#awaitUninterruptibly()}
*/
public static <A extends Awaitable> A awaitUninterruptibly(A await)
{
boolean interrupted = false;
while (true)
{
try
{
await.await();
break;
}
catch (InterruptedException e)
{
interrupted = true;
}
}
if (interrupted)
Thread.currentThread().interrupt();
return await;
}
}
abstract class AbstractAwaitable implements Awaitable
{
protected AbstractAwaitable() {}
/**
* {@link Awaitable#await(long, TimeUnit)}
*/
@Override
public boolean await(long time, TimeUnit unit) throws InterruptedException
{
return Defaults.await(this, time, unit);
}
/**
* {@link Awaitable#awaitThrowUncheckedOnInterrupt(long, TimeUnit)}
*/
@Override
public boolean awaitThrowUncheckedOnInterrupt(long time, TimeUnit units) throws UncheckedInterruptedException
{
return Defaults.awaitThrowUncheckedOnInterrupt(this, time, units);
}
/**
* {@link Awaitable#awaitUninterruptibly(long, TimeUnit)}
*/
public boolean awaitUninterruptibly(long time, TimeUnit units)
{
return awaitUntilUninterruptibly(nanoTime() + units.toNanos(time));
}
/**
* {@link Awaitable#awaitThrowUncheckedOnInterrupt()}
*/
public Awaitable awaitThrowUncheckedOnInterrupt() throws UncheckedInterruptedException
{
return Defaults.awaitThrowUncheckedOnInterrupt(this);
}
/**
* {@link Awaitable#awaitUntilThrowUncheckedOnInterrupt(long)}
*/
public boolean awaitUntilThrowUncheckedOnInterrupt(long nanoTimeDeadline) throws UncheckedInterruptedException
{
return Defaults.awaitUntilThrowUncheckedOnInterrupt(this, nanoTimeDeadline);
}
/**
* {@link Awaitable#awaitUntilUninterruptibly(long)}
*/
public boolean awaitUntilUninterruptibly(long nanoTimeDeadline)
{
return Defaults.awaitUntilUninterruptibly(this, nanoTimeDeadline);
}
/**
* {@link Awaitable#awaitUninterruptibly()}
*/
public Awaitable awaitUninterruptibly()
{
return Defaults.awaitUninterruptibly(this);
}
}
/**
* A barebones asynchronous {@link Awaitable}.
* If your state is minimal, or can be updated concurrently, extend this class.
*/
abstract class AsyncAwaitable extends AbstractAwaitable
{
/**
* Maintain an internal variable containing a lazily-initialized wait queue
* @return null if is done
*/
@Inline
private static <A extends Awaitable> WaitQueue.Signal register(AtomicReferenceFieldUpdater<A, WaitQueue> waitingUpdater, Predicate<A> isDone, A awaitable)
{
if (isDone.test(awaitable))
return null;
WaitQueue waiting = waitingUpdater.get(awaitable);
if (waiting == null)
{
if (!waitingUpdater.compareAndSet(awaitable, null, waiting = newWaitQueue()))
{
waiting = waitingUpdater.get(awaitable);
if (waiting == null)
{
assert isDone.test(awaitable);
return null;
}
}
}
WaitQueue.Signal s = waiting.register();
if (!isDone.test(awaitable))
return s;
s.cancel();
return null;
}
@Inline
static <A extends Awaitable> A await(AtomicReferenceFieldUpdater<A, WaitQueue> waitingUpdater, Predicate<A> isDone, A awaitable) throws InterruptedException
{
WaitQueue.Signal s = register(waitingUpdater, isDone, awaitable);
if (s != null)
s.await();
return awaitable;
}
@Inline
static <A extends Awaitable> boolean awaitUntil(AtomicReferenceFieldUpdater<A, WaitQueue> waitingUpdater, Predicate<A> isDone, A awaitable, long nanoTimeDeadline) throws InterruptedException
{
WaitQueue.Signal s = register(waitingUpdater, isDone, awaitable);
return s == null || s.awaitUntil(nanoTimeDeadline) || isDone.test(awaitable);
}
@Inline
static <A extends Awaitable> void signalAll(AtomicReferenceFieldUpdater<A, WaitQueue> waitingUpdater, A awaitable)
{
WaitQueue waiting = waitingUpdater.get(awaitable);
if (waiting == null)
return;
waiting.signalAll();
waitingUpdater.lazySet(awaitable, null);
}
private static final AtomicReferenceFieldUpdater<AsyncAwaitable, WaitQueue> waitingUpdater = AtomicReferenceFieldUpdater.newUpdater(AsyncAwaitable.class, WaitQueue.class, "waiting");
private volatile WaitQueue waiting;
protected AsyncAwaitable() {}
/**
* {@link Awaitable#await()}
*/
public Awaitable await() throws InterruptedException
{
return await(waitingUpdater, AsyncAwaitable::isSignalled, this);
}
/**
* {@link Awaitable#awaitUntil(long)}
*/
public boolean awaitUntil(long nanoTimeDeadline) throws InterruptedException
{
return awaitUntil(waitingUpdater, AsyncAwaitable::isSignalled, this, nanoTimeDeadline);
}
/**
* Signal any waiting threads; {@link #isSignalled()} must return {@code true} before this method is invoked.
*/
protected void signal()
{
signalAll(waitingUpdater, this);
}
/**
* Return true once signalled. Unidirectional; once true, must never again be false.
*/
protected abstract boolean isSignalled();
}
/**
* A barebones {@link Awaitable} that uses mutual exclusion.
* If your state will be updated while holding the object monitor, extend this class.
*/
abstract class SyncAwaitable extends AbstractAwaitable
{
protected SyncAwaitable() {}
/**
* {@link Awaitable#await()}
*/
public synchronized Awaitable await() throws InterruptedException
{
while (!isSignalled())
wait();
return this;
}
/**
* {@link Awaitable#awaitUntil(long)}
*/
public synchronized boolean awaitUntil(long nanoTimeDeadline) throws InterruptedException
{
while (true)
{
if (isSignalled()) return true;
if (!waitUntil(this, nanoTimeDeadline)) return false;
}
}
/**
* Return true once signalled. Unidirectional; once true, must never again be false.
*/
protected abstract boolean isSignalled();
@Intercept
public static boolean waitUntil(Object monitor, long deadlineNanos) throws InterruptedException
{
long wait = deadlineNanos - nanoTime();
if (wait <= 0)
return false;
monitor.wait((wait + 999999) / 1000000);
return true;
}
}
}