
rocketmq-connect-hudi

Building rocketmq-connect-hudi

mvn clean install -DskipTests -U

Copy rocketmq-connect-hudi-0.0.1-SNAPSHOT-jar-with-dependencies.jar, built under the target directory, to the connector-plugin directory configured in the connector-runtime's connect.conf.
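For example (the destination directory below is only an illustrative value for the connector-plugin path in connect.conf; adjust it to your environment):

# copy the fat jar into the runtime's plugin directory (example path)
cp target/rocketmq-connect-hudi-0.0.1-SNAPSHOT-jar-with-dependencies.jar /usr/local/connector-plugins/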

Known installation issues

The current rocketmq-connect-hudi uses Hudi version 0.8.0.

Starting rocketmq-connect-hudi

First, start connect-runtime; see the run_work.sh script in the rocketmq-connect-runtime project.
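A rough sketch, assuming a local checkout of rocketmq-connect-runtime (the script name is taken from this README; its exact location may differ in your checkout):

# start the connect runtime worker
cd rocketmq-connect-runtime
sh ./run_work.sh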

  • Starting the hudi-sink-connector: send a request to the URL below (see the curl sketch after it):
http://${runtime-ip}:${runtime-port}/connectors/${rocketmq-hudi-sink-connector-name}
?config='{"connector-class":"org.apache.rocketmq.connect.hudi.connector.HudiSinkConnector","topicNames":"topicc","tablePath":"file:///tmp/hudi_connector_test","tableName":"hudi_connector_test_table","insertShuffleParallelism":"2","upsertShuffleParallelism":"2","deleteParallelism":"2","source-record-converter":"org.apache.rocketmq.connect.runtime.converter.RocketMQConverter","source-rocketmq":"127.0.0.1:9876","src-cluster":"DefaultCluster","refresh-interval":"10000","schemaPath":"/Users/osgoo/Downloads/user.avsc"}'
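One hedged way to issue this request from the command line is shown below. The address 127.0.0.1:8081 and the connector name rocketmq-hudi-sink-connector are illustrative placeholders (use your runtime's host and the HTTP port configured for it); -g stops curl from globbing the braces in the config JSON, and depending on your environment the config value may additionally need URL-encoding.

# placeholders: replace 127.0.0.1:8081 and the connector name with your own values
curl -g 'http://127.0.0.1:8081/connectors/rocketmq-hudi-sink-connector?config={"connector-class":"org.apache.rocketmq.connect.hudi.connector.HudiSinkConnector","topicNames":"topicc","tablePath":"file:///tmp/hudi_connector_test","tableName":"hudi_connector_test_table","insertShuffleParallelism":"2","upsertShuffleParallelism":"2","deleteParallelism":"2","source-record-converter":"org.apache.rocketmq.connect.runtime.converter.RocketMQConverter","source-rocketmq":"127.0.0.1:9876","src-cluster":"DefaultCluster","refresh-interval":"10000","schemaPath":"/Users/osgoo/Downloads/user.avsc"}'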

On successful startup, a log line like the following is printed:

2021-09-06 16:23:14 INFO pool-2-thread-1 - Open HoodieJavaWriteClient successfully

Note: starting rocketmq-hudi-connect depends on the rocketmq-connect-runtime project being up. Place the built jar in the path configured by pluginPaths in the runtime project before issuing the start request above; this value is set in the runtime project's connect.conf file.
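For reference, a hypothetical fragment of the runtime's connect.conf (the directory is an example only and must be the one that holds the hudi connector jar):

# connect.conf (runtime project) – example value
pluginPaths=/usr/local/connector-plugins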

Stopping rocketmq-connect-hudi

http://${runtime-ip}:${runtime-port}/connectors/${rocketmq-hudi-connector-name}/stop
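For example, again with a placeholder address and connector name:

# the address and connector name below are illustrative placeholders
curl 'http://127.0.0.1:8081/connectors/rocketmq-hudi-sink-connector/stop'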

rocketmq-connect-hudi parameters

  • hudi-sink-connector parameters

| Parameter | Type | Description | Example |
| --- | --- | --- | --- |
| connector-class | String | Sink connector class | HudiSinkConnector |
| tablePath | String | Path of the Hudi table to sink into | file:///tmp/hudi_connector_test |
| tableName | String | Name of the Hudi table to sink into | hudi_connector_test_table |
| insertShuffleParallelism | int | Hudi insert parallelism | 2 |
| upsertShuffleParallelism | int | Hudi upsert parallelism | 2 |
| deleteParallelism | int | Hudi delete parallelism | 2 |
| topicNames | String | RocketMQ topic name(s); by default each table in a data source maps to one topic, and the name must match the database table name | jdbc_hudi |
| task-divide-strategy | Integer | Task division strategy; default 0, meaning tasks are divided by topic, with each table being one topic | 0 |
| task-parallelism | Integer | Task parallelism; default 1, meaning the number of tasks a topic is split into for execution | 2 |
| source-cluster | String | Name of the RocketMQ cluster on the sink side | DefaultCluster |
| source-rocketmq | String | RocketMQ nameserver address the sink side connects to for route information | 127.0.0.1:9876 |
| source-record-converter | String | Converter used to parse source data | org.apache.rocketmq.connect.runtime.converter.RocketMQConverter |
| refresh-interval | String | Sink flush interval, in ms | 10000 |
| schemaPath | String | Path to the sink schema file | /Users/osgoo/Downloads/user.avsc |

A sample configuration is shown below:

{
	"connector-class": "org.apache.rocketmq.connect.hudi.connector.HudiSinkConnector",
	"topicNames": "topicc",
	"tablePath": "file:///tmp/hudi_connector_test",
	"tableName": "hudi_connector_test_table",
	"insertShuffleParallelism": "2",
	"upsertShuffleParallelism": "2",
	"deleteParallelism": "2",
	"source-record-converter": "org.apache.rocketmq.connect.runtime.converter.RocketMQConverter",
	"source-rocketmq": "127.0.0.1:9876",
	"source-cluster": "DefaultCluster",
	"refresh-interval": "10000",
	"schemaPath": "/Users/osgoo/Downloads/user.avsc"
}
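The schemaPath option points to an Avro schema describing the records written to Hudi. A minimal, purely illustrative user.avsc could look like this (the record and field names are hypothetical, not taken from this project):

{
  "type": "record",
  "name": "User",
  "fields": [
    {"name": "id", "type": "long"},
    {"name": "name", "type": "string"},
    {"name": "age", "type": "int"}
  ]
}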
  • Launching the task with spark-submit: package connect-runtime, then submit it as a Spark job:
nohup sh spark-submit \
  --class org.apache.rocketmq.connect.runtime.DistributedConnectStartup \
  --conf "spark.driver.extraJavaOptions=-Dlogback.configurationFile=logback.xml" \
  --files /xxx/conf/connect.conf,/xxx/conf/log4j.properties \
  --packages org.apache.hudi:hudi-spark3-bundle_2.12:0.8.0,org.apache.spark:spark-avro_2.12:3.0.1,org.apache.hudi:hudi-java-client:0.8.0,org.apache.parquet:parquet-avro:1.10.1,org.apache.avro:avro:1.10.2,com.alibaba:fastjson:1.2.51,org.reflections:reflections:0.9.11,org.apache.httpcomponents:httpclient:4.5.5,io.openmessaging:openmessaging-connector:0.1.1,commons-cli:commons-cli:1.1,org.apache.rocketmq:rocketmq-client:4.4.0,org.apache.rocketmq:rocketmq-tools:4.4.0,org.apache.rocketmq:rocketmq-remoting:4.4.0,org.apache.rocketmq:rocketmq-openmessaging:4.3.2,org.slf4j:slf4j-api:1.7.7,com.google.guava:guava:20.0,org.apache.hadoop:hadoop-common:3.3.1,org.reflections:reflections:0.9.12,org.apache.hive:hive-exec:2.3.7 \
  --conf 'spark.executor.userClassPathFirst=true' \
  --conf 'spark.driver.userClassPathFirst=true' \
  --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \
  /xxx/rocketmq/rocketmq-connect-runtime-0.0.1-SNAPSHOT.jar &

For the remaining steps, follow the rocketmq-connect-hudi startup steps above.