[ISSUE #208]support request/reply model in rocketmq-spring (#209)

* support request/response model in rocketmq-spring

* fix checkstyle problem

* add more test cases

* optimize request/reply model

* add examples to ProduceApplication.java

* wrap RequestCallback to conceal RocketMQ message

* requestCallback as method parameter

* delete useless class

* fix some comments and print format
diff --git a/rocketmq-spring-boot-samples/rocketmq-consume-demo/src/main/java/org/apache/rocketmq/samples/springboot/consumer/ConsumerWithReplyBytes.java b/rocketmq-spring-boot-samples/rocketmq-consume-demo/src/main/java/org/apache/rocketmq/samples/springboot/consumer/ConsumerWithReplyBytes.java
new file mode 100644
index 0000000..850fbd3
--- /dev/null
+++ b/rocketmq-spring-boot-samples/rocketmq-consume-demo/src/main/java/org/apache/rocketmq/samples/springboot/consumer/ConsumerWithReplyBytes.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.samples.springboot.consumer;
+
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
+import org.apache.rocketmq.spring.core.RocketMQReplyListener;
+import org.springframework.messaging.Message;
+import org.springframework.messaging.support.MessageBuilder;
+import org.springframework.stereotype.Service;
+
+/**
+ * The consumer that replying bytes
+ */
+@Service
+@RocketMQMessageListener(topic = "${demo.rocketmq.bytesRequestTopic}", consumerGroup = "${demo.rocketmq.bytesRequestConsumer}", selectorExpression = "${demo.rocketmq.tag}")
+public class ConsumerWithReplyBytes implements RocketMQReplyListener<MessageExt, byte[]> {
+
+    @Override
+    public byte[] onMessage(MessageExt message) {
+        System.out.printf("------- ConsumerWithReplyBytes received: %s \n", message);
+        return "reply message content".getBytes();
+    }
+}
diff --git a/rocketmq-spring-boot-samples/rocketmq-consume-demo/src/main/java/org/apache/rocketmq/samples/springboot/consumer/ConsumerWithReplyGeneric.java b/rocketmq-spring-boot-samples/rocketmq-consume-demo/src/main/java/org/apache/rocketmq/samples/springboot/consumer/ConsumerWithReplyGeneric.java
new file mode 100644
index 0000000..e17e675
--- /dev/null
+++ b/rocketmq-spring-boot-samples/rocketmq-consume-demo/src/main/java/org/apache/rocketmq/samples/springboot/consumer/ConsumerWithReplyGeneric.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.samples.springboot.consumer;
+
+import org.apache.rocketmq.samples.springboot.domain.ProductWithPayload;
+import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
+import org.apache.rocketmq.spring.core.RocketMQReplyListener;
+import org.springframework.stereotype.Service;
+
+/**
+ * The consumer that replying generic type
+ */
+@Service
+@RocketMQMessageListener(topic = "${demo.rocketmq.genericRequestTopic}", consumerGroup = "${demo.rocketmq.genericRequestConsumer}", selectorExpression = "${demo.rocketmq.tag}")
+public class ConsumerWithReplyGeneric implements RocketMQReplyListener<String, ProductWithPayload<String>> {
+    @Override
+    public ProductWithPayload<String> onMessage(String message) {
+        System.out.printf("------- ConsumerWithReplyGeneric received: %s \n", message);
+        return new ProductWithPayload<String>("replyProductName", "product payload");
+    }
+}
diff --git a/rocketmq-spring-boot-samples/rocketmq-consume-demo/src/main/java/org/apache/rocketmq/samples/springboot/consumer/ObjectConsumerWithReplyUser.java b/rocketmq-spring-boot-samples/rocketmq-consume-demo/src/main/java/org/apache/rocketmq/samples/springboot/consumer/ObjectConsumerWithReplyUser.java
new file mode 100644
index 0000000..f66b003
--- /dev/null
+++ b/rocketmq-spring-boot-samples/rocketmq-consume-demo/src/main/java/org/apache/rocketmq/samples/springboot/consumer/ObjectConsumerWithReplyUser.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.samples.springboot.consumer;
+
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.samples.springboot.domain.User;
+import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
+import org.apache.rocketmq.spring.core.RocketMQReplyListener;
+import org.springframework.stereotype.Service;
+
+/**
+ * The consumer that replying Object
+ */
+@Service
+@RocketMQMessageListener(topic = "${demo.rocketmq.objectRequestTopic}", consumerGroup = "${demo.rocketmq.objectRequestConsumer}", selectorExpression = "${demo.rocketmq.tag}")
+public class ObjectConsumerWithReplyUser implements RocketMQReplyListener<User, User> {
+
+    @Override
+    public User onMessage(User user) {
+        System.out.printf("------- ObjectConsumerWithReplyUser received: %s \n", user);
+        User replyUser = new User();
+        replyUser.setUserAge((byte) 10);
+        replyUser.setUserName("replyUserName");
+        return replyUser;
+    }
+}
diff --git a/rocketmq-spring-boot-samples/rocketmq-consume-demo/src/main/java/org/apache/rocketmq/samples/springboot/consumer/StringConsumer.java b/rocketmq-spring-boot-samples/rocketmq-consume-demo/src/main/java/org/apache/rocketmq/samples/springboot/consumer/StringConsumer.java
index 3801d48..11ac489 100644
--- a/rocketmq-spring-boot-samples/rocketmq-consume-demo/src/main/java/org/apache/rocketmq/samples/springboot/consumer/StringConsumer.java
+++ b/rocketmq-spring-boot-samples/rocketmq-consume-demo/src/main/java/org/apache/rocketmq/samples/springboot/consumer/StringConsumer.java
@@ -22,7 +22,7 @@
 import org.springframework.stereotype.Service;
 
 /**
- * RocketMQMessageListener
+ * StringConsumer
  */
 @Service
 @RocketMQMessageListener(topic = "${demo.rocketmq.topic}", consumerGroup = "string_consumer", selectorExpression = "${demo.rocketmq.tag}")
diff --git a/rocketmq-spring-boot-samples/rocketmq-consume-demo/src/main/java/org/apache/rocketmq/samples/springboot/consumer/StringConsumerWithReplyString.java b/rocketmq-spring-boot-samples/rocketmq-consume-demo/src/main/java/org/apache/rocketmq/samples/springboot/consumer/StringConsumerWithReplyString.java
new file mode 100644
index 0000000..b194bc6
--- /dev/null
+++ b/rocketmq-spring-boot-samples/rocketmq-consume-demo/src/main/java/org/apache/rocketmq/samples/springboot/consumer/StringConsumerWithReplyString.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.samples.springboot.consumer;
+
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
+import org.apache.rocketmq.spring.core.RocketMQReplyListener;
+import org.springframework.stereotype.Service;
+
+/**
+ * The consumer that replying String
+ */
+@Service
+@RocketMQMessageListener(topic = "${demo.rocketmq.stringRequestTopic}", consumerGroup = "${demo.rocketmq.stringRequestConsumer}", selectorExpression = "${demo.rocketmq.tag}")
+public class StringConsumerWithReplyString implements RocketMQReplyListener<String, String> {
+
+    @Override
+    public String onMessage(String message) {
+        System.out.printf("------- StringConsumerWithReplyString received: %s \n", message);
+        return "reply string";
+    }
+}
diff --git a/rocketmq-spring-boot-samples/rocketmq-consume-demo/src/main/java/org/apache/rocketmq/samples/springboot/domain/ProductWithPayload.java b/rocketmq-spring-boot-samples/rocketmq-consume-demo/src/main/java/org/apache/rocketmq/samples/springboot/domain/ProductWithPayload.java
new file mode 100644
index 0000000..e241929
--- /dev/null
+++ b/rocketmq-spring-boot-samples/rocketmq-consume-demo/src/main/java/org/apache/rocketmq/samples/springboot/domain/ProductWithPayload.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.samples.springboot.domain;
+
+public class ProductWithPayload<T> {
+    private String productName;
+    private T payload;
+
+    public ProductWithPayload() {
+    }
+
+    public ProductWithPayload(String productName, T payload) {
+        this.productName = productName;
+        this.payload = payload;
+    }
+
+    public String getProductName() {
+        return productName;
+    }
+
+    public void setProductName(String productName) {
+        this.productName = productName;
+    }
+
+    public T getPayload() {
+        return payload;
+    }
+
+    public void setPayload(T payload) {
+        this.payload = payload;
+    }
+
+    @Override public String toString() {
+        return "ProductWithPayload{" +
+            "productName='" + productName + '\'' +
+            ", payload=" + payload +
+            '}';
+    }
+}
\ No newline at end of file
diff --git a/rocketmq-spring-boot-samples/rocketmq-consume-demo/src/main/resources/application.properties b/rocketmq-spring-boot-samples/rocketmq-consume-demo/src/main/resources/application.properties
index b2b2690..5be0358 100644
--- a/rocketmq-spring-boot-samples/rocketmq-consume-demo/src/main/resources/application.properties
+++ b/rocketmq-spring-boot-samples/rocketmq-consume-demo/src/main/resources/application.properties
@@ -1,9 +1,16 @@
 spring.application.name=rocketmq-consume-demo
 
 rocketmq.name-server=localhost:9876
-
 # properties used in application code
 demo.rocketmq.topic=string-topic
+demo.rocketmq.bytesRequestTopic=bytesRequestTopic
+demo.rocketmq.stringRequestTopic=stringRequestTopic
+demo.rocketmq.objectRequestTopic=objectRequestTopic
+demo.rocketmq.genericRequestTopic=genericRequestTopic
+demo.rocketmq.bytesRequestConsumer=bytesRequestConsumer
+demo.rocketmq.stringRequestConsumer=stringRequestConsumer
+demo.rocketmq.objectRequestConsumer=objectRequestConsumer
+demo.rocketmq.genericRequestConsumer=genericRequestConsumer
 demo.rocketmq.orderTopic=order-paid-topic
 demo.rocketmq.msgExtTopic=message-ext-topic
 demo.rocketmq.transTopic=spring-transaction-topic
diff --git a/rocketmq-spring-boot-samples/rocketmq-produce-demo/src/main/java/org/apache/rocketmq/samples/springboot/ProducerApplication.java b/rocketmq-spring-boot-samples/rocketmq-produce-demo/src/main/java/org/apache/rocketmq/samples/springboot/ProducerApplication.java
index d914bb5..46ac5ab 100644
--- a/rocketmq-spring-boot-samples/rocketmq-produce-demo/src/main/java/org/apache/rocketmq/samples/springboot/ProducerApplication.java
+++ b/rocketmq-spring-boot-samples/rocketmq-produce-demo/src/main/java/org/apache/rocketmq/samples/springboot/ProducerApplication.java
@@ -17,6 +17,7 @@
 
 package org.apache.rocketmq.samples.springboot;
 
+import com.alibaba.fastjson.TypeReference;
 import java.math.BigDecimal;
 import java.util.ArrayList;
 import java.util.List;
@@ -26,8 +27,10 @@
 import org.apache.rocketmq.client.producer.SendCallback;
 import org.apache.rocketmq.client.producer.SendResult;
 import org.apache.rocketmq.samples.springboot.domain.OrderPaidEvent;
+import org.apache.rocketmq.samples.springboot.domain.ProductWithPayload;
 import org.apache.rocketmq.samples.springboot.domain.User;
 import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;
+import org.apache.rocketmq.spring.core.RocketMQLocalRequestCallback;
 import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener;
 import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState;
 import org.apache.rocketmq.spring.core.RocketMQTemplate;
@@ -60,6 +63,15 @@
     private String orderPaidTopic;
     @Value("${demo.rocketmq.msgExtTopic}")
     private String msgExtTopic;
+    @Value("${demo.rocketmq.stringRequestTopic}")
+    private String stringRequestTopic;
+    @Value("${demo.rocketmq.bytesRequestTopic}")
+    private String bytesRequestTopic;
+    @Value("${demo.rocketmq.objectRequestTopic}")
+    private String objectRequestTopic;
+    @Value("${demo.rocketmq.genericRequestTopic}")
+    private String genericRequestTopic;
+
     @Resource(name = "extRocketMQTemplate")
     private RocketMQTemplate extRocketMQTemplate;
 
@@ -116,6 +128,45 @@
 
         // Send transactional messages using extRocketMQTemplate
         testExtRocketMQTemplateTransaction();
+
+        // Send request in sync mode and receive a reply of String type.
+        String replyString = rocketMQTemplate.sendAndReceive(stringRequestTopic, "request string", String.class);
+        System.out.printf("send %s and receive %s %n", "request string", replyString);
+
+        // Send request in sync mode with timeout parameter and receive a reply of byte[] type.
+        byte[] replyBytes = rocketMQTemplate.sendAndReceive(bytesRequestTopic, MessageBuilder.withPayload("request byte[]").build(), byte[].class, 3000);
+        System.out.printf("send %s and receive %s %n", "request byte[]", new String(replyBytes));
+
+        // Send request in sync mode with hashKey parameter and receive a reply of User type.
+        User requestUser = new User().setUserAge((byte) 9).setUserName("requestUserName");
+        User replyUser = rocketMQTemplate.sendAndReceive(objectRequestTopic, requestUser, User.class, "order-id");
+        System.out.printf("send %s and receive %s %n", requestUser, replyUser);
+        // Send request in sync mode with timeout and delayLevel parameter parameter and receive a reply of generic type.
+        ProductWithPayload<String> replyGenericObject = rocketMQTemplate.sendAndReceive(genericRequestTopic, "request generic",
+            new TypeReference<ProductWithPayload<String>>() {
+            }.getType(), 30000, 2);
+        System.out.printf("send %s and receive %s %n", "request generic", replyGenericObject);
+
+        // Send request in async mode and receive a reply of String type.
+        rocketMQTemplate.sendAndReceive(stringRequestTopic, "request string", new RocketMQLocalRequestCallback<String>() {
+            @Override public void onSuccess(String message) {
+                System.out.printf("send %s and receive %s %n", "request string", message);
+            }
+
+            @Override public void onException(Throwable e) {
+                e.printStackTrace();
+            }
+        });
+        // Send request in async mode and receive a reply of User type.
+        rocketMQTemplate.sendAndReceive(objectRequestTopic, new User().setUserAge((byte) 9).setUserName("requestUserName"), new RocketMQLocalRequestCallback<User>() {
+            @Override public void onSuccess(User message) {
+                System.out.printf("send user object and receive %s %n", message.toString());
+            }
+
+            @Override public void onException(Throwable e) {
+                e.printStackTrace();
+            }
+        }, 5000);
     }
 
     private void testBatchMessages() {
@@ -237,5 +288,4 @@
             return RocketMQLocalTransactionState.COMMIT;
         }
     }
