[improve] Extract Schema change operation (#233)

diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/DorisSystem.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/DorisSystem.java
index 7aa314f..1f0a09f 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/DorisSystem.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/DorisSystem.java
@@ -139,7 +139,7 @@
         }
     }
 
-    public String buildCreateTableDDL(TableSchema schema) {
+    public static String buildCreateTableDDL(TableSchema schema) {
         StringBuilder sb = new StringBuilder("CREATE TABLE IF NOT EXISTS ");
         sb.append(identifier(schema.getDatabase()))
                 .append(".")
@@ -209,7 +209,7 @@
         return sb.toString();
     }
 
-    private void buildColumn(StringBuilder sql, FieldSchema field, boolean isKey){
+    private static void buildColumn(StringBuilder sql, FieldSchema field, boolean isKey){
         String fieldType = field.getTypeString();
         if(isKey && DorisType.STRING.equals(fieldType)){
             fieldType = String.format("%s(%s)", DorisType.VARCHAR, 65533);
@@ -222,7 +222,7 @@
                 .append("',");
     }
 
-    private String quoteComment(String comment){
+    private static String quoteComment(String comment){
         if(comment == null){
             return "";
         } else {
@@ -230,16 +230,16 @@
         }
     }
 
-    private List<String> identifier(List<String> name) {
+    private static List<String> identifier(List<String> name) {
         List<String> result = name.stream().map(m -> identifier(m)).collect(Collectors.toList());
         return result;
     }
 
-    private String identifier(String name) {
+    private static String identifier(String name) {
         return "`" + name + "`";
     }
 
-    private String quoteProperties(String name) {
+    private static String quoteProperties(String name) {
         return "'" + name + "'";
     }
 
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/SchemaChangeHelper.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/schema/SchemaChangeHelper.java
similarity index 76%
rename from flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/SchemaChangeHelper.java
rename to flink-doris-connector/src/main/java/org/apache/doris/flink/sink/schema/SchemaChangeHelper.java
index c1380c7..c7a3384 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/SchemaChangeHelper.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/schema/SchemaChangeHelper.java
@@ -15,7 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-package org.apache.doris.flink.sink.writer;
+package org.apache.doris.flink.sink.schema;
 
 import org.apache.doris.flink.catalog.doris.FieldSchema;
 
@@ -66,8 +66,7 @@
         for (Entry<String, FieldSchema> originFieldSchema : originFieldSchemaMap.entrySet()) {
             if (originFieldSchema.getKey().equals(oldColumnName)) {
                 fieldSchema = originFieldSchema.getValue();
-                String renameSQL = String.format(RENAME_DDL, table, oldColumnName, newColumnName);
-                ddlList.add(renameSQL);
+                ddlList.add(buildRenameColumnDDL(table, oldColumnName, newColumnName));
                 ddlSchemas.add(new DDLSchema(oldColumnName, false));
             }
         }
@@ -80,23 +79,11 @@
         ddlSchemas.clear();
         List<String> ddlList = Lists.newArrayList();
         for (FieldSchema fieldSchema : addFieldSchemas) {
-            String name = fieldSchema.getName();
-            String type = fieldSchema.getTypeString();
-            String defaultValue = fieldSchema.getDefaultValue();
-            String comment = fieldSchema.getComment();
-            String addDDL = String.format(ADD_DDL, table, name, type);
-            if (!StringUtils.isNullOrWhitespaceOnly(defaultValue)) {
-                addDDL = addDDL + " DEFAULT " + defaultValue;
-            }
-            if (!StringUtils.isNullOrWhitespaceOnly(comment)) {
-                addDDL = addDDL + " COMMENT " + comment;
-            }
-            ddlList.add(addDDL);
-            ddlSchemas.add(new DDLSchema(name, false));
+            ddlList.add(buildAddColumnDDL(table, fieldSchema));
+            ddlSchemas.add(new DDLSchema(fieldSchema.getName(), false));
         }
         for (String columName : dropFieldSchemas) {
-            String dropDDL = String.format(DROP_DDL, table, columName);
-            ddlList.add(dropDDL);
+            ddlList.add(buildDropColumnDDL(table, columName));
             ddlSchemas.add(new DDLSchema(columName, true));
         }
 
@@ -105,6 +92,29 @@
         return ddlList;
     }
 
+    public static String buildAddColumnDDL(String tableIdentifier, FieldSchema fieldSchema){
+        String name = fieldSchema.getName();
+        String type = fieldSchema.getTypeString();
+        String defaultValue = fieldSchema.getDefaultValue();
+        String comment = fieldSchema.getComment();
+        String addDDL = String.format(ADD_DDL, tableIdentifier, name, type);
+        if (!StringUtils.isNullOrWhitespaceOnly(defaultValue)) {
+            addDDL = addDDL + " DEFAULT " + defaultValue;
+        }
+        if (!StringUtils.isNullOrWhitespaceOnly(comment)) {
+            addDDL = addDDL + " COMMENT " + comment;
+        }
+        return addDDL;
+    }
+
+    public static String buildDropColumnDDL(String tableIdentifier, String columName){
+        return String.format(DROP_DDL, tableIdentifier, columName);
+    }
+
+    public static String buildRenameColumnDDL(String tableIdentifier, String oldColumnName, String newColumnName){
+        return String.format(RENAME_DDL, tableIdentifier, oldColumnName, newColumnName);
+    }
+
     public static List<DDLSchema> getDdlSchemas() {
         return ddlSchemas;
     }
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/schema/SchemaChangeManager.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/schema/SchemaChangeManager.java
new file mode 100644
index 0000000..26e43cf
--- /dev/null
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/schema/SchemaChangeManager.java
@@ -0,0 +1,160 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.flink.sink.schema;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.commons.codec.binary.Base64;
+import org.apache.doris.flink.catalog.doris.DorisSystem;
+import org.apache.doris.flink.catalog.doris.FieldSchema;
+import org.apache.doris.flink.catalog.doris.TableSchema;
+import org.apache.doris.flink.cfg.DorisOptions;
+import org.apache.doris.flink.exception.IllegalArgumentException;
+import org.apache.doris.flink.rest.RestService;
+import org.apache.doris.flink.sink.HttpGetWithEntity;
+import org.apache.flink.util.CollectionUtil;
+import org.apache.flink.util.StringUtils;
+import org.apache.http.HttpHeaders;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.client.methods.HttpUriRequest;
+import org.apache.http.entity.StringEntity;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClients;
+import org.apache.http.util.EntityUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.Map;
+
+public class SchemaChangeManager {
+    private static final Logger LOG = LoggerFactory.getLogger(SchemaChangeManager.class);
+    private static final String CHECK_SCHEMA_CHANGE_API = "http://%s/api/enable_light_schema_change/%s/%s";
+    private static final String SCHEMA_CHANGE_API = "http://%s/api/query/default_cluster/";
+    private ObjectMapper objectMapper = new ObjectMapper();
+    private DorisOptions dorisOptions;
+
+    public SchemaChangeManager(DorisOptions dorisOptions) {
+        this.dorisOptions = dorisOptions;
+    }
+
+    public boolean createTable(TableSchema table) throws IOException, IllegalArgumentException {
+        String createTableDDL = DorisSystem.buildCreateTableDDL(table);
+        return execute(createTableDDL);
+    }
+
+    public boolean addColumn(String database, String table, FieldSchema field) throws IOException, IllegalArgumentException {
+        String tableIdentifier = getTableIdentifier(database, table);
+        String addColumnDDL = SchemaChangeHelper.buildAddColumnDDL(tableIdentifier, field);
+        return schemaChange(database, table, buildRequestParam(false, field.getName()), addColumnDDL);
+    }
+
+    public boolean dropColumn(String database, String table, String columnName) throws IOException, IllegalArgumentException {
+        String tableIdentifier = getTableIdentifier(database, table);
+        String dropColumnDDL = SchemaChangeHelper.buildDropColumnDDL(tableIdentifier, columnName);
+        return schemaChange(database, table, buildRequestParam(true, columnName), dropColumnDDL);
+    }
+
+    public boolean renameColumn(String database, String table, String oldColumnName, String newColumnName) throws IOException, IllegalArgumentException {
+        String tableIdentifier = getTableIdentifier(database, table);
+        String renameColumnDDL = SchemaChangeHelper.buildRenameColumnDDL(tableIdentifier, oldColumnName, newColumnName);
+        return schemaChange(database, table, buildRequestParam(true, oldColumnName), renameColumnDDL);
+    }
+
+    public boolean schemaChange(String database, String table, Map<String, Object> params, String sql) throws IOException, IllegalArgumentException {
+        if(checkSchemaChange(database, table, params)){
+            return execute(sql);
+        }
+        return false;
+    }
+
+    public static Map<String, Object> buildRequestParam(boolean dropColumn, String columnName) {
+        Map<String, Object> params = new HashMap<>();
+        params.put("isDropColumn", dropColumn);
+        params.put("columnName", columnName);
+        return params;
+    }
+
+    /**
+     * check ddl can do light schema change
+     */
+    public boolean checkSchemaChange(String database, String table, Map<String, Object> params) throws IOException, IllegalArgumentException {
+        if(CollectionUtil.isNullOrEmpty(params)){
+            return false;
+        }
+        String requestUrl = String.format(CHECK_SCHEMA_CHANGE_API,
+                RestService.randomEndpoint(dorisOptions.getFenodes(), LOG), database, table);
+        HttpGetWithEntity httpGet = new HttpGetWithEntity(requestUrl);
+        httpGet.setHeader(HttpHeaders.AUTHORIZATION, authHeader());
+        httpGet.setEntity(new StringEntity(objectMapper.writeValueAsString(params)));
+        boolean success = handleResponse(httpGet);
+        if (!success) {
+            LOG.warn("schema change can not do table {}.{}", database, table);
+        }
+        return success;
+    }
+
+    /**
+     * execute sql in doris
+     */
+    public boolean execute(String ddl) throws IOException, IllegalArgumentException {
+        if(StringUtils.isNullOrWhitespaceOnly(ddl)){
+            return false;
+        }
+        Map<String, String> param = new HashMap<>();
+        param.put("stmt", ddl);
+        String requestUrl = String.format(SCHEMA_CHANGE_API, RestService.randomEndpoint(dorisOptions.getFenodes(), LOG));
+        HttpPost httpPost = new HttpPost(requestUrl);
+        httpPost.setHeader(HttpHeaders.AUTHORIZATION, authHeader());
+        httpPost.setHeader(HttpHeaders.CONTENT_TYPE, "application/json");
+        httpPost.setEntity(new StringEntity(objectMapper.writeValueAsString(param)));
+        boolean success = handleResponse(httpPost);
+        return success;
+    }
+
+    private boolean handleResponse(HttpUriRequest request) {
+        try (CloseableHttpClient httpclient = HttpClients.createDefault()) {
+            CloseableHttpResponse response = httpclient.execute(request);
+            final int statusCode = response.getStatusLine().getStatusCode();
+            if (statusCode == 200 && response.getEntity() != null) {
+                String loadResult = EntityUtils.toString(response.getEntity());
+                Map<String, Object> responseMap = objectMapper.readValue(loadResult, Map.class);
+                String code = responseMap.getOrDefault("code", "-1").toString();
+                if (code.equals("0")) {
+                    return true;
+                } else {
+                    LOG.error("schema change response:{}", loadResult);
+                }
+            }
+        } catch (Exception e) {
+            LOG.error("http request error,", e);
+        }
+        return false;
+    }
+
+    private String authHeader() {
+        return "Basic " + new String(Base64.encodeBase64(
+                (dorisOptions.getUsername() + ":" + dorisOptions.getPassword()).getBytes(StandardCharsets.UTF_8)));
+    }
+
+    private String getTableIdentifier(String database, String table){
+        return String.format("%s.%s", database, table);
+    }
+}
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/util/DeleteOperation.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/util/DeleteOperation.java
new file mode 100644
index 0000000..85ef3d7
--- /dev/null
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/util/DeleteOperation.java
@@ -0,0 +1,32 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.flink.sink.util;
+
+import java.util.Map;
+
+import static org.apache.doris.flink.sink.writer.LoadConstants.DORIS_DELETE_SIGN;
+
+public class DeleteOperation {
+    public static void addDeleteSign(Map<String, Object> valueMap, boolean delete) {
+        if (delete) {
+            valueMap.put(DORIS_DELETE_SIGN, "1");
+        } else {
+            valueMap.put(DORIS_DELETE_SIGN, "0");
+        }
+    }
+}
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/JsonDebeziumSchemaSerializer.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/JsonDebeziumSchemaSerializer.java
index 2171e84..ee4cad3 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/JsonDebeziumSchemaSerializer.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/JsonDebeziumSchemaSerializer.java
@@ -24,34 +24,22 @@
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.node.JsonNodeFactory;
 import com.fasterxml.jackson.databind.node.NullNode;
-import org.apache.commons.codec.binary.Base64;
 import org.apache.commons.collections.CollectionUtils;
-
 import org.apache.doris.flink.catalog.doris.FieldSchema;
 import org.apache.doris.flink.cfg.DorisExecutionOptions;
 import org.apache.doris.flink.cfg.DorisOptions;
 import org.apache.doris.flink.exception.IllegalArgumentException;
-import org.apache.doris.flink.rest.RestService;
-import org.apache.doris.flink.sink.HttpGetWithEntity;
-import org.apache.doris.flink.sink.writer.SchemaChangeHelper;
-import org.apache.doris.flink.sink.writer.SchemaChangeHelper.DDLSchema;
+import org.apache.doris.flink.sink.schema.SchemaChangeHelper;
+import org.apache.doris.flink.sink.schema.SchemaChangeHelper.DDLSchema;
+import org.apache.doris.flink.sink.schema.SchemaChangeManager;
 import org.apache.doris.flink.tools.cdc.SourceConnector;
 import org.apache.doris.flink.tools.cdc.mysql.MysqlType;
 import org.apache.doris.flink.tools.cdc.oracle.OracleType;
 import org.apache.doris.flink.tools.cdc.postgres.PostgresType;
 import org.apache.doris.flink.tools.cdc.sqlserver.SqlServerType;
-
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.util.StringUtils;
-import org.apache.http.HttpHeaders;
-import org.apache.http.client.methods.CloseableHttpResponse;
-import org.apache.http.client.methods.HttpPost;
-import org.apache.http.client.methods.HttpUriRequest;
-import org.apache.http.entity.StringEntity;
-import org.apache.http.impl.client.CloseableHttpClient;
-import org.apache.http.impl.client.HttpClients;
-import org.apache.http.util.EntityUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -67,15 +55,12 @@
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
-import static org.apache.doris.flink.sink.writer.LoadConstants.DORIS_DELETE_SIGN;
+import static org.apache.doris.flink.sink.util.DeleteOperation.addDeleteSign;
 import static org.apache.doris.flink.sink.writer.LoadConstants.LINE_DELIMITER_DEFAULT;
 import static org.apache.doris.flink.sink.writer.LoadConstants.LINE_DELIMITER_KEY;
 
 public class JsonDebeziumSchemaSerializer implements DorisRecordSerializer<String> {
-
     private static final Logger LOG = LoggerFactory.getLogger(JsonDebeziumSchemaSerializer.class);
-    private static final String CHECK_SCHEMA_CHANGE_API = "http://%s/api/enable_light_schema_change/%s/%s";
-    private static final String SCHEMA_CHANGE_API = "http://%s/api/query/default_cluster/%s";
     private static final String OP_READ = "r"; // snapshot read
     private static final String OP_CREATE = "c"; // insert
     private static final String OP_UPDATE = "u"; // update
@@ -100,6 +85,7 @@
     private String lineDelimiter = LINE_DELIMITER_DEFAULT;
     private boolean ignoreUpdateBefore = true;
     private SourceConnector sourceConnector;
+    private SchemaChangeManager schemaChangeManager;
 
     public JsonDebeziumSchemaSerializer(DorisOptions dorisOptions,
             Pattern pattern,
@@ -118,6 +104,7 @@
         this.newSchemaChange = newSchemaChange;
         this.firstLoad = true;
         this.firstSchemaChange = true;
+        this.schemaChangeManager = new SchemaChangeManager(dorisOptions);
     }
 
     public JsonDebeziumSchemaSerializer(DorisOptions dorisOptions,
@@ -211,7 +198,7 @@
                 DDLSchema ddlSchema = ddlSchemas.get(i);
                 String ddlSql = ddlSqlList.get(i);
                 boolean doSchemaChange = checkSchemaChange(ddlSchema);
-                status = doSchemaChange && execSchemaChange(ddlSql);
+                status = doSchemaChange && schemaChangeManager.execute(ddlSql);
                 LOG.info("schema change status:{}, ddl:{}", status, ddlSql);
             }
         } catch (Exception ex) {
@@ -220,20 +207,6 @@
         return status;
     }
 
-    private boolean checkSchemaChange(DDLSchema ddlSchema) throws IOException, IllegalArgumentException {
-        String requestUrl = String.format(CHECK_SCHEMA_CHANGE_API,
-                RestService.randomEndpoint(dorisOptions.getFenodes(), LOG), database, table);
-        Map<String, Object> param = buildRequestParam(ddlSchema);
-        HttpGetWithEntity httpGet = new HttpGetWithEntity(requestUrl);
-        httpGet.setHeader(HttpHeaders.AUTHORIZATION, authHeader());
-        httpGet.setEntity(new StringEntity(objectMapper.writeValueAsString(param)));
-        boolean success = handleResponse(httpGet);
-        if (!success) {
-            LOG.warn("schema change can not do table {}.{}", database, table);
-        }
-        return success;
-    }
-
     @VisibleForTesting
     public List<String> extractDDLList(JsonNode record) throws JsonProcessingException {
         JsonNode historyRecord = extractHistoryRecord(record);
@@ -295,7 +268,7 @@
                 return false;
             }
             boolean doSchemaChange = checkSchemaChange(ddl);
-            status = doSchemaChange && execSchemaChange(ddl);
+            status = doSchemaChange && schemaChangeManager.execute(ddl);
             LOG.info("schema change status:{}", status);
         } catch (Exception ex) {
             LOG.warn("schema change error :", ex);
@@ -313,36 +286,14 @@
         return sourceTableName.equals(dbTbl);
     }
 
-    private void addDeleteSign(Map<String, Object> valueMap, boolean delete) {
-        if (delete) {
-            valueMap.put(DORIS_DELETE_SIGN, "1");
-        } else {
-            valueMap.put(DORIS_DELETE_SIGN, "0");
-        }
-    }
-
     private boolean checkSchemaChange(String ddl) throws IOException, IllegalArgumentException {
-        String requestUrl = String.format(CHECK_SCHEMA_CHANGE_API,
-                RestService.randomEndpoint(dorisOptions.getFenodes(), LOG), database, table);
         Map<String, Object> param = buildRequestParam(ddl);
-        if (param.size() != 2) {
-            return false;
-        }
-        HttpGetWithEntity httpGet = new HttpGetWithEntity(requestUrl);
-        httpGet.setHeader(HttpHeaders.AUTHORIZATION, authHeader());
-        httpGet.setEntity(new StringEntity(objectMapper.writeValueAsString(param)));
-        boolean success = handleResponse(httpGet);
-        if (!success) {
-            LOG.warn("schema change can not do table {}.{}", database, table);
-        }
-        return success;
+        return schemaChangeManager.checkSchemaChange(database, table, param);
     }
 
-    protected Map<String, Object> buildRequestParam(DDLSchema ddlSchema) {
-        Map<String, Object> params = new HashMap<>();
-        params.put("isDropColumn", ddlSchema.isDropColumn());
-        params.put("columnName", ddlSchema.getColumnName());
-        return params;
+    private boolean checkSchemaChange(DDLSchema ddlSchema) throws IOException, IllegalArgumentException {
+        Map<String, Object> param = SchemaChangeManager.buildRequestParam(ddlSchema.isDropColumn(), ddlSchema.getColumnName());
+        return schemaChangeManager.checkSchemaChange(database, table, param);
     }
 
     /**
@@ -364,19 +315,6 @@
         return params;
     }
 
-    private boolean execSchemaChange(String ddl) throws IOException, IllegalArgumentException {
-        Map<String, String> param = new HashMap<>();
-        param.put("stmt", ddl);
-        String requestUrl = String.format(SCHEMA_CHANGE_API,
-                RestService.randomEndpoint(dorisOptions.getFenodes(), LOG), database);
-        HttpPost httpPost = new HttpPost(requestUrl);
-        httpPost.setHeader(HttpHeaders.AUTHORIZATION, authHeader());
-        httpPost.setHeader(HttpHeaders.CONTENT_TYPE, "application/json");
-        httpPost.setEntity(new StringEntity(objectMapper.writeValueAsString(param)));
-        boolean success = handleResponse(httpPost);
-        return success;
-    }
-
     protected String extractDatabase(JsonNode record) {
         if (record.get("source").has("schema")) {
             // compatible with schema
@@ -390,25 +328,7 @@
         return extractJsonNode(record.get("source"), "table");
     }
 
-    private boolean handleResponse(HttpUriRequest request) {
-        try (CloseableHttpClient httpclient = HttpClients.createDefault()) {
-            CloseableHttpResponse response = httpclient.execute(request);
-            final int statusCode = response.getStatusLine().getStatusCode();
-            if (statusCode == 200 && response.getEntity() != null) {
-                String loadResult = EntityUtils.toString(response.getEntity());
-                Map<String, Object> responseMap = objectMapper.readValue(loadResult, Map.class);
-                String code = responseMap.getOrDefault("code", "-1").toString();
-                if (code.equals("0")) {
-                    return true;
-                } else {
-                    LOG.error("schema change response:{}", loadResult);
-                }
-            }
-        } catch (Exception e) {
-            LOG.error("http request error,", e);
-        }
-        return false;
-    }
+
 
     private String extractJsonNode(JsonNode record, String key) {
         return record != null && record.get(key) != null &&
@@ -457,10 +377,7 @@
         return null;
     }
 
-    private String authHeader() {
-        return "Basic " + new String(Base64.encodeBase64(
-                (dorisOptions.getUsername() + ":" + dorisOptions.getPassword()).getBytes(StandardCharsets.UTF_8)));
-    }
+
 
     @VisibleForTesting
     public void fillOriginSchema(JsonNode columns) {
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/SchemaChangeHelperTest.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/schema/SchemaChangeHelperTest.java
similarity index 98%
rename from flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/SchemaChangeHelperTest.java
rename to flink-doris-connector/src/test/java/org/apache/doris/flink/sink/schema/SchemaChangeHelperTest.java
index 62906df..7e4fb47 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/SchemaChangeHelperTest.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/schema/SchemaChangeHelperTest.java
@@ -15,10 +15,9 @@
 // specific language governing permissions and limitations
 // under the License.
 
-package org.apache.doris.flink.sink.writer;
+package org.apache.doris.flink.sink.schema;
 
 import org.apache.doris.flink.catalog.doris.FieldSchema;
-
 import org.apache.flink.shaded.guava30.com.google.common.collect.Maps;
 import org.junit.Assert;
 import org.junit.Before;