// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.flink.sink.writer.serializer;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.JsonNodeFactory;
import com.fasterxml.jackson.databind.node.NullNode;
import org.apache.commons.collections.CollectionUtils;
import org.apache.doris.flink.catalog.doris.FieldSchema;
import org.apache.doris.flink.cfg.DorisExecutionOptions;
import org.apache.doris.flink.cfg.DorisOptions;
import org.apache.doris.flink.exception.IllegalArgumentException;
import org.apache.doris.flink.sink.schema.SchemaChangeHelper;
import org.apache.doris.flink.sink.schema.SchemaChangeHelper.DDLSchema;
import org.apache.doris.flink.sink.schema.SchemaChangeManager;
import org.apache.doris.flink.tools.cdc.SourceConnector;
import org.apache.doris.flink.tools.cdc.mysql.MysqlType;
import org.apache.doris.flink.tools.cdc.oracle.OracleType;
import org.apache.doris.flink.tools.cdc.postgres.PostgresType;
import org.apache.doris.flink.tools.cdc.sqlserver.SqlServerType;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import static org.apache.doris.flink.sink.util.DeleteOperation.addDeleteSign;
import static org.apache.doris.flink.sink.writer.LoadConstants.LINE_DELIMITER_DEFAULT;
import static org.apache.doris.flink.sink.writer.LoadConstants.LINE_DELIMITER_KEY;
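/**
* Serializes Debezium JSON change events into the row format expected by Doris stream load, and
* applies supported schema changes (add/drop/rename column) to the target Doris table when a DDL
* event is received.
*
* <p>A minimal usage sketch, assuming an existing {@code dorisOptions} whose table identifier is
* in "db.tbl" form (the values shown are illustrative, not defaults):
*
* <pre>{@code
* JsonDebeziumSchemaSerializer serializer = JsonDebeziumSchemaSerializer.builder()
*         .setDorisOptions(dorisOptions)
*         .setSourceTableName("mysql_db.mysql_tbl")
*         .setNewSchemaChange(true)
*         .build();
* }</pre>
*/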
public class JsonDebeziumSchemaSerializer implements DorisRecordSerializer<String> {
private static final Logger LOG = LoggerFactory.getLogger(JsonDebeziumSchemaSerializer.class);
private static final String OP_READ = "r"; // snapshot read
private static final String OP_CREATE = "c"; // insert
private static final String OP_UPDATE = "u"; // update
private static final String OP_DELETE = "d"; // delete
public static final String EXECUTE_DDL = "ALTER TABLE %s %s COLUMN %s %s"; // e.g. ALTER TABLE tbl ADD COLUMN col INT
private static final String addDropDDLRegex
= "ALTER\\s+TABLE\\s+[^\\s]+\\s+(ADD|DROP)\\s+(COLUMN\\s+)?([^\\s]+)(\\s+([^\\s]+))?.*";
private static final Pattern renameDDLPattern = Pattern.compile(
"ALTER\\s+TABLE\\s+(\\w+)\\s+RENAME\\s+COLUMN\\s+(\\w+)\\s+TO\\s+(\\w+)", Pattern.CASE_INSENSITIVE);
private final Pattern addDropDDLPattern;
private DorisOptions dorisOptions;
private ObjectMapper objectMapper = new ObjectMapper();
private String database;
private String table;
// table name of the cdc upstream, format is db.tbl
private String sourceTableName;
private boolean firstLoad;
private boolean firstSchemaChange;
private Map<String, FieldSchema> originFieldSchemaMap;
private final boolean newSchemaChange;
private String lineDelimiter = LINE_DELIMITER_DEFAULT;
private boolean ignoreUpdateBefore = true;
private SourceConnector sourceConnector;
private SchemaChangeManager schemaChangeManager;
public JsonDebeziumSchemaSerializer(DorisOptions dorisOptions,
Pattern pattern,
String sourceTableName,
boolean newSchemaChange) {
this.dorisOptions = dorisOptions;
this.addDropDDLPattern = pattern == null ? Pattern.compile(addDropDDLRegex, Pattern.CASE_INSENSITIVE) : pattern;
String[] tableInfo = dorisOptions.getTableIdentifier().split("\\.");
this.database = tableInfo[0];
this.table = tableInfo[1];
this.sourceTableName = sourceTableName;
// Prevent loss of decimal data precision
this.objectMapper.enable(DeserializationFeature.USE_BIG_DECIMAL_FOR_FLOATS);
JsonNodeFactory jsonNodeFactory = JsonNodeFactory.withExactBigDecimals(true);
this.objectMapper.setNodeFactory(jsonNodeFactory);
this.newSchemaChange = newSchemaChange;
this.firstLoad = true;
this.firstSchemaChange = true;
this.schemaChangeManager = new SchemaChangeManager(dorisOptions);
}
public JsonDebeziumSchemaSerializer(DorisOptions dorisOptions,
Pattern pattern,
String sourceTableName,
boolean newSchemaChange,
DorisExecutionOptions executionOptions) {
this(dorisOptions, pattern, sourceTableName, newSchemaChange);
if (executionOptions != null) {
this.lineDelimiter = executionOptions.getStreamLoadProp()
.getProperty(LINE_DELIMITER_KEY, LINE_DELIMITER_DEFAULT);
this.ignoreUpdateBefore = executionOptions.getIgnoreUpdateBefore();
}
}
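/**
* Serializes a single Debezium JSON record. A record without an "op" field is treated as a DDL
* event and routed to schema change handling (returning null); read/create/delete events become
* one row carrying a delete sign, and update events may expand into two rows (see extractUpdate).
*/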
@Override
public Tuple2<String, byte[]> serialize(String record) throws IOException {
LOG.debug("received debezium json data {} :", record);
JsonNode recordRoot = objectMapper.readValue(record, JsonNode.class);
String op = extractJsonNode(recordRoot, "op");
if (Objects.isNull(op)) {
// schema change ddl
if (newSchemaChange) {
schemaChangeV2(recordRoot);
} else {
schemaChange(recordRoot);
}
return null;
}
if (newSchemaChange && firstLoad) {
initOriginFieldSchema(recordRoot);
}
Map<String, Object> valueMap;
switch (op) {
case OP_READ:
case OP_CREATE:
valueMap = extractAfterRow(recordRoot);
addDeleteSign(valueMap, false);
break;
case OP_UPDATE:
return Tuple2.of(null, extractUpdate(recordRoot));
case OP_DELETE:
valueMap = extractBeforeRow(recordRoot);
addDeleteSign(valueMap, true);
break;
default:
LOG.error("parse record fail, unknown op {} in {}", op, record);
return null;
}
return Tuple2.of(null, objectMapper.writeValueAsString(valueMap).getBytes(StandardCharsets.UTF_8));
}
/**
* Splits an update event into up to two rows: an optional delete row built from the "before"
* image (skipped when ignoreUpdateBefore is set) and an insert row built from the "after" image,
* joined by the configured line delimiter.
*
* @param recordRoot the Debezium update event
* @return the serialized row(s) as UTF-8 bytes
*/
private byte[] extractUpdate(JsonNode recordRoot) throws JsonProcessingException {
StringBuilder updateRow = new StringBuilder();
if (!ignoreUpdateBefore) {
// convert delete
Map<String, Object> beforeRow = extractBeforeRow(recordRoot);
addDeleteSign(beforeRow, true);
updateRow.append(objectMapper.writeValueAsString(beforeRow))
.append(this.lineDelimiter);
}
// convert insert
Map<String, Object> afterRow = extractAfterRow(recordRoot);
addDeleteSign(afterRow, false);
updateRow.append(objectMapper.writeValueAsString(afterRow));
return updateRow.toString().getBytes(StandardCharsets.UTF_8);
}
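/**
* Schema change V2: extracts the ALTER statements from the Debezium history record, verifies each
* one against Doris, and executes those that pass the check.
*
* @return true if the last generated DDL statement was executed successfully
*/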
public boolean schemaChangeV2(JsonNode recordRoot) {
boolean status = false;
try {
if (!StringUtils.isNullOrWhitespaceOnly(sourceTableName) && !checkTable(recordRoot)) {
return false;
}
List<String> ddlSqlList = extractDDLList(recordRoot);
if (CollectionUtils.isEmpty(ddlSqlList)) {
LOG.info("ddl can not do schema change:{}", recordRoot);
return false;
}
List<DDLSchema> ddlSchemas = SchemaChangeHelper.getDdlSchemas();
for (int i = 0; i < ddlSqlList.size(); i++) {
DDLSchema ddlSchema = ddlSchemas.get(i);
String ddlSql = ddlSqlList.get(i);
boolean doSchemaChange = checkSchemaChange(ddlSchema);
status = doSchemaChange && schemaChangeManager.execute(ddlSql, database);
LOG.info("schema change status:{}, ddl:{}", status, ddlSql);
}
} catch (Exception ex) {
LOG.warn("schema change error :", ex);
}
return status;
}
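/**
* Derives the Doris ALTER TABLE statements from one Debezium DDL event. Returns an empty list
* when the event carries no usable DDL, and null when the DDL is not a supported add/drop/rename
* column change.
*/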
@VisibleForTesting
public List<String> extractDDLList(JsonNode record) throws JsonProcessingException {
JsonNode historyRecord = extractHistoryRecord(record);
JsonNode tableChanges = historyRecord.get("tableChanges");
String ddl = extractJsonNode(historyRecord, "ddl");
if (Objects.isNull(tableChanges) || Objects.isNull(ddl)) {
return new ArrayList<>();
}
LOG.debug("received debezium ddl :{}", ddl);
JsonNode tableChange = tableChanges.get(0);
if (Objects.isNull(tableChange) || !tableChange.get("type").asText().equals("ALTER")) {
return null;
}
JsonNode columns = tableChange.get("table").get("columns");
if (firstSchemaChange) {
sourceConnector = SourceConnector.valueOf(record.get("source").get("connector").asText().toUpperCase());
fillOriginSchema(columns);
}
// rename ddl
Matcher renameMatcher = renameDDLPattern.matcher(ddl);
if (renameMatcher.find()) {
String oldColumnName = renameMatcher.group(2);
String newColumnName = renameMatcher.group(3);
return SchemaChangeHelper.generateRenameDDLSql(
dorisOptions.getTableIdentifier(), oldColumnName, newColumnName, originFieldSchemaMap);
}
// add/drop ddl
Map<String, FieldSchema> updateFiledSchema = new LinkedHashMap<>();
for (JsonNode column : columns) {
buildFieldSchema(updateFiledSchema, column);
}
SchemaChangeHelper.compareSchema(updateFiledSchema, originFieldSchemaMap);
// Guard against source column change operations other than add/drop/rename,
// which could otherwise lead to accidental deletion of a Doris column.
Matcher matcher = addDropDDLPattern.matcher(ddl);
if (!matcher.find()) {
return null;
}
return SchemaChangeHelper.generateDDLSql(dorisOptions.getTableIdentifier());
}
@VisibleForTesting
public void setOriginFieldSchemaMap(Map<String, FieldSchema> originFieldSchemaMap) {
this.originFieldSchemaMap = originFieldSchemaMap;
}
@VisibleForTesting
public boolean schemaChange(JsonNode recordRoot) {
boolean status = false;
try {
if (!StringUtils.isNullOrWhitespaceOnly(sourceTableName) && !checkTable(recordRoot)) {
return false;
}
String ddl = extractDDL(recordRoot);
if (StringUtils.isNullOrWhitespaceOnly(ddl)) {
LOG.info("ddl can not do schema change:{}", recordRoot);
return false;
}
boolean doSchemaChange = checkSchemaChange(ddl);
status = doSchemaChange && schemaChangeManager.execute(ddl, database);
LOG.info("schema change status:{}", status);
} catch (Exception ex) {
LOG.warn("schema change error :", ex);
}
return status;
}
/**
* When CDC synchronizes multiple tables, schema change events from all of them are captured;
* only events whose source "db.tbl" matches the configured sourceTableName are applied.
*/
protected boolean checkTable(JsonNode recordRoot) {
String db = extractDatabase(recordRoot);
String tbl = extractTable(recordRoot);
String dbTbl = db + "." + tbl;
return sourceTableName.equals(dbTbl);
}
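// Checks with Doris whether the schema change can be applied before executing the DDL.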
private boolean checkSchemaChange(String ddl) throws IOException, IllegalArgumentException {
Map<String, Object> param = buildRequestParam(ddl);
return schemaChangeManager.checkSchemaChange(database, table, param);
}
private boolean checkSchemaChange(DDLSchema ddlSchema) throws IOException, IllegalArgumentException {
Map<String, Object> param = SchemaChangeManager.buildRequestParam(ddlSchema.isDropColumn(), ddlSchema.getColumnName());
return schemaChangeManager.checkSchemaChange(database, table, param);
}
/**
* Builds the schema-change check parameters from an add/drop DDL, e.g.
* <pre>{@code
* {
*   "isDropColumn": true,
*   "columnName": "column"
* }
* }</pre>
*/
protected Map<String, Object> buildRequestParam(String ddl) {
Map<String, Object> params = new HashMap<>();
Matcher matcher = addDropDDLPattern.matcher(ddl);
if (matcher.find()) {
String op = matcher.group(1);
String col = matcher.group(3);
params.put("isDropColumn", op.equalsIgnoreCase("DROP"));
params.put("columnName", col);
}
return params;
}
protected String extractDatabase(JsonNode record) {
if (record.get("source").has("schema")) {
// compatible with schema
return extractJsonNode(record.get("source"), "schema");
} else {
return extractJsonNode(record.get("source"), "db");
}
}
protected String extractTable(JsonNode record) {
return extractJsonNode(record.get("source"), "table");
}
private String extractJsonNode(JsonNode record, String key) {
return record != null && record.get(key) != null &&
!(record.get(key) instanceof NullNode) ? record.get(key).asText() : null;
}
private Map<String, Object> extractBeforeRow(JsonNode record) {
return extractRow(record.get("before"));
}
private Map<String, Object> extractAfterRow(JsonNode record) {
return extractRow(record.get("after"));
}
private Map<String, Object> extractRow(JsonNode recordRow) {
Map<String, Object> recordMap = objectMapper.convertValue(recordRow, new TypeReference<Map<String, Object>>() {
});
return recordMap != null ? recordMap : new HashMap<>();
}
private JsonNode extractHistoryRecord(JsonNode record) throws JsonProcessingException {
if (record.has("historyRecord")) {
return objectMapper.readTree(record.get("historyRecord").asText());
}
// In some scenarios the DDL is not wrapped in "historyRecord", e.g. when it comes from DebeziumSourceFunction
return record;
}
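/**
* Legacy (V1) DDL extraction: matches add/drop column statements with addDropDDLPattern and
* rewrites them against the Doris table identifier, converting the column type via handleType.
*/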
public String extractDDL(JsonNode record) throws JsonProcessingException {
JsonNode historyRecord = extractHistoryRecord(record);
String ddl = extractJsonNode(historyRecord, "ddl");
LOG.debug("received debezium ddl :{}", ddl);
if (!Objects.isNull(ddl)) {
// filter add/drop operation
Matcher matcher = addDropDDLPattern.matcher(ddl);
if (matcher.find()) {
String op = matcher.group(1);
String col = matcher.group(3);
String type = matcher.group(5);
type = handleType(type);
ddl = String.format(EXECUTE_DDL, dorisOptions.getTableIdentifier(), op, col, type);
LOG.info("parse ddl:{}", ddl);
return ddl;
}
}
return null;
}
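/**
* Fills the cached origin schema with the type, comment and default value taken from the column
* list of the first DDL event. If the cache was never initialized from a data record, it is
* rebuilt here directly from the DDL columns.
*/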
@VisibleForTesting
public void fillOriginSchema(JsonNode columns) {
if (Objects.nonNull(originFieldSchemaMap)) {
for (JsonNode column : columns) {
String fieldName = column.get("name").asText();
if (originFieldSchemaMap.containsKey(fieldName)) {
String dorisTypeName = buildDorisTypeName(column);
String defaultValue = handleDefaultValue(extractJsonNode(column, "defaultValueExpression"));
String comment = extractJsonNode(column, "comment");
FieldSchema fieldSchema = originFieldSchemaMap.get(fieldName);
fieldSchema.setName(fieldName);
fieldSchema.setTypeString(dorisTypeName);
fieldSchema.setComment(comment);
fieldSchema.setDefaultValue(defaultValue);
}
}
} else {
LOG.error("Current schema change failed! You need to ensure that "
+ "there is data in the table." + dorisOptions.getTableIdentifier());
originFieldSchemaMap = new LinkedHashMap<>();
columns.forEach(column -> buildFieldSchema(originFieldSchemaMap, column));
}
firstSchemaChange = false;
firstLoad = false;
}
private void buildFieldSchema(Map<String, FieldSchema> filedSchemaMap, JsonNode column) {
String fieldName = column.get("name").asText();
String dorisTypeName = buildDorisTypeName(column);
String defaultValue = handleDefaultValue(extractJsonNode(column, "defaultValueExpression"));
String comment = extractJsonNode(column, "comment");
fieldSchemaMap.put(fieldName, new FieldSchema(fieldName, dorisTypeName, defaultValue, comment));
}
@VisibleForTesting
public String buildDorisTypeName(JsonNode column) {
int length = column.get("length") == null ? 0 : column.get("length").asInt();
int scale = column.get("scale") == null ? 0 : column.get("scale").asInt();
String sourceTypeName = column.get("typeName").asText();
String dorisTypeName;
switch (sourceConnector) {
case MYSQL:
dorisTypeName = MysqlType.toDorisType(sourceTypeName, length, scale);
break;
case ORACLE:
dorisTypeName = OracleType.toDorisType(sourceTypeName, length, scale);
break;
case POSTGRES:
dorisTypeName = PostgresType.toDorisType(sourceTypeName, length, scale);
break;
case SQLSERVER:
dorisTypeName = SqlServerType.toDorisType(sourceTypeName, length, scale);
break;
default:
String errMsg = "Not support " + sourceTypeName + " schema change.";
throw new UnsupportedOperationException(errMsg);
}
return dorisTypeName;
}
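/**
* Normalizes a column default value for the generated Doris DDL: already-quoted values pass
* through, the CDC epoch placeholder becomes current_timestamp, and everything else is quoted.
*/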
private String handleDefaultValue(String defaultValue) {
if (StringUtils.isNullOrWhitespaceOnly(defaultValue)) {
return null;
}
// For historical reasons, Doris requires quotes around the default value of a newly added column.
// For example, in MySQL: alter table add column c1 int default 100
// In Doris: alter table add column c1 int default '100'
if (Pattern.matches("['\"].*?['\"]", defaultValue)) {
return defaultValue;
} else if (defaultValue.equals("1970-01-01 00:00:00")) {
// TODO: CDC reports 1970-01-01 00:00:00 when the source default is the current timestamp
return "current_timestamp";
}
return "'" + defaultValue + "'";
}
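/**
* Seeds the origin schema cache with the column names of the first data record; the remaining
* field information is filled in by fillOriginSchema once the first DDL event arrives.
*/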
private void initOriginFieldSchema(JsonNode recordRoot) {
originFieldSchemaMap = new LinkedHashMap<>();
Set<String> columnNameSet = extractAfterRow(recordRoot).keySet();
if (CollectionUtils.isEmpty(columnNameSet)) {
columnNameSet = extractBeforeRow(recordRoot).keySet();
}
columnNameSet.forEach(columnName -> originFieldSchemaMap.put(columnName, new FieldSchema()));
firstLoad = false;
}
@VisibleForTesting
public Map<String, FieldSchema> getOriginFieldSchemaMap() {
return originFieldSchemaMap;
}
@VisibleForTesting
public void setSourceConnector(String sourceConnector) {
this.sourceConnector = SourceConnector.valueOf(sourceConnector.toUpperCase());
}
public static JsonDebeziumSchemaSerializer.Builder builder() {
return new JsonDebeziumSchemaSerializer.Builder();
}
/**
* Builder for JsonDebeziumSchemaSerializer.
*/
public static class Builder {
private DorisOptions dorisOptions;
private Pattern addDropDDLPattern;
private String sourceTableName;
private boolean newSchemaChange;
private DorisExecutionOptions executionOptions;
public JsonDebeziumSchemaSerializer.Builder setDorisOptions(DorisOptions dorisOptions) {
this.dorisOptions = dorisOptions;
return this;
}
public JsonDebeziumSchemaSerializer.Builder setNewSchemaChange(boolean newSchemaChange) {
this.newSchemaChange = newSchemaChange;
return this;
}
public JsonDebeziumSchemaSerializer.Builder setPattern(Pattern addDropDDLPattern) {
this.addDropDDLPattern = addDropDDLPattern;
return this;
}
public JsonDebeziumSchemaSerializer.Builder setSourceTableName(String sourceTableName) {
this.sourceTableName = sourceTableName;
return this;
}
public Builder setExecutionOptions(DorisExecutionOptions executionOptions) {
this.executionOptions = executionOptions;
return this;
}
public JsonDebeziumSchemaSerializer build() {
return new JsonDebeziumSchemaSerializer(dorisOptions, addDropDDLPattern, sourceTableName, newSchemaChange,
executionOptions);
}
}
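/**
* Maps a source column type string to its Doris counterpart for the legacy DDL path. Currently
* only VARCHAR is adjusted: the declared length is tripled to cover UTF-8 multi-byte characters
* and capped at Doris' 65533 limit; other types pass through unchanged.
*/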
private String handleType(String type) {
if (type == null || "".equals(type)) {
return "";
}
// Triple the varchar length (UTF-8 worst case) and cap it at Doris' 65533 limit
Pattern pattern = Pattern.compile("varchar\\(([1-9][0-9]*)\\)", Pattern.CASE_INSENSITIVE);
Matcher matcher = pattern.matcher(type);
if (matcher.find()) {
String len = matcher.group(1);
return String.format("varchar(%d)", Math.min(Integer.parseInt(len) * 3, 65533));
}
return type;
}
}