Merge pull request #48 from fangjian0423/master

[ISSUE-47] Support ACL & message trace feature
diff --git a/README.md b/README.md
index 38e2f81..b39b1a9 100644
--- a/README.md
+++ b/README.md
@@ -35,6 +35,8 @@
 - [x] concurrently consume(broadcasting/clustering)
 - [x] one-way transmission
 - [x] transaction transmission
+- [x] message trace
+- [x] ACL
 - [ ] pull consume
 
 ## Quick Start
@@ -188,6 +190,90 @@
 >
 > see: [RocketMQMessageListener](src/main/java/org/apache/rocketmq/spring/starter/annotation/RocketMQMessageListener.java)
 
+### Message Trace
+
+To enable message trace in the producer, two additional configuration items are needed.
+
+```properties
+## application.properties
+rocketmq.name-server=127.0.0.1:9876
+rocketmq.producer.group=my-group
+
+rocketmq.producer.enable-msg-trace=true
+rocketmq.producer.customized-trace-topic=my-trace-topic
+```
+
+Message trace for a consumer is configured in `@RocketMQMessageListener`.
+
+```
+@Service
+@RocketMQMessageListener(
+    topic = "test-topic-1", 
+    consumerGroup = "my-consumer_test-topic-1",
+    enableMsgTrace = true,
+    customizedTraceTopic = "my-trace-topic"
+)
+public class MyConsumer implements RocketMQListener<String> {
+    ...
+}
+```
+
+
+> Note:
+> 
+> You may need to replace `127.0.0.1:9876` with the real NameServer address of your RocketMQ cluster
+
+> By default, the message trace feature of Producer and Consumer is turned on and the trace-topic is RMQ_SYS_TRACE_TOPIC.
+> The consumer's trace topic can be set once with the `rocketmq.consumer.customized-trace-topic` configuration item, so it does not have to be configured in each `@RocketMQMessageListener`.
+
+
+### ACL
+
+To enable ACL in the producer, two additional configuration items are needed.
+
+```properties
+## application.properties
+rocketmq.name-server=127.0.0.1:9876
+rocketmq.producer.group=my-group
+
+rocketmq.producer.access-key=AK
+rocketmq.producer.secret-key=SK
+```
+Transactional messages require the AK/SK to be configured in `@RocketMQTransactionListener`.
+
+```
+@RocketMQTransactionListener(
+    txProducerGroup = "test",
+    accessKey = "AK",
+    secretKey = "SK"
+)
+class TransactionListenerImpl implements RocketMQLocalTransactionListener {
+    ...
+}
+```
+
+> Note:
+> 
+> You do not need to configure AK/SK for each `@RocketMQTransactionListener`; you can set `rocketmq.producer.access-key` and `rocketmq.producer.secret-key` as the default values
+
+To use ACL in a consumer, configure the AK/SK in `@RocketMQMessageListener`.
+
+```
+@Service
+@RocketMQMessageListener(
+    topic = "test-topic-1", 
+    consumerGroup = "my-consumer_test-topic-1",
+    accessKey = "AK",
+    secretKey = "SK"
+)
+public class MyConsumer implements RocketMQListener<String> {
+    ...
+}
+```
+
+> Note:
+> 
+> You do not need to configure AK/SK for each `@RocketMQMessageListener`; you can set `rocketmq.consumer.access-key` and `rocketmq.consumer.secret-key` as the default values
 
 ## FAQ
 
diff --git a/README_zh_CN.md b/README_zh_CN.md
index dc4d741..4763656 100644
--- a/README_zh_CN.md
+++ b/README_zh_CN.md
@@ -26,6 +26,8 @@
 - [x] 并发消费(广播/集群)
 - [x] one-way方式发送
 - [x] 事务方式发送
+- [x] 消息轨迹
+- [x] ACL
 - [ ] pull消费
 
 ## Quick Start
@@ -180,6 +182,88 @@
 >
 > see: [RocketMQMessageListener](src/main/java/org/apache/rocketmq/spring/starter/annotation/RocketMQMessageListener.java) 
 
