[log](improve)Add printing parameters and move the verification forward (#295)

diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/CdcTools.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/CdcTools.java
index 9caddd2..c76506a 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/CdcTools.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/CdcTools.java
@@ -20,6 +20,7 @@
 import org.apache.flink.api.java.utils.MultipleParameterTool;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.StringUtils;
 
 import org.apache.doris.flink.tools.cdc.mysql.MysqlDatabaseSync;
@@ -42,9 +43,9 @@
     private static final List<String> EMPTY_KEYS = Collections.singletonList("password");
 
     public static void main(String[] args) throws Exception {
+        System.out.println("Input args: " + Arrays.asList(args) + ".\n");
         String operation = args[0].toLowerCase();
         String[] opArgs = Arrays.copyOfRange(args, 1, args.length);
-        System.out.println();
         switch (operation) {
             case MYSQL_SYNC_DATABASE:
                 createMySQLSyncDatabase(opArgs);
@@ -66,6 +67,7 @@
 
     private static void createMySQLSyncDatabase(String[] opArgs) throws Exception {
         MultipleParameterTool params = MultipleParameterTool.fromArgs(opArgs);
+        Preconditions.checkArgument(params.has("mysql-conf"));
         Map<String, String> mysqlMap = getConfigMap(params, "mysql-conf");
         Configuration mysqlConfig = Configuration.fromMap(mysqlMap);
         DatabaseSync databaseSync = new MysqlDatabaseSync();
@@ -74,6 +76,7 @@
 
     private static void createOracleSyncDatabase(String[] opArgs) throws Exception {
         MultipleParameterTool params = MultipleParameterTool.fromArgs(opArgs);
+        Preconditions.checkArgument(params.has("oracle-conf"));
         Map<String, String> oracleMap = getConfigMap(params, "oracle-conf");
         Configuration oracleConfig = Configuration.fromMap(oracleMap);
         DatabaseSync databaseSync = new OracleDatabaseSync();
@@ -82,6 +85,7 @@
 
     private static void createPostgresSyncDatabase(String[] opArgs) throws Exception {
         MultipleParameterTool params = MultipleParameterTool.fromArgs(opArgs);
+        Preconditions.checkArgument(params.has("postgres-conf"));
         Map<String, String> postgresMap = getConfigMap(params, "postgres-conf");
         Configuration postgresConfig = Configuration.fromMap(postgresMap);
         DatabaseSync databaseSync = new PostgresDatabaseSync();
@@ -90,6 +94,7 @@
 
     private static void createSqlServerSyncDatabase(String[] opArgs) throws Exception {
         MultipleParameterTool params = MultipleParameterTool.fromArgs(opArgs);
+        Preconditions.checkArgument(params.has("sqlserver-conf"));
         Map<String, String> postgresMap = getConfigMap(params, "sqlserver-conf");
         Configuration postgresConfig = Configuration.fromMap(postgresMap);
         DatabaseSync databaseSync = new SqlServerDatabaseSync();
@@ -115,6 +120,7 @@
         boolean useNewSchemaChange = params.has("use-new-schema-change");
         boolean singleSink = params.has("single-sink");
 
+        Preconditions.checkArgument(params.has("sink-conf"));
         Map<String, String> sinkMap = getConfigMap(params, "sink-conf");
         Map<String, String> tableMap = getConfigMap(params, "table-conf");
         Configuration sinkConfig = Configuration.fromMap(sinkMap);
@@ -149,7 +155,13 @@
 
     private static Map<String, String> getConfigMap(MultipleParameterTool params, String key) {
         if (!params.has(key)) {
-            return new HashMap<>();
+            System.out.println(
+                    "Can not find key ["
+                            + key
+                            + "] from args: "
+                            + params.toMap().toString()
+                            + ".\n");
+            return null;
         }
 
         Map<String, String> map = new HashMap<>();
@@ -163,7 +175,8 @@
                 continue;
             }
 
-            System.err.println("Invalid " + key + " " + param + ".\n");
+            System.out.println("Invalid " + key + " " + param + ".\n");
+            return null;
         }
         return map;
     }
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
index f1e172b..b6bb5e5 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
@@ -22,6 +22,7 @@
 import org.apache.flink.streaming.api.datastream.DataStreamSource;
 import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.util.CollectionUtil;
 import org.apache.flink.util.OutputTag;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.StringUtils;
@@ -441,7 +442,9 @@
     }
 
     public DatabaseSync setTableConfig(Map<String, String> tableConfig) {
-        this.tableConfig = tableConfig;
+        if (!CollectionUtil.isNullOrEmpty(tableConfig)) {
+            this.tableConfig = tableConfig;
+        }
         return this;
     }
 
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/DorisTestBase.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/DorisTestBase.java
index e3478da..278be8c 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/DorisTestBase.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/DorisTestBase.java
@@ -28,15 +28,20 @@
 import org.testcontainers.lifecycle.Startables;
 import org.testcontainers.utility.DockerLoggerFactory;
 
-import java.net.MalformedURLException;
+import java.net.InetAddress;
 import java.net.URL;
 import java.net.URLClassLoader;
 import java.sql.Connection;
 import java.sql.DriverManager;
 import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
 import java.sql.SQLException;
 import java.sql.Statement;
 import java.time.Duration;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.LockSupport;
 import java.util.stream.Stream;
@@ -107,7 +112,7 @@
         return container;
     }
 
-    protected static void initializeJdbcConnection() throws SQLException, MalformedURLException {
+    protected static void initializeJdbcConnection() throws Exception {
         URLClassLoader urlClassLoader =
                 new URLClassLoader(
                         new URL[] {new URL(DRIVER_JAR)}, DorisTestBase.class.getClassLoader());
@@ -124,6 +129,7 @@
             } while (!isBeReady(resultSet, Duration.ofSeconds(1L)));
         }
         LOG.info("Connected to Doris successfully...");
+        printClusterStatus();
     }
 
     private static boolean isBeReady(ResultSet rs, Duration duration) throws SQLException {
@@ -135,4 +141,28 @@
         }
         return false;
     }
+
+    protected static void printClusterStatus() throws Exception {
+        LOG.info("Current machine IP: {}", InetAddress.getLocalHost());
+        try (Statement statement = connection.createStatement()) {
+            ResultSet showFrontends = statement.executeQuery("show frontends");
+            LOG.info("Frontends status: {}", convertList(showFrontends));
+            ResultSet showBackends = statement.executeQuery("show backends");
+            LOG.info("Backends status: {}", convertList(showBackends));
+        }
+    }
+
+    private static List<Map> convertList(ResultSet rs) throws SQLException {
+        List<Map> list = new ArrayList<>();
+        ResultSetMetaData metaData = rs.getMetaData();
+        int columnCount = metaData.getColumnCount();
+        while (rs.next()) {
+            Map<String, Object> rowData = new HashMap<>();
+            for (int i = 1; i <= columnCount; i++) {
+                rowData.put(metaData.getColumnName(i), rs.getObject(i));
+            }
+            list.add(rowData);
+        }
+        return list;
+    }
 }
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/MySQLDorisE2ECase.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/MySQLDorisE2ECase.java
index 2f5568e..242f93f 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/MySQLDorisE2ECase.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/MySQLDorisE2ECase.java
@@ -94,6 +94,7 @@
 
     @Test
     public void testMySQL2Doris() throws Exception {
+        printClusterStatus();
         initializeMySQLTable();
         JobClient jobClient = submitJob();
         // wait 2 times checkpoint
@@ -173,6 +174,7 @@
 
     @Test
     public void testAutoAddTable() throws Exception {
+        printClusterStatus();
         initializeMySQLTable();
         initializeDorisTable();
         JobClient jobClient = submitJob();