Merge pull request #179 from RongtongJin/polish_transaction

[ISSUE #178] Refactor transaction message implementation
diff --git a/rocketmq-spring-boot-samples/pom.xml b/rocketmq-spring-boot-samples/pom.xml
index c513a5d..983a2d4 100644
--- a/rocketmq-spring-boot-samples/pom.xml
+++ b/rocketmq-spring-boot-samples/pom.xml
@@ -38,7 +38,7 @@
     </modules>
 
     <properties>
-        <rocketmq-spring-boot-starter-version>2.0.4-SNAPSHOT</rocketmq-spring-boot-starter-version>
+        <rocketmq-spring-boot-starter-version>2.0.5-SNAPSHOT</rocketmq-spring-boot-starter-version>
     </properties>
 
     <dependencies>
diff --git a/rocketmq-spring-boot-samples/rocketmq-produce-acl-demo/src/main/java/org/apache/rocketmq/samples/springboot/ProducerACLApplication.java b/rocketmq-spring-boot-samples/rocketmq-produce-acl-demo/src/main/java/org/apache/rocketmq/samples/springboot/ProducerACLApplication.java
index 498c2e1..486a413 100644
--- a/rocketmq-spring-boot-samples/rocketmq-produce-acl-demo/src/main/java/org/apache/rocketmq/samples/springboot/ProducerACLApplication.java
+++ b/rocketmq-spring-boot-samples/rocketmq-produce-acl-demo/src/main/java/org/apache/rocketmq/samples/springboot/ProducerACLApplication.java
@@ -41,7 +41,6 @@
  */
 @SpringBootApplication
 public class ProducerACLApplication implements CommandLineRunner {
-    private static final String TX_PGROUP_NAME = "myTxProducerGroup";
     @Resource
     private RocketMQTemplate rocketMQTemplate;
     @Value("${demo.rocketmq.transTopic}")
@@ -75,7 +74,7 @@
 
                 Message msg = MessageBuilder.withPayload("Hello RocketMQ " + i).
                     setHeader(RocketMQHeaders.TRANSACTION_ID, "KEY_" + i).build();
-                SendResult sendResult = rocketMQTemplate.sendMessageInTransaction(TX_PGROUP_NAME,
+                SendResult sendResult = rocketMQTemplate.sendMessageInTransaction(
                     springTransTopic + ":" + tags[i % tags.length], msg, null);
                 System.out.printf("------ send Transactional msg body = %s , sendResult=%s %n",
                     msg.getPayload(), sendResult.getSendStatus());
@@ -87,11 +86,7 @@
         }
     }
 
-    @RocketMQTransactionListener(
-        txProducerGroup = TX_PGROUP_NAME,
-        accessKey = "AK", // if not setting, it will read by `rocketmq.producer.access-key` key
-        secretKey = "SK"  // if not setting, it will read by `rocketmq.producer.secret-key` key
-        )
+    @RocketMQTransactionListener
     class TransactionListenerImpl implements RocketMQLocalTransactionListener {
         private AtomicInteger transactionIndex = new AtomicInteger(0);
 
diff --git a/rocketmq-spring-boot-samples/rocketmq-produce-demo/src/main/java/org/apache/rocketmq/samples/springboot/ProducerApplication.java b/rocketmq-spring-boot-samples/rocketmq-produce-demo/src/main/java/org/apache/rocketmq/samples/springboot/ProducerApplication.java
index 8abc4ff..4771bea 100644
--- a/rocketmq-spring-boot-samples/rocketmq-produce-demo/src/main/java/org/apache/rocketmq/samples/springboot/ProducerApplication.java
+++ b/rocketmq-spring-boot-samples/rocketmq-produce-demo/src/main/java/org/apache/rocketmq/samples/springboot/ProducerApplication.java
@@ -48,7 +48,6 @@
  */
 @SpringBootApplication
 public class ProducerApplication implements CommandLineRunner {
-    private static final String TX_PGROUP_NAME = "myTxProducerGroup";
     @Resource
     private RocketMQTemplate rocketMQTemplate;
     @Value("${demo.rocketmq.transTopic}")
@@ -136,7 +135,7 @@
 
                 Message msg = MessageBuilder.withPayload("Hello RocketMQ " + i).
                     setHeader(RocketMQHeaders.TRANSACTION_ID, "KEY_" + i).build();
-                SendResult sendResult = rocketMQTemplate.sendMessageInTransaction(TX_PGROUP_NAME,
+                SendResult sendResult = rocketMQTemplate.sendMessageInTransaction(
                     springTransTopic + ":" + tags[i % tags.length], msg, null);
                 System.out.printf("------ send Transactional msg body = %s , sendResult=%s %n",
                     msg.getPayload(), sendResult.getSendStatus());
@@ -148,7 +147,7 @@
         }
     }
 
-    @RocketMQTransactionListener(txProducerGroup = TX_PGROUP_NAME)
+    @RocketMQTransactionListener
     class TransactionListenerImpl implements RocketMQLocalTransactionListener {
         private AtomicInteger transactionIndex = new AtomicInteger(0);
 
diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/annotation/RocketMQTransactionListener.java b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/annotation/RocketMQTransactionListener.java
index f3f874c..c65ffc6 100644
--- a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/annotation/RocketMQTransactionListener.java
+++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/annotation/RocketMQTransactionListener.java
@@ -17,14 +17,12 @@
 
 package org.apache.rocketmq.spring.annotation;
 
-import org.apache.rocketmq.spring.config.RocketMQConfigUtils;
-import org.springframework.stereotype.Component;
-
 import java.lang.annotation.Documented;
 import java.lang.annotation.ElementType;
 import java.lang.annotation.Retention;
 import java.lang.annotation.RetentionPolicy;
 import java.lang.annotation.Target;
+import org.springframework.stereotype.Component;
 
 /**
  * This annotation is used over a class which implements interface
@@ -40,14 +38,6 @@
 public @interface RocketMQTransactionListener {
 
     /**
-     * Declare the txProducerGroup that is used to relate callback event to the listener, rocketMQTemplate must send a
-     * transactional message with the declared txProducerGroup.
-     * <p>
-     * <p>It is suggested to use the default txProducerGroup if your system only needs to define a TransactionListener class.
-     */
-    String txProducerGroup() default RocketMQConfigUtils.ROCKETMQ_TRANSACTION_DEFAULT_GLOBAL_NAME;
-
-    /**
      * Set ExecutorService params -- corePoolSize
      */
     int corePoolSize() default 1;
@@ -67,13 +57,4 @@
      */
     int blockingQueueSize() default 2000;
 
-    /**
-     * The property of "access-key"
-     */
-    String accessKey() default "${rocketmq.producer.access-key}";
-
-    /**
-     * The property of "secret-key"
-     */
-    String secretKey() default "${rocketmq.producer.secret-key}";
 }
diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/RocketMQAutoConfiguration.java b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/RocketMQAutoConfiguration.java
index dbe697b..f4abf72 100644
--- a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/RocketMQAutoConfiguration.java
+++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/RocketMQAutoConfiguration.java
@@ -17,16 +17,19 @@
 
 package org.apache.rocketmq.spring.autoconfigure;
 
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.lang.reflect.Field;
 import javax.annotation.PostConstruct;
-
 import org.apache.rocketmq.acl.common.AclClientRPCHook;
 import org.apache.rocketmq.acl.common.SessionCredentials;
 import org.apache.rocketmq.client.AccessChannel;
 import org.apache.rocketmq.client.MQAdmin;
 import org.apache.rocketmq.client.producer.DefaultMQProducer;
+import org.apache.rocketmq.client.producer.TransactionMQProducer;
+import org.apache.rocketmq.client.trace.AsyncTraceDispatcher;
+import org.apache.rocketmq.client.trace.hook.SendMessageTraceHookImpl;
 import org.apache.rocketmq.spring.config.RocketMQConfigUtils;
 import org.apache.rocketmq.spring.config.RocketMQTransactionAnnotationProcessor;
-import org.apache.rocketmq.spring.config.TransactionHandlerRegistry;
 import org.apache.rocketmq.spring.core.RocketMQTemplate;
 import org.apache.rocketmq.spring.support.RocketMQMessageConverter;
 import org.slf4j.Logger;
@@ -48,8 +51,6 @@
 import org.springframework.util.Assert;
 import org.springframework.util.StringUtils;
 
-import com.fasterxml.jackson.databind.ObjectMapper;
-
 @Configuration
 @EnableConfigurationProperties(RocketMQProperties.class)
 @ConditionalOnClass({MQAdmin.class, ObjectMapper.class})
@@ -86,14 +87,26 @@
         DefaultMQProducer producer;
         String ak = rocketMQProperties.getProducer().getAccessKey();
         String sk = rocketMQProperties.getProducer().getSecretKey();
-        if (!StringUtils.isEmpty(ak) && !StringUtils.isEmpty(sk)) {
-            producer = new DefaultMQProducer(groupName, new AclClientRPCHook(new SessionCredentials(ak, sk)),
-                rocketMQProperties.getProducer().isEnableMsgTrace(),
-                rocketMQProperties.getProducer().getCustomizedTraceTopic());
+        boolean isEnableAcl = !StringUtils.isEmpty(ak) && !StringUtils.isEmpty(sk);
+        if (isEnableAcl) {
+            producer = new TransactionMQProducer(groupName, new AclClientRPCHook(new SessionCredentials(ak, sk)));
             producer.setVipChannelEnabled(false);
         } else {
-            producer = new DefaultMQProducer(groupName, rocketMQProperties.getProducer().isEnableMsgTrace(),
-                rocketMQProperties.getProducer().getCustomizedTraceTopic());
+            producer = new TransactionMQProducer(groupName);
+        }
+
+        if (rocketMQProperties.getProducer().isEnableMsgTrace()) {
+            try {
+                AsyncTraceDispatcher dispatcher = new AsyncTraceDispatcher(rocketMQProperties.getProducer().getCustomizedTraceTopic(), isEnableAcl ? new AclClientRPCHook(new SessionCredentials(ak, sk)) : null);
+                dispatcher.setHostProducer(producer.getDefaultMQProducerImpl());
+                Field field = DefaultMQProducer.class.getDeclaredField("traceDispatcher");
+                field.setAccessible(true);
+                field.set(producer, dispatcher);
+                producer.getDefaultMQProducerImpl().registerSendMessageHook(
+                    new SendMessageTraceHookImpl(dispatcher));
+            } catch (Throwable e) {
+                log.error("system mqtrace hook init failed ,maybe can't send msg trace data", e);
+            }
         }
 
         producer.setNamesrvAddr(nameServer);
@@ -123,20 +136,13 @@
         return rocketMQTemplate;
     }
 
-    @Bean
-    @ConditionalOnBean(name = RocketMQConfigUtils.ROCKETMQ_TEMPLATE_DEFAULT_GLOBAL_NAME)
-    @ConditionalOnMissingBean(TransactionHandlerRegistry.class)
-    public TransactionHandlerRegistry transactionHandlerRegistry(
-        @Qualifier(RocketMQConfigUtils.ROCKETMQ_TEMPLATE_DEFAULT_GLOBAL_NAME) RocketMQTemplate template) {
-        return new TransactionHandlerRegistry(template);
-    }
 
     @Bean(name = RocketMQConfigUtils.ROCKETMQ_TRANSACTION_ANNOTATION_PROCESSOR_BEAN_NAME)
-    @ConditionalOnBean(TransactionHandlerRegistry.class)
+    @ConditionalOnBean(name = RocketMQConfigUtils.ROCKETMQ_TEMPLATE_DEFAULT_GLOBAL_NAME)
     @Role(BeanDefinition.ROLE_INFRASTRUCTURE)
     public static RocketMQTransactionAnnotationProcessor transactionAnnotationProcessor(
-        TransactionHandlerRegistry transactionHandlerRegistry) {
-        return new RocketMQTransactionAnnotationProcessor(transactionHandlerRegistry);
+        @Qualifier(RocketMQConfigUtils.ROCKETMQ_TEMPLATE_DEFAULT_GLOBAL_NAME) RocketMQTemplate template) {
+        return new RocketMQTransactionAnnotationProcessor(template);
     }
 
 }
diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/config/RocketMQTransactionAnnotationProcessor.java b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/config/RocketMQTransactionAnnotationProcessor.java
index 2713bad..9008d2d 100644
--- a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/config/RocketMQTransactionAnnotationProcessor.java
+++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/config/RocketMQTransactionAnnotationProcessor.java
@@ -17,11 +17,20 @@
 
 package org.apache.rocketmq.spring.config;
 
+import java.util.Collections;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
 import org.apache.rocketmq.client.exception.MQClientException;
-import org.apache.rocketmq.remoting.RPCHook;
+import org.apache.rocketmq.client.producer.TransactionMQProducer;
 import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;
 import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener;
+import org.apache.rocketmq.spring.core.RocketMQTemplate;
 import org.apache.rocketmq.spring.support.RocketMQUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.springframework.aop.support.AopUtils;
 import org.springframework.beans.BeansException;
 import org.springframework.beans.factory.BeanCreationException;
@@ -31,13 +40,6 @@
 import org.springframework.core.Ordered;
 import org.springframework.core.annotation.AnnotationUtils;
 
-import java.util.Collections;
-import java.util.Objects;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 public class RocketMQTransactionAnnotationProcessor
     implements BeanPostProcessor, Ordered, ApplicationContextAware {
     private final static Logger log = LoggerFactory.getLogger(RocketMQTransactionAnnotationProcessor.class);
@@ -46,10 +48,10 @@
     private final Set<Class<?>> nonProcessedClasses =
         Collections.newSetFromMap(new ConcurrentHashMap<Class<?>, Boolean>(64));
 
-    private TransactionHandlerRegistry transactionHandlerRegistry;
+    private RocketMQTemplate rocketMQTemplate;
 
-    public RocketMQTransactionAnnotationProcessor(TransactionHandlerRegistry transactionHandlerRegistry) {
-        this.transactionHandlerRegistry = transactionHandlerRegistry;
+    public RocketMQTransactionAnnotationProcessor(RocketMQTemplate rocketMQTemplate) {
+        this.rocketMQTemplate = rocketMQTemplate;
     }
 
     @Override
@@ -85,7 +87,7 @@
 
     private void processTransactionListenerAnnotation(RocketMQTransactionListener listener, Object bean)
         throws MQClientException {
-        if (transactionHandlerRegistry == null) {
+        if (rocketMQTemplate == null) {
             throw new MQClientException("Bad usage of @RocketMQTransactionListener, " +
                 "the class must work with RocketMQTemplate", null);
         }
@@ -94,24 +96,17 @@
                 "the class must implement interface RocketMQLocalTransactionListener",
                 null);
         }
-        TransactionHandler transactionHandler = new TransactionHandler();
-        transactionHandler.setBeanFactory(this.applicationContext.getAutowireCapableBeanFactory());
-        transactionHandler.setName(applicationContext.getEnvironment().resolvePlaceholders(listener.txProducerGroup()));
-        transactionHandler.setBeanName(bean.getClass().getName());
-        transactionHandler.setListener((RocketMQLocalTransactionListener) bean);
-        transactionHandler.setCheckExecutor(listener.corePoolSize(), listener.maximumPoolSize(),
-                listener.keepAliveTime(), listener.blockingQueueSize());
 
-        RPCHook rpcHook = RocketMQUtil.getRPCHookByAkSk(applicationContext.getEnvironment(),
-            listener.accessKey(), listener.secretKey());
+        TransactionMQProducer txProducer = (TransactionMQProducer) rocketMQTemplate.getProducer();
+        // The template's single producer holds one listener; fail fast rather than silently replace an earlier one.
+        if (txProducer.getTransactionListener() != null) {
+            throw new MQClientException("Bad usage of @RocketMQTransactionListener, " +
+                "only one RocketMQLocalTransactionListener can be registered for RocketMQTemplate", null);
+        }
+        txProducer.setExecutorService(new ThreadPoolExecutor(listener.corePoolSize(), listener.maximumPoolSize(),
+            listener.keepAliveTime(), TimeUnit.MILLISECONDS, new LinkedBlockingDeque<>(listener.blockingQueueSize())));
+        txProducer.setTransactionListener(RocketMQUtil.convert((RocketMQLocalTransactionListener) bean));
 
-        if (Objects.nonNull(rpcHook)) {
-            transactionHandler.setRpcHook(rpcHook);
-        } else {
-            log.debug("Access-key or secret-key not configure in " + listener + ".");
-        }
-
-        transactionHandlerRegistry.registerTransactionHandler(transactionHandler);
     }
 
     @Override
diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/config/TransactionHandler.java b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/config/TransactionHandler.java
deleted file mode 100644
index f6ce61c..0000000
--- a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/config/TransactionHandler.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.rocketmq.spring.config;
-
-import org.apache.rocketmq.remoting.RPCHook;
-import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener;
-import org.springframework.beans.factory.BeanFactory;
-
-import java.util.concurrent.LinkedBlockingDeque;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-
-class TransactionHandler {
-    private String name;
-    private String beanName;
-    private RocketMQLocalTransactionListener bean;
-    private BeanFactory beanFactory;
-    private ThreadPoolExecutor checkExecutor;
-    private RPCHook rpcHook;
-
-    public String getBeanName() {
-        return beanName;
-    }
-
-    public void setBeanName(String beanName) {
-        this.beanName = beanName;
-    }
-
-    public String getName() {
-        return name;
-    }
-
-    public void setName(String name) {
-        this.name = name;
-    }
-
-    public RPCHook getRpcHook() {
-        return rpcHook;
-    }
-
-    public void setRpcHook(RPCHook rpcHook) {
-        this.rpcHook = rpcHook;
-    }
-
-    public BeanFactory getBeanFactory() {
-        return beanFactory;
-    }
-
-    public void setBeanFactory(BeanFactory beanFactory) {
-        this.beanFactory = beanFactory;
-    }
-
-    public void setListener(RocketMQLocalTransactionListener listener) {
-        this.bean = listener;
-    }
-
-    public RocketMQLocalTransactionListener getListener() {
-        return this.bean;
-    }
-
-    public void setCheckExecutor(int corePoolSize, int maxPoolSize, long keepAliveTime, int blockingQueueSize) {
-        this.checkExecutor = new ThreadPoolExecutor(corePoolSize, maxPoolSize,
-            keepAliveTime, TimeUnit.MILLISECONDS,
-            new LinkedBlockingDeque<>(blockingQueueSize));
-    }
-
-    public ThreadPoolExecutor getCheckExecutor() {
-        return checkExecutor;
-    }
-}
diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/config/TransactionHandlerRegistry.java b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/config/TransactionHandlerRegistry.java
deleted file mode 100644
index 0ec0161..0000000
--- a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/config/TransactionHandlerRegistry.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.rocketmq.spring.config;
-
-import io.netty.util.internal.ConcurrentSet;
-import org.apache.rocketmq.client.exception.MQClientException;
-import org.apache.rocketmq.spring.core.RocketMQTemplate;
-import org.springframework.beans.factory.DisposableBean;
-
-import java.util.Set;
-
-public class TransactionHandlerRegistry implements DisposableBean {
-    private RocketMQTemplate rocketMQTemplate;
-
-    private final Set<String> listenerContainers = new ConcurrentSet<>();
-
-    public TransactionHandlerRegistry(RocketMQTemplate template) {
-        this.rocketMQTemplate = template;
-    }
-
-    @Override
-    public void destroy() throws Exception {
-        listenerContainers.clear();
-    }
-
-    public void registerTransactionHandler(TransactionHandler handler)
-        throws MQClientException {
-        if (listenerContainers.contains(handler.getName())) {
-            throw new MQClientException(-1, String.format(
-                "The transaction name [%s] has been defined in TransactionListener [%s]",
-                handler.getName(), handler.getBeanName()));
-        }
-        listenerContainers.add(handler.getName());
-        rocketMQTemplate.createAndStartTransactionMQProducer(handler.getName(),
-            handler.getListener(), handler.getCheckExecutor(), handler.getRpcHook());
-    }
-}
diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/core/RocketMQTemplate.java b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/core/RocketMQTemplate.java
index a2431cf..dd34155 100644
--- a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/core/RocketMQTemplate.java
+++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/core/RocketMQTemplate.java
@@ -17,23 +17,18 @@
 
 package org.apache.rocketmq.spring.core;
 
+import com.fasterxml.jackson.databind.ObjectMapper;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Map;
 import java.util.Objects;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutorService;
-
 import org.apache.rocketmq.client.exception.MQClientException;
 import org.apache.rocketmq.client.producer.DefaultMQProducer;
 import org.apache.rocketmq.client.producer.MessageQueueSelector;
 import org.apache.rocketmq.client.producer.SendCallback;
 import org.apache.rocketmq.client.producer.SendResult;
-import org.apache.rocketmq.client.producer.TransactionMQProducer;
 import org.apache.rocketmq.client.producer.TransactionSendResult;
 import org.apache.rocketmq.client.producer.selector.SelectMessageQueueByHash;
-import org.apache.rocketmq.remoting.RPCHook;
-import org.apache.rocketmq.spring.config.RocketMQConfigUtils;
 import org.apache.rocketmq.spring.support.RocketMQUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -45,11 +40,8 @@
 import org.springframework.messaging.core.AbstractMessageSendingTemplate;
 import org.springframework.messaging.core.MessagePostProcessor;
 import org.springframework.messaging.support.MessageBuilder;
-import org.springframework.util.Assert;
 import org.springframework.util.MimeTypeUtils;
 
-import com.fasterxml.jackson.databind.ObjectMapper;
-
 @SuppressWarnings({"WeakerAccess", "unused"})
 public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> implements InitializingBean, DisposableBean {
     private static final Logger log = LoggerFactory.getLogger(RocketMQTemplate.class);
@@ -63,8 +55,6 @@
 
     private MessageQueueSelector messageQueueSelector = new SelectMessageQueueByHash();
 
-    private final Map<String, TransactionMQProducer> cache = new ConcurrentHashMap<>(); //only put TransactionMQProducer by now!!!
-
     public DefaultMQProducer getProducer() {
         return producer;
     }
@@ -529,131 +519,27 @@
         if (Objects.nonNull(producer)) {
             producer.shutdown();
         }
-        for (Map.Entry<String, TransactionMQProducer> kv : cache.entrySet()) {
-            if (Objects.nonNull(kv.getValue())) {
-                kv.getValue().shutdown();
-            }
-        }
-        cache.clear();
-    }
-
-    private String getTxProducerGroupName(String name) {
-        return name == null ? RocketMQConfigUtils.ROCKETMQ_TRANSACTION_DEFAULT_GLOBAL_NAME : name;
-    }
-
-    private TransactionMQProducer stageMQProducer(String name) throws MessagingException {
-        name = getTxProducerGroupName(name);
-
-        TransactionMQProducer cachedProducer = cache.get(name);
-        if (cachedProducer == null) {
-            throw new MessagingException(
-                String.format("Can not found MQProducer '%s' in cache! please define @RocketMQLocalTransactionListener class or invoke createOrGetStartedTransactionMQProducer() to create it firstly", name));
-        }
-
-        return cachedProducer;
     }
 
     /**
      * Send Spring Message in Transaction
      *
-     * @param txProducerGroup the validate txProducerGroup name, set null if using the default name
      * @param destination     destination formats: `topicName:tags`
      * @param message         message {@link org.springframework.messaging.Message}
      * @param arg             ext arg
      * @return TransactionSendResult
      * @throws MessagingException
      */
-    public TransactionSendResult sendMessageInTransaction(final String txProducerGroup, final String destination,
+    public TransactionSendResult sendMessageInTransaction(final String destination,
         final Message<?> message, final Object arg) throws MessagingException {
         try {
-            TransactionMQProducer txProducer = this.stageMQProducer(txProducerGroup);
             org.apache.rocketmq.common.message.Message rocketMsg = this.createRocketMqMessage(destination, message);
-            return txProducer.sendMessageInTransaction(rocketMsg, arg);
+            return producer.sendMessageInTransaction(rocketMsg, arg);
         } catch (MQClientException e) {
             throw RocketMQUtil.convert(e);
         }
     }
 
-    /**
-     * Remove a TransactionMQProducer from cache by manual.
-     * <p>Note: RocketMQTemplate can release all cached producers when bean destroying, it is not recommended to
-     * directly use this method by user.
-     *
-     * @param txProducerGroup
-     * @throws MessagingException
-     */
-    public void removeTransactionMQProducer(String txProducerGroup) throws MessagingException {
-        txProducerGroup = getTxProducerGroupName(txProducerGroup);
-        if (cache.containsKey(txProducerGroup)) {
-            DefaultMQProducer cachedProducer = cache.get(txProducerGroup);
-            cachedProducer.shutdown();
-            cache.remove(txProducerGroup);
-        }
-    }
-
-    /**
-     * Create and start a transaction MQProducer, this new producer is cached in memory.
-     * <p>Note: This method is invoked internally when processing {@code @RocketMQLocalTransactionListener}, it is not
-     * recommended to directly use this method by user.
-     *
-     * @param txProducerGroup     Producer (group) name, unique for each producer
-     * @param transactionListener TransactoinListener impl class
-     * @param executorService     Nullable.
-     * @param rpcHook             Nullable.
-     * @return true if producer is created and started; false if the named producer already exists in cache.
-     * @throws MessagingException
-     */
-    public boolean createAndStartTransactionMQProducer(String txProducerGroup,
-        RocketMQLocalTransactionListener transactionListener,
-        ExecutorService executorService, RPCHook rpcHook) throws MessagingException {
-        txProducerGroup = getTxProducerGroupName(txProducerGroup);
-        if (cache.containsKey(txProducerGroup)) {
-            log.info(String.format("get TransactionMQProducer '%s' from cache", txProducerGroup));
-            return false;
-        }
-
-        TransactionMQProducer txProducer = createTransactionMQProducer(txProducerGroup, transactionListener, executorService, rpcHook);
-        try {
-            txProducer.start();
-            cache.put(txProducerGroup, txProducer);
-        } catch (MQClientException e) {
-            throw RocketMQUtil.convert(e);
-        }
-
-        return true;
-    }
-
-    private TransactionMQProducer createTransactionMQProducer(String name,
-        RocketMQLocalTransactionListener transactionListener,
-        ExecutorService executorService, RPCHook rpcHook) {
-        Assert.notNull(producer, "Property 'producer' is required");
-        Assert.notNull(transactionListener, "Parameter 'transactionListener' is required");
-        TransactionMQProducer txProducer;
-        if (Objects.nonNull(rpcHook)) {
-            txProducer = new TransactionMQProducer(name, rpcHook);
-            txProducer.setVipChannelEnabled(false);
-            txProducer.setInstanceName(RocketMQUtil.getInstanceName(rpcHook, name));
-        } else {
-            txProducer = new TransactionMQProducer(name);
-        }
-        txProducer.setTransactionListener(RocketMQUtil.convert(transactionListener));
-
-        txProducer.setNamespace(producer.getNamespace());
-        txProducer.setNamesrvAddr(producer.getNamesrvAddr());
-        if (executorService != null) {
-            txProducer.setExecutorService(executorService);
-        }
-
-        txProducer.setSendMsgTimeout(producer.getSendMsgTimeout());
-        txProducer.setRetryTimesWhenSendFailed(producer.getRetryTimesWhenSendFailed());
-        txProducer.setRetryTimesWhenSendAsyncFailed(producer.getRetryTimesWhenSendAsyncFailed());
-        txProducer.setMaxMessageSize(producer.getMaxMessageSize());
-        txProducer.setCompressMsgBodyOverHowmuch(producer.getCompressMsgBodyOverHowmuch());
-        txProducer.setRetryAnotherBrokerWhenNotStoreOK(producer.isRetryAnotherBrokerWhenNotStoreOK());
-
-        return txProducer;
-    }
-
     private org.apache.rocketmq.common.message.Message createRocketMqMessage(
         String destination, Message<?> message) {
         Message<?> msg = this.doConvert(message.getPayload(), message.getHeaders(), null);
diff --git a/rocketmq-spring-boot/src/test/java/org/apache/rocketmq/spring/autoconfigure/RocketMQAutoConfigurationTest.java b/rocketmq-spring-boot/src/test/java/org/apache/rocketmq/spring/autoconfigure/RocketMQAutoConfigurationTest.java
index b4c7437..c2e9ad9 100644
--- a/rocketmq-spring-boot/src/test/java/org/apache/rocketmq/spring/autoconfigure/RocketMQAutoConfigurationTest.java
+++ b/rocketmq-spring-boot/src/test/java/org/apache/rocketmq/spring/autoconfigure/RocketMQAutoConfigurationTest.java
@@ -95,47 +95,6 @@
     }
 
     @Test
-    public void testRocketMQListenerContainer() {
-        runner.withPropertyValues("rocketmq.name-server=127.0.0.1:9876").
-                withUserConfiguration(TestConfig.class).
-                run((context) -> {
-                    // No producer on consume side
-                    assertThat(context).doesNotHaveBean(DefaultMQProducer.class);
-                    // Auto-create consume container if existing Bean annotated with @RocketMQMessageListener
-                    assertThat(context).hasBean("org.apache.rocketmq.spring.support.DefaultRocketMQListenerContainer_1");
-                    assertThat(context).hasBean("org.apache.rocketmq.spring.support.DefaultRocketMQListenerContainer_2");
-                    assertThat(context).getBean("org.apache.rocketmq.spring.support.DefaultRocketMQListenerContainer_1").
-                            hasFieldOrPropertyWithValue("nameServer", "127.0.0.1:9876");
-                    assertThat(context).getBean("org.apache.rocketmq.spring.support.DefaultRocketMQListenerContainer_2").
-                            hasFieldOrPropertyWithValue("nameServer", "127.0.1.1:9876");
-                });
-
-    }
-
-    @Test
-    public void testRocketMQListenerWithCustomObjectMapper() {
-        runner.withPropertyValues("rocketmq.name-server=127.0.0.1:9876").
-                withUserConfiguration(TestConfig.class, CustomObjectMapperConfig.class).
-                run((context) -> {
-                    assertThat(context.getBean("org.apache.rocketmq.spring.support.DefaultRocketMQListenerContainer_1",
-                            DefaultRocketMQListenerContainer.class).getMessageConverter())
-                            .isSameAs(context.getBean(CustomObjectMapperConfig.class).rocketMQMessageConverter().getMessageConverter());
-                });
-    }
-
-    @Test
-    public void testRocketMQListenerWithSeveralObjectMappers() {
-        runner.withPropertyValues("rocketmq.name-server=127.0.0.1:9876").
-                withUserConfiguration(TestConfig.class, CustomObjectMappersConfig.class).
-                run((context) -> {
-                    assertThat(context.getBean("org.apache.rocketmq.spring.support.DefaultRocketMQListenerContainer_1",
-                            DefaultRocketMQListenerContainer.class).getMessageConverter())
-                            .isSameAs(context.getBean(CustomObjectMappersConfig.class).rocketMQMessageConverter().getMessageConverter());
-                });
-    }
-
-
-    @Test
     public void testExtRocketMQTemplate() {
         runner.withPropertyValues("rocketmq.name-server=127.0.0.1:9876").
                 withUserConfiguration(TestExtRocketMQTemplateConfig.class, CustomObjectMappersConfig.class).
@@ -301,7 +260,7 @@
 
     }
 
-    @RocketMQTransactionListener(txProducerGroup = "${demo.rocketmq.transaction.producer.group}")
+    @RocketMQTransactionListener
     static class TestRocketMQLocalTransactionListener implements RocketMQLocalTransactionListener {