+### 消息轨迹
+
+Producer 端要想使用消息轨迹,需要多配置两个配置项:
+
+```properties
+## application.properties
+rocketmq.name-server=127.0.0.1:9876
+rocketmq.producer.group=my-group
+
+rocketmq.producer.enable-msg-trace=true
+rocketmq.producer.customized-trace-topic=my-trace-topic
+```
+
+Consumer 端消息轨迹的功能需要在 `@RocketMQMessageListener` 中进行配置对应的属性:
+
+```
+@Service
+@RocketMQMessageListener(
+    topic = "test-topic-1", 
+    consumerGroup = "my-consumer_test-topic-1",
+    enableMsgTrace = true,
+    customizedTraceTopic = "my-trace-topic"
+)
+public class MyConsumer implements RocketMQListener<String> {
+    ...
+}
+```
+
+> 注意:
+> 
+> 默认情况下 Producer 和 Consumer 的消息轨迹功能是开启的且 trace-topic 为 RMQ_SYS_TRACE_TOPIC
+> Consumer 端的消息轨迹 trace-topic 可以在配置文件中配置 `rocketmq.consumer.customized-trace-topic` 配置项,不需要在每个 `@RocketMQMessageListener` 中单独配置
+
+
+### ACL
+
+Producer 端要想使用 ACL 功能,需要多配置两个配置项:
+
+```properties
+## application.properties
+rocketmq.name-server=127.0.0.1:9876
+rocketmq.producer.group=my-group
+
+rocketmq.producer.access-key=AK
+rocketmq.producer.secret-key=SK
+```
+
+事务消息的发送需要在 `@RocketMQTransactionListener` 注解里配置上 AK/SK:
+
+```
+@RocketMQTransactionListener(
+    txProducerGroup = "test",
+    accessKey = "AK",
+    secretKey = "SK"
+)
+class TransactionListenerImpl implements RocketMQLocalTransactionListener {
+    ...
+}
+```
+
+> 注意:
+> 
+> 可以不用为每个 `@RocketMQTransactionListener` 注解配置 AK/SK,在配置文件中配置 `rocketmq.producer.access-key` 和 `rocketmq.producer.secret-key` 配置项,这两个配置项的值就是默认值
+
+Consumer 端 ACL 功能需要在 `@RocketMQMessageListener` 中进行配置
+
+```
+@Service
+@RocketMQMessageListener(
+    topic = "test-topic-1", 
+    consumerGroup = "my-consumer_test-topic-1",
+    accessKey = "AK",
+    secretKey = "SK"
+)
+public class MyConsumer implements RocketMQListener<String> {
+    ...
+}
+```
+
+> 注意:
+> 
+> 可以不用为每个 `@RocketMQMessageListener` 注解配置 AK/SK,在配置文件中配置 `rocketmq.consumer.access-key` 和 `rocketmq.consumer.secret-key` 配置项,这两个配置项的值就是默认值
 
 ## FAQ
 
diff --git a/rocketmq-spring-boot-parent/pom.xml b/rocketmq-spring-boot-parent/pom.xml
index a3c698a..6ecb8c3 100644
--- a/rocketmq-spring-boot-parent/pom.xml
+++ b/rocketmq-spring-boot-parent/pom.xml
@@ -40,7 +40,7 @@
 
         <rocketmq.spring.boot.version>2.0.2-SNAPSHOT</rocketmq.spring.boot.version>
 
-        <rocketmq-version>4.3.2</rocketmq-version>
+        <rocketmq-version>4.4.0</rocketmq-version>
         <slf4j.version>1.7.25</slf4j.version>
         <jackson.version>2.9.7</jackson.version>
 
@@ -115,6 +115,12 @@
             </dependency>
 
             <dependency>
+                <groupId>org.apache.rocketmq</groupId>
+                <artifactId>rocketmq-acl</artifactId>
+                <version>${rocketmq-version}</version>
+            </dependency>
+
+            <dependency>
                 <groupId>org.springframework</groupId>
                 <artifactId>spring-messaging</artifactId>
                 <version>${spring.version}</version>
