GEODE-7273: Able to detect not colocated transaction (#4170)

* GEODE-7273: Able to detect not colocated transaction

  * Able to detect not colocated transaction if it is caused by the first operation on a
    replicate region and then on a partitioned region.
  * Make sure transaction host can detect this and throw TransactionDataNotColocatedException.
  * Transaction host will throw appropriate TransactionException based on the operations
    exectued.
  * Remote non host will rely on the TransactionException thrown from the tx host.
diff --git a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/ClientServerNotColocatedTransactionDistributedTest.java b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/ClientServerNotColocatedTransactionDistributedTest.java
index 906a47c..d55b31e 100644
--- a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/ClientServerNotColocatedTransactionDistributedTest.java
+++ b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/ClientServerNotColocatedTransactionDistributedTest.java
@@ -34,8 +34,10 @@
 import org.apache.geode.cache.Region;
 import org.apache.geode.cache.RegionShortcut;
 import org.apache.geode.cache.TransactionDataNotColocatedException;
+import org.apache.geode.cache.TransactionException;
 import org.apache.geode.cache.client.ClientRegionFactory;
 import org.apache.geode.cache.client.ClientRegionShortcut;
+import org.apache.geode.cache.client.Pool;
 import org.apache.geode.cache.client.PoolFactory;
 import org.apache.geode.cache.client.PoolManager;
 import org.apache.geode.cache.client.internal.PoolImpl;
@@ -50,6 +52,7 @@
   private String hostName;
   private String uniqueName;
   private String regionName;
+  private String replicateRegionName;
   private VM server1;
   private VM server2;
 
@@ -73,12 +76,13 @@
     hostName = getHostName();
     uniqueName = getClass().getSimpleName() + "_" + testName.getMethodName();
     regionName = uniqueName + "_region";
+    replicateRegionName = uniqueName + "_replicateRegion";
   }
 
   @Test
-  public void getEntryOnRemoteNodeInATransactionThrowsTransactionDataNotColocatedException() {
+  public void getOnRemoteNodeInATransactionThrowsTransactionDataNotColocatedException() {
     int initialPuts = 4;
-    setupClientAndServerForMultipleTransactions(initialPuts);
+    setupClientAndServer(initialPuts);
     TXManagerImpl txManager =
         (TXManagerImpl) clientCacheRule.getClientCache().getCacheTransactionManager();
     Region<Integer, Integer> region = clientCacheRule.getClientCache().getRegion(regionName);
@@ -91,7 +95,7 @@
     }
   }
 
