blob: 1d31bacedb3a21d65d11b9f006583cc556e983da [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.cache.datastructures;
import java.io.Externalizable;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteCondition;
import org.apache.ignite.IgniteCountDownLatch;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteInterruptedException;
import org.apache.ignite.IgniteLock;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.IgniteKernal;
import org.apache.ignite.internal.util.GridConcurrentHashSet;
import org.apache.ignite.internal.util.typedef.G;
import org.apache.ignite.internal.util.typedef.PA;
import org.apache.ignite.lang.IgniteCallable;
import org.apache.ignite.lang.IgniteClosure;
import org.apache.ignite.lang.IgniteFuture;
import org.apache.ignite.resources.IgniteInstanceResource;
import org.apache.ignite.resources.LoggerResource;
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.transactions.Transaction;
import org.jetbrains.annotations.Nullable;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import static java.util.concurrent.TimeUnit.MICROSECONDS;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.MINUTES;
import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
/**
* Cache reentrant lock self test.
*/
public abstract class IgniteLockAbstractSelfTest extends IgniteAtomicsAbstractTest
implements Externalizable {
/** */
private static final int NODES_CNT = 4;
/** */
protected static final int THREADS_CNT = 5;
/** */
private static final Random RND = new Random();
/** */
@Rule
public final ExpectedException exception = ExpectedException.none();
/** {@inheritDoc} */
@Override protected int gridCount() {
return NODES_CNT;
}
/**
* @throws Exception If failed.
*/
@Test
public void testReentrantLock() throws Exception {
checkReentrantLock(false);
checkReentrantLock(true);
}
/**
* @throws Exception If failed.
*/
@Test
public void testFailover() throws Exception {
checkFailover(true, false);
checkFailover(false, false);
checkFailover(true, true);
checkFailover(false, true);
}
/**
* Implementation of ignite data structures internally uses special system caches, need make sure
* that transaction on these system caches do not intersect with transactions started by user.
*
* @throws Exception If failed.
*/
@Test
public void testIsolation() throws Exception {
Ignite ignite = grid(0);
CacheConfiguration cfg = new CacheConfiguration(DEFAULT_CACHE_NAME);
cfg.setName("myCache");
cfg.setAtomicityMode(TRANSACTIONAL);
cfg.setWriteSynchronizationMode(FULL_SYNC);
IgniteCache<Integer, Integer> cache = ignite.getOrCreateCache(cfg);
try {
IgniteLock lock = ignite.reentrantLock("lock", true, true, true);
try (Transaction tx = ignite.transactions().txStart()) {
cache.put(1, 1);
boolean success = lock.tryLock(1, MILLISECONDS);
assertTrue(success);
tx.rollback();
}
assertEquals(0, cache.size());
assertTrue(lock.isLocked());
lock.unlock();
assertFalse(lock.isLocked());
lock.close();
assertTrue(lock.removed());
}
finally {
ignite.destroyCache(cfg.getName());
}
}
/**
* @param failoverSafe Failover safe flag.
* @throws Exception If failed.
*/
private void checkFailover(final boolean failoverSafe, final boolean fair) throws Exception {
IgniteEx g = startGrid(NODES_CNT + 1);
// For vars locality.
{
// Ensure not exists.
assert g.reentrantLock("lock", failoverSafe, fair, false) == null;
IgniteLock lock = g.reentrantLock("lock", failoverSafe, fair, true);
lock.lock();
assert lock.tryLock();
assertEquals(2, lock.getHoldCount());
}
Ignite g0 = grid(0);
final IgniteLock lock0 = g0.reentrantLock("lock", false, fair, false);
assert !lock0.tryLock();
assertEquals(0, lock0.getHoldCount());
IgniteInternalFuture<?> fut = multithreadedAsync(
new Callable<Object>() {
@Override public Object call() throws Exception {
try {
lock0.lock();
info("Acquired in separate thread.");
// Lock is acquired silently only in failoverSafe mode.
assertTrue(failoverSafe);
lock0.unlock();
info("Released lock in separate thread.");
}
catch (IgniteException e) {
if (!failoverSafe)
info("Ignored expected exception: " + e);
else
throw e;
}
return null;
}
},
1);
Thread.sleep(100);
g.close();
fut.get(500);
lock0.close();
}
/**
* @throws Exception If failed.
*/
private void checkReentrantLock(final boolean fair) throws Exception {
// Test API.
checkLock(fair);
checkFailoverSafe(fair);
// Test main functionality.
IgniteLock lock1 = grid(0).reentrantLock("lock", true, fair, true);
assertFalse(lock1.isLocked());
lock1.lock();
IgniteFuture<Object> fut = grid(0).compute().callAsync(new IgniteCallable<Object>() {
@IgniteInstanceResource
private Ignite ignite;
@LoggerResource
private IgniteLogger log;
@Nullable @Override public Object call() throws Exception {
// Test reentrant lock in multiple threads on each node.
IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync(
new Callable<Object>() {
@Nullable @Override public Object call() throws Exception {
IgniteLock lock = ignite.reentrantLock("lock", true, fair, true);
assert lock != null;
log.info("Thread is going to wait on reentrant lock: " + Thread.currentThread().getName());
assert lock.tryLock(1, MINUTES);
log.info("Thread is again runnable: " + Thread.currentThread().getName());
lock.unlock();
return null;
}
},
5,
"test-thread"
);
fut.get();
return null;
}
});
Thread.sleep(3000);
assert lock1.isHeldByCurrentThread();
assert lock1.getHoldCount() == 1;
lock1.lock();
assert lock1.isHeldByCurrentThread();
assert lock1.getHoldCount() == 2;
lock1.unlock();
lock1.unlock();
// Ensure there are no hangs.
fut.get();
// Test operations on removed lock.
lock1.close();
checkRemovedReentrantLock(lock1);
}
/**
* @param lock IgniteLock.
* @throws Exception If failed.
*/
protected void checkRemovedReentrantLock(final IgniteLock lock) throws Exception {
assert GridTestUtils.waitForCondition(new PA() {
@Override public boolean apply() {
return lock.removed();
}
}, 5000);
assert lock.removed();
}
/**
* This method only checks if parameter of new reentrant lock is initialized properly.
* For tests considering failure recovery see @GridCachePartitionedNodeFailureSelfTest.
*
* @throws Exception Exception.
*/
private void checkFailoverSafe(final boolean fair) throws Exception {
// Checks only if reentrant lock is initialized properly
IgniteLock lock = createReentrantLock("rmv", true, fair);
assert lock.isFailoverSafe();
removeReentrantLock("rmv", fair);
IgniteLock lock1 = createReentrantLock("rmv1", false, fair);
assert !lock1.isFailoverSafe();
removeReentrantLock("rmv1", fair);
}
/**
* @throws Exception Exception.
*/
private void checkLock(final boolean fair) throws Exception {
// Check only 'false' cases here. Successful lock is tested over the grid.
final IgniteLock lock = createReentrantLock("acquire", false, fair);
lock.lock();
IgniteInternalFuture fut = GridTestUtils.runAsync(new Callable<Void>() {
@Override public Void call() throws Exception {
assertNotNull(lock);
assert !lock.tryLock();
assert !lock.tryLock(10, MICROSECONDS);
return null;
}
});
fut.get();
lock.unlock();
removeReentrantLock("acquire", fair);
}
/**
* @param lockName Reentrant lock name.
* @param failoverSafe FailoverSafe flag.
* @param fair Fairness flag.
* @return New distributed reentrant lock.
* @throws Exception If failed.
*/
private IgniteLock createReentrantLock(String lockName, boolean failoverSafe, boolean fair)
throws Exception {
IgniteLock lock = grid(RND.nextInt(NODES_CNT)).reentrantLock(lockName, failoverSafe, fair, true);
// Test initialization.
assert lockName.equals(lock.name());
assert !lock.isLocked();
assert lock.isFailoverSafe() == failoverSafe;
assert lock.isFair() == fair;
return lock;
}
/**
* @param lockName Reentrant lock name.
* @throws Exception If failed.
*/
private void removeReentrantLock(String lockName, final boolean fair)
throws Exception {
IgniteLock lock = grid(RND.nextInt(NODES_CNT)).reentrantLock(lockName, false, fair, true);
assert lock != null;
// Remove lock on random node.
IgniteLock lock0 = grid(RND.nextInt(NODES_CNT)).reentrantLock(lockName, false, fair, true);
assertNotNull(lock0);
lock0.close();
// Ensure reentrant lock is removed on all nodes.
for (Ignite g : G.allGrids())
assertNull(((IgniteKernal)g).context().dataStructures().reentrantLock(lockName, null, false, fair, false));
checkRemovedReentrantLock(lock);
}
/**
* @throws Exception If failed.
*/
@Test
public void testLockSerialization() throws Exception {
final IgniteLock lock = grid(0).reentrantLock("lock", true, true, true);
info("Lock created: " + lock);
lock.isFailoverSafe();
lock.isFair();
grid(ThreadLocalRandom.current().nextInt(G.allGrids().size())).compute().broadcast(new IgniteCallable<Object>() {
@Nullable @Override public Object call() throws Exception {
Thread.sleep(1000);
lock.lock();
try {
info("Inside lock: " + lock.getHoldCount());
}
finally {
lock.unlock();
}
return null;
}
});
}
/**
* @throws Exception If failed.
*/
@Test
public void testInitialization() throws Exception {
// Test #name() method.
{
IgniteLock lock = grid(0).reentrantLock("lock", true, true, true);
assertEquals("lock", lock.name());
lock.close();
}
// Test #isFailoverSafe() method.
{
IgniteLock lock = grid(0).reentrantLock("lock", true, true, true);
info("Lock created: " + lock);
assertTrue(lock.isFailoverSafe());
lock.close();
}
// Test #isFair() method.
{
IgniteLock lock = grid(0).reentrantLock("lock", true, true, true);
assertTrue(lock.isFair());
lock.close();
}
// Test #isBroken() method.
{
IgniteLock lock = grid(0).reentrantLock("lock", true, true, true);
assertFalse(lock.isBroken());
lock.close();
}
// Test #getOrCreateCondition(String ) method.
{
IgniteLock lock = grid(0).reentrantLock("lock", true, true, true);
assertNotNull(lock.getOrCreateCondition("condition"));
lock.close();
}
// Test #getHoldCount() method.
{
IgniteLock lock = grid(0).reentrantLock("lock", true, true, true);
assertEquals(0, lock.getHoldCount());
lock.close();
}
// Test #isHeldByCurrentThread() method.
{
IgniteLock lock = grid(0).reentrantLock("lock", true, true, true);
assertFalse(lock.isHeldByCurrentThread());
lock.close();
}
// Test #isLocked() method.
{
IgniteLock lock = grid(0).reentrantLock("lock", true, true, true);
assertFalse(lock.isLocked());
lock.close();
}
// Test #hasQueuedThreads() method.
{
IgniteLock lock = grid(0).reentrantLock("lock", true, true, true);
assertFalse(lock.hasQueuedThreads());
lock.close();
}
// Test #hasQueuedThread(Thread ) method.
{
IgniteLock lock = grid(0).reentrantLock("lock", true, true, true);
assertFalse(lock.hasQueuedThread(Thread.currentThread()));
lock.close();
}
// Test #hasWaiters(IgniteCondition ) method.
{
IgniteLock lock = grid(0).reentrantLock("lock", true, true, true);
try {
IgniteCondition cond = grid(0).reentrantLock("lock2", true, true, true).getOrCreateCondition("cond");
lock.hasWaiters(cond);
fail("Condition not associated with this lock passed as argument.");
}
catch (IllegalArgumentException ignored) {
info("IllegalArgumentException thrown as it should be.");
}
try {
IgniteCondition cond = lock.getOrCreateCondition("condition");
lock.hasWaiters(cond);
fail("This method should throw exception when lock is not held.");
}
catch (IllegalMonitorStateException ignored) {
info("IllegalMonitorStateException thrown as it should be.");
}
lock.close();
}
// Test #getWaitQueueLength(IgniteCondition ) method.
{
IgniteLock lock = grid(0).reentrantLock("lock", true, true, true);
try {
IgniteCondition cond = grid(0).reentrantLock("lock2", true, true, true).getOrCreateCondition("cond");
lock.getWaitQueueLength(cond);
fail("Condition not associated with this lock passed as argument.");
}
catch (IllegalArgumentException ignored) {
info("IllegalArgumentException thrown as it should be.");
}
try {
IgniteCondition cond = lock.getOrCreateCondition("condition");
lock.getWaitQueueLength(cond);
fail("This method should throw exception when lock is not held.");
}
catch (IllegalMonitorStateException ignored) {
info("IllegalMonitorStateException thrown as it should be.");
}
lock.close();
}
// Test #lock() method.
{
IgniteLock lock = grid(0).reentrantLock("lock", true, true, true);
lock.lock();
lock.unlock();
lock.close();
}
// Test #lockInterruptibly() method.
{
IgniteLock lock = grid(0).reentrantLock("lock", true, true, true);
lock.lockInterruptibly();
lock.unlock();
lock.close();
}
// Test #tryLock() method.
{
IgniteLock lock = grid(0).reentrantLock("lock", true, true, true);
boolean success = lock.tryLock();
assertTrue(success);
lock.unlock();
lock.close();
}
// Test #tryLock(long, TimeUnit) method.
{
IgniteLock lock = grid(0).reentrantLock("lock", true, true, true);
boolean success = lock.tryLock(1, MILLISECONDS);
assertTrue(success);
lock.unlock();
lock.close();
}
// Test #unlock() method.
{
IgniteLock lock = grid(0).reentrantLock("lock", true, true, true);
try {
lock.unlock();
fail("This method should throw exception when lock is not held.");
}
catch (IllegalMonitorStateException ignored) {
info("IllegalMonitorStateException thrown as it should be.");
}
lock.close();
}
// Test #removed() method.
{
IgniteLock lock = grid(0).reentrantLock("lock", true, true, true);
assertFalse(lock.removed());
lock.close();
assertTrue(lock.removed());
}
// Test #close() method.
{
IgniteLock lock = grid(0).reentrantLock("lock", true, true, true);
lock.close();
assertTrue(lock.removed());
}
}
/**
* @throws Exception If failed.
*/
@Test
public void testReentrantLockMultinode1() throws Exception {
testReentrantLockMultinode1(false);
testReentrantLockMultinode1(true);
}
/**
* @throws Exception If failed.
*/
private void testReentrantLockMultinode1(final boolean fair) throws Exception {
if (gridCount() == 1)
return;
IgniteLock lock = grid(0).reentrantLock("s1", true, fair, true);
List<IgniteInternalFuture<?>> futs = new ArrayList<>();
for (int i = 0; i < gridCount(); i++) {
final Ignite ignite = grid(i);
futs.add(GridTestUtils.runAsync(new Callable<Void>() {
@Override public Void call() throws Exception {
IgniteLock lock = ignite.reentrantLock("s1", true, fair, false);
assertNotNull(lock);
IgniteCondition cond1 = lock.getOrCreateCondition("c1");
IgniteCondition cond2 = lock.getOrCreateCondition("c2");
try {
boolean wait = lock.tryLock(30_000, MILLISECONDS);
assertTrue(wait);
cond2.signal();
cond1.await();
}
finally {
lock.unlock();
}
return null;
}
}));
}
boolean done = false;
while (!done) {
done = true;
for (IgniteInternalFuture<?> fut : futs) {
if (!fut.isDone())
done = false;
}
try {
lock.lock();
lock.getOrCreateCondition("c1").signal();
lock.getOrCreateCondition("c2").await(10, MILLISECONDS);
}
finally {
lock.unlock();
}
}
for (IgniteInternalFuture<?> fut : futs)
fut.get(30_000);
}
/**
* @throws Exception If failed.
*/
@Test
public void testLockInterruptibly() throws Exception {
testLockInterruptibly(false);
testLockInterruptibly(true);
}
/**
* @throws Exception If failed.
*/
private void testLockInterruptibly(final boolean fair) throws Exception {
final IgniteLock lock0 = grid(0).reentrantLock("lock", true, fair, true);
assertEquals(0, lock0.getHoldCount());
assertFalse(lock0.hasQueuedThreads());
final int totalThreads = 2;
final Set<Thread> startedThreads = new GridConcurrentHashSet<>();
lock0.lock();
IgniteInternalFuture<?> fut = multithreadedAsync(
new Callable<Object>() {
@Override public Object call() throws Exception {
assertFalse(lock0.isHeldByCurrentThread());
startedThreads.add(Thread.currentThread());
boolean isInterrupted = false;
try {
lock0.lockInterruptibly();
}
catch (IgniteInterruptedException ignored) {
assertFalse(Thread.currentThread().isInterrupted());
isInterrupted = true;
}
finally {
// Assert that thread was interrupted.
assertTrue(isInterrupted);
// Assert that locked is still owned by main thread.
assertTrue(lock0.isLocked());
// Assert that this thread doesn't own the lock.
assertFalse(lock0.isHeldByCurrentThread());
}
return null;
}
}, totalThreads);
// Wait for all threads to attempt to acquire lock.
while (startedThreads.size() != totalThreads) {
Thread.sleep(1000);
}
for (Thread t : startedThreads)
t.interrupt();
fut.get();
lock0.unlock();
assertFalse(lock0.isLocked());
for (Thread t : startedThreads)
assertFalse(lock0.hasQueuedThread(t));
lock0.close();
}
/**
* @throws Exception If failed.
*/
@Test
public void testLockInterruptiblyMultinode() throws Exception {
testLockInterruptiblyMultinode(false);
testLockInterruptiblyMultinode(true);
}
/**
* @throws Exception If failed.
*/
private void testLockInterruptiblyMultinode(final boolean fair) throws Exception {
if (gridCount() == 1)
return;
// Initialize reentrant lock.
final IgniteLock lock0 = grid(0).reentrantLock("lock", true, fair, true);
assertEquals(0, lock0.getHoldCount());
assertFalse(lock0.hasQueuedThreads());
lock0.lock();
// Number of threads, one per node.
final int threadCount = gridCount();
final AtomicLong threadCounter = new AtomicLong(0);
IgniteInternalFuture<?> fut = multithreadedAsync(
new Callable<Object>() {
@Override public Object call() throws Exception {
final int localNodeId = (int)threadCounter.getAndIncrement();
final Ignite grid = grid(localNodeId);
IgniteClosure<Ignite, Void> closure = new IgniteClosure<Ignite, Void>() {
@Override public Void apply(Ignite ignite) {
final IgniteLock l = ignite.reentrantLock("lock", true, true, true);
final AtomicReference<Thread> thread = new AtomicReference<>();
final AtomicBoolean done = new AtomicBoolean(false);
final AtomicBoolean exceptionThrown = new AtomicBoolean(false);
final IgniteCountDownLatch latch = ignite.countDownLatch("latch", threadCount, false, true);
IgniteInternalFuture<?> fut = GridTestUtils.runAsync(new Callable<Void>() {
@Override public Void call() throws Exception {
try {
thread.set(Thread.currentThread());
l.lockInterruptibly();
}
catch (IgniteInterruptedException ignored) {
exceptionThrown.set(true);
}
finally {
done.set(true);
}
return null;
}
});
// Wait until l.lock() has been called.
while (!l.hasQueuedThreads()){
// No-op.
}
latch.countDown();
latch.await();
thread.get().interrupt();
while (!done.get()){
// No-op.
}
try {
fut.get();
}
catch (IgniteCheckedException e) {
fail(e.getMessage());
throw new RuntimeException(e);
}
assertTrue(exceptionThrown.get());
return null;
}
};
closure.apply(grid);
return null;
}
}, threadCount);
fut.get();
lock0.unlock();
info("Checking if interrupted threads are removed from global waiting queue...");
// Check if interrupted threads are removed from global waiting queue.
boolean locked = lock0.tryLock(1000, MILLISECONDS);
info("Interrupted threads successfully removed from global waiting queue. ");
assertTrue(locked);
lock0.unlock();
assertFalse(lock0.isLocked());
lock0.close();
}
/**
* @throws Exception If failed.
*/
@Test
public void testLock() throws Exception {
testLock(false);
testLock(true);
}
/**
* @throws Exception If failed.
*/
private void testLock(final boolean fair) throws Exception {
final IgniteLock lock0 = grid(0).reentrantLock("lock", true, fair, true);
assertEquals(0, lock0.getHoldCount());
assertFalse(lock0.hasQueuedThreads());
final int totalThreads = 2;
final Set<Thread> startedThreads = new GridConcurrentHashSet<>();
lock0.lock();
IgniteInternalFuture<?> fut = multithreadedAsync(
new Callable<Object>() {
@Override public Object call() throws Exception {
assertFalse(lock0.isHeldByCurrentThread());
startedThreads.add(Thread.currentThread());
boolean isInterrupted = false;
try {
lock0.lock();
}
catch (IgniteInterruptedException ignored) {
isInterrupted = true;
fail("Lock() method is uninterruptible.");
}
finally {
// Assert that thread was not interrupted.
assertFalse(isInterrupted);
// Assert that interrupted flag is set and clear it in order to call unlock().
assertTrue(Thread.interrupted());
// Assert that lock is still owned by this thread.
assertTrue(lock0.isLocked());
// Assert that this thread does own the lock.
assertTrue(lock0.isHeldByCurrentThread());
// Release lock.
lock0.unlock();
}
return null;
}
}, totalThreads);
// Wait for all threads to attempt to acquire lock.
while (startedThreads.size() != totalThreads) {
Thread.sleep(500);
}
for (Thread t : startedThreads)
t.interrupt();
lock0.unlock();
fut.get();
assertFalse(lock0.isLocked());
for (Thread t : startedThreads)
assertFalse(lock0.hasQueuedThread(t));
lock0.close();
}
/**
* @throws Exception If failed.
*/
@Test
public void testTryLock() throws Exception {
testTryLock(false);
testTryLock(true);
}
/**
* @throws Exception If failed.
*/
private void testTryLock(final boolean fair) throws Exception {
final IgniteLock lock0 = grid(0).reentrantLock("lock", true, fair, true);
assertEquals(0, lock0.getHoldCount());
assertFalse(lock0.hasQueuedThreads());
final int totalThreads = 2;
final Set<Thread> startedThreads = new GridConcurrentHashSet<>();
lock0.lock();
IgniteInternalFuture<?> fut = multithreadedAsync(
new Callable<Object>() {
@Override public Object call() throws Exception {
assertFalse(lock0.isHeldByCurrentThread());
startedThreads.add(Thread.currentThread());
boolean isInterrupted = false;
boolean locked = false;
try {
locked = lock0.tryLock();
}
catch (IgniteInterruptedException ignored) {
isInterrupted = true;
fail("tryLock() method is uninterruptible.");
}
finally {
// Assert that thread was not interrupted.
assertFalse(isInterrupted);
// Assert that lock is locked.
assertTrue(lock0.isLocked());
// Assert that this thread does own the lock.
assertEquals(locked, lock0.isHeldByCurrentThread());
// Release lock.
if (locked)
lock0.unlock();
}
return null;
}
}, totalThreads);
// Wait for all threads to attempt to acquire lock.
while (startedThreads.size() != totalThreads) {
Thread.sleep(500);
}
for (Thread t : startedThreads)
t.interrupt();
fut.get();
lock0.unlock();
assertFalse(lock0.isLocked());
for (Thread t : startedThreads)
assertFalse(lock0.hasQueuedThread(t));
lock0.close();
}
/**
* @throws Exception If failed.
*/
@Test
public void testTryLockTimed() throws Exception {
testTryLockTimed(false);
testTryLockTimed(true);
}
/**
* @throws Exception If failed.
*/
private void testTryLockTimed(final boolean fair) throws Exception {
final IgniteLock lock0 = grid(0).reentrantLock("lock", true, fair, true);
assertEquals(0, lock0.getHoldCount());
assertFalse(lock0.hasQueuedThreads());
final int totalThreads = 2;
final Set<Thread> startedThreads = new GridConcurrentHashSet<>();
lock0.lock();
IgniteInternalFuture<?> fut = multithreadedAsync(
new Callable<Object>() {
@Override public Object call() throws Exception {
assertFalse(lock0.isHeldByCurrentThread());
startedThreads.add(Thread.currentThread());
boolean isInterrupted = false;
boolean locked = false;
try {
locked = lock0.tryLock(100, TimeUnit.MILLISECONDS);
}
catch (IgniteInterruptedException ignored) {
isInterrupted = true;
}
finally {
// Assert that thread was not interrupted.
assertFalse(isInterrupted);
// Assert that tryLock returned false.
assertFalse(locked);
// Assert that lock is still owned by main thread.
assertTrue(lock0.isLocked());
// Assert that this thread doesn't own the lock.
assertFalse(lock0.isHeldByCurrentThread());
// Release lock.
if (locked)
lock0.unlock();
}
return null;
}
}, totalThreads);
fut.get();
lock0.unlock();
assertFalse(lock0.isLocked());
for (Thread t : startedThreads)
assertFalse(lock0.hasQueuedThread(t));
lock0.close();
}
/**
* @throws Exception If failed.
*/
@Test
public void testConditionAwaitUninterruptibly() throws Exception {
testConditionAwaitUninterruptibly(false);
testConditionAwaitUninterruptibly(true);
}
/**
* @throws Exception If failed.
*/
private void testConditionAwaitUninterruptibly(final boolean fair) throws Exception {
final IgniteLock lock0 = grid(0).reentrantLock("lock", true, fair, true);
assertEquals(0, lock0.getHoldCount());
assertFalse(lock0.hasQueuedThreads());
final int totalThreads = 2;
final Set<Thread> startedThreads = new GridConcurrentHashSet<>();
IgniteInternalFuture<?> fut = multithreadedAsync(
new Callable<Object>() {
@Override public Object call() throws Exception {
assertFalse(lock0.isHeldByCurrentThread());
startedThreads.add(Thread.currentThread());
boolean isInterrupted = false;
lock0.lock();
IgniteCondition cond = lock0.getOrCreateCondition("cond");
try {
cond.awaitUninterruptibly();
}
catch (IgniteInterruptedException ignored) {
isInterrupted = true;
}
finally {
// Assert that thread was not interrupted.
assertFalse(isInterrupted);
// Assert that lock is still locked.
assertTrue(lock0.isLocked());
// Assert that this thread does own the lock.
assertTrue(lock0.isHeldByCurrentThread());
// Clear interrupt flag.
assertTrue(Thread.interrupted());
// Release lock.
if (lock0.isHeldByCurrentThread())
lock0.unlock();
}
return null;
}
}, totalThreads);
// Wait for all threads to attempt to acquire lock.
while (startedThreads.size() != totalThreads) {
Thread.sleep(500);
}
lock0.lock();
for (Thread t : startedThreads) {
t.interrupt();
lock0.getOrCreateCondition("cond").signal();
}
lock0.unlock();
fut.get();
assertFalse(lock0.isLocked());
for (Thread t : startedThreads)
assertFalse(lock0.hasQueuedThread(t));
lock0.close();
}
/**
* @throws Exception If failed.
*/
@Test
public void testConditionInterruptAwait() throws Exception {
testConditionInterruptAwait(false);
testConditionInterruptAwait(true);
}
/**
* @throws Exception If failed.
*/
private void testConditionInterruptAwait(final boolean fair) throws Exception {
final IgniteLock lock0 = grid(0).reentrantLock("lock", true, fair, true);
assertEquals(0, lock0.getHoldCount());
assertFalse(lock0.hasQueuedThreads());
final int totalThreads = 2;
final Set<Thread> startedThreads = new GridConcurrentHashSet<>();
IgniteInternalFuture<?> fut = multithreadedAsync(
new Callable<Object>() {
@Override public Object call() throws Exception {
assertFalse(lock0.isHeldByCurrentThread());
startedThreads.add(Thread.currentThread());
boolean isInterrupted = false;
lock0.lock();
IgniteCondition cond = lock0.getOrCreateCondition("cond");
try {
cond.await();
}
catch (IgniteInterruptedException ignored) {
isInterrupted = true;
}
finally {
// Assert that thread was interrupted.
assertTrue(isInterrupted);
// Assert that lock is still locked.
assertTrue(lock0.isLocked());
// Assert that this thread does own the lock.
assertTrue(lock0.isHeldByCurrentThread());
// Release lock.
if (lock0.isHeldByCurrentThread())
lock0.unlock();
}
return null;
}
}, totalThreads);
// Wait for all threads to attempt to acquire lock.
while (startedThreads.size() != totalThreads) {
Thread.sleep(500);
}
for (Thread t : startedThreads)
t.interrupt();
fut.get();
assertFalse(lock0.isLocked());
for (Thread t : startedThreads)
assertFalse(lock0.hasQueuedThread(t));
lock0.close();
}
/**
* @throws Exception If failed.
*/
@Test
public void testHasQueuedThreads() throws Exception {
testHasQueuedThreads(false);
testHasQueuedThreads(true);
}
/**
* @throws Exception If failed.
*/
private void testHasQueuedThreads(final boolean fair) throws Exception {
final IgniteLock lock0 = grid(0).reentrantLock("lock", true, fair, true);
assertEquals(0, lock0.getHoldCount());
assertFalse(lock0.hasQueuedThreads());
final int totalThreads = 5;
final Set<Thread> startedThreads = new GridConcurrentHashSet<>();
final Set<Thread> finishedThreads = new GridConcurrentHashSet<>();
IgniteInternalFuture<?> fut = multithreadedAsync(
new Callable<Object>() {
@Override public Object call() throws Exception {
assertFalse(lock0.isHeldByCurrentThread());
startedThreads.add(Thread.currentThread());
lock0.lock();
// Wait until every thread tries to lock.
do {
Thread.sleep(1000);
}
while (startedThreads.size() != totalThreads);
try {
info("Acquired in separate thread. ");
assertTrue(lock0.isHeldByCurrentThread());
assertFalse(lock0.hasQueuedThread(Thread.currentThread()));
finishedThreads.add(Thread.currentThread());
if (startedThreads.size() != finishedThreads.size()) {
assertTrue(lock0.hasQueuedThreads());
}
for (Thread t : startedThreads) {
assertTrue(lock0.hasQueuedThread(t) != finishedThreads.contains(t));
}
}
finally {
lock0.unlock();
assertFalse(lock0.isHeldByCurrentThread());
}
return null;
}
}, totalThreads);
fut.get();
assertFalse(lock0.hasQueuedThreads());
for (Thread t : startedThreads)
assertFalse(lock0.hasQueuedThread(t));
lock0.close();
}
/**
* @throws Exception If failed.
*/
@Test
public void testHasConditionQueuedThreads() throws Exception {
testHasConditionQueuedThreads(false);
testHasConditionQueuedThreads(true);
}
/**
* @throws Exception If failed.
*/
private void testHasConditionQueuedThreads(final boolean fair) throws Exception {
final IgniteLock lock0 = grid(0).reentrantLock("lock", true, fair, true);
assertEquals(0, lock0.getHoldCount());
assertFalse(lock0.hasQueuedThreads());
final int totalThreads = 5;
final Set<Thread> startedThreads = new GridConcurrentHashSet<>();
final Set<Thread> finishedThreads = new GridConcurrentHashSet<>();
IgniteInternalFuture<?> fut = multithreadedAsync(
new Callable<Object>() {
@Override public Object call() throws Exception {
assertFalse(lock0.isHeldByCurrentThread());
IgniteCondition cond = lock0.getOrCreateCondition("cond");
lock0.lock();
startedThreads.add(Thread.currentThread());
// Wait until every thread tries to lock.
do {
cond.await();
Thread.sleep(1000);
}
while (startedThreads.size() != totalThreads);
try {
info("Acquired in separate thread. Number of threads waiting on condition: "
+ lock0.getWaitQueueLength(cond));
assertTrue(lock0.isHeldByCurrentThread());
assertFalse(lock0.hasQueuedThread(Thread.currentThread()));
finishedThreads.add(Thread.currentThread());
if (startedThreads.size() != finishedThreads.size()) {
assertTrue(lock0.hasWaiters(cond));
}
for (Thread t : startedThreads) {
if (!finishedThreads.contains(t))
assertTrue(lock0.hasWaiters(cond));
}
assertTrue(lock0.getWaitQueueLength(cond) == (startedThreads.size() - finishedThreads.size()));
}
finally {
cond.signal();
lock0.unlock();
assertFalse(lock0.isHeldByCurrentThread());
}
return null;
}
}, totalThreads);
IgniteCondition cond = lock0.getOrCreateCondition("cond");
lock0.lock();
try {
// Wait until all threads are waiting on condition.
while (lock0.getWaitQueueLength(cond) != totalThreads) {
lock0.unlock();
Thread.sleep(1000);
lock0.lock();
}
// Signal once to get things started.
cond.signal();
}
finally {
lock0.unlock();
}
fut.get();
assertFalse(lock0.hasQueuedThreads());
for (Thread t : startedThreads)
assertFalse(lock0.hasQueuedThread(t));
lock0.close();
}
/**
* Tests that closed lock throws meaningful exception.
*/
@Test (expected = IgniteException.class)
public void testClosedLockThrowsIgniteException() {
final String lockName = "lock";
IgniteLock lock1 = grid(0).reentrantLock(lockName, false, false, true);
IgniteLock lock2 = grid(0).reentrantLock(lockName, false, false, true);
lock1.close();
try {
lock2.lock();
}
catch (IgniteException e) {
String msg = String.format("Failed to find reentrant lock with given name: " + lockName);
assertEquals(msg, e.getMessage());
throw e;
}
}
/**
* Tests if lock is evenly acquired among nodes when fair flag is set on.
* Since exact ordering of lock acquisitions cannot be guaranteed because it also depends
* on the OS thread scheduling, certain deviation from uniform distribution is tolerated.
* @throws Exception If failed.
*/
@Test
public void testFairness() throws Exception {
if (gridCount() == 1)
return;
// Total number of ops.
final long opsCount = 10000;
// Allowed deviation from uniform distribution.
final double tolerance = 0.05;
// Shared counter.
final String OPS_COUNTER = "ops_counter";
// Number of threads, one per node.
final int threadCount = gridCount();
final AtomicLong threadCounter = new AtomicLong(0);
Ignite ignite = startGrid(gridCount());
// Initialize reentrant lock.
IgniteLock l = ignite.reentrantLock("lock", true, true, true);
// Initialize OPS_COUNTER.
ignite.getOrCreateCache(OPS_COUNTER).put(OPS_COUNTER, (long)0);
final Map<Integer, Long> counts = new ConcurrentHashMap<>();
IgniteInternalFuture<?> fut = multithreadedAsync(
new Callable<Object>() {
@Override public Object call() throws Exception {
final int localNodeId = (int)threadCounter.getAndIncrement();
final Ignite grid = grid(localNodeId);
IgniteClosure<Ignite, Long> closure = new IgniteClosure<Ignite, Long>() {
@Override public Long apply(Ignite ignite) {
IgniteLock l = ignite.reentrantLock("lock", true, true, true);
long localCount = 0;
IgniteCountDownLatch latch = ignite.countDownLatch("latch", threadCount, false, true);
latch.countDown();
latch.await();
while (true) {
l.lock();
try {
long opsCounter = (long)ignite.getOrCreateCache(OPS_COUNTER).get(OPS_COUNTER);
if (opsCounter == opsCount)
break;
ignite.getOrCreateCache(OPS_COUNTER).put(OPS_COUNTER, ++opsCounter);
localCount++;
if (localCount > 1000) {
assertTrue(localCount < (1 + tolerance) * opsCounter / threadCount);
assertTrue(localCount > (1 - tolerance) * opsCounter / threadCount);
}
if (localCount % 100 == 0) {
info("Node [id=" + ignite.cluster().localNode().id() + "] acquired " +
localCount + " times. " + "Total ops count: " +
opsCounter + "/" + opsCount + "]");
}
}
finally {
l.unlock();
}
}
return localCount;
}
};
long localCount = closure.apply(grid);
counts.put(localNodeId, localCount);
return null;
}
}, threadCount);
fut.get();
long totalSum = 0;
for (int i = 0; i < gridCount(); i++) {
totalSum += counts.get(i);
info("Node " + grid(i).localNode().id() + " acquired the lock " + counts.get(i) + " times. ");
}
assertEquals(totalSum, opsCount);
l.close();
ignite.close();
}
/** {@inheritDoc} */
@Override public void writeExternal(ObjectOutput out) throws IOException {
// No-op.
}
/** {@inheritDoc} */
@Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
// No-op.
}
}