// blob: 9977bcda8d113f4cbb8e28c61c67c4139a53ecef [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.connector.base.source.reader.synchronization;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.util.FlinkRuntimeException;
import javax.annotation.concurrent.GuardedBy;
import java.lang.reflect.Field;
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import static org.apache.flink.util.Preconditions.checkArgument;
/**
 * A custom implementation of blocking queue in combination with a {@link CompletableFuture} that is
 * used in the hand-over of data from a producing thread to a consuming thread. This
 * FutureCompletingBlockingQueue has the following features:
 *
 * <h3>Consumer Notifications</h3>
 *
 * <p>Rather than letting consumers block on the {@link #take()} method, or have them poll the
 * {@link #poll()} method, this queue offers a {@link CompletableFuture} (obtained via the {@link
 * #getAvailabilityFuture()} method) that gets completed whenever the queue is non-empty. A consumer
 * can thus subscribe to asynchronous notifications for availability by adding a handler to the
 * obtained {@code CompletableFuture}.
 *
 * <p>The future may also be completed by an explicit call to {@link #notifyAvailable()}. That way
 * the consumer may be notified of a situation/condition without adding an element to the queue.
 *
 * <p>Availability is reset when a call to {@link #poll()} (or {@link #take()}) finds an empty
 * queue or results in an empty queue (takes the last element).
 *
 * <p>Note that this model generally assumes that <i>false positives</i> are okay, meaning that the
 * availability future completes despite there being no data available in the queue. The consumer
 * is responsible for polling data and obtaining another future to wait on. This is similar to the
 * way that Java's Monitors and Conditions can have the <i>spurious wakeup</i> of the waiting
 * threads and commonly need to be used in a loop with the waiting condition.
 *
 * <h3>Producer Wakeup</h3>
 *
 * <p>The queue supports gracefully waking up producing threads that are blocked due to the queue
 * capacity limits, without interrupting the thread. This is done via the {@link
 * #wakeUpPuttingThread(int)} method.
 *
 * @param <T> the type of the elements in the queue.
 */
@Internal
public class FutureCompletingBlockingQueue<T> {

    /**
     * A constant future that is complete, indicating availability. Using this constant in cases
     * that are guaranteed available helps short-circuiting some checks and avoiding volatile memory
     * operations.
     */
    public static final CompletableFuture<Void> AVAILABLE = getAvailableFuture();

    /** The default capacity for the queue. */
    private static final int DEFAULT_CAPACITY = 2;

    // ------------------------------------------------------------------------

    /** The maximum capacity of the queue. */
    private final int capacity;

    /**
     * The availability future. This doubles as a "non empty" condition. This value is never null.
     */
    private CompletableFuture<Void> currentFuture;

    /** The lock for synchronization. */
    private final Lock lock;

    /** The element queue. */
    @GuardedBy("lock")
    private final Queue<T> queue;

    /**
     * The conditions of the threads that are currently blocked in {@link #put(int, Object)}, in
     * the order in which they started waiting. An entry is present only while its thread waits.
     */
    @GuardedBy("lock")
    private final Queue<Condition> notFull;

    /** The per-thread conditions and wakeUp flags, indexed by thread index. */
    @GuardedBy("lock")
    private ConditionAndFlag[] putConditionAndFlags;

    /** Creates a queue with the default capacity. */
    public FutureCompletingBlockingQueue() {
        this(DEFAULT_CAPACITY);
    }

    /**
     * Creates a queue with the given capacity.
     *
     * @param capacity the maximum number of elements the queue holds; must be greater than zero.
     */
    public FutureCompletingBlockingQueue(int capacity) {
        checkArgument(capacity > 0, "capacity must be > 0");
        this.capacity = capacity;
        this.queue = new ArrayDeque<>(capacity);
        this.lock = new ReentrantLock();
        this.putConditionAndFlags = new ConditionAndFlag[1];
        this.notFull = new ArrayDeque<>();
        // initially the queue is empty and thus unavailable
        this.currentFuture = new CompletableFuture<>();
    }

