blob: 359fc6ff09eb37f41a59a6245abd3ddf3d74fdb8 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.geode.internal.cache.wan.concurrent;
import static org.apache.geode.cache.Region.SEPARATOR;
import static org.apache.geode.test.awaitility.GeodeAwaitility.await;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.apache.geode.cache.Region;
import org.apache.geode.cache.RegionDestroyedException;
import org.apache.geode.cache.wan.GatewaySender.OrderPolicy;
import org.apache.geode.internal.cache.wan.AbstractGatewaySender;
import org.apache.geode.internal.cache.wan.WANTestBase;
import org.apache.geode.test.awaitility.GeodeAwaitility;
import org.apache.geode.test.dunit.Assert;
import org.apache.geode.test.dunit.AsyncInvocation;
import org.apache.geode.test.dunit.IgnoredException;
import org.apache.geode.test.dunit.LogWriterUtils;
import org.apache.geode.test.dunit.VM;
import org.apache.geode.test.dunit.Wait;
import org.apache.geode.test.dunit.WaitCriterion;
import org.apache.geode.test.junit.categories.WanTest;
@Category({WanTest.class})
public class ConcurrentParallelGatewaySenderOperation_2_DUnitTest extends WANTestBase {
private static final long serialVersionUID = 1L;
public ConcurrentParallelGatewaySenderOperation_2_DUnitTest() {
super();
}
@Override
protected final void postSetUpWANTestBase() throws Exception {
IgnoredException.addIgnoredException("RegionDestroyedException");
IgnoredException.addIgnoredException("Broken pipe");
IgnoredException.addIgnoredException("Connection reset");
IgnoredException.addIgnoredException("Unexpected IOException");
}
@Test
public void shuttingOneSenderInAVMShouldNotAffectOthersBatchRemovalThread() {
Integer lnport = (Integer) vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1));
Integer nyPort = (Integer) vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnport));
createCacheInVMs(lnport, vm2, vm3);
vm2.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 10, false, true, null, true));
vm2.invoke(() -> WANTestBase.createSender("ln2", 2, true, 100, 10, false, true, null, true));
vm2.invoke(() -> WANTestBase.createPartitionedRegion(getTestMethodName() + "_PR", "ln,ln2", 1,
100, false));
createCacheInVMs(nyPort, vm4, vm5);
vm4.invoke(() -> WANTestBase.createPartitionedRegion(getTestMethodName() + "_PR", null, 1, 100,
false));
vm5.invoke(() -> WANTestBase.createPartitionedRegion(getTestMethodName() + "_PR", null, 1, 100,
false));
vm4.invoke(() -> WANTestBase.createReceiver());
vm2.invoke(() -> WANTestBase.startSender("ln"));
vm2.invoke(() -> WANTestBase.startSender("ln2"));
vm3.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 10, false, true, null, true));
vm3.invoke(() -> WANTestBase.createSender("ln2", 2, true, 100, 10, false, true, null, true));
vm3.invoke(() -> WANTestBase.createPartitionedRegion(getTestMethodName() + "_PR", "ln,ln2", 1,
100, false));
vm3.invoke(() -> WANTestBase.startSender("ln"));
vm3.invoke(() -> WANTestBase.startSender("ln2"));
vm2.invoke(() -> WANTestBase.waitForSenderRunningState("ln"));
vm2.invoke(() -> WANTestBase.waitForSenderRunningState("ln2"));
vm3.invoke(() -> WANTestBase.waitForSenderRunningState("ln"));
vm3.invoke(() -> WANTestBase.waitForSenderRunningState("ln2"));
AsyncInvocation asyncPuts = vm2.invokeAsync(() -> {
WANTestBase.doPuts(getTestMethodName() + "_PR", 1000);
});
// Guarantee some entries are in the queue even if the asyncPuts thread is slow
vm2.invoke(() -> {
WANTestBase.doPuts(getTestMethodName() + "_PR", 100);
});
vm2.invoke(() -> await()
.until(() -> WANTestBase.getSenderStats("ln", -1).get(3) > 0));
vm2.invoke(() -> WANTestBase.stopSender("ln")); // Things have dispatched
// Dispatch additional values
vm2.invoke(() -> {
WANTestBase.doPutsFrom(getTestMethodName() + "_PR", 1000, 1100);
});
await().until(() -> asyncPuts.isDone());
vm2.invoke(() -> await()
.untilAsserted(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_PR", 1100)));
vm4.invoke(() -> await()
.untilAsserted(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_PR", 1100)));
vm3.invoke(() -> {
await()
.untilAsserted(
() -> assertTrue(WANTestBase.getQueueContentSize("ln2", true) + " was the size",
WANTestBase.getQueueContentSize("ln2", true) == 0));
});
}
// to test that when userPR is locally destroyed, shadow Pr is also locally
// destroyed and on recreation userPr , shadow Pr is also recreated.
@Test
public void testParallelGatewaySender_SingleNode_UserPR_localDestroy_RecreateRegion()
throws Exception {
Integer[] locatorPorts = createLNAndNYLocators();
Integer lnPort = locatorPorts[0];
Integer nyPort = locatorPorts[1];
try {
String regionName = getTestMethodName() + "_PR";
createCacheInVMs(lnPort, vm4);
vm4.invoke(() -> AbstractGatewaySender.MAXIMUM_SHUTDOWN_WAIT_TIME = -1);
vm4.invoke(
() -> createPartitionedRegion(getTestMethodName() + "_PR", "ln", 1, 10, isOffHeap()));
vm4.invoke(() -> createConcurrentSender("ln", 2, true, 100, 10, false, false, null, false, 5,
OrderPolicy.KEY));
vm4.invoke(() -> startSender("ln"));
vm4.invoke(() -> pauseSender("ln"));
createCacheInVMs(nyPort, vm2);
vm2.invoke(() -> createPartitionedRegion(regionName, null, 1, 10, isOffHeap()));
vm2.invoke(() -> createReceiver());
vm4.invoke(() -> doPuts(regionName, 10));
vm4.invoke(() -> validateRegionSize(regionName, 10));
vm2.invoke(() -> validateRegionSize(regionName, 0));
vm4.invoke(() -> localDestroyRegion(getTestMethodName() + "_PR"));
vm2.invoke(() -> validateRegionSize(regionName, 0));
vm4.invoke(() -> createPartitionedRegion(regionName, "ln", 1, 10, isOffHeap()));
vm4.invoke(() -> doPutsFrom(regionName, 10, 20));
vm2.invoke(() -> await()
.untilAsserted(() -> validateRegionSize(regionName, 0)));
vm4.invoke(() -> validateRegionSize(regionName, 10));
} finally {
vm4.invoke(() -> AbstractGatewaySender.MAXIMUM_SHUTDOWN_WAIT_TIME = 0);
}
}
@Test
public void testParallelGatewaySender_SingleNode_UserPR_Destroy_RecreateRegion()
throws Exception {
Integer[] locatorPorts = createLNAndNYLocators();
Integer lnPort = locatorPorts[0];
Integer nyPort = locatorPorts[1];
try {
String regionName = getTestMethodName() + "_PR";
createCacheInVMs(lnPort, vm4);
vm4.invoke(() -> AbstractGatewaySender.MAXIMUM_SHUTDOWN_WAIT_TIME = -1);
vm4.invoke(
() -> createPartitionedRegion(getTestMethodName() + "_PR", "ln", 1, 10, isOffHeap()));
vm4.invoke(() -> createConcurrentSender("ln", 2, true, 100, 10, false, false, null, false, 4,
OrderPolicy.KEY));
vm4.invoke(() -> startSender("ln"));
vm4.invoke(() -> pauseSender("ln"));
createCacheInVMs(nyPort, vm2);
vm2.invoke(() -> createPartitionedRegion(regionName, null, 1, 10, isOffHeap()));
vm2.invoke(() -> createReceiver());
vm4.invoke(() -> doPuts(regionName, 10));
vm4.invoke(() -> validateRegionSize(regionName, 10));
vm2.invoke(() -> validateRegionSize(regionName, 0));
vm4.invoke(() -> resumeSender("ln"));
vm4.invoke(() -> validateParallelSenderQueueAllBucketsDrained("ln"));
vm4.invoke(() -> localDestroyRegion(getTestMethodName() + "_PR"));
vm2.invoke(() -> validateRegionSize(regionName, 10));
vm4.invoke(() -> createPartitionedRegion(regionName, "ln", 1, 10, isOffHeap()));
vm4.invoke(() -> doPutsFrom(regionName, 10, 20));
vm2.invoke(() -> await()
.untilAsserted(() -> validateRegionSize(regionName, 20)));
vm4.invoke(() -> validateRegionSize(regionName, 10));
} finally {
vm4.invoke(() -> AbstractGatewaySender.MAXIMUM_SHUTDOWN_WAIT_TIME = 0);
}
}
@Test
public void testParallelGatewaySender_SingleNode_UserPR_Close_RecreateRegion() throws Exception {
Integer[] locatorPorts = createLNAndNYLocators();
Integer lnPort = locatorPorts[0];
Integer nyPort = locatorPorts[1];
try {
String regionName = getTestMethodName() + "_PR";
createCacheInVMs(lnPort, vm4);
vm4.invoke(() -> AbstractGatewaySender.MAXIMUM_SHUTDOWN_WAIT_TIME = -1);
vm4.invoke(
() -> createPartitionedRegion(getTestMethodName() + "_PR", "ln", 1, 10, isOffHeap()));
vm4.invoke(() -> createConcurrentSender("ln", 2, true, 100, 10, false, false, null, false, 7,
OrderPolicy.KEY));
vm4.invoke(() -> startSender("ln"));
vm4.invoke(() -> pauseSender("ln"));
createCacheInVMs(nyPort, vm2);
vm2.invoke(() -> createPartitionedRegion(regionName, null, 1, 10, isOffHeap()));
vm2.invoke(() -> createReceiver());
vm4.invoke(() -> doPuts(regionName, 10));
vm4.invoke(() -> validateRegionSize(regionName, 10));
vm4.invoke(() -> closeRegion(getTestMethodName() + "_PR"));
vm4.invoke(() -> resumeSender("ln"));
Thread.sleep(500);
vm2.invoke(() -> validateRegionSize(regionName, 0));
vm4.invoke(() -> createPartitionedRegion(regionName, "ln", 1, 10, isOffHeap()));
vm4.invoke(() -> doPutsFrom(regionName, 10, 20));
vm2.invoke(() -> await()
.untilAsserted(() -> validateRegionSize(regionName, 10)));
vm4.invoke(() -> validateRegionSize(regionName, 10));
} finally {
vm4.invoke(() -> AbstractGatewaySender.MAXIMUM_SHUTDOWN_WAIT_TIME = 0);
}
}
@Test
public void testParallelGatewaySender_SingleNode_UserPR_Destroy_SimultaneousPut_RecreateRegion()
throws Exception {
Integer[] locatorPorts = createLNAndNYLocators();
Integer lnPort = locatorPorts[0];
Integer nyPort = locatorPorts[1];
try {
createAndStartSender(vm4, lnPort, 6, false, true);
vm4.invoke(() -> addCacheListenerAndDestroyRegion(getTestMethodName() + "_PR"));
createReceiverAndDoPutsInPausedSender(nyPort);
vm4.invoke(() -> resumeSender("ln"));
AsyncInvocation putAsync =
vm4.invokeAsync(() -> WANTestBase.doPutsFrom(getTestMethodName() + "_PR", 10, 101));
try {
putAsync.join();
} catch (InterruptedException e) {
e.printStackTrace();
fail("Interrupted the async invocation.");
}
if (putAsync.getException() != null
&& !(putAsync.getException() instanceof RegionDestroyedException)) {
Assert.fail("Expected RegionDestroyedException but got", putAsync.getException());
}
// before destroy, there is wait for queue to drain, so data will be
// dispatched
vm2.invoke(() -> validateRegionSizeWithinRange(getTestMethodName() + "_PR", 10, 101)); // possible
// size
// is
// more
// than
// 10
vm4.invoke(
() -> createPartitionedRegion(getTestMethodName() + "_PR", "ln", 1, 10, isOffHeap()));
vm4.invoke(() -> doPutsFrom(getTestMethodName() + "_PR", 10, 20));
vm4.invoke(() -> validateRegionSize(getTestMethodName() + "_PR", 10));
vm2.invoke(() -> validateRegionSizeWithinRange(getTestMethodName() + "_PR", 20, 101)); // possible
// size
// is
// more
// than
// 20
} finally {
vm4.invoke(() -> clear_INFINITE_MAXIMUM_SHUTDOWN_WAIT_TIME());
}
}
@Test
public void testParallelGatewaySender_SingleNode_UserPR_Destroy_NodeDown() throws Exception {
IgnoredException.addIgnoredException("Broken pipe");
IgnoredException.addIgnoredException("Connection reset");
IgnoredException.addIgnoredException("Unexpected IOException");
Integer[] locatorPorts = createLNAndNYLocators();
Integer lnPort = locatorPorts[0];
Integer nyPort = locatorPorts[1];
try {
createAndStartSender(vm4, lnPort, 5, false, true);
createAndStartSender(vm5, lnPort, 5, false, true);
createAndStartSender(vm6, lnPort, 5, false, true);
createReceiverAndDoPutsInPausedSender(nyPort);
vm4.invoke(() -> WANTestBase.resumeSender("ln"));
vm5.invoke(() -> WANTestBase.resumeSender("ln"));
vm6.invoke(() -> WANTestBase.resumeSender("ln"));
Wait.pause(200);
AsyncInvocation localDestroyAsync =
vm4.invokeAsync(() -> WANTestBase.destroyRegion(getTestMethodName() + "_PR"));
AsyncInvocation closeAsync = vm4.invokeAsync(() -> WANTestBase.closeCache());
try {
localDestroyAsync.join();
closeAsync.join();
} catch (InterruptedException e) {
e.printStackTrace();
fail("Interrupted the async invocation.");
}
vm2.invoke(() -> validateRegionSize(getTestMethodName() + "_PR", 10));
} finally {
vm4.invoke(() -> clear_INFINITE_MAXIMUM_SHUTDOWN_WAIT_TIME());
vm5.invoke(() -> clear_INFINITE_MAXIMUM_SHUTDOWN_WAIT_TIME());
vm6.invoke(() -> clear_INFINITE_MAXIMUM_SHUTDOWN_WAIT_TIME());
}
}
@Test
public void testParallelGatewaySender_SingleNode_UserPR_Close_SimultaneousPut_RecreateRegion()
throws Exception {
Integer[] locatorPorts = createLNAndNYLocators();
Integer lnPort = locatorPorts[0];
Integer nyPort = locatorPorts[1];
try {
String regionName = getTestMethodName() + "_PR";
createCacheInVMs(lnPort, vm4);
vm4.invoke(() -> AbstractGatewaySender.MAXIMUM_SHUTDOWN_WAIT_TIME = -1);
vm4.invoke(
() -> createPartitionedRegion(getTestMethodName() + "_PR", "ln", 1, 10, isOffHeap()));
vm4.invoke(() -> createConcurrentSender("ln", 2, true, 100, 10, false, false, null, false, 5,
OrderPolicy.KEY));
vm4.invoke(() -> startSender("ln"));
vm4.invoke(() -> pauseSender("ln"));
createCacheInVMs(nyPort, vm2);
vm2.invoke(() -> createPartitionedRegion(regionName, null, 1, 10, isOffHeap()));
vm2.invoke(() -> createReceiver());
vm4.invoke(() -> doPuts(regionName, 10));
vm4.invoke(() -> validateRegionSize(regionName, 10));
vm2.invoke(() -> validateRegionSize(regionName, 0));
AsyncInvocation putAsync =
vm4.invokeAsync(() -> WANTestBase.doPutsFrom(getTestMethodName() + "_PR", 10, 2000));
AsyncInvocation localDestroyAsync =
vm4.invokeAsync(() -> ConcurrentParallelGatewaySenderOperation_2_DUnitTest
.closeRegion(getTestMethodName() + "_PR"));
try {
putAsync.join();
localDestroyAsync.join();
} catch (InterruptedException e) {
e.printStackTrace();
fail("Interrupted the async invocation.");
}
vm2.invoke(() -> validateRegionSize(regionName, 0));
vm4.invoke(() -> createPartitionedRegion(regionName, "ln", 1, 10, isOffHeap()));
vm4.invoke(() -> doPutsFrom(regionName, 10, 20));
vm2.invoke(() -> await()
.untilAsserted(() -> validateRegionSize(regionName, 0)));
vm4.invoke(() -> validateRegionSize(regionName, 10));
} finally {
vm4.invoke(() -> AbstractGatewaySender.MAXIMUM_SHUTDOWN_WAIT_TIME = 0);
}
}
@Test
public void testParallelGatewaySenders_SingleNode_UserPR_localDestroy_RecreateRegion()
throws Exception {
Integer[] locatorPorts = createLNAndNYLocators();
Integer lnPort = locatorPorts[0];
Integer nyPort = locatorPorts[1];
Integer tkPort = (Integer) vm2.invoke(() -> createFirstRemoteLocator(3, lnPort));
Integer pnPort = (Integer) vm3.invoke(() -> createFirstRemoteLocator(4, lnPort));
createCacheInVMs(nyPort, vm4);
vm4.invoke(() -> createReceiver());
createCacheInVMs(tkPort, vm5);
vm5.invoke(() -> createReceiver());
createCacheInVMs(pnPort, vm6);
vm6.invoke(() -> createReceiver());
try {
vm7.invoke(() -> createCache_INFINITE_MAXIMUM_SHUTDOWN_WAIT_TIME(lnPort));
LogWriterUtils.getLogWriter().info("Created cache on local site");
vm7.invoke(() -> createConcurrentSender("ln1", 2, true, 100, 10, false, false, null, true, 5,
OrderPolicy.KEY));
vm7.invoke(() -> createConcurrentSender("ln2", 3, true, 100, 10, false, false, null, true, 5,
OrderPolicy.KEY));
vm7.invoke(() -> createConcurrentSender("ln3", 4, true, 100, 10, false, false, null, true, 5,
OrderPolicy.KEY));
vm7.invoke(() -> startSender("ln1"));
vm7.invoke(() -> startSender("ln2"));
vm7.invoke(() -> startSender("ln3"));
String regionName = getTestMethodName() + "_PR";
vm7.invoke(() -> createPartitionedRegion(regionName, "ln1,ln2,ln3", 1, 10, isOffHeap()));
LogWriterUtils.getLogWriter().info("Created PRs on local site");
vm4.invoke(() -> createPartitionedRegion(regionName, null, 1, 10, isOffHeap()));
vm5.invoke(() -> createPartitionedRegion(regionName, null, 1, 10, isOffHeap()));
vm6.invoke(() -> createPartitionedRegion(regionName, null, 1, 10, isOffHeap()));
vm7.invoke(() -> doPuts(regionName, 10));
vm7.invoke(() -> validateParallelSenderQueueAllBucketsDrained("ln1"));
vm7.invoke(() -> validateParallelSenderQueueAllBucketsDrained("ln2"));
vm7.invoke(() -> validateParallelSenderQueueAllBucketsDrained("ln3"));
vm7.invoke(() -> localDestroyRegion(regionName));
vm7.invoke(() -> createPartitionedRegion(regionName, "ln1,ln2,ln3", 1, 10, isOffHeap()));
vm7.invoke(() -> doPutsFrom(regionName, 10, 20));
vm7.invoke(() -> validateRegionSize(regionName, 10));
validateRegionSizes(regionName, 20, vm4, vm5, vm6);
} finally {
vm7.invoke(() -> clear_INFINITE_MAXIMUM_SHUTDOWN_WAIT_TIME());
}
}
@Test
public void testParallelGatewaySender_MultipleNode_UserPR_localDestroy_Recreate()
throws Exception {
Integer[] locatorPorts = createLNAndNYLocators();
Integer lnPort = locatorPorts[0];
Integer nyPort = locatorPorts[1];
createCacheInVMs(nyPort, vm2);
vm2.invoke(() -> createReceiver());
try {
createAndStartSender(vm4, lnPort, 5, true, false);
createAndStartSender(vm5, lnPort, 5, true, false);
String regionName = getTestMethodName() + "_PR";
vm2.invoke(() -> createPartitionedRegion(regionName, null, 1, 10, isOffHeap()));
AsyncInvocation inv1 = vm4.invokeAsync(() -> WANTestBase.doPuts(regionName, 10));
Wait.pause(1000);
vm5.invoke(() -> localDestroyRegion(regionName));
try {
inv1.join();
} catch (InterruptedException ex) {
ex.printStackTrace();
fail("Interrupted the async invocation.");
}
validateRegionSizes(regionName, 10, vm4, vm2);
vm5.invoke(() -> createPartitionedRegion(regionName, "ln", 1, 10, isOffHeap()));
vm4.invoke(() -> doPutsFrom(regionName, 10, 20));
validateRegionSizes(regionName, 20, vm4, vm2);
} finally {
vm4.invoke(() -> clear_INFINITE_MAXIMUM_SHUTDOWN_WAIT_TIME());
vm5.invoke(() -> clear_INFINITE_MAXIMUM_SHUTDOWN_WAIT_TIME());
}
}
@Test
public void testParallelGatewaySenders_MultipleNode_UserPR_localDestroy_Recreate()
throws Exception {
Integer[] locatorPorts = createLNAndNYLocators();
Integer lnPort = locatorPorts[0];
Integer nyPort = locatorPorts[1];
Integer tkPort = (Integer) vm2.invoke(() -> createFirstRemoteLocator(3, lnPort));
createCacheInVMs(nyPort, vm6);
vm6.invoke(() -> createReceiver());
createCacheInVMs(tkPort, vm7);
vm7.invoke(() -> createReceiver());
try {
createAndStartTwoSenders(vm4, lnPort, 4);
createAndStartTwoSenders(vm5, lnPort, 4);
String regionName = getTestMethodName() + "_PR";
vm6.invoke(() -> WANTestBase.createPartitionedRegion(regionName, null, 1, 100, isOffHeap()));
vm7.invoke(() -> WANTestBase.createPartitionedRegion(regionName, null, 1, 100, isOffHeap()));
AsyncInvocation inv1 = vm4.invokeAsync(() -> WANTestBase.doPuts(regionName, 10));
Wait.pause(1000);
vm5.invoke(() -> WANTestBase.localDestroyRegion(regionName));
try {
inv1.join();
} catch (InterruptedException ex) {
ex.printStackTrace();
fail("Interrupted the async invocation.");
}
validateRegionSizes(regionName, 10, vm4, vm6, vm7);
vm5.invoke(
() -> WANTestBase.createPartitionedRegion(regionName, "ln1,ln2", 1, 100, isOffHeap()));
vm4.invoke(() -> WANTestBase.doPutsFrom(regionName, 10, 20));
validateRegionSizes(regionName, 20, vm4, vm6, vm7);
} finally {
vm4.invoke(() -> clear_INFINITE_MAXIMUM_SHUTDOWN_WAIT_TIME());
vm5.invoke(() -> clear_INFINITE_MAXIMUM_SHUTDOWN_WAIT_TIME());
}
}
@Test
public void testParallelGatewaySender_ColocatedPartitionedRegions_localDestroy()
throws Exception {
Integer[] locatorPorts = createLNAndNYLocators();
Integer lnPort = locatorPorts[0];
Integer nyPort = locatorPorts[1];
createCacheInVMs(nyPort, vm2);
vm2.invoke(() -> createReceiver());
try {
createAndStartSenderWithCustomerOrderShipmentRegion(vm4, lnPort, 5, true);
createAndStartSenderWithCustomerOrderShipmentRegion(vm5, lnPort, 5, true);
LogWriterUtils.getLogWriter().info("Created PRs on local site");
vm2.invoke(() -> createCustomerOrderShipmentPartitionedRegion(null, 1, 100, isOffHeap()));
AsyncInvocation inv1 = vm4.invokeAsync(() -> WANTestBase.putcolocatedPartitionedRegion(10));
Wait.pause(1000);
try {
vm5.invoke(() -> localDestroyRegion(customerRegionName));
} catch (Exception ex) {
assertTrue(ex.getCause() instanceof UnsupportedOperationException);
}
try {
inv1.join();
} catch (Exception e) {
Assert.fail("Unexpected exception", e);
}
validateRegionSizes(customerRegionName, 10, vm4, vm5, vm2);
} finally {
vm4.invoke(() -> clear_INFINITE_MAXIMUM_SHUTDOWN_WAIT_TIME());
vm5.invoke(() -> clear_INFINITE_MAXIMUM_SHUTDOWN_WAIT_TIME());
}
}
@Test
public void testParallelGatewaySender_ColocatedPartitionedRegions_destroy() throws Exception {
Integer[] locatorPorts = createLNAndNYLocators();
Integer lnPort = locatorPorts[0];
Integer nyPort = locatorPorts[1];
createCacheInVMs(nyPort, vm2);
vm2.invoke(() -> WANTestBase.createReceiver());
try {
createAndStartSenderWithCustomerOrderShipmentRegion(vm4, lnPort, 6, true);
createAndStartSenderWithCustomerOrderShipmentRegion(vm5, lnPort, 6, true);
LogWriterUtils.getLogWriter().info("Created PRs on local site");
vm2.invoke(() -> WANTestBase.createCustomerOrderShipmentPartitionedRegion(null, 1, 100,
isOffHeap()));
AsyncInvocation inv1 = vm4.invokeAsync(() -> WANTestBase.putcolocatedPartitionedRegion(2000));
Wait.pause(1000);
try {
vm5.invoke(() -> WANTestBase.destroyRegion(customerRegionName));
} catch (Exception ex) {
assertTrue(ex.getCause() instanceof IllegalStateException);
return;
}
fail("Expected UnsupportedOperationException");
} finally {
vm4.invoke(() -> clear_INFINITE_MAXIMUM_SHUTDOWN_WAIT_TIME());
vm5.invoke(() -> clear_INFINITE_MAXIMUM_SHUTDOWN_WAIT_TIME());
}
}
public static void clear_INFINITE_MAXIMUM_SHUTDOWN_WAIT_TIME() {
AbstractGatewaySender.MAXIMUM_SHUTDOWN_WAIT_TIME = 0;
}
public static void closeRegion(String regionName) {
Region r = cache.getRegion(Region.SEPARATOR + regionName);
assertNotNull(r);
r.close();
}
public static void validateRegionSizeWithinRange(String regionName, final int min,
final int max) {
final Region r = cache.getRegion(SEPARATOR + regionName);
assertNotNull(r);
WaitCriterion wc = new WaitCriterion() {
@Override
public boolean done() {
if (r.keySet().size() > min && r.keySet().size() <= max) {
return true;
}
return false;
}
@Override
public String description() {
return "Expected region entries to be within range : " + min + " " + max
+ " but actual entries: " + r.keySet().size();
}
};
GeodeAwaitility.await().untilAsserted(wc);
}
protected static void createCache_INFINITE_MAXIMUM_SHUTDOWN_WAIT_TIME(Integer locPort) {
createCache(false, locPort);
AbstractGatewaySender.MAXIMUM_SHUTDOWN_WAIT_TIME = -1;
}
protected void createAndStartSender(VM vm, int port, int concurrencyLevel, boolean manualStart,
boolean pause) {
vm.invoke(() -> createCache_INFINITE_MAXIMUM_SHUTDOWN_WAIT_TIME(port));
vm.invoke(() -> createPartitionedRegion(getTestMethodName() + "_PR", "ln", 1, 10, isOffHeap()));
createSender(vm, concurrencyLevel, manualStart);
vm.invoke(() -> startSender("ln"));
if (pause) {
vm.invoke(() -> pauseSender("ln"));
}
LogWriterUtils.getLogWriter().info("Created PRs on local site");
}
protected void createReceiverAndDoPutsInPausedSender(int port) {
// Note: This is a test-specific method used by several tests to do puts from vm4 to vm2.
String regionName = getTestMethodName() + "_PR";
createCacheInVMs(port, vm2);
vm2.invoke(() -> createReceiver());
vm2.invoke(() -> createPartitionedRegion(regionName, null, 1, 10, isOffHeap()));
vm4.invoke(() -> doPuts(regionName, 10));
vm4.invoke(() -> validateRegionSize(regionName, 10));
// since sender is paused, no dispatching
vm2.invoke(() -> validateRegionSize(regionName, 0));
}
protected void createAndStartTwoSenders(VM vm, int port, int concurrencyLevel) {
// Note: This is a test-specific method used to create and start 2 senders.
vm.invoke(() -> createCache_INFINITE_MAXIMUM_SHUTDOWN_WAIT_TIME(port));
vm.invoke(
() -> createPartitionedRegion(getTestMethodName() + "_PR", "ln1,ln2", 1, 100, isOffHeap()));
createSenders(vm, concurrencyLevel);
vm.invoke(() -> startSender("ln1"));
vm.invoke(() -> startSender("ln2"));
}
protected void createAndStartSenderWithCustomerOrderShipmentRegion(VM vm, int port,
int concurrencyLevel, boolean manualStart) {
vm.invoke(() -> createCache_INFINITE_MAXIMUM_SHUTDOWN_WAIT_TIME(port));
vm.invoke(() -> createCustomerOrderShipmentPartitionedRegion("ln", 1, 100, isOffHeap()));
createSender(vm, concurrencyLevel, manualStart);
vm.invoke(() -> startSender("ln"));
}
protected void createSender(VM vm, int concurrencyLevel, boolean manualStart) {
vm.invoke(() -> createConcurrentSender("ln", 2, true, 100, 10, false, false, null, manualStart,
concurrencyLevel, OrderPolicy.KEY));
}
protected void createSenders(VM vm, int concurrencyLevel) {
vm.invoke(() -> createConcurrentSender("ln1", 2, true, 100, 10, false, false, null, true,
concurrencyLevel, OrderPolicy.KEY));
vm.invoke(() -> createConcurrentSender("ln2", 3, true, 100, 10, false, false, null, true,
concurrencyLevel, OrderPolicy.KEY));
}
}