blob: 981a58e038a2cf8a7671d9ef97ec363b11afa0e3 [file] [log] [blame]
/*=========================================================================
* Copyright (c) 2002-2014 Pivotal Software, Inc. All Rights Reserved.
* This product is protected by U.S. and international copyright
* and intellectual property laws. Pivotal products are covered by
* more patents listed at http://www.pivotal.io/patents.
*=========================================================================
*/
package com.gemstone.gemfire.internal.cache;
import java.util.Collections;
import java.util.Set;
import java.util.TreeSet;
import com.gemstone.gemfire.cache.AttributesFactory;
import com.gemstone.gemfire.cache.Cache;
import com.gemstone.gemfire.cache.CacheClosedException;
import com.gemstone.gemfire.cache.DataPolicy;
import com.gemstone.gemfire.cache.DiskStore;
import com.gemstone.gemfire.cache.PartitionAttributesFactory;
import com.gemstone.gemfire.cache.Region;
import com.gemstone.gemfire.cache30.CacheTestCase;
import com.gemstone.gemfire.distributed.DistributedSystemDisconnectedException;
import com.gemstone.gemfire.distributed.internal.DistributionManager;
import com.gemstone.gemfire.distributed.internal.DistributionMessage;
import com.gemstone.gemfire.distributed.internal.DistributionMessageObserver;
import com.gemstone.gemfire.internal.cache.partitioned.ManageBucketMessage;
import com.gemstone.gemfire.internal.cache.partitioned.ManageBucketMessage.ManageBucketReplyMessage;
import dunit.AsyncInvocation;
import dunit.DistributedTestCase;
import dunit.Host;
import dunit.RMIException;
import dunit.SerializableCallable;
import dunit.SerializableRunnable;
import dunit.VM;
/**
* @author dsmith
*
* Test to make sure that we can handle
* a crash of the member directing bucket creation.
*/
public class Bug41733DUnitTest extends CacheTestCase {

  /** Name of the partitioned region that every helper in this test operates on. */
  private static final String REGION_NAME = "region";

  public Bug41733DUnitTest(String name) {
    super(name);
  }

  /**
   * Removes the message observers installed by the tests, disconnects all
   * members and then runs the inherited tear-down.
   *
   * The observers are installed through the static
   * {@link DistributionMessageObserver#setInstance} and nothing else in this
   * class removes them, so they are cleared here explicitly to keep the
   * "disconnect vm0" hook from remaining installed in the VMs after the test.
   */
  @Override
  public void tearDown2() throws Exception {
    try {
      try {
        Host host = Host.getHost(0);
        // VM 0 and VM 1 are the only VMs in which the tests install an observer.
        clearObserver(host.getVM(0));
        clearObserver(host.getVM(1));
      } finally {
        disconnectAllFromDS();
      }
    } finally {
      // Always give the superclass a chance to clean up, even if the steps above fail.
      super.tearDown2();
    }
  }

  /**
   * Test that we can handle a member departing after creating
   * a bucket on the remote node but before we choose a primary.
   */
  public void testCrashAfterBucketCreation() throws Throwable {
    Host host = Host.getHost(0);
    VM vm0 = host.getVM(0);
    VM vm1 = host.getVM(1);

    vm0.invoke(new SerializableRunnable("Install observer") {
      public void run() {
        DistributionMessageObserver.setInstance(new DistributionMessageObserver() {
          @Override
          public void beforeProcessMessage(DistributionManager dm,
              DistributionMessage message) {
            // Disconnect this member (vm0) when a ManageBucketReplyMessage is
            // about to be processed here.
            if (message instanceof ManageBucketReplyMessage) {
              DistributedTestCase.disconnectFromDS();
            }
          }
        });
      }
    });

    verifyBucketCreationSurvivesCrash(vm0, vm1);
  }

  /**
   * Test that we can handle a member departing while we are
   * in the process of creating the bucket on the remote node.
   */
  public void testCrashDuringBucketCreation() throws Throwable {
    Host host = Host.getHost(0);
    final VM vm0 = host.getVM(0);
    VM vm1 = host.getVM(1);

    vm1.invoke(new SerializableRunnable("Install observer") {
      public void run() {
        DistributionMessageObserver.setInstance(new DistributionMessageObserver() {
          @Override
          public void beforeProcessMessage(DistributionManager dm,
              DistributionMessage message) {
            // Disconnect vm0 from here (vm1) when a ManageBucketMessage is
            // about to be processed in this VM.
            if (message instanceof ManageBucketMessage) {
              vm0.invoke(DistributedTestCase.class, "disconnectFromDS");
            }
          }
        });
      }
    });

    verifyBucketCreationSurvivesCrash(vm0, vm1);
  }

