| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.sentry.provider.db.service.persistent; |
| |
| import org.apache.http.annotation.ThreadSafe; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import java.util.concurrent.PriorityBlockingQueue; |
| import java.util.concurrent.Semaphore; |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.TimeoutException; |
| import java.util.concurrent.atomic.AtomicLong; |
| |
| /** |
| * Waiting for counter to reach certain value. |
| * The counter starts from zero and its value increases over time. |
| * The class allows for multiple consumers waiting until the value of the |
| * counter reaches some value interesting to them. |
| * Consumers call {@link #waitFor(long)} which may either return |
| * immediately if the counter reached the specified value, or block |
| * until this value is reached. Consumers can also specify timeout for the |
| * {@link #waitFor(long)} in which case it may return {@link TimeoutException} |
| * when the wait was not successfull within the specified time limit. |
| * <p> |
| * All waiters should be waken up when the counter becomes equal or higher |
| * then the value they are waiting for. |
| * <p> |
| * The counter is updated by a single updater that should only increase the |
| * counter value. |
| * The updater calls the {@link #update(long)} method to update the counter |
| * value and this should wake up all threads waiting for any value smaller or |
| * equal to the new one. |
| * <p> |
| * The class is thread-safe. |
| * It is designed for use by multiple waiter threads and a single |
| * updater thread, but it will work correctly even in the presence of multiple |
| * updater threads. |
| */ |
| @ThreadSafe |
| public class CounterWait { |
| // Implementation notes. |
| // |
| // The implementation is based on: |
| // |
| // 1) Using an atomic counter value which guarantees consistency. |
| // Since everyone needs only to know when the counter value reached the |
| // certain value and the counter may only increase its value, |
| // it is safe to update the counter by another thread after its value |
| // was read. |
| // |
| // 2) Priority queue of waiters, sorted by their expected values. The smallest |
| // value is always at the top of the queue. The priority queue itself |
| // is thread-safe, so no locks are needed to protect access to it. |
| // |
| // Each waiter is implemented using a binary semaphore. |
| // This solves the problem of a wakeup that happens before the sleep - |
| // in this case the acquire() doesn't block and returns immediately. |
| // |
| // NOTE: We use PriorityBlockingQueue for waiters because it is thread-safe, |
| // we are not using its blocking queue semantics. |
| |
| private static final Logger LOGGER = LoggerFactory.getLogger(CounterWait.class); |
| |
| /** Counter value. May only increase. */ |
| private final AtomicLong currentId = new AtomicLong(0); |
| |
| private final long waitTimeout; |
| private final TimeUnit waitTimeUnit; |
| |
| /** |
| * Waiters sorted by the value of the counter they are waiting for. |
| * Note that {@link PriorityBlockingQueue} is thread-safe. |
| * We are not using this as a blocking queue, but as a synchronized |
| * PriorityQueue. |
| */ |
| private final PriorityBlockingQueue<ValueEvent> waiters = |
| new PriorityBlockingQueue<>(); |
| |
| /** |
| * Create an instance of CounterWait object that will not timeout during wait |
| */ |
| public CounterWait() { |
| this(0, TimeUnit.SECONDS); |
| } |
| |
| /** |
| * Create an instance of CounterWait object that will timeout during wait |
| * @param waitTimeoutSec maximum time in seconds to wait for counter |
| */ |
| public CounterWait(long waitTimeoutSec) { |
| this(waitTimeoutSec, TimeUnit.SECONDS); |
| } |
| |
| /** |
| * Create an instance of CounterWait object that will timeout during wait |
| * @param waitTimeout maximum time to wait for counter |
| * @param waitTimeUnit time units for wait |
| */ |
| public CounterWait(long waitTimeout, TimeUnit waitTimeUnit) { |
| this.waitTimeout = waitTimeout; |
| this.waitTimeUnit = waitTimeUnit; |
| } |
| |
| /** |
| * Update the counter value and wake up all threads waiting for this |
| * value or any value below it. |
| * <p> |
| * The counter value should only increase. |
| * An attempt to decrease the value is ignored. |
| * |
| * @param newValue the new counter value |
| */ |
| public synchronized void update(long newValue) { |
| // update() is synchronized so the value can't change. |
| long oldValue = currentId.get(); |
| LOGGER.debug("CounterWait update: oldValue = {}, newValue = {}", oldValue, newValue); |
| // Avoid doing extra work if not needed |
| if (oldValue == newValue) { |
| return; // no-op |
| } |
| |
| // Make sure the counter is never decremented. |
| if (newValue < oldValue) { |
| LOGGER.error("new counter value {} is smaller then the previous one {}", |
| newValue, oldValue); |
| return; // no-op |
| } |
| |
| currentId.set(newValue); |
| |
| // Wake up any threads waiting for a counter to reach this value. |
| wakeup(newValue); |
| } |
| |
| /** |
| * Explicitly reset the counter value to a new value, but allow setting to a |
| * smaller value. |
| * This should be used when we have some external event that resets the counter |
| * value space. |
| * @param newValue New counter value. If this is greater or equal then the current |
| * value, this is equivalent to {@link #update(long)}. Otherwise |
| * sets the counter to the new smaller value. |
| */ |
| public synchronized void reset(long newValue) { |
| long oldValue = currentId.get(); |
| LOGGER.debug("CounterWait reset: oldValue = {}, newValue = {}", oldValue, newValue); |
| |
| if (newValue > oldValue) { |
| update(newValue); |
| } else if (newValue < oldValue) { |
| LOGGER.warn("resetting counter from {} to smaller value {}", |
| oldValue, newValue); |
| currentId.set(newValue); |
| // No need to wakeup waiters since no one should wait on the smaller value |
| } |
| } |
| |
| |
| /** |
| * Wait for specified counter value. |
| * Returns immediately if the value is reached or blocks until the value |
| * is reached. |
| * Multiple threads can call the method concurrently. |
| * |
| * @param value requested counter value |
| * @return current counter value that should be no smaller then the requested |
| * value |
| * @throws InterruptedException if the wait was interrupted, TimeoutException if |
| * wait was not successfull within the timeout value specified at the construction time. |
| */ |
| public long waitFor(long value) throws InterruptedException, TimeoutException { |
| // Fast path - counter value already reached, no need to block |
| if (value <= currentId.get()) { |
| LOGGER.debug("Value {} reached", value); |
| return currentId.get(); |
| } |
| |
| // Enqueue the waiter for this value |
| ValueEvent eid = new ValueEvent(value); |
| waiters.put(eid); |
| |
| // It is possible that between the fast path check and the time the |
| // value event is enqueued, the counter value already reached the requested |
| // value. In this case we return immediately. |
| if (value <= currentId.get()) { |
| LOGGER.debug("Value {} reached", value); |
| return currentId.get(); |
| } |
| |
| // At this point we may be sure that by the time the event was enqueued, |
| // the counter was below the requested value. This means that update() |
| // is guaranteed to wake us up when the counter reaches the requested value. |
| // The wake up may actually happen before we start waiting, in this case |
| // the event's blocking queue will be non-empty and the waitFor() below |
| // will not block, so it is safe to wake up before the wait. |
| // So sit tight and wait patiently. |
| LOGGER.debug("Blocked, waiting for value {}", value); |
| eid.waitFor(); |
| return currentId.get(); |
| } |
| |
| /** |
| * Wake up any threads waiting for a counter to reach specified value |
| * Peek at the top of the queue. If the queue is empty or the top value |
| * exceeds the current value, we are done. Otherwise wakeup the top thread, |
| * remove the corresponding waiter and continue. |
| * <p> |
| * Note that the waiter may be removed under our nose by |
| * {@link #waitFor(long)} method, but this is Ok - in this case |
| * waiters.remove() will just return false. |
| * |
| * @param value current counter value |
| */ |
| private void wakeup(long value) { |
| while (true) { |
| // Get the top of the waiters queue or null if it is empty |
| ValueEvent e = waiters.poll(); |
| if (e == null) { |
| // Queue is empty - return. |
| return; |
| } |
| // No one to wake up, return event to the queue and exit |
| if (e.getValue() > value) { |
| waiters.add(e); |
| return; |
| } |
| // Due for wake-up call |
| LOGGER.debug("Unblocking, Value {} reached", e.getValue()); |
| e.wakeup(); |
| } |
| } |
| |
| // Useful for debugging |
| @Override |
| public String toString() { |
| return "CounterWait{" + "currentId=" + currentId + |
| ", waiters=" + waiters + "}"; |
| } |
| |
| /** |
| * Return number of waiters. This is mostly useful for metrics/debugging |
| * |
| * @return number of sleeping waiters |
| */ |
| public int waitersCount() { |
| return waiters.size(); |
| } |
| |
| /** |
| * Representation of the waiting event. |
| * The waiting event consists of the expected value and a binary semaphore. |
| * <p> |
| * Each thread waiting for the given value, creates a ValueEvent and tries |
| * to acquire a semaphore. This blocks until the semaphore is released. |
| * <p> |
| * ValueEvents are stored in priority queue sorted by value, so they should be |
| * comparable by the value. |
| */ |
| private class ValueEvent implements Comparable<ValueEvent> { |
| /** Value waited for. */ |
| private final long value; |
| /** Binary semaphore to synchronize waiters */ |
| private final Semaphore semaphore = new Semaphore(1); |
| |
| /** |
| * Instantiates a new Value event. |
| * |
| * @param v the expected value |
| */ |
| ValueEvent(long v) { |
| this.value = v; |
| // Acquire the semaphore. Subsequent calls to waitFor() will block until |
| // wakeup() releases the semaphore. |
| semaphore.acquireUninterruptibly(); // Will not block |
| } |
| |
| /** Wait until signaled or interrupted. May return immediately if already signalled. */ |
| void waitFor() throws InterruptedException, TimeoutException { |
| if (waitTimeout == 0) { |
| semaphore.acquire(); |
| return; |
| } |
| if (!semaphore.tryAcquire(waitTimeout, waitTimeUnit)) { |
| throw new TimeoutException(); |
| } |
| } |
| |
| /** @return the value we are waiting for. */ |
| long getValue() { |
| return value; |
| } |
| |
| /** Wakeup the waiting thread. */ |
| void wakeup() { |
| semaphore.release(); |
| } |
| |
| /** |
| * Compare objects by value. |
| */ |
| @Override |
| public int compareTo(final ValueEvent o) { |
| return value == o.value ? 0 |
| : value < o.value ? -1 |
| : 1; |
| } |
| |
| /** |
| * Use identity comparison of objects. |
| */ |
| @Override |
| public boolean equals(final Object o) { |
| return (this == o); |
| } |
| |
| @Override |
| public int hashCode() { |
| return (int) (value ^ (value >>> 32)); |
| } |
| |
| @Override |
| public String toString() { |
| return String.valueOf(value); |
| } |
| } |
| } |