GEODE-8672: No need in token mode if concurrencyChecksEnabled (#5746)

  * The DESTROYED token is only needed to prevent concurrent destroy op
    is lost in GII. If concurrency checks are enabled, the version tag
    should be able to prevent the destroy op being lost.
  * Correctly set the inTokenMode to avoid a potential hang in replicate
    region as invokeTXCallbacks needs to wait for the region initialization
    finished, but region intialization could not due to lock held by
    threads invoking callbacks.
diff --git a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/TransactionsWithGIIDistributedTest.java b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/TransactionsWithGIIDistributedTest.java
new file mode 100644
index 0000000..33a40a2
--- /dev/null
+++ b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/TransactionsWithGIIDistributedTest.java
@@ -0,0 +1,318 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache;
+
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.apache.geode.cache.RegionShortcut.PARTITION;
+import static org.apache.geode.cache.RegionShortcut.REPLICATE;
+import static org.apache.geode.test.dunit.VM.getVM;
+import static org.apache.geode.util.internal.UncheckedUtils.uncheckedCast;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.logging.log4j.Logger;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+
+import org.apache.geode.cache.CommitConflictException;
+import org.apache.geode.cache.EntryEvent;
+import org.apache.geode.cache.EntryNotFoundException;
+import org.apache.geode.cache.PartitionAttributesFactory;
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.RegionFactory;
+import org.apache.geode.cache.TransactionDataRebalancedException;
+import org.apache.geode.cache.util.CacheListenerAdapter;
+import org.apache.geode.internal.cache.control.InternalResourceManager;
+import org.apache.geode.internal.cache.versions.VersionStamp;
+import org.apache.geode.internal.cache.versions.VersionTag;
+import org.apache.geode.logging.internal.log4j.api.LogService;
+import org.apache.geode.test.awaitility.GeodeAwaitility;
+import org.apache.geode.test.dunit.AsyncInvocation;
+import org.apache.geode.test.dunit.VM;
+import org.apache.geode.test.dunit.rules.CacheRule;
+
+public class TransactionsWithGIIDistributedTest implements Serializable {
+  private static final Logger logger = LogService.getLogger();
+  private static final long TIMEOUT_SECONDS = GeodeAwaitility.getTimeout().getSeconds();
+
+  private VM vm0;
+  private VM vm1;
+  private VM vm2;
+  private VM vm3;
+  private String regionName = "region";
+  private int numOfEntry = 2500;
+  private int workers = 31;
+  private final ConcurrentLinkedQueue<Integer> queue = new ConcurrentLinkedQueue<>();
+
+  @Rule
+  public CacheRule cacheRule = new CacheRule();
+
+  @Before
+  public void setUp() {
+    vm0 = getVM(0);
+    vm1 = getVM(1);
+    vm2 = getVM(2);
+    vm3 = getVM(3);
+  }
+
+  @Test
+  public void correctVersionGeneratedForConcurrentOperationsInTxWithRebalance() throws Exception {
+    vm0.invoke(() -> createPartitionedRegion(regionName));
+    vm0.invoke(() -> doPut("A"));
+    vm0.invoke(this::doDestroy);
+    vm0.invoke(() -> doPut("B"));
+
+    vm1.invoke(() -> createPartitionedRegion(regionName));
+    doConcurrentOpsAndRebalance("C", true);
+    validateVersionsInVms(true, vm0, vm1);
+
+    vm2.invoke(() -> createPartitionedRegion(regionName));
+    doConcurrentOpsAndRebalance("D", true);
+    validateVersionsInVms(true, vm0, vm1, vm2);
+
+    vm3.invoke(() -> createPartitionedRegion(regionName));
+    doConcurrentOpsAndRebalance("E", true);
+    validateVersionsInVms(true, vm0, vm1, vm2, vm3);
+  }
+
+  private void createPartitionedRegion(String regionName) {
+    PartitionAttributesFactory partitionAttributesFactory = new PartitionAttributesFactory();
+    partitionAttributesFactory.setRedundantCopies(0);
+    partitionAttributesFactory.setRecoveryDelay(-1);
+    partitionAttributesFactory.setStartupRecoveryDelay(-1);
+    partitionAttributesFactory.setTotalNumBuckets(workers);
+
+    RegionFactory regionFactory = cacheRule.getOrCreateCache().createRegionFactory(PARTITION);
+    regionFactory.setPartitionAttributes(partitionAttributesFactory.create());
+
+    regionFactory.create(regionName);
+  }
+
+  private void doConcurrentOpsAndRebalance(String s, boolean doRebalance) throws Exception {
+    AsyncInvocation async0 = vm0.invokeAsync(this::doConcurrentDestroyInTx);
+    AsyncInvocation async1;
+    if (doRebalance) {
+      async1 = vm1.invokeAsync(() -> doConcurrentPutInTx(s));
+    } else {
+      async1 = vm0.invokeAsync(() -> doConcurrentPutInTx(s));
+    }
+    if (doRebalance) {
+      vm0.invoke(this::doRebalance);
+    }
+    async0.await();
+    async1.await();
+  }
+
+  private void doRebalance() throws TimeoutException, InterruptedException {
+    InternalResourceManager manager = cacheRule.getCache().getInternalResourceManager();
+    manager.createRebalanceFactory().includeRegions(null).excludeRegions(null)
+        .start().getResults(TIMEOUT_SECONDS, SECONDS);
+
+    assertThat(manager.getRebalanceOperations()).isEmpty();
+  }
+
+  private void doConcurrentPutInTx(String s) throws Exception {
+    for (int i = 0; i < workers; i++) {
+      queue.add(i);
+    }
+
+    ExecutorService pool = Executors.newCachedThreadPool();
+    Collection<Callable<Object>> tasks = new ArrayList<>();
+    Callable<Object> task = () -> {
+      doPutOpInTx(s);
+      return null;
+    };
+    for (int i = 0; i < workers; i++) {
+      tasks.add(task);
+    }
+
+    List<Future<Object>> futures = pool.invokeAll(tasks);
+    for (Future future : futures) {
+      future.get();
+    }
+  }
+
+  private void doConcurrentDestroyInTx() throws Exception {
+    for (int i = 0; i < workers; i++) {
+      queue.add(i);
+    }
+
+    ExecutorService pool = Executors.newCachedThreadPool();
+    Collection<Callable<Object>> tasks = new ArrayList<>();
+    Callable<Object> task = () -> {
+      doDestroyOpInTx();
+      return null;
+    };
+    for (int i = 0; i < workers; i++) {
+      tasks.add(task);
+    }
+
+    List<Future<Object>> futures = pool.invokeAll(tasks);
+    for (Future future : futures) {
+      future.get();
+    }
+  }
+
+  private void doPutOpInTx(String s) {
+    int worker;
+    if (!queue.isEmpty()) {
+      worker = queue.poll();
+      Region<Number, String> region = cacheRule.getCache().getRegion(regionName);
+      for (int i = 0; i < numOfEntry; i++) {
+        if (i % workers == worker) {
+          doTXPut(region, i, s);
+        }
+      }
+    }
+  }
+
+  private void doDestroyOpInTx() {
+    int worker;
+    if (!queue.isEmpty()) {
+      worker = queue.poll();
+      Region<Number, String> region = cacheRule.getCache().getRegion(regionName);
+      for (int i = 0; i < numOfEntry; i++) {
+        if (i % workers == worker) {
+          doTxDestroy(region, i);
+        }
+      }
+    }
+  }
+
+  private void doPut(String s) {
+    Region<Number, String> region = cacheRule.getCache().getRegion(regionName);
+    for (int i = 0; i < numOfEntry; i++) {
+      region.put(i, s);
+    }
+  }
+
+  private void doTXPut(Region<Number, String> region, int i, String s) {
+    TXManagerImpl manager = cacheRule.getCache().getTxManager();
+    manager.begin();
+    try {
+      region.put(i, s);
+      manager.commit();
+    } catch (TransactionDataRebalancedException e) {
+      if (manager.getTransactionId() != null) {
+        manager.rollback();
+      }
+    } catch (CommitConflictException ignore) {
+    }
+  }
+
+  private void doTxDestroy(Region<Number, String> region, int i) {
+    TXManagerImpl manager = cacheRule.getCache().getTxManager();
+    manager.begin();
+    try {
+      region.remove(i);
+      manager.commit();
+    } catch (TransactionDataRebalancedException e) {
+      if (manager.getTransactionId() != null) {
+        manager.rollback();
+      }
+    } catch (CommitConflictException ignore) {
+    }
+  }
+
+  private void doDestroy() {
+    Region<Number, String> region = cacheRule.getCache().getRegion(regionName);
+    for (int i = 0; i < numOfEntry; i++) {
+      try {
+        region.destroy(i);
+      } catch (EntryNotFoundException ignore) {
+      }
+    }
+  }
+
+  private void validateVersionsInVms(boolean isPartition, VM... vms) {
+    for (VM vm : vms) {
+      if (isPartition) {
+        vm.invoke(this::validateEntryVersions);
+      } else {
+        vm.invoke(this::validateEntryVersionsInReplicateRegion);
+      }
+    }
+  }
+
+  private void validateEntryVersions() {
+    PartitionedRegion region = (PartitionedRegion) cacheRule.getCache().getRegion(regionName);
+    for (int i = 0; i < numOfEntry; i++) {
+      BucketRegion bucketRegion = region.getDataStore().getLocalBucketByKey(i);
+      if (bucketRegion != null) {
+        RegionEntry entry = bucketRegion.getRegionMap().getEntry(i);
+        validateVersion(entry);
+      }
+    }
+  }
+
+  @Test
+  public void correctVersionGeneratedForConcurrentOperationsInTxInReplicateRegion()
+      throws Exception {
+    vm0.invoke(() -> createRegion(regionName));
+    vm0.invoke(() -> doPut("A"));
+    vm0.invoke(this::doDestroy);
+    vm0.invoke(() -> doPut("B"));
+    vm1.invoke(() -> {
+      cacheRule.getOrCreateCache();
+    });
+
+    AsyncInvocation async = vm1.invokeAsync(() -> createRegion(regionName));
+    doConcurrentOpsAndRebalance("C", false);
+    async.await();
+    validateVersionsInVms(false, vm0, vm1);
+  }
+
+  private void createRegion(String regionName) {
+    RegionFactory<Integer, String> regionFactory =
+        cacheRule.getOrCreateCache().createRegionFactory(REPLICATE);
+    CacheListenerAdapter<Integer, String> listener = new CacheListenerAdapter<Integer, String>() {
+      @Override
+      public void afterDestroy(EntryEvent<Integer, String> e) {
+        assertThat(e.getOperation().isEntry()).isTrue();
+      }
+    };
+    regionFactory.addCacheListener(listener).create(regionName);
+  }
+
+  private void validateEntryVersionsInReplicateRegion() {
+    LocalRegion region = uncheckedCast(cacheRule.getCache().getRegion(regionName));
+    for (int i = 0; i < numOfEntry; i++) {
+      RegionEntry entry = region.getRegionMap().getEntry(i);
+      validateVersion(entry);
+    }
+  }
+
+  private void validateVersion(RegionEntry entry) {
+    if (entry != null) {
+      VersionStamp stamp = entry.getVersionStamp();
+      VersionTag tag = stamp.asVersionTag();
+      if (tag.getEntryVersion() < 3) {
+        logger.info("tag for key {} is " + tag, entry.getKey());
+      }
+      assertThat(tag.getEntryVersion()).isGreaterThan(2);
+    }
+  }
+}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractRegionMap.java b/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractRegionMap.java
index 6e1dd75..c29e8c9 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractRegionMap.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractRegionMap.java
@@ -981,6 +981,7 @@
     final LocalRegion owner = _getOwner();
 
     final boolean isRegionReady = !inTokenMode;
+    inTokenMode = isInTokenModeNeeded(owner, inTokenMode);
     final boolean hasRemoteOrigin = !txId.getMemberId().equals(owner.getMyId());
     boolean callbackEventAddedToPending = false;
     IndexManager oqlIndexManager = owner.getIndexManager();
@@ -1080,10 +1081,9 @@
             oqlIndexManager.countDownIndexUpdaters();
           }
         }
