/*=========================================================================
* Copyright (c) 2002-2014 Pivotal Software, Inc. All Rights Reserved.
* This product is protected by U.S. and international copyright
* and intellectual property laws. Pivotal products are covered by
* more patents listed at http://www.pivotal.io/patents.
*=========================================================================
*/
package com.gemstone.gemfire.internal.cache;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import com.gemstone.gemfire.cache.AttributesFactory;
import com.gemstone.gemfire.cache.Cache;
import com.gemstone.gemfire.cache.PartitionAttributes;
import com.gemstone.gemfire.cache.PartitionAttributesFactory;
import com.gemstone.gemfire.cache.Region;
import com.gemstone.gemfire.cache30.CacheTestCase;
import com.gemstone.gemfire.internal.cache.control.InternalResourceManager;
import com.gemstone.gemfire.internal.cache.control.InternalResourceManager.ResourceObserverAdapter;
import dunit.Host;
import dunit.SerializableCallable;
import dunit.SerializableRunnable;
import dunit.VM;
/**
* Tests for the recoveryDelay and startupRecoveryDelay settings of
* partitioned regions: redundancy recovery should be skipped entirely,
* delayed after a member departs, or delayed after a member starts up,
* depending on the configured delays.
*
* @author dsmith
*/
@SuppressWarnings("synthetic-access")
public class PartitionedRegionDelayedRecoveryDUnitTest extends CacheTestCase {
public PartitionedRegionDelayedRecoveryDUnitTest(String name) {
super(name);
}
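/**
* Clear any ResourceObserver installed by a test, both in every dunit VM
* and in this controller VM.
*/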
@Override
public void tearDown2() throws Exception {
super.tearDown2();
invokeInEveryVM(new SerializableRunnable() {
public void run() {
InternalResourceManager.setResourceObserver(null);
}
});
InternalResourceManager.setResourceObserver(null);
}
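/**
* With recoveryDelay and startupRecoveryDelay both set to -1, the loss of a
* redundant copy of a bucket should never be repaired: neither the member
* with an empty data store nor the member that recreates the region should
* receive a copy of the bucket.
*/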
public void testNoRecovery() throws Exception {
Host host = Host.getHost(0);
VM vm0 = host.getVM(0);
VM vm1 = host.getVM(1);
VM vm2 = host.getVM(2);
SerializableRunnable createPrRegions = new SerializableRunnable("createRegions") {
public void run()
{
Cache cache = getCache();
AttributesFactory attr = new AttributesFactory();
PartitionAttributesFactory paf = new PartitionAttributesFactory();
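//-1 disables redundancy recovery both after a member departs and when a new member starts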
paf.setRecoveryDelay(-1);
paf.setStartupRecoveryDelay(-1);
paf.setRedundantCopies(1);
PartitionAttributes prAttr = paf.create();
attr.setPartitionAttributes(prAttr);
cache.createRegion("region1", attr.create());
}
};
//create the region in 2 VMs
vm0.invoke(createPrRegions);
vm1.invoke(createPrRegions);
//Do 1 put, which should create 1 bucket on both VMs
vm0.invoke(new SerializableRunnable("putData") {
public void run() {
Cache cache = getCache();
PartitionedRegion region1 = (PartitionedRegion) cache.getRegion("region1");
region1.put("A", "B");
}
});
//create the PR in another VM, which won't have the bucket
vm2.invoke(createPrRegions);
//destroy the region in one of the VMs that is hosting the bucket
vm1.invoke(new SerializableRunnable("Destroy region") {
public void run() {
Cache cache = getCache();
PartitionedRegion region1 = (PartitionedRegion) cache.getRegion("region1");
region1.localDestroyRegion();
}
});
//check to make sure we didn't make a copy of the low-redundancy bucket
SerializableRunnable checkNoBucket = new SerializableRunnable("Check for bucket") {
public void run() {
Cache cache = getCache();
PartitionedRegion region1 = (PartitionedRegion) cache.getRegion("region1");
assertEquals(0,region1.getDataStore().getBucketsManaged());
}
};
//Wait a bit to give the region a chance to (incorrectly) make a copy of the bucket
Thread.sleep(1000);
vm2.invoke(checkNoBucket);
//recreate the region on VM1
vm1.invoke(createPrRegions);
//Wait a bit to give the region a chance to (incorrectly) make a copy of the bucket
Thread.sleep(1000);
vm1.invoke(checkNoBucket);
vm2.invoke(checkNoBucket);
}
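/**
* With recoveryDelay set to 5000 ms, recovery of a lost redundant copy
* should not begin until at least 5 seconds after the member hosting the
* other copy of the bucket closes its cache.
*/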
public void testDelay() {
Host host = Host.getHost(0);
VM vm0 = host.getVM(0);
VM vm1 = host.getVM(1);
VM vm2 = host.getVM(2);
SerializableRunnable createPrRegions = new SerializableRunnable("createRegions") {
public void run()
{
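//install an observer so this VM can block until redundancy recovery
//has finished for the region created below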
final CountDownLatch rebalancingFinished = new CountDownLatch(1);
InternalResourceManager.setResourceObserver(new ResourceObserverAdapter(){
@Override
public void rebalancingOrRecoveryFinished(Region region) {
rebalancingFinished.countDown();
}
});
try {
Cache cache = getCache();
AttributesFactory attr = new AttributesFactory();
PartitionAttributesFactory paf = new PartitionAttributesFactory();
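//wait 5 seconds after a member departs before recovering redundancy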
paf.setRecoveryDelay(5000);
paf.setRedundantCopies(1);
PartitionAttributes prAttr = paf.create();
attr.setPartitionAttributes(prAttr);
cache.createRegion("region1", attr.create());
if(!rebalancingFinished.await(60000, TimeUnit.MILLISECONDS)) {
fail("Redundancy recovery did not happen within 60 seconds");
}
} catch (InterruptedException e) {
fail("interrupted", e);
} finally {
InternalResourceManager.setResourceObserver(null);
}
}
};
//create the region in 2 VMs
vm0.invoke(createPrRegions);
vm1.invoke(createPrRegions);
//Do 1 put, which should create 1 bucket
vm0.invoke(new SerializableRunnable("putData") {
public void run() {
Cache cache = getCache();
PartitionedRegion region1 = (PartitionedRegion) cache.getRegion("region1");
region1.put("A", "B");
}
});
//create the region in a third VM, which won't have any buckets
vm2.invoke(createPrRegions);
//close 1 cache, which should make the bucket drop below
//the expected redundancy level.
vm1.invoke(new SerializableRunnable("close cache") {
public void run() {
Cache cache = getCache();
cache.close();
}
});
long elapsed = waitForBucketRecovery(vm2, 1);
assertTrue("Did not wait at least 5 seconds to create the bucket. Elapsed=" + elapsed, elapsed >= 5000);
}
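/**
* With startupRecoveryDelay set to 5000 ms, a new member should create the
* region without waiting for redundancy recovery, receive the redundant
* copies of the buckets only after the delay has elapsed, and then end up
* hosting its share of the primary buckets.
*/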
public void testStartupDelay() {
Host host = Host.getHost(0);
VM vm0 = host.getVM(0);
VM vm1 = host.getVM(1);
VM vm2 = host.getVM(2);
SerializableRunnable createPrRegions = new SerializableRunnable("createRegions") {
public void run()
{
Cache cache = getCache();
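//install MyResourceObserver so the test can later block until recovery has finished in this VM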
InternalResourceManager.setResourceObserver(new MyResourceObserver());
AttributesFactory attr = new AttributesFactory();
PartitionAttributesFactory paf = new PartitionAttributesFactory();
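//wait 5 seconds after this member starts before recovering redundancy onto it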
paf.setStartupRecoveryDelay(5000);
paf.setRedundantCopies(1);
PartitionAttributes prAttr = paf.create();
attr.setPartitionAttributes(prAttr);
cache.createRegion("region1", attr.create());
}
};
//create the region in 2 VMs
vm0.invoke(createPrRegions);
vm1.invoke(createPrRegions);
//Do 4 puts, which should create 4 buckets
vm0.invoke(new SerializableRunnable("putData") {
public void run() {
Cache cache = getCache();
PartitionedRegion region1 = (PartitionedRegion) cache.getRegion("region1");
region1.put(Integer.valueOf(1), "B");
region1.put(Integer.valueOf(2), "B");
region1.put(Integer.valueOf(3), "B");
region1.put(Integer.valueOf(4), "B");
}
});
//close 1 cache, which should make the bucket drop below
//the expected redundancy level.
vm1.invoke(new SerializableRunnable("close cache") {
public void run() {
Cache cache = getCache();
cache.close();
}
});
final long begin = System.currentTimeMillis();
//create the region in a third VM, which won't have any buckets
vm2.invoke(createPrRegions);
long elapsed = System.currentTimeMillis() - begin;
assertTrue(
"Create region should not have waited to recover redundancy. Elapsed="
+ elapsed, elapsed < 5000);
//wait for the bucket to be copied
elapsed = waitForBucketRecovery(vm2, 4);
assertTrue("Did not wait at least 5 seconds to create the bucket. Elapsed=" + elapsed, elapsed >= 5000);
vm2.invoke(new SerializableCallable("wait for primary move") {
public Object call() throws Exception {
Cache cache = getCache();
MyResourceObserver observer = (MyResourceObserver) InternalResourceManager.getResourceObserver();
observer.waitForRecovery(30, TimeUnit.SECONDS);
PartitionedRegion region1 = (PartitionedRegion) cache.getRegion("region1");
assertEquals(2,region1.getDataStore().getNumberOfPrimaryBucketsManaged());
return null;
}
});
}
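/**
* Polls the data store in the given VM until it manages numBuckets buckets,
* failing if that does not happen within 30 seconds, and returns the elapsed
* time in milliseconds.
*/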
private long waitForBucketRecovery(VM vm2, final int numBuckets) {
final long begin = System.currentTimeMillis();
//wait for the bucket to be copied
Long elapsed = (Long) vm2.invoke(new SerializableCallable("putData") {
public Object call() {
Cache cache = getCache();
PartitionedRegion region1 = (PartitionedRegion) cache.getRegion("region1");
while(System.currentTimeMillis() - begin < 30000) {
int bucketsManaged = region1.getDataStore().getBucketsManaged();
if(bucketsManaged == numBuckets) {
break;
} else {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
assertEquals("Did not start managing the bucket within 30 seconds", numBuckets,
region1.getDataStore().getBucketsManaged());
long elapsed = System.currentTimeMillis() - begin;
return Long.valueOf(elapsed);
}
});
return elapsed.longValue();
}
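/**
* Resource observer that opens a latch when rebalancing or recovery
* finishes, so a test can block until redundancy recovery completes.
*/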
private static class MyResourceObserver extends ResourceObserverAdapter {
CountDownLatch recoveryComplete = new CountDownLatch(1);
public void waitForRecovery(long time, TimeUnit unit) throws InterruptedException {
recoveryComplete.await(time, unit);
}
@Override
public void rebalancingOrRecoveryFinished(Region region) {
recoveryComplete.countDown();
}
}
}