  /**
   * Runs the scenario shared by both tests. The caller must already have
   * installed the observer that disconnects {@code vm0}.
   *
   * @param vm0 the member that directs the bucket creation and is expected to
   *        be disconnected while doing so
   * @param vm1 the member that is expected to end up hosting bucket 3
   */
  private void verifyBucketCreationSurvivesCrash(VM vm0, VM vm1) {
    createPR(vm0, 0);

    // Create a couple of buckets in VM0. This will make sure
    // the next bucket we create will be created in VM 1.
    putData(vm0, 0, 2, "a");

    createPR(vm1, 0);

    // Trigger a bucket creation in VM1, which should cause vm0 to close its cache.
    try {
      putData(vm0, 3, 4, "a");
      fail("should have received a cache closed exception");
    } catch (RMIException e) {
      // Only a disconnect of vm0 is expected; anything else is a real failure.
      if (!(e.getCause() instanceof DistributedSystemDisconnectedException)) {
        throw e;
      }
    }

    assertEquals(Collections.singleton(3), getBucketList(vm1));

    // This shouldn't hang, because the bucket creation should finish.
    putData(vm1, 3, 4, "a");
  }

  /**
   * Resets the {@link DistributionMessageObserver} in the given VM to
   * {@code null}, undoing the "Install observer" step of the tests.
   *
   * @param vm the VM whose observer should be removed
   */
  private void clearObserver(VM vm) {
    vm.invoke(new SerializableRunnable("Remove observer") {
      public void run() {
        DistributionMessageObserver.setInstance(null);
      }
    });
  }

  /**
   * Creates the partitioned region {@value #REGION_NAME} in the given VM.
   *
   * @param vm the VM in which to create the region
   * @param redundancy the number of redundant copies to configure
   */
  private void createPR(VM vm, final int redundancy) {
    vm.invoke(new SerializableRunnable("Create PR") {
      public void run() {
        Cache cache = getCache();
        AttributesFactory af = new AttributesFactory();
        PartitionAttributesFactory paf = new PartitionAttributesFactory();
        paf.setRedundantCopies(redundancy);
        af.setPartitionAttributes(paf.create());
        af.setDataPolicy(DataPolicy.PARTITION);
        cache.createRegion(REGION_NAME, af.create());
      }
    });
  }

  /**
   * Puts {@code value} under every integer key in
   * {@code [startKey, endKey)} into the region, executing in the given VM.
   *
   * @param vm the VM that performs the puts
   * @param startKey first key to put (inclusive)
   * @param endKey key at which to stop (exclusive)
   * @param value the value stored under each key
   */
  protected void putData(VM vm, final int startKey, final int endKey,
      final String value) {
    SerializableRunnable createData = new SerializableRunnable() {
      public void run() {
        Cache cache = getCache();
        Region region = cache.getRegion(REGION_NAME);
        for (int i = startKey; i < endKey; i++) {
          region.put(i, value);
        }
      }
    };
    vm.invoke(createData);
  }

  /**
   * Returns the ids of the buckets of region {@value #REGION_NAME} that are
   * hosted locally in the given VM.
   *
   * @param vm0 the VM to query
   * @return the local bucket ids, sorted
   */
  protected Set<Integer> getBucketList(VM vm0) {
    return getBucketList(vm0, REGION_NAME);
  }

  /**
   * Returns the ids of the buckets of the named partitioned region that are
   * hosted locally in the given VM.
   *
   * @param vm0 the VM to query
   * @param regionName name of the partitioned region to inspect
   * @return the local bucket ids, sorted
   */
  protected Set<Integer> getBucketList(VM vm0, final String regionName) {
    SerializableCallable getBuckets = new SerializableCallable("get buckets") {
      public Object call() throws Exception {
        Cache cache = getCache();
        PartitionedRegion region = (PartitionedRegion) cache.getRegion(regionName);
        return new TreeSet<Integer>(region.getDataStore().getAllLocalBucketIds());
      }
    };
    // Safe: the callable above always returns a TreeSet<Integer>.
    @SuppressWarnings("unchecked")
    Set<Integer> bucketIds = (Set<Integer>) vm0.invoke(getBuckets);
    return bucketIds;
  }
}