// blob: 40e984b3001ae18eda803f4a09e79c9641eadddc [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.geode.internal.cache.wan.serial;
import static org.junit.Assert.fail;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.apache.geode.cache.CacheFactory;
import org.apache.geode.cache.CacheTransactionManager;
import org.apache.geode.cache.Region;
import org.apache.geode.cache.execute.Execution;
import org.apache.geode.cache.execute.Function;
import org.apache.geode.cache.execute.FunctionContext;
import org.apache.geode.cache.execute.FunctionService;
import org.apache.geode.cache.execute.RegionFunctionContext;
import org.apache.geode.distributed.DistributedSystem;
import org.apache.geode.internal.cache.wan.WANTestBase;
import org.apache.geode.test.dunit.AsyncInvocation;
import org.apache.geode.test.junit.categories.WanTest;
// The tests here validate changes introduced after a distributed deadlock
// was found that caused issues for a production customer.
//
// There are 4 tests which use sender gateways with primaries on different
// JVMs. Two tests use replicated regions and two use partitioned regions, and
// the tests vary the conserve-sockets setting.
//
// The 4th test (PR, conserve-sockets=true) was reported to hang/fail and an
// earlier note said it was commented out to prevent issues; in this file it is
// present and annotated with @Test.
@Category({WanTest.class})
public class SerialGatewaySenderDistributedDeadlockDUnitTest extends WANTestBase {
public SerialGatewaySenderDistributedDeadlockDUnitTest() {
super();
}
/**
 * Uses replicated regions and conserve-sockets=false.
 *
 * <p>Sets up two locators (vm0, vm1), caches on vm2/vm4/vm5, two serial senders and the
 * replicated regions, then runs transactional puts, plain puts and function executions
 * concurrently on vm4 and vm5 and waits for the asynchronous invocations to finish.
 *
 * @throws Exception if setup or any synchronous remote call fails, or if this thread is
 *         interrupted while waiting for the asynchronous invocations
 */
@Test
public void testPrimarySendersOnDifferentVMsReplicated() throws Exception {
  Integer lnPort =
      (Integer) vm0.invoke("createFirstPeerLocator", () -> WANTestBase.createFirstPeerLocator(1));
  Integer nyPort = (Integer) vm1.invoke("createFirstRemoteLocator",
      () -> WANTestBase.createFirstRemoteLocator(2, lnPort));
  createCachesWith(Boolean.FALSE, nyPort, lnPort);
  createSerialSenders();
  createReplicatedRegions(nyPort);
  // get one primary sender on vm4 and another primary on vm5
  // the startup order matters here
  startSerialSenders();
  // exercise region and gateway operations with different messaging
  exerciseWANOperations();
  AsyncInvocation invVM4transaction =
      vm4.invokeAsync("doTxPutsAsync", () -> WANTestBase.doTxPuts(getTestMethodName() + "_RR"));
  AsyncInvocation invVM5transaction =
      vm5.invokeAsync("doTxPutsAsync", () -> WANTestBase.doTxPuts(getTestMethodName() + "_RR"));
  AsyncInvocation invVM4 =
      vm4.invokeAsync("doPutsAsync", () -> WANTestBase.doPuts(getTestMethodName() + "_RR", 1000));
  AsyncInvocation invVM5 =
      vm5.invokeAsync("doPutsAsync", () -> WANTestBase.doPuts(getTestMethodName() + "_RR", 1000));
  // Function executions run on this thread while the four invocations above are in flight.
  exerciseFunctions();
  // An InterruptedException is allowed to propagate (the method declares throws Exception) so
  // the test fails with the real cause, exactly like the other tests in this class.
  // NOTE(review): join() is assumed to only wait for completion; confirm whether failures or
  // timeouts of the asynchronous invocations should be asserted here as well.
  invVM4transaction.join();
  invVM5transaction.join();
  invVM4.join();
  invVM5.join();
}
/**
 * Uses partitioned regions and conserve-sockets=false.
 *
 * <p>After the locators, caches, serial senders and partitioned regions are in place, the test
 * starts transactional puts and plain puts asynchronously on vm4 and vm5, runs the function
 * executions on the calling thread, and finally waits for every asynchronous invocation.
 *
 * @throws Exception if setup or a synchronous remote call fails, or if the wait is interrupted
 */
@Test
public void testPrimarySendersOnDifferentVMsPR() throws Exception {
  final Integer lnPort =
      (Integer) vm0.invoke("createFirstPeerLocator", () -> WANTestBase.createFirstPeerLocator(1));
  final Integer nyPort = (Integer) vm1.invoke("createFirstRemoteLocator",
      () -> WANTestBase.createFirstRemoteLocator(2, lnPort));

  createCachesWith(Boolean.FALSE, nyPort, lnPort);
  createSerialSenders();
  createPartitionedRegions(nyPort);
  startSerialSenders();
  exerciseWANOperations();

  // The array initializer launches the invocations in source order:
  // tx puts on vm4, tx puts on vm5, plain puts on vm4, plain puts on vm5.
  final AsyncInvocation[] inFlight = {
      vm4.invokeAsync("doTxPutsPRAsync", () -> SerialGatewaySenderDistributedDeadlockDUnitTest
          .doTxPutsPR(getTestMethodName() + "_RR", 100, 1000)),
      vm5.invokeAsync("doTxPutsPRAsync", () -> SerialGatewaySenderDistributedDeadlockDUnitTest
          .doTxPutsPR(getTestMethodName() + "_RR", 100, 1000)),
      vm4.invokeAsync("doPutsAsync", () -> WANTestBase.doPuts(getTestMethodName() + "_RR", 1000)),
      vm5.invokeAsync("doPutsAsync", () -> WANTestBase.doPuts(getTestMethodName() + "_RR", 1000))};

  exerciseFunctions();

  // Wait for the invocations in the same order they were started.
  for (AsyncInvocation invocation : inFlight) {
    invocation.join();
  }
}
/**
 * Uses replicated regions and conserve-sockets=true.
 *
 * <p>Same flow as the other replicated-region test, except that the caches are created with
 * {@code Boolean.TRUE} as the socket policy.
 *
 * @throws Exception if setup or a synchronous remote call fails, or if the wait is interrupted
 */
@Test
public void testPrimarySendersOnDifferentVMsReplicatedSocketPolicy() throws Exception {
  final Integer lnPort =
      (Integer) vm0.invoke("createFirstPeerLocator", () -> WANTestBase.createFirstPeerLocator(1));
  final Integer nyPort = (Integer) vm1.invoke("createFirstRemoteLocator",
      () -> WANTestBase.createFirstRemoteLocator(2, lnPort));

  createCachesWith(Boolean.TRUE, nyPort, lnPort);
  createSerialSenders();
  createReplicatedRegions(nyPort);

  // One primary sender is wanted on vm4 and the other on vm5, so the start order matters.
  startSerialSenders();

  // Exercise region and gateway operations with messaging.
  exerciseWANOperations();

  // The array initializer launches the invocations in source order:
  // tx puts on vm4, tx puts on vm5, plain puts on vm4, plain puts on vm5.
  final AsyncInvocation[] inFlight = {
      vm4.invokeAsync("doTxPutsAsync", () -> WANTestBase.doTxPuts(getTestMethodName() + "_RR")),
      vm5.invokeAsync("doTxPutsAsync", () -> WANTestBase.doTxPuts(getTestMethodName() + "_RR")),
      vm4.invokeAsync("doPutsAsync", () -> WANTestBase.doPuts(getTestMethodName() + "_RR", 1000)),
      vm5.invokeAsync("doPutsAsync", () -> WANTestBase.doPuts(getTestMethodName() + "_RR", 1000))};

  exerciseFunctions();

  // Wait for the invocations in the same order they were started.
  for (AsyncInvocation invocation : inFlight) {
    invocation.join();
  }
}
/**
 * Uses partitioned regions and conserve-sockets=true.
 *
 * <p>Author's note carried over from the original comment: this always causes a distributed
 * deadlock.
 *
 * @throws Exception if setup or a synchronous remote call fails, or if the wait is interrupted
 */
@Test
public void testPrimarySendersOnDifferentVMsPRSocketPolicy() throws Exception {
  final Integer lnPort =
      (Integer) vm0.invoke("createFirstPeerLocator", () -> WANTestBase.createFirstPeerLocator(1));
  final Integer nyPort = (Integer) vm1.invoke("createFirstRemoteLocator",
      () -> WANTestBase.createFirstRemoteLocator(2, lnPort));

  createCachesWith(Boolean.TRUE, nyPort, lnPort);
  createSerialSenders();
  createPartitionedRegions(nyPort);
  startSerialSenders();
  exerciseWANOperations();

  // The array initializer launches the invocations in source order:
  // tx puts on vm4, tx puts on vm5, plain puts on vm4, plain puts on vm5.
  final AsyncInvocation[] inFlight = {
      vm4.invokeAsync("doTxPutsPRAsync", () -> SerialGatewaySenderDistributedDeadlockDUnitTest
          .doTxPutsPR(getTestMethodName() + "_RR", 100, 1000)),
      vm5.invokeAsync("doTxPutsPRAsync", () -> SerialGatewaySenderDistributedDeadlockDUnitTest
          .doTxPutsPR(getTestMethodName() + "_RR", 100, 1000)),
      vm4.invokeAsync("doPutsAsync", () -> WANTestBase.doPuts(getTestMethodName() + "_RR", 1000)),
      vm5.invokeAsync("doPutsAsync", () -> WANTestBase.doPuts(getTestMethodName() + "_RR", 1000))};

  exerciseFunctions();

  // Wait for the invocations in the same order they were started.
  for (AsyncInvocation invocation : inFlight) {
    invocation.join();
  }
}
// **************************************************************************
// Utility methods used by tests
// **************************************************************************
/**
 * Creates the replicated region {@code getTestMethodName() + "_RR"} on vm2, vm4 and vm5 and
 * a gateway receiver on vm2.
 *
 * @param nyPort not read by this method
 * @throws Exception declared by the signature
 */
private void createReplicatedRegions(Integer nyPort) throws Exception {
  // Receiving member: region created with a null second argument, followed by the receiver.
  vm2.invoke("createReplicatedRegion",
      () -> WANTestBase.createReplicatedRegion(getTestMethodName() + "_RR", null, false));
  vm2.invoke("createReceiver", () -> WANTestBase.createReceiver());

  // Sending members, vm4 first and then vm5. "ln1,ln2" matches the ids of the two senders
  // created by createSerialSenders().
  java.util.stream.Stream.of(vm4, vm5)
      .forEach(senderVm -> senderVm.invoke("createReplicatedRegion",
          () -> WANTestBase.createReplicatedRegion(getTestMethodName() + "_RR", "ln1,ln2",
              false)));
}
/**
 * Creates a cache on vm2, vm4 and vm5 through {@code WANTestBase.createCacheConserveSockets}.
 *
 * @param socketPolicy value forwarded as the first argument on every VM
 * @param nyPort port forwarded for vm2
 * @param lnPort port forwarded for vm4 and vm5
 */
private void createCachesWith(Boolean socketPolicy, Integer nyPort, Integer lnPort) {
  // vm2 is the only member created against nyPort.
  vm2.invoke("createCacheConserveSockets",
      () -> WANTestBase.createCacheConserveSockets(socketPolicy, nyPort));

  // vm4 and then vm5 are created against lnPort.
  java.util.stream.Stream.of(vm4, vm5)
      .forEach(localVm -> localVm.invoke("createCacheConserveSockets",
          () -> WANTestBase.createCacheConserveSockets(socketPolicy, lnPort)));
}
/**
 * Executes the test function 2000 times on each of vm4 and vm5: 1000 rounds with the argument
 * {@code Boolean.TRUE} followed by 1000 rounds with {@code Boolean.FALSE}.
 *
 * @throws Exception declared by the signature
 */
private void exerciseFunctions() throws Exception {
  // do function calls that use a shared connection
  // setting it to Boolean.TRUE it should pass the test
  runFunctionPutsOnSenderVMs("doFunctionPuts", Boolean.TRUE);

  // Author's note: passing Boolean.FALSE will cause a deadlock in some GFE versions, whereas
  // Boolean.TRUE (as above) should pass; this mirrors the distributed deadlock found by the
  // customer.
  runFunctionPutsOnSenderVMs("doFunctionPuts2", Boolean.FALSE);
}

/**
 * Runs 1000 rounds; each round performs one function execution on vm4 and then one on vm5.
 *
 * @param invocationName name given to each remote invocation
 * @param useThreadOwnedSocket argument passed through to {@code doFunctionPuts}
 */
private void runFunctionPutsOnSenderVMs(String invocationName, Boolean useThreadOwnedSocket) {
  for (int round = 0; round < 1000; round++) {
    vm4.invoke(invocationName, () -> SerialGatewaySenderDistributedDeadlockDUnitTest
        .doFunctionPuts(getTestMethodName() + "_RR", 1, useThreadOwnedSocket));
    vm5.invoke(invocationName, () -> SerialGatewaySenderDistributedDeadlockDUnitTest
        .doFunctionPuts(getTestMethodName() + "_RR", 1, useThreadOwnedSocket));
  }
}
/**
 * Creates the partitioned region {@code getTestMethodName() + "_RR"} on vm2, vm4 and vm5 and
 * a gateway receiver on vm2.
 *
 * @param nyPort not read by this method
 * @throws Exception declared by the signature
 */
private void createPartitionedRegions(Integer nyPort) throws Exception {
  // Remote receiving member: region arguments are ("", 0, 113, false), then the receiver.
  vm2.invoke("createPartitionedRegion",
      () -> WANTestBase.createPartitionedRegion(getTestMethodName() + "_RR", "", 0, 113, false));
  vm2.invoke("createReceiver", () -> WANTestBase.createReceiver());

  // Sender members, vm4 first and then vm5. "ln1,ln2" matches the ids of the two senders
  // created by createSerialSenders().
  java.util.stream.Stream.of(vm4, vm5)
      .forEach(senderVm -> senderVm.invoke("createPartitionedRegion",
          () -> WANTestBase.createPartitionedRegion(getTestMethodName() + "_RR", "ln1,ln2", 1,
              113, false)));
}
/**
 * Runs a fixed, sequential series of region operations on the sender VMs (vm4, vm5) and
 * validates the resulting region size on one sender VM and on vm2 after each phase.
 *
 * <p>Every call targets the region named {@code getTestMethodName() + "_RR"}. All invocations
 * here are synchronous, so each phase completes before the next one starts.
 *
 * @throws Exception declared by the signature; presumably surfaces remote invocation failures
 */
private void exerciseWANOperations() throws Exception {
// note - some of these should be made async to truly exercise the
// messaging between the WAN gateways and members
// exercise region and gateway operations
// Phase 1: puts from both senders, then expect 100 entries on vm4 and vm2.
vm4.invoke("exerciseWANOperations.doPuts",
() -> WANTestBase.doPuts(getTestMethodName() + "_RR", 100));
vm5.invoke("exerciseWANOperations.doPuts",
() -> WANTestBase.doPuts(getTestMethodName() + "_RR", 100));
vm4.invoke("exerciseWANOperations.validateRegionSize",
() -> WANTestBase.validateRegionSize(getTestMethodName() + "_RR", 100));
vm2.invoke("exerciseWANOperations.validateRegionSize",
() -> WANTestBase.validateRegionSize(getTestMethodName() + "_RR", 100));
// Phase 2: destroys from vm5, then expect an empty region on vm5 and vm2.
vm5.invoke("exerciseWANOperations.doDestroys",
() -> WANTestBase.doDestroys(getTestMethodName() + "_RR", 100));
vm5.invoke("exerciseWANOperations.validateRegionSize",
() -> WANTestBase.validateRegionSize(getTestMethodName() + "_RR", 0));
vm2.invoke("exerciseWANOperations.validateRegionSize",
() -> WANTestBase.validateRegionSize(getTestMethodName() + "_RR", 0));
// Phase 3: puts from both senders again, then expect 100 entries on vm4 and vm2.
vm4.invoke("exerciseWANOperations.doPuts2",
() -> WANTestBase.doPuts(getTestMethodName() + "_RR", 100));
vm5.invoke("exerciseWANOperations.doPuts2",
() -> WANTestBase.doPuts(getTestMethodName() + "_RR", 100));
vm4.invoke("exerciseWANOperations.validateRegionSize",
() -> WANTestBase.validateRegionSize(getTestMethodName() + "_RR", 100));
vm2.invoke("exerciseWANOperations.validateRegionSize",
() -> WANTestBase.validateRegionSize(getTestMethodName() + "_RR", 100));
// Phase 4: 100 invalidate attempts on vm4 using random keys below 100, followed by
// putAll from both senders; afterwards 1000 entries are expected on vm4 and vm2.
vm4.invoke("exerciseWANOperations.doInvalidates",
() -> SerialGatewaySenderDistributedDeadlockDUnitTest
.doInvalidates(getTestMethodName() + "_RR", 100, 100));
vm4.invoke("exerciseWANOperations.doPutAll",
() -> WANTestBase.doPutAll(getTestMethodName() + "_RR", 100, 10));
vm5.invoke("exerciseWANOperations.doPutAll",
() -> WANTestBase.doPutAll(getTestMethodName() + "_RR", 100, 10));
vm4.invoke("exerciseWANOperations.validateRegionSize",
() -> WANTestBase.validateRegionSize(getTestMethodName() + "_RR", 1000));
vm2.invoke("exerciseWANOperations.validateRegionSize",
() -> WANTestBase.validateRegionSize(getTestMethodName() + "_RR", 1000));
// Phase 5: destroys from vm4, then expect an empty region on vm5 and vm2.
vm4.invoke("exerciseWANOperations.doDestroys",
() -> WANTestBase.doDestroys(getTestMethodName() + "_RR", 1000));
vm5.invoke("exerciseWANOperations.validateRegionSize",
() -> WANTestBase.validateRegionSize(getTestMethodName() + "_RR", 0));
vm2.invoke("exerciseWANOperations.validateRegionSize",
() -> WANTestBase.validateRegionSize(getTestMethodName() + "_RR", 0));
// Phase 6: PDX-serializable puts from vm4, validated with the PDX size check on vm5 and vm2.
vm4.invoke("exerciseWANOperations.doPutsPDXSerializable",
() -> WANTestBase.doPutsPDXSerializable(getTestMethodName() + "_RR", 100));
vm5.invoke("exerciseWANOperations.validateRegionSize_PDX",
() -> WANTestBase.validateRegionSize_PDX(getTestMethodName() + "_RR", 100));
vm2.invoke("exerciseWANOperations.validateRegionSize_PDX",
() -> WANTestBase.validateRegionSize_PDX(getTestMethodName() + "_RR", 100));
}
/**
 * Starts senders "ln1" and "ln2" on vm4 and vm5 in a deliberate order: "ln1" on vm4 and "ln2"
 * on vm5 first, then "ln1" on vm5 and "ln2" on vm4.
 *
 * <p>NOTE(review): presumably the first member to start a given sender becomes its primary,
 * which is why the order must not be changed - confirm against the gateway sender docs.
 *
 * @throws Exception declared by the signature
 */
private void startSerialSenders() throws Exception {
// get one primary sender on vm4 and another primary on vm5
// the startup order matters here so that primaries are
// on different JVM's
vm4.invoke("start primary sender", () -> WANTestBase.startSender("ln1"));
vm5.invoke("start primary sender", () -> WANTestBase.startSender("ln2"));
// start secondaries
vm5.invoke("start secondary sender", () -> WANTestBase.startSender("ln1"));
vm4.invoke("start secondary sender", () -> WANTestBase.startSender("ln2"));
}
/**
 * Creates the two senders "ln1" and "ln2" on both vm4 and vm5 with identical settings.
 *
 * <p>Creation order is "ln1" on vm4, "ln1" on vm5, "ln2" on vm4, "ln2" on vm5.
 *
 * @throws Exception declared by the signature
 */
private void createSerialSenders() throws Exception {
  for (String senderId : new String[] {"ln1", "ln2"}) {
    vm4.invoke("create primary sender",
        () -> WANTestBase.createSender(senderId, 2, false, 100, 10, false, false, null, true));
    vm5.invoke("create secondary sender",
        () -> WANTestBase.createSender(senderId, 2, false, 100, 10, false, false, null, true));
  }
}
/**
 * Registers a {@link TestFunction} and executes it {@code num} times on the named region.
 *
 * @param name name passed to {@code getRegion} on the cache returned by
 *        {@code CacheFactory.getAnyInstance()}
 * @param num number of function executions to perform
 * @param useThreadOwnedSocket argument handed to every execution; {@code TestFunction} calls
 *        {@code DistributedSystem.setThreadsSocketPolicy(false)} when it is {@code true}
 * @throws Exception declared by the signature
 */
public static void doFunctionPuts(String name, int num, Boolean useThreadOwnedSocket)
    throws Exception {
  Region<?, ?> region = CacheFactory.getAnyInstance().getRegion(name);
  TestFunction function = new TestFunction();
  FunctionService.registerFunction(function);
  // Execute by the id the function reports instead of a hard-coded copy of its class name, so
  // the lookup cannot drift if TestFunction is renamed or moved.
  String functionId = function.getId();
  Execution exe = FunctionService.onRegion(region);
  for (int x = 0; x < num; x++) {
    exe.setArguments(useThreadOwnedSocket).execute(functionId);
  }
}
/**
 * Performs {@code numPuts} single-entry transactions against the given region, each putting a
 * random key (mapped to itself) chosen from {@code [0, size)}.
 *
 * <p>Colocation failures and commit conflicts are ignored. Any transaction still active after
 * an attempt is rolled back so that the next {@code begin()} on this thread starts cleanly.
 *
 * @param regionName region name; {@code Region.SEPARATOR} is prepended before lookup
 * @param numPuts number of transactions to attempt
 * @param size exclusive upper bound for the random keys
 * @throws Exception declared by the signature
 */
public static void doTxPutsPR(String regionName, int numPuts, int size) throws Exception {
  Region<Integer, Integer> r = cache.getRegion(Region.SEPARATOR + regionName);
  CacheTransactionManager mgr = cache.getCacheTransactionManager();
  for (int x = 0; x < numPuts; x++) {
    int temp = (int) (Math.floor(Math.random() * size));
    mgr.begin();
    try {
      r.put(temp, temp);
      mgr.commit();
    } catch (org.apache.geode.cache.TransactionDataNotColocatedException txe) {
      // ignore colocation issues or primary bucket issues
    } catch (org.apache.geode.cache.CommitConflictException cce) {
      // ignore - conflicts are ok and expected
    } finally {
      // A failure inside put() leaves the transaction open on this thread; without a rollback
      // the next begin() would find a transaction already in progress.
      if (mgr.exists()) {
        mgr.rollback();
      }
    }
  }
}
/**
 * Makes {@code numInvalidates} attempts to invalidate a random key from {@code [0, size)} in
 * the given region. A key is only invalidated when the region currently holds a value for it.
 *
 * @param regionName region name; {@code Region.SEPARATOR} is prepended before lookup
 * @param numInvalidates number of attempts
 * @param size exclusive upper bound for the random keys
 * @throws Exception declared by the signature
 */
public static void doInvalidates(String regionName, int numInvalidates, int size)
    throws Exception {
  final Region<?, ?> target = cache.getRegion(Region.SEPARATOR + regionName);
  for (int attempt = 0; attempt < numInvalidates; attempt++) {
    final int key = (int) Math.floor(Math.random() * size);
    try {
      if (!target.containsValueForKey(key)) {
        continue;
      }
      target.invalidate(key);
    } catch (org.apache.geode.cache.EntryNotFoundException entryNotFoundException) {
      // The entry can disappear between the check and the invalidate; that is fine here.
    }
  }
}
}
/**
 * Function used by the deadlock tests: each execution puts one random key/value pair into the
 * region the function was invoked on and produces no result.
 *
 * <p>The {@code Boolean} argument selects whether
 * {@code DistributedSystem.setThreadsSocketPolicy(false)} is called before the put.
 */
