PHOENIX-7658 CDC event for TTL_DELETE to exclude pre-image if PRE scope is not selected (#2215)
diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/util/CDCChangeBuilder.java b/phoenix-core-client/src/main/java/org/apache/phoenix/util/CDCChangeBuilder.java
index 101d0f9..e1a3c16 100644
--- a/phoenix-core-client/src/main/java/org/apache/phoenix/util/CDCChangeBuilder.java
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/util/CDCChangeBuilder.java
@@ -149,4 +149,8 @@
return (cell.getTimestamp() < changeTimestamp &&
cell.getTimestamp() > lastDeletedTimestamp) ? true : false;
}
+
+ public boolean isPreImageInScope() {
+ return isPreImageInScope;
+ }
}
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CDCCompactionUtil.java b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CDCCompactionUtil.java
index 78fd936..efc1bba 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CDCCompactionUtil.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CDCCompactionUtil.java
@@ -47,7 +47,6 @@
import org.slf4j.LoggerFactory;
import java.io.IOException;
-import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -130,7 +129,6 @@
}
}
cdcEvent.put(QueryConstants.CDC_PRE_IMAGE, preImage);
- cdcEvent.put(QueryConstants.CDC_POST_IMAGE, Collections.emptyMap());
return cdcEvent;
}
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CDCGlobalIndexRegionScanner.java b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CDCGlobalIndexRegionScanner.java
index 6cede61..cd47f30 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CDCGlobalIndexRegionScanner.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CDCGlobalIndexRegionScanner.java
@@ -54,6 +54,7 @@
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import static org.apache.phoenix.coprocessorclient.BaseScannerRegionObserverConstants.CDC_DATA_TABLE_DEF;
import static org.apache.phoenix.util.ByteUtil.EMPTY_BYTE_ARRAY;
@@ -320,9 +321,10 @@
* @param indexCell The primary index cell
* @param result The result list to populate
* @return true if event was processed successfully
+ * @throws IOException If error is encountered while handling built-in image data.
*/
private boolean handlePreImageCDCEvent(List<Cell> indexRow, byte[] indexRowKey,
- Cell indexCell, List<Cell> result) {
+ Cell indexCell, List<Cell> result) throws IOException {
Cell cdcDataCell = null;
for (Cell cell : indexRow) {
if (Bytes.equals(cell.getQualifierArray(), cell.getQualifierOffset(),
@@ -337,6 +339,12 @@
return false;
}
byte[] cdcEventBytes = CellUtil.cloneValue(cdcDataCell);
+ if (!this.changeBuilder.isPreImageInScope()) {
+ Map<String, Object> cdcJson =
+ JacksonUtil.getObjectReader(HashMap.class).readValue(cdcEventBytes);
+ cdcJson.remove(QueryConstants.CDC_PRE_IMAGE);
+ cdcEventBytes = JacksonUtil.getObjectWriter(HashMap.class).writeValueAsBytes(cdcJson);
+ }
Result cdcRow = createCDCResult(indexRowKey, indexCell, cdcDataCell.getTimestamp(),
cdcEventBytes);
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/Bson3IT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/Bson3IT.java
index 8b55334..fb04899 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/Bson3IT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/Bson3IT.java
@@ -2203,6 +2203,25 @@
assertEquals("TTL delete pre-image should match last post-image for " + pk,
lastPostImage, ttlPreImage);
}
+
+ ttlDeleteQuery = "SELECT /*+ CDC_INCLUDE(POST) */ * FROM " + cdcName +
+ " WHERE PHOENIX_ROW_TIMESTAMP() > ?";
+
+ int nonePreImages = 0;
+ try (PreparedStatement pst = conn.prepareStatement(ttlDeleteQuery)) {
+ pst.setTimestamp(1, beforeTTLTimestamp);
+ try (ResultSet ttlRs = pst.executeQuery()) {
+ while (ttlRs.next()) {
+ String cdcVal = ttlRs.getString(3);
+ Map<String, Object> cdcEvent = OBJECT_MAPPER.readValue(cdcVal, HashMap.class);
+ assertEquals(CDC_TTL_DELETE_EVENT_TYPE, cdcEvent.get(CDC_EVENT_TYPE));
+ assertNull("Pre-image should not be present", cdcEvent.get(CDC_PRE_IMAGE));
+ nonePreImages++;
+ }
+ }
+ }
+ assertEquals("Total num of TTL_DELETE events without pre-image should be 3 but it is " +
+ nonePreImages, 3, nonePreImages);
} finally {
EnvironmentEdgeManager.reset();
}
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/schema/ConditionalTTLExpressionIT.java b/phoenix-core/src/it/java/org/apache/phoenix/schema/ConditionalTTLExpressionIT.java
index 8e58237..6815d52 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/schema/ConditionalTTLExpressionIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/schema/ConditionalTTLExpressionIT.java
@@ -501,9 +501,8 @@
assertFalse("TTL_DELETE events should have non-empty pre-image",
preImage.isEmpty());
- Map<String, Object> postImage =
- (Map<String, Object>) map.get(QueryConstants.CDC_POST_IMAGE);
- assertTrue("TTL_DELETE events should have empty post-image", postImage.isEmpty());
+ assertNull("TTL_DELETE events should have empty post-image",
+ map.get(QueryConstants.CDC_POST_IMAGE));
// TTL delete pre-image should match previous upsert post-image
assertEquals("TTL_DELETE pre-image should match original insert post-image",
@@ -600,9 +599,8 @@
assertFalse("Second TTL_DELETE should have non-empty pre-image",
preImage.isEmpty());
- Map<String, Object> postImage =
- (Map<String, Object>) map.get(QueryConstants.CDC_POST_IMAGE);
- assertTrue("Second TTL_DELETE should have empty post-image", postImage.isEmpty());
+ assertNull("TTL_DELETE events should have empty post-image",
+ map.get(QueryConstants.CDC_POST_IMAGE));
assertEquals("Second TTL_DELETE pre-image should match resurrection post-image",
postImageList.get(i), preImage);