blob: 24272da80365bad9f3798fb729f26014f3c8d20f [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.geode.internal.cache.partitioned;
import static org.apache.geode.internal.lang.ThrowableUtils.getRootCause;
import static org.apache.geode.test.awaitility.GeodeAwaitility.await;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.io.IOException;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.apache.geode.InternalGemFireError;
import org.apache.geode.admin.AdminDistributedSystemFactory;
import org.apache.geode.admin.AdminException;
import org.apache.geode.admin.DistributedSystemConfig;
import org.apache.geode.admin.internal.AdminDistributedSystemImpl;
import org.apache.geode.cache.AttributesFactory;
import org.apache.geode.cache.Cache;
import org.apache.geode.cache.CacheClosedException;
import org.apache.geode.cache.DataPolicy;
import org.apache.geode.cache.DiskStore;
import org.apache.geode.cache.EntryEvent;
import org.apache.geode.cache.EvictionAction;
import org.apache.geode.cache.EvictionAttributes;
import org.apache.geode.cache.PartitionAttributesFactory;
import org.apache.geode.cache.Region;
import org.apache.geode.cache.server.CacheServer;
import org.apache.geode.cache.util.CacheListenerAdapter;
import org.apache.geode.distributed.internal.InternalDistributedSystem;
import org.apache.geode.internal.cache.CacheLifecycleListener;
import org.apache.geode.internal.cache.DiskRegion;
import org.apache.geode.internal.cache.GemFireCacheImpl;
import org.apache.geode.internal.cache.InternalCache;
import org.apache.geode.internal.cache.PartitionedRegion;
import org.apache.geode.internal.cache.control.InternalResourceManager;
import org.apache.geode.internal.cache.control.InternalResourceManager.ResourceObserver;
import org.apache.geode.test.dunit.Assert;
import org.apache.geode.test.dunit.AsyncInvocation;
import org.apache.geode.test.dunit.Host;
import org.apache.geode.test.dunit.Invoke;
import org.apache.geode.test.dunit.SerializableCallable;
import org.apache.geode.test.dunit.SerializableRunnable;
import org.apache.geode.test.dunit.VM;
import org.apache.geode.test.dunit.Wait;
import org.apache.geode.test.dunit.cache.internal.JUnit4CacheTestCase;
import org.apache.geode.test.junit.categories.RegionsTest;
/**
* Tests the basic use cases for PR persistence.
*/
@Category({RegionsTest.class})
public class ShutdownAllDUnitTest extends JUnit4CacheTestCase {
private static HangingCacheListener listener;
private static final String expectedExceptions = InternalGemFireError.class.getName()
+ "||ShutdownAllRequest: disconnect distributed without response";
private static final int MAX_WAIT = 600 * 1000;
@Override
public final void postSetUp() throws Exception {
// Get rid of any existing distributed systems. We want
// to make assertions about the number of distributed systems
// we shut down, so we need to start with a clean slate.
disconnectAllFromDS();
}
@Test
public void testShutdownAll2Servers() throws Throwable {
Host host = Host.getHost(0);
VM vm0 = host.getVM(0);
VM vm1 = host.getVM(1);
VM vm2 = host.getVM(2);
int numBuckets = 50;
createRegion(vm0, "region", "disk", true, 1);
createRegion(vm1, "region", "disk", true, 1);
createData(vm0, 0, numBuckets, "a", "region");
Set<Integer> vm0Buckets = getBucketList(vm0, "region");
Set<Integer> vm1Buckets = getBucketList(vm1, "region");
assertEquals(vm0Buckets, vm1Buckets);
shutDownAllMembers(vm2, 2);
assertTrue(InternalDistributedSystem.getExistingSystems().isEmpty());
// restart vm0
AsyncInvocation a0 = createRegionAsync(vm0, "region", "disk", true, 1);
// restart vm1
AsyncInvocation a1 = createRegionAsync(vm1, "region", "disk", true, 1);
a0.getResult(MAX_WAIT);
a1.getResult(MAX_WAIT);
assertEquals(vm0Buckets, getBucketList(vm0, "region"));
// checkRecoveredFromDisk(vm0, 0, true);
// checkRecoveredFromDisk(vm1, 0, false);
checkData(vm0, 0, numBuckets, "a", "region");
checkData(vm1, 0, numBuckets, "a", "region");
createData(vm0, numBuckets, 113, "b", "region");
checkData(vm0, numBuckets, 113, "b", "region");
}
@Test
public void testShutdownAllWithEncounterIGE1() throws Throwable {
Host host = Host.getHost(0);
VM vm0 = host.getVM(0);
int numBuckets = 50;
createRegion(vm0, "region", "disk", true, 1);
createData(vm0, 0, numBuckets, "a", "region");
vm0.invoke(addExceptionTag1(expectedExceptions));
Invoke.invokeInEveryVM(new SerializableRunnable("set TestInternalGemFireError") {
@Override
public void run() {
System.setProperty("TestInternalGemFireError", "true");
}
});
shutDownAllMembers(vm0, 1);
assertTrue(InternalDistributedSystem.getExistingSystems().isEmpty());
Invoke.invokeInEveryVM(new SerializableRunnable("reset TestInternalGemFireError") {
@Override
public void run() {
System.setProperty("TestInternalGemFireError", "false");
}
});
vm0.invoke(removeExceptionTag1(expectedExceptions));
}
@Test
public void testShutdownAllWithEncounterIGE2() throws Throwable {
Host host = Host.getHost(0);
VM vm0 = host.getVM(0);
VM vm1 = host.getVM(1);
VM vm2 = host.getVM(2);
int numBuckets = 50;
createRegion(vm0, "region", "disk", true, 1);
createRegion(vm1, "region", "disk", true, 1);
createData(vm0, 0, numBuckets, "a", "region");
Set<Integer> vm0Buckets = getBucketList(vm0, "region");
Set<Integer> vm1Buckets = getBucketList(vm1, "region");
assertEquals(vm0Buckets, vm1Buckets);
vm0.invoke(addExceptionTag1(expectedExceptions));
vm1.invoke(addExceptionTag1(expectedExceptions));
Invoke.invokeInEveryVM(new SerializableRunnable("set TestInternalGemFireError") {
@Override
public void run() {
System.setProperty("TestInternalGemFireError", "true");
}
});
shutDownAllMembers(vm2, 0);
assertTrue(InternalDistributedSystem.getExistingSystems().isEmpty());
Invoke.invokeInEveryVM(new SerializableRunnable("reset TestInternalGemFireError") {
@Override
public void run() {
System.setProperty("TestInternalGemFireError", "false");
}
});
vm0.invoke(removeExceptionTag1(expectedExceptions));
vm1.invoke(removeExceptionTag1(expectedExceptions));
}
private static final AtomicBoolean calledCreateCache = new AtomicBoolean();
private static final AtomicBoolean calledCloseCache = new AtomicBoolean();
private static CacheLifecycleListener cll;
@Test
public void testShutdownAllInterruptsCacheCreation()
throws ExecutionException, InterruptedException, TimeoutException {
Host host = Host.getHost(0);
VM vm0 = host.getVM(0);
VM vm2 = host.getVM(2);
closeAllCache();
// in vm0 create the cache in a way that hangs until
// it sees that a shutDownAll is in progress
AsyncInvocation<?> asyncCreate = vm0.invokeAsync(() -> {
cll = new CacheLifecycleListener() {
@Override
public void cacheCreated(InternalCache cache) {
calledCreateCache.set(true);
await().until(() -> cache.isCacheAtShutdownAll());
}
@Override
public void cacheClosed(InternalCache cache) {
calledCloseCache.set(true);
}
};
GemFireCacheImpl.addCacheLifecycleListener(cll);
getCache();
});
try {
boolean vm0CalledCreateCache = vm0.invoke(() -> {
await().until(() -> calledCreateCache.get());
return calledCreateCache.get();
});
assertTrue(vm0CalledCreateCache);
shutDownAllMembers(vm2, 1);
asyncCreate.get(60, TimeUnit.SECONDS);
boolean vm0CalledCloseCache = vm0.invoke(() -> {
await().until(() -> calledCloseCache.get());
return calledCloseCache.get();
});
assertTrue(vm0CalledCloseCache);
} finally {
vm0.invoke(() -> {
calledCreateCache.set(false);
calledCloseCache.set(false);
GemFireCacheImpl.removeCacheLifecycleListener(cll);
});
}
}
@Test
public void testShutdownAllOneServerAndRecover() throws Throwable {
Host host = Host.getHost(0);
VM vm0 = host.getVM(0);
VM vm2 = host.getVM(2);
createRegion(vm0, "region", "disk", true, 0);
createData(vm0, 0, 1, "a", "region");
shutDownAllMembers(vm2, 1);
assertTrue(InternalDistributedSystem.getExistingSystems().isEmpty());
// restart vm0
createRegion(vm0, "region", "disk", true, 0);
checkPRRecoveredFromDisk(vm0, "region", 0, true);
createData(vm0, 1, 10, "b", "region");
}
@Test
public void testPRWithDR() throws Throwable {
Host host = Host.getHost(0);
VM vm0 = host.getVM(0);
VM vm2 = host.getVM(2);
createRegion(vm0, "region_pr", "disk", true, 0);
createRegion(vm0, "region_dr", "disk", false, 0);
createData(vm0, 0, 1, "a", "region_pr");
createData(vm0, 0, 1, "c", "region_dr");
shutDownAllMembers(vm2, 1);
assertTrue(InternalDistributedSystem.getExistingSystems().isEmpty());
// restart vm0
createRegion(vm0, "region_pr", "disk", true, 0);
createRegion(vm0, "region_dr", "disk", false, 0);
checkPRRecoveredFromDisk(vm0, "region_pr", 0, true);
checkData(vm0, 0, 1, "a", "region_pr");
checkData(vm0, 0, 1, "c", "region_dr");
}
@Test
public void testShutdownAllFromServer() throws Throwable {
Host host = Host.getHost(0);
VM vm0 = host.getVM(0);
VM vm1 = host.getVM(1);
VM vm2 = host.getVM(2);
int numBuckets = 50;
createRegion(vm0, "region", "disk", true, 1);
createRegion(vm1, "region", "disk", true, 1);
createRegion(vm2, "region", "disk", true, 1);
createData(vm0, 0, numBuckets, "a", "region");
shutDownAllMembers(vm2, 3);
assertTrue(InternalDistributedSystem.getExistingSystems().isEmpty());
// restart vm0, vm1, vm2
AsyncInvocation a0 = createRegionAsync(vm0, "region", "disk", true, 1);
AsyncInvocation a1 = createRegionAsync(vm1, "region", "disk", true, 1);
AsyncInvocation a2 = createRegionAsync(vm2, "region", "disk", true, 1);
a0.getResult(MAX_WAIT);
a1.getResult(MAX_WAIT);
a2.getResult(MAX_WAIT);
createData(vm0, 0, numBuckets, "a", "region");
createData(vm1, 0, numBuckets, "a", "region");
createData(vm2, 0, numBuckets, "a", "region");
createData(vm0, numBuckets, 113, "b", "region");
checkData(vm0, numBuckets, 113, "b", "region");
}
// shutdownAll, then restart to verify
@Test
public void testCleanStop() throws Throwable {
Host host = Host.getHost(0);
VM vm0 = host.getVM(0);
VM vm1 = host.getVM(1);
VM vm2 = host.getVM(2);
createRegion(vm0, "region", "disk", true, 1);
createRegion(vm1, "region", "disk", true, 1);
createData(vm0, 0, 1, "a", "region");
shutDownAllMembers(vm2, 2);
AsyncInvocation a0 = createRegionAsync(vm0, "region", "disk", true, 1);
// [dsmith] Make sure that vm0 is waiting for vm1 to recover
// If VM(0) recovers early, that is a problem, because we
// are no longer doing a clean recovery.
Thread.sleep(500);
assertTrue(a0.isAlive());
AsyncInvocation a1 = createRegionAsync(vm1, "region", "disk", true, 1);
a0.getResult(MAX_WAIT);
a1.getResult(MAX_WAIT);
checkData(vm0, 0, 1, "a", "region");
checkData(vm1, 0, 1, "a", "region");
checkPRRecoveredFromDisk(vm0, "region", 0, true);
checkPRRecoveredFromDisk(vm1, "region", 0, true);
closeRegion(vm0, "region");
closeRegion(vm1, "region");
a0 = createRegionAsync(vm0, "region", "disk", true, 1);
a1 = createRegionAsync(vm1, "region", "disk", true, 1);
a0.getResult(MAX_WAIT);
a1.getResult(MAX_WAIT);
checkData(vm0, 0, 1, "a", "region");
checkData(vm1, 0, 1, "a", "region");
checkPRRecoveredFromDisk(vm0, "region", 0, false);
checkPRRecoveredFromDisk(vm1, "region", 0, true);
}
// shutdownAll, then restart to verify
@Test
public void testCleanStopWithConflictCachePort() throws Throwable {
Host host = Host.getHost(0);
VM vm0 = host.getVM(0);
VM vm1 = host.getVM(1);
VM vm2 = host.getVM(2);
createRegion(vm0, "region", "disk", true, 1);
createRegion(vm1, "region", "disk", true, 1);
// 2 vms use the same port no to conflict
addCacheServer(vm0, 34505);
addCacheServer(vm1, 34505);
createData(vm0, 0, 1, "a", "region");
shutDownAllMembers(vm2, 2);
AsyncInvocation a0 = createRegionAsync(vm0, "region", "disk", true, 1);
// [dsmith] Make sure that vm0 is waiting for vm1 to recover
// If VM(0) recovers early, that is a problem, because we
// are no longer doing a clean recovery.
Thread.sleep(500);
assertTrue(a0.isAlive());
AsyncInvocation a1 = createRegionAsync(vm1, "region", "disk", true, 1);
a0.getResult(MAX_WAIT);
a1.getResult(MAX_WAIT);
addCacheServer(vm0, 34505);
addCacheServer(vm1, 34506);
checkData(vm0, 0, 1, "a", "region");
checkData(vm1, 0, 1, "a", "region");
checkPRRecoveredFromDisk(vm0, "region", 0, true);
checkPRRecoveredFromDisk(vm1, "region", 0, true);
}
@Test
public void testMultiPRDR() throws Throwable {
Host host = Host.getHost(0);
VM vm0 = host.getVM(0);
VM vm2 = host.getVM(2);
createRegion(vm0, "region_pr1", "disk1", true, 0);
createRegion(vm0, "region_pr2", "disk1", true, 0);
createRegion(vm0, "region_pr3", "disk1", true, 0);
createRegion(vm0, "region_dr1", "disk2", false, 0);
createRegion(vm0, "region_dr2", "disk2", false, 0);
createData(vm0, 0, 1, "a", "region_pr1");
createData(vm0, 0, 1, "b", "region_pr2");
createData(vm0, 0, 1, "c", "region_pr3");
createData(vm0, 0, 1, "d", "region_dr1");
createData(vm0, 0, 1, "e", "region_dr2");
shutDownAllMembers(vm2, 1);
assertTrue(InternalDistributedSystem.getExistingSystems().isEmpty());
// restart vm0
createRegion(vm0, "region_pr1", "disk1", true, 0);
createRegion(vm0, "region_pr2", "disk1", true, 0);
createRegion(vm0, "region_pr3", "disk1", true, 0);
createRegion(vm0, "region_dr1", "disk2", false, 0);
createRegion(vm0, "region_dr2", "disk2", false, 0);
// checkPRRecoveredFromDisk(vm0, "region_pr1", 0, true);
// checkPRRecoveredFromDisk(vm0, "region_pr2", 0, true);
// checkPRRecoveredFromDisk(vm0, "region_pr3", 0, true);
checkData(vm0, 0, 1, "a", "region_pr1");
checkData(vm0, 0, 1, "b", "region_pr2");
checkData(vm0, 0, 1, "c", "region_pr3");
checkData(vm0, 0, 1, "d", "region_dr1");
checkData(vm0, 0, 1, "e", "region_dr2");
}
@Test
public void testShutdownAllTimeout() throws Throwable {
Host host = Host.getHost(0);
VM vm0 = host.getVM(0);
VM vm1 = host.getVM(1);
VM vm2 = host.getVM(2);
final int numBuckets = 50;
createRegion(vm0, "region", "disk", true, 1);
createRegion(vm1, "region", "disk", true, 1);
createData(vm0, 0, numBuckets, "a", "region");
Set<Integer> vm0Buckets = getBucketList(vm0, "region");
Set<Integer> vm1Buckets = getBucketList(vm1, "region");
assertEquals(vm0Buckets, vm1Buckets);
// Add a cache listener that will cause the system to hang up.
// Then do some puts to get us stuck in a put.
AsyncInvocation async1 = vm0.invokeAsync(new SerializableRunnable() {
@Override
public void run() {
Region<Object, Object> region = getCache().getRegion("region");
listener = new HangingCacheListener();
region.getAttributesMutator().addCacheListener(listener);
// get us stuck doing a put.
for (int i = 0; i < numBuckets; i++) {
region.put(i, "a");
}
}
});
// Make sure the we do get stuck
async1.join(1000);
assertTrue(async1.isAlive());
// Do a shutdownall with a timeout.
// This will hit the timeout, because the in progress put will
// prevent us from gracefully shutting down.
long start = System.nanoTime();
shutDownAllMembers(vm2, 0, 2000);
long end = System.nanoTime();
// Make sure we waited for the timeout.
assertTrue(end - start > TimeUnit.MILLISECONDS.toNanos(1500));
// clean up our stuck thread
vm0.invoke(new SerializableRunnable() {
@Override
public void run() {
listener.latch.countDown();
listener = null;
}
});
// wait for shutdown to finish
Wait.pause(10000);
// restart vm0
AsyncInvocation a0 = createRegionAsync(vm0, "region", "disk", true, 1);
// restart vm1
AsyncInvocation a1 = createRegionAsync(vm1, "region", "disk", true, 1);
a0.getResult(MAX_WAIT);
a1.getResult(MAX_WAIT);
assertEquals(vm0Buckets, getBucketList(vm0, "region"));
checkData(vm0, 0, numBuckets, "a", "region");
checkData(vm1, 0, numBuckets, "a", "region");
createData(vm0, numBuckets, 113, "b", "region");
checkData(vm0, numBuckets, 113, "b", "region");
}
/**
* Test for 43551. Do a shutdown all with some members waiting on recovery.
*/
@Test
public void testShutdownAllWithMembersWaiting() throws Throwable {
Host host = Host.getHost(0);
VM vm0 = host.getVM(0);
VM vm1 = host.getVM(1);
VM vm2 = host.getVM(2);
final int numBuckets = 5;
createRegion(vm0, "region", "disk", true, 1);
createRegion(vm1, "region", "disk", true, 1);
createData(vm0, 0, numBuckets, "a", "region");
Set<Integer> vm0Buckets = getBucketList(vm0, "region");
Set<Integer> vm1Buckets = getBucketList(vm1, "region");
// shutdown all the members
shutDownAllMembers(vm2, 2);
// restart one of the members (this will hang, waiting for the other members)
// restart vm0
AsyncInvocation a0 = createRegionAsync(vm0, "region", "disk", true, 1);
// Wait a bit for the initialization to get stuck
Wait.pause(20000);
assertTrue(a0.isAlive());
// Do another shutdown all, with a member offline and another stuck
shutDownAllMembers(vm2, 1);
// This should complete (but maybe it will throw an exception?)
try {
a0.getResult(MAX_WAIT);
fail("should have received a cache closed exception");
} catch (AssertionError e) {
if (!CacheClosedException.class.isInstance(getRootCause(e))) {
throw e;
}
}
// now restart both members. This should work, but
// no guarantee they'll do a clean recovery
a0 = createRegionAsync(vm0, "region", "disk", true, 1);
AsyncInvocation a1 = createRegionAsync(vm1, "region", "disk", true, 1);
a0.getResult(MAX_WAIT);
a1.getResult(MAX_WAIT);
assertEquals(vm0Buckets, getBucketList(vm0, "region"));
checkData(vm0, 0, numBuckets, "a", "region");
checkData(vm1, 0, numBuckets, "a", "region");
createData(vm0, numBuckets, numBuckets * 2, "b", "region");
checkData(vm0, numBuckets, numBuckets * 2, "b", "region");
}
// TODO prpersist
// test move bucket
// test async put
// test create a new bucket by put async
private void shutDownAllMembers(VM vm, final int expnum) {
vm.invoke(new SerializableRunnable("Shutdown all the members") {
@Override
public void run() {
DistributedSystemConfig config;
AdminDistributedSystemImpl adminDS = null;
try {
config = AdminDistributedSystemFactory.defineDistributedSystem(getSystem(), "");
adminDS = (AdminDistributedSystemImpl) AdminDistributedSystemFactory
.getDistributedSystem(config);
adminDS.connect();
Set members = adminDS.shutDownAllMembers();
int num = members == null ? 0 : members.size();
assertEquals(expnum, num);
} catch (AdminException e) {
throw new RuntimeException(e);
} finally {
if (adminDS != null) {
adminDS.disconnect();
}
}
}
});
// clean up for this vm
System.setProperty("TestInternalGemFireError", "false");
}
private SerializableRunnable getCreateDRRunnable(final String regionName,
final String diskStoreName) {
return new SerializableRunnable("create DR") {
@Override
public void run() {
Cache cache = ShutdownAllDUnitTest.this.getCache();
DiskStore ds = cache.findDiskStore(diskStoreName);
if (ds == null) {
ds = cache.createDiskStoreFactory().setDiskDirs(getDiskDirs()).create(diskStoreName);
}
AttributesFactory af = new AttributesFactory();
af.setDataPolicy(DataPolicy.PERSISTENT_REPLICATE);
af.setDiskStoreName(diskStoreName);
cache.createRegion(regionName, af.create());
}
};
}
protected void addCacheServer(VM vm, final int port) {
vm.invoke(new SerializableRunnable("add Cache Server") {
@Override
public void run() {
Cache cache = getCache();
CacheServer cs = cache.addCacheServer();
cs.setPort(port);
try {
cs.start();
} catch (IOException e) {
System.out.println("Received expected " + e.getMessage());
}
}
});
}
protected void createRegion(VM vm, final String regionName, final String diskStoreName,
final boolean isPR, final int redundancy) {
if (isPR) {
SerializableRunnable createPR = getCreatePRRunnable(regionName, diskStoreName, redundancy);
vm.invoke(createPR);
} else {
SerializableRunnable createPR = getCreateDRRunnable(regionName, diskStoreName);
vm.invoke(createPR);
}
}
protected AsyncInvocation createRegionAsync(VM vm, final String regionName,
final String diskStoreName, final boolean isPR, final int redundancy) {
if (isPR) {
SerializableRunnable createPR = getCreatePRRunnable(regionName, diskStoreName, redundancy);
return vm.invokeAsync(createPR);
} else {
SerializableRunnable createDR = getCreateDRRunnable(regionName, diskStoreName);
return vm.invokeAsync(createDR);
}
}
private SerializableRunnable getCreatePRRunnable(final String regionName,
final String diskStoreName, final int redundancy) {
return new SerializableRunnable("create pr") {
@Override
public void run() {
final CountDownLatch recoveryDone;
if (redundancy > 0) {
recoveryDone = new CountDownLatch(1);
ResourceObserver observer = new InternalResourceManager.ResourceObserverAdapter() {
@Override
public void recoveryFinished(Region region) {
recoveryDone.countDown();
}
};
InternalResourceManager.setResourceObserver(observer);
} else {
recoveryDone = null;
}
Cache cache = ShutdownAllDUnitTest.this.getCache();
if (diskStoreName != null) {
DiskStore ds = cache.findDiskStore(diskStoreName);
if (ds == null) {
ds = cache.createDiskStoreFactory().setDiskDirs(getDiskDirs()).create(diskStoreName);
}
}
AttributesFactory af = new AttributesFactory();
af.setDiskSynchronous(false); // use async to trigger flush
af.setEvictionAttributes(
EvictionAttributes.createLRUEntryAttributes(100, EvictionAction.OVERFLOW_TO_DISK));
PartitionAttributesFactory paf = new PartitionAttributesFactory();
paf.setRedundantCopies(redundancy);
af.setPartitionAttributes(paf.create());
if (diskStoreName != null) {
af.setDataPolicy(DataPolicy.PERSISTENT_PARTITION);
af.setDiskStoreName(diskStoreName);
} else {
af.setDataPolicy(DataPolicy.PARTITION);
}
cache.createRegion(regionName, af.create());
if (recoveryDone != null) {
try {
recoveryDone.await();
} catch (InterruptedException e) {
Assert.fail("Interrupted", e);
}
}
}
};
}
protected void createData(VM vm, final int startKey, final int endKey, final String value,
final String regionName) {
SerializableRunnable createData = new SerializableRunnable() {
@Override
public void run() {
Cache cache = getCache();
Region region = cache.getRegion(regionName);
for (int i = startKey; i < endKey; i++) {
region.put(i, value);
}
}
};
vm.invoke(createData);
}
protected Set<Integer> getBucketList(VM vm, final String regionName) {
SerializableCallable getBuckets = new SerializableCallable("get buckets") {
@Override
public Object call() throws Exception {
Cache cache = getCache();
Region region = cache.getRegion(regionName);
if (region instanceof PartitionedRegion) {
PartitionedRegion pr = (PartitionedRegion) region;
return new TreeSet<Integer>(pr.getDataStore().getAllLocalBucketIds());
} else {
return null;
}
}
};
return (Set<Integer>) vm.invoke(getBuckets);
}
protected void checkData(VM vm, final int startKey, final int endKey, final String value,
final String regionName) {
SerializableRunnable checkData = new SerializableRunnable() {
@Override
public void run() {
Cache cache = getCache();
Region region = cache.getRegion(regionName);
for (int i = startKey; i < endKey; i++) {
assertEquals(value, region.get(i));
}
}
};
vm.invoke(checkData);
}
protected void checkPRRecoveredFromDisk(VM vm, final String regionName, final int bucketId,
final boolean recoveredLocally) {
vm.invoke(new SerializableRunnable("check PR recovered from disk") {
@Override
public void run() {
Cache cache = getCache();
PartitionedRegion region = (PartitionedRegion) cache.getRegion(regionName);
DiskRegion disk = region.getRegionAdvisor().getBucket(bucketId).getDiskRegion();
if (recoveredLocally) {
assertEquals(0, disk.getStats().getRemoteInitializations());
assertEquals(1, disk.getStats().getLocalInitializations());
} else {
assertEquals(1, disk.getStats().getRemoteInitializations());
assertEquals(0, disk.getStats().getLocalInitializations());
}
}
});
}
protected void closeRegion(VM vm, final String regionName) {
SerializableRunnable close = new SerializableRunnable() {
@Override
public void run() {
Cache cache = getCache();
Region region = cache.getRegion(regionName);
region.close();
}
};
vm.invoke(close);
}
private void shutDownAllMembers(VM vm, final int expnum, final long timeout) {
vm.invoke(new SerializableRunnable("Shutdown all the members") {
@Override
public void run() {
DistributedSystemConfig config;
AdminDistributedSystemImpl adminDS = null;
try {
config = AdminDistributedSystemFactory.defineDistributedSystem(getSystem(), "");
adminDS = (AdminDistributedSystemImpl) AdminDistributedSystemFactory
.getDistributedSystem(config);
adminDS.connect();
Set members = adminDS.shutDownAllMembers(timeout);
int num = members == null ? 0 : members.size();
assertEquals(expnum, num);
} catch (AdminException e) {
throw new RuntimeException(e);
} finally {
if (adminDS != null) {
adminDS.disconnect();
}
}
}
});
}
private static class HangingCacheListener extends CacheListenerAdapter {
CountDownLatch latch = new CountDownLatch(1);
@Override
public void afterUpdate(EntryEvent event) {
try {
latch.await();
} catch (InterruptedException ignore) {
Thread.currentThread().interrupt();
}
}
}
}