| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| package org.apache.doris.flink.tools.cdc; |
| |
| import org.apache.flink.api.common.JobStatus; |
| import org.apache.flink.api.common.restartstrategy.RestartStrategies; |
| import org.apache.flink.api.common.time.Deadline; |
| import org.apache.flink.configuration.Configuration; |
| import org.apache.flink.core.execution.JobClient; |
| import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; |
| import org.apache.flink.util.function.SupplierWithException; |
| |
| import org.apache.doris.flink.DorisTestBase; |
| import org.apache.doris.flink.tools.cdc.mysql.MysqlDatabaseSync; |
| import org.junit.AfterClass; |
| import org.junit.BeforeClass; |
| import org.junit.Test; |
| import org.junit.jupiter.api.Assertions; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| import org.testcontainers.containers.MySQLContainer; |
| import org.testcontainers.lifecycle.Startables; |
| |
| import java.sql.Connection; |
| import java.sql.DriverManager; |
| import java.sql.ResultSet; |
| import java.sql.Statement; |
| import java.time.Duration; |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.Collections; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Set; |
| import java.util.UUID; |
| import java.util.concurrent.TimeoutException; |
| import java.util.stream.Collectors; |
| import java.util.stream.Stream; |
| |
| import static org.apache.flink.api.common.JobStatus.RUNNING; |
| |
| /** |
| * MySQLDorisE2ECase 1. Automatically create tables 2. Schema change event synchronization |
| * 3.Synchronization of addition, deletion and modification events 4. CDC multi-table writing. |
| */ |
| public class MySQLDorisE2ECase extends DorisTestBase { |
| protected static final Logger LOG = LoggerFactory.getLogger(MySQLDorisE2ECase.class); |
| private static final String DATABASE = "test"; |
| private static final String MYSQL_USER = "root"; |
| private static final String MYSQL_PASSWD = "123456"; |
| private static final String TABLE_1 = "tbl1"; |
| private static final String TABLE_2 = "tbl2"; |
| private static final String TABLE_3 = "tbl3"; |
| |
| private static final MySQLContainer MYSQL_CONTAINER = |
| new MySQLContainer("mysql") |
| .withDatabaseName(DATABASE) |
| .withUsername(MYSQL_USER) |
| .withPassword(MYSQL_PASSWD); |
| |
| @BeforeClass |
| public static void startMySQLContainers() { |
| MYSQL_CONTAINER.setCommand("--default-time-zone=Asia/Shanghai"); |
| LOG.info("Starting MySQL containers..."); |
| Startables.deepStart(Stream.of(MYSQL_CONTAINER)).join(); |
| LOG.info("MySQL Containers are started."); |
| } |
| |
| @AfterClass |
| public static void stopMySQLContainers() { |
| LOG.info("Stopping MySQL containers..."); |
| MYSQL_CONTAINER.stop(); |
| LOG.info("MySQL Containers are stopped."); |
| } |
| |
| @Test |
| public void testMySQL2Doris() throws Exception { |
| initializeMySQLTable(); |
| JobClient jobClient = submitJob(); |
| // wait 2 times checkpoint |
| Thread.sleep(20000); |
| Set<List<Object>> expected = |
| Stream.<List<Object>>of( |
| Arrays.asList("doris_1", 1), |
| Arrays.asList("doris_2", 2), |
| Arrays.asList("doris_3", 3)) |
| .collect(Collectors.toSet()); |
| String sql = |
| "select * from ( select * from %s.%s union all select * from %s.%s union all select * from %s.%s ) res order by 1"; |
| checkResult(expected, sql, 2); |
| |
| // add incremental data |
| try (Connection connection = |
| DriverManager.getConnection( |
| MYSQL_CONTAINER.getJdbcUrl(), MYSQL_USER, MYSQL_PASSWD); |
| Statement statement = connection.createStatement()) { |
| statement.execute( |
| String.format("insert into %s.%s values ('doris_1_1',10)", DATABASE, TABLE_1)); |
| statement.execute( |
| String.format("insert into %s.%s values ('doris_2_1',11)", DATABASE, TABLE_2)); |
| statement.execute( |
| String.format("insert into %s.%s values ('doris_3_1',12)", DATABASE, TABLE_3)); |
| |
| statement.execute( |
| String.format( |
| "update %s.%s set age=18 where name='doris_1'", DATABASE, TABLE_1)); |
| statement.execute( |
| String.format("delete from %s.%s where name='doris_2'", DATABASE, TABLE_2)); |
| } |
| |
| Thread.sleep(20000); |
| Set<List<Object>> expected2 = |
| Stream.<List<Object>>of( |
| Arrays.asList("doris_1", 18), |
| Arrays.asList("doris_1_1", 10), |
| Arrays.asList("doris_2_1", 11), |
| Arrays.asList("doris_3", 3), |
| Arrays.asList("doris_3_1", 12)) |
| .collect(Collectors.toSet()); |
| sql = |
| "select * from ( select * from %s.%s union all select * from %s.%s union all select * from %s.%s ) res order by 1"; |
| checkResult(expected2, sql, 2); |
| |
| // mock schema change |
| try (Connection connection = |
| DriverManager.getConnection( |
| MYSQL_CONTAINER.getJdbcUrl(), MYSQL_USER, MYSQL_PASSWD); |
| Statement statement = connection.createStatement()) { |
| statement.execute( |
| String.format( |
| "alter table %s.%s add column c1 varchar(128)", DATABASE, TABLE_1)); |
| statement.execute( |
| String.format("alter table %s.%s drop column age", DATABASE, TABLE_1)); |
| Thread.sleep(20000); |
| statement.execute( |
| String.format( |
| "insert into %s.%s values ('doris_1_1_1','c1_val')", |
| DATABASE, TABLE_1)); |
| } |
| Thread.sleep(20000); |
| Set<List<Object>> expected3 = |
| Stream.<List<Object>>of( |
| Arrays.asList("doris_1", null), |
| Arrays.asList("doris_1_1", null), |
| Arrays.asList("doris_1_1_1", "c1_val")) |
| .collect(Collectors.toSet()); |
| sql = "select * from %s.%s order by 1"; |
| checkResult(expected3, sql, 2); |
| jobClient.cancel().get(); |
| } |
| |
| public void checkResult(Set<List<Object>> expected, String query, int columnSize) |
| throws Exception { |
| Set<List<Object>> actual = new HashSet<>(); |
| try (Statement sinkStatement = connection.createStatement()) { |
| ResultSet sinkResultSet = |
| sinkStatement.executeQuery( |
| String.format( |
| query, DATABASE, TABLE_1, DATABASE, TABLE_2, DATABASE, |
| TABLE_3)); |
| while (sinkResultSet.next()) { |
| List<Object> row = new ArrayList<>(); |
| for (int i = 1; i <= columnSize; i++) { |
| row.add(sinkResultSet.getObject(i)); |
| } |
| actual.add(row); |
| } |
| } |
| Assertions.assertIterableEquals(expected, actual); |
| } |
| |
| public JobClient submitJob() throws Exception { |
| StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); |
| env.setRestartStrategy(RestartStrategies.noRestart()); |
| Map<String, String> flinkMap = new HashMap<>(); |
| flinkMap.put("execution.checkpointing.interval", "10s"); |
| flinkMap.put("pipeline.operator-chaining", "false"); |
| flinkMap.put("parallelism.default", "1"); |
| |
| Configuration configuration = Configuration.fromMap(flinkMap); |
| env.configure(configuration); |
| |
| String database = DATABASE; |
| Map<String, String> mysqlConfig = new HashMap<>(); |
| mysqlConfig.put("database-name", DATABASE); |
| mysqlConfig.put("hostname", MYSQL_CONTAINER.getHost()); |
| mysqlConfig.put("port", MYSQL_CONTAINER.getMappedPort(3306) + ""); |
| mysqlConfig.put("username", MYSQL_USER); |
| mysqlConfig.put("password", MYSQL_PASSWD); |
| mysqlConfig.put("server-time-zone", "Asia/Shanghai"); |
| Configuration config = Configuration.fromMap(mysqlConfig); |
| |
| Map<String, String> sinkConfig = new HashMap<>(); |
| sinkConfig.put("fenodes", getFenodes()); |
| sinkConfig.put("username", USERNAME); |
| sinkConfig.put("password", PASSWORD); |
| sinkConfig.put("jdbc-url", String.format(DorisTestBase.URL, DORIS_CONTAINER.getHost())); |
| sinkConfig.put("sink.label-prefix", UUID.randomUUID().toString()); |
| Configuration sinkConf = Configuration.fromMap(sinkConfig); |
| |
| Map<String, String> tableConfig = new HashMap<>(); |
| tableConfig.put("replication_num", "1"); |
| |
| String includingTables = "tbl1|tbl2|tbl3"; |
| String excludingTables = ""; |
| DatabaseSync databaseSync = new MysqlDatabaseSync(); |
| databaseSync |
| .setEnv(env) |
| .setDatabase(database) |
| .setConfig(config) |
| .setIncludingTables(includingTables) |
| .setExcludingTables(excludingTables) |
| .setIgnoreDefaultValue(false) |
| .setSinkConfig(sinkConf) |
| .setTableConfig(tableConfig) |
| .setCreateTableOnly(false) |
| .setNewSchemaChange(true) |
| .create(); |
| databaseSync.build(); |
| JobClient jobClient = env.executeAsync(); |
| waitForJobStatus( |
| jobClient, |
| Collections.singletonList(RUNNING), |
| Deadline.fromNow(Duration.ofSeconds(10))); |
| return jobClient; |
| } |
| |
| public void initializeMySQLTable() throws Exception { |
| try (Connection connection = |
| DriverManager.getConnection( |
| MYSQL_CONTAINER.getJdbcUrl(), MYSQL_USER, MYSQL_PASSWD); |
| Statement statement = connection.createStatement()) { |
| statement.execute(String.format("CREATE DATABASE IF NOT EXISTS %s", DATABASE)); |
| statement.execute(String.format("DROP TABLE IF EXISTS %s.%s", DATABASE, TABLE_1)); |
| statement.execute(String.format("DROP TABLE IF EXISTS %s.%s", DATABASE, TABLE_2)); |
| statement.execute(String.format("DROP TABLE IF EXISTS %s.%s", DATABASE, TABLE_3)); |
| statement.execute( |
| String.format( |
| "CREATE TABLE %s.%s ( \n" |
| + "`name` varchar(256) primary key,\n" |
| + "`age` int\n" |
| + ")", |
| DATABASE, TABLE_1)); |
| statement.execute( |
| String.format( |
| "CREATE TABLE %s.%s ( \n" |
| + "`name` varchar(256) primary key,\n" |
| + "`age` int\n" |
| + ")", |
| DATABASE, TABLE_2)); |
| statement.execute( |
| String.format( |
| "CREATE TABLE %s.%s ( \n" |
| + "`name` varchar(256) primary key,\n" |
| + "`age` int\n" |
| + ")", |
| DATABASE, TABLE_3)); |
| // mock stock data |
| statement.execute( |
| String.format("insert into %s.%s values ('doris_1',1)", DATABASE, TABLE_1)); |
| statement.execute( |
| String.format("insert into %s.%s values ('doris_2',2)", DATABASE, TABLE_2)); |
| statement.execute( |
| String.format("insert into %s.%s values ('doris_3',3)", DATABASE, TABLE_3)); |
| } |
| } |
| |
| public static void waitForJobStatus( |
| JobClient client, List<JobStatus> expectedStatus, Deadline deadline) throws Exception { |
| waitUntilCondition( |
| () -> { |
| JobStatus currentStatus = (JobStatus) client.getJobStatus().get(); |
| if (expectedStatus.contains(currentStatus)) { |
| return true; |
| } else if (currentStatus.isTerminalState()) { |
| try { |
| client.getJobExecutionResult().get(); |
| } catch (Exception var4) { |
| throw new IllegalStateException( |
| String.format( |
| "Job has entered %s state, but expecting %s", |
| currentStatus, expectedStatus), |
| var4); |
| } |
| |
| throw new IllegalStateException( |
| String.format( |
| "Job has entered a terminal state %s, but expecting %s", |
| currentStatus, expectedStatus)); |
| } else { |
| return false; |
| } |
| }, |
| deadline, |
| 100L, |
| "Condition was not met in given timeout."); |
| } |
| |
| public static void waitUntilCondition( |
| SupplierWithException<Boolean, Exception> condition, |
| Deadline timeout, |
| long retryIntervalMillis, |
| String errorMsg) |
| throws Exception { |
| while (timeout.hasTimeLeft() && !(Boolean) condition.get()) { |
| long timeLeft = Math.max(0L, timeout.timeLeft().toMillis()); |
| Thread.sleep(Math.min(retryIntervalMillis, timeLeft)); |
| } |
| |
| if (!timeout.hasTimeLeft()) { |
| throw new TimeoutException(errorMsg); |
| } |
| } |
| } |