Merge pull request #206 from zhangjidi2016/master

[ISSUE #151] Fix the Infinite loop in DefaultRocketMQListenerContainer
diff --git a/README.md b/README.md
index 04a9334..a5d9429 100644
--- a/README.md
+++ b/README.md
@@ -4,47 +4,32 @@
 [![GitHub release](https://img.shields.io/badge/release-download-orange.svg)](https://github.com/apache/rocketmq-spring/releases)
 [![License](https://img.shields.io/badge/license-Apache%202-4EB1BA.svg)](https://www.apache.org/licenses/LICENSE-2.0.html)
 
-[中文](./README_zh_CN.md)
-
-## Introduction
-
 This project aims to help developers quickly integrate [RocketMQ](http://rocketmq.apache.org/) with [Spring Boot](http://projects.spring.io/spring-boot/). 
 
-## How To Contribute
+## Features
 
-We are always very happy to have contributions, whether for trivial cleanups or big new features. Please see the RocketMQ main website to read [details](http://rocketmq.apache.org/docs/how-to-contribute/)
-
+- [x] Send messages synchronously
+- [x] Send messages asynchronously
+- [x] Send messages in one-way mode
+- [x] Send ordered messages
+- [x] Send batched messages
+- [x] Send transactional messages
+- [x] Send scheduled messages with delay level
+- [x] Consume messages in concurrent mode (broadcasting/clustering)
+- [x] Consume ordered messages
+- [x] Filter messages using the tag or sql92 expression
+- [x] Support message tracing
+- [x] Support authentication and authorization
+- [ ] Support request-reply message exchange pattern
 
 ## Prerequisites
 - JDK 1.8 and above
 - [Maven](http://maven.apache.org/) 3.0 and above
+- Spring Boot 2.0 and above
 
-## Build and Install with local maven repository
+## Usage
 
-```
-  mvn clean install
-```
-
-## Features:
-
-- [x] synchronous transmission
-- [x] synchronous ordered transmission
-- [x] synchronous batch transmission
-- [x] asynchronous transmission
-- [x] asynchronous ordered transmission
-- [x] orderly consume
-- [x] concurrently consume(broadcasting/clustering)
-- [x] one-way transmission
-- [x] transaction transmission
-- [x] message trace
-- [x] ACL
-- [ ] pull consume
-
-## Quick Start
-
-Please see the complete sample [rocketmq-spring-boot-samples](rocketmq-spring-boot-samples)
-
-Note: Current RELEASE.VERSION=2.0.4 
+Add a dependency using Maven:
 
 ```xml
 <!--add dependency in pom.xml-->
@@ -53,354 +38,20 @@
     <artifactId>rocketmq-spring-boot-starter</artifactId>
     <version>${RELEASE.VERSION}</version>
 </dependency>
-```
+``` 
 
-### Produce Message
+## Samples
 
-```properties
-## application.properties
-rocketmq.name-server=127.0.0.1:9876
-rocketmq.producer.group=my-group
-```
+Please see the [rocketmq-spring-boot-samples](rocketmq-spring-boot-samples).
 
-> Note:
-> 
-> Maybe you need change `127.0.0.1:9876` with your real NameServer address for RocketMQ
+## User Guide
 
-```java
-@SpringBootApplication
-public class ProducerApplication implements CommandLineRunner{
-    @Resource
-    private RocketMQTemplate rocketMQTemplate;
-    
-    public static void main(String[] args){
-        SpringApplication.run(ProducerApplication.class, args);
-    }
-    
-    public void run(String... args) throws Exception {
-        rocketMQTemplate.convertAndSend("test-topic-1", "Hello, World!");
-        rocketMQTemplate.send("test-topic-1", MessageBuilder.withPayload("Hello, World! I'm from spring message").build());
-        rocketMQTemplate.convertAndSend("test-topic-2", new OrderPaidEvent("T_001", new BigDecimal("88.00")));
-        
-//        rocketMQTemplate.destroy(); // notes:  once rocketMQTemplate be destroyed, you can not send any message again with this rocketMQTemplate
-    }
-    
-    @Data
-    @AllArgsConstructor
-    public class OrderPaidEvent implements Serializable{
-        private String orderId;
-        
-        private BigDecimal paidMoney;
-    }
-}
-```
-
-> More relevant configurations for producing:
->
-> ```properties
-> rocketmq.producer.send-message-timeout=300000
-> rocketmq.producer.compress-message-body-threshold=4096
-> rocketmq.producer.max-message-size=4194304
-> rocketmq.producer.retry-times-when-send-async-failed=0
-> rocketmq.producer.retry-next-server=true
-> rocketmq.producer.retry-times-when-send-failed=2
-> ```
+Please see the [wiki](https://github.com/apache/rocketmq-spring/wiki) page.
 
 
-### Send message in transaction and implement local check Listener
-```java
-@SpringBootApplication
-public class ProducerApplication implements CommandLineRunner{
-    @Resource
-    private RocketMQTemplate rocketMQTemplate;
+## Contributing
 
-    public static void main(String[] args){
-        SpringApplication.run(ProducerApplication.class, args);
-    }
+We are always very happy to have contributions, whether for trivial cleanups or big new features. Please see the RocketMQ main website to read [details](http://rocketmq.apache.org/docs/how-to-contribute/).
 
-    public void run(String... args) throws Exception {
-        try {
-            // Build a SpringMessage for sending in transaction
-            Message msg = MessageBuilder.withPayload(..)...;
-            // In sendMessageInTransaction(), the first parameter transaction name ("test")
-            // must be same with the @RocketMQTransactionListener's member field 'transName'
-            rocketMQTemplate.sendMessageInTransaction("test", "test-topic", msg, null);
-        } catch (MQClientException e) {
-            e.printStackTrace(System.out);
-        }
-    }
-
-    // Define transaction listener with the annotation @RocketMQTransactionListener
-    @RocketMQTransactionListener(transName="test")
-    class TransactionListenerImpl implements RocketMQLocalTransactionListener {
-          @Override
-          public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
-            // ... local transaction process, return bollback, commit or unknown
-            return RocketMQLocalTransactionState.UNKNOWN;
-          }
-
-          @Override
-          public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
-            // ... check transaction status and return bollback, commit or unknown
-            return RocketMQLocalTransactionState.COMMIT;
-          }
-    }
-}
-```
-
-### Consume Message
-
-```properties
-## application.properties
-rocketmq.name-server=127.0.0.1:9876
-```
-
-> Note:
-> 
-> Maybe you need change `127.0.0.1:9876` with your real NameServer address for RocketMQ
-
-```java
-@SpringBootApplication
-public class ConsumerApplication{
-    
-    public static void main(String[] args){
-        SpringApplication.run(ConsumerApplication.class, args);
-    }
-    
-    @Slf4j
-    @Service
-    @RocketMQMessageListener(topic = "test-topic-1", consumerGroup = "my-consumer_test-topic-1")
-    public class MyConsumer1 implements RocketMQListener<String>{
-        public void onMessage(String message) {
-            log.info("received message: {}", message);
-        }
-    }
-    
-    @Slf4j
-    @Service
-    @RocketMQMessageListener(topic = "test-topic-2", consumerGroup = "my-consumer_test-topic-2")
-    public class MyConsumer2 implements RocketMQListener<OrderPaidEvent>{
-        public void onMessage(OrderPaidEvent orderPaidEvent) {
-            log.info("received orderPaidEvent: {}", orderPaidEvent);
-        }
-    }
-}
-```
-
-> More relevant configurations for consuming:
->
-> see: [RocketMQMessageListener](rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/annotation/RocketMQMessageListener.java)
-
-### Message Trace
-
-We need 2 more configurations for support message trace in producer.
-
-```properties
-## application.properties
-rocketmq.name-server=127.0.0.1:9876
-rocketmq.producer.group=my-group
-
-rocketmq.producer.enable-msg-trace=true
-rocketmq.producer.customized-trace-topic=my-trace-topic
-```
-
-The message trace in consumer should configure in `@RocketMQMessageListener`.
-
-```
-@Service
-@RocketMQMessageListener(
-    topic = "test-topic-1", 
-    consumerGroup = "my-consumer_test-topic-1",
-    enableMsgTrace = true,
-    customizedTraceTopic = "my-trace-topic"
-)
-public class MyConsumer implements RocketMQListener<String> {
-    ...
-}
-```
-
-
-> Note:
-> 
-> Maybe you need change `127.0.0.1:9876` with your real NameServer address for RocketMQ
-
-> By default, the message track feature of Producer and Consumer is turned on and the trace-topic is RMQ_SYS_TRACE_TOPIC
-> The topic of message trace can be configured with `rocketmq.consumer.customized-trace-topic` configuration item, not required to be configured in each `@RocketMQTransactionListener`
-
-
-### ACL
-
-We need 2 more configurations for support ACL in producer.
-
-```properties
-## application.properties
-rocketmq.name-server=127.0.0.1:9876
-rocketmq.producer.group=my-group
-
-rocketmq.producer.access-key=AK
-rocketmq.producer.secret-key=SK
-```
-Transaction Message should configure AK/SK in `@RocketMQTransactionListener`. 
-
-```
-@RocketMQTransactionListener(
-    txProducerGroup = "test,
-    accessKey = "AK",
-    secretKey = "SK"
-)
-class TransactionListenerImpl implements RocketMQLocalTransactionListener {
-    ...
-}
-```
-
-> Note:
-> 
-> You do not need to configure AK/SK for each `@RocketMQTransactionListener`, you could configure `rocketmq.producer.access-key` and `rocketmq.producer.secret-key` as default value
-
-The ACL feature in consumer should configure AK/SK in `@RocketMQMessageListener`.
-
-```
-@Service
-@RocketMQMessageListener(
-    topic = "test-topic-1", 
-    consumerGroup = "my-consumer_test-topic-1",
-    accessKey = "AK",
-    secretKey = "SK"
-)
-public class MyConsumer implements RocketMQListener<String> {
-    ...
-}
-```
-
-> Note:
-> 
-> You do not need to configure AK/SK for each `@RocketMQMessageListener`, you could configure `rocketmq.consumer.access-key` and `rocketmq.consumer.secret-key` as default value
-
-## FAQ
-
-1. How to connected many `nameserver` on production environment?
-
-    `rocketmq.name-server` support the configuration of multiple `nameserver`, separated by `;`. For example: `172.19.0.1:9876; 172.19.0.2:9876`
-
-1. When was `rocketMQTemplate` destroyed?
-
-    Developers do not need to manually execute the `rocketMQTemplate.destroy ()` method when using `rocketMQTemplate` to send a message in the project, and` rocketMQTemplate` will be destroyed automatically when the spring container is destroyed.
-
-1. start exception:`Caused by: org.apache.rocketmq.client.exception.MQClientException: The consumer group[xxx] has been created before, specify another name please`
-
-   RocketMQ in the design do not want a consumer to deal with multiple types of messages at the same time, so the same `consumerGroup` consumer responsibility should be the same, do not do different things (that is, consumption of multiple topics). Suggested `consumerGroup` and` topic` one correspondence.
-    
-1. How is the message content body being serialized and deserialized?
-
-    RocketMQ's message body is stored as `byte []`. When the business system message content body if it is `java.lang.String` type, unified in accordance with` utf-8` code into `byte []`; If the business system message content is not `java.lang.String` Type, then use [jackson-databind](https://github.com/FasterXML/jackson-databind) serialized into the `JSON` format string, and then unified in accordance with` utf-8` code into `byte [] `.
-    
-1. How do I specify the `tags` for topic?
-
-    RocketMQ best practice recommended: an application as much as possible with one Topic, the message sub-type with `tags` to identify,` tags` can be set by the application free.
-    
-    When you use `rocketMQTemplate` to send a message, set the destination of the message by setting the` destination` parameter of the send method. The `destination` format is `topicName:tagName`, `:` Precedes the name of the topic, followed by the `tags` name.
-    
-    > Note:
-    >
-    > `tags` looks a complex, but when sending a message , the destination can only specify one topic under a `tag`, can not specify multiple.
-    
-1. How do I set the message's `key` when sending a message?
-
-    You can send a message by overloading method like `xxxSend(String destination, Message<?> msg, ...)`, setting `headers` of `msg`. for example:
-    
-    ```java
-    Message<?> message = MessageBuilder.withPayload(payload).setHeader(MessageConst.PROPERTY_KEYS, msgId).build();
-    rocketMQTemplate.send("topic-test", message);
-    ```
-
-    Similarly, you can also set the message `FLAG`,` WAIT_STORE_MSG_OK` and some other user-defined other header information according to the above method.
-    
-    > Note:
-    >
-    > In the case of converting Spring's Message to RocketMQ's Message, to prevent the `header` information from conflicting with RocketMQ's system properties, the prefix `USERS_` was added in front of all `header` names. So if you want to get a custom message header when consuming, please pass through the key at the beginning of `USERS_` in the header.
-    
-1. When consume message, in addition to get the message `payload`, but also want to get RocketMQ message of other system attributes, how to do?
-
-    Consumers in the realization of `RocketMQListener` interface, only need to be generic for the` MessageExt` can, so in the `onMessage` method will receive RocketMQ native 'MessageExt` message.
-    
-    ```java
-    @Slf4j
-    @Service
-    @RocketMQMessageListener(topic = "test-topic-1", consumerGroup = "my-consumer_test-topic-1")
-    public class MyConsumer2 implements RocketMQListener<MessageExt>{
-        public void onMessage(MessageExt messageExt) {
-            log.info("received messageExt: {}", messageExt);
-        }
-    }
-    ```
-    
-1. How do I specify where consumers start consuming messages?
-
-    The default consume offset please refer: [RocketMQ FAQ](http://rocketmq.apache.org/docs/faq/).
-    To customize the consumer's starting location, simply add a `RocketMQPushConsumerLifecycleListener` interface implementation to the consumer class. Examples are as follows:
-    
-    ```java
-    @Slf4j
-    @Service
-    @RocketMQMessageListener(topic = "test-topic-1", consumerGroup = "my-consumer_test-topic-1")
-    public class MyConsumer1 implements RocketMQListener<String>, RocketMQPushConsumerLifecycleListener {
-        @Override
-        public void onMessage(String message) {
-            log.info("received message: {}", message);
-        }
-    
-        @Override
-        public void prepareStart(final DefaultMQPushConsumer consumer) {
-            // set consumer consume message from now
-            consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_TIMESTAMP);
-            consumer.setConsumeTimestamp(UtilAll.timeMillisToHumanString3(System.currentTimeMillis()));
-        }
-    }
-    ```
-    
-    Similarly, any other configuration on `DefaultMQPushConsumer` can be done in the same way as above.
-
-
-1. How do I send transactional messages?
-   It needs two steps on client side: 
-   
-   a) Define a class which is annotated with @RocketMQTransactionListener and implements RocketMQLocalTransactionListener interface, in which, the executeLocalTransaction() and checkLocalTransaction() methods are implemented;
-   
-   b) Invoke the sendMessageInTransaction() method with the RocketMQTemplate API. Note: The first parameter of this method is correlated with the txProducerGroup attribute of @RocketMQTransactionListener. It can be null if using the default transaction producer group.
-
-1. How do I create more than one RocketMQTemplate with a different name-server or other specific properties?
-    ```java
-    // Step1. Define an extra RocketMQTemplate with required properties, note, the 'nameServer' property must be different from the value of global
-    // Spring configuration 'rocketmq.name-server', other properties are optionally defined, they will use the global configuration 
-    // definition by default.  
- 
-    // The RocketMQTemplate's Spring Bean name is 'extRocketMQTemplate', same with the simplified class name (Initials lowercase)
-    @ExtRocketMQTemplateConfiguration(nameServer="127.0.0.1:9876"
-       , ... // override other specific properties if needed
-    )
-    public class ExtRocketMQTemplate extends RocketMQTemplate {
-      // keep the body empty
-    }
- 
- 
-    // Step2. Use the extra RocketMQTemplate. e.g.
-    @Resource(name = "extRocketMQTemplate") // Must define the name to qualify to extra-defined RocketMQTemplate bean.
-    private RocketMQTemplate extRocketMQTemplate;
-    // you can use the template as normal.
-    
-    ```
- 
-1. How do I create a consumer Listener with different name-server other than the global Spring configuration 'rocketmq.name-server' ?  
-    ```java
-    @Service
-    @RocketMQMessageListener(
-       nameServer = "NEW-NAMESERVER-LIST", // define new nameServer list
-       topic = "test-topic-1", 
-       consumerGroup = "my-consumer_test-topic-1",
-       enableMsgTrace = true,
-       customizedTraceTopic = "my-trace-topic"
-    )
-    public class MyNameServerConsumer implements RocketMQListener<String> {
-       ...
-    }
-    ```  
+## License
+[Apache License, Version 2.0](http://www.apache.org/licenses/LICENSE-2.0.html) Copyright (C) Apache Software Foundation 
diff --git a/README_zh_CN.md b/README_zh_CN.md
deleted file mode 100644
index 3e26307..0000000
--- a/README_zh_CN.md
+++ /dev/null
@@ -1,391 +0,0 @@
-# RocketMQ-Spring  [![Build Status](https://travis-ci.org/apache/rocketmq-spring.svg?branch=master)](https://travis-ci.org/apache/rocketmq-spring)
-
-[![Maven Central](https://maven-badges.herokuapp.com/maven-central/org.apache.rocketmq/rocketmq-spring-all/badge.svg)](https://search.maven.org/search?q=g:org.apache.rocketmq%20AND%20a:rocketmq-spring-all)
-[![GitHub release](https://img.shields.io/badge/release-download-orange.svg)](https://github.com/apache/rocketmq-spring/releases)
-[![License](https://img.shields.io/badge/license-Apache%202-4EB1BA.svg)](https://www.apache.org/licenses/LICENSE-2.0.html)
-
-[English](./README.md)
-
-帮助开发者在[Spring Boot](http://projects.spring.io/spring-boot/)中快速集成[RocketMQ](http://rocketmq.apache.org/)。支持Spring Message规范,方便开发者从其它MQ快速切换到RocketMQ。
-
-## 如何贡献和帮助社区
-
-我们永远欢迎开发者的帮助来使这个项目更加完善,无论是小的文档还是大的功能新特性,请参考RocketMQ的主站了解[细节](http://rocketmq.apache.org/docs/how-to-contribute/)
-
-## 前提条件
-- JDK 1.8 and above
-- [Maven](http://maven.apache.org/) 3.0 and above
-
-功能特性:
-
-- [x] 同步发送
-- [x] 同步顺序发送
-- [x] 同步批量发送
-- [x] 异步发送
-- [x] 异步顺序发送
-- [x] 顺序消费
-- [x] 并发消费(广播/集群)
-- [x] one-way方式发送
-- [x] 事务方式发送
-- [x] 消息轨迹
-- [x] ACL
-- [ ] pull消费
-
-## Quick Start
-
-下面列出来了一些关键点,完整的示例请参考: [rocketmq-spring-boot-samples](rocketmq-spring-boot-samples)
-
-注意:当前的RELEASE.VERSION=2.0.4 
-
-```xml
-<!--在pom.xml中添加依赖-->
-<dependency>
-    <groupId>org.apache.rocketmq</groupId>
-    <artifactId>rocketmq-spring-boot-starter</artifactId>
-    <version>${RELEASE.VERSION}</version>
-</dependency>
-```
-
-### 发送消息
-
-```properties
-## application.properties
-rocketmq.name-server=127.0.0.1:9876
-rocketmq.producer.group=my-group
-```
-
-> 注意:
-> 
-> 请将上述示例配置中的`127.0.0.1:9876`替换成真实RocketMQ的NameServer地址与端口
-
-```java
-@SpringBootApplication
-public class ProducerApplication implements CommandLineRunner{
-    @Resource
-    private RocketMQTemplate rocketMQTemplate;
-    
-    public static void main(String[] args){
-        SpringApplication.run(ProducerApplication.class, args);
-    }
-    
-    public void run(String... args) throws Exception {
-        rocketMQTemplate.convertAndSend("test-topic-1", "Hello, World!");
-        rocketMQTemplate.send("test-topic-1", MessageBuilder.withPayload("Hello, World! I'm from spring message").build());
-        rocketMQTemplate.convertAndSend("test-topic-2", new OrderPaidEvent("T_001", new BigDecimal("88.00")));
-        
-//        rocketMQTemplate.destroy(); // notes:  once rocketMQTemplate be destroyed, you can not send any message again with this rocketMQTemplate
-    }
-    
-    @Data
-    @AllArgsConstructor
-    public class OrderPaidEvent implements Serializable{
-        private String orderId;
-        
-        private BigDecimal paidMoney;
-    }
-}
-```
-
-### 在发送客户端发送事务性消息并且实现回查Listener
-```java
-@SpringBootApplication
-public class ProducerApplication implements CommandLineRunner{
-    @Resource
-    private RocketMQTemplate rocketMQTemplate;
-
-    public static void main(String[] args){
-        SpringApplication.run(ProducerApplication.class, args);
-    }
-
-    public void run(String... args) throws Exception {
-        try {
-            // Build a SpringMessage for sending in transaction
-            Message msg = MessageBuilder.withPayload(..)...
-            // In sendMessageInTransaction(), the first parameter transaction name ("test")
-            // must be same with the @RocketMQTransactionListener's member field 'transName'
-            rocketMQTemplate.sendMessageInTransaction("test", "test-topic" msg, null);
-        } catch (MQClientException e) {
-            e.printStackTrace(System.out);
-        }
-    }
-
-    // Define transaction listener with the annotation @RocketMQTransactionListener
-    @RocketMQTransactionListener(transName="test")
-    class TransactionListenerImpl implements RocketMQLocalTransactionListener {
-          @Override
-          public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
-            // ... local transaction process, return bollback, commit or unknown
-            return RocketMQLocalTransactionState.UNKNOWN;
-          }
-
-          @Override
-          public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
-            // ... check transaction status and return bollback, commit or unknown
-            return RocketMQLocalTransactionState.COMMIT;
-          }
-    }
-}
-```
-
-> 更多发送相关配置
->
-> ```properties
-> rocketmq.producer.send-message-timeout=300000
-> rocketmq.producer.compress-message-body-threshold=4096
-> rocketmq.producer.max-message-size=4194304
-> rocketmq.producer.retry-times-when-send-async-failed=0
-> rocketmq.producer.retry-next-server=true
-> rocketmq.producer.retry-times-when-send-failed=2
-> ```
-
-
-### 接收消息
-
-```properties
-## application.properties
-rocketmq.name-server=127.0.0.1:9876
-```
-
-> 注意:
-> 
-> 请将上述示例配置中的`127.0.0.1:9876`替换成真实RocketMQ的NameServer地址与端口
-
-```java
-@SpringBootApplication
-public class ConsumerApplication{
-    
-    public static void main(String[] args){
-        SpringApplication.run(ConsumerApplication.class, args);
-    }
-    
-    @Slf4j
-    @Service
-    @RocketMQMessageListener(topic = "test-topic-1", consumerGroup = "my-consumer_test-topic-1")
-    public class MyConsumer1 implements RocketMQListener<String>{
-        public void onMessage(String message) {
-            log.info("received message: {}", message);
-        }
-    }
-    
-    @Slf4j
-    @Service
-    @RocketMQMessageListener(topic = "test-topic-2", consumerGroup = "my-consumer_test-topic-2")
-    public class MyConsumer2 implements RocketMQListener<OrderPaidEvent>{
-        public void onMessage(OrderPaidEvent orderPaidEvent) {
-            log.info("received orderPaidEvent: {}", orderPaidEvent);
-        }
-    }
-}
-```
-
-
-> 更多消费相关配置
->
-> see: [RocketMQMessageListener](rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/annotation/RocketMQMessageListener.java) 
-
-### 消息轨迹
-
-Producer 端要想使用消息轨迹,需要多配置两个配置项:
-
-```properties
-## application.properties
-rocketmq.name-server=127.0.0.1:9876
-rocketmq.producer.group=my-group
-
-rocketmq.producer.enable-msg-trace=true
-rocketmq.producer.customized-trace-topic=my-trace-topic
-```
-
-Consumer 端消息轨迹的功能需要在 `@RocketMQMessageListener` 中进行配置对应的属性:
-
-```
-@Service
-@RocketMQMessageListener(
-    topic = "test-topic-1", 
-    consumerGroup = "my-consumer_test-topic-1",
-    enableMsgTrace = true,
-    customizedTraceTopic = "my-trace-topic"
-)
-public class MyConsumer implements RocketMQListener<String> {
-    ...
-}
-```
-
-> 注意:
-> 
-> 默认情况下 Producer 和 Consumer 的消息轨迹功能是开启的且 trace-topic 为 RMQ_SYS_TRACE_TOPIC
-> Consumer 端的消息轨迹 trace-topic 可以在配置文件中配置 `rocketmq.consumer.customized-trace-topic` 配置项,不需要为在每个 `@RocketMQTransactionListener` 配置
-
-
-### ACL
-
-Producer 端要想使用 ACL 功能,需要多配置两个配置项:
-
-```properties
-## application.properties
-rocketmq.name-server=127.0.0.1:9876
-rocketmq.producer.group=my-group
-
-rocketmq.producer.access-key=AK
-rocketmq.producer.secret-key=SK
-```
-
-事务消息的发送需要在 `@RocketMQTransactionListener` 注解里配置上 AK/SK:
-
-```
-@RocketMQTransactionListener(
-    txProducerGroup = "test,
-    accessKey = "AK",
-    secretKey = "SK"
-)
-class TransactionListenerImpl implements RocketMQLocalTransactionListener {
-    ...
-}
-```
-
-> 注意:
-> 
-> 可以不用为每个 `@RocketMQTransactionListener` 注解配置 AK/SK,在配置文件中配置 `rocketmq.producer.access-key` 和 `rocketmq.producer.secret-key` 配置项,这两个配置项的值就是默认值
-
-Consumer 端 ACL 功能需要在 `@RocketMQMessageListener` 中进行配置
-
-```
-@Service
-@RocketMQMessageListener(
-    topic = "test-topic-1", 
-    consumerGroup = "my-consumer_test-topic-1",
-    accessKey = "AK",
-    secretKey = "SK"
-)
-public class MyConsumer implements RocketMQListener<String> {
-    ...
-}
-```
-
-> 注意:
-> 
-> 可以不用为每个 `@RocketMQMessageListener` 注解配置 AK/SK,在配置文件中配置 `rocketmq.consumer.access-key` 和 `rocketmq.consumer.secret-key` 配置项,这两个配置项的值就是默认值
-
-## FAQ
-
-1. 生产环境有多个`nameserver`该如何连接?
-
-   `rocketmq.name-server`支持配置多个`nameserver`地址,采用`;`分隔即可。例如:`172.19.0.1:9876;172.19.0.2:9876`
-
-1. `rocketMQTemplate`在什么时候被销毁?
-
-    开发者在项目中使用`rocketMQTemplate`发送消息时,不需要手动执行`rocketMQTemplate.destroy()`方法, `rocketMQTemplate`会在spring容器销毁时自动销毁。
-
-1. 启动报错:`Caused by: org.apache.rocketmq.client.exception.MQClientException: The consumer group[xxx] has been created before, specify another name please`
-
-    RocketMQ在设计时就不希望一个消费者同时处理多个类型的消息,因此同一个`consumerGroup`下的consumer职责应该是一样的,不要干不同的事情(即消费多个topic)。建议`consumerGroup`与`topic`一一对应。
-    
-1. 发送的消息内容体是如何被序列化与反序列化的?
-
-    RocketMQ的消息体都是以`byte[]`方式存储。当业务系统的消息内容体如果是`java.lang.String`类型时,统一按照`utf-8`编码转成`byte[]`;如果业务系统的消息内容为非`java.lang.String`类型,则采用[jackson-databind](https://github.com/FasterXML/jackson-databind)序列化成`JSON`格式的字符串之后,再统一按照`utf-8`编码转成`byte[]`。
-    
-1. 如何指定topic的`tags`?
-
-    RocketMQ的最佳实践中推荐:一个应用尽可能用一个Topic,消息子类型用`tags`来标识,`tags`可以由应用自由设置。
-    在使用`rocketMQTemplate`发送消息时,通过设置发送方法的`destination`参数来设置消息的目的地,`destination`的格式为`topicName:tagName`,`:`前面表示topic的名称,后面表示`tags`名称。
-    
-    > 注意:
-    >
-    > `tags`从命名来看像是一个复数,但发送消息时,目的地只能指定一个topic下的一个`tag`,不能指定多个。
-    
-1. 发送消息时如何设置消息的`key`?
-
-    可以通过重载的`xxxSend(String destination, Message<?> msg, ...)`方法来发送消息,指定`msg`的`headers`来完成。示例:
-    
-    ```java
-    Message<?> message = MessageBuilder.withPayload(payload).setHeader(MessageConst.PROPERTY_KEYS, msgId).build();
-    rocketMQTemplate.send("topic-test", message);
-    ```
-
-    同理还可以根据上面的方式来设置消息的`FLAG`、`WAIT_STORE_MSG_OK`以及一些用户自定义的其它头信息。
-    
-    > 注意:
-    >
-    > 在将Spring的Message转化为RocketMQ的Message时,为防止`header`信息与RocketMQ的系统属性冲突,在所有`header`的名称前面都统一添加了前缀`USERS_`。因此在消费时如果想获取自定义的消息头信息,请遍历头信息中以`USERS_`开头的key即可。
-    
-1. 消费消息时,除了获取消息`payload`外,还想获取RocketMQ消息的其它系统属性,需要怎么做?
-
-    消费者在实现`RocketMQListener`接口时,只需要起泛型为`MessageExt`即可,这样在`onMessage`方法将接收到RocketMQ原生的`MessageExt`消息。
-    
-    ```java
-    @Slf4j
-    @Service
-    @RocketMQMessageListener(topic = "test-topic-1", consumerGroup = "my-consumer_test-topic-1")
-    public class MyConsumer2 implements RocketMQListener<MessageExt>{
-        public void onMessage(MessageExt messageExt) {
-            log.info("received messageExt: {}", messageExt);
-        }
-    }
-    ```
-    
-1. 如何指定消费者从哪开始消费消息,或开始消费的位置?
-
-    消费者默认开始消费的位置请参考:[RocketMQ FAQ](http://rocketmq.apache.org/docs/faq/)。
-    若想自定义消费者开始的消费位置,只需在消费者类添加一个`RocketMQPushConsumerLifecycleListener`接口的实现即可。 示例如下:
-    
-    ```java
-    @Slf4j
-    @Service
-    @RocketMQMessageListener(topic = "test-topic-1", consumerGroup = "my-consumer_test-topic-1")
-    public class MyConsumer1 implements RocketMQListener<String>, RocketMQPushConsumerLifecycleListener {
-        @Override
-        public void onMessage(String message) {
-            log.info("received message: {}", message);
-        }
-    
-        @Override
-        public void prepareStart(final DefaultMQPushConsumer consumer) {
-            // set consumer consume message from now
-            consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_TIMESTAMP);
-            consumer.setConsumeTimestamp(UtilAll.timeMillisToHumanString3(System.currentTimeMillis()));
-        }
-    }
-    ```
-    
-    同理,任何关于`DefaultMQPushConsumer`的更多其它其它配置,都可以采用上述方式来完成。
-    
-1. 如何发送事务消息(即半消息支持分布式事务)?
-	在客户端,首先用户需要实现RocketMQLocalTransactionListener接口,并在接口类上注解声明@RocketMQTransactionListener,实现确认和回查方法;然后再使用资源模板RocketMQTemplate,
-	调用方法sendMessageInTransaction()来进行消息的发布。 注意:这个方法通过指定发送者组名与具体的声明了txProducerGroup的TransactionListener进行关联,您也可以不指定这个值,从而使用默认的事务发送者组。
-    
-1. 如何声明不同name-server或者其他特定的属性来定义非标的RocketMQTemplate?
-    ```java
-    // 第一步: 定义非标的RocketMQTemplate使用你需要的属性,注意,这里的'nameServer'属性必须要定义,并且其取值不能与全局配置属性'rocketmq.name-server'相同
-    // 也可以定义其他属性,如果不定义,它们取全局的配置属性值或默认值。
- 
-    // 这个RocketMQTemplate的Spring Bean名是'extRocketMQTemplate', 与所定义的类名相同(但首字母小写)
-    @ExtRocketMQTemplateConfiguration(nameServer="127.0.0.1:9876"
-       , ... // 定义其他属性,如果有必要。
-    )
-    public class ExtRocketMQTemplate extends RocketMQTemplate {
-      //类里面不需要做任何修改
-    }
- 
- 
-    // 第二步: 使用这个非标RocketMQTemplate
-    @Resource(name = "extRocketMQTemplate") // 这里必须定义name属性来指向上具体的Spring Bean.
-    private RocketMQTemplate extRocketMQTemplate; 
-    // 接下来就可以正常使用这个extRocketMQTemplate了.
-    
-    ```
- 
-1. MessageListener消费端,是否可以指定不同的name-server而不是使用全局定义的'rocketmq.name-server'属性值 ?  
-    
-    ```java
-    @Service
-    @RocketMQMessageListener(
-       nameServer = "NEW-NAMESERVER-LIST", // 可以使用这个optional属性来指定不同的name-server
-       topic = "test-topic-1", 
-       consumerGroup = "my-consumer_test-topic-1",
-       enableMsgTrace = true,
-       customizedTraceTopic = "my-trace-topic"
-    )
-    public class MyNameServerConsumer implements RocketMQListener<String> {
-       ...
-    }
-    ``` 
\ No newline at end of file
diff --git a/pom.xml b/pom.xml
index f8acf1d..827ae1a 100644
--- a/pom.xml
+++ b/pom.xml
@@ -27,7 +27,7 @@
     
     <groupId>org.apache.rocketmq</groupId>
     <artifactId>rocketmq-spring-all</artifactId>
-    <version>2.0.5-SNAPSHOT</version>
+    <version>2.1.0-SNAPSHOT</version>
     <packaging>pom</packaging>
 
     <name>Apache RocketMQ Spring Boot ${project.version}</name>
diff --git a/rocketmq-spring-boot-parent/pom.xml b/rocketmq-spring-boot-parent/pom.xml
index ca490b0..bf62ce4 100644
--- a/rocketmq-spring-boot-parent/pom.xml
+++ b/rocketmq-spring-boot-parent/pom.xml
@@ -22,7 +22,7 @@
     <parent>
         <groupId>org.apache.rocketmq</groupId>
         <artifactId>rocketmq-spring-all</artifactId>
-        <version>2.0.5-SNAPSHOT</version>
+        <version>2.1.0-SNAPSHOT</version>
         <relativePath>../pom.xml</relativePath>
     </parent>
 
@@ -38,7 +38,7 @@
         <spring.boot.version>2.0.5.RELEASE</spring.boot.version>
         <spring.version>5.1.0.RELEASE</spring.version>
 
-        <rocketmq.spring.boot.version>2.0.5-SNAPSHOT</rocketmq.spring.boot.version>
+        <rocketmq.spring.boot.version>2.1.0-SNAPSHOT</rocketmq.spring.boot.version>
 
         <rocketmq-version>4.6.0</rocketmq-version>
         <slf4j.version>1.7.25</slf4j.version>
diff --git a/rocketmq-spring-boot-samples/README.md b/rocketmq-spring-boot-samples/README.md
index d1f2b27..ae5c383 100644
--- a/rocketmq-spring-boot-samples/README.md
+++ b/rocketmq-spring-boot-samples/README.md
@@ -1,7 +1,5 @@
 # rocketmq-spring-boot-samples
 
-[中文](./README_zh_CN.md)
-
 [![License](https://img.shields.io/badge/license-Apache--2.0-blue.svg)](https://www.apache.org/licenses/LICENSE-2.0.html)
 
 It's a demo project for how to use [rocketmq-spring-boot](https://github.com/apache/rocketmq-spring)
diff --git a/rocketmq-spring-boot-samples/README_zh_CN.md b/rocketmq-spring-boot-samples/README_zh_CN.md
deleted file mode 100644
index c8f27d1..0000000
--- a/rocketmq-spring-boot-samples/README_zh_CN.md
+++ /dev/null
@@ -1,40 +0,0 @@
-# rocketmq-spring-boot-samples
-
-[English](./README.md)
-
-[![License](https://img.shields.io/badge/license-Apache--2.0-blue.svg)](https://www.apache.org/licenses/LICENSE-2.0.html)
-
-这里是一个使用rocketmq-spring-boot-starter的例子。 [rocketmq-spring-boot](https://github.com/apache/rocketmq-spring)
-
-
-## 在本地运行这个测试例子
-
-1. 如上面注意项所述,需要开发者在本地build并安装rocketmq-spring-boot-starter
-
-2. 根据RocketMQ官网的quick-start来启动NameServer和Broker,并验证是否启动正确。注意: 测试期间不要停止Broker或者NameServer
-http://rocketmq.apache.org/docs/quick-start/
-
-3. 创建测试例子所需要的Topic
-```
-cd YOUR_ROCKETMQ_HOME
-
-bash bin/mqadmin updateTopic -c DefaultCluster -t string-topic
-bash bin/mqadmin updateTopic -c DefaultCluster -t order-paid-topic
-bash bin/mqadmin updateTopic -c DefaultCluster -t message-ext-topic
-bash bin/mqadmin updateTopic -c DefaultCluster -t spring-transaction-topic
-```
-
-4. 编译并运行测试例子
-
-```
-# 打开一个终端窗口,编译并启动发送端
-cd rocketmq-produce-demo
-mvn clean package
-java -jar target/rocketmq-produce-demo-0.0.1-SNAPSHOT.jar
-
-# 打开另一个终端窗口,编译并启动消费端
-cd rocketmq-consume-demo
-mvn clean package
-java -jar target/rocketmq-consume-demo-0.0.1-SNAPSHOT.jar
-```
-结合测试代码,观察窗口中消息的发送和接收情况
diff --git a/rocketmq-spring-boot-samples/pom.xml b/rocketmq-spring-boot-samples/pom.xml
index 983a2d4..1787afd 100644
--- a/rocketmq-spring-boot-samples/pom.xml
+++ b/rocketmq-spring-boot-samples/pom.xml
@@ -38,7 +38,7 @@
     </modules>
 
     <properties>
-        <rocketmq-spring-boot-starter-version>2.0.5-SNAPSHOT</rocketmq-spring-boot-starter-version>
+        <rocketmq-spring-boot-starter-version>2.1.0-SNAPSHOT</rocketmq-spring-boot-starter-version>
     </properties>
 
     <dependencies>
diff --git a/rocketmq-spring-boot-starter/pom.xml b/rocketmq-spring-boot-starter/pom.xml
index 697ab16..71ea5cd 100644
--- a/rocketmq-spring-boot-starter/pom.xml
+++ b/rocketmq-spring-boot-starter/pom.xml
@@ -22,7 +22,7 @@
     <parent>
         <groupId>org.apache.rocketmq</groupId>
         <artifactId>rocketmq-spring-boot-parent</artifactId>
-        <version>2.0.5-SNAPSHOT</version>
+        <version>2.1.0-SNAPSHOT</version>
         <relativePath>../rocketmq-spring-boot-parent/pom.xml</relativePath>
     </parent>
 
@@ -30,7 +30,7 @@
     <packaging>jar</packaging>
 
     <name>RocketMQ Spring Boot Starter</name>
-    <description>SRocketMQ Spring Boot Starter</description>
+    <description>RocketMQ Spring Boot Starter</description>
     <url>https://github.com/apache/rocketmq-spring</url>
 
     <dependencies>
diff --git a/rocketmq-spring-boot/pom.xml b/rocketmq-spring-boot/pom.xml
index d5d5505..cd1d2a4 100644
--- a/rocketmq-spring-boot/pom.xml
+++ b/rocketmq-spring-boot/pom.xml
@@ -22,7 +22,7 @@
     <parent>
         <groupId>org.apache.rocketmq</groupId>
         <artifactId>rocketmq-spring-boot-parent</artifactId>
-        <version>2.0.5-SNAPSHOT</version>
+        <version>2.1.0-SNAPSHOT</version>
         <relativePath>../rocketmq-spring-boot-parent/pom.xml</relativePath>
     </parent>
 
@@ -30,7 +30,7 @@
     <packaging>jar</packaging>
 
     <name>RocketMQ Spring Boot AutoConfigure</name>
-    <description>SRocketMQ Spring Boot AutoConfigure</description>
+    <description>RocketMQ Spring Boot AutoConfigure</description>
     <url>https://github.com/apache/rocketmq-spring</url>
 
 
diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/ExtProducerResetConfiguration.java b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/ExtProducerResetConfiguration.java
index 1c019bb..f082072 100644
--- a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/ExtProducerResetConfiguration.java
+++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/ExtProducerResetConfiguration.java
@@ -17,19 +17,14 @@
 
 package org.apache.rocketmq.spring.autoconfigure;
 
-import java.lang.reflect.Field;
 import java.util.Map;
 import java.util.Objects;
-import org.apache.rocketmq.acl.common.AclClientRPCHook;
-import org.apache.rocketmq.acl.common.SessionCredentials;
 import org.apache.rocketmq.client.exception.MQClientException;
 import org.apache.rocketmq.client.producer.DefaultMQProducer;
-import org.apache.rocketmq.client.producer.TransactionMQProducer;
-import org.apache.rocketmq.client.trace.AsyncTraceDispatcher;
-import org.apache.rocketmq.client.trace.hook.SendMessageTraceHookImpl;
 import org.apache.rocketmq.spring.annotation.ExtRocketMQTemplateConfiguration;
 import org.apache.rocketmq.spring.core.RocketMQTemplate;
 import org.apache.rocketmq.spring.support.RocketMQMessageConverter;
+import org.apache.rocketmq.spring.support.RocketMQUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.aop.framework.AopProxyUtils;
@@ -104,7 +99,6 @@
     }
 
     private DefaultMQProducer createProducer(ExtRocketMQTemplateConfiguration annotation) {
-        DefaultMQProducer producer = null;
 
         RocketMQProperties.Producer producerConfig = rocketMQProperties.getProducer();
         if (producerConfig == null) {
@@ -118,35 +112,15 @@
         ak = StringUtils.isEmpty(ak) ? producerConfig.getAccessKey() : annotation.accessKey();
         String sk = environment.resolvePlaceholders(annotation.secretKey());
         sk = StringUtils.isEmpty(sk) ? producerConfig.getSecretKey() : annotation.secretKey();
+        boolean isEnableMsgTrace = annotation.enableMsgTrace();
         String customizedTraceTopic = environment.resolvePlaceholders(annotation.customizedTraceTopic());
         customizedTraceTopic = StringUtils.isEmpty(customizedTraceTopic) ? producerConfig.getCustomizedTraceTopic() : customizedTraceTopic;
 
-        boolean isEnableAcl = !StringUtils.isEmpty(ak) && !StringUtils.isEmpty(sk);
-
-        if (isEnableAcl) {
-            producer = new TransactionMQProducer(groupName, new AclClientRPCHook(new SessionCredentials(ak, sk)));
-            producer.setVipChannelEnabled(false);
-        } else {
-            producer = new TransactionMQProducer(groupName);
-        }
-
-        if (annotation.enableMsgTrace()) {
-            try {
-                AsyncTraceDispatcher dispatcher = new AsyncTraceDispatcher(customizedTraceTopic, isEnableAcl ? new AclClientRPCHook(new SessionCredentials(ak, sk)) : null);
-                dispatcher.setHostProducer(producer.getDefaultMQProducerImpl());
-                Field field = DefaultMQProducer.class.getDeclaredField("traceDispatcher");
-                field.setAccessible(true);
-                field.set(producer, dispatcher);
-                producer.getDefaultMQProducerImpl().registerSendMessageHook(
-                    new SendMessageTraceHookImpl(dispatcher));
-            } catch (Throwable e) {
-                log.error("system mqtrace hook init failed ,maybe can't send msg trace data");
-            }
-        }
+        DefaultMQProducer producer = RocketMQUtil.createDefaultMQProducer(groupName, ak, sk, isEnableMsgTrace, customizedTraceTopic);
 
         producer.setNamesrvAddr(nameServer);
         producer.setSendMsgTimeout(annotation.sendMessageTimeout() == -1 ? producerConfig.getSendMessageTimeout() : annotation.sendMessageTimeout());
-        producer.setRetryTimesWhenSendFailed(annotation.retryTimesWhenSendAsyncFailed() == -1 ? producerConfig.getRetryTimesWhenSendFailed() : annotation.retryTimesWhenSendAsyncFailed());
+        producer.setRetryTimesWhenSendFailed(annotation.retryTimesWhenSendFailed() == -1 ? producerConfig.getRetryTimesWhenSendFailed() : annotation.retryTimesWhenSendFailed());
         producer.setRetryTimesWhenSendAsyncFailed(annotation.retryTimesWhenSendAsyncFailed() == -1 ? producerConfig.getRetryTimesWhenSendAsyncFailed() : annotation.retryTimesWhenSendAsyncFailed());
         producer.setMaxMessageSize(annotation.maxMessageSize() == -1 ? producerConfig.getMaxMessageSize() : annotation.maxMessageSize());
         producer.setCompressMsgBodyOverHowmuch(annotation.compressMessageBodyThreshold() == -1 ? producerConfig.getCompressMessageBodyThreshold() : annotation.compressMessageBodyThreshold());
diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/RocketMQAutoConfiguration.java b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/RocketMQAutoConfiguration.java
index 14f7160..e6131e7 100644
--- a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/RocketMQAutoConfiguration.java
+++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/RocketMQAutoConfiguration.java
@@ -17,18 +17,13 @@
 
 package org.apache.rocketmq.spring.autoconfigure;
 
-import java.lang.reflect.Field;
 import javax.annotation.PostConstruct;
-import org.apache.rocketmq.acl.common.AclClientRPCHook;
-import org.apache.rocketmq.acl.common.SessionCredentials;
 import org.apache.rocketmq.client.AccessChannel;
 import org.apache.rocketmq.client.MQAdmin;
 import org.apache.rocketmq.client.producer.DefaultMQProducer;
-import org.apache.rocketmq.client.producer.TransactionMQProducer;
-import org.apache.rocketmq.client.trace.AsyncTraceDispatcher;
-import org.apache.rocketmq.client.trace.hook.SendMessageTraceHookImpl;
 import org.apache.rocketmq.spring.core.RocketMQTemplate;
 import org.apache.rocketmq.spring.support.RocketMQMessageConverter;
+import org.apache.rocketmq.spring.support.RocketMQUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -84,30 +79,12 @@
 
         String accessChannel = rocketMQProperties.getAccessChannel();
 
-        DefaultMQProducer producer;
         String ak = rocketMQProperties.getProducer().getAccessKey();
         String sk = rocketMQProperties.getProducer().getSecretKey();
-        boolean isEnableAcl = !StringUtils.isEmpty(ak) && !StringUtils.isEmpty(sk);
-        if (isEnableAcl) {
-            producer = new TransactionMQProducer(groupName, new AclClientRPCHook(new SessionCredentials(ak, sk)));
-            producer.setVipChannelEnabled(false);
-        } else {
-            producer = new TransactionMQProducer(groupName);
-        }
+        boolean isEnableMsgTrace = rocketMQProperties.getProducer().isEnableMsgTrace();
+        String customizedTraceTopic = rocketMQProperties.getProducer().getCustomizedTraceTopic();
 
-        if (rocketMQProperties.getProducer().isEnableMsgTrace()) {
-            try {
-                AsyncTraceDispatcher dispatcher = new AsyncTraceDispatcher(rocketMQProperties.getProducer().getCustomizedTraceTopic(), isEnableAcl ? new AclClientRPCHook(new SessionCredentials(ak, sk)) : null);
-                dispatcher.setHostProducer(producer.getDefaultMQProducerImpl());
-                Field field = DefaultMQProducer.class.getDeclaredField("traceDispatcher");
-                field.setAccessible(true);
-                field.set(producer, dispatcher);
-                producer.getDefaultMQProducerImpl().registerSendMessageHook(
-                    new SendMessageTraceHookImpl(dispatcher));
-            } catch (Throwable e) {
-                log.error("system mqtrace hook init failed ,maybe can't send msg trace data");
-            }
-        }
+        DefaultMQProducer producer = RocketMQUtil.createDefaultMQProducer(groupName, ak, sk, isEnableMsgTrace, customizedTraceTopic);
 
         producer.setNamesrvAddr(nameServer);
         if (!StringUtils.isEmpty(accessChannel)) {
diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/RocketMQProperties.java b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/RocketMQProperties.java
index 832d021..1957389 100644
--- a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/RocketMQProperties.java
+++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/RocketMQProperties.java
@@ -33,7 +33,7 @@
     private String nameServer;
 
     /**
-     * Enum type for accesChannel, values: LOCAL, CLOUD
+     * Enum type for accessChannel, values: LOCAL, CLOUD
      */
     private String accessChannel;
 
diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainer.java b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainer.java
index 691e516..25ec320 100644
--- a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainer.java
+++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainer.java
@@ -209,14 +209,14 @@
         return selectorType;
     }
 
-    public void setSelectorExpression(String selectorExpression) {
-        this.selectorExpression = selectorExpression;
-    }
-
     public String getSelectorExpression() {
         return selectorExpression;
     }
 
+    public void setSelectorExpression(String selectorExpression) {
+        this.selectorExpression = selectorExpression;
+    }
+
     public MessageModel getMessageModel() {
         return messageModel;
     }
@@ -296,7 +296,6 @@
         return Integer.MAX_VALUE;
     }
 
-
     @Override
     public void afterPropertiesSet() throws Exception {
         initRocketMQPushConsumer();
@@ -328,53 +327,6 @@
         this.name = name;
     }
 
-    public class DefaultMessageListenerConcurrently implements MessageListenerConcurrently {
-
-        @SuppressWarnings("unchecked")
-        @Override
-        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
-            for (MessageExt messageExt : msgs) {
-                log.debug("received msg: {}", messageExt);
-                try {
-                    long now = System.currentTimeMillis();
-                    rocketMQListener.onMessage(doConvertMessage(messageExt));
-                    long costTime = System.currentTimeMillis() - now;
-                    log.debug("consume {} cost: {} ms", messageExt.getMsgId(), costTime);
-                } catch (Exception e) {
-                    log.warn("consume message failed. messageExt:{}", messageExt, e);
-                    context.setDelayLevelWhenNextConsume(delayLevelWhenNextConsume);
-                    return ConsumeConcurrentlyStatus.RECONSUME_LATER;
-                }
-            }
-
-            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
-        }
-    }
-
-    public class DefaultMessageListenerOrderly implements MessageListenerOrderly {
-
-        @SuppressWarnings("unchecked")
-        @Override
-        public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
-            for (MessageExt messageExt : msgs) {
-                log.debug("received msg: {}", messageExt);
-                try {
-                    long now = System.currentTimeMillis();
-                    rocketMQListener.onMessage(doConvertMessage(messageExt));
-                    long costTime = System.currentTimeMillis() - now;
-                    log.info("consume {} cost: {} ms", messageExt.getMsgId(), costTime);
-                } catch (Exception e) {
-                    log.warn("consume message failed. messageExt:{}", messageExt, e);
-                    context.setSuspendCurrentQueueTimeMillis(suspendCurrentQueueTimeMillis);
-                    return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
-                }
-            }
-
-            return ConsumeOrderlyStatus.SUCCESS;
-        }
-    }
-
-
     @SuppressWarnings("unchecked")
     private Object doConvertMessage(MessageExt messageExt) {
         if (Objects.equals(messageType, MessageExt.class)) {
@@ -402,7 +354,6 @@
         }
     }
 
-
     private MethodParameter getMethodParameter() {
         Class<?> targetClass = AopProxyUtils.ultimateTargetClass(rocketMQListener);
         Type messageType = this.getMessageType();
@@ -470,7 +421,7 @@
         } else {
             log.debug("Access-key or secret-key not configure in " + this + ".");
             consumer = new DefaultMQPushConsumer(consumerGroup, enableMsgTrace,
-                    this.applicationContext.getEnvironment().
+                this.applicationContext.getEnvironment().
                     resolveRequiredPlaceholders(this.rocketMQMessageListener.customizedTraceTopic()));
         }
 
@@ -528,4 +479,71 @@
 
     }
 
+    /**
+     * Concurrent-mode message listener that forwards every received message to {@code rocketMQListener}.
+     */
+    public class DefaultMessageListenerConcurrently implements MessageListenerConcurrently {
+
+        /**
+         * Converts each message with {@code doConvertMessage} and passes it to the listener, in batch order.
+         *
+         * @param msgs    messages delivered for consumption
+         * @param context consume context; its delay level is set when a message fails
+         * @return {@code RECONSUME_LATER} as soon as one message throws an exception, otherwise {@code CONSUME_SUCCESS}
+         */
+        @SuppressWarnings("unchecked")
+        @Override
+        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
+            for (MessageExt messageExt : msgs) {
+                log.debug("received msg: {}", messageExt);
+                try {
+                    long now = System.currentTimeMillis();
+                    rocketMQListener.onMessage(doConvertMessage(messageExt));
+                    long costTime = System.currentTimeMillis() - now;
+                    log.debug("consume {} cost: {} ms", messageExt.getMsgId(), costTime);
+                } catch (Exception e) {
+                    log.warn("consume message failed. messageExt:{}", messageExt, e);
+                    context.setDelayLevelWhenNextConsume(delayLevelWhenNextConsume);
+                    return ConsumeConcurrentlyStatus.RECONSUME_LATER;
+                }
+            }
+
+            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
+        }
+    }
+
+    /**
+     * Orderly-mode message listener that forwards every received message to {@code rocketMQListener}.
+     */
+    public class DefaultMessageListenerOrderly implements MessageListenerOrderly {
+
+        /**
+         * Converts each message with {@code doConvertMessage} and passes it to the listener, in batch order.
+         *
+         * @param msgs    messages delivered for consumption
+         * @param context consume context; its queue suspend time is set when a message fails
+         * @return {@code SUSPEND_CURRENT_QUEUE_A_MOMENT} as soon as one message throws an exception, otherwise {@code SUCCESS}
+         */
+        @SuppressWarnings("unchecked")
+        @Override
+        public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
+            for (MessageExt messageExt : msgs) {
+                log.debug("received msg: {}", messageExt);
+                try {
+                    long now = System.currentTimeMillis();
+                    rocketMQListener.onMessage(doConvertMessage(messageExt));
+                    long costTime = System.currentTimeMillis() - now;
+                    // Per-message timing is diagnostic output, so it is logged at debug level.
+                    log.debug("consume {} cost: {} ms", messageExt.getMsgId(), costTime);
+                } catch (Exception e) {
+                    log.warn("consume message failed. messageExt:{}", messageExt, e);
+                    context.setSuspendCurrentQueueTimeMillis(suspendCurrentQueueTimeMillis);
+                    return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
+                }
+            }
+
+            return ConsumeOrderlyStatus.SUCCESS;
+        }
+    }
+
 }
diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/RocketMQUtil.java b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/RocketMQUtil.java
index 4c731f2..5352b19 100644
--- a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/RocketMQUtil.java
+++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/RocketMQUtil.java
@@ -17,11 +17,19 @@
 package org.apache.rocketmq.spring.support;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
+import java.lang.reflect.Field;
+import java.nio.charset.Charset;
+import java.util.Map;
+import java.util.Objects;
 import org.apache.rocketmq.acl.common.AclClientRPCHook;
 import org.apache.rocketmq.acl.common.SessionCredentials;
 import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.producer.DefaultMQProducer;
 import org.apache.rocketmq.client.producer.LocalTransactionState;
 import org.apache.rocketmq.client.producer.TransactionListener;
+import org.apache.rocketmq.client.producer.TransactionMQProducer;
+import org.apache.rocketmq.client.trace.AsyncTraceDispatcher;
+import org.apache.rocketmq.client.trace.hook.SendMessageTraceHookImpl;
 import org.apache.rocketmq.common.UtilAll;
 import org.apache.rocketmq.common.message.Message;
 import org.apache.rocketmq.common.message.MessageConst;
@@ -39,10 +47,6 @@
 import org.springframework.util.CollectionUtils;
 import org.springframework.util.StringUtils;
 
-import java.nio.charset.Charset;
-import java.util.Map;
-import java.util.Objects;
-
 public class RocketMQUtil {
     private final static Logger log = LoggerFactory.getLogger(RocketMQUtil.class);
 
@@ -73,7 +77,7 @@
         }
 
         // Never happen
-        log.warn("Failed to covert enum type RocketMQLocalTransactionState.%s", state);
+        log.warn("Failed to covert enum type RocketMQLocalTransactionState {}.", state);
         return LocalTransactionState.UNKNOW;
     }
 
@@ -136,9 +140,9 @@
         byte[] payloads;
 
         if (payloadObj instanceof String) {
-            payloads = ((String)payloadObj).getBytes(Charset.forName(charset));
+            payloads = ((String) payloadObj).getBytes(Charset.forName(charset));
         } else if (payloadObj instanceof byte[]) {
-            payloads = (byte[])message.getPayload();
+            payloads = (byte[]) message.getPayload();
         } else {
             try {
                 String jsonObj = objectMapper.writeValueAsString(payloadObj);
@@ -150,11 +154,6 @@
         return getAndWrapMessage(destination, message.getHeaders(), payloads);
     }
 
-    public static org.apache.rocketmq.common.message.Message convertToRocketMessage(
-        String destination, org.springframework.messaging.Message<byte[]> message) {
-        return getAndWrapMessage(destination, message.getHeaders(), message.getPayload());
-    }
-
     private static Message getAndWrapMessage(String destination, MessageHeaders headers, byte[] payloads) {
         if (destination == null || destination.length() < 1) {
             return null;
@@ -210,11 +209,11 @@
                 throw new RuntimeException("the message cannot be empty");
             }
             if (payloadObj instanceof String) {
-                payloads = ((String)payloadObj).getBytes(Charset.forName(charset));
+                payloads = ((String) payloadObj).getBytes(Charset.forName(charset));
             } else if (payloadObj instanceof byte[]) {
-                payloads = (byte[])message.getPayload();
+                payloads = (byte[]) message.getPayload();
             } else {
-                String jsonObj = (String)messageConverter.fromMessage(message, payloadObj.getClass());
+                String jsonObj = (String) messageConverter.fromMessage(message, payloadObj.getClass());
                 if (null == jsonObj) {
                     throw new RuntimeException(String.format(
                         "empty after conversion [messageConverter:%s,payloadClass:%s,payloadObj:%s]",
@@ -247,11 +246,57 @@
     public static String getInstanceName(RPCHook rpcHook, String identify) {
         String separator = "|";
         StringBuilder instanceName = new StringBuilder();
-        SessionCredentials sessionCredentials = ((AclClientRPCHook)rpcHook).getSessionCredentials();
+        SessionCredentials sessionCredentials = ((AclClientRPCHook) rpcHook).getSessionCredentials();
         instanceName.append(sessionCredentials.getAccessKey())
             .append(separator).append(sessionCredentials.getSecretKey())
             .append(separator).append(identify)
             .append(separator).append(UtilAll.getPid());
         return instanceName.toString();
     }
+
+    /**
+     * Creates a {@link TransactionMQProducer} for the given group, optionally with ACL and message trace.
+     *
+     * <p>ACL is used only when both {@code ak} and {@code sk} are non-empty; in that case the producer
+     * gets an {@link AclClientRPCHook} and its VIP channel is disabled. Message trace setup is
+     * best-effort: if it fails, the failure is logged and the producer is still returned.
+     *
+     * @param groupName            producer group passed to the producer constructor
+     * @param ak                   access key, may be null or empty (ACL is then skipped)
+     * @param sk                   secret key, may be null or empty (ACL is then skipped)
+     * @param isEnableMsgTrace     whether to attach an {@link AsyncTraceDispatcher} to the producer
+     * @param customizedTraceTopic trace topic name handed to the trace dispatcher
+     * @return the configured producer; this method does not start it
+     */
+    public static DefaultMQProducer createDefaultMQProducer(String groupName, String ak, String sk,
+        boolean isEnableMsgTrace, String customizedTraceTopic) {
+
+        boolean isEnableAcl = !StringUtils.isEmpty(ak) && !StringUtils.isEmpty(sk);
+        DefaultMQProducer producer;
+        if (isEnableAcl) {
+            producer = new TransactionMQProducer(groupName, new AclClientRPCHook(new SessionCredentials(ak, sk)));
+            producer.setVipChannelEnabled(false);
+        } else {
+            producer = new TransactionMQProducer(groupName);
+        }
+
+        if (isEnableMsgTrace) {
+            try {
+                AsyncTraceDispatcher dispatcher = new AsyncTraceDispatcher(customizedTraceTopic, isEnableAcl ? new AclClientRPCHook(new SessionCredentials(ak, sk)) : null);
+                dispatcher.setHostProducer(producer.getDefaultMQProducerImpl());
+                // Set via reflection on DefaultMQProducer's "traceDispatcher" field (presumably no public setter; verify on client upgrade).
+                Field field = DefaultMQProducer.class.getDeclaredField("traceDispatcher");
+                field.setAccessible(true);
+                field.set(producer, dispatcher);
+                producer.getDefaultMQProducerImpl().registerSendMessageHook(
+                    new SendMessageTraceHookImpl(dispatcher));
+            } catch (Throwable e) {
+                // Tracing is best-effort: keep the producer usable, but log the cause instead of dropping it.
+                log.error("system trace hook init failed ,maybe can't send msg trace data", e);
+            }
+        }
+
+        return producer;
+    }
+
 }