blob: bd685ef0726c2c479567cd1f52449dfd848aa361 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.flink.sink.writer;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.commons.codec.binary.Base64;
import org.apache.doris.flink.cfg.DorisOptions;
import org.apache.doris.flink.sink.HttpGetWithEntity;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.util.StringUtils;
import org.apache.http.HttpHeaders;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.client.methods.HttpUriRequest;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.util.EntityUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import static org.apache.doris.flink.sink.writer.LoadConstants.DORIS_DELETE_SIGN;
public class JsonDebeziumSchemaSerializer implements DorisRecordSerializer<String> {
private static final Logger LOG = LoggerFactory.getLogger(JsonDebeziumSchemaSerializer.class);
private static final String CHECK_SCHEMA_CHANGE_API = "http://%s/api/enable_light_schema_change/%s/%s";
private static final String SCHEMA_CHANGE_API = "http://%s/api/query/default_cluster/%s";
private static final String OP_READ = "r"; // snapshot read
private static final String OP_CREATE = "c"; // insert
private static final String OP_UPDATE = "u"; // update
private static final String OP_DELETE = "d"; // delete
public static final String EXECUTE_DDL = "ALTER TABLE %s %s COLUMN %s %s"; //alter table tbl add cloumn aca int
private static final String addDropDDLRegex = "ALTER\\s+TABLE\\s+[^\\s]+\\s+(ADD|DROP)\\s+(COLUMN\\s+)?([^\\s]+)(\\s+([^\\s]+))?.*";
private final Pattern addDropDDLPattern;
private DorisOptions dorisOptions;
private ObjectMapper objectMapper = new ObjectMapper();
private String database;
private String table;
//table name of the cdc upstream, format is db.tbl
private String sourceTableName;
public JsonDebeziumSchemaSerializer(DorisOptions dorisOptions, Pattern pattern, String sourceTableName) {
this.dorisOptions = dorisOptions;
this.addDropDDLPattern = pattern == null ? Pattern.compile(addDropDDLRegex, Pattern.CASE_INSENSITIVE) : pattern;
String[] tableInfo = dorisOptions.getTableIdentifier().split("\\.");
this.database = tableInfo[0];
this.table = tableInfo[1];
this.sourceTableName = sourceTableName;
}
@Override
public byte[] serialize(String record) throws IOException {
LOG.debug("received debezium json data {} :", record);
JsonNode recordRoot = objectMapper.readTree(record);
String op = extractJsonNode(recordRoot, "op");
if (Objects.isNull(op)) {
//schema change ddl
schemaChange(recordRoot);
return null;
}
Map<String, String> valueMap;
if (OP_READ.equals(op) || OP_CREATE.equals(op)) {
valueMap = extractAfterRow(recordRoot);
addDeleteSign(valueMap,false);
} else if (OP_UPDATE.equals(op)) {
valueMap = extractAfterRow(recordRoot);
addDeleteSign(valueMap,false);
} else if (OP_DELETE.equals(op)) {
valueMap = extractBeforeRow(recordRoot);
addDeleteSign(valueMap,true);
} else {
LOG.error("parse record fail, unknown op {} in {}",op,record);
return null;
}
return objectMapper.writeValueAsString(valueMap).getBytes(StandardCharsets.UTF_8);
}
@VisibleForTesting
public boolean schemaChange(JsonNode recordRoot) {
boolean status = false;
try{
if(!StringUtils.isNullOrWhitespaceOnly(sourceTableName) && !checkTable(recordRoot)){
return false;
}
String ddl = extractDDL(recordRoot);
if(StringUtils.isNullOrWhitespaceOnly(ddl)){
LOG.info("ddl can not do schema change:{}", recordRoot);
return false;
}
boolean doSchemaChange = checkSchemaChange(ddl);
status = doSchemaChange && execSchemaChange(ddl);
LOG.info("schema change status:{}", status);
}catch (Exception ex){
LOG.warn("schema change error :", ex);
}
return status;
}
/**
* When cdc synchronizes multiple tables, it will capture multiple table schema changes
*/
protected boolean checkTable(JsonNode recordRoot) {
String db = extractDatabase(recordRoot);
String tbl = extractTable(recordRoot);
String dbTbl = db + "." + tbl;
return sourceTableName.equals(dbTbl);
}
private void addDeleteSign(Map<String, String> valueMap, boolean delete) {
if(delete){
valueMap.put(DORIS_DELETE_SIGN, "1");
}else{
valueMap.put(DORIS_DELETE_SIGN, "0");
}
}
private boolean checkSchemaChange(String ddl) throws IOException {
String requestUrl = String.format(CHECK_SCHEMA_CHANGE_API, dorisOptions.getFenodes(), database, table);
Map<String,Object> param = buildRequestParam(ddl);
if(param.size() != 2){
return false;
}
HttpGetWithEntity httpGet = new HttpGetWithEntity(requestUrl);
httpGet.setHeader(HttpHeaders.AUTHORIZATION, authHeader());
httpGet.setEntity(new StringEntity(objectMapper.writeValueAsString(param)));
boolean success = handleResponse(httpGet);
if (!success) {
LOG.warn("schema change can not do table {}.{}",database,table);
}
return success;
}
/**
* Build param
* {
* "isDropColumn": true,
* "columnName" : "column"
* }
*/
protected Map<String, Object> buildRequestParam(String ddl) {
Map<String,Object> params = new HashMap<>();
Matcher matcher = addDropDDLPattern.matcher(ddl);
if(matcher.find()){
String op = matcher.group(1);
String col = matcher.group(3);
params.put("isDropColumn", op.equalsIgnoreCase("DROP"));
params.put("columnName", col);
}
return params;
}
private boolean execSchemaChange(String ddl) throws IOException {
Map<String, String> param = new HashMap<>();
param.put("stmt", ddl);
String requestUrl = String.format(SCHEMA_CHANGE_API, dorisOptions.getFenodes(), database);
HttpPost httpPost = new HttpPost(requestUrl);
httpPost.setHeader(HttpHeaders.AUTHORIZATION, authHeader());
httpPost.setHeader(HttpHeaders.CONTENT_TYPE, "application/json");
httpPost.setEntity(new StringEntity(objectMapper.writeValueAsString(param)));
boolean success = handleResponse(httpPost);
return success;
}
protected String extractDatabase(JsonNode record) {
if(record.get("source").has("schema")){
//compatible with schema
return extractJsonNode(record.get("source"), "schema");
}else{
return extractJsonNode(record.get("source"), "db");
}
}
protected String extractTable(JsonNode record) {
return extractJsonNode(record.get("source"), "table");
}
private boolean handleResponse(HttpUriRequest request) {
try (CloseableHttpClient httpclient = HttpClients.createDefault()) {
CloseableHttpResponse response = httpclient.execute(request);
final int statusCode = response.getStatusLine().getStatusCode();
if (statusCode == 200 && response.getEntity() != null) {
String loadResult = EntityUtils.toString(response.getEntity());
Map<String, Object> responseMap = objectMapper.readValue(loadResult, Map.class);
String code = responseMap.getOrDefault("code", "-1").toString();
if (code.equals("0")) {
return true;
} else {
LOG.error("schema change response:{}", loadResult);
}
}
}catch(Exception e){
LOG.error("http request error,", e);
}
return false;
}
private String extractJsonNode(JsonNode record, String key) {
return record != null && record.get(key) != null ? record.get(key).asText() : null;
}
private Map<String, String> extractBeforeRow(JsonNode record) {
return extractRow(record.get("before"));
}
private Map<String, String> extractAfterRow(JsonNode record) {
return extractRow(record.get("after"));
}
private Map<String, String> extractRow(JsonNode recordRow) {
Map<String, String> recordMap = objectMapper.convertValue(recordRow, new TypeReference<Map<String, String>>() {
});
return recordMap != null ? recordMap : new HashMap<>();
}
public String extractDDL(JsonNode record) throws JsonProcessingException {
String historyRecord = extractJsonNode(record, "historyRecord");
if (Objects.isNull(historyRecord)) {
return null;
}
String ddl = extractJsonNode(objectMapper.readTree(historyRecord), "ddl");
LOG.debug("received debezium ddl :{}", ddl);
if (!Objects.isNull(ddl)) {
//filter add/drop operation
Matcher matcher = addDropDDLPattern.matcher(ddl);
if(matcher.find()){
String op = matcher.group(1);
String col = matcher.group(3);
String type = matcher.group(5);
type = type == null ? "" : type;
ddl = String.format(EXECUTE_DDL, dorisOptions.getTableIdentifier(), op, col, type);
LOG.info("parse ddl:{}", ddl);
return ddl;
}
}
return null;
}
private String authHeader() {
return "Basic " + new String(Base64.encodeBase64((dorisOptions.getUsername() + ":" + dorisOptions.getPassword()).getBytes(StandardCharsets.UTF_8)));
}
public static JsonDebeziumSchemaSerializer.Builder builder() {
return new JsonDebeziumSchemaSerializer.Builder();
}
/**
* Builder for JsonDebeziumSchemaSerializer.
*/
public static class Builder {
private DorisOptions dorisOptions;
private Pattern addDropDDLPattern;
private String sourceTableName;
public JsonDebeziumSchemaSerializer.Builder setDorisOptions(DorisOptions dorisOptions) {
this.dorisOptions = dorisOptions;
return this;
}
public JsonDebeziumSchemaSerializer.Builder setPattern(Pattern addDropDDLPattern) {
this.addDropDDLPattern = addDropDDLPattern;
return this;
}
public JsonDebeziumSchemaSerializer.Builder setSourceTableName(String sourceTableName) {
this.sourceTableName = sourceTableName;
return this;
}
public JsonDebeziumSchemaSerializer build() {
return new JsonDebeziumSchemaSerializer(dorisOptions, addDropDDLPattern, sourceTableName);
}
}
}