-
 }
diff --git a/rocketmq-spring-boot-samples/rocketmq-produce-demo/src/main/java/org/apache/rocketmq/samples/springboot/domain/ProductWithPayload.java b/rocketmq-spring-boot-samples/rocketmq-produce-demo/src/main/java/org/apache/rocketmq/samples/springboot/domain/ProductWithPayload.java
new file mode 100644
index 0000000..e241929
--- /dev/null
+++ b/rocketmq-spring-boot-samples/rocketmq-produce-demo/src/main/java/org/apache/rocketmq/samples/springboot/domain/ProductWithPayload.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.samples.springboot.domain;
+
+public class ProductWithPayload<T> {
+    private String productName;
+    private T payload;
+
+    public ProductWithPayload() {
+    }
+
+    public ProductWithPayload(String productName, T payload) {
+        this.productName = productName;
+        this.payload = payload;
+    }
+
+    public String getProductName() {
+        return productName;
+    }
+
+    public void setProductName(String productName) {
+        this.productName = productName;
+    }
+
+    public T getPayload() {
+        return payload;
+    }
+
+    public void setPayload(T payload) {
+        this.payload = payload;
+    }
+
+    @Override public String toString() {
+        return "ProductWithPayload{" +
+            "productName='" + productName + '\'' +
+            ", payload=" + payload +
+            '}';
+    }
+}
\ No newline at end of file
diff --git a/rocketmq-spring-boot-samples/rocketmq-produce-demo/src/main/java/org/apache/rocketmq/samples/springboot/domain/User.java b/rocketmq-spring-boot-samples/rocketmq-produce-demo/src/main/java/org/apache/rocketmq/samples/springboot/domain/User.java
index 42acc11..4f2579f 100644
--- a/rocketmq-spring-boot-samples/rocketmq-produce-demo/src/main/java/org/apache/rocketmq/samples/springboot/domain/User.java
+++ b/rocketmq-spring-boot-samples/rocketmq-produce-demo/src/main/java/org/apache/rocketmq/samples/springboot/domain/User.java
@@ -18,32 +18,32 @@
 package org.apache.rocketmq.samples.springboot.domain;
 
 public class User {
-        private String userName;
-        private Byte userAge;
+    private String userName;
+    private Byte userAge;
 
-        public String getUserName() {
-            return userName;
-        }
+    public String getUserName() {
+        return userName;
+    }
 
-        public User setUserName(String userName) {
-            this.userName = userName;
-            return this;
-        }
+    public User setUserName(String userName) {
+        this.userName = userName;
+        return this;
+    }
 
-        public Byte getUserAge() {
-            return userAge;
-        }
+    public Byte getUserAge() {
+        return userAge;
+    }
 
-        public User setUserAge(Byte userAge) {
-            this.userAge = userAge;
-            return this;
-        }
+    public User setUserAge(Byte userAge) {
+        this.userAge = userAge;
+        return this;
+    }
 
-        @Override
-        public String toString() {
-            return "User{" +
-                "userName='" + userName + '\'' +
-                ", userAge=" + userAge +
-                '}';
-        }
-    }
\ No newline at end of file
+    @Override
+    public String toString() {
+        return "User{" +
+            "userName='" + userName + '\'' +
+            ", userAge=" + userAge +
+            '}';
+    }
+}
\ No newline at end of file
diff --git a/rocketmq-spring-boot-samples/rocketmq-produce-demo/src/main/resources/application.properties b/rocketmq-spring-boot-samples/rocketmq-produce-demo/src/main/resources/application.properties
index 27a3abc..c68ac3a 100644
--- a/rocketmq-spring-boot-samples/rocketmq-produce-demo/src/main/resources/application.properties
+++ b/rocketmq-spring-boot-samples/rocketmq-produce-demo/src/main/resources/application.properties
@@ -9,4 +9,9 @@
 demo.rocketmq.transTopic=spring-transaction-topic
 demo.rocketmq.topic.user=user-topic
 
+demo.rocketmq.bytesRequestTopic=bytesRequestTopic:tagA
+demo.rocketmq.stringRequestTopic=stringRequestTopic:tagA
+demo.rocketmq.objectRequestTopic=objectRequestTopic:tagA
+demo.rocketmq.genericRequestTopic=genericRequestTopic:tagA
+
 demo.rocketmq.extNameServer=127.0.0.1:9876
