GEODE-8714: return event to queue at stoping of gw sender (#5752)

* GEODE-8714: return event to queue at stoping of gw sender

* GEODE-8714: added test
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
index 9366d5f..2788bb2 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
@@ -1731,12 +1731,9 @@
     for (int i = helpArray.length - 1; i >= 0; i--) {
       GatewaySenderEventImpl event = (GatewaySenderEventImpl) helpArray[i];
       final int bucketId = event.getBucketId();
-      final PartitionedRegion region = (PartitionedRegion) event.getRegion();
-      if (region.getRegionAdvisor().isPrimaryForBucket(bucketId)) {
-        BucketRegionQueue brq = getBucketRegionQueueByBucketId(getRandomShadowPR(), bucketId);
-        if (brq != null) {
-          brq.pushKeyIntoQueue(event.getShadowKey());
-        }
+      BucketRegionQueue brq = getBucketRegionQueueByBucketId(getRandomShadowPR(), bucketId);
+      if (brq != null) {
+        brq.pushKeyIntoQueue(event.getShadowKey());
       }
     }
   }
diff --git a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANPersistenceEnabledGatewaySenderDUnitTest.java b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANPersistenceEnabledGatewaySenderDUnitTest.java
index 7070ae0..9364187 100644
--- a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANPersistenceEnabledGatewaySenderDUnitTest.java
+++ b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANPersistenceEnabledGatewaySenderDUnitTest.java
@@ -1897,6 +1897,106 @@
     vm3.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName(), 3000));
   }
 
+  /**
+   * Enable persistence for PR and GatewaySender. Do some puts in local region. Restart 1 server,
+   * then stop gateway sender, and stop server. After that create receiver on remote site.
+   * Check if the remote site receives all the events.
+   */
+  @Test
+  public void testPersistentPRWithGatewaySenderPersistenceEnabled_RestartAndStopServer() {
+    // create locator on local site
+    Integer lnPort = (Integer) vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1));
+    // create locator on remote site
+    Integer nyPort = (Integer) vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort));
+
+    // create receiver on remote site
+    createCacheInVMs(nyPort, vm2, vm3);
+    // createReceiverInVMs(vm2, vm3);
+
+    // create cache in local site
+    createCacheInVMs(lnPort, vm4, vm5);
+    vm4.invoke(() -> setNumDispatcherThreadsForTheRun(2));
+    vm5.invoke(() -> setNumDispatcherThreadsForTheRun(2));
+
+    // create senders with disk store
+    String diskStore1 = (String) vm4.invoke(() -> WANTestBase.createSenderWithDiskStore("ln", 2,
+        true, 100, 10, false, true, null, null, true));
+    String diskStore2 = (String) vm5.invoke(() -> WANTestBase.createSenderWithDiskStore("ln", 2,
+        true, 100, 10, false, true, null, null, true));
+
+    LogWriterUtils.getLogWriter()
+        .info("The DS are: " + diskStore1 + "," + diskStore2);
+
+    // create PR on remote site
+    vm2.invoke(() -> WANTestBase.createPersistentPartitionedRegion(getTestMethodName(), null, 1,
+        13, isOffHeap()));
+    vm3.invoke(() -> WANTestBase.createPersistentPartitionedRegion(getTestMethodName(), null, 1,
+        13, isOffHeap()));
+
+    // create PR on local site
+    vm4.invoke(() -> WANTestBase.createPersistentPartitionedRegion(getTestMethodName(), "ln", 1,
+        13, isOffHeap()));
+    vm5.invoke(() -> WANTestBase.createPersistentPartitionedRegion(getTestMethodName(), "ln", 1,
+        13, isOffHeap()));
+
+    // start the senders on local site
+    startSenderInVMs("ln", vm4, vm5);
+
+    // wait for senders to become running
+    vm4.invoke(waitForSenderRunnable());
+    vm5.invoke(waitForSenderRunnable());
+
+    // start puts in region on local site
+    vm4.invoke(() -> WANTestBase.doPuts(getTestMethodName(), 10));
+    LogWriterUtils.getLogWriter().info("Completed puts in the region");
+
+    // --------------------close and rebuild local site
+    // -------------------------------------------------
+    // kill the sender in vm5
+    vm5.invoke(killSenderRunnable());
+
+    LogWriterUtils.getLogWriter().info("Killed vm5 sender.");
+
+    // restart the vm
+    createCacheInVMs(lnPort, vm5);
+    vm5.invoke(() -> setNumDispatcherThreadsForTheRun(2));
+
+    LogWriterUtils.getLogWriter().info("Created back the cache");
+
+    // create senders with disk store
+    vm5.invoke(() -> WANTestBase.createSenderWithDiskStore("ln", 2, true, 100, 10, false, true,
+        null, diskStore2, false));
+
+    LogWriterUtils.getLogWriter().info("Created the senders back from the disk store.");
+    // create PR on local site
+    vm5.invoke(() -> WANTestBase.createPersistentPartitionedRegion(getTestMethodName(), "ln", 1,
+        13, isOffHeap()));
+
+
+    LogWriterUtils.getLogWriter().info("Created back the partitioned regions");
+
+    LogWriterUtils.getLogWriter().info("Waiting for senders running.");
+    // wait for senders running
+    vm5.invoke(waitForSenderRunnable());
+
+    LogWriterUtils.getLogWriter().info("All the senders are now running...");
+
+    // ----------------------------------------------------------------------------------------------------
+
+    vm4.invoke(() -> WANTestBase.stopSender("ln"));
+    vm5.invoke(() -> WANTestBase.stopSender("ln"));
+
+    vm5.invoke(killSenderRunnable());
+
+    vm4.invoke(() -> WANTestBase.startSender("ln"));
+
+
+    createReceiverInVMs(vm2, vm3);
+
+    vm2.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName(), 10));
+    vm3.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName(), 10));
+  }
+
 
   /**
    * setIgnoreQueue has lots of callers by reflection