diff --git a/rocketmq-spring-boot-samples/rocketmq-consume-acl-demo/pom.xml b/rocketmq-spring-boot-samples/rocketmq-consume-acl-demo/pom.xml
new file mode 100644
index 0000000..47fc221
--- /dev/null
+++ b/rocketmq-spring-boot-samples/rocketmq-consume-acl-demo/pom.xml
@@ -0,0 +1,31 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.rocketmq</groupId>
+        <artifactId>rocketmq-spring-boot-samples</artifactId>
+        <version>0.0.1-SNAPSHOT</version>
+    </parent>
+    
+    <artifactId>rocketmq-consume-acl-demo</artifactId>
+
+</project>
diff --git a/rocketmq-spring-boot-samples/rocketmq-consume-acl-demo/src/main/java/org/apache/rocketmq/samples/springboot/ACLStringConsumer.java b/rocketmq-spring-boot-samples/rocketmq-consume-acl-demo/src/main/java/org/apache/rocketmq/samples/springboot/ACLStringConsumer.java
new file mode 100644
index 0000000..fdddffd
--- /dev/null
+++ b/rocketmq-spring-boot-samples/rocketmq-consume-acl-demo/src/main/java/org/apache/rocketmq/samples/springboot/ACLStringConsumer.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.samples.springboot;
+
+import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
+import org.apache.rocketmq.spring.core.RocketMQListener;
+import org.springframework.stereotype.Service;
+
+/**
+ * Sample ACL consumer that prints every String message it receives.
+ */
+@Service
+@RocketMQMessageListener(
+    topic = "normal_topic_define_in_Aliware_MQ",
+    consumerGroup = "group_define_in_Aliware_MQ"
+    // accessKey is omitted here: its default reads the `rocketmq.consumer.access-key` property
+    // secretKey is omitted here: its default reads the `rocketmq.consumer.secret-key` property
+)
+public class ACLStringConsumer implements RocketMQListener<String> {
+    @Override
+    public void onMessage(String message) {
+        System.out.printf("------- ACL StringConsumer received: %s \n", message);
+    }
+}
diff --git a/rocketmq-spring-boot-samples/rocketmq-consume-acl-demo/src/main/java/org/apache/rocketmq/samples/springboot/ACLStringTransactionalConsumer.java b/rocketmq-spring-boot-samples/rocketmq-consume-acl-demo/src/main/java/org/apache/rocketmq/samples/springboot/ACLStringTransactionalConsumer.java
new file mode 100644
index 0000000..d815e8c
--- /dev/null
+++ b/rocketmq-spring-boot-samples/rocketmq-consume-acl-demo/src/main/java/org/apache/rocketmq/samples/springboot/ACLStringTransactionalConsumer.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.samples.springboot;
+
+import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
+import org.apache.rocketmq.spring.core.RocketMQListener;
+import org.springframework.stereotype.Service;
+
+/**
+ * Sample ACL consumer for the topic named by `demo.rocketmq.transTopic`; prints every String message.
+ */
+@Service
+@RocketMQMessageListener(
+    topic = "${demo.rocketmq.transTopic}",
+    consumerGroup = "group_define_in_Aliware_MQ",
+    accessKey = "AK", // if accessKey is not set, its default reads the `rocketmq.consumer.access-key` property
+    secretKey = "SK"  // if secretKey is not set, its default reads the `rocketmq.consumer.secret-key` property
+)
+public class ACLStringTransactionalConsumer implements RocketMQListener<String> {
+    @Override
+    public void onMessage(String message) {
+        System.out.printf("------- ACL StringTransactionalConsumer received: %s \n", message);
+    }
+}
diff --git a/rocketmq-spring-boot-samples/rocketmq-consume-acl-demo/src/main/java/org/apache/rocketmq/samples/springboot/ConsumerACLApplication.java b/rocketmq-spring-boot-samples/rocketmq-consume-acl-demo/src/main/java/org/apache/rocketmq/samples/springboot/ConsumerACLApplication.java
new file mode 100644
index 0000000..3bf266b
--- /dev/null
+++ b/rocketmq-spring-boot-samples/rocketmq-consume-acl-demo/src/main/java/org/apache/rocketmq/samples/springboot/ConsumerACLApplication.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.samples.springboot;
+
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+
+/**
+ * Spring Boot entry point for the ACL consumer sample application.
+ */
+@SpringBootApplication
+public class ConsumerACLApplication {
+
+    public static void main(String[] args) {
+        SpringApplication.run(ConsumerACLApplication.class, args);
+    }
+}
+
diff --git a/rocketmq-spring-boot-samples/rocketmq-consume-acl-demo/src/main/resources/application.properties b/rocketmq-spring-boot-samples/rocketmq-consume-acl-demo/src/main/resources/application.properties
new file mode 100644
index 0000000..7ca3c42
--- /dev/null
+++ b/rocketmq-spring-boot-samples/rocketmq-consume-acl-demo/src/main/resources/application.properties
@@ -0,0 +1,10 @@
+spring.application.name=rocketmq-consume-acl-demo
+
+rocketmq.name-server=Endpoint_of_Aliware_MQ
+rocketmq.topic=normal_topic_define_in_Aliware_MQ
+
+# properties used in application code
+demo.rocketmq.transTopic=transaction_topic_define_in_Aliware_MQ
+
+rocketmq.consumer.access-key=AK
+rocketmq.consumer.secret-key=SK
\ No newline at end of file
diff --git a/rocketmq-spring-boot-samples/rocketmq-produce-acl-demo/pom.xml b/rocketmq-spring-boot-samples/rocketmq-produce-acl-demo/pom.xml
new file mode 100644
index 0000000..9193d98
--- /dev/null
+++ b/rocketmq-spring-boot-samples/rocketmq-produce-acl-demo/pom.xml
@@ -0,0 +1,32 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.rocketmq</groupId>
+        <artifactId>rocketmq-spring-boot-samples</artifactId>
+        <version>0.0.1-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>rocketmq-produce-acl-demo</artifactId>
+
+
+</project>
diff --git a/rocketmq-spring-boot-samples/rocketmq-produce-acl-demo/src/main/java/org/apache/rocketmq/samples/springboot/ProducerACLApplication.java b/rocketmq-spring-boot-samples/rocketmq-produce-acl-demo/src/main/java/org/apache/rocketmq/samples/springboot/ProducerACLApplication.java
new file mode 100644
index 0000000..0796c5e
--- /dev/null
+++ b/rocketmq-spring-boot-samples/rocketmq-produce-acl-demo/src/main/java/org/apache/rocketmq/samples/springboot/ProducerACLApplication.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.samples.springboot;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import javax.annotation.Resource;
+
+import org.apache.rocketmq.client.producer.SendResult;
+import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;
+import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener;
+import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState;
+import org.apache.rocketmq.spring.core.RocketMQTemplate;
+import org.apache.rocketmq.spring.support.RocketMQHeaders;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.boot.CommandLineRunner;
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+import org.springframework.messaging.Message;
+import org.springframework.messaging.MessagingException;
+import org.springframework.messaging.support.MessageBuilder;
+
+/**
+ * Sample producer that uses RocketMQTemplate with ACL keys to send plain and transactional messages.
+ */
+@SpringBootApplication
+public class ProducerACLApplication implements CommandLineRunner {
+    private static final String TX_PGROUP_NAME = "myTxProducerGroup";
+    @Resource
+    private RocketMQTemplate rocketMQTemplate;
+    @Value("${demo.rocketmq.transTopic}")
+    private String springTransTopic;
+    @Value("${demo.rocketmq.topic}")
+    private String springTopic;
+
+    public static void main(String[] args) {
+        SpringApplication.run(ProducerACLApplication.class, args);
+    }
+
+    @Override
+    public void run(String... args) throws Exception {
+        // Send a plain string to the normal topic, tagged "acl"
+        SendResult sendResult = rocketMQTemplate.syncSend(springTopic + ":acl", "Hello, ACL Msg!");
+        System.out.printf("syncSend1 to topic %s sendResult=%s %n", springTopic, sendResult);
+
+        // Send a string wrapped in a Spring Message
+        sendResult = rocketMQTemplate.syncSend(springTopic, MessageBuilder.withPayload("Hello, World! I'm from spring message & ACL Msg").build());
+        System.out.printf("syncSend2 to topic %s sendResult=%s %n", springTopic, sendResult);
+
+        // Send transactional messages
+        testTransaction();
+    }
+
+    /** Sends ten transactional messages, cycling through the five sample tags. */
+    private void testTransaction() throws MessagingException {
+        String[] tags = new String[]{"TagA", "TagB", "TagC", "TagD", "TagE"};
+        for (int i = 0; i < 10; i++) {
+            try {
+                // Each message carries a KEYS header so it can be told apart in the output
+                Message msg = MessageBuilder.withPayload("Hello RocketMQ " + i).
+                    setHeader(RocketMQHeaders.KEYS, "KEY_" + i).build();
+                SendResult sendResult = rocketMQTemplate.sendMessageInTransaction(TX_PGROUP_NAME,
+                    springTransTopic + ":" + tags[i % tags.length], msg, null);
+                System.out.printf("------ send Transactional msg body = %s , sendResult=%s %n",
+                    msg.getPayload(), sendResult.getSendStatus());
+
+                Thread.sleep(10);
+            } catch (Exception e) {
+                e.printStackTrace();
+            }
+        }
+    }
+
+    @RocketMQTransactionListener(
+        txProducerGroup = TX_PGROUP_NAME,
+        accessKey = "AK", // if not set, the default reads the `rocketmq.producer.access-key` property
+        secretKey = "SK"  // if not set, the default reads the `rocketmq.producer.secret-key` property
+    )
+    class TransactionListenerImpl implements RocketMQLocalTransactionListener {
+        private AtomicInteger transactionIndex = new AtomicInteger(0);
+
+        private ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<String, Integer>();
+
+        @Override
+        public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
+            String transId = (String)msg.getHeaders().get(RocketMQHeaders.TRANSACTION_ID);
+            System.out.printf("#### executeLocalTransaction is executed, msgTransactionId=%s %n",
+                transId);
+            int value = transactionIndex.getAndIncrement();
+            int status = value % 3;
+            localTrans.put(transId, status);
+            if (status == 0) {
+                // Return local transaction with success(commit), in this case,
+                // this message will not be checked in checkLocalTransaction()
+                System.out.printf("    # COMMIT # Simulating msg %s related local transaction exec succeeded! ### %n", msg.getPayload());
+                return RocketMQLocalTransactionState.COMMIT;
+            }
+
+            if (status == 1) {
+                // Return local transaction with failure(rollback) , in this case,
+                // this message will not be checked in checkLocalTransaction()
+                System.out.printf("    # ROLLBACK # Simulating %s related local transaction exec failed! %n", msg.getPayload());
+                return RocketMQLocalTransactionState.ROLLBACK;
+            }
+            // status == 2: report UNKNOWN; the %s needs the payload argument or printf throws
+            System.out.printf("    # UNKNOW # Simulating %s related local transaction exec UNKNOWN! \n", msg.getPayload());
+            return RocketMQLocalTransactionState.UNKNOWN;
+        }
+
+        @Override
+        public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
+            String transId = (String)msg.getHeaders().get(RocketMQHeaders.TRANSACTION_ID);
+            RocketMQLocalTransactionState retState = RocketMQLocalTransactionState.COMMIT;
+            Integer status = localTrans.get(transId);
+            if (null != status) {
+                switch (status) {
+                    case 0:
+                        retState = RocketMQLocalTransactionState.UNKNOWN;
+                        break;
+                    case 1:
+                        retState = RocketMQLocalTransactionState.COMMIT;
+                        break;
+                    case 2:
+                        retState = RocketMQLocalTransactionState.COMMIT;
+                        break;
+                }
+            }
+            System.out.printf("------ !!! checkLocalTransaction is executed once," +
+                    " msgTransactionId=%s, TransactionState=%s status=%s %n",
+                transId, retState, status);
+            return retState;
+        }
+    }
+
+}
diff --git a/rocketmq-spring-boot-samples/rocketmq-produce-acl-demo/src/main/resources/application.properties b/rocketmq-spring-boot-samples/rocketmq-produce-acl-demo/src/main/resources/application.properties
new file mode 100644
index 0000000..dd49d0d
--- /dev/null
+++ b/rocketmq-spring-boot-samples/rocketmq-produce-acl-demo/src/main/resources/application.properties
@@ -0,0 +1,7 @@
+rocketmq.name-server=Endpoint_of_Aliware_MQ
+rocketmq.producer.group=my-group1
+rocketmq.producer.access-key=AK
+rocketmq.producer.secret-key=SK
+
+demo.rocketmq.topic=normal_topic_define_in_Aliware_MQ
+demo.rocketmq.transTopic=transaction_topic_define_in_Aliware_MQ
diff --git a/rocketmq-spring-boot/pom.xml b/rocketmq-spring-boot/pom.xml
index 30d137b..f162f0c 100644
--- a/rocketmq-spring-boot/pom.xml
+++ b/rocketmq-spring-boot/pom.xml
@@ -70,6 +70,10 @@
             </exclusions>
         </dependency>
         <dependency>
