[ISSUE #343] Sending batch messages orderly with RocketMQTemplate (#344)

* change log level from info to debug to avoid lots of log

* support batch sending orderly

* mv batch method

* add test to send batch message orderly

* format code
diff --git a/rocketmq-spring-boot-samples/rocketmq-produce-demo/src/main/java/org/apache/rocketmq/samples/springboot/ProducerApplication.java b/rocketmq-spring-boot-samples/rocketmq-produce-demo/src/main/java/org/apache/rocketmq/samples/springboot/ProducerApplication.java
index f8140d4..3acd8a6 100644
--- a/rocketmq-spring-boot-samples/rocketmq-produce-demo/src/main/java/org/apache/rocketmq/samples/springboot/ProducerApplication.java
+++ b/rocketmq-spring-boot-samples/rocketmq-produce-demo/src/main/java/org/apache/rocketmq/samples/springboot/ProducerApplication.java
@@ -123,6 +123,9 @@
         // Send a batch of strings
         testBatchMessages();
 
+        // send a bath of strings orderly
+        testSendBatchMessageOrderly();
+
         // Send transactional messages using rocketMQTemplate
         testRocketMQTemplateTransaction();
 
@@ -181,6 +184,21 @@
         System.out.printf("--- Batch messages send result :" + sr);
     }
 
+    private void testSendBatchMessageOrderly() {
+        for (int q = 0; q < 4; q++) {
+            // send to 4 queues
+            List<Message> msgs = new ArrayList<Message>();
+            for (int i = 0; i < 10; i++) {
+                int msgIndex = q * 10 + i;
+                String msg = String.format("Hello RocketMQ Batch Msg#%d to queue: %d", msgIndex, q);
+                msgs.add(MessageBuilder.withPayload(msg).
+                        setHeader(RocketMQHeaders.KEYS, "KEY_" + msgIndex).build());
+            }
+            SendResult sr = rocketMQTemplate.syncSendOrderly(springTopic, msgs, q + "", 60000);
+            System.out.println("--- Batch messages orderly to queue :" + sr.getMessageQueue().getQueueId() + " send result :" + sr);
+        }
+    }
+
     private void testRocketMQTemplateTransaction() throws MessagingException {
         String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};
         for (int i = 0; i < 10; i++) {
diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/core/RocketMQTemplate.java b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/core/RocketMQTemplate.java
index a18e781..dd6a4d8 100644
--- a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/core/RocketMQTemplate.java
+++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/core/RocketMQTemplate.java
@@ -17,6 +17,7 @@
 
 package org.apache.rocketmq.spring.core;
 
+import org.apache.rocketmq.client.Validators;
 import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer;
 import org.apache.rocketmq.client.exception.MQClientException;
 import org.apache.rocketmq.client.producer.DefaultMQProducer;
@@ -27,6 +28,8 @@
 import org.apache.rocketmq.client.producer.TransactionMQProducer;
 import org.apache.rocketmq.client.producer.TransactionSendResult;
 import org.apache.rocketmq.client.producer.selector.SelectMessageQueueByHash;
+import org.apache.rocketmq.common.message.MessageBatch;
+import org.apache.rocketmq.common.message.MessageClientIDSetter;
 import org.apache.rocketmq.common.message.MessageExt;
 import org.apache.rocketmq.spring.support.RocketMQMessageConverter;
 import org.apache.rocketmq.spring.support.RocketMQUtil;
@@ -656,6 +659,53 @@
     }
 
     /**
+     * syncSend batch messages orderly.
+     *
+     * @param destination formats: `topicName:tags`
+     * @param messages    Collection of {@link org.springframework.messaging.Message}
+     * @param hashKey     use this key to select queue. for example: orderId, productId ...
+     * @return {@link SendResult}
+     */
+    public <T extends Message> SendResult syncSendOrderly(String destination, Collection<T> messages, String hashKey) {
+        return syncSendOrderly(destination, messages, hashKey, producer.getSendMsgTimeout());
+    }
+
+    /**
+     * Same to {@link #syncSendOrderly(String, Collection, String)} with send timeout specified in addition.
+     *
+     * @param destination formats: `topicName:tags`
+     * @param messages    Collection of {@link org.springframework.messaging.Message}
+     * @param hashKey     use this key to select queue. for example: orderId, productId ...
+     * @param timeout     send timeout with millis
+     * @return {@link SendResult}
+     */
+    public <T extends Message> SendResult syncSendOrderly(String destination, Collection<T> messages, String hashKey, long timeout) {
+        if (Objects.isNull(messages) || messages.isEmpty()) {
+            log.error("syncSendOrderly failed. destination:{}, message is null ", destination);
+            throw new IllegalArgumentException("`messages` can not be empty");
+        }
+        try {
+            long now = System.currentTimeMillis();
+            Collection<org.apache.rocketmq.common.message.Message> rmqMsgs = new ArrayList<>();
+            for (T message : messages) {
+                if (Objects.isNull(message)) {
+                    continue;
+                }
+                rmqMsgs.add(this.createRocketMqMessage(destination, message));
+            }
+            MessageBatch messageBatch = batch(rmqMsgs);
+            SendResult sendResult = producer.send(messageBatch, this.messageQueueSelector, hashKey, timeout);
+            long costTime = System.currentTimeMillis() - now;
+            if (log.isDebugEnabled()) {
+                log.debug("send message cost: {} ms, msgId:{}", costTime, sendResult.getMsgId());
+            }
+            return sendResult;
+        } catch (Exception e) {
+            throw new MessagingException(e.getMessage(), e);
+        }
+    }
+
+    /**
      * Same to {@link #asyncSend(String, Message, SendCallback)} with send timeout and delay level specified in
      * addition.
      *
@@ -990,6 +1040,23 @@
         return Object.class;
     }
 
+    private MessageBatch batch(Collection<org.apache.rocketmq.common.message.Message> msgs) throws MQClientException {
+        MessageBatch msgBatch;
+        try {
+            msgBatch = MessageBatch.generateFromList(msgs);
+            for (org.apache.rocketmq.common.message.Message message : msgBatch) {
+                Validators.checkMessage(message, producer);
+                MessageClientIDSetter.setUniqID(message);
+                message.setTopic(producer.withNamespace(message.getTopic()));
+            }
+            msgBatch.setBody(msgBatch.encode());
+        } catch (Exception e) {
+            throw new MQClientException("Failed to initiate the MessageBatch", e);
+        }
+        msgBatch.setTopic(producer.withNamespace(msgBatch.getTopic()));
+        return msgBatch;
+    }
+
     /**
      * receive message  in pull mode.
      *