Merge branch 'master' into feature/ISSUE_#219_support_configable_tls
diff --git a/rocketmq-spring-boot-parent/pom.xml b/rocketmq-spring-boot-parent/pom.xml
index 12e0807..6f1c4ae 100644
--- a/rocketmq-spring-boot-parent/pom.xml
+++ b/rocketmq-spring-boot-parent/pom.xml
@@ -16,7 +16,8 @@
   ~ limitations under the License.
   -->
 
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
     <modelVersion>4.0.0</modelVersion>
 
     <parent>
@@ -40,7 +41,7 @@
 
         <rocketmq.spring.boot.version>2.2.1-SNAPSHOT</rocketmq.spring.boot.version>
 
-        <rocketmq.version>4.8.0</rocketmq.version>
+        <rocketmq.version>4.9.1</rocketmq.version>
         <slf4j.version>1.7.25</slf4j.version>
         <jackson.version>2.11.1</jackson.version>
         <fastjson.version>1.2.72</fastjson.version>
diff --git a/rocketmq-spring-boot-samples/rocketmq-consume-demo/src/main/java/org/apache/rocketmq/samples/springboot/consumer/StringConsumerWithReplyString.java b/rocketmq-spring-boot-samples/rocketmq-consume-demo/src/main/java/org/apache/rocketmq/samples/springboot/consumer/StringConsumerWithReplyString.java
index a57f20d..51f4e29 100644
--- a/rocketmq-spring-boot-samples/rocketmq-consume-demo/src/main/java/org/apache/rocketmq/samples/springboot/consumer/StringConsumerWithReplyString.java
+++ b/rocketmq-spring-boot-samples/rocketmq-consume-demo/src/main/java/org/apache/rocketmq/samples/springboot/consumer/StringConsumerWithReplyString.java
@@ -25,7 +25,8 @@
  * The consumer that replying String
  */
 @Service
-@RocketMQMessageListener(topic = "${demo.rocketmq.stringRequestTopic}", consumerGroup = "${demo.rocketmq.stringRequestConsumer}", selectorExpression = "${demo.rocketmq.tag}")
+@RocketMQMessageListener(topic = "${demo.rocketmq.stringRequestTopic}", consumerGroup = "${demo.rocketmq.stringRequestConsumer}",
+    selectorExpression = "${demo.rocketmq.tag}", replyTimeout = 10000)
 public class StringConsumerWithReplyString implements RocketMQReplyListener<String, String> {
 
     @Override
diff --git a/rocketmq-spring-boot-samples/rocketmq-produce-demo/src/main/java/org/apache/rocketmq/samples/springboot/ProducerApplication.java b/rocketmq-spring-boot-samples/rocketmq-produce-demo/src/main/java/org/apache/rocketmq/samples/springboot/ProducerApplication.java
index f8140d4..3acd8a6 100644
--- a/rocketmq-spring-boot-samples/rocketmq-produce-demo/src/main/java/org/apache/rocketmq/samples/springboot/ProducerApplication.java
+++ b/rocketmq-spring-boot-samples/rocketmq-produce-demo/src/main/java/org/apache/rocketmq/samples/springboot/ProducerApplication.java
@@ -123,6 +123,9 @@
         // Send a batch of strings
         testBatchMessages();
 
+        // send a bath of strings orderly
+        testSendBatchMessageOrderly();
+
         // Send transactional messages using rocketMQTemplate
         testRocketMQTemplateTransaction();
 
@@ -181,6 +184,21 @@
         System.out.printf("--- Batch messages send result :" + sr);
     }
 
