/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.geode.cache30;
import static org.apache.geode.distributed.ConfigurationProperties.ENABLE_NETWORK_PARTITION_DETECTION;
import static org.apache.geode.distributed.ConfigurationProperties.LOG_LEVEL;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.io.Serializable;
import java.util.HashSet;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import org.junit.Ignore;
import org.junit.Test;
import org.apache.geode.SystemFailure;
import org.apache.geode.cache.AttributesFactory;
import org.apache.geode.cache.Cache;
import org.apache.geode.cache.CacheException;
import org.apache.geode.cache.CacheLoader;
import org.apache.geode.cache.CacheTransactionManager;
import org.apache.geode.cache.CommitConflictException;
import org.apache.geode.cache.CommitIncompleteException;
import org.apache.geode.cache.DataPolicy;
import org.apache.geode.cache.DiskAccessException;
import org.apache.geode.cache.LoaderHelper;
import org.apache.geode.cache.MirrorType;
import org.apache.geode.cache.Operation;
import org.apache.geode.cache.Region;
import org.apache.geode.cache.RegionAttributes;
import org.apache.geode.cache.Scope;
import org.apache.geode.distributed.internal.ResourceEvent;
import org.apache.geode.distributed.internal.ResourceEventsListener;
import org.apache.geode.distributed.internal.locks.DLockBatch;
import org.apache.geode.distributed.internal.locks.DLockService;
import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
import org.apache.geode.distributed.internal.membership.api.MembershipManagerHelper;
import org.apache.geode.internal.cache.CommitReplyException;
import org.apache.geode.internal.cache.GemFireCacheImpl;
import org.apache.geode.internal.cache.InternalRegionFactory;
import org.apache.geode.internal.cache.LocalRegion;
import org.apache.geode.internal.cache.RegionEntry;
import org.apache.geode.internal.cache.TXManagerImpl;
import org.apache.geode.internal.cache.TXState;
import org.apache.geode.internal.cache.TXStateInterface;
import org.apache.geode.internal.cache.TXStateProxyImpl;
import org.apache.geode.internal.cache.locks.TXLockBatch;
import org.apache.geode.internal.cache.locks.TXLockService;
import org.apache.geode.internal.cache.locks.TXLockServiceImpl;
import org.apache.geode.test.awaitility.GeodeAwaitility;
import org.apache.geode.test.dunit.Assert;
import org.apache.geode.test.dunit.Host;
import org.apache.geode.test.dunit.IgnoredException;
import org.apache.geode.test.dunit.Invoke;
import org.apache.geode.test.dunit.LogWriterUtils;
import org.apache.geode.test.dunit.SerializableCallable;
import org.apache.geode.test.dunit.SerializableRunnable;
import org.apache.geode.test.dunit.VM;
import org.apache.geode.test.dunit.WaitCriterion;
import org.apache.geode.test.dunit.cache.internal.JUnit4CacheTestCase;
/**
 * Test various distributed aspects of transactions, e.g. locking/reservation semantics that do not
 * need multiple Region configurations. For those tests see <code>MultiVMRegionTestCase</code>.
 *
 * @since GemFire 4.0
 * @see MultiVMRegionTestCase
 */
public class TXDistributedDUnitTest extends JUnit4CacheTestCase {
protected <K, V> RegionAttributes<K, V> getRegionAttributes() {
return getRegionAttributes(Scope.DISTRIBUTED_ACK);
}
protected <K, V> RegionAttributes<K, V> getRegionAttributes(Scope scope) {
AttributesFactory<K, V> factory = new AttributesFactory<>();
factory.setScope(scope);
if (scope.isDistributedAck()) {
factory.setEarlyAck(false);
}
return factory.create();
}
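
  // Note: AttributesFactory is deprecated in newer Geode releases. A minimal sketch of the
  // equivalent setup via the public RegionFactory API (kept as a comment so the test's
  // behavior is unchanged; names per the standard Cache/RegionFactory interfaces) would be:
  //
  //   RegionFactory<K, V> rf = getCache().createRegionFactory();
  //   rf.setScope(scope);
  //   Region<K, V> region = rf.create(regionName);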
/**
* Test a remote grantor
*/
@Test
public void testRemoteGrantor() throws Exception {
IgnoredException.addIgnoredException("killing members ds");
final CacheTransactionManager txMgr = getCache().getCacheTransactionManager();
final String rgnName = getUniqueName();
Region rgn = getCache().createRegion(rgnName, getRegionAttributes());
rgn.create("key", null);
Invoke.invokeInEveryVM(new SerializableRunnable("testRemoteGrantor: initial configuration") {
@Override
public void run() {
try {
Region rgn1 = getCache().createRegion(rgnName, getRegionAttributes());
rgn1.put("key", "val0");
} catch (CacheException e) {
Assert.fail("While creating region", e);
}
}
});
Host host = Host.getHost(0);
VM vm0 = host.getVM(0);
vm0.invoke(new SerializableRunnable("testRemoteGrantor: remote grantor init") {
@Override
public void run() {
try {
Region rgn1 = getCache().getRegion(rgnName);
final CacheTransactionManager txMgr2 = getCache().getCacheTransactionManager();
txMgr2.begin();
rgn1.put("key", "val1");
txMgr2.commit();
assertNotNull(TXLockService.getDTLS());
assertTrue(TXLockService.getDTLS().isLockGrantor());
} catch (CacheException e) {
fail("While performing first transaction");
}
}
});
// fix for bug 38843 causes the DTLS to be created in every TX participant
assertNotNull(TXLockService.getDTLS());
assertFalse(TXLockService.getDTLS().isLockGrantor());
assertEquals("val1", rgn.getEntry("key").getValue());
vm0.invoke(new SerializableRunnable("Disconnect from DS, remote grantor death") {
@Override
public void run() {
try {
MembershipManagerHelper.crashDistributedSystem(getSystem());
} finally {
// Allow getCache() to re-establish a ds connection
closeCache();
}
}
});
// Make this VM the remote Grantor
txMgr.begin();
rgn.put("key", "val2");
txMgr.commit();
assertNotNull(TXLockService.getDTLS());
assertTrue(TXLockService.getDTLS().isLockGrantor());
SerializableRunnable remoteComm =
new SerializableRunnable("testRemoteGrantor: remote grantor commit") {
@Override
public void run() {
try {
Cache c = getCache();
CacheTransactionManager txMgr2 = c.getCacheTransactionManager();
Region rgn1 = c.getRegion(rgnName);
if (rgn1 == null) {
// This block should only execute on VM0
rgn1 = c.createRegion(rgnName, getRegionAttributes());
}
txMgr2.begin();
rgn1.put("key", "val3");
txMgr2.commit();
if (TXLockService.getDTLS() != null) {
                assertFalse(TXLockService.getDTLS().isLockGrantor());
}
} catch (CacheException e) {
Assert.fail("While creating region", e);
}
}
};
Invoke.invokeInEveryVM(remoteComm);
assertNotNull(TXLockService.getDTLS());
assertTrue(TXLockService.getDTLS().isLockGrantor());
assertEquals("val3", rgn.getEntry("key").getValue());
rgn.destroyRegion();
}
/**
* Test the internal callbacks used for what else... testing
*/
@Test
public void testInternalCallbacks() throws Exception {
final CacheTransactionManager txMgr = getCache().getCacheTransactionManager();
final String rgnName1 = getUniqueName() + "_1";
final String rgnName2 = getUniqueName() + "_2";
final String rgnName3 = getUniqueName() + "_3";
Region rgn1 = getCache().createRegion(rgnName1, getRegionAttributes());
Host host = Host.getHost(0);
VM vm0 = host.getVM(0);
VM vm1 = host.getVM(1);
SerializableRunnable createRgn =
new SerializableRunnable("testInternalCallbacks: initial configuration") {
@Override
public void run() {
try {
Region rgn1a = getCache().createRegion(rgnName1, getRegionAttributes());
Region rgn2 = getCache().createRegion(rgnName2, getRegionAttributes());
Region rgn3 =
getCache().createRegion(rgnName3, getRegionAttributes(Scope.DISTRIBUTED_NO_ACK));
rgn1a.create("key", null);
rgn2.create("key", null);
rgn3.create("key", null);
} catch (CacheException e) {
Assert.fail("While creating region", e);
}
}
};
vm0.invoke(createRgn);
vm1.invoke(createRgn);
// Standard commit check
txMgr.begin();
rgn1.put("key", "value0");
txMgr.commit();
SerializableRunnable checkRgn1 =
new SerializableRunnable("testInternalCallbacks: check rgn1 valus") {
@Override
public void run() {
Region rgn1a = getCache().getRegion(rgnName1);
assertNotNull(rgn1a);
assertEquals("value0", rgn1a.getEntry("key").getValue());
}
};
vm0.invoke(checkRgn1);
vm1.invoke(checkRgn1);
{
final byte[] cbSensors = {0, 0, 0, 0, 0, 0, 0, 0, 0};
txMgr.begin();
((TXStateProxyImpl) ((TXManagerImpl) txMgr).getTXState()).forceLocalBootstrap();
setInternalCallbacks(((TXManagerImpl) txMgr).getTXState(), cbSensors);
rgn1.put("key", "value1");
txMgr.commit();
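      // cbSensors[0..6] ("after" hooks) should each fire exactly once per commit;
      // cbSensors[7..8] ("during" hooks) fire once per remote recipient, i.e. twice
      // here with the region hosted on vm0 and vm1.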
for (int i = cbSensors.length - 3; i >= 0; --i) {
assertEquals("Internal callback " + i + " was not called the expected number of times!",
(byte) 1, cbSensors[i]);
}
for (int i = cbSensors.length - 1; i > cbSensors.length - 3; --i) {
assertEquals(
"Internal \"during\" callback " + i + " invoked an unexpected number of times!",
(byte) 2, cbSensors[i]);
}
}
SerializableRunnable checkRgn1Again =
new SerializableRunnable("testInternalCallbacks: validate remote values") {
@Override
public void run() {
Region rgn1a = getCache().getRegion(rgnName1);
assertNotNull(rgn1a);
assertEquals("value1", rgn1a.getEntry("key").getValue());
}
};
vm0.invoke(checkRgn1Again);
vm1.invoke(checkRgn1Again);
// Try 2 regions
Region rgn2 = getCache().createRegion(rgnName2, getRegionAttributes());
txMgr.begin();
rgn1.put("key", "value2");
rgn2.put("key", "value2");
txMgr.commit();
SerializableRunnable checkRgn12 =
new SerializableRunnable("testInternalCallbacks: check rgn1 valus") {
@Override
public void run() {
Region rgn1a = getCache().getRegion(rgnName1);
assertNotNull(rgn1a);
assertEquals("value2", rgn1a.getEntry("key").getValue());
Region rgn2a = getCache().getRegion(rgnName2);
assertNotNull(rgn2a);
assertEquals("value2", rgn2a.getEntry("key").getValue());
}
};
vm0.invoke(checkRgn12);
vm1.invoke(checkRgn12);
{
final byte[] cbSensors = {0, 0, 0, 0, 0, 0, 0, 0, 0};
txMgr.begin();
((TXStateProxyImpl) ((TXManagerImpl) txMgr).getTXState()).forceLocalBootstrap();
setInternalCallbacks(((TXManagerImpl) txMgr).getTXState(), cbSensors);
rgn1.put("key", "value3");
rgn2.put("key", "value3");
txMgr.commit();
for (int i = cbSensors.length - 3; i >= 0; i--) {
assertEquals("Internal callback " + i + " was not called the expected number of times!",
(byte) 1, cbSensors[i]);
}
for (int i = cbSensors.length - 1; i > cbSensors.length - 3; --i) {
assertEquals(
"Internal \"during\" callback " + i + " invoked an unexpected number of times!",
(byte) 2, cbSensors[i]);
}
}
SerializableRunnable checkRgn12Again =
new SerializableRunnable("testInternalCallbacks: validate both regions remote values") {
@Override
public void run() {
Region rgn1a = getCache().getRegion(rgnName1);
assertNotNull(rgn1a);
assertEquals("value3", rgn1a.getEntry("key").getValue());
Region rgn2a = getCache().getRegion(rgnName2);
assertNotNull(rgn2a);
assertEquals("value3", rgn2a.getEntry("key").getValue());
}
};
vm0.invoke(checkRgn12Again);
vm1.invoke(checkRgn12Again);
// Try a third region (D_NO_ACK)
Region rgn3 = getCache().createRegion(rgnName3, getRegionAttributes(Scope.DISTRIBUTED_NO_ACK));
txMgr.begin();
rgn1.put("key", "value4");
rgn2.put("key", "value4");
rgn3.put("key", "value4");
txMgr.commit();
SerializableRunnable checkRgn123 =
new SerializableRunnable("testInternalCallbacks: check rgn1 valus") {
@Override
public void run() {
Region rgn1a = getCache().getRegion(rgnName1);
assertNotNull(rgn1a);
assertEquals("value4", rgn1a.getEntry("key").getValue());
Region rgn2a = getCache().getRegion(rgnName2);
assertNotNull(rgn2a);
assertEquals("value4", rgn2a.getEntry("key").getValue());
Region rgn3a = getCache().getRegion(rgnName3);
assertNotNull(rgn3a);
assertEquals("value4", rgn3a.getEntry("key").getValue());
}
};
vm0.invoke(checkRgn123);
vm1.invoke(checkRgn123);
{
final byte[] cbSensors = {0, 0, 0, 0, 0, 0, 0, 0, 0};
txMgr.begin();
((TXStateProxyImpl) ((TXManagerImpl) txMgr).getTXState()).forceLocalBootstrap();
setInternalCallbacks(((TXManagerImpl) txMgr).getTXState(), cbSensors);
rgn1.put("key", "value5");
rgn2.put("key", "value5");
rgn3.put("key", "value5");
txMgr.commit();
for (int i = cbSensors.length - 3; i >= 0; i--) {
assertEquals("Internal callback " + i + " was not called the expected number of times!",
(byte) 1, cbSensors[i]);
}
for (int i = cbSensors.length - 1; i > cbSensors.length - 3; --i) {
assertEquals(
"Internal \"during\" callback " + i + " invoked an unexpected number of times!",
(byte) 2, cbSensors[i]);
}
}
SerializableRunnable checkRgn123Again =
new SerializableRunnable("testInternalCallbacks: validate both regions remote values") {
@Override
public void run() {
Region rgn1a = getCache().getRegion(rgnName1);
assertNotNull(rgn1a);
assertEquals("value5", rgn1a.getEntry("key").getValue());
Region rgn2a = getCache().getRegion(rgnName2);
assertNotNull(rgn2a);
assertEquals("value5", rgn2a.getEntry("key").getValue());
Region rgn3a = getCache().getRegion(rgnName3);
assertNotNull(rgn3a);
assertEquals("value5", rgn3a.getEntry("key").getValue());
}
};
vm0.invoke(checkRgn123Again);
vm1.invoke(checkRgn123Again);
rgn1.destroyRegion();
rgn2.destroyRegion();
}
static void setInternalCallbacks(TXStateInterface txp, final byte[] cbSensors) {
((TXStateProxyImpl) txp).forceLocalBootstrap();
TXState tx = (TXState) ((TXStateProxyImpl) txp).getRealDeal(null, null);
assertEquals(9, cbSensors.length);
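    // Sensor layout, in rough commit-phase order: [0] afterReservation,
    // [1] afterConflictCheck, [2] afterApplyChanges, [3] afterReleaseLocalLocks,
    // [4] afterIndividualSend, [5] afterIndividualCommitProcess, and [6] afterSend
    // fire once per commit; [7] duringIndividualSend and
    // [8] duringIndividualCommitProcess fire once per remote recipient.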
tx.setAfterReservation(() -> cbSensors[0]++);
tx.setAfterConflictCheck(() -> cbSensors[1]++);
tx.setAfterApplyChanges(() -> cbSensors[2]++);
tx.setAfterReleaseLocalLocks(() -> cbSensors[3]++);
tx.setAfterIndividualSend(() -> cbSensors[4]++);
tx.setAfterIndividualCommitProcess(() -> cbSensors[5]++);
tx.setAfterSend(() -> cbSensors[6]++);
tx.setDuringIndividualSend(() -> cbSensors[7]++);
tx.setDuringIndividualCommitProcess(() -> cbSensors[8]++);
}
/**
* Test distributed ack transactions that consist only of data from loaded values
*/
@Test
public void testDACKLoadedMessage() throws Exception {
final CacheTransactionManager txMgr = getCache().getCacheTransactionManager();
final String rgnName = getUniqueName();
AttributesFactory factory = new AttributesFactory();
factory.setScope(Scope.DISTRIBUTED_ACK);
factory.setEarlyAck(false);
factory.setCacheLoader(new CacheLoader() {
@Override
public Object load(LoaderHelper helper) {
return "val" + helper.getArgument();
}
@Override
public void close() {}
});
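    // The loader above synthesizes "val" + argument, so a transactional
    // rgn.get("key2", 2) later in this test creates the entry value "val2"
    // without an explicit put.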
Region rgn = getCache().createRegion(rgnName, factory.create());
Invoke.invokeInEveryVM(new SerializableRunnable("testDACKLoadedMessage: intial configuration") {
@Override
public void run() {
try {
AttributesFactory factory2 = new AttributesFactory();
factory2.setScope(Scope.DISTRIBUTED_ACK);
factory2.setEarlyAck(false);
// factory.setDataPolicy(DataPolicy.REPLICATE);
factory2.setMirrorType(MirrorType.KEYS);
getCache().createRegion(rgnName, factory2.create());
} catch (CacheException e) {
Assert.fail("While creating region", e);
}
}
});
// Confirm the standard case
txMgr.begin();
rgn.put("key1", "val1");
txMgr.commit();
assertEquals("val1", rgn.getEntry("key1").getValue());
Invoke
.invokeInEveryVM(new SerializableRunnable("testDACKLoadedMessage: confirm standard case") {
@Override
public void run() {
Region rgn1 = getCache().getRegion(rgnName);
assertEquals("val1", rgn1.getEntry("key1").getValue());
}
});
// Confirm loaded value case
txMgr.begin();
rgn.get("key2", 2);
txMgr.commit();
assertEquals("val2", rgn.getEntry("key2").getValue());
Invoke
        .invokeInEveryVM(new SerializableRunnable("testDACKLoadedMessage: confirm loaded value case") {
@Override
public void run() {
Region rgn1 = getCache().getRegion(rgnName);
assertEquals("val2", rgn1.getEntry("key2").getValue());
}
});
    // This should use the ack with the lock id
txMgr.begin();
rgn.put("key3", "val3");
rgn.get("key4", 4);
txMgr.commit();
Invoke
        .invokeInEveryVM(new SerializableRunnable("testDACKLoadedMessage: confirm mixed put/load case") {
@Override
public void run() {
Region rgn1 = getCache().getRegion(rgnName);
assertEquals("val3", rgn1.getEntry("key3").getValue());
assertEquals("val4", rgn1.getEntry("key4").getValue());
}
});
}
@Override
public Properties getDistributedSystemProperties() {
Properties p = super.getDistributedSystemProperties();
p.put(LOG_LEVEL, LogWriterUtils.getDUnitLogLevel());
p.put(ENABLE_NETWORK_PARTITION_DETECTION, "false");
return p;
}
@Test
public void testHighAvailabilityFeatures() throws Exception {
IgnoredException.addIgnoredException("DistributedSystemDisconnectedException");
final String rgnName = getUniqueName();
AttributesFactory factory = new AttributesFactory();
factory.setScope(Scope.DISTRIBUTED_ACK);
factory.setEarlyAck(false);
Region rgn = getCache().createRegion(rgnName, factory.create());
Invoke.invokeInEveryVM(
new SerializableRunnable("testHighAvailabilityFeatures: intial region configuration") {
@Override
public void run() {
try {
AttributesFactory factory2 = new AttributesFactory();
factory2.setScope(Scope.DISTRIBUTED_ACK);
factory2.setEarlyAck(false);
factory2.setDataPolicy(DataPolicy.REPLICATE);
getCache().createRegion(rgnName, factory2.create());
} catch (CacheException e) {
Assert.fail("While creating region", e);
}
}
});
// create entries
rgn.put("key0", "val0_0");
rgn.put("key1", "val1_0");
Host host = Host.getHost(0);
// This test assumes that there are at least three VMs; the origin and two recipients
assertTrue(host.getVMCount() >= 3);
final VM originVM = host.getVM(0);
// Test that there is no commit after a partial commit message
// send (only sent to a minority of the recipients)
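    // duringIndividualSend fires once per commit recipient, so crashing on the
    // second call means at most one of the (at least two) recipients received the
    // commit data; no VM should end up applying the transaction.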
originVM.invoke(new SerializableRunnable("Flakey DuringIndividualSend Transaction") {
@Override
public void run() {
final Region rgn1 = getCache().getRegion(rgnName);
assertNotNull(rgn1);
try {
final CacheTransactionManager txMgr2 = getCache().getCacheTransactionManager();
final CacheTransactionManager txMgrImpl = txMgr2;
txMgr2.begin();
// 1. setup an internal callback on originVM that will call
// disconnectFromDS() on the 2nd duringIndividualSend
// call.
((TXStateProxyImpl) ((TXManagerImpl) txMgrImpl).getTXState()).forceLocalBootstrap();
TXState txState = (TXState) ((TXStateProxyImpl) ((TXManagerImpl) txMgrImpl).getTXState())
.getRealDeal(null, null);
txState.setDuringIndividualSend(new Runnable() {
private int numCalled = 0;
@Override
public synchronized void run() {
++numCalled;
rgn1.getCache().getLogger()
.info("setDuringIndividualSend Runnable called " + numCalled + " times");
if (numCalled > 1) {
MembershipManagerHelper.crashDistributedSystem(getSystem());
}
}
});
rgn1.put("key0", "val0_1");
rgn1.put("key1", "val1_1");
// 2. commit a transaction in originVM, it will disconnect from the DS
txMgr2.commit();
} catch (VirtualMachineError e) {
SystemFailure.initiateFailure(e);
throw e;
} catch (Throwable e) {
rgn1.getCache().getLogger().warning("Ignoring Exception", e);
} finally {
// Allow this VM to re-connect to the DS upon getCache() call
closeCache();
}
}
});
// 3. verify on all VMs that the transaction was not committed
final SerializableRunnable noChangeValidator =
new SerializableRunnable("testHighAvailabilityFeatures: validate no change in Region") {
@Override
public void run() {
Region rgn1 = getCache().getRegion(rgnName);
if (rgn1 == null) {
// Expect a null region from originVM
try {
AttributesFactory factory2 = new AttributesFactory();
factory2.setScope(Scope.DISTRIBUTED_ACK);
factory2.setEarlyAck(false);
factory2.setDataPolicy(DataPolicy.REPLICATE);
rgn1 = getCache().createRegion(rgnName, factory2.create());
} catch (CacheException e) {
Assert.fail("While creating region", e);
}
}
Region.Entry re = rgn1.getEntry("key0");
assertNotNull(re);
assertEquals("val0_0", re.getValue());
re = rgn1.getEntry("key1");
assertNotNull(re);
assertEquals("val1_0", re.getValue());
}
};
Invoke.invokeInEveryVM(noChangeValidator);
// Test that there is no commit after sending to all recipients
// but prior to sending the "commit process" message
originVM.invoke(new SerializableRunnable("Flakey AfterIndividualSend Transaction") {
@Override
public void run() {
final Region rgn1 = getCache().getRegion(rgnName);
assertNotNull(rgn1);
try {
final CacheTransactionManager txMgr2 = getCache().getCacheTransactionManager();
final CacheTransactionManager txMgrImpl = txMgr2;
txMgr2.begin();
// 1. setup an internal callback on originVM that will call
// disconnectFromDS() on AfterIndividualSend
((TXStateProxyImpl) ((TXManagerImpl) txMgrImpl).getTXState()).forceLocalBootstrap();
TXState txState = (TXState) ((TXStateProxyImpl) ((TXManagerImpl) txMgrImpl).getTXState())
.getRealDeal(null, null);
txState.setAfterIndividualSend(new Runnable() {
@Override
public synchronized void run() {
MembershipManagerHelper.crashDistributedSystem(getSystem());
}
});
rgn1.put("key0", "val0_2");
rgn1.put("key1", "val1_2");
// 2. commit a transaction in originVM, it will disconnect from the DS
txMgr2.commit();
} catch (VirtualMachineError e) {
SystemFailure.initiateFailure(e);
throw e;
} catch (Throwable e) {
rgn1.getCache().getLogger().warning("Ignoring Exception", e);
} finally {
// Allow this VM to re-connect to the DS upon getCache() call
closeCache();
}
}
});
// 3. verify on all VMs, including the origin, that the transaction was not committed
Invoke.invokeInEveryVM(noChangeValidator);
// Test commit success upon a single commit process message received.
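    // duringIndividualCommitProcess fires once per recipient, so crashing on the
    // second call means exactly one recipient processed the commit; the change is
    // still expected to reach every VM, with the origin recovering it via GII on
    // reconnect.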
originVM.invoke(new SerializableRunnable("Flakey DuringIndividualCommitProcess Transaction") {
@Override
public void run() {
final Region rgn1 = getCache().getRegion(rgnName);
assertNotNull(rgn1);
try {
final CacheTransactionManager txMgr2 = getCache().getCacheTransactionManager();
final CacheTransactionManager txMgrImpl = txMgr2;
txMgr2.begin();
((TXStateProxyImpl) ((TXManagerImpl) txMgrImpl).getTXState()).forceLocalBootstrap();
TXState txState = (TXState) ((TXStateProxyImpl) ((TXManagerImpl) txMgrImpl).getTXState())
.getRealDeal(null, null);
// 1. setup an internal callback on originVM that will call
// disconnectFromDS() on the 2nd internalDuringIndividualCommitProcess
// call.
txState.setDuringIndividualCommitProcess(new Runnable() {
private int numCalled = 0;
@Override
public synchronized void run() {
++numCalled;
rgn1.getCache().getLogger()
.info("setDuringIndividualCommitProcess Runnable called " + numCalled + " times");
if (numCalled > 1) {
MembershipManagerHelper.crashDistributedSystem(getSystem());
}
}
});
rgn1.put("key0", "val0_3");
rgn1.put("key1", "val1_3");
// 2. commit a transaction in originVM, it will disconnect from the DS
txMgr2.commit();
} catch (VirtualMachineError e) {
SystemFailure.initiateFailure(e);
throw e;
} catch (Throwable e) {
rgn1.getCache().getLogger().warning("Ignoring Exception", e);
} finally {
// Allow this VM to re-connect to the DS upon getCache() call
closeCache();
}
}
});
    // 3. verify on all VMs that the transaction was committed (including the origin, due to GII)
SerializableRunnable nonSoloChangeValidator1 = new SerializableRunnable(
"testHighAvailabilityFeatures: validate v1 non-solo Region changes") {
@Override
public void run() {
Region rgn1 = getCache().getRegion(rgnName);
if (rgn1 == null) {
// Expect a null region from originVM
try {
AttributesFactory factory2 = new AttributesFactory();
factory2.setScope(Scope.DISTRIBUTED_ACK);
factory2.setEarlyAck(false);
factory2.setDataPolicy(DataPolicy.REPLICATE);
rgn1 = getCache().createRegion(rgnName, factory2.create());
} catch (CacheException e) {
Assert.fail("While creating region", e);
}
}
        // Retry for up to 10 seconds: the committed values may arrive
        // asynchronously (the origin, for instance, recovers them via GII).
        long giveUp = System.currentTimeMillis() + 10000;
        while (true) {
          try {
            Region.Entry re = rgn1.getEntry("key0");
            assertNotNull(re);
            assertEquals("val0_3", re.getValue());
            re = rgn1.getEntry("key1");
            assertNotNull(re);
            assertEquals("val1_3", re.getValue());
            break;
          } catch (AssertionError e) {
            if (System.currentTimeMillis() > giveUp) {
              // deadline passed; surface the failure
              throw e;
            }
          }
        }
}
};
Invoke.invokeInEveryVM(nonSoloChangeValidator1);
// Verify successful solo region commit after duringIndividualSend
// (same as afterIndividualSend).
// Create a region that only exists on the origin and another VM
final String soloRegionName = getUniqueName() + "_solo";
SerializableRunnable createSoloRegion =
new SerializableRunnable("testHighAvailabilityFeatures: solo region configuration") {
@Override
public void run() {
try {
AttributesFactory factory2 = new AttributesFactory();
factory2.setScope(Scope.DISTRIBUTED_ACK);
factory2.setEarlyAck(false);
factory2.setDataPolicy(DataPolicy.REPLICATE);
Region rgn1 = getCache().createRegion(soloRegionName, factory2.create());
rgn1.put("soloKey0", "soloVal0_0");
rgn1.put("soloKey1", "soloVal1_0");
} catch (CacheException e) {
Assert.fail("While creating region", e);
}
}
};
final VM soloRegionVM = host.getVM(1);
originVM.invoke(createSoloRegion);
soloRegionVM.invoke(createSoloRegion);
originVM
.invoke(new SerializableRunnable("Flakey solo region DuringIndividualSend Transaction") {
@Override
public void run() {
final Region soloRgn = getCache().getRegion(soloRegionName);
assertNotNull(soloRgn);
try {
final CacheTransactionManager txMgr2 = getCache().getCacheTransactionManager();
final CacheTransactionManager txMgrImpl = txMgr2;
txMgr2.begin();
// 1. setup an internal callback on originVM that will call
// disconnectFromDS() on the 2nd duringIndividualSend
// call.
((TXStateProxyImpl) ((TXManagerImpl) txMgrImpl).getTXState()).forceLocalBootstrap();
TXState txState =
(TXState) ((TXStateProxyImpl) ((TXManagerImpl) txMgrImpl).getTXState())
.getRealDeal(null, null);
txState.setDuringIndividualSend(new Runnable() {
private int numCalled = 0;
@Override
public synchronized void run() {
++numCalled;
soloRgn.getCache().getLogger()
.info("setDuringIndividualSend Runnable called " + numCalled + " times");
if (numCalled > 1) {
MembershipManagerHelper.crashDistributedSystem(getSystem());
}
}
});
soloRgn.put("soloKey0", "soloVal0_1");
soloRgn.put("soloKey1", "soloVal1_1");
// 2. commit a transaction in originVM, it will disconnect from the DS
txMgr2.commit();
} catch (VirtualMachineError e) {
SystemFailure.initiateFailure(e);
throw e;
} catch (Throwable e) {
soloRgn.getCache().getLogger().warning("Ignoring Exception", e);
} finally {
// Allow this VM to re-connect to the DS upon getCache() call
closeCache();
}
}
});
// 3. verify on the soloRegionVM that the transaction was committed
final SerializableRunnable soloRegionCommitValidator1 = new SerializableRunnable(
"testHighAvailabilityFeatures: validate successful v1 commit in solo Region") {
@Override
public void run() {
Region soloRgn = getCache().getRegion(soloRegionName);
if (soloRgn == null) {
// Expect a null region from originVM
try {
AttributesFactory factory2 = new AttributesFactory();
factory2.setScope(Scope.DISTRIBUTED_ACK);
factory2.setEarlyAck(false);
factory2.setDataPolicy(DataPolicy.REPLICATE);
soloRgn = getCache().createRegion(soloRegionName, factory2.create());
} catch (CacheException e) {
Assert.fail("While creating region ", e);
}
}
Region.Entry re = soloRgn.getEntry("soloKey0");
assertNotNull(re);
assertEquals("soloVal0_1", re.getValue());
re = soloRgn.getEntry("soloKey1");
assertNotNull(re);
assertEquals("soloVal1_1", re.getValue());
}
};
originVM.invoke(soloRegionCommitValidator1);
soloRegionVM.invoke(soloRegionCommitValidator1);
// verify no change in nonSolo region, re-establish region in originVM
Invoke.invokeInEveryVM(nonSoloChangeValidator1);
// Verify no commit for failed send (afterIndividualSend) for solo
// Region combined with non-solo Region
    originVM.invoke(new SerializableRunnable(
        "Flakey mixed (solo+non-solo) region AfterIndividualSend Transaction") {
@Override
public void run() {
final Region rgn1 = getCache().getRegion(rgnName);
assertNotNull(rgn1);
final Region soloRgn = getCache().getRegion(soloRegionName);
assertNotNull(soloRgn);
try {
final CacheTransactionManager txMgr2 = getCache().getCacheTransactionManager();
final CacheTransactionManager txMgrImpl = txMgr2;
txMgr2.begin();
// 1. setup an internal callback on originVM that will call
// disconnectFromDS() on the afterIndividualSend
// call.
((TXStateProxyImpl) ((TXManagerImpl) txMgrImpl).getTXState()).forceLocalBootstrap();
TXState txState = (TXState) ((TXStateProxyImpl) ((TXManagerImpl) txMgrImpl).getTXState())
.getRealDeal(null, null);
txState.setAfterIndividualSend(new Runnable() {
@Override
public synchronized void run() {
MembershipManagerHelper.crashDistributedSystem(getSystem());
}
});
rgn1.put("key0", "val0_4");
rgn1.put("key1", "val1_4");
soloRgn.put("soloKey0", "soloVal0_2");
soloRgn.put("soloKey1", "soloVal1_2");
// 2. commit a transaction in originVM, it will disconnect from the DS
txMgr2.commit();
} catch (VirtualMachineError e) {
SystemFailure.initiateFailure(e);
throw e;
} catch (Throwable e) {
rgn1.getCache().getLogger().warning("Ignoring Exception", e);
} finally {
// Allow this VM to re-connect to the DS upon getCache() call
closeCache();
}
}
});
// Origin and Solo Region VM should be the same as last validation
originVM.invoke(soloRegionCommitValidator1);
soloRegionVM.invoke(soloRegionCommitValidator1);
Invoke.invokeInEveryVM(nonSoloChangeValidator1);
// Verify commit after sending a single
// (duringIndividualCommitProcess) commit process for solo Region
// combined with non-solo Region
originVM.invoke(new SerializableRunnable(
"Flakey mixed (solo+non-solo) region DuringIndividualCommitProcess Transaction") {
@Override
public void run() {
final Region rgn1 = getCache().getRegion(rgnName);
assertNotNull(rgn1);
final Region soloRgn = getCache().getRegion(soloRegionName);
assertNotNull(soloRgn);
try {
final CacheTransactionManager txMgr2 = getCache().getCacheTransactionManager();
final CacheTransactionManager txMgrImpl = txMgr2;
txMgr2.begin();
          // 1. setup an internal callback on originVM that will call
          // disconnectFromDS() on the 2nd duringIndividualCommitProcess
          // call.
          ((TXStateProxyImpl) ((TXManagerImpl) txMgrImpl).getTXState()).forceLocalBootstrap();
          TXState txState = (TXState) ((TXStateProxyImpl) ((TXManagerImpl) txMgrImpl).getTXState())
              .getRealDeal(null, null);
          txState.setDuringIndividualCommitProcess(new Runnable() {
private int numCalled = 0;
@Override
public synchronized void run() {
++numCalled;
rgn1.getCache().getLogger()
.info("setDuringIndividualCommitProcess Runnable called " + numCalled + " times");
if (numCalled > 1) {
MembershipManagerHelper.crashDistributedSystem(getSystem());
}
}
});
rgn1.put("key0", "val0_5");
rgn1.put("key1", "val1_5");
soloRgn.put("soloKey0", "soloVal0_3");
soloRgn.put("soloKey1", "soloVal1_3");
// 2. commit a transaction in originVM, it will disconnect from the DS
txMgr2.commit();
} catch (VirtualMachineError e) {
SystemFailure.initiateFailure(e);
throw e;
} catch (Throwable e) {
rgn1.getCache().getLogger().warning("Ignoring Exception", e);
} finally {
// Allow this VM to re-connect to the DS upon getCache() call
closeCache();
}
}
});
final SerializableRunnable soloRegionCommitValidator2 = new SerializableRunnable(
"testHighAvailabilityFeatures: validate successful v2 commit in solo Region") {
@Override
public void run() {
Region soloRgn = getCache().getRegion(soloRegionName);
if (soloRgn == null) {
// Expect a null region from originVM
try {
AttributesFactory factory2 = new AttributesFactory();
factory2.setScope(Scope.DISTRIBUTED_ACK);
factory2.setEarlyAck(false);
factory2.setDataPolicy(DataPolicy.REPLICATE);
soloRgn = getCache().createRegion(soloRegionName, factory2.create());
} catch (CacheException e) {
Assert.fail("While creating region ", e);
}
}
Region.Entry re = soloRgn.getEntry("soloKey0");
assertNotNull(re);
assertEquals("soloVal0_3", re.getValue());
re = soloRgn.getEntry("soloKey1");
assertNotNull(re);
assertEquals("soloVal1_3", re.getValue());
}
};
originVM.invoke(soloRegionCommitValidator2);
soloRegionVM.invoke(soloRegionCommitValidator2);
SerializableRunnable nonSoloChangeValidator2 = new SerializableRunnable(
"testHighAvailabilityFeatures: validate v2 non-solo Region changes") {
@Override
public void run() {
Region rgn1 = getCache().getRegion(rgnName);
if (rgn1 == null) {
// Expect a null region from originVM
try {
AttributesFactory factory2 = new AttributesFactory();
factory2.setScope(Scope.DISTRIBUTED_ACK);
factory2.setEarlyAck(false);
factory2.setDataPolicy(DataPolicy.REPLICATE);
rgn1 = getCache().createRegion(rgnName, factory2.create());
} catch (CacheException e) {
Assert.fail("While creating region", e);
}
}
Region.Entry re = rgn1.getEntry("key0");
assertNotNull(re);
assertEquals("val0_5", re.getValue());
re = rgn1.getEntry("key1");
assertNotNull(re);
assertEquals("val1_5", re.getValue());
}
};
Invoke.invokeInEveryVM(nonSoloChangeValidator2);
}
/**
   * A class used in testLockBatchParticipantsUpdate to pause a transaction in the afterReservation
* and afterSend states.
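   *
   * All coordination happens on the PausibleTX.class monitor: the afterReservation callback
   * signals the controller and waits to be resumed; the afterSend callback signals that the TX
   * data has been sent and waits for the final go-ahead.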
*/
public static class PausibleTX implements Runnable {
public boolean isRunning = false;
public String rgnName = null;
public Cache myCache = null;
public Object key = null;
public Object value = null;
public boolean getIsRunning() {
return isRunning;
}
@Override
public void run() {
Region rgn = myCache.getRegion(rgnName);
final CacheTransactionManager txMgr = myCache.getCacheTransactionManager();
txMgr.begin();
((TXStateProxyImpl) ((TXManagerImpl) txMgr).getTXState()).forceLocalBootstrap();
TXState txState = (TXState) ((TXStateProxyImpl) ((TXManagerImpl) txMgr).getTXState())
.getRealDeal(null, null);
txState.setAfterReservation(() -> {
try {
synchronized (PausibleTX.class) {
isRunning = true;
// Notify the thread that created this, that we are ready
PausibleTX.class.notifyAll();
// Wait for the controller to start a GII and let us proceed
PausibleTX.class.wait();
}
} catch (InterruptedException ie) {
// PausibleTX.this.myCache.getLogger().info("Why was I interrupted? " + ie);
fail("interrupted");
}
});
txState.setAfterSend(() -> {
try {
synchronized (PausibleTX.class) {
// Notify the controller that we have sent the TX data (and the
// update)
PausibleTX.class.notifyAll();
// Wait until the controller has determined in fact the update
// took place
PausibleTX.class.wait();
}
} catch (InterruptedException ie) {
// PausibleTX.this.myCache.getLogger().info("Why was I interrupted? " + ie);
fail("interrupted");
}
});
try {
rgn.put(key, value);
txMgr.commit();
    } catch (CommitConflictException cce) {
      fail("Did not expect commit conflict exception when sending updates to new members in PausibleTX: "
          + cce);
    }
}
}
/**
* Returns the GemFire system ID of the VM on which this method is run
*/
public static Serializable getSystemId() {
Serializable ret = null;
if (getSystemStatic() != null) {
ret = getSystemStatic().getDistributionManager().getId();
}
return ret;
}
static HashSet preTXSystemIds;
public static void setPreTXSystemIds(HashSet ids) {
TXDistributedDUnitTest.preTXSystemIds = ids;
}
static HashSet postTXSystemIds;
public static void setPostTXSystemIds(HashSet ids) {
TXDistributedDUnitTest.postTXSystemIds = ids;
}
static Serializable txHostId;
public static void setTXHostSystemId(Serializable id) {
TXDistributedDUnitTest.txHostId = id;
}
/**
* Test update of lock batch participants (needed when new members are discovered between a
   * commit's locking phase and the application of the Region's data). See bug 32999.
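   *
   * Rough sequence: VM1 runs a PausibleTX that blocks after lock reservation, the grantor (VM0)
   * is checked for the pre-GII lock batch membership, VM2 then creates the region (starting a
   * GII), and the batch is re-checked to confirm VM2 was added.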
*/
@Test
public void testLockBatchParticipantsUpdate() throws Exception {
final String rgnName = getUniqueName();
Region rgn = getCache().createRegion(rgnName, getRegionAttributes());
rgn.create("key", null);
Host host = Host.getHost(0);
VM vm0 = host.getVM(0);
VM vm1 = host.getVM(1);
VM vm2 = host.getVM(2);
SerializableRunnable initRegions =
new SerializableRunnable("testLockBatchParticipantsUpdate: initial configuration") {
@Override
public void run() {
try {
Region rgn1 = getCache().createRegion(rgnName, getRegionAttributes());
rgn1.create("key", null);
} catch (CacheException e) {
Assert.fail("While creating region", e);
}
}
};
vm0.invoke(initRegions);
vm1.invoke(initRegions);
rgn.put("key", "val1");
// Connect vm2 also since it may have been shutdown when logPerTest
// is turned on
vm2.invoke(new SerializableRunnable("connect vm2 if not connected") {
@Override
public void run() {
getCache();
}
});
// Make VM0 the Grantor
vm0.invoke(new SerializableRunnable("testLockBatchParticipantsUpdate: remote grantor init") {
@Override
public void run() {
try {
Region rgn1 = getCache().getRegion(rgnName);
final CacheTransactionManager txMgr2 = getCache().getCacheTransactionManager();
assertEquals("val1", rgn1.getEntry("key").getValue());
txMgr2.begin();
rgn1.put("key", "val2");
txMgr2.commit();
assertNotNull(TXLockService.getDTLS());
assertTrue(TXLockService.getDTLS().isLockGrantor());
} catch (CacheException e) {
fail("While performing first transaction");
}
}
});
// fix for bug 38843 causes the DTLS to be created in every TX participant
assertNotNull(TXLockService.getDTLS());
assertFalse(TXLockService.getDTLS().isLockGrantor());
assertEquals("val2", rgn.getEntry("key").getValue());
// Build sets of System Ids and set them up on VM0 for future batch member checks
HashSet txMembers = new HashSet(4);
txMembers.add(getSystemId());
txMembers.add(vm0.invoke(TXDistributedDUnitTest::getSystemId));
vm0.invoke(() -> TXDistributedDUnitTest.setPreTXSystemIds(txMembers));
txMembers.add(vm2.invoke(TXDistributedDUnitTest::getSystemId));
vm0.invoke(() -> TXDistributedDUnitTest.setPostTXSystemIds(txMembers));
// Don't include the tx host in the batch member set(s)
Serializable vm1HostId = vm1.invoke(TXDistributedDUnitTest::getSystemId);
vm0.invoke(() -> TXDistributedDUnitTest.setTXHostSystemId(vm1HostId));
// Create a TX on VM1 (such that it will ask for locks on VM0) that uses the callbacks
// to pause and give us time to start a GII process on another VM
vm1.invoke(new SerializableRunnable(
"testLockBatchParticipantsUpdate: slow tx (one that detects new member)") {
@Override
public void run() {
// fix for bug 38843 causes the DTLS to be created in every TX participant
assertNotNull(TXLockService.getDTLS());
assertFalse(TXLockService.getDTLS().isLockGrantor());
PausibleTX pauseTXRunnable = new PausibleTX();
pauseTXRunnable.rgnName = rgnName;
pauseTXRunnable.myCache = getCache();
pauseTXRunnable.key = "key";
pauseTXRunnable.value = "val3";
new Thread(pauseTXRunnable, "PausibleTX Thread").start();
synchronized (PausibleTX.class) {
while (!pauseTXRunnable.getIsRunning()) {
try {
PausibleTX.class.wait();
} catch (InterruptedException ie) {
fail("Did not expect " + ie);
}
}
}
}
});
    // Verify that the lock batch exists on VM0 and has the size we expect
vm0.invoke(new SerializableRunnable(
"testLockBatchParticipantsUpdate: Verify lock batch exists on VM0 with expected size") {
@Override
public void run() {
getCache().getRegion(rgnName);
TXLockServiceImpl dtls = (TXLockServiceImpl) TXLockService.getDTLS();
assertNotNull(dtls);
assertTrue(dtls.isLockGrantor());
DLockService dLockSvc = dtls.getInternalDistributedLockService();
assertNotNull(TXDistributedDUnitTest.txHostId);
DLockBatch[] batches = dLockSvc.getGrantor()
.getLockBatches((InternalDistributedMember) TXDistributedDUnitTest.txHostId);
        assertEquals(1, batches.length);
TXLockBatch txLockBatch = (TXLockBatch) batches[0];
assertNotNull(txLockBatch);
assertNotNull(TXDistributedDUnitTest.preTXSystemIds);
assertTrue(
"Members in lock batch " + txLockBatch.getParticipants() + " not the same as "
+ TXDistributedDUnitTest.preTXSystemIds,
txLockBatch.getParticipants().equals(TXDistributedDUnitTest.preTXSystemIds));
}
});
// Start a GII process on VM2
vm2.invoke(new SerializableRunnable("testLockBatchParticipantsUpdate: start GII") {
@Override
public void run() {
try {
AttributesFactory factory = new AttributesFactory();
factory.setScope(Scope.DISTRIBUTED_ACK);
factory.setEarlyAck(false);
factory.setDataPolicy(DataPolicy.REPLICATE);
getCache().createRegion(rgnName, factory.create());
} catch (CacheException e) {
Assert.fail("While creating region", e);
}
}
});
// Notify TX on VM1 so that it can continue
vm1.invoke(
new SerializableRunnable("testLockBatchParticipantsUpdate: Notfiy VM1 TX to continue") {
@Override
public void run() {
synchronized (PausibleTX.class) {
// Notify VM1 that it should proceed to the TX send
PausibleTX.class.notifyAll();
// Wait until VM1 has sent the TX
try {
PausibleTX.class.wait();
} catch (InterruptedException ie) {
fail("Did not expect " + ie);
}
}
}
});
// Verify that the batch on VM0 has added VM2 into the set
vm0.invoke(new SerializableRunnable(
"testLockBatchParticipantsUpdate: Verify lock batch contains VM2") {
@Override
public void run() {
getCache().getRegion(rgnName);
TXLockServiceImpl dtls = (TXLockServiceImpl) TXLockService.getDTLS();
assertNotNull(dtls);
assertTrue(dtls.isLockGrantor());
DLockService dLockSvc = dtls.getInternalDistributedLockService();
assertNotNull(TXDistributedDUnitTest.txHostId);
DLockBatch[] batches = dLockSvc.getGrantor()
.getLockBatches((InternalDistributedMember) TXDistributedDUnitTest.txHostId);
        assertEquals(1, batches.length);
TXLockBatch txLockBatch = (TXLockBatch) batches[0];
assertNotNull(txLockBatch);
        assertNotNull(TXDistributedDUnitTest.postTXSystemIds);
assertTrue(
"Members in lock batch " + txLockBatch.getParticipants() + " not the same as "
+ TXDistributedDUnitTest.postTXSystemIds,
txLockBatch.getParticipants().equals(TXDistributedDUnitTest.postTXSystemIds));
}
});
// fix for bug 38843 causes the DTLS to be created in every TX participant
assertNotNull(TXLockService.getDTLS());
assertFalse(TXLockService.getDTLS().isLockGrantor());
assertEquals("val3", rgn.getEntry("key").getValue());
// Notify TX on VM1 that it can go ahead and complete the TX
vm1.invoke(
new SerializableRunnable("testLockBatchParticipantsUpdate: Notfiy VM1 TX to finish") {
@Override
public void run() {
synchronized (PausibleTX.class) {
// Notify VM1 that it should finish the TX
PausibleTX.class.notifyAll();
}
}
});
rgn.destroyRegion();
}
/**
   * Hitachi bug 38809: applying a transaction to a remote VM fails due to an IOException on a
   * Region configured for LRU Overflow.
*/
public static final String TROUBLE_KEY = "GoBoom";
public static class TXTroubleMaker implements LocalRegion.TestCallable {
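    // Throws a DiskAccessException whenever the designated trouble key is
    // committed, simulating a disk failure while the TX changes are applied.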
@Override
public void call(LocalRegion r, Operation op, RegionEntry re) {
if (TROUBLE_KEY.equals(re.getKey())) {
throw new DiskAccessException(TROUBLE_KEY, r);
}
}
}
public static class ShutdownListener implements ResourceEventsListener {
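    // Blocks the cache-close path (CACHE_REMOVE) until unblockShutdown() is
    // called, so the test controls when the disk-failure-triggered shutdown of
    // the trouble VMs completes.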
CountDownLatch latch = new CountDownLatch(1);
@Override
public void handleEvent(ResourceEvent event, Object resource) {
if (event.equals(ResourceEvent.CACHE_REMOVE)) {
try {
latch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public void unblockShutdown() {
latch.countDown();
}
}
@Ignore("TODO: Disabled for #51260")
@Test
public void testRemoteCommitFailure() throws Exception {
try {
disconnectAllFromDS();
final String rgnName1 = getUniqueName() + "_1";
final String rgnName2 = getUniqueName() + "_2";
final String diskStoreName = getUniqueName() + "_store";
Host host = Host.getHost(0);
VM origin = host.getVM(0);
VM trouble1 = host.getVM(1);
VM trouble2 = host.getVM(2);
VM noTrouble = host.getVM(3);
CacheSerializableRunnable initRegions =
new CacheSerializableRunnable("Initialize no trouble regions") {
@Override
public void run2() {
getCache().createDiskStoreFactory().setDiskDirs(getDiskDirs()).create(diskStoreName);
TXManagerImpl.ALLOW_PERSISTENT_TRANSACTIONS = true;
AttributesFactory af = new AttributesFactory();
af.setDataPolicy(DataPolicy.PERSISTENT_REPLICATE);
af.setScope(Scope.DISTRIBUTED_ACK);
af.setDiskStoreName(diskStoreName);
getCache().createRegion(rgnName1, af.create());
getCache().createRegion(rgnName2, af.create());
}
};
origin.invoke(initRegions);
noTrouble.invoke(initRegions);
      SerializableRunnable initTroubleRegions =
new CacheSerializableRunnable("Initialize regions that cause trouble") {
@Override
public void run2() {
getCache().createDiskStoreFactory().setDiskDirs(getDiskDirs())
.create(diskStoreName);
TXManagerImpl.ALLOW_PERSISTENT_TRANSACTIONS = true;
InternalRegionFactory factory = getCache().createInternalRegionFactory();
factory.setDataPolicy(DataPolicy.PERSISTENT_REPLICATE);
factory.setScope(Scope.DISTRIBUTED_ACK);
factory.setDiskStoreName(diskStoreName);
factory.setTestCallable(new TXTroubleMaker());
factory.create(rgnName1);
factory.create(rgnName2);
getCache().getInternalDistributedSystem().addResourceListener(new ShutdownListener());
}
};
      trouble1.invoke(initTroubleRegions);
      trouble2.invoke(initTroubleRegions);
SerializableRunnable doTransaction =
new CacheSerializableRunnable("Run failing transaction") {
@Override
public void run2() {
Cache c = getCache();
Region r1 = c.getRegion(rgnName1);
assertNotNull(r1);
Region r2 = c.getRegion(rgnName2);
assertNotNull(r2);
CacheTransactionManager txmgr = c.getCacheTransactionManager();
txmgr.begin();
r1.put("k1", "k1");
r1.put("k2", "k2");
r1.put(TROUBLE_KEY, TROUBLE_KEY);
r2.put("k1", "k1");
r2.put("k2", "k2");
r2.put(TROUBLE_KEY, TROUBLE_KEY);
try {
txmgr.commit();
fail("Expected an tx incomplete exception");
} catch (CommitIncompleteException yay) {
String msg = yay.getMessage();
                // Each region name should be mentioned once per trouble VM
                // (two trouble VMs, so two matches per region name)
int ind = 0, match = 0;
while ((ind = msg.indexOf(rgnName1, ind)) >= 0) {
ind++;
match++;
}
assertEquals(2, match);
ind = match = 0;
while ((ind = msg.indexOf(rgnName2, ind)) >= 0) {
ind++;
match++;
}
assertEquals(2, match);
                // DiskAccessExceptions should be mentioned four times
                // (two regions on each of two trouble VMs)
ind = match = 0;
while ((ind = msg.indexOf(DiskAccessException.class.getName(), ind)) >= 0) {
ind++;
match++;
}
assertEquals(4, match);
}
}
};
IgnoredException ee = null;
try {
ee = IgnoredException.addIgnoredException(
DiskAccessException.class.getName() + "|" + CommitIncompleteException.class.getName()
+ "|" + CommitReplyException.class.getName());
origin.invoke(doTransaction);
} finally {
if (ee != null) {
ee.remove();
}
}
SerializableCallable allowCacheToShutdown = new SerializableCallable() {
@Override
public Object call() throws Exception {
GemFireCacheImpl cache = (GemFireCacheImpl) getCache();
List<ResourceEventsListener> listeners =
cache.getInternalDistributedSystem().getResourceListeners();
for (ResourceEventsListener l : listeners) {
if (l instanceof ShutdownListener) {
ShutdownListener shutListener = (ShutdownListener) l;
shutListener.unblockShutdown();
}
}
return null;
}
};
trouble1.invoke(allowCacheToShutdown);
trouble2.invoke(allowCacheToShutdown);
// Assert proper content on failing VMs
SerializableRunnable assertTroubledContent =
new CacheSerializableRunnable("Assert partail commit data") {
@Override
public void run2() {
final Cache c = getCache();
GeodeAwaitility.await().untilAsserted(new WaitCriterion() {
@Override
public boolean done() {
return c.getRegion(rgnName1) == null;
}
@Override
public String description() {
                  return "waiting for " + rgnName1 + " to be destroyed after cache close";
}
});
Region r2 = c.getRegion(rgnName2);
assertNull(r2);
}
};
trouble1.invoke(assertTroubledContent);
trouble2.invoke(assertTroubledContent);
// Assert proper content on successful VMs
SerializableRunnable assertSuccessfulContent =
new CacheSerializableRunnable("Assert complete commit of data on successful VMs") {
@Override
public void run2() {
Cache c = getCache();
{
Region r1 = c.getRegion(rgnName1);
assertNotNull(r1);
assertEquals("k1", r1.getEntry("k1").getValue());
assertEquals("k2", r1.getEntry("k2").getValue());
assertEquals(TROUBLE_KEY, r1.getEntry(TROUBLE_KEY).getValue());
}
{
Region r2 = c.getRegion(rgnName2);
assertNotNull(r2);
assertEquals("k1", r2.getEntry("k1").getValue());
assertEquals("k2", r2.getEntry("k2").getValue());
assertEquals(TROUBLE_KEY, r2.getEntry(TROUBLE_KEY).getValue());
}
}
};
noTrouble.invoke(assertSuccessfulContent);
      // Assert the persisted data survives on the originating VM
      SerializableRunnable assertOriginContent =
          new CacheSerializableRunnable("Assert data survives on origin VM") {
@Override
public void run2() {
Cache c = getCache();
{
Region r1 = c.getRegion(rgnName1);
assertNotNull(r1);
assertNotNull(r1.getEntry("k1"));
assertNotNull(r1.getEntry("k2"));
assertNotNull(r1.getEntry(TROUBLE_KEY));
}
{
Region r2 = c.getRegion(rgnName2);
assertNotNull(r2);
assertNotNull(r2.getEntry("k1"));
assertNotNull(r2.getEntry("k2"));
assertNotNull(r2.getEntry(TROUBLE_KEY));
}
}
};
      origin.invoke(assertOriginContent);
} finally {
Invoke.invokeInEveryVM(new SerializableCallable() {
@Override
public Object call() throws Exception {
TXManagerImpl.ALLOW_PERSISTENT_TRANSACTIONS = false;
return null;
}
});
}
}
}