Merge pull request #152 from zkzlx/message

[ISSUE #147]An enhancement about the convert in RocketMQTemplate
diff --git a/rocketmq-spring-boot-samples/pom.xml b/rocketmq-spring-boot-samples/pom.xml
index e348e15..c513a5d 100644
--- a/rocketmq-spring-boot-samples/pom.xml
+++ b/rocketmq-spring-boot-samples/pom.xml
@@ -38,7 +38,7 @@
     </modules>
 
     <properties>
-        <rocketmq-spring-boot-starter-version>2.0.3</rocketmq-spring-boot-starter-version>
+        <rocketmq-spring-boot-starter-version>2.0.4-SNAPSHOT</rocketmq-spring-boot-starter-version>
     </properties>
 
     <dependencies>
diff --git a/rocketmq-spring-boot-samples/rocketmq-consume-demo/src/main/java/org/apache/rocketmq/samples/springboot/consumer/MessageExtConsumer.java b/rocketmq-spring-boot-samples/rocketmq-consume-demo/src/main/java/org/apache/rocketmq/samples/springboot/consumer/MessageExtConsumer.java
index 59ef263..3fa0b31 100644
--- a/rocketmq-spring-boot-samples/rocketmq-consume-demo/src/main/java/org/apache/rocketmq/samples/springboot/consumer/MessageExtConsumer.java
+++ b/rocketmq-spring-boot-samples/rocketmq-consume-demo/src/main/java/org/apache/rocketmq/samples/springboot/consumer/MessageExtConsumer.java
@@ -30,7 +30,7 @@
  * MessageExtConsumer, consume listener impl class.
  */
 @Service
-@RocketMQMessageListener(topic = "message-ext-topic", selectorExpression = "tag1", consumerGroup = "${spring.application.name}-message-ext-consumer")
+@RocketMQMessageListener(topic = "${demo.rocketmq.msgExtTopic}", selectorExpression = "tag0||tag1", consumerGroup = "${spring.application.name}-message-ext-consumer")
 public class MessageExtConsumer implements RocketMQListener<MessageExt>, RocketMQPushConsumerLifecycleListener {
     @Override
     public void onMessage(MessageExt message) {
diff --git a/rocketmq-spring-boot-samples/rocketmq-consume-demo/src/main/java/org/apache/rocketmq/samples/springboot/consumer/OrderPaidEventConsumer.java b/rocketmq-spring-boot-samples/rocketmq-consume-demo/src/main/java/org/apache/rocketmq/samples/springboot/consumer/OrderPaidEventConsumer.java
index 677e0ea..04cb17f 100644
--- a/rocketmq-spring-boot-samples/rocketmq-consume-demo/src/main/java/org/apache/rocketmq/samples/springboot/consumer/OrderPaidEventConsumer.java
+++ b/rocketmq-spring-boot-samples/rocketmq-consume-demo/src/main/java/org/apache/rocketmq/samples/springboot/consumer/OrderPaidEventConsumer.java
@@ -31,6 +31,6 @@
 
     @Override
     public void onMessage(OrderPaidEvent orderPaidEvent) {
-        System.out.printf("------- OrderPaidEventConsumer received: %s \n", orderPaidEvent);
+        System.out.printf("------- OrderPaidEventConsumer received: %s [orderId : %s]\n", orderPaidEvent,orderPaidEvent.getOrderId());
     }
 }
diff --git a/rocketmq-spring-boot-samples/rocketmq-consume-demo/src/main/java/org/apache/rocketmq/samples/springboot/consumer/UserConsumer.java b/rocketmq-spring-boot-samples/rocketmq-consume-demo/src/main/java/org/apache/rocketmq/samples/springboot/consumer/UserConsumer.java
new file mode 100644
index 0000000..b9e3647
--- /dev/null
+++ b/rocketmq-spring-boot-samples/rocketmq-consume-demo/src/main/java/org/apache/rocketmq/samples/springboot/consumer/UserConsumer.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.samples.springboot.consumer;
+
+import org.apache.rocketmq.samples.springboot.domain.User;
+import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
+import org.apache.rocketmq.spring.core.RocketMQListener;
+import org.springframework.stereotype.Service;
+
+/**
+ * RocketMQMessageListener
+ */
+@Service
+@RocketMQMessageListener(nameServer = "${demo.rocketmq.myNameServer}", topic = "${demo.rocketmq.topic.user}", consumerGroup = "user_consumer")
+public class UserConsumer implements RocketMQListener<User> {
+    @Override
+    public void onMessage(User message) {
+
+        System.out.printf("######## user_consumer received: %s ; age: %s ; name: %s \n", message, message.getUserAge(), message.getUserName());
+    }
+}
diff --git a/rocketmq-spring-boot-samples/rocketmq-consume-demo/src/main/java/org/apache/rocketmq/samples/springboot/domain/User.java b/rocketmq-spring-boot-samples/rocketmq-consume-demo/src/main/java/org/apache/rocketmq/samples/springboot/domain/User.java
new file mode 100644
index 0000000..4f2579f
--- /dev/null
+++ b/rocketmq-spring-boot-samples/rocketmq-consume-demo/src/main/java/org/apache/rocketmq/samples/springboot/domain/User.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.samples.springboot.domain;
+
+public class User {
+    private String userName;
+    private Byte userAge;
+
+    public String getUserName() {
+        return userName;
+    }
+
+    public User setUserName(String userName) {
+        this.userName = userName;
+        return this;
+    }
+
+    public Byte getUserAge() {
+        return userAge;
+    }
+
+    public User setUserAge(Byte userAge) {
+        this.userAge = userAge;
+        return this;
+    }
+
+    @Override
+    public String toString() {
+        return "User{" +
+            "userName='" + userName + '\'' +
+            ", userAge=" + userAge +
+            '}';
+    }
+}
\ No newline at end of file
diff --git a/rocketmq-spring-boot-samples/rocketmq-consume-demo/src/main/resources/application.properties b/rocketmq-spring-boot-samples/rocketmq-consume-demo/src/main/resources/application.properties
index 404cb10..583cde4 100644
--- a/rocketmq-spring-boot-samples/rocketmq-consume-demo/src/main/resources/application.properties
+++ b/rocketmq-spring-boot-samples/rocketmq-consume-demo/src/main/resources/application.properties
@@ -7,6 +7,7 @@
 demo.rocketmq.orderTopic=order-paid-topic
 demo.rocketmq.msgExtTopic=message-ext-topic
 demo.rocketmq.transTopic=spring-transaction-topic
+demo.rocketmq.topic.user=user-topic
 
 # another nameserver different global
 demo.rocketmq.myNameServer=127.0.0.1:9876
\ No newline at end of file
diff --git a/rocketmq-spring-boot-samples/rocketmq-produce-demo/src/main/java/org/apache/rocketmq/samples/springboot/ProducerApplication.java b/rocketmq-spring-boot-samples/rocketmq-produce-demo/src/main/java/org/apache/rocketmq/samples/springboot/ProducerApplication.java
index 8ab42e1..8abc4ff 100644
--- a/rocketmq-spring-boot-samples/rocketmq-produce-demo/src/main/java/org/apache/rocketmq/samples/springboot/ProducerApplication.java
+++ b/rocketmq-spring-boot-samples/rocketmq-produce-demo/src/main/java/org/apache/rocketmq/samples/springboot/ProducerApplication.java
@@ -20,6 +20,7 @@
 import org.apache.rocketmq.client.producer.SendCallback;
 import org.apache.rocketmq.client.producer.SendResult;
 import org.apache.rocketmq.samples.springboot.domain.OrderPaidEvent;
+import org.apache.rocketmq.samples.springboot.domain.User;
 import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;
 import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener;
 import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState;
@@ -30,8 +31,10 @@
 import org.springframework.boot.SpringApplication;
 import org.springframework.boot.autoconfigure.SpringBootApplication;
 import org.springframework.messaging.Message;
+import org.springframework.messaging.MessageHeaders;
 import org.springframework.messaging.MessagingException;
 import org.springframework.messaging.support.MessageBuilder;
+import org.springframework.util.MimeTypeUtils;
 
 import javax.annotation.Resource;
 import java.math.BigDecimal;
@@ -52,6 +55,9 @@
     private String springTransTopic;
     @Value("${demo.rocketmq.topic}")
     private String springTopic;
+    @Value("${demo.rocketmq.topic.user}")
+    private String userTopic;
+
     @Value("${demo.rocketmq.orderTopic}")
     private String orderPaidTopic;
     @Value("${demo.rocketmq.msgExtTopic}")
@@ -69,8 +75,15 @@
         SendResult sendResult = rocketMQTemplate.syncSend(springTopic, "Hello, World!");
         System.out.printf("syncSend1 to topic %s sendResult=%s %n", springTopic, sendResult);
 
+        sendResult = rocketMQTemplate.syncSend(userTopic, new User().setUserAge((byte)18).setUserName("Kitty"));
+        System.out.printf("syncSend1 to topic %s sendResult=%s %n", userTopic, sendResult);
+
+        sendResult = rocketMQTemplate.syncSend(userTopic, MessageBuilder.withPayload(
+            new User().setUserAge((byte)21).setUserName("Lester")).setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.APPLICATION_JSON_VALUE).build());
+        System.out.printf("syncSend1 to topic %s sendResult=%s %n", userTopic, sendResult);
+
         // Use the extRocketMQTemplate
-        sendResult = extRocketMQTemplate.syncSend(springTopic, "Hello, World!");
+        sendResult = extRocketMQTemplate.syncSend(springTopic, MessageBuilder.withPayload("Hello, World!2222".getBytes()).build());
         System.out.printf("extRocketMQTemplate.syncSend1 to topic %s sendResult=%s %n", springTopic, sendResult);
 
         // Send string with spring Message
@@ -79,10 +92,12 @@
 
         // Send user-defined object
         rocketMQTemplate.asyncSend(orderPaidTopic, new OrderPaidEvent("T_001", new BigDecimal("88.00")), new SendCallback() {
+            @Override
             public void onSuccess(SendResult var1) {
                 System.out.printf("async onSucess SendResult=%s %n", var1);
             }
 
+            @Override
             public void onException(Throwable var1) {
                 System.out.printf("async onException Throwable=%s %n", var1);
             }
@@ -95,7 +110,6 @@
         rocketMQTemplate.convertAndSend(msgExtTopic + ":tag1", "I'm from tag1");
         System.out.printf("syncSend topic %s tag %s %n", msgExtTopic, "tag1");
 
-
         // Send a batch of strings
         testBatchMessages();
 
@@ -107,7 +121,7 @@
         List<Message> msgs = new ArrayList<Message>();
         for (int i = 0; i < 10; i++) {
             msgs.add(MessageBuilder.withPayload("Hello RocketMQ Batch Msg#" + i).
-                    setHeader(RocketMQHeaders.KEYS, "KEY_" + i).build());
+                setHeader(RocketMQHeaders.KEYS, "KEY_" + i).build());
         }
 
         SendResult sr = rocketMQTemplate.syncSend(springTopic, msgs, 60000);
@@ -115,9 +129,8 @@
         System.out.printf("--- Batch messages send result :" + sr);
     }
 
-
     private void testTransaction() throws MessagingException {
-        String[] tags = new String[]{"TagA", "TagB", "TagC", "TagD", "TagE"};
+        String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};
         for (int i = 0; i < 10; i++) {
             try {
 
diff --git a/rocketmq-spring-boot-samples/rocketmq-produce-demo/src/main/java/org/apache/rocketmq/samples/springboot/domain/User.java b/rocketmq-spring-boot-samples/rocketmq-produce-demo/src/main/java/org/apache/rocketmq/samples/springboot/domain/User.java
new file mode 100644
index 0000000..42acc11
--- /dev/null
+++ b/rocketmq-spring-boot-samples/rocketmq-produce-demo/src/main/java/org/apache/rocketmq/samples/springboot/domain/User.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.samples.springboot.domain;
+
+public class User {
+        private String userName;
+        private Byte userAge;
+
+        public String getUserName() {
+            return userName;
+        }
+
+        public User setUserName(String userName) {
+            this.userName = userName;
+            return this;
+        }
+
+        public Byte getUserAge() {
+            return userAge;
+        }
+
+        public User setUserAge(Byte userAge) {
+            this.userAge = userAge;
+            return this;
+        }
+
+        @Override
+        public String toString() {
+            return "User{" +
+                "userName='" + userName + '\'' +
+                ", userAge=" + userAge +
+                '}';
+        }
+    }
\ No newline at end of file
diff --git a/rocketmq-spring-boot-samples/rocketmq-produce-demo/src/main/resources/application.properties b/rocketmq-spring-boot-samples/rocketmq-produce-demo/src/main/resources/application.properties
index 3a68505..27a3abc 100644
--- a/rocketmq-spring-boot-samples/rocketmq-produce-demo/src/main/resources/application.properties
+++ b/rocketmq-spring-boot-samples/rocketmq-produce-demo/src/main/resources/application.properties
@@ -7,5 +7,6 @@
 demo.rocketmq.orderTopic=order-paid-topic
 demo.rocketmq.msgExtTopic=message-ext-topic
 demo.rocketmq.transTopic=spring-transaction-topic
+demo.rocketmq.topic.user=user-topic
 
 demo.rocketmq.extNameServer=127.0.0.1:9876
\ No newline at end of file
diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/ExtProducerResetConfiguration.java b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/ExtProducerResetConfiguration.java
index 192bfc9..0e1e37e 100644
--- a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/ExtProducerResetConfiguration.java
+++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/ExtProducerResetConfiguration.java
@@ -17,13 +17,16 @@
 
 package org.apache.rocketmq.spring.autoconfigure;
 
-import com.fasterxml.jackson.databind.ObjectMapper;
+import java.util.Map;
+import java.util.Objects;
+
 import org.apache.rocketmq.acl.common.AclClientRPCHook;
 import org.apache.rocketmq.acl.common.SessionCredentials;
 import org.apache.rocketmq.client.exception.MQClientException;
 import org.apache.rocketmq.client.producer.DefaultMQProducer;
 import org.apache.rocketmq.spring.annotation.ExtRocketMQTemplateConfiguration;
 import org.apache.rocketmq.spring.core.RocketMQTemplate;
+import org.apache.rocketmq.spring.support.RocketMQMessageConverter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.aop.framework.AopProxyUtils;
@@ -38,9 +41,7 @@
 import org.springframework.core.env.StandardEnvironment;
 import org.springframework.util.StringUtils;
 
-import java.util.Map;
-import java.util.Objects;
-
+import com.fasterxml.jackson.databind.ObjectMapper;
 
 @Configuration
 public class ExtProducerResetConfiguration implements ApplicationContextAware, SmartInitializingSingleton {
@@ -53,10 +54,12 @@
     private RocketMQProperties rocketMQProperties;
 
     private ObjectMapper objectMapper;
+    private RocketMQMessageConverter rocketMQMessageConverter;
 
     public ExtProducerResetConfiguration(ObjectMapper rocketMQMessageObjectMapper,
-                                             StandardEnvironment environment,
-                                             RocketMQProperties rocketMQProperties) {
+        RocketMQMessageConverter rocketMQMessageConverter,
+        StandardEnvironment environment, RocketMQProperties rocketMQProperties) {
+        this.rocketMQMessageConverter = rocketMQMessageConverter;
         this.objectMapper = rocketMQMessageObjectMapper;
         this.environment = environment;
         this.rocketMQProperties = rocketMQProperties;
@@ -64,7 +67,7 @@
 
     @Override
     public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
-        this.applicationContext = (ConfigurableApplicationContext) applicationContext;
+        this.applicationContext = (ConfigurableApplicationContext)applicationContext;
     }
 
     @Override
@@ -84,7 +87,7 @@
         }
 
         ExtRocketMQTemplateConfiguration annotation = clazz.getAnnotation(ExtRocketMQTemplateConfiguration.class);
-        GenericApplicationContext genericApplicationContext = (GenericApplicationContext) applicationContext;
+        GenericApplicationContext genericApplicationContext = (GenericApplicationContext)applicationContext;
         validate(annotation, genericApplicationContext);
 
         DefaultMQProducer mqProducer = createProducer(annotation);
@@ -94,13 +97,12 @@
             mqProducer.start();
         } catch (MQClientException e) {
             throw new BeanDefinitionValidationException(String.format("Failed to startup MQProducer for RocketMQTemplate {}",
-                    beanName), e);
+                beanName), e);
         }
