Merge pull request #179 from RongtongJin/polish_transaction
[ISSUE #178] Refactor transaction message implementation
diff --git a/rocketmq-spring-boot-samples/pom.xml b/rocketmq-spring-boot-samples/pom.xml
index c513a5d..983a2d4 100644
--- a/rocketmq-spring-boot-samples/pom.xml
+++ b/rocketmq-spring-boot-samples/pom.xml
@@ -38,7 +38,7 @@
</modules>
<properties>
- <rocketmq-spring-boot-starter-version>2.0.4-SNAPSHOT</rocketmq-spring-boot-starter-version>
+ <rocketmq-spring-boot-starter-version>2.0.5-SNAPSHOT</rocketmq-spring-boot-starter-version>
</properties>
<dependencies>
diff --git a/rocketmq-spring-boot-samples/rocketmq-produce-acl-demo/src/main/java/org/apache/rocketmq/samples/springboot/ProducerACLApplication.java b/rocketmq-spring-boot-samples/rocketmq-produce-acl-demo/src/main/java/org/apache/rocketmq/samples/springboot/ProducerACLApplication.java
index 498c2e1..486a413 100644
--- a/rocketmq-spring-boot-samples/rocketmq-produce-acl-demo/src/main/java/org/apache/rocketmq/samples/springboot/ProducerACLApplication.java
+++ b/rocketmq-spring-boot-samples/rocketmq-produce-acl-demo/src/main/java/org/apache/rocketmq/samples/springboot/ProducerACLApplication.java
@@ -41,7 +41,6 @@
*/
@SpringBootApplication
public class ProducerACLApplication implements CommandLineRunner {
- private static final String TX_PGROUP_NAME = "myTxProducerGroup";
@Resource
private RocketMQTemplate rocketMQTemplate;
@Value("${demo.rocketmq.transTopic}")
@@ -75,7 +74,7 @@
Message msg = MessageBuilder.withPayload("Hello RocketMQ " + i).
setHeader(RocketMQHeaders.TRANSACTION_ID, "KEY_" + i).build();
- SendResult sendResult = rocketMQTemplate.sendMessageInTransaction(TX_PGROUP_NAME,
+ SendResult sendResult = rocketMQTemplate.sendMessageInTransaction(
springTransTopic + ":" + tags[i % tags.length], msg, null);
System.out.printf("------ send Transactional msg body = %s , sendResult=%s %n",
msg.getPayload(), sendResult.getSendStatus());
@@ -87,11 +86,7 @@
}
}
- @RocketMQTransactionListener(
- txProducerGroup = TX_PGROUP_NAME,
- accessKey = "AK", // if not setting, it will read by `rocketmq.producer.access-key` key
- secretKey = "SK" // if not setting, it will read by `rocketmq.producer.secret-key` key
- )
+ @RocketMQTransactionListener
class TransactionListenerImpl implements RocketMQLocalTransactionListener {
private AtomicInteger transactionIndex = new AtomicInteger(0);
diff --git a/rocketmq-spring-boot-samples/rocketmq-produce-demo/src/main/java/org/apache/rocketmq/samples/springboot/ProducerApplication.java b/rocketmq-spring-boot-samples/rocketmq-produce-demo/src/main/java/org/apache/rocketmq/samples/springboot/ProducerApplication.java
index 8abc4ff..4771bea 100644
--- a/rocketmq-spring-boot-samples/rocketmq-produce-demo/src/main/java/org/apache/rocketmq/samples/springboot/ProducerApplication.java
+++ b/rocketmq-spring-boot-samples/rocketmq-produce-demo/src/main/java/org/apache/rocketmq/samples/springboot/ProducerApplication.java
@@ -48,7 +48,6 @@
*/
@SpringBootApplication
public class ProducerApplication implements CommandLineRunner {
- private static final String TX_PGROUP_NAME = "myTxProducerGroup";
@Resource
private RocketMQTemplate rocketMQTemplate;
@Value("${demo.rocketmq.transTopic}")
@@ -136,7 +135,7 @@
Message msg = MessageBuilder.withPayload("Hello RocketMQ " + i).
setHeader(RocketMQHeaders.TRANSACTION_ID, "KEY_" + i).build();
- SendResult sendResult = rocketMQTemplate.sendMessageInTransaction(TX_PGROUP_NAME,
+ SendResult sendResult = rocketMQTemplate.sendMessageInTransaction(
springTransTopic + ":" + tags[i % tags.length], msg, null);
System.out.printf("------ send Transactional msg body = %s , sendResult=%s %n",
msg.getPayload(), sendResult.getSendStatus());
@@ -148,7 +147,7 @@
}
}
- @RocketMQTransactionListener(txProducerGroup = TX_PGROUP_NAME)
+ @RocketMQTransactionListener
class TransactionListenerImpl implements RocketMQLocalTransactionListener {
private AtomicInteger transactionIndex = new AtomicInteger(0);
diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/annotation/RocketMQTransactionListener.java b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/annotation/RocketMQTransactionListener.java
index f3f874c..c65ffc6 100644
--- a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/annotation/RocketMQTransactionListener.java
+++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/annotation/RocketMQTransactionListener.java
@@ -17,14 +17,12 @@
package org.apache.rocketmq.spring.annotation;
-import org.apache.rocketmq.spring.config.RocketMQConfigUtils;
-import org.springframework.stereotype.Component;
-
import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
+import org.springframework.stereotype.Component;
/**
* This annotation is used over a class which implements interface
@@ -40,14 +38,6 @@
public @interface RocketMQTransactionListener {
/**
- * Declare the txProducerGroup that is used to relate callback event to the listener, rocketMQTemplate must send a
- * transactional message with the declared txProducerGroup.
- * <p>
- * <p>It is suggested to use the default txProducerGroup if your system only needs to define a TransactionListener class.
- */
- String txProducerGroup() default RocketMQConfigUtils.ROCKETMQ_TRANSACTION_DEFAULT_GLOBAL_NAME;
-
- /**
* Set ExecutorService params -- corePoolSize
*/
int corePoolSize() default 1;
@@ -67,13 +57,4 @@
*/
int blockingQueueSize() default 2000;
- /**
- * The property of "access-key"
- */
- String accessKey() default "${rocketmq.producer.access-key}";
-
- /**
- * The property of "secret-key"
- */
- String secretKey() default "${rocketmq.producer.secret-key}";
}
diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/RocketMQAutoConfiguration.java b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/RocketMQAutoConfiguration.java
index dbe697b..f4abf72 100644
--- a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/RocketMQAutoConfiguration.java
+++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/RocketMQAutoConfiguration.java
@@ -17,16 +17,19 @@
package org.apache.rocketmq.spring.autoconfigure;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.lang.reflect.Field;
import javax.annotation.PostConstruct;
-
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.AccessChannel;
import org.apache.rocketmq.client.MQAdmin;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
+import org.apache.rocketmq.client.producer.TransactionMQProducer;
+import org.apache.rocketmq.client.trace.AsyncTraceDispatcher;
+import org.apache.rocketmq.client.trace.hook.SendMessageTraceHookImpl;
import org.apache.rocketmq.spring.config.RocketMQConfigUtils;
import org.apache.rocketmq.spring.config.RocketMQTransactionAnnotationProcessor;
-import org.apache.rocketmq.spring.config.TransactionHandlerRegistry;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.apache.rocketmq.spring.support.RocketMQMessageConverter;
import org.slf4j.Logger;
@@ -48,8 +51,6 @@
import org.springframework.util.Assert;
import org.springframework.util.StringUtils;
-import com.fasterxml.jackson.databind.ObjectMapper;
-
@Configuration
@EnableConfigurationProperties(RocketMQProperties.class)
@ConditionalOnClass({MQAdmin.class, ObjectMapper.class})
@@ -86,14 +87,26 @@
DefaultMQProducer producer;
String ak = rocketMQProperties.getProducer().getAccessKey();
String sk = rocketMQProperties.getProducer().getSecretKey();
- if (!StringUtils.isEmpty(ak) && !StringUtils.isEmpty(sk)) {
- producer = new DefaultMQProducer(groupName, new AclClientRPCHook(new SessionCredentials(ak, sk)),
- rocketMQProperties.getProducer().isEnableMsgTrace(),
- rocketMQProperties.getProducer().getCustomizedTraceTopic());
+ boolean isEnableAcl = !StringUtils.isEmpty(ak) && !StringUtils.isEmpty(sk);
+ if (isEnableAcl) {
+ producer = new TransactionMQProducer(groupName, new AclClientRPCHook(new SessionCredentials(ak, sk)));
producer.setVipChannelEnabled(false);
} else {
- producer = new DefaultMQProducer(groupName, rocketMQProperties.getProducer().isEnableMsgTrace(),
- rocketMQProperties.getProducer().getCustomizedTraceTopic());
+ producer = new TransactionMQProducer(groupName);
+ }
+
+ // Message trace is wired by hand: a dispatcher is created, stored into the producer's
+ // private "traceDispatcher" field via reflection, and registered as a send-message hook.
+ // NOTE(review): presumably needed because the TransactionMQProducer constructors used above
+ // take no trace arguments -- confirm against the rocketmq-client version in use.
+ if (rocketMQProperties.getProducer().isEnableMsgTrace()) {
+     try {
+         // Reuse the producer's ACL credentials for the trace dispatcher when ACL is enabled.
+         AsyncTraceDispatcher dispatcher = new AsyncTraceDispatcher(
+             rocketMQProperties.getProducer().getCustomizedTraceTopic(),
+             isEnableAcl ? new AclClientRPCHook(new SessionCredentials(ak, sk)) : null);
+         dispatcher.setHostProducer(producer.getDefaultMQProducerImpl());
+         Field field = DefaultMQProducer.class.getDeclaredField("traceDispatcher");
+         field.setAccessible(true);
+         field.set(producer, dispatcher);
+         producer.getDefaultMQProducerImpl().registerSendMessageHook(
+             new SendMessageTraceHookImpl(dispatcher));
+     } catch (Throwable e) {
+         // Trace is best-effort: the producer stays usable, but the cause must reach the log
+         // so a failed reflective lookup or dispatcher setup can be diagnosed.
+         log.error("system mqtrace hook init failed ,maybe can't send msg trace data", e);
+     }
+ }
producer.setNamesrvAddr(nameServer);
@@ -123,20 +136,13 @@
return rocketMQTemplate;
}
- @Bean
- @ConditionalOnBean(name = RocketMQConfigUtils.ROCKETMQ_TEMPLATE_DEFAULT_GLOBAL_NAME)
- @ConditionalOnMissingBean(TransactionHandlerRegistry.class)
- public TransactionHandlerRegistry transactionHandlerRegistry(
- @Qualifier(RocketMQConfigUtils.ROCKETMQ_TEMPLATE_DEFAULT_GLOBAL_NAME) RocketMQTemplate template) {
- return new TransactionHandlerRegistry(template);
- }
@Bean(name = RocketMQConfigUtils.ROCKETMQ_TRANSACTION_ANNOTATION_PROCESSOR_BEAN_NAME)
- @ConditionalOnBean(TransactionHandlerRegistry.class)
+ @ConditionalOnBean(name = RocketMQConfigUtils.ROCKETMQ_TEMPLATE_DEFAULT_GLOBAL_NAME)
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public static RocketMQTransactionAnnotationProcessor transactionAnnotationProcessor(
- TransactionHandlerRegistry transactionHandlerRegistry) {
- return new RocketMQTransactionAnnotationProcessor(transactionHandlerRegistry);
+ @Qualifier(RocketMQConfigUtils.ROCKETMQ_TEMPLATE_DEFAULT_GLOBAL_NAME) RocketMQTemplate template) {
+ return new RocketMQTransactionAnnotationProcessor(template);
}
}
diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/config/RocketMQTransactionAnnotationProcessor.java b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/config/RocketMQTransactionAnnotationProcessor.java
index 2713bad..9008d2d 100644
--- a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/config/RocketMQTransactionAnnotationProcessor.java
+++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/config/RocketMQTransactionAnnotationProcessor.java
@@ -17,11 +17,20 @@
package org.apache.rocketmq.spring.config;
+import java.util.Collections;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.client.exception.MQClientException;
-import org.apache.rocketmq.remoting.RPCHook;
+import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener;
+import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.apache.rocketmq.spring.support.RocketMQUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.springframework.aop.support.AopUtils;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.BeanCreationException;
@@ -31,13 +40,6 @@
import org.springframework.core.Ordered;
import org.springframework.core.annotation.AnnotationUtils;
-import java.util.Collections;
-import java.util.Objects;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
public class RocketMQTransactionAnnotationProcessor
implements BeanPostProcessor, Ordered, ApplicationContextAware {
private final static Logger log = LoggerFactory.getLogger(RocketMQTransactionAnnotationProcessor.class);
@@ -46,10 +48,10 @@
private final Set<Class<?>> nonProcessedClasses =
Collections.newSetFromMap(new ConcurrentHashMap<Class<?>, Boolean>(64));
- private TransactionHandlerRegistry transactionHandlerRegistry;
+ private RocketMQTemplate rocketMQTemplate;
- public RocketMQTransactionAnnotationProcessor(TransactionHandlerRegistry transactionHandlerRegistry) {
- this.transactionHandlerRegistry = transactionHandlerRegistry;
+ public RocketMQTransactionAnnotationProcessor(RocketMQTemplate rocketMQTemplate) {
+ this.rocketMQTemplate = rocketMQTemplate;
}
@Override
@@ -85,7 +87,7 @@
private void processTransactionListenerAnnotation(RocketMQTransactionListener listener, Object bean)
throws MQClientException {
- if (transactionHandlerRegistry == null) {
+ if (rocketMQTemplate == null) {
throw new MQClientException("Bad usage of @RocketMQTransactionListener, " +
"the class must work with RocketMQTemplate", null);
}
@@ -94,24 +96,11 @@
"the class must implement interface RocketMQLocalTransactionListener",
null);
}
- TransactionHandler transactionHandler = new TransactionHandler();
- transactionHandler.setBeanFactory(this.applicationContext.getAutowireCapableBeanFactory());
- transactionHandler.setName(applicationContext.getEnvironment().resolvePlaceholders(listener.txProducerGroup()));
- transactionHandler.setBeanName(bean.getClass().getName());
- transactionHandler.setListener((RocketMQLocalTransactionListener) bean);
- transactionHandler.setCheckExecutor(listener.corePoolSize(), listener.maximumPoolSize(),
- listener.keepAliveTime(), listener.blockingQueueSize());
- RPCHook rpcHook = RocketMQUtil.getRPCHookByAkSk(applicationContext.getEnvironment(),
- listener.accessKey(), listener.secretKey());
+ // The listener is attached to the template's single producer, so that producer must be
+ // transactional; fail with a descriptive error instead of a bare ClassCastException.
+ if (!(rocketMQTemplate.getProducer() instanceof TransactionMQProducer)) {
+     throw new MQClientException("Bad usage of @RocketMQTransactionListener, " +
+         "the producer of RocketMQTemplate must be a TransactionMQProducer", null);
+ }
+ TransactionMQProducer txProducer = (TransactionMQProducer) rocketMQTemplate.getProducer();
+ // Only one listener can be bound to the shared producer; a second annotated bean would
+ // otherwise silently replace the first listener and its check executor.
+ if (txProducer.getTransactionListener() != null) {
+     throw new MQClientException(String.format(
+         "A transaction listener has already been registered, TransactionListener [%s] is rejected",
+         bean.getClass().getName()), null);
+ }
+ // Executor that runs the broker's transaction-state check callbacks, sized from the annotation.
+ txProducer.setExecutorService(new ThreadPoolExecutor(listener.corePoolSize(), listener.maximumPoolSize(),
+     listener.keepAliveTime(), TimeUnit.MILLISECONDS, new LinkedBlockingDeque<>(listener.blockingQueueSize())));
+ txProducer.setTransactionListener(RocketMQUtil.convert((RocketMQLocalTransactionListener) bean));
- if (Objects.nonNull(rpcHook)) {
- transactionHandler.setRpcHook(rpcHook);
- } else {
- log.debug("Access-key or secret-key not configure in " + listener + ".");
- }
-
- transactionHandlerRegistry.registerTransactionHandler(transactionHandler);
}
@Override
diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/config/TransactionHandler.java b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/config/TransactionHandler.java
deleted file mode 100644
index f6ce61c..0000000
--- a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/config/TransactionHandler.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.rocketmq.spring.config;
-
-import org.apache.rocketmq.remoting.RPCHook;
-import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener;
-import org.springframework.beans.factory.BeanFactory;
-
-import java.util.concurrent.LinkedBlockingDeque;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-
-class TransactionHandler {
- private String name;
- private String beanName;
- private RocketMQLocalTransactionListener bean;
- private BeanFactory beanFactory;
- private ThreadPoolExecutor checkExecutor;
- private RPCHook rpcHook;
-
- public String getBeanName() {
- return beanName;
- }
-
- public void setBeanName(String beanName) {
- this.beanName = beanName;
- }
-
- public String getName() {
- return name;
- }
-
- public void setName(String name) {
- this.name = name;
- }
-
- public RPCHook getRpcHook() {
- return rpcHook;
- }
-
- public void setRpcHook(RPCHook rpcHook) {
- this.rpcHook = rpcHook;
- }
-
- public BeanFactory getBeanFactory() {
- return beanFactory;
- }
-
- public void setBeanFactory(BeanFactory beanFactory) {
- this.beanFactory = beanFactory;
- }
-
- public void setListener(RocketMQLocalTransactionListener listener) {
- this.bean = listener;
- }
-
- public RocketMQLocalTransactionListener getListener() {
- return this.bean;
- }
-
- public void setCheckExecutor(int corePoolSize, int maxPoolSize, long keepAliveTime, int blockingQueueSize) {
- this.checkExecutor = new ThreadPoolExecutor(corePoolSize, maxPoolSize,
- keepAliveTime, TimeUnit.MILLISECONDS,
- new LinkedBlockingDeque<>(blockingQueueSize));
- }
-
- public ThreadPoolExecutor getCheckExecutor() {
- return checkExecutor;
- }
-}
diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/config/TransactionHandlerRegistry.java b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/config/TransactionHandlerRegistry.java
deleted file mode 100644
index 0ec0161..0000000
--- a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/config/TransactionHandlerRegistry.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.rocketmq.spring.config;
-
-import io.netty.util.internal.ConcurrentSet;
-import org.apache.rocketmq.client.exception.MQClientException;
-import org.apache.rocketmq.spring.core.RocketMQTemplate;
-import org.springframework.beans.factory.DisposableBean;
-
-import java.util.Set;
-
-public class TransactionHandlerRegistry implements DisposableBean {
- private RocketMQTemplate rocketMQTemplate;
-
- private final Set<String> listenerContainers = new ConcurrentSet<>();
-
- public TransactionHandlerRegistry(RocketMQTemplate template) {
- this.rocketMQTemplate = template;
- }
-
- @Override
- public void destroy() throws Exception {
- listenerContainers.clear();
- }
-
- public void registerTransactionHandler(TransactionHandler handler)
- throws MQClientException {
- if (listenerContainers.contains(handler.getName())) {
- throw new MQClientException(-1, String.format(
- "The transaction name [%s] has been defined in TransactionListener [%s]",
- handler.getName(), handler.getBeanName()));
- }
- listenerContainers.add(handler.getName());
- rocketMQTemplate.createAndStartTransactionMQProducer(handler.getName(),
- handler.getListener(), handler.getCheckExecutor(), handler.getRpcHook());
- }
-}
diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/core/RocketMQTemplate.java b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/core/RocketMQTemplate.java
index a2431cf..dd34155 100644
--- a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/core/RocketMQTemplate.java
+++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/core/RocketMQTemplate.java
@@ -17,23 +17,18 @@
package org.apache.rocketmq.spring.core;
+import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Map;
import java.util.Objects;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutorService;
-
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
-import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.client.producer.TransactionSendResult;
import org.apache.rocketmq.client.producer.selector.SelectMessageQueueByHash;
-import org.apache.rocketmq.remoting.RPCHook;
-import org.apache.rocketmq.spring.config.RocketMQConfigUtils;
import org.apache.rocketmq.spring.support.RocketMQUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -45,11 +40,8 @@
import org.springframework.messaging.core.AbstractMessageSendingTemplate;
import org.springframework.messaging.core.MessagePostProcessor;
import org.springframework.messaging.support.MessageBuilder;
-import org.springframework.util.Assert;
import org.springframework.util.MimeTypeUtils;
-import com.fasterxml.jackson.databind.ObjectMapper;
-
@SuppressWarnings({"WeakerAccess", "unused"})
public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> implements InitializingBean, DisposableBean {
private static final Logger log = LoggerFactory.getLogger(RocketMQTemplate.class);
@@ -63,8 +55,6 @@
private MessageQueueSelector messageQueueSelector = new SelectMessageQueueByHash();
- private final Map<String, TransactionMQProducer> cache = new ConcurrentHashMap<>(); //only put TransactionMQProducer by now!!!
-
public DefaultMQProducer getProducer() {
return producer;
}
@@ -529,131 +519,27 @@
if (Objects.nonNull(producer)) {
producer.shutdown();
}
- for (Map.Entry<String, TransactionMQProducer> kv : cache.entrySet()) {
- if (Objects.nonNull(kv.getValue())) {
- kv.getValue().shutdown();
- }
- }
- cache.clear();
- }
-
- private String getTxProducerGroupName(String name) {
- return name == null ? RocketMQConfigUtils.ROCKETMQ_TRANSACTION_DEFAULT_GLOBAL_NAME : name;
- }
-
- private TransactionMQProducer stageMQProducer(String name) throws MessagingException {
- name = getTxProducerGroupName(name);
-
- TransactionMQProducer cachedProducer = cache.get(name);
- if (cachedProducer == null) {
- throw new MessagingException(
- String.format("Can not found MQProducer '%s' in cache! please define @RocketMQLocalTransactionListener class or invoke createOrGetStartedTransactionMQProducer() to create it firstly", name));
- }
-
- return cachedProducer;
}
/**
* Send Spring Message in Transaction
*
- * @param txProducerGroup the validate txProducerGroup name, set null if using the default name
* @param destination destination formats: `topicName:tags`
* @param message message {@link org.springframework.messaging.Message}
* @param arg ext arg
* @return TransactionSendResult
* @throws MessagingException
*/
- public TransactionSendResult sendMessageInTransaction(final String txProducerGroup, final String destination,
+ public TransactionSendResult sendMessageInTransaction(final String destination,
final Message<?> message, final Object arg) throws MessagingException {
try {
- TransactionMQProducer txProducer = this.stageMQProducer(txProducerGroup);
org.apache.rocketmq.common.message.Message rocketMsg = this.createRocketMqMessage(destination, message);
- return txProducer.sendMessageInTransaction(rocketMsg, arg);
+ return producer.sendMessageInTransaction(rocketMsg, arg);
} catch (MQClientException e) {
throw RocketMQUtil.convert(e);
}
}
- /**
- * Remove a TransactionMQProducer from cache by manual.
- * <p>Note: RocketMQTemplate can release all cached producers when bean destroying, it is not recommended to
- * directly use this method by user.
- *
- * @param txProducerGroup
- * @throws MessagingException
- */
- public void removeTransactionMQProducer(String txProducerGroup) throws MessagingException {
- txProducerGroup = getTxProducerGroupName(txProducerGroup);
- if (cache.containsKey(txProducerGroup)) {
- DefaultMQProducer cachedProducer = cache.get(txProducerGroup);
- cachedProducer.shutdown();
- cache.remove(txProducerGroup);
- }
- }
-
- /**
- * Create and start a transaction MQProducer, this new producer is cached in memory.
- * <p>Note: This method is invoked internally when processing {@code @RocketMQLocalTransactionListener}, it is not
- * recommended to directly use this method by user.
- *
- * @param txProducerGroup Producer (group) name, unique for each producer
- * @param transactionListener TransactoinListener impl class
- * @param executorService Nullable.
- * @param rpcHook Nullable.
- * @return true if producer is created and started; false if the named producer already exists in cache.
- * @throws MessagingException
- */
- public boolean createAndStartTransactionMQProducer(String txProducerGroup,
- RocketMQLocalTransactionListener transactionListener,
- ExecutorService executorService, RPCHook rpcHook) throws MessagingException {
- txProducerGroup = getTxProducerGroupName(txProducerGroup);
- if (cache.containsKey(txProducerGroup)) {
- log.info(String.format("get TransactionMQProducer '%s' from cache", txProducerGroup));
- return false;
- }
-
- TransactionMQProducer txProducer = createTransactionMQProducer(txProducerGroup, transactionListener, executorService, rpcHook);
- try {
- txProducer.start();
- cache.put(txProducerGroup, txProducer);
- } catch (MQClientException e) {
- throw RocketMQUtil.convert(e);
- }
-
- return true;
- }
-
- private TransactionMQProducer createTransactionMQProducer(String name,
- RocketMQLocalTransactionListener transactionListener,
- ExecutorService executorService, RPCHook rpcHook) {
- Assert.notNull(producer, "Property 'producer' is required");
- Assert.notNull(transactionListener, "Parameter 'transactionListener' is required");
- TransactionMQProducer txProducer;
- if (Objects.nonNull(rpcHook)) {
- txProducer = new TransactionMQProducer(name, rpcHook);
- txProducer.setVipChannelEnabled(false);
- txProducer.setInstanceName(RocketMQUtil.getInstanceName(rpcHook, name));
- } else {
- txProducer = new TransactionMQProducer(name);
- }
- txProducer.setTransactionListener(RocketMQUtil.convert(transactionListener));
-
- txProducer.setNamespace(producer.getNamespace());
- txProducer.setNamesrvAddr(producer.getNamesrvAddr());
- if (executorService != null) {
- txProducer.setExecutorService(executorService);
- }
-
- txProducer.setSendMsgTimeout(producer.getSendMsgTimeout());
- txProducer.setRetryTimesWhenSendFailed(producer.getRetryTimesWhenSendFailed());
- txProducer.setRetryTimesWhenSendAsyncFailed(producer.getRetryTimesWhenSendAsyncFailed());
- txProducer.setMaxMessageSize(producer.getMaxMessageSize());
- txProducer.setCompressMsgBodyOverHowmuch(producer.getCompressMsgBodyOverHowmuch());
- txProducer.setRetryAnotherBrokerWhenNotStoreOK(producer.isRetryAnotherBrokerWhenNotStoreOK());
-
- return txProducer;
- }
-
private org.apache.rocketmq.common.message.Message createRocketMqMessage(
String destination, Message<?> message) {
Message<?> msg = this.doConvert(message.getPayload(), message.getHeaders(), null);
diff --git a/rocketmq-spring-boot/src/test/java/org/apache/rocketmq/spring/autoconfigure/RocketMQAutoConfigurationTest.java b/rocketmq-spring-boot/src/test/java/org/apache/rocketmq/spring/autoconfigure/RocketMQAutoConfigurationTest.java
index b4c7437..c2e9ad9 100644
--- a/rocketmq-spring-boot/src/test/java/org/apache/rocketmq/spring/autoconfigure/RocketMQAutoConfigurationTest.java
+++ b/rocketmq-spring-boot/src/test/java/org/apache/rocketmq/spring/autoconfigure/RocketMQAutoConfigurationTest.java
@@ -95,47 +95,6 @@
}
@Test
- public void testRocketMQListenerContainer() {
- runner.withPropertyValues("rocketmq.name-server=127.0.0.1:9876").
- withUserConfiguration(TestConfig.class).
- run((context) -> {
- // No producer on consume side
- assertThat(context).doesNotHaveBean(DefaultMQProducer.class);
- // Auto-create consume container if existing Bean annotated with @RocketMQMessageListener
- assertThat(context).hasBean("org.apache.rocketmq.spring.support.DefaultRocketMQListenerContainer_1");
- assertThat(context).hasBean("org.apache.rocketmq.spring.support.DefaultRocketMQListenerContainer_2");
- assertThat(context).getBean("org.apache.rocketmq.spring.support.DefaultRocketMQListenerContainer_1").
- hasFieldOrPropertyWithValue("nameServer", "127.0.0.1:9876");
- assertThat(context).getBean("org.apache.rocketmq.spring.support.DefaultRocketMQListenerContainer_2").
- hasFieldOrPropertyWithValue("nameServer", "127.0.1.1:9876");
- });
-
- }
-
- @Test
- public void testRocketMQListenerWithCustomObjectMapper() {
- runner.withPropertyValues("rocketmq.name-server=127.0.0.1:9876").
- withUserConfiguration(TestConfig.class, CustomObjectMapperConfig.class).
- run((context) -> {
- assertThat(context.getBean("org.apache.rocketmq.spring.support.DefaultRocketMQListenerContainer_1",
- DefaultRocketMQListenerContainer.class).getMessageConverter())
- .isSameAs(context.getBean(CustomObjectMapperConfig.class).rocketMQMessageConverter().getMessageConverter());
- });
- }
-
- @Test
- public void testRocketMQListenerWithSeveralObjectMappers() {
- runner.withPropertyValues("rocketmq.name-server=127.0.0.1:9876").
- withUserConfiguration(TestConfig.class, CustomObjectMappersConfig.class).
- run((context) -> {
- assertThat(context.getBean("org.apache.rocketmq.spring.support.DefaultRocketMQListenerContainer_1",
- DefaultRocketMQListenerContainer.class).getMessageConverter())
- .isSameAs(context.getBean(CustomObjectMappersConfig.class).rocketMQMessageConverter().getMessageConverter());
- });
- }
-
-
- @Test
public void testExtRocketMQTemplate() {
runner.withPropertyValues("rocketmq.name-server=127.0.0.1:9876").
withUserConfiguration(TestExtRocketMQTemplateConfig.class, CustomObjectMappersConfig.class).
@@ -301,7 +260,7 @@
}
- @RocketMQTransactionListener(txProducerGroup = "${demo.rocketmq.transaction.producer.group}")
+ @RocketMQTransactionListener
static class TestRocketMQLocalTransactionListener implements RocketMQLocalTransactionListener {