[improve] Move the End-To-End (E2E) tests onto the CI pipeline (#466)

diff --git a/.licenserc.yaml b/.licenserc.yaml
index 6048839..27e1080 100644
--- a/.licenserc.yaml
+++ b/.licenserc.yaml
@@ -12,5 +12,6 @@
     - '.github/PULL_REQUEST_TEMPLATE.md'
     - '.licenserc.yaml'
     - 'custom_env.sh.tpl'
+    - 'flink-doris-connector/src/test/resources/container/'
 
   comment: on-failure
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/CdcTools.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/CdcTools.java
index ec3b452..61beea1 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/CdcTools.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/CdcTools.java
@@ -20,6 +20,7 @@
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.java.utils.MultipleParameterTool;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.execution.JobClient;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.StringUtils;
@@ -36,11 +37,14 @@
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 
 /** cdc sync tools. */
 public class CdcTools {
     private static final List<String> EMPTY_KEYS =
             Collections.singletonList(DatabaseSyncConfig.PASSWORD);
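+    // Set only from tests: lets E2E cases run the CDC job on a provided environment
+    // and retrieve the resulting async JobClient.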
+    private static StreamExecutionEnvironment flinkEnvironmentForTesting;
+    private static JobClient jobClient;
 
     public static void main(String[] args) throws Exception {
         System.out.println("Input args: " + Arrays.asList(args) + ".\n");
@@ -146,7 +150,10 @@
                 new DorisTableConfig(getConfigMap(params, DatabaseSyncConfig.TABLE_CONF));
         Configuration sinkConfig = Configuration.fromMap(sinkMap);
 
-        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        StreamExecutionEnvironment env =
+                Objects.nonNull(flinkEnvironmentForTesting)
+                        ? flinkEnvironmentForTesting
+                        : StreamExecutionEnvironment.getExecutionEnvironment();
         databaseSync
                 .setEnv(env)
                 .setDatabase(database)
@@ -174,7 +181,23 @@
                             config.getString(
                                     DatabaseSyncConfig.DATABASE_NAME, DatabaseSyncConfig.DB));
         }
-        env.execute(jobName);
+        if (Objects.nonNull(flinkEnvironmentForTesting)) {
+            jobClient = env.executeAsync();
+        } else {
+            env.execute(jobName);
+        }
+    }
+
+    @VisibleForTesting
+    public static JobClient getJobClient() {
+        return jobClient;
+    }
+
+    // For testing only; do not use in a production environment.
+    @VisibleForTesting
+    public static void setStreamExecutionEnvironmentForTesting(
+            StreamExecutionEnvironment environment) {
+        flinkEnvironmentForTesting = environment;
     }
 
     @VisibleForTesting
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/DorisTestBase.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/DorisTestBase.java
deleted file mode 100644
index 5097a21..0000000
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/DorisTestBase.java
+++ /dev/null
@@ -1,326 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.doris.flink;
-
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.common.JobStatus;
-import org.apache.flink.api.common.time.Deadline;
-import org.apache.flink.core.execution.JobClient;
-import org.apache.flink.runtime.highavailability.nonha.embedded.HaLeadershipControl;
-import org.apache.flink.runtime.minicluster.MiniCluster;
-import org.apache.flink.runtime.minicluster.RpcServiceSharing;
-import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
-import org.apache.flink.test.util.MiniClusterWithClientResource;
-import org.apache.flink.util.function.SupplierWithException;
-
-import com.google.common.collect.Lists;
-import org.apache.commons.lang3.StringUtils;
-import org.junit.Assert;
-import org.junit.Rule;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.testcontainers.containers.GenericContainer;
-import org.testcontainers.containers.Network;
-import org.testcontainers.containers.output.Slf4jLogConsumer;
-import org.testcontainers.utility.DockerLoggerFactory;
-
-import java.io.BufferedReader;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-import java.net.InetAddress;
-import java.net.URL;
-import java.net.URLClassLoader;
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.sql.ResultSet;
-import java.sql.ResultSetMetaData;
-import java.sql.SQLException;
-import java.sql.Statement;
-import java.time.Duration;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.TimeoutException;
-import java.util.concurrent.locks.LockSupport;
-
-public abstract class DorisTestBase {
-    protected static final Logger LOG = LoggerFactory.getLogger(DorisTestBase.class);
-    private static final String DEFAULT_DOCKER_IMAGE = "apache/doris:doris-all-in-one-2.1.0";
-    protected static final String DORIS_DOCKER_IMAGE =
-            System.getProperty("image") == null
-                    ? DEFAULT_DOCKER_IMAGE
-                    : System.getProperty("image");
-    private static final String DRIVER_JAR =
-            "https://repo1.maven.org/maven2/mysql/mysql-connector-java/8.0.16/mysql-connector-java-8.0.16.jar";
-    protected static final String DRIVER_CLASS = "com.mysql.cj.jdbc.Driver";
-    protected static final String URL = "jdbc:mysql://%s:9030";
-    protected static final String USERNAME = "root";
-    protected static final String PASSWORD = "";
-    protected static final GenericContainer DORIS_CONTAINER = createDorisContainer();
-
-    protected static String getFenodes() {
-        return DORIS_CONTAINER.getHost() + ":8030";
-    }
-
-    protected static String getBenodes() {
-        return DORIS_CONTAINER.getHost() + ":8040";
-    }
-
-    protected static String getJdbcUrl() {
-        return String.format(URL, DORIS_CONTAINER.getHost());
-    }
-
-    protected static String getHost() {
-        return DORIS_CONTAINER.getHost();
-    }
-
-    static {
-        startContainers();
-    }
-
-    public static void startContainers() {
-        try {
-            LOG.info("Starting doris containers...");
-            // singleton doris container
-            DORIS_CONTAINER.start();
-            initializeJdbcConnection();
-        } catch (Exception ex) {
-            LOG.error("Failed to start containers doris, ", ex);
-        }
-        LOG.info("Containers doris are started.");
-    }
-
-    public static GenericContainer createDorisContainer() {
-        LOG.info("Create doris containers...");
-        GenericContainer container =
-                new GenericContainer<>(DORIS_DOCKER_IMAGE)
-                        .withNetwork(Network.newNetwork())
-                        .withNetworkAliases("DorisContainer")
-                        .withPrivilegedMode(true)
-                        .withLogConsumer(
-                                new Slf4jLogConsumer(
-                                        DockerLoggerFactory.getLogger(DORIS_DOCKER_IMAGE)))
-                        .withExposedPorts(8030, 9030, 8040, 9060);
-
-        container.setPortBindings(
-                Lists.newArrayList(
-                        String.format("%s:%s", "8030", "8030"),
-                        String.format("%s:%s", "9030", "9030"),
-                        String.format("%s:%s", "9060", "9060"),
-                        String.format("%s:%s", "8040", "8040")));
-
-        return container;
-    }
-
-    protected static void initializeJdbcConnection() throws Exception {
-        URLClassLoader urlClassLoader =
-                new URLClassLoader(
-                        new URL[] {new URL(DRIVER_JAR)}, DorisTestBase.class.getClassLoader());
-        LOG.info("Try to connect to Doris...");
-        Thread.currentThread().setContextClassLoader(urlClassLoader);
-        try (Connection connection =
-                        DriverManager.getConnection(
-                                String.format(URL, DORIS_CONTAINER.getHost()), USERNAME, PASSWORD);
-                Statement statement = connection.createStatement()) {
-            ResultSet resultSet;
-            do {
-                LOG.info("Wait for the Backend to start successfully...");
-                resultSet = statement.executeQuery("show backends");
-            } while (!isBeReady(resultSet, Duration.ofSeconds(1L)));
-        }
-        LOG.info("Connected to Doris successfully...");
-        printClusterStatus();
-    }
-
-    private static boolean isBeReady(ResultSet rs, Duration duration) throws SQLException {
-        LockSupport.parkNanos(duration.toNanos());
-        if (rs.next()) {
-            String isAlive = rs.getString("Alive").trim();
-            String totalCap = rs.getString("TotalCapacity").trim();
-            return "true".equalsIgnoreCase(isAlive) && !"0.000".equalsIgnoreCase(totalCap);
-        }
-        return false;
-    }
-
-    protected static void printClusterStatus() throws Exception {
-        LOG.info("Current machine IP: {}", InetAddress.getLocalHost());
-        echo("sh", "-c", "cat /proc/cpuinfo | grep 'cpu cores' | uniq");
-        echo("sh", "-c", "free -h");
-        try (Connection connection =
-                        DriverManager.getConnection(
-                                String.format(URL, DORIS_CONTAINER.getHost()), USERNAME, PASSWORD);
-                Statement statement = connection.createStatement()) {
-            ResultSet showFrontends = statement.executeQuery("show frontends");
-            LOG.info("Frontends status: {}", convertList(showFrontends));
-            ResultSet showBackends = statement.executeQuery("show backends");
-            LOG.info("Backends status: {}", convertList(showBackends));
-        }
-    }
-
-    static void echo(String... cmd) {
-        try {
-            Process p = Runtime.getRuntime().exec(cmd);
-            InputStream is = p.getInputStream();
-            BufferedReader reader = new BufferedReader(new InputStreamReader(is));
-            String line;
-            while ((line = reader.readLine()) != null) {
-                System.out.println(line);
-            }
-            p.waitFor();
-            is.close();
-            reader.close();
-            p.destroy();
-        } catch (Exception e) {
-            e.printStackTrace();
-        }
-    }
-
-    private static List<Map> convertList(ResultSet rs) throws SQLException {
-        List<Map> list = new ArrayList<>();
-        ResultSetMetaData metaData = rs.getMetaData();
-        int columnCount = metaData.getColumnCount();
-        while (rs.next()) {
-            Map<String, Object> rowData = new HashMap<>();
-            for (int i = 1; i <= columnCount; i++) {
-                rowData.put(metaData.getColumnName(i), rs.getObject(i));
-            }
-            list.add(rowData);
-        }
-        return list;
-    }
-
-    public void checkResult(List<String> expected, String query, int columnSize) throws Exception {
-        List<String> actual = new ArrayList<>();
-        try (Connection connection =
-                        DriverManager.getConnection(
-                                String.format(URL, DORIS_CONTAINER.getHost()), USERNAME, PASSWORD);
-                Statement statement = connection.createStatement()) {
-            ResultSet sinkResultSet = statement.executeQuery(query);
-            while (sinkResultSet.next()) {
-                List<String> row = new ArrayList<>();
-                for (int i = 1; i <= columnSize; i++) {
-                    Object value = sinkResultSet.getObject(i);
-                    if (value == null) {
-                        row.add("null");
-                    } else {
-                        row.add(value.toString());
-                    }
-                }
-                actual.add(StringUtils.join(row, ","));
-            }
-        }
-        Assert.assertArrayEquals(expected.toArray(), actual.toArray());
-    }
-
-    @Rule
-    public final MiniClusterWithClientResource miniClusterResource =
-            new MiniClusterWithClientResource(
-                    new MiniClusterResourceConfiguration.Builder()
-                            .setNumberTaskManagers(1)
-                            .setNumberSlotsPerTaskManager(2)
-                            .setRpcServiceSharing(RpcServiceSharing.DEDICATED)
-                            .withHaLeadershipControl()
-                            .build());
-
-    /** The type of failover. */
-    protected enum FailoverType {
-        TM,
-        JM,
-        NONE
-    }
-
-    protected static void triggerFailover(
-            FailoverType type, JobID jobId, MiniCluster miniCluster, Runnable afterFailAction)
-            throws Exception {
-        switch (type) {
-            case TM:
-                restartTaskManager(miniCluster, afterFailAction);
-                break;
-            case JM:
-                triggerJobManagerFailover(jobId, miniCluster, afterFailAction);
-                break;
-            case NONE:
-                break;
-            default:
-                throw new IllegalStateException("Unexpected value: " + type);
-        }
-    }
-
-    protected static void triggerJobManagerFailover(
-            JobID jobId, MiniCluster miniCluster, Runnable afterFailAction) throws Exception {
-        final HaLeadershipControl haLeadershipControl = miniCluster.getHaLeadershipControl().get();
-        haLeadershipControl.revokeJobMasterLeadership(jobId).get();
-        afterFailAction.run();
-        haLeadershipControl.grantJobMasterLeadership(jobId).get();
-    }
-
-    protected static void restartTaskManager(MiniCluster miniCluster, Runnable afterFailAction)
-            throws Exception {
-        miniCluster.terminateTaskManager(0).get();
-        afterFailAction.run();
-        miniCluster.startTaskManager();
-    }
-
-    public static void waitForJobStatus(
-            JobClient client, List<JobStatus> expectedStatus, Deadline deadline) throws Exception {
-        waitUntilCondition(
-                () -> {
-                    JobStatus currentStatus = (JobStatus) client.getJobStatus().get();
-                    if (expectedStatus.contains(currentStatus)) {
-                        return true;
-                    } else if (currentStatus.isTerminalState()) {
-                        try {
-                            client.getJobExecutionResult().get();
-                        } catch (Exception var4) {
-                            throw new IllegalStateException(
-                                    String.format(
-                                            "Job has entered %s state, but expecting %s",
-                                            currentStatus, expectedStatus),
-                                    var4);
-                        }
-
-                        throw new IllegalStateException(
-                                String.format(
-                                        "Job has entered a terminal state %s, but expecting %s",
-                                        currentStatus, expectedStatus));
-                    } else {
-                        return false;
-                    }
-                },
-                deadline,
-                100L,
-                "Condition was not met in given timeout.");
-    }
-
-    public static void waitUntilCondition(
-            SupplierWithException<Boolean, Exception> condition,
-            Deadline timeout,
-            long retryIntervalMillis,
-            String errorMsg)
-            throws Exception {
-        while (timeout.hasTimeLeft() && !(Boolean) condition.get()) {
-            long timeLeft = Math.max(0L, timeout.timeLeft().toMillis());
-            Thread.sleep(Math.min(retryIntervalMillis, timeLeft));
-        }
-
-        if (!timeout.hasTimeLeft()) {
-            throw new TimeoutException(errorMsg);
-        }
-    }
-}
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/catalog/DorisCatalogITCase.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/catalog/DorisCatalogITCase.java
index 712b188..b3a3ce0 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/catalog/DorisCatalogITCase.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/catalog/DorisCatalogITCase.java
@@ -40,8 +40,8 @@
 import org.apache.flink.util.CollectionUtil;
 
 import com.google.common.collect.Lists;
-import org.apache.doris.flink.DorisTestBase;
 import org.apache.doris.flink.cfg.DorisConnectionOptions;
+import org.apache.doris.flink.container.AbstractITCaseService;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Ignore;
@@ -62,40 +62,14 @@
 import static org.junit.Assert.assertTrue;
 
 /** Class for unit tests to run on catalogs. */
-public class DorisCatalogITCase extends DorisTestBase {
+public class DorisCatalogITCase extends AbstractITCaseService {
     private static final String TEST_CATALOG_NAME = "doris_catalog";
-    private static final String TEST_FENODES = getFenodes();
-    private static final String TEST_JDBCURL = getJdbcUrl();
-    private static final String TEST_USERNAME = USERNAME;
-    private static final String TEST_PWD = PASSWORD;
-    //    private static final String TEST_FENODES = "127.0.0.1:8030";
-    //    private static final String TEST_JDBCURL = "jdbc:mysql://127.0.0.1:9030";
-    //    private static final String TEST_USERNAME = "root";
-    //    private static final String TEST_PWD = "";
     private static final String TEST_DB = "catalog_db";
     private static final String TEST_TABLE = "t_all_types";
     private static final String TEST_TABLE_SINK = "t_all_types_sink";
     private static final String TEST_TABLE_SINK_GROUPBY = "t_all_types_sink_groupby";
 
-    protected static final Schema TABLE_SCHEMA =
-            Schema.newBuilder()
-                    .column("id", DataTypes.STRING())
-                    .column("c_boolean", DataTypes.BOOLEAN())
-                    .column("c_char", DataTypes.CHAR(1))
-                    .column("c_date", DataTypes.DATE())
-                    .column("c_datetime", DataTypes.TIMESTAMP(0))
-                    .column("c_decimal", DataTypes.DECIMAL(10, 2))
-                    .column("c_double", DataTypes.DOUBLE())
-                    .column("c_float", DataTypes.FLOAT())
-                    .column("c_int", DataTypes.INT())
-                    .column("c_bigint", DataTypes.BIGINT())
-                    .column("c_largeint", DataTypes.STRING())
-                    .column("c_smallint", DataTypes.SMALLINT())
-                    .column("c_string", DataTypes.STRING())
-                    .column("c_tinyint", DataTypes.TINYINT())
-                    .build();
-
-    protected static final TableSchema TABLE_SCHEMA_1 =
+    private static final TableSchema TABLE_SCHEMA =
             TableSchema.builder()
                     .field("id", new AtomicDataType(new VarCharType(false, 128)))
                     .field("c_boolean", DataTypes.BOOLEAN())
@@ -162,10 +136,10 @@
                     TableNotExistException, DatabaseNotExistException {
         DorisConnectionOptions connectionOptions =
                 new DorisConnectionOptions.DorisConnectionOptionsBuilder()
-                        .withFenodes(TEST_FENODES)
-                        .withJdbcUrl(TEST_JDBCURL)
-                        .withUsername(TEST_USERNAME)
-                        .withPassword(TEST_PWD)
+                        .withFenodes(getFenodes())
+                        .withJdbcUrl(getDorisQueryUrl())
+                        .withUsername(getDorisUsername())
+                        .withPassword(getDorisPassword())
                         .build();
 
         Map<String, String> props = new HashMap<>();
@@ -272,7 +246,7 @@
         CatalogBaseTable table = catalog.getTable(new ObjectPath(TEST_DB, TEST_TABLE));
         Schema actual = table.getUnresolvedSchema();
         assertEquals(
-                TABLE_SCHEMA_1.getFieldNames(),
+                TABLE_SCHEMA.getFieldNames(),
                 actual.getColumns().stream().map(Schema.UnresolvedColumn::getName).toArray());
     }
 
@@ -308,7 +282,7 @@
     public void testCreateTable() throws TableAlreadyExistException, DatabaseNotExistException {
         CatalogTableImpl catalogTable =
                 new CatalogTableImpl(
-                        TABLE_SCHEMA_1,
+                        TABLE_SCHEMA,
                         new HashMap<String, String>() {
                             {
                                 put("connector", "doris-1");
@@ -425,7 +399,7 @@
 
     private static CatalogTable createTable() {
         return new CatalogTableImpl(
-                TABLE_SCHEMA_1,
+                TABLE_SCHEMA,
                 new HashMap<String, String>() {
                     {
                         put("connector", "doris");
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/container/AbstractContainerTestBase.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/container/AbstractContainerTestBase.java
new file mode 100644
index 0000000..967e6f3
--- /dev/null
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/container/AbstractContainerTestBase.java
@@ -0,0 +1,91 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.flink.container;
+
+import org.apache.doris.flink.container.instance.ContainerService;
+import org.apache.doris.flink.container.instance.DorisContainer;
+import org.junit.BeforeClass;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Connection;
+import java.util.Objects;
+
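+/**
+ * Base class for all container-based tests. It starts a single shared Doris container and
+ * exposes its FE/BE endpoints, JDBC query URL and credentials to subclasses.
+ */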
+public abstract class AbstractContainerTestBase {
+    private static final Logger LOG = LoggerFactory.getLogger(AbstractContainerTestBase.class);
+    private static ContainerService dorisContainerService;
+
+    @BeforeClass
+    public static void initContainers() {
+        LOG.info("Trying to start doris containers.");
+        initDorisContainer();
+    }
+
+    private static void initDorisContainer() {
+        if (Objects.nonNull(dorisContainerService) && dorisContainerService.isRunning()) {
+            LOG.info("The doris container has been started and is running status.");
+            return;
+        }
+        dorisContainerService = new DorisContainer();
+        dorisContainerService.startContainer();
+        LOG.info("Doris container was started.");
+    }
+
+    protected static Connection getDorisQueryConnection() {
+        return dorisContainerService.getQueryConnection();
+    }
+
+    protected String getFenodes() {
+        return dorisContainerService.getFenodes();
+    }
+
+    protected String getBenodes() {
+        return dorisContainerService.getBenodes();
+    }
+
+    protected String getDorisUsername() {
+        return dorisContainerService.getUsername();
+    }
+
+    protected String getDorisPassword() {
+        return dorisContainerService.getPassword();
+    }
+
+    protected String getDorisQueryUrl() {
+        return String.format(
+                "jdbc:mysql://%s:%s",
+                getDorisInstanceHost(), dorisContainerService.getMappedPort(9030));
+    }
+
+    protected String getDorisInstanceHost() {
+        return dorisContainerService.getInstanceHost();
+    }
+
+    public static void closeContainers() {
+        LOG.info("Starting to close containers.");
+        closeDorisContainer();
+    }
+
+    private static void closeDorisContainer() {
+        if (Objects.isNull(dorisContainerService)) {
+            return;
+        }
+        dorisContainerService.close();
+        LOG.info("Doris container was closed.");
+    }
+}
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/container/AbstractE2EService.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/container/AbstractE2EService.java
new file mode 100644
index 0000000..527f82c
--- /dev/null
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/container/AbstractE2EService.java
@@ -0,0 +1,154 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.flink.container;
+
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+
+import org.apache.doris.flink.container.instance.ContainerService;
+import org.apache.doris.flink.container.instance.MySQLContainer;
+import org.apache.doris.flink.exception.DorisRuntimeException;
+import org.apache.doris.flink.tools.cdc.CdcTools;
+import org.apache.doris.flink.tools.cdc.DatabaseSyncConfig;
+import org.junit.BeforeClass;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Connection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.Semaphore;
+
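+/**
+ * Base class for end-to-end (E2E) cases. It starts a shared MySQL container, submits CDC sync
+ * jobs through {@link CdcTools}, and provides the default Doris sink arguments.
+ */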
+public abstract class AbstractE2EService extends AbstractContainerTestBase {
+    private static final Logger LOG = LoggerFactory.getLogger(AbstractE2EService.class);
+    private static ContainerService mysqlContainerService;
+    private static JobClient jobClient;
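+    // Serializes E2E cases so that only one Flink job runs against the shared containers at a time.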
+    protected static final Semaphore SEMAPHORE = new Semaphore(1);
+    protected static final String SINK_CONF = "--" + DatabaseSyncConfig.SINK_CONF;
+    protected static final String DORIS_DATABASE = "--database";
+    protected static final String HOSTNAME = "hostname";
+    protected static final String PORT = "port";
+    protected static final String USERNAME = "username";
+    protected static final String PASSWORD = "password";
+    protected static final String DATABASE_NAME = "database-name";
+    protected static final String FENODES = "fenodes";
+    protected static final String JDBC_URL = "jdbc-url";
+    protected static final String SINK_LABEL_PREFIX = "sink.label-prefix";
+
+    @BeforeClass
+    public static void initE2EContainers() {
+        LOG.info("Trying to Start init E2E containers.");
+        initMySQLContainer();
+    }
+
+    private static void initMySQLContainer() {
+        if (Objects.nonNull(mysqlContainerService) && mysqlContainerService.isRunning()) {
+            LOG.info("The MySQL container has been started and is running status.");
+            return;
+        }
+        mysqlContainerService = new MySQLContainer();
+        mysqlContainerService.startContainer();
+        LOG.info("Mysql container was started.");
+    }
+
+    protected String getMySQLInstanceHost() {
+        return mysqlContainerService.getInstanceHost();
+    }
+
+    protected Integer getMySQLQueryPort() {
+        return mysqlContainerService.getMappedPort(3306);
+    }
+
+    protected String getMySQLUsername() {
+        return mysqlContainerService.getUsername();
+    }
+
+    protected String getMySQLPassword() {
+        return mysqlContainerService.getPassword();
+    }
+
+    protected Connection getMySQLQueryConnection() {
+        return mysqlContainerService.getQueryConnection();
+    }
+
+    protected void submitE2EJob(String jobName, String[] args) {
+        try {
+            LOG.info("{} e2e job will submit to start. ", jobName);
+            CdcTools.setStreamExecutionEnvironmentForTesting(configFlinkEnvironment());
+            CdcTools.main(args);
+            jobClient = CdcTools.getJobClient();
+            if (Objects.isNull(jobClient)) {
+                LOG.warn("Failed get flink job client. jobName={}", jobName);
+                throw new DorisRuntimeException("Failed get flink job client. jobName=" + jobName);
+            }
+        } catch (Exception e) {
+            LOG.warn("Failed to submit e2e job. jobName={}", jobName);
+            throw new DorisRuntimeException(e);
+        }
+    }
+
+    protected void cancelE2EJob(String jobName) {
+        LOG.info("{} e2e job will cancel", jobName);
+        jobClient.cancel();
+    }
+
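+    // Builds the Flink environment used for E2E jobs: 10s checkpoint interval, operator
+    // chaining disabled, default parallelism 1, and no restart strategy.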
+    private StreamExecutionEnvironment configFlinkEnvironment() {
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        Map<String, String> flinkMap = new HashMap<>();
+        flinkMap.put("execution.checkpointing.interval", "10s");
+        flinkMap.put("pipeline.operator-chaining", "false");
+        flinkMap.put("parallelism.default", "1");
+        Configuration configuration = Configuration.fromMap(flinkMap);
+        env.configure(configuration);
+        env.setRestartStrategy(RestartStrategies.noRestart());
+        return env;
+    }
+
+    protected void setSinkConfDefaultConfig(List<String> argList) {
+        // set default doris sink config
+        argList.add(SINK_CONF);
+        argList.add(FENODES + "=" + getFenodes());
+        argList.add(SINK_CONF);
+        argList.add(USERNAME + "=" + getDorisUsername());
+        argList.add(SINK_CONF);
+        argList.add(PASSWORD + "=" + getDorisPassword());
+        argList.add(SINK_CONF);
+        argList.add(FENODES + "=" + getFenodes());
+        argList.add(SINK_CONF);
+        argList.add(JDBC_URL + "=" + getDorisQueryUrl());
+        argList.add(SINK_CONF);
+        argList.add(SINK_LABEL_PREFIX + "=" + "label");
+    }
+
+    public static void closeE2EContainers() {
+        LOG.info("Starting to close E2E containers.");
+        closeMySQLContainer();
+    }
+
+    private static void closeMySQLContainer() {
+        if (Objects.isNull(mysqlContainerService)) {
+            return;
+        }
+        mysqlContainerService.close();
+        LOG.info("Mysql container was closed.");
+    }
+}
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/container/AbstractITCaseService.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/container/AbstractITCaseService.java
new file mode 100644
index 0000000..956b8be
--- /dev/null
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/container/AbstractITCaseService.java
@@ -0,0 +1,141 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.flink.container;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.api.common.time.Deadline;
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.runtime.highavailability.nonha.embedded.HaLeadershipControl;
+import org.apache.flink.runtime.minicluster.MiniCluster;
+import org.apache.flink.runtime.minicluster.RpcServiceSharing;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.test.util.MiniClusterWithClientResource;
+import org.apache.flink.util.function.SupplierWithException;
+
+import org.junit.Rule;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.concurrent.TimeoutException;
+
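+/**
+ * Base class for integration (ITCase) tests. It provides a Flink MiniCluster, helpers to wait
+ * for a job to reach an expected status, and utilities to trigger TaskManager/JobManager failover.
+ */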
+public abstract class AbstractITCaseService extends AbstractContainerTestBase {
+    private static final Logger LOG = LoggerFactory.getLogger(AbstractITCaseService.class);
+
+    protected static void waitForJobStatus(
+            JobClient client, List<JobStatus> expectedStatus, Deadline deadline) throws Exception {
+        waitUntilCondition(
+                () -> {
+                    JobStatus currentStatus = (JobStatus) client.getJobStatus().get();
+                    if (expectedStatus.contains(currentStatus)) {
+                        return true;
+                    } else if (currentStatus.isTerminalState()) {
+                        try {
+                            client.getJobExecutionResult().get();
+                        } catch (Exception var4) {
+                            throw new IllegalStateException(
+                                    String.format(
+                                            "Job has entered %s state, but expecting %s",
+                                            currentStatus, expectedStatus),
+                                    var4);
+                        }
+
+                        throw new IllegalStateException(
+                                String.format(
+                                        "Job has entered a terminal state %s, but expecting %s",
+                                        currentStatus, expectedStatus));
+                    } else {
+                        return false;
+                    }
+                },
+                deadline,
+                100L,
+                "Condition was not met in given timeout.");
+    }
+
+    protected static void waitUntilCondition(
+            SupplierWithException<Boolean, Exception> condition,
+            Deadline timeout,
+            long retryIntervalMillis,
+            String errorMsg)
+            throws Exception {
+        while (timeout.hasTimeLeft() && !(Boolean) condition.get()) {
+            long timeLeft = Math.max(0L, timeout.timeLeft().toMillis());
+            Thread.sleep(Math.min(retryIntervalMillis, timeLeft));
+        }
+
+        if (!timeout.hasTimeLeft()) {
+            throw new TimeoutException(errorMsg);
+        }
+    }
+
+    @Rule
+    public final MiniClusterWithClientResource miniClusterResource =
+            new MiniClusterWithClientResource(
+                    new MiniClusterResourceConfiguration.Builder()
+                            .setNumberTaskManagers(1)
+                            .setNumberSlotsPerTaskManager(2)
+                            .setRpcServiceSharing(RpcServiceSharing.DEDICATED)
+                            .withHaLeadershipControl()
+                            .build());
+
+    /** The type of failover. */
+    protected enum FailoverType {
+        TM,
+        JM,
+        NONE
+    }
+
+    protected static void triggerFailover(
+            FailoverType type, JobID jobId, MiniCluster miniCluster, Runnable afterFailAction)
+            throws Exception {
+        LOG.info("Will job trigger failover. type={}, jobId={}", type, jobId);
+        switch (type) {
+            case TM:
+                restartTaskManager(miniCluster, afterFailAction);
+                break;
+            case JM:
+                triggerJobManagerFailover(jobId, miniCluster, afterFailAction);
+                break;
+            case NONE:
+                break;
+            default:
+                throw new IllegalStateException("Unexpected value: " + type);
+        }
+    }
+
+    protected static void restartTaskManager(MiniCluster miniCluster, Runnable afterFailAction)
+            throws Exception {
+        LOG.info("flink cluster will terminate task manager.");
+        miniCluster.terminateTaskManager(0).get();
+        afterFailAction.run();
+        LOG.info("flink cluster will start task manager.");
+        miniCluster.startTaskManager();
+    }
+
+    protected static void triggerJobManagerFailover(
+            JobID jobId, MiniCluster miniCluster, Runnable afterFailAction) throws Exception {
+        LOG.info("flink cluster will revoke job master leadership. jobId={}", jobId);
+        final HaLeadershipControl haLeadershipControl = miniCluster.getHaLeadershipControl().get();
+        haLeadershipControl.revokeJobMasterLeadership(jobId).get();
+        afterFailAction.run();
+        LOG.info("flink cluster will grant job master leadership. jobId={}", jobId);
+        haLeadershipControl.grantJobMasterLeadership(jobId).get();
+    }
+}
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/container/ContainerUtils.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/container/ContainerUtils.java
new file mode 100644
index 0000000..e4c99d5
--- /dev/null
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/container/ContainerUtils.java
@@ -0,0 +1,136 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.flink.container;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.doris.flink.exception.DorisRuntimeException;
+import org.junit.Assert;
+import org.slf4j.Logger;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
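+/**
+ * Test utilities for working with containers: executing SQL statements, loading SQL and argument
+ * files from test resources, and asserting query results against expected rows.
+ */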
+public class ContainerUtils {
+
+    public static void executeSQLStatement(Connection connection, Logger logger, String... sql) {
+        if (Objects.isNull(sql) || sql.length == 0) {
+            return;
+        }
+        try (Statement statement = connection.createStatement()) {
+            for (String s : sql) {
+                if (StringUtils.isNotEmpty(s)) {
+                    logger.info("start to execute sql={}", s);
+                    statement.execute(s);
+                }
+            }
+        } catch (SQLException e) {
+            throw new DorisRuntimeException(e);
+        }
+    }
+
+    public static String loadFileContent(String resourcePath) {
+        try (InputStream stream =
+                ContainerUtils.class.getClassLoader().getResourceAsStream(resourcePath)) {
+            return new BufferedReader(new InputStreamReader(Objects.requireNonNull(stream)))
+                    .lines()
+                    .collect(Collectors.joining("\n"));
+        } catch (IOException e) {
+            throw new DorisRuntimeException("Failed to read " + resourcePath + " file.", e);
+        }
+    }
+
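+    // Parses a resource file into a flat list of command-line arguments, splitting each line on
+    // whitespace and stripping surrounding quotes.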
+    public static List<String> parseFileArgs(String resourcePath) {
+        String fileContent = ContainerUtils.loadFileContent(resourcePath);
+        String[] args = fileContent.split("\n");
+        List<String> argList = new ArrayList<>();
+        for (String arg : args) {
+            String[] split = arg.trim().split("\\s+");
+            List<String> stringList =
+                    Arrays.stream(split)
+                            .map(ContainerUtils::removeQuotes)
+                            .collect(Collectors.toList());
+            argList.addAll(stringList);
+        }
+        return argList;
+    }
+
+    private static String removeQuotes(String str) {
+        if (str == null || str.length() < 2) {
+            return str;
+        }
+        if (str.startsWith("\"") && str.endsWith("\"")) {
+            return str.substring(1, str.length() - 1);
+        }
+        if (str.startsWith("\\'") && str.endsWith("\\'")) {
+            return str.substring(1, str.length() - 1);
+        }
+        return str;
+    }
+
+    public static String[] parseFileContentSQL(String resourcePath) {
+        String fileContent = loadFileContent(resourcePath);
+        return Arrays.stream(fileContent.split(";")).map(String::trim).toArray(String[]::new);
+    }
+
+    public static void checkResult(
+            Connection connection,
+            Logger logger,
+            List<String> expected,
+            String query,
+            int columnSize) {
+        List<String> actual = new ArrayList<>();
+        try (Statement statement = connection.createStatement()) {
+            ResultSet sinkResultSet = statement.executeQuery(query);
+            while (sinkResultSet.next()) {
+                List<String> row = new ArrayList<>();
+                for (int i = 1; i <= columnSize; i++) {
+                    Object value = sinkResultSet.getObject(i);
+                    if (value == null) {
+                        row.add("null");
+                    } else {
+                        row.add(value.toString());
+                    }
+                }
+                actual.add(StringUtils.join(row, ","));
+            }
+        } catch (SQLException e) {
+            logger.info(
+                    "Failed to check query result. expected={}, actual={}",
+                    String.join(",", expected),
+                    String.join(",", actual),
+                    e);
+            throw new DorisRuntimeException(e);
+        }
+        logger.info(
+                "checking test result. expected={}, actual={}",
+                String.join(",", expected),
+                String.join(",", actual));
+        Assert.assertArrayEquals(expected.toArray(), actual.toArray());
+    }
+}
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/container/e2e/Doris2DorisE2ECase.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/container/e2e/Doris2DorisE2ECase.java
new file mode 100644
index 0000000..fcb4858
--- /dev/null
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/container/e2e/Doris2DorisE2ECase.java
@@ -0,0 +1,176 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.flink.container.e2e;
+
+import org.apache.flink.api.common.RuntimeExecutionMode;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.TableResult;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CloseableIterator;
+
+import org.apache.doris.flink.container.AbstractE2EService;
+import org.apache.doris.flink.container.ContainerUtils;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+
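+/** E2E case that copies data from a Doris source table to a Doris sink table via Flink SQL. */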
+public class Doris2DorisE2ECase extends AbstractE2EService {
+    private static final Logger LOG = LoggerFactory.getLogger(Doris2DorisE2ECase.class);
+    private static final String DATABASE_SOURCE = "test_doris2doris_source";
+    private static final String DATABASE_SINK = "test_doris2doris_sink";
+    private static final String TABLE = "test_tbl";
+
+    @Before
+    public void setUp() throws InterruptedException {
+        LOG.info("Doris2DorisE2ECase attempting to acquire semaphore.");
+        SEMAPHORE.acquire();
+        LOG.info("Doris2DorisE2ECase semaphore acquired.");
+    }
+
+    @Test
+    public void testDoris2Doris() throws Exception {
+        LOG.info("Start executing the test case of doris to doris.");
+        initializeDorisTable();
+        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setParallelism(1);
+        env.setRuntimeMode(RuntimeExecutionMode.BATCH);
+        final StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
+
+        String sourceDDL =
+                String.format(
+                        "CREATE TABLE doris_source ("
+                                + "id  int,\n"
+                                + "c1  boolean,\n"
+                                + "c2  tinyint,\n"
+                                + "c3  smallint,\n"
+                                + "c4  int, \n"
+                                + "c5  bigint, \n"
+                                + "c6  string, \n"
+                                + "c7  float, \n"
+                                + "c8  double, \n"
+                                + "c9  decimal(12,4), \n"
+                                + "c10  date, \n"
+                                + "c11  TIMESTAMP, \n"
+                                + "c12  char(1), \n"
+                                + "c13  varchar(256), \n"
+                                + "c14  Array<String>, \n"
+                                + "c15  Map<String, String>, \n"
+                                + "c16  ROW<name String, age int>, \n"
+                                + "c17  STRING \n"
+                                + ") WITH ("
+                                + " 'connector' = 'doris',"
+                                + " 'fenodes' = '%s',"
+                                + " 'table.identifier' = '%s',"
+                                + " 'sink.label-prefix' = '"
+                                + UUID.randomUUID()
+                                + "',"
+                                + " 'username' = '%s',"
+                                + " 'password' = '%s'"
+                                + ")",
+                        getFenodes(),
+                        DATABASE_SOURCE + "." + TABLE,
+                        getDorisUsername(),
+                        getDorisPassword());
+        tEnv.executeSql(sourceDDL);
+
+        String sinkDDL =
+                String.format(
+                        "CREATE TABLE doris_sink ("
+                                + "id  int,\n"
+                                + "c1  boolean,\n"
+                                + "c2  tinyint,\n"
+                                + "c3  smallint,\n"
+                                + "c4  int, \n"
+                                + "c5  bigint, \n"
+                                + "c6  string, \n"
+                                + "c7  float, \n"
+                                + "c8  double, \n"
+                                + "c9  decimal(12,4), \n"
+                                + "c10  date, \n"
+                                + "c11  TIMESTAMP, \n"
+                                + "c12  char(1), \n"
+                                + "c13  varchar(256), \n"
+                                + "c14  Array<String>, \n"
+                                + "c15  Map<String, String>, \n"
+                                + "c16  ROW<name String, age int>, \n"
+                                + "c17  STRING \n"
+                                + ") WITH ("
+                                + " 'connector' = 'doris',"
+                                + " 'fenodes' = '%s',"
+                                + " 'sink.label-prefix' = '"
+                                + UUID.randomUUID()
+                                + "',"
+                                + " 'table.identifier' = '%s',"
+                                + " 'username' = '%s',"
+                                + " 'password' = '%s'"
+                                + ")",
+                        getFenodes(),
+                        DATABASE_SINK + "." + TABLE,
+                        getDorisUsername(),
+                        getDorisPassword());
+        tEnv.executeSql(sinkDDL);
+
+        tEnv.executeSql("INSERT INTO doris_sink SELECT * FROM doris_source").await();
+
+        TableResult tableResult = tEnv.executeSql("SELECT * FROM doris_sink");
+        List<Object> actual = new ArrayList<>();
+        try (CloseableIterator<Row> iterator = tableResult.collect()) {
+            while (iterator.hasNext()) {
+                actual.add(iterator.next().toString());
+            }
+        }
+        LOG.info("The actual data in the doris sink table is, actual={}", actual);
+
+        String[] expected =
+                new String[] {
+                    "+I[1, true, 127, 32767, 2147483647, 9223372036854775807, 123456789012345678901234567890, 3.14, 2.7182818284, 12345.6789, 2023-05-22, 2023-05-22T12:34:56, A, Example text, [item1, item2, item3], {key1=value1, key2=value2}, +I[John Doe, 30], {\"key\":\"value\"}]",
+                    "+I[2, false, -128, -32768, -2147483648, -9223372036854775808, -123456789012345678901234567890, -3.14, -2.7182818284, -12345.6789, 2024-01-01, 2024-01-01T00:00, B, Another example, [item4, item5, item6], {key3=value3, key4=value4}, +I[Jane Doe, 25], {\"another_key\":\"another_value\"}]"
+                };
+        Assert.assertArrayEquals(expected, actual.toArray(new String[0]));
+    }
+
+    private void initializeDorisTable() {
+        String[] sourceInitSql =
+                ContainerUtils.parseFileContentSQL(
+                        "container/e2e/doris2doris/test_doris2doris_source_test_tbl.sql");
+        ContainerUtils.executeSQLStatement(getDorisQueryConnection(), LOG, sourceInitSql);
+        String[] sinkInitSql =
+                ContainerUtils.parseFileContentSQL(
+                        "container/e2e/doris2doris/test_doris2doris_sink_test_tbl.sql");
+        ContainerUtils.executeSQLStatement(getDorisQueryConnection(), LOG, sinkInitSql);
+        LOG.info("Initialization of doris table successful.");
+    }
+
+    @After
+    public void close() {
+        try {
+            // nothing to clean up here
+        } finally {
+            // ensure the semaphore is always released
+            LOG.info("Doris2DorisE2ECase releasing semaphore.");
+            SEMAPHORE.release();
+        }
+    }
+}
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/container/e2e/Mysql2DorisE2ECase.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/container/e2e/Mysql2DorisE2ECase.java
new file mode 100644
index 0000000..68b5d43
--- /dev/null
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/container/e2e/Mysql2DorisE2ECase.java
@@ -0,0 +1,391 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.flink.container.e2e;
+
+import org.apache.doris.flink.container.AbstractE2EService;
+import org.apache.doris.flink.container.ContainerUtils;
+import org.apache.doris.flink.tools.cdc.DatabaseSyncConfig;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.List;
+
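+/** E2E case that synchronizes a MySQL database into Doris through CdcTools. */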
+public class Mysql2DorisE2ECase extends AbstractE2EService {
+    private static final Logger LOG = LoggerFactory.getLogger(Mysql2DorisE2ECase.class);
+    private static final String DATABASE = "test_e2e_mysql";
+    private static final String CREATE_DATABASE = "CREATE DATABASE IF NOT EXISTS " + DATABASE;
+    private static final String MYSQL_CONF = "--" + DatabaseSyncConfig.MYSQL_CONF;
+
+    @Before
+    public void setUp() throws InterruptedException {
+        LOG.info("Mysql2DorisE2ECase attempting to acquire semaphore.");
+        SEMAPHORE.acquire();
+        LOG.info("Mysql2DorisE2ECase semaphore acquired.");
+    }
+
+    private List<String> setMysql2DorisDefaultConfig(List<String> argList) {
+        // set default mysql config
+        argList.add(MYSQL_CONF);
+        argList.add(HOSTNAME + "=" + getMySQLInstanceHost());
+        argList.add(MYSQL_CONF);
+        argList.add(PORT + "=" + getMySQLQueryPort());
+        argList.add(MYSQL_CONF);
+        argList.add(USERNAME + "=" + getMySQLUsername());
+        argList.add(MYSQL_CONF);
+        argList.add(PASSWORD + "=" + getMySQLPassword());
+        argList.add(MYSQL_CONF);
+        argList.add(DATABASE_NAME + "=" + DATABASE);
+
+        // set doris database
+        argList.add(DORIS_DATABASE);
+        argList.add(DATABASE);
+        setSinkConfDefaultConfig(argList);
+        return argList;
+    }
+
+    private void startMysql2DorisJob(String jobName, String resourcePath) {
+        LOG.info("start a mysql to doris job. jobName={}, resourcePath={}", jobName, resourcePath);
+        List<String> argList = ContainerUtils.parseFileArgs(resourcePath);
+        String[] args = setMysql2DorisDefaultConfig(argList).toArray(new String[0]);
+        submitE2EJob(jobName, args);
+    }
+
+    private void initMysqlEnvironment(String sourcePath) {
+        LOG.info("Initializing MySQL environment.");
+        ContainerUtils.executeSQLStatement(
+                getMySQLQueryConnection(), LOG, ContainerUtils.parseFileContentSQL(sourcePath));
+    }
+
+    private void initDorisEnvironment() {
+        LOG.info("Initializing Doris environment.");
+        ContainerUtils.executeSQLStatement(getDorisQueryConnection(), LOG, CREATE_DATABASE);
+        ContainerUtils.executeSQLStatement(
+                getDorisQueryConnection(),
+                LOG,
+                "DROP TABLE IF EXISTS test_e2e_mysql.tbl1",
+                "DROP TABLE IF EXISTS test_e2e_mysql.tbl2",
+                "DROP TABLE IF EXISTS test_e2e_mysql.tbl3",
+                "DROP TABLE IF EXISTS test_e2e_mysql.tbl4",
+                "DROP TABLE IF EXISTS test_e2e_mysql.tbl5");
+    }
+
+    private void initEnvironment(String jobName, String mysqlSourcePath) {
+        LOG.info(
+                "start to init mysql to doris environment. jobName={}, mysqlSourcePath={}",
+                jobName,
+                mysqlSourcePath);
+        initMysqlEnvironment(mysqlSourcePath);
+        initDorisEnvironment();
+    }
+
+    @Test
+    public void testMySQL2Doris() throws Exception {
+        String jobName = "testMySQL2Doris";
+        String resourcePath = "container/e2e/mysql2doris/testMySQL2Doris.txt";
+        initEnvironment(jobName, "container/e2e/mysql2doris/testMySQL2Doris_init.sql");
+        startMysql2DorisJob(jobName, resourcePath);
+
+        // wait long enough for 2 checkpoints to complete
+        Thread.sleep(20000);
+        LOG.info("Start to verify init result.");
+        List<String> expected = Arrays.asList("doris_1,1", "doris_2,2", "doris_3,3", "doris_5,5");
+        String sql1 =
+                "select * from ( select * from test_e2e_mysql.tbl1 union all select * from test_e2e_mysql.tbl2 union all select * from test_e2e_mysql.tbl3 union all select * from test_e2e_mysql.tbl5) res order by 1";
+        ContainerUtils.checkResult(getDorisQueryConnection(), LOG, expected, sql1, 2);
+
+        // add incremental data
+        ContainerUtils.executeSQLStatement(
+                getMySQLQueryConnection(),
+                LOG,
+                "insert into test_e2e_mysql.tbl1 values ('doris_1_1',10)",
+                "insert into test_e2e_mysql.tbl2 values ('doris_2_1',11)",
+                "insert into test_e2e_mysql.tbl3 values ('doris_3_1',12)",
+                "update test_e2e_mysql.tbl1 set age=18 where name='doris_1'",
+                "delete from test_e2e_mysql.tbl2 where name='doris_2'");
+        Thread.sleep(20000);
+
+        LOG.info("Start to verify incremental data result.");
+        List<String> expected2 =
+                Arrays.asList(
+                        "doris_1,18", "doris_1_1,10", "doris_2_1,11", "doris_3,3", "doris_3_1,12");
+        String sql2 =
+                "select * from ( select * from test_e2e_mysql.tbl1 union all select * from test_e2e_mysql.tbl2 union all select * from test_e2e_mysql.tbl3 ) res order by 1";
+        ContainerUtils.checkResult(getDorisQueryConnection(), LOG, expected2, sql2, 2);
+
+        // mock schema change
+        LOG.info("start to schema change in mysql.");
+        ContainerUtils.executeSQLStatement(
+                getMySQLQueryConnection(),
+                LOG,
+                "alter table test_e2e_mysql.tbl1 add column c1 varchar(128)",
+                "alter table test_e2e_mysql.tbl1 drop column age");
+        Thread.sleep(10000);
+        ContainerUtils.executeSQLStatement(
+                getMySQLQueryConnection(),
+                LOG,
+                "insert into test_e2e_mysql.tbl1  values ('doris_1_1_1','c1_val')");
+        Thread.sleep(20000);
+        LOG.info("verify tal1 schema change.");
+        List<String> schemaChangeExpected =
+                Arrays.asList("doris_1,null", "doris_1_1,null", "doris_1_1_1,c1_val");
+        String schemaChangeSql = "select * from test_e2e_mysql.tbl1 order by 1";
+        ContainerUtils.checkResult(
+                getDorisQueryConnection(), LOG, schemaChangeExpected, schemaChangeSql, 2);
+        cancelE2EJob(jobName);
+    }
+
+    @Test
+    public void testAutoAddTable() throws InterruptedException {
+        String jobName = "testAutoAddTable";
+        initEnvironment(jobName, "container/e2e/mysql2doris/testAutoAddTable_init.sql");
+        startMysql2DorisJob(jobName, "container/e2e/mysql2doris/testAutoAddTable.txt");
+
+        // wait long enough for 2 checkpoints to complete
+        Thread.sleep(20000);
+        LOG.info("Start to verify init result.");
+        List<String> expected = Arrays.asList("doris_1,1", "doris_2,2", "doris_3,3", "doris_5,5");
+        String sql1 =
+                "select * from ( select * from test_e2e_mysql.tbl1 union all select * from test_e2e_mysql.tbl2 union all select * from test_e2e_mysql.tbl3 union all select * from test_e2e_mysql.tbl5) res order by 1";
+        ContainerUtils.checkResult(getDorisQueryConnection(), LOG, expected, sql1, 2);
+
+        // auto add table
+        LOG.info("starting to create auto_add table.");
+        ContainerUtils.executeSQLStatement(
+                getMySQLQueryConnection(),
+                LOG,
+                "CREATE TABLE test_e2e_mysql.auto_add ( \n"
+                        + "`name` varchar(256) primary key,\n"
+                        + "`age` int\n"
+                        + ")",
+                "insert into test_e2e_mysql.auto_add  values ('doris_4_1',4)",
+                "insert into test_e2e_mysql.auto_add  values ('doris_4_2',4)");
+        Thread.sleep(20000);
+        List<String> autoAddResult = Arrays.asList("doris_4_1,4", "doris_4_2,4");
+        String autoAddSql = "select * from test_e2e_mysql.auto_add order by 1";
+        ContainerUtils.checkResult(getDorisQueryConnection(), LOG, autoAddResult, autoAddSql, 2);
+
+        // incremental data
+        LOG.info("starting to increment data.");
+        ContainerUtils.executeSQLStatement(
+                getMySQLQueryConnection(),
+                LOG,
+                "insert into test_e2e_mysql.tbl1 values ('doris_1_1',10)",
+                "insert into test_e2e_mysql.tbl2 values ('doris_2_1',11)",
+                "insert into test_e2e_mysql.tbl3 values ('doris_3_1',12)",
+                "update test_e2e_mysql.tbl1 set age=18 where name='doris_1'",
+                "delete from test_e2e_mysql.tbl2 where name='doris_2'",
+                "insert into test_e2e_mysql.auto_add values ('doris_4_3',43)",
+                "delete from test_e2e_mysql.auto_add where name='doris_4_2'",
+                "update test_e2e_mysql.auto_add set age=41 where name='doris_4_1'");
+        Thread.sleep(20000);
+        List<String> incrementDataExpected =
+                Arrays.asList(
+                        "doris_1,18",
+                        "doris_1_1,10",
+                        "doris_2_1,11",
+                        "doris_3,3",
+                        "doris_3_1,12",
+                        "doris_4_1,41",
+                        "doris_4_3,43");
+        String incrementDataSql =
+                "select * from ( select * from test_e2e_mysql.tbl1 union all select * from test_e2e_mysql.tbl2 union all select * from test_e2e_mysql.tbl3 union all select * from test_e2e_mysql.auto_add) res order by 1";
+        ContainerUtils.checkResult(
+                getDorisQueryConnection(), LOG, incrementDataExpected, incrementDataSql, 2);
+
+        // schema change
+        LOG.info("starting to mock schema change.");
+        ContainerUtils.executeSQLStatement(
+                getMySQLQueryConnection(),
+                LOG,
+                "alter table test_e2e_mysql.auto_add add column c1 varchar(128)",
+                "alter table test_e2e_mysql.auto_add drop column age",
+                "insert into test_e2e_mysql.auto_add values ('doris_4_4','c1_val')");
+        Thread.sleep(20000);
+        List<String> schemaChangeExpected =
+                Arrays.asList("doris_4_1,null", "doris_4_3,null", "doris_4_4,c1_val");
+        String schemaChangeSql = "select * from test_e2e_mysql.auto_add order by 1";
+        ContainerUtils.checkResult(
+                getDorisQueryConnection(), LOG, schemaChangeExpected, schemaChangeSql, 2);
+        cancelE2EJob(jobName);
+    }
+
+    @Test
+    public void testMySQL2DorisSQLParse() throws Exception {
+        String jobName = "testMySQL2DorisSQLParse";
+        String resourcePath = "container/e2e/mysql2doris/testMySQL2DorisSQLParse.txt";
+        initEnvironment(jobName, "container/e2e/mysql2doris/testMySQL2DorisSQLParse_init.sql");
+        startMysql2DorisJob(jobName, resourcePath);
+
+        // wait long enough for 2 checkpoints to complete
+        Thread.sleep(20000);
+        LOG.info("Start to verify init result.");
+        List<String> expected = Arrays.asList("doris_1,1", "doris_2,2", "doris_3,3", "doris_5,5");
+        String sql1 =
+                "select * from ( select * from test_e2e_mysql.tbl1 union all select * from test_e2e_mysql.tbl2 union all select * from test_e2e_mysql.tbl3 union all select * from test_e2e_mysql.tbl5) res order by 1";
+        ContainerUtils.checkResult(getDorisQueryConnection(), LOG, expected, sql1, 2);
+
+        // add incremental data
+        ContainerUtils.executeSQLStatement(
+                getMySQLQueryConnection(),
+                LOG,
+                "insert into test_e2e_mysql.tbl1 values ('doris_1_1',10)",
+                "insert into test_e2e_mysql.tbl2 values ('doris_2_1',11)",
+                "insert into test_e2e_mysql.tbl3 values ('doris_3_1',12)",
+                "update test_e2e_mysql.tbl1 set age=18 where name='doris_1'",
+                "delete from test_e2e_mysql.tbl2 where name='doris_2'");
+        Thread.sleep(20000);
+
+        LOG.info("Start to verify incremental data result.");
+        List<String> expected2 =
+                Arrays.asList(
+                        "doris_1,18", "doris_1_1,10", "doris_2_1,11", "doris_3,3", "doris_3_1,12");
+        String sql2 =
+                "select * from ( select * from test_e2e_mysql.tbl1 union all select * from test_e2e_mysql.tbl2 union all select * from test_e2e_mysql.tbl3 ) res order by 1";
+        ContainerUtils.checkResult(getDorisQueryConnection(), LOG, expected2, sql2, 2);
+
+        // mock schema change
+        ContainerUtils.executeSQLStatement(
+                getMySQLQueryConnection(),
+                LOG,
+                "alter table test_e2e_mysql.tbl1 add column c1 varchar(128)",
+                "alter table test_e2e_mysql.tbl1 drop column age");
+        Thread.sleep(10000);
+        ContainerUtils.executeSQLStatement(
+                getMySQLQueryConnection(),
+                LOG,
+                "insert into test_e2e_mysql.tbl1  values ('doris_1_1_1','c1_val')");
+        Thread.sleep(20000);
+        LOG.info("verify tal1 schema change.");
+        List<String> schemaChangeExpected =
+                Arrays.asList("doris_1,null", "doris_1_1,null", "doris_1_1_1,c1_val");
+        String schemaChangeSql = "select * from test_e2e_mysql.tbl1 order by 1";
+        ContainerUtils.checkResult(
+                getDorisQueryConnection(), LOG, schemaChangeExpected, schemaChangeSql, 2);
+
+        // mock create table
+        LOG.info("start to create table in mysql.");
+        ContainerUtils.executeSQLStatement(
+                getMySQLQueryConnection(),
+                LOG,
+                "CREATE TABLE test_e2e_mysql.add_tbl (\n"
+                        + "    `name` varchar(256) primary key,\n"
+                        + "    `age` int\n"
+                        + ");",
+                "insert into test_e2e_mysql.add_tbl  values ('doris_1',1)",
+                "insert into test_e2e_mysql.add_tbl  values ('doris_2',2)",
+                "insert into test_e2e_mysql.add_tbl  values ('doris_3',3)");
+        Thread.sleep(20000);
+        List<String> createTableExpected = Arrays.asList("doris_1,1", "doris_2,2", "doris_3,3");
+        String createTableSql = "select * from test_e2e_mysql.add_tbl order by 1";
+        ContainerUtils.checkResult(
+                getDorisQueryConnection(), LOG, createTableExpected, createTableSql, 2);
+        cancelE2EJob(jobName);
+    }
+
+    @Test
+    public void testMySQL2DorisByDefault() throws Exception {
+        String jobName = "testMySQL2DorisByDefault";
+        initEnvironment(jobName, "container/e2e/mysql2doris/testMySQL2DorisByDefault_init.sql");
+        startMysql2DorisJob(jobName, "container/e2e/mysql2doris/testMySQL2DorisByDefault.txt");
+
+        // wait long enough for 2 checkpoints to complete
+        Thread.sleep(20000);
+        LOG.info("Start to verify init result.");
+        List<String> expected = Arrays.asList("doris_1,1", "doris_2,2", "doris_3,3", "doris_5,5");
+        String sql1 =
+                "select * from ( select * from test_e2e_mysql.tbl1 union all select * from test_e2e_mysql.tbl2 union all select * from test_e2e_mysql.tbl3 union all select * from test_e2e_mysql.tbl5) res order by 1";
+        ContainerUtils.checkResult(getDorisQueryConnection(), LOG, expected, sql1, 2);
+
+        // add incremental data
+        ContainerUtils.executeSQLStatement(
+                getMySQLQueryConnection(),
+                LOG,
+                "insert into test_e2e_mysql.tbl1 values ('doris_1_1',10)",
+                "insert into test_e2e_mysql.tbl2 values ('doris_2_1',11)",
+                "insert into test_e2e_mysql.tbl3 values ('doris_3_1',12)",
+                "update test_e2e_mysql.tbl1 set age=18 where name='doris_1'",
+                "delete from test_e2e_mysql.tbl2 where name='doris_2'");
+        Thread.sleep(20000);
+
+        LOG.info("Start to verify incremental data result.");
+        List<String> expected2 =
+                Arrays.asList(
+                        "doris_1,18", "doris_1_1,10", "doris_2_1,11", "doris_3,3", "doris_3_1,12");
+        String sql2 =
+                "select * from ( select * from test_e2e_mysql.tbl1 union all select * from test_e2e_mysql.tbl2 union all select * from test_e2e_mysql.tbl3 ) res order by 1";
+        ContainerUtils.checkResult(getDorisQueryConnection(), LOG, expected2, sql2, 2);
+        cancelE2EJob(jobName);
+    }
+
+    @Test
+    public void testMySQL2DorisEnableDelete() throws Exception {
+        String jobName = "testMySQL2DorisEnableDelete";
+        initEnvironment(jobName, "container/e2e/mysql2doris/testMySQL2DorisEnableDelete_init.sql");
+        startMysql2DorisJob(jobName, "container/e2e/mysql2doris/testMySQL2DorisEnableDelete.txt");
+
+        // wait long enough for 2 checkpoints to complete
+        Thread.sleep(20000);
+        LOG.info("Start to verify init result.");
+        List<String> initExpected =
+                Arrays.asList("doris_1,1", "doris_2,2", "doris_3,3", "doris_5,5");
+        String sql1 =
+                "select * from ( select * from test_e2e_mysql.tbl1 union all select * from test_e2e_mysql.tbl2 union all select * from test_e2e_mysql.tbl3 union all select * from test_e2e_mysql.tbl5) res order by 1";
+        ContainerUtils.checkResult(getDorisQueryConnection(), LOG, initExpected, sql1, 2);
+
+        // add incremental data
+        ContainerUtils.executeSQLStatement(
+                getMySQLQueryConnection(),
+                LOG,
+                "insert into test_e2e_mysql.tbl1 values ('doris_1_1',10)",
+                "insert into test_e2e_mysql.tbl2 values ('doris_2_1',11)",
+                "insert into test_e2e_mysql.tbl3 values ('doris_3_1',12)",
+                "update test_e2e_mysql.tbl1 set age=18 where name='doris_1'",
+                "delete from test_e2e_mysql.tbl2 where name='doris_2'",
+                "delete from test_e2e_mysql.tbl3 where name='doris_3'",
+                "delete from test_e2e_mysql.tbl5 where name='doris_5'");
+
+        Thread.sleep(20000);
+        List<String> expected =
+                Arrays.asList(
+                        "doris_1,18",
+                        "doris_1_1,10",
+                        "doris_2,2",
+                        "doris_2_1,11",
+                        "doris_3,3",
+                        "doris_3_1,12",
+                        "doris_5,5");
+        String sql =
+                "select * from ( select * from test_e2e_mysql.tbl1 union all select * from test_e2e_mysql.tbl2 union all select * from test_e2e_mysql.tbl3 union all select * from test_e2e_mysql.tbl5) res order by 1";
+        ContainerUtils.checkResult(getDorisQueryConnection(), LOG, expected, sql, 2);
+        cancelE2EJob(jobName);
+    }
+
+    @After
+    public void close() {
+        // Always release the semaphore so subsequent E2E cases can run
+        LOG.info("Mysql2DorisE2ECase releasing semaphore.");
+        SEMAPHORE.release();
+    }
+}
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/container/instance/ContainerService.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/container/instance/ContainerService.java
new file mode 100644
index 0000000..6ad1e3c
--- /dev/null
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/container/instance/ContainerService.java
@@ -0,0 +1,49 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.flink.container.instance;
+
+import org.apache.doris.flink.exception.DorisRuntimeException;
+
+import java.sql.Connection;
+
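+/** Common lifecycle and connection operations implemented by the test containers (Doris, MySQL). */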
+public interface ContainerService {
+
+    void startContainer();
+
+    boolean isRunning();
+
+    Connection getQueryConnection();
+
+    String getInstanceHost();
+
+    Integer getMappedPort(int originalPort);
+
+    String getUsername();
+
+    String getPassword();
+
+    default String getFenodes() {
+        throw new DorisRuntimeException("Only doris container can implemented.");
+    }
+
+    default String getBenodes() {
+        throw new DorisRuntimeException("Only doris container can implemented.");
+    }
+
+    void close();
+}
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/container/instance/DorisContainer.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/container/instance/DorisContainer.java
new file mode 100644
index 0000000..6af827b
--- /dev/null
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/container/instance/DorisContainer.java
@@ -0,0 +1,234 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.flink.container.instance;
+
+import com.google.common.collect.Lists;
+import org.apache.doris.flink.exception.DorisRuntimeException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.Network;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.utility.DockerLoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.locks.LockSupport;
+
+public class DorisContainer implements ContainerService {
+    private static final Logger LOG = LoggerFactory.getLogger(DorisContainer.class);
+    private static final String DEFAULT_DOCKER_IMAGE = "apache/doris:doris-all-in-one-2.1.0";
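+    // The image can be overridden with -Dimage=<docker image>; otherwise the default all-in-one image is used.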
+    private static final String DORIS_DOCKER_IMAGE =
+            System.getProperty("image") == null
+                    ? DEFAULT_DOCKER_IMAGE
+                    : System.getProperty("image");
+    private static final String DRIVER_JAR =
+            "https://repo1.maven.org/maven2/mysql/mysql-connector-java/8.0.16/mysql-connector-java-8.0.16.jar";
+    private static final String JDBC_URL = "jdbc:mysql://%s:9030";
+    private static final String USERNAME = "root";
+    private static final String PASSWORD = "";
+    private final GenericContainer dorisContainer;
+
+    public DorisContainer() {
+        dorisContainer = createDorisContainer();
+    }
+
+    public GenericContainer createDorisContainer() {
+        LOG.info("Will create doris containers.");
+        GenericContainer container =
+                new GenericContainer<>(DORIS_DOCKER_IMAGE)
+                        .withNetwork(Network.newNetwork())
+                        .withNetworkAliases("DorisContainer")
+                        .withPrivilegedMode(true)
+                        .withLogConsumer(
+                                new Slf4jLogConsumer(
+                                        DockerLoggerFactory.getLogger(DORIS_DOCKER_IMAGE)))
+                        .withExposedPorts(8030, 9030, 8040, 9060);
+
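+        // Bind the FE ports (8030, 9030) and BE ports (8040, 9060) to the same ports on the host.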
+        container.setPortBindings(
+                Lists.newArrayList(
+                        String.format("%s:%s", "8030", "8030"),
+                        String.format("%s:%s", "9030", "9030"),
+                        String.format("%s:%s", "9060", "9060"),
+                        String.format("%s:%s", "8040", "8040")));
+        return container;
+    }
+
+    public void startContainer() {
+        try {
+            LOG.info("Starting doris containers.");
+            // singleton doris container
+            dorisContainer.start();
+            initializeJdbcConnection();
+            printClusterStatus();
+        } catch (Exception ex) {
+            LOG.error("Failed to start containers doris", ex);
+            throw new DorisRuntimeException("Failed to start containers doris", ex);
+        }
+        LOG.info("Doris container started successfully.");
+    }
+
+    @Override
+    public boolean isRunning() {
+        return dorisContainer.isRunning();
+    }
+
+    @Override
+    public Connection getQueryConnection() {
+        LOG.info("Try to get query connection from doris.");
+        String jdbcUrl = String.format(JDBC_URL, dorisContainer.getHost());
+        try {
+            return DriverManager.getConnection(jdbcUrl, USERNAME, PASSWORD);
+        } catch (SQLException e) {
+            LOG.info("Failed to get doris query connection. jdbcUrl={}", jdbcUrl, e);
+            throw new DorisRuntimeException(e);
+        }
+    }
+
+    @Override
+    public String getInstanceHost() {
+        return dorisContainer.getHost();
+    }
+
+    @Override
+    public Integer getMappedPort(int originalPort) {
+        return dorisContainer.getMappedPort(originalPort);
+    }
+
+    @Override
+    public String getUsername() {
+        return USERNAME;
+    }
+
+    @Override
+    public String getPassword() {
+        return PASSWORD;
+    }
+
+    @Override
+    public String getFenodes() {
+        return dorisContainer.getHost() + ":8030";
+    }
+
+    @Override
+    public String getBenodes() {
+        return dorisContainer.getHost() + ":8040";
+    }
+
+    public void close() {
+        LOG.info("Doris container is about to be close.");
+        dorisContainer.close();
+        LOG.info("Doris container closed successfully.");
+    }
+
+    private void initializeJDBCDriver() throws MalformedURLException {
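+        // Load the MySQL JDBC driver jar from Maven Central and expose it through the thread context
+        // class loader so DriverManager can find it when connecting to the Doris FE.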
+        URLClassLoader urlClassLoader =
+                new URLClassLoader(
+                        new URL[] {new URL(DRIVER_JAR)}, DorisContainer.class.getClassLoader());
+        LOG.info("Try to connect to Doris.");
+        Thread.currentThread().setContextClassLoader(urlClassLoader);
+    }
+
+    private void initializeJdbcConnection() throws Exception {
+        initializeJDBCDriver();
+        try (Connection connection = getQueryConnection();
+                Statement statement = connection.createStatement()) {
+            ResultSet resultSet;
+            do {
+                LOG.info("Waiting for the Backend to start successfully.");
+                resultSet = statement.executeQuery("show backends");
+            } while (!isBeReady(resultSet, Duration.ofSeconds(1L)));
+        }
+        LOG.info("Connected to Doris successfully.");
+    }
+
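+    // A Backend is considered ready once "show backends" reports Alive=true and a non-zero TotalCapacity.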
+    private boolean isBeReady(ResultSet rs, Duration duration) throws SQLException {
+        LockSupport.parkNanos(duration.toNanos());
+        if (rs.next()) {
+            String isAlive = rs.getString("Alive").trim();
+            String totalCap = rs.getString("TotalCapacity").trim();
+            return Boolean.toString(true).equalsIgnoreCase(isAlive)
+                    && !"0.000".equalsIgnoreCase(totalCap);
+        }
+        return false;
+    }
+
+    private void printClusterStatus() throws Exception {
+        LOG.info("Current machine IP: {}", dorisContainer.getHost());
+        echo("sh", "-c", "cat /proc/cpuinfo | grep 'cpu cores' | uniq");
+        echo("sh", "-c", "free -h");
+        try (Connection connection =
+                        DriverManager.getConnection(
+                                String.format(JDBC_URL, dorisContainer.getHost()),
+                                USERNAME,
+                                PASSWORD);
+                Statement statement = connection.createStatement()) {
+            ResultSet showFrontends = statement.executeQuery("show frontends");
+            LOG.info("Frontends status: {}", convertList(showFrontends));
+            ResultSet showBackends = statement.executeQuery("show backends");
+            LOG.info("Backends status: {}", convertList(showBackends));
+        }
+    }
+
+    private void echo(String... cmd) {
+        try {
+            Process p = Runtime.getRuntime().exec(cmd);
+            InputStream is = p.getInputStream();
+            // try-with-resources guarantees the reader (and underlying stream) is closed even on failure
+            try (BufferedReader reader = new BufferedReader(new InputStreamReader(is))) {
+                String line;
+                while ((line = reader.readLine()) != null) {
+                    System.out.println(line);
+                }
+            }
+            p.waitFor();
+            p.destroy();
+        } catch (Exception e) {
+            LOG.warn("Failed to execute command {}.", String.join(" ", cmd), e);
+        }
+    }
+
+    private List<Map> convertList(ResultSet rs) throws SQLException {
+        List<Map> list = new ArrayList<>();
+        ResultSetMetaData metaData = rs.getMetaData();
+        int columnCount = metaData.getColumnCount();
+        while (rs.next()) {
+            Map<String, Object> rowData = new HashMap<>();
+            for (int i = 1; i <= columnCount; i++) {
+                rowData.put(metaData.getColumnName(i), rs.getObject(i));
+            }
+            list.add(rowData);
+        }
+        return list;
+    }
+}
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/container/instance/MySQLContainer.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/container/instance/MySQLContainer.java
new file mode 100644
index 0000000..21b30e8
--- /dev/null
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/container/instance/MySQLContainer.java
@@ -0,0 +1,101 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.flink.container.instance;
+
+import org.apache.doris.flink.exception.DorisRuntimeException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.lifecycle.Startables;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.util.stream.Stream;
+
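+/** Testcontainers-backed MySQL 8.0 instance used as the source database in tests. */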
+public class MySQLContainer implements ContainerService {
+    private static final Logger LOG = LoggerFactory.getLogger(MySQLContainer.class);
+    private static final String MYSQL_VERSION = "mysql:8.0";
+    private static final String USERNAME = "root";
+    private static final String PASSWORD = "123456";
+    private final org.testcontainers.containers.MySQLContainer mysqlcontainer;
+
+    public MySQLContainer() {
+        mysqlcontainer = createContainer();
+    }
+
+    private org.testcontainers.containers.MySQLContainer createContainer() {
+        LOG.info("Will create mysql container.");
+        return new org.testcontainers.containers.MySQLContainer(MYSQL_VERSION)
+                .withUsername(USERNAME)
+                .withPassword(PASSWORD);
+    }
+
+    @Override
+    public void startContainer() {
+        LOG.info("Starting MySQL container.");
+        Startables.deepStart(Stream.of(mysqlcontainer)).join();
+        LOG.info("MySQL Container was started.");
+    }
+
+    @Override
+    public boolean isRunning() {
+        return mysqlcontainer.isRunning();
+    }
+
+    @Override
+    public String getInstanceHost() {
+        return mysqlcontainer.getHost();
+    }
+
+    @Override
+    public Integer getMappedPort(int originalPort) {
+        return mysqlcontainer.getMappedPort(originalPort);
+    }
+
+    @Override
+    public String getUsername() {
+        return USERNAME;
+    }
+
+    @Override
+    public String getPassword() {
+        return PASSWORD;
+    }
+
+    @Override
+    public Connection getQueryConnection() {
+        LOG.info("Try to get query connection from mysql.");
+        try {
+            return DriverManager.getConnection(mysqlcontainer.getJdbcUrl(), USERNAME, PASSWORD);
+        } catch (SQLException e) {
+            LOG.warn(
+                    "Failed to get mysql container query connection. jdbcUrl={}, user={}",
+                    mysqlcontainer.getJdbcUrl(),
+                    USERNAME,
+                    e);
+            throw new DorisRuntimeException(e);
+        }
+    }
+
+    @Override
+    public void close() {
+        LOG.info("Stopping MySQL container.");
+        mysqlcontainer.stop();
+        LOG.info("MySQL Container was stopped.");
+    }
+}
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/DorisSinkITCase.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/DorisSinkITCase.java
index de0ef04..50bcf6b 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/DorisSinkITCase.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/DorisSinkITCase.java
@@ -26,18 +26,19 @@
 import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.doris.flink.DorisTestBase;
 import org.apache.doris.flink.cfg.DorisExecutionOptions;
 import org.apache.doris.flink.cfg.DorisOptions;
 import org.apache.doris.flink.cfg.DorisReadOptions;
+import org.apache.doris.flink.container.AbstractITCaseService;
+import org.apache.doris.flink.container.ContainerUtils;
+import org.apache.doris.flink.sink.DorisSink.Builder;
 import org.apache.doris.flink.sink.batch.DorisBatchSink;
 import org.apache.doris.flink.sink.writer.serializer.SimpleStringSerializer;
 import org.apache.doris.flink.utils.MockSource;
 import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.sql.Statement;
 import java.time.Duration;
 import java.util.Arrays;
 import java.util.Collections;
@@ -51,7 +52,8 @@
 import static org.apache.flink.api.common.JobStatus.RUNNING;
 
 /** DorisSink ITCase with csv and arrow format. */
-public class DorisSinkITCase extends DorisTestBase {
+public class DorisSinkITCase extends AbstractITCaseService {
+    private static final Logger LOG = LoggerFactory.getLogger(DorisSinkITCase.class);
     static final String DATABASE = "test_sink";
     static final String TABLE_CSV = "tbl_csv";
     static final String TABLE_JSON = "tbl_json";
@@ -70,12 +72,20 @@
         properties.setProperty("column_separator", ",");
         properties.setProperty("line_delimiter", "\n");
         properties.setProperty("format", "csv");
-        submitJob(TABLE_CSV, properties, new String[] {"doris,1"});
+        DorisExecutionOptions.Builder executionBuilder = DorisExecutionOptions.builder();
+        executionBuilder.setLabelPrefix(UUID.randomUUID().toString()).setStreamLoadProp(properties);
+        DorisOptions.Builder dorisBuilder = DorisOptions.builder();
+        dorisBuilder
+                .setFenodes(getFenodes())
+                .setTableIdentifier(DATABASE + "." + TABLE_CSV)
+                .setUsername(getDorisUsername())
+                .setPassword(getDorisPassword());
+        submitJob(dorisBuilder.build(), executionBuilder.build(), new String[] {"doris,1"});
 
         Thread.sleep(10000);
         List<String> expected = Arrays.asList("doris,1");
         String query = String.format("select name,age from %s.%s order by 1", DATABASE, TABLE_CSV);
-        checkResult(expected, query, 2);
+        ContainerUtils.checkResult(getDorisQueryConnection(), LOG, expected, query, 2);
     }
 
     @Test
@@ -93,9 +103,18 @@
         row2.put("name", "doris2");
         row2.put("age", 2);
 
+        DorisExecutionOptions.Builder executionBuilder = DorisExecutionOptions.builder();
+        executionBuilder.setLabelPrefix(UUID.randomUUID().toString()).setStreamLoadProp(properties);
+        DorisOptions.Builder dorisBuilder = DorisOptions.builder();
+        dorisBuilder
+                .setFenodes(getFenodes())
+                .setTableIdentifier(DATABASE + "." + TABLE_JSON)
+                .setUsername(getDorisUsername())
+                .setPassword(getDorisPassword());
+
         submitJob(
-                TABLE_JSON,
-                properties,
+                dorisBuilder.build(),
+                executionBuilder.build(),
                 new String[] {
                     new ObjectMapper().writeValueAsString(row1),
                     new ObjectMapper().writeValueAsString(row2)
@@ -104,28 +123,21 @@
         Thread.sleep(10000);
         List<String> expected = Arrays.asList("doris1,1", "doris2,2");
         String query = String.format("select name,age from %s.%s order by 1", DATABASE, TABLE_JSON);
-        checkResult(expected, query, 2);
+        ContainerUtils.checkResult(getDorisQueryConnection(), LOG, expected, query, 2);
     }
 
-    public void submitJob(String table, Properties properties, String[] records) throws Exception {
+    private void submitJob(
+            DorisOptions dorisOptions, DorisExecutionOptions executionOptions, String[] records)
+            throws Exception {
         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
         env.setRuntimeMode(RuntimeExecutionMode.BATCH);
-        DorisSink.Builder<String> builder = DorisSink.builder();
+        Builder<String> builder = DorisSink.builder();
         final DorisReadOptions.Builder readOptionBuilder = DorisReadOptions.builder();
 
-        DorisOptions.Builder dorisBuilder = DorisOptions.builder();
-        dorisBuilder
-                .setFenodes(getFenodes())
-                .setTableIdentifier(DATABASE + "." + table)
-                .setUsername(USERNAME)
-                .setPassword(PASSWORD);
-        DorisExecutionOptions.Builder executionBuilder = DorisExecutionOptions.builder();
-        executionBuilder.setLabelPrefix(UUID.randomUUID().toString()).setStreamLoadProp(properties);
-
         builder.setDorisReadOptions(readOptionBuilder.build())
-                .setDorisExecutionOptions(executionBuilder.build())
+                .setDorisExecutionOptions(executionOptions)
                 .setSerializer(new SimpleStringSerializer())
-                .setDorisOptions(dorisBuilder.build());
+                .setDorisOptions(dorisOptions);
 
         env.fromElements(records).sinkTo(builder.build());
         env.execute();
@@ -168,8 +180,8 @@
                         getFenodes(),
                         getBenodes(),
                         DATABASE + "." + TABLE_JSON_TBL,
-                        USERNAME,
-                        PASSWORD);
+                        getDorisUsername(),
+                        getDorisPassword());
         tEnv.executeSql(sinkDDL);
         tEnv.executeSql("INSERT INTO doris_sink SELECT 'doris',1 union all SELECT 'flink',2");
 
@@ -177,7 +189,7 @@
         List<String> expected = Arrays.asList("doris,1", "flink,2");
         String query =
                 String.format("select name,age from %s.%s order by 1", DATABASE, TABLE_JSON_TBL);
-        checkResult(expected, query, 2);
+        ContainerUtils.checkResult(getDorisQueryConnection(), LOG, expected, query, 2);
     }
 
     @Test
@@ -190,7 +202,7 @@
 
         String sinkDDL =
                 String.format(
-                        "CREATE TABLE doris_sink ("
+                        "CREATE TABLE doris_sink_batch ("
                                 + " name STRING,"
                                 + " age INT"
                                 + ") WITH ("
@@ -214,17 +226,17 @@
                                 + ")",
                         getFenodes(),
                         DATABASE + "." + TABLE_CSV_BATCH_TBL,
-                        USERNAME,
-                        PASSWORD);
+                        getDorisUsername(),
+                        getDorisPassword());
         tEnv.executeSql(sinkDDL);
-        tEnv.executeSql("INSERT INTO doris_sink SELECT 'doris',1 union all SELECT 'flink',2");
+        tEnv.executeSql("INSERT INTO doris_sink_batch SELECT 'doris',1 union all SELECT 'flink',2");
 
         Thread.sleep(20000);
         List<String> expected = Arrays.asList("doris,1", "flink,2");
         String query =
                 String.format(
                         "select name,age from %s.%s order by 1", DATABASE, TABLE_CSV_BATCH_TBL);
-        checkResult(expected, query, 2);
+        ContainerUtils.checkResult(getDorisQueryConnection(), LOG, expected, query, 2);
     }
 
     @Test
@@ -238,8 +250,8 @@
         dorisBuilder
                 .setFenodes(getFenodes())
                 .setTableIdentifier(DATABASE + "." + TABLE_CSV_BATCH_DS)
-                .setUsername(USERNAME)
-                .setPassword(PASSWORD);
+                .setUsername(getDorisUsername())
+                .setPassword(getDorisPassword());
         Properties properties = new Properties();
         properties.setProperty("column_separator", ",");
         properties.setProperty("line_delimiter", "\n");
@@ -264,7 +276,7 @@
         String query =
                 String.format(
                         "select name,age from %s.%s order by 1", DATABASE, TABLE_CSV_BATCH_DS);
-        checkResult(expected, query, 2);
+        ContainerUtils.checkResult(getDorisQueryConnection(), LOG, expected, query, 2);
     }
 
     @Test
@@ -302,8 +314,8 @@
                                 + ")",
                         getFenodes(),
                         DATABASE + "." + TABLE_GROUP_COMMIT,
-                        USERNAME,
-                        PASSWORD);
+                        getDorisUsername(),
+                        getDorisPassword());
         tEnv.executeSql(sinkDDL);
         tEnv.executeSql(
                 "INSERT INTO doris_group_commit_sink SELECT 'doris',1 union all  SELECT 'group_commit',2 union all  SELECT 'flink',3");
@@ -313,8 +325,7 @@
         String query =
                 String.format(
                         "select name,age from %s.%s order by 1", DATABASE, TABLE_GROUP_COMMIT);
-        //
-        checkResult(expected, query, 2);
+        ContainerUtils.checkResult(getDorisQueryConnection(), LOG, expected, query, 2);
     }
 
     @Test
@@ -345,8 +356,8 @@
                                 + ")",
                         getFenodes(),
                         DATABASE + "." + TABLE_GZ_FORMAT,
-                        USERNAME,
-                        PASSWORD);
+                        getDorisUsername(),
+                        getDorisPassword());
         tEnv.executeSql(sinkDDL);
         tEnv.executeSql(
                 "INSERT INTO doris_gz_format_sink SELECT 'doris',1 union all  SELECT 'flink',2");
@@ -355,12 +366,12 @@
         List<String> expected = Arrays.asList("doris,1", "flink,2");
         String query =
                 String.format("select name,age from %s.%s order by 1", DATABASE, TABLE_GZ_FORMAT);
-        //
-        checkResult(expected, query, 2);
+        ContainerUtils.checkResult(getDorisQueryConnection(), LOG, expected, query, 2);
     }
 
     @Test
     public void testJobManagerFailoverSink() throws Exception {
+        LOG.info("start to test JobManagerFailoverSink.");
         initializeFailoverTable(TABLE_CSV_JM);
         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
         env.setParallelism(2);
@@ -374,8 +385,8 @@
         dorisBuilder
                 .setFenodes(getFenodes())
                 .setTableIdentifier(DATABASE + "." + TABLE_CSV_JM)
-                .setUsername(USERNAME)
-                .setPassword(PASSWORD);
+                .setUsername(getDorisUsername())
+                .setPassword(getDorisPassword());
         DorisExecutionOptions.Builder executionBuilder = DorisExecutionOptions.builder();
         Properties properties = new Properties();
         properties.setProperty("column_separator", ",");
@@ -404,20 +415,23 @@
         triggerFailover(
                 FailoverType.JM, jobID, miniClusterResource.getMiniCluster(), () -> sleepMs(100));
 
+        LOG.info("Waiting the JobManagerFailoverSink job to be finished. jobId={}", jobID);
         waitForJobStatus(
                 jobClient,
                 Collections.singletonList(FINISHED),
                 Deadline.fromNow(Duration.ofSeconds(120)));
 
+        LOG.info("Will check job manager failover sink result.");
         List<String> expected =
                 Arrays.asList("1,0", "1,1", "2,0", "2,1", "3,0", "3,1", "4,0", "4,1", "5,0", "5,1");
         String query =
                 String.format("select id,task_id from %s.%s order by 1,2", DATABASE, TABLE_CSV_JM);
-        checkResult(expected, query, 2);
+        ContainerUtils.checkResult(getDorisQueryConnection(), LOG, expected, query, 2);
     }
 
     @Test
     public void testTaskManagerFailoverSink() throws Exception {
+        LOG.info("start to test TaskManagerFailoverSink.");
         initializeFailoverTable(TABLE_CSV_TM);
         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
         env.setParallelism(2);
@@ -431,8 +445,8 @@
         dorisBuilder
                 .setFenodes(getFenodes())
                 .setTableIdentifier(DATABASE + "." + TABLE_CSV_TM)
-                .setUsername(USERNAME)
-                .setPassword(PASSWORD);
+                .setUsername(getDorisUsername())
+                .setPassword(getDorisPassword());
         DorisExecutionOptions.Builder executionBuilder = DorisExecutionOptions.builder();
         Properties properties = new Properties();
         properties.setProperty("column_separator", ",");
@@ -458,16 +472,18 @@
         triggerFailover(
                 FailoverType.TM, jobID, miniClusterResource.getMiniCluster(), () -> sleepMs(100));
 
+        LOG.info("Waiting the TaskManagerFailoverSink job to be finished. jobId={}", jobID);
         waitForJobStatus(
                 jobClient,
                 Collections.singletonList(FINISHED),
                 Deadline.fromNow(Duration.ofSeconds(120)));
 
+        LOG.info("Will check task manager failover sink result.");
         List<String> expected =
                 Arrays.asList("1,0", "1,1", "2,0", "2,1", "3,0", "3,1", "4,0", "4,1", "5,0", "5,1");
         String query =
                 String.format("select id,task_id from %s.%s order by 1,2", DATABASE, TABLE_CSV_TM);
-        checkResult(expected, query, 2);
+        ContainerUtils.checkResult(getDorisQueryConnection(), LOG, expected, query, 2);
     }
 
     private void sleepMs(long millis) {
@@ -477,43 +493,37 @@
         }
     }
 
-    private void initializeTable(String table) throws Exception {
-        try (Connection connection =
-                        DriverManager.getConnection(
-                                String.format(URL, DORIS_CONTAINER.getHost()), USERNAME, PASSWORD);
-                Statement statement = connection.createStatement()) {
-            statement.execute(String.format("CREATE DATABASE IF NOT EXISTS %s", DATABASE));
-            statement.execute(String.format("DROP TABLE IF EXISTS %s.%s", DATABASE, table));
-            statement.execute(
-                    String.format(
-                            "CREATE TABLE %s.%s ( \n"
-                                    + "`name` varchar(256),\n"
-                                    + "`age` int\n"
-                                    + ") DISTRIBUTED BY HASH(`name`) BUCKETS 1\n"
-                                    + "PROPERTIES (\n"
-                                    + "\"replication_num\" = \"1\"\n"
-                                    + ")\n",
-                            DATABASE, table));
-        }
+    private void initializeTable(String table) {
+        ContainerUtils.executeSQLStatement(
+                getDorisQueryConnection(),
+                LOG,
+                String.format("CREATE DATABASE IF NOT EXISTS %s", DATABASE),
+                String.format("DROP TABLE IF EXISTS %s.%s", DATABASE, table),
+                String.format(
+                        "CREATE TABLE %s.%s ( \n"
+                                + "`name` varchar(256),\n"
+                                + "`age` int\n"
+                                + ") DISTRIBUTED BY HASH(`name`) BUCKETS 1\n"
+                                + "PROPERTIES (\n"
+                                + "\"replication_num\" = \"1\"\n"
+                                + ")\n",
+                        DATABASE, table));
     }
 
-    private void initializeFailoverTable(String table) throws Exception {
-        try (Connection connection =
-                        DriverManager.getConnection(
-                                String.format(URL, DORIS_CONTAINER.getHost()), USERNAME, PASSWORD);
-                Statement statement = connection.createStatement()) {
-            statement.execute(String.format("CREATE DATABASE IF NOT EXISTS %s", DATABASE));
-            statement.execute(String.format("DROP TABLE IF EXISTS %s.%s", DATABASE, table));
-            statement.execute(
-                    String.format(
-                            "CREATE TABLE %s.%s ( \n"
-                                    + "`id` int,\n"
-                                    + "`task_id` int\n"
-                                    + ") DISTRIBUTED BY HASH(`id`) BUCKETS 1\n"
-                                    + "PROPERTIES (\n"
-                                    + "\"replication_num\" = \"1\"\n"
-                                    + ")\n",
-                            DATABASE, table));
-        }
+    private void initializeFailoverTable(String table) {
+        ContainerUtils.executeSQLStatement(
+                getDorisQueryConnection(),
+                LOG,
+                String.format("CREATE DATABASE IF NOT EXISTS %s", DATABASE),
+                String.format("DROP TABLE IF EXISTS %s.%s", DATABASE, table),
+                String.format(
+                        "CREATE TABLE %s.%s ( \n"
+                                + "`id` int,\n"
+                                + "`task_id` int\n"
+                                + ") DISTRIBUTED BY HASH(`id`) BUCKETS 1\n"
+                                + "PROPERTIES (\n"
+                                + "\"replication_num\" = \"1\"\n"
+                                + ")\n",
+                        DATABASE, table));
     }
 }
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/schema/SchemaManagerITCase.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/schema/SchemaManagerITCase.java
index 3dde08d..37ca3a2 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/schema/SchemaManagerITCase.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/schema/SchemaManagerITCase.java
@@ -17,31 +17,30 @@
 
 package org.apache.doris.flink.sink.schema;
 
-import org.apache.doris.flink.DorisTestBase;
 import org.apache.doris.flink.catalog.doris.DataModel;
 import org.apache.doris.flink.catalog.doris.FieldSchema;
 import org.apache.doris.flink.catalog.doris.TableSchema;
 import org.apache.doris.flink.cfg.DorisOptions;
+import org.apache.doris.flink.container.AbstractITCaseService;
+import org.apache.doris.flink.container.ContainerUtils;
 import org.apache.doris.flink.exception.IllegalArgumentException;
 import org.apache.doris.flink.rest.models.Field;
 import org.apache.doris.flink.rest.models.Schema;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.sql.SQLException;
-import java.sql.Statement;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Random;
 
-public class SchemaManagerITCase extends DorisTestBase {
-
+public class SchemaManagerITCase extends AbstractITCaseService {
+    private static final Logger LOG = LoggerFactory.getLogger(SchemaManagerITCase.class);
     private static final String DATABASE = "test_sc_db";
     private DorisOptions options;
     private SchemaChangeManager schemaChangeManager;
@@ -52,34 +51,31 @@
                 DorisOptions.builder()
                         .setFenodes(getFenodes())
                         .setTableIdentifier(DATABASE + ".add_column")
-                        .setUsername(USERNAME)
-                        .setPassword(PASSWORD)
+                        .setUsername(getDorisUsername())
+                        .setPassword(getDorisPassword())
                         .build();
         schemaChangeManager = new SchemaChangeManager(options);
     }
 
-    private void initDorisSchemaChangeTable(String table) throws SQLException {
-        try (Connection connection =
-                        DriverManager.getConnection(
-                                String.format(URL, DORIS_CONTAINER.getHost()), USERNAME, PASSWORD);
-                Statement statement = connection.createStatement()) {
-            statement.execute(String.format("CREATE DATABASE IF NOT EXISTS %s", DATABASE));
-            statement.execute(String.format("DROP TABLE IF EXISTS %s.%s", DATABASE, table));
-            statement.execute(
-                    String.format(
-                            "CREATE TABLE %s.%s ( \n"
-                                    + "`id` varchar(32),\n"
-                                    + "`age` int\n"
-                                    + ") DISTRIBUTED BY HASH(`id`) BUCKETS 1\n"
-                                    + "PROPERTIES (\n"
-                                    + "\"replication_num\" = \"1\"\n"
-                                    + ")\n",
-                            DATABASE, table));
-        }
+    private void initDorisSchemaChangeTable(String table) {
+        ContainerUtils.executeSQLStatement(
+                getDorisQueryConnection(),
+                LOG,
+                String.format("CREATE DATABASE IF NOT EXISTS %s", DATABASE),
+                String.format("DROP TABLE IF EXISTS %s.%s", DATABASE, table),
+                String.format(
+                        "CREATE TABLE %s.%s ( \n"
+                                + "`id` varchar(32),\n"
+                                + "`age` int\n"
+                                + ") DISTRIBUTED BY HASH(`id`) BUCKETS 1\n"
+                                + "PROPERTIES (\n"
+                                + "\"replication_num\" = \"1\"\n"
+                                + ")\n",
+                        DATABASE, table));
     }
 
     @Test
-    public void testAddColumn() throws SQLException, IOException, IllegalArgumentException {
+    public void testAddColumn() throws IOException, IllegalArgumentException {
         String addColumnTbls = "add_column";
         initDorisSchemaChangeTable(addColumnTbls);
         FieldSchema field = new FieldSchema("c1", "int", "");
@@ -93,7 +89,7 @@
 
     @Test
     public void testAddColumnWithChineseComment()
-            throws SQLException, IOException, IllegalArgumentException, InterruptedException {
+            throws IOException, IllegalArgumentException, InterruptedException {
         String addColumnTbls = "add_column";
         initDorisSchemaChangeTable(addColumnTbls);
 
@@ -149,7 +145,7 @@
     }
 
     @Test
-    public void testDropColumn() throws SQLException, IOException, IllegalArgumentException {
+    public void testDropColumn() throws IOException, IllegalArgumentException {
         String dropColumnTbls = "drop_column";
         initDorisSchemaChangeTable(dropColumnTbls);
         schemaChangeManager.dropColumn(DATABASE, dropColumnTbls, "age");
@@ -161,7 +157,7 @@
     }
 
     @Test
-    public void testRenameColumn() throws SQLException, IOException, IllegalArgumentException {
+    public void testRenameColumn() throws IOException, IllegalArgumentException {
         String renameColumnTbls = "rename_column";
         initDorisSchemaChangeTable(renameColumnTbls);
         schemaChangeManager.renameColumn(DATABASE, renameColumnTbls, "age", "age1");
@@ -173,8 +169,7 @@
     }
 
     @Test
-    public void testModifyColumnComment()
-            throws SQLException, IOException, IllegalArgumentException {
+    public void testModifyColumnComment() throws IOException, IllegalArgumentException {
         String modifyColumnCommentTbls = "modify_column_comment";
         initDorisSchemaChangeTable(modifyColumnCommentTbls);
         String columnName = "age";
@@ -188,7 +183,7 @@
 
     @Test
     public void testOnlyModifyColumnType()
-            throws SQLException, IOException, IllegalArgumentException, InterruptedException {
+            throws IOException, IllegalArgumentException, InterruptedException {
         String modifyColumnTbls = "modify_column_type";
         String columnName = "age";
         String newColumnType = "bigint";
@@ -203,7 +198,7 @@
 
     @Test
     public void testModifyColumnTypeAndComment()
-            throws SQLException, IOException, IllegalArgumentException, InterruptedException {
+            throws IOException, IllegalArgumentException, InterruptedException {
         String modifyColumnTbls = "modify_column_type_and_comment";
         initDorisSchemaChangeTable(modifyColumnTbls);
         String columnName = "age";
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/source/DorisSourceITCase.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/source/DorisSourceITCase.java
index a13e96f..783e6bd 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/source/DorisSourceITCase.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/source/DorisSourceITCase.java
@@ -25,32 +25,36 @@
 import org.apache.flink.types.Row;
 import org.apache.flink.util.CloseableIterator;
 
-import org.apache.doris.flink.DorisTestBase;
 import org.apache.doris.flink.cfg.DorisOptions;
 import org.apache.doris.flink.cfg.DorisStreamOptions;
+import org.apache.doris.flink.container.AbstractITCaseService;
+import org.apache.doris.flink.container.ContainerUtils;
 import org.apache.doris.flink.datastream.DorisSourceFunction;
 import org.apache.doris.flink.deserialization.SimpleListDeserializationSchema;
+import org.apache.doris.flink.exception.DorisRuntimeException;
 import org.junit.Assert;
 import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.sql.Statement;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Properties;
+import java.util.Set;
 
 /** DorisSource ITCase. */
-public class DorisSourceITCase extends DorisTestBase {
-    static final String DATABASE = "test_source";
-    static final String TABLE_READ = "tbl_read";
-    static final String TABLE_READ_OLD_API = "tbl_read_old_api";
-    static final String TABLE_READ_TBL = "tbl_read_tbl";
-    static final String TABLE_READ_TBL_OLD_API = "tbl_read_tbl_old_api";
-    static final String TABLE_READ_TBL_ALL_OPTIONS = "tbl_read_tbl_all_options";
-    static final String TABLE_READ_TBL_PUSH_DOWN = "tbl_read_tbl_push_down";
-    static final String TABLE_READ_TBL_PUSH_DOWN_WITH_UNION_ALL =
+public class DorisSourceITCase extends AbstractITCaseService {
+    private static final Logger LOG = LoggerFactory.getLogger(DorisSourceITCase.class);
+    private static final String DATABASE = "test_source";
+    private static final String TABLE_READ = "tbl_read";
+    private static final String TABLE_READ_OLD_API = "tbl_read_old_api";
+    private static final String TABLE_READ_TBL = "tbl_read_tbl";
+    private static final String TABLE_READ_TBL_OLD_API = "tbl_read_tbl_old_api";
+    private static final String TABLE_READ_TBL_ALL_OPTIONS = "tbl_read_tbl_all_options";
+    private static final String TABLE_READ_TBL_PUSH_DOWN = "tbl_read_tbl_push_down";
+    private static final String TABLE_READ_TBL_PUSH_DOWN_WITH_UNION_ALL =
             "tbl_read_tbl_push_down_with_union_all";
 
     @Test
@@ -63,8 +67,8 @@
         dorisBuilder
                 .setFenodes(getFenodes())
                 .setTableIdentifier(DATABASE + "." + TABLE_READ)
-                .setUsername(USERNAME)
-                .setPassword(PASSWORD);
+                .setUsername(getDorisUsername())
+                .setPassword(getDorisPassword());
 
         DorisSource<List<?>> source =
                 DorisSource.<List<?>>builder()
@@ -80,7 +84,7 @@
             }
         }
         List<String> expected = Arrays.asList("[doris, 18]", "[flink, 10]", "[apache, 12]");
-        Assert.assertArrayEquals(actual.toArray(), expected.toArray());
+        checkResult("testSource", expected.toArray(), actual.toArray());
     }
 
     @Test
@@ -89,8 +93,8 @@
         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
         Properties properties = new Properties();
         properties.put("fenodes", getFenodes());
-        properties.put("username", USERNAME);
-        properties.put("password", PASSWORD);
+        properties.put("username", getDorisUsername());
+        properties.put("password", getDorisPassword());
         properties.put("table.identifier", DATABASE + "." + TABLE_READ_OLD_API);
         DorisStreamOptions options = new DorisStreamOptions(properties);
 
@@ -105,7 +109,7 @@
             }
         }
         List<String> expected = Arrays.asList("[doris, 18]", "[flink, 10]", "[apache, 12]");
-        Assert.assertArrayEquals(actual.toArray(), expected.toArray());
+        checkResult("testOldSourceApi", expected.toArray(), actual.toArray());
     }
 
     @Test
@@ -128,7 +132,10 @@
                                 + " 'username' = '%s',"
                                 + " 'password' = '%s'"
                                 + ")",
-                        getFenodes(), DATABASE + "." + TABLE_READ_TBL, USERNAME, PASSWORD);
+                        getFenodes(),
+                        DATABASE + "." + TABLE_READ_TBL,
+                        getDorisUsername(),
+                        getDorisPassword());
         tEnv.executeSql(sourceDDL);
         TableResult tableResult = tEnv.executeSql("SELECT * FROM doris_source");
 
@@ -151,7 +158,7 @@
             }
         }
         String[] expectedFilter = new String[] {"+I[doris, 18]"};
-        Assert.assertArrayEquals(expectedFilter, actualFilter.toArray());
+        checkResult("testTableSource", expectedFilter, actualFilter.toArray());
     }
 
     @Test
@@ -163,7 +170,7 @@
 
         String sourceDDL =
                 String.format(
-                        "CREATE TABLE doris_source ("
+                        "CREATE TABLE doris_source_old_api ("
                                 + " name STRING,"
                                 + " age INT"
                                 + ") WITH ("
@@ -174,9 +181,12 @@
                                 + " 'username' = '%s',"
                                 + " 'password' = '%s'"
                                 + ")",
-                        getFenodes(), DATABASE + "." + TABLE_READ_TBL_OLD_API, USERNAME, PASSWORD);
+                        getFenodes(),
+                        DATABASE + "." + TABLE_READ_TBL_OLD_API,
+                        getDorisUsername(),
+                        getDorisPassword());
         tEnv.executeSql(sourceDDL);
-        TableResult tableResult = tEnv.executeSql("SELECT * FROM doris_source");
+        TableResult tableResult = tEnv.executeSql("SELECT * FROM doris_source_old_api");
 
         List<String> actual = new ArrayList<>();
         try (CloseableIterator<Row> iterator = tableResult.collect()) {
@@ -185,7 +195,7 @@
             }
         }
         String[] expected = new String[] {"+I[doris, 18]", "+I[flink, 10]", "+I[apache, 12]"};
-        Assert.assertArrayEquals(expected, actual.toArray());
+        checkResult("testTableSourceOldApi", expected, actual.toArray());
     }
 
     @Test
@@ -197,7 +207,7 @@
 
         String sourceDDL =
                 String.format(
-                        "CREATE TABLE doris_source ("
+                        "CREATE TABLE doris_source_all_options ("
                                 + " name STRING,"
                                 + " age INT"
                                 + ") WITH ("
@@ -219,10 +229,10 @@
                                 + ")",
                         getFenodes(),
                         DATABASE + "." + TABLE_READ_TBL_ALL_OPTIONS,
-                        USERNAME,
-                        PASSWORD);
+                        getDorisUsername(),
+                        getDorisPassword());
         tEnv.executeSql(sourceDDL);
-        TableResult tableResult = tEnv.executeSql("SELECT * FROM doris_source");
+        TableResult tableResult = tEnv.executeSql("SELECT * FROM doris_source_all_options");
 
         List<String> actual = new ArrayList<>();
         try (CloseableIterator<Row> iterator = tableResult.collect()) {
@@ -231,7 +241,7 @@
             }
         }
         String[] expected = new String[] {"+I[doris, 18]", "+I[flink, 10]", "+I[apache, 12]"};
-        Assert.assertArrayEquals(expected, actual.toArray());
+        checkResult("testTableSourceAllOptions", expected, actual.toArray());
     }
 
     @Test
@@ -243,7 +253,7 @@
 
         String sourceDDL =
                 String.format(
-                        "CREATE TABLE doris_source ("
+                        "CREATE TABLE doris_source_filter_and_projection_push_down ("
                                 + " name STRING,"
                                 + " age INT"
                                 + ") WITH ("
@@ -255,10 +265,12 @@
                                 + ")",
                         getFenodes(),
                         DATABASE + "." + TABLE_READ_TBL_PUSH_DOWN,
-                        USERNAME,
-                        PASSWORD);
+                        getDorisUsername(),
+                        getDorisPassword());
         tEnv.executeSql(sourceDDL);
-        TableResult tableResult = tEnv.executeSql("SELECT age FROM doris_source where age = '18'");
+        TableResult tableResult =
+                tEnv.executeSql(
+                        "SELECT age FROM doris_source_filter_and_projection_push_down where age = '18'");
 
         List<String> actual = new ArrayList<>();
         try (CloseableIterator<Row> iterator = tableResult.collect()) {
@@ -267,11 +279,12 @@
             }
         }
         String[] expected = new String[] {"+I[18]"};
-        Assert.assertArrayEquals(expected, actual.toArray());
+        checkResult("testTableSourceFilterAndProjectionPushDown", expected, actual.toArray());
     }
 
     @Test
-    public void testTableSourceFilterWithUnionAll() throws Exception {
+    public void testTableSourceFilterWithUnionAll() {
+        LOG.info("starting to execute testTableSourceFilterWithUnionAll case.");
         initializeTable(TABLE_READ_TBL_PUSH_DOWN_WITH_UNION_ALL);
         final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
         env.setParallelism(1);
@@ -279,7 +292,7 @@
 
         String sourceDDL =
                 String.format(
-                        "CREATE TABLE doris_source ("
+                        "CREATE TABLE doris_source_filter_with_union_all ("
                                 + " name STRING,"
                                 + " age INT"
                                 + ") WITH ("
@@ -291,48 +304,56 @@
                                 + ")",
                         getFenodes(),
                         DATABASE + "." + TABLE_READ_TBL_PUSH_DOWN_WITH_UNION_ALL,
-                        USERNAME,
-                        PASSWORD);
+                        getDorisUsername(),
+                        getDorisPassword());
         tEnv.executeSql(sourceDDL);
-        TableResult tableResult =
-                tEnv.executeSql(
-                        "  SELECT * FROM doris_source where age = '18'"
-                                + " UNION ALL "
-                                + "SELECT * FROM doris_source where age = '10'  ");
+        String querySql =
+                "  SELECT * FROM doris_source_filter_with_union_all where age = '18'"
+                        + " UNION ALL "
+                        + "SELECT * FROM doris_source_filter_with_union_all where age = '10'";
+        TableResult tableResult = tEnv.executeSql(querySql);
 
         List<String> actual = new ArrayList<>();
         try (CloseableIterator<Row> iterator = tableResult.collect()) {
             while (iterator.hasNext()) {
                 actual.add(iterator.next().toString());
             }
+        } catch (Exception e) {
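+            // The test method no longer declares throws Exception, so wrap collection failures in an unchecked exception.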
+            LOG.error("Failed to execute sql. sql={}", querySql, e);
+            throw new DorisRuntimeException(e);
         }
-        String[] expected = new String[] {"+I[doris, 18]", "+I[flink, 10]"};
-        Assert.assertArrayEquals(expected, actual.toArray());
+        Set<String> expected = new HashSet<>(Arrays.asList("+I[flink, 10]", "+I[doris, 18]"));
+        Assert.assertEquals(expected.size(), actual.size());
+        for (String a : actual) {
+            Assert.assertTrue(expected.contains(a));
+        }
     }
 
-    private void initializeTable(String table) throws Exception {
-        try (Connection connection =
-                        DriverManager.getConnection(
-                                String.format(URL, DORIS_CONTAINER.getHost()), USERNAME, PASSWORD);
-                Statement statement = connection.createStatement()) {
-            statement.execute(String.format("CREATE DATABASE IF NOT EXISTS %s", DATABASE));
-            statement.execute(String.format("DROP TABLE IF EXISTS %s.%s", DATABASE, table));
-            statement.execute(
-                    String.format(
-                            "CREATE TABLE %s.%s ( \n"
-                                    + "`name` varchar(256),\n"
-                                    + "`age` int\n"
-                                    + ") DISTRIBUTED BY HASH(`name`) BUCKETS 1\n"
-                                    + "PROPERTIES (\n"
-                                    + "\"replication_num\" = \"1\"\n"
-                                    + ")\n",
-                            DATABASE, table));
-            statement.execute(
-                    String.format("insert into %s.%s  values ('doris',18)", DATABASE, table));
-            statement.execute(
-                    String.format("insert into %s.%s  values ('flink',10)", DATABASE, table));
-            statement.execute(
-                    String.format("insert into %s.%s  values ('apache',12)", DATABASE, table));
-        }
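+    /** Logs expected and actual rows before asserting, so a failing CI run shows the full comparison. */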
+    private void checkResult(String testName, Object[] expected, Object[] actual) {
+        LOG.info(
+                "Checking DorisSourceITCase result. testName={}, actual={}, expected={}",
+                testName,
+                actual,
+                expected);
+        Assert.assertArrayEquals(expected, actual);
+    }
+
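+    /** Creates the source table with name/age columns and seeds the three rows read by the tests. */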
+    private void initializeTable(String table) {
+        ContainerUtils.executeSQLStatement(
+                getDorisQueryConnection(),
+                LOG,
+                String.format("CREATE DATABASE IF NOT EXISTS %s", DATABASE),
+                String.format("DROP TABLE IF EXISTS %s.%s", DATABASE, table),
+                String.format(
+                        "CREATE TABLE %s.%s ( \n"
+                                + "`name` varchar(256),\n"
+                                + "`age` int\n"
+                                + ") DISTRIBUTED BY HASH(`name`) BUCKETS 1\n"
+                                + "PROPERTIES (\n"
+                                + "\"replication_num\" = \"1\"\n"
+                                + ")\n",
+                        DATABASE, table),
+                String.format("insert into %s.%s  values ('doris',18)", DATABASE, table),
+                String.format("insert into %s.%s  values ('flink',10)", DATABASE, table),
+                String.format("insert into %s.%s  values ('apache',12)", DATABASE, table));
     }
 }
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/table/DorisRowDataJdbcLookupFunctionITCase.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/table/DorisRowDataJdbcLookupFunctionITCase.java
index 0ad1781..7f9021f 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/table/DorisRowDataJdbcLookupFunctionITCase.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/table/DorisRowDataJdbcLookupFunctionITCase.java
@@ -26,15 +26,15 @@
 import org.apache.flink.util.Collector;
 
 import com.google.common.cache.Cache;
-import org.apache.doris.flink.DorisTestBase;
 import org.apache.doris.flink.cfg.DorisLookupOptions;
 import org.apache.doris.flink.cfg.DorisOptions;
+import org.apache.doris.flink.container.AbstractITCaseService;
+import org.apache.doris.flink.container.ContainerUtils;
 import org.junit.Before;
 import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.sql.Statement;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -43,45 +43,43 @@
 
 import static org.junit.Assert.assertEquals;
 
-public class DorisRowDataJdbcLookupFunctionITCase extends DorisTestBase {
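+/** ITCase for DorisRowDataJdbcLookupFunction lookups against Doris. */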
+public class DorisRowDataJdbcLookupFunctionITCase extends AbstractITCaseService {
+    private static final Logger LOG =
+            LoggerFactory.getLogger(DorisRowDataJdbcLookupFunctionITCase.class);
 
     private static final String LOOKUP_TABLE = "test.t_lookup_table";
 
-    private static String[] fieldNames = new String[] {"id1", "id2", "c_string", "c_double"};
-    private static DataType[] fieldDataTypes =
+    private static final String[] fieldNames = new String[] {"id1", "id2", "c_string", "c_double"};
+    private static final DataType[] fieldDataTypes =
             new DataType[] {
                 DataTypes.INT(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.DOUBLE()
             };
-    private static String[] lookupKeys = new String[] {"id1", "id2"};
-    private static int[] keyIndexs = new int[] {0, 1};
+    private static final String[] lookupKeys = new String[] {"id1", "id2"};
+    private static final int[] keyIndexs = new int[] {0, 1};
 
     @Before
     public void setUp() throws Exception {
-        try (Connection connection =
-                        DriverManager.getConnection(
-                                String.format(URL, DORIS_CONTAINER.getHost()), USERNAME, PASSWORD);
-                Statement statement = connection.createStatement()) {
-            statement.execute(String.format("CREATE DATABASE IF NOT EXISTS %s", "test"));
-            statement.execute(String.format("DROP TABLE IF EXISTS %s", LOOKUP_TABLE));
-            statement.execute(
-                    String.format(
-                            "CREATE TABLE %s ( \n"
-                                    + "`id1` int,\n"
-                                    + "`id2` varchar(128),\n"
-                                    + "`c_string` string,\n"
-                                    + "`c_double` double\n"
-                                    + ") DISTRIBUTED BY HASH(`id1`) BUCKETS 1\n"
-                                    + "PROPERTIES (\n"
-                                    + "\"replication_num\" = \"1\"\n"
-                                    + ")\n",
-                            LOOKUP_TABLE));
-            statement.execute(
-                    String.format(
-                            "insert into %s  values (1,'A','zhangsanA',1.12),"
-                                    + "(1,'A','zhangsanA-1',11.12),"
-                                    + "(2,'B','zhangsanB',2.12),(4,'D','zhangsanD',4.12)",
-                            LOOKUP_TABLE));
-        }
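+        // Recreate and seed the lookup table through the shared Doris query connection.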
+        ContainerUtils.executeSQLStatement(
+                getDorisQueryConnection(),
+                LOG,
+                String.format("CREATE DATABASE IF NOT EXISTS %s", "test"),
+                String.format("DROP TABLE IF EXISTS %s", LOOKUP_TABLE),
+                String.format(
+                        "CREATE TABLE %s ( \n"
+                                + "`id1` int,\n"
+                                + "`id2` varchar(128),\n"
+                                + "`c_string` string,\n"
+                                + "`c_double` double\n"
+                                + ") DISTRIBUTED BY HASH(`id1`) BUCKETS 1\n"
+                                + "PROPERTIES (\n"
+                                + "\"replication_num\" = \"1\"\n"
+                                + ")\n",
+                        LOOKUP_TABLE),
+                String.format(
+                        "insert into %s  values (1,'A','zhangsanA',1.12),"
+                                + "(1,'A','zhangsanA-1',11.12),"
+                                + "(2,'B','zhangsanB',2.12),(4,'D','zhangsanD',4.12)",
+                        LOOKUP_TABLE));
     }
 
     @Test
@@ -167,9 +165,9 @@
                 DorisOptions.builder()
                         .setFenodes(getFenodes())
                         .setTableIdentifier(LOOKUP_TABLE)
-                        .setJdbcUrl(getJdbcUrl())
-                        .setUsername(USERNAME)
-                        .setPassword(PASSWORD)
+                        .setJdbcUrl(getDorisQueryUrl())
+                        .setUsername(getDorisUsername())
+                        .setPassword(getDorisPassword())
                         .build();
 
         DorisRowDataJdbcLookupFunction lookupFunction =
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/DorisDorisE2ECase.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/DorisDorisE2ECase.java
deleted file mode 100644
index 8281350..0000000
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/DorisDorisE2ECase.java
+++ /dev/null
@@ -1,234 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.doris.flink.tools.cdc;
-
-import org.apache.flink.api.common.RuntimeExecutionMode;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.table.api.TableResult;
-import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
-import org.apache.flink.types.Row;
-import org.apache.flink.util.CloseableIterator;
-
-import org.apache.doris.flink.DorisTestBase;
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.sql.Statement;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.UUID;
-
-/** DorisDorisE2ECase. */
-public class DorisDorisE2ECase extends DorisTestBase {
-    private static final String DATABASE_SOURCE = "test_e2e_source";
-    private static final String DATABASE_SINK = "test_e2e_sink";
-    private static final String TABLE = "test_tbl";
-
-    @Test
-    public void testDoris2Doris() throws Exception {
-        initializeDorisTable(TABLE);
-        printClusterStatus();
-        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-        env.setParallelism(1);
-        env.setRuntimeMode(RuntimeExecutionMode.BATCH);
-        final StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
-
-        String sourceDDL =
-                String.format(
-                        "CREATE TABLE doris_source ("
-                                + "id  int,\n"
-                                + "c1  boolean,\n"
-                                + "c2  tinyint,\n"
-                                + "c3  smallint,\n"
-                                + "c4  int, \n"
-                                + "c5  bigint, \n"
-                                + "c6  string, \n"
-                                + "c7  float, \n"
-                                + "c8  double, \n"
-                                + "c9  decimal(12,4), \n"
-                                + "c10  date, \n"
-                                + "c11  TIMESTAMP, \n"
-                                + "c12  char(1), \n"
-                                + "c13  varchar(256), \n"
-                                + "c14  Array<String>, \n"
-                                + "c15  Map<String, String>, \n"
-                                + "c16  ROW<name String, age int>, \n"
-                                + "c17  STRING \n"
-                                + ") WITH ("
-                                + " 'connector' = 'doris',"
-                                + " 'fenodes' = '%s',"
-                                + " 'table.identifier' = '%s',"
-                                + " 'sink.label-prefix' = '"
-                                + UUID.randomUUID()
-                                + "',"
-                                + " 'username' = '%s',"
-                                + " 'password' = '%s'"
-                                + ")",
-                        getFenodes(),
-                        DATABASE_SOURCE + "." + TABLE,
-                        USERNAME,
-                        PASSWORD);
-        tEnv.executeSql(sourceDDL);
-
-        String sinkDDL =
-                String.format(
-                        "CREATE TABLE doris_sink ("
-                                + "id  int,\n"
-                                + "c1  boolean,\n"
-                                + "c2  tinyint,\n"
-                                + "c3  smallint,\n"
-                                + "c4  int, \n"
-                                + "c5  bigint, \n"
-                                + "c6  string, \n"
-                                + "c7  float, \n"
-                                + "c8  double, \n"
-                                + "c9  decimal(12,4), \n"
-                                + "c10  date, \n"
-                                + "c11  TIMESTAMP, \n"
-                                + "c12  char(1), \n"
-                                + "c13  varchar(256), \n"
-                                + "c14  Array<String>, \n"
-                                + "c15  Map<String, String>, \n"
-                                + "c16  ROW<name String, age int>, \n"
-                                + "c17  STRING \n"
-                                + ") WITH ("
-                                + " 'connector' = 'doris',"
-                                + " 'fenodes' = '%s',"
-                                + " 'sink.label-prefix' = '"
-                                + UUID.randomUUID()
-                                + "',"
-                                + " 'table.identifier' = '%s',"
-                                + " 'username' = '%s',"
-                                + " 'password' = '%s'"
-                                + ")",
-                        getFenodes(),
-                        DATABASE_SINK + "." + TABLE,
-                        USERNAME,
-                        PASSWORD);
-        tEnv.executeSql(sinkDDL);
-
-        tEnv.executeSql("INSERT INTO doris_sink SELECT * FROM doris_source").await();
-
-        TableResult tableResult = tEnv.executeSql("SELECT * FROM doris_sink");
-        List<Object> actual = new ArrayList<>();
-        try (CloseableIterator<Row> iterator = tableResult.collect()) {
-            while (iterator.hasNext()) {
-                actual.add(iterator.next().toString());
-            }
-        }
-        System.out.println(actual);
-        String[] expected =
-                new String[] {
-                    "+I[1, true, 127, 32767, 2147483647, 9223372036854775807, 123456789012345678901234567890, 3.14, 2.7182818284, 12345.6789, 2023-05-22, 2023-05-22T12:34:56, A, Example text, [item1, item2, item3], {key1=value1, key2=value2}, +I[John Doe, 30], {\"key\":\"value\"}]",
-                    "+I[2, false, -128, -32768, -2147483648, -9223372036854775808, -123456789012345678901234567890, -3.14, -2.7182818284, -12345.6789, 2024-01-01, 2024-01-01T00:00, B, Another example, [item4, item5, item6], {key3=value3, key4=value4}, +I[Jane Doe, 25], {\"another_key\":\"another_value\"}]"
-                };
-        Assert.assertArrayEquals(expected, actual.toArray(new String[0]));
-    }
-
-    private void initializeDorisTable(String table) throws Exception {
-        try (Connection connection =
-                        DriverManager.getConnection(
-                                String.format(URL, DORIS_CONTAINER.getHost()), USERNAME, PASSWORD);
-                Statement statement = connection.createStatement()) {
-            statement.execute(String.format("CREATE DATABASE IF NOT EXISTS %s", DATABASE_SOURCE));
-            statement.execute(String.format("CREATE DATABASE IF NOT EXISTS %s", DATABASE_SINK));
-            statement.execute(String.format("DROP TABLE IF EXISTS %s.%s", DATABASE_SOURCE, table));
-            statement.execute(String.format("DROP TABLE IF EXISTS %s.%s", DATABASE_SINK, table));
-            statement.execute(
-                    String.format(
-                            "CREATE TABLE %s.%s (\n"
-                                    + "            `id` int,\n"
-                                    + "            `c1` boolean,\n"
-                                    + "            `c2` tinyint,\n"
-                                    + "            `c3` smallint,\n"
-                                    + "            `c4` int,\n"
-                                    + "            `c5` bigint,\n"
-                                    + "            `c6` largeint,\n"
-                                    + "            `c7` float,\n"
-                                    + "            `c8` double,\n"
-                                    + "            `c9` decimal(12,4),\n"
-                                    + "            `c10` date,\n"
-                                    + "            `c11` datetime,\n"
-                                    + "            `c12` char(1),\n"
-                                    + "            `c13` varchar(256),\n"
-                                    + "            `c14` Array<String>,\n"
-                                    + "            `c15` Map<String, String>,\n"
-                                    + "            `c16` Struct<name: String, age: int>,\n"
-                                    + "            `c17` JSON\n"
-                                    + "        )\n"
-                                    + "DUPLICATE KEY(`id`)\n"
-                                    + "DISTRIBUTED BY HASH(`id`) BUCKETS 1\n"
-                                    + "PROPERTIES (\n"
-                                    + "\"replication_num\" = \"1\",\n"
-                                    + "\"light_schema_change\" = \"true\"\n"
-                                    + ");",
-                            DATABASE_SOURCE, table));
-            statement.execute(
-                    String.format(
-                            "CREATE TABLE %s.%s like %s.%s",
-                            DATABASE_SINK, table, DATABASE_SOURCE, table));
-            statement.execute(
-                    String.format(
-                            "INSERT INTO %s.%s \n"
-                                    + "VALUES \n"
-                                    + "(\n"
-                                    + "    1,  \n"
-                                    + "    TRUE, \n"
-                                    + "    127,  \n"
-                                    + "    32767, \n"
-                                    + "    2147483647, \n"
-                                    + "    9223372036854775807, \n"
-                                    + "    123456789012345678901234567890, \n"
-                                    + "    3.14,  \n"
-                                    + "    2.7182818284, \n"
-                                    + "    12345.6789, \n"
-                                    + "    '2023-05-22',  \n"
-                                    + "    '2023-05-22 12:34:56', \n"
-                                    + "    'A', \n"
-                                    + "    'Example text', \n"
-                                    + "    ['item1', 'item2', 'item3'], \n"
-                                    + "    {'key1': 'value1', 'key2': 'value2'}, \n"
-                                    + "    STRUCT('John Doe', 30),  \n"
-                                    + "    '{\"key\": \"value\"}'  \n"
-                                    + "),\n"
-                                    + "(\n"
-                                    + "    2,\n"
-                                    + "    FALSE,\n"
-                                    + "    -128,\n"
-                                    + "    -32768,\n"
-                                    + "    -2147483648,\n"
-                                    + "    -9223372036854775808,\n"
-                                    + "    -123456789012345678901234567890,\n"
-                                    + "    -3.14,\n"
-                                    + "    -2.7182818284,\n"
-                                    + "    -12345.6789,\n"
-                                    + "    '2024-01-01',\n"
-                                    + "    '2024-01-01 00:00:00',\n"
-                                    + "    'B',\n"
-                                    + "    'Another example',\n"
-                                    + "    ['item4', 'item5', 'item6'],\n"
-                                    + "    {'key3': 'value3', 'key4': 'value4'},\n"
-                                    + "    STRUCT('Jane Doe', 25),\n"
-                                    + "    '{\"another_key\": \"another_value\"}'\n"
-                                    + ")",
-                            DATABASE_SOURCE, table));
-        }
-    }
-}
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/MySQLDorisE2ECase.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/MySQLDorisE2ECase.java
deleted file mode 100644
index aeb17c2..0000000
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/MySQLDorisE2ECase.java
+++ /dev/null
@@ -1,779 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.doris.flink.tools.cdc;
-
-import org.apache.flink.api.common.restartstrategy.RestartStrategies;
-import org.apache.flink.api.common.time.Deadline;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.execution.JobClient;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-
-import org.apache.doris.flink.DorisTestBase;
-import org.apache.doris.flink.sink.schema.SchemaChangeMode;
-import org.apache.doris.flink.tools.cdc.mysql.MysqlDatabaseSync;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.testcontainers.containers.MySQLContainer;
-import org.testcontainers.lifecycle.Startables;
-
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.sql.SQLException;
-import java.sql.Statement;
-import java.time.Duration;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.UUID;
-import java.util.stream.Stream;
-
-import static org.apache.flink.api.common.JobStatus.RUNNING;
-
-/**
- * MySQLDorisE2ECase 1. Automatically create tables 2. Schema change event synchronization
- * 3.Synchronization of addition, deletion and modification events 4. CDC multi-table writing.
- */
-public class MySQLDorisE2ECase extends DorisTestBase {
-    protected static final Logger LOG = LoggerFactory.getLogger(MySQLDorisE2ECase.class);
-    private static final String DATABASE = "test_e2e_mysql";
-    private static final String MYSQL_USER = "root";
-    private static final String MYSQL_PASSWD = "123456";
-    private static final String TABLE_1 = "tbl1";
-    private static final String TABLE_2 = "tbl2";
-    private static final String TABLE_3 = "tbl3";
-    private static final String TABLE_4 = "tbl4";
-    private static final String TABLE_5 = "tbl5";
-    private static final String TABLE_SQL_PARSE = "tbl_sql_parse";
-
-    private static final MySQLContainer MYSQL_CONTAINER =
-            new MySQLContainer("mysql:8.0")
-                    .withDatabaseName(DATABASE)
-                    .withUsername(MYSQL_USER)
-                    .withPassword(MYSQL_PASSWD);
-
-    @BeforeClass
-    public static void startMySQLContainers() {
-        MYSQL_CONTAINER.setCommand("--default-time-zone=Asia/Shanghai");
-        LOG.info("Starting MySQL containers...");
-        Startables.deepStart(Stream.of(MYSQL_CONTAINER)).join();
-        LOG.info("MySQL Containers are started.");
-    }
-
-    @AfterClass
-    public static void stopMySQLContainers() {
-        LOG.info("Stopping MySQL containers...");
-        MYSQL_CONTAINER.stop();
-        LOG.info("MySQL Containers are stopped.");
-    }
-
-    @Test
-    public void testMySQL2Doris() throws Exception {
-        printClusterStatus();
-        initializeMySQLTable();
-        initializeDorisTable();
-        JobClient jobClient = submitJob();
-        // wait 2 times checkpoint
-        Thread.sleep(20000);
-        List<String> expected = Arrays.asList("doris_1,1", "doris_2,2", "doris_3,3", "doris_5,5");
-        String sql =
-                "select * from ( select * from %s.%s union all select * from %s.%s union all select * from %s.%s union all select * from %s.%s) res order by 1";
-        String query1 =
-                String.format(
-                        sql, DATABASE, TABLE_1, DATABASE, TABLE_2, DATABASE, TABLE_3, DATABASE,
-                        TABLE_5);
-        checkResult(expected, query1, 2);
-
-        // add incremental data
-        try (Connection connection =
-                        DriverManager.getConnection(
-                                MYSQL_CONTAINER.getJdbcUrl(), MYSQL_USER, MYSQL_PASSWD);
-                Statement statement = connection.createStatement()) {
-            statement.execute(
-                    String.format("insert into %s.%s  values ('doris_1_1',10)", DATABASE, TABLE_1));
-            statement.execute(
-                    String.format("insert into %s.%s  values ('doris_2_1',11)", DATABASE, TABLE_2));
-            statement.execute(
-                    String.format("insert into %s.%s  values ('doris_3_1',12)", DATABASE, TABLE_3));
-
-            statement.execute(
-                    String.format(
-                            "update %s.%s set age=18 where name='doris_1'", DATABASE, TABLE_1));
-            statement.execute(
-                    String.format("delete from %s.%s where name='doris_2'", DATABASE, TABLE_2));
-        }
-
-        Thread.sleep(20000);
-        List<String> expected2 =
-                Arrays.asList(
-                        "doris_1,18", "doris_1_1,10", "doris_2_1,11", "doris_3,3", "doris_3_1,12");
-        sql =
-                "select * from ( select * from %s.%s union all select * from %s.%s union all select * from %s.%s ) res order by 1";
-        String query2 = String.format(sql, DATABASE, TABLE_1, DATABASE, TABLE_2, DATABASE, TABLE_3);
-        checkResult(expected2, query2, 2);
-
-        // mock schema change
-        try (Connection connection =
-                        DriverManager.getConnection(
-                                MYSQL_CONTAINER.getJdbcUrl(), MYSQL_USER, MYSQL_PASSWD);
-                Statement statement = connection.createStatement()) {
-            statement.execute(
-                    String.format(
-                            "alter table %s.%s add column c1 varchar(128)", DATABASE, TABLE_1));
-            statement.execute(
-                    String.format("alter table %s.%s drop column age", DATABASE, TABLE_1));
-            Thread.sleep(20000);
-            statement.execute(
-                    String.format(
-                            "insert into %s.%s  values ('doris_1_1_1','c1_val')",
-                            DATABASE, TABLE_1));
-        }
-        Thread.sleep(20000);
-        List<String> expected3 =
-                Arrays.asList("doris_1,null", "doris_1_1,null", "doris_1_1_1,c1_val");
-        sql = "select * from %s.%s order by 1";
-        String query3 = String.format(sql, DATABASE, TABLE_1);
-        checkResult(expected3, query3, 2);
-        jobClient.cancel().get();
-    }
-
-    @Test
-    public void testAutoAddTable() throws Exception {
-        printClusterStatus();
-        initializeMySQLTable();
-        initializeDorisTable();
-        JobClient jobClient = submitJob();
-        // wait 2 times checkpoint
-        Thread.sleep(20000);
-        List<String> expected = Arrays.asList("doris_1,1", "doris_2,2", "doris_3,3");
-        String sql =
-                "select * from ( select * from %s.%s union all select * from %s.%s union all select * from %s.%s ) res order by 1";
-        String query1 = String.format(sql, DATABASE, TABLE_1, DATABASE, TABLE_2, DATABASE, TABLE_3);
-        checkResult(expected, query1, 2);
-
-        // auto create table4
-        addTableTable_4();
-        Thread.sleep(20000);
-        List<String> expected2 = Arrays.asList("doris_4_1,4", "doris_4_2,4");
-        sql = "select * from %s.%s order by 1";
-        String query2 = String.format(sql, DATABASE, TABLE_4);
-        checkResult(expected2, query2, 2);
-
-        // add incremental data
-        try (Connection connection =
-                        DriverManager.getConnection(
-                                MYSQL_CONTAINER.getJdbcUrl(), MYSQL_USER, MYSQL_PASSWD);
-                Statement statement = connection.createStatement()) {
-            statement.execute(
-                    String.format("insert into %s.%s  values ('doris_1_1',10)", DATABASE, TABLE_1));
-            statement.execute(
-                    String.format("insert into %s.%s  values ('doris_2_1',11)", DATABASE, TABLE_2));
-            statement.execute(
-                    String.format("insert into %s.%s  values ('doris_3_1',12)", DATABASE, TABLE_3));
-            statement.execute(
-                    String.format("insert into %s.%s  values ('doris_4_3',43)", DATABASE, TABLE_4));
-
-            statement.execute(
-                    String.format(
-                            "update %s.%s set age=18 where name='doris_1'", DATABASE, TABLE_1));
-            statement.execute(
-                    String.format("delete from %s.%s where name='doris_2'", DATABASE, TABLE_2));
-            statement.execute(
-                    String.format("delete from %s.%s where name='doris_4_2'", DATABASE, TABLE_4));
-            statement.execute(
-                    String.format(
-                            "update %s.%s set age=41 where name='doris_4_1'", DATABASE, TABLE_4));
-        }
-
-        Thread.sleep(20000);
-        List<String> expected3 =
-                Arrays.asList(
-                        "doris_1,18",
-                        "doris_1_1,10",
-                        "doris_2_1,11",
-                        "doris_3,3",
-                        "doris_3_1,12",
-                        "doris_4_1,41",
-                        "doris_4_3,43");
-        sql =
-                "select * from ( select * from %s.%s union all select * from %s.%s union all select * from %s.%s union all select * from %s.%s ) res order by 1";
-        String query3 =
-                String.format(
-                        sql, DATABASE, TABLE_1, DATABASE, TABLE_2, DATABASE, TABLE_3, DATABASE,
-                        TABLE_4);
-        checkResult(expected3, query3, 2);
-
-        // mock schema change
-        try (Connection connection =
-                        DriverManager.getConnection(
-                                MYSQL_CONTAINER.getJdbcUrl(), MYSQL_USER, MYSQL_PASSWD);
-                Statement statement = connection.createStatement()) {
-            statement.execute(
-                    String.format(
-                            "alter table %s.%s add column c1 varchar(128)", DATABASE, TABLE_4));
-            statement.execute(
-                    String.format("alter table %s.%s drop column age", DATABASE, TABLE_4));
-            Thread.sleep(20000);
-            statement.execute(
-                    String.format(
-                            "insert into %s.%s  values ('doris_4_4','c1_val')", DATABASE, TABLE_4));
-        }
-        Thread.sleep(20000);
-        List<String> expected4 =
-                Arrays.asList("doris_4_1,null", "doris_4_3,null", "doris_4_4,c1_val");
-        sql = "select * from %s.%s order by 1";
-        String query4 = String.format(sql, DATABASE, TABLE_4);
-        checkResult(expected4, query4, 2);
-        jobClient.cancel().get();
-    }
-
-    @Test
-    public void testMySQL2DorisSQLParse() throws Exception {
-        printClusterStatus();
-        initializeMySQLTable();
-        initializeDorisTable();
-        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-        env.setRestartStrategy(RestartStrategies.noRestart());
-        Map<String, String> flinkMap = new HashMap<>();
-        flinkMap.put("execution.checkpointing.interval", "10s");
-        flinkMap.put("pipeline.operator-chaining", "false");
-        flinkMap.put("parallelism.default", "1");
-
-        Configuration configuration = Configuration.fromMap(flinkMap);
-        env.configure(configuration);
-
-        String database = DATABASE;
-        Map<String, String> mysqlConfig = new HashMap<>();
-        mysqlConfig.put("database-name", DATABASE);
-        mysqlConfig.put("hostname", MYSQL_CONTAINER.getHost());
-        mysqlConfig.put("port", MYSQL_CONTAINER.getMappedPort(3306) + "");
-        mysqlConfig.put("username", MYSQL_USER);
-        mysqlConfig.put("password", MYSQL_PASSWD);
-        mysqlConfig.put("server-time-zone", "Asia/Shanghai");
-        Configuration config = Configuration.fromMap(mysqlConfig);
-
-        Map<String, String> sinkConfig = new HashMap<>();
-        sinkConfig.put("fenodes", getFenodes());
-        sinkConfig.put("username", USERNAME);
-        sinkConfig.put("password", PASSWORD);
-        sinkConfig.put("jdbc-url", String.format(DorisTestBase.URL, DORIS_CONTAINER.getHost()));
-        sinkConfig.put("sink.label-prefix", UUID.randomUUID().toString());
-        sinkConfig.put("sink.check-interval", "5000");
-        Configuration sinkConf = Configuration.fromMap(sinkConfig);
-
-        Map<String, String> tableConfig = new HashMap<>();
-        tableConfig.put("replication_num", "1");
-
-        String includingTables = "tbl.*";
-        String excludingTables = "";
-        DatabaseSync databaseSync = new MysqlDatabaseSync();
-        databaseSync
-                .setEnv(env)
-                .setDatabase(database)
-                .setConfig(config)
-                .setIncludingTables(includingTables)
-                .setExcludingTables(excludingTables)
-                .setIgnoreDefaultValue(false)
-                .setSinkConfig(sinkConf)
-                .setTableConfig(tableConfig)
-                .setCreateTableOnly(false)
-                .setNewSchemaChange(true)
-                .setSchemaChangeMode(SchemaChangeMode.SQL_PARSER.getName())
-                // no single sink
-                .setSingleSink(true)
-                .create();
-        databaseSync.build();
-        JobClient jobClient = env.executeAsync();
-        waitForJobStatus(
-                jobClient,
-                Collections.singletonList(RUNNING),
-                Deadline.fromNow(Duration.ofSeconds(10)));
-
-        // wait 2 times checkpoint
-        Thread.sleep(20000);
-        List<String> expected = Arrays.asList("doris_1,1", "doris_2,2", "doris_3,3", "doris_5,5");
-        String sql =
-                "select * from ( select * from %s.%s union all select * from %s.%s union all select * from %s.%s union all select * from %s.%s) res order by 1";
-        String query1 =
-                String.format(
-                        sql, DATABASE, TABLE_1, DATABASE, TABLE_2, DATABASE, TABLE_3, DATABASE,
-                        TABLE_5);
-        checkResult(expected, query1, 2);
-
-        // add incremental data
-        try (Connection connection =
-                        DriverManager.getConnection(
-                                MYSQL_CONTAINER.getJdbcUrl(), MYSQL_USER, MYSQL_PASSWD);
-                Statement statement = connection.createStatement()) {
-            statement.execute(
-                    String.format("insert into %s.%s  values ('doris_1_1',10)", DATABASE, TABLE_1));
-            statement.execute(
-                    String.format("insert into %s.%s  values ('doris_2_1',11)", DATABASE, TABLE_2));
-            statement.execute(
-                    String.format("insert into %s.%s  values ('doris_3_1',12)", DATABASE, TABLE_3));
-
-            statement.execute(
-                    String.format(
-                            "update %s.%s set age=18 where name='doris_1'", DATABASE, TABLE_1));
-            statement.execute(
-                    String.format("delete from %s.%s where name='doris_2'", DATABASE, TABLE_2));
-        }
-
-        Thread.sleep(20000);
-        List<String> expected2 =
-                Arrays.asList(
-                        "doris_1,18", "doris_1_1,10", "doris_2_1,11", "doris_3,3", "doris_3_1,12");
-        sql =
-                "select * from ( select * from %s.%s union all select * from %s.%s union all select * from %s.%s ) res order by 1";
-        String query2 = String.format(sql, DATABASE, TABLE_1, DATABASE, TABLE_2, DATABASE, TABLE_3);
-        checkResult(expected2, query2, 2);
-
-        // mock schema change
-        try (Connection connection =
-                        DriverManager.getConnection(
-                                MYSQL_CONTAINER.getJdbcUrl(), MYSQL_USER, MYSQL_PASSWD);
-                Statement statement = connection.createStatement()) {
-            statement.execute(
-                    String.format(
-                            "alter table %s.%s add column c1 varchar(128)", DATABASE, TABLE_1));
-            statement.execute(
-                    String.format("alter table %s.%s drop column age", DATABASE, TABLE_1));
-            Thread.sleep(20000);
-            statement.execute(
-                    String.format(
-                            "insert into %s.%s  values ('doris_1_1_1','c1_val')",
-                            DATABASE, TABLE_1));
-        }
-        Thread.sleep(20000);
-        List<String> expected3 =
-                Arrays.asList("doris_1,null", "doris_1_1,null", "doris_1_1_1,c1_val");
-        sql = "select * from %s.%s order by 1";
-        String query3 = String.format(sql, DATABASE, TABLE_1);
-        checkResult(expected3, query3, 2);
-
-        // mock create table
-        try (Connection connection =
-                        DriverManager.getConnection(
-                                MYSQL_CONTAINER.getJdbcUrl(), MYSQL_USER, MYSQL_PASSWD);
-                Statement statement = connection.createStatement()) {
-            statement.execute(
-                    String.format(
-                            "CREATE TABLE %s.%s ( \n"
-                                    + "`name` varchar(256) primary key,\n"
-                                    + "`age` int\n"
-                                    + ")",
-                            DATABASE, TABLE_SQL_PARSE));
-            statement.execute(
-                    String.format(
-                            "insert into %s.%s  values ('doris_1',1)", DATABASE, TABLE_SQL_PARSE));
-            statement.execute(
-                    String.format(
-                            "insert into %s.%s  values ('doris_2',2)", DATABASE, TABLE_SQL_PARSE));
-            statement.execute(
-                    String.format(
-                            "insert into %s.%s  values ('doris_3',3)", DATABASE, TABLE_SQL_PARSE));
-        }
-        Thread.sleep(20000);
-        List<String> expected4 = Arrays.asList("doris_1,1", "doris_2,2", "doris_3,3");
-        sql = "select * from %s.%s order by 1";
-        String query4 = String.format(sql, DATABASE, TABLE_SQL_PARSE);
-        checkResult(expected4, query4, 2);
-
-        jobClient.cancel().get();
-    }
-
-    @Test
-    public void testMySQL2DorisByDefault() throws Exception {
-        printClusterStatus();
-        initializeMySQLTable();
-        initializeDorisTable();
-        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-        env.setRestartStrategy(RestartStrategies.noRestart());
-        Map<String, String> flinkMap = new HashMap<>();
-        flinkMap.put("execution.checkpointing.interval", "10s");
-        flinkMap.put("pipeline.operator-chaining", "false");
-        flinkMap.put("parallelism.default", "1");
-
-        Configuration configuration = Configuration.fromMap(flinkMap);
-        env.configure(configuration);
-
-        String database = DATABASE;
-        Map<String, String> mysqlConfig = new HashMap<>();
-        mysqlConfig.put("database-name", DATABASE);
-        mysqlConfig.put("hostname", MYSQL_CONTAINER.getHost());
-        mysqlConfig.put("port", MYSQL_CONTAINER.getMappedPort(3306) + "");
-        mysqlConfig.put("username", MYSQL_USER);
-        mysqlConfig.put("password", MYSQL_PASSWD);
-        mysqlConfig.put("server-time-zone", "Asia/Shanghai");
-        Configuration config = Configuration.fromMap(mysqlConfig);
-
-        Map<String, String> sinkConfig = new HashMap<>();
-        sinkConfig.put("fenodes", getFenodes());
-        sinkConfig.put("username", USERNAME);
-        sinkConfig.put("password", PASSWORD);
-        sinkConfig.put("jdbc-url", String.format(DorisTestBase.URL, DORIS_CONTAINER.getHost()));
-        sinkConfig.put("sink.label-prefix", UUID.randomUUID().toString());
-        sinkConfig.put("sink.check-interval", "5000");
-        Configuration sinkConf = Configuration.fromMap(sinkConfig);
-
-        Map<String, String> tableConfig = new HashMap<>();
-        tableConfig.put("replication_num", "1");
-
-        String includingTables = "tbl1|tbl2|tbl3|tbl5";
-        String excludingTables = "";
-        DatabaseSync databaseSync = new MysqlDatabaseSync();
-        databaseSync
-                .setEnv(env)
-                .setDatabase(database)
-                .setConfig(config)
-                .setIncludingTables(includingTables)
-                .setExcludingTables(excludingTables)
-                .setIgnoreDefaultValue(false)
-                .setSinkConfig(sinkConf)
-                .setTableConfig(tableConfig)
-                .setCreateTableOnly(false)
-                .setNewSchemaChange(true)
-                // no single sink
-                .setSingleSink(false)
-                .create();
-        databaseSync.build();
-        JobClient jobClient = env.executeAsync();
-        waitForJobStatus(
-                jobClient,
-                Collections.singletonList(RUNNING),
-                Deadline.fromNow(Duration.ofSeconds(10)));
-
-        // wait 2 times checkpoint
-        Thread.sleep(20000);
-        List<String> expected = Arrays.asList("doris_1,1", "doris_2,2", "doris_3,3", "doris_5,5");
-        String sql =
-                "select * from ( select * from %s.%s union all select * from %s.%s union all select * from %s.%s union all select * from %s.%s) res order by 1";
-        String query1 =
-                String.format(
-                        sql, DATABASE, TABLE_1, DATABASE, TABLE_2, DATABASE, TABLE_3, DATABASE,
-                        TABLE_5);
-        checkResult(expected, query1, 2);
-
-        // add incremental data
-        try (Connection connection =
-                        DriverManager.getConnection(
-                                MYSQL_CONTAINER.getJdbcUrl(), MYSQL_USER, MYSQL_PASSWD);
-                Statement statement = connection.createStatement()) {
-            statement.execute(
-                    String.format("insert into %s.%s  values ('doris_1_1',10)", DATABASE, TABLE_1));
-            statement.execute(
-                    String.format("insert into %s.%s  values ('doris_2_1',11)", DATABASE, TABLE_2));
-            statement.execute(
-                    String.format("insert into %s.%s  values ('doris_3_1',12)", DATABASE, TABLE_3));
-
-            statement.execute(
-                    String.format(
-                            "update %s.%s set age=18 where name='doris_1'", DATABASE, TABLE_1));
-            statement.execute(
-                    String.format("delete from %s.%s where name='doris_2'", DATABASE, TABLE_2));
-        }
-
-        Thread.sleep(20000);
-        List<String> expected2 =
-                Arrays.asList(
-                        "doris_1,18", "doris_1_1,10", "doris_2_1,11", "doris_3,3", "doris_3_1,12");
-        sql =
-                "select * from ( select * from %s.%s union all select * from %s.%s union all select * from %s.%s ) res order by 1";
-        String query2 = String.format(sql, DATABASE, TABLE_1, DATABASE, TABLE_2, DATABASE, TABLE_3);
-        checkResult(expected2, query2, 2);
-        jobClient.cancel().get();
-    }
-
-    @Test
-    public void testMySQL2DorisEnableDelete() throws Exception {
-        printClusterStatus();
-        initializeMySQLTable();
-        initializeDorisTable();
-        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-        env.setRestartStrategy(RestartStrategies.noRestart());
-        Map<String, String> flinkMap = new HashMap<>();
-        flinkMap.put("execution.checkpointing.interval", "10s");
-        flinkMap.put("pipeline.operator-chaining", "false");
-        flinkMap.put("parallelism.default", "1");
-
-        Configuration configuration = Configuration.fromMap(flinkMap);
-        env.configure(configuration);
-
-        String database = DATABASE;
-        Map<String, String> mysqlConfig = new HashMap<>();
-        mysqlConfig.put("database-name", DATABASE);
-        mysqlConfig.put("hostname", MYSQL_CONTAINER.getHost());
-        mysqlConfig.put("port", MYSQL_CONTAINER.getMappedPort(3306) + "");
-        mysqlConfig.put("username", MYSQL_USER);
-        mysqlConfig.put("password", MYSQL_PASSWD);
-        mysqlConfig.put("server-time-zone", "Asia/Shanghai");
-        Configuration config = Configuration.fromMap(mysqlConfig);
-
-        Map<String, String> sinkConfig = new HashMap<>();
-        sinkConfig.put("fenodes", getFenodes());
-        sinkConfig.put("username", USERNAME);
-        sinkConfig.put("password", PASSWORD);
-        sinkConfig.put("jdbc-url", String.format(DorisTestBase.URL, DORIS_CONTAINER.getHost()));
-        sinkConfig.put("sink.label-prefix", UUID.randomUUID().toString());
-        sinkConfig.put("sink.check-interval", "5000");
-        sinkConfig.put("sink.enable-delete", "false");
-        Configuration sinkConf = Configuration.fromMap(sinkConfig);
-
-        Map<String, String> tableConfig = new HashMap<>();
-        tableConfig.put("replication_num", "1");
-
-        String includingTables = "tbl1|tbl2|tbl3|tbl5";
-        String excludingTables = "";
-        DatabaseSync databaseSync = new MysqlDatabaseSync();
-        databaseSync
-                .setEnv(env)
-                .setDatabase(database)
-                .setConfig(config)
-                .setIncludingTables(includingTables)
-                .setExcludingTables(excludingTables)
-                .setIgnoreDefaultValue(false)
-                .setSinkConfig(sinkConf)
-                .setTableConfig(tableConfig)
-                .setCreateTableOnly(false)
-                .setNewSchemaChange(true)
-                // no single sink
-                .setSingleSink(false)
-                .create();
-        databaseSync.build();
-        JobClient jobClient = env.executeAsync();
-        waitForJobStatus(
-                jobClient,
-                Collections.singletonList(RUNNING),
-                Deadline.fromNow(Duration.ofSeconds(10)));
-
-        // wait 2 times checkpoint
-        Thread.sleep(20000);
-        List<String> expected = Arrays.asList("doris_1,1", "doris_2,2", "doris_3,3", "doris_5,5");
-        String sql =
-                "select * from ( select * from %s.%s union all select * from %s.%s union all select * from %s.%s union all select * from %s.%s) res order by 1";
-        String query1 =
-                String.format(
-                        sql, DATABASE, TABLE_1, DATABASE, TABLE_2, DATABASE, TABLE_3, DATABASE,
-                        TABLE_5);
-        checkResult(expected, query1, 2);
-
-        // add incremental data
-        try (Connection connection =
-                        DriverManager.getConnection(
-                                MYSQL_CONTAINER.getJdbcUrl(), MYSQL_USER, MYSQL_PASSWD);
-                Statement statement = connection.createStatement()) {
-            statement.execute(
-                    String.format("insert into %s.%s  values ('doris_1_1',10)", DATABASE, TABLE_1));
-            statement.execute(
-                    String.format("insert into %s.%s  values ('doris_2_1',11)", DATABASE, TABLE_2));
-            statement.execute(
-                    String.format("insert into %s.%s  values ('doris_3_1',12)", DATABASE, TABLE_3));
-
-            statement.execute(
-                    String.format(
-                            "update %s.%s set age=18 where name='doris_1'", DATABASE, TABLE_1));
-            statement.execute(
-                    String.format("delete from %s.%s where name='doris_2'", DATABASE, TABLE_2));
-            statement.execute(
-                    String.format("delete from %s.%s where name='doris_3'", DATABASE, TABLE_3));
-            statement.execute(
-                    String.format("delete from %s.%s where name='doris_5'", DATABASE, TABLE_5));
-        }
-
-        Thread.sleep(20000);
-        List<String> expected2 =
-                Arrays.asList(
-                        "doris_1,18",
-                        "doris_1_1,10",
-                        "doris_2,2",
-                        "doris_2_1,11",
-                        "doris_3,3",
-                        "doris_3_1,12",
-                        "doris_5,5");
-        sql =
-                "select * from ( select * from %s.%s union all select * from %s.%s union all select * from %s.%s union all select * from %s.%s) res order by 1";
-        String query2 =
-                String.format(
-                        sql, DATABASE, TABLE_1, DATABASE, TABLE_2, DATABASE, TABLE_3, DATABASE,
-                        TABLE_5);
-        checkResult(expected2, query2, 2);
-        jobClient.cancel().get();
-    }
-
-    private void initializeDorisTable() throws Exception {
-        try (Connection connection =
-                        DriverManager.getConnection(
-                                String.format(URL, DORIS_CONTAINER.getHost()), USERNAME, PASSWORD);
-                Statement statement = connection.createStatement()) {
-            statement.execute(String.format("CREATE DATABASE IF NOT EXISTS %s", DATABASE));
-            statement.execute(String.format("DROP TABLE IF EXISTS %s.%s", DATABASE, TABLE_1));
-            statement.execute(String.format("DROP TABLE IF EXISTS %s.%s", DATABASE, TABLE_2));
-            statement.execute(String.format("DROP TABLE IF EXISTS %s.%s", DATABASE, TABLE_3));
-            statement.execute(String.format("DROP TABLE IF EXISTS %s.%s", DATABASE, TABLE_4));
-            statement.execute(String.format("DROP TABLE IF EXISTS %s.%s", DATABASE, TABLE_5));
-            // create a table in Doris
-            statement.execute(
-                    String.format(
-                            "CREATE TABLE %s.%s ( \n"
-                                    + "`name` varchar(256),\n"
-                                    + "`age` int\n"
-                                    + ")\n"
-                                    + "UNIQUE KEY(`name`)\n"
-                                    + "DISTRIBUTED BY HASH(`name`) BUCKETS 1\n"
-                                    + "PROPERTIES ( \n"
-                                    + "\"replication_num\" = \"1\" \n"
-                                    + ");\n",
-                            DATABASE, TABLE_5));
-        }
-    }
-
-    public JobClient submitJob() throws Exception {
-        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-        env.setRestartStrategy(RestartStrategies.noRestart());
-        Map<String, String> flinkMap = new HashMap<>();
-        flinkMap.put("execution.checkpointing.interval", "10s");
-        flinkMap.put("pipeline.operator-chaining", "false");
-        flinkMap.put("parallelism.default", "1");
-
-        Configuration configuration = Configuration.fromMap(flinkMap);
-        env.configure(configuration);
-
-        String database = DATABASE;
-        Map<String, String> mysqlConfig = new HashMap<>();
-        mysqlConfig.put("database-name", DATABASE);
-        mysqlConfig.put("hostname", MYSQL_CONTAINER.getHost());
-        mysqlConfig.put("port", MYSQL_CONTAINER.getMappedPort(3306) + "");
-        mysqlConfig.put("username", MYSQL_USER);
-        mysqlConfig.put("password", MYSQL_PASSWD);
-        mysqlConfig.put("server-time-zone", "Asia/Shanghai");
-        Configuration config = Configuration.fromMap(mysqlConfig);
-
-        Map<String, String> sinkConfig = new HashMap<>();
-        sinkConfig.put("fenodes", getFenodes());
-        sinkConfig.put("username", USERNAME);
-        sinkConfig.put("password", PASSWORD);
-        sinkConfig.put("jdbc-url", String.format(DorisTestBase.URL, DORIS_CONTAINER.getHost()));
-        sinkConfig.put("sink.label-prefix", UUID.randomUUID().toString());
-        Configuration sinkConf = Configuration.fromMap(sinkConfig);
-
-        Map<String, String> tableConfig = new HashMap<>();
-        tableConfig.put("replication_num", "1");
-
-        String includingTables = "tbl.*";
-        String excludingTables = "";
-        DatabaseSync databaseSync = new MysqlDatabaseSync();
-        databaseSync
-                .setEnv(env)
-                .setDatabase(database)
-                .setConfig(config)
-                .setIncludingTables(includingTables)
-                .setExcludingTables(excludingTables)
-                .setIgnoreDefaultValue(false)
-                .setSinkConfig(sinkConf)
-                .setTableConfig(tableConfig)
-                .setCreateTableOnly(false)
-                .setNewSchemaChange(true)
-                .setSingleSink(true)
-                .setIgnoreDefaultValue(true)
-                .create();
-        databaseSync.build();
-        JobClient jobClient = env.executeAsync();
-        waitForJobStatus(
-                jobClient,
-                Collections.singletonList(RUNNING),
-                Deadline.fromNow(Duration.ofSeconds(10)));
-        return jobClient;
-    }
-
-    private void addTableTable_4() throws SQLException {
-        try (Connection connection =
-                        DriverManager.getConnection(
-                                MYSQL_CONTAINER.getJdbcUrl(), MYSQL_USER, MYSQL_PASSWD);
-                Statement statement = connection.createStatement()) {
-            statement.execute(String.format("DROP TABLE IF EXISTS %s.%s", DATABASE, TABLE_4));
-            statement.execute(
-                    String.format(
-                            "CREATE TABLE %s.%s ( \n"
-                                    + "`name` varchar(256) primary key,\n"
-                                    + "`age` int\n"
-                                    + ")",
-                            DATABASE, TABLE_4));
-
-            // mock stock data
-            statement.execute(
-                    String.format("insert into %s.%s  values ('doris_4_1',4)", DATABASE, TABLE_4));
-            statement.execute(
-                    String.format("insert into %s.%s  values ('doris_4_2',4)", DATABASE, TABLE_4));
-        }
-    }
-
-    public void initializeMySQLTable() throws Exception {
-        try (Connection connection =
-                        DriverManager.getConnection(
-                                MYSQL_CONTAINER.getJdbcUrl(), MYSQL_USER, MYSQL_PASSWD);
-                Statement statement = connection.createStatement()) {
-            statement.execute(String.format("CREATE DATABASE IF NOT EXISTS %s", DATABASE));
-            statement.execute(String.format("DROP TABLE IF EXISTS %s.%s", DATABASE, TABLE_1));
-            statement.execute(String.format("DROP TABLE IF EXISTS %s.%s", DATABASE, TABLE_2));
-            statement.execute(String.format("DROP TABLE IF EXISTS %s.%s", DATABASE, TABLE_3));
-            statement.execute(String.format("DROP TABLE IF EXISTS %s.%s", DATABASE, TABLE_4));
-            statement.execute(String.format("DROP TABLE IF EXISTS %s.%s", DATABASE, TABLE_5));
-            statement.execute(
-                    String.format(
-                            "CREATE TABLE %s.%s ( \n"
-                                    + "`name` varchar(256) primary key,\n"
-                                    + "`age` int\n"
-                                    + ")",
-                            DATABASE, TABLE_1));
-            statement.execute(
-                    String.format(
-                            "CREATE TABLE %s.%s ( \n"
-                                    + "`name` varchar(256) primary key,\n"
-                                    + "`age` int\n"
-                                    + ")",
-                            DATABASE, TABLE_2));
-            statement.execute(
-                    String.format(
-                            "CREATE TABLE %s.%s ( \n"
-                                    + "`name` varchar(256) primary key,\n"
-                                    + "`age` int\n"
-                                    + ")",
-                            DATABASE, TABLE_3));
-            statement.execute(
-                    String.format(
-                            "CREATE TABLE %s.%s ( \n"
-                                    + "`name` varchar(256) primary key,\n"
-                                    + "`age` int\n"
-                                    + ")",
-                            DATABASE, TABLE_5));
-            // mock stock data
-            statement.execute(
-                    String.format("insert into %s.%s  values ('doris_1',1)", DATABASE, TABLE_1));
-            statement.execute(
-                    String.format("insert into %s.%s  values ('doris_2',2)", DATABASE, TABLE_2));
-            statement.execute(
-                    String.format("insert into %s.%s  values ('doris_3',3)", DATABASE, TABLE_3));
-            statement.execute(
-                    String.format("insert into %s.%s  values ('doris_5',5)", DATABASE, TABLE_5));
-        }
-    }
-}
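The per-test JDBC bootstrap above (initializeMySQLTable, initializeDorisTable) moves into the *_init.sql resources added below under container/e2e/. A minimal sketch of how such a script can be replayed over JDBC, assuming statements are ';'-separated; the class name and connection values here are illustrative, not the connector's actual test utilities:

```java
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;
import java.util.Objects;

// Illustrative helper: replays a classpath SQL script statement by statement.
public final class InitSqlRunner {

    public static void run(String jdbcUrl, String user, String password, String resource)
            throws Exception {
        try (InputStream in =
                        InitSqlRunner.class.getClassLoader().getResourceAsStream(resource);
                Connection conn = DriverManager.getConnection(jdbcUrl, user, password);
                Statement stmt = conn.createStatement()) {
            Objects.requireNonNull(in, "resource not found: " + resource);
            String script = new String(in.readAllBytes(), StandardCharsets.UTF_8);
            // The init scripts separate statements with ';' and contain no procedure
            // bodies, so a plain split is sufficient here.
            for (String sql : script.split(";")) {
                if (!sql.trim().isEmpty()) {
                    stmt.execute(sql.trim());
                }
            }
        }
    }

    public static void main(String[] args) throws Exception {
        // Placeholder connection values; the tests take these from the Testcontainers instances.
        run("jdbc:mysql://localhost:3306", "root", "123456",
                "container/e2e/mysql2doris/testMySQL2Doris_init.sql");
    }
}
```

Keeping the DDL and seed data in resource files lets every case share one loader instead of duplicating the statement.execute(...) calls that the removed test class carried inline.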
diff --git a/flink-doris-connector/src/test/resources/container/e2e/doris2doris/test_doris2doris_sink_test_tbl.sql b/flink-doris-connector/src/test/resources/container/e2e/doris2doris/test_doris2doris_sink_test_tbl.sql
new file mode 100644
index 0000000..20ffb52
--- /dev/null
+++ b/flink-doris-connector/src/test/resources/container/e2e/doris2doris/test_doris2doris_sink_test_tbl.sql
@@ -0,0 +1,31 @@
+CREATE DATABASE IF NOT EXISTS test_doris2doris_sink;
+
+DROP TABLE IF EXISTS test_doris2doris_sink.test_tbl;
+
+CREATE TABLE test_doris2doris_sink.test_tbl (
+      `id` int,
+      `c1` boolean,
+      `c2` tinyint,
+      `c3` smallint,
+      `c4` int,
+      `c5` bigint,
+      `c6` largeint,
+      `c7` float,
+      `c8` double,
+      `c9` decimal(12,4),
+      `c10` date,
+      `c11` datetime,
+      `c12` char(1),
+      `c13` varchar(256),
+      `c14` Array<String>,
+      `c15` Map<String, String>,
+      `c16` Struct<name: String, age: int>,
+      `c17` JSON
+)
+DUPLICATE KEY(`id`)
+DISTRIBUTED BY HASH(`id`) BUCKETS 1
+PROPERTIES (
+"replication_num" = "1",
+"light_schema_change" = "true"
+);
+

diff --git a/flink-doris-connector/src/test/resources/container/e2e/doris2doris/test_doris2doris_source_test_tbl.sql b/flink-doris-connector/src/test/resources/container/e2e/doris2doris/test_doris2doris_source_test_tbl.sql
new file mode 100644
index 0000000..5e57b50
--- /dev/null
+++ b/flink-doris-connector/src/test/resources/container/e2e/doris2doris/test_doris2doris_source_test_tbl.sql
@@ -0,0 +1,73 @@
+CREATE DATABASE IF NOT EXISTS test_doris2doris_source;
+
+DROP TABLE IF EXISTS test_doris2doris_source.test_tbl;
+
+CREATE TABLE test_doris2doris_source.test_tbl (
+     `id` int,
+     `c1` boolean,
+     `c2` tinyint,
+     `c3` smallint,
+     `c4` int,
+     `c5` bigint,
+     `c6` largeint,
+     `c7` float,
+     `c8` double,
+     `c9` decimal(12,4),
+     `c10` date,
+     `c11` datetime,
+     `c12` char(1),
+     `c13` varchar(256),
+     `c14` Array<String>,
+     `c15` Map<String, String>,
+     `c16` Struct<name: String, age: int>,
+     `c17` JSON
+)
+DUPLICATE KEY(`id`)
+DISTRIBUTED BY HASH(`id`) BUCKETS 1
+PROPERTIES (
+"replication_num" = "1",
+"light_schema_change" = "true"
+);
+
+INSERT INTO test_doris2doris_source.test_tbl
+VALUES
+    (
+        1,
+        TRUE,
+        127,
+        32767,
+        2147483647,
+        9223372036854775807,
+        123456789012345678901234567890,
+        3.14,
+        2.7182818284,
+        12345.6789,
+        '2023-05-22',
+        '2023-05-22 12:34:56',
+        'A',
+        'Example text',
+        ['item1', 'item2', 'item3'],
+        {'key1': 'value1', 'key2': 'value2'},
+        STRUCT('John Doe', 30),
+        '{"key": "value"}'
+    ),
+    (
+        2,
+        FALSE,
+        -128,
+        -32768,
+        -2147483648,
+        -9223372036854775808,
+        -123456789012345678901234567890,
+        -3.14,
+        -2.7182818284,
+        -12345.6789,
+        '2024-01-01',
+        '2024-01-01 00:00:00',
+        'B',
+        'Another example',
+        ['item4', 'item5', 'item6'],
+        {'key3': 'value3', 'key4': 'value4'},
+        STRUCT('Jane Doe', 25),
+        '{"another_key": "another_value"}'
+);
\ No newline at end of file
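The source table is seeded with two rows that exercise every mapped column type, including ARRAY, MAP, STRUCT and JSON. A sketch of the kind of assertion a doris2doris case can make once the job has copied the data into test_doris2doris_sink.test_tbl; the FE endpoint and credentials are placeholders for the values exposed by the Doris container:

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public final class Doris2DorisCheck {

    public static void main(String[] args) throws Exception {
        // Placeholder FE endpoint; the tests resolve host and query port from the container.
        String url = "jdbc:mysql://127.0.0.1:9030";
        List<Integer> actual = new ArrayList<>();
        try (Connection conn = DriverManager.getConnection(url, "root", "");
                Statement stmt = conn.createStatement();
                ResultSet rs =
                        stmt.executeQuery(
                                "select id from test_doris2doris_sink.test_tbl order by id")) {
            while (rs.next()) {
                actual.add(rs.getInt(1));
            }
        }
        // Both rows seeded into the source table should arrive in the sink unchanged.
        if (!actual.equals(Arrays.asList(1, 2))) {
            throw new AssertionError("unexpected sink rows: " + actual);
        }
    }
}
```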
diff --git a/flink-doris-connector/src/test/resources/container/e2e/mysql2doris/testAutoAddTable.txt b/flink-doris-connector/src/test/resources/container/e2e/mysql2doris/testAutoAddTable.txt
new file mode 100644
index 0000000..88ec454
--- /dev/null
+++ b/flink-doris-connector/src/test/resources/container/e2e/mysql2doris/testAutoAddTable.txt
@@ -0,0 +1,5 @@
+mysql-sync-database
+    --including-tables "tbl.*|auto_add"
+    --table-conf replication_num=1
+    --single-sink true
+    --ignore-default-value true
\ No newline at end of file
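Each .txt resource below encodes one CLI invocation: the first token names the sync operation and every following line is an option for it. A minimal sketch of flattening such a file into a String[] of arguments, assuming whitespace-separated tokens with optional double quotes around values (the helper name is illustrative):

```java
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;

// Illustrative helper: flattens an E2E args file into CLI-style arguments.
public final class ArgsFileReader {

    public static String[] read(String path) throws Exception {
        List<String> args = new ArrayList<>();
        for (String line : Files.readAllLines(Paths.get(path))) {
            for (String token : line.trim().split("\\s+")) {
                if (!token.isEmpty()) {
                    // Drop the quotes around values such as "tbl.*|auto_add".
                    args.add(token.replaceAll("^\"|\"$", ""));
                }
            }
        }
        return args.toArray(new String[0]);
    }

    public static void main(String[] args) throws Exception {
        // Prints: mysql-sync-database | --including-tables | tbl.*|auto_add | ...
        System.out.println(
                String.join(
                        " | ",
                        read("src/test/resources/container/e2e/mysql2doris/testAutoAddTable.txt")));
    }
}
```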
diff --git a/flink-doris-connector/src/test/resources/container/e2e/mysql2doris/testAutoAddTable_init.sql b/flink-doris-connector/src/test/resources/container/e2e/mysql2doris/testAutoAddTable_init.sql
new file mode 100644
index 0000000..ec617f3
--- /dev/null
+++ b/flink-doris-connector/src/test/resources/container/e2e/mysql2doris/testAutoAddTable_init.sql
@@ -0,0 +1,38 @@
+CREATE DATABASE if NOT EXISTS test_e2e_mysql;
+DROP TABLE IF EXISTS test_e2e_mysql.tbl1;
+CREATE TABLE test_e2e_mysql.tbl1 (
+    `name` varchar(256) primary key,
+    `age` int
+);
+insert into test_e2e_mysql.tbl1 values ('doris_1',1);
+
+
+DROP TABLE IF EXISTS test_e2e_mysql.tbl2;
+CREATE TABLE test_e2e_mysql.tbl2 (
+    `name` varchar(256) primary key,
+    `age` int
+);
+insert into test_e2e_mysql.tbl2 values ('doris_2',2);
+
+
+DROP TABLE IF EXISTS test_e2e_mysql.tbl3;
+CREATE TABLE test_e2e_mysql.tbl3 (
+    `name` varchar(256) primary key,
+    `age` int
+);
+insert into test_e2e_mysql.tbl3 values ('doris_3',3);
+
+
+DROP TABLE IF EXISTS  test_e2e_mysql.tbl4;
+CREATE TABLE test_e2e_mysql.tbl4 (
+    `name` varchar(256) primary key,
+    `age` int
+);
+
+
+DROP TABLE IF EXISTS  test_e2e_mysql.tbl5;
+CREATE TABLE test_e2e_mysql.tbl5 (
+    `name` varchar(256) primary key,
+    `age` int
+);
+insert into test_e2e_mysql.tbl5 values ('doris_5',5);
\ No newline at end of file
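testAutoAddTable lists auto_add in --including-tables even though the init script above never creates that table, so the case presumably creates it on MySQL while the sync job is already running, mirroring what the removed addTableTable_4 helper did for tbl4. A hedged sketch of that runtime step (connection values are placeholders):

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;

public final class AutoAddTableStep {

    public static void main(String[] args) throws Exception {
        // Placeholder MySQL endpoint; the tests use the Testcontainers JDBC URL.
        try (Connection conn =
                        DriverManager.getConnection(
                                "jdbc:mysql://localhost:3306", "root", "123456");
                Statement stmt = conn.createStatement()) {
            stmt.execute("DROP TABLE IF EXISTS test_e2e_mysql.auto_add");
            stmt.execute(
                    "CREATE TABLE test_e2e_mysql.auto_add ("
                            + "`name` varchar(256) primary key,"
                            + "`age` int)");
            // Rows are inserted after the table appears, so the job has to pick up both
            // the new table and its data.
            stmt.execute("insert into test_e2e_mysql.auto_add values ('doris_4_1',4)");
        }
    }
}
```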
diff --git a/flink-doris-connector/src/test/resources/container/e2e/mysql2doris/testMySQL2Doris.txt b/flink-doris-connector/src/test/resources/container/e2e/mysql2doris/testMySQL2Doris.txt
new file mode 100644
index 0000000..601d083
--- /dev/null
+++ b/flink-doris-connector/src/test/resources/container/e2e/mysql2doris/testMySQL2Doris.txt
@@ -0,0 +1,5 @@
+mysql-sync-database
+    --including-tables "tbl.*"
+    --table-conf replication_num=1
+    --single-sink true
+    --ignore-default-value false
\ No newline at end of file
diff --git a/flink-doris-connector/src/test/resources/container/e2e/mysql2doris/testMySQL2DorisByDefault.txt b/flink-doris-connector/src/test/resources/container/e2e/mysql2doris/testMySQL2DorisByDefault.txt
new file mode 100644
index 0000000..6f69a75
--- /dev/null
+++ b/flink-doris-connector/src/test/resources/container/e2e/mysql2doris/testMySQL2DorisByDefault.txt
@@ -0,0 +1,3 @@
+mysql-sync-database
+    --including-tables "tbl1|tbl2|tbl3|tbl5"
+    --table-conf replication_num=1
\ No newline at end of file
diff --git a/flink-doris-connector/src/test/resources/container/e2e/mysql2doris/testMySQL2DorisByDefault_init.sql b/flink-doris-connector/src/test/resources/container/e2e/mysql2doris/testMySQL2DorisByDefault_init.sql
new file mode 100644
index 0000000..ec617f3
--- /dev/null
+++ b/flink-doris-connector/src/test/resources/container/e2e/mysql2doris/testMySQL2DorisByDefault_init.sql
@@ -0,0 +1,38 @@
+CREATE DATABASE if NOT EXISTS test_e2e_mysql;
+DROP TABLE IF EXISTS test_e2e_mysql.tbl1;
+CREATE TABLE test_e2e_mysql.tbl1 (
+    `name` varchar(256) primary key,
+    `age` int
+);
+insert into test_e2e_mysql.tbl1 values ('doris_1',1);
+
+
+DROP TABLE IF EXISTS test_e2e_mysql.tbl2;
+CREATE TABLE test_e2e_mysql.tbl2 (
+    `name` varchar(256) primary key,
+    `age` int
+);
+insert into test_e2e_mysql.tbl2 values ('doris_2',2);
+
+
+DROP TABLE IF EXISTS test_e2e_mysql.tbl3;
+CREATE TABLE test_e2e_mysql.tbl3 (
+    `name` varchar(256) primary key,
+    `age` int
+);
+insert into test_e2e_mysql.tbl3 values ('doris_3',3);
+
+
+DROP TABLE IF EXISTS  test_e2e_mysql.tbl4;
+CREATE TABLE test_e2e_mysql.tbl4 (
+    `name` varchar(256) primary key,
+    `age` int
+);
+
+
+DROP TABLE IF EXISTS  test_e2e_mysql.tbl5;
+CREATE TABLE test_e2e_mysql.tbl5 (
+    `name` varchar(256) primary key,
+    `age` int
+);
+insert into test_e2e_mysql.tbl5 values ('doris_5',5);
\ No newline at end of file
diff --git a/flink-doris-connector/src/test/resources/container/e2e/mysql2doris/testMySQL2DorisEnableDelete.txt b/flink-doris-connector/src/test/resources/container/e2e/mysql2doris/testMySQL2DorisEnableDelete.txt
new file mode 100644
index 0000000..1048916
--- /dev/null
+++ b/flink-doris-connector/src/test/resources/container/e2e/mysql2doris/testMySQL2DorisEnableDelete.txt
@@ -0,0 +1,5 @@
+mysql-sync-database
+    --including-tables "tbl1|tbl2|tbl3|tbl5"
+    --table-conf replication_num=1
+    --sink-conf sink.enable-delete=false
+    --sink-conf sink.check-interval=5000
\ No newline at end of file
diff --git a/flink-doris-connector/src/test/resources/container/e2e/mysql2doris/testMySQL2DorisEnableDelete_init.sql b/flink-doris-connector/src/test/resources/container/e2e/mysql2doris/testMySQL2DorisEnableDelete_init.sql
new file mode 100644
index 0000000..ec617f3
--- /dev/null
+++ b/flink-doris-connector/src/test/resources/container/e2e/mysql2doris/testMySQL2DorisEnableDelete_init.sql
@@ -0,0 +1,38 @@
+CREATE DATABASE if NOT EXISTS test_e2e_mysql;
+DROP TABLE IF EXISTS test_e2e_mysql.tbl1;
+CREATE TABLE test_e2e_mysql.tbl1 (
+    `name` varchar(256) primary key,
+    `age` int
+);
+insert into test_e2e_mysql.tbl1 values ('doris_1',1);
+
+
+DROP TABLE IF EXISTS test_e2e_mysql.tbl2;
+CREATE TABLE test_e2e_mysql.tbl2 (
+    `name` varchar(256) primary key,
+    `age` int
+);
+insert into test_e2e_mysql.tbl2 values ('doris_2',2);
+
+
+DROP TABLE IF EXISTS test_e2e_mysql.tbl3;
+CREATE TABLE test_e2e_mysql.tbl3 (
+    `name` varchar(256) primary key,
+    `age` int
+);
+insert into test_e2e_mysql.tbl3 values ('doris_3',3);
+
+
+DROP TABLE IF EXISTS  test_e2e_mysql.tbl4;
+CREATE TABLE test_e2e_mysql.tbl4 (
+    `name` varchar(256) primary key,
+    `age` int
+);
+
+
+DROP TABLE IF EXISTS  test_e2e_mysql.tbl5;
+CREATE TABLE test_e2e_mysql.tbl5 (
+    `name` varchar(256) primary key,
+    `age` int
+);
+insert into test_e2e_mysql.tbl5 values ('doris_5',5);
\ No newline at end of file
diff --git a/flink-doris-connector/src/test/resources/container/e2e/mysql2doris/testMySQL2DorisSQLParse.txt b/flink-doris-connector/src/test/resources/container/e2e/mysql2doris/testMySQL2DorisSQLParse.txt
new file mode 100644
index 0000000..d863ecf
--- /dev/null
+++ b/flink-doris-connector/src/test/resources/container/e2e/mysql2doris/testMySQL2DorisSQLParse.txt
@@ -0,0 +1,5 @@
+mysql-sync-database
+    --including-tables "tbl.*|add_tbl"
+    --table-conf replication_num=1
+    --schema-change-mode sql_parser
+    --single-sink true
\ No newline at end of file
diff --git a/flink-doris-connector/src/test/resources/container/e2e/mysql2doris/testMySQL2DorisSQLParse_init.sql b/flink-doris-connector/src/test/resources/container/e2e/mysql2doris/testMySQL2DorisSQLParse_init.sql
new file mode 100644
index 0000000..ec617f3
--- /dev/null
+++ b/flink-doris-connector/src/test/resources/container/e2e/mysql2doris/testMySQL2DorisSQLParse_init.sql
@@ -0,0 +1,38 @@
+CREATE DATABASE if NOT EXISTS test_e2e_mysql;
+DROP TABLE IF EXISTS test_e2e_mysql.tbl1;
+CREATE TABLE test_e2e_mysql.tbl1 (
+    `name` varchar(256) primary key,
+    `age` int
+);
+insert into test_e2e_mysql.tbl1 values ('doris_1',1);
+
+
+DROP TABLE IF EXISTS test_e2e_mysql.tbl2;
+CREATE TABLE test_e2e_mysql.tbl2 (
+    `name` varchar(256) primary key,
+    `age` int
+);
+insert into test_e2e_mysql.tbl2 values ('doris_2',2);
+
+
+DROP TABLE IF EXISTS test_e2e_mysql.tbl3;
+CREATE TABLE test_e2e_mysql.tbl3 (
+    `name` varchar(256) primary key,
+    `age` int
+);
+insert into test_e2e_mysql.tbl3 values ('doris_3',3);
+
+
+DROP TABLE IF EXISTS  test_e2e_mysql.tbl4;
+CREATE TABLE test_e2e_mysql.tbl4 (
+    `name` varchar(256) primary key,
+    `age` int
+);
+
+
+DROP TABLE IF EXISTS  test_e2e_mysql.tbl5;
+CREATE TABLE test_e2e_mysql.tbl5 (
+    `name` varchar(256) primary key,
+    `age` int
+);
+insert into test_e2e_mysql.tbl5 values ('doris_5',5);
\ No newline at end of file
diff --git a/flink-doris-connector/src/test/resources/container/e2e/mysql2doris/testMySQL2Doris_init.sql b/flink-doris-connector/src/test/resources/container/e2e/mysql2doris/testMySQL2Doris_init.sql
new file mode 100644
index 0000000..ec617f3
--- /dev/null
+++ b/flink-doris-connector/src/test/resources/container/e2e/mysql2doris/testMySQL2Doris_init.sql
@@ -0,0 +1,38 @@
+CREATE DATABASE if NOT EXISTS test_e2e_mysql;
+DROP TABLE IF EXISTS test_e2e_mysql.tbl1;
+CREATE TABLE test_e2e_mysql.tbl1 (
+    `name` varchar(256) primary key,
+    `age` int
+);
+insert into test_e2e_mysql.tbl1 values ('doris_1',1);
+
+
+DROP TABLE IF EXISTS test_e2e_mysql.tbl2;
+CREATE TABLE test_e2e_mysql.tbl2 (
+    `name` varchar(256) primary key,
+    `age` int
+);
+insert into test_e2e_mysql.tbl2 values ('doris_2',2);
+
+
+DROP TABLE IF EXISTS test_e2e_mysql.tbl3;
+CREATE TABLE test_e2e_mysql.tbl3 (
+    `name` varchar(256) primary key,
+    `age` int
+);
+insert into test_e2e_mysql.tbl3 values ('doris_3',3);
+
+
+DROP TABLE IF EXISTS  test_e2e_mysql.tbl4;
+CREATE TABLE test_e2e_mysql.tbl4 (
+    `name` varchar(256) primary key,
+    `age` int
+);
+
+
+DROP TABLE IF EXISTS  test_e2e_mysql.tbl5;
+CREATE TABLE test_e2e_mysql.tbl5 (
+    `name` varchar(256) primary key,
+    `age` int
+);
+insert into test_e2e_mysql.tbl5 values ('doris_5',5);
\ No newline at end of file
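Once the snapshot phase of a mysql2doris case completes, the rows seeded above should be readable from Doris. A sketch of the union-all check in the same shape as the removed Java test, run against the test_e2e_mysql database that the sync job creates in Doris (endpoint and credentials are placeholders):

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public final class SnapshotCheck {

    public static void main(String[] args) throws Exception {
        String sql =
                "select * from ("
                        + " select * from test_e2e_mysql.tbl1"
                        + " union all select * from test_e2e_mysql.tbl2"
                        + " union all select * from test_e2e_mysql.tbl3"
                        + " union all select * from test_e2e_mysql.tbl5"
                        + ") res order by 1";
        List<String> actual = new ArrayList<>();
        // Placeholder Doris FE endpoint; the tests resolve it from the container.
        try (Connection conn =
                        DriverManager.getConnection("jdbc:mysql://127.0.0.1:9030", "root", "");
                Statement stmt = conn.createStatement();
                ResultSet rs = stmt.executeQuery(sql)) {
            while (rs.next()) {
                actual.add(rs.getString(1) + "," + rs.getInt(2));
            }
        }
        if (!actual.equals(
                Arrays.asList("doris_1,1", "doris_2,2", "doris_3,3", "doris_5,5"))) {
            throw new AssertionError("unexpected snapshot rows: " + actual);
        }
    }
}
```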