[improve] change batch mode param and add default streamload prop (#251)
Co-authored-by: wudi <>
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java
index 9a43d77..25e1535 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java
@@ -119,6 +119,13 @@
return new Builder().setStreamLoadProp(properties).build();
}
+ public static Properties defaultsProperties() {
+ Properties properties = new Properties();
+ properties.setProperty("format", "json");
+ properties.setProperty("read_json_by_line", "true");
+ return properties;
+ }
+
public Integer checkInterval() {
return checkInterval;
}
@@ -269,8 +276,8 @@
return this;
}
- public Builder enableBatchMode() {
- this.enableBatchMode = true;
+ public Builder setBatchMode(Boolean enableBatchMode) {
+ this.enableBatchMode = enableBatchMode;
return this;
}
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java
index bccb8b7..521d741 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java
@@ -224,10 +224,7 @@
builder.enable2PC();
}
- if(readableConfig.get(SINK_ENABLE_BATCH_MODE)) {
- builder.enableBatchMode();
- }
-
+ builder.setBatchMode(readableConfig.get(SINK_ENABLE_BATCH_MODE));
builder.setFlushQueueSize(readableConfig.get(SINK_FLUSH_QUEUE_SIZE));
builder.setBufferFlushMaxRows(readableConfig.get(SINK_BUFFER_FLUSH_MAX_ROWS));
builder.setBufferFlushMaxBytes(readableConfig.get(SINK_BUFFER_FLUSH_MAX_BYTES));
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
index 4dfa6d5..bf26214 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
@@ -221,10 +221,7 @@
executionBuilder.enable2PC();
}
- //batch option
- if(sinkConfig.getBoolean(DorisConfigOptions.SINK_ENABLE_BATCH_MODE)){
- executionBuilder.enableBatchMode();
- }
+ sinkConfig.getOptional(DorisConfigOptions.SINK_ENABLE_BATCH_MODE).ifPresent(executionBuilder::setBatchMode);
sinkConfig.getOptional(DorisConfigOptions.SINK_FLUSH_QUEUE_SIZE).ifPresent(executionBuilder::setFlushQueueSize);
sinkConfig.getOptional(DorisConfigOptions.SINK_BUFFER_FLUSH_MAX_ROWS).ifPresent(executionBuilder::setBufferFlushMaxRows);
sinkConfig.getOptional(DorisConfigOptions.SINK_BUFFER_FLUSH_MAX_BYTES).ifPresent(executionBuilder::setBufferFlushMaxBytes);