blob: 01af1f9c891cb2aeb9358687c6f2af86b3063193 [file] [log] [blame]
/*=========================================================================
* Copyright (c) 2002-2014 Pivotal Software, Inc. All Rights Reserved.
* This product is protected by U.S. and international copyright
* and intellectual property laws. Pivotal products are covered by
* more patents listed at http://www.pivotal.io/patents.
*=========================================================================
*/
package com.gemstone.gemfire.distributed.internal.deadlock;
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import com.gemstone.gemfire.cache.CacheFactory;
import com.gemstone.gemfire.cache.execute.Function;
import com.gemstone.gemfire.cache.execute.FunctionContext;
import com.gemstone.gemfire.cache.execute.FunctionService;
import com.gemstone.gemfire.cache.execute.ResultCollector;
import com.gemstone.gemfire.cache30.CacheTestCase;
import com.gemstone.gemfire.distributed.DistributedLockService;
import com.gemstone.gemfire.distributed.internal.membership.InternalDistributedMember;
import dunit.Host;
import dunit.SerializableCallable;
import dunit.SerializableRunnable;
import dunit.VM;
/**
* @author dsmith
*
*/
public class GemFireDeadlockDetectorDUnitTest extends CacheTestCase {
private static final Set<Thread> stuckThreads = Collections.synchronizedSet(new HashSet<Thread>());
@Override
public void tearDown2() throws Exception {
invokeInEveryVM(new SerializableRunnable() {
public void run() {
for(Thread thread: stuckThreads) {
thread.interrupt();
}
}
});
}
public GemFireDeadlockDetectorDUnitTest(String name) {
super(name);
}
public void testNoDeadlock() {
Host host = Host.getHost(0);
VM vm0 = host.getVM(0);
VM vm1 = host.getVM(1);
//Make sure a deadlock from a previous test is cleared.
disconnectAllFromDS();
createCache(vm0);
createCache(vm1);
getSystem();
GemFireDeadlockDetector detect = new GemFireDeadlockDetector();
assertEquals(null, detect.find().findCycle());
}
private static final Lock lock = new ReentrantLock();
public void testDistributedDeadlockWithFunction() throws InterruptedException {
Host host = Host.getHost(0);
VM vm0 = host.getVM(0);
VM vm1 = host.getVM(1);
getSystem();
InternalDistributedMember member1 = createCache(vm0);
final InternalDistributedMember member2 = createCache(vm1);
//Have two threads lock locks on different members in different orders.
//This thread locks the lock member1 first, then member2.
lockTheLocks(vm0, member2);
//This thread locks the lock member2 first, then member1.
lockTheLocks(vm1, member1);
Thread.sleep(5000);
GemFireDeadlockDetector detect = new GemFireDeadlockDetector();
LinkedList<Dependency> deadlock = detect.find().findCycle();
getLogWriter().info("Deadlock=" + DeadlockDetector.prettyFormat(deadlock));
assertEquals(8, deadlock.size());
}
private void lockTheLocks(VM vm0, final InternalDistributedMember member) {
vm0.invokeAsync(new SerializableRunnable() {
public void run() {
lock.lock();
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
fail("interrupted", e);
}
ResultCollector collector = FunctionService.onMember(member).execute(new TestFunction());
//wait the function to lock the lock on member.
collector.getResult();
lock.unlock();
}
});
}
public void testDistributedDeadlockWithDLock() throws InterruptedException {
Host host = Host.getHost(0);
VM vm0 = host.getVM(0);
VM vm1 = host.getVM(1);
lockTheDLocks(vm0, "one", "two");
lockTheDLocks(vm1, "two", "one");
getSystem();
GemFireDeadlockDetector detect = new GemFireDeadlockDetector();
LinkedList<Dependency> deadlock = null;
for(int i =0; i < 60; i ++) {
deadlock = detect.find().findCycle();
if(deadlock != null) {
break;
}
Thread.sleep(1000);
}
assertTrue(deadlock != null);
getLogWriter().info("Deadlock=" + DeadlockDetector.prettyFormat(deadlock));
assertEquals(4, deadlock.size());
}
private void lockTheDLocks(VM vm, final String first, final String second) {
vm.invokeAsync(new SerializableRunnable() {
public void run() {
getCache();
DistributedLockService dls = DistributedLockService.create("deadlock_test", getSystem());
dls.lock(first, 10 * 1000, -1);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
dls.lock(second, 10 * 1000, -1);
}
});
}
private InternalDistributedMember createCache(VM vm) {
return (InternalDistributedMember) vm.invoke(new SerializableCallable() {
public Object call() {
getCache();
return getSystem().getDistributedMember();
}
});
}
private static class TestFunction implements Function {
private static final int LOCK_WAIT_TIME = 1000;
public boolean hasResult() {
return true;
}
public void execute(FunctionContext context) {
try {
stuckThreads.add(Thread.currentThread());
lock.tryLock(LOCK_WAIT_TIME, TimeUnit.SECONDS);
} catch (InterruptedException e) {
//ingore
}
context.getResultSender().lastResult(null);
}
public String getId() {
return getClass().getCanonicalName();
}
public boolean optimizeForWrite() {
return false;
}
public boolean isHA() {
return false;
}
}
}