| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.flink.connector.base.source.reader.synchronization; |
| |
| import org.apache.flink.annotation.Internal; |
| import org.apache.flink.annotation.VisibleForTesting; |
| import org.apache.flink.util.FlinkRuntimeException; |
| |
| import javax.annotation.concurrent.GuardedBy; |
| |
| import java.lang.reflect.Field; |
| import java.util.ArrayDeque; |
| import java.util.Arrays; |
| import java.util.Queue; |
| import java.util.concurrent.CompletableFuture; |
| import java.util.concurrent.CompletionException; |
| import java.util.concurrent.ExecutionException; |
| import java.util.concurrent.locks.Condition; |
| import java.util.concurrent.locks.Lock; |
| import java.util.concurrent.locks.ReentrantLock; |
| |
| import static org.apache.flink.util.Preconditions.checkArgument; |
| |
| /** |
| * A custom implementation of blocking queue in combination with a {@link CompletableFuture} that is |
| * used in the hand-over of data from a producing thread to a consuming thread. This |
| * FutureCompletingBlockingQueue has the following features: |
| * |
| * <h3>Consumer Notifications</h3> |
| * |
| * <p>Rather than letting consumers block on the {@link #take()} method, or have them poll the |
| * {@link #poll()} method, this queue offers a {@link CompletableFuture} (obtained via the {@link |
| * #getAvailabilityFuture()} method) that gets completed whenever the queue is non-empty. A consumer |
| * can thus subscribe to asynchronous notifications for availability by adding a handler to the |
| * obtained {@code CompletableFuture}. |
| * |
| * <p>The future may also be completed by an explicit call to {@link #notifyAvailable()}. That way |
| * the consumer may be notified of a situation/condition without adding an element to the queue. |
| * |
| * <p>Availability is reset when a call to {@link #poll()} (or {@link #take()}) finds an empty queue |
| * or results in an empty queue (takes the last element). |
| * |
| * <p>Note that this model generally assumes that <i>false positives</i> are okay, meaning that the |
| * availability future completes despite there being no data available in the queue. The consumer |
| * is responsible for polling data and obtaining another future to wait on. This is similar to the |
| * way that Java's Monitors and Conditions can have the <i>spurious wakeup</i> of the waiting |
| * threads and commonly need to be used in a loop with the waiting condition. |
| * |
| * <h3>Producer Wakeup</h3> |
| * |
| * <p>The queue supports gracefully waking up producing threads that are blocked due to the queue |
| * capacity limits, without interrupting the thread. This is done via the {@link |
| * #wakeUpPuttingThread(int)} method. |
| * |
| * @param <T> the type of the elements in the queue. |
| */ |
| @Internal |
| public class FutureCompletingBlockingQueue<T> { |
| |
| /** |
| * A constant future that is complete, indicating availability. Using this constant in cases |
| * that are guaranteed available helps short-circuiting some checks and avoiding volatile memory |
| * operations. |
| */ |
| public static final CompletableFuture<Void> AVAILABLE = getAvailableFuture(); |
| |
| /** The default capacity for the queue. */ |
| private static final int DEFAULT_CAPACITY = 2; |
| |
| // ------------------------------------------------------------------------ |
| |
| /** The maximum capacity of the queue. */ |
| private final int capacity; |
| |
| /** |
| * The availability future. This doubles as a "non empty" condition. This value is never null. |
| */ |
| private CompletableFuture<Void> currentFuture; |
| |
| /** The lock for synchronization. */ |
| private final Lock lock; |
| |
| /** The element queue. */ |
| @GuardedBy("lock") |
| private final Queue<T> queue; |
| |
| /** The per-thread conditions that are waiting on putting elements. */ |
| @GuardedBy("lock") |
| private final Queue<Condition> notFull; |
| |
| /** The per-thread conditions and wakeUp flags. */ |
| @GuardedBy("lock") |
| private ConditionAndFlag[] putConditionAndFlags; |
| |
/** Creates a queue with the default capacity. */
public FutureCompletingBlockingQueue() {
    this(DEFAULT_CAPACITY);
}

/**
 * Creates a queue that holds at most {@code capacity} elements.
 *
 * @param capacity the maximum number of elements in the queue; must be greater than zero.
 */
public FutureCompletingBlockingQueue(int capacity) {
    checkArgument(capacity > 0, "capacity must be > 0");

    this.lock = new ReentrantLock();
    this.capacity = capacity;
    this.queue = new ArrayDeque<>(capacity);
    this.notFull = new ArrayDeque<>();
    // one slot to start with; the array grows when higher thread indexes show up
    this.putConditionAndFlags = new ConditionAndFlag[1];

    // the queue starts out empty, so the first availability future is not complete
    this.currentFuture = new CompletableFuture<>();
}
| |
| // ------------------------------------------------------------------------ |
| // Future / Notification logic |
| // ------------------------------------------------------------------------ |
| |
| /** |
| * Returns the availability future. If the queue is non-empty, then this future will already be |
| * complete. Otherwise the obtained future is guaranteed to get completed the next time the |
| * queue becomes non-empty, or a notification happens via {@link #notifyAvailable()}. |
| * |
| * <p>It is important that a completed future is no guarantee that the next call to {@link |
| * #poll()} will return a non-null element. If there are concurrent consumer, another consumer |
| * may have taken the available element. Or there was no element in the first place, because the |
| * future was completed through a call to {@link #notifyAvailable()}. |
| * |
| * <p>For that reason, it is important to call this method (to obtain a new future) every time |
| * again after {@link #poll()} returned null and you want to wait for data. |
| */ |
| public CompletableFuture<Void> getAvailabilityFuture() { |
| return currentFuture; |
| } |
| |
| /** |
| * Makes sure the availability future is complete, if it is not complete already. All futures |
| * returned by previous calls to {@link #getAvailabilityFuture()} are guaranteed to be |
| * completed. |
| * |
| * <p>All future calls to the method will return a completed future, until the point that the |
| * availability is reset via calls to {@link #poll()} that leave the queue empty. |
| */ |
| public void notifyAvailable() { |
| lock.lock(); |
| try { |
| moveToAvailable(); |
| } finally { |
| lock.unlock(); |
| } |
| } |
| |
| /** Internal utility to make sure that the current future futures are complete (until reset). */ |
| @GuardedBy("lock") |
| private void moveToAvailable() { |
| final CompletableFuture<Void> current = currentFuture; |
| if (current != AVAILABLE) { |
| currentFuture = AVAILABLE; |
| current.complete(null); |
| } |
| } |
| |
| /** Makes sure the availability future is incomplete, if it was complete before. */ |
| @GuardedBy("lock") |
| private void moveToUnAvailable() { |
| if (currentFuture == AVAILABLE) { |
| currentFuture = new CompletableFuture<>(); |
| } |
| } |
| |
| // ------------------------------------------------------------------------ |
| // Blocking Queue Logic |
| // ------------------------------------------------------------------------ |
| |
| /** |
| * Put an element into the queue. The thread blocks if the queue is full. |
| * |
| * @param threadIndex the index of the thread. |
| * @param element the element to put. |
| * @return true if the element has been successfully put into the queue, false otherwise. |
| * @throws InterruptedException when the thread is interrupted. |
| */ |
| public boolean put(int threadIndex, T element) throws InterruptedException { |
| if (element == null) { |
| throw new NullPointerException(); |
| } |
| lock.lockInterruptibly(); |
| try { |
| while (queue.size() >= capacity) { |
| if (getAndResetWakeUpFlag(threadIndex)) { |
| return false; |
| } |
| waitOnPut(threadIndex); |
| } |
| enqueue(element); |
| return true; |
| } finally { |
| lock.unlock(); |
| } |
| } |
| |
| /** |
| * <b>Warning:</b> This is a dangerous method and should only be used for testing convenience. A |
| * method that blocks until availability does not go together well with the concept of |
| * asynchronous notifications and non-blocking polling. |
| * |
| * <p>Get and remove the first element from the queue. The call blocks if the queue is empty. |
| * The problem with this method is that it may loop internally until an element is available and |
| * that way eagerly reset the availability future. If a consumer thread is blocked in taking an |
| * element, it will receive availability notifications from {@link #notifyAvailable()} and |
| * immediately reset them by calling {@link #poll()} and finding the queue empty. |
| * |
| * @return the first element in the queue. |
| * @throws InterruptedException when the thread is interrupted. |
| */ |
| @VisibleForTesting |
| public T take() throws InterruptedException { |
| T next; |
| while ((next = poll()) == null) { |
| // use the future to wait for availability to avoid busy waiting |
| try { |
| getAvailabilityFuture().get(); |
| } catch (ExecutionException | CompletionException e) { |
| // this should never happen, but we propagate just in case |
| throw new FlinkRuntimeException("exception in queue future completion", e); |
| } |
| } |
| return next; |
| } |
| |
| /** |
| * Get and remove the first element from the queue. Null is returned if the queue is empty. If |
| * this makes the queue empty (takes the last element) or finds the queue already empty, then |
| * this resets the availability notifications. The next call to {@link #getAvailabilityFuture()} |
| * will then return a non-complete future that completes only the next time that the queue |
| * becomes non-empty or the {@link #notifyAvailable()} method is called. |
| * |
| * @return the first element from the queue, or Null if the queue is empty. |
| */ |
| public T poll() { |
| lock.lock(); |
| try { |
| if (queue.size() == 0) { |
| moveToUnAvailable(); |
| return null; |
| } |
| return dequeue(); |
| } finally { |
| lock.unlock(); |
| } |
| } |
| |
| /** |
| * Get the first element from the queue without removing it. |
| * |
| * @return the first element in the queue, or Null if the queue is empty. |
| */ |
| public T peek() { |
| lock.lock(); |
| try { |
| return queue.peek(); |
| } finally { |
| lock.unlock(); |
| } |
| } |
| |
| /** Gets the size of the queue. */ |
| public int size() { |
| lock.lock(); |
| try { |
| return queue.size(); |
| } finally { |
| lock.unlock(); |
| } |
| } |
| |
| /** Checks whether the queue is empty. */ |
| public boolean isEmpty() { |
| lock.lock(); |
| try { |
| return queue.isEmpty(); |
| } finally { |
| lock.unlock(); |
| } |
| } |
| |
| /** |
| * Checks the remaining capacity in the queue. That is the difference between the maximum |
| * capacity and the current number of elements in the queue. |
| */ |
| public int remainingCapacity() { |
| lock.lock(); |
| try { |
| return capacity - queue.size(); |
| } finally { |
| lock.unlock(); |
| } |
| } |
| |
| /** |
| * Gracefully wakes up the thread with the given {@code threadIndex} if it is blocked in adding |
| * an element. to the queue. If the thread is blocked in {@link #put(int, Object)} it will |
| * immediately return from the method with a return value of false. |
| * |
| * <p>If this method is called, the next time the thread with the given index is about to be |
| * blocked in adding an element, it may immediately wake up and return. |
| * |
| * @param threadIndex The number identifying the thread. |
| */ |
| public void wakeUpPuttingThread(int threadIndex) { |
| lock.lock(); |
| try { |
| maybeCreateCondition(threadIndex); |
| ConditionAndFlag caf = putConditionAndFlags[threadIndex]; |
| if (caf != null) { |
| caf.setWakeUp(true); |
| caf.condition().signal(); |
| } |
| } finally { |
| lock.unlock(); |
| } |
| } |
| |
| // --------------- private helpers ------------------------- |
| |
| @GuardedBy("lock") |
| private void enqueue(T element) { |
| final int sizeBefore = queue.size(); |
| queue.add(element); |
| if (sizeBefore == 0) { |
| moveToAvailable(); |
| } |
| if (sizeBefore < capacity - 1 && !notFull.isEmpty()) { |
| signalNextPutter(); |
| } |
| } |
| |
| @GuardedBy("lock") |
| private T dequeue() { |
| final int sizeBefore = queue.size(); |
| final T element = queue.poll(); |
| if (sizeBefore == capacity && !notFull.isEmpty()) { |
| signalNextPutter(); |
| } |
| if (queue.isEmpty()) { |
| moveToUnAvailable(); |
| } |
| return element; |
| } |
| |
| @GuardedBy("lock") |
| private void waitOnPut(int fetcherIndex) throws InterruptedException { |
| maybeCreateCondition(fetcherIndex); |
| Condition cond = putConditionAndFlags[fetcherIndex].condition(); |
| notFull.add(cond); |
| cond.await(); |
| } |
| |
| @GuardedBy("lock") |
| private void signalNextPutter() { |
| if (!notFull.isEmpty()) { |
| notFull.poll().signal(); |
| } |
| } |
| |
| @GuardedBy("lock") |
| private void maybeCreateCondition(int threadIndex) { |
| if (putConditionAndFlags.length < threadIndex + 1) { |
| putConditionAndFlags = Arrays.copyOf(putConditionAndFlags, threadIndex + 1); |
| } |
| |
| if (putConditionAndFlags[threadIndex] == null) { |
| putConditionAndFlags[threadIndex] = new ConditionAndFlag(lock.newCondition()); |
| } |
| } |
| |
| @GuardedBy("lock") |
| private boolean getAndResetWakeUpFlag(int threadIndex) { |
| maybeCreateCondition(threadIndex); |
| if (putConditionAndFlags[threadIndex].getWakeUp()) { |
| putConditionAndFlags[threadIndex].setWakeUp(false); |
| return true; |
| } |
| return false; |
| } |
| |
| // --------------- private per thread state ------------ |
| |
| private static class ConditionAndFlag { |
| private final Condition cond; |
| private boolean wakeUp; |
| |
| private ConditionAndFlag(Condition cond) { |
| this.cond = cond; |
| this.wakeUp = false; |
| } |
| |
| private Condition condition() { |
| return cond; |
| } |
| |
| private boolean getWakeUp() { |
| return wakeUp; |
| } |
| |
| private void setWakeUp(boolean value) { |
| wakeUp = value; |
| } |
| } |
| |
| // ------------------------------------------------------------------------ |
| // utilities |
| // ------------------------------------------------------------------------ |
| |
| @SuppressWarnings("unchecked") |
| private static CompletableFuture<Void> getAvailableFuture() { |
| // this is a way to obtain the AvailabilityProvider.AVAILABLE future until we decide to |
| // move the class from the runtime module to the core module |
| try { |
| final Class<?> clazz = |
| Class.forName("org.apache.flink.runtime.io.AvailabilityProvider"); |
| final Field field = clazz.getDeclaredField("AVAILABLE"); |
| return (CompletableFuture<Void>) field.get(null); |
| } catch (Throwable t) { |
| return CompletableFuture.completedFuture(null); |
| } |
| } |
| } |