Merge branch 'master' into feature/ISSUE_#219_support_configable_tls
diff --git a/rocketmq-spring-boot-parent/pom.xml b/rocketmq-spring-boot-parent/pom.xml
index 12e0807..6f1c4ae 100644
--- a/rocketmq-spring-boot-parent/pom.xml
+++ b/rocketmq-spring-boot-parent/pom.xml
@@ -16,7 +16,8 @@
~ limitations under the License.
-->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
@@ -40,7 +41,7 @@
<rocketmq.spring.boot.version>2.2.1-SNAPSHOT</rocketmq.spring.boot.version>
- <rocketmq.version>4.8.0</rocketmq.version>
+ <rocketmq.version>4.9.1</rocketmq.version>
<slf4j.version>1.7.25</slf4j.version>
<jackson.version>2.11.1</jackson.version>
<fastjson.version>1.2.72</fastjson.version>
diff --git a/rocketmq-spring-boot-samples/rocketmq-consume-demo/src/main/java/org/apache/rocketmq/samples/springboot/consumer/StringConsumerWithReplyString.java b/rocketmq-spring-boot-samples/rocketmq-consume-demo/src/main/java/org/apache/rocketmq/samples/springboot/consumer/StringConsumerWithReplyString.java
index a57f20d..51f4e29 100644
--- a/rocketmq-spring-boot-samples/rocketmq-consume-demo/src/main/java/org/apache/rocketmq/samples/springboot/consumer/StringConsumerWithReplyString.java
+++ b/rocketmq-spring-boot-samples/rocketmq-consume-demo/src/main/java/org/apache/rocketmq/samples/springboot/consumer/StringConsumerWithReplyString.java
@@ -25,7 +25,8 @@
* The consumer that replying String
*/
@Service
-@RocketMQMessageListener(topic = "${demo.rocketmq.stringRequestTopic}", consumerGroup = "${demo.rocketmq.stringRequestConsumer}", selectorExpression = "${demo.rocketmq.tag}")
+@RocketMQMessageListener(topic = "${demo.rocketmq.stringRequestTopic}", consumerGroup = "${demo.rocketmq.stringRequestConsumer}",
+ selectorExpression = "${demo.rocketmq.tag}", replyTimeout = 10000)
public class StringConsumerWithReplyString implements RocketMQReplyListener<String, String> {
@Override
diff --git a/rocketmq-spring-boot-samples/rocketmq-produce-demo/src/main/java/org/apache/rocketmq/samples/springboot/ProducerApplication.java b/rocketmq-spring-boot-samples/rocketmq-produce-demo/src/main/java/org/apache/rocketmq/samples/springboot/ProducerApplication.java
index f8140d4..3acd8a6 100644
--- a/rocketmq-spring-boot-samples/rocketmq-produce-demo/src/main/java/org/apache/rocketmq/samples/springboot/ProducerApplication.java
+++ b/rocketmq-spring-boot-samples/rocketmq-produce-demo/src/main/java/org/apache/rocketmq/samples/springboot/ProducerApplication.java
@@ -123,6 +123,9 @@
// Send a batch of strings
testBatchMessages();
+ // send a bath of strings orderly
+ testSendBatchMessageOrderly();
+
// Send transactional messages using rocketMQTemplate
testRocketMQTemplateTransaction();
@@ -181,6 +184,21 @@
System.out.printf("--- Batch messages send result :" + sr);
}
+ private void testSendBatchMessageOrderly() {
+ for (int q = 0; q < 4; q++) {
+ // send to 4 queues
+ List<Message> msgs = new ArrayList<Message>();
+ for (int i = 0; i < 10; i++) {
+ int msgIndex = q * 10 + i;
+ String msg = String.format("Hello RocketMQ Batch Msg#%d to queue: %d", msgIndex, q);
+ msgs.add(MessageBuilder.withPayload(msg).
+ setHeader(RocketMQHeaders.KEYS, "KEY_" + msgIndex).build());
+ }
+ SendResult sr = rocketMQTemplate.syncSendOrderly(springTopic, msgs, q + "", 60000);
+ System.out.println("--- Batch messages orderly to queue :" + sr.getMessageQueue().getQueueId() + " send result :" + sr);
+ }
+ }
+
private void testRocketMQTemplateTransaction() throws MessagingException {
String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};
for (int i = 0; i < 10; i++) {
diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/annotation/ExtRocketMQConsumerConfiguration.java b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/annotation/ExtRocketMQConsumerConfiguration.java
index cc78ff9..84d641a 100644
--- a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/annotation/ExtRocketMQConsumerConfiguration.java
+++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/annotation/ExtRocketMQConsumerConfiguration.java
@@ -37,6 +37,7 @@
String ACCESS_CHANNEL_PLACEHOLDER = "${rocketmq.access-channel:}";
String ACCESS_KEY_PLACEHOLDER = "${rocketmq.consumer.access-key:}";
String SECRET_KEY_PLACEHOLDER = "${rocketmq.consumer.secret-key:}";
+ String TRACE_TOPIC_PLACEHOLDER = "${rocketmq.consumer.customized-trace-topic:}";
/**
* The component name of the Producer configuration.
@@ -94,8 +95,19 @@
* Maximum number of messages pulled each time.
*/
int pullBatchSize() default 10;
+
/**
* The property of "tlsEnable" default false.
*/
String tlsEnable() default "false";
-}
+
+ /**
+ * Switch flag instance for message trace.
+ */
+ boolean enableMsgTrace() default false;
+
+ /**
+ * The name value of message trace topic.If you don't config,you can use the default trace topic name.
+ */
+ String customizedTraceTopic() default TRACE_TOPIC_PLACEHOLDER;
+}
\ No newline at end of file
diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/annotation/ExtRocketMQTemplateConfiguration.java b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/annotation/ExtRocketMQTemplateConfiguration.java
index ccf40ef..8e80147 100644
--- a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/annotation/ExtRocketMQTemplateConfiguration.java
+++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/annotation/ExtRocketMQTemplateConfiguration.java
@@ -80,7 +80,7 @@
/**
* Switch flag instance for message trace.
*/
- boolean enableMsgTrace() default true;
+ boolean enableMsgTrace() default false;
/**
* The name value of message trace topic.If you don't config,you can use the default trace topic name.
*/
diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/annotation/RocketMQMessageListener.java b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/annotation/RocketMQMessageListener.java
index f159a63..e80f328 100644
--- a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/annotation/RocketMQMessageListener.java
+++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/annotation/RocketMQMessageListener.java
@@ -76,11 +76,24 @@
int consumeThreadMax() default 64;
/**
+ * Max re-consume times.
+ *
+ * In concurrently mode, -1 means 16;
+ * In orderly mode, -1 means Integer.MAX_VALUE.
+ */
+ int maxReconsumeTimes() default -1;
+
+ /**
* Maximum amount of time in minutes a message may block the consuming thread.
*/
long consumeTimeout() default 15L;
/**
+ * Timeout for sending reply messages.
+ */
+ int replyTimeout() default 3000;
+
+ /**
* The property of "access-key".
*/
String accessKey() default ACCESS_KEY_PLACEHOLDER;
@@ -93,7 +106,7 @@
/**
* Switch flag instance for message trace.
*/
- boolean enableMsgTrace() default true;
+ boolean enableMsgTrace() default false;
/**
* The name value of message trace topic.If you don't config,you can use the default trace topic name.
diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/ExtConsumerResetConfiguration.java b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/ExtConsumerResetConfiguration.java
index a42e0d3..f2ece8a 100644
--- a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/ExtConsumerResetConfiguration.java
+++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/ExtConsumerResetConfiguration.java
@@ -122,15 +122,16 @@
String accessChannel = resolvePlaceholders(annotation.accessChannel(), rocketMQProperties.getAccessChannel());
MessageModel messageModel = annotation.messageModel();
SelectorType selectorType = annotation.selectorType();
- String selectorExpression = annotation.selectorExpression();
+ String selectorExpression = resolvePlaceholders(annotation.selectorExpression(), consumerConfig.getSelectorExpression());
String ak = resolvePlaceholders(annotation.accessKey(), consumerConfig.getAccessKey());
String sk = resolvePlaceholders(annotation.secretKey(), consumerConfig.getSecretKey());
int pullBatchSize = annotation.pullBatchSize();
//if String is not is equal "true" TLS mode will represent the as default value false
boolean useTLS = new Boolean(environment.resolvePlaceholders(annotation.tlsEnable()));
-
DefaultLitePullConsumer litePullConsumer = RocketMQUtil.createDefaultLitePullConsumer(nameServer, accessChannel,
- groupName, topicName, messageModel, selectorType, selectorExpression, ak, sk, pullBatchSize,useTLS);
+ groupName, topicName, messageModel, selectorType, selectorExpression, ak, sk, pullBatchSize, useTLS);
+ litePullConsumer.setEnableMsgTrace(annotation.enableMsgTrace());
+ litePullConsumer.setCustomizedTraceTopic(resolvePlaceholders(annotation.customizedTraceTopic(), consumerConfig.getCustomizedTraceTopic()));
return litePullConsumer;
}
diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/RocketMQAutoConfiguration.java b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/RocketMQAutoConfiguration.java
index b7f6c7c..ea00432 100644
--- a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/RocketMQAutoConfiguration.java
+++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/RocketMQAutoConfiguration.java
@@ -144,6 +144,8 @@
DefaultLitePullConsumer litePullConsumer = RocketMQUtil.createDefaultLitePullConsumer(nameServer, accessChannel,
groupName, topicName, messageModel, selectorType, selectorExpression, ak, sk, pullBatchSize, useTLS);
+ litePullConsumer.setEnableMsgTrace(consumerConfig.isEnableMsgTrace());
+ litePullConsumer.setCustomizedTraceTopic(consumerConfig.getCustomizedTraceTopic());
return litePullConsumer;
}
@@ -176,4 +178,4 @@
static class DefaultLitePullConsumerExistsCondition {
}
}
-}
+}
\ No newline at end of file
diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/RocketMQProperties.java b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/RocketMQProperties.java
index 2fe430b..d6c8a90 100644
--- a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/RocketMQProperties.java
+++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/RocketMQProperties.java
@@ -125,7 +125,7 @@
/**
* Switch flag instance for message trace.
*/
- private boolean enableMsgTrace = true;
+ private boolean enableMsgTrace = false;
/**
* The name value of message trace topic.If you don't config,you can use the default trace topic name.
@@ -285,6 +285,16 @@
private int pullBatchSize = 10;
/**
+ * Switch flag instance for message trace.
+ */
+ private boolean enableMsgTrace = false;
+
+ /**
+ * The name value of message trace topic.If you don't config,you can use the default trace topic name.
+ */
+ private String customizedTraceTopic = TopicValidator.RMQ_SYS_TRACE_TOPIC;
+
+ /**
* listener configuration container
* the pattern is like this:
* group1.topic1 = false
@@ -370,6 +380,22 @@
this.listeners = listeners;
}
+ public boolean isEnableMsgTrace() {
+ return enableMsgTrace;
+ }
+
+ public void setEnableMsgTrace(boolean enableMsgTrace) {
+ this.enableMsgTrace = enableMsgTrace;
+ }
+
+ public String getCustomizedTraceTopic() {
+ return customizedTraceTopic;
+ }
+
+ public void setCustomizedTraceTopic(String customizedTraceTopic) {
+ this.customizedTraceTopic = customizedTraceTopic;
+ }
+
public boolean isTlsEnable() {
return tlsEnable;
}
@@ -379,4 +405,4 @@
}
}
-}
+}
\ No newline at end of file
diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/core/RocketMQTemplate.java b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/core/RocketMQTemplate.java
index a18e781..8b1af2a 100644
--- a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/core/RocketMQTemplate.java
+++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/core/RocketMQTemplate.java
@@ -17,6 +17,7 @@
package org.apache.rocketmq.spring.core;
+import org.apache.rocketmq.client.Validators;
import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
@@ -27,6 +28,8 @@
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.client.producer.TransactionSendResult;
import org.apache.rocketmq.client.producer.selector.SelectMessageQueueByHash;
+import org.apache.rocketmq.common.message.MessageBatch;
+import org.apache.rocketmq.common.message.MessageClientIDSetter;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.spring.support.RocketMQMessageConverter;
import org.apache.rocketmq.spring.support.RocketMQUtil;
@@ -656,6 +659,53 @@
}
/**
+ * syncSend batch messages orderly.
+ *
+ * @param destination formats: `topicName:tags`
+ * @param messages Collection of {@link org.springframework.messaging.Message}
+ * @param hashKey use this key to select queue. for example: orderId, productId ...
+ * @return {@link SendResult}
+ */
+ public <T extends Message> SendResult syncSendOrderly(String destination, Collection<T> messages, String hashKey) {
+ return syncSendOrderly(destination, messages, hashKey, producer.getSendMsgTimeout());
+ }
+
+ /**
+ * Same to {@link #syncSendOrderly(String, Collection, String)} with send timeout specified in addition.
+ *
+ * @param destination formats: `topicName:tags`
+ * @param messages Collection of {@link org.springframework.messaging.Message}
+ * @param hashKey use this key to select queue. for example: orderId, productId ...
+ * @param timeout send timeout with millis
+ * @return {@link SendResult}
+ */
+ public <T extends Message> SendResult syncSendOrderly(String destination, Collection<T> messages, String hashKey, long timeout) {
+ if (Objects.isNull(messages) || messages.isEmpty()) {
+ log.error("syncSendOrderly failed. destination:{}, message is null ", destination);
+ throw new IllegalArgumentException("`messages` can not be empty");
+ }
+ try {
+ long now = System.currentTimeMillis();
+ Collection<org.apache.rocketmq.common.message.Message> rmqMsgs = new ArrayList<>();
+ for (T message : messages) {
+ if (Objects.isNull(message)) {
+ continue;
+ }
+ rmqMsgs.add(this.createRocketMqMessage(destination, message));
+ }
+ MessageBatch messageBatch = batch(rmqMsgs);
+ SendResult sendResult = producer.send(messageBatch, this.messageQueueSelector, hashKey, timeout);
+ long costTime = System.currentTimeMillis() - now;
+ if (log.isDebugEnabled()) {
+ log.debug("send message cost: {} ms, msgId:{}", costTime, sendResult.getMsgId());
+ }
+ return sendResult;
+ } catch (Exception e) {
+ throw new MessagingException(e.getMessage(), e);
+ }
+ }
+
+ /**
* Same to {@link #asyncSend(String, Message, SendCallback)} with send timeout and delay level specified in
* addition.
*
@@ -738,6 +788,47 @@
}
/**
+ * asyncSend batch messages
+ *
+ * @param destination formats: `topicName:tags`
+ * @param messages Collection of {@link org.springframework.messaging.Message}
+ * @param sendCallback {@link SendCallback}
+ */
+ public <T extends Message> void asyncSend(String destination, Collection<T> messages, SendCallback sendCallback) {
+ asyncSend(destination, messages, sendCallback, producer.getSendMsgTimeout());
+ }
+
+ /**
+ * asyncSend batch messages in a given timeout.
+ *
+ * @param destination formats: `topicName:tags`
+ * @param messages Collection of {@link org.springframework.messaging.Message}
+ * @param sendCallback {@link SendCallback}
+ * @param timeout send timeout with millis
+ */
+ public <T extends Message> void asyncSend(String destination, Collection<T> messages, SendCallback sendCallback, long timeout) {
+ if (Objects.isNull(messages) || messages.size() == 0) {
+ log.error("asyncSend with batch failed. destination:{}, messages is empty ", destination);
+ throw new IllegalArgumentException("`messages` can not be empty");
+ }
+
+ try {
+ Collection<org.apache.rocketmq.common.message.Message> rmqMsgs = new ArrayList<>();
+ for (Message msg : messages) {
+ if (Objects.isNull(msg) || Objects.isNull(msg.getPayload())) {
+ log.warn("Found a message empty in the batch, skip it");
+ continue;
+ }
+ rmqMsgs.add(this.createRocketMqMessage(destination, msg));
+ }
+ producer.send(rmqMsgs, sendCallback, timeout);
+ } catch (Exception e) {
+ log.error("asyncSend with batch failed. destination:{}, messages.size:{} ", destination, messages.size());
+ throw new MessagingException(e.getMessage(), e);
+ }
+ }
+
+ /**
* Same to {@link #asyncSendOrderly(String, Message, String, SendCallback)} with send timeout specified in
* addition.
*
@@ -990,6 +1081,23 @@
return Object.class;
}
+ private MessageBatch batch(Collection<org.apache.rocketmq.common.message.Message> msgs) throws MQClientException {
+ MessageBatch msgBatch;
+ try {
+ msgBatch = MessageBatch.generateFromList(msgs);
+ for (org.apache.rocketmq.common.message.Message message : msgBatch) {
+ Validators.checkMessage(message, producer);
+ MessageClientIDSetter.setUniqID(message);
+ message.setTopic(producer.withNamespace(message.getTopic()));
+ }
+ msgBatch.setBody(msgBatch.encode());
+ } catch (Exception e) {
+ throw new MQClientException("Failed to initiate the MessageBatch", e);
+ }
+ msgBatch.setTopic(producer.withNamespace(msgBatch.getTopic()));
+ return msgBatch;
+ }
+
/**
* receive message in pull mode.
*
diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainer.java b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainer.java
index 2693a07..2a42fe4 100644
--- a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainer.java
+++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainer.java
@@ -34,6 +34,7 @@
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely;
import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
@@ -120,6 +121,8 @@
private String selectorExpression;
private MessageModel messageModel;
private long consumeTimeout;
+ private int maxReconsumeTimes;
+ private int replyTimeout;
private String tlsEnable;
public long getSuspendCurrentQueueTimeMillis() {
@@ -220,6 +223,8 @@
this.selectorType = anno.selectorType();
this.selectorExpression = anno.selectorExpression();
this.consumeTimeout = anno.consumeTimeout();
+ this.maxReconsumeTimes = anno.maxReconsumeTimes();
+ this.replyTimeout = anno.replyTimeout();
this.tlsEnable = anno.tlsEnable();
}
@@ -338,15 +343,15 @@
@Override
public String toString() {
return "DefaultRocketMQListenerContainer{" +
- "consumerGroup='" + consumerGroup + '\'' +
- ", nameServer='" + nameServer + '\'' +
- ", topic='" + topic + '\'' +
- ", consumeMode=" + consumeMode +
- ", selectorType=" + selectorType +
- ", selectorExpression='" + selectorExpression + '\'' +
+ "consumerGroup='" + consumerGroup + '\'' +
+ ", nameServer='" + nameServer + '\'' +
+ ", topic='" + topic + '\'' +
+ ", consumeMode=" + consumeMode +
+ ", selectorType=" + selectorType +
+ ", selectorExpression='" + selectorExpression + '\'' +
", messageModel=" + messageModel + '\'' +
", tlsEnable=" + tlsEnable +
- '}';
+ '}';
}
public void setName(String name) {
@@ -408,7 +413,9 @@
Message<?> message = MessageBuilder.withPayload(replyContent).build();
org.apache.rocketmq.common.message.Message replyMessage = MessageUtil.createReplyMessage(messageExt, convertToBytes(message));
- consumer.getDefaultMQPushConsumerImpl().getmQClientFactory().getDefaultMQProducer().send(replyMessage, new SendCallback() {
+ DefaultMQProducer producer = consumer.getDefaultMQPushConsumerImpl().getmQClientFactory().getDefaultMQProducer();
+ producer.setSendMsgTimeout(replyTimeout);
+ producer.send(replyMessage, new SendCallback() {
@Override public void onSuccess(SendResult sendResult) {
if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
log.error("Consumer replies message failed. SendStatus: {}", sendResult.getSendStatus());
@@ -468,7 +475,7 @@
@SuppressWarnings("unchecked")
private Object doConvertMessage(MessageExt messageExt) {
- if (Objects.equals(messageType, MessageExt.class)) {
+ if (Objects.equals(messageType, MessageExt.class) || Objects.equals(messageType, org.apache.rocketmq.common.message.Message.class)) {
return messageExt;
} else {
String str = new String(messageExt.getBody(), Charset.forName(charset));
@@ -589,7 +596,7 @@
consumer.setConsumeThreadMin(consumeThreadMax);
}
consumer.setConsumeTimeout(consumeTimeout);
-
+ consumer.setMaxReconsumeTimes(maxReconsumeTimes);
switch (messageModel) {
case BROADCASTING:
consumer.setMessageModel(org.apache.rocketmq.common.protocol.heartbeat.MessageModel.BROADCASTING);
@@ -634,4 +641,4 @@
}
-}
+}
\ No newline at end of file
diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/RocketMQUtil.java b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/RocketMQUtil.java
index 2030fae..e8fbc94 100644
--- a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/RocketMQUtil.java
+++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/RocketMQUtil.java
@@ -287,7 +287,8 @@
char separator = '@';
StringBuilder instanceName = new StringBuilder();
instanceName.append(identify)
- .append(separator).append(UtilAll.getPid());
+ .append(separator).append(UtilAll.getPid())
+ .append(separator).append(System.nanoTime());
return instanceName.toString();
}
diff --git a/rocketmq-spring-boot/src/test/java/org/apache/rocketmq/spring/core/RocketMQTemplateTest.java b/rocketmq-spring-boot/src/test/java/org/apache/rocketmq/spring/core/RocketMQTemplateTest.java
index 2103da2..4948c81 100644
--- a/rocketmq-spring-boot/src/test/java/org/apache/rocketmq/spring/core/RocketMQTemplateTest.java
+++ b/rocketmq-spring-boot/src/test/java/org/apache/rocketmq/spring/core/RocketMQTemplateTest.java
@@ -16,6 +16,8 @@
*/
package org.apache.rocketmq.spring.core;
+import java.util.ArrayList;
+import java.util.List;
import javax.annotation.Resource;
import org.apache.rocketmq.client.AccessChannel;
import org.apache.rocketmq.client.producer.SendCallback;
@@ -94,6 +96,26 @@
assertThat(e).hasMessageContaining("org.apache.rocketmq.remoting.exception.RemotingConnectException: connect to [127.0.0.1:9876] failed");
}
}
+ @Test
+ public void testAsyncBatchSendMessage() {
+ List<Message> messages = new ArrayList<>();
+ for (int i = 0; i < 3; i++) {
+ messages.add(MessageBuilder.withPayload("payload" + i).build());
+ }
+ try {
+ rocketMQTemplate.asyncSend(topic, messages, new SendCallback() {
+ @Override public void onSuccess(SendResult sendResult) {
+
+ }
+
+ @Override public void onException(Throwable e) {
+
+ }
+ });
+ } catch (MessagingException e) {
+ assertThat(e).hasMessageContaining("org.apache.rocketmq.remoting.exception.RemotingConnectException: connect to [127.0.0.1:9876] failed");
+ }
+ }
@Test
public void testReceiveMessage() {
diff --git a/rocketmq-spring-boot/src/test/java/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainerTest.java b/rocketmq-spring-boot/src/test/java/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainerTest.java
index 7133b98..1304b9f 100644
--- a/rocketmq-spring-boot/src/test/java/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainerTest.java
+++ b/rocketmq-spring-boot/src/test/java/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainerTest.java
@@ -19,13 +19,20 @@
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.lang.reflect.ParameterizedType;
+
+import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
+import org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl;
+import org.apache.rocketmq.client.impl.factory.MQClientInstance;
+import org.apache.rocketmq.client.producer.DefaultMQProducer;
+import org.apache.rocketmq.client.producer.SendCallback;
+import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.common.message.MessageAccessor;
+import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.common.message.MessageExt;
-import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.apache.rocketmq.spring.core.RocketMQReplyListener;
import org.junit.Test;
import org.springframework.core.MethodParameter;
-import org.springframework.messaging.Message;
import org.springframework.messaging.converter.CompositeMessageConverter;
import org.springframework.messaging.converter.MappingJackson2MessageConverter;
import org.springframework.messaging.converter.StringMessageConverter;
@@ -33,9 +40,12 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
-import org.springframework.messaging.support.MessageBuilder;
import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
public class DefaultRocketMQListenerContainerTest {
@Test
@@ -52,6 +62,15 @@
Class result = (Class) getMessageType.invoke(listenerContainer);
assertThat(result.getName().equals(String.class.getName()));
+ //support message
+ listenerContainer.setRocketMQListener(new RocketMQListener<Message>() {
+ @Override
+ public void onMessage(Message message) {
+ }
+ });
+ result = (Class) getMessageType.invoke(listenerContainer);
+ assertThat(result.getName().equals(Message.class.getName()));
+
listenerContainer.setRocketMQListener(new RocketMQListener<MessageExt>() {
@Override
public void onMessage(MessageExt message) {
@@ -60,6 +79,7 @@
result = (Class) getMessageType.invoke(listenerContainer);
assertThat(result.getName().equals(MessageExt.class.getName()));
+
listenerContainer.setRocketMQReplyListener(new RocketMQReplyListener<MessageExt, String>() {
@Override
public String onMessage(MessageExt message) {
@@ -112,6 +132,26 @@
MessageExt result2 = (MessageExt) doConvertMessage.invoke(listenerContainer, messageExt);
assertThat(result2).isEqualTo(messageExt);
+ //support message
+ listenerContainer.setRocketMQListener(new RocketMQListener<Message>() {
+ @Override
+ public void onMessage(Message message) {
+ }
+ });
+ Field messageType3 = DefaultRocketMQListenerContainer.class.getDeclaredField("messageType");
+ messageType3.setAccessible(true);
+ messageType3.set(listenerContainer, Message.class);
+ Message message = new MessageExt(0, System.currentTimeMillis(), null, System.currentTimeMillis(), null, null);
+ message.setBody("hello".getBytes());
+ Message result3 = (Message) doConvertMessage.invoke(listenerContainer, message);
+ assertThat(result3).isEqualTo(message);
+
+ listenerContainer.setRocketMQListener(new RocketMQListener<User>() {
+ @Override
+ public void onMessage(User message) {
+ }
+ });
+
listenerContainer.setRocketMQListener(new RocketMQListener<User>() {
@Override
public void onMessage(User message) {
@@ -153,6 +193,45 @@
assertThat(methodParameter.getParameterType() == ArrayList.class);
}
+ @Test
+ public void testHandleMessage() throws Exception {
+ DefaultRocketMQListenerContainer listenerContainer = new DefaultRocketMQListenerContainer();
+ Method handleMessage = DefaultRocketMQListenerContainer.class.getDeclaredMethod("handleMessage", MessageExt.class);
+ handleMessage.setAccessible(true);
+ listenerContainer.setRocketMQListener(new RocketMQListener<String>() {
+ @Override
+ public void onMessage(String message) {
+ }
+ });
+ Field messageType = DefaultRocketMQListenerContainer.class.getDeclaredField("messageType");
+ messageType.setAccessible(true);
+ messageType.set(listenerContainer, String.class);
+ MessageExt messageExt = new MessageExt(0, System.currentTimeMillis(), null, System.currentTimeMillis(), null, null);
+ MessageAccessor.putProperty(messageExt, MessageConst.PROPERTY_CLUSTER, "defaultCluster");
+ messageExt.setBody("hello".getBytes());
+ handleMessage.invoke(listenerContainer, messageExt);
+
+ // reply message
+ listenerContainer.setRocketMQListener(null);
+ DefaultMQPushConsumer consumer = mock(DefaultMQPushConsumer.class);
+ DefaultMQPushConsumerImpl pushConsumer = mock(DefaultMQPushConsumerImpl.class);
+ MQClientInstance mqClientInstance = mock(MQClientInstance.class);
+ DefaultMQProducer producer = mock(DefaultMQProducer.class);
+ when(consumer.getDefaultMQPushConsumerImpl()).thenReturn(pushConsumer);
+ when(pushConsumer.getmQClientFactory()).thenReturn(mqClientInstance);
+ when(mqClientInstance.getDefaultMQProducer()).thenReturn(producer);
+ listenerContainer.setConsumer(consumer);
+ listenerContainer.setMessageConverter(new CompositeMessageConverter(Arrays.asList(new StringMessageConverter(), new MappingJackson2MessageConverter())));
+ doNothing().when(producer).send(any(MessageExt.class), any(SendCallback.class));
+ listenerContainer.setRocketMQReplyListener(new RocketMQReplyListener<String, String>() {
+ @Override
+ public String onMessage(String message) {
+ return "test";
+ }
+ });
+ handleMessage.invoke(listenerContainer, messageExt);
+ }
+
class User {
private String userName;
private int userAge;
diff --git a/rocketmq-spring-boot/src/test/java/org/apache/rocketmq/spring/support/RocketMQUtilTest.java b/rocketmq-spring-boot/src/test/java/org/apache/rocketmq/spring/support/RocketMQUtilTest.java
index e558483..30b8026 100644
--- a/rocketmq-spring-boot/src/test/java/org/apache/rocketmq/spring/support/RocketMQUtilTest.java
+++ b/rocketmq-spring-boot/src/test/java/org/apache/rocketmq/spring/support/RocketMQUtilTest.java
@@ -135,6 +135,6 @@
public void testGetInstanceName() {
String nameServer = "127.0.0.1:9876";
String expected = "127.0.0.1:9876@";
- assertEquals(expected + UtilAll.getPid(), RocketMQUtil.getInstanceName(nameServer));
+ assertTrue(RocketMQUtil.getInstanceName(nameServer).startsWith(expected + UtilAll.getPid()));
}
}
\ No newline at end of file