Be responsible for consuming messages from producer and writing data to another web service system.
进入想要使用的connectors目录下(以rocketmq-connect-http目录为例),使用以下指令将插件进行打包
mvn clean package -Dmaven.test.skip=true
打包好的插件以jar包的模式出现在rocketmq-connect-http/target/目录下
在distribution/conf目录下找的对应的配置文件进行更新,对于standalone的启动方式,更新connect-standalone.conf文件中的pluginPaths变量
pluginPaths=(you plugin path)
相应的,使用distributed启动方式,则更新connect-distributed.conf中的变量
创建并启动对应的SourceConnector以及SinkConnector
POST http://${runtime-ip}:${runtime-port}/connectors/${rocketmq-jdbc-source-connector-name}
{
"connector.class":"org.apache.rocketmq.connect.http.HttpSinkTask",
"url":"${url}",
"method":"${method}",
"connect.topicnames":"${connect.topicnames}"
}
例子
http://localhost:8081/connectors/httpSinkConnector?config={"connector-class":"org.apache.rocketmq.connect.http.HttpSinkTask","connect-topicname" : "http-topic","url":"192.168.1.2"}
http://127.0.0.1:8082/connectors/httpSinkConnector?config={"connector.class":"org.apache.rocketmq.connect.http.HttpSinkConnector","url":"http://localhost:8080/api","timeout":"6000","connect.topicnames":"fileTopic","headerParameters":"{\"header1k\":\"header1v\"}","method":"POST","queryParameters":"{\"queryk1\":\"queryv1\"}"}
更多参数见rocketmq-connect-http 参数说明
注:
rocketmq-http-connect的启动依赖于rocketmq-connect-runtime项目的启动,需将打好的所有jar包放置到runtime项目中pluginPaths配置的路径后再执行上面的启动请求,该值配置在runtime项目下的connect.conf文件中
http://${runtime-ip}:${runtime-port}/connectors/${rocketmq-http-connector-name}/stop
| KEY | TYPE | Must be filled | Description | Example |
|---|---|---|---|---|
| connect-topicname | String | YES | sink需要处理数据消息topic | fileTopic |
| url | String | YES | 目标端url地址 | http://localhost:8080/api |
| method | String | YES | http请求方法 | POST |
| body | String | No | http请求body字段,不填时默认使用事件的Data字段 | POST |
| headerParameters | String | NO | http请求header map动态参数Json字符串 | {“key1”:“value1”} |
| fixedHeaderParameters | String | NO | http请求header map静态参数Json字符串 | {“key1”:“value1”} |
| queryParameters | String | NO | http请求query map动态参数Json字符串 | {“key1”:“value1”} |
| fixedQueryParameters | String | NO | http请求query map静态参数Json字符串 | {“key1”:“value1”} |
| socks5UserName | String | NO | sock5代理用户名 | ***** |
| socks5Password | String | NO | sock5代理密码 | ***** |
| socks5Endpoint | String | NO | sock5代理地址 | http://localhost:7000 |
| timeout | String | NO | http请求超时时间(毫秒) | 3000 |
| concurrency | String | NO | http请求并发数 | 1 |
| authType | String | NO | 认证方式 (BASIC_AUTH/OAUTH_AUTH/API_KEY_AUTH/NONE) | BASIC_AUTH |
| basicUsername | String | NO | basic auth username | ***** |
| basicPassword | String | NO | basic auth password | ***** |
| apiKeyUsername | String | NO | api key auth username | ***** |
| apiKeyPassword | String | NO | api key auth password | ***** |
| oAuthEndpoint | String | NO | oauth 地址 | http://localhost:7000 |
| oAuthHttpMethod | String | NO | oauth http请求方法 | GET |
| oAuthClientId | String | NO | oauth client id | xxxx |
| oAuthClientSecret | String | NO | oauth client secret | xxxx |
| oAuthHeaderParameters | String | NO | oauth header map参数Json字符串 | {“key1”:“value1”} |
| oAuthQueryParameters | String | NO | oauth query map参数Json字符串 | {“key1”:“value1”} |
| oAuthBody | String | NO | oauth body参数 | bodyData |
| token | String | NO | http请求token,如果非空,会添加到http请求的header中,key为token | xxxx |