\ No newline at end of file
diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/annotation/RocketMQTransactionListener.java b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/annotation/RocketMQTransactionListener.java
index dac280f..0faa57a 100644
--- a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/annotation/RocketMQTransactionListener.java
+++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/annotation/RocketMQTransactionListener.java
@@ -26,7 +26,8 @@
 
 /**
  * This annotation is used over a class which implements interface
- * org.apache.rocketmq.client.producer.TransactionListener. The class implements
+ * org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener, which will be converted to
+ * org.apache.rocketmq.client.producer.TransactionListener later. The class implements
  * two methods for process callback events after the txProducer sends a transactional message.
  * <p>Note: The annotation is used only on RocketMQ client producer side, it can not be used
  * on consumer side.
diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/ListenerContainerConfiguration.java b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/ListenerContainerConfiguration.java
index 7bda36c..699474d 100644
--- a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/ListenerContainerConfiguration.java
+++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/ListenerContainerConfiguration.java
@@ -26,6 +26,7 @@
 import org.apache.rocketmq.spring.annotation.MessageModel;
 import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
 import org.apache.rocketmq.spring.core.RocketMQListener;
+import org.apache.rocketmq.spring.core.RocketMQReplyListener;
 import org.apache.rocketmq.spring.support.DefaultRocketMQListenerContainer;
 import org.apache.rocketmq.spring.support.RocketMQMessageConverter;
 import org.slf4j.Logger;
@@ -65,7 +66,7 @@
 
     @Override
     public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
-        this.applicationContext = (ConfigurableApplicationContext)applicationContext;
+        this.applicationContext = (ConfigurableApplicationContext) applicationContext;
     }
 
     @Override
@@ -80,8 +81,12 @@
     private void registerContainer(String beanName, Object bean) {
         Class<?> clazz = AopProxyUtils.ultimateTargetClass(bean);
 
-        if (!RocketMQListener.class.isAssignableFrom(bean.getClass())) {
-            throw new IllegalStateException(clazz + " is not instance of " + RocketMQListener.class.getName());
+        if (RocketMQListener.class.isAssignableFrom(bean.getClass()) && RocketMQReplyListener.class.isAssignableFrom(bean.getClass())) {
+            throw new IllegalStateException(clazz + " cannot be both instance of " + RocketMQListener.class.getName() + " and " + RocketMQReplyListener.class.getName());
+        }
+
+        if (!RocketMQListener.class.isAssignableFrom(bean.getClass()) && !RocketMQReplyListener.class.isAssignableFrom(bean.getClass())) {
+            throw new IllegalStateException(clazz + " is not instance of " + RocketMQListener.class.getName() + " or " + RocketMQReplyListener.class.getName());
         }
 
         RocketMQMessageListener annotation = clazz.getAnnotation(RocketMQMessageListener.class);
@@ -90,7 +95,7 @@
         String topic = this.environment.resolvePlaceholders(annotation.topic());
 
         boolean listenerEnabled =
-            (boolean)rocketMQProperties.getConsumer().getListeners().getOrDefault(consumerGroup, Collections.EMPTY_MAP)
+            (boolean) rocketMQProperties.getConsumer().getListeners().getOrDefault(consumerGroup, Collections.EMPTY_MAP)
                 .getOrDefault(topic, true);
 
         if (!listenerEnabled) {
@@ -103,7 +108,7 @@
 
         String containerBeanName = String.format("%s_%s", DefaultRocketMQListenerContainer.class.getName(),
             counter.incrementAndGet());
-        GenericApplicationContext genericApplicationContext = (GenericApplicationContext)applicationContext;
+        GenericApplicationContext genericApplicationContext = (GenericApplicationContext) applicationContext;
 
         genericApplicationContext.registerBean(containerBeanName, DefaultRocketMQListenerContainer.class,
             () -> createRocketMQListenerContainer(containerBeanName, bean, annotation));
@@ -124,9 +129,9 @@
     private DefaultRocketMQListenerContainer createRocketMQListenerContainer(String name, Object bean,
         RocketMQMessageListener annotation) {
         DefaultRocketMQListenerContainer container = new DefaultRocketMQListenerContainer();
-        
+
         container.setRocketMQMessageListener(annotation);
-        
+
         String nameServer = environment.resolvePlaceholders(annotation.nameServer());
         nameServer = StringUtils.isEmpty(nameServer) ? rocketMQProperties.getNameServer() : nameServer;
         String accessChannel = environment.resolvePlaceholders(annotation.accessChannel());
@@ -140,7 +145,11 @@
             container.setSelectorExpression(tags);
         }
         container.setConsumerGroup(environment.resolvePlaceholders(annotation.consumerGroup()));
-        container.setRocketMQListener((RocketMQListener)bean);
+        if (RocketMQListener.class.isAssignableFrom(bean.getClass())) {
+            container.setRocketMQListener((RocketMQListener) bean);
+        } else if (RocketMQReplyListener.class.isAssignableFrom(bean.getClass())) {
+            container.setRocketMQReplyListener((RocketMQReplyListener) bean);
+        }
         container.setMessageConverter(rocketMQMessageConverter.getMessageConverter());
         container.setName(name);  // REVIEW ME, use the same clientId or multiple?
 
diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/core/RocketMQLocalRequestCallback.java b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/core/RocketMQLocalRequestCallback.java
new file mode 100644
index 0000000..56b15ff
--- /dev/null
+++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/core/RocketMQLocalRequestCallback.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.spring.core;
+
+/**
+ * Classes implementing this interface are used for processing callback events after receiving
+ * reply messages from consumers.
+ *
+ * @param <T> the type of message that wanted to receive from consumer
+ */
+public interface RocketMQLocalRequestCallback<T> {
+    void onSuccess(final T message);
+
+    void onException(final Throwable e);
+}
diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/core/RocketMQReplyListener.java b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/core/RocketMQReplyListener.java
new file mode 100644
index 0000000..916368d
--- /dev/null
+++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/core/RocketMQReplyListener.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.spring.core;
+
+/**
+ * The consumer supporting request-reply should implement this interface.
+ *
+ * @param <T> the type of data received by the listener
+ * @param <R> the type of data replying to producer
+ */
+public interface RocketMQReplyListener<T, R> {
+    /**
+     * @param message data received by the listener
+     * @return data replying to producer
+     */
+    R onMessage(T message);
+}
diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/core/RocketMQTemplate.java b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/core/RocketMQTemplate.java
index c655696..089016a 100644
--- a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/core/RocketMQTemplate.java
+++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/core/RocketMQTemplate.java
@@ -17,6 +17,9 @@
 
 package org.apache.rocketmq.spring.core;
 
+import java.lang.reflect.ParameterizedType;
+import java.lang.reflect.Type;
+import java.nio.charset.Charset;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Map;
@@ -24,19 +27,24 @@
 import org.apache.rocketmq.client.exception.MQClientException;
 import org.apache.rocketmq.client.producer.DefaultMQProducer;
 import org.apache.rocketmq.client.producer.MessageQueueSelector;
+import org.apache.rocketmq.client.producer.RequestCallback;
 import org.apache.rocketmq.client.producer.SendCallback;
 import org.apache.rocketmq.client.producer.SendResult;
 import org.apache.rocketmq.client.producer.TransactionMQProducer;
 import org.apache.rocketmq.client.producer.TransactionSendResult;
 import org.apache.rocketmq.client.producer.selector.SelectMessageQueueByHash;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.spring.support.RocketMQMessageConverter;
 import org.apache.rocketmq.spring.support.RocketMQUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.springframework.aop.framework.AopProxyUtils;
 import org.springframework.beans.factory.DisposableBean;
 import org.springframework.beans.factory.InitializingBean;
 import org.springframework.messaging.Message;
 import org.springframework.messaging.MessageHeaders;
 import org.springframework.messaging.MessagingException;
+import org.springframework.messaging.converter.SmartMessageConverter;
 import org.springframework.messaging.core.AbstractMessageSendingTemplate;
 import org.springframework.messaging.core.MessagePostProcessor;
 import org.springframework.messaging.support.MessageBuilder;
@@ -52,6 +60,8 @@
 
     private MessageQueueSelector messageQueueSelector = new SelectMessageQueueByHash();
 
+    private RocketMQMessageConverter rocketMQMessageConverter = new RocketMQMessageConverter();
+
     public DefaultMQProducer getProducer() {
         return producer;
     }
