commit | 0b2a14a93d3b65115c25fe59d84d6b3375da517d | [log] [tgz] |
---|---|---|
author | wudi <676366545@qq.com> | Mon Nov 06 16:56:04 2023 +0800 |
committer | GitHub <noreply@github.com> | Mon Nov 06 16:56:04 2023 +0800 |
tree | 6faf947d5ac48f6f2f84f8a4ddc04c1db7cac0af | |
parent | a4b4bdfc92bb8fecefffb6f4f81b0a8f577d142e [diff] |
[improve] add multi table sink to DorisSink (#224) DorisSink supports multi-table import. Example: ```java Configuration config = new Configuration(); // config.setString("execution.savepoint.path","/tmp/checkpoint/chk-6"); StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(config); env.setParallelism(1); env.getCheckpointConfig().setCheckpointStorage("file:///tmp/checkpoint/"); env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); env.setRestartStrategy(RestartStrategies.fixedDelayRestart(5, Time.milliseconds(10000))); env.enableCheckpointing(10000); DorisSink.Builder<RecordWithMeta> builder = DorisSink.builder(); final DorisReadOptions.Builder readOptionBuilder = DorisReadOptions.builder(); Properties properties = new Properties(); properties.setProperty("column_separator", ","); properties.setProperty("line_delimiter", "\n"); properties.setProperty("format", "csv"); DorisOptions.Builder dorisBuilder = DorisOptions.builder(); dorisBuilder.setFenodes("127.0.0.1:8030") .setTableIdentifier("") .setUsername("root") .setPassword(""); DorisExecutionOptions.Builder executionBuilder = DorisExecutionOptions.builder(); executionBuilder.setLabelPrefix("xxx12") .setStreamLoadProp(properties) .setDeletable(false).enable2PC(); builder.setDorisReadOptions(readOptionBuilder.build()) .setDorisExecutionOptions(executionBuilder.build()) .setDorisOptions(dorisBuilder.build()); RecordWithMeta record = new RecordWithMeta("test", "test_flink_tmp1", "wangwu,1"); RecordWithMeta record1 = new RecordWithMeta("test", "test_flink_tmp", "wangwu,1"); DataStreamSource<RecordWithMeta> stringDataStreamSource = env.fromCollection( Arrays.asList(record, record1)); stringDataStreamSource.sinkTo(builder.build()); ``` For details, please refer to `org.apache.doris.flink.DorisSinkStreamMultiTableExample.java`
Flink Doris Connector now support flink version from 1.11 to 1.17.
If you wish to contribute or use a connector from flink 1.13 (and earlier), please use the branch-for-flink-before-1.13
More information about compilation and usage, please visit Flink Doris Connector
You need to copy customer_env.sh.tpl to customer_env.sh before build and you need to configure it before build.
git clone git@github.com:apache/doris-flink-connector.git cd doris-flink-connector/flink-doris-connector ./build.sh
If you find any bugs, feel free to file a GitHub issue or fix it by submitting a pull request.
Contact us through the following mailing list.
Name | Scope | |||
---|---|---|---|---|
dev@doris.apache.org | Development-related discussions | Subscribe | Unsubscribe | Archives |