GEODE-8672: No need in token mode if concurrencyChecksEnabled (#5691)

  * The DESTROYED token is only needed to prevent concurrent destroy op
    is lost in GII. If concurrency checks are enabled, the version tag
    should be able to prevent the destroy op being lost.

diff --git a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/control/RebalanceOperationDistributedTest.java b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/control/RebalanceOperationDistributedTest.java
index 3592003..605547b 100644
--- a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/control/RebalanceOperationDistributedTest.java
+++ b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/control/RebalanceOperationDistributedTest.java
@@ -39,6 +39,7 @@
 import static org.mockito.Mockito.spy;
 
 import java.io.File;
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
@@ -50,9 +51,14 @@
 import java.util.Set;
 import java.util.TreeSet;
 import java.util.concurrent.BrokenBarrierException;
+import java.util.concurrent.Callable;
 import java.util.concurrent.CancellationException;
+import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
@@ -70,13 +76,16 @@
 import org.apache.geode.cache.CacheLoader;
 import org.apache.geode.cache.CacheLoaderException;
 import org.apache.geode.cache.CacheWriterException;
+import org.apache.geode.cache.CommitConflictException;
 import org.apache.geode.cache.DiskStoreFactory;
 import org.apache.geode.cache.EntryEvent;
+import org.apache.geode.cache.EntryNotFoundException;
 import org.apache.geode.cache.EvictionAttributes;
 import org.apache.geode.cache.LoaderHelper;
 import org.apache.geode.cache.PartitionAttributesFactory;
 import org.apache.geode.cache.Region;
 import org.apache.geode.cache.RegionFactory;
+import org.apache.geode.cache.TransactionDataRebalancedException;
 import org.apache.geode.cache.asyncqueue.AsyncEvent;
 import org.apache.geode.cache.asyncqueue.AsyncEventListener;
 import org.apache.geode.cache.control.RebalanceOperation;
@@ -98,10 +107,14 @@
 import org.apache.geode.internal.cache.PRHARedundancyProvider;
 import org.apache.geode.internal.cache.PartitionedRegion;
 import org.apache.geode.internal.cache.PartitionedRegionDataStore;
+import org.apache.geode.internal.cache.RegionEntry;
 import org.apache.geode.internal.cache.SignalBounceOnRequestImageMessageObserver;
+import org.apache.geode.internal.cache.TXManagerImpl;
 import org.apache.geode.internal.cache.control.InternalResourceManager.ResourceObserverAdapter;
 import org.apache.geode.internal.cache.partitioned.BucketCountLoadProbe;
 import org.apache.geode.internal.cache.partitioned.LoadProbe;
+import org.apache.geode.internal.cache.versions.VersionStamp;
+import org.apache.geode.internal.cache.versions.VersionTag;
 import org.apache.geode.test.awaitility.GeodeAwaitility;
 import org.apache.geode.test.dunit.AsyncInvocation;
 import org.apache.geode.test.dunit.SerializableRunnable;
@@ -125,6 +138,12 @@
 
   private static final long TIMEOUT_SECONDS = GeodeAwaitility.getTimeout().getSeconds();
 
+  private VM vm0 = getVM(0);
+  private VM vm1 = getVM(1);
+  private VM vm2 = getVM(2);
+  private VM vm3 = getVM(3);
+  private boolean toSetBucketNumber = false;
+
   @Rule
   public DistributedRestoreSystemProperties restoreSystemProperties =
       new DistributedRestoreSystemProperties();
@@ -144,9 +163,6 @@
   @Parameters({"true", "false"})
   @TestCaseName("{method}(simulate={0})")
   public void testRecoverRedundancy(boolean simulate) {
-    VM vm0 = getVM(0);
-    VM vm1 = getVM(1);
-
     // Create the region in only 1 VM
     vm0.invoke(() -> createPartitionedRegion("region1", 1));
 
@@ -215,9 +231,6 @@
   @Parameters({"true", "false"})
   @TestCaseName("{method}(simulate={0})")
   public void testEnforceIP(boolean simulate) {
-    VM vm0 = getVM(0);
-    VM vm1 = getVM(1);
-
     for (VM vm : toArray(vm0, vm1)) {
       vm.invoke(() -> {
         Properties props = new Properties();
@@ -286,10 +299,6 @@
   @Parameters({"true", "false"})
   @TestCaseName("{method}(simulate={0})")
   public void testEnforceZone(boolean simulate) {
-    VM vm0 = getVM(0);
-    VM vm1 = getVM(1);
-    VM vm2 = getVM(2);
-
     vm0.invoke(() -> setRedundancyZone("A"));
     vm1.invoke(() -> setRedundancyZone("A"));
 
@@ -371,10 +380,6 @@
 
   @Test
   public void testEnforceZoneWithMultipleRegions() {
-    VM vm0 = getVM(0);
-    VM vm1 = getVM(1);
-    VM vm2 = getVM(2);
-
     vm0.invoke(() -> setRedundancyZone("A"));
     vm1.invoke(() -> setRedundancyZone("A"));
 
@@ -463,10 +468,6 @@
   @Parameters({"true", "false"})
   @TestCaseName("{method}(simulate={0})")
   public void testRecoverRedundancyBalancing(boolean simulate) {
-    VM vm0 = getVM(0);
-    VM vm1 = getVM(1);
-    VM vm2 = getVM(2);
-
     DistributedMember member1 = vm0.invoke(() -> {
       createPartitionedRegion("region1", 1, 200);
       return getCache().getDistributedSystem().getDistributedMember();
@@ -534,10 +535,6 @@
 
   @Test
   public void testRecoverRedundancyBalancingIfCreateBucketFails() {
-    VM vm0 = getVM(0);
-    VM vm1 = getVM(1);
-    VM vm2 = getVM(2);
-
     DistributedMember member1 = vm0.invoke(() -> {
       createPartitionedRegion("region1", 1, 100);
       return getCache().getDistributedSystem().getDistributedMember();
@@ -644,10 +641,6 @@
   @Parameters({"true", "false"})
   @TestCaseName("{method}(simulate={0})")
   public void testRecoverRedundancyColocatedRegions(boolean simulate) {
-    VM vm0 = getVM(0);
-    VM vm1 = getVM(1);
-    VM vm2 = getVM(2);
-
     DistributedMember member1 = vm0.invoke(() -> {
       createPartitionedRegion("region1", 1, 200);
       return getCache().getDistributedSystem().getDistributedMember();
@@ -751,10 +744,6 @@
           () -> System.setProperty(GeodeGlossary.GEMFIRE_PREFIX + "LOG_REBALANCE", "true"));
     }
 
-    VM vm0 = getVM(0);
-    VM vm1 = getVM(1);
-    VM vm2 = getVM(2);
-
     DistributedMember member1 = vm0.invoke(() -> {
       createPRRegionWithAsyncQueue(200);
       return getCache().getDistributedSystem().getDistributedMember();
@@ -857,9 +846,6 @@
 
   @Test
   public void testCancelOperation() {
-    VM vm0 = getVM(0);
-    VM vm1 = getVM(1);
-
     // Create the region in only 1 VM
     vm0.invoke(() -> createPartitionedRegion("region1", 1));
 
@@ -935,10 +921,6 @@
    */
   @Test
   public void testMembershipChange() {
-    VM vm0 = getVM(0);
-    VM vm1 = getVM(1);
-    VM vm2 = getVM(2);
-
     // Create the region in only 1 VM
     vm0.invoke(() -> createPartitionedRegion("region1", 0));
 
@@ -1018,9 +1000,6 @@
   @Parameters({"true", "false"})
   @TestCaseName("{method}(simulate={0})")
   public void testMoveBucketsNoRedundancy(boolean simulate) {
-    VM vm0 = getVM(0);
-    VM vm1 = getVM(1);
-
     // Create the region in only 1 VM
     vm0.invoke(() -> createPartitionedRegion("region1", new NullReturningLoader()));
 
@@ -1116,9 +1095,6 @@
   @Parameters({"true", "false"})
   @TestCaseName("{method}(simulate={0})")
   public void testFilterRegions(boolean simulate) {
-    VM vm0 = getVM(0);
-    VM vm1 = getVM(1);
-
     Set<String> included = new HashSet<>();
     included.add("region0");
     included.add("region1");
@@ -1238,10 +1214,6 @@
   @Parameters({"true", "false"})
   @TestCaseName("{method}(simulate={0})")
   public void testMoveBucketsWithRedundancy(boolean simulate) {
-    VM vm0 = getVM(0);
-    VM vm1 = getVM(1);
-    VM vm2 = getVM(2);
-
     // Create the region in two VMs
     vm0.invoke(() -> createPartitionedRegion("region1", 1));
     vm1.invoke(() -> createPartitionedRegion("region1", 1));
@@ -1332,11 +1304,6 @@
    */
   @Test
   public void testMoveBucketsOverflowToDisk() {
-    VM vm0 = getVM(0);
-    VM vm1 = getVM(1);
-    VM vm2 = getVM(2);
-    VM vm3 = getVM(3);
-
     // Create the region in two VMs
     vm0.invoke(
         () -> createPartitionedRegion("region1", createLRUEntryAttributes(1, OVERFLOW_TO_DISK)));
@@ -1501,10 +1468,6 @@
    */
   @Test
   public void testMoveBucketsNestedPR() {
-    VM vm0 = getVM(0);
-    VM vm1 = getVM(1);
-    VM vm2 = getVM(2);
-
     // Create the region in two VMs
     for (VM vm : toArray(vm0, vm1)) {
       vm.invoke(() -> {
@@ -1597,10 +1560,6 @@
   @Parameters({"true", "false"})
   @TestCaseName("{method}(simulate={0})")
   public void testMoveBucketsColocatedRegions(boolean simulate) {
-    VM vm0 = getVM(0);
-    VM vm1 = getVM(1);
-    VM vm2 = getVM(2);
-
     vm0.invoke(() -> createPartitionedRegion("region1", 1, 200));
     vm0.invoke(() -> createPartitionedRegion("region2", 200, "region1"));
     vm1.invoke(() -> createPartitionedRegion("region1", 1, 200));
@@ -1707,8 +1666,6 @@
   @Parameters({"PUT", "INVALIDATE", "DESTROY", "CACHE_LOADER"})
   @TestCaseName("{method}(operation={0})")
   public void runTestWaitForOperation(OperationEnum operation) throws Exception {
-    VM vm1 = getVM(1);
-
     // Create a region in this VM with a cache writer
     // and cache loader
     createPartitionedRegion("region1", 1, 100, (CacheLoader<Number, String>) helper1 -> "anobject");
@@ -1780,11 +1737,6 @@
   @TestCaseName("{method}(simulate={0}, userAccessor={1})")
   public void testRecoverRedundancyWithOfflinePersistence(boolean simulate, boolean useAccessor)
       throws Exception {
-    VM vm0 = getVM(0);
-    VM vm1 = getVM(1);
-    VM vm2 = getVM(2);
-    VM vm3 = getVM(3);
-
     int redundantCopies = 1;
 
     // Create the region in only 2 VMs
@@ -1967,9 +1919,6 @@
   @Parameters({"true", "false"})
   @TestCaseName("{method}(simulate={0})")
   public void testMoveBucketsWithUnrecoveredValues(boolean simulate) {
-    VM vm0 = getVM(0);
-    VM vm1 = getVM(1);
-
     // Create the region in only 1 VM
     vm0.invoke(() -> createPersistentPartitionedRegion("region1", "store", getDiskDirs(),
         new NullReturningLoader<>()));
@@ -2102,9 +2051,6 @@
   @Parameters({"true", "false"})
   @TestCaseName("{method}(simulate={0})")
   public void testBalanceBucketsByCount(boolean simulate) {
-    VM vm0 = getVM(0);
-    VM vm1 = getVM(1);
-
     // Cache is closed so loadProbeToRestore does not need to be restored
     vm0.invoke(
         () -> {
@@ -2356,6 +2302,9 @@
     partitionAttributesFactory.setRedundantCopies(redundantCopies);
     partitionAttributesFactory.setRecoveryDelay(-1);
     partitionAttributesFactory.setStartupRecoveryDelay(-1);
+    if (toSetBucketNumber) {
+      partitionAttributesFactory.setTotalNumBuckets(totalNumberOfBuckets);
+    }
 
     RegionFactory regionFactory = getCache().createRegionFactory(PARTITION);
     regionFactory.setPartitionAttributes(partitionAttributesFactory.create());
@@ -2834,4 +2783,180 @@
       closed = true;
     }
   }
+
+  private String regionName = "region";
+  private int numOfEntry = 2500;
+  private int totalNumberOfBuckets = 31;
+  private final ConcurrentLinkedQueue<Integer> queue = new ConcurrentLinkedQueue<>();
+
+  @Test
+  public void correctVersionGeneratedForConcurrentOperationsInTxWithRebalance() throws Exception {
+    toSetBucketNumber = true;
+    vm0.invoke(() -> createPartitionedRegion(regionName, 0));
+    vm0.invoke(() -> doPut("A"));
+    vm0.invoke(this::doDestroy);
+    vm0.invoke(() -> doPut("B"));
+
+    vm1.invoke(() -> createPartitionedRegion(regionName, 0));
+    doConcurrentOpsAndRebalance("C");
+    validateVersionsInVms(vm0, vm1);
+
+    vm2.invoke(() -> createPartitionedRegion(regionName, 0));
+    doConcurrentOpsAndRebalance("D");
+    validateVersionsInVms(vm0, vm1, vm2);
+
+    vm3.invoke(() -> createPartitionedRegion(regionName, 0));
+    doConcurrentOpsAndRebalance("E");
+    validateVersionsInVms(vm0, vm1, vm2, vm3);
+  }
+
+  private void doConcurrentOpsAndRebalance(String s) throws Exception {
+    AsyncInvocation async0 = vm0.invokeAsync(this::doConcurrentDestroyInTx);
+    AsyncInvocation async1 = vm1.invokeAsync(() -> doConcurrentPutInTx(s));
+    vm0.invoke(() -> doRebalance());
+    async0.await();
+    async1.await();
+  }
+
+  private void doRebalance() throws TimeoutException, InterruptedException {
+    InternalResourceManager manager = getCache().getInternalResourceManager();
+    doRebalance(false, manager);
+  }
+
+  private void doConcurrentPutInTx(String s) throws Exception {
+    for (int i = 0; i < totalNumberOfBuckets; i++) {
+      queue.add(i);
+    }
+
+    ExecutorService pool = Executors.newCachedThreadPool();
+    Collection<Callable<Object>> tasks = new ArrayList<>();
+    Callable<Object> task = () -> {
+      doPutOpInTx(s);
+      return null;
+    };
+    for (int i = 0; i < totalNumberOfBuckets; i++) {
+      tasks.add(task);
+    }
+
+    List<Future<Object>> futures = pool.invokeAll(tasks);
+    for (Future future : futures) {
+      future.get();
+    }
+  }
+
+  private void doConcurrentDestroyInTx() throws Exception {
+    for (int i = 0; i < totalNumberOfBuckets; i++) {
+      queue.add(i);
+    }
+
+    ExecutorService pool = Executors.newCachedThreadPool();
+    Collection<Callable<Object>> tasks = new ArrayList<>();
+    Callable<Object> task = () -> {
+      doDestroyOpInTx();
+      return null;
+    };
+    for (int i = 0; i < totalNumberOfBuckets; i++) {
+      tasks.add(task);
+    }
+
+    List<Future<Object>> futures = pool.invokeAll(tasks);
+    for (Future future : futures) {
+      future.get();
+    }
+  }
+
+  private void doPutOpInTx(String s) {
+    int bucket;
+    if (!queue.isEmpty()) {
+      bucket = queue.poll();
+      Region<Number, String> region = getCache().getRegion(regionName);
+      for (int i = 0; i < numOfEntry; i++) {
+        if (i % totalNumberOfBuckets == bucket) {
+          doTXPut(region, i, s);
+        }
+      }
+    }
+  }
+
+  private void doDestroyOpInTx() {
+    int bucket;
+    if (!queue.isEmpty()) {
+      bucket = queue.poll();
+      Region<Number, String> region = getCache().getRegion(regionName);
+      for (int i = 0; i < numOfEntry; i++) {
+        if (i % totalNumberOfBuckets == bucket) {
+          doTxDestroy(region, i);
+        }
+      }
+    }
+  }
+
+  private void doPut(String s) {
+    Region<Number, String> region = getCache().getRegion(regionName);
+    for (int i = 0; i < numOfEntry; i++) {
+      region.put(i, s);
+    }
+  }
+
+  private void doTXPut(Region<Number, String> region, int i, String s) {
+    TXManagerImpl manager = getCache().getTxManager();
+    manager.begin();
+    try {
+      region.put(i, s);
+      manager.commit();
+    } catch (TransactionDataRebalancedException e) {
+      if (manager.getTransactionId() != null) {
+        manager.rollback();
+      }
+    } catch (CommitConflictException ignore) {
+    }
+  }
+
+  private void doTxDestroy(Region<Number, String> region, int i) {
+    TXManagerImpl manager = getCache().getTxManager();
+    manager.begin();
+    try {
+      region.remove(i);
+      manager.commit();
+    } catch (TransactionDataRebalancedException e) {
+      if (manager.getTransactionId() != null) {
+        manager.rollback();
+      }
+    } catch (CommitConflictException ignore) {
+    }
+  }
+
+  private void doDestroy() {
+    Region<Number, String> region = getCache().getRegion(regionName);
+    for (int i = 0; i < numOfEntry; i++) {
+      try {
+        region.destroy(i);
+      } catch (EntryNotFoundException ignore) {
+      }
+    }
+  }
+
+  private void validateVersionsInVms(VM... vms) {
+    for (VM vm : vms) {
+      vm.invoke(this::validateEntryVersions);
+    }
+  }
+
+  private void validateEntryVersions() {
+    PartitionedRegion region = (PartitionedRegion) getCache().getRegion(regionName);
+    for (int i = 0; i < numOfEntry; i++) {
+      BucketRegion bucketRegion = region.getDataStore().getLocalBucketByKey(i);
+      if (bucketRegion != null) {
+        RegionEntry entry = bucketRegion.getRegionMap().getEntry(i);
+        if (entry != null) {
+          VersionStamp stamp = entry.getVersionStamp();
+          VersionTag tag = stamp.asVersionTag();
+          if (tag.getEntryVersion() < 3) {
+            logger.info("tag for key {} is " + tag, i);
+          }
+          assertThat(tag.getEntryVersion()).isGreaterThan(2);
+        }
+      }
+    }
+  }
 }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java
index 61328f3..017329c 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java
@@ -6836,7 +6836,7 @@
     final boolean needRIDestroyToken = inRI && riCnt > 0;
 
     try {
-      final boolean inTokenMode = needTokensForGII || needRIDestroyToken;
+      final boolean inTokenMode = isInTokenModeNeeded(needTokensForGII, needRIDestroyToken);
       entries.txApplyDestroy(key, rmtOrigin, event, inTokenMode, needRIDestroyToken, op,
           eventId, aCallbackArgument, pendingCallbacks, filterRoutingInfo, bridgeContext,
           isOriginRemote, txEntryState, versionTag, tailKey);
@@ -6847,6 +6847,10 @@
     }
   }
 
+  boolean isInTokenModeNeeded(boolean needTokensForGII, boolean needRIDestroyToken) {
+    return !getConcurrencyChecksEnabled() && (needTokensForGII || needRIDestroyToken);
+  }
+
   /**
    * Called by lower levels, while still holding the write sync lock, and the low level has
    * completed its part of the basic destroy
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/LocalRegionTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/LocalRegionTest.java
index dd9cce1..60e8454 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/LocalRegionTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/LocalRegionTest.java
@@ -277,4 +277,45 @@
 
     assertThat(region.isGenerateLocalFilterRoutingNeeded(event)).isFalse();
   }
+
+  @Test
+  public void isInTokenModeNeededReturnsFalseIfConcurrencyChecksEnabled() {
+    LocalRegion region =
+        spy(new LocalRegion("region", regionAttributes, null, cache, internalRegionArguments,
+            internalDataView, regionMapConstructor, serverRegionProxyConstructor, entryEventFactory,
+            poolFinder, regionPerfStatsFactory, disabledClock()));
+    doReturn(true).when(region).getConcurrencyChecksEnabled();
+
+    assertThat(region.isInTokenModeNeeded(true, true)).isFalse();
+  }
+
+  @Test
+  public void isInTokenModeNeededReturnsFalseIfBothNeedTokensForGIIAndNeedRIDestroyTokenAreFalse() {
+    LocalRegion region =
+        spy(new LocalRegion("region", regionAttributes, null, cache, internalRegionArguments,
+            internalDataView, regionMapConstructor, serverRegionProxyConstructor, entryEventFactory,
+            poolFinder, regionPerfStatsFactory, disabledClock()));
+
+    assertThat(region.isInTokenModeNeeded(false, false)).isFalse();
+  }
+
+  @Test
+  public void isInTokenModeNeededReturnsTrueIfConcurrencyChecksNotEnabledAndNeedTokensForGII() {
+    LocalRegion region =
+        spy(new LocalRegion("region", regionAttributes, null, cache, internalRegionArguments,
+            internalDataView, regionMapConstructor, serverRegionProxyConstructor, entryEventFactory,
+            poolFinder, regionPerfStatsFactory, disabledClock()));
+
+    assertThat(region.isInTokenModeNeeded(true, false)).isTrue();
+  }
+
+  @Test
+  public void isInTokenModeNeededReturnsTrueIfConcurrencyChecksNotEnabledAndNeedRIDestroyToken() {
+    LocalRegion region =
+        spy(new LocalRegion("region", regionAttributes, null, cache, internalRegionArguments,
+            internalDataView, regionMapConstructor, serverRegionProxyConstructor, entryEventFactory,
+            poolFinder, regionPerfStatsFactory, disabledClock()));
+
+    assertThat(region.isInTokenModeNeeded(false, true)).isTrue();
+  }
 }