/*=========================================================================
* Copyright (c) 2002-2014 Pivotal Software, Inc. All Rights Reserved.
* This product is protected by U.S. and international copyright
* and intellectual property laws. Pivotal products are covered by
* more patents listed at http://www.pivotal.io/patents.
*=========================================================================
*/
package com.gemstone.gemfire.internal.cache.partitioned;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.File;
import java.io.IOException;
import java.io.Serializable;
import java.util.Collections;
import java.util.ConcurrentModificationException;
import java.util.HashSet;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import org.junit.Ignore;
import com.gemstone.gemfire.DataSerializable;
import com.gemstone.gemfire.cache.AttributesFactory;
import com.gemstone.gemfire.cache.Cache;
import com.gemstone.gemfire.cache.CacheClosedException;
import com.gemstone.gemfire.cache.CustomExpiry;
import com.gemstone.gemfire.cache.DataPolicy;
import com.gemstone.gemfire.cache.DiskAccessException;
import com.gemstone.gemfire.cache.DiskStore;
import com.gemstone.gemfire.cache.EvictionAction;
import com.gemstone.gemfire.cache.EvictionAttributes;
import com.gemstone.gemfire.cache.ExpirationAction;
import com.gemstone.gemfire.cache.ExpirationAttributes;
import com.gemstone.gemfire.cache.PartitionAttributesFactory;
import com.gemstone.gemfire.cache.PartitionedRegionStorageException;
import com.gemstone.gemfire.cache.Region;
import com.gemstone.gemfire.cache.Region.Entry;
import com.gemstone.gemfire.cache.RegionAttributes;
import com.gemstone.gemfire.cache.RegionFactory;
import com.gemstone.gemfire.cache.RegionShortcut;
import com.gemstone.gemfire.cache.Scope;
import com.gemstone.gemfire.cache.client.PoolFactory;
import com.gemstone.gemfire.cache.client.PoolManager;
import com.gemstone.gemfire.cache.client.ServerOperationException;
import com.gemstone.gemfire.cache.execute.Function;
import com.gemstone.gemfire.cache.execute.FunctionContext;
import com.gemstone.gemfire.cache.execute.FunctionService;
import com.gemstone.gemfire.cache.persistence.ConflictingPersistentDataException;
import com.gemstone.gemfire.cache.persistence.PartitionOfflineException;
import com.gemstone.gemfire.cache.persistence.RevokeFailedException;
import com.gemstone.gemfire.cache.persistence.RevokedPersistentDataException;
import com.gemstone.gemfire.cache.query.QueryException;
import com.gemstone.gemfire.cache.server.CacheServer;
import com.gemstone.gemfire.distributed.internal.DistributionManager;
import com.gemstone.gemfire.distributed.internal.DistributionMessage;
import com.gemstone.gemfire.distributed.internal.DistributionMessageObserver;
import com.gemstone.gemfire.distributed.internal.ReplyException;
import com.gemstone.gemfire.internal.AvailablePort;
import com.gemstone.gemfire.internal.cache.GemFireCacheImpl;
import com.gemstone.gemfire.internal.cache.InitialImageOperation.RequestImageMessage;
import com.gemstone.gemfire.internal.cache.control.InternalResourceManager;
import com.gemstone.gemfire.internal.cache.partitioned.ManageBucketMessage.ManageBucketReplyMessage;
import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
import dunit.AsyncInvocation;
import dunit.DistributedTestCase;
import dunit.Host;
import dunit.RMIException;
import dunit.SerializableCallable;
import dunit.SerializableRunnable;
import dunit.VM;
/**
* Tests the basic use cases for PR persistence.
* @author dsmith
*
*/
public class PersistentPartitionedRegionDUnitTest extends PersistentPartitionedRegionTestBase {
private static final int NUM_BUCKETS = 15;
//This must be bigger than the dunit ack-wait-threshold for the revoke
//tests. The command line is setting the ack-wait-threshold to be
//60 seconds.
private static final int MAX_WAIT = 65 * 1000;
public PersistentPartitionedRegionDUnitTest(String name) {
super(name);
}
/**
* A simple test case verifying that we are actually
* persisting data with a PR.
*/
public void testSinglePR() {
Host host = Host.getHost(0);
VM vm0 = host.getVM(0);
VM vm1 = host.getVM(1);
createPR(vm0, 0);
createData(vm0, 0, 1, "a");
Set<Integer> vm0Buckets = getBucketList(vm0);
// closePR(vm0);
closeCache(vm0);
createPR(vm0, 0);
assertEquals(vm0Buckets,getBucketList(vm0));
checkData(vm0, 0, 1, "a");
localDestroyPR(vm0);
closeCache(vm0);
createPR(vm0, 0);
//Make sure the data is now missing
checkData(vm0, 0, 1, null);
}
/**
* Test that changing total-num-buckets after recovery causes an exception,
* whether the new value is smaller or larger than the configured value.
*/
public void testChangedTotalBucketNumberSinglePR() {
Host host = Host.getHost(0);
VM vm0 = host.getVM(0);
createPR(vm0, 0, 0, 5);
createData(vm0, 0, 5, "a");
closeCache(vm0);
ExpectedException expect = addExpectedException("IllegalStateException", vm0);
expect = addExpectedException("DiskAccessException", vm0);
try {
createPR(vm0, 0, 0, 2);
fail("Expect to get java.lang.IllegalStateException, but it did not");
} catch (RMIException exp) {
assertTrue(exp.getCause() instanceof IllegalStateException);
IllegalStateException ise = (IllegalStateException)exp.getCause();
Object[] prms = new Object[] { "/"+PR_REGION_NAME, 2, 5 };
assertTrue(ise.getMessage().contains(LocalizedStrings.PartitionedRegion_FOR_REGION_0_TotalBucketNum_1_SHOULD_NOT_BE_CHANGED_Previous_Configured_2.toString(prms)));
}
closeCache(vm0);
try {
createPR(vm0, 0, 0, 10);
fail("Expect to get java.lang.IllegalStateException, but it did not");
} catch (RMIException exp) {
assertTrue(exp.getCause() instanceof IllegalStateException);
IllegalStateException ise = (IllegalStateException)exp.getCause();
Object[] prms = new Object[] { "/"+PR_REGION_NAME, 10, 5 };
assertTrue(ise.getMessage().contains(LocalizedStrings.PartitionedRegion_FOR_REGION_0_TotalBucketNum_1_SHOULD_NOT_BE_CHANGED_Previous_Configured_2.toString(prms)));
}
expect1.remove();
expect2.remove();
}
/**
* Test for bug 44184
*/
public void testSinglePRWithCustomExpiry() {
Host host = Host.getHost(0);
VM vm0 = host.getVM(1);
SerializableRunnable createPR = new SerializableRunnable() {
public void run() {
Cache cache = getCache();
DiskStore ds = cache.findDiskStore("disk");
if(ds == null) {
ds = cache.createDiskStoreFactory()
.setDiskDirs(getDiskDirs()).create("disk");
}
AttributesFactory af = new AttributesFactory();
PartitionAttributesFactory paf = new PartitionAttributesFactory();
af.setPartitionAttributes(paf.create());
af.setCustomEntryIdleTimeout(new TestCustomExpiration());
af.setEntryIdleTimeout(new ExpirationAttributes(60, ExpirationAction.INVALIDATE));
af.setDataPolicy(DataPolicy.PERSISTENT_PARTITION);
af.setDiskStoreName("disk");
RegionAttributes attr = af.create();
cache.createRegion(PR_REGION_NAME, attr);
}
};
vm0.invoke(createPR);
createData(vm0, 0, 1, "a");
Set<Integer> vm0Buckets = getBucketList(vm0);
// closePR(vm0);
closeCache(vm0);
vm0.invoke(createPR);
assertEquals(vm0Buckets,getBucketList(vm0));
checkData(vm0, 0, 1, "a");
}
/**
* Test to make sure that we can recover
* from a complete system shutdown with redundancy
* 0
* @throws Throwable
*/
public void testTotalRecoverRedundancy0() throws Throwable {
totalRecoverTest(0);
}
/**
* Test to make sure that we can recover
* from a complete system shutdown with redundancy
* 1
* @throws Throwable
*/
public void testTotalRecoverRedundancy1() throws Throwable {
totalRecoverTest(1);
}
private static boolean FAIL_IN_THIS_VM = false;
/**
* Test for bug #49972 - handle a serialization error in the
* async writer thread.
*/
@Ignore("Bug 50376")
public void DISABLED_testBadSerializationInAsyncThread() throws Throwable {
Host host = Host.getHost(0);
VM vm0 = host.getVM(0);
VM vm1 = host.getVM(1);
VM vm2 = host.getVM(2);
final int numBuckets = 50;
vm0.invoke(new SerializableRunnable() {
@Override
public void run() {
FAIL_IN_THIS_VM=true;
}
});
ExpectedException expected1 = addExpectedException("Fatal error from asynch");
ExpectedException expected2 = addExpectedException("ToDataException");
try {
int redundancy=1;
createPR(vm0, redundancy, -1, 113, false);
createPR(vm2, redundancy, -1, 113, false);
//Trigger bucket creation
createData(vm0, 0, numBuckets, "a");
createPR(vm1, redundancy, -1, 113, false);
//write objects which will fail serialization in async writer thread.
vm0.invoke(new SerializableRunnable() {
public void run() {
Cache cache = getCache();
Region region = cache.getRegion(PR_REGION_NAME);
try {
for(int i=0;i<numBuckets; i++) {
region.put(i, new BadSerializer());
//this will trigger a deserialization (could have also done this put with a function, I guess)
region.get(i);
}
} catch (DiskAccessException ex) {
if (ex.getMessage().contains("the flusher thread had been terminated")) {
// expected
} else {
throw ex;
}
}
}
});
//Wait for the thread to get hosed.
Thread.sleep(2000);
createData(vm1, 0, numBuckets, "b");
//Try to do puts from vm1, which doesn't have any buckets
createData(vm1, numBuckets, numBuckets * 2, "b");
createData(vm1, numBuckets, numBuckets * 2, "c");
//make sure everything has settled out (these VMs, I suppose, may be terminated)
checkData(vm2, 0, numBuckets, "b");
checkData(vm2, numBuckets, numBuckets * 2, "c");
}finally {
expected1.remove();
expected2.remove();
}
}
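/**
* A DataSerializable whose toData fails with a ConcurrentModificationException
* when invoked from the asynchronous disk writer thread in the VM that set
* FAIL_IN_THIS_VM, simulating a serialization error in the async writer.
*/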
public static class BadSerializer implements DataSerializable {
public BadSerializer() {
}
public void toData(DataOutput out) throws IOException {
if(Thread.currentThread().getName().contains("Asynchronous disk writer") && FAIL_IN_THIS_VM) {
throw new ConcurrentModificationException();
}
}
public void fromData(DataInput in) throws IOException,
ClassNotFoundException {
}
}
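/**
* Creates a persistent PR on three VMs, shuts them all down, and restarts them
* concurrently, verifying that the bucket layout and data are recovered. Also
* destroys and recreates the PR to make sure a destroy cleans up the
* persistent data.
*/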
public void totalRecoverTest(int redundancy) throws Throwable {
Host host = Host.getHost(0);
VM vm0 = host.getVM(0);
VM vm1 = host.getVM(1);
VM vm2 = host.getVM(2);
int numBuckets = 50;
createPR(vm0, redundancy);
createPR(vm1, redundancy);
createPR(vm2, redundancy);
createData(vm0, 0, numBuckets, "a");
Set<Integer> vm0Buckets = getBucketList(vm0);
Set<Integer> vm1Buckets = getBucketList(vm1);
Set<Integer> vm2Buckets = getBucketList(vm2);
closeCache(vm0);
closeCache(vm1);
closeCache(vm2);
AsyncInvocation a1 = createPRAsync(vm0, redundancy);
AsyncInvocation a2 = createPRAsync(vm1, redundancy);
AsyncInvocation a3 = createPRAsync(vm2, redundancy);
a1.getResult(MAX_WAIT);
a2.getResult(MAX_WAIT);
a3.getResult(MAX_WAIT);
assertEquals(vm0Buckets,getBucketList(vm0));
assertEquals(vm1Buckets,getBucketList(vm1));
assertEquals(vm2Buckets,getBucketList(vm2));
checkData(vm0, 0, numBuckets, "a");
createData(vm0, numBuckets, 113, "b");
checkData(vm0, numBuckets, 113, "b");
//Test for bug 43476 - make sure a destroy
//cleans up proxy bucket regions.
destroyPR(vm0);
destroyPR(vm1);
destroyPR(vm2);
a1 = createPRAsync(vm0, redundancy);
a2 = createPRAsync(vm1, redundancy);
a3 = createPRAsync(vm2, redundancy);
a1.getResult(MAX_WAIT);
a2.getResult(MAX_WAIT);
a3.getResult(MAX_WAIT);
checkData(vm0, 0, numBuckets, null);
}
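/**
* Test that a missing member can be revoked while the surviving member is
* waiting for it to recover, that recovery then proceeds, and that the revoked
* member cannot rejoin with its old persistent data.
*/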
public void testRevokeAfterStartup() throws Throwable {
Host host = Host.getHost(0);
VM vm0 = host.getVM(0);
VM vm1 = host.getVM(1);
VM vm2 = host.getVM(2);
int numBuckets = 50;
createPR(vm0, 1);
createPR(vm1, 1);
createData(vm0, 0, numBuckets, "a");
Set<Integer> vm0Buckets = getBucketList(vm0);
Set<Integer> vm1Buckets = getBucketList(vm1);
assertEquals(vm0Buckets, vm1Buckets);
closeCache(vm0);
createData(vm1, 0, numBuckets, "b");
closeCache(vm1);
AsyncInvocation a1 = createPRAsync(vm0, 1);
//[dsmith] Make sure that vm0 is waiting for vm1 to recover
//If VM(0) recovers early, that is a problem, because vm1
//has newer data
Thread.sleep(500);
assertTrue(a1.isAlive());
revokeKnownMissingMembers(vm2, 1);
a1.getResult(MAX_WAIT);
assertEquals(vm0Buckets,getBucketList(vm0));
checkData(vm0, 0, numBuckets, "a");
createData(vm0, numBuckets, 113, "b");
checkData(vm0, numBuckets, 113, "b");
ExpectedException ex = addExpectedException(RevokedPersistentDataException.class.getName(), vm1);
try {
createPR(vm1, 1);
fail("Should have recieved a SplitDistributedSystemException");
} catch(RMIException e) {
//This should throw a split distributed system exception, because
//We revoked this member.
if(!(e.getCause() instanceof RevokedPersistentDataException)) {
throw e;
}
}
ex.remove();
}
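/**
* Test that revoking a running member fails, that a member's disk store can be
* revoked by directory before the other members restart, and that the revoked
* member cannot rejoin with its old persistent data.
*/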
public void testRevokeBeforeStartup() throws Throwable {
addExpectedException("RevokeFailedException");
Host host = Host.getHost(0);
VM vm0 = host.getVM(0);
VM vm1 = host.getVM(1);
VM vm2 = host.getVM(2);
int numBuckets = 50;
createPR(vm0, 1);
createPR(vm1, 1);
createData(vm0, 0, numBuckets, "a");
Set<Integer> vm0Buckets = getBucketList(vm0);
Set<Integer> vm1Buckets = getBucketList(vm1);
assertEquals(vm0Buckets, vm1Buckets);
//This should fail with a revocation failed message
try {
revokeAllMembers(vm2);
fail("The revoke should have failed, because members are running");
} catch(RMIException e) {
if(!(e.getCause() instanceof ReplyException && e.getCause().getCause() instanceof RevokeFailedException)) {
throw e;
}
}
closeCache(vm0);
createData(vm1, 0, numBuckets, "b");
File vm1Directory = getDiskDirectory(vm1);
closeCache(vm1);
vm0.invoke(new SerializableRunnable("get cache") {
public void run() {
getCache();
}
});
revokeMember(vm2, vm1Directory);
AsyncInvocation a1 = createPRAsync(vm0, 1);
a1.getResult(MAX_WAIT);
assertEquals(vm0Buckets,getBucketList(vm0));
checkData(vm0, 0, numBuckets, "a");
createData(vm0, numBuckets, 113, "b");
checkData(vm0, numBuckets, 113, "b");
ExpectedException ex = addExpectedException(RevokedPersistentDataException.class.getName(), vm1);
try {
createPR(vm1, 1);
fail("Should have recieved a SplitDistributedSystemException");
} catch(RMIException e) {
//This should throw a split distributed system exception, because
//We revoked this member.
if(!(e.getCause() instanceof RevokedPersistentDataException)) {
throw e;
}
}
ex.remove();
}
private File getDiskDirectory(VM vm0) {
return (File) vm0.invoke(new SerializableCallable() {
@Override
public Object call() throws Exception {
return getDiskDirs()[0];
}
});
}
/**
* Test that we wait for missing data to come back
* if the redundancy was 0.
*/
public void testMissingMemberRedundancy0() {
Host host = Host.getHost(0);
VM vm0 = host.getVM(0);
VM vm1 = host.getVM(1);
VM vm2 = host.getVM(2);
createPR(vm0, 0);
createPR(vm1, 0);
createData(vm0, 0, NUM_BUCKETS, "a");
Set<Integer> vm0Buckets = getBucketList(vm0);
Set<Integer> vm1Buckets = getBucketList(vm1);
final int aVM0Bucket = vm0Buckets.iterator().next();
final int aVM1Bucket = vm1Buckets.iterator().next();
closeCache(vm1);
ExpectedException ex = addExpectedException("PartitionOfflineException");
try {
checkReadWriteOperationsWithOfflineMember(vm0, aVM0Bucket, aVM1Bucket);
//Make sure that a newly created member is informed about the offline member
createPR(vm2,0);
checkReadWriteOperationsWithOfflineMember(vm2, aVM0Bucket, aVM1Bucket);
} finally {
ex.remove();
}
//This should work, because these are new buckets
createData(vm0, NUM_BUCKETS, 113, "a");
createPR(vm1, 0);
//The data should be back online now.
checkData(vm0, 0, 113, "a");
closeCache(vm0);
closeCache(vm1);
}
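/**
* Verifies that reads, writes, function executions, queries, and iteration
* against buckets hosted only by the offline member throw
* PartitionOfflineException, while operations on available buckets succeed.
*/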
private void checkReadWriteOperationsWithOfflineMember(VM vm0,
final int aVM0Bucket, final int aVM1Bucket) {
//This should work, because this bucket is still available.
checkData(vm0, aVM0Bucket, aVM0Bucket + 1, "a");
try {
checkData(vm0, aVM1Bucket, aVM1Bucket + 1, null);
fail("Should not have been able to read from missing buckets!");
} catch (RMIException e) {
//We expect a PartitionOfflineException
if(!(e.getCause() instanceof PartitionOfflineException)) {
throw e;
}
}
ExpectedException expect = addExpectedException("PartitionOfflineException", vm0);
//Try a function execution
vm0.invoke(new SerializableRunnable("Test ways to read") {
public void run() {
Cache cache = getCache();
Region region = cache.getRegion(PR_REGION_NAME);
try {
FunctionService.onRegion(region).execute(new TestFunction());
fail("Should not have been able to read from missing buckets!");
} catch (PartitionOfflineException e) {
//expected
}
//This should work, because this bucket is still available.
FunctionService.onRegion(region).withFilter(Collections.singleton(aVM0Bucket)).execute(new TestFunction());
//This should fail, because this bucket is offline
try {
FunctionService.onRegion(region).withFilter(Collections.singleton(aVM1Bucket)).execute(new TestFunction());
fail("Should not have been able to read from missing buckets!");
} catch (PartitionOfflineException e) {
//expected
}
//This should fail, because a bucket is offline
try {
HashSet filter = new HashSet();
filter.add(aVM0Bucket);
filter.add(aVM1Bucket);
FunctionService.onRegion(region).withFilter(filter).execute(new TestFunction());
fail("Should not have been able to read from missing buckets!");
} catch (PartitionOfflineException e) {
//expected
}
//This should fail, because a bucket is offline
try {
FunctionService.onRegion(region).execute(new TestFunction());
fail("Should not have been able to read from missing buckets!");
} catch (PartitionOfflineException e) {
//expected
}
try {
cache.getQueryService().newQuery("select * from /"+PR_REGION_NAME).execute();
fail("Should not have been able to read from missing buckets!");
} catch (PartitionOfflineException e) {
//expected
} catch (QueryException e) {
throw new RuntimeException(e);
}
try {
Set keys = region.keySet();
//iterate over all of the keys
for(Object key : keys) {
}
fail("Should not have been able to iterate over keyset");
} catch (PartitionOfflineException e) {
//expected
}
try {
//iterate over all of the values
for(Object value : region.values()) {
}
fail("Should not have been able to iterate over set");
} catch (PartitionOfflineException e) {
//expected
}
try {
//iterate over all of the entries
for(Object entry : region.entrySet()) {
}
fail("Should not have been able to iterate over set");
} catch (PartitionOfflineException e) {
//expected
}
try {
region.get(aVM1Bucket);
fail("Should not have been able to get an offline key");
} catch (PartitionOfflineException e) {
//expected
}
try {
region.containsKey(aVM1Bucket);
fail("Should not have been able to get an offline key");
} catch (PartitionOfflineException e) {
//expected
}
try {
region.getEntry(aVM1Bucket);
fail("Should not have been able to get an offline key");
} catch (PartitionOfflineException e) {
//expected
}
try {
region.invalidate(aVM1Bucket);
fail("Should not have been able to get an offline key");
} catch (PartitionOfflineException e) {
//expected
}
try {
region.destroy(aVM1Bucket);
fail("Should not have been able to get an offline key");
} catch (PartitionOfflineException e) {
//expected
}
}
});
try {
createData(vm0, aVM1Bucket, aVM1Bucket + 1, "b");
fail("Should not have been able to write to missing buckets!");
} catch (RMIException e) {
//We expect to see a partition offline exception here.
if(!(e.getCause() instanceof PartitionOfflineException)) {
throw e;
}
}
expect.remove();
}
/**Test to make sure that we recreate
* a bucket if a member is destroyed
*/
public void testDestroyedMemberRedundancy0() {
Host host = Host.getHost(0);
VM vm0 = host.getVM(0);
VM vm1 = host.getVM(1);
createPR(vm0, 0);
createPR(vm1, 0);
createData(vm0, 0, NUM_BUCKETS, "a");
Set<Integer> vm0Buckets = getBucketList(vm0);
Set<Integer> vm1Buckets = getBucketList(vm1);
int aVM0Bucket = vm0Buckets.iterator().next();
int aVM1Bucket = vm1Buckets.iterator().next();
localDestroyPR(vm1);
//This should work, because this bucket is still available.
checkData(vm0, aVM0Bucket, aVM0Bucket + 1, "a");
//This should find that the data is missing, because we destroyed that bucket
checkData(vm0, aVM1Bucket, aVM1Bucket + 1, null);
//We should be able to recreate that bucket
createData(vm0, aVM1Bucket, aVM1Bucket + 1, "b");
createPR(vm1, 0);
//The data should still be available
checkData(vm0, aVM0Bucket, aVM0Bucket + 1, "a");
checkData(vm0, aVM1Bucket, aVM1Bucket + 1, "b");
//This bucket should now be in vm0, because we recreated it there
assertTrue(getBucketList(vm0).contains(aVM1Bucket));
}
/**Test to make sure that we recreate
* a bucket if a member is destroyed
*/
public void testDestroyedMemberRedundancy1() {
Host host = Host.getHost(0);
VM vm0 = host.getVM(0);
VM vm1 = host.getVM(1);
VM vm2 = host.getVM(2);
createPR(vm0, 1);
createPR(vm1, 1);
createData(vm0, 0, NUM_BUCKETS, "a");
Set<Integer> vm0Buckets = getBucketList(vm0);
Set<Integer> vm1Buckets = getBucketList(vm1);
assertEquals(vm0Buckets, vm1Buckets);
int aVM0Bucket = vm0Buckets.iterator().next();
localDestroyPR(vm1);
//This should work, because this bucket is still available.
checkData(vm0, aVM0Bucket, aVM0Bucket + 1, "a");
createPR(vm2, 1);
Set<Integer> vm2Buckets = getBucketList(vm2);
//VM 2 should have created a copy of all of the buckets
assertEquals(vm0Buckets, vm2Buckets);
}
/**Test to make sure that we recreate
* a bucket if a member is revoked
*/
public void testRevokedMemberRedundancy0() {
Host host = Host.getHost(0);
VM vm0 = host.getVM(0);
VM vm1 = host.getVM(1);
VM vm2 = host.getVM(2);
createPR(vm0, 0);
createPR(vm1, 0);
createData(vm0, 0, NUM_BUCKETS, "a");
Set<Integer> vm0Buckets = getBucketList(vm0);
Set<Integer> vm1Buckets = getBucketList(vm1);
int aVM0Bucket = vm0Buckets.iterator().next();
int aVM1Bucket = vm1Buckets.iterator().next();
closeCache(vm1);
//This should work, because this bucket is still available.
checkData(vm0, aVM0Bucket, aVM0Bucket + 1, "a");
ExpectedException expect = addExpectedException("PartitionOfflineException", vm0);
try {
checkData(vm0, aVM1Bucket, aVM1Bucket + 1, "a");
fail("Should not have been able to read from missing buckets!");
} catch (RMIException e) {
if(!(e.getCause() instanceof PartitionOfflineException)) {
throw e;
}
}
try {
createData(vm0, aVM1Bucket, aVM1Bucket + 1, "b");
fail("Should not have been able to write to missing buckets!");
} catch (RMIException e) {
//We expect to see a partition offline exception here.
if(!(e.getCause() instanceof PartitionOfflineException)) {
throw e;
}
}
expect.remove();
//This should work, because these are new buckets
createData(vm0, NUM_BUCKETS, 113, "a");
revokeKnownMissingMembers(vm2, 1);
createPR(vm2, 0);
//We should be able to use that missing bucket now
checkData(vm2, aVM1Bucket, aVM1Bucket + 1, null);
createData(vm2, aVM1Bucket, aVM1Bucket + 1, "a");
checkData(vm2, aVM1Bucket, aVM1Bucket + 1, "a");
ExpectedException ex = addExpectedException(RevokedPersistentDataException.class.getName(), vm1);
try {
createPR(vm1, 0);
fail("Should have recieved a RevokedPersistentDataException");
} catch(RMIException e) {
//This should throw a split distributed system exception, because
//We revoked this member.
if(!(e.getCause() instanceof RevokedPersistentDataException)) {
throw e;
}
}
ex.remove();
}
/**Test to make sure that we recreate
* a bucket if a member is revoked
* @throws Throwable
*/
public void testRevokedMemberRedundancy1() throws Throwable {
Host host = Host.getHost(0);
VM vm0 = host.getVM(0);
VM vm1 = host.getVM(1);
VM vm2 = host.getVM(2);
createPR(vm0, 1);
createPR(vm1, 1);
createData(vm0, 0, NUM_BUCKETS, "a");
Set<Integer> vm0Buckets = getBucketList(vm0);
Set<Integer> vm1Buckets = getBucketList(vm1);
assertEquals(vm0Buckets, vm1Buckets);
closeCache(vm1);
//This should work, because this bucket is still available.
checkData(vm0, 0, NUM_BUCKETS, "a");
createData(vm0, 0, NUM_BUCKETS, "b");
revokeKnownMissingMembers(vm2, 1);
//This should make a copy of all of the buckets,
//because we have revoked VM1.
createPR(vm2, 1);
Set<Integer> vm2Buckets = getBucketList(vm2);
assertEquals(vm1Buckets, vm2Buckets);
ExpectedException ex = addExpectedException(RevokedPersistentDataException.class.getName(), vm1);
try {
createPR(vm1, 1);
fail("Should have recieved a SplitDistributedSystemException");
} catch(RMIException e) {
//This should throw a RevokedPersistentDataException exception, because
//We revoked this member.
if(!(e.getCause() instanceof RevokedPersistentDataException)) {
throw e;
}
}
//Test that we can bounce vm0 and vm1, and still get a RevokedPersistentDataException
//when vm1 tries to recover
closeCache(vm0);
closeCache(vm2);
AsyncInvocation async0 = createPRAsync(vm0, 1);
AsyncInvocation async2 = createPRAsync(vm2, 1);
async0.getResult();
async2.getResult();
try {
createPR(vm1, 1);
fail("Should have recieved a RevokedPersistentDataException");
} catch(RMIException e) {
//This should throw a RevokedPersistentDataException exception, because
//We revoked this member.
if(!(e.getCause() instanceof RevokedPersistentDataException)) {
throw e;
}
}
ex.remove();
//The data shouldn't be affected.
checkData(vm2, 0, NUM_BUCKETS, "b");
}
/**Test to make sure that we recreate
* a bucket if a member is revoked, and
* that we do it immediately if recovery delay
* is set to 0.
* @throws Throwable
*/
public void testRevokedMemberRedundancy1ImmediateRecovery() throws Throwable {
disconnectAllFromDS(); // I see this test failing because it finds the ds disconnected. Trying this as a fix.
Host host = Host.getHost(0);
VM vm0 = host.getVM(0);
VM vm1 = host.getVM(1);
final VM vm2 = host.getVM(2);
createPR(vm0, 1, 0);
createPR(vm1, 1, 0);
createData(vm0, 0, NUM_BUCKETS, "a");
//This should do nothing because we have satisfied redundancy.
createPR(vm2, 1, 0);
assertEquals(Collections.emptySet(), getBucketList(vm2));
Set<Integer> vm0Buckets = getBucketList(vm0);
final Set<Integer> lostBuckets = getBucketList(vm1);
closeCache(vm1);
//VM2 should pick up the slack
waitForCriterion(new WaitCriterion() {
public boolean done() {
Set<Integer> vm2Buckets = getBucketList(vm2);
return lostBuckets.equals(vm2Buckets);
}
public String description() {
return "expected to recover " + lostBuckets + " buckets, now have " + getBucketList(vm2);
}
}, 30000, 500, true);
createData(vm0, 0, NUM_BUCKETS, "b");
//VM1 should recover, but it shouldn't host the bucket anymore
createPR(vm1, 1, 0);
//The data shouldn't be affected.
checkData(vm1, 0, NUM_BUCKETS, "b");
//restart everything, and make sure it comes back correctly.
closeCache(vm1);
closeCache(vm0);
closeCache(vm2);
AsyncInvocation async1 = createPRAsync(vm1, 1);
AsyncInvocation async0 = createPRAsync(vm0, 1);
//Make sure we wait for vm2, because it's got the latest copy of the bucket
async1.join(50);
//FAILED On this line
assertTrue(async1.isAlive());
AsyncInvocation async2 = createPRAsync(vm2, 1);
async2.getResult(MAX_WAIT);
async0.getResult(MAX_WAIT);
async1.getResult(MAX_WAIT);
//The data shouldn't be affected.
checkData(vm1, 0, NUM_BUCKETS, "b");
assertEquals(Collections.emptySet(), getBucketList(vm1));
assertEquals(vm0Buckets, getBucketList(vm0));
assertEquals(vm0Buckets, getBucketList(vm2));
}
/**
* Tests the case where we replace buckets that are offline on A
* by creating them on C. We then shut down C and restart A,
* which recovers those buckets.
*/
public void testBug41340() throws Throwable {
Host host = Host.getHost(0);
VM vm0 = host.getVM(0);
VM vm1 = host.getVM(1);
final VM vm2 = host.getVM(2);
createPR(vm0, 1, 0);
createPR(vm1, 1, 0);
createData(vm0, 0, NUM_BUCKETS, "a");
//This should do nothing because we have satisfied redundancy.
createPR(vm2, 1, 0);
assertEquals(Collections.emptySet(), getBucketList(vm2));
Set<Integer> vm0Buckets = getBucketList(vm0);
final Set<Integer> lostBuckets = getBucketList(vm1);
closeCache(vm1);
//VM2 should pick up the slack
waitForBucketRecovery(vm2, lostBuckets);
createData(vm0, 0, NUM_BUCKETS, "b");
//VM1 should recover, but it shouldn't host the bucket anymore
createPR(vm1, 1, 0);
//The data shouldn't be affected.
checkData(vm1, 0, NUM_BUCKETS, "b");
closeCache(vm2);
//The buckets should move back to vm1.
waitForBucketRecovery(vm1, lostBuckets);
assertEquals(vm0Buckets, getBucketList(vm0));
assertEquals(vm0Buckets, getBucketList(vm1));
//The data shouldn't be affected.
checkData(vm1, 0, NUM_BUCKETS, "b");
//restart everything, and make sure it comes back correctly.
closeCache(vm0);
closeCache(vm1);
AsyncInvocation async1 = createPRAsync(vm1, 1);
AsyncInvocation async0 = createPRAsync(vm0, 1);
async0.getResult(MAX_WAIT);
async1.getResult(MAX_WAIT);
//The data shouldn't be affected.
checkData(vm1, 0, NUM_BUCKETS, "b");
assertEquals(vm0Buckets, getBucketList(vm0));
assertEquals(vm0Buckets, getBucketList(vm1));
}
/** Test that with redundancy
* 1, we restore the same buckets when the
* missing member comes back online.
*/
public void testMissingMemberRedundancy1() {
Host host = Host.getHost(0);
VM vm0 = host.getVM(0);
VM vm1 = host.getVM(1);
VM vm2 = host.getVM(2);
createPR(vm0, 1);
createPR(vm1, 1);
createData(vm0, 0, NUM_BUCKETS, "a");
Set<Integer> vm0Buckets = getBucketList(vm0);
Set<Integer> vm1Buckets = getBucketList(vm1);
closeCache(vm1);
//This should work, because this bucket is still available.
checkData(vm0, 0, NUM_BUCKETS, "a");
removeData(vm0, 0, NUM_BUCKETS/2);
createData(vm0, NUM_BUCKETS/2, NUM_BUCKETS, "b");
//This shouldn't create any buckets, because we know there are offline copies
createPR(vm2, 1);
Set<Integer> vm2Buckets = getBucketList(vm2);
assertEquals(Collections.emptySet(), vm2Buckets);
createPR(vm1, 1);
//The data should be back online now.
//and vm1 should have received the latest copy
//of the data.
checkData(vm1, 0, NUM_BUCKETS/2, null);
checkData(vm1, NUM_BUCKETS/2, NUM_BUCKETS, "b");
//Make sure we restored the buckets in the right
//place
assertEquals(vm0Buckets, getBucketList(vm0));
assertEquals(vm1Buckets, getBucketList(vm1));
assertEquals(Collections.emptySet(), getBucketList(vm2));
}
/**
* Test that we don't record our old
* member ID as offline, preventing redundancy
* recovery in the future.
*/
public void testBug41341() {
Host host = Host.getHost(0);
VM vm0 = host.getVM(0);
VM vm1 = host.getVM(1);
VM vm2 = host.getVM(2);
createPR(vm0, 1);
createPR(vm1, 1);
createData(vm0, 0, 1, "a");
Set<Integer> vm0Buckets = getBucketList(vm0);
Set<Integer> vm1Buckets = getBucketList(vm1);
assertEquals(Collections.singleton(0), vm0Buckets);
assertEquals(Collections.singleton(0), vm1Buckets);
closeCache(vm1);
//This shouldn't create any buckets, because we know there are offline copies
createPR(vm2, 1);
assertEquals(1, getOfflineMembers(0, vm0).size());
//Note, vm2 will consider vm1 as "online" because vm2 doesn't host the bucket
assertEquals(2, getOnlineMembers(0, vm2).size());
Set<Integer> vm2Buckets = getBucketList(vm2);
assertEquals(Collections.emptySet(), vm2Buckets);
createPR(vm1, 1);
//Make sure we restored the buckets in the right
//place
assertEquals(vm0Buckets, getBucketList(vm0));
assertEquals(vm1Buckets, getBucketList(vm1));
assertEquals(Collections.emptySet(), getBucketList(vm2));
assertEquals(Collections.emptySet(), getOfflineMembers(0, vm0));
assertEquals(Collections.emptySet(), getOfflineMembers(0, vm1));
moveBucket(0, vm1, vm2);
assertEquals(Collections.emptySet(), getOfflineMembers(0, vm0));
assertEquals(Collections.emptySet(), getOfflineMembers(0, vm1));
assertEquals(Collections.emptySet(), getOfflineMembers(0, vm2));
assertEquals(Collections.singleton(0), getBucketList(vm0));
assertEquals(Collections.emptySet(), getBucketList(vm1));
assertEquals(Collections.singleton(0), getBucketList(vm2));
//Destroy VM2
destroyPR(vm2);
assertEquals(Collections.emptySet(), getOfflineMembers(0, vm0));
assertEquals(Collections.emptySet(), getOfflineMembers(0, vm1));
//Close down VM 1
closeCache(vm1);
assertEquals(0, getOfflineMembers(0, vm0).size());
//This should recover redundancy, because vm2 was destroyed
createPR(vm1, 1);
assertEquals(Collections.singleton(0), getBucketList(vm0));
assertEquals(Collections.singleton(0), getBucketList(vm1));
assertEquals(Collections.emptySet(), getOfflineMembers(0, vm0));
assertEquals(Collections.emptySet(), getOfflineMembers(0, vm1));
}
/**
* Test that we throw away a bucket
* if we restored redundancy while
* that bucket was offline.
*/
public void z_testThrowAwayUnneededBucket() {
Host host = Host.getHost(0);
VM vm0 = host.getVM(0);
VM vm1 = host.getVM(1);
VM vm2 = host.getVM(2);
createPR(vm0, 1);
createPR(vm1, 1);
createData(vm0, 0, NUM_BUCKETS, "a");
Set<Integer> vm0Buckets = getBucketList(vm0);
Set<Integer> vm1Buckets = getBucketList(vm1);
assertEquals(vm0Buckets, vm1Buckets);
assertEquals(NUM_BUCKETS, vm0Buckets.size());
closeCache(vm1);
createPR(vm2, 1);
checkData(vm0, 0, NUM_BUCKETS, "a");
vm0Buckets = getBucketList(vm0);
Set<Integer> vm2Buckets = getBucketList(vm2);
//Each node should have a full copy of everything
assertEquals(vm0Buckets, vm2Buckets);
assertEquals(NUM_BUCKETS, vm0Buckets.size());
createPR(vm1, 1);
assertEquals(Collections.emptySet(), getBucketList(vm1));
closeCache(vm0);
closeCache(vm1);
closeCache(vm2);
createPR(vm0, 1);
createPR(vm1, 1);
createPR(vm2, 1);
assertEquals(vm0Buckets,getBucketList(vm0));
assertEquals(Collections.emptySet(), getBucketList(vm1));
assertEquals(vm2Buckets,getBucketList(vm2));
checkData(vm0, 0, NUM_BUCKETS, "a");
}
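/**
* Test that buckets moved between members while the system is online are
* recovered on the same members after a full shutdown and restart.
*/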
public void testMoveBucket() throws Throwable {
int redundancy = 0;
Host host = Host.getHost(0);
VM vm0 = host.getVM(0);
VM vm1 = host.getVM(1);
VM vm2 = host.getVM(2);
createPR(vm0, redundancy);
createData(vm0, 0, 2, "a");
createPR(vm1, redundancy);
createPR(vm2, redundancy);
Set<Integer> vm0Buckets = getBucketList(vm0);
Set<Integer> vm1Buckets = getBucketList(vm1);
Set<Integer> vm2Buckets = getBucketList(vm2);
moveBucket(0, vm0, vm1);
moveBucket(0, vm1, vm2);
createData(vm0, 113, 114, "a");
moveBucket(0, vm2, vm0);
createData(vm0, 226, 227, "a");
assertEquals(vm0Buckets,getBucketList(vm0));
assertEquals(vm1Buckets,getBucketList(vm1));
assertEquals(vm2Buckets,getBucketList(vm2));
closeCache(vm0);
closeCache(vm1);
closeCache(vm2);
AsyncInvocation a1 = createPRAsync(vm0, redundancy);
AsyncInvocation a2 = createPRAsync(vm1, redundancy);
AsyncInvocation a3 = createPRAsync(vm2, redundancy);
a1.getResult(MAX_WAIT);
a2.getResult(MAX_WAIT);
a3.getResult(MAX_WAIT);
assertEquals(vm2Buckets,getBucketList(vm2));
assertEquals(vm1Buckets,getBucketList(vm1));
assertEquals(vm0Buckets,getBucketList(vm0));
checkData(vm0, 0, 2, "a");
checkData(vm0, 113, 114, "a");
checkData(vm0, 226, 227, "a");
}
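/**
* Test that after a clean shutdown both members recover their bucket from disk
* on restart, while after a normal close one of them performs a GII instead of
* recovering from disk.
*/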
public void testCleanStop() throws Throwable {
Host host = Host.getHost(0);
VM vm0 = host.getVM(0);
VM vm1 = host.getVM(1);
createPR(vm0, 1);
createPR(vm1, 1);
createData(vm0, 0, 1, "a");
fakeCleanShutdown(vm1, 0);
fakeCleanShutdown(vm0, 0);
AsyncInvocation async1 = createPRAsync(vm0, 1);
//[dsmith] Make sure that vm0 is waiting for vm1 to recover
//If VM(0) recovers early, that is a problem, because
//we can no longer do a clean restart
AsyncInvocation async2 = createPRAsync(vm1, 1);
async1.getResult(MAX_WAIT);
async2.getResult(MAX_WAIT);
checkData(vm0, 0, 1, "a");
checkData(vm1, 0, 1, "a");
checkRecoveredFromDisk(vm0, 0, true);
checkRecoveredFromDisk(vm1, 0, true);
closePR(vm0);
closePR(vm1);
async1 = createPRAsync(vm0, 1);
async2 = createPRAsync(vm1, 1);
async1.getResult(MAX_WAIT);
async2.getResult(MAX_WAIT);
checkData(vm0, 0, 1, "a");
checkData(vm1, 0, 1, "a");
checkRecoveredFromDisk(vm0, 0, false);
checkRecoveredFromDisk(vm1, 0, true);
}
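/**
* Test that a client registering interest against an accessor (a PR with no
* data stores) gets a PartitionedRegionStorageException rather than hanging.
*/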
public void testRegisterInterestNoDataStores() {
final Host host = Host.getHost(0);
VM vm0 = host.getVM(0);
VM vm1 = host.getVM(1);
final Integer serverPort = (Integer) vm0.invoke(new SerializableCallable("create per") {
public Object call () {
Cache cache = getCache();
AttributesFactory af = new AttributesFactory();
PartitionAttributesFactory paf = new PartitionAttributesFactory();
paf.setRedundantCopies(0);
paf.setLocalMaxMemory(0);
af.setPartitionAttributes(paf.create());
af.setDataPolicy(DataPolicy.PARTITION);
cache.createRegion(PR_REGION_NAME, af.create());
CacheServer server = cache.addCacheServer();
server.setPort(AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET));
server.setNotifyBySubscription(true);
try {
server.start();
} catch (IOException e) {
throw new RuntimeException(e);
}
return server.getPort();
}
});
vm1.invoke(new SerializableRunnable("create client") {
public void run () {
Properties props = new Properties();
props.setProperty("mcast-port", "0");
props.setProperty("locators", "");
getSystem(props );
try {
Cache cache = getCache();
PoolFactory pf = PoolManager.createFactory();
pf.addServer(getServerHostName(host), serverPort);
pf.setSubscriptionEnabled(true);
pf.create("pool");
AttributesFactory af = new AttributesFactory();
af.setDataPolicy(DataPolicy.NORMAL);
af.setScope(Scope.LOCAL);
af.setPoolName("pool");
Region region = cache.createRegion(PR_REGION_NAME, af.create());
try {
region.registerInterestRegex(".*");
} catch(ServerOperationException e) {
if(!(e.getCause() instanceof PartitionedRegionStorageException)) {
throw e;
}
}
} finally {
disconnectFromDS();
}
}
});
}
/**
* This test is here just to make sure that we don't get
* a suspect string from an exception during cache closure.
*/
public void testOverflowCacheClose() {
Cache cache = getCache();
RegionFactory rf = new RegionFactory();
PartitionAttributesFactory paf = new PartitionAttributesFactory();
rf.setPartitionAttributes(paf.create());
rf.setDataPolicy(DataPolicy.PARTITION);
rf.setEvictionAttributes(EvictionAttributes.createLRUEntryAttributes(50, EvictionAction.OVERFLOW_TO_DISK));
rf.setDiskDirs(getDiskDirs());
Region region = rf.create(PR_REGION_NAME);
region.get(0);
cache.getDistributedSystem().disconnect();
// cache.close();
}
/**
* Test for bug 41336
*/
public void testCrashDuringBucketCreation() throws Throwable {
Host host = Host.getHost(0);
VM vm0 = host.getVM(0);
VM vm1 = host.getVM(1);
vm0.invoke(new SerializableRunnable("Install observer") {
public void run() {
DistributionMessageObserver.setInstance(new DistributionMessageObserver() {
@Override
public void beforeSendMessage(DistributionManager dm,
DistributionMessage msg) {
if(msg instanceof ManageBucketReplyMessage) {
DistributedTestCase.disconnectFromDS();
}
}
});
}
});
createPR(vm0, 0);
createPR(vm1, 0);
createData(vm1, 0, 4, "a");
Set<Integer> vm1Buckets = getBucketList(vm1);
//Make sure the test hook ran
vm0.invoke(new SerializableRunnable("Check for no distributed system") {
public void run() {
assertEquals(null,GemFireCacheImpl.getInstance());
}
});
checkData(vm1, 0, 4, "a");
assertEquals(4, vm1Buckets.size());
createPR(vm0, 0);
checkData(vm0, 0, 4, "a");
assertEquals(vm1Buckets, getBucketList(vm1));
assertEquals(Collections.emptySet(), getBucketList(vm0));
closeCache(vm0);
closeCache(vm1);
AsyncInvocation async0 = createPRAsync(vm0, 0);
AsyncInvocation async1 = createPRAsync(vm1, 0);
async0.getResult();
async1.getResult();
checkData(vm0, 0, 4, "a");
assertEquals(vm1Buckets, getBucketList(vm1));
assertEquals(Collections.emptySet(), getBucketList(vm0));
}
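/**
* Test that persistent PRs nested under different parent regions recover their
* buckets and data correctly after a full shutdown and restart.
*/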
public void testNestedPRRegions() throws Throwable {
Host host = Host.getHost(0);
VM vm0 = host.getVM(0);
VM vm1 = host.getVM(1);
VM vm2 = host.getVM(2);
int numBuckets = 50;
createNestedPR(vm0);
createNestedPR(vm1);
createNestedPR(vm2);
createData(vm0, 0, numBuckets, "a", "parent1/"+PR_REGION_NAME);
createData(vm0, 0, numBuckets, "b", "parent2/"+PR_REGION_NAME);
checkData(vm2, 0, numBuckets, "a", "parent1/"+PR_REGION_NAME);
checkData(vm2, 0, numBuckets, "b", "parent2/"+PR_REGION_NAME);
Set<Integer> vm1_0Buckets = getBucketList(vm0, "parent1/"+PR_REGION_NAME);
Set<Integer> vm1_1Buckets = getBucketList(vm1, "parent1/"+PR_REGION_NAME);
Set<Integer> vm1_2Buckets = getBucketList(vm2, "parent1/"+PR_REGION_NAME);
Set<Integer> vm2_0Buckets = getBucketList(vm0, "parent2/"+PR_REGION_NAME);
Set<Integer> vm2_1Buckets = getBucketList(vm1, "parent2/"+PR_REGION_NAME);
Set<Integer> vm2_2Buckets = getBucketList(vm2, "parent2/"+PR_REGION_NAME);
closeCache(vm0);
closeCache(vm1);
closeCache(vm2);
AsyncInvocation async0 = createNestedPRAsync(vm0);
//[dsmith] Make sure that vm0 is waiting for vm1 and vm2 to recover
//If VM(0) recovers early, that is a problem, because vm1
//has newer data
Thread.sleep(50);
assertTrue(async0.isAlive());
AsyncInvocation async1 = createNestedPRAsync(vm1);
AsyncInvocation async2 = createNestedPRAsync(vm2);
async0.getResult();
async1.getResult();
async2.getResult();
assertEquals(vm1_0Buckets,getBucketList(vm0, "parent1/"+PR_REGION_NAME));
assertEquals(vm1_1Buckets,getBucketList(vm1, "parent1/"+PR_REGION_NAME));
assertEquals(vm1_2Buckets,getBucketList(vm2, "parent1/"+PR_REGION_NAME));
assertEquals(vm2_0Buckets,getBucketList(vm0, "parent2/"+PR_REGION_NAME));
assertEquals(vm2_1Buckets,getBucketList(vm1, "parent2/"+PR_REGION_NAME));
assertEquals(vm2_2Buckets,getBucketList(vm2, "parent2/"+PR_REGION_NAME));
checkData(vm0, 0, numBuckets, "a", "parent1/"+PR_REGION_NAME);
checkData(vm0, 0, numBuckets, "b", "parent2/"+PR_REGION_NAME);
createData(vm1, numBuckets, 113, "c", "parent1/"+PR_REGION_NAME);
createData(vm1, numBuckets, 113, "d", "parent2/"+PR_REGION_NAME);
checkData(vm2, numBuckets, 113, "c", "parent1/"+PR_REGION_NAME);
checkData(vm2, numBuckets, 113, "d", "parent2/"+PR_REGION_NAME);
}
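/**
* Test that closing the cache while puts are in flight does not lose or
* corrupt data: after recovery both members agree on the value, which is
* either the last successful put or the one immediately after it.
*/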
public void testCloseDuringRegionOperation() throws Throwable {
Host host = Host.getHost(0);
VM vm0 = host.getVM(0);
VM vm1 = host.getVM(1);
createPR(vm0, 1, -1, 1);
createPR(vm1, 1, -1, 1);
//Make sure we create a bucket
createData(vm1, 0, 1, "a");
//Try to make sure there are some operations in flight while closing the cache
SerializableCallable createData = new SerializableCallable() {
public Object call() {
Cache cache = getCache();
Region region = cache.getRegion(PR_REGION_NAME);
int i =0;
while(true) {
try {
region.put(0, i);
i++;
} catch(CacheClosedException e) {
break;
}
}
return i-1;
}
};
AsyncInvocation asyncCreate = vm0.invokeAsync(createData);
Thread.sleep(100);
AsyncInvocation close0 = closeCacheAsync(vm0);
AsyncInvocation close1 = closeCacheAsync(vm1);
//wait for the close to finish
close0.getResult();
close1.getResult();
Integer lastSuccessfulInt = (Integer) asyncCreate.getResult();
System.err.println("Cache was closed on integer " + lastSuccessfulInt);
AsyncInvocation create1 = createPRAsync(vm0, 1, -1, 1);
AsyncInvocation create2 = createPRAsync(vm1, 1, -1, 1);
create1.getResult(MAX_WAIT);
create2.getResult(MAX_WAIT);
SerializableCallable getValue = new SerializableCallable() {
public Object call() {
Cache cache = getCache();
Region region = cache.getRegion(PR_REGION_NAME);
int value = (Integer) region.get(0);
return value;
}
};
int vm1Value = (Integer) vm0.invoke(getValue);
int vm2Value = (Integer) vm1.invoke(getValue);
assertEquals(vm1Value, vm2Value);
assertTrue("value = " + vm1Value + ", lastSuccessfulInt=" + lastSuccessfulInt,
vm1Value == lastSuccessfulInt || vm1Value == lastSuccessfulInt+1);
}
/**
* Test for bug 42226.
* 1. Member A has the bucket
* 2. Member B starts creating the bucket. It tells member A that it hosts the bucket
* 3. Member A crashes
* 4. Member B destroys the bucket and throws a partition offline exception, because it wasn't able to complete initialization.
* 5. Member A recovers, and gets stuck waiting for member B.
* @throws Throwable
*/
public void testBug42226() throws Throwable {
Host host = Host.getHost(0);
VM vm0 = host.getVM(0);
VM vm1 = host.getVM(1);
//Add a hook which will disconnect from the distributed
//system when the initial image message shows up.
vm0.invoke(new SerializableRunnable() {
public void run() {
DistributionMessageObserver.setInstance(new DistributionMessageObserver() {
@Override
public void beforeProcessMessage(DistributionManager dm,
DistributionMessage message) {
if(message instanceof RequestImageMessage) {
RequestImageMessage rim = (RequestImageMessage) message;
//Don't disconnect until we see a bucket
if(rim.regionPath.contains("_B_")) {
DistributionMessageObserver.setInstance(null);
disconnectFromDS();
}
}
}
@Override
public void afterProcessMessage(DistributionManager dm,
DistributionMessage message) {
}
});
}
});
getLogWriter().info("Creating region in VM0");
createPR(vm0, 1, 0, 1);
//Make sure we create a bucket
createData(vm0, 0, 1, "a");
//This should recover redundancy, which should cause vm0 to disconnect
ExpectedException ex = addExpectedException("PartitionOfflineException");
try {
getLogWriter().info("Creating region in VM1");
createPR(vm1, 1, 0, 1);
//Make sure get a partition offline exception
try {
createData(vm1, 0, 1, "a");
} catch (RMIException e) {
//We expect a PartitionOfflineException
if(!(e.getCause() instanceof PartitionOfflineException)) {
throw e;
}
}
} finally {
ex.remove();
}
//Make sure vm0 is really disconnected (avoids a race with the observer).
vm0.invoke(new SerializableRunnable() {
public void run() {
disconnectFromDS();
}
});
//This should recreate the bucket
AsyncInvocation async1 = createPRAsync(vm0, 1, 0, 1);
async1.getResult(MAX_WAIT);
checkData(vm1, 0, 1, "a");
}
/**
* A test to make sure that we allow the PR to be used
* after at least one copy of every bucket is recovered,
* but before the secondaries are initialized.
*
* @throws Throwable
*/
public void testAllowRegionUseBeforeRedundancyRecovery() throws Throwable {
Host host = Host.getHost(0);
VM vm0 = host.getVM(0);
VM vm1 = host.getVM(1);
VM vm2 = host.getVM(2);
final int redundancy = 1;
int numBuckets = 20;
createPR(vm0, redundancy);
createPR(vm1, redundancy);
createPR(vm2, redundancy);
createData(vm0, 0, numBuckets, "a");
Set<Integer> vm0Buckets = getBucketList(vm0);
Set<Integer> vm1Buckets = getBucketList(vm1);
Set<Integer> vm2Buckets = getBucketList(vm2);
closeCache(vm0);
closeCache(vm1);
closeCache(vm2);
SerializableRunnable slowGII = new SerializableRunnable("Slow down GII") {
@SuppressWarnings("synthetic-access")
public void run() {
InternalResourceManager.setResourceObserver(new RecoveryObserver());
DistributionMessageObserver.setInstance(new BlockGIIMessageObserver());
}
};
SerializableRunnable resetSlowGII = new SerializableRunnable("Unset the slow GII") {
public void run() {
BlockGIIMessageObserver messageObserver = (BlockGIIMessageObserver) DistributionMessageObserver.setInstance(null);
RecoveryObserver recoveryObserver = (RecoveryObserver) InternalResourceManager.getResourceObserver();
messageObserver.cdl.countDown();
try {
recoveryObserver.recoveryDone.await();
} catch (InterruptedException e) {
fail("Interrupted", e);
}
InternalResourceManager.setResourceObserver(null);
}
};
try {
vm0.invoke(slowGII);
vm1.invoke(slowGII);
vm2.invoke(slowGII);
SerializableRunnable createPR = new SerializableRunnable("create PR") {
public void run() {
Cache cache = getCache();
RegionAttributes attr = getPersistentPRAttributes(redundancy,
-1, cache, 113, true);
cache.createRegion(PR_REGION_NAME, attr);
}
};
AsyncInvocation a1 = vm0.invokeAsync(createPR);
AsyncInvocation a2 = vm1.invokeAsync(createPR);
AsyncInvocation a3 = vm2.invokeAsync(createPR);
a1.getResult(MAX_WAIT);
a2.getResult(MAX_WAIT);
a3.getResult(MAX_WAIT);
//Make sure all of the primaries are available.
checkData(vm0, 0, numBuckets, "a");
createData(vm0, 113, 113 + numBuckets, "b");
//But none of the secondaries
Set<Integer> vm0InitialBuckets = getBucketList(vm0);
Set<Integer> vm1InitialBuckets = getBucketList(vm1);
Set<Integer> vm2InitialBuckets = getBucketList(vm2);
assertEquals("vm0=" + vm0InitialBuckets + ",vm1=" + vm1InitialBuckets
+ "vm2=" + vm2InitialBuckets, numBuckets, vm0InitialBuckets.size()
+ vm1InitialBuckets.size() + vm2InitialBuckets.size());
} finally {
//Reset the slow GII flag, and wait for the redundant buckets
//to be recovered.
AsyncInvocation reset0 = vm0.invokeAsync(resetSlowGII);
AsyncInvocation reset1 = vm1.invokeAsync(resetSlowGII);
AsyncInvocation reset2 = vm2.invokeAsync(resetSlowGII);
reset0.getResult(MAX_WAIT);
reset1.getResult(MAX_WAIT);
reset2.getResult(MAX_WAIT);
}
//Now we better have all of the buckets
assertEquals(vm0Buckets,getBucketList(vm0));
assertEquals(vm1Buckets,getBucketList(vm1));
assertEquals(vm2Buckets,getBucketList(vm2));
//Make sure the members see the data recovered from disk
//in those secondary buckets
checkData(vm0, 0, numBuckets, "a");
checkData(vm1, 0, numBuckets, "a");
//Make sure the members see the new updates
//in those secondary buckets
checkData(vm0, 113, 113 + numBuckets, "b");
checkData(vm1, 113, 113 + numBuckets, "b");
}
/**
* A test for bug 41436. If the GII source
* crashes before the GII is complete, we need
* to make sure that later we can recover redundancy.
*/
public void testCrashDuringBucketGII() {
addExpectedException("PartitionOfflineException");
Host host = Host.getHost(0);
VM vm0 = host.getVM(0);
VM vm1 = host.getVM(1);
VM vm2 = host.getVM(2);
createPR(vm0, 1);
createData(vm0, 0, 1, "value");
//Add an observer which will close the cache when the GII starts
vm0.invoke(new SerializableRunnable("Set crashing observer") {
public void run() {
DistributionMessageObserver.setInstance(new DistributionMessageObserver() {
@Override
public void beforeProcessMessage(DistributionManager dm,
DistributionMessage message) {
if(message instanceof RequestImageMessage) {
RequestImageMessage rim = (RequestImageMessage) message;
if(rim.regionPath.contains("_0")) {
DistributionMessageObserver.setInstance(null);
getCache().close();
}
}
}
});
}
});
createPR(vm1, 1);
//Make sure vm1 didn't create the bucket
assertEquals(Collections.emptySet(), getBucketList(vm1));
createPR(vm0, 1, 0);
//Make sure vm0 recovers the bucket
assertEquals(Collections.singleton(0), getBucketList(vm0));
//vm1 should satisfy redundancy for the bucket as well
assertEquals(Collections.singleton(0), getBucketList(vm1));
}
/**
* Another test for bug 41436. If the GII source
* crashes before the GII is complete, we need
* to make sure that later we can recover redundancy.
*
* In this test case, we bring the GII target down before we
* bring the source back up, to make sure the source still
* discovers that the GII target is no longer hosting the bucket.
* @throws InterruptedException
*/
public void testCrashDuringBucketGII2() throws InterruptedException {
addExpectedException("PartitionOfflineException");
Host host = Host.getHost(0);
VM vm0 = host.getVM(0);
final VM vm1 = host.getVM(1);
VM vm2 = host.getVM(2);
createPR(vm0, 1);
createData(vm0, 0, 1, "value");
//Add an observer which will close the cache when the GII starts
vm0.invoke(new SerializableRunnable("Set crashing observer") {
public void run() {
DistributionMessageObserver.setInstance(new DistributionMessageObserver() {
@Override
public void beforeProcessMessage(DistributionManager dm,
DistributionMessage message) {
if(message instanceof RequestImageMessage) {
RequestImageMessage rim = (RequestImageMessage) message;
if(rim.regionPath.contains("_0")) {
DistributionMessageObserver.setInstance(null);
getCache().close();
}
}
}
});
}
});
createPR(vm1, 1);
//Make sure vm1 didn't create the bucket
assertEquals(Collections.emptySet(), getBucketList(vm1));
closeCache(vm1);
AsyncInvocation async0 = createPRAsync(vm0, 1, 0, 113);
async0.join(500);
//vm0 should get stuck waiting for vm1 to recover from disk,
//because vm0 thinks vm1 has the bucket
assertTrue(async0.isAlive());
createPR(vm1, 1, 0);
//Make sure vm0 recovers the bucket
assertEquals(Collections.singleton(0), getBucketList(vm0));
//vm1 should satisfy redundancy for the bucket as well
WaitCriterion ev = new WaitCriterion() {
public boolean done() {
return(Collections.singleton(0).equals(getBucketList(vm1)));
}
public String description() {
return null;
}
};
DistributedTestCase.waitForCriterion(ev, 30 * 1000, 200, true);
assertEquals(Collections.singleton(0), getBucketList(vm1));
}
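/**
* Test that a member which fails to start because of a
* ConflictingPersistentDataException does not persist any view information
* from the conflicting member, and can later restart on its own with its
* original data intact.
*/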
public void testCleanupAfterConflict() throws Exception {
Host host = Host.getHost(0);
VM vm0 = host.getVM(0);
VM vm1 = host.getVM(1);
createPR(vm0, 0);
//create some buckets
createData(vm0, 0, 2, "a");
closePR(vm0);
createPR(vm1, 0);
//create an overlapping bucket
//TODO - this test hangs if vm1 has some buckets that vm0
//does not have. The problem is that when vm0 starts up and gets a conflict
//on some buckets, it updates its view for other buckets.
// createData(vm1, 1, 3, "a");
createData(vm1, 1, 2, "a");
//this should throw a conflicting data exception.
ExpectedException expect = addExpectedException("ConflictingPersistentDataException", vm0);
try {
createPR(vm0, 0);
fail("should have seen a conflicting data exception");
} catch(Exception e) {
if(!(e.getCause() instanceof ConflictingPersistentDataException)) {
throw e;
}
} finally {
expect.remove();
}
//This will hang, if this test fails.
//TODO - DAN - I'm not even sure what this means here?
//It seems like if anything, vm1 should not have updated its persistent
//view from vm0 because vm0 was in conflict!
//In fact, this is a bit of a problem, because now vm1 is dependent
//on vm0.
expect = addExpectedException("PartitionOfflineException", vm1);
try {
createData(vm1, 0, 1, "a");
fail("Should have seen a PartitionOfflineException for bucket 0");
} catch(Exception e) {
if(!(e.getCause() instanceof PartitionOfflineException)) {
throw e;
}
} finally {
expect.remove();
}
closePR(vm1);
//This should succeed, vm0 should not have persisted any view
//information from vm1
createPR(vm0, 0);
checkData(vm0, 0, 2, "a");
checkData(vm0, 2, 3, null);
}
/**
* Test to make sure that primaries are rebalanced after recovering from
* disk.
*/
public void testPrimaryBalanceAfterRecovery() throws Throwable {
Host host = Host.getHost(0);
VM vm0 = host.getVM(0);
VM vm1 = host.getVM(1);
VM vm2 = host.getVM(2);
int numBuckets = 30;
createPR(vm0, 1);
createPR(vm1, 1);
createPR(vm2, 1);
createData(vm0, 0, numBuckets, "a");
Set<Integer> vm0Buckets = getBucketList(vm0);
Set<Integer> vm1Buckets = getBucketList(vm1);
Set<Integer> vm2Buckets = getBucketList(vm2);
//We expect to see 10 primaries on each node since we have 30 buckets
Set<Integer> vm0Primaries = getPrimaryBucketList(vm0);
assertEquals("Expected 10 primaries " + vm0Primaries, 10, vm0Primaries.size());
Set<Integer> vm1Primaries = getPrimaryBucketList(vm1);
assertEquals("Expected 10 primaries " + vm1Primaries, 10, vm1Primaries.size());
Set<Integer> vm2Primaries = getPrimaryBucketList(vm2);
assertEquals("Expected 10 primaries " + vm2Primaries, 10, vm2Primaries.size());
//bounce vm0
closeCache(vm0);
createPR(vm0, 1);
waitForBucketRecovery(vm0, vm0Buckets);
assertEquals(vm0Buckets,getBucketList(vm0));
assertEquals(vm1Buckets,getBucketList(vm1));
assertEquals(vm2Buckets,getBucketList(vm2));
//The primaries should be evenly distributed after recovery.
vm0Primaries = getPrimaryBucketList(vm0);
assertEquals("Expected 10 primaries " + vm0Primaries, 10, vm0Primaries.size());
vm1Primaries = getPrimaryBucketList(vm1);
assertEquals("Expected 10 primaries " + vm1Primaries, 10, vm1Primaries.size());
vm2Primaries = getPrimaryBucketList(vm2);
assertEquals("Expected 10 primaries " + vm2Primaries, 10, vm2Primaries.size());
}
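/**
* Test that concurrency checks are enabled by default for persistent PRs, and
* that accessors (PARTITION_PROXY) see the same setting.
*/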
public void testConcurrencyChecksEnabled() {
Host host = Host.getHost(0);
VM vm0 = host.getVM(0);
VM vm1 = host.getVM(1);
VM vm2 = host.getVM(2);
VM vm3 = host.getVM(3);
final String regionName = getName();
SerializableCallable createPR = new SerializableCallable() {
@Override
public Object call() throws Exception {
RegionFactory<Integer, String> rf = getCache().createRegionFactory(RegionShortcut.PARTITION_PERSISTENT);
Region<Integer, String> r = rf.create(regionName);
assertTrue(r.getAttributes().getConcurrencyChecksEnabled());
return null;
}
};
SerializableCallable createPRProxy = new SerializableCallable() {
@Override
public Object call() throws Exception {
RegionFactory<Integer, String> rf = getCache().createRegionFactory(RegionShortcut.PARTITION_PROXY);
Region<Integer, String> r = rf.create(regionName);
return null;
}
};
vm0.invoke(createPRProxy);
vm1.invoke(createPR);
vm2.invoke(createPR);
vm3.invoke(createPRProxy);
SerializableCallable verifyConcurrenyChecks = new SerializableCallable() {
@Override
public Object call() throws Exception {
Region r = getCache().getRegion(regionName);
assertTrue(r.getAttributes().getConcurrencyChecksEnabled());
return null;
}
};
vm0.invoke(verifyConcurrenyChecks);
vm3.invoke(verifyConcurrenyChecks);
}
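/**
* Test that a non-persistent accessor (PARTITION_PROXY) created alongside a
* persistent data store also has concurrency checks enabled.
*/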
public void testNonPersistentProxy() {
Host host = Host.getHost(0);
VM vm1 = host.getVM(0);
VM vm2 = host.getVM(1);
VM vm3 = host.getVM(2);
final String regionName = getName();
SerializableCallable createAccessor = new SerializableCallable() {
@Override
public Object call() throws Exception {
getCache().createRegionFactory(RegionShortcut.PARTITION_PROXY).create(regionName);
return null;
}
};
vm1.invoke(createAccessor);
vm2.invoke(new SerializableCallable() {
@Override
public Object call() throws Exception {
Region r = getCache().createRegionFactory(RegionShortcut.PARTITION_PERSISTENT).create(regionName);
assertTrue(r.getAttributes().getConcurrencyChecksEnabled());
return null;
}
});
vm3.invoke(createAccessor);
SerializableCallable verifyConcurrencyChecks = new SerializableCallable() {
@Override
public Object call() throws Exception {
Region r = getCache().getRegion(regionName);
assertTrue(r.getAttributes().getConcurrencyChecksEnabled());
return null;
}
};
vm1.invoke(verifyConcurrencyChecks);
vm3.invoke(verifyConcurrencyChecks);
}
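/**
* Test that a non-persistent REPLICATE region can be created between two
* members hosting a persistent REPLICATE region with the same name.
*/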
public void testReplicateAfterPersistent() {
Host host = Host.getHost(0);
VM vm1 = host.getVM(0);
VM vm2 = host.getVM(1);
VM vm3 = host.getVM(2);
final String regionName = getName();
SerializableCallable createPersistentReplicate = new SerializableCallable() {
@Override
public Object call() throws Exception {
Region r = getCache().createRegionFactory(RegionShortcut.REPLICATE_PERSISTENT).create(regionName);
return null;
}
};
SerializableCallable createNonPersistentReplicate = new SerializableCallable() {
@Override
public Object call() throws Exception {
Region r = getCache().createRegionFactory(RegionShortcut.REPLICATE).create(regionName);
return null;
}
};
vm1.invoke(createPersistentReplicate);
vm2.invoke(createNonPersistentReplicate);
vm3.invoke(createPersistentReplicate);
}
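/**
* Resource observer that counts down a latch once redundancy recovery or
* rebalancing for the test region has finished.
*/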
private static final class RecoveryObserver extends
InternalResourceManager.ResourceObserverAdapter {
final CountDownLatch recoveryDone = new CountDownLatch(1);
@Override
public void rebalancingOrRecoveryFinished(Region region) {
if(region.getName().equals(PR_REGION_NAME)) {
recoveryDone.countDown();
}
}
}
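/**
* A trivial function used to probe whether buckets are reachable; it simply
* returns a null last result.
*/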
private static class TestFunction implements Function, Serializable {
public void execute(FunctionContext context) {
context.getResultSender().lastResult(null);
}
public String getId() {
return TestFunction.class.getSimpleName();
}
public boolean hasResult() {
return true;
}
public boolean optimizeForWrite() {
return false;
}
public boolean isHA() {
return false;
}
}
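/**
* Message observer that blocks bucket GII requests until the latch is
* released, so tests can control when redundant copies are initialized.
*/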
private static class BlockGIIMessageObserver extends DistributionMessageObserver {
CountDownLatch cdl = new CountDownLatch(1);
@Override
public void beforeSendMessage(DistributionManager dm,
DistributionMessage message) {
if(message instanceof RequestImageMessage) {
RequestImageMessage rim = (RequestImageMessage) message;
//make sure this is a bucket region doing a GII
if(rim.regionPath.contains("B_")) {
try {
cdl.await();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
}
}
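/**
* A simple CustomExpiry used to verify that custom expiration can be combined
* with PR persistence (see bug 44184); the timeout is derived from the entry's
* key and value hash codes.
*/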
private static class TestCustomExpiration implements CustomExpiry {
public void close() {
//do nothing
}
public ExpirationAttributes getExpiry(Entry entry) {
return new ExpirationAttributes((entry.getKey().hashCode() + entry.getValue().hashCode()) % 100, ExpirationAction.INVALIDATE);
}
}
}