tree: b854a14f69012d025da6ffb4e39c87f5ef40a5e5 [path history] [tgz]
  1. src/
  2. pom.xml
  3. README.md
connectors/rocketmq-connect-jdbc/README.md

rocketmq-connect-jdbc

为方便扩展,rocketmq-connect-jdbc目前采用Spi插件的形式进行扩展,核心扩展api主要有:

org.apache.rocketmq.connect.jdbc.dialect.DatabaseDialectFactory
org.apache.rocketmq.connect.jdbc.dialect.DatabaseDialect

目前支持Mysql、OpenMLDB数据库, pg、oracle、sqlserver、db2 等关系型数据库还在持续扩展中

rocketmq-connect-jdbc使用方法

  1. 进入想要使用的connectors目录下(以rocketmq-connect-jdbc目录为例),使用以下指令将插件进行打包

    mvn clean package -Dmaven.test.skip=true
    
  2. 打包好的插件以tar.gz的模式出现在rocketmq-connect-jdbc/target/目录下

  3. distribution/conf目录下找的对应的配置文件进行更新,对于standalone的启动方式,更新connect-standalone.conf文件中的pluginPaths变量

    pluginPaths=(you plugin path)
    

    相应的,使用distributed启动方式,则更新connect-distributed.conf中的变量

  4. 在源数据库和目的数据库准备好需要使用的表,可参考RocketMQ Connect实战1

  5. 创建并启动对应的SourceConnector以及SinkConnector

示例:jdbc -> Rocketmq -> doris

注: 目前支持将根据mysql ,openMLDB对应的source connect以流式的方式导入到rocketmq中的数据导入到doris

打包connectors并配置conf文件

mvn clean package -Dmaven.test.skip=true

启动connector

  • jdbc-source-connector 启动
POST  http://${runtime-ip}:${runtime-port}/connectors/${rocketmq-jdbc-source-connector-name}
{
    "connector.class":"org.apache.rocketmq.connect.jdbc.mysql.source.JdbcSourceConnector",
    "max.tasks":"2",
    "connection.url":"jdbc:mysql://XXXXXXXXX:3306",
    "connection.user":"*****",
    "connection.password":"*****",
    "table.whitelist":"db.table",
    "mode": "incrementing",
    "incrementing.column.name":"id",
    "timestamp.initial": -1,
    "key.converter":"org.apache.rocketmq.connect.runtime.converter.record.json.JsonConverter",
    "value.converter":"org.apache.rocketmq.connect.runtime.converter.record.json.JsonConverter"
}
  • doris-sink-connector 启动
POST  http://${runtime-ip}:${runtime-port}/connectors/${rocketmq-jdbc-sink-connector-name}
{
    "connector.class":"org.apache.rocketmq.connect.doris.connector.DorisSinkConnector",
    "max-task":"1",
    "table.whitelist":"sink_test.doris_test_sink",
    "connect.topicname":"doris_test_sink",
    "connect.topicnames":"doris_test_sink",
    "host":"xx.xx.xx.xx",
    "port":"xxxx",
    "user":"xx",
    "passwd":"****",
    "database":"database",
    "insert.mode":"INSERT",
    "db.timezone":"UTC",
    "table.types":"TABLE",
    "auto.create":"true",
    "source-record-converter":"org.apache.rocketmq.connect.runtime.converter.JsonConverter",
    "key.converter":"org.apache.rocketmq.connect.runtime.converter.record.json.JsonConverter",
    "value.converter":"org.apache.rocketmq.connect.runtime.converter.record.json.JsonConverter"}'
}

注: rocketmq-jdbc-connect 的启动依赖于rocketmq-connect-runtime项目的启动,需将打好的所有jar包放置到runtime项目中pluginPaths配置的路径后再执行上面的启动请求,该值配置在runtime项目下的connect.conf文件中

rocketmq-connect-jdbc 停止

http://${runtime-ip}:${runtime-port}/connectors/${rocketmq-jdbc-connector-name}/stop

rocketmq-connect-jdbc 参数说明

  • jdbc-source-connector 参数说明
KEYTYPEMust be filledDescriptionExample
connection.urlStringYESsource端 jdbc连接jdbc:mysql://XXXXXXXXX:3306
connection.userStringYESsource端 DB 用户名root
connection.passwordStringYESsource端 DB 密码root
connection.attemptsStringYESsource端 DB连接重试次数3
connection.backoff.msLongYES
poll.interval.msLongYES拉取间隔时间3000ms
batch.max.rowsIntegerNO每次拉取数量300
modeIntegerNO拉取模式bulk、timestamp、incrementing、timestamp+incrementing
incrementing.column.nameIntegerNO增量字段,常用IDid
timestamp.column.nameStringYES时间增量字段modified_time
table.whitelistStringYES需要扫描的表db.table,db.table01
max.tasksIntegerYES任务数量,最大不能大于表的数量2
key.converterIntegerYESkey转换器org.apache.rocketmq.connect.doris.converter.JsonConverter
value.converterIntegerYESdata转换器org.apache.rocketmq.connect.doris.converter.JsonConverter
注:1.source拉取的数据写入到以表名自动创建的topic中,如果需要写入特定的topic中则需要指定"connect-topicname" 参数
   2.topic.prefix参数可以为自动创建的topic增加前缀,用来进行逻辑的隔离
  • jdbc-sink-connector 参数说明
KEYTYPEMust be filledDescriptionExample
connection.urlStringYESsink端 jdbc连接jdbc:mysql://XXXXXXXXX:3306
connection.userStringYESsink端 DB 用户名root
connection.passwordStringYESsink端 DB 密码root
hostStringYESdoris host192.168.0.1
portStringYESdoris http port8030
userStringYES监听的topicroot
passwdStringYES监听的topicpasswd
max.tasksIntegerNO任务数量2
key.converterIntegerYESkey转换器org.apache.rocketmq.connect.doris.converter.JsonConverter
value.converterIntegerYESdata转换器org.apache.rocketmq.connect.doris.converter.JsonConverter