class TestFunction implements Function {

  /** Uses the runtime class name as the function id. */
  @Override
  public String getId() {
    return this.getClass().getName();
  }

  @Override
  public boolean hasResult() {
    return false;
  }

  @Override
  public boolean optimizeForWrite() {
    return false;
  }

  @Override
  public boolean isHA() {
    return false;
  }

  /**
   * Optionally switches the calling thread's socket policy, then puts a key below 10 with a
   * value below 10000 into the region supplied by the context.
   *
   * @param fc must be a {@code RegionFunctionContext} whose arguments are a non-null
   *        {@code Boolean}
   */
  @Override
  public void execute(FunctionContext fc) {
    final boolean switchSocketPolicy = (Boolean) fc.getArguments();
    if (switchSocketPolicy) {
      // NOTE(review): presumably selects thread-owned sockets for this thread - confirm
      // against the DistributedSystem Javadoc.
      DistributedSystem.setThreadsSocketPolicy(false);
    }
    final Region targetRegion = ((RegionFunctionContext) fc).getDataSet();
    final int key = randKeyValue(10);
    final int value = randKeyValue(10000);
    targetRegion.put(key, value);
  }

  /** Returns {@code floor(Math.random() * size)} as an int. */
  private int randKeyValue(int size) {
    return (int) Math.floor(Math.random() * size);
  }
}