@@ -77,6 +87,356 @@
     }
 
     /**
+     * @param destination formats: `topicName:tags`
+     * @param message {@link org.springframework.messaging.Message} the message to be sent.
+     * @param type The type of T
+     * @return
+     */
+    public <T> T sendAndReceive(String destination, Message<?> message, Type type) {
+        return sendAndReceive(destination, message, type, null, producer.getSendMsgTimeout(), 0);
+    }
+
+    /**
+     * @param destination formats: `topicName:tags`
+     * @param payload the payload to be sent.
+     * @param type The type of T
+     * @return
+     */
+    public <T> T sendAndReceive(String destination, Object payload, Type type) {
+        return sendAndReceive(destination, payload, type, null, producer.getSendMsgTimeout(), 0);
+    }
+
+    /**
+     * @param destination formats: `topicName:tags`
+     * @param message {@link org.springframework.messaging.Message} the message to be sent.
+     * @param type The type of T
+     * @param timeout send timeout in millis
+     * @return
+     */
+    public <T> T sendAndReceive(String destination, Message<?> message, Type type, long timeout) {
+        return sendAndReceive(destination, message, type, null, timeout, 0);
+    }
+
+    /**
+     * @param destination formats: `topicName:tags`
+     * @param payload the payload to be sent.
+     * @param type The type of T
+     * @param timeout send timeout in millis
+     * @return
+     */
+    public <T> T sendAndReceive(String destination, Object payload, Type type, long timeout) {
+        return sendAndReceive(destination, payload, type, null, timeout, 0);
+    }
+
+    /**
+     * @param destination formats: `topicName:tags`
+     * @param message {@link org.springframework.messaging.Message} the message to be sent.
+     * @param type The type of T
+     * @param timeout send timeout in millis
+     * @param delayLevel message delay level(0 means no delay)
+     * @return
+     */
+    public <T> T sendAndReceive(String destination, Message<?> message, Type type, long timeout, int delayLevel) {
+        return sendAndReceive(destination, message, type, null, timeout, delayLevel);
+    }
+
+    /**
+     * @param destination formats: `topicName:tags`
+     * @param payload the payload to be sent.
+     * @param type The type of T
+     * @param timeout send timeout in millis
+     * @param delayLevel message delay level(0 means no delay)
+     * @return
+     */
+    public <T> T sendAndReceive(String destination, Object payload, Type type, long timeout, int delayLevel) {
+        return sendAndReceive(destination, payload, type, null, timeout, delayLevel);
+    }
+
+    /**
+     * @param destination formats: `topicName:tags`
+     * @param message {@link org.springframework.messaging.Message} the message to be sent.
+     * @param type The type of T
+     * @param hashKey needed when sending message orderly
+     * @return
+     */
+    public <T> T sendAndReceive(String destination, Message<?> message, Type type, String hashKey) {
+        return sendAndReceive(destination, message, type, hashKey, producer.getSendMsgTimeout(), 0);
+    }
+
+    /**
+     * @param destination formats: `topicName:tags`
+     * @param payload the payload to be sent.
+     * @param type The type of T
+     * @param hashKey needed when sending message orderly
+     * @return
+     */
+    public <T> T sendAndReceive(String destination, Object payload, Type type, String hashKey) {
+        return sendAndReceive(destination, payload, type, hashKey, producer.getSendMsgTimeout(), 0);
+    }
+
+    /**
+     * @param destination formats: `topicName:tags`
+     * @param message {@link org.springframework.messaging.Message} the message to be sent.
+     * @param type The type of T
+     * @param hashKey needed when sending message orderly
+     * @param timeout send timeout in millis
+     * @return
+     */
+    public <T> T sendAndReceive(String destination, Message<?> message, Type type, String hashKey, long timeout) {
+        return sendAndReceive(destination, message, type, hashKey, timeout, 0);
+    }
+
+    /**
+     * @param destination formats: `topicName:tags`
+     * @param payload the payload to be sent.
+     * @param type The type of T
+     * @param hashKey
+     * @return
+     */
+    public <T> T sendAndReceive(String destination, Object payload, Type type, String hashKey, long timeout) {
+        return sendAndReceive(destination, payload, type, hashKey, timeout, 0);
+    }
+
+    /**
+     * @param destination formats: `topicName:tags`
+     * @param message {@link org.springframework.messaging.Message} the message to be sent.
+     * @param type The type that receive
+     * @param hashKey needed when sending message orderly
+     * @param timeout send timeout in millis
+     * @param delayLevel message delay level(0 means no delay)
+     * @return
+     */
+    public <T> T sendAndReceive(String destination, Message<?> message, Type type, String hashKey,
+        long timeout, int delayLevel) {
+        if (Objects.isNull(message) || Objects.isNull(message.getPayload())) {
+            log.error("send request message failed. destination:{}, message is null ", destination);
+            throw new IllegalArgumentException("`message` and `message.payload` cannot be null");
+        }
+
+        try {
+            org.apache.rocketmq.common.message.Message rocketMsg = this.createRocketMqMessage(destination, message);
+            if (delayLevel > 0) {
+                rocketMsg.setDelayTimeLevel(delayLevel);
+            }
+            MessageExt replyMessage;
+
+            if (Objects.isNull(hashKey) || hashKey.isEmpty()) {
+                replyMessage = (MessageExt) producer.request(rocketMsg, timeout);
+            } else {
+                replyMessage = (MessageExt) producer.request(rocketMsg, messageQueueSelector, hashKey, timeout);
+            }
+            return replyMessage != null ? (T) doConvertMessage(replyMessage, type) : null;
+        } catch (Exception e) {
+            log.error("send request message failed. destination:{}, message:{} ", destination, message);
+            throw new MessagingException(e.getMessage(), e);
+        }
+    }
+
+    /**
+     * @param destination formats: `topicName:tags`
+     * @param payload the payload to be sent.
+     * @param type The type that receive
+     * @param hashKey needed when sending message orderly
+     * @param timeout send timeout in millis
+     * @param delayLevel message delay level(0 means no delay)
+     * @return
+     */
+    public <T> T sendAndReceive(String destination, Object payload, Type type, String hashKey,
+        long timeout, int delayLevel) {
+        Message<?> message = MessageBuilder.withPayload(payload).build();
+        return sendAndReceive(destination, message, type, hashKey, timeout, delayLevel);
+    }
+
+    /**
+     * @param destination formats: `topicName:tags`
+     * @param message {@link org.springframework.messaging.Message} the message to be sent.
+     * @param rocketMQLocalRequestCallback callback that will invoked when reply message received.
+     * @return
+     */
+    public void sendAndReceive(String destination, Message<?> message,
+        RocketMQLocalRequestCallback rocketMQLocalRequestCallback) {
+        sendAndReceive(destination, message, rocketMQLocalRequestCallback, null, producer.getSendMsgTimeout(), 0);
+    }
+
+    /**
+     * @param destination formats: `topicName:tags`
+     * @param payload the payload to be sent.
+     * @param rocketMQLocalRequestCallback callback that will invoked when reply message received.
+     * @return
+     */
+    public void sendAndReceive(String destination, Object payload,
+        RocketMQLocalRequestCallback rocketMQLocalRequestCallback) {
+        sendAndReceive(destination, payload, rocketMQLocalRequestCallback, null, producer.getSendMsgTimeout(), 0);
+    }
+
+    /**
+     * @param destination formats: `topicName:tags`
+     * @param message {@link org.springframework.messaging.Message} the message to be sent.
+     * @param rocketMQLocalRequestCallback callback that will invoked when reply message received.
+     * @param timeout send timeout in millis
+     * @return
+     */
+    public void sendAndReceive(String destination, Message<?> message,
+        RocketMQLocalRequestCallback rocketMQLocalRequestCallback, long timeout) {
+        sendAndReceive(destination, message, rocketMQLocalRequestCallback, null, timeout, 0);
+    }
+
+    /**
+     * @param destination formats: `topicName:tags`
+     * @param payload the payload to be sent.
+     * @param rocketMQLocalRequestCallback callback that will invoked when reply message received.
+     * @param timeout send timeout in millis
+     * @return
+     */
+    public void sendAndReceive(String destination, Object payload,
+        RocketMQLocalRequestCallback rocketMQLocalRequestCallback, long timeout) {
+        sendAndReceive(destination, payload, rocketMQLocalRequestCallback, null, timeout, 0);
+    }
+
+    /**
+     * @param destination formats: `topicName:tags`
+     * @param message {@link org.springframework.messaging.Message} the message to be sent.
+     * @param rocketMQLocalRequestCallback callback that will invoked when reply message received.
+     * @param timeout send timeout in millis
+     * @param delayLevel message delay level(0 means no delay)
+     * @return
+     */
+    public void sendAndReceive(String destination, Message<?> message,
+        RocketMQLocalRequestCallback rocketMQLocalRequestCallback, long timeout, int delayLevel) {
+        sendAndReceive(destination, message, rocketMQLocalRequestCallback, null, timeout, delayLevel);
+    }
+
+    /**
+     * @param destination formats: `topicName:tags`
+     * @param payload the payload to be sent.
+     * @param rocketMQLocalRequestCallback callback that will invoked when reply message received.
+     * @param hashKey needed when sending message orderly
+     * @return
+     */
+    public void sendAndReceive(String destination, Object payload,
+        RocketMQLocalRequestCallback rocketMQLocalRequestCallback, String hashKey) {
+        sendAndReceive(destination, payload, rocketMQLocalRequestCallback, hashKey, producer.getSendMsgTimeout(), 0);
+    }
+
+    /**
+     * @param destination formats: `topicName:tags`
+     * @param message {@link org.springframework.messaging.Message} the message to be sent.
+     * @param rocketMQLocalRequestCallback callback that will invoked when reply message received.
+     * @param hashKey needed when sending message orderly
+     * @param timeout send timeout in millis
+     * @return
+     */
+    public void sendAndReceive(String destination, Message<?> message,
+        RocketMQLocalRequestCallback rocketMQLocalRequestCallback, String hashKey, long timeout) {
+        sendAndReceive(destination, message, rocketMQLocalRequestCallback, hashKey, timeout, 0);
+    }
+
+    /**
+     * @param destination formats: `topicName:tags`
+     * @param payload the payload to be sent.
+     * @param rocketMQLocalRequestCallback callback that will invoked when reply message received.
+     * @param hashKey needed when sending message orderly
+     * @param timeout send timeout in millis
+     * @return
+     */
+    public void sendAndReceive(String destination, Object payload,
+        RocketMQLocalRequestCallback rocketMQLocalRequestCallback, String hashKey, long timeout) {
+        sendAndReceive(destination, payload, rocketMQLocalRequestCallback, hashKey, timeout, 0);
+    }
+
+    /**
+     * @param destination formats: `topicName:tags`
+     * @param message {@link org.springframework.messaging.Message} the message to be sent.
+     * @param rocketMQLocalRequestCallback callback that will invoked when reply message received.
+     * @param hashKey needed when sending message orderly
+     * @return
+     */
+    public void sendAndReceive(String destination, Message<?> message,
+        RocketMQLocalRequestCallback rocketMQLocalRequestCallback, String hashKey) {
+        sendAndReceive(destination, message, rocketMQLocalRequestCallback, hashKey, producer.getSendMsgTimeout(), 0);
+    }
+
+    /**
+     * @param destination formats: `topicName:tags`
+     * @param payload the payload to be sent.
+     * @param rocketMQLocalRequestCallback callback that will invoked when reply message received.
+     * @param timeout send timeout in millis
+     * @param delayLevel message delay level(0 means no delay)
+     * @return
+     */
+    public void sendAndReceive(String destination, Object payload,
+        RocketMQLocalRequestCallback rocketMQLocalRequestCallback, long timeout, int delayLevel) {
+        sendAndReceive(destination, payload, rocketMQLocalRequestCallback, null, timeout, delayLevel);
+    }
+
+    /**
+     * @param destination formats: `topicName:tags`
+     * @param payload the payload to be sent.
+     * @param rocketMQLocalRequestCallback callback that will invoked when reply message received.
+     * @param hashKey needed when sending message orderly
+     * @param timeout send timeout in millis
+     * @param delayLevel message delay level(0 means no delay)
+     * @return
+     */
+    public void sendAndReceive(String destination, Object payload,
+        RocketMQLocalRequestCallback rocketMQLocalRequestCallback, String hashKey, long timeout, int delayLevel) {
+        Message<?> message = MessageBuilder.withPayload(payload).build();
+        sendAndReceive(destination, message, rocketMQLocalRequestCallback, hashKey, timeout, delayLevel);
+    }
+
+    /**
+     * Send request message in asynchronous mode. </p> This method returns immediately. On receiving reply message,
+     * <code>rocketMQLocalRequestCallback</code> will be executed. </p>
+     *
+     * @param destination formats: `topicName:tags`
+     * @param message {@link org.springframework.messaging.Message} the message to be sent.
+     * @param rocketMQLocalRequestCallback callback that will invoked when reply message received.
+     * @param hashKey needed when sending message orderly
+     * @param timeout send timeout in millis
+     * @param delayLevel message delay level(0 means no delay)
+     * @return
+     */
+    public void sendAndReceive(String destination, Message<?> message,
+        RocketMQLocalRequestCallback rocketMQLocalRequestCallback, String hashKey, long timeout, int delayLevel) {
+        if (Objects.isNull(message) || Objects.isNull(message.getPayload())) {
+            log.error("send request message failed. destination:{}, message is null ", destination);
+            throw new IllegalArgumentException("`message` and `message.payload` cannot be null");
+        }
+
+        try {
+            org.apache.rocketmq.common.message.Message rocketMsg = this.createRocketMqMessage(destination, message);
+            if (delayLevel > 0) {
+                rocketMsg.setDelayTimeLevel(delayLevel);
+            }
+            if (timeout <= 0) {
+                timeout = producer.getSendMsgTimeout();
+            }
+            RequestCallback requestCallback = null;
+            if (rocketMQLocalRequestCallback != null) {
+                requestCallback = new RequestCallback() {
+                    @Override public void onSuccess(org.apache.rocketmq.common.message.Message message) {
+                        rocketMQLocalRequestCallback.onSuccess(doConvertMessage((MessageExt) message, getMessageType(rocketMQLocalRequestCallback)));
+                    }
+
+                    @Override public void onException(Throwable e) {
+                        rocketMQLocalRequestCallback.onException(e);
+                    }
+                };
+            }
+            if (Objects.isNull(hashKey) || hashKey.isEmpty()) {
+                producer.request(rocketMsg, requestCallback, timeout);
+            } else {
+                producer.request(rocketMsg, messageQueueSelector, hashKey, requestCallback, timeout);
+            }
+        } catch (
+            Exception e) {
+            log.error("send request message failed. destination:{}, message:{} ", destination, message);
+            throw new MessagingException(e.getMessage(), e);
+        }
+
+    }
+
+    /**
      * <p> Send message in synchronous mode. This method returns only when the sending procedure totally completes.
      * Reliable synchronous transmission is used in extensive scenes, such as important notification messages, SMS
      * notification, SMS marketing system, etc.. </p>
@@ -87,7 +447,7 @@
      * duplication issue.
      *
      * @param destination formats: `topicName:tags`
-     * @param message     {@link org.springframework.messaging.Message}
+     * @param message {@link org.springframework.messaging.Message}
      * @return {@link SendResult}
      */
     public SendResult syncSend(String destination, Message<?> message) {
@@ -98,8 +458,8 @@
      * Same to {@link #syncSend(String, Message)} with send timeout specified in addition.
      *
      * @param destination formats: `topicName:tags`
-     * @param message     {@link org.springframework.messaging.Message}
-     * @param timeout     send timeout with millis
+     * @param message {@link org.springframework.messaging.Message}
+     * @param timeout send timeout with millis
      * @return {@link SendResult}
      */
     public SendResult syncSend(String destination, Message<?> message, long timeout) {
@@ -110,8 +470,8 @@
      * syncSend batch messages in a given timeout.
      *
      * @param destination formats: `topicName:tags`
-     * @param messages    Collection of {@link org.springframework.messaging.Message}
-     * @param timeout     send timeout with millis
+     * @param messages Collection of {@link org.springframework.messaging.Message}
+     * @param timeout send timeout with millis
      * @return {@link SendResult}
      */
     public <T extends Message> SendResult syncSend(String destination, Collection<T> messages, long timeout) {
@@ -147,9 +507,9 @@
      * Same to {@link #syncSend(String, Message)} with send timeout specified in addition.
      *
      * @param destination formats: `topicName:tags`
-     * @param message     {@link org.springframework.messaging.Message}
-     * @param timeout     send timeout with millis
-     * @param delayLevel  level for the delay message
+     * @param message {@link org.springframework.messaging.Message}
+     * @param timeout send timeout with millis
+     * @param delayLevel level for the delay message
      * @return {@link SendResult}
      */
     public SendResult syncSend(String destination, Message<?> message, long timeout, int delayLevel) {
@@ -179,7 +539,7 @@
      * Same to {@link #syncSend(String, Message)}.
      *
      * @param destination formats: `topicName:tags`
-     * @param payload     the Object to use as payload
+     * @param payload the Object to use as payload
      * @return {@link SendResult}
      */
     public SendResult syncSend(String destination, Object payload) {
@@ -190,8 +550,8 @@
      * Same to {@link #syncSend(String, Object)} with send timeout specified in addition.
      *
      * @param destination formats: `topicName:tags`
-     * @param payload     the Object to use as payload
-     * @param timeout     send timeout with millis
+     * @param payload the Object to use as payload
+     * @param timeout send timeout with millis
      * @return {@link SendResult}
      */
     public SendResult syncSend(String destination, Object payload, long timeout) {
@@ -203,8 +563,8 @@
      * Same to {@link #syncSend(String, Message)} with send orderly with hashKey by specified.
      *
      * @param destination formats: `topicName:tags`
-     * @param message     {@link org.springframework.messaging.Message}
-     * @param hashKey     use this key to select queue. for example: orderId, productId ...
+     * @param message {@link org.springframework.messaging.Message}
+     * @param hashKey use this key to select queue. for example: orderId, productId ...
      * @return {@link SendResult}
      */
     public SendResult syncSendOrderly(String destination, Message<?> message, String hashKey) {
@@ -215,9 +575,9 @@
      * Same to {@link #syncSendOrderly(String, Message, String)} with send timeout specified in addition.
      *
      * @param destination formats: `topicName:tags`
-     * @param message     {@link org.springframework.messaging.Message}
-     * @param hashKey     use this key to select queue. for example: orderId, productId ...
-     * @param timeout     send timeout with millis
+     * @param message {@link org.springframework.messaging.Message}
+     * @param hashKey use this key to select queue. for example: orderId, productId ...
+     * @param timeout send timeout with millis
      * @return {@link SendResult}
      */
     public SendResult syncSendOrderly(String destination, Message<?> message, String hashKey, long timeout) {
@@ -244,8 +604,8 @@
      * Same to {@link #syncSend(String, Object)} with send orderly with hashKey by specified.
      *
      * @param destination formats: `topicName:tags`
-     * @param payload     the Object to use as payload
-     * @param hashKey     use this key to select queue. for example: orderId, productId ...
+     * @param payload the Object to use as payload
+     * @param hashKey use this key to select queue. for example: orderId, productId ...
      * @return {@link SendResult}
      */
     public SendResult syncSendOrderly(String destination, Object payload, String hashKey) {
@@ -256,9 +616,9 @@
      * Same to {@link #syncSendOrderly(String, Object, String)} with send timeout specified in addition.
      *
      * @param destination formats: `topicName:tags`
-     * @param payload     the Object to use as payload
-     * @param hashKey     use this key to select queue. for example: orderId, productId ...
-     * @param timeout     send timeout with millis
+     * @param payload the Object to use as payload
+     * @param hashKey use this key to select queue. for example: orderId, productId ...
+     * @param timeout send timeout with millis
      * @return {@link SendResult}
      */
     public SendResult syncSendOrderly(String destination, Object payload, String hashKey, long timeout) {
@@ -270,11 +630,11 @@
      * Same to {@link #asyncSend(String, Message, SendCallback)} with send timeout and delay level specified in
      * addition.
      *
-     * @param destination  formats: `topicName:tags`
-     * @param message      {@link org.springframework.messaging.Message}
+     * @param destination formats: `topicName:tags`
+     * @param message {@link org.springframework.messaging.Message}
      * @param sendCallback {@link SendCallback}
-     * @param timeout      send timeout with millis
-     * @param delayLevel   level for the delay message
+     * @param timeout send timeout with millis
+     * @param delayLevel level for the delay message
      */
     public void asyncSend(String destination, Message<?> message, SendCallback sendCallback, long timeout,
         int delayLevel) {
@@ -297,10 +657,10 @@
     /**
      * Same to {@link #asyncSend(String, Message, SendCallback)} with send timeout specified in addition.
      *
-     * @param destination  formats: `topicName:tags`
-     * @param message      {@link org.springframework.messaging.Message}
+     * @param destination formats: `topicName:tags`
+     * @param message {@link org.springframework.messaging.Message}
      * @param sendCallback {@link SendCallback}
-     * @param timeout      send timeout with millis
+     * @param timeout send timeout with millis
      */
     public void asyncSend(String destination, Message<?> message, SendCallback sendCallback, long timeout) {
         asyncSend(destination, message, sendCallback, timeout, 0);
@@ -316,8 +676,8 @@
      * DefaultMQProducer#getRetryTimesWhenSendAsyncFailed} times before claiming sending failure, which may yield
      * message duplication and application developers are the one to resolve this potential issue.
      *
-     * @param destination  formats: `topicName:tags`
-     * @param message      {@link org.springframework.messaging.Message}
+     * @param destination formats: `topicName:tags`
+     * @param message {@link org.springframework.messaging.Message}
      * @param sendCallback {@link SendCallback}
      */
     public void asyncSend(String destination, Message<?> message, SendCallback sendCallback) {
@@ -327,10 +687,10 @@
     /**
      * Same to {@link #asyncSend(String, Object, SendCallback)} with send timeout specified in addition.
      *
-     * @param destination  formats: `topicName:tags`
-     * @param payload      the Object to use as payload
+     * @param destination formats: `topicName:tags`
+     * @param payload the Object to use as payload
      * @param sendCallback {@link SendCallback}
-     * @param timeout      send timeout with millis
+     * @param timeout send timeout with millis
      */
     public void asyncSend(String destination, Object payload, SendCallback sendCallback, long timeout) {
         Message<?> message = MessageBuilder.withPayload(payload).build();
@@ -340,8 +700,8 @@
     /**
      * Same to {@link #asyncSend(String, Message, SendCallback)}.
      *
-     * @param destination  formats: `topicName:tags`
-     * @param payload      the Object to use as payload
+     * @param destination formats: `topicName:tags`
+     * @param payload the Object to use as payload
      * @param sendCallback {@link SendCallback}
      */
     public void asyncSend(String destination, Object payload, SendCallback sendCallback) {
@@ -352,11 +712,11 @@
      * Same to {@link #asyncSendOrderly(String, Message, String, SendCallback)} with send timeout specified in
      * addition.
      *
-     * @param destination  formats: `topicName:tags`
-     * @param message      {@link org.springframework.messaging.Message}
-     * @param hashKey      use this key to select queue. for example: orderId, productId ...
+     * @param destination formats: `topicName:tags`
+     * @param message {@link org.springframework.messaging.Message}
+     * @param hashKey use this key to select queue. for example: orderId, productId ...
      * @param sendCallback {@link SendCallback}
-     * @param timeout      send timeout with millis
+     * @param timeout send timeout with millis
      */
     public void asyncSendOrderly(String destination, Message<?> message, String hashKey, SendCallback sendCallback,
         long timeout) {
@@ -376,9 +736,9 @@
     /**
      * Same to {@link #asyncSend(String, Message, SendCallback)} with send orderly with hashKey by specified.
      *
-     * @param destination  formats: `topicName:tags`
-     * @param message      {@link org.springframework.messaging.Message}
-     * @param hashKey      use this key to select queue. for example: orderId, productId ...
+     * @param destination formats: `topicName:tags`
+     * @param message {@link org.springframework.messaging.Message}
+     * @param hashKey use this key to select queue. for example: orderId, productId ...
      * @param sendCallback {@link SendCallback}
      */
     public void asyncSendOrderly(String destination, Message<?> message, String hashKey, SendCallback sendCallback) {
@@ -388,9 +748,9 @@
     /**
      * Same to {@link #asyncSendOrderly(String, Message, String, SendCallback)}.
      *
-     * @param destination  formats: `topicName:tags`
-     * @param payload      the Object to use as payload
-     * @param hashKey      use this key to select queue. for example: orderId, productId ...
+     * @param destination formats: `topicName:tags`
+     * @param payload the Object to use as payload
+     * @param hashKey use this key to select queue. for example: orderId, productId ...
      * @param sendCallback {@link SendCallback}
      */
     public void asyncSendOrderly(String destination, Object payload, String hashKey, SendCallback sendCallback) {
@@ -400,11 +760,11 @@
     /**
      * Same to {@link #asyncSendOrderly(String, Object, String, SendCallback)} with send timeout specified in addition.
      *
-     * @param destination  formats: `topicName:tags`
-     * @param payload      the Object to use as payload
-     * @param hashKey      use this key to select queue. for example: orderId, productId ...
+     * @param destination formats: `topicName:tags`
+     * @param payload the Object to use as payload
+     * @param hashKey use this key to select queue. for example: orderId, productId ...
      * @param sendCallback {@link SendCallback}
-     * @param timeout      send timeout with millis
+     * @param timeout send timeout with millis
      */
     public void asyncSendOrderly(String destination, Object payload, String hashKey, SendCallback sendCallback,
         long timeout) {
@@ -419,7 +779,7 @@
      * One-way transmission is used for cases requiring moderate reliability, such as log collection.
      *
      * @param destination formats: `topicName:tags`
-     * @param message     {@link org.springframework.messaging.Message}
+     * @param message {@link org.springframework.messaging.Message}
      */
     public void sendOneWay(String destination, Message<?> message) {
         if (Objects.isNull(message) || Objects.isNull(message.getPayload())) {
@@ -439,7 +799,7 @@
      * Same to {@link #sendOneWay(String, Message)}
      *
      * @param destination formats: `topicName:tags`
-     * @param payload     the Object to use as payload
+     * @param payload the Object to use as payload
      */
     public void sendOneWay(String destination, Object payload) {
         Message<?> message = MessageBuilder.withPayload(payload).build();
@@ -450,8 +810,8 @@
      * Same to {@link #sendOneWay(String, Message)} with send orderly with hashKey by specified.
      *
      * @param destination formats: `topicName:tags`
-     * @param message     {@link org.springframework.messaging.Message}
-     * @param hashKey     use this key to select queue. for example: orderId, productId ...
+     * @param message {@link org.springframework.messaging.Message}
+     * @param hashKey use this key to select queue. for example: orderId, productId ...
      */
     public void sendOneWayOrderly(String destination, Message<?> message, String hashKey) {
         if (Objects.isNull(message) || Objects.isNull(message.getPayload())) {
@@ -471,7 +831,7 @@
      * Same to {@link #sendOneWayOrderly(String, Message, String)}
      *
      * @param destination formats: `topicName:tags`
-     * @param payload     the Object to use as payload
+     * @param payload the Object to use as payload
      */
     public void sendOneWayOrderly(String destination, Object payload, String hashKey) {
         Message<?> message = MessageBuilder.withPayload(payload).build();
@@ -511,9 +871,9 @@
     /**
      * Send Spring Message in Transaction
      *
-     * @param destination     destination formats: `topicName:tags`
-     * @param message         message {@link org.springframework.messaging.Message}
-     * @param arg             ext arg
+     * @param destination destination formats: `topicName:tags`
+     * @param message message {@link org.springframework.messaging.Message}
+     * @param arg ext arg
      * @return TransactionSendResult
      * @throws MessagingException
      */
@@ -537,4 +897,57 @@
             destination, msg);
     }
 
+    private Object doConvertMessage(MessageExt messageExt, Type type) {
+        if (Objects.equals(type, MessageExt.class)) {
+            return messageExt;
+        } else if (Objects.equals(type, byte[].class)) {
+            return messageExt.getBody();
+        } else {
+            String str = new String(messageExt.getBody(), Charset.forName(charset));
+            if (Objects.equals(type, String.class)) {
+                return str;
+            } else {
+                // If msgType not string, use objectMapper change it.
+                try {
+                    if (type instanceof Class) {
+                        //if the messageType has not Generic Parameter
+                        return this.getMessageConverter().fromMessage(MessageBuilder.withPayload(str).build(), (Class<?>) type);
+                    } else {
+                        //if the messageType has Generic Parameter, then use SmartMessageConverter#fromMessage with third parameter "conversionHint".
+                        //we have validate the MessageConverter is SmartMessageConverter in this#getMethodParameter.
+                        return ((SmartMessageConverter) this.getMessageConverter()).fromMessage(MessageBuilder.withPayload(str).build(), (Class<?>) ((ParameterizedType) type).getRawType(), null);
+                    }
+                } catch (Exception e) {
+                    log.error("convert failed. str:{}, msgType:{}", str, type);
+                    throw new RuntimeException("cannot convert message to " + type, e);
+                }
+            }
+        }
+    }
+
+    private Type getMessageType(RocketMQLocalRequestCallback rocketMQLocalRequestCallback) {
+        Class<?> targetClass = AopProxyUtils.ultimateTargetClass(rocketMQLocalRequestCallback);
+        Type matchedGenericInterface = null;
+        while (Objects.nonNull(targetClass)) {
+            Type[] interfaces = targetClass.getGenericInterfaces();
+            if (Objects.nonNull(interfaces)) {
+                for (Type type : interfaces) {
+                    if (type instanceof ParameterizedType && (Objects.equals(((ParameterizedType) type).getRawType(), RocketMQLocalRequestCallback.class))) {
+                        matchedGenericInterface = type;
+                        break;
+                    }
+                }
+            }
+            targetClass = targetClass.getSuperclass();
+        }
+        if (Objects.isNull(matchedGenericInterface)) {
+            return Object.class;
+        }
+
+        Type[] actualTypeArguments = ((ParameterizedType) matchedGenericInterface).getActualTypeArguments();
+        if (Objects.nonNull(actualTypeArguments) && actualTypeArguments.length > 0) {
+            return actualTypeArguments[0];
+        }
+        return Object.class;
+    }
 }
diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainer.java b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainer.java
index 25ec320..2642d07 100644
--- a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainer.java
+++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainer.java
@@ -34,14 +34,20 @@
 import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
 import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely;
 import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.producer.SendCallback;
+import org.apache.rocketmq.client.producer.SendResult;
+import org.apache.rocketmq.client.producer.SendStatus;
+import org.apache.rocketmq.client.utils.MessageUtil;
 import org.apache.rocketmq.common.message.MessageExt;
 import org.apache.rocketmq.remoting.RPCHook;
+import org.apache.rocketmq.remoting.exception.RemotingException;
 import org.apache.rocketmq.spring.annotation.ConsumeMode;
 import org.apache.rocketmq.spring.annotation.MessageModel;
 import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
 import org.apache.rocketmq.spring.annotation.SelectorType;
 import org.apache.rocketmq.spring.core.RocketMQListener;
 import org.apache.rocketmq.spring.core.RocketMQPushConsumerLifecycleListener;
+import org.apache.rocketmq.spring.core.RocketMQReplyListener;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.aop.framework.AopProxyUtils;
@@ -51,10 +57,14 @@
 import org.springframework.context.ApplicationContextAware;
 import org.springframework.context.SmartLifecycle;
 import org.springframework.core.MethodParameter;
+import org.springframework.messaging.Message;
+import org.springframework.messaging.MessageHeaders;
+import org.springframework.messaging.converter.MessageConversionException;
 import org.springframework.messaging.converter.MessageConverter;
 import org.springframework.messaging.converter.SmartMessageConverter;
 import org.springframework.messaging.support.MessageBuilder;
 import org.springframework.util.Assert;
+import org.springframework.util.MimeTypeUtils;
 
 @SuppressWarnings("WeakerAccess")
 public class DefaultRocketMQListenerContainer implements InitializingBean,
@@ -92,6 +102,8 @@
 
     private RocketMQListener rocketMQListener;
 
+    private RocketMQReplyListener rocketMQReplyListener;
+
     private RocketMQMessageListener rocketMQMessageListener;
 
     private DefaultMQPushConsumer consumer;
@@ -186,6 +198,14 @@
         this.rocketMQListener = rocketMQListener;
     }
 
+    public RocketMQReplyListener getRocketMQReplyListener() {
+        return rocketMQReplyListener;
+    }
+
+    public void setRocketMQReplyListener(RocketMQReplyListener rocketMQReplyListener) {
+        this.rocketMQReplyListener = rocketMQReplyListener;
+    }
+
     public RocketMQMessageListener getRocketMQMessageListener() {
         return rocketMQMessageListener;
     }
@@ -209,14 +229,14 @@
         return selectorType;
     }
 
-    public String getSelectorExpression() {
-        return selectorExpression;
-    }
-
     public void setSelectorExpression(String selectorExpression) {
         this.selectorExpression = selectorExpression;
     }
 
+    public String getSelectorExpression() {
+        return selectorExpression;
+    }
+
     public MessageModel getMessageModel() {
         return messageModel;
     }
@@ -230,11 +250,6 @@
     }
 
     @Override
-    public void setupMessageListener(RocketMQListener rocketMQListener) {
-        this.rocketMQListener = rocketMQListener;
-    }
-
-    @Override
     public void destroy() {
         this.setRunning(false);
         if (Objects.nonNull(consumer)) {
@@ -327,6 +342,119 @@
         this.name = name;
     }
 
+    public class DefaultMessageListenerConcurrently implements MessageListenerConcurrently {
+
+        @SuppressWarnings("unchecked")
+        @Override
+        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
+            for (MessageExt messageExt : msgs) {
+                log.debug("received msg: {}", messageExt);
+                try {
+                    long now = System.currentTimeMillis();
+                    handleMessage(messageExt);
+                    long costTime = System.currentTimeMillis() - now;
+                    log.debug("consume {} cost: {} ms", messageExt.getMsgId(), costTime);
+                } catch (Exception e) {
+                    log.warn("consume message failed. messageExt:{}, error:{}", messageExt, e);
+                    context.setDelayLevelWhenNextConsume(delayLevelWhenNextConsume);
+                    return ConsumeConcurrentlyStatus.RECONSUME_LATER;
+                }
+            }
+
+            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
+        }
+    }
+
+    public class DefaultMessageListenerOrderly implements MessageListenerOrderly {
+
+        @SuppressWarnings("unchecked")
+        @Override
+        public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
+            for (MessageExt messageExt : msgs) {
+                log.debug("received msg: {}", messageExt);
+                try {
+                    long now = System.currentTimeMillis();
+                    handleMessage(messageExt);
+                    long costTime = System.currentTimeMillis() - now;
+                    log.info("consume {} cost: {} ms", messageExt.getMsgId(), costTime);
+                } catch (Exception e) {
+                    log.warn("consume message failed. messageExt:{}", messageExt, e);
+                    context.setSuspendCurrentQueueTimeMillis(suspendCurrentQueueTimeMillis);
+                    return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
+                }
+            }
+
+            return ConsumeOrderlyStatus.SUCCESS;
+        }
+    }
+
+    private void handleMessage(
+        MessageExt messageExt) throws MQClientException, RemotingException, InterruptedException {
+        if (rocketMQListener != null) {
+            rocketMQListener.onMessage(doConvertMessage(messageExt));
+        } else if (rocketMQReplyListener != null) {
+            Object replyContent = rocketMQReplyListener.onMessage(doConvertMessage(messageExt));
+            Message<?> message = MessageBuilder.withPayload(replyContent).build();
+
+            org.apache.rocketmq.common.message.Message replyMessage = MessageUtil.createReplyMessage(messageExt, convertToBytes(message));
+            consumer.getDefaultMQPushConsumerImpl().getmQClientFactory().getDefaultMQProducer().send(replyMessage, new SendCallback() {
+                @Override public void onSuccess(SendResult sendResult) {
+                    if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
+                        log.error("Consumer replies message failed. SendStatus: {}", sendResult.getSendStatus());
+                    } else {
+                        log.info("Consumer replies message success.");
+                    }
+                }
+
+                @Override public void onException(Throwable e) {
+                    log.error("Consumer replies message failed. error: {}", e.getLocalizedMessage());
+                }
+            });
+        }
+    }
+
+    private byte[] convertToBytes(Message<?> message) {
+        Message<?> messageWithSerializedPayload = doConvert(message.getPayload(), message.getHeaders());
+        Object payloadObj = messageWithSerializedPayload.getPayload();
+        byte[] payloads;
+        try {
+            if (null == payloadObj) {
+                throw new RuntimeException("the message cannot be empty");
+            }
+            if (payloadObj instanceof String) {
+                payloads = ((String) payloadObj).getBytes(Charset.forName(charset));
+            } else if (payloadObj instanceof byte[]) {
+                payloads = (byte[]) messageWithSerializedPayload.getPayload();
+            } else {
+                String jsonObj = (String) this.messageConverter.fromMessage(messageWithSerializedPayload, payloadObj.getClass());
+                if (null == jsonObj) {
+                    throw new RuntimeException(String.format(
+                        "empty after conversion [messageConverter:%s,payloadClass:%s,payloadObj:%s]",
+                        this.messageConverter.getClass(), payloadObj.getClass(), payloadObj));
+                }
+                payloads = jsonObj.getBytes(Charset.forName(charset));
+            }
+        } catch (Exception e) {
+            throw new RuntimeException("convert to bytes failed.", e);
+        }
+        return payloads;
+    }
+
+    private Message<?> doConvert(Object payload, MessageHeaders headers) {
+        Message<?> message = this.messageConverter instanceof SmartMessageConverter ?
+            ((SmartMessageConverter) this.messageConverter).toMessage(payload, headers, null) :
+            this.messageConverter.toMessage(payload, headers);
+        if (message == null) {
+            String payloadType = payload.getClass().getName();
+            Object contentType = headers != null ? headers.get(MessageHeaders.CONTENT_TYPE) : null;
+            throw new MessageConversionException("Unable to convert payload with type='" + payloadType +
+                "', contentType='" + contentType + "', converter=[" + this.messageConverter + "]");
+        }
+        MessageBuilder<?> builder = MessageBuilder.fromMessage(message);
+        builder.setHeaderIfAbsent(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.TEXT_PLAIN);
+        return builder.build();
+    }
+
     @SuppressWarnings("unchecked")
     private Object doConvertMessage(MessageExt messageExt) {
         if (Objects.equals(messageType, MessageExt.class)) {
@@ -355,7 +483,12 @@
     }
 
     private MethodParameter getMethodParameter() {
-        Class<?> targetClass = AopProxyUtils.ultimateTargetClass(rocketMQListener);
+        Class<?> targetClass;
+        if (rocketMQListener != null) {
+            targetClass = AopProxyUtils.ultimateTargetClass(rocketMQListener);
+        } else {
+            targetClass = AopProxyUtils.ultimateTargetClass(rocketMQReplyListener);
+        }
         Type messageType = this.getMessageType();
         Class clazz = null;
         if (messageType instanceof ParameterizedType && messageConverter instanceof SmartMessageConverter) {
@@ -375,14 +508,19 @@
     }
 
     private Type getMessageType() {
-        Class<?> targetClass = AopProxyUtils.ultimateTargetClass(rocketMQListener);
+        Class<?> targetClass;
+        if (rocketMQListener != null) {
+            targetClass = AopProxyUtils.ultimateTargetClass(rocketMQListener);
+        } else {
+            targetClass = AopProxyUtils.ultimateTargetClass(rocketMQReplyListener);
+        }
         Type matchedGenericInterface = null;
         while (Objects.nonNull(targetClass)) {
             Type[] interfaces = targetClass.getGenericInterfaces();
             if (Objects.nonNull(interfaces)) {
                 for (Type type : interfaces) {
-                    if (type instanceof ParameterizedType
-                            && Objects.equals(((ParameterizedType) type).getRawType(), RocketMQListener.class)) {
+                    if (type instanceof ParameterizedType &&
+                        (Objects.equals(((ParameterizedType) type).getRawType(), RocketMQListener.class) || Objects.equals(((ParameterizedType) type).getRawType(), RocketMQReplyListener.class))) {
                         matchedGenericInterface = type;
                         break;
                     }
@@ -401,10 +539,10 @@
         return Object.class;
     }
 
-
-
     private void initRocketMQPushConsumer() throws MQClientException {
-        Assert.notNull(rocketMQListener, "Property 'rocketMQListener' is required");
+        if (rocketMQListener == null && rocketMQReplyListener == null) {
+            throw new IllegalArgumentException("Property 'rocketMQListener' or 'rocketMQReplyListener' is required");
+        }
         Assert.notNull(consumerGroup, "Property 'consumerGroup' is required");
         Assert.notNull(nameServer, "Property 'nameServer' is required");
         Assert.notNull(topic, "Property 'topic' is required");
@@ -475,54 +613,10 @@
 
         if (rocketMQListener instanceof RocketMQPushConsumerLifecycleListener) {
             ((RocketMQPushConsumerLifecycleListener) rocketMQListener).prepareStart(consumer);
+        } else if (rocketMQReplyListener instanceof RocketMQPushConsumerLifecycleListener) {
+            ((RocketMQPushConsumerLifecycleListener) rocketMQReplyListener).prepareStart(consumer);
         }
 
     }
 
-    public class DefaultMessageListenerConcurrently implements MessageListenerConcurrently {
-
-        @SuppressWarnings("unchecked")
-        @Override
-        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
-            for (MessageExt messageExt : msgs) {
-                log.debug("received msg: {}", messageExt);
-                try {
-                    long now = System.currentTimeMillis();
-                    rocketMQListener.onMessage(doConvertMessage(messageExt));
-                    long costTime = System.currentTimeMillis() - now;
-                    log.debug("consume {} cost: {} ms", messageExt.getMsgId(), costTime);
-                } catch (Exception e) {
-                    log.warn("consume message failed. messageExt:{}", messageExt, e);
-                    context.setDelayLevelWhenNextConsume(delayLevelWhenNextConsume);
-                    return ConsumeConcurrentlyStatus.RECONSUME_LATER;
-                }
-            }
-
-            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
-        }
-    }
-
-    public class DefaultMessageListenerOrderly implements MessageListenerOrderly {
-
-        @SuppressWarnings("unchecked")
-        @Override
-        public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
-            for (MessageExt messageExt : msgs) {
-                log.debug("received msg: {}", messageExt);
-                try {
-                    long now = System.currentTimeMillis();
-                    rocketMQListener.onMessage(doConvertMessage(messageExt));
-                    long costTime = System.currentTimeMillis() - now;
-                    log.info("consume {} cost: {} ms", messageExt.getMsgId(), costTime);
-                } catch (Exception e) {
-                    log.warn("consume message failed. messageExt:{}", messageExt, e);
-                    context.setSuspendCurrentQueueTimeMillis(suspendCurrentQueueTimeMillis);
-                    return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
-                }
-            }
-
-            return ConsumeOrderlyStatus.SUCCESS;
-        }
-    }
-
 }
diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/RocketMQListenerContainer.java b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/RocketMQListenerContainer.java
index ee52de8..d9693bc 100644
--- a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/RocketMQListenerContainer.java
+++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/RocketMQListenerContainer.java
@@ -17,14 +17,8 @@
 
 package org.apache.rocketmq.spring.support;
 
-import org.apache.rocketmq.spring.core.RocketMQListener;
 import org.springframework.beans.factory.DisposableBean;
 
 public interface RocketMQListenerContainer extends DisposableBean {
 
-    /**
-     * Setup the message listener to use. Throws an {@link IllegalArgumentException} if that message listener type is
-     * not supported.
-     */
-    void setupMessageListener(RocketMQListener<?> messageListener);
 }
diff --git a/rocketmq-spring-boot/src/test/java/org/apache/rocketmq/spring/autoconfigure/RocketMQAutoConfigurationTest.java b/rocketmq-spring-boot/src/test/java/org/apache/rocketmq/spring/autoconfigure/RocketMQAutoConfigurationTest.java
index 553183d..18a32f8 100644
--- a/rocketmq-spring-boot/src/test/java/org/apache/rocketmq/spring/autoconfigure/RocketMQAutoConfigurationTest.java
+++ b/rocketmq-spring-boot/src/test/java/org/apache/rocketmq/spring/autoconfigure/RocketMQAutoConfigurationTest.java
@@ -27,6 +27,7 @@
 import org.apache.rocketmq.spring.core.RocketMQListener;
 import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener;
 import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState;
+import org.apache.rocketmq.spring.core.RocketMQReplyListener;
 import org.apache.rocketmq.spring.core.RocketMQTemplate;
 import org.apache.rocketmq.spring.support.RocketMQMessageConverter;
 import org.junit.Assert;
@@ -183,6 +184,24 @@
             });
     }
 
+    @Test
+    public void testRocketMQListenerContainer_RocketMQReplyListener() {
+        runner.withPropertyValues("rocketmq.name-server=127.0.0.1:9876").
+            withUserConfiguration(TestConfigWithRocketMQReplyListener.class).
+            run((context) -> {
+                assertThat(context).getFailure().hasMessageContaining("connect to [127.0.0.1:9876] failed");
+            });
+    }
+
+    @Test(expected = IllegalStateException.class)
+    public void testRocketMQListenerContainer_WrongRocketMQListenerType() {
+        runner.withPropertyValues("rocketmq.name-server=127.0.0.1:9876").
+            withUserConfiguration(TestConfigWithWrongRocketMQListener.class).
+            run((context) -> {
+                context.getBean(RocketMQMessageConverter.class);
+            });
+    }
+
     @Configuration
     static class TestConfig {
 
@@ -199,6 +218,30 @@
     }
 
     @Configuration
+    static class TestConfigWithRocketMQReplyListener {
+
+        @Bean
+        public Object consumeListener() {
+            return new TestDefaultNameServerRocketMQReplyListener();
+        }
+
+        @Bean
+        public Object consumeListener1() {
+            return new TestCustomNameServerRocketMQReplyListener();
+        }
+
+    }
+
+    @Configuration
+    static class TestConfigWithWrongRocketMQListener {
+
+        @Bean
+        public Object consumeListener() {
+            return new WrongRocketMQListener();
+        }
+    }
+
+    @Configuration
     static class CustomObjectMapperConfig {
 
         @Bean
@@ -236,6 +279,32 @@
         }
     }
 
+    @RocketMQMessageListener(consumerGroup = "abcd", topic = "test")
+    static class TestDefaultNameServerRocketMQReplyListener implements RocketMQReplyListener<String, String> {
+
+        @Override
+        public String onMessage(String message) {
+            return "test";
+        }
+    }
+
+    @RocketMQMessageListener(consumerGroup = "abcde", topic = "test")
+    static class WrongRocketMQListener {
+
+        public String onMessage(String message) {
+            return "test";
+        }
+    }
+
+    @RocketMQMessageListener(nameServer = "127.0.1.1:9876", consumerGroup = "abcd1", topic = "test")
+    static class TestCustomNameServerRocketMQReplyListener implements RocketMQReplyListener<String, String> {
+
+        @Override
+        public String onMessage(String message) {
+            return "test";
+        }
+    }
+
     @Configuration
     static class TestTransactionListenerConfig {
         @Bean
diff --git a/rocketmq-spring-boot/src/test/java/org/apache/rocketmq/spring/core/RocketMQTemplateTest.java b/rocketmq-spring-boot/src/test/java/org/apache/rocketmq/spring/core/RocketMQTemplateTest.java
index a6cc91d..da6d777 100644
--- a/rocketmq-spring-boot/src/test/java/org/apache/rocketmq/spring/core/RocketMQTemplateTest.java
+++ b/rocketmq-spring-boot/src/test/java/org/apache/rocketmq/spring/core/RocketMQTemplateTest.java
@@ -21,6 +21,7 @@
 import org.apache.rocketmq.client.producer.SendCallback;
 import org.apache.rocketmq.client.producer.SendResult;
 import org.apache.rocketmq.client.producer.TransactionMQProducer;
+import org.apache.rocketmq.common.message.MessageExt;
 import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;
 import org.apache.rocketmq.spring.autoconfigure.RocketMQAutoConfiguration;
 import org.junit.Test;
@@ -28,10 +29,13 @@
 import org.springframework.beans.factory.annotation.Value;
 import org.springframework.boot.test.context.SpringBootTest;
 import org.springframework.messaging.Message;
+import org.springframework.messaging.MessageHeaders;
 import org.springframework.messaging.MessagingException;
+import org.springframework.messaging.support.MessageBuilder;
 import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
 
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.any;
 
 @RunWith(SpringJUnit4ClassRunner.class)
 @SpringBootTest(properties = {
@@ -48,6 +52,12 @@
     @Value("${test.rocketmq.topic}")
     String topic;
 
+    @Value("stringRequestTopic:tagA")
+    String stringRequestTopic;
+
+    @Value("objectRequestTopic:tagA")
+    String objectRequestTopic;
+
     @Test
     public void testSendMessage() {
         try {
@@ -78,6 +88,104 @@
     }
 
     @Test
+    public void testSendAndReceive_NullMessage() {
+        try {
+            String response = rocketMQTemplate.sendAndReceive(stringRequestTopic, new Message<String>() {
+                @Override public String getPayload() {
+                    return null;
+                }
+
+                @Override public MessageHeaders getHeaders() {
+                    return null;
+                }
+            }, String.class);
+        } catch (IllegalArgumentException e) {
+            assertThat(e).hasMessageContaining("`message` and `message.payload` cannot be null");
+        }
+
+        try {
+            String response = rocketMQTemplate.sendAndReceive(stringRequestTopic, (Object) null, String.class);
+        } catch (IllegalArgumentException e) {
+            assertThat(e).hasMessageContaining("Payload must not be null");
+        }
+    }
+
+    @Test
+    public void testSendAndReceive_Sync() throws InterruptedException {
+        try {
+            String responseMessage = rocketMQTemplate.sendAndReceive(stringRequestTopic, MessageBuilder.withPayload("requestTopicSync").build(), String.class);
+            assertThat(responseMessage).isNotNull();
+        } catch (MessagingException e) {
+            assertThat(e).hasMessageContaining("org.apache.rocketmq.remoting.exception.RemotingConnectException: connect to [127.0.0.1:9876] failed");
+        }
+
+        try {
+            String responseMessage = rocketMQTemplate.sendAndReceive(stringRequestTopic, "requestTopicSync", String.class, "orderId");
+            assertThat(responseMessage).isNotNull();
+        } catch (MessagingException e) {
+            assertThat(e).hasMessageContaining("org.apache.rocketmq.remoting.exception.RemotingConnectException: connect to [127.0.0.1:9876] failed");
+        }
+    }
+
+    @Test
+    public void testSendAndReceive_Async() {
+        try {
+            rocketMQTemplate.sendAndReceive(stringRequestTopic, MessageBuilder.withPayload("requestTopicASync").build(), new RocketMQLocalRequestCallback<String>() {
+                @Override public void onSuccess(String message) {
+                    System.out.printf("receive string: %s %n", message);
+                }
+
+                @Override public void onException(Throwable e) {
+                    e.printStackTrace();
+                }
+            });
+        } catch (MessagingException e) {
+            assertThat(e).hasMessageContaining("org.apache.rocketmq.remoting.exception.RemotingConnectException: connect to [127.0.0.1:9876] failed");
+        }
+
+        try {
+            rocketMQTemplate.sendAndReceive(stringRequestTopic, "requestTopicAsyncWithHasKey", new RocketMQLocalRequestCallback<String>() {
+                @Override public void onSuccess(String message) {
+                    System.out.printf("receive string: %s %n", message);
+                }
+
+                @Override public void onException(Throwable e) {
+                    e.printStackTrace();
+                }
+            }, "order-id");
+        } catch (MessagingException e) {
+            assertThat(e).hasMessageContaining("org.apache.rocketmq.remoting.exception.RemotingConnectException: connect to [127.0.0.1:9876] failed");
+        }
+
+        try {
+            rocketMQTemplate.sendAndReceive(stringRequestTopic, "requestTopicAsyncWithTimeout", new RocketMQLocalRequestCallback<String>() {
+                @Override public void onSuccess(String message) {
+                    System.out.printf("receive string: %s %n", message);
+                }
+
+                @Override public void onException(Throwable e) {
+                    e.printStackTrace();
+                }
+            }, "order-id", 5000);
+        } catch (MessagingException e) {
+            assertThat(e).hasMessageContaining("org.apache.rocketmq.remoting.exception.RemotingConnectException: connect to [127.0.0.1:9876] failed");
+        }
+        try {
+            rocketMQTemplate.sendAndReceive(objectRequestTopic, "requestTopicAsyncWithTimeout", new RocketMQLocalRequestCallback<MessageExt>() {
+                @Override public void onSuccess(MessageExt message) {
+                    System.out.printf("receive messageExt: %s %n", message.toString());
+                }
+
+                @Override public void onException(Throwable e) {
+                    e.printStackTrace();
+                }
+            }, 5000);
+        } catch (MessagingException e) {
+            assertThat(e).hasMessageContaining("org.apache.rocketmq.remoting.exception.RemotingConnectException: connect to [127.0.0.1:9876] failed");
+        }
+    }
+
+    @Test
     public void testProperties() {
         assertThat(rocketMQTemplate.getProducer().getNamesrvAddr()).isEqualTo("127.0.0.1:9876");
         assertThat(rocketMQTemplate.getProducer().getProducerGroup()).isEqualTo("rocketMQTemplate-test-producer_group");
diff --git a/rocketmq-spring-boot/src/test/java/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainerTest.java b/rocketmq-spring-boot/src/test/java/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainerTest.java
index 60014c9..7133b98 100644
--- a/rocketmq-spring-boot/src/test/java/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainerTest.java
+++ b/rocketmq-spring-boot/src/test/java/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainerTest.java
@@ -16,12 +16,16 @@
  */
 package org.apache.rocketmq.spring.support;
 
+import java.lang.reflect.Field;
 import java.lang.reflect.Method;
 import java.lang.reflect.ParameterizedType;
 import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
 import org.apache.rocketmq.spring.core.RocketMQListener;
+import org.apache.rocketmq.spring.core.RocketMQReplyListener;
 import org.junit.Test;
 import org.springframework.core.MethodParameter;
+import org.springframework.messaging.Message;
 import org.springframework.messaging.converter.CompositeMessageConverter;
 import org.springframework.messaging.converter.MappingJackson2MessageConverter;
 import org.springframework.messaging.converter.StringMessageConverter;
@@ -29,6 +33,7 @@
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Date;
+import org.springframework.messaging.support.MessageBuilder;
 
 import static org.assertj.core.api.Assertions.assertThat;
 
@@ -54,6 +59,64 @@
         });
         result = (Class) getMessageType.invoke(listenerContainer);
         assertThat(result.getName().equals(MessageExt.class.getName()));
+
+        listenerContainer.setRocketMQReplyListener(new RocketMQReplyListener<MessageExt, String>() {
+            @Override
+            public String onMessage(MessageExt message) {
+                return "test";
+            }
+        });
+        result = (Class) getMessageType.invoke(listenerContainer);
+        assertThat(result.getName().equals(MessageExt.class.getName()));
+
+        listenerContainer.setRocketMQReplyListener(new RocketMQReplyListener<String, String>() {
+            @Override
+            public String onMessage(String message) {
+                return "test";
+            }
+        });
+        result = (Class) getMessageType.invoke(listenerContainer);
+        assertThat(result.getName().equals(String.class.getName()));
+    }
+
+    @Test
+    public void testDoConvertMessage() throws Exception {
+        DefaultRocketMQListenerContainer listenerContainer = new DefaultRocketMQListenerContainer();
+        Method doConvertMessage = DefaultRocketMQListenerContainer.class.getDeclaredMethod("doConvertMessage", MessageExt.class);
+        doConvertMessage.setAccessible(true);
+
+        listenerContainer.setRocketMQListener(new RocketMQListener<String>() {
+            @Override
+            public void onMessage(String message) {
+            }
+        });
+
+        Field messageType = DefaultRocketMQListenerContainer.class.getDeclaredField("messageType");
+        messageType.setAccessible(true);
+        messageType.set(listenerContainer, String.class);
+        MessageExt messageExt = new MessageExt(0, System.currentTimeMillis(), null, System.currentTimeMillis(), null, null);
+        messageExt.setBody("hello".getBytes());
+        String result = (String) doConvertMessage.invoke(listenerContainer, messageExt);
+        assertThat(result).isEqualTo("hello");
+
+        listenerContainer.setRocketMQListener(new RocketMQListener<MessageExt>() {
+            @Override
+            public void onMessage(MessageExt message) {
+            }
+        });
+        Field messageType2 = DefaultRocketMQListenerContainer.class.getDeclaredField("messageType");
+        messageType2.setAccessible(true);
+        messageType2.set(listenerContainer, MessageExt.class);
+        messageExt = new MessageExt(0, System.currentTimeMillis(), null, System.currentTimeMillis(), null, null);
+        messageExt.setBody("hello".getBytes());
+        MessageExt result2 = (MessageExt) doConvertMessage.invoke(listenerContainer, messageExt);
+        assertThat(result2).isEqualTo(messageExt);
+
+        listenerContainer.setRocketMQListener(new RocketMQListener<User>() {
+            @Override
+            public void onMessage(User message) {
+            }
+        });
     }
 
     @Test
@@ -76,6 +139,41 @@
         assertThat(type.getRawType() == ArrayList.class);
         MethodParameter methodParameter = ((MethodParameter) getMethodParameter.invoke(listenerContainer));
         assertThat(methodParameter.getParameterType() == ArrayList.class);
+
+        listenerContainer.setRocketMQReplyListener(new RocketMQReplyListener<ArrayList<Date>, String>() {
+            @Override
+            public String onMessage(ArrayList<Date> message) {
+                return "test";
+            }
+        });
+
+        type = (ParameterizedType) getMessageType.invoke(listenerContainer);
+        assertThat(type.getRawType() == ArrayList.class);
+        methodParameter = ((MethodParameter) getMethodParameter.invoke(listenerContainer));
+        assertThat(methodParameter.getParameterType() == ArrayList.class);
+    }
+
+    class User {
+        private String userName;
+        private int userAge;
+
+        public String getUserName() {
+            return userName;
+        }
+
+        public User setUserName(String userName) {
+            this.userName = userName;
+            return this;
+        }
+
+        public int getUserAge() {
+            return userAge;
+        }
+
+        public User setUserAge(int userAge) {
+            this.userAge = userAge;
+            return this;
+        }
     }
 }