CASSANALYTICS-91: Add extractCdcTables method to CqlUtils (#141)
Patch by Bernardo Botella; Reviewed by Francisco Guerrero, Yifan Cai for CASSANALYTICS-91
diff --git a/CHANGES.txt b/CHANGES.txt
index a3698b7..fd5e343 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,5 +1,6 @@
0.2.0
-----
+ * Add extractCdcTables method to CqlUtils (CASSANALYTICS-91)
* Create bridge modules for Cassandra 5.0 (CASSANALYTICS-84)
* Analytics job fails when source table has secondary indexes (CASSANALYTICS-86)
* Set KeyStore to be optional (CASSANALYTICS-69)
diff --git a/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/utils/CqlUtils.java b/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/utils/CqlUtils.java
index 6c8471a..e694db6 100644
--- a/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/utils/CqlUtils.java
+++ b/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/utils/CqlUtils.java
@@ -22,6 +22,7 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
@@ -132,6 +133,25 @@
return cleaned.substring(matcher.start(0), matcher.end(0));
}
+ /**
+ * @param schemaStr full cluster schema text.
+ * @return map of keyspace/table identifier to table create statements.
+ */
+ public static Map<TableIdentifier, String> extractCdcTables(@NotNull final String schemaStr)
+ {
+ String cleaned = cleanCql(schemaStr);
+ Pattern pattern = Pattern.compile("CREATE TABLE \"?(\\w+)\"?\\.\"?(\\w+)\"?[^;]*cdc = true[^;]*;");
+ Matcher matcher = pattern.matcher(cleaned);
+ Map<TableIdentifier, String> createStmts = new HashMap<>();
+ while (matcher.find())
+ {
+ String keyspace = matcher.group(1);
+ String table = matcher.group(2);
+ createStmts.put(TableIdentifier.of(keyspace, table), extractCleanedTableSchema(cleaned, keyspace, table));
+ }
+ return createStmts;
+ }
+
public static ReplicationFactor extractReplicationFactor(@NotNull String schemaStr, @NotNull String keyspace)
{
String createKeyspaceSchema = extractKeyspaceSchema(schemaStr, keyspace);
diff --git a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/utils/CqlUtilsTest.java b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/utils/CqlUtilsTest.java
index 058ac21..17b7d3c 100644
--- a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/utils/CqlUtilsTest.java
+++ b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/utils/CqlUtilsTest.java
@@ -772,6 +772,31 @@
Collections.singletonList("max_index_interval"))).contains("max_index_interval = 6");
}
+ @Test
+ public void testCdcExtractSchema()
+ {
+ String schema = "CREATE KEYSPACE ks1 WITH REPLICATION = { 'class' : 'org.apache.cassandra.locator.NetworkTopologyStrategy', " +
+ "'MS': '3', 'ST': '3' } AND DURABLE_WRITES = true;\\n\\n" +
+ "CREATE TABLE ks1.tb1 (\"a\" text, \"b\" text, \"c\" text, \"d\" text, e timestamp, " +
+ "f uuid, g blob, PRIMARY KEY ((\"a\", \"b\"), \"c\", \"d\")) WITH CLUSTERING ORDER BY (\"c\" DESC, \"d\" ASC) " +
+ "AND cdc = true;\n\n" +
+ "CREATE TABLE ks1.tb2 (a text, b text, c text, d text, PRIMARY KEY ((a), b, c)) WITH " +
+ "CLUSTERING ORDER BY (b DESC, c ASC) AND cdc = true;\n\n" +
+ "CREATE KEYSPACE ks2 WITH REPLICATION = { 'class' : 'org.apache.cassandra.locator.NetworkTopologyStrategy', " +
+ "'MS': '3', 'ST': '3' } AND DURABLE_WRITES = true;\\n\\n" +
+ "CREATE TABLE ks2.tb3 (a text, b text, c text, d text, PRIMARY KEY ((a), b, c)) WITH " +
+ "CLUSTERING ORDER BY (b DESC, c ASC);\n\n" +
+ "CREATE TABLE ks2.tb4 (a bigint, b int, c uuid, d text, PRIMARY KEY (a)) WITH cdc = true;\n\n";
+ Map<TableIdentifier, String> createStmts = CqlUtils.extractCdcTables(schema);
+
+ assertThat(3).isEqualTo(createStmts.size());
+ assertThat(createStmts.containsKey(TableIdentifier.of("ks1", "tb1"))).isTrue();
+ assertThat(createStmts.containsKey(TableIdentifier.of("ks1", "tb2"))).isTrue();
+ assertThat(createStmts.containsKey(TableIdentifier.of("ks2", "tb3"))).isFalse();
+ assertThat(createStmts.containsKey(TableIdentifier.of("ks2", "tb4"))).isTrue();
+ }
+
+
private static String loadFullSchemaSample() throws IOException
{
Path fullSchemaSampleFile = ResourceUtils.writeResourceToPath(CqlUtilsTest.class.getClassLoader(), tempPath, "cql/fullSchema.cql");