blob: 28788e5e9768f318a89649c39c1a0c00f20f014e [file] [log] [blame]
/*=========================================================================
* Copyright (c) 2002-2014 Pivotal Software, Inc. All Rights Reserved.
* This product is protected by U.S. and international copyright
* and intellectual property laws. Pivotal products are covered by
* more patents listed at http://www.pivotal.io/patents.
*=========================================================================
*/
package com.gemstone.gemfire.cache30;
import java.util.HashSet;
import java.util.Set;
import com.gemstone.gemfire.cache.AttributesFactory;
import com.gemstone.gemfire.cache.DataPolicy;
import com.gemstone.gemfire.cache.EvictionAction;
import com.gemstone.gemfire.cache.EvictionAttributes;
import com.gemstone.gemfire.cache.Operation;
import com.gemstone.gemfire.cache.PartitionAttributesFactory;
import com.gemstone.gemfire.distributed.DistributedSystem;
import com.gemstone.gemfire.distributed.internal.InternalDistributedSystem;
import com.gemstone.gemfire.distributed.internal.membership.InternalDistributedMember;
import com.gemstone.gemfire.distributed.internal.membership.jgroup.MembershipManagerHelper;
import com.gemstone.gemfire.internal.cache.BucketRegion;
import com.gemstone.gemfire.internal.cache.EntryEventImpl;
import com.gemstone.gemfire.internal.cache.LocalRegion;
import com.gemstone.gemfire.internal.cache.PartitionedRegion;
import com.gemstone.gemfire.internal.cache.RegionEntry;
import com.gemstone.gemfire.internal.cache.Token;
import com.gemstone.gemfire.internal.cache.VMCachedDeserializable;
import com.gemstone.gemfire.internal.cache.versions.VMVersionTag;
import com.gemstone.gemfire.internal.cache.versions.VersionSource;
import com.gemstone.gemfire.internal.cache.versions.VersionTag;
import com.gemstone.org.jgroups.Event;
import com.gemstone.org.jgroups.JChannel;
import com.gemstone.org.jgroups.stack.Protocol;
import dunit.Host;
import dunit.SerializableCallable;
import dunit.VM;
/**
* concurrency-control tests for client/server
*
* @author bruce
*
*/
public class PRBucketSynchronizationDUnitTest extends CacheTestCase {
static enum TestType {
IN_MEMORY,
OVERFLOW,
PERSISTENT
};
public static LocalRegion TestRegion;
public PRBucketSynchronizationDUnitTest(String name) {
super(name);
}
public void testThatBucketSyncOnPrimaryLoss() {
doBucketsSyncOnPrimaryLoss(TestType.IN_MEMORY);
}
public void testThatBucketsSyncOnPrimaryLossWithPersistence() {
doBucketsSyncOnPrimaryLoss(TestType.PERSISTENT);
}
public void testThatBucketsSyncOnPrimaryLossWithOverflow() {
doBucketsSyncOnPrimaryLoss(TestType.OVERFLOW);
}
/**
* We hit this problem in bug #45669. A primary was lost and we
* did not see secondary buckets perform a delta-GII.
*/
public void doBucketsSyncOnPrimaryLoss(TestType typeOfTest) {
addExpectedException("killing member's ds");
addExpectedException("killing member's ds");
Host host = Host.getHost(0);
VM vm0 = host.getVM(0);
VM vm1 = host.getVM(1);
VM vm2 = host.getVM(2);
Set<VM> verifyVMs = new HashSet<VM>();
final String name = this.getUniqueName() + "Region";
verifyVMs.add(vm0);
verifyVMs.add(vm1);
verifyVMs.add(vm2);
disconnectAllFromDS();
try {
createRegion(vm0, name, typeOfTest);
createRegion(vm1, name, typeOfTest);
createRegion(vm2, name, typeOfTest);
createEntry1(vm0);
VM primaryOwner;
if (isPrimaryForBucket0(vm0))
primaryOwner = vm0;
else if (isPrimaryForBucket0(vm1))
primaryOwner = vm1;
else
primaryOwner = vm2;
verifyVMs.remove(primaryOwner);
// cause one of the VMs to throw away the next operation
VM creatorVM = null;
InternalDistributedMember primaryID = getID(primaryOwner);
VersionSource primaryVersionID = getVersionID(primaryOwner);
for (VM vm: verifyVMs) {
creatorVM = vm;
createEntry2(creatorVM, primaryID, primaryVersionID);
break;
}
verifyVMs.remove(creatorVM);
// Now we crash the primary bucket owner simulating death during distribution.
// The backup buckets should perform a delta-GII for the lost member and
// get back in sync
crashDistributedSystem(primaryOwner);
for (VM vm: verifyVMs) {
verifySynchronized(vm, primaryID);
}
} finally {
disconnectAllFromDS();
}
}
// private void closeCache(VM vm) {
// vm.invoke(new SerializableCallable() {
// public Object call() throws Exception {
// closeCache();
// return null;
// }
// });
// }
private boolean createEntry1(VM vm) {
return (Boolean)vm.invoke(new SerializableCallable("create entry1") {
public Object call() {
TestRegion.create("Object1", Integer.valueOf(1));
return true;
}
});
}
private boolean isPrimaryForBucket0(VM vm) {
return (Boolean)vm.invoke(new SerializableCallable("is primary?") {
public Object call() {
PartitionedRegion pr = (PartitionedRegion)TestRegion;
return pr.getDataStore().getLocalBucketById(0).getBucketAdvisor().isPrimary();
}
});
}
private InternalDistributedMember getID(VM vm) {
return (InternalDistributedMember)vm.invoke(new SerializableCallable("get dmID") {
public Object call() {
return TestRegion.getCache().getMyId();
}
});
}
private VersionSource getVersionID(VM vm) {
return (VersionSource)vm.invoke(new SerializableCallable("get versionID") {
public Object call() {
return TestRegion.getVersionMember();
}
});
}
private boolean createEntry2(VM vm, final InternalDistributedMember primary, final VersionSource primaryVersionID) {
return (Boolean)vm.invoke(new SerializableCallable("create entry2") {
public Object call() {
// create a fake event that looks like it came from the primary and apply it to
// this cache
PartitionedRegion pr = (PartitionedRegion)TestRegion;
BucketRegion bucket = pr.getDataStore().getLocalBucketById(0);
VersionTag tag = new VMVersionTag();
tag.setMemberID(primaryVersionID);
tag.setRegionVersion(2);
tag.setEntryVersion(1);
tag.setIsRemoteForTesting();
EntryEventImpl event = EntryEventImpl.create(bucket, Operation.CREATE, "Object3", true, primary, true, false);
getLogWriter().info("applying this event to the cache: " + event);
event.setNewValue(new VMCachedDeserializable("value3", 12));
event.setVersionTag(tag);
bucket.getRegionMap().basicPut(event, System.currentTimeMillis(), true, false, null, false, false);
event.release();
// now create a tombstone so we can be sure these are transferred in delta-GII
tag = new VMVersionTag();
tag.setMemberID(primaryVersionID);
tag.setRegionVersion(3);
tag.setEntryVersion(1);
tag.setIsRemoteForTesting();
event = EntryEventImpl.create(bucket, Operation.CREATE, "Object5", true, primary, true, false);
event.setNewValue(Token.TOMBSTONE);
event.setVersionTag(tag);
getLogWriter().info("applying this event to the cache: " + event);
bucket.getRegionMap().basicPut(event, System.currentTimeMillis(), true, false, null, false, false);
event.release();
bucket.dumpBackingMap();
getLogWriter().info("bucket version vector is now " + bucket.getVersionVector().fullToString());
assertTrue("bucket should hold entry Object3 now", bucket.containsKey("Object3"));
return true;
}
});
}
private void verifySynchronized(VM vm, final InternalDistributedMember crashedMember) {
vm.invoke(new SerializableCallable("check that synchronization happened") {
public Object call() throws Exception {
PartitionedRegion pr = (PartitionedRegion)TestRegion;
final BucketRegion bucket = pr.getDataStore().getLocalBucketById(0);
waitForCriterion(new WaitCriterion() {
String waitingFor = "primary is still in membership view: " + crashedMember;
boolean dumped = false;
public boolean done() {
if (TestRegion.getCache().getDistributionManager().isCurrentMember(crashedMember)) {
getLogWriter().info(waitingFor);
return false;
}
if (!TestRegion.containsKey("Object3")) {
waitingFor = "entry for Object3 not found";
getLogWriter().info(waitingFor);
return false;
}
RegionEntry re = bucket.getRegionMap().getEntry("Object5");
if (re == null) {
if (!dumped) {
dumped = true;
bucket.dumpBackingMap();
}
waitingFor = "entry for Object5 not found";
getLogWriter().info(waitingFor);
return false;
}
if (!re.isTombstone()) {
if (!dumped) {
dumped = true;
bucket.dumpBackingMap();
}
waitingFor = "Object5 is not a tombstone but should be: " + re;
getLogWriter().info(waitingFor);
return false;
}
return true;
}
public String description() {
return waitingFor;
}
}, 30000, 5000, true);
return null;
}
});
}
private void createRegion(VM vm, final String regionName, final TestType typeOfTest) {
SerializableCallable createRegion = new SerializableCallable() {
@SuppressWarnings("deprecation")
public Object call() throws Exception {
// TombstoneService.VERBOSE = true;
AttributesFactory af = new AttributesFactory();
af.setDataPolicy(DataPolicy.PARTITION);
af.setPartitionAttributes((new PartitionAttributesFactory()).setTotalNumBuckets(2).setRedundantCopies(3).create());
switch (typeOfTest) {
case IN_MEMORY:
break;
case PERSISTENT:
af.setDataPolicy(DataPolicy.PERSISTENT_PARTITION);
break;
case OVERFLOW:
af.setEvictionAttributes(EvictionAttributes.createLRUEntryAttributes(
5, EvictionAction.OVERFLOW_TO_DISK));
break;
}
TestRegion = (LocalRegion)createRootRegion(regionName, af.create());
return null;
}
};
vm.invoke(createRegion);
}
}