    // ------------------------------------------------------------------------
    //  Future / Notification logic
    // ------------------------------------------------------------------------

    /**
     * Returns the availability future. If the queue is non-empty, then this future will already be
     * complete. Otherwise the obtained future is guaranteed to get completed the next time the
     * queue becomes non-empty, or a notification happens via {@link #notifyAvailable()}.
     *
     * <p>It is important that a completed future is no guarantee that the next call to {@link
     * #poll()} will return a non-null element. If there are concurrent consumers, another consumer
     * may have taken the available element. Or there was no element in the first place, because the
     * future was completed through a call to {@link #notifyAvailable()}.
     *
     * <p>For that reason, it is important to call this method (to obtain a new future) every time
     * again after {@link #poll()} returned null and you want to wait for data.
     *
     * @return the current availability future; never null.
     */
    public CompletableFuture<Void> getAvailabilityFuture() {
        // NOTE(review): the field is neither volatile nor read under the lock here. Every future
        // stored in the field is completed by moveToAvailable() before it gets replaced, so a
        // stale read can only produce a false positive. Presumably intentional; confirm before
        // changing.
        return currentFuture;
    }

    /**
     * Makes sure the availability future is complete, if it is not complete already. All futures
     * returned by previous calls to {@link #getAvailabilityFuture()} are guaranteed to be
     * completed.
     *
     * <p>All future calls to the method will return a completed future, until the point that the
     * availability is reset via calls to {@link #poll()} that leave the queue empty.
     */
    public void notifyAvailable() {
        lock.lock();
        try {
            moveToAvailable();
        } finally {
            lock.unlock();
        }
    }

    /** Internal utility to make sure that the current future is complete (until reset). */
    @GuardedBy("lock")
    private void moveToAvailable() {
        final CompletableFuture<Void> current = currentFuture;
        if (current != AVAILABLE) {
            // publish the constant first, then complete the future that was handed out before
            currentFuture = AVAILABLE;
            current.complete(null);
        }
    }

    /** Makes sure the availability future is incomplete, if it was complete before. */
    @GuardedBy("lock")
    private void moveToUnAvailable() {
        if (currentFuture == AVAILABLE) {
            currentFuture = new CompletableFuture<>();
        }
    }

    // ------------------------------------------------------------------------
    //  Blocking Queue Logic
    // ------------------------------------------------------------------------

    /**
     * Put an element into the queue. The thread blocks if the queue is full.
     *
     * <p>While the queue is full, the wake-up flag of the given thread index is checked before
     * every wait. If it was set via {@link #wakeUpPuttingThread(int)}, the flag is reset and the
     * method returns {@code false} without adding the element.
     *
     * @param threadIndex the index of the thread.
     * @param element the element to put.
     * @return true if the element has been successfully put into the queue, false otherwise.
     * @throws InterruptedException when the thread is interrupted.
     * @throws NullPointerException if the element is null.
     */
    public boolean put(int threadIndex, T element) throws InterruptedException {
        if (element == null) {
            throw new NullPointerException();
        }
        lock.lockInterruptibly();
        try {
            // loop: a woken thread must re-check the capacity, another producer may have been
            // faster
            while (queue.size() >= capacity) {
                if (getAndResetWakeUpFlag(threadIndex)) {
                    return false;
                }
                waitOnPut(threadIndex);
            }
            enqueue(element);
            return true;
        } finally {
            lock.unlock();
        }
    }