-  private void setupClientAndServerForMultipleTransactions(int initialPuts) {
+  private void setupClientAndServer(int initialPuts) {
     int port1 = server1.invoke(() -> createServerRegion(2, false, 0));
     int port2 = server2.invoke(() -> createServerRegion(2, false, 0));
     server1.invoke(() -> doPuts(initialPuts));
@@ -150,7 +154,7 @@
   @Test
   public void getAllOnMultipleNodesInATransactionThrowsTransactionDataNotColocatedException() {
     int initialPuts = 4;
-    setupClientAndServerForMultipleTransactions(initialPuts);
+    setupClientAndServer(initialPuts);
     TXManagerImpl txManager =
         (TXManagerImpl) clientCacheRule.getClientCache().getCacheTransactionManager();
     Region<Integer, Integer> region = clientCacheRule.getClientCache().getRegion(regionName);
@@ -174,7 +178,7 @@
   @Test
   public void putOnRemoteNodeInATransactionThrowsTransactionDataNotColocatedException() {
     int initialPuts = 4;
-    setupClientAndServerForMultipleTransactions(initialPuts);
+    setupClientAndServer(initialPuts);
     TXManagerImpl txManager =
         (TXManagerImpl) clientCacheRule.getClientCache().getCacheTransactionManager();
     Region<Integer, Integer> region = clientCacheRule.getClientCache().getRegion(regionName);
@@ -196,7 +200,7 @@
   @Test
   public void putAllOnMultipleNodesInATransactionThrowsTransactionDataNotColocatedException() {
     int initialPuts = 4;
-    setupClientAndServerForMultipleTransactions(initialPuts);
+    setupClientAndServer(initialPuts);
     TXManagerImpl txManager =
         (TXManagerImpl) clientCacheRule.getClientCache().getCacheTransactionManager();
     Region<Integer, Integer> region = clientCacheRule.getClientCache().getRegion(regionName);
@@ -216,4 +220,273 @@
     }
     region.putAll(map);
   }
+
+  @Test
+  public void invalidateOnRemoteNodeInATransactionThrowsTransactionDataNotColocatedException() {
+    int initialPuts = 4;
+    setupClientAndServer(initialPuts);
+    TXManagerImpl txManager =
+        (TXManagerImpl) clientCacheRule.getClientCache().getCacheTransactionManager();
+    Region<Integer, Integer> region = clientCacheRule.getClientCache().getRegion(regionName);
+    txManager.begin();
+    try {
+      Throwable caughtException =
+          catchThrowable(() -> doTransactionalInvalidates(initialPuts, region));
+      assertThat(caughtException).isInstanceOf(TransactionDataNotColocatedException.class);
+    } finally {
+      txManager.commit();
+    }
+  }
+
+  private void doTransactionalInvalidates(int initialPuts, Region<Integer, Integer> region) {
+    for (int i = 1; i < initialPuts; i++) {
+      region.invalidate(i);
+    }
+  }
+
+  @Test
+  public void destroyOnRemoteNodeInATransactionThrowsTransactionDataNotColocatedException() {
+    int initialPuts = 4;
+    setupClientAndServer(initialPuts);
+    TXManagerImpl txManager =
+        (TXManagerImpl) clientCacheRule.getClientCache().getCacheTransactionManager();
+    Region<Integer, Integer> region = clientCacheRule.getClientCache().getRegion(regionName);
+    txManager.begin();
+    try {
+      Throwable caughtException =
+          catchThrowable(() -> doTransactionalDestroys(initialPuts, region));
+      assertThat(caughtException).isInstanceOf(TransactionDataNotColocatedException.class);
+    } finally {
+      txManager.commit();
+    }
+  }
+
+  private void doTransactionalDestroys(int initialPuts, Region<Integer, Integer> region) {
+    for (int i = 1; i < initialPuts; i++) {
+      region.destroy(i);
+    }
+  }
+
+  @Test
+  public void getEntryOnRemoteNodeInATransactionThrowsTransactionDataNotColocatedException() {
+    int initialPuts = 4;
+    setupClientAndServer(initialPuts);
+    TXManagerImpl txManager =
+        (TXManagerImpl) clientCacheRule.getClientCache().getCacheTransactionManager();
+    Region<Integer, Integer> region = clientCacheRule.getClientCache().getRegion(regionName);
+    txManager.begin();
+    try {
+      Throwable caughtException =
+          catchThrowable(() -> doTransactionalGetEntries(initialPuts, region));
+      assertThat(caughtException).isInstanceOf(TransactionDataNotColocatedException.class);
+    } finally {
+      txManager.commit();
+    }
+  }
+
+  private void doTransactionalGetEntries(int initialPuts, Region<Integer, Integer> region) {
+    for (int i = 1; i < initialPuts; i++) {
+      region.getEntry(i);
+    }
+  }
+
+  @Test
+  public void containKeyOnRemoteNodeInATransactionThrowsTransactionDataNotColocatedException() {
+    int initialPuts = 4;
+    setupClientAndServer(initialPuts);
+    TXManagerImpl txManager =
+        (TXManagerImpl) clientCacheRule.getClientCache().getCacheTransactionManager();
+    Region<Integer, Integer> region = clientCacheRule.getClientCache().getRegion(regionName);
+    txManager.begin();
+    try {
+      Throwable caughtException =
+          catchThrowable(() -> doTransactionalContainsKey(initialPuts, region));
+      assertThat(caughtException).isInstanceOf(TransactionDataNotColocatedException.class);
+    } finally {
+      txManager.commit();
+    }
+  }
+
+  private void doTransactionalContainsKey(int initialPuts, Region<Integer, Integer> region) {
+    for (int i = 1; i < initialPuts; i++) {
+      region.containsKey(i);
+    }
+  }
+
+  @Test
+  public void containsValueForKeyOnRemoteNodeInATransactionThrowsTransactionDataNotColocatedException() {
+    int initialPuts = 4;
+    setupClientAndServer(initialPuts);
+    TXManagerImpl txManager =
+        (TXManagerImpl) clientCacheRule.getClientCache().getCacheTransactionManager();
+    Region<Integer, Integer> region = clientCacheRule.getClientCache().getRegion(regionName);
+    txManager.begin();
+    try {
+      Throwable caughtException =
+          catchThrowable(() -> doTransactionalContainsValueForKey(initialPuts, region));
+      assertThat(caughtException).isInstanceOf(TransactionDataNotColocatedException.class);
+    } finally {
+      txManager.commit();
+    }
+  }
+
+  private void doTransactionalContainsValueForKey(int initialPuts,
+      Region<Integer, Integer> region) {
+    for (int i = 1; i < initialPuts; i++) {
+      region.containsValueForKey(i);
+    }
+  }
+
+  @Test
+  public void getOnReplicateRegionThenPartitionedRegionInATransactionThrowsTransactionDataNotColocatedException() {
+    int initialPuts = 4;
+    setupClientAndServerWithTwoRegions(initialPuts);
+    TXManagerImpl txManager =
+        (TXManagerImpl) clientCacheRule.getClientCache().getCacheTransactionManager();
+    txManager.begin();
+    try {
+      // do first operation on replicate region
+      Region<Integer, Integer> replicateRegion =
+          clientCacheRule.getClientCache().getRegion(replicateRegionName);
+      replicateRegion.get(1);
+      Region<Integer, Integer> partitionedRegion =
+          clientCacheRule.getClientCache().getRegion(regionName);
+      try {
+        partitionedRegion.get(1);
+      } catch (TransactionException exception) {
+        assertThat(exception).isInstanceOf(TransactionDataNotColocatedException.class);
+      }
+    } finally {
+      txManager.commit();
+    }
+  }
+
+  private void doPutsInRegions(int numOfEntries) {
+    for (int i = 0; i <= numOfEntries; i++) {
+      cacheRule.getCache().getRegion(regionName).put(i, i);
+      cacheRule.getCache().getRegion(replicateRegionName).put(i, i);
+    }
+  }
+
+  private void setupClientAndServerWithTwoRegions(int initialPuts) {
+    int port1 = server1.invoke(() -> createServerRegion(2, false, 0));
+    server1.invoke(() -> createServerReplicateRegion());
+    int port2 = server2.invoke(() -> createServerRegion(2, false, 0));
+    server2.invoke(() -> createServerReplicateRegion());
+    server1.invoke(() -> doPutsInRegions(initialPuts));
+
+    createClientRegion(port1, port2);
+    createClientRegion(replicateRegionName);
+  }
+
+  private void createServerReplicateRegion() {
+    cacheRule.getOrCreateCache().createRegionFactory(RegionShortcut.REPLICATE)
+        .create(replicateRegionName);
+  }
+
+  private void createClientRegion(String regionName) {
+    Pool pool = clientCacheRule.getClientCache().getDefaultPool();
+    clientCacheRule.getClientCache().createClientRegionFactory(ClientRegionShortcut.LOCAL)
+        .setPoolName(pool.getName())
+        .create(regionName);
+  }
+
+  @Test
+  public void putOnReplicateRegionThenPartitionedRegionInATransactionThrowsTransactionDataNotColocatedException() {
+    int initialPuts = 4;
+    setupClientAndServerWithTwoRegions(initialPuts);
+    TXManagerImpl txManager =
+        (TXManagerImpl) clientCacheRule.getClientCache().getCacheTransactionManager();
+    txManager.begin();
+    try {
+      // do first operation on replicate region
+      Region<Integer, Integer> replicateRegion =
+          clientCacheRule.getClientCache().getRegion(replicateRegionName);
+      replicateRegion.put(1, 5);
+      Region<Integer, Integer> partitionedRegion =
+          clientCacheRule.getClientCache().getRegion(regionName);
+      try {
+        partitionedRegion.put(1, 6);
+      } catch (TransactionException exception) {
+        assertThat(exception).isInstanceOf(TransactionDataNotColocatedException.class);
+      }
+    } finally {
+      txManager.commit();
+    }
+  }
+
+  @Test
+  public void putAllOnReplicateRegionThenPartitionedRegionInATransactionThrowsTransactionDataNotColocatedException() {
+    int initialPuts = 4;
+    setupClientAndServerWithTwoRegions(initialPuts);
+    TXManagerImpl txManager =
+        (TXManagerImpl) clientCacheRule.getClientCache().getCacheTransactionManager();
+    txManager.begin();
+    try {
+      // do first operation on replicate region
+      Region<Integer, Integer> replicateRegion =
+          clientCacheRule.getClientCache().getRegion(replicateRegionName);
+      HashMap<Integer, Integer> map = new HashMap();
+      map.put(1, 5);
+      replicateRegion.putAll(map);
+      Region<Integer, Integer> partitionedRegion =
+          clientCacheRule.getClientCache().getRegion(regionName);
+
+      try {
+        partitionedRegion.putAll(map);
+      } catch (TransactionException exception) {
+        assertThat(exception).isInstanceOf(TransactionDataNotColocatedException.class);
+      }
+    } finally {
+      txManager.commit();
+    }
+  }
+
+  @Test
+  public void invalidateOnReplicateRegionThenPartitionedRegionInATransactionThrowsTransactionDataNotColocatedException() {
+    int initialPuts = 4;
+    setupClientAndServerWithTwoRegions(initialPuts);
+    TXManagerImpl txManager =
+        (TXManagerImpl) clientCacheRule.getClientCache().getCacheTransactionManager();
+    txManager.begin();
+    try {
+      // do first operation on replicate region
+      Region<Integer, Integer> replicateRegion =
+          clientCacheRule.getClientCache().getRegion(replicateRegionName);
+      replicateRegion.invalidate(1);
+      Region<Integer, Integer> partitionedRegion =
+          clientCacheRule.getClientCache().getRegion(regionName);
+      try {
+        partitionedRegion.invalidate(1);
+      } catch (TransactionException exception) {
+        assertThat(exception).isInstanceOf(TransactionDataNotColocatedException.class);
+      }
+    } finally {
+      txManager.commit();
+    }
+  }
+
+  @Test
+  public void destroyOnReplicateRegionThenPartitionedRegionInATransactionThrowsTransactionDataNotColocatedException() {
+    int initialPuts = 4;
+    setupClientAndServerWithTwoRegions(initialPuts);
+    TXManagerImpl txManager =
+        (TXManagerImpl) clientCacheRule.getClientCache().getCacheTransactionManager();
+    txManager.begin();
+    try {
+      // do first operation on replicate region
+      Region<Integer, Integer> replicateRegion =
+          clientCacheRule.getClientCache().getRegion(replicateRegionName);
+      replicateRegion.destroy(1);
+      Region<Integer, Integer> partitionedRegion =
+          clientCacheRule.getClientCache().getRegion(regionName);
+      try {
+        partitionedRegion.destroy(1);
+      } catch (TransactionException exception) {
+        assertThat(exception).isInstanceOf(TransactionDataNotColocatedException.class);
+      }
+    } finally {
+      txManager.commit();
+    }
+  }
 }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/TXState.java b/geode-core/src/main/java/org/apache/geode/internal/cache/TXState.java
index 725e621..83a3d6e 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/TXState.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/TXState.java
@@ -1599,7 +1599,7 @@
   public Object getDeserializedValue(KeyInfo keyInfo, LocalRegion localRegion, boolean updateStats,
       boolean disableCopyOnRead, boolean preferCD, EntryEventImpl clientEvent,
       boolean returnTombstones, boolean retainResult, boolean createIfAbsent) {
-    TXEntryState tx = proxy.txReadEntry(keyInfo, localRegion, true, createIfAbsent);
+    TXEntryState tx = txReadEntry(keyInfo, localRegion, true, createIfAbsent);
     if (tx != null) {
       Object v = tx.getValue(keyInfo, localRegion, preferCD);
       if (!disableCopyOnRead) {
@@ -2152,4 +2152,8 @@
   boolean isClosed() {
     return closed;
   }
+
+  public boolean hasPerformedAnyOperation() {
+    return regions.size() != 0;
+  }
 }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/TXStateProxyImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/TXStateProxyImpl.java
index a89265b..84fbbd5 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/TXStateProxyImpl.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/TXStateProxyImpl.java
@@ -70,6 +70,8 @@
    */
   private Map<Integer, Boolean> buckets = new HashMap<Integer, Boolean>();
 
+  private boolean firstOperationOnPartitionedRegion = false;
+
   protected volatile TXStateInterface realDeal;
 
   protected boolean inProgress = true;
@@ -164,6 +166,11 @@
         logger.debug("Built a new TXState: {} me:{}", this.realDeal, this.txMgr.getDM().getId());
       }
     }
+    if (isRealDealLocal() && !((TXState) realDeal).hasPerformedAnyOperation()) {
+      if (r != null && (r instanceof PartitionedRegion || r.isUsedForPartitionedRegionBucket())) {
+        firstOperationOnPartitionedRegion = true;
+      }
+    }
     return this.realDeal;
   }
 
@@ -237,7 +244,7 @@
     }
   }
 
