GEODE-8833: Set possible dupliate flag for persistent region (#5904)

For persistent region, even though a version tag is not recovered, the possibleDuplicate
  flag is still needed to be set for the client retry message.
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/Put65.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/Put65.java
index 5e1a3de..db8ef00 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/Put65.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/Put65.java
@@ -229,16 +229,8 @@
 
     // msg.isRetry might be set by v7.0 and later clients
     if (clientMessage.isRetry()) {
-      // if (logger.isDebugEnabled()) {
-      // logger.debug("DEBUG: encountered isRetry in Put65");
-      // }
-      clientEvent.setPossibleDuplicate(true);
-      if (region.getAttributes().getConcurrencyChecksEnabled()) {
-        // recover the version tag from other servers
-        clientEvent.setRegion(region);
-        if (!recoverVersionTagForRetriedOperation(clientEvent)) {
-          clientEvent.setPossibleDuplicate(false); // no-one has seen this event
-        }
+      if (shouldSetPossibleDuplicate(region, clientEvent)) {
+        clientEvent.setPossibleDuplicate(true);
       }
     }
 
@@ -498,6 +490,22 @@
 
   }
 
+  boolean shouldSetPossibleDuplicate(LocalRegion region, EventIDHolder clientEvent) {
+    boolean shouldSetPossibleDuplicate = true;
+    if (region.getAttributes().getConcurrencyChecksEnabled()) {
+      // recover the version tag from other servers
+      clientEvent.setRegion(region);
+      boolean withPersistence = region.getAttributes().getDataPolicy().withPersistence();
+      if (!recoverVersionTagForRetriedOperation(clientEvent) && !withPersistence) {
+        // For persistent region, it is possible that all persistent copies went offline.
+        // Do not reset possible duplicate in this case, as persistent data
+        // can be recovered during the retry after recover of version tag failed.
+        shouldSetPossibleDuplicate = false; // no-one has seen this event
+      }
+    }
+    return shouldSetPossibleDuplicate;
+  }
+
   protected void writeReply(Message origMsg, ServerConnection servConn, boolean sendOldValue,
       boolean oldValueIsObject, Object oldValue, VersionTag tag) throws IOException {
     Message replyMsg = servConn.getReplyMessage();
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/command/Put65Test.java b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/command/Put65Test.java
index 90d786d..a1f59f6 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/command/Put65Test.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/command/Put65Test.java
@@ -18,6 +18,7 @@
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Matchers.eq;
 import static org.mockito.Matchers.isA;
+import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.verify;
@@ -29,10 +30,14 @@
 import org.mockito.ArgumentCaptor;
 import org.mockito.InjectMocks;
 import org.mockito.Mock;
+import org.mockito.Mockito;
 import org.mockito.MockitoAnnotations;
 
 import org.apache.geode.CancelCriterion;
+import org.apache.geode.cache.DataPolicy;
+import org.apache.geode.cache.RegionAttributes;
 import org.apache.geode.cache.operations.PutOperationContext;
+import org.apache.geode.internal.cache.EventIDHolder;
 import org.apache.geode.internal.cache.InternalCache;
 import org.apache.geode.internal.cache.LocalRegion;
 import org.apache.geode.internal.cache.TXManagerImpl;
@@ -97,6 +102,12 @@
   private Message errorResponseMessage;
   @Mock
   private Message replyMessage;
+  @Mock
+  private RegionAttributes attributes;
+  @Mock
+  private EventIDHolder clientEvent;
+  @Mock
+  private DataPolicy dataPolicy;
 
   @InjectMocks
   private Put65 put65;
@@ -157,6 +168,9 @@
 
     when(this.valuePart.getSerializedForm()).thenReturn(VALUE);
     when(this.valuePart.isObject()).thenReturn(true);
+
+    when(localRegion.getAttributes()).thenReturn(attributes);
+    when(attributes.getDataPolicy()).thenReturn(dataPolicy);
   }
 
   @Test
@@ -255,4 +269,41 @@
     verify(this.errorResponseMessage).send(this.serverConnection);
   }
 
+  @Test
+  public void shouldSetPossibleDuplicateReturnsTrueIfConcurrencyChecksNotEnabled() {
+
+    when(attributes.getConcurrencyChecksEnabled()).thenReturn(false);
+
+    assertThat(put65.shouldSetPossibleDuplicate(localRegion, clientEvent)).isTrue();
+  }
+
+  @Test
+  public void shouldSetPossibleDuplicateReturnsTrueIfRecoveredVersionTagForRetriedOperation() {
+    Put65 spy = Mockito.spy(put65);
+    when(attributes.getConcurrencyChecksEnabled()).thenReturn(true);
+    doReturn(true).when(spy).recoverVersionTagForRetriedOperation(clientEvent);
+
+    assertThat(spy.shouldSetPossibleDuplicate(localRegion, clientEvent)).isTrue();
+  }
+
+  @Test
+  public void shouldSetPossibleDuplicateReturnsFalseIfNotRecoveredVersionTagAndNoPersistence() {
+    Put65 spy = Mockito.spy(put65);
+    when(attributes.getConcurrencyChecksEnabled()).thenReturn(true);
+    when(dataPolicy.withPersistence()).thenReturn(false);
+    doReturn(false).when(spy).recoverVersionTagForRetriedOperation(clientEvent);
+
+    assertThat(spy.shouldSetPossibleDuplicate(localRegion, clientEvent)).isFalse();
+  }
+
+  @Test
+  public void shouldSetPossibleDuplicateReturnsTrueIfNotRecoveredVersionTagAndWithPersistence() {
+    Put65 spy = Mockito.spy(put65);
+    when(attributes.getConcurrencyChecksEnabled()).thenReturn(true);
+    when(dataPolicy.withPersistence()).thenReturn(true);
+    doReturn(false).when(spy).recoverVersionTagForRetriedOperation(clientEvent);
+
+    assertThat(spy.shouldSetPossibleDuplicate(localRegion, clientEvent)).isTrue();
+  }
+
 }