[ISSUE #343] Sending batch messages orderly with RocketMQTemplate (#344)
* change log level from info to debug to avoid lots of log
* support batch sending orderly
* mv batch method
* add test to send batch message orderly
* format code
diff --git a/rocketmq-spring-boot-samples/rocketmq-produce-demo/src/main/java/org/apache/rocketmq/samples/springboot/ProducerApplication.java b/rocketmq-spring-boot-samples/rocketmq-produce-demo/src/main/java/org/apache/rocketmq/samples/springboot/ProducerApplication.java
index f8140d4..3acd8a6 100644
--- a/rocketmq-spring-boot-samples/rocketmq-produce-demo/src/main/java/org/apache/rocketmq/samples/springboot/ProducerApplication.java
+++ b/rocketmq-spring-boot-samples/rocketmq-produce-demo/src/main/java/org/apache/rocketmq/samples/springboot/ProducerApplication.java
@@ -123,6 +123,9 @@
// Send a batch of strings
testBatchMessages();
+ // send a bath of strings orderly
+ testSendBatchMessageOrderly();
+
// Send transactional messages using rocketMQTemplate
testRocketMQTemplateTransaction();
@@ -181,6 +184,21 @@
System.out.printf("--- Batch messages send result :" + sr);
}
+ private void testSendBatchMessageOrderly() {
+ for (int q = 0; q < 4; q++) {
+ // send to 4 queues
+ List<Message> msgs = new ArrayList<Message>();
+ for (int i = 0; i < 10; i++) {
+ int msgIndex = q * 10 + i;
+ String msg = String.format("Hello RocketMQ Batch Msg#%d to queue: %d", msgIndex, q);
+ msgs.add(MessageBuilder.withPayload(msg).
+ setHeader(RocketMQHeaders.KEYS, "KEY_" + msgIndex).build());
+ }
+ SendResult sr = rocketMQTemplate.syncSendOrderly(springTopic, msgs, q + "", 60000);
+ System.out.println("--- Batch messages orderly to queue :" + sr.getMessageQueue().getQueueId() + " send result :" + sr);
+ }
+ }
+
private void testRocketMQTemplateTransaction() throws MessagingException {
String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};
for (int i = 0; i < 10; i++) {
diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/core/RocketMQTemplate.java b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/core/RocketMQTemplate.java
index a18e781..dd6a4d8 100644
--- a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/core/RocketMQTemplate.java
+++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/core/RocketMQTemplate.java
@@ -17,6 +17,7 @@
package org.apache.rocketmq.spring.core;
+import org.apache.rocketmq.client.Validators;
import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
@@ -27,6 +28,8 @@
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.client.producer.TransactionSendResult;
import org.apache.rocketmq.client.producer.selector.SelectMessageQueueByHash;
+import org.apache.rocketmq.common.message.MessageBatch;
+import org.apache.rocketmq.common.message.MessageClientIDSetter;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.spring.support.RocketMQMessageConverter;
import org.apache.rocketmq.spring.support.RocketMQUtil;
@@ -656,6 +659,53 @@
}
/**
+ * syncSend batch messages orderly.
+ *
+ * @param destination formats: `topicName:tags`
+ * @param messages Collection of {@link org.springframework.messaging.Message}
+ * @param hashKey use this key to select queue. for example: orderId, productId ...
+ * @return {@link SendResult}
+ */
+ public <T extends Message> SendResult syncSendOrderly(String destination, Collection<T> messages, String hashKey) {
+ return syncSendOrderly(destination, messages, hashKey, producer.getSendMsgTimeout());
+ }
+
+ /**
+ * Same to {@link #syncSendOrderly(String, Collection, String)} with send timeout specified in addition.
+ *
+ * @param destination formats: `topicName:tags`
+ * @param messages Collection of {@link org.springframework.messaging.Message}
+ * @param hashKey use this key to select queue. for example: orderId, productId ...
+ * @param timeout send timeout with millis
+ * @return {@link SendResult}
+ */
+ public <T extends Message> SendResult syncSendOrderly(String destination, Collection<T> messages, String hashKey, long timeout) {
+ if (Objects.isNull(messages) || messages.isEmpty()) {
+ log.error("syncSendOrderly failed. destination:{}, message is null ", destination);
+ throw new IllegalArgumentException("`messages` can not be empty");
+ }
+ try {
+ long now = System.currentTimeMillis();
+ Collection<org.apache.rocketmq.common.message.Message> rmqMsgs = new ArrayList<>();
+ for (T message : messages) {
+ if (Objects.isNull(message)) {
+ continue;
+ }
+ rmqMsgs.add(this.createRocketMqMessage(destination, message));
+ }
+ MessageBatch messageBatch = batch(rmqMsgs);
+ SendResult sendResult = producer.send(messageBatch, this.messageQueueSelector, hashKey, timeout);
+ long costTime = System.currentTimeMillis() - now;
+ if (log.isDebugEnabled()) {
+ log.debug("send message cost: {} ms, msgId:{}", costTime, sendResult.getMsgId());
+ }
+ return sendResult;
+ } catch (Exception e) {
+ throw new MessagingException(e.getMessage(), e);
+ }
+ }
+
+ /**
* Same to {@link #asyncSend(String, Message, SendCallback)} with send timeout and delay level specified in
* addition.
*
@@ -990,6 +1040,23 @@
return Object.class;
}
+ private MessageBatch batch(Collection<org.apache.rocketmq.common.message.Message> msgs) throws MQClientException {
+ MessageBatch msgBatch;
+ try {
+ msgBatch = MessageBatch.generateFromList(msgs);
+ for (org.apache.rocketmq.common.message.Message message : msgBatch) {
+ Validators.checkMessage(message, producer);
+ MessageClientIDSetter.setUniqID(message);
+ message.setTopic(producer.withNamespace(message.getTopic()));
+ }
+ msgBatch.setBody(msgBatch.encode());
+ } catch (Exception e) {
+ throw new MQClientException("Failed to initiate the MessageBatch", e);
+ }
+ msgBatch.setTopic(producer.withNamespace(msgBatch.getTopic()));
+ return msgBatch;
+ }
+
/**
* receive message in pull mode.
*