+            <groupId>org.apache.rocketmq</groupId>
+            <artifactId>rocketmq-acl</artifactId>
+        </dependency>
+        <dependency>
             <groupId>org.springframework</groupId>
             <artifactId>spring-messaging</artifactId>
         </dependency>
diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/annotation/RocketMQMessageListener.java b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/annotation/RocketMQMessageListener.java
index 762ea82..608be82 100644
--- a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/annotation/RocketMQMessageListener.java
+++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/annotation/RocketMQMessageListener.java
@@ -28,6 +28,10 @@
 @Documented
 public @interface RocketMQMessageListener {
 
+    String ACCESS_KEY_PLACEHOLDER = "${rocketmq.consumer.access-key:}";
+    String SECRET_KEY_PLACEHOLDER = "${rocketmq.consumer.secret-key:}";
+    String TRACE_TOPIC_PLACEHOLDER = "${rocketmq.consumer.customized-trace-topic:}";
+
     /**
      * Consumers of the same role is required to have exactly same subscriptions and consumerGroup to correctly achieve
      * load balance. It's required and needs to be globally unique.
@@ -69,4 +73,24 @@
      */
     int consumeThreadMax() default 64;
 
+    /**
+     * The property of "access-key".
+     */
+    String accessKey() default ACCESS_KEY_PLACEHOLDER;
+
+    /**
+     * The property of "secret-key".
+     */
+    String secretKey() default SECRET_KEY_PLACEHOLDER;
+
+    /**
+     * Switch flag instance for message trace.
+     */
+    boolean enableMsgTrace() default true;
+
+    /**
+     * The name of the message trace topic. If you don't configure it, the default trace topic name is used.
+     */
+    String customizedTraceTopic() default TRACE_TOPIC_PLACEHOLDER;
+
 }
diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/annotation/RocketMQTransactionListener.java b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/annotation/RocketMQTransactionListener.java
index b355641..f3f874c 100644
--- a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/annotation/RocketMQTransactionListener.java
+++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/annotation/RocketMQTransactionListener.java
@@ -66,4 +66,14 @@
      * Set ExecutorService params -- blockingQueueSize
      */
     int blockingQueueSize() default 2000;
+
+    /**
+     * The property of "access-key"
+     */
+    String accessKey() default "${rocketmq.producer.access-key}";
+
+    /**
+     * The property of "secret-key"
+     */
+    String secretKey() default "${rocketmq.producer.secret-key}";
 }
diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/RocketMQAutoConfiguration.java b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/RocketMQAutoConfiguration.java
index 78a6eba..2fc034c 100644
--- a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/RocketMQAutoConfiguration.java
+++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/RocketMQAutoConfiguration.java
@@ -18,6 +18,8 @@
 package org.apache.rocketmq.spring.autoconfigure;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.rocketmq.acl.common.AclClientRPCHook;
+import org.apache.rocketmq.acl.common.SessionCredentials;
 import org.apache.rocketmq.client.MQAdmin;
 import org.apache.rocketmq.client.producer.DefaultMQProducer;
 import org.apache.rocketmq.spring.config.RocketMQConfigUtils;
@@ -37,6 +39,7 @@
 import org.springframework.context.annotation.Import;
 import org.springframework.context.annotation.Role;
 import org.springframework.util.Assert;
+import org.springframework.util.StringUtils;
 
 @Configuration
 @EnableConfigurationProperties(RocketMQProperties.class)
@@ -56,7 +59,19 @@
         Assert.hasText(nameServer, "[rocketmq.name-server] must not be null");
         Assert.hasText(groupName, "[rocketmq.producer.group] must not be null");
 
-        DefaultMQProducer producer = new DefaultMQProducer(groupName);
+        DefaultMQProducer producer;
+        String ak = rocketMQProperties.getProducer().getAccessKey();
+        String sk = rocketMQProperties.getProducer().getSecretKey();
+        if (!StringUtils.isEmpty(ak) && !StringUtils.isEmpty(sk)) {
+            producer = new DefaultMQProducer(groupName, new AclClientRPCHook(new SessionCredentials(ak, sk)),
+                rocketMQProperties.getProducer().isEnableMsgTrace(),
+                rocketMQProperties.getProducer().getCustomizedTraceTopic());
+            producer.setVipChannelEnabled(false);
+        } else {
+            producer = new DefaultMQProducer(groupName, rocketMQProperties.getProducer().isEnableMsgTrace(),
+                rocketMQProperties.getProducer().getCustomizedTraceTopic());
+        }
+
         producer.setNamesrvAddr(nameServer);
         producer.setSendMsgTimeout(producerConfig.getSendMessageTimeout());
         producer.setRetryTimesWhenSendFailed(producerConfig.getRetryTimesWhenSendFailed());
diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/RocketMQProperties.java b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/RocketMQProperties.java
index 88495e5..c539b8f 100644
--- a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/RocketMQProperties.java
+++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/RocketMQProperties.java
@@ -17,6 +17,7 @@
 
 package org.apache.rocketmq.spring.autoconfigure;
 
