blob: 0e1b23a752ef7605dd30672b94ed18d0a200e478 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.geode.internal.cache.wan.parallel;
import static org.junit.Assert.fail;
import java.io.IOException;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.apache.geode.cache.AttributesFactory;
import org.apache.geode.cache.PartitionAttributesFactory;
import org.apache.geode.cache.Region;
import org.apache.geode.cache.wan.GatewaySender;
import org.apache.geode.cache.wan.GatewaySenderFactory;
import org.apache.geode.internal.cache.ColocationHelper;
import org.apache.geode.internal.cache.wan.WANTestBase;
import org.apache.geode.test.dunit.AsyncInvocation;
import org.apache.geode.test.dunit.IgnoredException;
import org.apache.geode.test.dunit.LogWriterUtils;
import org.apache.geode.test.dunit.SerializableRunnableIF;
import org.apache.geode.test.junit.categories.WanTest;
@Category({WanTest.class})
public class ParallelWANPersistenceEnabledGatewaySenderDUnitTest extends WANTestBase {
private static final long serialVersionUID = 1L;
public ParallelWANPersistenceEnabledGatewaySenderDUnitTest() {
super();
}
@Override
protected final void postSetUpWANTestBase() throws Exception {
// The restart tests log this string
IgnoredException.addIgnoredException("failed accepting client connection");
}
@Test
public void testPartitionedRegionWithGatewaySenderPersistenceEnabled() throws IOException {
try {
Integer lnPort = (Integer) vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1));
Integer nyPort = (Integer) vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort));
createCache(lnPort);
GatewaySenderFactory fact = cache.createGatewaySenderFactory();
fact.setPersistenceEnabled(true);
fact.setParallel(true);
final IgnoredException ex = IgnoredException.addIgnoredException("Could not connect");
try {
GatewaySender sender1 = fact.create("NYSender", 2);
AttributesFactory rFact = new AttributesFactory();
rFact.addGatewaySenderId(sender1.getId());
PartitionAttributesFactory pFact = new PartitionAttributesFactory();
pFact.setTotalNumBuckets(100);
pFact.setRedundantCopies(1);
rFact.setPartitionAttributes(pFact.create());
Region r = cache.createRegionFactory(rFact.create()).create("MyRegion");
sender1.start();
} finally {
ex.remove();
}
} catch (Exception e) {
fail("Unexpected Exception :" + e);
}
}
/**
* Enable persistence for region as well as GatewaySender and see if remote site receives all the
* events.
*/
@Test
public void testPersistentPartitionedRegionWithGatewaySenderPersistenceEnabled() {
Integer lnPort = (Integer) vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1));
Integer nyPort = (Integer) vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort));
createCacheInVMs(nyPort, vm2, vm3);
createReceiverInVMs(vm2, vm3);
createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
vm4.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 10, false, true, null, true));
vm5.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 10, false, true, null, true));
vm6.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 10, false, true, null, true));
vm7.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 10, false, true, null, true));
vm4.invoke(() -> WANTestBase.createPersistentPartitionedRegion(getTestMethodName(), "ln", 1,
100, isOffHeap()));
vm5.invoke(() -> WANTestBase.createPersistentPartitionedRegion(getTestMethodName(), "ln", 1,
100, isOffHeap()));
vm6.invoke(() -> WANTestBase.createPersistentPartitionedRegion(getTestMethodName(), "ln", 1,
100, isOffHeap()));
vm7.invoke(() -> WANTestBase.createPersistentPartitionedRegion(getTestMethodName(), "ln", 1,
100, isOffHeap()));
startSenderInVMs("ln", vm4, vm5, vm6, vm7);
vm2.invoke(() -> WANTestBase.createPersistentPartitionedRegion(getTestMethodName(), null, 1,
100, isOffHeap()));
vm3.invoke(() -> WANTestBase.createPersistentPartitionedRegion(getTestMethodName(), null, 1,
100, isOffHeap()));
vm4.invoke(() -> WANTestBase.doPuts(getTestMethodName(), 1000));
vm2.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName(), 1000));
vm3.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName(), 1000));
}
/**
* Enable persistence for the GatewaySender but not the region
*/
@Category({WanTest.class})
@Test
public void testPartitionedRegionWithPersistentGatewaySender() {
Integer lnPort = (Integer) vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1));
Integer nyPort = (Integer) vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort));
createCacheInVMs(nyPort, vm2, vm3);
createReceiverInVMs(vm2, vm3);
LogWriterUtils.getLogWriter().info("Created remote receivers");
createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
LogWriterUtils.getLogWriter().info("Created local site cache");
vm4.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 10, false, true, null, true));
vm5.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 10, false, true, null, true));
vm6.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 10, false, true, null, true));
vm7.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 10, false, true, null, true));
LogWriterUtils.getLogWriter().info("Created local site senders");
vm4.invoke(createPartitionedRegionRunnable());
vm5.invoke(createPartitionedRegionRunnable());
vm6.invoke(createPartitionedRegionRunnable());
vm7.invoke(createPartitionedRegionRunnable());
LogWriterUtils.getLogWriter().info("Created local site persistent PR");
startSenderInVMs("ln", vm4, vm5, vm6, vm7);
LogWriterUtils.getLogWriter().info("Started the senders");
vm2.invoke(
() -> WANTestBase.createPartitionedRegion(getTestMethodName(), null, 1, 100, isOffHeap()));
vm3.invoke(
() -> WANTestBase.createPartitionedRegion(getTestMethodName(), null, 1, 100, isOffHeap()));
vm4.invoke(() -> WANTestBase.doPuts(getTestMethodName(), 1000));
vm2.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName(), 1000));
vm3.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName(), 1000));
}
protected SerializableRunnableIF createPartitionedRegionRunnable() {
return () -> WANTestBase.createPartitionedRegion(getTestMethodName(), "ln", 1, 100,
isOffHeap());
}
/**
* Enable persistence for GatewaySender. Pause the sender and do some puts in local region. Close
* the local site and rebuild the region and sender from disk store. Dispatcher should not start
* dispatching events recovered from persistent sender. Check if the remote site receives all the
* events.
*/
@Test
public void testPRWithGatewaySenderPersistenceEnabled_Restart() {
// create locator on local site
Integer lnPort = (Integer) vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1));
// create locator on remote site
Integer nyPort = (Integer) vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort));
// create receiver on remote site
createCacheInVMs(nyPort, vm2, vm3);
createReceiverInVMs(vm2, vm3);
// create cache in local site
createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
// create senders with disk store
String diskStore1 = (String) vm4.invoke(() -> WANTestBase.createSenderWithDiskStore("ln", 2,
true, 100, 10, false, true, null, null, true));
String diskStore2 = (String) vm5.invoke(() -> WANTestBase.createSenderWithDiskStore("ln", 2,
true, 100, 10, false, true, null, null, true));
String diskStore3 = (String) vm6.invoke(() -> WANTestBase.createSenderWithDiskStore("ln", 2,
true, 100, 10, false, true, null, null, true));
String diskStore4 = (String) vm7.invoke(() -> WANTestBase.createSenderWithDiskStore("ln", 2,
true, 100, 10, false, true, null, null, true));
LogWriterUtils.getLogWriter()
.info("The DS are: " + diskStore1 + "," + diskStore2 + "," + diskStore3 + "," + diskStore4);
// create PR on remote site
vm2.invoke(
() -> WANTestBase.createPartitionedRegion(getTestMethodName(), null, 1, 100, isOffHeap()));
vm3.invoke(
() -> WANTestBase.createPartitionedRegion(getTestMethodName(), null, 1, 100, isOffHeap()));
// create PR on local site
vm4.invoke(createPartitionedRegionRunnable());
vm5.invoke(createPartitionedRegionRunnable());
vm6.invoke(createPartitionedRegionRunnable());
vm7.invoke(createPartitionedRegionRunnable());
// start the senders on local site
startSenderInVMs("ln", vm4, vm5, vm6, vm7);
// wait for senders to become running
vm4.invoke(waitForSenderRunnable());
vm5.invoke(waitForSenderRunnable());
vm6.invoke(waitForSenderRunnable());
vm7.invoke(waitForSenderRunnable());
// pause the senders
vm4.invoke(pauseSenderRunnable());
vm5.invoke(pauseSenderRunnable());
vm6.invoke(pauseSenderRunnable());
vm7.invoke(pauseSenderRunnable());
// start puts in region on local site
vm4.invoke(() -> WANTestBase.doPuts(getTestMethodName(), 3000));
LogWriterUtils.getLogWriter().info("Completed puts in the region");
// --------------------close and rebuild local site
// -------------------------------------------------
// kill the senders
vm4.invoke(killSenderRunnable());
vm5.invoke(killSenderRunnable());
vm6.invoke(killSenderRunnable());
vm7.invoke(killSenderRunnable());
LogWriterUtils.getLogWriter().info("Killed all the senders.");
// restart the vm
createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
LogWriterUtils.getLogWriter().info("Created back the cache");
// create senders with disk store
vm4.invoke(() -> WANTestBase.createSenderWithDiskStore("ln", 2, true, 100, 10, false, true,
null, diskStore1, true));
vm5.invoke(() -> WANTestBase.createSenderWithDiskStore("ln", 2, true, 100, 10, false, true,
null, diskStore2, true));
vm6.invoke(() -> WANTestBase.createSenderWithDiskStore("ln", 2, true, 100, 10, false, true,
null, diskStore3, true));
vm7.invoke(() -> WANTestBase.createSenderWithDiskStore("ln", 2, true, 100, 10, false, true,
null, diskStore4, true));
LogWriterUtils.getLogWriter().info("Created the senders back from the disk store.");
// create PR on local site
AsyncInvocation inv1 = vm4.invokeAsync(createPartitionedRegionRunnable());
AsyncInvocation inv2 = vm5.invokeAsync(createPartitionedRegionRunnable());
AsyncInvocation inv3 = vm6.invokeAsync(createPartitionedRegionRunnable());
AsyncInvocation inv4 = vm7.invokeAsync(createPartitionedRegionRunnable());
try {
inv1.join();
inv2.join();
inv3.join();
inv4.join();
} catch (InterruptedException e) {
e.printStackTrace();
fail();
}
LogWriterUtils.getLogWriter().info("Created back the partitioned regions");
// start the senders in async mode. This will ensure that the
// node of shadow PR that went down last will come up first
startSenderInVMsAsync("ln", vm4, vm5, vm6, vm7);
LogWriterUtils.getLogWriter().info("Waiting for senders running.");
// wait for senders running
vm4.invoke(waitForSenderRunnable());
vm5.invoke(waitForSenderRunnable());
vm6.invoke(waitForSenderRunnable());
vm7.invoke(waitForSenderRunnable());
LogWriterUtils.getLogWriter().info("All the senders are now running...");
// ----------------------------------------------------------------------------------------------------
vm2.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName(), 3000));
vm3.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName(), 3000));
}
protected SerializableRunnableIF killSenderRunnable() {
return () -> WANTestBase.killSender();
}
protected SerializableRunnableIF pauseSenderRunnable() {
return () -> WANTestBase.pauseSender("ln");
}
protected SerializableRunnableIF waitForSenderRunnable() {
return () -> WANTestBase.waitForSenderRunningState("ln");
}
/**
* Enable persistence for PR and GatewaySender. Pause the sender and do some puts in local region.
* Close the local site and rebuild the region and sender from disk store. Dispatcher should not
* start dispatching events recovered from persistent sender. Check if the remote site receives
* all the events.
*/
@Test
public void testPersistentPRWithGatewaySenderPersistenceEnabled_Restart() {
// create locator on local site
Integer lnPort = (Integer) vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1));
// create locator on remote site
Integer nyPort = (Integer) vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort));
// create receiver on remote site
createCacheInVMs(nyPort, vm2, vm3);
createReceiverInVMs(vm2, vm3);
// create cache in local site
createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
// create senders with disk store
String diskStore1 = (String) vm4.invoke(() -> WANTestBase.createSenderWithDiskStore("ln", 2,
true, 100, 10, false, true, null, null, true));
String diskStore2 = (String) vm5.invoke(() -> WANTestBase.createSenderWithDiskStore("ln", 2,
true, 100, 10, false, true, null, null, true));
String diskStore3 = (String) vm6.invoke(() -> WANTestBase.createSenderWithDiskStore("ln", 2,
true, 100, 10, false, true, null, null, true));
String diskStore4 = (String) vm7.invoke(() -> WANTestBase.createSenderWithDiskStore("ln", 2,
true, 100, 10, false, true, null, null, true));
LogWriterUtils.getLogWriter()
.info("The DS are: " + diskStore1 + "," + diskStore2 + "," + diskStore3 + "," + diskStore4);
// create PR on remote site
vm2.invoke(() -> WANTestBase.createPersistentPartitionedRegion(getTestMethodName(), null, 1,
100, isOffHeap()));
vm3.invoke(() -> WANTestBase.createPersistentPartitionedRegion(getTestMethodName(), null, 1,
100, isOffHeap()));
// create PR on local site
vm4.invoke(() -> WANTestBase.createPersistentPartitionedRegion(getTestMethodName(), "ln", 1,
100, isOffHeap()));
vm5.invoke(() -> WANTestBase.createPersistentPartitionedRegion(getTestMethodName(), "ln", 1,
100, isOffHeap()));
vm6.invoke(() -> WANTestBase.createPersistentPartitionedRegion(getTestMethodName(), "ln", 1,
100, isOffHeap()));
vm7.invoke(() -> WANTestBase.createPersistentPartitionedRegion(getTestMethodName(), "ln", 1,
100, isOffHeap()));
// start the senders on local site
startSenderInVMs("ln", vm4, vm5, vm6, vm7);
// wait for senders to become running
vm4.invoke(waitForSenderRunnable());
vm5.invoke(waitForSenderRunnable());
vm6.invoke(waitForSenderRunnable());
vm7.invoke(waitForSenderRunnable());
// pause the senders
vm4.invoke(pauseSenderRunnable());
vm5.invoke(pauseSenderRunnable());
vm6.invoke(pauseSenderRunnable());
vm7.invoke(pauseSenderRunnable());
// start puts in region on local site
vm4.invoke(() -> WANTestBase.doPuts(getTestMethodName(), 3000));
LogWriterUtils.getLogWriter().info("Completed puts in the region");
// --------------------close and rebuild local site
// -------------------------------------------------
// kill the senders
vm4.invoke(killSenderRunnable());
vm5.invoke(killSenderRunnable());
vm6.invoke(killSenderRunnable());
vm7.invoke(killSenderRunnable());
LogWriterUtils.getLogWriter().info("Killed all the senders.");
// restart the vm
createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
LogWriterUtils.getLogWriter().info("Created back the cache");
// create senders with disk store
vm4.invoke(() -> WANTestBase.createSenderWithDiskStore("ln", 2, true, 100, 10, false, true,
null, diskStore1, true));
vm5.invoke(() -> WANTestBase.createSenderWithDiskStore("ln", 2, true, 100, 10, false, true,
null, diskStore2, true));
vm6.invoke(() -> WANTestBase.createSenderWithDiskStore("ln", 2, true, 100, 10, false, true,
null, diskStore3, true));
vm7.invoke(() -> WANTestBase.createSenderWithDiskStore("ln", 2, true, 100, 10, false, true,
null, diskStore4, true));
LogWriterUtils.getLogWriter().info("Created the senders back from the disk store.");
// create PR on local site
AsyncInvocation inv1 = vm4.invokeAsync(() -> WANTestBase
.createPersistentPartitionedRegion(getTestMethodName(), "ln", 1, 100, isOffHeap()));
AsyncInvocation inv2 = vm5.invokeAsync(() -> WANTestBase
.createPersistentPartitionedRegion(getTestMethodName(), "ln", 1, 100, isOffHeap()));
AsyncInvocation inv3 = vm6.invokeAsync(() -> WANTestBase
.createPersistentPartitionedRegion(getTestMethodName(), "ln", 1, 100, isOffHeap()));
AsyncInvocation inv4 = vm7.invokeAsync(() -> WANTestBase
.createPersistentPartitionedRegion(getTestMethodName(), "ln", 1, 100, isOffHeap()));
try {
inv1.join();
inv2.join();
inv3.join();
inv4.join();
} catch (InterruptedException e) {
e.printStackTrace();
fail();
}
LogWriterUtils.getLogWriter().info("Created back the partitioned regions");
// start the senders in async mode. This will ensure that the
// node of shadow PR that went down last will come up first
startSenderInVMsAsync("ln", vm4, vm5, vm6, vm7);
LogWriterUtils.getLogWriter().info("Waiting for senders running.");
// wait for senders running
vm4.invoke(waitForSenderRunnable());
vm5.invoke(waitForSenderRunnable());
vm6.invoke(waitForSenderRunnable());
vm7.invoke(waitForSenderRunnable());
LogWriterUtils.getLogWriter().info("All the senders are now running...");
// ----------------------------------------------------------------------------------------------------
vm2.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName(), 3000));
vm3.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName(), 3000));
}
/**
* Enable persistence for PR and GatewaySender. Pause the sender and do some puts in local region.
* Close the local site and rebuild the region and sender from disk store. Dispatcher should not
* start dispatching events recovered from persistent sender. Check if the remote site receives
* all the events.
*/
@Test
public void testPersistentPRWithGatewaySenderPersistenceEnabled_Restart2() {
// create locator on local site
Integer lnPort = (Integer) vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1));
// create locator on remote site
Integer nyPort = (Integer) vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort));
// create cache in local site
createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
// create senders with disk store
String diskStore1 = (String) vm4.invoke(() -> WANTestBase.createSenderWithDiskStore("ln", 2,
true, 100, 10, false, true, null, null, false));
String diskStore2 = (String) vm5.invoke(() -> WANTestBase.createSenderWithDiskStore("ln", 2,
true, 100, 10, false, true, null, null, false));
String diskStore3 = (String) vm6.invoke(() -> WANTestBase.createSenderWithDiskStore("ln", 2,
true, 100, 10, false, true, null, null, false));
String diskStore4 = (String) vm7.invoke(() -> WANTestBase.createSenderWithDiskStore("ln", 2,
true, 100, 10, false, true, null, null, false));
LogWriterUtils.getLogWriter()
.info("The DS are: " + diskStore1 + "," + diskStore2 + "," + diskStore3 + "," + diskStore4);
// create PR on local site
vm4.invoke(() -> WANTestBase.createPersistentPartitionedRegion(getTestMethodName(), "ln", 1,
100, isOffHeap()));
vm5.invoke(() -> WANTestBase.createPersistentPartitionedRegion(getTestMethodName(), "ln", 1,
100, isOffHeap()));
vm6.invoke(() -> WANTestBase.createPersistentPartitionedRegion(getTestMethodName(), "ln", 1,
100, isOffHeap()));
vm7.invoke(() -> WANTestBase.createPersistentPartitionedRegion(getTestMethodName(), "ln", 1,
100, isOffHeap()));
// start the senders on local site
startSenderInVMs("ln", vm4, vm5, vm6, vm7);
// wait for senders to become running
vm4.invoke(waitForSenderRunnable());
vm5.invoke(waitForSenderRunnable());
vm6.invoke(waitForSenderRunnable());
vm7.invoke(waitForSenderRunnable());
// pause the senders
vm4.invoke(pauseSenderRunnable());
vm5.invoke(pauseSenderRunnable());
vm6.invoke(pauseSenderRunnable());
vm7.invoke(pauseSenderRunnable());
// start puts in region on local site
vm4.invoke(() -> WANTestBase.doPuts(getTestMethodName(), 300));
LogWriterUtils.getLogWriter().info("Completed puts in the region");
// --------------------close and rebuild local site
// -------------------------------------------------
// kill the senders
vm4.invoke(killSenderRunnable());
vm5.invoke(killSenderRunnable());
vm6.invoke(killSenderRunnable());
vm7.invoke(killSenderRunnable());
LogWriterUtils.getLogWriter().info("Killed all the senders.");
// restart the vm
createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
LogWriterUtils.getLogWriter().info("Created back the cache");
// create senders with disk store
vm4.invoke(() -> WANTestBase.createSenderWithDiskStore("ln", 2, true, 100, 10, false, true,
null, diskStore1, true));
vm5.invoke(() -> WANTestBase.createSenderWithDiskStore("ln", 2, true, 100, 10, false, true,
null, diskStore2, true));
vm6.invoke(() -> WANTestBase.createSenderWithDiskStore("ln", 2, true, 100, 10, false, true,
null, diskStore3, true));
vm7.invoke(() -> WANTestBase.createSenderWithDiskStore("ln", 2, true, 100, 10, false, true,
null, diskStore4, true));
LogWriterUtils.getLogWriter().info("Created the senders back from the disk store.");
// create PR on local site
AsyncInvocation inv1 = vm4.invokeAsync(() -> WANTestBase
.createPersistentPartitionedRegion(getTestMethodName(), "ln", 1, 100, isOffHeap()));
AsyncInvocation inv2 = vm5.invokeAsync(() -> WANTestBase
.createPersistentPartitionedRegion(getTestMethodName(), "ln", 1, 100, isOffHeap()));
AsyncInvocation inv3 = vm6.invokeAsync(() -> WANTestBase
.createPersistentPartitionedRegion(getTestMethodName(), "ln", 1, 100, isOffHeap()));
AsyncInvocation inv4 = vm7.invokeAsync(() -> WANTestBase
.createPersistentPartitionedRegion(getTestMethodName(), "ln", 1, 100, isOffHeap()));
try {
inv1.join();
inv2.join();
inv3.join();
inv4.join();
} catch (InterruptedException e) {
e.printStackTrace();
fail();
}
LogWriterUtils.getLogWriter().info("Created back the partitioned regions");
// start the senders in async mode. This will ensure that the
// node of shadow PR that went down last will come up first
startSenderInVMsAsync("ln", vm4, vm5, vm6, vm7);
LogWriterUtils.getLogWriter().info("Waiting for senders running.");
// wait for senders running
vm4.invoke(waitForSenderRunnable());
vm5.invoke(waitForSenderRunnable());
vm6.invoke(waitForSenderRunnable());
vm7.invoke(waitForSenderRunnable());
LogWriterUtils.getLogWriter().info("Creating the receiver.");
// create receiver on remote site
createCacheInVMs(nyPort, vm2, vm3);
// create PR on remote site
LogWriterUtils.getLogWriter().info("Creating the partitioned region at receiver. ");
vm2.invoke(() -> WANTestBase.createPersistentPartitionedRegion(getTestMethodName(), null, 1,
100, isOffHeap()));
vm3.invoke(() -> WANTestBase.createPersistentPartitionedRegion(getTestMethodName(), null, 1,
100, isOffHeap()));
createReceiverInVMs(vm2, vm3);
vm4.invoke(pauseSenderRunnable());
vm5.invoke(pauseSenderRunnable());
vm6.invoke(pauseSenderRunnable());
vm7.invoke(pauseSenderRunnable());
LogWriterUtils.getLogWriter().info("Doing some extra puts. ");
// start puts in region on local site
vm4.invoke(() -> WANTestBase.doPutsAfter300(getTestMethodName(), 1000));
// ----------------------------------------------------------------------------------------------------
vm4.invoke(() -> WANTestBase.resumeSender("ln"));
vm5.invoke(() -> WANTestBase.resumeSender("ln"));
vm6.invoke(() -> WANTestBase.resumeSender("ln"));
vm7.invoke(() -> WANTestBase.resumeSender("ln"));
LogWriterUtils.getLogWriter().info("Validating the region size at the receiver end. ");
vm2.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName(), 1000));
vm3.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName(), 1000));
}
/**
* Enable persistence for PR and GatewaySender. Pause the sender and do some puts in local region.
* Close the local site and rebuild the region and sender from disk store. Dispatcher should not
* start dispatching events recovered from persistent sender. --> At the same time, do some more
* puts on the local region. Check if the remote site receives all the events.
*/
@Test
public void testPersistentPRWithGatewaySenderPersistenceEnabled_Restart_Scenario2() {
// create locator on local site
Integer lnPort = (Integer) vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1));
// create locator on remote site
Integer nyPort = (Integer) vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort));
// create receiver on remote site
createCacheInVMs(nyPort, vm2, vm3);
createReceiverInVMs(vm2, vm3);
createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
// create senders with disk store
String diskStore1 = (String) vm4.invoke(() -> WANTestBase.createSenderWithDiskStore("ln", 2,
true, 100, 10, false, true, null, null, true));
String diskStore2 = (String) vm5.invoke(() -> WANTestBase.createSenderWithDiskStore("ln", 2,
true, 100, 10, false, true, null, null, true));
String diskStore3 = (String) vm6.invoke(() -> WANTestBase.createSenderWithDiskStore("ln", 2,
true, 100, 10, false, true, null, null, true));
String diskStore4 = (String) vm7.invoke(() -> WANTestBase.createSenderWithDiskStore("ln", 2,
true, 100, 10, false, true, null, null, true));
LogWriterUtils.getLogWriter()
.info("The DS are: " + diskStore1 + "," + diskStore2 + "," + diskStore3 + "," + diskStore4);
// create PR on remote site
vm2.invoke(() -> WANTestBase.createPersistentPartitionedRegion(getTestMethodName(), null, 1,
100, isOffHeap()));
vm3.invoke(() -> WANTestBase.createPersistentPartitionedRegion(getTestMethodName(), null, 1,
100, isOffHeap()));
// create PR on local site
vm4.invoke(() -> WANTestBase.createPersistentPartitionedRegion(getTestMethodName(), "ln", 1,
100, isOffHeap()));
vm5.invoke(() -> WANTestBase.createPersistentPartitionedRegion(getTestMethodName(), "ln", 1,
100, isOffHeap()));
vm6.invoke(() -> WANTestBase.createPersistentPartitionedRegion(getTestMethodName(), "ln", 1,
100, isOffHeap()));
vm7.invoke(() -> WANTestBase.createPersistentPartitionedRegion(getTestMethodName(), "ln", 1,
100, isOffHeap()));
// start the senders on local site
startSenderInVMs("ln", vm4, vm5, vm6, vm7);
// wait for senders to become running
vm4.invoke(waitForSenderRunnable());
vm5.invoke(waitForSenderRunnable());
vm6.invoke(waitForSenderRunnable());
vm7.invoke(waitForSenderRunnable());
// pause the senders
vm4.invoke(pauseSenderRunnable());
vm5.invoke(pauseSenderRunnable());
vm6.invoke(pauseSenderRunnable());
vm7.invoke(pauseSenderRunnable());
// start puts in region on local site
vm4.invoke(() -> WANTestBase.doPuts(getTestMethodName(), 3000));
LogWriterUtils.getLogWriter().info("Completed puts in the region");
// --------------------close and rebuild local site
// -------------------------------------------------
// kill the senders
vm4.invoke(killSenderRunnable());
vm5.invoke(killSenderRunnable());
vm6.invoke(killSenderRunnable());
vm7.invoke(killSenderRunnable());
LogWriterUtils.getLogWriter().info("Killed all the senders.");
// restart the vm
createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
LogWriterUtils.getLogWriter().info("Created back the cache");
// create senders with disk store
vm4.invoke(() -> WANTestBase.createSenderWithDiskStore("ln", 2, true, 100, 10, false, true,
null, diskStore1, true));
vm5.invoke(() -> WANTestBase.createSenderWithDiskStore("ln", 2, true, 100, 10, false, true,
null, diskStore2, true));
vm6.invoke(() -> WANTestBase.createSenderWithDiskStore("ln", 2, true, 100, 10, false, true,
null, diskStore3, true));
vm7.invoke(() -> WANTestBase.createSenderWithDiskStore("ln", 2, true, 100, 10, false, true,
null, diskStore4, true));
LogWriterUtils.getLogWriter().info("Created the senders back from the disk store.");
// create PR on local site
AsyncInvocation inv1 = vm4.invokeAsync(() -> WANTestBase
.createPersistentPartitionedRegion(getTestMethodName(), "ln", 1, 100, isOffHeap()));
AsyncInvocation inv2 = vm5.invokeAsync(() -> WANTestBase
.createPersistentPartitionedRegion(getTestMethodName(), "ln", 1, 100, isOffHeap()));
AsyncInvocation inv3 = vm6.invokeAsync(() -> WANTestBase
.createPersistentPartitionedRegion(getTestMethodName(), "ln", 1, 100, isOffHeap()));
AsyncInvocation inv4 = vm7.invokeAsync(() -> WANTestBase
.createPersistentPartitionedRegion(getTestMethodName(), "ln", 1, 100, isOffHeap()));
try {
inv1.join();
inv2.join();
inv3.join();
inv4.join();
} catch (InterruptedException e) {
e.printStackTrace();
fail();
}
LogWriterUtils.getLogWriter().info("Created back the partitioned regions");
// start the senders in async mode. This will ensure that the
// node of shadow PR that went down last will come up first
startSenderInVMsAsync("ln", vm4, vm5, vm6, vm7);
LogWriterUtils.getLogWriter().info("Waiting for senders running.");
// wait for senders running
vm4.invoke(waitForSenderRunnable());
vm5.invoke(waitForSenderRunnable());
vm6.invoke(waitForSenderRunnable());
vm7.invoke(waitForSenderRunnable());
LogWriterUtils.getLogWriter().info("All the senders are now running...");
// ----------------------------------------------------------------------------------------------------
// Dispatcher should be dispatching now. Do some more puts through async thread
AsyncInvocation async1 = vm4.invokeAsync(() -> WANTestBase.doPuts(getTestMethodName(), 1000));
try {
async1.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
vm2.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName(), 3000));
vm3.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName(), 3000));
}
/**
* Test case for bug# 44275. Enable persistence for PR and GatewaySender. Do puts into region with
* key as a String. Close the local site and rebuild the region and sender from disk store. Check
* if the remote site receives all the events.
*/
@Test
public void testPersistentPRWithPersistentGatewaySender_Restart_Bug44275() {
// create locator on local site
Integer lnPort = (Integer) vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1));
// create locator on remote site
Integer nyPort = (Integer) vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort));
// create receiver on remote site
createCacheInVMs(nyPort, vm2, vm3);
createReceiverInVMs(vm2, vm3);
// create cache in local site
createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
// create senders with disk store
String diskStore1 = (String) vm4.invoke(() -> WANTestBase.createSenderWithDiskStore("ln", 2,
true, 100, 10, false, true, null, null, true));
String diskStore2 = (String) vm5.invoke(() -> WANTestBase.createSenderWithDiskStore("ln", 2,
true, 100, 10, false, true, null, null, true));
String diskStore3 = (String) vm6.invoke(() -> WANTestBase.createSenderWithDiskStore("ln", 2,
true, 100, 10, false, true, null, null, true));
String diskStore4 = (String) vm7.invoke(() -> WANTestBase.createSenderWithDiskStore("ln", 2,
true, 100, 10, false, true, null, null, true));
LogWriterUtils.getLogWriter()
.info("The DS are: " + diskStore1 + "," + diskStore2 + "," + diskStore3 + "," + diskStore4);
// create PR on remote site
vm2.invoke(() -> WANTestBase.createPersistentPartitionedRegion(getTestMethodName(), null, 1,
100, isOffHeap()));
vm3.invoke(() -> WANTestBase.createPersistentPartitionedRegion(getTestMethodName(), null, 1,
100, isOffHeap()));
// create PR on local site
vm4.invoke(() -> WANTestBase.createPersistentPartitionedRegion(getTestMethodName(), "ln", 1,
100, isOffHeap()));
vm5.invoke(() -> WANTestBase.createPersistentPartitionedRegion(getTestMethodName(), "ln", 1,
100, isOffHeap()));
vm6.invoke(() -> WANTestBase.createPersistentPartitionedRegion(getTestMethodName(), "ln", 1,
100, isOffHeap()));
vm7.invoke(() -> WANTestBase.createPersistentPartitionedRegion(getTestMethodName(), "ln", 1,
100, isOffHeap()));
// start the senders on local site
startSenderInVMs("ln", vm4, vm5, vm6, vm7);
// wait for senders to become running
vm4.invoke(waitForSenderRunnable());
vm5.invoke(waitForSenderRunnable());
vm6.invoke(waitForSenderRunnable());
vm7.invoke(waitForSenderRunnable());
// pause the senders
vm4.invoke(pauseSenderRunnable());
vm5.invoke(pauseSenderRunnable());
vm6.invoke(pauseSenderRunnable());
vm7.invoke(pauseSenderRunnable());
// start puts in region on local site
vm4.invoke(() -> WANTestBase.doPutsWithKeyAsString(getTestMethodName(), 1000));
LogWriterUtils.getLogWriter().info("Completed puts in the region");
// --------------------close and rebuild local site
// -------------------------------------------------
// kill the senders
vm4.invoke(killSenderRunnable());
vm5.invoke(killSenderRunnable());
vm6.invoke(killSenderRunnable());
vm7.invoke(killSenderRunnable());
LogWriterUtils.getLogWriter().info("Killed all the senders.");
// restart the vm
createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
LogWriterUtils.getLogWriter().info("Created back the cache");
// create senders with disk store
vm4.invoke(() -> WANTestBase.createSenderWithDiskStore("ln", 2, true, 100, 10, false, true,
null, diskStore1, true));
vm5.invoke(() -> WANTestBase.createSenderWithDiskStore("ln", 2, true, 100, 10, false, true,
null, diskStore2, true));
vm6.invoke(() -> WANTestBase.createSenderWithDiskStore("ln", 2, true, 100, 10, false, true,
null, diskStore3, true));
vm7.invoke(() -> WANTestBase.createSenderWithDiskStore("ln", 2, true, 100, 10, false, true,
null, diskStore4, true));
LogWriterUtils.getLogWriter().info("Created the senders back from the disk store.");
// create PR on local site
AsyncInvocation inv1 = vm4.invokeAsync(() -> WANTestBase
.createPersistentPartitionedRegion(getTestMethodName(), "ln", 1, 100, isOffHeap()));
AsyncInvocation inv2 = vm5.invokeAsync(() -> WANTestBase
.createPersistentPartitionedRegion(getTestMethodName(), "ln", 1, 100, isOffHeap()));
AsyncInvocation inv3 = vm6.invokeAsync(() -> WANTestBase
.createPersistentPartitionedRegion(getTestMethodName(), "ln", 1, 100, isOffHeap()));
AsyncInvocation inv4 = vm7.invokeAsync(() -> WANTestBase
.createPersistentPartitionedRegion(getTestMethodName(), "ln", 1, 100, isOffHeap()));
try {
inv1.join();
inv2.join();
inv3.join();
inv4.join();
} catch (InterruptedException e) {
e.printStackTrace();
fail();
}
LogWriterUtils.getLogWriter().info("Created back the partitioned regions");
// start the senders in async mode. This will ensure that the
// node of shadow PR that went down last will come up first
startSenderInVMsAsync("ln", vm4, vm5, vm6, vm7);
LogWriterUtils.getLogWriter().info("Waiting for senders running.");
// wait for senders running
vm4.invoke(waitForSenderRunnable());
vm5.invoke(waitForSenderRunnable());
vm6.invoke(waitForSenderRunnable());
vm7.invoke(waitForSenderRunnable());
LogWriterUtils.getLogWriter().info("All the senders are now running...");
// ----------------------------------------------------------------------------------------------------
vm2.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName(), 1000));
vm3.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName(), 1000));
}
/**
* Test case for bug# 44275. Enable persistence for PR and GatewaySender. Do puts into region with
* key as a String. Close the local site and rebuild the region and sender from disk store. Check
* if the remote site receives all the events.
*/
@Test
public void testPersistentPRWithPersistentGatewaySender_Restart_DoOps() {
// create locator on local site
Integer lnPort = (Integer) vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1));
// create locator on remote site
Integer nyPort = (Integer) vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort));
// create receiver on remote site
createCacheInVMs(nyPort, vm2, vm3);
createReceiverInVMs(vm2, vm3);
createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
// create senders with disk store
String diskStore1 = (String) vm4.invoke(() -> WANTestBase.createSenderWithDiskStore("ln", 2,
true, 100, 10, false, true, null, null, true));
String diskStore2 = (String) vm5.invoke(() -> WANTestBase.createSenderWithDiskStore("ln", 2,
true, 100, 10, false, true, null, null, true));
String diskStore3 = (String) vm6.invoke(() -> WANTestBase.createSenderWithDiskStore("ln", 2,
true, 100, 10, false, true, null, null, true));
String diskStore4 = (String) vm7.invoke(() -> WANTestBase.createSenderWithDiskStore("ln", 2,
true, 100, 10, false, true, null, null, true));
LogWriterUtils.getLogWriter()
.info("The DS are: " + diskStore1 + "," + diskStore2 + "," + diskStore3 + "," + diskStore4);
// create PR on remote site
vm2.invoke(() -> WANTestBase.createPersistentPartitionedRegion(getTestMethodName(), null, 1,
100, isOffHeap()));
vm3.invoke(() -> WANTestBase.createPersistentPartitionedRegion(getTestMethodName(), null, 1,
100, isOffHeap()));
// create PR on local site
vm4.invoke(() -> WANTestBase.createPersistentPartitionedRegion(getTestMethodName(), "ln", 1,
100, isOffHeap()));
vm5.invoke(() -> WANTestBase.createPersistentPartitionedRegion(getTestMethodName(), "ln", 1,
100, isOffHeap()));
vm6.invoke(() -> WANTestBase.createPersistentPartitionedRegion(getTestMethodName(), "ln", 1,
100, isOffHeap()));
vm7.invoke(() -> WANTestBase.createPersistentPartitionedRegion(getTestMethodName(), "ln", 1,
100, isOffHeap()));
// start the senders on local site
startSenderInVMs("ln", vm4, vm5, vm6, vm7);
// wait for senders to become running
vm4.invoke(waitForSenderRunnable());
vm5.invoke(waitForSenderRunnable());
vm6.invoke(waitForSenderRunnable());
vm7.invoke(waitForSenderRunnable());
// pause the senders
vm4.invoke(pauseSenderRunnable());
vm5.invoke(pauseSenderRunnable());
vm6.invoke(pauseSenderRunnable());
vm7.invoke(pauseSenderRunnable());
// start puts in region on local site
vm4.invoke(() -> WANTestBase.doPutsWithKeyAsString(getTestMethodName(), 1000));
LogWriterUtils.getLogWriter().info("Completed puts in the region");
// --------------------close and rebuild local site
// -------------------------------------------------
// kill the senders
vm4.invoke(killSenderRunnable());
vm5.invoke(killSenderRunnable());
vm6.invoke(killSenderRunnable());
vm7.invoke(killSenderRunnable());
LogWriterUtils.getLogWriter().info("Killed all the senders.");
// restart the vm
createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
LogWriterUtils.getLogWriter().info("Created back the cache");
// create senders with disk store
vm4.invoke(() -> WANTestBase.createSenderWithDiskStore("ln", 2, true, 100, 10, false, true,
null, diskStore1, true));
vm5.invoke(() -> WANTestBase.createSenderWithDiskStore("ln", 2, true, 100, 10, false, true,
null, diskStore2, true));
vm6.invoke(() -> WANTestBase.createSenderWithDiskStore("ln", 2, true, 100, 10, false, true,
null, diskStore3, true));
vm7.invoke(() -> WANTestBase.createSenderWithDiskStore("ln", 2, true, 100, 10, false, true,
null, diskStore4, true));
LogWriterUtils.getLogWriter().info("Created the senders back from the disk store.");
// create PR on local site
vm4.invoke(() -> WANTestBase.createPersistentPartitionedRegion(getTestMethodName(), "ln", 1,
100, isOffHeap()));
vm5.invoke(() -> WANTestBase.createPersistentPartitionedRegion(getTestMethodName(), "ln", 1,
100, isOffHeap()));
vm6.invoke(() -> WANTestBase.createPersistentPartitionedRegion(getTestMethodName(), "ln", 1,
100, isOffHeap()));
vm7.invoke(() -> WANTestBase.createPersistentPartitionedRegion(getTestMethodName(), "ln", 1,
100, isOffHeap()));
LogWriterUtils.getLogWriter().info("Created back the partitioned regions");
// start the senders in async mode. This will ensure that the
// node of shadow PR that went down last will come up first
startSenderInVMsAsync("ln", vm4, vm5, vm6, vm7);
LogWriterUtils.getLogWriter().info("Waiting for senders running.");
// wait for senders running
vm4.invoke(waitForSenderRunnable());
vm5.invoke(waitForSenderRunnable());
vm6.invoke(waitForSenderRunnable());
vm7.invoke(waitForSenderRunnable());
LogWriterUtils.getLogWriter().info("All the senders are now running...");
// ----------------------------------------------------------------------------------------------------
vm2.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName(), 1000));
vm3.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName(), 1000));
vm4.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName(), 1000));
vm5.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName(), 1000));
vm6.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName(), 1000));
vm7.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName(), 1000));
// do some extra puts in region on local site
vm4.invoke(() -> WANTestBase.doPutsWithKeyAsString(getTestMethodName(), 10000));
LogWriterUtils.getLogWriter().info("Completed puts in the region");
vm2.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName(), 10000));
vm3.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName(), 10000));
}
@Test
public void testPersistentPR_Restart() {
// create locator on local site
Integer lnPort = (Integer) vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1));
// create cache in local site
createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
// create PR on local site
vm4.invoke(() -> WANTestBase.createPersistentPartitionedRegion(getTestMethodName(), "ln", 1,
100, isOffHeap()));
vm5.invoke(() -> WANTestBase.createPersistentPartitionedRegion(getTestMethodName(), "ln", 1,
100, isOffHeap()));
vm6.invoke(() -> WANTestBase.createPersistentPartitionedRegion(getTestMethodName(), "ln", 1,
100, isOffHeap()));
vm7.invoke(() -> WANTestBase.createPersistentPartitionedRegion(getTestMethodName(), "ln", 1,
100, isOffHeap()));
// start puts in region on local site
vm4.invoke(() -> WANTestBase.doPutsWithKeyAsString(getTestMethodName(), 1000));
LogWriterUtils.getLogWriter().info("Completed puts in the region");
// --------------------close and rebuild local site
// -------------------------------------------------
// kill the senders
vm4.invoke(killSenderRunnable());
vm5.invoke(killSenderRunnable());
vm6.invoke(killSenderRunnable());
vm7.invoke(killSenderRunnable());
LogWriterUtils.getLogWriter().info("Killed all the senders.");
// restart the vm
createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
LogWriterUtils.getLogWriter().info("Created back the cache");
// // create PR on local site
// vm4.invoke(WANTestBase.class, "createPersistentPartitionedRegion",
// new Object[] { testName, "ln", 1, 100, isOffHeap() });
// vm5.invoke(WANTestBase.class, "createPersistentPartitionedRegion",
// new Object[] { testName, "ln", 1, 100, isOffHeap() });
// vm6.invoke(WANTestBase.class, "createPersistentPartitionedRegion",
// new Object[] { testName, "ln", 1, 100, isOffHeap() });
// vm7.invoke(WANTestBase.class, "createPersistentPartitionedRegion",
// new Object[] { testName, "ln", 1, 100, isOffHeap() });
// create PR on local site
AsyncInvocation inv1 = vm4.invokeAsync(() -> WANTestBase
.createPersistentPartitionedRegion(getTestMethodName(), "ln", 1, 100, isOffHeap()));
AsyncInvocation inv2 = vm5.invokeAsync(() -> WANTestBase
.createPersistentPartitionedRegion(getTestMethodName(), "ln", 1, 100, isOffHeap()));
AsyncInvocation inv3 = vm6.invokeAsync(() -> WANTestBase
.createPersistentPartitionedRegion(getTestMethodName(), "ln", 1, 100, isOffHeap()));
AsyncInvocation inv4 = vm7.invokeAsync(() -> WANTestBase
.createPersistentPartitionedRegion(getTestMethodName(), "ln", 1, 100, isOffHeap()));
try {
inv1.join();
inv2.join();
inv3.join();
inv4.join();
} catch (InterruptedException e) {
e.printStackTrace();
fail();
}
LogWriterUtils.getLogWriter().info("Created back the partitioned regions");
vm4.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName(), 1000));
vm5.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName(), 1000));
vm6.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName(), 1000));
vm7.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName(), 1000));
}
/**
* Enable persistence for PR and GatewaySender. Close the local site. Create the local cache.
* Don't create back the partitioned region but create just the sender. Check if the remote site
* receives all the events. NOTE: This use case is not supported yet. For ParallelGatewaySender to
* start, it must be associated with a partitioned region.
*/
@Ignore("NotSupported")
@Test
public void testPersistentPartitionedRegionWithGatewaySenderPersistenceEnabled_Restart2() {
// create locator on local site
Integer lnPort = (Integer) vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1));
// create locator on remote site
Integer nyPort = (Integer) vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort));
// create receiver on remote site
createCacheInVMs(nyPort, vm2, vm3);
createReceiverInVMs(vm2, vm3);
// create cache in local site
createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
// create senders with disk store
String diskStore1 = (String) vm4.invoke(() -> WANTestBase.createSenderWithDiskStore("ln", 2,
true, 100, 10, false, true, null, null, true));
String diskStore2 = (String) vm5.invoke(() -> WANTestBase.createSenderWithDiskStore("ln", 2,
true, 100, 10, false, true, null, null, true));
String diskStore3 = (String) vm6.invoke(() -> WANTestBase.createSenderWithDiskStore("ln", 2,
true, 100, 10, false, true, null, null, true));
String diskStore4 = (String) vm7.invoke(() -> WANTestBase.createSenderWithDiskStore("ln", 2,
true, 100, 10, false, true, null, null, true));
LogWriterUtils.getLogWriter()
.info("The DS are: " + diskStore1 + "," + diskStore2 + "," + diskStore3 + "," + diskStore4);
// create PR on remote site
vm2.invoke(() -> WANTestBase.createPersistentPartitionedRegion(getTestMethodName(), null, 1,
100, isOffHeap()));
vm3.invoke(() -> WANTestBase.createPersistentPartitionedRegion(getTestMethodName(), null, 1,
100, isOffHeap()));
// create PR on local site
vm4.invoke(() -> WANTestBase.createPersistentPartitionedRegion(getTestMethodName(), "ln", 1,
100, isOffHeap()));
vm5.invoke(() -> WANTestBase.createPersistentPartitionedRegion(getTestMethodName(), "ln", 1,
100, isOffHeap()));
vm6.invoke(() -> WANTestBase.createPersistentPartitionedRegion(getTestMethodName(), "ln", 1,
100, isOffHeap()));
vm7.invoke(() -> WANTestBase.createPersistentPartitionedRegion(getTestMethodName(), "ln", 1,
100, isOffHeap()));
// start the senders on local site
startSenderInVMs("ln", vm4, vm5, vm6, vm7);
// wait for senders to become running
vm4.invoke(waitForSenderRunnable());
vm5.invoke(waitForSenderRunnable());
vm6.invoke(waitForSenderRunnable());
vm7.invoke(waitForSenderRunnable());
// pause the senders
vm4.invoke(pauseSenderRunnable());
vm5.invoke(pauseSenderRunnable());
vm6.invoke(pauseSenderRunnable());
vm7.invoke(pauseSenderRunnable());
// start puts in region on local site
vm4.invoke(() -> WANTestBase.doPuts(getTestMethodName(), 1000));
LogWriterUtils.getLogWriter().info("Completed puts in the region");
// --------------------close and rebuild local site
// -------------------------------------------------
// kill the senders
vm4.invoke(killSenderRunnable());
vm5.invoke(killSenderRunnable());
vm6.invoke(killSenderRunnable());
vm7.invoke(killSenderRunnable());
LogWriterUtils.getLogWriter().info("Killed all the senders.");
// restart the vm
createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
LogWriterUtils.getLogWriter().info("Created back the cache");
// create senders from disk store
vm4.invoke(() -> WANTestBase.createSenderWithDiskStore("ln", 2, true, 100, 10, false, true,
null, diskStore1, true));
vm5.invoke(() -> WANTestBase.createSenderWithDiskStore("ln", 2, true, 100, 10, false, true,
null, diskStore2, true));
vm6.invoke(() -> WANTestBase.createSenderWithDiskStore("ln", 2, true, 100, 10, false, true,
null, diskStore3, true));
vm7.invoke(() -> WANTestBase.createSenderWithDiskStore("ln", 2, true, 100, 10, false, true,
null, diskStore4, true));
LogWriterUtils.getLogWriter().info("Created the senders back from the disk store.");
// start the senders. NOTE that the senders are not associated with partitioned region
startSenderInVMs("ln", vm4, vm5, vm6, vm7);
LogWriterUtils.getLogWriter().info("Started the senders.");
LogWriterUtils.getLogWriter().info("Waiting for senders running.");
// wait for senders running
vm4.invoke(waitForSenderRunnable());
vm5.invoke(waitForSenderRunnable());
vm6.invoke(waitForSenderRunnable());
vm7.invoke(waitForSenderRunnable());
LogWriterUtils.getLogWriter().info("All the senders are now running...");
// ----------------------------------------------------------------------------------------------------
vm2.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName(), 1000));
vm3.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName(), 1000));
}
/**
* Create a non persistent PR and enable persistence for GatewaySender attached to the PR. Close
* the local site and rebuild it. Check if the remote site receives all the events. NOTE: This use
* case is not supported for now. For persistent parallel gateway sender, the PR to which it is
* attached should also be persistent.
*/
@Ignore("NotSupported")
@Test
public void testNonPersistentPartitionedRegionWithGatewaySenderPersistenceEnabled_Restart() {
// create locator on local site
Integer lnPort = (Integer) vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1));
// create locator on remote site
Integer nyPort = (Integer) vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort));
// create receiver on remote site
createCacheInVMs(nyPort, vm2, vm3);
createReceiverInVMs(vm2, vm3);
// create cache in local site
createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
// create senders with disk store
String diskStore1 = (String) vm4.invoke(() -> WANTestBase.createSenderWithDiskStore("ln", 2,
true, 100, 10, false, true, null, null, true));
String diskStore2 = (String) vm5.invoke(() -> WANTestBase.createSenderWithDiskStore("ln", 2,
true, 100, 10, false, true, null, null, true));
String diskStore3 = (String) vm6.invoke(() -> WANTestBase.createSenderWithDiskStore("ln", 2,
true, 100, 10, false, true, null, null, true));
String diskStore4 = (String) vm7.invoke(() -> WANTestBase.createSenderWithDiskStore("ln", 2,
true, 100, 10, false, true, null, null, true));
LogWriterUtils.getLogWriter()
.info("The DS are: " + diskStore1 + "," + diskStore2 + "," + diskStore3 + "," + diskStore4);
// create PR on remote site
vm2.invoke(
() -> WANTestBase.createPartitionedRegion(getTestMethodName(), null, 1, 100, isOffHeap()));
vm3.invoke(
() -> WANTestBase.createPartitionedRegion(getTestMethodName(), null, 1, 100, isOffHeap()));
// create non persistent PR on local site
vm4.invoke(createPartitionedRegionRunnable());
vm5.invoke(createPartitionedRegionRunnable());
vm6.invoke(createPartitionedRegionRunnable());
vm7.invoke(createPartitionedRegionRunnable());
// start the senders on local site
startSenderInVMs("ln", vm4, vm5, vm6, vm7);
// wait for senders to become running
vm4.invoke(waitForSenderRunnable());
vm5.invoke(waitForSenderRunnable());
vm6.invoke(waitForSenderRunnable());
vm7.invoke(waitForSenderRunnable());
// pause the senders
vm4.invoke(pauseSenderRunnable());
vm5.invoke(pauseSenderRunnable());
vm6.invoke(pauseSenderRunnable());
vm7.invoke(pauseSenderRunnable());
// start puts in region on local site
vm4.invoke(() -> WANTestBase.doPuts(getTestMethodName(), 1000));
LogWriterUtils.getLogWriter().info("Completed puts in the region");
// kill the senders
vm4.invoke(killSenderRunnable());
vm5.invoke(killSenderRunnable());
vm6.invoke(killSenderRunnable());
vm7.invoke(killSenderRunnable());
LogWriterUtils.getLogWriter()
.info("Killed all the senders. The local site has been brought down.");
// restart the vm
createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
LogWriterUtils.getLogWriter().info("Created back the cache");
// create senders with disk store
vm4.invoke(() -> WANTestBase.createSenderWithDiskStore("ln", 2, true, 100, 10, false, true,
null, diskStore1, true));
vm5.invoke(() -> WANTestBase.createSenderWithDiskStore("ln", 2, true, 100, 10, false, true,
null, diskStore2, true));
vm6.invoke(() -> WANTestBase.createSenderWithDiskStore("ln", 2, true, 100, 10, false, true,
null, diskStore3, true));
vm7.invoke(() -> WANTestBase.createSenderWithDiskStore("ln", 2, true, 100, 10, false, true,
null, diskStore4, true));
LogWriterUtils.getLogWriter().info("Created the senders back from the disk store.");
// create PR on local site
vm4.invoke(createPartitionedRegionRunnable());
vm5.invoke(createPartitionedRegionRunnable());
vm6.invoke(createPartitionedRegionRunnable());
vm7.invoke(createPartitionedRegionRunnable());
LogWriterUtils.getLogWriter().info("Created back the partitioned regions");
// start the senders
startSenderInVMs("ln", vm4, vm5, vm6, vm7);
LogWriterUtils.getLogWriter().info("Started the senders.");
LogWriterUtils.getLogWriter().info("Waiting for senders running.");
// wait for senders running
vm4.invoke(waitForSenderRunnable());
vm5.invoke(waitForSenderRunnable());
vm6.invoke(waitForSenderRunnable());
vm7.invoke(waitForSenderRunnable());
LogWriterUtils.getLogWriter().info("All the senders are now running...");
vm2.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName(), 1000));
vm3.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName(), 1000));
}
/**
* Create persistent PR and non-persistent GatewaySender. After doing puts, close the local site.
* Rebuild the PR from persistent disk store and create the sender again. Do more puts. Check if
* the remote site receives newly added events.
*
* This test can be used as DUnit test for defect #50247 reported by Indian Railways. At present,
* customer is using this configuration and which is not recommended since it can lead to event
* loss of GatewaySender events.
*/
@Ignore("Bug50247")
@Test
public void testPersistentPartitionedRegionWithGatewaySender_Restart() {
// create locator on local site
Integer lnPort = (Integer) vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1));
// create locator on remote site
Integer nyPort = (Integer) vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort));
// create receiver on remote site
createCacheInVMs(nyPort, vm2, vm3);
createReceiverInVMs(vm2, vm3);
// create cache in local site
createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
vm4.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 10, false, false, null, true));
vm5.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 10, false, false, null, true));
vm6.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 10, false, false, null, true));
vm7.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 10, false, false, null, true));
// create PR on remote site
vm2.invoke(() -> WANTestBase.createPersistentPartitionedRegion(getTestMethodName(), null, 1,
100, isOffHeap()));
vm3.invoke(() -> WANTestBase.createPersistentPartitionedRegion(getTestMethodName(), null, 1,
100, isOffHeap()));
// create PR on local site
vm4.invoke(() -> WANTestBase.createPersistentPartitionedRegion(getTestMethodName(), "ln", 1,
100, isOffHeap()));
vm5.invoke(() -> WANTestBase.createPersistentPartitionedRegion(getTestMethodName(), "ln", 1,
100, isOffHeap()));
vm6.invoke(() -> WANTestBase.createPersistentPartitionedRegion(getTestMethodName(), "ln", 1,
100, isOffHeap()));
vm7.invoke(() -> WANTestBase.createPersistentPartitionedRegion(getTestMethodName(), "ln", 1,
100, isOffHeap()));
// start the senders on local site
startSenderInVMs("ln", vm4, vm5, vm6, vm7);
// wait for senders to become running
vm4.invoke(waitForSenderRunnable());
vm5.invoke(waitForSenderRunnable());
vm6.invoke(waitForSenderRunnable());
vm7.invoke(waitForSenderRunnable());
// pause the senders
vm4.invoke(pauseSenderRunnable());
vm5.invoke(pauseSenderRunnable());
vm6.invoke(pauseSenderRunnable());
vm7.invoke(pauseSenderRunnable());
// start puts in region on local site
vm4.invoke(() -> WANTestBase.doPuts(getTestMethodName(), 3000));
LogWriterUtils.getLogWriter().info("Completed puts in the region");
// ----------------- Close and rebuild local site -------------------------------------
// kill the senders
vm4.invoke(killSenderRunnable());
vm5.invoke(killSenderRunnable());
vm6.invoke(killSenderRunnable());
vm7.invoke(killSenderRunnable());
LogWriterUtils.getLogWriter().info("Killed all the senders.");
// restart the vm
createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
LogWriterUtils.getLogWriter().info("Created back the cache");
// create back the senders
vm4.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 10, false, false, null, true));
vm5.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 10, false, false, null, true));
vm6.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 10, false, false, null, true));
vm7.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 10, false, false, null, true));
LogWriterUtils.getLogWriter().info("Created the senders again");
// start the senders
startSenderInVMsAsync("ln", vm4, vm5, vm6, vm7);
LogWriterUtils.getLogWriter().info("Started the senders.");
LogWriterUtils.getLogWriter().info("Waiting for senders running.");
// wait for senders running
vm4.invoke(waitForSenderRunnable());
vm5.invoke(waitForSenderRunnable());
vm6.invoke(waitForSenderRunnable());
vm7.invoke(waitForSenderRunnable());
LogWriterUtils.getLogWriter().info("All the senders are now running...");
// create PR on local site
AsyncInvocation inv1 = vm4.invokeAsync(() -> WANTestBase
.createPersistentPartitionedRegion(getTestMethodName(), "ln", 1, 100, isOffHeap()));
AsyncInvocation inv2 = vm5.invokeAsync(() -> WANTestBase
.createPersistentPartitionedRegion(getTestMethodName(), "ln", 1, 100, isOffHeap()));
AsyncInvocation inv3 = vm6.invokeAsync(() -> WANTestBase
.createPersistentPartitionedRegion(getTestMethodName(), "ln", 1, 100, isOffHeap()));
AsyncInvocation inv4 = vm7.invokeAsync(() -> WANTestBase
.createPersistentPartitionedRegion(getTestMethodName(), "ln", 1, 100, isOffHeap()));
try {
inv1.join();
inv2.join();
inv3.join();
inv4.join();
} catch (InterruptedException e) {
e.printStackTrace();
fail();
}
LogWriterUtils.getLogWriter().info("Created back the partitioned regions");
// -------------------------------------------------------------------------------------------
// start puts in region on local site
vm4.invoke(() -> WANTestBase.doPuts(getTestMethodName(), 3000));
LogWriterUtils.getLogWriter().info("Completed puts in the region");
vm2.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName(), 3000));
vm3.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName(), 3000));
}
/**
* LocalMaxMemory of user PR is 0 (accessor region). Parallel sender persistence is enabled. In
* this scenario, the PR for Parallel sender should be created with persistence = false. This is
* because since the region is accessor region, it won't host any buckets and hence Parallel
* sender PR is not required to be persistent. This test is added for defect # 45747.
*/
@Test
public void testParallelPropagationWithSenderPersistenceEnabledForAccessor() {
// create locator on local site
Integer lnPort = (Integer) vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1));
// create locator on remote site
Integer nyPort = (Integer) vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort));
// create receiver on remote site
createCacheInVMs(nyPort, vm2, vm3);
createReceiverInVMs(vm2, vm3);
// create cache in local site
createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
vm4.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 10, false, true, null, true));
vm5.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 10, false, true, null, true));
vm6.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 10, false, true, null, true));
vm7.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 10, false, true, null, true));
vm4.invoke(() -> WANTestBase.createPartitionedRegion(getTestMethodName() + "_PR", "ln", 1, 100,
isOffHeap()));
vm5.invoke(() -> WANTestBase.createPartitionedRegion(getTestMethodName() + "_PR", "ln", 1, 100,
isOffHeap()));
vm6.invoke(() -> WANTestBase.createPartitionedRegionAsAccessor(getTestMethodName() + "_PR",
"ln", 1, 100));
vm7.invoke(() -> WANTestBase.createPartitionedRegionAsAccessor(getTestMethodName() + "_PR",
"ln", 1, 100));
vm2.invoke(() -> WANTestBase.createPartitionedRegion(getTestMethodName() + "_PR", null, 1, 100,
isOffHeap()));
vm3.invoke(() -> WANTestBase.createPartitionedRegion(getTestMethodName() + "_PR", null, 1, 100,
isOffHeap()));
startSenderInVMs("ln", vm4, vm5, vm6, vm7);
// start puts in region on local site
vm4.invoke(() -> WANTestBase.doPuts(getTestMethodName() + "_PR", 1000));
LogWriterUtils.getLogWriter().info("Completed puts in the region");
vm2.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_PR", 1000));
vm3.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_PR", 1000));
}
/**
* Test for bug 50120 see if we can recover after deleting the parallel gateway sender files and
* not recoverying the sender when we have a persistent PR.
*
*/
@Test
public void testRemoveGatewayFromPersistentRegionOnRestart() throws Throwable {
try {
Integer lnPort = (Integer) vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1));
Integer nyPort = (Integer) vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort));
createCacheInVMs(nyPort, vm2, vm3);
createReceiverInVMs(vm2, vm3);
createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
vm4.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 10, false, true, null, true));
vm5.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 10, false, true, null, true));
vm6.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 10, false, true, null, true));
vm7.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 10, false, true, null, true));
vm4.invoke(() -> WANTestBase.createPersistentPartitionedRegion(getTestMethodName(), "ln", 1,
100, isOffHeap()));
vm5.invoke(() -> WANTestBase.createPersistentPartitionedRegion(getTestMethodName(), "ln", 1,
100, isOffHeap()));
vm6.invoke(() -> WANTestBase.createPersistentPartitionedRegion(getTestMethodName(), "ln", 1,
100, isOffHeap()));
vm7.invoke(() -> WANTestBase.createPersistentPartitionedRegion(getTestMethodName(), "ln", 1,
100, isOffHeap()));
startSenderInVMs("ln", vm4, vm5, vm6, vm7);
vm2.invoke(() -> WANTestBase.createPersistentPartitionedRegion(getTestMethodName(), null, 1,
100, isOffHeap()));
vm3.invoke(() -> WANTestBase.createPersistentPartitionedRegion(getTestMethodName(), null, 1,
100, isOffHeap()));
vm4.invoke(() -> WANTestBase.doPuts(getTestMethodName(), 113));
vm2.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName(), 113));
vm3.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName(), 113));
// Bounce vm4, vm5, vm6, vm7 without the parallel queue
vm4.invoke(WANTestBase.class, "closeCache", new Object[] {});
vm5.invoke(WANTestBase.class, "closeCache", new Object[] {});
vm6.invoke(WANTestBase.class, "closeCache", new Object[] {});
vm7.invoke(WANTestBase.class, "closeCache", new Object[] {});
vm4.invoke(ParallelWANPersistenceEnabledGatewaySenderDUnitTest.class, "setIgnoreQueue",
new Object[] {true});
vm5.invoke(ParallelWANPersistenceEnabledGatewaySenderDUnitTest.class, "setIgnoreQueue",
new Object[] {true});
vm6.invoke(ParallelWANPersistenceEnabledGatewaySenderDUnitTest.class, "setIgnoreQueue",
new Object[] {true});
vm7.invoke(ParallelWANPersistenceEnabledGatewaySenderDUnitTest.class, "setIgnoreQueue",
new Object[] {true});
createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
AsyncInvocation async4 = vm4.invokeAsync(() -> WANTestBase
.createPersistentPartitionedRegion(getTestMethodName(), null, 1, 100, isOffHeap()));
AsyncInvocation async5 = vm5.invokeAsync(() -> WANTestBase
.createPersistentPartitionedRegion(getTestMethodName(), null, 1, 100, isOffHeap()));
AsyncInvocation async6 = vm6.invokeAsync(() -> WANTestBase
.createPersistentPartitionedRegion(getTestMethodName(), null, 1, 100, isOffHeap()));
AsyncInvocation async7 = vm7.invokeAsync(() -> WANTestBase
.createPersistentPartitionedRegion(getTestMethodName(), null, 1, 100, isOffHeap()));
async7.getResult(30 * 1000);
async5.getResult(30 * 1000);
async6.getResult(30 * 1000);
async4.getResult(30 * 1000);
// This should succeed, because the region recovered even though
// the queue was removed.
vm4.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName(), 113));
} finally {
vm4.invoke(ParallelWANPersistenceEnabledGatewaySenderDUnitTest.class, "setIgnoreQueue",
new Object[] {false});
vm5.invoke(ParallelWANPersistenceEnabledGatewaySenderDUnitTest.class, "setIgnoreQueue",
new Object[] {false});
vm6.invoke(ParallelWANPersistenceEnabledGatewaySenderDUnitTest.class, "setIgnoreQueue",
new Object[] {false});
vm7.invoke(ParallelWANPersistenceEnabledGatewaySenderDUnitTest.class, "setIgnoreQueue",
new Object[] {false});
}
}
/**
* setIgnoreQueue has lots of callers by reflection
* <p>
* TODO: add DistributedRestoreSystemProperties Rule and delete this mess and make constant final
*/
public static void setIgnoreQueue(boolean shouldIgnore) {
ColocationHelper.IGNORE_UNRECOVERED_QUEUE = shouldIgnore;
}
}