+    private void testSendBatchMessageOrderly() {
+        for (int q = 0; q < 4; q++) {
+            // send to 4 queues
+            List<Message> msgs = new ArrayList<Message>();
+            for (int i = 0; i < 10; i++) {
+                int msgIndex = q * 10 + i;
+                String msg = String.format("Hello RocketMQ Batch Msg#%d to queue: %d", msgIndex, q);
+                msgs.add(MessageBuilder.withPayload(msg).
+                        setHeader(RocketMQHeaders.KEYS, "KEY_" + msgIndex).build());
+            }
+            SendResult sr = rocketMQTemplate.syncSendOrderly(springTopic, msgs, q + "", 60000);
+            System.out.println("--- Batch messages orderly to queue :" + sr.getMessageQueue().getQueueId() + " send result :" + sr);
+        }
+    }
+
     private void testRocketMQTemplateTransaction() throws MessagingException {
         String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};
         for (int i = 0; i < 10; i++) {
diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/annotation/ExtRocketMQConsumerConfiguration.java b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/annotation/ExtRocketMQConsumerConfiguration.java
index cc78ff9..84d641a 100644
--- a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/annotation/ExtRocketMQConsumerConfiguration.java
+++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/annotation/ExtRocketMQConsumerConfiguration.java
@@ -37,6 +37,7 @@
     String ACCESS_CHANNEL_PLACEHOLDER = "${rocketmq.access-channel:}";
     String ACCESS_KEY_PLACEHOLDER = "${rocketmq.consumer.access-key:}";
     String SECRET_KEY_PLACEHOLDER = "${rocketmq.consumer.secret-key:}";
+    String TRACE_TOPIC_PLACEHOLDER = "${rocketmq.consumer.customized-trace-topic:}";
 
     /**
      * The component name of the Producer configuration.
@@ -94,8 +95,19 @@
      * Maximum number of messages pulled each time.
      */
     int pullBatchSize() default 10;
+
     /**
      * The property of "tlsEnable" default false.
      */
     String tlsEnable() default "false";
-}
+
+    /**
+     * Switch flag instance for message trace.
+     */
+    boolean enableMsgTrace() default false;
+
+    /**
+     * The name value of message trace topic.If you don't config,you can use the default trace topic name.
+     */
+    String customizedTraceTopic() default TRACE_TOPIC_PLACEHOLDER;
+}
\ No newline at end of file
diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/annotation/ExtRocketMQTemplateConfiguration.java b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/annotation/ExtRocketMQTemplateConfiguration.java
index ccf40ef..8e80147 100644
--- a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/annotation/ExtRocketMQTemplateConfiguration.java
+++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/annotation/ExtRocketMQTemplateConfiguration.java
@@ -80,7 +80,7 @@
     /**
      * Switch flag instance for message trace.
      */
-    boolean enableMsgTrace() default true;
+    boolean enableMsgTrace() default false;
     /**
      * The name value of message trace topic.If you don't config,you can use the default trace topic name.
      */
diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/annotation/RocketMQMessageListener.java b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/annotation/RocketMQMessageListener.java
index f159a63..e80f328 100644
--- a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/annotation/RocketMQMessageListener.java
+++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/annotation/RocketMQMessageListener.java
@@ -76,11 +76,24 @@
     int consumeThreadMax() default 64;
 
     /**
+     * Max re-consume times.
+     *
+     * In concurrently mode, -1 means 16;
+     * In orderly mode, -1 means Integer.MAX_VALUE.
+     */
+    int maxReconsumeTimes() default -1;
+
+    /**
      * Maximum amount of time in minutes a message may block the consuming thread.
      */
     long consumeTimeout() default 15L;
 
     /**
+     * Timeout for sending reply messages.
+     */
+    int replyTimeout() default 3000;
+
+    /**
      * The property of "access-key".
      */
     String accessKey() default ACCESS_KEY_PLACEHOLDER;
@@ -93,7 +106,7 @@
     /**
      * Switch flag instance for message trace.
      */
