blob: b3a90e60775b75271a7b84fde9080c2fe093a6f9 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.flink.sink.writer.serializer.jsondebezium;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.CollectionUtil;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.StringUtils;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.JsonNode;
import org.apache.commons.collections.CollectionUtils;
import org.apache.doris.flink.catalog.doris.DataModel;
import org.apache.doris.flink.catalog.doris.FieldSchema;
import org.apache.doris.flink.catalog.doris.TableSchema;
import org.apache.doris.flink.exception.IllegalArgumentException;
import org.apache.doris.flink.sink.schema.SchemaChangeHelper;
import org.apache.doris.flink.sink.schema.SchemaChangeHelper.DDLSchema;
import org.apache.doris.flink.sink.schema.SchemaChangeManager;
import org.apache.doris.flink.sink.writer.EventType;
import org.apache.doris.flink.tools.cdc.SourceConnector;
import org.apache.doris.flink.tools.cdc.mysql.MysqlType;
import org.apache.doris.flink.tools.cdc.oracle.OracleType;
import org.apache.doris.flink.tools.cdc.postgres.PostgresType;
import org.apache.doris.flink.tools.cdc.sqlserver.SqlServerType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
/**
* Extract the columns that need to be changed based on the change records of the upstream data
* source.
*/
public class JsonDebeziumSchemaChangeImplV2 extends JsonDebeziumSchemaChange {
private static final Logger LOG = LoggerFactory.getLogger(JsonDebeziumSchemaChangeImplV2.class);
private static final Pattern renameDDLPattern =
Pattern.compile(
"ALTER\\s+TABLE\\s+(\\w+)\\s+RENAME\\s+COLUMN\\s+(\\w+)\\s+TO\\s+(\\w+)",
Pattern.CASE_INSENSITIVE);
private Map<String, FieldSchema> originFieldSchemaMap;
private SourceConnector sourceConnector;
// create table properties
private final Map<String, String> tableProperties;
private String targetDatabase;
private String targetTablePrefix;
private String targetTableSuffix;
public JsonDebeziumSchemaChangeImplV2(JsonDebeziumChangeContext changeContext) {
this.addDropDDLPattern = Pattern.compile(addDropDDLRegex, Pattern.CASE_INSENSITIVE);
this.changeContext = changeContext;
this.sourceTableName = changeContext.getSourceTableName();
this.dorisOptions = changeContext.getDorisOptions();
this.schemaChangeManager = new SchemaChangeManager(dorisOptions);
this.targetDatabase = changeContext.getTargetDatabase();
this.tableProperties = changeContext.getTableProperties();
this.tableMapping = changeContext.getTableMapping();
this.objectMapper = changeContext.getObjectMapper();
this.targetTablePrefix =
changeContext.getTargetTablePrefix() == null
? ""
: changeContext.getTargetTablePrefix();
this.targetTableSuffix =
changeContext.getTargetTableSuffix() == null
? ""
: changeContext.getTargetTableSuffix();
}
/**
 * Resets the origin field schema map and seeds it with one empty {@link FieldSchema} per column
 * name found in the record's "after" row, falling back to the "before" row when the "after" row
 * yields no columns.
 *
 * @param recordRoot the change record whose row images provide the column names
 */
@Override
public void init(JsonNode recordRoot) {
    originFieldSchemaMap = new LinkedHashMap<>();
    Set<String> columnNames = extractAfterRow(recordRoot).keySet();
    if (CollectionUtils.isEmpty(columnNames)) {
        columnNames = extractBeforeRow(recordRoot).keySet();
    }
    for (String columnName : columnNames) {
        originFieldSchemaMap.put(columnName, new FieldSchema());
    }
}
/**
 * Applies the schema change described by {@code recordRoot} to Doris.
 *
 * <p>CREATE events build a {@link TableSchema} and create the table; on success the CDC table
 * identifier is mapped to the new Doris table. ALTER events are translated into a list of DDL
 * statements, each of which is checked and then executed. Other event types are only logged.
 *
 * <p>Any exception raised while processing is logged at WARN level and not rethrown; the
 * status reached so far is returned.
 *
 * @param recordRoot the schema-change record
 * @return {@code false} when the record is filtered out (table check fails, no event type, no
 *     Doris table tuple, or no DDL could be derived); otherwise the status of the CREATE call
 *     or of the last ALTER statement processed
 */
@Override
public boolean schemaChange(JsonNode recordRoot) {
    boolean status = false;
    try {
        if (!StringUtils.isNullOrWhitespaceOnly(sourceTableName) && !checkTable(recordRoot)) {
            return false;
        }
        EventType eventType = extractEventType(recordRoot);
        if (eventType == null) {
            return false;
        }
        if (eventType.equals(EventType.CREATE)) {
            TableSchema tableSchema = extractCreateTableSchema(recordRoot);
            status = schemaChangeManager.createTable(tableSchema);
            // Log the outcome unconditionally so that a failed CREATE is visible as well.
            LOG.info("create table ddl status: {}", status);
            if (status) {
                String cdcTbl = getCdcTableIdentifier(recordRoot);
                String dorisTbl = getCreateTableIdentifier(recordRoot);
                changeContext.getTableMapping().put(cdcTbl, dorisTbl);
                this.tableMapping = changeContext.getTableMapping();
            }
        } else if (eventType.equals(EventType.ALTER)) {
            // db,table
            Tuple2<String, String> tuple = getDorisTableTuple(recordRoot);
            if (tuple == null) {
                return false;
            }
            List<String> ddlSqlList = extractDDLList(recordRoot);
            if (CollectionUtils.isEmpty(ddlSqlList)) {
                LOG.info("ddl can not do schema change:{}", recordRoot);
                return false;
            }
            // The DDL schemas are read by index in step with the generated SQL statements.
            // NOTE(review): assumes both lists have the same length and order - confirm.
            List<DDLSchema> ddlSchemas = SchemaChangeHelper.getDdlSchemas();
            for (int i = 0; i < ddlSqlList.size(); i++) {
                DDLSchema ddlSchema = ddlSchemas.get(i);
                String ddlSql = ddlSqlList.get(i);
                boolean doSchemaChange = checkSchemaChange(tuple.f0, tuple.f1, ddlSchema);
                // NOTE(review): status is overwritten on every iteration, so the returned
                // value reflects only the last statement - confirm this is intended.
                status = doSchemaChange && schemaChangeManager.execute(ddlSql, tuple.f0);
                LOG.info("schema change status:{}, ddl:{}", status, ddlSql);
            }
        } else {
            LOG.info("Unsupported event type {}", eventType);
        }
    } catch (Exception ex) {
        LOG.warn("schema change error :", ex);
    }
    return status;
}
/**
 * Returns the first entry of the "tableChanges" node of the record's history record.
 *
 * @param record the schema-change record
 * @return the first table change, or {@code null} when there is no "tableChanges" node
 */
private JsonNode extractTableChange(JsonNode record) throws JsonProcessingException {
    JsonNode tableChanges = extractHistoryRecord(record).get("tableChanges");
    return tableChanges == null ? null : tableChanges.get(0);
}
/**
 * Parse Alter Event.
 *
 * <p>Derives the Doris DDL statements for an ALTER record. On the first call (while no source
 * connector is known) the connector is resolved from {@code source.connector} and the origin
 * schema is filled from the record's columns. A {@code RENAME COLUMN} statement is turned into
 * rename DDL; otherwise the record's columns are compared with the origin schema and add/drop
 * DDL is generated.
 *
 * @param record the schema-change record
 * @return the DDL statements to execute, or {@code null} when the record carries no table
 *     change, no DDL text, is not an ALTER event, or the DDL is neither a rename nor an
 *     add/drop statement
 * @throws IOException if the history record cannot be parsed
 */
@VisibleForTesting
public List<String> extractDDLList(JsonNode record) throws IOException {
    String dorisTable = getDorisTableIdentifier(record);
    JsonNode historyRecord = extractHistoryRecord(record);
    String ddl = extractJsonNode(historyRecord, "ddl");
    JsonNode tableChange = extractTableChange(record);
    EventType eventType = extractEventType(record);
    // extractEventType returns null for anything other than ALTER/CREATE, so compare by
    // identity instead of dereferencing a possibly-null event type.
    if (Objects.isNull(tableChange)
            || Objects.isNull(ddl)
            || eventType != EventType.ALTER) {
        return null;
    }

    JsonNode columns = tableChange.get("table").get("columns");
    if (Objects.isNull(sourceConnector)) {
        sourceConnector =
                SourceConnector.valueOf(
                        record.get("source").get("connector").asText().toUpperCase());
        fillOriginSchema(columns);
    }

    // rename ddl
    Matcher renameMatcher = renameDDLPattern.matcher(ddl);
    if (renameMatcher.find()) {
        String oldColumnName = renameMatcher.group(2);
        String newColumnName = renameMatcher.group(3);
        return SchemaChangeHelper.generateRenameDDLSql(
                dorisTable, oldColumnName, newColumnName, originFieldSchemaMap);
    }

    // add/drop ddl
    Map<String, FieldSchema> updateFiledSchema = new LinkedHashMap<>();
    for (JsonNode column : columns) {
        buildFieldSchema(updateFiledSchema, column);
    }
    // NOTE(review): the comparison runs before the add/drop check below, so it is performed
    // even for DDL that is subsequently rejected - confirm this ordering is intended.
    SchemaChangeHelper.compareSchema(updateFiledSchema, originFieldSchemaMap);

    // In order to avoid other source table column change operations other than add/drop/rename,
    // which may lead to the accidental deletion of the doris column.
    Matcher matcher = addDropDDLPattern.matcher(ddl);
    if (!matcher.find()) {
        return null;
    }
    return SchemaChangeHelper.generateDDLSql(dorisTable);
}
/**
 * Builds the Doris {@link TableSchema} for a CREATE record.
 *
 * <p>Columns, primary keys and the table comment are read from the record's first table
 * change. The model is UNIQUE when primary keys exist and DUPLICATE otherwise. The target
 * identifier is {@code targetDatabase.prefix + sourceTable + suffix}.
 *
 * @param record the schema-change record describing the created table
 * @return the table schema to create in Doris
 * @throws JsonProcessingException if the history record cannot be parsed
 * @throws java.lang.IllegalArgumentException if the target identifier does not split into
 *     exactly a database and a table part
 */
@VisibleForTesting
public TableSchema extractCreateTableSchema(JsonNode record) throws JsonProcessingException {
    if (sourceConnector == null) {
        sourceConnector =
                SourceConnector.valueOf(
                        record.get("source").get("connector").asText().toUpperCase());
    }

    String dorisTable = getCreateTableIdentifier(record);
    JsonNode table = extractTableChange(record).get("table");
    JsonNode pkColumns = table.get("primaryKeyColumnNames");
    JsonNode columns = table.get("columns");
    JsonNode comment = table.get("comment");
    // A JSON null is parsed into a non-null NullNode whose asText() is the literal "null";
    // treat it like a missing comment instead of storing "null" as the table comment.
    String tblComment = (comment == null || comment.isNull()) ? "" : comment.asText();

    Map<String, FieldSchema> field = new LinkedHashMap<>();
    for (JsonNode column : columns) {
        buildFieldSchema(field, column);
    }

    // A record without "primaryKeyColumnNames" is handled as a table without primary keys.
    List<String> pkList = new ArrayList<>();
    if (pkColumns != null) {
        for (JsonNode column : pkColumns) {
            pkList.add(column.asText());
        }
    }

    TableSchema tableSchema = new TableSchema();
    tableSchema.setFields(field);
    tableSchema.setKeys(pkList);
    tableSchema.setDistributeKeys(buildDistributeKeys(pkList, field));
    tableSchema.setTableComment(tblComment);
    tableSchema.setProperties(tableProperties);
    tableSchema.setModel(pkList.isEmpty() ? DataModel.DUPLICATE : DataModel.UNIQUE);

    String[] split = dorisTable.split("\\.");
    Preconditions.checkArgument(
            split.length == 2,
            "Expected a table identifier of the form database.table but got: " + dorisTable);
    tableSchema.setDatabase(split[0]);
    tableSchema.setTable(split[1]);
    return tableSchema;
}
/**
 * Chooses the distribution keys for a table.
 *
 * @param primaryKeys primary key column names; may be {@code null} or empty
 * @param fields the table's fields in declaration order
 * @return {@code primaryKeys} when non-empty; otherwise a singleton list holding the first field
 *     name, or an empty list when there are no fields
 */
private List<String> buildDistributeKeys(
        List<String> primaryKeys, Map<String, FieldSchema> fields) {
    if (CollectionUtil.isNullOrEmpty(primaryKeys)) {
        if (fields.isEmpty()) {
            return new ArrayList<>();
        }
        String firstFieldName = fields.keySet().iterator().next();
        return Collections.singletonList(firstFieldName);
    }
    return primaryKeys;
}
/**
 * Builds the Doris identifier for a table to be created: the target database, a dot, and the
 * source table name wrapped in the configured prefix and suffix.
 *
 * @param record the record whose {@code source.table} names the source table
 * @return the identifier in the form {@code database.prefix + table + suffix}
 */
private String getCreateTableIdentifier(JsonNode record) {
    String sourceTable = extractJsonNode(record.get("source"), "table");
    String dorisTable = targetTablePrefix + sourceTable + targetTableSuffix;
    return targetDatabase + "." + dorisTable;
}
/**
 * Asks the schema change manager whether the column change described by {@code ddlSchema} may
 * be applied to the given table.
 *
 * @param database the Doris database name
 * @param table the Doris table name
 * @param ddlSchema the column name and whether the change drops the column
 * @return the result of the schema change manager's check
 */
private boolean checkSchemaChange(String database, String table, DDLSchema ddlSchema)
        throws IOException, IllegalArgumentException {
    boolean dropColumn = ddlSchema.isDropColumn();
    String columnName = ddlSchema.getColumnName();
    Map<String, Object> requestParam =
            SchemaChangeManager.buildRequestParam(dropColumn, columnName);
    return schemaChangeManager.checkSchemaChange(database, table, requestParam);
}
/**
 * Parse event type.
 *
 * <p>The "type" of the record's first table change is matched case-insensitively against
 * ALTER and CREATE.
 *
 * @param record the schema-change record
 * @return {@link EventType#ALTER} or {@link EventType#CREATE}, or {@code null} when there is no
 *     table change, no "type" node, or the type is neither of the two
 */
protected EventType extractEventType(JsonNode record) throws JsonProcessingException {
    JsonNode tableChange = extractTableChange(record);
    JsonNode typeNode = tableChange == null ? null : tableChange.get("type");
    if (typeNode == null) {
        return null;
    }
    String type = typeNode.asText();
    // Candidates are tried in the same order as before: ALTER first, then CREATE.
    for (EventType candidate : new EventType[] {EventType.ALTER, EventType.CREATE}) {
        if (candidate.toString().equalsIgnoreCase(type)) {
            return candidate;
        }
    }
    return null;
}
/**
 * Converts the record's "before" node into a map.
 *
 * @param record the change record
 * @return the "before" row as a map; see {@code extractRow} for the handling of a missing node
 */
private Map<String, Object> extractBeforeRow(JsonNode record) {
    JsonNode beforeImage = record.get("before");
    return extractRow(beforeImage);
}
/**
 * Converts the record's "after" node into a map.
 *
 * @param record the change record
 * @return the "after" row as a map; see {@code extractRow} for the handling of a missing node
 */
private Map<String, Object> extractAfterRow(JsonNode record) {
    JsonNode afterImage = record.get("after");
    return extractRow(afterImage);
}
/**
 * Converts a row node into a {@code Map<String, Object>} via the object mapper.
 *
 * @param recordRow the row node to convert
 * @return the converted map, or a new empty {@link HashMap} when the conversion yields
 *     {@code null}
 */
private Map<String, Object> extractRow(JsonNode recordRow) {
    TypeReference<Map<String, Object>> mapType = new TypeReference<Map<String, Object>>() {};
    Map<String, Object> row = objectMapper.convertValue(recordRow, mapType);
    if (row == null) {
        return new HashMap<>();
    }
    return row;
}
/**
 * Fills the origin field schema map from the given column definitions.
 *
 * <p>When the map already exists, only columns whose name is already a key are updated (name,
 * Doris type, comment and default value). When the map is {@code null}, an error is logged and
 * the map is rebuilt from all given columns.
 *
 * @param columns the column definitions of the table change
 */
@VisibleForTesting
public void fillOriginSchema(JsonNode columns) {
    if (Objects.isNull(originFieldSchemaMap)) {
        LOG.error(
                "Current schema change failed! You need to ensure that "
                        + "there is data in the table."
                        + dorisOptions.getTableIdentifier());
        originFieldSchemaMap = new LinkedHashMap<>();
        for (JsonNode column : columns) {
            buildFieldSchema(originFieldSchemaMap, column);
        }
        return;
    }

    for (JsonNode column : columns) {
        String fieldName = column.get("name").asText();
        if (!originFieldSchemaMap.containsKey(fieldName)) {
            // Columns not already tracked are left out here.
            continue;
        }
        String dorisTypeName = buildDorisTypeName(column);
        String rawDefault = extractJsonNode(column, "defaultValueExpression");
        String defaultValue = handleDefaultValue(rawDefault);
        String comment = extractJsonNode(column, "comment");

        FieldSchema tracked = originFieldSchemaMap.get(fieldName);
        tracked.setName(fieldName);
        tracked.setTypeString(dorisTypeName);
        tracked.setComment(comment);
        tracked.setDefaultValue(defaultValue);
    }
}
/**
 * Builds a {@link FieldSchema} from a column definition and stores it in the given map under
 * the column's name.
 *
 * @param filedSchemaMap the map receiving the new field schema
 * @param column the column definition providing name, type, default value and comment
 */
private void buildFieldSchema(Map<String, FieldSchema> filedSchemaMap, JsonNode column) {
    String name = column.get("name").asText();
    String typeName = buildDorisTypeName(column);
    String rawDefault = extractJsonNode(column, "defaultValueExpression");
    String defaultValue = handleDefaultValue(rawDefault);
    String comment = extractJsonNode(column, "comment");
    FieldSchema schema = new FieldSchema(name, typeName, defaultValue, comment);
    filedSchemaMap.put(name, schema);
}
/**
 * Maps a source column definition to a Doris type name using the converter of the current
 * source connector.
 *
 * <p>A missing "length" or "scale" node is treated as 0.
 *
 * @param column the column definition providing "typeName" and optionally "length"/"scale"
 * @return the Doris type name
 * @throws UnsupportedOperationException if the source connector has no converter here
 */
@VisibleForTesting
public String buildDorisTypeName(JsonNode column) {
    JsonNode lengthNode = column.get("length");
    JsonNode scaleNode = column.get("scale");
    int length = lengthNode == null ? 0 : lengthNode.asInt();
    int scale = scaleNode == null ? 0 : scaleNode.asInt();
    String sourceTypeName = column.get("typeName").asText();

    switch (sourceConnector) {
        case MYSQL:
            return MysqlType.toDorisType(sourceTypeName, length, scale);
        case ORACLE:
            return OracleType.toDorisType(sourceTypeName, length, scale);
        case POSTGRES:
            return PostgresType.toDorisType(sourceTypeName, length, scale);
        case SQLSERVER:
            return SqlServerType.toDorisType(sourceTypeName, length, scale);
        default:
            throw new UnsupportedOperationException(
                    "Not support " + sourceTypeName + " schema change.");
    }
}
/** Matches a default value that starts and ends with a single or double quote. */
private static final Pattern QUOTED_DEFAULT_VALUE_PATTERN = Pattern.compile("['\"].*?['\"]");

/**
 * Normalizes a source default value expression for use in a Doris column definition.
 *
 * @param defaultValue the default value expression from the source; may be {@code null}
 * @return {@code null} for a null/blank input; the input unchanged when it is already quoted;
 *     {@code current_timestamp} for {@code 1970-01-01 00:00:00}; otherwise the input wrapped in
 *     single quotes
 */
private String handleDefaultValue(String defaultValue) {
    if (StringUtils.isNullOrWhitespaceOnly(defaultValue)) {
        return null;
    }
    // Due to historical reasons, doris needs to add quotes to
    // the default value of the new column
    // For example in mysql: alter table add column c1 int default 100
    // In Doris: alter table add column c1 int default '100'
    // The pattern is compiled once instead of on every call.
    if (QUOTED_DEFAULT_VALUE_PATTERN.matcher(defaultValue).matches()) {
        return defaultValue;
    }
    if ("1970-01-01 00:00:00".equals(defaultValue)) {
        // TODO: The default value of setting the current time in CDC is 1970-01-01 00:00:00
        return "current_timestamp";
    }
    return "'" + defaultValue + "'";
}
/**
 * Replaces the origin field schema map. The given map is stored by reference, not copied.
 *
 * @param originFieldSchemaMap the map to use as the origin schema from now on
 */
@VisibleForTesting
public void setOriginFieldSchemaMap(Map<String, FieldSchema> originFieldSchemaMap) {
this.originFieldSchemaMap = originFieldSchemaMap;
}
/**
 * Returns the origin field schema map held by this instance (the internal reference, not a
 * copy).
 *
 * @return the current origin field schema map; may be {@code null} if it was never set
 */
@VisibleForTesting
public Map<String, FieldSchema> getOriginFieldSchemaMap() {
return originFieldSchemaMap;
}
/**
 * Sets the source connector from its name. The name is upper-cased before it is resolved
 * against the {@link SourceConnector} constants.
 *
 * @param sourceConnector the connector name, in any letter case
 */
@VisibleForTesting
public void setSourceConnector(String sourceConnector) {
    String connectorName = sourceConnector.toUpperCase();
    this.sourceConnector = SourceConnector.valueOf(connectorName);
}
}