[improve] Extract Schema change operation (#233)
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/DorisSystem.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/DorisSystem.java
index 7aa314f..1f0a09f 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/DorisSystem.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/DorisSystem.java
@@ -139,7 +139,7 @@
}
}
- public String buildCreateTableDDL(TableSchema schema) {
+ public static String buildCreateTableDDL(TableSchema schema) {
StringBuilder sb = new StringBuilder("CREATE TABLE IF NOT EXISTS ");
sb.append(identifier(schema.getDatabase()))
.append(".")
@@ -209,7 +209,7 @@
return sb.toString();
}
- private void buildColumn(StringBuilder sql, FieldSchema field, boolean isKey){
+ private static void buildColumn(StringBuilder sql, FieldSchema field, boolean isKey){
String fieldType = field.getTypeString();
if(isKey && DorisType.STRING.equals(fieldType)){
fieldType = String.format("%s(%s)", DorisType.VARCHAR, 65533);
@@ -222,7 +222,7 @@
.append("',");
}
- private String quoteComment(String comment){
+ private static String quoteComment(String comment){
if(comment == null){
return "";
} else {
@@ -230,16 +230,16 @@
}
}
- private List<String> identifier(List<String> name) {
+ private static List<String> identifier(List<String> name) {
List<String> result = name.stream().map(m -> identifier(m)).collect(Collectors.toList());
return result;
}
- private String identifier(String name) {
+ private static String identifier(String name) {
return "`" + name + "`";
}
- private String quoteProperties(String name) {
+ private static String quoteProperties(String name) {
return "'" + name + "'";
}
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/SchemaChangeHelper.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/schema/SchemaChangeHelper.java
similarity index 76%
rename from flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/SchemaChangeHelper.java
rename to flink-doris-connector/src/main/java/org/apache/doris/flink/sink/schema/SchemaChangeHelper.java
index c1380c7..c7a3384 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/SchemaChangeHelper.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/schema/SchemaChangeHelper.java
@@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.
-package org.apache.doris.flink.sink.writer;
+package org.apache.doris.flink.sink.schema;
import org.apache.doris.flink.catalog.doris.FieldSchema;
@@ -66,8 +66,7 @@
for (Entry<String, FieldSchema> originFieldSchema : originFieldSchemaMap.entrySet()) {
if (originFieldSchema.getKey().equals(oldColumnName)) {
fieldSchema = originFieldSchema.getValue();
- String renameSQL = String.format(RENAME_DDL, table, oldColumnName, newColumnName);
- ddlList.add(renameSQL);
+ ddlList.add(buildRenameColumnDDL(table, oldColumnName, newColumnName));
ddlSchemas.add(new DDLSchema(oldColumnName, false));
}
}
@@ -80,23 +79,11 @@
ddlSchemas.clear();
List<String> ddlList = Lists.newArrayList();
for (FieldSchema fieldSchema : addFieldSchemas) {
- String name = fieldSchema.getName();
- String type = fieldSchema.getTypeString();
- String defaultValue = fieldSchema.getDefaultValue();
- String comment = fieldSchema.getComment();
- String addDDL = String.format(ADD_DDL, table, name, type);
- if (!StringUtils.isNullOrWhitespaceOnly(defaultValue)) {
- addDDL = addDDL + " DEFAULT " + defaultValue;
- }
- if (!StringUtils.isNullOrWhitespaceOnly(comment)) {
- addDDL = addDDL + " COMMENT " + comment;
- }
- ddlList.add(addDDL);
- ddlSchemas.add(new DDLSchema(name, false));
+ ddlList.add(buildAddColumnDDL(table, fieldSchema));
+ ddlSchemas.add(new DDLSchema(fieldSchema.getName(), false));
}
for (String columName : dropFieldSchemas) {
- String dropDDL = String.format(DROP_DDL, table, columName);
- ddlList.add(dropDDL);
+ ddlList.add(buildDropColumnDDL(table, columName));
ddlSchemas.add(new DDLSchema(columName, true));
}
@@ -105,6 +92,29 @@
return ddlList;
}
+ public static String buildAddColumnDDL(String tableIdentifier, FieldSchema fieldSchema){
+ String name = fieldSchema.getName();
+ String type = fieldSchema.getTypeString();
+ String defaultValue = fieldSchema.getDefaultValue();
+ String comment = fieldSchema.getComment();
+ String addDDL = String.format(ADD_DDL, tableIdentifier, name, type);
+ if (!StringUtils.isNullOrWhitespaceOnly(defaultValue)) {
+ addDDL = addDDL + " DEFAULT " + defaultValue;
+ }
+ if (!StringUtils.isNullOrWhitespaceOnly(comment)) {
+ addDDL = addDDL + " COMMENT " + comment;
+ }
+ return addDDL;
+ }
+
+ public static String buildDropColumnDDL(String tableIdentifier, String columName){
+ return String.format(DROP_DDL, tableIdentifier, columName);
+ }
+
+ public static String buildRenameColumnDDL(String tableIdentifier, String oldColumnName, String newColumnName){
+ return String.format(RENAME_DDL, tableIdentifier, oldColumnName, newColumnName);
+ }
+
public static List<DDLSchema> getDdlSchemas() {
return ddlSchemas;
}
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/schema/SchemaChangeManager.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/schema/SchemaChangeManager.java
new file mode 100644
index 0000000..26e43cf
--- /dev/null
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/schema/SchemaChangeManager.java
@@ -0,0 +1,160 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.flink.sink.schema;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.commons.codec.binary.Base64;
+import org.apache.doris.flink.catalog.doris.DorisSystem;
+import org.apache.doris.flink.catalog.doris.FieldSchema;
+import org.apache.doris.flink.catalog.doris.TableSchema;
+import org.apache.doris.flink.cfg.DorisOptions;
+import org.apache.doris.flink.exception.IllegalArgumentException;
+import org.apache.doris.flink.rest.RestService;
+import org.apache.doris.flink.sink.HttpGetWithEntity;
+import org.apache.flink.util.CollectionUtil;
+import org.apache.flink.util.StringUtils;
+import org.apache.http.HttpHeaders;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.client.methods.HttpUriRequest;
+import org.apache.http.entity.StringEntity;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClients;
+import org.apache.http.util.EntityUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.Map;
+
+public class SchemaChangeManager {
+ private static final Logger LOG = LoggerFactory.getLogger(SchemaChangeManager.class);
+ private static final String CHECK_SCHEMA_CHANGE_API = "http://%s/api/enable_light_schema_change/%s/%s";
+ private static final String SCHEMA_CHANGE_API = "http://%s/api/query/default_cluster/";
+ private ObjectMapper objectMapper = new ObjectMapper();
+ private DorisOptions dorisOptions;
+
+ public SchemaChangeManager(DorisOptions dorisOptions) {
+ this.dorisOptions = dorisOptions;
+ }
+
+ public boolean createTable(TableSchema table) throws IOException, IllegalArgumentException {
+ String createTableDDL = DorisSystem.buildCreateTableDDL(table);
+ return execute(createTableDDL);
+ }
+
+ public boolean addColumn(String database, String table, FieldSchema field) throws IOException, IllegalArgumentException {
+ String tableIdentifier = getTableIdentifier(database, table);
+ String addColumnDDL = SchemaChangeHelper.buildAddColumnDDL(tableIdentifier, field);
+ return schemaChange(database, table, buildRequestParam(false, field.getName()), addColumnDDL);
+ }
+
+ public boolean dropColumn(String database, String table, String columnName) throws IOException, IllegalArgumentException {
+ String tableIdentifier = getTableIdentifier(database, table);
+ String dropColumnDDL = SchemaChangeHelper.buildDropColumnDDL(tableIdentifier, columnName);
+ return schemaChange(database, table, buildRequestParam(true, columnName), dropColumnDDL);
+ }
+
+ public boolean renameColumn(String database, String table, String oldColumnName, String newColumnName) throws IOException, IllegalArgumentException {
+ String tableIdentifier = getTableIdentifier(database, table);
+ String renameColumnDDL = SchemaChangeHelper.buildRenameColumnDDL(tableIdentifier, oldColumnName, newColumnName);
+ return schemaChange(database, table, buildRequestParam(true, oldColumnName), renameColumnDDL);
+ }
+
+ public boolean schemaChange(String database, String table, Map<String, Object> params, String sql) throws IOException, IllegalArgumentException {
+ if(checkSchemaChange(database, table, params)){
+ return execute(sql);
+ }
+ return false;
+ }
+
+ public static Map<String, Object> buildRequestParam(boolean dropColumn, String columnName) {
+ Map<String, Object> params = new HashMap<>();
+ params.put("isDropColumn", dropColumn);
+ params.put("columnName", columnName);
+ return params;
+ }
+
+ /**
+ * check ddl can do light schema change
+ */
+ public boolean checkSchemaChange(String database, String table, Map<String, Object> params) throws IOException, IllegalArgumentException {
+ if(CollectionUtil.isNullOrEmpty(params)){
+ return false;
+ }
+ String requestUrl = String.format(CHECK_SCHEMA_CHANGE_API,
+ RestService.randomEndpoint(dorisOptions.getFenodes(), LOG), database, table);
+ HttpGetWithEntity httpGet = new HttpGetWithEntity(requestUrl);
+ httpGet.setHeader(HttpHeaders.AUTHORIZATION, authHeader());
+ httpGet.setEntity(new StringEntity(objectMapper.writeValueAsString(params)));
+ boolean success = handleResponse(httpGet);
+ if (!success) {
+ LOG.warn("schema change can not do table {}.{}", database, table);
+ }
+ return success;
+ }
+
+ /**
+ * execute sql in doris
+ */
+ public boolean execute(String ddl) throws IOException, IllegalArgumentException {
+ if(StringUtils.isNullOrWhitespaceOnly(ddl)){
+ return false;
+ }
+ Map<String, String> param = new HashMap<>();
+ param.put("stmt", ddl);
+ String requestUrl = String.format(SCHEMA_CHANGE_API, RestService.randomEndpoint(dorisOptions.getFenodes(), LOG));
+ HttpPost httpPost = new HttpPost(requestUrl);
+ httpPost.setHeader(HttpHeaders.AUTHORIZATION, authHeader());
+ httpPost.setHeader(HttpHeaders.CONTENT_TYPE, "application/json");
+ httpPost.setEntity(new StringEntity(objectMapper.writeValueAsString(param)));
+ boolean success = handleResponse(httpPost);
+ return success;
+ }
+
+ private boolean handleResponse(HttpUriRequest request) {
+ try (CloseableHttpClient httpclient = HttpClients.createDefault()) {
+ CloseableHttpResponse response = httpclient.execute(request);
+ final int statusCode = response.getStatusLine().getStatusCode();
+ if (statusCode == 200 && response.getEntity() != null) {
+ String loadResult = EntityUtils.toString(response.getEntity());
+ Map<String, Object> responseMap = objectMapper.readValue(loadResult, Map.class);
+ String code = responseMap.getOrDefault("code", "-1").toString();
+ if (code.equals("0")) {
+ return true;
+ } else {
+ LOG.error("schema change response:{}", loadResult);
+ }
+ }
+ } catch (Exception e) {
+ LOG.error("http request error,", e);
+ }
+ return false;
+ }
+
+ private String authHeader() {
+ return "Basic " + new String(Base64.encodeBase64(
+ (dorisOptions.getUsername() + ":" + dorisOptions.getPassword()).getBytes(StandardCharsets.UTF_8)));
+ }
+
+ private String getTableIdentifier(String database, String table){
+ return String.format("%s.%s", database, table);
+ }
+}
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/util/DeleteOperation.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/util/DeleteOperation.java
new file mode 100644
index 0000000..85ef3d7
--- /dev/null
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/util/DeleteOperation.java
@@ -0,0 +1,32 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.flink.sink.util;
+
+import java.util.Map;
+
+import static org.apache.doris.flink.sink.writer.LoadConstants.DORIS_DELETE_SIGN;
+
+public class DeleteOperation {
+ public static void addDeleteSign(Map<String, Object> valueMap, boolean delete) {
+ if (delete) {
+ valueMap.put(DORIS_DELETE_SIGN, "1");
+ } else {
+ valueMap.put(DORIS_DELETE_SIGN, "0");
+ }
+ }
+}
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/JsonDebeziumSchemaSerializer.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/JsonDebeziumSchemaSerializer.java
index 2171e84..ee4cad3 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/JsonDebeziumSchemaSerializer.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/JsonDebeziumSchemaSerializer.java
@@ -24,34 +24,22 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.JsonNodeFactory;
import com.fasterxml.jackson.databind.node.NullNode;
-import org.apache.commons.codec.binary.Base64;
import org.apache.commons.collections.CollectionUtils;
-
import org.apache.doris.flink.catalog.doris.FieldSchema;
import org.apache.doris.flink.cfg.DorisExecutionOptions;
import org.apache.doris.flink.cfg.DorisOptions;
import org.apache.doris.flink.exception.IllegalArgumentException;
-import org.apache.doris.flink.rest.RestService;
-import org.apache.doris.flink.sink.HttpGetWithEntity;
-import org.apache.doris.flink.sink.writer.SchemaChangeHelper;
-import org.apache.doris.flink.sink.writer.SchemaChangeHelper.DDLSchema;
+import org.apache.doris.flink.sink.schema.SchemaChangeHelper;
+import org.apache.doris.flink.sink.schema.SchemaChangeHelper.DDLSchema;
+import org.apache.doris.flink.sink.schema.SchemaChangeManager;
import org.apache.doris.flink.tools.cdc.SourceConnector;
import org.apache.doris.flink.tools.cdc.mysql.MysqlType;
import org.apache.doris.flink.tools.cdc.oracle.OracleType;
import org.apache.doris.flink.tools.cdc.postgres.PostgresType;
import org.apache.doris.flink.tools.cdc.sqlserver.SqlServerType;
-
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.StringUtils;
-import org.apache.http.HttpHeaders;
-import org.apache.http.client.methods.CloseableHttpResponse;
-import org.apache.http.client.methods.HttpPost;
-import org.apache.http.client.methods.HttpUriRequest;
-import org.apache.http.entity.StringEntity;
-import org.apache.http.impl.client.CloseableHttpClient;
-import org.apache.http.impl.client.HttpClients;
-import org.apache.http.util.EntityUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -67,15 +55,12 @@
import java.util.regex.Matcher;
import java.util.regex.Pattern;
-import static org.apache.doris.flink.sink.writer.LoadConstants.DORIS_DELETE_SIGN;
+import static org.apache.doris.flink.sink.util.DeleteOperation.addDeleteSign;
import static org.apache.doris.flink.sink.writer.LoadConstants.LINE_DELIMITER_DEFAULT;
import static org.apache.doris.flink.sink.writer.LoadConstants.LINE_DELIMITER_KEY;
public class JsonDebeziumSchemaSerializer implements DorisRecordSerializer<String> {
-
private static final Logger LOG = LoggerFactory.getLogger(JsonDebeziumSchemaSerializer.class);
- private static final String CHECK_SCHEMA_CHANGE_API = "http://%s/api/enable_light_schema_change/%s/%s";
- private static final String SCHEMA_CHANGE_API = "http://%s/api/query/default_cluster/%s";
private static final String OP_READ = "r"; // snapshot read
private static final String OP_CREATE = "c"; // insert
private static final String OP_UPDATE = "u"; // update
@@ -100,6 +85,7 @@
private String lineDelimiter = LINE_DELIMITER_DEFAULT;
private boolean ignoreUpdateBefore = true;
private SourceConnector sourceConnector;
+ private SchemaChangeManager schemaChangeManager;
public JsonDebeziumSchemaSerializer(DorisOptions dorisOptions,
Pattern pattern,
@@ -118,6 +104,7 @@
this.newSchemaChange = newSchemaChange;
this.firstLoad = true;
this.firstSchemaChange = true;
+ this.schemaChangeManager = new SchemaChangeManager(dorisOptions);
}
public JsonDebeziumSchemaSerializer(DorisOptions dorisOptions,
@@ -211,7 +198,7 @@
DDLSchema ddlSchema = ddlSchemas.get(i);
String ddlSql = ddlSqlList.get(i);
boolean doSchemaChange = checkSchemaChange(ddlSchema);
- status = doSchemaChange && execSchemaChange(ddlSql);
+ status = doSchemaChange && schemaChangeManager.execute(ddlSql);
LOG.info("schema change status:{}, ddl:{}", status, ddlSql);
}
} catch (Exception ex) {
@@ -220,20 +207,6 @@
return status;
}
- private boolean checkSchemaChange(DDLSchema ddlSchema) throws IOException, IllegalArgumentException {
- String requestUrl = String.format(CHECK_SCHEMA_CHANGE_API,
- RestService.randomEndpoint(dorisOptions.getFenodes(), LOG), database, table);
- Map<String, Object> param = buildRequestParam(ddlSchema);
- HttpGetWithEntity httpGet = new HttpGetWithEntity(requestUrl);
- httpGet.setHeader(HttpHeaders.AUTHORIZATION, authHeader());
- httpGet.setEntity(new StringEntity(objectMapper.writeValueAsString(param)));
- boolean success = handleResponse(httpGet);
- if (!success) {
- LOG.warn("schema change can not do table {}.{}", database, table);
- }
- return success;
- }
-
@VisibleForTesting
public List<String> extractDDLList(JsonNode record) throws JsonProcessingException {
JsonNode historyRecord = extractHistoryRecord(record);
@@ -295,7 +268,7 @@
return false;
}
boolean doSchemaChange = checkSchemaChange(ddl);
- status = doSchemaChange && execSchemaChange(ddl);
+ status = doSchemaChange && schemaChangeManager.execute(ddl);
LOG.info("schema change status:{}", status);
} catch (Exception ex) {
LOG.warn("schema change error :", ex);
@@ -313,36 +286,14 @@
return sourceTableName.equals(dbTbl);
}
- private void addDeleteSign(Map<String, Object> valueMap, boolean delete) {
- if (delete) {
- valueMap.put(DORIS_DELETE_SIGN, "1");
- } else {
- valueMap.put(DORIS_DELETE_SIGN, "0");
- }
- }
-
private boolean checkSchemaChange(String ddl) throws IOException, IllegalArgumentException {
- String requestUrl = String.format(CHECK_SCHEMA_CHANGE_API,
- RestService.randomEndpoint(dorisOptions.getFenodes(), LOG), database, table);
Map<String, Object> param = buildRequestParam(ddl);
- if (param.size() != 2) {
- return false;
- }
- HttpGetWithEntity httpGet = new HttpGetWithEntity(requestUrl);
- httpGet.setHeader(HttpHeaders.AUTHORIZATION, authHeader());
- httpGet.setEntity(new StringEntity(objectMapper.writeValueAsString(param)));
- boolean success = handleResponse(httpGet);
- if (!success) {
- LOG.warn("schema change can not do table {}.{}", database, table);
- }
- return success;
+ return schemaChangeManager.checkSchemaChange(database, table, param);
}
- protected Map<String, Object> buildRequestParam(DDLSchema ddlSchema) {
- Map<String, Object> params = new HashMap<>();
- params.put("isDropColumn", ddlSchema.isDropColumn());
- params.put("columnName", ddlSchema.getColumnName());
- return params;
+ private boolean checkSchemaChange(DDLSchema ddlSchema) throws IOException, IllegalArgumentException {
+ Map<String, Object> param = SchemaChangeManager.buildRequestParam(ddlSchema.isDropColumn(), ddlSchema.getColumnName());
+ return schemaChangeManager.checkSchemaChange(database, table, param);
}
/**
@@ -364,19 +315,6 @@
return params;
}
- private boolean execSchemaChange(String ddl) throws IOException, IllegalArgumentException {
- Map<String, String> param = new HashMap<>();
- param.put("stmt", ddl);
- String requestUrl = String.format(SCHEMA_CHANGE_API,
- RestService.randomEndpoint(dorisOptions.getFenodes(), LOG), database);
- HttpPost httpPost = new HttpPost(requestUrl);
- httpPost.setHeader(HttpHeaders.AUTHORIZATION, authHeader());
- httpPost.setHeader(HttpHeaders.CONTENT_TYPE, "application/json");
- httpPost.setEntity(new StringEntity(objectMapper.writeValueAsString(param)));
- boolean success = handleResponse(httpPost);
- return success;
- }
-
protected String extractDatabase(JsonNode record) {
if (record.get("source").has("schema")) {
// compatible with schema
@@ -390,25 +328,7 @@
return extractJsonNode(record.get("source"), "table");
}
- private boolean handleResponse(HttpUriRequest request) {
- try (CloseableHttpClient httpclient = HttpClients.createDefault()) {
- CloseableHttpResponse response = httpclient.execute(request);
- final int statusCode = response.getStatusLine().getStatusCode();
- if (statusCode == 200 && response.getEntity() != null) {
- String loadResult = EntityUtils.toString(response.getEntity());
- Map<String, Object> responseMap = objectMapper.readValue(loadResult, Map.class);
- String code = responseMap.getOrDefault("code", "-1").toString();
- if (code.equals("0")) {
- return true;
- } else {
- LOG.error("schema change response:{}", loadResult);
- }
- }
- } catch (Exception e) {
- LOG.error("http request error,", e);
- }
- return false;
- }
+
private String extractJsonNode(JsonNode record, String key) {
return record != null && record.get(key) != null &&
@@ -457,10 +377,7 @@
return null;
}
- private String authHeader() {
- return "Basic " + new String(Base64.encodeBase64(
- (dorisOptions.getUsername() + ":" + dorisOptions.getPassword()).getBytes(StandardCharsets.UTF_8)));
- }
+
@VisibleForTesting
public void fillOriginSchema(JsonNode columns) {
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/SchemaChangeHelperTest.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/schema/SchemaChangeHelperTest.java
similarity index 98%
rename from flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/SchemaChangeHelperTest.java
rename to flink-doris-connector/src/test/java/org/apache/doris/flink/sink/schema/SchemaChangeHelperTest.java
index 62906df..7e4fb47 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/SchemaChangeHelperTest.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/schema/SchemaChangeHelperTest.java
@@ -15,10 +15,9 @@
// specific language governing permissions and limitations
// under the License.
-package org.apache.doris.flink.sink.writer;
+package org.apache.doris.flink.sink.schema;
import org.apache.doris.flink.catalog.doris.FieldSchema;
-
import org.apache.flink.shaded.guava30.com.google.common.collect.Maps;
import org.junit.Assert;
import org.junit.Before;