[ISSUE #81]Fix RocketMQTemplate.syncSend collection type method signature (#150)
* fix RocketMQTemplate.syncSend collection type method signature
* add a custom batch message send test code;
* change batch message send unit test code
diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/core/RocketMQTemplate.java b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/core/RocketMQTemplate.java
index e70d304..528615d 100644
--- a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/core/RocketMQTemplate.java
+++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/core/RocketMQTemplate.java
@@ -134,7 +134,7 @@
* @param timeout send timeout with millis
* @return {@link SendResult}
*/
- public SendResult syncSend(String destination, Collection<Message<?>> messages, long timeout) {
+ public <T extends Message> SendResult syncSend(String destination, Collection<T> messages, long timeout) {
if (Objects.isNull(messages) || messages.size() == 0) {
log.error("syncSend with batch failed. destination:{}, messages is empty ", destination);
throw new IllegalArgumentException("`messages` can not be empty");
@@ -144,7 +144,7 @@
long now = System.currentTimeMillis();
Collection<org.apache.rocketmq.common.message.Message> rmqMsgs = new ArrayList<>();
org.apache.rocketmq.common.message.Message rocketMsg;
- for (Message<?> msg:messages) {
+ for (Message msg:messages) {
if (Objects.isNull(msg) || Objects.isNull(msg.getPayload())) {
log.warn("Found a message empty in the batch, skip it");
continue;
diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/RocketMQUtil.java b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/RocketMQUtil.java
index d10cb1b..f4deb45 100644
--- a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/RocketMQUtil.java
+++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/RocketMQUtil.java
@@ -129,7 +129,7 @@
public static org.apache.rocketmq.common.message.Message convertToRocketMessage(
ObjectMapper objectMapper, String charset,
- String destination, org.springframework.messaging.Message<?> message) {
+ String destination, org.springframework.messaging.Message message) {
Object payloadObj = message.getPayload();
byte[] payloads;
diff --git a/rocketmq-spring-boot/src/test/java/org/apache/rocketmq/spring/autoconfigure/RocketMQAutoConfigurationTest.java b/rocketmq-spring-boot/src/test/java/org/apache/rocketmq/spring/autoconfigure/RocketMQAutoConfigurationTest.java
index 9fa0fa4..8f2a5ab 100644
--- a/rocketmq-spring-boot/src/test/java/org/apache/rocketmq/spring/autoconfigure/RocketMQAutoConfigurationTest.java
+++ b/rocketmq-spring-boot/src/test/java/org/apache/rocketmq/spring/autoconfigure/RocketMQAutoConfigurationTest.java
@@ -19,6 +19,8 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
+import org.apache.rocketmq.client.producer.SendResult;
+import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.spring.annotation.ExtRocketMQTemplateConfiguration;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;
@@ -38,6 +40,11 @@
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.Message;
+import org.springframework.messaging.MessagingException;
+import org.springframework.messaging.support.GenericMessage;
+
+import java.util.ArrayList;
+import java.util.List;
import static org.assertj.core.api.Assertions.assertThat;
@@ -177,6 +184,27 @@
}
@Test
+ public void testBatchSendMessage() {
+ runner.withPropertyValues("rocketmq.name-server=127.0.0.1:9876",
+ "rocketmq.producer.group=spring_rocketmq").
+ run((context) -> {
+ RocketMQTemplate rocketMQTemplate = context.getBean(RocketMQTemplate.class);
+ List<GenericMessage<String>> batchMessages = new ArrayList<GenericMessage<String>>();
+
+ String errorMsg = null;
+ try {
+ SendResult customSendResult = rocketMQTemplate.syncSend("test", batchMessages, 60000);
+ } catch (IllegalArgumentException e) {
+ // it will be throw IllegalArgumentException: `messages` can not be empty
+ errorMsg = e.getMessage();
+ }
+
+ // that means the rocketMQTemplate.syncSend is chosen the correct type method
+ Assert.assertEquals("`messages` can not be empty", errorMsg);
+ });
+
+ }
+
public void testPlaceholdersListenerContainer() {
runner.withPropertyValues("rocketmq.name-server=127.0.0.1:9876",
"demo.placeholders.consumer.group = abc3",
@@ -196,7 +224,6 @@
});
}
-
@Configuration
static class TestConfig {