GEODE-8833: Set possible dupliate flag for persistent region (#5904)
For persistent region, even though a version tag is not recovered, the possibleDuplicate
flag is still needed to be set for the client retry message.
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/Put65.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/Put65.java
index 5e1a3de..db8ef00 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/Put65.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/Put65.java
@@ -229,16 +229,8 @@
// msg.isRetry might be set by v7.0 and later clients
if (clientMessage.isRetry()) {
- // if (logger.isDebugEnabled()) {
- // logger.debug("DEBUG: encountered isRetry in Put65");
- // }
- clientEvent.setPossibleDuplicate(true);
- if (region.getAttributes().getConcurrencyChecksEnabled()) {
- // recover the version tag from other servers
- clientEvent.setRegion(region);
- if (!recoverVersionTagForRetriedOperation(clientEvent)) {
- clientEvent.setPossibleDuplicate(false); // no-one has seen this event
- }
+ if (shouldSetPossibleDuplicate(region, clientEvent)) {
+ clientEvent.setPossibleDuplicate(true);
}
}
@@ -498,6 +490,22 @@
}
+ boolean shouldSetPossibleDuplicate(LocalRegion region, EventIDHolder clientEvent) {
+ boolean shouldSetPossibleDuplicate = true;
+ if (region.getAttributes().getConcurrencyChecksEnabled()) {
+ // recover the version tag from other servers
+ clientEvent.setRegion(region);
+ boolean withPersistence = region.getAttributes().getDataPolicy().withPersistence();
+ if (!recoverVersionTagForRetriedOperation(clientEvent) && !withPersistence) {
+ // For persistent region, it is possible that all persistent copies went offline.
+ // Do not reset possible duplicate in this case, as persistent data
+ // can be recovered during the retry after recover of version tag failed.
+ shouldSetPossibleDuplicate = false; // no-one has seen this event
+ }
+ }
+ return shouldSetPossibleDuplicate;
+ }
+
protected void writeReply(Message origMsg, ServerConnection servConn, boolean sendOldValue,
boolean oldValueIsObject, Object oldValue, VersionTag tag) throws IOException {
Message replyMsg = servConn.getReplyMessage();
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/command/Put65Test.java b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/command/Put65Test.java
index 90d786d..a1f59f6 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/command/Put65Test.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/command/Put65Test.java
@@ -18,6 +18,7 @@
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Matchers.eq;
import static org.mockito.Matchers.isA;
+import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
@@ -29,10 +30,14 @@
import org.mockito.ArgumentCaptor;
import org.mockito.InjectMocks;
import org.mockito.Mock;
+import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;
import org.apache.geode.CancelCriterion;
+import org.apache.geode.cache.DataPolicy;
+import org.apache.geode.cache.RegionAttributes;
import org.apache.geode.cache.operations.PutOperationContext;
+import org.apache.geode.internal.cache.EventIDHolder;
import org.apache.geode.internal.cache.InternalCache;
import org.apache.geode.internal.cache.LocalRegion;
import org.apache.geode.internal.cache.TXManagerImpl;
@@ -97,6 +102,12 @@
private Message errorResponseMessage;
@Mock
private Message replyMessage;
+ @Mock
+ private RegionAttributes attributes;
+ @Mock
+ private EventIDHolder clientEvent;
+ @Mock
+ private DataPolicy dataPolicy;
@InjectMocks
private Put65 put65;
@@ -157,6 +168,9 @@
when(this.valuePart.getSerializedForm()).thenReturn(VALUE);
when(this.valuePart.isObject()).thenReturn(true);
+
+ when(localRegion.getAttributes()).thenReturn(attributes);
+ when(attributes.getDataPolicy()).thenReturn(dataPolicy);
}
@Test
@@ -255,4 +269,41 @@
verify(this.errorResponseMessage).send(this.serverConnection);
}
+ @Test
+ public void shouldSetPossibleDuplicateReturnsTrueIfConcurrencyChecksNotEnabled() {
+
+ when(attributes.getConcurrencyChecksEnabled()).thenReturn(false);
+
+ assertThat(put65.shouldSetPossibleDuplicate(localRegion, clientEvent)).isTrue();
+ }
+
+ @Test
+ public void shouldSetPossibleDuplicateReturnsTrueIfRecoveredVersionTagForRetriedOperation() {
+ Put65 spy = Mockito.spy(put65);
+ when(attributes.getConcurrencyChecksEnabled()).thenReturn(true);
+ doReturn(true).when(spy).recoverVersionTagForRetriedOperation(clientEvent);
+
+ assertThat(spy.shouldSetPossibleDuplicate(localRegion, clientEvent)).isTrue();
+ }
+
+ @Test
+ public void shouldSetPossibleDuplicateReturnsFalseIfNotRecoveredVersionTagAndNoPersistence() {
+ Put65 spy = Mockito.spy(put65);
+ when(attributes.getConcurrencyChecksEnabled()).thenReturn(true);
+ when(dataPolicy.withPersistence()).thenReturn(false);
+ doReturn(false).when(spy).recoverVersionTagForRetriedOperation(clientEvent);
+
+ assertThat(spy.shouldSetPossibleDuplicate(localRegion, clientEvent)).isFalse();
+ }
+
+ @Test
+ public void shouldSetPossibleDuplicateReturnsTrueIfNotRecoveredVersionTagAndWithPersistence() {
+ Put65 spy = Mockito.spy(put65);
+ when(attributes.getConcurrencyChecksEnabled()).thenReturn(true);
+ when(dataPolicy.withPersistence()).thenReturn(true);
+ doReturn(false).when(spy).recoverVersionTagForRetriedOperation(clientEvent);
+
+ assertThat(spy.shouldSetPossibleDuplicate(localRegion, clientEvent)).isTrue();
+ }
+
}