blob: ba863e4c49a6cd15726c6b38878b253669bb6ba6 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.util;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.ignite.thread.IgniteThread;
import org.jetbrains.annotations.NotNull;
/**
* ReadWriteLock with striping mechanics.
* Compared to {@link ReentrantReadWriteLock} it has slightly improved performance of {@link ReadWriteLock#readLock()}
* operations at the cost of {@link ReadWriteLock#writeLock()} operations and memory consumption.
* It also supports reentrancy semantics like {@link ReentrantReadWriteLock}.
*/
public class StripedCompositeReadWriteLock implements ReadWriteLock {
/** Index generator. */
private static final AtomicInteger IDX_GEN = new AtomicInteger();
/** Index. */
private static final ThreadLocal<Integer> IDX = new ThreadLocal<Integer>() {
@Override protected Integer initialValue() {
return IDX_GEN.incrementAndGet();
}
};
/** Locks. */
private final ReentrantReadWriteLock[] locks;
/** Composite write lock. */
private final WriteLock writeLock;
/**
* Creates a new instance with given concurrency level.
*
* @param concurrencyLvl Number of internal read locks.
*/
public StripedCompositeReadWriteLock(int concurrencyLvl) {
locks = new ReadLock[concurrencyLvl];
for (int i = 0; i < concurrencyLvl; i++)
locks[i] = new ReadLock();
writeLock = new WriteLock();
}
/**
* @return Index of current thread stripe.
*/
private int curIdx() {
int idx;
Thread curThread = Thread.currentThread();
if (curThread instanceof IgniteThread) {
IgniteThread igniteThread = (IgniteThread)curThread;
idx = igniteThread.compositeRwLockIndex();
if (idx == IgniteThread.GRP_IDX_UNASSIGNED) {
idx = IDX_GEN.incrementAndGet();
igniteThread.compositeRwLockIndex(idx);
}
}
else
idx = IDX.get();
return idx % locks.length;
}
/** {@inheritDoc} */
@NotNull @Override public Lock readLock() {
return locks[curIdx()].readLock();
}
/** {@inheritDoc} */
@NotNull @Override public Lock writeLock() {
return writeLock;
}
/**
* Queries if the write lock is held by the current thread.
*
* @return {@code true} if the current thread holds the write lock and
* {@code false} otherwise
*/
public boolean isWriteLockedByCurrentThread() {
return locks[locks.length - 1].isWriteLockedByCurrentThread();
}
/**
* Queries the number of reentrant read holds on this lock by the
* current thread. A reader thread has a hold on a lock for
* each lock action that is not matched by an unlock action.
*
* @return the number of holds on the read lock by the current thread,
* or zero if the read lock is not held by the current thread
*/
public int getReadHoldCount() {
return locks[curIdx()].getReadHoldCount();
}
/**
* Read lock.
*/
@SuppressWarnings("unused")
private static class ReadLock extends ReentrantReadWriteLock {
/** */
private static final long serialVersionUID = 0L;
/** Padding. */
private long p0;
/** */
private long p1;
/** */
private long p2;
/** */
private long p3;
/** */
private long p4;
/** */
private long p5;
/** */
private long p6;
/** */
private long p7;
}
/**
* Write lock.
*/
private class WriteLock implements Lock {
/** {@inheritDoc} */
@Override public void lock() {
try {
lock0(false);
}
catch (InterruptedException ignore) {
assert false : "Should never happen";
}
}
/** {@inheritDoc} */
@Override public void lockInterruptibly() throws InterruptedException {
lock0(true);
}
/** {@inheritDoc} */
@Override public void unlock() {
unlock0(locks.length - 1);
}
/**
* Internal lock routine.
*
* @param canInterrupt Whether to acquire the lock interruptibly.
* @throws InterruptedException If interrupted.
*/
private void lock0(boolean canInterrupt) throws InterruptedException {
int i = 0;
try {
for (; i < locks.length; i++) {
if (canInterrupt)
locks[i].writeLock().lockInterruptibly();
else
locks[i].writeLock().lock();
}
}
catch (InterruptedException e) {
unlock0(i - 1);
throw e;
}
}
/**
* Internal unlock routine.
*
* @param fromIdx Start index.
*/
private void unlock0(int fromIdx) {
for (int i = fromIdx; i >= 0; i--)
locks[i].writeLock().unlock();
}
/** {@inheritDoc} */
@Override public boolean tryLock() {
int i = 0;
try {
for (; i < locks.length; i++) {
if (!locks[i].writeLock().tryLock())
break;
}
}
finally {
if (0 < i && i < locks.length)
unlock0(i - 1);
}
return i == locks.length;
}
/** {@inheritDoc} */
@Override public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
int i = 0;
long end = unit.toNanos(time) + System.nanoTime();
try {
for (; i < locks.length && System.nanoTime() < end; i++) {
if (!locks[i].writeLock().tryLock(time, unit))
break;
}
}
finally {
if (0 < i && i < locks.length)
unlock0(i - 1);
}
return i == locks.length;
}
/** {@inheritDoc} */
@NotNull @Override public Condition newCondition() {
throw new UnsupportedOperationException();
}
}
}