The DataX DorisWriter plugin supports syncing data from MySQL, Oracle, SQL Server, and other data sources into Doris via Stream Load.
:::info Note
The official DataX release package already includes this plugin, so DataX can be downloaded and used directly; see here.
:::
To build the plugin from source:

1. Download the DorisWriter plugin source code.
2. Run `init-env.sh`.
3. Compile doriswriter. To compile the doriswriter plugin alone:

   `mvn clean install -pl plugin-rdbms-util,doriswriter -DskipTests`

   To compile the entire DataX project, see here.
Compilation errors

If you encounter the following compilation error:

```
Could not find artifact com.alibaba.datax:datax-all:pom:0.0.1-SNAPSHOT ...
```

try the following workaround:

1. Download [alibaba-datax-maven-m2-20210928.tar.gz](https://doris-thirdparty-repo.bj.bcebos.com/thirdparty/alibaba-datax-maven-m2-20210928.tar.gz)
2. After extracting it, copy the resulting `alibaba/datax/` directory into the `.m2/repository/com/alibaba/` directory of the Maven repository you are using, then try compiling again.
The doriswriter plugin supports the following parameters:

- **jdbcUrl**: JDBC connection info of the Doris database, used to execute `preSql` and `postSql`.
- **loadUrl**: Stream Load address(es) of the Doris FE, in `fe_host:fe_http_port` form. Multiple addresses can be given, separated by commas; doriswriter accesses them in a round-robin manner.
- **username**: username for accessing the Doris database.
- **password**: password for the Doris database.
- **connection.selectedDatabase**: name of the Doris database to write into.
- **connection.table**: name of the Doris table to write into.
- **flushInterval**: interval, in milliseconds, at which buffered data is flushed to Doris. If `maxBatchRows` and `batchSize` are set very large, the buffered data may never reach those thresholds; an import is still executed at this interval.
- **column**: the destination-table columns into which data is written.
- **preSql**: SQL statement(s) executed before writing.
- **postSql**: SQL statement(s) executed after writing.
- **maxBatchRows**: maximum number of rows per imported batch.
- **batchSize**: maximum amount of data per imported batch.
- **maxRetries**: number of retries after a failed batch import.
- **labelPrefix**: prefix of the import label. `labelPrefix` plus a UUID forms a globally unique label, which ensures data is not imported twice. Default: `datax_doris_writer_`.
- **loadProps**
  - Description: request parameters of Stream Load; for details see the Stream Load documentation (Stream load - Apache Doris). This includes the import data format (`format`, etc.); the default format is CSV, and JSON is also supported. See the type conversion section below, or the official Stream Load documentation above.
  - Required: no
  - Default: none
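To make the parameter-to-protocol mapping concrete, here is a minimal sketch (not the plugin's actual Java code) of how these settings map onto a Stream Load HTTP request: one `loadUrl` entry becomes the request host, `username`/`password` become HTTP basic auth, `labelPrefix` plus a UUID becomes the `label` header, and `loadProps` entries become additional headers. The endpoint values below are the hypothetical ones from this page's example.

```python
import base64
import uuid

def build_stream_load_request(load_url, database, table, username, password,
                              label_prefix="datax_doris_writer_",
                              load_props=None):
    """Build the URL and headers for a Doris Stream Load request:
    PUT http://<fe_host:http_port>/api/<db>/<table>/_stream_load
    with HTTP basic auth, a unique label, and loadProps as headers."""
    url = f"http://{load_url}/api/{database}/{table}/_stream_load"
    token = base64.b64encode(f"{username}:{password}".encode()).decode()
    headers = {
        "Authorization": f"Basic {token}",
        "Expect": "100-continue",
        # labelPrefix + UUID gives a globally unique label, so a retried
        # batch with the same label is not imported twice.
        "label": label_prefix + str(uuid.uuid4()),
    }
    headers.update(load_props or {})  # e.g. {"format": "json"}
    return url, headers

# Values taken from the example job below (hypothetical endpoint).
url, headers = build_stream_load_request(
    "127.0.0.1:8030", "demo", "all_employees_info", "root", "xxxxxx",
    load_props={"format": "json", "strip_outer_array": "true"})
print(url)  # → http://127.0.0.1:8030/api/demo/all_employees_info/_stream_load
```

Sending the request (with redirect and 100-continue handling) is left out; the point is only how doriswriter's configuration surfaces in the Stream Load API.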
For usage instructions of this example, see here.
1. MySQL table schema
```sql
CREATE TABLE `t_test` (
  `id` bigint(30) NOT NULL,
  `order_code` varchar(30) DEFAULT NULL COMMENT '',
  `line_code` varchar(30) DEFAULT NULL COMMENT '',
  `remark` varchar(30) DEFAULT NULL COMMENT '',
  `unit_no` varchar(30) DEFAULT NULL COMMENT '',
  `unit_name` varchar(30) DEFAULT NULL COMMENT '',
  `price` decimal(12,2) DEFAULT NULL COMMENT '',
  PRIMARY KEY (`id`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 ROW_FORMAT=DYNAMIC COMMENT='';
```
2. Doris table schema
```sql
CREATE TABLE `ods_t_test` (
  `id` bigint(30) NOT NULL,
  `order_code` varchar(30) DEFAULT NULL COMMENT '',
  `line_code` varchar(30) DEFAULT NULL COMMENT '',
  `remark` varchar(30) DEFAULT NULL COMMENT '',
  `unit_no` varchar(30) DEFAULT NULL COMMENT '',
  `unit_name` varchar(30) DEFAULT NULL COMMENT '',
  `price` decimal(12,2) DEFAULT NULL COMMENT ''
) ENGINE=OLAP
UNIQUE KEY(`id`, `order_code`)
DISTRIBUTED BY HASH(`order_code`) BUCKETS 1
PROPERTIES (
  "replication_allocation" = "tag.location.default: 3",
  "in_memory" = "false",
  "storage_format" = "V2"
);
```
3. Create the DataX job file
my_import.json
```json
{
  "job": {
    "content": [
      {
        "reader": {
          "name": "mysqlreader",
          "parameter": {
            "column": ["id", "order_code", "line_code", "remark", "unit_no", "unit_name", "price"],
            "connection": [
              {
                "jdbcUrl": ["jdbc:mysql://localhost:3306/demo"],
                "table": ["employees_1"]
              }
            ],
            "username": "root",
            "password": "xxxxx",
            "where": ""
          }
        },
        "writer": {
          "name": "doriswriter",
          "parameter": {
            "loadUrl": ["127.0.0.1:8030"],
            "column": ["id", "order_code", "line_code", "remark", "unit_no", "unit_name", "price"],
            "username": "root",
            "password": "xxxxxx",
            "postSql": ["select count(1) from all_employees_info"],
            "preSql": [],
            "flushInterval": 30000,
            "connection": [
              {
                "jdbcUrl": "jdbc:mysql://127.0.0.1:9030/demo",
                "selectedDatabase": "demo",
                "table": ["all_employees_info"]
              }
            ],
            "loadProps": {
              "format": "json",
              "strip_outer_array": "true",
              "line_delimiter": "\\x02"
            }
          }
        }
      }
    ],
    "setting": {
      "speed": {
        "channel": "1"
      }
    }
  }
}
```
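A malformed job file only fails once DataX is already running, so it can help to sanity-check it first. The sketch below is not part of DataX; it is a small hypothetical helper that verifies the writer plugin name and that the reader and writer `column` lists have the same length, since DataX maps columns to each other by position.

```python
def check_datax_job(job: dict) -> int:
    """Minimal sanity check for a parsed DataX job config: verifies the
    writer is doriswriter and that reader/writer column counts match.
    Returns the number of columns on success."""
    content = job["job"]["content"][0]
    reader = content["reader"]["parameter"]
    writer = content["writer"]["parameter"]
    if content["writer"]["name"] != "doriswriter":
        raise ValueError("writer is not doriswriter")
    if len(reader["column"]) != len(writer["column"]):
        raise ValueError("reader/writer column counts differ")
    return len(writer["column"])

# Minimal inline example; for a real run, parse my_import.json with json.load().
job = {
    "job": {"content": [{
        "reader": {"name": "mysqlreader",
                   "parameter": {"column": ["id", "price"]}},
        "writer": {"name": "doriswriter",
                   "parameter": {"column": ["id", "price"]}},
    }]}
}
assert check_datax_job(job) == 2
```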
Notes:
```json
"loadProps": {
    "format": "json",
    "strip_outer_array": "true",
    "line_delimiter": "\\x02"
}
```
- Here we import the data in JSON format.
- `line_delimiter` defaults to the newline character, which may conflict with values in the data; using a special or invisible character avoids import errors.
- `strip_outer_array`: indicates that one batch of imported data contains multiple rows; when parsing, Doris expands the array and treats each Object in it as one row of data.
- For more Stream Load parameters, see the Stream Load documentation (Stream load - Apache Doris).
- If using the CSV format, configure it like this:

```json
"loadProps": {
    "format": "csv",
    "column_separator": "\\x01",
    "line_delimiter": "\\x02"
}
```

With CSV, pay special attention to the row and column separators so they do not conflict with special characters in the data; invisible characters are recommended here. The default column separator is `\t` and the default row separator is `\n`.
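The effect of choosing invisible separators can be seen in a small sketch (an illustration, not doriswriter's actual code; note that in `loadProps` the separators are written in escaped form, e.g. `"\\x01"`, and the plugin converts them to the real control characters):

```python
def encode_rows_csv(rows, column_separator="\x01", line_delimiter="\x02"):
    """Lay rows out the way a Stream Load CSV body is structured:
    columns joined by the column separator, rows joined by the line
    delimiter. With the default '\t' and '\n' separators, a tab or
    newline inside a value would corrupt the row layout; the control
    characters \x01/\x02 almost never occur in real data."""
    return line_delimiter.join(
        column_separator.join(str(v) for v in row) for row in rows)

# Values containing a tab and a newline survive intact, because the
# separators are \x01/\x02 rather than characters present in the data.
body = encode_rows_csv([[1, "a\tb"], [2, "c\nd"]])
assert body == "1\x01a\tb\x022\x01c\nd"
```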
4. Execute the DataX task; for details, see the DataX website.
python bin/datax.py my_import.json
After execution, you should see output like the following:
```
2022-11-16 14:28:54.012 [job-0] INFO  JobContainer - jobContainer starts to do prepare ...
2022-11-16 14:28:54.012 [job-0] INFO  JobContainer - DataX Reader.Job [mysqlreader] do prepare work .
2022-11-16 14:28:54.013 [job-0] INFO  JobContainer - DataX Writer.Job [doriswriter] do prepare work .
2022-11-16 14:28:54.020 [job-0] INFO  JobContainer - jobContainer starts to do split ...
2022-11-16 14:28:54.020 [job-0] INFO  JobContainer - Job set Channel-Number to 1 channels.
2022-11-16 14:28:54.023 [job-0] INFO  JobContainer - DataX Reader.Job [mysqlreader] splits to [1] tasks.
2022-11-16 14:28:54.023 [job-0] INFO  JobContainer - DataX Writer.Job [doriswriter] splits to [1] tasks.
2022-11-16 14:28:54.033 [job-0] INFO  JobContainer - jobContainer starts to do schedule ...
2022-11-16 14:28:54.036 [job-0] INFO  JobContainer - Scheduler starts [1] taskGroups.
2022-11-16 14:28:54.037 [job-0] INFO  JobContainer - Running by standalone Mode.
2022-11-16 14:28:54.041 [taskGroup-0] INFO  TaskGroupContainer - taskGroupId=[0] start [1] channels for [1] tasks.
2022-11-16 14:28:54.043 [taskGroup-0] INFO  Channel - Channel set byte_speed_limit to -1, No bps activated.
2022-11-16 14:28:54.043 [taskGroup-0] INFO  Channel - Channel set record_speed_limit to -1, No tps activated.
2022-11-16 14:28:54.049 [taskGroup-0] INFO  TaskGroupContainer - taskGroup[0] taskId[0] attemptCount[1] is started
2022-11-16 14:28:54.052 [0-0-0-reader] INFO  CommonRdbmsReader$Task - Begin to read record by Sql: [select taskid,projectid,taskflowid,templateid,template_name,status_task from dwd_universal_tb_task ] jdbcUrl:[jdbc:mysql://localhost:3306/demo?yearIsDateType=false&zeroDateTimeBehavior=convertToNull&tinyInt1isBit=false&rewriteBatchedStatements=true].
Wed Nov 16 14:28:54 GMT+08:00 2022 WARN: Establishing SSL connection without server's identity verification is not recommended. According to MySQL 5.5.45+, 5.6.26+ and 5.7.6+ requirements SSL connection must be established by default if explicit option isn't set. For compliance with existing applications not using SSL the verifyServerCertificate property is set to 'false'. You need either to explicitly disable SSL by setting useSSL=false, or set useSSL=true and provide truststore for server certificate verification.
2022-11-16 14:28:54.071 [0-0-0-reader] INFO  CommonRdbmsReader$Task - Finished read record by Sql: [select taskid,projectid,taskflowid,templateid,template_name,status_task from dwd_universal_tb_task ] jdbcUrl:[jdbc:mysql://localhost:3306/demo?yearIsDateType=false&zeroDateTimeBehavior=convertToNull&tinyInt1isBit=false&rewriteBatchedStatements=true].
2022-11-16 14:28:54.104 [Thread-1] INFO  DorisStreamLoadObserver - Start to join batch data: rows[2] bytes[438] label[datax_doris_writer_c4e08cb9-c157-4689-932f-db34acc45b6f].
2022-11-16 14:28:54.104 [Thread-1] INFO  DorisStreamLoadObserver - Executing stream load to: 'http://127.0.0.1:8030/api/demo/dwd_universal_tb_task/_stream_load', size: '441'
2022-11-16 14:28:54.224 [Thread-1] INFO  DorisStreamLoadObserver - StreamLoad response :{"Status":"Success","BeginTxnTimeMs":0,"Message":"OK","NumberUnselectedRows":0,"CommitAndPublishTimeMs":17,"Label":"datax_doris_writer_c4e08cb9-c157-4689-932f-db34acc45b6f","LoadBytes":441,"StreamLoadPutTimeMs":1,"NumberTotalRows":2,"WriteDataTimeMs":11,"TxnId":217056,"LoadTimeMs":31,"TwoPhaseCommit":"false","ReadDataTimeMs":0,"NumberLoadedRows":2,"NumberFilteredRows":0}
2022-11-16 14:28:54.225 [Thread-1] INFO  DorisWriterManager - Async stream load finished: label[datax_doris_writer_c4e08cb9-c157-4689-932f-db34acc45b6f].
2022-11-16 14:28:54.249 [taskGroup-0] INFO  TaskGroupContainer - taskGroup[0] taskId[0] is successed, used[201]ms
2022-11-16 14:28:54.250 [taskGroup-0] INFO  TaskGroupContainer - taskGroup[0] completed it's tasks.
2022-11-16 14:29:04.048 [job-0] INFO  StandAloneJobContainerCommunicator - Total 2 records, 214 bytes | Speed 21B/s, 0 records/s | Error 0 records, 0 bytes | All Task WaitWriterTime 0.000s | All Task WaitReaderTime 0.000s | Percentage 100.00%
2022-11-16 14:29:04.049 [job-0] INFO  AbstractScheduler - Scheduler accomplished all tasks.
2022-11-16 14:29:04.049 [job-0] INFO  JobContainer - DataX Writer.Job [doriswriter] do post work.
Wed Nov 16 14:29:04 GMT+08:00 2022 WARN: Establishing SSL connection without server's identity verification is not recommended. According to MySQL 5.5.45+, 5.6.26+ and 5.7.6+ requirements SSL connection must be established by default if explicit option isn't set. For compliance with existing applications not using SSL the verifyServerCertificate property is set to 'false'. You need either to explicitly disable SSL by setting useSSL=false, or set useSSL=true and provide truststore for server certificate verification.
2022-11-16 14:29:04.187 [job-0] INFO  DorisWriter$Job - Start to execute preSqls:[select count(1) from dwd_universal_tb_task]. context info:jdbc:mysql://172.16.0.13:9030/demo.
2022-11-16 14:29:04.204 [job-0] INFO  JobContainer - DataX Reader.Job [mysqlreader] do post work.
2022-11-16 14:29:04.204 [job-0] INFO  JobContainer - DataX jobId [0] completed successfully.
2022-11-16 14:29:04.204 [job-0] INFO  HookInvoker - No hook invoked, because base dir not exists or is a file: /data/datax/hook
2022-11-16 14:29:04.205 [job-0] INFO  JobContainer -
 [total cpu info] =>
    averageCpu | maxDeltaCpu | minDeltaCpu
    -1.00%     | -1.00%      | -1.00%

 [total gc info] =>
    NAME         | totalGCCount | maxDeltaGCCount | minDeltaGCCount | totalGCTime | maxDeltaGCTime | minDeltaGCTime
    PS MarkSweep | 1            | 1               | 1               | 0.017s      | 0.017s         | 0.017s
    PS Scavenge  | 1            | 1               | 1               | 0.007s      | 0.007s         | 0.007s

2022-11-16 14:29:04.205 [job-0] INFO  JobContainer - PerfTrace not enable!
2022-11-16 14:29:04.206 [job-0] INFO  StandAloneJobContainerCommunicator - Total 2 records, 214 bytes | Speed 21B/s, 0 records/s | Error 0 records, 0 bytes | All Task WaitWriterTime 0.000s | All Task WaitReaderTime 0.000s | Percentage 100.00%
2022-11-16 14:29:04.206 [job-0] INFO  JobContainer -
Job start time            : 2022-11-16 14:28:53
Job end time              : 2022-11-16 14:29:04
Total elapsed time        : 10s
Average traffic           : 21B/s
Record write speed        : 0rec/s
Total records read        : 2
Total read/write failures : 0
```