[test](e2ecase) add mysql2doris e2ecase (#275)

diff --git a/.github/workflows/run-e2ecase.yml b/.github/workflows/run-e2ecase.yml
new file mode 100644
index 0000000..ad76a3f
--- /dev/null
+++ b/.github/workflows/run-e2ecase.yml
@@ -0,0 +1,44 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+---
+name: Run E2ECases
+on:
+  pull_request:
+  push:
+
+jobs:
+  build-extension:
+    name: "Run E2ECases"
+    runs-on: ubuntu-latest
+    defaults:
+      run:
+        shell: bash
+    steps:
+    - name: Checkout
+      uses: actions/checkout@master
+
+    - name: Setup java
+      uses: actions/setup-java@v2
+      with:
+        distribution: adopt
+        java-version: '8'
+
+    - name: Run E2ECases
+      run: |
+        cd flink-doris-connector && mvn test -Dtest="*E2ECase"
+
diff --git a/flink-doris-connector/pom.xml b/flink-doris-connector/pom.xml
index 76156ad..a54562e 100644
--- a/flink-doris-connector/pom.xml
+++ b/flink-doris-connector/pom.xml
@@ -332,6 +332,12 @@
             <version>${testcontainers.version}</version>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>org.testcontainers</groupId>
+            <artifactId>mysql</artifactId>
+            <version>${testcontainers.version}</version>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 
     <build>
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/DorisTestBase.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/DorisTestBase.java
index 8ad4130..520d3d2 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/DorisTestBase.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/DorisTestBase.java
@@ -49,8 +49,8 @@
     protected static final String DORIS_12_DOCKER_IMAGE = "adamlee489/doris:1.2.7.1_x86";
     private static final String DRIVER_JAR =
             "https://repo1.maven.org/maven2/mysql/mysql-connector-java/8.0.16/mysql-connector-java-8.0.16.jar";
-    private static final String DRIVER_CLASS = "com.mysql.cj.jdbc.Driver";
-    private static final String URL = "jdbc:mysql://%s:9030";
+    protected static final String DRIVER_CLASS = "com.mysql.cj.jdbc.Driver";
+    protected static final String URL = "jdbc:mysql://%s:9030";
     protected static final String USERNAME = "root";
     protected static final String PASSWORD = "";
     protected static final GenericContainer DORIS_CONTAINER = createDorisContainer();
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/MySQLDorisE2ECase.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/MySQLDorisE2ECase.java
new file mode 100644
index 0000000..0246e39
--- /dev/null
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/MySQLDorisE2ECase.java
@@ -0,0 +1,331 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.flink.tools.cdc;
+
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.time.Deadline;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.util.function.SupplierWithException;
+
+import org.apache.doris.flink.DorisTestBase;
+import org.apache.doris.flink.tools.cdc.mysql.MysqlDatabaseSync;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.jupiter.api.Assertions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.MySQLContainer;
+import org.testcontainers.lifecycle.Startables;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.Statement;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.apache.flink.api.common.JobStatus.RUNNING;
+
+/**
+ * MySQLDorisE2ECase 1. Automatically create tables 2. Schema change event synchronization
+ * 3.Synchronization of addition, deletion and modification events 4. CDC multi-table writing.
+ */
+public class MySQLDorisE2ECase extends DorisTestBase {
+    protected static final Logger LOG = LoggerFactory.getLogger(MySQLDorisE2ECase.class);
+    private static final String DATABASE = "test";
+    private static final String MYSQL_USER = "root";
+    private static final String MYSQL_PASSWD = "123456";
+    private static final String TABLE_1 = "tbl1";
+    private static final String TABLE_2 = "tbl2";
+    private static final String TABLE_3 = "tbl3";
+
+    private static final MySQLContainer MYSQL_CONTAINER =
+            new MySQLContainer("mysql")
+                    .withDatabaseName(DATABASE)
+                    .withUsername(MYSQL_USER)
+                    .withPassword(MYSQL_PASSWD);
+
+    @BeforeClass
+    public static void startMySQLContainers() {
+        MYSQL_CONTAINER.setCommand("--default-time-zone=Asia/Shanghai");
+        LOG.info("Starting MySQL containers...");
+        Startables.deepStart(Stream.of(MYSQL_CONTAINER)).join();
+        LOG.info("MySQL Containers are started.");
+    }
+
+    @AfterClass
+    public static void stopMySQLContainers() {
+        LOG.info("Stopping MySQL containers...");
+        MYSQL_CONTAINER.stop();
+        LOG.info("MySQL Containers are stopped.");
+    }
+
+    @Test
+    public void testMySQL2Doris() throws Exception {
+        initializeMySQLTable();
+        JobClient jobClient = submitJob();
+        // wait 2 times checkpoint
+        Thread.sleep(20000);
+        Set<List<Object>> expected =
+                Stream.<List<Object>>of(
+                                Arrays.asList("doris_1", 1),
+                                Arrays.asList("doris_2", 2),
+                                Arrays.asList("doris_3", 3))
+                        .collect(Collectors.toSet());
+        String sql =
+                "select * from %s.%s union all select * from %s.%s union all select * from %s.%s order by 1";
+        checkResult(expected, sql, 2);
+
+        // add incremental data
+        try (Connection connection =
+                        DriverManager.getConnection(
+                                MYSQL_CONTAINER.getJdbcUrl(), MYSQL_USER, MYSQL_PASSWD);
+                Statement statement = connection.createStatement()) {
+            statement.execute(
+                    String.format("insert into %s.%s  values ('doris_1_1',10)", DATABASE, TABLE_1));
+            statement.execute(
+                    String.format("insert into %s.%s  values ('doris_2_1',11)", DATABASE, TABLE_2));
+            statement.execute(
+                    String.format("insert into %s.%s  values ('doris_3_1',12)", DATABASE, TABLE_3));
+
+            statement.execute(
+                    String.format(
+                            "update %s.%s set age=18 where name='doris_1'", DATABASE, TABLE_1));
+            statement.execute(
+                    String.format("delete from %s.%s where name='doris_2'", DATABASE, TABLE_2));
+        }
+
+        Thread.sleep(20000);
+        Set<List<Object>> expected2 =
+                Stream.<List<Object>>of(
+                                Arrays.asList("doris_1", 18),
+                                Arrays.asList("doris_1_1", 10),
+                                Arrays.asList("doris_2_1", 11),
+                                Arrays.asList("doris_3", 3),
+                                Arrays.asList("doris_3_1", 12))
+                        .collect(Collectors.toSet());
+        sql =
+                "select * from %s.%s union all select * from %s.%s union all select * from %s.%s order by 1";
+        checkResult(expected2, sql, 2);
+
+        // mock schema change
+        try (Connection connection =
+                        DriverManager.getConnection(
+                                MYSQL_CONTAINER.getJdbcUrl(), MYSQL_USER, MYSQL_PASSWD);
+                Statement statement = connection.createStatement()) {
+            statement.execute(
+                    String.format(
+                            "alter table %s.%s add column c1 varchar(128)", DATABASE, TABLE_1));
+            statement.execute(
+                    String.format("alter table %s.%s drop column age", DATABASE, TABLE_1));
+            Thread.sleep(20000);
+            statement.execute(
+                    String.format(
+                            "insert into %s.%s  values ('doris_1_1_1','c1_val')",
+                            DATABASE, TABLE_1));
+        }
+        Thread.sleep(20000);
+        Set<List<Object>> expected3 =
+                Stream.<List<Object>>of(
+                                Arrays.asList("doris_1", null),
+                                Arrays.asList("doris_1_1", null),
+                                Arrays.asList("doris_1_1_1", "c1_val"))
+                        .collect(Collectors.toSet());
+        sql = "select * from %s.%s order by 1";
+        checkResult(expected3, sql, 2);
+        jobClient.cancel().get();
+    }
+
+    public void checkResult(Set<List<Object>> expected, String query, int columnSize)
+            throws Exception {
+        Set<List<Object>> actual = new HashSet<>();
+        try (Statement sinkStatement = connection.createStatement()) {
+            ResultSet sinkResultSet =
+                    sinkStatement.executeQuery(
+                            String.format(
+                                    query, DATABASE, TABLE_1, DATABASE, TABLE_2, DATABASE,
+                                    TABLE_3));
+            while (sinkResultSet.next()) {
+                List<Object> row = new ArrayList<>();
+                for (int i = 1; i <= columnSize; i++) {
+                    row.add(sinkResultSet.getObject(i));
+                }
+                actual.add(row);
+            }
+        }
+        Assertions.assertIterableEquals(expected, actual);
+    }
+
+    public JobClient submitJob() throws Exception {
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setRestartStrategy(RestartStrategies.noRestart());
+        Map<String, String> flinkMap = new HashMap<>();
+        flinkMap.put("execution.checkpointing.interval", "10s");
+        flinkMap.put("pipeline.operator-chaining", "false");
+        flinkMap.put("parallelism.default", "1");
+
+        Configuration configuration = Configuration.fromMap(flinkMap);
+        env.configure(configuration);
+
+        String database = DATABASE;
+        Map<String, String> mysqlConfig = new HashMap<>();
+        mysqlConfig.put("database-name", DATABASE);
+        mysqlConfig.put("hostname", MYSQL_CONTAINER.getHost());
+        mysqlConfig.put("port", MYSQL_CONTAINER.getMappedPort(3306) + "");
+        mysqlConfig.put("username", MYSQL_USER);
+        mysqlConfig.put("password", MYSQL_PASSWD);
+        mysqlConfig.put("server-time-zone", "Asia/Shanghai");
+        Configuration config = Configuration.fromMap(mysqlConfig);
+
+        Map<String, String> sinkConfig = new HashMap<>();
+        sinkConfig.put("fenodes", getFenodes());
+        sinkConfig.put("username", USERNAME);
+        sinkConfig.put("password", PASSWORD);
+        sinkConfig.put("jdbc-url", String.format(DorisTestBase.URL, DORIS_CONTAINER.getHost()));
+        sinkConfig.put("sink.label-prefix", UUID.randomUUID().toString());
+        Configuration sinkConf = Configuration.fromMap(sinkConfig);
+
+        Map<String, String> tableConfig = new HashMap<>();
+        tableConfig.put("replication_num", "1");
+
+        String includingTables = "tbl1|tbl2|tbl3";
+        String excludingTables = "";
+        DatabaseSync databaseSync = new MysqlDatabaseSync();
+        databaseSync
+                .setEnv(env)
+                .setDatabase(database)
+                .setConfig(config)
+                .setIncludingTables(includingTables)
+                .setExcludingTables(excludingTables)
+                .setIgnoreDefaultValue(false)
+                .setSinkConfig(sinkConf)
+                .setTableConfig(tableConfig)
+                .setCreateTableOnly(false)
+                .setNewSchemaChange(true)
+                .create();
+        databaseSync.build();
+        JobClient jobClient = env.executeAsync();
+        waitForJobStatus(
+                jobClient,
+                Collections.singletonList(RUNNING),
+                Deadline.fromNow(Duration.ofSeconds(10)));
+        return jobClient;
+    }
+
+    public void initializeMySQLTable() throws Exception {
+        try (Connection connection =
+                        DriverManager.getConnection(
+                                MYSQL_CONTAINER.getJdbcUrl(), MYSQL_USER, MYSQL_PASSWD);
+                Statement statement = connection.createStatement()) {
+            statement.execute(String.format("CREATE DATABASE IF NOT EXISTS %s", DATABASE));
+            statement.execute(String.format("DROP TABLE IF EXISTS %s.%s", DATABASE, TABLE_1));
+            statement.execute(String.format("DROP TABLE IF EXISTS %s.%s", DATABASE, TABLE_2));
+            statement.execute(String.format("DROP TABLE IF EXISTS %s.%s", DATABASE, TABLE_3));
+            statement.execute(
+                    String.format(
+                            "CREATE TABLE %s.%s ( \n"
+                                    + "`name` varchar(256) primary key,\n"
+                                    + "`age` int\n"
+                                    + ")",
+                            DATABASE, TABLE_1));
+            statement.execute(
+                    String.format(
+                            "CREATE TABLE %s.%s ( \n"
+                                    + "`name` varchar(256) primary key,\n"
+                                    + "`age` int\n"
+                                    + ")",
+                            DATABASE, TABLE_2));
+            statement.execute(
+                    String.format(
+                            "CREATE TABLE %s.%s ( \n"
+                                    + "`name` varchar(256) primary key,\n"
+                                    + "`age` int\n"
+                                    + ")",
+                            DATABASE, TABLE_3));
+            // mock stock data
+            statement.execute(
+                    String.format("insert into %s.%s  values ('doris_1',1)", DATABASE, TABLE_1));
+            statement.execute(
+                    String.format("insert into %s.%s  values ('doris_2',2)", DATABASE, TABLE_2));
+            statement.execute(
+                    String.format("insert into %s.%s  values ('doris_3',3)", DATABASE, TABLE_3));
+        }
+    }
+
+    public static void waitForJobStatus(
+            JobClient client, List<JobStatus> expectedStatus, Deadline deadline) throws Exception {
+        waitUntilCondition(
+                () -> {
+                    JobStatus currentStatus = (JobStatus) client.getJobStatus().get();
+                    if (expectedStatus.contains(currentStatus)) {
+                        return true;
+                    } else if (currentStatus.isTerminalState()) {
+                        try {
+                            client.getJobExecutionResult().get();
+                        } catch (Exception var4) {
+                            throw new IllegalStateException(
+                                    String.format(
+                                            "Job has entered %s state, but expecting %s",
+                                            currentStatus, expectedStatus),
+                                    var4);
+                        }
+
+                        throw new IllegalStateException(
+                                String.format(
+                                        "Job has entered a terminal state %s, but expecting %s",
+                                        currentStatus, expectedStatus));
+                    } else {
+                        return false;
+                    }
+                },
+                deadline,
+                100L,
+                "Condition was not met in given timeout.");
+    }
+
+    public static void waitUntilCondition(
+            SupplierWithException<Boolean, Exception> condition,
+            Deadline timeout,
+            long retryIntervalMillis,
+            String errorMsg)
+            throws Exception {
+        while (timeout.hasTimeLeft() && !(Boolean) condition.get()) {
+            long timeLeft = Math.max(0L, timeout.timeLeft().toMillis());
+            Thread.sleep(Math.min(retryIntervalMillis, timeLeft));
+        }
+
+        if (!timeout.hasTimeLeft()) {
+            throw new TimeoutException(errorMsg);
+        }
+    }
+}