GEODE-8714: return event to queue at stoping of gw sender (#5752)
* GEODE-8714: return event to queue at stoping of gw sender
* GEODE-8714: added test
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
index 9366d5f..2788bb2 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
@@ -1731,12 +1731,9 @@
for (int i = helpArray.length - 1; i >= 0; i--) {
GatewaySenderEventImpl event = (GatewaySenderEventImpl) helpArray[i];
final int bucketId = event.getBucketId();
- final PartitionedRegion region = (PartitionedRegion) event.getRegion();
- if (region.getRegionAdvisor().isPrimaryForBucket(bucketId)) {
- BucketRegionQueue brq = getBucketRegionQueueByBucketId(getRandomShadowPR(), bucketId);
- if (brq != null) {
- brq.pushKeyIntoQueue(event.getShadowKey());
- }
+ BucketRegionQueue brq = getBucketRegionQueueByBucketId(getRandomShadowPR(), bucketId);
+ if (brq != null) {
+ brq.pushKeyIntoQueue(event.getShadowKey());
}
}
}
diff --git a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANPersistenceEnabledGatewaySenderDUnitTest.java b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANPersistenceEnabledGatewaySenderDUnitTest.java
index 7070ae0..9364187 100644
--- a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANPersistenceEnabledGatewaySenderDUnitTest.java
+++ b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANPersistenceEnabledGatewaySenderDUnitTest.java
@@ -1897,6 +1897,106 @@
vm3.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName(), 3000));
}
+ /**
+ * Enable persistence for PR and GatewaySender. Do some puts in local region. Restart 1 server,
+ * then stop gateway sender, and stop server. After that create receiver on remote site.
+ * Check if the remote site receives all the events.
+ */
+ @Test
+ public void testPersistentPRWithGatewaySenderPersistenceEnabled_RestartAndStopServer() {
+ // create locator on local site
+ Integer lnPort = (Integer) vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1));
+ // create locator on remote site
+ Integer nyPort = (Integer) vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort));
+
+ // create receiver on remote site
+ createCacheInVMs(nyPort, vm2, vm3);
+ // createReceiverInVMs(vm2, vm3);
+
+ // create cache in local site
+ createCacheInVMs(lnPort, vm4, vm5);
+ vm4.invoke(() -> setNumDispatcherThreadsForTheRun(2));
+ vm5.invoke(() -> setNumDispatcherThreadsForTheRun(2));
+
+ // create senders with disk store
+ String diskStore1 = (String) vm4.invoke(() -> WANTestBase.createSenderWithDiskStore("ln", 2,
+ true, 100, 10, false, true, null, null, true));
+ String diskStore2 = (String) vm5.invoke(() -> WANTestBase.createSenderWithDiskStore("ln", 2,
+ true, 100, 10, false, true, null, null, true));
+
+ LogWriterUtils.getLogWriter()
+ .info("The DS are: " + diskStore1 + "," + diskStore2);
+
+ // create PR on remote site
+ vm2.invoke(() -> WANTestBase.createPersistentPartitionedRegion(getTestMethodName(), null, 1,
+ 13, isOffHeap()));
+ vm3.invoke(() -> WANTestBase.createPersistentPartitionedRegion(getTestMethodName(), null, 1,
+ 13, isOffHeap()));
+
+ // create PR on local site
+ vm4.invoke(() -> WANTestBase.createPersistentPartitionedRegion(getTestMethodName(), "ln", 1,
+ 13, isOffHeap()));
+ vm5.invoke(() -> WANTestBase.createPersistentPartitionedRegion(getTestMethodName(), "ln", 1,
+ 13, isOffHeap()));
+
+ // start the senders on local site
+ startSenderInVMs("ln", vm4, vm5);
+
+ // wait for senders to become running
+ vm4.invoke(waitForSenderRunnable());
+ vm5.invoke(waitForSenderRunnable());
+
+ // start puts in region on local site
+ vm4.invoke(() -> WANTestBase.doPuts(getTestMethodName(), 10));
+ LogWriterUtils.getLogWriter().info("Completed puts in the region");
+
+ // --------------------close and rebuild local site
+ // -------------------------------------------------
+ // kill the sender in vm5
+ vm5.invoke(killSenderRunnable());
+
+ LogWriterUtils.getLogWriter().info("Killed vm5 sender.");
+
+ // restart the vm
+ createCacheInVMs(lnPort, vm5);
+ vm5.invoke(() -> setNumDispatcherThreadsForTheRun(2));
+
+ LogWriterUtils.getLogWriter().info("Created back the cache");
+
+ // create senders with disk store
+ vm5.invoke(() -> WANTestBase.createSenderWithDiskStore("ln", 2, true, 100, 10, false, true,
+ null, diskStore2, false));
+
+ LogWriterUtils.getLogWriter().info("Created the senders back from the disk store.");
+ // create PR on local site
+ vm5.invoke(() -> WANTestBase.createPersistentPartitionedRegion(getTestMethodName(), "ln", 1,
+ 13, isOffHeap()));
+
+
+ LogWriterUtils.getLogWriter().info("Created back the partitioned regions");
+
+ LogWriterUtils.getLogWriter().info("Waiting for senders running.");
+ // wait for senders running
+ vm5.invoke(waitForSenderRunnable());
+
+ LogWriterUtils.getLogWriter().info("All the senders are now running...");
+
+ // ----------------------------------------------------------------------------------------------------
+
+ vm4.invoke(() -> WANTestBase.stopSender("ln"));
+ vm5.invoke(() -> WANTestBase.stopSender("ln"));
+
+ vm5.invoke(killSenderRunnable());
+
+ vm4.invoke(() -> WANTestBase.startSender("ln"));
+
+
+ createReceiverInVMs(vm2, vm3);
+
+ vm2.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName(), 10));
+ vm3.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName(), 10));
+ }
+
/**
* setIgnoreQueue has lots of callers by reflection