GEODE-8745: locking maps, to prevent concurrent use by Listeners (#5815)
* Also re-checking the isPrimary status, as after the first check, handleFailover may take the lock and clean null out the maps
* This may cause NPEs and checking the status after the lock is acquired.
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderEventProcessor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderEventProcessor.java
index b372661..bbb144e 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderEventProcessor.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderEventProcessor.java
@@ -110,10 +110,11 @@
public SerialGatewaySenderEventProcessor(AbstractGatewaySender sender, String id,
ThreadsMonitoring tMonitoring, boolean cleanQueues) {
super("Event Processor for GatewaySender_" + id, sender, tMonitoring);
-
- initializeMessageQueue(id, cleanQueues);
- this.unprocessedEvents = new LinkedHashMap<EventID, EventWrapper>();
- this.unprocessedTokens = new LinkedHashMap<EventID, Long>();
+ synchronized (this.unprocessedEventsLock) {
+ initializeMessageQueue(id, cleanQueues);
+ this.unprocessedEvents = new LinkedHashMap<EventID, EventWrapper>();
+ this.unprocessedTokens = new LinkedHashMap<EventID, Long>();
+ }
}
@Override
@@ -624,6 +625,11 @@
GatewaySenderStats statistics = this.sender.getStatistics();
// Get the event from the map
synchronized (unprocessedEventsLock) {
+ // If handleFailover() acquired the lock hence double checking
+ if (this.sender.isPrimary()) {
+ // no need to do anything if we have become the primary
+ return false;
+ }
if (this.unprocessedEvents == null)
return false;
// now we can safely use the unprocessedEvents field
@@ -645,6 +651,11 @@
GatewaySenderStats statistics = this.sender.getStatistics();
// Get the event from the map
synchronized (unprocessedEventsLock) {
+ // If handleFailover() acquired the lock hence double checking
+ if (this.sender.isPrimary()) {
+ // no need to do anything if we have become the primary
+ return;
+ }
if (this.unprocessedEvents == null)
return;
// now we can safely use the unprocessedEvents field
diff --git a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderOperationsDistributedTest.java b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderOperationsDistributedTest.java
index 8ba8cf2..e3e3ede 100644
--- a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderOperationsDistributedTest.java
+++ b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderOperationsDistributedTest.java
@@ -1216,7 +1216,8 @@
await()
.untilAsserted(() -> {
assertThat(sender.getStatistics().getUnprocessedEventMapSize())
- .as("Sender statistics unprocessed event map size")
+ .as("Sender statistics unprocessed event map contents: "
+ + sender.getEventProcessor().printUnprocessedEvents())
.isEqualTo(queueSize);
});
}