/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.geode.internal.cache.wan.parallel;
import static org.apache.geode.distributed.internal.DistributionConfig.OFF_HEAP_MEMORY_SIZE_NAME;
import static org.apache.geode.internal.cache.tier.sockets.Message.MAX_MESSAGE_SIZE_PROPERTY;
import static org.apache.geode.test.awaitility.GeodeAwaitility.await;
import static org.apache.geode.test.dunit.IgnoredException.addIgnoredException;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.catchThrowable;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import util.TestException;
import org.apache.geode.GemFireIOException;
import org.apache.geode.cache.CacheWriter;
import org.apache.geode.cache.Region;
import org.apache.geode.cache.RegionShortcut;
import org.apache.geode.cache.wan.GatewaySender;
import org.apache.geode.internal.cache.BucketRegion;
import org.apache.geode.internal.cache.InternalCache;
import org.apache.geode.internal.cache.PartitionedRegion;
import org.apache.geode.internal.cache.PartitionedRegionDataStore;
import org.apache.geode.internal.cache.tier.sockets.MessageTooLargeException;
import org.apache.geode.internal.cache.wan.AbstractGatewaySender;
import org.apache.geode.internal.cache.wan.GatewaySenderException;
import org.apache.geode.internal.cache.wan.WANTestBase;
import org.apache.geode.internal.offheap.MemoryAllocatorImpl;
import org.apache.geode.internal.offheap.OffHeapRegionEntryHelper;
import org.apache.geode.test.dunit.AsyncInvocation;
import org.apache.geode.test.dunit.RMIException;
import org.apache.geode.test.dunit.rules.ClusterStartupRule;
import org.apache.geode.test.dunit.rules.DistributedRestoreSystemProperties;
import org.apache.geode.test.dunit.rules.MemberVM;
import org.apache.geode.test.junit.categories.WanTest;
/**
* DUnit tests for operations (start, stop, pause, resume, destroy) on a ParallelGatewaySender.
*/
@Category(WanTest.class)
@SuppressWarnings("serial")
public class ParallelGatewaySenderOperationsDUnitTest extends WANTestBase {
@Rule
public ClusterStartupRule clusterStartupRule = new ClusterStartupRule();
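// Restores, in every VM, any system properties a test changes (e.g. the maximum message size
// set in testParallelGatewaySenderMessageTooLargeException).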
@Rule
public DistributedRestoreSystemProperties restoreSystemProperties =
new DistributedRestoreSystemProperties();
@Before
public void setUp() throws Exception {
addIgnoredException("Broken pipe||Unexpected IOException");
}
@Test(timeout = 300_000)
public void testStopOneConcurrentGatewaySenderWithSSL() throws Exception {
Integer lnPort = vm0.invoke(() -> createFirstLocatorWithDSId(1));
Integer nyPort = vm1.invoke(() -> createFirstRemoteLocator(2, lnPort));
createCacheInVMs(nyPort, vm2, vm3);
vm2.invoke(() -> createReceiverWithSSL(nyPort));
vm3.invoke(() -> createReceiverWithSSL(nyPort));
vm4.invoke(() -> createCacheWithSSL(lnPort));
vm5.invoke(() -> createCacheWithSSL(lnPort));
vm4.invoke(() -> createConcurrentSender("ln", 2, true, 100, 10, false, true, null, true, 5,
GatewaySender.OrderPolicy.KEY));
vm5.invoke(() -> createConcurrentSender("ln", 2, true, 100, 10, false, true, null, true, 5,
GatewaySender.OrderPolicy.KEY));
String regionName = getTestMethodName() + "_PR";
vm4.invoke(() -> createPartitionedRegion(regionName, "ln", 1, 100, isOffHeap()));
vm5.invoke(() -> createPartitionedRegion(regionName, "ln", 1, 100, isOffHeap()));
vm2.invoke(() -> createPartitionedRegion(regionName, "ln", 1, 100, isOffHeap()));
vm3.invoke(() -> createPartitionedRegion(regionName, "ln", 1, 100, isOffHeap()));
startSenderInVMs("ln", vm4, vm5);
vm4.invoke(() -> doPuts(regionName, 10));
vm4.invoke(() -> stopSender("ln"));
vm4.invoke(() -> startSender("ln"));
vm4.invoke(() -> doPuts(regionName, 10));
vm5.invoke(() -> stopSender("ln"));
vm5.invoke(() -> startSender("ln"));
}
@Test
public void testParallelGatewaySenderWithoutStarting() {
Integer[] locatorPorts = createLNAndNYLocators();
Integer lnPort = locatorPorts[0];
Integer nyPort = locatorPorts[1];
createSendersReceiversAndPartitionedRegion(lnPort, nyPort, false, false);
vm4.invoke(() -> doPuts(getTestMethodName() + "_PR", 1000));
vm4.invoke(() -> verifySenderStoppedState("ln"));
vm5.invoke(() -> verifySenderStoppedState("ln"));
vm6.invoke(() -> verifySenderStoppedState("ln"));
vm7.invoke(() -> verifySenderStoppedState("ln"));
validateRegionSizes(getTestMethodName() + "_PR", 0, vm2, vm3);
}
/**
* ParallelGatewaySender should not be started on Accessor node
*
* <p>
* TRAC #44323: NewWan: ParallelGatewaySender should not be started on Accessor Node
*/
@Test
public void testParallelGatewaySenderStartOnAccessorNode() {
Integer[] locatorPorts = createLNAndNYLocators();
Integer lnPort = locatorPorts[0];
Integer nyPort = locatorPorts[1];
createSendersReceiversAndPartitionedRegion(lnPort, nyPort, true, true);
vm6.invoke(() -> waitForSenderRunningState("ln"));
vm7.invoke(() -> waitForSenderRunningState("ln"));
vm4.invoke(() -> doPuts(getTestMethodName() + "_PR", 10));
vm4.invoke(() -> validateParallelSenderQueueAllBucketsDrained("ln"));
vm5.invoke(() -> validateParallelSenderQueueAllBucketsDrained("ln"));
validateRegionSizes(getTestMethodName() + "_PR", 10, vm2, vm3);
}
/**
* Normal scenario in which the sender is paused in between.
*/
@Test
public void testParallelPropagationSenderPause() throws Exception {
Integer[] locatorPorts = createLNAndNYLocators();
Integer lnPort = locatorPorts[0];
Integer nyPort = locatorPorts[1];
createSendersReceiversAndPartitionedRegion(lnPort, nyPort, false, true);
// make sure all the senders are running before doing any puts
waitForSendersRunning();
// FIRST RUN: now, the senders are started. So, start the puts
vm4.invoke(() -> doPuts(getTestMethodName() + "_PR", 100));
// now, pause all of the senders
vm4.invoke(() -> pauseSender("ln"));
vm5.invoke(() -> pauseSender("ln"));
vm6.invoke(() -> pauseSender("ln"));
vm7.invoke(() -> pauseSender("ln"));
// SECOND RUN: keep one thread doing puts to the region
vm4.invokeAsync(() -> doPuts(getTestMethodName() + "_PR", 1000));
// verify that the region size on the remote vm remains the same, capped at the
// number of puts done in the first run
vm2.invoke(() -> validateRegionSizeRemainsSame(getTestMethodName() + "_PR", 100));
}
/**
* Normal scenario in which a paused sender is resumed.
*/
@Test
public void testParallelPropagationSenderResume() throws Exception {
Integer[] locatorPorts = createLNAndNYLocators();
Integer lnPort = locatorPorts[0];
Integer nyPort = locatorPorts[1];
createSendersReceiversAndPartitionedRegion(lnPort, nyPort, false, true);
// make sure all the senders are running before doing any puts
waitForSendersRunning();
int numPuts = 1000;
// now, the senders are started. So, start the puts
AsyncInvocation async = vm4.invokeAsync(() -> doPuts(getTestMethodName() + "_PR", numPuts));
// now, pause all of the senders
vm4.invoke(() -> pauseSender("ln"));
vm5.invoke(() -> pauseSender("ln"));
vm6.invoke(() -> pauseSender("ln"));
vm7.invoke(() -> pauseSender("ln"));
// resume the senders
vm4.invoke(() -> resumeSender("ln"));
vm5.invoke(() -> resumeSender("ln"));
vm6.invoke(() -> resumeSender("ln"));
vm7.invoke(() -> resumeSender("ln"));
async.await(2, TimeUnit.MINUTES);
validateParallelSenderQueueAllBucketsDrained();
// find the region size on remote vm
vm2.invoke(() -> validateRegionSize(getTestMethodName() + "_PR", numPuts));
}
/**
* Negative scenario in which a sender that is stopped (and not paused) is resumed. Expected:
* resume is valid only for a paused sender; resuming a stopped sender does not start it
* again.
*/
@Test
public void testParallelPropagationSenderResumeNegativeScenario() throws Exception {
Integer[] locatorPorts = createLNAndNYLocators();
Integer lnPort = locatorPorts[0];
Integer nyPort = locatorPorts[1];
createCacheInVMs(nyPort, vm2, vm3);
createReceiverInVMs(vm2, vm3);
createCacheInVMs(lnPort, vm4, vm5);
vm4.invoke(() -> createSender("ln", 2, true, 100, 10, false, false, null, true));
vm5.invoke(() -> createSender("ln", 2, true, 100, 10, false, false, null, true));
vm4.invoke(
() -> createPartitionedRegion(getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap()));
vm5.invoke(
() -> createPartitionedRegion(getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap()));
vm2.invoke(
() -> createPartitionedRegion(getTestMethodName() + "_PR", null, 1, 100, isOffHeap()));
vm3.invoke(
() -> createPartitionedRegion(getTestMethodName() + "_PR", null, 1, 100, isOffHeap()));
startSenderInVMs("ln", vm4, vm5);
// wait till the senders are running
vm4.invoke(() -> waitForSenderRunningState("ln"));
vm5.invoke(() -> waitForSenderRunningState("ln"));
// start the puts
vm4.invoke(() -> doPuts(getTestMethodName() + "_PR", 100));
// let the queue drain completely
vm4.invoke(() -> validateQueueContents("ln", 0));
// stop the senders
vm4.invoke(() -> stopSender("ln"));
vm5.invoke(() -> stopSender("ln"));
// now, try to resume a stopped sender
vm4.invoke(() -> resumeSender("ln"));
vm5.invoke(() -> resumeSender("ln"));
// do more puts
vm4.invoke(() -> doPuts(getTestMethodName() + "_PR", 1000));
// validate region size on remote vm to contain only the events put in local site
// before the senders are stopped.
vm2.invoke(() -> validateRegionSize(getTestMethodName() + "_PR", 100));
}
/**
* Normal scenario in which a sender is stopped.
*/
@Test
public void testParallelPropagationSenderStop() throws Exception {
addIgnoredException("Broken pipe");
Integer[] locatorPorts = createLNAndNYLocators();
Integer lnPort = locatorPorts[0];
Integer nyPort = locatorPorts[1];
createSendersReceiversAndPartitionedRegion(lnPort, nyPort, false, true);
// make sure all the senders are running before doing any puts
waitForSendersRunning();
// FIRST RUN: now, the senders are started. So, do some of the puts
vm4.invoke(() -> doPuts(getTestMethodName() + "_PR", 100));
// now, stop all of the senders
stopSenders();
// SECOND RUN: keep one thread doing puts
vm4.invokeAsync(() -> doPuts(getTestMethodName() + "_PR", 1000));
// verify that the region size on the remote vm remains the same, capped at the
// number of puts done in the first run
vm2.invoke(() -> validateRegionSizeRemainsSame(getTestMethodName() + "_PR", 100));
}
/**
* Normal scenario in which a sender is stopped and then started again.
*/
@Test
public void testParallelPropagationSenderStartAfterStop() throws Exception {
addIgnoredException("Broken pipe");
Integer[] locatorPorts = createLNAndNYLocators();
Integer lnPort = locatorPorts[0];
Integer nyPort = locatorPorts[1];
String regionName = getTestMethodName() + "_PR";
createCacheInVMs(nyPort, vm2, vm3);
createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
vm2.invoke(() -> createPartitionedRegion(regionName, "ln", 1, 100, isOffHeap()));
vm3.invoke(() -> createPartitionedRegion(regionName, "ln", 1, 100, isOffHeap()));
createReceiverInVMs(vm2, vm3);
vm4.invoke(() -> createPartitionedRegion(regionName, "ln", 1, 100, isOffHeap()));
vm5.invoke(() -> createPartitionedRegion(regionName, "ln", 1, 100, isOffHeap()));
vm6.invoke(() -> createPartitionedRegion(regionName, "ln", 1, 100, isOffHeap()));
vm7.invoke(() -> createPartitionedRegion(regionName, "ln", 1, 100, isOffHeap()));
vm4.invoke(() -> createSender("ln", 2, true, 100, 10, false, false, null, true));
vm5.invoke(() -> createSender("ln", 2, true, 100, 10, false, false, null, true));
vm6.invoke(() -> createSender("ln", 2, true, 100, 10, false, false, null, true));
vm7.invoke(() -> createSender("ln", 2, true, 100, 10, false, false, null, true));
startSenderInVMs("ln", vm4, vm5, vm6, vm7);
// make sure all the senders are running before doing any puts
vm4.invoke(() -> waitForSenderRunningState("ln"));
vm5.invoke(() -> waitForSenderRunningState("ln"));
vm6.invoke(() -> waitForSenderRunningState("ln"));
vm7.invoke(() -> waitForSenderRunningState("ln"));
// FIRST RUN: now, the senders are started. So, do some of the puts
vm4.invoke(() -> doPuts(regionName, 200));
// now, stop all of the senders
vm4.invoke(() -> stopSender("ln"));
vm5.invoke(() -> stopSender("ln"));
vm6.invoke(() -> stopSender("ln"));
vm7.invoke(() -> stopSender("ln"));
// Region size on the remote site should remain the same (the number of puts done in the
// FIRST RUN)
vm2.invoke(() -> validateRegionSizeRemainsSame(regionName, 200));
// SECOND RUN: do some of the puts after the senders are stopped
vm4.invoke(() -> doPuts(regionName, 1000));
// Region size on the remote site should remain the same (the number of puts done in the
// FIRST RUN)
vm2.invoke(() -> validateRegionSizeRemainsSame(regionName, 200));
// start the senders again
startSenderInVMs("ln", vm4, vm5, vm6, vm7);
vm4.invoke(() -> waitForSenderRunningState("ln"));
vm5.invoke(() -> waitForSenderRunningState("ln"));
vm6.invoke(() -> waitForSenderRunningState("ln"));
vm7.invoke(() -> waitForSenderRunningState("ln"));
// Region size on the remote site should remain the same (the number of puts done in the
// FIRST RUN)
vm2.invoke(() -> validateRegionSizeRemainsSame(regionName, 200));
// SECOND RUN: do some more puts
vm4.invoke(() -> doPuts(regionName, 1000));
// verify all the buckets on all the sender nodes are drained
validateParallelSenderQueueAllBucketsDrained();
// verify the events propagate to remote site
vm2.invoke(() -> validateRegionSize(regionName, 1000));
vm4.invoke(() -> validateQueueSizeStat("ln", 0));
vm5.invoke(() -> validateQueueSizeStat("ln", 0));
vm6.invoke(() -> validateQueueSizeStat("ln", 0));
vm7.invoke(() -> validateQueueSizeStat("ln", 0));
}
/**
* Normal scenario in which a sender is stopped and then started again. Differs from the test
* above in that, while the sender is starting from the stopped state, puts are simultaneously
* happening on the region in another thread.
*/
@Test
public void testParallelPropagationSenderStartAfterStop_Scenario2() throws Exception {
addIgnoredException("Broken pipe");
Integer[] locatorPorts = createLNAndNYLocators();
Integer lnPort = locatorPorts[0];
Integer nyPort = locatorPorts[1];
createSendersReceiversAndPartitionedRegion(lnPort, nyPort, false, true);
// make sure all the senders are running before doing any puts
waitForSendersRunning();
// FIRST RUN: now, the senders are started. So, do some of the puts
vm4.invoke(() -> doPuts(getTestMethodName() + "_PR", 200));
// Make sure the puts make it to the remote side
vm2.invoke(() -> validateRegionSize(getTestMethodName() + "_PR", 200, 120000));
vm3.invoke(() -> validateRegionSize(getTestMethodName() + "_PR", 200, 120000));
// now, stop all of the senders
stopSenders();
vm2.invoke(() -> validateRegionSize(getTestMethodName() + "_PR", 200, 120000));
vm3.invoke(() -> validateRegionSize(getTestMethodName() + "_PR", 200, 120000));
// SECOND RUN: do some of the puts after the senders are stopped
vm4.invoke(() -> doPuts(getTestMethodName() + "_PR", 1000));
// Region size on the remote site should remain the same (the number of puts done in the
// FIRST RUN)
vm2.invoke(() -> validateRegionSizeRemainsSame(getTestMethodName() + "_PR", 200));
// SECOND RUN: start async puts on region
ArrayList<Integer> vm4List = null;
ArrayList<Integer> vm5List = null;
ArrayList<Integer> vm6List = null;
ArrayList<Integer> vm7List = null;
boolean foundEventsDroppedDueToPrimarySenderNotRunning = false;
int count = 0;
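// Repeatedly race sender restart against in-flight puts (up to 5 attempts) until at least
// one event is observed to be dropped because the primary sender was not yet running.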
do {
stopSenders();
AsyncInvocation async = vm4.invokeAsync(() -> doPuts(getTestMethodName() + "_PR", 5000));
// when puts are happening by another thread, start the senders
startSenderInVMsAsync("ln", vm4, vm5, vm6, vm7);
async.join();
vm4List =
(ArrayList<Integer>) vm4.invoke(() -> WANTestBase.getSenderStatsForDroppedEvents("ln"));
vm5List =
(ArrayList<Integer>) vm5.invoke(() -> WANTestBase.getSenderStatsForDroppedEvents("ln"));
vm6List =
(ArrayList<Integer>) vm6.invoke(() -> WANTestBase.getSenderStatsForDroppedEvents("ln"));
vm7List =
(ArrayList<Integer>) vm7.invoke(() -> WANTestBase.getSenderStatsForDroppedEvents("ln"));
if (vm4List.get(0) + vm5List.get(0) + vm6List.get(0) + vm7List.get(0) > 0) {
foundEventsDroppedDueToPrimarySenderNotRunning = true;
}
count++;
} while (!foundEventsDroppedDueToPrimarySenderNotRunning && count < 5);
assertThat(foundEventsDroppedDueToPrimarySenderNotRunning).isTrue();
// verify all the buckets on all the sender nodes are drained
validateParallelSenderQueueAllBucketsDrained();
// verify that the queue size ultimately becomes zero. That means all the events propagate to
// remote site.
vm4.invoke(() -> validateQueueContents("ln", 0));
}
/**
* Normal scenario in which a sender is stopped and then started again on accessor node.
*/
@Test
public void testParallelPropagationSenderStartAfterStopOnAccessorNode() throws Exception {
addIgnoredException("Broken pipe");
addIgnoredException("Connection reset");
addIgnoredException("Unexpected IOException");
Integer[] locatorPorts = createLNAndNYLocators();
Integer lnPort = locatorPorts[0];
Integer nyPort = locatorPorts[1];
createSendersReceiversAndPartitionedRegion(lnPort, nyPort, true, true);
// make sure all the senders are running before doing any puts; senders on accessor nodes
// host no queue buckets
waitForSendersRunning();
// FIRST RUN: now, the senders are started. So, do some of the puts
vm4.invoke(() -> doPuts(getTestMethodName() + "_PR", 200));
// now, stop all of the senders
stopSenders();
// SECOND RUN: do some of the puts after the senders are stopped
vm4.invoke(() -> doPuts(getTestMethodName() + "_PR", 1000));
// Region size on the remote site should remain the same (the number of puts done in the
// FIRST RUN)
vm2.invoke(() -> validateRegionSizeRemainsSame(getTestMethodName() + "_PR", 200));
// start the senders again
startSenderInVMs("ln", vm4, vm5, vm6, vm7);
// Region size on the remote site should remain the same (the number of puts done in the
// FIRST RUN)
vm2.invoke(() -> validateRegionSizeRemainsSame(getTestMethodName() + "_PR", 200));
// SECOND RUN: do some more puts
vm4.invoke(() -> doPuts(getTestMethodName() + "_PR", 1000));
// verify all buckets drained only on non-accessor nodes.
vm4.invoke(() -> validateParallelSenderQueueAllBucketsDrained("ln"));
vm5.invoke(() -> validateParallelSenderQueueAllBucketsDrained("ln"));
// verify the events propagate to remote site
vm2.invoke(() -> validateRegionSize(getTestMethodName() + "_PR", 1000));
}
/**
* Normal scenario in which a combination of start, pause and resume operations is tested.
*/
@Test
public void testStartPauseResumeParallelGatewaySender() throws Exception {
Integer[] locatorPorts = createLNAndNYLocators();
Integer lnPort = locatorPorts[0];
Integer nyPort = locatorPorts[1];
createSendersReceiversAndPartitionedRegion(lnPort, nyPort, false, true);
vm4.invoke(() -> doPuts(getTestMethodName() + "_PR", 1000));
// Since puts have already been done on the user PR, its buckets are already created.
// During start, the sender waits until those buckets are created for the shadow PR as well.
// Start the senders in async threads, so colocation of the shadow PR can complete and the
// missing buckets can be created in PRHARedundancyProvider.createMissingBuckets().
startSenderInVMsAsync("ln", vm4, vm5, vm6, vm7);
waitForSendersRunning();
vm4.invoke(() -> doPuts(getTestMethodName() + "_PR", 5000));
vm4.invoke(() -> pauseSender("ln"));
vm5.invoke(() -> pauseSender("ln"));
vm6.invoke(() -> pauseSender("ln"));
vm7.invoke(() -> pauseSender("ln"));
vm4.invoke(() -> verifySenderPausedState("ln"));
vm5.invoke(() -> verifySenderPausedState("ln"));
vm6.invoke(() -> verifySenderPausedState("ln"));
vm7.invoke(() -> verifySenderPausedState("ln"));
AsyncInvocation async = vm4.invokeAsync(() -> doPuts(getTestMethodName() + "_PR", 1000));
vm4.invoke(() -> resumeSender("ln"));
vm5.invoke(() -> resumeSender("ln"));
vm6.invoke(() -> resumeSender("ln"));
vm7.invoke(() -> resumeSender("ln"));
vm4.invoke(() -> verifySenderResumedState("ln"));
vm5.invoke(() -> verifySenderResumedState("ln"));
vm6.invoke(() -> verifySenderResumedState("ln"));
vm7.invoke(() -> verifySenderResumedState("ln"));
async.await();
// verify all buckets drained on all sender nodes.
validateParallelSenderQueueAllBucketsDrained();
validateRegionSizes(getTestMethodName() + "_PR", 5000, vm2, vm3);
}
/**
* Since the sender is attached to a region and in use, it cannot be destroyed. Hence, an
* exception is thrown by the sender API.
*/
@Test
public void testDestroyParallelGatewaySenderExceptionScenario() {
Integer[] locatorPorts = createLNAndNYLocators();
Integer lnPort = locatorPorts[0];
Integer nyPort = locatorPorts[1];
createSendersReceiversAndPartitionedRegion(lnPort, nyPort, false, true);
// make sure all the senders are running before doing any puts
waitForSendersRunning();
vm4.invoke(() -> doPuts(getTestMethodName() + "_PR", 1000));
// try destroying on couple of nodes
Throwable caughtException = catchThrowable(() -> vm4.invoke(() -> destroySender("ln")));
assertThat(caughtException).isInstanceOf(RMIException.class);
assertThat(caughtException.getCause()).isInstanceOf(GatewaySenderException.class);
caughtException = catchThrowable(() -> vm5.invoke(() -> destroySender("ln")));
assertThat(caughtException).isInstanceOf(RMIException.class);
assertThat(caughtException.getCause()).isInstanceOf(GatewaySenderException.class);
vm2.invoke(() -> validateRegionSize(getTestMethodName() + "_PR", 1000));
}
@Test
public void testDestroyParallelGatewaySender() {
Integer[] locatorPorts = createLNAndNYLocators();
Integer lnPort = locatorPorts[0];
Integer nyPort = locatorPorts[1];
createSendersReceiversAndPartitionedRegion(lnPort, nyPort, false, true);
// make sure all the senders are running
waitForSendersRunning();
vm4.invoke(() -> doPuts(getTestMethodName() + "_PR", 1000));
// stop the sender and remove from region before calling destroy on it
stopSenders();
vm4.invoke(() -> removeSenderFromTheRegion("ln", getTestMethodName() + "_PR"));
vm5.invoke(() -> removeSenderFromTheRegion("ln", getTestMethodName() + "_PR"));
vm6.invoke(() -> removeSenderFromTheRegion("ln", getTestMethodName() + "_PR"));
vm7.invoke(() -> removeSenderFromTheRegion("ln", getTestMethodName() + "_PR"));
vm4.invoke(() -> destroySender("ln"));
vm5.invoke(() -> destroySender("ln"));
vm6.invoke(() -> destroySender("ln"));
vm7.invoke(() -> destroySender("ln"));
vm4.invoke(() -> verifySenderDestroyed("ln", true));
vm5.invoke(() -> verifySenderDestroyed("ln", true));
vm6.invoke(() -> verifySenderDestroyed("ln", true));
vm7.invoke(() -> verifySenderDestroyed("ln", true));
}
@Test
public void testParallelGatewaySenderMessageTooLargeException() {
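// Cap the maximum message size in the sender VM to 1 MB so that a batch of one hundred
// 11 KB values overflows it and has to be resized.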
vm4.invoke(() -> System.setProperty(MAX_MESSAGE_SIZE_PROPERTY, String.valueOf(1024 * 1024)));
Integer[] locatorPorts = createLNAndNYLocators();
Integer lnPort = locatorPorts[0];
Integer nyPort = locatorPorts[1];
// Create and start sender with reduced maximum message size and 1 dispatcher thread
String regionName = getTestMethodName() + "_PR";
vm4.invoke(() -> createCache(lnPort));
vm4.invoke(() -> setNumDispatcherThreadsForTheRun(1));
vm4.invoke(() -> createSender("ln", 2, true, 100, 100, false, false, null, false));
vm4.invoke(() -> createPartitionedRegion(regionName, "ln", 0, 100, isOffHeap()));
// Do puts
int numPuts = 200;
vm4.invoke(() -> doPuts(regionName, numPuts, new byte[11000]));
validateRegionSizes(regionName, numPuts, vm4);
// Start receiver
addIgnoredException(MessageTooLargeException.class.getName(), vm4);
addIgnoredException(GemFireIOException.class.getName(), vm4);
vm2.invoke(() -> createCache(nyPort));
vm2.invoke(() -> createPartitionedRegion(regionName, null, 0, 100, isOffHeap()));
vm2.invoke(() -> createReceiver());
validateRegionSizes(regionName, numPuts, vm2);
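// The oversized batches should have been split rather than dropped, which is recorded in
// the sender's batchesResized statistic.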
vm4.invoke(() -> {
AbstractGatewaySender sender = (AbstractGatewaySender) cache.getGatewaySender("ln");
assertThat(sender.getStatistics().getBatchesResized()).isGreaterThan(0);
});
}
@Test
public void testParallelGatewaySenderConcurrentPutClearNoOffheapOrphans()
throws Exception {
MemberVM locator = clusterStartupRule.startLocatorVM(1, new Properties());
Properties properties = new Properties();
properties.put(OFF_HEAP_MEMORY_SIZE_NAME, "100");
MemberVM server = clusterStartupRule.startServerVM(2, properties, locator.getPort());
final String regionName = "portfolios";
final String gatewaySenderId = "ln";
server.invoke(() -> {
addIgnoredException("could not get remote locator");
InternalCache cache = ClusterStartupRule.getCache();
GatewaySender sender =
cache.createGatewaySenderFactory().setParallel(true).create(gatewaySenderId, 1);
Region userRegion = cache.createRegionFactory(RegionShortcut.PARTITION).setOffHeap(true)
.addGatewaySenderId("ln").create(regionName);
PartitionedRegion shadowRegion = (PartitionedRegion) ((AbstractGatewaySender) sender)
.getEventProcessor().getQueue().getRegion();
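// Two latches coordinate the race: a cache writer hook on the shadow region blocks the
// queue put while the buckets are cleared, then lets the put finish.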
CacheWriter mockCacheWriter = mock(CacheWriter.class);
CountDownLatch cacheWriterLatch = new CountDownLatch(1);
CountDownLatch shadowRegionClearLatch = new CountDownLatch(1);
doAnswer(invocation -> {
// The cache writer is invoked between when the region entry is created with value of
// Token.REMOVED_PHASE_1 and when it is replaced with the actual GatewaySenderEvent.
// We use this hook to trigger the clear logic when the token is still in the
// region entry.
cacheWriterLatch.countDown();
// Wait until the clear is complete before putting the actual GatewaySenderEvent in the
// region entry.
shadowRegionClearLatch.await();
return null;
}).when(mockCacheWriter).beforeCreate(any());
shadowRegion.setCacheWriter(mockCacheWriter);
ExecutorService service = Executors.newFixedThreadPool(2);
List<Callable<Object>> callables = new ArrayList<>();
// In one thread, we are going to put some test data in the user region,
// which will eventually put the GatewaySenderEvent into the shadow region
callables.add(Executors.callable(() -> {
try {
userRegion.put("testKey", "testValue");
} catch (Exception ex) {
throw new RuntimeException(ex);
}
}));
// In another thread, we clear the shadow region's buckets. If the region entry is a
// Token.REMOVED_PHASE_1 at this time, a leak is possible. We can guarantee that the
// Token.REMOVED_PHASE_1 is present by using the mocked cache writer defined above.
callables.add(Executors.callable(() -> {
try {
OffHeapRegionEntryHelper.doWithOffHeapClear(() -> {
// Wait for the cache writer to be invoked to release this countdown latch.
// This guarantees that the region entry will contain a Token.REMOVED_PHASE_1.
try {
cacheWriterLatch.await();
} catch (InterruptedException e) {
throw new TestException(
"Thread was interrupted while waiting for mocked cache writer to be invoked");
}
clearShadowBucketRegions(shadowRegion);
// Signal to the cache writer that the clear is complete and the put can continue.
shadowRegionClearLatch.countDown();
});
} catch (Exception ex) {
throw new RuntimeException(ex);
}
}));
List<Future<Object>> futures = service.invokeAll(callables, 10, TimeUnit.SECONDS);
for (Future<Object> future : futures) {
try {
future.get();
} catch (Exception ex) {
throw new TestException(
"Exception thrown while executing put and clear concurrently",
ex);
}
}
userRegion.close();
await("Waiting for off-heap to be freed").until(
() -> 0 == ((MemoryAllocatorImpl) cache.getOffHeapStore()).getOrphans(cache).size());
});
}
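/**
* Clears the entries of every bucket region hosted by the shadow (queue) region's data store.
*/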
private void clearShadowBucketRegions(PartitionedRegion shadowRegion) {
PartitionedRegionDataStore.BucketVisitor bucketVisitor =
new PartitionedRegionDataStore.BucketVisitor() {
@Override
public void visit(Integer bucketId, Region r) {
((BucketRegion) r).clearEntries(null);
}
};
shadowRegion.getDataStore().visitBuckets(bucketVisitor);
}
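/**
* Creates caches, senders and receivers on the local (ln) and remote (ny) sites, creates the
* test partitioned region on all members (vm6 and vm7 as accessors when requested), and
* optionally starts the senders.
*/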
private void createSendersReceiversAndPartitionedRegion(Integer lnPort, Integer nyPort,
boolean createAccessors, boolean startSenders) {
createSendersAndReceivers(lnPort, nyPort);
createPartitionedRegions(createAccessors);
if (startSenders) {
startSenderInVMs("ln", vm4, vm5, vm6, vm7);
}
}
private void createSendersAndReceivers(Integer lnPort, Integer nyPort) {
createCacheInVMs(nyPort, vm2, vm3);
createReceiverInVMs(vm2, vm3);
createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
vm4.invoke(() -> createSender("ln", 2, true, 100, 10, false, false, null, true));
vm5.invoke(() -> createSender("ln", 2, true, 100, 10, false, false, null, true));
vm6.invoke(() -> createSender("ln", 2, true, 100, 10, false, false, null, true));
vm7.invoke(() -> createSender("ln", 2, true, 100, 10, false, false, null, true));
}
private void createPartitionedRegions(boolean createAccessors) {
String regionName = getTestMethodName() + "_PR";
vm4.invoke(() -> createPartitionedRegion(regionName, "ln", 1, 100, isOffHeap()));
vm5.invoke(() -> createPartitionedRegion(regionName, "ln", 1, 100, isOffHeap()));
if (createAccessors) {
vm6.invoke(() -> createPartitionedRegionAsAccessor(regionName, "ln", 1, 100));
vm7.invoke(() -> createPartitionedRegionAsAccessor(regionName, "ln", 1, 100));
} else {
vm6.invoke(() -> createPartitionedRegion(regionName, "ln", 1, 100, isOffHeap()));
vm7.invoke(() -> createPartitionedRegion(regionName, "ln", 1, 100, isOffHeap()));
}
vm2.invoke(() -> createPartitionedRegion(regionName, "ln", 1, 100, isOffHeap()));
vm3.invoke(() -> createPartitionedRegion(regionName, "ln", 1, 100, isOffHeap()));
}
private void stopSenders() {
vm4.invoke(() -> stopSender("ln"));
vm5.invoke(() -> stopSender("ln"));
vm6.invoke(() -> stopSender("ln"));
vm7.invoke(() -> stopSender("ln"));
}
private void waitForSendersRunning() {
vm4.invoke(() -> waitForSenderRunningState("ln"));
vm5.invoke(() -> waitForSenderRunningState("ln"));
vm6.invoke(() -> waitForSenderRunningState("ln"));
vm7.invoke(() -> waitForSenderRunningState("ln"));
}
private void validateParallelSenderQueueAllBucketsDrained() {
vm4.invoke(() -> validateParallelSenderQueueAllBucketsDrained("ln"));
vm5.invoke(() -> validateParallelSenderQueueAllBucketsDrained("ln"));
vm6.invoke(() -> validateParallelSenderQueueAllBucketsDrained("ln"));
vm7.invoke(() -> validateParallelSenderQueueAllBucketsDrained("ln"));
}
}