blob: 4dcc2cbc4d7dd5c5723d6c18597ee8db4e55cc77 [file] [log] [blame]
/*=========================================================================
* Copyright (c) 2011-2014 Pivotal Software, Inc. All Rights Reserved.
* This product is protected by U.S. and international copyright
* and intellectual property laws. Pivotal products are covered by
* more patents listed at http://www.pivotal.io/patents.
*=========================================================================
*/
package com.gemstone.gemfire.internal.cache.partitioned;
import java.io.IOException;
import java.util.Collections;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import com.gemstone.gemfire.admin.internal.AdminDistributedSystemImpl;
import com.gemstone.gemfire.cache.AttributesFactory;
import com.gemstone.gemfire.cache.Cache;
import com.gemstone.gemfire.cache.CacheClosedException;
import com.gemstone.gemfire.cache.DataPolicy;
import com.gemstone.gemfire.cache.DiskStore;
import com.gemstone.gemfire.cache.PartitionAttributesFactory;
import com.gemstone.gemfire.cache.PartitionedRegionStorageException;
import com.gemstone.gemfire.cache.Region;
import com.gemstone.gemfire.cache.control.RebalanceOperation;
import com.gemstone.gemfire.cache.control.RebalanceResults;
import com.gemstone.gemfire.cache.persistence.PartitionOfflineException;
import com.gemstone.gemfire.distributed.internal.DistributionManager;
import com.gemstone.gemfire.distributed.internal.DistributionMessage;
import com.gemstone.gemfire.distributed.internal.DistributionMessageObserver;
import com.gemstone.gemfire.distributed.internal.InternalDistributedSystem;
import com.gemstone.gemfire.internal.FileUtil;
import com.gemstone.gemfire.internal.cache.InitialImageOperation.RequestImageMessage;
import com.gemstone.gemfire.internal.cache.PartitionedRegion;
import com.gemstone.gemfire.internal.cache.control.InternalResourceManager;
import com.gemstone.gemfire.internal.cache.control.InternalResourceManager.ResourceObserver;
import dunit.AsyncInvocation;
import dunit.Host;
import dunit.SerializableCallable;
import dunit.SerializableRunnable;
import dunit.VM;
/**
* @author dsmith
*
*/
public class PersistentColocatedPartitionedRegionDUnitTest extends
PersistentPartitionedRegionTestBase {
private static final int NUM_BUCKETS = 15;
private static final int MAX_WAIT = 30 * 1000;
/**
 * Creates a named instance of this DUnit test.
 *
 * @param name the test method name, forwarded to the JUnit 3 style base class
 */
public PersistentColocatedPartitionedRegionDUnitTest(String name) {
  super(name);
}
@Override
public void tearDown2() throws Exception {
  // Remove this test's backup directory before the base class tears down
  // the distributed system, so stale backups can't leak into later tests.
  FileUtil.delete(getBackupDir());
  super.tearDown2();
}
/**
 * Verifies the colocation rules for persistence attributes:
 * a persistent PR may NOT be colocated with a non-persistent PR, but
 * persistent-with-persistent and non-persistent-with-persistent are allowed.
 */
public void testColocatedPRAttributes() {
  Host host = Host.getHost(0);
  VM vm0 = host.getVM(1);
  vm0.invoke(new SerializableRunnable("create") {
    public void run() {
      Cache cache = getCache();
      // Create (or reuse) the disk store that backs the persistent regions.
      DiskStore ds = cache.findDiskStore("disk");
      if(ds == null) {
        ds = cache.createDiskStoreFactory()
            .setDiskDirs(getDiskDirs()).create("disk");
      }
      // Create a persistent leader PR...
      AttributesFactory af = new AttributesFactory();
      PartitionAttributesFactory paf = new PartitionAttributesFactory();
      paf.setRedundantCopies(0);
      af.setPartitionAttributes(paf.create());
      af.setDataPolicy(DataPolicy.PERSISTENT_PARTITION);
      af.setDiskStoreName("disk");
      cache.createRegion("persistentLeader", af.create());
      // ...and a non-persistent leader PR.
      af.setDataPolicy(DataPolicy.PARTITION);
      af.setDiskStoreName(null);
      cache.createRegion("nonPersistentLeader", af.create());
      // Switch back to persistent attributes and point colocation at the
      // non-persistent leader.
      af.setDataPolicy(DataPolicy.PERSISTENT_PARTITION);
      af.setDiskStoreName("disk");
      paf.setColocatedWith("nonPersistentLeader");
      af.setPartitionAttributes(paf.create());
      // Try to colocate a persistent PR with the non persistent PR. This should fail.
      ExpectedException exp = addExpectedException("IllegalStateException");
      try {
        cache.createRegion("colocated", af.create());
        fail("should not have been able to create a persistent region colocated with a non persistent region");
      } catch(IllegalStateException expected) {
        // expected: persistent child cannot colocate with a non-persistent leader
      } finally {
        exp.remove();
      }
      // Try to colocate a persistent PR with another persistent PR. This should work.
      paf.setColocatedWith("persistentLeader");
      af.setPartitionAttributes(paf.create());
      cache.createRegion("colocated", af.create());
      // We should also be able to colocate a non persistent region with a persistent region.
      af.setDataPolicy(DataPolicy.PARTITION);
      af.setDiskStoreName(null);
      paf.setColocatedWith("persistentLeader");
      af.setPartitionAttributes(paf.create());
      cache.createRegion("colocated2", af.create());
    }
  });
}
/**
 * Testing that we can colocate persistent PRs: creates a persistent parent,
 * a persistent child, and a non-persistent grandchild on three members,
 * restarts all members, and checks that bucket placement and persistent data
 * are recovered (and that the non-persistent region's buckets are recreated
 * in the same places when repopulated).
 */
public void testColocatedPRs() throws Throwable {
  Host host = Host.getHost(0);
  VM vm0 = host.getVM(0);
  VM vm1 = host.getVM(1);
  VM vm2 = host.getVM(2);
  SerializableRunnable createPRs = new SerializableRunnable("region1") {
    public void run() {
      Cache cache = getCache();
      DiskStore ds = cache.findDiskStore("disk");
      if(ds == null) {
        ds = cache.createDiskStoreFactory()
            .setDiskDirs(getDiskDirs()).create("disk");
      }
      AttributesFactory af = new AttributesFactory();
      PartitionAttributesFactory paf = new PartitionAttributesFactory();
      paf.setRedundantCopies(0);
      af.setPartitionAttributes(paf.create());
      af.setDataPolicy(DataPolicy.PERSISTENT_PARTITION);
      af.setDiskStoreName("disk");
      // Persistent parent.
      cache.createRegion(PR_REGION_NAME, af.create());
      // Persistent child colocated with the parent.
      paf.setColocatedWith(PR_REGION_NAME);
      af.setPartitionAttributes(paf.create());
      cache.createRegion("region2", af.create());
      // Non-persistent grandchild colocated with the child.
      paf.setColocatedWith("region2");
      af.setPartitionAttributes(paf.create());
      af.setDataPolicy(DataPolicy.PARTITION);
      af.setDiskStoreName(null);
      cache.createRegion("region3", af.create());
    }
  };
  vm0.invoke(createPRs);
  vm1.invoke(createPRs);
  vm2.invoke(createPRs);
  createData(vm0, 0, NUM_BUCKETS, "a");
  createData(vm0, 0, NUM_BUCKETS, "b", "region2");
  createData(vm0, 0, NUM_BUCKETS, "c", "region3");
  // Colocation means each member must host the same buckets for all three regions.
  Set<Integer> vm0Buckets = getBucketList(vm0, PR_REGION_NAME);
  assertEquals(vm0Buckets, getBucketList(vm0, "region2"));
  assertEquals(vm0Buckets, getBucketList(vm0, "region3"));
  Set<Integer> vm1Buckets = getBucketList(vm1, PR_REGION_NAME);
  assertEquals(vm1Buckets, getBucketList(vm1, "region2"));
  assertEquals(vm1Buckets, getBucketList(vm1, "region3"));
  Set<Integer> vm2Buckets = getBucketList(vm2, PR_REGION_NAME);
  assertEquals(vm2Buckets, getBucketList(vm2, "region2"));
  assertEquals(vm2Buckets, getBucketList(vm2, "region3"));
  // Restart all three members.
  closeCache(vm0);
  closeCache(vm1);
  closeCache(vm2);
  AsyncInvocation async0 = vm0.invokeAsync(createPRs);
  AsyncInvocation async1 = vm1.invokeAsync(createPRs);
  AsyncInvocation async2 = vm2.invokeAsync(createPRs);
  async0.getResult(MAX_WAIT);
  async1.getResult(MAX_WAIT);
  async2.getResult(MAX_WAIT);
  //The secondary buckets can be recovered asynchronously,
  //so wait for them to come back.
  waitForBuckets(vm0, vm0Buckets, PR_REGION_NAME);
  waitForBuckets(vm0, vm0Buckets, "region2");
  waitForBuckets(vm1, vm1Buckets, PR_REGION_NAME);
  waitForBuckets(vm1, vm1Buckets, "region2");
  checkData(vm0, 0, NUM_BUCKETS, "a");
  checkData(vm0, 0, NUM_BUCKETS, "b", "region2");
  // Region 3 didn't have persistent data, so nothing should be recovered.
  checkData(vm0, 0, NUM_BUCKETS, null, "region3");
  //Make sure can do a put in all of the buckets in region 3
  createData(vm0, 0, NUM_BUCKETS, "c", "region3");
  //Now all of those buckets should exist.
  checkData(vm0, 0, NUM_BUCKETS, "c", "region3");
  //The region 3 buckets should be restored in the appropriate places.
  assertEquals(vm0Buckets,getBucketList(vm0, "region3"));
  assertEquals(vm1Buckets, getBucketList(vm1, "region3"));
  assertEquals(vm2Buckets, getBucketList(vm2, "region3"));
}
/**
 * Testing what happens when we recreate colocated persistent PRs by creating
 * one PR everywhere and then the other PR everywhere (rather than creating
 * both regions member-by-member). Bucket placement must still be recovered
 * consistently across the colocated regions.
 */
public void testColocatedPRsRecoveryOnePRAtATime() throws Throwable {
  Host host = Host.getHost(0);
  VM vm0 = host.getVM(0);
  VM vm1 = host.getVM(1);
  VM vm2 = host.getVM(2);
  SerializableRunnable createParentPR = new SerializableRunnable("createParentPR") {
    public void run() {
      Cache cache = getCache();
      DiskStore ds = cache.findDiskStore("disk");
      if (ds == null) {
        ds = cache.createDiskStoreFactory().setDiskDirs(getDiskDirs())
            .create("disk");
      }
      AttributesFactory af = new AttributesFactory();
      PartitionAttributesFactory paf = new PartitionAttributesFactory();
      paf.setRedundantCopies(1);
      af.setPartitionAttributes(paf.create());
      af.setDataPolicy(DataPolicy.PERSISTENT_PARTITION);
      af.setDiskStoreName("disk");
      cache.createRegion(PR_REGION_NAME, af.create());
    }
  };
  SerializableRunnable createChildPR = getCreateChildPRRunnable();
  // Parent PR on every member first, then the colocated child on every member.
  vm0.invoke(createParentPR);
  vm1.invoke(createParentPR);
  vm2.invoke(createParentPR);
  vm0.invoke(createChildPR);
  vm1.invoke(createChildPR);
  vm2.invoke(createChildPR);
  createData(vm0, 0, NUM_BUCKETS, "a");
  createData(vm0, 0, NUM_BUCKETS, "b", "region2");
  // Colocation: each member must host identical bucket sets for both regions,
  // and identical primary sets.
  Set<Integer> vm0Buckets = getBucketList(vm0, PR_REGION_NAME);
  assertEquals(vm0Buckets, getBucketList(vm0, "region2"));
  Set<Integer> vm1Buckets = getBucketList(vm1, PR_REGION_NAME);
  assertEquals(vm1Buckets, getBucketList(vm1, "region2"));
  Set<Integer> vm2Buckets = getBucketList(vm2, PR_REGION_NAME);
  assertEquals(vm2Buckets, getBucketList(vm2, "region2"));
  Set<Integer> vm0PrimaryBuckets = getPrimaryBucketList(vm0, PR_REGION_NAME);
  assertEquals(vm0PrimaryBuckets, getPrimaryBucketList(vm0, "region2"));
  Set<Integer> vm1PrimaryBuckets = getPrimaryBucketList(vm1, PR_REGION_NAME);
  assertEquals(vm1PrimaryBuckets, getPrimaryBucketList(vm1, "region2"));
  Set<Integer> vm2PrimaryBuckets = getPrimaryBucketList(vm2, PR_REGION_NAME);
  assertEquals(vm2PrimaryBuckets, getPrimaryBucketList(vm2, "region2"));
  // Restart: parent everywhere, then child everywhere.
  closeCache(vm0);
  closeCache(vm1);
  closeCache(vm2);
  AsyncInvocation async0 = vm0.invokeAsync(createParentPR);
  AsyncInvocation async1 = vm1.invokeAsync(createParentPR);
  AsyncInvocation async2 = vm2.invokeAsync(createParentPR);
  async0.getResult(MAX_WAIT);
  async1.getResult(MAX_WAIT);
  async2.getResult(MAX_WAIT);
  vm0.invoke(createChildPR);
  vm1.invoke(createChildPR);
  vm2.invoke(createChildPR);
  // Allow asynchronous bucket recovery to settle before checking placement.
  pause(4000);
  assertEquals(vm0Buckets, getBucketList(vm0, PR_REGION_NAME));
  assertEquals(vm0Buckets, getBucketList(vm0, "region2"));
  assertEquals(vm1Buckets, getBucketList(vm1, PR_REGION_NAME));
  assertEquals(vm1Buckets, getBucketList(vm1, "region2"));
  assertEquals(vm2Buckets, getBucketList(vm2, PR_REGION_NAME));
  assertEquals(vm2Buckets, getBucketList(vm2, "region2"));
  // Primaries may land on different members after restart, but parent and
  // child primaries must still match each other.
  vm0PrimaryBuckets = getPrimaryBucketList(vm0, PR_REGION_NAME);
  assertEquals(vm0PrimaryBuckets, getPrimaryBucketList(vm0, "region2"));
  vm1PrimaryBuckets = getPrimaryBucketList(vm1, PR_REGION_NAME);
  assertEquals(vm1PrimaryBuckets, getPrimaryBucketList(vm1, "region2"));
  vm2PrimaryBuckets = getPrimaryBucketList(vm2, PR_REGION_NAME);
  assertEquals(vm2PrimaryBuckets, getPrimaryBucketList(vm2, "region2"));
  checkData(vm0, 0, NUM_BUCKETS, "a");
  // region 2 didn't have persistent data, so nothing should be recovered
  checkData(vm0, 0, NUM_BUCKETS, null, "region2");
  // Make sure can do a put in all of the buckets in vm2
  createData(vm0, 0, NUM_BUCKETS, "c", "region2");
  // Now all of those buckets should exist
  checkData(vm0, 0, NUM_BUCKETS, "c", "region2");
  // Now all the buckets should be restored in the appropriate places.
  assertEquals(vm0Buckets, getBucketList(vm0, "region2"));
  assertEquals(vm1Buckets, getBucketList(vm1, "region2"));
  assertEquals(vm2Buckets, getBucketList(vm2, "region2"));
}
/**
 * Builds a runnable that creates the child PR ("region2") colocated with the
 * parent, then blocks (up to MAX_WAIT ms) until redundancy recovery for the
 * child region has finished.
 */
private SerializableRunnable getCreateChildPRRunnable() {
  return new SerializableRunnable("createChildPR") {
    public void run() {
      Cache cache = getCache();

      // Released by the resource observer once the child region's
      // bucket recovery completes.
      final CountDownLatch childRecovered = new CountDownLatch(1);
      ResourceObserver recoveryObserver =
          new InternalResourceManager.ResourceObserverAdapter() {
            @Override
            public void recoveryFinished(Region region) {
              if ("region2".equals(region.getName())) {
                childRecovered.countDown();
              }
            }
          };
      InternalResourceManager.setResourceObserver(recoveryObserver);

      // Child PR: redundancy 1, colocated with the parent region.
      PartitionAttributesFactory partitionFactory = new PartitionAttributesFactory();
      partitionFactory.setRedundantCopies(1);
      partitionFactory.setColocatedWith(PR_REGION_NAME);
      AttributesFactory regionFactory = new AttributesFactory();
      regionFactory.setPartitionAttributes(partitionFactory.create());
      cache.createRegion("region2", regionFactory.create());

      // Wait for recovery; a timeout simply falls through, as before.
      try {
        childRecovered.await(MAX_WAIT, TimeUnit.MILLISECONDS);
      } catch (InterruptedException e) {
        fail("interrupted", e);
      }
    }
  };
}
/**
 * Tests recovery when one member (vm2) restarts later than the others.
 * vm2 must not create any parent buckets until its colocated child region
 * exists, and then must recover exactly its original bucket set.
 */
public void testColocatedPRsRecoveryOneMemberLater() throws Throwable {
  Host host = Host.getHost(0);
  VM vm0 = host.getVM(0);
  VM vm1 = host.getVM(1);
  VM vm2 = host.getVM(2);
  SerializableRunnable createParentPR = new SerializableRunnable("createParentPR") {
    public void run() {
      Cache cache = getCache();
      DiskStore ds = cache.findDiskStore("disk");
      if (ds == null) {
        ds = cache.createDiskStoreFactory().setDiskDirs(getDiskDirs())
            .create("disk");
      }
      AttributesFactory af = new AttributesFactory();
      PartitionAttributesFactory paf = new PartitionAttributesFactory();
      paf.setRedundantCopies(1);
      af.setPartitionAttributes(paf.create());
      af.setDataPolicy(DataPolicy.PERSISTENT_PARTITION);
      af.setDiskStoreName("disk");
      cache.createRegion(PR_REGION_NAME, af.create());
    }
  };
  SerializableRunnable createChildPR = getCreateChildPRRunnable();
  vm0.invoke(createParentPR);
  vm1.invoke(createParentPR);
  vm2.invoke(createParentPR);
  vm0.invoke(createChildPR);
  vm1.invoke(createChildPR);
  vm2.invoke(createChildPR);
  createData(vm0, 0, NUM_BUCKETS, "a");
  createData(vm0, 0, NUM_BUCKETS, "b", "region2");
  // Record bucket and primary placement; colocated regions must match.
  Set<Integer> vm0Buckets = getBucketList(vm0, PR_REGION_NAME);
  assertEquals(vm0Buckets, getBucketList(vm0, "region2"));
  Set<Integer> vm1Buckets = getBucketList(vm1, PR_REGION_NAME);
  assertEquals(vm1Buckets, getBucketList(vm1, "region2"));
  Set<Integer> vm2Buckets = getBucketList(vm2, PR_REGION_NAME);
  assertEquals(vm2Buckets, getBucketList(vm2, "region2"));
  Set<Integer> vm0PrimaryBuckets = getPrimaryBucketList(vm0, PR_REGION_NAME);
  assertEquals(vm0PrimaryBuckets, getPrimaryBucketList(vm0, "region2"));
  Set<Integer> vm1PrimaryBuckets = getPrimaryBucketList(vm1, PR_REGION_NAME);
  assertEquals(vm1PrimaryBuckets, getPrimaryBucketList(vm1, "region2"));
  Set<Integer> vm2PrimaryBuckets = getPrimaryBucketList(vm2, PR_REGION_NAME);
  assertEquals(vm2PrimaryBuckets, getPrimaryBucketList(vm2, "region2"));
  closeCache(vm2);
  // Make sure the other members notice that vm2 has gone
  // TODO use a callback for this.
  Thread.sleep(4000);
  closeCache(vm0);
  closeCache(vm1);
  // Create the members, but don't initialize
  // VM2 yet
  AsyncInvocation async0 = vm0.invokeAsync(createParentPR);
  AsyncInvocation async1 = vm1.invokeAsync(createParentPR);
  async0.getResult(MAX_WAIT);
  async1.getResult(MAX_WAIT);
  vm0.invoke(createChildPR);
  vm1.invoke(createChildPR);
  waitForBucketRecovery(vm0, vm0Buckets);
  waitForBucketRecovery(vm1, vm1Buckets);
  checkData(vm0, 0, NUM_BUCKETS, "a");
  // region 2 didn't have persistent data, so nothing should be recovered
  checkData(vm0, 0, NUM_BUCKETS, null, "region2");
  // Make sure can do a put in all of the buckets in vm2
  createData(vm0, 0, NUM_BUCKETS, "c", "region2");
  // Now all of those buckets should exist
  checkData(vm0, 0, NUM_BUCKETS, "c", "region2");
  // Now we initialize vm2.
  vm2.invoke(createParentPR);
  // Make sure vm2 hasn't created any buckets in the parent PR yet
  // We don't want any buckets until the child PR is created
  assertEquals(Collections.emptySet(), getBucketList(vm2, PR_REGION_NAME));
  vm2.invoke(createChildPR);
  // Now vm2 should have created all of the appropriate buckets.
  assertEquals(vm2Buckets, getBucketList(vm2, PR_REGION_NAME));
  assertEquals(vm2Buckets, getBucketList(vm2, "region2"));
  // Primaries may have moved, but parent and child must still agree.
  vm0PrimaryBuckets = getPrimaryBucketList(vm0, PR_REGION_NAME);
  assertEquals(vm0PrimaryBuckets, getPrimaryBucketList(vm0, "region2"));
  vm1PrimaryBuckets = getPrimaryBucketList(vm1, PR_REGION_NAME);
  assertEquals(vm1PrimaryBuckets, getPrimaryBucketList(vm1, "region2"));
  vm2PrimaryBuckets = getPrimaryBucketList(vm2, PR_REGION_NAME);
  assertEquals(vm2PrimaryBuckets, getPrimaryBucketList(vm2, "region2"));
}
/**
 * Runs the replace-offline-member-and-restart scenario (support issue 7870)
 * with parent and child PRs sharing a single disk store.
 */
public void testReplaceOfflineMemberAndRestart() throws Throwable {
  SerializableRunnable createPRs = new SerializableRunnable("region1") {
    public void run() {
      Cache cache = getCache();
      DiskStore ds = cache.findDiskStore("disk");
      if(ds == null) {
        ds = cache.createDiskStoreFactory()
            .setDiskDirs(getDiskDirs()).create("disk");
      }
      // Count down twice: once per region's redundancy recovery.
      final CountDownLatch recoveryDone = new CountDownLatch(2);
      ResourceObserver observer = new InternalResourceManager.ResourceObserverAdapter() {
        @Override
        public void recoveryFinished(Region region) {
          recoveryDone.countDown();
        }
      };
      InternalResourceManager.setResourceObserver(observer);
      AttributesFactory af = new AttributesFactory();
      PartitionAttributesFactory paf = new PartitionAttributesFactory();
      paf.setRedundantCopies(1);
      // Recover redundancy immediately when a member is lost.
      paf.setRecoveryDelay(0);
      af.setPartitionAttributes(paf.create());
      af.setDataPolicy(DataPolicy.PERSISTENT_PARTITION);
      af.setDiskStoreName("disk");
      cache.createRegion(PR_REGION_NAME, af.create());
      paf.setColocatedWith(PR_REGION_NAME);
      af.setPartitionAttributes(paf.create());
      cache.createRegion("region2", af.create());
      try {
        if(!recoveryDone.await(MAX_WAIT, TimeUnit.MILLISECONDS)) {
          fail("timed out");
        }
      } catch (InterruptedException e) {
        fail("interrupted", e);
      }
    }
  };
  replaceOfflineMemberAndRestart(createPRs);
}
/**
 * Test that if we replace an offline member, even if colocated regions are
 * in different disk stores, we still keep our metadata consistent.
 * @throws Throwable
 */
public void testReplaceOfflineMemberAndRestartTwoDiskStores() throws Throwable {
  SerializableRunnable createPRs = new SerializableRunnable("region1") {
    public void run() {
      Cache cache = getCache();
      DiskStore ds = cache.findDiskStore("disk");
      if(ds == null) {
        ds = cache.createDiskStoreFactory()
            .setDiskDirs(getDiskDirs()).create("disk");
      }
      // Count down twice: once per region's redundancy recovery.
      final CountDownLatch recoveryDone = new CountDownLatch(2);
      ResourceObserver observer = new InternalResourceManager.ResourceObserverAdapter() {
        @Override
        public void recoveryFinished(Region region) {
          recoveryDone.countDown();
        }
      };
      InternalResourceManager.setResourceObserver(observer);
      AttributesFactory af = new AttributesFactory();
      PartitionAttributesFactory paf = new PartitionAttributesFactory();
      paf.setRedundantCopies(1);
      paf.setRecoveryDelay(0);
      af.setPartitionAttributes(paf.create());
      af.setDataPolicy(DataPolicy.PERSISTENT_PARTITION);
      af.setDiskStoreName("disk");
      cache.createRegion(PR_REGION_NAME, af.create());
      // The child region intentionally uses a SECOND disk store to exercise
      // cross-disk-store colocation metadata.
      DiskStore ds2 = cache.findDiskStore("disk2");
      if(ds2 == null) {
        ds2 = cache.createDiskStoreFactory()
            .setDiskDirs(getDiskDirs()).create("disk2");
      }
      paf.setColocatedWith(PR_REGION_NAME);
      af.setPartitionAttributes(paf.create());
      af.setDiskStoreName("disk2");
      cache.createRegion("region2", af.create());
      try {
        if(!recoveryDone.await(MAX_WAIT, TimeUnit.MILLISECONDS)) {
          fail("timed out");
        }
      } catch (InterruptedException e) {
        fail("interrupted", e);
      }
    }
  };
  replaceOfflineMemberAndRestart(createPRs);
}
/**
 * Test for support issue 7870.
 * 1. Run three members with redundancy 1 and recovery delay 0
 * 2. Kill one of the members, to trigger replacement of buckets
 * 3. Shutdown all members and restart.
 *
 * What was happening is that in the parent PR, we discarded
 * our offline data in one member, but in the child PR the other
 * members ended up waiting for the child bucket to be created
 * in the member that discarded its offline data.
 *
 * @param createPRs runnable that creates the parent and child PRs on a member
 * @throws Throwable
 */
public void replaceOfflineMemberAndRestart(SerializableRunnable createPRs) throws Throwable {
  disconnectAllFromDS();
  Host host = Host.getHost(0);
  VM vm0 = host.getVM(0);
  VM vm1 = host.getVM(1);
  VM vm2 = host.getVM(2);
  //Create the PR on three members
  vm0.invoke(createPRs);
  vm1.invoke(createPRs);
  vm2.invoke(createPRs);
  //Create some buckets.
  createData(vm0, 0, NUM_BUCKETS, "a");
  createData(vm0, 0, NUM_BUCKETS, "a", "region2");
  //Close one of the members to trigger redundancy recovery.
  closeCache(vm2);
  //Wait until redundancy is recovered.
  waitForRedundancyRecovery(vm0, 1, PR_REGION_NAME);
  waitForRedundancyRecovery(vm0, 1, "region2");
  // Overwrite the data so vm2's offline copy ("a") is stale relative to
  // the surviving members' copy ("b").
  createData(vm0, 0, NUM_BUCKETS, "b");
  createData(vm0, 0, NUM_BUCKETS, "b", "region2");
  ExpectedException expected = addExpectedException("PartitionOfflineException");
  try {
    //Close the remaining members.
    vm0.invoke(new SerializableCallable() {
      public Object call() throws Exception {
        InternalDistributedSystem ds = (InternalDistributedSystem) getCache().getDistributedSystem();
        AdminDistributedSystemImpl.shutDownAllMembers(ds.getDistributionManager(), 600000);
        return null;
      }
    });
    //Make sure that vm-1 is completely disconnected
    //The shutdown all asynchronously finishes the disconnect after
    //replying to the admin member.
    vm1.invoke(new SerializableRunnable() {
      public void run() {
        system.disconnect();
      }
    });
    //Recreate the members. Try to make sure that
    //the member with the latest copy of the buckets
    //is the one that decides to throw away its copy
    //by starting it last.
    AsyncInvocation async0 = vm0.invokeAsync(createPRs);
    AsyncInvocation async1 = vm1.invokeAsync(createPRs);
    pause(2000);
    AsyncInvocation async2 = vm2.invokeAsync(createPRs);
    async0.getResult(MAX_WAIT);
    async1.getResult(MAX_WAIT);
    async2.getResult(MAX_WAIT);
    // The latest values must win over vm2's stale offline copy.
    checkData(vm0, 0, NUM_BUCKETS, "b");
    checkData(vm0, 0, NUM_BUCKETS, "b", "region2");
    waitForRedundancyRecovery(vm0, 1, PR_REGION_NAME);
    waitForRedundancyRecovery(vm0, 1, "region2");
    waitForRedundancyRecovery(vm1, 1, PR_REGION_NAME);
    waitForRedundancyRecovery(vm1, 1, "region2");
    waitForRedundancyRecovery(vm2, 1, PR_REGION_NAME);
    waitForRedundancyRecovery(vm2, 1, "region2");
    //Make sure we don't have any extra buckets after the restart
    // With redundancy 1, each bucket exists on exactly 2 members.
    int totalBucketCount = getBucketList(vm0).size();
    totalBucketCount += getBucketList(vm1).size();
    totalBucketCount += getBucketList(vm2).size();
    assertEquals(2 * NUM_BUCKETS, totalBucketCount);
    totalBucketCount = getBucketList(vm0, "region2").size();
    totalBucketCount += getBucketList(vm1, "region2").size();
    totalBucketCount += getBucketList(vm2, "region2").size();
    assertEquals(2 * NUM_BUCKETS, totalBucketCount);
  } finally {
    expected.remove();
  }
}
/**
 * Runs the replace-offline-member scenario where the child PR is created
 * only after the parent PR is fully up, using a single disk store.
 */
public void testReplaceOfflineMemberAndRestartCreateColocatedPRLate() throws Throwable {
  SerializableRunnable createParentPR = new SerializableRunnable() {
    public void run() {
      Cache cache = getCache();
      DiskStore ds = cache.findDiskStore("disk");
      if(ds == null) {
        ds = cache.createDiskStoreFactory()
            .setDiskDirs(getDiskDirs()).create("disk");
      }
      AttributesFactory af = new AttributesFactory();
      PartitionAttributesFactory paf = new PartitionAttributesFactory();
      paf.setRedundantCopies(1);
      paf.setRecoveryDelay(0);
      af.setPartitionAttributes(paf.create());
      af.setDataPolicy(DataPolicy.PERSISTENT_PARTITION);
      af.setDiskStoreName("disk");
      cache.createRegion(PR_REGION_NAME, af.create());
    }
  };
  SerializableRunnable createChildPR = new SerializableRunnable() {
    public void run() {
      Cache cache = getCache();
      // Released once redundancy recovery for the child region finishes.
      final CountDownLatch recoveryDone = new CountDownLatch(1);
      ResourceObserver observer = new InternalResourceManager.ResourceObserverAdapter() {
        @Override
        public void recoveryFinished(Region region) {
          if(region.getName().contains("region2")) {
            recoveryDone.countDown();
          }
        }
      };
      InternalResourceManager.setResourceObserver(observer);
      AttributesFactory af = new AttributesFactory();
      PartitionAttributesFactory paf = new PartitionAttributesFactory();
      paf.setRedundantCopies(1);
      paf.setRecoveryDelay(0);
      paf.setColocatedWith(PR_REGION_NAME);
      af.setDataPolicy(DataPolicy.PERSISTENT_PARTITION);
      af.setDiskStoreName("disk");
      af.setPartitionAttributes(paf.create());
      cache.createRegion("region2", af.create());
      try {
        if(!recoveryDone.await(MAX_WAIT, TimeUnit.MILLISECONDS)) {
          fail("timed out");
        }
      } catch (InterruptedException e) {
        fail("interrupted", e);
      }
    }
  };
  replaceOfflineMemberAndRestartCreateColocatedPRLate(createParentPR, createChildPR);
}
/**
 * Same late-child-creation scenario, but the child PR uses a second disk
 * store ("disk2") to exercise cross-disk-store colocation metadata.
 */
public void testReplaceOfflineMemberAndRestartCreateColocatedPRLateTwoDiskStores() throws Throwable {
  SerializableRunnable createParentPR = new SerializableRunnable() {
    public void run() {
      Cache cache = getCache();
      DiskStore ds = cache.findDiskStore("disk");
      if(ds == null) {
        ds = cache.createDiskStoreFactory()
            .setDiskDirs(getDiskDirs()).create("disk");
      }
      AttributesFactory af = new AttributesFactory();
      PartitionAttributesFactory paf = new PartitionAttributesFactory();
      paf.setRedundantCopies(1);
      paf.setRecoveryDelay(0);
      af.setPartitionAttributes(paf.create());
      af.setDataPolicy(DataPolicy.PERSISTENT_PARTITION);
      af.setDiskStoreName("disk");
      cache.createRegion(PR_REGION_NAME, af.create());
    }
  };
  SerializableRunnable createChildPR = new SerializableRunnable() {
    public void run() {
      Cache cache = getCache();
      // Released once redundancy recovery for the child region finishes.
      final CountDownLatch recoveryDone = new CountDownLatch(1);
      ResourceObserver observer = new InternalResourceManager.ResourceObserverAdapter() {
        @Override
        public void recoveryFinished(Region region) {
          if(region.getName().contains("region2")) {
            recoveryDone.countDown();
          }
        }
      };
      InternalResourceManager.setResourceObserver(observer);
      // Child region goes to a separate disk store from the parent.
      DiskStore ds2 = cache.findDiskStore("disk2");
      if(ds2 == null) {
        ds2 = cache.createDiskStoreFactory()
            .setDiskDirs(getDiskDirs()).create("disk2");
      }
      AttributesFactory af = new AttributesFactory();
      PartitionAttributesFactory paf = new PartitionAttributesFactory();
      paf.setRedundantCopies(1);
      paf.setRecoveryDelay(0);
      paf.setColocatedWith(PR_REGION_NAME);
      af.setDataPolicy(DataPolicy.PERSISTENT_PARTITION);
      af.setDiskStoreName("disk2");
      af.setPartitionAttributes(paf.create());
      cache.createRegion("region2", af.create());
      try {
        if(!recoveryDone.await(MAX_WAIT, TimeUnit.MILLISECONDS)) {
          fail("timed out");
        }
      } catch (InterruptedException e) {
        fail("interrupted", e);
      }
    }
  };
  replaceOfflineMemberAndRestartCreateColocatedPRLate(createParentPR, createChildPR);
}
/**
 * Test for support issue 7870.
 * 1. Run three members with redundancy 1 and recovery delay 0
 * 2. Kill one of the members, to trigger replacement of buckets
 * 3. Shutdown all members and restart.
 *
 * What was happening is that in the parent PR, we discarded
 * our offline data in one member, but in the child PR the other
 * members ended up waiting for the child bucket to be created
 * in the member that discarded its offline data.
 *
 * In this test case, we're creating the child PR later,
 * after the parent buckets have already been completely created.
 *
 * @param createParentPR runnable that creates the parent PR on a member
 * @param createChildPR runnable that creates the colocated child PR
 * @throws Throwable
 */
public void replaceOfflineMemberAndRestartCreateColocatedPRLate(SerializableRunnable createParentPR, SerializableRunnable createChildPR) throws Throwable {
  // The restart sequence below legitimately produces PartitionOfflineExceptions
  // while members wait for each other. Register the expectation once and remove
  // it when the test completes (it was previously registered twice and never
  // removed, leaking the log suppression into subsequent tests).
  ExpectedException expected = addExpectedException("PartitionOfflineException");
  try {
    Host host = Host.getHost(0);
    VM vm0 = host.getVM(0);
    VM vm1 = host.getVM(1);
    VM vm2 = host.getVM(2);
    //Create the PRs on three members
    vm0.invoke(createParentPR);
    vm1.invoke(createParentPR);
    vm2.invoke(createParentPR);
    vm0.invoke(createChildPR);
    vm1.invoke(createChildPR);
    vm2.invoke(createChildPR);
    //Create some buckets.
    createData(vm0, 0, NUM_BUCKETS, "a");
    createData(vm0, 0, NUM_BUCKETS, "a", "region2");
    //Close one of the members to trigger redundancy recovery.
    closeCache(vm2);
    //Wait until redundancy is recovered.
    waitForRedundancyRecovery(vm0, 1, PR_REGION_NAME);
    waitForRedundancyRecovery(vm0, 1, "region2");
    // Overwrite the data so vm2's offline copy ("a") is stale relative to
    // the surviving members' copy ("b").
    createData(vm0, 0, NUM_BUCKETS, "b");
    createData(vm0, 0, NUM_BUCKETS, "b", "region2");
    //Close the remaining members.
    vm0.invoke(new SerializableCallable() {
      public Object call() throws Exception {
        InternalDistributedSystem ds = (InternalDistributedSystem) getCache().getDistributedSystem();
        AdminDistributedSystemImpl.shutDownAllMembers(ds.getDistributionManager(), 0);
        return null;
      }
    });
    //Make sure that vm-1 is completely disconnected
    //The shutdown all asynchronously finishes the disconnect after
    //replying to the admin member.
    vm1.invoke(new SerializableRunnable() {
      public void run() {
        system.disconnect();
      }
    });
    //Recreate the parent region. Try to make sure that
    //the member with the latest copy of the buckets
    //is the one that decides to throw away its copy
    //by starting it last.
    AsyncInvocation async2 = vm2.invokeAsync(createParentPR);
    AsyncInvocation async1 = vm1.invokeAsync(createParentPR);
    pause(2000);
    AsyncInvocation async0 = vm0.invokeAsync(createParentPR);
    async0.getResult(MAX_WAIT);
    async1.getResult(MAX_WAIT);
    async2.getResult(MAX_WAIT);
    //Wait for async tasks
    pause(2000);
    //Recreate the child region.
    async2 = vm2.invokeAsync(createChildPR);
    async1 = vm1.invokeAsync(createChildPR);
    async0 = vm0.invokeAsync(createChildPR);
    async0.getResult(MAX_WAIT);
    async1.getResult(MAX_WAIT);
    async2.getResult(MAX_WAIT);
    //Validate the data: the latest values must win over vm2's stale copy.
    checkData(vm0, 0, NUM_BUCKETS, "b");
    checkData(vm0, 0, NUM_BUCKETS, "b", "region2");
    //Make sure we can actually use the buckets in the child region.
    createData(vm0, 0, NUM_BUCKETS, "c", "region2");
    waitForRedundancyRecovery(vm0, 1, PR_REGION_NAME);
    waitForRedundancyRecovery(vm0, 1, "region2");
    //Make sure we don't have any extra buckets after the restart
    // With redundancy 1, each bucket exists on exactly 2 members.
    int totalBucketCount = getBucketList(vm0).size();
    totalBucketCount += getBucketList(vm1).size();
    totalBucketCount += getBucketList(vm2).size();
    assertEquals(2 * NUM_BUCKETS, totalBucketCount);
    totalBucketCount = getBucketList(vm0, "region2").size();
    totalBucketCount += getBucketList(vm1, "region2").size();
    totalBucketCount += getBucketList(vm2, "region2").size();
    assertEquals(2 * NUM_BUCKETS, totalBucketCount);
  } finally {
    expected.remove();
  }
}
/**
 * Test what happens when we crash in the middle of
 * satisfying redundancy for a colocated bucket.
 *
 * NOTE(review): the original header said this test "is disabled because it is
 * failing periodically" (bug #46748), yet the method name still begins with
 * "test" so the framework will run it — confirm whether it should be renamed
 * to disable it.
 * @throws Throwable
 */
public void testCrashDuringRedundancySatisfaction() throws Throwable {
  Host host = Host.getHost(0);
  VM vm0 = host.getVM(0);
  VM vm1 = host.getVM(1);
  SerializableRunnable createPRs = new SerializableRunnable("region1") {
    public void run() {
      Cache cache = getCache();
      DiskStore ds = cache.findDiskStore("disk");
      if(ds == null) {
        ds = cache.createDiskStoreFactory()
            .setDiskDirs(getDiskDirs()).create("disk");
      }
      AttributesFactory af = new AttributesFactory();
      PartitionAttributesFactory paf = new PartitionAttributesFactory();
      paf.setRedundantCopies(1);
      //Workaround for 44414 - disable recovery delay so we shutdown
      //vm1 at a predictable point.
      paf.setRecoveryDelay(-1);
      paf.setStartupRecoveryDelay(-1);
      af.setPartitionAttributes(paf.create());
      af.setDataPolicy(DataPolicy.PERSISTENT_PARTITION);
      af.setDiskStoreName("disk");
      cache.createRegion(PR_REGION_NAME, af.create());
      paf.setColocatedWith(PR_REGION_NAME);
      af.setPartitionAttributes(paf.create());
      cache.createRegion("region2", af.create());
    }
  };
  //Create the PR on vm0
  vm0.invoke(createPRs);
  //Create some buckets.
  createData(vm0, 0, NUM_BUCKETS, "a");
  createData(vm0, 0, NUM_BUCKETS, "a", "region2");
  vm1.invoke(createPRs);
  //We shouldn't have created any buckets in vm1 yet.
  assertEquals(Collections.emptySet(), getBucketList(vm1));
  //Add an observer that will disconnect before allowing the peer to
  //GII a colocated bucket. This should leave the peer with only the parent
  //bucket
  vm0.invoke(new SerializableRunnable() {
    public void run() {
      DistributionMessageObserver.setInstance(new DistributionMessageObserver() {
        @Override
        public void beforeProcessMessage(DistributionManager dm,
            DistributionMessage message) {
          // Trigger only on the GII request for the colocated child region,
          // then clear the observer so we disconnect exactly once.
          if(message instanceof RequestImageMessage) {
            if(((RequestImageMessage) message).regionPath.contains("region2")) {
              DistributionMessageObserver.setInstance(null);
              disconnectFromDS();
            }
          }
        }
      });
    }
  });
  ExpectedException ex = addExpectedException("PartitionOfflineException", vm1);
  try {
    //Do a rebalance to create buckets in vm1. THis will cause vm0 to disconnect
    //as we satisfy redundancy with vm1.
    try {
      RebalanceResults rr = rebalance(vm1);
    } catch(Exception expected) {
      //We expect to see a partition offline exception because of the
      //disconnect
      if(!(expected.getCause() instanceof PartitionOfflineException)) {
        throw expected;
      }
    }
    //Wait for vm0 to be closed by the callback
    vm0.invoke(new SerializableCallable() {
      public Object call() throws Exception {
        waitForCriterion(new WaitCriterion() {
          public boolean done() {
            InternalDistributedSystem ds = system;
            return ds == null || !ds.isConnected();
          }
          public String description() {
            return "DS did not disconnect";
          }
        }, MAX_WAIT, 100, true);
        return null;
      }
    });
    //close the cache in vm1
    SerializableCallable disconnectFromDS = new SerializableCallable() {
      public Object call() throws Exception {
        disconnectFromDS();
        return null;
      }
    };
    vm1.invoke(disconnectFromDS);
    //Make sure vm0 is disconnected. This avoids a race where we
    //may still be in the process of disconnecting even though our async listener
    //found the system was disconnected
    vm0.invoke(disconnectFromDS);
  } finally {
    ex.remove();
  }
  //Create the cache and PRs on both members
  AsyncInvocation async0 = vm0.invokeAsync(createPRs);
  AsyncInvocation async1 = vm1.invokeAsync(createPRs);
  async0.getResult(MAX_WAIT);
  async1.getResult(MAX_WAIT);
  //Make sure the data was recovered correctly
  checkData(vm0, 0, NUM_BUCKETS, "a");
  // Workaround for bug 46748.
  checkData(vm0, 0, NUM_BUCKETS, "a", "region2");
}
/**
 * Test what happens when we restart persistent members while
 * there is an accessor concurrently performing puts. This is for bug 43899.
 *
 * vm1/vm2 host persistent copies of two colocated PRs; vm0 is a pure
 * accessor (localMaxMemory=0). A backup taken while both hosts are online
 * is restored after both are closed, simulating a simultaneous kill and
 * recovery; recovery must not hang while the accessor keeps issuing
 * operations against the offline/recovering partitions.
 */
public void testRecoverySystemWithConcurrentPutter() throws Throwable {
Host host = Host.getHost(0);
VM vm0 = host.getVM(0);
VM vm1 = host.getVM(1);
VM vm2 = host.getVM(2);
// vm3 is only used as the VM that drives the backup.
VM vm3 = host.getVM(3);
//Define all of the runnables used in this test
//Runnable to create the accessor: redundancy 1 but localMaxMemory 0,
//so this member holds no bucket data of its own.
SerializableRunnable createAccessor = new SerializableRunnable("createAccessor") {
public void run() {
Cache cache = getCache();
AttributesFactory af = new AttributesFactory();
PartitionAttributesFactory paf = new PartitionAttributesFactory();
paf.setRedundantCopies(1);
paf.setLocalMaxMemory(0);
af.setPartitionAttributes(paf.create());
af.setDataPolicy(DataPolicy.PARTITION);
cache.createRegion(PR_REGION_NAME, af.create());
// The child region is colocated with the parent.
paf.setColocatedWith(PR_REGION_NAME);
af.setPartitionAttributes(paf.create());
cache.createRegion("region2", af.create());
}
};
//Runnable to create the persistent data-hosting PRs (parent + colocated
//child), sharing the disk store named "disk".
SerializableRunnable createPRs = new SerializableRunnable("region1") {
public void run() {
Cache cache = getCache();
DiskStore ds = cache.findDiskStore("disk");
if(ds == null) {
ds = cache.createDiskStoreFactory()
.setDiskDirs(getDiskDirs()).create("disk");
}
AttributesFactory af = new AttributesFactory();
PartitionAttributesFactory paf = new PartitionAttributesFactory();
paf.setRedundantCopies(1);
af.setPartitionAttributes(paf.create());
af.setDataPolicy(DataPolicy.PERSISTENT_PARTITION);
af.setDiskStoreName("disk");
cache.createRegion(PR_REGION_NAME, af.create());
paf.setColocatedWith(PR_REGION_NAME);
af.setPartitionAttributes(paf.create());
cache.createRegion("region2", af.create());
}
};
//runnable to close the cache.
SerializableRunnable closeCache = new SerializableRunnable("region1") {
public void run() {
closeCache();
}
};
//Runnable that exercises the region in a tight loop until the cache is
//closed, swallowing the exceptions that are expected while the hosting
//members are offline or mid-recovery.
//NOTE(review): despite the name "doABunchOfPuts" and the javadoc's
//"concurrently performing puts", this loop issues get() calls, not
//put()s — confirm whether put() was intended for bug 43899.
SerializableRunnable doABunchOfPuts = new SerializableRunnable("region1") {
public void run() {
Cache cache = getCache();
Region region = cache.getRegion(PR_REGION_NAME);
try {
for(int i =0;;i++) {
try {
region.get(i % NUM_BUCKETS);
} catch(PartitionOfflineException expected) {
//do nothing - hosting members are offline.
} catch(PartitionedRegionStorageException expected) {
//do nothing - no storage currently available.
}
Thread.yield();
}
} catch(CacheClosedException expected) {
//ok, we're done - closeCache in this VM ends the loop.
}
}
};
//Runnable to clean up disk dirs on a member, so the restored backup is
//the only persistent state it recovers from.
SerializableRunnable cleanDiskDirs = new SerializableRunnable("Clean disk dirs") {
public void run() {
try {
cleanDiskDirs();
} catch (IOException e) {
throw new RuntimeException(e);
}
}
};
//Create the PR in two members
vm1.invoke(createPRs);
vm2.invoke(createPRs);
//create the accessor.
vm0.invoke(createAccessor);
//Create some buckets.
createData(vm0, 0, NUM_BUCKETS, "a");
createData(vm0, 0, NUM_BUCKETS, "a", "region2");
//backup the system. We use this to get a snapshot of vm1 and vm2
//when they both are online. Recovering from this backup simulates
//a simultaneous kill and recovery.
backup(vm3);
//close vm1 and vm2.
vm1.invoke(closeCache);
vm2.invoke(closeCache);
//restore the backup
vm1.invoke(cleanDiskDirs);
vm2.invoke(cleanDiskDirs);
restoreBackup(2);
//in vm0, start doing a bunch of concurrent operations.
AsyncInvocation async0 = vm0.invokeAsync(doABunchOfPuts);
//This recovery should not hang (that's what we're testing for
//here).
AsyncInvocation async1 = vm1.invokeAsync(createPRs);
AsyncInvocation async2 = vm2.invokeAsync(createPRs);
async1.getResult(MAX_WAIT);
async2.getResult(MAX_WAIT);
//close the cache in vm0 to stop the async operations.
vm0.invoke(closeCache);
//make sure we didn't get an exception
async0.getResult(MAX_WAIT);
}
/**
 * Rebalance a persistent parent PR while its colocated child PR is still
 * offline, with both regions sharing a single disk store. See
 * {@link #rebalanceWithOfflineChildRegion(SerializableRunnable, SerializableRunnable)}.
 */
public void testRebalanceWithOfflineChildRegion() throws Throwable {
  // Parent PR: persistent, no redundancy, immediate recovery, on "disk".
  SerializableRunnable parentCreator = new SerializableRunnable() {
    public void run() {
      Cache cache = getCache();
      DiskStore store = cache.findDiskStore("disk");
      if (store == null) {
        store = cache.createDiskStoreFactory()
            .setDiskDirs(getDiskDirs()).create("disk");
      }
      PartitionAttributesFactory prAttrs = new PartitionAttributesFactory();
      prAttrs.setRedundantCopies(0);
      prAttrs.setRecoveryDelay(0);
      AttributesFactory regionAttrs = new AttributesFactory();
      regionAttrs.setDataPolicy(DataPolicy.PERSISTENT_PARTITION);
      regionAttrs.setDiskStoreName("disk");
      regionAttrs.setPartitionAttributes(prAttrs.create());
      cache.createRegion(PR_REGION_NAME, regionAttrs.create());
    }
  };
  // Child PR: colocated with the parent, same disk store.
  SerializableRunnable childCreator = new SerializableRunnable() {
    public void run() {
      Cache cache = getCache();
      PartitionAttributesFactory prAttrs = new PartitionAttributesFactory();
      prAttrs.setRedundantCopies(0);
      prAttrs.setRecoveryDelay(0);
      prAttrs.setColocatedWith(PR_REGION_NAME);
      AttributesFactory regionAttrs = new AttributesFactory();
      regionAttrs.setDataPolicy(DataPolicy.PERSISTENT_PARTITION);
      regionAttrs.setDiskStoreName("disk");
      regionAttrs.setPartitionAttributes(prAttrs.create());
      cache.createRegion("region2", regionAttrs.create());
    }
  };
  rebalanceWithOfflineChildRegion(parentCreator, childCreator);
}
/**
 * Test that a rebalance while regions are in the middle of recovery
 * doesn't cause issues.
 *
 * This is slightly different than {@link #testRebalanceWithOfflineChildRegion()}
 * because in this case all of the regions have been created, but
 * they are in the middle of actually recovering buckets from disk.
 */
public void testRebalanceDuringRecovery() throws Throwable {
  Host host = Host.getHost(0);
  VM vm0 = host.getVM(0);
  VM vm1 = host.getVM(1);
  VM vm2 = host.getVM(2);
  // Creates two colocated persistent PRs (parent + "region2") with
  // redundancy 1 and manual bucket recovery (recoveryDelay -1), sharing
  // the disk store named "disk". The paf/af carry their settings over
  // from the parent to the child; only colocation is added for the child.
  SerializableRunnable createPRs = new SerializableRunnable() {
    public void run() {
      Cache cache = getCache();
      DiskStore ds = cache.findDiskStore("disk");
      if (ds == null) {
        ds = cache.createDiskStoreFactory()
            .setDiskDirs(getDiskDirs()).create("disk");
      }
      AttributesFactory af = new AttributesFactory();
      PartitionAttributesFactory paf = new PartitionAttributesFactory();
      paf.setRedundantCopies(1);
      paf.setRecoveryDelay(-1);
      af.setPartitionAttributes(paf.create());
      af.setDataPolicy(DataPolicy.PERSISTENT_PARTITION);
      af.setDiskStoreName("disk");
      cache.createRegion(PR_REGION_NAME, af.create());
      paf.setColocatedWith(PR_REGION_NAME);
      af.setPartitionAttributes(paf.create());
      cache.createRegion("region2", af.create());
    }
  };
  // Create the PRs on two members
  vm0.invoke(createPRs);
  vm1.invoke(createPRs);
  // Create some buckets.
  createData(vm0, 0, NUM_BUCKETS, "a");
  createData(vm0, 0, NUM_BUCKETS, "a", "region2");
  // Close the members
  closeCache(vm1);
  closeCache(vm0);
  // Install an observer in vm1 that stalls child-bucket creation, so the
  // regions stay "in the middle of recovery" while we attempt bucket moves.
  SerializableRunnable addHook = new SerializableRunnable() {
    @Override
    public void run() {
      PartitionedRegionObserverHolder.setInstance(new PRObserver());
    }
  };
  // Block until the observer has seen child-bucket creation start.
  SerializableRunnable waitForHook = new SerializableRunnable() {
    @Override
    public void run() {
      PRObserver observer = (PRObserver) PartitionedRegionObserverHolder.getInstance();
      try {
        observer.waitForCreate();
      } catch (InterruptedException e) {
        fail("interrupted", e);
      }
    }
  };
  // Release stalled bucket creations and restore the no-op observer.
  SerializableRunnable removeHook = new SerializableRunnable() {
    @Override
    public void run() {
      PRObserver observer = (PRObserver) PartitionedRegionObserverHolder.getInstance();
      observer.release();
      PartitionedRegionObserverHolder.setInstance(new PartitionedRegionObserverAdapter());
    }
  };
  vm1.invoke(addHook);
  AsyncInvocation async0;
  AsyncInvocation async1;
  AsyncInvocation async2;
  try {
    async0 = vm0.invokeAsync(createPRs);
    async1 = vm1.invokeAsync(createPRs);
    vm1.invoke(waitForHook);
    // Now create the regions on vm-2. vm-2 did not
    // previously host the child region.
    vm2.invoke(createPRs);
    // Try to forcibly move some buckets to vm2 (this should not succeed,
    // because vm1 is still recovering the colocated child buckets).
    moveBucket(0, vm1, vm2);
    moveBucket(1, vm1, vm2);
  } finally {
    vm1.invoke(removeHook);
  }
  async0.getResult(MAX_WAIT);
  async1.getResult(MAX_WAIT);
  // Validate the data
  checkData(vm0, 0, NUM_BUCKETS, "a");
  checkData(vm0, 0, NUM_BUCKETS, "a", "region2");
  // Make sure we can actually use the buckets in the child region.
  createData(vm0, 0, NUM_BUCKETS, "c", "region2");
  // Make sure the system is recoverable
  // by restarting it
  closeCache(vm2);
  closeCache(vm1);
  closeCache(vm0);
  async0 = vm0.invokeAsync(createPRs);
  async1 = vm1.invokeAsync(createPRs);
  async2 = vm2.invokeAsync(createPRs);
  // Bound these waits with MAX_WAIT, consistent with every other
  // getResult call in this test (previously unbounded).
  async0.getResult(MAX_WAIT);
  async1.getResult(MAX_WAIT);
  async2.getResult(MAX_WAIT);
}
/**
 * Same scenario as {@link #testRebalanceWithOfflineChildRegion()}, except
 * that the parent and child PRs persist to two different disk stores
 * ("disk" and "disk2").
 */
public void testRebalanceWithOfflineChildRegionTwoDiskStores() throws Throwable {
  // Parent PR: persistent on disk store "disk".
  SerializableRunnable parentCreator = new SerializableRunnable() {
    public void run() {
      Cache cache = getCache();
      DiskStore parentStore = cache.findDiskStore("disk");
      if (parentStore == null) {
        parentStore = cache.createDiskStoreFactory()
            .setDiskDirs(getDiskDirs()).create("disk");
      }
      PartitionAttributesFactory prAttrs = new PartitionAttributesFactory();
      prAttrs.setRedundantCopies(0);
      prAttrs.setRecoveryDelay(0);
      AttributesFactory regionAttrs = new AttributesFactory();
      regionAttrs.setDataPolicy(DataPolicy.PERSISTENT_PARTITION);
      regionAttrs.setDiskStoreName("disk");
      regionAttrs.setPartitionAttributes(prAttrs.create());
      cache.createRegion(PR_REGION_NAME, regionAttrs.create());
    }
  };
  // Child PR: colocated with the parent but persisted on "disk2".
  SerializableRunnable childCreator = new SerializableRunnable() {
    public void run() {
      Cache cache = getCache();
      DiskStore childStore = cache.findDiskStore("disk2");
      if (childStore == null) {
        childStore = cache.createDiskStoreFactory()
            .setDiskDirs(getDiskDirs()).create("disk2");
      }
      PartitionAttributesFactory prAttrs = new PartitionAttributesFactory();
      prAttrs.setRedundantCopies(0);
      prAttrs.setRecoveryDelay(0);
      prAttrs.setColocatedWith(PR_REGION_NAME);
      AttributesFactory regionAttrs = new AttributesFactory();
      regionAttrs.setDataPolicy(DataPolicy.PERSISTENT_PARTITION);
      regionAttrs.setDiskStoreName("disk2");
      regionAttrs.setPartitionAttributes(prAttrs.create());
      cache.createRegion("region2", regionAttrs.create());
    }
  };
  rebalanceWithOfflineChildRegion(parentCreator, childCreator);
}
/**
 * Test that a user is not allowed to change the colocation of
 * a PR with persistent data: recovering region3 colocated with a
 * different region, or uncolocated, must fail with an
 * {@link IllegalStateException}, while recovering with the original
 * colocation must succeed.
 * @throws Throwable
 */
public void testModifyColocation() throws Throwable {
  // Create PRs where region3 is colocated with region1.
  createColocatedPRs("region1");
  // Close everything
  closeCache();
  // Restart colocated with "region2" — changing colocation is illegal.
  ExpectedException ex = addExpectedException("DiskAccessException|IllegalStateException");
  try {
    createColocatedPRs("region2");
    fail("Should have received an illegal state exception");
  } catch (IllegalStateException expected) {
    // expected - colocation may not change across restarts
  } finally {
    ex.remove();
  }
  // Close everything
  closeCache();
  // Restart colocated with region1.
  // Make sure we didn't screw anything up.
  // (Consistency fix: use "region1", the same form passed at creation
  // time above, rather than the path form "/region1".)
  createColocatedPRs("region1");
  // Close everything
  closeCache();
  // Restart uncolocated. We don't allow changing
  // from colocated to uncolocated.
  ex = addExpectedException("DiskAccessException|IllegalStateException");
  try {
    createColocatedPRs(null);
    fail("Should have received an illegal state exception");
  } catch (IllegalStateException expected) {
    // expected - colocation may not be removed across restarts
  } finally {
    ex.remove();
  }
  // Close everything
  closeCache();
}
/**
 * Create three persistent PRs in this VM, named region1, region2, and
 * region3, all on the disk store named "disk".
 *
 * @param colocatedWith the region that region3 should be colocated with,
 *        or {@code null} to create region3 uncolocated
 */
private void createColocatedPRs(final String colocatedWith) {
  Cache cache = getCache();
  DiskStore diskStore = cache.findDiskStore("disk");
  if (diskStore == null) {
    diskStore = cache.createDiskStoreFactory()
        .setDiskDirs(getDiskDirs()).create("disk");
  }
  PartitionAttributesFactory partitionFactory = new PartitionAttributesFactory();
  partitionFactory.setRedundantCopies(0);
  AttributesFactory regionFactory = new AttributesFactory();
  regionFactory.setDataPolicy(DataPolicy.PERSISTENT_PARTITION);
  regionFactory.setDiskStoreName("disk");
  regionFactory.setPartitionAttributes(partitionFactory.create());
  // region1 and region2 are never colocated with anything.
  cache.createRegion("region1", regionFactory.create());
  cache.createRegion("region2", regionFactory.create());
  // region3 optionally gets the requested colocation.
  if (colocatedWith != null) {
    partitionFactory.setColocatedWith(colocatedWith);
  }
  regionFactory.setPartitionAttributes(partitionFactory.create());
  cache.createRegion("region3", regionFactory.create());
}
/**
 * Test for bug 43570. Rebalance a persistent parent PR before we recover
 * the persistent child PR from disk: the rebalance must not move any
 * buckets while the colocated child is still offline, and the child's
 * data must recover intact afterwards.
 *
 * @param createParentPR runnable that creates the parent PR in a VM
 * @param createChildPR runnable that creates the colocated child PR
 *        ("region2") in a VM
 * @throws Throwable
 */
public void rebalanceWithOfflineChildRegion(SerializableRunnable createParentPR, SerializableRunnable createChildPR) throws Throwable {
Host host = Host.getHost(0);
VM vm0 = host.getVM(0);
VM vm1 = host.getVM(1);
VM vm2 = host.getVM(2);
//Create the PRs on two members
vm0.invoke(createParentPR);
vm1.invoke(createParentPR);
vm0.invoke(createChildPR);
vm1.invoke(createChildPR);
//Create some buckets.
createData(vm0, 0, NUM_BUCKETS, "a");
createData(vm0, 0, NUM_BUCKETS, "a", "region2");
//Close the members
closeCache(vm1);
closeCache(vm0);
//Recreate the parent region. Try to make sure that
//the member with the latest copy of the buckets
//is the one that decides to throw away it's copy
//by starting it last.
AsyncInvocation async0 = vm0.invokeAsync(createParentPR);
AsyncInvocation async1 = vm1.invokeAsync(createParentPR);
async0.getResult(MAX_WAIT);
async1.getResult(MAX_WAIT);
//Now create the parent region on vm-2. vm-2 did not
//previously host the child region.
vm2.invoke(createParentPR);
//Rebalance the parent region.
//This should not move any buckets, because
//we haven't recovered the child region yet.
RebalanceResults rebalanceResults = rebalance(vm2);
assertEquals(0, rebalanceResults.getTotalBucketTransfersCompleted());
//Recreate the child region. vm1 is started first here, mirroring the
//parent recreation order above.
async1 = vm1.invokeAsync(createChildPR);
async0 = vm0.invokeAsync(createChildPR);
AsyncInvocation async2 = vm2.invokeAsync(createChildPR);
async0.getResult(MAX_WAIT);
async1.getResult(MAX_WAIT);
async2.getResult(MAX_WAIT);
//Validate the data
checkData(vm0, 0, NUM_BUCKETS, "a");
checkData(vm0, 0, NUM_BUCKETS, "a", "region2");
//Make sure we can actually use the buckets in the child region.
createData(vm0, 0, NUM_BUCKETS, "c", "region2");
}
/**
 * Run a rebalance in the given VM, blocking until it completes, and
 * return its results.
 */
private RebalanceResults rebalance(VM vm) {
  SerializableCallable rebalanceTask = new SerializableCallable() {
    public Object call() throws Exception {
      RebalanceOperation operation =
          getCache().getResourceManager().createRebalanceFactory().start();
      // getResults() blocks until the rebalance finishes.
      return operation.getResults();
    }
  };
  return (RebalanceResults) vm.invoke(rebalanceTask);
}
/**
 * Observer that stalls creation of buckets in the colocated child region
 * ("region2") until {@link #release()} is called, keeping recovery
 * in-flight so a concurrent rebalance/bucket move can be exercised.
 */
private static class PRObserver extends PartitionedRegionObserverAdapter {
  // Fields are final: each latch is created once and only counted down.
  // Released by release() once the rebalance attempt has been made.
  private final CountDownLatch rebalanceDone = new CountDownLatch(1);
  // Counted down once per stalled child-bucket creation; waitForCreate()
  // waits for 3 of them.
  private final CountDownLatch bucketCreateStarted = new CountDownLatch(3);

  @Override
  public void beforeBucketCreation(PartitionedRegion region, int bucketId) {
    // Only stall buckets of the child region; parent buckets proceed.
    if (region.getName().contains("region2")) {
      bucketCreateStarted.countDown();
      waitForRebalance();
    }
  }

  // Blocks the bucket-creating thread until release() is called.
  private void waitForRebalance() {
    try {
      // NOTE(review): MAX_WAIT is passed to AsyncInvocation.getResult as
      // milliseconds elsewhere in this file, but used as SECONDS here —
      // confirm the intended unit.
      if (!rebalanceDone.await(MAX_WAIT, TimeUnit.SECONDS)) {
        fail("Failed waiting for the rebalance to start");
      }
    } catch (InterruptedException e) {
      throw new RuntimeException(e);
    }
  }

  /** Wait until 3 child-bucket creations have started (and stalled). */
  public void waitForCreate() throws InterruptedException {
    if (!bucketCreateStarted.await(MAX_WAIT, TimeUnit.SECONDS)) {
      fail("Failed waiting for bucket creation to start");
    }
  }

  /** Unblock all threads stalled in {@link #beforeBucketCreation}. */
  public void release() {
    rebalanceDone.countDown();
  }
}
}