-        RocketMQTemplate rocketMQTemplate = (RocketMQTemplate) bean;
+        RocketMQTemplate rocketMQTemplate = (RocketMQTemplate)bean;
         rocketMQTemplate.setProducer(mqProducer);
+        rocketMQTemplate.setMessageConverter(rocketMQMessageConverter.getMessageConverter());
         rocketMQTemplate.setObjectMapper(objectMapper);
-
-
         log.info("Set real producer to :{} {}", beanName, annotation.value());
     }
 
@@ -124,7 +126,7 @@
 
         if (!StringUtils.isEmpty(ak) && !StringUtils.isEmpty(sk)) {
             producer = new DefaultMQProducer(groupName, new AclClientRPCHook(new SessionCredentials(ak, sk)),
-                    annotation.enableMsgTrace(), customizedTraceTopic);
+                annotation.enableMsgTrace(), customizedTraceTopic);
             producer.setVipChannelEnabled(false);
         } else {
             producer = new DefaultMQProducer(groupName, annotation.enableMsgTrace(), customizedTraceTopic);
@@ -141,18 +143,19 @@
         return producer;
     }
 
-    private void validate(ExtRocketMQTemplateConfiguration annotation, GenericApplicationContext genericApplicationContext) {
+    private void validate(ExtRocketMQTemplateConfiguration annotation,
+        GenericApplicationContext genericApplicationContext) {
         if (genericApplicationContext.isBeanNameInUse(annotation.value())) {
             throw new BeanDefinitionValidationException(String.format("Bean {} has been used in Spring Application Context, " +
-                            "please check the @ExtRocketMQTemplateConfiguration",
-                    annotation.value()));
+                    "please check the @ExtRocketMQTemplateConfiguration",
+                annotation.value()));
         }
 
         if (rocketMQProperties.getNameServer() == null ||
-                rocketMQProperties.getNameServer().equals(environment.resolvePlaceholders(annotation.nameServer()))) {
+            rocketMQProperties.getNameServer().equals(environment.resolvePlaceholders(annotation.nameServer()))) {
             throw new BeanDefinitionValidationException(
-                    "Bad annotation definition in @ExtRocketMQTemplateConfiguration, nameServer property is same with " +
-                            "global property, please use the default RocketMQTemplate!");
+                "Bad annotation definition in @ExtRocketMQTemplateConfiguration, nameServer property is same with " +
+                    "global property, please use the default RocketMQTemplate!");
         }
     }
 }