+import org.apache.rocketmq.common.MixAll;
 import org.springframework.boot.context.properties.ConfigurationProperties;
 
 @SuppressWarnings("WeakerAccess")
@@ -85,6 +86,26 @@
          */
         private int maxMessageSize = 1024 * 1024 * 4;
 
+        /**
+         * The property of "access-key".
+         */
+        private String accessKey;
+
+        /**
+         * The property of "secret-key".
+         */
+        private String secretKey;
+
+        /**
+         * Switch flag instance for message trace.
+         */
+        private boolean enableMsgTrace = true;
+
+        /**
+         * The name of the message trace topic. If you don't configure it, the default trace topic name is used.
+         */
+        private String customizedTraceTopic = MixAll.RMQ_SYS_TRACE_TOPIC;
+
         public String getGroup() {
             return group;
         }
@@ -140,5 +161,37 @@
         public void setMaxMessageSize(int maxMessageSize) {
             this.maxMessageSize = maxMessageSize;
         }
+
+        public String getAccessKey() {
+            return accessKey;
+        }
+
+        public void setAccessKey(String accessKey) {
+            this.accessKey = accessKey;
+        }
+
+        public String getSecretKey() {
+            return secretKey;
+        }
+
+        public void setSecretKey(String secretKey) {
+            this.secretKey = secretKey;
+        }
+
+        public boolean isEnableMsgTrace() {
+            return enableMsgTrace;
+        }
+
+        public void setEnableMsgTrace(boolean enableMsgTrace) {
+            this.enableMsgTrace = enableMsgTrace;
+        }
+
+        public String getCustomizedTraceTopic() {
+            return customizedTraceTopic;
+        }
+
+        public void setCustomizedTraceTopic(String customizedTraceTopic) {
+            this.customizedTraceTopic = customizedTraceTopic;
+        }
     }
 }
diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/config/RocketMQTransactionAnnotationProcessor.java b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/config/RocketMQTransactionAnnotationProcessor.java
index cd78fdb..a802265 100644
--- a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/config/RocketMQTransactionAnnotationProcessor.java
+++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/config/RocketMQTransactionAnnotationProcessor.java
@@ -18,28 +18,31 @@
 package org.apache.rocketmq.spring.config;
 
 import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.remoting.RPCHook;
 import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;
 import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener;
+import org.apache.rocketmq.spring.support.RocketMQUtil;
 import org.springframework.aop.support.AopUtils;
 import org.springframework.beans.BeansException;
 import org.springframework.beans.factory.BeanCreationException;
-import org.springframework.beans.factory.BeanFactory;
-import org.springframework.beans.factory.BeanFactoryAware;
 import org.springframework.beans.factory.config.BeanPostProcessor;
+import org.springframework.context.ApplicationContext;
+import org.springframework.context.ApplicationContextAware;
 import org.springframework.core.Ordered;
 import org.springframework.core.annotation.AnnotationUtils;
 
 import java.util.Collections;
+import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class RocketMQTransactionAnnotationProcessor
-    implements BeanPostProcessor, Ordered, BeanFactoryAware {
+    implements BeanPostProcessor, Ordered, ApplicationContextAware {
     private final static Logger log = LoggerFactory.getLogger(RocketMQTransactionAnnotationProcessor.class);
 
-    private BeanFactory beanFactory;
+    private ApplicationContext applicationContext;
     private final Set<Class<?>> nonProcessedClasses =
         Collections.newSetFromMap(new ConcurrentHashMap<Class<?>, Boolean>(64));
 
@@ -50,8 +53,8 @@
     }
 
     @Override
-    public void setBeanFactory(BeanFactory beanFactory) throws BeansException {
-        this.beanFactory = beanFactory;
+    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
+        this.applicationContext = applicationContext;
     }
 
     @Override
@@ -92,13 +95,22 @@
                 null);
         }
         TransactionHandler transactionHandler = new TransactionHandler();
-        transactionHandler.setBeanFactory(this.beanFactory);
+        transactionHandler.setBeanFactory(this.applicationContext.getAutowireCapableBeanFactory());
         transactionHandler.setName(listener.txProducerGroup());
         transactionHandler.setBeanName(bean.getClass().getName());
         transactionHandler.setListener((RocketMQLocalTransactionListener) bean);
         transactionHandler.setCheckExecutor(listener.corePoolSize(), listener.maximumPoolSize(),
                 listener.keepAliveTime(), listener.blockingQueueSize());
 
+        RPCHook rpcHook = RocketMQUtil.getRPCHookByAkSk(applicationContext.getEnvironment(),
+            listener.accessKey(), listener.secretKey());
+
+        if (Objects.nonNull(rpcHook)) {
+            transactionHandler.setRpcHook(rpcHook);
+        } else {
+            log.debug("Access-key or secret-key not configure in " + listener + ".");
+        }
+
         transactionHandlerRegistry.registerTransactionHandler(transactionHandler);
     }
 
diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/config/TransactionHandler.java b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/config/TransactionHandler.java
index f6f6955..f6ce61c 100644
--- a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/config/TransactionHandler.java
+++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/config/TransactionHandler.java
@@ -17,6 +17,7 @@
 
 package org.apache.rocketmq.spring.config;
 
+import org.apache.rocketmq.remoting.RPCHook;
 import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener;
 import org.springframework.beans.factory.BeanFactory;
 
