GEODE-8745: locking maps, to prevent concurrent use by Listeners (#5815)

   * Also re-checking the isPrimary status, as after the first check, handleFailover may take the lock and clean null out the maps
   * This may cause NPEs and checking the status after the lock is acquired.
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderEventProcessor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderEventProcessor.java
index b372661..bbb144e 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderEventProcessor.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderEventProcessor.java
@@ -110,10 +110,11 @@
   public SerialGatewaySenderEventProcessor(AbstractGatewaySender sender, String id,
       ThreadsMonitoring tMonitoring, boolean cleanQueues) {
     super("Event Processor for GatewaySender_" + id, sender, tMonitoring);
-
-    initializeMessageQueue(id, cleanQueues);
-    this.unprocessedEvents = new LinkedHashMap<EventID, EventWrapper>();
-    this.unprocessedTokens = new LinkedHashMap<EventID, Long>();
+    synchronized (this.unprocessedEventsLock) {
+      initializeMessageQueue(id, cleanQueues);
+      this.unprocessedEvents = new LinkedHashMap<EventID, EventWrapper>();
+      this.unprocessedTokens = new LinkedHashMap<EventID, Long>();
+    }
   }
 
   @Override
@@ -624,6 +625,11 @@
     GatewaySenderStats statistics = this.sender.getStatistics();
     // Get the event from the map
     synchronized (unprocessedEventsLock) {
+      // If handleFailover() acquired the lock hence double checking
+      if (this.sender.isPrimary()) {
+        // no need to do anything if we have become the primary
+        return false;
+      }
       if (this.unprocessedEvents == null)
         return false;
       // now we can safely use the unprocessedEvents field
@@ -645,6 +651,11 @@
     GatewaySenderStats statistics = this.sender.getStatistics();
     // Get the event from the map
     synchronized (unprocessedEventsLock) {
+      // If handleFailover() acquired the lock hence double checking
+      if (this.sender.isPrimary()) {
+        // no need to do anything if we have become the primary
+        return;
+      }
       if (this.unprocessedEvents == null)
         return;
       // now we can safely use the unprocessedEvents field
diff --git a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderOperationsDistributedTest.java b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderOperationsDistributedTest.java
index 8ba8cf2..e3e3ede 100644
--- a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderOperationsDistributedTest.java
+++ b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderOperationsDistributedTest.java
@@ -1216,7 +1216,8 @@
     await()
         .untilAsserted(() -> {
           assertThat(sender.getStatistics().getUnprocessedEventMapSize())
-              .as("Sender statistics unprocessed event map size")
+              .as("Sender statistics unprocessed event map contents: "
+                  + sender.getEventProcessor().printUnprocessedEvents())
               .isEqualTo(queueSize);
         });
   }