[Improve] standardize Flink parameters (#337)
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/ConfigurationOptions.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/ConfigurationOptions.java
index b446806..ff6ad47 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/ConfigurationOptions.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/ConfigurationOptions.java
@@ -24,21 +24,14 @@
String DORIS_DEFAULT_CLUSTER = "default_cluster";
String TABLE_IDENTIFIER = "table.identifier";
- String DORIS_TABLE_IDENTIFIER = "doris.table.identifier";
String DORIS_READ_FIELD = "doris.read.field";
String DORIS_FILTER_QUERY = "doris.filter.query";
- String DORIS_FILTER_QUERY_IN_MAX_COUNT = "doris.filter.query.in.max.count";
- Integer DORIS_FILTER_QUERY_IN_VALUE_UPPER_LIMIT = 10000;
-
String DORIS_USER = "username";
String DORIS_PASSWORD = "password";
-
- String DORIS_REQUEST_AUTH_USER = "doris.request.auth.user";
- String DORIS_REQUEST_AUTH_PASSWORD = "doris.request.auth.password";
String DORIS_REQUEST_RETRIES = "doris.request.retries";
- String DORIS_REQUEST_CONNECT_TIMEOUT_MS = "doris.request.connect.timeout.ms";
- String DORIS_REQUEST_READ_TIMEOUT_MS = "doris.request.read.timeout.ms";
- String DORIS_REQUEST_QUERY_TIMEOUT_S = "doris.request.query.timeout.s";
+ String DORIS_REQUEST_CONNECT_TIMEOUT_MS = "doris.request.connect.timeout";
+ String DORIS_REQUEST_READ_TIMEOUT_MS = "doris.request.read.timeout";
+ String DORIS_REQUEST_QUERY_TIMEOUT_S = "doris.request.query.timeout";
Integer DORIS_REQUEST_RETRIES_DEFAULT = 3;
Integer DORIS_REQUEST_CONNECT_TIMEOUT_MS_DEFAULT = 30 * 1000;
Integer DORIS_REQUEST_READ_TIMEOUT_MS_DEFAULT = 30 * 1000;
@@ -53,12 +46,9 @@
String DORIS_EXEC_MEM_LIMIT = "doris.exec.mem.limit";
Long DORIS_EXEC_MEM_LIMIT_DEFAULT = 2147483648L;
-
- String DORIS_VALUE_READER_CLASS = "doris.value.reader.class";
-
+ String DORIS_EXEC_MEM_LIMIT_DEFAULT_STR = "2048mb";
String DORIS_DESERIALIZE_ARROW_ASYNC = "doris.deserialize.arrow.async";
Boolean DORIS_DESERIALIZE_ARROW_ASYNC_DEFAULT = false;
-
String DORIS_DESERIALIZE_QUEUE_SIZE = "doris.deserialize.queue.size";
Integer DORIS_DESERIALIZE_QUEUE_SIZE_DEFAULT = 64;
}
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisConfigOptions.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisConfigOptions.java
index c698871..b91b04b 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisConfigOptions.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisConfigOptions.java
@@ -19,6 +19,7 @@
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.MemorySize;
import org.apache.flink.table.factories.FactoryUtil;
import org.apache.doris.flink.sink.writer.WriteMode;
@@ -30,7 +31,7 @@
import static org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_BATCH_SIZE_DEFAULT;
import static org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_DESERIALIZE_ARROW_ASYNC_DEFAULT;
import static org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_DESERIALIZE_QUEUE_SIZE_DEFAULT;
-import static org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_EXEC_MEM_LIMIT_DEFAULT;
+import static org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_EXEC_MEM_LIMIT_DEFAULT_STR;
import static org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_REQUEST_CONNECT_TIMEOUT_MS_DEFAULT;
import static org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_REQUEST_QUERY_TIMEOUT_S_DEFAULT;
import static org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_REQUEST_READ_TIMEOUT_MS_DEFAULT;
@@ -95,47 +96,52 @@
ConfigOptions.key("doris.request.tablet.size")
.intType()
.defaultValue(DORIS_TABLET_SIZE_DEFAULT)
- .withDescription("");
- public static final ConfigOption<Integer> DORIS_REQUEST_CONNECT_TIMEOUT_MS =
- ConfigOptions.key("doris.request.connect.timeout.ms")
- .intType()
- .defaultValue(DORIS_REQUEST_CONNECT_TIMEOUT_MS_DEFAULT)
- .withDescription("");
- public static final ConfigOption<Integer> DORIS_REQUEST_READ_TIMEOUT_MS =
- ConfigOptions.key("doris.request.read.timeout.ms")
- .intType()
- .defaultValue(DORIS_REQUEST_READ_TIMEOUT_MS_DEFAULT)
- .withDescription("");
- public static final ConfigOption<Integer> DORIS_REQUEST_QUERY_TIMEOUT_S =
- ConfigOptions.key("doris.request.query.timeout.s")
- .intType()
- .defaultValue(DORIS_REQUEST_QUERY_TIMEOUT_S_DEFAULT)
- .withDescription("");
+ .withDescription(
+ "The number of Doris Tablets corresponding to a Partition. The smaller this value is set, the more Partitions will be generated. This improves the parallelism on the Flink side, but at the same time puts more pressure on Doris.");
+ public static final ConfigOption<Duration> DORIS_REQUEST_CONNECT_TIMEOUT_MS =
+ ConfigOptions.key("doris.request.connect.timeout")
+ .durationType()
+ .defaultValue(Duration.ofMillis(DORIS_REQUEST_CONNECT_TIMEOUT_MS_DEFAULT))
+ .withDescription("Connection timeout for sending requests to Doris");
+ public static final ConfigOption<Duration> DORIS_REQUEST_READ_TIMEOUT_MS =
+ ConfigOptions.key("doris.request.read.timeout")
+ .durationType()
+ .defaultValue(Duration.ofMillis(DORIS_REQUEST_READ_TIMEOUT_MS_DEFAULT))
+ .withDescription("Read timeout for sending requests to Doris");
+ public static final ConfigOption<Duration> DORIS_REQUEST_QUERY_TIMEOUT_S =
+ ConfigOptions.key("doris.request.query.timeout")
+ .durationType()
+ .defaultValue(Duration.ofSeconds(DORIS_REQUEST_QUERY_TIMEOUT_S_DEFAULT))
+ .withDescription(
+ "The timeout time for querying Doris, the default value is 1 hour, -1 means no timeout limit");
public static final ConfigOption<Integer> DORIS_REQUEST_RETRIES =
ConfigOptions.key("doris.request.retries")
.intType()
.defaultValue(DORIS_REQUEST_RETRIES_DEFAULT)
- .withDescription("");
+ .withDescription("Number of retries to send requests to Doris");
public static final ConfigOption<Boolean> DORIS_DESERIALIZE_ARROW_ASYNC =
ConfigOptions.key("doris.deserialize.arrow.async")
.booleanType()
.defaultValue(DORIS_DESERIALIZE_ARROW_ASYNC_DEFAULT)
- .withDescription("");
+ .withDescription(
+ "Whether to support asynchronous conversion of Arrow format to RowBatch needed for connector iterations");
public static final ConfigOption<Integer> DORIS_DESERIALIZE_QUEUE_SIZE =
ConfigOptions.key("doris.deserialize.queue.size")
.intType()
.defaultValue(DORIS_DESERIALIZE_QUEUE_SIZE_DEFAULT)
- .withDescription("");
+ .withDescription(
+ "Asynchronous conversion of internal processing queue in Arrow format, effective when doris.deserialize.arrow.async is true");
public static final ConfigOption<Integer> DORIS_BATCH_SIZE =
ConfigOptions.key("doris.batch.size")
.intType()
.defaultValue(DORIS_BATCH_SIZE_DEFAULT)
- .withDescription("");
- public static final ConfigOption<Long> DORIS_EXEC_MEM_LIMIT =
+ .withDescription(
+ "The maximum number of rows to read data from BE at a time. Increasing this value reduces the number of connections established between Flink and Doris. Thereby reducing the additional time overhead caused by network delay.");
+ public static final ConfigOption<MemorySize> DORIS_EXEC_MEM_LIMIT =
ConfigOptions.key("doris.exec.mem.limit")
- .longType()
- .defaultValue(DORIS_EXEC_MEM_LIMIT_DEFAULT)
- .withDescription("");
+ .memoryType()
+ .defaultValue(MemorySize.parse(DORIS_EXEC_MEM_LIMIT_DEFAULT_STR))
+ .withDescription("Memory limit for a single query. The default is 2048mb.");
public static final ConfigOption<Boolean> SOURCE_USE_OLD_API =
ConfigOptions.key("source.use-old-api")
.booleanType()
@@ -198,20 +204,20 @@
.defaultValue(true)
.withDescription("enable 2PC while loading");
- public static final ConfigOption<Integer> SINK_CHECK_INTERVAL =
+ public static final ConfigOption<Duration> SINK_CHECK_INTERVAL =
ConfigOptions.key("sink.check-interval")
- .intType()
- .defaultValue(10000)
+ .durationType()
+ .defaultValue(Duration.ofMillis(10000))
.withDescription("check exception with the interval while loading");
public static final ConfigOption<Integer> SINK_MAX_RETRIES =
ConfigOptions.key("sink.max-retries")
.intType()
.defaultValue(3)
.withDescription("the max retry times if writing records to database failed.");
- public static final ConfigOption<Integer> SINK_BUFFER_SIZE =
+ public static final ConfigOption<MemorySize> SINK_BUFFER_SIZE =
ConfigOptions.key("sink.buffer-size")
- .intType()
- .defaultValue(1024 * 1024)
+ .memoryType()
+ .defaultValue(MemorySize.parse("1mb"))
.withDescription("the buffer size to cache data for stream load.");
public static final ConfigOption<Integer> SINK_BUFFER_COUNT =
ConfigOptions.key("sink.buffer-count")
@@ -263,10 +269,10 @@
.withDescription(
"The maximum number of flush items in each batch, the default is 5w");
- public static final ConfigOption<Integer> SINK_BUFFER_FLUSH_MAX_BYTES =
+ public static final ConfigOption<MemorySize> SINK_BUFFER_FLUSH_MAX_BYTES =
ConfigOptions.key("sink.buffer-flush.max-bytes")
- .intType()
- .defaultValue(10 * 1024 * 1024)
+ .memoryType()
+ .defaultValue(MemorySize.parse("10mb"))
.withDescription(
"The maximum number of bytes flushed in each batch, the default is 10MB");
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java
index bf5cd8c..f327b9c 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java
@@ -204,13 +204,16 @@
final DorisReadOptions.Builder builder = DorisReadOptions.builder();
builder.setDeserializeArrowAsync(readableConfig.get(DORIS_DESERIALIZE_ARROW_ASYNC))
.setDeserializeQueueSize(readableConfig.get(DORIS_DESERIALIZE_QUEUE_SIZE))
- .setExecMemLimit(readableConfig.get(DORIS_EXEC_MEM_LIMIT))
+ .setExecMemLimit(readableConfig.get(DORIS_EXEC_MEM_LIMIT).getBytes())
.setFilterQuery(readableConfig.get(DORIS_FILTER_QUERY))
.setReadFields(readableConfig.get(DORIS_READ_FIELD))
- .setRequestQueryTimeoutS(readableConfig.get(DORIS_REQUEST_QUERY_TIMEOUT_S))
+ .setRequestQueryTimeoutS(
+ (int) readableConfig.get(DORIS_REQUEST_QUERY_TIMEOUT_S).getSeconds())
.setRequestBatchSize(readableConfig.get(DORIS_BATCH_SIZE))
- .setRequestConnectTimeoutMs(readableConfig.get(DORIS_REQUEST_CONNECT_TIMEOUT_MS))
- .setRequestReadTimeoutMs(readableConfig.get(DORIS_REQUEST_READ_TIMEOUT_MS))
+ .setRequestConnectTimeoutMs(
+ (int) readableConfig.get(DORIS_REQUEST_CONNECT_TIMEOUT_MS).toMillis())
+ .setRequestReadTimeoutMs(
+ (int) readableConfig.get(DORIS_REQUEST_READ_TIMEOUT_MS).toMillis())
.setRequestRetries(readableConfig.get(DORIS_REQUEST_RETRIES))
.setRequestTabletSize(readableConfig.get(DORIS_TABLET_SIZE))
.setUseOldApi(readableConfig.get(SOURCE_USE_OLD_API));
@@ -220,9 +223,9 @@
private DorisExecutionOptions getDorisExecutionOptions(
ReadableConfig readableConfig, Properties streamLoadProp) {
final DorisExecutionOptions.Builder builder = DorisExecutionOptions.builder();
- builder.setCheckInterval(readableConfig.get(SINK_CHECK_INTERVAL));
+ builder.setCheckInterval((int) readableConfig.get(SINK_CHECK_INTERVAL).toMillis());
builder.setMaxRetries(readableConfig.get(SINK_MAX_RETRIES));
- builder.setBufferSize(readableConfig.get(SINK_BUFFER_SIZE));
+ builder.setBufferSize((int) readableConfig.get(SINK_BUFFER_SIZE).getBytes());
builder.setBufferCount(readableConfig.get(SINK_BUFFER_COUNT));
builder.setLabelPrefix(readableConfig.get(SINK_LABEL_PREFIX));
builder.setStreamLoadProp(streamLoadProp);
@@ -245,7 +248,8 @@
}
builder.setFlushQueueSize(readableConfig.get(SINK_FLUSH_QUEUE_SIZE));
builder.setBufferFlushMaxRows(readableConfig.get(SINK_BUFFER_FLUSH_MAX_ROWS));
- builder.setBufferFlushMaxBytes(readableConfig.get(SINK_BUFFER_FLUSH_MAX_BYTES));
+ builder.setBufferFlushMaxBytes(
+ (int) readableConfig.get(SINK_BUFFER_FLUSH_MAX_BYTES).getBytes());
builder.setBufferFlushIntervalMs(readableConfig.get(SINK_BUFFER_FLUSH_INTERVAL).toMillis());
builder.setUseCache(readableConfig.get(SINK_USE_CACHE));
return builder.build();
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
index 6b0a0fd..14d3fbb 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
@@ -260,10 +260,10 @@
.ifPresent(executionBuilder::setBufferCount);
sinkConfig
.getOptional(DorisConfigOptions.SINK_BUFFER_SIZE)
- .ifPresent(executionBuilder::setBufferSize);
+ .ifPresent(v -> executionBuilder.setBufferSize((int) v.getBytes()));
sinkConfig
.getOptional(DorisConfigOptions.SINK_CHECK_INTERVAL)
- .ifPresent(executionBuilder::setCheckInterval);
+ .ifPresent(v -> executionBuilder.setCheckInterval((int) v.toMillis()));
sinkConfig
.getOptional(DorisConfigOptions.SINK_MAX_RETRIES)
.ifPresent(executionBuilder::setMaxRetries);
@@ -289,7 +289,7 @@
.ifPresent(executionBuilder::setBufferFlushMaxRows);
sinkConfig
.getOptional(DorisConfigOptions.SINK_BUFFER_FLUSH_MAX_BYTES)
- .ifPresent(executionBuilder::setBufferFlushMaxBytes);
+ .ifPresent(v -> executionBuilder.setBufferFlushMaxBytes((int) v.getBytes()));
sinkConfig
.getOptional(DorisConfigOptions.SINK_BUFFER_FLUSH_INTERVAL)
.ifPresent(v -> executionBuilder.setBufferFlushIntervalMs(v.toMillis()));