feat:add runtime demo.
diff --git a/README.md b/README.md
index 230ccff..505be7d 100644
--- a/README.md
+++ b/README.md
@@ -42,14 +42,14 @@
Apache RocketMQ Connect can connect the external upstream and downstream services,and You can deploy it according to the
manual: [RocketMQ Connect Quick Start](https://github.com/apache/rocketmq-connect)
-. Before deploy the Apache RocketMQ Connect, you should download the plugins below and put it to the "pluginPaths" which
+. Before deploy the Apache RocketMQ Connect, you should download the plugins below and put it to the "pluginpath" which
defined on rocketmq-connect.
-* [rocketmq-connect-eventbridge-0.0.1-SNAPSHOT-jar-with-dependencies.jar](https://cn-hangzhou-eventbridge.oss-cn-hangzhou.aliyuncs.com/rocketmq-connect-eventbridge-0.0.1-SNAPSHOT-jar-with-dependencies.jar)
-* [rocketmq-connect-dingtalk-1.0-SNAPSHOT-jar-with-dependencies.jar](https://cn-hangzhou-eventbridge.oss-cn-hangzhou.aliyuncs.com/rocketmq-connect-dingtalk-1.0-SNAPSHOT-jar-with-dependencies.jar)
-* [connect-cloudevent-transform-1.0.0-SNAPSHOT-jar-with-dependencies.jar](https://cn-hangzhou-eventbridge.oss-cn-hangzhou.aliyuncs.com/connect-cloudevent-transform-1.0.0-SNAPSHOT-jar-with-dependencies.jar)
-* [connect-filter-transform-1.0.0-SNAPSHOT-jar-with-dependencies.jar](https://cn-hangzhou-eventbridge.oss-cn-hangzhou.aliyuncs.com/connect-filter-transform-1.0.0-SNAPSHOT-jar-with-dependencies.jar)
-* [connect-eventbridge-transform-1.0.0-SNAPSHOT-jar-with-dependencies.jar](https://cn-hangzhou-eventbridge.oss-cn-hangzhou.aliyuncs.com/connect-eventbridge-transform-1.0.0-SNAPSHOT-jar-with-dependencies.jar)
+* [rocketmq-connect-eventbridge.jar](https://cn-hangzhou-eventbridge.oss-cn-hangzhou.aliyuncs.com/rocketmq-connect-eventbridge-0.0.1-SNAPSHOT-jar-with-dependencies.jar)
+* [eventbridge-connect-file.jar](https://cn-hangzhou-eventbridge.oss-cn-hangzhou.aliyuncs.com/eventbridge-connect-file-1.0.0-SNAPSHOT-jar-with-dependencies.jar)
+* [connect-cloudevent-transform.jar](https://cn-hangzhou-eventbridge.oss-cn-hangzhou.aliyuncs.com/connect-cloudevent-transform-1.0.0-SNAPSHOT-jar-with-dependencies.jar)
+* [connect-filter-transform.jar](https://cn-hangzhou-eventbridge.oss-cn-hangzhou.aliyuncs.com/connect-filter-transform-1.0.0-SNAPSHOT-jar-with-dependencies.jar)
+* [connect-eventbridge-transform.jar](https://cn-hangzhou-eventbridge.oss-cn-hangzhou.aliyuncs.com/connect-eventbridge-transform-1.0.0-SNAPSHOT-jar-with-dependencies.jar)
#### Apache RocketMQ EventBridge
@@ -66,202 +66,35 @@
# The cluster name of rocketmq.
rocketmq.cluster.name=DefaultCluster
-
-# The endpoint of rocketmq-connect.
-rocketmq.connect.endpoint=xxxxxx:8082
+runtime.pluginpath=xxxx
```
+Config the runtime.pluginpath to set the directory of plugin.
## Demo
####
-* Create EventBus
-
-```text
-POST /bus/createEventBus HTTP/1.1
-Host: demo.eventbridge.com
-Content-Type: application/json; charset=utf-8
-{
-"eventBusName":"demo-bus",
-"description":"a demo bus."
-}
-```
-
-* Create EventSource
-
-```text
-POST /source/createEventSource HTTP/1.1
-Host: demo.eventbridge.com
-Content-Type: application/json; charset=utf-8
-{
-"eventBusName":"demo-bus",
-"eventSourceName":"demo-source",
-"description":"A demo source."
-}
-```
-
-* Create EventRule
-
-```text
-POST /rule/createEventRule HTTP/1.1
-Host: demo.eventbridge.com
-Content-Type: application/json; charset=utf-8
-{
- "eventBusName":"demo-bus",
- "eventRuleName":"demo-rule",
- "description":"A demo rule.",
- "filterPattern":"{}"
-}
-```
-
-* Create Target
-
-This is a sample with EventBridge target:
-
-```text
-POST /target/createEventTargets HTTP/1.1
-Host: demo.eventbridge.com
-Content-Type: application/json; charset=utf-8
-{
- "eventBusName":"demo-bus",
- "eventRuleName":"demo-rule",
- "eventTargets":[
- {
- "eventTargetName":"eventbridge-target",
- "className":"acs.eventbridge",
- "config":{
- "RegionId":"cn-hangzhou",
- "AliyunEventBus":"rocketmq-eventbridge"
- }
- }
- ]
-}
-```
-
-This is a sample with DingTalk target:
-
-```text
-POST /target/createEventTargets HTTP/1.1
-Host: demo.eventbridge.com
-Content-Type: application/json; charset=utf-8
-{
- "eventBusName":"demo-bus",
- "eventRuleName":"demo-rule",
- "eventTargets":[
- {
- "eventTargetName":"dingtalk-target",
- "className":"acs.dingtalk",
- "config":{
- "WebHook":"https://oapi.dingtalk.com/robot/send?access_token=b43a54b702314415c2acdae97eda1e092528b7a9dddb31510a5b4430be2ef867",
- "SecretKey":"SEC53483bf496b8f9e0b4ab0ab669d422208e6ccfaedfd5120ea6b8426b9ecd47aa",
- "Body":"{\"template\":\"{\\\"text\\\":{\\\"content\\\":\\\"${content}\\\"},\\\"msgtype\\\":\\\"text\\\"}\",\"form\":\"TEMPLATE\",\"value\":\"{\\\"content\\\":\\\"$.data.body\\\"}\"}"
- }
- }
- ]
-}
-```
-
* Put Events to EventBus
-
+ The system creates a demo bus by default, and you can send events directly to the bus.
```text
-POST /putEvents HTTP/1.1
-Host: demo.eventbridge.com
-Content-Type:"application/cloudevents+json; charset=UTF-8"
-{
- "specversion" : "1.0",
- "type" : "com.github.pull_request.opened",
- "source" : "https://github.com/cloudevents/spec/pull",
- "subject" : "123",
- "id" : "A234-1234-1234",
- "time" : "2018-04-05T17:31:00Z",
- "datacontenttype" : "application/json",
- "data" : {
- "body":"demo"
- },
- "aliyuneventbusname":"demo-bus"
-}
+curl -X POST http://127.0.0.1:7001/putEvents \
+-H "Content-Type: application/json; charset=UTF-8" \
+-H "ce-specversion:1.0" \
+-H "ce-type:com.github.pull_request.opened" \
+-H "ce-source:https://github.com/cloudevents/spec/pull" \
+-H "ce-subject:demo" \
+-H "ce-id:1234-1234-1234" \
+-H "ce-datacontenttype:application/json" \
+-H "ce-time:2018-04-05T17:31:00Z" \
+-H "ce-aliyuneventbusname:demo-bus" \
+-d 'test'
```
-* Use HttpSource to put events
+* Check if the local file received a write event
-EventBridge HttpSource allows you to put events to eventbus in the form of webhook.
+In addition, by default, the system will create a demo rule for you to subscribe and push to the file. You can check whether there are events received in the directory:~/demo.eventbridge
-Here is an example explaining how to put events using EventBridge HttpSource.
+
-1. Create an EventBridge HttpSource
-
- - eventSourceName: Name of EventSource
- - eventBusName: Name of EventBus
- - description: Description
- - className: HttpEvent. This parameter is a fixed value and cannot be modified.
- - config: HttpSource Config
- - Type: Request type. Available values are 'HTTP', 'HTTPS' and 'HTTP&HTTPS'.
- - Method: Allowed HTTP request methods. The request will be filtered if the http request method type for accessing
- the webhook does not meet the configuration.
- - SecurityConfig: Security configuration type. Available values are 'none', 'ip' and 'referer'.
- - Ip: IP security configuration. Http requests whose source ip is not in the configured network segment will be
- filtered if the security configuration is selected as 'ip'.
- - Referer: Referer security configuration. HTTP requests whose referer is not in this configuration will be filtered
- if the security configuration is selected as 'referer'.
-
-A webhook will be generated after the creation of HttpSource.
-
-```text
-POST /source/createEventSource HTTP/1.1
-Host: demo.eventbridge.com
-Content-Type: application/json; charset=utf-8
-{
- "eventSourceName": "httpEventSourceDemo",
- "eventBusName": "demo",
- "description": "http source demo",
- "className": "HttpEvent",
- "config": {
- "Type": "HTTP&HTTPS",
- "Method": ["GET", "POST"],
- "SecurityConfig": "ip",
- "Ip": ["10.0.0.0/8"],
- "Referer":[]
- }
-}
-```
-
-2. Put event to EventBus
-
-Http request to access this webhook will be converted into a CloudEvent and delivered to eventbus.
-
-```
-curl -d '{"username": "testUser", "testData": "testData"}' -H 'Content-Type: application/json' -H 'Accept-Language: en-US' http://127.0.0.1:7001/webhook/putEvents?token=43146d108b224eb2adc581aedd28f272007320d14b9d
-```
-
-generated CloudEvent demo
-
-```json
-{
- "datacontenttype": "application/json",
- "data": {
- "body": {
- "username": "testUser",
- "testData": "testData"
- },
- "headers": {
- "Accept": "*/*",
- "User-Agent": "curl/7.64.1",
- "Host": "127.0.0.1:7001",
- "Accept-Language": "en-US",
- "Content-Length": "48",
- "Content-Type": "application/json"
- },
- "httpMethod": "POST",
- "path": "/webhook/putEvents",
- "queryString": {}
- },
- "subject": "DemoBus/httpEventSourceDemo",
- "source": "httpEventSourceDemo",
- "type": "eventbridge:Events:HTTPEvent",
- "specversion": "1.0",
- "id": "75bc099b-130a-45a8-82e1-3f9a7f0d10f3",
- "time": "2022-05-12T17:20:30.264+08:00"
-}
-```
-
+>> 链接:为什么输出:test,...
diff --git a/adapter/persistence/src/main/resources/db/migration/V9__init_file_target_class.sql b/adapter/persistence/src/main/resources/db/migration/V9__init_file_target_class.sql
new file mode 100644
index 0000000..5029f79
--- /dev/null
+++ b/adapter/persistence/src/main/resources/db/migration/V9__init_file_target_class.sql
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+INSERT INTO `event_target_class` (`id`,`name`,`api_params`,`target_transform`,`required_params`,`visual_config`,`description`,`gmt_create`,`gmt_modify`) VALUES (4,'file','{
+ "fileName":{
+ "type":"String",
+ "desc":"the output file name.",
+ "required":false
+ },
+ "line":{
+ "type":"String",
+ "desc":"the content write to file."
+ }
+}
+','{ "data":"${line}" }','{
+ "fileName":"${fileName}",
+ "class":"org.apache.rocketmq.connect.sink.FileSinkTask"
+}',null,'output file config','2023-06-09 16:54:55','2023-06-09 16:54:57');
\ No newline at end of file
diff --git a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/config/RuntimeConfiguration.java b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/config/RuntimeConfiguration.java
index b6a7a46..56fa123 100644
--- a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/config/RuntimeConfiguration.java
+++ b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/config/RuntimeConfiguration.java
@@ -21,18 +21,22 @@
import org.apache.rocketmq.eventbridge.adapter.runtime.service.TargetRunnerConfigObserver;
import org.apache.rocketmq.eventbridge.adapter.runtime.service.TargetRunnerConfigOnDBObserver;
import org.apache.rocketmq.eventbridge.adapter.runtime.service.TargetRunnerConfigOnFileObserver;
+import org.apache.rocketmq.eventbridge.domain.repository.EventTargetRepository;
+import org.apache.rocketmq.eventbridge.domain.repository.EventTargetRunnerRepository;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
+import org.springframework.context.annotation.DependsOn;
@Configuration
public class RuntimeConfiguration {
@Bean(name = "runnerConfigObserver")
- public TargetRunnerConfigObserver targetRunnerConfigObserver(@Value("${runtime.config.mode}") String configMode) {
+ public TargetRunnerConfigObserver targetRunnerConfigObserver(@Value("${runtime.config.mode}") String configMode,
+ EventTargetRunnerRepository eventTargetRunnerRepository, EventTargetRepository eventTargetRepository) {
switch (ConfigModeEnum.parse(configMode)) {
case DB:
- return new TargetRunnerConfigOnDBObserver();
+ return new TargetRunnerConfigOnDBObserver(eventTargetRunnerRepository, eventTargetRepository);
default:
return new TargetRunnerConfigOnFileObserver();
}
diff --git a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/service/TargetRunnerConfigOnDBObserver.java b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/service/TargetRunnerConfigOnDBObserver.java
index c272bd3..f018ce1 100644
--- a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/service/TargetRunnerConfigOnDBObserver.java
+++ b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/service/TargetRunnerConfigOnDBObserver.java
@@ -48,7 +48,9 @@
@Autowired
EventTargetRepository eventTargetRepository;
- public TargetRunnerConfigOnDBObserver() {
+ public TargetRunnerConfigOnDBObserver(EventTargetRunnerRepository eventTargetRunnerRepository, EventTargetRepository eventTargetRepository) {
+ this.eventTargetRunnerRepository = eventTargetRunnerRepository;
+ this.eventTargetRepository = eventTargetRepository;
}
@Override
diff --git a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/service/TargetRunnerConfigOnFileObserver.java b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/service/TargetRunnerConfigOnFileObserver.java
index b5d34f3..cbd01e0 100644
--- a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/service/TargetRunnerConfigOnFileObserver.java
+++ b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/service/TargetRunnerConfigOnFileObserver.java
@@ -41,7 +41,7 @@
import org.springframework.stereotype.Component;
@Slf4j
-@Component
+//@Component
public class TargetRunnerConfigOnFileObserver extends AbstractTargetRunnerConfigObserver {
private String pathName;
diff --git a/adapter/storage/src/main/java/org/apache/rocketmq/eventbridge/adapter/storage/rocketmq/runtimer/RocketMQEventSubscriber.java b/adapter/storage/src/main/java/org/apache/rocketmq/eventbridge/adapter/storage/rocketmq/runtimer/RocketMQEventSubscriber.java
index 3005486..56a4550 100644
--- a/adapter/storage/src/main/java/org/apache/rocketmq/eventbridge/adapter/storage/rocketmq/runtimer/RocketMQEventSubscriber.java
+++ b/adapter/storage/src/main/java/org/apache/rocketmq/eventbridge/adapter/storage/rocketmq/runtimer/RocketMQEventSubscriber.java
@@ -217,7 +217,11 @@
* init rocket mq pull consumer
*/
private void initConsumeWorkers() {
- for (SubscribeRunnerKeys subscribeRunnerKeys : runnerConfigObserver.getSubscribeRunnerKeys()) {
+ Set<SubscribeRunnerKeys> subscribeRunnerKeysSet = runnerConfigObserver.getSubscribeRunnerKeys();
+ if(subscribeRunnerKeysSet == null || subscribeRunnerKeysSet.isEmpty()){
+ return;
+ }
+ for (SubscribeRunnerKeys subscribeRunnerKeys : subscribeRunnerKeysSet) {
LitePullConsumer litePullConsumer = initLitePullConsumer(subscribeRunnerKeys);
ConsumeWorker consumeWorker = new ConsumeWorker(litePullConsumer, subscribeRunnerKeys.getRunnerName());
consumeWorkerMap.put(subscribeRunnerKeys.getRunnerName(), consumeWorker);
diff --git a/docs/CreateDingTalkTarget.md b/docs/CreateDingTalkTarget.md
new file mode 100644
index 0000000..4446433
--- /dev/null
+++ b/docs/CreateDingTalkTarget.md
@@ -0,0 +1,86 @@
+
+## Create EventBus
+
+```text
+POST /bus/createEventBus HTTP/1.1
+Host: demo.eventbridge.com
+Content-Type: application/json; charset=utf-8
+{
+"eventBusName":"demo-bus",
+"description":"a demo bus."
+}
+```
+
+## Create EventSource
+
+```text
+POST /source/createEventSource HTTP/1.1
+Host: demo.eventbridge.com
+Content-Type: application/json; charset=utf-8
+{
+"eventBusName":"demo-bus",
+"eventSourceName":"demo-source",
+"description":"A demo source."
+}
+```
+
+## Create EventRule
+
+```text
+POST /rule/createEventRule HTTP/1.1
+Host: demo.eventbridge.com
+Content-Type: application/json; charset=utf-8
+{
+ "eventBusName":"demo-bus",
+ "eventRuleName":"demo-rule",
+ "description":"A demo rule.",
+ "filterPattern":"{}"
+}
+```
+
+## Create Target
+
+This is a sample with EventBridge target:
+
+```text
+POST /target/createEventTargets HTTP/1.1
+Host: demo.eventbridge.com
+Content-Type: application/json; charset=utf-8
+{
+ "eventBusName":"demo-bus",
+ "eventRuleName":"demo-rule",
+ "eventTargets":[
+ {
+ "eventTargetName":"eventbridge-target",
+ "className":"acs.eventbridge",
+ "config":{
+ "RegionId":"cn-hangzhou",
+ "AliyunEventBus":"rocketmq-eventbridge"
+ }
+ }
+ ]
+}
+```
+
+This is a sample with DingTalk target:
+
+```text
+POST /target/createEventTargets HTTP/1.1
+Host: demo.eventbridge.com
+Content-Type: application/json; charset=utf-8
+{
+ "eventBusName":"demo-bus",
+ "eventRuleName":"demo-rule",
+ "eventTargets":[
+ {
+ "eventTargetName":"dingtalk-target",
+ "className":"acs.dingtalk",
+ "config":{
+ "WebHook":"https://oapi.dingtalk.com/robot/send?access_token=b43a54b702314415c2acdae97eda1e092528b7a9dddb31510a5b4430be2ef867",
+ "SecretKey":"SEC53483bf496b8f9e0b4ab0ab669d422208e6ccfaedfd5120ea6b8426b9ecd47aa",
+ "Body":"{\"template\":\"{\\\"text\\\":{\\\"content\\\":\\\"${content}\\\"},\\\"msgtype\\\":\\\"text\\\"}\",\"form\":\"TEMPLATE\",\"value\":\"{\\\"content\\\":\\\"$.data.body\\\"}\"}"
+ }
+ }
+ ]
+}
+```
diff --git a/docs/EventSource.md b/docs/EventSource.md
new file mode 100644
index 0000000..13dd41b
--- /dev/null
+++ b/docs/EventSource.md
@@ -0,0 +1,42 @@
+## Use HttpSource to put events
+
+EventBridge HttpSource allows you to put events to eventbus in the form of webhook.
+
+Here is an example explaining how to put events using EventBridge HttpSource.
+
+1. Create an EventBridge HttpSource
+
+ - eventSourceName: Name of EventSource
+ - eventBusName: Name of EventBus
+ - description: Description
+ - className: HttpEvent. This parameter is a fixed value and cannot be modified.
+ - config: HttpSource Config
+ - Type: Request type. Available values are 'HTTP', 'HTTPS' and 'HTTP&HTTPS'.
+ - Method: Allowed HTTP request methods. The request will be filtered if the http request method type for accessing
+ the webhook does not meet the configuration.
+ - SecurityConfig: Security configuration type. Available values are 'none', 'ip' and 'referer'.
+ - Ip: IP security configuration. Http requests whose source ip is not in the configured network segment will be
+ filtered if the security configuration is selected as 'ip'.
+ - Referer: Referer security configuration. HTTP requests whose referer is not in this configuration will be filtered
+ if the security configuration is selected as 'referer'.
+
+A webhook will be generated after the creation of HttpSource.
+
+```text
+POST /source/createEventSource HTTP/1.1
+Host: demo.eventbridge.com
+Content-Type: application/json; charset=utf-8
+{
+ "eventSourceName": "httpEventSourceDemo",
+ "eventBusName": "demo",
+ "description": "http source demo",
+ "className": "HttpEvent",
+ "config": {
+ "Type": "HTTP&HTTPS",
+ "Method": ["GET", "POST"],
+ "SecurityConfig": "ip",
+ "Ip": ["10.0.0.0/8"],
+ "Referer":[]
+ }
+}
+```
diff --git a/start/src/main/resources/application.properties b/start/src/main/resources/application.properties
index 3736814..dc13acc 100644
--- a/start/src/main/resources/application.properties
+++ b/start/src/main/resources/application.properties
@@ -27,14 +27,14 @@
spring.flyway.placeholderReplacement=false
## rocketmq
rocketmq.namesrvAddr=localhost:9876
-rocketmq.connect.endpoint=http://127.0.0.1:8082
+
rocketmq.cluster.name=DefaultCluster
## runtime
-runtime.config.mode=FILE
+runtime.config.mode=DB
runtime.storage.mode=ROCKETMQ
rumtime.name=eventbridge-runtimer
runtime.pluginpath=/Users/Local/eventbridge/plugin
-runtime.storePathRootDir=/Users/Local/eventbridge/store
+
## log
app.name=rocketmqeventbridge
diff --git a/supports/connect-standard/README.md b/supports/eventbridge-connect-file/README.md
similarity index 100%
copy from supports/connect-standard/README.md
copy to supports/eventbridge-connect-file/README.md
diff --git a/supports/connect-standard/pom.xml b/supports/eventbridge-connect-file/pom.xml
similarity index 98%
rename from supports/connect-standard/pom.xml
rename to supports/eventbridge-connect-file/pom.xml
index 90c8bb5..a42cdfa 100644
--- a/supports/connect-standard/pom.xml
+++ b/supports/eventbridge-connect-file/pom.xml
@@ -14,7 +14,7 @@
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<groupId>org.apache.rocketmq</groupId>
- <artifactId>connect-standard</artifactId>
+ <artifactId>eventbridge-connect-file</artifactId>
<version>1.0.0-SNAPSHOT</version>
<modelVersion>4.0.0</modelVersion>
@@ -154,7 +154,7 @@
<configuration>
<archive>
<manifest>
- <mainClass>org.apache.rocketmq.connect.transform.eventbridge.EventBridgeTransform
+ <mainClass>org.apache.rocketmq.connect.sink.FileSinkTask
</mainClass>
</manifest>
</archive>
diff --git a/supports/eventbridge-connect-file/src/main/java/org/apache/rocketmq/connect/sink/FileConstant.java b/supports/eventbridge-connect-file/src/main/java/org/apache/rocketmq/connect/sink/FileConstant.java
new file mode 100644
index 0000000..a0843d1
--- /dev/null
+++ b/supports/eventbridge-connect-file/src/main/java/org/apache/rocketmq/connect/sink/FileConstant.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.sink;
+
+public class FileConstant {
+ public static final String FILE_NAME="fileName";
+}
\ No newline at end of file
diff --git a/supports/eventbridge-connect-file/src/main/java/org/apache/rocketmq/connect/sink/FileSinkTask.java b/supports/eventbridge-connect-file/src/main/java/org/apache/rocketmq/connect/sink/FileSinkTask.java
new file mode 100644
index 0000000..39526cd
--- /dev/null
+++ b/supports/eventbridge-connect-file/src/main/java/org/apache/rocketmq/connect/sink/FileSinkTask.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.sink;
+
+import io.openmessaging.KeyValue;
+import io.openmessaging.connector.api.component.task.sink.SinkTask;
+import io.openmessaging.connector.api.data.ConnectRecord;
+import io.openmessaging.connector.api.errors.ConnectException;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.nio.file.StandardOpenOption;
+import java.util.List;
+
+public class FileSinkTask extends SinkTask {
+ private String fileName = System.getProperty("user.home") + "/demo.eventbridge";
+ private PrintStream outputStream;
+
+ @Override public void put(List<ConnectRecord> sinkRecords) throws ConnectException {
+ if (sinkRecords == null || sinkRecords.isEmpty()) {
+ return;
+ }
+ for (ConnectRecord connectRecord : sinkRecords) {
+ try {
+ outputStream.println(connectRecord.getData());
+ } catch (Throwable e) {
+ throw new ConnectException("Write record to file failed.", e);
+ }
+ }
+
+ }
+
+ @Override public void pause() {
+
+ }
+
+ @Override public void resume() {
+
+ }
+
+ @Override public void validate(KeyValue config) {
+
+ }
+
+ @Override public void init(KeyValue config) {
+ String inputFileName = config.getString(FileConstant.FILE_NAME);
+ if (inputFileName != null) {
+ fileName = inputFileName;
+ }
+ try {
+
+ outputStream = new PrintStream(
+ Files.newOutputStream(Paths.get(fileName), StandardOpenOption.CREATE, StandardOpenOption.APPEND),
+ false,
+ StandardCharsets.UTF_8.name());
+ } catch (IOException e) {
+ throw new ConnectException("Create outputStream: " + fileName + " for FileSinkTask failed", e);
+ }
+
+ }
+
+ @Override public void stop() {
+ if (outputStream != null) {
+ outputStream.close();
+ }
+ }
+}
\ No newline at end of file
diff --git a/supports/eventbridge-connect-file/src/test/java/org/apache/rocketmq/connect/FileSinkTaskTest.java b/supports/eventbridge-connect-file/src/test/java/org/apache/rocketmq/connect/FileSinkTaskTest.java
new file mode 100644
index 0000000..752bad4
--- /dev/null
+++ b/supports/eventbridge-connect-file/src/test/java/org/apache/rocketmq/connect/FileSinkTaskTest.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect;
+
+import io.openmessaging.KeyValue;
+import io.openmessaging.connector.api.data.ConnectRecord;
+import io.openmessaging.internal.DefaultKeyValue;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.rocketmq.connect.sink.FileSinkTask;
+
+public class FileSinkTaskTest {
+ public static void main(String[] args) {
+ FileSinkTask fileSinkTask = new FileSinkTask();
+ KeyValue config = new DefaultKeyValue();
+ fileSinkTask.init(config);
+ List<ConnectRecord> sinkRecords = new ArrayList<>();
+ ConnectRecord connectRecord= new ConnectRecord(null,null,null);
+ connectRecord.setData("test");
+ sinkRecords.add(connectRecord);
+ fileSinkTask.put(sinkRecords);
+ fileSinkTask.stop();
+ }
+}
\ No newline at end of file
diff --git a/supports/connect-standard/README.md b/supports/eventbridge-connect-standard/README.md
similarity index 100%
rename from supports/connect-standard/README.md
rename to supports/eventbridge-connect-standard/README.md
diff --git a/supports/connect-standard/pom.xml b/supports/eventbridge-connect-standard/pom.xml
similarity index 98%
copy from supports/connect-standard/pom.xml
copy to supports/eventbridge-connect-standard/pom.xml
index 90c8bb5..4b6e61b 100644
--- a/supports/connect-standard/pom.xml
+++ b/supports/eventbridge-connect-standard/pom.xml
@@ -14,7 +14,7 @@
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<groupId>org.apache.rocketmq</groupId>
- <artifactId>connect-standard</artifactId>
+ <artifactId>eventbridge-connect-standard</artifactId>
<version>1.0.0-SNAPSHOT</version>
<modelVersion>4.0.0</modelVersion>
@@ -154,7 +154,7 @@
<configuration>
<archive>
<manifest>
- <mainClass>org.apache.rocketmq.connect.transform.eventbridge.EventBridgeTransform
+ <mainClass>org.apache.rocketmq.connect.sink.FileSinkTask
</mainClass>
</manifest>
</archive>
diff --git a/supports/eventbridge-connect-standard/src/main/java/org/apache/rocketmq/connect/sink/StandardConstant.java b/supports/eventbridge-connect-standard/src/main/java/org/apache/rocketmq/connect/sink/StandardConstant.java
new file mode 100644
index 0000000..c399266
--- /dev/null
+++ b/supports/eventbridge-connect-standard/src/main/java/org/apache/rocketmq/connect/sink/StandardConstant.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.sink;
+
+public class StandardConstant {
+ public static final String STANDARD_PREFIX="prefix";
+}
\ No newline at end of file
diff --git a/supports/connect-standard/src/main/java/org/apache/rocketmq/connect/StandardSinkTask.java b/supports/eventbridge-connect-standard/src/main/java/org/apache/rocketmq/connect/sink/StandardSinkTask.java
similarity index 85%
rename from supports/connect-standard/src/main/java/org/apache/rocketmq/connect/StandardSinkTask.java
rename to supports/eventbridge-connect-standard/src/main/java/org/apache/rocketmq/connect/sink/StandardSinkTask.java
index 9714be2..ee02a1b 100644
--- a/supports/connect-standard/src/main/java/org/apache/rocketmq/connect/StandardSinkTask.java
+++ b/supports/eventbridge-connect-standard/src/main/java/org/apache/rocketmq/connect/sink/StandardSinkTask.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.rocketmq.connect;
+package org.apache.rocketmq.connect.sink;
import com.google.gson.Gson;
import io.openmessaging.KeyValue;
@@ -26,11 +26,13 @@
public class StandardSinkTask extends SinkTask {
+ private String prefix;
+
@Override public void put(List<ConnectRecord> sinkRecords) throws ConnectException {
if (sinkRecords == null || sinkRecords.isEmpty()) {
return;
}
- sinkRecords.forEach(sinkRecord -> System.out.println(new Gson().toJson(sinkRecord)));
+ sinkRecords.forEach(sinkRecord -> System.out.println(prefix + ":" + new Gson().toJson(sinkRecord.getData())));
}
@Override public void pause() {
@@ -46,7 +48,7 @@
}
@Override public void init(KeyValue config) {
-
+ prefix = config.getString(StandardConstant.STANDARD_PREFIX);
}
@Override public void stop() {