-    boolean enableMsgTrace() default true;
+    boolean enableMsgTrace() default false;
 
     /**
      * The name value of message trace topic.If you don't config,you can use the default trace topic name.
diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/ExtConsumerResetConfiguration.java b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/ExtConsumerResetConfiguration.java
index a42e0d3..f2ece8a 100644
--- a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/ExtConsumerResetConfiguration.java
+++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/ExtConsumerResetConfiguration.java
@@ -122,15 +122,16 @@
         String accessChannel = resolvePlaceholders(annotation.accessChannel(), rocketMQProperties.getAccessChannel());
         MessageModel messageModel = annotation.messageModel();
         SelectorType selectorType = annotation.selectorType();
-        String selectorExpression = annotation.selectorExpression();
+        String selectorExpression = resolvePlaceholders(annotation.selectorExpression(), consumerConfig.getSelectorExpression());
         String ak = resolvePlaceholders(annotation.accessKey(), consumerConfig.getAccessKey());
         String sk = resolvePlaceholders(annotation.secretKey(), consumerConfig.getSecretKey());
         int pullBatchSize = annotation.pullBatchSize();
         //if String is not is equal "true" TLS mode will represent the as default value false
         boolean useTLS = new Boolean(environment.resolvePlaceholders(annotation.tlsEnable()));
-
         DefaultLitePullConsumer litePullConsumer = RocketMQUtil.createDefaultLitePullConsumer(nameServer, accessChannel,
-                groupName, topicName, messageModel, selectorType, selectorExpression, ak, sk, pullBatchSize,useTLS);
+                groupName, topicName, messageModel, selectorType, selectorExpression, ak, sk, pullBatchSize, useTLS);
+        litePullConsumer.setEnableMsgTrace(annotation.enableMsgTrace());
+        litePullConsumer.setCustomizedTraceTopic(resolvePlaceholders(annotation.customizedTraceTopic(), consumerConfig.getCustomizedTraceTopic()));
         return litePullConsumer;
     }
 
diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/RocketMQAutoConfiguration.java b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/RocketMQAutoConfiguration.java
index b7f6c7c..ea00432 100644
--- a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/RocketMQAutoConfiguration.java
+++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/RocketMQAutoConfiguration.java
@@ -144,6 +144,8 @@
 
         DefaultLitePullConsumer litePullConsumer = RocketMQUtil.createDefaultLitePullConsumer(nameServer, accessChannel,
                 groupName, topicName, messageModel, selectorType, selectorExpression, ak, sk, pullBatchSize, useTLS);
+        litePullConsumer.setEnableMsgTrace(consumerConfig.isEnableMsgTrace());
+        litePullConsumer.setCustomizedTraceTopic(consumerConfig.getCustomizedTraceTopic());
         return litePullConsumer;
     }
 
@@ -176,4 +178,4 @@
         static class DefaultLitePullConsumerExistsCondition {
         }
     }
-}
+}
\ No newline at end of file
diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/RocketMQProperties.java b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/RocketMQProperties.java
index 2fe430b..d6c8a90 100644
--- a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/RocketMQProperties.java
+++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/RocketMQProperties.java
@@ -125,7 +125,7 @@
         /**
          * Switch flag instance for message trace.
          */
