[improve]Transform the End-To-End(E2E) tasks on the assembly line (#466)
diff --git a/.licenserc.yaml b/.licenserc.yaml
index 6048839..27e1080 100644
--- a/.licenserc.yaml
+++ b/.licenserc.yaml
@@ -12,5 +12,6 @@
- '.github/PULL_REQUEST_TEMPLATE.md'
- '.licenserc.yaml'
- 'custom_env.sh.tpl'
+ - 'flink-doris-connector/src/test/resources/container/'
comment: on-failure
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/CdcTools.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/CdcTools.java
index ec3b452..61beea1 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/CdcTools.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/CdcTools.java
@@ -20,6 +20,7 @@
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.java.utils.MultipleParameterTool;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.execution.JobClient;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.StringUtils;
@@ -36,11 +37,14 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
/** cdc sync tools. */
public class CdcTools {
private static final List<String> EMPTY_KEYS =
Collections.singletonList(DatabaseSyncConfig.PASSWORD);
+ private static StreamExecutionEnvironment flinkEnvironmentForTesting;
+ private static JobClient jobClient;
public static void main(String[] args) throws Exception {
System.out.println("Input args: " + Arrays.asList(args) + ".\n");
@@ -146,7 +150,10 @@
new DorisTableConfig(getConfigMap(params, DatabaseSyncConfig.TABLE_CONF));
Configuration sinkConfig = Configuration.fromMap(sinkMap);
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ StreamExecutionEnvironment env =
+ Objects.nonNull(flinkEnvironmentForTesting)
+ ? flinkEnvironmentForTesting
+ : StreamExecutionEnvironment.getExecutionEnvironment();
databaseSync
.setEnv(env)
.setDatabase(database)
@@ -174,7 +181,23 @@
config.getString(
DatabaseSyncConfig.DATABASE_NAME, DatabaseSyncConfig.DB));
}
- env.execute(jobName);
+ if (Objects.nonNull(flinkEnvironmentForTesting)) {
+ jobClient = env.executeAsync();
+ } else {
+ env.execute(jobName);
+ }
+ }
+
+ @VisibleForTesting
+ public static JobClient getJobClient() {
+ return jobClient;
+ }
+
+ // Only for testing, please do not use it in actual environment
+ @VisibleForTesting
+ public static void setStreamExecutionEnvironmentForTesting(
+ StreamExecutionEnvironment environment) {
+ flinkEnvironmentForTesting = environment;
}
@VisibleForTesting
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/DorisTestBase.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/DorisTestBase.java
deleted file mode 100644
index 5097a21..0000000
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/DorisTestBase.java
+++ /dev/null
@@ -1,326 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.doris.flink;
-
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.common.JobStatus;
-import org.apache.flink.api.common.time.Deadline;
-import org.apache.flink.core.execution.JobClient;
-import org.apache.flink.runtime.highavailability.nonha.embedded.HaLeadershipControl;
-import org.apache.flink.runtime.minicluster.MiniCluster;
-import org.apache.flink.runtime.minicluster.RpcServiceSharing;
-import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
-import org.apache.flink.test.util.MiniClusterWithClientResource;
-import org.apache.flink.util.function.SupplierWithException;
-
-import com.google.common.collect.Lists;
-import org.apache.commons.lang3.StringUtils;
-import org.junit.Assert;
-import org.junit.Rule;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.testcontainers.containers.GenericContainer;
-import org.testcontainers.containers.Network;
-import org.testcontainers.containers.output.Slf4jLogConsumer;
-import org.testcontainers.utility.DockerLoggerFactory;
-
-import java.io.BufferedReader;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-import java.net.InetAddress;
-import java.net.URL;
-import java.net.URLClassLoader;
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.sql.ResultSet;
-import java.sql.ResultSetMetaData;
-import java.sql.SQLException;
-import java.sql.Statement;
-import java.time.Duration;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.TimeoutException;
-import java.util.concurrent.locks.LockSupport;
-
-public abstract class DorisTestBase {
- protected static final Logger LOG = LoggerFactory.getLogger(DorisTestBase.class);
- private static final String DEFAULT_DOCKER_IMAGE = "apache/doris:doris-all-in-one-2.1.0";
- protected static final String DORIS_DOCKER_IMAGE =
- System.getProperty("image") == null
- ? DEFAULT_DOCKER_IMAGE
- : System.getProperty("image");
- private static final String DRIVER_JAR =
- "https://repo1.maven.org/maven2/mysql/mysql-connector-java/8.0.16/mysql-connector-java-8.0.16.jar";
- protected static final String DRIVER_CLASS = "com.mysql.cj.jdbc.Driver";
- protected static final String URL = "jdbc:mysql://%s:9030";
- protected static final String USERNAME = "root";
- protected static final String PASSWORD = "";
- protected static final GenericContainer DORIS_CONTAINER = createDorisContainer();
-
- protected static String getFenodes() {
- return DORIS_CONTAINER.getHost() + ":8030";
- }
-
- protected static String getBenodes() {
- return DORIS_CONTAINER.getHost() + ":8040";
- }
-
- protected static String getJdbcUrl() {
- return String.format(URL, DORIS_CONTAINER.getHost());
- }
-
- protected static String getHost() {
- return DORIS_CONTAINER.getHost();
- }
-
- static {
- startContainers();
- }
-
- public static void startContainers() {
- try {
- LOG.info("Starting doris containers...");
- // singleton doris container
- DORIS_CONTAINER.start();
- initializeJdbcConnection();
- } catch (Exception ex) {
- LOG.error("Failed to start containers doris, ", ex);
- }
- LOG.info("Containers doris are started.");
- }
-
- public static GenericContainer createDorisContainer() {
- LOG.info("Create doris containers...");
- GenericContainer container =
- new GenericContainer<>(DORIS_DOCKER_IMAGE)
- .withNetwork(Network.newNetwork())
- .withNetworkAliases("DorisContainer")
- .withPrivilegedMode(true)
- .withLogConsumer(
- new Slf4jLogConsumer(
- DockerLoggerFactory.getLogger(DORIS_DOCKER_IMAGE)))
- .withExposedPorts(8030, 9030, 8040, 9060);
-
- container.setPortBindings(
- Lists.newArrayList(
- String.format("%s:%s", "8030", "8030"),
- String.format("%s:%s", "9030", "9030"),
- String.format("%s:%s", "9060", "9060"),
- String.format("%s:%s", "8040", "8040")));
-
- return container;
- }
-
- protected static void initializeJdbcConnection() throws Exception {
- URLClassLoader urlClassLoader =
- new URLClassLoader(
- new URL[] {new URL(DRIVER_JAR)}, DorisTestBase.class.getClassLoader());
- LOG.info("Try to connect to Doris...");
- Thread.currentThread().setContextClassLoader(urlClassLoader);
- try (Connection connection =
- DriverManager.getConnection(
- String.format(URL, DORIS_CONTAINER.getHost()), USERNAME, PASSWORD);
- Statement statement = connection.createStatement()) {
- ResultSet resultSet;
- do {
- LOG.info("Wait for the Backend to start successfully...");
- resultSet = statement.executeQuery("show backends");
- } while (!isBeReady(resultSet, Duration.ofSeconds(1L)));
- }
- LOG.info("Connected to Doris successfully...");
- printClusterStatus();
- }
-
- private static boolean isBeReady(ResultSet rs, Duration duration) throws SQLException {
- LockSupport.parkNanos(duration.toNanos());
- if (rs.next()) {
- String isAlive = rs.getString("Alive").trim();
- String totalCap = rs.getString("TotalCapacity").trim();
- return "true".equalsIgnoreCase(isAlive) && !"0.000".equalsIgnoreCase(totalCap);
- }
- return false;
- }
-
- protected static void printClusterStatus() throws Exception {
- LOG.info("Current machine IP: {}", InetAddress.getLocalHost());
- echo("sh", "-c", "cat /proc/cpuinfo | grep 'cpu cores' | uniq");
- echo("sh", "-c", "free -h");
- try (Connection connection =
- DriverManager.getConnection(
- String.format(URL, DORIS_CONTAINER.getHost()), USERNAME, PASSWORD);
- Statement statement = connection.createStatement()) {
- ResultSet showFrontends = statement.executeQuery("show frontends");
- LOG.info("Frontends status: {}", convertList(showFrontends));
- ResultSet showBackends = statement.executeQuery("show backends");
- LOG.info("Backends status: {}", convertList(showBackends));
- }
- }
-
- static void echo(String... cmd) {
- try {
- Process p = Runtime.getRuntime().exec(cmd);
- InputStream is = p.getInputStream();
- BufferedReader reader = new BufferedReader(new InputStreamReader(is));
- String line;
- while ((line = reader.readLine()) != null) {
- System.out.println(line);
- }
- p.waitFor();
- is.close();
- reader.close();
- p.destroy();
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
-
- private static List<Map> convertList(ResultSet rs) throws SQLException {
- List<Map> list = new ArrayList<>();
- ResultSetMetaData metaData = rs.getMetaData();
- int columnCount = metaData.getColumnCount();
- while (rs.next()) {
- Map<String, Object> rowData = new HashMap<>();
- for (int i = 1; i <= columnCount; i++) {
- rowData.put(metaData.getColumnName(i), rs.getObject(i));
- }
- list.add(rowData);
- }
- return list;
- }
-
- public void checkResult(List<String> expected, String query, int columnSize) throws Exception {
- List<String> actual = new ArrayList<>();
- try (Connection connection =
- DriverManager.getConnection(
- String.format(URL, DORIS_CONTAINER.getHost()), USERNAME, PASSWORD);
- Statement statement = connection.createStatement()) {
- ResultSet sinkResultSet = statement.executeQuery(query);
- while (sinkResultSet.next()) {
- List<String> row = new ArrayList<>();
- for (int i = 1; i <= columnSize; i++) {
- Object value = sinkResultSet.getObject(i);
- if (value == null) {
- row.add("null");
- } else {
- row.add(value.toString());
- }
- }
- actual.add(StringUtils.join(row, ","));
- }
- }
- Assert.assertArrayEquals(expected.toArray(), actual.toArray());
- }
-
- @Rule
- public final MiniClusterWithClientResource miniClusterResource =
- new MiniClusterWithClientResource(
- new MiniClusterResourceConfiguration.Builder()
- .setNumberTaskManagers(1)
- .setNumberSlotsPerTaskManager(2)
- .setRpcServiceSharing(RpcServiceSharing.DEDICATED)
- .withHaLeadershipControl()
- .build());
-
- /** The type of failover. */
- protected enum FailoverType {
- TM,
- JM,
- NONE
- }
-
- protected static void triggerFailover(
- FailoverType type, JobID jobId, MiniCluster miniCluster, Runnable afterFailAction)
- throws Exception {
- switch (type) {
- case TM:
- restartTaskManager(miniCluster, afterFailAction);
- break;
- case JM:
- triggerJobManagerFailover(jobId, miniCluster, afterFailAction);
- break;
- case NONE:
- break;
- default:
- throw new IllegalStateException("Unexpected value: " + type);
- }
- }
-
- protected static void triggerJobManagerFailover(
- JobID jobId, MiniCluster miniCluster, Runnable afterFailAction) throws Exception {
- final HaLeadershipControl haLeadershipControl = miniCluster.getHaLeadershipControl().get();
- haLeadershipControl.revokeJobMasterLeadership(jobId).get();
- afterFailAction.run();
- haLeadershipControl.grantJobMasterLeadership(jobId).get();
- }
-
- protected static void restartTaskManager(MiniCluster miniCluster, Runnable afterFailAction)
- throws Exception {
- miniCluster.terminateTaskManager(0).get();
- afterFailAction.run();
- miniCluster.startTaskManager();
- }
-
- public static void waitForJobStatus(
- JobClient client, List<JobStatus> expectedStatus, Deadline deadline) throws Exception {
- waitUntilCondition(
- () -> {
- JobStatus currentStatus = (JobStatus) client.getJobStatus().get();
- if (expectedStatus.contains(currentStatus)) {
- return true;
- } else if (currentStatus.isTerminalState()) {
- try {
- client.getJobExecutionResult().get();
- } catch (Exception var4) {
- throw new IllegalStateException(
- String.format(
- "Job has entered %s state, but expecting %s",
- currentStatus, expectedStatus),
- var4);
- }
-
- throw new IllegalStateException(
- String.format(
- "Job has entered a terminal state %s, but expecting %s",
- currentStatus, expectedStatus));
- } else {
- return false;
- }
- },
- deadline,
- 100L,
- "Condition was not met in given timeout.");
- }
-
- public static void waitUntilCondition(
- SupplierWithException<Boolean, Exception> condition,
- Deadline timeout,
- long retryIntervalMillis,
- String errorMsg)
- throws Exception {
- while (timeout.hasTimeLeft() && !(Boolean) condition.get()) {
- long timeLeft = Math.max(0L, timeout.timeLeft().toMillis());
- Thread.sleep(Math.min(retryIntervalMillis, timeLeft));
- }
-
- if (!timeout.hasTimeLeft()) {
- throw new TimeoutException(errorMsg);
- }
- }
-}
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/catalog/DorisCatalogITCase.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/catalog/DorisCatalogITCase.java
index 712b188..b3a3ce0 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/catalog/DorisCatalogITCase.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/catalog/DorisCatalogITCase.java
@@ -40,8 +40,8 @@
import org.apache.flink.util.CollectionUtil;
import com.google.common.collect.Lists;
-import org.apache.doris.flink.DorisTestBase;
import org.apache.doris.flink.cfg.DorisConnectionOptions;
+import org.apache.doris.flink.container.AbstractITCaseService;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Ignore;
@@ -62,40 +62,14 @@
import static org.junit.Assert.assertTrue;
/** Class for unit tests to run on catalogs. */
-public class DorisCatalogITCase extends DorisTestBase {
+public class DorisCatalogITCase extends AbstractITCaseService {
private static final String TEST_CATALOG_NAME = "doris_catalog";
- private static final String TEST_FENODES = getFenodes();
- private static final String TEST_JDBCURL = getJdbcUrl();
- private static final String TEST_USERNAME = USERNAME;
- private static final String TEST_PWD = PASSWORD;
- // private static final String TEST_FENODES = "127.0.0.1:8030";
- // private static final String TEST_JDBCURL = "jdbc:mysql://127.0.0.1:9030";
- // private static final String TEST_USERNAME = "root";
- // private static final String TEST_PWD = "";
private static final String TEST_DB = "catalog_db";
private static final String TEST_TABLE = "t_all_types";
private static final String TEST_TABLE_SINK = "t_all_types_sink";
private static final String TEST_TABLE_SINK_GROUPBY = "t_all_types_sink_groupby";
- protected static final Schema TABLE_SCHEMA =
- Schema.newBuilder()
- .column("id", DataTypes.STRING())
- .column("c_boolean", DataTypes.BOOLEAN())
- .column("c_char", DataTypes.CHAR(1))
- .column("c_date", DataTypes.DATE())
- .column("c_datetime", DataTypes.TIMESTAMP(0))
- .column("c_decimal", DataTypes.DECIMAL(10, 2))
- .column("c_double", DataTypes.DOUBLE())
- .column("c_float", DataTypes.FLOAT())
- .column("c_int", DataTypes.INT())
- .column("c_bigint", DataTypes.BIGINT())
- .column("c_largeint", DataTypes.STRING())
- .column("c_smallint", DataTypes.SMALLINT())
- .column("c_string", DataTypes.STRING())
- .column("c_tinyint", DataTypes.TINYINT())
- .build();
-
- protected static final TableSchema TABLE_SCHEMA_1 =
+ private static final TableSchema TABLE_SCHEMA =
TableSchema.builder()
.field("id", new AtomicDataType(new VarCharType(false, 128)))
.field("c_boolean", DataTypes.BOOLEAN())
@@ -162,10 +136,10 @@
TableNotExistException, DatabaseNotExistException {
DorisConnectionOptions connectionOptions =
new DorisConnectionOptions.DorisConnectionOptionsBuilder()
- .withFenodes(TEST_FENODES)
- .withJdbcUrl(TEST_JDBCURL)
- .withUsername(TEST_USERNAME)
- .withPassword(TEST_PWD)
+ .withFenodes(getFenodes())
+ .withJdbcUrl(getDorisQueryUrl())
+ .withUsername(getDorisUsername())
+ .withPassword(getDorisPassword())
.build();
Map<String, String> props = new HashMap<>();
@@ -272,7 +246,7 @@
CatalogBaseTable table = catalog.getTable(new ObjectPath(TEST_DB, TEST_TABLE));
Schema actual = table.getUnresolvedSchema();
assertEquals(
- TABLE_SCHEMA_1.getFieldNames(),
+ TABLE_SCHEMA.getFieldNames(),
actual.getColumns().stream().map(Schema.UnresolvedColumn::getName).toArray());
}
@@ -308,7 +282,7 @@
public void testCreateTable() throws TableAlreadyExistException, DatabaseNotExistException {
CatalogTableImpl catalogTable =
new CatalogTableImpl(
- TABLE_SCHEMA_1,
+ TABLE_SCHEMA,
new HashMap<String, String>() {
{
put("connector", "doris-1");
@@ -425,7 +399,7 @@
private static CatalogTable createTable() {
return new CatalogTableImpl(
- TABLE_SCHEMA_1,
+ TABLE_SCHEMA,
new HashMap<String, String>() {
{
put("connector", "doris");
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/container/AbstractContainerTestBase.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/container/AbstractContainerTestBase.java
new file mode 100644
index 0000000..967e6f3
--- /dev/null
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/container/AbstractContainerTestBase.java
@@ -0,0 +1,91 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.flink.container;
+
+import org.apache.doris.flink.container.instance.ContainerService;
+import org.apache.doris.flink.container.instance.DorisContainer;
+import org.junit.BeforeClass;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Connection;
+import java.util.Objects;
+
+public abstract class AbstractContainerTestBase {
+ private static final Logger LOG = LoggerFactory.getLogger(AbstractContainerTestBase.class);
+ private static ContainerService dorisContainerService;
+
+ @BeforeClass
+ public static void initContainers() {
+ LOG.info("Trying to start doris containers.");
+ initDorisContainer();
+ }
+
+ private static void initDorisContainer() {
+ if (Objects.nonNull(dorisContainerService) && dorisContainerService.isRunning()) {
+ LOG.info("The doris container has been started and is running status.");
+ return;
+ }
+ dorisContainerService = new DorisContainer();
+ dorisContainerService.startContainer();
+ LOG.info("Doris container was started.");
+ }
+
+ protected static Connection getDorisQueryConnection() {
+ return dorisContainerService.getQueryConnection();
+ }
+
+ protected String getFenodes() {
+ return dorisContainerService.getFenodes();
+ }
+
+ protected String getBenodes() {
+ return dorisContainerService.getBenodes();
+ }
+
+ protected String getDorisUsername() {
+ return dorisContainerService.getUsername();
+ }
+
+ protected String getDorisPassword() {
+ return dorisContainerService.getPassword();
+ }
+
+ protected String getDorisQueryUrl() {
+ return String.format(
+ "jdbc:mysql://%s:%s",
+ getDorisInstanceHost(), dorisContainerService.getMappedPort(9030));
+ }
+
+ protected String getDorisInstanceHost() {
+ return dorisContainerService.getInstanceHost();
+ }
+
+ public static void closeContainers() {
+ LOG.info("Starting to close containers.");
+ closeDorisContainer();
+ }
+
+ private static void closeDorisContainer() {
+ if (Objects.isNull(dorisContainerService)) {
+ return;
+ }
+ dorisContainerService.close();
+ LOG.info("Doris container was closed.");
+ }
+}
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/container/AbstractE2EService.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/container/AbstractE2EService.java
new file mode 100644
index 0000000..527f82c
--- /dev/null
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/container/AbstractE2EService.java
@@ -0,0 +1,154 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.flink.container;
+
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+
+import org.apache.doris.flink.container.instance.ContainerService;
+import org.apache.doris.flink.container.instance.MySQLContainer;
+import org.apache.doris.flink.exception.DorisRuntimeException;
+import org.apache.doris.flink.tools.cdc.CdcTools;
+import org.apache.doris.flink.tools.cdc.DatabaseSyncConfig;
+import org.junit.BeforeClass;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Connection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.Semaphore;
+
+public abstract class AbstractE2EService extends AbstractContainerTestBase {
+ private static final Logger LOG = LoggerFactory.getLogger(AbstractE2EService.class);
+ private static ContainerService mysqlContainerService;
+ private static JobClient jobClient;
+ protected static final Semaphore SEMAPHORE = new Semaphore(1);
+ protected static final String SINK_CONF = "--" + DatabaseSyncConfig.SINK_CONF;
+ protected static final String DORIS_DATABASE = "--database";
+ protected static final String HOSTNAME = "hostname";
+ protected static final String PORT = "port";
+ protected static final String USERNAME = "username";
+ protected static final String PASSWORD = "password";
+ protected static final String DATABASE_NAME = "database-name";
+ protected static final String FENODES = "fenodes";
+ protected static final String JDBC_URL = "jdbc-url";
+ protected static final String SINK_LABEL_PREFIX = "sink.label-prefix";
+
+ @BeforeClass
+ public static void initE2EContainers() {
+ LOG.info("Trying to Start init E2E containers.");
+ initMySQLContainer();
+ }
+
+ private static void initMySQLContainer() {
+ if (Objects.nonNull(mysqlContainerService) && mysqlContainerService.isRunning()) {
+ LOG.info("The MySQL container has been started and is running status.");
+ return;
+ }
+ mysqlContainerService = new MySQLContainer();
+ mysqlContainerService.startContainer();
+ LOG.info("Mysql container was started.");
+ }
+
+ protected String getMySQLInstanceHost() {
+ return mysqlContainerService.getInstanceHost();
+ }
+
+ protected Integer getMySQLQueryPort() {
+ return mysqlContainerService.getMappedPort(3306);
+ }
+
+ protected String getMySQLUsername() {
+ return mysqlContainerService.getUsername();
+ }
+
+ protected String getMySQLPassword() {
+ return mysqlContainerService.getPassword();
+ }
+
+ protected Connection getMySQLQueryConnection() {
+ return mysqlContainerService.getQueryConnection();
+ }
+
+ protected void submitE2EJob(String jobName, String[] args) {
+ try {
+ LOG.info("{} e2e job will submit to start. ", jobName);
+ CdcTools.setStreamExecutionEnvironmentForTesting(configFlinkEnvironment());
+ CdcTools.main(args);
+ jobClient = CdcTools.getJobClient();
+ if (Objects.isNull(jobClient)) {
+ LOG.warn("Failed get flink job client. jobName={}", jobName);
+ throw new DorisRuntimeException("Failed get flink job client. jobName=" + jobName);
+ }
+ } catch (Exception e) {
+ LOG.warn("Failed to submit e2e job. jobName={}", jobName);
+ throw new DorisRuntimeException(e);
+ }
+ }
+
+ protected void cancelE2EJob(String jobName) {
+ LOG.info("{} e2e job will cancel", jobName);
+ jobClient.cancel();
+ }
+
+ private StreamExecutionEnvironment configFlinkEnvironment() {
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ Map<String, String> flinkMap = new HashMap<>();
+ flinkMap.put("execution.checkpointing.interval", "10s");
+ flinkMap.put("pipeline.operator-chaining", "false");
+ flinkMap.put("parallelism.default", "1");
+ Configuration configuration = Configuration.fromMap(flinkMap);
+ env.configure(configuration);
+ env.setRestartStrategy(RestartStrategies.noRestart());
+ return env;
+ }
+
+ protected void setSinkConfDefaultConfig(List<String> argList) {
+ // set default doris sink config
+ argList.add(SINK_CONF);
+ argList.add(FENODES + "=" + getFenodes());
+ argList.add(SINK_CONF);
+ argList.add(USERNAME + "=" + getDorisUsername());
+ argList.add(SINK_CONF);
+ argList.add(PASSWORD + "=" + getDorisPassword());
+ argList.add(SINK_CONF);
+ argList.add(FENODES + "=" + getFenodes());
+ argList.add(SINK_CONF);
+ argList.add(JDBC_URL + "=" + getDorisQueryUrl());
+ argList.add(SINK_CONF);
+ argList.add(SINK_LABEL_PREFIX + "=" + "label");
+ }
+
+ public static void closeE2EContainers() {
+ LOG.info("Starting to close E2E containers.");
+ closeMySQLContainer();
+ }
+
+ private static void closeMySQLContainer() {
+ if (Objects.isNull(mysqlContainerService)) {
+ return;
+ }
+ mysqlContainerService.close();
+ LOG.info("Mysql container was closed.");
+ }
+}
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/container/AbstractITCaseService.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/container/AbstractITCaseService.java
new file mode 100644
index 0000000..956b8be
--- /dev/null
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/container/AbstractITCaseService.java
@@ -0,0 +1,141 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.flink.container;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.api.common.time.Deadline;
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.runtime.highavailability.nonha.embedded.HaLeadershipControl;
+import org.apache.flink.runtime.minicluster.MiniCluster;
+import org.apache.flink.runtime.minicluster.RpcServiceSharing;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.test.util.MiniClusterWithClientResource;
+import org.apache.flink.util.function.SupplierWithException;
+
+import org.junit.Rule;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.concurrent.TimeoutException;
+
+public abstract class AbstractITCaseService extends AbstractContainerTestBase {
+ private static final Logger LOG = LoggerFactory.getLogger(AbstractITCaseService.class);
+
+ protected static void waitForJobStatus(
+ JobClient client, List<JobStatus> expectedStatus, Deadline deadline) throws Exception {
+ waitUntilCondition(
+ () -> {
+ JobStatus currentStatus = (JobStatus) client.getJobStatus().get();
+ if (expectedStatus.contains(currentStatus)) {
+ return true;
+ } else if (currentStatus.isTerminalState()) {
+ try {
+ client.getJobExecutionResult().get();
+ } catch (Exception var4) {
+ throw new IllegalStateException(
+ String.format(
+ "Job has entered %s state, but expecting %s",
+ currentStatus, expectedStatus),
+ var4);
+ }
+
+ throw new IllegalStateException(
+ String.format(
+ "Job has entered a terminal state %s, but expecting %s",
+ currentStatus, expectedStatus));
+ } else {
+ return false;
+ }
+ },
+ deadline,
+ 100L,
+ "Condition was not met in given timeout.");
+ }
+
+ protected static void waitUntilCondition(
+ SupplierWithException<Boolean, Exception> condition,
+ Deadline timeout,
+ long retryIntervalMillis,
+ String errorMsg)
+ throws Exception {
+ while (timeout.hasTimeLeft() && !(Boolean) condition.get()) {
+ long timeLeft = Math.max(0L, timeout.timeLeft().toMillis());
+ Thread.sleep(Math.min(retryIntervalMillis, timeLeft));
+ }
+
+ if (!timeout.hasTimeLeft()) {
+ throw new TimeoutException(errorMsg);
+ }
+ }
+
+ @Rule
+ public final MiniClusterWithClientResource miniClusterResource =
+ new MiniClusterWithClientResource(
+ new MiniClusterResourceConfiguration.Builder()
+ .setNumberTaskManagers(1)
+ .setNumberSlotsPerTaskManager(2)
+ .setRpcServiceSharing(RpcServiceSharing.DEDICATED)
+ .withHaLeadershipControl()
+ .build());
+
+ /** The type of failover. */
+ protected enum FailoverType {
+ TM,
+ JM,
+ NONE
+ }
+
+ protected static void triggerFailover(
+ FailoverType type, JobID jobId, MiniCluster miniCluster, Runnable afterFailAction)
+ throws Exception {
+ LOG.info("Will job trigger failover. type={}, jobId={}", type, jobId);
+ switch (type) {
+ case TM:
+ restartTaskManager(miniCluster, afterFailAction);
+ break;
+ case JM:
+ triggerJobManagerFailover(jobId, miniCluster, afterFailAction);
+ break;
+ case NONE:
+ break;
+ default:
+ throw new IllegalStateException("Unexpected value: " + type);
+ }
+ }
+
+ protected static void restartTaskManager(MiniCluster miniCluster, Runnable afterFailAction)
+ throws Exception {
+ LOG.info("flink cluster will terminate task manager.");
+ miniCluster.terminateTaskManager(0).get();
+ afterFailAction.run();
+ LOG.info("flink cluster will start task manager.");
+ miniCluster.startTaskManager();
+ }
+
+ protected static void triggerJobManagerFailover(
+ JobID jobId, MiniCluster miniCluster, Runnable afterFailAction) throws Exception {
+ LOG.info("flink cluster will revoke job master leadership. jobId={}", jobId);
+ final HaLeadershipControl haLeadershipControl = miniCluster.getHaLeadershipControl().get();
+ haLeadershipControl.revokeJobMasterLeadership(jobId).get();
+ afterFailAction.run();
+ LOG.info("flink cluster will grant job master leadership. jobId={}", jobId);
+ haLeadershipControl.grantJobMasterLeadership(jobId).get();
+ }
+}
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/container/ContainerUtils.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/container/ContainerUtils.java
new file mode 100644
index 0000000..e4c99d5
--- /dev/null
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/container/ContainerUtils.java
@@ -0,0 +1,136 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.flink.container;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.doris.flink.exception.DorisRuntimeException;
+import org.junit.Assert;
+import org.slf4j.Logger;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+public class ContainerUtils {
+
+ public static void executeSQLStatement(Connection connection, Logger logger, String... sql) {
+ if (Objects.isNull(sql) || sql.length == 0) {
+ return;
+ }
+ try (Statement statement = connection.createStatement()) {
+ for (String s : sql) {
+ if (StringUtils.isNotEmpty(s)) {
+ logger.info("start to execute sql={}", s);
+ statement.execute(s);
+ }
+ }
+ } catch (SQLException e) {
+ throw new DorisRuntimeException(e);
+ }
+ }
+
+ public static String loadFileContent(String resourcePath) {
+ try (InputStream stream =
+ ContainerUtils.class.getClassLoader().getResourceAsStream(resourcePath)) {
+ return new BufferedReader(new InputStreamReader(Objects.requireNonNull(stream)))
+ .lines()
+ .collect(Collectors.joining("\n"));
+ } catch (IOException e) {
+ throw new DorisRuntimeException("Failed to read " + resourcePath + " file.", e);
+ }
+ }
+
+ public static List<String> parseFileArgs(String resourcePath) {
+ String fileContent = ContainerUtils.loadFileContent(resourcePath);
+ String[] args = fileContent.split("\n");
+ List<String> argList = new ArrayList<>();
+ for (String arg : args) {
+ String[] split = arg.trim().split("\\s+");
+ List<String> stringList =
+ Arrays.stream(split)
+ .map(ContainerUtils::removeQuotes)
+ .collect(Collectors.toList());
+ argList.addAll(stringList);
+ }
+ return argList;
+ }
+
+ private static String removeQuotes(String str) {
+ if (str == null || str.length() < 2) {
+ return str;
+ }
+ if (str.startsWith("\"") && str.endsWith("\"")) {
+ return str.substring(1, str.length() - 1);
+ }
+ if (str.startsWith("\\'") && str.endsWith("\\'")) {
+ return str.substring(1, str.length() - 1);
+ }
+ return str;
+ }
+
+ public static String[] parseFileContentSQL(String resourcePath) {
+ String fileContent = loadFileContent(resourcePath);
+ return Arrays.stream(fileContent.split(";")).map(String::trim).toArray(String[]::new);
+ }
+
+ public static void checkResult(
+ Connection connection,
+ Logger logger,
+ List<String> expected,
+ String query,
+ int columnSize) {
+ List<String> actual = new ArrayList<>();
+ try (Statement statement = connection.createStatement()) {
+ ResultSet sinkResultSet = statement.executeQuery(query);
+ while (sinkResultSet.next()) {
+ List<String> row = new ArrayList<>();
+ for (int i = 1; i <= columnSize; i++) {
+ Object value = sinkResultSet.getObject(i);
+ if (value == null) {
+ row.add("null");
+ } else {
+ row.add(value.toString());
+ }
+ }
+ actual.add(StringUtils.join(row, ","));
+ }
+ } catch (SQLException e) {
+ logger.info(
+ "Failed to check query result. expected={}, actual={}",
+ String.join(",", expected),
+ String.join(",", actual),
+ e);
+ throw new DorisRuntimeException(e);
+ }
+ logger.info(
+ "checking test result. expected={}, actual={}",
+ String.join(",", expected),
+ String.join(",", actual));
+ Assert.assertArrayEquals(expected.toArray(), actual.toArray());
+ }
+}
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/container/e2e/Doris2DorisE2ECase.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/container/e2e/Doris2DorisE2ECase.java
new file mode 100644
index 0000000..fcb4858
--- /dev/null
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/container/e2e/Doris2DorisE2ECase.java
@@ -0,0 +1,176 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.flink.container.e2e;
+
+import org.apache.flink.api.common.RuntimeExecutionMode;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.TableResult;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CloseableIterator;
+
+import org.apache.doris.flink.container.AbstractE2EService;
+import org.apache.doris.flink.container.ContainerUtils;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+
+public class Doris2DorisE2ECase extends AbstractE2EService {
+ private static final Logger LOG = LoggerFactory.getLogger(Doris2DorisE2ECase.class);
+ private static final String DATABASE_SOURCE = "test_doris2doris_source";
+ private static final String DATABASE_SINK = "test_doris2doris_sink";
+ private static final String TABLE = "test_tbl";
+
+ @Before
+ public void setUp() throws InterruptedException {
+ LOG.info("Doris2DorisE2ECase attempting to acquire semaphore.");
+ SEMAPHORE.acquire();
+ LOG.info("Doris2DorisE2ECase semaphore acquired.");
+ }
+
+ @Test
+ public void testDoris2Doris() throws Exception {
+ LOG.info("Start executing the test case of doris to doris.");
+ initializeDorisTable();
+ final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(1);
+ env.setRuntimeMode(RuntimeExecutionMode.BATCH);
+ final StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
+
+ String sourceDDL =
+ String.format(
+ "CREATE TABLE doris_source ("
+ + "id int,\n"
+ + "c1 boolean,\n"
+ + "c2 tinyint,\n"
+ + "c3 smallint,\n"
+ + "c4 int, \n"
+ + "c5 bigint, \n"
+ + "c6 string, \n"
+ + "c7 float, \n"
+ + "c8 double, \n"
+ + "c9 decimal(12,4), \n"
+ + "c10 date, \n"
+ + "c11 TIMESTAMP, \n"
+ + "c12 char(1), \n"
+ + "c13 varchar(256), \n"
+ + "c14 Array<String>, \n"
+ + "c15 Map<String, String>, \n"
+ + "c16 ROW<name String, age int>, \n"
+ + "c17 STRING \n"
+ + ") WITH ("
+ + " 'connector' = 'doris',"
+ + " 'fenodes' = '%s',"
+ + " 'table.identifier' = '%s',"
+ + " 'sink.label-prefix' = '"
+ + UUID.randomUUID()
+ + "',"
+ + " 'username' = '%s',"
+ + " 'password' = '%s'"
+ + ")",
+ getFenodes(),
+ DATABASE_SOURCE + "." + TABLE,
+ getDorisUsername(),
+ getDorisPassword());
+ tEnv.executeSql(sourceDDL);
+
+ String sinkDDL =
+ String.format(
+ "CREATE TABLE doris_sink ("
+ + "id int,\n"
+ + "c1 boolean,\n"
+ + "c2 tinyint,\n"
+ + "c3 smallint,\n"
+ + "c4 int, \n"
+ + "c5 bigint, \n"
+ + "c6 string, \n"
+ + "c7 float, \n"
+ + "c8 double, \n"
+ + "c9 decimal(12,4), \n"
+ + "c10 date, \n"
+ + "c11 TIMESTAMP, \n"
+ + "c12 char(1), \n"
+ + "c13 varchar(256), \n"
+ + "c14 Array<String>, \n"
+ + "c15 Map<String, String>, \n"
+ + "c16 ROW<name String, age int>, \n"
+ + "c17 STRING \n"
+ + ") WITH ("
+ + " 'connector' = 'doris',"
+ + " 'fenodes' = '%s',"
+ + " 'sink.label-prefix' = '"
+ + UUID.randomUUID()
+ + "',"
+ + " 'table.identifier' = '%s',"
+ + " 'username' = '%s',"
+ + " 'password' = '%s'"
+ + ")",
+ getFenodes(),
+ DATABASE_SINK + "." + TABLE,
+ getDorisUsername(),
+ getDorisPassword());
+ tEnv.executeSql(sinkDDL);
+
+ tEnv.executeSql("INSERT INTO doris_sink SELECT * FROM doris_source").await();
+
+ TableResult tableResult = tEnv.executeSql("SELECT * FROM doris_sink");
+ List<Object> actual = new ArrayList<>();
+ try (CloseableIterator<Row> iterator = tableResult.collect()) {
+ while (iterator.hasNext()) {
+ actual.add(iterator.next().toString());
+ }
+ }
+ LOG.info("The actual data in the doris sink table is, actual={}", actual);
+
+ String[] expected =
+ new String[] {
+ "+I[1, true, 127, 32767, 2147483647, 9223372036854775807, 123456789012345678901234567890, 3.14, 2.7182818284, 12345.6789, 2023-05-22, 2023-05-22T12:34:56, A, Example text, [item1, item2, item3], {key1=value1, key2=value2}, +I[John Doe, 30], {\"key\":\"value\"}]",
+ "+I[2, false, -128, -32768, -2147483648, -9223372036854775808, -123456789012345678901234567890, -3.14, -2.7182818284, -12345.6789, 2024-01-01, 2024-01-01T00:00, B, Another example, [item4, item5, item6], {key3=value3, key4=value4}, +I[Jane Doe, 25], {\"another_key\":\"another_value\"}]"
+ };
+ Assert.assertArrayEquals(expected, actual.toArray(new String[0]));
+ }
+
+ private void initializeDorisTable() {
+ String[] sourceInitSql =
+ ContainerUtils.parseFileContentSQL(
+ "container/e2e/doris2doris/test_doris2doris_source_test_tbl.sql");
+ ContainerUtils.executeSQLStatement(getDorisQueryConnection(), LOG, sourceInitSql);
+ String[] sinkInitSql =
+ ContainerUtils.parseFileContentSQL(
+ "container/e2e/doris2doris/test_doris2doris_sink_test_tbl.sql");
+ ContainerUtils.executeSQLStatement(getDorisQueryConnection(), LOG, sinkInitSql);
+ LOG.info("Initialization of doris table successful.");
+ }
+
+ @After
+ public void close() {
+ try {
+ // Ensure that semaphore is always released
+ } finally {
+ LOG.info("Doris2DorisE2ECase releasing semaphore.");
+ SEMAPHORE.release();
+ }
+ }
+}
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/container/e2e/Mysql2DorisE2ECase.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/container/e2e/Mysql2DorisE2ECase.java
new file mode 100644
index 0000000..68b5d43
--- /dev/null
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/container/e2e/Mysql2DorisE2ECase.java
@@ -0,0 +1,391 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.flink.container.e2e;
+
+import org.apache.doris.flink.container.AbstractE2EService;
+import org.apache.doris.flink.container.ContainerUtils;
+import org.apache.doris.flink.tools.cdc.DatabaseSyncConfig;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.List;
+
+public class Mysql2DorisE2ECase extends AbstractE2EService {
+ private static final Logger LOG = LoggerFactory.getLogger(Mysql2DorisE2ECase.class);
+ private static final String DATABASE = "test_e2e_mysql";
+ private static final String CREATE_DATABASE = "CREATE DATABASE IF NOT EXISTS " + DATABASE;
+ private static final String MYSQL_CONF = "--" + DatabaseSyncConfig.MYSQL_CONF;
+
+ @Before
+ public void setUp() throws InterruptedException {
+ LOG.info("Mysql2DorisE2ECase attempting to acquire semaphore.");
+ SEMAPHORE.acquire();
+ LOG.info("Mysql2DorisE2ECase semaphore acquired.");
+ }
+
+ private List<String> setMysql2DorisDefaultConfig(List<String> argList) {
+ // set default mysql config
+ argList.add(MYSQL_CONF);
+ argList.add(HOSTNAME + "=" + getMySQLInstanceHost());
+ argList.add(MYSQL_CONF);
+ argList.add(PORT + "=" + getMySQLQueryPort());
+ argList.add(MYSQL_CONF);
+ argList.add(USERNAME + "=" + getMySQLUsername());
+ argList.add(MYSQL_CONF);
+ argList.add(PASSWORD + "=" + getMySQLPassword());
+ argList.add(MYSQL_CONF);
+ argList.add(DATABASE_NAME + "=" + DATABASE);
+
+ // set doris database
+ argList.add(DORIS_DATABASE);
+ argList.add(DATABASE);
+ setSinkConfDefaultConfig(argList);
+ return argList;
+ }
+
+ private void startMysql2DorisJob(String jobName, String resourcePath) {
+ LOG.info("start a mysql to doris job. jobName={}, resourcePath={}", jobName, resourcePath);
+ List<String> argList = ContainerUtils.parseFileArgs(resourcePath);
+ String[] args = setMysql2DorisDefaultConfig(argList).toArray(new String[0]);
+ submitE2EJob(jobName, args);
+ }
+
+ private void initMysqlEnvironment(String sourcePath) {
+ LOG.info("Initializing MySQL environment.");
+ ContainerUtils.executeSQLStatement(
+ getMySQLQueryConnection(), LOG, ContainerUtils.parseFileContentSQL(sourcePath));
+ }
+
+ private void initDorisEnvironment() {
+ LOG.info("Initializing Doris environment.");
+ ContainerUtils.executeSQLStatement(getDorisQueryConnection(), LOG, CREATE_DATABASE);
+ ContainerUtils.executeSQLStatement(
+ getDorisQueryConnection(),
+ LOG,
+ "DROP TABLE IF EXISTS test_e2e_mysql.tbl1",
+ "DROP TABLE IF EXISTS test_e2e_mysql.tbl2",
+ "DROP TABLE IF EXISTS test_e2e_mysql.tbl3",
+ "DROP TABLE IF EXISTS test_e2e_mysql.tbl4",
+ "DROP TABLE IF EXISTS test_e2e_mysql.tbl5");
+ }
+
+ private void initEnvironment(String jobName, String mysqlSourcePath) {
+ LOG.info(
+ "start to init mysql to doris environment. jobName={}, mysqlSourcePath={}",
+ jobName,
+ mysqlSourcePath);
+ initMysqlEnvironment(mysqlSourcePath);
+ initDorisEnvironment();
+ }
+
+ @Test
+ public void testMySQL2Doris() throws Exception {
+ String jobName = "testMySQL2Doris";
+ String resourcePath = "container/e2e/mysql2doris/testMySQL2Doris.txt";
+ initEnvironment(jobName, "container/e2e/mysql2doris/testMySQL2Doris_init.sql");
+ startMysql2DorisJob(jobName, resourcePath);
+
+ // wait 2 times checkpoint
+ Thread.sleep(20000);
+ LOG.info("Start to verify init result.");
+ List<String> expected = Arrays.asList("doris_1,1", "doris_2,2", "doris_3,3", "doris_5,5");
+ String sql1 =
+ "select * from ( select * from test_e2e_mysql.tbl1 union all select * from test_e2e_mysql.tbl2 union all select * from test_e2e_mysql.tbl3 union all select * from test_e2e_mysql.tbl5) res order by 1";
+ ContainerUtils.checkResult(getDorisQueryConnection(), LOG, expected, sql1, 2);
+
+ // add incremental data
+ ContainerUtils.executeSQLStatement(
+ getMySQLQueryConnection(),
+ LOG,
+ "insert into test_e2e_mysql.tbl1 values ('doris_1_1',10)",
+ "insert into test_e2e_mysql.tbl2 values ('doris_2_1',11)",
+ "insert into test_e2e_mysql.tbl3 values ('doris_3_1',12)",
+ "update test_e2e_mysql.tbl1 set age=18 where name='doris_1'",
+ "delete from test_e2e_mysql.tbl2 where name='doris_2'");
+ Thread.sleep(20000);
+
+ LOG.info("Start to verify incremental data result.");
+ List<String> expected2 =
+ Arrays.asList(
+ "doris_1,18", "doris_1_1,10", "doris_2_1,11", "doris_3,3", "doris_3_1,12");
+ String sql2 =
+ "select * from ( select * from test_e2e_mysql.tbl1 union all select * from test_e2e_mysql.tbl2 union all select * from test_e2e_mysql.tbl3 ) res order by 1";
+ ContainerUtils.checkResult(getDorisQueryConnection(), LOG, expected2, sql2, 2);
+
+ // mock schema change
+ LOG.info("start to schema change in mysql.");
+ ContainerUtils.executeSQLStatement(
+ getMySQLQueryConnection(),
+ LOG,
+ "alter table test_e2e_mysql.tbl1 add column c1 varchar(128)",
+ "alter table test_e2e_mysql.tbl1 drop column age");
+ Thread.sleep(10000);
+ ContainerUtils.executeSQLStatement(
+ getMySQLQueryConnection(),
+ LOG,
+ "insert into test_e2e_mysql.tbl1 values ('doris_1_1_1','c1_val')");
+ Thread.sleep(20000);
+ LOG.info("verify tal1 schema change.");
+ List<String> schemaChangeExpected =
+ Arrays.asList("doris_1,null", "doris_1_1,null", "doris_1_1_1,c1_val");
+ String schemaChangeSql = "select * from test_e2e_mysql.tbl1 order by 1";
+ ContainerUtils.checkResult(
+ getDorisQueryConnection(), LOG, schemaChangeExpected, schemaChangeSql, 2);
+ cancelE2EJob(jobName);
+ }
+
+ @Test
+ public void testAutoAddTable() throws InterruptedException {
+ String jobName = "testAutoAddTable";
+ initEnvironment(jobName, "container/e2e/mysql2doris/testAutoAddTable_init.sql");
+ startMysql2DorisJob(jobName, "container/e2e/mysql2doris/testAutoAddTable.txt");
+
+ // wait 2 times checkpoint
+ Thread.sleep(20000);
+ LOG.info("Start to verify init result.");
+ List<String> expected = Arrays.asList("doris_1,1", "doris_2,2", "doris_3,3", "doris_5,5");
+ String sql1 =
+ "select * from ( select * from test_e2e_mysql.tbl1 union all select * from test_e2e_mysql.tbl2 union all select * from test_e2e_mysql.tbl3 union all select * from test_e2e_mysql.tbl5) res order by 1";
+ ContainerUtils.checkResult(getDorisQueryConnection(), LOG, expected, sql1, 2);
+
+ // auto add table
+ LOG.info("starting to create auto_add table.");
+ ContainerUtils.executeSQLStatement(
+ getMySQLQueryConnection(),
+ LOG,
+ "CREATE TABLE test_e2e_mysql.auto_add ( \n"
+ + "`name` varchar(256) primary key,\n"
+ + "`age` int\n"
+ + ")",
+ "insert into test_e2e_mysql.auto_add values ('doris_4_1',4)",
+ "insert into test_e2e_mysql.auto_add values ('doris_4_2',4)");
+ Thread.sleep(20000);
+ List<String> autoAddResult = Arrays.asList("doris_4_1,4", "doris_4_2,4");
+ String autoAddSql = "select * from test_e2e_mysql.auto_add order by 1";
+ ContainerUtils.checkResult(getDorisQueryConnection(), LOG, autoAddResult, autoAddSql, 2);
+
+ // incremental data
+ LOG.info("starting to increment data.");
+ ContainerUtils.executeSQLStatement(
+ getMySQLQueryConnection(),
+ LOG,
+ "insert into test_e2e_mysql.tbl1 values ('doris_1_1',10)",
+ "insert into test_e2e_mysql.tbl2 values ('doris_2_1',11)",
+ "insert into test_e2e_mysql.tbl3 values ('doris_3_1',12)",
+ "update test_e2e_mysql.tbl1 set age=18 where name='doris_1'",
+ "delete from test_e2e_mysql.tbl2 where name='doris_2'",
+ "insert into test_e2e_mysql.auto_add values ('doris_4_3',43)",
+ "delete from test_e2e_mysql.auto_add where name='doris_4_2'",
+ "update test_e2e_mysql.auto_add set age=41 where name='doris_4_1'");
+ Thread.sleep(20000);
+ List<String> incrementDataExpected =
+ Arrays.asList(
+ "doris_1,18",
+ "doris_1_1,10",
+ "doris_2_1,11",
+ "doris_3,3",
+ "doris_3_1,12",
+ "doris_4_1,41",
+ "doris_4_3,43");
+ String incrementDataSql =
+ "select * from ( select * from test_e2e_mysql.tbl1 union all select * from test_e2e_mysql.tbl2 union all select * from test_e2e_mysql.tbl3 union all select * from test_e2e_mysql.auto_add) res order by 1";
+ ContainerUtils.checkResult(
+ getDorisQueryConnection(), LOG, incrementDataExpected, incrementDataSql, 2);
+
+ // schema change
+ LOG.info("starting to mock schema change.");
+ ContainerUtils.executeSQLStatement(
+ getMySQLQueryConnection(),
+ LOG,
+ "alter table test_e2e_mysql.auto_add add column c1 varchar(128)",
+ "alter table test_e2e_mysql.auto_add drop column age",
+ "insert into test_e2e_mysql.auto_add values ('doris_4_4','c1_val')");
+ Thread.sleep(20000);
+ List<String> schemaChangeExpected =
+ Arrays.asList("doris_4_1,null", "doris_4_3,null", "doris_4_4,c1_val");
+ String schemaChangeSql = "select * from test_e2e_mysql.auto_add order by 1";
+ ContainerUtils.checkResult(
+ getDorisQueryConnection(), LOG, schemaChangeExpected, schemaChangeSql, 2);
+ cancelE2EJob(jobName);
+ }
+
+ @Test
+ public void testMySQL2DorisSQLParse() throws Exception {
+ String jobName = "testMySQL2DorisSQLParse";
+ String resourcePath = "container/e2e/mysql2doris/testMySQL2DorisSQLParse.txt";
+ initEnvironment(jobName, "container/e2e/mysql2doris/testMySQL2DorisSQLParse_init.sql");
+ startMysql2DorisJob(jobName, resourcePath);
+
+ // wait 2 times checkpoint
+ Thread.sleep(20000);
+ LOG.info("Start to verify init result.");
+ List<String> expected = Arrays.asList("doris_1,1", "doris_2,2", "doris_3,3", "doris_5,5");
+ String sql1 =
+ "select * from ( select * from test_e2e_mysql.tbl1 union all select * from test_e2e_mysql.tbl2 union all select * from test_e2e_mysql.tbl3 union all select * from test_e2e_mysql.tbl5) res order by 1";
+ ContainerUtils.checkResult(getDorisQueryConnection(), LOG, expected, sql1, 2);
+
+ // add incremental data
+ ContainerUtils.executeSQLStatement(
+ getMySQLQueryConnection(),
+ LOG,
+ "insert into test_e2e_mysql.tbl1 values ('doris_1_1',10)",
+ "insert into test_e2e_mysql.tbl2 values ('doris_2_1',11)",
+ "insert into test_e2e_mysql.tbl3 values ('doris_3_1',12)",
+ "update test_e2e_mysql.tbl1 set age=18 where name='doris_1'",
+ "delete from test_e2e_mysql.tbl2 where name='doris_2'");
+ Thread.sleep(20000);
+
+ LOG.info("Start to verify incremental data result.");
+ List<String> expected2 =
+ Arrays.asList(
+ "doris_1,18", "doris_1_1,10", "doris_2_1,11", "doris_3,3", "doris_3_1,12");
+ String sql2 =
+ "select * from ( select * from test_e2e_mysql.tbl1 union all select * from test_e2e_mysql.tbl2 union all select * from test_e2e_mysql.tbl3 ) res order by 1";
+ ContainerUtils.checkResult(getDorisQueryConnection(), LOG, expected2, sql2, 2);
+
+ // mock schema change
+ ContainerUtils.executeSQLStatement(
+ getMySQLQueryConnection(),
+ LOG,
+ "alter table test_e2e_mysql.tbl1 add column c1 varchar(128)",
+ "alter table test_e2e_mysql.tbl1 drop column age");
+ Thread.sleep(10000);
+ ContainerUtils.executeSQLStatement(
+ getMySQLQueryConnection(),
+ LOG,
+ "insert into test_e2e_mysql.tbl1 values ('doris_1_1_1','c1_val')");
+ Thread.sleep(20000);
+ LOG.info("verify tal1 schema change.");
+ List<String> schemaChangeExpected =
+ Arrays.asList("doris_1,null", "doris_1_1,null", "doris_1_1_1,c1_val");
+ String schemaChangeSql = "select * from test_e2e_mysql.tbl1 order by 1";
+ ContainerUtils.checkResult(
+ getDorisQueryConnection(), LOG, schemaChangeExpected, schemaChangeSql, 2);
+
+ // mock create table
+ LOG.info("start to create table in mysql.");
+ ContainerUtils.executeSQLStatement(
+ getMySQLQueryConnection(),
+ LOG,
+ "CREATE TABLE test_e2e_mysql.add_tbl (\n"
+ + " `name` varchar(256) primary key,\n"
+ + " `age` int\n"
+ + ");",
+ "insert into test_e2e_mysql.add_tbl values ('doris_1',1)",
+ "insert into test_e2e_mysql.add_tbl values ('doris_2',2)",
+ "insert into test_e2e_mysql.add_tbl values ('doris_3',3)");
+ Thread.sleep(20000);
+ List<String> createTableExpected = Arrays.asList("doris_1,1", "doris_2,2", "doris_3,3");
+ String createTableSql = "select * from test_e2e_mysql.add_tbl order by 1";
+ ContainerUtils.checkResult(
+ getDorisQueryConnection(), LOG, createTableExpected, createTableSql, 2);
+ cancelE2EJob(jobName);
+ }
+
+ @Test
+ public void testMySQL2DorisByDefault() throws Exception {
+ String jobName = "testMySQL2DorisByDefault";
+ initEnvironment(jobName, "container/e2e/mysql2doris/testMySQL2DorisByDefault_init.sql");
+ startMysql2DorisJob(jobName, "container/e2e/mysql2doris/testMySQL2DorisByDefault.txt");
+
+ // wait 2 times checkpoint
+ Thread.sleep(20000);
+ LOG.info("Start to verify init result.");
+ List<String> expected = Arrays.asList("doris_1,1", "doris_2,2", "doris_3,3", "doris_5,5");
+ String sql1 =
+ "select * from ( select * from test_e2e_mysql.tbl1 union all select * from test_e2e_mysql.tbl2 union all select * from test_e2e_mysql.tbl3 union all select * from test_e2e_mysql.tbl5) res order by 1";
+ ContainerUtils.checkResult(getDorisQueryConnection(), LOG, expected, sql1, 2);
+
+ // add incremental data
+ ContainerUtils.executeSQLStatement(
+ getMySQLQueryConnection(),
+ LOG,
+ "insert into test_e2e_mysql.tbl1 values ('doris_1_1',10)",
+ "insert into test_e2e_mysql.tbl2 values ('doris_2_1',11)",
+ "insert into test_e2e_mysql.tbl3 values ('doris_3_1',12)",
+ "update test_e2e_mysql.tbl1 set age=18 where name='doris_1'",
+ "delete from test_e2e_mysql.tbl2 where name='doris_2'");
+ Thread.sleep(20000);
+
+ LOG.info("Start to verify incremental data result.");
+ List<String> expected2 =
+ Arrays.asList(
+ "doris_1,18", "doris_1_1,10", "doris_2_1,11", "doris_3,3", "doris_3_1,12");
+ String sql2 =
+ "select * from ( select * from test_e2e_mysql.tbl1 union all select * from test_e2e_mysql.tbl2 union all select * from test_e2e_mysql.tbl3 ) res order by 1";
+ ContainerUtils.checkResult(getDorisQueryConnection(), LOG, expected2, sql2, 2);
+ cancelE2EJob(jobName);
+ }
+
+ @Test
+ public void testMySQL2DorisEnableDelete() throws Exception {
+ String jobName = "testMySQL2DorisEnableDelete";
+ initEnvironment(jobName, "container/e2e/mysql2doris/testMySQL2DorisEnableDelete_init.sql");
+ startMysql2DorisJob(jobName, "container/e2e/mysql2doris/testMySQL2DorisEnableDelete.txt");
+
+ // wait 2 times checkpoint
+ Thread.sleep(20000);
+ LOG.info("Start to verify init result.");
+ List<String> initExpected =
+ Arrays.asList("doris_1,1", "doris_2,2", "doris_3,3", "doris_5,5");
+ String sql1 =
+ "select * from ( select * from test_e2e_mysql.tbl1 union all select * from test_e2e_mysql.tbl2 union all select * from test_e2e_mysql.tbl3 union all select * from test_e2e_mysql.tbl5) res order by 1";
+ ContainerUtils.checkResult(getDorisQueryConnection(), LOG, initExpected, sql1, 2);
+
+ // add incremental data
+ ContainerUtils.executeSQLStatement(
+ getMySQLQueryConnection(),
+ LOG,
+ "insert into test_e2e_mysql.tbl1 values ('doris_1_1',10)",
+ "insert into test_e2e_mysql.tbl2 values ('doris_2_1',11)",
+ "insert into test_e2e_mysql.tbl3 values ('doris_3_1',12)",
+ "update test_e2e_mysql.tbl1 set age=18 where name='doris_1'",
+ "delete from test_e2e_mysql.tbl2 where name='doris_2'",
+ "delete from test_e2e_mysql.tbl3 where name='doris_3'",
+ "delete from test_e2e_mysql.tbl5 where name='doris_5'");
+
+ Thread.sleep(20000);
+ List<String> expected =
+ Arrays.asList(
+ "doris_1,18",
+ "doris_1_1,10",
+ "doris_2,2",
+ "doris_2_1,11",
+ "doris_3,3",
+ "doris_3_1,12",
+ "doris_5,5");
+ String sql =
+ "select * from ( select * from test_e2e_mysql.tbl1 union all select * from test_e2e_mysql.tbl2 union all select * from test_e2e_mysql.tbl3 union all select * from test_e2e_mysql.tbl5) res order by 1";
+ ContainerUtils.checkResult(getDorisQueryConnection(), LOG, expected, sql, 2);
+ cancelE2EJob(jobName);
+ }
+
+ @After
+ public void close() {
+ try {
+ // Ensure that semaphore is always released
+ } finally {
+ LOG.info("Mysql2DorisE2ECase releasing semaphore.");
+ SEMAPHORE.release();
+ }
+ }
+}
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/container/instance/ContainerService.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/container/instance/ContainerService.java
new file mode 100644
index 0000000..6ad1e3c
--- /dev/null
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/container/instance/ContainerService.java
@@ -0,0 +1,49 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.flink.container.instance;
+
+import org.apache.doris.flink.exception.DorisRuntimeException;
+
+import java.sql.Connection;
+
+public interface ContainerService {
+
+ void startContainer();
+
+ boolean isRunning();
+
+ Connection getQueryConnection();
+
+ String getInstanceHost();
+
+ Integer getMappedPort(int originalPort);
+
+ String getUsername();
+
+ String getPassword();
+
+ default String getFenodes() {
+ throw new DorisRuntimeException("Only doris container can implemented.");
+ }
+
+ default String getBenodes() {
+ throw new DorisRuntimeException("Only doris container can implemented.");
+ }
+
+ void close();
+}
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/container/instance/DorisContainer.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/container/instance/DorisContainer.java
new file mode 100644
index 0000000..6af827b
--- /dev/null
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/container/instance/DorisContainer.java
@@ -0,0 +1,234 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.flink.container.instance;
+
+import com.google.common.collect.Lists;
+import org.apache.doris.flink.exception.DorisRuntimeException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.Network;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.utility.DockerLoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.locks.LockSupport;
+
+public class DorisContainer implements ContainerService {
+ private static final Logger LOG = LoggerFactory.getLogger(DorisContainer.class);
+ private static final String DEFAULT_DOCKER_IMAGE = "apache/doris:doris-all-in-one-2.1.0";
+ private static final String DORIS_DOCKER_IMAGE =
+ System.getProperty("image") == null
+ ? DEFAULT_DOCKER_IMAGE
+ : System.getProperty("image");
+ private static final String DRIVER_JAR =
+ "https://repo1.maven.org/maven2/mysql/mysql-connector-java/8.0.16/mysql-connector-java-8.0.16.jar";
+ private static final String JDBC_URL = "jdbc:mysql://%s:9030";
+ private static final String USERNAME = "root";
+ private static final String PASSWORD = "";
+ private final GenericContainer dorisContainer;
+
+ public DorisContainer() {
+ dorisContainer = createDorisContainer();
+ }
+
+ public GenericContainer createDorisContainer() {
+ LOG.info("Will create doris containers.");
+ GenericContainer container =
+ new GenericContainer<>(DORIS_DOCKER_IMAGE)
+ .withNetwork(Network.newNetwork())
+ .withNetworkAliases("DorisContainer")
+ .withPrivilegedMode(true)
+ .withLogConsumer(
+ new Slf4jLogConsumer(
+ DockerLoggerFactory.getLogger(DORIS_DOCKER_IMAGE)))
+ .withExposedPorts(8030, 9030, 8040, 9060);
+
+ container.setPortBindings(
+ Lists.newArrayList(
+ String.format("%s:%s", "8030", "8030"),
+ String.format("%s:%s", "9030", "9030"),
+ String.format("%s:%s", "9060", "9060"),
+ String.format("%s:%s", "8040", "8040")));
+ return container;
+ }
+
+ public void startContainer() {
+ try {
+ LOG.info("Starting doris containers.");
+ // singleton doris container
+ dorisContainer.start();
+ initializeJdbcConnection();
+ printClusterStatus();
+ } catch (Exception ex) {
+ LOG.error("Failed to start containers doris", ex);
+ throw new DorisRuntimeException("Failed to start containers doris", ex);
+ }
+ LOG.info("Doris container started successfully.");
+ }
+
+ @Override
+ public boolean isRunning() {
+ return dorisContainer.isRunning();
+ }
+
+ @Override
+ public Connection getQueryConnection() {
+ LOG.info("Try to get query connection from doris.");
+ String jdbcUrl = String.format(JDBC_URL, dorisContainer.getHost());
+ try {
+ return DriverManager.getConnection(jdbcUrl, USERNAME, PASSWORD);
+ } catch (SQLException e) {
+ LOG.info("Failed to get doris query connection. jdbcUrl={}", jdbcUrl, e);
+ throw new DorisRuntimeException(e);
+ }
+ }
+
+ @Override
+ public String getInstanceHost() {
+ return dorisContainer.getHost();
+ }
+
+ @Override
+ public Integer getMappedPort(int originalPort) {
+ return dorisContainer.getMappedPort(originalPort);
+ }
+
+ @Override
+ public String getUsername() {
+ return USERNAME;
+ }
+
+ @Override
+ public String getPassword() {
+ return PASSWORD;
+ }
+
+ @Override
+ public String getFenodes() {
+ return dorisContainer.getHost() + ":8030";
+ }
+
+ @Override
+ public String getBenodes() {
+ return dorisContainer.getHost() + ":8040";
+ }
+
+ public void close() {
+ LOG.info("Doris container is about to be close.");
+ dorisContainer.close();
+ LOG.info("Doris container closed successfully.");
+ }
+
+ private void initializeJDBCDriver() throws MalformedURLException {
+ URLClassLoader urlClassLoader =
+ new URLClassLoader(
+ new URL[] {new URL(DRIVER_JAR)}, DorisContainer.class.getClassLoader());
+ LOG.info("Try to connect to Doris.");
+ Thread.currentThread().setContextClassLoader(urlClassLoader);
+ }
+
+ private void initializeJdbcConnection() throws Exception {
+ initializeJDBCDriver();
+ try (Connection connection = getQueryConnection();
+ Statement statement = connection.createStatement()) {
+ ResultSet resultSet;
+ do {
+ LOG.info("Waiting for the Backend to start successfully.");
+ resultSet = statement.executeQuery("show backends");
+ } while (!isBeReady(resultSet, Duration.ofSeconds(1L)));
+ }
+ LOG.info("Connected to Doris successfully.");
+ }
+
+ private boolean isBeReady(ResultSet rs, Duration duration) throws SQLException {
+ LockSupport.parkNanos(duration.toNanos());
+ if (rs.next()) {
+ String isAlive = rs.getString("Alive").trim();
+ String totalCap = rs.getString("TotalCapacity").trim();
+ return Boolean.toString(true).equalsIgnoreCase(isAlive)
+ && !"0.000".equalsIgnoreCase(totalCap);
+ }
+ return false;
+ }
+
+ private void printClusterStatus() throws Exception {
+ LOG.info("Current machine IP: {}", dorisContainer.getHost());
+ echo("sh", "-c", "cat /proc/cpuinfo | grep 'cpu cores' | uniq");
+ echo("sh", "-c", "free -h");
+ try (Connection connection =
+ DriverManager.getConnection(
+ String.format(JDBC_URL, dorisContainer.getHost()),
+ USERNAME,
+ PASSWORD);
+ Statement statement = connection.createStatement()) {
+ ResultSet showFrontends = statement.executeQuery("show frontends");
+ LOG.info("Frontends status: {}", convertList(showFrontends));
+ ResultSet showBackends = statement.executeQuery("show backends");
+ LOG.info("Backends status: {}", convertList(showBackends));
+ }
+ }
+
+ private void echo(String... cmd) {
+ try {
+ Process p = Runtime.getRuntime().exec(cmd);
+ InputStream is = p.getInputStream();
+ BufferedReader reader = new BufferedReader(new InputStreamReader(is));
+ String line;
+ while ((line = reader.readLine()) != null) {
+ System.out.println(line);
+ }
+ p.waitFor();
+ is.close();
+ reader.close();
+ p.destroy();
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+
+ private List<Map> convertList(ResultSet rs) throws SQLException {
+ List<Map> list = new ArrayList<>();
+ ResultSetMetaData metaData = rs.getMetaData();
+ int columnCount = metaData.getColumnCount();
+ while (rs.next()) {
+ Map<String, Object> rowData = new HashMap<>();
+ for (int i = 1; i <= columnCount; i++) {
+ rowData.put(metaData.getColumnName(i), rs.getObject(i));
+ }
+ list.add(rowData);
+ }
+ return list;
+ }
+}
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/container/instance/MySQLContainer.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/container/instance/MySQLContainer.java
new file mode 100644
index 0000000..21b30e8
--- /dev/null
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/container/instance/MySQLContainer.java
@@ -0,0 +1,101 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.flink.container.instance;
+
+import org.apache.doris.flink.exception.DorisRuntimeException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.lifecycle.Startables;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.util.stream.Stream;
+
+public class MySQLContainer implements ContainerService {
+ private static final Logger LOG = LoggerFactory.getLogger(MySQLContainer.class);
+ private static final String MYSQL_VERSION = "mysql:8.0";
+ private static final String USERNAME = "root";
+ private static final String PASSWORD = "123456";
+ private final org.testcontainers.containers.MySQLContainer mysqlcontainer;
+
+ public MySQLContainer() {
+ mysqlcontainer = createContainer();
+ }
+
+ private org.testcontainers.containers.MySQLContainer createContainer() {
+ LOG.info("Will create mysql container.");
+ return new org.testcontainers.containers.MySQLContainer(MYSQL_VERSION)
+ .withUsername(USERNAME)
+ .withPassword(PASSWORD);
+ }
+
+ @Override
+ public void startContainer() {
+ LOG.info("Starting MySQL container.");
+ Startables.deepStart(Stream.of(mysqlcontainer)).join();
+ LOG.info("MySQL Container was started.");
+ }
+
+ @Override
+ public boolean isRunning() {
+ return mysqlcontainer.isRunning();
+ }
+
+ @Override
+ public String getInstanceHost() {
+ return mysqlcontainer.getHost();
+ }
+
+ @Override
+ public Integer getMappedPort(int originalPort) {
+ return mysqlcontainer.getMappedPort(originalPort);
+ }
+
+ @Override
+ public String getUsername() {
+ return USERNAME;
+ }
+
+ @Override
+ public String getPassword() {
+ return PASSWORD;
+ }
+
+ @Override
+ public Connection getQueryConnection() {
+ LOG.info("Try to get query connection from mysql.");
+ try {
+ return DriverManager.getConnection(mysqlcontainer.getJdbcUrl(), USERNAME, PASSWORD);
+ } catch (SQLException e) {
+ LOG.warn(
+ "Failed to get mysql container query connection. jdbcUrl={}, user={}",
+ mysqlcontainer.getJdbcUrl(),
+ USERNAME,
+ e);
+ throw new DorisRuntimeException(e);
+ }
+ }
+
+ @Override
+ public void close() {
+ LOG.info("Stopping MySQL container.");
+ mysqlcontainer.stop();
+ LOG.info("MySQL Container was stopped.");
+ }
+}
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/DorisSinkITCase.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/DorisSinkITCase.java
index de0ef04..50bcf6b 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/DorisSinkITCase.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/DorisSinkITCase.java
@@ -26,18 +26,19 @@
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.doris.flink.DorisTestBase;
import org.apache.doris.flink.cfg.DorisExecutionOptions;
import org.apache.doris.flink.cfg.DorisOptions;
import org.apache.doris.flink.cfg.DorisReadOptions;
+import org.apache.doris.flink.container.AbstractITCaseService;
+import org.apache.doris.flink.container.ContainerUtils;
+import org.apache.doris.flink.sink.DorisSink.Builder;
import org.apache.doris.flink.sink.batch.DorisBatchSink;
import org.apache.doris.flink.sink.writer.serializer.SimpleStringSerializer;
import org.apache.doris.flink.utils.MockSource;
import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.sql.Statement;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
@@ -51,7 +52,8 @@
import static org.apache.flink.api.common.JobStatus.RUNNING;
/** DorisSink ITCase with csv and arrow format. */
-public class DorisSinkITCase extends DorisTestBase {
+public class DorisSinkITCase extends AbstractITCaseService {
+ private static final Logger LOG = LoggerFactory.getLogger(DorisSinkITCase.class);
static final String DATABASE = "test_sink";
static final String TABLE_CSV = "tbl_csv";
static final String TABLE_JSON = "tbl_json";
@@ -70,12 +72,20 @@
properties.setProperty("column_separator", ",");
properties.setProperty("line_delimiter", "\n");
properties.setProperty("format", "csv");
- submitJob(TABLE_CSV, properties, new String[] {"doris,1"});
+ DorisExecutionOptions.Builder executionBuilder = DorisExecutionOptions.builder();
+ executionBuilder.setLabelPrefix(UUID.randomUUID().toString()).setStreamLoadProp(properties);
+ DorisOptions.Builder dorisBuilder = DorisOptions.builder();
+ dorisBuilder
+ .setFenodes(getFenodes())
+ .setTableIdentifier(DATABASE + "." + TABLE_CSV)
+ .setUsername(getDorisUsername())
+ .setPassword(getDorisPassword());
+ submitJob(dorisBuilder.build(), executionBuilder.build(), new String[] {"doris,1"});
Thread.sleep(10000);
List<String> expected = Arrays.asList("doris,1");
String query = String.format("select name,age from %s.%s order by 1", DATABASE, TABLE_CSV);
- checkResult(expected, query, 2);
+ ContainerUtils.checkResult(getDorisQueryConnection(), LOG, expected, query, 2);
}
@Test
@@ -93,9 +103,18 @@
row2.put("name", "doris2");
row2.put("age", 2);
+ DorisExecutionOptions.Builder executionBuilder = DorisExecutionOptions.builder();
+ executionBuilder.setLabelPrefix(UUID.randomUUID().toString()).setStreamLoadProp(properties);
+ DorisOptions.Builder dorisBuilder = DorisOptions.builder();
+ dorisBuilder
+ .setFenodes(getFenodes())
+ .setTableIdentifier(DATABASE + "." + TABLE_JSON)
+ .setUsername(getDorisUsername())
+ .setPassword(getDorisPassword());
+
submitJob(
- TABLE_JSON,
- properties,
+ dorisBuilder.build(),
+ executionBuilder.build(),
new String[] {
new ObjectMapper().writeValueAsString(row1),
new ObjectMapper().writeValueAsString(row2)
@@ -104,28 +123,21 @@
Thread.sleep(10000);
List<String> expected = Arrays.asList("doris1,1", "doris2,2");
String query = String.format("select name,age from %s.%s order by 1", DATABASE, TABLE_JSON);
- checkResult(expected, query, 2);
+ ContainerUtils.checkResult(getDorisQueryConnection(), LOG, expected, query, 2);
}
- public void submitJob(String table, Properties properties, String[] records) throws Exception {
+ private void submitJob(
+ DorisOptions dorisOptions, DorisExecutionOptions executionOptions, String[] records)
+ throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.BATCH);
- DorisSink.Builder<String> builder = DorisSink.builder();
+ Builder<String> builder = DorisSink.builder();
final DorisReadOptions.Builder readOptionBuilder = DorisReadOptions.builder();
- DorisOptions.Builder dorisBuilder = DorisOptions.builder();
- dorisBuilder
- .setFenodes(getFenodes())
- .setTableIdentifier(DATABASE + "." + table)
- .setUsername(USERNAME)
- .setPassword(PASSWORD);
- DorisExecutionOptions.Builder executionBuilder = DorisExecutionOptions.builder();
- executionBuilder.setLabelPrefix(UUID.randomUUID().toString()).setStreamLoadProp(properties);
-
builder.setDorisReadOptions(readOptionBuilder.build())
- .setDorisExecutionOptions(executionBuilder.build())
+ .setDorisExecutionOptions(executionOptions)
.setSerializer(new SimpleStringSerializer())
- .setDorisOptions(dorisBuilder.build());
+ .setDorisOptions(dorisOptions);
env.fromElements(records).sinkTo(builder.build());
env.execute();
@@ -168,8 +180,8 @@
getFenodes(),
getBenodes(),
DATABASE + "." + TABLE_JSON_TBL,
- USERNAME,
- PASSWORD);
+ getDorisUsername(),
+ getDorisPassword());
tEnv.executeSql(sinkDDL);
tEnv.executeSql("INSERT INTO doris_sink SELECT 'doris',1 union all SELECT 'flink',2");
@@ -177,7 +189,7 @@
List<String> expected = Arrays.asList("doris,1", "flink,2");
String query =
String.format("select name,age from %s.%s order by 1", DATABASE, TABLE_JSON_TBL);
- checkResult(expected, query, 2);
+ ContainerUtils.checkResult(getDorisQueryConnection(), LOG, expected, query, 2);
}
@Test
@@ -190,7 +202,7 @@
String sinkDDL =
String.format(
- "CREATE TABLE doris_sink ("
+ "CREATE TABLE doris_sink_batch ("
+ " name STRING,"
+ " age INT"
+ ") WITH ("
@@ -214,17 +226,17 @@
+ ")",
getFenodes(),
DATABASE + "." + TABLE_CSV_BATCH_TBL,
- USERNAME,
- PASSWORD);
+ getDorisUsername(),
+ getDorisPassword());
tEnv.executeSql(sinkDDL);
- tEnv.executeSql("INSERT INTO doris_sink SELECT 'doris',1 union all SELECT 'flink',2");
+ tEnv.executeSql("INSERT INTO doris_sink_batch SELECT 'doris',1 union all SELECT 'flink',2");
Thread.sleep(20000);
List<String> expected = Arrays.asList("doris,1", "flink,2");
String query =
String.format(
"select name,age from %s.%s order by 1", DATABASE, TABLE_CSV_BATCH_TBL);
- checkResult(expected, query, 2);
+ ContainerUtils.checkResult(getDorisQueryConnection(), LOG, expected, query, 2);
}
@Test
@@ -238,8 +250,8 @@
dorisBuilder
.setFenodes(getFenodes())
.setTableIdentifier(DATABASE + "." + TABLE_CSV_BATCH_DS)
- .setUsername(USERNAME)
- .setPassword(PASSWORD);
+ .setUsername(getDorisUsername())
+ .setPassword(getDorisPassword());
Properties properties = new Properties();
properties.setProperty("column_separator", ",");
properties.setProperty("line_delimiter", "\n");
@@ -264,7 +276,7 @@
String query =
String.format(
"select name,age from %s.%s order by 1", DATABASE, TABLE_CSV_BATCH_DS);
- checkResult(expected, query, 2);
+ ContainerUtils.checkResult(getDorisQueryConnection(), LOG, expected, query, 2);
}
@Test
@@ -302,8 +314,8 @@
+ ")",
getFenodes(),
DATABASE + "." + TABLE_GROUP_COMMIT,
- USERNAME,
- PASSWORD);
+ getDorisUsername(),
+ getDorisPassword());
tEnv.executeSql(sinkDDL);
tEnv.executeSql(
"INSERT INTO doris_group_commit_sink SELECT 'doris',1 union all SELECT 'group_commit',2 union all SELECT 'flink',3");
@@ -313,8 +325,7 @@
String query =
String.format(
"select name,age from %s.%s order by 1", DATABASE, TABLE_GROUP_COMMIT);
- //
- checkResult(expected, query, 2);
+ ContainerUtils.checkResult(getDorisQueryConnection(), LOG, expected, query, 2);
}
@Test
@@ -345,8 +356,8 @@
+ ")",
getFenodes(),
DATABASE + "." + TABLE_GZ_FORMAT,
- USERNAME,
- PASSWORD);
+ getDorisUsername(),
+ getDorisPassword());
tEnv.executeSql(sinkDDL);
tEnv.executeSql(
"INSERT INTO doris_gz_format_sink SELECT 'doris',1 union all SELECT 'flink',2");
@@ -355,12 +366,12 @@
List<String> expected = Arrays.asList("doris,1", "flink,2");
String query =
String.format("select name,age from %s.%s order by 1", DATABASE, TABLE_GZ_FORMAT);
- //
- checkResult(expected, query, 2);
+ ContainerUtils.checkResult(getDorisQueryConnection(), LOG, expected, query, 2);
}
@Test
public void testJobManagerFailoverSink() throws Exception {
+ LOG.info("start to test JobManagerFailoverSink.");
initializeFailoverTable(TABLE_CSV_JM);
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(2);
@@ -374,8 +385,8 @@
dorisBuilder
.setFenodes(getFenodes())
.setTableIdentifier(DATABASE + "." + TABLE_CSV_JM)
- .setUsername(USERNAME)
- .setPassword(PASSWORD);
+ .setUsername(getDorisUsername())
+ .setPassword(getDorisPassword());
DorisExecutionOptions.Builder executionBuilder = DorisExecutionOptions.builder();
Properties properties = new Properties();
properties.setProperty("column_separator", ",");
@@ -404,20 +415,23 @@
triggerFailover(
FailoverType.JM, jobID, miniClusterResource.getMiniCluster(), () -> sleepMs(100));
+ LOG.info("Waiting the JobManagerFailoverSink job to be finished. jobId={}", jobID);
waitForJobStatus(
jobClient,
Collections.singletonList(FINISHED),
Deadline.fromNow(Duration.ofSeconds(120)));
+ LOG.info("Will check job manager failover sink result.");
List<String> expected =
Arrays.asList("1,0", "1,1", "2,0", "2,1", "3,0", "3,1", "4,0", "4,1", "5,0", "5,1");
String query =
String.format("select id,task_id from %s.%s order by 1,2", DATABASE, TABLE_CSV_JM);
- checkResult(expected, query, 2);
+ ContainerUtils.checkResult(getDorisQueryConnection(), LOG, expected, query, 2);
}
@Test
public void testTaskManagerFailoverSink() throws Exception {
+ LOG.info("start to test TaskManagerFailoverSink.");
initializeFailoverTable(TABLE_CSV_TM);
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(2);
@@ -431,8 +445,8 @@
dorisBuilder
.setFenodes(getFenodes())
.setTableIdentifier(DATABASE + "." + TABLE_CSV_TM)
- .setUsername(USERNAME)
- .setPassword(PASSWORD);
+ .setUsername(getDorisUsername())
+ .setPassword(getDorisPassword());
DorisExecutionOptions.Builder executionBuilder = DorisExecutionOptions.builder();
Properties properties = new Properties();
properties.setProperty("column_separator", ",");
@@ -458,16 +472,18 @@
triggerFailover(
FailoverType.TM, jobID, miniClusterResource.getMiniCluster(), () -> sleepMs(100));
+ LOG.info("Waiting the TaskManagerFailoverSink job to be finished. jobId={}", jobID);
waitForJobStatus(
jobClient,
Collections.singletonList(FINISHED),
Deadline.fromNow(Duration.ofSeconds(120)));
+ LOG.info("Will check task manager failover sink result.");
List<String> expected =
Arrays.asList("1,0", "1,1", "2,0", "2,1", "3,0", "3,1", "4,0", "4,1", "5,0", "5,1");
String query =
String.format("select id,task_id from %s.%s order by 1,2", DATABASE, TABLE_CSV_TM);
- checkResult(expected, query, 2);
+ ContainerUtils.checkResult(getDorisQueryConnection(), LOG, expected, query, 2);
}
private void sleepMs(long millis) {
@@ -477,43 +493,37 @@
}
}
- private void initializeTable(String table) throws Exception {
- try (Connection connection =
- DriverManager.getConnection(
- String.format(URL, DORIS_CONTAINER.getHost()), USERNAME, PASSWORD);
- Statement statement = connection.createStatement()) {
- statement.execute(String.format("CREATE DATABASE IF NOT EXISTS %s", DATABASE));
- statement.execute(String.format("DROP TABLE IF EXISTS %s.%s", DATABASE, table));
- statement.execute(
- String.format(
- "CREATE TABLE %s.%s ( \n"
- + "`name` varchar(256),\n"
- + "`age` int\n"
- + ") DISTRIBUTED BY HASH(`name`) BUCKETS 1\n"
- + "PROPERTIES (\n"
- + "\"replication_num\" = \"1\"\n"
- + ")\n",
- DATABASE, table));
- }
+ private void initializeTable(String table) {
+ ContainerUtils.executeSQLStatement(
+ getDorisQueryConnection(),
+ LOG,
+ String.format("CREATE DATABASE IF NOT EXISTS %s", DATABASE),
+ String.format("DROP TABLE IF EXISTS %s.%s", DATABASE, table),
+ String.format(
+ "CREATE TABLE %s.%s ( \n"
+ + "`name` varchar(256),\n"
+ + "`age` int\n"
+ + ") DISTRIBUTED BY HASH(`name`) BUCKETS 1\n"
+ + "PROPERTIES (\n"
+ + "\"replication_num\" = \"1\"\n"
+ + ")\n",
+ DATABASE, table));
}
- private void initializeFailoverTable(String table) throws Exception {
- try (Connection connection =
- DriverManager.getConnection(
- String.format(URL, DORIS_CONTAINER.getHost()), USERNAME, PASSWORD);
- Statement statement = connection.createStatement()) {
- statement.execute(String.format("CREATE DATABASE IF NOT EXISTS %s", DATABASE));
- statement.execute(String.format("DROP TABLE IF EXISTS %s.%s", DATABASE, table));
- statement.execute(
- String.format(
- "CREATE TABLE %s.%s ( \n"
- + "`id` int,\n"
- + "`task_id` int\n"
- + ") DISTRIBUTED BY HASH(`id`) BUCKETS 1\n"
- + "PROPERTIES (\n"
- + "\"replication_num\" = \"1\"\n"
- + ")\n",
- DATABASE, table));
- }
+ private void initializeFailoverTable(String table) {
+ ContainerUtils.executeSQLStatement(
+ getDorisQueryConnection(),
+ LOG,
+ String.format("CREATE DATABASE IF NOT EXISTS %s", DATABASE),
+ String.format("DROP TABLE IF EXISTS %s.%s", DATABASE, table),
+ String.format(
+ "CREATE TABLE %s.%s ( \n"
+ + "`id` int,\n"
+ + "`task_id` int\n"
+ + ") DISTRIBUTED BY HASH(`id`) BUCKETS 1\n"
+ + "PROPERTIES (\n"
+ + "\"replication_num\" = \"1\"\n"
+ + ")\n",
+ DATABASE, table));
}
}
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/schema/SchemaManagerITCase.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/schema/SchemaManagerITCase.java
index 3dde08d..37ca3a2 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/schema/SchemaManagerITCase.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/schema/SchemaManagerITCase.java
@@ -17,31 +17,30 @@
package org.apache.doris.flink.sink.schema;
-import org.apache.doris.flink.DorisTestBase;
import org.apache.doris.flink.catalog.doris.DataModel;
import org.apache.doris.flink.catalog.doris.FieldSchema;
import org.apache.doris.flink.catalog.doris.TableSchema;
import org.apache.doris.flink.cfg.DorisOptions;
+import org.apache.doris.flink.container.AbstractITCaseService;
+import org.apache.doris.flink.container.ContainerUtils;
import org.apache.doris.flink.exception.IllegalArgumentException;
import org.apache.doris.flink.rest.models.Field;
import org.apache.doris.flink.rest.models.Schema;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.io.IOException;
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.sql.SQLException;
-import java.sql.Statement;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Random;
-public class SchemaManagerITCase extends DorisTestBase {
-
+public class SchemaManagerITCase extends AbstractITCaseService {
+ private static final Logger LOG = LoggerFactory.getLogger(SchemaManagerITCase.class);
private static final String DATABASE = "test_sc_db";
private DorisOptions options;
private SchemaChangeManager schemaChangeManager;
@@ -52,34 +51,31 @@
DorisOptions.builder()
.setFenodes(getFenodes())
.setTableIdentifier(DATABASE + ".add_column")
- .setUsername(USERNAME)
- .setPassword(PASSWORD)
+ .setUsername(getDorisUsername())
+ .setPassword(getDorisPassword())
.build();
schemaChangeManager = new SchemaChangeManager(options);
}
- private void initDorisSchemaChangeTable(String table) throws SQLException {
- try (Connection connection =
- DriverManager.getConnection(
- String.format(URL, DORIS_CONTAINER.getHost()), USERNAME, PASSWORD);
- Statement statement = connection.createStatement()) {
- statement.execute(String.format("CREATE DATABASE IF NOT EXISTS %s", DATABASE));
- statement.execute(String.format("DROP TABLE IF EXISTS %s.%s", DATABASE, table));
- statement.execute(
- String.format(
- "CREATE TABLE %s.%s ( \n"
- + "`id` varchar(32),\n"
- + "`age` int\n"
- + ") DISTRIBUTED BY HASH(`id`) BUCKETS 1\n"
- + "PROPERTIES (\n"
- + "\"replication_num\" = \"1\"\n"
- + ")\n",
- DATABASE, table));
- }
+ private void initDorisSchemaChangeTable(String table) {
+ ContainerUtils.executeSQLStatement(
+ getDorisQueryConnection(),
+ LOG,
+ String.format("CREATE DATABASE IF NOT EXISTS %s", DATABASE),
+ String.format("DROP TABLE IF EXISTS %s.%s", DATABASE, table),
+ String.format(
+ "CREATE TABLE %s.%s ( \n"
+ + "`id` varchar(32),\n"
+ + "`age` int\n"
+ + ") DISTRIBUTED BY HASH(`id`) BUCKETS 1\n"
+ + "PROPERTIES (\n"
+ + "\"replication_num\" = \"1\"\n"
+ + ")\n",
+ DATABASE, table));
}
@Test
- public void testAddColumn() throws SQLException, IOException, IllegalArgumentException {
+ public void testAddColumn() throws IOException, IllegalArgumentException {
String addColumnTbls = "add_column";
initDorisSchemaChangeTable(addColumnTbls);
FieldSchema field = new FieldSchema("c1", "int", "");
@@ -93,7 +89,7 @@
@Test
public void testAddColumnWithChineseComment()
- throws SQLException, IOException, IllegalArgumentException, InterruptedException {
+ throws IOException, IllegalArgumentException, InterruptedException {
String addColumnTbls = "add_column";
initDorisSchemaChangeTable(addColumnTbls);
@@ -149,7 +145,7 @@
}
@Test
- public void testDropColumn() throws SQLException, IOException, IllegalArgumentException {
+ public void testDropColumn() throws IOException, IllegalArgumentException {
String dropColumnTbls = "drop_column";
initDorisSchemaChangeTable(dropColumnTbls);
schemaChangeManager.dropColumn(DATABASE, dropColumnTbls, "age");
@@ -161,7 +157,7 @@
}
@Test
- public void testRenameColumn() throws SQLException, IOException, IllegalArgumentException {
+ public void testRenameColumn() throws IOException, IllegalArgumentException {
String renameColumnTbls = "rename_column";
initDorisSchemaChangeTable(renameColumnTbls);
schemaChangeManager.renameColumn(DATABASE, renameColumnTbls, "age", "age1");
@@ -173,8 +169,7 @@
}
@Test
- public void testModifyColumnComment()
- throws SQLException, IOException, IllegalArgumentException {
+ public void testModifyColumnComment() throws IOException, IllegalArgumentException {
String modifyColumnCommentTbls = "modify_column_comment";
initDorisSchemaChangeTable(modifyColumnCommentTbls);
String columnName = "age";
@@ -188,7 +183,7 @@
@Test
public void testOnlyModifyColumnType()
- throws SQLException, IOException, IllegalArgumentException, InterruptedException {
+ throws IOException, IllegalArgumentException, InterruptedException {
String modifyColumnTbls = "modify_column_type";
String columnName = "age";
String newColumnType = "bigint";
@@ -203,7 +198,7 @@
@Test
public void testModifyColumnTypeAndComment()
- throws SQLException, IOException, IllegalArgumentException, InterruptedException {
+ throws IOException, IllegalArgumentException, InterruptedException {
String modifyColumnTbls = "modify_column_type_and_comment";
initDorisSchemaChangeTable(modifyColumnTbls);
String columnName = "age";
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/source/DorisSourceITCase.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/source/DorisSourceITCase.java
index a13e96f..783e6bd 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/source/DorisSourceITCase.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/source/DorisSourceITCase.java
@@ -25,32 +25,36 @@
import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;
-import org.apache.doris.flink.DorisTestBase;
import org.apache.doris.flink.cfg.DorisOptions;
import org.apache.doris.flink.cfg.DorisStreamOptions;
+import org.apache.doris.flink.container.AbstractITCaseService;
+import org.apache.doris.flink.container.ContainerUtils;
import org.apache.doris.flink.datastream.DorisSourceFunction;
import org.apache.doris.flink.deserialization.SimpleListDeserializationSchema;
+import org.apache.doris.flink.exception.DorisRuntimeException;
import org.junit.Assert;
import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.sql.Statement;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.HashSet;
import java.util.List;
import java.util.Properties;
+import java.util.Set;
/** DorisSource ITCase. */
-public class DorisSourceITCase extends DorisTestBase {
- static final String DATABASE = "test_source";
- static final String TABLE_READ = "tbl_read";
- static final String TABLE_READ_OLD_API = "tbl_read_old_api";
- static final String TABLE_READ_TBL = "tbl_read_tbl";
- static final String TABLE_READ_TBL_OLD_API = "tbl_read_tbl_old_api";
- static final String TABLE_READ_TBL_ALL_OPTIONS = "tbl_read_tbl_all_options";
- static final String TABLE_READ_TBL_PUSH_DOWN = "tbl_read_tbl_push_down";
- static final String TABLE_READ_TBL_PUSH_DOWN_WITH_UNION_ALL =
+public class DorisSourceITCase extends AbstractITCaseService {
+ private static final Logger LOG = LoggerFactory.getLogger(DorisSourceITCase.class);
+ private static final String DATABASE = "test_source";
+ private static final String TABLE_READ = "tbl_read";
+ private static final String TABLE_READ_OLD_API = "tbl_read_old_api";
+ private static final String TABLE_READ_TBL = "tbl_read_tbl";
+ private static final String TABLE_READ_TBL_OLD_API = "tbl_read_tbl_old_api";
+ private static final String TABLE_READ_TBL_ALL_OPTIONS = "tbl_read_tbl_all_options";
+ private static final String TABLE_READ_TBL_PUSH_DOWN = "tbl_read_tbl_push_down";
+ private static final String TABLE_READ_TBL_PUSH_DOWN_WITH_UNION_ALL =
"tbl_read_tbl_push_down_with_union_all";
@Test
@@ -63,8 +67,8 @@
dorisBuilder
.setFenodes(getFenodes())
.setTableIdentifier(DATABASE + "." + TABLE_READ)
- .setUsername(USERNAME)
- .setPassword(PASSWORD);
+ .setUsername(getDorisUsername())
+ .setPassword(getDorisPassword());
DorisSource<List<?>> source =
DorisSource.<List<?>>builder()
@@ -80,7 +84,7 @@
}
}
List<String> expected = Arrays.asList("[doris, 18]", "[flink, 10]", "[apache, 12]");
- Assert.assertArrayEquals(actual.toArray(), expected.toArray());
+ checkResult("testSource", expected.toArray(), actual.toArray());
}
@Test
@@ -89,8 +93,8 @@
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Properties properties = new Properties();
properties.put("fenodes", getFenodes());
- properties.put("username", USERNAME);
- properties.put("password", PASSWORD);
+ properties.put("username", getDorisUsername());
+ properties.put("password", getDorisPassword());
properties.put("table.identifier", DATABASE + "." + TABLE_READ_OLD_API);
DorisStreamOptions options = new DorisStreamOptions(properties);
@@ -105,7 +109,7 @@
}
}
List<String> expected = Arrays.asList("[doris, 18]", "[flink, 10]", "[apache, 12]");
- Assert.assertArrayEquals(actual.toArray(), expected.toArray());
+ checkResult("testOldSourceApi", expected.toArray(), actual.toArray());
}
@Test
@@ -128,7 +132,10 @@
+ " 'username' = '%s',"
+ " 'password' = '%s'"
+ ")",
- getFenodes(), DATABASE + "." + TABLE_READ_TBL, USERNAME, PASSWORD);
+ getFenodes(),
+ DATABASE + "." + TABLE_READ_TBL,
+ getDorisUsername(),
+ getDorisPassword());
tEnv.executeSql(sourceDDL);
TableResult tableResult = tEnv.executeSql("SELECT * FROM doris_source");
@@ -151,7 +158,7 @@
}
}
String[] expectedFilter = new String[] {"+I[doris, 18]"};
- Assert.assertArrayEquals(expectedFilter, actualFilter.toArray());
+ checkResult("testTableSource", expectedFilter, actualFilter.toArray());
}
@Test
@@ -163,7 +170,7 @@
String sourceDDL =
String.format(
- "CREATE TABLE doris_source ("
+ "CREATE TABLE doris_source_old_api ("
+ " name STRING,"
+ " age INT"
+ ") WITH ("
@@ -174,9 +181,12 @@
+ " 'username' = '%s',"
+ " 'password' = '%s'"
+ ")",
- getFenodes(), DATABASE + "." + TABLE_READ_TBL_OLD_API, USERNAME, PASSWORD);
+ getFenodes(),
+ DATABASE + "." + TABLE_READ_TBL_OLD_API,
+ getDorisUsername(),
+ getDorisPassword());
tEnv.executeSql(sourceDDL);
- TableResult tableResult = tEnv.executeSql("SELECT * FROM doris_source");
+ TableResult tableResult = tEnv.executeSql("SELECT * FROM doris_source_old_api");
List<String> actual = new ArrayList<>();
try (CloseableIterator<Row> iterator = tableResult.collect()) {
@@ -185,7 +195,7 @@
}
}
String[] expected = new String[] {"+I[doris, 18]", "+I[flink, 10]", "+I[apache, 12]"};
- Assert.assertArrayEquals(expected, actual.toArray());
+ checkResult("testTableSourceOldApi", expected, actual.toArray());
}
@Test
@@ -197,7 +207,7 @@
String sourceDDL =
String.format(
- "CREATE TABLE doris_source ("
+ "CREATE TABLE doris_source_all_options ("
+ " name STRING,"
+ " age INT"
+ ") WITH ("
@@ -219,10 +229,10 @@
+ ")",
getFenodes(),
DATABASE + "." + TABLE_READ_TBL_ALL_OPTIONS,
- USERNAME,
- PASSWORD);
+ getDorisUsername(),
+ getDorisPassword());
tEnv.executeSql(sourceDDL);
- TableResult tableResult = tEnv.executeSql("SELECT * FROM doris_source");
+ TableResult tableResult = tEnv.executeSql("SELECT * FROM doris_source_all_options");
List<String> actual = new ArrayList<>();
try (CloseableIterator<Row> iterator = tableResult.collect()) {
@@ -231,7 +241,7 @@
}
}
String[] expected = new String[] {"+I[doris, 18]", "+I[flink, 10]", "+I[apache, 12]"};
- Assert.assertArrayEquals(expected, actual.toArray());
+ checkResult("testTableSourceAllOptions", expected, actual.toArray());
}
@Test
@@ -243,7 +253,7 @@
String sourceDDL =
String.format(
- "CREATE TABLE doris_source ("
+ "CREATE TABLE doris_source_filter_and_projection_push_down ("
+ " name STRING,"
+ " age INT"
+ ") WITH ("
@@ -255,10 +265,12 @@
+ ")",
getFenodes(),
DATABASE + "." + TABLE_READ_TBL_PUSH_DOWN,
- USERNAME,
- PASSWORD);
+ getDorisUsername(),
+ getDorisPassword());
tEnv.executeSql(sourceDDL);
- TableResult tableResult = tEnv.executeSql("SELECT age FROM doris_source where age = '18'");
+ TableResult tableResult =
+ tEnv.executeSql(
+ "SELECT age FROM doris_source_filter_and_projection_push_down where age = '18'");
List<String> actual = new ArrayList<>();
try (CloseableIterator<Row> iterator = tableResult.collect()) {
@@ -267,11 +279,12 @@
}
}
String[] expected = new String[] {"+I[18]"};
- Assert.assertArrayEquals(expected, actual.toArray());
+ checkResult("testTableSourceFilterAndProjectionPushDown", expected, actual.toArray());
}
@Test
- public void testTableSourceFilterWithUnionAll() throws Exception {
+ public void testTableSourceFilterWithUnionAll() {
+ LOG.info("starting to execute testTableSourceFilterWithUnionAll case.");
initializeTable(TABLE_READ_TBL_PUSH_DOWN_WITH_UNION_ALL);
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
@@ -279,7 +292,7 @@
String sourceDDL =
String.format(
- "CREATE TABLE doris_source ("
+ "CREATE TABLE doris_source_filter_with_union_all ("
+ " name STRING,"
+ " age INT"
+ ") WITH ("
@@ -291,48 +304,56 @@
+ ")",
getFenodes(),
DATABASE + "." + TABLE_READ_TBL_PUSH_DOWN_WITH_UNION_ALL,
- USERNAME,
- PASSWORD);
+ getDorisUsername(),
+ getDorisPassword());
tEnv.executeSql(sourceDDL);
- TableResult tableResult =
- tEnv.executeSql(
- " SELECT * FROM doris_source where age = '18'"
- + " UNION ALL "
- + "SELECT * FROM doris_source where age = '10' ");
+ String querySql =
+ " SELECT * FROM doris_source_filter_with_union_all where age = '18'"
+ + " UNION ALL "
+ + "SELECT * FROM doris_source_filter_with_union_all where age = '10'";
+ TableResult tableResult = tEnv.executeSql(querySql);
List<String> actual = new ArrayList<>();
try (CloseableIterator<Row> iterator = tableResult.collect()) {
while (iterator.hasNext()) {
actual.add(iterator.next().toString());
}
+ } catch (Exception e) {
+ LOG.error("Failed to execute sql. sql={}", querySql, e);
+ throw new DorisRuntimeException(e);
}
- String[] expected = new String[] {"+I[doris, 18]", "+I[flink, 10]"};
- Assert.assertArrayEquals(expected, actual.toArray());
+ Set<String> expected = new HashSet<>(Arrays.asList("+I[flink, 10]", "+I[doris, 18]"));
+ for (String a : actual) {
+ Assert.assertTrue(expected.contains(a));
+ }
}
- private void initializeTable(String table) throws Exception {
- try (Connection connection =
- DriverManager.getConnection(
- String.format(URL, DORIS_CONTAINER.getHost()), USERNAME, PASSWORD);
- Statement statement = connection.createStatement()) {
- statement.execute(String.format("CREATE DATABASE IF NOT EXISTS %s", DATABASE));
- statement.execute(String.format("DROP TABLE IF EXISTS %s.%s", DATABASE, table));
- statement.execute(
- String.format(
- "CREATE TABLE %s.%s ( \n"
- + "`name` varchar(256),\n"
- + "`age` int\n"
- + ") DISTRIBUTED BY HASH(`name`) BUCKETS 1\n"
- + "PROPERTIES (\n"
- + "\"replication_num\" = \"1\"\n"
- + ")\n",
- DATABASE, table));
- statement.execute(
- String.format("insert into %s.%s values ('doris',18)", DATABASE, table));
- statement.execute(
- String.format("insert into %s.%s values ('flink',10)", DATABASE, table));
- statement.execute(
- String.format("insert into %s.%s values ('apache',12)", DATABASE, table));
- }
+ private void checkResult(String testName, Object[] expected, Object[] actual) {
+ LOG.info(
+ "Checking DorisSourceITCase result. testName={}, actual={}, expected={}",
+ testName,
+ actual,
+ expected);
+ Assert.assertArrayEquals(expected, actual);
+ }
+
+ private void initializeTable(String table) {
+ ContainerUtils.executeSQLStatement(
+ getDorisQueryConnection(),
+ LOG,
+ String.format("CREATE DATABASE IF NOT EXISTS %s", DATABASE),
+ String.format("DROP TABLE IF EXISTS %s.%s", DATABASE, table),
+ String.format(
+ "CREATE TABLE %s.%s ( \n"
+ + "`name` varchar(256),\n"
+ + "`age` int\n"
+ + ") DISTRIBUTED BY HASH(`name`) BUCKETS 1\n"
+ + "PROPERTIES (\n"
+ + "\"replication_num\" = \"1\"\n"
+ + ")\n",
+ DATABASE, table),
+ String.format("insert into %s.%s values ('doris',18)", DATABASE, table),
+ String.format("insert into %s.%s values ('flink',10)", DATABASE, table),
+ String.format("insert into %s.%s values ('apache',12)", DATABASE, table));
}
}
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/table/DorisRowDataJdbcLookupFunctionITCase.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/table/DorisRowDataJdbcLookupFunctionITCase.java
index 0ad1781..7f9021f 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/table/DorisRowDataJdbcLookupFunctionITCase.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/table/DorisRowDataJdbcLookupFunctionITCase.java
@@ -26,15 +26,15 @@
import org.apache.flink.util.Collector;
import com.google.common.cache.Cache;
-import org.apache.doris.flink.DorisTestBase;
import org.apache.doris.flink.cfg.DorisLookupOptions;
import org.apache.doris.flink.cfg.DorisOptions;
+import org.apache.doris.flink.container.AbstractITCaseService;
+import org.apache.doris.flink.container.ContainerUtils;
import org.junit.Before;
import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.sql.Statement;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@@ -43,45 +43,43 @@
import static org.junit.Assert.assertEquals;
-public class DorisRowDataJdbcLookupFunctionITCase extends DorisTestBase {
+public class DorisRowDataJdbcLookupFunctionITCase extends AbstractITCaseService {
+ private static final Logger LOG =
+ LoggerFactory.getLogger(DorisRowDataJdbcLookupFunctionITCase.class);
private static final String LOOKUP_TABLE = "test.t_lookup_table";
- private static String[] fieldNames = new String[] {"id1", "id2", "c_string", "c_double"};
- private static DataType[] fieldDataTypes =
+ private static final String[] fieldNames = new String[] {"id1", "id2", "c_string", "c_double"};
+ private static final DataType[] fieldDataTypes =
new DataType[] {
DataTypes.INT(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.DOUBLE()
};
- private static String[] lookupKeys = new String[] {"id1", "id2"};
- private static int[] keyIndexs = new int[] {0, 1};
+ private static final String[] lookupKeys = new String[] {"id1", "id2"};
+ private static final int[] keyIndexs = new int[] {0, 1};
@Before
public void setUp() throws Exception {
- try (Connection connection =
- DriverManager.getConnection(
- String.format(URL, DORIS_CONTAINER.getHost()), USERNAME, PASSWORD);
- Statement statement = connection.createStatement()) {
- statement.execute(String.format("CREATE DATABASE IF NOT EXISTS %s", "test"));
- statement.execute(String.format("DROP TABLE IF EXISTS %s", LOOKUP_TABLE));
- statement.execute(
- String.format(
- "CREATE TABLE %s ( \n"
- + "`id1` int,\n"
- + "`id2` varchar(128),\n"
- + "`c_string` string,\n"
- + "`c_double` double\n"
- + ") DISTRIBUTED BY HASH(`id1`) BUCKETS 1\n"
- + "PROPERTIES (\n"
- + "\"replication_num\" = \"1\"\n"
- + ")\n",
- LOOKUP_TABLE));
- statement.execute(
- String.format(
- "insert into %s values (1,'A','zhangsanA',1.12),"
- + "(1,'A','zhangsanA-1',11.12),"
- + "(2,'B','zhangsanB',2.12),(4,'D','zhangsanD',4.12)",
- LOOKUP_TABLE));
- }
+ ContainerUtils.executeSQLStatement(
+ getDorisQueryConnection(),
+ LOG,
+ String.format("CREATE DATABASE IF NOT EXISTS %s", "test"),
+ String.format("DROP TABLE IF EXISTS %s", LOOKUP_TABLE),
+ String.format(
+ "CREATE TABLE %s ( \n"
+ + "`id1` int,\n"
+ + "`id2` varchar(128),\n"
+ + "`c_string` string,\n"
+ + "`c_double` double\n"
+ + ") DISTRIBUTED BY HASH(`id1`) BUCKETS 1\n"
+ + "PROPERTIES (\n"
+ + "\"replication_num\" = \"1\"\n"
+ + ")\n",
+ LOOKUP_TABLE),
+ String.format(
+ "insert into %s values (1,'A','zhangsanA',1.12),"
+ + "(1,'A','zhangsanA-1',11.12),"
+ + "(2,'B','zhangsanB',2.12),(4,'D','zhangsanD',4.12)",
+ LOOKUP_TABLE));
}
@Test
@@ -167,9 +165,9 @@
DorisOptions.builder()
.setFenodes(getFenodes())
.setTableIdentifier(LOOKUP_TABLE)
- .setJdbcUrl(getJdbcUrl())
- .setUsername(USERNAME)
- .setPassword(PASSWORD)
+ .setJdbcUrl(getDorisQueryUrl())
+ .setUsername(getDorisUsername())
+ .setPassword(getDorisPassword())
.build();
DorisRowDataJdbcLookupFunction lookupFunction =
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/DorisDorisE2ECase.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/DorisDorisE2ECase.java
deleted file mode 100644
index 8281350..0000000
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/DorisDorisE2ECase.java
+++ /dev/null
@@ -1,234 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.doris.flink.tools.cdc;
-
-import org.apache.flink.api.common.RuntimeExecutionMode;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.table.api.TableResult;
-import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
-import org.apache.flink.types.Row;
-import org.apache.flink.util.CloseableIterator;
-
-import org.apache.doris.flink.DorisTestBase;
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.sql.Statement;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.UUID;
-
-/** DorisDorisE2ECase. */
-public class DorisDorisE2ECase extends DorisTestBase {
- private static final String DATABASE_SOURCE = "test_e2e_source";
- private static final String DATABASE_SINK = "test_e2e_sink";
- private static final String TABLE = "test_tbl";
-
- @Test
- public void testDoris2Doris() throws Exception {
- initializeDorisTable(TABLE);
- printClusterStatus();
- final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- env.setParallelism(1);
- env.setRuntimeMode(RuntimeExecutionMode.BATCH);
- final StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
-
- String sourceDDL =
- String.format(
- "CREATE TABLE doris_source ("
- + "id int,\n"
- + "c1 boolean,\n"
- + "c2 tinyint,\n"
- + "c3 smallint,\n"
- + "c4 int, \n"
- + "c5 bigint, \n"
- + "c6 string, \n"
- + "c7 float, \n"
- + "c8 double, \n"
- + "c9 decimal(12,4), \n"
- + "c10 date, \n"
- + "c11 TIMESTAMP, \n"
- + "c12 char(1), \n"
- + "c13 varchar(256), \n"
- + "c14 Array<String>, \n"
- + "c15 Map<String, String>, \n"
- + "c16 ROW<name String, age int>, \n"
- + "c17 STRING \n"
- + ") WITH ("
- + " 'connector' = 'doris',"
- + " 'fenodes' = '%s',"
- + " 'table.identifier' = '%s',"
- + " 'sink.label-prefix' = '"
- + UUID.randomUUID()
- + "',"
- + " 'username' = '%s',"
- + " 'password' = '%s'"
- + ")",
- getFenodes(),
- DATABASE_SOURCE + "." + TABLE,
- USERNAME,
- PASSWORD);
- tEnv.executeSql(sourceDDL);
-
- String sinkDDL =
- String.format(
- "CREATE TABLE doris_sink ("
- + "id int,\n"
- + "c1 boolean,\n"
- + "c2 tinyint,\n"
- + "c3 smallint,\n"
- + "c4 int, \n"
- + "c5 bigint, \n"
- + "c6 string, \n"
- + "c7 float, \n"
- + "c8 double, \n"
- + "c9 decimal(12,4), \n"
- + "c10 date, \n"
- + "c11 TIMESTAMP, \n"
- + "c12 char(1), \n"
- + "c13 varchar(256), \n"
- + "c14 Array<String>, \n"
- + "c15 Map<String, String>, \n"
- + "c16 ROW<name String, age int>, \n"
- + "c17 STRING \n"
- + ") WITH ("
- + " 'connector' = 'doris',"
- + " 'fenodes' = '%s',"
- + " 'sink.label-prefix' = '"
- + UUID.randomUUID()
- + "',"
- + " 'table.identifier' = '%s',"
- + " 'username' = '%s',"
- + " 'password' = '%s'"
- + ")",
- getFenodes(),
- DATABASE_SINK + "." + TABLE,
- USERNAME,
- PASSWORD);
- tEnv.executeSql(sinkDDL);
-
- tEnv.executeSql("INSERT INTO doris_sink SELECT * FROM doris_source").await();
-
- TableResult tableResult = tEnv.executeSql("SELECT * FROM doris_sink");
- List<Object> actual = new ArrayList<>();
- try (CloseableIterator<Row> iterator = tableResult.collect()) {
- while (iterator.hasNext()) {
- actual.add(iterator.next().toString());
- }
- }
- System.out.println(actual);
- String[] expected =
- new String[] {
- "+I[1, true, 127, 32767, 2147483647, 9223372036854775807, 123456789012345678901234567890, 3.14, 2.7182818284, 12345.6789, 2023-05-22, 2023-05-22T12:34:56, A, Example text, [item1, item2, item3], {key1=value1, key2=value2}, +I[John Doe, 30], {\"key\":\"value\"}]",
- "+I[2, false, -128, -32768, -2147483648, -9223372036854775808, -123456789012345678901234567890, -3.14, -2.7182818284, -12345.6789, 2024-01-01, 2024-01-01T00:00, B, Another example, [item4, item5, item6], {key3=value3, key4=value4}, +I[Jane Doe, 25], {\"another_key\":\"another_value\"}]"
- };
- Assert.assertArrayEquals(expected, actual.toArray(new String[0]));
- }
-
- private void initializeDorisTable(String table) throws Exception {
- try (Connection connection =
- DriverManager.getConnection(
- String.format(URL, DORIS_CONTAINER.getHost()), USERNAME, PASSWORD);
- Statement statement = connection.createStatement()) {
- statement.execute(String.format("CREATE DATABASE IF NOT EXISTS %s", DATABASE_SOURCE));
- statement.execute(String.format("CREATE DATABASE IF NOT EXISTS %s", DATABASE_SINK));
- statement.execute(String.format("DROP TABLE IF EXISTS %s.%s", DATABASE_SOURCE, table));
- statement.execute(String.format("DROP TABLE IF EXISTS %s.%s", DATABASE_SINK, table));
- statement.execute(
- String.format(
- "CREATE TABLE %s.%s (\n"
- + " `id` int,\n"
- + " `c1` boolean,\n"
- + " `c2` tinyint,\n"
- + " `c3` smallint,\n"
- + " `c4` int,\n"
- + " `c5` bigint,\n"
- + " `c6` largeint,\n"
- + " `c7` float,\n"
- + " `c8` double,\n"
- + " `c9` decimal(12,4),\n"
- + " `c10` date,\n"
- + " `c11` datetime,\n"
- + " `c12` char(1),\n"
- + " `c13` varchar(256),\n"
- + " `c14` Array<String>,\n"
- + " `c15` Map<String, String>,\n"
- + " `c16` Struct<name: String, age: int>,\n"
- + " `c17` JSON\n"
- + " )\n"
- + "DUPLICATE KEY(`id`)\n"
- + "DISTRIBUTED BY HASH(`id`) BUCKETS 1\n"
- + "PROPERTIES (\n"
- + "\"replication_num\" = \"1\",\n"
- + "\"light_schema_change\" = \"true\"\n"
- + ");",
- DATABASE_SOURCE, table));
- statement.execute(
- String.format(
- "CREATE TABLE %s.%s like %s.%s",
- DATABASE_SINK, table, DATABASE_SOURCE, table));
- statement.execute(
- String.format(
- "INSERT INTO %s.%s \n"
- + "VALUES \n"
- + "(\n"
- + " 1, \n"
- + " TRUE, \n"
- + " 127, \n"
- + " 32767, \n"
- + " 2147483647, \n"
- + " 9223372036854775807, \n"
- + " 123456789012345678901234567890, \n"
- + " 3.14, \n"
- + " 2.7182818284, \n"
- + " 12345.6789, \n"
- + " '2023-05-22', \n"
- + " '2023-05-22 12:34:56', \n"
- + " 'A', \n"
- + " 'Example text', \n"
- + " ['item1', 'item2', 'item3'], \n"
- + " {'key1': 'value1', 'key2': 'value2'}, \n"
- + " STRUCT('John Doe', 30), \n"
- + " '{\"key\": \"value\"}' \n"
- + "),\n"
- + "(\n"
- + " 2,\n"
- + " FALSE,\n"
- + " -128,\n"
- + " -32768,\n"
- + " -2147483648,\n"
- + " -9223372036854775808,\n"
- + " -123456789012345678901234567890,\n"
- + " -3.14,\n"
- + " -2.7182818284,\n"
- + " -12345.6789,\n"
- + " '2024-01-01',\n"
- + " '2024-01-01 00:00:00',\n"
- + " 'B',\n"
- + " 'Another example',\n"
- + " ['item4', 'item5', 'item6'],\n"
- + " {'key3': 'value3', 'key4': 'value4'},\n"
- + " STRUCT('Jane Doe', 25),\n"
- + " '{\"another_key\": \"another_value\"}'\n"
- + ")",
- DATABASE_SOURCE, table));
- }
- }
-}
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/MySQLDorisE2ECase.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/MySQLDorisE2ECase.java
deleted file mode 100644
index aeb17c2..0000000
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/MySQLDorisE2ECase.java
+++ /dev/null
@@ -1,779 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.doris.flink.tools.cdc;
-
-import org.apache.flink.api.common.restartstrategy.RestartStrategies;
-import org.apache.flink.api.common.time.Deadline;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.execution.JobClient;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-
-import org.apache.doris.flink.DorisTestBase;
-import org.apache.doris.flink.sink.schema.SchemaChangeMode;
-import org.apache.doris.flink.tools.cdc.mysql.MysqlDatabaseSync;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.testcontainers.containers.MySQLContainer;
-import org.testcontainers.lifecycle.Startables;
-
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.sql.SQLException;
-import java.sql.Statement;
-import java.time.Duration;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.UUID;
-import java.util.stream.Stream;
-
-import static org.apache.flink.api.common.JobStatus.RUNNING;
-
-/**
- * MySQLDorisE2ECase 1. Automatically create tables 2. Schema change event synchronization
- * 3.Synchronization of addition, deletion and modification events 4. CDC multi-table writing.
- */
-public class MySQLDorisE2ECase extends DorisTestBase {
- protected static final Logger LOG = LoggerFactory.getLogger(MySQLDorisE2ECase.class);
- private static final String DATABASE = "test_e2e_mysql";
- private static final String MYSQL_USER = "root";
- private static final String MYSQL_PASSWD = "123456";
- private static final String TABLE_1 = "tbl1";
- private static final String TABLE_2 = "tbl2";
- private static final String TABLE_3 = "tbl3";
- private static final String TABLE_4 = "tbl4";
- private static final String TABLE_5 = "tbl5";
- private static final String TABLE_SQL_PARSE = "tbl_sql_parse";
-
- private static final MySQLContainer MYSQL_CONTAINER =
- new MySQLContainer("mysql:8.0")
- .withDatabaseName(DATABASE)
- .withUsername(MYSQL_USER)
- .withPassword(MYSQL_PASSWD);
-
- @BeforeClass
- public static void startMySQLContainers() {
- MYSQL_CONTAINER.setCommand("--default-time-zone=Asia/Shanghai");
- LOG.info("Starting MySQL containers...");
- Startables.deepStart(Stream.of(MYSQL_CONTAINER)).join();
- LOG.info("MySQL Containers are started.");
- }
-
- @AfterClass
- public static void stopMySQLContainers() {
- LOG.info("Stopping MySQL containers...");
- MYSQL_CONTAINER.stop();
- LOG.info("MySQL Containers are stopped.");
- }
-
- @Test
- public void testMySQL2Doris() throws Exception {
- printClusterStatus();
- initializeMySQLTable();
- initializeDorisTable();
- JobClient jobClient = submitJob();
- // wait 2 times checkpoint
- Thread.sleep(20000);
- List<String> expected = Arrays.asList("doris_1,1", "doris_2,2", "doris_3,3", "doris_5,5");
- String sql =
- "select * from ( select * from %s.%s union all select * from %s.%s union all select * from %s.%s union all select * from %s.%s) res order by 1";
- String query1 =
- String.format(
- sql, DATABASE, TABLE_1, DATABASE, TABLE_2, DATABASE, TABLE_3, DATABASE,
- TABLE_5);
- checkResult(expected, query1, 2);
-
- // add incremental data
- try (Connection connection =
- DriverManager.getConnection(
- MYSQL_CONTAINER.getJdbcUrl(), MYSQL_USER, MYSQL_PASSWD);
- Statement statement = connection.createStatement()) {
- statement.execute(
- String.format("insert into %s.%s values ('doris_1_1',10)", DATABASE, TABLE_1));
- statement.execute(
- String.format("insert into %s.%s values ('doris_2_1',11)", DATABASE, TABLE_2));
- statement.execute(
- String.format("insert into %s.%s values ('doris_3_1',12)", DATABASE, TABLE_3));
-
- statement.execute(
- String.format(
- "update %s.%s set age=18 where name='doris_1'", DATABASE, TABLE_1));
- statement.execute(
- String.format("delete from %s.%s where name='doris_2'", DATABASE, TABLE_2));
- }
-
- Thread.sleep(20000);
- List<String> expected2 =
- Arrays.asList(
- "doris_1,18", "doris_1_1,10", "doris_2_1,11", "doris_3,3", "doris_3_1,12");
- sql =
- "select * from ( select * from %s.%s union all select * from %s.%s union all select * from %s.%s ) res order by 1";
- String query2 = String.format(sql, DATABASE, TABLE_1, DATABASE, TABLE_2, DATABASE, TABLE_3);
- checkResult(expected2, query2, 2);
-
- // mock schema change
- try (Connection connection =
- DriverManager.getConnection(
- MYSQL_CONTAINER.getJdbcUrl(), MYSQL_USER, MYSQL_PASSWD);
- Statement statement = connection.createStatement()) {
- statement.execute(
- String.format(
- "alter table %s.%s add column c1 varchar(128)", DATABASE, TABLE_1));
- statement.execute(
- String.format("alter table %s.%s drop column age", DATABASE, TABLE_1));
- Thread.sleep(20000);
- statement.execute(
- String.format(
- "insert into %s.%s values ('doris_1_1_1','c1_val')",
- DATABASE, TABLE_1));
- }
- Thread.sleep(20000);
- List<String> expected3 =
- Arrays.asList("doris_1,null", "doris_1_1,null", "doris_1_1_1,c1_val");
- sql = "select * from %s.%s order by 1";
- String query3 = String.format(sql, DATABASE, TABLE_1);
- checkResult(expected3, query3, 2);
- jobClient.cancel().get();
- }
-
- @Test
- public void testAutoAddTable() throws Exception {
- printClusterStatus();
- initializeMySQLTable();
- initializeDorisTable();
- JobClient jobClient = submitJob();
- // wait 2 times checkpoint
- Thread.sleep(20000);
- List<String> expected = Arrays.asList("doris_1,1", "doris_2,2", "doris_3,3");
- String sql =
- "select * from ( select * from %s.%s union all select * from %s.%s union all select * from %s.%s ) res order by 1";
- String query1 = String.format(sql, DATABASE, TABLE_1, DATABASE, TABLE_2, DATABASE, TABLE_3);
- checkResult(expected, query1, 2);
-
- // auto create table4
- addTableTable_4();
- Thread.sleep(20000);
- List<String> expected2 = Arrays.asList("doris_4_1,4", "doris_4_2,4");
- sql = "select * from %s.%s order by 1";
- String query2 = String.format(sql, DATABASE, TABLE_4);
- checkResult(expected2, query2, 2);
-
- // add incremental data
- try (Connection connection =
- DriverManager.getConnection(
- MYSQL_CONTAINER.getJdbcUrl(), MYSQL_USER, MYSQL_PASSWD);
- Statement statement = connection.createStatement()) {
- statement.execute(
- String.format("insert into %s.%s values ('doris_1_1',10)", DATABASE, TABLE_1));
- statement.execute(
- String.format("insert into %s.%s values ('doris_2_1',11)", DATABASE, TABLE_2));
- statement.execute(
- String.format("insert into %s.%s values ('doris_3_1',12)", DATABASE, TABLE_3));
- statement.execute(
- String.format("insert into %s.%s values ('doris_4_3',43)", DATABASE, TABLE_4));
-
- statement.execute(
- String.format(
- "update %s.%s set age=18 where name='doris_1'", DATABASE, TABLE_1));
- statement.execute(
- String.format("delete from %s.%s where name='doris_2'", DATABASE, TABLE_2));
- statement.execute(
- String.format("delete from %s.%s where name='doris_4_2'", DATABASE, TABLE_4));
- statement.execute(
- String.format(
- "update %s.%s set age=41 where name='doris_4_1'", DATABASE, TABLE_4));
- }
-
- Thread.sleep(20000);
- List<String> expected3 =
- Arrays.asList(
- "doris_1,18",
- "doris_1_1,10",
- "doris_2_1,11",
- "doris_3,3",
- "doris_3_1,12",
- "doris_4_1,41",
- "doris_4_3,43");
- sql =
- "select * from ( select * from %s.%s union all select * from %s.%s union all select * from %s.%s union all select * from %s.%s ) res order by 1";
- String query3 =
- String.format(
- sql, DATABASE, TABLE_1, DATABASE, TABLE_2, DATABASE, TABLE_3, DATABASE,
- TABLE_4);
- checkResult(expected3, query3, 2);
-
- // mock schema change
- try (Connection connection =
- DriverManager.getConnection(
- MYSQL_CONTAINER.getJdbcUrl(), MYSQL_USER, MYSQL_PASSWD);
- Statement statement = connection.createStatement()) {
- statement.execute(
- String.format(
- "alter table %s.%s add column c1 varchar(128)", DATABASE, TABLE_4));
- statement.execute(
- String.format("alter table %s.%s drop column age", DATABASE, TABLE_4));
- Thread.sleep(20000);
- statement.execute(
- String.format(
- "insert into %s.%s values ('doris_4_4','c1_val')", DATABASE, TABLE_4));
- }
- Thread.sleep(20000);
- List<String> expected4 =
- Arrays.asList("doris_4_1,null", "doris_4_3,null", "doris_4_4,c1_val");
- sql = "select * from %s.%s order by 1";
- String query4 = String.format(sql, DATABASE, TABLE_4);
- checkResult(expected4, query4, 2);
- jobClient.cancel().get();
- }
-
- @Test
- public void testMySQL2DorisSQLParse() throws Exception {
- printClusterStatus();
- initializeMySQLTable();
- initializeDorisTable();
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- env.setRestartStrategy(RestartStrategies.noRestart());
- Map<String, String> flinkMap = new HashMap<>();
- flinkMap.put("execution.checkpointing.interval", "10s");
- flinkMap.put("pipeline.operator-chaining", "false");
- flinkMap.put("parallelism.default", "1");
-
- Configuration configuration = Configuration.fromMap(flinkMap);
- env.configure(configuration);
-
- String database = DATABASE;
- Map<String, String> mysqlConfig = new HashMap<>();
- mysqlConfig.put("database-name", DATABASE);
- mysqlConfig.put("hostname", MYSQL_CONTAINER.getHost());
- mysqlConfig.put("port", MYSQL_CONTAINER.getMappedPort(3306) + "");
- mysqlConfig.put("username", MYSQL_USER);
- mysqlConfig.put("password", MYSQL_PASSWD);
- mysqlConfig.put("server-time-zone", "Asia/Shanghai");
- Configuration config = Configuration.fromMap(mysqlConfig);
-
- Map<String, String> sinkConfig = new HashMap<>();
- sinkConfig.put("fenodes", getFenodes());
- sinkConfig.put("username", USERNAME);
- sinkConfig.put("password", PASSWORD);
- sinkConfig.put("jdbc-url", String.format(DorisTestBase.URL, DORIS_CONTAINER.getHost()));
- sinkConfig.put("sink.label-prefix", UUID.randomUUID().toString());
- sinkConfig.put("sink.check-interval", "5000");
- Configuration sinkConf = Configuration.fromMap(sinkConfig);
-
- Map<String, String> tableConfig = new HashMap<>();
- tableConfig.put("replication_num", "1");
-
- String includingTables = "tbl.*";
- String excludingTables = "";
- DatabaseSync databaseSync = new MysqlDatabaseSync();
- databaseSync
- .setEnv(env)
- .setDatabase(database)
- .setConfig(config)
- .setIncludingTables(includingTables)
- .setExcludingTables(excludingTables)
- .setIgnoreDefaultValue(false)
- .setSinkConfig(sinkConf)
- .setTableConfig(tableConfig)
- .setCreateTableOnly(false)
- .setNewSchemaChange(true)
- .setSchemaChangeMode(SchemaChangeMode.SQL_PARSER.getName())
- // no single sink
- .setSingleSink(true)
- .create();
- databaseSync.build();
- JobClient jobClient = env.executeAsync();
- waitForJobStatus(
- jobClient,
- Collections.singletonList(RUNNING),
- Deadline.fromNow(Duration.ofSeconds(10)));
-
- // wait 2 times checkpoint
- Thread.sleep(20000);
- List<String> expected = Arrays.asList("doris_1,1", "doris_2,2", "doris_3,3", "doris_5,5");
- String sql =
- "select * from ( select * from %s.%s union all select * from %s.%s union all select * from %s.%s union all select * from %s.%s) res order by 1";
- String query1 =
- String.format(
- sql, DATABASE, TABLE_1, DATABASE, TABLE_2, DATABASE, TABLE_3, DATABASE,
- TABLE_5);
- checkResult(expected, query1, 2);
-
- // add incremental data
- try (Connection connection =
- DriverManager.getConnection(
- MYSQL_CONTAINER.getJdbcUrl(), MYSQL_USER, MYSQL_PASSWD);
- Statement statement = connection.createStatement()) {
- statement.execute(
- String.format("insert into %s.%s values ('doris_1_1',10)", DATABASE, TABLE_1));
- statement.execute(
- String.format("insert into %s.%s values ('doris_2_1',11)", DATABASE, TABLE_2));
- statement.execute(
- String.format("insert into %s.%s values ('doris_3_1',12)", DATABASE, TABLE_3));
-
- statement.execute(
- String.format(
- "update %s.%s set age=18 where name='doris_1'", DATABASE, TABLE_1));
- statement.execute(
- String.format("delete from %s.%s where name='doris_2'", DATABASE, TABLE_2));
- }
-
- Thread.sleep(20000);
- List<String> expected2 =
- Arrays.asList(
- "doris_1,18", "doris_1_1,10", "doris_2_1,11", "doris_3,3", "doris_3_1,12");
- sql =
- "select * from ( select * from %s.%s union all select * from %s.%s union all select * from %s.%s ) res order by 1";
- String query2 = String.format(sql, DATABASE, TABLE_1, DATABASE, TABLE_2, DATABASE, TABLE_3);
- checkResult(expected2, query2, 2);
-
- // mock schema change
- try (Connection connection =
- DriverManager.getConnection(
- MYSQL_CONTAINER.getJdbcUrl(), MYSQL_USER, MYSQL_PASSWD);
- Statement statement = connection.createStatement()) {
- statement.execute(
- String.format(
- "alter table %s.%s add column c1 varchar(128)", DATABASE, TABLE_1));
- statement.execute(
- String.format("alter table %s.%s drop column age", DATABASE, TABLE_1));
- Thread.sleep(20000);
- statement.execute(
- String.format(
- "insert into %s.%s values ('doris_1_1_1','c1_val')",
- DATABASE, TABLE_1));
- }
- Thread.sleep(20000);
- List<String> expected3 =
- Arrays.asList("doris_1,null", "doris_1_1,null", "doris_1_1_1,c1_val");
- sql = "select * from %s.%s order by 1";
- String query3 = String.format(sql, DATABASE, TABLE_1);
- checkResult(expected3, query3, 2);
-
- // mock create table
- try (Connection connection =
- DriverManager.getConnection(
- MYSQL_CONTAINER.getJdbcUrl(), MYSQL_USER, MYSQL_PASSWD);
- Statement statement = connection.createStatement()) {
- statement.execute(
- String.format(
- "CREATE TABLE %s.%s ( \n"
- + "`name` varchar(256) primary key,\n"
- + "`age` int\n"
- + ")",
- DATABASE, TABLE_SQL_PARSE));
- statement.execute(
- String.format(
- "insert into %s.%s values ('doris_1',1)", DATABASE, TABLE_SQL_PARSE));
- statement.execute(
- String.format(
- "insert into %s.%s values ('doris_2',2)", DATABASE, TABLE_SQL_PARSE));
- statement.execute(
- String.format(
- "insert into %s.%s values ('doris_3',3)", DATABASE, TABLE_SQL_PARSE));
- }
- Thread.sleep(20000);
- List<String> expected4 = Arrays.asList("doris_1,1", "doris_2,2", "doris_3,3");
- sql = "select * from %s.%s order by 1";
- String query4 = String.format(sql, DATABASE, TABLE_SQL_PARSE);
- checkResult(expected4, query4, 2);
-
- jobClient.cancel().get();
- }
-
- @Test
- public void testMySQL2DorisByDefault() throws Exception {
- printClusterStatus();
- initializeMySQLTable();
- initializeDorisTable();
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- env.setRestartStrategy(RestartStrategies.noRestart());
- Map<String, String> flinkMap = new HashMap<>();
- flinkMap.put("execution.checkpointing.interval", "10s");
- flinkMap.put("pipeline.operator-chaining", "false");
- flinkMap.put("parallelism.default", "1");
-
- Configuration configuration = Configuration.fromMap(flinkMap);
- env.configure(configuration);
-
- String database = DATABASE;
- Map<String, String> mysqlConfig = new HashMap<>();
- mysqlConfig.put("database-name", DATABASE);
- mysqlConfig.put("hostname", MYSQL_CONTAINER.getHost());
- mysqlConfig.put("port", MYSQL_CONTAINER.getMappedPort(3306) + "");
- mysqlConfig.put("username", MYSQL_USER);
- mysqlConfig.put("password", MYSQL_PASSWD);
- mysqlConfig.put("server-time-zone", "Asia/Shanghai");
- Configuration config = Configuration.fromMap(mysqlConfig);
-
- Map<String, String> sinkConfig = new HashMap<>();
- sinkConfig.put("fenodes", getFenodes());
- sinkConfig.put("username", USERNAME);
- sinkConfig.put("password", PASSWORD);
- sinkConfig.put("jdbc-url", String.format(DorisTestBase.URL, DORIS_CONTAINER.getHost()));
- sinkConfig.put("sink.label-prefix", UUID.randomUUID().toString());
- sinkConfig.put("sink.check-interval", "5000");
- Configuration sinkConf = Configuration.fromMap(sinkConfig);
-
- Map<String, String> tableConfig = new HashMap<>();
- tableConfig.put("replication_num", "1");
-
- String includingTables = "tbl1|tbl2|tbl3|tbl5";
- String excludingTables = "";
- DatabaseSync databaseSync = new MysqlDatabaseSync();
- databaseSync
- .setEnv(env)
- .setDatabase(database)
- .setConfig(config)
- .setIncludingTables(includingTables)
- .setExcludingTables(excludingTables)
- .setIgnoreDefaultValue(false)
- .setSinkConfig(sinkConf)
- .setTableConfig(tableConfig)
- .setCreateTableOnly(false)
- .setNewSchemaChange(true)
- // no single sink
- .setSingleSink(false)
- .create();
- databaseSync.build();
- JobClient jobClient = env.executeAsync();
- waitForJobStatus(
- jobClient,
- Collections.singletonList(RUNNING),
- Deadline.fromNow(Duration.ofSeconds(10)));
-
- // wait 2 times checkpoint
- Thread.sleep(20000);
- List<String> expected = Arrays.asList("doris_1,1", "doris_2,2", "doris_3,3", "doris_5,5");
- String sql =
- "select * from ( select * from %s.%s union all select * from %s.%s union all select * from %s.%s union all select * from %s.%s) res order by 1";
- String query1 =
- String.format(
- sql, DATABASE, TABLE_1, DATABASE, TABLE_2, DATABASE, TABLE_3, DATABASE,
- TABLE_5);
- checkResult(expected, query1, 2);
-
- // add incremental data
- try (Connection connection =
- DriverManager.getConnection(
- MYSQL_CONTAINER.getJdbcUrl(), MYSQL_USER, MYSQL_PASSWD);
- Statement statement = connection.createStatement()) {
- statement.execute(
- String.format("insert into %s.%s values ('doris_1_1',10)", DATABASE, TABLE_1));
- statement.execute(
- String.format("insert into %s.%s values ('doris_2_1',11)", DATABASE, TABLE_2));
- statement.execute(
- String.format("insert into %s.%s values ('doris_3_1',12)", DATABASE, TABLE_3));
-
- statement.execute(
- String.format(
- "update %s.%s set age=18 where name='doris_1'", DATABASE, TABLE_1));
- statement.execute(
- String.format("delete from %s.%s where name='doris_2'", DATABASE, TABLE_2));
- }
-
- Thread.sleep(20000);
- List<String> expected2 =
- Arrays.asList(
- "doris_1,18", "doris_1_1,10", "doris_2_1,11", "doris_3,3", "doris_3_1,12");
- sql =
- "select * from ( select * from %s.%s union all select * from %s.%s union all select * from %s.%s ) res order by 1";
- String query2 = String.format(sql, DATABASE, TABLE_1, DATABASE, TABLE_2, DATABASE, TABLE_3);
- checkResult(expected2, query2, 2);
- jobClient.cancel().get();
- }
-
- @Test
- public void testMySQL2DorisEnableDelete() throws Exception {
- printClusterStatus();
- initializeMySQLTable();
- initializeDorisTable();
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- env.setRestartStrategy(RestartStrategies.noRestart());
- Map<String, String> flinkMap = new HashMap<>();
- flinkMap.put("execution.checkpointing.interval", "10s");
- flinkMap.put("pipeline.operator-chaining", "false");
- flinkMap.put("parallelism.default", "1");
-
- Configuration configuration = Configuration.fromMap(flinkMap);
- env.configure(configuration);
-
- String database = DATABASE;
- Map<String, String> mysqlConfig = new HashMap<>();
- mysqlConfig.put("database-name", DATABASE);
- mysqlConfig.put("hostname", MYSQL_CONTAINER.getHost());
- mysqlConfig.put("port", MYSQL_CONTAINER.getMappedPort(3306) + "");
- mysqlConfig.put("username", MYSQL_USER);
- mysqlConfig.put("password", MYSQL_PASSWD);
- mysqlConfig.put("server-time-zone", "Asia/Shanghai");
- Configuration config = Configuration.fromMap(mysqlConfig);
-
- Map<String, String> sinkConfig = new HashMap<>();
- sinkConfig.put("fenodes", getFenodes());
- sinkConfig.put("username", USERNAME);
- sinkConfig.put("password", PASSWORD);
- sinkConfig.put("jdbc-url", String.format(DorisTestBase.URL, DORIS_CONTAINER.getHost()));
- sinkConfig.put("sink.label-prefix", UUID.randomUUID().toString());
- sinkConfig.put("sink.check-interval", "5000");
- sinkConfig.put("sink.enable-delete", "false");
- Configuration sinkConf = Configuration.fromMap(sinkConfig);
-
- Map<String, String> tableConfig = new HashMap<>();
- tableConfig.put("replication_num", "1");
-
- String includingTables = "tbl1|tbl2|tbl3|tbl5";
- String excludingTables = "";
- DatabaseSync databaseSync = new MysqlDatabaseSync();
- databaseSync
- .setEnv(env)
- .setDatabase(database)
- .setConfig(config)
- .setIncludingTables(includingTables)
- .setExcludingTables(excludingTables)
- .setIgnoreDefaultValue(false)
- .setSinkConfig(sinkConf)
- .setTableConfig(tableConfig)
- .setCreateTableOnly(false)
- .setNewSchemaChange(true)
- // no single sink
- .setSingleSink(false)
- .create();
- databaseSync.build();
- JobClient jobClient = env.executeAsync();
- waitForJobStatus(
- jobClient,
- Collections.singletonList(RUNNING),
- Deadline.fromNow(Duration.ofSeconds(10)));
-
- // wait 2 times checkpoint
- Thread.sleep(20000);
- List<String> expected = Arrays.asList("doris_1,1", "doris_2,2", "doris_3,3", "doris_5,5");
- String sql =
- "select * from ( select * from %s.%s union all select * from %s.%s union all select * from %s.%s union all select * from %s.%s) res order by 1";
- String query1 =
- String.format(
- sql, DATABASE, TABLE_1, DATABASE, TABLE_2, DATABASE, TABLE_3, DATABASE,
- TABLE_5);
- checkResult(expected, query1, 2);
-
- // add incremental data
- try (Connection connection =
- DriverManager.getConnection(
- MYSQL_CONTAINER.getJdbcUrl(), MYSQL_USER, MYSQL_PASSWD);
- Statement statement = connection.createStatement()) {
- statement.execute(
- String.format("insert into %s.%s values ('doris_1_1',10)", DATABASE, TABLE_1));
- statement.execute(
- String.format("insert into %s.%s values ('doris_2_1',11)", DATABASE, TABLE_2));
- statement.execute(
- String.format("insert into %s.%s values ('doris_3_1',12)", DATABASE, TABLE_3));
-
- statement.execute(
- String.format(
- "update %s.%s set age=18 where name='doris_1'", DATABASE, TABLE_1));
- statement.execute(
- String.format("delete from %s.%s where name='doris_2'", DATABASE, TABLE_2));
- statement.execute(
- String.format("delete from %s.%s where name='doris_3'", DATABASE, TABLE_3));
- statement.execute(
- String.format("delete from %s.%s where name='doris_5'", DATABASE, TABLE_5));
- }
-
- Thread.sleep(20000);
- List<String> expected2 =
- Arrays.asList(
- "doris_1,18",
- "doris_1_1,10",
- "doris_2,2",
- "doris_2_1,11",
- "doris_3,3",
- "doris_3_1,12",
- "doris_5,5");
- sql =
- "select * from ( select * from %s.%s union all select * from %s.%s union all select * from %s.%s union all select * from %s.%s) res order by 1";
- String query2 =
- String.format(
- sql, DATABASE, TABLE_1, DATABASE, TABLE_2, DATABASE, TABLE_3, DATABASE,
- TABLE_5);
- checkResult(expected2, query2, 2);
- jobClient.cancel().get();
- }
-
- private void initializeDorisTable() throws Exception {
- try (Connection connection =
- DriverManager.getConnection(
- String.format(URL, DORIS_CONTAINER.getHost()), USERNAME, PASSWORD);
- Statement statement = connection.createStatement()) {
- statement.execute(String.format("CREATE DATABASE IF NOT EXISTS %s", DATABASE));
- statement.execute(String.format("DROP TABLE IF EXISTS %s.%s", DATABASE, TABLE_1));
- statement.execute(String.format("DROP TABLE IF EXISTS %s.%s", DATABASE, TABLE_2));
- statement.execute(String.format("DROP TABLE IF EXISTS %s.%s", DATABASE, TABLE_3));
- statement.execute(String.format("DROP TABLE IF EXISTS %s.%s", DATABASE, TABLE_4));
- statement.execute(String.format("DROP TABLE IF EXISTS %s.%s", DATABASE, TABLE_5));
- // create a table in Doris
- statement.execute(
- String.format(
- "CREATE TABLE %s.%s ( \n"
- + "`name` varchar(256),\n"
- + "`age` int\n"
- + ")\n"
- + "UNIQUE KEY(`name`)\n"
- + "DISTRIBUTED BY HASH(`name`) BUCKETS 1\n"
- + "PROPERTIES ( \n"
- + "\"replication_num\" = \"1\" \n"
- + ");\n",
- DATABASE, TABLE_5));
- }
- }
-
- public JobClient submitJob() throws Exception {
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- env.setRestartStrategy(RestartStrategies.noRestart());
- Map<String, String> flinkMap = new HashMap<>();
- flinkMap.put("execution.checkpointing.interval", "10s");
- flinkMap.put("pipeline.operator-chaining", "false");
- flinkMap.put("parallelism.default", "1");
-
- Configuration configuration = Configuration.fromMap(flinkMap);
- env.configure(configuration);
-
- String database = DATABASE;
- Map<String, String> mysqlConfig = new HashMap<>();
- mysqlConfig.put("database-name", DATABASE);
- mysqlConfig.put("hostname", MYSQL_CONTAINER.getHost());
- mysqlConfig.put("port", MYSQL_CONTAINER.getMappedPort(3306) + "");
- mysqlConfig.put("username", MYSQL_USER);
- mysqlConfig.put("password", MYSQL_PASSWD);
- mysqlConfig.put("server-time-zone", "Asia/Shanghai");
- Configuration config = Configuration.fromMap(mysqlConfig);
-
- Map<String, String> sinkConfig = new HashMap<>();
- sinkConfig.put("fenodes", getFenodes());
- sinkConfig.put("username", USERNAME);
- sinkConfig.put("password", PASSWORD);
- sinkConfig.put("jdbc-url", String.format(DorisTestBase.URL, DORIS_CONTAINER.getHost()));
- sinkConfig.put("sink.label-prefix", UUID.randomUUID().toString());
- Configuration sinkConf = Configuration.fromMap(sinkConfig);
-
- Map<String, String> tableConfig = new HashMap<>();
- tableConfig.put("replication_num", "1");
-
- String includingTables = "tbl.*";
- String excludingTables = "";
- DatabaseSync databaseSync = new MysqlDatabaseSync();
- databaseSync
- .setEnv(env)
- .setDatabase(database)
- .setConfig(config)
- .setIncludingTables(includingTables)
- .setExcludingTables(excludingTables)
- .setIgnoreDefaultValue(false)
- .setSinkConfig(sinkConf)
- .setTableConfig(tableConfig)
- .setCreateTableOnly(false)
- .setNewSchemaChange(true)
- .setSingleSink(true)
- .setIgnoreDefaultValue(true)
- .create();
- databaseSync.build();
- JobClient jobClient = env.executeAsync();
- waitForJobStatus(
- jobClient,
- Collections.singletonList(RUNNING),
- Deadline.fromNow(Duration.ofSeconds(10)));
- return jobClient;
- }
-
- private void addTableTable_4() throws SQLException {
- try (Connection connection =
- DriverManager.getConnection(
- MYSQL_CONTAINER.getJdbcUrl(), MYSQL_USER, MYSQL_PASSWD);
- Statement statement = connection.createStatement()) {
- statement.execute(String.format("DROP TABLE IF EXISTS %s.%s", DATABASE, TABLE_4));
- statement.execute(
- String.format(
- "CREATE TABLE %s.%s ( \n"
- + "`name` varchar(256) primary key,\n"
- + "`age` int\n"
- + ")",
- DATABASE, TABLE_4));
-
- // mock stock data
- statement.execute(
- String.format("insert into %s.%s values ('doris_4_1',4)", DATABASE, TABLE_4));
- statement.execute(
- String.format("insert into %s.%s values ('doris_4_2',4)", DATABASE, TABLE_4));
- }
- }
-
- public void initializeMySQLTable() throws Exception {
- try (Connection connection =
- DriverManager.getConnection(
- MYSQL_CONTAINER.getJdbcUrl(), MYSQL_USER, MYSQL_PASSWD);
- Statement statement = connection.createStatement()) {
- statement.execute(String.format("CREATE DATABASE IF NOT EXISTS %s", DATABASE));
- statement.execute(String.format("DROP TABLE IF EXISTS %s.%s", DATABASE, TABLE_1));
- statement.execute(String.format("DROP TABLE IF EXISTS %s.%s", DATABASE, TABLE_2));
- statement.execute(String.format("DROP TABLE IF EXISTS %s.%s", DATABASE, TABLE_3));
- statement.execute(String.format("DROP TABLE IF EXISTS %s.%s", DATABASE, TABLE_4));
- statement.execute(String.format("DROP TABLE IF EXISTS %s.%s", DATABASE, TABLE_5));
- statement.execute(
- String.format(
- "CREATE TABLE %s.%s ( \n"
- + "`name` varchar(256) primary key,\n"
- + "`age` int\n"
- + ")",
- DATABASE, TABLE_1));
- statement.execute(
- String.format(
- "CREATE TABLE %s.%s ( \n"
- + "`name` varchar(256) primary key,\n"
- + "`age` int\n"
- + ")",
- DATABASE, TABLE_2));
- statement.execute(
- String.format(
- "CREATE TABLE %s.%s ( \n"
- + "`name` varchar(256) primary key,\n"
- + "`age` int\n"
- + ")",
- DATABASE, TABLE_3));
- statement.execute(
- String.format(
- "CREATE TABLE %s.%s ( \n"
- + "`name` varchar(256) primary key,\n"
- + "`age` int\n"
- + ")",
- DATABASE, TABLE_5));
- // mock stock data
- statement.execute(
- String.format("insert into %s.%s values ('doris_1',1)", DATABASE, TABLE_1));
- statement.execute(
- String.format("insert into %s.%s values ('doris_2',2)", DATABASE, TABLE_2));
- statement.execute(
- String.format("insert into %s.%s values ('doris_3',3)", DATABASE, TABLE_3));
- statement.execute(
- String.format("insert into %s.%s values ('doris_5',5)", DATABASE, TABLE_5));
- }
- }
-}
diff --git a/flink-doris-connector/src/test/resources/container/e2e/doris2doris/test_doris2doris_sink_test_tbl.sql b/flink-doris-connector/src/test/resources/container/e2e/doris2doris/test_doris2doris_sink_test_tbl.sql
new file mode 100644
index 0000000..20ffb52
--- /dev/null
+++ b/flink-doris-connector/src/test/resources/container/e2e/doris2doris/test_doris2doris_sink_test_tbl.sql
@@ -0,0 +1,31 @@
+CREATE DATABASE IF NOT EXISTS test_doris2doris_sink;
+
+DROP TABLE IF EXISTS test_doris2doris_sink.test_tbl;
+
+CREATE TABLE test_doris2doris_sink.test_tbl (
+ `id` int,
+ `c1` boolean,
+ `c2` tinyint,
+ `c3` smallint,
+ `c4` int,
+ `c5` bigint,
+ `c6` largeint,
+ `c7` float,
+ `c8` double,
+ `c9` decimal(12,4),
+ `c10` date,
+ `c11` datetime,
+ `c12` char(1),
+ `c13` varchar(256),
+ `c14` Array<String>,
+ `c15` Map<String, String>,
+ `c16` Struct<name: String, age: int>,
+ `c17` JSON
+)
+ DUPLICATE KEY(`id`)
+DISTRIBUTED BY HASH(`id`) BUCKETS 1
+PROPERTIES (
+"replication_num" = "1",
+"light_schema_change" = "true"
+);
+
diff --git a/flink-doris-connector/src/test/resources/container/e2e/doris2doris/test_doris2doris_source_test_tbl.sql b/flink-doris-connector/src/test/resources/container/e2e/doris2doris/test_doris2doris_source_test_tbl.sql
new file mode 100644
index 0000000..5e57b50
--- /dev/null
+++ b/flink-doris-connector/src/test/resources/container/e2e/doris2doris/test_doris2doris_source_test_tbl.sql
@@ -0,0 +1,73 @@
+CREATE DATABASE IF NOT EXISTS test_doris2doris_source;
+
+DROP TABLE IF EXISTS test_doris2doris_source.test_tbl;
+
+CREATE TABLE test_doris2doris_source.test_tbl (
+ `id` int,
+ `c1` boolean,
+ `c2` tinyint,
+ `c3` smallint,
+ `c4` int,
+ `c5` bigint,
+ `c6` largeint,
+ `c7` float,
+ `c8` double,
+ `c9` decimal(12,4),
+ `c10` date,
+ `c11` datetime,
+ `c12` char(1),
+ `c13` varchar(256),
+ `c14` Array<String>,
+ `c15` Map<String, String>,
+ `c16` Struct<name: String, age: int>,
+ `c17` JSON
+)
+DUPLICATE KEY(`id`)
+DISTRIBUTED BY HASH(`id`) BUCKETS 1
+PROPERTIES (
+"replication_num" = "1",
+"light_schema_change" = "true"
+);
+
+INSERT INTO test_doris2doris_source.test_tbl
+VALUES
+ (
+ 1,
+ TRUE,
+ 127,
+ 32767,
+ 2147483647,
+ 9223372036854775807,
+ 123456789012345678901234567890,
+ 3.14,
+ 2.7182818284,
+ 12345.6789,
+ '2023-05-22',
+ '2023-05-22 12:34:56',
+ 'A',
+ 'Example text',
+ ['item1', 'item2', 'item3'],
+ {'key1': 'value1', 'key2': 'value2'},
+ STRUCT('John Doe', 30),
+ '{"key": "value"}'
+ ),
+ (
+ 2,
+ FALSE,
+ -128,
+ -32768,
+ -2147483648,
+ -9223372036854775808,
+ -123456789012345678901234567890,
+ -3.14,
+ -2.7182818284,
+ -12345.6789,
+ '2024-01-01',
+ '2024-01-01 00:00:00',
+ 'B',
+ 'Another example',
+ ['item4', 'item5', 'item6'],
+ {'key3': 'value3', 'key4': 'value4'},
+ STRUCT('Jane Doe', 25),
+ '{"another_key": "another_value"}'
+);
\ No newline at end of file
diff --git a/flink-doris-connector/src/test/resources/container/e2e/mysql2doris/testAutoAddTable.txt b/flink-doris-connector/src/test/resources/container/e2e/mysql2doris/testAutoAddTable.txt
new file mode 100644
index 0000000..88ec454
--- /dev/null
+++ b/flink-doris-connector/src/test/resources/container/e2e/mysql2doris/testAutoAddTable.txt
@@ -0,0 +1,5 @@
+mysql-sync-database
+ --including-tables "tbl.*|auto_add"
+ --table-conf replication_num=1
+ --single-sink true
+ --ignore-default-value true
\ No newline at end of file
diff --git a/flink-doris-connector/src/test/resources/container/e2e/mysql2doris/testAutoAddTable_init.sql b/flink-doris-connector/src/test/resources/container/e2e/mysql2doris/testAutoAddTable_init.sql
new file mode 100644
index 0000000..ec617f3
--- /dev/null
+++ b/flink-doris-connector/src/test/resources/container/e2e/mysql2doris/testAutoAddTable_init.sql
@@ -0,0 +1,38 @@
+CREATE DATABASE if NOT EXISTS test_e2e_mysql;
+DROP TABLE IF EXISTS test_e2e_mysql.tbl1;
+CREATE TABLE test_e2e_mysql.tbl1 (
+ `name` varchar(256) primary key,
+ `age` int
+);
+insert into test_e2e_mysql.tbl1 values ('doris_1',1);
+
+
+DROP TABLE IF EXISTS test_e2e_mysql.tbl2;
+CREATE TABLE test_e2e_mysql.tbl2 (
+ `name` varchar(256) primary key,
+ `age` int
+);
+insert into test_e2e_mysql.tbl2 values ('doris_2',2);
+
+
+DROP TABLE IF EXISTS test_e2e_mysql.tbl3;
+CREATE TABLE test_e2e_mysql.tbl3 (
+ `name` varchar(256) primary key,
+ `age` int
+);
+insert into test_e2e_mysql.tbl3 values ('doris_3',3);
+
+
+DROP TABLE IF EXISTS test_e2e_mysql.tbl4;
+CREATE TABLE test_e2e_mysql.tbl4 (
+ `name` varchar(256) primary key,
+ `age` int
+);
+
+
+DROP TABLE IF EXISTS test_e2e_mysql.tbl5;
+CREATE TABLE test_e2e_mysql.tbl5 (
+ `name` varchar(256) primary key,
+ `age` int
+);
+insert into test_e2e_mysql.tbl5 values ('doris_5',5);
\ No newline at end of file
diff --git a/flink-doris-connector/src/test/resources/container/e2e/mysql2doris/testMySQL2Doris.txt b/flink-doris-connector/src/test/resources/container/e2e/mysql2doris/testMySQL2Doris.txt
new file mode 100644
index 0000000..601d083
--- /dev/null
+++ b/flink-doris-connector/src/test/resources/container/e2e/mysql2doris/testMySQL2Doris.txt
@@ -0,0 +1,5 @@
+mysql-sync-database
+ --including-tables "tbl.*"
+ --table-conf replication_num=1
+ --single-sink true
+ --ignore-default-value false
\ No newline at end of file
diff --git a/flink-doris-connector/src/test/resources/container/e2e/mysql2doris/testMySQL2DorisByDefault.txt b/flink-doris-connector/src/test/resources/container/e2e/mysql2doris/testMySQL2DorisByDefault.txt
new file mode 100644
index 0000000..6f69a75
--- /dev/null
+++ b/flink-doris-connector/src/test/resources/container/e2e/mysql2doris/testMySQL2DorisByDefault.txt
@@ -0,0 +1,3 @@
+mysql-sync-database
+ --including-tables "tbl1|tbl2|tbl3|tbl5"
+ --table-conf replication_num=1
\ No newline at end of file
diff --git a/flink-doris-connector/src/test/resources/container/e2e/mysql2doris/testMySQL2DorisByDefault_init.sql b/flink-doris-connector/src/test/resources/container/e2e/mysql2doris/testMySQL2DorisByDefault_init.sql
new file mode 100644
index 0000000..ec617f3
--- /dev/null
+++ b/flink-doris-connector/src/test/resources/container/e2e/mysql2doris/testMySQL2DorisByDefault_init.sql
@@ -0,0 +1,38 @@
+CREATE DATABASE if NOT EXISTS test_e2e_mysql;
+DROP TABLE IF EXISTS test_e2e_mysql.tbl1;
+CREATE TABLE test_e2e_mysql.tbl1 (
+ `name` varchar(256) primary key,
+ `age` int
+);
+insert into test_e2e_mysql.tbl1 values ('doris_1',1);
+
+
+DROP TABLE IF EXISTS test_e2e_mysql.tbl2;
+CREATE TABLE test_e2e_mysql.tbl2 (
+ `name` varchar(256) primary key,
+ `age` int
+);
+insert into test_e2e_mysql.tbl2 values ('doris_2',2);
+
+
+DROP TABLE IF EXISTS test_e2e_mysql.tbl3;
+CREATE TABLE test_e2e_mysql.tbl3 (
+ `name` varchar(256) primary key,
+ `age` int
+);
+insert into test_e2e_mysql.tbl3 values ('doris_3',3);
+
+
+DROP TABLE IF EXISTS test_e2e_mysql.tbl4;
+CREATE TABLE test_e2e_mysql.tbl4 (
+ `name` varchar(256) primary key,
+ `age` int
+);
+
+
+DROP TABLE IF EXISTS test_e2e_mysql.tbl5;
+CREATE TABLE test_e2e_mysql.tbl5 (
+ `name` varchar(256) primary key,
+ `age` int
+);
+insert into test_e2e_mysql.tbl5 values ('doris_5',5);
\ No newline at end of file
diff --git a/flink-doris-connector/src/test/resources/container/e2e/mysql2doris/testMySQL2DorisEnableDelete.txt b/flink-doris-connector/src/test/resources/container/e2e/mysql2doris/testMySQL2DorisEnableDelete.txt
new file mode 100644
index 0000000..1048916
--- /dev/null
+++ b/flink-doris-connector/src/test/resources/container/e2e/mysql2doris/testMySQL2DorisEnableDelete.txt
@@ -0,0 +1,5 @@
+mysql-sync-database
+ --including-tables "tbl1|tbl2|tbl3|tbl5"
+ --table-conf replication_num=1
+ --sink-conf sink.enable-delete=false
+ --sink-conf sink.check-interval=5000
\ No newline at end of file
diff --git a/flink-doris-connector/src/test/resources/container/e2e/mysql2doris/testMySQL2DorisEnableDelete_init.sql b/flink-doris-connector/src/test/resources/container/e2e/mysql2doris/testMySQL2DorisEnableDelete_init.sql
new file mode 100644
index 0000000..ec617f3
--- /dev/null
+++ b/flink-doris-connector/src/test/resources/container/e2e/mysql2doris/testMySQL2DorisEnableDelete_init.sql
@@ -0,0 +1,38 @@
+CREATE DATABASE if NOT EXISTS test_e2e_mysql;
+DROP TABLE IF EXISTS test_e2e_mysql.tbl1;
+CREATE TABLE test_e2e_mysql.tbl1 (
+ `name` varchar(256) primary key,
+ `age` int
+);
+insert into test_e2e_mysql.tbl1 values ('doris_1',1);
+
+
+DROP TABLE IF EXISTS test_e2e_mysql.tbl2;
+CREATE TABLE test_e2e_mysql.tbl2 (
+ `name` varchar(256) primary key,
+ `age` int
+);
+insert into test_e2e_mysql.tbl2 values ('doris_2',2);
+
+
+DROP TABLE IF EXISTS test_e2e_mysql.tbl3;
+CREATE TABLE test_e2e_mysql.tbl3 (
+ `name` varchar(256) primary key,
+ `age` int
+);
+insert into test_e2e_mysql.tbl3 values ('doris_3',3);
+
+
+DROP TABLE IF EXISTS test_e2e_mysql.tbl4;
+CREATE TABLE test_e2e_mysql.tbl4 (
+ `name` varchar(256) primary key,
+ `age` int
+);
+
+
+DROP TABLE IF EXISTS test_e2e_mysql.tbl5;
+CREATE TABLE test_e2e_mysql.tbl5 (
+ `name` varchar(256) primary key,
+ `age` int
+);
+insert into test_e2e_mysql.tbl5 values ('doris_5',5);
\ No newline at end of file
diff --git a/flink-doris-connector/src/test/resources/container/e2e/mysql2doris/testMySQL2DorisSQLParse.txt b/flink-doris-connector/src/test/resources/container/e2e/mysql2doris/testMySQL2DorisSQLParse.txt
new file mode 100644
index 0000000..d863ecf
--- /dev/null
+++ b/flink-doris-connector/src/test/resources/container/e2e/mysql2doris/testMySQL2DorisSQLParse.txt
@@ -0,0 +1,5 @@
+mysql-sync-database
+ --including-tables "tbl.*|add_tbl"
+ --table-conf replication_num=1
+ --schema-change-mode sql_parser
+ --single-sink true
\ No newline at end of file
diff --git a/flink-doris-connector/src/test/resources/container/e2e/mysql2doris/testMySQL2DorisSQLParse_init.sql b/flink-doris-connector/src/test/resources/container/e2e/mysql2doris/testMySQL2DorisSQLParse_init.sql
new file mode 100644
index 0000000..ec617f3
--- /dev/null
+++ b/flink-doris-connector/src/test/resources/container/e2e/mysql2doris/testMySQL2DorisSQLParse_init.sql
@@ -0,0 +1,38 @@
+CREATE DATABASE if NOT EXISTS test_e2e_mysql;
+DROP TABLE IF EXISTS test_e2e_mysql.tbl1;
+CREATE TABLE test_e2e_mysql.tbl1 (
+ `name` varchar(256) primary key,
+ `age` int
+);
+insert into test_e2e_mysql.tbl1 values ('doris_1',1);
+
+
+DROP TABLE IF EXISTS test_e2e_mysql.tbl2;
+CREATE TABLE test_e2e_mysql.tbl2 (
+ `name` varchar(256) primary key,
+ `age` int
+);
+insert into test_e2e_mysql.tbl2 values ('doris_2',2);
+
+
+DROP TABLE IF EXISTS test_e2e_mysql.tbl3;
+CREATE TABLE test_e2e_mysql.tbl3 (
+ `name` varchar(256) primary key,
+ `age` int
+);
+insert into test_e2e_mysql.tbl3 values ('doris_3',3);
+
+
+DROP TABLE IF EXISTS test_e2e_mysql.tbl4;
+CREATE TABLE test_e2e_mysql.tbl4 (
+ `name` varchar(256) primary key,
+ `age` int
+);
+
+
+DROP TABLE IF EXISTS test_e2e_mysql.tbl5;
+CREATE TABLE test_e2e_mysql.tbl5 (
+ `name` varchar(256) primary key,
+ `age` int
+);
+insert into test_e2e_mysql.tbl5 values ('doris_5',5);
\ No newline at end of file
diff --git a/flink-doris-connector/src/test/resources/container/e2e/mysql2doris/testMySQL2Doris_init.sql b/flink-doris-connector/src/test/resources/container/e2e/mysql2doris/testMySQL2Doris_init.sql
new file mode 100644
index 0000000..ec617f3
--- /dev/null
+++ b/flink-doris-connector/src/test/resources/container/e2e/mysql2doris/testMySQL2Doris_init.sql
@@ -0,0 +1,38 @@
+CREATE DATABASE if NOT EXISTS test_e2e_mysql;
+DROP TABLE IF EXISTS test_e2e_mysql.tbl1;
+CREATE TABLE test_e2e_mysql.tbl1 (
+ `name` varchar(256) primary key,
+ `age` int
+);
+insert into test_e2e_mysql.tbl1 values ('doris_1',1);
+
+
+DROP TABLE IF EXISTS test_e2e_mysql.tbl2;
+CREATE TABLE test_e2e_mysql.tbl2 (
+ `name` varchar(256) primary key,
+ `age` int
+);
+insert into test_e2e_mysql.tbl2 values ('doris_2',2);
+
+
+DROP TABLE IF EXISTS test_e2e_mysql.tbl3;
+CREATE TABLE test_e2e_mysql.tbl3 (
+ `name` varchar(256) primary key,
+ `age` int
+);
+insert into test_e2e_mysql.tbl3 values ('doris_3',3);
+
+
+DROP TABLE IF EXISTS test_e2e_mysql.tbl4;
+CREATE TABLE test_e2e_mysql.tbl4 (
+ `name` varchar(256) primary key,
+ `age` int
+);
+
+
+DROP TABLE IF EXISTS test_e2e_mysql.tbl5;
+CREATE TABLE test_e2e_mysql.tbl5 (
+ `name` varchar(256) primary key,
+ `age` int
+);
+insert into test_e2e_mysql.tbl5 values ('doris_5',5);
\ No newline at end of file