SFTP Server(文件数据) -> RocketMQ Connect
提示 : ${ROCKETMQ_HOME} 位置说明
bin-release.zip 版本:/rocketmq-all-4.9.4-bin-release
source-release.zip 版本:/rocketmq-all-4.9.4-source-release/distribution
RocketMQ Connector SFTP
$ cd rocketmq-connect/connectors/rocketmq-connect-sftp/ $ mvn clean package -Dmaven.test.skip=true
将 RocketMQ Connector SFTP 编译好的包放入Runtime加载目录。命令如下:
mkdir -p /usr/local/connector-plugins cp target/rocketmq-connect-sftp-0.0.1-SNAPSHOT-jar-with-dependencies.jar /usr/local/connector-plugins
cd rocketmq-connect mvn -Prelease-connect -DskipTests clean install -U
修改配置connect-standalone.conf
,重点配置如下
$ cd distribution/target/rocketmq-connect-0.0.1-SNAPSHOT/rocketmq-connect-0.0.1-SNAPSHOT $ vim conf/connect-standalone.conf
workerId=standalone-worker storePathRootDir=/tmp/storeRoot ## Http port for user to access REST API httpPort=8082 # Rocketmq namesrvAddr namesrvAddr=localhost:9876 # RocketMQ acl aclEnable=false accessKey=rocketmq secretKey=12345678 autoCreateGroupEnable=false clusterName="DefaultCluster" # 核心配置,将之前编译好包的插件目录配置在此; # Source or sink connector jar file dir,The default value is rocketmq-connect-sample pluginPaths=/usr/local/connector-plugins
cd distribution/target/rocketmq-connect-0.0.1-SNAPSHOT/rocketmq-connect-0.0.1-SNAPSHOT sh bin/connect-standalone.sh -c conf/connect-standalone.conf &
使用 MAC OS 自带的 SFTP 服务器
登陆 SFTP 服务器,将具有如何内容的 souce.txt 文件放入用户目录,例如:/path/to/
张三|100000202211290001|20221129001|30000.00|2022-11-28|03:00:00|7.00 李四|100000202211290002|20221129002|40000.00|2022-11-28|04:00:00|9.00 赵五|100000202211290003|20221129003|50000.00|2022-11-28|05:00:00|12.00
同步 SFTP 文件:source.txt 作用:通过登陆 SFTP 服务器,解析文件并封装成通用的ConnectRecord对象,发送的RocketMQ Topic当中
curl -X POST --location "http://localhost:8082/connectors/SftpSourceConnector" --http1.1 \ -H "Host: localhost:8082" \ -H "Content-Type: application/json" \ -d "{ \"connector.class\": \"org.apache.rocketmq.connect.http.sink.SftpSourceConnector\", \"host\": \"127.0.0.1\", \"port\": 22, \"username\": \"wencheng\", \"password\": \"1617\", \"filePath\": \"/Users/wencheng/Documents/source.txt\", \"connect.topicname\": \"sftpTopic\", \"fieldSeparator\": \"|\", \"fieldSchema\": \"username|idCardNo|orderNo|orderAmount|trxDate|trxTime|profit\" }"
运行完以上命令后,SFTP 服务上的文件数据会被组织成给定格式的数据,写入 MQ。之后可以通过 sink connector 或者其他业务系统去消费它。
作用:通过消费Topic中的数据,使用SFTP协议写入到目标文件当中
curl -X POST --location "http://localhost:8082/connectors/SftpSinkConnector" --http1.1 \ -H "Host: localhost:8082" \ -H "Content-Type: application/json" \ -d "{ \"connector.class\": \"org.apache.rocketmq.connect.http.sink.SftpSinkConnector\", \"host\": \"127.0.0.1\", \"port\": 22, \"username\": \"wencheng\", \"password\": \"1617\", \"filePath\": \"/Users/wencheng/Documents/sink.txt\", \"connect.topicnames\": \"sftpTopic\", \"fieldSeparator\": \"|\", \"fieldSchema\": \"username|idCardNo|orderNo|orderAmount|trxDate|trxTime|profit\" }"