[improve] change batch mode param and add default streamload prop  (#251)

Co-authored-by: wudi <>
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java
index 9a43d77..25e1535 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java
@@ -119,6 +119,13 @@
         return new Builder().setStreamLoadProp(properties).build();
     }
 
+    public static Properties defaultsProperties() {
+        Properties properties = new Properties();
+        properties.setProperty("format", "json");
+        properties.setProperty("read_json_by_line", "true");
+        return properties;
+    }
+
     public Integer checkInterval() {
         return checkInterval;
     }
@@ -269,8 +276,8 @@
             return this;
         }
 
-        public Builder enableBatchMode() {
-            this.enableBatchMode = true;
+        public Builder setBatchMode(Boolean enableBatchMode) {
+            this.enableBatchMode = enableBatchMode;
             return this;
         }
 
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java
index bccb8b7..521d741 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java
@@ -224,10 +224,7 @@
             builder.enable2PC();
         }
 
-        if(readableConfig.get(SINK_ENABLE_BATCH_MODE)) {
-            builder.enableBatchMode();
-        }
-
+        builder.setBatchMode(readableConfig.get(SINK_ENABLE_BATCH_MODE));
         builder.setFlushQueueSize(readableConfig.get(SINK_FLUSH_QUEUE_SIZE));
         builder.setBufferFlushMaxRows(readableConfig.get(SINK_BUFFER_FLUSH_MAX_ROWS));
         builder.setBufferFlushMaxBytes(readableConfig.get(SINK_BUFFER_FLUSH_MAX_BYTES));
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
index 4dfa6d5..bf26214 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
@@ -221,10 +221,7 @@
             executionBuilder.enable2PC();
         }
 
-        //batch option
-        if(sinkConfig.getBoolean(DorisConfigOptions.SINK_ENABLE_BATCH_MODE)){
-            executionBuilder.enableBatchMode();
-        }
+        sinkConfig.getOptional(DorisConfigOptions.SINK_ENABLE_BATCH_MODE).ifPresent(executionBuilder::setBatchMode);
         sinkConfig.getOptional(DorisConfigOptions.SINK_FLUSH_QUEUE_SIZE).ifPresent(executionBuilder::setFlushQueueSize);
         sinkConfig.getOptional(DorisConfigOptions.SINK_BUFFER_FLUSH_MAX_ROWS).ifPresent(executionBuilder::setBufferFlushMaxRows);
         sinkConfig.getOptional(DorisConfigOptions.SINK_BUFFER_FLUSH_MAX_BYTES).ifPresent(executionBuilder::setBufferFlushMaxBytes);