@@ -30,6 +31,7 @@
     private RocketMQLocalTransactionListener bean;
     private BeanFactory beanFactory;
     private ThreadPoolExecutor checkExecutor;
+    private RPCHook rpcHook;
 
     public String getBeanName() {
         return beanName;
@@ -47,6 +49,14 @@
         this.name = name;
     }
 
+    public RPCHook getRpcHook() { // may be null: the annotation processor only sets it when an AK/SK hook was built
+        return rpcHook;
+    }
+
+    public void setRpcHook(RPCHook rpcHook) { // the registry later passes this hook on when creating the transaction producer
+        this.rpcHook = rpcHook;
+    }
+
     public BeanFactory getBeanFactory() {
         return beanFactory;
     }
diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/config/TransactionHandlerRegistry.java b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/config/TransactionHandlerRegistry.java
index 4365e41..7307a31 100644
--- a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/config/TransactionHandlerRegistry.java
+++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/config/TransactionHandlerRegistry.java
@@ -47,6 +47,6 @@
         }
         listenerContainers.add(handler.getName());
 
-        rocketMQTemplate.createAndStartTransactionMQProducer(handler.getName(), handler.getListener(), handler.getCheckExecutor());
+        rocketMQTemplate.createAndStartTransactionMQProducer(handler.getName(), handler.getListener(), handler.getCheckExecutor(), handler.getRpcHook());
     }
 }
diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/core/RocketMQTemplate.java b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/core/RocketMQTemplate.java
index 0eb1053..2f29d7a 100644
--- a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/core/RocketMQTemplate.java
+++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/core/RocketMQTemplate.java
@@ -27,6 +27,7 @@
 import org.apache.rocketmq.client.producer.TransactionMQProducer;
 import org.apache.rocketmq.client.producer.TransactionSendResult;
 import org.apache.rocketmq.client.producer.selector.SelectMessageQueueByHash;
+import org.apache.rocketmq.remoting.RPCHook;
 import org.apache.rocketmq.spring.config.RocketMQConfigUtils;
 import org.apache.rocketmq.spring.support.RocketMQUtil;
 import org.springframework.beans.factory.DisposableBean;
@@ -565,18 +566,20 @@
      * @param txProducerGroup     Producer (group) name, unique for each producer
      * @param transactionListener TransactoinListener impl class
      * @param executorService     Nullable.
+     * @param rpcHook Nullable.
      * @return true if producer is created and started; false if the named producer already exists in cache.
      * @throws MessagingException
      */
