blob: e08b50784c7b9189b7baecd3b5eda2a61741bbb2 [file] [log] [blame]
/*=========================================================================
* Copyright (c) 2002-2014 Pivotal Software, Inc. All Rights Reserved.
* This product is protected by U.S. and international copyright
* and intellectual property laws. Pivotal products are covered by
* more patents listed at http://www.pivotal.io/patents.
*=========================================================================
*/
package com.gemstone.gemfire.internal.cache.partitioned;
import java.io.IOException;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import com.gemstone.gemfire.InternalGemFireError;
import com.gemstone.gemfire.admin.AdminDistributedSystemFactory;
import com.gemstone.gemfire.admin.AdminException;
import com.gemstone.gemfire.admin.DistributedSystemConfig;
import com.gemstone.gemfire.admin.internal.AdminDistributedSystemImpl;
import com.gemstone.gemfire.cache.AttributesFactory;
import com.gemstone.gemfire.cache.Cache;
import com.gemstone.gemfire.cache.CacheClosedException;
import com.gemstone.gemfire.cache.CacheException;
import com.gemstone.gemfire.cache.DataPolicy;
import com.gemstone.gemfire.cache.DiskStore;
import com.gemstone.gemfire.cache.EvictionAction;
import com.gemstone.gemfire.cache.EvictionAttributes;
import com.gemstone.gemfire.cache.MirrorType;
import com.gemstone.gemfire.cache.PartitionAttributesFactory;
import com.gemstone.gemfire.cache.Region;
import com.gemstone.gemfire.cache.RegionDestroyedException;
import com.gemstone.gemfire.cache.Scope;
import com.gemstone.gemfire.cache.client.ServerConnectivityException;
import com.gemstone.gemfire.cache.server.CacheServer;
import com.gemstone.gemfire.cache30.CacheSerializableRunnable;
import com.gemstone.gemfire.cache.EntryEvent;
import com.gemstone.gemfire.cache.PartitionAttributesFactory;
import com.gemstone.gemfire.cache.Region;
import com.gemstone.gemfire.cache.util.CacheListenerAdapter;
import com.gemstone.gemfire.cache30.CacheTestCase;
import com.gemstone.gemfire.distributed.internal.InternalDistributedSystem;
import com.gemstone.gemfire.internal.admin.remote.ShutdownAllRequest;
import com.gemstone.gemfire.internal.cache.DiskRegion;
import com.gemstone.gemfire.internal.cache.PartitionedRegion;
import com.gemstone.gemfire.internal.cache.PutAllPartialResultException;
import com.gemstone.gemfire.internal.cache.control.InternalResourceManager;
import com.gemstone.gemfire.internal.cache.control.InternalResourceManager.ResourceObserver;
import dunit.AsyncInvocation;
import dunit.DistributedTestCase;
import dunit.Host;
import dunit.RMIException;
import dunit.SerializableCallable;
import dunit.SerializableRunnable;
import dunit.VM;
/**
 * Distributed tests for the shutdown-all operation (graceful, coordinated stop
 * of every cache member) on persistent partitioned and replicated regions,
 * including restart and disk recovery after the shutdown.
 * @author xzhou
 *
 */
