[Improve] standardize Flink parameters (#337)

diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/ConfigurationOptions.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/ConfigurationOptions.java
index b446806..ff6ad47 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/ConfigurationOptions.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/ConfigurationOptions.java
@@ -24,21 +24,14 @@
     String DORIS_DEFAULT_CLUSTER = "default_cluster";
 
     String TABLE_IDENTIFIER = "table.identifier";
-    String DORIS_TABLE_IDENTIFIER = "doris.table.identifier";
     String DORIS_READ_FIELD = "doris.read.field";
     String DORIS_FILTER_QUERY = "doris.filter.query";
-    String DORIS_FILTER_QUERY_IN_MAX_COUNT = "doris.filter.query.in.max.count";
-    Integer DORIS_FILTER_QUERY_IN_VALUE_UPPER_LIMIT = 10000;
-
     String DORIS_USER = "username";
     String DORIS_PASSWORD = "password";
-
-    String DORIS_REQUEST_AUTH_USER = "doris.request.auth.user";
-    String DORIS_REQUEST_AUTH_PASSWORD = "doris.request.auth.password";
     String DORIS_REQUEST_RETRIES = "doris.request.retries";
-    String DORIS_REQUEST_CONNECT_TIMEOUT_MS = "doris.request.connect.timeout.ms";
-    String DORIS_REQUEST_READ_TIMEOUT_MS = "doris.request.read.timeout.ms";
-    String DORIS_REQUEST_QUERY_TIMEOUT_S = "doris.request.query.timeout.s";
+    String DORIS_REQUEST_CONNECT_TIMEOUT_MS = "doris.request.connect.timeout";
+    String DORIS_REQUEST_READ_TIMEOUT_MS = "doris.request.read.timeout";
+    String DORIS_REQUEST_QUERY_TIMEOUT_S = "doris.request.query.timeout";
     Integer DORIS_REQUEST_RETRIES_DEFAULT = 3;
     Integer DORIS_REQUEST_CONNECT_TIMEOUT_MS_DEFAULT = 30 * 1000;
     Integer DORIS_REQUEST_READ_TIMEOUT_MS_DEFAULT = 30 * 1000;
@@ -53,12 +46,9 @@
 
     String DORIS_EXEC_MEM_LIMIT = "doris.exec.mem.limit";
     Long DORIS_EXEC_MEM_LIMIT_DEFAULT = 2147483648L;
-
-    String DORIS_VALUE_READER_CLASS = "doris.value.reader.class";
-
+    String DORIS_EXEC_MEM_LIMIT_DEFAULT_STR = "2048mb";
     String DORIS_DESERIALIZE_ARROW_ASYNC = "doris.deserialize.arrow.async";
     Boolean DORIS_DESERIALIZE_ARROW_ASYNC_DEFAULT = false;
-
     String DORIS_DESERIALIZE_QUEUE_SIZE = "doris.deserialize.queue.size";
     Integer DORIS_DESERIALIZE_QUEUE_SIZE_DEFAULT = 64;
 }
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisConfigOptions.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisConfigOptions.java
index c698871..b91b04b 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisConfigOptions.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisConfigOptions.java
@@ -19,6 +19,7 @@
 
 import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.MemorySize;
 import org.apache.flink.table.factories.FactoryUtil;
 
 import org.apache.doris.flink.sink.writer.WriteMode;
@@ -30,7 +31,7 @@
 import static org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_BATCH_SIZE_DEFAULT;
 import static org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_DESERIALIZE_ARROW_ASYNC_DEFAULT;
 import static org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_DESERIALIZE_QUEUE_SIZE_DEFAULT;
-import static org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_EXEC_MEM_LIMIT_DEFAULT;
+import static org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_EXEC_MEM_LIMIT_DEFAULT_STR;
 import static org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_REQUEST_CONNECT_TIMEOUT_MS_DEFAULT;
 import static org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_REQUEST_QUERY_TIMEOUT_S_DEFAULT;
 import static org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_REQUEST_READ_TIMEOUT_MS_DEFAULT;
@@ -95,47 +96,52 @@
             ConfigOptions.key("doris.request.tablet.size")
                     .intType()
                     .defaultValue(DORIS_TABLET_SIZE_DEFAULT)
