| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more contributor license |
| * agreements. See the NOTICE file distributed with this work for additional information regarding |
| * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance with the License. You may obtain a |
| * copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software distributed under the License |
| * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
| * or implied. See the License for the specific language governing permissions and limitations under |
| * the License. |
| */ |
| package org.apache.geode.internal.cache.wan.parallel; |
| |
| import static org.apache.geode.distributed.internal.DistributionConfig.OFF_HEAP_MEMORY_SIZE_NAME; |
| import static org.apache.geode.internal.AvailablePortHelper.getRandomAvailableTCPPorts; |
| import static org.apache.geode.internal.cache.tier.sockets.Message.MAX_MESSAGE_SIZE_PROPERTY; |
| import static org.apache.geode.internal.util.ArrayUtils.asList; |
| import static org.apache.geode.test.awaitility.GeodeAwaitility.await; |
| import static org.apache.geode.test.dunit.IgnoredException.addIgnoredException; |
| import static org.assertj.core.api.Assertions.assertThat; |
| import static org.assertj.core.api.Assertions.catchThrowable; |
| import static org.mockito.ArgumentMatchers.any; |
| import static org.mockito.Mockito.doAnswer; |
| import static org.mockito.Mockito.mock; |
| |
| import java.io.Serializable; |
| import java.util.ArrayList; |
| import java.util.Collections; |
| import java.util.HashMap; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Properties; |
| import java.util.Set; |
| import java.util.concurrent.Callable; |
| import java.util.concurrent.CountDownLatch; |
| import java.util.concurrent.ExecutorService; |
| import java.util.concurrent.Executors; |
| import java.util.concurrent.Future; |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.atomic.AtomicInteger; |
| import java.util.stream.Collectors; |
| import java.util.stream.IntStream; |
| import java.util.stream.Stream; |
| |
| import org.junit.After; |
| import org.junit.Before; |
| import org.junit.Rule; |
| import org.junit.Test; |
| import org.junit.experimental.categories.Category; |
| import util.TestException; |
| |
| import org.apache.geode.GemFireIOException; |
| import org.apache.geode.cache.CacheWriter; |
| import org.apache.geode.cache.Region; |
| import org.apache.geode.cache.RegionShortcut; |
| import org.apache.geode.cache.wan.GatewayEventFilter; |
| import org.apache.geode.cache.wan.GatewaySender; |
| import org.apache.geode.distributed.internal.ClusterDistributionManager; |
| import org.apache.geode.distributed.internal.ClusterOperationExecutors; |
| import org.apache.geode.distributed.internal.DistributionMessage; |
| import org.apache.geode.distributed.internal.DistributionMessageObserver; |
| import org.apache.geode.internal.cache.BucketRegion; |
| import org.apache.geode.internal.cache.InternalCache; |
| import org.apache.geode.internal.cache.PartitionedRegion; |
| import org.apache.geode.internal.cache.PartitionedRegionDataStore; |
| import org.apache.geode.internal.cache.tier.sockets.MessageTooLargeException; |
| import org.apache.geode.internal.cache.wan.AbstractGatewaySender; |
| import org.apache.geode.internal.cache.wan.GatewaySenderException; |
| import org.apache.geode.internal.cache.wan.WANTestBase; |
| import org.apache.geode.internal.offheap.MemoryAllocatorImpl; |
| import org.apache.geode.internal.offheap.OffHeapClearRequired; |
| import org.apache.geode.test.awaitility.GeodeAwaitility; |
| import org.apache.geode.test.dunit.AsyncInvocation; |
| import org.apache.geode.test.dunit.RMIException; |
| import org.apache.geode.test.dunit.VM; |
| import org.apache.geode.test.dunit.rules.ClusterStartupRule; |
| import org.apache.geode.test.dunit.rules.DistributedRestoreSystemProperties; |
| import org.apache.geode.test.dunit.rules.MemberVM; |
| import org.apache.geode.test.junit.categories.WanTest; |
| import org.apache.geode.test.junit.rules.serializable.SerializableTestName; |
| |
| /** |
| * DUnit test for operations on ParallelGatewaySender |
| */ |
| @Category(WanTest.class) |
| public class ParallelGatewaySenderOperationsDistributedTest extends WANTestBase { |
| |
| @Rule |
| public SerializableTestName testName = new SerializableTestName(); |
| |
| @Rule |
| public ClusterStartupRule clusterStartupRule = new ClusterStartupRule(); |
| |
| @Rule |
| public DistributedRestoreSystemProperties restoreSystemProperties = |
| new DistributedRestoreSystemProperties(); |
| |
| @Before |
| public void setUp() throws Exception { |
| addIgnoredException("Broken pipe||Unexpected IOException"); |
| addIgnoredException("Connection reset"); |
| addIgnoredException("Connection refused"); |
| addIgnoredException("could not get remote locator information"); |
| } |
| |
| @After |
| public void tearDown() { |
| for (VM vm : asList(vm4, vm5, vm6, vm7)) { |
| vm.invoke(() -> DistributionMessageObserver.setInstance(null)); |
| } |
| } |
| |
| @Test(timeout = 300_000) |
| public void testStopOneConcurrentGatewaySenderWithSSL() { |
| Integer lnPort = vm0.invoke(() -> createFirstLocatorWithDSId(1)); |
| Integer nyPort = vm1.invoke(() -> createFirstRemoteLocator(2, lnPort)); |
| |
| createCacheInVMs(nyPort, vm2, vm3); |
| vm2.invoke(() -> createReceiverWithSSL(nyPort)); |
| vm3.invoke(() -> createReceiverWithSSL(nyPort)); |
| |
| vm4.invoke(() -> createCacheWithSSL(lnPort)); |
| vm5.invoke(() -> createCacheWithSSL(lnPort)); |
| |
| vm4.invoke(() -> createConcurrentSender("ln", 2, true, 100, 10, false, true, null, true, 5, |
| GatewaySender.OrderPolicy.KEY)); |
| vm5.invoke(() -> createConcurrentSender("ln", 2, true, 100, 10, false, true, null, true, 5, |
| GatewaySender.OrderPolicy.KEY)); |
| |
| String regionName = getUniqueName() + "_PR"; |
| vm4.invoke(() -> createPartitionedRegion(regionName, "ln", 1, 100, isOffHeap())); |
| vm5.invoke(() -> createPartitionedRegion(regionName, "ln", 1, 100, isOffHeap())); |
| |
| vm2.invoke(() -> createPartitionedRegion(regionName, "ln", 1, 100, isOffHeap())); |
| vm3.invoke(() -> createPartitionedRegion(regionName, "ln", 1, 100, isOffHeap())); |
| |
| startSenderInVMs("ln", vm4, vm5); |
| |
| vm4.invoke(() -> doPuts(regionName, 10)); |
| |
| vm4.invoke(() -> stopSender("ln")); |
| vm4.invoke(() -> startSender("ln")); |
| |
| vm4.invoke(() -> doPuts(regionName, 10)); |
| |
| vm5.invoke(() -> stopSender("ln")); |
| vm5.invoke(() -> startSender("ln")); |
| } |
| |
| @Test |
| public void testParallelGatewaySenderWithoutStarting() { |
| Integer[] locatorPorts = createLNAndNYLocators(); |
| Integer lnPort = locatorPorts[0]; |
| Integer nyPort = locatorPorts[1]; |
| |
| createSendersReceiversAndPartitionedRegion(lnPort, nyPort, false, false); |
| |
| vm4.invoke(() -> doPuts(getUniqueName() + "_PR", 1000)); |
| |
| vm4.invoke(() -> verifySenderStoppedState("ln")); |
| vm5.invoke(() -> verifySenderStoppedState("ln")); |
| vm6.invoke(() -> verifySenderStoppedState("ln")); |
| vm7.invoke(() -> verifySenderStoppedState("ln")); |
| |
| validateRegionSizes(getUniqueName() + "_PR", 0, vm2, vm3); |
| } |
| |
| /** |
| * ParallelGatewaySender should not be started on Accessor node |
| * |
| * <p> |
| * TRAC #44323: NewWan: ParallelGatewaySender should not be started on Accessor Node |
| */ |
| @Test |
| public void testParallelGatewaySenderStartOnAccessorNode() { |
| Integer[] locatorPorts = createLNAndNYLocators(); |
| Integer lnPort = locatorPorts[0]; |
| Integer nyPort = locatorPorts[1]; |
| |
| createSendersReceiversAndPartitionedRegion(lnPort, nyPort, true, true); |
| |
| vm6.invoke(() -> waitForSenderRunningState("ln")); |
| vm7.invoke(() -> waitForSenderRunningState("ln")); |
| |
| vm4.invoke(() -> doPuts(getUniqueName() + "_PR", 10)); |
| |
| vm4.invoke(() -> validateParallelSenderQueueAllBucketsDrained("ln")); |
| vm5.invoke(() -> validateParallelSenderQueueAllBucketsDrained("ln")); |
| |
| validateRegionSizes(getUniqueName() + "_PR", 10, vm2, vm3); |
| } |
| |
| /** |
| * Normal scenario in which the sender is paused in between. |
| */ |
| @Test |
| public void testParallelPropagationSenderPause() { |
| Integer[] locatorPorts = createLNAndNYLocators(); |
| Integer lnPort = locatorPorts[0]; |
| Integer nyPort = locatorPorts[1]; |
| |
| createSendersReceiversAndPartitionedRegion(lnPort, nyPort, false, true); |
| |
| // make sure all the senders are running before doing any puts |
| waitForSendersRunning(); |
| |
| // FIRST RUN: now, the senders are started. So, start the puts |
| vm4.invoke(() -> doPuts(getUniqueName() + "_PR", 100)); |
| |
| // now, pause all of the senders |
| vm4.invoke(() -> pauseSender("ln")); |
| vm5.invoke(() -> pauseSender("ln")); |
| vm6.invoke(() -> pauseSender("ln")); |
| vm7.invoke(() -> pauseSender("ln")); |
| |
| // SECOND RUN: keep one thread doing puts to the region |
| vm4.invokeAsync(() -> doPuts(getUniqueName() + "_PR", 1000)); |
| |
| // verify region size remains on remote vm and is restricted below a specified limit (i.e. |
| // number of puts in the first run) |
| vm2.invoke(() -> validateRegionSizeRemainsSame(getUniqueName() + "_PR", 100)); |
| } |
| |
| /** |
| * Normal scenario in which a paused sender is resumed. |
| */ |
| @Test |
| public void testParallelPropagationSenderResume() throws Exception { |
| Integer[] locatorPorts = createLNAndNYLocators(); |
| Integer lnPort = locatorPorts[0]; |
| Integer nyPort = locatorPorts[1]; |
| |
| createSendersReceiversAndPartitionedRegion(lnPort, nyPort, false, true); |
| |
| // make sure all the senders are running before doing any puts |
| waitForSendersRunning(); |
| |
| int numPuts = 1000; |
| // now, the senders are started. So, start the puts |
| AsyncInvocation<Void> async = vm4.invokeAsync(() -> doPuts(getUniqueName() + "_PR", numPuts)); |
| |
| // now, pause all of the senders |
| vm4.invoke(() -> pauseSender("ln")); |
| vm5.invoke(() -> pauseSender("ln")); |
| vm6.invoke(() -> pauseSender("ln")); |
| vm7.invoke(() -> pauseSender("ln")); |
| |
| // resume the senders |
| vm4.invoke(() -> resumeSender("ln")); |
| vm5.invoke(() -> resumeSender("ln")); |
| vm6.invoke(() -> resumeSender("ln")); |
| vm7.invoke(() -> resumeSender("ln")); |
| |
| async.await(2, TimeUnit.MINUTES); |
| validateParallelSenderQueueAllBucketsDrained(); |
| |
| // find the region size on remote vm |
| vm2.invoke(() -> validateRegionSize(getUniqueName() + "_PR", numPuts)); |
| } |
| |
| /** |
| * Negative scenario in which a sender that is stopped (and not paused) is resumed. Expected: |
| * resume is only valid for pause. If a sender which is stopped is resumed, it will not be started |
| * again. |
| */ |
| @Test |
| public void testParallelPropagationSenderResumeNegativeScenario() { |
| Integer[] locatorPorts = createLNAndNYLocators(); |
| Integer lnPort = locatorPorts[0]; |
| Integer nyPort = locatorPorts[1]; |
| |
| createCacheInVMs(nyPort, vm2, vm3); |
| createReceiverInVMs(vm2, vm3); |
| |
| createCacheInVMs(lnPort, vm4, vm5); |
| |
| vm4.invoke(() -> createSender("ln", 2, true, 100, 10, false, false, null, true)); |
| vm5.invoke(() -> createSender("ln", 2, true, 100, 10, false, false, null, true)); |
| |
| vm4.invoke( |
| () -> createPartitionedRegion(getUniqueName() + "_PR", "ln", 1, 100, isOffHeap())); |
| vm5.invoke( |
| () -> createPartitionedRegion(getUniqueName() + "_PR", "ln", 1, 100, isOffHeap())); |
| |
| vm2.invoke( |
| () -> createPartitionedRegion(getUniqueName() + "_PR", null, 1, 100, isOffHeap())); |
| vm3.invoke( |
| () -> createPartitionedRegion(getUniqueName() + "_PR", null, 1, 100, isOffHeap())); |
| |
| startSenderInVMs("ln", vm4, vm5); |
| |
| // wait till the senders are running |
| vm4.invoke(() -> waitForSenderRunningState("ln")); |
| vm5.invoke(() -> waitForSenderRunningState("ln")); |
| |
| // start the puts |
| vm4.invoke(() -> doPuts(getUniqueName() + "_PR", 100)); |
| |
| // let the queue drain completely |
| vm4.invoke(() -> validateQueueContents("ln", 0)); |
| |
| // stop the senders |
| vm4.invoke(() -> stopSender("ln")); |
| vm5.invoke(() -> stopSender("ln")); |
| |
| // now, try to resume a stopped sender |
| vm4.invoke(() -> resumeSender("ln")); |
| vm5.invoke(() -> resumeSender("ln")); |
| |
| // do more puts |
| vm4.invoke(() -> doPuts(getUniqueName() + "_PR", 1000)); |
| |
| // validate region size on remote vm to contain only the events put in local site |
| // before the senders are stopped. |
| vm2.invoke(() -> validateRegionSize(getUniqueName() + "_PR", 100)); |
| } |
| |
| /** |
| * Normal scenario in which a sender is stopped. |
| */ |
| @Test |
| public void testParallelPropagationSenderStop() { |
| addIgnoredException("Broken pipe"); |
| Integer[] locatorPorts = createLNAndNYLocators(); |
| Integer lnPort = locatorPorts[0]; |
| Integer nyPort = locatorPorts[1]; |
| |
| createSendersReceiversAndPartitionedRegion(lnPort, nyPort, false, true); |
| |
| // make sure all the senders are running before doing any puts |
| waitForSendersRunning(); |
| |
| // FIRST RUN: now, the senders are started. So, do some of the puts |
| vm4.invoke(() -> doPuts(getUniqueName() + "_PR", 100)); |
| |
| // now, stop all of the senders |
| stopSenders(); |
| |
| // SECOND RUN: keep one thread doing puts |
| vm4.invokeAsync(() -> doPuts(getUniqueName() + "_PR", 1000)); |
| |
| // verify region size remains on remote vm and is restricted below a specified limit (number of |
| // puts in the first run) |
| vm2.invoke(() -> validateRegionSizeRemainsSame(getUniqueName() + "_PR", 100)); |
| } |
| |
| /** |
| * Verifies that no distributed deadlock occurs when stopping a gateway sender while receiving |
| * traffic. |
| * The distributed deadlock may occur when the gateway sender tries to get the |
| * size of the gateway sender queue (sending a size message to other members) while holding the |
| * lifeCycleLock lock. This lock is also taken when an event is to be distributed by the gateway |
| * sender. |
| * As this issue has only been observed in the field with a lot of traffic, in order to reproduce |
| * it in a test case, conserve-sockets is set to true (although the deadlock has also |
| * been seen with conserve-sockets=false), the size of the PartitionedRegion thread pool is set |
| * to a small value and an artificial timeout is added at a point in the distribute() call |
| * of the AbstractGatewaySeder class. |
| */ |
| @Test |
| public void testNoDistributedDeadlockWithGatewaySenderStop() throws Exception { |
| addIgnoredException("Broken pipe"); |
| Integer[] locatorPorts = createLNAndNYLocators(); |
| Integer lnPort = locatorPorts[0]; |
| Integer nyPort = locatorPorts[1]; |
| VM[] senders = {vm4, vm5, vm6, vm7}; |
| try { |
| for (VM sender : senders) { |
| sender.invoke(() -> AbstractGatewaySender.doSleepForTestingInDistribute.set(true)); |
| sender.invoke(() -> ClusterOperationExecutors.maxPrThreadsForTest.set(2)); |
| } |
| vm2.invoke(() -> ClusterOperationExecutors.maxPrThreadsForTest.set(2)); |
| vm3.invoke(() -> ClusterOperationExecutors.maxPrThreadsForTest.set(2)); |
| |
| createSendersReceiversAndPartitionedRegion(lnPort, nyPort, false, true, true); |
| |
| // make sure all the senders are running before doing any puts |
| waitForSendersRunning(); |
| |
| // Send a fairly big amount of operations to provoke the deadlock |
| int invocationsPerServer = 4; |
| AsyncInvocation[] invocations = new AsyncInvocation[senders.length * invocationsPerServer]; |
| for (int i = 0; i < senders.length; i++) { |
| for (int j = 0; j < invocationsPerServer; j++) { |
| invocations[i + (j * invocationsPerServer)] = |
| senders[i].invokeAsync(() -> doPuts(getUniqueName() + "_PR", 100)); |
| } |
| } |
| |
| // Wait for some elements to be replicated before stopping the senders |
| for (int i = 0; i < senders.length; i++) { |
| senders[i].invoke(() -> await() |
| .untilAsserted(() -> assertThat(getSenderStats("ln", -1).get(3)).isGreaterThan(1))); |
| } |
| |
| stopSendersAsync(); |
| for (int i = 0; i < invocations.length; i++) { |
| invocations[i].await(); |
| } |
| } finally { |
| for (int i = 0; i < senders.length; i++) { |
| senders[i].invoke(() -> AbstractGatewaySender.doSleepForTestingInDistribute.set(false)); |
| } |
| } |
| } |
| |
| /** |
| * Normal scenario in which a sender is stopped and then started again. |
| */ |
| @Test |
| public void testParallelPropagationSenderStartAfterStop() { |
| addIgnoredException("Broken pipe"); |
| Integer[] locatorPorts = createLNAndNYLocators(); |
| Integer lnPort = locatorPorts[0]; |
| Integer nyPort = locatorPorts[1]; |
| String regionName = getUniqueName() + "_PR"; |
| |
| createCacheInVMs(nyPort, vm2, vm3); |
| createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); |
| |
| vm2.invoke(() -> createPartitionedRegion(regionName, "ln", 1, 100, isOffHeap())); |
| vm3.invoke(() -> createPartitionedRegion(regionName, "ln", 1, 100, isOffHeap())); |
| |
| createReceiverInVMs(vm2, vm3); |
| |
| vm4.invoke(() -> createPartitionedRegion(regionName, "ln", 1, 100, isOffHeap())); |
| vm5.invoke(() -> createPartitionedRegion(regionName, "ln", 1, 100, isOffHeap())); |
| vm6.invoke(() -> createPartitionedRegion(regionName, "ln", 1, 100, isOffHeap())); |
| vm7.invoke(() -> createPartitionedRegion(regionName, "ln", 1, 100, isOffHeap())); |
| |
| vm4.invoke(() -> createSender("ln", 2, true, 100, 10, false, false, null, true)); |
| vm5.invoke(() -> createSender("ln", 2, true, 100, 10, false, false, null, true)); |
| vm6.invoke(() -> createSender("ln", 2, true, 100, 10, false, false, null, true)); |
| vm7.invoke(() -> createSender("ln", 2, true, 100, 10, false, false, null, true)); |
| |
| startSenderInVMs("ln", vm4, vm5, vm6, vm7); |
| |
| // make sure all the senders are running before doing any puts |
| vm4.invoke(() -> waitForSenderRunningState("ln")); |
| vm5.invoke(() -> waitForSenderRunningState("ln")); |
| vm6.invoke(() -> waitForSenderRunningState("ln")); |
| vm7.invoke(() -> waitForSenderRunningState("ln")); |
| |
| // FIRST RUN: now, the senders are started. So, do some of the puts |
| vm4.invoke(() -> doPuts(regionName, 200)); |
| |
| // now, stop all of the senders |
| vm4.invoke(() -> stopSender("ln")); |
| vm5.invoke(() -> stopSender("ln")); |
| vm6.invoke(() -> stopSender("ln")); |
| vm7.invoke(() -> stopSender("ln")); |
| |
| // Region size on remote site should remain same and below the number of puts done in the FIRST |
| // RUN |
| vm2.invoke(() -> validateRegionSizeRemainsSame(regionName, 200)); |
| |
| // SECOND RUN: do some of the puts after the senders are stopped |
| vm4.invoke(() -> doPuts(regionName, 1000)); |
| |
| // Region size on remote site should remain same and below the number of puts done in the FIRST |
| // RUN |
| vm2.invoke(() -> validateRegionSizeRemainsSame(regionName, 200)); |
| |
| // start the senders again |
| startSenderInVMs("ln", vm4, vm5, vm6, vm7); |
| |
| vm4.invoke(() -> waitForSenderRunningState("ln")); |
| vm5.invoke(() -> waitForSenderRunningState("ln")); |
| vm6.invoke(() -> waitForSenderRunningState("ln")); |
| vm7.invoke(() -> waitForSenderRunningState("ln")); |
| |
| // Region size on remote site should remain same and below the number of puts done in the FIRST |
| // RUN |
| vm2.invoke(() -> validateRegionSizeRemainsSame(regionName, 200)); |
| |
| // SECOND RUN: do some more puts |
| vm4.invoke(() -> doPuts(regionName, 1000)); |
| |
| // verify all the buckets on all the sender nodes are drained |
| validateParallelSenderQueueAllBucketsDrained(); |
| |
| // verify the events propagate to remote site |
| vm2.invoke(() -> validateRegionSize(regionName, 1000)); |
| |
| vm4.invoke(() -> validateQueueSizeStat("ln", 0)); |
| vm5.invoke(() -> validateQueueSizeStat("ln", 0)); |
| vm6.invoke(() -> validateQueueSizeStat("ln", 0)); |
| vm7.invoke(() -> validateQueueSizeStat("ln", 0)); |
| } |
| |
| /** |
| * Normal scenario in which a sender is stopped and then started again. Differs from above test |
| * case in the way that when the sender is starting from stopped state, puts are simultaneously |
| * happening on the region by another thread. |
| */ |
| @Test |
| public void testParallelPropagationSenderStartAfterStop_Scenario2() throws Exception { |
| addIgnoredException("Broken pipe"); |
| Integer[] locatorPorts = createLNAndNYLocators(); |
| Integer lnPort = locatorPorts[0]; |
| Integer nyPort = locatorPorts[1]; |
| |
| createSendersReceiversAndPartitionedRegion(lnPort, nyPort, false, true); |
| |
| // make sure all the senders are running before doing any puts |
| waitForSendersRunning(); |
| |
| // FIRST RUN: now, the senders are started. So, do some of the puts |
| vm4.invoke(() -> doPuts(getUniqueName() + "_PR", 200)); |
| |
| // Make sure the puts make it to the remote side |
| vm2.invoke(() -> validateRegionSize(getUniqueName() + "_PR", 200)); |
| vm3.invoke(() -> validateRegionSize(getUniqueName() + "_PR", 200)); |
| |
| // now, stop all of the senders |
| stopSenders(); |
| |
| vm2.invoke(() -> validateRegionSize(getUniqueName() + "_PR", 200)); |
| vm3.invoke(() -> validateRegionSize(getUniqueName() + "_PR", 200)); |
| |
| // SECOND RUN: do some of the puts after the senders are stopped |
| vm4.invoke(() -> doPuts(getUniqueName() + "_PR", 1000)); |
| |
| // Region size on remote site should remain same and below the number of puts done in the FIRST |
| // RUN |
| vm2.invoke(() -> validateRegionSizeRemainsSame(getUniqueName() + "_PR", 200)); |
| |
| // SECOND RUN: start async puts on region |
| boolean foundEventsDroppedDueToPrimarySenderNotRunning = false; |
| int count = 0; |
| |
| do { |
| stopSenders(); |
| AsyncInvocation<Void> async = vm4.invokeAsync(() -> doPuts(getUniqueName() + "_PR", 5000)); |
| |
| // when puts are happening by another thread, start the senders |
| startSenderInVMsAsync("ln", vm4, vm5, vm6, vm7); |
| |
| async.join(); |
| List<Integer> vm4List = vm4.invoke(() -> WANTestBase.getSenderStatsForDroppedEvents("ln")); |
| List<Integer> vm5List = vm5.invoke(() -> WANTestBase.getSenderStatsForDroppedEvents("ln")); |
| List<Integer> vm6List = vm6.invoke(() -> WANTestBase.getSenderStatsForDroppedEvents("ln")); |
| List<Integer> vm7List = vm7.invoke(() -> WANTestBase.getSenderStatsForDroppedEvents("ln")); |
| if (vm4List.get(0) + vm5List.get(0) + vm6List.get(0) + vm7List.get(0) > 0) { |
| foundEventsDroppedDueToPrimarySenderNotRunning = true; |
| } |
| count++; |
| } while (!foundEventsDroppedDueToPrimarySenderNotRunning && count < 5); |
| |
| // verify all the buckets on all the sender nodes are drained |
| validateParallelSenderQueueAllBucketsDrained(); |
| |
| // verify that the queue size ultimately becomes zero. That means all the events propagate to |
| // remote site. |
| |
| vm4.invoke(() -> validateQueueContents("ln", 0)); |
| } |
| |
| /** |
| * Normal scenario in which a sender is stopped and then started again on accessor node. |
| */ |
| @Test |
| public void testParallelPropagationSenderStartAfterStopOnAccessorNode() { |
| addIgnoredException("Broken pipe"); |
| addIgnoredException("Connection reset"); |
| addIgnoredException("Unexpected IOException"); |
| Integer[] locatorPorts = createLNAndNYLocators(); |
| Integer lnPort = locatorPorts[0]; |
| Integer nyPort = locatorPorts[1]; |
| |
| createSendersReceiversAndPartitionedRegion(lnPort, nyPort, true, true); |
| |
| // make sure all the senders are not running on accessor nodes and running on non-accessor nodes |
| waitForSendersRunning(); |
| |
| // FIRST RUN: now, the senders are started. So, do some of the puts |
| vm4.invoke(() -> doPuts(getUniqueName() + "_PR", 200)); |
| |
| // now, stop all of the senders |
| stopSenders(); |
| |
| // SECOND RUN: do some of the puts after the senders are stopped |
| vm4.invoke(() -> doPuts(getUniqueName() + "_PR", 1000)); |
| |
| // Region size on remote site should remain same and below the number of puts done in the FIRST |
| // RUN |
| vm2.invoke(() -> validateRegionSizeRemainsSame(getUniqueName() + "_PR", 200)); |
| |
| // start the senders again |
| startSenderInVMs("ln", vm4, vm5, vm6, vm7); |
| |
| // Region size on remote site should remain same and below the number of puts done in the FIRST |
| // RUN |
| vm2.invoke(() -> validateRegionSizeRemainsSame(getUniqueName() + "_PR", 200)); |
| |
| // SECOND RUN: do some more puts |
| vm4.invoke(() -> doPuts(getUniqueName() + "_PR", 1000)); |
| |
| // verify all buckets drained only on non-accessor nodes. |
| vm4.invoke(() -> validateParallelSenderQueueAllBucketsDrained("ln")); |
| vm5.invoke(() -> validateParallelSenderQueueAllBucketsDrained("ln")); |
| |
| // verify the events propagate to remote site |
| vm2.invoke(() -> validateRegionSize(getUniqueName() + "_PR", 1000)); |
| } |
| |
| /** |
| * Normal scenario in which a combinations of start, pause, resume operations is tested |
| */ |
| @Test |
| public void testStartPauseResumeParallelGatewaySender() throws Exception { |
| Integer[] locatorPorts = createLNAndNYLocators(); |
| Integer lnPort = locatorPorts[0]; |
| Integer nyPort = locatorPorts[1]; |
| |
| createSendersReceiversAndPartitionedRegion(lnPort, nyPort, false, true); |
| |
| vm4.invoke(() -> doPuts(getUniqueName() + "_PR", 1000)); |
| |
| // Since puts are already done on userPR, it will have the buckets created. |
| // During sender start, it will wait until those buckets are created for shadowPR as well. |
| // Start the senders in async threads, so colocation of shadowPR will be complete and |
| // missing buckets will be created in PRHARedundancyProvider.createMissingBuckets(). |
| startSenderInVMsAsync("ln", vm4, vm5, vm6, vm7); |
| |
| waitForSendersRunning(); |
| |
| vm4.invoke(() -> doPuts(getUniqueName() + "_PR", 5000)); |
| |
| vm4.invoke(() -> pauseSender("ln")); |
| vm5.invoke(() -> pauseSender("ln")); |
| vm6.invoke(() -> pauseSender("ln")); |
| vm7.invoke(() -> pauseSender("ln")); |
| |
| vm4.invoke(() -> verifySenderPausedState("ln")); |
| vm5.invoke(() -> verifySenderPausedState("ln")); |
| vm6.invoke(() -> verifySenderPausedState("ln")); |
| vm7.invoke(() -> verifySenderPausedState("ln")); |
| |
| AsyncInvocation<Void> async = vm4.invokeAsync(() -> doPuts(getUniqueName() + "_PR", 1000)); |
| |
| vm4.invoke(() -> resumeSender("ln")); |
| vm5.invoke(() -> resumeSender("ln")); |
| vm6.invoke(() -> resumeSender("ln")); |
| vm7.invoke(() -> resumeSender("ln")); |
| |
| vm4.invoke(() -> verifySenderResumedState("ln")); |
| vm5.invoke(() -> verifySenderResumedState("ln")); |
| vm6.invoke(() -> verifySenderResumedState("ln")); |
| vm7.invoke(() -> verifySenderResumedState("ln")); |
| |
| async.await(); |
| |
| // verify all buckets drained on all sender nodes. |
| validateParallelSenderQueueAllBucketsDrained(); |
| |
| validateRegionSizes(getUniqueName() + "_PR", 5000, vm2, vm3); |
| } |
| |
| /** |
| * Since the sender is attached to a region and in use, it can not be destroyed. Hence, exception |
| * is thrown by the sender API. |
| */ |
| @Test |
| public void testDestroyParallelGatewaySenderExceptionScenario() { |
| Integer[] locatorPorts = createLNAndNYLocators(); |
| Integer lnPort = locatorPorts[0]; |
| Integer nyPort = locatorPorts[1]; |
| |
| createSendersReceiversAndPartitionedRegion(lnPort, nyPort, false, true); |
| |
| // make sure all the senders are running before doing any puts |
| waitForSendersRunning(); |
| |
| vm4.invoke(() -> doPuts(getUniqueName() + "_PR", 1000)); |
| |
| // try destroying on couple of nodes |
| Throwable caughtException = catchThrowable(() -> vm4.invoke(() -> destroySender("ln"))); |
| |
| assertThat(caughtException).isInstanceOf(RMIException.class); |
| assertThat(caughtException.getCause()).isInstanceOf(GatewaySenderException.class); |
| |
| caughtException = catchThrowable(() -> vm5.invoke(() -> destroySender("ln"))); |
| |
| assertThat(caughtException).isInstanceOf(RMIException.class); |
| assertThat(caughtException.getCause()).isInstanceOf(GatewaySenderException.class); |
| |
| vm2.invoke(() -> validateRegionSize(getUniqueName() + "_PR", 1000)); |
| } |
| |
| @Test |
| public void testDestroyParallelGatewaySender() { |
| Integer[] locatorPorts = createLNAndNYLocators(); |
| Integer lnPort = locatorPorts[0]; |
| Integer nyPort = locatorPorts[1]; |
| |
| createSendersReceiversAndPartitionedRegion(lnPort, nyPort, false, true); |
| |
| // make sure all the senders are running |
| waitForSendersRunning(); |
| |
| vm4.invoke(() -> doPuts(getUniqueName() + "_PR", 1000)); |
| |
| // stop the sender and remove from region before calling destroy on it |
| stopSenders(); |
| |
| vm4.invoke(() -> removeSenderFromTheRegion("ln", getUniqueName() + "_PR")); |
| vm5.invoke(() -> removeSenderFromTheRegion("ln", getUniqueName() + "_PR")); |
| vm6.invoke(() -> removeSenderFromTheRegion("ln", getUniqueName() + "_PR")); |
| vm7.invoke(() -> removeSenderFromTheRegion("ln", getUniqueName() + "_PR")); |
| |
| vm4.invoke(() -> destroySender("ln")); |
| vm5.invoke(() -> destroySender("ln")); |
| vm6.invoke(() -> destroySender("ln")); |
| vm7.invoke(() -> destroySender("ln")); |
| |
| vm4.invoke(() -> verifySenderDestroyed("ln", true)); |
| vm5.invoke(() -> verifySenderDestroyed("ln", true)); |
| vm6.invoke(() -> verifySenderDestroyed("ln", true)); |
| vm7.invoke(() -> verifySenderDestroyed("ln", true)); |
| } |
| |
| @Test |
| public void destroyParallelGatewaySenderShouldNotStopDispatchingFromOtherSendersAttachedToTheRegion() { |
| String site2SenderId = "site2-sender"; |
| String site3SenderId = "site3-sender"; |
| String regionName = testName.getMethodName(); |
| int[] ports = getRandomAvailableTCPPorts(3); |
| int site1Port = ports[0]; |
| int site2Port = ports[1]; |
| int site3Port = ports[2]; |
| Set<String> site1RemoteLocators = |
| Stream.of("localhost[" + site2Port + "]", "localhost[" + site3Port + "]") |
| .collect(Collectors.toSet()); |
| Set<String> site2RemoteLocators = |
| Stream.of("localhost[" + site1Port + "]", "localhost[" + site3Port + "]") |
| .collect(Collectors.toSet()); |
| Set<String> site3RemoteLocators = |
| Stream.of("localhost[" + site1Port + "]", "localhost[" + site2Port + "]") |
| .collect(Collectors.toSet()); |
| |
| // Start 3 sites. |
| vm0.invoke(() -> createLocator(1, site1Port, |
| Collections.singleton("localhost[" + site1Port + "]"), site1RemoteLocators)); |
| vm1.invoke(() -> createLocator(2, site2Port, |
| Collections.singleton("localhost[" + site2Port + "]"), site2RemoteLocators)); |
| vm2.invoke(() -> createLocator(3, site3Port, |
| Collections.singleton("localhost[" + site3Port + "]"), site3RemoteLocators)); |
| |
| // Create the cache on the 3 sites. |
| createCacheInVMs(site1Port, vm3); |
| createCacheInVMs(site2Port, vm4); |
| createCacheInVMs(site3Port, vm5); |
| |
| // Create receiver and region on sites 2 and 3. |
| asList(vm4, vm5).forEach(vm -> vm.invoke(() -> { |
| createReceiver(); |
| createPartitionedRegion(regionName, null, 1, 113, isOffHeap()); |
| })); |
| |
| // Create senders and partitioned region on site 1. |
| vm3.invoke(() -> { |
| createSender(site2SenderId, 2, true, 100, 20, false, false, null, false); |
| createSender(site3SenderId, 3, true, 100, 20, false, false, null, false); |
| waitForSenderRunningState(site2SenderId); |
| waitForSenderRunningState(site3SenderId); |
| |
| createPartitionedRegion(regionName, String.join(",", site2SenderId, site3SenderId), 1, 113, |
| isOffHeap()); |
| }); |
| |
| // #################################################################################### // |
| |
| final int FIRST_BATCH = 100; |
| final int SECOND_BATCH = 200; |
| final Map<String, String> firstBatch = new HashMap<>(); |
| IntStream.range(0, FIRST_BATCH).forEach(i -> firstBatch.put("Key" + i, "Value" + i)); |
| final Map<String, String> secondBatch = new HashMap<>(); |
| IntStream.range(FIRST_BATCH, SECOND_BATCH) |
| .forEach(i -> secondBatch.put("Key" + i, "Value" + i)); |
| |
| // Insert first batch and wait until the queues are empty. |
| vm3.invoke(() -> { |
| cache.getRegion(regionName).putAll(firstBatch); |
| checkQueueSize(site2SenderId, 0); |
| checkQueueSize(site3SenderId, 0); |
| }); |
| |
| // Wait until sites 2 and 3 have received all updates. |
| asList(vm4, vm5).forEach(vm -> vm.invoke(() -> { |
| Region<String, String> region = cache.getRegion(regionName); |
| await().untilAsserted(() -> assertThat(region.size()).isEqualTo(FIRST_BATCH)); |
| firstBatch.forEach((key, value) -> assertThat(region.get(key)).isEqualTo(value)); |
| })); |
| |
| // Stop sender to site3, remove it from the region and destroy it. |
| vm3.invoke(() -> { |
| stopSender(site3SenderId); |
| removeSenderFromTheRegion(site3SenderId, regionName); |
| destroySender(site3SenderId); |
| verifySenderDestroyed(site3SenderId, true); |
| }); |
| |
| // Insert second batch and wait until the queue is empty. |
| vm3.invoke(() -> { |
| cache.getRegion(regionName).putAll(secondBatch); |
| checkQueueSize(site2SenderId, 0); |
| }); |
| |
| // Site 3 should only have the first batch. |
| vm5.invoke(() -> { |
| Region<String, String> region = cache.getRegion(regionName); |
| await().untilAsserted(() -> assertThat(region.size()).isEqualTo(FIRST_BATCH)); |
| firstBatch.forEach((key, value) -> assertThat(region.get(key)).isEqualTo(value)); |
| }); |
| |
| // Site 2 should have both batches. |
| vm4.invoke(() -> { |
| Region<String, String> region = cache.getRegion(regionName); |
| await().untilAsserted(() -> assertThat(region.size()).isEqualTo(SECOND_BATCH)); |
| firstBatch.forEach((key, value) -> assertThat(region.get(key)).isEqualTo(value)); |
| secondBatch.forEach((key, value) -> assertThat(region.get(key)).isEqualTo(value)); |
| }); |
| } |
| |
| @Test |
| public void testParallelGatewaySenderMessageTooLargeException() { |
| vm4.invoke(() -> System.setProperty(MAX_MESSAGE_SIZE_PROPERTY, String.valueOf(1024 * 1024))); |
| |
| Integer[] locatorPorts = createLNAndNYLocators(); |
| Integer lnPort = locatorPorts[0]; |
| Integer nyPort = locatorPorts[1]; |
| |
| // Create and start sender with reduced maximum message size and 1 dispatcher thread |
| String regionName = getUniqueName() + "_PR"; |
| vm4.invoke(() -> createCache(lnPort)); |
| vm4.invoke(() -> setNumDispatcherThreadsForTheRun(1)); |
| vm4.invoke(() -> createSender("ln", 2, true, 100, 100, false, false, null, false)); |
| vm4.invoke(() -> createPartitionedRegion(regionName, "ln", 0, 100, isOffHeap())); |
| |
| // Do puts |
| int numPuts = 200; |
| vm4.invoke(() -> doPuts(regionName, numPuts, new byte[11000])); |
| validateRegionSizes(regionName, numPuts, vm4); |
| |
| // Start receiver |
| addIgnoredException(MessageTooLargeException.class.getName(), vm4); |
| addIgnoredException(GemFireIOException.class.getName(), vm4); |
| |
| vm2.invoke(() -> createCache(nyPort)); |
| vm2.invoke(() -> createPartitionedRegion(regionName, null, 0, 100, isOffHeap())); |
| vm2.invoke(WANTestBase::createReceiver); |
| |
| validateRegionSizes(regionName, numPuts, vm2); |
| |
| vm4.invoke(() -> { |
| AbstractGatewaySender sender = (AbstractGatewaySender) cache.getGatewaySender("ln"); |
| GeodeAwaitility.await() |
| .untilAsserted(() -> assertThat(System.getProperty(MAX_MESSAGE_SIZE_PROPERTY)) |
| .isEqualTo(String.valueOf(1024 * 1024))); |
| GeodeAwaitility.await().untilAsserted( |
| () -> assertThat(sender.getStatistics().getBatchesResized()).isGreaterThan(0)); |
| }); |
| } |
| |
| @Test |
| public void testMultiSiteReplication() { |
| String site1to2SenderId = "site1to2-sender"; |
| String site1to3SenderId = "site1to3-sender"; |
| String site2to3SenderId = "site2to3-sender"; |
| String regionName = testName.getMethodName(); |
| int[] ports = getRandomAvailableTCPPorts(3); |
| int site1Port = ports[0]; |
| int site2Port = ports[1]; |
| int site3Port = ports[2]; |
| Set<String> site1RemoteLocators = |
| Stream.of("localhost[" + site2Port + "]", "localhost[" + site3Port + "]") |
| .collect(Collectors.toSet()); |
| Set<String> site2RemoteLocators = |
| Stream.of("localhost[" + site1Port + "]", "localhost[" + site3Port + "]") |
| .collect(Collectors.toSet()); |
| Set<String> site3RemoteLocators = |
| Stream.of("localhost[" + site1Port + "]", "localhost[" + site2Port + "]") |
| .collect(Collectors.toSet()); |
| |
| // Start 3 sites. |
| vm0.invoke(() -> createLocator(1, site1Port, |
| Collections.singleton("localhost[" + site1Port + "]"), site1RemoteLocators)); |
| vm1.invoke(() -> createLocator(2, site2Port, |
| Collections.singleton("localhost[" + site2Port + "]"), site2RemoteLocators)); |
| vm2.invoke(() -> createLocator(3, site3Port, |
| Collections.singleton("localhost[" + site3Port + "]"), site3RemoteLocators)); |
| |
| // Create the cache on the 3 sites. |
| createCacheInVMs(site1Port, vm3); |
| createCacheInVMs(site2Port, vm4); |
| createCacheInVMs(site3Port, vm5); |
| |
| // Create senders and partitioned region on site 1. |
| vm3.invoke(() -> { |
| createSender(site1to2SenderId, 2, true, 100, 20, false, false, null, false); |
| createSender(site1to3SenderId, 3, true, 100, 20, false, false, null, false); |
| waitForSenderRunningState(site1to2SenderId); |
| waitForSenderRunningState(site1to3SenderId); |
| |
| createPartitionedRegion(regionName, String.join(",", site1to2SenderId, site1to3SenderId), 1, |
| 113, |
| isOffHeap()); |
| }); |
| |
| // Create receiver, sender and partitioned region on site 2. |
| vm4.invoke(() -> { |
| createReceiver(); |
| createSender(site2to3SenderId, 3, true, 100, 20, false, false, null, false); |
| waitForSenderRunningState(site2to3SenderId); |
| |
| createPartitionedRegion(regionName, String.join(",", site2to3SenderId), 1, 113, |
| isOffHeap()); |
| }); |
| |
| // Create receiver and partitioned region on site 3. |
| vm5.invoke(() -> { |
| createReceiver(); |
| createPartitionedRegion(regionName, null, 1, 113, isOffHeap()); |
| }); |
| |
| // Do puts |
| vm3.invoke(() -> doPuts(regionName, 200)); |
| validateRegionSizes(regionName, 200, vm3, vm4, vm5); |
| |
| // Stop sender from site 1 to site 3. |
| vm3.invoke(() -> stopSender(site1to3SenderId)); |
| |
| // Do puts again |
| vm3.invoke(() -> doPuts(regionName, 1000)); |
| validateRegionSizes(regionName, 1000, vm3, vm4, vm5); |
| } |
| |
| @Test |
| public void testParallelGatewaySenderConcurrentPutClearNoOffheapOrphans() { |
| MemberVM locator = clusterStartupRule.startLocatorVM(1, new Properties()); |
| Properties properties = new Properties(); |
| properties.put(OFF_HEAP_MEMORY_SIZE_NAME, "100"); |
| MemberVM server = clusterStartupRule.startServerVM(2, properties, locator.getPort()); |
| final String regionName = "portfolios"; |
| final String gatewaySenderId = "ln"; |
| |
| server.invoke(() -> { |
| addIgnoredException("could not get remote locator"); |
| InternalCache cache = ClusterStartupRule.getCache(); |
| GatewaySender sender = |
| cache.createGatewaySenderFactory().setParallel(true).create(gatewaySenderId, 1); |
| Region<Object, Object> userRegion = |
| cache.createRegionFactory(RegionShortcut.PARTITION).setOffHeap(true) |
| .addGatewaySenderId("ln").create(regionName); |
| PartitionedRegion shadowRegion = (PartitionedRegion) ((AbstractGatewaySender) sender) |
| .getEventProcessor().getQueue().getRegion(); |
| @SuppressWarnings("unchecked") |
| CacheWriter<Object, Object> mockCacheWriter = mock(CacheWriter.class); |
| CountDownLatch cacheWriterLatch = new CountDownLatch(1); |
| CountDownLatch shadowRegionClearLatch = new CountDownLatch(1); |
| |
| doAnswer(invocation -> { |
| // The cache writer is invoked between when the region entry is created with value of |
| // Token.REMOVED_PHASE_1 and when it is replaced with the actual GatewaySenderEvent. |
| // We use this hook to trigger the clear logic when the token is still in the |
| // region entry. |
| cacheWriterLatch.countDown(); |
| // Wait until the clear is complete before putting the actual GatewaySenderEvent in the |
| // region entry. |
| shadowRegionClearLatch.await(); |
| return null; |
| }).when(mockCacheWriter).beforeCreate(any()); |
| |
| shadowRegion.setCacheWriter(mockCacheWriter); |
| |
| ExecutorService service = Executors.newFixedThreadPool(2); |
| |
| List<Callable<Object>> callables = new ArrayList<>(); |
| |
| // In one thread, we are going to put some test data in the user region, |
| // which will eventually put the GatewaySenderEvent into the shadow region |
| callables.add(Executors.callable(() -> { |
| try { |
| userRegion.put("testKey", "testValue"); |
| } catch (Exception ex) { |
| throw new RuntimeException(ex); |
| } |
| })); |
| |
| // In another thread, we are clear the shadow region's buckets. If the region entry is a |
| // Token.REMOVED_PHASE_1 at this time, a leak is possible. We can guarantee that the |
| // Token.REMOVED_PHASE_1 is present by using the mocked cache writer defined |
| // above. |
| callables.add(Executors.callable(() -> { |
| try { |
| OffHeapClearRequired.doWithOffHeapClear(() -> { |
| // Wait for the cache writer to be invoked to release this countdown latch. |
| // This guarantees that the region entry will contain a Token.REMOVED_PHASE_1. |
| try { |
| cacheWriterLatch.await(); |
| } catch (InterruptedException e) { |
| throw new TestException( |
| "Thread was interrupted while waiting for mocked cache writer to be invoked"); |
| } |
| |
| clearShadowBucketRegions(shadowRegion); |
| |
| // Signal to the cache writer that the clear is complete and the put can continue. |
| shadowRegionClearLatch.countDown(); |
| }); |
| } catch (Exception ex) { |
| throw new RuntimeException(ex); |
| } |
| })); |
| |
| List<Future<Object>> futures = service.invokeAll(callables, 10, TimeUnit.SECONDS); |
| |
| for (Future<Object> future : futures) { |
| try { |
| future.get(); |
| } catch (Exception ex) { |
| throw new TestException( |
| "Exception thrown while executing put and clear concurrently", |
| ex); |
| } |
| } |
| |
| userRegion.close(); |
| |
| await("Waiting for off-heap to be freed").until( |
| () -> 0 == ((MemoryAllocatorImpl) cache.getOffHeapStore()).getOrphans(cache).size()); |
| }); |
| } |
| |
| @Test |
| public void testParallelGWSenderUpdateAttrWhileEntriesInQueue() { |
| Integer[] locatorPorts = createLNAndNYLocators(); |
| Integer lnPort = locatorPorts[0]; |
| Integer nyPort = locatorPorts[1]; |
| |
| createCacheInVMs(nyPort, vm2, vm3); |
| |
| createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); |
| |
| vm4.invoke(() -> createSender("ln", 2, true, 100, 10, false, false, null, false)); |
| vm5.invoke(() -> createSender("ln", 2, true, 100, 10, false, false, null, false)); |
| vm6.invoke(() -> createSender("ln", 2, true, 100, 10, false, false, null, false)); |
| vm7.invoke(() -> createSender("ln", 2, true, 100, 10, false, false, null, false)); |
| |
| createPartitionedRegions(false); |
| |
| vm4.invoke(() -> waitForSenderRunningState("ln")); |
| vm5.invoke(() -> waitForSenderRunningState("ln")); |
| vm6.invoke(() -> waitForSenderRunningState("ln")); |
| vm7.invoke(() -> waitForSenderRunningState("ln")); |
| |
| vm4.invoke(() -> doPuts(getUniqueName() + "_PR", 1000)); |
| |
| updateBatchSize(50); |
| updateBatchTimeInterval(100); |
| |
| createReceiverInVMs(vm2, vm3); |
| |
| validateRegionSizes(getUniqueName() + "_PR", 1000, vm2, vm3); |
| |
| checkBatchSize(50); |
| checkBatchTimeInterval(100); |
| } |
| |
| @Test |
| public void testParallelGWSenderUpdateAttrWhilePutting() throws Exception { |
| Integer[] locatorPorts = createLNAndNYLocators(); |
| Integer lnPort = locatorPorts[0]; |
| Integer nyPort = locatorPorts[1]; |
| |
| createCacheInVMs(nyPort, vm2, vm3); |
| createReceiverInVMs(vm2, vm3); |
| |
| createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); |
| |
| vm4.invoke(() -> createSender("ln", 2, true, 100, 10, false, false, null, false)); |
| vm5.invoke(() -> createSender("ln", 2, true, 100, 10, false, false, null, false)); |
| vm6.invoke(() -> createSender("ln", 2, true, 100, 10, false, false, null, false)); |
| vm7.invoke(() -> createSender("ln", 2, true, 100, 10, false, false, null, false)); |
| |
| createPartitionedRegions(false); |
| |
| vm4.invoke(() -> waitForSenderRunningState("ln")); |
| vm5.invoke(() -> waitForSenderRunningState("ln")); |
| vm6.invoke(() -> waitForSenderRunningState("ln")); |
| vm7.invoke(() -> waitForSenderRunningState("ln")); |
| |
| AsyncInvocation<Void> async = vm4.invokeAsync(() -> doPuts(getUniqueName() + "_PR", 5000)); |
| |
| updateBatchSize(100); |
| updateBatchTimeInterval(150); |
| |
| async.await(); |
| validateRegionSizes(getUniqueName() + "_PR", 5000, vm2, vm3); |
| |
| checkBatchSize(100); |
| checkBatchTimeInterval(150); |
| } |
| |
| @Test |
| public void testParallelGWSenderUpdateAttrWhilePaused() { |
| Integer[] locatorPorts = createLNAndNYLocators(); |
| Integer lnPort = locatorPorts[0]; |
| Integer nyPort = locatorPorts[1]; |
| |
| createCacheInVMs(nyPort, vm2, vm3); |
| createReceiverInVMs(vm2, vm3); |
| |
| createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); |
| |
| vm4.invoke(() -> createSender("ln", 2, true, 100, 10, false, false, null, false)); |
| vm5.invoke(() -> createSender("ln", 2, true, 100, 10, false, false, null, false)); |
| vm6.invoke(() -> createSender("ln", 2, true, 100, 10, false, false, null, false)); |
| vm7.invoke(() -> createSender("ln", 2, true, 100, 10, false, false, null, false)); |
| |
| createPartitionedRegions(false); |
| |
| vm4.invoke(() -> waitForSenderRunningState("ln")); |
| vm5.invoke(() -> waitForSenderRunningState("ln")); |
| vm6.invoke(() -> waitForSenderRunningState("ln")); |
| vm7.invoke(() -> waitForSenderRunningState("ln")); |
| |
| vm4.invoke(() -> pauseSender("ln")); |
| vm5.invoke(() -> pauseSender("ln")); |
| vm6.invoke(() -> pauseSender("ln")); |
| vm7.invoke(() -> pauseSender("ln")); |
| |
| vm4.invoke(() -> doPuts(getUniqueName() + "_PR", 5000)); |
| |
| updateGroupTransactionEvents(true); |
| updateBatchTimeInterval(200); |
| |
| validateRegionSizes(getUniqueName() + "_PR", 0, vm2, vm3); |
| |
| checkGroupTransactionEvents(true); |
| checkBatchTimeInterval(200); |
| |
| vm4.invoke(() -> resumeSender("ln")); |
| vm5.invoke(() -> resumeSender("ln")); |
| vm6.invoke(() -> resumeSender("ln")); |
| vm7.invoke(() -> resumeSender("ln")); |
| |
| validateRegionSizes(getUniqueName() + "_PR", 5000, vm2, vm3); |
| |
| } |
| |
| @Test |
| public void testParallelGWSenderUpdateFiltersWhilePutting() throws Exception { |
| Integer[] locatorPorts = createLNAndNYLocators(); |
| Integer lnPort = locatorPorts[0]; |
| Integer nyPort = locatorPorts[1]; |
| |
| List<GatewayEventFilter> filters = new ArrayList<>(); |
| filters.add(new MyGatewayEventFilter_AfterAck()); |
| filters.add(new PDXGatewayEventFilter()); |
| |
| List<GatewayEventFilter> filters2 = new ArrayList<>(); |
| filters.add(new MyGatewayEventFilter()); |
| filters.add(new MyGatewayEventFilter_AfterAck()); |
| filters.add(new PDXGatewayEventFilter()); |
| |
| createCacheInVMs(nyPort, vm2, vm3); |
| createReceiverInVMs(vm2, vm3); |
| |
| createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); |
| |
| vm4.invoke(() -> createSender("ln", 2, true, 100, 10, false, false, null, false)); |
| vm5.invoke(() -> createSender("ln", 2, true, 100, 10, false, false, null, false)); |
| vm6.invoke(() -> createSender("ln", 2, true, 100, 10, false, false, null, false)); |
| vm7.invoke(() -> createSender("ln", 2, true, 100, 10, false, false, null, false)); |
| |
| createPartitionedRegions(false); |
| |
| vm4.invoke(() -> waitForSenderRunningState("ln")); |
| vm5.invoke(() -> waitForSenderRunningState("ln")); |
| vm6.invoke(() -> waitForSenderRunningState("ln")); |
| vm7.invoke(() -> waitForSenderRunningState("ln")); |
| |
| AsyncInvocation<Void> async = vm4.invokeAsync(() -> doPuts(getUniqueName() + "_PR", 5000)); |
| checkBatchSize(10); |
| |
| updateBatchSize(100); |
| updateGatewayEventFilters(filters); |
| checkBatchSize(100); |
| updateGatewayEventFilters(filters2); |
| |
| async.await(); |
| |
| } |
| |
| |
| @Test |
| public void testParallelGWSenderUpdateFiltersWhilePuttingOnOneDispatcThread() throws Exception { |
| Integer[] locatorPorts = createLNAndNYLocators(); |
| Integer lnPort = locatorPorts[0]; |
| Integer nyPort = locatorPorts[1]; |
| |
| List<GatewayEventFilter> filters = new ArrayList<>(); |
| filters.add(new MyGatewayEventFilter_AfterAck()); |
| filters.add(new PDXGatewayEventFilter()); |
| |
| List<GatewayEventFilter> filters2 = new ArrayList<>(); |
| filters.add(new MyGatewayEventFilter()); |
| filters.add(new MyGatewayEventFilter_AfterAck()); |
| filters.add(new PDXGatewayEventFilter()); |
| |
| createCacheInVMs(nyPort, vm2, vm3); |
| createReceiverInVMs(vm2, vm3); |
| |
| createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); |
| vm4.invoke(() -> setNumDispatcherThreadsForTheRun(1)); |
| vm5.invoke(() -> setNumDispatcherThreadsForTheRun(1)); |
| vm6.invoke(() -> setNumDispatcherThreadsForTheRun(1)); |
| vm7.invoke(() -> setNumDispatcherThreadsForTheRun(1)); |
| |
| vm4.invoke(() -> createSender("ln", 2, true, 100, 10, false, false, null, false)); |
| vm5.invoke(() -> createSender("ln", 2, true, 100, 10, false, false, null, false)); |
| vm6.invoke(() -> createSender("ln", 2, true, 100, 10, false, false, null, false)); |
| vm7.invoke(() -> createSender("ln", 2, true, 100, 10, false, false, null, false)); |
| |
| createPartitionedRegions(false); |
| |
| vm4.invoke(() -> waitForSenderRunningState("ln")); |
| vm5.invoke(() -> waitForSenderRunningState("ln")); |
| vm6.invoke(() -> waitForSenderRunningState("ln")); |
| vm7.invoke(() -> waitForSenderRunningState("ln")); |
| |
| AsyncInvocation<Void> async = vm4.invokeAsync(() -> doPuts(getUniqueName() + "_PR", 5000)); |
| checkBatchSize(10); |
| |
| updateBatchSize(100); |
| updateGatewayEventFilters(filters); |
| checkBatchSize(100); |
| updateGatewayEventFilters(filters2); |
| |
| async.await(); |
| |
| } |
| |
| /** |
| * Put entries in region after gateway sender is stopped. Count number of PQRM messages sent. |
| */ |
| @Test |
| public void testDroppedEventsSignalizationToSecondaryQueueWhileSenderStopped() { |
| int lnPort = vm0.invoke(() -> createFirstLocatorWithDSId(1)); |
| int nyPort = vm1.invoke(() -> createFirstRemoteLocator(2, lnPort)); |
| |
| createSendersReceiversAndPartitionedRegion(lnPort, nyPort, false, true); |
| |
| // make sure all the senders are running before doing any puts |
| waitForSendersRunning(); |
| |
| // FIRST RUN: now, the senders are started. So, start the puts |
| vm4.invoke(() -> doPuts(getUniqueName() + "_PR", 100)); |
| |
| vm2.invoke(() -> validateRegionSizeRemainsSame(getUniqueName() + "_PR", 100)); |
| |
| stopSenders(); |
| |
| waitForAllSendersNotRunning(); |
| |
| vm4.invoke(() -> { |
| DistributionMessageObserver.setInstance(new CountSentPQRMObserver()); |
| }); |
| vm5.invoke(() -> { |
| DistributionMessageObserver.setInstance(new CountSentPQRMObserver()); |
| }); |
| vm6.invoke(() -> { |
| DistributionMessageObserver.setInstance(new CountSentPQRMObserver()); |
| }); |
| vm7.invoke(() -> { |
| DistributionMessageObserver.setInstance(new CountSentPQRMObserver()); |
| }); |
| |
| // SECOND RUN: keep one thread doing puts to the region |
| vm4.invoke(() -> doPutsFrom(getUniqueName() + "_PR", 100, 200)); |
| |
| vm2.invoke(() -> validateRegionSizeRemainsSame(getUniqueName() + "_PR", 100)); |
| |
| int parallelQueueRemovalMessageCountInVm4 = vm4.invoke(() -> { |
| CountSentPQRMObserver observer = |
| (CountSentPQRMObserver) DistributionMessageObserver.getInstance(); |
| return observer.getNumberOfSentPQRM(); |
| }); |
| |
| int parallelQueueRemovalMessageCountInVm5 = vm5.invoke(() -> { |
| CountSentPQRMObserver observer = |
| (CountSentPQRMObserver) DistributionMessageObserver.getInstance(); |
| return observer.getNumberOfSentPQRM(); |
| }); |
| |
| int parallelQueueRemovalMessageCountInVm6 = vm6.invoke(() -> { |
| CountSentPQRMObserver observer = |
| (CountSentPQRMObserver) DistributionMessageObserver.getInstance(); |
| return observer.getNumberOfSentPQRM(); |
| }); |
| |
| int parallelQueueRemovalMessageCountInVm7 = vm7.invoke(() -> { |
| CountSentPQRMObserver observer = |
| (CountSentPQRMObserver) DistributionMessageObserver.getInstance(); |
| return observer.getNumberOfSentPQRM(); |
| }); |
| |
| assertThat(parallelQueueRemovalMessageCountInVm4 + parallelQueueRemovalMessageCountInVm5 |
| + parallelQueueRemovalMessageCountInVm6 + parallelQueueRemovalMessageCountInVm7) |
| .isEqualTo(100); |
| |
| await().untilAsserted(() -> { |
| assertThat(vm4.invoke(() -> getSecondaryQueueSizeInStats("ln"))).isZero(); |
| assertThat(vm5.invoke(() -> getSecondaryQueueSizeInStats("ln"))).isZero(); |
| assertThat(vm6.invoke(() -> getSecondaryQueueSizeInStats("ln"))).isZero(); |
| assertThat(vm7.invoke(() -> getSecondaryQueueSizeInStats("ln"))).isZero(); |
| }); |
| |
| } |
| |
| private void clearShadowBucketRegions(PartitionedRegion shadowRegion) { |
| PartitionedRegionDataStore.BucketVisitor bucketVisitor = |
| (bucketId, r) -> ((BucketRegion) r).clearEntries(null); |
| |
| shadowRegion.getDataStore().visitBuckets(bucketVisitor); |
| } |
| |
| private void createSendersReceiversAndPartitionedRegion(Integer lnPort, Integer nyPort, |
| boolean createAccessors, boolean startSenders) { |
| createSendersReceiversAndPartitionedRegion(lnPort, nyPort, createAccessors, startSenders, |
| false); |
| } |
| |
| private void createSendersReceiversAndPartitionedRegion(Integer lnPort, Integer nyPort, |
| boolean createAccessors, boolean startSenders, boolean conserveSockets) { |
| createSendersAndReceivers(lnPort, nyPort, conserveSockets); |
| |
| createPartitionedRegions(createAccessors); |
| |
| if (startSenders) { |
| startSenderInVMs("ln", vm4, vm5, vm6, vm7); |
| } |
| } |
| |
| private void createSendersAndReceivers(Integer lnPort, Integer nyPort, boolean conserveSockets) { |
| createCacheConserveSocketsInVMs(conserveSockets, nyPort, vm2, vm3); |
| createReceiverInVMs(vm2, vm3); |
| |
| createCacheConserveSocketsInVMs(conserveSockets, lnPort, vm4, vm5, vm6, vm7); |
| |
| vm4.invoke(() -> createSender("ln", 2, true, 100, 10, false, false, null, true)); |
| vm5.invoke(() -> createSender("ln", 2, true, 100, 10, false, false, null, true)); |
| vm6.invoke(() -> createSender("ln", 2, true, 100, 10, false, false, null, true)); |
| vm7.invoke(() -> createSender("ln", 2, true, 100, 10, false, false, null, true)); |
| } |
| |
| private void createPartitionedRegions(boolean createAccessors) { |
| String regionName = getUniqueName() + "_PR"; |
| vm4.invoke(() -> createPartitionedRegion(regionName, "ln", 1, 100, isOffHeap())); |
| vm5.invoke(() -> createPartitionedRegion(regionName, "ln", 1, 100, isOffHeap())); |
| |
| if (createAccessors) { |
| vm6.invoke(() -> createPartitionedRegionAsAccessor(regionName, "ln", 1, 100)); |
| vm7.invoke(() -> createPartitionedRegionAsAccessor(regionName, "ln", 1, 100)); |
| } else { |
| vm6.invoke(() -> createPartitionedRegion(regionName, "ln", 1, 100, isOffHeap())); |
| vm7.invoke(() -> createPartitionedRegion(regionName, "ln", 1, 100, isOffHeap())); |
| } |
| |
| vm2.invoke(() -> createPartitionedRegion(regionName, "ln", 1, 100, isOffHeap())); |
| vm3.invoke(() -> createPartitionedRegion(regionName, "ln", 1, 100, isOffHeap())); |
| } |
| |
| private void updateBatchSize(int batchsize) { |
| vm4.invoke(() -> { |
| AbstractGatewaySender sender = (AbstractGatewaySender) cache.getGatewaySender("ln"); |
| boolean paused = false; |
| if (sender.isRunning() && !sender.isPaused()) { |
| sender.pause(); |
| paused = true; |
| } |
| sender.setBatchSize(batchsize); |
| if (paused) { |
| sender.resume(); |
| } |
| }); |
| vm5.invoke(() -> { |
| AbstractGatewaySender sender = (AbstractGatewaySender) cache.getGatewaySender("ln"); |
| boolean paused = false; |
| if (sender.isRunning() && !sender.isPaused()) { |
| sender.pause(); |
| paused = true; |
| } |
| sender.setBatchSize(batchsize); |
| if (paused) { |
| sender.resume(); |
| } |
| }); |
| vm6.invoke(() -> { |
| AbstractGatewaySender sender = (AbstractGatewaySender) cache.getGatewaySender("ln"); |
| boolean paused = false; |
| if (sender.isRunning() && !sender.isPaused()) { |
| sender.pause(); |
| paused = true; |
| } |
| sender.setBatchSize(batchsize); |
| if (paused) { |
| sender.resume(); |
| } |
| }); |
| vm7.invoke(() -> { |
| AbstractGatewaySender sender = (AbstractGatewaySender) cache.getGatewaySender("ln"); |
| boolean paused = false; |
| if (sender.isRunning() && !sender.isPaused()) { |
| sender.pause(); |
| paused = true; |
| } |
| sender.setBatchSize(batchsize); |
| if (paused) { |
| sender.resume(); |
| } |
| }); |
| } |
| |
| private void updateBatchTimeInterval(int batchTimeInterval) { |
| vm4.invoke(() -> { |
| AbstractGatewaySender sender = (AbstractGatewaySender) cache.getGatewaySender("ln"); |
| boolean paused = false; |
| if (sender.isRunning() && !sender.isPaused()) { |
| sender.pause(); |
| paused = true; |
| } |
| sender.setBatchTimeInterval(batchTimeInterval); |
| if (paused) { |
| sender.resume(); |
| } |
| }); |
| vm5.invoke(() -> { |
| AbstractGatewaySender sender = (AbstractGatewaySender) cache.getGatewaySender("ln"); |
| boolean paused = false; |
| if (sender.isRunning() && !sender.isPaused()) { |
| sender.pause(); |
| paused = true; |
| } |
| sender.setBatchTimeInterval(batchTimeInterval); |
| if (paused) { |
| sender.resume(); |
| } |
| }); |
| vm6.invoke(() -> { |
| AbstractGatewaySender sender = (AbstractGatewaySender) cache.getGatewaySender("ln"); |
| boolean paused = false; |
| if (sender.isRunning() && !sender.isPaused()) { |
| sender.pause(); |
| paused = true; |
| } |
| sender.setBatchTimeInterval(batchTimeInterval); |
| if (paused) { |
| sender.resume(); |
| } |
| }); |
| vm7.invoke(() -> { |
| AbstractGatewaySender sender = (AbstractGatewaySender) cache.getGatewaySender("ln"); |
| boolean paused = false; |
| if (sender.isRunning() && !sender.isPaused()) { |
| sender.pause(); |
| paused = true; |
| } |
| sender.setBatchTimeInterval(batchTimeInterval); |
| if (paused) { |
| sender.resume(); |
| } |
| }); |
| } |
| |
| private void updateGroupTransactionEvents(boolean groupTransactionEvents) { |
| vm4.invoke(() -> { |
| AbstractGatewaySender sender = (AbstractGatewaySender) cache.getGatewaySender("ln"); |
| boolean paused = false; |
| if (sender.isRunning() && !sender.isPaused()) { |
| sender.pause(); |
| paused = true; |
| } |
| sender.setGroupTransactionEvents(groupTransactionEvents); |
| if (paused) { |
| sender.resume(); |
| } |
| }); |
| vm5.invoke(() -> { |
| AbstractGatewaySender sender = (AbstractGatewaySender) cache.getGatewaySender("ln"); |
| boolean paused = false; |
| if (sender.isRunning() && !sender.isPaused()) { |
| sender.pause(); |
| paused = true; |
| } |
| sender.setGroupTransactionEvents(groupTransactionEvents); |
| if (paused) { |
| sender.resume(); |
| } |
| }); |
| vm6.invoke(() -> { |
| AbstractGatewaySender sender = (AbstractGatewaySender) cache.getGatewaySender("ln"); |
| boolean paused = false; |
| if (sender.isRunning() && !sender.isPaused()) { |
| sender.pause(); |
| paused = true; |
| } |
| sender.setGroupTransactionEvents(groupTransactionEvents); |
| if (paused) { |
| sender.resume(); |
| } |
| }); |
| vm7.invoke(() -> { |
| AbstractGatewaySender sender = (AbstractGatewaySender) cache.getGatewaySender("ln"); |
| boolean paused = false; |
| if (sender.isRunning() && !sender.isPaused()) { |
| sender.pause(); |
| paused = true; |
| } |
| sender.setGroupTransactionEvents(groupTransactionEvents); |
| if (paused) { |
| sender.resume(); |
| } |
| }); |
| |
| } |
| |
| private void updateGatewayEventFilters(List<GatewayEventFilter> filters) { |
| vm4.invoke(() -> { |
| AbstractGatewaySender sender = (AbstractGatewaySender) cache.getGatewaySender("ln"); |
| boolean paused = false; |
| if (sender.isRunning() && !sender.isPaused()) { |
| sender.pause(); |
| paused = true; |
| } |
| sender.setGatewayEventFilters(filters); |
| if (paused) { |
| sender.resume(); |
| } |
| }); |
| vm5.invoke(() -> { |
| AbstractGatewaySender sender = (AbstractGatewaySender) cache.getGatewaySender("ln"); |
| boolean paused = false; |
| if (sender.isRunning() && !sender.isPaused()) { |
| sender.pause(); |
| paused = true; |
| } |
| sender.setGatewayEventFilters(filters); |
| if (paused) { |
| sender.resume(); |
| } |
| }); |
| vm6.invoke(() -> { |
| AbstractGatewaySender sender = (AbstractGatewaySender) cache.getGatewaySender("ln"); |
| boolean paused = false; |
| if (sender.isRunning() && !sender.isPaused()) { |
| sender.pause(); |
| paused = true; |
| } |
| sender.setGatewayEventFilters(filters); |
| if (paused) { |
| sender.resume(); |
| } |
| }); |
| vm7.invoke(() -> { |
| AbstractGatewaySender sender = (AbstractGatewaySender) cache.getGatewaySender("ln"); |
| boolean paused = false; |
| if (sender.isRunning() && !sender.isPaused()) { |
| sender.pause(); |
| paused = true; |
| } |
| sender.setGatewayEventFilters(filters); |
| if (paused) { |
| sender.resume(); |
| } |
| }); |
| } |
| |
| private void checkBatchSize(int batchsize) { |
| vm4.invoke(() -> { |
| AbstractGatewaySender sender = (AbstractGatewaySender) cache.getGatewaySender("ln"); |
| assertThat(sender.getBatchSize()).isEqualTo(batchsize); |
| }); |
| vm5.invoke(() -> { |
| AbstractGatewaySender sender = (AbstractGatewaySender) cache.getGatewaySender("ln"); |
| assertThat(sender.getBatchSize()).isEqualTo(batchsize); |
| }); |
| vm6.invoke(() -> { |
| AbstractGatewaySender sender = (AbstractGatewaySender) cache.getGatewaySender("ln"); |
| assertThat(sender.getBatchSize()).isEqualTo(batchsize); |
| }); |
| vm7.invoke(() -> { |
| AbstractGatewaySender sender = (AbstractGatewaySender) cache.getGatewaySender("ln"); |
| assertThat(sender.getBatchSize()).isEqualTo(batchsize); |
| }); |
| } |
| |
| private void checkBatchTimeInterval(int batchTimeInterval) { |
| vm4.invoke(() -> { |
| AbstractGatewaySender sender = (AbstractGatewaySender) cache.getGatewaySender("ln"); |
| assertThat(sender.getBatchTimeInterval()).isEqualTo(batchTimeInterval); |
| }); |
| vm5.invoke(() -> { |
| AbstractGatewaySender sender = (AbstractGatewaySender) cache.getGatewaySender("ln"); |
| assertThat(sender.getBatchTimeInterval()).isEqualTo(batchTimeInterval); |
| }); |
| vm6.invoke(() -> { |
| AbstractGatewaySender sender = (AbstractGatewaySender) cache.getGatewaySender("ln"); |
| assertThat(sender.getBatchTimeInterval()).isEqualTo(batchTimeInterval); |
| }); |
| vm7.invoke(() -> { |
| AbstractGatewaySender sender = (AbstractGatewaySender) cache.getGatewaySender("ln"); |
| assertThat(sender.getBatchTimeInterval()).isEqualTo(batchTimeInterval); |
| }); |
| } |
| |
| private void checkGroupTransactionEvents(boolean groupTransactionEvents) { |
| vm4.invoke(() -> { |
| AbstractGatewaySender sender = (AbstractGatewaySender) cache.getGatewaySender("ln"); |
| assertThat(sender.mustGroupTransactionEvents()).isEqualTo(groupTransactionEvents); |
| }); |
| vm5.invoke(() -> { |
| AbstractGatewaySender sender = (AbstractGatewaySender) cache.getGatewaySender("ln"); |
| assertThat(sender.mustGroupTransactionEvents()).isEqualTo(groupTransactionEvents); |
| }); |
| vm6.invoke(() -> { |
| AbstractGatewaySender sender = (AbstractGatewaySender) cache.getGatewaySender("ln"); |
| assertThat(sender.mustGroupTransactionEvents()).isEqualTo(groupTransactionEvents); |
| }); |
| vm7.invoke(() -> { |
| AbstractGatewaySender sender = (AbstractGatewaySender) cache.getGatewaySender("ln"); |
| assertThat(sender.mustGroupTransactionEvents()).isEqualTo(groupTransactionEvents); |
| }); |
| } |
| |
| private void stopSenders() { |
| vm4.invoke(() -> stopSender("ln")); |
| vm5.invoke(() -> stopSender("ln")); |
| vm6.invoke(() -> stopSender("ln")); |
| vm7.invoke(() -> stopSender("ln")); |
| } |
| |
| private void stopSendersAsync() throws InterruptedException { |
| AsyncInvocation inv1 = vm4.invokeAsync(() -> stopSender("ln")); |
| AsyncInvocation inv2 = vm5.invokeAsync(() -> stopSender("ln")); |
| AsyncInvocation inv3 = vm6.invokeAsync(() -> stopSender("ln")); |
| AsyncInvocation inv4 = vm7.invokeAsync(() -> stopSender("ln")); |
| inv1.await(); |
| inv2.await(); |
| inv3.await(); |
| inv4.await(); |
| } |
| |
| private void waitForSendersRunning() { |
| vm4.invoke(() -> waitForSenderRunningState("ln")); |
| vm5.invoke(() -> waitForSenderRunningState("ln")); |
| vm6.invoke(() -> waitForSenderRunningState("ln")); |
| vm7.invoke(() -> waitForSenderRunningState("ln")); |
| } |
| |
| private void waitForAllSendersNotRunning() { |
| vm4.invoke(() -> waitForSenderNonRunningState("ln")); |
| vm5.invoke(() -> waitForSenderNonRunningState("ln")); |
| vm6.invoke(() -> waitForSenderNonRunningState("ln")); |
| vm7.invoke(() -> waitForSenderNonRunningState("ln")); |
| } |
| |
| private void validateParallelSenderQueueAllBucketsDrained() { |
| vm4.invoke(() -> validateParallelSenderQueueAllBucketsDrained("ln")); |
| vm5.invoke(() -> validateParallelSenderQueueAllBucketsDrained("ln")); |
| vm6.invoke(() -> validateParallelSenderQueueAllBucketsDrained("ln")); |
| vm7.invoke(() -> validateParallelSenderQueueAllBucketsDrained("ln")); |
| } |
| |
| private static class CountSentPQRMObserver extends DistributionMessageObserver |
| implements Serializable { |
| private final AtomicInteger numberOfSentPQRM = new AtomicInteger(0); |
| |
| @Override |
| public void beforeSendMessage(ClusterDistributionManager dm, DistributionMessage message) { |
| if (message instanceof ParallelQueueRemovalMessage) { |
| numberOfSentPQRM.addAndGet(message.getRecipients().size()); |
| } |
| } |
| |
| public int getNumberOfSentPQRM() { |
| return numberOfSentPQRM.get(); |
| } |
| } |
| |
| } |