blob: 6d74bf87f166ebd6aa7d173e5bc3466e55b94d22 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.flink.table;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.table.factories.FactoryUtil;
import org.apache.doris.flink.sink.writer.WriteMode;
import java.time.Duration;
import java.util.Map;
import java.util.Properties;
import static org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_BATCH_SIZE_DEFAULT;
import static org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_DESERIALIZE_ARROW_ASYNC_DEFAULT;
import static org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_DESERIALIZE_QUEUE_SIZE_DEFAULT;
import static org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_EXEC_MEM_LIMIT_DEFAULT_STR;
import static org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_REQUEST_CONNECT_TIMEOUT_MS_DEFAULT;
import static org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_REQUEST_QUERY_TIMEOUT_S_DEFAULT;
import static org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_REQUEST_READ_TIMEOUT_MS_DEFAULT;
import static org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_REQUEST_RETRIES_DEFAULT;
import static org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_TABLET_SIZE_DEFAULT;
public class DorisConfigOptions {
public static final String IDENTIFIER = "doris";
// common option
public static final ConfigOption<String> FENODES =
ConfigOptions.key("fenodes")
.stringType()
.noDefaultValue()
.withDescription("doris fe http address.");
public static final ConfigOption<String> BENODES =
ConfigOptions.key("benodes")
.stringType()
.noDefaultValue()
.withDescription("doris be http address.");
public static final ConfigOption<String> TABLE_IDENTIFIER =
ConfigOptions.key("table.identifier")
.stringType()
.noDefaultValue()
.withDescription("the doris table name.");
public static final ConfigOption<String> USERNAME =
ConfigOptions.key("username")
.stringType()
.noDefaultValue()
.withDescription("the doris user name.");
public static final ConfigOption<String> PASSWORD =
ConfigOptions.key("password")
.stringType()
.noDefaultValue()
.withDescription("the doris password.");
public static final ConfigOption<String> JDBC_URL =
ConfigOptions.key("jdbc-url")
.stringType()
.noDefaultValue()
.withDescription("doris jdbc url address.");
public static final ConfigOption<Boolean> AUTO_REDIRECT =
ConfigOptions.key("auto-redirect")
.booleanType()
.defaultValue(true)
.withDescription(
"Use automatic redirection of fe without explicitly obtaining the be list");
// source config options
public static final ConfigOption<Integer> DORIS_TABLET_SIZE =
ConfigOptions.key("doris.request.tablet.size")
.intType()
.defaultValue(DORIS_TABLET_SIZE_DEFAULT)
.withDescription(
"The number of Doris Tablets corresponding to a Partition. The smaller this value is set, the more Partitions will be generated. This improves the parallelism on the Flink side, but at the same time puts more pressure on Doris.");
public static final ConfigOption<Duration> DORIS_REQUEST_CONNECT_TIMEOUT_MS =
ConfigOptions.key("doris.request.connect.timeout")
.durationType()
.defaultValue(Duration.ofMillis(DORIS_REQUEST_CONNECT_TIMEOUT_MS_DEFAULT))
.withDescription("Connection timeout for sending requests to Doris");
public static final ConfigOption<Duration> DORIS_REQUEST_READ_TIMEOUT_MS =
ConfigOptions.key("doris.request.read.timeout")
.durationType()
.defaultValue(Duration.ofMillis(DORIS_REQUEST_READ_TIMEOUT_MS_DEFAULT))
.withDescription("Read timeout for sending requests to Doris");
public static final ConfigOption<Duration> DORIS_REQUEST_QUERY_TIMEOUT_S =
ConfigOptions.key("doris.request.query.timeout")
.durationType()
.defaultValue(Duration.ofSeconds(DORIS_REQUEST_QUERY_TIMEOUT_S_DEFAULT))
.withDescription(
"The timeout time for querying Doris, the default value is 1 hour, -1 means no timeout limit");
public static final ConfigOption<Integer> DORIS_REQUEST_RETRIES =
ConfigOptions.key("doris.request.retries")
.intType()
.defaultValue(DORIS_REQUEST_RETRIES_DEFAULT)
.withDescription("Number of retries to send requests to Doris");
public static final ConfigOption<Boolean> DORIS_DESERIALIZE_ARROW_ASYNC =
ConfigOptions.key("doris.deserialize.arrow.async")
.booleanType()
.defaultValue(DORIS_DESERIALIZE_ARROW_ASYNC_DEFAULT)
.withDescription(
"Whether to support asynchronous conversion of Arrow format to RowBatch needed for connector iterations");
public static final ConfigOption<Integer> DORIS_DESERIALIZE_QUEUE_SIZE =
ConfigOptions.key("doris.deserialize.queue.size")
.intType()
.defaultValue(DORIS_DESERIALIZE_QUEUE_SIZE_DEFAULT)
.withDescription(
"Asynchronous conversion of internal processing queue in Arrow format, effective when doris.deserialize.arrow.async is true");
public static final ConfigOption<Integer> DORIS_BATCH_SIZE =
ConfigOptions.key("doris.batch.size")
.intType()
.defaultValue(DORIS_BATCH_SIZE_DEFAULT)
.withDescription(
"The maximum number of rows to read data from BE at a time. Increasing this value reduces the number of connections established between Flink and Doris. Thereby reducing the additional time overhead caused by network delay.");
public static final ConfigOption<MemorySize> DORIS_EXEC_MEM_LIMIT =
ConfigOptions.key("doris.exec.mem.limit")
.memoryType()
.defaultValue(MemorySize.parse(DORIS_EXEC_MEM_LIMIT_DEFAULT_STR))
.withDescription("Memory limit for a single query. The default is 2048mb.");
public static final ConfigOption<Boolean> SOURCE_USE_OLD_API =
ConfigOptions.key("source.use-old-api")
.booleanType()
.defaultValue(false)
.withDescription(
"Whether to read data using the new interface defined according to the FLIP-27 specification,default false");
// Lookup options
public static final ConfigOption<Long> LOOKUP_CACHE_MAX_ROWS =
ConfigOptions.key("lookup.cache.max-rows")
.longType()
.defaultValue(-1L)
.withDescription(
"The max number of rows of lookup cache, over this value, the oldest rows will "
+ "be eliminated. \"cache.max-rows\" and \"cache.ttl\" options must all be specified if any of them is "
+ "specified.");
public static final ConfigOption<Duration> LOOKUP_CACHE_TTL =
ConfigOptions.key("lookup.cache.ttl")
.durationType()
.defaultValue(Duration.ofSeconds(10))
.withDescription("The cache time to live.");
public static final ConfigOption<Integer> LOOKUP_MAX_RETRIES =
ConfigOptions.key("lookup.max-retries")
.intType()
.defaultValue(1)
.withDescription("The max retry times if lookup database failed.");
public static final ConfigOption<Integer> LOOKUP_JDBC_READ_BATCH_SIZE =
ConfigOptions.key("lookup.jdbc.read.batch.size")
.intType()
.defaultValue(128)
.withDescription(
"when dimension table query, save the maximum number of batches.");
public static final ConfigOption<Integer> LOOKUP_JDBC_READ_BATCH_QUEUE_SIZE =
ConfigOptions.key("lookup.jdbc.read.batch.queue-size")
.intType()
.defaultValue(256)
.withDescription("dimension table query request buffer queue size.");
public static final ConfigOption<Integer> LOOKUP_JDBC_READ_THREAD_SIZE =
ConfigOptions.key("lookup.jdbc.read.thread-size")
.intType()
.defaultValue(3)
.withDescription(
"the number of threads for dimension table query, each query occupies a JDBC connection");
public static final ConfigOption<Boolean> LOOKUP_JDBC_ASYNC =
ConfigOptions.key("lookup.jdbc.async")
.booleanType()
.defaultValue(false)
.withDescription("whether to set async lookup");
// sink config options
public static final ConfigOption<Boolean> SINK_ENABLE_2PC =
ConfigOptions.key("sink.enable-2pc")
.booleanType()
.defaultValue(true)
.withDescription("enable 2PC while loading");
public static final ConfigOption<Duration> SINK_CHECK_INTERVAL =
ConfigOptions.key("sink.check-interval")
.durationType()
.defaultValue(Duration.ofMillis(10000))
.withDescription("check exception with the interval while loading");
public static final ConfigOption<Integer> SINK_MAX_RETRIES =
ConfigOptions.key("sink.max-retries")
.intType()
.defaultValue(3)
.withDescription("the max retry times if writing records to database failed.");
public static final ConfigOption<MemorySize> SINK_BUFFER_SIZE =
ConfigOptions.key("sink.buffer-size")
.memoryType()
.defaultValue(MemorySize.parse("1mb"))
.withDescription("the buffer size to cache data for stream load.");
public static final ConfigOption<Integer> SINK_BUFFER_COUNT =
ConfigOptions.key("sink.buffer-count")
.intType()
.defaultValue(3)
.withDescription("the buffer count to cache data for stream load.");
public static final ConfigOption<String> SINK_LABEL_PREFIX =
ConfigOptions.key("sink.label-prefix")
.stringType()
.defaultValue("")
.withDescription("the unique label prefix.");
public static final ConfigOption<Boolean> SINK_ENABLE_DELETE =
ConfigOptions.key("sink.enable-delete")
.booleanType()
.defaultValue(true)
.withDescription("whether to enable the delete function");
public static final ConfigOption<String> SINK_WRITE_MODE =
ConfigOptions.key("sink.write-mode")
.stringType()
.defaultValue(WriteMode.STREAM_LOAD.name())
.withDescription("Write mode, supports stream_load, stream_load_batch");
public static final ConfigOption<Boolean> SINK_IGNORE_COMMIT_ERROR =
ConfigOptions.key("sink.ignore.commit-error")
.booleanType()
.defaultValue(false)
.withDescription(
"Whether to ignore commit errors. Usually used when the checkpoint cannot be restored to skip the commit of txn. The default is false.");
public static final ConfigOption<Integer> SINK_PARALLELISM = FactoryUtil.SINK_PARALLELISM;
public static final ConfigOption<Boolean> SINK_ENABLE_BATCH_MODE =
ConfigOptions.key("sink.enable.batch-mode")
.booleanType()
.defaultValue(false)
.withDescription("Whether to enable batch write mode");
public static final ConfigOption<Integer> SINK_FLUSH_QUEUE_SIZE =
ConfigOptions.key("sink.flush.queue-size")
.intType()
.defaultValue(2)
.withDescription("Queue length for async stream load, default is 2");
public static final ConfigOption<Integer> SINK_BUFFER_FLUSH_MAX_ROWS =
ConfigOptions.key("sink.buffer-flush.max-rows")
.intType()
.defaultValue(50000)
.withDescription(
"The maximum number of flush items in each batch, the default is 5w");
public static final ConfigOption<MemorySize> SINK_BUFFER_FLUSH_MAX_BYTES =
ConfigOptions.key("sink.buffer-flush.max-bytes")
.memoryType()
.defaultValue(MemorySize.parse("10mb"))
.withDescription(
"The maximum number of bytes flushed in each batch, the default is 10MB");
public static final ConfigOption<Duration> SINK_BUFFER_FLUSH_INTERVAL =
ConfigOptions.key("sink.buffer-flush.interval")
.durationType()
.defaultValue(Duration.ofSeconds(10))
.withDescription(
"the flush interval mills, over this time, asynchronous threads will flush data. The "
+ "default value is 10s.");
public static final ConfigOption<Boolean> SINK_IGNORE_UPDATE_BEFORE =
ConfigOptions.key("sink.ignore.update-before")
.booleanType()
.defaultValue(true)
.withDescription(
"In the CDC scenario, when the primary key of the upstream is inconsistent with that of the downstream, the update-before data needs to be passed to the downstream as deleted data, otherwise the data cannot be deleted.\n"
+ "The default is to ignore, that is, perform upsert semantics.");
public static final ConfigOption<Boolean> SINK_USE_CACHE =
ConfigOptions.key("sink.use-cache")
.booleanType()
.defaultValue(false)
.withDescription("Whether to use buffer cache for breakpoint resume");
// Prefix for Doris StreamLoad specific properties.
public static final String STREAM_LOAD_PROP_PREFIX = "sink.properties.";
public static Properties getStreamLoadProp(Map<String, String> tableOptions) {
final Properties streamLoadProp = new Properties();
for (Map.Entry<String, String> entry : tableOptions.entrySet()) {
if (entry.getKey().startsWith(STREAM_LOAD_PROP_PREFIX)) {
String subKey = entry.getKey().substring(STREAM_LOAD_PROP_PREFIX.length());
streamLoadProp.put(subKey, entry.getValue());
}
}
return streamLoadProp;
}
}