tree: 8bbc29ea777feec97b680151c0916af8c3e564a1 [path history] [tgz]
  1. scripts/
  2. src/
  3. pom.xml
  4. README.md
rocketmq-connect-cassandra/README.md

rocketmq-connect-cassandra

rocketmq-connect-cassandra 打包

mvn clean install -DskipTest -U 

目前安装会遇到的问题

目前的rocketmq-connect-cassandra 使用的是datastax-java-driver:4.5.0版本的cassandra-driver,由于在打包过程中还有没有解决的问题,该cassandra driver无法读取 位于driver包中的默认配置文件,因此我们需要手动下载cassandra driver的配置文件reference.conf 并将其放置于classpath中。

该问题还仍然在解决的过程中。

rocketmq-connect-cassandra 启动

  • cassandra-source-connector 启动
http://${runtime-ip}:${runtime-port}/connectors/${rocketmq-cassandra-source-connector-name}
?config={"connector-class":"org.apache.rocketmq.connect.cassandra.connector.JdbcSourceConnector",“dbUrl”:"${source-db-ip}",dbPort”:"${source-db-port}",dbUsername”:"${source-db-username}",dbPassword”:"${source-db-password}","rocketmqTopic":"cassandraTopic","mode":"bulk","whiteDataBase":{"${source-db-name}":{"${source-table-name}":{"${source-column-name}":"${source-column-value}"}}},"source-record-converter":"org.apache.rocketmq.connect.runtime.converter.JsonConverter"}
  • cassandra-sink-connector 启动
http://${runtime-ip}:${runtime-port}/connectors/${rocketmq-cassandra-sink-connector-name}
?config={"connector-class":"org.apache.rocketmq.connect.cassandra.connector.JdbcSinkConnector",“dbUrl”:"${sink-db-ip}",dbPort”:"${sink-db-port}",dbUsername”:"${sink-db-username}",dbPassword”:"${sink-db-password}","rocketmqTopic":"cassandraTopic","mode":"bulk","topicNames":"${sink-topic-name}","source-record-converter":"org.apache.rocketmq.connect.runtime.converter.JsonConverter"}

注: rocketmq-cassandra-connect 的启动依赖于rocketmq-connect-runtime项目的启动,需将打好的jar包放置到runtime项目中pluginPaths配置的路径后再执行上面的启动请求,该值配置在runtime项目下的connect.conf文件中

rocketmq-connect-cassandra 停止

http://${runtime-ip}:${runtime-port}/connectors/${rocketmq-cassandra-connector-name}/stop

rocketmq-connect-cassandra 参数说明

  • cassandra-source-connector 参数说明
参数类型是否必须描述样例
dbUrlStringsource端 DB ip192.168.1.2
dbPortStringsource端 DB port3306
dbUsernameStringsource端 DB 用户名root
dbPasswordStringsource端 DB 密码123456
rocketmqTopicString待废弃的参数,需和topicNames相同jdbc_cassandra
topicNamesStringrocketmq默认每一个数据源中的表对应一个名字,该名称需和数据库表名称相同jdbc_cassandra
whiteDataBaseStringsource端同步数据白名单,嵌套配置,为{DB名:{表名:{字段名:字段值}}},若无指定字段数据同步,字段名可设为NO-FILTER,值为任意{“DATABASE_TEST”:{“TEST_DATA”:{“name”:“test”}}}
modeStringsource-connector 模式,目前仅支持bulkbulk
localDataCenterString待废弃cassandra 集群的datacenter名称,为必填项
task-divide-strategyIntegertask 分配策略, 默认值为 0,表示按照topic分配任务,每一个table便是一个topic0
task-parallelismIntegertask parallelism,默认值为 1,表示将topic拆分为多少个任务进行执行2
source-clusterStringsink 端获取路由信息连接到的RocketMQ nameserver 地址172.17.0.1:10911
source-rocketmqStringsink 端获取路由信息连接到的RocketMQ broker cluster 地址127.0.0.1:9876
source-record-converterStringsource data 解析org.apache.rocketmq.connect.runtime.converter.JsonConverter

示例配置如下

{
    "connector-class":"org.apache.rocketmq.connect.cassandra.connector.CassandraSourceConnector",
    "rocketmqTopic":"jdbc_cassandra",
    "topicNames": "jdbc_cassandra",
    "dbUrl":"127.0.0.1",
    "dbPort":"9042",
    "dbUsername":"cassandra",
    "dbPassword":"cassandra",
    "localDataCenter":"datacenter1",
    "whiteDataBase": {
        "jdbc":{
           "jdbc_cassandra": {"NO-FILTER": "10"}
         }
      },
    "mode": "bulk",
    "task-parallelism": 1,
    "source-cluster": "172.17.0.1:10911",
    "source-rocketmq": "127.0.0.1:9876",
    "source-record-converter":"org.apache.rocketmq.connect.runtime.converter.JsonConverter"
 }
  • cassandra-sink-connector 参数说明
参数类型是否必须描述样例
dbUrlStringsink端 DB ip192.168.1.2
dbPortStringsink端 DB port3306
dbUsernameStringsink端 DB 用户名root
dbPasswordStringsink端 DB 密码123456
topicNamesStringsink端同步数据的topic名字topic-1,topic-2
modeStringsource-connector 模式,目前仅支持bulkbulk
rocketmqTopicString待废弃cassandraTopic
task-divide-strategyIntegertask 分配策略, 默认值为 0,表示按照topic分配任务,每一个table便是一个topic0
task-parallelismIntegertask parallelism,默认值为 1,表示将topic拆分为多少个任务进行执行2
source-rocketmqStringsink 端获取路由信息连接到的RocketMQ nameserver 地址172.17.0.1:10911
source-clusterStringsink 端获取路由信息连接到的RocketMQ broker cluster 地址127.0.0.1:9876
source-record-converterStringsource data 解析org.apache.rocketmq.connect.runtime.converter.JsonConverter