\ No newline at end of file
diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/JacksonFallbackConfiguration.java b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/JacksonFallbackConfiguration.java
index d25ca8d..b6518c0 100644
--- a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/JacksonFallbackConfiguration.java
+++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/JacksonFallbackConfiguration.java
@@ -24,6 +24,7 @@
 
 @Configuration
 @ConditionalOnMissingBean(ObjectMapper.class)
+@Deprecated
 class JacksonFallbackConfiguration {
 
     @Bean
diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/ListenerContainerConfiguration.java b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/ListenerContainerConfiguration.java
index 927bf44..6462604 100644
--- a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/ListenerContainerConfiguration.java
+++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/ListenerContainerConfiguration.java
@@ -17,13 +17,18 @@
 
 package org.apache.rocketmq.spring.autoconfigure;
 
-import com.fasterxml.jackson.databind.ObjectMapper;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.atomic.AtomicLong;
+
 import org.apache.rocketmq.client.AccessChannel;
 import org.apache.rocketmq.spring.annotation.ConsumeMode;
 import org.apache.rocketmq.spring.annotation.MessageModel;
 import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
 import org.apache.rocketmq.spring.core.RocketMQListener;
 import org.apache.rocketmq.spring.support.DefaultRocketMQListenerContainer;
+import org.apache.rocketmq.spring.support.RocketMQMessageConverter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.aop.framework.AopProxyUtils;
@@ -38,11 +43,7 @@
 import org.springframework.core.env.StandardEnvironment;
 import org.springframework.util.StringUtils;
 
-import java.util.Collections;
-import java.util.Map;
-import java.util.Objects;
-import java.util.concurrent.atomic.AtomicLong;
-
+import com.fasterxml.jackson.databind.ObjectMapper;
 
 @Configuration
 public class ListenerContainerConfiguration implements ApplicationContextAware, SmartInitializingSingleton {
@@ -57,18 +58,20 @@
     private RocketMQProperties rocketMQProperties;
 
     private ObjectMapper objectMapper;
+    private RocketMQMessageConverter rocketMQMessageConverter;
 
     public ListenerContainerConfiguration(ObjectMapper rocketMQMessageObjectMapper,
-        StandardEnvironment environment,
-        RocketMQProperties rocketMQProperties) {
+        RocketMQMessageConverter rocketMQMessageConverter,
+        StandardEnvironment environment, RocketMQProperties rocketMQProperties) {
         this.objectMapper = rocketMQMessageObjectMapper;
+        this.rocketMQMessageConverter = rocketMQMessageConverter;
         this.environment = environment;
         this.rocketMQProperties = rocketMQProperties;
     }
 
     @Override
     public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
-        this.applicationContext = (ConfigurableApplicationContext) applicationContext;
+        this.applicationContext = (ConfigurableApplicationContext)applicationContext;
     }
 
     @Override
@@ -106,7 +109,7 @@
 
         String containerBeanName = String.format("%s_%s", DefaultRocketMQListenerContainer.class.getName(),
             counter.incrementAndGet());
-        GenericApplicationContext genericApplicationContext = (GenericApplicationContext) applicationContext;
+        GenericApplicationContext genericApplicationContext = (GenericApplicationContext)applicationContext;
 
         genericApplicationContext.registerBean(containerBeanName, DefaultRocketMQListenerContainer.class,
             () -> createRocketMQListenerContainer(containerBeanName, bean, annotation));
@@ -124,7 +127,8 @@
         log.info("Register the listener to container, listenerBeanName:{}, containerBeanName:{}", beanName, containerBeanName);
     }
 
