blob: be271b6dbd298895391703bef4dab89f0b6abadb [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.cassandra.utils.concurrent;
import java.util.Iterator;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.locks.LockSupport;
import com.codahale.metrics.Timer;
/**
* <p>A relatively easy to use utility for general purpose thread signalling.</p>
* <p>Usage on a thread awaiting a state change using a WaitQueue q is:</p>
* <pre>
* {@code
* while (!conditionMet())
* Signal s = q.register();
* if (!conditionMet()) // or, perhaps more correctly, !conditionChanged()
* s.await();
* else
* s.cancel();
* }
* </pre>
* A signalling thread, AFTER changing the state, then calls q.signal() to wake up one, or q.signalAll()
* to wake up all, waiting threads.
* <p>To understand intuitively how this class works, the idea is simply that a thread, once it considers itself
* incapable of making progress, registers to be awoken once that changes. Since this could have changed between
* checking and registering (in which case the thread that made this change would have been unable to signal it),
* it checks the condition again, sleeping only if it hasn't changed/still is not met.</p>
* <p>This thread synchronisation scheme has some advantages over Condition objects and Object.wait/notify in that no monitor
* acquisition is necessary and, in fact, besides the actual waiting on a signal, all operations are non-blocking.
* As a result consumers can never block producers, nor each other, or vice versa, from making progress.
* Threads that are signalled are also put into a RUNNABLE state almost simultaneously, so they can all immediately make
* progress without having to serially acquire the monitor/lock, reducing scheduler delay incurred.</p>
*
* <p>A few notes on utilisation:</p>
* <p>1. A thread will only exit await() when it has been signalled, but this does not guarantee the condition has not
* been altered since it was signalled, and depending on your design it is likely the outer condition will need to be
* checked in a loop, though this is not always the case.</p>
* <p>2. Each signal is single use, so must be re-registered after each await(). This is true even if it times out.</p>
* <p>3. If you choose not to wait on the signal (because the condition has been met before you waited on it)
* you must cancel() the signal if the signalling thread uses signal() to awake waiters; otherwise signals will be
* lost. If signalAll() is used but infrequent, and register() is frequent, cancel() should still be used to prevent the
* queue growing unboundedly. Similarly, if you provide a TimerContext, cancel should be used to ensure it is not erroneously
* counted towards wait time.</p>
* <p>4. Care must be taken when selecting conditionMet() to ensure we are waiting on the condition that actually
* indicates progress is possible. In some complex cases it may be tempting to wait on a condition that is only indicative
* of local progress, not progress on the task we are aiming to complete, and a race may leave us waiting for a condition
* to be met that we no longer need.
* <p>5. This scheme is not fair</p>
* <p>6. Only the thread that calls register() may call await()</p>
*/
public final class WaitQueue
{
private static final int CANCELLED = -1;
private static final int SIGNALLED = 1;
private static final int NOT_SET = 0;
private static final AtomicIntegerFieldUpdater signalledUpdater = AtomicIntegerFieldUpdater.newUpdater(RegisteredSignal.class, "state");
// the waiting signals
private final ConcurrentLinkedQueue<RegisteredSignal> queue = new ConcurrentLinkedQueue<>();
/**
* The calling thread MUST be the thread that uses the signal
* @return x
*/
public Signal register()
{
RegisteredSignal signal = new RegisteredSignal();
queue.add(signal);
return signal;
}
/**
* The calling thread MUST be the thread that uses the signal.
* If the Signal is waited on, context.stop() will be called when the wait times out, the Signal is signalled,
* or the waiting thread is interrupted.
* @return
*/
public Signal register(Timer.Context context)
{
assert context != null;
RegisteredSignal signal = new TimedSignal(context);
queue.add(signal);
return signal;
}
/**
* Signal one waiting thread
*/
public boolean signal()
{
if (!hasWaiters())
return false;
while (true)
{
RegisteredSignal s = queue.poll();
if (s == null || s.signal() != null)
return s != null;
}
}
/**
* Signal all waiting threads
*/
public void signalAll()
{
if (!hasWaiters())
return;
// to avoid a race where the condition is not met and the woken thread managed to wait on the queue before
// we finish signalling it all, we pick a random thread we have woken-up and hold onto it, so that if we encounter
// it again we know we're looping. We reselect a random thread periodically, progressively less often.
// the "correct" solution to this problem is to use a queue that permits snapshot iteration, but this solution is sufficient
int i = 0, s = 5;
Thread randomThread = null;
Iterator<RegisteredSignal> iter = queue.iterator();
while (iter.hasNext())
{
RegisteredSignal signal = iter.next();
Thread signalled = signal.signal();
if (signalled != null)
{
if (signalled == randomThread)
break;
if (++i == s)
{
randomThread = signalled;
s <<= 1;
}
}
iter.remove();
}
}
private void cleanUpCancelled()
{
// TODO: attempt to remove the cancelled from the beginning only (need atomic cas of head)
Iterator<RegisteredSignal> iter = queue.iterator();
while (iter.hasNext())
{
RegisteredSignal s = iter.next();
if (s.isCancelled())
iter.remove();
}
}
public boolean hasWaiters()
{
return !queue.isEmpty();
}
/**
* Return how many threads are waiting
* @return
*/
public int getWaiting()
{
if (!hasWaiters())
return 0;
Iterator<RegisteredSignal> iter = queue.iterator();
int count = 0;
while (iter.hasNext())
{
Signal next = iter.next();
if (!next.isCancelled())
count++;
}
return count;
}
/**
* A Signal is a one-time-use mechanism for a thread to wait for notification that some condition
* state has transitioned that it may be interested in (and hence should check if it is).
* It is potentially transient, i.e. the state can change in the meantime, it only indicates
* that it should be checked, not necessarily anything about what the expected state should be.
*
* Signal implementations should never wake up spuriously, they are always woken up by a
* signal() or signalAll().
*
* This abstract definition of Signal does not need to be tied to a WaitQueue.
* Whilst RegisteredSignal is the main building block of Signals, this abstract
* definition allows us to compose Signals in useful ways. The Signal is 'owned' by the
* thread that registered itself with WaitQueue(s) to obtain the underlying RegisteredSignal(s);
* only the owning thread should use a Signal.
*/
public static interface Signal
{
/**
* @return true if signalled; once true, must be discarded by the owning thread.
*/
public boolean isSignalled();
/**
* @return true if cancelled; once cancelled, must be discarded by the owning thread.
*/
public boolean isCancelled();
/**
* @return isSignalled() || isCancelled(). Once true, the state is fixed and the Signal should be discarded
* by the owning thread.
*/
public boolean isSet();
/**
* atomically: cancels the Signal if !isSet(), or returns true if isSignalled()
*
* @return true if isSignalled()
*/
public boolean checkAndClear();
/**
* Should only be called by the owning thread. Indicates the signal can be retired,
* and if signalled propagates the signal to another waiting thread
*/
public abstract void cancel();
/**
* Wait, without throwing InterruptedException, until signalled. On exit isSignalled() must be true.
* If the thread is interrupted in the meantime, the interrupted flag will be set.
*/
public void awaitUninterruptibly();
/**
* Wait until signalled, or throw an InterruptedException if interrupted before this happens.
* On normal exit isSignalled() must be true; however if InterruptedException is thrown isCancelled()
* will be true.
* @throws InterruptedException
*/
public void await() throws InterruptedException;
/**
* Wait until signalled, or the provided time is reached, or the thread is interrupted. If signalled,
* isSignalled() will be true on exit, and the method will return true; if timedout, the method will return
* false and isCancelled() will be true; if interrupted an InterruptedException will be thrown and isCancelled()
* will be true.
* @param nanos System.nanoTime() to wait until
* @return true if signalled, false if timed out
* @throws InterruptedException
*/
public boolean awaitUntil(long nanos) throws InterruptedException;
}
/**
* An abstract signal implementation
*/
public static abstract class AbstractSignal implements Signal
{
public void awaitUninterruptibly()
{
boolean interrupted = false;
while (!isSignalled())
{
if (Thread.interrupted())
interrupted = true;
LockSupport.park();
}
if (interrupted)
Thread.currentThread().interrupt();
checkAndClear();
}
public void await() throws InterruptedException
{
while (!isSignalled())
{
checkInterrupted();
LockSupport.park();
}
checkAndClear();
}
public boolean awaitUntil(long until) throws InterruptedException
{
long now;
while (until > (now = System.nanoTime()) && !isSignalled())
{
checkInterrupted();
long delta = until - now;
LockSupport.parkNanos(delta);
}
return checkAndClear();
}
private void checkInterrupted() throws InterruptedException
{
if (Thread.interrupted())
{
cancel();
throw new InterruptedException();
}
}
}
/**
* A signal registered with this WaitQueue
*/
private class RegisteredSignal extends AbstractSignal
{
private volatile Thread thread = Thread.currentThread();
volatile int state;
public boolean isSignalled()
{
return state == SIGNALLED;
}
public boolean isCancelled()
{
return state == CANCELLED;
}
public boolean isSet()
{
return state != NOT_SET;
}
private Thread signal()
{
if (!isSet() && signalledUpdater.compareAndSet(this, NOT_SET, SIGNALLED))
{
Thread thread = this.thread;
LockSupport.unpark(thread);
this.thread = null;
return thread;
}
return null;
}
public boolean checkAndClear()
{
if (!isSet() && signalledUpdater.compareAndSet(this, NOT_SET, CANCELLED))
{
thread = null;
cleanUpCancelled();
return false;
}
// must now be signalled assuming correct API usage
return true;
}
/**
* Should only be called by the registered thread. Indicates the signal can be retired,
* and if signalled propagates the signal to another waiting thread
*/
public void cancel()
{
if (isCancelled())
return;
if (!signalledUpdater.compareAndSet(this, NOT_SET, CANCELLED))
{
// must already be signalled - switch to cancelled and
state = CANCELLED;
// propagate the signal
WaitQueue.this.signal();
}
thread = null;
cleanUpCancelled();
}
}
/**
* A RegisteredSignal that stores a TimerContext, and stops the timer when either cancelled or
* finished waiting. i.e. if the timer is started when the signal is registered it tracks the
* time in between registering and invalidating the signal.
*/
private final class TimedSignal extends RegisteredSignal
{
private final Timer.Context context;
private TimedSignal(Timer.Context context)
{
this.context = context;
}
@Override
public boolean checkAndClear()
{
context.stop();
return super.checkAndClear();
}
@Override
public void cancel()
{
if (!isCancelled())
{
context.stop();
super.cancel();
}
}
}
/**
* An abstract signal wrapping multiple delegate signals
*/
private abstract static class MultiSignal extends AbstractSignal
{
final Signal[] signals;
protected MultiSignal(Signal[] signals)
{
this.signals = signals;
}
public boolean isCancelled()
{
for (Signal signal : signals)
if (!signal.isCancelled())
return false;
return true;
}
public boolean checkAndClear()
{
for (Signal signal : signals)
signal.checkAndClear();
return isSignalled();
}
public void cancel()
{
for (Signal signal : signals)
signal.cancel();
}
}
/**
* A Signal that wraps multiple Signals and returns when any single one of them would have returned
*/
private static class AnySignal extends MultiSignal
{
protected AnySignal(Signal ... signals)
{
super(signals);
}
public boolean isSignalled()
{
for (Signal signal : signals)
if (signal.isSignalled())
return true;
return false;
}
public boolean isSet()
{
for (Signal signal : signals)
if (signal.isSet())
return true;
return false;
}
}
/**
* A Signal that wraps multiple Signals and returns when all of them would have finished returning
*/
private static class AllSignal extends MultiSignal
{
protected AllSignal(Signal ... signals)
{
super(signals);
}
public boolean isSignalled()
{
for (Signal signal : signals)
if (!signal.isSignalled())
return false;
return true;
}
public boolean isSet()
{
for (Signal signal : signals)
if (!signal.isSet())
return false;
return true;
}
}
/**
* @param signals
* @return a signal that returns only when any of the provided signals would have returned
*/
public static Signal any(Signal ... signals)
{
return new AnySignal(signals);
}
/**
* @param signals
* @return a signal that returns only when all provided signals would have returned
*/
public static Signal all(Signal ... signals)
{
return new AllSignal(signals);
}
}