[test](e2ecase) add mysql2doris e2ecase (#275)
diff --git a/.github/workflows/run-e2ecase.yml b/.github/workflows/run-e2ecase.yml
new file mode 100644
index 0000000..ad76a3f
--- /dev/null
+++ b/.github/workflows/run-e2ecase.yml
@@ -0,0 +1,44 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+---
+name: Run E2ECases
+on:
+ pull_request:
+ push:
+
+jobs:
+ build-extension:
+ name: "Run E2ECases"
+ runs-on: ubuntu-latest
+ defaults:
+ run:
+ shell: bash
+ steps:
+ - name: Checkout
+ uses: actions/checkout@master
+
+ - name: Setup java
+ uses: actions/setup-java@v2
+ with:
+ distribution: adopt
+ java-version: '8'
+
+ - name: Run E2ECases
+ run: |
+ cd flink-doris-connector && mvn test -Dtest="*E2ECase"
+
diff --git a/flink-doris-connector/pom.xml b/flink-doris-connector/pom.xml
index 76156ad..a54562e 100644
--- a/flink-doris-connector/pom.xml
+++ b/flink-doris-connector/pom.xml
@@ -332,6 +332,12 @@
<version>${testcontainers.version}</version>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.testcontainers</groupId>
+ <artifactId>mysql</artifactId>
+ <version>${testcontainers.version}</version>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<build>
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/DorisTestBase.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/DorisTestBase.java
index 8ad4130..520d3d2 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/DorisTestBase.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/DorisTestBase.java
@@ -49,8 +49,8 @@
protected static final String DORIS_12_DOCKER_IMAGE = "adamlee489/doris:1.2.7.1_x86";
private static final String DRIVER_JAR =
"https://repo1.maven.org/maven2/mysql/mysql-connector-java/8.0.16/mysql-connector-java-8.0.16.jar";
- private static final String DRIVER_CLASS = "com.mysql.cj.jdbc.Driver";
- private static final String URL = "jdbc:mysql://%s:9030";
+ protected static final String DRIVER_CLASS = "com.mysql.cj.jdbc.Driver";
+ protected static final String URL = "jdbc:mysql://%s:9030";
protected static final String USERNAME = "root";
protected static final String PASSWORD = "";
protected static final GenericContainer DORIS_CONTAINER = createDorisContainer();
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/MySQLDorisE2ECase.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/MySQLDorisE2ECase.java
new file mode 100644
index 0000000..0246e39
--- /dev/null
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/MySQLDorisE2ECase.java
@@ -0,0 +1,331 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.flink.tools.cdc;
+
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.time.Deadline;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.util.function.SupplierWithException;
+
+import org.apache.doris.flink.DorisTestBase;
+import org.apache.doris.flink.tools.cdc.mysql.MysqlDatabaseSync;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.jupiter.api.Assertions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.MySQLContainer;
+import org.testcontainers.lifecycle.Startables;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.Statement;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.apache.flink.api.common.JobStatus.RUNNING;
+
+/**
+ * MySQLDorisE2ECase 1. Automatically create tables 2. Schema change event synchronization
+ * 3.Synchronization of addition, deletion and modification events 4. CDC multi-table writing.
+ */
+public class MySQLDorisE2ECase extends DorisTestBase {
+ protected static final Logger LOG = LoggerFactory.getLogger(MySQLDorisE2ECase.class);
+ private static final String DATABASE = "test";
+ private static final String MYSQL_USER = "root";
+ private static final String MYSQL_PASSWD = "123456";
+ private static final String TABLE_1 = "tbl1";
+ private static final String TABLE_2 = "tbl2";
+ private static final String TABLE_3 = "tbl3";
+
+ private static final MySQLContainer MYSQL_CONTAINER =
+ new MySQLContainer("mysql")
+ .withDatabaseName(DATABASE)
+ .withUsername(MYSQL_USER)
+ .withPassword(MYSQL_PASSWD);
+
+ @BeforeClass
+ public static void startMySQLContainers() {
+ MYSQL_CONTAINER.setCommand("--default-time-zone=Asia/Shanghai");
+ LOG.info("Starting MySQL containers...");
+ Startables.deepStart(Stream.of(MYSQL_CONTAINER)).join();
+ LOG.info("MySQL Containers are started.");
+ }
+
+ @AfterClass
+ public static void stopMySQLContainers() {
+ LOG.info("Stopping MySQL containers...");
+ MYSQL_CONTAINER.stop();
+ LOG.info("MySQL Containers are stopped.");
+ }
+
+ @Test
+ public void testMySQL2Doris() throws Exception {
+ initializeMySQLTable();
+ JobClient jobClient = submitJob();
+ // wait 2 times checkpoint
+ Thread.sleep(20000);
+ Set<List<Object>> expected =
+ Stream.<List<Object>>of(
+ Arrays.asList("doris_1", 1),
+ Arrays.asList("doris_2", 2),
+ Arrays.asList("doris_3", 3))
+ .collect(Collectors.toSet());
+ String sql =
+ "select * from %s.%s union all select * from %s.%s union all select * from %s.%s order by 1";
+ checkResult(expected, sql, 2);
+
+ // add incremental data
+ try (Connection connection =
+ DriverManager.getConnection(
+ MYSQL_CONTAINER.getJdbcUrl(), MYSQL_USER, MYSQL_PASSWD);
+ Statement statement = connection.createStatement()) {
+ statement.execute(
+ String.format("insert into %s.%s values ('doris_1_1',10)", DATABASE, TABLE_1));
+ statement.execute(
+ String.format("insert into %s.%s values ('doris_2_1',11)", DATABASE, TABLE_2));
+ statement.execute(
+ String.format("insert into %s.%s values ('doris_3_1',12)", DATABASE, TABLE_3));
+
+ statement.execute(
+ String.format(
+ "update %s.%s set age=18 where name='doris_1'", DATABASE, TABLE_1));
+ statement.execute(
+ String.format("delete from %s.%s where name='doris_2'", DATABASE, TABLE_2));
+ }
+
+ Thread.sleep(20000);
+ Set<List<Object>> expected2 =
+ Stream.<List<Object>>of(
+ Arrays.asList("doris_1", 18),
+ Arrays.asList("doris_1_1", 10),
+ Arrays.asList("doris_2_1", 11),
+ Arrays.asList("doris_3", 3),
+ Arrays.asList("doris_3_1", 12))
+ .collect(Collectors.toSet());
+ sql =
+ "select * from %s.%s union all select * from %s.%s union all select * from %s.%s order by 1";
+ checkResult(expected2, sql, 2);
+
+ // mock schema change
+ try (Connection connection =
+ DriverManager.getConnection(
+ MYSQL_CONTAINER.getJdbcUrl(), MYSQL_USER, MYSQL_PASSWD);
+ Statement statement = connection.createStatement()) {
+ statement.execute(
+ String.format(
+ "alter table %s.%s add column c1 varchar(128)", DATABASE, TABLE_1));
+ statement.execute(
+ String.format("alter table %s.%s drop column age", DATABASE, TABLE_1));
+ Thread.sleep(20000);
+ statement.execute(
+ String.format(
+ "insert into %s.%s values ('doris_1_1_1','c1_val')",
+ DATABASE, TABLE_1));
+ }
+ Thread.sleep(20000);
+ Set<List<Object>> expected3 =
+ Stream.<List<Object>>of(
+ Arrays.asList("doris_1", null),
+ Arrays.asList("doris_1_1", null),
+ Arrays.asList("doris_1_1_1", "c1_val"))
+ .collect(Collectors.toSet());
+ sql = "select * from %s.%s order by 1";
+ checkResult(expected3, sql, 2);
+ jobClient.cancel().get();
+ }
+
+ public void checkResult(Set<List<Object>> expected, String query, int columnSize)
+ throws Exception {
+ Set<List<Object>> actual = new HashSet<>();
+ try (Statement sinkStatement = connection.createStatement()) {
+ ResultSet sinkResultSet =
+ sinkStatement.executeQuery(
+ String.format(
+ query, DATABASE, TABLE_1, DATABASE, TABLE_2, DATABASE,
+ TABLE_3));
+ while (sinkResultSet.next()) {
+ List<Object> row = new ArrayList<>();
+ for (int i = 1; i <= columnSize; i++) {
+ row.add(sinkResultSet.getObject(i));
+ }
+ actual.add(row);
+ }
+ }
+ Assertions.assertIterableEquals(expected, actual);
+ }
+
+ public JobClient submitJob() throws Exception {
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setRestartStrategy(RestartStrategies.noRestart());
+ Map<String, String> flinkMap = new HashMap<>();
+ flinkMap.put("execution.checkpointing.interval", "10s");
+ flinkMap.put("pipeline.operator-chaining", "false");
+ flinkMap.put("parallelism.default", "1");
+
+ Configuration configuration = Configuration.fromMap(flinkMap);
+ env.configure(configuration);
+
+ String database = DATABASE;
+ Map<String, String> mysqlConfig = new HashMap<>();
+ mysqlConfig.put("database-name", DATABASE);
+ mysqlConfig.put("hostname", MYSQL_CONTAINER.getHost());
+ mysqlConfig.put("port", MYSQL_CONTAINER.getMappedPort(3306) + "");
+ mysqlConfig.put("username", MYSQL_USER);
+ mysqlConfig.put("password", MYSQL_PASSWD);
+ mysqlConfig.put("server-time-zone", "Asia/Shanghai");
+ Configuration config = Configuration.fromMap(mysqlConfig);
+
+ Map<String, String> sinkConfig = new HashMap<>();
+ sinkConfig.put("fenodes", getFenodes());
+ sinkConfig.put("username", USERNAME);
+ sinkConfig.put("password", PASSWORD);
+ sinkConfig.put("jdbc-url", String.format(DorisTestBase.URL, DORIS_CONTAINER.getHost()));
+ sinkConfig.put("sink.label-prefix", UUID.randomUUID().toString());
+ Configuration sinkConf = Configuration.fromMap(sinkConfig);
+
+ Map<String, String> tableConfig = new HashMap<>();
+ tableConfig.put("replication_num", "1");
+
+ String includingTables = "tbl1|tbl2|tbl3";
+ String excludingTables = "";
+ DatabaseSync databaseSync = new MysqlDatabaseSync();
+ databaseSync
+ .setEnv(env)
+ .setDatabase(database)
+ .setConfig(config)
+ .setIncludingTables(includingTables)
+ .setExcludingTables(excludingTables)
+ .setIgnoreDefaultValue(false)
+ .setSinkConfig(sinkConf)
+ .setTableConfig(tableConfig)
+ .setCreateTableOnly(false)
+ .setNewSchemaChange(true)
+ .create();
+ databaseSync.build();
+ JobClient jobClient = env.executeAsync();
+ waitForJobStatus(
+ jobClient,
+ Collections.singletonList(RUNNING),
+ Deadline.fromNow(Duration.ofSeconds(10)));
+ return jobClient;
+ }
+
+ public void initializeMySQLTable() throws Exception {
+ try (Connection connection =
+ DriverManager.getConnection(
+ MYSQL_CONTAINER.getJdbcUrl(), MYSQL_USER, MYSQL_PASSWD);
+ Statement statement = connection.createStatement()) {
+ statement.execute(String.format("CREATE DATABASE IF NOT EXISTS %s", DATABASE));
+ statement.execute(String.format("DROP TABLE IF EXISTS %s.%s", DATABASE, TABLE_1));
+ statement.execute(String.format("DROP TABLE IF EXISTS %s.%s", DATABASE, TABLE_2));
+ statement.execute(String.format("DROP TABLE IF EXISTS %s.%s", DATABASE, TABLE_3));
+ statement.execute(
+ String.format(
+ "CREATE TABLE %s.%s ( \n"
+ + "`name` varchar(256) primary key,\n"
+ + "`age` int\n"
+ + ")",
+ DATABASE, TABLE_1));
+ statement.execute(
+ String.format(
+ "CREATE TABLE %s.%s ( \n"
+ + "`name` varchar(256) primary key,\n"
+ + "`age` int\n"
+ + ")",
+ DATABASE, TABLE_2));
+ statement.execute(
+ String.format(
+ "CREATE TABLE %s.%s ( \n"
+ + "`name` varchar(256) primary key,\n"
+ + "`age` int\n"
+ + ")",
+ DATABASE, TABLE_3));
+ // mock stock data
+ statement.execute(
+ String.format("insert into %s.%s values ('doris_1',1)", DATABASE, TABLE_1));
+ statement.execute(
+ String.format("insert into %s.%s values ('doris_2',2)", DATABASE, TABLE_2));
+ statement.execute(
+ String.format("insert into %s.%s values ('doris_3',3)", DATABASE, TABLE_3));
+ }
+ }
+
+ public static void waitForJobStatus(
+ JobClient client, List<JobStatus> expectedStatus, Deadline deadline) throws Exception {
+ waitUntilCondition(
+ () -> {
+ JobStatus currentStatus = (JobStatus) client.getJobStatus().get();
+ if (expectedStatus.contains(currentStatus)) {
+ return true;
+ } else if (currentStatus.isTerminalState()) {
+ try {
+ client.getJobExecutionResult().get();
+ } catch (Exception var4) {
+ throw new IllegalStateException(
+ String.format(
+ "Job has entered %s state, but expecting %s",
+ currentStatus, expectedStatus),
+ var4);
+ }
+
+ throw new IllegalStateException(
+ String.format(
+ "Job has entered a terminal state %s, but expecting %s",
+ currentStatus, expectedStatus));
+ } else {
+ return false;
+ }
+ },
+ deadline,
+ 100L,
+ "Condition was not met in given timeout.");
+ }
+
+ public static void waitUntilCondition(
+ SupplierWithException<Boolean, Exception> condition,
+ Deadline timeout,
+ long retryIntervalMillis,
+ String errorMsg)
+ throws Exception {
+ while (timeout.hasTimeLeft() && !(Boolean) condition.get()) {
+ long timeLeft = Math.max(0L, timeout.timeLeft().toMillis());
+ Thread.sleep(Math.min(retryIntervalMillis, timeLeft));
+ }
+
+ if (!timeout.hasTimeLeft()) {
+ throw new TimeoutException(errorMsg);
+ }
+ }
+}