[testcase](cdc) add e2e test for MySql to Doris (#445)
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/MySQLDorisE2ECase.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/MySQLDorisE2ECase.java
index ab1dfe7..6a61384 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/MySQLDorisE2ECase.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/MySQLDorisE2ECase.java
@@ -62,6 +62,7 @@
private static final String TABLE_2 = "tbl2";
private static final String TABLE_3 = "tbl3";
private static final String TABLE_4 = "tbl4";
+ private static final String TABLE_5 = "tbl5";
private static final String TABLE_SQL_PARSE = "tbl_sql_parse";
private static final MySQLContainer MYSQL_CONTAINER =
@@ -93,10 +94,13 @@
JobClient jobClient = submitJob();
// wait 2 times checkpoint
Thread.sleep(20000);
- List<String> expected = Arrays.asList("doris_1,1", "doris_2,2", "doris_3,3");
+ List<String> expected = Arrays.asList("doris_1,1", "doris_2,2", "doris_3,3", "doris_5,5");
String sql =
- "select * from ( select * from %s.%s union all select * from %s.%s union all select * from %s.%s ) res order by 1";
- String query1 = String.format(sql, DATABASE, TABLE_1, DATABASE, TABLE_2, DATABASE, TABLE_3);
+ "select * from ( select * from %s.%s union all select * from %s.%s union all select * from %s.%s union all select * from %s.%s) res order by 1";
+ String query1 =
+ String.format(
+ sql, DATABASE, TABLE_1, DATABASE, TABLE_2, DATABASE, TABLE_3, DATABASE,
+ TABLE_5);
checkResult(expected, query1, 2);
// add incremental data
@@ -306,10 +310,13 @@
// wait 2 times checkpoint
Thread.sleep(20000);
- List<String> expected = Arrays.asList("doris_1,1", "doris_2,2", "doris_3,3");
+ List<String> expected = Arrays.asList("doris_1,1", "doris_2,2", "doris_3,3", "doris_5,5");
String sql =
- "select * from ( select * from %s.%s union all select * from %s.%s union all select * from %s.%s ) res order by 1";
- String query1 = String.format(sql, DATABASE, TABLE_1, DATABASE, TABLE_2, DATABASE, TABLE_3);
+ "select * from ( select * from %s.%s union all select * from %s.%s union all select * from %s.%s union all select * from %s.%s) res order by 1";
+ String query1 =
+ String.format(
+ sql, DATABASE, TABLE_1, DATABASE, TABLE_2, DATABASE, TABLE_3, DATABASE,
+ TABLE_5);
checkResult(expected, query1, 2);
// add incremental data
@@ -431,7 +438,7 @@
Map<String, String> tableConfig = new HashMap<>();
tableConfig.put("replication_num", "1");
- String includingTables = "tbl1|tbl2|tbl3";
+ String includingTables = "tbl1|tbl2|tbl3|tbl5";
String excludingTables = "";
DatabaseSync databaseSync = new MysqlDatabaseSync();
databaseSync
@@ -457,10 +464,13 @@
// wait 2 times checkpoint
Thread.sleep(20000);
- List<String> expected = Arrays.asList("doris_1,1", "doris_2,2", "doris_3,3");
+ List<String> expected = Arrays.asList("doris_1,1", "doris_2,2", "doris_3,3", "doris_5,5");
String sql =
- "select * from ( select * from %s.%s union all select * from %s.%s union all select * from %s.%s ) res order by 1";
- String query1 = String.format(sql, DATABASE, TABLE_1, DATABASE, TABLE_2, DATABASE, TABLE_3);
+ "select * from ( select * from %s.%s union all select * from %s.%s union all select * from %s.%s union all select * from %s.%s) res order by 1";
+ String query1 =
+ String.format(
+ sql, DATABASE, TABLE_1, DATABASE, TABLE_2, DATABASE, TABLE_3, DATABASE,
+ TABLE_5);
checkResult(expected, query1, 2);
// add incremental data
@@ -503,6 +513,20 @@
statement.execute(String.format("DROP TABLE IF EXISTS %s.%s", DATABASE, TABLE_2));
statement.execute(String.format("DROP TABLE IF EXISTS %s.%s", DATABASE, TABLE_3));
statement.execute(String.format("DROP TABLE IF EXISTS %s.%s", DATABASE, TABLE_4));
+ statement.execute(String.format("DROP TABLE IF EXISTS %s.%s", DATABASE, TABLE_5));
+ // create a table in Doris
+ statement.execute(
+ String.format(
+ "CREATE TABLE %s.%s ( \n"
+ + "`name` varchar(256),\n"
+ + "`age` int\n"
+ + ")\n"
+ + "UNIQUE KEY(`name`)\n"
+ + "DISTRIBUTED BY HASH(`name`) BUCKETS 1\n"
+ + "PROPERTIES ( \n"
+ + "\"replication_num\" = \"1\" \n"
+ + ");\n",
+ DATABASE, TABLE_5));
}
}
@@ -596,6 +620,7 @@
statement.execute(String.format("DROP TABLE IF EXISTS %s.%s", DATABASE, TABLE_2));
statement.execute(String.format("DROP TABLE IF EXISTS %s.%s", DATABASE, TABLE_3));
statement.execute(String.format("DROP TABLE IF EXISTS %s.%s", DATABASE, TABLE_4));
+ statement.execute(String.format("DROP TABLE IF EXISTS %s.%s", DATABASE, TABLE_5));
statement.execute(
String.format(
"CREATE TABLE %s.%s ( \n"
@@ -617,6 +642,13 @@
+ "`age` int\n"
+ ")",
DATABASE, TABLE_3));
+ statement.execute(
+ String.format(
+ "CREATE TABLE %s.%s ( \n"
+ + "`name` varchar(256) primary key,\n"
+ + "`age` int\n"
+ + ")",
+ DATABASE, TABLE_5));
// mock stock data
statement.execute(
String.format("insert into %s.%s values ('doris_1',1)", DATABASE, TABLE_1));
@@ -624,6 +656,8 @@
String.format("insert into %s.%s values ('doris_2',2)", DATABASE, TABLE_2));
statement.execute(
String.format("insert into %s.%s values ('doris_3',3)", DATABASE, TABLE_3));
+ statement.execute(
+ String.format("insert into %s.%s values ('doris_5',5)", DATABASE, TABLE_5));
}
}
}