GEODE-8292: Added check if key is destroyed in CQResults (#5426)

diff --git a/geode-core/src/main/java/org/apache/geode/cache/query/internal/cq/ServerCQ.java b/geode-core/src/main/java/org/apache/geode/cache/query/internal/cq/ServerCQ.java
index 08bfba3..1f004f9 100644
--- a/geode-core/src/main/java/org/apache/geode/cache/query/internal/cq/ServerCQ.java
+++ b/geode-core/src/main/java/org/apache/geode/cache/query/internal/cq/ServerCQ.java
@@ -77,6 +77,11 @@
   boolean isOldValueRequiredForQueryProcessing(Object key);
 
   /**
+   * Returns true if key is in destroy token mode.
+   */
+  boolean isKeyDestroyed(Object key);
+
+  /**
    * Closes the Query. On Client side, sends the cq close request to server. On Server side, takes
    * care of repository cleanup.
    *
diff --git a/geode-cq/src/distributedTest/java/org/apache/geode/cache/query/cq/CQDistributedTest.java b/geode-cq/src/distributedTest/java/org/apache/geode/cache/query/cq/CQDistributedTest.java
index dd50836..f46b714 100644
--- a/geode-cq/src/distributedTest/java/org/apache/geode/cache/query/cq/CQDistributedTest.java
+++ b/geode-cq/src/distributedTest/java/org/apache/geode/cache/query/cq/CQDistributedTest.java
@@ -51,13 +51,14 @@
 
   private MemberVM locator;
   private MemberVM server;
-  private int locator1Port;
+  private MemberVM server2;
 
   private CqAttributes cqa;
   private QueryService qs;
   private TestCqListener testListener;
   private TestCqListener2 testListener2;
 
+  private Region region;
 
   @Rule
   public ClusterStartupRule clusterStartupRule = new ClusterStartupRule();
@@ -69,8 +70,11 @@
     server = clusterStartupRule.startServerVM(3, locator1Port);
     createServerRegion(server, RegionShortcut.PARTITION);
 
+    server2 = clusterStartupRule.startServerVM(4, locator1Port);
+    createServerRegion(server2, RegionShortcut.PARTITION);
+
     ClientCache clientCache = createClientCache(locator1Port);
-    Region region =
+    region =
         clientCache.createClientRegionFactory(ClientRegionShortcut.CACHING_PROXY).create("region");
 
     qs = clientCache.getQueryService();
@@ -233,6 +237,39 @@
     await().untilAsserted(() -> assertThat(testListener2.onEventUpdateCalls).isEqualTo(0));
   }
 
+  @Test
+  public void cqWithTransaction2Servers() throws Exception {
+
+    qs.newCq("Select * from /region r where r.ID = 1", cqa).execute();
+
+    final CacheTransactionManager txMgr = region.getCache().getCacheTransactionManager();
+
+    // CREATE new entry
+    for (int i = 0; i < 4; i++) {
+      txMgr.begin();
+      region.put(i, new Portfolio(1));
+      txMgr.commit();
+    }
+
+    // UPDATE
+    for (int i = 0; i < 4; i++) {
+      txMgr.begin();
+      region.put(i, new Portfolio(0));
+      txMgr.commit();
+    }
+
+    // CREATE
+    for (int i = 0; i < 4; i++) {
+      txMgr.begin();
+      region.put(i, new Portfolio(1));
+      txMgr.commit();
+    }
+
+    await().untilAsserted(() -> assertThat(testListener2.onEventCreateCalls).isEqualTo(8));
+    await().untilAsserted(() -> assertThat(testListener2.onEventUpdateCalls).isEqualTo(0));
+  }
+
+
   private class TestCqListener implements CqListener, Serializable {
     public int onEventCalls = 0;
 
diff --git a/geode-cq/src/main/java/org/apache/geode/cache/query/cq/internal/CqServiceImpl.java b/geode-cq/src/main/java/org/apache/geode/cache/query/cq/internal/CqServiceImpl.java
index bf09bd7..554a49c 100644
--- a/geode-cq/src/main/java/org/apache/geode/cache/query/cq/internal/CqServiceImpl.java
+++ b/geode-cq/src/main/java/org/apache/geode/cache/query/cq/internal/CqServiceImpl.java
@@ -1367,7 +1367,8 @@
                 // Partitioned Regions. Once this is added remove the check
                 // with PR region.
                 if (cQuery.isCqResultsCacheInitialized()) {
-                  b_cqResults_oldValue = cQuery.isPartOfCqResult(eventKey);
+                  b_cqResults_oldValue =
+                      (cQuery.isPartOfCqResult(eventKey) && !cQuery.isKeyDestroyed(eventKey));
                   // For PR if not found in cache, apply the query on old value.
                   // Also apply if the query was not executed during cq execute
                   if ((cQuery.isPR || !CqServiceImpl.EXECUTE_QUERY_DURING_INIT)
diff --git a/geode-cq/src/main/java/org/apache/geode/cache/query/cq/internal/ServerCQImpl.java b/geode-cq/src/main/java/org/apache/geode/cache/query/cq/internal/ServerCQImpl.java
index e48ec2b..2f702ac 100644
--- a/geode-cq/src/main/java/org/apache/geode/cache/query/cq/internal/ServerCQImpl.java
+++ b/geode-cq/src/main/java/org/apache/geode/cache/query/cq/internal/ServerCQImpl.java
@@ -333,6 +333,14 @@
     return serverCQResultsCache.isOldValueRequiredForQueryProcessing(key);
   }
 
+  @Override
+  public boolean isKeyDestroyed(Object key) {
+    if (!serverCQResultsCache.contains(key)) {
+      return false;
+    }
+    return serverCQResultsCache.isKeyDestroyed(key);
+  }
+
   /**
    * Closes the Query. On Client side, sends the cq close request to server. On Server side, takes
    * care of repository cleanup.
diff --git a/geode-cq/src/main/java/org/apache/geode/cache/query/cq/internal/ServerCQResultsCache.java b/geode-cq/src/main/java/org/apache/geode/cache/query/cq/internal/ServerCQResultsCache.java
index 87c9693..02db2be 100644
--- a/geode-cq/src/main/java/org/apache/geode/cache/query/cq/internal/ServerCQResultsCache.java
+++ b/geode-cq/src/main/java/org/apache/geode/cache/query/cq/internal/ServerCQResultsCache.java
@@ -44,5 +44,7 @@
 
   boolean isOldValueRequiredForQueryProcessing(Object key);
 
+  boolean isKeyDestroyed(Object key);
+
   void clear();
 }
diff --git a/geode-cq/src/main/java/org/apache/geode/cache/query/cq/internal/ServerCQResultsCacheNoOpImpl.java b/geode-cq/src/main/java/org/apache/geode/cache/query/cq/internal/ServerCQResultsCacheNoOpImpl.java
index 4f0f1cf..7248d5a 100644
--- a/geode-cq/src/main/java/org/apache/geode/cache/query/cq/internal/ServerCQResultsCacheNoOpImpl.java
+++ b/geode-cq/src/main/java/org/apache/geode/cache/query/cq/internal/ServerCQResultsCacheNoOpImpl.java
@@ -64,5 +64,10 @@
   }
 
   @Override
+  public boolean isKeyDestroyed(Object key) {
+    return false;
+  }
+
+  @Override
   public void clear() {}
 }
diff --git a/geode-cq/src/main/java/org/apache/geode/cache/query/cq/internal/ServerCQResultsCachePartitionRegionImpl.java b/geode-cq/src/main/java/org/apache/geode/cache/query/cq/internal/ServerCQResultsCachePartitionRegionImpl.java
index 882a923..8f7279c 100644
--- a/geode-cq/src/main/java/org/apache/geode/cache/query/cq/internal/ServerCQResultsCachePartitionRegionImpl.java
+++ b/geode-cq/src/main/java/org/apache/geode/cache/query/cq/internal/ServerCQResultsCachePartitionRegionImpl.java
@@ -127,6 +127,11 @@
   }
 
   @Override
+  public boolean isKeyDestroyed(Object key) {
+    return (cqResultKeys.get(key) == Token.DESTROYED);
+  }
+
+  @Override
   public void clear() {
     cqResultKeys.clear();
   }
diff --git a/geode-cq/src/main/java/org/apache/geode/cache/query/cq/internal/ServerCQResultsCacheReplicateRegionImpl.java b/geode-cq/src/main/java/org/apache/geode/cache/query/cq/internal/ServerCQResultsCacheReplicateRegionImpl.java
index 3d987b8..dd66d03 100644
--- a/geode-cq/src/main/java/org/apache/geode/cache/query/cq/internal/ServerCQResultsCacheReplicateRegionImpl.java
+++ b/geode-cq/src/main/java/org/apache/geode/cache/query/cq/internal/ServerCQResultsCacheReplicateRegionImpl.java
@@ -167,6 +167,11 @@
   }
 
   @Override
+  public boolean isKeyDestroyed(Object key) {
+    return false;
+  }
+
+  @Override
   public void clear() {
     // Clean-up the CQ Results Cache.
     synchronized (LOCK) {