-                    .withDescription("");
-    public static final ConfigOption<Integer> DORIS_REQUEST_CONNECT_TIMEOUT_MS =
-            ConfigOptions.key("doris.request.connect.timeout.ms")
-                    .intType()
-                    .defaultValue(DORIS_REQUEST_CONNECT_TIMEOUT_MS_DEFAULT)
-                    .withDescription("");
-    public static final ConfigOption<Integer> DORIS_REQUEST_READ_TIMEOUT_MS =
-            ConfigOptions.key("doris.request.read.timeout.ms")
-                    .intType()
-                    .defaultValue(DORIS_REQUEST_READ_TIMEOUT_MS_DEFAULT)
-                    .withDescription("");
-    public static final ConfigOption<Integer> DORIS_REQUEST_QUERY_TIMEOUT_S =
-            ConfigOptions.key("doris.request.query.timeout.s")
-                    .intType()
-                    .defaultValue(DORIS_REQUEST_QUERY_TIMEOUT_S_DEFAULT)
-                    .withDescription("");
+                    .withDescription(
+                            "The number of Doris Tablets corresponding to a Partition. The smaller this value is set, the more Partitions will be generated. This improves the parallelism on the Flink side, but at the same time puts more pressure on Doris.");
+    public static final ConfigOption<Duration> DORIS_REQUEST_CONNECT_TIMEOUT_MS =
+            ConfigOptions.key("doris.request.connect.timeout")
+                    .durationType()
+                    .defaultValue(Duration.ofMillis(DORIS_REQUEST_CONNECT_TIMEOUT_MS_DEFAULT))
+                    .withDescription("Connection timeout for sending requests to Doris");
+    public static final ConfigOption<Duration> DORIS_REQUEST_READ_TIMEOUT_MS =
+            ConfigOptions.key("doris.request.read.timeout")
+                    .durationType()
+                    .defaultValue(Duration.ofMillis(DORIS_REQUEST_READ_TIMEOUT_MS_DEFAULT))
+                    .withDescription("Read timeout for sending requests to Doris");
+    public static final ConfigOption<Duration> DORIS_REQUEST_QUERY_TIMEOUT_S =
+            ConfigOptions.key("doris.request.query.timeout")
+                    .durationType()
+                    .defaultValue(Duration.ofSeconds(DORIS_REQUEST_QUERY_TIMEOUT_S_DEFAULT))
+                    .withDescription(
+                            "The timeout time for querying Doris, the default value is 1 hour, -1 means no timeout limit");
     public static final ConfigOption<Integer> DORIS_REQUEST_RETRIES =
             ConfigOptions.key("doris.request.retries")
                     .intType()
                     .defaultValue(DORIS_REQUEST_RETRIES_DEFAULT)
-                    .withDescription("");
+                    .withDescription("Number of retries to send requests to Doris");
     public static final ConfigOption<Boolean> DORIS_DESERIALIZE_ARROW_ASYNC =
             ConfigOptions.key("doris.deserialize.arrow.async")
                     .booleanType()
                     .defaultValue(DORIS_DESERIALIZE_ARROW_ASYNC_DEFAULT)
-                    .withDescription("");
+                    .withDescription(
+                            "Whether to support asynchronous conversion of Arrow format to RowBatch needed for connector iterations");
     public static final ConfigOption<Integer> DORIS_DESERIALIZE_QUEUE_SIZE =
             ConfigOptions.key("doris.deserialize.queue.size")
                     .intType()
                     .defaultValue(DORIS_DESERIALIZE_QUEUE_SIZE_DEFAULT)
-                    .withDescription("");
+                    .withDescription(
+                            "Asynchronous conversion of internal processing queue in Arrow format, effective when doris.deserialize.arrow.async is true");
     public static final ConfigOption<Integer> DORIS_BATCH_SIZE =
             ConfigOptions.key("doris.batch.size")
                     .intType()
                     .defaultValue(DORIS_BATCH_SIZE_DEFAULT)
