rocketmq-connect-http

  • rocketmq-connect-http 说明
Be responsible for consuming messages from producer and writing data to another web service system.

rocketmq-connect-http使用方法

  1. 进入想要使用的connectors目录下(以rocketmq-connect-http目录为例),使用以下指令将插件进行打包

    mvn clean package -Dmaven.test.skip=true
    
  2. 打包好的插件以jar包的模式出现在rocketmq-connect-http/target/目录下

  3. distribution/conf目录下找的对应的配置文件进行更新,对于standalone的启动方式,更新connect-standalone.conf文件中的pluginPaths变量

    pluginPaths=(you plugin path)
    

    相应的,使用distributed启动方式,则更新connect-distributed.conf中的变量

  4. 创建并启动对应的SourceConnector以及SinkConnector

rocketmq-connect-http 启动

  • http-sink-connector 启动
POST  http://${runtime-ip}:${runtime-port}/connectors/${rocketmq-jdbc-source-connector-name}
{
    "connector.class":"org.apache.rocketmq.connect.http.HttpSinkTask",
    "url":"${url}",
    "method":"${method}",
    "connect.topicnames":"${connect.topicnames}"
}

例子

http://localhost:8081/connectors/httpSinkConnector?config={"connector-class":"org.apache.rocketmq.connect.http.HttpSinkTask","connect-topicname" : "http-topic","url":"192.168.1.2"}
http://127.0.0.1:8082/connectors/httpSinkConnector?config={"connector.class":"org.apache.rocketmq.connect.http.HttpSinkConnector","url":"http://localhost:8080/api","timeout":"6000","connect.topicnames":"fileTopic","headerParameters":"{\"header1k\":\"header1v\"}","method":"POST","queryParameters":"{\"queryk1\":\"queryv1\"}"}

更多参数见rocketmq-connect-http 参数说明

注: rocketmq-http-connect 的启动依赖于rocketmq-connect-runtime项目的启动,需将打好的所有jar包放置到runtime项目中pluginPaths配置的路径后再执行上面的启动请求,该值配置在runtime项目下的connect.conf文件中

rocketmq-connect-http 停止

http://${runtime-ip}:${runtime-port}/connectors/${rocketmq-http-connector-name}/stop

rocketmq-connect-http 参数说明

  • http-sink-connector 参数说明
KEYTYPEMust be filledDescriptionExample
connect-topicnameStringYESsink需要处理数据消息topicfileTopic
urlStringYES目标端url地址http://localhost:8080/api
methodStringYEShttp请求方法POST
bodyStringNohttp请求body字段,不填时默认使用事件的Data字段POST
headerParametersStringNOhttp请求header map动态参数Json字符串{“key1”:“value1”}
fixedHeaderParametersStringNOhttp请求header map静态参数Json字符串{“key1”:“value1”}
queryParametersStringNOhttp请求query map动态参数Json字符串{“key1”:“value1”}
fixedQueryParametersStringNOhttp请求query map静态参数Json字符串{“key1”:“value1”}
socks5UserNameStringNOsock5代理用户名*****
socks5PasswordStringNOsock5代理密码*****
socks5EndpointStringNOsock5代理地址http://localhost:7000
timeoutStringNOhttp请求超时时间(毫秒)3000
concurrencyStringNOhttp请求并发数1
authTypeStringNO认证方式 (BASIC_AUTH/OAUTH_AUTH/API_KEY_AUTH/NONE)BASIC_AUTH
basicUsernameStringNObasic auth username*****
basicPasswordStringNObasic auth password*****
apiKeyUsernameStringNOapi key auth username*****
apiKeyPasswordStringNOapi key auth password*****
oAuthEndpointStringNOoauth 地址http://localhost:7000
oAuthHttpMethodStringNOoauth http请求方法GET
oAuthClientIdStringNOoauth client idxxxx
oAuthClientSecretStringNOoauth client secretxxxx
oAuthHeaderParametersStringNOoauth header map参数Json字符串{“key1”:“value1”}
oAuthQueryParametersStringNOoauth query map参数Json字符串{“key1”:“value1”}
oAuthBodyStringNOoauth body参数bodyData
tokenStringNOhttp请求token,如果非空,会添加到http请求的header中,key为tokenxxxx