GEODE-8292: Added check if key is destroyed in CQResults (#5426)
diff --git a/geode-core/src/main/java/org/apache/geode/cache/query/internal/cq/ServerCQ.java b/geode-core/src/main/java/org/apache/geode/cache/query/internal/cq/ServerCQ.java
index 08bfba3..1f004f9 100644
--- a/geode-core/src/main/java/org/apache/geode/cache/query/internal/cq/ServerCQ.java
+++ b/geode-core/src/main/java/org/apache/geode/cache/query/internal/cq/ServerCQ.java
@@ -77,6 +77,11 @@
boolean isOldValueRequiredForQueryProcessing(Object key);
/**
+ * Returns true if key is in destroy token mode.
+ */
+ boolean isKeyDestroyed(Object key);
+
+ /**
* Closes the Query. On Client side, sends the cq close request to server. On Server side, takes
* care of repository cleanup.
*
diff --git a/geode-cq/src/distributedTest/java/org/apache/geode/cache/query/cq/CQDistributedTest.java b/geode-cq/src/distributedTest/java/org/apache/geode/cache/query/cq/CQDistributedTest.java
index dd50836..f46b714 100644
--- a/geode-cq/src/distributedTest/java/org/apache/geode/cache/query/cq/CQDistributedTest.java
+++ b/geode-cq/src/distributedTest/java/org/apache/geode/cache/query/cq/CQDistributedTest.java
@@ -51,13 +51,14 @@
private MemberVM locator;
private MemberVM server;
- private int locator1Port;
+ private MemberVM server2;
private CqAttributes cqa;
private QueryService qs;
private TestCqListener testListener;
private TestCqListener2 testListener2;
+ private Region region;
@Rule
public ClusterStartupRule clusterStartupRule = new ClusterStartupRule();
@@ -69,8 +70,11 @@
server = clusterStartupRule.startServerVM(3, locator1Port);
createServerRegion(server, RegionShortcut.PARTITION);
+ server2 = clusterStartupRule.startServerVM(4, locator1Port);
+ createServerRegion(server2, RegionShortcut.PARTITION);
+
ClientCache clientCache = createClientCache(locator1Port);
- Region region =
+ region =
clientCache.createClientRegionFactory(ClientRegionShortcut.CACHING_PROXY).create("region");
qs = clientCache.getQueryService();
@@ -233,6 +237,39 @@
await().untilAsserted(() -> assertThat(testListener2.onEventUpdateCalls).isEqualTo(0));
}
+ @Test
+ public void cqWithTransaction2Servers() throws Exception {
+
+ qs.newCq("Select * from /region r where r.ID = 1", cqa).execute();
+
+ final CacheTransactionManager txMgr = region.getCache().getCacheTransactionManager();
+
+ // CREATE new entry
+ for (int i = 0; i < 4; i++) {
+ txMgr.begin();
+ region.put(i, new Portfolio(1));
+ txMgr.commit();
+ }
+
+ // UPDATE
+ for (int i = 0; i < 4; i++) {
+ txMgr.begin();
+ region.put(i, new Portfolio(0));
+ txMgr.commit();
+ }
+
+ // CREATE
+ for (int i = 0; i < 4; i++) {
+ txMgr.begin();
+ region.put(i, new Portfolio(1));
+ txMgr.commit();
+ }
+
+ await().untilAsserted(() -> assertThat(testListener2.onEventCreateCalls).isEqualTo(8));
+ await().untilAsserted(() -> assertThat(testListener2.onEventUpdateCalls).isEqualTo(0));
+ }
+
+
private class TestCqListener implements CqListener, Serializable {
public int onEventCalls = 0;
diff --git a/geode-cq/src/main/java/org/apache/geode/cache/query/cq/internal/CqServiceImpl.java b/geode-cq/src/main/java/org/apache/geode/cache/query/cq/internal/CqServiceImpl.java
index bf09bd7..554a49c 100644
--- a/geode-cq/src/main/java/org/apache/geode/cache/query/cq/internal/CqServiceImpl.java
+++ b/geode-cq/src/main/java/org/apache/geode/cache/query/cq/internal/CqServiceImpl.java
@@ -1367,7 +1367,8 @@
// Partitioned Regions. Once this is added remove the check
// with PR region.
if (cQuery.isCqResultsCacheInitialized()) {
- b_cqResults_oldValue = cQuery.isPartOfCqResult(eventKey);
+ b_cqResults_oldValue =
+ (cQuery.isPartOfCqResult(eventKey) && !cQuery.isKeyDestroyed(eventKey));
// For PR if not found in cache, apply the query on old value.
// Also apply if the query was not executed during cq execute
if ((cQuery.isPR || !CqServiceImpl.EXECUTE_QUERY_DURING_INIT)
diff --git a/geode-cq/src/main/java/org/apache/geode/cache/query/cq/internal/ServerCQImpl.java b/geode-cq/src/main/java/org/apache/geode/cache/query/cq/internal/ServerCQImpl.java
index e48ec2b..2f702ac 100644
--- a/geode-cq/src/main/java/org/apache/geode/cache/query/cq/internal/ServerCQImpl.java
+++ b/geode-cq/src/main/java/org/apache/geode/cache/query/cq/internal/ServerCQImpl.java
@@ -333,6 +333,14 @@
return serverCQResultsCache.isOldValueRequiredForQueryProcessing(key);
}
+ @Override
+ public boolean isKeyDestroyed(Object key) {
+ if (!serverCQResultsCache.contains(key)) {
+ return false;
+ }
+ return serverCQResultsCache.isKeyDestroyed(key);
+ }
+
/**
* Closes the Query. On Client side, sends the cq close request to server. On Server side, takes
* care of repository cleanup.
diff --git a/geode-cq/src/main/java/org/apache/geode/cache/query/cq/internal/ServerCQResultsCache.java b/geode-cq/src/main/java/org/apache/geode/cache/query/cq/internal/ServerCQResultsCache.java
index 87c9693..02db2be 100644
--- a/geode-cq/src/main/java/org/apache/geode/cache/query/cq/internal/ServerCQResultsCache.java
+++ b/geode-cq/src/main/java/org/apache/geode/cache/query/cq/internal/ServerCQResultsCache.java
@@ -44,5 +44,7 @@
boolean isOldValueRequiredForQueryProcessing(Object key);
+ boolean isKeyDestroyed(Object key);
+
void clear();
}
diff --git a/geode-cq/src/main/java/org/apache/geode/cache/query/cq/internal/ServerCQResultsCacheNoOpImpl.java b/geode-cq/src/main/java/org/apache/geode/cache/query/cq/internal/ServerCQResultsCacheNoOpImpl.java
index 4f0f1cf..7248d5a 100644
--- a/geode-cq/src/main/java/org/apache/geode/cache/query/cq/internal/ServerCQResultsCacheNoOpImpl.java
+++ b/geode-cq/src/main/java/org/apache/geode/cache/query/cq/internal/ServerCQResultsCacheNoOpImpl.java
@@ -64,5 +64,10 @@
}
@Override
+ public boolean isKeyDestroyed(Object key) {
+ return false;
+ }
+
+ @Override
public void clear() {}
}
diff --git a/geode-cq/src/main/java/org/apache/geode/cache/query/cq/internal/ServerCQResultsCachePartitionRegionImpl.java b/geode-cq/src/main/java/org/apache/geode/cache/query/cq/internal/ServerCQResultsCachePartitionRegionImpl.java
index 882a923..8f7279c 100644
--- a/geode-cq/src/main/java/org/apache/geode/cache/query/cq/internal/ServerCQResultsCachePartitionRegionImpl.java
+++ b/geode-cq/src/main/java/org/apache/geode/cache/query/cq/internal/ServerCQResultsCachePartitionRegionImpl.java
@@ -127,6 +127,11 @@
}
@Override
+ public boolean isKeyDestroyed(Object key) {
+ return (cqResultKeys.get(key) == Token.DESTROYED);
+ }
+
+ @Override
public void clear() {
cqResultKeys.clear();
}
diff --git a/geode-cq/src/main/java/org/apache/geode/cache/query/cq/internal/ServerCQResultsCacheReplicateRegionImpl.java b/geode-cq/src/main/java/org/apache/geode/cache/query/cq/internal/ServerCQResultsCacheReplicateRegionImpl.java
index 3d987b8..dd66d03 100644
--- a/geode-cq/src/main/java/org/apache/geode/cache/query/cq/internal/ServerCQResultsCacheReplicateRegionImpl.java
+++ b/geode-cq/src/main/java/org/apache/geode/cache/query/cq/internal/ServerCQResultsCacheReplicateRegionImpl.java
@@ -167,6 +167,11 @@
}
@Override
+ public boolean isKeyDestroyed(Object key) {
+ return false;
+ }
+
+ @Override
public void clear() {
// Clean-up the CQ Results Cache.
synchronized (LOCK) {