tree: eb69d0b0fcaf12be69f95986e794e9161a94c9d2 [path history] [tgz]
  1. src/
  2. pom.xml
  3. README.md
connectors/rocketmq-connect-jdbc/README.md

rocketmq-connect-jdbc

注: 目前支持的数据库类型为 mysql ,openMLDB ,其它数据库的jdbc模式持续扩展中

rocketmq-connect-jdbc 打包

mvn clean package -Dmaven.test.skip=true

rocketmq-connect-jdbc 启动

  • jdbc-source-connector 启动
POST  http://${runtime-ip}:${runtime-port}/connectors/${rocketmq-jdbc-source-connector-name}
{
    "connector-class":"org.apache.rocketmq.connect.jdbc.connector.JdbcSourceConnector",
    "max-task":"2",
    "connection.url":"jdbc:mysql://XXXXXXXXX:3306",
    "connection.user":"*****",
    "connection.password":"*****",
    "table.whitelist":"db.table",
    "mode": "incrementing",
    "incrementing.column.name":"id",
    "timestamp.initial": -1,
    "source-record-converter":"org.apache.rocketmq.connect.runtime.converter.JsonConverter"
}
  • jdbc-sink-connector 启动
POST  http://${runtime-ip}:${runtime-port}/connectors/${rocketmq-jdbc-sink-connector-name}
{
    "connector-class":"org.apache.rocketmq.connect.jdbc.connector.JdbcSinkConnector",
    "max-task":"2",
    "connect-topicname":"connect-topicname-jdbc-02",
    "connection.url":"jdbc:mysql://*****:3306/{dbname}",
    "connection.user":"******",
    "connection.password":"******",
    "pk.fields":"id",
    "pk.mode":"record_value",
    "insert.mode":"UPSERT",
    "source-record-converter":"org.apache.rocketmq.connect.runtime.converter.JsonConverter"
}

注: rocketmq-jdbc-connect 的启动依赖于rocketmq-connect-runtime项目的启动,需将打好的所有jar包放置到runtime项目中pluginPaths配置的路径后再执行上面的启动请求,该值配置在runtime项目下的connect.conf文件中

rocketmq-connect-jdbc 停止

http://${runtime-ip}:${runtime-port}/connectors/${rocketmq-jdbc-connector-name}/stop

rocketmq-connect-jdbc 参数说明

  • jdbc-source-connector 参数说明
KEYTYPEMust be filledDescriptionExample
connection.urlStringYESsource端 jdbc连接jdbc:mysql://XXXXXXXXX:3306
connection.userStringYESsource端 DB 用户名root
connection.passwordStringYESsource端 DB 密码root
connection.attemptsStringYESsource端 DB连接重试次数3
connection.backoff.msLongYES
poll.interval.msLongYES拉取间隔时间3000ms
batch.max.rowsIntegerNO每次拉取数量300
modeIntegerNO拉取模式bulk、timestamp、incrementing、timestamp+incrementing
incrementing.column.nameIntegerNO增量字段,常用IDid
timestamp.column.nameStringYES时间增量字段modified_time
table.whitelistStringYES需要扫描的表db.table,db.table01
max-taskIntegerYES任务数量,最大不能大于表的数量2
source-record-converterIntegerYESdata转换器org.apache.rocketmq.connect.runtime.converter.JsonConverter
注:1.source拉取的数据写入到以表名自动创建的topic中,如果需要写入特定的topic中则需要指定"connect-topicname" 参数
   2.topic.prefix参数可以为自动创建的topic增加前缀,用来进行逻辑的隔离
  • jdbc-sink-connector 参数说明
KEYTYPEMust be filledDescriptionExample
connection.urlStringYESsink端 jdbc连接jdbc:mysql://XXXXXXXXX:3306
connection.userStringYESsink端 DB 用户名root
connection.passwordStringYESsink端 DB 密码root
connection.attemptsStringNOsink端 DB连接重试次数3
connection.backoff.msLongNO
connect-topicnameLongYES监听的topictopic-name
pk.fieldsStringNO写入侧主键配置,用于更新使用id
pk.modeStringNO获取主键的模式none、record_value
insert.modeIntegerYES写入模式UPDATE、UPSERT、INSERT
max-taskIntegerNO任务数量2
source-record-converterIntegerYESdata转换器org.apache.rocketmq.connect.runtime.converter.JsonConverter
注: openMLDB maven包的引入:
---------MacOS 系统下运行-------------
     <dependency>
            <groupId>com.4paradigm.openmldb</groupId>
            <artifactId>openmldb-native</artifactId>
            <version>0.5.0-macos</version>
        </dependency>
        <dependency>
            <groupId>com.4paradigm.openmldb</groupId>
            <artifactId>openmldb-jdbc</artifactId>
            <version>0.5.0</version>
            <exclusions>
                <exclusion>
                    <groupId>com.4paradigm.openmldb</groupId>
                    <artifactId>openmldb-native</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
---------Linux 系统下运行-------------
        <dependency>
            <groupId>com.4paradigm.openmldb</groupId>
            <artifactId>openmldb-jdbc</artifactId>
            <version>0.5.0</version>
        </dependency>