[Improve]Separate the record and schema-change in JsonDebeziumSchemaSerializer (#279)
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/ChangeEvent.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/ChangeEvent.java
new file mode 100644
index 0000000..8d7c5cc
--- /dev/null
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/ChangeEvent.java
@@ -0,0 +1,25 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.flink.sink.writer;
+
+import java.io.Serializable;
+
+/**
+ * Represents a change event from an external system: either a data change or a schema change.
+ */
+public interface ChangeEvent extends Serializable {}
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/JsonDebeziumSchemaSerializer.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/JsonDebeziumSchemaSerializer.java
index cdbd482..370bca7 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/JsonDebeziumSchemaSerializer.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/JsonDebeziumSchemaSerializer.java
@@ -18,88 +18,56 @@
package org.apache.doris.flink.sink.writer.serializer;
import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.util.CollectionUtil;
-import org.apache.flink.util.Preconditions;
-import org.apache.flink.util.StringUtils;
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.JsonNodeFactory;
import com.fasterxml.jackson.databind.node.NullNode;
-import org.apache.commons.collections.CollectionUtils;
-import org.apache.doris.flink.catalog.doris.FieldSchema;
-import org.apache.doris.flink.catalog.doris.TableSchema;
import org.apache.doris.flink.cfg.DorisExecutionOptions;
import org.apache.doris.flink.cfg.DorisOptions;
-import org.apache.doris.flink.exception.IllegalArgumentException;
-import org.apache.doris.flink.sink.schema.SchemaChangeHelper;
-import org.apache.doris.flink.sink.schema.SchemaChangeHelper.DDLSchema;
-import org.apache.doris.flink.sink.schema.SchemaChangeManager;
-import org.apache.doris.flink.sink.writer.EventType;
-import org.apache.doris.flink.tools.cdc.SourceConnector;
-import org.apache.doris.flink.tools.cdc.SourceSchema;
-import org.apache.doris.flink.tools.cdc.mysql.MysqlType;
-import org.apache.doris.flink.tools.cdc.oracle.OracleType;
-import org.apache.doris.flink.tools.cdc.postgres.PostgresType;
-import org.apache.doris.flink.tools.cdc.sqlserver.SqlServerType;
+import org.apache.doris.flink.sink.writer.serializer.jsondebezium.JsonDebeziumChangeContext;
+import org.apache.doris.flink.sink.writer.serializer.jsondebezium.JsonDebeziumDataChange;
+import org.apache.doris.flink.sink.writer.serializer.jsondebezium.JsonDebeziumSchemaChange;
+import org.apache.doris.flink.sink.writer.serializer.jsondebezium.JsonDebeziumSchemaChangeImpl;
+import org.apache.doris.flink.sink.writer.serializer.jsondebezium.JsonDebeziumSchemaChangeImplV2;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
-import java.nio.charset.StandardCharsets;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.LinkedHashMap;
-import java.util.List;
import java.util.Map;
import java.util.Objects;
-import java.util.Set;
-import java.util.regex.Matcher;
import java.util.regex.Pattern;
-import static org.apache.doris.flink.sink.util.DeleteOperation.addDeleteSign;
import static org.apache.doris.flink.sink.writer.LoadConstants.LINE_DELIMITER_DEFAULT;
import static org.apache.doris.flink.sink.writer.LoadConstants.LINE_DELIMITER_KEY;
+/**
+ * Serialize the records of the upstream data source into a data form that can be recognized by
+ * downstream doris.
+ *
+ * <p>There are two serialization methods here: <br>
+ * 1. data change ({@link JsonDebeziumDataChange}) records. <br>
+ * 2. schema change ({@link JsonDebeziumSchemaChange}) records.
+ */
public class JsonDebeziumSchemaSerializer implements DorisRecordSerializer<String> {
private static final Logger LOG = LoggerFactory.getLogger(JsonDebeziumSchemaSerializer.class);
- private static final String OP_READ = "r"; // snapshot read
- private static final String OP_CREATE = "c"; // insert
- private static final String OP_UPDATE = "u"; // update
- private static final String OP_DELETE = "d"; // delete
- public static final String EXECUTE_DDL =
- "ALTER TABLE %s %s COLUMN %s %s"; // alter table tbl add cloumn aca int
- private static final String addDropDDLRegex =
- "ALTER\\s+TABLE\\s+[^\\s]+\\s+(ADD|DROP)\\s+(COLUMN\\s+)?([^\\s]+)(\\s+([^\\s]+))?.*";
- private static final Pattern renameDDLPattern =
- Pattern.compile(
- "ALTER\\s+TABLE\\s+(\\w+)\\s+RENAME\\s+COLUMN\\s+(\\w+)\\s+TO\\s+(\\w+)",
- Pattern.CASE_INSENSITIVE);
- private final Pattern addDropDDLPattern;
- private DorisOptions dorisOptions;
- private ObjectMapper objectMapper = new ObjectMapper();
- private String database;
- private String table;
+ private final Pattern pattern;
+ private final DorisOptions dorisOptions;
+ private final ObjectMapper objectMapper = new ObjectMapper();
// table name of the cdc upstream, format is db.tbl
- private String sourceTableName;
+ private final String sourceTableName;
private boolean firstLoad;
- private boolean firstSchemaChange;
- private Map<String, FieldSchema> originFieldSchemaMap;
private final boolean newSchemaChange;
private String lineDelimiter = LINE_DELIMITER_DEFAULT;
private boolean ignoreUpdateBefore = true;
- private SourceConnector sourceConnector;
- private SchemaChangeManager schemaChangeManager;
// <cdc db.schema.table, doris db.table>
private Map<String, String> tableMapping;
// create table properties
private Map<String, String> tableProperties;
private String targetDatabase;
+ private JsonDebeziumDataChange dataChange;
+ private JsonDebeziumSchemaChange schemaChange;
public JsonDebeziumSchemaSerializer(
DorisOptions dorisOptions,
@@ -107,15 +75,7 @@
String sourceTableName,
boolean newSchemaChange) {
this.dorisOptions = dorisOptions;
- this.addDropDDLPattern =
- pattern == null
- ? Pattern.compile(addDropDDLRegex, Pattern.CASE_INSENSITIVE)
- : pattern;
- if (!StringUtils.isNullOrWhitespaceOnly(dorisOptions.getTableIdentifier())) {
- String[] tableInfo = dorisOptions.getTableIdentifier().split("\\.");
- this.database = tableInfo[0];
- this.table = tableInfo[1];
- }
+ this.pattern = pattern;
this.sourceTableName = sourceTableName;
// Prevent loss of decimal data precision
this.objectMapper.enable(DeserializationFeature.USE_BIG_DECIMAL_FOR_FLOATS);
@@ -123,8 +83,6 @@
this.objectMapper.setNodeFactory(jsonNodeFactory);
this.newSchemaChange = newSchemaChange;
this.firstLoad = true;
- this.firstSchemaChange = true;
- this.schemaChangeManager = new SchemaChangeManager(dorisOptions);
}
public JsonDebeziumSchemaSerializer(
@@ -156,6 +114,27 @@
this.tableMapping = tableMapping;
this.tableProperties = tableProperties;
this.targetDatabase = targetDatabase;
+ init();
+ }
+
+ private void init() {
+ JsonDebeziumChangeContext changeContext =
+ new JsonDebeziumChangeContext(
+ dorisOptions,
+ tableMapping,
+ sourceTableName,
+ targetDatabase,
+ tableProperties,
+ objectMapper,
+ pattern,
+ lineDelimiter,
+ ignoreUpdateBefore);
+
+ this.schemaChange =
+ newSchemaChange
+ ? new JsonDebeziumSchemaChangeImplV2(changeContext)
+ : new JsonDebeziumSchemaChangeImpl(changeContext);
+ this.dataChange = new JsonDebeziumDataChange(changeContext);
}
@Override
@@ -165,360 +144,15 @@
String op = extractJsonNode(recordRoot, "op");
if (Objects.isNull(op)) {
// schema change ddl
- if (newSchemaChange) {
- schemaChangeV2(recordRoot);
- } else {
- schemaChange(recordRoot);
- }
+ schemaChange.schemaChange(recordRoot);
return null;
}
- if (newSchemaChange && firstLoad) {
- initOriginFieldSchema(recordRoot);
+ if (firstLoad) {
+ schemaChange.init(recordRoot);
+ firstLoad = false;
}
-
- // Filter out table records that are not in tableMapping
- String cdcTableIdentifier = getCdcTableIdentifier(recordRoot);
- String dorisTableIdentifier = getDorisTableIdentifier(cdcTableIdentifier);
- if (StringUtils.isNullOrWhitespaceOnly(dorisTableIdentifier)) {
- LOG.warn(
- "filter table {}, because it is not listened, record detail is {}",
- cdcTableIdentifier,
- record);
- return null;
- }
-
- Map<String, Object> valueMap;
- switch (op) {
- case OP_READ:
- case OP_CREATE:
- valueMap = extractAfterRow(recordRoot);
- addDeleteSign(valueMap, false);
- break;
- case OP_UPDATE:
- return DorisRecord.of(dorisTableIdentifier, extractUpdate(recordRoot));
- case OP_DELETE:
- valueMap = extractBeforeRow(recordRoot);
- addDeleteSign(valueMap, true);
- break;
- default:
- LOG.error("parse record fail, unknown op {} in {}", op, record);
- return null;
- }
-
- return DorisRecord.of(
- dorisTableIdentifier,
- objectMapper.writeValueAsString(valueMap).getBytes(StandardCharsets.UTF_8));
- }
-
- /**
- * Change the update event into two.
- *
- * @param recordRoot
- * @return
- */
- private byte[] extractUpdate(JsonNode recordRoot) throws JsonProcessingException {
- StringBuilder updateRow = new StringBuilder();
- if (!ignoreUpdateBefore) {
- // convert delete
- Map<String, Object> beforeRow = extractBeforeRow(recordRoot);
- addDeleteSign(beforeRow, true);
- updateRow.append(objectMapper.writeValueAsString(beforeRow)).append(this.lineDelimiter);
- }
-
- // convert insert
- Map<String, Object> afterRow = extractAfterRow(recordRoot);
- addDeleteSign(afterRow, false);
- updateRow.append(objectMapper.writeValueAsString(afterRow));
- return updateRow.toString().getBytes(StandardCharsets.UTF_8);
- }
-
- public boolean schemaChangeV2(JsonNode recordRoot) {
- boolean status = false;
- try {
- if (!StringUtils.isNullOrWhitespaceOnly(sourceTableName) && !checkTable(recordRoot)) {
- return false;
- }
-
- EventType eventType = extractEventType(recordRoot);
- if (eventType == null) {
- return false;
- }
- if (eventType.equals(EventType.CREATE)) {
- TableSchema tableSchema = extractCreateTableSchema(recordRoot);
- status = schemaChangeManager.createTable(tableSchema);
- if (status) {
- String cdcTbl = getCdcTableIdentifier(recordRoot);
- String dorisTbl = getCreateTableIdentifier(recordRoot);
- tableMapping.put(cdcTbl, dorisTbl);
- LOG.info("create table ddl status: {}", status);
- }
- } else if (eventType.equals(EventType.ALTER)) {
- // db,table
- Tuple2<String, String> tuple = getDorisTableTuple(recordRoot);
- if (tuple == null) {
- return false;
- }
- List<String> ddlSqlList = extractDDLList(recordRoot);
- if (CollectionUtils.isEmpty(ddlSqlList)) {
- LOG.info("ddl can not do schema change:{}", recordRoot);
- return false;
- }
- List<DDLSchema> ddlSchemas = SchemaChangeHelper.getDdlSchemas();
- for (int i = 0; i < ddlSqlList.size(); i++) {
- DDLSchema ddlSchema = ddlSchemas.get(i);
- String ddlSql = ddlSqlList.get(i);
- boolean doSchemaChange = checkSchemaChange(tuple.f0, tuple.f1, ddlSchema);
- status = doSchemaChange && schemaChangeManager.execute(ddlSql, tuple.f0);
- LOG.info("schema change status:{}, ddl:{}", status, ddlSql);
- }
- } else {
- LOG.info("Unsupported event type {}", eventType);
- }
- } catch (Exception ex) {
- LOG.warn("schema change error :", ex);
- }
- return status;
- }
-
- protected JsonNode extractTableChange(JsonNode record) throws JsonProcessingException {
- JsonNode historyRecord = extractHistoryRecord(record);
- JsonNode tableChanges = historyRecord.get("tableChanges");
- if (!Objects.isNull(tableChanges)) {
- JsonNode tableChange = tableChanges.get(0);
- return tableChange;
- }
- return null;
- }
-
- /** Parse Alter Event. */
- @VisibleForTesting
- public List<String> extractDDLList(JsonNode record) throws IOException {
- String dorisTable = getDorisTableIdentifier(record);
- JsonNode historyRecord = extractHistoryRecord(record);
- String ddl = extractJsonNode(historyRecord, "ddl");
- JsonNode tableChange = extractTableChange(record);
- if (Objects.isNull(tableChange) || Objects.isNull(ddl)) {
- return null;
- }
-
- JsonNode columns = tableChange.get("table").get("columns");
- if (firstSchemaChange) {
- sourceConnector =
- SourceConnector.valueOf(
- record.get("source").get("connector").asText().toUpperCase());
- fillOriginSchema(columns);
- }
-
- // rename ddl
- Matcher renameMatcher = renameDDLPattern.matcher(ddl);
- if (renameMatcher.find()) {
- String oldColumnName = renameMatcher.group(2);
- String newColumnName = renameMatcher.group(3);
- return SchemaChangeHelper.generateRenameDDLSql(
- dorisTable, oldColumnName, newColumnName, originFieldSchemaMap);
- }
-
- // add/drop ddl
- Map<String, FieldSchema> updateFiledSchema = new LinkedHashMap<>();
- for (JsonNode column : columns) {
- buildFieldSchema(updateFiledSchema, column);
- }
- SchemaChangeHelper.compareSchema(updateFiledSchema, originFieldSchemaMap);
- // In order to avoid other source table column change operations other than add/drop/rename,
- // which may lead to the accidental deletion of the doris column.
- Matcher matcher = addDropDDLPattern.matcher(ddl);
- if (!matcher.find()) {
- return null;
- }
- return SchemaChangeHelper.generateDDLSql(dorisTable);
- }
-
- @VisibleForTesting
- public TableSchema extractCreateTableSchema(JsonNode record) throws JsonProcessingException {
- if (sourceConnector == null) {
- sourceConnector =
- SourceConnector.valueOf(
- record.get("source").get("connector").asText().toUpperCase());
- }
-
- String dorisTable = getCreateTableIdentifier(record);
- JsonNode tableChange = extractTableChange(record);
- JsonNode pkColumns = tableChange.get("table").get("primaryKeyColumnNames");
- JsonNode columns = tableChange.get("table").get("columns");
- JsonNode comment = tableChange.get("table").get("comment");
- String tblComment = comment == null ? "" : comment.asText();
- Map<String, FieldSchema> field = new LinkedHashMap<>();
- for (JsonNode column : columns) {
- buildFieldSchema(field, column);
- }
- List<String> pkList = new ArrayList<>();
- for (JsonNode column : pkColumns) {
- String fieldName = column.asText();
- pkList.add(fieldName);
- }
-
- TableSchema tableSchema = new TableSchema();
- tableSchema.setFields(field);
- tableSchema.setKeys(pkList);
- tableSchema.setDistributeKeys(buildDistributeKeys(pkList, field));
- tableSchema.setTableComment(tblComment);
- tableSchema.setProperties(tableProperties);
-
- String[] split = dorisTable.split("\\.");
- Preconditions.checkArgument(split.length == 2);
- tableSchema.setDatabase(split[0]);
- tableSchema.setTable(split[1]);
- return tableSchema;
- }
-
- private List<String> buildDistributeKeys(
- List<String> primaryKeys, Map<String, FieldSchema> fields) {
- if (!CollectionUtil.isNullOrEmpty(primaryKeys)) {
- return primaryKeys;
- }
- if (!fields.isEmpty()) {
- Map.Entry<String, FieldSchema> firstField = fields.entrySet().iterator().next();
- return Collections.singletonList(firstField.getKey());
- }
- return new ArrayList<>();
- }
-
- @VisibleForTesting
- public void setOriginFieldSchemaMap(Map<String, FieldSchema> originFieldSchemaMap) {
- this.originFieldSchemaMap = originFieldSchemaMap;
- }
-
- @VisibleForTesting
- public boolean schemaChange(JsonNode recordRoot) {
- boolean status = false;
- try {
- if (!StringUtils.isNullOrWhitespaceOnly(sourceTableName) && !checkTable(recordRoot)) {
- return false;
- }
- // db,table
- Tuple2<String, String> tuple = getDorisTableTuple(recordRoot);
- if (tuple == null) {
- return false;
- }
-
- String ddl = extractDDL(recordRoot);
- if (StringUtils.isNullOrWhitespaceOnly(ddl)) {
- LOG.info("ddl can not do schema change:{}", recordRoot);
- return false;
- }
-
- boolean doSchemaChange = checkSchemaChange(tuple.f0, tuple.f1, ddl);
- status = doSchemaChange && schemaChangeManager.execute(ddl, tuple.f0);
- LOG.info("schema change status:{}", status);
- } catch (Exception ex) {
- LOG.warn("schema change error :", ex);
- }
- return status;
- }
-
- /** When cdc synchronizes multiple tables, it will capture multiple table schema changes. */
- protected boolean checkTable(JsonNode recordRoot) {
- String db = extractDatabase(recordRoot);
- String tbl = extractTable(recordRoot);
- String dbTbl = db + "." + tbl;
- return sourceTableName.equals(dbTbl);
- }
-
- public String getCdcTableIdentifier(JsonNode record) {
- String db = extractJsonNode(record.get("source"), "db");
- String schema = extractJsonNode(record.get("source"), "schema");
- String table = extractJsonNode(record.get("source"), "table");
- return SourceSchema.getString(db, schema, table);
- }
-
- public String getCreateTableIdentifier(JsonNode record) {
- String table = extractJsonNode(record.get("source"), "table");
- return targetDatabase + "." + table;
- }
-
- public String getDorisTableIdentifier(String cdcTableIdentifier) {
- if (!StringUtils.isNullOrWhitespaceOnly(dorisOptions.getTableIdentifier())) {
- return dorisOptions.getTableIdentifier();
- }
- if (!CollectionUtil.isNullOrEmpty(tableMapping)
- && !StringUtils.isNullOrWhitespaceOnly(cdcTableIdentifier)
- && tableMapping.get(cdcTableIdentifier) != null) {
- return tableMapping.get(cdcTableIdentifier);
- }
- return null;
- }
-
- protected String getDorisTableIdentifier(JsonNode record) {
- String identifier = getCdcTableIdentifier(record);
- return getDorisTableIdentifier(identifier);
- }
-
- protected Tuple2<String, String> getDorisTableTuple(JsonNode record) {
- String identifier = getDorisTableIdentifier(record);
- if (StringUtils.isNullOrWhitespaceOnly(identifier)) {
- return null;
- }
- String[] tableInfo = identifier.split("\\.");
- if (tableInfo.length != 2) {
- return null;
- }
- return Tuple2.of(tableInfo[0], tableInfo[1]);
- }
-
- private boolean checkSchemaChange(String database, String table, String ddl)
- throws IOException, IllegalArgumentException {
- Map<String, Object> param = buildRequestParam(ddl);
- return schemaChangeManager.checkSchemaChange(database, table, param);
- }
-
- private boolean checkSchemaChange(String database, String table, DDLSchema ddlSchema)
- throws IOException, IllegalArgumentException {
- Map<String, Object> param =
- SchemaChangeManager.buildRequestParam(
- ddlSchema.isDropColumn(), ddlSchema.getColumnName());
- return schemaChangeManager.checkSchemaChange(database, table, param);
- }
-
- /** Build param { "isDropColumn": true, "columnName" : "column" }. */
- protected Map<String, Object> buildRequestParam(String ddl) {
- Map<String, Object> params = new HashMap<>();
- Matcher matcher = addDropDDLPattern.matcher(ddl);
- if (matcher.find()) {
- String op = matcher.group(1);
- String col = matcher.group(3);
- params.put("isDropColumn", op.equalsIgnoreCase("DROP"));
- params.put("columnName", col);
- }
- return params;
- }
-
- protected String extractDatabase(JsonNode record) {
- if (record.get("source").has("schema")) {
- // compatible with schema
- return extractJsonNode(record.get("source"), "schema");
- } else {
- return extractJsonNode(record.get("source"), "db");
- }
- }
-
- protected String extractTable(JsonNode record) {
- return extractJsonNode(record.get("source"), "table");
- }
-
- /** Parse event type. */
- protected EventType extractEventType(JsonNode record) throws JsonProcessingException {
- JsonNode tableChange = extractTableChange(record);
- if (tableChange == null || tableChange.get("type") == null) {
- return null;
- }
- String type = tableChange.get("type").asText();
- if (EventType.ALTER.toString().equalsIgnoreCase(type)) {
- return EventType.ALTER;
- } else if (EventType.CREATE.toString().equalsIgnoreCase(type)) {
- return EventType.CREATE;
- }
- return null;
+ return dataChange.serialize(record, recordRoot, op);
}
private String extractJsonNode(JsonNode record, String key) {
@@ -527,154 +161,9 @@
: null;
}
- private Map<String, Object> extractBeforeRow(JsonNode record) {
- return extractRow(record.get("before"));
- }
-
- private Map<String, Object> extractAfterRow(JsonNode record) {
- return extractRow(record.get("after"));
- }
-
- private Map<String, Object> extractRow(JsonNode recordRow) {
- Map<String, Object> recordMap =
- objectMapper.convertValue(recordRow, new TypeReference<Map<String, Object>>() {});
- return recordMap != null ? recordMap : new HashMap<>();
- }
-
- private JsonNode extractHistoryRecord(JsonNode record) throws JsonProcessingException {
- if (record != null && record.has("historyRecord")) {
- return objectMapper.readTree(record.get("historyRecord").asText());
- }
- // The ddl passed by some scenes will not be included in the historyRecord, such as
- // DebeziumSourceFunction
- return record;
- }
-
- public String extractDDL(JsonNode record) throws JsonProcessingException {
- JsonNode historyRecord = extractHistoryRecord(record);
- String ddl = extractJsonNode(historyRecord, "ddl");
- LOG.debug("received debezium ddl :{}", ddl);
- if (!Objects.isNull(ddl)) {
- // filter add/drop operation
- Matcher matcher = addDropDDLPattern.matcher(ddl);
- if (matcher.find()) {
- String op = matcher.group(1);
- String col = matcher.group(3);
- String type = matcher.group(5);
- type = handleType(type);
- ddl = String.format(EXECUTE_DDL, getDorisTableIdentifier(record), op, col, type);
- LOG.info("parse ddl:{}", ddl);
- return ddl;
- }
- }
- return null;
- }
-
@VisibleForTesting
- public void fillOriginSchema(JsonNode columns) {
- if (Objects.nonNull(originFieldSchemaMap)) {
- for (JsonNode column : columns) {
- String fieldName = column.get("name").asText();
- if (originFieldSchemaMap.containsKey(fieldName)) {
- String dorisTypeName = buildDorisTypeName(column);
- String defaultValue =
- handleDefaultValue(extractJsonNode(column, "defaultValueExpression"));
- String comment = extractJsonNode(column, "comment");
- FieldSchema fieldSchema = originFieldSchemaMap.get(fieldName);
- fieldSchema.setName(fieldName);
- fieldSchema.setTypeString(dorisTypeName);
- fieldSchema.setComment(comment);
- fieldSchema.setDefaultValue(defaultValue);
- }
- }
- } else {
- LOG.error(
- "Current schema change failed! You need to ensure that "
- + "there is data in the table."
- + dorisOptions.getTableIdentifier());
- originFieldSchemaMap = new LinkedHashMap<>();
- columns.forEach(column -> buildFieldSchema(originFieldSchemaMap, column));
- }
- firstSchemaChange = false;
- firstLoad = false;
- }
-
- private void buildFieldSchema(Map<String, FieldSchema> filedSchemaMap, JsonNode column) {
- String fieldName = column.get("name").asText();
- String dorisTypeName = buildDorisTypeName(column);
- String defaultValue = handleDefaultValue(extractJsonNode(column, "defaultValueExpression"));
- String comment = extractJsonNode(column, "comment");
- filedSchemaMap.put(
- fieldName, new FieldSchema(fieldName, dorisTypeName, defaultValue, comment));
- }
-
- @VisibleForTesting
- public String buildDorisTypeName(JsonNode column) {
- int length = column.get("length") == null ? 0 : column.get("length").asInt();
- int scale = column.get("scale") == null ? 0 : column.get("scale").asInt();
- String sourceTypeName = column.get("typeName").asText();
- String dorisTypeName;
- switch (sourceConnector) {
- case MYSQL:
- dorisTypeName = MysqlType.toDorisType(sourceTypeName, length, scale);
- break;
- case ORACLE:
- dorisTypeName = OracleType.toDorisType(sourceTypeName, length, scale);
- break;
- case POSTGRES:
- dorisTypeName = PostgresType.toDorisType(sourceTypeName, length, scale);
- break;
- case SQLSERVER:
- dorisTypeName = SqlServerType.toDorisType(sourceTypeName, length, scale);
- break;
- default:
- String errMsg = "Not support " + sourceTypeName + " schema change.";
- throw new UnsupportedOperationException(errMsg);
- }
- return dorisTypeName;
- }
-
- private String handleDefaultValue(String defaultValue) {
- if (StringUtils.isNullOrWhitespaceOnly(defaultValue)) {
- return null;
- }
- // Due to historical reasons, doris needs to add quotes to the default value of the new
- // column
- // For example in mysql: alter table add column c1 int default 100
- // In Doris: alter table add column c1 int default '100'
- if (Pattern.matches("['\"].*?['\"]", defaultValue)) {
- return defaultValue;
- } else if (defaultValue.equals("1970-01-01 00:00:00")) {
- // TODO: The default value of setting the current time in CDC is 1970-01-01 00:00:00
- return "current_timestamp";
- }
- return "'" + defaultValue + "'";
- }
-
- private void initOriginFieldSchema(JsonNode recordRoot) {
- originFieldSchemaMap = new LinkedHashMap<>();
- Set<String> columnNameSet = extractAfterRow(recordRoot).keySet();
- if (CollectionUtils.isEmpty(columnNameSet)) {
- columnNameSet = extractBeforeRow(recordRoot).keySet();
- }
- columnNameSet.forEach(
- columnName -> originFieldSchemaMap.put(columnName, new FieldSchema()));
- firstLoad = false;
- }
-
- @VisibleForTesting
- public Map<String, FieldSchema> getOriginFieldSchemaMap() {
- return originFieldSchemaMap;
- }
-
- @VisibleForTesting
- public void setSourceConnector(String sourceConnector) {
- this.sourceConnector = SourceConnector.valueOf(sourceConnector.toUpperCase());
- }
-
- @VisibleForTesting
- public void setTableMapping(Map<String, String> tableMapping) {
- this.tableMapping = tableMapping;
+ public JsonDebeziumSchemaChange getJsonDebeziumSchemaChange() {
+ return this.schemaChange;
}
public static JsonDebeziumSchemaSerializer.Builder builder() {
@@ -744,21 +233,4 @@
targetDatabase);
}
}
-
- private String handleType(String type) {
-
- if (type == null || "".equals(type)) {
- return "";
- }
-
- // varchar len * 3
- Pattern pattern = Pattern.compile("varchar\\(([1-9][0-9]*)\\)", Pattern.CASE_INSENSITIVE);
- Matcher matcher = pattern.matcher(type);
- if (matcher.find()) {
- String len = matcher.group(1);
- return String.format("varchar(%d)", Math.min(Integer.parseInt(len) * 3, 65533));
- }
-
- return type;
- }
}
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumChangeContext.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumChangeContext.java
new file mode 100644
index 0000000..9c59f14
--- /dev/null
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumChangeContext.java
@@ -0,0 +1,98 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.flink.sink.writer.serializer.jsondebezium;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.doris.flink.cfg.DorisOptions;
+
+import java.io.Serializable;
+import java.util.Map;
+import java.util.regex.Pattern;
+
+/** Holds the context shared by schema change and data change handling during serialization. */
+public class JsonDebeziumChangeContext implements Serializable {
+ private final DorisOptions dorisOptions;
+ // <cdc db.schema.table, doris db.table>
+ private final Map<String, String> tableMapping;
+ // table name of the cdc upstream, format is db.tbl
+ private final String sourceTableName;
+ private final String targetDatabase;
+ // create table properties
+ private final Map<String, String> tableProperties;
+ private final ObjectMapper objectMapper;
+ private final Pattern pattern;
+ private final String lineDelimiter;
+ private final boolean ignoreUpdateBefore;
+
+ public JsonDebeziumChangeContext(
+ DorisOptions dorisOptions,
+ Map<String, String> tableMapping,
+ String sourceTableName,
+ String targetDatabase,
+ Map<String, String> tableProperties,
+ ObjectMapper objectMapper,
+ Pattern pattern,
+ String lineDelimiter,
+ boolean ignoreUpdateBefore) {
+ this.dorisOptions = dorisOptions;
+ this.tableMapping = tableMapping;
+ this.sourceTableName = sourceTableName;
+ this.targetDatabase = targetDatabase;
+ this.tableProperties = tableProperties;
+ this.objectMapper = objectMapper;
+ this.pattern = pattern;
+ this.lineDelimiter = lineDelimiter;
+ this.ignoreUpdateBefore = ignoreUpdateBefore;
+ }
+
+ public DorisOptions getDorisOptions() {
+ return dorisOptions;
+ }
+
+ public Map<String, String> getTableMapping() {
+ return tableMapping;
+ }
+
+ public String getSourceTableName() {
+ return sourceTableName;
+ }
+
+ public String getTargetDatabase() {
+ return targetDatabase;
+ }
+
+ public Map<String, String> getTableProperties() {
+ return tableProperties;
+ }
+
+ public ObjectMapper getObjectMapper() {
+ return objectMapper;
+ }
+
+ public Pattern getPattern() {
+ return pattern;
+ }
+
+ public String getLineDelimiter() {
+ return lineDelimiter;
+ }
+
+ public boolean isIgnoreUpdateBefore() {
+ return ignoreUpdateBefore;
+ }
+}
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumDataChange.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumDataChange.java
new file mode 100644
index 0000000..5790c48
--- /dev/null
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumDataChange.java
@@ -0,0 +1,167 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.flink.sink.writer.serializer.jsondebezium;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.util.CollectionUtil;
+import org.apache.flink.util.StringUtils;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.NullNode;
+import org.apache.doris.flink.cfg.DorisOptions;
+import org.apache.doris.flink.sink.writer.ChangeEvent;
+import org.apache.doris.flink.sink.writer.serializer.DorisRecord;
+import org.apache.doris.flink.tools.cdc.SourceSchema;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.doris.flink.sink.util.DeleteOperation.addDeleteSign;
+
+/**
+ * Convert the data change record of the upstream data source into a byte array that can be imported
+ * into doris through stream load.<br>
+ * Supported data changes include: read, insert, update, delete.
+ */
+ public class JsonDebeziumDataChange implements ChangeEvent {
+     private static final Logger LOG = LoggerFactory.getLogger(JsonDebeziumDataChange.class);
+
+     private static final String OP_READ = "r"; // snapshot read
+     private static final String OP_CREATE = "c"; // insert
+     private static final String OP_UPDATE = "u"; // update
+     private static final String OP_DELETE = "d"; // delete
+
+     private final ObjectMapper objectMapper;
+     private final DorisOptions dorisOptions;
+     private final boolean ignoreUpdateBefore;
+     private final String lineDelimiter;
+     private final JsonDebeziumChangeContext changeContext;
+
+     /**
+      * Keeps the context and caches the settings that are read for every record.
+      *
+      * @param changeContext shared settings; also queried later for the table mapping
+      */
+     public JsonDebeziumDataChange(JsonDebeziumChangeContext changeContext) {
+         this.changeContext = changeContext;
+         this.dorisOptions = changeContext.getDorisOptions();
+         this.objectMapper = changeContext.getObjectMapper();
+         this.ignoreUpdateBefore = changeContext.isIgnoreUpdateBefore();
+         this.lineDelimiter = changeContext.getLineDelimiter();
+     }
+
+     /**
+      * Serializes one data-change record as UTF-8 JSON for its Doris table.
+      *
+      * @param record raw record text; only used in log messages
+      * @param recordRoot parsed record; must contain the {@code source} node and the row images
+      * @param op operation code: {@code r}, {@code c}, {@code u} or {@code d}
+      * @return the record to write, or {@code null} when the table has no Doris mapping or the
+      *     operation is unknown (including {@code null})
+      * @throws IOException if a row cannot be written as JSON
+      */
+     public DorisRecord serialize(String record, JsonNode recordRoot, String op) throws IOException {
+         // Filter out table records that are not in tableMapping
+         String cdcTableIdentifier = getCdcTableIdentifier(recordRoot);
+         String dorisTableIdentifier = getDorisTableIdentifier(cdcTableIdentifier);
+         if (StringUtils.isNullOrWhitespaceOnly(dorisTableIdentifier)) {
+             LOG.warn(
+                     "filter table {}, because it is not listened, record detail is {}",
+                     cdcTableIdentifier,
+                     record);
+             return null;
+         }
+
+         Map<String, Object> valueMap;
+         // A switch on a null String throws NullPointerException; the empty string matches no
+         // case, so a missing op is reported through the same "unknown op" branch.
+         switch (op == null ? "" : op) {
+             case OP_READ:
+             case OP_CREATE:
+                 valueMap = extractAfterRow(recordRoot);
+                 addDeleteSign(valueMap, false);
+                 break;
+             case OP_UPDATE:
+                 return DorisRecord.of(dorisTableIdentifier, extractUpdate(recordRoot));
+             case OP_DELETE:
+                 valueMap = extractBeforeRow(recordRoot);
+                 addDeleteSign(valueMap, true);
+                 break;
+             default:
+                 LOG.error("parse record fail, unknown op {} in {}", op, record);
+                 return null;
+         }
+
+         return DorisRecord.of(
+                 dorisTableIdentifier,
+                 objectMapper.writeValueAsString(valueMap).getBytes(StandardCharsets.UTF_8));
+     }
+
+     /**
+      * Changes the update event into two rows: the before image marked as deleted (skipped when
+      * {@code ignoreUpdateBefore} is set) followed by the after image, joined by the line
+      * delimiter.
+      *
+      * @param recordRoot parsed update record
+      * @return the UTF-8 bytes of the row or rows
+      * @throws JsonProcessingException if a row cannot be written as JSON
+      */
+     private byte[] extractUpdate(JsonNode recordRoot) throws JsonProcessingException {
+         StringBuilder updateRow = new StringBuilder();
+         if (!ignoreUpdateBefore) {
+             // convert delete
+             Map<String, Object> beforeRow = extractBeforeRow(recordRoot);
+             addDeleteSign(beforeRow, true);
+             updateRow.append(objectMapper.writeValueAsString(beforeRow)).append(this.lineDelimiter);
+         }
+
+         // convert insert
+         Map<String, Object> afterRow = extractAfterRow(recordRoot);
+         addDeleteSign(afterRow, false);
+         updateRow.append(objectMapper.writeValueAsString(afterRow));
+         return updateRow.toString().getBytes(StandardCharsets.UTF_8);
+     }
+
+     /**
+      * Builds the upstream table identifier from the {@code db}, {@code schema} and {@code table}
+      * fields of the record's {@code source} node; absent fields are passed on as {@code null}.
+      */
+     @VisibleForTesting
+     public String getCdcTableIdentifier(JsonNode record) {
+         JsonNode source = record.get("source");
+         String db = extractJsonNode(source, "db");
+         String schema = extractJsonNode(source, "schema");
+         String table = extractJsonNode(source, "table");
+         return SourceSchema.getString(db, schema, table);
+     }
+
+     /**
+      * Resolves the Doris table for an upstream table. A table identifier configured in the Doris
+      * options takes precedence; otherwise the table mapping of the context is consulted.
+      *
+      * @param cdcTableIdentifier upstream table identifier, may be {@code null}
+      * @return the Doris table identifier, or {@code null} when none is known
+      */
+     @VisibleForTesting
+     public String getDorisTableIdentifier(String cdcTableIdentifier) {
+         String configured = dorisOptions.getTableIdentifier();
+         if (!StringUtils.isNullOrWhitespaceOnly(configured)) {
+             return configured;
+         }
+         Map<String, String> tableMapping = changeContext.getTableMapping();
+         if (CollectionUtil.isNullOrEmpty(tableMapping)
+                 || StringUtils.isNullOrWhitespaceOnly(cdcTableIdentifier)) {
+             return null;
+         }
+         // A single lookup is enough: a missing key yields null, which is also the fallback.
+         return tableMapping.get(cdcTableIdentifier);
+     }
+
+     /** Returns the text of {@code record[key]}, or {@code null} if it is absent or JSON null. */
+     private String extractJsonNode(JsonNode record, String key) {
+         if (record == null) {
+             return null;
+         }
+         JsonNode value = record.get(key);
+         return value == null || value instanceof NullNode ? null : value.asText();
+     }
+
+     /** Returns the {@code before} row image as a map; never {@code null}. */
+     private Map<String, Object> extractBeforeRow(JsonNode record) {
+         return extractRow(record.get("before"));
+     }
+
+     /** Returns the {@code after} row image as a map; never {@code null}. */
+     private Map<String, Object> extractAfterRow(JsonNode record) {
+         return extractRow(record.get("after"));
+     }
+
+     /** Converts a row node to a map, substituting an empty mutable map for a null result. */
+     private Map<String, Object> extractRow(JsonNode recordRow) {
+         Map<String, Object> recordMap =
+                 objectMapper.convertValue(recordRow, new TypeReference<Map<String, Object>>() {});
+         return recordMap != null ? recordMap : new HashMap<>();
+     }
+ }
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumSchemaChange.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumSchemaChange.java
new file mode 100644
index 0000000..4c67164
--- /dev/null
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumSchemaChange.java
@@ -0,0 +1,143 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.flink.sink.writer.serializer.jsondebezium;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.util.CollectionUtil;
+import org.apache.flink.util.StringUtils;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.NullNode;
+import org.apache.doris.flink.cfg.DorisOptions;
+import org.apache.doris.flink.sink.schema.SchemaChangeManager;
+import org.apache.doris.flink.sink.writer.ChangeEvent;
+import org.apache.doris.flink.tools.cdc.SourceSchema;
+
+import java.util.Map;
+import java.util.regex.Pattern;
+
+/**
+ * Synchronize the schema change in the upstream data source to the doris database table.
+ *
+ * <p>There are two schema change modes:<br>
+ * 1. {@link JsonDebeziumSchemaChangeImpl} only supports table column name and column type changes,
+ * and this mode is used by default. <br>
+ * 2. {@link JsonDebeziumSchemaChangeImplV2} supports table column name, column type, default,
+ * comment synchronization, supports multi-column changes, and supports column name rename. Need to
+ * be enabled by configuring use-new-schema-change.
+ */
+ public abstract class JsonDebeziumSchemaChange implements ChangeEvent {
+     // Expression for ADD/DROP column statements: group 1 is the operation, group 3 the column
+     // name and group 5 the optional token that follows the column name.
+     protected static String addDropDDLRegex =
+             "ALTER\\s+TABLE\\s+[^\\s]+\\s+(ADD|DROP)\\s+(COLUMN\\s+)?([^\\s]+)(\\s+([^\\s]+))?.*";
+     protected Pattern addDropDDLPattern;
+
+     // table name of the cdc upstream, format is db.tbl
+     protected String sourceTableName;
+     protected DorisOptions dorisOptions;
+     protected ObjectMapper objectMapper;
+     // <cdc db.schema.table, doris db.table>
+     protected Map<String, String> tableMapping;
+     protected SchemaChangeManager schemaChangeManager;
+     protected JsonDebeziumChangeContext changeContext;
+
+     /**
+      * Applies the schema change carried by {@code recordRoot} to Doris.
+      *
+      * @param recordRoot parsed schema-change record
+      * @return whether the change was applied; what counts as applied is defined by the subclass
+      */
+     public abstract boolean schemaChange(JsonNode recordRoot);
+
+     /**
+      * Lets the implementation initialize state from a record; what is captured, if anything, is
+      * up to the subclass.
+      */
+     public abstract void init(JsonNode recordRoot);
+
+     /**
+      * When cdc synchronizes multiple tables, it will capture multiple table schema changes. This
+      * check tells whether the record belongs to {@link #sourceTableName}.
+      *
+      * @return {@code true} only if {@code db.table} of the record equals the source table name;
+      *     {@code false} when no source table name is set
+      */
+     protected boolean checkTable(JsonNode recordRoot) {
+         String db = extractDatabase(recordRoot);
+         String tbl = extractTable(recordRoot);
+         String dbTbl = db + "." + tbl;
+         // dbTbl is never null, so comparing from this side cannot throw.
+         return dbTbl.equals(sourceTableName);
+     }
+
+     /**
+      * Returns the {@code schema} field of the record's {@code source} node when that field is
+      * present, otherwise the {@code db} field; {@code null} when neither can be read.
+      */
+     protected String extractDatabase(JsonNode record) {
+         JsonNode source = record.get("source");
+         // A record without a source node falls through to the null-safe db lookup.
+         if (source != null && source.has("schema")) {
+             // compatible with schema
+             return extractJsonNode(source, "schema");
+         }
+         return extractJsonNode(source, "db");
+     }
+
+     /** Returns the {@code table} field of the record's {@code source} node, or {@code null}. */
+     protected String extractTable(JsonNode record) {
+         return extractJsonNode(record.get("source"), "table");
+     }
+
+     /** Returns the text of {@code record[key]}, or {@code null} if it is absent or JSON null. */
+     protected String extractJsonNode(JsonNode record, String key) {
+         if (record == null) {
+             return null;
+         }
+         JsonNode value = record.get(key);
+         return value == null || value instanceof NullNode ? null : value.asText();
+     }
+
+     /**
+      * Resolves the Doris table for an upstream table. A table identifier configured in the Doris
+      * options takes precedence; otherwise {@link #tableMapping} is consulted.
+      *
+      * @param cdcTableIdentifier upstream table identifier, may be {@code null}
+      * @return the Doris table identifier, or {@code null} when none is known
+      */
+     @VisibleForTesting
+     public String getDorisTableIdentifier(String cdcTableIdentifier) {
+         String configured = dorisOptions.getTableIdentifier();
+         if (!StringUtils.isNullOrWhitespaceOnly(configured)) {
+             return configured;
+         }
+         if (CollectionUtil.isNullOrEmpty(tableMapping)
+                 || StringUtils.isNullOrWhitespaceOnly(cdcTableIdentifier)) {
+             return null;
+         }
+         // A single lookup is enough: a missing key yields null, which is also the fallback.
+         return tableMapping.get(cdcTableIdentifier);
+     }
+
+     /** Resolves the Doris table identifier for the table named in the record's source node. */
+     protected String getDorisTableIdentifier(JsonNode record) {
+         String identifier = getCdcTableIdentifier(record);
+         return getDorisTableIdentifier(identifier);
+     }
+
+     /**
+      * Splits the Doris table identifier of the record on {@code .}.
+      *
+      * @return the two parts as a tuple, or {@code null} when no identifier is known or it does
+      *     not consist of exactly two parts
+      */
+     protected Tuple2<String, String> getDorisTableTuple(JsonNode record) {
+         String identifier = getDorisTableIdentifier(record);
+         if (StringUtils.isNullOrWhitespaceOnly(identifier)) {
+             return null;
+         }
+         String[] tableInfo = identifier.split("\\.");
+         if (tableInfo.length != 2) {
+             return null;
+         }
+         return Tuple2.of(tableInfo[0], tableInfo[1]);
+     }
+
+     /**
+      * Builds the upstream table identifier from the {@code db}, {@code schema} and {@code table}
+      * fields of the record's {@code source} node; absent fields are passed on as {@code null}.
+      */
+     @VisibleForTesting
+     public String getCdcTableIdentifier(JsonNode record) {
+         JsonNode source = record.get("source");
+         String db = extractJsonNode(source, "db");
+         String schema = extractJsonNode(source, "schema");
+         String table = extractJsonNode(source, "table");
+         return SourceSchema.getString(db, schema, table);
+     }
+
+     /**
+      * Returns the parsed content of the record's {@code historyRecord} field when present,
+      * otherwise the record itself.
+      *
+      * @throws JsonProcessingException if the {@code historyRecord} text is not valid JSON
+      */
+     protected JsonNode extractHistoryRecord(JsonNode record) throws JsonProcessingException {
+         if (record != null && record.has("historyRecord")) {
+             return objectMapper.readTree(record.get("historyRecord").asText());
+         }
+         // The ddl passed by some scenes will not be included in the historyRecord,
+         // such as DebeziumSourceFunction
+         return record;
+     }
+
+     /** Replaces the schema change manager; intended for tests. */
+     @VisibleForTesting
+     public void setSchemaChangeManager(SchemaChangeManager schemaChangeManager) {
+         this.schemaChangeManager = schemaChangeManager;
+     }
+ }
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumSchemaChangeImpl.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumSchemaChangeImpl.java
new file mode 100644
index 0000000..4cf0970
--- /dev/null
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumSchemaChangeImpl.java
@@ -0,0 +1,145 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.flink.sink.writer.serializer.jsondebezium;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.util.StringUtils;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonNode;
+import org.apache.doris.flink.exception.IllegalArgumentException;
+import org.apache.doris.flink.sink.schema.SchemaChangeManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/** Use expression to match ddl sql. */
+ public class JsonDebeziumSchemaChangeImpl extends JsonDebeziumSchemaChange {
+     private static final Logger LOG = LoggerFactory.getLogger(JsonDebeziumSchemaChangeImpl.class);
+     // alter table tbl add column aca int
+     public static final String EXECUTE_DDL = "ALTER TABLE %s %s COLUMN %s %s";
+     // Compiled once; matches a varchar type with an explicit positive length.
+     private static final Pattern VARCHAR_PATTERN =
+             Pattern.compile("varchar\\(([1-9][0-9]*)\\)", Pattern.CASE_INSENSITIVE);
+     // Upper bound applied to the tripled varchar length.
+     private static final int MAX_VARCHAR_LENGTH = 65533;
+
+     /**
+      * Copies the settings from {@code changeContext}. When the context carries no pattern, the
+      * default add/drop expression is compiled case-insensitively.
+      */
+     public JsonDebeziumSchemaChangeImpl(JsonDebeziumChangeContext changeContext) {
+         this.changeContext = changeContext;
+         this.dorisOptions = changeContext.getDorisOptions();
+         this.schemaChangeManager = new SchemaChangeManager(dorisOptions);
+         this.sourceTableName = changeContext.getSourceTableName();
+         this.tableMapping = changeContext.getTableMapping();
+         this.objectMapper = changeContext.getObjectMapper();
+         this.addDropDDLPattern =
+                 Objects.isNull(changeContext.getPattern())
+                         ? Pattern.compile(addDropDDLRegex, Pattern.CASE_INSENSITIVE)
+                         : changeContext.getPattern();
+     }
+
+     @Override
+     public void init(JsonNode recordRoot) {
+         // do nothing
+     }
+
+     /**
+      * Extracts an add/drop column statement from the record and executes it against Doris.
+      *
+      * @return {@code true} only if the check and the execution both succeed; {@code false} when
+      *     the record is for another table, no Doris table or ddl can be derived, or any
+      *     exception occurs (exceptions are logged, not propagated)
+      */
+     @VisibleForTesting
+     @Override
+     public boolean schemaChange(JsonNode recordRoot) {
+         boolean status = false;
+         try {
+             if (!StringUtils.isNullOrWhitespaceOnly(sourceTableName) && !checkTable(recordRoot)) {
+                 return false;
+             }
+             // db,table
+             Tuple2<String, String> tuple = getDorisTableTuple(recordRoot);
+             if (tuple == null) {
+                 return false;
+             }
+
+             String ddl = extractDDL(recordRoot);
+             if (StringUtils.isNullOrWhitespaceOnly(ddl)) {
+                 LOG.info("ddl can not do schema change:{}", recordRoot);
+                 return false;
+             }
+
+             boolean doSchemaChange = checkSchemaChange(tuple.f0, tuple.f1, ddl);
+             status = doSchemaChange && schemaChangeManager.execute(ddl, tuple.f0);
+             LOG.info("schema change status:{}", status);
+         } catch (Exception ex) {
+             LOG.warn("schema change error :", ex);
+         }
+         return status;
+     }
+
+     /** Asks the schema change manager whether the column change in {@code ddl} can be applied. */
+     private boolean checkSchemaChange(String database, String table, String ddl)
+             throws IOException, IllegalArgumentException {
+         Map<String, Object> param = buildRequestParam(ddl);
+         return schemaChangeManager.checkSchemaChange(database, table, param);
+     }
+
+     /**
+      * Build param { "isDropColumn": true, "columnName" : "column" }.
+      *
+      * @return the parameters, or an empty map when {@code ddl} is not an add/drop statement
+      */
+     protected Map<String, Object> buildRequestParam(String ddl) {
+         Map<String, Object> params = new HashMap<>();
+         Matcher matcher = addDropDDLPattern.matcher(ddl);
+         if (matcher.find()) {
+             String op = matcher.group(1);
+             String col = matcher.group(3);
+             params.put("isDropColumn", op.equalsIgnoreCase("DROP"));
+             params.put("columnName", col);
+         }
+         return params;
+     }
+
+     /**
+      * Rewrites the upstream add/drop column statement into a statement for the Doris table.
+      *
+      * @return the rewritten statement, or {@code null} when the record has no ddl or the ddl is
+      *     not an add/drop statement
+      * @throws JsonProcessingException if the embedded history record is not valid JSON
+      */
+     @VisibleForTesting
+     public String extractDDL(JsonNode record) throws JsonProcessingException {
+         JsonNode historyRecord = extractHistoryRecord(record);
+         String ddl = extractJsonNode(historyRecord, "ddl");
+         LOG.debug("received debezium ddl :{}", ddl);
+         if (Objects.isNull(ddl)) {
+             return null;
+         }
+         // filter add/drop operation
+         Matcher matcher = addDropDDLPattern.matcher(ddl);
+         if (!matcher.find()) {
+             return null;
+         }
+         String op = matcher.group(1);
+         String col = matcher.group(3);
+         String type = handleType(matcher.group(5));
+         ddl = String.format(EXECUTE_DDL, getDorisTableIdentifier(record), op, col, type);
+         LOG.info("parse ddl:{}", ddl);
+         return ddl;
+     }
+
+     /**
+      * Adapts the column type: a varchar length is tripled and capped at 65533, any other type
+      * is returned unchanged and a missing type becomes the empty string.
+      */
+     private String handleType(String type) {
+         if (type == null || type.isEmpty()) {
+             return "";
+         }
+         // varchar len * 3
+         Matcher matcher = VARCHAR_PATTERN.matcher(type);
+         if (!matcher.find()) {
+             return type;
+         }
+         String len = matcher.group(1);
+         // The pattern forbids leading zeros, so more than five digits means at least 100000,
+         // which exceeds the cap once tripled. Skipping the parse there avoids both int overflow
+         // in the multiplication and NumberFormatException on very long digit runs.
+         int length =
+                 len.length() > 5
+                         ? MAX_VARCHAR_LENGTH
+                         : Math.min(Integer.parseInt(len) * 3, MAX_VARCHAR_LENGTH);
+         return String.format("varchar(%d)", length);
+     }
+ }
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumSchemaChangeImplV2.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumSchemaChangeImplV2.java
new file mode 100644
index 0000000..5604da7
--- /dev/null
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumSchemaChangeImplV2.java
@@ -0,0 +1,389 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.flink.sink.writer.serializer.jsondebezium;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.util.CollectionUtil;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.StringUtils;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.JsonNode;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.doris.flink.catalog.doris.DataModel;
+import org.apache.doris.flink.catalog.doris.FieldSchema;
+import org.apache.doris.flink.catalog.doris.TableSchema;
+import org.apache.doris.flink.exception.IllegalArgumentException;
+import org.apache.doris.flink.sink.schema.SchemaChangeHelper;
+import org.apache.doris.flink.sink.schema.SchemaChangeHelper.DDLSchema;
+import org.apache.doris.flink.sink.schema.SchemaChangeManager;
+import org.apache.doris.flink.sink.writer.EventType;
+import org.apache.doris.flink.tools.cdc.SourceConnector;
+import org.apache.doris.flink.tools.cdc.mysql.MysqlType;
+import org.apache.doris.flink.tools.cdc.oracle.OracleType;
+import org.apache.doris.flink.tools.cdc.postgres.PostgresType;
+import org.apache.doris.flink.tools.cdc.sqlserver.SqlServerType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/**
+ * Extract the columns that need to be changed based on the change records of the upstream data
+ * source.
+ */
+public class JsonDebeziumSchemaChangeImplV2 extends JsonDebeziumSchemaChange {
+ private static final Logger LOG = LoggerFactory.getLogger(JsonDebeziumSchemaChangeImplV2.class);
+ private static final Pattern renameDDLPattern =
+ Pattern.compile(
+ "ALTER\\s+TABLE\\s+(\\w+)\\s+RENAME\\s+COLUMN\\s+(\\w+)\\s+TO\\s+(\\w+)",
+ Pattern.CASE_INSENSITIVE);
+ private Map<String, FieldSchema> originFieldSchemaMap;
+ private SourceConnector sourceConnector;
+ // create table properties
+ private final Map<String, String> tableProperties;
+ private String targetDatabase;
+
+ public JsonDebeziumSchemaChangeImplV2(JsonDebeziumChangeContext changeContext) {
+ this.addDropDDLPattern = Pattern.compile(addDropDDLRegex, Pattern.CASE_INSENSITIVE);
+ this.changeContext = changeContext;
+ this.sourceTableName = changeContext.getSourceTableName();
+ this.dorisOptions = changeContext.getDorisOptions();
+ this.schemaChangeManager = new SchemaChangeManager(dorisOptions);
+ this.targetDatabase = changeContext.getTargetDatabase();
+ this.tableProperties = changeContext.getTableProperties();
+ this.tableMapping = changeContext.getTableMapping();
+ this.objectMapper = changeContext.getObjectMapper();
+ }
+
+ @Override
+ public void init(JsonNode recordRoot) {
+ originFieldSchemaMap = new LinkedHashMap<>();
+ Set<String> columnNameSet = extractAfterRow(recordRoot).keySet();
+ if (CollectionUtils.isEmpty(columnNameSet)) {
+ columnNameSet = extractBeforeRow(recordRoot).keySet();
+ }
+ columnNameSet.forEach(
+ columnName -> originFieldSchemaMap.put(columnName, new FieldSchema()));
+ }
+
+ @Override
+ public boolean schemaChange(JsonNode recordRoot) {
+ boolean status = false;
+ try {
+ if (!StringUtils.isNullOrWhitespaceOnly(sourceTableName) && !checkTable(recordRoot)) {
+ return false;
+ }
+
+ EventType eventType = extractEventType(recordRoot);
+ if (eventType == null) {
+ return false;
+ }
+ if (eventType.equals(EventType.CREATE)) {
+ TableSchema tableSchema = extractCreateTableSchema(recordRoot);
+ status = schemaChangeManager.createTable(tableSchema);
+ if (status) {
+ String cdcTbl = getCdcTableIdentifier(recordRoot);
+ String dorisTbl = getCreateTableIdentifier(recordRoot);
+ changeContext.getTableMapping().put(cdcTbl, dorisTbl);
+ this.tableMapping = changeContext.getTableMapping();
+ LOG.info("create table ddl status: {}", status);
+ }
+ } else if (eventType.equals(EventType.ALTER)) {
+ // db,table
+ Tuple2<String, String> tuple = getDorisTableTuple(recordRoot);
+ if (tuple == null) {
+ return false;
+ }
+ List<String> ddlSqlList = extractDDLList(recordRoot);
+ if (CollectionUtils.isEmpty(ddlSqlList)) {
+ LOG.info("ddl can not do schema change:{}", recordRoot);
+ return false;
+ }
+ List<DDLSchema> ddlSchemas = SchemaChangeHelper.getDdlSchemas();
+ for (int i = 0; i < ddlSqlList.size(); i++) {
+ DDLSchema ddlSchema = ddlSchemas.get(i);
+ String ddlSql = ddlSqlList.get(i);
+ boolean doSchemaChange = checkSchemaChange(tuple.f0, tuple.f1, ddlSchema);
+ status = doSchemaChange && schemaChangeManager.execute(ddlSql, tuple.f0);
+ LOG.info("schema change status:{}, ddl:{}", status, ddlSql);
+ }
+ } else {
+ LOG.info("Unsupported event type {}", eventType);
+ }
+ } catch (Exception ex) {
+ LOG.warn("schema change error :", ex);
+ }
+ return status;
+ }
+
+ private JsonNode extractTableChange(JsonNode record) throws JsonProcessingException {
+ JsonNode historyRecord = extractHistoryRecord(record);
+ JsonNode tableChanges = historyRecord.get("tableChanges");
+ if (!Objects.isNull(tableChanges)) {
+ JsonNode tableChange = tableChanges.get(0);
+ return tableChange;
+ }
+ return null;
+ }
+
+ /** Parse Alter Event. */
+ @VisibleForTesting
+ public List<String> extractDDLList(JsonNode record) throws IOException {
+ String dorisTable = getDorisTableIdentifier(record);
+ JsonNode historyRecord = extractHistoryRecord(record);
+ String ddl = extractJsonNode(historyRecord, "ddl");
+ JsonNode tableChange = extractTableChange(record);
+ EventType eventType = extractEventType(record);
+ if (Objects.isNull(tableChange)
+ || Objects.isNull(ddl)
+ || !eventType.equals(EventType.ALTER)) {
+ return null;
+ }
+
+ JsonNode columns = tableChange.get("table").get("columns");
+ if (Objects.isNull(sourceConnector)) {
+ sourceConnector =
+ SourceConnector.valueOf(
+ record.get("source").get("connector").asText().toUpperCase());
+ fillOriginSchema(columns);
+ }
+
+ // rename ddl
+ Matcher renameMatcher = renameDDLPattern.matcher(ddl);
+ if (renameMatcher.find()) {
+ String oldColumnName = renameMatcher.group(2);
+ String newColumnName = renameMatcher.group(3);
+ return SchemaChangeHelper.generateRenameDDLSql(
+ dorisTable, oldColumnName, newColumnName, originFieldSchemaMap);
+ }
+
+ // add/drop ddl
+ Map<String, FieldSchema> updateFiledSchema = new LinkedHashMap<>();
+ for (JsonNode column : columns) {
+ buildFieldSchema(updateFiledSchema, column);
+ }
+ SchemaChangeHelper.compareSchema(updateFiledSchema, originFieldSchemaMap);
+ // In order to avoid other source table column change operations other than add/drop/rename,
+ // which may lead to the accidental deletion of the doris column.
+ Matcher matcher = addDropDDLPattern.matcher(ddl);
+ if (!matcher.find()) {
+ return null;
+ }
+ return SchemaChangeHelper.generateDDLSql(dorisTable);
+ }
+
+ @VisibleForTesting
+ public TableSchema extractCreateTableSchema(JsonNode record) throws JsonProcessingException {
+ if (sourceConnector == null) {
+ sourceConnector =
+ SourceConnector.valueOf(
+ record.get("source").get("connector").asText().toUpperCase());
+ }
+
+ String dorisTable = getCreateTableIdentifier(record);
+ JsonNode tableChange = extractTableChange(record);
+ JsonNode pkColumns = tableChange.get("table").get("primaryKeyColumnNames");
+ JsonNode columns = tableChange.get("table").get("columns");
+ JsonNode comment = tableChange.get("table").get("comment");
+ String tblComment = comment == null ? "" : comment.asText();
+ Map<String, FieldSchema> field = new LinkedHashMap<>();
+ for (JsonNode column : columns) {
+ buildFieldSchema(field, column);
+ }
+ List<String> pkList = new ArrayList<>();
+ for (JsonNode column : pkColumns) {
+ String fieldName = column.asText();
+ pkList.add(fieldName);
+ }
+
+ TableSchema tableSchema = new TableSchema();
+ tableSchema.setFields(field);
+ tableSchema.setKeys(pkList);
+ tableSchema.setDistributeKeys(buildDistributeKeys(pkList, field));
+ tableSchema.setTableComment(tblComment);
+ tableSchema.setProperties(tableProperties);
+ tableSchema.setModel(pkList.isEmpty() ? DataModel.DUPLICATE : DataModel.UNIQUE);
+
+ String[] split = dorisTable.split("\\.");
+ Preconditions.checkArgument(split.length == 2);
+ tableSchema.setDatabase(split[0]);
+ tableSchema.setTable(split[1]);
+ return tableSchema;
+ }
+
+ private List<String> buildDistributeKeys(
+ List<String> primaryKeys, Map<String, FieldSchema> fields) {
+ if (!CollectionUtil.isNullOrEmpty(primaryKeys)) {
+ return primaryKeys;
+ }
+ if (!fields.isEmpty()) {
+ Map.Entry<String, FieldSchema> firstField = fields.entrySet().iterator().next();
+ return Collections.singletonList(firstField.getKey());
+ }
+ return new ArrayList<>();
+ }
+
+ private String getCreateTableIdentifier(JsonNode record) {
+ String table = extractJsonNode(record.get("source"), "table");
+ return targetDatabase + "." + table;
+ }
+
+ private boolean checkSchemaChange(String database, String table, DDLSchema ddlSchema)
+ throws IOException, IllegalArgumentException {
+ Map<String, Object> param =
+ SchemaChangeManager.buildRequestParam(
+ ddlSchema.isDropColumn(), ddlSchema.getColumnName());
+ return schemaChangeManager.checkSchemaChange(database, table, param);
+ }
+
+ /** Parse event type. */
+ protected EventType extractEventType(JsonNode record) throws JsonProcessingException {
+ JsonNode tableChange = extractTableChange(record);
+ if (tableChange == null || tableChange.get("type") == null) {
+ return null;
+ }
+ String type = tableChange.get("type").asText();
+ if (EventType.ALTER.toString().equalsIgnoreCase(type)) {
+ return EventType.ALTER;
+ } else if (EventType.CREATE.toString().equalsIgnoreCase(type)) {
+ return EventType.CREATE;
+ }
+ return null;
+ }
+
+ private Map<String, Object> extractBeforeRow(JsonNode record) {
+ return extractRow(record.get("before"));
+ }
+
+ private Map<String, Object> extractAfterRow(JsonNode record) {
+ return extractRow(record.get("after"));
+ }
+
+ private Map<String, Object> extractRow(JsonNode recordRow) {
+ Map<String, Object> recordMap =
+ objectMapper.convertValue(recordRow, new TypeReference<Map<String, Object>>() {});
+ return recordMap != null ? recordMap : new HashMap<>();
+ }
+
+ @VisibleForTesting
+ public void fillOriginSchema(JsonNode columns) {
+ if (Objects.nonNull(originFieldSchemaMap)) {
+ for (JsonNode column : columns) {
+ String fieldName = column.get("name").asText();
+ if (originFieldSchemaMap.containsKey(fieldName)) {
+ String dorisTypeName = buildDorisTypeName(column);
+ String defaultValue =
+ handleDefaultValue(extractJsonNode(column, "defaultValueExpression"));
+ String comment = extractJsonNode(column, "comment");
+ FieldSchema fieldSchema = originFieldSchemaMap.get(fieldName);
+ fieldSchema.setName(fieldName);
+ fieldSchema.setTypeString(dorisTypeName);
+ fieldSchema.setComment(comment);
+ fieldSchema.setDefaultValue(defaultValue);
+ }
+ }
+ } else {
+ LOG.error(
+ "Current schema change failed! You need to ensure that "
+ + "there is data in the table."
+ + dorisOptions.getTableIdentifier());
+ originFieldSchemaMap = new LinkedHashMap<>();
+ columns.forEach(column -> buildFieldSchema(originFieldSchemaMap, column));
+ }
+ }
+
+ private void buildFieldSchema(Map<String, FieldSchema> filedSchemaMap, JsonNode column) {
+ String fieldName = column.get("name").asText();
+ String dorisTypeName = buildDorisTypeName(column);
+ String defaultValue = handleDefaultValue(extractJsonNode(column, "defaultValueExpression"));
+ String comment = extractJsonNode(column, "comment");
+ filedSchemaMap.put(
+ fieldName, new FieldSchema(fieldName, dorisTypeName, defaultValue, comment));
+ }
+
+ @VisibleForTesting
+ public String buildDorisTypeName(JsonNode column) {
+ int length = column.get("length") == null ? 0 : column.get("length").asInt();
+ int scale = column.get("scale") == null ? 0 : column.get("scale").asInt();
+ String sourceTypeName = column.get("typeName").asText();
+ String dorisTypeName;
+ switch (sourceConnector) {
+ case MYSQL:
+ dorisTypeName = MysqlType.toDorisType(sourceTypeName, length, scale);
+ break;
+ case ORACLE:
+ dorisTypeName = OracleType.toDorisType(sourceTypeName, length, scale);
+ break;
+ case POSTGRES:
+ dorisTypeName = PostgresType.toDorisType(sourceTypeName, length, scale);
+ break;
+ case SQLSERVER:
+ dorisTypeName = SqlServerType.toDorisType(sourceTypeName, length, scale);
+ break;
+ default:
+ String errMsg = "Not support " + sourceTypeName + " schema change.";
+ throw new UnsupportedOperationException(errMsg);
+ }
+ return dorisTypeName;
+ }
+
+ private String handleDefaultValue(String defaultValue) {
+ if (StringUtils.isNullOrWhitespaceOnly(defaultValue)) {
+ return null;
+ }
+ // Due to historical reasons, doris needs to add quotes to
+ // the default value of the new column
+ // For example in mysql: alter table add column c1 int default 100
+ // In Doris: alter table add column c1 int default '100'
+ if (Pattern.matches("['\"].*?['\"]", defaultValue)) {
+ return defaultValue;
+ } else if (defaultValue.equals("1970-01-01 00:00:00")) {
+ // TODO: The default value of setting the current time in CDC is 1970-01-01 00:00:00
+ return "current_timestamp";
+ }
+ return "'" + defaultValue + "'";
+ }
+
+ @VisibleForTesting
+ public void setOriginFieldSchemaMap(Map<String, FieldSchema> originFieldSchemaMap) {
+ this.originFieldSchemaMap = originFieldSchemaMap;
+ }
+
+ @VisibleForTesting
+ public Map<String, FieldSchema> getOriginFieldSchemaMap() {
+ return originFieldSchemaMap;
+ }
+
+ @VisibleForTesting
+ public void setSourceConnector(String sourceConnector) {
+ this.sourceConnector = SourceConnector.valueOf(sourceConnector.toUpperCase());
+ }
+}
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestJsonDebeziumSchemaSerializer.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestJsonDebeziumSchemaSerializer.java
index 7da3a30..e32f1b3 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestJsonDebeziumSchemaSerializer.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestJsonDebeziumSchemaSerializer.java
@@ -18,48 +18,34 @@
package org.apache.doris.flink.sink.writer;
import com.fasterxml.jackson.core.type.TypeReference;
-import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.collect.Maps;
-import org.apache.commons.collections.CollectionUtils;
-import org.apache.doris.flink.catalog.doris.FieldSchema;
-import org.apache.doris.flink.catalog.doris.TableSchema;
-import org.apache.doris.flink.cfg.DorisExecutionOptions;
+import com.fasterxml.jackson.databind.node.JsonNodeFactory;
import org.apache.doris.flink.cfg.DorisOptions;
-import org.apache.doris.flink.cfg.DorisReadOptions;
-import org.apache.doris.flink.exception.DorisException;
-import org.apache.doris.flink.rest.RestService;
-import org.apache.doris.flink.rest.models.Field;
-import org.apache.doris.flink.rest.models.Schema;
+import org.apache.doris.flink.exception.IllegalArgumentException;
+import org.apache.doris.flink.sink.schema.SchemaChangeManager;
+import org.apache.doris.flink.sink.writer.serializer.DorisRecord;
import org.apache.doris.flink.sink.writer.serializer.JsonDebeziumSchemaSerializer;
-import org.apache.doris.flink.tools.cdc.SourceConnector;
+import org.apache.doris.flink.sink.writer.serializer.jsondebezium.JsonDebeziumSchemaChange;
import org.junit.Assert;
-import org.junit.BeforeClass;
-import org.junit.Ignore;
+import org.junit.Before;
import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.mockito.Mockito;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.LinkedHashMap;
-import java.util.List;
import java.util.Map;
-import java.util.Map.Entry;
-/** test for JsonDebeziumSchemaSerializer. */
+/** Test for JsonDebeziumSchemaSerializer. */
public class TestJsonDebeziumSchemaSerializer {
- private static final Logger LOG =
- LoggerFactory.getLogger(TestJsonDebeziumSchemaSerializer.class);
- static DorisOptions dorisOptions;
- static JsonDebeziumSchemaSerializer serializer;
- static ObjectMapper objectMapper = new ObjectMapper();
- @BeforeClass
- public static void setUp() {
+ protected static DorisOptions dorisOptions;
+ protected ObjectMapper objectMapper = new ObjectMapper();
+ private JsonDebeziumSchemaSerializer serializer;
+ private SchemaChangeManager mockSchemaChangeManager;
+
+ @Before
+ public void setUp() throws IOException, IllegalArgumentException {
dorisOptions =
DorisOptions.builder()
.setFenodes("127.0.0.1:8030")
@@ -67,22 +53,31 @@
.setUsername("root")
.setPassword("")
.build();
- serializer = JsonDebeziumSchemaSerializer.builder().setDorisOptions(dorisOptions).build();
+ this.objectMapper.enable(DeserializationFeature.USE_BIG_DECIMAL_FOR_FLOATS);
+ JsonNodeFactory jsonNodeFactory = JsonNodeFactory.withExactBigDecimals(true);
+ this.objectMapper.setNodeFactory(jsonNodeFactory);
+ this.serializer =
+ JsonDebeziumSchemaSerializer.builder().setDorisOptions(dorisOptions).build();
+ this.mockSchemaChangeManager = Mockito.mock(SchemaChangeManager.class);
+ Mockito.when(
+ mockSchemaChangeManager.checkSchemaChange(
+ Mockito.any(), Mockito.any(), Mockito.any()))
+ .thenReturn(true);
}
@Test
- public void testSerializeInsert() throws IOException {
- // insert into t1 VALUES(1,"doris",'2022-01-01','2022-01-01 10:01:02','2022-01-01
- // 10:01:03');
- byte[] serializedValue =
- serializer
- .serialize(
- "{\"before\":null,\"after\":{\"id\":1,\"name\":\"doris\",\"dt\":\"2022-01-01\",\"dtime\":\"2022-01-01 10:01:02\",\"ts\":\"2022-01-01 10:01:03\"},\"source\":{\"version\":\"1.5.4.Final\",\"connector\":\"mysql\",\"name\":\"mysql_binlog_source\",\"ts_ms\":1663923840000,\"snapshot\":\"false\",\"db\":\"test\",\"sequence\":null,\"table\":\"t1\",\"server_id\":1,\"gtid\":null,\"file\":\"binlog.000006\",\"pos\":11834,\"row\":0,\"thread\":null,\"query\":null},\"op\":\"c\",\"ts_ms\":1663923840146,\"transaction\":null}")
- .getRow();
+ public void testJsonDebeziumDataChange() throws IOException {
+ // insert into t1
+ // VALUES(1,"doris",'2022-01-01','2022-01-01 10:01:02','2022-01-01 10:01:03');
+ String record =
+ "{\"before\":null,\"after\":{\"id\":1,\"name\":\"doris\",\"dt\":\"2022-01-01\",\"dtime\":\"2022-01-01 10:01:02\",\"ts\":\"2022-01-01 10:01:03\"},\"source\":{\"version\":\"1.5.4.Final\",\"connector\":\"mysql\",\"name\":\"mysql_binlog_source\",\"ts_ms\":1663923840000,\"snapshot\":\"false\",\"db\":\"test\",\"sequence\":null,\"table\":\"t1\",\"server_id\":1,\"gtid\":null,\"file\":\"binlog.000006\",\"pos\":11834,\"row\":0,\"thread\":null,\"query\":null},\"op\":\"c\",\"ts_ms\":1663923840146,\"transaction\":null}";
+ DorisRecord dorisRecord = serializer.serialize(record);
+ byte[] row = dorisRecord.getRow();
Map<String, String> valueMap =
objectMapper.readValue(
- new String(serializedValue, StandardCharsets.UTF_8),
+ new String(row, StandardCharsets.UTF_8),
new TypeReference<Map<String, String>>() {});
+
Assert.assertEquals("1", valueMap.get("id"));
Assert.assertEquals("doris", valueMap.get("name"));
Assert.assertEquals("2022-01-01", valueMap.get("dt"));
@@ -93,388 +88,32 @@
}
@Test
- public void testSerializeUpdate() throws IOException {
- // update t1 set name='doris-update' WHERE id =1;
- byte[] serializedValue =
- serializer
- .serialize(
- "{\"before\":{\"id\":1,\"name\":\"doris\",\"dt\":\"2022-01-01\",\"dtime\":\"2022-01-01 10:01:02\",\"ts\":\"2022-01-01 10:01:03\"},\"after\":{\"id\":1,\"name\":\"doris-update\",\"dt\":\"2022-01-01\",\"dtime\":\"2022-01-01 10:01:02\",\"ts\":\"2022-01-01 10:01:03\"},\"source\":{\"version\":\"1.5.4.Final\",\"connector\":\"mysql\",\"name\":\"mysql_binlog_source\",\"ts_ms\":1663924082000,\"snapshot\":\"false\",\"db\":\"test\",\"sequence\":null,\"table\":\"t1\",\"server_id\":1,\"gtid\":null,\"file\":\"binlog.000006\",\"pos\":12154,\"row\":0,\"thread\":null,\"query\":null},\"op\":\"u\",\"ts_ms\":1663924082186,\"transaction\":null}")
- .getRow();
- Map<String, String> valueMap =
- objectMapper.readValue(
- new String(serializedValue, StandardCharsets.UTF_8),
- new TypeReference<Map<String, String>>() {});
- Assert.assertEquals("1", valueMap.get("id"));
- Assert.assertEquals("doris-update", valueMap.get("name"));
- Assert.assertEquals("2022-01-01", valueMap.get("dt"));
- Assert.assertEquals("2022-01-01 10:01:02", valueMap.get("dtime"));
- Assert.assertEquals("2022-01-01 10:01:03", valueMap.get("ts"));
- Assert.assertEquals("0", valueMap.get("__DORIS_DELETE_SIGN__"));
- Assert.assertEquals(6, valueMap.size());
+ public void testTestJsonDebeziumSchemaChangeImpl() throws IOException {
+ // ALTER TABLE test.t1 add COLUMN c_1 varchar(600)
+ String record =
+ "{\"source\":{\"version\":\"1.5.4.Final\",\"connector\":\"mysql\",\"name\":\"mysql_binlog_source\",\"ts_ms\":1663924503565,\"snapshot\":\"false\",\"db\":\"test\",\"sequence\":null,\"table\":\"t1\",\"server_id\":1,\"gtid\":null,\"file\":\"binlog.000006\",\"pos\":13088,\"row\":0,\"thread\":null,\"query\":null},\"historyRecord\":\"{\\\"source\\\":{\\\"file\\\":\\\"binlog.000006\\\",\\\"pos\\\":13088,\\\"server_id\\\":1},\\\"position\\\":{\\\"transaction_id\\\":null,\\\"ts_sec\\\":1663924503,\\\"file\\\":\\\"binlog.000006\\\",\\\"pos\\\":13221,\\\"server_id\\\":1},\\\"databaseName\\\":\\\"test\\\",\\\"ddl\\\":\\\"alter table t1 add \\\\n c_1 varchar(200)\\\",\\\"tableChanges\\\":[{\\\"type\\\":\\\"ALTER\\\",\\\"id\\\":\\\"\\\\\\\"test\\\\\\\".\\\\\\\"t1\\\\\\\"\\\",\\\"table\\\":{\\\"defaultCharsetName\\\":\\\"utf8mb4\\\",\\\"primaryKeyColumnNames\\\":[\\\"id\\\"],\\\"columns\\\":[{\\\"name\\\":\\\"id\\\",\\\"jdbcType\\\":4,\\\"typeName\\\":\\\"INT\\\",\\\"typeExpression\\\":\\\"INT\\\",\\\"charsetName\\\":null,\\\"position\\\":1,\\\"optional\\\":false,\\\"autoIncremented\\\":false,\\\"generated\\\":false},{\\\"name\\\":\\\"name\\\",\\\"jdbcType\\\":12,\\\"typeName\\\":\\\"VARCHAR\\\",\\\"typeExpression\\\":\\\"VARCHAR\\\",\\\"charsetName\\\":\\\"utf8mb4\\\",\\\"length\\\":128,\\\"position\\\":2,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false},{\\\"name\\\":\\\"dt\\\",\\\"jdbcType\\\":91,\\\"typeName\\\":\\\"DATE\\\",\\\"typeExpression\\\":\\\"DATE\\\",\\\"charsetName\\\":null,\\\"position\\\":3,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false},{\\\"name\\\":\\\"dtime\\\",\\\"jdbcType\\\":93,\\\"typeName\\\":\\\"DATETIME\\\",\\\"typeExpression\\\":\\\"DATETIME\\\",\\\"charsetName\\\":null,\\\"position\\\":4,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false},{\\\"name\\\":\\\"ts\\\",\\\"jdbcType\\\":2014,\\\"typeName\\\":\\\"TIMESTAMP\\\",\\\"typeExpression\\\":\\\"TIMESTAMP\\\",\\\"charse
tName\\\":null,\\\"position\\\":5,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false},{\\\"name\\\":\\\"c_1\\\",\\\"jdbcType\\\":12,\\\"typeName\\\":\\\"VARCHAR\\\",\\\"typeExpression\\\":\\\"VARCHAR\\\",\\\"charsetName\\\":\\\"utf8mb4\\\",\\\"length\\\":200,\\\"position\\\":6,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false}]}}]}\"}";
+ JsonDebeziumSchemaChange schemaChange = serializer.getJsonDebeziumSchemaChange();
+ schemaChange.setSchemaChangeManager(mockSchemaChangeManager);
+ DorisRecord serializeRecord = serializer.serialize(record);
+ Assert.assertNull(serializeRecord);
}
@Test
- public void testSerializeUpdateBefore() throws IOException {
- serializer =
+ public void testTestJsonDebeziumSchemaChangeImplV2() throws IOException {
+ // alter table test_sink add column c4 varchar(100) default 'aaa', drop column c3;
+ String recordValue =
+ "{\"before\":null,\"after\":{\"id\":1,\"c3\":1111},\"source\":{\"version\":\"1.9.7.Final\",\"connector\":\"mysql\",\"name\":\"mysql_binlog_source\",\"ts_ms\":0,\"snapshot\":\"false\",\"db\":\"test\",\"sequence\":null,\"table\":\"test_sink\",\"server_id\":0,\"gtid\":null,\"file\":\"\",\"pos\":0,\"row\":0,\"thread\":null,\"query\":null},\"op\":\"r\",\"ts_ms\":1703151083107,\"transaction\":null}";
+ String recordSchema =
+ "{\"source\":{\"version\":\"1.9.7.Final\",\"connector\":\"mysql\",\"name\":\"mysql_binlog_source\",\"ts_ms\":1703151236431,\"snapshot\":\"false\",\"db\":\"test\",\"sequence\":null,\"table\":\"test_sink\",\"server_id\":1,\"gtid\":null,\"file\":\"binlog.000050\",\"pos\":6040,\"row\":0,\"thread\":null,\"query\":null},\"historyRecord\":\"{\\\"source\\\":{\\\"file\\\":\\\"binlog.000050\\\",\\\"pos\\\":6040,\\\"server_id\\\":1},\\\"position\\\":{\\\"transaction_id\\\":null,\\\"ts_sec\\\":1703151236,\\\"file\\\":\\\"binlog.000050\\\",\\\"pos\\\":6206,\\\"server_id\\\":1},\\\"databaseName\\\":\\\"test\\\",\\\"ddl\\\":\\\"alter table test_sink add column c4 varchar(100) default 'aaa', drop column c3\\\",\\\"tableChanges\\\":[{\\\"type\\\":\\\"ALTER\\\",\\\"id\\\":\\\"\\\\\\\"test\\\\\\\".\\\\\\\"test_sink\\\\\\\"\\\",\\\"table\\\":{\\\"defaultCharsetName\\\":\\\"utf8mb4\\\",\\\"primaryKeyColumnNames\\\":[\\\"id\\\"],\\\"columns\\\":[{\\\"name\\\":\\\"id\\\",\\\"jdbcType\\\":4,\\\"typeName\\\":\\\"INT\\\",\\\"typeExpression\\\":\\\"INT\\\",\\\"charsetName\\\":null,\\\"position\\\":1,\\\"optional\\\":false,\\\"autoIncremented\\\":false,\\\"generated\\\":false,\\\"comment\\\":null,\\\"hasDefaultValue\\\":true,\\\"defaultValueExpression\\\":\\\"10000\\\",\\\"enumValues\\\":[]},{\\\"name\\\":\\\"c4\\\",\\\"jdbcType\\\":12,\\\"typeName\\\":\\\"VARCHAR\\\",\\\"typeExpression\\\":\\\"VARCHAR\\\",\\\"charsetName\\\":\\\"utf8mb4\\\",\\\"length\\\":100,\\\"position\\\":2,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false,\\\"comment\\\":null,\\\"hasDefaultValue\\\":true,\\\"defaultValueExpression\\\":\\\"aaa\\\",\\\"enumValues\\\":[]}]},\\\"comment\\\":null}]}\"}";
+ this.serializer =
JsonDebeziumSchemaSerializer.builder()
.setDorisOptions(dorisOptions)
- .setExecutionOptions(
- DorisExecutionOptions.builderDefaults()
- .setIgnoreUpdateBefore(false)
- .build())
+ .setNewSchemaChange(true)
.build();
- // update t1 set name='doris-update' WHERE id =1;
- byte[] serializedValue =
- serializer
- .serialize(
- "{\"before\":{\"id\":1,\"name\":\"doris\",\"dt\":\"2022-01-01\",\"dtime\":\"2022-01-01 10:01:02\",\"ts\":\"2022-01-01 10:01:03\"},\"after\":{\"id\":1,\"name\":\"doris-update\",\"dt\":\"2022-01-01\",\"dtime\":\"2022-01-01 10:01:02\",\"ts\":\"2022-01-01 10:01:03\"},\"source\":{\"version\":\"1.5.4.Final\",\"connector\":\"mysql\",\"name\":\"mysql_binlog_source\",\"ts_ms\":1663924082000,\"snapshot\":\"false\",\"db\":\"test\",\"sequence\":null,\"table\":\"t1\",\"server_id\":1,\"gtid\":null,\"file\":\"binlog.000006\",\"pos\":12154,\"row\":0,\"thread\":null,\"query\":null},\"op\":\"u\",\"ts_ms\":1663924082186,\"transaction\":null}")
- .getRow();
- String row = new String(serializedValue, StandardCharsets.UTF_8);
- String[] split = row.split("\n");
- Map<String, String> valueMap =
- objectMapper.readValue(split[1], new TypeReference<Map<String, String>>() {});
- Assert.assertEquals("1", valueMap.get("id"));
- Assert.assertEquals("doris-update", valueMap.get("name"));
- Assert.assertEquals("2022-01-01", valueMap.get("dt"));
- Assert.assertEquals("2022-01-01 10:01:02", valueMap.get("dtime"));
- Assert.assertEquals("2022-01-01 10:01:03", valueMap.get("ts"));
- Assert.assertEquals("0", valueMap.get("__DORIS_DELETE_SIGN__"));
- Assert.assertEquals(6, valueMap.size());
-
- Map<String, String> beforeMap =
- objectMapper.readValue(split[0], new TypeReference<Map<String, String>>() {});
- Assert.assertEquals("doris", beforeMap.get("name"));
- }
-
- @Test
- public void testSerializeDelete() throws IOException {
- byte[] serializedValue =
- serializer
- .serialize(
- "{\"before\":{\"id\":1,\"name\":\"doris-update\",\"dt\":\"2022-01-01\",\"dtime\":\"2022-01-01 10:01:02\",\"ts\":\"2022-01-01 10:01:03\"},\"after\":null,\"source\":{\"version\":\"1.5.4.Final\",\"connector\":\"mysql\",\"name\":\"mysql_binlog_source\",\"ts_ms\":1663924328000,\"snapshot\":\"false\",\"db\":\"test\",\"sequence\":null,\"table\":\"t1\",\"server_id\":1,\"gtid\":null,\"file\":\"binlog.000006\",\"pos\":12500,\"row\":0,\"thread\":null,\"query\":null},\"op\":\"d\",\"ts_ms\":1663924328869,\"transaction\":null}")
- .getRow();
- Map<String, String> valueMap =
- objectMapper.readValue(
- new String(serializedValue, StandardCharsets.UTF_8),
- new TypeReference<Map<String, String>>() {});
- Assert.assertEquals("1", valueMap.get("id"));
- Assert.assertEquals("doris-update", valueMap.get("name"));
- Assert.assertEquals("2022-01-01", valueMap.get("dt"));
- Assert.assertEquals("2022-01-01 10:01:02", valueMap.get("dtime"));
- Assert.assertEquals("2022-01-01 10:01:03", valueMap.get("ts"));
- Assert.assertEquals("1", valueMap.get("__DORIS_DELETE_SIGN__"));
- Assert.assertEquals(6, valueMap.size());
- }
-
- @Test
- public void testExtractDDL() throws IOException {
- String srcDDL = "ALTER TABLE test.t1 add COLUMN c_1 varchar(600)";
- String record =
- "{\"source\":{\"version\":\"1.5.4.Final\",\"connector\":\"mysql\",\"name\":\"mysql_binlog_source\",\"ts_ms\":1663924503565,\"snapshot\":\"false\",\"db\":\"test\",\"sequence\":null,\"table\":\"t1\",\"server_id\":1,\"gtid\":null,\"file\":\"binlog.000006\",\"pos\":13088,\"row\":0,\"thread\":null,\"query\":null},\"historyRecord\":\"{\\\"source\\\":{\\\"file\\\":\\\"binlog.000006\\\",\\\"pos\\\":13088,\\\"server_id\\\":1},\\\"position\\\":{\\\"transaction_id\\\":null,\\\"ts_sec\\\":1663924503,\\\"file\\\":\\\"binlog.000006\\\",\\\"pos\\\":13221,\\\"server_id\\\":1},\\\"databaseName\\\":\\\"test\\\",\\\"ddl\\\":\\\"alter table t1 add \\\\n c_1 varchar(200)\\\",\\\"tableChanges\\\":[{\\\"type\\\":\\\"ALTER\\\",\\\"id\\\":\\\"\\\\\\\"test\\\\\\\".\\\\\\\"t1\\\\\\\"\\\",\\\"table\\\":{\\\"defaultCharsetName\\\":\\\"utf8mb4\\\",\\\"primaryKeyColumnNames\\\":[\\\"id\\\"],\\\"columns\\\":[{\\\"name\\\":\\\"id\\\",\\\"jdbcType\\\":4,\\\"typeName\\\":\\\"INT\\\",\\\"typeExpression\\\":\\\"INT\\\",\\\"charsetName\\\":null,\\\"position\\\":1,\\\"optional\\\":false,\\\"autoIncremented\\\":false,\\\"generated\\\":false},{\\\"name\\\":\\\"name\\\",\\\"jdbcType\\\":12,\\\"typeName\\\":\\\"VARCHAR\\\",\\\"typeExpression\\\":\\\"VARCHAR\\\",\\\"charsetName\\\":\\\"utf8mb4\\\",\\\"length\\\":128,\\\"position\\\":2,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false},{\\\"name\\\":\\\"dt\\\",\\\"jdbcType\\\":91,\\\"typeName\\\":\\\"DATE\\\",\\\"typeExpression\\\":\\\"DATE\\\",\\\"charsetName\\\":null,\\\"position\\\":3,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false},{\\\"name\\\":\\\"dtime\\\",\\\"jdbcType\\\":93,\\\"typeName\\\":\\\"DATETIME\\\",\\\"typeExpression\\\":\\\"DATETIME\\\",\\\"charsetName\\\":null,\\\"position\\\":4,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false},{\\\"name\\\":\\\"ts\\\",\\\"jdbcType\\\":2014,\\\"typeName\\\":\\\"TIMESTAMP\\\",\\\"typeExpression\\\":\\\"TIMESTAMP\\\",\\\"charse
tName\\\":null,\\\"position\\\":5,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false},{\\\"name\\\":\\\"c_1\\\",\\\"jdbcType\\\":12,\\\"typeName\\\":\\\"VARCHAR\\\",\\\"typeExpression\\\":\\\"VARCHAR\\\",\\\"charsetName\\\":\\\"utf8mb4\\\",\\\"length\\\":200,\\\"position\\\":6,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false}]}}]}\"}";
- JsonNode recordRoot = objectMapper.readTree(record);
- String ddl = serializer.extractDDL(recordRoot);
- Assert.assertEquals(srcDDL, ddl);
-
- String targetDDL = "ALTER TABLE test.t1 add COLUMN c_1 varchar(65533)";
- String record1 =
- "{\"source\":{\"version\":\"1.5.4.Final\",\"connector\":\"mysql\",\"name\":\"mysql_binlog_source\",\"ts_ms\":1663924503565,\"snapshot\":\"false\",\"db\":\"test\",\"sequence\":null,\"table\":\"t1\",\"server_id\":1,\"gtid\":null,\"file\":\"binlog.000006\",\"pos\":13088,\"row\":0,\"thread\":null,\"query\":null},\"historyRecord\":\"{\\\"source\\\":{\\\"file\\\":\\\"binlog.000006\\\",\\\"pos\\\":13088,\\\"server_id\\\":1},\\\"position\\\":{\\\"transaction_id\\\":null,\\\"ts_sec\\\":1663924503,\\\"file\\\":\\\"binlog.000006\\\",\\\"pos\\\":13221,\\\"server_id\\\":1},\\\"databaseName\\\":\\\"test\\\",\\\"ddl\\\":\\\"alter table t1 add \\\\n c_1 varchar(30000)\\\",\\\"tableChanges\\\":[{\\\"type\\\":\\\"ALTER\\\",\\\"id\\\":\\\"\\\\\\\"test\\\\\\\".\\\\\\\"t1\\\\\\\"\\\",\\\"table\\\":{\\\"defaultCharsetName\\\":\\\"utf8mb4\\\",\\\"primaryKeyColumnNames\\\":[\\\"id\\\"],\\\"columns\\\":[{\\\"name\\\":\\\"id\\\",\\\"jdbcType\\\":4,\\\"typeName\\\":\\\"INT\\\",\\\"typeExpression\\\":\\\"INT\\\",\\\"charsetName\\\":null,\\\"position\\\":1,\\\"optional\\\":false,\\\"autoIncremented\\\":false,\\\"generated\\\":false},{\\\"name\\\":\\\"name\\\",\\\"jdbcType\\\":12,\\\"typeName\\\":\\\"VARCHAR\\\",\\\"typeExpression\\\":\\\"VARCHAR\\\",\\\"charsetName\\\":\\\"utf8mb4\\\",\\\"length\\\":128,\\\"position\\\":2,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false},{\\\"name\\\":\\\"dt\\\",\\\"jdbcType\\\":91,\\\"typeName\\\":\\\"DATE\\\",\\\"typeExpression\\\":\\\"DATE\\\",\\\"charsetName\\\":null,\\\"position\\\":3,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false},{\\\"name\\\":\\\"dtime\\\",\\\"jdbcType\\\":93,\\\"typeName\\\":\\\"DATETIME\\\",\\\"typeExpression\\\":\\\"DATETIME\\\",\\\"charsetName\\\":null,\\\"position\\\":4,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false},{\\\"name\\\":\\\"ts\\\",\\\"jdbcType\\\":2014,\\\"typeName\\\":\\\"TIMESTAMP\\\",\\\"typeExpression\\\":\\\"TIMESTAMP\\\",\\\"char
setName\\\":null,\\\"position\\\":5,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false},{\\\"name\\\":\\\"c_1\\\",\\\"jdbcType\\\":12,\\\"typeName\\\":\\\"VARCHAR\\\",\\\"typeExpression\\\":\\\"VARCHAR\\\",\\\"charsetName\\\":\\\"utf8mb4\\\",\\\"length\\\":200,\\\"position\\\":6,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false}]}}]}\"}";
- JsonNode recordRoot1 = objectMapper.readTree(record1);
- String ddl1 = serializer.extractDDL(recordRoot1);
- Assert.assertEquals(targetDDL, ddl1);
- }
-
- @Test
- public void testExtractDDLListMultipleColumns() throws IOException {
- String sql0 = "ALTER TABLE test.t1 ADD COLUMN id INT DEFAULT '10000'";
- String sql1 = "ALTER TABLE test.t1 ADD COLUMN c199 INT";
- String sql2 = "ALTER TABLE test.t1 ADD COLUMN c12 INT DEFAULT '100'";
- String sql3 = "ALTER TABLE test.t1 DROP COLUMN c13";
- List<String> srcSqlList = Arrays.asList(sql0, sql1, sql2, sql3);
-
- Map<String, FieldSchema> originFiledSchemaMap = new LinkedHashMap<>();
- originFiledSchemaMap.put("c2", new FieldSchema());
- originFiledSchemaMap.put("c555", new FieldSchema());
- originFiledSchemaMap.put("c666", new FieldSchema());
- originFiledSchemaMap.put("c4", new FieldSchema());
- originFiledSchemaMap.put("c13", new FieldSchema());
-
- String record =
- "{\"source\":{\"version\":\"1.9.7.Final\",\"connector\":\"mysql\",\"name\":\"mysql_binlog_source\",\"ts_ms\":1691033764674,\"snapshot\":\"false\",\"db\":\"test\",\"sequence\":null,\"table\":\"t1\",\"server_id\":1,\"gtid\":null,\"file\":\"binlog.000029\",\"pos\":23305,\"row\":0,\"thread\":null,\"query\":null},\"historyRecord\":\"{\\\"source\\\":{\\\"file\\\":\\\"binlog.000029\\\",\\\"pos\\\":23305,\\\"server_id\\\":1},\\\"position\\\":{\\\"transaction_id\\\":null,\\\"ts_sec\\\":1691033764,\\\"file\\\":\\\"binlog.000029\\\",\\\"pos\\\":23464,\\\"server_id\\\":1},\\\"databaseName\\\":\\\"test\\\",\\\"ddl\\\":\\\"alter table t1 drop c11, drop column c3, add c12 int default 100\\\",\\\"tableChanges\\\":[{\\\"type\\\":\\\"ALTER\\\",\\\"id\\\":\\\"\\\\\\\"test\\\\\\\".\\\\\\\"t1\\\\\\\"\\\",\\\"table\\\":{\\\"defaultCharsetName\\\":\\\"utf8mb4\\\",\\\"primaryKeyColumnNames\\\":[\\\"id\\\"],\\\"columns\\\":[{\\\"name\\\":\\\"id\\\",\\\"jdbcType\\\":4,\\\"typeName\\\":\\\"INT\\\",\\\"typeExpression\\\":\\\"INT\\\",\\\"charsetName\\\":null,\\\"position\\\":1,\\\"optional\\\":false,\\\"autoIncremented\\\":false,\\\"generated\\\":false,\\\"comment\\\":null,\\\"hasDefaultValue\\\":true,\\\"defaultValueExpression\\\":\\\"10000\\\",\\\"enumValues\\\":[]},{\\\"name\\\":\\\"c2\\\",\\\"jdbcType\\\":4,\\\"typeName\\\":\\\"INT\\\",\\\"typeExpression\\\":\\\"INT\\\",\\\"charsetName\\\":null,\\\"position\\\":2,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false,\\\"comment\\\":null,\\\"hasDefaultValue\\\":true,\\\"enumValues\\\":[]},{\\\"name\\\":\\\"c555\\\",\\\"jdbcType\\\":12,\\\"typeName\\\":\\\"VARCHAR\\\",\\\"typeExpression\\\":\\\"VARCHAR\\\",\\\"charsetName\\\":\\\"utf8mb4\\\",\\\"length\\\":100,\\\"position\\\":3,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false,\\\"comment\\\":null,\\\"hasDefaultValue\\\":true,\\\"enumValues\\\":[]},{\\\"name\\\":\\\"c666\\\",\\\"jdbcType\\\":4,\\\"typeName\\\":\\\"INT\\\",\\\"typeExpression\\
\":\\\"INT\\\",\\\"charsetName\\\":null,\\\"position\\\":4,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false,\\\"comment\\\":null,\\\"hasDefaultValue\\\":true,\\\"defaultValueExpression\\\":\\\"100\\\",\\\"enumValues\\\":[]},{\\\"name\\\":\\\"c4\\\",\\\"jdbcType\\\":-5,\\\"typeName\\\":\\\"BIGINT\\\",\\\"typeExpression\\\":\\\"BIGINT\\\",\\\"charsetName\\\":null,\\\"position\\\":5,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false,\\\"comment\\\":null,\\\"hasDefaultValue\\\":true,\\\"defaultValueExpression\\\":\\\"555\\\",\\\"enumValues\\\":[]},{\\\"name\\\":\\\"c199\\\",\\\"jdbcType\\\":4,\\\"typeName\\\":\\\"INT\\\",\\\"typeExpression\\\":\\\"INT\\\",\\\"charsetName\\\":null,\\\"position\\\":6,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false,\\\"comment\\\":null,\\\"hasDefaultValue\\\":true,\\\"enumValues\\\":[]},{\\\"name\\\":\\\"c12\\\",\\\"jdbcType\\\":4,\\\"typeName\\\":\\\"INT\\\",\\\"typeExpression\\\":\\\"INT\\\",\\\"charsetName\\\":null,\\\"position\\\":7,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false,\\\"comment\\\":null,\\\"hasDefaultValue\\\":true,\\\"defaultValueExpression\\\":\\\"100\\\",\\\"enumValues\\\":[]}]},\\\"comment\\\":null}]}\"}";
- JsonNode recordRoot = objectMapper.readTree(record);
- serializer.setOriginFieldSchemaMap(originFiledSchemaMap);
- List<String> ddlSQLList = serializer.extractDDLList(recordRoot);
- for (int i = 0; i < ddlSQLList.size(); i++) {
- String srcSQL = srcSqlList.get(i);
- String targetSQL = ddlSQLList.get(i);
- Assert.assertEquals(srcSQL, targetSQL);
- }
- }
-
- @Test
- public void testExtractDDLListCreateTable() throws IOException {
- String record =
- "{\"source\":{\"version\":\"1.9.7.Final\",\"connector\":\"oracle\",\"name\":\"oracle_logminer\",\"ts_ms\":1696945825065,\"snapshot\":\"true\",\"db\":\"HELOWIN\",\"sequence\":null,\"schema\":\"ADMIN\",\"table\":\"PERSONS\",\"txId\":null,\"scn\":\"1199617\",\"commit_scn\":null,\"lcr_position\":null,\"rs_id\":null,\"ssn\":0,\"redo_thread\":null},\"databaseName\":\"HELOWIN\",\"schemaName\":\"ADMIN\",\"ddl\":\"\\n CREATE TABLE \\\"ADMIN\\\".\\\"PERSONS\\\" \\n (\\t\\\"ID\\\" NUMBER(10,0), \\n\\t\\\"NAME4\\\" VARCHAR2(128) NOT NULL ENABLE, \\n\\t\\\"age4\\\" VARCHAR2(128), \\n\\t\\\"c100\\\" LONG, \\n\\t\\\"c55\\\" VARCHAR2(255), \\n\\t\\\"c77\\\" VARCHAR2(255), \\n\\t PRIMARY KEY (\\\"ID\\\") ENABLE\\n ) ;\\n \",\"tableChanges\":[{\"type\":\"CREATE\",\"id\":\"\\\"HELOWIN\\\".\\\"ADMIN\\\".\\\"PERSONS\\\"\",\"table\":{\"defaultCharsetName\":null,\"primaryKeyColumnNames\":[\"ID\"],\"columns\":[{\"name\":\"ID\",\"jdbcType\":2,\"nativeType\":null,\"typeName\":\"NUMBER\",\"typeExpression\":\"NUMBER\",\"charsetName\":null,\"length\":10,\"scale\":0,\"position\":1,\"optional\":false,\"autoIncremented\":false,\"generated\":false,\"comment\":null},{\"name\":\"NAME4\",\"jdbcType\":12,\"nativeType\":null,\"typeName\":\"VARCHAR2\",\"typeExpression\":\"VARCHAR2\",\"charsetName\":null,\"length\":128,\"scale\":null,\"position\":2,\"optional\":false,\"autoIncremented\":false,\"generated\":false,\"comment\":null},{\"name\":\"age4\",\"jdbcType\":12,\"nativeType\":null,\"typeName\":\"VARCHAR2\",\"typeExpression\":\"VARCHAR2\",\"charsetName\":null,\"length\":128,\"scale\":null,\"position\":3,\"optional\":true,\"autoIncremented\":false,\"generated\":false,\"comment\":null},{\"name\":\"c100\",\"jdbcType\":-1,\"nativeType\":null,\"typeName\":\"LONG\",\"typeExpression\":\"LONG\",\"charsetName\":null,\"length\":0,\"scale\":null,\"position\":4,\"optional\":true,\"autoIncremented\":false,\"generated\":false,\"comment\":null},{\"name\":\"c55\",\"jdbcType\":12,\"nativeType\":null,\"typeName\":\"VAR
CHAR2\",\"typeExpression\":\"VARCHAR2\",\"charsetName\":null,\"length\":255,\"scale\":null,\"position\":5,\"optional\":true,\"autoIncremented\":false,\"generated\":false,\"comment\":null},{\"name\":\"c77\",\"jdbcType\":12,\"nativeType\":null,\"typeName\":\"VARCHAR2\",\"typeExpression\":\"VARCHAR2\",\"charsetName\":null,\"length\":255,\"scale\":null,\"position\":6,\"optional\":true,\"autoIncremented\":false,\"generated\":false,\"comment\":null}],\"comment\":null}}]}";
- JsonNode recordRoot = objectMapper.readTree(record);
- serializer.setSourceConnector("oracle");
- List<String> ddlSQLList = serializer.extractDDLList(recordRoot);
- Assert.assertTrue(CollectionUtils.isEmpty(ddlSQLList));
- serializer.setSourceConnector("mysql");
- }
-
- @Test
- public void testExtractDDLListTruncateTable() throws IOException {
- String record =
- "{\"source\":{\"version\":\"1.9.7.Final\",\"connector\":\"mysql\",\"name\":\"mysql_binlog_source\",\"ts_ms\":1696944601264,\"snapshot\":\"false\",\"db\":\"test\",\"sequence\":null,\"table\":\"test_sink11\",\"server_id\":1,\"gtid\":null,\"file\":\"binlog.000043\",\"pos\":5719,\"row\":0,\"thread\":null,\"query\":null},\"historyRecord\":\"{\\\"source\\\":{\\\"file\\\":\\\"binlog.000043\\\",\\\"pos\\\":5719,\\\"server_id\\\":1},\\\"position\\\":{\\\"transaction_id\\\":null,\\\"ts_sec\\\":1696944601,\\\"file\\\":\\\"binlog.000043\\\",\\\"pos\\\":5824,\\\"server_id\\\":1},\\\"databaseName\\\":\\\"test\\\",\\\"ddl\\\":\\\"truncate table test_sink11\\\",\\\"tableChanges\\\":[]}\"}";
- JsonNode recordRoot = objectMapper.readTree(record);
- List<String> ddlSQLList = serializer.extractDDLList(recordRoot);
- Assert.assertTrue(CollectionUtils.isEmpty(ddlSQLList));
- }
-
- @Test
- public void testExtractDDLListDropTable() throws IOException {
- String record =
- "{\"source\":{\"version\":\"1.9.7.Final\",\"connector\":\"mysql\",\"name\":\"mysql_binlog_source\",\"ts_ms\":1696944747956,\"snapshot\":\"false\",\"db\":\"test\",\"sequence\":null,\"table\":\"test_sink11\",\"server_id\":1,\"gtid\":null,\"file\":\"binlog.000043\",\"pos\":5901,\"row\":0,\"thread\":null,\"query\":null},\"historyRecord\":\"{\\\"source\\\":{\\\"file\\\":\\\"binlog.000043\\\",\\\"pos\\\":5901,\\\"server_id\\\":1},\\\"position\\\":{\\\"transaction_id\\\":null,\\\"ts_sec\\\":1696944747,\\\"file\\\":\\\"binlog.000043\\\",\\\"pos\\\":6037,\\\"server_id\\\":1},\\\"databaseName\\\":\\\"test\\\",\\\"ddl\\\":\\\"DROP TABLE `test`.`test_sink11`\\\",\\\"tableChanges\\\":[]}\"}";
- JsonNode recordRoot = objectMapper.readTree(record);
- List<String> ddlSQLList = serializer.extractDDLList(recordRoot);
- Assert.assertTrue(CollectionUtils.isEmpty(ddlSQLList));
- }
-
- @Test
- public void testExtractDDLListChangeColumn() throws IOException {
- String record =
- "{\"source\":{\"version\":\"1.9.7.Final\",\"connector\":\"mysql\",\"name\":\"mysql_binlog_source\",\"ts_ms\":1696945030603,\"snapshot\":\"false\",\"db\":\"test\",\"sequence\":null,\"table\":\"test_sink\",\"server_id\":1,\"gtid\":null,\"file\":\"binlog.000043\",\"pos\":6521,\"row\":0,\"thread\":null,\"query\":null},\"historyRecord\":\"{\\\"source\\\":{\\\"file\\\":\\\"binlog.000043\\\",\\\"pos\\\":6521,\\\"server_id\\\":1},\\\"position\\\":{\\\"transaction_id\\\":null,\\\"ts_sec\\\":1696945030,\\\"file\\\":\\\"binlog.000043\\\",\\\"pos\\\":6661,\\\"server_id\\\":1},\\\"databaseName\\\":\\\"test\\\",\\\"ddl\\\":\\\"alter table test_sink change column c555 c777 bigint\\\",\\\"tableChanges\\\":[{\\\"type\\\":\\\"ALTER\\\",\\\"id\\\":\\\"\\\\\\\"test\\\\\\\".\\\\\\\"test_sink\\\\\\\"\\\",\\\"table\\\":{\\\"defaultCharsetName\\\":\\\"utf8mb4\\\",\\\"primaryKeyColumnNames\\\":[\\\"id\\\"],\\\"columns\\\":[{\\\"name\\\":\\\"id\\\",\\\"jdbcType\\\":4,\\\"typeName\\\":\\\"INT\\\",\\\"typeExpression\\\":\\\"INT\\\",\\\"charsetName\\\":null,\\\"position\\\":1,\\\"optional\\\":false,\\\"autoIncremented\\\":false,\\\"generated\\\":false,\\\"comment\\\":null,\\\"hasDefaultValue\\\":true,\\\"defaultValueExpression\\\":\\\"10000\\\",\\\"enumValues\\\":[]},{\\\"name\\\":\\\"c2\\\",\\\"jdbcType\\\":4,\\\"typeName\\\":\\\"INT\\\",\\\"typeExpression\\\":\\\"INT\\\",\\\"charsetName\\\":null,\\\"position\\\":2,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false,\\\"comment\\\":null,\\\"hasDefaultValue\\\":true,\\\"enumValues\\\":[]},{\\\"name\\\":\\\"c777\\\",\\\"jdbcType\\\":-5,\\\"typeName\\\":\\\"BIGINT\\\",\\\"typeExpression\\\":\\\"BIGINT\\\",\\\"charsetName\\\":null,\\\"length\\\":100,\\\"position\\\":3,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false,\\\"comment\\\":null,\\\"hasDefaultValue\\\":true,\\\"enumValues\\\":[]}]},\\\"comment\\\":null}]}\"}";
- JsonNode recordRoot = objectMapper.readTree(record);
- List<String> ddlSQLList = serializer.extractDDLList(recordRoot);
- Assert.assertTrue(CollectionUtils.isEmpty(ddlSQLList));
- }
-
- @Test
- public void testExtractDDLListModifyColumn() throws IOException {
- String record =
- "{\"source\":{\"version\":\"1.9.7.Final\",\"connector\":\"mysql\",\"name\":\"mysql_binlog_source\",\"ts_ms\":1696945306941,\"snapshot\":\"false\",\"db\":\"test\",\"sequence\":null,\"table\":\"test_sink\",\"server_id\":1,\"gtid\":null,\"file\":\"binlog.000043\",\"pos\":6738,\"row\":0,\"thread\":null,\"query\":null},\"historyRecord\":\"{\\\"source\\\":{\\\"file\\\":\\\"binlog.000043\\\",\\\"pos\\\":6738,\\\"server_id\\\":1},\\\"position\\\":{\\\"transaction_id\\\":null,\\\"ts_sec\\\":1696945306,\\\"file\\\":\\\"binlog.000043\\\",\\\"pos\\\":6884,\\\"server_id\\\":1},\\\"databaseName\\\":\\\"test\\\",\\\"ddl\\\":\\\"alter table test_sink modify column c777 tinyint default 7\\\",\\\"tableChanges\\\":[{\\\"type\\\":\\\"ALTER\\\",\\\"id\\\":\\\"\\\\\\\"test\\\\\\\".\\\\\\\"test_sink\\\\\\\"\\\",\\\"table\\\":{\\\"defaultCharsetName\\\":\\\"utf8mb4\\\",\\\"primaryKeyColumnNames\\\":[\\\"id\\\"],\\\"columns\\\":[{\\\"name\\\":\\\"id\\\",\\\"jdbcType\\\":4,\\\"typeName\\\":\\\"INT\\\",\\\"typeExpression\\\":\\\"INT\\\",\\\"charsetName\\\":null,\\\"position\\\":1,\\\"optional\\\":false,\\\"autoIncremented\\\":false,\\\"generated\\\":false,\\\"comment\\\":null,\\\"hasDefaultValue\\\":true,\\\"defaultValueExpression\\\":\\\"10000\\\",\\\"enumValues\\\":[]},{\\\"name\\\":\\\"c2\\\",\\\"jdbcType\\\":4,\\\"typeName\\\":\\\"INT\\\",\\\"typeExpression\\\":\\\"INT\\\",\\\"charsetName\\\":null,\\\"position\\\":2,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false,\\\"comment\\\":null,\\\"hasDefaultValue\\\":true,\\\"enumValues\\\":[]},{\\\"name\\\":\\\"c777\\\",\\\"jdbcType\\\":5,\\\"typeName\\\":\\\"TINYINT\\\",\\\"typeExpression\\\":\\\"TINYINT\\\",\\\"charsetName\\\":null,\\\"length\\\":100,\\\"position\\\":3,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false,\\\"comment\\\":null,\\\"hasDefaultValue\\\":true,\\\"defaultValueExpression\\\":\\\"7\\\",\\\"enumValues\\\":[]}]},\\\"comment\\\":null}]}\"}";
- JsonNode recordRoot = objectMapper.readTree(record);
- List<String> ddlSQLList = serializer.extractDDLList(recordRoot);
- Assert.assertTrue(CollectionUtils.isEmpty(ddlSQLList));
- }
-
- @Test
- public void testExtractDDLListRenameColumn() throws IOException {
- String record =
- "{\"source\":{\"version\":\"1.9.7.Final\",\"connector\":\"mysql\",\"name\":\"mysql_binlog_source\",\"ts_ms\":1691034519226,\"snapshot\":\"false\",\"db\":\"test\",\"sequence\":null,\"table\":\"t1\",\"server_id\":1,\"gtid\":null,\"file\":\"binlog.000029\",\"pos\":23752,\"row\":0,\"thread\":null,\"query\":null},\"historyRecord\":\"{\\\"source\\\":{\\\"file\\\":\\\"binlog.000029\\\",\\\"pos\\\":23752,\\\"server_id\\\":1},\\\"position\\\":{\\\"transaction_id\\\":null,\\\"ts_sec\\\":1691034519,\\\"file\\\":\\\"binlog.000029\\\",\\\"pos\\\":23886,\\\"server_id\\\":1},\\\"databaseName\\\":\\\"test\\\",\\\"ddl\\\":\\\"alter table t1 rename column c22 to c33\\\",\\\"tableChanges\\\":[{\\\"type\\\":\\\"ALTER\\\",\\\"id\\\":\\\"\\\\\\\"test\\\\\\\".\\\\\\\"t1\\\\\\\"\\\",\\\"table\\\":{\\\"defaultCharsetName\\\":\\\"utf8mb4\\\",\\\"primaryKeyColumnNames\\\":[\\\"id\\\"],\\\"columns\\\":[{\\\"name\\\":\\\"id\\\",\\\"jdbcType\\\":4,\\\"typeName\\\":\\\"INT\\\",\\\"typeExpression\\\":\\\"INT\\\",\\\"charsetName\\\":null,\\\"position\\\":1,\\\"optional\\\":false,\\\"autoIncremented\\\":false,\\\"generated\\\":false,\\\"comment\\\":null,\\\"hasDefaultValue\\\":true,\\\"defaultValueExpression\\\":\\\"10000\\\",\\\"enumValues\\\":[]},{\\\"name\\\":\\\"c2\\\",\\\"jdbcType\\\":4,\\\"typeName\\\":\\\"INT\\\",\\\"typeExpression\\\":\\\"INT\\\",\\\"charsetName\\\":null,\\\"position\\\":2,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false,\\\"comment\\\":null,\\\"hasDefaultValue\\\":true,\\\"enumValues\\\":[]},{\\\"name\\\":\\\"c555\\\",\\\"jdbcType\\\":12,\\\"typeName\\\":\\\"VARCHAR\\\",\\\"typeExpression\\\":\\\"VARCHAR\\\",\\\"charsetName\\\":\\\"utf8mb4\\\",\\\"length\\\":100,\\\"position\\\":3,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false,\\\"comment\\\":null,\\\"hasDefaultValue\\\":true,\\\"enumValues\\\":[]},{\\\"name\\\":\\\"c666\\\",\\\"jdbcType\\\":4,\\\"typeName\\\":\\\"INT\\\",\\\"typeExpression\\\":\\\"INT\\\",\\\"charse
tName\\\":null,\\\"position\\\":4,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false,\\\"comment\\\":null,\\\"hasDefaultValue\\\":true,\\\"defaultValueExpression\\\":\\\"100\\\",\\\"enumValues\\\":[]},{\\\"name\\\":\\\"c4\\\",\\\"jdbcType\\\":-5,\\\"typeName\\\":\\\"BIGINT\\\",\\\"typeExpression\\\":\\\"BIGINT\\\",\\\"charsetName\\\":null,\\\"position\\\":5,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false,\\\"comment\\\":null,\\\"hasDefaultValue\\\":true,\\\"defaultValueExpression\\\":\\\"555\\\",\\\"enumValues\\\":[]},{\\\"name\\\":\\\"c199\\\",\\\"jdbcType\\\":4,\\\"typeName\\\":\\\"INT\\\",\\\"typeExpression\\\":\\\"INT\\\",\\\"charsetName\\\":null,\\\"position\\\":6,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false,\\\"comment\\\":null,\\\"hasDefaultValue\\\":true,\\\"enumValues\\\":[]},{\\\"name\\\":\\\"c33\\\",\\\"jdbcType\\\":4,\\\"typeName\\\":\\\"INT\\\",\\\"typeExpression\\\":\\\"INT\\\",\\\"charsetName\\\":null,\\\"position\\\":7,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false,\\\"comment\\\":null,\\\"hasDefaultValue\\\":true,\\\"defaultValueExpression\\\":\\\"100\\\",\\\"enumValues\\\":[]}]},\\\"comment\\\":null}]}\"}";
- JsonNode recordRoot = objectMapper.readTree(record);
- List<String> ddlSQLList = serializer.extractDDLList(recordRoot);
- Assert.assertTrue(CollectionUtils.isEmpty(ddlSQLList));
- }
-
- @Test
- public void testFillOriginSchema() throws IOException {
- Map<String, FieldSchema> srcFiledSchemaMap = new LinkedHashMap<>();
- srcFiledSchemaMap.put("id", new FieldSchema("id", "INT", null, null));
- srcFiledSchemaMap.put("name", new FieldSchema("name", "VARCHAR(150)", null, null));
- srcFiledSchemaMap.put(
- "test_time", new FieldSchema("test_time", "DATETIMEV2(0)", null, null));
- srcFiledSchemaMap.put("c1", new FieldSchema("c1", "INT", "'100'", null));
-
- serializer.setSourceConnector("mysql");
- String columnsString =
- "[{\"name\":\"id\",\"jdbcType\":4,\"typeName\":\"INT\",\"typeExpression\":\"INT\",\"charsetName\":null,\"position\":1,\"optional\":false,\"autoIncremented\":false,\"generated\":false,\"comment\":null,\"hasDefaultValue\":false,\"enumValues\":[]},{\"name\":\"name\",\"jdbcType\":12,\"typeName\":\"VARCHAR\",\"typeExpression\":\"VARCHAR\",\"charsetName\":\"utf8mb4\",\"length\":50,\"position\":2,\"optional\":true,\"autoIncremented\":false,\"generated\":false,\"comment\":null,\"hasDefaultValue\":true,\"enumValues\":[]},{\"name\":\"test_time\",\"jdbcType\":93,\"typeName\":\"DATETIME\",\"typeExpression\":\"DATETIME\",\"charsetName\":null,\"position\":3,\"optional\":true,\"autoIncremented\":false,\"generated\":false,\"comment\":null,\"hasDefaultValue\":true,\"enumValues\":[]},{\"name\":\"c1\",\"jdbcType\":4,\"typeName\":\"INT\",\"typeExpression\":\"INT\",\"charsetName\":null,\"position\":4,\"optional\":true,\"autoIncremented\":false,\"generated\":false,\"comment\":null,\"hasDefaultValue\":true,\"defaultValueExpression\":\"100\",\"enumValues\":[]},{\"name\":\"cc\",\"jdbcType\":4,\"typeName\":\"INT\",\"typeExpression\":\"INT\",\"charsetName\":null,\"position\":5,\"optional\":true,\"autoIncremented\":false,\"generated\":false,\"comment\":null,\"hasDefaultValue\":true,\"defaultValueExpression\":\"100\",\"enumValues\":[]}]";
- JsonNode columns = objectMapper.readTree(columnsString);
- serializer.fillOriginSchema(columns);
- Map<String, FieldSchema> originFieldSchemaMap = serializer.getOriginFieldSchemaMap();
-
- Iterator<Entry<String, FieldSchema>> originFieldSchemaIterator =
- originFieldSchemaMap.entrySet().iterator();
- for (Entry<String, FieldSchema> entry : srcFiledSchemaMap.entrySet()) {
- FieldSchema srcFiledSchema = entry.getValue();
- Entry<String, FieldSchema> originField = originFieldSchemaIterator.next();
-
- Assert.assertEquals(entry.getKey(), originField.getKey());
- Assert.assertEquals(srcFiledSchema.getName(), originField.getValue().getName());
- Assert.assertEquals(
- srcFiledSchema.getTypeString(), originField.getValue().getTypeString());
- Assert.assertEquals(
- srcFiledSchema.getDefaultValue(), originField.getValue().getDefaultValue());
- Assert.assertEquals(srcFiledSchema.getComment(), originField.getValue().getComment());
- }
- }
-
- @Test
- public void testBuildMysql2DorisTypeName() throws IOException {
- String columnInfo =
- "{\"name\":\"id\",\"jdbcType\":4,\"typeName\":\"INT\",\"typeExpression\":\"INT\",\"charsetName\":null,\"position\":1,\"optional\":false,\"autoIncremented\":false,\"generated\":false,\"comment\":null,\"hasDefaultValue\":true,\"defaultValueExpression\":\"10\",\"enumValues\":[]}";
- serializer.setSourceConnector("mysql");
- JsonNode columns = objectMapper.readTree(columnInfo);
- String dorisTypeName = serializer.buildDorisTypeName(columns);
- Assert.assertEquals(dorisTypeName, "INT");
- }
-
- @Test
- public void testBuildOracle2DorisTypeName() throws IOException {
- String columnInfo =
- "{\"name\":\"NAME\",\"jdbcType\":12,\"nativeType\":null,\"typeName\":\"VARCHAR2\",\"typeExpression\":\"VARCHAR2\",\"charsetName\":null,\"length\":128,\"scale\":null,\"position\":2,\"optional\":false,\"autoIncremented\":false,\"generated\":false,\"comment\":null}";
- serializer.setSourceConnector("oracle");
- JsonNode columns = objectMapper.readTree(columnInfo);
- String dorisTypeName = serializer.buildDorisTypeName(columns);
- Assert.assertEquals(dorisTypeName, "VARCHAR(384)");
- }
-
- @Test
- public void testExtractDDLListRename() throws IOException {
- String columnInfo =
- "{\"source\":{\"version\":\"1.9.7.Final\",\"connector\":\"mysql\",\"name\":\"mysql_binlog_source\",\"ts_ms\":1698314781975,\"snapshot\":\"false\",\"db\":\"test\",\"sequence\":null,\"table\":\"t1\",\"server_id\":1,\"gtid\":null,\"file\":\"binlog.000046\",\"pos\":5197,\"row\":0,\"thread\":null,\"query\":null},\"historyRecord\":\"{\\\"source\\\":{\\\"file\\\":\\\"binlog.000046\\\",\\\"pos\\\":5197,\\\"server_id\\\":1},\\\"position\\\":{\\\"transaction_id\\\":null,\\\"ts_sec\\\":1698314781,\\\"file\\\":\\\"binlog.000046\\\",\\\"pos\\\":5331,\\\"server_id\\\":1},\\\"databaseName\\\":\\\"test\\\",\\\"ddl\\\":\\\"alter table t1 rename column c3 to c333\\\",\\\"tableChanges\\\":[{\\\"type\\\":\\\"ALTER\\\",\\\"id\\\":\\\"\\\\\\\"test\\\\\\\".\\\\\\\"t1\\\\\\\"\\\",\\\"table\\\":{\\\"defaultCharsetName\\\":\\\"utf8mb4\\\",\\\"primaryKeyColumnNames\\\":[\\\"id\\\"],\\\"columns\\\":[{\\\"name\\\":\\\"id\\\",\\\"jdbcType\\\":4,\\\"typeName\\\":\\\"INT\\\",\\\"typeExpression\\\":\\\"INT\\\",\\\"charsetName\\\":null,\\\"position\\\":1,\\\"optional\\\":false,\\\"autoIncremented\\\":false,\\\"generated\\\":false,\\\"comment\\\":null,\\\"hasDefaultValue\\\":true,\\\"defaultValueExpression\\\":\\\"10000\\\",\\\"enumValues\\\":[]},{\\\"name\\\":\\\"c2\\\",\\\"jdbcType\\\":4,\\\"typeName\\\":\\\"INT\\\",\\\"typeExpression\\\":\\\"INT\\\",\\\"charsetName\\\":null,\\\"position\\\":2,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false,\\\"comment\\\":null,\\\"hasDefaultValue\\\":true,\\\"enumValues\\\":[]},{\\\"name\\\":\\\"c333\\\",\\\"jdbcType\\\":12,\\\"typeName\\\":\\\"VARCHAR\\\",\\\"typeExpression\\\":\\\"VARCHAR\\\",\\\"charsetName\\\":\\\"utf8mb4\\\",\\\"length\\\":10,\\\"position\\\":3,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false,\\\"comment\\\":null,\\\"hasDefaultValue\\\":true,\\\"enumValues\\\":[]}]},\\\"comment\\\":null}]}\"}";
- Map<String, FieldSchema> originFieldSchemaMap = Maps.newHashMap();
- JsonNode record = objectMapper.readTree(columnInfo);
-
- DorisOptions dorisOptions =
- DorisOptions.builder()
- .setFenodes("127.0.0.1:8030")
- .setTableIdentifier("test.t1")
- .setUsername("root")
- .setPassword("")
- .build();
- JsonDebeziumSchemaSerializer serializer =
- JsonDebeziumSchemaSerializer.builder().setDorisOptions(dorisOptions).build();
- serializer.setSourceConnector("mysql");
-
- originFieldSchemaMap.put("id", new FieldSchema("id", "INT", "", ""));
- originFieldSchemaMap.put("c2", new FieldSchema("c2", "INT", "", ""));
- originFieldSchemaMap.put("c3", new FieldSchema("c3", "VARCHAR(30)", "", ""));
- serializer.setOriginFieldSchemaMap(originFieldSchemaMap);
-
- List<String> ddlList = serializer.extractDDLList(record);
- Assert.assertEquals("ALTER TABLE test.t1 RENAME COLUMN c3 c333", ddlList.get(0));
- }
-
- @Ignore
- @Test
- public void testSerializeAddColumn() throws IOException, DorisException {
- // alter table t1 add column c_1 varchar(200)
- String record =
- "{\"source\":{\"version\":\"1.5.4.Final\",\"connector\":\"mysql\",\"name\":\"mysql_binlog_source\",\"ts_ms\":1663924503565,\"snapshot\":\"false\",\"db\":\"test\",\"sequence\":null,\"table\":\"t1\",\"server_id\":1,\"gtid\":null,\"file\":\"binlog.000006\",\"pos\":13088,\"row\":0,\"thread\":null,\"query\":null},\"historyRecord\":\"{\\\"source\\\":{\\\"file\\\":\\\"binlog.000006\\\",\\\"pos\\\":13088,\\\"server_id\\\":1},\\\"position\\\":{\\\"transaction_id\\\":null,\\\"ts_sec\\\":1663924503,\\\"file\\\":\\\"binlog.000006\\\",\\\"pos\\\":13221,\\\"server_id\\\":1},\\\"databaseName\\\":\\\"test\\\",\\\"ddl\\\":\\\"alter table t1 add \\\\n column c_1 varchar(200)\\\",\\\"tableChanges\\\":[{\\\"type\\\":\\\"ALTER\\\",\\\"id\\\":\\\"\\\\\\\"test\\\\\\\".\\\\\\\"t1\\\\\\\"\\\",\\\"table\\\":{\\\"defaultCharsetName\\\":\\\"utf8mb4\\\",\\\"primaryKeyColumnNames\\\":[\\\"id\\\"],\\\"columns\\\":[{\\\"name\\\":\\\"id\\\",\\\"jdbcType\\\":4,\\\"typeName\\\":\\\"INT\\\",\\\"typeExpression\\\":\\\"INT\\\",\\\"charsetName\\\":null,\\\"position\\\":1,\\\"optional\\\":false,\\\"autoIncremented\\\":false,\\\"generated\\\":false},{\\\"name\\\":\\\"name\\\",\\\"jdbcType\\\":12,\\\"typeName\\\":\\\"VARCHAR\\\",\\\"typeExpression\\\":\\\"VARCHAR\\\",\\\"charsetName\\\":\\\"utf8mb4\\\",\\\"length\\\":128,\\\"position\\\":2,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false},{\\\"name\\\":\\\"dt\\\",\\\"jdbcType\\\":91,\\\"typeName\\\":\\\"DATE\\\",\\\"typeExpression\\\":\\\"DATE\\\",\\\"charsetName\\\":null,\\\"position\\\":3,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false},{\\\"name\\\":\\\"dtime\\\",\\\"jdbcType\\\":93,\\\"typeName\\\":\\\"DATETIME\\\",\\\"typeExpression\\\":\\\"DATETIME\\\",\\\"charsetName\\\":null,\\\"position\\\":4,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false},{\\\"name\\\":\\\"ts\\\",\\\"jdbcType\\\":2014,\\\"typeName\\\":\\\"TIMESTAMP\\\",\\\"typeExpression\\\":\\\"TIMESTAMP\\\",\\\
"charsetName\\\":null,\\\"position\\\":5,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false},{\\\"name\\\":\\\"c_1\\\",\\\"jdbcType\\\":12,\\\"typeName\\\":\\\"VARCHAR\\\",\\\"typeExpression\\\":\\\"VARCHAR\\\",\\\"charsetName\\\":\\\"utf8mb4\\\",\\\"length\\\":200,\\\"position\\\":6,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false}]}}]}\"}";
- JsonNode recordRoot = objectMapper.readTree(record);
- boolean flag = serializer.schemaChange(recordRoot);
- Assert.assertEquals(true, flag);
-
- Field targetField = getField("c_1");
- Assert.assertNotNull(targetField);
- Assert.assertEquals("c_1", targetField.getName());
- Assert.assertEquals("VARCHAR", targetField.getType());
- }
-
- @Ignore
- @Test
- public void testSerializeDropColumn() throws IOException, DorisException {
- // alter table t1 drop column c_1;
- String ddl =
- "{\"source\":{\"version\":\"1.5.4.Final\",\"connector\":\"mysql\",\"name\":\"mysql_binlog_source\",\"ts_ms\":1663925897321,\"snapshot\":\"false\",\"db\":\"test\",\"sequence\":null,\"table\":\"t1\",\"server_id\":1,\"gtid\":null,\"file\":\"binlog.000006\",\"pos\":13298,\"row\":0,\"thread\":null,\"query\":null},\"historyRecord\":\"{\\\"source\\\":{\\\"file\\\":\\\"binlog.000006\\\",\\\"pos\\\":13298,\\\"server_id\\\":1},\\\"position\\\":{\\\"transaction_id\\\":null,\\\"ts_sec\\\":1663925897,\\\"file\\\":\\\"binlog.000006\\\",\\\"pos\\\":13422,\\\"server_id\\\":1},\\\"databaseName\\\":\\\"test\\\",\\\"ddl\\\":\\\"alter table t1 drop \\\\n column c_1\\\",\\\"tableChanges\\\":[{\\\"type\\\":\\\"ALTER\\\",\\\"id\\\":\\\"\\\\\\\"test\\\\\\\".\\\\\\\"t1\\\\\\\"\\\",\\\"table\\\":{\\\"defaultCharsetName\\\":\\\"utf8mb4\\\",\\\"primaryKeyColumnNames\\\":[\\\"id\\\"],\\\"columns\\\":[{\\\"name\\\":\\\"id\\\",\\\"jdbcType\\\":4,\\\"typeName\\\":\\\"INT\\\",\\\"typeExpression\\\":\\\"INT\\\",\\\"charsetName\\\":null,\\\"position\\\":1,\\\"optional\\\":false,\\\"autoIncremented\\\":false,\\\"generated\\\":false},{\\\"name\\\":\\\"name\\\",\\\"jdbcType\\\":12,\\\"typeName\\\":\\\"VARCHAR\\\",\\\"typeExpression\\\":\\\"VARCHAR\\\",\\\"charsetName\\\":\\\"utf8mb4\\\",\\\"length\\\":128,\\\"position\\\":2,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false},{\\\"name\\\":\\\"dt\\\",\\\"jdbcType\\\":91,\\\"typeName\\\":\\\"DATE\\\",\\\"typeExpression\\\":\\\"DATE\\\",\\\"charsetName\\\":null,\\\"position\\\":3,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false},{\\\"name\\\":\\\"dtime\\\",\\\"jdbcType\\\":93,\\\"typeName\\\":\\\"DATETIME\\\",\\\"typeExpression\\\":\\\"DATETIME\\\",\\\"charsetName\\\":null,\\\"position\\\":4,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false},{\\\"name\\\":\\\"ts\\\",\\\"jdbcType\\\":2014,\\\"typeName\\\":\\\"TIMESTAMP\\\",\\\"typeExpression\\\":\\\"TIMESTAMP\\\",\\\"charsetName
\\\":null,\\\"position\\\":5,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false}]}}]}\"}";
- JsonNode recordRoot = objectMapper.readTree(ddl);
- boolean flag = serializer.schemaChange(recordRoot);
- Assert.assertEquals(true, flag);
-
- Field targetField = getField("c_1");
- Assert.assertNull(targetField);
- }
-
- private static Field getField(String column) throws DorisException {
- // get table schema
- Schema schema =
- RestService.getSchema(dorisOptions, DorisReadOptions.builder().build(), LOG);
- List<Field> properties = schema.getProperties();
- Field targetField = null;
- for (Field field : properties) {
- if (column.equals(field.getName())) {
- targetField = field;
- break;
- }
- }
- return targetField;
- }
-
- @Test
- public void testGetCdcTableIdentifier() throws Exception {
- String insert =
- "{\"before\":{\"id\":1,\"name\":\"doris-update\",\"dt\":\"2022-01-01\",\"dtime\":\"2022-01-01 10:01:02\",\"ts\":\"2022-01-01 10:01:03\"},\"after\":null,\"source\":{\"version\":\"1.5.4.Final\",\"connector\":\"mysql\",\"name\":\"mysql_binlog_source\",\"ts_ms\":1663924328000,\"snapshot\":\"false\",\"db\":\"test\",\"sequence\":null,\"table\":\"t1\",\"server_id\":1,\"gtid\":null,\"file\":\"binlog.000006\",\"pos\":12500,\"row\":0,\"thread\":null,\"query\":null},\"op\":\"d\",\"ts_ms\":1663924328869,\"transaction\":null}";
- JsonNode recordRoot = objectMapper.readTree(insert);
- String identifier = serializer.getCdcTableIdentifier(recordRoot);
- Assert.assertEquals("test.t1", identifier);
-
- String insertSchema =
- "{\"before\":{\"id\":1,\"name\":\"doris-update\",\"dt\":\"2022-01-01\",\"dtime\":\"2022-01-01 10:01:02\",\"ts\":\"2022-01-01 10:01:03\"},\"after\":null,\"source\":{\"version\":\"1.5.4.Final\",\"connector\":\"mysql\",\"name\":\"mysql_binlog_source\",\"ts_ms\":1663924328000,\"snapshot\":\"false\",\"db\":\"test\",\"schema\":\"dbo\",\"table\":\"t1\",\"server_id\":1,\"gtid\":null,\"file\":\"binlog.000006\",\"pos\":12500,\"row\":0,\"thread\":null,\"query\":null},\"op\":\"d\",\"ts_ms\":1663924328869,\"transaction\":null}";
- String identifierSchema =
- serializer.getCdcTableIdentifier(objectMapper.readTree(insertSchema));
- Assert.assertEquals("test.dbo.t1", identifierSchema);
-
- String ddl =
- "{\"source\":{\"version\":\"1.5.4.Final\",\"connector\":\"mysql\",\"name\":\"mysql_binlog_source\",\"ts_ms\":1663924503565,\"snapshot\":\"false\",\"db\":\"test\",\"sequence\":null,\"table\":\"t1\",\"server_id\":1,\"gtid\":null,\"file\":\"binlog.000006\",\"pos\":13088,\"row\":0,\"thread\":null,\"query\":null},\"historyRecord\":\"{\\\"source\\\":{\\\"file\\\":\\\"binlog.000006\\\",\\\"pos\\\":13088,\\\"server_id\\\":1},\\\"position\\\":{\\\"transaction_id\\\":null,\\\"ts_sec\\\":1663924503,\\\"file\\\":\\\"binlog.000006\\\",\\\"pos\\\":13221,\\\"server_id\\\":1},\\\"databaseName\\\":\\\"test\\\",\\\"ddl\\\":\\\"alter table t1 add \\\\n c_1 varchar(200)\\\",\\\"tableChanges\\\":[{\\\"type\\\":\\\"ALTER\\\",\\\"id\\\":\\\"\\\\\\\"test\\\\\\\".\\\\\\\"t1\\\\\\\"\\\",\\\"table\\\":{\\\"defaultCharsetName\\\":\\\"utf8mb4\\\",\\\"primaryKeyColumnNames\\\":[\\\"id\\\"],\\\"columns\\\":[{\\\"name\\\":\\\"id\\\",\\\"jdbcType\\\":4,\\\"typeName\\\":\\\"INT\\\",\\\"typeExpression\\\":\\\"INT\\\",\\\"charsetName\\\":null,\\\"position\\\":1,\\\"optional\\\":false,\\\"autoIncremented\\\":false,\\\"generated\\\":false},{\\\"name\\\":\\\"name\\\",\\\"jdbcType\\\":12,\\\"typeName\\\":\\\"VARCHAR\\\",\\\"typeExpression\\\":\\\"VARCHAR\\\",\\\"charsetName\\\":\\\"utf8mb4\\\",\\\"length\\\":128,\\\"position\\\":2,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false},{\\\"name\\\":\\\"dt\\\",\\\"jdbcType\\\":91,\\\"typeName\\\":\\\"DATE\\\",\\\"typeExpression\\\":\\\"DATE\\\",\\\"charsetName\\\":null,\\\"position\\\":3,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false},{\\\"name\\\":\\\"dtime\\\",\\\"jdbcType\\\":93,\\\"typeName\\\":\\\"DATETIME\\\",\\\"typeExpression\\\":\\\"DATETIME\\\",\\\"charsetName\\\":null,\\\"position\\\":4,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false},{\\\"name\\\":\\\"ts\\\",\\\"jdbcType\\\":2014,\\\"typeName\\\":\\\"TIMESTAMP\\\",\\\"typeExpression\\\":\\\"TIMESTAMP\\\",\\\"charse
tName\\\":null,\\\"position\\\":5,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false},{\\\"name\\\":\\\"c_1\\\",\\\"jdbcType\\\":12,\\\"typeName\\\":\\\"VARCHAR\\\",\\\"typeExpression\\\":\\\"VARCHAR\\\",\\\"charsetName\\\":\\\"utf8mb4\\\",\\\"length\\\":200,\\\"position\\\":6,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false}]}}]}\"}";
- String ddlRes = serializer.getCdcTableIdentifier(objectMapper.readTree(ddl));
- Assert.assertEquals("test.t1", ddlRes);
- }
-
- @Test
- public void testGetDorisTableIdentifier() throws Exception {
- Map<String, String> map = new HashMap<>();
- map.put("test.dbo.t1", "test.t1");
- serializer.setTableMapping(map);
- String identifier = serializer.getDorisTableIdentifier("test.dbo.t1");
- Assert.assertEquals("test.t1", identifier);
-
- identifier = serializer.getDorisTableIdentifier("test.t1");
- Assert.assertEquals("test.t1", identifier);
-
- String tmp = dorisOptions.getTableIdentifier();
- dorisOptions.setTableIdentifier(null);
- identifier = serializer.getDorisTableIdentifier("test.t1");
- Assert.assertNull(identifier);
- dorisOptions.setTableIdentifier(tmp);
- }
-
- @Test
- public void testSchemaChangeMultiTable() throws Exception {
- Map<String, String> map = new HashMap<>();
- map.put("mysql.t1", "doris.t1");
- map.put("mysql.t2", "doris.t2");
- serializer.setTableMapping(map);
- String tmp = dorisOptions.getTableIdentifier();
- dorisOptions.setTableIdentifier(null);
- String ddl1 =
- "{\"source\":{\"version\":\"1.5.4.Final\",\"connector\":\"mysql\",\"name\":\"mysql_binlog_source\",\"ts_ms\":1663924503565,\"snapshot\":\"false\",\"db\":\"mysql\",\"sequence\":null,\"table\":\"t1\",\"server_id\":1,\"gtid\":null,\"file\":\"binlog.000006\",\"pos\":13088,\"row\":0,\"thread\":null,\"query\":null},\"historyRecord\":\"{\\\"source\\\":{\\\"file\\\":\\\"binlog.000006\\\",\\\"pos\\\":13088,\\\"server_id\\\":1},\\\"position\\\":{\\\"transaction_id\\\":null,\\\"ts_sec\\\":1663924503,\\\"file\\\":\\\"binlog.000006\\\",\\\"pos\\\":13221,\\\"server_id\\\":1},\\\"databaseName\\\":\\\"test\\\",\\\"ddl\\\":\\\"alter table t1 add \\\\n c_1 varchar(200)\\\",\\\"tableChanges\\\":[{\\\"type\\\":\\\"ALTER\\\",\\\"id\\\":\\\"\\\\\\\"test\\\\\\\".\\\\\\\"t1\\\\\\\"\\\",\\\"table\\\":{\\\"defaultCharsetName\\\":\\\"utf8mb4\\\",\\\"primaryKeyColumnNames\\\":[\\\"id\\\"],\\\"columns\\\":[{\\\"name\\\":\\\"id\\\",\\\"jdbcType\\\":4,\\\"typeName\\\":\\\"INT\\\",\\\"typeExpression\\\":\\\"INT\\\",\\\"charsetName\\\":null,\\\"position\\\":1,\\\"optional\\\":false,\\\"autoIncremented\\\":false,\\\"generated\\\":false},{\\\"name\\\":\\\"name\\\",\\\"jdbcType\\\":12,\\\"typeName\\\":\\\"VARCHAR\\\",\\\"typeExpression\\\":\\\"VARCHAR\\\",\\\"charsetName\\\":\\\"utf8mb4\\\",\\\"length\\\":128,\\\"position\\\":2,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false},{\\\"name\\\":\\\"dt\\\",\\\"jdbcType\\\":91,\\\"typeName\\\":\\\"DATE\\\",\\\"typeExpression\\\":\\\"DATE\\\",\\\"charsetName\\\":null,\\\"position\\\":3,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false},{\\\"name\\\":\\\"dtime\\\",\\\"jdbcType\\\":93,\\\"typeName\\\":\\\"DATETIME\\\",\\\"typeExpression\\\":\\\"DATETIME\\\",\\\"charsetName\\\":null,\\\"position\\\":4,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false},{\\\"name\\\":\\\"ts\\\",\\\"jdbcType\\\":2014,\\\"typeName\\\":\\\"TIMESTAMP\\\",\\\"typeExpression\\\":\\\"TIMESTAMP\\\",\\\"chars
etName\\\":null,\\\"position\\\":5,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false},{\\\"name\\\":\\\"c_1\\\",\\\"jdbcType\\\":12,\\\"typeName\\\":\\\"VARCHAR\\\",\\\"typeExpression\\\":\\\"VARCHAR\\\",\\\"charsetName\\\":\\\"utf8mb4\\\",\\\"length\\\":200,\\\"position\\\":6,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false}]}}]}\"}";
- String ddl2 =
- "{\"source\":{\"version\":\"1.5.4.Final\",\"connector\":\"mysql\",\"name\":\"mysql_binlog_source\",\"ts_ms\":1663924503565,\"snapshot\":\"false\",\"db\":\"mysql\",\"sequence\":null,\"table\":\"t2\",\"server_id\":1,\"gtid\":null,\"file\":\"binlog.000006\",\"pos\":13088,\"row\":0,\"thread\":null,\"query\":null},\"historyRecord\":\"{\\\"source\\\":{\\\"file\\\":\\\"binlog.000006\\\",\\\"pos\\\":13088,\\\"server_id\\\":1},\\\"position\\\":{\\\"transaction_id\\\":null,\\\"ts_sec\\\":1663924503,\\\"file\\\":\\\"binlog.000006\\\",\\\"pos\\\":13221,\\\"server_id\\\":1},\\\"databaseName\\\":\\\"test\\\",\\\"ddl\\\":\\\"alter table t1 add \\\\n c_1 varchar(200)\\\",\\\"tableChanges\\\":[{\\\"type\\\":\\\"ALTER\\\",\\\"id\\\":\\\"\\\\\\\"test\\\\\\\".\\\\\\\"t1\\\\\\\"\\\",\\\"table\\\":{\\\"defaultCharsetName\\\":\\\"utf8mb4\\\",\\\"primaryKeyColumnNames\\\":[\\\"id\\\"],\\\"columns\\\":[{\\\"name\\\":\\\"id\\\",\\\"jdbcType\\\":4,\\\"typeName\\\":\\\"INT\\\",\\\"typeExpression\\\":\\\"INT\\\",\\\"charsetName\\\":null,\\\"position\\\":1,\\\"optional\\\":false,\\\"autoIncremented\\\":false,\\\"generated\\\":false},{\\\"name\\\":\\\"name\\\",\\\"jdbcType\\\":12,\\\"typeName\\\":\\\"VARCHAR\\\",\\\"typeExpression\\\":\\\"VARCHAR\\\",\\\"charsetName\\\":\\\"utf8mb4\\\",\\\"length\\\":128,\\\"position\\\":2,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false},{\\\"name\\\":\\\"dt\\\",\\\"jdbcType\\\":91,\\\"typeName\\\":\\\"DATE\\\",\\\"typeExpression\\\":\\\"DATE\\\",\\\"charsetName\\\":null,\\\"position\\\":3,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false},{\\\"name\\\":\\\"dtime\\\",\\\"jdbcType\\\":93,\\\"typeName\\\":\\\"DATETIME\\\",\\\"typeExpression\\\":\\\"DATETIME\\\",\\\"charsetName\\\":null,\\\"position\\\":4,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false},{\\\"name\\\":\\\"ts\\\",\\\"jdbcType\\\":2014,\\\"typeName\\\":\\\"TIMESTAMP\\\",\\\"typeExpression\\\":\\\"TIMESTAMP\\\",\\\"chars
etName\\\":null,\\\"position\\\":5,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false},{\\\"name\\\":\\\"c_1\\\",\\\"jdbcType\\\":12,\\\"typeName\\\":\\\"VARCHAR\\\",\\\"typeExpression\\\":\\\"VARCHAR\\\",\\\"charsetName\\\":\\\"utf8mb4\\\",\\\"length\\\":200,\\\"position\\\":6,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false}]}}]}\"}";
- String exceptDDL1 = "ALTER TABLE doris.t1 add COLUMN c_1 varchar(600)";
- String exceptDDL2 = "ALTER TABLE doris.t2 add COLUMN c_1 varchar(600)";
-
- Assert.assertEquals(exceptDDL1, serializer.extractDDL(objectMapper.readTree(ddl1)));
- Assert.assertEquals(exceptDDL2, serializer.extractDDL(objectMapper.readTree(ddl2)));
-
- // Assert.assertEquals(exceptDDL1, serializer.extractDDLList(objectMapper.readTree(ddl1)));
- // Assert.assertEquals(exceptDDL2, serializer.extractDDLList(objectMapper.readTree(ddl2)));
-
- dorisOptions.setTableIdentifier(tmp);
- }
-
- @Test
- @Ignore
- public void testAutoCreateTable() throws Exception {
- String record =
- "{ \"source\":{ \"version\":\"1.9.7.Final\", \"connector\":\"oracle\", \"name\":\"oracle_logminer\", \"ts_ms\":1696945825065, \"snapshot\":\"true\", \"db\":\"TESTDB\", \"sequence\":null, \"schema\":\"ADMIN\", \"table\":\"PERSONS\", \"txId\":null, \"scn\":\"1199617\", \"commit_scn\":null, \"lcr_position\":null, \"rs_id\":null, \"ssn\":0, \"redo_thread\":null }, \"databaseName\":\"TESTDB\", \"schemaName\":\"ADMIN\", \"ddl\":\"\\n CREATE TABLE \\\"ADMIN\\\".\\\"PERSONS\\\" \\n (\\t\\\"ID\\\" NUMBER(10,0), \\n\\t\\\"NAME4\\\" VARCHAR2(128) NOT NULL ENABLE, \\n\\t\\\"age4\\\" VARCHAR2(128), \\n\\t PRIMARY KEY (\\\"ID\\\") ENABLE\\n ) ;\\n \", \"tableChanges\":[ { \"type\":\"CREATE\", \"id\":\"\\\"TESTDB\\\".\\\"ADMIN\\\".\\\"PERSONS\\\"\", \"table\":{ \"defaultCharsetName\":null, \"primaryKeyColumnNames\":[ \"ID\" ], \"columns\":[ { \"name\":\"ID\", \"jdbcType\":2, \"nativeType\":null, \"typeName\":\"NUMBER\", \"typeExpression\":\"NUMBER\", \"charsetName\":null, \"length\":10, \"scale\":0, \"position\":1, \"optional\":false, \"autoIncremented\":false, \"generated\":false, \"comment\":null }, { \"name\":\"NAME4\", \"jdbcType\":12, \"nativeType\":null, \"typeName\":\"VARCHAR2\", \"typeExpression\":\"VARCHAR2\", \"charsetName\":null, \"length\":128, \"scale\":null, \"position\":2, \"optional\":false, \"autoIncremented\":false, \"generated\":false, \"comment\":null }, { \"name\":\"age4\", \"jdbcType\":12, \"nativeType\":null, \"typeName\":\"VARCHAR2\", \"typeExpression\":\"VARCHAR2\", \"charsetName\":null, \"length\":128, \"scale\":null, \"position\":3, \"optional\":true, \"autoIncremented\":false, \"generated\":false, \"comment\":null } ], \"comment\":null } } ]}";
- JsonNode recordRoot = objectMapper.readTree(record);
- dorisOptions =
- DorisOptions.builder()
- .setFenodes("127.0.0.1:8030")
- .setTableIdentifier("")
- .setUsername("root")
- .setPassword("")
- .build();
- serializer = JsonDebeziumSchemaSerializer.builder().setDorisOptions(dorisOptions).build();
- serializer.setSourceConnector(SourceConnector.ORACLE.connectorName);
- TableSchema tableSchema = serializer.extractCreateTableSchema(recordRoot);
- Assert.assertEquals("TESTDB", tableSchema.getDatabase());
- Assert.assertEquals("PERSONS", tableSchema.getTable());
- Assert.assertArrayEquals(new String[] {"ID"}, tableSchema.getKeys().toArray());
- Assert.assertEquals(3, tableSchema.getFields().size());
- Assert.assertEquals("ID", tableSchema.getFields().get("ID").getName());
- Assert.assertEquals("NAME4", tableSchema.getFields().get("NAME4").getName());
- Assert.assertEquals("age4", tableSchema.getFields().get("age4").getName());
- serializer.setSourceConnector(SourceConnector.MYSQL.connectorName);
+ JsonDebeziumSchemaChange schemaChange = serializer.getJsonDebeziumSchemaChange();
+ schemaChange.setSchemaChangeManager(mockSchemaChangeManager);
+ serializer.serialize(recordValue);
+ DorisRecord serializeRecord = serializer.serialize(recordSchema);
+ Assert.assertNull(serializeRecord);
}
}
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestJsonDebeziumChangeBase.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestJsonDebeziumChangeBase.java
new file mode 100644
index 0000000..d88be9a
--- /dev/null
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestJsonDebeziumChangeBase.java
@@ -0,0 +1,55 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.flink.sink.writer.serializer.jsondebezium;
+
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.JsonNodeFactory;
+import org.apache.doris.flink.cfg.DorisOptions;
+import org.apache.doris.flink.sink.writer.LoadConstants;
+import org.junit.Before;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/** Shared fixture for the JSON Debezium data-change and schema-change serializer tests. */
+public class TestJsonDebeziumChangeBase {
+
+    protected static DorisOptions dorisOptions;
+    protected Map<String, String> tableMapping = new HashMap<>();
+    protected boolean ignoreUpdateBefore = true;
+    protected String lineDelimiter = LoadConstants.LINE_DELIMITER_DEFAULT;
+    protected ObjectMapper objectMapper = new ObjectMapper();
+
+    @Before
+    public void setUp() {
+        // Keep JSON floating-point numbers as exact BigDecimal values when parsing.
+        objectMapper.enable(DeserializationFeature.USE_BIG_DECIMAL_FOR_FLOATS);
+        objectMapper.setNodeFactory(JsonNodeFactory.withExactBigDecimals(true));
+        tableMapping.put("mysql.t1", "doris.t1");
+        tableMapping.put("mysql.t2", "doris.t2");
+        tableMapping.put("test.dbo.t1", "test.t1");
+        dorisOptions =
+                DorisOptions.builder()
+                        .setFenodes("127.0.0.1:8030")
+                        .setTableIdentifier("test.t1")
+                        .setUsername("root")
+                        .setPassword("")
+                        .build();
+    }
+}
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestJsonDebeziumDataChange.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestJsonDebeziumDataChange.java
new file mode 100644
index 0000000..d789807
--- /dev/null
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestJsonDebeziumDataChange.java
@@ -0,0 +1,190 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.flink.sink.writer.serializer.jsondebezium;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.NullNode;
+import org.apache.doris.flink.sink.writer.serializer.DorisRecord;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.Map;
+
+/** Test for JsonDebeziumDataChange. */
+public class TestJsonDebeziumDataChange extends TestJsonDebeziumChangeBase {
+
+ private JsonDebeziumDataChange dataChange;
+ private JsonDebeziumChangeContext changeContext;
+
+    /** Extends the base fixture with a change context and the data-change handler under test. */
+    @Before
+    @Override
+    public void setUp() {
+        super.setUp();
+        // Only the arguments this fixture has values for are supplied; the rest are null.
+        changeContext =
+                new JsonDebeziumChangeContext(
+                        dorisOptions,
+                        tableMapping,
+                        null,
+                        null,
+                        null,
+                        objectMapper,
+                        null,
+                        lineDelimiter,
+                        ignoreUpdateBefore);
+        dataChange = new JsonDebeziumDataChange(changeContext);
+    }
+
+    @Test
+    public void testSerializeInsert() throws IOException {
+        // Debezium create ("c") event for:
+        // insert into t1 VALUES(1,"doris",'2022-01-01','2022-01-01 10:01:02','2022-01-01 10:01:03');
+        String insertEvent =
+                "{\"before\":null,\"after\":{\"id\":1,\"name\":\"doris\",\"dt\":\"2022-01-01\",\"dtime\":\"2022-01-01 10:01:02\",\"ts\":\"2022-01-01 10:01:03\"},\"source\":{\"version\":\"1.5.4.Final\",\"connector\":\"mysql\",\"name\":\"mysql_binlog_source\",\"ts_ms\":1663923840000,\"snapshot\":\"false\",\"db\":\"test\",\"sequence\":null,\"table\":\"t1\",\"server_id\":1,\"gtid\":null,\"file\":\"binlog.000006\",\"pos\":11834,\"row\":0,\"thread\":null,\"query\":null},\"op\":\"c\",\"ts_ms\":1663923840146,\"transaction\":null}";
+        Map<String, String> insertedRow = extractValueMap(insertEvent);
+        // The after image is emitted with the delete sign cleared.
+        Assert.assertEquals("1", insertedRow.get("id"));
+        Assert.assertEquals("doris", insertedRow.get("name"));
+        Assert.assertEquals("2022-01-01", insertedRow.get("dt"));
+        Assert.assertEquals("2022-01-01 10:01:02", insertedRow.get("dtime"));
+        Assert.assertEquals("2022-01-01 10:01:03", insertedRow.get("ts"));
+        Assert.assertEquals("0", insertedRow.get("__DORIS_DELETE_SIGN__"));
+        Assert.assertEquals(6, insertedRow.size());
+    }
+
+    @Test
+    public void testSerializeUpdate() throws IOException {
+        // Debezium update ("u") event for:
+        // update t1 set name='doris-update' WHERE id =1;
+        String updateEvent =
+                "{\"before\":{\"id\":1,\"name\":\"doris\",\"dt\":\"2022-01-01\",\"dtime\":\"2022-01-01 10:01:02\",\"ts\":\"2022-01-01 10:01:03\"},\"after\":{\"id\":1,\"name\":\"doris-update\",\"dt\":\"2022-01-01\",\"dtime\":\"2022-01-01 10:01:02\",\"ts\":\"2022-01-01 10:01:03\"},\"source\":{\"version\":\"1.5.4.Final\",\"connector\":\"mysql\",\"name\":\"mysql_binlog_source\",\"ts_ms\":1663924082000,\"snapshot\":\"false\",\"db\":\"test\",\"sequence\":null,\"table\":\"t1\",\"server_id\":1,\"gtid\":null,\"file\":\"binlog.000006\",\"pos\":12154,\"row\":0,\"thread\":null,\"query\":null},\"op\":\"u\",\"ts_ms\":1663924082186,\"transaction\":null}";
+        Map<String, String> updatedRow = extractValueMap(updateEvent);
+        // With the fixture's ignoreUpdateBefore = true, a single row holding the after image is expected.
+        Assert.assertEquals("1", updatedRow.get("id"));
+        Assert.assertEquals("doris-update", updatedRow.get("name"));
+        Assert.assertEquals("2022-01-01", updatedRow.get("dt"));
+        Assert.assertEquals("2022-01-01 10:01:02", updatedRow.get("dtime"));
+        Assert.assertEquals("2022-01-01 10:01:03", updatedRow.get("ts"));
+        Assert.assertEquals("0", updatedRow.get("__DORIS_DELETE_SIGN__"));
+        Assert.assertEquals(6, updatedRow.size());
+    }
+
+    @Test
+    public void testSerializeDelete() throws IOException {
+        // Debezium delete ("d") event for the row with id = 1; "after" is null.
+        String deleteEvent =
+                "{\"before\":{\"id\":1,\"name\":\"doris-update\",\"dt\":\"2022-01-01\",\"dtime\":\"2022-01-01 10:01:02\",\"ts\":\"2022-01-01 10:01:03\"},\"after\":null,\"source\":{\"version\":\"1.5.4.Final\",\"connector\":\"mysql\",\"name\":\"mysql_binlog_source\",\"ts_ms\":1663924328000,\"snapshot\":\"false\",\"db\":\"test\",\"sequence\":null,\"table\":\"t1\",\"server_id\":1,\"gtid\":null,\"file\":\"binlog.000006\",\"pos\":12500,\"row\":0,\"thread\":null,\"query\":null},\"op\":\"d\",\"ts_ms\":1663924328869,\"transaction\":null}";
+        Map<String, String> deletedRow = extractValueMap(deleteEvent);
+        // The before image is emitted with the delete sign set.
+        Assert.assertEquals("1", deletedRow.get("id"));
+        Assert.assertEquals("doris-update", deletedRow.get("name"));
+        Assert.assertEquals("2022-01-01", deletedRow.get("dt"));
+        Assert.assertEquals("2022-01-01 10:01:02", deletedRow.get("dtime"));
+        Assert.assertEquals("2022-01-01 10:01:03", deletedRow.get("ts"));
+        Assert.assertEquals("1", deletedRow.get("__DORIS_DELETE_SIGN__"));
+        Assert.assertEquals(6, deletedRow.size());
+    }
+
+    /**
+     * Serializes an update with {@code ignoreUpdateBefore} switched off and expects two rows joined
+     * by the line delimiter: the before image first, the after image second.
+     */
+    @Test
+    public void testSerializeUpdateBefore() throws IOException {
+        // Same wiring as setUp(), except that the last argument (ignoreUpdateBefore) is false.
+        changeContext =
+                new JsonDebeziumChangeContext(
+                        dorisOptions,
+                        tableMapping,
+                        null,
+                        null,
+                        null,
+                        objectMapper,
+                        null,
+                        lineDelimiter,
+                        false);
+        dataChange = new JsonDebeziumDataChange(changeContext);
+
+        // update t1 set name='doris-update' WHERE id =1;
+        String record =
+                "{\"before\":{\"id\":1,\"name\":\"doris\",\"dt\":\"2022-01-01\",\"dtime\":\"2022-01-01 10:01:02\",\"ts\":\"2022-01-01 10:01:03\"},\"after\":{\"id\":1,\"name\":\"doris-update\",\"dt\":\"2022-01-01\",\"dtime\":\"2022-01-01 10:01:02\",\"ts\":\"2022-01-01 10:01:03\"},\"source\":{\"version\":\"1.5.4.Final\",\"connector\":\"mysql\",\"name\":\"mysql_binlog_source\",\"ts_ms\":1663924082000,\"snapshot\":\"false\",\"db\":\"test\",\"sequence\":null,\"table\":\"t1\",\"server_id\":1,\"gtid\":null,\"file\":\"binlog.000006\",\"pos\":12154,\"row\":0,\"thread\":null,\"query\":null},\"op\":\"u\",\"ts_ms\":1663924082186,\"transaction\":null}";
+        JsonNode recordRoot = objectMapper.readValue(record, JsonNode.class);
+        String op = extractJsonNode(recordRoot, "op");
+        DorisRecord dorisRecord = dataChange.serialize(record, recordRoot, op);
+        String row = new String(dorisRecord.getRow(), StandardCharsets.UTF_8);
+        // Split on the delimiter handed to the context instead of a hard-coded newline; it is
+        // quoted because String.split interprets its argument as a regular expression.
+        String[] split = row.split(java.util.regex.Pattern.quote(lineDelimiter));
+        Map<String, String> valueMap =
+                objectMapper.readValue(split[1], new TypeReference<Map<String, String>>() {});
+
+        Assert.assertEquals("1", valueMap.get("id"));
+        Assert.assertEquals("doris-update", valueMap.get("name"));
+        Assert.assertEquals("2022-01-01", valueMap.get("dt"));
+        Assert.assertEquals("2022-01-01 10:01:02", valueMap.get("dtime"));
+        Assert.assertEquals("2022-01-01 10:01:03", valueMap.get("ts"));
+        Assert.assertEquals("0", valueMap.get("__DORIS_DELETE_SIGN__"));
+        Assert.assertEquals(6, valueMap.size());
+
+        Map<String, String> beforeMap =
+                objectMapper.readValue(split[0], new TypeReference<Map<String, String>>() {});
+        Assert.assertEquals("doris", beforeMap.get("name"));
+    }
+
+    /**
+     * Serializes a Debezium JSON record through the handler under test and parses the resulting row
+     * bytes (decoded as UTF-8) back into a column-name to value map.
+     */
+    private Map<String, String> extractValueMap(String record) throws IOException {
+        JsonNode root = objectMapper.readValue(record, JsonNode.class);
+        DorisRecord serialized = dataChange.serialize(record, root, extractJsonNode(root, "op"));
+        String rowJson = new String(serialized.getRow(), StandardCharsets.UTF_8);
+        TypeReference<Map<String, String>> rowType = new TypeReference<Map<String, String>>() {};
+        return objectMapper.readValue(rowJson, rowType);
+    }
+
+ @Test
+ public void testGetCdcTableIdentifier() throws Exception {
+ String insert =
+ "{\"before\":{\"id\":1,\"name\":\"doris-update\",\"dt\":\"2022-01-01\",\"dtime\":\"2022-01-01 10:01:02\",\"ts\":\"2022-01-01 10:01:03\"},\"after\":null,\"source\":{\"version\":\"1.5.4.Final\",\"connector\":\"mysql\",\"name\":\"mysql_binlog_source\",\"ts_ms\":1663924328000,\"snapshot\":\"false\",\"db\":\"test\",\"sequence\":null,\"table\":\"t1\",\"server_id\":1,\"gtid\":null,\"file\":\"binlog.000006\",\"pos\":12500,\"row\":0,\"thread\":null,\"query\":null},\"op\":\"d\",\"ts_ms\":1663924328869,\"transaction\":null}";
+ JsonNode recordRoot = objectMapper.readTree(insert);
+ String identifier = dataChange.getCdcTableIdentifier(recordRoot);
+ Assert.assertEquals("test.t1", identifier);
+
+ String insertSchema =
+ "{\"before\":{\"id\":1,\"name\":\"doris-update\",\"dt\":\"2022-01-01\",\"dtime\":\"2022-01-01 10:01:02\",\"ts\":\"2022-01-01 10:01:03\"},\"after\":null,\"source\":{\"version\":\"1.5.4.Final\",\"connector\":\"mysql\",\"name\":\"mysql_binlog_source\",\"ts_ms\":1663924328000,\"snapshot\":\"false\",\"db\":\"test\",\"schema\":\"dbo\",\"table\":\"t1\",\"server_id\":1,\"gtid\":null,\"file\":\"binlog.000006\",\"pos\":12500,\"row\":0,\"thread\":null,\"query\":null},\"op\":\"d\",\"ts_ms\":1663924328869,\"transaction\":null}";
+ String identifierSchema =
+ dataChange.getCdcTableIdentifier(objectMapper.readTree(insertSchema));
+ Assert.assertEquals("test.dbo.t1", identifierSchema);
+
+ String ddl =
+ "{\"source\":{\"version\":\"1.5.4.Final\",\"connector\":\"mysql\",\"name\":\"mysql_binlog_source\",\"ts_ms\":1663924503565,\"snapshot\":\"false\",\"db\":\"test\",\"sequence\":null,\"table\":\"t1\",\"server_id\":1,\"gtid\":null,\"file\":\"binlog.000006\",\"pos\":13088,\"row\":0,\"thread\":null,\"query\":null},\"historyRecord\":\"{\\\"source\\\":{\\\"file\\\":\\\"binlog.000006\\\",\\\"pos\\\":13088,\\\"server_id\\\":1},\\\"position\\\":{\\\"transaction_id\\\":null,\\\"ts_sec\\\":1663924503,\\\"file\\\":\\\"binlog.000006\\\",\\\"pos\\\":13221,\\\"server_id\\\":1},\\\"databaseName\\\":\\\"test\\\",\\\"ddl\\\":\\\"alter table t1 add \\\\n c_1 varchar(200)\\\",\\\"tableChanges\\\":[{\\\"type\\\":\\\"ALTER\\\",\\\"id\\\":\\\"\\\\\\\"test\\\\\\\".\\\\\\\"t1\\\\\\\"\\\",\\\"table\\\":{\\\"defaultCharsetName\\\":\\\"utf8mb4\\\",\\\"primaryKeyColumnNames\\\":[\\\"id\\\"],\\\"columns\\\":[{\\\"name\\\":\\\"id\\\",\\\"jdbcType\\\":4,\\\"typeName\\\":\\\"INT\\\",\\\"typeExpression\\\":\\\"INT\\\",\\\"charsetName\\\":null,\\\"position\\\":1,\\\"optional\\\":false,\\\"autoIncremented\\\":false,\\\"generated\\\":false},{\\\"name\\\":\\\"name\\\",\\\"jdbcType\\\":12,\\\"typeName\\\":\\\"VARCHAR\\\",\\\"typeExpression\\\":\\\"VARCHAR\\\",\\\"charsetName\\\":\\\"utf8mb4\\\",\\\"length\\\":128,\\\"position\\\":2,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false},{\\\"name\\\":\\\"dt\\\",\\\"jdbcType\\\":91,\\\"typeName\\\":\\\"DATE\\\",\\\"typeExpression\\\":\\\"DATE\\\",\\\"charsetName\\\":null,\\\"position\\\":3,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false},{\\\"name\\\":\\\"dtime\\\",\\\"jdbcType\\\":93,\\\"typeName\\\":\\\"DATETIME\\\",\\\"typeExpression\\\":\\\"DATETIME\\\",\\\"charsetName\\\":null,\\\"position\\\":4,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false},{\\\"name\\\":\\\"ts\\\",\\\"jdbcType\\\":2014,\\\"typeName\\\":\\\"TIMESTAMP\\\",\\\"typeExpression\\\":\\\"TIMESTAMP\\\",\\\"charse
tName\\\":null,\\\"position\\\":5,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false},{\\\"name\\\":\\\"c_1\\\",\\\"jdbcType\\\":12,\\\"typeName\\\":\\\"VARCHAR\\\",\\\"typeExpression\\\":\\\"VARCHAR\\\",\\\"charsetName\\\":\\\"utf8mb4\\\",\\\"length\\\":200,\\\"position\\\":6,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false}]}}]}\"}";
+ String ddlRes = dataChange.getCdcTableIdentifier(objectMapper.readTree(ddl));
+ Assert.assertEquals("test.t1", ddlRes);
+ }
+
+    /**
+     * Checks the Doris identifier returned for a CDC identifier that is a key of the table mapping,
+     * for one that is not, and for the latter once the options carry no table identifier.
+     */
+    @Test
+    public void testGetDorisTableIdentifier() throws Exception {
+        // "test.dbo.t1" is a key of tableMapping and maps to "test.t1".
+        String identifier = dataChange.getDorisTableIdentifier("test.dbo.t1");
+        Assert.assertEquals("test.t1", identifier);
+
+        // "test.t1" is not a mapping key; the expected value equals the identifier in dorisOptions.
+        identifier = dataChange.getDorisTableIdentifier("test.t1");
+        Assert.assertEquals("test.t1", identifier);
+
+        String originalIdentifier = dorisOptions.getTableIdentifier();
+        dorisOptions.setTableIdentifier(null);
+        try {
+            identifier = dataChange.getDorisTableIdentifier("test.t1");
+            Assert.assertNull(identifier);
+        } finally {
+            // dorisOptions is a static field: put the identifier back even if the assertion fails.
+            dorisOptions.setTableIdentifier(originalIdentifier);
+        }
+    }
+
+    /**
+     * Returns the text value of field {@code key} in {@code record}, or null when the record is
+     * null, the field is absent, or the field is a JSON null.
+     */
+    private String extractJsonNode(JsonNode record, String key) {
+        if (record == null) {
+            return null;
+        }
+        JsonNode value = record.get(key);
+        if (value == null || value instanceof NullNode) {
+            return null;
+        }
+        return value.asText();
+    }
+}
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestJsonDebeziumSchemaChangeImpl.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestJsonDebeziumSchemaChangeImpl.java
new file mode 100644
index 0000000..9b003a8
--- /dev/null
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestJsonDebeziumSchemaChangeImpl.java
@@ -0,0 +1,143 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.flink.sink.writer.serializer.jsondebezium;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import org.apache.doris.flink.cfg.DorisReadOptions;
+import org.apache.doris.flink.exception.DorisException;
+import org.apache.doris.flink.rest.RestService;
+import org.apache.doris.flink.rest.models.Field;
+import org.apache.doris.flink.rest.models.Schema;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.List;
+
+/** Test for JsonDebeziumSchemaChangeImpl. */
+public class TestJsonDebeziumSchemaChangeImpl extends TestJsonDebeziumChangeBase {
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(TestJsonDebeziumSchemaChangeImpl.class);
+
+ private JsonDebeziumSchemaChangeImpl schemaChange;
+ private JsonDebeziumChangeContext changeContext;
+
+    /** Extends the base fixture with a change context and the schema-change handler under test. */
+    @Before
+    @Override
+    public void setUp() {
+        super.setUp();
+        // Only the arguments this fixture has values for are supplied; the rest are null.
+        changeContext =
+                new JsonDebeziumChangeContext(
+                        dorisOptions,
+                        tableMapping,
+                        null,
+                        null,
+                        null,
+                        objectMapper,
+                        null,
+                        lineDelimiter,
+                        ignoreUpdateBefore);
+        schemaChange = new JsonDebeziumSchemaChangeImpl(changeContext);
+    }
+
+ @Test
+ public void testExtractDDL() throws IOException {
+ String srcDDL = "ALTER TABLE test.t1 add COLUMN c_1 varchar(600)";
+ String record =
+ "{\"source\":{\"version\":\"1.5.4.Final\",\"connector\":\"mysql\",\"name\":\"mysql_binlog_source\",\"ts_ms\":1663924503565,\"snapshot\":\"false\",\"db\":\"test\",\"sequence\":null,\"table\":\"t1\",\"server_id\":1,\"gtid\":null,\"file\":\"binlog.000006\",\"pos\":13088,\"row\":0,\"thread\":null,\"query\":null},\"historyRecord\":\"{\\\"source\\\":{\\\"file\\\":\\\"binlog.000006\\\",\\\"pos\\\":13088,\\\"server_id\\\":1},\\\"position\\\":{\\\"transaction_id\\\":null,\\\"ts_sec\\\":1663924503,\\\"file\\\":\\\"binlog.000006\\\",\\\"pos\\\":13221,\\\"server_id\\\":1},\\\"databaseName\\\":\\\"test\\\",\\\"ddl\\\":\\\"alter table t1 add \\\\n c_1 varchar(200)\\\",\\\"tableChanges\\\":[{\\\"type\\\":\\\"ALTER\\\",\\\"id\\\":\\\"\\\\\\\"test\\\\\\\".\\\\\\\"t1\\\\\\\"\\\",\\\"table\\\":{\\\"defaultCharsetName\\\":\\\"utf8mb4\\\",\\\"primaryKeyColumnNames\\\":[\\\"id\\\"],\\\"columns\\\":[{\\\"name\\\":\\\"id\\\",\\\"jdbcType\\\":4,\\\"typeName\\\":\\\"INT\\\",\\\"typeExpression\\\":\\\"INT\\\",\\\"charsetName\\\":null,\\\"position\\\":1,\\\"optional\\\":false,\\\"autoIncremented\\\":false,\\\"generated\\\":false},{\\\"name\\\":\\\"name\\\",\\\"jdbcType\\\":12,\\\"typeName\\\":\\\"VARCHAR\\\",\\\"typeExpression\\\":\\\"VARCHAR\\\",\\\"charsetName\\\":\\\"utf8mb4\\\",\\\"length\\\":128,\\\"position\\\":2,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false},{\\\"name\\\":\\\"dt\\\",\\\"jdbcType\\\":91,\\\"typeName\\\":\\\"DATE\\\",\\\"typeExpression\\\":\\\"DATE\\\",\\\"charsetName\\\":null,\\\"position\\\":3,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false},{\\\"name\\\":\\\"dtime\\\",\\\"jdbcType\\\":93,\\\"typeName\\\":\\\"DATETIME\\\",\\\"typeExpression\\\":\\\"DATETIME\\\",\\\"charsetName\\\":null,\\\"position\\\":4,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false},{\\\"name\\\":\\\"ts\\\",\\\"jdbcType\\\":2014,\\\"typeName\\\":\\\"TIMESTAMP\\\",\\\"typeExpression\\\":\\\"TIMESTAMP\\\",\\\"charse
tName\\\":null,\\\"position\\\":5,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false},{\\\"name\\\":\\\"c_1\\\",\\\"jdbcType\\\":12,\\\"typeName\\\":\\\"VARCHAR\\\",\\\"typeExpression\\\":\\\"VARCHAR\\\",\\\"charsetName\\\":\\\"utf8mb4\\\",\\\"length\\\":200,\\\"position\\\":6,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false}]}}]}\"}";
+ JsonNode recordRoot = objectMapper.readTree(record);
+ String ddl = schemaChange.extractDDL(recordRoot);
+ Assert.assertEquals(srcDDL, ddl);
+
+ String targetDDL = "ALTER TABLE test.t1 add COLUMN c_1 varchar(65533)";
+ String record1 =
+ "{\"source\":{\"version\":\"1.5.4.Final\",\"connector\":\"mysql\",\"name\":\"mysql_binlog_source\",\"ts_ms\":1663924503565,\"snapshot\":\"false\",\"db\":\"test\",\"sequence\":null,\"table\":\"t1\",\"server_id\":1,\"gtid\":null,\"file\":\"binlog.000006\",\"pos\":13088,\"row\":0,\"thread\":null,\"query\":null},\"historyRecord\":\"{\\\"source\\\":{\\\"file\\\":\\\"binlog.000006\\\",\\\"pos\\\":13088,\\\"server_id\\\":1},\\\"position\\\":{\\\"transaction_id\\\":null,\\\"ts_sec\\\":1663924503,\\\"file\\\":\\\"binlog.000006\\\",\\\"pos\\\":13221,\\\"server_id\\\":1},\\\"databaseName\\\":\\\"test\\\",\\\"ddl\\\":\\\"alter table t1 add \\\\n c_1 varchar(30000)\\\",\\\"tableChanges\\\":[{\\\"type\\\":\\\"ALTER\\\",\\\"id\\\":\\\"\\\\\\\"test\\\\\\\".\\\\\\\"t1\\\\\\\"\\\",\\\"table\\\":{\\\"defaultCharsetName\\\":\\\"utf8mb4\\\",\\\"primaryKeyColumnNames\\\":[\\\"id\\\"],\\\"columns\\\":[{\\\"name\\\":\\\"id\\\",\\\"jdbcType\\\":4,\\\"typeName\\\":\\\"INT\\\",\\\"typeExpression\\\":\\\"INT\\\",\\\"charsetName\\\":null,\\\"position\\\":1,\\\"optional\\\":false,\\\"autoIncremented\\\":false,\\\"generated\\\":false},{\\\"name\\\":\\\"name\\\",\\\"jdbcType\\\":12,\\\"typeName\\\":\\\"VARCHAR\\\",\\\"typeExpression\\\":\\\"VARCHAR\\\",\\\"charsetName\\\":\\\"utf8mb4\\\",\\\"length\\\":128,\\\"position\\\":2,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false},{\\\"name\\\":\\\"dt\\\",\\\"jdbcType\\\":91,\\\"typeName\\\":\\\"DATE\\\",\\\"typeExpression\\\":\\\"DATE\\\",\\\"charsetName\\\":null,\\\"position\\\":3,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false},{\\\"name\\\":\\\"dtime\\\",\\\"jdbcType\\\":93,\\\"typeName\\\":\\\"DATETIME\\\",\\\"typeExpression\\\":\\\"DATETIME\\\",\\\"charsetName\\\":null,\\\"position\\\":4,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false},{\\\"name\\\":\\\"ts\\\",\\\"jdbcType\\\":2014,\\\"typeName\\\":\\\"TIMESTAMP\\\",\\\"typeExpression\\\":\\\"TIMESTAMP\\\",\\\"char
setName\\\":null,\\\"position\\\":5,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false},{\\\"name\\\":\\\"c_1\\\",\\\"jdbcType\\\":12,\\\"typeName\\\":\\\"VARCHAR\\\",\\\"typeExpression\\\":\\\"VARCHAR\\\",\\\"charsetName\\\":\\\"utf8mb4\\\",\\\"length\\\":200,\\\"position\\\":6,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false}]}}]}\"}";
+ JsonNode recordRoot1 = objectMapper.readTree(record1);
+ String ddl1 = schemaChange.extractDDL(recordRoot1);
+ Assert.assertEquals(targetDDL, ddl1);
+ }
+
+ @Test
+ public void testSchemaChangeMultiTable() throws Exception {
+ String tmp = dorisOptions.getTableIdentifier();
+ dorisOptions.setTableIdentifier(null);
+ String ddl1 =
+ "{\"source\":{\"version\":\"1.5.4.Final\",\"connector\":\"mysql\",\"name\":\"mysql_binlog_source\",\"ts_ms\":1663924503565,\"snapshot\":\"false\",\"db\":\"mysql\",\"sequence\":null,\"table\":\"t1\",\"server_id\":1,\"gtid\":null,\"file\":\"binlog.000006\",\"pos\":13088,\"row\":0,\"thread\":null,\"query\":null},\"historyRecord\":\"{\\\"source\\\":{\\\"file\\\":\\\"binlog.000006\\\",\\\"pos\\\":13088,\\\"server_id\\\":1},\\\"position\\\":{\\\"transaction_id\\\":null,\\\"ts_sec\\\":1663924503,\\\"file\\\":\\\"binlog.000006\\\",\\\"pos\\\":13221,\\\"server_id\\\":1},\\\"databaseName\\\":\\\"test\\\",\\\"ddl\\\":\\\"alter table t1 add \\\\n c_1 varchar(200)\\\",\\\"tableChanges\\\":[{\\\"type\\\":\\\"ALTER\\\",\\\"id\\\":\\\"\\\\\\\"test\\\\\\\".\\\\\\\"t1\\\\\\\"\\\",\\\"table\\\":{\\\"defaultCharsetName\\\":\\\"utf8mb4\\\",\\\"primaryKeyColumnNames\\\":[\\\"id\\\"],\\\"columns\\\":[{\\\"name\\\":\\\"id\\\",\\\"jdbcType\\\":4,\\\"typeName\\\":\\\"INT\\\",\\\"typeExpression\\\":\\\"INT\\\",\\\"charsetName\\\":null,\\\"position\\\":1,\\\"optional\\\":false,\\\"autoIncremented\\\":false,\\\"generated\\\":false},{\\\"name\\\":\\\"name\\\",\\\"jdbcType\\\":12,\\\"typeName\\\":\\\"VARCHAR\\\",\\\"typeExpression\\\":\\\"VARCHAR\\\",\\\"charsetName\\\":\\\"utf8mb4\\\",\\\"length\\\":128,\\\"position\\\":2,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false},{\\\"name\\\":\\\"dt\\\",\\\"jdbcType\\\":91,\\\"typeName\\\":\\\"DATE\\\",\\\"typeExpression\\\":\\\"DATE\\\",\\\"charsetName\\\":null,\\\"position\\\":3,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false},{\\\"name\\\":\\\"dtime\\\",\\\"jdbcType\\\":93,\\\"typeName\\\":\\\"DATETIME\\\",\\\"typeExpression\\\":\\\"DATETIME\\\",\\\"charsetName\\\":null,\\\"position\\\":4,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false},{\\\"name\\\":\\\"ts\\\",\\\"jdbcType\\\":2014,\\\"typeName\\\":\\\"TIMESTAMP\\\",\\\"typeExpression\\\":\\\"TIMESTAMP\\\",\\\"chars
etName\\\":null,\\\"position\\\":5,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false},{\\\"name\\\":\\\"c_1\\\",\\\"jdbcType\\\":12,\\\"typeName\\\":\\\"VARCHAR\\\",\\\"typeExpression\\\":\\\"VARCHAR\\\",\\\"charsetName\\\":\\\"utf8mb4\\\",\\\"length\\\":200,\\\"position\\\":6,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false}]}}]}\"}";
+ String ddl2 =
+ "{\"source\":{\"version\":\"1.5.4.Final\",\"connector\":\"mysql\",\"name\":\"mysql_binlog_source\",\"ts_ms\":1663924503565,\"snapshot\":\"false\",\"db\":\"mysql\",\"sequence\":null,\"table\":\"t2\",\"server_id\":1,\"gtid\":null,\"file\":\"binlog.000006\",\"pos\":13088,\"row\":0,\"thread\":null,\"query\":null},\"historyRecord\":\"{\\\"source\\\":{\\\"file\\\":\\\"binlog.000006\\\",\\\"pos\\\":13088,\\\"server_id\\\":1},\\\"position\\\":{\\\"transaction_id\\\":null,\\\"ts_sec\\\":1663924503,\\\"file\\\":\\\"binlog.000006\\\",\\\"pos\\\":13221,\\\"server_id\\\":1},\\\"databaseName\\\":\\\"test\\\",\\\"ddl\\\":\\\"alter table t1 add \\\\n c_1 varchar(200)\\\",\\\"tableChanges\\\":[{\\\"type\\\":\\\"ALTER\\\",\\\"id\\\":\\\"\\\\\\\"test\\\\\\\".\\\\\\\"t1\\\\\\\"\\\",\\\"table\\\":{\\\"defaultCharsetName\\\":\\\"utf8mb4\\\",\\\"primaryKeyColumnNames\\\":[\\\"id\\\"],\\\"columns\\\":[{\\\"name\\\":\\\"id\\\",\\\"jdbcType\\\":4,\\\"typeName\\\":\\\"INT\\\",\\\"typeExpression\\\":\\\"INT\\\",\\\"charsetName\\\":null,\\\"position\\\":1,\\\"optional\\\":false,\\\"autoIncremented\\\":false,\\\"generated\\\":false},{\\\"name\\\":\\\"name\\\",\\\"jdbcType\\\":12,\\\"typeName\\\":\\\"VARCHAR\\\",\\\"typeExpression\\\":\\\"VARCHAR\\\",\\\"charsetName\\\":\\\"utf8mb4\\\",\\\"length\\\":128,\\\"position\\\":2,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false},{\\\"name\\\":\\\"dt\\\",\\\"jdbcType\\\":91,\\\"typeName\\\":\\\"DATE\\\",\\\"typeExpression\\\":\\\"DATE\\\",\\\"charsetName\\\":null,\\\"position\\\":3,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false},{\\\"name\\\":\\\"dtime\\\",\\\"jdbcType\\\":93,\\\"typeName\\\":\\\"DATETIME\\\",\\\"typeExpression\\\":\\\"DATETIME\\\",\\\"charsetName\\\":null,\\\"position\\\":4,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false},{\\\"name\\\":\\\"ts\\\",\\\"jdbcType\\\":2014,\\\"typeName\\\":\\\"TIMESTAMP\\\",\\\"typeExpression\\\":\\\"TIMESTAMP\\\",\\\"chars
etName\\\":null,\\\"position\\\":5,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false},{\\\"name\\\":\\\"c_1\\\",\\\"jdbcType\\\":12,\\\"typeName\\\":\\\"VARCHAR\\\",\\\"typeExpression\\\":\\\"VARCHAR\\\",\\\"charsetName\\\":\\\"utf8mb4\\\",\\\"length\\\":200,\\\"position\\\":6,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false}]}}]}\"}";
+ String exceptDDL1 = "ALTER TABLE doris.t1 add COLUMN c_1 varchar(600)";
+ String exceptDDL2 = "ALTER TABLE doris.t2 add COLUMN c_1 varchar(600)";
+
+ Assert.assertEquals(exceptDDL1, schemaChange.extractDDL(objectMapper.readTree(ddl1)));
+ Assert.assertEquals(exceptDDL2, schemaChange.extractDDL(objectMapper.readTree(ddl2)));
+
+ // Assert.assertEquals(exceptDDL1, serializer.extractDDLList(objectMapper.readTree(ddl1)));
+ // Assert.assertEquals(exceptDDL2, serializer.extractDDLList(objectMapper.readTree(ddl2)));
+
+ dorisOptions.setTableIdentifier(tmp);
+ }
+
+ @Ignore
+ @Test
+ public void testSerializeAddColumn() throws IOException, DorisException {
+ // alter table t1 add column c_1 varchar(200)
+ String record =
+ "{\"source\":{\"version\":\"1.5.4.Final\",\"connector\":\"mysql\",\"name\":\"mysql_binlog_source\",\"ts_ms\":1663924503565,\"snapshot\":\"false\",\"db\":\"test\",\"sequence\":null,\"table\":\"t1\",\"server_id\":1,\"gtid\":null,\"file\":\"binlog.000006\",\"pos\":13088,\"row\":0,\"thread\":null,\"query\":null},\"historyRecord\":\"{\\\"source\\\":{\\\"file\\\":\\\"binlog.000006\\\",\\\"pos\\\":13088,\\\"server_id\\\":1},\\\"position\\\":{\\\"transaction_id\\\":null,\\\"ts_sec\\\":1663924503,\\\"file\\\":\\\"binlog.000006\\\",\\\"pos\\\":13221,\\\"server_id\\\":1},\\\"databaseName\\\":\\\"test\\\",\\\"ddl\\\":\\\"alter table t1 add \\\\n column c_1 varchar(200)\\\",\\\"tableChanges\\\":[{\\\"type\\\":\\\"ALTER\\\",\\\"id\\\":\\\"\\\\\\\"test\\\\\\\".\\\\\\\"t1\\\\\\\"\\\",\\\"table\\\":{\\\"defaultCharsetName\\\":\\\"utf8mb4\\\",\\\"primaryKeyColumnNames\\\":[\\\"id\\\"],\\\"columns\\\":[{\\\"name\\\":\\\"id\\\",\\\"jdbcType\\\":4,\\\"typeName\\\":\\\"INT\\\",\\\"typeExpression\\\":\\\"INT\\\",\\\"charsetName\\\":null,\\\"position\\\":1,\\\"optional\\\":false,\\\"autoIncremented\\\":false,\\\"generated\\\":false},{\\\"name\\\":\\\"name\\\",\\\"jdbcType\\\":12,\\\"typeName\\\":\\\"VARCHAR\\\",\\\"typeExpression\\\":\\\"VARCHAR\\\",\\\"charsetName\\\":\\\"utf8mb4\\\",\\\"length\\\":128,\\\"position\\\":2,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false},{\\\"name\\\":\\\"dt\\\",\\\"jdbcType\\\":91,\\\"typeName\\\":\\\"DATE\\\",\\\"typeExpression\\\":\\\"DATE\\\",\\\"charsetName\\\":null,\\\"position\\\":3,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false},{\\\"name\\\":\\\"dtime\\\",\\\"jdbcType\\\":93,\\\"typeName\\\":\\\"DATETIME\\\",\\\"typeExpression\\\":\\\"DATETIME\\\",\\\"charsetName\\\":null,\\\"position\\\":4,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false},{\\\"name\\\":\\\"ts\\\",\\\"jdbcType\\\":2014,\\\"typeName\\\":\\\"TIMESTAMP\\\",\\\"typeExpression\\\":\\\"TIMESTAMP\\\",\\\
"charsetName\\\":null,\\\"position\\\":5,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false},{\\\"name\\\":\\\"c_1\\\",\\\"jdbcType\\\":12,\\\"typeName\\\":\\\"VARCHAR\\\",\\\"typeExpression\\\":\\\"VARCHAR\\\",\\\"charsetName\\\":\\\"utf8mb4\\\",\\\"length\\\":200,\\\"position\\\":6,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false}]}}]}\"}";
+ JsonNode recordRoot = objectMapper.readTree(record);
+ boolean flag = schemaChange.schemaChange(recordRoot);
+ Assert.assertEquals(true, flag);
+
+ Field targetField = getField("c_1");
+ Assert.assertNotNull(targetField);
+ Assert.assertEquals("c_1", targetField.getName());
+ Assert.assertEquals("VARCHAR", targetField.getType());
+ }
+
+ @Ignore
+ @Test
+ public void testSerializeDropColumn() throws IOException, DorisException {
+ // alter table t1 drop column c_1;
+ String ddl =
+ "{\"source\":{\"version\":\"1.5.4.Final\",\"connector\":\"mysql\",\"name\":\"mysql_binlog_source\",\"ts_ms\":1663925897321,\"snapshot\":\"false\",\"db\":\"test\",\"sequence\":null,\"table\":\"t1\",\"server_id\":1,\"gtid\":null,\"file\":\"binlog.000006\",\"pos\":13298,\"row\":0,\"thread\":null,\"query\":null},\"historyRecord\":\"{\\\"source\\\":{\\\"file\\\":\\\"binlog.000006\\\",\\\"pos\\\":13298,\\\"server_id\\\":1},\\\"position\\\":{\\\"transaction_id\\\":null,\\\"ts_sec\\\":1663925897,\\\"file\\\":\\\"binlog.000006\\\",\\\"pos\\\":13422,\\\"server_id\\\":1},\\\"databaseName\\\":\\\"test\\\",\\\"ddl\\\":\\\"alter table t1 drop \\\\n column c_1\\\",\\\"tableChanges\\\":[{\\\"type\\\":\\\"ALTER\\\",\\\"id\\\":\\\"\\\\\\\"test\\\\\\\".\\\\\\\"t1\\\\\\\"\\\",\\\"table\\\":{\\\"defaultCharsetName\\\":\\\"utf8mb4\\\",\\\"primaryKeyColumnNames\\\":[\\\"id\\\"],\\\"columns\\\":[{\\\"name\\\":\\\"id\\\",\\\"jdbcType\\\":4,\\\"typeName\\\":\\\"INT\\\",\\\"typeExpression\\\":\\\"INT\\\",\\\"charsetName\\\":null,\\\"position\\\":1,\\\"optional\\\":false,\\\"autoIncremented\\\":false,\\\"generated\\\":false},{\\\"name\\\":\\\"name\\\",\\\"jdbcType\\\":12,\\\"typeName\\\":\\\"VARCHAR\\\",\\\"typeExpression\\\":\\\"VARCHAR\\\",\\\"charsetName\\\":\\\"utf8mb4\\\",\\\"length\\\":128,\\\"position\\\":2,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false},{\\\"name\\\":\\\"dt\\\",\\\"jdbcType\\\":91,\\\"typeName\\\":\\\"DATE\\\",\\\"typeExpression\\\":\\\"DATE\\\",\\\"charsetName\\\":null,\\\"position\\\":3,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false},{\\\"name\\\":\\\"dtime\\\",\\\"jdbcType\\\":93,\\\"typeName\\\":\\\"DATETIME\\\",\\\"typeExpression\\\":\\\"DATETIME\\\",\\\"charsetName\\\":null,\\\"position\\\":4,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false},{\\\"name\\\":\\\"ts\\\",\\\"jdbcType\\\":2014,\\\"typeName\\\":\\\"TIMESTAMP\\\",\\\"typeExpression\\\":\\\"TIMESTAMP\\\",\\\"charsetName
\\\":null,\\\"position\\\":5,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false}]}}]}\"}";
+ JsonNode recordRoot = objectMapper.readTree(ddl);
+ boolean flag = schemaChange.schemaChange(recordRoot);
+ Assert.assertEquals(true, flag);
+
+ Field targetField = getField("c_1");
+ Assert.assertNull(targetField);
+ }
+
+    /**
+     * Requests a schema through {@code RestService.getSchema} using {@code dorisOptions} and returns
+     * the first field whose name equals {@code column}, or null when no field matches.
+     */
+    private static Field getField(String column) throws DorisException {
+        Schema schema =
+                RestService.getSchema(dorisOptions, DorisReadOptions.builder().build(), LOG);
+        List<Field> fields = schema.getProperties();
+        for (Field candidate : fields) {
+            if (column.equals(candidate.getName())) {
+                return candidate;
+            }
+        }
+        return null;
+    }
+}
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestJsonDebeziumSchemaChangeImplV2.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestJsonDebeziumSchemaChangeImplV2.java
new file mode 100644
index 0000000..adc87b1
--- /dev/null
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestJsonDebeziumSchemaChangeImplV2.java
@@ -0,0 +1,268 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.flink.sink.writer.serializer.jsondebezium;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.google.common.collect.Maps;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.doris.flink.catalog.doris.FieldSchema;
+import org.apache.doris.flink.catalog.doris.TableSchema;
+import org.apache.doris.flink.tools.cdc.SourceConnector;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+/** Test for JsonDebeziumSchemaChangeImplV2. */
+public class TestJsonDebeziumSchemaChangeImplV2 extends TestJsonDebeziumChangeBase {
+
+ private JsonDebeziumSchemaChangeImplV2 schemaChange;
+ private JsonDebeziumChangeContext changeContext;
+
+ @Before
+ @Override
+ public void setUp() {
+     // Run the base-class set-up first; it presumably initializes the inherited fixtures used below.
+     super.setUp();
+     String sourceTableName = null;
+     String targetDatabase = "TESTDB";
+     Map<String, String> tableProperties = new HashMap<>();
+     changeContext =
+             new JsonDebeziumChangeContext(
+                     dorisOptions, tableMapping, sourceTableName, targetDatabase,
+                     tableProperties,
+                     objectMapper,
+                     null, // seventh constructor argument is deliberately left unset
+                     lineDelimiter,
+                     ignoreUpdateBefore);
+     // JUnit runs @Before ahead of every test, so each test gets its own schemaChange instance.
+     schemaChange = new JsonDebeziumSchemaChangeImplV2(changeContext);
+ }
+
+ @Test // Feeds a MySQL ALTER event to extractDDLList and compares the generated statements, in order, with srcSqlList.
+ public void testExtractDDLListMultipleColumns() throws IOException {
+ String sql0 = "ALTER TABLE test.t1 ADD COLUMN id INT DEFAULT '10000'";
+ String sql1 = "ALTER TABLE test.t1 ADD COLUMN c199 INT";
+ String sql2 = "ALTER TABLE test.t1 ADD COLUMN c12 INT DEFAULT '100'";
+ String sql3 = "ALTER TABLE test.t1 DROP COLUMN c13";
+ List<String> srcSqlList = Arrays.asList(sql0, sql1, sql2, sql3);
+
+ Map<String, FieldSchema> originFiledSchemaMap = new LinkedHashMap<>(); // origin schema handed to schemaChange before extraction
+ originFiledSchemaMap.put("c2", new FieldSchema());
+ originFiledSchemaMap.put("c555", new FieldSchema());
+ originFiledSchemaMap.put("c666", new FieldSchema());
+ originFiledSchemaMap.put("c4", new FieldSchema());
+ originFiledSchemaMap.put("c13", new FieldSchema());
+
+ String record =
+ "{\"source\":{\"version\":\"1.9.7.Final\",\"connector\":\"mysql\",\"name\":\"mysql_binlog_source\",\"ts_ms\":1691033764674,\"snapshot\":\"false\",\"db\":\"test\",\"sequence\":null,\"table\":\"t1\",\"server_id\":1,\"gtid\":null,\"file\":\"binlog.000029\",\"pos\":23305,\"row\":0,\"thread\":null,\"query\":null},\"historyRecord\":\"{\\\"source\\\":{\\\"file\\\":\\\"binlog.000029\\\",\\\"pos\\\":23305,\\\"server_id\\\":1},\\\"position\\\":{\\\"transaction_id\\\":null,\\\"ts_sec\\\":1691033764,\\\"file\\\":\\\"binlog.000029\\\",\\\"pos\\\":23464,\\\"server_id\\\":1},\\\"databaseName\\\":\\\"test\\\",\\\"ddl\\\":\\\"alter table t1 drop c11, drop column c3, add c12 int default 100\\\",\\\"tableChanges\\\":[{\\\"type\\\":\\\"ALTER\\\",\\\"id\\\":\\\"\\\\\\\"test\\\\\\\".\\\\\\\"t1\\\\\\\"\\\",\\\"table\\\":{\\\"defaultCharsetName\\\":\\\"utf8mb4\\\",\\\"primaryKeyColumnNames\\\":[\\\"id\\\"],\\\"columns\\\":[{\\\"name\\\":\\\"id\\\",\\\"jdbcType\\\":4,\\\"typeName\\\":\\\"INT\\\",\\\"typeExpression\\\":\\\"INT\\\",\\\"charsetName\\\":null,\\\"position\\\":1,\\\"optional\\\":false,\\\"autoIncremented\\\":false,\\\"generated\\\":false,\\\"comment\\\":null,\\\"hasDefaultValue\\\":true,\\\"defaultValueExpression\\\":\\\"10000\\\",\\\"enumValues\\\":[]},{\\\"name\\\":\\\"c2\\\",\\\"jdbcType\\\":4,\\\"typeName\\\":\\\"INT\\\",\\\"typeExpression\\\":\\\"INT\\\",\\\"charsetName\\\":null,\\\"position\\\":2,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false,\\\"comment\\\":null,\\\"hasDefaultValue\\\":true,\\\"enumValues\\\":[]},{\\\"name\\\":\\\"c555\\\",\\\"jdbcType\\\":12,\\\"typeName\\\":\\\"VARCHAR\\\",\\\"typeExpression\\\":\\\"VARCHAR\\\",\\\"charsetName\\\":\\\"utf8mb4\\\",\\\"length\\\":100,\\\"position\\\":3,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false,\\\"comment\\\":null,\\\"hasDefaultValue\\\":true,\\\"enumValues\\\":[]},{\\\"name\\\":\\\"c666\\\",\\\"jdbcType\\\":4,\\\"typeName\\\":\\\"INT\\\",\\\"typeExpression\\
\":\\\"INT\\\",\\\"charsetName\\\":null,\\\"position\\\":4,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false,\\\"comment\\\":null,\\\"hasDefaultValue\\\":true,\\\"defaultValueExpression\\\":\\\"100\\\",\\\"enumValues\\\":[]},{\\\"name\\\":\\\"c4\\\",\\\"jdbcType\\\":-5,\\\"typeName\\\":\\\"BIGINT\\\",\\\"typeExpression\\\":\\\"BIGINT\\\",\\\"charsetName\\\":null,\\\"position\\\":5,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false,\\\"comment\\\":null,\\\"hasDefaultValue\\\":true,\\\"defaultValueExpression\\\":\\\"555\\\",\\\"enumValues\\\":[]},{\\\"name\\\":\\\"c199\\\",\\\"jdbcType\\\":4,\\\"typeName\\\":\\\"INT\\\",\\\"typeExpression\\\":\\\"INT\\\",\\\"charsetName\\\":null,\\\"position\\\":6,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false,\\\"comment\\\":null,\\\"hasDefaultValue\\\":true,\\\"enumValues\\\":[]},{\\\"name\\\":\\\"c12\\\",\\\"jdbcType\\\":4,\\\"typeName\\\":\\\"INT\\\",\\\"typeExpression\\\":\\\"INT\\\",\\\"charsetName\\\":null,\\\"position\\\":7,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false,\\\"comment\\\":null,\\\"hasDefaultValue\\\":true,\\\"defaultValueExpression\\\":\\\"100\\\",\\\"enumValues\\\":[]}]},\\\"comment\\\":null}]}\"}";
+ JsonNode recordRoot = objectMapper.readTree(record);
+ schemaChange.setOriginFieldSchemaMap(originFiledSchemaMap);
+ List<String> ddlSQLList = schemaChange.extractDDLList(recordRoot);
+ for (int i = 0; i < ddlSQLList.size(); i++) { // NOTE(review): bounded by the result size only; the two list sizes are never compared
+ String srcSQL = srcSqlList.get(i);
+ String targetSQL = ddlSQLList.get(i);
+ Assert.assertEquals(srcSQL, targetSQL);
+ }
+ }
+
+ @Test // An Oracle CREATE TABLE event is expected to yield no DDL statements (null or empty list) from extractDDLList.
+ public void testExtractDDLListCreateTable() throws IOException {
+ String record =
+ "{\"source\":{\"version\":\"1.9.7.Final\",\"connector\":\"oracle\",\"name\":\"oracle_logminer\",\"ts_ms\":1696945825065,\"snapshot\":\"true\",\"db\":\"HELOWIN\",\"sequence\":null,\"schema\":\"ADMIN\",\"table\":\"PERSONS\",\"txId\":null,\"scn\":\"1199617\",\"commit_scn\":null,\"lcr_position\":null,\"rs_id\":null,\"ssn\":0,\"redo_thread\":null},\"databaseName\":\"HELOWIN\",\"schemaName\":\"ADMIN\",\"ddl\":\"\\n CREATE TABLE \\\"ADMIN\\\".\\\"PERSONS\\\" \\n (\\t\\\"ID\\\" NUMBER(10,0), \\n\\t\\\"NAME4\\\" VARCHAR2(128) NOT NULL ENABLE, \\n\\t\\\"age4\\\" VARCHAR2(128), \\n\\t\\\"c100\\\" LONG, \\n\\t\\\"c55\\\" VARCHAR2(255), \\n\\t\\\"c77\\\" VARCHAR2(255), \\n\\t PRIMARY KEY (\\\"ID\\\") ENABLE\\n ) ;\\n \",\"tableChanges\":[{\"type\":\"CREATE\",\"id\":\"\\\"HELOWIN\\\".\\\"ADMIN\\\".\\\"PERSONS\\\"\",\"table\":{\"defaultCharsetName\":null,\"primaryKeyColumnNames\":[\"ID\"],\"columns\":[{\"name\":\"ID\",\"jdbcType\":2,\"nativeType\":null,\"typeName\":\"NUMBER\",\"typeExpression\":\"NUMBER\",\"charsetName\":null,\"length\":10,\"scale\":0,\"position\":1,\"optional\":false,\"autoIncremented\":false,\"generated\":false,\"comment\":null},{\"name\":\"NAME4\",\"jdbcType\":12,\"nativeType\":null,\"typeName\":\"VARCHAR2\",\"typeExpression\":\"VARCHAR2\",\"charsetName\":null,\"length\":128,\"scale\":null,\"position\":2,\"optional\":false,\"autoIncremented\":false,\"generated\":false,\"comment\":null},{\"name\":\"age4\",\"jdbcType\":12,\"nativeType\":null,\"typeName\":\"VARCHAR2\",\"typeExpression\":\"VARCHAR2\",\"charsetName\":null,\"length\":128,\"scale\":null,\"position\":3,\"optional\":true,\"autoIncremented\":false,\"generated\":false,\"comment\":null},{\"name\":\"c100\",\"jdbcType\":-1,\"nativeType\":null,\"typeName\":\"LONG\",\"typeExpression\":\"LONG\",\"charsetName\":null,\"length\":0,\"scale\":null,\"position\":4,\"optional\":true,\"autoIncremented\":false,\"generated\":false,\"comment\":null},{\"name\":\"c55\",\"jdbcType\":12,\"nativeType\":null,\"typeName\":\"VAR
CHAR2\",\"typeExpression\":\"VARCHAR2\",\"charsetName\":null,\"length\":255,\"scale\":null,\"position\":5,\"optional\":true,\"autoIncremented\":false,\"generated\":false,\"comment\":null},{\"name\":\"c77\",\"jdbcType\":12,\"nativeType\":null,\"typeName\":\"VARCHAR2\",\"typeExpression\":\"VARCHAR2\",\"charsetName\":null,\"length\":255,\"scale\":null,\"position\":6,\"optional\":true,\"autoIncremented\":false,\"generated\":false,\"comment\":null}],\"comment\":null}}]}";
+ JsonNode recordRoot = objectMapper.readTree(record);
+ schemaChange.setSourceConnector("oracle"); // match the connector named in the payload's source block
+ List<String> ddlSQLList = schemaChange.extractDDLList(recordRoot);
+ Assert.assertTrue(CollectionUtils.isEmpty(ddlSQLList));
+ schemaChange.setSourceConnector("mysql"); // switch the connector back to mysql once the assertion has passed
+ }
+
+ @Test
+ public void testExtractDDLListTruncateTable() throws IOException {
+     // MySQL history record for "truncate table"; its tableChanges array is empty.
+     String truncateEvent =
+             "{\"source\":{\"version\":\"1.9.7.Final\",\"connector\":\"mysql\",\"name\":\"mysql_binlog_source\",\"ts_ms\":1696944601264,\"snapshot\":\"false\",\"db\":\"test\",\"sequence\":null,\"table\":\"test_sink11\",\"server_id\":1,\"gtid\":null,\"file\":\"binlog.000043\",\"pos\":5719,\"row\":0,\"thread\":null,\"query\":null},\"historyRecord\":\"{\\\"source\\\":{\\\"file\\\":\\\"binlog.000043\\\",\\\"pos\\\":5719,\\\"server_id\\\":1},\\\"position\\\":{\\\"transaction_id\\\":null,\\\"ts_sec\\\":1696944601,\\\"file\\\":\\\"binlog.000043\\\",\\\"pos\\\":5824,\\\"server_id\\\":1},\\\"databaseName\\\":\\\"test\\\",\\\"ddl\\\":\\\"truncate table test_sink11\\\",\\\"tableChanges\\\":[]}\"}";
+     List<String> extracted = schemaChange.extractDDLList(objectMapper.readTree(truncateEvent));
+     Assert.assertTrue(CollectionUtils.isEmpty(extracted)); // no DDL is expected for a truncate
+ }
+
+ @Test
+ public void testExtractDDLListDropTable() throws IOException {
+     // MySQL history record for "DROP TABLE"; its tableChanges array is empty.
+     String dropEvent =
+             "{\"source\":{\"version\":\"1.9.7.Final\",\"connector\":\"mysql\",\"name\":\"mysql_binlog_source\",\"ts_ms\":1696944747956,\"snapshot\":\"false\",\"db\":\"test\",\"sequence\":null,\"table\":\"test_sink11\",\"server_id\":1,\"gtid\":null,\"file\":\"binlog.000043\",\"pos\":5901,\"row\":0,\"thread\":null,\"query\":null},\"historyRecord\":\"{\\\"source\\\":{\\\"file\\\":\\\"binlog.000043\\\",\\\"pos\\\":5901,\\\"server_id\\\":1},\\\"position\\\":{\\\"transaction_id\\\":null,\\\"ts_sec\\\":1696944747,\\\"file\\\":\\\"binlog.000043\\\",\\\"pos\\\":6037,\\\"server_id\\\":1},\\\"databaseName\\\":\\\"test\\\",\\\"ddl\\\":\\\"DROP TABLE `test`.`test_sink11`\\\",\\\"tableChanges\\\":[]}\"}";
+     List<String> extracted = schemaChange.extractDDLList(objectMapper.readTree(dropEvent));
+     Assert.assertTrue(CollectionUtils.isEmpty(extracted)); // no DDL is expected for a drop table
+ }
+
+ @Test // A MySQL "change column" ALTER event is expected to yield no DDL statements (null or empty list).
+ public void testExtractDDLListChangeColumn() throws IOException {
+ String record =
+ "{\"source\":{\"version\":\"1.9.7.Final\",\"connector\":\"mysql\",\"name\":\"mysql_binlog_source\",\"ts_ms\":1696945030603,\"snapshot\":\"false\",\"db\":\"test\",\"sequence\":null,\"table\":\"test_sink\",\"server_id\":1,\"gtid\":null,\"file\":\"binlog.000043\",\"pos\":6521,\"row\":0,\"thread\":null,\"query\":null},\"historyRecord\":\"{\\\"source\\\":{\\\"file\\\":\\\"binlog.000043\\\",\\\"pos\\\":6521,\\\"server_id\\\":1},\\\"position\\\":{\\\"transaction_id\\\":null,\\\"ts_sec\\\":1696945030,\\\"file\\\":\\\"binlog.000043\\\",\\\"pos\\\":6661,\\\"server_id\\\":1},\\\"databaseName\\\":\\\"test\\\",\\\"ddl\\\":\\\"alter table test_sink change column c555 c777 bigint\\\",\\\"tableChanges\\\":[{\\\"type\\\":\\\"ALTER\\\",\\\"id\\\":\\\"\\\\\\\"test\\\\\\\".\\\\\\\"test_sink\\\\\\\"\\\",\\\"table\\\":{\\\"defaultCharsetName\\\":\\\"utf8mb4\\\",\\\"primaryKeyColumnNames\\\":[\\\"id\\\"],\\\"columns\\\":[{\\\"name\\\":\\\"id\\\",\\\"jdbcType\\\":4,\\\"typeName\\\":\\\"INT\\\",\\\"typeExpression\\\":\\\"INT\\\",\\\"charsetName\\\":null,\\\"position\\\":1,\\\"optional\\\":false,\\\"autoIncremented\\\":false,\\\"generated\\\":false,\\\"comment\\\":null,\\\"hasDefaultValue\\\":true,\\\"defaultValueExpression\\\":\\\"10000\\\",\\\"enumValues\\\":[]},{\\\"name\\\":\\\"c2\\\",\\\"jdbcType\\\":4,\\\"typeName\\\":\\\"INT\\\",\\\"typeExpression\\\":\\\"INT\\\",\\\"charsetName\\\":null,\\\"position\\\":2,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false,\\\"comment\\\":null,\\\"hasDefaultValue\\\":true,\\\"enumValues\\\":[]},{\\\"name\\\":\\\"c777\\\",\\\"jdbcType\\\":-5,\\\"typeName\\\":\\\"BIGINT\\\",\\\"typeExpression\\\":\\\"BIGINT\\\",\\\"charsetName\\\":null,\\\"length\\\":100,\\\"position\\\":3,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false,\\\"comment\\\":null,\\\"hasDefaultValue\\\":true,\\\"enumValues\\\":[]}]},\\\"comment\\\":null}]}\"}";
+ JsonNode recordRoot = objectMapper.readTree(record);
+ List<String> ddlSQLList = schemaChange.extractDDLList(recordRoot);
+ Assert.assertTrue(CollectionUtils.isEmpty(ddlSQLList));
+ }
+
+ @Test // A MySQL "modify column" ALTER event is expected to yield no DDL statements (null or empty list).
+ public void testExtractDDLListModifyColumn() throws IOException {
+ String record =
+ "{\"source\":{\"version\":\"1.9.7.Final\",\"connector\":\"mysql\",\"name\":\"mysql_binlog_source\",\"ts_ms\":1696945306941,\"snapshot\":\"false\",\"db\":\"test\",\"sequence\":null,\"table\":\"test_sink\",\"server_id\":1,\"gtid\":null,\"file\":\"binlog.000043\",\"pos\":6738,\"row\":0,\"thread\":null,\"query\":null},\"historyRecord\":\"{\\\"source\\\":{\\\"file\\\":\\\"binlog.000043\\\",\\\"pos\\\":6738,\\\"server_id\\\":1},\\\"position\\\":{\\\"transaction_id\\\":null,\\\"ts_sec\\\":1696945306,\\\"file\\\":\\\"binlog.000043\\\",\\\"pos\\\":6884,\\\"server_id\\\":1},\\\"databaseName\\\":\\\"test\\\",\\\"ddl\\\":\\\"alter table test_sink modify column c777 tinyint default 7\\\",\\\"tableChanges\\\":[{\\\"type\\\":\\\"ALTER\\\",\\\"id\\\":\\\"\\\\\\\"test\\\\\\\".\\\\\\\"test_sink\\\\\\\"\\\",\\\"table\\\":{\\\"defaultCharsetName\\\":\\\"utf8mb4\\\",\\\"primaryKeyColumnNames\\\":[\\\"id\\\"],\\\"columns\\\":[{\\\"name\\\":\\\"id\\\",\\\"jdbcType\\\":4,\\\"typeName\\\":\\\"INT\\\",\\\"typeExpression\\\":\\\"INT\\\",\\\"charsetName\\\":null,\\\"position\\\":1,\\\"optional\\\":false,\\\"autoIncremented\\\":false,\\\"generated\\\":false,\\\"comment\\\":null,\\\"hasDefaultValue\\\":true,\\\"defaultValueExpression\\\":\\\"10000\\\",\\\"enumValues\\\":[]},{\\\"name\\\":\\\"c2\\\",\\\"jdbcType\\\":4,\\\"typeName\\\":\\\"INT\\\",\\\"typeExpression\\\":\\\"INT\\\",\\\"charsetName\\\":null,\\\"position\\\":2,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false,\\\"comment\\\":null,\\\"hasDefaultValue\\\":true,\\\"enumValues\\\":[]},{\\\"name\\\":\\\"c777\\\",\\\"jdbcType\\\":5,\\\"typeName\\\":\\\"TINYINT\\\",\\\"typeExpression\\\":\\\"TINYINT\\\",\\\"charsetName\\\":null,\\\"length\\\":100,\\\"position\\\":3,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false,\\\"comment\\\":null,\\\"hasDefaultValue\\\":true,\\\"defaultValueExpression\\\":\\\"7\\\",\\\"enumValues\\\":[]}]},\\\"comment\\\":null}]}\"}";
+ JsonNode recordRoot = objectMapper.readTree(record);
+ List<String> ddlSQLList = schemaChange.extractDDLList(recordRoot);
+ Assert.assertTrue(CollectionUtils.isEmpty(ddlSQLList));
+ }
+
+ @Test // A "rename column" event, with no origin field schema set by this test, is expected to yield no DDL statements.
+ public void testExtractDDLListRenameColumn() throws IOException {
+ String record =
+ "{\"source\":{\"version\":\"1.9.7.Final\",\"connector\":\"mysql\",\"name\":\"mysql_binlog_source\",\"ts_ms\":1691034519226,\"snapshot\":\"false\",\"db\":\"test\",\"sequence\":null,\"table\":\"t1\",\"server_id\":1,\"gtid\":null,\"file\":\"binlog.000029\",\"pos\":23752,\"row\":0,\"thread\":null,\"query\":null},\"historyRecord\":\"{\\\"source\\\":{\\\"file\\\":\\\"binlog.000029\\\",\\\"pos\\\":23752,\\\"server_id\\\":1},\\\"position\\\":{\\\"transaction_id\\\":null,\\\"ts_sec\\\":1691034519,\\\"file\\\":\\\"binlog.000029\\\",\\\"pos\\\":23886,\\\"server_id\\\":1},\\\"databaseName\\\":\\\"test\\\",\\\"ddl\\\":\\\"alter table t1 rename column c22 to c33\\\",\\\"tableChanges\\\":[{\\\"type\\\":\\\"ALTER\\\",\\\"id\\\":\\\"\\\\\\\"test\\\\\\\".\\\\\\\"t1\\\\\\\"\\\",\\\"table\\\":{\\\"defaultCharsetName\\\":\\\"utf8mb4\\\",\\\"primaryKeyColumnNames\\\":[\\\"id\\\"],\\\"columns\\\":[{\\\"name\\\":\\\"id\\\",\\\"jdbcType\\\":4,\\\"typeName\\\":\\\"INT\\\",\\\"typeExpression\\\":\\\"INT\\\",\\\"charsetName\\\":null,\\\"position\\\":1,\\\"optional\\\":false,\\\"autoIncremented\\\":false,\\\"generated\\\":false,\\\"comment\\\":null,\\\"hasDefaultValue\\\":true,\\\"defaultValueExpression\\\":\\\"10000\\\",\\\"enumValues\\\":[]},{\\\"name\\\":\\\"c2\\\",\\\"jdbcType\\\":4,\\\"typeName\\\":\\\"INT\\\",\\\"typeExpression\\\":\\\"INT\\\",\\\"charsetName\\\":null,\\\"position\\\":2,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false,\\\"comment\\\":null,\\\"hasDefaultValue\\\":true,\\\"enumValues\\\":[]},{\\\"name\\\":\\\"c555\\\",\\\"jdbcType\\\":12,\\\"typeName\\\":\\\"VARCHAR\\\",\\\"typeExpression\\\":\\\"VARCHAR\\\",\\\"charsetName\\\":\\\"utf8mb4\\\",\\\"length\\\":100,\\\"position\\\":3,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false,\\\"comment\\\":null,\\\"hasDefaultValue\\\":true,\\\"enumValues\\\":[]},{\\\"name\\\":\\\"c666\\\",\\\"jdbcType\\\":4,\\\"typeName\\\":\\\"INT\\\",\\\"typeExpression\\\":\\\"INT\\\",\\\"charse
tName\\\":null,\\\"position\\\":4,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false,\\\"comment\\\":null,\\\"hasDefaultValue\\\":true,\\\"defaultValueExpression\\\":\\\"100\\\",\\\"enumValues\\\":[]},{\\\"name\\\":\\\"c4\\\",\\\"jdbcType\\\":-5,\\\"typeName\\\":\\\"BIGINT\\\",\\\"typeExpression\\\":\\\"BIGINT\\\",\\\"charsetName\\\":null,\\\"position\\\":5,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false,\\\"comment\\\":null,\\\"hasDefaultValue\\\":true,\\\"defaultValueExpression\\\":\\\"555\\\",\\\"enumValues\\\":[]},{\\\"name\\\":\\\"c199\\\",\\\"jdbcType\\\":4,\\\"typeName\\\":\\\"INT\\\",\\\"typeExpression\\\":\\\"INT\\\",\\\"charsetName\\\":null,\\\"position\\\":6,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false,\\\"comment\\\":null,\\\"hasDefaultValue\\\":true,\\\"enumValues\\\":[]},{\\\"name\\\":\\\"c33\\\",\\\"jdbcType\\\":4,\\\"typeName\\\":\\\"INT\\\",\\\"typeExpression\\\":\\\"INT\\\",\\\"charsetName\\\":null,\\\"position\\\":7,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false,\\\"comment\\\":null,\\\"hasDefaultValue\\\":true,\\\"defaultValueExpression\\\":\\\"100\\\",\\\"enumValues\\\":[]}]},\\\"comment\\\":null}]}\"}";
+ JsonNode recordRoot = objectMapper.readTree(record);
+ List<String> ddlSQLList = schemaChange.extractDDLList(recordRoot);
+ Assert.assertTrue(CollectionUtils.isEmpty(ddlSQLList));
+ }
+
+ @Test // Compares what fillOriginSchema stores, entry by entry in iteration order, with the expected schemas built below.
+ public void testFillOriginSchema() throws IOException {
+ Map<String, FieldSchema> srcFiledSchemaMap = new LinkedHashMap<>(); // expected entries; insertion order matters for the comparison
+ srcFiledSchemaMap.put("id", new FieldSchema("id", "INT", null, null));
+ srcFiledSchemaMap.put("name", new FieldSchema("name", "VARCHAR(150)", null, null));
+ srcFiledSchemaMap.put(
+ "test_time", new FieldSchema("test_time", "DATETIMEV2(0)", null, null));
+ srcFiledSchemaMap.put("c1", new FieldSchema("c1", "INT", "'100'", null));
+
+ schemaChange.setSourceConnector("mysql");
+ String columnsString =
+ "[{\"name\":\"id\",\"jdbcType\":4,\"typeName\":\"INT\",\"typeExpression\":\"INT\",\"charsetName\":null,\"position\":1,\"optional\":false,\"autoIncremented\":false,\"generated\":false,\"comment\":null,\"hasDefaultValue\":false,\"enumValues\":[]},{\"name\":\"name\",\"jdbcType\":12,\"typeName\":\"VARCHAR\",\"typeExpression\":\"VARCHAR\",\"charsetName\":\"utf8mb4\",\"length\":50,\"position\":2,\"optional\":true,\"autoIncremented\":false,\"generated\":false,\"comment\":null,\"hasDefaultValue\":true,\"enumValues\":[]},{\"name\":\"test_time\",\"jdbcType\":93,\"typeName\":\"DATETIME\",\"typeExpression\":\"DATETIME\",\"charsetName\":null,\"position\":3,\"optional\":true,\"autoIncremented\":false,\"generated\":false,\"comment\":null,\"hasDefaultValue\":true,\"enumValues\":[]},{\"name\":\"c1\",\"jdbcType\":4,\"typeName\":\"INT\",\"typeExpression\":\"INT\",\"charsetName\":null,\"position\":4,\"optional\":true,\"autoIncremented\":false,\"generated\":false,\"comment\":null,\"hasDefaultValue\":true,\"defaultValueExpression\":\"100\",\"enumValues\":[]},{\"name\":\"cc\",\"jdbcType\":4,\"typeName\":\"INT\",\"typeExpression\":\"INT\",\"charsetName\":null,\"position\":5,\"optional\":true,\"autoIncremented\":false,\"generated\":false,\"comment\":null,\"hasDefaultValue\":true,\"defaultValueExpression\":\"100\",\"enumValues\":[]}]";
+ JsonNode columns = objectMapper.readTree(columnsString);
+ schemaChange.fillOriginSchema(columns);
+ Map<String, FieldSchema> originFieldSchemaMap = schemaChange.getOriginFieldSchemaMap();
+
+ Iterator<Entry<String, FieldSchema>> originFieldSchemaIterator =
+ originFieldSchemaMap.entrySet().iterator();
+ for (Entry<String, FieldSchema> entry : srcFiledSchemaMap.entrySet()) { // walks the four expected entries only; the payload's fifth column ("cc") is never checked
+ FieldSchema srcFiledSchema = entry.getValue();
+ Entry<String, FieldSchema> originField = originFieldSchemaIterator.next();
+
+ Assert.assertEquals(entry.getKey(), originField.getKey());
+ Assert.assertEquals(srcFiledSchema.getName(), originField.getValue().getName());
+ Assert.assertEquals(
+ srcFiledSchema.getTypeString(), originField.getValue().getTypeString());
+ Assert.assertEquals(
+ srcFiledSchema.getDefaultValue(), originField.getValue().getDefaultValue());
+ Assert.assertEquals(srcFiledSchema.getComment(), originField.getValue().getComment());
+ }
+ }
+
+ @Test
+ public void testBuildMysql2DorisTypeName() throws IOException {
+     // Debezium column descriptor whose typeName is INT; the mysql connector is selected below.
+     String columnInfo =
+             "{\"name\":\"id\",\"jdbcType\":4,\"typeName\":\"INT\",\"typeExpression\":\"INT\",\"charsetName\":null,\"position\":1,\"optional\":false,\"autoIncremented\":false,\"generated\":false,\"comment\":null,\"hasDefaultValue\":true,\"defaultValueExpression\":\"10\",\"enumValues\":[]}";
+     schemaChange.setSourceConnector("mysql");
+     String dorisTypeName = schemaChange.buildDorisTypeName(objectMapper.readTree(columnInfo));
+     Assert.assertEquals("INT", dorisTypeName); // JUnit order is (expected, actual)
+ }
+
+ @Test
+ public void testBuildOracle2DorisTypeName() throws IOException {
+     // Debezium column descriptor for a VARCHAR2 column of length 128; the oracle connector is selected below.
+     String columnInfo =
+             "{\"name\":\"NAME\",\"jdbcType\":12,\"nativeType\":null,\"typeName\":\"VARCHAR2\",\"typeExpression\":\"VARCHAR2\",\"charsetName\":null,\"length\":128,\"scale\":null,\"position\":2,\"optional\":false,\"autoIncremented\":false,\"generated\":false,\"comment\":null}";
+     schemaChange.setSourceConnector("oracle");
+     String dorisTypeName = schemaChange.buildDorisTypeName(objectMapper.readTree(columnInfo));
+     Assert.assertEquals("VARCHAR(384)", dorisTypeName); // JUnit order is (expected, actual)
+ }
+
+ @Test // With c3 present in the origin schema, renaming c3 to c333 is expected to give a RENAME COLUMN statement as the first DDL.
+ public void testExtractDDLListRename() throws IOException {
+ String columnInfo =
+ "{\"source\":{\"version\":\"1.9.7.Final\",\"connector\":\"mysql\",\"name\":\"mysql_binlog_source\",\"ts_ms\":1698314781975,\"snapshot\":\"false\",\"db\":\"test\",\"sequence\":null,\"table\":\"t1\",\"server_id\":1,\"gtid\":null,\"file\":\"binlog.000046\",\"pos\":5197,\"row\":0,\"thread\":null,\"query\":null},\"historyRecord\":\"{\\\"source\\\":{\\\"file\\\":\\\"binlog.000046\\\",\\\"pos\\\":5197,\\\"server_id\\\":1},\\\"position\\\":{\\\"transaction_id\\\":null,\\\"ts_sec\\\":1698314781,\\\"file\\\":\\\"binlog.000046\\\",\\\"pos\\\":5331,\\\"server_id\\\":1},\\\"databaseName\\\":\\\"test\\\",\\\"ddl\\\":\\\"alter table t1 rename column c3 to c333\\\",\\\"tableChanges\\\":[{\\\"type\\\":\\\"ALTER\\\",\\\"id\\\":\\\"\\\\\\\"test\\\\\\\".\\\\\\\"t1\\\\\\\"\\\",\\\"table\\\":{\\\"defaultCharsetName\\\":\\\"utf8mb4\\\",\\\"primaryKeyColumnNames\\\":[\\\"id\\\"],\\\"columns\\\":[{\\\"name\\\":\\\"id\\\",\\\"jdbcType\\\":4,\\\"typeName\\\":\\\"INT\\\",\\\"typeExpression\\\":\\\"INT\\\",\\\"charsetName\\\":null,\\\"position\\\":1,\\\"optional\\\":false,\\\"autoIncremented\\\":false,\\\"generated\\\":false,\\\"comment\\\":null,\\\"hasDefaultValue\\\":true,\\\"defaultValueExpression\\\":\\\"10000\\\",\\\"enumValues\\\":[]},{\\\"name\\\":\\\"c2\\\",\\\"jdbcType\\\":4,\\\"typeName\\\":\\\"INT\\\",\\\"typeExpression\\\":\\\"INT\\\",\\\"charsetName\\\":null,\\\"position\\\":2,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false,\\\"comment\\\":null,\\\"hasDefaultValue\\\":true,\\\"enumValues\\\":[]},{\\\"name\\\":\\\"c333\\\",\\\"jdbcType\\\":12,\\\"typeName\\\":\\\"VARCHAR\\\",\\\"typeExpression\\\":\\\"VARCHAR\\\",\\\"charsetName\\\":\\\"utf8mb4\\\",\\\"length\\\":10,\\\"position\\\":3,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false,\\\"comment\\\":null,\\\"hasDefaultValue\\\":true,\\\"enumValues\\\":[]}]},\\\"comment\\\":null}]}\"}";
+ Map<String, FieldSchema> originFieldSchemaMap = Maps.newHashMap();
+ JsonNode record = objectMapper.readTree(columnInfo);
+ schemaChange.setSourceConnector("mysql");
+
+ originFieldSchemaMap.put("id", new FieldSchema("id", "INT", "", ""));
+ originFieldSchemaMap.put("c2", new FieldSchema("c2", "INT", "", ""));
+ originFieldSchemaMap.put("c3", new FieldSchema("c3", "VARCHAR(30)", "", ""));
+ schemaChange.setOriginFieldSchemaMap(originFieldSchemaMap);
+
+ List<String> ddlList = schemaChange.extractDDLList(record);
+ Assert.assertEquals("ALTER TABLE test.t1 RENAME COLUMN c3 c333", ddlList.get(0)); // only the first statement is inspected
+ }
+
+ @Test // getCdcTableIdentifier is expected to return "db.table", or "db.schema.table" when source.schema is present.
+ public void testGetCdcTableIdentifier() throws Exception {
+ String insert = // NOTE(review): despite the variable name, the payload below carries "op":"d"
+ "{\"before\":{\"id\":1,\"name\":\"doris-update\",\"dt\":\"2022-01-01\",\"dtime\":\"2022-01-01 10:01:02\",\"ts\":\"2022-01-01 10:01:03\"},\"after\":null,\"source\":{\"version\":\"1.5.4.Final\",\"connector\":\"mysql\",\"name\":\"mysql_binlog_source\",\"ts_ms\":1663924328000,\"snapshot\":\"false\",\"db\":\"test\",\"sequence\":null,\"table\":\"t1\",\"server_id\":1,\"gtid\":null,\"file\":\"binlog.000006\",\"pos\":12500,\"row\":0,\"thread\":null,\"query\":null},\"op\":\"d\",\"ts_ms\":1663924328869,\"transaction\":null}";
+ JsonNode recordRoot = objectMapper.readTree(insert);
+ String identifier = schemaChange.getCdcTableIdentifier(recordRoot);
+ Assert.assertEquals("test.t1", identifier);
+
+ String insertSchema = // same record, but its source block also names a schema ("dbo")
+ "{\"before\":{\"id\":1,\"name\":\"doris-update\",\"dt\":\"2022-01-01\",\"dtime\":\"2022-01-01 10:01:02\",\"ts\":\"2022-01-01 10:01:03\"},\"after\":null,\"source\":{\"version\":\"1.5.4.Final\",\"connector\":\"mysql\",\"name\":\"mysql_binlog_source\",\"ts_ms\":1663924328000,\"snapshot\":\"false\",\"db\":\"test\",\"schema\":\"dbo\",\"table\":\"t1\",\"server_id\":1,\"gtid\":null,\"file\":\"binlog.000006\",\"pos\":12500,\"row\":0,\"thread\":null,\"query\":null},\"op\":\"d\",\"ts_ms\":1663924328869,\"transaction\":null}";
+ String identifierSchema =
+ schemaChange.getCdcTableIdentifier(objectMapper.readTree(insertSchema));
+ Assert.assertEquals("test.dbo.t1", identifierSchema);
+
+ String ddl = // schema-change (historyRecord) event for the same db and table
+ "{\"source\":{\"version\":\"1.5.4.Final\",\"connector\":\"mysql\",\"name\":\"mysql_binlog_source\",\"ts_ms\":1663924503565,\"snapshot\":\"false\",\"db\":\"test\",\"sequence\":null,\"table\":\"t1\",\"server_id\":1,\"gtid\":null,\"file\":\"binlog.000006\",\"pos\":13088,\"row\":0,\"thread\":null,\"query\":null},\"historyRecord\":\"{\\\"source\\\":{\\\"file\\\":\\\"binlog.000006\\\",\\\"pos\\\":13088,\\\"server_id\\\":1},\\\"position\\\":{\\\"transaction_id\\\":null,\\\"ts_sec\\\":1663924503,\\\"file\\\":\\\"binlog.000006\\\",\\\"pos\\\":13221,\\\"server_id\\\":1},\\\"databaseName\\\":\\\"test\\\",\\\"ddl\\\":\\\"alter table t1 add \\\\n c_1 varchar(200)\\\",\\\"tableChanges\\\":[{\\\"type\\\":\\\"ALTER\\\",\\\"id\\\":\\\"\\\\\\\"test\\\\\\\".\\\\\\\"t1\\\\\\\"\\\",\\\"table\\\":{\\\"defaultCharsetName\\\":\\\"utf8mb4\\\",\\\"primaryKeyColumnNames\\\":[\\\"id\\\"],\\\"columns\\\":[{\\\"name\\\":\\\"id\\\",\\\"jdbcType\\\":4,\\\"typeName\\\":\\\"INT\\\",\\\"typeExpression\\\":\\\"INT\\\",\\\"charsetName\\\":null,\\\"position\\\":1,\\\"optional\\\":false,\\\"autoIncremented\\\":false,\\\"generated\\\":false},{\\\"name\\\":\\\"name\\\",\\\"jdbcType\\\":12,\\\"typeName\\\":\\\"VARCHAR\\\",\\\"typeExpression\\\":\\\"VARCHAR\\\",\\\"charsetName\\\":\\\"utf8mb4\\\",\\\"length\\\":128,\\\"position\\\":2,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false},{\\\"name\\\":\\\"dt\\\",\\\"jdbcType\\\":91,\\\"typeName\\\":\\\"DATE\\\",\\\"typeExpression\\\":\\\"DATE\\\",\\\"charsetName\\\":null,\\\"position\\\":3,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false},{\\\"name\\\":\\\"dtime\\\",\\\"jdbcType\\\":93,\\\"typeName\\\":\\\"DATETIME\\\",\\\"typeExpression\\\":\\\"DATETIME\\\",\\\"charsetName\\\":null,\\\"position\\\":4,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false},{\\\"name\\\":\\\"ts\\\",\\\"jdbcType\\\":2014,\\\"typeName\\\":\\\"TIMESTAMP\\\",\\\"typeExpression\\\":\\\"TIMESTAMP\\\",\\\"charse
tName\\\":null,\\\"position\\\":5,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false},{\\\"name\\\":\\\"c_1\\\",\\\"jdbcType\\\":12,\\\"typeName\\\":\\\"VARCHAR\\\",\\\"typeExpression\\\":\\\"VARCHAR\\\",\\\"charsetName\\\":\\\"utf8mb4\\\",\\\"length\\\":200,\\\"position\\\":6,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false}]}}]}\"}";
+ String ddlRes = schemaChange.getCdcTableIdentifier(objectMapper.readTree(ddl));
+ Assert.assertEquals("test.t1", ddlRes);
+ }
+
+ @Test
+ public void testGetDorisTableIdentifier() throws Exception {
+     // Both the three-part and the two-part source identifier are expected to map to "test.t1".
+     Assert.assertEquals("test.t1", schemaChange.getDorisTableIdentifier("test.dbo.t1"));
+     Assert.assertEquals("test.t1", schemaChange.getDorisTableIdentifier("test.t1"));
+     String tmp = dorisOptions.getTableIdentifier();
+     dorisOptions.setTableIdentifier(null);
+     try {
+         Assert.assertNull(schemaChange.getDorisTableIdentifier("test.t1"));
+     } finally {
+         // Put the inherited options back even when the assertion above fails.
+         dorisOptions.setTableIdentifier(tmp);
+     }
+ }
+
+ @Test // extractCreateTableSchema on an Oracle CREATE event is checked for database, table, key column and the three fields.
+ public void testAutoCreateTable() throws Exception {
+ String record =
+ "{ \"source\":{ \"version\":\"1.9.7.Final\", \"connector\":\"oracle\", \"name\":\"oracle_logminer\", \"ts_ms\":1696945825065, \"snapshot\":\"true\", \"db\":\"TESTDB\", \"sequence\":null, \"schema\":\"ADMIN\", \"table\":\"PERSONS\", \"txId\":null, \"scn\":\"1199617\", \"commit_scn\":null, \"lcr_position\":null, \"rs_id\":null, \"ssn\":0, \"redo_thread\":null }, \"databaseName\":\"TESTDB\", \"schemaName\":\"ADMIN\", \"ddl\":\"\\n CREATE TABLE \\\"ADMIN\\\".\\\"PERSONS\\\" \\n (\\t\\\"ID\\\" NUMBER(10,0), \\n\\t\\\"NAME4\\\" VARCHAR2(128) NOT NULL ENABLE, \\n\\t\\\"age4\\\" VARCHAR2(128), \\n\\t PRIMARY KEY (\\\"ID\\\") ENABLE\\n ) ;\\n \", \"tableChanges\":[ { \"type\":\"CREATE\", \"id\":\"\\\"TESTDB\\\".\\\"ADMIN\\\".\\\"PERSONS\\\"\", \"table\":{ \"defaultCharsetName\":null, \"primaryKeyColumnNames\":[ \"ID\" ], \"columns\":[ { \"name\":\"ID\", \"jdbcType\":2, \"nativeType\":null, \"typeName\":\"NUMBER\", \"typeExpression\":\"NUMBER\", \"charsetName\":null, \"length\":10, \"scale\":0, \"position\":1, \"optional\":false, \"autoIncremented\":false, \"generated\":false, \"comment\":null }, { \"name\":\"NAME4\", \"jdbcType\":12, \"nativeType\":null, \"typeName\":\"VARCHAR2\", \"typeExpression\":\"VARCHAR2\", \"charsetName\":null, \"length\":128, \"scale\":null, \"position\":2, \"optional\":false, \"autoIncremented\":false, \"generated\":false, \"comment\":null }, { \"name\":\"age4\", \"jdbcType\":12, \"nativeType\":null, \"typeName\":\"VARCHAR2\", \"typeExpression\":\"VARCHAR2\", \"charsetName\":null, \"length\":128, \"scale\":null, \"position\":3, \"optional\":true, \"autoIncremented\":false, \"generated\":false, \"comment\":null } ], \"comment\":null } } ]}";
+ JsonNode recordRoot = objectMapper.readTree(record);
+ schemaChange.setSourceConnector(SourceConnector.ORACLE.connectorName); // match the connector named in the payload's source block
+ TableSchema tableSchema = schemaChange.extractCreateTableSchema(recordRoot);
+ Assert.assertEquals("TESTDB", tableSchema.getDatabase());
+ Assert.assertEquals("PERSONS", tableSchema.getTable());
+ Assert.assertArrayEquals(new String[] {"ID"}, tableSchema.getKeys().toArray());
+ Assert.assertEquals(3, tableSchema.getFields().size());
+ Assert.assertEquals("ID", tableSchema.getFields().get("ID").getName());
+ Assert.assertEquals("NAME4", tableSchema.getFields().get("NAME4").getName());
+ Assert.assertEquals("age4", tableSchema.getFields().get("age4").getName());
+ schemaChange.setSourceConnector(SourceConnector.MYSQL.connectorName); // switch the connector back to mysql once the assertions have passed
+ }
+}
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/MySQLDorisE2ECase.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/MySQLDorisE2ECase.java
index 8987030..2f5568e 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/MySQLDorisE2ECase.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/MySQLDorisE2ECase.java
@@ -39,6 +39,7 @@
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
+import java.sql.SQLException;
import java.sql.Statement;
import java.time.Duration;
import java.util.ArrayList;
@@ -68,6 +69,7 @@
private static final String TABLE_1 = "tbl1";
private static final String TABLE_2 = "tbl2";
private static final String TABLE_3 = "tbl3";
+ private static final String TABLE_4 = "tbl4";
private static final MySQLContainer MYSQL_CONTAINER =
new MySQLContainer("mysql")
@@ -104,7 +106,8 @@
.collect(Collectors.toSet());
String sql =
"select * from ( select * from %s.%s union all select * from %s.%s union all select * from %s.%s ) res order by 1";
- checkResult(expected, sql, 2);
+ String query1 = String.format(sql, DATABASE, TABLE_1, DATABASE, TABLE_2, DATABASE, TABLE_3);
+ checkResult(expected, query1, 2);
// add incremental data
try (Connection connection =
@@ -136,7 +139,8 @@
.collect(Collectors.toSet());
sql =
"select * from ( select * from %s.%s union all select * from %s.%s union all select * from %s.%s ) res order by 1";
- checkResult(expected2, sql, 2);
+ String query2 = String.format(sql, DATABASE, TABLE_1, DATABASE, TABLE_2, DATABASE, TABLE_3);
+ checkResult(expected2, query2, 2);
// mock schema change
try (Connection connection =
@@ -162,19 +166,127 @@
Arrays.asList("doris_1_1_1", "c1_val"))
.collect(Collectors.toSet());
sql = "select * from %s.%s order by 1";
- checkResult(expected3, sql, 2);
+ String query3 = String.format(sql, DATABASE, TABLE_1);
+ checkResult(expected3, query3, 2);
jobClient.cancel().get();
}
+    /**
+     * End-to-end check that a MySQL table created while the sync job is already running is
+     * replicated to the sink, and that later data and schema changes on it are applied as well.
+     *
+     * <p>The test proceeds in four phases, each verified through {@code checkResult}:
+     *
+     * <ol>
+     *   <li>the initial rows of {@code TABLE_1}..{@code TABLE_3} are visible in the sink;
+     *   <li>{@code TABLE_4} is created on the MySQL side with two rows and shows up in the sink;
+     *   <li>inserts, updates and deletes on all four tables are reflected in the sink;
+     *   <li>adding column {@code c1} and dropping column {@code age} on {@code TABLE_4} is
+     *       followed by the sink, including a row written after the schema change.
+     * </ol>
+     *
+     * <p>The submitted job is cancelled in a {@code finally} block so that a failing assertion
+     * does not leave a streaming job running behind the test.
+     *
+     * @throws Exception if table setup, job submission, a JDBC call or a result check fails
+     */
+    @Test
+    public void testAutoAddTable() throws Exception {
+        initializeMySQLTable();
+        initializeDorisTable();
+        JobClient jobClient = submitJob();
+        try {
+            // wait long enough for two checkpoints to complete before reading from the sink
+            Thread.sleep(20000);
+            Set<List<Object>> expected =
+                    Stream.<List<Object>>of(
+                                    Arrays.asList("doris_1", 1),
+                                    Arrays.asList("doris_2", 2),
+                                    Arrays.asList("doris_3", 3))
+                            .collect(Collectors.toSet());
+            String sql =
+                    "select * from ( select * from %s.%s union all select * from %s.%s union all select * from %s.%s ) res order by 1";
+            String query1 =
+                    String.format(
+                            sql, DATABASE, TABLE_1, DATABASE, TABLE_2, DATABASE, TABLE_3);
+            checkResult(expected, query1, 2);
+
+            // create table4 on the MySQL side while the job is running; it is expected to be
+            // created and filled on the sink side without restarting the job
+            addTableTable_4();
+            Thread.sleep(20000);
+            Set<List<Object>> expected2 =
+                    Stream.<List<Object>>of(
+                                    Arrays.asList("doris_4_1", 4), Arrays.asList("doris_4_2", 4))
+                            .collect(Collectors.toSet());
+            sql = "select * from %s.%s order by 1";
+            String query2 = String.format(sql, DATABASE, TABLE_4);
+            checkResult(expected2, query2, 2);
+
+            // add incremental data: inserts, updates and deletes on old and new tables
+            try (Connection connection =
+                            DriverManager.getConnection(
+                                    MYSQL_CONTAINER.getJdbcUrl(), MYSQL_USER, MYSQL_PASSWD);
+                    Statement statement = connection.createStatement()) {
+                statement.execute(
+                        String.format(
+                                "insert into %s.%s values ('doris_1_1',10)", DATABASE, TABLE_1));
+                statement.execute(
+                        String.format(
+                                "insert into %s.%s values ('doris_2_1',11)", DATABASE, TABLE_2));
+                statement.execute(
+                        String.format(
+                                "insert into %s.%s values ('doris_3_1',12)", DATABASE, TABLE_3));
+                statement.execute(
+                        String.format(
+                                "insert into %s.%s values ('doris_4_3',43)", DATABASE, TABLE_4));
+
+                statement.execute(
+                        String.format(
+                                "update %s.%s set age=18 where name='doris_1'",
+                                DATABASE, TABLE_1));
+                statement.execute(
+                        String.format(
+                                "delete from %s.%s where name='doris_2'", DATABASE, TABLE_2));
+                statement.execute(
+                        String.format(
+                                "delete from %s.%s where name='doris_4_2'", DATABASE, TABLE_4));
+                statement.execute(
+                        String.format(
+                                "update %s.%s set age=41 where name='doris_4_1'",
+                                DATABASE, TABLE_4));
+            }
+
+            Thread.sleep(20000);
+            Set<List<Object>> expected3 =
+                    Stream.<List<Object>>of(
+                                    Arrays.asList("doris_1", 18),
+                                    Arrays.asList("doris_1_1", 10),
+                                    Arrays.asList("doris_2_1", 11),
+                                    Arrays.asList("doris_3", 3),
+                                    Arrays.asList("doris_3_1", 12),
+                                    Arrays.asList("doris_4_1", 41),
+                                    Arrays.asList("doris_4_3", 43))
+                            .collect(Collectors.toSet());
+            sql =
+                    "select * from ( select * from %s.%s union all select * from %s.%s union all select * from %s.%s union all select * from %s.%s ) res order by 1";
+            String query3 =
+                    String.format(
+                            sql, DATABASE, TABLE_1, DATABASE, TABLE_2, DATABASE, TABLE_3, DATABASE,
+                            TABLE_4);
+            checkResult(expected3, query3, 2);
+
+            // mock schema change on the table that was added at runtime
+            try (Connection connection =
+                            DriverManager.getConnection(
+                                    MYSQL_CONTAINER.getJdbcUrl(), MYSQL_USER, MYSQL_PASSWD);
+                    Statement statement = connection.createStatement()) {
+                statement.execute(
+                        String.format(
+                                "alter table %s.%s add column c1 varchar(128)",
+                                DATABASE, TABLE_4));
+                statement.execute(
+                        String.format("alter table %s.%s drop column age", DATABASE, TABLE_4));
+                // pause between the DDL and the insert that relies on the new column layout
+                Thread.sleep(20000);
+                statement.execute(
+                        String.format(
+                                "insert into %s.%s values ('doris_4_4','c1_val')",
+                                DATABASE, TABLE_4));
+            }
+            Thread.sleep(20000);
+            // rows written before the schema change have no value for the new column c1
+            Set<List<Object>> expected4 =
+                    Stream.<List<Object>>of(
+                                    Arrays.asList("doris_4_1", null),
+                                    Arrays.asList("doris_4_3", null),
+                                    Arrays.asList("doris_4_4", "c1_val"))
+                            .collect(Collectors.toSet());
+            sql = "select * from %s.%s order by 1";
+            String query4 = String.format(sql, DATABASE, TABLE_4);
+            checkResult(expected4, query4, 2);
+        } finally {
+            // always stop the streaming job, even when a check above fails, so it cannot keep
+            // running in the background for the rest of the test class
+            jobClient.cancel().get();
+        }
+    }
+
+    /**
+     * Drops the four test tables ({@code TABLE_1}..{@code TABLE_4}) if they exist, using the
+     * class-level {@code connection} (the same one {@code checkResult} queries as the sink), so
+     * that a test starts without leftovers from an earlier run.
+     *
+     * @throws Exception if creating the statement or executing a DROP fails
+     */
+    private void initializeDorisTable() throws Exception {
+        String[] sinkTables = {TABLE_1, TABLE_2, TABLE_3, TABLE_4};
+        try (Statement dropper = connection.createStatement()) {
+            for (String sinkTable : sinkTables) {
+                dropper.execute(String.format("DROP TABLE IF EXISTS %s.%s", DATABASE, sinkTable));
+            }
+        }
+    }
+
public void checkResult(Set<List<Object>> expected, String query, int columnSize)
throws Exception {
Set<List<Object>> actual = new HashSet<>();
try (Statement sinkStatement = connection.createStatement()) {
- ResultSet sinkResultSet =
- sinkStatement.executeQuery(
- String.format(
- query, DATABASE, TABLE_1, DATABASE, TABLE_2, DATABASE,
- TABLE_3));
+ ResultSet sinkResultSet = sinkStatement.executeQuery(query);
while (sinkResultSet.next()) {
List<Object> row = new ArrayList<>();
for (int i = 1; i <= columnSize; i++) {
@@ -218,7 +330,7 @@
Map<String, String> tableConfig = new HashMap<>();
tableConfig.put("replication_num", "1");
- String includingTables = "tbl1|tbl2|tbl3";
+ String includingTables = "tbl.*";
String excludingTables = "";
DatabaseSync databaseSync = new MysqlDatabaseSync();
databaseSync
@@ -232,6 +344,7 @@
.setTableConfig(tableConfig)
.setCreateTableOnly(false)
.setNewSchemaChange(true)
+ .setSingleSink(true)
.create();
databaseSync.build();
JobClient jobClient = env.executeAsync();
@@ -242,6 +355,28 @@
return jobClient;
}
+    /**
+     * (Re)creates {@code TABLE_4} in the MySQL container and seeds it with two rows.
+     *
+     * <p>Any existing {@code TABLE_4} is dropped first. The new table has a {@code name}
+     * primary key and an {@code age} column, and is filled with {@code ('doris_4_1',4)} and
+     * {@code ('doris_4_2',4)}.
+     *
+     * @throws SQLException if the connection cannot be opened or a statement fails
+     */
+    private void addTableTable_4() throws SQLException {
+        String dropSql = String.format("DROP TABLE IF EXISTS %s.%s", DATABASE, TABLE_4);
+        String createSql =
+                String.format(
+                        "CREATE TABLE %s.%s ( \n"
+                                + "`name` varchar(256) primary key,\n"
+                                + "`age` int\n"
+                                + ")",
+                        DATABASE, TABLE_4);
+        // rows that already exist in the table before the running job sees it
+        String[] seedRows = {
+            String.format("insert into %s.%s values ('doris_4_1',4)", DATABASE, TABLE_4),
+            String.format("insert into %s.%s values ('doris_4_2',4)", DATABASE, TABLE_4)
+        };
+
+        try (Connection mysqlConnection =
+                        DriverManager.getConnection(
+                                MYSQL_CONTAINER.getJdbcUrl(), MYSQL_USER, MYSQL_PASSWD);
+                Statement mysqlStatement = mysqlConnection.createStatement()) {
+            mysqlStatement.execute(dropSql);
+            mysqlStatement.execute(createSql);
+            for (String seedRow : seedRows) {
+                mysqlStatement.execute(seedRow);
+            }
+        }
+    }
+
public void initializeMySQLTable() throws Exception {
try (Connection connection =
DriverManager.getConnection(