-    public boolean createAndStartTransactionMQProducer(String txProducerGroup, RocketMQLocalTransactionListener transactionListener,
-                                                       ExecutorService executorService) throws MessagingException {
+    public boolean createAndStartTransactionMQProducer(String txProducerGroup,
+                                                       RocketMQLocalTransactionListener transactionListener,
+                                                       ExecutorService executorService, RPCHook rpcHook) throws MessagingException {
         txProducerGroup = getTxProducerGroupName(txProducerGroup);
         if (cache.containsKey(txProducerGroup)) {
             log.info(String.format("get TransactionMQProducer '%s' from cache", txProducerGroup));
             return false;
         }
 
-        TransactionMQProducer txProducer = createTransactionMQProducer(txProducerGroup, transactionListener, executorService);
+        TransactionMQProducer txProducer = createTransactionMQProducer(txProducerGroup, transactionListener, executorService, rpcHook);
         try {
             txProducer.start();
             cache.put(txProducerGroup, txProducer);
@@ -587,11 +590,19 @@
         return true;
     }
 
-    private TransactionMQProducer createTransactionMQProducer(String name, RocketMQLocalTransactionListener transactionListener,
-                                                              ExecutorService executorService) {
+    private TransactionMQProducer createTransactionMQProducer(String name,
+                                                              RocketMQLocalTransactionListener transactionListener,
+                                                              ExecutorService executorService, RPCHook rpcHook) {
         Assert.notNull(producer, "Property 'producer' is required");
         Assert.notNull(transactionListener, "Parameter 'transactionListener' is required");
-        TransactionMQProducer txProducer = new TransactionMQProducer(name);
+        TransactionMQProducer txProducer;
+        if (Objects.nonNull(rpcHook)) {
+            txProducer = new TransactionMQProducer(name, rpcHook);
+            txProducer.setVipChannelEnabled(false);
+            txProducer.setInstanceName(RocketMQUtil.getInstanceName(rpcHook, name));
+        } else {
+            txProducer = new TransactionMQProducer(name);
+        }
         txProducer.setTransactionListener(RocketMQUtil.convert(transactionListener));
 
         txProducer.setNamesrvAddr(producer.getNamesrvAddr());
diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainer.java b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainer.java
index a82a05c..50cd3a0 100644
--- a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainer.java
+++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainer.java
@@ -26,8 +26,10 @@
 import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
 import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
 import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
+import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely;
 import org.apache.rocketmq.client.exception.MQClientException;
 import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.remoting.RPCHook;
 import org.apache.rocketmq.spring.annotation.ConsumeMode;
 import org.apache.rocketmq.spring.annotation.MessageModel;
 import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
@@ -37,7 +39,10 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.aop.framework.AopProxyUtils;
+import org.springframework.beans.BeansException;
 import org.springframework.beans.factory.InitializingBean;
+import org.springframework.context.ApplicationContext;
+import org.springframework.context.ApplicationContextAware;
 import org.springframework.context.SmartLifecycle;
 import org.springframework.util.Assert;
 
@@ -48,9 +53,12 @@
 import java.util.Objects;
 
 @SuppressWarnings("WeakerAccess")
-public class DefaultRocketMQListenerContainer implements InitializingBean, RocketMQListenerContainer, SmartLifecycle {
+public class DefaultRocketMQListenerContainer implements InitializingBean,
+    RocketMQListenerContainer, SmartLifecycle, ApplicationContextAware {
     private final static Logger log = LoggerFactory.getLogger(DefaultRocketMQListenerContainer.class);
 
+    private ApplicationContext applicationContext;
+
     private long suspendCurrentQueueTimeMillis = 1000;
 
     /**
@@ -271,6 +279,11 @@
     }
 
     @Override
+    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
+        this.applicationContext = applicationContext; // its Environment later resolves AK/SK and trace-topic placeholders
+    }
+
+    @Override
     public String toString() {
         return "DefaultRocketMQListenerContainer{" +
             "consumerGroup='" + consumerGroup + '\'' +
@@ -385,7 +398,22 @@
         Assert.notNull(nameServer, "Property 'nameServer' is required");
         Assert.notNull(topic, "Property 'topic' is required");
 
-        consumer = new DefaultMQPushConsumer(consumerGroup);
+        RPCHook rpcHook = RocketMQUtil.getRPCHookByAkSk(applicationContext.getEnvironment(),
+            this.rocketMQMessageListener.accessKey(), this.rocketMQMessageListener.secretKey());
+        boolean enableMsgTrace = rocketMQMessageListener.enableMsgTrace();
+        if (Objects.nonNull(rpcHook)) {
+            consumer = new DefaultMQPushConsumer(consumerGroup, rpcHook, new AllocateMessageQueueAveragely(),
+                enableMsgTrace, this.applicationContext.getEnvironment().
+                resolveRequiredPlaceholders(this.rocketMQMessageListener.customizedTraceTopic()));
+            consumer.setVipChannelEnabled(false);
+            consumer.setInstanceName(RocketMQUtil.getInstanceName(rpcHook, consumerGroup));
+        } else {
+            log.debug("Access-key or secret-key not configure in " + this + ".");
+            consumer = new DefaultMQPushConsumer(consumerGroup, enableMsgTrace,
+                    this.applicationContext.getEnvironment().
+                    resolveRequiredPlaceholders(this.rocketMQMessageListener.customizedTraceTopic()));
+        }
+
         consumer.setNamesrvAddr(nameServer);
         consumer.setConsumeThreadMax(consumeThreadMax);
         if (consumeThreadMax < consumer.getConsumeThreadMin()) {
diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/RocketMQUtil.java b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/RocketMQUtil.java
index 69f2e95..16a6bcf 100644
--- a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/RocketMQUtil.java
+++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/RocketMQUtil.java
@@ -17,13 +17,17 @@
 package org.apache.rocketmq.spring.support;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.rocketmq.acl.common.AclClientRPCHook;
+import org.apache.rocketmq.acl.common.SessionCredentials;
 import org.apache.rocketmq.client.exception.MQClientException;
 import org.apache.rocketmq.client.producer.LocalTransactionState;
 import org.apache.rocketmq.client.producer.TransactionListener;
 import org.apache.rocketmq.common.message.Message;
 import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.remoting.RPCHook;
 import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState;
 import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener;
+import org.springframework.core.env.Environment;
 import org.springframework.messaging.MessageHeaders;
 import org.springframework.messaging.MessagingException;
 import org.springframework.messaging.support.MessageBuilder;
@@ -167,4 +171,30 @@
 
         return rocketMsg;
     }
+
+    /** Builds an ACL hook from AK/SK literals or placeholder expressions; yields null when either is unusable. */
+    public static RPCHook getRPCHookByAkSk(Environment env, String accessKeyOrExpr, String secretKeyOrExpr) {
+        final String accessKey;
+        final String secretKey;
+        try {
+            accessKey = env.resolveRequiredPlaceholders(accessKeyOrExpr);
+            secretKey = env.resolveRequiredPlaceholders(secretKeyOrExpr);
+        } catch (Exception ignored) {
+            // Best effort: an expression that cannot be resolved is treated as "no credentials".
+            return null;
+        }
+        // A hook is only created when both halves of the credential pair are present.
+        boolean incomplete = StringUtils.isEmpty(accessKey) || StringUtils.isEmpty(secretKey);
+        return incomplete ? null : new AclClientRPCHook(new SessionCredentials(accessKey, secretKey));
+    }
+
+    /**
+     * Derives an instance name of the form accessKey|secretKey|identify; rpcHook must be an AclClientRPCHook.
+     * NOTE(review): the secret key is embedded verbatim - confirm this name is never logged or exposed.
+     */
+    public static String getInstanceName(RPCHook rpcHook, String identify) {
+        SessionCredentials credentials = ((AclClientRPCHook) rpcHook).getSessionCredentials();
+        // String.join renders a null part as "null", exactly as StringBuilder.append would.
+        return String.join("|", credentials.getAccessKey(), credentials.getSecretKey(), identify);
+    }
 }