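[improve] Schema change no longer requires the upstream db/table names to match the Doris db/table (#87)
* Optimize schema changes: rewrite captured ADD/DROP COLUMN DDL against the configured Doris table and optionally filter DDL events by source table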
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/JsonDebeziumSchemaSerializer.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/JsonDebeziumSchemaSerializer.java
index c3fe987..bd685ef 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/JsonDebeziumSchemaSerializer.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/JsonDebeziumSchemaSerializer.java
@@ -25,6 +25,7 @@
import org.apache.doris.flink.cfg.DorisOptions;
import org.apache.doris.flink.sink.HttpGetWithEntity;
import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.util.StringUtils;
import org.apache.http.HttpHeaders;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpPost;
@@ -56,14 +57,23 @@
private static final String OP_UPDATE = "u"; // update
private static final String OP_DELETE = "d"; // delete
- private static final String addDropDDLRegex = "ALTER\\s+TABLE\\s+[^\\s]+\\s+(ADD|DROP)\\s+COLUMN\\s+([^\\s]+).*";
+ // Doris DDL template: table identifier, ADD/DROP, column name, column type (empty for DROP),
+ // e.g. "ALTER TABLE db.tbl ADD COLUMN c1 int".
+ public static final String EXECUTE_DDL = "ALTER TABLE %s %s COLUMN %s %s";
+ // Matches a single-column ADD/DROP. Groups: 1 = ADD|DROP, 2 = optional COLUMN keyword, 3 = column name,
+ // 5 = column type including an attached argument list such as decimal(10, 2).
+ // The negative lookahead rejects index/key/constraint/partition clauses so they are not mistaken for a column.
+ private static final String addDropDDLRegex = "ALTER\\s+TABLE\\s+[^\\s]+\\s+(ADD|DROP)\\s+(COLUMN\\s+)?(?!(?:INDEX|KEY|PRIMARY|UNIQUE|FOREIGN|CONSTRAINT|FULLTEXT|SPATIAL|PARTITION|CHECK)\\b)([^\\s]+)(\\s+([^\\s(]+(?:\\([^)]*\\))?))?.*";
private final Pattern addDropDDLPattern;
private DorisOptions dorisOptions;
private ObjectMapper objectMapper = new ObjectMapper();
+ private String database;
+ private String table;
+ //table name of the cdc upstream, format is db.tbl
+ private String sourceTableName;
- public JsonDebeziumSchemaSerializer(DorisOptions dorisOptions, Pattern pattern) {
+ /**
+ * @param dorisOptions Doris sink options; the table identifier must be in the form db.tbl
+ * @param pattern DDL pattern, or null to use addDropDDLRegex (case-insensitive)
+ * @param sourceTableName upstream table (db.tbl) whose DDL is applied; null or blank applies DDL from every captured table
+ * @throws IllegalArgumentException if the table identifier is not in the form db.tbl
+ */
+ public JsonDebeziumSchemaSerializer(DorisOptions dorisOptions, Pattern pattern, String sourceTableName) {
this.dorisOptions = dorisOptions;
this.addDropDDLPattern = pattern == null ? Pattern.compile(addDropDDLRegex, Pattern.CASE_INSENSITIVE) : pattern;
+ String tableIdentifier = dorisOptions.getTableIdentifier();
+ String[] tableInfo = tableIdentifier == null ? new String[0] : tableIdentifier.split("\\.");
+ if (tableInfo.length != 2) {
+ throw new IllegalArgumentException("table identifier must be in the form db.tbl, but got: " + tableIdentifier);
+ }
+ this.database = tableInfo[0];
+ this.table = tableInfo[1];
+ this.sourceTableName = sourceTableName;
}
@Override
@@ -97,8 +107,16 @@
public boolean schemaChange(JsonNode recordRoot) {
boolean status = false;
try{
- boolean doSchemaChange = checkSchemaChange(recordRoot);
- status = doSchemaChange && execSchemaChange(recordRoot);
+ if(!StringUtils.isNullOrWhitespaceOnly(sourceTableName) && !checkTable(recordRoot)){
+ return false;
+ }
+ String ddl = extractDDL(recordRoot);
+ if(StringUtils.isNullOrWhitespaceOnly(ddl)){
+ LOG.info("ddl can not do schema change:{}", recordRoot);
+ return false;
+ }
+ boolean doSchemaChange = checkSchemaChange(ddl);
+ status = doSchemaChange && execSchemaChange(ddl);
LOG.info("schema change status:{}", status);
}catch (Exception ex){
LOG.warn("schema change error :", ex);
@@ -106,6 +124,16 @@
return status;
}
+ /**
+ * Returns whether a schema-change record belongs to the configured upstream table.
+ * When CDC synchronizes multiple tables, DDL events of every captured table reach this serializer,
+ * so only records whose {@code <db>.<table>} equals {@code sourceTableName} are accepted.
+ * Callers must ensure {@code sourceTableName} is non-null.
+ */
+ protected boolean checkTable(JsonNode recordRoot) {
+ String recordTable = extractDatabase(recordRoot) + "." + extractTable(recordRoot);
+ return sourceTableName.equals(recordTable);
+ }
+
private void addDeleteSign(Map<String, String> valueMap, boolean delete) {
if(delete){
valueMap.put(DORIS_DELETE_SIGN, "1");
@@ -114,11 +142,9 @@
}
}
- private boolean checkSchemaChange(JsonNode record) throws IOException {
- String database = extractDatabase(record);
- String table = extractTable(record);
+ private boolean checkSchemaChange(String ddl) throws IOException {
String requestUrl = String.format(CHECK_SCHEMA_CHANGE_API, dorisOptions.getFenodes(), database, table);
- Map<String,Object> param = buildRequestParam(record);
+ Map<String,Object> param = buildRequestParam(ddl);
if(param.size() != 2){
return false;
}
@@ -139,27 +165,21 @@
* "columnName" : "column"
* }
*/
- private Map<String, Object> buildRequestParam(JsonNode record) throws JsonProcessingException {
+ protected Map<String, Object> buildRequestParam(String ddl) {
+ // ddl must be non-null (schemaChange checks this before calling); a ddl the pattern does not match
+ // yields an empty map, which checkSchemaChange rejects through its size check.
Map<String,Object> params = new HashMap<>();
- String ddl = extractDDL(record);
- if(ddl == null){
- return params;
- }
Matcher matcher = addDropDDLPattern.matcher(ddl);
if(matcher.find()){
String op = matcher.group(1);
- String col = matcher.group(2);
+ // with the default addDropDDLRegex, group 2 is the optional COLUMN keyword, so the column name is group 3
+ String col = matcher.group(3);
params.put("isDropColumn", op.equalsIgnoreCase("DROP"));
params.put("columnName", col);
}
return params;
}
- private boolean execSchemaChange(JsonNode record) throws IOException {
- String extractDDL = extractDDL(record);
+ private boolean execSchemaChange(String ddl) throws IOException {
Map<String, String> param = new HashMap<>();
- param.put("stmt", extractDDL);
- String database = extractDatabase(record);
+ param.put("stmt", ddl);
String requestUrl = String.format(SCHEMA_CHANGE_API, dorisOptions.getFenodes(), database);
HttpPost httpPost = new HttpPost(requestUrl);
httpPost.setHeader(HttpHeaders.AUTHORIZATION, authHeader());
@@ -169,15 +189,20 @@
return success;
}
- private String extractDatabase(JsonNode record) {
- return extractJsonNode(record.get("source"), "db");
+ /**
+ * Returns the upstream database name of a record. When the {@code source} block carries a
+ * {@code schema} field (kept for compatibility with schema-based sources) that value is used,
+ * otherwise {@code source.db} is returned.
+ */
+ protected String extractDatabase(JsonNode record) {
+ JsonNode source = record.get("source");
+ String nameField = source.has("schema") ? "schema" : "db";
+ return extractJsonNode(source, nameField);
}
- private String extractTable(JsonNode record) {
+ /** Returns the upstream table name stored in the record's {@code source.table} field. */
+ protected String extractTable(JsonNode record) {
return extractJsonNode(record.get("source"), "table");
}
- private boolean handleResponse(HttpUriRequest request) throws IOException {
+ private boolean handleResponse(HttpUriRequest request) {
try (CloseableHttpClient httpclient = HttpClients.createDefault()) {
CloseableHttpResponse response = httpclient.execute(request);
final int statusCode = response.getStatusLine().getStatusCode();
@@ -215,20 +240,26 @@
return recordMap != null ? recordMap : new HashMap<>();
}
- @VisibleForTesting
+ /**
+ * Extracts the DDL from a debezium schema-change record and rewrites it into a Doris single-column
+ * ADD/DROP statement targeting {@code dorisOptions.getTableIdentifier()}.
+ *
+ * @param record debezium record; the DDL is read from the JSON string in its {@code historyRecord} field
+ * @return the rewritten DDL, or null when the record has no historyRecord/ddl or the DDL is not
+ * matched by {@code addDropDDLPattern}
+ * @throws JsonProcessingException if {@code historyRecord} cannot be parsed as JSON
+ */
public String extractDDL(JsonNode record) throws JsonProcessingException {
String historyRecord = extractJsonNode(record, "historyRecord");
if (Objects.isNull(historyRecord)) {
return null;
}
String ddl = extractJsonNode(objectMapper.readTree(historyRecord), "ddl");
+ LOG.debug("received debezium ddl :{}", ddl);
if (!Objects.isNull(ddl)) {
//filter add/drop operation
- if (addDropDDLPattern.matcher(ddl).matches()) {
+ Matcher matcher = addDropDDLPattern.matcher(ddl);
+ if(matcher.find()){
+ String op = matcher.group(1);
+ String col = matcher.group(3);
+ String type = matcher.group(5);
+ // Doris DROP COLUMN takes no type, so any token captured after the column name is ignored for DROP
+ type = type == null || "DROP".equalsIgnoreCase(op) ? "" : type;
+ // trim() removes the trailing space the template leaves when the type is empty
+ ddl = String.format(EXECUTE_DDL, dorisOptions.getTableIdentifier(), op, col, type).trim();
+ LOG.info("parse ddl:{}", ddl);
return ddl;
}
}
- LOG.info("parse ddl:{}", ddl);
return null;
}
@@ -246,6 +277,7 @@
public static class Builder {
private DorisOptions dorisOptions;
private Pattern addDropDDLPattern;
+ private String sourceTableName;
public JsonDebeziumSchemaSerializer.Builder setDorisOptions(DorisOptions dorisOptions) {
this.dorisOptions = dorisOptions;
@@ -257,8 +289,13 @@
return this;
}
+ /**
+ * Sets the upstream CDC table, formatted as db.tbl, whose DDL events are applied.
+ * When null or blank, DDL events are not filtered by table.
+ */
+ public JsonDebeziumSchemaSerializer.Builder setSourceTableName(String sourceTableName) {
+ this.sourceTableName = sourceTableName;
+ return this;
+ }
+
+ /** Builds the serializer; a null DDL pattern falls back to the built-in add/drop column regex. */
public JsonDebeziumSchemaSerializer build() {
- return new JsonDebeziumSchemaSerializer(dorisOptions, addDropDDLPattern);
+ return new JsonDebeziumSchemaSerializer(dorisOptions, addDropDDLPattern, sourceTableName);
}
}
}
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestJsonDebeziumSchemaSerializer.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestJsonDebeziumSchemaSerializer.java
index 0d35ae3..ed5c37f 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestJsonDebeziumSchemaSerializer.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestJsonDebeziumSchemaSerializer.java
@@ -100,8 +100,8 @@
@Test
public void testExtractDDL() throws IOException {
- String srcDDL = "alter table t1 add \n column c_1 varchar(200)";
- String record = "{\"source\":{\"version\":\"1.5.4.Final\",\"connector\":\"mysql\",\"name\":\"mysql_binlog_source\",\"ts_ms\":1663924503565,\"snapshot\":\"false\",\"db\":\"test\",\"sequence\":null,\"table\":\"t1\",\"server_id\":1,\"gtid\":null,\"file\":\"binlog.000006\",\"pos\":13088,\"row\":0,\"thread\":null,\"query\":null},\"historyRecord\":\"{\\\"source\\\":{\\\"file\\\":\\\"binlog.000006\\\",\\\"pos\\\":13088,\\\"server_id\\\":1},\\\"position\\\":{\\\"transaction_id\\\":null,\\\"ts_sec\\\":1663924503,\\\"file\\\":\\\"binlog.000006\\\",\\\"pos\\\":13221,\\\"server_id\\\":1},\\\"databaseName\\\":\\\"test\\\",\\\"ddl\\\":\\\"alter table t1 add \\\\n column c_1 varchar(200)\\\",\\\"tableChanges\\\":[{\\\"type\\\":\\\"ALTER\\\",\\\"id\\\":\\\"\\\\\\\"test\\\\\\\".\\\\\\\"t1\\\\\\\"\\\",\\\"table\\\":{\\\"defaultCharsetName\\\":\\\"utf8mb4\\\",\\\"primaryKeyColumnNames\\\":[\\\"id\\\"],\\\"columns\\\":[{\\\"name\\\":\\\"id\\\",\\\"jdbcType\\\":4,\\\"typeName\\\":\\\"INT\\\",\\\"typeExpression\\\":\\\"INT\\\",\\\"charsetName\\\":null,\\\"position\\\":1,\\\"optional\\\":false,\\\"autoIncremented\\\":false,\\\"generated\\\":false},{\\\"name\\\":\\\"name\\\",\\\"jdbcType\\\":12,\\\"typeName\\\":\\\"VARCHAR\\\",\\\"typeExpression\\\":\\\"VARCHAR\\\",\\\"charsetName\\\":\\\"utf8mb4\\\",\\\"length\\\":128,\\\"position\\\":2,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false},{\\\"name\\\":\\\"dt\\\",\\\"jdbcType\\\":91,\\\"typeName\\\":\\\"DATE\\\",\\\"typeExpression\\\":\\\"DATE\\\",\\\"charsetName\\\":null,\\\"position\\\":3,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false},{\\\"name\\\":\\\"dtime\\\",\\\"jdbcType\\\":93,\\\"typeName\\\":\\\"DATETIME\\\",\\\"typeExpression\\\":\\\"DATETIME\\\",\\\"charsetName\\\":null,\\\"position\\\":4,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false},{\\\"name\\\":\\\"ts\\\",\\\"jdbcType\\\":2014,\\\"typeName\\\":\\\"TIMESTAMP\\\",\\\"typeExpression\\\":\\\"T
IMESTAMP\\\",\\\"charsetName\\\":null,\\\"position\\\":5,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false},{\\\"name\\\":\\\"c_1\\\",\\\"jdbcType\\\":12,\\\"typeName\\\":\\\"VARCHAR\\\",\\\"typeExpression\\\":\\\"VARCHAR\\\",\\\"charsetName\\\":\\\"utf8mb4\\\",\\\"length\\\":200,\\\"position\\\":6,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false}]}}]}\"}";
+ String srcDDL = "ALTER TABLE test.t1 add COLUMN c_1 varchar(200)";
+ String record = "{\"source\":{\"version\":\"1.5.4.Final\",\"connector\":\"mysql\",\"name\":\"mysql_binlog_source\",\"ts_ms\":1663924503565,\"snapshot\":\"false\",\"db\":\"test\",\"sequence\":null,\"table\":\"t1\",\"server_id\":1,\"gtid\":null,\"file\":\"binlog.000006\",\"pos\":13088,\"row\":0,\"thread\":null,\"query\":null},\"historyRecord\":\"{\\\"source\\\":{\\\"file\\\":\\\"binlog.000006\\\",\\\"pos\\\":13088,\\\"server_id\\\":1},\\\"position\\\":{\\\"transaction_id\\\":null,\\\"ts_sec\\\":1663924503,\\\"file\\\":\\\"binlog.000006\\\",\\\"pos\\\":13221,\\\"server_id\\\":1},\\\"databaseName\\\":\\\"test\\\",\\\"ddl\\\":\\\"alter table t1 add \\\\n c_1 varchar(200)\\\",\\\"tableChanges\\\":[{\\\"type\\\":\\\"ALTER\\\",\\\"id\\\":\\\"\\\\\\\"test\\\\\\\".\\\\\\\"t1\\\\\\\"\\\",\\\"table\\\":{\\\"defaultCharsetName\\\":\\\"utf8mb4\\\",\\\"primaryKeyColumnNames\\\":[\\\"id\\\"],\\\"columns\\\":[{\\\"name\\\":\\\"id\\\",\\\"jdbcType\\\":4,\\\"typeName\\\":\\\"INT\\\",\\\"typeExpression\\\":\\\"INT\\\",\\\"charsetName\\\":null,\\\"position\\\":1,\\\"optional\\\":false,\\\"autoIncremented\\\":false,\\\"generated\\\":false},{\\\"name\\\":\\\"name\\\",\\\"jdbcType\\\":12,\\\"typeName\\\":\\\"VARCHAR\\\",\\\"typeExpression\\\":\\\"VARCHAR\\\",\\\"charsetName\\\":\\\"utf8mb4\\\",\\\"length\\\":128,\\\"position\\\":2,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false},{\\\"name\\\":\\\"dt\\\",\\\"jdbcType\\\":91,\\\"typeName\\\":\\\"DATE\\\",\\\"typeExpression\\\":\\\"DATE\\\",\\\"charsetName\\\":null,\\\"position\\\":3,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false},{\\\"name\\\":\\\"dtime\\\",\\\"jdbcType\\\":93,\\\"typeName\\\":\\\"DATETIME\\\",\\\"typeExpression\\\":\\\"DATETIME\\\",\\\"charsetName\\\":null,\\\"position\\\":4,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false},{\\\"name\\\":\\\"ts\\\",\\\"jdbcType\\\":2014,\\\"typeName\\\":\\\"TIMESTAMP\\\",\\\"typeExpression\\\":\\\"TIMESTAM
P\\\",\\\"charsetName\\\":null,\\\"position\\\":5,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false},{\\\"name\\\":\\\"c_1\\\",\\\"jdbcType\\\":12,\\\"typeName\\\":\\\"VARCHAR\\\",\\\"typeExpression\\\":\\\"VARCHAR\\\",\\\"charsetName\\\":\\\"utf8mb4\\\",\\\"length\\\":200,\\\"position\\\":6,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false}]}}]}\"}";
JsonNode recordRoot = objectMapper.readTree(record);
String ddl = serializer.extractDDL(recordRoot);
Assert.assertEquals(srcDDL, ddl);