blob: 7cb557e0f4f3f2c4bfaabaa96b82866e5b07a886 [file] [log] [blame]
/*
 * Webpack chunk 67942 of the `doris_website` bundle (minified build output).
 * The chunk registers its modules by pushing [[chunkId], {moduleId: factory}]
 * onto the shared `self.webpackChunkdoris_website` array.
 *
 * Module 15680 -- MDX runtime helpers. Exports (via n.d):
 *   xA -> p : provider component; resolves `props.components` against the
 *             current context value and exposes the result through
 *             s.Provider to `props.children`.
 *   yg -> y : createElement wrapper. When the element type is a string or the
 *             props carry `mdxType`, it renders through the forwardRef
 *             component `m` (displayName "MDXCreateElement"), passing the real
 *             type as `originalType`; otherwise it forwards its arguments
 *             unchanged to a.createElement.
 *
 * Internal names:
 *   a : module 296540 (presumably React -- it supplies createContext,
 *       useContext, createElement, forwardRef and Fragment; confirm).
 *   r : sets e[t] = n (through defineProperty when the key already exists).
 *   i : own keys of an object plus its own symbols (symbols filtered to the
 *       enumerable ones when the second argument is truthy).
 *   l : copies the own properties of every later argument onto the first
 *       argument and returns it.
 *   o : shallow copy of `e` without the keys listed in `t`; {} for null input.
 *   d : merges the context components with the given ones (a function
 *       argument is called with the context value instead of being merged).
 *   u : fallbacks used when no component is registered for an mdxType.
 *   m : looks a component up as "<parentName>.<mdxType>", then "<mdxType>",
 *       then in `u`, and finally falls back to `originalType`.
 *
 * NOTE(review): the text after module 15680 on the second line is the start of
 * module 564394, which continues beyond this span; it is reproduced unchanged.
 */