-    private DefaultRocketMQListenerContainer createRocketMQListenerContainer(String name, Object bean, RocketMQMessageListener annotation) {
+    private DefaultRocketMQListenerContainer createRocketMQListenerContainer(String name, Object bean,
+        RocketMQMessageListener annotation) {
         DefaultRocketMQListenerContainer container = new DefaultRocketMQListenerContainer();
         
         container.setRocketMQMessageListener(annotation);
@@ -142,8 +146,10 @@
             container.setSelectorExpression(tags);
         }
         container.setConsumerGroup(environment.resolvePlaceholders(annotation.consumerGroup()));
-        container.setRocketMQListener((RocketMQListener) bean);
+        container.setRocketMQMessageListener(annotation);
+        container.setRocketMQListener((RocketMQListener)bean);
         container.setObjectMapper(objectMapper);
+        container.setMessageConverter(rocketMQMessageConverter.getMessageConverter());
         container.setName(name);  // REVIEW ME, use the same clientId or multiple?
 
         return container;
diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/MessageConverterConfiguration.java b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/MessageConverterConfiguration.java
new file mode 100644
index 0000000..5f7e419
--- /dev/null
+++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/MessageConverterConfiguration.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.spring.autoconfigure;
+
+import org.apache.rocketmq.spring.support.RocketMQMessageConverter;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+
+/**
+ * @see RocketMQMessageConverter
+ */
+@Configuration
+@ConditionalOnMissingBean(RocketMQMessageConverter.class)
+class MessageConverterConfiguration {
+
+    @Bean
+    public RocketMQMessageConverter createRocketMQMessageConverter() {
+        return new RocketMQMessageConverter();
+    }
+
+}
diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/RocketMQAutoConfiguration.java b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/RocketMQAutoConfiguration.java
index 5196369..dbe697b 100644
--- a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/RocketMQAutoConfiguration.java
+++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/RocketMQAutoConfiguration.java
@@ -17,7 +17,8 @@
 
 package org.apache.rocketmq.spring.autoconfigure;
 
-import com.fasterxml.jackson.databind.ObjectMapper;
+import javax.annotation.PostConstruct;
+
 import org.apache.rocketmq.acl.common.AclClientRPCHook;
 import org.apache.rocketmq.acl.common.SessionCredentials;
 import org.apache.rocketmq.client.AccessChannel;
@@ -27,6 +28,7 @@
 import org.apache.rocketmq.spring.config.RocketMQTransactionAnnotationProcessor;
 import org.apache.rocketmq.spring.config.TransactionHandlerRegistry;
 import org.apache.rocketmq.spring.core.RocketMQTemplate;
+import org.apache.rocketmq.spring.support.RocketMQMessageConverter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -37,7 +39,6 @@
 import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
 import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
 import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
-import org.springframework.boot.autoconfigure.jackson.JacksonAutoConfiguration;
 import org.springframework.boot.context.properties.EnableConfigurationProperties;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
@@ -47,14 +48,14 @@
 import org.springframework.util.Assert;
 import org.springframework.util.StringUtils;
 
-import javax.annotation.PostConstruct;
+import com.fasterxml.jackson.databind.ObjectMapper;
 
 @Configuration
 @EnableConfigurationProperties(RocketMQProperties.class)
-@ConditionalOnClass({ MQAdmin.class, ObjectMapper.class })
+@ConditionalOnClass({MQAdmin.class, ObjectMapper.class})
 @ConditionalOnProperty(prefix = "rocketmq", value = "name-server", matchIfMissing = true)
-@Import({ JacksonFallbackConfiguration.class, ListenerContainerConfiguration.class, ExtProducerResetConfiguration.class })
-@AutoConfigureAfter(JacksonAutoConfiguration.class)
+@Import({JacksonFallbackConfiguration.class, MessageConverterConfiguration.class, ListenerContainerConfiguration.class, ExtProducerResetConfiguration.class})
+@AutoConfigureAfter({JacksonFallbackConfiguration.class, MessageConverterConfiguration.class})
 public class RocketMQAutoConfiguration {
     private static final Logger log = LoggerFactory.getLogger(RocketMQAutoConfiguration.class);
 
@@ -70,7 +71,6 @@
         }
     }
 
-
     @Bean
     @ConditionalOnMissingBean(DefaultMQProducer.class)
     @ConditionalOnProperty(prefix = "rocketmq", value = {"name-server", "producer.group"})
@@ -113,18 +113,21 @@
     @Bean(destroyMethod = "destroy")
     @ConditionalOnBean(DefaultMQProducer.class)
     @ConditionalOnMissingBean(name = RocketMQConfigUtils.ROCKETMQ_TEMPLATE_DEFAULT_GLOBAL_NAME)
