GEODE-7537: use dm.getCache instead of CacheFactory.getAnyInstance (#4486)



(cherry picked from commit 53f851c94b6ee2e62e8e16f3cd73e77cec54eb19)
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderQueueEntrySynchronizationOperation.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderQueueEntrySynchronizationOperation.java
index 08d1e5b..c13b3c5 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderQueueEntrySynchronizationOperation.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderQueueEntrySynchronizationOperation.java
@@ -27,7 +27,6 @@
 
 import org.apache.geode.DataSerializer;
 import org.apache.geode.cache.Cache;
-import org.apache.geode.cache.CacheFactory;
 import org.apache.geode.cache.wan.GatewayQueueEvent;
 import org.apache.geode.cache.wan.GatewaySender;
 import org.apache.geode.distributed.internal.ClusterDistributionManager;
@@ -41,6 +40,7 @@
 import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
 import org.apache.geode.internal.cache.GemFireCacheImpl;
 import org.apache.geode.internal.cache.InitialImageOperation;
+import org.apache.geode.internal.cache.InternalCache;
 import org.apache.geode.internal.cache.InternalRegion;
 import org.apache.geode.internal.cache.LocalRegion;
 import org.apache.geode.internal.cache.versions.VersionTag;
@@ -95,7 +95,7 @@
   }
 
   protected GemFireCacheImpl getCache() {
-    return (GemFireCacheImpl) CacheFactory.getAnyInstance();
+    return (GemFireCacheImpl) region.getDistributionManager().getCache();
   }
 
   private void initializeEntriesToSynchronize(
@@ -163,8 +163,8 @@
       }
     }
 
-    private Cache getCache() {
-      return CacheFactory.getAnyInstance();
+    Cache getCache() {
+      return dmgr.getCache();
     }
   }
 
@@ -198,7 +198,7 @@
           logger.debug("{}: Providing synchronization region={}; entriesToSynchronize={}",
               getClass().getSimpleName(), this.regionPath, this.entriesToSynchronize);
         }
-        result = getSynchronizationEvents();
+        result = getSynchronizationEvents(dm.getCache());
       } catch (Throwable t) {
         replyException = new ReplyException(t);
       } finally {
@@ -218,15 +218,14 @@
       }
     }
 