"use strict";(self.webpackChunkdoris_website=self.webpackChunkdoris_website||[]).push([[67942],{15680:(e,t,n)=>{n.d(t,{xA:()=>p,yg:()=>y});var a=n(296540);/* r: define or assign property t on e */function r(e,t,n){return t in e?Object.defineProperty(e,t,{value:n,enumerable:!0,configurable:!0,writable:!0}):e[t]=n,e}/* i: own keys plus own symbols */function i(e,t){var n=Object.keys(e);if(Object.getOwnPropertySymbols){var a=Object.getOwnPropertySymbols(e);t&&(a=a.filter((function(t){return Object.getOwnPropertyDescriptor(e,t).enumerable}))),n.push.apply(n,a)}return n}/* l: merge later arguments into the first */function l(e){for(var t=1;t<arguments.length;t++){var n=null!=arguments[t]?arguments[t]:{};t%2?i(Object(n),!0).forEach((function(t){r(e,t,n[t])})):Object.getOwnPropertyDescriptors?Object.defineProperties(e,Object.getOwnPropertyDescriptors(n)):i(Object(n)).forEach((function(t){Object.defineProperty(e,t,Object.getOwnPropertyDescriptor(n,t))}))}return e}/* o: copy of e without the keys in t */function o(e,t){if(null==e)return{};var n,a,r=function(e,t){if(null==e)return{};var n,a,r={},i=Object.keys(e);for(a=0;a<i.length;a++)n=i[a],t.indexOf(n)>=0||(r[n]=e[n]);return r}(e,t);if(Object.getOwnPropertySymbols){var i=Object.getOwnPropertySymbols(e);for(a=0;a<i.length;a++)n=i[a],t.indexOf(n)>=0||Object.prototype.propertyIsEnumerable.call(e,n)&&(r[n]=e[n])}return r}var /* s: components context, default {} */s=a.createContext({}),/* d: resolve components against the context */d=function(e){var t=a.useContext(s),n=t;return e&&(n="function"==typeof e?e(t):l(l({},t),e)),n},/* p: provider, exported as xA */p=function(e){var t=d(e.components);return a.createElement(s.Provider,{value:t},e.children)},g="mdxType",/* u: fallback components */u={inlineCode:"code",wrapper:function(e){var t=e.children;return a.createElement(a.Fragment,{},t)}},/* m: picks the component to render for an MDX element */m=a.forwardRef((function(e,t){var n=e.components,r=e.mdxType,i=e.originalType,s=e.parentName,p=o(e,["components","mdxType","originalType","parentName"]),g=d(n),m=r,y=g["".concat(s,".").concat(m)]||g[m]||u[m]||i;return n?a.createElement(y,l(l({ref:t},p),{},{components:n})):a.createElement(y,l({ref:t},p))}));/* y: createElement wrapper, exported as yg */function y(e,t){var n=arguments,r=t&&t.mdxType;if("string"==typeof e||r){var i=n.length,l=new Array(i);l[0]=m;var o={};for(var s in 
t)hasOwnProperty.call(t,s)&&(o[s]=t[s]);o.originalType=e,o[g]="string"==typeof e?e:r,l[1]=o;for(var d=2;d<i;d++)l[d]=n[d];return a.createElement.apply(null,l)}return a.createElement.apply(null,n)}m.displayName="MDXCreateElement"},/* module 564394 starts here and continues beyond this span; left unchanged */564394:(e,t,n)=>{n.r(t),n.d(t,{assets:()=>s,contentTitle:()=>l,default:()=>u,frontMatter:()=>i,metadata:()=>o,toc:()=>d});var a=n(58168),r=(n(296540),n(15680));const i={title:"Flink Doris Connector",language:"en"},l=void 0,o={unversionedId:"ecosystem/flink-doris-connector",id:"version-1.2/ecosystem/flink-doris-connector",title:"Flink Doris Connector",description:"\x3c!--",source:"@site/versioned_docs/version-1.2/ecosystem/flink-doris-connector.md",sourceDirName:"ecosystem",slug:"/ecosystem/flink-doris-connector",permalink:"/docs/1.2/ecosystem/flink-doris-connector",draft:!1,tags:[],version:"1.2",frontMatter:{title:"Flink Doris Connector",language:"en"},sidebar:"docs",previous:{title:"Spark Doris Connector",permalink:"/docs/1.2/ecosystem/spark-doris-connector"},next:{title:"DataX doriswriter",permalink:"/docs/1.2/ecosystem/datax"}},s={},d=[{value:"Version Compatibility",id:"version-compatibility",level:2},{value:"Build and Install",id:"build-and-install",level:2},{value:"Using Maven",id:"using-maven",level:2},{value:"How to use",id:"how-to-use",level:2},{value:"Parameters Configuration",id:"parameters-configuration",level:3},{value:"SQL",id:"sql",level:3},{value:"DataStream",id:"datastream",level:3},{value:"General",id:"general",level:3},{value:"Doris &amp; Flink Column Type Mapping",id:"doris--flink-column-type-mapping",level:2},{value:"An example of using Flink CDC to access Doris (supports insert/update/delete events)",id:"an-example-of-using-flink-cdc-to-access-doris-supports-insertupdatedelete-events",level:2},{value:"Use FlinkCDC to update Key column",id:"use-flinkcdc-to-update-key-column",level:2},{value:"Principle",id:"principle",level:3},{value:"Example",id:"example",level:3},{value:"Java 
example",id:"java-example",level:2},{value:"Best Practices",id:"best-practices",level:2},{value:"Application scenarios",id:"application-scenarios",level:3},{value:"Other",id:"other",level:3},{value:"FAQ",id:"faq",level:2}],p={toc:d},g="wrapper";function u(e){let{components:t,...n}=e;return(0,r.yg)(g,(0,a.A)({},p,n,{components:t,mdxType:"MDXLayout"}),(0,r.yg)("h1",{id:"flink-doris-connector"},"Flink Doris Connector"),(0,r.yg)("blockquote",null,(0,r.yg)("p",{parentName:"blockquote"},"This document applies to flink-doris-connector versions after 1.1.0, for versions before 1.1.0 refer to ",(0,r.yg)("a",{parentName:"p",href:"https://doris.apache.org/docs/0.15/extending-doris/flink-doris-connector"},"here"))),(0,r.yg)("p",null,"The Flink Doris Connector can support operations (read, insert, modify, delete) data stored in Doris through Flink."),(0,r.yg)("p",null,"Github: ",(0,r.yg)("a",{parentName:"p",href:"https://github.com/apache/doris-flink-connector"},"https://github.com/apache/doris-flink-connector")),(0,r.yg)("ul",null,(0,r.yg)("li",{parentName:"ul"},(0,r.yg)("inlineCode",{parentName:"li"},"Doris")," table can be mapped to ",(0,r.yg)("inlineCode",{parentName:"li"},"DataStream")," or ",(0,r.yg)("inlineCode",{parentName:"li"},"Table"),".")),(0,r.yg)("blockquote",null,(0,r.yg)("p",{parentName:"blockquote"},(0,r.yg)("strong",{parentName:"p"},"Note:")),(0,r.yg)("ol",{parentName:"blockquote"},(0,r.yg)("li",{parentName:"ol"},"Modification and deletion are only supported on the Unique Key model"),(0,r.yg)("li",{parentName:"ol"},"The current deletion is to support Flink CDC to access data to achieve automatic deletion. If it is to delete other data access methods, you need to implement it yourself. 
For the data deletion usage of Flink CDC, please refer to the last section of this document"))),(0,r.yg)("h2",{id:"version-compatibility"},"Version Compatibility"),(0,r.yg)("table",null,(0,r.yg)("thead",{parentName:"table"},(0,r.yg)("tr",{parentName:"thead"},(0,r.yg)("th",{parentName:"tr",align:null},"Connector Version"),(0,r.yg)("th",{parentName:"tr",align:null},"Flink Version"),(0,r.yg)("th",{parentName:"tr",align:null},"Doris Version"),(0,r.yg)("th",{parentName:"tr",align:null},"Java Version"),(0,r.yg)("th",{parentName:"tr",align:null},"Scala Version"))),(0,r.yg)("tbody",{parentName:"table"},(0,r.yg)("tr",{parentName:"tbody"},(0,r.yg)("td",{parentName:"tr",align:null},"1.0.3"),(0,r.yg)("td",{parentName:"tr",align:null},"1.11+"),(0,r.yg)("td",{parentName:"tr",align:null},"0.15+"),(0,r.yg)("td",{parentName:"tr",align:null},"8"),(0,r.yg)("td",{parentName:"tr",align:null},"2.11,2.12")),(0,r.yg)("tr",{parentName:"tbody"},(0,r.yg)("td",{parentName:"tr",align:null},"1.1.0"),(0,r.yg)("td",{parentName:"tr",align:null},"1.14"),(0,r.yg)("td",{parentName:"tr",align:null},"1.0+"),(0,r.yg)("td",{parentName:"tr",align:null},"8"),(0,r.yg)("td",{parentName:"tr",align:null},"2.11,2.12")),(0,r.yg)("tr",{parentName:"tbody"},(0,r.yg)("td",{parentName:"tr",align:null},"1.2.0"),(0,r.yg)("td",{parentName:"tr",align:null},"1.15"),(0,r.yg)("td",{parentName:"tr",align:null},"1.0+"),(0,r.yg)("td",{parentName:"tr",align:null},"8"),(0,r.yg)("td",{parentName:"tr",align:null},"-")),(0,r.yg)("tr",{parentName:"tbody"},(0,r.yg)("td",{parentName:"tr",align:null},"1.3.0"),(0,r.yg)("td",{parentName:"tr",align:null},"1.16"),(0,r.yg)("td",{parentName:"tr",align:null},"1.0+"),(0,r.yg)("td",{parentName:"tr",align:null},"8"),(0,r.yg)("td",{parentName:"tr",align:null},"-")))),(0,r.yg)("h2",{id:"build-and-install"},"Build and Install"),(0,r.yg)("p",null,"Ready to work"),(0,r.yg)("p",null,"1.Modify the ",(0,r.yg)("inlineCode",{parentName:"p"},"custom_env.sh.tpl")," file and rename it to 
",(0,r.yg)("inlineCode",{parentName:"p"},"custom_env.sh")),(0,r.yg)("p",null,"2.Specify the thrift installation directory"),(0,r.yg)("pre",null,(0,r.yg)("code",{parentName:"pre",className:"language-bash"},"##source file content\n#export THRIFT_BIN=\n#export MVN_BIN=\n#export JAVA_HOME=\n\n##amend as below,MacOS as an example\nexport THRIFT_BIN=/opt/homebrew/Cellar/thrift@0.13.0/0.13.0/bin/thrift\n#export MVN_BIN=\n#export JAVA_HOME=\n")),(0,r.yg)("p",null,"Install ",(0,r.yg)("inlineCode",{parentName:"p"},"thrift")," 0.13.0 (Note: ",(0,r.yg)("inlineCode",{parentName:"p"},"Doris")," 0.15 and the latest builds are based on ",(0,r.yg)("inlineCode",{parentName:"p"},"thrift")," 0.13.0, previous versions are still built with ",(0,r.yg)("inlineCode",{parentName:"p"},"thrift")," 0.9.3)\nWindows:"),(0,r.yg)("ol",null,(0,r.yg)("li",{parentName:"ol"},"Download: ",(0,r.yg)("inlineCode",{parentName:"li"},"http://archive.apache.org/dist/thrift/0.13.0/thrift-0.13.0.exe")),(0,r.yg)("li",{parentName:"ol"},"Modify thrift-0.13.0.exe to thrift ")),(0,r.yg)("p",null,"MacOS:"),(0,r.yg)("ol",null,(0,r.yg)("li",{parentName:"ol"},"Download: ",(0,r.yg)("inlineCode",{parentName:"li"},"brew install thrift@0.13.0")),(0,r.yg)("li",{parentName:"ol"},"default address: /opt/homebrew/Cellar/",(0,r.yg)("a",{parentName:"li",href:"mailto:thrift@0.13.0"},"thrift@0.13.0"),"/0.13.0/bin/thrift")),(0,r.yg)("p",null,"Note: Executing ",(0,r.yg)("inlineCode",{parentName:"p"},"brew install thrift@0.13.0")," on MacOS may report an error that the version cannot be found. The solution is as follows, execute it in the terminal:"),(0,r.yg)("pre",null,(0,r.yg)("code",{parentName:"pre"},"1. `brew tap-new $USER/local-tap`\n2. `brew extract --version='0.13.0' thrift $USER/local-tap`\n3. 
`brew install thrift@0.13.0`\n")),(0,r.yg)("p",null," Reference link: ",(0,r.yg)("inlineCode",{parentName:"p"},"https://gist.github.com/tonydeng/02e571f273d6cce4230dc8d5f394493c")),(0,r.yg)("p",null,"Linux:"),(0,r.yg)("pre",null,(0,r.yg)("code",{parentName:"pre",className:"language-bash"}," 1. wget https://archive.apache.org/dist/thrift/0.13.0/thrift-0.13.0.tar.gz # Download source package\n 2. yum install -y autoconf automake libtool cmake ncurses-devel openssl-devel lzo-devel zlib-devel gcc gcc-c++ # Install dependencies\n 3. tar zxvf thrift-0.13.0.tar.gz\n 4. cd thrift-0.13.0\n 5. ./configure --without-tests\n 6. make\n 7. make install\n 8. thrift --version # Check the version after installation is complete\n")),(0,r.yg)("p",null," Note: If you have compiled Doris, you do not need to install thrift, you can directly use ",(0,r.yg)("inlineCode",{parentName:"p"},"$DORIS_HOME/thirdparty/installed/bin/thrift")),(0,r.yg)("p",null,"Execute following command in source dir:"),(0,r.yg)("pre",null,(0,r.yg)("code",{parentName:"pre",className:"language-bash"},"sh build.sh\n\n Usage:\n build.sh --flink version # specify flink version (after flink-doris-connector v1.2 and flink-1.15, there is no need to provide scala version)\n build.sh --tag # this is a build from tag\n e.g.:\n build.sh --flink 1.16.0\n build.sh --tag\n")),(0,r.yg)("p",null,"Then, for example, execute the command to compile according to the version you need:\n",(0,r.yg)("inlineCode",{parentName:"p"},"sh build.sh --flink 1.16.0")),(0,r.yg)("p",null,"After successful compilation, the file ",(0,r.yg)("inlineCode",{parentName:"p"},"flink-doris-connector-1.16-1.3.0-SNAPSHOT.jar")," will be generated in the ",(0,r.yg)("inlineCode",{parentName:"p"},"target/")," directory. Copy this file to ",(0,r.yg)("inlineCode",{parentName:"p"},"classpath")," in ",(0,r.yg)("inlineCode",{parentName:"p"},"Flink")," to use ",(0,r.yg)("inlineCode",{parentName:"p"},"Flink-Doris-Connector"),". 
For example, ",(0,r.yg)("inlineCode",{parentName:"p"},"Flink")," running in ",(0,r.yg)("inlineCode",{parentName:"p"},"Local")," mode, put this file in the ",(0,r.yg)("inlineCode",{parentName:"p"},"lib/")," folder. ",(0,r.yg)("inlineCode",{parentName:"p"},"Flink")," running in ",(0,r.yg)("inlineCode",{parentName:"p"},"Yarn")," cluster mode, put this file in the pre-deployment package."),(0,r.yg)("p",null,(0,r.yg)("strong",{parentName:"p"},"Remarks:")," "),(0,r.yg)("ol",null,(0,r.yg)("li",{parentName:"ol"},(0,r.yg)("p",{parentName:"li"},"Doris FE should be configured to enable http v2 in the configuration"),(0,r.yg)("p",{parentName:"li"},"conf/fe.conf"))),(0,r.yg)("pre",null,(0,r.yg)("code",{parentName:"pre"},"enable_http_server_v2 = true\n")),(0,r.yg)("h2",{id:"using-maven"},"Using Maven"),(0,r.yg)("p",null,"Add flink-doris-connector Maven dependencies"),(0,r.yg)("pre",null,(0,r.yg)("code",{parentName:"pre"},"\x3c!-- flink-doris-connector --\x3e\n<dependency>\n <groupId>org.apache.doris</groupId>\n <artifactId>flink-doris-connector-1.16</artifactId>\n <version>1.3.0</version>\n</dependency> \n")),(0,r.yg)("p",null,(0,r.yg)("strong",{parentName:"p"},"Notes")),(0,r.yg)("ol",null,(0,r.yg)("li",{parentName:"ol"},(0,r.yg)("p",{parentName:"li"},"Please replace the corresponding Connector and Flink dependency versions according to different Flink versions. Version 1.3.0 only supports Flink1.16")),(0,r.yg)("li",{parentName:"ol"},(0,r.yg)("p",{parentName:"li"},"You can also download the relevant version jar package from ",(0,r.yg)("a",{parentName:"p",href:"https://repo.maven.apache.org/maven2/org/apache/doris/"},"here"),"."))),(0,r.yg)("h2",{id:"how-to-use"},"How to use"),(0,r.yg)("p",null,"There are three ways to use Flink Doris Connector. 
"),(0,r.yg)("ul",null,(0,r.yg)("li",{parentName:"ul"},"SQL"),(0,r.yg)("li",{parentName:"ul"},"DataStream")),(0,r.yg)("h3",{id:"parameters-configuration"},"Parameters Configuration"),(0,r.yg)("p",null,"Flink Doris Connector Sink writes data to Doris by the ",(0,r.yg)("inlineCode",{parentName:"p"},"Stream load"),", and also supports the configurations of ",(0,r.yg)("inlineCode",{parentName:"p"},"Stream load"),", For specific parameters, please refer to ",(0,r.yg)("a",{parentName:"p",href:"/docs/1.2/data-operate/import/import-way/stream-load-manual"},"here"),"."),(0,r.yg)("ul",null,(0,r.yg)("li",{parentName:"ul"},"SQL configured by ",(0,r.yg)("inlineCode",{parentName:"li"},"sink.properties.")," in the ",(0,r.yg)("inlineCode",{parentName:"li"},"WITH")),(0,r.yg)("li",{parentName:"ul"},"DataStream configured by ",(0,r.yg)("inlineCode",{parentName:"li"},"DorisExecutionOptions.builder().setStreamLoadProp(Properties)"))),(0,r.yg)("h3",{id:"sql"},"SQL"),(0,r.yg)("ul",null,(0,r.yg)("li",{parentName:"ul"},"Source")),(0,r.yg)("pre",null,(0,r.yg)("code",{parentName:"pre",className:"language-sql"},"CREATE TABLE flink_doris_source (\n name STRING,\n age INT,\n price DECIMAL(5,2),\n sale DOUBLE\n ) \n WITH (\n 'connector' = 'doris',\n 'fenodes' = 'FE_IP:8030',\n 'table.identifier' = 'database.table',\n 'username' = 'root',\n 'password' = 'password'\n);\n")),(0,r.yg)("ul",null,(0,r.yg)("li",{parentName:"ul"},"Sink")),(0,r.yg)("pre",null,(0,r.yg)("code",{parentName:"pre",className:"language-sql"},"-- enable checkpoint\nSET 'execution.checkpointing.interval' = '10s';\nCREATE TABLE flink_doris_sink (\n name STRING,\n age INT,\n price DECIMAL(5,2),\n sale DOUBLE\n ) \n WITH (\n 'connector' = 'doris',\n 'fenodes' = 'FE_IP:8030',\n 'table.identifier' = 'db.table',\n 'username' = 'root',\n 'password' = 'password',\n 'sink.label-prefix' = 
'doris_label'\n);\n")),(0,r.yg)("ul",null,(0,r.yg)("li",{parentName:"ul"},"Insert")),(0,r.yg)("pre",null,(0,r.yg)("code",{parentName:"pre",className:"language-sql"},"INSERT INTO flink_doris_sink select name,age,price,sale from flink_doris_source\n")),(0,r.yg)("h3",{id:"datastream"},"DataStream"),(0,r.yg)("ul",null,(0,r.yg)("li",{parentName:"ul"},"Source")),(0,r.yg)("pre",null,(0,r.yg)("code",{parentName:"pre",className:"language-java"},'DorisOptions.Builder builder = DorisOptions.builder()\n .setFenodes("FE_IP:8030")\n .setTableIdentifier("db.table")\n .setUsername("root")\n .setPassword("password");\n\nDorisSource<List<?>> dorisSource = DorisSourceBuilder.<List<?>>builder()\n .setDorisOptions(builder.build())\n .setDorisReadOptions(DorisReadOptions.builder().build())\n .setDeserializer(new SimpleListDeserializationSchema())\n .build();\n\nenv.fromSource(dorisSource, WatermarkStrategy.noWatermarks(), "doris source").print();\n')),(0,r.yg)("ul",null,(0,r.yg)("li",{parentName:"ul"},"Sink")),(0,r.yg)("p",null,(0,r.yg)("strong",{parentName:"p"},"String Stream")),(0,r.yg)("pre",null,(0,r.yg)("code",{parentName:"pre",className:"language-java"},'// enable checkpoint\nenv.enableCheckpointing(10000);\n// using batch mode for bounded data\nenv.setRuntimeMode(RuntimeExecutionMode.BATCH);\n\nDorisSink.Builder<String> builder = DorisSink.builder();\nDorisOptions.Builder dorisBuilder = DorisOptions.builder();\ndorisBuilder.setFenodes("FE_IP:8030")\n .setTableIdentifier("db.table")\n .setUsername("root")\n .setPassword("password");\n\nProperties properties = new Properties();\n/**\njson format to streamload\nproperties.setProperty("format", "json");\nproperties.setProperty("read_json_by_line", "true");\n**/\n\nDorisExecutionOptions.Builder executionBuilder = DorisExecutionOptions.builder();\nexecutionBuilder.setLabelPrefix("label-doris") //streamload label prefix\n .setStreamLoadProp(properties); \n\nbuilder.setDorisReadOptions(DorisReadOptions.builder().build())\n 
.setDorisExecutionOptions(executionBuilder.build())\n .setSerializer(new SimpleStringSerializer()) //serialize according to string \n .setDorisOptions(dorisBuilder.build());\n\n\n//mock string source\nList<Tuple2<String, Integer>> data = new ArrayList<>();\ndata.add(new Tuple2<>("doris",1));\nDataStreamSource<Tuple2<String, Integer>> source = env.fromCollection(data);\n\nsource.map((MapFunction<Tuple2<String, Integer>, String>) t -> t.f0 + "\\t" + t.f1)\n .sinkTo(builder.build());\n')),(0,r.yg)("p",null,(0,r.yg)("strong",{parentName:"p"},"RowData Stream")),(0,r.yg)("pre",null,(0,r.yg)("code",{parentName:"pre",className:"language-java"},'// enable checkpoint\nenv.enableCheckpointing(10000);\n// using batch mode for bounded data\nenv.setRuntimeMode(RuntimeExecutionMode.BATCH);\n\n//doris sink option\nDorisSink.Builder<RowData> builder = DorisSink.builder();\nDorisOptions.Builder dorisBuilder = DorisOptions.builder();\ndorisBuilder.setFenodes("FE_IP:8030")\n .setTableIdentifier("db.table")\n .setUsername("root")\n .setPassword("password");\n\n// json format to streamload\nProperties properties = new Properties();\nproperties.setProperty("format", "json");\nproperties.setProperty("read_json_by_line", "true");\nDorisExecutionOptions.Builder executionBuilder = DorisExecutionOptions.builder();\nexecutionBuilder.setLabelPrefix("label-doris") //streamload label prefix\n .setStreamLoadProp(properties); //streamload params\n\n//flink rowdata\u2018s schema\nString[] fields = {"city", "longitude", "latitude", "destroy_date"};\nDataType[] types = {DataTypes.VARCHAR(256), DataTypes.DOUBLE(), DataTypes.DOUBLE(), DataTypes.DATE()};\n\nbuilder.setDorisReadOptions(DorisReadOptions.builder().build())\n .setDorisExecutionOptions(executionBuilder.build())\n .setSerializer(RowDataSerializer.builder() //serialize according to rowdata \n .setFieldNames(fields)\n .setType("json") //json format\n .setFieldType(types).build())\n .setDorisOptions(dorisBuilder.build());\n\n//mock rowdata 
source\nDataStream<RowData> source = env.fromElements("")\n .map(new MapFunction<String, RowData>() {\n @Override\n public RowData map(String value) throws Exception {\n GenericRowData genericRowData = new GenericRowData(4);\n genericRowData.setField(0, StringData.fromString("beijing"));\n genericRowData.setField(1, 116.405419);\n genericRowData.setField(2, 39.916927);\n genericRowData.setField(3, LocalDate.now().toEpochDay());\n return genericRowData;\n }\n });\n\nsource.sinkTo(builder.build());\n')),(0,r.yg)("p",null,(0,r.yg)("strong",{parentName:"p"},"SchemaChange Stream")),(0,r.yg)("pre",null,(0,r.yg)("code",{parentName:"pre",className:"language-java"},'// enable checkpoint\nenv.enableCheckpointing(10000);\n\nProperties props = new Properties();\nprops.setProperty("format", "json");\nprops.setProperty("read_json_by_line", "true");\nDorisOptions dorisOptions = DorisOptions.builder()\n .setFenodes("127.0.0.1:8030")\n .setTableIdentifier("test.t1")\n .setUsername("root")\n .setPassword("").build();\n\nDorisExecutionOptions.Builder executionBuilder = DorisExecutionOptions.builder();\nexecutionBuilder.setLabelPrefix("label-doris" + UUID.randomUUID())\n .setStreamLoadProp(props).setDeletable(true);\n\nDorisSink.Builder<String> builder = DorisSink.builder();\nbuilder.setDorisReadOptions(DorisReadOptions.builder().build())\n .setDorisExecutionOptions(executionBuilder.build())\n .setDorisOptions(dorisOptions)\n .setSerializer(JsonDebeziumSchemaSerializer.builder().setDorisOptions(dorisOptions).build());\n\nenv.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source")//.print();\n .sinkTo(builder.build());\n')),(0,r.yg)("p",null,"refer: 
",(0,r.yg)("a",{parentName:"p",href:"https://github.com/apache/doris-flink-connector/blob/master/flink-doris-connector/src/test/java/org/apache/doris/flink/CDCSchemaChangeExample.java"},"CDCSchemaChangeExample")),(0,r.yg)("h3",{id:"general"},"General"),(0,r.yg)("table",null,(0,r.yg)("thead",{parentName:"table"},(0,r.yg)("tr",{parentName:"thead"},(0,r.yg)("th",{parentName:"tr",align:null},"Key"),(0,r.yg)("th",{parentName:"tr",align:null},"Default Value"),(0,r.yg)("th",{parentName:"tr",align:null},"Required"),(0,r.yg)("th",{parentName:"tr",align:null},"Comment"))),(0,r.yg)("tbody",{parentName:"table"},(0,r.yg)("tr",{parentName:"tbody"},(0,r.yg)("td",{parentName:"tr",align:null},"fenodes"),(0,r.yg)("td",{parentName:"tr",align:null},"--"),(0,r.yg)("td",{parentName:"tr",align:null},"Y"),(0,r.yg)("td",{parentName:"tr",align:null},"Doris FE http address, support multiple addresses, separated by commas")),(0,r.yg)("tr",{parentName:"tbody"},(0,r.yg)("td",{parentName:"tr",align:null},"table.identifier"),(0,r.yg)("td",{parentName:"tr",align:null},"--"),(0,r.yg)("td",{parentName:"tr",align:null},"Y"),(0,r.yg)("td",{parentName:"tr",align:null},"Doris table identifier, eg, db1.tbl1")),(0,r.yg)("tr",{parentName:"tbody"},(0,r.yg)("td",{parentName:"tr",align:null},"username"),(0,r.yg)("td",{parentName:"tr",align:null},"--"),(0,r.yg)("td",{parentName:"tr",align:null},"Y"),(0,r.yg)("td",{parentName:"tr",align:null},"Doris username")),(0,r.yg)("tr",{parentName:"tbody"},(0,r.yg)("td",{parentName:"tr",align:null},"password"),(0,r.yg)("td",{parentName:"tr",align:null},"--"),(0,r.yg)("td",{parentName:"tr",align:null},"Y"),(0,r.yg)("td",{parentName:"tr",align:null},"Doris password")),(0,r.yg)("tr",{parentName:"tbody"},(0,r.yg)("td",{parentName:"tr",align:null},"doris.request.retries"),(0,r.yg)("td",{parentName:"tr",align:null},"3"),(0,r.yg)("td",{parentName:"tr",align:null},"N"),(0,r.yg)("td",{parentName:"tr",align:null},"Number of retries to send requests to 
Doris")),(0,r.yg)("tr",{parentName:"tbody"},(0,r.yg)("td",{parentName:"tr",align:null},"doris.request.connect.timeout.ms"),(0,r.yg)("td",{parentName:"tr",align:null},"30000"),(0,r.yg)("td",{parentName:"tr",align:null},"N"),(0,r.yg)("td",{parentName:"tr",align:null},"Connection timeout for sending requests to Doris")),(0,r.yg)("tr",{parentName:"tbody"},(0,r.yg)("td",{parentName:"tr",align:null},"doris.request.read.timeout.ms"),(0,r.yg)("td",{parentName:"tr",align:null},"30000"),(0,r.yg)("td",{parentName:"tr",align:null},"N"),(0,r.yg)("td",{parentName:"tr",align:null},"Read timeout for sending request to Doris")),(0,r.yg)("tr",{parentName:"tbody"},(0,r.yg)("td",{parentName:"tr",align:null},"doris.request.query.timeout.s"),(0,r.yg)("td",{parentName:"tr",align:null},"3600"),(0,r.yg)("td",{parentName:"tr",align:null},"N"),(0,r.yg)("td",{parentName:"tr",align:null},"Query the timeout time of doris, the default is 1 hour, -1 means no timeout limit")),(0,r.yg)("tr",{parentName:"tbody"},(0,r.yg)("td",{parentName:"tr",align:null},"doris.request.tablet.size"),(0,r.yg)("td",{parentName:"tr",align:null},"Integer.MAX_VALUE"),(0,r.yg)("td",{parentName:"tr",align:null},"N"),(0,r.yg)("td",{parentName:"tr",align:null},"The number of Doris Tablets corresponding to an Partition. The smaller this value is set, the more partitions will be generated. This will increase the parallelism on the flink side, but at the same time will cause greater pressure on Doris.")),(0,r.yg)("tr",{parentName:"tbody"},(0,r.yg)("td",{parentName:"tr",align:null},"doris.batch.size"),(0,r.yg)("td",{parentName:"tr",align:null},"1024"),(0,r.yg)("td",{parentName:"tr",align:null},"N"),(0,r.yg)("td",{parentName:"tr",align:null},"The maximum number of rows to read data from BE at one time. Increasing this value can reduce the number of connections between Flink and Doris. 
Thereby reducing the extra time overhead caused by network delay.")),(0,r.yg)("tr",{parentName:"tbody"},(0,r.yg)("td",{parentName:"tr",align:null},"doris.exec.mem.limit"),(0,r.yg)("td",{parentName:"tr",align:null},"2147483648"),(0,r.yg)("td",{parentName:"tr",align:null},"N"),(0,r.yg)("td",{parentName:"tr",align:null},"Memory limit for a single query. The default is 2GB, in bytes.")),(0,r.yg)("tr",{parentName:"tbody"},(0,r.yg)("td",{parentName:"tr",align:null},"doris.deserialize.arrow.async"),(0,r.yg)("td",{parentName:"tr",align:null},"false"),(0,r.yg)("td",{parentName:"tr",align:null},"N"),(0,r.yg)("td",{parentName:"tr",align:null},"Whether to support asynchronous conversion of Arrow format to RowBatch required for flink-doris-connector iteration")),(0,r.yg)("tr",{parentName:"tbody"},(0,r.yg)("td",{parentName:"tr",align:null},"doris.deserialize.queue.size"),(0,r.yg)("td",{parentName:"tr",align:null},"64"),(0,r.yg)("td",{parentName:"tr",align:null},"N"),(0,r.yg)("td",{parentName:"tr",align:null},"Asynchronous conversion of the internal processing queue in Arrow format takes effect when doris.deserialize.arrow.async is true")),(0,r.yg)("tr",{parentName:"tbody"},(0,r.yg)("td",{parentName:"tr",align:null},"doris.read.field"),(0,r.yg)("td",{parentName:"tr",align:null},"--"),(0,r.yg)("td",{parentName:"tr",align:null},"N"),(0,r.yg)("td",{parentName:"tr",align:null},"List of column names in the Doris table, separated by commas")),(0,r.yg)("tr",{parentName:"tbody"},(0,r.yg)("td",{parentName:"tr",align:null},"doris.filter.query"),(0,r.yg)("td",{parentName:"tr",align:null},"--"),(0,r.yg)("td",{parentName:"tr",align:null},"N"),(0,r.yg)("td",{parentName:"tr",align:null},"Filter expression of the query, which is transparently transmitted to Doris. 
Doris uses this expression to complete source-side data filtering.")),(0,r.yg)("tr",{parentName:"tbody"},(0,r.yg)("td",{parentName:"tr",align:null},"sink.label-prefix"),(0,r.yg)("td",{parentName:"tr",align:null},"--"),(0,r.yg)("td",{parentName:"tr",align:null},"Y"),(0,r.yg)("td",{parentName:"tr",align:null},"The label prefix used by stream load imports. In the 2pc scenario, global uniqueness is required to ensure the EOS semantics of Flink.")),(0,r.yg)("tr",{parentName:"tbody"},(0,r.yg)("td",{parentName:"tr",align:null},"sink.properties.*"),(0,r.yg)("td",{parentName:"tr",align:null},"--"),(0,r.yg)("td",{parentName:"tr",align:null},"N"),(0,r.yg)("td",{parentName:"tr",align:null},"The stream load parameters.",(0,r.yg)("br",null)," ",(0,r.yg)("br",null)," eg:",(0,r.yg)("br",null)," sink.properties.column_separator' = ','",(0,r.yg)("br",null)," ",(0,r.yg)("br",null)," Setting 'sink.properties.escape_delimiters' = 'true' if you want to use a control char as a separator, so that such as '","\\","x01' will translate to binary 0x01",(0,r.yg)("br",null),(0,r.yg)("br",null),"Support JSON format import, you need to enable both 'sink.properties.format' ='json' and 'sink.properties.strip_outer_array' ='true'")),(0,r.yg)("tr",{parentName:"tbody"},(0,r.yg)("td",{parentName:"tr",align:null},"sink.enable-delete"),(0,r.yg)("td",{parentName:"tr",align:null},"true"),(0,r.yg)("td",{parentName:"tr",align:null},"N"),(0,r.yg)("td",{parentName:"tr",align:null},"Whether to enable deletion. This option requires Doris table to enable batch delete function (0.15+ version is enabled by default), and only supports Uniq model.")),(0,r.yg)("tr",{parentName:"tbody"},(0,r.yg)("td",{parentName:"tr",align:null},"sink.enable-2pc"),(0,r.yg)("td",{parentName:"tr",align:null},"true"),(0,r.yg)("td",{parentName:"tr",align:null},"N"),(0,r.yg)("td",{parentName:"tr",align:null},"Whether to enable two-phase commit (2pc), the default is true, to ensure Exactly-Once semantics. 
For two-phase commit, please refer to ",(0,r.yg)("a",{parentName:"td",href:"/docs/1.2/data-operate/import/import-way/stream-load-manual"},"here"),".")),(0,r.yg)("tr",{parentName:"tbody"},(0,r.yg)("td",{parentName:"tr",align:null},"sink.max-retries"),(0,r.yg)("td",{parentName:"tr",align:null},"1"),(0,r.yg)("td",{parentName:"tr",align:null},"N"),(0,r.yg)("td",{parentName:"tr",align:null},"In the 2pc scenario, the number of retries after the commit phase fails.")),(0,r.yg)("tr",{parentName:"tbody"},(0,r.yg)("td",{parentName:"tr",align:null},"sink.buffer-size"),(0,r.yg)("td",{parentName:"tr",align:null},"1048576(1MB)"),(0,r.yg)("td",{parentName:"tr",align:null},"N"),(0,r.yg)("td",{parentName:"tr",align:null},"Write data cache buffer size, in bytes. It is not recommended to modify, the default configuration is sufficient.")),(0,r.yg)("tr",{parentName:"tbody"},(0,r.yg)("td",{parentName:"tr",align:null},"sink.buffer-count"),(0,r.yg)("td",{parentName:"tr",align:null},"3"),(0,r.yg)("td",{parentName:"tr",align:null},"N"),(0,r.yg)("td",{parentName:"tr",align:null},"The number of write data cache buffers, it is not recommended to modify, the default configuration is sufficient.")))),(0,r.yg)("h2",{id:"doris--flink-column-type-mapping"},"Doris & Flink Column Type Mapping"),(0,r.yg)("table",null,(0,r.yg)("thead",{parentName:"table"},(0,r.yg)("tr",{parentName:"thead"},(0,r.yg)("th",{parentName:"tr",align:null},"Doris Type"),(0,r.yg)("th",{parentName:"tr",align:null},"Flink 
Type"))),(0,r.yg)("tbody",{parentName:"table"},(0,r.yg)("tr",{parentName:"tbody"},(0,r.yg)("td",{parentName:"tr",align:null},"NULL_TYPE"),(0,r.yg)("td",{parentName:"tr",align:null},"NULL")),(0,r.yg)("tr",{parentName:"tbody"},(0,r.yg)("td",{parentName:"tr",align:null},"BOOLEAN"),(0,r.yg)("td",{parentName:"tr",align:null},"BOOLEAN")),(0,r.yg)("tr",{parentName:"tbody"},(0,r.yg)("td",{parentName:"tr",align:null},"TINYINT"),(0,r.yg)("td",{parentName:"tr",align:null},"TINYINT")),(0,r.yg)("tr",{parentName:"tbody"},(0,r.yg)("td",{parentName:"tr",align:null},"SMALLINT"),(0,r.yg)("td",{parentName:"tr",align:null},"SMALLINT")),(0,r.yg)("tr",{parentName:"tbody"},(0,r.yg)("td",{parentName:"tr",align:null},"INT"),(0,r.yg)("td",{parentName:"tr",align:null},"INT")),(0,r.yg)("tr",{parentName:"tbody"},(0,r.yg)("td",{parentName:"tr",align:null},"BIGINT"),(0,r.yg)("td",{parentName:"tr",align:null},"BIGINT")),(0,r.yg)("tr",{parentName:"tbody"},(0,r.yg)("td",{parentName:"tr",align:null},"FLOAT"),(0,r.yg)("td",{parentName:"tr",align:null},"FLOAT")),(0,r.yg)("tr",{parentName:"tbody"},(0,r.yg)("td",{parentName:"tr",align:null},"DOUBLE"),(0,r.yg)("td",{parentName:"tr",align:null},"DOUBLE")),(0,r.yg)("tr",{parentName:"tbody"},(0,r.yg)("td",{parentName:"tr",align:null},"DATE"),(0,r.yg)("td",{parentName:"tr",align:null},"DATE")),(0,r.yg)("tr",{parentName:"tbody"},(0,r.yg)("td",{parentName:"tr",align:null},"DATETIME"),(0,r.yg)("td",{parentName:"tr",align:null},"TIMESTAMP")),(0,r.yg)("tr",{parentName:"tbody"},(0,r.yg)("td",{parentName:"tr",align:null},"DECIMAL"),(0,r.yg)("td",{parentName:"tr",align:null},"DECIMAL")),(0,r.yg)("tr",{parentName:"tbody"},(0,r.yg)("td",{parentName:"tr",align:null},"CHAR"),(0,r.yg)("td",{parentName:"tr",align:null},"STRING")),(0,r.yg)("tr",{parentName:"tbody"},(0,r.yg)("td",{parentName:"tr",align:null},"LARGEINT"),(0,r.yg)("td",{parentName:"tr",align:null},"STRING")),(0,r.yg)("tr",{parentName:"tbody"},(0,r.yg)("td",{parentName:"tr",align:null},"VARCHAR"),(0,r.yg)("td",
{parentName:"tr",align:null},"STRING")),(0,r.yg)("tr",{parentName:"tbody"},(0,r.yg)("td",{parentName:"tr",align:null},"DECIMALV2"),(0,r.yg)("td",{parentName:"tr",align:null},"DECIMAL")),(0,r.yg)("tr",{parentName:"tbody"},(0,r.yg)("td",{parentName:"tr",align:null},"TIME"),(0,r.yg)("td",{parentName:"tr",align:null},"DOUBLE")),(0,r.yg)("tr",{parentName:"tbody"},(0,r.yg)("td",{parentName:"tr",align:null},"HLL"),(0,r.yg)("td",{parentName:"tr",align:null},"Unsupported datatype")))),(0,r.yg)("h2",{id:"an-example-of-using-flink-cdc-to-access-doris-supports-insertupdatedelete-events"},"An example of using Flink CDC to access Doris (supports insert/update/delete events)"),(0,r.yg)("pre",null,(0,r.yg)("code",{parentName:"pre",className:"language-sql"},"SET 'execution.checkpointing.interval' = '10s';\nCREATE TABLE cdc_mysql_source (\n id int\n ,name VARCHAR\n ,PRIMARY KEY (id) NOT ENFORCED\n) WITH (\n 'connector' = 'mysql-cdc',\n 'hostname' = '127.0.0.1',\n 'port' = '3306',\n 'username' = 'root',\n 'password' = 'password',\n 'database-name' = 'database',\n 'table-name' = 'table'\n);\n\n-- Support delete event synchronization (sink.enable-delete='true'), requires Doris table to enable batch delete function\nCREATE TABLE doris_sink (\nid INT,\nname STRING\n) \nWITH (\n 'connector' = 'doris',\n 'fenodes' = '127.0.0.1:8030',\n 'table.identifier' = 'database.table',\n 'username' = 'root',\n 'password' = '',\n 'sink.properties.format' = 'json',\n 'sink.properties.read_json_by_line' = 'true',\n 'sink.enable-delete' = 'true',\n 'sink.label-prefix' = 'doris_label'\n);\n\ninsert into doris_sink select id,name from cdc_mysql_source;\n")),(0,r.yg)("h2",{id:"use-flinkcdc-to-update-key-column"},"Use FlinkCDC to update Key column"),(0,r.yg)("p",null,"Generally, in a business database, the number is used as the primary key of the table, such as the Student table, the number (id) is used as the primary key, but with the development of the business, the number corresponding to the data may 
change.\nIn this scenario, using FlinkCDC + Doris Connector to synchronize data can automatically update the data in the Doris primary key column."),(0,r.yg)("h3",{id:"principle"},"Principle"),(0,r.yg)("p",null,"The underlying collection tool of Flink CDC is Debezium. Debezium internally uses the op field to identify the corresponding operation: the values of the op field are c, u, d, and r, corresponding to create, update, delete, and read.\nFor the update of the primary key column, FlinkCDC will send DELETE and INSERT events downstream, and after the data is synchronized to Doris, it will automatically update the data of the primary key column."),(0,r.yg)("h3",{id:"example"},"Example"),(0,r.yg)("p",null,"The Flink program can refer to the CDC synchronization example above. After the task is successfully submitted, execute the Update primary key column statement (",(0,r.yg)("inlineCode",{parentName:"p"},"update student set id = '1002' where id = '1001'"),") on the MySQL side to modify the data in Doris ."),(0,r.yg)("h2",{id:"java-example"},"Java example"),(0,r.yg)("p",null,(0,r.yg)("inlineCode",{parentName:"p"},"samples/doris-demo/")," An example of the Java version is provided below for reference, see ",(0,r.yg)("a",{parentName:"p",href:"https://github.com/apache/doris/tree/master/samples/doris-demo/"},"here")),(0,r.yg)("h2",{id:"best-practices"},"Best Practices"),(0,r.yg)("h3",{id:"application-scenarios"},"Application scenarios"),(0,r.yg)("p",null,"The most suitable scenario for using Flink Doris Connector is to synchronize source data to Doris (Mysql, Oracle, PostgreSQL) in real time/batch, etc., and use Flink to perform joint analysis on data in Doris and other data sources. 
You can also use Flink Doris Connector"),(0,r.yg)("h3",{id:"other"},"Other"),(0,r.yg)("ol",null,(0,r.yg)("li",{parentName:"ol"},"The Flink Doris Connector mainly relies on Checkpoint for streaming writing, so the interval between Checkpoints is the visible delay time of the data."),(0,r.yg)("li",{parentName:"ol"},"To ensure the Exactly Once semantics of Flink, the Flink Doris Connector enables two-phase commit by default, and Doris enables two-phase commit by default after version 1.1. 1.0 can be enabled by modifying the BE parameters, please refer to ",(0,r.yg)("a",{parentName:"li",href:"/docs/1.2/data-operate/import/import-way/stream-load-manual"},"two_phase_commit"),".")),(0,r.yg)("h2",{id:"faq"},"FAQ"),(0,r.yg)("ol",null,(0,r.yg)("li",{parentName:"ol"},(0,r.yg)("strong",{parentName:"li"},"After Doris Source finishes reading data, why does the stream end?"))),(0,r.yg)("p",null,"Currently Doris Source is a bounded stream and does not support CDC reading."),(0,r.yg)("ol",{start:2},(0,r.yg)("li",{parentName:"ol"},(0,r.yg)("strong",{parentName:"li"},"Can Flink read Doris and perform conditional pushdown?"))),(0,r.yg)("p",null,"By configuring the doris.filter.query parameter, refer to the configuration section for details."),(0,r.yg)("ol",{start:3},(0,r.yg)("li",{parentName:"ol"},(0,r.yg)("strong",{parentName:"li"},"How to write Bitmap type?"))),(0,r.yg)("pre",null,(0,r.yg)("code",{parentName:"pre",className:"language-sql"},"CREATE TABLE bitmap_sink (\ndt int,\npage string,\nuser_id int\n)\nWITH (\n 'connector' = 'doris',\n 'fenodes' = '127.0.0.1:8030',\n 'table.identifier' = 'test.bitmap_test',\n 'username' = 'root',\n 'password' = '',\n 'sink.label-prefix' = 'doris_label',\n 'sink.properties.columns' = 'dt,page,user_id,user_id=to_bitmap(user_id)'\n)\n")),(0,r.yg)("ol",{start:4},(0,r.yg)("li",{parentName:"ol"},(0,r.yg)("strong",{parentName:"li"},"errCode = 2, detailMessage = Label ","[label_0_1]"," has already been used, relate to txn 
","[19650]"))),(0,r.yg)("p",null,"In the Exactly-Once scenario, the Flink Job must be restarted from the latest Checkpoint/Savepoint, otherwise the above error will be reported.\nWhen Exactly-Once is not required, it can also be solved by turning off 2PC commits (sink.enable-2pc=false) or changing to a different sink.label-prefix."),(0,r.yg)("ol",{start:5},(0,r.yg)("li",{parentName:"ol"},(0,r.yg)("strong",{parentName:"li"},"errCode = 2, detailMessage = transaction ","[19650]"," not found"))),(0,r.yg)("p",null,"Occurred in the Commit phase, the transaction ID recorded in the checkpoint has expired on the FE side, and the above error will occur when committing again at this time.\nAt this time, it cannot be started from the checkpoint, and the expiration time can be extended by modifying the streaming_label_keep_max_second configuration in fe.conf, which defaults to 12 hours."),(0,r.yg)("ol",{start:6},(0,r.yg)("li",{parentName:"ol"},(0,r.yg)("strong",{parentName:"li"},"errCode = 2, detailMessage = current running txns on db 10006 is 100, larger than limit 100"))),(0,r.yg)("p",null,"This is because the concurrent import of the same library exceeds 100, which can be solved by adjusting the parameter ",(0,r.yg)("inlineCode",{parentName:"p"},"max_running_txn_num_per_db")," of fe.conf. 
For details, please refer to ",(0,r.yg)("a",{parentName:"p",href:"https://doris.apache.org/zh-CN/docs/dev/admin-manual/config/fe-config/#max_running_txn_num_per_db"},"max_running_txn_num_per_db")),(0,r.yg)("ol",{start:7},(0,r.yg)("li",{parentName:"ol"},(0,r.yg)("strong",{parentName:"li"},"How to ensure the order of a batch of data when Flink writes to the Uniq model?"))),(0,r.yg)("p",null,"You can add sequence column configuration to ensure that, for details, please refer to ",(0,r.yg)("a",{parentName:"p",href:"https://doris.apache.org/zh-CN/docs/dev/data-operate/update-delete/sequence-column-manual"},"sequence")),(0,r.yg)("ol",{start:8},(0,r.yg)("li",{parentName:"ol"},(0,r.yg)("strong",{parentName:"li"},"The Flink task does not report an error, but the data cannot be synchronized? "))),(0,r.yg)("p",null,"Before Connector1.1.0, it was written in batches, and the writing was driven by data. It was necessary to determine whether there was data written upstream. After 1.1.0, it depends on Checkpoint, and Checkpoint must be enabled to write."),(0,r.yg)("ol",{start:9},(0,r.yg)("li",{parentName:"ol"},(0,r.yg)("strong",{parentName:"li"},"tablet writer write failed, tablet_id=190958, txn_id=3505530, err=-235"))),(0,r.yg)("p",null,"It usually occurs before Connector1.1.0, because the writing frequency is too fast, resulting in too many versions. The frequency of Streamload can be reduced by setting the sink.batch.size and sink.batch.interval parameters."),(0,r.yg)("ol",{start:10},(0,r.yg)("li",{parentName:"ol"},(0,r.yg)("strong",{parentName:"li"},"Flink imports dirty data, how to skip it? "))),(0,r.yg)("p",null,"When Flink imports data, if there is dirty data, such as field format, length, etc., it will cause StreamLoad to report an error, and Flink will continue to retry at this time. 
If you need to skip, you can disable the strict mode of StreamLoad (strict_mode=false, max_filter_ratio=1) or filter the data before the Sink operator."),(0,r.yg)("ol",{start:11},(0,r.yg)("li",{parentName:"ol"},(0,r.yg)("strong",{parentName:"li"},"How should the source table and Doris table correspond?"),'\nWhen using Flink Connector to import data, pay attention to two aspects. The first is that the columns and types of the source table correspond to the columns and types in Flink SQL; the second is that the columns and types in Flink SQL must match those of the Doris table. For the correspondence between columns and types, please refer to the above "Doris & Flink Column Type Mapping" for details.')))}u.isMDXComponent=!0}}]);