-        private boolean enableMsgTrace = true;
+        private boolean enableMsgTrace = false;
 
         /**
          * The name value of message trace topic.If you don't config,you can use the default trace topic name.
@@ -285,6 +285,16 @@
         private int pullBatchSize = 10;
 
         /**
+         * Switch flag instance for message trace.
+         */
+        private boolean enableMsgTrace = false;
+
+        /**
+         * The name value of message trace topic.If you don't config,you can use the default trace topic name.
+         */
+        private String customizedTraceTopic = TopicValidator.RMQ_SYS_TRACE_TOPIC;
+
+        /**
          * listener configuration container
          * the pattern is like this:
          * group1.topic1 = false
@@ -370,6 +380,22 @@
             this.listeners = listeners;
         }
 
+        public boolean isEnableMsgTrace() {
+            return enableMsgTrace;
+        }
+
+        public void setEnableMsgTrace(boolean enableMsgTrace) {
+            this.enableMsgTrace = enableMsgTrace;
+        }
+
+        public String getCustomizedTraceTopic() {
+            return customizedTraceTopic;
+        }
+
+        public void setCustomizedTraceTopic(String customizedTraceTopic) {
+            this.customizedTraceTopic = customizedTraceTopic;
+        }
+
         public boolean isTlsEnable() {
             return tlsEnable;
         }
@@ -379,4 +405,4 @@
         }
     }
 
-}
+}
\ No newline at end of file
diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/core/RocketMQTemplate.java b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/core/RocketMQTemplate.java
index a18e781..8b1af2a 100644
--- a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/core/RocketMQTemplate.java
+++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/core/RocketMQTemplate.java
@@ -17,6 +17,7 @@
 
 package org.apache.rocketmq.spring.core;
 
+import org.apache.rocketmq.client.Validators;
 import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer;
 import org.apache.rocketmq.client.exception.MQClientException;
 import org.apache.rocketmq.client.producer.DefaultMQProducer;
@@ -27,6 +28,8 @@
 import org.apache.rocketmq.client.producer.TransactionMQProducer;
 import org.apache.rocketmq.client.producer.TransactionSendResult;
 import org.apache.rocketmq.client.producer.selector.SelectMessageQueueByHash;
+import org.apache.rocketmq.common.message.MessageBatch;
+import org.apache.rocketmq.common.message.MessageClientIDSetter;
 import org.apache.rocketmq.common.message.MessageExt;
 import org.apache.rocketmq.spring.support.RocketMQMessageConverter;
 import org.apache.rocketmq.spring.support.RocketMQUtil;
@@ -656,6 +659,53 @@
     }
 
     /**
+     * syncSend batch messages orderly.
+     *
+     * @param destination formats: `topicName:tags`
+     * @param messages    Collection of {@link org.springframework.messaging.Message}
+     * @param hashKey     use this key to select queue. for example: orderId, productId ...
+     * @return {@link SendResult}
+     */
+    public <T extends Message> SendResult syncSendOrderly(String destination, Collection<T> messages, String hashKey) {
+        return syncSendOrderly(destination, messages, hashKey, producer.getSendMsgTimeout());
+    }
+
+    /**
+     * Same to {@link #syncSendOrderly(String, Collection, String)} with send timeout specified in addition.
+     *
+     * @param destination formats: `topicName:tags`
+     * @param messages    Collection of {@link org.springframework.messaging.Message}
+     * @param hashKey     use this key to select queue. for example: orderId, productId ...
+     * @param timeout     send timeout with millis
+     * @return {@link SendResult}
+     */
+    public <T extends Message> SendResult syncSendOrderly(String destination, Collection<T> messages, String hashKey, long timeout) {
+        if (Objects.isNull(messages) || messages.isEmpty()) {
+            log.error("syncSendOrderly failed. destination:{}, message is null ", destination);
+            throw new IllegalArgumentException("`messages` can not be empty");
+        }
+        try {
+            long now = System.currentTimeMillis();
+            Collection<org.apache.rocketmq.common.message.Message> rmqMsgs = new ArrayList<>();
+            for (T message : messages) {
+                if (Objects.isNull(message)) {
+                    continue;
+                }
+                rmqMsgs.add(this.createRocketMqMessage(destination, message));
+            }
+            MessageBatch messageBatch = batch(rmqMsgs);
+            SendResult sendResult = producer.send(messageBatch, this.messageQueueSelector, hashKey, timeout);
+            long costTime = System.currentTimeMillis() - now;
+            if (log.isDebugEnabled()) {
+                log.debug("send message cost: {} ms, msgId:{}", costTime, sendResult.getMsgId());
+            }
+            return sendResult;
+        } catch (Exception e) {
+            throw new MessagingException(e.getMessage(), e);
+        }
+    }
+
+    /**
      * Same to {@link #asyncSend(String, Message, SendCallback)} with send timeout and delay level specified in
      * addition.
      *
@@ -738,6 +788,47 @@
     }
 
     /**
+     * asyncSend batch messages
+     *
+     * @param destination formats: `topicName:tags`
+     * @param messages Collection of {@link org.springframework.messaging.Message}
+     * @param sendCallback {@link SendCallback}
+     */
+    public <T extends Message> void asyncSend(String destination, Collection<T> messages, SendCallback sendCallback) {
+        asyncSend(destination, messages, sendCallback, producer.getSendMsgTimeout());
+    }
+
+    /**
+     * asyncSend batch messages in a given timeout.
+     *
+     * @param destination formats: `topicName:tags`
+     * @param messages Collection of {@link org.springframework.messaging.Message}
+     * @param sendCallback {@link SendCallback}
+     * @param timeout send timeout with millis
+     */
+    public <T extends Message> void asyncSend(String destination, Collection<T> messages, SendCallback sendCallback, long timeout) {
+        if (Objects.isNull(messages) || messages.size() == 0) {
+            log.error("asyncSend with batch failed. destination:{}, messages is empty ", destination);
+            throw new IllegalArgumentException("`messages` can not be empty");
+        }
+
+        try {
+            Collection<org.apache.rocketmq.common.message.Message> rmqMsgs = new ArrayList<>();
+            for (Message msg : messages) {
+                if (Objects.isNull(msg) || Objects.isNull(msg.getPayload())) {
+                    log.warn("Found a message empty in the batch, skip it");
+                    continue;
+                }
+                rmqMsgs.add(this.createRocketMqMessage(destination, msg));
+            }
+            producer.send(rmqMsgs, sendCallback, timeout);
+        } catch (Exception e) {
+            log.error("asyncSend with batch failed. destination:{}, messages.size:{} ", destination, messages.size());
+            throw new MessagingException(e.getMessage(), e);
+        }
+    }
+
+    /**
      * Same to {@link #asyncSendOrderly(String, Message, String, SendCallback)} with send timeout specified in
      * addition.
      *
@@ -990,6 +1081,23 @@
         return Object.class;
     }
 
+    private MessageBatch batch(Collection<org.apache.rocketmq.common.message.Message> msgs) throws MQClientException {
+        MessageBatch msgBatch;
+        try {
+            msgBatch = MessageBatch.generateFromList(msgs);
+            for (org.apache.rocketmq.common.message.Message message : msgBatch) {
+                Validators.checkMessage(message, producer);
+                MessageClientIDSetter.setUniqID(message);
+                message.setTopic(producer.withNamespace(message.getTopic()));
+            }
+            msgBatch.setBody(msgBatch.encode());
+        } catch (Exception e) {
+            throw new MQClientException("Failed to initiate the MessageBatch", e);
+        }
+        msgBatch.setTopic(producer.withNamespace(msgBatch.getTopic()));
+        return msgBatch;
+    }
+
     /**
      * receive message  in pull mode.
      *
diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainer.java b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainer.java
index 2693a07..2a42fe4 100644
--- a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainer.java
+++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainer.java
@@ -34,6 +34,7 @@
 import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
 import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely;
 import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.producer.DefaultMQProducer;
 import org.apache.rocketmq.client.producer.SendCallback;
 import org.apache.rocketmq.client.producer.SendResult;
 import org.apache.rocketmq.client.producer.SendStatus;
@@ -120,6 +121,8 @@
     private String selectorExpression;
     private MessageModel messageModel;
     private long consumeTimeout;
+    private int maxReconsumeTimes;
+    private int replyTimeout;
     private String tlsEnable;
 
     public long getSuspendCurrentQueueTimeMillis() {
@@ -220,6 +223,8 @@
         this.selectorType = anno.selectorType();
         this.selectorExpression = anno.selectorExpression();
         this.consumeTimeout = anno.consumeTimeout();
+        this.maxReconsumeTimes = anno.maxReconsumeTimes();
+        this.replyTimeout = anno.replyTimeout();
         this.tlsEnable = anno.tlsEnable();
     }
 
@@ -338,15 +343,15 @@
     @Override
     public String toString() {
         return "DefaultRocketMQListenerContainer{" +
-                "consumerGroup='" + consumerGroup + '\'' +
-                ", nameServer='" + nameServer + '\'' +
-                ", topic='" + topic + '\'' +
-                ", consumeMode=" + consumeMode +
-                ", selectorType=" + selectorType +
-                ", selectorExpression='" + selectorExpression + '\'' +
+            "consumerGroup='" + consumerGroup + '\'' +
+            ", nameServer='" + nameServer + '\'' +
+            ", topic='" + topic + '\'' +
+            ", consumeMode=" + consumeMode +
+            ", selectorType=" + selectorType +
+            ", selectorExpression='" + selectorExpression + '\'' +
                 ", messageModel=" + messageModel + '\'' +
                 ", tlsEnable=" + tlsEnable +
-                '}';
+            '}';
     }
 
     public void setName(String name) {
@@ -408,7 +413,9 @@
             Message<?> message = MessageBuilder.withPayload(replyContent).build();
 
             org.apache.rocketmq.common.message.Message replyMessage = MessageUtil.createReplyMessage(messageExt, convertToBytes(message));
-            consumer.getDefaultMQPushConsumerImpl().getmQClientFactory().getDefaultMQProducer().send(replyMessage, new SendCallback() {
+            DefaultMQProducer producer = consumer.getDefaultMQPushConsumerImpl().getmQClientFactory().getDefaultMQProducer();
+            producer.setSendMsgTimeout(replyTimeout);
+            producer.send(replyMessage, new SendCallback() {
                 @Override public void onSuccess(SendResult sendResult) {
                     if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
                         log.error("Consumer replies message failed. SendStatus: {}", sendResult.getSendStatus());
@@ -468,7 +475,7 @@
 
     @SuppressWarnings("unchecked")
     private Object doConvertMessage(MessageExt messageExt) {
-        if (Objects.equals(messageType, MessageExt.class)) {
+        if (Objects.equals(messageType, MessageExt.class) || Objects.equals(messageType, org.apache.rocketmq.common.message.Message.class)) {
             return messageExt;
         } else {
             String str = new String(messageExt.getBody(), Charset.forName(charset));
@@ -589,7 +596,7 @@
             consumer.setConsumeThreadMin(consumeThreadMax);
         }
         consumer.setConsumeTimeout(consumeTimeout);
-
+        consumer.setMaxReconsumeTimes(maxReconsumeTimes);
         switch (messageModel) {
             case BROADCASTING:
                 consumer.setMessageModel(org.apache.rocketmq.common.protocol.heartbeat.MessageModel.BROADCASTING);
@@ -634,4 +641,4 @@
 
     }
 
-}
+}
\ No newline at end of file
diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/RocketMQUtil.java b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/RocketMQUtil.java
index 2030fae..e8fbc94 100644
--- a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/RocketMQUtil.java
+++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/RocketMQUtil.java
@@ -287,7 +287,8 @@
         char separator = '@';
         StringBuilder instanceName = new StringBuilder();
         instanceName.append(identify)
-                .append(separator).append(UtilAll.getPid());
+                .append(separator).append(UtilAll.getPid())
+                .append(separator).append(System.nanoTime());
         return instanceName.toString();
     }
 
diff --git a/rocketmq-spring-boot/src/test/java/org/apache/rocketmq/spring/core/RocketMQTemplateTest.java b/rocketmq-spring-boot/src/test/java/org/apache/rocketmq/spring/core/RocketMQTemplateTest.java
index 2103da2..4948c81 100644
--- a/rocketmq-spring-boot/src/test/java/org/apache/rocketmq/spring/core/RocketMQTemplateTest.java
+++ b/rocketmq-spring-boot/src/test/java/org/apache/rocketmq/spring/core/RocketMQTemplateTest.java
@@ -16,6 +16,8 @@
  */
 package org.apache.rocketmq.spring.core;
 