-  private TransactionException getTransactionException(KeyInfo keyInfo, GemFireException e) {
+  TransactionException getTransactionException(KeyInfo keyInfo, GemFireException e) {
     if (isRealDealLocal() && !buckets.isEmpty() && !buckets.containsKey(keyInfo.getBucketId())) {
       TransactionException ex = new TransactionDataNotColocatedException(
           String.format("Key %s is not colocated with transaction",
@@ -248,6 +255,14 @@
     Throwable ex = e;
     while (ex != null) {
       if (ex instanceof PrimaryBucketException) {
+        if (isRealDealLocal() && !firstOperationOnPartitionedRegion) {
+          return new TransactionDataNotColocatedException(
+              String.format(
+                  "Key %s is not colocated with transaction. First operation in a transaction "
+                      + "should be on a partitioned region when there are operations on both "
+                      + "partitioned regions and replicate regions.",
+                  keyInfo.getKey()));
+        }
         return new TransactionDataRebalancedException(
             "Transactional data moved, due to rebalancing.");
       }
@@ -263,12 +278,22 @@
       boolean retVal = getRealDeal(keyInfo, region).containsValueForKey(keyInfo, region);
       trackBucketForTx(keyInfo);
       return retVal;
-    } catch (TransactionDataRebalancedException | PrimaryBucketException re) {
-      throw getTransactionException(keyInfo, re);
+    } catch (TransactionDataRebalancedException transactionDataRebalancedException) {
+      throw handleTransactionDataRebalancedException(keyInfo, transactionDataRebalancedException);
+    } catch (PrimaryBucketException primaryBucketException) {
+      throw getTransactionException(keyInfo, primaryBucketException);
     }
   }
 
-  private void trackBucketForTx(KeyInfo keyInfo) {
+  private TransactionException handleTransactionDataRebalancedException(KeyInfo keyInfo,
+      TransactionDataRebalancedException transactionDataRebalancedException) {
+    if (isRealDealLocal()) {
+      return getTransactionException(keyInfo, transactionDataRebalancedException);
+    }
+    return transactionDataRebalancedException;
+  }
+
+  void trackBucketForTx(KeyInfo keyInfo) {
     if (keyInfo.getBucketId() >= 0) {
       if (logger.isDebugEnabled()) {
         logger.debug("adding bucket:{} for tx:{}", keyInfo.getBucketId(), getTransactionId());
@@ -287,8 +312,11 @@
       getRealDeal(event.getKeyInfo(), event.getRegion()).destroyExistingEntry(event, cacheWrite,
           expectedOldValue);
       trackBucketForTx(event.getKeyInfo());
-    } catch (TransactionDataRebalancedException | PrimaryBucketException re) {
-      throw getTransactionException(event.getKeyInfo(), re);
+    } catch (TransactionDataRebalancedException transactionDataRebalancedException) {
+      throw handleTransactionDataRebalancedException(event.getKeyInfo(),
+          transactionDataRebalancedException);
+    } catch (PrimaryBucketException primaryBucketException) {
+      throw getTransactionException(event.getKeyInfo(), primaryBucketException);
     }
   }
 
@@ -312,14 +340,21 @@
   public Object getDeserializedValue(KeyInfo keyInfo, LocalRegion localRegion, boolean updateStats,
       boolean disableCopyOnRead, boolean preferCD, EntryEventImpl clientEvent,
       boolean returnTombstones, boolean retainResult, boolean createIfAbsent) {
-    Object val = getRealDeal(keyInfo, localRegion).getDeserializedValue(keyInfo, localRegion,
-        updateStats, disableCopyOnRead, preferCD, null, false, retainResult, createIfAbsent);
-    if (val != null) {
-      // fixes bug 51057: TXStateStub on client always returns null, so do not increment
-      // the operation count it will be incremented in findObject()
-      this.operationCount++;
+    try {
+      Object val = getRealDeal(keyInfo, localRegion).getDeserializedValue(keyInfo, localRegion,
+          updateStats, disableCopyOnRead, preferCD, null, false, retainResult, createIfAbsent);
+      trackBucketForTx(keyInfo);
+      if (val != null) {
+        // fixes bug 51057: TXStateStub on client always returns null, so do not increment
+        // the operation count it will be incremented in findObject()
+        this.operationCount++;
+      }
+      return val;
+    } catch (TransactionDataRebalancedException transactionDataRebalancedException) {
+      throw handleTransactionDataRebalancedException(keyInfo, transactionDataRebalancedException);
+    } catch (PrimaryBucketException primaryBucketException) {
+      throw getTransactionException(keyInfo, primaryBucketException);
     }
-    return val;
   }
 
   @Override
@@ -329,8 +364,10 @@
       Entry retVal = getRealDeal(keyInfo, region).getEntry(keyInfo, region, allowTombstones);
       trackBucketForTx(keyInfo);
       return retVal;
-    } catch (TransactionDataRebalancedException | PrimaryBucketException re) {
-      throw getTransactionException(keyInfo, re);
+    } catch (TransactionDataRebalancedException transactionDataRebalancedException) {
+      throw handleTransactionDataRebalancedException(keyInfo, transactionDataRebalancedException);
+    } catch (PrimaryBucketException primaryBucketException) {
+      throw getTransactionException(keyInfo, primaryBucketException);
     }
   }
 
@@ -365,8 +402,11 @@
       getRealDeal(event.getKeyInfo(), event.getRegion()).invalidateExistingEntry(event,
           invokeCallbacks, forceNewEntry);
       trackBucketForTx(event.getKeyInfo());
-    } catch (TransactionDataRebalancedException | PrimaryBucketException re) {
-      throw getTransactionException(event.getKeyInfo(), re);
+    } catch (TransactionDataRebalancedException transactionDataRebalancedException) {
+      throw handleTransactionDataRebalancedException(event.getKeyInfo(),
+          transactionDataRebalancedException);
+    } catch (PrimaryBucketException primaryBucketException) {
+      throw getTransactionException(event.getKeyInfo(), primaryBucketException);
     }
   }
 
@@ -422,8 +462,11 @@
           .txPutEntry(event, ifNew, requireOldValue, checkResources, expectedOldValue);
       trackBucketForTx(event.getKeyInfo());
       return retVal;
-    } catch (TransactionDataRebalancedException | PrimaryBucketException re) {
-      throw getTransactionException(event.getKeyInfo(), re);
+    } catch (TransactionDataRebalancedException transactionDataRebalancedException) {
+      throw handleTransactionDataRebalancedException(event.getKeyInfo(),
+          transactionDataRebalancedException);
+    } catch (PrimaryBucketException primaryBucketException) {
+      throw getTransactionException(event.getKeyInfo(), primaryBucketException);
     }
   }
 
@@ -436,8 +479,10 @@
           rememberRead, createTxEntryIfAbsent);
       trackBucketForTx(keyInfo);
       return retVal;
-    } catch (TransactionDataRebalancedException | PrimaryBucketException re) {
-      throw getTransactionException(keyInfo, re);
+    } catch (TransactionDataRebalancedException transactionDataRebalancedException) {
+      throw handleTransactionDataRebalancedException(keyInfo, transactionDataRebalancedException);
+    } catch (PrimaryBucketException primaryBucketException) {
+      throw getTransactionException(keyInfo, primaryBucketException);
     }
   }
 
@@ -485,8 +530,10 @@
       boolean retVal = getRealDeal(keyInfo, localRegion).containsKey(keyInfo, localRegion);
       trackBucketForTx(keyInfo);
       return retVal;
-    } catch (TransactionDataRebalancedException | PrimaryBucketException re) {
-      throw getTransactionException(keyInfo, re);
+    } catch (TransactionDataRebalancedException transactionDataRebalancedException) {
+      throw handleTransactionDataRebalancedException(keyInfo, transactionDataRebalancedException);
+    } catch (PrimaryBucketException primaryBucketException) {
+      throw getTransactionException(keyInfo, primaryBucketException);
     }
   }
 
@@ -531,8 +578,10 @@
           disableCopyOnRead, preferCD, requestingClient, clientEvent, false);
       trackBucketForTx(key);
       return retVal;
-    } catch (TransactionDataRebalancedException | PrimaryBucketException re) {
-      throw getTransactionException(key, re);
+    } catch (TransactionDataRebalancedException transactionDataRebalancedException) {
+      throw handleTransactionDataRebalancedException(key, transactionDataRebalancedException);
+    } catch (PrimaryBucketException primaryBucketException) {
+      throw getTransactionException(key, primaryBucketException);
     }
   }
 
@@ -627,8 +676,11 @@
           ifOld, expectedOldValue, requireOldValue, lastModified, overwriteDestroyed);
       trackBucketForTx(event.getKeyInfo());
       return retVal;
