| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.ignite.internal.util; |
| |
| import java.lang.invoke.MethodHandles; |
| import java.lang.invoke.VarHandle; |
| import java.util.concurrent.TimeUnit; |
| import org.apache.ignite.internal.tostring.S; |
| |
| /** |
| * Spin read-write lock. |
| * Its blocking methods use the spinwait strategy. When they do so, they are not interruptible (that is, they do not break their loop on |
| * interruption signal). |
| * |
| * <p>The locks are reentrant (that is, the same thread can acquire the same lock a few times in a row and then |
| * release them same number of times. |
| * |
| * <p>Write lock acquire requests are prioritized over read lock acquire requests. That is, if both read and write lock |
| * acquire requests are received when the write lock is held by someone else, then, on its release, the write lock attempt will be served |
| * first. |
| */ |
| public class IgniteSpinReadWriteLock { |
| /** Signals that nobody currently owns the read lock. */ |
| private static final long NO_OWNER = -1; |
| |
| /** |
| * State -1 means that the write lock is acquired. |
| * |
| * @see #state |
| */ |
| private static final int WRITE_LOCKED = -1; |
| |
| /** |
| * State 0 means that both read and write locks are available for acquiring. |
| * |
| * @see #state |
| */ |
| private static final int AVAILABLE = 0; |
| |
| /** How much time to sleep on each iteration of a spin loop (milliseconds). */ |
| private static final int SLEEP_MILLIS = 10; |
| |
| /** {@link VarHandle} used to access the {@code pendingWLocks} field. */ |
| private static final VarHandle PENDING_WLOCKS_VH; |
| |
| /** {@link VarHandle} used to access the {@code state} field. */ |
| private static final VarHandle STATE_VH; |
| |
| static { |
| try { |
| STATE_VH = MethodHandles.lookup() |
| .findVarHandle(IgniteSpinReadWriteLock.class, "state", int.class); |
| |
| PENDING_WLOCKS_VH = MethodHandles.lookup() |
| .findVarHandle(IgniteSpinReadWriteLock.class, "pendingWriteLocks", int.class); |
| } catch (ReflectiveOperationException e) { |
| throw new ExceptionInInitializerError(e); |
| } |
| } |
| |
| /** Number of times read lock was acquired, per thread (used to track reentrance). */ |
| private final ThreadLocal<Integer> readLockEntryCnt = ThreadLocal.withInitial(() -> 0); |
| |
| /** |
| * Main state of the lock. |
| * <ul> |
| * <li>Positive when the read lock has been acquired by at least one thread; in such case, this number equals |
| * to the number of threads holding the read lock. In such state, the read lock may be acquired by any thread, |
| * while an attempt to acquire the write lock will block or fail.</li> |
| * <li>Zero when neither the read lock, nor the write lock has been acquired by any thread. This state allows |
| * a thread to acquire either the read or the write lock at will.</li> |
| * <li>-1 when the write lock has been acquired by exactly one thread. In such state, any attempt to acquire the read |
| * or the write lock by any thread (but the thread holding the write lock) will block or fail.</li> |
| * </ul> |
| */ |
| private volatile int state; |
| |
| /** |
| * Number of pending write attempts to acquire the write lock. Currently it is only used to prioritize write lock attempts over read |
| * lock attempts when the write lock has been released (so, if both an attempt to acquire the write lock and an attempt to acquire the |
| * read lock are waiting for write lock to be released, a write lock attempt will be served first when the release happens). |
| */ |
| private volatile int pendingWriteLocks; |
| |
| /** ID of the thread holding write lock (or {@link #NO_OWNER} if the write lock is not held). */ |
| private long writeLockOwner = NO_OWNER; |
| |
| /** Number of times the write lock holder locked the write lock (used to track reentrance). */ |
| private int writeLockEntryCnt; |
| |
| /** |
| * Acquires the read lock. If the write lock is held by another thread, this blocks until the write lock is released (and until all |
| * concurrent write locks are acquired and released, as this class pripritizes write lock attempts over read lock attempts). |
| */ |
| @SuppressWarnings("BusyWait") |
| public void readLock() { |
| int cnt = readLockEntryCnt.get(); |
| |
| // Read lock reentry or acquiring read lock while holding write lock. |
| if (alreadyHoldingAnyLock(cnt)) { |
| incrementCurrentThreadReadLockCount(cnt); |
| |
| return; |
| } |
| |
| boolean interrupted = false; |
| |
| while (true) { |
| int curState = state; |
| |
| assert curState >= WRITE_LOCKED; |
| |
| if (writeLockedOrGoingToBe(curState)) { |
| try { |
| Thread.sleep(SLEEP_MILLIS); |
| } catch (InterruptedException ignored) { |
| interrupted = true; |
| } |
| |
| continue; |
| } |
| |
| if (tryAdvanceStateToReadLocked(curState)) { |
| if (interrupted) { |
| Thread.currentThread().interrupt(); |
| } |
| |
| break; |
| } |
| } |
| |
| readLockEntryCnt.set(1); |
| } |
| |
| /** |
| * Whether the current thread already holds any lock. |
| * |
| * @param currentThreadReadLockAcquiredCount how many times current thread acquired (without releasing yet) the read lock |
| * @return true if current thread already holds any lock |
| */ |
| private boolean alreadyHoldingAnyLock(int currentThreadReadLockAcquiredCount) { |
| return currentThreadReadLockAcquiredCount > 0 || writeLockedByCurrentThread(); |
| } |
| |
| private void incrementCurrentThreadReadLockCount(int cnt) { |
| assert state > 0 || state == WRITE_LOCKED; |
| |
| readLockEntryCnt.set(cnt + 1); |
| } |
| |
| private boolean writeLockedOrGoingToBe(int curState) { |
| return curState == WRITE_LOCKED || pendingWriteLocks > 0; |
| } |
| |
| private boolean tryAdvanceStateToReadLocked(int curState) { |
| return compareAndSet(STATE_VH, curState, curState + 1); |
| } |
| |
| /** |
| * Tries to acquire the read lock. No spinwait is used if the lock cannot be acquired immediately. |
| * |
| * @return {@code true} if acquired, {@code false} if write lock is already held by someone else |
| */ |
| public boolean tryReadLock() { |
| int cnt = readLockEntryCnt.get(); |
| |
| // Read lock reentry or acquiring read lock while holding write lock. |
| if (alreadyHoldingAnyLock(cnt)) { |
| incrementCurrentThreadReadLockCount(cnt); |
| |
| return true; |
| } |
| |
| while (true) { |
| int curState = state; |
| |
| if (writeLockedOrGoingToBe(curState)) { |
| return false; |
| } |
| |
| if (tryAdvanceStateToReadLocked(curState)) { |
| readLockEntryCnt.set(1); |
| |
| return true; |
| } |
| } |
| } |
| |
| /** |
| * Releases the read lock. |
| * |
| * @throws IllegalMonitorStateException thrown if the current thread does not hold the read lock |
| */ |
| public void readUnlock() { |
| int cnt = readLockEntryCnt.get(); |
| |
| if (cnt == 0) { |
| throw new IllegalMonitorStateException(); |
| } |
| |
| // Read unlock when holding write lock is performed here. |
| if (cnt > 1 || writeLockedByCurrentThread()) { |
| assert state > 0 || state == WRITE_LOCKED; |
| |
| readLockEntryCnt.set(cnt - 1); |
| |
| return; |
| } |
| |
| while (true) { |
| int curState = state; |
| |
| assert curState > 0; |
| |
| if (compareAndSet(STATE_VH, curState, curState - 1)) { |
| readLockEntryCnt.set(0); |
| |
| return; |
| } |
| } |
| } |
| |
| /** |
| * Acquires the write lock waiting, if needed. The thread will block until all other threads release both read and write locks. |
| */ |
| @SuppressWarnings("BusyWait") |
| public void writeLock() { |
| if (writeLockedByCurrentThread()) { |
| incrementWriteLockCount(); |
| |
| return; |
| } |
| |
| boolean interrupted = false; |
| |
| incrementPendingWriteLocks(); |
| try { |
| while (!trySwitchStateToWriteLocked()) { |
| try { |
| Thread.sleep(SLEEP_MILLIS); |
| } catch (InterruptedException ignored) { |
| interrupted = true; |
| } |
| } |
| } finally { |
| decrementPendingWriteLocks(); |
| } |
| |
| if (interrupted) { |
| Thread.currentThread().interrupt(); |
| } |
| |
| finishWriteLockAcquire(); |
| } |
| |
| private void incrementWriteLockCount() { |
| assert state == WRITE_LOCKED; |
| |
| writeLockEntryCnt++; |
| } |
| |
| private void incrementPendingWriteLocks() { |
| while (true) { |
| int curPendingWriteLocks = pendingWriteLocks; |
| |
| if (compareAndSet(PENDING_WLOCKS_VH, curPendingWriteLocks, curPendingWriteLocks + 1)) { |
| break; |
| } |
| } |
| } |
| |
| private boolean trySwitchStateToWriteLocked() { |
| return compareAndSet(STATE_VH, AVAILABLE, WRITE_LOCKED); |
| } |
| |
| private void decrementPendingWriteLocks() { |
| while (true) { |
| int curPendingWriteLocks = pendingWriteLocks; |
| |
| assert curPendingWriteLocks > 0; |
| |
| if (compareAndSet(PENDING_WLOCKS_VH, curPendingWriteLocks, curPendingWriteLocks - 1)) { |
| break; |
| } |
| } |
| } |
| |
| private void finishWriteLockAcquire() { |
| assert writeLockOwner == NO_OWNER; |
| |
| writeLockOwner = Thread.currentThread().getId(); |
| writeLockEntryCnt = 1; |
| } |
| |
| /** |
| * Acquires the write lock without sleeping between unsuccessful attempts. Instead, the spinwait eats cycles of the core it gets at full |
| * speed. It is non-interruptible as its {@link #writeLock()} cousin. |
| */ |
| public void writeLockBusy() { |
| if (writeLockedByCurrentThread()) { |
| incrementWriteLockCount(); |
| |
| return; |
| } |
| |
| incrementPendingWriteLocks(); |
| try { |
| while (!trySwitchStateToWriteLocked()) { |
| // No-op. |
| } |
| } finally { |
| decrementPendingWriteLocks(); |
| } |
| |
| finishWriteLockAcquire(); |
| } |
| |
| /** |
| * Return {@code true} if blocked by current thread. |
| * |
| * @return {@code True} if blocked by current thread. |
| */ |
| public boolean writeLockedByCurrentThread() { |
| return writeLockOwner == Thread.currentThread().getId(); |
| } |
| |
| /** |
| * Tries to acquire the write lock. Never blocks: if any lock has already been acquired by someone else, returns {@code false} |
| * immediately. |
| * |
| * @return {@code true} if the write lock has been acquired, {@code false} otherwise |
| */ |
| public boolean tryWriteLock() { |
| if (writeLockedByCurrentThread()) { |
| incrementWriteLockCount(); |
| |
| return true; |
| } |
| |
| if (trySwitchStateToWriteLocked()) { |
| finishWriteLockAcquire(); |
| |
| return true; |
| } |
| |
| return false; |
| } |
| |
| /** |
| * Tries to acquire the write lock with timeout. If it gets the write lock before the timeout expires, then returns {@code true}. If the |
| * timeout expires before the lock becomes available, returns {@code false}. |
| * |
| * @param timeout Timeout. |
| * @param unit Unit. |
| * @return {@code true} if the write lock has been acquired in time; {@code false} otherwise |
| * @throws InterruptedException If interrupted. |
| */ |
| @SuppressWarnings("BusyWait") |
| public boolean tryWriteLock(long timeout, TimeUnit unit) throws InterruptedException { |
| if (writeLockedByCurrentThread()) { |
| incrementWriteLockCount(); |
| |
| return true; |
| } |
| |
| incrementPendingWriteLocks(); |
| try { |
| long startNanos = System.nanoTime(); |
| |
| long timeoutNanos = unit.toNanos(timeout); |
| |
| while (true) { |
| if (trySwitchStateToWriteLocked()) { |
| finishWriteLockAcquire(); |
| |
| return true; |
| } |
| |
| Thread.sleep(SLEEP_MILLIS); |
| |
| if (System.nanoTime() - startNanos >= timeoutNanos) { |
| return false; |
| } |
| } |
| } finally { |
| decrementPendingWriteLocks(); |
| } |
| } |
| |
| /** |
| * Releases the write lock. |
| * |
| * @throws IllegalMonitorStateException thrown if the current thread does not hold the write lock. |
| */ |
| public void writeUnlock() { |
| if (!writeLockedByCurrentThread()) { |
| throw new IllegalMonitorStateException(); |
| } |
| |
| if (writeLockEntryCnt > 1) { |
| writeLockEntryCnt--; |
| |
| return; |
| } |
| |
| writeLockEntryCnt = 0; |
| writeLockOwner = NO_OWNER; |
| |
| // Current thread holds write and read locks and is releasing |
| // write lock now. |
| int update = readLockEntryCnt.get() > 0 ? 1 : AVAILABLE; |
| |
| boolean b = compareAndSet(STATE_VH, WRITE_LOCKED, update); |
| |
| assert b; |
| } |
| |
| /** |
| * Returns {@code true} on success. |
| * |
| * @param varHandle VarHandle. |
| * @param expect Expected. |
| * @param update Update. |
| * @return {@code True} on success. |
| */ |
| private boolean compareAndSet(VarHandle varHandle, int expect, int update) { |
| return varHandle.compareAndSet(this, expect, update); |
| } |
| |
| /** |
| * Returns the count of pending write lock requests count. Only used by tests, should not be used in production code. |
| * |
| * @return count of pending requests to get the write lock |
| */ |
| int pendingWriteLocksCount() { |
| return pendingWriteLocks; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override |
| public String toString() { |
| return S.toString(IgniteSpinReadWriteLock.class, this); |
| } |
| } |