+import java.util.ArrayList;
+import java.util.List;
 import javax.annotation.Resource;
 import org.apache.rocketmq.client.AccessChannel;
 import org.apache.rocketmq.client.producer.SendCallback;
@@ -94,6 +96,26 @@
             assertThat(e).hasMessageContaining("org.apache.rocketmq.remoting.exception.RemotingConnectException: connect to [127.0.0.1:9876] failed");
         }
     }
+    @Test
+    public void testAsyncBatchSendMessage() {
+        List<Message> messages = new ArrayList<>();
+        for (int i = 0; i < 3; i++) {
+            messages.add(MessageBuilder.withPayload("payload" + i).build());
+        }
+        try {
+            rocketMQTemplate.asyncSend(topic, messages, new SendCallback() {
+                @Override public void onSuccess(SendResult sendResult) {
+
+                }
+
+                @Override public void onException(Throwable e) {
+
+                }
+            });
+        } catch (MessagingException e) {
+            assertThat(e).hasMessageContaining("org.apache.rocketmq.remoting.exception.RemotingConnectException: connect to [127.0.0.1:9876] failed");
+        }
+    }
 
     @Test
     public void testReceiveMessage() {
diff --git a/rocketmq-spring-boot/src/test/java/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainerTest.java b/rocketmq-spring-boot/src/test/java/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainerTest.java
index 7133b98..1304b9f 100644
--- a/rocketmq-spring-boot/src/test/java/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainerTest.java
+++ b/rocketmq-spring-boot/src/test/java/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainerTest.java
@@ -19,13 +19,20 @@
 import java.lang.reflect.Field;
 import java.lang.reflect.Method;
 import java.lang.reflect.ParameterizedType;
+
+import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
+import org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl;
+import org.apache.rocketmq.client.impl.factory.MQClientInstance;
+import org.apache.rocketmq.client.producer.DefaultMQProducer;
+import org.apache.rocketmq.client.producer.SendCallback;
+import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.common.message.MessageAccessor;
+import org.apache.rocketmq.common.message.MessageConst;
 import org.apache.rocketmq.common.message.MessageExt;
-import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
 import org.apache.rocketmq.spring.core.RocketMQListener;
 import org.apache.rocketmq.spring.core.RocketMQReplyListener;
 import org.junit.Test;
 import org.springframework.core.MethodParameter;
-import org.springframework.messaging.Message;
 import org.springframework.messaging.converter.CompositeMessageConverter;
 import org.springframework.messaging.converter.MappingJackson2MessageConverter;
 import org.springframework.messaging.converter.StringMessageConverter;
@@ -33,9 +40,12 @@
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Date;
-import org.springframework.messaging.support.MessageBuilder;
 
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
 public class DefaultRocketMQListenerContainerTest {
     @Test
@@ -52,6 +62,15 @@
         Class result = (Class) getMessageType.invoke(listenerContainer);
         assertThat(result.getName().equals(String.class.getName()));
 
+        //support message
+        listenerContainer.setRocketMQListener(new RocketMQListener<Message>() {
+            @Override
+            public void onMessage(Message message) {
+            }
+        });
+        result = (Class) getMessageType.invoke(listenerContainer);
+        assertThat(result.getName().equals(Message.class.getName()));
+
         listenerContainer.setRocketMQListener(new RocketMQListener<MessageExt>() {
             @Override
             public void onMessage(MessageExt message) {
@@ -60,6 +79,7 @@
         result = (Class) getMessageType.invoke(listenerContainer);
         assertThat(result.getName().equals(MessageExt.class.getName()));
 
+
         listenerContainer.setRocketMQReplyListener(new RocketMQReplyListener<MessageExt, String>() {
             @Override
             public String onMessage(MessageExt message) {
@@ -112,6 +132,26 @@
         MessageExt result2 = (MessageExt) doConvertMessage.invoke(listenerContainer, messageExt);
         assertThat(result2).isEqualTo(messageExt);
 
+        //support message
+        listenerContainer.setRocketMQListener(new RocketMQListener<Message>() {
+            @Override
+            public void onMessage(Message message) {
+            }
+        });
+        Field messageType3 = DefaultRocketMQListenerContainer.class.getDeclaredField("messageType");
+        messageType3.setAccessible(true);
+        messageType3.set(listenerContainer, Message.class);
+        Message message = new MessageExt(0, System.currentTimeMillis(), null, System.currentTimeMillis(), null, null);
+        message.setBody("hello".getBytes());
+        Message result3 = (Message) doConvertMessage.invoke(listenerContainer, message);
+        assertThat(result3).isEqualTo(message);
+
+        listenerContainer.setRocketMQListener(new RocketMQListener<User>() {
+            @Override
+            public void onMessage(User message) {
+            }
+        });
+
         listenerContainer.setRocketMQListener(new RocketMQListener<User>() {
             @Override
             public void onMessage(User message) {
@@ -153,6 +193,45 @@
         assertThat(methodParameter.getParameterType() == ArrayList.class);
     }
 
+    @Test
+    public void testHandleMessage() throws Exception {
+        DefaultRocketMQListenerContainer listenerContainer = new DefaultRocketMQListenerContainer();
+        Method handleMessage = DefaultRocketMQListenerContainer.class.getDeclaredMethod("handleMessage", MessageExt.class);
+        handleMessage.setAccessible(true);
+        listenerContainer.setRocketMQListener(new RocketMQListener<String>() {
+            @Override
+            public void onMessage(String message) {
+            }
+        });
+        Field messageType = DefaultRocketMQListenerContainer.class.getDeclaredField("messageType");
+        messageType.setAccessible(true);
+        messageType.set(listenerContainer, String.class);
+        MessageExt messageExt = new MessageExt(0, System.currentTimeMillis(), null, System.currentTimeMillis(), null, null);
+        MessageAccessor.putProperty(messageExt, MessageConst.PROPERTY_CLUSTER, "defaultCluster");
+        messageExt.setBody("hello".getBytes());
+        handleMessage.invoke(listenerContainer, messageExt);
+
+        // reply message
+        listenerContainer.setRocketMQListener(null);
+        DefaultMQPushConsumer consumer = mock(DefaultMQPushConsumer.class);
+        DefaultMQPushConsumerImpl pushConsumer = mock(DefaultMQPushConsumerImpl.class);
+        MQClientInstance mqClientInstance = mock(MQClientInstance.class);
+        DefaultMQProducer producer = mock(DefaultMQProducer.class);
+        when(consumer.getDefaultMQPushConsumerImpl()).thenReturn(pushConsumer);
+        when(pushConsumer.getmQClientFactory()).thenReturn(mqClientInstance);
+        when(mqClientInstance.getDefaultMQProducer()).thenReturn(producer);
+        listenerContainer.setConsumer(consumer);
+        listenerContainer.setMessageConverter(new CompositeMessageConverter(Arrays.asList(new StringMessageConverter(), new MappingJackson2MessageConverter())));
+        doNothing().when(producer).send(any(MessageExt.class), any(SendCallback.class));
+        listenerContainer.setRocketMQReplyListener(new RocketMQReplyListener<String, String>() {
+            @Override
+            public String onMessage(String message) {
+                return "test";
+            }
+        });
+        handleMessage.invoke(listenerContainer, messageExt);
+    }
+
     class User {
         private String userName;
         private int userAge;
diff --git a/rocketmq-spring-boot/src/test/java/org/apache/rocketmq/spring/support/RocketMQUtilTest.java b/rocketmq-spring-boot/src/test/java/org/apache/rocketmq/spring/support/RocketMQUtilTest.java
index e558483..30b8026 100644
--- a/rocketmq-spring-boot/src/test/java/org/apache/rocketmq/spring/support/RocketMQUtilTest.java
+++ b/rocketmq-spring-boot/src/test/java/org/apache/rocketmq/spring/support/RocketMQUtilTest.java
@@ -135,6 +135,6 @@
     public void testGetInstanceName() {
         String nameServer = "127.0.0.1:9876";
         String expected = "127.0.0.1:9876@";
-        assertEquals(expected + UtilAll.getPid(), RocketMQUtil.getInstanceName(nameServer));
+        assertTrue(RocketMQUtil.getInstanceName(nameServer).startsWith(expected + UtilAll.getPid()));
     }
 }
\ No newline at end of file