public class ShutdownAllDUnitTest extends CacheTestCase {
protected static HangingCacheListener listener;
/**
 * Log-suppression tag for errors these tests deliberately provoke
 * (the injected InternalGemFireError during shutdown-all).
 */
final String expectedExceptions = InternalGemFireError.class.getName()+"||ShutdownAllRequest: disconnect distributed without response";
/**
 * Creates the test case instance.
 * @param name the test method name, passed through to the superclass
 */
public ShutdownAllDUnitTest(String name) {
super(name);
}
/**
 * Maximum time, in milliseconds, to wait for an async region
 * creation/recovery invocation to complete.
 */
private static final int MAX_WAIT = 600 * 1000;
/**
 * Disconnects every VM from any pre-existing distributed system before each
 * test. The tests assert exact member counts during shutdown-all, so they
 * must start with no leftover connections.
 */
@Override
public void setUp() throws Exception {
super.setUp();
//Get rid of any existing distributed systems. We want
//to make assertions about the number of distributed systems
//we shut down, so we need to start with a clean slate.
DistributedTestCase.disconnectAllFromDS();
}
/**
 * Shuts down two members hosting a redundant persistent PR via shutdown-all
 * issued from a third (data-less) VM, then restarts both concurrently and
 * verifies that the bucket layout and data are recovered from disk and the
 * region still accepts writes.
 * @throws Throwable if a remote invocation fails
 */
public void testShutdownAll2Servers() throws Throwable {
Host host = Host.getHost(0);
VM vm0 = host.getVM(0);
VM vm1 = host.getVM(1);
VM vm2 = host.getVM(2);
int numBuckets = 50;
createRegion(vm0, "region", "disk", true, 1);
createRegion(vm1, "region", "disk", true, 1);
createData(vm0, 0, numBuckets, "a", "region");
Set<Integer> vm0Buckets = getBucketList(vm0, "region");
Set<Integer> vm1Buckets = getBucketList(vm1, "region");
// with redundancy 1 and two members, both are expected to host the same buckets
assertEquals(vm0Buckets, vm1Buckets);
// vm2 hosts no data; it drives the shutdown and expects exactly 2 members to stop
shutDownAllMembers(vm2, 2);
assertTrue(InternalDistributedSystem.getExistingSystems().isEmpty());
// restart vm0
AsyncInvocation a0 = createRegionAsync(vm0, "region", "disk", true, 1);
// restart vm1
AsyncInvocation a1 = createRegionAsync(vm1, "region", "disk", true, 1);
a0.getResult(MAX_WAIT);
a1.getResult(MAX_WAIT);
// bucket assignment must survive the restart
assertEquals(vm0Buckets, getBucketList(vm0, "region"));
// checkRecoveredFromDisk(vm0, 0, true);
// checkRecoveredFromDisk(vm1, 0, false);
checkData(vm0, 0, numBuckets, "a", "region");
checkData(vm1, 0, numBuckets, "a", "region");
// the recovered region must still accept new writes
createData(vm0, numBuckets, 113, "b", "region");
checkData(vm0, numBuckets, 113, "b", "region");
}
/**
 * Runs shutdown-all on a single member while the "TestInternalGemFireError"
 * system property makes the shutdown path throw an InternalGemFireError.
 * The shutdown is still expected to complete and report the one member.
 * @throws Throwable if a remote invocation fails
 */
public void testShutdownAllWithEncounterIGE1() throws Throwable {
Host host = Host.getHost(0);
VM vm0 = host.getVM(0);
int numBuckets = 50;
createRegion(vm0, "region", "disk", true, 1);
createData(vm0, 0, numBuckets, "a", "region");
// suppress the deliberately-injected error in vm0's logs
vm0.invoke(addExceptionTag1(expectedExceptions));
invokeInEveryVM(new SerializableRunnable("set TestInternalGemFireError") {
public void run() {
System.setProperty("TestInternalGemFireError", "true");
}
});
// issued from the member itself; expects 1 member shut down despite the error
shutDownAllMembers(vm0, 1);
assertTrue(InternalDistributedSystem.getExistingSystems().isEmpty());
invokeInEveryVM(new SerializableRunnable("reset TestInternalGemFireError") {
public void run() {
System.setProperty("TestInternalGemFireError", "false");
}
});
vm0.invoke(removeExceptionTag1(expectedExceptions));
}
/**
 * Runs shutdown-all from a data-less VM against two data-hosting members
 * while "TestInternalGemFireError" makes the shutdown path throw an
 * InternalGemFireError in every VM.
 * NOTE(review): the expected reported-member count is 0 here, presumably
 * because the injected error prevents members from replying — confirm.
 * @throws Throwable if a remote invocation fails
 */
public void testShutdownAllWithEncounterIGE2() throws Throwable {
Host host = Host.getHost(0);
VM vm0 = host.getVM(0);
VM vm1 = host.getVM(1);
VM vm2 = host.getVM(2);
int numBuckets = 50;
createRegion(vm0, "region", "disk", true, 1);
createRegion(vm1, "region", "disk", true, 1);
createData(vm0, 0, numBuckets, "a", "region");
Set<Integer> vm0Buckets = getBucketList(vm0, "region");
Set<Integer> vm1Buckets = getBucketList(vm1, "region");
assertEquals(vm0Buckets, vm1Buckets);
// suppress the deliberately-injected errors in member logs
vm0.invoke(addExceptionTag1(expectedExceptions));
vm1.invoke(addExceptionTag1(expectedExceptions));
invokeInEveryVM(new SerializableRunnable("set TestInternalGemFireError") {
public void run() {
System.setProperty("TestInternalGemFireError", "true");
}
});
shutDownAllMembers(vm2, 0);
assertTrue(InternalDistributedSystem.getExistingSystems().isEmpty());
invokeInEveryVM(new SerializableRunnable("reset TestInternalGemFireError") {
public void run() {
System.setProperty("TestInternalGemFireError", "false");
}
});
vm0.invoke(removeExceptionTag1(expectedExceptions));
vm1.invoke(removeExceptionTag1(expectedExceptions));
}
/**
 * Shuts down a single member hosting a non-redundant persistent PR, restarts
 * it, and verifies the bucket is recovered locally from disk (no remote
 * initialization) and the region remains writable.
 * @throws Throwable if a remote invocation fails
 */
public void testShutdownAllOneServerAndRecover() throws Throwable {
Host host = Host.getHost(0);
VM vm0 = host.getVM(0);
VM vm2 = host.getVM(2);
createRegion(vm0, "region", "disk", true, 0);
createData(vm0, 0, 1, "a", "region");
shutDownAllMembers(vm2, 1);
assertTrue(InternalDistributedSystem.getExistingSystems().isEmpty());
// restart vm0
createRegion(vm0, "region", "disk", true, 0);
// bucket 0 must have come back from local disk, not a remote GII
checkPRRecoveredFromDisk(vm0, "region", 0, true);
createData(vm0, 1, 10, "b", "region");
}
/**
 * Verifies shutdown-all and restart of a member that hosts both a persistent
 * partitioned region and a persistent distributed (replicated) region; data
 * in both regions must survive.
 * @throws Throwable if a remote invocation fails
 */
public void testPRWithDR() throws Throwable {
Host host = Host.getHost(0);
VM vm0 = host.getVM(0);
VM vm2 = host.getVM(2);
createRegion(vm0, "region_pr", "disk", true, 0);
createRegion(vm0, "region_dr", "disk", false, 0);
createData(vm0, 0, 1, "a", "region_pr");
createData(vm0, 0, 1, "c", "region_dr");
shutDownAllMembers(vm2, 1);
assertTrue(InternalDistributedSystem.getExistingSystems().isEmpty());
// restart vm0
createRegion(vm0, "region_pr", "disk", true, 0);
createRegion(vm0, "region_dr", "disk", false, 0);
checkPRRecoveredFromDisk(vm0, "region_pr", 0, true);
checkData(vm0, 0, 1, "a", "region_pr");
checkData(vm0, 0, 1, "c", "region_dr");
}
/**
 * Issues shutdown-all from a VM that itself hosts data (vm2), so all three
 * members — including the issuer — are shut down, then restarts all three in
 * parallel and verifies the region is usable again.
 * @throws Throwable if a remote invocation fails
 */
public void testShutdownAllFromServer() throws Throwable {
Host host = Host.getHost(0);
VM vm0 = host.getVM(0);
VM vm1 = host.getVM(1);
VM vm2 = host.getVM(2);
int numBuckets = 50;
createRegion(vm0, "region", "disk", true, 1);
createRegion(vm1, "region", "disk", true, 1);
createRegion(vm2, "region", "disk", true, 1);
createData(vm0, 0, numBuckets, "a", "region");
shutDownAllMembers(vm2, 3);
assertTrue(InternalDistributedSystem.getExistingSystems().isEmpty());
// restart vm0, vm1, vm2
AsyncInvocation a0 = createRegionAsync(vm0, "region", "disk", true, 1);
AsyncInvocation a1 = createRegionAsync(vm1, "region", "disk", true, 1);
AsyncInvocation a2 = createRegionAsync(vm2, "region", "disk", true, 1);
a0.getResult(MAX_WAIT);
a1.getResult(MAX_WAIT);
a2.getResult(MAX_WAIT);
// re-put the same values from each member to prove the region is writable everywhere
createData(vm0, 0, numBuckets, "a", "region");
createData(vm1, 0, numBuckets, "a", "region");
createData(vm2, 0, numBuckets, "a", "region");
createData(vm0, numBuckets, 113, "b", "region");
checkData(vm0, numBuckets, 113, "b", "region");
}
// shutdownAll, then restart to verify
/**
 * Verifies a clean stop: after shutdown-all, a restarting member must wait
 * for its peer so both recover locally from disk. A second close/recreate
 * cycle (without shutdown-all) then shows asymmetric recovery — one member
 * initializes remotely from the other.
 * @throws Throwable if a remote invocation fails
 */
public void testCleanStop() throws Throwable {
Host host = Host.getHost(0);
VM vm0 = host.getVM(0);
VM vm1 = host.getVM(1);
VM vm2 = host.getVM(2);
createRegion(vm0, "region", "disk", true, 1);
createRegion(vm1, "region", "disk", true, 1);
createData(vm0, 0, 1, "a", "region");
shutDownAllMembers(vm2, 2);
AsyncInvocation a0 = createRegionAsync(vm0, "region", "disk", true, 1);
//[dsmith] Make sure that vm0 is waiting for vm1 to recover
//If VM(0) recovers early, that is a problem, because we
//are no longer doing a clean recovery.
Thread.sleep(500);
assertTrue(a0.isAlive());
AsyncInvocation a1 = createRegionAsync(vm1, "region", "disk", true, 1);
a0.getResult(MAX_WAIT);
a1.getResult(MAX_WAIT);
checkData(vm0, 0, 1, "a", "region");
checkData(vm1, 0, 1, "a", "region");
// after a clean shutdown-all, both members recover bucket 0 from local disk
checkPRRecoveredFromDisk(vm0, "region", 0, true);
checkPRRecoveredFromDisk(vm1, "region", 0, true);
closeRegion(vm0, "region");
closeRegion(vm1, "region");
a0 = createRegionAsync(vm0, "region", "disk", true, 1);
a1 = createRegionAsync(vm1, "region", "disk", true, 1);
a0.getResult(MAX_WAIT);
a1.getResult(MAX_WAIT);
checkData(vm0, 0, 1, "a", "region");
checkData(vm1, 0, 1, "a", "region");
// plain close is not a clean stop: vm0 is expected to initialize remotely
// from vm1 this time, while vm1 still recovers locally
checkPRRecoveredFromDisk(vm0, "region", 0, false);
checkPRRecoveredFromDisk(vm1, "region", 0, true);
}
// shutdownAll, then restart to verify
/**
 * Same clean-stop scenario as {@link #testCleanStop}, but both members first
 * start cache servers on the same port so one of them hits a bind conflict
 * (the IOException is tolerated); recovery must still be clean on both.
 * @throws Throwable if a remote invocation fails
 */
public void testCleanStopWithConflictCachePort() throws Throwable {
Host host = Host.getHost(0);
VM vm0 = host.getVM(0);
VM vm1 = host.getVM(1);
VM vm2 = host.getVM(2);
createRegion(vm0, "region", "disk", true, 1);
createRegion(vm1, "region", "disk", true, 1);
// 2 vms use the same port no to conflict
addCacheServer(vm0, 34505);
addCacheServer(vm1, 34505);
createData(vm0, 0, 1, "a", "region");
shutDownAllMembers(vm2, 2);
AsyncInvocation a0 = createRegionAsync(vm0, "region", "disk", true, 1);
//[dsmith] Make sure that vm0 is waiting for vm1 to recover
//If VM(0) recovers early, that is a problem, because we
//are no longer doing a clean recovery.
Thread.sleep(500);
assertTrue(a0.isAlive());
AsyncInvocation a1 = createRegionAsync(vm1, "region", "disk", true, 1);
a0.getResult(MAX_WAIT);
a1.getResult(MAX_WAIT);
// distinct ports this time, so both servers can start
addCacheServer(vm0, 34505);
addCacheServer(vm1, 34506);
checkData(vm0, 0, 1, "a", "region");
checkData(vm1, 0, 1, "a", "region");
checkPRRecoveredFromDisk(vm0, "region", 0, true);
checkPRRecoveredFromDisk(vm1, "region", 0, true);
}
/*
public void testStopNonPersistRegions() throws Throwable {
Host host = Host.getHost(0);
VM vm0 = host.getVM(0);
VM vm1 = host.getVM(1);
VM vm2 = host.getVM(2);
createRegion(vm0, "region", null, true, 1);
createRegion(vm1, "region", "disk", true, 1);
createData(vm0, 0, 1, "a", "region");
shutDownAllMembers(vm2, 2);
// restart vms, and let vm0 to do GII from vm0
createRegion(vm1, "region", "disk", true, 1);
createRegion(vm0, "region", null, true, 1);
checkData(vm0, 0, 1, "a", "region");
checkData(vm1, 0, 1, "a", "region");
checkPRRecoveredFromDisk(vm1, "region", 0, true);
checkPRRecoveredFromDisk(vm0, "region", 0, false);
}
*/
/**
 * Verifies shutdown-all and restart of a member hosting three persistent PRs
 * sharing one disk store plus two persistent DRs sharing another; every
 * region's data must be recovered.
 * @throws Throwable if a remote invocation fails
 */
public void testMultiPRDR() throws Throwable {
Host host = Host.getHost(0);
VM vm0 = host.getVM(0);
VM vm2 = host.getVM(2);
createRegion(vm0, "region_pr1", "disk1", true, 0);
createRegion(vm0, "region_pr2", "disk1", true, 0);
createRegion(vm0, "region_pr3", "disk1", true, 0);
createRegion(vm0, "region_dr1", "disk2", false, 0);
createRegion(vm0, "region_dr2", "disk2", false, 0);
createData(vm0, 0, 1, "a", "region_pr1");
createData(vm0, 0, 1, "b", "region_pr2");
createData(vm0, 0, 1, "c", "region_pr3");
createData(vm0, 0, 1, "d", "region_dr1");
createData(vm0, 0, 1, "e", "region_dr2");
shutDownAllMembers(vm2, 1);
assertTrue(InternalDistributedSystem.getExistingSystems().isEmpty());
// restart vm0
createRegion(vm0, "region_pr1", "disk1", true, 0);
createRegion(vm0, "region_pr2", "disk1", true, 0);
createRegion(vm0, "region_pr3", "disk1", true, 0);
createRegion(vm0, "region_dr1", "disk2", false, 0);
createRegion(vm0, "region_dr2", "disk2", false, 0);
// checkPRRecoveredFromDisk(vm0, "region_pr1", 0, true);
// checkPRRecoveredFromDisk(vm0, "region_pr2", 0, true);
// checkPRRecoveredFromDisk(vm0, "region_pr3", 0, true);
checkData(vm0, 0, 1, "a", "region_pr1");
checkData(vm0, 0, 1, "b", "region_pr2");
checkData(vm0, 0, 1, "c", "region_pr3");
checkData(vm0, 0, 1, "d", "region_dr1");
checkData(vm0, 0, 1, "e", "region_dr2");
}
/**
 * Verifies the shutdown-all timeout: a cache listener hangs in-progress puts
 * so a graceful stop cannot complete, and a shutdown-all with a 2000 ms
 * timeout must actually wait out (most of) that timeout before returning.
 * Afterwards the stuck thread is released, both members restart, and the
 * data is verified.
 * @throws Throwable if a remote invocation fails
 */
public void testShutdownAllTimeout() throws Throwable {
Host host = Host.getHost(0);
VM vm0 = host.getVM(0);
VM vm1 = host.getVM(1);
VM vm2 = host.getVM(2);
final int numBuckets = 50;
createRegion(vm0, "region", "disk", true, 1);
createRegion(vm1, "region", "disk", true, 1);
createData(vm0, 0, numBuckets, "a", "region");
Set<Integer> vm0Buckets = getBucketList(vm0, "region");
Set<Integer> vm1Buckets = getBucketList(vm1, "region");
assertEquals(vm0Buckets, vm1Buckets);
//Add a cache listener that will cause the system to hang up.
//Then do some puts to get us stuck in a put.
AsyncInvocation async1 = vm0.invokeAsync(new SerializableRunnable() {
public void run() {
Region<Object, Object> region = getCache().getRegion("region");
listener = new HangingCacheListener();
region.getAttributesMutator().addCacheListener(listener);
//get us stuck doing a put.
for(int i=0; i < numBuckets; i++) {
region.put(i, "a");
}
}
});
//Make sure the we do get stuck
async1.join(1000);
assertTrue(async1.isAlive());
//Do a shutdownall with a timeout.
//This will hit the timeout, because the in progress put will
//prevent us from gracefully shutting down.
long start = System.nanoTime();
shutDownAllMembers(vm2, 0, 2000);
long end = System.nanoTime();
//Make sure we waited for the timeout.
// 1500 ms (not the full 2000) leaves slack for timer granularity
assertTrue(end - start > TimeUnit.MILLISECONDS.toNanos(1500));
//clean up our stuck thread
vm0.invoke(new SerializableRunnable() {
public void run() {
// release the hanging listener so the blocked put can finish
listener.latch.countDown();
listener = null;
}
});
//wait for shutdown to finish
pause(10000);
// restart vm0
AsyncInvocation a0 = createRegionAsync(vm0, "region", "disk", true, 1);
// restart vm1
AsyncInvocation a1 = createRegionAsync(vm1, "region", "disk", true, 1);
a0.getResult(MAX_WAIT);
a1.getResult(MAX_WAIT);
assertEquals(vm0Buckets, getBucketList(vm0, "region"));
// checkRecoveredFromDisk(vm0, 0, true);
// checkRecoveredFromDisk(vm1, 0, false);
checkData(vm0, 0, numBuckets, "a", "region");
checkData(vm1, 0, numBuckets, "a", "region");
createData(vm0, numBuckets, 113, "b", "region");
checkData(vm0, numBuckets, 113, "b", "region");
}
/**
* Test for 43551. Do a shutdown all with some
* members waiting on recovery.
* @throws Throwable
*/
/**
 * Test for 43551. Do a shutdown all with some
 * members waiting on recovery: after the first shutdown-all, one member is
 * restarted alone and blocks waiting for its peer; a second shutdown-all is
 * then expected to cancel that stuck initialization with a
 * CacheClosedException. Finally both members restart and data is verified.
 * @throws Throwable if a remote invocation fails
 */
public void testShutdownAllWithMembersWaiting() throws Throwable {
Host host = Host.getHost(0);
VM vm0 = host.getVM(0);
VM vm1 = host.getVM(1);
VM vm2 = host.getVM(2);
final int numBuckets = 5;
createRegion(vm0, "region", "disk", true, 1);
createRegion(vm1, "region", "disk", true, 1);
createData(vm0, 0, numBuckets, "a", "region");
Set<Integer> vm0Buckets = getBucketList(vm0, "region");
Set<Integer> vm1Buckets = getBucketList(vm1, "region");
//shutdown all the members
shutDownAllMembers(vm2, 2);
//restart one of the members (this will hang, waiting for the other members)
// restart vm0
AsyncInvocation a0 = createRegionAsync(vm0, "region", "disk", true, 1);
//Wait a bit for the initialization to get stuck
pause(5000);
assertTrue(a0.isAlive());
//Do another shutdown all, with a member offline and another stuck
shutDownAllMembers(vm2, 1);
//This should complete (but maybe it will throw an exception?)
try {
a0.getResult(MAX_WAIT);
fail("should have received a cache closed exception");
} catch(Exception e) {
// only an RMIException caused by CacheClosedException is expected;
// anything else is a genuine failure and is rethrown
if(!(e.getCause() instanceof RMIException)) {
throw e;
}
RMIException cause = (RMIException) e.getCause();
if(!(cause.getCause() instanceof CacheClosedException)) {
throw e;
}
}
//now restart both members. This should work, but
//no guarantee they'll do a clean recovery
a0 = createRegionAsync(vm0, "region", "disk", true, 1);
AsyncInvocation a1 = createRegionAsync(vm1, "region", "disk", true, 1);
a0.getResult(MAX_WAIT);
a1.getResult(MAX_WAIT);
assertEquals(vm0Buckets, getBucketList(vm0, "region"));
checkData(vm0, 0, numBuckets, "a", "region");
checkData(vm1, 0, numBuckets, "a", "region");
createData(vm0, numBuckets, numBuckets * 2, "b", "region");
checkData(vm0, numBuckets, numBuckets * 2, "b", "region");
}
//TODO prpersist
// test move bucket
// test async put
// test create a new bucket by put async
/**
 * From the given VM, connects an admin distributed system, invokes
 * shutDownAllMembers(), and asserts that exactly {@code expnum} members were
 * reported shut down. The admin connection is always disconnected, and the
 * "TestInternalGemFireError" property is reset in the controller VM.
 *
 * @param vm     the VM from which to drive the shutdown
 * @param expnum expected number of members reported shut down
 */
private void shutDownAllMembers(VM vm, final int expnum) {
  vm.invoke(new SerializableRunnable("Shutdown all the members") {
    public void run() {
      DistributedSystemConfig config;
      AdminDistributedSystemImpl adminDS = null;
      try {
        // build an admin view over this VM's connected distributed system
        config = AdminDistributedSystemFactory.defineDistributedSystem(getSystem(), "");
        adminDS = (AdminDistributedSystemImpl) AdminDistributedSystemFactory.getDistributedSystem(config);
        adminDS.connect();
        // the returned set holds the members that were shut down; avoid the
        // raw type of the original code
        Set<?> members = adminDS.shutDownAllMembers();
        int num = (members == null) ? 0 : members.size();
        assertEquals(expnum, num);
      } catch (AdminException e) {
        throw new RuntimeException(e);
      } finally {
        if (adminDS != null) {
          adminDS.disconnect();
        }
      }
    }
  });
  // clean up for this vm (the controller); remote VMs reset the flag themselves
  System.setProperty("TestInternalGemFireError", "false");
}
/**
 * Builds a runnable that creates a persistent replicated (distributed)
 * region backed by the named disk store, creating the disk store first if
 * it does not already exist in that VM's cache.
 *
 * @param regionName    name of the region to create
 * @param diskStoreName name of the disk store to find or create
 * @return the runnable to pass to {@code VM.invoke}/{@code invokeAsync}
 */
private SerializableRunnable getCreateDRRunnable(final String regionName, final String diskStoreName) {
  SerializableRunnable createDR = new SerializableRunnable("create DR") {
    public void run() {
      // Local variable instead of the original instance field: keeping a
      // Cache reference in a field of a Serializable anonymous class is an
      // avoidable serialization hazard and served no purpose here.
      Cache cache = getCache();
      DiskStore ds = cache.findDiskStore(diskStoreName);
      if (ds == null) {
        ds = cache.createDiskStoreFactory()
            .setDiskDirs(getDiskDirs()).create(diskStoreName);
      }
      AttributesFactory af = new AttributesFactory();
      af.setDataPolicy(DataPolicy.PERSISTENT_REPLICATE);
      af.setDiskStoreName(diskStoreName);
      cache.createRegion(regionName, af.create());
    }
  };
  return createDR;
}
/**
 * Starts a cache server on the given port in the target VM. An IOException
 * from start() (e.g. a port already in use) is printed rather than rethrown,
 * since some tests provoke the conflict deliberately.
 *
 * @param vm   the VM in which to start the server
 * @param port the port the server should listen on
 */
protected void addCacheServer(VM vm, final int port) {
  vm.invoke(new SerializableRunnable("add Cache Server") {
    public void run() {
      CacheServer server = getCache().addCacheServer();
      server.setPort(port);
      try {
        server.start();
      } catch (IOException e) {
        System.out.println("Received expected "+e.getMessage());
      }
    }
  });
}
/**
 * Synchronously creates a region in the given VM — a partitioned region when
 * {@code isPR} is true, otherwise a distributed (replicated) region.
 *
 * @param vm            the target VM
 * @param regionName    name of the region to create
 * @param diskStoreName disk store to use (PRs accept null for no persistence)
 * @param isPR          true for a partitioned region, false for a DR
 * @param redundancy    redundant copies (only meaningful for PRs)
 */
protected void createRegion(VM vm, final String regionName, final String diskStoreName, final boolean isPR, final int redundancy) {
  final SerializableRunnable creator = isPR
      ? getCreatePRRunnable(regionName, diskStoreName, redundancy)
      : getCreateDRRunnable(regionName, diskStoreName);
  vm.invoke(creator);
}
/**
 * Asynchronously creates a region in the given VM — a partitioned region
 * when {@code isPR} is true, otherwise a distributed (replicated) region.
 *
 * @return the in-flight invocation; callers wait on it with getResult
 */
protected AsyncInvocation createRegionAsync(VM vm, final String regionName, final String diskStoreName, final boolean isPR, final int redundancy) {
  final SerializableRunnable creator = isPR
      ? getCreatePRRunnable(regionName, diskStoreName, redundancy)
      : getCreateDRRunnable(regionName, diskStoreName);
  return vm.invokeAsync(creator);
}
/**
 * Builds a runnable that creates a partitioned region with LRU
 * overflow-to-disk eviction and asynchronous disk writes. Persistence is
 * enabled only when diskStoreName is non-null. When redundancy > 0 the
 * runnable also installs a ResourceObserver and blocks until redundancy
 * recovery for the region has finished.
 * NOTE(review): the observer is never deregistered and the await has no
 * timeout — acceptable for dunit, but worth confirming.
 *
 * @param regionName    name of the region to create
 * @param diskStoreName disk store name, or null for a non-persistent PR
 * @param redundancy    number of redundant copies
 * @return the runnable to pass to {@code VM.invoke}/{@code invokeAsync}
 */
private SerializableRunnable getCreatePRRunnable(final String regionName, final String diskStoreName, final int redundancy) {
SerializableRunnable createPR = new SerializableRunnable("create pr") {
public void run() {
final CountDownLatch recoveryDone;
if(redundancy > 0) {
recoveryDone = new CountDownLatch(1);
// latch is released when redundancy recovery completes for any region
ResourceObserver observer = new InternalResourceManager.ResourceObserverAdapter() {
@Override
public void recoveryFinished(Region region) {
recoveryDone.countDown();
}
};
InternalResourceManager.setResourceObserver(observer );
} else {
recoveryDone = null;
}
Cache cache = getCache();
if (diskStoreName!=null) {
DiskStore ds = cache.findDiskStore(diskStoreName);
if(ds == null) {
ds = cache.createDiskStoreFactory()
.setDiskDirs(getDiskDirs()).create(diskStoreName);
}
}
AttributesFactory af = new AttributesFactory();
af.setDiskSynchronous(false); // use async to trigger flush
af.setEvictionAttributes(EvictionAttributes.createLRUEntryAttributes(100, EvictionAction.OVERFLOW_TO_DISK));
PartitionAttributesFactory paf = new PartitionAttributesFactory();
paf.setRedundantCopies(redundancy);
af.setPartitionAttributes(paf.create());
if (diskStoreName != null) {
af.setDataPolicy(DataPolicy.PERSISTENT_PARTITION);
af.setDiskStoreName(diskStoreName);
} else {
af.setDataPolicy(DataPolicy.PARTITION);
}
cache.createRegion(regionName, af.create());
// block until redundancy recovery completes, so createRegion callers
// observe a fully-recovered region
if(recoveryDone != null) {
try {
recoveryDone.await();
} catch (InterruptedException e) {
fail("Interrupted", e);
}
}
}
};
return createPR;
}
/**
 * Puts {@code value} under every integer key in [startKey, endKey) into the
 * named region, executing inside the given VM.
 *
 * @param vm         the VM in which to perform the puts
 * @param startKey   first key, inclusive
 * @param endKey     last key, exclusive
 * @param value      value stored under each key
 * @param regionName region to write into
 */
protected void createData(VM vm, final int startKey, final int endKey,
    final String value, final String regionName) {
  vm.invoke(new SerializableRunnable() {
    public void run() {
      Region region = getCache().getRegion(regionName);
      int key = startKey;
      while (key < endKey) {
        region.put(key, value);
        key++;
      }
    }
  });
}
/**
 * Returns the sorted set of bucket ids hosted locally by the named region in
 * the given VM, or null if the region is not a partitioned region.
 *
 * @param vm         the VM to query
 * @param regionName region whose local buckets are listed
 * @return sorted local bucket ids, or null for a non-partitioned region
 */
protected Set<Integer> getBucketList(VM vm, final String regionName) {
  SerializableCallable fetchBuckets = new SerializableCallable("get buckets") {
    public Object call() throws Exception {
      Region region = getCache().getRegion(regionName);
      if (!(region instanceof PartitionedRegion)) {
        return null;
      }
      PartitionedRegion pr = (PartitionedRegion) region;
      // TreeSet gives a stable, sorted view for equality assertions
      return new TreeSet<Integer>(pr.getDataStore().getAllLocalBucketIds());
    }
  };
  return (Set<Integer>) vm.invoke(fetchBuckets);
}
/**
 * Asserts that every integer key in [startKey, endKey) maps to {@code value}
 * in the named region, reading from inside the given VM.
 *
 * @param vm         the VM in which to perform the reads
 * @param startKey   first key, inclusive
 * @param endKey     last key, exclusive
 * @param value      expected value for each key
 * @param regionName region to read from
 */
protected void checkData(VM vm, final int startKey, final int endKey,
    final String value, final String regionName) {
  vm.invoke(new SerializableRunnable() {
    public void run() {
      Region region = getCache().getRegion(regionName);
      int key = startKey;
      while (key < endKey) {
        assertEquals(value, region.get(key));
        key++;
      }
    }
  });
}
/**
 * Asserts how the given bucket of a partitioned region was initialized in
 * the target VM: locally from its own disk files, or remotely (GII from a
 * peer), using the bucket's disk-region initialization statistics.
 *
 * @param vm               the VM to check
 * @param regionName       partitioned region name
 * @param bucketId         bucket to inspect
 * @param recoveredLocally true if the bucket must have come from local disk
 */
protected void checkPRRecoveredFromDisk(VM vm, final String regionName, final int bucketId, final boolean recoveredLocally) {
  vm.invoke(new SerializableRunnable("check PR recovered from disk") {
    public void run() {
      Cache cache = getCache();
      PartitionedRegion region = (PartitionedRegion) cache.getRegion(regionName);
      DiskRegion disk = region.getRegionAdvisor().getBucket(bucketId).getDiskRegion();
      // exactly one kind of initialization happened; the flag picks which
      int expectedRemote = recoveredLocally ? 0 : 1;
      int expectedLocal = recoveredLocally ? 1 : 0;
      assertEquals(expectedRemote, disk.getStats().getRemoteInitializations());
      assertEquals(expectedLocal, disk.getStats().getLocalInitializations());
    }
  });
}
/**
 * Closes the named region inside the given VM (close, not destroy — the
 * persistent data on disk is retained).
 *
 * @param vm         the VM in which to close the region
 * @param regionName region to close
 */
protected void closeRegion(VM vm, final String regionName) {
  vm.invoke(new SerializableRunnable() {
    public void run() {
      getCache().getRegion(regionName).close();
    }
  });
}
/**
 * From the given VM, connects an admin distributed system, invokes
 * shutDownAllMembers with the given timeout, and asserts that exactly
 * {@code expnum} members were reported shut down. The admin connection is
 * always disconnected. Unlike the two-argument overload, this variant does
 * not reset the "TestInternalGemFireError" property.
 *
 * @param vm      the VM from which to drive the shutdown
 * @param expnum  expected number of members reported shut down
 * @param timeout maximum time (ms) to wait for the shutdown to complete
 */
private void shutDownAllMembers(VM vm, final int expnum, final long timeout) {
  vm.invoke(new SerializableRunnable("Shutdown all the members") {
    public void run() {
      DistributedSystemConfig config;
      AdminDistributedSystemImpl adminDS = null;
      try {
        // build an admin view over this VM's connected distributed system
        config = AdminDistributedSystemFactory.defineDistributedSystem(getSystem(), "");
        adminDS = (AdminDistributedSystemImpl) AdminDistributedSystemFactory.getDistributedSystem(config);
        adminDS.connect();
        // the returned set holds the members that were shut down; avoid the
        // raw type of the original code
        Set<?> members = adminDS.shutDownAllMembers(timeout);
        int num = (members == null) ? 0 : members.size();
        assertEquals(expnum, num);
      } catch (AdminException e) {
        throw new RuntimeException(e);
      } finally {
        if (adminDS != null) {
          adminDS.disconnect();
        }
      }
    }
  });
}
/**
 * Cache listener that blocks every afterUpdate callback on a one-shot latch,
 * simulating a member stuck mid-operation; tests release the hang by
 * counting the latch down (see testShutdownAllTimeout).
 */
private static class HangingCacheListener extends CacheListenerAdapter {
  final CountDownLatch latch = new CountDownLatch(1);

  @Override
  public void afterUpdate(EntryEvent evt) {
    try {
      latch.await();
    } catch (InterruptedException ie) {
      // preserve the interrupt status instead of swallowing it
      Thread.currentThread().interrupt();
    }
  }
}
}