Merge pull request #129 from liuliuzo/master
[ISSUE #103] Support resolvePlaceholders for selectorExpression
diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/ListenerContainerConfiguration.java b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/ListenerContainerConfiguration.java
index 5d461bc..927bf44 100644
--- a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/ListenerContainerConfiguration.java
+++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/ListenerContainerConfiguration.java
@@ -126,7 +126,9 @@
private DefaultRocketMQListenerContainer createRocketMQListenerContainer(String name, Object bean, RocketMQMessageListener annotation) {
DefaultRocketMQListenerContainer container = new DefaultRocketMQListenerContainer();
-
+
+ container.setRocketMQMessageListener(annotation);
+
String nameServer = environment.resolvePlaceholders(annotation.nameServer());
nameServer = StringUtils.isEmpty(nameServer) ? rocketMQProperties.getNameServer() : nameServer;
String accessChannel = environment.resolvePlaceholders(annotation.accessChannel());
@@ -135,8 +137,11 @@
container.setAccessChannel(AccessChannel.valueOf(accessChannel));
}
container.setTopic(environment.resolvePlaceholders(annotation.topic()));
+ String tags = environment.resolvePlaceholders(annotation.selectorExpression());
+ if (!StringUtils.isEmpty(tags)) {
+ container.setSelectorExpression(tags);
+ }
container.setConsumerGroup(environment.resolvePlaceholders(annotation.consumerGroup()));
- container.setRocketMQMessageListener(annotation);
container.setRocketMQListener((RocketMQListener) bean);
container.setObjectMapper(objectMapper);
container.setName(name); // REVIEW ME, use the same clientId or multiple?
diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainer.java b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainer.java
index c4b5d19..497d94b 100644
--- a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainer.java
+++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainer.java
@@ -204,6 +204,10 @@
return selectorType;
}
+ public void setSelectorExpression(String selectorExpression) {
+ this.selectorExpression = selectorExpression;
+ }
+
public String getSelectorExpression() {
return selectorExpression;
}
diff --git a/rocketmq-spring-boot/src/test/java/org/apache/rocketmq/spring/autoconfigure/RocketMQAutoConfigurationTest.java b/rocketmq-spring-boot/src/test/java/org/apache/rocketmq/spring/autoconfigure/RocketMQAutoConfigurationTest.java
index 18fdcca..9fa0fa4 100644
--- a/rocketmq-spring-boot/src/test/java/org/apache/rocketmq/spring/autoconfigure/RocketMQAutoConfigurationTest.java
+++ b/rocketmq-spring-boot/src/test/java/org/apache/rocketmq/spring/autoconfigure/RocketMQAutoConfigurationTest.java
@@ -130,7 +130,7 @@
@Test
public void testExtRocketMQTemplate() {
runner.withPropertyValues("rocketmq.name-server=127.0.0.1:9876").
- withUserConfiguration(ExtRocketMQTemplateConfig.class, CustomObjectMappersConfig.class).
+ withUserConfiguration(TestExtRocketMQTemplateConfig.class, CustomObjectMappersConfig.class).
run(new ContextConsumer<AssertableApplicationContext>() {
@Override
public void accept(AssertableApplicationContext context) throws Throwable {
@@ -141,7 +141,7 @@
});
runner.withPropertyValues("rocketmq.name-server=127.0.1.1:9876").
- withUserConfiguration(ExtRocketMQTemplateConfig.class, CustomObjectMappersConfig.class).
+ withUserConfiguration(TestExtRocketMQTemplateConfig.class, CustomObjectMappersConfig.class).
run((context) -> {
// No producer on consume side
assertThat(context).getBean("extRocketMQTemplate").hasFieldOrProperty("producer");
@@ -170,23 +170,44 @@
"demo.rocketmq.transaction.producer.group=transaction-group1").
withUserConfiguration(TestTransactionListenerConfig.class).
run((context) -> {
- assertThat(context).hasSingleBean(MyRocketMQLocalTransactionListener.class);
+ assertThat(context).hasSingleBean(TestRocketMQLocalTransactionListener.class);
});
}
+ @Test
+ public void testPlaceholdersListenerContainer() {
+ runner.withPropertyValues("rocketmq.name-server=127.0.0.1:9876",
+ "demo.placeholders.consumer.group = abc3",
+ "demo.placeholders.consumer.topic = test",
+ "demo.placeholders.consumer.tags = tag1").
+ withUserConfiguration(TestPlaceholdersConfig.class).
+ run((context) -> {
+ // No producer on consume side
+ assertThat(context).doesNotHaveBean(DefaultMQProducer.class);
+ // Auto-create consume container if existing Bean annotated with @RocketMQMessageListener
+ assertThat(context).hasBean("org.apache.rocketmq.spring.support.DefaultRocketMQListenerContainer_1");
+ assertThat(context).getBean("org.apache.rocketmq.spring.support.DefaultRocketMQListenerContainer_1").
+ hasFieldOrPropertyWithValue("nameServer", "127.0.0.1:9876").
+ hasFieldOrPropertyWithValue("consumerGroup", "abc3").
+ hasFieldOrPropertyWithValue("topic", "test").
+ hasFieldOrPropertyWithValue("selectorExpression", "tag1");
+ });
+ }
+
+
@Configuration
static class TestConfig {
@Bean
public Object consumeListener() {
- return new MyMessageListener();
+ return new TestDefaultNameServerListener();
}
@Bean
public Object consumeListener1() {
- return new MyMessageListener1();
+ return new TestCustomNameServerListener();
}
}
@@ -217,7 +238,7 @@
}
@RocketMQMessageListener(consumerGroup = "abc", topic = "test")
- static class MyMessageListener implements RocketMQListener {
+ static class TestDefaultNameServerListener implements RocketMQListener {
@Override
public void onMessage(Object message) {
@@ -226,7 +247,7 @@
}
@RocketMQMessageListener(nameServer = "127.0.1.1:9876", consumerGroup = "abc1", topic = "test")
- static class MyMessageListener1 implements RocketMQListener {
+ static class TestCustomNameServerListener implements RocketMQListener {
@Override
public void onMessage(Object message) {
@@ -238,13 +259,13 @@
static class TestTransactionListenerConfig {
@Bean
public Object rocketMQLocalTransactionListener() {
- return new MyRocketMQLocalTransactionListener();
+ return new TestRocketMQLocalTransactionListener();
}
}
@RocketMQTransactionListener(txProducerGroup = "${demo.rocketmq.transaction.producer.group}")
- static class MyRocketMQLocalTransactionListener implements RocketMQLocalTransactionListener {
+ static class TestRocketMQLocalTransactionListener implements RocketMQLocalTransactionListener {
@Override
@@ -259,18 +280,37 @@
}
@Configuration
- static class ExtRocketMQTemplateConfig {
+ static class TestExtRocketMQTemplateConfig {
@Bean
public RocketMQTemplate extRocketMQTemplate() {
- return new MyExtRocketMQTemplate();
+ return new TestExtRocketMQTemplate();
}
}
@ExtRocketMQTemplateConfiguration(group = "test", nameServer = "127.0.0.1:9876")
- static class MyExtRocketMQTemplate extends RocketMQTemplate {
+ static class TestExtRocketMQTemplate extends RocketMQTemplate {
}
+
+ @Configuration
+ static class TestPlaceholdersConfig {
+
+ @Bean
+ public Object consumeListener() {
+ return new TestPlaceholdersListener();
+ }
+
+ }
+
+ @RocketMQMessageListener(consumerGroup = "${demo.placeholders.consumer.group}", topic = "${demo.placeholders.consumer.topic}", selectorExpression = "${demo.placeholders.consumer.tags}")
+ static class TestPlaceholdersListener implements RocketMQListener {
+
+ @Override
+ public void onMessage(Object message) {
+
+ }
+ }
}