feat:add runtime demo.
diff --git a/README.md b/README.md
index 505be7d..6dac851 100644
--- a/README.md
+++ b/README.md
@@ -88,13 +88,13 @@
-H "ce-datacontenttype:application/json" \
-H "ce-time:2018-04-05T17:31:00Z" \
-H "ce-aliyuneventbusname:demo-bus" \
--d 'test'
+-d 'A test recrod.'
```
* Check if the local file received a write event
-In addition, by default, the system will create a demo rule for you to subscribe and push to the file. You can check whether there are events received in the directory:~/demo.eventbridge
+In addition, by default, the system will create a demo rule for you to subscribe and push to the file. You can check whether there are events received in the directory:~/demo
+![img.png](docs/cn/images/demo.png)
-![img.png](img.png)
-
->> 链接:为什么输出:test,...
+Why does the file output the data attribute of CloudEvent instead of other attributes?This is because the configuration in the demo rule is to output "$.data" in CloudEvent to the file line.
+You can refer to this [document](docs/CreateFileTarget.md) to configure and modify event targets.
diff --git a/adapter/api/src/main/java/org/apache/rocketmq/eventbridge/adapter/api/controller/EventBusController.java b/adapter/api/src/main/java/org/apache/rocketmq/eventbridge/adapter/api/controller/EventBusController.java
index 8f8388c..79c4a21 100644
--- a/adapter/api/src/main/java/org/apache/rocketmq/eventbridge/adapter/api/controller/EventBusController.java
+++ b/adapter/api/src/main/java/org/apache/rocketmq/eventbridge/adapter/api/controller/EventBusController.java
@@ -95,7 +95,7 @@
paginationResult.getData()
.forEach(eventBus -> {
EventBusDTO eventBusDTO = new EventBusDTO();
- eventBusDTO.setEventBusName(eventBusDTO.getEventBusName());
+ eventBusDTO.setEventBusName(eventBus.getName());
eventBusDTO.setDescription(eventBus.getDescription());
eventBuses.add(eventBusDTO);
});
diff --git a/adapter/api/src/main/java/org/apache/rocketmq/eventbridge/adapter/api/dto/bus/ListEventBusesRequest.java b/adapter/api/src/main/java/org/apache/rocketmq/eventbridge/adapter/api/dto/bus/ListEventBusesRequest.java
index 48b0932..d1590fe 100644
--- a/adapter/api/src/main/java/org/apache/rocketmq/eventbridge/adapter/api/dto/bus/ListEventBusesRequest.java
+++ b/adapter/api/src/main/java/org/apache/rocketmq/eventbridge/adapter/api/dto/bus/ListEventBusesRequest.java
@@ -25,7 +25,7 @@
public class ListEventBusesRequest extends BaseRequest {
@SerializedName("NextToken")
- private String nextToken;
+ private String nextToken = "0";
@SerializedName("MaxResults")
private int maxResults;
diff --git a/adapter/persistence/src/main/java/org/apache/rocketmq/eventbridge/adapter/persistence/FlywayConfig.java b/adapter/persistence/src/main/java/org/apache/rocketmq/eventbridge/adapter/persistence/FlywayConfig.java
new file mode 100644
index 0000000..5ed64d3
--- /dev/null
+++ b/adapter/persistence/src/main/java/org/apache/rocketmq/eventbridge/adapter/persistence/FlywayConfig.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.eventbridge.adapter.persistence;
+
+import javax.sql.DataSource;
+import org.flywaydb.core.Flyway;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+
+@Configuration
+public class FlywayConfig {
+
+ @Autowired
+ private DataSource dataSource;
+
+ @Bean("flyway")
+ public Flyway Flyway() {
+ Flyway flyway = Flyway.configure()
+ .dataSource(dataSource)
+ .cleanDisabled(Boolean.TRUE)
+ .createSchemas(Boolean.TRUE)
+ .validateMigrationNaming(Boolean.TRUE)
+ .baselineOnMigrate(Boolean.TRUE)
+ .placeholderReplacement(Boolean.FALSE)
+ .load();
+ flyway.migrate();
+ return flyway;
+ }
+}
\ No newline at end of file
diff --git a/adapter/persistence/src/main/java/org/apache/rocketmq/eventbridge/adapter/persistence/target/mybatis/repository/MybatisEventTargetRunnerRepository.java b/adapter/persistence/src/main/java/org/apache/rocketmq/eventbridge/adapter/persistence/target/mybatis/repository/MybatisEventTargetRunnerRepository.java
index 0c92aa4..97bfb88 100644
--- a/adapter/persistence/src/main/java/org/apache/rocketmq/eventbridge/adapter/persistence/target/mybatis/repository/MybatisEventTargetRunnerRepository.java
+++ b/adapter/persistence/src/main/java/org/apache/rocketmq/eventbridge/adapter/persistence/target/mybatis/repository/MybatisEventTargetRunnerRepository.java
@@ -74,9 +74,6 @@
@Override
public List<EventTargetRunner> listEventTargetRunners(String accountId, String eventBusName, String eventRuleName) {
- if (StringUtils.isBlank(accountId) || StringUtils.isBlank(eventBusName) || StringUtils.isBlank(eventRuleName)) {
- return Lists.newArrayListWithCapacity(0);
- }
List<EventTargetRunnerDO> eventTargetRunnerDOS = eventTargetRunnerMapper.listEventTargetRunners(accountId, eventBusName, eventRuleName);
if (eventTargetRunnerDOS == null || eventTargetRunnerDOS.isEmpty()) {
return Lists.newArrayListWithCapacity(0);
diff --git a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/EventRuleTransfer.java b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/EventRuleTransfer.java
index d7e41fc..79dfd00 100644
--- a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/EventRuleTransfer.java
+++ b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/EventRuleTransfer.java
@@ -25,10 +25,9 @@
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import javax.annotation.PostConstruct;
-
import org.apache.commons.collections.MapUtils;
-import org.apache.rocketmq.eventbridge.adapter.runtime.boot.common.OffsetManager;
import org.apache.rocketmq.eventbridge.adapter.runtime.boot.common.CirculatorContext;
+import org.apache.rocketmq.eventbridge.adapter.runtime.boot.common.OffsetManager;
import org.apache.rocketmq.eventbridge.adapter.runtime.boot.transfer.TransformEngine;
import org.apache.rocketmq.eventbridge.adapter.runtime.common.ServiceThread;
import org.apache.rocketmq.eventbridge.adapter.runtime.error.ErrorHandler;
@@ -68,27 +67,29 @@
@Override
public void run() {
+ List<ConnectRecord> afterTransformConnect= Lists.newArrayList();
while (!stopped) {
- Map<String, List<ConnectRecord>> eventRecordMap = circulatorContext.takeEventRecords(batchSize);
- if(MapUtils.isEmpty(eventRecordMap)){
- logger.info("listen eventRecords is empty, continue by curTime - {}", System.currentTimeMillis());
- this.waitForRunning(1000);
- continue;
- }
- Map<String, TransformEngine<ConnectRecord>> latestTransformMap = circulatorContext.getTaskTransformMap();
- if (MapUtils.isEmpty(latestTransformMap)) {
- logger.warn("latest transform engine is empty, continue by curTime - {}", System.currentTimeMillis());
- this.waitForRunning(3000);
- continue;
- }
+ try {
+ Map<String, List<ConnectRecord>> eventRecordMap = circulatorContext.takeEventRecords(batchSize);
+ if (MapUtils.isEmpty(eventRecordMap)) {
+ logger.trace("listen eventRecords is empty, continue by curTime - {}", System.currentTimeMillis());
+ this.waitForRunning(1000);
+ continue;
+ }
+ Map<String, TransformEngine<ConnectRecord>> latestTransformMap = circulatorContext.getTaskTransformMap();
+ if (MapUtils.isEmpty(latestTransformMap)) {
+ logger.warn("latest transform engine is empty, continue by curTime - {}", System.currentTimeMillis());
+ this.waitForRunning(3000);
+ continue;
+ }
- List<ConnectRecord> afterTransformConnect = Lists.newArrayList();
- List<CompletableFuture<Void>> completableFutures = Lists.newArrayList();
- for(String runnerName: eventRecordMap.keySet()){
- TransformEngine<ConnectRecord> curTransformEngine = latestTransformMap.get(runnerName);
- List<ConnectRecord> curEventRecords = eventRecordMap.get(runnerName);
- curEventRecords.forEach(pullRecord -> {
- CompletableFuture<Void> transformFuture = CompletableFuture.supplyAsync(() -> curTransformEngine.doTransforms(pullRecord))
+ afterTransformConnect.clear();
+ List<CompletableFuture<Void>> completableFutures = Lists.newArrayList();
+ for (String runnerName : eventRecordMap.keySet()) {
+ TransformEngine<ConnectRecord> curTransformEngine = latestTransformMap.get(runnerName);
+ List<ConnectRecord> curEventRecords = eventRecordMap.get(runnerName);
+ curEventRecords.forEach(pullRecord -> {
+ CompletableFuture<Void> transformFuture = CompletableFuture.supplyAsync(() -> curTransformEngine.doTransforms(pullRecord))
.exceptionally((exception) -> {
logger.error("transfer do transform event record failed,stackTrace-", exception);
errorHandler.handle(pullRecord, exception);
@@ -101,11 +102,9 @@
offsetManager.commit(pullRecord);
}
});
- completableFutures.add(transformFuture);
- });
- }
-
- try {
+ completableFutures.add(transformFuture);
+ });
+ }
CompletableFuture.allOf(completableFutures.toArray(new CompletableFuture[eventRecordMap.values().size()])).get();
circulatorContext.offerTargetTaskQueue(afterTransformConnect);
logger.info("offer target task queues succeed, transforms - {}", JSON.toJSONString(afterTransformConnect));
diff --git a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/EventTargetTrigger.java b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/EventTargetTrigger.java
index 3f61b10..85dae3b 100644
--- a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/EventTargetTrigger.java
+++ b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/EventTargetTrigger.java
@@ -60,7 +60,7 @@
while (!stopped) {
Map<String, List<ConnectRecord>> targetRecordMap = circulatorContext.takeTargetRecords(batchSize);
if (MapUtils.isEmpty(targetRecordMap)) {
- logger.info("current target pusher is empty");
+ logger.trace("current target pusher is empty");
this.waitForRunning(1000);
continue;
}
diff --git a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/service/TargetRunnerConfigOnDBObserver.java b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/service/TargetRunnerConfigOnDBObserver.java
index f018ce1..0e34f6a 100644
--- a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/service/TargetRunnerConfigOnDBObserver.java
+++ b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/service/TargetRunnerConfigOnDBObserver.java
@@ -48,7 +48,8 @@
@Autowired
EventTargetRepository eventTargetRepository;
- public TargetRunnerConfigOnDBObserver(EventTargetRunnerRepository eventTargetRunnerRepository, EventTargetRepository eventTargetRepository) {
+ public TargetRunnerConfigOnDBObserver(EventTargetRunnerRepository eventTargetRunnerRepository,
+ EventTargetRepository eventTargetRepository) {
this.eventTargetRunnerRepository = eventTargetRunnerRepository;
this.eventTargetRepository = eventTargetRepository;
}
@@ -56,7 +57,17 @@
@Override
@Transactional
public Set<TargetRunnerConfig> getLatestTargetRunnerConfig() {
- List<EventTargetRunner> eventTargetRunners = eventTargetRunnerRepository.listEventTargetRunners(null, null, null);
+ List<EventTargetRunner> eventTargetRunners = null;
+ try {
+ eventTargetRunners = eventTargetRunnerRepository.listEventTargetRunners(null, null, null);
+ } catch (Throwable e) {
+ if (e.getMessage().contains("not found")) {
+ return Sets.newHashSet();
+ }
+ }
+ if (eventTargetRunners == null || eventTargetRunners.isEmpty()) {
+ return Sets.newHashSet();
+ }
Set<TargetRunnerConfig> targetRunnerConfigs = Sets.newHashSet();
for (EventTargetRunner eventTargetRunner : eventTargetRunners) {
targetRunnerConfigs.add(new Gson().fromJson(eventTargetRunner.getRunContext(), TargetRunnerConfig.class));
diff --git a/adapter/storage/src/main/java/org/apache/rocketmq/eventbridge/adapter/storage/rocketmq/impl/RocketMQEventDataRepository.java b/adapter/storage/src/main/java/org/apache/rocketmq/eventbridge/adapter/storage/rocketmq/impl/RocketMQEventDataRepository.java
index 320c69e..1b95789 100644
--- a/adapter/storage/src/main/java/org/apache/rocketmq/eventbridge/adapter/storage/rocketmq/impl/RocketMQEventDataRepository.java
+++ b/adapter/storage/src/main/java/org/apache/rocketmq/eventbridge/adapter/storage/rocketmq/impl/RocketMQEventDataRepository.java
@@ -20,7 +20,6 @@
import com.google.gson.Gson;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
-import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.eventbridge.adapter.persistence.data.mybatis.dataobject.EventTopicDO;
@@ -93,10 +92,11 @@
@Cacheable(value = "topicCache")
@Override
public String getTopicName(String accountId, String eventBusName) {
- String topicName = eventDataOnRocketMQConnectAPI.buildTopicName(accountId, eventBusName);
- if (StringUtils.isBlank(AppConfig.getGlobalConfig().getDefaultDataPersistentClusterName())) {
- return topicName;
- }
+ return getTopicNameWithOutCache(accountId, eventBusName);
+ }
+
+ @Override public String getTopicNameWithOutCache(String accountId, String eventBusName) {
+ String topicName = null;
EventTopicDO eventTopicDO = eventTopicMapper.getTopic(accountId, eventBusName);
if (eventTopicDO != null) {
topicName = eventTopicDO.getName();
diff --git a/adapter/storage/src/main/java/org/apache/rocketmq/eventbridge/adapter/storage/rocketmq/runtimer/RocketMQEventSubscriber.java b/adapter/storage/src/main/java/org/apache/rocketmq/eventbridge/adapter/storage/rocketmq/runtimer/RocketMQEventSubscriber.java
index 56a4550..df9cda7 100644
--- a/adapter/storage/src/main/java/org/apache/rocketmq/eventbridge/adapter/storage/rocketmq/runtimer/RocketMQEventSubscriber.java
+++ b/adapter/storage/src/main/java/org/apache/rocketmq/eventbridge/adapter/storage/rocketmq/runtimer/RocketMQEventSubscriber.java
@@ -52,6 +52,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.context.annotation.DependsOn;
import org.springframework.core.io.support.PropertiesLoaderUtils;
import org.springframework.stereotype.Component;
@@ -69,6 +70,7 @@
* RocketMQ implement event subscriber
*/
@Component
+@DependsOn("flyway")
public class RocketMQEventSubscriber extends EventSubscriber {
private static final Logger logger = LoggerFactory.getLogger(RocketMQEventSubscriber.class);
@@ -122,7 +124,7 @@
ArrayList<MessageExt> messages = new ArrayList<>();
messageBuffer.drainTo(messages, pullBatchSize);
if (CollectionUtils.isEmpty(messages)) {
- logger.info("consumer poll message empty.");
+ logger.trace("consumer poll message empty.");
return null;
}
List<ConnectRecord> connectRecords = Lists.newArrayList();
@@ -251,7 +253,7 @@
}
private String getTopicName(SubscribeRunnerKeys subscribeRunnerKeys) {
- return eventDataRepository.getTopicName(subscribeRunnerKeys.getAccountId(), subscribeRunnerKeys.getEventBusName());
+ return eventDataRepository.getTopicNameWithOutCache(subscribeRunnerKeys.getAccountId(), subscribeRunnerKeys.getEventBusName());
}
private String createGroupName(String prefix) {
diff --git a/docs/CreateDingTalkTarget.md b/docs/CreateDingTalkTarget.md
deleted file mode 100644
index 4446433..0000000
--- a/docs/CreateDingTalkTarget.md
+++ /dev/null
@@ -1,86 +0,0 @@
-
-## Create EventBus
-
-```text
-POST /bus/createEventBus HTTP/1.1
-Host: demo.eventbridge.com
-Content-Type: application/json; charset=utf-8
-{
-"eventBusName":"demo-bus",
-"description":"a demo bus."
-}
-```
-
-## Create EventSource
-
-```text
-POST /source/createEventSource HTTP/1.1
-Host: demo.eventbridge.com
-Content-Type: application/json; charset=utf-8
-{
-"eventBusName":"demo-bus",
-"eventSourceName":"demo-source",
-"description":"A demo source."
-}
-```
-
-## Create EventRule
-
-```text
-POST /rule/createEventRule HTTP/1.1
-Host: demo.eventbridge.com
-Content-Type: application/json; charset=utf-8
-{
- "eventBusName":"demo-bus",
- "eventRuleName":"demo-rule",
- "description":"A demo rule.",
- "filterPattern":"{}"
-}
-```
-
-## Create Target
-
-This is a sample with EventBridge target:
-
-```text
-POST /target/createEventTargets HTTP/1.1
-Host: demo.eventbridge.com
-Content-Type: application/json; charset=utf-8
-{
- "eventBusName":"demo-bus",
- "eventRuleName":"demo-rule",
- "eventTargets":[
- {
- "eventTargetName":"eventbridge-target",
- "className":"acs.eventbridge",
- "config":{
- "RegionId":"cn-hangzhou",
- "AliyunEventBus":"rocketmq-eventbridge"
- }
- }
- ]
-}
-```
-
-This is a sample with DingTalk target:
-
-```text
-POST /target/createEventTargets HTTP/1.1
-Host: demo.eventbridge.com
-Content-Type: application/json; charset=utf-8
-{
- "eventBusName":"demo-bus",
- "eventRuleName":"demo-rule",
- "eventTargets":[
- {
- "eventTargetName":"dingtalk-target",
- "className":"acs.dingtalk",
- "config":{
- "WebHook":"https://oapi.dingtalk.com/robot/send?access_token=b43a54b702314415c2acdae97eda1e092528b7a9dddb31510a5b4430be2ef867",
- "SecretKey":"SEC53483bf496b8f9e0b4ab0ab669d422208e6ccfaedfd5120ea6b8426b9ecd47aa",
- "Body":"{\"template\":\"{\\\"text\\\":{\\\"content\\\":\\\"${content}\\\"},\\\"msgtype\\\":\\\"text\\\"}\",\"form\":\"TEMPLATE\",\"value\":\"{\\\"content\\\":\\\"$.data.body\\\"}\"}"
- }
- }
- ]
-}
-```
diff --git a/docs/CreateFileTarget.md b/docs/CreateFileTarget.md
new file mode 100644
index 0000000..53dff84
--- /dev/null
+++ b/docs/CreateFileTarget.md
@@ -0,0 +1,51 @@
+
+
+## Create EventBus
+
+```text
+POST /bus/createEventBus HTTP/1.1
+Host: demo.eventbridge.com
+Content-Type: application/json; charset=utf-8
+{
+"eventBusName":"demo-bus",
+"description":"a demo bus."
+}
+```
+
+## Create EventRule
+
+```text
+POST /rule/createEventRule HTTP/1.1
+Host: demo.eventbridge.com
+Content-Type: application/json; charset=utf-8
+{
+ "eventBusName":"demo-bus",
+ "eventRuleName":"demo-rule",
+ "description":"A demo rule.",
+ "filterPattern":"{}"
+}
+```
+
+## Create Target
+
+This is a sample with EventBridge target:
+
+```text
+POST /target/createEventTargets HTTP/1.1
+Host: demo.eventbridge.com
+Content-Type: application/json; charset=utf-8
+{
+ "eventBusName":"demo-bus",
+ "eventRuleName":"demo-rule",
+ "eventTargets":[
+ {
+ "eventTargetName":"demo-target",
+ "className":"acs.eventbridge",
+ "config":{
+ "fileName":"~/demo",
+ "line":"{ \"form\":\"JSONPATH\", \"value\":\"$.data\"}"
+ }
+ }
+ ]
+}
+```
diff --git a/docs/cn/images/demo.png b/docs/cn/images/demo.png
new file mode 100644
index 0000000..4f9c1a3
--- /dev/null
+++ b/docs/cn/images/demo.png
Binary files differ
diff --git a/domain/src/main/java/org/apache/rocketmq/eventbridge/domain/storage/EventDataRepository.java b/domain/src/main/java/org/apache/rocketmq/eventbridge/domain/storage/EventDataRepository.java
index 55a412e..550275d 100644
--- a/domain/src/main/java/org/apache/rocketmq/eventbridge/domain/storage/EventDataRepository.java
+++ b/domain/src/main/java/org/apache/rocketmq/eventbridge/domain/storage/EventDataRepository.java
@@ -59,4 +59,11 @@
*/
String getTopicName(String accountId, String eventBusName);
+ /**
+ * @param accountId
+ * @param eventBusName
+ * @return
+ */
+ String getTopicNameWithOutCache(String accountId, String eventBusName);
+
}
diff --git a/infrastructure/src/main/java/org/apache/rocketmq/eventbridge/infrastructure/validate/DefaultAuthValidation.java b/infrastructure/src/main/java/org/apache/rocketmq/eventbridge/infrastructure/validate/DefaultAuthValidation.java
index f85517d..5738a75 100644
--- a/infrastructure/src/main/java/org/apache/rocketmq/eventbridge/infrastructure/validate/DefaultAuthValidation.java
+++ b/infrastructure/src/main/java/org/apache/rocketmq/eventbridge/infrastructure/validate/DefaultAuthValidation.java
@@ -31,7 +31,7 @@
@Override
public Context validate(ServerHttpRequest request, Context ctx) {
- String resourceOwnerId = "defaultResourceOwnerId";
+ String resourceOwnerId = "default";
List<String> resourceOwnerIds = request.getHeaders().get(HEADER_KEY_RESOURCE_OWNER_ACCOUNT_ID);
if (resourceOwnerIds != null && !resourceOwnerIds.isEmpty()) {
//throw new EventBridgeException(DefaultErrorCode.LoginFailed);
diff --git a/pom.xml b/pom.xml
index 5a89fba..7683fb8 100644
--- a/pom.xml
+++ b/pom.xml
@@ -95,6 +95,7 @@
<jacoco-maven-plugin.version>0.8.5</jacoco-maven-plugin.version>
<maven-surefire-plugin.version>2.19.1</maven-surefire-plugin.version>
<rocketmq.version>5.1.0</rocketmq.version>
+ <flyway.version>8.5.7</flyway.version>
</properties>
<modules>
@@ -148,6 +149,11 @@
<artifactId>rocketmq-eventbridge-infrastructure</artifactId>
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.rocketmq</groupId>
+ <artifactId>rocketmq-test-demo</artifactId>
+ <version>${project.version}</version>
+ </dependency>
<!-- Framework -->
<dependency>
@@ -320,6 +326,11 @@
<version>${mockito.version}</version>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.flywaydb</groupId>
+ <artifactId>flyway-core</artifactId>
+ <version>${flyway.version}</version>
+ </dependency>
</dependencies>
</dependencyManagement>
diff --git a/start/pom.xml b/start/pom.xml
index 2bf80b0..e1e1995 100644
--- a/start/pom.xml
+++ b/start/pom.xml
@@ -54,6 +54,10 @@
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-eventbridge-infrastructure</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.apache.rocketmq</groupId>
+ <artifactId>rocketmq-test-demo</artifactId>
+ </dependency>
<!-- Framework -->
<dependency>
<groupId>org.springframework.boot</groupId>
diff --git a/start/src/main/java/org/apache/rocketmq/eventbridge/Main.java b/start/src/main/java/org/apache/rocketmq/eventbridge/Main.java
index 05344e6..fe8c642 100644
--- a/start/src/main/java/org/apache/rocketmq/eventbridge/Main.java
+++ b/start/src/main/java/org/apache/rocketmq/eventbridge/Main.java
@@ -18,9 +18,10 @@
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
+import org.springframework.boot.autoconfigure.flyway.FlywayAutoConfiguration;
import org.springframework.cache.annotation.EnableCaching;
-@SpringBootApplication(scanBasePackages = "org.apache.rocketmq.eventbridge.*")
+@SpringBootApplication(scanBasePackages = "org.apache.rocketmq.eventbridge.*",exclude = {FlywayAutoConfiguration.class})
@EnableCaching
public class Main {
public static void main(String[] args) {
diff --git a/start/src/main/resources/application.properties b/start/src/main/resources/application.properties
index dc13acc..5911b30 100644
--- a/start/src/main/resources/application.properties
+++ b/start/src/main/resources/application.properties
@@ -24,7 +24,6 @@
#spring.datasource.hikari.password=xxxxx
mybatis.configuration.log-impl=org.apache.ibatis.logging.stdout.StdOutImpl
## flyway
-spring.flyway.placeholderReplacement=false
## rocketmq
rocketmq.namesrvAddr=localhost:9876
@@ -33,10 +32,10 @@
runtime.config.mode=DB
runtime.storage.mode=ROCKETMQ
rumtime.name=eventbridge-runtimer
-runtime.pluginpath=/Users/Local/eventbridge/plugin
+runtime.pluginpath=~/eventbridge/plugin
## log
app.name=rocketmqeventbridge
log.level=INFO
-log.path=/Users/Local/logs
\ No newline at end of file
+log.path=~/logs
\ No newline at end of file
diff --git a/test/demo/pom.xml b/test/demo/pom.xml
new file mode 100644
index 0000000..80b3bf8
--- /dev/null
+++ b/test/demo/pom.xml
@@ -0,0 +1,120 @@
+<!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor
+ license agreements. See the NOTICE file distributed with this work for additional
+ information regarding copyright ownership. The ASF licenses this file to
+ You under the Apache License, Version 2.0 (the "License"); you may not use
+ this file except in compliance with the License. You may obtain a copy of
+ the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required
+ by applicable law or agreed to in writing, software distributed under the
+ License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS
+ OF ANY KIND, either express or implied. See the License for the specific
+ language governing permissions and limitations under the License. -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <artifactId>rocketmq-eventbridge-test</artifactId>
+ <groupId>org.apache.rocketmq</groupId>
+ <version>1.0.0</version>
+ <relativePath>../pom.xml</relativePath>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+ <artifactId>rocketmq-test-demo</artifactId>
+
+ <properties>
+ <jakarta.version>2.1.6</jakarta.version>
+ <jersey.version>2.34</jersey.version>
+ <reactor.version>3.4.14</reactor.version>
+ <httpcore.version>4.4.9</httpcore.version>
+ </properties>
+
+ <dependencies>
+ <!-- Project modules -->
+ <dependency>
+ <groupId>org.apache.rocketmq</groupId>
+ <artifactId>rocketmq-eventbridge-adapter-persistence</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.rocketmq</groupId>
+ <artifactId>rocketmq-eventbridge-adapter-rpc</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.rocketmq</groupId>
+ <artifactId>rocketmq-eventbridge-adapter-api</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.rocketmq</groupId>
+ <artifactId>rocketmq-eventbridge-domain</artifactId>
+ </dependency>
+ <!-- Framework -->
+ <dependency>
+ <groupId>org.springframework.boot</groupId>
+ <artifactId>spring-boot-starter-webflux</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.springframework.boot</groupId>
+ <artifactId>spring-boot-autoconfigure</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>jakarta.ws.rs</groupId>
+ <artifactId>jakarta.ws.rs-api</artifactId>
+ <version>${jakarta.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.glassfish.jersey.core</groupId>
+ <artifactId>jersey-common</artifactId>
+ <version>${jersey.version}</version>
+ </dependency>
+
+ <!-- tools -->
+ <dependency>
+ <groupId>com.google.code.gson</groupId>
+ <artifactId>gson</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>io.cloudevents</groupId>
+ <artifactId>cloudevents-json-jackson</artifactId>
+ <version>${cloudevents.version}</version>
+ </dependency>
+ <dependency>
+ <artifactId>cloudevents-http-basic</artifactId>
+ <groupId>io.cloudevents</groupId>
+ <version>${cloudevents.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>io.cloudevents</groupId>
+ <artifactId>cloudevents-http-restful-ws</artifactId>
+ <version>${cloudevents.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.httpcomponents</groupId>
+ <artifactId>httpcore</artifactId>
+ <version>${httpcore.version}</version>
+ </dependency>
+ <!-- Test -->
+ <dependency>
+ <groupId>io.projectreactor</groupId>
+ <artifactId>reactor-test</artifactId>
+ <version>${reactor.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-core</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.hibernate.validator</groupId>
+ <artifactId>hibernate-validator</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.springframework.boot</groupId>
+ <artifactId>spring-boot-starter-aop</artifactId>
+ </dependency>
+ </dependencies>
+
+</project>
\ No newline at end of file
diff --git a/test/demo/src/main/java/org/apache/rocketmq/eventbridge/demo/DefaultDemo.java b/test/demo/src/main/java/org/apache/rocketmq/eventbridge/demo/DefaultDemo.java
new file mode 100644
index 0000000..ce71c7a
--- /dev/null
+++ b/test/demo/src/main/java/org/apache/rocketmq/eventbridge/demo/DefaultDemo.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.eventbridge.demo;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import java.util.List;
+import java.util.Map;
+import javax.annotation.PostConstruct;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.rocketmq.eventbridge.domain.common.exception.EventBridgeErrorCode;
+import org.apache.rocketmq.eventbridge.domain.model.bus.EventBusService;
+import org.apache.rocketmq.eventbridge.domain.model.rule.EventRuleService;
+import org.apache.rocketmq.eventbridge.domain.model.target.EventTarget;
+import org.apache.rocketmq.eventbridge.domain.model.target.EventTargetService;
+import org.apache.rocketmq.eventbridge.exception.EventBridgeException;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.context.annotation.DependsOn;
+import org.springframework.stereotype.Service;
+
+@Service
+@Slf4j
+@DependsOn("flyway")
+public class DefaultDemo {
+
+ @Autowired
+ EventBusService eventBusService;
+
+ @Autowired
+ EventRuleService eventRuleService;
+
+ @Autowired
+ EventTargetService eventTargetService;
+
+ private static final String DEFAULT_ACCOUNT_ID = "default";
+
+ private static final String DEFAULT_EVENT_TOPIC_NAME = "demo-bus";
+
+ private static final String DEFAULT_EVENT_RULE_NAME = "demo-rule";
+
+ private static final String DEFAULT_EVENT_TARGET_NAME = "demo-target";
+
+ private static final String DEFAULT_EVENT_TARGET_CLASS = "file";
+
+ @PostConstruct
+ public void initDemo() {
+ log.info("init demo");
+ initEventBus();
+ initEventRule();
+ intEventTarget();
+
+ }
+
+ private void initEventBus() {
+ try {
+ eventBusService.getEventBus(DEFAULT_ACCOUNT_ID, DEFAULT_EVENT_TOPIC_NAME);
+ } catch (EventBridgeException e) {
+ if (EventBridgeErrorCode.EventBusNotExist.getCode().equals(e.getCode())) {
+ eventBusService.createEventBus(DEFAULT_ACCOUNT_ID, DEFAULT_EVENT_TOPIC_NAME, "A demo bus.");
+ log.info("Create demo eventbus:{}", DEFAULT_EVENT_TOPIC_NAME);
+ }
+ }
+
+ }
+
+ private void initEventRule() {
+ try {
+ eventRuleService.getEventRule(DEFAULT_ACCOUNT_ID, DEFAULT_EVENT_TOPIC_NAME, DEFAULT_EVENT_RULE_NAME);
+ } catch (EventBridgeException e) {
+ if (EventBridgeErrorCode.EventRuleNotExist.getCode().equals(e.getCode())) {
+ eventRuleService.createEventRule(DEFAULT_ACCOUNT_ID, DEFAULT_EVENT_TOPIC_NAME, DEFAULT_EVENT_RULE_NAME, "A demo rule.", "{}");
+ log.info("Create demo event rule:{}", DEFAULT_EVENT_RULE_NAME);
+ }
+ }
+ }
+
+ private void intEventTarget() {
+ List<EventTarget> eventTargets = eventTargetService.listTargets(DEFAULT_ACCOUNT_ID, DEFAULT_EVENT_TOPIC_NAME, DEFAULT_EVENT_RULE_NAME);
+ if (eventTargets == null || eventTargets.isEmpty()) {
+ List<EventTarget> eventTargetList = Lists.newArrayList();
+ Map<String, Object> config = Maps.newHashMap();
+ config.put("fileName", System.getProperty("user.home") + "/demo");
+ config.put("line", "{\"form\":\"JSONPATH\",\"value\":\"$.data\"}");
+ EventTarget eventTarget = EventTarget.builder().name(DEFAULT_EVENT_TARGET_NAME).className(DEFAULT_EVENT_TARGET_CLASS).config(config).build();
+ eventTargetList.add(eventTarget);
+ eventTargetService.createTargets(DEFAULT_ACCOUNT_ID, DEFAULT_EVENT_TOPIC_NAME, DEFAULT_EVENT_RULE_NAME, eventTargetList);
+ }
+ }
+
+}
\ No newline at end of file
diff --git a/test/demo/src/test/java/org/apache/rocketmq/eventbridge/adapter/api/controller/ApiDestinationDTOControllerTest.java b/test/demo/src/test/java/org/apache/rocketmq/eventbridge/adapter/api/controller/ApiDestinationDTOControllerTest.java
new file mode 100644
index 0000000..bdfc870
--- /dev/null
+++ b/test/demo/src/test/java/org/apache/rocketmq/eventbridge/adapter/api/controller/ApiDestinationDTOControllerTest.java
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.eventbridge.adapter.api.controller;
+
+import com.google.common.collect.Lists;
+import java.util.Date;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+import javax.validation.ConstraintViolation;
+import javax.validation.Validator;
+import org.apache.rocketmq.eventbridge.adapter.api.dto.apidestination.CreateApiDestinationRequest;
+import org.apache.rocketmq.eventbridge.adapter.api.dto.apidestination.CreateApiDestinationResponse;
+import org.apache.rocketmq.eventbridge.adapter.api.dto.apidestination.DeleteApiDestinationRequest;
+import org.apache.rocketmq.eventbridge.adapter.api.dto.apidestination.DeleteApiDestinationResponse;
+import org.apache.rocketmq.eventbridge.adapter.api.dto.apidestination.GetApiDestinationRequest;
+import org.apache.rocketmq.eventbridge.adapter.api.dto.apidestination.GetApiDestinationResponse;
+import org.apache.rocketmq.eventbridge.adapter.api.dto.apidestination.ListApiDestinationsRequest;
+import org.apache.rocketmq.eventbridge.adapter.api.dto.apidestination.ListApiDestinationsResponse;
+import org.apache.rocketmq.eventbridge.adapter.api.dto.apidestination.UpdateApiDestinationRequest;
+import org.apache.rocketmq.eventbridge.adapter.api.dto.apidestination.UpdateApiDestinationResponse;
+import org.apache.rocketmq.eventbridge.domain.common.exception.EventBridgeErrorCode;
+import org.apache.rocketmq.eventbridge.domain.model.PaginationResult;
+import org.apache.rocketmq.eventbridge.domain.model.apidestination.ApiDestinationDTO;
+import org.apache.rocketmq.eventbridge.domain.model.apidestination.ApiDestinationService;
+import org.apache.rocketmq.eventbridge.domain.model.apidestination.parameter.HttpApiParameters;
+import org.apache.rocketmq.eventbridge.domain.rpc.AccountAPI;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.InjectMocks;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.junit.MockitoJUnitRunner;
+import reactor.core.publisher.Mono;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
+
+@RunWith(MockitoJUnitRunner.class)
+public class ApiDestinationDTOControllerTest {
+
+ @InjectMocks
+ private ApiDestinationController apiDestinationController;
+ @Mock
+ private ApiDestinationService apiDestinationService;
+ @Mock
+ private Validator validator;
+ @Mock
+ private AccountAPI accountAPI;
+
+ @Before
+ public void testBefore() {
+ Mockito.when(accountAPI.getResourceOwnerAccountId(any()))
+ .thenReturn(UUID.randomUUID()
+ .toString());
+ }
+
+ @Test
+ public void testCreateApiDestination() {
+ Mockito.when(apiDestinationService.createApiDestination(any()))
+ .thenReturn(UUID.randomUUID()
+ .toString());
+ CreateApiDestinationRequest createApiDestinationRequest = new CreateApiDestinationRequest();
+ createApiDestinationRequest.setApiDestinationName(UUID.randomUUID()
+ .toString());
+ createApiDestinationRequest.setDescription(UUID.randomUUID()
+ .toString());
+ HttpApiParameters httpApiParameters = new HttpApiParameters();
+ httpApiParameters.setEndpoint(UUID.randomUUID()
+ .toString());
+ httpApiParameters.setMethod(UUID.randomUUID()
+ .toString());
+ createApiDestinationRequest.setHttpApiParameters(httpApiParameters);
+ createApiDestinationRequest.setInvocationRateLimitPerSecond(11);
+ final Mono<CreateApiDestinationResponse> apiDestination = apiDestinationController.createApiDestination(
+ createApiDestinationRequest);
+ Assert.assertEquals(apiDestination.block()
+ .getCode(), EventBridgeErrorCode.Success.getCode());
+ }
+
+ @Test
+ public void testUpdateApiDestination() {
+ Set<ConstraintViolation<UpdateApiDestinationRequest>> constraintViolations = new HashSet<>();
+ Mockito.when(validator.validate(any(UpdateApiDestinationRequest.class)))
+ .thenReturn(constraintViolations);
+ Mockito.when(apiDestinationService.updateApiDestination(any()))
+ .thenReturn(Boolean.TRUE);
+ UpdateApiDestinationRequest updateApiDestinationRequest = new UpdateApiDestinationRequest();
+ updateApiDestinationRequest.setApiDestinationName(UUID.randomUUID()
+ .toString());
+ updateApiDestinationRequest.setDescription(UUID.randomUUID()
+ .toString());
+ HttpApiParameters httpApiParameters = new HttpApiParameters();
+ httpApiParameters.setEndpoint(UUID.randomUUID()
+ .toString());
+ httpApiParameters.setMethod(UUID.randomUUID()
+ .toString());
+ updateApiDestinationRequest.setHttpApiParameters(httpApiParameters);
+ updateApiDestinationRequest.setInvocationRateLimitPerSecond(11);
+ final Mono<UpdateApiDestinationResponse> updateApiDestinationResponse
+ = apiDestinationController.updateApiDestination(updateApiDestinationRequest);
+ Assert.assertEquals(updateApiDestinationResponse.block()
+ .getCode(), EventBridgeErrorCode.Success.getCode());
+ }
+
+ @Test
+ public void testGetApiDestination() {
+ Set<ConstraintViolation<GetApiDestinationRequest>> constraintViolations = new HashSet<>();
+ Mockito.when(validator.validate(any(GetApiDestinationRequest.class)))
+ .thenReturn(constraintViolations);
+ ApiDestinationDTO eventApiDestinationDTO = new ApiDestinationDTO();
+ eventApiDestinationDTO.setName(UUID.randomUUID()
+ .toString());
+ eventApiDestinationDTO.setGmtCreate(new Date());
+ Mockito.when(apiDestinationService.getApiDestination(any(), any()))
+ .thenReturn(eventApiDestinationDTO);
+ GetApiDestinationRequest getApiDestinationRequest = new GetApiDestinationRequest();
+ getApiDestinationRequest.setApiDestinationName(UUID.randomUUID()
+ .toString());
+ final Mono<GetApiDestinationResponse> apiDestination = apiDestinationController.getApiDestination(
+ getApiDestinationRequest);
+ Assert.assertEquals(apiDestination.block()
+ .getCode(), EventBridgeErrorCode.Success.getCode());
+ }
+
+ @Test
+ public void testDeleteApiDestination() {
+ Set<ConstraintViolation<DeleteApiDestinationRequest>> constraintViolations = new HashSet<>();
+ Mockito.when(validator.validate(any(DeleteApiDestinationRequest.class)))
+ .thenReturn(constraintViolations);
+ Mockito.when(apiDestinationService.deleteApiDestination(any(), any()))
+ .thenReturn(Boolean.TRUE);
+ DeleteApiDestinationRequest deleteApiDestinationRequest = new DeleteApiDestinationRequest();
+ deleteApiDestinationRequest.setApiDestinationName(UUID.randomUUID()
+ .toString());
+ final Mono<DeleteApiDestinationResponse> deleteApiDestinationResponse
+ = apiDestinationController.deleteApiDestination(deleteApiDestinationRequest);
+ Assert.assertEquals(deleteApiDestinationResponse.block()
+ .getCode(), EventBridgeErrorCode.Success.getCode());
+ }
+
+ @Test
+ public void testListApiDestinations() {
+ Set<ConstraintViolation<ListApiDestinationsRequest>> constraintViolations = new HashSet<>();
+ Mockito.when(validator.validate(any(ListApiDestinationsRequest.class)))
+ .thenReturn(constraintViolations);
+ PaginationResult<List<ApiDestinationDTO>> result = new PaginationResult();
+ List<ApiDestinationDTO> apiDestinationDTOList = Lists.newArrayList();
+ ApiDestinationDTO apiDestinationDTO = new ApiDestinationDTO();
+ apiDestinationDTO.setName(UUID.randomUUID()
+ .toString());
+ apiDestinationDTO.setGmtCreate(new Date());
+ apiDestinationDTOList.add(apiDestinationDTO);
+ result.setData(apiDestinationDTOList);
+ result.setTotal(9);
+ result.setNextToken("0");
+ Mockito.when(apiDestinationService.listApiDestinations(any(), any(), any(), anyInt()))
+ .thenReturn(result);
+ ListApiDestinationsRequest listApiDestinationsRequest = new ListApiDestinationsRequest();
+ listApiDestinationsRequest.setApiDestinationNamePrefix(UUID.randomUUID()
+ .toString());
+ listApiDestinationsRequest.setNextToken("0");
+ listApiDestinationsRequest.setMaxResults(10);
+ final Mono<ListApiDestinationsResponse> listApiDestinationsResponse
+ = apiDestinationController.listApiDestinations(listApiDestinationsRequest);
+ Assert.assertEquals(listApiDestinationsResponse.block()
+ .getCode(), EventBridgeErrorCode.Success.getCode());
+ }
+}
diff --git a/test/demo/src/test/java/org/apache/rocketmq/eventbridge/adapter/api/controller/ConnectionControllerTest.java b/test/demo/src/test/java/org/apache/rocketmq/eventbridge/adapter/api/controller/ConnectionControllerTest.java
new file mode 100644
index 0000000..dd2e27f
--- /dev/null
+++ b/test/demo/src/test/java/org/apache/rocketmq/eventbridge/adapter/api/controller/ConnectionControllerTest.java
@@ -0,0 +1,285 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.eventbridge.adapter.api.controller;
+
+import com.google.common.collect.Lists;
+import java.util.Date;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+import javax.validation.ConstraintViolation;
+import javax.validation.Validator;
+import org.apache.rocketmq.eventbridge.adapter.api.dto.connection.CreateConnectionRequest;
+import org.apache.rocketmq.eventbridge.adapter.api.dto.connection.CreateConnectionResponse;
+import org.apache.rocketmq.eventbridge.adapter.api.dto.connection.DeleteConnectionRequest;
+import org.apache.rocketmq.eventbridge.adapter.api.dto.connection.DeleteConnectionResponse;
+import org.apache.rocketmq.eventbridge.adapter.api.dto.connection.GetConnectionRequest;
+import org.apache.rocketmq.eventbridge.adapter.api.dto.connection.GetConnectionResponse;
+import org.apache.rocketmq.eventbridge.adapter.api.dto.connection.ListConnectionRequest;
+import org.apache.rocketmq.eventbridge.adapter.api.dto.connection.ListConnectionResponse;
+import org.apache.rocketmq.eventbridge.adapter.api.dto.connection.ListEnumsResponse;
+import org.apache.rocketmq.eventbridge.adapter.api.dto.connection.UpdateConnectionRequest;
+import org.apache.rocketmq.eventbridge.adapter.api.dto.connection.UpdateConnectionResponse;
+import org.apache.rocketmq.eventbridge.domain.common.enums.AuthorizationTypeEnum;
+import org.apache.rocketmq.eventbridge.domain.common.enums.NetworkTypeEnum;
+import org.apache.rocketmq.eventbridge.domain.common.exception.EventBridgeErrorCode;
+import org.apache.rocketmq.eventbridge.domain.model.PaginationResult;
+import org.apache.rocketmq.eventbridge.domain.model.connection.ConnectionDTO;
+import org.apache.rocketmq.eventbridge.domain.model.connection.ConnectionService;
+import org.apache.rocketmq.eventbridge.domain.model.connection.parameter.AuthParameters;
+import org.apache.rocketmq.eventbridge.domain.model.connection.parameter.BasicAuthParameters;
+import org.apache.rocketmq.eventbridge.domain.model.connection.parameter.NetworkParameters;
+import org.apache.rocketmq.eventbridge.domain.rpc.AccountAPI;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.BDDMockito;
+import org.mockito.InjectMocks;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.junit.MockitoJUnitRunner;
+import reactor.core.publisher.Mono;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.anyString;
+
+@RunWith(MockitoJUnitRunner.class)
+public class ConnectionControllerTest {
+
+ @InjectMocks
+ private ConnectionController connectionController;
+ @Mock
+ private ConnectionService connectionService;
+ @Mock
+ private Validator validator;
+ @Mock
+ private AccountAPI accountAPI;
+
+ @Before
+ public void testBefore() throws Exception {
+ Mockito.when(accountAPI.getResourceOwnerAccountId(any()))
+ .thenReturn(UUID.randomUUID()
+ .toString());
+ }
+
+ @Test
+ public void testCreateConnection() {
+ Mockito.when(connectionService.createConnection(any(ConnectionDTO.class)))
+ .thenReturn(UUID.randomUUID()
+ .toString());
+ Set<ConstraintViolation<CreateConnectionRequest>> constraintViolations = new HashSet<>();
+ Mockito.when(validator.validate(any(CreateConnectionRequest.class)))
+ .thenReturn(constraintViolations);
+ CreateConnectionRequest createConnectionRequest = new CreateConnectionRequest();
+ createConnectionRequest.setConnectionName(UUID.randomUUID()
+ .toString());
+ createConnectionRequest.setDescription(UUID.randomUUID()
+ .toString());
+ NetworkParameters networkParameters = new NetworkParameters();
+ networkParameters.setNetworkType(NetworkTypeEnum.PUBLIC_NETWORK.getNetworkType());
+ networkParameters.setSecurityGroupId(UUID.randomUUID()
+ .toString());
+ networkParameters.setVpcId(UUID.randomUUID()
+ .toString());
+ networkParameters.setVswitcheId(UUID.randomUUID()
+ .toString());
+ createConnectionRequest.setNetworkParameters(networkParameters);
+ AuthParameters authParameters = new AuthParameters();
+ BasicAuthParameters basicAuthParameters = new BasicAuthParameters();
+ basicAuthParameters.setPassword(UUID.randomUUID()
+ .toString());
+ basicAuthParameters.setUsername(UUID.randomUUID()
+ .toString());
+ authParameters.setBasicAuthParameters(basicAuthParameters);
+ authParameters.setAuthorizationType(AuthorizationTypeEnum.BASIC_AUTH.getType());
+ createConnectionRequest.setAuthParameters(authParameters);
+ final Mono<CreateConnectionResponse> connection = connectionController.createConnection(
+ createConnectionRequest);
+ Assert.assertEquals(connection.block()
+ .getCode(), EventBridgeErrorCode.Success.getCode());
+ }
+
+ @Test
+ public void testDeleteConnection() {
+ Mockito.doNothing()
+ .when(connectionService)
+ .deleteConnection(anyString(), anyString());
+ Set<ConstraintViolation<DeleteConnectionRequest>> constraintViolations = new HashSet<>();
+ Mockito.when(validator.validate(any(DeleteConnectionRequest.class)))
+ .thenReturn(constraintViolations);
+ DeleteConnectionRequest deleteConnectionRequest = new DeleteConnectionRequest();
+ deleteConnectionRequest.setConnectionName(UUID.randomUUID()
+ .toString());
+ final Mono<DeleteConnectionResponse> deleteConnectionResponse = connectionController.deleteConnection(
+ deleteConnectionRequest);
+ Assert.assertEquals(deleteConnectionResponse.block()
+ .getCode(), EventBridgeErrorCode.Success.getCode());
+ }
+
+ @Test
+ public void testUpdateConnection() {
+ Mockito.doNothing()
+ .when(connectionService)
+ .updateConnection(any(ConnectionDTO.class), anyString());
+ Set<ConstraintViolation<UpdateConnectionRequest>> constraintViolations = new HashSet<>();
+ Mockito.when(validator.validate(any(UpdateConnectionRequest.class)))
+ .thenReturn(constraintViolations);
+ UpdateConnectionRequest updateConnectionRequest = new UpdateConnectionRequest();
+ updateConnectionRequest.setConnectionName(UUID.randomUUID()
+ .toString());
+ updateConnectionRequest.setDescription(UUID.randomUUID()
+ .toString());
+ NetworkParameters networkParameters = new NetworkParameters();
+ networkParameters.setNetworkType(NetworkTypeEnum.PUBLIC_NETWORK.getNetworkType());
+ networkParameters.setSecurityGroupId(UUID.randomUUID()
+ .toString());
+ networkParameters.setVpcId(UUID.randomUUID()
+ .toString());
+ networkParameters.setVswitcheId(UUID.randomUUID()
+ .toString());
+ updateConnectionRequest.setNetworkParameters(networkParameters);
+ AuthParameters authParameters = new AuthParameters();
+ BasicAuthParameters basicAuthParameters = new BasicAuthParameters();
+ basicAuthParameters.setPassword(UUID.randomUUID()
+ .toString());
+ basicAuthParameters.setUsername(UUID.randomUUID()
+ .toString());
+ authParameters.setBasicAuthParameters(basicAuthParameters);
+ authParameters.setAuthorizationType(AuthorizationTypeEnum.BASIC_AUTH.getType());
+ updateConnectionRequest.setAuthParameters(authParameters);
+ final Mono<UpdateConnectionResponse> updateConnectionResponse = connectionController.updateConnection(
+ updateConnectionRequest);
+ Assert.assertEquals(updateConnectionResponse.block()
+ .getCode(), EventBridgeErrorCode.Success.getCode());
+ }
+
+ @Test
+ public void testGetConnection() {
+ Set<ConstraintViolation<GetConnectionRequest>> constraintViolations = new HashSet<>();
+ Mockito.when(validator.validate(any(GetConnectionRequest.class)))
+ .thenReturn(constraintViolations);
+ final ConnectionDTO connectionDTO = new ConnectionDTO();
+ NetworkParameters networkParameters = new NetworkParameters();
+ networkParameters.setNetworkType(NetworkTypeEnum.PUBLIC_NETWORK.getNetworkType());
+ networkParameters.setSecurityGroupId(UUID.randomUUID()
+ .toString());
+ networkParameters.setVpcId(UUID.randomUUID()
+ .toString());
+ networkParameters.setVswitcheId(UUID.randomUUID()
+ .toString());
+ connectionDTO.setNetworkParameters(networkParameters);
+ connectionDTO.setGmtCreate(new Date());
+ List<ConnectionDTO> list = Lists.newArrayList();
+ list.add(connectionDTO);
+ AuthParameters authParameters = new AuthParameters();
+ BasicAuthParameters basicAuthParameters = new BasicAuthParameters();
+ basicAuthParameters.setPassword(UUID.randomUUID()
+ .toString());
+ basicAuthParameters.setUsername(UUID.randomUUID()
+ .toString());
+ authParameters.setBasicAuthParameters(basicAuthParameters);
+ authParameters.setAuthorizationType(AuthorizationTypeEnum.BASIC_AUTH.getType());
+ connectionDTO.setAuthParameters(authParameters);
+ BDDMockito.given(connectionService.getConnection(any(), any()))
+ .willReturn(list);
+ GetConnectionRequest getConnectionRequest = new GetConnectionRequest();
+ getConnectionRequest.setConnectionName(UUID.randomUUID()
+ .toString());
+ final Mono<GetConnectionResponse> getConnectionResponse = connectionController.getConnection(
+ getConnectionRequest);
+ Assert.assertEquals(getConnectionResponse.block()
+ .getCode(), EventBridgeErrorCode.Success.getCode());
+ }
+
+ @Test
+ public void testSelectOneConnection() {
+ Set<ConstraintViolation<GetConnectionRequest>> constraintViolations = new HashSet<>();
+ Mockito.when(validator.validate(any(GetConnectionRequest.class)))
+ .thenReturn(constraintViolations);
+ final ConnectionDTO connectionDTO = new ConnectionDTO();
+ NetworkParameters networkParameters = new NetworkParameters();
+ networkParameters.setNetworkType(NetworkTypeEnum.PUBLIC_NETWORK.getNetworkType());
+ networkParameters.setSecurityGroupId(UUID.randomUUID()
+ .toString());
+ networkParameters.setVpcId(UUID.randomUUID()
+ .toString());
+ networkParameters.setVswitcheId(UUID.randomUUID()
+ .toString());
+ connectionDTO.setNetworkParameters(networkParameters);
+ connectionDTO.setGmtCreate(new Date());
+ List<ConnectionDTO> list = Lists.newArrayList();
+ list.add(connectionDTO);
+ AuthParameters authParameters = new AuthParameters();
+ BasicAuthParameters basicAuthParameters = new BasicAuthParameters();
+ basicAuthParameters.setPassword(UUID.randomUUID()
+ .toString());
+ basicAuthParameters.setUsername(UUID.randomUUID()
+ .toString());
+ authParameters.setBasicAuthParameters(basicAuthParameters);
+ authParameters.setAuthorizationType(AuthorizationTypeEnum.BASIC_AUTH.getType());
+ connectionDTO.setAuthParameters(authParameters);
+ BDDMockito.given(connectionService.getConnection(any(), any()))
+ .willReturn(list);
+ GetConnectionRequest getConnectionRequest = new GetConnectionRequest();
+ getConnectionRequest.setConnectionName(UUID.randomUUID()
+ .toString());
+ final Mono<GetConnectionResponse> getConnectionResponse = connectionController.selectOneConnection(
+ getConnectionRequest);
+ Assert.assertEquals(getConnectionResponse.block()
+ .getCode(), EventBridgeErrorCode.Success.getCode());
+ }
+
+ @Test
+ public void testListConnections() {
+ PaginationResult<List<ConnectionDTO>> result = new PaginationResult();
+ List<ConnectionDTO> eventConnectionWithBLOBs = Lists.newArrayList();
+ ConnectionDTO eventConnection = new ConnectionDTO();
+ eventConnection.setConnectionName(UUID.randomUUID()
+ .toString());
+ eventConnection.setGmtCreate(new Date());
+ eventConnectionWithBLOBs.add(eventConnection);
+ result.setData(eventConnectionWithBLOBs);
+ result.setTotal(9);
+ result.setNextToken("0");
+ Mockito.when(connectionService.listConnections(any(), any(), any(), anyInt()))
+ .thenReturn(result);
+ Set<ConstraintViolation<ListConnectionRequest>> constraintViolations = new HashSet<>();
+ Mockito.when(validator.validate(any(ListConnectionRequest.class)))
+ .thenReturn(constraintViolations);
+ ListConnectionRequest listConnectionRequest = new ListConnectionRequest();
+ listConnectionRequest.setConnectionNamePrefix(UUID.randomUUID()
+ .toString());
+ listConnectionRequest.setNextToken("0");
+ listConnectionRequest.setMaxResults(10);
+ final Mono<ListConnectionResponse> listConnections = connectionController.listConnections(
+ listConnectionRequest);
+ Assert.assertEquals(listConnections.block()
+ .getCode(), EventBridgeErrorCode.Success.getCode());
+ }
+
+ @Test
+ public void testListEnumsResponse() {
+ final Mono<ListEnumsResponse> listEnumsResponse = connectionController.listEnumsResponse();
+ Assert.assertEquals(listEnumsResponse.block()
+ .getNetworkTypeEnums()
+ .size(), NetworkTypeEnum.values().length);
+ }
+}
diff --git a/test/demo/src/test/java/org/apache/rocketmq/eventbridge/adapter/api/converter/EventConverterAdapterTest.java b/test/demo/src/test/java/org/apache/rocketmq/eventbridge/adapter/api/converter/EventConverterAdapterTest.java
new file mode 100644
index 0000000..8e617ef
--- /dev/null
+++ b/test/demo/src/test/java/org/apache/rocketmq/eventbridge/adapter/api/converter/EventConverterAdapterTest.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.eventbridge.adapter.api.converter;
+
+import com.google.common.collect.Maps;
+import com.google.gson.Gson;
+import io.cloudevents.CloudEvent;
+import java.net.URI;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.InjectMocks;
+import org.mockito.junit.MockitoJUnitRunner;
+
+import static org.apache.http.protocol.HTTP.CONTENT_TYPE;
+
+@RunWith(MockitoJUnitRunner.class)
+public class EventConverterAdapterTest {
+ @InjectMocks
+ private EventConverterAdapter eventConverterAdapter;
+
+ @Before
+ public void before() {
+ eventConverterAdapter.getEventConverterList()
+ .add(new CloudEventBatchedConverter());
+ eventConverterAdapter.getEventConverterList()
+ .add(new CloudEventBinaryConverter());
+ eventConverterAdapter.getEventConverterList()
+ .add(new CloudEventStructuredConverter());
+ }
+
+ @Test
+ public void toEventsRequest_Binary() {
+ Map<String, String> headers = Maps.newHashMap();
+ headers.put(CONTENT_TYPE, "application/json");
+ headers.put("ce-id", UUID.randomUUID()
+ .toString());
+ headers.put("ce-source", "demo-source");
+ headers.put("ce-type", URI.create("demo:type")
+ .toString());
+ headers.put("ce-specversion", "1.0");
+ byte[] body = new String("{\n" + "\t\"a\":1,\n" + "\t\"b\":2\n" + "}").getBytes(StandardCharsets.UTF_8);
+ List<CloudEvent> cloudEventList = eventConverterAdapter.toEventsRequest(headers, body);
+ Assert.assertEquals(1, cloudEventList.size());
+ Assert.assertEquals("demo:type", cloudEventList.get(0)
+ .getType());
+ }
+
+ @Test
+ public void toEventsRequest_Structured() {
+ Map<String, String> headers = Maps.newHashMap();
+ headers.put(CONTENT_TYPE, "application/cloudevents+json");
+
+ Map<String, Object> cloudEvent = Maps.newHashMap();
+ cloudEvent.put("id", UUID.randomUUID()
+ .toString());
+ cloudEvent.put("source", "demo-source");
+ cloudEvent.put("type", URI.create("demo:type"));
+ cloudEvent.put("specversion", "1.0");
+ cloudEvent.put("data", "{\n" + "\t\"a\":1,\n" + "\t\"b\":2\n" + "}");
+ List<CloudEvent> cloudEventList = eventConverterAdapter.toEventsRequest(headers, new Gson().toJson(cloudEvent)
+ .getBytes(StandardCharsets.UTF_8));
+
+ Assert.assertEquals(1, cloudEventList.size());
+ Assert.assertEquals("demo:type", cloudEventList.get(0)
+ .getType());
+ }
+
+ @Test
+ public void toEventsRequest_Batched() {
+ Map<String, String> headers = Maps.newHashMap();
+ headers.put(CONTENT_TYPE, "application/cloudevents-batch+json");
+
+ Map<String, Object> cloudEvent1 = Maps.newHashMap();
+ cloudEvent1.put("id", UUID.randomUUID()
+ .toString());
+ cloudEvent1.put("source", "demo-source");
+ cloudEvent1.put("type", URI.create("demo:type"));
+ cloudEvent1.put("specversion", "1.0");
+ cloudEvent1.put("data", "{\n" + "\t\"a\":1,\n" + "\t\"b\":2\n" + "}");
+
+ Map<String, Object> cloudEvent2 = Maps.newHashMap();
+ cloudEvent2.put("id", UUID.randomUUID()
+ .toString());
+ cloudEvent2.put("source", "demo-source");
+ cloudEvent2.put("type", URI.create("demo:type"));
+ cloudEvent2.put("specversion", "1.0");
+ cloudEvent2.put("data", "{\n" + "\t\"a\":1,\n" + "\t\"b\":2\n" + "}");
+
+ List<CloudEvent> cloudEventList = eventConverterAdapter.toEventsRequest(headers,
+ new Gson().toJson(Arrays.asList(cloudEvent1, cloudEvent2))
+ .getBytes(StandardCharsets.UTF_8));
+
+ Assert.assertEquals(2, cloudEventList.size());
+ Assert.assertEquals("demo:type", cloudEventList.get(0)
+ .getType());
+ }
+
+}
\ No newline at end of file
diff --git a/test/demo/src/test/java/org/apache/rocketmq/eventbridge/adapter/api/converter/EventTargetConverterTest.java b/test/demo/src/test/java/org/apache/rocketmq/eventbridge/adapter/api/converter/EventTargetConverterTest.java
new file mode 100644
index 0000000..3b1b45c
--- /dev/null
+++ b/test/demo/src/test/java/org/apache/rocketmq/eventbridge/adapter/api/converter/EventTargetConverterTest.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.eventbridge.adapter.api.converter;
+
+import com.google.common.collect.Maps;
+import java.util.Map;
+import org.apache.rocketmq.eventbridge.adapter.api.dto.target.DeadLetterQueueDTO;
+import org.apache.rocketmq.eventbridge.adapter.api.dto.target.EventTargetDTO;
+import org.apache.rocketmq.eventbridge.adapter.api.dto.target.RetryStrategyDTO;
+import org.apache.rocketmq.eventbridge.adapter.api.dto.target.RunOptionsDTO;
+import org.apache.rocketmq.eventbridge.enums.ErrorToleranceEnum;
+import org.apache.rocketmq.eventbridge.enums.PushRetryStrategyEnum;
+import org.apache.rocketmq.eventbridge.domain.model.run.DeadLetterQueue;
+import org.apache.rocketmq.eventbridge.domain.model.run.RetryStrategy;
+import org.apache.rocketmq.eventbridge.domain.model.target.EventTarget;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class EventTargetConverterTest {
+
+ @Test
+ public void convertEventTargetRunners() {
+
+ }
+
+ @Test
+ public void convertEventTargetRunner() {
+ EventTargetDTO eventTargetDTO = new EventTargetDTO();
+ eventTargetDTO.setEventTargetName("targetName");
+ Map<String, Object> config = Maps.newHashMap();
+ config.put("url", "http://127.0.0.1:7001/cloudevents");
+ eventTargetDTO.setConfig(config);
+ eventTargetDTO.setClassName("http");
+
+ RunOptionsDTO runOptionsDTO = new RunOptionsDTO();
+ runOptionsDTO.setErrorsTolerance("NONE");
+ runOptionsDTO.setDeadLetterQueue(this.buildDeadLetterQueueDTO());
+ runOptionsDTO.setRetryStrategy(this.buildRetryStrategyDTO());
+ eventTargetDTO.setRunOptions(runOptionsDTO);
+
+ EventTarget eventTarget = EventTargetConverter.convertEventTarget("123456", "bus", "rule", eventTargetDTO);
+
+ Assert.assertEquals(eventTarget.getAccountId(), "123456");
+ Assert.assertEquals(eventTarget.getEventBusName(), "bus");
+ Assert.assertEquals(eventTarget.getEventRuleName(), "rule");
+ Assert.assertEquals(eventTarget.getName(), eventTargetDTO.getEventTargetName());
+ Assert.assertEquals(eventTarget.getClassName(), eventTargetDTO.getClassName());
+ Assert.assertEquals(eventTarget.getConfig(), eventTargetDTO.getConfig());
+ }
+
+ @Test
+ public void convertRetryStrategy() {
+ RetryStrategyDTO retryStrategyDTO = buildRetryStrategyDTO();
+ RetryStrategy retryStrategy = EventTargetConverter.convertRetryStrategy(retryStrategyDTO);
+
+ Assert.assertEquals(PushRetryStrategyEnum.BACKOFF_RETRY, retryStrategy.getPushRetryStrategy());
+ Assert.assertEquals(retryStrategyDTO.getMaximumRetryAttempts(), retryStrategy.getMaximumRetryAttempts());
+ Assert.assertEquals(retryStrategyDTO.getMaximumEventAgeInSeconds(),
+ retryStrategy.getMaximumEventAgeInSeconds());
+ }
+
+ @Test
+ public void convertErrorTolerance() {
+ ErrorToleranceEnum errorToleranceEnum = EventTargetConverter.convertErrorTolerance("NONE");
+ Assert.assertEquals(ErrorToleranceEnum.NONE, errorToleranceEnum);
+ }
+
+ @Test
+ public void convertDeadLetterQueue() {
+ DeadLetterQueueDTO deadLetterQueueDTO = this.buildDeadLetterQueueDTO();
+ DeadLetterQueue deadLetterQueue = EventTargetConverter.convertDeadLetterQueue(deadLetterQueueDTO);
+ Assert.assertEquals(deadLetterQueue.getType(), deadLetterQueueDTO.getType());
+ Assert.assertEquals(deadLetterQueue.getConfig(), deadLetterQueueDTO.getConfig());
+ }
+
+ private RetryStrategyDTO buildRetryStrategyDTO() {
+ RetryStrategyDTO retryStrategyDTO = new RetryStrategyDTO();
+ retryStrategyDTO.setPushRetryStrategy("BACKOFF_RETRY");
+ retryStrategyDTO.setMaximumRetryAttempts(3);
+ retryStrategyDTO.setMaximumEventAgeInSeconds(4);
+ return retryStrategyDTO;
+ }
+
+ private DeadLetterQueueDTO buildDeadLetterQueueDTO() {
+ DeadLetterQueueDTO deadLetterQueueDTO = new DeadLetterQueueDTO();
+ deadLetterQueueDTO.setType("rocketmq");
+ Map<String, Object> config = Maps.newHashMap();
+ config.put("topic", "demo");
+ config.put("nameSrv", "127.0.01:9876");
+ deadLetterQueueDTO.setConfig(config);
+ return deadLetterQueueDTO;
+ }
+}
\ No newline at end of file
diff --git a/test/demo/src/test/java/org/apache/rocketmq/eventbridge/adapter/api/converter/EventTargetDTOConverterTest.java b/test/demo/src/test/java/org/apache/rocketmq/eventbridge/adapter/api/converter/EventTargetDTOConverterTest.java
new file mode 100644
index 0000000..79445e6
--- /dev/null
+++ b/test/demo/src/test/java/org/apache/rocketmq/eventbridge/adapter/api/converter/EventTargetDTOConverterTest.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.eventbridge.adapter.api.converter;
+
+import java.util.Collections;
+import org.apache.rocketmq.eventbridge.adapter.api.dto.target.EventTargetDTO;
+import org.apache.rocketmq.eventbridge.enums.ErrorToleranceEnum;
+import org.apache.rocketmq.eventbridge.enums.PushRetryStrategyEnum;
+import org.apache.rocketmq.eventbridge.domain.model.run.DeadLetterQueue;
+import org.apache.rocketmq.eventbridge.domain.model.run.RetryStrategy;
+import org.apache.rocketmq.eventbridge.domain.model.run.RunOptions;
+import org.apache.rocketmq.eventbridge.domain.model.target.EventTarget;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class EventTargetDTOConverterTest {
+
+ @Test
+ public void convert() {
+ RetryStrategy retryStrategy = RetryStrategy.builder()
+ .pushRetryStrategy(PushRetryStrategyEnum.BACKOFF_RETRY)
+ .maximumRetryAttempts(3)
+ .maximumEventAgeInSeconds(4)
+ .build();
+ DeadLetterQueue deadLetterQueue = DeadLetterQueue.builder()
+ .type("rocketmq")
+ .config(Collections.singletonMap("topic", "demo"))
+ .build();
+ RunOptions runOptions = RunOptions.builder()
+ .errorsTolerance(ErrorToleranceEnum.ALL)
+ .retryStrategy(retryStrategy)
+ .deadLetterQueue(deadLetterQueue)
+ .build();
+ EventTarget eventTarget = EventTarget.builder()
+ .eventBusName("bus")
+ .eventRuleName("rule")
+ .name("target")
+ .config(Collections.singletonMap("url", "http://127.0.0.1:7002/cloudevent"))
+ .className("http")
+ .runOptions(runOptions)
+ .build();
+
+ EventTargetDTO eventTargetDTO = EventTargetDTOConverter.convert(eventTarget);
+ Assert.assertEquals(eventTarget.getName(), eventTargetDTO.getEventTargetName());
+ Assert.assertEquals(eventTarget.getClassName(), eventTargetDTO.getClassName());
+ Assert.assertEquals(eventTarget.getConfig(), eventTargetDTO.getConfig());
+
+ Assert.assertEquals(eventTarget.getRunOptions()
+ .getErrorsTolerance()
+ .toString(), eventTargetDTO.getRunOptions()
+ .getErrorsTolerance());
+
+ Assert.assertEquals(eventTarget.getRunOptions()
+ .getRetryStrategy()
+ .getPushRetryStrategy()
+ .toString(), eventTargetDTO.getRunOptions()
+ .getRetryStrategy()
+ .getPushRetryStrategy());
+ Assert.assertEquals(eventTarget.getRunOptions()
+ .getRetryStrategy()
+ .getMaximumRetryAttempts(), eventTargetDTO.getRunOptions()
+ .getRetryStrategy()
+ .getMaximumRetryAttempts());
+ Assert.assertEquals(eventTarget.getRunOptions()
+ .getRetryStrategy()
+ .getMaximumEventAgeInSeconds(), eventTargetDTO.getRunOptions()
+ .getRetryStrategy()
+ .getMaximumEventAgeInSeconds());
+
+ Assert.assertEquals(eventTarget.getRunOptions()
+ .getDeadLetterQueue()
+ .getType(), eventTargetDTO.getRunOptions()
+ .getDeadLetterQueue()
+ .getType());
+ Assert.assertEquals(eventTarget.getRunOptions()
+ .getDeadLetterQueue()
+ .getConfig(), eventTargetDTO.getRunOptions()
+ .getDeadLetterQueue()
+ .getConfig());
+ }
+}
\ No newline at end of file
diff --git a/test/demo/src/test/java/org/apache/rocketmq/eventbridge/adapter/api/handler/EventDataHandlerTest.java b/test/demo/src/test/java/org/apache/rocketmq/eventbridge/adapter/api/handler/EventDataHandlerTest.java
new file mode 100644
index 0000000..fe5b718
--- /dev/null
+++ b/test/demo/src/test/java/org/apache/rocketmq/eventbridge/adapter/api/handler/EventDataHandlerTest.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.eventbridge.adapter.api.handler;
+
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import lombok.SneakyThrows;
+import org.apache.rocketmq.eventbridge.adapter.api.dto.data.PutEventsResponse;
+import org.apache.rocketmq.eventbridge.domain.model.data.EventDataService;
+import org.apache.rocketmq.eventbridge.domain.model.data.PutEventCallback;
+import org.apache.rocketmq.eventbridge.domain.model.data.PutEventsResponseEntry;
+import org.apache.rocketmq.eventbridge.event.EventBridgeEvent;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.InjectMocks;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.junit.MockitoJUnitRunner;
+import reactor.core.publisher.Mono;
+
+import static org.mockito.ArgumentMatchers.any;
+
+@RunWith(MockitoJUnitRunner.class)
+public class EventDataHandlerTest {
+ @InjectMocks
+ private EventDataHandler eventDataHandler;
+
+ @Mock
+ EventDataService eventDataService;
+
+ ExecutorService executor = Executors.newFixedThreadPool(10);
+
+ @Before
+ public void before() {
+ Mockito.doAnswer((invocation) -> {
+ Object[] args = invocation.getArguments();
+ EventBridgeEvent event = (EventBridgeEvent) args[1];
+ ReactorPutEventCallback callback = (ReactorPutEventCallback) args[2];
+ executor.submit(new PutEventTestThread(event, callback));
+ return null;
+ })
+ .when(eventDataService)
+ .putEvent(any(), any(), any());
+ }
+
+ @Test
+ public void testPutEvents() {
+ Long startTime = System.currentTimeMillis();
+ List<EventBridgeEvent> eventList = IntStream.range(0, 10)
+ .mapToObj(index -> {
+ EventBridgeEvent event = new EventBridgeEvent();
+ event.setId(UUID.randomUUID()
+ .toString());
+ return event;
+ })
+ .collect(Collectors.toList());
+ Mono<PutEventsResponse> mono = eventDataHandler.putEvents("123456", eventList);
+ PutEventsResponse putEventsResponse = mono.block();
+ Long costTime = System.currentTimeMillis() - startTime;
+ Assert.assertEquals(10, putEventsResponse.getEntryList()
+ .size());
+ System.out.println("costTime:" + costTime);
+ Assert.assertEquals(true, costTime < 4000);
+ }
+
+ class PutEventTestThread implements Runnable {
+ EventBridgeEvent event;
+ PutEventCallback putEventCallback;
+
+ public PutEventTestThread(EventBridgeEvent event, PutEventCallback putEventCallback) {
+ this.event = event;
+ this.putEventCallback = putEventCallback;
+ }
+
+ @SneakyThrows
+ @Override
+ public void run() {
+ Thread.sleep(3000L);
+ PutEventsResponseEntry putEventsResponseEntry = new PutEventsResponseEntry();
+ putEventsResponseEntry.setEventId(event.getId());
+ if (System.currentTimeMillis() % 2 == 1) {
+ putEventsResponseEntry.setErrorCode("Success");
+ } else {
+ putEventsResponseEntry.setErrorCode("Failed.");
+ }
+
+ putEventCallback.endProcess(putEventsResponseEntry);
+ }
+ }
+
+}
diff --git a/test/pom.xml b/test/pom.xml
new file mode 100644
index 0000000..12f4a80
--- /dev/null
+++ b/test/pom.xml
@@ -0,0 +1,28 @@
+<!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor
+ license agreements. See the NOTICE file distributed with this work for additional
+ information regarding copyright ownership. The ASF licenses this file to
+ You under the Apache License, Version 2.0 (the "License"); you may not use
+ this file except in compliance with the License. You may obtain a copy of
+ the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required
+ by applicable law or agreed to in writing, software distributed under the
+ License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS
+ OF ANY KIND, either express or implied. See the License for the specific
+ language governing permissions and limitations under the License. -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <artifactId>rocketmq-eventbridge</artifactId>
+ <groupId>org.apache.rocketmq</groupId>
+ <version>1.0.0</version>
+ <relativePath>../pom.xml</relativePath>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+ <artifactId>rocketmq-eventbridge-test</artifactId>
+ <packaging>pom</packaging>
+
+ <modules>
+ <module>demo</module>
+ </modules>
+
+</project>
\ No newline at end of file