blob: 4bbe63056119bcd1947f94d67f6626b1d47021a3 [file] [log] [blame]
/*=========================================================================
* Copyright (c) 2002-2014 Pivotal Software, Inc. All Rights Reserved.
* This product is protected by U.S. and international copyright
* and intellectual property laws. Pivotal products are covered by
* more patents listed at http://www.pivotal.io/patents.
*=========================================================================
*/
/**
* Test various distributed aspects of transactions,
* e.g. locking/reservation symantics that do not need multiple Region
* configurations. For those tests see
* <code>MultiVMRegionTestCase</code>.
*
*
* @author Mitch Thomas
* @since 4.0
* @see MultiVMRegionTestCase
*
*/
package com.gemstone.gemfire.cache30;
//import com.gemstone.gemfire.*;
import com.gemstone.gemfire.SystemFailure;
import com.gemstone.gemfire.cache.*;
import com.gemstone.gemfire.internal.cache.*;
import dunit.*;
import com.gemstone.gemfire.internal.cache.locks.TXLockService;
import com.gemstone.gemfire.internal.cache.locks.TXLockServiceImpl;
//import com.gemstone.gemfire.internal.cache.locks.TXLockId;
import com.gemstone.gemfire.internal.cache.locks.TXLockBatch;
import com.gemstone.gemfire.distributed.internal.InternalDistributedSystem;
import com.gemstone.gemfire.distributed.internal.ResourceEvent;
import com.gemstone.gemfire.distributed.internal.ResourceEventsListener;
import com.gemstone.gemfire.distributed.internal.locks.DLockService;
import com.gemstone.gemfire.distributed.internal.locks.DLockBatch;
import com.gemstone.gemfire.distributed.internal.membership.InternalDistributedMember;
import com.gemstone.gemfire.distributed.internal.membership.jgroup.MembershipManagerHelper;
import com.gemstone.org.jgroups.Event;
import com.gemstone.org.jgroups.JChannel;
import com.gemstone.org.jgroups.stack.Protocol;
import java.io.IOException;
import java.io.Serializable;
import java.util.HashSet;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import org.junit.Ignore;
public class TXDistributedDUnitTest extends CacheTestCase {
public TXDistributedDUnitTest(String name) {
super(name);
}
protected RegionAttributes getRegionAttributes() {
return this.getRegionAttributes(Scope.DISTRIBUTED_ACK);
}
protected RegionAttributes getRegionAttributes(Scope scope) {
AttributesFactory factory = new AttributesFactory();
factory.setScope(scope);
if (scope.isDistributedAck()) {
factory.setEarlyAck(false);
}
return factory.create();
}
/**
* Test a remote grantor
*/
public void testRemoteGrantor() throws Exception {
addExpectedException("killing members ds");
final CacheTransactionManager txMgr = this.getCache().getCacheTransactionManager();
final String rgnName = getUniqueName();
Region rgn = getCache().createRegion(rgnName, getRegionAttributes());
rgn.create("key", null);
invokeInEveryVM(new SerializableRunnable("testRemoteGrantor: initial configuration") {
public void run() {
try {
Region rgn1 = getCache().createRegion(rgnName, getRegionAttributes());
rgn1.put("key", "val0");
} catch (CacheException e) {
fail("While creating region", e);
}
}
});
Host host = Host.getHost(0);
VM vm0 = host.getVM(0);
// VM vm1 = host.getVM(1);
// VM vm2 = host.getVM(2);
vm0.invoke(new SerializableRunnable("testRemoteGrantor: remote grantor init") {
public void run() {
try {
Region rgn1 = getCache().getRegion(rgnName);
final CacheTransactionManager txMgr2 = getCache().getCacheTransactionManager();
txMgr2.begin();
rgn1.put("key", "val1");
txMgr2.commit();
assertNotNull(TXLockService.getDTLS());
assertTrue(TXLockService.getDTLS().isLockGrantor());
} catch (CacheException e) {
fail("While performing first transaction");
}
}
});
// fix for bug 38843 causes the DTLS to be created in every TX participant
assertNotNull(TXLockService.getDTLS());
assertFalse(TXLockService.getDTLS().isLockGrantor());
assertEquals("val1", rgn.getEntry("key").getValue());
vm0.invoke(new SerializableRunnable("Disconnect from DS, remote grantor death") {
public void run() {
try {
MembershipManagerHelper.crashDistributedSystem(getSystem());
} finally {
// Allow getCache() to re-establish a ds connection
closeCache();
}
}
});
// Make this VM the remote Grantor
txMgr.begin();
rgn.put("key", "val2");
txMgr.commit();
assertNotNull(TXLockService.getDTLS());
assertTrue(TXLockService.getDTLS().isLockGrantor());
SerializableRunnable remoteComm =
new SerializableRunnable("testRemoteGrantor: remote grantor commit") {
public void run() {
try {
Cache c = getCache();
CacheTransactionManager txMgr2 = c.getCacheTransactionManager();
Region rgn1 = c.getRegion(rgnName);
if (rgn1 == null) {
// This block should only execute on VM0
rgn1 = c.createRegion(rgnName, getRegionAttributes());
}
txMgr2.begin();
rgn1.put("key", "val3");
txMgr2.commit();
if (TXLockService.getDTLS() != null) {
assertTrue(!TXLockService.getDTLS().isLockGrantor());
}
} catch (CacheException e) {
fail("While creating region", e);
}
}
};
invokeInEveryVM(remoteComm);
// vm1.invoke(remoteComm);
// vm2.invoke(remoteComm);
assertNotNull(TXLockService.getDTLS());
assertTrue(TXLockService.getDTLS().isLockGrantor());
assertEquals("val3", rgn.getEntry("key").getValue());
rgn.destroyRegion();
}
/**
* Test the internal callbacks used for what else... testing
*/
public void testInternalCallbacks() throws Exception {
final CacheTransactionManager txMgr = this.getCache().getCacheTransactionManager();
final String rgnName1 = getUniqueName() + "_1";
final String rgnName2 = getUniqueName() + "_2";
final String rgnName3 = getUniqueName() + "_3";
Region rgn1 = getCache().createRegion(rgnName1, getRegionAttributes());
Host host = Host.getHost(0);
VM vm0 = host.getVM(0);
VM vm1 = host.getVM(1);
SerializableRunnable createRgn =
new SerializableRunnable("testInternalCallbacks: initial configuration") {
public void run() {
try {
Region rgn1a = getCache().createRegion(rgnName1, getRegionAttributes());
Region rgn2 = getCache().createRegion(rgnName2, getRegionAttributes());
Region rgn3 = getCache().createRegion(rgnName3, getRegionAttributes(Scope.DISTRIBUTED_NO_ACK));
rgn1a.create("key", null);
rgn2.create("key", null);
rgn3.create("key", null);
} catch (CacheException e) {
fail("While creating region", e);
}
}
};
vm0.invoke(createRgn);
vm1.invoke(createRgn);
// Standard commit check
txMgr.begin();
rgn1.put("key", "value0");
txMgr.commit();
SerializableRunnable checkRgn1 =
new SerializableRunnable("testInternalCallbacks: check rgn1 valus") {
public void run() {
Region rgn1a = getCache().getRegion(rgnName1);
assertNotNull(rgn1a);
assertEquals("value0", rgn1a.getEntry("key").getValue());
}
};
vm0.invoke(checkRgn1);
vm1.invoke(checkRgn1);
{
final byte cbSensors[] = {0,0,0,0,0,0,0,0,0};
txMgr.begin();
((TXStateProxyImpl)((TXManagerImpl)txMgr).getTXState()).forceLocalBootstrap();
setInternalCallbacks(((TXManagerImpl)txMgr).getTXState(), cbSensors);
rgn1.put("key", "value1");
txMgr.commit();
for(int i=cbSensors.length-3; i>=0; --i) {
assertEquals("Internal callback " + i + " was not called the expected number of times!",
(byte) 1, cbSensors[i]);
}
for(int i=cbSensors.length-1; i>cbSensors.length-3; --i) {
assertEquals("Internal \"during\" callback " + i + " invoked an unexpected number of times!",
(byte) 2, cbSensors[i]);
}
}
SerializableRunnable checkRgn1Again =
new SerializableRunnable("testInternalCallbacks: validate remote values") {
public void run() {
Region rgn1a = getCache().getRegion(rgnName1);
assertNotNull(rgn1a);
assertEquals("value1", rgn1a.getEntry("key").getValue());
}
};
vm0.invoke(checkRgn1Again);
vm1.invoke(checkRgn1Again);
// Try 2 regions
Region rgn2 = getCache().createRegion(rgnName2, getRegionAttributes());
txMgr.begin();
rgn1.put("key", "value2");
rgn2.put("key", "value2");
txMgr.commit();
SerializableRunnable checkRgn12 =
new SerializableRunnable("testInternalCallbacks: check rgn1 valus") {
public void run() {
Region rgn1a = getCache().getRegion(rgnName1);
assertNotNull(rgn1a);
assertEquals("value2", rgn1a.getEntry("key").getValue());
Region rgn2a = getCache().getRegion(rgnName2);
assertNotNull(rgn2a);
assertEquals("value2", rgn2a.getEntry("key").getValue());
}
};
vm0.invoke(checkRgn12);
vm1.invoke(checkRgn12);
{
final byte cbSensors[] = {0,0,0,0,0,0,0,0,0};
txMgr.begin();
((TXStateProxyImpl)((TXManagerImpl)txMgr).getTXState()).forceLocalBootstrap();
setInternalCallbacks(((TXManagerImpl)txMgr).getTXState(), cbSensors);
rgn1.put("key", "value3");
rgn2.put("key", "value3");
txMgr.commit();
for(int i=cbSensors.length-3; i>=0; i--) {
assertEquals("Internal callback " + i + " was not called the expected number of times!",
(byte) 1, cbSensors[i]);
}
for(int i=cbSensors.length-1; i> cbSensors.length-3; --i) {
assertEquals("Internal \"during\" callback " + i + " invoked an unexpected number of times!",
(byte) 2, cbSensors[i]);
}
}
SerializableRunnable checkRgn12Again =
new SerializableRunnable("testInternalCallbacks: validate both regions remote values") {
public void run() {
Region rgn1a = getCache().getRegion(rgnName1);
assertNotNull(rgn1a);
assertEquals("value3", rgn1a.getEntry("key").getValue());
Region rgn2a = getCache().getRegion(rgnName2);
assertNotNull(rgn2a);
assertEquals("value3", rgn2a.getEntry("key").getValue());
}
};
vm0.invoke(checkRgn12Again);
vm1.invoke(checkRgn12Again);
// Try a third region (D_NO_ACK)
Region rgn3 = getCache().createRegion(rgnName3, getRegionAttributes(Scope.DISTRIBUTED_NO_ACK));
txMgr.begin();
rgn1.put("key", "value4");
rgn2.put("key", "value4");
rgn3.put("key", "value4");
txMgr.commit();
SerializableRunnable checkRgn123 =
new SerializableRunnable("testInternalCallbacks: check rgn1 valus") {
public void run() {
Region rgn1a = getCache().getRegion(rgnName1);
assertNotNull(rgn1a);
assertEquals("value4", rgn1a.getEntry("key").getValue());
Region rgn2a = getCache().getRegion(rgnName2);
assertNotNull(rgn2a);
assertEquals("value4", rgn2a.getEntry("key").getValue());
Region rgn3a = getCache().getRegion(rgnName3);
assertNotNull(rgn3a);
assertEquals("value4", rgn3a.getEntry("key").getValue());
}
};
vm0.invoke(checkRgn123);
vm1.invoke(checkRgn123);
{
final byte cbSensors[] = {0,0,0,0,0,0,0,0,0};
txMgr.begin();
((TXStateProxyImpl)((TXManagerImpl)txMgr).getTXState()).forceLocalBootstrap();
setInternalCallbacks(((TXManagerImpl)txMgr).getTXState(), cbSensors);
rgn1.put("key", "value5");
rgn2.put("key", "value5");
rgn3.put("key", "value5");
txMgr.commit();
for(int i=cbSensors.length-3; i>=0; i--) {
assertEquals("Internal callback " + i + " was not called the expected number of times!",
(byte) 1, cbSensors[i]);
}
for(int i=cbSensors.length-1; i> cbSensors.length-3; --i) {
assertEquals("Internal \"during\" callback " + i + " invoked an unexpected number of times!",
(byte) 2, cbSensors[i]);
}
}
SerializableRunnable checkRgn123Again =
new SerializableRunnable("testInternalCallbacks: validate both regions remote values") {
public void run() {
Region rgn1a = getCache().getRegion(rgnName1);
assertNotNull(rgn1a);
assertEquals("value5", rgn1a.getEntry("key").getValue());
Region rgn2a = getCache().getRegion(rgnName2);
assertNotNull(rgn2a);
assertEquals("value5", rgn2a.getEntry("key").getValue());
Region rgn3a = getCache().getRegion(rgnName3);
assertNotNull(rgn3a);
assertEquals("value5", rgn3a.getEntry("key").getValue());
}
};
vm0.invoke(checkRgn123Again);
vm1.invoke(checkRgn123Again);
rgn1.destroyRegion();
rgn2.destroyRegion();
}
static final void setInternalCallbacks(TXStateInterface txp, final byte[] cbSensors) {
((TXStateProxyImpl)txp).forceLocalBootstrap();
TXState tx = (TXState)((TXStateProxyImpl)txp).getRealDeal(null,null);
assertEquals(9, cbSensors.length);
tx.setAfterReservation(new Runnable() {
public void run() {
cbSensors[0]++;
}
});
tx.setAfterConflictCheck(new Runnable() {
public void run() {
cbSensors[1]++;
}
});
tx.setAfterApplyChanges(new Runnable() {
public void run() {
cbSensors[2]++;
}
});
tx.setAfterReleaseLocalLocks(new Runnable() {
public void run() {
cbSensors[3]++;
}
});
tx.setAfterIndividualSend(new Runnable() {
public void run() {
cbSensors[4]++;
}
});
tx.setAfterIndividualCommitProcess(new Runnable() {
public void run() {
cbSensors[5]++;
}
});
tx.setAfterSend(new Runnable() {
public void run() {
cbSensors[6]++;
}
});
tx.setDuringIndividualSend(new Runnable() {
public void run() {
cbSensors[7]++;
}
});
tx.setDuringIndividualCommitProcess(new Runnable() {
public void run() {
cbSensors[8]++;
}
});
}
/**
* Test distributed ack transactions that consist only of
* data from loaded values
*/
public void testDACKLoadedMessage() throws Exception {
final CacheTransactionManager txMgr = this.getCache().getCacheTransactionManager();
final String rgnName = getUniqueName();
AttributesFactory factory = new AttributesFactory();
factory.setScope(Scope.DISTRIBUTED_ACK);
factory.setEarlyAck(false);
factory.setCacheLoader(new CacheLoader() {
public Object load(LoaderHelper helper) {
return "val" + helper.getArgument();
}
public void close() {}
});
Region rgn = getCache().createRegion(rgnName, factory.create());
invokeInEveryVM(new SerializableRunnable("testDACKLoadedMessage: intial configuration") {
public void run() {
try {
AttributesFactory factory2 = new AttributesFactory();
factory2.setScope(Scope.DISTRIBUTED_ACK);
factory2.setEarlyAck(false);
// factory.setDataPolicy(DataPolicy.REPLICATE);
factory2.setMirrorType(MirrorType.KEYS);
getCache().createRegion(rgnName, factory2.create());
} catch (CacheException e) {
fail("While creating region", e);
}
}
});
// Confirm the standard case
txMgr.begin();
rgn.put("key1", "val1");
txMgr.commit();
assertEquals("val1", rgn.getEntry("key1").getValue());
invokeInEveryVM(new SerializableRunnable("testDACKLoadedMessage: confirm standard case") {
public void run() {
Region rgn1 = getCache().getRegion(rgnName);
assertEquals("val1", rgn1.getEntry("key1").getValue());
}
});
// Confirm loaded value case
txMgr.begin();
rgn.get("key2", new Integer(2));
txMgr.commit();
assertEquals("val2", rgn.getEntry("key2").getValue());
invokeInEveryVM(new SerializableRunnable("testDACKLoadedMessage: confirm standard case") {
public void run() {
Region rgn1 = getCache().getRegion(rgnName);
assertEquals("val2", rgn1.getEntry("key2").getValue());
}
});
// This should use the ack w/ the lockid
txMgr.begin();
rgn.put("key3", "val3");
rgn.get("key4", new Integer(4));
txMgr.commit();
invokeInEveryVM(new SerializableRunnable("testDACKLoadedMessage: confirm standard case") {
public void run() {
Region rgn1 = getCache().getRegion(rgnName);
assertEquals("val3", rgn1.getEntry("key3").getValue());
assertEquals("val4", rgn1.getEntry("key4").getValue());
}
});
}
public void testHighAvailabilityFeatures() throws Exception {
addExpectedException("killing members ds");
// final CacheTransactionManager txMgr = this.getCache().getCacheTransactionManager();
// final TXManagerImpl txMgrImpl = (TXManagerImpl) txMgr;
final String rgnName = getUniqueName();
AttributesFactory factory = new AttributesFactory();
factory.setScope(Scope.DISTRIBUTED_ACK);
factory.setEarlyAck(false);
Region rgn = getCache().createRegion(rgnName, factory.create());
invokeInEveryVM(new SerializableRunnable("testHighAvailabilityFeatures: intial region configuration") {
public void run() {
try {
AttributesFactory factory2 = new AttributesFactory();
factory2.setScope(Scope.DISTRIBUTED_ACK);
factory2.setEarlyAck(false);
factory2.setDataPolicy(DataPolicy.REPLICATE);
getCache().createRegion(rgnName, factory2.create());
} catch (CacheException e) {
fail("While creating region", e);
}
}
});
// create entries
rgn.put("key0", "val0_0");
rgn.put("key1", "val1_0");
Host host = Host.getHost(0);
// This test assumes that there are at least three VMs; the origin and two recipients
assertTrue(host.getVMCount() >= 3);
final VM originVM = host.getVM(0);
// Test that there is no commit after a partial commit message
// send (only sent to a minority of the recipients)
originVM.invoke(new SerializableRunnable("Flakey DuringIndividualSend Transaction") {
public void run() {
final Region rgn1 = getCache().getRegion(rgnName);
assertNotNull(rgn1);
try {
final CacheTransactionManager txMgr2 = getCache().getCacheTransactionManager();
final CacheTransactionManager txMgrImpl = txMgr2;
txMgr2.begin();
// 1. setup an internal callback on originVM that will call
// disconnectFromDS() on the 2nd duringIndividualSend
// call.
((TXStateProxyImpl)((TXManagerImpl)txMgrImpl).getTXState()).forceLocalBootstrap();
TXState txState = (TXState)((TXStateProxyImpl)((TXManagerImpl)txMgrImpl).getTXState()).getRealDeal(null,null);
txState.setDuringIndividualSend(new Runnable() {
private int numCalled = 0;
public synchronized void run() {
++numCalled;
rgn1.getCache().getLogger().info("setDuringIndividualSend Runnable called " + numCalled + " times");
if (numCalled > 1) {
MembershipManagerHelper.crashDistributedSystem(getSystem());
}
}
});
rgn1.put("key0", "val0_1");
rgn1.put("key1", "val1_1");
// 2. commit a transaction in originVM, it will disconnect from the DS
txMgr2.commit();
}
catch (VirtualMachineError e) {
SystemFailure.initiateFailure(e);
throw e;
}
catch (Throwable e) {
rgn1.getCache().getLogger().warning("Ignoring Exception", e);
}
finally {
// Allow this VM to re-connect to the DS upon getCache() call
closeCache();
}
}
});
// 3. verify on all VMs that the transaction was not committed
final SerializableRunnable noChangeValidator =
new SerializableRunnable("testHighAvailabilityFeatures: validate no change in Region") {
public void run() {
Region rgn1 = getCache().getRegion(rgnName);
if (rgn1 == null) {
// Expect a null region from originVM
try {
AttributesFactory factory2 = new AttributesFactory();
factory2.setScope(Scope.DISTRIBUTED_ACK);
factory2.setEarlyAck(false);
factory2.setDataPolicy(DataPolicy.REPLICATE);
rgn1 = getCache().createRegion(rgnName, factory2.create());
} catch (CacheException e) {
fail("While creating region", e);
}
}
Region.Entry re = rgn1.getEntry("key0");
assertNotNull(re);
assertEquals("val0_0", re.getValue());
re = rgn1.getEntry("key1");
assertNotNull(re);
assertEquals("val1_0", re.getValue());
}
};
invokeInEveryVM(noChangeValidator);
// Test that there is no commit after sending to all recipients
// but prior to sending the "commit process" message
originVM.invoke(new SerializableRunnable("Flakey AfterIndividualSend Transaction") {
public void run() {
final Region rgn1 = getCache().getRegion(rgnName);
assertNotNull(rgn1);
try {
final CacheTransactionManager txMgr2 = getCache().getCacheTransactionManager();
final CacheTransactionManager txMgrImpl = txMgr2;
txMgr2.begin();
// 1. setup an internal callback on originVM that will call
// disconnectFromDS() on AfterIndividualSend
((TXStateProxyImpl)((TXManagerImpl)txMgrImpl).getTXState()).forceLocalBootstrap();
TXState txState = (TXState)((TXStateProxyImpl)((TXManagerImpl)txMgrImpl).getTXState()).getRealDeal(null,null);
txState.setAfterIndividualSend(new Runnable() {
public synchronized void run() {
MembershipManagerHelper.crashDistributedSystem(getSystem());
}
});
rgn1.put("key0", "val0_2");
rgn1.put("key1", "val1_2");
// 2. commit a transaction in originVM, it will disconnect from the DS
txMgr2.commit();
}
catch (VirtualMachineError e) {
SystemFailure.initiateFailure(e);
throw e;
}
catch (Throwable e) {
rgn1.getCache().getLogger().warning("Ignoring Exception", e);
}
finally {
// Allow this VM to re-connect to the DS upon getCache() call
closeCache();
}
}
});
// 3. verify on all VMs, including the origin, that the transaction was not committed
invokeInEveryVM(noChangeValidator);
// Test commit success upon a single commit process message received.
originVM.invoke(new SerializableRunnable("Flakey DuringIndividualCommitProcess Transaction") {
public void run() {
final Region rgn1 = getCache().getRegion(rgnName);
assertNotNull(rgn1);
try {
final CacheTransactionManager txMgr2 = getCache().getCacheTransactionManager();
final CacheTransactionManager txMgrImpl = txMgr2;
txMgr2.begin();
((TXStateProxyImpl)((TXManagerImpl)txMgrImpl).getTXState()).forceLocalBootstrap();
TXState txState = (TXState)((TXStateProxyImpl)((TXManagerImpl)txMgrImpl).getTXState()).getRealDeal(null,null);
// 1. setup an internal callback on originVM that will call
// disconnectFromDS() on the 2nd internalDuringIndividualCommitProcess
// call.
txState.setDuringIndividualCommitProcess(new Runnable() {
private int numCalled = 0;
public synchronized void run() {
++numCalled;
rgn1.getCache().getLogger().info("setDuringIndividualCommitProcess Runnable called " + numCalled + " times");
if (numCalled > 1) {
MembershipManagerHelper.crashDistributedSystem(getSystem());
}
}
});
rgn1.put("key0", "val0_3");
rgn1.put("key1", "val1_3");
// 2. commit a transaction in originVM, it will disconnect from the DS
txMgr2.commit();
}
catch (VirtualMachineError e) {
SystemFailure.initiateFailure(e);
throw e;
}
catch (Throwable e) {
rgn1.getCache().getLogger().warning("Ignoring Exception", e);
}
finally {
// Allow this VM to re-connect to the DS upon getCache() call
closeCache();
}
}
});
// 3. verify on all VMs that the transaction was committed (including the orgin, due to GII)
SerializableRunnable nonSoloChangeValidator1 =
new SerializableRunnable("testHighAvailabilityFeatures: validate v1 non-solo Region changes") {
public void run() {
Region rgn1 = getCache().getRegion(rgnName);
if (rgn1 == null) {
// Expect a null region from originVM
try {
AttributesFactory factory2 = new AttributesFactory();
factory2.setScope(Scope.DISTRIBUTED_ACK);
factory2.setEarlyAck(false);
factory2.setDataPolicy(DataPolicy.REPLICATE);
rgn1 = getCache().createRegion(rgnName, factory2.create());
} catch (CacheException e) {
fail("While creating region", e);
}
}
Region.Entry re = rgn1.getEntry("key0");
assertNotNull(re);
assertEquals("val0_3", re.getValue());
re = rgn1.getEntry("key1");
assertNotNull(re);
assertEquals("val1_3", re.getValue());
}
};
invokeInEveryVM(nonSoloChangeValidator1);
// Verify successful solo region commit after duringIndividualSend
// (same as afterIndividualSend).
// Create a region that only exists on the origin and another VM
final String soloRegionName = getUniqueName() + "_solo";
SerializableRunnable createSoloRegion =
new SerializableRunnable("testHighAvailabilityFeatures: solo region configuration") {
public void run() {
try {
AttributesFactory factory2 = new AttributesFactory();
factory2.setScope(Scope.DISTRIBUTED_ACK);
factory2.setEarlyAck(false);
factory2.setDataPolicy(DataPolicy.REPLICATE);
Region rgn1 = getCache().createRegion(soloRegionName, factory2.create());
rgn1.put("soloKey0", "soloVal0_0");
rgn1.put("soloKey1", "soloVal1_0");
} catch (CacheException e) {
fail("While creating region", e);
}
}
};
final VM soloRegionVM = host.getVM(1);
originVM.invoke(createSoloRegion);
soloRegionVM.invoke(createSoloRegion);
originVM.invoke(new SerializableRunnable("Flakey solo region DuringIndividualSend Transaction") {
public void run() {
final Region soloRgn = getCache().getRegion(soloRegionName);
assertNotNull(soloRgn);
try {
final CacheTransactionManager txMgr2 = getCache().getCacheTransactionManager();
final CacheTransactionManager txMgrImpl = txMgr2;
txMgr2.begin();
// 1. setup an internal callback on originVM that will call
// disconnectFromDS() on the 2nd duringIndividualSend
// call.
((TXStateProxyImpl)((TXManagerImpl)txMgrImpl).getTXState()).forceLocalBootstrap();
TXState txState = (TXState)((TXStateProxyImpl)((TXManagerImpl)txMgrImpl).getTXState()).getRealDeal(null,null);
txState.setDuringIndividualSend(new Runnable() {
private int numCalled = 0;
public synchronized void run() {
++numCalled;
soloRgn.getCache().getLogger().info("setDuringIndividualSend Runnable called " + numCalled + " times");
if (numCalled > 1) {
MembershipManagerHelper.crashDistributedSystem(getSystem());
}
}
});
soloRgn.put("soloKey0", "soloVal0_1");
soloRgn.put("soloKey1", "soloVal1_1");
// 2. commit a transaction in originVM, it will disconnect from the DS
txMgr2.commit();
}
catch (VirtualMachineError e) {
SystemFailure.initiateFailure(e);
throw e;
}
catch (Throwable e) {
soloRgn.getCache().getLogger().warning("Ignoring Exception", e);
}
finally {
// Allow this VM to re-connect to the DS upon getCache() call
closeCache();
}
}
});
// 3. verify on the soloRegionVM that the transaction was committed
final SerializableRunnable soloRegionCommitValidator1 =
new SerializableRunnable("testHighAvailabilityFeatures: validate successful v1 commit in solo Region") {
public void run() {
Region soloRgn = getCache().getRegion(soloRegionName);
if (soloRgn == null) {
// Expect a null region from originVM
try {
AttributesFactory factory2 = new AttributesFactory();
factory2.setScope(Scope.DISTRIBUTED_ACK);
factory2.setEarlyAck(false);
factory2.setDataPolicy(DataPolicy.REPLICATE);
soloRgn = getCache().createRegion(soloRegionName, factory2.create());
} catch (CacheException e) {
fail("While creating region ", e);
}
}
Region.Entry re = soloRgn.getEntry("soloKey0");
assertNotNull(re);
assertEquals("soloVal0_1", re.getValue());
re = soloRgn.getEntry("soloKey1");
assertNotNull(re);
assertEquals("soloVal1_1", re.getValue());
}
};
originVM.invoke(soloRegionCommitValidator1);
soloRegionVM.invoke(soloRegionCommitValidator1);
// verify no change in nonSolo region, re-establish region in originVM
invokeInEveryVM(nonSoloChangeValidator1);
// Verify no commit for failed send (afterIndividualSend) for solo
// Region combined with non-solo Region
originVM.invoke(new SerializableRunnable("Flakey mixed (solo+non-solo) region DuringIndividualSend Transaction") {
public void run() {
final Region rgn1 = getCache().getRegion(rgnName);
assertNotNull(rgn1);
final Region soloRgn = getCache().getRegion(soloRegionName);
assertNotNull(soloRgn);
try {
final CacheTransactionManager txMgr2 = getCache().getCacheTransactionManager();
final CacheTransactionManager txMgrImpl = txMgr2;
txMgr2.begin();
// 1. setup an internal callback on originVM that will call
// disconnectFromDS() on the afterIndividualSend
// call.
((TXStateProxyImpl)((TXManagerImpl)txMgrImpl).getTXState()).forceLocalBootstrap();
TXState txState = (TXState)((TXStateProxyImpl)((TXManagerImpl)txMgrImpl).getTXState()).getRealDeal(null,null);
txState.setAfterIndividualSend(new Runnable() {
public synchronized void run() {
MembershipManagerHelper.crashDistributedSystem(getSystem());
}
});
rgn1.put("key0", "val0_4");
rgn1.put("key1", "val1_4");
soloRgn.put("soloKey0", "soloVal0_2");
soloRgn.put("soloKey1", "soloVal1_2");
// 2. commit a transaction in originVM, it will disconnect from the DS
txMgr2.commit();
}
catch (VirtualMachineError e) {
SystemFailure.initiateFailure(e);
throw e;
}
catch (Throwable e) {
rgn1.getCache().getLogger().warning("Ignoring Exception", e);
}
finally {
// Allow this VM to re-connect to the DS upon getCache() call
closeCache();
}
}
});
// Origin and Solo Region VM should be the same as last validation
originVM.invoke(soloRegionCommitValidator1);
soloRegionVM.invoke(soloRegionCommitValidator1);
invokeInEveryVM(nonSoloChangeValidator1);
// Verify commit after sending a single
// (duringIndividualCommitProcess) commit process for solo Region
// combined with non-solo Region
originVM.invoke(new SerializableRunnable("Flakey mixed (solo+non-solo) region DuringIndividualCommitProcess Transaction") {
public void run() {
final Region rgn1 = getCache().getRegion(rgnName);
assertNotNull(rgn1);
final Region soloRgn = getCache().getRegion(soloRegionName);
assertNotNull(soloRgn);
try {
final CacheTransactionManager txMgr2 = getCache().getCacheTransactionManager();
final CacheTransactionManager txMgrImpl = txMgr2;
txMgr2.begin();
// 1. setup an internal callback on originVM that will call
// disconnectFromDS() on the afterIndividualSend
// call.
((TXStateProxyImpl)((TXManagerImpl)txMgrImpl).getTXState()).forceLocalBootstrap();
TXState txState = (TXState)((TXStateProxyImpl)((TXManagerImpl)txMgrImpl).getTXState()).getRealDeal(null,null);
txState.setAfterIndividualSend(new Runnable() {
private int numCalled = 0;
public synchronized void run() {
++numCalled;
rgn1.getCache().getLogger().info("setDuringIndividualCommitProcess Runnable called " + numCalled + " times");
if (numCalled > 1) {
MembershipManagerHelper.crashDistributedSystem(getSystem());
}
}
});
rgn1.put("key0", "val0_5");
rgn1.put("key1", "val1_5");
soloRgn.put("soloKey0", "soloVal0_3");
soloRgn.put("soloKey1", "soloVal1_3");
// 2. commit a transaction in originVM, it will disconnect from the DS
txMgr2.commit();
}
catch (VirtualMachineError e) {
SystemFailure.initiateFailure(e);
throw e;
}
catch (Throwable e) {
rgn1.getCache().getLogger().warning("Ignoring Exception", e);
}
finally {
// Allow this VM to re-connect to the DS upon getCache() call
closeCache();
}
}
});
final SerializableRunnable soloRegionCommitValidator2 =
new SerializableRunnable("testHighAvailabilityFeatures: validate successful v2 commit in solo Region") {
public void run() {
Region soloRgn = getCache().getRegion(soloRegionName);
if (soloRgn == null) {
// Expect a null region from originVM
try {
AttributesFactory factory2 = new AttributesFactory();
factory2.setScope(Scope.DISTRIBUTED_ACK);
factory2.setEarlyAck(false);
factory2.setDataPolicy(DataPolicy.REPLICATE);
soloRgn = getCache().createRegion(soloRegionName, factory2.create());
} catch (CacheException e) {
fail("While creating region ", e);
}
}
Region.Entry re = soloRgn.getEntry("soloKey0");
assertNotNull(re);
assertEquals("soloVal0_3", re.getValue());
re = soloRgn.getEntry("soloKey1");
assertNotNull(re);
assertEquals("soloVal1_3", re.getValue());
}
};
originVM.invoke(soloRegionCommitValidator2);
soloRegionVM.invoke(soloRegionCommitValidator2);
SerializableRunnable nonSoloChangeValidator2 =
new SerializableRunnable("testHighAvailabilityFeatures: validate v2 non-solo Region changes") {
public void run() {
Region rgn1 = getCache().getRegion(rgnName);
if (rgn1 == null) {
// Expect a null region from originVM
try {
AttributesFactory factory2 = new AttributesFactory();
factory2.setScope(Scope.DISTRIBUTED_ACK);
factory2.setEarlyAck(false);
factory2.setDataPolicy(DataPolicy.REPLICATE);
rgn1 = getCache().createRegion(rgnName, factory2.create());
} catch (CacheException e) {
fail("While creating region", e);
}
}
Region.Entry re = rgn1.getEntry("key0");
assertNotNull(re);
assertEquals("val0_5", re.getValue());
re = rgn1.getEntry("key1");
assertNotNull(re);
assertEquals("val1_5", re.getValue());
}
};
invokeInEveryVM(nonSoloChangeValidator2);
}
/**
* A class used in testLockBatchParticipantsUpdate to pause a transaction in the
* afterResrvation and afterSend states.
*/
static public class PausibleTX implements Runnable {
public boolean isRunning = false;
public String rgnName = null;
public Cache myCache = null;
public Object key = null;
public Object value = null;
public boolean getIsRunning() {
return this.isRunning;
}
public void run() {
Region rgn = this.myCache.getRegion(this.rgnName);
final CacheTransactionManager txMgr = this.myCache.getCacheTransactionManager();
txMgr.begin();
((TXStateProxyImpl)((TXManagerImpl)txMgr).getTXState()).forceLocalBootstrap();
TXState txState = (TXState)((TXStateProxyImpl)((TXManagerImpl)txMgr).getTXState()).getRealDeal(null,null);
txState.setAfterReservation(new Runnable() {
public void run() {
try {
synchronized (PausibleTX.class) {
PausibleTX.this.isRunning = true;
// Notify the thread that created this, that we are ready
PausibleTX.class.notifyAll();
// Wait for the controller to start a GII and let us proceed
PausibleTX.class.wait();
}
}
catch (InterruptedException ie) {
// PausibleTX.this.myCache.getLogger().info("Why was I interrupted? " + ie);
fail("interrupted");
}
}
});
txState.setAfterSend(new Runnable() {
public void run() {
try {
synchronized (PausibleTX.class) {
// Notify the controller that we have sent the TX data (and the
// update)
PausibleTX.class.notifyAll();
// Wait until the controller has determined in fact the update
// took place
PausibleTX.class.wait();
}
}
catch (InterruptedException ie) {
// PausibleTX.this.myCache.getLogger().info("Why was I interrupted? " + ie);
fail("interrupted");
}
}
});
try {
rgn.put(key, value);
txMgr.commit();
} catch (CommitConflictException cce) {
fail("Did not expect commit conflict exception when sending updates to new members in PausibleTX" + cce);
// } catch (CacheException ce) {
// fail("Did not expect cache exception when sending updates to new members in PausibleTX" + ce);
}
}
}
/**
* Returns the GemFire system ID of the VM on which this method is run
*/
public static Serializable getSystemId() {
Serializable ret = null;
if (DistributedTestCase.system != null) {
ret = DistributedTestCase.system.getDistributionManager().getId();
}
return ret;
}
static HashSet preTXSystemIds;
public static void setPreTXSystemIds(HashSet ids) {
TXDistributedDUnitTest.preTXSystemIds = ids;
}
static HashSet postTXSystemIds;
public static void setPostTXSystemIds(HashSet ids) {
TXDistributedDUnitTest.postTXSystemIds = ids;
}
static Serializable txHostId;
public static void setTXHostSystemId(Serializable id) {
TXDistributedDUnitTest.txHostId = id;
}
/**
* Test update of lock batch participants (needed when new members are
* discovered between a commit's locking phase and the applicatoin of the
* Region's data. See bug 32999
*/
public void testLockBatchParticipantsUpdate() throws Exception {
// final CacheTransactionManager txMgr = this.getCache().getCacheTransactionManager();
final String rgnName = getUniqueName();
Region rgn = getCache().createRegion(rgnName, getRegionAttributes());
rgn.create("key", null);
Host host = Host.getHost(0);
VM vm0 = host.getVM(0);
VM vm1 = host.getVM(1);
VM vm2 = host.getVM(2);
SerializableRunnable initRegions =
new SerializableRunnable("testLockBatchParticipantsUpdate: initial configuration") {
public void run() {
try {
Region rgn1 = getCache().createRegion(rgnName, getRegionAttributes());
rgn1.create("key", null);
} catch (CacheException e) {
fail("While creating region", e);
}
}
};
vm0.invoke(initRegions);
vm1.invoke(initRegions);
rgn.put("key", "val1");
// Connect vm2 also since it may have been shutdown when logPerTest
// is turned on
vm2.invoke(new SerializableRunnable("connect vm2 if not connected") {
public void run() {
getCache();
}
});
// Make VM0 the Grantor
vm0.invoke(new SerializableRunnable("testLockBatchParticipantsUpdate: remote grantor init") {
public void run() {
try {
Region rgn1 = getCache().getRegion(rgnName);
final CacheTransactionManager txMgr2 = getCache().getCacheTransactionManager();
assertEquals("val1", rgn1.getEntry("key").getValue());
txMgr2.begin();
rgn1.put("key", "val2");
txMgr2.commit();
assertNotNull(TXLockService.getDTLS());
assertTrue(TXLockService.getDTLS().isLockGrantor());
} catch (CacheException e) {
fail("While performing first transaction");
}
}
});
// fix for bug 38843 causes the DTLS to be created in every TX participant
assertNotNull(TXLockService.getDTLS());
assertFalse(TXLockService.getDTLS().isLockGrantor());
assertEquals("val2", rgn.getEntry("key").getValue());
// Build sets of System Ids and set them up on VM0 for future batch member checks
HashSet txMembers = new HashSet(4);
txMembers.add(getSystemId());
txMembers.add(vm0.invoke(TXDistributedDUnitTest.class, "getSystemId"));
vm0.invoke(TXDistributedDUnitTest.class, "setPreTXSystemIds", new Object[] {txMembers});
txMembers.add(vm2.invoke(TXDistributedDUnitTest.class, "getSystemId"));
vm0.invoke(TXDistributedDUnitTest.class, "setPostTXSystemIds", new Object[] {txMembers});
// Don't include the tx host in the batch member set(s)
Serializable vm1HostId = (Serializable) vm1.invoke(TXDistributedDUnitTest.class, "getSystemId");
vm0.invoke(TXDistributedDUnitTest.class, "setTXHostSystemId", new Object[] {vm1HostId});
// Create a TX on VM1 (such that it will ask for locks on VM0) that uses the callbacks
// to pause and give us time to start a GII process on another VM
vm1.invoke(new SerializableRunnable("testLockBatchParticipantsUpdate: slow tx (one that detects new member)") {
public void run() {
// fix for bug 38843 causes the DTLS to be created in every TX participant
assertNotNull(TXLockService.getDTLS());
assertFalse(TXLockService.getDTLS().isLockGrantor());
PausibleTX pauseTXRunnable = new PausibleTX();
pauseTXRunnable.rgnName = rgnName;
pauseTXRunnable.myCache = getCache();
pauseTXRunnable.key = "key";
pauseTXRunnable.value = "val3";
new Thread(pauseTXRunnable, "PausibleTX Thread").start();
synchronized(PausibleTX.class) {
while(!pauseTXRunnable.getIsRunning()) {
try {
PausibleTX.class.wait();
}
catch (InterruptedException ie) {
fail("Did not expect " + ie);
}
}
}
}
});
// Verify that the lock batch exists VM0 and has the size we expect
vm0.invoke(new SerializableRunnable("testLockBatchParticipantsUpdate: Verify lock batch exists on VM0 with expected size") {
public void run() {
getCache().getRegion(rgnName);
TXLockServiceImpl dtls = (TXLockServiceImpl) TXLockService.getDTLS();
assertNotNull(dtls);
assertTrue(dtls.isLockGrantor());
DLockService dLockSvc = dtls.getInternalDistributedLockService();
assertNotNull(TXDistributedDUnitTest.txHostId);
DLockBatch[] batches = dLockSvc.getGrantor().getLockBatches((InternalDistributedMember)TXDistributedDUnitTest.txHostId);
assertEquals(batches.length, 1);
TXLockBatch txLockBatch = (TXLockBatch) batches[0];
assertNotNull(txLockBatch);
assertNotNull(TXDistributedDUnitTest.preTXSystemIds);
assertTrue("Members in lock batch " + txLockBatch.getParticipants() + " not the same as " + TXDistributedDUnitTest.preTXSystemIds,
txLockBatch.getParticipants().equals(TXDistributedDUnitTest.preTXSystemIds));
}
});
// Start a GII process on VM2
vm2.invoke(new SerializableRunnable("testLockBatchParticipantsUpdate: start GII") {
public void run() {
try {
AttributesFactory factory = new AttributesFactory();
factory.setScope(Scope.DISTRIBUTED_ACK);
factory.setEarlyAck(false);
factory.setDataPolicy(DataPolicy.REPLICATE);
getCache().createRegion(rgnName, factory.create());
} catch (CacheException e) {
fail("While creating region", e);
}
}
});
// Notify TX on VM1 so that it can continue
vm1.invoke(new SerializableRunnable("testLockBatchParticipantsUpdate: Notfiy VM1 TX to continue") {
public void run() {
synchronized(PausibleTX.class) {
// Notify VM1 that it should proceed to the TX send
PausibleTX.class.notifyAll();
// Wait until VM1 has sent the TX
try {
PausibleTX.class.wait();
}
catch (InterruptedException ie) {
fail("Did not expect " + ie);
}
}
}
});
// Verify that the batch on VM0 has added VM2 into the set
vm0.invoke(new SerializableRunnable("testLockBatchParticipantsUpdate: Verify lock batch contains VM2") {
public void run() {
getCache().getRegion(rgnName);
TXLockServiceImpl dtls = (TXLockServiceImpl) TXLockService.getDTLS();
assertNotNull(dtls);
assertTrue(dtls.isLockGrantor());
DLockService dLockSvc = dtls.getInternalDistributedLockService();
assertNotNull(TXDistributedDUnitTest.txHostId);
DLockBatch[] batches = dLockSvc.getGrantor().getLockBatches((InternalDistributedMember)TXDistributedDUnitTest.txHostId);
assertEquals(batches.length, 1);
TXLockBatch txLockBatch = (TXLockBatch) batches[0];
assertNotNull(txLockBatch);
assertNotNull(TXDistributedDUnitTest.preTXSystemIds);
assertTrue("Members in lock batch " + txLockBatch.getParticipants() + " not the same as " + TXDistributedDUnitTest.postTXSystemIds,
txLockBatch.getParticipants().equals(TXDistributedDUnitTest.postTXSystemIds));
}
});
// fix for bug 38843 causes the DTLS to be created in every TX participant
assertNotNull(TXLockService.getDTLS());
assertFalse(TXLockService.getDTLS().isLockGrantor());
assertEquals("val3", rgn.getEntry("key").getValue());
// Notify TX on VM1 that it can go ahead and complete the TX
vm1.invoke(new SerializableRunnable("testLockBatchParticipantsUpdate: Notfiy VM1 TX to finish") {
public void run() {
synchronized(PausibleTX.class) {
// Notify VM1 that it should finish the TX
PausibleTX.class.notifyAll();
}
}
});
rgn.destroyRegion();
}
/**
* Hitachi bug 38809: Applying an exception to a remote VM fails due to
* an IOException on a Region configured for LRU Overflow
*/
public static final String TROUBLE_KEY = "GoBoom";
public static class TXTroubleMaker implements LocalRegion.TestCallable {
// private final Region r;
public void call(LocalRegion r, Operation op, RegionEntry re) {
if (TROUBLE_KEY.equals(re.getKey())) {
throw new DiskAccessException(TROUBLE_KEY, r);
}
}
}
public static class ShutdownListener implements ResourceEventsListener {
CountDownLatch latch = new CountDownLatch(1);
@Override
public void handleEvent(ResourceEvent event, Object resource) {
if (event.equals(ResourceEvent.CACHE_REMOVE)) {
try {
latch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public void unblockShutdown() {
this.latch.countDown();
}
}
@Ignore("Disabled for 51260")
public void DISABLED_testRemoteCommitFailure() {
try {
disconnectAllFromDS();
final String rgnName1= getUniqueName() + "_1";
final String rgnName2 = getUniqueName() + "_2";
final String diskStoreName = getUniqueName() + "_store";
Host host = Host.getHost(0);
VM origin = host.getVM(0);
VM trouble1 = host.getVM(1);
VM trouble2 = host.getVM(2);
VM noTrouble = host.getVM(3);
CacheSerializableRunnable initRegions =
new CacheSerializableRunnable("Initialize no trouble regions") {
@Override
public void run2() {
getCache().createDiskStoreFactory().setDiskDirs(getDiskDirs()).create(diskStoreName);
TXManagerImpl.ALLOW_PERSISTENT_TRANSACTIONS = true;
AttributesFactory af = new AttributesFactory();
af.setDataPolicy(DataPolicy.PERSISTENT_REPLICATE);
af.setScope(Scope.DISTRIBUTED_ACK);
af.setDiskStoreName(diskStoreName);
getCache().createRegion(rgnName1, af.create());
getCache().createRegion(rgnName2, af.create());
}
};
origin.invoke(initRegions);
noTrouble.invoke(initRegions);
SerializableRunnable initTroulbeRegions =
new CacheSerializableRunnable("Initialize regions that cause trouble") {
@Override
public void run2() {
GemFireCacheImpl gfc = (GemFireCacheImpl) getCache();
InternalRegionArguments ira = new InternalRegionArguments().setTestCallable(new TXTroubleMaker());
try {
getCache().createDiskStoreFactory().setDiskDirs(getDiskDirs()).create(diskStoreName);
TXManagerImpl.ALLOW_PERSISTENT_TRANSACTIONS = true;
AttributesFactory af = new AttributesFactory();
af.setDataPolicy(DataPolicy.PERSISTENT_REPLICATE);
af.setScope(Scope.DISTRIBUTED_ACK);
af.setDiskStoreName(diskStoreName);
gfc.createVMRegion(rgnName1, af.create(), ira);
gfc.createVMRegion(rgnName2, af.create(), ira);
gfc.getDistributedSystem().addResourceListener(new ShutdownListener());
} catch (IOException ioe) {
fail(ioe.toString());
}
catch (TimeoutException e) {
fail(e.toString());
}
catch (ClassNotFoundException e) {
fail(e.toString());
}
}
};
trouble1.invoke(initTroulbeRegions);
trouble2.invoke(initTroulbeRegions);
SerializableRunnable doTransaction =
new CacheSerializableRunnable("Run failing transaction") {
@Override
public void run2() {
Cache c = getCache();
Region r1 = c.getRegion(rgnName1);
assertNotNull(r1);
Region r2 = c.getRegion(rgnName2);
assertNotNull(r2);
CacheTransactionManager txmgr = c.getCacheTransactionManager();
txmgr.begin();
r1.put("k1", "k1");
r1.put("k2", "k2");
r1.put(TROUBLE_KEY, TROUBLE_KEY);
r2.put("k1", "k1");
r2.put("k2", "k2");
r2.put(TROUBLE_KEY, TROUBLE_KEY);
try {
txmgr.commit();
fail("Expected an tx incomplete exception");
} catch (CommitIncompleteException yay) {
String msg = yay.getMessage();
// getLogWriter().info("failing exception", yay);
// Each region on a trouble VM should be mentioned (two regions per trouble VM)
int ind=0, match=0;
while((ind = msg.indexOf(rgnName1, ind)) >= 0) {
ind++; match++;
}
assertEquals(2, match);
ind=match=0;
while((ind = msg.indexOf(rgnName2, ind)) >= 0) {
ind++; match++;
}
assertEquals(2, match);
// DiskAccessExcpetions should be mentioned four times
ind=match=0;
while((ind = msg.indexOf(DiskAccessException.class.getName(), ind)) >= 0) {
ind++; match++;
}
assertEquals(4, match);
}
}
};
ExpectedException ee = null;
try {
ee = addExpectedException(DiskAccessException.class.getName() + "|" +
CommitIncompleteException.class.getName() + "|" +
CommitReplyException.class.getName());
origin.invoke(doTransaction);
} finally {
if (ee!=null) ee.remove();
}
SerializableCallable allowCacheToShutdown = new SerializableCallable() {
@Override
public Object call() throws Exception {
GemFireCacheImpl cache = (GemFireCacheImpl) getCache();
List<ResourceEventsListener> listeners = cache.getDistributedSystem().getResourceListeners();
for (ResourceEventsListener l : listeners) {
if (l instanceof ShutdownListener) {
ShutdownListener shutListener = (ShutdownListener) l;
shutListener.unblockShutdown();
}
}
return null;
}
};
trouble1.invoke(allowCacheToShutdown);
trouble2.invoke(allowCacheToShutdown);
// Assert proper content on failing VMs
SerializableRunnable assertTroubledContent =
new CacheSerializableRunnable("Assert partail commit data") {
@Override
public void run2() {
final Cache c = getCache();
waitForCriterion(new WaitCriterion() {
@Override
public boolean done() {
return c.getRegion(rgnName1) == null;
}
@Override
public String description() {
return null;
}
}, 30000, 1000, true);
Region r2 = c.getRegion(rgnName2);
assertNull(r2);
}
};
trouble1.invoke(assertTroubledContent);
trouble2.invoke(assertTroubledContent);
// Assert proper content on successful VMs
SerializableRunnable assertSuccessfulContent =
new CacheSerializableRunnable("Assert complete commit of data on successful VMs") {
@Override
public void run2() {
Cache c = getCache();
{
Region r1 = c.getRegion(rgnName1);
assertNotNull(r1);
assertEquals("k1", r1.getEntry("k1").getValue());
assertEquals("k2", r1.getEntry("k2").getValue());
assertEquals(TROUBLE_KEY, r1.getEntry(TROUBLE_KEY).getValue());
}
{
Region r2 = c.getRegion(rgnName2);
assertNotNull(r2);
assertEquals("k1", r2.getEntry("k1").getValue());
assertEquals("k2", r2.getEntry("k2").getValue());
assertEquals(TROUBLE_KEY, r2.getEntry(TROUBLE_KEY).getValue());
}
}
};
noTrouble.invoke(assertSuccessfulContent);
// Assert no content on originating VM
SerializableRunnable assertNoContent =
new CacheSerializableRunnable("Assert data survives on origin VM") {
@Override
public void run2() {
Cache c = getCache();
{
Region r1 = c.getRegion(rgnName1);
assertNotNull(r1);
assertNotNull(r1.getEntry("k1"));
assertNotNull(r1.getEntry("k2"));
assertNotNull(r1.getEntry(TROUBLE_KEY));
}
{
Region r2 = c.getRegion(rgnName2);
assertNotNull(r2);
assertNotNull(r2.getEntry("k1"));
assertNotNull(r2.getEntry("k2"));
assertNotNull(r2.getEntry(TROUBLE_KEY));
}
}
};
origin.invoke(assertNoContent);
} finally {
invokeInEveryVM(new SerializableCallable() {
@Override
public Object call() throws Exception {
TXManagerImpl.ALLOW_PERSISTENT_TRANSACTIONS = false;
return null;
}
});
}
}
}