-    public RocketMQTemplate rocketMQTemplate(DefaultMQProducer mqProducer, ObjectMapper rocketMQMessageObjectMapper) {
+    public RocketMQTemplate rocketMQTemplate(DefaultMQProducer mqProducer,
+        ObjectMapper rocketMQMessageObjectMapper,
+        RocketMQMessageConverter rocketMQMessageConverter) {
         RocketMQTemplate rocketMQTemplate = new RocketMQTemplate();
         rocketMQTemplate.setProducer(mqProducer);
         rocketMQTemplate.setObjectMapper(rocketMQMessageObjectMapper);
+        rocketMQTemplate.setMessageConverter(rocketMQMessageConverter.getMessageConverter());
         return rocketMQTemplate;
     }
 
     @Bean
     @ConditionalOnBean(name = RocketMQConfigUtils.ROCKETMQ_TEMPLATE_DEFAULT_GLOBAL_NAME)
     @ConditionalOnMissingBean(TransactionHandlerRegistry.class)
-    public TransactionHandlerRegistry transactionHandlerRegistry(@Qualifier(RocketMQConfigUtils.ROCKETMQ_TEMPLATE_DEFAULT_GLOBAL_NAME)
-                                                                             RocketMQTemplate template) {
+    public TransactionHandlerRegistry transactionHandlerRegistry(
+        @Qualifier(RocketMQConfigUtils.ROCKETMQ_TEMPLATE_DEFAULT_GLOBAL_NAME) RocketMQTemplate template) {
         return new TransactionHandlerRegistry(template);
     }
 
diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/config/TransactionHandlerRegistry.java b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/config/TransactionHandlerRegistry.java
index 7307a31..0ec0161 100644
--- a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/config/TransactionHandlerRegistry.java
+++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/config/TransactionHandlerRegistry.java
@@ -38,15 +38,15 @@
         listenerContainers.clear();
     }
 
-    public void registerTransactionHandler(TransactionHandler handler) throws MQClientException {
+    public void registerTransactionHandler(TransactionHandler handler)
+        throws MQClientException {
         if (listenerContainers.contains(handler.getName())) {
-            throw new MQClientException(-1,
-                String
-                    .format("The transaction name [%s] has been defined in TransactionListener [%s]", handler.getName(),
-                        handler.getBeanName()));
+            throw new MQClientException(-1, String.format(
+                "The transaction name [%s] has been defined in TransactionListener [%s]",
+                handler.getName(), handler.getBeanName()));
         }
         listenerContainers.add(handler.getName());
-
-        rocketMQTemplate.createAndStartTransactionMQProducer(handler.getName(), handler.getListener(), handler.getCheckExecutor(), handler.getRpcHook());
+        rocketMQTemplate.createAndStartTransactionMQProducer(handler.getName(),
+            handler.getListener(), handler.getCheckExecutor(), handler.getRpcHook());
     }
 }
diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/core/RocketMQTemplate.java b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/core/RocketMQTemplate.java
index 528615d..a2431cf 100644
--- a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/core/RocketMQTemplate.java
+++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/core/RocketMQTemplate.java
@@ -17,8 +17,13 @@
 
 package org.apache.rocketmq.spring.core;
 
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.ObjectMapper;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+
 import org.apache.rocketmq.client.exception.MQClientException;
 import org.apache.rocketmq.client.producer.DefaultMQProducer;
 import org.apache.rocketmq.client.producer.MessageQueueSelector;
@@ -30,6 +35,8 @@
 import org.apache.rocketmq.remoting.RPCHook;
 import org.apache.rocketmq.spring.config.RocketMQConfigUtils;
 import org.apache.rocketmq.spring.support.RocketMQUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.DisposableBean;
 import org.springframework.beans.factory.InitializingBean;
 import org.springframework.messaging.Message;
@@ -41,21 +48,15 @@
 import org.springframework.util.Assert;
 import org.springframework.util.MimeTypeUtils;
 
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Map;
-import java.util.Objects;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutorService;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import com.fasterxml.jackson.databind.ObjectMapper;
 
 @SuppressWarnings({"WeakerAccess", "unused"})
 public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> implements InitializingBean, DisposableBean {
-    private static final  Logger log = LoggerFactory.getLogger(RocketMQTemplate.class);
+    private static final Logger log = LoggerFactory.getLogger(RocketMQTemplate.class);
 
     private DefaultMQProducer producer;
 
+    @Deprecated
     private ObjectMapper objectMapper;
 
     private String charset = "UTF-8";
@@ -72,10 +73,12 @@
         this.producer = producer;
     }
 
+    @Deprecated
     public ObjectMapper getObjectMapper() {
         return objectMapper;
     }
 
+    @Deprecated
     public void setObjectMapper(ObjectMapper objectMapper) {
         this.objectMapper = objectMapper;
     }
@@ -143,19 +146,19 @@
         try {
             long now = System.currentTimeMillis();
             Collection<org.apache.rocketmq.common.message.Message> rmqMsgs = new ArrayList<>();
-            org.apache.rocketmq.common.message.Message rocketMsg;
-            for (Message msg:messages) {
+            for (Message msg : messages) {
                 if (Objects.isNull(msg) || Objects.isNull(msg.getPayload())) {
                     log.warn("Found a message empty in the batch, skip it");
                     continue;
                 }
-                rocketMsg = RocketMQUtil.convertToRocketMessage(objectMapper, charset, destination, msg);
-                rmqMsgs.add(rocketMsg);
+                rmqMsgs.add(this.createRocketMqMessage(destination, msg));
             }
 
             SendResult sendResult = producer.send(rmqMsgs, timeout);
             long costTime = System.currentTimeMillis() - now;
-            log.debug("send messages cost: {} ms, msgId:{}", costTime, sendResult.getMsgId());
+            if (log.isDebugEnabled()) {
+                log.debug("send messages cost: {} ms, msgId:{}", costTime, sendResult.getMsgId());
+            }
             return sendResult;
         } catch (Exception e) {
             log.error("syncSend with batch failed. destination:{}, messages.size:{} ", destination, messages.size());
@@ -177,17 +180,17 @@
             log.error("syncSend failed. destination:{}, message is null ", destination);
             throw new IllegalArgumentException("`message` and `message.payload` cannot be null");
         }
-
         try {
             long now = System.currentTimeMillis();
-            org.apache.rocketmq.common.message.Message rocketMsg = RocketMQUtil.convertToRocketMessage(objectMapper,
-                charset, destination, message);
+            org.apache.rocketmq.common.message.Message rocketMsg = this.createRocketMqMessage(destination, message);
             if (delayLevel > 0) {
                 rocketMsg.setDelayTimeLevel(delayLevel);
             }
             SendResult sendResult = producer.send(rocketMsg, timeout);
             long costTime = System.currentTimeMillis() - now;
-            log.debug("send message cost: {} ms, msgId:{}", costTime, sendResult.getMsgId());
+            if (log.isDebugEnabled()) {
+                log.debug("send message cost: {} ms, msgId:{}", costTime, sendResult.getMsgId());
+            }
             return sendResult;
         } catch (Exception e) {
             log.error("syncSend failed. destination:{}, message:{} ", destination, message);
@@ -215,7 +218,7 @@
      * @return {@link SendResult}
      */
     public SendResult syncSend(String destination, Object payload, long timeout) {
-        Message<?> message = this.doConvert(payload, null, null);
+        Message<?> message = MessageBuilder.withPayload(payload).build();
         return syncSend(destination, message, timeout);
     }
 
@@ -245,14 +248,14 @@
             log.error("syncSendOrderly failed. destination:{}, message is null ", destination);
             throw new IllegalArgumentException("`message` and `message.payload` cannot be null");
         }
-
         try {
             long now = System.currentTimeMillis();
-            org.apache.rocketmq.common.message.Message rocketMsg = RocketMQUtil.convertToRocketMessage(objectMapper,
-                charset, destination, message);
+            org.apache.rocketmq.common.message.Message rocketMsg = this.createRocketMqMessage(destination, message);
             SendResult sendResult = producer.send(rocketMsg, messageQueueSelector, hashKey, timeout);
             long costTime = System.currentTimeMillis() - now;
-            log.debug("send message cost: {} ms, msgId:{}", costTime, sendResult.getMsgId());
+            if (log.isDebugEnabled()) {
+                log.debug("send message cost: {} ms, msgId:{}", costTime, sendResult.getMsgId());
+            }
             return sendResult;
         } catch (Exception e) {
             log.error("syncSendOrderly failed. destination:{}, message:{} ", destination, message);
@@ -282,11 +285,13 @@
      * @return {@link SendResult}
      */
     public SendResult syncSendOrderly(String destination, Object payload, String hashKey, long timeout) {
-        Message<?> message = this.doConvert(payload, null, null);
+        Message<?> message = MessageBuilder.withPayload(payload).build();
         return syncSendOrderly(destination, message, hashKey, timeout);
     }
+
     /**
-     * Same to {@link #asyncSend(String, Message, SendCallback)} with send timeout and delay level specified in addition.
+     * Same to {@link #asyncSend(String, Message, SendCallback)} with send timeout and delay level specified in
+     * addition.
      *
      * @param destination  formats: `topicName:tags`
      * @param message      {@link org.springframework.messaging.Message}
@@ -294,15 +299,14 @@
      * @param timeout      send timeout with millis
      * @param delayLevel   level for the delay message
      */
-    public void asyncSend(String destination, Message<?> message, SendCallback sendCallback, long timeout, int delayLevel) {
+    public void asyncSend(String destination, Message<?> message, SendCallback sendCallback, long timeout,
+        int delayLevel) {
         if (Objects.isNull(message) || Objects.isNull(message.getPayload())) {
             log.error("asyncSend failed. destination:{}, message is null ", destination);
             throw new IllegalArgumentException("`message` and `message.payload` cannot be null");
         }
-
         try {
-            org.apache.rocketmq.common.message.Message rocketMsg = RocketMQUtil.convertToRocketMessage(objectMapper,
-                charset, destination, message);
+            org.apache.rocketmq.common.message.Message rocketMsg = this.createRocketMqMessage(destination, message);
             if (delayLevel > 0) {
                 rocketMsg.setDelayTimeLevel(delayLevel);
             }
@@ -312,6 +316,7 @@
             throw new MessagingException(e.getMessage(), e);
         }
     }
+
     /**
      * Same to {@link #asyncSend(String, Message, SendCallback)} with send timeout specified in addition.
      *
@@ -321,12 +326,12 @@
      * @param timeout      send timeout with millis
      */
     public void asyncSend(String destination, Message<?> message, SendCallback sendCallback, long timeout) {
-        asyncSend(destination,message,sendCallback,timeout,0);
+        asyncSend(destination, message, sendCallback, timeout, 0);
     }
 
     /**
-     * <p> Send message to broker asynchronously. asynchronous transmission is generally used in response time sensitive
-     * business scenarios. </p>
+     * <p> Send message to broker asynchronously. asynchronous transmission is generally used in response time
+     * sensitive business scenarios. </p>
      * <p>
      * This method returns immediately. On sending completion, <code>sendCallback</code> will be executed.
      * <p>
@@ -351,7 +356,7 @@
      * @param timeout      send timeout with millis
      */
     public void asyncSend(String destination, Object payload, SendCallback sendCallback, long timeout) {
-        Message<?> message = this.doConvert(payload, null, null);
+        Message<?> message = MessageBuilder.withPayload(payload).build();
         asyncSend(destination, message, sendCallback, timeout);
     }
 
@@ -377,15 +382,13 @@
      * @param timeout      send timeout with millis
      */
     public void asyncSendOrderly(String destination, Message<?> message, String hashKey, SendCallback sendCallback,
-                                 long timeout) {
+        long timeout) {
         if (Objects.isNull(message) || Objects.isNull(message.getPayload())) {
             log.error("asyncSendOrderly failed. destination:{}, message is null ", destination);
             throw new IllegalArgumentException("`message` and `message.payload` cannot be null");
         }
-
         try {
-            org.apache.rocketmq.common.message.Message rocketMsg = RocketMQUtil.convertToRocketMessage(objectMapper,
-                charset, destination, message);
+            org.apache.rocketmq.common.message.Message rocketMsg = this.createRocketMqMessage(destination, message);
             producer.send(rocketMsg, messageQueueSelector, hashKey, sendCallback, timeout);
         } catch (Exception e) {
             log.error("asyncSendOrderly failed. destination:{}, message:{} ", destination, message);
@@ -427,8 +430,8 @@
      * @param timeout      send timeout with millis
      */
     public void asyncSendOrderly(String destination, Object payload, String hashKey, SendCallback sendCallback,
-                                 long timeout) {
-        Message<?> message = this.doConvert(payload, null, null);
+        long timeout) {
+        Message<?> message = MessageBuilder.withPayload(payload).build();
         asyncSendOrderly(destination, message, hashKey, sendCallback, timeout);
     }
 
@@ -446,10 +449,8 @@
             log.error("sendOneWay failed. destination:{}, message is null ", destination);
             throw new IllegalArgumentException("`message` and `message.payload` cannot be null");
         }
-
         try {
-            org.apache.rocketmq.common.message.Message rocketMsg = RocketMQUtil.convertToRocketMessage(objectMapper,
-                charset, destination, message);
+            org.apache.rocketmq.common.message.Message rocketMsg = this.createRocketMqMessage(destination, message);
             producer.sendOneway(rocketMsg);
         } catch (Exception e) {
             log.error("sendOneWay failed. destination:{}, message:{} ", destination, message);
@@ -464,7 +465,7 @@
      * @param payload     the Object to use as payload
      */
     public void sendOneWay(String destination, Object payload) {
-        Message<?> message = this.doConvert(payload, null, null);
+        Message<?> message = MessageBuilder.withPayload(payload).build();
         sendOneWay(destination, message);
     }
 
@@ -480,10 +481,8 @@
             log.error("sendOneWayOrderly failed. destination:{}, message is null ", destination);
             throw new IllegalArgumentException("`message` and `message.payload` cannot be null");
         }
-
         try {
-            org.apache.rocketmq.common.message.Message rocketMsg = RocketMQUtil.convertToRocketMessage(objectMapper,
-                charset, destination, message);
+            org.apache.rocketmq.common.message.Message rocketMsg = this.createRocketMqMessage(destination, message);
             producer.sendOneway(rocketMsg, messageQueueSelector, hashKey);
         } catch (Exception e) {
             log.error("sendOneWayOrderly failed. destination:{}, message:{}", destination, message);
@@ -498,7 +497,7 @@
      * @param payload     the Object to use as payload
      */
     public void sendOneWayOrderly(String destination, Object payload, String hashKey) {
-        Message<?> message = this.doConvert(payload, null, null);
+        Message<?> message = MessageBuilder.withPayload(payload).build();
         sendOneWayOrderly(destination, message, hashKey);
     }
 
@@ -512,37 +511,17 @@
     @Override
     protected void doSend(String destination, Message<?> message) {
         SendResult sendResult = syncSend(destination, message);
-        log.debug("send message to `{}` finished. result:{}", destination, sendResult);
+        if (log.isDebugEnabled()) {
+            log.debug("send message to `{}` finished. result:{}", destination, sendResult);
+        }
     }
 
-
-
     @Override
     protected Message<?> doConvert(Object payload, Map<String, Object> headers, MessagePostProcessor postProcessor) {
-        String content;
-        if (payload instanceof String) {
-            content = (String) payload;
-        } else {
-            // If payload not as string, use objectMapper change it.
-            try {
-                content = objectMapper.writeValueAsString(payload);
-            } catch (JsonProcessingException e) {
-                log.error("convert payload to String failed. payload:{}", payload);
-                throw new RuntimeException("convert to payload to String failed.", e);
-            }
-        }
-
-        MessageBuilder<?> builder = MessageBuilder.withPayload(content);
-        if (headers != null) {
-            builder.copyHeaders(headers);
-        }
+        Message<?> message = super.doConvert(payload, headers, postProcessor);
+        MessageBuilder<?> builder = MessageBuilder.fromMessage(message);
         builder.setHeaderIfAbsent(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.TEXT_PLAIN);
-
-        Message<?> message = builder.build();
-        if (postProcessor != null) {
-            message = postProcessor.postProcessMessage(message);
-        }
-        return message;
+        return builder.build();
     }
 
     @Override
@@ -550,7 +529,6 @@
         if (Objects.nonNull(producer)) {
             producer.shutdown();
         }
-
         for (Map.Entry<String, TransactionMQProducer> kv : cache.entrySet()) {
             if (Objects.nonNull(kv.getValue())) {
                 kv.getValue().shutdown();
@@ -585,11 +563,11 @@
      * @return TransactionSendResult
      * @throws MessagingException
      */
-    public TransactionSendResult sendMessageInTransaction(final String txProducerGroup, final String destination, final Message<?> message, final Object arg) throws MessagingException {
+    public TransactionSendResult sendMessageInTransaction(final String txProducerGroup, final String destination,
+        final Message<?> message, final Object arg) throws MessagingException {
         try {
             TransactionMQProducer txProducer = this.stageMQProducer(txProducerGroup);
-            org.apache.rocketmq.common.message.Message rocketMsg = RocketMQUtil.convertToRocketMessage(objectMapper,
-                charset, destination, message);
+            org.apache.rocketmq.common.message.Message rocketMsg = this.createRocketMqMessage(destination, message);
             return txProducer.sendMessageInTransaction(rocketMsg, arg);
         } catch (MQClientException e) {
             throw RocketMQUtil.convert(e);
@@ -598,8 +576,8 @@
 
     /**
      * Remove a TransactionMQProducer from cache by manual.
-     * <p>Note: RocketMQTemplate can release all cached producers when bean destroying, it is not recommended to directly
-     * use this method by user.
+     * <p>Note: RocketMQTemplate can release all cached producers when bean destroying, it is not recommended to
+     * directly use this method by user.
      *
      * @param txProducerGroup
      * @throws MessagingException
@@ -621,13 +599,13 @@
      * @param txProducerGroup     Producer (group) name, unique for each producer
      * @param transactionListener TransactoinListener impl class
      * @param executorService     Nullable.
-     * @param rpcHook Nullable.
+     * @param rpcHook             Nullable.
      * @return true if producer is created and started; false if the named producer already exists in cache.
      * @throws MessagingException
      */
     public boolean createAndStartTransactionMQProducer(String txProducerGroup,
-                                                       RocketMQLocalTransactionListener transactionListener,
-                                                       ExecutorService executorService, RPCHook rpcHook) throws MessagingException {
+        RocketMQLocalTransactionListener transactionListener,
+        ExecutorService executorService, RPCHook rpcHook) throws MessagingException {
         txProducerGroup = getTxProducerGroupName(txProducerGroup);
         if (cache.containsKey(txProducerGroup)) {
             log.info(String.format("get TransactionMQProducer '%s' from cache", txProducerGroup));
@@ -646,8 +624,8 @@
     }
 
     private TransactionMQProducer createTransactionMQProducer(String name,
-                                                              RocketMQLocalTransactionListener transactionListener,
-                                                              ExecutorService executorService, RPCHook rpcHook) {
+        RocketMQLocalTransactionListener transactionListener,
+        ExecutorService executorService, RPCHook rpcHook) {
         Assert.notNull(producer, "Property 'producer' is required");
         Assert.notNull(transactionListener, "Parameter 'transactionListener' is required");
         TransactionMQProducer txProducer;
@@ -675,4 +653,12 @@
 
         return txProducer;
     }
+
+    private org.apache.rocketmq.common.message.Message createRocketMqMessage(
+        String destination, Message<?> message) {
+        Message<?> msg = this.doConvert(message.getPayload(), message.getHeaders(), null);
+        return RocketMQUtil.convertToRocketMessage(getMessageConverter(), charset,
+            destination, msg);
+    }
+
 }
diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainer.java b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainer.java
index 497d94b..603ce66 100644
--- a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainer.java
+++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainer.java
@@ -17,7 +17,12 @@
 
 package org.apache.rocketmq.spring.support;
 
-import com.fasterxml.jackson.databind.ObjectMapper;
+import java.lang.reflect.ParameterizedType;
+import java.lang.reflect.Type;
+import java.nio.charset.Charset;
+import java.util.List;
+import java.util.Objects;
+
 import org.apache.rocketmq.client.AccessChannel;
 import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
 import org.apache.rocketmq.client.consumer.MessageSelector;
@@ -45,13 +50,11 @@
 import org.springframework.context.ApplicationContext;
 import org.springframework.context.ApplicationContextAware;
 import org.springframework.context.SmartLifecycle;
+import org.springframework.messaging.converter.MessageConverter;
+import org.springframework.messaging.support.MessageBuilder;
 import org.springframework.util.Assert;
 
-import java.lang.reflect.ParameterizedType;
-import java.lang.reflect.Type;
-import java.nio.charset.Charset;
-import java.util.List;
-import java.util.Objects;
+import com.fasterxml.jackson.databind.ObjectMapper;
 
 @SuppressWarnings("WeakerAccess")
 public class DefaultRocketMQListenerContainer implements InitializingBean,
@@ -85,8 +88,11 @@
 
     private String charset = "UTF-8";
 
+    @Deprecated
     private ObjectMapper objectMapper;
 
+    private MessageConverter messageConverter;
+
     private RocketMQListener rocketMQListener;
 
     private RocketMQMessageListener rocketMQMessageListener;
@@ -164,14 +170,24 @@
         this.charset = charset;
     }
 
+    @Deprecated
     public ObjectMapper getObjectMapper() {
         return objectMapper;
     }
 
+    @Deprecated
     public void setObjectMapper(ObjectMapper objectMapper) {
         this.objectMapper = objectMapper;
     }
 
+    public MessageConverter getMessageConverter() {
+        return messageConverter;
+    }
+
+    public DefaultRocketMQListenerContainer setMessageConverter(MessageConverter messageConverter) {
+        this.messageConverter = messageConverter;
+        return this;
+    }
 
     public RocketMQListener getRocketMQListener() {
         return rocketMQListener;
@@ -380,7 +396,7 @@
             } else {
                 // If msgType not string, use objectMapper change it.
                 try {
-                    return objectMapper.readValue(str, messageType);
+                    return this.getMessageConverter().fromMessage(MessageBuilder.withPayload(str).build(), messageType);
                 } catch (Exception e) {
                     log.info("convert failed. str:{}, msgType:{}", str, messageType);
                     throw new RuntimeException("cannot convert message to " + messageType, e);
diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/RocketMQMessageConverter.java b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/RocketMQMessageConverter.java
new file mode 100644
index 0000000..c2ee1da
--- /dev/null
+++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/RocketMQMessageConverter.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.spring.support;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Objects;
+
+import org.springframework.messaging.converter.ByteArrayMessageConverter;
+import org.springframework.messaging.converter.CompositeMessageConverter;
+import org.springframework.messaging.converter.MappingJackson2MessageConverter;
+import org.springframework.messaging.converter.MessageConverter;
+import org.springframework.messaging.converter.StringMessageConverter;
+import org.springframework.util.ClassUtils;
+
+/**
+ * @see MessageConverter
+ * @see CompositeMessageConverter
+ */
+public class RocketMQMessageConverter {
+
+    private static final boolean JACKSON_PRESENT;
+    private static final boolean FASTJSON_PRESENT;
+
+    static {
+        ClassLoader classLoader = RocketMQMessageConverter.class.getClassLoader();
+        JACKSON_PRESENT =
+            ClassUtils.isPresent("com.fasterxml.jackson.databind.ObjectMapper", classLoader) &&
+                ClassUtils.isPresent("com.fasterxml.jackson.core.JsonGenerator", classLoader);
+        FASTJSON_PRESENT = ClassUtils.isPresent("com.alibaba.fastjson.JSON", classLoader) &&
+            ClassUtils.isPresent("com.alibaba.fastjson.support.config.FastJsonConfig", classLoader);
+    }
+
+    private final CompositeMessageConverter messageConverter;
+
+    public RocketMQMessageConverter() {
+        List<MessageConverter> messageConverters = new ArrayList<>();
+        ByteArrayMessageConverter byteArrayMessageConverter = new ByteArrayMessageConverter();
+        byteArrayMessageConverter.setContentTypeResolver(null);
+        messageConverters.add(byteArrayMessageConverter);
+        messageConverters.add(new StringMessageConverter());
+        if (JACKSON_PRESENT) {
+            messageConverters.add(new MappingJackson2MessageConverter());
+        }
+        if (FASTJSON_PRESENT) {
+            try {
+                messageConverters.add(
+                    (MessageConverter)ClassUtils.forName(
+                        "com.alibaba.fastjson.support.spring.messaging.MappingFastJsonMessageConverter",
+                        ClassUtils.getDefaultClassLoader()).newInstance());
+            } catch (ClassNotFoundException | IllegalAccessException | InstantiationException ignored) {
+                //ignore this exception
+            }
+        }
+        messageConverter = new CompositeMessageConverter(messageConverters);
+    }
+
+    public MessageConverter getMessageConverter() {
+        return messageConverter;
+    }
+
+    public MessageConverter resetMessageConverter(
+        Collection<MessageConverter> converters) {
+        if (messageConverter.getConverters() != null) {
+            messageConverter.getConverters().clear();
+        }
+        Objects.requireNonNull(messageConverter.getConverters()).addAll(converters);
+        return messageConverter;
+    }
+
+    public MessageConverter addMessageConverter(MessageConverter converter) {
+        if (messageConverter.getConverters() != null && converter != null) {
+            messageConverter.getConverters().add(converter);
+        }
+        return messageConverter;
+    }
+
+}
diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/RocketMQUtil.java b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/RocketMQUtil.java
index f4deb45..4c731f2 100644
--- a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/RocketMQUtil.java
+++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/RocketMQUtil.java
@@ -34,6 +34,7 @@
 import org.springframework.core.env.Environment;
 import org.springframework.messaging.MessageHeaders;
 import org.springframework.messaging.MessagingException;
+import org.springframework.messaging.converter.MessageConverter;
 import org.springframework.messaging.support.MessageBuilder;
 import org.springframework.util.CollectionUtils;
 import org.springframework.util.StringUtils;
@@ -106,8 +107,8 @@
         if (!CollectionUtils.isEmpty(properties)) {
             properties.forEach((key, val) -> {
                 if (!MessageConst.STRING_HASH_SET.contains(key) && !MessageHeaders.ID.equals(key)
-                        && !MessageHeaders.TIMESTAMP.equals(key) &&
-                        (!key.startsWith(RocketMQHeaders.PREFIX) || !MessageConst.STRING_HASH_SET.contains(key.replaceFirst("^" + RocketMQHeaders.PREFIX, "")))) {
+                    && !MessageHeaders.TIMESTAMP.equals(key) &&
+                    (!key.startsWith(RocketMQHeaders.PREFIX) || !MessageConst.STRING_HASH_SET.contains(key.replaceFirst("^" + RocketMQHeaders.PREFIX, "")))) {
                     messageBuilder.setHeader(key, val);
                 }
             });
@@ -127,6 +128,7 @@
         return messageBuilder.build();
     }
 
+    @Deprecated
     public static org.apache.rocketmq.common.message.Message convertToRocketMessage(
         ObjectMapper objectMapper, String charset,
         String destination, org.springframework.messaging.Message message) {
@@ -134,9 +136,9 @@
         byte[] payloads;
 
         if (payloadObj instanceof String) {
-            payloads = ((String) payloadObj).getBytes(Charset.forName(charset));
+            payloads = ((String)payloadObj).getBytes(Charset.forName(charset));
         } else if (payloadObj instanceof byte[]) {
-            payloads = (byte[]) message.getPayload();
+            payloads = (byte[])message.getPayload();
         } else {
             try {
                 String jsonObj = objectMapper.writeValueAsString(payloadObj);
@@ -145,37 +147,46 @@
                 throw new RuntimeException("convert to RocketMQ message failed.", e);
             }
         }
+        return getAndWrapMessage(destination, message.getHeaders(), payloads);
+    }
 
+    public static org.apache.rocketmq.common.message.Message convertToRocketMessage(
+        String destination, org.springframework.messaging.Message<byte[]> message) {
+        return getAndWrapMessage(destination, message.getHeaders(), message.getPayload());
+    }
+
+    private static Message getAndWrapMessage(String destination, MessageHeaders headers, byte[] payloads) {
+        if (destination == null || destination.length() < 1) {
+            return null;
+        }
+        if (payloads == null || payloads.length < 1) {
+            return null;
+        }
         String[] tempArr = destination.split(":", 2);
         String topic = tempArr[0];
         String tags = "";
         if (tempArr.length > 1) {
             tags = tempArr[1];
         }
-
-        org.apache.rocketmq.common.message.Message rocketMsg = new org.apache.rocketmq.common.message.Message(topic, tags, payloads);
-
-        MessageHeaders headers = message.getHeaders();
+        Message rocketMsg = new Message(topic, tags, payloads);
         if (Objects.nonNull(headers) && !headers.isEmpty()) {
             Object keys = headers.get(RocketMQHeaders.KEYS);
             if (!StringUtils.isEmpty(keys)) { // if headers has 'KEYS', set rocketMQ message key
                 rocketMsg.setKeys(keys.toString());
             }
-
             Object flagObj = headers.getOrDefault("FLAG", "0");
             int flag = 0;
             try {
                 flag = Integer.parseInt(flagObj.toString());
             } catch (NumberFormatException e) {
                 // Ignore it
-                log.info("flag must be integer, flagObj:{}", flagObj);
+                if (log.isInfoEnabled()) {
+                    log.info("flag must be integer, flagObj:{}", flagObj);
+                }
             }
             rocketMsg.setFlag(flag);
-
             Object waitStoreMsgOkObj = headers.getOrDefault("WAIT_STORE_MSG_OK", "true");
-            boolean waitStoreMsgOK = Boolean.TRUE.equals(waitStoreMsgOkObj);
-            rocketMsg.setWaitStoreMsgOK(waitStoreMsgOK);
-
+            rocketMsg.setWaitStoreMsgOK(Boolean.TRUE.equals(waitStoreMsgOkObj));
             headers.entrySet().stream()
                 .filter(entry -> !Objects.equals(entry.getKey(), "FLAG")
                     && !Objects.equals(entry.getKey(), "WAIT_STORE_MSG_OK")) // exclude "FLAG", "WAIT_STORE_MSG_OK"
@@ -186,10 +197,37 @@
                 });
 
         }
-
         return rocketMsg;
     }
 
+    public static org.apache.rocketmq.common.message.Message convertToRocketMessage(
+        MessageConverter messageConverter, String charset,
+        String destination, org.springframework.messaging.Message<?> message) {
+        Object payloadObj = message.getPayload();
+        byte[] payloads;
+        try {
+            if (null == payloadObj) {
+                throw new RuntimeException("the message cannot be empty");
+            }
+            if (payloadObj instanceof String) {
+                payloads = ((String)payloadObj).getBytes(Charset.forName(charset));
+            } else if (payloadObj instanceof byte[]) {
+                payloads = (byte[])message.getPayload();
+            } else {
+                String jsonObj = (String)messageConverter.fromMessage(message, payloadObj.getClass());
+                if (null == jsonObj) {
+                    throw new RuntimeException(String.format(
+                        "empty after conversion [messageConverter:%s,payloadClass:%s,payloadObj:%s]",
+                        messageConverter.getClass(), payloadObj.getClass(), payloadObj));
+                }
+                payloads = jsonObj.getBytes(Charset.forName(charset));
+            }
+        } catch (Exception e) {
+            throw new RuntimeException("convert to RocketMQ message failed.", e);
+        }
+        return getAndWrapMessage(destination, message.getHeaders(), payloads);
+    }
+
     public static RPCHook getRPCHookByAkSk(Environment env, String accessKeyOrExpr, String secretKeyOrExpr) {
         String ak, sk;
         try {
@@ -209,7 +247,7 @@
     public static String getInstanceName(RPCHook rpcHook, String identify) {
         String separator = "|";
         StringBuilder instanceName = new StringBuilder();
-        SessionCredentials sessionCredentials = ((AclClientRPCHook) rpcHook).getSessionCredentials();
+        SessionCredentials sessionCredentials = ((AclClientRPCHook)rpcHook).getSessionCredentials();
         instanceName.append(sessionCredentials.getAccessKey())
             .append(separator).append(sessionCredentials.getSecretKey())
             .append(separator).append(identify)
diff --git a/rocketmq-spring-boot/src/test/java/org/apache/rocketmq/spring/autoconfigure/RocketMQAutoConfigurationTest.java b/rocketmq-spring-boot/src/test/java/org/apache/rocketmq/spring/autoconfigure/RocketMQAutoConfigurationTest.java
index 8f2a5ab..b4c7437 100644
--- a/rocketmq-spring-boot/src/test/java/org/apache/rocketmq/spring/autoconfigure/RocketMQAutoConfigurationTest.java
+++ b/rocketmq-spring-boot/src/test/java/org/apache/rocketmq/spring/autoconfigure/RocketMQAutoConfigurationTest.java
@@ -29,6 +29,7 @@
 import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState;
 import org.apache.rocketmq.spring.core.RocketMQTemplate;
 import org.apache.rocketmq.spring.support.DefaultRocketMQListenerContainer;
+import org.apache.rocketmq.spring.support.RocketMQMessageConverter;
 import org.junit.Assert;
 import org.junit.Test;
 import org.springframework.beans.factory.NoSuchBeanDefinitionException;
@@ -117,8 +118,8 @@
                 withUserConfiguration(TestConfig.class, CustomObjectMapperConfig.class).
                 run((context) -> {
                     assertThat(context.getBean("org.apache.rocketmq.spring.support.DefaultRocketMQListenerContainer_1",
-                            DefaultRocketMQListenerContainer.class).getObjectMapper())
-                            .isSameAs(context.getBean(CustomObjectMapperConfig.class).testObjectMapper());
+                            DefaultRocketMQListenerContainer.class).getMessageConverter())
+                            .isSameAs(context.getBean(CustomObjectMapperConfig.class).rocketMQMessageConverter().getMessageConverter());
                 });
     }
 
@@ -128,8 +129,8 @@
                 withUserConfiguration(TestConfig.class, CustomObjectMappersConfig.class).
                 run((context) -> {
                     assertThat(context.getBean("org.apache.rocketmq.spring.support.DefaultRocketMQListenerContainer_1",
-                            DefaultRocketMQListenerContainer.class).getObjectMapper())
-                            .isSameAs(context.getBean(CustomObjectMappersConfig.class).rocketMQMessageObjectMapper());
+                            DefaultRocketMQListenerContainer.class).getMessageConverter())
+                            .isSameAs(context.getBean(CustomObjectMappersConfig.class).rocketMQMessageConverter().getMessageConverter());
                 });
     }
 
@@ -246,6 +247,11 @@
         public ObjectMapper testObjectMapper() {
             return new ObjectMapper();
         }
+        @Bean
+        public RocketMQMessageConverter rocketMQMessageConverter() {
+            return new RocketMQMessageConverter();
+        }
+
 
     }
 
@@ -261,6 +267,10 @@
         public ObjectMapper rocketMQMessageObjectMapper() {
             return new ObjectMapper();
         }
+        @Bean
+        public RocketMQMessageConverter rocketMQMessageConverter() {
+            return new RocketMQMessageConverter();
+        }
 
     }