-                    .withDescription("");
-    public static final ConfigOption<Long> DORIS_EXEC_MEM_LIMIT =
+                    .withDescription(
+                            "The maximum number of rows to read data from BE at a time. Increasing this value reduces the number of connections established between Flink and Doris. Thereby reducing the additional time overhead caused by network delay.");
+    public static final ConfigOption<MemorySize> DORIS_EXEC_MEM_LIMIT =
             ConfigOptions.key("doris.exec.mem.limit")
-                    .longType()
-                    .defaultValue(DORIS_EXEC_MEM_LIMIT_DEFAULT)
-                    .withDescription("");
+                    .memoryType()
+                    .defaultValue(MemorySize.parse(DORIS_EXEC_MEM_LIMIT_DEFAULT_STR))
+                    .withDescription("Memory limit for a single query. The default is 2048mb.");
     public static final ConfigOption<Boolean> SOURCE_USE_OLD_API =
             ConfigOptions.key("source.use-old-api")
                     .booleanType()
@@ -198,20 +204,20 @@
                     .defaultValue(true)
                     .withDescription("enable 2PC while loading");
 
-    public static final ConfigOption<Integer> SINK_CHECK_INTERVAL =
+    public static final ConfigOption<Duration> SINK_CHECK_INTERVAL =
             ConfigOptions.key("sink.check-interval")
-                    .intType()
-                    .defaultValue(10000)
+                    .durationType()
+                    .defaultValue(Duration.ofMillis(10000))
                     .withDescription("check exception with the interval while loading");
     public static final ConfigOption<Integer> SINK_MAX_RETRIES =
             ConfigOptions.key("sink.max-retries")
                     .intType()
                     .defaultValue(3)
                     .withDescription("the max retry times if writing records to database failed.");
-    public static final ConfigOption<Integer> SINK_BUFFER_SIZE =
+    public static final ConfigOption<MemorySize> SINK_BUFFER_SIZE =
             ConfigOptions.key("sink.buffer-size")
-                    .intType()
-                    .defaultValue(1024 * 1024)
+                    .memoryType()
+                    .defaultValue(MemorySize.parse("1mb"))
                     .withDescription("the buffer size to cache data for stream load.");
     public static final ConfigOption<Integer> SINK_BUFFER_COUNT =
             ConfigOptions.key("sink.buffer-count")
@@ -263,10 +269,10 @@
                     .withDescription(
                             "The maximum number of flush items in each batch, the default is 5w");
 
-    public static final ConfigOption<Integer> SINK_BUFFER_FLUSH_MAX_BYTES =
+    public static final ConfigOption<MemorySize> SINK_BUFFER_FLUSH_MAX_BYTES =
             ConfigOptions.key("sink.buffer-flush.max-bytes")
-                    .intType()
-                    .defaultValue(10 * 1024 * 1024)
+                    .memoryType()
+                    .defaultValue(MemorySize.parse("10mb"))
                     .withDescription(
                             "The maximum number of bytes flushed in each batch, the default is 10MB");
 
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java
index bf5cd8c..f327b9c 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java
@@ -204,13 +204,16 @@
         final DorisReadOptions.Builder builder = DorisReadOptions.builder();
         builder.setDeserializeArrowAsync(readableConfig.get(DORIS_DESERIALIZE_ARROW_ASYNC))
                 .setDeserializeQueueSize(readableConfig.get(DORIS_DESERIALIZE_QUEUE_SIZE))
-                .setExecMemLimit(readableConfig.get(DORIS_EXEC_MEM_LIMIT))
+                .setExecMemLimit(readableConfig.get(DORIS_EXEC_MEM_LIMIT).getBytes())
                 .setFilterQuery(readableConfig.get(DORIS_FILTER_QUERY))
                 .setReadFields(readableConfig.get(DORIS_READ_FIELD))
-                .setRequestQueryTimeoutS(readableConfig.get(DORIS_REQUEST_QUERY_TIMEOUT_S))
+                .setRequestQueryTimeoutS(
+                        (int) readableConfig.get(DORIS_REQUEST_QUERY_TIMEOUT_S).getSeconds())
                 .setRequestBatchSize(readableConfig.get(DORIS_BATCH_SIZE))