    /**
     * <b>Warning:</b> This is a dangerous method and should only be used for testing convenience. A
     * method that blocks until availability does not go together well with the concept of
     * asynchronous notifications and non-blocking polling.
     *
     * <p>Get and remove the first element from the queue. The call blocks if the queue is empty.
     * The problem with this method is that it may loop internally until an element is available and
     * that way eagerly reset the availability future. If a consumer thread is blocked in taking an
     * element, it will receive availability notifications from {@link #notifyAvailable()} and
     * immediately reset them by calling {@link #poll()} and finding the queue empty.
     *
     * @return the first element in the queue.
     * @throws InterruptedException when the thread is interrupted.
     */
    @VisibleForTesting
    public T take() throws InterruptedException {
        T next;
        while ((next = poll()) == null) {
            // use the future to wait for availability to avoid busy waiting
            try {
                getAvailabilityFuture().get();
            } catch (ExecutionException | CompletionException e) {
                // this should never happen, but we propagate just in case
                throw new FlinkRuntimeException("exception in queue future completion", e);
            }
        }
        return next;
    }

    /**
     * Get and remove the first element from the queue. Null is returned if the queue is empty. If
     * this makes the queue empty (takes the last element) or finds the queue already empty, then
     * this resets the availability notifications. The next call to {@link #getAvailabilityFuture()}
     * will then return a non-complete future that completes only the next time that the queue
     * becomes non-empty or the {@link #notifyAvailable()} method is called.
     *
     * @return the first element from the queue, or Null if the queue is empty.
     */
    public T poll() {
        lock.lock();
        try {
            if (queue.size() == 0) {
                moveToUnAvailable();
                return null;
            }
            return dequeue();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Get the first element from the queue without removing it.
     *
     * @return the first element in the queue, or Null if the queue is empty.
     */
    public T peek() {
        lock.lock();
        try {
            return queue.peek();
        } finally {
            lock.unlock();
        }
    }

    /** Gets the size of the queue. */
    public int size() {
        lock.lock();
        try {
            return queue.size();
        } finally {
            lock.unlock();
        }
    }

    /** Checks whether the queue is empty. */
    public boolean isEmpty() {
        lock.lock();
        try {
            return queue.isEmpty();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Checks the remaining capacity in the queue. That is the difference between the maximum
     * capacity and the current number of elements in the queue.
     */
    public int remainingCapacity() {
        lock.lock();
        try {
            return capacity - queue.size();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Gracefully wakes up the thread with the given {@code threadIndex} if it is blocked in adding
     * an element to the queue. If the thread is blocked in {@link #put(int, Object)} it will
     * immediately return from the method with a return value of false.
     *
     * <p>If this method is called, the next time the thread with the given index is about to be
     * blocked in adding an element, it may immediately wake up and return.
     *
     * @param threadIndex The number identifying the thread.
     */
    public void wakeUpPuttingThread(int threadIndex) {
        lock.lock();
        try {
            // guarantees that the slot for this index exists and is non-null
            maybeCreateCondition(threadIndex);
            final ConditionAndFlag caf = putConditionAndFlags[threadIndex];
            caf.setWakeUp(true);
            caf.condition().signal();
        } finally {
            lock.unlock();
        }
    }

    // --------------- private helpers -------------------------

    /**
     * Adds the element, completes the availability future if the queue was empty before, and wakes
     * the next blocked producer if there is still room for at least one more element.
     */
    @GuardedBy("lock")
    private void enqueue(T element) {
        final int sizeBefore = queue.size();
        queue.add(element);
        if (sizeBefore == 0) {
            moveToAvailable();
        }
        if (sizeBefore < capacity - 1 && !notFull.isEmpty()) {
            signalNextPutter();
        }
    }

    /**
     * Removes the head element, wakes the next blocked producer if the queue was full before, and
     * resets the availability future if the queue became empty.
     */
    @GuardedBy("lock")
    private T dequeue() {
        final int sizeBefore = queue.size();
        final T element = queue.poll();
        if (sizeBefore == capacity && !notFull.isEmpty()) {
            signalNextPutter();
        }
        if (queue.isEmpty()) {
            moveToUnAvailable();
        }
        return element;
    }

    /**
     * Blocks the calling producer on its own condition until it is signalled, woken up, or
     * interrupted.
     *
     * <p>The condition is registered in {@code notFull} only for the duration of the wait. Without
     * the de-registration, a thread that left the wait through {@link #wakeUpPuttingThread(int)},
     * an interrupt, or a spurious wake-up would leave a stale entry behind. A later {@link
     * #signalNextPutter()} would then signal that stale condition, which nobody waits on, and a
     * producer that is really blocked would stay blocked although the queue has free capacity.
     *
     * @param fetcherIndex the index of the calling thread.
     * @throws InterruptedException when the thread is interrupted while waiting.
     */
    @GuardedBy("lock")
    private void waitOnPut(int fetcherIndex) throws InterruptedException {
        maybeCreateCondition(fetcherIndex);
        final Condition cond = putConditionAndFlags[fetcherIndex].condition();
        notFull.add(cond);
        boolean returnedNormally = false;
        try {
            cond.await();
            returnedNormally = true;
        } finally {
            // await() re-acquires the lock before returning or throwing, so this is guarded
            final boolean wasStillRegistered = notFull.remove(cond);
            // If a signaller already took our condition out of the queue but we leave
            // exceptionally, that wake-up would be lost: no other thread waits on this per-thread
            // condition. Pass it on so that free capacity does not go unnoticed.
            if (!returnedNormally && !wasStillRegistered && queue.size() < capacity) {
                signalNextPutter();
            }
        }
    }

    /** Signals the longest-waiting blocked producer, if there is one. */
    @GuardedBy("lock")
    private void signalNextPutter() {
        if (!notFull.isEmpty()) {
            notFull.poll().signal();
        }
    }

    /** Grows the per-thread array if needed and lazily creates the entry for the given index. */
    @GuardedBy("lock")
    private void maybeCreateCondition(int threadIndex) {
        if (putConditionAndFlags.length < threadIndex + 1) {
            putConditionAndFlags = Arrays.copyOf(putConditionAndFlags, threadIndex + 1);
        }
        if (putConditionAndFlags[threadIndex] == null) {
            putConditionAndFlags[threadIndex] = new ConditionAndFlag(lock.newCondition());
        }
    }

    /**
     * Returns whether the wake-up flag of the given thread index is set, and clears it if so.
     *
     * @return true if the flag was set (it is reset by this call), false otherwise.
     */
    @GuardedBy("lock")
    private boolean getAndResetWakeUpFlag(int threadIndex) {
        maybeCreateCondition(threadIndex);
        if (putConditionAndFlags[threadIndex].getWakeUp()) {
            putConditionAndFlags[threadIndex].setWakeUp(false);
            return true;
        }
        return false;
    }

    // --------------- private per thread state ------------

    /** Pairs the condition a producer thread waits on with that thread's wake-up flag. */
    private static class ConditionAndFlag {
        private final Condition cond;
        private boolean wakeUp;

        private ConditionAndFlag(Condition cond) {
            this.cond = cond;
            this.wakeUp = false;
        }

        private Condition condition() {
            return cond;
        }

        private boolean getWakeUp() {
            return wakeUp;
        }

        private void setWakeUp(boolean value) {
            wakeUp = value;
        }
    }

    // ------------------------------------------------------------------------
    //  utilities
    // ------------------------------------------------------------------------

    /**
     * Looks up the shared "available" future reflectively and falls back to a freshly completed
     * future if the lookup fails for any reason.
     */
    @SuppressWarnings("unchecked")
    private static CompletableFuture<Void> getAvailableFuture() {
        // this is a way to obtain the AvailabilityProvider.AVAILABLE future until we decide to
        // move the class from the runtime module to the core module
        try {
            final Class<?> clazz =
                    Class.forName("org.apache.flink.runtime.io.AvailabilityProvider");
            final Field field = clazz.getDeclaredField("AVAILABLE");
            return (CompletableFuture<Void>) field.get(null);
        } catch (Throwable ignored) {
            // best effort: this runs during class initialization, so any failure of the
            // reflective lookup is answered with a private completed future instead
            return CompletableFuture.completedFuture(null);
        }
    }
}