feat:add runtime demo.
diff --git a/README.md b/README.md
index 505be7d..6dac851 100644
--- a/README.md
+++ b/README.md
@@ -88,13 +88,13 @@
 -H "ce-datacontenttype:application/json"  \
 -H "ce-time:2018-04-05T17:31:00Z"  \
 -H "ce-aliyuneventbusname:demo-bus"  \
--d 'test'
+-d 'A test recrod.'
 ```
 
 * Check if the local file received a write event
 
-In addition, by default, the system will create a demo rule for you to subscribe and push to the file. You can check whether there are events received in the directory:~/demo.eventbridge
+In addition, by default, the system will create a demo rule for you to subscribe and push to the file. You can check whether there are events received in the directory:~/demo
+![img.png](docs/cn/images/demo.png)
 
-![img.png](img.png)
-
->> 链接:为什么输出:test,...
+Why does the file output the data attribute of CloudEvent instead of other attributes?This is because the configuration in the demo rule is to output "$.data" in CloudEvent to the file line.
+You can refer to this [document](docs/CreateFileTarget.md)  to configure and modify event targets.
diff --git a/adapter/api/src/main/java/org/apache/rocketmq/eventbridge/adapter/api/controller/EventBusController.java b/adapter/api/src/main/java/org/apache/rocketmq/eventbridge/adapter/api/controller/EventBusController.java
index 8f8388c..79c4a21 100644
--- a/adapter/api/src/main/java/org/apache/rocketmq/eventbridge/adapter/api/controller/EventBusController.java
+++ b/adapter/api/src/main/java/org/apache/rocketmq/eventbridge/adapter/api/controller/EventBusController.java
@@ -95,7 +95,7 @@
                 paginationResult.getData()
                     .forEach(eventBus -> {
                         EventBusDTO eventBusDTO = new EventBusDTO();
-                        eventBusDTO.setEventBusName(eventBusDTO.getEventBusName());
+                        eventBusDTO.setEventBusName(eventBus.getName());
                         eventBusDTO.setDescription(eventBus.getDescription());
                         eventBuses.add(eventBusDTO);
                     });
diff --git a/adapter/api/src/main/java/org/apache/rocketmq/eventbridge/adapter/api/dto/bus/ListEventBusesRequest.java b/adapter/api/src/main/java/org/apache/rocketmq/eventbridge/adapter/api/dto/bus/ListEventBusesRequest.java
index 48b0932..d1590fe 100644
--- a/adapter/api/src/main/java/org/apache/rocketmq/eventbridge/adapter/api/dto/bus/ListEventBusesRequest.java
+++ b/adapter/api/src/main/java/org/apache/rocketmq/eventbridge/adapter/api/dto/bus/ListEventBusesRequest.java
@@ -25,7 +25,7 @@
 public class ListEventBusesRequest extends BaseRequest {
 
     @SerializedName("NextToken")
-    private String nextToken;
+    private String nextToken = "0";
 
     @SerializedName("MaxResults")
     private int maxResults;
diff --git a/adapter/persistence/src/main/java/org/apache/rocketmq/eventbridge/adapter/persistence/FlywayConfig.java b/adapter/persistence/src/main/java/org/apache/rocketmq/eventbridge/adapter/persistence/FlywayConfig.java
new file mode 100644
index 0000000..5ed64d3
--- /dev/null
+++ b/adapter/persistence/src/main/java/org/apache/rocketmq/eventbridge/adapter/persistence/FlywayConfig.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.eventbridge.adapter.persistence;
+
+import javax.sql.DataSource;
+import org.flywaydb.core.Flyway;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+
+@Configuration
+public class FlywayConfig {
+
+    @Autowired
+    private DataSource dataSource;
+
+    @Bean("flyway")
+    public Flyway Flyway() {
+        Flyway flyway = Flyway.configure()
+            .dataSource(dataSource)
+            .cleanDisabled(Boolean.TRUE)
+            .createSchemas(Boolean.TRUE)
+            .validateMigrationNaming(Boolean.TRUE)
+            .baselineOnMigrate(Boolean.TRUE)
+            .placeholderReplacement(Boolean.FALSE)
+            .load();
+        flyway.migrate();
+        return flyway;
+    }
+}
\ No newline at end of file
diff --git a/adapter/persistence/src/main/java/org/apache/rocketmq/eventbridge/adapter/persistence/target/mybatis/repository/MybatisEventTargetRunnerRepository.java b/adapter/persistence/src/main/java/org/apache/rocketmq/eventbridge/adapter/persistence/target/mybatis/repository/MybatisEventTargetRunnerRepository.java
index 0c92aa4..97bfb88 100644
--- a/adapter/persistence/src/main/java/org/apache/rocketmq/eventbridge/adapter/persistence/target/mybatis/repository/MybatisEventTargetRunnerRepository.java
+++ b/adapter/persistence/src/main/java/org/apache/rocketmq/eventbridge/adapter/persistence/target/mybatis/repository/MybatisEventTargetRunnerRepository.java
@@ -74,9 +74,6 @@
 
     @Override
     public List<EventTargetRunner> listEventTargetRunners(String accountId, String eventBusName, String eventRuleName) {
-        if (StringUtils.isBlank(accountId) || StringUtils.isBlank(eventBusName) || StringUtils.isBlank(eventRuleName)) {
-            return Lists.newArrayListWithCapacity(0);
-        }
         List<EventTargetRunnerDO> eventTargetRunnerDOS = eventTargetRunnerMapper.listEventTargetRunners(accountId, eventBusName, eventRuleName);
         if (eventTargetRunnerDOS == null || eventTargetRunnerDOS.isEmpty()) {
             return Lists.newArrayListWithCapacity(0);
diff --git a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/EventRuleTransfer.java b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/EventRuleTransfer.java
index d7e41fc..79dfd00 100644
--- a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/EventRuleTransfer.java
+++ b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/EventRuleTransfer.java
@@ -25,10 +25,9 @@
 import java.util.Objects;
 import java.util.concurrent.CompletableFuture;
 import javax.annotation.PostConstruct;
-
 import org.apache.commons.collections.MapUtils;
-import org.apache.rocketmq.eventbridge.adapter.runtime.boot.common.OffsetManager;
 import org.apache.rocketmq.eventbridge.adapter.runtime.boot.common.CirculatorContext;
+import org.apache.rocketmq.eventbridge.adapter.runtime.boot.common.OffsetManager;
 import org.apache.rocketmq.eventbridge.adapter.runtime.boot.transfer.TransformEngine;
 import org.apache.rocketmq.eventbridge.adapter.runtime.common.ServiceThread;
 import org.apache.rocketmq.eventbridge.adapter.runtime.error.ErrorHandler;
@@ -68,27 +67,29 @@
 
     @Override
     public void run() {
+        List<ConnectRecord> afterTransformConnect= Lists.newArrayList();
         while (!stopped) {
-            Map<String, List<ConnectRecord>> eventRecordMap = circulatorContext.takeEventRecords(batchSize);
-            if(MapUtils.isEmpty(eventRecordMap)){
-                logger.info("listen eventRecords is empty, continue by curTime - {}", System.currentTimeMillis());
-                this.waitForRunning(1000);
-                continue;
-            }
-            Map<String, TransformEngine<ConnectRecord>> latestTransformMap = circulatorContext.getTaskTransformMap();
-            if (MapUtils.isEmpty(latestTransformMap)) {
-                logger.warn("latest transform engine is empty, continue by curTime - {}", System.currentTimeMillis());
-                this.waitForRunning(3000);
-                continue;
-            }
+            try {
+                Map<String, List<ConnectRecord>> eventRecordMap = circulatorContext.takeEventRecords(batchSize);
+                if (MapUtils.isEmpty(eventRecordMap)) {
+                    logger.trace("listen eventRecords is empty, continue by curTime - {}", System.currentTimeMillis());
+                    this.waitForRunning(1000);
+                    continue;
+                }
+                Map<String, TransformEngine<ConnectRecord>> latestTransformMap = circulatorContext.getTaskTransformMap();
+                if (MapUtils.isEmpty(latestTransformMap)) {
+                    logger.warn("latest transform engine is empty, continue by curTime - {}", System.currentTimeMillis());
+                    this.waitForRunning(3000);
+                    continue;
+                }
 
-            List<ConnectRecord> afterTransformConnect = Lists.newArrayList();
-            List<CompletableFuture<Void>> completableFutures = Lists.newArrayList();
-            for(String runnerName: eventRecordMap.keySet()){
-                TransformEngine<ConnectRecord> curTransformEngine = latestTransformMap.get(runnerName);
-                List<ConnectRecord> curEventRecords = eventRecordMap.get(runnerName);
-                curEventRecords.forEach(pullRecord -> {
-                    CompletableFuture<Void> transformFuture = CompletableFuture.supplyAsync(() -> curTransformEngine.doTransforms(pullRecord))
+                afterTransformConnect.clear();
+                List<CompletableFuture<Void>> completableFutures = Lists.newArrayList();
+                for (String runnerName : eventRecordMap.keySet()) {
+                    TransformEngine<ConnectRecord> curTransformEngine = latestTransformMap.get(runnerName);
+                    List<ConnectRecord> curEventRecords = eventRecordMap.get(runnerName);
+                    curEventRecords.forEach(pullRecord -> {
+                        CompletableFuture<Void> transformFuture = CompletableFuture.supplyAsync(() -> curTransformEngine.doTransforms(pullRecord))
                             .exceptionally((exception) -> {
                                 logger.error("transfer do transform event record failed,stackTrace-", exception);
                                 errorHandler.handle(pullRecord, exception);
@@ -101,11 +102,9 @@
                                     offsetManager.commit(pullRecord);
                                 }
                             });
-                    completableFutures.add(transformFuture);
-                });
-            }
-
-            try {
+                        completableFutures.add(transformFuture);
+                    });
+                }
                 CompletableFuture.allOf(completableFutures.toArray(new CompletableFuture[eventRecordMap.values().size()])).get();
                 circulatorContext.offerTargetTaskQueue(afterTransformConnect);
                 logger.info("offer target task queues succeed, transforms - {}", JSON.toJSONString(afterTransformConnect));
diff --git a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/EventTargetTrigger.java b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/EventTargetTrigger.java
index 3f61b10..85dae3b 100644
--- a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/EventTargetTrigger.java
+++ b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/EventTargetTrigger.java
@@ -60,7 +60,7 @@
         while (!stopped) {
             Map<String, List<ConnectRecord>> targetRecordMap = circulatorContext.takeTargetRecords(batchSize);
             if (MapUtils.isEmpty(targetRecordMap)) {
-                logger.info("current target pusher is empty");
+                logger.trace("current target pusher is empty");
                 this.waitForRunning(1000);
                 continue;
             }
diff --git a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/service/TargetRunnerConfigOnDBObserver.java b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/service/TargetRunnerConfigOnDBObserver.java
index f018ce1..0e34f6a 100644
--- a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/service/TargetRunnerConfigOnDBObserver.java
+++ b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/service/TargetRunnerConfigOnDBObserver.java
@@ -48,7 +48,8 @@
     @Autowired
     EventTargetRepository eventTargetRepository;
 
-    public TargetRunnerConfigOnDBObserver(EventTargetRunnerRepository eventTargetRunnerRepository, EventTargetRepository eventTargetRepository) {
+    public TargetRunnerConfigOnDBObserver(EventTargetRunnerRepository eventTargetRunnerRepository,
+        EventTargetRepository eventTargetRepository) {
         this.eventTargetRunnerRepository = eventTargetRunnerRepository;
         this.eventTargetRepository = eventTargetRepository;
     }
@@ -56,7 +57,17 @@
     @Override
     @Transactional
     public Set<TargetRunnerConfig> getLatestTargetRunnerConfig() {
-        List<EventTargetRunner> eventTargetRunners = eventTargetRunnerRepository.listEventTargetRunners(null, null, null);
+        List<EventTargetRunner> eventTargetRunners = null;
+        try {
+            eventTargetRunners = eventTargetRunnerRepository.listEventTargetRunners(null, null, null);
+        } catch (Throwable e) {
+            if (e.getMessage().contains("not found")) {
+                return Sets.newHashSet();
+            }
+        }
+        if (eventTargetRunners == null || eventTargetRunners.isEmpty()) {
+            return Sets.newHashSet();
+        }
         Set<TargetRunnerConfig> targetRunnerConfigs = Sets.newHashSet();
         for (EventTargetRunner eventTargetRunner : eventTargetRunners) {
             targetRunnerConfigs.add(new Gson().fromJson(eventTargetRunner.getRunContext(), TargetRunnerConfig.class));
diff --git a/adapter/storage/src/main/java/org/apache/rocketmq/eventbridge/adapter/storage/rocketmq/impl/RocketMQEventDataRepository.java b/adapter/storage/src/main/java/org/apache/rocketmq/eventbridge/adapter/storage/rocketmq/impl/RocketMQEventDataRepository.java
index 320c69e..1b95789 100644
--- a/adapter/storage/src/main/java/org/apache/rocketmq/eventbridge/adapter/storage/rocketmq/impl/RocketMQEventDataRepository.java
+++ b/adapter/storage/src/main/java/org/apache/rocketmq/eventbridge/adapter/storage/rocketmq/impl/RocketMQEventDataRepository.java
@@ -20,7 +20,6 @@
 import com.google.gson.Gson;
 import lombok.SneakyThrows;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.commons.lang3.StringUtils;
 import org.apache.rocketmq.client.producer.DefaultMQProducer;
 import org.apache.rocketmq.common.message.Message;
 import org.apache.rocketmq.eventbridge.adapter.persistence.data.mybatis.dataobject.EventTopicDO;
@@ -93,10 +92,11 @@
     @Cacheable(value = "topicCache")
     @Override
     public String getTopicName(String accountId, String eventBusName) {
-        String topicName = eventDataOnRocketMQConnectAPI.buildTopicName(accountId, eventBusName);
-        if (StringUtils.isBlank(AppConfig.getGlobalConfig().getDefaultDataPersistentClusterName())) {
-            return topicName;
-        }
+        return getTopicNameWithOutCache(accountId, eventBusName);
+    }
+
+    @Override public String getTopicNameWithOutCache(String accountId, String eventBusName) {
+        String topicName = null;
         EventTopicDO eventTopicDO = eventTopicMapper.getTopic(accountId, eventBusName);
         if (eventTopicDO != null) {
             topicName = eventTopicDO.getName();
diff --git a/adapter/storage/src/main/java/org/apache/rocketmq/eventbridge/adapter/storage/rocketmq/runtimer/RocketMQEventSubscriber.java b/adapter/storage/src/main/java/org/apache/rocketmq/eventbridge/adapter/storage/rocketmq/runtimer/RocketMQEventSubscriber.java
index 56a4550..df9cda7 100644
--- a/adapter/storage/src/main/java/org/apache/rocketmq/eventbridge/adapter/storage/rocketmq/runtimer/RocketMQEventSubscriber.java
+++ b/adapter/storage/src/main/java/org/apache/rocketmq/eventbridge/adapter/storage/rocketmq/runtimer/RocketMQEventSubscriber.java
@@ -52,6 +52,7 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.context.annotation.DependsOn;
 import org.springframework.core.io.support.PropertiesLoaderUtils;
 import org.springframework.stereotype.Component;
 
@@ -69,6 +70,7 @@
  * RocketMQ implement event subscriber
  */
 @Component
+@DependsOn("flyway")
 public class RocketMQEventSubscriber extends EventSubscriber {
 
     private static final Logger logger = LoggerFactory.getLogger(RocketMQEventSubscriber.class);
@@ -122,7 +124,7 @@
         ArrayList<MessageExt> messages = new ArrayList<>();
         messageBuffer.drainTo(messages, pullBatchSize);
         if (CollectionUtils.isEmpty(messages)) {
-            logger.info("consumer poll message empty.");
+            logger.trace("consumer poll message empty.");
             return null;
         }
         List<ConnectRecord> connectRecords = Lists.newArrayList();
@@ -251,7 +253,7 @@
     }
 
     private String getTopicName(SubscribeRunnerKeys subscribeRunnerKeys) {
-        return eventDataRepository.getTopicName(subscribeRunnerKeys.getAccountId(), subscribeRunnerKeys.getEventBusName());
+        return eventDataRepository.getTopicNameWithOutCache(subscribeRunnerKeys.getAccountId(), subscribeRunnerKeys.getEventBusName());
     }
 
     private String createGroupName(String prefix) {
diff --git a/docs/CreateDingTalkTarget.md b/docs/CreateDingTalkTarget.md
deleted file mode 100644
index 4446433..0000000
--- a/docs/CreateDingTalkTarget.md
+++ /dev/null
@@ -1,86 +0,0 @@
-
-## Create EventBus
-
-```text
-POST /bus/createEventBus HTTP/1.1
-Host: demo.eventbridge.com
-Content-Type: application/json; charset=utf-8
-{
-"eventBusName":"demo-bus",
-"description":"a demo bus."
-}
-```
-
-## Create EventSource
-
-```text
-POST /source/createEventSource HTTP/1.1
-Host: demo.eventbridge.com
-Content-Type: application/json; charset=utf-8
-{
-"eventBusName":"demo-bus",
-"eventSourceName":"demo-source",
-"description":"A demo source."
-}
-```
-
-## Create EventRule
-
-```text
-POST /rule/createEventRule HTTP/1.1
-Host: demo.eventbridge.com
-Content-Type: application/json; charset=utf-8
-{
-  "eventBusName":"demo-bus",
-  "eventRuleName":"demo-rule",
-  "description":"A demo rule.",
-  "filterPattern":"{}"
-}
-```
-
-## Create Target
-
-This is a sample with EventBridge target:
-
-```text
-POST /target/createEventTargets HTTP/1.1
-Host: demo.eventbridge.com
-Content-Type: application/json; charset=utf-8
-{
-    "eventBusName":"demo-bus",
-    "eventRuleName":"demo-rule",
-    "eventTargets":[
-            {
-            "eventTargetName":"eventbridge-target",
-            "className":"acs.eventbridge",
-                "config":{
-                "RegionId":"cn-hangzhou",
-                "AliyunEventBus":"rocketmq-eventbridge"
-                }
-            }
-        ]
-}
-```
-
-This is a sample with DingTalk target:
-
-```text
-POST /target/createEventTargets HTTP/1.1
-Host: demo.eventbridge.com
-Content-Type: application/json; charset=utf-8
-{
-    "eventBusName":"demo-bus",
-    "eventRuleName":"demo-rule",
-    "eventTargets":[
-        {
-            "eventTargetName":"dingtalk-target",
-            "className":"acs.dingtalk",
-            "config":{
-            "WebHook":"https://oapi.dingtalk.com/robot/send?access_token=b43a54b702314415c2acdae97eda1e092528b7a9dddb31510a5b4430be2ef867",
-            "SecretKey":"SEC53483bf496b8f9e0b4ab0ab669d422208e6ccfaedfd5120ea6b8426b9ecd47aa",
-            "Body":"{\"template\":\"{\\\"text\\\":{\\\"content\\\":\\\"${content}\\\"},\\\"msgtype\\\":\\\"text\\\"}\",\"form\":\"TEMPLATE\",\"value\":\"{\\\"content\\\":\\\"$.data.body\\\"}\"}"
-            }
-        }
-    ]
-}
-```
diff --git a/docs/CreateFileTarget.md b/docs/CreateFileTarget.md
new file mode 100644
index 0000000..53dff84
--- /dev/null
+++ b/docs/CreateFileTarget.md
@@ -0,0 +1,51 @@
+
+
+## Create EventBus
+
+```text
+POST /bus/createEventBus HTTP/1.1
+Host: demo.eventbridge.com
+Content-Type: application/json; charset=utf-8
+{
+"eventBusName":"demo-bus",
+"description":"a demo bus."
+}
+```
+
+## Create EventRule
+
+```text
+POST /rule/createEventRule HTTP/1.1
+Host: demo.eventbridge.com
+Content-Type: application/json; charset=utf-8
+{
+  "eventBusName":"demo-bus",
+  "eventRuleName":"demo-rule",
+  "description":"A demo rule.",
+  "filterPattern":"{}"
+}
+```
+
+## Create Target
+
+This is a sample with EventBridge target:
+
+```text
+POST /target/createEventTargets HTTP/1.1
+Host: demo.eventbridge.com
+Content-Type: application/json; charset=utf-8
+{
+    "eventBusName":"demo-bus",
+    "eventRuleName":"demo-rule",
+    "eventTargets":[
+            {
+            "eventTargetName":"demo-target",
+            "className":"acs.eventbridge",
+                "config":{
+                "fileName":"~/demo",
+                "line":"{    \"form\":\"JSONPATH\",    \"value\":\"$.data\"}"
+                }
+            }
+        ]
+}
+```
diff --git a/docs/cn/images/demo.png b/docs/cn/images/demo.png
new file mode 100644
index 0000000..4f9c1a3
--- /dev/null
+++ b/docs/cn/images/demo.png
Binary files differ
diff --git a/domain/src/main/java/org/apache/rocketmq/eventbridge/domain/storage/EventDataRepository.java b/domain/src/main/java/org/apache/rocketmq/eventbridge/domain/storage/EventDataRepository.java
index 55a412e..550275d 100644
--- a/domain/src/main/java/org/apache/rocketmq/eventbridge/domain/storage/EventDataRepository.java
+++ b/domain/src/main/java/org/apache/rocketmq/eventbridge/domain/storage/EventDataRepository.java
@@ -59,4 +59,11 @@
      */
     String getTopicName(String accountId, String eventBusName);
 
+    /**
+     * @param accountId
+     * @param eventBusName
+     * @return
+     */
+    String getTopicNameWithOutCache(String accountId, String eventBusName);
+
 }
diff --git a/infrastructure/src/main/java/org/apache/rocketmq/eventbridge/infrastructure/validate/DefaultAuthValidation.java b/infrastructure/src/main/java/org/apache/rocketmq/eventbridge/infrastructure/validate/DefaultAuthValidation.java
index f85517d..5738a75 100644
--- a/infrastructure/src/main/java/org/apache/rocketmq/eventbridge/infrastructure/validate/DefaultAuthValidation.java
+++ b/infrastructure/src/main/java/org/apache/rocketmq/eventbridge/infrastructure/validate/DefaultAuthValidation.java
@@ -31,7 +31,7 @@
 
     @Override
     public Context validate(ServerHttpRequest request, Context ctx) {
-        String resourceOwnerId = "defaultResourceOwnerId";
+        String resourceOwnerId = "default";
         List<String> resourceOwnerIds = request.getHeaders().get(HEADER_KEY_RESOURCE_OWNER_ACCOUNT_ID);
         if (resourceOwnerIds != null && !resourceOwnerIds.isEmpty()) {
             //throw new EventBridgeException(DefaultErrorCode.LoginFailed);
diff --git a/pom.xml b/pom.xml
index 5a89fba..7683fb8 100644
--- a/pom.xml
+++ b/pom.xml
@@ -95,6 +95,7 @@
         <jacoco-maven-plugin.version>0.8.5</jacoco-maven-plugin.version>
         <maven-surefire-plugin.version>2.19.1</maven-surefire-plugin.version>
         <rocketmq.version>5.1.0</rocketmq.version>
+        <flyway.version>8.5.7</flyway.version>
     </properties>
 
     <modules>
@@ -148,6 +149,11 @@
                 <artifactId>rocketmq-eventbridge-infrastructure</artifactId>
                 <version>${project.version}</version>
             </dependency>
+            <dependency>
+                <groupId>org.apache.rocketmq</groupId>
+                <artifactId>rocketmq-test-demo</artifactId>
+                <version>${project.version}</version>
+            </dependency>
 
             <!-- Framework -->
             <dependency>
@@ -320,6 +326,11 @@
                 <version>${mockito.version}</version>
                 <scope>test</scope>
             </dependency>
+            <dependency>
+                <groupId>org.flywaydb</groupId>
+                <artifactId>flyway-core</artifactId>
+                <version>${flyway.version}</version>
+            </dependency>
         </dependencies>
     </dependencyManagement>
 
diff --git a/start/pom.xml b/start/pom.xml
index 2bf80b0..e1e1995 100644
--- a/start/pom.xml
+++ b/start/pom.xml
@@ -54,6 +54,10 @@
             <groupId>org.apache.rocketmq</groupId>
             <artifactId>rocketmq-eventbridge-infrastructure</artifactId>
         </dependency>
+        <dependency>
+            <groupId>org.apache.rocketmq</groupId>
+            <artifactId>rocketmq-test-demo</artifactId>
+        </dependency>
         <!-- Framework -->
         <dependency>
             <groupId>org.springframework.boot</groupId>
diff --git a/start/src/main/java/org/apache/rocketmq/eventbridge/Main.java b/start/src/main/java/org/apache/rocketmq/eventbridge/Main.java
index 05344e6..fe8c642 100644
--- a/start/src/main/java/org/apache/rocketmq/eventbridge/Main.java
+++ b/start/src/main/java/org/apache/rocketmq/eventbridge/Main.java
@@ -18,9 +18,10 @@
 
 import org.springframework.boot.SpringApplication;
 import org.springframework.boot.autoconfigure.SpringBootApplication;
+import org.springframework.boot.autoconfigure.flyway.FlywayAutoConfiguration;
 import org.springframework.cache.annotation.EnableCaching;
 
-@SpringBootApplication(scanBasePackages = "org.apache.rocketmq.eventbridge.*")
+@SpringBootApplication(scanBasePackages = "org.apache.rocketmq.eventbridge.*",exclude = {FlywayAutoConfiguration.class})
 @EnableCaching
 public class Main {
     public static void main(String[] args) {
diff --git a/start/src/main/resources/application.properties b/start/src/main/resources/application.properties
index dc13acc..5911b30 100644
--- a/start/src/main/resources/application.properties
+++ b/start/src/main/resources/application.properties
@@ -24,7 +24,6 @@
 #spring.datasource.hikari.password=xxxxx
 mybatis.configuration.log-impl=org.apache.ibatis.logging.stdout.StdOutImpl
 ## flyway
-spring.flyway.placeholderReplacement=false
 ## rocketmq
 rocketmq.namesrvAddr=localhost:9876
 
@@ -33,10 +32,10 @@
 runtime.config.mode=DB
 runtime.storage.mode=ROCKETMQ
 rumtime.name=eventbridge-runtimer
-runtime.pluginpath=/Users/Local/eventbridge/plugin
+runtime.pluginpath=~/eventbridge/plugin
 
 
 ## log
 app.name=rocketmqeventbridge
 log.level=INFO
-log.path=/Users/Local/logs
\ No newline at end of file
+log.path=~/logs
\ No newline at end of file
diff --git a/test/demo/pom.xml b/test/demo/pom.xml
new file mode 100644
index 0000000..80b3bf8
--- /dev/null
+++ b/test/demo/pom.xml
@@ -0,0 +1,120 @@
+<!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor
+	license agreements. See the NOTICE file distributed with this work for additional
+	information regarding copyright ownership. The ASF licenses this file to
+	You under the Apache License, Version 2.0 (the "License"); you may not use
+	this file except in compliance with the License. You may obtain a copy of
+	the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required
+	by applicable law or agreed to in writing, software distributed under the
+	License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS
+	OF ANY KIND, either express or implied. See the License for the specific
+	language governing permissions and limitations under the License. -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>rocketmq-eventbridge-test</artifactId>
+        <groupId>org.apache.rocketmq</groupId>
+        <version>1.0.0</version>
+        <relativePath>../pom.xml</relativePath>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+    <artifactId>rocketmq-test-demo</artifactId>
+
+    <properties>
+        <jakarta.version>2.1.6</jakarta.version>
+        <jersey.version>2.34</jersey.version>
+        <reactor.version>3.4.14</reactor.version>
+        <httpcore.version>4.4.9</httpcore.version>
+    </properties>
+
+    <dependencies>
+        <!-- Project modules -->
+        <dependency>
+            <groupId>org.apache.rocketmq</groupId>
+            <artifactId>rocketmq-eventbridge-adapter-persistence</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.rocketmq</groupId>
+            <artifactId>rocketmq-eventbridge-adapter-rpc</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.rocketmq</groupId>
+            <artifactId>rocketmq-eventbridge-adapter-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.rocketmq</groupId>
+            <artifactId>rocketmq-eventbridge-domain</artifactId>
+        </dependency>
+        <!-- Framework -->
+        <dependency>
+            <groupId>org.springframework.boot</groupId>
+            <artifactId>spring-boot-starter-webflux</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.springframework.boot</groupId>
+            <artifactId>spring-boot-autoconfigure</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>jakarta.ws.rs</groupId>
+            <artifactId>jakarta.ws.rs-api</artifactId>
+            <version>${jakarta.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.glassfish.jersey.core</groupId>
+            <artifactId>jersey-common</artifactId>
+            <version>${jersey.version}</version>
+        </dependency>
+
+        <!-- tools -->
+        <dependency>
+            <groupId>com.google.code.gson</groupId>
+            <artifactId>gson</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>io.cloudevents</groupId>
+            <artifactId>cloudevents-json-jackson</artifactId>
+            <version>${cloudevents.version}</version>
+        </dependency>
+        <dependency>
+            <artifactId>cloudevents-http-basic</artifactId>
+            <groupId>io.cloudevents</groupId>
+            <version>${cloudevents.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>io.cloudevents</groupId>
+            <artifactId>cloudevents-http-restful-ws</artifactId>
+            <version>${cloudevents.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.httpcomponents</groupId>
+            <artifactId>httpcore</artifactId>
+            <version>${httpcore.version}</version>
+        </dependency>
+        <!-- Test -->
+        <dependency>
+            <groupId>io.projectreactor</groupId>
+            <artifactId>reactor-test</artifactId>
+            <version>${reactor.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.mockito</groupId>
+            <artifactId>mockito-core</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.hibernate.validator</groupId>
+            <artifactId>hibernate-validator</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.springframework.boot</groupId>
+            <artifactId>spring-boot-starter-aop</artifactId>
+        </dependency>
+    </dependencies>
+
+</project>
\ No newline at end of file
diff --git a/test/demo/src/main/java/org/apache/rocketmq/eventbridge/demo/DefaultDemo.java b/test/demo/src/main/java/org/apache/rocketmq/eventbridge/demo/DefaultDemo.java
new file mode 100644
index 0000000..ce71c7a
--- /dev/null
+++ b/test/demo/src/main/java/org/apache/rocketmq/eventbridge/demo/DefaultDemo.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.eventbridge.demo;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import java.util.List;
+import java.util.Map;
+import javax.annotation.PostConstruct;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.rocketmq.eventbridge.domain.common.exception.EventBridgeErrorCode;
+import org.apache.rocketmq.eventbridge.domain.model.bus.EventBusService;
+import org.apache.rocketmq.eventbridge.domain.model.rule.EventRuleService;
+import org.apache.rocketmq.eventbridge.domain.model.target.EventTarget;
+import org.apache.rocketmq.eventbridge.domain.model.target.EventTargetService;
+import org.apache.rocketmq.eventbridge.exception.EventBridgeException;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.context.annotation.DependsOn;
+import org.springframework.stereotype.Service;
+
+@Service
+@Slf4j
+@DependsOn("flyway")
+public class DefaultDemo {
+
+    @Autowired
+    EventBusService eventBusService;
+
+    @Autowired
+    EventRuleService eventRuleService;
+
+    @Autowired
+    EventTargetService eventTargetService;
+
+    private static final String DEFAULT_ACCOUNT_ID = "default";
+
+    private static final String DEFAULT_EVENT_TOPIC_NAME = "demo-bus";
+
+    private static final String DEFAULT_EVENT_RULE_NAME = "demo-rule";
+
+    private static final String DEFAULT_EVENT_TARGET_NAME = "demo-target";
+
+    private static final String DEFAULT_EVENT_TARGET_CLASS = "file";
+
+    @PostConstruct
+    public void initDemo() {
+        log.info("init demo");
+        initEventBus();
+        initEventRule();
+        intEventTarget();
+
+    }
+
+    private void initEventBus() {
+        try {
+            eventBusService.getEventBus(DEFAULT_ACCOUNT_ID, DEFAULT_EVENT_TOPIC_NAME);
+        } catch (EventBridgeException e) {
+            if (EventBridgeErrorCode.EventBusNotExist.getCode().equals(e.getCode())) {
+                eventBusService.createEventBus(DEFAULT_ACCOUNT_ID, DEFAULT_EVENT_TOPIC_NAME, "A demo bus.");
+                log.info("Create demo eventbus:{}", DEFAULT_EVENT_TOPIC_NAME);
+            }
+        }
+
+    }
+
+    private void initEventRule() {
+        try {
+            eventRuleService.getEventRule(DEFAULT_ACCOUNT_ID, DEFAULT_EVENT_TOPIC_NAME, DEFAULT_EVENT_RULE_NAME);
+        } catch (EventBridgeException e) {
+            if (EventBridgeErrorCode.EventRuleNotExist.getCode().equals(e.getCode())) {
+                eventRuleService.createEventRule(DEFAULT_ACCOUNT_ID, DEFAULT_EVENT_TOPIC_NAME, DEFAULT_EVENT_RULE_NAME, "A demo rule.", "{}");
+                log.info("Create demo event rule:{}", DEFAULT_EVENT_RULE_NAME);
+            }
+        }
+    }
+
+    private void intEventTarget() {
+        List<EventTarget> eventTargets = eventTargetService.listTargets(DEFAULT_ACCOUNT_ID, DEFAULT_EVENT_TOPIC_NAME, DEFAULT_EVENT_RULE_NAME);
+        if (eventTargets == null || eventTargets.isEmpty()) {
+            List<EventTarget> eventTargetList = Lists.newArrayList();
+            Map<String, Object> config = Maps.newHashMap();
+            config.put("fileName", System.getProperty("user.home") + "/demo");
+            config.put("line", "{\"form\":\"JSONPATH\",\"value\":\"$.data\"}");
+            EventTarget eventTarget = EventTarget.builder().name(DEFAULT_EVENT_TARGET_NAME).className(DEFAULT_EVENT_TARGET_CLASS).config(config).build();
+            eventTargetList.add(eventTarget);
+            eventTargetService.createTargets(DEFAULT_ACCOUNT_ID, DEFAULT_EVENT_TOPIC_NAME, DEFAULT_EVENT_RULE_NAME, eventTargetList);
+        }
+    }
+
+}
\ No newline at end of file
diff --git a/test/demo/src/test/java/org/apache/rocketmq/eventbridge/adapter/api/controller/ApiDestinationDTOControllerTest.java b/test/demo/src/test/java/org/apache/rocketmq/eventbridge/adapter/api/controller/ApiDestinationDTOControllerTest.java
new file mode 100644
index 0000000..bdfc870
--- /dev/null
+++ b/test/demo/src/test/java/org/apache/rocketmq/eventbridge/adapter/api/controller/ApiDestinationDTOControllerTest.java
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.eventbridge.adapter.api.controller;
+
+import com.google.common.collect.Lists;
+import java.util.Date;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+import javax.validation.ConstraintViolation;
+import javax.validation.Validator;
+import org.apache.rocketmq.eventbridge.adapter.api.dto.apidestination.CreateApiDestinationRequest;
+import org.apache.rocketmq.eventbridge.adapter.api.dto.apidestination.CreateApiDestinationResponse;
+import org.apache.rocketmq.eventbridge.adapter.api.dto.apidestination.DeleteApiDestinationRequest;
+import org.apache.rocketmq.eventbridge.adapter.api.dto.apidestination.DeleteApiDestinationResponse;
+import org.apache.rocketmq.eventbridge.adapter.api.dto.apidestination.GetApiDestinationRequest;
+import org.apache.rocketmq.eventbridge.adapter.api.dto.apidestination.GetApiDestinationResponse;
+import org.apache.rocketmq.eventbridge.adapter.api.dto.apidestination.ListApiDestinationsRequest;
+import org.apache.rocketmq.eventbridge.adapter.api.dto.apidestination.ListApiDestinationsResponse;
+import org.apache.rocketmq.eventbridge.adapter.api.dto.apidestination.UpdateApiDestinationRequest;
+import org.apache.rocketmq.eventbridge.adapter.api.dto.apidestination.UpdateApiDestinationResponse;
+import org.apache.rocketmq.eventbridge.domain.common.exception.EventBridgeErrorCode;
+import org.apache.rocketmq.eventbridge.domain.model.PaginationResult;
+import org.apache.rocketmq.eventbridge.domain.model.apidestination.ApiDestinationDTO;
+import org.apache.rocketmq.eventbridge.domain.model.apidestination.ApiDestinationService;
+import org.apache.rocketmq.eventbridge.domain.model.apidestination.parameter.HttpApiParameters;
+import org.apache.rocketmq.eventbridge.domain.rpc.AccountAPI;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.InjectMocks;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.junit.MockitoJUnitRunner;
+import reactor.core.publisher.Mono;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
+
+@RunWith(MockitoJUnitRunner.class)
+public class ApiDestinationDTOControllerTest {
+
+    @InjectMocks
+    private ApiDestinationController apiDestinationController;
+    @Mock
+    private ApiDestinationService apiDestinationService;
+    @Mock
+    private Validator validator;
+    @Mock
+    private AccountAPI accountAPI;
+
+    @Before
+    public void testBefore() {
+        Mockito.when(accountAPI.getResourceOwnerAccountId(any()))
+            .thenReturn(UUID.randomUUID()
+                .toString());
+    }
+
+    @Test
+    public void testCreateApiDestination() {
+        Mockito.when(apiDestinationService.createApiDestination(any()))
+            .thenReturn(UUID.randomUUID()
+                .toString());
+        CreateApiDestinationRequest createApiDestinationRequest = new CreateApiDestinationRequest();
+        createApiDestinationRequest.setApiDestinationName(UUID.randomUUID()
+            .toString());
+        createApiDestinationRequest.setDescription(UUID.randomUUID()
+            .toString());
+        HttpApiParameters httpApiParameters = new HttpApiParameters();
+        httpApiParameters.setEndpoint(UUID.randomUUID()
+            .toString());
+        httpApiParameters.setMethod(UUID.randomUUID()
+            .toString());
+        createApiDestinationRequest.setHttpApiParameters(httpApiParameters);
+        createApiDestinationRequest.setInvocationRateLimitPerSecond(11);
+        final Mono<CreateApiDestinationResponse> apiDestination = apiDestinationController.createApiDestination(
+            createApiDestinationRequest);
+        Assert.assertEquals(apiDestination.block()
+            .getCode(), EventBridgeErrorCode.Success.getCode());
+    }
+
+    @Test
+    public void testUpdateApiDestination() {
+        Set<ConstraintViolation<UpdateApiDestinationRequest>> constraintViolations = new HashSet<>();
+        Mockito.when(validator.validate(any(UpdateApiDestinationRequest.class)))
+            .thenReturn(constraintViolations);
+        Mockito.when(apiDestinationService.updateApiDestination(any()))
+            .thenReturn(Boolean.TRUE);
+        UpdateApiDestinationRequest updateApiDestinationRequest = new UpdateApiDestinationRequest();
+        updateApiDestinationRequest.setApiDestinationName(UUID.randomUUID()
+            .toString());
+        updateApiDestinationRequest.setDescription(UUID.randomUUID()
+            .toString());
+        HttpApiParameters httpApiParameters = new HttpApiParameters();
+        httpApiParameters.setEndpoint(UUID.randomUUID()
+            .toString());
+        httpApiParameters.setMethod(UUID.randomUUID()
+            .toString());
+        updateApiDestinationRequest.setHttpApiParameters(httpApiParameters);
+        updateApiDestinationRequest.setInvocationRateLimitPerSecond(11);
+        final Mono<UpdateApiDestinationResponse> updateApiDestinationResponse
+            = apiDestinationController.updateApiDestination(updateApiDestinationRequest);
+        Assert.assertEquals(updateApiDestinationResponse.block()
+            .getCode(), EventBridgeErrorCode.Success.getCode());
+    }
+
+    @Test
+    public void testGetApiDestination() {
+        Set<ConstraintViolation<GetApiDestinationRequest>> constraintViolations = new HashSet<>();
+        Mockito.when(validator.validate(any(GetApiDestinationRequest.class)))
+            .thenReturn(constraintViolations);
+        ApiDestinationDTO eventApiDestinationDTO = new ApiDestinationDTO();
+        eventApiDestinationDTO.setName(UUID.randomUUID()
+            .toString());
+        eventApiDestinationDTO.setGmtCreate(new Date());
+        Mockito.when(apiDestinationService.getApiDestination(any(), any()))
+            .thenReturn(eventApiDestinationDTO);
+        GetApiDestinationRequest getApiDestinationRequest = new GetApiDestinationRequest();
+        getApiDestinationRequest.setApiDestinationName(UUID.randomUUID()
+            .toString());
+        final Mono<GetApiDestinationResponse> apiDestination = apiDestinationController.getApiDestination(
+            getApiDestinationRequest);
+        Assert.assertEquals(apiDestination.block()
+            .getCode(), EventBridgeErrorCode.Success.getCode());
+    }
+
+    @Test
+    public void testDeleteApiDestination() {
+        Set<ConstraintViolation<DeleteApiDestinationRequest>> constraintViolations = new HashSet<>();
+        Mockito.when(validator.validate(any(DeleteApiDestinationRequest.class)))
+            .thenReturn(constraintViolations);
+        Mockito.when(apiDestinationService.deleteApiDestination(any(), any()))
+            .thenReturn(Boolean.TRUE);
+        DeleteApiDestinationRequest deleteApiDestinationRequest = new DeleteApiDestinationRequest();
+        deleteApiDestinationRequest.setApiDestinationName(UUID.randomUUID()
+            .toString());
+        final Mono<DeleteApiDestinationResponse> deleteApiDestinationResponse
+            = apiDestinationController.deleteApiDestination(deleteApiDestinationRequest);
+        Assert.assertEquals(deleteApiDestinationResponse.block()
+            .getCode(), EventBridgeErrorCode.Success.getCode());
+    }
+
+    @Test
+    public void testListApiDestinations() {
+        Set<ConstraintViolation<ListApiDestinationsRequest>> constraintViolations = new HashSet<>();
+        Mockito.when(validator.validate(any(ListApiDestinationsRequest.class)))
+            .thenReturn(constraintViolations);
+        PaginationResult<List<ApiDestinationDTO>> result = new PaginationResult();
+        List<ApiDestinationDTO> apiDestinationDTOList = Lists.newArrayList();
+        ApiDestinationDTO apiDestinationDTO = new ApiDestinationDTO();
+        apiDestinationDTO.setName(UUID.randomUUID()
+            .toString());
+        apiDestinationDTO.setGmtCreate(new Date());
+        apiDestinationDTOList.add(apiDestinationDTO);
+        result.setData(apiDestinationDTOList);
+        result.setTotal(9);
+        result.setNextToken("0");
+        Mockito.when(apiDestinationService.listApiDestinations(any(), any(), any(), anyInt()))
+            .thenReturn(result);
+        ListApiDestinationsRequest listApiDestinationsRequest = new ListApiDestinationsRequest();
+        listApiDestinationsRequest.setApiDestinationNamePrefix(UUID.randomUUID()
+            .toString());
+        listApiDestinationsRequest.setNextToken("0");
+        listApiDestinationsRequest.setMaxResults(10);
+        final Mono<ListApiDestinationsResponse> listApiDestinationsResponse
+            = apiDestinationController.listApiDestinations(listApiDestinationsRequest);
+        Assert.assertEquals(listApiDestinationsResponse.block()
+            .getCode(), EventBridgeErrorCode.Success.getCode());
+    }
+}
diff --git a/test/demo/src/test/java/org/apache/rocketmq/eventbridge/adapter/api/controller/ConnectionControllerTest.java b/test/demo/src/test/java/org/apache/rocketmq/eventbridge/adapter/api/controller/ConnectionControllerTest.java
new file mode 100644
index 0000000..dd2e27f
--- /dev/null
+++ b/test/demo/src/test/java/org/apache/rocketmq/eventbridge/adapter/api/controller/ConnectionControllerTest.java
@@ -0,0 +1,285 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.eventbridge.adapter.api.controller;
+
+import com.google.common.collect.Lists;
+import java.util.Date;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+import javax.validation.ConstraintViolation;
+import javax.validation.Validator;
+import org.apache.rocketmq.eventbridge.adapter.api.dto.connection.CreateConnectionRequest;
+import org.apache.rocketmq.eventbridge.adapter.api.dto.connection.CreateConnectionResponse;
+import org.apache.rocketmq.eventbridge.adapter.api.dto.connection.DeleteConnectionRequest;
+import org.apache.rocketmq.eventbridge.adapter.api.dto.connection.DeleteConnectionResponse;
+import org.apache.rocketmq.eventbridge.adapter.api.dto.connection.GetConnectionRequest;
+import org.apache.rocketmq.eventbridge.adapter.api.dto.connection.GetConnectionResponse;
+import org.apache.rocketmq.eventbridge.adapter.api.dto.connection.ListConnectionRequest;
+import org.apache.rocketmq.eventbridge.adapter.api.dto.connection.ListConnectionResponse;
+import org.apache.rocketmq.eventbridge.adapter.api.dto.connection.ListEnumsResponse;
+import org.apache.rocketmq.eventbridge.adapter.api.dto.connection.UpdateConnectionRequest;
+import org.apache.rocketmq.eventbridge.adapter.api.dto.connection.UpdateConnectionResponse;
+import org.apache.rocketmq.eventbridge.domain.common.enums.AuthorizationTypeEnum;
+import org.apache.rocketmq.eventbridge.domain.common.enums.NetworkTypeEnum;
+import org.apache.rocketmq.eventbridge.domain.common.exception.EventBridgeErrorCode;
+import org.apache.rocketmq.eventbridge.domain.model.PaginationResult;
+import org.apache.rocketmq.eventbridge.domain.model.connection.ConnectionDTO;
+import org.apache.rocketmq.eventbridge.domain.model.connection.ConnectionService;
+import org.apache.rocketmq.eventbridge.domain.model.connection.parameter.AuthParameters;
+import org.apache.rocketmq.eventbridge.domain.model.connection.parameter.BasicAuthParameters;
+import org.apache.rocketmq.eventbridge.domain.model.connection.parameter.NetworkParameters;
+import org.apache.rocketmq.eventbridge.domain.rpc.AccountAPI;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.BDDMockito;
+import org.mockito.InjectMocks;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.junit.MockitoJUnitRunner;
+import reactor.core.publisher.Mono;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.anyString;
+
+@RunWith(MockitoJUnitRunner.class)
+public class ConnectionControllerTest {
+
+    @InjectMocks
+    private ConnectionController connectionController;
+    @Mock
+    private ConnectionService connectionService;
+    @Mock
+    private Validator validator;
+    @Mock
+    private AccountAPI accountAPI;
+
+    @Before
+    public void testBefore() throws Exception {
+        Mockito.when(accountAPI.getResourceOwnerAccountId(any()))
+            .thenReturn(UUID.randomUUID()
+                .toString());
+    }
+
+    @Test
+    public void testCreateConnection() {
+        Mockito.when(connectionService.createConnection(any(ConnectionDTO.class)))
+            .thenReturn(UUID.randomUUID()
+                .toString());
+        Set<ConstraintViolation<CreateConnectionRequest>> constraintViolations = new HashSet<>();
+        Mockito.when(validator.validate(any(CreateConnectionRequest.class)))
+            .thenReturn(constraintViolations);
+        CreateConnectionRequest createConnectionRequest = new CreateConnectionRequest();
+        createConnectionRequest.setConnectionName(UUID.randomUUID()
+            .toString());
+        createConnectionRequest.setDescription(UUID.randomUUID()
+            .toString());
+        NetworkParameters networkParameters = new NetworkParameters();
+        networkParameters.setNetworkType(NetworkTypeEnum.PUBLIC_NETWORK.getNetworkType());
+        networkParameters.setSecurityGroupId(UUID.randomUUID()
+            .toString());
+        networkParameters.setVpcId(UUID.randomUUID()
+            .toString());
+        networkParameters.setVswitcheId(UUID.randomUUID()
+            .toString());
+        createConnectionRequest.setNetworkParameters(networkParameters);
+        AuthParameters authParameters = new AuthParameters();
+        BasicAuthParameters basicAuthParameters = new BasicAuthParameters();
+        basicAuthParameters.setPassword(UUID.randomUUID()
+            .toString());
+        basicAuthParameters.setUsername(UUID.randomUUID()
+            .toString());
+        authParameters.setBasicAuthParameters(basicAuthParameters);
+        authParameters.setAuthorizationType(AuthorizationTypeEnum.BASIC_AUTH.getType());
+        createConnectionRequest.setAuthParameters(authParameters);
+        final Mono<CreateConnectionResponse> connection = connectionController.createConnection(
+            createConnectionRequest);
+        Assert.assertEquals(connection.block()
+            .getCode(), EventBridgeErrorCode.Success.getCode());
+    }
+
+    @Test
+    public void testDeleteConnection() {
+        Mockito.doNothing()
+            .when(connectionService)
+            .deleteConnection(anyString(), anyString());
+        Set<ConstraintViolation<DeleteConnectionRequest>> constraintViolations = new HashSet<>();
+        Mockito.when(validator.validate(any(DeleteConnectionRequest.class)))
+            .thenReturn(constraintViolations);
+        DeleteConnectionRequest deleteConnectionRequest = new DeleteConnectionRequest();
+        deleteConnectionRequest.setConnectionName(UUID.randomUUID()
+            .toString());
+        final Mono<DeleteConnectionResponse> deleteConnectionResponse = connectionController.deleteConnection(
+            deleteConnectionRequest);
+        Assert.assertEquals(deleteConnectionResponse.block()
+            .getCode(), EventBridgeErrorCode.Success.getCode());
+    }
+
+    @Test
+    public void testUpdateConnection() {
+        Mockito.doNothing()
+            .when(connectionService)
+            .updateConnection(any(ConnectionDTO.class), anyString());
+        Set<ConstraintViolation<UpdateConnectionRequest>> constraintViolations = new HashSet<>();
+        Mockito.when(validator.validate(any(UpdateConnectionRequest.class)))
+            .thenReturn(constraintViolations);
+        UpdateConnectionRequest updateConnectionRequest = new UpdateConnectionRequest();
+        updateConnectionRequest.setConnectionName(UUID.randomUUID()
+            .toString());
+        updateConnectionRequest.setDescription(UUID.randomUUID()
+            .toString());
+        NetworkParameters networkParameters = new NetworkParameters();
+        networkParameters.setNetworkType(NetworkTypeEnum.PUBLIC_NETWORK.getNetworkType());
+        networkParameters.setSecurityGroupId(UUID.randomUUID()
+            .toString());
+        networkParameters.setVpcId(UUID.randomUUID()
+            .toString());
+        networkParameters.setVswitcheId(UUID.randomUUID()
+            .toString());
+        updateConnectionRequest.setNetworkParameters(networkParameters);
+        AuthParameters authParameters = new AuthParameters();
+        BasicAuthParameters basicAuthParameters = new BasicAuthParameters();
+        basicAuthParameters.setPassword(UUID.randomUUID()
+            .toString());
+        basicAuthParameters.setUsername(UUID.randomUUID()
+            .toString());
+        authParameters.setBasicAuthParameters(basicAuthParameters);
+        authParameters.setAuthorizationType(AuthorizationTypeEnum.BASIC_AUTH.getType());
+        updateConnectionRequest.setAuthParameters(authParameters);
+        final Mono<UpdateConnectionResponse> updateConnectionResponse = connectionController.updateConnection(
+            updateConnectionRequest);
+        Assert.assertEquals(updateConnectionResponse.block()
+            .getCode(), EventBridgeErrorCode.Success.getCode());
+    }
+
+    @Test
+    public void testGetConnection() {
+        Set<ConstraintViolation<GetConnectionRequest>> constraintViolations = new HashSet<>();
+        Mockito.when(validator.validate(any(GetConnectionRequest.class)))
+            .thenReturn(constraintViolations);
+        final ConnectionDTO connectionDTO = new ConnectionDTO();
+        NetworkParameters networkParameters = new NetworkParameters();
+        networkParameters.setNetworkType(NetworkTypeEnum.PUBLIC_NETWORK.getNetworkType());
+        networkParameters.setSecurityGroupId(UUID.randomUUID()
+            .toString());
+        networkParameters.setVpcId(UUID.randomUUID()
+            .toString());
+        networkParameters.setVswitcheId(UUID.randomUUID()
+            .toString());
+        connectionDTO.setNetworkParameters(networkParameters);
+        connectionDTO.setGmtCreate(new Date());
+        List<ConnectionDTO> list = Lists.newArrayList();
+        list.add(connectionDTO);
+        AuthParameters authParameters = new AuthParameters();
+        BasicAuthParameters basicAuthParameters = new BasicAuthParameters();
+        basicAuthParameters.setPassword(UUID.randomUUID()
+            .toString());
+        basicAuthParameters.setUsername(UUID.randomUUID()
+            .toString());
+        authParameters.setBasicAuthParameters(basicAuthParameters);
+        authParameters.setAuthorizationType(AuthorizationTypeEnum.BASIC_AUTH.getType());
+        connectionDTO.setAuthParameters(authParameters);
+        BDDMockito.given(connectionService.getConnection(any(), any()))
+            .willReturn(list);
+        GetConnectionRequest getConnectionRequest = new GetConnectionRequest();
+        getConnectionRequest.setConnectionName(UUID.randomUUID()
+            .toString());
+        final Mono<GetConnectionResponse> getConnectionResponse = connectionController.getConnection(
+            getConnectionRequest);
+        Assert.assertEquals(getConnectionResponse.block()
+            .getCode(), EventBridgeErrorCode.Success.getCode());
+    }
+
+    @Test
+    public void testSelectOneConnection() {
+        Set<ConstraintViolation<GetConnectionRequest>> constraintViolations = new HashSet<>();
+        Mockito.when(validator.validate(any(GetConnectionRequest.class)))
+            .thenReturn(constraintViolations);
+        final ConnectionDTO connectionDTO = new ConnectionDTO();
+        NetworkParameters networkParameters = new NetworkParameters();
+        networkParameters.setNetworkType(NetworkTypeEnum.PUBLIC_NETWORK.getNetworkType());
+        networkParameters.setSecurityGroupId(UUID.randomUUID()
+            .toString());
+        networkParameters.setVpcId(UUID.randomUUID()
+            .toString());
+        networkParameters.setVswitcheId(UUID.randomUUID()
+            .toString());
+        connectionDTO.setNetworkParameters(networkParameters);
+        connectionDTO.setGmtCreate(new Date());
+        List<ConnectionDTO> list = Lists.newArrayList();
+        list.add(connectionDTO);
+        AuthParameters authParameters = new AuthParameters();
+        BasicAuthParameters basicAuthParameters = new BasicAuthParameters();
+        basicAuthParameters.setPassword(UUID.randomUUID()
+            .toString());
+        basicAuthParameters.setUsername(UUID.randomUUID()
+            .toString());
+        authParameters.setBasicAuthParameters(basicAuthParameters);
+        authParameters.setAuthorizationType(AuthorizationTypeEnum.BASIC_AUTH.getType());
+        connectionDTO.setAuthParameters(authParameters);
+        BDDMockito.given(connectionService.getConnection(any(), any()))
+            .willReturn(list);
+        GetConnectionRequest getConnectionRequest = new GetConnectionRequest();
+        getConnectionRequest.setConnectionName(UUID.randomUUID()
+            .toString());
+        final Mono<GetConnectionResponse> getConnectionResponse = connectionController.selectOneConnection(
+            getConnectionRequest);
+        Assert.assertEquals(getConnectionResponse.block()
+            .getCode(), EventBridgeErrorCode.Success.getCode());
+    }
+
+    @Test
+    public void testListConnections() {
+        PaginationResult<List<ConnectionDTO>> result = new PaginationResult();
+        List<ConnectionDTO> eventConnectionWithBLOBs = Lists.newArrayList();
+        ConnectionDTO eventConnection = new ConnectionDTO();
+        eventConnection.setConnectionName(UUID.randomUUID()
+            .toString());
+        eventConnection.setGmtCreate(new Date());
+        eventConnectionWithBLOBs.add(eventConnection);
+        result.setData(eventConnectionWithBLOBs);
+        result.setTotal(9);
+        result.setNextToken("0");
+        Mockito.when(connectionService.listConnections(any(), any(), any(), anyInt()))
+            .thenReturn(result);
+        Set<ConstraintViolation<ListConnectionRequest>> constraintViolations = new HashSet<>();
+        Mockito.when(validator.validate(any(ListConnectionRequest.class)))
+            .thenReturn(constraintViolations);
+        ListConnectionRequest listConnectionRequest = new ListConnectionRequest();
+        listConnectionRequest.setConnectionNamePrefix(UUID.randomUUID()
+            .toString());
+        listConnectionRequest.setNextToken("0");
+        listConnectionRequest.setMaxResults(10);
+        final Mono<ListConnectionResponse> listConnections = connectionController.listConnections(
+            listConnectionRequest);
+        Assert.assertEquals(listConnections.block()
+            .getCode(), EventBridgeErrorCode.Success.getCode());
+    }
+
+    @Test
+    public void testListEnumsResponse() {
+        final Mono<ListEnumsResponse> listEnumsResponse = connectionController.listEnumsResponse();
+        Assert.assertEquals(listEnumsResponse.block()
+            .getNetworkTypeEnums()
+            .size(), NetworkTypeEnum.values().length);
+    }
+}
diff --git a/test/demo/src/test/java/org/apache/rocketmq/eventbridge/adapter/api/converter/EventConverterAdapterTest.java b/test/demo/src/test/java/org/apache/rocketmq/eventbridge/adapter/api/converter/EventConverterAdapterTest.java
new file mode 100644
index 0000000..8e617ef
--- /dev/null
+++ b/test/demo/src/test/java/org/apache/rocketmq/eventbridge/adapter/api/converter/EventConverterAdapterTest.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.eventbridge.adapter.api.converter;
+
+import com.google.common.collect.Maps;
+import com.google.gson.Gson;
+import io.cloudevents.CloudEvent;
+import java.net.URI;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.InjectMocks;
+import org.mockito.junit.MockitoJUnitRunner;
+
+import static org.apache.http.protocol.HTTP.CONTENT_TYPE;
+
+@RunWith(MockitoJUnitRunner.class)
+public class EventConverterAdapterTest {
+    @InjectMocks
+    private EventConverterAdapter eventConverterAdapter;
+
+    @Before
+    public void before() {
+        eventConverterAdapter.getEventConverterList()
+            .add(new CloudEventBatchedConverter());
+        eventConverterAdapter.getEventConverterList()
+            .add(new CloudEventBinaryConverter());
+        eventConverterAdapter.getEventConverterList()
+            .add(new CloudEventStructuredConverter());
+    }
+
+    @Test
+    public void toEventsRequest_Binary() {
+        Map<String, String> headers = Maps.newHashMap();
+        headers.put(CONTENT_TYPE, "application/json");
+        headers.put("ce-id", UUID.randomUUID()
+            .toString());
+        headers.put("ce-source", "demo-source");
+        headers.put("ce-type", URI.create("demo:type")
+            .toString());
+        headers.put("ce-specversion", "1.0");
+        byte[] body = new String("{\n" + "\t\"a\":1,\n" + "\t\"b\":2\n" + "}").getBytes(StandardCharsets.UTF_8);
+        List<CloudEvent> cloudEventList = eventConverterAdapter.toEventsRequest(headers, body);
+        Assert.assertEquals(1, cloudEventList.size());
+        Assert.assertEquals("demo:type", cloudEventList.get(0)
+            .getType());
+    }
+
+    @Test
+    public void toEventsRequest_Structured() {
+        Map<String, String> headers = Maps.newHashMap();
+        headers.put(CONTENT_TYPE, "application/cloudevents+json");
+
+        Map<String, Object> cloudEvent = Maps.newHashMap();
+        cloudEvent.put("id", UUID.randomUUID()
+            .toString());
+        cloudEvent.put("source", "demo-source");
+        cloudEvent.put("type", URI.create("demo:type"));
+        cloudEvent.put("specversion", "1.0");
+        cloudEvent.put("data", "{\n" + "\t\"a\":1,\n" + "\t\"b\":2\n" + "}");
+        List<CloudEvent> cloudEventList = eventConverterAdapter.toEventsRequest(headers, new Gson().toJson(cloudEvent)
+            .getBytes(StandardCharsets.UTF_8));
+
+        Assert.assertEquals(1, cloudEventList.size());
+        Assert.assertEquals("demo:type", cloudEventList.get(0)
+            .getType());
+    }
+
+    @Test
+    public void toEventsRequest_Batched() {
+        Map<String, String> headers = Maps.newHashMap();
+        headers.put(CONTENT_TYPE, "application/cloudevents-batch+json");
+
+        Map<String, Object> cloudEvent1 = Maps.newHashMap();
+        cloudEvent1.put("id", UUID.randomUUID()
+            .toString());
+        cloudEvent1.put("source", "demo-source");
+        cloudEvent1.put("type", URI.create("demo:type"));
+        cloudEvent1.put("specversion", "1.0");
+        cloudEvent1.put("data", "{\n" + "\t\"a\":1,\n" + "\t\"b\":2\n" + "}");
+
+        Map<String, Object> cloudEvent2 = Maps.newHashMap();
+        cloudEvent2.put("id", UUID.randomUUID()
+            .toString());
+        cloudEvent2.put("source", "demo-source");
+        cloudEvent2.put("type", URI.create("demo:type"));
+        cloudEvent2.put("specversion", "1.0");
+        cloudEvent2.put("data", "{\n" + "\t\"a\":1,\n" + "\t\"b\":2\n" + "}");
+
+        List<CloudEvent> cloudEventList = eventConverterAdapter.toEventsRequest(headers,
+            new Gson().toJson(Arrays.asList(cloudEvent1, cloudEvent2))
+                .getBytes(StandardCharsets.UTF_8));
+
+        Assert.assertEquals(2, cloudEventList.size());
+        Assert.assertEquals("demo:type", cloudEventList.get(0)
+            .getType());
+    }
+
+}
\ No newline at end of file
diff --git a/test/demo/src/test/java/org/apache/rocketmq/eventbridge/adapter/api/converter/EventTargetConverterTest.java b/test/demo/src/test/java/org/apache/rocketmq/eventbridge/adapter/api/converter/EventTargetConverterTest.java
new file mode 100644
index 0000000..3b1b45c
--- /dev/null
+++ b/test/demo/src/test/java/org/apache/rocketmq/eventbridge/adapter/api/converter/EventTargetConverterTest.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.eventbridge.adapter.api.converter;
+
+import com.google.common.collect.Maps;
+import java.util.Map;
+import org.apache.rocketmq.eventbridge.adapter.api.dto.target.DeadLetterQueueDTO;
+import org.apache.rocketmq.eventbridge.adapter.api.dto.target.EventTargetDTO;
+import org.apache.rocketmq.eventbridge.adapter.api.dto.target.RetryStrategyDTO;
+import org.apache.rocketmq.eventbridge.adapter.api.dto.target.RunOptionsDTO;
+import org.apache.rocketmq.eventbridge.enums.ErrorToleranceEnum;
+import org.apache.rocketmq.eventbridge.enums.PushRetryStrategyEnum;
+import org.apache.rocketmq.eventbridge.domain.model.run.DeadLetterQueue;
+import org.apache.rocketmq.eventbridge.domain.model.run.RetryStrategy;
+import org.apache.rocketmq.eventbridge.domain.model.target.EventTarget;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class EventTargetConverterTest {
+
+    @Test
+    public void convertEventTargetRunners() {
+
+    }
+
+    @Test
+    public void convertEventTargetRunner() {
+        EventTargetDTO eventTargetDTO = new EventTargetDTO();
+        eventTargetDTO.setEventTargetName("targetName");
+        Map<String, Object> config = Maps.newHashMap();
+        config.put("url", "http://127.0.0.1:7001/cloudevents");
+        eventTargetDTO.setConfig(config);
+        eventTargetDTO.setClassName("http");
+
+        RunOptionsDTO runOptionsDTO = new RunOptionsDTO();
+        runOptionsDTO.setErrorsTolerance("NONE");
+        runOptionsDTO.setDeadLetterQueue(this.buildDeadLetterQueueDTO());
+        runOptionsDTO.setRetryStrategy(this.buildRetryStrategyDTO());
+        eventTargetDTO.setRunOptions(runOptionsDTO);
+
+        EventTarget eventTarget = EventTargetConverter.convertEventTarget("123456", "bus", "rule", eventTargetDTO);
+
+        Assert.assertEquals(eventTarget.getAccountId(), "123456");
+        Assert.assertEquals(eventTarget.getEventBusName(), "bus");
+        Assert.assertEquals(eventTarget.getEventRuleName(), "rule");
+        Assert.assertEquals(eventTarget.getName(), eventTargetDTO.getEventTargetName());
+        Assert.assertEquals(eventTarget.getClassName(), eventTargetDTO.getClassName());
+        Assert.assertEquals(eventTarget.getConfig(), eventTargetDTO.getConfig());
+    }
+
+    @Test
+    public void convertRetryStrategy() {
+        RetryStrategyDTO retryStrategyDTO = buildRetryStrategyDTO();
+        RetryStrategy retryStrategy = EventTargetConverter.convertRetryStrategy(retryStrategyDTO);
+
+        Assert.assertEquals(PushRetryStrategyEnum.BACKOFF_RETRY, retryStrategy.getPushRetryStrategy());
+        Assert.assertEquals(retryStrategyDTO.getMaximumRetryAttempts(), retryStrategy.getMaximumRetryAttempts());
+        Assert.assertEquals(retryStrategyDTO.getMaximumEventAgeInSeconds(),
+            retryStrategy.getMaximumEventAgeInSeconds());
+    }
+
+    @Test
+    public void convertErrorTolerance() {
+        ErrorToleranceEnum errorToleranceEnum = EventTargetConverter.convertErrorTolerance("NONE");
+        Assert.assertEquals(ErrorToleranceEnum.NONE, errorToleranceEnum);
+    }
+
+    @Test
+    public void convertDeadLetterQueue() {
+        DeadLetterQueueDTO deadLetterQueueDTO = this.buildDeadLetterQueueDTO();
+        DeadLetterQueue deadLetterQueue = EventTargetConverter.convertDeadLetterQueue(deadLetterQueueDTO);
+        Assert.assertEquals(deadLetterQueue.getType(), deadLetterQueueDTO.getType());
+        Assert.assertEquals(deadLetterQueue.getConfig(), deadLetterQueueDTO.getConfig());
+    }
+
+    private RetryStrategyDTO buildRetryStrategyDTO() {
+        RetryStrategyDTO retryStrategyDTO = new RetryStrategyDTO();
+        retryStrategyDTO.setPushRetryStrategy("BACKOFF_RETRY");
+        retryStrategyDTO.setMaximumRetryAttempts(3);
+        retryStrategyDTO.setMaximumEventAgeInSeconds(4);
+        return retryStrategyDTO;
+    }
+
+    private DeadLetterQueueDTO buildDeadLetterQueueDTO() {
+        DeadLetterQueueDTO deadLetterQueueDTO = new DeadLetterQueueDTO();
+        deadLetterQueueDTO.setType("rocketmq");
+        Map<String, Object> config = Maps.newHashMap();
+        config.put("topic", "demo");
+        config.put("nameSrv", "127.0.01:9876");
+        deadLetterQueueDTO.setConfig(config);
+        return deadLetterQueueDTO;
+    }
+}
\ No newline at end of file
diff --git a/test/demo/src/test/java/org/apache/rocketmq/eventbridge/adapter/api/converter/EventTargetDTOConverterTest.java b/test/demo/src/test/java/org/apache/rocketmq/eventbridge/adapter/api/converter/EventTargetDTOConverterTest.java
new file mode 100644
index 0000000..79445e6
--- /dev/null
+++ b/test/demo/src/test/java/org/apache/rocketmq/eventbridge/adapter/api/converter/EventTargetDTOConverterTest.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.eventbridge.adapter.api.converter;
+
+import java.util.Collections;
+import org.apache.rocketmq.eventbridge.adapter.api.dto.target.EventTargetDTO;
+import org.apache.rocketmq.eventbridge.enums.ErrorToleranceEnum;
+import org.apache.rocketmq.eventbridge.enums.PushRetryStrategyEnum;
+import org.apache.rocketmq.eventbridge.domain.model.run.DeadLetterQueue;
+import org.apache.rocketmq.eventbridge.domain.model.run.RetryStrategy;
+import org.apache.rocketmq.eventbridge.domain.model.run.RunOptions;
+import org.apache.rocketmq.eventbridge.domain.model.target.EventTarget;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class EventTargetDTOConverterTest {
+
+    @Test
+    public void convert() {
+        RetryStrategy retryStrategy = RetryStrategy.builder()
+            .pushRetryStrategy(PushRetryStrategyEnum.BACKOFF_RETRY)
+            .maximumRetryAttempts(3)
+            .maximumEventAgeInSeconds(4)
+            .build();
+        DeadLetterQueue deadLetterQueue = DeadLetterQueue.builder()
+            .type("rocketmq")
+            .config(Collections.singletonMap("topic", "demo"))
+            .build();
+        RunOptions runOptions = RunOptions.builder()
+            .errorsTolerance(ErrorToleranceEnum.ALL)
+            .retryStrategy(retryStrategy)
+            .deadLetterQueue(deadLetterQueue)
+            .build();
+        EventTarget eventTarget = EventTarget.builder()
+            .eventBusName("bus")
+            .eventRuleName("rule")
+            .name("target")
+            .config(Collections.singletonMap("url", "http://127.0.0.1:7002/cloudevent"))
+            .className("http")
+            .runOptions(runOptions)
+            .build();
+
+        EventTargetDTO eventTargetDTO = EventTargetDTOConverter.convert(eventTarget);
+        Assert.assertEquals(eventTarget.getName(), eventTargetDTO.getEventTargetName());
+        Assert.assertEquals(eventTarget.getClassName(), eventTargetDTO.getClassName());
+        Assert.assertEquals(eventTarget.getConfig(), eventTargetDTO.getConfig());
+
+        Assert.assertEquals(eventTarget.getRunOptions()
+            .getErrorsTolerance()
+            .toString(), eventTargetDTO.getRunOptions()
+            .getErrorsTolerance());
+
+        Assert.assertEquals(eventTarget.getRunOptions()
+            .getRetryStrategy()
+            .getPushRetryStrategy()
+            .toString(), eventTargetDTO.getRunOptions()
+            .getRetryStrategy()
+            .getPushRetryStrategy());
+        Assert.assertEquals(eventTarget.getRunOptions()
+            .getRetryStrategy()
+            .getMaximumRetryAttempts(), eventTargetDTO.getRunOptions()
+            .getRetryStrategy()
+            .getMaximumRetryAttempts());
+        Assert.assertEquals(eventTarget.getRunOptions()
+            .getRetryStrategy()
+            .getMaximumEventAgeInSeconds(), eventTargetDTO.getRunOptions()
+            .getRetryStrategy()
+            .getMaximumEventAgeInSeconds());
+
+        Assert.assertEquals(eventTarget.getRunOptions()
+            .getDeadLetterQueue()
+            .getType(), eventTargetDTO.getRunOptions()
+            .getDeadLetterQueue()
+            .getType());
+        Assert.assertEquals(eventTarget.getRunOptions()
+            .getDeadLetterQueue()
+            .getConfig(), eventTargetDTO.getRunOptions()
+            .getDeadLetterQueue()
+            .getConfig());
+    }
+}
\ No newline at end of file
diff --git a/test/demo/src/test/java/org/apache/rocketmq/eventbridge/adapter/api/handler/EventDataHandlerTest.java b/test/demo/src/test/java/org/apache/rocketmq/eventbridge/adapter/api/handler/EventDataHandlerTest.java
new file mode 100644
index 0000000..fe5b718
--- /dev/null
+++ b/test/demo/src/test/java/org/apache/rocketmq/eventbridge/adapter/api/handler/EventDataHandlerTest.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.eventbridge.adapter.api.handler;
+
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import lombok.SneakyThrows;
+import org.apache.rocketmq.eventbridge.adapter.api.dto.data.PutEventsResponse;
+import org.apache.rocketmq.eventbridge.domain.model.data.EventDataService;
+import org.apache.rocketmq.eventbridge.domain.model.data.PutEventCallback;
+import org.apache.rocketmq.eventbridge.domain.model.data.PutEventsResponseEntry;
+import org.apache.rocketmq.eventbridge.event.EventBridgeEvent;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.InjectMocks;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.junit.MockitoJUnitRunner;
+import reactor.core.publisher.Mono;
+
+import static org.mockito.ArgumentMatchers.any;
+
+@RunWith(MockitoJUnitRunner.class)
+public class EventDataHandlerTest {
+    @InjectMocks
+    private EventDataHandler eventDataHandler;
+
+    @Mock
+    EventDataService eventDataService;
+
+    ExecutorService executor = Executors.newFixedThreadPool(10);
+
+    @Before
+    public void before() {
+        Mockito.doAnswer((invocation) -> {
+            Object[] args = invocation.getArguments();
+            EventBridgeEvent event = (EventBridgeEvent) args[1];
+            ReactorPutEventCallback callback = (ReactorPutEventCallback) args[2];
+            executor.submit(new PutEventTestThread(event, callback));
+            return null;
+        })
+            .when(eventDataService)
+            .putEvent(any(), any(), any());
+    }
+
+    @Test
+    public void testPutEvents() {
+        Long startTime = System.currentTimeMillis();
+        List<EventBridgeEvent> eventList = IntStream.range(0, 10)
+            .mapToObj(index -> {
+                EventBridgeEvent event = new EventBridgeEvent();
+                event.setId(UUID.randomUUID()
+                    .toString());
+                return event;
+            })
+            .collect(Collectors.toList());
+        Mono<PutEventsResponse> mono = eventDataHandler.putEvents("123456", eventList);
+        PutEventsResponse putEventsResponse = mono.block();
+        Long costTime = System.currentTimeMillis() - startTime;
+        Assert.assertEquals(10, putEventsResponse.getEntryList()
+            .size());
+        System.out.println("costTime:" + costTime);
+        Assert.assertEquals(true, costTime < 4000);
+    }
+
+    class PutEventTestThread implements Runnable {
+        EventBridgeEvent event;
+        PutEventCallback putEventCallback;
+
+        public PutEventTestThread(EventBridgeEvent event, PutEventCallback putEventCallback) {
+            this.event = event;
+            this.putEventCallback = putEventCallback;
+        }
+
+        @SneakyThrows
+        @Override
+        public void run() {
+            Thread.sleep(3000L);
+            PutEventsResponseEntry putEventsResponseEntry = new PutEventsResponseEntry();
+            putEventsResponseEntry.setEventId(event.getId());
+            if (System.currentTimeMillis() % 2 == 1) {
+                putEventsResponseEntry.setErrorCode("Success");
+            } else {
+                putEventsResponseEntry.setErrorCode("Failed.");
+            }
+
+            putEventCallback.endProcess(putEventsResponseEntry);
+        }
+    }
+
+}
diff --git a/test/pom.xml b/test/pom.xml
new file mode 100644
index 0000000..12f4a80
--- /dev/null
+++ b/test/pom.xml
@@ -0,0 +1,28 @@
+<!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor
+	license agreements. See the NOTICE file distributed with this work for additional
+	information regarding copyright ownership. The ASF licenses this file to
+	You under the Apache License, Version 2.0 (the "License"); you may not use
+	this file except in compliance with the License. You may obtain a copy of
+	the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required
+	by applicable law or agreed to in writing, software distributed under the
+	License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS
+	OF ANY KIND, either express or implied. See the License for the specific
+	language governing permissions and limitations under the License. -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>rocketmq-eventbridge</artifactId>
+        <groupId>org.apache.rocketmq</groupId>
+        <version>1.0.0</version>
+        <relativePath>../pom.xml</relativePath>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+    <artifactId>rocketmq-eventbridge-test</artifactId>
+    <packaging>pom</packaging>
+
+    <modules>
+        <module>demo</module>
+    </modules>
+
+</project>
\ No newline at end of file