blob: b87b4326a92cea97171040452a8ad37a1cc44ed9 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.cache.distributed;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.locks.Lock;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.events.CacheEvent;
import org.apache.ignite.events.Event;
import org.apache.ignite.events.EventType;
import org.apache.ignite.internal.IgniteKernal;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheAdapter;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearCacheAdapter;
import org.apache.ignite.lang.IgnitePredicate;
import org.apache.ignite.testframework.GridTestThread;
import org.apache.ignite.testframework.MvccFeatureChecker;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.jetbrains.annotations.Nullable;
import org.junit.Test;
import static org.apache.ignite.events.EventType.EVTS_CACHE;
import static org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_UNLOCKED;
/**
* Test cases for multi-threaded tests.
*/
public abstract class GridCacheMultiNodeLockAbstractTest extends GridCommonAbstractTest {
/** */
private static final String CACHE2 = "cache2";
/** Grid 1. */
private static Ignite ignite1;
/** Grid 2. */
private static Ignite ignite2;
/** Listeners. */
private static Collection<IgnitePredicate<Event>> lsnrs = new ArrayList<>();
/** {@inheritDoc} */
@Override protected void beforeTest() throws Exception {
MvccFeatureChecker.skipIfNotSupported(MvccFeatureChecker.Feature.ENTRY_LOCK);
MvccFeatureChecker.skipIfNotSupported(MvccFeatureChecker.Feature.CACHE_EVENTS);
super.beforeTest();
}
/**
*
*/
protected GridCacheMultiNodeLockAbstractTest() {
super(false /*start grid. */);
}
/** {@inheritDoc} */
@Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
CacheConfiguration ccfg1 = cacheConfiguration().setName(DEFAULT_CACHE_NAME);
CacheConfiguration ccfg2 = cacheConfiguration().setName(CACHE2);
cfg.setCacheConfiguration(ccfg1, ccfg2);
cfg.setIncludeEventTypes(EventType.EVTS_ALL);
return cfg;
}
/**
* @return Cache configuration.
*/
protected abstract CacheConfiguration cacheConfiguration();
/**
* @return {@code True} for partitioned caches.
*/
protected boolean partitioned() {
return false;
}
/** {@inheritDoc} */
@Override protected void beforeTestsStarted() throws Exception {
ignite1 = startGrid(1);
ignite2 = startGrid(2);
startGrid(3);
// Make sure topology is stable to avoid topology deadlocks on lock aquisiotion.
awaitPartitionMapExchange();
}
/** {@inheritDoc} */
@Override protected void afterTestsStopped() throws Exception {
ignite1 = null;
ignite2 = null;
}
/** {@inheritDoc} */
@Override protected void afterTest() throws Exception {
removeListeners(ignite1);
removeListeners(ignite2);
lsnrs.clear();
for (int i = 1; i <= 3; i++) {
jcache(i).clear();
assertTrue(
"Cache isn't empty [i=" + i + ", entries=" + ((IgniteKernal)grid(i)).internalCache(DEFAULT_CACHE_NAME).entries() + "]",
jcache(i).localSize() == 0);
}
}
/**
* @param ignite Grid to remove listeners from.
*/
private void removeListeners(Ignite ignite) {
for (IgnitePredicate<Event> lsnr : lsnrs)
ignite.events().stopLocalListen(lsnr);
}
/**
* @param ignite Grid
* @param lsnr Listener.
*/
void addListener(Ignite ignite, IgnitePredicate<Event> lsnr) {
if (!lsnrs.contains(lsnr))
lsnrs.add(lsnr);
ignite.events().localListen(lsnr, EVTS_CACHE);
}
/**
* @param cache Cache.
* @param key Key.
*/
private void checkLocked(IgniteCache<Integer,String> cache, Integer key) {
assert cache.isLocalLocked(key, false);
assert cache.isLocalLocked(key, true);
}
/**
* @param cache Cache.
* @param key Key.
*/
private void checkRemoteLocked(IgniteCache<Integer,String> cache, Integer key) {
assert cache.isLocalLocked(key, false);
assert !cache.isLocalLocked(key, true);
}
/**
* @param cache Cache.
* @param key Key.
* @throws IgniteCheckedException If failed.
*/
@SuppressWarnings({"BusyWait"})
private void checkUnlocked(IgniteCache<Integer,String> cache, Integer key) throws IgniteCheckedException {
assert !cache.isLocalLocked(key, true);
if (partitioned()) {
for (int i = 0; i < 200; i++)
if (cache.isLocalLocked(key, false)) {
try {
Thread.sleep(10);
}
catch (InterruptedException e) {
e.printStackTrace();
}
}
else
return;
}
assertFalse("Key locked [key=" + key + ", entries=" + entries(key) + "]", cache.isLocalLocked(key, false));
}
/**
* @param cache Cache.
* @param keys Keys.
*/
private void checkLocked(IgniteCache<Integer,String> cache, Iterable<Integer> keys) {
for (Integer key : keys)
checkLocked(cache, key);
}
/**
* @param cache Cache.
* @param keys Keys.
*/
private void checkRemoteLocked(IgniteCache<Integer,String> cache, Iterable<Integer> keys) {
for (Integer key : keys)
checkRemoteLocked(cache, key);
}
/**
* @param cache Cache.
* @param keys Keys.
* @throws IgniteCheckedException If failed.
*/
private void checkUnlocked(IgniteCache<Integer,String> cache, Iterable<Integer> keys) throws IgniteCheckedException {
for (Integer key : keys)
checkUnlocked(cache, key);
}
/**
*
* @throws Exception If test failed.
*/
@Test
public void testBasicLock() throws Exception {
IgniteCache<Integer, String> cache = ignite1.cache(DEFAULT_CACHE_NAME);
Lock lock = cache.lock(1);
lock.lock();
assert cache.isLocalLocked(1, false);
assert cache.isLocalLocked(1, true);
lock.unlock();
checkUnlocked(cache, 1);
}
/**
* Entries for key.
*
* @param key Key.
* @return Entries.
* @throws IgniteCheckedException If failed.
*/
private String entries(int key) throws IgniteCheckedException {
if (partitioned()) {
GridNearCacheAdapter<Integer, String> near1 = near(1);
GridNearCacheAdapter<Integer, String> near2 = near(2);
GridDhtCacheAdapter<Integer, String> dht1 = dht(1);
GridDhtCacheAdapter<Integer, String> dht2 = dht(2);
return "Entries [ne1=" + near1.peekEx(key) + ", de1=" + dht1.peekEx(key) + ", ne2=" + near2.peekEx(key) +
", de2=" + dht2.peekEx(key) + ']';
}
return "Entries [e1=" + "(" + key + ", " + ((IgniteKernal)ignite1).internalCache(DEFAULT_CACHE_NAME).get(key) + ")"
+ ", e2=" + "(" + key + ", " + ((IgniteKernal)ignite2).internalCache(DEFAULT_CACHE_NAME).get(key) + ")" + ']';
}
/**
* @throws Exception If test fails.
*/
@Test
public void testMultiNodeLock() throws Exception {
IgniteCache<Integer, String> cache1 = ignite1.cache(DEFAULT_CACHE_NAME);
IgniteCache<Integer, String> cache2 = ignite2.cache(DEFAULT_CACHE_NAME);
Lock lock1_1 = cache1.lock(1);
Lock lock2_1 = cache2.lock(1);
lock1_1.lock();
try {
assert cache1.isLocalLocked(1, false) : entries(1);
assert cache1.isLocalLocked(1, true);
assert cache2.isLocalLocked(1, false) : entries(1);
assert !cache2.isLocalLocked(1, true);
assert !lock2_1.tryLock();
assert cache2.isLocalLocked(1, false) : entries(1);
assert !cache2.isLocalLocked(1, true);
}
finally {
lock1_1.unlock();
checkUnlocked(cache1, 1);
}
CountDownLatch latch = new CountDownLatch(1);
lock2_1.lock();
try {
assert cache2.isLocalLocked(1, false) : entries(1);
assert cache2.isLocalLocked(1, true);
assert cache1.isLocalLocked(1, false) : entries(1);
assert !cache1.isLocalLocked(1, true);
addListener(ignite1, new UnlockListener(latch, 1));
assert !lock1_1.tryLock();
assert cache1.isLocalLocked(1, false) : entries(1);
assert !cache1.isLocalLocked(1, true);
}
finally {
lock2_1.unlock();
}
latch.await();
checkUnlocked(cache1, 1);
checkUnlocked(cache2, 1);
}
/**
* @throws Exception If test fails.
*/
@Test
public void testMultiNodeLockWithKeyLists() throws Exception {
IgniteCache<Integer, String> cache1 = ignite1.cache(DEFAULT_CACHE_NAME);
IgniteCache<Integer, String> cache2 = ignite2.cache(DEFAULT_CACHE_NAME);
Collection<Integer> keys1 = Arrays.asList(1, 2, 3);
Collection<Integer> keys2 = Arrays.asList(2, 3, 4);
Lock lock1_1 = cache1.lockAll(keys1);
Lock lock2_2 = cache2.lockAll(keys2);
lock1_1.lock();
checkLocked(cache1, keys1);
try {
assert !lock2_2.tryLock();
assert cache2.isLocalLocked(2, false);
assert cache2.isLocalLocked(3, false);
checkUnlocked(cache1, 4);
checkUnlocked(cache2, 4);
assert !cache2.isLocalLocked(2, true);
assert !cache2.isLocalLocked(3, true);
assert !cache2.isLocalLocked(4, true);
}
finally {
lock1_1.unlock();
}
checkUnlocked(cache1, keys1);
checkUnlocked(cache1, keys2);
checkUnlocked(cache2, 4);
lock2_2.lock();
CountDownLatch latch1 = new CountDownLatch(keys2.size());
CountDownLatch latch2 = new CountDownLatch(1);
addListener(ignite2, new UnlockListener(latch2, 1));
addListener(ignite1, (new UnlockListener(latch1, keys2)));
Lock lock1_ = cache1.lock(1);
try {
checkLocked(cache2, keys2);
checkUnlocked(cache2, 1);
assert lock1_.tryLock();
checkLocked(cache1, 1);
checkRemoteLocked(cache1, keys2);
checkRemoteLocked(cache2, 1);
}
finally {
lock2_2.unlock();
lock1_.unlock();
}
latch1.await();
latch2.await();
checkUnlocked(cache1, keys1);
checkUnlocked(cache2, keys1);
checkUnlocked(cache1, keys2);
checkUnlocked(cache2, keys2);
}
/**
* @throws IgniteCheckedException If test failed.
*/
@Test
public void testLockReentry() throws IgniteCheckedException {
IgniteCache<Integer, String> cache = ignite1.cache(DEFAULT_CACHE_NAME);
Lock lock = cache.lock(1);
lock.lock();
try {
checkLocked(cache, 1);
lock.lock();
checkLocked(cache, 1);
lock.unlock();
checkLocked(cache, 1);
}
finally {
lock.unlock();
}
checkUnlocked(cache, 1);
}
/**
* @throws Exception If test failed.
*/
@Test
public void testLockMultithreaded() throws Exception {
final IgniteCache<Integer, String> cache = ignite1.cache(DEFAULT_CACHE_NAME);
final CountDownLatch l1 = new CountDownLatch(1);
final CountDownLatch l2 = new CountDownLatch(1);
final Lock lock1 = cache.lock(1);
GridTestThread t1 = new GridTestThread(new Callable<Object>() {
/** {@inheritDoc} */
@Nullable @Override public Object call() throws Exception {
info("Before lock for.key 1");
lock1.lock();
info("After lock for key 1");
try {
checkLocked(cache, 1);
l1.countDown();
info("Let thread2 proceed.");
// Reentry.
lock1.lock();
checkLocked(cache, 1);
// Nested lock.
Lock lock2 = cache.lock(2);
assert lock2.tryLock();
checkLocked(cache, 2);
// Unlock reentry.
lock1.unlock();
// Outer should still be owned.
checkLocked(cache, 1);
// Unlock in reverse order.
lock2.unlock();
checkUnlocked(cache, 2);
l2.await();
info("Waited for latch 2");
}
finally {
lock1.unlock();
info("Unlocked entry for key 1.");
}
assert !cache.isLocalLocked(1, true);
assert !cache.isLocalLocked(2, true);
return null;
}
});
GridTestThread t2 = new GridTestThread(new Callable<Object>() {
/** {@inheritDoc} */
@Nullable @Override public Object call() throws Exception {
info("Waiting for latch1...");
l1.await();
info("Latch1 released.");
assert !lock1.tryLock();
info("Tried to lock cache for key1");
l2.countDown();
info("Released latch2");
lock1.lock();
try {
info("Locked cache for key 1");
checkLocked(cache, 1);
info("Checked that cache is locked for key 1");
}
finally {
lock1.unlock();
info("Unlocked cache for key 1");
}
checkUnlocked(cache, 1);
return null;
}
});
t1.start();
t2.start();
t1.join();
t2.join();
t1.checkError();
t2.checkError();
checkUnlocked(cache, 1);
checkUnlocked(cache, 2);
}
/**
* @throws Exception If failed.
*/
@Test
public void testTwoCaches() throws Exception {
IgniteCache<Integer, String> cache1 = ignite1.cache(DEFAULT_CACHE_NAME);
IgniteCache<Integer, String> cache2 = ignite1.cache(CACHE2);
final Integer key = primaryKey(cache1);
Lock lock = cache1.lock(key);
lock.lock();
try {
assertTrue(cache1.isLocalLocked(key, true));
assertTrue(cache1.isLocalLocked(key, false));
assertFalse(cache2.isLocalLocked(key, true));
assertFalse(cache2.isLocalLocked(key, false));
}
finally {
lock.unlock();
}
}
/**
* Cache unlock listener.
*/
private class UnlockListener implements IgnitePredicate<Event> {
/** Latch. */
private final CountDownLatch latch;
/** */
private final Collection<Integer> keys;
/**
* @param latch Latch.
* @param keys Keys.
*/
UnlockListener(CountDownLatch latch, Integer... keys) {
this(latch, Arrays.asList(keys));
}
/**
* @param latch Latch.
* @param keys Keys.
*/
UnlockListener(CountDownLatch latch, Collection<Integer> keys) {
assert latch != null;
assert keys != null;
this.latch = latch;
this.keys = keys;
}
/** {@inheritDoc} */
@Override public boolean apply(Event evt) {
info("Received cache event: " + evt);
if (evt instanceof CacheEvent) {
CacheEvent cacheEvt = (CacheEvent)evt;
Integer key = cacheEvt.key();
if (keys.contains(key))
if (evt.type() == EVT_CACHE_OBJECT_UNLOCKED)
latch.countDown();
}
return true;
}
}
}