Merge pull request #48 from fangjian0423/master
[ISSUE-47] Support ACL & message trace feature
diff --git a/README.md b/README.md
index 38e2f81..b39b1a9 100644
--- a/README.md
+++ b/README.md
@@ -35,6 +35,8 @@
- [x] concurrently consume(broadcasting/clustering)
- [x] one-way transmission
- [x] transaction transmission
+- [x] message trace
+- [x] ACL
- [ ] pull consume
## Quick Start
@@ -188,6 +190,90 @@
>
> see: [RocketMQMessageListener](src/main/java/org/apache/rocketmq/spring/starter/annotation/RocketMQMessageListener.java)
+### Message Trace
+
+We need 2 more configurations for support message trace in producer.
+
+```properties
+## application.properties
+rocketmq.name-server=127.0.0.1:9876
+rocketmq.producer.group=my-group
+
+rocketmq.producer.enable-msg-trace=true
+rocketmq.producer.customized-trace-topic=my-trace-topic
+```
+
+The message trace in consumer should configure in `@RocketMQMessageListener`.
+
+```
+@Service
+@RocketMQMessageListener(
+ topic = "test-topic-1",
+ consumerGroup = "my-consumer_test-topic-1",
+ enableMsgTrace = true,
+ customizedTraceTopic = "my-trace-topic"
+)
+public class MyConsumer implements RocketMQListener<String> {
+ ...
+}
+```
+
+
+> Note:
+>
+> Maybe you need change `127.0.0.1:9876` with your real NameServer address for RocketMQ
+
+> By default, the message track feature of Producer and Consumer is turned on and the trace-topic is RMQ_SYS_TRACE_TOPIC
+> The topic of message trace can be configured with `rocketmq.consumer.customized-trace-topic` configuration item, not required to be configured in each `@RocketMQTransactionListener`
+
+
+### ACL
+
+We need 2 more configurations for support ACL in producer.
+
+```properties
+## application.properties
+rocketmq.name-server=127.0.0.1:9876
+rocketmq.producer.group=my-group
+
+rocketmq.producer.access-key=AK
+rocketmq.producer.secret-key=SK
+```
+Transaction Message should configure AK/SK in `@RocketMQTransactionListener`.
+
+```
+@RocketMQTransactionListener(
+ txProducerGroup = "test,
+ accessKey = "AK",
+ secretKey = "SK"
+)
+class TransactionListenerImpl implements RocketMQLocalTransactionListener {
+ ...
+}
+```
+
+> Note:
+>
+> You do not need to configure AK/SK for each `@RocketMQTransactionListener`, you could configure `rocketmq.producer.access-key` and `rocketmq.producer.secret-key` as default value
+
+The ACL feature in consumer should configure AK/SK in `@RocketMQMessageListener`.
+
+```
+@Service
+@RocketMQMessageListener(
+ topic = "test-topic-1",
+ consumerGroup = "my-consumer_test-topic-1",
+ accessKey = "AK",
+ secretKey = "SK"
+)
+public class MyConsumer implements RocketMQListener<String> {
+ ...
+}
+```
+
+> Note:
+>
+> You do not need to configure AK/SK for each `@RocketMQMessageListener`, you could configure `rocketmq.consumer.access-key` and `rocketmq.consumer.secret-key` as default value
## FAQ
diff --git a/README_zh_CN.md b/README_zh_CN.md
index dc4d741..4763656 100644
--- a/README_zh_CN.md
+++ b/README_zh_CN.md
@@ -26,6 +26,8 @@
- [x] 并发消费(广播/集群)
- [x] one-way方式发送
- [x] 事务方式发送
+- [x] 消息轨迹
+- [x] ACL
- [ ] pull消费
## Quick Start
@@ -180,6 +182,88 @@
>
> see: [RocketMQMessageListener](src/main/java/org/apache/rocketmq/spring/starter/annotation/RocketMQMessageListener.java)
+### 消息轨迹
+
+Producer 端要想使用消息轨迹,需要多配置两个配置项:
+
+```properties
+## application.properties
+rocketmq.name-server=127.0.0.1:9876
+rocketmq.producer.group=my-group
+
+rocketmq.producer.enable-msg-trace=true
+rocketmq.producer.customized-trace-topic=my-trace-topic
+```
+
+Consumer 端消息轨迹的功能需要在 `@RocketMQMessageListener` 中进行配置对应的属性:
+
+```
+@Service
+@RocketMQMessageListener(
+ topic = "test-topic-1",
+ consumerGroup = "my-consumer_test-topic-1",
+ enableMsgTrace = true,
+ customizedTraceTopic = "my-trace-topic"
+)
+public class MyConsumer implements RocketMQListener<String> {
+ ...
+}
+```
+
+> 注意:
+>
+> 默认情况下 Producer 和 Consumer 的消息轨迹功能是开启的且 trace-topic 为 RMQ_SYS_TRACE_TOPIC
+> Consumer 端的消息轨迹 trace-topic 可以在配置文件中配置 `rocketmq.consumer.customized-trace-topic` 配置项,不需要为在每个 `@RocketMQTransactionListener` 配置
+
+
+### ACL
+
+Producer 端要想使用 ACL 功能,需要多配置两个配置项:
+
+```properties
+## application.properties
+rocketmq.name-server=127.0.0.1:9876
+rocketmq.producer.group=my-group
+
+rocketmq.producer.access-key=AK
+rocketmq.producer.secret-key=SK
+```
+
+事务消息的发送需要在 `@RocketMQTransactionListener` 注解里配置上 AK/SK:
+
+```
+@RocketMQTransactionListener(
+ txProducerGroup = "test,
+ accessKey = "AK",
+ secretKey = "SK"
+)
+class TransactionListenerImpl implements RocketMQLocalTransactionListener {
+ ...
+}
+```
+
+> 注意:
+>
+> 可以不用为每个 `@RocketMQTransactionListener` 注解配置 AK/SK,在配置文件中配置 `rocketmq.producer.access-key` 和 `rocketmq.producer.secret-key` 配置项,这两个配置项的值就是默认值
+
+Consumer 端 ACL 功能需要在 `@RocketMQMessageListener` 中进行配置
+
+```
+@Service
+@RocketMQMessageListener(
+ topic = "test-topic-1",
+ consumerGroup = "my-consumer_test-topic-1",
+ accessKey = "AK",
+ secretKey = "SK"
+)
+public class MyConsumer implements RocketMQListener<String> {
+ ...
+}
+```
+
+> 注意:
+>
+> 可以不用为每个 `@RocketMQMessageListener` 注解配置 AK/SK,在配置文件中配置 `rocketmq.consumer.access-key` 和 `rocketmq.consumer.secret-key` 配置项,这两个配置项的值就是默认值
## FAQ
diff --git a/rocketmq-spring-boot-parent/pom.xml b/rocketmq-spring-boot-parent/pom.xml
index a3c698a..6ecb8c3 100644
--- a/rocketmq-spring-boot-parent/pom.xml
+++ b/rocketmq-spring-boot-parent/pom.xml
@@ -40,7 +40,7 @@
<rocketmq.spring.boot.version>2.0.2-SNAPSHOT</rocketmq.spring.boot.version>
- <rocketmq-version>4.3.2</rocketmq-version>
+ <rocketmq-version>4.4.0</rocketmq-version>
<slf4j.version>1.7.25</slf4j.version>
<jackson.version>2.9.7</jackson.version>
@@ -115,6 +115,12 @@
</dependency>
<dependency>
+ <groupId>org.apache.rocketmq</groupId>
+ <artifactId>rocketmq-acl</artifactId>
+ <version>${rocketmq-version}</version>
+ </dependency>
+
+ <dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-messaging</artifactId>
<version>${spring.version}</version>
diff --git a/rocketmq-spring-boot-samples/rocketmq-consume-acl-demo/pom.xml b/rocketmq-spring-boot-samples/rocketmq-consume-acl-demo/pom.xml
new file mode 100644
index 0000000..47fc221
--- /dev/null
+++ b/rocketmq-spring-boot-samples/rocketmq-consume-acl-demo/pom.xml
@@ -0,0 +1,31 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.rocketmq</groupId>
+ <artifactId>rocketmq-spring-boot-samples</artifactId>
+ <version>0.0.1-SNAPSHOT</version>
+ </parent>
+
+ <artifactId>rocketmq-consume-acl-demo</artifactId>
+
+</project>
diff --git a/rocketmq-spring-boot-samples/rocketmq-consume-acl-demo/src/main/java/org/apache/rocketmq/samples/springboot/ACLStringConsumer.java b/rocketmq-spring-boot-samples/rocketmq-consume-acl-demo/src/main/java/org/apache/rocketmq/samples/springboot/ACLStringConsumer.java
new file mode 100644
index 0000000..fdddffd
--- /dev/null
+++ b/rocketmq-spring-boot-samples/rocketmq-consume-acl-demo/src/main/java/org/apache/rocketmq/samples/springboot/ACLStringConsumer.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.samples.springboot;
+
+import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
+import org.apache.rocketmq.spring.core.RocketMQListener;
+import org.springframework.stereotype.Service;
+
+/**
+ * RocketMQMessageListener
+ */
+@Service
+@RocketMQMessageListener(
+ topic = "normal_topic_define_in_Aliware_MQ",
+ consumerGroup = "group_define_in_Aliware_MQ"
+ //accessKey = "AK" // It will read by `rocketmq.consumer.access-key` key
+ //secretKey = "SK" // It will read by `rocketmq.consumer.access-key` key
+)
+public class ACLStringConsumer implements RocketMQListener<String> {
+ @Override
+ public void onMessage(String message) {
+ System.out.printf("------- ACL StringConsumer received: %s \n", message);
+ }
+}
diff --git a/rocketmq-spring-boot-samples/rocketmq-consume-acl-demo/src/main/java/org/apache/rocketmq/samples/springboot/ACLStringTransactionalConsumer.java b/rocketmq-spring-boot-samples/rocketmq-consume-acl-demo/src/main/java/org/apache/rocketmq/samples/springboot/ACLStringTransactionalConsumer.java
new file mode 100644
index 0000000..d815e8c
--- /dev/null
+++ b/rocketmq-spring-boot-samples/rocketmq-consume-acl-demo/src/main/java/org/apache/rocketmq/samples/springboot/ACLStringTransactionalConsumer.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.samples.springboot;
+
+import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
+import org.apache.rocketmq.spring.core.RocketMQListener;
+import org.springframework.stereotype.Service;
+
+/**
+ * StringTransactionalConsumer
+ */
+@Service
+@RocketMQMessageListener(
+ topic = "${demo.rocketmq.transTopic}",
+ consumerGroup = "group_define_in_Aliware_MQ",
+ accessKey = "AK", // if accessKey is empty, it will read by `rocketmq.consumer.access-key` key
+ secretKey = "SK" // if accessKey is empty, it will read by `rocketmq.consumer.secret-key` key
+)
+public class ACLStringTransactionalConsumer implements RocketMQListener<String> {
+ @Override
+ public void onMessage(String message) {
+ System.out.printf("------- ACL StringTransactionalConsumer received: %s \n", message);
+ }
+}
diff --git a/rocketmq-spring-boot-samples/rocketmq-consume-acl-demo/src/main/java/org/apache/rocketmq/samples/springboot/ConsumerACLApplication.java b/rocketmq-spring-boot-samples/rocketmq-consume-acl-demo/src/main/java/org/apache/rocketmq/samples/springboot/ConsumerACLApplication.java
new file mode 100644
index 0000000..3bf266b
--- /dev/null
+++ b/rocketmq-spring-boot-samples/rocketmq-consume-acl-demo/src/main/java/org/apache/rocketmq/samples/springboot/ConsumerACLApplication.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.samples.springboot;
+
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+
+/**
+ * ConsumerApplication
+ */
+@SpringBootApplication
+public class ConsumerACLApplication {
+
+ public static void main(String[] args) {
+ SpringApplication.run(ConsumerACLApplication.class, args);
+ }
+}
+
diff --git a/rocketmq-spring-boot-samples/rocketmq-consume-acl-demo/src/main/resources/application.properties b/rocketmq-spring-boot-samples/rocketmq-consume-acl-demo/src/main/resources/application.properties
new file mode 100644
index 0000000..7ca3c42
--- /dev/null
+++ b/rocketmq-spring-boot-samples/rocketmq-consume-acl-demo/src/main/resources/application.properties
@@ -0,0 +1,10 @@
+spring.application.name=rocketmq-consume-acl-demo
+
+rocketmq.name-server=Endpoint_of_Aliware_MQ
+rocketmq.topic=normal_topic_define_in_Aliware_MQ
+
+# properties used in application code
+demo.rocketmq.transTopic=transaction_topic_define_in_Aliware_MQ
+
+rocketmq.consumer.access-key=AK
+rocketmq.consumer.secret-key=SK
\ No newline at end of file
diff --git a/rocketmq-spring-boot-samples/rocketmq-produce-acl-demo/pom.xml b/rocketmq-spring-boot-samples/rocketmq-produce-acl-demo/pom.xml
new file mode 100644
index 0000000..9193d98
--- /dev/null
+++ b/rocketmq-spring-boot-samples/rocketmq-produce-acl-demo/pom.xml
@@ -0,0 +1,32 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.rocketmq</groupId>
+ <artifactId>rocketmq-spring-boot-samples</artifactId>
+ <version>0.0.1-SNAPSHOT</version>
+ </parent>
+
+ <artifactId>rocketmq-produce-acl-demo</artifactId>
+
+
+</project>
diff --git a/rocketmq-spring-boot-samples/rocketmq-produce-acl-demo/src/main/java/org/apache/rocketmq/samples/springboot/ProducerACLApplication.java b/rocketmq-spring-boot-samples/rocketmq-produce-acl-demo/src/main/java/org/apache/rocketmq/samples/springboot/ProducerACLApplication.java
new file mode 100644
index 0000000..0796c5e
--- /dev/null
+++ b/rocketmq-spring-boot-samples/rocketmq-produce-acl-demo/src/main/java/org/apache/rocketmq/samples/springboot/ProducerACLApplication.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.samples.springboot;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import javax.annotation.Resource;
+
+import org.apache.rocketmq.client.producer.SendResult;
+import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;
+import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener;
+import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState;
+import org.apache.rocketmq.spring.core.RocketMQTemplate;
+import org.apache.rocketmq.spring.support.RocketMQHeaders;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.boot.CommandLineRunner;
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+import org.springframework.messaging.Message;
+import org.springframework.messaging.MessagingException;
+import org.springframework.messaging.support.MessageBuilder;
+
+/**
+ * Producer, using RocketMQTemplate sends a variety of messages
+ */
+@SpringBootApplication
+public class ProducerACLApplication implements CommandLineRunner {
+ private static final String TX_PGROUP_NAME = "myTxProducerGroup";
+ @Resource
+ private RocketMQTemplate rocketMQTemplate;
+ @Value("${demo.rocketmq.transTopic}")
+ private String springTransTopic;
+ @Value("${demo.rocketmq.topic}")
+ private String springTopic;
+
+ public static void main(String[] args) {
+ SpringApplication.run(ProducerACLApplication.class, args);
+ }
+
+ @Override
+ public void run(String... args) throws Exception {
+ // Send string
+ SendResult sendResult = rocketMQTemplate.syncSend(springTopic + ":acl", "Hello, ACL Msg!");
+ System.out.printf("syncSend1 to topic %s sendResult=%s %n", springTopic, sendResult);
+
+ // Send string with spring Message
+ sendResult = rocketMQTemplate.syncSend(springTopic, MessageBuilder.withPayload("Hello, World! I'm from spring message & ACL Msg").build());
+ System.out.printf("syncSend2 to topic %s sendResult=%s %n", springTopic, sendResult);
+
+ //Send transactional messages
+ testTransaction();
+ }
+
+
+ private void testTransaction() throws MessagingException {
+ String[] tags = new String[]{"TagA", "TagB", "TagC", "TagD", "TagE"};
+ for (int i = 0; i < 10; i++) {
+ try {
+
+ Message msg = MessageBuilder.withPayload("Hello RocketMQ " + i).
+ setHeader(RocketMQHeaders.KEYS, "KEY_" + i).build();
+ SendResult sendResult = rocketMQTemplate.sendMessageInTransaction(TX_PGROUP_NAME,
+ springTransTopic + ":" + tags[i % tags.length], msg, null);
+ System.out.printf("------ send Transactional msg body = %s , sendResult=%s %n",
+ msg.getPayload(), sendResult.getSendStatus());
+
+ Thread.sleep(10);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+ }
+
+ @RocketMQTransactionListener(
+ txProducerGroup = TX_PGROUP_NAME,
+ accessKey = "AK", // if not setting, it will read by `rocketmq.producer.access-key` key
+ secretKey = "SK" // if not setting, it will read by `rocketmq.producer.secret-key` key
+ )
+ class TransactionListenerImpl implements RocketMQLocalTransactionListener {
+ private AtomicInteger transactionIndex = new AtomicInteger(0);
+
+ private ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<String, Integer>();
+
+ @Override
+ public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
+ String transId = (String)msg.getHeaders().get(RocketMQHeaders.TRANSACTION_ID);
+ System.out.printf("#### executeLocalTransaction is executed, msgTransactionId=%s %n",
+ transId);
+ int value = transactionIndex.getAndIncrement();
+ int status = value % 3;
+ localTrans.put(transId, status);
+ if (status == 0) {
+ // Return local transaction with success(commit), in this case,
+ // this message will not be checked in checkLocalTransaction()
+ System.out.printf(" # COMMIT # Simulating msg %s related local transaction exec succeeded! ### %n", msg.getPayload());
+ return RocketMQLocalTransactionState.COMMIT;
+ }
+
+ if (status == 1) {
+ // Return local transaction with failure(rollback) , in this case,
+ // this message will not be checked in checkLocalTransaction()
+ System.out.printf(" # ROLLBACK # Simulating %s related local transaction exec failed! %n", msg.getPayload());
+ return RocketMQLocalTransactionState.ROLLBACK;
+ }
+
+ System.out.printf(" # UNKNOW # Simulating %s related local transaction exec UNKNOWN! \n");
+ return RocketMQLocalTransactionState.UNKNOWN;
+ }
+
+ @Override
+ public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
+ String transId = (String)msg.getHeaders().get(RocketMQHeaders.TRANSACTION_ID);
+ RocketMQLocalTransactionState retState = RocketMQLocalTransactionState.COMMIT;
+ Integer status = localTrans.get(transId);
+ if (null != status) {
+ switch (status) {
+ case 0:
+ retState = RocketMQLocalTransactionState.UNKNOWN;
+ break;
+ case 1:
+ retState = RocketMQLocalTransactionState.COMMIT;
+ break;
+ case 2:
+ retState = RocketMQLocalTransactionState.COMMIT;
+ break;
+ }
+ }
+ System.out.printf("------ !!! checkLocalTransaction is executed once," +
+ " msgTransactionId=%s, TransactionState=%s status=%s %n",
+ transId, retState, status);
+ return retState;
+ }
+ }
+
+}
diff --git a/rocketmq-spring-boot-samples/rocketmq-produce-acl-demo/src/main/resources/application.properties b/rocketmq-spring-boot-samples/rocketmq-produce-acl-demo/src/main/resources/application.properties
new file mode 100644
index 0000000..dd49d0d
--- /dev/null
+++ b/rocketmq-spring-boot-samples/rocketmq-produce-acl-demo/src/main/resources/application.properties
@@ -0,0 +1,7 @@
+rocketmq.name-server=Endpoint_of_Aliware_MQ
+rocketmq.producer.group=my-group1
+rocketmq.producer.access-key=AK
+rocketmq.producer.secret-key=SK
+
+demo.rocketmq.topic=normal_topic_define_in_Aliware_MQ
+demo.rocketmq.transTopic=transaction_topic_define_in_Aliware_MQ
diff --git a/rocketmq-spring-boot/pom.xml b/rocketmq-spring-boot/pom.xml
index 30d137b..f162f0c 100644
--- a/rocketmq-spring-boot/pom.xml
+++ b/rocketmq-spring-boot/pom.xml
@@ -70,6 +70,10 @@
</exclusions>
</dependency>
<dependency>
+ <groupId>org.apache.rocketmq</groupId>
+ <artifactId>rocketmq-acl</artifactId>
+ </dependency>
+ <dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-messaging</artifactId>
</dependency>
diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/annotation/RocketMQMessageListener.java b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/annotation/RocketMQMessageListener.java
index 762ea82..608be82 100644
--- a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/annotation/RocketMQMessageListener.java
+++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/annotation/RocketMQMessageListener.java
@@ -28,6 +28,10 @@
@Documented
public @interface RocketMQMessageListener {
+ String ACCESS_KEY_PLACEHOLDER = "${rocketmq.consumer.access-key:}";
+ String SECRET_KEY_PLACEHOLDER = "${rocketmq.consumer.secret-key:}";
+ String TRACE_TOPIC_PLACEHOLDER = "${rocketmq.consumer.customized-trace-topic:}";
+
/**
* Consumers of the same role is required to have exactly same subscriptions and consumerGroup to correctly achieve
* load balance. It's required and needs to be globally unique.
@@ -69,4 +73,24 @@
*/
int consumeThreadMax() default 64;
+ /**
+ * The property of "access-key".
+ */
+ String accessKey() default ACCESS_KEY_PLACEHOLDER;
+
+ /**
+ * The property of "secret-key".
+ */
+ String secretKey() default SECRET_KEY_PLACEHOLDER;
+
+ /**
+ * Switch flag instance for message trace.
+ */
+ boolean enableMsgTrace() default true;
+
+ /**
+ * The name value of message trace topic.If you don't config,you can use the default trace topic name.
+ */
+ String customizedTraceTopic() default TRACE_TOPIC_PLACEHOLDER;
+
}
diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/annotation/RocketMQTransactionListener.java b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/annotation/RocketMQTransactionListener.java
index b355641..f3f874c 100644
--- a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/annotation/RocketMQTransactionListener.java
+++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/annotation/RocketMQTransactionListener.java
@@ -66,4 +66,14 @@
* Set ExecutorService params -- blockingQueueSize
*/
int blockingQueueSize() default 2000;
+
+ /**
+ * The property of "access-key"
+ */
+ String accessKey() default "${rocketmq.producer.access-key}";
+
+ /**
+ * The property of "secret-key"
+ */
+ String secretKey() default "${rocketmq.producer.secret-key}";
}
diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/RocketMQAutoConfiguration.java b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/RocketMQAutoConfiguration.java
index 78a6eba..2fc034c 100644
--- a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/RocketMQAutoConfiguration.java
+++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/RocketMQAutoConfiguration.java
@@ -18,6 +18,8 @@
package org.apache.rocketmq.spring.autoconfigure;
import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.rocketmq.acl.common.AclClientRPCHook;
+import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.MQAdmin;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.spring.config.RocketMQConfigUtils;
@@ -37,6 +39,7 @@
import org.springframework.context.annotation.Import;
import org.springframework.context.annotation.Role;
import org.springframework.util.Assert;
+import org.springframework.util.StringUtils;
@Configuration
@EnableConfigurationProperties(RocketMQProperties.class)
@@ -56,7 +59,19 @@
Assert.hasText(nameServer, "[rocketmq.name-server] must not be null");
Assert.hasText(groupName, "[rocketmq.producer.group] must not be null");
- DefaultMQProducer producer = new DefaultMQProducer(groupName);
+ DefaultMQProducer producer;
+ String ak = rocketMQProperties.getProducer().getAccessKey();
+ String sk = rocketMQProperties.getProducer().getSecretKey();
+ if (!StringUtils.isEmpty(ak) && !StringUtils.isEmpty(sk)) {
+ producer = new DefaultMQProducer(groupName, new AclClientRPCHook(new SessionCredentials(ak, sk)),
+ rocketMQProperties.getProducer().isEnableMsgTrace(),
+ rocketMQProperties.getProducer().getCustomizedTraceTopic());
+ producer.setVipChannelEnabled(false);
+ } else {
+ producer = new DefaultMQProducer(groupName, rocketMQProperties.getProducer().isEnableMsgTrace(),
+ rocketMQProperties.getProducer().getCustomizedTraceTopic());
+ }
+
producer.setNamesrvAddr(nameServer);
producer.setSendMsgTimeout(producerConfig.getSendMessageTimeout());
producer.setRetryTimesWhenSendFailed(producerConfig.getRetryTimesWhenSendFailed());
diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/RocketMQProperties.java b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/RocketMQProperties.java
index 88495e5..c539b8f 100644
--- a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/RocketMQProperties.java
+++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/RocketMQProperties.java
@@ -17,6 +17,7 @@
package org.apache.rocketmq.spring.autoconfigure;
+import org.apache.rocketmq.common.MixAll;
import org.springframework.boot.context.properties.ConfigurationProperties;
@SuppressWarnings("WeakerAccess")
@@ -85,6 +86,26 @@
*/
private int maxMessageSize = 1024 * 1024 * 4;
+ /**
+ * The property of "access-key".
+ */
+ private String accessKey;
+
+ /**
+ * The property of "secret-key".
+ */
+ private String secretKey;
+
+ /**
+ * Switch flag instance for message trace.
+ */
+ private boolean enableMsgTrace = true;
+
+ /**
+ * The name value of message trace topic.If you don't config,you can use the default trace topic name.
+ */
+ private String customizedTraceTopic = MixAll.RMQ_SYS_TRACE_TOPIC;
+
public String getGroup() {
return group;
}
@@ -140,5 +161,37 @@
public void setMaxMessageSize(int maxMessageSize) {
this.maxMessageSize = maxMessageSize;
}
+
+ public String getAccessKey() {
+ return accessKey;
+ }
+
+ public void setAccessKey(String accessKey) {
+ this.accessKey = accessKey;
+ }
+
+ public String getSecretKey() {
+ return secretKey;
+ }
+
+ public void setSecretKey(String secretKey) {
+ this.secretKey = secretKey;
+ }
+
+ public boolean isEnableMsgTrace() {
+ return enableMsgTrace;
+ }
+
+ public void setEnableMsgTrace(boolean enableMsgTrace) {
+ this.enableMsgTrace = enableMsgTrace;
+ }
+
+ public String getCustomizedTraceTopic() {
+ return customizedTraceTopic;
+ }
+
+ public void setCustomizedTraceTopic(String customizedTraceTopic) {
+ this.customizedTraceTopic = customizedTraceTopic;
+ }
}
}
diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/config/RocketMQTransactionAnnotationProcessor.java b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/config/RocketMQTransactionAnnotationProcessor.java
index cd78fdb..a802265 100644
--- a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/config/RocketMQTransactionAnnotationProcessor.java
+++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/config/RocketMQTransactionAnnotationProcessor.java
@@ -18,28 +18,31 @@
package org.apache.rocketmq.spring.config;
import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener;
+import org.apache.rocketmq.spring.support.RocketMQUtil;
import org.springframework.aop.support.AopUtils;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.BeanCreationException;
-import org.springframework.beans.factory.BeanFactory;
-import org.springframework.beans.factory.BeanFactoryAware;
import org.springframework.beans.factory.config.BeanPostProcessor;
+import org.springframework.context.ApplicationContext;
+import org.springframework.context.ApplicationContextAware;
import org.springframework.core.Ordered;
import org.springframework.core.annotation.AnnotationUtils;
import java.util.Collections;
+import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class RocketMQTransactionAnnotationProcessor
- implements BeanPostProcessor, Ordered, BeanFactoryAware {
+ implements BeanPostProcessor, Ordered, ApplicationContextAware {
private final static Logger log = LoggerFactory.getLogger(RocketMQTransactionAnnotationProcessor.class);
- private BeanFactory beanFactory;
+ private ApplicationContext applicationContext;
private final Set<Class<?>> nonProcessedClasses =
Collections.newSetFromMap(new ConcurrentHashMap<Class<?>, Boolean>(64));
@@ -50,8 +53,8 @@
}
@Override
- public void setBeanFactory(BeanFactory beanFactory) throws BeansException {
- this.beanFactory = beanFactory;
+ public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
+ this.applicationContext = applicationContext;
}
@Override
@@ -92,13 +95,22 @@
null);
}
TransactionHandler transactionHandler = new TransactionHandler();
- transactionHandler.setBeanFactory(this.beanFactory);
+ transactionHandler.setBeanFactory(this.applicationContext.getAutowireCapableBeanFactory());
transactionHandler.setName(listener.txProducerGroup());
transactionHandler.setBeanName(bean.getClass().getName());
transactionHandler.setListener((RocketMQLocalTransactionListener) bean);
transactionHandler.setCheckExecutor(listener.corePoolSize(), listener.maximumPoolSize(),
listener.keepAliveTime(), listener.blockingQueueSize());
+ RPCHook rpcHook = RocketMQUtil.getRPCHookByAkSk(applicationContext.getEnvironment(),
+ listener.accessKey(), listener.secretKey());
+
+ if (Objects.nonNull(rpcHook)) {
+ transactionHandler.setRpcHook(rpcHook);
+ } else {
+ log.debug("Access-key or secret-key not configure in " + listener + ".");
+ }
+
transactionHandlerRegistry.registerTransactionHandler(transactionHandler);
}
diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/config/TransactionHandler.java b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/config/TransactionHandler.java
index f6f6955..f6ce61c 100644
--- a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/config/TransactionHandler.java
+++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/config/TransactionHandler.java
@@ -17,6 +17,7 @@
package org.apache.rocketmq.spring.config;
+import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener;
import org.springframework.beans.factory.BeanFactory;
@@ -30,6 +31,7 @@
private RocketMQLocalTransactionListener bean;
private BeanFactory beanFactory;
private ThreadPoolExecutor checkExecutor;
+ private RPCHook rpcHook;
public String getBeanName() {
return beanName;
@@ -47,6 +49,14 @@
this.name = name;
}
+ public RPCHook getRpcHook() {
+ return rpcHook;
+ }
+
+ public void setRpcHook(RPCHook rpcHook) {
+ this.rpcHook = rpcHook;
+ }
+
public BeanFactory getBeanFactory() {
return beanFactory;
}
diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/config/TransactionHandlerRegistry.java b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/config/TransactionHandlerRegistry.java
index 4365e41..7307a31 100644
--- a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/config/TransactionHandlerRegistry.java
+++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/config/TransactionHandlerRegistry.java
@@ -47,6 +47,6 @@
}
listenerContainers.add(handler.getName());
- rocketMQTemplate.createAndStartTransactionMQProducer(handler.getName(), handler.getListener(), handler.getCheckExecutor());
+ rocketMQTemplate.createAndStartTransactionMQProducer(handler.getName(), handler.getListener(), handler.getCheckExecutor(), handler.getRpcHook());
}
}
diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/core/RocketMQTemplate.java b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/core/RocketMQTemplate.java
index 0eb1053..2f29d7a 100644
--- a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/core/RocketMQTemplate.java
+++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/core/RocketMQTemplate.java
@@ -27,6 +27,7 @@
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.client.producer.TransactionSendResult;
import org.apache.rocketmq.client.producer.selector.SelectMessageQueueByHash;
+import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.spring.config.RocketMQConfigUtils;
import org.apache.rocketmq.spring.support.RocketMQUtil;
import org.springframework.beans.factory.DisposableBean;
@@ -565,18 +566,20 @@
* @param txProducerGroup Producer (group) name, unique for each producer
* @param transactionListener TransactoinListener impl class
* @param executorService Nullable.
+ * @param rpcHook Nullable.
* @return true if producer is created and started; false if the named producer already exists in cache.
* @throws MessagingException
*/
- public boolean createAndStartTransactionMQProducer(String txProducerGroup, RocketMQLocalTransactionListener transactionListener,
- ExecutorService executorService) throws MessagingException {
+ public boolean createAndStartTransactionMQProducer(String txProducerGroup,
+ RocketMQLocalTransactionListener transactionListener,
+ ExecutorService executorService, RPCHook rpcHook) throws MessagingException {
txProducerGroup = getTxProducerGroupName(txProducerGroup);
if (cache.containsKey(txProducerGroup)) {
log.info(String.format("get TransactionMQProducer '%s' from cache", txProducerGroup));
return false;
}
- TransactionMQProducer txProducer = createTransactionMQProducer(txProducerGroup, transactionListener, executorService);
+ TransactionMQProducer txProducer = createTransactionMQProducer(txProducerGroup, transactionListener, executorService, rpcHook);
try {
txProducer.start();
cache.put(txProducerGroup, txProducer);
@@ -587,11 +590,19 @@
return true;
}
- private TransactionMQProducer createTransactionMQProducer(String name, RocketMQLocalTransactionListener transactionListener,
- ExecutorService executorService) {
+ private TransactionMQProducer createTransactionMQProducer(String name,
+ RocketMQLocalTransactionListener transactionListener,
+ ExecutorService executorService, RPCHook rpcHook) {
Assert.notNull(producer, "Property 'producer' is required");
Assert.notNull(transactionListener, "Parameter 'transactionListener' is required");
- TransactionMQProducer txProducer = new TransactionMQProducer(name);
+ TransactionMQProducer txProducer;
+ if (Objects.nonNull(rpcHook)) {
+ txProducer = new TransactionMQProducer(name, rpcHook);
+ txProducer.setVipChannelEnabled(false);
+ txProducer.setInstanceName(RocketMQUtil.getInstanceName(rpcHook, name));
+ } else {
+ txProducer = new TransactionMQProducer(name);
+ }
txProducer.setTransactionListener(RocketMQUtil.convert(transactionListener));
txProducer.setNamesrvAddr(producer.getNamesrvAddr());
diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainer.java b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainer.java
index a82a05c..50cd3a0 100644
--- a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainer.java
+++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainer.java
@@ -26,8 +26,10 @@
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
+import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.spring.annotation.ConsumeMode;
import org.apache.rocketmq.spring.annotation.MessageModel;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
@@ -37,7 +39,10 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.aop.framework.AopProxyUtils;
+import org.springframework.beans.BeansException;
import org.springframework.beans.factory.InitializingBean;
+import org.springframework.context.ApplicationContext;
+import org.springframework.context.ApplicationContextAware;
import org.springframework.context.SmartLifecycle;
import org.springframework.util.Assert;
@@ -48,9 +53,12 @@
import java.util.Objects;
@SuppressWarnings("WeakerAccess")
-public class DefaultRocketMQListenerContainer implements InitializingBean, RocketMQListenerContainer, SmartLifecycle {
+public class DefaultRocketMQListenerContainer implements InitializingBean,
+ RocketMQListenerContainer, SmartLifecycle, ApplicationContextAware {
private final static Logger log = LoggerFactory.getLogger(DefaultRocketMQListenerContainer.class);
+ private ApplicationContext applicationContext;
+
private long suspendCurrentQueueTimeMillis = 1000;
/**
@@ -271,6 +279,11 @@
}
@Override
+ public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
+ this.applicationContext = applicationContext;
+ }
+
+ @Override
public String toString() {
return "DefaultRocketMQListenerContainer{" +
"consumerGroup='" + consumerGroup + '\'' +
@@ -385,7 +398,22 @@
Assert.notNull(nameServer, "Property 'nameServer' is required");
Assert.notNull(topic, "Property 'topic' is required");
- consumer = new DefaultMQPushConsumer(consumerGroup);
+ RPCHook rpcHook = RocketMQUtil.getRPCHookByAkSk(applicationContext.getEnvironment(),
+ this.rocketMQMessageListener.accessKey(), this.rocketMQMessageListener.secretKey());
+ boolean enableMsgTrace = rocketMQMessageListener.enableMsgTrace();
+ if (Objects.nonNull(rpcHook)) {
+ consumer = new DefaultMQPushConsumer(consumerGroup, rpcHook, new AllocateMessageQueueAveragely(),
+ enableMsgTrace, this.applicationContext.getEnvironment().
+ resolveRequiredPlaceholders(this.rocketMQMessageListener.customizedTraceTopic()));
+ consumer.setVipChannelEnabled(false);
+ consumer.setInstanceName(RocketMQUtil.getInstanceName(rpcHook, consumerGroup));
+ } else {
+ log.debug("Access-key or secret-key not configure in " + this + ".");
+ consumer = new DefaultMQPushConsumer(consumerGroup, enableMsgTrace,
+ this.applicationContext.getEnvironment().
+ resolveRequiredPlaceholders(this.rocketMQMessageListener.customizedTraceTopic()));
+ }
+
consumer.setNamesrvAddr(nameServer);
consumer.setConsumeThreadMax(consumeThreadMax);
if (consumeThreadMax < consumer.getConsumeThreadMin()) {
diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/RocketMQUtil.java b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/RocketMQUtil.java
index 69f2e95..16a6bcf 100644
--- a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/RocketMQUtil.java
+++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/RocketMQUtil.java
@@ -17,13 +17,17 @@
package org.apache.rocketmq.spring.support;
import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.rocketmq.acl.common.AclClientRPCHook;
+import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener;
+import org.springframework.core.env.Environment;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.MessagingException;
import org.springframework.messaging.support.MessageBuilder;
@@ -167,4 +171,30 @@
return rocketMsg;
}
+
+ public static RPCHook getRPCHookByAkSk(Environment env, String accessKeyOrExpr, String secretKeyOrExpr) {
+ String ak, sk;
+ try {
+ ak = env.resolveRequiredPlaceholders(accessKeyOrExpr);
+ sk = env.resolveRequiredPlaceholders(secretKeyOrExpr);
+ } catch (Exception e) {
+ // Ignore it
+ ak = null;
+ sk = null;
+ }
+ if (!StringUtils.isEmpty(ak) && !StringUtils.isEmpty(sk)) {
+ return new AclClientRPCHook(new SessionCredentials(ak, sk));
+ }
+ return null;
+ }
+
+ public static String getInstanceName(RPCHook rpcHook, String identify) {
+ String separator = "|";
+ StringBuilder instanceName = new StringBuilder();
+ SessionCredentials sessionCredentials = ((AclClientRPCHook) rpcHook).getSessionCredentials();
+ instanceName.append(sessionCredentials.getAccessKey())
+ .append(separator).append(sessionCredentials.getSecretKey())
+ .append(separator).append(identify);
+ return instanceName.toString();
+ }
}