commit | 9f30144120964eeaf9c2757afcfc1242e588a8f0 | [log] [tgz] |
---|---|---|
author | wudi <676366545@qq.com> | Thu Oct 19 10:05:09 2023 +0800 |
committer | GitHub <noreply@github.com> | Wed Oct 18 21:05:09 2023 -0500 |
tree | 90fd061296f51aadf00ea17933934b8ca69de6f1 | |
parent | 39465998add63518c2c86a0f55e7385f028e9acc [diff] |
[feature] support struct and map type (#212) Support doris map, struct type reading and writing ```java //doris create table CREATE TABLE `simple_map2` ( `id` int(11) NULL, `m` MAP<text,int(11)> NULL, `s_info` STRUCT<s_id:int(11),s_name:text,s_address:text> NULL ) ENGINE=OLAP DUPLICATE KEY(`id`) COMMENT 'OLAP' DISTRIBUTED BY HASH(`id`) BUCKETS 1 PROPERTIES ( "replication_allocation" = "tag.location.default: 1", "is_being_synced" = "false", "storage_format" = "V2", "light_schema_change" = "true", "disable_auto_compaction" = "false", "enable_single_replica_compaction" = "false" ); //datagen->doris tEnv.executeSql( "CREATE TABLE doris_test (" + " id int,\n" + " task Map<String,int>,\n" + " buyer ROW<s_id int,s_name string, s_address string>\n" + ") " + "WITH (\n" + " 'connector' = 'datagen', \n" + " 'number-of-rows' = '11' \n" + ")"); tEnv.executeSql("CREATE TABLE blackhole_table (" + "id int," + "m Map<String,int>," + "s_info Row<s_id int,s_name string, s_address string>" + ") WITH (" + " 'connector' = 'doris',\n" + " 'fenodes' = '127.0.0.1:8030',\n" + " 'table.identifier' = 'test.simple_map2',\n" + " 'sink.enable-2pc' = 'false',\n" + " 'username' = 'root',\n" + " 'password' = '',\n" + " 'sink.properties.format' = 'json',\n" + " 'sink.properties.read_json_by_line' = 'true'\n" + ");"); tEnv.executeSql("INSERT INTO blackhole_table select * from doris_test"); //doris->doris tEnv.executeSql( "CREATE TABLE doris_source (" + "id int," + "m Map<String,int>," + "s_info Row<s_id int,s_name string, s_address string>" + ") " + "WITH (\n" + " 'connector' = 'doris',\n" + " 'fenodes' = '127.0.0.1:8030',\n" + " 'table.identifier' = 'test.simple_map',\n" + " 'username' = 'root',\n" + " 'password' = ''\n" + ")"); tEnv.executeSql("CREATE TABLE blackhole_table (" + "id int," + "m Map<String,int>," + "s_info Row<s_id int,s_name string, s_address string>" + ") WITH (" + " 'connector' = 'doris',\n" + " 'fenodes' = '127.0.0.1:8030',\n" + " 'table.identifier' = 'test.simple_map2',\n" + " 'sink.enable-2pc' = 'false',\n" + " 'username' = 'root',\n" + " 'password' = '',\n" + " 'sink.properties.format' = 'json',\n" + " 'sink.properties.read_json_by_line' = 'true'\n" + ");"); tEnv.executeSql("insert into blackhole_table select * from doris_source"); ```
Flink Doris Connector now support flink version from 1.11 to 1.17.
If you wish to contribute or use a connector from flink 1.13 (and earlier), please use the branch-for-flink-before-1.13
More information about compilation and usage, please visit Flink Doris Connector
You need to copy customer_env.sh.tpl to customer_env.sh before build and you need to configure it before build.
git clone git@github.com:apache/doris-flink-connector.git cd doris-flink-connector/flink-doris-connector ./build.sh
If you find any bugs, feel free to file a GitHub issue or fix it by submitting a pull request.
Contact us through the following mailing list.
Name | Scope | |||
---|---|---|---|---|
dev@doris.apache.org | Development-related discussions | Subscribe | Unsubscribe | Archives |