-    private Object getSynchronizationEvents() {
+    private Object getSynchronizationEvents(InternalCache cache) {
       List<Map<String, GatewayQueueEvent>> results = new ArrayList<>();
       // Get the region
-      GemFireCacheImpl gfci = (GemFireCacheImpl) getCache();
-      LocalRegion region = (LocalRegion) gfci.getRegion(this.regionPath);
+      LocalRegion region = (LocalRegion) cache.getRegion(this.regionPath);
 
       // Add the appropriate GatewaySenderEventImpl from each GatewaySender for each entry
       Set<String> allGatewaySenderIds = region.getAllGatewaySenderIds();
-      for (GatewaySender sender : gfci.getAllGatewaySenders()) {
+      for (GatewaySender sender : cache.getAllGatewaySenders()) {
         if (allGatewaySenderIds.contains(sender.getId())) {
           for (GatewaySenderQueueEntrySynchronizationEntry entry : this.entriesToSynchronize) {
             Map<String, GatewayQueueEvent> resultForOneEntry = new HashMap<>();
@@ -243,10 +242,6 @@
       return results;
     }
 
-    private Cache getCache() {
-      return CacheFactory.getAnyInstance();
-    }
-
     @Override
     public int getDSFID() {
       return GATEWAY_SENDER_QUEUE_ENTRY_SYNCHRONIZATION_MESSAGE;
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
index 35f3b20..1b26b60 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
@@ -659,9 +659,10 @@
     boolean putDone = false;
     // Can this region ever be null? Should we work with regionName and not with region
     // instance.
-    // It can't be as put is happeing on the region and its still under process
+    // It can't be as put is happening on the region and its still under process
     GatewaySenderEventImpl value = (GatewaySenderEventImpl) object;
-    boolean isDREvent = isDREvent(value);
+
+    boolean isDREvent = isDREvent(sender.getCache(), value);
 
     Region region = value.getRegion();
     String regionPath = null;
@@ -922,8 +923,9 @@
     return prQ;
   }
 
-  private boolean isDREvent(GatewaySenderEventImpl event) {
-    return (event.getRegion() instanceof DistributedRegion) ? true : false;
+  boolean isDREvent(InternalCache cache, GatewaySenderEventImpl event) {
+    Region region = cache.getRegion(event.getRegionPath());
+    return region instanceof DistributedRegion;
   }
 
   /**
@@ -1001,7 +1003,7 @@
         int bucketId = -1;
         Object key = null;
         if (event.getRegion() != null) {
-          if (isDREvent(event)) {
+          if (isDREvent(sender.getCache(), event)) {
             prQ = this.userRegionNameToshadowPRMap.get(event.getRegion().getFullPath());
             bucketId = event.getEventId().getBucketID();
             key = event.getEventId();
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/wan/GatewaySenderQueueEntrySynchronizationOperationTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/GatewaySenderQueueEntrySynchronizationOperationTest.java
new file mode 100644
index 0000000..e9f0466
--- /dev/null
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/GatewaySenderQueueEntrySynchronizationOperationTest.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache.wan;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.geode.distributed.internal.DistributionManager;
+import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
+import org.apache.geode.internal.cache.GemFireCacheImpl;
+import org.apache.geode.internal.cache.InitialImageOperation;
+import org.apache.geode.internal.cache.InternalRegion;
+
+public class GatewaySenderQueueEntrySynchronizationOperationTest {
+  private DistributionManager distributionManager;
+  private InternalDistributedMember recipient;
+  private GatewaySenderQueueEntrySynchronizationOperation operation;
+  private InternalRegion region;
+  private GemFireCacheImpl cache;
+
+  @Before
+  public void setup() {
+    distributionManager = mock(DistributionManager.class, RETURNS_DEEP_STUBS);
+    recipient = mock(InternalDistributedMember.class);
+    region = mock(InternalRegion.class);
+    cache = mock(GemFireCacheImpl.class);
+  }
+
+  @Test
+  public void ReplyProcessorGetCacheDelegateToDistributionManager() {
+    operation = mock(GatewaySenderQueueEntrySynchronizationOperation.class);
+    GatewaySenderQueueEntrySynchronizationOperation.GatewaySenderQueueEntrySynchronizationReplyProcessor processor =
+        new GatewaySenderQueueEntrySynchronizationOperation.GatewaySenderQueueEntrySynchronizationReplyProcessor(
+            distributionManager, recipient, operation);
+    when(distributionManager.getCache()).thenReturn(cache);
+
+    assertThat(processor.getCache()).isEqualTo(cache);
+  }
+
+  @Test
+  public void GatewaySenderQueueEntrySynchronizationOperationGetCacheDelegateToDistributionManager() {
+    InitialImageOperation.Entry entry = mock(InitialImageOperation.Entry.class);
+    List<InitialImageOperation.Entry> list = new ArrayList<>();
+    list.add(entry);
+    operation = new GatewaySenderQueueEntrySynchronizationOperation(recipient, region, list);
+    when(region.getDistributionManager()).thenReturn(distributionManager);
+    when(distributionManager.getCache()).thenReturn(cache);
+
+    assertThat(operation.getCache()).isEqualTo(cache);
+  }
+}
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueueJUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueueJUnitTest.java
index d03a15c..88c3e45 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueueJUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueueJUnitTest.java
@@ -14,6 +14,7 @@
  */
 package org.apache.geode.internal.cache.wan.parallel;
 
+import static org.assertj.core.api.Assertions.assertThat;
 import static org.junit.Assert.assertEquals;
 import static org.mockito.Mockito.any;
 import static org.mockito.Mockito.mock;
@@ -39,6 +40,7 @@
 import org.apache.geode.cache.Region;
 import org.apache.geode.internal.cache.AbstractBucketRegionQueue;
 import org.apache.geode.internal.cache.BucketRegionQueue;
+import org.apache.geode.internal.cache.DistributedRegion;
 import org.apache.geode.internal.cache.GemFireCacheImpl;
 import org.apache.geode.internal.cache.PartitionedRegion;
 import org.apache.geode.internal.cache.PartitionedRegionDataStore;
@@ -155,6 +157,32 @@
     assertEquals(3, queue.localSize());
   }
 
+  @Test
+  public void isDREventReturnsTrueForDistributedRegionEvent() {
+    String regionPath = "regionPath";
+    GatewaySenderEventImpl event = mock(GatewaySenderEventImpl.class);
+    when(event.getRegionPath()).thenReturn(regionPath);
+    DistributedRegion region = mock(DistributedRegion.class);
+    when(cache.getRegion(regionPath)).thenReturn(region);
+    ParallelGatewaySenderQueue queue = mock(ParallelGatewaySenderQueue.class);
+    when(queue.isDREvent(cache, event)).thenCallRealMethod();
+
+    assertThat(queue.isDREvent(cache, event)).isTrue();
+  }
+
+  @Test
+  public void isDREventReturnsFalseForPartitionedRegionEvent() {
+    String regionPath = "regionPath";
+    GatewaySenderEventImpl event = mock(GatewaySenderEventImpl.class);
+    when(event.getRegionPath()).thenReturn(regionPath);
+    PartitionedRegion region = mock(PartitionedRegion.class);
+    when(cache.getRegion(regionPath)).thenReturn(region);
+    ParallelGatewaySenderQueue queue = mock(ParallelGatewaySenderQueue.class);
+    when(queue.isDREvent(cache, event)).thenCallRealMethod();
+
+    assertThat(queue.isDREvent(cache, event)).isFalse();
+  }
+
   private PartitionedRegion mockPR(String name) {
     PartitionedRegion region = mock(PartitionedRegion.class);
     when(region.getFullPath()).thenReturn(name);