blob: be00cff1718b27c3acda6f735ab9b84cf6c02f10 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.flink.table;
import org.apache.doris.flink.cfg.DorisExecutionOptions;
import org.apache.doris.flink.cfg.DorisOptions;
import org.apache.doris.flink.cfg.DorisReadOptions;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.factories.DynamicTableSinkFactory;
import org.apache.flink.table.factories.DynamicTableSourceFactory;
import org.apache.flink.table.factories.FactoryUtil;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.utils.TableSchemaUtils;
import java.time.Duration;
import java.util.HashSet;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import static org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_BATCH_SIZE_DEFAULT;
import static org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_DESERIALIZE_ARROW_ASYNC_DEFAULT;
import static org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_DESERIALIZE_QUEUE_SIZE_DEFAULT;
import static org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_EXEC_MEM_LIMIT_DEFAULT;
import static org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_REQUEST_CONNECT_TIMEOUT_MS_DEFAULT;
import static org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_REQUEST_QUERY_TIMEOUT_S_DEFAULT;
import static org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_REQUEST_READ_TIMEOUT_MS_DEFAULT;
import static org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_REQUEST_RETRIES_DEFAULT;
import static org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_TABLET_SIZE_DEFAULT;
/**
* The {@link DorisDynamicTableFactory} translates the catalog table to a table source.
*
* <p>Because the table source requires a decoding format, we are discovering the format using the
* provided {@link FactoryUtil} for convenience.
*/
public final class DorisDynamicTableFactory implements DynamicTableSourceFactory, DynamicTableSinkFactory {
public static final ConfigOption<String> FENODES = ConfigOptions.key("fenodes").stringType().noDefaultValue().withDescription("doris fe http address.");
public static final ConfigOption<String> TABLE_IDENTIFIER = ConfigOptions.key("table.identifier").stringType().noDefaultValue().withDescription("the jdbc table name.");
public static final ConfigOption<String> USERNAME = ConfigOptions.key("username").stringType().noDefaultValue().withDescription("the jdbc user name.");
public static final ConfigOption<String> PASSWORD = ConfigOptions.key("password").stringType().noDefaultValue().withDescription("the jdbc password.");
// Prefix for Doris StreamLoad specific properties.
public static final String STREAM_LOAD_PROP_PREFIX = "sink.properties.";
// doris options
private static final ConfigOption<String> DORIS_READ_FIELD = ConfigOptions
.key("doris.read.field")
.stringType()
.noDefaultValue()
.withDescription("List of column names in the Doris table, separated by commas");
private static final ConfigOption<String> DORIS_FILTER_QUERY = ConfigOptions
.key("doris.filter.query")
.stringType()
.noDefaultValue()
.withDescription("Filter expression of the query, which is transparently transmitted to Doris. Doris uses this expression to complete source-side data filtering");
private static final ConfigOption<Integer> DORIS_TABLET_SIZE = ConfigOptions
.key("doris.request.tablet.size")
.intType()
.defaultValue(DORIS_TABLET_SIZE_DEFAULT)
.withDescription("");
private static final ConfigOption<Integer> DORIS_REQUEST_CONNECT_TIMEOUT_MS = ConfigOptions
.key("doris.request.connect.timeout.ms")
.intType()
.defaultValue(DORIS_REQUEST_CONNECT_TIMEOUT_MS_DEFAULT)
.withDescription("");
private static final ConfigOption<Integer> DORIS_REQUEST_READ_TIMEOUT_MS = ConfigOptions
.key("doris.request.read.timeout.ms")
.intType()
.defaultValue(DORIS_REQUEST_READ_TIMEOUT_MS_DEFAULT)
.withDescription("");
private static final ConfigOption<Integer> DORIS_REQUEST_QUERY_TIMEOUT_S = ConfigOptions
.key("doris.request.query.timeout.s")
.intType()
.defaultValue(DORIS_REQUEST_QUERY_TIMEOUT_S_DEFAULT)
.withDescription("");
private static final ConfigOption<Integer> DORIS_REQUEST_RETRIES = ConfigOptions
.key("doris.request.retries")
.intType()
.defaultValue(DORIS_REQUEST_RETRIES_DEFAULT)
.withDescription("");
private static final ConfigOption<Boolean> DORIS_DESERIALIZE_ARROW_ASYNC = ConfigOptions
.key("doris.deserialize.arrow.async")
.booleanType()
.defaultValue(DORIS_DESERIALIZE_ARROW_ASYNC_DEFAULT)
.withDescription("");
private static final ConfigOption<Integer> DORIS_DESERIALIZE_QUEUE_SIZE = ConfigOptions
.key("doris.request.retriesdoris.deserialize.queue.size")
.intType()
.defaultValue(DORIS_DESERIALIZE_QUEUE_SIZE_DEFAULT)
.withDescription("");
private static final ConfigOption<Integer> DORIS_BATCH_SIZE = ConfigOptions
.key("doris.batch.size")
.intType()
.defaultValue(DORIS_BATCH_SIZE_DEFAULT)
.withDescription("");
private static final ConfigOption<Long> DORIS_EXEC_MEM_LIMIT = ConfigOptions
.key("doris.exec.mem.limit")
.longType()
.defaultValue(DORIS_EXEC_MEM_LIMIT_DEFAULT)
.withDescription("");
// flink write config options
private static final ConfigOption<Boolean> SINK_ENABLE_2PC = ConfigOptions
.key("sink.enable-2pc")
.booleanType()
.defaultValue(true)
.withDescription("enable 2PC while loading");
private static final ConfigOption<Integer> SINK_CHECK_INTERVAL = ConfigOptions
.key("sink.check-interval")
.intType()
.defaultValue(10000)
.withDescription("check exception with the interval while loading");
private static final ConfigOption<Integer> SINK_MAX_RETRIES = ConfigOptions
.key("sink.max-retries")
.intType()
.defaultValue(3)
.withDescription("the max retry times if writing records to database failed.");
private static final ConfigOption<Integer> SINK_BUFFER_SIZE = ConfigOptions
.key("sink.buffer-size")
.intType()
.defaultValue(256 * 1024)
.withDescription("the buffer size to cache data for stream load.");
private static final ConfigOption<Integer> SINK_BUFFER_COUNT = ConfigOptions
.key("sink.buffer-count")
.intType()
.defaultValue(3)
.withDescription("the buffer count to cache data for stream load.");
private static final ConfigOption<String> SINK_LABEL_PREFIX = ConfigOptions
.key("sink.label-prefix")
.stringType()
.noDefaultValue()
.withDescription("the unique label prefix.");
private static final ConfigOption<Duration> SINK_BUFFER_FLUSH_INTERVAL = ConfigOptions
.key("sink.batch.interval")
.durationType()
.defaultValue(Duration.ofSeconds(1))
.withDescription("the flush interval mills, over this time, asynchronous threads will flush data. The " +
"default value is 1s.");
private static final ConfigOption<Boolean> SINK_ENABLE_DELETE = ConfigOptions
.key("sink.enable-delete")
.booleanType()
.defaultValue(true)
.withDescription("whether to enable the delete function");
private static final ConfigOption<Boolean> SOURCE_USE_OLD_API = ConfigOptions
.key("source.use-old-api")
.booleanType()
.defaultValue(false)
.withDescription("Whether to read data using the new interface defined according to the FLIP-27 specification,default false");
@Override
public String factoryIdentifier() {
return "doris"; // used for matching to `connector = '...'`
}
@Override
public Set<ConfigOption<?>> requiredOptions() {
final Set<ConfigOption<?>> options = new HashSet<>();
options.add(FENODES);
options.add(TABLE_IDENTIFIER);
return options;
}
@Override
public Set<ConfigOption<?>> optionalOptions() {
final Set<ConfigOption<?>> options = new HashSet<>();
options.add(FENODES);
options.add(TABLE_IDENTIFIER);
options.add(USERNAME);
options.add(PASSWORD);
options.add(DORIS_READ_FIELD);
options.add(DORIS_FILTER_QUERY);
options.add(DORIS_TABLET_SIZE);
options.add(DORIS_REQUEST_CONNECT_TIMEOUT_MS);
options.add(DORIS_REQUEST_READ_TIMEOUT_MS);
options.add(DORIS_REQUEST_QUERY_TIMEOUT_S);
options.add(DORIS_REQUEST_RETRIES);
options.add(DORIS_DESERIALIZE_ARROW_ASYNC);
options.add(DORIS_DESERIALIZE_QUEUE_SIZE);
options.add(DORIS_BATCH_SIZE);
options.add(DORIS_EXEC_MEM_LIMIT);
options.add(SINK_CHECK_INTERVAL);
options.add(SINK_ENABLE_2PC);
options.add(SINK_MAX_RETRIES);
options.add(SINK_BUFFER_FLUSH_INTERVAL);
options.add(SINK_ENABLE_DELETE);
options.add(SINK_LABEL_PREFIX);
options.add(SINK_BUFFER_SIZE);
options.add(SINK_BUFFER_COUNT);
options.add(SOURCE_USE_OLD_API);
return options;
}
@Override
public DynamicTableSource createDynamicTableSource(Context context) {
// either implement your custom validation logic here ...
// or use the provided helper utility
final FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
// validate all options
helper.validateExcept(STREAM_LOAD_PROP_PREFIX);
// get the validated options
final ReadableConfig options = helper.getOptions();
// derive the produced data type (excluding computed columns) from the catalog table
final DataType producedDataType = context.getCatalogTable().getSchema().toPhysicalRowDataType();
TableSchema physicalSchema = TableSchemaUtils.getPhysicalSchema(context.getCatalogTable().getSchema());
// create and return dynamic table source
return new DorisDynamicTableSource(
getDorisOptions(helper.getOptions()),
getDorisReadOptions(helper.getOptions()),
physicalSchema);
}
private DorisOptions getDorisOptions(ReadableConfig readableConfig) {
final String fenodes = readableConfig.get(FENODES);
final DorisOptions.Builder builder = DorisOptions.builder()
.setFenodes(fenodes)
.setTableIdentifier(readableConfig.get(TABLE_IDENTIFIER));
readableConfig.getOptional(USERNAME).ifPresent(builder::setUsername);
readableConfig.getOptional(PASSWORD).ifPresent(builder::setPassword);
return builder.build();
}
private DorisReadOptions getDorisReadOptions(ReadableConfig readableConfig) {
final DorisReadOptions.Builder builder = DorisReadOptions.builder();
builder.setDeserializeArrowAsync(readableConfig.get(DORIS_DESERIALIZE_ARROW_ASYNC))
.setDeserializeQueueSize(readableConfig.get(DORIS_DESERIALIZE_QUEUE_SIZE))
.setExecMemLimit(readableConfig.get(DORIS_EXEC_MEM_LIMIT))
.setFilterQuery(readableConfig.get(DORIS_FILTER_QUERY))
.setReadFields(readableConfig.get(DORIS_READ_FIELD))
.setRequestQueryTimeoutS(readableConfig.get(DORIS_REQUEST_QUERY_TIMEOUT_S))
.setRequestBatchSize(readableConfig.get(DORIS_BATCH_SIZE))
.setRequestConnectTimeoutMs(readableConfig.get(DORIS_REQUEST_CONNECT_TIMEOUT_MS))
.setRequestReadTimeoutMs(readableConfig.get(DORIS_REQUEST_READ_TIMEOUT_MS))
.setRequestRetries(readableConfig.get(DORIS_REQUEST_RETRIES))
.setRequestTabletSize(readableConfig.get(DORIS_TABLET_SIZE))
.setUseOldApi(readableConfig.get(SOURCE_USE_OLD_API));
return builder.build();
}
private DorisExecutionOptions getDorisExecutionOptions(ReadableConfig readableConfig, Properties streamLoadProp) {
final DorisExecutionOptions.Builder builder = DorisExecutionOptions.builder();
builder.setCheckInterval(readableConfig.get(SINK_CHECK_INTERVAL));
builder.setMaxRetries(readableConfig.get(SINK_MAX_RETRIES));
builder.setBufferSize(readableConfig.get(SINK_BUFFER_SIZE));
builder.setBufferCount(readableConfig.get(SINK_BUFFER_COUNT));
builder.setLabelPrefix(readableConfig.get(SINK_LABEL_PREFIX));
builder.setStreamLoadProp(streamLoadProp);
builder.setDeletable(readableConfig.get(SINK_ENABLE_DELETE));
if (!readableConfig.get(SINK_ENABLE_2PC)) {
builder.disable2PC();
}
return builder.build();
}
private Properties getStreamLoadProp(Map<String, String> tableOptions) {
final Properties streamLoadProp = new Properties();
for (Map.Entry<String, String> entry : tableOptions.entrySet()) {
if (entry.getKey().startsWith(STREAM_LOAD_PROP_PREFIX)) {
String subKey = entry.getKey().substring(STREAM_LOAD_PROP_PREFIX.length());
streamLoadProp.put(subKey, entry.getValue());
}
}
return streamLoadProp;
}
@Override
public DynamicTableSink createDynamicTableSink(Context context) {
final FactoryUtil.TableFactoryHelper helper =
FactoryUtil.createTableFactoryHelper(
this, context);
// validate all options
helper.validateExcept(STREAM_LOAD_PROP_PREFIX);
Properties streamLoadProp = getStreamLoadProp(context.getCatalogTable().getOptions());
TableSchema physicalSchema =
TableSchemaUtils.getPhysicalSchema(context.getCatalogTable().getSchema());
// create and return dynamic table source
return new DorisDynamicTableSink(
getDorisOptions(helper.getOptions()),
getDorisReadOptions(helper.getOptions()),
getDorisExecutionOptions(helper.getOptions(), streamLoadProp),
physicalSchema
);
}
}