-    } catch (TransactionDataRebalancedException | PrimaryBucketException re) {
-      throw getTransactionException(event.getKeyInfo(), re);
+    } catch (TransactionDataRebalancedException transactionDataRebalancedException) {
+      throw handleTransactionDataRebalancedException(event.getKeyInfo(),
+          transactionDataRebalancedException);
+    } catch (PrimaryBucketException primaryBucketException) {
+      throw getTransactionException(event.getKeyInfo(), primaryBucketException);
     }
   }
 
@@ -647,8 +699,17 @@
       ClientProxyMembershipID requestingClient, EntryEventImpl clientEvent,
       boolean returnTombstones) throws DataLocationException {
     this.operationCount++;
-    return getRealDeal(key, localRegion).getSerializedValue(localRegion, key, doNotLockEntry,
-        requestingClient, clientEvent, returnTombstones);
+    try {
+      Object retVal =
+          getRealDeal(key, localRegion).getSerializedValue(localRegion, key, doNotLockEntry,
+              requestingClient, clientEvent, returnTombstones);
+      trackBucketForTx(key);
+      return retVal;
+    } catch (TransactionDataRebalancedException transactionDataRebalancedException) {
+      throw handleTransactionDataRebalancedException(key, transactionDataRebalancedException);
+    } catch (PrimaryBucketException primaryBucketException) {
+      throw getTransactionException(key, primaryBucketException);
+    }
   }
 
   @Override
@@ -658,8 +719,17 @@
     this.operationCount++;
     TXStateInterface tx = getRealDeal(event.getKeyInfo(), event.getRegion());
     assert (tx instanceof TXState) : tx.getClass().getSimpleName();
-    return tx.putEntryOnRemote(event, ifNew, ifOld, expectedOldValue, requireOldValue, lastModified,
-        overwriteDestroyed);
+    try {
+      boolean retVal = tx.putEntryOnRemote(event, ifNew, ifOld, expectedOldValue, requireOldValue,
+          lastModified, overwriteDestroyed);
+      trackBucketForTx(event.getKeyInfo());
+      return retVal;
+    } catch (TransactionDataRebalancedException transactionDataRebalancedException) {
+      throw handleTransactionDataRebalancedException(event.getKeyInfo(),
+          transactionDataRebalancedException);
+    } catch (PrimaryBucketException primaryBucketException) {
+      throw getTransactionException(event.getKeyInfo(), primaryBucketException);
+    }
   }
 
   @Override
@@ -673,7 +743,15 @@
     this.operationCount++;
     TXStateInterface tx = getRealDeal(event.getKeyInfo(), event.getRegion());
     assert (tx instanceof TXState);
-    tx.destroyOnRemote(event, cacheWrite, expectedOldValue);
+    try {
+      tx.destroyOnRemote(event, cacheWrite, expectedOldValue);
+      trackBucketForTx(event.getKeyInfo());
+    } catch (TransactionDataRebalancedException transactionDataRebalancedException) {
+      throw handleTransactionDataRebalancedException(event.getKeyInfo(),
+          transactionDataRebalancedException);
+    } catch (PrimaryBucketException primaryBucketException) {
+      throw getTransactionException(event.getKeyInfo(), primaryBucketException);
+    }
   }
 
   @Override
@@ -682,7 +760,15 @@
     this.operationCount++;
     TXStateInterface tx = getRealDeal(event.getKeyInfo(), event.getRegion());
     assert (tx instanceof TXState);
-    tx.invalidateOnRemote(event, invokeCallbacks, forceNewEntry);
+    try {
+      tx.invalidateOnRemote(event, invokeCallbacks, forceNewEntry);
+      trackBucketForTx(event.getKeyInfo());
+    } catch (TransactionDataRebalancedException transactionDataRebalancedException) {
+      throw handleTransactionDataRebalancedException(event.getKeyInfo(),
+          transactionDataRebalancedException);
+    } catch (PrimaryBucketException primaryBucketException) {
+      throw getTransactionException(event.getKeyInfo(), primaryBucketException);
+    }
   }
 
   @Override
@@ -728,7 +814,13 @@
     this.operationCount++;
     TXStateInterface tx = getRealDeal(keyInfo, localRegion);
     assert (tx instanceof TXState);
-    return tx.getEntryOnRemote(keyInfo, localRegion, allowTombstones);
+    try {
+      return tx.getEntryOnRemote(keyInfo, localRegion, allowTombstones);
+    } catch (TransactionDataRebalancedException transactionDataRebalancedException) {
+      throw handleTransactionDataRebalancedException(keyInfo, transactionDataRebalancedException);
+    } catch (PrimaryBucketException primaryBucketException) {
+      throw getTransactionException(keyInfo, primaryBucketException);
+    }
   }
 
   public void forceLocalBootstrap() {
@@ -893,8 +985,10 @@
       Entry retVal = getRealDeal(keyInfo, region).accessEntry(keyInfo, region);
       trackBucketForTx(keyInfo);
       return retVal;
-    } catch (TransactionDataRebalancedException | PrimaryBucketException re) {
-      throw getTransactionException(keyInfo, re);
+    } catch (TransactionDataRebalancedException transactionDataRebalancedException) {
+      throw handleTransactionDataRebalancedException(keyInfo, transactionDataRebalancedException);
+    } catch (PrimaryBucketException primaryBucketException) {
+      throw getTransactionException(keyInfo, primaryBucketException);
     }
   }
 
@@ -1004,4 +1098,7 @@
     return onBehalfOfClientMember;
   }
 
+  void setFirstOperationOnPartitionedRegion(boolean firstOperationOnPartitionedRegion) {
+    this.firstOperationOnPartitionedRegion = firstOperationOnPartitionedRegion;
+  }
 }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tx/PartitionedTXRegionStub.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tx/PartitionedTXRegionStub.java
index 5e38c08..cf13e64 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tx/PartitionedTXRegionStub.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tx/PartitionedTXRegionStub.java
@@ -77,10 +77,6 @@
     try {
       pr.destroyRemotely(state.getTarget(), event.getKeyInfo().getBucketId(), event,
           expectedOldValue);
-    } catch (TransactionException e) {
-      RuntimeException re = getTransactionException(event.getKeyInfo(), e);
-      re.initCause(e.getCause());
-      throw re;
     } catch (PrimaryBucketException e) {
       RuntimeException re = getTransactionException(event.getKeyInfo(), e);
       re.initCause(e);
@@ -104,7 +100,7 @@
   }
 
 
