CASSANALYTICS-91: Add extractCdcTables method to CqlUtils (#141)

Patch by Bernardo Botella; Reviewed by Francisco Guerrero, Yifan Cai for CASSANALYTICS-91
diff --git a/CHANGES.txt b/CHANGES.txt
index a3698b7..fd5e343 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,5 +1,6 @@
 0.2.0
 -----
+ * Add extractCdcTables method to CqlUtils (CASSANALYTICS-91)
  * Create bridge modules for Cassandra 5.0 (CASSANALYTICS-84)
  * Analytics job fails when source table has secondary indexes (CASSANALYTICS-86)
  * Set KeyStore to be optional (CASSANALYTICS-69)
diff --git a/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/utils/CqlUtils.java b/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/utils/CqlUtils.java
index 6c8471a..e694db6 100644
--- a/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/utils/CqlUtils.java
+++ b/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/utils/CqlUtils.java
@@ -22,6 +22,7 @@
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -132,6 +133,25 @@
         return cleaned.substring(matcher.start(0), matcher.end(0));
     }
 
+    /**
+     * @param schemaStr full cluster schema text.
+     * @return map of keyspace/table identifier to table create statements.
+     */
+    public static Map<TableIdentifier, String> extractCdcTables(@NotNull final String schemaStr)
+    {
+        String cleaned = cleanCql(schemaStr);
+        Pattern pattern = Pattern.compile("CREATE TABLE \"?(\\w+)\"?\\.\"?(\\w+)\"?[^;]*cdc = true[^;]*;");
+        Matcher matcher = pattern.matcher(cleaned);
+        Map<TableIdentifier, String> createStmts = new HashMap<>();
+        while (matcher.find())
+        {
+            String keyspace = matcher.group(1);
+            String table = matcher.group(2);
+            createStmts.put(TableIdentifier.of(keyspace, table), extractCleanedTableSchema(cleaned, keyspace, table));
+        }
+        return createStmts;
+    }
+
     public static ReplicationFactor extractReplicationFactor(@NotNull String schemaStr, @NotNull String keyspace)
     {
         String createKeyspaceSchema = extractKeyspaceSchema(schemaStr, keyspace);
diff --git a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/utils/CqlUtilsTest.java b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/utils/CqlUtilsTest.java
index 058ac21..17b7d3c 100644
--- a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/utils/CqlUtilsTest.java
+++ b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/utils/CqlUtilsTest.java
@@ -772,6 +772,31 @@
                                                       Collections.singletonList("max_index_interval"))).contains("max_index_interval = 6");
     }
 
+    @Test
+    public void testCdcExtractSchema()
+    {
+        String schema = "CREATE KEYSPACE ks1 WITH REPLICATION = { 'class' : 'org.apache.cassandra.locator.NetworkTopologyStrategy', " +
+                        "'MS': '3', 'ST': '3' } AND DURABLE_WRITES = true;\\n\\n" +
+                              "CREATE TABLE ks1.tb1 (\"a\" text, \"b\" text, \"c\" text, \"d\" text, e timestamp, " +
+                        "f uuid, g blob, PRIMARY KEY ((\"a\", \"b\"), \"c\", \"d\")) WITH CLUSTERING ORDER BY (\"c\" DESC, \"d\" ASC) " +
+                        "AND cdc = true;\n\n" +
+                              "CREATE TABLE ks1.tb2 (a text, b text, c text, d text, PRIMARY KEY ((a), b, c)) WITH " +
+                        "CLUSTERING ORDER BY (b DESC, c ASC) AND cdc = true;\n\n" +
+                              "CREATE KEYSPACE ks2 WITH REPLICATION = { 'class' : 'org.apache.cassandra.locator.NetworkTopologyStrategy', " +
+                        "'MS': '3', 'ST': '3' } AND DURABLE_WRITES = true;\\n\\n" +
+                              "CREATE TABLE ks2.tb3 (a text, b text, c text, d text, PRIMARY KEY ((a), b, c)) WITH " +
+                        "CLUSTERING ORDER BY (b DESC, c ASC);\n\n" +
+                              "CREATE TABLE ks2.tb4 (a bigint, b int, c uuid, d text, PRIMARY KEY (a)) WITH cdc = true;\n\n";
+        Map<TableIdentifier, String> createStmts = CqlUtils.extractCdcTables(schema);
+
+        assertThat(3).isEqualTo(createStmts.size());
+        assertThat(createStmts.containsKey(TableIdentifier.of("ks1", "tb1"))).isTrue();
+        assertThat(createStmts.containsKey(TableIdentifier.of("ks1", "tb2"))).isTrue();
+        assertThat(createStmts.containsKey(TableIdentifier.of("ks2", "tb3"))).isFalse();
+        assertThat(createStmts.containsKey(TableIdentifier.of("ks2", "tb4"))).isTrue();
+    }
+
+
     private static String loadFullSchemaSample() throws IOException
     {
         Path fullSchemaSampleFile = ResourceUtils.writeResourceToPath(CqlUtilsTest.class.getClassLoader(), tempPath, "cql/fullSchema.cql");