-                .setRequestConnectTimeoutMs(readableConfig.get(DORIS_REQUEST_CONNECT_TIMEOUT_MS))
-                .setRequestReadTimeoutMs(readableConfig.get(DORIS_REQUEST_READ_TIMEOUT_MS))
+                .setRequestConnectTimeoutMs(
+                        (int) readableConfig.get(DORIS_REQUEST_CONNECT_TIMEOUT_MS).toMillis())
+                .setRequestReadTimeoutMs(
+                        (int) readableConfig.get(DORIS_REQUEST_READ_TIMEOUT_MS).toMillis())
                 .setRequestRetries(readableConfig.get(DORIS_REQUEST_RETRIES))
                 .setRequestTabletSize(readableConfig.get(DORIS_TABLET_SIZE))
                 .setUseOldApi(readableConfig.get(SOURCE_USE_OLD_API));
@@ -220,9 +223,9 @@
     private DorisExecutionOptions getDorisExecutionOptions(
             ReadableConfig readableConfig, Properties streamLoadProp) {
         final DorisExecutionOptions.Builder builder = DorisExecutionOptions.builder();
-        builder.setCheckInterval(readableConfig.get(SINK_CHECK_INTERVAL));
+        builder.setCheckInterval((int) readableConfig.get(SINK_CHECK_INTERVAL).toMillis());
         builder.setMaxRetries(readableConfig.get(SINK_MAX_RETRIES));
-        builder.setBufferSize(readableConfig.get(SINK_BUFFER_SIZE));
+        builder.setBufferSize((int) readableConfig.get(SINK_BUFFER_SIZE).getBytes());
         builder.setBufferCount(readableConfig.get(SINK_BUFFER_COUNT));
         builder.setLabelPrefix(readableConfig.get(SINK_LABEL_PREFIX));
         builder.setStreamLoadProp(streamLoadProp);
@@ -245,7 +248,8 @@
         }
         builder.setFlushQueueSize(readableConfig.get(SINK_FLUSH_QUEUE_SIZE));
         builder.setBufferFlushMaxRows(readableConfig.get(SINK_BUFFER_FLUSH_MAX_ROWS));
-        builder.setBufferFlushMaxBytes(readableConfig.get(SINK_BUFFER_FLUSH_MAX_BYTES));
+        builder.setBufferFlushMaxBytes(
+                (int) readableConfig.get(SINK_BUFFER_FLUSH_MAX_BYTES).getBytes());
         builder.setBufferFlushIntervalMs(readableConfig.get(SINK_BUFFER_FLUSH_INTERVAL).toMillis());
         builder.setUseCache(readableConfig.get(SINK_USE_CACHE));
         return builder.build();
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
index 6b0a0fd..14d3fbb 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
@@ -260,10 +260,10 @@
                 .ifPresent(executionBuilder::setBufferCount);
         sinkConfig
                 .getOptional(DorisConfigOptions.SINK_BUFFER_SIZE)
-                .ifPresent(executionBuilder::setBufferSize);
+                .ifPresent(v -> executionBuilder.setBufferSize((int) v.getBytes()));
         sinkConfig
                 .getOptional(DorisConfigOptions.SINK_CHECK_INTERVAL)
-                .ifPresent(executionBuilder::setCheckInterval);
+                .ifPresent(v -> executionBuilder.setCheckInterval((int) v.toMillis()));
         sinkConfig
                 .getOptional(DorisConfigOptions.SINK_MAX_RETRIES)
                 .ifPresent(executionBuilder::setMaxRetries);
@@ -289,7 +289,7 @@
                 .ifPresent(executionBuilder::setBufferFlushMaxRows);
         sinkConfig
                 .getOptional(DorisConfigOptions.SINK_BUFFER_FLUSH_MAX_BYTES)
-                .ifPresent(executionBuilder::setBufferFlushMaxBytes);
+                .ifPresent(v -> executionBuilder.setBufferFlushMaxBytes((int) v.getBytes()));
         sinkConfig
                 .getOptional(DorisConfigOptions.SINK_BUFFER_FLUSH_INTERVAL)
                 .ifPresent(v -> executionBuilder.setBufferFlushIntervalMs(v.toMillis()));