[feature] support  struct and map type (#212)

Support doris map, struct type reading and writing

```java
//doris create table
CREATE TABLE `simple_map2` (
  `id` int(11) NULL,
  `m` MAP<text,int(11)> NULL,
  `s_info` STRUCT<s_id:int(11),s_name:text,s_address:text> NULL
) ENGINE=OLAP
DUPLICATE KEY(`id`)
COMMENT 'OLAP'
DISTRIBUTED BY HASH(`id`) BUCKETS 1
PROPERTIES (
"replication_allocation" = "tag.location.default: 1",
"is_being_synced" = "false",
"storage_format" = "V2",
"light_schema_change" = "true",
"disable_auto_compaction" = "false",
"enable_single_replica_compaction" = "false"
); 

//datagen->doris
tEnv.executeSql(
         "CREATE TABLE doris_test (" +
         "  id int,\n" +
         "  task Map<String,int>,\n" +
         "  buyer ROW<s_id int,s_name string, s_address string>\n" +
         ") " +
         "WITH (\n" +
         "  'connector' = 'datagen', \n" +
         "  'number-of-rows' = '11' \n" +
         ")");

tEnv.executeSql("CREATE TABLE blackhole_table (" +
         "id int," +
         "m Map<String,int>," +
         "s_info Row<s_id int,s_name string, s_address string>" +
         ") WITH (" +
         "  'connector' = 'doris',\n" +
         "  'fenodes' = '127.0.0.1:8030',\n" +
         "  'table.identifier' = 'test.simple_map2',\n" +
         "  'sink.enable-2pc' = 'false',\n" +
         "  'username' = 'root',\n" +
         "  'password' = '',\n" +
         "  'sink.properties.format' = 'json',\n" +
         "  'sink.properties.read_json_by_line' = 'true'\n" +
         ");");

tEnv.executeSql("INSERT INTO blackhole_table select * from doris_test");
 


//doris->doris
tEnv.executeSql(
        "CREATE TABLE doris_source (" +
        "id int," +
        "m Map<String,int>," +
        "s_info Row<s_id int,s_name string, s_address string>" +
        ") " +
        "WITH (\n" +
        "  'connector' = 'doris',\n" +
        "  'fenodes' = '127.0.0.1:8030',\n" +
        "  'table.identifier' = 'test.simple_map',\n" +
        "  'username' = 'root',\n" +
        "  'password' = ''\n" +
        ")");

tEnv.executeSql("CREATE TABLE blackhole_table (" +
                "id int," +
                "m Map<String,int>," +
                "s_info Row<s_id int,s_name string, s_address string>" +
                ") WITH (" +
                "  'connector' = 'doris',\n" +
                "  'fenodes' = '127.0.0.1:8030',\n" +
                "  'table.identifier' = 'test.simple_map2',\n" +
                "  'sink.enable-2pc' = 'false',\n" +
                "  'username' = 'root',\n" +
                "  'password' = '',\n" +
                "  'sink.properties.format' = 'json',\n" +
                "  'sink.properties.read_json_by_line' = 'true'\n" +
                ");");

tEnv.executeSql("insert into blackhole_table select * from doris_source");
```
3 files changed
tree: 90fd061296f51aadf00ea17933934b8ca69de6f1
  1. .github/
  2. .mvn/
  3. flink-doris-connector/
  4. .asf.yaml
  5. .gitignore
  6. .licenserc.yaml
  7. CODE_OF_CONDUCT.md
  8. CONTRIBUTING.md
  9. CONTRIBUTING_CN.md
  10. custom_env.sh.tpl
  11. env.sh
  12. LICENSE.txt
  13. mvnw
  14. NOTICE.txt
  15. README.md
README.md

Flink Connector for Apache Doris

License Join the Doris Community at Slack

Flink Doris Connector

Flink Doris Connector now support flink version from 1.11 to 1.17.

If you wish to contribute or use a connector from flink 1.13 (and earlier), please use the branch-for-flink-before-1.13

More information about compilation and usage, please visit Flink Doris Connector

License

Apache License, Version 2.0

How to Build

You need to copy customer_env.sh.tpl to customer_env.sh before build and you need to configure it before build.

git clone git@github.com:apache/doris-flink-connector.git
cd doris-flink-connector/flink-doris-connector
./build.sh

how-to-build

Report issues or submit pull request

If you find any bugs, feel free to file a GitHub issue or fix it by submitting a pull request.

Contact Us

Contact us through the following mailing list.

NameScope
dev@doris.apache.orgDevelopment-related discussionsSubscribeUnsubscribeArchives

Links