[log](improve)Add printing parameters and move the verification forward (#295)
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/CdcTools.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/CdcTools.java
index 9caddd2..c76506a 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/CdcTools.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/CdcTools.java
@@ -20,6 +20,7 @@
import org.apache.flink.api.java.utils.MultipleParameterTool;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.util.Preconditions;
import org.apache.flink.util.StringUtils;
import org.apache.doris.flink.tools.cdc.mysql.MysqlDatabaseSync;
@@ -42,9 +43,9 @@
private static final List<String> EMPTY_KEYS = Collections.singletonList("password");
public static void main(String[] args) throws Exception {
+ System.out.println("Input args: " + Arrays.asList(args) + ".\n");
String operation = args[0].toLowerCase();
String[] opArgs = Arrays.copyOfRange(args, 1, args.length);
- System.out.println();
switch (operation) {
case MYSQL_SYNC_DATABASE:
createMySQLSyncDatabase(opArgs);
@@ -66,6 +67,7 @@
private static void createMySQLSyncDatabase(String[] opArgs) throws Exception {
MultipleParameterTool params = MultipleParameterTool.fromArgs(opArgs);
+ Preconditions.checkArgument(params.has("mysql-conf"));
Map<String, String> mysqlMap = getConfigMap(params, "mysql-conf");
Configuration mysqlConfig = Configuration.fromMap(mysqlMap);
DatabaseSync databaseSync = new MysqlDatabaseSync();
@@ -74,6 +76,7 @@
private static void createOracleSyncDatabase(String[] opArgs) throws Exception {
MultipleParameterTool params = MultipleParameterTool.fromArgs(opArgs);
+ Preconditions.checkArgument(params.has("oracle-conf"));
Map<String, String> oracleMap = getConfigMap(params, "oracle-conf");
Configuration oracleConfig = Configuration.fromMap(oracleMap);
DatabaseSync databaseSync = new OracleDatabaseSync();
@@ -82,6 +85,7 @@
private static void createPostgresSyncDatabase(String[] opArgs) throws Exception {
MultipleParameterTool params = MultipleParameterTool.fromArgs(opArgs);
+ Preconditions.checkArgument(params.has("postgres-conf"));
Map<String, String> postgresMap = getConfigMap(params, "postgres-conf");
Configuration postgresConfig = Configuration.fromMap(postgresMap);
DatabaseSync databaseSync = new PostgresDatabaseSync();
@@ -90,6 +94,7 @@
private static void createSqlServerSyncDatabase(String[] opArgs) throws Exception {
MultipleParameterTool params = MultipleParameterTool.fromArgs(opArgs);
+ Preconditions.checkArgument(params.has("sqlserver-conf"));
Map<String, String> postgresMap = getConfigMap(params, "sqlserver-conf");
Configuration postgresConfig = Configuration.fromMap(postgresMap);
DatabaseSync databaseSync = new SqlServerDatabaseSync();
@@ -115,6 +120,7 @@
boolean useNewSchemaChange = params.has("use-new-schema-change");
boolean singleSink = params.has("single-sink");
+ Preconditions.checkArgument(params.has("sink-conf"));
Map<String, String> sinkMap = getConfigMap(params, "sink-conf");
Map<String, String> tableMap = getConfigMap(params, "table-conf");
Configuration sinkConfig = Configuration.fromMap(sinkMap);
@@ -149,7 +155,13 @@
private static Map<String, String> getConfigMap(MultipleParameterTool params, String key) {
if (!params.has(key)) {
- return new HashMap<>();
+ System.out.println(
+ "Can not find key ["
+ + key
+ + "] from args: "
+ + params.toMap().toString()
+ + ".\n");
+ return null;
}
Map<String, String> map = new HashMap<>();
@@ -163,7 +175,8 @@
continue;
}
- System.err.println("Invalid " + key + " " + param + ".\n");
+ System.out.println("Invalid " + key + " " + param + ".\n");
+ return null;
}
return map;
}
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
index f1e172b..b6bb5e5 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
@@ -22,6 +22,7 @@
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.util.CollectionUtil;
import org.apache.flink.util.OutputTag;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.StringUtils;
@@ -441,7 +442,9 @@
}
public DatabaseSync setTableConfig(Map<String, String> tableConfig) {
- this.tableConfig = tableConfig;
+ if (!CollectionUtil.isNullOrEmpty(tableConfig)) {
+ this.tableConfig = tableConfig;
+ }
return this;
}
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/DorisTestBase.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/DorisTestBase.java
index e3478da..278be8c 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/DorisTestBase.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/DorisTestBase.java
@@ -28,15 +28,20 @@
import org.testcontainers.lifecycle.Startables;
import org.testcontainers.utility.DockerLoggerFactory;
-import java.net.MalformedURLException;
+import java.net.InetAddress;
import java.net.URL;
import java.net.URLClassLoader;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.sql.Statement;
import java.time.Duration;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;
import java.util.stream.Stream;
@@ -107,7 +112,7 @@
return container;
}
- protected static void initializeJdbcConnection() throws SQLException, MalformedURLException {
+ protected static void initializeJdbcConnection() throws Exception {
URLClassLoader urlClassLoader =
new URLClassLoader(
new URL[] {new URL(DRIVER_JAR)}, DorisTestBase.class.getClassLoader());
@@ -124,6 +129,7 @@
} while (!isBeReady(resultSet, Duration.ofSeconds(1L)));
}
LOG.info("Connected to Doris successfully...");
+ printClusterStatus();
}
private static boolean isBeReady(ResultSet rs, Duration duration) throws SQLException {
@@ -135,4 +141,28 @@
}
return false;
}
+
+ protected static void printClusterStatus() throws Exception {
+ LOG.info("Current machine IP: {}", InetAddress.getLocalHost());
+ try (Statement statement = connection.createStatement()) {
+ ResultSet showFrontends = statement.executeQuery("show frontends");
+ LOG.info("Frontends status: {}", convertList(showFrontends));
+ ResultSet showBackends = statement.executeQuery("show backends");
+ LOG.info("Backends status: {}", convertList(showBackends));
+ }
+ }
+
+ private static List<Map> convertList(ResultSet rs) throws SQLException {
+ List<Map> list = new ArrayList<>();
+ ResultSetMetaData metaData = rs.getMetaData();
+ int columnCount = metaData.getColumnCount();
+ while (rs.next()) {
+ Map<String, Object> rowData = new HashMap<>();
+ for (int i = 1; i <= columnCount; i++) {
+ rowData.put(metaData.getColumnName(i), rs.getObject(i));
+ }
+ list.add(rowData);
+ }
+ return list;
+ }
}
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/MySQLDorisE2ECase.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/MySQLDorisE2ECase.java
index 2f5568e..242f93f 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/MySQLDorisE2ECase.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/MySQLDorisE2ECase.java
@@ -94,6 +94,7 @@
@Test
public void testMySQL2Doris() throws Exception {
+ printClusterStatus();
initializeMySQLTable();
JobClient jobClient = submitJob();
// wait 2 times checkpoint
@@ -173,6 +174,7 @@
@Test
public void testAutoAddTable() throws Exception {
+ printClusterStatus();
initializeMySQLTable();
initializeDorisTable();
JobClient jobClient = submitJob();