[ISSUE-46] Support mutliple RocketMQTemplate & name-server overrided Consumer Listener
diff --git a/README.md b/README.md
index b1f4178..780c5ce 100644
--- a/README.md
+++ b/README.md
@@ -361,5 +361,45 @@
1. How do I send transactional messages?
- It needs two steps on client side: a) Define a class which is annotated with @RocketMQTransactionListener and implements RocketMQLocalTransactionListener interface, in which, the executeLocalTransaction() and checkLocalTransaction() methods are implemented;
+ It needs two steps on client side:
+
+ a) Define a class which is annotated with @RocketMQTransactionListener and implements RocketMQLocalTransactionListener interface, in which, the executeLocalTransaction() and checkLocalTransaction() methods are implemented;
+
b) Invoke the sendMessageInTransaction() method with the RocketMQTemplate API. Note: The first parameter of this method is correlated with the txProducerGroup attribute of @RocketMQTransactionListener. It can be null if using the default transaction producer group.
+
+1. How do I create more than one RocketMQTemplate with a different name-server or other specific properties?
+ ```java
+ // Step1. Define an extra RocketMQTemplate with required properties, note, the 'nameServer' property must be different from the value of global
+ // Spring configuration 'rocketmq.name-server', other properties are optionally defined, they will use the global configuration
+ // definition by default.
+
+ // The RocketMQTemplate's Spring Bean name is 'extRocketMQTemplate', same with the simplified class name (Initials lowercase)
+ @ExtRocketMQTemplateConfiguration(nameServer="127.0.0.1:9876"
+ , ... // override other specific properties if needed
+ )
+ public class ExtRocketMQTemplate extends RocketMQTemplate {
+ // keep the body empty
+ }
+
+
+ // Step2. Use the extra RocketMQTemplate. e.g.
+ @Resource(name = "extRocketMQTemplate") // Must define the name to qualify to extra-defined RocketMQTemplate bean.
+ private RocketMQTemplate extRocketMQTemplate;
+ // you can use the template as normal.
+
+ ```
+
+1. How do I create a consumer Listener with different name-server other than the global Spring configuration 'rocketmq.name-server' ?
+ ```java
+ @Service
+ @RocketMQMessageListener(
+ nameServer = "NEW-NAMESERVER-LIST", // define new nameServer list
+ topic = "test-topic-1",
+ consumerGroup = "my-consumer_test-topic-1",
+ enableMsgTrace = true,
+ customizedTraceTopic = "my-trace-topic"
+ )
+ public class MyNameServerConsumer implements RocketMQListener<String> {
+ ...
+ }
+ ```
\ No newline at end of file
diff --git a/README_zh_CN.md b/README_zh_CN.md
index c285c52..b75a617 100644
--- a/README_zh_CN.md
+++ b/README_zh_CN.md
@@ -352,3 +352,39 @@
在客户端,首先用户需要实现RocketMQLocalTransactionListener接口,并在接口类上注解声明@RocketMQTransactionListener,实现确认和回查方法;然后再使用资源模板RocketMQTemplate,
调用方法sendMessageInTransaction()来进行消息的发布。 注意:这个方法通过指定发送者组名与具体的声明了txProducerGroup的TransactionListener进行关联,您也可以不指定这个值,从而使用默认的事务发送者组。
+1. 如何声明不同name-server或者其他特定的属性来定义非标的RocketMQTemplate?
+ ```java
+ // 第一步: 定义非标的RocketMQTemplate使用你需要的属性,注意,这里的'nameServer'属性必须要定义,并且其取值不能与全局配置属性'rocketmq.name-server'相同
+ // 也可以定义其他属性,如果不定义,它们取全局的配置属性值或默认值。
+
+ // 这个RocketMQTemplate的Spring Bean名是'extRocketMQTemplate', 与所定义的类名相同(但首字母小写)
+ @ExtRocketMQTemplateConfiguration(nameServer="127.0.0.1:9876"
+ , ... // 定义其他属性,如果有必要。
+ )
+ public class ExtRocketMQTemplate extends RocketMQTemplate {
+ //类里面不需要做任何修改
+ }
+
+
+ // 第二步: 使用这个非标RocketMQTemplate
+ @Resource(name = "extRocketMQTemplate") // 这里必须定义name属性来指向上具体的Spring Bean.
+ private RocketMQTemplate extRocketMQTemplate;
+ // 接下来就可以正常使用这个extRocketMQTemplate了.
+
+ ```
+
+1. MessageListener消费端,是否可以指定不同的name-server而不是使用全局定义的'rocketmq.name-server'属性值 ?
+
+ ```java
+ @Service
+ @RocketMQMessageListener(
+ nameServer = "NEW-NAMESERVER-LIST", // 可以使用这个optional属性来指定不同的name-server
+ topic = "test-topic-1",
+ consumerGroup = "my-consumer_test-topic-1",
+ enableMsgTrace = true,
+ customizedTraceTopic = "my-trace-topic"
+ )
+ public class MyNameServerConsumer implements RocketMQListener<String> {
+ ...
+ }
+ ```
\ No newline at end of file
diff --git a/rocketmq-spring-boot-samples/pom.xml b/rocketmq-spring-boot-samples/pom.xml
index c1699b6..63a080f 100644
--- a/rocketmq-spring-boot-samples/pom.xml
+++ b/rocketmq-spring-boot-samples/pom.xml
@@ -36,7 +36,7 @@
</modules>
<properties>
- <rocketmq-spring-boot-starter-version>2.0.1</rocketmq-spring-boot-starter-version>
+ <rocketmq-spring-boot-starter-version>2.0.3-SNAPSHOT</rocketmq-spring-boot-starter-version>
</properties>
<dependencies>
diff --git a/rocketmq-spring-boot-samples/rocketmq-consume-demo/src/main/java/org/apache/rocketmq/samples/springboot/consumer/StringConsumerNewNS.java b/rocketmq-spring-boot-samples/rocketmq-consume-demo/src/main/java/org/apache/rocketmq/samples/springboot/consumer/StringConsumerNewNS.java
new file mode 100644
index 0000000..5bd0deb
--- /dev/null
+++ b/rocketmq-spring-boot-samples/rocketmq-consume-demo/src/main/java/org/apache/rocketmq/samples/springboot/consumer/StringConsumerNewNS.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.samples.springboot.consumer;
+
+import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
+import org.apache.rocketmq.spring.core.RocketMQListener;
+import org.springframework.stereotype.Service;
+
+/**
+ * RocketMQMessageListener
+ */
+@Service
+@RocketMQMessageListener(nameServer = "${demo.rocketmq.myNameServer}", topic = "${demo.rocketmq.topic}", consumerGroup = "string_consumer")
+public class StringConsumerNewNS implements RocketMQListener<String> {
+ @Override
+ public void onMessage(String message) {
+ System.out.printf("------- StringConsumer received: %s \n", message);
+ }
+}
\ No newline at end of file
diff --git a/rocketmq-spring-boot-samples/rocketmq-consume-demo/src/main/resources/application.properties b/rocketmq-spring-boot-samples/rocketmq-consume-demo/src/main/resources/application.properties
index 5f74d5d..85063d3 100644
--- a/rocketmq-spring-boot-samples/rocketmq-consume-demo/src/main/resources/application.properties
+++ b/rocketmq-spring-boot-samples/rocketmq-consume-demo/src/main/resources/application.properties
@@ -7,3 +7,6 @@
demo.rocketmq.orderTopic=order-paid-topic
demo.rocketmq.msgExtTopic=message-ext-topic
demo.rocketmq.transTopic=spring-transaction-topic
+
+# another nameserver different global
+demo.rocketmq.myNameServer=127.0.0.1:9876
\ No newline at end of file
diff --git a/rocketmq-spring-boot-samples/rocketmq-produce-demo/src/main/java/org/apache/rocketmq/samples/springboot/ExtRocketMQTemplate.java b/rocketmq-spring-boot-samples/rocketmq-produce-demo/src/main/java/org/apache/rocketmq/samples/springboot/ExtRocketMQTemplate.java
new file mode 100644
index 0000000..7a78552
--- /dev/null
+++ b/rocketmq-spring-boot-samples/rocketmq-produce-demo/src/main/java/org/apache/rocketmq/samples/springboot/ExtRocketMQTemplate.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.samples.springboot;
+
+import org.apache.rocketmq.spring.annotation.ExtRocketMQTemplateConfiguration;
+import org.apache.rocketmq.spring.core.RocketMQTemplate;
+
+@ExtRocketMQTemplateConfiguration(nameServer = "${demo.rocketmq.extNameServer}")
+public class ExtRocketMQTemplate extends RocketMQTemplate {
+}
\ No newline at end of file
diff --git a/rocketmq-spring-boot-samples/rocketmq-produce-demo/src/main/java/org/apache/rocketmq/samples/springboot/ProducerApplication.java b/rocketmq-spring-boot-samples/rocketmq-produce-demo/src/main/java/org/apache/rocketmq/samples/springboot/ProducerApplication.java
index d7ca208..465658a 100644
--- a/rocketmq-spring-boot-samples/rocketmq-produce-demo/src/main/java/org/apache/rocketmq/samples/springboot/ProducerApplication.java
+++ b/rocketmq-spring-boot-samples/rocketmq-produce-demo/src/main/java/org/apache/rocketmq/samples/springboot/ProducerApplication.java
@@ -54,6 +54,8 @@
private String orderPaidTopic;
@Value("${demo.rocketmq.msgExtTopic}")
private String msgExtTopic;
+ @Resource(name = "extRocketMQTemplate")
+ private RocketMQTemplate extRocketMQTemplate;
public static void main(String[] args) {
SpringApplication.run(ProducerApplication.class, args);
@@ -65,6 +67,10 @@
SendResult sendResult = rocketMQTemplate.syncSend(springTopic, "Hello, World!");
System.out.printf("syncSend1 to topic %s sendResult=%s %n", springTopic, sendResult);
+ // Use the extRocketMQTemplate
+ sendResult = extRocketMQTemplate.syncSend(springTopic, "Hello, World!");
+ System.out.printf("extRocketMQTemplate.syncSend1 to topic %s sendResult=%s %n", springTopic, sendResult);
+
// Send string with spring Message
sendResult = rocketMQTemplate.syncSend(springTopic, MessageBuilder.withPayload("Hello, World! I'm from spring message").build());
System.out.printf("syncSend2 to topic %s sendResult=%s %n", springTopic, sendResult);
diff --git a/rocketmq-spring-boot-samples/rocketmq-produce-demo/src/main/resources/application.properties b/rocketmq-spring-boot-samples/rocketmq-produce-demo/src/main/resources/application.properties
index 7965b28..782291f 100644
--- a/rocketmq-spring-boot-samples/rocketmq-produce-demo/src/main/resources/application.properties
+++ b/rocketmq-spring-boot-samples/rocketmq-produce-demo/src/main/resources/application.properties
@@ -5,4 +5,6 @@
demo.rocketmq.topic=string-topic
demo.rocketmq.orderTopic=order-paid-topic
demo.rocketmq.msgExtTopic=message-ext-topic
-demo.rocketmq.transTopic=spring-transaction-topic
\ No newline at end of file
+demo.rocketmq.transTopic=spring-transaction-topic
+
+demo.rocketmq.extNameServer=127.0.0.1:9876
\ No newline at end of file
diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/annotation/ExtRocketMQTemplateConfiguration.java b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/annotation/ExtRocketMQTemplateConfiguration.java
new file mode 100644
index 0000000..c268b57
--- /dev/null
+++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/annotation/ExtRocketMQTemplateConfiguration.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.spring.annotation;
+
+import org.springframework.stereotype.Component;
+
+import java.lang.annotation.Documented;
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+@Target(ElementType.TYPE)
+@Retention(RetentionPolicy.RUNTIME)
+@Documented
+@Component
+public @interface ExtRocketMQTemplateConfiguration {
+ /**
+ * The component name of the Producer configuration.
+ */
+ String value() default "";
+
+ /**
+ * The property of "name-server".
+ */
+ String nameServer();
+
+ /**
+ * Name of producer.
+ */
+ String group() default "${rocketmq.producer.group:}";
+ /**
+ * Millis of send message timeout.
+ */
+ int sendMessageTimeout() default -1;
+ /**
+ * Compress message body threshold, namely, message body larger than 4k will be compressed on default.
+ */
+ int compressMessageBodyThreshold() default -1;
+ /**
+ * Maximum number of retry to perform internally before claiming sending failure in synchronous mode.
+ * This may potentially cause message duplication which is up to application developers to resolve.
+ */
+ int retryTimesWhenSendFailed() default -1;
+ /**
+ * <p> Maximum number of retry to perform internally before claiming sending failure in asynchronous mode. </p>
+ * This may potentially cause message duplication which is up to application developers to resolve.
+ */
+ int retryTimesWhenSendAsyncFailed() default -1;
+ /**
+ * Indicate whether to retry another broker on sending failure internally.
+ */
+ boolean retryNextServer() default false;
+ /**
+ * Maximum allowed message size in bytes.
+ */
+ int maxMessageSize() default -1;
+ /**
+ * The property of "access-key".
+ */
+ String accessKey() default "${rocketmq.producer.accessKey:}";
+ /**
+ * The property of "secret-key".
+ */
+ String secretKey() default "${rocketmq.producer.secretKey:}";
+ /**
+ * Switch flag instance for message trace.
+ */
+ boolean enableMsgTrace() default true;
+ /**
+ * The name value of message trace topic.If you don't config,you can use the default trace topic name.
+ */
+ String customizedTraceTopic() default "${rocketmq.producer.customized-trace-topic:}";
+}
\ No newline at end of file
diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/annotation/RocketMQMessageListener.java b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/annotation/RocketMQMessageListener.java
index 608be82..8d287d1 100644
--- a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/annotation/RocketMQMessageListener.java
+++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/annotation/RocketMQMessageListener.java
@@ -28,6 +28,7 @@
@Documented
public @interface RocketMQMessageListener {
+ String NAME_SERVER_PLACEHOLDER = "${rocketmq.name-server:}";
String ACCESS_KEY_PLACEHOLDER = "${rocketmq.consumer.access-key:}";
String SECRET_KEY_PLACEHOLDER = "${rocketmq.consumer.secret-key:}";
String TRACE_TOPIC_PLACEHOLDER = "${rocketmq.consumer.customized-trace-topic:}";
@@ -93,4 +94,8 @@
*/
String customizedTraceTopic() default TRACE_TOPIC_PLACEHOLDER;
+ /**
+ * The property of "name-server".
+ */
+ String nameServer() default NAME_SERVER_PLACEHOLDER;
}
diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/ExtProducerResetConfiguration.java b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/ExtProducerResetConfiguration.java
new file mode 100644
index 0000000..192bfc9
--- /dev/null
+++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/ExtProducerResetConfiguration.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.spring.autoconfigure;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.rocketmq.acl.common.AclClientRPCHook;
+import org.apache.rocketmq.acl.common.SessionCredentials;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.producer.DefaultMQProducer;
+import org.apache.rocketmq.spring.annotation.ExtRocketMQTemplateConfiguration;
+import org.apache.rocketmq.spring.core.RocketMQTemplate;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.aop.framework.AopProxyUtils;
+import org.springframework.beans.BeansException;
+import org.springframework.beans.factory.SmartInitializingSingleton;
+import org.springframework.beans.factory.support.BeanDefinitionValidationException;
+import org.springframework.context.ApplicationContext;
+import org.springframework.context.ApplicationContextAware;
+import org.springframework.context.ConfigurableApplicationContext;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.context.support.GenericApplicationContext;
+import org.springframework.core.env.StandardEnvironment;
+import org.springframework.util.StringUtils;
+
+import java.util.Map;
+import java.util.Objects;
+
+
+@Configuration
+public class ExtProducerResetConfiguration implements ApplicationContextAware, SmartInitializingSingleton {
+ private final static Logger log = LoggerFactory.getLogger(ExtProducerResetConfiguration.class);
+
+ private ConfigurableApplicationContext applicationContext;
+
+ private StandardEnvironment environment;
+
+ private RocketMQProperties rocketMQProperties;
+
+ private ObjectMapper objectMapper;
+
+ public ExtProducerResetConfiguration(ObjectMapper rocketMQMessageObjectMapper,
+ StandardEnvironment environment,
+ RocketMQProperties rocketMQProperties) {
+ this.objectMapper = rocketMQMessageObjectMapper;
+ this.environment = environment;
+ this.rocketMQProperties = rocketMQProperties;
+ }
+
+ @Override
+ public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
+ this.applicationContext = (ConfigurableApplicationContext) applicationContext;
+ }
+
+ @Override
+ public void afterSingletonsInstantiated() {
+ Map<String, Object> beans = this.applicationContext.getBeansWithAnnotation(ExtRocketMQTemplateConfiguration.class);
+
+ if (Objects.nonNull(beans)) {
+ beans.forEach(this::registerTemplate);
+ }
+ }
+
+ private void registerTemplate(String beanName, Object bean) {
+ Class<?> clazz = AopProxyUtils.ultimateTargetClass(bean);
+
+ if (!RocketMQTemplate.class.isAssignableFrom(bean.getClass())) {
+ throw new IllegalStateException(clazz + " is not instance of " + RocketMQTemplate.class.getName());
+ }
+
+ ExtRocketMQTemplateConfiguration annotation = clazz.getAnnotation(ExtRocketMQTemplateConfiguration.class);
+ GenericApplicationContext genericApplicationContext = (GenericApplicationContext) applicationContext;
+ validate(annotation, genericApplicationContext);
+
+ DefaultMQProducer mqProducer = createProducer(annotation);
+ // Set instanceName same as the beanName
+ mqProducer.setInstanceName(beanName);
+ try {
+ mqProducer.start();
+ } catch (MQClientException e) {
+ throw new BeanDefinitionValidationException(String.format("Failed to startup MQProducer for RocketMQTemplate {}",
+ beanName), e);
+ }
+ RocketMQTemplate rocketMQTemplate = (RocketMQTemplate) bean;
+ rocketMQTemplate.setProducer(mqProducer);
+ rocketMQTemplate.setObjectMapper(objectMapper);
+
+
+ log.info("Set real producer to :{} {}", beanName, annotation.value());
+ }
+
+ private DefaultMQProducer createProducer(ExtRocketMQTemplateConfiguration annotation) {
+ DefaultMQProducer producer = null;
+
+ RocketMQProperties.Producer producerConfig = rocketMQProperties.getProducer();
+ if (producerConfig == null) {
+ producerConfig = new RocketMQProperties.Producer();
+ }
+ String nameServer = environment.resolvePlaceholders(annotation.nameServer());
+ String groupName = environment.resolvePlaceholders(annotation.group());
+ groupName = StringUtils.isEmpty(groupName) ? producerConfig.getGroup() : groupName;
+
+ String ak = environment.resolvePlaceholders(annotation.accessKey());
+ ak = StringUtils.isEmpty(ak) ? producerConfig.getAccessKey() : annotation.accessKey();
+ String sk = environment.resolvePlaceholders(annotation.secretKey());
+ sk = StringUtils.isEmpty(sk) ? producerConfig.getSecretKey() : annotation.secretKey();
+ String customizedTraceTopic = environment.resolvePlaceholders(annotation.customizedTraceTopic());
+ customizedTraceTopic = StringUtils.isEmpty(customizedTraceTopic) ? producerConfig.getCustomizedTraceTopic() : customizedTraceTopic;
+
+ if (!StringUtils.isEmpty(ak) && !StringUtils.isEmpty(sk)) {
+ producer = new DefaultMQProducer(groupName, new AclClientRPCHook(new SessionCredentials(ak, sk)),
+ annotation.enableMsgTrace(), customizedTraceTopic);
+ producer.setVipChannelEnabled(false);
+ } else {
+ producer = new DefaultMQProducer(groupName, annotation.enableMsgTrace(), customizedTraceTopic);
+ }
+
+ producer.setNamesrvAddr(nameServer);
+ producer.setSendMsgTimeout(annotation.sendMessageTimeout() == -1 ? producerConfig.getSendMessageTimeout() : annotation.sendMessageTimeout());
+ producer.setRetryTimesWhenSendFailed(annotation.retryTimesWhenSendAsyncFailed() == -1 ? producerConfig.getRetryTimesWhenSendFailed() : annotation.retryTimesWhenSendAsyncFailed());
+ producer.setRetryTimesWhenSendAsyncFailed(annotation.retryTimesWhenSendAsyncFailed() == -1 ? producerConfig.getRetryTimesWhenSendAsyncFailed() : annotation.retryTimesWhenSendAsyncFailed());
+ producer.setMaxMessageSize(annotation.maxMessageSize() == -1 ? producerConfig.getMaxMessageSize() : annotation.maxMessageSize());
+ producer.setCompressMsgBodyOverHowmuch(annotation.compressMessageBodyThreshold() == -1 ? producerConfig.getCompressMessageBodyThreshold() : annotation.compressMessageBodyThreshold());
+ producer.setRetryAnotherBrokerWhenNotStoreOK(annotation.retryNextServer());
+
+ return producer;
+ }
+
+ private void validate(ExtRocketMQTemplateConfiguration annotation, GenericApplicationContext genericApplicationContext) {
+ if (genericApplicationContext.isBeanNameInUse(annotation.value())) {
+ throw new BeanDefinitionValidationException(String.format("Bean {} has been used in Spring Application Context, " +
+ "please check the @ExtRocketMQTemplateConfiguration",
+ annotation.value()));
+ }
+
+ if (rocketMQProperties.getNameServer() == null ||
+ rocketMQProperties.getNameServer().equals(environment.resolvePlaceholders(annotation.nameServer()))) {
+ throw new BeanDefinitionValidationException(
+ "Bad annotation definition in @ExtRocketMQTemplateConfiguration, nameServer property is same with " +
+ "global property, please use the default RocketMQTemplate!");
+ }
+ }
+}
\ No newline at end of file
diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/ListenerContainerConfiguration.java b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/ListenerContainerConfiguration.java
index 3a9e070..a3149fd 100644
--- a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/ListenerContainerConfiguration.java
+++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/ListenerContainerConfiguration.java
@@ -35,6 +35,7 @@
import org.springframework.context.annotation.Configuration;
import org.springframework.context.support.GenericApplicationContext;
import org.springframework.core.env.StandardEnvironment;
+import org.springframework.util.StringUtils;
import java.util.Map;
import java.util.Objects;
@@ -110,7 +111,9 @@
private DefaultRocketMQListenerContainer createRocketMQListenerContainer(Object bean, RocketMQMessageListener annotation) {
DefaultRocketMQListenerContainer container = new DefaultRocketMQListenerContainer();
- container.setNameServer(rocketMQProperties.getNameServer());
+ String nameServer = environment.resolvePlaceholders(annotation.nameServer());
+ nameServer = StringUtils.isEmpty(nameServer) ? rocketMQProperties.getNameServer() : nameServer;
+ container.setNameServer(nameServer);
container.setTopic(environment.resolvePlaceholders(annotation.topic()));
container.setConsumerGroup(environment.resolvePlaceholders(annotation.consumerGroup()));
container.setRocketMQMessageListener(annotation);
diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/RocketMQAutoConfiguration.java b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/RocketMQAutoConfiguration.java
index 2fc034c..f1ff5c3 100644
--- a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/RocketMQAutoConfiguration.java
+++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/RocketMQAutoConfiguration.java
@@ -45,7 +45,7 @@
@EnableConfigurationProperties(RocketMQProperties.class)
@ConditionalOnClass({ MQAdmin.class, ObjectMapper.class })
@ConditionalOnProperty(prefix = "rocketmq", value = "name-server")
-@Import({ JacksonFallbackConfiguration.class, ListenerContainerConfiguration.class })
+@Import({ JacksonFallbackConfiguration.class, ListenerContainerConfiguration.class, ExtProducerResetConfiguration.class })
@AutoConfigureAfter(JacksonAutoConfiguration.class)
public class RocketMQAutoConfiguration {
@@ -94,7 +94,7 @@
}
@Bean
- @ConditionalOnBean(RocketMQTemplate.class)
+ @ConditionalOnBean(value = RocketMQTemplate.class, name = "rocketMQTemplate")
@ConditionalOnMissingBean(TransactionHandlerRegistry.class)
public TransactionHandlerRegistry transactionHandlerRegistry(RocketMQTemplate template) {
return new TransactionHandlerRegistry(template);
diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/core/RocketMQTemplate.java b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/core/RocketMQTemplate.java
index 2f29d7a..58d1301 100644
--- a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/core/RocketMQTemplate.java
+++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/core/RocketMQTemplate.java
@@ -450,8 +450,9 @@
@Override
public void afterPropertiesSet() throws Exception {
- Assert.notNull(producer, "Property 'producer' is required");
- producer.start();
+ if (producer != null) {
+ producer.start();
+ }
}
@Override
diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainer.java b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainer.java
index 50cd3a0..4f843d2 100644
--- a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainer.java
+++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainer.java
@@ -414,7 +414,12 @@
resolveRequiredPlaceholders(this.rocketMQMessageListener.customizedTraceTopic()));
}
- consumer.setNamesrvAddr(nameServer);
+ String customizedNameServer = this.applicationContext.getEnvironment().resolveRequiredPlaceholders(this.rocketMQMessageListener.nameServer());
+ if (customizedNameServer != null) {
+ consumer.setNamesrvAddr(customizedNameServer);
+ } else {
+ consumer.setNamesrvAddr(nameServer);
+ }
consumer.setConsumeThreadMax(consumeThreadMax);
if (consumeThreadMax < consumer.getConsumeThreadMin()) {
consumer.setConsumeThreadMin(consumeThreadMax);
diff --git a/rocketmq-spring-boot/src/test/java/org/apache/rocketmq/spring/autoconfigure/RocketMQAutoConfigurationTest.java b/rocketmq-spring-boot/src/test/java/org/apache/rocketmq/spring/autoconfigure/RocketMQAutoConfigurationTest.java
index f78251e..e6113a8 100644
--- a/rocketmq-spring-boot/src/test/java/org/apache/rocketmq/spring/autoconfigure/RocketMQAutoConfigurationTest.java
+++ b/rocketmq-spring-boot/src/test/java/org/apache/rocketmq/spring/autoconfigure/RocketMQAutoConfigurationTest.java
@@ -19,13 +19,19 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
+import org.apache.rocketmq.spring.annotation.ExtRocketMQTemplateConfiguration;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
+import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.apache.rocketmq.spring.support.DefaultRocketMQListenerContainer;
+import org.junit.Assert;
import org.junit.Test;
import org.springframework.beans.factory.NoSuchBeanDefinitionException;
+import org.springframework.beans.factory.support.BeanDefinitionValidationException;
import org.springframework.boot.autoconfigure.AutoConfigurations;
+import org.springframework.boot.test.context.assertj.AssertableApplicationContext;
import org.springframework.boot.test.context.runner.ApplicationContextRunner;
+import org.springframework.boot.test.context.runner.ContextConsumer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@@ -62,23 +68,28 @@
@Test
public void testDefaultMQProducer() {
runner.withPropertyValues("rocketmq.name-server=127.0.0.1:9876",
- "rocketmq.producer.group=spring_rocketmq").
- run((context) -> {
- assertThat(context).hasSingleBean(DefaultMQProducer.class);
- });
+ "rocketmq.producer.group=spring_rocketmq").
+ run((context) -> {
+ assertThat(context).hasSingleBean(DefaultMQProducer.class);
+ });
}
@Test
public void testRocketMQListenerContainer() {
runner.withPropertyValues("rocketmq.name-server=127.0.0.1:9876").
- withUserConfiguration(TestConfig.class).
- run((context) -> {
- // No producer on consume side
- assertThat(context).doesNotHaveBean(DefaultMQProducer.class);
- // Auto-create consume container if existing Bean annotated with @RocketMQMessageListener
- assertThat(context).hasSingleBean(DefaultRocketMQListenerContainer.class);
- });
+ withUserConfiguration(TestConfig.class).
+ run((context) -> {
+ // No producer on consume side
+ assertThat(context).doesNotHaveBean(DefaultMQProducer.class);
+ // Auto-create consume container if existing Bean annotated with @RocketMQMessageListener
+ assertThat(context).hasBean("org.apache.rocketmq.spring.support.DefaultRocketMQListenerContainer_1");
+ assertThat(context).hasBean("org.apache.rocketmq.spring.support.DefaultRocketMQListenerContainer_2");
+ assertThat(context).getBean("org.apache.rocketmq.spring.support.DefaultRocketMQListenerContainer_1").
+ hasFieldOrPropertyWithValue("nameServer", "127.0.0.1:9876");
+ assertThat(context).getBean("org.apache.rocketmq.spring.support.DefaultRocketMQListenerContainer_2").
+ hasFieldOrPropertyWithValue("nameServer", "127.0.1.1:9876");
+ });
}
@@ -87,8 +98,8 @@
runner.withPropertyValues("rocketmq.name-server=127.0.0.1:9876").
withUserConfiguration(TestConfig.class, CustomObjectMapperConfig.class).
run((context) -> {
- assertThat(context).hasSingleBean(DefaultRocketMQListenerContainer.class);
- assertThat(context.getBean(DefaultRocketMQListenerContainer.class).getObjectMapper())
+ assertThat(context.getBean("org.apache.rocketmq.spring.support.DefaultRocketMQListenerContainer_1",
+ DefaultRocketMQListenerContainer.class).getObjectMapper())
.isSameAs(context.getBean(CustomObjectMapperConfig.class).testObjectMapper());
});
}
@@ -98,12 +109,35 @@
runner.withPropertyValues("rocketmq.name-server=127.0.0.1:9876").
withUserConfiguration(TestConfig.class, CustomObjectMappersConfig.class).
run((context) -> {
- assertThat(context).hasSingleBean(DefaultRocketMQListenerContainer.class);
- assertThat(context.getBean(DefaultRocketMQListenerContainer.class).getObjectMapper())
+ assertThat(context.getBean("org.apache.rocketmq.spring.support.DefaultRocketMQListenerContainer_1",
+ DefaultRocketMQListenerContainer.class).getObjectMapper())
.isSameAs(context.getBean(CustomObjectMappersConfig.class).rocketMQMessageObjectMapper());
});
}
+
+ @Test
+ public void testExtRocketMQTemplate() {
+ runner.withPropertyValues("rocketmq.name-server=127.0.0.1:9876").
+ withUserConfiguration(ExtRocketMQTemplateConfig.class, CustomObjectMappersConfig.class).
+ run(new ContextConsumer<AssertableApplicationContext>() {
+ @Override
+ public void accept(AssertableApplicationContext context) throws Throwable {
+ Throwable th = context.getStartupFailure();
+ System.out.printf("th==" + th + "\n");
+ Assert.assertTrue(th instanceof BeanDefinitionValidationException);
+ }
+ });
+
+ runner.withPropertyValues("rocketmq.name-server=127.0.1.1:9876").
+ withUserConfiguration(ExtRocketMQTemplateConfig.class, CustomObjectMappersConfig.class).
+ run((context) -> {
+ // No producer on consume side
+ assertThat(context).getBean("extRocketMQTemplate").hasFieldOrProperty("producer");
+ // Auto-create consume container if existing Bean annotated with @RocketMQMessageListener
+ });
+ }
+
@Configuration
static class TestConfig {
@@ -111,6 +145,12 @@
public Object consumeListener() {
return new MyMessageListener();
}
+
+ @Bean
+ public Object consumeListener1() {
+ return new MyMessageListener1();
+ }
+
}
@Configuration
@@ -146,5 +186,29 @@
}
}
+
+ @RocketMQMessageListener(nameServer = "127.0.1.1:9876", consumerGroup = "abc1", topic = "test")
+ static class MyMessageListener1 implements RocketMQListener {
+
+ @Override
+ public void onMessage(Object message) {
+
+ }
+ }
+
+ @Configuration
+ static class ExtRocketMQTemplateConfig {
+
+ @Bean
+ public RocketMQTemplate extRocketMQTemplate() {
+ return new MyExtRocketMQTemplate();
+ }
+
+ }
+
+ @ExtRocketMQTemplateConfiguration(group = "test", nameServer = "127.0.0.1:9876")
+ static class MyExtRocketMQTemplate extends RocketMQTemplate {
+
+ }
}