GEODE-8745: Handle race between batch destroy msg and sec event. (#5821)

  
   * Primary serial sender receives an event but the sender is not running yet. Hence it has to drop the event.
   * Primary has to inform the secondary to remove the event from the unprocessedEventsMap.
   * It has to do that because the event is dropped and the cache listener on the secondary queue will not be triggered to remove it.
   * Hence, the primary sends a BatchDestroyOperation to the secondary with the tail key set to -1 to trigger removal from the unprocessedEventsMap.
   * There is a race in which the BatchDestroyOperation arrives before the secondary event.
   * In this case the event is never removed from the unprocessedEventsMap.
   * With this fix, when the BatchDestroyOperation arrives first and finds no matching entry in the unprocessedEventsMap, the event ID is placed in the unprocessedTokensMap.
   * Then, when the secondary event arrives, the secondary finds its ID in the unprocessedTokensMap, concludes that the primary has already seen the event, and does not place the event in the unprocessedEventsMap.

diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java
index 071974d..7f6c8a1 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java
@@ -1068,8 +1068,8 @@
           if (isDebugEnabled) {
             logger.debug("Returning back without putting into the gateway sender queue:" + event);
           }
-          if (this.eventProcessor != null) {
-            this.eventProcessor.registerEventDroppedInPrimaryQueue(event);
+          if (this.isPrimary()) {
+            recordDroppedEvent(clonedEvent);
           }
           return;
         }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/BatchDestroyOperation.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/BatchDestroyOperation.java
index b493949..4f2fbfd 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/BatchDestroyOperation.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/BatchDestroyOperation.java
@@ -144,7 +144,7 @@
           }
           if (ep != null) {
             // if sender is being shutdown, the ep could be null
-            boolean removed = ep.basicHandlePrimaryDestroy(ev.getEventId());
+            boolean removed = ep.basicHandlePrimaryDestroy(ev.getEventId(), true);
             if (removed) {
               if (isDebugEnabled) {
                 logger.debug("Removed a dropped event {} from unprocessedEvents.",
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderEventProcessor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderEventProcessor.java
index bbb144e..6efac4d 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderEventProcessor.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderEventProcessor.java
@@ -607,7 +607,7 @@
       my_executor.execute(new Runnable() {
         @Override
         public void run() {
-          basicHandlePrimaryDestroy(gatewayEvent.getEventId());
+          basicHandlePrimaryDestroy(gatewayEvent.getEventId(), false);
         }
       });
     }
@@ -617,7 +617,8 @@
    * Just remove the event from the unprocessed events map if it is present. This method added to
    * fix bug 37603
    */
-  protected boolean basicHandlePrimaryDestroy(final EventID eventId) {
+  protected boolean basicHandlePrimaryDestroy(final EventID eventId,
+      boolean addToUnprocessedTokens) {
     if (this.sender.isPrimary()) {
       // no need to do anything if we have become the primary
       return false;
@@ -638,6 +639,18 @@
         ew.event.release();
         statistics.incUnprocessedEventsRemovedByPrimary();
         return true;
+      } else if (addToUnprocessedTokens) {
+        // Secondary event may not have arrived
+        if (logger.isTraceEnabled()) {
+          logger.trace("{}: fromPrimary destroy event {} : added to unprocessed token map",
+              sender.getId(), eventId);
+        }
+        Long mapValue =
+            System.currentTimeMillis() + AbstractGatewaySender.TOKEN_TIMEOUT;
+        Long oldv = this.unprocessedTokens.put(eventId, mapValue);
+        if (oldv == null) {
+          statistics.incUnprocessedTokensAddedByPrimary();
+        }
       }
     }
     return false;
@@ -670,13 +683,10 @@
         }
         {
           Long mapValue =
-              Long.valueOf(System.currentTimeMillis() + AbstractGatewaySender.TOKEN_TIMEOUT);
+              System.currentTimeMillis() + AbstractGatewaySender.TOKEN_TIMEOUT;
           Long oldv = this.unprocessedTokens.put(gatewayEvent.getEventId(), mapValue);
           if (oldv == null) {
             statistics.incUnprocessedTokensAddedByPrimary();
-          } else {
-            // its ok for oldv to be non-null
-            // this shouldn't happen anymore @todo add an assertion here
           }
         }
       } else {
diff --git a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/serial/SerialWANPersistenceEnabledGatewaySenderDUnitTest.java b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/serial/SerialWANPersistenceEnabledGatewaySenderDUnitTest.java
index bc5e570..71c9e49 100644
--- a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/serial/SerialWANPersistenceEnabledGatewaySenderDUnitTest.java
+++ b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/serial/SerialWANPersistenceEnabledGatewaySenderDUnitTest.java
@@ -577,18 +577,14 @@
     Integer nyPort = (Integer) vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort));
 
     createCacheInVMs(nyPort, vm2, vm3);
-    createReceiverInVMs(vm2, vm3);
 
     createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
 
-    String firstDStore = (String) vm4.invoke(() -> WANTestBase.createSenderWithDiskStore("ln", 2,
+    vm4.invoke("Creating DS", () -> WANTestBase.createSenderWithDiskStore("ln", 2,
         false, 100, 10, false, true, null, null, true));
-    String secondDStore = (String) vm5.invoke(() -> WANTestBase.createSenderWithDiskStore("ln", 2,
+    vm5.invoke("Creating DS", () -> WANTestBase.createSenderWithDiskStore("ln", 2,
         false, 100, 10, false, true, null, null, true));
 
-    logger.info("The first ds is " + firstDStore);
-    logger.info("The second ds is " + secondDStore);
-
     vm2.invoke(
         () -> WANTestBase.createReplicatedRegion(getTestMethodName() + "_RR", null, isOffHeap()));
     vm3.invoke(
@@ -605,26 +601,21 @@
     vm7.invoke(() -> WANTestBase.createPersistentReplicatedRegion(getTestMethodName() + "_RR", "ln",
         isOffHeap()));
 
-    vm4.invoke(() -> WANTestBase.pauseSender("ln"));
-    vm5.invoke(() -> WANTestBase.pauseSender("ln"));
-
-    vm4.invoke(() -> WANTestBase.doPuts(getTestMethodName() + "_RR", 1000));
-
-    logger.info("Completed puts in the region");
-
-    vm4.invoke(() -> WANTestBase.stopSender("ln"));
-    vm5.invoke(() -> WANTestBase.stopSender("ln"));
+    vm4.invoke("Puts in the region" + getTestMethodName() + "_RR",
+        () -> WANTestBase.doPuts(getTestMethodName() + "_RR", 1000));
 
 
-    logger.info("Stopped all the senders. ");
+    vm4.invoke("Stopping ln sender", () -> WANTestBase.stopSender("ln"));
+    vm5.invoke("Stopping ln sender", () -> WANTestBase.stopSender("ln"));
 
-    AsyncInvocation inv1 = vm4.invokeAsync(() -> WANTestBase.startSenderwithCleanQueues("ln"));
-    logger.info("Started the sender in vm 4");
+    createReceiverInVMs(vm2, vm3);
 
-    vm5.invoke(() -> WANTestBase.startSenderwithCleanQueues("ln"));
-    logger.info("Started the sender in vm 5");
+    AsyncInvocation<?> inv1 = vm4.invokeAsync("Starting sender with clean queues",
+        () -> WANTestBase.startSenderwithCleanQueues("ln"));
+    vm5.invoke("Starting sender with clean queues",
+        () -> WANTestBase.startSenderwithCleanQueues("ln"));
     try {
-      inv1.join();
+      inv1.await();
     } catch (InterruptedException e) {
       fail("Got interrupted exception while waiting for startSender to finish.");
     }