[ISSUE #632 ] Fix NPE caused by using ExtRocketMQTemplateConfiguration annotation extension to send messages
diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/annotation/RocketMQMessageListenerBeanPostProcessor.java b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/annotation/RocketMQMessageListenerBeanPostProcessor.java
index bf53ecb..8dfb358 100644
--- a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/annotation/RocketMQMessageListenerBeanPostProcessor.java
+++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/annotation/RocketMQMessageListenerBeanPostProcessor.java
@@ -23,6 +23,7 @@
import org.springframework.beans.factory.config.BeanPostProcessor;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
+import org.springframework.context.SmartLifecycle;
import org.springframework.core.OrderComparator;
import org.springframework.core.annotation.AnnotationUtils;
@@ -32,7 +33,7 @@
import java.util.function.BiFunction;
import java.util.stream.Collectors;
-public class RocketMQMessageListenerBeanPostProcessor implements ApplicationContextAware, BeanPostProcessor, InitializingBean {
+public class RocketMQMessageListenerBeanPostProcessor implements ApplicationContextAware, BeanPostProcessor, InitializingBean, SmartLifecycle {
private ApplicationContext applicationContext;
@@ -40,6 +41,8 @@
private RocketMQMessageListenerContainerRegistrar listenerContainerRegistrar;
+ private boolean running = false;
+
@Override
public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {
return bean;
@@ -59,6 +62,34 @@
}
@Override
+ public int getPhase() {
+ return Integer.MAX_VALUE - 2000;
+ }
+
+ @Override
+ public void start() {
+ if (!isRunning()) {
+ this.setRunning(true);
+ listenerContainerRegistrar.startContainer();
+ }
+ }
+
+ @Override
+ public void stop() {
+
+ }
+
+ public void setRunning(boolean running) {
+ this.running = running;
+ }
+
+
+ @Override
+ public boolean isRunning() {
+ return running;
+ }
+
+ @Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
this.applicationContext = applicationContext;
}
diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/RocketMQMessageListenerContainerRegistrar.java b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/RocketMQMessageListenerContainerRegistrar.java
index c15d168..11cdcd9 100644
--- a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/RocketMQMessageListenerContainerRegistrar.java
+++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/RocketMQMessageListenerContainerRegistrar.java
@@ -36,7 +36,9 @@
import org.springframework.core.env.ConfigurableEnvironment;
import org.springframework.util.StringUtils;
+import java.util.ArrayList;
import java.util.Collections;
+import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
public class RocketMQMessageListenerContainerRegistrar implements ApplicationContextAware {
@@ -52,6 +54,8 @@
private final RocketMQMessageConverter rocketMQMessageConverter;
+ private final List<DefaultRocketMQListenerContainer> containers = new ArrayList<>();
+
public RocketMQMessageListenerContainerRegistrar(RocketMQMessageConverter rocketMQMessageConverter,
ConfigurableEnvironment environment, RocketMQProperties rocketMQProperties) {
this.rocketMQMessageConverter = rocketMQMessageConverter;
@@ -97,18 +101,25 @@
genericApplicationContext.registerBean(containerBeanName, DefaultRocketMQListenerContainer.class, () -> createRocketMQListenerContainer(containerBeanName, bean, annotation));
DefaultRocketMQListenerContainer container = genericApplicationContext.getBean(containerBeanName,
DefaultRocketMQListenerContainer.class);
- if (!container.isRunning()) {
- try {
- container.start();
- } catch (Exception e) {
- log.error("Started container failed. {}", container, e);
- throw new RuntimeException(e);
- }
- }
+
+ containers.add(container);
log.info("Register the listener to container, listenerBeanName:{}, containerBeanName:{}", beanName, containerBeanName);
}
+ public void startContainer() {
+ for (DefaultRocketMQListenerContainer container : containers) {
+ if (!container.isRunning()) {
+ try {
+ container.start();
+ } catch (Exception e) {
+ log.error("Started container failed. {}", container, e);
+ throw new RuntimeException(e);
+ }
+ }
+ }
+ }
+
private DefaultRocketMQListenerContainer createRocketMQListenerContainer(String name, Object bean,
RocketMQMessageListener annotation) {
DefaultRocketMQListenerContainer container = new DefaultRocketMQListenerContainer();