chore(test):upgrade rocketmq version to 4.6.0 and add unit tests
diff --git a/rocketmq-spring-boot-parent/pom.xml b/rocketmq-spring-boot-parent/pom.xml
index 8352241..ca490b0 100644
--- a/rocketmq-spring-boot-parent/pom.xml
+++ b/rocketmq-spring-boot-parent/pom.xml
@@ -40,7 +40,7 @@
<rocketmq.spring.boot.version>2.0.5-SNAPSHOT</rocketmq.spring.boot.version>
- <rocketmq-version>4.5.2</rocketmq-version>
+ <rocketmq-version>4.6.0</rocketmq-version>
<slf4j.version>1.7.25</slf4j.version>
<jackson.version>2.9.7</jackson.version>
diff --git a/rocketmq-spring-boot/src/test/java/org/apache/rocketmq/spring/autoconfigure/RocketMQAutoConfigurationTest.java b/rocketmq-spring-boot/src/test/java/org/apache/rocketmq/spring/autoconfigure/RocketMQAutoConfigurationTest.java
index c2e9ad9..553183d 100644
--- a/rocketmq-spring-boot/src/test/java/org/apache/rocketmq/spring/autoconfigure/RocketMQAutoConfigurationTest.java
+++ b/rocketmq-spring-boot/src/test/java/org/apache/rocketmq/spring/autoconfigure/RocketMQAutoConfigurationTest.java
@@ -17,10 +17,10 @@
package org.apache.rocketmq.spring.autoconfigure;
-import com.fasterxml.jackson.databind.ObjectMapper;
+import java.util.ArrayList;
+import java.util.List;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
-import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.spring.annotation.ExtRocketMQTemplateConfiguration;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;
@@ -28,7 +28,6 @@
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
-import org.apache.rocketmq.spring.support.DefaultRocketMQListenerContainer;
import org.apache.rocketmq.spring.support.RocketMQMessageConverter;
import org.junit.Assert;
import org.junit.Test;
@@ -41,18 +40,13 @@
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.Message;
-import org.springframework.messaging.MessagingException;
import org.springframework.messaging.support.GenericMessage;
-import java.util.ArrayList;
-import java.util.List;
-
import static org.assertj.core.api.Assertions.assertThat;
public class RocketMQAutoConfigurationTest {
private ApplicationContextRunner runner = new ApplicationContextRunner()
- .withConfiguration(AutoConfigurations.of(RocketMQAutoConfiguration.class));
-
+ .withConfiguration(AutoConfigurations.of(RocketMQAutoConfiguration.class));
@Test(expected = NoSuchBeanDefinitionException.class)
public void testDefaultMQProducerNotCreatedByDefault() {
@@ -60,128 +54,133 @@
runner.run(context -> context.getBean(DefaultMQProducer.class));
}
-
@Test
public void testDefaultMQProducerWithRelaxPropertyName() {
runner.withPropertyValues("rocketmq.nameServer=127.0.0.1:9876",
- "rocketmq.producer.group=spring_rocketmq",
- "rocketmq.accessChannel=LOCAL").
- run((context) -> {
- assertThat(context).hasSingleBean(DefaultMQProducer.class);
- assertThat(context).hasSingleBean(RocketMQProperties.class);
- });
+ "rocketmq.producer.group=spring_rocketmq",
+ "rocketmq.accessChannel=LOCAL").
+ run((context) -> {
+ assertThat(context).hasSingleBean(DefaultMQProducer.class);
+ assertThat(context).hasSingleBean(RocketMQProperties.class);
+ });
}
@Test
public void testBadAccessChannelProperty() {
runner.withPropertyValues("rocketmq.nameServer=127.0.0.1:9876",
- "rocketmq.producer.group=spring_rocketmq",
- "rocketmq.accessChannel=LOCAL123").
- run((context) -> {
- //Should throw exception for bad accessChannel property
- assertThat(context).getFailure();
- });
+ "rocketmq.producer.group=spring_rocketmq",
+ "rocketmq.accessChannel=LOCAL123").
+ run((context) -> {
+ //Should throw exception for bad accessChannel property
+ assertThat(context).getFailure();
+ });
}
@Test
public void testDefaultMQProducer() {
runner.withPropertyValues("rocketmq.name-server=127.0.0.1:9876",
- "rocketmq.producer.group=spring_rocketmq").
- run((context) -> {
- assertThat(context).hasSingleBean(DefaultMQProducer.class);
- });
-
+ "rocketmq.producer.group=spring_rocketmq").
+ run((context) -> {
+ assertThat(context).hasSingleBean(DefaultMQProducer.class);
+ });
}
@Test
public void testExtRocketMQTemplate() {
runner.withPropertyValues("rocketmq.name-server=127.0.0.1:9876").
- withUserConfiguration(TestExtRocketMQTemplateConfig.class, CustomObjectMappersConfig.class).
- run(new ContextConsumer<AssertableApplicationContext>() {
- @Override
- public void accept(AssertableApplicationContext context) throws Throwable {
- Throwable th = context.getStartupFailure();
- System.out.printf("th==" + th + "\n");
- Assert.assertTrue(th instanceof BeanDefinitionValidationException);
- }
- });
+ withUserConfiguration(TestExtRocketMQTemplateConfig.class, CustomObjectMappersConfig.class).
+ run(new ContextConsumer<AssertableApplicationContext>() {
+ @Override
+ public void accept(AssertableApplicationContext context) throws Throwable {
+ Throwable th = context.getStartupFailure();
+ System.out.printf("th==" + th + "\n");
+ Assert.assertTrue(th instanceof BeanDefinitionValidationException);
+ }
+ });
runner.withPropertyValues("rocketmq.name-server=127.0.1.1:9876").
- withUserConfiguration(TestExtRocketMQTemplateConfig.class, CustomObjectMappersConfig.class).
- run((context) -> {
- // No producer on consume side
- assertThat(context).getBean("extRocketMQTemplate").hasFieldOrProperty("producer");
- // Auto-create consume container if existing Bean annotated with @RocketMQMessageListener
- });
+ withUserConfiguration(TestExtRocketMQTemplateConfig.class, CustomObjectMappersConfig.class).
+ run((context) -> {
+ // No producer on consume side
+ assertThat(context).getBean("extRocketMQTemplate").hasFieldOrProperty("producer");
+ // Auto-create consume container if existing Bean annotated with @RocketMQMessageListener
+ });
}
@Test
public void testConsumerListener() {
runner.withPropertyValues("rocketmq.name-server=127.0.0.1:9876",
- "rocketmq.producer.group=spring_rocketmq",
- "rocketmq.consumer.listeners.spring_rocketmq.FOO_TEST_TOPIC=false",
- "rocketmq.consumer.listeners.spring_rocketmq.FOO_TEST_TOPIC2=true").
- run((context) -> {
- RocketMQProperties rocketMQProperties = context.getBean(RocketMQProperties.class);
- assertThat(rocketMQProperties.getConsumer().getListeners().get("spring_rocketmq").get("FOO_TEST_TOPIC").booleanValue()).isEqualTo(false);
- assertThat(rocketMQProperties.getConsumer().getListeners().get("spring_rocketmq").get("FOO_TEST_TOPIC2").booleanValue()).isEqualTo(true);
- });
+ "rocketmq.producer.group=spring_rocketmq",
+ "rocketmq.consumer.listeners.spring_rocketmq.FOO_TEST_TOPIC=false",
+ "rocketmq.consumer.listeners.spring_rocketmq.FOO_TEST_TOPIC2=true").
+ run((context) -> {
+ RocketMQProperties rocketMQProperties = context.getBean(RocketMQProperties.class);
+ assertThat(rocketMQProperties.getConsumer().getListeners().get("spring_rocketmq").get("FOO_TEST_TOPIC").booleanValue()).isEqualTo(false);
+ assertThat(rocketMQProperties.getConsumer().getListeners().get("spring_rocketmq").get("FOO_TEST_TOPIC2").booleanValue()).isEqualTo(true);
+ });
}
@Test
public void testRocketMQTransactionListener() {
runner.withPropertyValues("rocketmq.name-server=127.0.0.1:9876",
- "rocketmq.producer.group=spring_rocketmq",
- "demo.rocketmq.transaction.producer.group=transaction-group1").
- withUserConfiguration(TestTransactionListenerConfig.class).
- run((context) -> {
- assertThat(context).hasSingleBean(TestRocketMQLocalTransactionListener.class);
-
- });
-
+ "rocketmq.producer.group=spring_rocketmq",
+ "demo.rocketmq.transaction.producer.group=transaction-group1").
+ withUserConfiguration(TestTransactionListenerConfig.class).
+ run((context) -> {
+ assertThat(context).hasSingleBean(TestRocketMQLocalTransactionListener.class);
+ });
}
@Test
public void testBatchSendMessage() {
runner.withPropertyValues("rocketmq.name-server=127.0.0.1:9876",
- "rocketmq.producer.group=spring_rocketmq").
- run((context) -> {
- RocketMQTemplate rocketMQTemplate = context.getBean(RocketMQTemplate.class);
- List<GenericMessage<String>> batchMessages = new ArrayList<GenericMessage<String>>();
+ "rocketmq.producer.group=spring_rocketmq").
+ run((context) -> {
+ RocketMQTemplate rocketMQTemplate = context.getBean(RocketMQTemplate.class);
+ List<GenericMessage<String>> batchMessages = new ArrayList<GenericMessage<String>>();
- String errorMsg = null;
- try {
- SendResult customSendResult = rocketMQTemplate.syncSend("test", batchMessages, 60000);
- } catch (IllegalArgumentException e) {
- // it will be throw IllegalArgumentException: `messages` can not be empty
- errorMsg = e.getMessage();
- }
+ String errorMsg = null;
+ try {
+ SendResult customSendResult = rocketMQTemplate.syncSend("test", batchMessages, 60000);
+ } catch (IllegalArgumentException e) {
+ // it will be throw IllegalArgumentException: `messages` can not be empty
+ errorMsg = e.getMessage();
+ }
- // that means the rocketMQTemplate.syncSend is chosen the correct type method
- Assert.assertEquals("`messages` can not be empty", errorMsg);
- });
+ // that means the rocketMQTemplate.syncSend is chosen the correct type method
+ Assert.assertEquals("`messages` can not be empty", errorMsg);
+ });
}
public void testPlaceholdersListenerContainer() {
runner.withPropertyValues("rocketmq.name-server=127.0.0.1:9876",
- "demo.placeholders.consumer.group = abc3",
- "demo.placeholders.consumer.topic = test",
- "demo.placeholders.consumer.tags = tag1").
- withUserConfiguration(TestPlaceholdersConfig.class).
- run((context) -> {
- // No producer on consume side
- assertThat(context).doesNotHaveBean(DefaultMQProducer.class);
- // Auto-create consume container if existing Bean annotated with @RocketMQMessageListener
- assertThat(context).hasBean("org.apache.rocketmq.spring.support.DefaultRocketMQListenerContainer_1");
- assertThat(context).getBean("org.apache.rocketmq.spring.support.DefaultRocketMQListenerContainer_1").
- hasFieldOrPropertyWithValue("nameServer", "127.0.0.1:9876").
- hasFieldOrPropertyWithValue("consumerGroup", "abc3").
- hasFieldOrPropertyWithValue("topic", "test").
- hasFieldOrPropertyWithValue("selectorExpression", "tag1");
- });
+ "demo.placeholders.consumer.group = abc3",
+ "demo.placeholders.consumer.topic = test",
+ "demo.placeholders.consumer.tags = tag1").
+ withUserConfiguration(TestPlaceholdersConfig.class).
+ run((context) -> {
+ // No producer on consume side
+ assertThat(context).doesNotHaveBean(DefaultMQProducer.class);
+ // Auto-create consume container if existing Bean annotated with @RocketMQMessageListener
+ assertThat(context).hasBean("org.apache.rocketmq.spring.support.DefaultRocketMQListenerContainer_1");
+ assertThat(context).getBean("org.apache.rocketmq.spring.support.DefaultRocketMQListenerContainer_1").
+ hasFieldOrPropertyWithValue("nameServer", "127.0.0.1:9876").
+ hasFieldOrPropertyWithValue("consumerGroup", "abc3").
+ hasFieldOrPropertyWithValue("topic", "test").
+ hasFieldOrPropertyWithValue("selectorExpression", "tag1");
+ });
+ }
+
+ @Test
+ public void testRocketMQListenerContainer() {
+ runner.withPropertyValues("rocketmq.name-server=127.0.0.1:9876").
+ withUserConfiguration(TestConfig.class).
+ run((context) -> {
+ assertThat(context).getFailure().hasMessageContaining("connect to [127.0.0.1:9876] failed");
+ });
}
@Configuration
@@ -203,30 +202,16 @@
static class CustomObjectMapperConfig {
@Bean
- public ObjectMapper testObjectMapper() {
- return new ObjectMapper();
- }
- @Bean
public RocketMQMessageConverter rocketMQMessageConverter() {
return new RocketMQMessageConverter();
}
-
}
@Configuration
static class CustomObjectMappersConfig {
@Bean
- public ObjectMapper testObjectMapper() {
- return new ObjectMapper();
- }
-
- @Bean
- public ObjectMapper rocketMQMessageObjectMapper() {
- return new ObjectMapper();
- }
- @Bean
public RocketMQMessageConverter rocketMQMessageConverter() {
return new RocketMQMessageConverter();
}
@@ -263,7 +248,6 @@
@RocketMQTransactionListener
static class TestRocketMQLocalTransactionListener implements RocketMQLocalTransactionListener {
-
@Override
public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
return RocketMQLocalTransactionState.COMMIT;
diff --git a/rocketmq-spring-boot/src/test/java/org/apache/rocketmq/spring/core/RocketMQTemplateTest.java b/rocketmq-spring-boot/src/test/java/org/apache/rocketmq/spring/core/RocketMQTemplateTest.java
new file mode 100644
index 0000000..af2d636
--- /dev/null
+++ b/rocketmq-spring-boot/src/test/java/org/apache/rocketmq/spring/core/RocketMQTemplateTest.java
@@ -0,0 +1,50 @@
+package org.apache.rocketmq.spring.core;
+
+import javax.annotation.Resource;
+import org.apache.rocketmq.client.producer.SendCallback;
+import org.apache.rocketmq.client.producer.SendResult;
+import org.apache.rocketmq.spring.autoconfigure.RocketMQAutoConfiguration;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.messaging.MessagingException;
+import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+@RunWith(SpringJUnit4ClassRunner.class)
+@SpringBootTest(properties = {"rocketmq.nameServer=127.0.0.1:9876", "rocketmq.producer.group=producer_group"}, classes = RocketMQAutoConfiguration.class)
+
+public class RocketMQTemplateTest {
+ @Resource
+ RocketMQTemplate rocketMQTemplate;
+
+ @Test
+ public void testSendMessage() {
+ try {
+ rocketMQTemplate.syncSend("test", "123");
+ } catch (MessagingException e) {
+ assertThat(e).hasMessageContaining("org.apache.rocketmq.remoting.exception.RemotingConnectException: connect to [127.0.0.1:9876] failed");
+ }
+
+ try {
+ rocketMQTemplate.asyncSend("test", "123", new SendCallback() {
+ @Override public void onSuccess(SendResult sendResult) {
+
+ }
+
+ @Override public void onException(Throwable e) {
+
+ }
+ });
+ } catch (MessagingException e) {
+ assertThat(e).hasMessageContaining("org.apache.rocketmq.remoting.exception.RemotingConnectException: connect to [127.0.0.1:9876] failed");
+ }
+
+ try {
+ rocketMQTemplate.syncSendOrderly("test", "123", "123");
+ } catch (MessagingException e) {
+ assertThat(e).hasMessageContaining("org.apache.rocketmq.remoting.exception.RemotingConnectException: connect to [127.0.0.1:9876] failed");
+ }
+ }
+}
diff --git a/rocketmq-spring-boot/src/test/java/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainerTest.java b/rocketmq-spring-boot/src/test/java/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainerTest.java
index ea85b98..6395c9e 100644
--- a/rocketmq-spring-boot/src/test/java/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainerTest.java
+++ b/rocketmq-spring-boot/src/test/java/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainerTest.java
@@ -16,11 +16,10 @@
*/
package org.apache.rocketmq.spring.support;
+import java.lang.reflect.Method;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.junit.Test;
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
import static org.assertj.core.api.Assertions.assertThat;
@@ -36,7 +35,7 @@
public void onMessage(String message) {
}
});
- Class result = (Class)getMessageType.invoke(listenerContainer);
+ Class result = (Class) getMessageType.invoke(listenerContainer);
assertThat(result.getName().equals(String.class.getName()));
listenerContainer.setRocketMQListener(new RocketMQListener<MessageExt>() {
@@ -44,7 +43,7 @@
public void onMessage(MessageExt message) {
}
});
- result = (Class)getMessageType.invoke(listenerContainer);
+ result = (Class) getMessageType.invoke(listenerContainer);
assertThat(result.getName().equals(MessageExt.class.getName()));
}
}
diff --git a/rocketmq-spring-boot/src/test/java/org/apache/rocketmq/spring/support/RocketMQUtilTest.java b/rocketmq-spring-boot/src/test/java/org/apache/rocketmq/spring/support/RocketMQUtilTest.java
index 3a0bc41..94cf07a 100644
--- a/rocketmq-spring-boot/src/test/java/org/apache/rocketmq/spring/support/RocketMQUtilTest.java
+++ b/rocketmq-spring-boot/src/test/java/org/apache/rocketmq/spring/support/RocketMQUtilTest.java
@@ -17,13 +17,19 @@
package org.apache.rocketmq.spring.support;
import com.fasterxml.jackson.databind.ObjectMapper;
+import java.util.Arrays;
+import org.apache.rocketmq.acl.common.AclClientRPCHook;
+import org.apache.rocketmq.acl.common.SessionCredentials;
+import org.apache.rocketmq.common.UtilAll;
+import org.apache.rocketmq.remoting.RPCHook;
import org.junit.Test;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
-import java.util.Arrays;
-
-import static org.junit.Assert.*;
+import static org.apache.rocketmq.spring.support.RocketMQUtil.toRocketHeaderKey;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
public class RocketMQUtilTest {
@@ -52,8 +58,8 @@
org.apache.rocketmq.common.message.Message rocketMsg2 = RocketMQUtil.convertToRocketMessage(objectMapper,
charset, destination, msgWithBytePayload);
- assertTrue(Arrays.equals(((String)msgWithStringPayload.getPayload()).getBytes(), rocketMsg1.getBody()));
- assertTrue(Arrays.equals((byte[])msgWithBytePayload.getPayload(), rocketMsg2.getBody()));
+ assertTrue(Arrays.equals(((String) msgWithStringPayload.getPayload()).getBytes(), rocketMsg1.getBody()));
+ assertTrue(Arrays.equals((byte[]) msgWithBytePayload.getPayload(), rocketMsg2.getBody()));
}
@Test
@@ -87,9 +93,51 @@
assertEquals(String.valueOf("tags"), rocketMsg.getProperty(RocketMQHeaders.PREFIX + RocketMQHeaders.TAGS));
assertNull(rocketMsg.getTags());
- rmqMsg.putUserProperty(RocketMQUtil.toRocketHeaderKey(RocketMQHeaders.TAGS), "tags2");
+ rmqMsg.putUserProperty(toRocketHeaderKey(RocketMQHeaders.TAGS), "tags2");
springMsg = RocketMQUtil.convertToSpringMessage(rmqMsg);
assertEquals("tags", springMsg.getHeaders().get(RocketMQHeaders.PREFIX + RocketMQHeaders.TAGS));
}
+ @Test
+ public void testConvertToRocketMessageWithMessageConvert() {
+ Message msgWithStringPayload = MessageBuilder.withPayload("test body")
+ .setHeader("test", 1)
+ .setHeader(RocketMQHeaders.TAGS, "tags")
+ .setHeader(RocketMQHeaders.KEYS, "my_keys")
+ .build();
+ RocketMQMessageConverter messageConverter = new RocketMQMessageConverter();
+ org.apache.rocketmq.common.message.Message rocketMsg = RocketMQUtil.convertToRocketMessage(messageConverter.getMessageConverter(),
+ "UTF-8", "test-topic", msgWithStringPayload);
+ assertEquals("1", rocketMsg.getProperty("test"));
+ assertNull(rocketMsg.getProperty(RocketMQHeaders.TAGS));
+ assertEquals("my_keys", rocketMsg.getProperty(RocketMQHeaders.KEYS));
+
+ Message msgWithBytesPayload = MessageBuilder.withPayload("123".getBytes()).build();
+ org.apache.rocketmq.common.message.Message rocketMsgWithObj = RocketMQUtil.convertToRocketMessage(messageConverter.getMessageConverter(),
+ "UTF-8", "test-topic", msgWithBytesPayload);
+ assertEquals("123", new String(rocketMsgWithObj.getBody()));
+ }
+
+ @Test
+ public void testConvertToSpringMessage() {
+ org.apache.rocketmq.common.message.MessageExt rocketMsg = new org.apache.rocketmq.common.message.MessageExt();
+ rocketMsg.setTopic("test");
+ rocketMsg.setBody("123".getBytes());
+ rocketMsg.setTags("tagA");
+ rocketMsg.setKeys("key1");
+ Message message = RocketMQUtil.convertToSpringMessage(rocketMsg);
+ assertEquals("test", message.getHeaders().get(toRocketHeaderKey(RocketMQHeaders.TOPIC)));
+ assertEquals("tagA", message.getHeaders().get(toRocketHeaderKey(RocketMQHeaders.TAGS)));
+ assertEquals("key1", message.getHeaders().get(toRocketHeaderKey(RocketMQHeaders.KEYS)));
+ }
+
+ @Test
+ public void testGetInstanceName() {
+ String ak = "123";
+ String sk = "456";
+ String consumerGroup = "consumerGroup";
+ RPCHook rpcHook = new AclClientRPCHook(new SessionCredentials(ak, sk));
+ String expected = "123|456|consumerGroup|";
+ assertEquals(expected + UtilAll.getPid(), RocketMQUtil.getInstanceName(rpcHook, consumerGroup));
+ }
}
\ No newline at end of file