1.add embedded database
2.add default validateFilter
diff --git a/adapter/persistence/pom.xml b/adapter/persistence/pom.xml
index dd46427..eac0c8e 100644
--- a/adapter/persistence/pom.xml
+++ b/adapter/persistence/pom.xml
@@ -54,6 +54,11 @@
<version>${mysql-connector-java.version}</version>
</dependency>
<dependency>
+ <groupId>com.h2database</groupId>
+ <artifactId>h2</artifactId>
+ </dependency>
+
+ <dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<scope>compile</scope>
diff --git a/adapter/persistence/src/main/java/org/apache/rocketmq/eventbridge/adapter/persistence/DatasourceConfig.java b/adapter/persistence/src/main/java/org/apache/rocketmq/eventbridge/adapter/persistence/DatasourceConfig.java
new file mode 100644
index 0000000..6718b8d
--- /dev/null
+++ b/adapter/persistence/src/main/java/org/apache/rocketmq/eventbridge/adapter/persistence/DatasourceConfig.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.eventbridge.adapter.persistence;
+
+import com.zaxxer.hikari.HikariConfig;
+import com.zaxxer.hikari.HikariDataSource;
+import org.apache.ibatis.session.SqlSessionFactory;
+import org.mybatis.spring.SqlSessionFactoryBean;
+import org.mybatis.spring.SqlSessionTemplate;
+import org.mybatis.spring.annotation.MapperScan;
+import org.springframework.beans.factory.annotation.Qualifier;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.core.io.support.PathMatchingResourcePatternResolver;
+
+import javax.sql.DataSource;
+
+/**
+ * DatasourceConfig
+ */
+@Configuration
+@MapperScan(basePackages = "org.apache.rocketmq.eventbridge.adapter.persistence.*.mybatis.*")
+public class DatasourceConfig {
+ private static final String MAPPER_LOCATION = "classpath:mybatis/*.xml";
+
+ @Value("${spring.datasource.hikari.jdbc-url:jdbc:h2:./rocketmq_eventbridge;MODE=MySQL}")
+ private String baseUrl;
+ @Value("${spring.datasource.hikari.driver-class-name:org.h2.Driver}")
+ private String baseDriverClassName;
+ @Value("${spring.datasource.hikari.username:sa}")
+ private String baseUserName;
+ @Value("${spring.datasource.hikari.password:sa}")
+ private String basePassword;
+
+ @Value("${spring.datasource.hikari.minimum-idle:5}")
+ private Integer minIdle;
+
+ @Value("${spring.datasource.hikari.idle-timeout:180000}")
+ private Long idleTimeoutMs;
+
+ @Value("${spring.datasource.hikari.maximum-pool-size: 10}")
+ private Integer maxPoolSize;
+
+ @Value("${spring.datasource.hikari.auto-commit: true}")
+ private Boolean autoCommit;
+
+ @Value("${spring.datasource.hikari.pool-name: hikaricp}")
+ private String poolName;
+
+ @Value("${spring.datasource.hikari.max-lifetime: 180000}")
+ private Long maxLifeTime;
+
+ @Value("${spring.datasource.hikari.connection-timeout: 30000}")
+ private Long connectionTimeoutMs;
+
+ @Value("${spring.datasource.hikari.connection-test-query: select 1}")
+ private String connectionTestQuery;
+
+ @Value("${spring.datasource.hikari.validation-timeout: 500}")
+ private Long validationTimeoutMs;
+
+ @Bean("dataSource")
+ public DataSource getMasterDataSource(){
+ HikariConfig hikariConfig = new HikariConfig();
+ hikariConfig.setJdbcUrl(baseUrl);
+ hikariConfig.setDriverClassName(baseDriverClassName);
+ hikariConfig.setUsername(baseUserName);
+ hikariConfig.setPassword(basePassword);
+ hikariConfig.setMinimumIdle(minIdle);
+ hikariConfig.setIdleTimeout(idleTimeoutMs);
+ hikariConfig.setMaximumPoolSize(maxPoolSize);
+ hikariConfig.setAutoCommit(autoCommit);
+ hikariConfig.setPoolName(poolName);
+ hikariConfig.setMaxLifetime(maxLifeTime);
+ hikariConfig.setConnectionTimeout(connectionTimeoutMs);
+ hikariConfig.setConnectionTestQuery(connectionTestQuery);
+ hikariConfig.setValidationTimeout(validationTimeoutMs);
+ return new HikariDataSource(hikariConfig);
+ }
+
+ @Bean("sqlSessionFactory")
+ public SqlSessionFactory masterSqlSessionFactory(@Qualifier("dataSource") DataSource dataSource) throws Exception {
+ SqlSessionFactoryBean bean = new SqlSessionFactoryBean();
+ bean.setDataSource(dataSource);
+ bean.setMapperLocations(new PathMatchingResourcePatternResolver().getResources(MAPPER_LOCATION));
+ return bean.getObject();
+ }
+
+ @Bean("sqlSessionTemplate")
+ public SqlSessionTemplate masterSqlSessionTemplate(@Qualifier("sqlSessionFactory") SqlSessionFactory sqlSessionFactory){
+ return new SqlSessionTemplate(sqlSessionFactory);
+ }
+
+}
+
diff --git a/adapter/persistence/src/main/java/org/apache/rocketmq/eventbridge/adapter/persistence/PersistenceConfig.java b/adapter/persistence/src/main/java/org/apache/rocketmq/eventbridge/adapter/persistence/PersistenceConfig.java
index 8b70fb9..d115ad2 100644
--- a/adapter/persistence/src/main/java/org/apache/rocketmq/eventbridge/adapter/persistence/PersistenceConfig.java
+++ b/adapter/persistence/src/main/java/org/apache/rocketmq/eventbridge/adapter/persistence/PersistenceConfig.java
@@ -20,13 +20,11 @@
import lombok.SneakyThrows;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
-import org.mybatis.spring.annotation.MapperScan;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
-@MapperScan(basePackages = "org.apache.rocketmq.eventbridge.adapter.persistence.*.mybatis.*")
public class PersistenceConfig {
@SneakyThrows
diff --git a/adapter/persistence/src/main/java/org/apache/rocketmq/eventbridge/adapter/persistence/target/mybatis/repository/MybatisEventTargetRunnerRepository.java b/adapter/persistence/src/main/java/org/apache/rocketmq/eventbridge/adapter/persistence/target/mybatis/repository/MybatisEventTargetRunnerRepository.java
index 4edeecd..0c92aa4 100644
--- a/adapter/persistence/src/main/java/org/apache/rocketmq/eventbridge/adapter/persistence/target/mybatis/repository/MybatisEventTargetRunnerRepository.java
+++ b/adapter/persistence/src/main/java/org/apache/rocketmq/eventbridge/adapter/persistence/target/mybatis/repository/MybatisEventTargetRunnerRepository.java
@@ -21,6 +21,8 @@
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
+
+import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.eventbridge.adapter.persistence.target.mybatis.converter.EventTargetRunnerConverter;
import org.apache.rocketmq.eventbridge.adapter.persistence.target.mybatis.dataobject.EventTargetRunnerDO;
import org.apache.rocketmq.eventbridge.adapter.persistence.target.mybatis.mapper.EventTargetRunnerMapper;
@@ -72,6 +74,9 @@
@Override
public List<EventTargetRunner> listEventTargetRunners(String accountId, String eventBusName, String eventRuleName) {
+ if (StringUtils.isBlank(accountId) || StringUtils.isBlank(eventBusName) || StringUtils.isBlank(eventRuleName)) {
+ return Lists.newArrayListWithCapacity(0);
+ }
List<EventTargetRunnerDO> eventTargetRunnerDOS = eventTargetRunnerMapper.listEventTargetRunners(accountId, eventBusName, eventRuleName);
if (eventTargetRunnerDOS == null || eventTargetRunnerDOS.isEmpty()) {
return Lists.newArrayListWithCapacity(0);
diff --git a/adapter/persistence/src/main/resources/db/migration/V1__baseline.sql b/adapter/persistence/src/main/resources/db/migration/V1__baseline.sql
index 6681b7e..3909293 100644
--- a/adapter/persistence/src/main/resources/db/migration/V1__baseline.sql
+++ b/adapter/persistence/src/main/resources/db/migration/V1__baseline.sql
@@ -1,17 +1,19 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
CREATE TABLE IF NOT EXISTS `event_bus` (
`id` bigint(20) unsigned NOT NULL AUTO_INCREMENT,
@@ -21,11 +23,10 @@
`gmt_create` datetime DEFAULT NULL COMMENT 'create time',
`gmt_modify` datetime DEFAULT NULL COMMENT 'modify time',
PRIMARY KEY (`id`),
- UNIQUE KEY `name_uniq_key` (`account_id`,`name`)
-) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='event bus meta'
-;
+ UNIQUE KEY `name_uniq_key_event_bus` (`account_id`,`name`)
+) ENGINE=InnoDB DEFAULT CHARSET=utf8;
-CREATE TABLE `event_topic` (
+CREATE TABLE IF NOT EXISTS `event_topic` (
`id` int(11) unsigned NOT NULL AUTO_INCREMENT,
`account_id` varchar(255) DEFAULT 'SYSTEM' COMMENT 'source account id',
`bus` varchar(255) NOT NULL COMMENT 'bus name',
@@ -36,9 +37,8 @@
`gmt_create` datetime DEFAULT NULL COMMENT 'create time',
`gmt_modify` datetime DEFAULT NULL COMMENT 'modify time',
PRIMARY KEY (`id`),
- UNIQUE KEY `name_uniq_key` (`name`)
-) ENGINE=InnoDB DEFAULT CHARSET=utf8
-;
+ UNIQUE KEY `name_uniq_key_event_topicdd` (`name`)
+) ENGINE=InnoDB DEFAULT CHARSET=utf8;
CREATE TABLE IF NOT EXISTS `event_source` (
`id` int(11) unsigned NOT NULL AUTO_INCREMENT,
@@ -53,8 +53,8 @@
`gmt_create` datetime DEFAULT NULL COMMENT 'create time',
`gmt_modify` datetime DEFAULT NULL COMMENT 'modify time',
PRIMARY KEY (`id`),
- UNIQUE KEY `name_uniq_key` (`account_id`,`name`)
-) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='event source meta'
+ UNIQUE KEY `name_uniq_key_event_source` (`account_id`,`name`)
+) ENGINE=InnoDB DEFAULT CHARSET=utf8
;
CREATE TABLE IF NOT EXISTS `event_type` (
@@ -67,12 +67,12 @@
`gmt_create` datetime DEFAULT NULL COMMENT 'create time',
`gmt_modify` datetime DEFAULT NULL COMMENT 'modify time',
PRIMARY KEY (`id`),
- UNIQUE KEY `name_uniq_key` (`source`,`name`)
-) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='event type meta'
+ UNIQUE KEY `name_uniq_key_event_type` (`source`,`name`)
+) ENGINE=InnoDB DEFAULT CHARSET=utf8
;
CREATE TABLE IF NOT EXISTS `event_rule` (
- `id` bigint(20) NOT NULL AUTO_INCREMENT,
+ `id` bigint(20) unsigned NOT NULL AUTO_INCREMENT,
`account_id` varchar(255) NOT NULL COMMENT 'bus account id',
`bus` varchar(255) NOT NULL COMMENT 'bus name',
`name` varchar(255) NOT NULL COMMENT 'rule name',
@@ -82,8 +82,8 @@
`gmt_create` datetime DEFAULT NULL COMMENT 'create time',
`gmt_modify` datetime DEFAULT NULL COMMENT 'modify time',
UNIQUE KEY `id` (`id`),
- UNIQUE KEY `name_uniq_key` (`account_id`,`name`,`bus`)
-) ENGINE=InnoDB AUTO_INCREMENT=51815 DEFAULT CHARSET=utf8 COMMENT='event rule meta'
+ UNIQUE KEY `name_uniq_key_event_rule` (`account_id`,`name`,`bus`)
+) ENGINE=InnoDB DEFAULT CHARSET=utf8
;
CREATE TABLE IF NOT EXISTS `event_source_runner` (
@@ -96,8 +96,8 @@
`gmt_create` datetime DEFAULT NULL COMMENT 'create time',
`gmt_modify` datetime DEFAULT NULL COMMENT 'modify time',
PRIMARY KEY (`id`),
- UNIQUE KEY `name_uniq_key` (`account_id`,`bus`,`source`)
-) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='event source runner meta'
+ UNIQUE KEY `name_uniq_key_event_source_runner` (`account_id`,`bus`,`source`)
+) ENGINE=InnoDB DEFAULT CHARSET=utf8
;
CREATE TABLE IF NOT EXISTS `event_target` (
@@ -112,8 +112,8 @@
`gmt_create` datetime DEFAULT NULL COMMENT 'create time',
`gmt_modify` datetime DEFAULT NULL COMMENT 'modify time',
PRIMARY KEY (`id`),
- UNIQUE KEY `name_uniq_key` (`account_id`,`bus`,`rule`,`name`)
-) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='event target meta'
+ UNIQUE KEY `name_uniq_key_event_target` (`account_id`,`bus`,`rule`,`name`)
+) ENGINE=InnoDB DEFAULT CHARSET=utf8
;
@@ -128,8 +128,8 @@
`gmt_create` datetime DEFAULT NULL COMMENT 'create time',
`gmt_modify` datetime DEFAULT NULL COMMENT 'modify time',
PRIMARY KEY (`id`),
- UNIQUE KEY `name_uniq_key` (`account_id`,`bus`,`rule`,`target`)
-) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='event target runner meta'
+ UNIQUE KEY `name_uniq_key_event_target_runner` (`account_id`,`bus`,`rule`,`target`)
+) ENGINE=InnoDB DEFAULT CHARSET=utf8
;
@@ -144,8 +144,8 @@
`gmt_create` datetime DEFAULT NULL COMMENT 'create time',
`gmt_modify` datetime DEFAULT NULL COMMENT 'modify time',
PRIMARY KEY (`id`),
- UNIQUE KEY `name_uniq_key` (`name`)
-) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='event source class meta'
+ UNIQUE KEY `name_uniq_key_event_source_class` (`name`)
+) ENGINE=InnoDB DEFAULT CHARSET=utf8
;
@@ -160,6 +160,6 @@
`gmt_create` datetime DEFAULT NULL COMMENT 'create time',
`gmt_modify` datetime DEFAULT NULL COMMENT 'modify time',
PRIMARY KEY (`id`),
- UNIQUE KEY `name_uniq_key` (`name`)
-) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='event target class meta'
+ UNIQUE KEY `name_uniq_key_event_target_class` (`name`)
+) ENGINE=InnoDB DEFAULT CHARSET=utf8
;
diff --git a/adapter/persistence/src/main/resources/db/migration/V2__baseline.sql b/adapter/persistence/src/main/resources/db/migration/V2__baseline.sql
index 1d04ec5..c70912c 100644
--- a/adapter/persistence/src/main/resources/db/migration/V2__baseline.sql
+++ b/adapter/persistence/src/main/resources/db/migration/V2__baseline.sql
@@ -1,17 +1,19 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
CREATE TABLE IF NOT EXISTS `event_connection` (
`id` int(11) unsigned NOT NULL AUTO_INCREMENT,
@@ -25,8 +27,8 @@
`gmt_create` datetime DEFAULT NULL COMMENT 'create time',
`gmt_modify` datetime DEFAULT NULL COMMENT 'modify time',
PRIMARY KEY (`id`),
- UNIQUE KEY `name_uniq_key` (`name`)
- ) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='event connection meta'
+ UNIQUE KEY `name_uniq_key_event_connection` (`name`)
+ ) ENGINE=InnoDB DEFAULT CHARSET=utf8
;
@@ -42,6 +44,6 @@
`gmt_create` datetime DEFAULT NULL COMMENT 'create time',
`gmt_modify` datetime DEFAULT NULL COMMENT 'modify time',
PRIMARY KEY (`id`),
- UNIQUE KEY `name_uniq_key` (`name`)
- ) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='event api destination meta'
+ UNIQUE KEY `name_uniq_key_event_api_destination` (`name`)
+ ) ENGINE=InnoDB DEFAULT CHARSET=utf8
;
diff --git a/adapter/persistence/src/main/resources/db/migration/V3__change_target_transform_type.sql b/adapter/persistence/src/main/resources/db/migration/V3__change_target_transform_type.sql
index edaf379..b2a18c2 100644
--- a/adapter/persistence/src/main/resources/db/migration/V3__change_target_transform_type.sql
+++ b/adapter/persistence/src/main/resources/db/migration/V3__change_target_transform_type.sql
@@ -1,17 +1,19 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
ALTER TABLE `event_target_class`
CHANGE COLUMN `target_transform` `target_transform` TEXT NULL DEFAULT NULL ;
diff --git a/adapter/persistence/src/main/resources/db/migration/V4__register_source_acs_mns.sql b/adapter/persistence/src/main/resources/db/migration/V4__register_source_acs_mns.sql
index 7e826f0..1ad499a 100644
--- a/adapter/persistence/src/main/resources/db/migration/V4__register_source_acs_mns.sql
+++ b/adapter/persistence/src/main/resources/db/migration/V4__register_source_acs_mns.sql
@@ -1,16 +1,18 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
INSERT INTO `event_source_class` (`name`,`api_params`,`required_params`,`transform`,`visual_config`,`description`,`gmt_create`,`gmt_modify`) VALUES ('acs.mns','{\n \"RegionId\":{\n \"type\":\"String\",\n \"desc\":\"the region of aliyun mns.\",\n \"required\":true,\n \"defaultValue\":\"\"\n },\n \"QueueName\":{\n \"type\":\"String\",\n \"desc\":\"the queue name of aliyun mns.\",\n \"required\":true,\n \"defaultValue\":\"\"\n },\n \"IsBase64Decode\":{\n \"type\":\"boolean\",\n \"desc\":\"base64 decode or not\"\n },\n \"AliyunAccountId\":{\n \"type\":\"String\",\n \"desc\":\"the account id of aliyun mns.\",\n \"required\":true\n },\n \"AccessKeyId\":{\n \"type\":\"String\",\n \"desc\":\"the access key id of aliyun mns.\",\n \"required\":true\n },\n \"AccessKeySecret\":{\n \"type\":\"String\",\n \"desc\":\"the access key idsecret of aliyun mns.\",\n \"required\":true\n }\n}','{\n \"accountEndpoint\":\"http://${AliyunAccountId}.mns.${RegionId}.aliyuncs.com\",\n \"accountId\":\"${AliyunAccountId}\",\n \"queueName\":\"${QueueName}\",\n \"isBase64Decode\":\"${IsBase64Decode}\",\n \"accessKeyId\":\"${AccessKeyId}\",\n \"accessKeySecret\":\"${AccessKeySecret}\",\n \"class\":\"org.apache.rocketmq.connect.mns.source.MNSSourceConnector\"\n}','{\n \"data\":\"{\\\"value\\\":\\\"$.data\\\",\\\"form\\\":\\\"JSONPATH\\\"}\",\n \"subject\":\"{\\\"value\\\":\\\"acs:mns:${RegionId}:${AliyunAccountId}:queues/${QueueName}\\\",\\\"form\\\":\\\"CONSTANT\\\"}\",\n \"type\":\"{\\\"value\\\":\\\"mns.sendMsg\\\",\\\"form\\\":\\\"CONSTANT\\\"}\"\n}',NULL,'aliyun mns source',now(),now());
diff --git a/adapter/persistence/src/main/resources/db/migration/V5__register_target_acs_dingtalk.sql b/adapter/persistence/src/main/resources/db/migration/V5__register_target_acs_dingtalk.sql
index 31291a1..1d8a643 100644
--- a/adapter/persistence/src/main/resources/db/migration/V5__register_target_acs_dingtalk.sql
+++ b/adapter/persistence/src/main/resources/db/migration/V5__register_target_acs_dingtalk.sql
@@ -1,17 +1,19 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
INSERT INTO `event_target_class` (`name`,`api_params`,`target_transform`,`required_params`,`visual_config`,`description`,`gmt_create`,`gmt_modify`)
VALUES
diff --git a/adapter/persistence/src/main/resources/db/migration/V6__register_target_acs_eventbridge.sql b/adapter/persistence/src/main/resources/db/migration/V6__register_target_acs_eventbridge.sql
index e4c20c1..b7587a0 100644
--- a/adapter/persistence/src/main/resources/db/migration/V6__register_target_acs_eventbridge.sql
+++ b/adapter/persistence/src/main/resources/db/migration/V6__register_target_acs_eventbridge.sql
@@ -1,17 +1,19 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
INSERT INTO `event_target_class` (`name`,`api_params`,`target_transform`,`required_params`,`visual_config`,`description`,`gmt_create`,`gmt_modify`)
VALUES
diff --git a/adapter/persistence/src/main/resources/db/migration/V7__update_event_connection_table_structure.sql b/adapter/persistence/src/main/resources/db/migration/V7__update_event_connection_table_structure.sql
index 82be349..e98bbf0 100644
--- a/adapter/persistence/src/main/resources/db/migration/V7__update_event_connection_table_structure.sql
+++ b/adapter/persistence/src/main/resources/db/migration/V7__update_event_connection_table_structure.sql
@@ -1,18 +1,19 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
-ALTER TABLE `event_connection`
-MODIFY COLUMN `authorization_type` varchar(128) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT '' COMMENT '授权类型' AFTER `name`,
-MODIFY COLUMN `auth_parameters` text CHARACTER SET utf8 COLLATE utf8_general_ci NULL AFTER `authorization_type`;
\ No newline at end of file
+--ALTER TABLE `event_connection` MODIFY COLUMN `authorization_type` varchar(128) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT '' COMMENT '授权类型' AFTER `name`;
+--ALTER TABLE `event_connection` MODIFY COLUMN `auth_parameters` text CHARACTER SET utf8 COLLATE utf8_general_ci NULL AFTER `authorization_type`;
\ No newline at end of file
diff --git a/adapter/persistence/src/main/resources/db/migration/V8__update_unique_key.sql b/adapter/persistence/src/main/resources/db/migration/V8__update_unique_key.sql
index f25bfb0..2e9098e 100644
--- a/adapter/persistence/src/main/resources/db/migration/V8__update_unique_key.sql
+++ b/adapter/persistence/src/main/resources/db/migration/V8__update_unique_key.sql
@@ -1,22 +1,22 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
-ALTER TABLE `event_connection`
-DROP INDEX `name_uniq_key`,
-ADD UNIQUE INDEX `name_uniq_key`(`name`, `account_id`) USING BTREE;
+ALTER TABLE `event_connection` DROP INDEX `name_uniq_key_event_connection`;
+ALTER TABLE `event_connection` ADD UNIQUE INDEX `name_uniq_key_event_connection`(`name`, `account_id`) USING BTREE;
-ALTER TABLE `event_api_destination`
-DROP INDEX `name_uniq_key`,
-ADD UNIQUE INDEX `name_uniq_key`(`name`, `account_id`) USING BTREE;
\ No newline at end of file
+ALTER TABLE `event_api_destination` DROP INDEX `name_uniq_key_event_api_destination`;
+ALTER TABLE `event_api_destination` ADD UNIQUE INDEX `name_uniq_key_event_api_destination`(`name`, `account_id`) USING BTREE;
\ No newline at end of file
diff --git a/adapter/storage/src/main/java/org/apache/rocketmq/eventbridge/adapter/storage/rocketmq/impl/RocketMQEventDataRepository.java b/adapter/storage/src/main/java/org/apache/rocketmq/eventbridge/adapter/storage/rocketmq/impl/RocketMQEventDataRepository.java
index 4e6f64e..320c69e 100644
--- a/adapter/storage/src/main/java/org/apache/rocketmq/eventbridge/adapter/storage/rocketmq/impl/RocketMQEventDataRepository.java
+++ b/adapter/storage/src/main/java/org/apache/rocketmq/eventbridge/adapter/storage/rocketmq/impl/RocketMQEventDataRepository.java
@@ -20,6 +20,7 @@
import com.google.gson.Gson;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.eventbridge.adapter.persistence.data.mybatis.dataobject.EventTopicDO;
@@ -92,7 +93,10 @@
@Cacheable(value = "topicCache")
@Override
public String getTopicName(String accountId, String eventBusName) {
- String topicName = null;
+ String topicName = eventDataOnRocketMQConnectAPI.buildTopicName(accountId, eventBusName);
+ if (StringUtils.isBlank(AppConfig.getGlobalConfig().getDefaultDataPersistentClusterName())) {
+ return topicName;
+ }
EventTopicDO eventTopicDO = eventTopicMapper.getTopic(accountId, eventBusName);
if (eventTopicDO != null) {
topicName = eventTopicDO.getName();
diff --git a/common/src/main/java/org/apache/rocketmq/eventbridge/enums/props/Constants.java b/common/src/main/java/org/apache/rocketmq/eventbridge/enums/props/Constants.java
new file mode 100644
index 0000000..dfd1d30
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/eventbridge/enums/props/Constants.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.eventbridge.enums.props;
+
+/**
+ * Constants
+ */
+public enum Constants {
+ HEADER_KEY_RESOURCE_OWNER_ACCOUNT_ID("resourceOwnerAccountId"),
+ HEADER_KEY_LOGIN_ACCOUNT_ID("loginAccountId"),
+ HEADER_KEY_PARENT_ACCOUNT_ID("parentAccountId"),
+ ;
+
+ private String name;
+
+ Constants(String name) {
+ this.name = name;
+ }
+
+ public String getName() {
+ return this.name;
+ }
+}
diff --git a/common/src/main/java/org/apache/rocketmq/eventbridge/exception/ServiceProviderNotFoundException.java b/common/src/main/java/org/apache/rocketmq/eventbridge/exception/ServiceProviderNotFoundException.java
new file mode 100644
index 0000000..37f7b3f
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/eventbridge/exception/ServiceProviderNotFoundException.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.eventbridge.exception;
+
+
+/**
+ * Service provider not found exception.
+ *
+ */
+public final class ServiceProviderNotFoundException extends EventBridgeException {
+
+ private static final long serialVersionUID = -3730257541332863236L;
+
+ private static final String ERROR_CATEGORY = "SPI";
+
+ private static final int ERROR_CODE = 500;
+
+ public ServiceProviderNotFoundException(final Class<?> clazz, final String type) {
+ super(String.format("%s-%05d: %s %s", ERROR_CATEGORY, ERROR_CODE, String.format("No implementation class load from SPI `%s`:", clazz.getName()), type));
+ }
+
+}
diff --git a/infrastructure/pom.xml b/infrastructure/pom.xml
index d419a45..5db59ef 100644
--- a/infrastructure/pom.xml
+++ b/infrastructure/pom.xml
@@ -26,4 +26,15 @@
<maven.compiler.target>8</maven.compiler.target>
</properties>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.rocketmq</groupId>
+ <artifactId>rocketmq-eventbridge-common</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.springframework.boot</groupId>
+ <artifactId>spring-boot-starter-webflux</artifactId>
+ </dependency>
+ </dependencies>
+
</project>
\ No newline at end of file
diff --git a/infrastructure/src/main/java/org/apache/rocketmq/eventbridge/infrastructure/validate/AuthValidation.java b/infrastructure/src/main/java/org/apache/rocketmq/eventbridge/infrastructure/validate/AuthValidation.java
new file mode 100644
index 0000000..c2a86a1
--- /dev/null
+++ b/infrastructure/src/main/java/org/apache/rocketmq/eventbridge/infrastructure/validate/AuthValidation.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.eventbridge.infrastructure.validate;
+
+
+import org.apache.rocketmq.eventbridge.infrastructure.validate.spi.ServiceLifecycle;
+import org.apache.rocketmq.eventbridge.infrastructure.validate.spi.typed.TypedSPI;
+import org.springframework.http.server.reactive.ServerHttpRequest;
+import reactor.util.context.Context;
+
+/**
+ * AuthValidation
+ */
+public interface AuthValidation extends ServiceLifecycle, TypedSPI {
+
+ Context validate(ServerHttpRequest request, Context ctx);
+
+ @Override
+ default void init() {
+
+ }
+
+ @Override
+ default void shutdown(){
+
+ }
+}
diff --git a/infrastructure/src/main/java/org/apache/rocketmq/eventbridge/infrastructure/validate/DefaultAuthValidation.java b/infrastructure/src/main/java/org/apache/rocketmq/eventbridge/infrastructure/validate/DefaultAuthValidation.java
new file mode 100644
index 0000000..f85517d
--- /dev/null
+++ b/infrastructure/src/main/java/org/apache/rocketmq/eventbridge/infrastructure/validate/DefaultAuthValidation.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.eventbridge.infrastructure.validate;
+
+import org.springframework.http.server.reactive.ServerHttpRequest;
+import reactor.util.context.Context;
+
+import java.util.List;
+
+import static org.apache.rocketmq.eventbridge.enums.props.Constants.HEADER_KEY_RESOURCE_OWNER_ACCOUNT_ID;
+
+/**
+ * DefaultAuthValidation
+ */
+public class DefaultAuthValidation implements AuthValidation {
+
+ @Override
+ public Context validate(ServerHttpRequest request, Context ctx) {
+ String resourceOwnerId = "defaultResourceOwnerId";
+ List<String> resourceOwnerIds = request.getHeaders().get(HEADER_KEY_RESOURCE_OWNER_ACCOUNT_ID);
+ if (resourceOwnerIds != null && !resourceOwnerIds.isEmpty()) {
+ //throw new EventBridgeException(DefaultErrorCode.LoginFailed);
+ resourceOwnerId = resourceOwnerIds.get(0);
+ }
+ return ctx.put(HEADER_KEY_RESOURCE_OWNER_ACCOUNT_ID.getName(), resourceOwnerId);
+ }
+
+ @Override
+ public String getType() {
+ return "default";
+ }
+}
diff --git a/infrastructure/src/main/java/org/apache/rocketmq/eventbridge/infrastructure/validate/spi/ServiceLifecycle.java b/infrastructure/src/main/java/org/apache/rocketmq/eventbridge/infrastructure/validate/spi/ServiceLifecycle.java
new file mode 100644
index 0000000..517264c
--- /dev/null
+++ b/infrastructure/src/main/java/org/apache/rocketmq/eventbridge/infrastructure/validate/spi/ServiceLifecycle.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.rocketmq.eventbridge.infrastructure.validate.spi;
+
+/**
+ * ServiceLifecycle
+ *
+ */
+public interface ServiceLifecycle {
+ /**
+ * Used for startup or initialization of a service endpoint. A service endpoint instance will be in a ready state
+ * after this method has been completed.
+ */
+ void init();
+
+ /**
+ * Notify a service instance of the end of its life cycle. Once this method completes, the service endpoint could be
+ * destroyed and eligible for garbage collection.
+ */
+ void shutdown();
+}
\ No newline at end of file
diff --git a/infrastructure/src/main/java/org/apache/rocketmq/eventbridge/infrastructure/validate/spi/ValidationServiceFactory.java b/infrastructure/src/main/java/org/apache/rocketmq/eventbridge/infrastructure/validate/spi/ValidationServiceFactory.java
new file mode 100644
index 0000000..e467e9e
--- /dev/null
+++ b/infrastructure/src/main/java/org/apache/rocketmq/eventbridge/infrastructure/validate/spi/ValidationServiceFactory.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.eventbridge.infrastructure.validate.spi;
+
+import lombok.AccessLevel;
+import lombok.NoArgsConstructor;
+import org.apache.rocketmq.eventbridge.infrastructure.validate.AuthValidation;
+import org.apache.rocketmq.eventbridge.infrastructure.validate.spi.typed.TypedSPIRegistry;
+
+/**
+ * validation service load factory.
+ *
+ */
+@NoArgsConstructor(access = AccessLevel.PRIVATE)
+public final class ValidationServiceFactory {
+ static {
+ ValidationServiceLoader.register(AuthValidation.class);
+ }
+
+ /**
+ * Get instance of cluster persist repository.
+ *
+ * @param type persist repository configuration
+ * @return got instance
+ */
+ public static AuthValidation getInstance(final String type) {
+ AuthValidation result = TypedSPIRegistry.getRegisteredService(AuthValidation.class, type);
+ result.init();
+ return result;
+ }
+
+}
diff --git a/infrastructure/src/main/java/org/apache/rocketmq/eventbridge/infrastructure/validate/spi/ValidationServiceLoader.java b/infrastructure/src/main/java/org/apache/rocketmq/eventbridge/infrastructure/validate/spi/ValidationServiceLoader.java
new file mode 100644
index 0000000..a2c220e
--- /dev/null
+++ b/infrastructure/src/main/java/org/apache/rocketmq/eventbridge/infrastructure/validate/spi/ValidationServiceLoader.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.eventbridge.infrastructure.validate.spi;
+
+import lombok.AccessLevel;
+import lombok.NoArgsConstructor;
+import lombok.SneakyThrows;
+import org.apache.rocketmq.eventbridge.infrastructure.validate.spi.annotation.SingletonSPI;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.ServiceLoader;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * validation service loader.
+ */
+@NoArgsConstructor(access = AccessLevel.PRIVATE)
+public final class ValidationServiceLoader {
+
+ private static final Map<Class<?>, Collection<Object>> SERVICES = new ConcurrentHashMap<>();
+
+ /**
+ * Register service.
+ *
+ * @param serviceInterface service interface
+ */
+ public static void register(final Class<?> serviceInterface) {
+ if (!SERVICES.containsKey(serviceInterface)) {
+ SERVICES.put(serviceInterface, load(serviceInterface));
+ }
+ }
+
+ private static <T> Collection<Object> load(final Class<T> serviceInterface) {
+ Collection<Object> result = new LinkedList<>();
+ for (T each : ServiceLoader.load(serviceInterface)) {
+ result.add(each);
+ }
+ return result;
+ }
+
+ /**
+ * Get service instances.
+ *
+ * @param serviceInterface service interface
+ * @param <T> type of service
+ * @return service instances
+ */
+ public static <T> Collection<T> getServiceInstances(final Class<T> serviceInterface) {
+ return null == serviceInterface.getAnnotation(SingletonSPI.class) ? createNewServiceInstances(serviceInterface) : getSingletonServiceInstances(serviceInterface);
+ }
+
+ @SneakyThrows(ReflectiveOperationException.class)
+ @SuppressWarnings("unchecked")
+ private static <T> Collection<T> createNewServiceInstances(final Class<T> serviceInterface) {
+ if (!SERVICES.containsKey(serviceInterface)) {
+ return Collections.emptyList();
+ }
+ Collection<Object> services = SERVICES.get(serviceInterface);
+ if (services.isEmpty()) {
+ return Collections.emptyList();
+ }
+ Collection<T> result = new LinkedList<>();
+ for (Object each : services) {
+ result.add((T) each.getClass().getDeclaredConstructor().newInstance());
+ }
+ return result;
+ }
+
+ @SuppressWarnings("unchecked")
+ private static <T> Collection<T> getSingletonServiceInstances(final Class<T> serviceInterface) {
+ return (Collection<T>) SERVICES.getOrDefault(serviceInterface, Collections.emptyList());
+ }
+}
diff --git a/infrastructure/src/main/java/org/apache/rocketmq/eventbridge/infrastructure/validate/spi/annotation/SingletonSPI.java b/infrastructure/src/main/java/org/apache/rocketmq/eventbridge/infrastructure/validate/spi/annotation/SingletonSPI.java
new file mode 100644
index 0000000..cd61305
--- /dev/null
+++ b/infrastructure/src/main/java/org/apache/rocketmq/eventbridge/infrastructure/validate/spi/annotation/SingletonSPI.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.eventbridge.infrastructure.validate.spi.annotation;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+/**
+ * Annotation of singleton SPI.
+ */
+@Target(ElementType.TYPE)
+@Retention(RetentionPolicy.RUNTIME)
+public @interface SingletonSPI {
+}
diff --git a/infrastructure/src/main/java/org/apache/rocketmq/eventbridge/infrastructure/validate/spi/typed/TypedSPI.java b/infrastructure/src/main/java/org/apache/rocketmq/eventbridge/infrastructure/validate/spi/typed/TypedSPI.java
new file mode 100644
index 0000000..65a0cbb
--- /dev/null
+++ b/infrastructure/src/main/java/org/apache/rocketmq/eventbridge/infrastructure/validate/spi/typed/TypedSPI.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.eventbridge.infrastructure.validate.spi.typed;
+
+import java.util.Collection;
+import java.util.Collections;
+
+/**
+ * Typed SPI.
+ */
+public interface TypedSPI {
+
+ /**
+ * Get type.
+ *
+ * @return type
+ */
+ default String getType() {
+ return "";
+ }
+
+ /**
+ * Get type aliases.
+ *
+ * @return type aliases
+ */
+ default Collection<String> getTypeAliases() {
+ return Collections.emptyList();
+ }
+}
diff --git a/infrastructure/src/main/java/org/apache/rocketmq/eventbridge/infrastructure/validate/spi/typed/TypedSPIRegistry.java b/infrastructure/src/main/java/org/apache/rocketmq/eventbridge/infrastructure/validate/spi/typed/TypedSPIRegistry.java
new file mode 100644
index 0000000..104da6d
--- /dev/null
+++ b/infrastructure/src/main/java/org/apache/rocketmq/eventbridge/infrastructure/validate/spi/typed/TypedSPIRegistry.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.eventbridge.infrastructure.validate.spi.typed;
+
+import lombok.AccessLevel;
+import lombok.NoArgsConstructor;
+import org.apache.rocketmq.eventbridge.exception.ServiceProviderNotFoundException;
+import org.apache.rocketmq.eventbridge.infrastructure.validate.spi.ValidationServiceLoader;
+
+import java.util.Optional;
+import java.util.Properties;
+
+/**
+ * Typed SPI registry.
+ */
+@NoArgsConstructor(access = AccessLevel.PRIVATE)
+public final class TypedSPIRegistry {
+
+ /**
+ * Find registered service.
+ *
+ * @param spiClass typed SPI class
+ * @param type type
+ * @param <T> SPI class type
+ * @return registered service
+ */
+ public static <T extends TypedSPI> Optional<T> findRegisteredService(final Class<T> spiClass, final String type) {
+ for (T each : ValidationServiceLoader.getServiceInstances(spiClass)) {
+ if (matchesType(type, each)) {
+ return Optional.of(each);
+ }
+ }
+ return Optional.empty();
+ }
+
+ private static boolean matchesType(final String type, final TypedSPI instance) {
+ return instance.getType().equalsIgnoreCase(type) || instance.getTypeAliases().contains(type);
+ }
+
+ private static Properties convertToStringTypedProperties(final Properties props) {
+ if (null == props) {
+ return new Properties();
+ }
+ Properties result = new Properties();
+ props.forEach((key, value) -> result.setProperty(key.toString(), null == value ? null : value.toString()));
+ return result;
+ }
+
+ /**
+ * Get registered service.
+ *
+ * @param spiClass typed SPI class
+ * @param type type
+ * @param <T> SPI class type
+ * @return registered service
+ */
+ public static <T extends TypedSPI> T getRegisteredService(final Class<T> spiClass, final String type) {
+ Optional<T> result = findRegisteredService(spiClass, type);
+ if (result.isPresent()) {
+ return result.get();
+ }
+ throw new ServiceProviderNotFoundException(spiClass, type);
+ }
+}
diff --git a/infrastructure/src/main/resources/META-INF/services/org.apache.rocketmq.eventbridge.infrastructure.validate.AuthValidation b/infrastructure/src/main/resources/META-INF/services/org.apache.rocketmq.eventbridge.infrastructure.validate.AuthValidation
new file mode 100644
index 0000000..205672f
--- /dev/null
+++ b/infrastructure/src/main/resources/META-INF/services/org.apache.rocketmq.eventbridge.infrastructure.validate.AuthValidation
@@ -0,0 +1,17 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+org.apache.rocketmq.eventbridge.infrastructure.validate.DefaultAuthValidation
\ No newline at end of file
diff --git a/start/src/main/java/org/apache/rocketmq/eventbridge/config/H2ServerConfig.java b/start/src/main/java/org/apache/rocketmq/eventbridge/config/H2ServerConfig.java
new file mode 100644
index 0000000..e7c1990
--- /dev/null
+++ b/start/src/main/java/org/apache/rocketmq/eventbridge/config/H2ServerConfig.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.eventbridge.config;
+
+import org.h2.tools.Server;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.context.annotation.Profile;
+import org.springframework.context.event.ContextClosedEvent;
+import org.springframework.context.event.ContextRefreshedEvent;
+import org.springframework.context.event.EventListener;
+
+@Configuration
+@Profile("local")
+public class H2ServerConfig {
+ private Server webServer;
+
+
+ @EventListener(ContextRefreshedEvent.class)
+ public void start() throws java.sql.SQLException {
+ this.webServer = org.h2.tools.Server.createWebServer("-webPort", "8083", "-web", "-webAllowOthers", "-tcp", "-tcpAllowOthers", "-browser").start();
+ }
+
+ @EventListener(ContextClosedEvent.class)
+ public void stop() {
+ this.webServer.stop();
+ }
+}
diff --git a/start/src/main/java/org/apache/rocketmq/eventbridge/filter/LoginFilter.java b/start/src/main/java/org/apache/rocketmq/eventbridge/filter/LoginFilter.java
index af7cb04..5afafae 100644
--- a/start/src/main/java/org/apache/rocketmq/eventbridge/filter/LoginFilter.java
+++ b/start/src/main/java/org/apache/rocketmq/eventbridge/filter/LoginFilter.java
@@ -18,8 +18,6 @@
import java.util.List;
import lombok.extern.slf4j.Slf4j;
-import org.apache.rocketmq.eventbridge.exception.EventBridgeException;
-import org.apache.rocketmq.eventbridge.exception.code.DefaultErrorCode;
import org.springframework.core.annotation.Order;
import org.springframework.http.server.reactive.ServerHttpRequest;
import org.springframework.stereotype.Component;
@@ -28,35 +26,27 @@
import org.springframework.web.server.WebFilterChain;
import reactor.core.publisher.Mono;
+import static org.apache.rocketmq.eventbridge.enums.props.Constants.HEADER_KEY_LOGIN_ACCOUNT_ID;
+import static org.apache.rocketmq.eventbridge.enums.props.Constants.HEADER_KEY_PARENT_ACCOUNT_ID;
+
@Component
@Order(value = 2)
@Slf4j
public class LoginFilter implements WebFilter {
- public static final String HEADER_KEY_LOGIN_ACCOUNT_ID = "loginAccountId";
- public static final String HEADER_KEY_PARENT_ACCOUNT_ID = "parentAccountId";
- public static final String HEADER_KEY_RESOURCE_OWNER_ACCOUNT_ID = "resourceOwnerAccountId";
-
@Override
public Mono<Void> filter(ServerWebExchange exchange, WebFilterChain chain) {
ServerHttpRequest request = exchange.getRequest();
return chain.filter(exchange)
.subscriberContext(ctx -> {
List<String> parentAccountIds = request.getHeaders()
- .get(HEADER_KEY_PARENT_ACCOUNT_ID);
+ .get(HEADER_KEY_PARENT_ACCOUNT_ID.getName());
List<String> loginAccountIds = request.getHeaders()
- .get(HEADER_KEY_LOGIN_ACCOUNT_ID);
- List<String> resourceOwnerIds = request.getHeaders()
- .get(HEADER_KEY_RESOURCE_OWNER_ACCOUNT_ID);
- if (resourceOwnerIds == null || resourceOwnerIds.isEmpty()) {
- throw new EventBridgeException(DefaultErrorCode.LoginFailed);
- }
- return ctx.put(HEADER_KEY_PARENT_ACCOUNT_ID,
+ .get(HEADER_KEY_LOGIN_ACCOUNT_ID.getName());
+ return ctx.put(HEADER_KEY_PARENT_ACCOUNT_ID.getName(),
parentAccountIds != null && !parentAccountIds.isEmpty() ? parentAccountIds.get(0) : "")
- .put(HEADER_KEY_LOGIN_ACCOUNT_ID,
- loginAccountIds != null && !loginAccountIds.isEmpty() ? loginAccountIds.get(0) : "")
- .put(HEADER_KEY_RESOURCE_OWNER_ACCOUNT_ID,
- resourceOwnerIds != null && !resourceOwnerIds.isEmpty() ? resourceOwnerIds.get(0) : "");
+ .put(HEADER_KEY_LOGIN_ACCOUNT_ID.getName(),
+ loginAccountIds != null && !loginAccountIds.isEmpty() ? loginAccountIds.get(0) : "");
});
}
}
diff --git a/start/src/main/java/org/apache/rocketmq/eventbridge/filter/ValidateFilter.java b/start/src/main/java/org/apache/rocketmq/eventbridge/filter/ValidateFilter.java
new file mode 100644
index 0000000..5d80328
--- /dev/null
+++ b/start/src/main/java/org/apache/rocketmq/eventbridge/filter/ValidateFilter.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.eventbridge.filter;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.rocketmq.eventbridge.infrastructure.validate.AuthValidation;
+import org.apache.rocketmq.eventbridge.infrastructure.validate.spi.ValidationServiceFactory;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.core.annotation.Order;
+import org.springframework.http.server.reactive.ServerHttpRequest;
+import org.springframework.stereotype.Component;
+import org.springframework.web.server.ServerWebExchange;
+import org.springframework.web.server.WebFilter;
+import org.springframework.web.server.WebFilterChain;
+import reactor.core.publisher.Mono;
+import reactor.util.context.Context;
+
+import javax.annotation.PostConstruct;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.atomic.AtomicReference;
+
+@Component
+@Order(value = 3)
+@Slf4j
+public class ValidateFilter implements WebFilter {
+
+ private List<AuthValidation> validations = new CopyOnWriteArrayList<>();
+
+ @Value(value="${auth.validation:default}")
+ private String validationName;
+
+ @PostConstruct
+ public void init() {
+ Arrays.stream(validationName.split(",")).forEach(action->validations.add(ValidationServiceFactory.getInstance(action)));
+ }
+
+ @Override
+ public Mono<Void> filter(ServerWebExchange exchange, WebFilterChain chain) {
+ ServerHttpRequest request = exchange.getRequest();
+ return chain.filter(exchange)
+ .subscriberContext(ctx -> {
+ AtomicReference<Context> result = new AtomicReference<Context>();
+ validations.forEach(validation-> result.set(validation.validate(request, ctx)));
+ return result.get();
+ });
+ }
+}
diff --git a/start/src/main/resources/application.properties b/start/src/main/resources/application.properties
index 185ba48..3736814 100644
--- a/start/src/main/resources/application.properties
+++ b/start/src/main/resources/application.properties
@@ -16,12 +16,12 @@
server.port=7001
management.server.port=7002
management.endpoints.web.base-path=/
+spring.profiles.active=local
## database
-spring.datasource.url=jdbc:mysql://localhost:3306/eventBridge?useUnicode=true&characterEncoding=utf8&useSSL=false
-spring.datasource.driver-class-name=com.mysql.jdbc.Driver
-spring.datasource.username=xxxxx
-spring.datasource.password=xxxxx
-mybatis.mapper-locations=classpath:mybatis/*.xml
+#spring.datasource.hikari.jdbc-url=jdbc:mysql://localhost:3306/eventBridge?useUnicode=true&characterEncoding=utf8&useSSL=false
+#spring.datasource.hikari.driver-class-name=com.mysql.jdbc.Driver
+#spring.datasource.hikari.username=xxxxx
+#spring.datasource.hikari.password=xxxxx
mybatis.configuration.log-impl=org.apache.ibatis.logging.stdout.StdOutImpl
## flyway
spring.flyway.placeholderReplacement=false