-  private RuntimeException getTransactionException(KeyInfo keyInfo, Throwable cause) {
+  RuntimeException getTransactionException(KeyInfo keyInfo, Throwable cause) {
     region.getCancelCriterion().checkCancelInProgress(cause); // fixes bug 44567
     Throwable ex = cause;
     while (ex != null) {
@@ -151,7 +147,7 @@
   /**
    * wait to retry after getting a ForceReattemptException
    */
-  private void waitToRetry() {
+  void waitToRetry() {
     // this is what PR operations do. The 2000ms is not used
     (new RetryTimeKeeper(2000)).waitForBucketsRecovery();
   }
@@ -167,10 +163,6 @@
       return e;
     } catch (EntryNotFoundException enfe) {
       return null;
-    } catch (TransactionException e) {
-      RuntimeException re = getTransactionException(keyInfo, e);
-      re.initCause(e.getCause());
-      throw re;
     } catch (PrimaryBucketException e) {
       RuntimeException re = getTransactionException(keyInfo, e);
       re.initCause(e);
@@ -193,7 +185,7 @@
   }
 
 
-  private void trackBucketForTx(KeyInfo keyInfo) {
+  void trackBucketForTx(KeyInfo keyInfo) {
     if (region.getCache().getLogger().fineEnabled()) {
       region.getCache().getLogger()
           .fine("adding bucket:" + keyInfo.getBucketId() + " for tx:" + state.getTransactionId());
@@ -210,10 +202,6 @@
     PartitionedRegion pr = (PartitionedRegion) event.getRegion();
     try {
       pr.invalidateRemotely(state.getTarget(), event.getKeyInfo().getBucketId(), event);
-    } catch (TransactionException e) {
-      RuntimeException re = getTransactionException(event.getKeyInfo(), e);
-      re.initCause(e.getCause());
-      throw re;
     } catch (PrimaryBucketException e) {
       RuntimeException re = getTransactionException(event.getKeyInfo(), e);
       re.initCause(e);
@@ -244,10 +232,6 @@
           keyInfo.getBucketId(), keyInfo.getKey());
       trackBucketForTx(keyInfo);
       return retVal;
-    } catch (TransactionException e) {
-      RuntimeException re = getTransactionException(keyInfo, e);
-      re.initCause(e.getCause());
-      throw re;
     } catch (PrimaryBucketException e) {
       RuntimeException re = getTransactionException(keyInfo, e);
       re.initCause(e);
@@ -272,7 +256,7 @@
   /**
    * @return true if the cause of the FRE is a BucketNotFoundException
    */
-  private boolean isBucketNotFoundException(ForceReattemptException e) {
+  boolean isBucketNotFoundException(ForceReattemptException e) {
     ForceReattemptException fre = e;
     while (fre.getCause() != null && fre.getCause() instanceof ForceReattemptException) {
       fre = (ForceReattemptException) fre.getCause();
@@ -288,10 +272,6 @@
           (InternalDistributedMember) state.getTarget(), keyInfo.getBucketId(), keyInfo.getKey());
       trackBucketForTx(keyInfo);
       return retVal;
-    } catch (TransactionException e) {
-      RuntimeException re = getTransactionException(keyInfo, e);
-      re.initCause(e.getCause());
-      throw re;
     } catch (PrimaryBucketException e) {
       RuntimeException re = getTransactionException(keyInfo, e);
       re.initCause(e);
@@ -312,7 +292,6 @@
     }
   }
 
-
   @Override
   public Object findObject(KeyInfo keyInfo, boolean isCreate, boolean generateCallbacks,
       Object value, boolean peferCD, ClientProxyMembershipID requestingClient,
@@ -324,10 +303,6 @@
       retVal =
           region.getRemotely((InternalDistributedMember) state.getTarget(), keyInfo.getBucketId(),
               key, callbackArgument, peferCD, requestingClient, clientEvent, false);
-    } catch (TransactionException e) {
-      RuntimeException re = getTransactionException(keyInfo, e);
-      re.initCause(e.getCause());
-      throw re;
     } catch (PrimaryBucketException e) {
       RuntimeException re = getTransactionException(keyInfo, e);
       re.initCause(e);
@@ -369,10 +344,6 @@
     try {
       retVal =
           pr.putRemotely(state.getTarget(), event, ifNew, ifOld, expectedOldValue, requireOldValue);
-    } catch (TransactionException e) {
-      RuntimeException re = getTransactionException(event.getKeyInfo(), e);
-      re.initCause(e.getCause());
-      throw re;
     } catch (PrimaryBucketException e) {
       RuntimeException re = getTransactionException(event.getKeyInfo(), e);
       re.initCause(e);
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/TXStateProxyImplTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/TXStateProxyImplTest.java
index 42fe93f..d7658f1 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/TXStateProxyImplTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/TXStateProxyImplTest.java
@@ -24,6 +24,10 @@
 import org.junit.Before;
 import org.junit.Test;
 
+import org.apache.geode.GemFireException;
+import org.apache.geode.cache.TransactionDataNotColocatedException;
+import org.apache.geode.cache.TransactionDataRebalancedException;
+import org.apache.geode.cache.TransactionException;
 import org.apache.geode.distributed.DistributedMember;
 import org.apache.geode.distributed.DistributedSystem;
 import org.apache.geode.distributed.internal.InternalDistributedSystem;
@@ -157,4 +161,84 @@
 
     tx.setTarget(remoteMember);
   }
+
+  @Test
+  public void txHostGetTransactionExceptionReturnsTransactionDataNotColocatedExceptionIfKeyNotInBuckets() {
+    TXStateProxyImpl tx = new TXStateProxyImpl(cache, txManager, txId, false, disabledClock());
+    tx.setLocalTXState(new TXState(tx, true, disabledClock()));
+    KeyInfo keyInfo1 = mock(KeyInfo.class);
+    when(keyInfo1.getBucketId()).thenReturn(1);
+    KeyInfo keyInfo2 = mock(KeyInfo.class);
+    when(keyInfo2.getBucketId()).thenReturn(2);
+    tx.trackBucketForTx(keyInfo1);
+
+    TransactionException transactionException =
+        tx.getTransactionException(keyInfo2, new PrimaryBucketException());
+
+    assertThat(transactionException).isInstanceOf(TransactionDataNotColocatedException.class);
+  }
+
+  @Test
+  public void txHostGetTransactionExceptionReturnsTransactionDataNotColocatedExceptionIfFirstOperationOnReplicate() {
+    TXStateProxyImpl tx = new TXStateProxyImpl(cache, txManager, txId, false, disabledClock());
+    tx.setLocalTXState(new TXState(tx, true, disabledClock()));
+    KeyInfo keyInfo = mock(KeyInfo.class);
+
+    TransactionException transactionException =
+        tx.getTransactionException(keyInfo, new PrimaryBucketException());
+
+    assertThat(transactionException).isInstanceOf(TransactionDataNotColocatedException.class);
+  }
+
+  @Test
+  public void txHostGetTransactionExceptionReturnsTransactionDataRebalancedExceptionIfFirstOperationOnPartitioned() {
+    TXStateProxyImpl tx = new TXStateProxyImpl(cache, txManager, txId, false, disabledClock());
+    tx.setLocalTXState(new TXState(tx, true, disabledClock()));
+    KeyInfo keyInfo = mock(KeyInfo.class);
+    GemFireException exception = mock(GemFireException.class);
+    when(exception.getCause()).thenReturn(new PrimaryBucketException());
+    tx.setFirstOperationOnPartitionedRegion(true);
+
+    TransactionException transactionException = tx.getTransactionException(keyInfo, exception);
+
+    assertThat(transactionException).isInstanceOf(TransactionDataRebalancedException.class);
+  }
+
+  @Test
+  public void txHostGetTransactionExceptionReturnsSameTransactionExceptionIfNotCausedByPrimaryBucketException() {
+    TXStateProxyImpl tx = new TXStateProxyImpl(cache, txManager, txId, false, disabledClock());
+    tx.setLocalTXState(new TXState(tx, true, disabledClock()));
+    TransactionException exception = mock(TransactionException.class);
+    KeyInfo keyInfo = mock(KeyInfo.class);
+
+    TransactionException transactionException = tx.getTransactionException(keyInfo, exception);
+
+    assertThat(transactionException).isSameAs(exception);
+  }
+
+  @Test
+  public void txStubGetTransactionExceptionReturnsTransactionDataRebalancedExceptionIfCausedByPrimaryBucketException() {
+    TXStateProxyImpl tx = new TXStateProxyImpl(cache, txManager, txId, false, disabledClock());
+    DistributedMember target = mock(InternalDistributedMember.class);
+    tx.setLocalTXState(new PeerTXStateStub(tx, target, null));
+    KeyInfo keyInfo = mock(KeyInfo.class);
+
+    TransactionException transactionException =
+        tx.getTransactionException(keyInfo, new PrimaryBucketException());
+
+    assertThat(transactionException).isInstanceOf(TransactionDataRebalancedException.class);
+  }
+
+  @Test
+  public void txStubGetTransactionExceptionReturnsSameTransactionExceptionIfNotCausedByPrimaryBucketException() {
+    TXStateProxyImpl tx = new TXStateProxyImpl(cache, txManager, txId, false, disabledClock());
+    DistributedMember target = mock(InternalDistributedMember.class);
+    tx.setLocalTXState(new PeerTXStateStub(tx, target, null));
+    TransactionException exception = mock(TransactionException.class);
+    KeyInfo keyInfo = mock(KeyInfo.class);
+
+    TransactionException transactionException = tx.getTransactionException(keyInfo, exception);
+
+    assertThat(transactionException).isSameAs(exception);
+  }
 }
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/tx/PartitionedTXRegionStubTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/tx/PartitionedTXRegionStubTest.java
new file mode 100644
index 0000000..809b53c
--- /dev/null
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/tx/PartitionedTXRegionStubTest.java
@@ -0,0 +1,562 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache.tx;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.catchThrowable;
+import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.geode.cache.TransactionDataNodeHasDepartedException;
+import org.apache.geode.cache.TransactionDataRebalancedException;
+import org.apache.geode.cache.TransactionException;
+import org.apache.geode.distributed.DistributedMember;
+import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
+import org.apache.geode.internal.cache.EntryEventImpl;
+import org.apache.geode.internal.cache.EntrySnapshot;
+import org.apache.geode.internal.cache.ForceReattemptException;
+import org.apache.geode.internal.cache.KeyInfo;
+import org.apache.geode.internal.cache.PartitionedRegion;
+import org.apache.geode.internal.cache.TXStateStub;
+
+public class PartitionedTXRegionStubTest {
+  private TXStateStub txStateStub;
+  private PartitionedRegion partitionedRegion;
+  private EntryEventImpl event;
+  private Object expectedObject;
+  private TransactionException expectedException;
+  private DistributedMember remoteTransactionHost;
+  private KeyInfo keyInfo;
+  private Object key;
+
+  @Before
+  public void setup() {
+    txStateStub = mock(TXStateStub.class);
+    partitionedRegion = mock(PartitionedRegion.class, RETURNS_DEEP_STUBS);
+    event = mock(EntryEventImpl.class);
+    expectedObject = new Object();
+    expectedException = new TransactionException();
+    remoteTransactionHost = mock(InternalDistributedMember.class);
+    keyInfo = mock(KeyInfo.class);
+    key = new Object();
+    when(txStateStub.getTarget()).thenReturn(remoteTransactionHost);
+    when(event.getKeyInfo()).thenReturn(keyInfo);
+    when(keyInfo.getKey()).thenReturn(key);
+    when(keyInfo.getBucketId()).thenReturn(1);
+  }
+
+  @Test
+  public void destroyExistingEntryTracksBucketForTx() {
+    PartitionedTXRegionStub stub = spy(new PartitionedTXRegionStub(txStateStub, partitionedRegion));
+    when(event.getRegion()).thenReturn(partitionedRegion);
+
+    stub.destroyExistingEntry(event, true, expectedObject);
+
+    verify(stub).trackBucketForTx(keyInfo);
+  }
+
+  @Test
+  public void destroyExistingEntryThrowsTransactionExceptionFromRemoteHost() throws Exception {
+    PartitionedTXRegionStub stub = spy(new PartitionedTXRegionStub(txStateStub, partitionedRegion));
+    when(event.getRegion()).thenReturn(partitionedRegion);
+    doThrow(expectedException).when(partitionedRegion).destroyRemotely(remoteTransactionHost, 1,
+        event, expectedObject);
+
+    Throwable caughtException = catchThrowable(() -> stub.destroyExistingEntry(event, true,
+        expectedObject));
+
+    assertThat(caughtException).isSameAs(expectedException);
+    verify(stub, never()).trackBucketForTx(keyInfo);
+  }
+
+  @Test
+  public void destroyExistingEntryThrowsTransactionDataRebalancedExceptionIfIsBucketNotFoundException()
+      throws Exception {
+    PartitionedTXRegionStub stub = spy(new PartitionedTXRegionStub(txStateStub, partitionedRegion));
+    when(event.getRegion()).thenReturn(partitionedRegion);
+    ForceReattemptException forceReattemptException = mock(ForceReattemptException.class);
+    doReturn(true).when(stub).isBucketNotFoundException(forceReattemptException);
+    doNothing().when(stub).waitToRetry();
+    doThrow(forceReattemptException).when(partitionedRegion).destroyRemotely(remoteTransactionHost,
+        1, event, expectedObject);
+
+    Throwable caughtException = catchThrowable(() -> stub.destroyExistingEntry(event, false,
+        expectedObject));
+
+    assertThat(caughtException).isInstanceOf(TransactionDataRebalancedException.class);
+    verify(stub, never()).trackBucketForTx(keyInfo);
+  }
+
+  @Test
+  public void destroyExistingEntryThrowsTransactionDataNodeHasDepartedExceptionIfIsNotBucketNotFoundException()
+      throws Exception {
+    PartitionedTXRegionStub stub = spy(new PartitionedTXRegionStub(txStateStub, partitionedRegion));
+    when(event.getRegion()).thenReturn(partitionedRegion);
+    ForceReattemptException forceReattemptException = mock(ForceReattemptException.class);
+    doReturn(false).when(stub).isBucketNotFoundException(forceReattemptException);
+    doNothing().when(stub).waitToRetry();
+    doThrow(forceReattemptException).when(partitionedRegion).destroyRemotely(remoteTransactionHost,
+        1, event, expectedObject);
+
+    Throwable caughtException = catchThrowable(() -> stub.destroyExistingEntry(event, true,
+        expectedObject));
+
+    assertThat(caughtException).isInstanceOf(TransactionDataNodeHasDepartedException.class);
+    verify(stub, never()).trackBucketForTx(keyInfo);
+  }
+
+  @Test
+  public void getEntryReturnsEntryGotFromRemote() throws Exception {
+    PartitionedTXRegionStub stub = spy(new PartitionedTXRegionStub(txStateStub, partitionedRegion));
+    EntrySnapshot entry = mock(EntrySnapshot.class);
+    when(event.getRegion()).thenReturn(partitionedRegion);
+    when(partitionedRegion.getEntryRemotely((InternalDistributedMember) remoteTransactionHost, 1,
+        key, false, true)).thenReturn((entry));
+
+    assertThat(stub.getEntry(keyInfo, true)).isEqualTo(entry);
+    verify(stub).trackBucketForTx(keyInfo);
+  }
+
+  @Test
+  public void getEntryThrowsTransactionExceptionFromRemoteHost() throws Exception {
+    PartitionedTXRegionStub stub = spy(new PartitionedTXRegionStub(txStateStub, partitionedRegion));
+    when(event.getRegion()).thenReturn(partitionedRegion);
+    doThrow(expectedException).when(partitionedRegion)
+        .getEntryRemotely((InternalDistributedMember) remoteTransactionHost, 1, key, false, true);
+
+    Throwable caughtException = catchThrowable(() -> stub.getEntry(keyInfo, true));
+
+    assertThat(caughtException).isSameAs(expectedException);
+    verify(stub, never()).trackBucketForTx(keyInfo);
+  }
+
+  @Test
+  public void getEntryThrowsTransactionDataRebalancedExceptionIfIsBucketNotFoundException()
+      throws Exception {
+    PartitionedTXRegionStub stub = spy(new PartitionedTXRegionStub(txStateStub, partitionedRegion));
+    when(event.getRegion()).thenReturn(partitionedRegion);
+    ForceReattemptException forceReattemptException = mock(ForceReattemptException.class);
+    doReturn(true).when(stub).isBucketNotFoundException(forceReattemptException);
+    doNothing().when(stub).waitToRetry();
+    doThrow(forceReattemptException).when(partitionedRegion)
+        .getEntryRemotely((InternalDistributedMember) remoteTransactionHost, 1, key, false, true);
+
+    Throwable caughtException = catchThrowable(() -> stub.getEntry(keyInfo, true));
+
+    assertThat(caughtException).isInstanceOf(TransactionDataRebalancedException.class);
+    verify(stub, never()).trackBucketForTx(keyInfo);
+  }
+
+  @Test
+  public void getEntryThrowsTransactionDataNodeHasDepartedExceptionIfIsNotBucketNotFoundException()
+      throws Exception {
+    PartitionedTXRegionStub stub = spy(new PartitionedTXRegionStub(txStateStub, partitionedRegion));
+    when(event.getRegion()).thenReturn(partitionedRegion);
+
+    ForceReattemptException forceReattemptException = mock(ForceReattemptException.class);
+    doReturn(false).when(stub).isBucketNotFoundException(forceReattemptException);
+    doNothing().when(stub).waitToRetry();
+    doThrow(forceReattemptException).when(partitionedRegion)
+        .getEntryRemotely((InternalDistributedMember) remoteTransactionHost, 1, key, false, false);
+
+    Throwable caughtException = catchThrowable(() -> stub.getEntry(keyInfo, false));
+
+    assertThat(caughtException).isInstanceOf(TransactionDataNodeHasDepartedException.class);
+    verify(stub, never()).trackBucketForTx(keyInfo);
+  }
+
+  @Test
+  public void invalidateExistingEntryTracksBucketForTx() {
+    PartitionedTXRegionStub stub = spy(new PartitionedTXRegionStub(txStateStub, partitionedRegion));
+    when(event.getRegion()).thenReturn(partitionedRegion);
+
+    stub.invalidateExistingEntry(event, true, false);
+
+    verify(stub).trackBucketForTx(keyInfo);
+  }
+
+  @Test
+  public void invalidateExistingEntryThrowsTransactionExceptionFromRemoteHost() throws Exception {
+    PartitionedTXRegionStub stub = spy(new PartitionedTXRegionStub(txStateStub, partitionedRegion));
+    when(event.getRegion()).thenReturn(partitionedRegion);
+    when(keyInfo.getBucketId()).thenReturn(1);
+    doThrow(expectedException).when(partitionedRegion).invalidateRemotely(remoteTransactionHost, 1,
+        event);
+
+    Throwable caughtException =
+        catchThrowable(() -> stub.invalidateExistingEntry(event, false, false));
+
+    assertThat(caughtException).isSameAs(expectedException);
+    verify(stub, never()).trackBucketForTx(keyInfo);
+  }
+
+  @Test
+  public void invalidateExistingEntryThrowsTransactionDataRebalancedExceptionIfIsBucketNotFoundException()
+      throws Exception {
+    PartitionedTXRegionStub stub = spy(new PartitionedTXRegionStub(txStateStub, partitionedRegion));
+    when(event.getRegion()).thenReturn(partitionedRegion);
+    when(keyInfo.getBucketId()).thenReturn(1);
+    ForceReattemptException forceReattemptException = mock(ForceReattemptException.class);
+    doReturn(true).when(stub).isBucketNotFoundException(forceReattemptException);
+    doNothing().when(stub).waitToRetry();
+    doThrow(forceReattemptException).when(partitionedRegion)
+        .invalidateRemotely(remoteTransactionHost, 1, event);
+
+    Throwable caughtException =
+        catchThrowable(() -> stub.invalidateExistingEntry(event, false, false));
+
+    assertThat(caughtException).isInstanceOf(TransactionDataRebalancedException.class);
+    verify(stub, never()).trackBucketForTx(keyInfo);
+  }
+
+  @Test
+  public void invalidateExistingEntryThrowsTransactionDataNodeHasDepartedExceptionIfIsNotBucketNotFoundException()
+      throws Exception {
+    PartitionedTXRegionStub stub = spy(new PartitionedTXRegionStub(txStateStub, partitionedRegion));
+    when(event.getRegion()).thenReturn(partitionedRegion);
+    when(keyInfo.getBucketId()).thenReturn(1);
+    ForceReattemptException forceReattemptException = mock(ForceReattemptException.class);
+    doReturn(false).when(stub).isBucketNotFoundException(forceReattemptException);
+    doNothing().when(stub).waitToRetry();
+    doThrow(forceReattemptException).when(partitionedRegion)
+        .invalidateRemotely(remoteTransactionHost, 1, event);
+
+    Throwable caughtException =
+        catchThrowable(() -> stub.invalidateExistingEntry(event, false, false));
+
+    assertThat(caughtException).isInstanceOf(TransactionDataNodeHasDepartedException.class);
+    verify(stub, never()).trackBucketForTx(keyInfo);
+  }
+
+  @Test
+  public void containsKeyReturnsTrueIfContainsKeyRemotelyReturnsTrue() throws Exception {
+    PartitionedTXRegionStub stub = spy(new PartitionedTXRegionStub(txStateStub, partitionedRegion));
+    when(event.getRegion()).thenReturn(partitionedRegion);
+    when(partitionedRegion.containsKeyRemotely((InternalDistributedMember) remoteTransactionHost, 1,
+        key)).thenReturn(true);
+
+    assertThat(stub.containsKey(keyInfo)).isTrue();
+    verify(stub).trackBucketForTx(keyInfo);
+  }
+
+  @Test
+  public void containsKeyReturnsFalseIfContainsKeyRemotelyReturnsFalse() throws Exception {
+    PartitionedTXRegionStub stub = spy(new PartitionedTXRegionStub(txStateStub, partitionedRegion));
+    when(event.getRegion()).thenReturn(partitionedRegion);
+    when(partitionedRegion.containsKeyRemotely((InternalDistributedMember) remoteTransactionHost, 1,
+        key)).thenReturn(false);
+
+    assertThat(stub.containsKey(keyInfo)).isFalse();
+    verify(stub).trackBucketForTx(keyInfo);
+  }
+
+  @Test
+  public void containsKeyThrowsTransactionExceptionFromRemoteHost() throws Exception {
+    PartitionedTXRegionStub stub = spy(new PartitionedTXRegionStub(txStateStub, partitionedRegion));
+    when(event.getRegion()).thenReturn(partitionedRegion);
+    when(partitionedRegion.containsKeyRemotely((InternalDistributedMember) remoteTransactionHost, 1,
+        key)).thenThrow(expectedException);
+
+    Throwable caughtException = catchThrowable(() -> stub.containsKey(keyInfo));
+
+    assertThat(caughtException).isSameAs(expectedException);
+    verify(stub, never()).trackBucketForTx(keyInfo);
+  }
+
+  @Test
+  public void containsKeyThrowsTransactionExceptionIfIsBucketNotFoundException()
+      throws Exception {
+    PartitionedTXRegionStub stub = spy(new PartitionedTXRegionStub(txStateStub, partitionedRegion));
+    when(event.getRegion()).thenReturn(partitionedRegion);
+    ForceReattemptException forceReattemptException = mock(ForceReattemptException.class);
+    doReturn(true).when(stub).isBucketNotFoundException(forceReattemptException);
+    when(partitionedRegion.containsKeyRemotely((InternalDistributedMember) remoteTransactionHost, 1,
+        key)).thenThrow(forceReattemptException);
+    doReturn(expectedException).when(stub).getTransactionException(keyInfo,
+        forceReattemptException);
+
+    Throwable caughtException = catchThrowable(() -> stub.containsKey(keyInfo));
+
+    assertThat(caughtException).isInstanceOf(TransactionException.class);
+    verify(stub).getTransactionException(keyInfo, forceReattemptException);
+    verify(stub, never()).trackBucketForTx(keyInfo);
+  }
+
+  @Test
+  public void containsKeyThrowsTransactionDataNodeHasDepartedExceptionIfIsNotBucketNotFoundException()
+      throws Exception {
+    PartitionedTXRegionStub stub = spy(new PartitionedTXRegionStub(txStateStub, partitionedRegion));
+    when(event.getRegion()).thenReturn(partitionedRegion);
+
+    ForceReattemptException forceReattemptException = mock(ForceReattemptException.class);
+    doNothing().when(stub).waitToRetry();
+    doReturn(false).when(stub).isBucketNotFoundException(forceReattemptException);
+    when(partitionedRegion.containsKeyRemotely((InternalDistributedMember) remoteTransactionHost, 1,
+        key)).thenThrow(forceReattemptException);
+
+    Throwable caughtException = catchThrowable(() -> stub.containsKey(keyInfo));
+
+    assertThat(caughtException).isInstanceOf(TransactionDataNodeHasDepartedException.class);
+    verify(stub, never()).trackBucketForTx(keyInfo);
+  }
+
+  @Test
+  public void containsValueForKeyReturnsTrueIfContainsKeyRemotelyReturnsTrue() throws Exception {
+    PartitionedTXRegionStub stub = spy(new PartitionedTXRegionStub(txStateStub, partitionedRegion));
+    when(event.getRegion()).thenReturn(partitionedRegion);
+    when(partitionedRegion
+        .containsValueForKeyRemotely((InternalDistributedMember) remoteTransactionHost, 1, key))
+            .thenReturn(true);
+
+    assertThat(stub.containsValueForKey(keyInfo)).isTrue();
+    verify(stub).trackBucketForTx(keyInfo);
+  }
+
+  @Test
+  public void containsValueForKeyRemotelyReturnsFalseIfContainsKeyRemotelyReturnsFalse()
+      throws Exception {
+    PartitionedTXRegionStub stub = spy(new PartitionedTXRegionStub(txStateStub, partitionedRegion));
+    when(event.getRegion()).thenReturn(partitionedRegion);
+    when(partitionedRegion
+        .containsValueForKeyRemotely((InternalDistributedMember) remoteTransactionHost, 1, key))
+            .thenReturn(false);
+
+    assertThat(stub.containsValueForKey(keyInfo)).isFalse();
+    verify(stub).trackBucketForTx(keyInfo);
+  }
+
+  @Test
+  public void containsValueForKeyThrowsTransactionExceptionFromRemoteHost() throws Exception {
+    PartitionedTXRegionStub stub = spy(new PartitionedTXRegionStub(txStateStub, partitionedRegion));
+    when(event.getRegion()).thenReturn(partitionedRegion);
+    when(partitionedRegion
+        .containsValueForKeyRemotely((InternalDistributedMember) remoteTransactionHost, 1, key))
+            .thenThrow(expectedException);
+
+    Throwable caughtException = catchThrowable(() -> stub.containsValueForKey(keyInfo));
+
+    assertThat(caughtException).isSameAs(expectedException);
+    verify(stub, never()).trackBucketForTx(keyInfo);
+  }
+
+  @Test
+  public void containsValueForKeyThrowsTransactionExceptionIfIsBucketNotFoundException()
+      throws Exception {
+    PartitionedTXRegionStub stub = spy(new PartitionedTXRegionStub(txStateStub, partitionedRegion));
+    when(event.getRegion()).thenReturn(partitionedRegion);
+    ForceReattemptException forceReattemptException = mock(ForceReattemptException.class);
+    doReturn(true).when(stub).isBucketNotFoundException(forceReattemptException);
+    doReturn(expectedException).when(stub).getTransactionException(keyInfo,
+        forceReattemptException);
+    when(partitionedRegion
+        .containsValueForKeyRemotely((InternalDistributedMember) remoteTransactionHost, 1, key))
+            .thenThrow(forceReattemptException);
+
+    Throwable caughtException = catchThrowable(() -> stub.containsValueForKey(keyInfo));
+
+    assertThat(caughtException).isInstanceOf(TransactionException.class);
+    verify(stub, never()).trackBucketForTx(keyInfo);
+  }
+
+  @Test
+  public void containsValueForKeyThrowsTransactionDataNodeHasDepartedExceptionIfIsNotBucketNotFoundException()
+      throws Exception {
+    PartitionedTXRegionStub stub = spy(new PartitionedTXRegionStub(txStateStub, partitionedRegion));
+    when(event.getRegion()).thenReturn(partitionedRegion);
+
+    ForceReattemptException forceReattemptException = mock(ForceReattemptException.class);
+    doNothing().when(stub).waitToRetry();
+    doReturn(false).when(stub).isBucketNotFoundException(forceReattemptException);
+    when(partitionedRegion
+        .containsValueForKeyRemotely((InternalDistributedMember) remoteTransactionHost, 1, key))
+            .thenThrow(forceReattemptException);
+
+    Throwable caughtException = catchThrowable(() -> stub.containsValueForKey(keyInfo));
+
+    assertThat(caughtException).isInstanceOf(TransactionDataNodeHasDepartedException.class);
+    verify(stub, never()).trackBucketForTx(keyInfo);
+  }
+
+  @Test
+  public void findObjectReturnsObjectFoundFromRemote() throws Exception {
+    PartitionedTXRegionStub stub = spy(new PartitionedTXRegionStub(txStateStub, partitionedRegion));
+    EntrySnapshot entry = mock(EntrySnapshot.class);
+    when(event.getRegion()).thenReturn(partitionedRegion);
+    when(partitionedRegion.getRemotely((InternalDistributedMember) remoteTransactionHost, 1,
+        key, null, true, null, event, false)).thenReturn((expectedObject));
+
+    assertThat(stub.findObject(keyInfo, false, false, null, true, null, event))
+        .isEqualTo(expectedObject);
+    verify(stub).trackBucketForTx(keyInfo);
+  }
+
+  @Test
+  public void findObjectThrowsTransactionExceptionFromRemoteHost() throws Exception {
+    PartitionedTXRegionStub stub = spy(new PartitionedTXRegionStub(txStateStub,
+        partitionedRegion));
+    when(event.getRegion()).thenReturn(partitionedRegion);
+    doThrow(expectedException).when(partitionedRegion).getRemotely(
+        (InternalDistributedMember) remoteTransactionHost, 1, key, null, true, null, event, false);
+
+    Throwable caughtException =
+        catchThrowable(() -> stub.findObject(keyInfo, false, false, null, true, null, event));
+
+    assertThat(caughtException).isSameAs(expectedException);
+    verify(stub, never()).trackBucketForTx(keyInfo);
+  }
+
+  @Test
+  public void findObjectThrowsTransactionExceptionIfIsBucketNotFoundException()
+      throws Exception {
+    PartitionedTXRegionStub stub = spy(new PartitionedTXRegionStub(txStateStub,
+        partitionedRegion));
+    when(event.getRegion()).thenReturn(partitionedRegion);
+    ForceReattemptException forceReattemptException = mock(ForceReattemptException.class);
+    doReturn(true).when(stub).isBucketNotFoundException(forceReattemptException);
+    doNothing().when(stub).waitToRetry();
+    doThrow(forceReattemptException).when(partitionedRegion)
+        .getRemotely((InternalDistributedMember) remoteTransactionHost, 1, key, null, true, null,
+            event, false);
+    doReturn(expectedException).when(stub).getTransactionException(keyInfo,
+        forceReattemptException);
+
+    Throwable caughtException =
+        catchThrowable(() -> stub.findObject(keyInfo, false, false, null, true, null, event));
+
+    assertThat(caughtException).isInstanceOf(TransactionException.class);
+    verify(stub, never()).trackBucketForTx(keyInfo);
+  }
+
+  @Test
+  public void findObjectThrowsTransactionExceptionIfIsNotBucketNotFoundException()
+      throws Exception {
+    PartitionedTXRegionStub stub = spy(new PartitionedTXRegionStub(txStateStub,
+        partitionedRegion));
+    when(event.getRegion()).thenReturn(partitionedRegion);
+
+    ForceReattemptException forceReattemptException = mock(ForceReattemptException.class);
+    doReturn(false).when(stub).isBucketNotFoundException(forceReattemptException);
+    doNothing().when(stub).waitToRetry();
+    doThrow(forceReattemptException).when(partitionedRegion)
+        .getRemotely((InternalDistributedMember) remoteTransactionHost, 1, key, null, true, null,
+            event, false);
+    doReturn(expectedException).when(stub).getTransactionException(keyInfo,
+        forceReattemptException);
+
+    Throwable caughtException =
+        catchThrowable(() -> stub.findObject(keyInfo, false, false, null, true, null, event));
+
+    assertThat(caughtException).isInstanceOf(TransactionException.class);
+    verify(stub, never()).trackBucketForTx(keyInfo);
+  }
+
+  @Test
+  public void putEntryReturnsTrueIfPutRemotelyReturnsTrue() throws Exception {
+    PartitionedTXRegionStub stub = spy(new PartitionedTXRegionStub(txStateStub, partitionedRegion));
+    when(event.getRegion()).thenReturn(partitionedRegion);
+    when(partitionedRegion
+        .putRemotely((InternalDistributedMember) remoteTransactionHost, event, false, false,
+            expectedObject, true))
+                .thenReturn(true);
+
+    assertThat(stub.putEntry(event, false, false, expectedObject, true, 1, false)).isTrue();
+    verify(stub).trackBucketForTx(keyInfo);
+  }
+
+  @Test
+  public void putEntryReturnsFalseIfPutRemotelyReturnsFalse()
+      throws Exception {
+    PartitionedTXRegionStub stub = spy(new PartitionedTXRegionStub(txStateStub, partitionedRegion));
+    when(event.getRegion()).thenReturn(partitionedRegion);
+    when(partitionedRegion
+        .putRemotely((InternalDistributedMember) remoteTransactionHost, event, false, false,
+            expectedObject, true))
+                .thenReturn(false);
+
+    assertThat(stub.putEntry(event, false, false, expectedObject, true, 1, false)).isFalse();
+    verify(stub).trackBucketForTx(keyInfo);
+  }
+
+  @Test
+  public void putEntryThrowsTransactionExceptionFromRemoteHost() throws Exception {
+    PartitionedTXRegionStub stub = spy(new PartitionedTXRegionStub(txStateStub, partitionedRegion));
+    when(event.getRegion()).thenReturn(partitionedRegion);
+    when(partitionedRegion
+        .putRemotely((InternalDistributedMember) remoteTransactionHost, event, true, false,
+            expectedObject, true))
+                .thenThrow(expectedException);
+
+    Throwable caughtException =
+        catchThrowable(() -> stub.putEntry(event, true, false, expectedObject, true, 1, false));
+
+    assertThat(caughtException).isSameAs(expectedException);
+    verify(stub, never()).trackBucketForTx(keyInfo);
+  }
+
+  @Test
+  public void putEntryThrowsTransactionExceptionIfIsBucketNotFoundException()
+      throws Exception {
+    PartitionedTXRegionStub stub = spy(new PartitionedTXRegionStub(txStateStub, partitionedRegion));
+    when(event.getRegion()).thenReturn(partitionedRegion);
+    ForceReattemptException forceReattemptException = mock(ForceReattemptException.class);
+    doReturn(true).when(stub).isBucketNotFoundException(forceReattemptException);
+    doReturn(expectedException).when(stub).getTransactionException(keyInfo,
+        forceReattemptException);
+    when(partitionedRegion
+        .putRemotely((InternalDistributedMember) remoteTransactionHost, event, true, false,
+            expectedObject, true))
+                .thenThrow(forceReattemptException);
+
+    Throwable caughtException =
+        catchThrowable(() -> stub.putEntry(event, true, false, expectedObject, true, 1, false));
+
+    assertThat(caughtException).isInstanceOf(TransactionException.class);
+    verify(stub, never()).trackBucketForTx(keyInfo);
+  }
+
+  @Test
+  public void putEntryThrowsTransactionExceptionIfIsNotBucketNotFoundException()
+      throws Exception {
+    PartitionedTXRegionStub stub = spy(new PartitionedTXRegionStub(txStateStub, partitionedRegion));
+    when(event.getRegion()).thenReturn(partitionedRegion);
+
+    ForceReattemptException forceReattemptException = mock(ForceReattemptException.class);
+    doNothing().when(stub).waitToRetry();
+    doReturn(false).when(stub).isBucketNotFoundException(forceReattemptException);
+    doReturn(expectedException).when(stub).getTransactionException(keyInfo,
+        forceReattemptException);
+    when(partitionedRegion
+        .putRemotely((InternalDistributedMember) remoteTransactionHost, event, true, false,
+            expectedObject, true))
+                .thenThrow(forceReattemptException);
+
+    Throwable caughtException =
+        catchThrowable(() -> stub.putEntry(event, true, false, expectedObject, true, 1, false));
+
+    assertThat(caughtException).isInstanceOf(TransactionException.class);
+    verify(stub, never()).trackBucketForTx(keyInfo);
+  }
+
+}