-      } else if (inTokenMode || owner.getConcurrencyChecksEnabled()) {
+      } else if (!isRegionReady || owner.getConcurrencyChecksEnabled()) {
         // treating tokenMode and re == null as same, since we now want to
         // generate versions and Tombstones for destroys
-        boolean dispatchListenerEvent = inTokenMode;
         boolean opCompleted = false;
         RegionEntry newRe = getEntryFactory().createEntry(owner, key, Token.REMOVED_PHASE1);
         if (oqlIndexManager != null) {
@@ -1251,6 +1251,10 @@
     }
   }
 
+  boolean isInTokenModeNeeded(LocalRegion owner, boolean inTokenMode) {
+    return !owner.getConcurrencyChecksEnabled() && inTokenMode;
+  }
+
   void releaseEvent(final EntryEventImpl event) {
     event.release();
   }
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/AbstractRegionMapTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/AbstractRegionMapTest.java
index 0625d68..987effc 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/AbstractRegionMapTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/AbstractRegionMapTest.java
@@ -1534,6 +1534,33 @@
     verify(arm._getOwner()).unlockWhenRegionIsInitializing();
   }
 
+  @Test
+  public void isInTokenModeNeededReturnsFalseIfConcurrencyChecksEnabled() {
+    TestableAbstractRegionMap arm = new TestableAbstractRegionMap(true, true,
+        mock(ConcurrentMapWithReusableEntries.class), mock(RegionEntryFactory.class),
+        mock(RegionEntry.class));
+
+    assertThat(arm.isInTokenModeNeeded(arm._getOwner(), true)).isFalse();
+  }
+
+  @Test
+  public void isInTokenModeNeededReturnsFalseIfInTokenModeIsFalse() {
+    TestableAbstractRegionMap arm = new TestableAbstractRegionMap(false, true,
+        mock(ConcurrentMapWithReusableEntries.class), mock(RegionEntryFactory.class),
+        mock(RegionEntry.class));
+
+    assertThat(arm.isInTokenModeNeeded(arm._getOwner(), false)).isFalse();
+  }
+
+  @Test
+  public void isInTokenModeNeededReturnsTrueIfConcurrencyChecksNotEnabledAndInTokenMode() {
+    TestableAbstractRegionMap arm = new TestableAbstractRegionMap(false, true,
+        mock(ConcurrentMapWithReusableEntries.class), mock(RegionEntryFactory.class),
+        mock(RegionEntry.class));
+
+    assertThat(arm.isInTokenModeNeeded(arm._getOwner(), true)).isTrue();
+  }
+
   private static class TxNoRegionEntryTestableAbstractRegionMap
       extends TxTestableAbstractRegionMap {
     @Override
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/AbstractRegionMapTxApplyDestroyTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/AbstractRegionMapTxApplyDestroyTest.java
index fe41ca6..19c081e 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/AbstractRegionMapTxApplyDestroyTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/AbstractRegionMapTxApplyDestroyTest.java
@@ -240,7 +240,7 @@
   @Test
   public void txApplyDestroyHasNoPendingCallback_givenExistingRegionEntryWithInTokenModeAndNotInRI() {
     givenLocalRegion();
-    givenConcurrencyChecks();
+    givenNoConcurrencyChecks();
     givenExistingRegionEntry();
     this.inTokenMode = true;
     this.inRI = false;
@@ -351,7 +351,7 @@
   public void txApplyDestroySetsValueToDestroyToken_givenExistingRegionEntryThatIsValidWithInTokenMode()
       throws Exception {
     givenLocalRegion();
-    givenConcurrencyChecks();
+    givenNoConcurrencyChecks();
     givenExistingRegionEntry();
     inTokenMode = true;
 
@@ -364,7 +364,7 @@
   public void txApplyDestroyHandlesClear_givenExistingRegionEntryThatIsValidWithInTokenModeAndSetValueThrowsRegionClearedException()
       throws Exception {
     givenLocalRegion();
-    givenConcurrencyChecks();
+    givenNoConcurrencyChecks();
     givenExistingRegionEntry();
     inTokenMode = true;
     doThrow(RegionClearedException.class).when(existingRegionEntry).setValue(same(owner),
@@ -393,7 +393,7 @@
   @Test
   public void txApplyDestroyCallsUnscheduleTombstone_givenExistingRegionEntryThatIsTombstoneWithInTokenMode() {
     givenLocalRegion();
-    givenConcurrencyChecks();
+    givenNoConcurrencyChecks();
     givenExistingRegionEntry();
     inTokenMode = true;
     when(existingRegionEntry.getValueInVM(owner)).thenReturn(Token.TOMBSTONE);
@@ -544,13 +544,28 @@
     doTxApplyDestroy();
 
     assertThat(pendingCallbacks).hasSize(1);
-    verify(owner, times(1)).txApplyDestroyPart2(same(existingRegionEntry), eq(key), eq(inTokenMode),
+    verify(owner, times(1)).txApplyDestroyPart2(same(existingRegionEntry), eq(key),
+        eq(!inTokenMode),
         eq(false), eq(false));
   }
 
   @Test
   public void txApplyDestroyHasPendingCallback_givenExistingRemovedRegionEntryWithInTokenModeAndInRI() {
     givenLocalRegion();
+    givenNoConcurrencyChecks();
+    givenExistingRemovedRegionEntry();
+    inTokenMode = true;
+    inRI = true;
+
+    doTxApplyDestroy();
+
+    verify(owner, times(1)).txApplyDestroyPart2(same(existingRegionEntry), eq(key), eq(inTokenMode),
+        eq(false), eq(true));
+  }
+
+  @Test
+  public void txApplyDestroyHasPendingCallback_givenExistingRemovedRegionEntryWithInTokenModeAndInRIWithConcurrencyCheck() {
+    givenLocalRegion();
     givenConcurrencyChecks();
     givenExistingRemovedRegionEntry();
     inTokenMode = true;
@@ -559,7 +574,8 @@
     doTxApplyDestroy();
 
     assertThat(pendingCallbacks).hasSize(1);
-    verify(owner, times(1)).txApplyDestroyPart2(same(existingRegionEntry), eq(key), eq(inTokenMode),
+    verify(owner, times(1)).txApplyDestroyPart2(same(existingRegionEntry), eq(key),
+        eq(!inTokenMode),
         eq(false), eq(true));
   }
 
@@ -1095,7 +1111,7 @@
   @Test
   public void txApplyDestroyRetries_givenOldRegionEntryWithRemovedPhase2() {
     givenLocalRegion();
-    givenConcurrencyChecks();
+    givenNoConcurrencyChecks();
     givenOldRegionEntry();
     inTokenMode = true;
     when(oldRegionEntry.isRemovedPhase2()).thenReturn(true).thenReturn(false);
@@ -1432,7 +1448,7 @@
 
     doTxApplyDestroy();
 
-    verify(owner, times(1)).txApplyDestroyPart2(same(oldRegionEntry), eq(key), eq(inTokenMode),
+    verify(owner, times(1)).txApplyDestroyPart2(same(oldRegionEntry), eq(key), eq(!inTokenMode),
         eq(false), eq(false));
   }
 
@@ -1445,7 +1461,7 @@
 
     doTxApplyDestroy();
 
-    verify(owner, times(1)).txApplyDestroyPart2(same(oldRegionEntry), eq(key), eq(inTokenMode),
+    verify(owner, times(1)).txApplyDestroyPart2(same(oldRegionEntry), eq(key), eq(!inTokenMode),
         eq(false), eq(true));
   }
 
@@ -1517,7 +1533,7 @@
   public void txApplyDestroyNeverCallsRemoveEntry_givenOldRegionEntryAndInTokenMode()
       throws Exception {
     givenLocalRegion();
-    givenConcurrencyChecks();
+    givenNoConcurrencyChecks();
     givenOldRegionEntry();
     inTokenMode = true;