/*=========================================================================
* Copyright (c) 2002-2014 Pivotal Software, Inc. All Rights Reserved.
* This product is protected by U.S. and international copyright
* and intellectual property laws. Pivotal products are covered by
* one or more patents listed at http://www.pivotal.io/patents.
*=========================================================================
*/
package com.gemstone.gemfire.cache.partition;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import com.gemstone.gemfire.InternalGemFireError;
import com.gemstone.gemfire.cache.AttributesFactory;
import com.gemstone.gemfire.cache.Cache;
import com.gemstone.gemfire.cache.PartitionAttributes;
import com.gemstone.gemfire.cache.PartitionAttributesFactory;
import com.gemstone.gemfire.cache30.CacheTestCase;
import com.gemstone.gemfire.internal.cache.ForceReattemptException;
import com.gemstone.gemfire.internal.cache.PartitionedRegion;
import dunit.AsyncInvocation;
import dunit.Host;
import dunit.RMIException;
import dunit.SerializableRunnable;
import dunit.VM;
/**
* Test of the PartitionManager.createPrimaryBucket method.
*
* @author dsmith
*/
public class PartitionManagerDUnitTest extends CacheTestCase {
private static final int BUCKET_ID = 5;
protected static final long CONCURRENT_TIME = 10; // seconds
protected static final long MAX_WAIT = 60 * 1000; // milliseconds
public PartitionManagerDUnitTest(String name) {
super(name);
}
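/**
* Check that createPrimaryBucket with destroyRemote=true and
* destroyLocal=false moves the primary between members, leaves the data
* alone when the bucket is already local, and drops the data when the
* bucket is taken from a remote member.
*/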
public void testDestroyExistingRemote() {
Host host = Host.getHost(0);
VM vm0 = host.getVM(0);
VM vm1 = host.getVM(1);
SerializableRunnable createPrRegion = new SerializableRunnable("createRegion") {
public void run()
{
Cache cache = getCache();
AttributesFactory attr = new AttributesFactory();
PartitionAttributesFactory paf = new PartitionAttributesFactory();
paf.setRedundantCopies(0);
paf.setRecoveryDelay(-1);
paf.setStartupRecoveryDelay(-1);
PartitionAttributes prAttr = paf.create();
attr.setPartitionAttributes(prAttr);
cache.createRegion("region1", attr.create());
}
};
//create the PR
vm0.invoke(createPrRegion);
vm1.invoke(createPrRegion);
//Move the bucket around a couple of times. Make sure it moves
createPrimaryBucket(vm0, true, false);
assertNotPrimary(vm1);
createPrimaryBucket(vm1, true, false);
assertNotPrimary(vm0);
//Put something in the bucket
vm1.invoke(new SerializableRunnable() {
public void run() {
Cache cache = getCache();
PartitionedRegion region = (PartitionedRegion) cache.getRegion("region1");
region.put(BUCKET_ID, "B");
}
});
//Make sure we don't mess with the bucket if it's already local
createPrimaryBucket(vm1, true, false);
vm1.invoke(new SerializableRunnable() {
public void run() {
Cache cache = getCache();
PartitionedRegion region = (PartitionedRegion) cache.getRegion("region1");
assertEquals("B", region.get(BUCKET_ID));
}
});
//Make sure we drop the data when we move the bucket
createPrimaryBucket(vm0, true, false);
vm1.invoke(new SerializableRunnable() {
public void run() {
Cache cache = getCache();
PartitionedRegion region = (PartitionedRegion) cache.getRegion("region1");
assertEquals(null, region.get(BUCKET_ID));
}
});
}
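/**
* Check that createPrimaryBucket with destroyRemote=true and
* destroyLocal=true moves the primary and drops the bucket data even when
* the bucket is already hosted locally.
*/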
public void testDestroyExistingBoth() {
Host host = Host.getHost(0);
VM vm0 = host.getVM(0);
VM vm1 = host.getVM(1);
SerializableRunnable createPrRegion = new SerializableRunnable("createRegion") {
public void run()
{
Cache cache = getCache();
AttributesFactory attr = new AttributesFactory();
PartitionAttributesFactory paf = new PartitionAttributesFactory();
paf.setRedundantCopies(0);
paf.setRecoveryDelay(-1);
paf.setStartupRecoveryDelay(-1);
PartitionAttributes prAttr = paf.create();
attr.setPartitionAttributes(prAttr);
cache.createRegion("region1", attr.create());
}
};
//create the PR
vm0.invoke(createPrRegion);
vm1.invoke(createPrRegion);
//Move the bucket around a couple of times. Make sure it moves
createPrimaryBucket(vm0, true, true);
createPrimaryBucket(vm1, true, true);
//Put something in the bucket
vm1.invoke(new SerializableRunnable() {
public void run() {
Cache cache = getCache();
PartitionedRegion region = (PartitionedRegion) cache.getRegion("region1");
region.put(BUCKET_ID, "B");
}
});
//Make sure we do drop the data even if the bucket is already local
createPrimaryBucket(vm1, true, true);
vm1.invoke(new SerializableRunnable() {
public void run() {
Cache cache = getCache();
PartitionedRegion region = (PartitionedRegion) cache.getRegion("region1");
assertEquals(null, region.get(BUCKET_ID));
}
});
}
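/**
* Check that createPrimaryBucket with destroyRemote=false fails with an
* IllegalStateException when the bucket already exists on another member,
* and that destroyLocal=true drops the data of a bucket that is already
* local.
*/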
public void testDestroyExistingLocal() {
Host host = Host.getHost(0);
VM vm0 = host.getVM(0);
VM vm1 = host.getVM(1);
SerializableRunnable createPrRegion = new SerializableRunnable("createRegion") {
public void run()
{
Cache cache = getCache();
AttributesFactory attr = new AttributesFactory();
PartitionAttributesFactory paf = new PartitionAttributesFactory();
paf.setRedundantCopies(0);
paf.setRecoveryDelay(-1);
paf.setStartupRecoveryDelay(-1);
PartitionAttributes prAttr = paf.create();
attr.setPartitionAttributes(prAttr);
cache.createRegion("region1", attr.create());
}
};
//create the PR
vm0.invoke(createPrRegion);
vm1.invoke(createPrRegion);
createPrimaryBucket(vm0, false, true);
//This should throw an exception, because the bucket exists on vm0
//and we're not allowed to destroy the remote copy
try {
createPrimaryBucket(vm1, false, false);
fail("expected an IllegalStateException");
} catch(RMIException e) {
if(!(e.getCause() instanceof IllegalStateException)) {
throw e;
}
}
//Put something in the bucket
vm0.invoke(new SerializableRunnable() {
public void run() {
Cache cache = getCache();
PartitionedRegion region = (PartitionedRegion) cache.getRegion("region1");
region.put(BUCKET_ID, "B");
}
});
//Make sure we do drop the data even if the bucket is already local
createPrimaryBucket(vm0, false, true);
vm1.invoke(new SerializableRunnable() {
public void run() {
Cache cache = getCache();
PartitionedRegion region = (PartitionedRegion) cache.getRegion("region1");
assertEquals(null, region.get(BUCKET_ID));
}
});
}
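/**
* Check that createPrimaryBucket with destroyRemote=false and
* destroyLocal=false fails with an IllegalStateException whether the
* existing bucket is remote or local.
*/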
public void testDestroyExistingNeither() {
Host host = Host.getHost(0);
VM vm0 = host.getVM(0);
VM vm1 = host.getVM(1);
SerializableRunnable createPrRegion = new SerializableRunnable("createRegion") {
public void run()
{
Cache cache = getCache();
AttributesFactory attr = new AttributesFactory();
PartitionAttributesFactory paf = new PartitionAttributesFactory();
paf.setRedundantCopies(0);
paf.setRecoveryDelay(-1);
paf.setStartupRecoveryDelay(-1);
PartitionAttributes prAttr = paf.create();
attr.setPartitionAttributes(prAttr);
cache.createRegion("region1", attr.create());
}
};
//create the PR
vm0.invoke(createPrRegion);
vm1.invoke(createPrRegion);
//Create the bucket on vm0
createPrimaryBucket(vm0, false, false);
//This should throw an exception, because the bucket exists remotely
try {
createPrimaryBucket(vm1, false, false);
fail("expected an IllegalStateException");
} catch(RMIException e) {
if(!(e.getCause() instanceof IllegalStateException)) {
throw e;
}
}
//So should this, because the bucket already exists locally
try {
createPrimaryBucket(vm0, false, false);
fail("expected an IllegalStateException");
} catch(RMIException e) {
if(!(e.getCause() instanceof IllegalStateException)) {
throw e;
}
}
//Put something in the bucket
vm0.invoke(new SerializableRunnable() {
public void run() {
Cache cache = getCache();
PartitionedRegion region = (PartitionedRegion) cache.getRegion("region1");
region.put(BUCKET_ID, "B");
}
});
//Make sure we do drop the data even if the bucket is already local
createPrimaryBucket(vm0, false, true);
vm1.invoke(new SerializableRunnable() {
public void run() {
Cache cache = getCache();
PartitionedRegion region = (PartitionedRegion) cache.getRegion("region1");
assertEquals(null, region.get(BUCKET_ID));
}
});
}
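/**
* Invoke PartitionManager.createPrimaryBucket for BUCKET_ID on the given
* VM, wait for the bucket to have a primary, and assert that this member
* is now the primary and the only bucket owner.
*/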
private void createPrimaryBucket(VM vm0, final boolean destroyRemote, final boolean destroyLocal) {
vm0.invoke(new SerializableRunnable("Move PR") {
public void run() {
Cache cache = getCache();
final PartitionedRegion region = (PartitionedRegion) cache.getRegion("region1");
PartitionManager.createPrimaryBucket(region, BUCKET_ID, destroyRemote, destroyLocal);
//There is a race here that someone else on this same node could
//trigger the bucket creation. If that happens, we won't create the bucket,
//but getPrimary may not wait for the bucket either.
//if we were doing a put, it would reattempt the put until the bucket was
//created. In this case, we just want to wait for the bucket to have a primary
waitForCriterion(new WaitCriterion() {
public boolean done() {
return region.getBucketPrimary(BUCKET_ID) != null;
}
public String description() {
return "waiting for bucket " + BUCKET_ID + " to have a primary";
}
}, 10000, 100, false);
//Make sure we really are the primary now
assertEquals(cache.getDistributedSystem().getDistributedMember(), region.getBucketPrimary(BUCKET_ID));
//Make sure there is only 1 bucket owner
Set owners = region.getDataStore().getLocalBucketById(BUCKET_ID).getBucketOwners();
assertEquals(1, owners.size());
}
});
}
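/**
* Assert that the given VM is not the primary for BUCKET_ID.
*/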
private void assertNotPrimary(VM vm0) {
vm0.invoke(new SerializableRunnable("Check not primary") {
public void run() {
Cache cache = getCache();
PartitionedRegion region = (PartitionedRegion) cache.getRegion("region1");
//Make sure we're not the primary
assertFalse(region.getBucketPrimary(BUCKET_ID).equals(cache.getDistributedSystem().getDistributedMember()));
}
});
}
// public void testLoop() throws Throwable {
// for(int i = 0; i < 50; i++) {
// getLogWriter().info("test iteration #" + i);
// testConcurrent();
// tearDown();
// setUp();
// }
// }
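/**
* Check that concurrent createPrimaryBucket calls and puts from two
* members leave the bucket with exactly one owner.
*/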
public void testConcurrent() throws Throwable {
Host host = Host.getHost(0);
VM vm0 = host.getVM(0);
VM vm1 = host.getVM(1);
SerializableRunnable createPrRegion = new SerializableRunnable("createRegion") {
public void run()
{
Cache cache = getCache();
AttributesFactory attr = new AttributesFactory();
PartitionAttributesFactory paf = new PartitionAttributesFactory();
paf.setRedundantCopies(0);
paf.setRecoveryDelay(-1);
paf.setStartupRecoveryDelay(-1);
PartitionAttributes prAttr = paf.create();
attr.setPartitionAttributes(prAttr);
cache.createRegion("region1", attr.create());
}
};
vm0.invoke(createPrRegion);
vm1.invoke(createPrRegion);
SerializableRunnable hungryHungryPrimary = new SerializableRunnable("Try to grab the primary a lot") {
public void run()
{
Cache cache = getCache();
PartitionedRegion region = (PartitionedRegion) cache.getRegion("region1");
long start = System.nanoTime();
while(TimeUnit.NANOSECONDS.toSeconds(System.nanoTime() - start) < CONCURRENT_TIME) {
PartitionManager.createPrimaryBucket(region, BUCKET_ID, true, false);
}
}
};
SerializableRunnable lotsOfPuts = new SerializableRunnable("A bunch of puts") {
public void run()
{
Cache cache = getCache();
PartitionedRegion region = (PartitionedRegion) cache.getRegion("region1");
long start = System.nanoTime();
while(TimeUnit.NANOSECONDS.toSeconds(System.nanoTime() - start) < CONCURRENT_TIME) {
//Do a put which might trigger bucket creation
try {
region.put(BUCKET_ID, "B");
} catch (InternalGemFireError e) {
if (e.getMessage().contains("recordVersion invoked for a local version tag that is higher")) {
// bug #50566 encountered. This can happen in this test that aggressively moves
// buckets around w/o persistence, causing a node to create, destroy and recreate
// the same bucket while the operation is in progress. Not something likely
// to happen to a customer.
}
}
try {
Thread.sleep(10);
} catch (InterruptedException e) {
fail("", e);
}
}
}
};
AsyncInvocation async0 = vm0.invokeAsync(hungryHungryPrimary);
AsyncInvocation async1 = vm1.invokeAsync(hungryHungryPrimary);
AsyncInvocation async0_2 = vm0.invokeAsync(lotsOfPuts);
AsyncInvocation async1_2 = vm1.invokeAsync(lotsOfPuts);
async0.getResult(MAX_WAIT);
async1.getResult(MAX_WAIT);
async0_2.getResult(MAX_WAIT);
async1_2.getResult(MAX_WAIT);
vm0.invoke(new SerializableRunnable("Check the number of owners") {
public void run()
{
Cache cache = getCache();
PartitionedRegion region = (PartitionedRegion) cache.getRegion("region1");
List owners;
try {
owners = region.getBucketOwnersForValidation(BUCKET_ID);
assertEquals(1, owners.size());
} catch (ForceReattemptException e) {
fail("shouldn't have seen force reattempt", e);
}
}
});
}
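/**
* Check that moving the primary back and forth while both members are
* doing puts leaves the bucket with exactly one owner.
*/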
public void testConcurrentWithPuts() throws Throwable {
Host host = Host.getHost(0);
VM vm0 = host.getVM(0);
VM vm1 = host.getVM(1);
SerializableRunnable createPrRegion = new SerializableRunnable("createRegion") {
public void run()
{
Cache cache = getCache();
AttributesFactory attr = new AttributesFactory();
PartitionAttributesFactory paf = new PartitionAttributesFactory();
paf.setRedundantCopies(0);
paf.setRecoveryDelay(-1);
paf.setStartupRecoveryDelay(-1);
PartitionAttributes prAttr = paf.create();
attr.setPartitionAttributes(prAttr);
cache.createRegion("region1", attr.create());
}
};
vm0.invoke(createPrRegion);
vm1.invoke(createPrRegion);
SerializableRunnable lotsOfPuts = new SerializableRunnable("A bunch of puts") {
public void run()
{
Cache cache = getCache();
PartitionedRegion region = (PartitionedRegion) cache.getRegion("region1");
long start = System.nanoTime();
while(TimeUnit.NANOSECONDS.toSeconds(System.nanoTime() - start) < CONCURRENT_TIME) {
//Do a put which might trigger bucket creation
region.put(BUCKET_ID, "B");
try {
Thread.sleep(10);
} catch (InterruptedException e) {
fail("", e);
}
}
}
};
AsyncInvocation async0_2 = vm0.invokeAsync(lotsOfPuts);
AsyncInvocation async1_2 = vm1.invokeAsync(lotsOfPuts);
long start = System.nanoTime();
while(TimeUnit.NANOSECONDS.toSeconds(System.nanoTime() - start) < CONCURRENT_TIME) {
createPrimaryBucket(vm0, true, false);
createPrimaryBucket(vm1, true, false);
}
async0_2.getResult(MAX_WAIT);
async1_2.getResult(MAX_WAIT);
vm0.invoke(new SerializableRunnable("Check the number of owners") {
public void run()
{
Cache cache = getCache();
PartitionedRegion region = (PartitionedRegion) cache.getRegion("region1");
List owners;
try {
owners = region.getBucketOwnersForValidation(BUCKET_ID);
assertEquals(1, owners.size());
} catch (ForceReattemptException e) {
fail("shouldn't have seen force reattempt", e);
}
}
});
}
}