[ISSUE #208]support request/reply model in rocketmq-spring (#209)
* support request/response model in rocketmq-spring
* fix checkstyle problem
* add more test cases
* optimize request/reply model
* add examples to ProduceApplication.java
* wrap RequestCallback to conceal RocketMQ message
* requestCallback as method parameter
* delete useless class
* fix some comments and print format
diff --git a/rocketmq-spring-boot-samples/rocketmq-consume-demo/src/main/java/org/apache/rocketmq/samples/springboot/consumer/ConsumerWithReplyBytes.java b/rocketmq-spring-boot-samples/rocketmq-consume-demo/src/main/java/org/apache/rocketmq/samples/springboot/consumer/ConsumerWithReplyBytes.java
new file mode 100644
index 0000000..850fbd3
--- /dev/null
+++ b/rocketmq-spring-boot-samples/rocketmq-consume-demo/src/main/java/org/apache/rocketmq/samples/springboot/consumer/ConsumerWithReplyBytes.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.samples.springboot.consumer;
+
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
+import org.apache.rocketmq.spring.core.RocketMQReplyListener;
+import org.springframework.messaging.Message;
+import org.springframework.messaging.support.MessageBuilder;
+import org.springframework.stereotype.Service;
+
+/**
+ * The consumer that replying bytes
+ */
+@Service
+@RocketMQMessageListener(topic = "${demo.rocketmq.bytesRequestTopic}", consumerGroup = "${demo.rocketmq.bytesRequestConsumer}", selectorExpression = "${demo.rocketmq.tag}")
+public class ConsumerWithReplyBytes implements RocketMQReplyListener<MessageExt, byte[]> {
+
+ @Override
+ public byte[] onMessage(MessageExt message) {
+ System.out.printf("------- ConsumerWithReplyBytes received: %s \n", message);
+ return "reply message content".getBytes();
+ }
+}
diff --git a/rocketmq-spring-boot-samples/rocketmq-consume-demo/src/main/java/org/apache/rocketmq/samples/springboot/consumer/ConsumerWithReplyGeneric.java b/rocketmq-spring-boot-samples/rocketmq-consume-demo/src/main/java/org/apache/rocketmq/samples/springboot/consumer/ConsumerWithReplyGeneric.java
new file mode 100644
index 0000000..e17e675
--- /dev/null
+++ b/rocketmq-spring-boot-samples/rocketmq-consume-demo/src/main/java/org/apache/rocketmq/samples/springboot/consumer/ConsumerWithReplyGeneric.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.samples.springboot.consumer;
+
+import org.apache.rocketmq.samples.springboot.domain.ProductWithPayload;
+import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
+import org.apache.rocketmq.spring.core.RocketMQReplyListener;
+import org.springframework.stereotype.Service;
+
+/**
+ * The consumer that replying generic type
+ */
+@Service
+@RocketMQMessageListener(topic = "${demo.rocketmq.genericRequestTopic}", consumerGroup = "${demo.rocketmq.genericRequestConsumer}", selectorExpression = "${demo.rocketmq.tag}")
+public class ConsumerWithReplyGeneric implements RocketMQReplyListener<String, ProductWithPayload<String>> {
+ @Override
+ public ProductWithPayload<String> onMessage(String message) {
+ System.out.printf("------- ConsumerWithReplyGeneric received: %s \n", message);
+ return new ProductWithPayload<String>("replyProductName", "product payload");
+ }
+}
diff --git a/rocketmq-spring-boot-samples/rocketmq-consume-demo/src/main/java/org/apache/rocketmq/samples/springboot/consumer/ObjectConsumerWithReplyUser.java b/rocketmq-spring-boot-samples/rocketmq-consume-demo/src/main/java/org/apache/rocketmq/samples/springboot/consumer/ObjectConsumerWithReplyUser.java
new file mode 100644
index 0000000..f66b003
--- /dev/null
+++ b/rocketmq-spring-boot-samples/rocketmq-consume-demo/src/main/java/org/apache/rocketmq/samples/springboot/consumer/ObjectConsumerWithReplyUser.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.samples.springboot.consumer;
+
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.samples.springboot.domain.User;
+import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
+import org.apache.rocketmq.spring.core.RocketMQReplyListener;
+import org.springframework.stereotype.Service;
+
+/**
+ * The consumer that replying Object
+ */
+@Service
+@RocketMQMessageListener(topic = "${demo.rocketmq.objectRequestTopic}", consumerGroup = "${demo.rocketmq.objectRequestConsumer}", selectorExpression = "${demo.rocketmq.tag}")
+public class ObjectConsumerWithReplyUser implements RocketMQReplyListener<User, User> {
+
+ @Override
+ public User onMessage(User user) {
+ System.out.printf("------- ObjectConsumerWithReplyUser received: %s \n", user);
+ User replyUser = new User();
+ replyUser.setUserAge((byte) 10);
+ replyUser.setUserName("replyUserName");
+ return replyUser;
+ }
+}
diff --git a/rocketmq-spring-boot-samples/rocketmq-consume-demo/src/main/java/org/apache/rocketmq/samples/springboot/consumer/StringConsumer.java b/rocketmq-spring-boot-samples/rocketmq-consume-demo/src/main/java/org/apache/rocketmq/samples/springboot/consumer/StringConsumer.java
index 3801d48..11ac489 100644
--- a/rocketmq-spring-boot-samples/rocketmq-consume-demo/src/main/java/org/apache/rocketmq/samples/springboot/consumer/StringConsumer.java
+++ b/rocketmq-spring-boot-samples/rocketmq-consume-demo/src/main/java/org/apache/rocketmq/samples/springboot/consumer/StringConsumer.java
@@ -22,7 +22,7 @@
import org.springframework.stereotype.Service;
/**
- * RocketMQMessageListener
+ * StringConsumer
*/
@Service
@RocketMQMessageListener(topic = "${demo.rocketmq.topic}", consumerGroup = "string_consumer", selectorExpression = "${demo.rocketmq.tag}")
diff --git a/rocketmq-spring-boot-samples/rocketmq-consume-demo/src/main/java/org/apache/rocketmq/samples/springboot/consumer/StringConsumerWithReplyString.java b/rocketmq-spring-boot-samples/rocketmq-consume-demo/src/main/java/org/apache/rocketmq/samples/springboot/consumer/StringConsumerWithReplyString.java
new file mode 100644
index 0000000..b194bc6
--- /dev/null
+++ b/rocketmq-spring-boot-samples/rocketmq-consume-demo/src/main/java/org/apache/rocketmq/samples/springboot/consumer/StringConsumerWithReplyString.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.samples.springboot.consumer;
+
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
+import org.apache.rocketmq.spring.core.RocketMQReplyListener;
+import org.springframework.stereotype.Service;
+
+/**
+ * The consumer that replying String
+ */
+@Service
+@RocketMQMessageListener(topic = "${demo.rocketmq.stringRequestTopic}", consumerGroup = "${demo.rocketmq.stringRequestConsumer}", selectorExpression = "${demo.rocketmq.tag}")
+public class StringConsumerWithReplyString implements RocketMQReplyListener<String, String> {
+
+ @Override
+ public String onMessage(String message) {
+ System.out.printf("------- StringConsumerWithReplyString received: %s \n", message);
+ return "reply string";
+ }
+}
diff --git a/rocketmq-spring-boot-samples/rocketmq-consume-demo/src/main/java/org/apache/rocketmq/samples/springboot/domain/ProductWithPayload.java b/rocketmq-spring-boot-samples/rocketmq-consume-demo/src/main/java/org/apache/rocketmq/samples/springboot/domain/ProductWithPayload.java
new file mode 100644
index 0000000..e241929
--- /dev/null
+++ b/rocketmq-spring-boot-samples/rocketmq-consume-demo/src/main/java/org/apache/rocketmq/samples/springboot/domain/ProductWithPayload.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.samples.springboot.domain;
+
+public class ProductWithPayload<T> {
+ private String productName;
+ private T payload;
+
+ public ProductWithPayload() {
+ }
+
+ public ProductWithPayload(String productName, T payload) {
+ this.productName = productName;
+ this.payload = payload;
+ }
+
+ public String getProductName() {
+ return productName;
+ }
+
+ public void setProductName(String productName) {
+ this.productName = productName;
+ }
+
+ public T getPayload() {
+ return payload;
+ }
+
+ public void setPayload(T payload) {
+ this.payload = payload;
+ }
+
+ @Override public String toString() {
+ return "ProductWithPayload{" +
+ "productName='" + productName + '\'' +
+ ", payload=" + payload +
+ '}';
+ }
+}
\ No newline at end of file
diff --git a/rocketmq-spring-boot-samples/rocketmq-consume-demo/src/main/resources/application.properties b/rocketmq-spring-boot-samples/rocketmq-consume-demo/src/main/resources/application.properties
index b2b2690..5be0358 100644
--- a/rocketmq-spring-boot-samples/rocketmq-consume-demo/src/main/resources/application.properties
+++ b/rocketmq-spring-boot-samples/rocketmq-consume-demo/src/main/resources/application.properties
@@ -1,9 +1,16 @@
spring.application.name=rocketmq-consume-demo
rocketmq.name-server=localhost:9876
-
# properties used in application code
demo.rocketmq.topic=string-topic
+demo.rocketmq.bytesRequestTopic=bytesRequestTopic
+demo.rocketmq.stringRequestTopic=stringRequestTopic
+demo.rocketmq.objectRequestTopic=objectRequestTopic
+demo.rocketmq.genericRequestTopic=genericRequestTopic
+demo.rocketmq.bytesRequestConsumer=bytesRequestConsumer
+demo.rocketmq.stringRequestConsumer=stringRequestConsumer
+demo.rocketmq.objectRequestConsumer=objectRequestConsumer
+demo.rocketmq.genericRequestConsumer=genericRequestConsumer
demo.rocketmq.orderTopic=order-paid-topic
demo.rocketmq.msgExtTopic=message-ext-topic
demo.rocketmq.transTopic=spring-transaction-topic
diff --git a/rocketmq-spring-boot-samples/rocketmq-produce-demo/src/main/java/org/apache/rocketmq/samples/springboot/ProducerApplication.java b/rocketmq-spring-boot-samples/rocketmq-produce-demo/src/main/java/org/apache/rocketmq/samples/springboot/ProducerApplication.java
index d914bb5..46ac5ab 100644
--- a/rocketmq-spring-boot-samples/rocketmq-produce-demo/src/main/java/org/apache/rocketmq/samples/springboot/ProducerApplication.java
+++ b/rocketmq-spring-boot-samples/rocketmq-produce-demo/src/main/java/org/apache/rocketmq/samples/springboot/ProducerApplication.java
@@ -17,6 +17,7 @@
package org.apache.rocketmq.samples.springboot;
+import com.alibaba.fastjson.TypeReference;
import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.List;
@@ -26,8 +27,10 @@
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.samples.springboot.domain.OrderPaidEvent;
+import org.apache.rocketmq.samples.springboot.domain.ProductWithPayload;
import org.apache.rocketmq.samples.springboot.domain.User;
import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;
+import org.apache.rocketmq.spring.core.RocketMQLocalRequestCallback;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
@@ -60,6 +63,15 @@
private String orderPaidTopic;
@Value("${demo.rocketmq.msgExtTopic}")
private String msgExtTopic;
+ @Value("${demo.rocketmq.stringRequestTopic}")
+ private String stringRequestTopic;
+ @Value("${demo.rocketmq.bytesRequestTopic}")
+ private String bytesRequestTopic;
+ @Value("${demo.rocketmq.objectRequestTopic}")
+ private String objectRequestTopic;
+ @Value("${demo.rocketmq.genericRequestTopic}")
+ private String genericRequestTopic;
+
@Resource(name = "extRocketMQTemplate")
private RocketMQTemplate extRocketMQTemplate;
@@ -116,6 +128,45 @@
// Send transactional messages using extRocketMQTemplate
testExtRocketMQTemplateTransaction();
+
+ // Send request in sync mode and receive a reply of String type.
+ String replyString = rocketMQTemplate.sendAndReceive(stringRequestTopic, "request string", String.class);
+ System.out.printf("send %s and receive %s %n", "request string", replyString);
+
+ // Send request in sync mode with timeout parameter and receive a reply of byte[] type.
+ byte[] replyBytes = rocketMQTemplate.sendAndReceive(bytesRequestTopic, MessageBuilder.withPayload("request byte[]").build(), byte[].class, 3000);
+ System.out.printf("send %s and receive %s %n", "request byte[]", new String(replyBytes));
+
+ // Send request in sync mode with hashKey parameter and receive a reply of User type.
+ User requestUser = new User().setUserAge((byte) 9).setUserName("requestUserName");
+ User replyUser = rocketMQTemplate.sendAndReceive(objectRequestTopic, requestUser, User.class, "order-id");
+ System.out.printf("send %s and receive %s %n", requestUser, replyUser);
+ // Send request in sync mode with timeout and delayLevel parameter parameter and receive a reply of generic type.
+ ProductWithPayload<String> replyGenericObject = rocketMQTemplate.sendAndReceive(genericRequestTopic, "request generic",
+ new TypeReference<ProductWithPayload<String>>() {
+ }.getType(), 30000, 2);
+ System.out.printf("send %s and receive %s %n", "request generic", replyGenericObject);
+
+ // Send request in async mode and receive a reply of String type.
+ rocketMQTemplate.sendAndReceive(stringRequestTopic, "request string", new RocketMQLocalRequestCallback<String>() {
+ @Override public void onSuccess(String message) {
+ System.out.printf("send %s and receive %s %n", "request string", message);
+ }
+
+ @Override public void onException(Throwable e) {
+ e.printStackTrace();
+ }
+ });
+ // Send request in async mode and receive a reply of User type.
+ rocketMQTemplate.sendAndReceive(objectRequestTopic, new User().setUserAge((byte) 9).setUserName("requestUserName"), new RocketMQLocalRequestCallback<User>() {
+ @Override public void onSuccess(User message) {
+ System.out.printf("send user object and receive %s %n", message.toString());
+ }
+
+ @Override public void onException(Throwable e) {
+ e.printStackTrace();
+ }
+ }, 5000);
}
private void testBatchMessages() {
@@ -237,5 +288,4 @@
return RocketMQLocalTransactionState.COMMIT;
}
}
-
}
diff --git a/rocketmq-spring-boot-samples/rocketmq-produce-demo/src/main/java/org/apache/rocketmq/samples/springboot/domain/ProductWithPayload.java b/rocketmq-spring-boot-samples/rocketmq-produce-demo/src/main/java/org/apache/rocketmq/samples/springboot/domain/ProductWithPayload.java
new file mode 100644
index 0000000..e241929
--- /dev/null
+++ b/rocketmq-spring-boot-samples/rocketmq-produce-demo/src/main/java/org/apache/rocketmq/samples/springboot/domain/ProductWithPayload.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.samples.springboot.domain;
+
+public class ProductWithPayload<T> {
+ private String productName;
+ private T payload;
+
+ public ProductWithPayload() {
+ }
+
+ public ProductWithPayload(String productName, T payload) {
+ this.productName = productName;
+ this.payload = payload;
+ }
+
+ public String getProductName() {
+ return productName;
+ }
+
+ public void setProductName(String productName) {
+ this.productName = productName;
+ }
+
+ public T getPayload() {
+ return payload;
+ }
+
+ public void setPayload(T payload) {
+ this.payload = payload;
+ }
+
+ @Override public String toString() {
+ return "ProductWithPayload{" +
+ "productName='" + productName + '\'' +
+ ", payload=" + payload +
+ '}';
+ }
+}
\ No newline at end of file
diff --git a/rocketmq-spring-boot-samples/rocketmq-produce-demo/src/main/java/org/apache/rocketmq/samples/springboot/domain/User.java b/rocketmq-spring-boot-samples/rocketmq-produce-demo/src/main/java/org/apache/rocketmq/samples/springboot/domain/User.java
index 42acc11..4f2579f 100644
--- a/rocketmq-spring-boot-samples/rocketmq-produce-demo/src/main/java/org/apache/rocketmq/samples/springboot/domain/User.java
+++ b/rocketmq-spring-boot-samples/rocketmq-produce-demo/src/main/java/org/apache/rocketmq/samples/springboot/domain/User.java
@@ -18,32 +18,32 @@
package org.apache.rocketmq.samples.springboot.domain;
public class User {
- private String userName;
- private Byte userAge;
+ private String userName;
+ private Byte userAge;
- public String getUserName() {
- return userName;
- }
+ public String getUserName() {
+ return userName;
+ }
- public User setUserName(String userName) {
- this.userName = userName;
- return this;
- }
+ public User setUserName(String userName) {
+ this.userName = userName;
+ return this;
+ }
- public Byte getUserAge() {
- return userAge;
- }
+ public Byte getUserAge() {
+ return userAge;
+ }
- public User setUserAge(Byte userAge) {
- this.userAge = userAge;
- return this;
- }
+ public User setUserAge(Byte userAge) {
+ this.userAge = userAge;
+ return this;
+ }
- @Override
- public String toString() {
- return "User{" +
- "userName='" + userName + '\'' +
- ", userAge=" + userAge +
- '}';
- }
- }
\ No newline at end of file
+ @Override
+ public String toString() {
+ return "User{" +
+ "userName='" + userName + '\'' +
+ ", userAge=" + userAge +
+ '}';
+ }
+}
\ No newline at end of file
diff --git a/rocketmq-spring-boot-samples/rocketmq-produce-demo/src/main/resources/application.properties b/rocketmq-spring-boot-samples/rocketmq-produce-demo/src/main/resources/application.properties
index 27a3abc..c68ac3a 100644
--- a/rocketmq-spring-boot-samples/rocketmq-produce-demo/src/main/resources/application.properties
+++ b/rocketmq-spring-boot-samples/rocketmq-produce-demo/src/main/resources/application.properties
@@ -9,4 +9,9 @@
demo.rocketmq.transTopic=spring-transaction-topic
demo.rocketmq.topic.user=user-topic
+demo.rocketmq.bytesRequestTopic=bytesRequestTopic:tagA
+demo.rocketmq.stringRequestTopic=stringRequestTopic:tagA
+demo.rocketmq.objectRequestTopic=objectRequestTopic:tagA
+demo.rocketmq.genericRequestTopic=genericRequestTopic:tagA
+
demo.rocketmq.extNameServer=127.0.0.1:9876
\ No newline at end of file
diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/annotation/RocketMQTransactionListener.java b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/annotation/RocketMQTransactionListener.java
index dac280f..0faa57a 100644
--- a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/annotation/RocketMQTransactionListener.java
+++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/annotation/RocketMQTransactionListener.java
@@ -26,7 +26,8 @@
/**
* This annotation is used over a class which implements interface
- * org.apache.rocketmq.client.producer.TransactionListener. The class implements
+ * org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener, which will be converted to
+ * org.apache.rocketmq.client.producer.TransactionListener later. The class implements
* two methods for process callback events after the txProducer sends a transactional message.
* <p>Note: The annotation is used only on RocketMQ client producer side, it can not be used
* on consumer side.
diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/ListenerContainerConfiguration.java b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/ListenerContainerConfiguration.java
index 7bda36c..699474d 100644
--- a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/ListenerContainerConfiguration.java
+++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/ListenerContainerConfiguration.java
@@ -26,6 +26,7 @@
import org.apache.rocketmq.spring.annotation.MessageModel;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
+import org.apache.rocketmq.spring.core.RocketMQReplyListener;
import org.apache.rocketmq.spring.support.DefaultRocketMQListenerContainer;
import org.apache.rocketmq.spring.support.RocketMQMessageConverter;
import org.slf4j.Logger;
@@ -65,7 +66,7 @@
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
- this.applicationContext = (ConfigurableApplicationContext)applicationContext;
+ this.applicationContext = (ConfigurableApplicationContext) applicationContext;
}
@Override
@@ -80,8 +81,12 @@
private void registerContainer(String beanName, Object bean) {
Class<?> clazz = AopProxyUtils.ultimateTargetClass(bean);
- if (!RocketMQListener.class.isAssignableFrom(bean.getClass())) {
- throw new IllegalStateException(clazz + " is not instance of " + RocketMQListener.class.getName());
+ if (RocketMQListener.class.isAssignableFrom(bean.getClass()) && RocketMQReplyListener.class.isAssignableFrom(bean.getClass())) {
+ throw new IllegalStateException(clazz + " cannot be both instance of " + RocketMQListener.class.getName() + " and " + RocketMQReplyListener.class.getName());
+ }
+
+ if (!RocketMQListener.class.isAssignableFrom(bean.getClass()) && !RocketMQReplyListener.class.isAssignableFrom(bean.getClass())) {
+ throw new IllegalStateException(clazz + " is not instance of " + RocketMQListener.class.getName() + " or " + RocketMQReplyListener.class.getName());
}
RocketMQMessageListener annotation = clazz.getAnnotation(RocketMQMessageListener.class);
@@ -90,7 +95,7 @@
String topic = this.environment.resolvePlaceholders(annotation.topic());
boolean listenerEnabled =
- (boolean)rocketMQProperties.getConsumer().getListeners().getOrDefault(consumerGroup, Collections.EMPTY_MAP)
+ (boolean) rocketMQProperties.getConsumer().getListeners().getOrDefault(consumerGroup, Collections.EMPTY_MAP)
.getOrDefault(topic, true);
if (!listenerEnabled) {
@@ -103,7 +108,7 @@
String containerBeanName = String.format("%s_%s", DefaultRocketMQListenerContainer.class.getName(),
counter.incrementAndGet());
- GenericApplicationContext genericApplicationContext = (GenericApplicationContext)applicationContext;
+ GenericApplicationContext genericApplicationContext = (GenericApplicationContext) applicationContext;
genericApplicationContext.registerBean(containerBeanName, DefaultRocketMQListenerContainer.class,
() -> createRocketMQListenerContainer(containerBeanName, bean, annotation));
@@ -124,9 +129,9 @@
private DefaultRocketMQListenerContainer createRocketMQListenerContainer(String name, Object bean,
RocketMQMessageListener annotation) {
DefaultRocketMQListenerContainer container = new DefaultRocketMQListenerContainer();
-
+
container.setRocketMQMessageListener(annotation);
-
+
String nameServer = environment.resolvePlaceholders(annotation.nameServer());
nameServer = StringUtils.isEmpty(nameServer) ? rocketMQProperties.getNameServer() : nameServer;
String accessChannel = environment.resolvePlaceholders(annotation.accessChannel());
@@ -140,7 +145,11 @@
container.setSelectorExpression(tags);
}
container.setConsumerGroup(environment.resolvePlaceholders(annotation.consumerGroup()));
- container.setRocketMQListener((RocketMQListener)bean);
+ if (RocketMQListener.class.isAssignableFrom(bean.getClass())) {
+ container.setRocketMQListener((RocketMQListener) bean);
+ } else if (RocketMQReplyListener.class.isAssignableFrom(bean.getClass())) {
+ container.setRocketMQReplyListener((RocketMQReplyListener) bean);
+ }
container.setMessageConverter(rocketMQMessageConverter.getMessageConverter());
container.setName(name); // REVIEW ME, use the same clientId or multiple?
diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/core/RocketMQLocalRequestCallback.java b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/core/RocketMQLocalRequestCallback.java
new file mode 100644
index 0000000..56b15ff
--- /dev/null
+++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/core/RocketMQLocalRequestCallback.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.spring.core;
+
+/**
+ * Classes implementing this interface are used for processing callback events after receiving
+ * reply messages from consumers.
+ *
+ * @param <T> the type of message that wanted to receive from consumer
+ */
+public interface RocketMQLocalRequestCallback<T> {
+ void onSuccess(final T message);
+
+ void onException(final Throwable e);
+}
diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/core/RocketMQReplyListener.java b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/core/RocketMQReplyListener.java
new file mode 100644
index 0000000..916368d
--- /dev/null
+++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/core/RocketMQReplyListener.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.spring.core;
+
+/**
+ * The consumer supporting request-reply should implement this interface.
+ *
+ * @param <T> the type of data received by the listener
+ * @param <R> the type of data replying to producer
+ */
+public interface RocketMQReplyListener<T, R> {
+ /**
+ * @param message data received by the listener
+ * @return data replying to producer
+ */
+ R onMessage(T message);
+}
diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/core/RocketMQTemplate.java b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/core/RocketMQTemplate.java
index c655696..089016a 100644
--- a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/core/RocketMQTemplate.java
+++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/core/RocketMQTemplate.java
@@ -17,6 +17,9 @@
package org.apache.rocketmq.spring.core;
+import java.lang.reflect.ParameterizedType;
+import java.lang.reflect.Type;
+import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Map;
@@ -24,19 +27,24 @@
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.MessageQueueSelector;
+import org.apache.rocketmq.client.producer.RequestCallback;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.client.producer.TransactionSendResult;
import org.apache.rocketmq.client.producer.selector.SelectMessageQueueByHash;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.spring.support.RocketMQMessageConverter;
import org.apache.rocketmq.spring.support.RocketMQUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.springframework.aop.framework.AopProxyUtils;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.MessagingException;
+import org.springframework.messaging.converter.SmartMessageConverter;
import org.springframework.messaging.core.AbstractMessageSendingTemplate;
import org.springframework.messaging.core.MessagePostProcessor;
import org.springframework.messaging.support.MessageBuilder;
@@ -52,6 +60,8 @@
private MessageQueueSelector messageQueueSelector = new SelectMessageQueueByHash();
+ private RocketMQMessageConverter rocketMQMessageConverter = new RocketMQMessageConverter();
+
public DefaultMQProducer getProducer() {
return producer;
}
@@ -77,6 +87,356 @@
}
/**
+ * @param destination formats: `topicName:tags`
+ * @param message {@link org.springframework.messaging.Message} the message to be sent.
+ * @param type The type of T
+ * @return
+ */
+ public <T> T sendAndReceive(String destination, Message<?> message, Type type) {
+ return sendAndReceive(destination, message, type, null, producer.getSendMsgTimeout(), 0);
+ }
+
+ /**
+ * @param destination formats: `topicName:tags`
+ * @param payload the payload to be sent.
+ * @param type The type of T
+ * @return
+ */
+ public <T> T sendAndReceive(String destination, Object payload, Type type) {
+ return sendAndReceive(destination, payload, type, null, producer.getSendMsgTimeout(), 0);
+ }
+
+ /**
+ * @param destination formats: `topicName:tags`
+ * @param message {@link org.springframework.messaging.Message} the message to be sent.
+ * @param type The type of T
+ * @param timeout send timeout in millis
+ * @return
+ */
+ public <T> T sendAndReceive(String destination, Message<?> message, Type type, long timeout) {
+ return sendAndReceive(destination, message, type, null, timeout, 0);
+ }
+
+ /**
+ * @param destination formats: `topicName:tags`
+ * @param payload the payload to be sent.
+ * @param type The type of T
+ * @param timeout send timeout in millis
+ * @return
+ */
+ public <T> T sendAndReceive(String destination, Object payload, Type type, long timeout) {
+ return sendAndReceive(destination, payload, type, null, timeout, 0);
+ }
+
+ /**
+ * @param destination formats: `topicName:tags`
+ * @param message {@link org.springframework.messaging.Message} the message to be sent.
+ * @param type The type of T
+ * @param timeout send timeout in millis
+ * @param delayLevel message delay level(0 means no delay)
+ * @return
+ */
+ public <T> T sendAndReceive(String destination, Message<?> message, Type type, long timeout, int delayLevel) {
+ return sendAndReceive(destination, message, type, null, timeout, delayLevel);
+ }
+
+ /**
+ * @param destination formats: `topicName:tags`
+ * @param payload the payload to be sent.
+ * @param type The type of T
+ * @param timeout send timeout in millis
+ * @param delayLevel message delay level(0 means no delay)
+ * @return
+ */
+ public <T> T sendAndReceive(String destination, Object payload, Type type, long timeout, int delayLevel) {
+ return sendAndReceive(destination, payload, type, null, timeout, delayLevel);
+ }
+
+ /**
+ * @param destination formats: `topicName:tags`
+ * @param message {@link org.springframework.messaging.Message} the message to be sent.
+ * @param type The type of T
+ * @param hashKey needed when sending message orderly
+ * @return
+ */
+ public <T> T sendAndReceive(String destination, Message<?> message, Type type, String hashKey) {
+ return sendAndReceive(destination, message, type, hashKey, producer.getSendMsgTimeout(), 0);
+ }
+
+ /**
+ * @param destination formats: `topicName:tags`
+ * @param payload the payload to be sent.
+ * @param type The type of T
+ * @param hashKey needed when sending message orderly
+ * @return
+ */
+ public <T> T sendAndReceive(String destination, Object payload, Type type, String hashKey) {
+ return sendAndReceive(destination, payload, type, hashKey, producer.getSendMsgTimeout(), 0);
+ }
+
+ /**
+ * @param destination formats: `topicName:tags`
+ * @param message {@link org.springframework.messaging.Message} the message to be sent.
+ * @param type The type of T
+ * @param hashKey needed when sending message orderly
+ * @param timeout send timeout in millis
+ * @return
+ */
+ public <T> T sendAndReceive(String destination, Message<?> message, Type type, String hashKey, long timeout) {
+ return sendAndReceive(destination, message, type, hashKey, timeout, 0);
+ }
+
+ /**
+ * @param destination formats: `topicName:tags`
+ * @param payload the payload to be sent.
+ * @param type The type of T
+ * @param hashKey
+ * @return
+ */
+ public <T> T sendAndReceive(String destination, Object payload, Type type, String hashKey, long timeout) {
+ return sendAndReceive(destination, payload, type, hashKey, timeout, 0);
+ }
+
+ /**
+ * @param destination formats: `topicName:tags`
+ * @param message {@link org.springframework.messaging.Message} the message to be sent.
+ * @param type The type that receive
+ * @param hashKey needed when sending message orderly
+ * @param timeout send timeout in millis
+ * @param delayLevel message delay level(0 means no delay)
+ * @return
+ */
+ public <T> T sendAndReceive(String destination, Message<?> message, Type type, String hashKey,
+ long timeout, int delayLevel) {
+ if (Objects.isNull(message) || Objects.isNull(message.getPayload())) {
+ log.error("send request message failed. destination:{}, message is null ", destination);
+ throw new IllegalArgumentException("`message` and `message.payload` cannot be null");
+ }
+
+ try {
+ org.apache.rocketmq.common.message.Message rocketMsg = this.createRocketMqMessage(destination, message);
+ if (delayLevel > 0) {
+ rocketMsg.setDelayTimeLevel(delayLevel);
+ }
+ MessageExt replyMessage;
+
+ if (Objects.isNull(hashKey) || hashKey.isEmpty()) {
+ replyMessage = (MessageExt) producer.request(rocketMsg, timeout);
+ } else {
+ replyMessage = (MessageExt) producer.request(rocketMsg, messageQueueSelector, hashKey, timeout);
+ }
+ return replyMessage != null ? (T) doConvertMessage(replyMessage, type) : null;
+ } catch (Exception e) {
+ log.error("send request message failed. destination:{}, message:{} ", destination, message);
+ throw new MessagingException(e.getMessage(), e);
+ }
+ }
+
+ /**
+ * @param destination formats: `topicName:tags`
+ * @param payload the payload to be sent.
+ * @param type The type that receive
+ * @param hashKey needed when sending message orderly
+ * @param timeout send timeout in millis
+ * @param delayLevel message delay level(0 means no delay)
+ * @return
+ */
+ public <T> T sendAndReceive(String destination, Object payload, Type type, String hashKey,
+ long timeout, int delayLevel) {
+ Message<?> message = MessageBuilder.withPayload(payload).build();
+ return sendAndReceive(destination, message, type, hashKey, timeout, delayLevel);
+ }
+
+ /**
+ * @param destination formats: `topicName:tags`
+ * @param message {@link org.springframework.messaging.Message} the message to be sent.
+ * @param rocketMQLocalRequestCallback callback that will invoked when reply message received.
+ * @return
+ */
+ public void sendAndReceive(String destination, Message<?> message,
+ RocketMQLocalRequestCallback rocketMQLocalRequestCallback) {
+ sendAndReceive(destination, message, rocketMQLocalRequestCallback, null, producer.getSendMsgTimeout(), 0);
+ }
+
+ /**
+ * @param destination formats: `topicName:tags`
+ * @param payload the payload to be sent.
+ * @param rocketMQLocalRequestCallback callback that will invoked when reply message received.
+ * @return
+ */
+ public void sendAndReceive(String destination, Object payload,
+ RocketMQLocalRequestCallback rocketMQLocalRequestCallback) {
+ sendAndReceive(destination, payload, rocketMQLocalRequestCallback, null, producer.getSendMsgTimeout(), 0);
+ }
+
+ /**
+ * @param destination formats: `topicName:tags`
+ * @param message {@link org.springframework.messaging.Message} the message to be sent.
+ * @param rocketMQLocalRequestCallback callback that will invoked when reply message received.
+ * @param timeout send timeout in millis
+ * @return
+ */
+ public void sendAndReceive(String destination, Message<?> message,
+ RocketMQLocalRequestCallback rocketMQLocalRequestCallback, long timeout) {
+ sendAndReceive(destination, message, rocketMQLocalRequestCallback, null, timeout, 0);
+ }
+
+ /**
+ * @param destination formats: `topicName:tags`
+ * @param payload the payload to be sent.
+ * @param rocketMQLocalRequestCallback callback that will invoked when reply message received.
+ * @param timeout send timeout in millis
+ * @return
+ */
+ public void sendAndReceive(String destination, Object payload,
+ RocketMQLocalRequestCallback rocketMQLocalRequestCallback, long timeout) {
+ sendAndReceive(destination, payload, rocketMQLocalRequestCallback, null, timeout, 0);
+ }
+
+ /**
+ * @param destination formats: `topicName:tags`
+ * @param message {@link org.springframework.messaging.Message} the message to be sent.
+ * @param rocketMQLocalRequestCallback callback that will invoked when reply message received.
+ * @param timeout send timeout in millis
+ * @param delayLevel message delay level(0 means no delay)
+ * @return
+ */
+ public void sendAndReceive(String destination, Message<?> message,
+ RocketMQLocalRequestCallback rocketMQLocalRequestCallback, long timeout, int delayLevel) {
+ sendAndReceive(destination, message, rocketMQLocalRequestCallback, null, timeout, delayLevel);
+ }
+
+ /**
+ * @param destination formats: `topicName:tags`
+ * @param payload the payload to be sent.
+ * @param rocketMQLocalRequestCallback callback that will invoked when reply message received.
+ * @param hashKey needed when sending message orderly
+ * @return
+ */
+ public void sendAndReceive(String destination, Object payload,
+ RocketMQLocalRequestCallback rocketMQLocalRequestCallback, String hashKey) {
+ sendAndReceive(destination, payload, rocketMQLocalRequestCallback, hashKey, producer.getSendMsgTimeout(), 0);
+ }
+
+ /**
+ * @param destination formats: `topicName:tags`
+ * @param message {@link org.springframework.messaging.Message} the message to be sent.
+ * @param rocketMQLocalRequestCallback callback that will invoked when reply message received.
+ * @param hashKey needed when sending message orderly
+ * @param timeout send timeout in millis
+ * @return
+ */
+ public void sendAndReceive(String destination, Message<?> message,
+ RocketMQLocalRequestCallback rocketMQLocalRequestCallback, String hashKey, long timeout) {
+ sendAndReceive(destination, message, rocketMQLocalRequestCallback, hashKey, timeout, 0);
+ }
+
+ /**
+ * @param destination formats: `topicName:tags`
+ * @param payload the payload to be sent.
+ * @param rocketMQLocalRequestCallback callback that will invoked when reply message received.
+ * @param hashKey needed when sending message orderly
+ * @param timeout send timeout in millis
+ * @return
+ */
+ public void sendAndReceive(String destination, Object payload,
+ RocketMQLocalRequestCallback rocketMQLocalRequestCallback, String hashKey, long timeout) {
+ sendAndReceive(destination, payload, rocketMQLocalRequestCallback, hashKey, timeout, 0);
+ }
+
+ /**
+ * @param destination formats: `topicName:tags`
+ * @param message {@link org.springframework.messaging.Message} the message to be sent.
+ * @param rocketMQLocalRequestCallback callback that will invoked when reply message received.
+ * @param hashKey needed when sending message orderly
+ * @return
+ */
+ public void sendAndReceive(String destination, Message<?> message,
+ RocketMQLocalRequestCallback rocketMQLocalRequestCallback, String hashKey) {
+ sendAndReceive(destination, message, rocketMQLocalRequestCallback, hashKey, producer.getSendMsgTimeout(), 0);
+ }
+
+ /**
+ * @param destination formats: `topicName:tags`
+ * @param payload the payload to be sent.
+ * @param rocketMQLocalRequestCallback callback that will invoked when reply message received.
+ * @param timeout send timeout in millis
+ * @param delayLevel message delay level(0 means no delay)
+ * @return
+ */
+ public void sendAndReceive(String destination, Object payload,
+ RocketMQLocalRequestCallback rocketMQLocalRequestCallback, long timeout, int delayLevel) {
+ sendAndReceive(destination, payload, rocketMQLocalRequestCallback, null, timeout, delayLevel);
+ }
+
+ /**
+ * @param destination formats: `topicName:tags`
+ * @param payload the payload to be sent.
+ * @param rocketMQLocalRequestCallback callback that will invoked when reply message received.
+ * @param hashKey needed when sending message orderly
+ * @param timeout send timeout in millis
+ * @param delayLevel message delay level(0 means no delay)
+ * @return
+ */
+ public void sendAndReceive(String destination, Object payload,
+ RocketMQLocalRequestCallback rocketMQLocalRequestCallback, String hashKey, long timeout, int delayLevel) {
+ Message<?> message = MessageBuilder.withPayload(payload).build();
+ sendAndReceive(destination, message, rocketMQLocalRequestCallback, hashKey, timeout, delayLevel);
+ }
+
+ /**
+ * Send request message in asynchronous mode. </p> This method returns immediately. On receiving reply message,
+ * <code>rocketMQLocalRequestCallback</code> will be executed. </p>
+ *
+ * @param destination formats: `topicName:tags`
+ * @param message {@link org.springframework.messaging.Message} the message to be sent.
+ * @param rocketMQLocalRequestCallback callback that will invoked when reply message received.
+ * @param hashKey needed when sending message orderly
+ * @param timeout send timeout in millis
+ * @param delayLevel message delay level(0 means no delay)
+ * @return
+ */
+ public void sendAndReceive(String destination, Message<?> message,
+ RocketMQLocalRequestCallback rocketMQLocalRequestCallback, String hashKey, long timeout, int delayLevel) {
+ if (Objects.isNull(message) || Objects.isNull(message.getPayload())) {
+ log.error("send request message failed. destination:{}, message is null ", destination);
+ throw new IllegalArgumentException("`message` and `message.payload` cannot be null");
+ }
+
+ try {
+ org.apache.rocketmq.common.message.Message rocketMsg = this.createRocketMqMessage(destination, message);
+ if (delayLevel > 0) {
+ rocketMsg.setDelayTimeLevel(delayLevel);
+ }
+ if (timeout <= 0) {
+ timeout = producer.getSendMsgTimeout();
+ }
+ RequestCallback requestCallback = null;
+ if (rocketMQLocalRequestCallback != null) {
+ requestCallback = new RequestCallback() {
+ @Override public void onSuccess(org.apache.rocketmq.common.message.Message message) {
+ rocketMQLocalRequestCallback.onSuccess(doConvertMessage((MessageExt) message, getMessageType(rocketMQLocalRequestCallback)));
+ }
+
+ @Override public void onException(Throwable e) {
+ rocketMQLocalRequestCallback.onException(e);
+ }
+ };
+ }
+ if (Objects.isNull(hashKey) || hashKey.isEmpty()) {
+ producer.request(rocketMsg, requestCallback, timeout);
+ } else {
+ producer.request(rocketMsg, messageQueueSelector, hashKey, requestCallback, timeout);
+ }
+ } catch (
+ Exception e) {
+ log.error("send request message failed. destination:{}, message:{} ", destination, message);
+ throw new MessagingException(e.getMessage(), e);
+ }
+
+ }
+
+ /**
* <p> Send message in synchronous mode. This method returns only when the sending procedure totally completes.
* Reliable synchronous transmission is used in extensive scenes, such as important notification messages, SMS
* notification, SMS marketing system, etc.. </p>
@@ -87,7 +447,7 @@
* duplication issue.
*
* @param destination formats: `topicName:tags`
- * @param message {@link org.springframework.messaging.Message}
+ * @param message {@link org.springframework.messaging.Message}
* @return {@link SendResult}
*/
public SendResult syncSend(String destination, Message<?> message) {
@@ -98,8 +458,8 @@
* Same to {@link #syncSend(String, Message)} with send timeout specified in addition.
*
* @param destination formats: `topicName:tags`
- * @param message {@link org.springframework.messaging.Message}
- * @param timeout send timeout with millis
+ * @param message {@link org.springframework.messaging.Message}
+ * @param timeout send timeout with millis
* @return {@link SendResult}
*/
public SendResult syncSend(String destination, Message<?> message, long timeout) {
@@ -110,8 +470,8 @@
* syncSend batch messages in a given timeout.
*
* @param destination formats: `topicName:tags`
- * @param messages Collection of {@link org.springframework.messaging.Message}
- * @param timeout send timeout with millis
+ * @param messages Collection of {@link org.springframework.messaging.Message}
+ * @param timeout send timeout with millis
* @return {@link SendResult}
*/
public <T extends Message> SendResult syncSend(String destination, Collection<T> messages, long timeout) {
@@ -147,9 +507,9 @@
* Same to {@link #syncSend(String, Message)} with send timeout specified in addition.
*
* @param destination formats: `topicName:tags`
- * @param message {@link org.springframework.messaging.Message}
- * @param timeout send timeout with millis
- * @param delayLevel level for the delay message
+ * @param message {@link org.springframework.messaging.Message}
+ * @param timeout send timeout with millis
+ * @param delayLevel level for the delay message
* @return {@link SendResult}
*/
public SendResult syncSend(String destination, Message<?> message, long timeout, int delayLevel) {
@@ -179,7 +539,7 @@
* Same to {@link #syncSend(String, Message)}.
*
* @param destination formats: `topicName:tags`
- * @param payload the Object to use as payload
+ * @param payload the Object to use as payload
* @return {@link SendResult}
*/
public SendResult syncSend(String destination, Object payload) {
@@ -190,8 +550,8 @@
* Same to {@link #syncSend(String, Object)} with send timeout specified in addition.
*
* @param destination formats: `topicName:tags`
- * @param payload the Object to use as payload
- * @param timeout send timeout with millis
+ * @param payload the Object to use as payload
+ * @param timeout send timeout with millis
* @return {@link SendResult}
*/
public SendResult syncSend(String destination, Object payload, long timeout) {
@@ -203,8 +563,8 @@
* Same to {@link #syncSend(String, Message)} with send orderly with hashKey by specified.
*
* @param destination formats: `topicName:tags`
- * @param message {@link org.springframework.messaging.Message}
- * @param hashKey use this key to select queue. for example: orderId, productId ...
+ * @param message {@link org.springframework.messaging.Message}
+ * @param hashKey use this key to select queue. for example: orderId, productId ...
* @return {@link SendResult}
*/
public SendResult syncSendOrderly(String destination, Message<?> message, String hashKey) {
@@ -215,9 +575,9 @@
* Same to {@link #syncSendOrderly(String, Message, String)} with send timeout specified in addition.
*
* @param destination formats: `topicName:tags`
- * @param message {@link org.springframework.messaging.Message}
- * @param hashKey use this key to select queue. for example: orderId, productId ...
- * @param timeout send timeout with millis
+ * @param message {@link org.springframework.messaging.Message}
+ * @param hashKey use this key to select queue. for example: orderId, productId ...
+ * @param timeout send timeout with millis
* @return {@link SendResult}
*/
public SendResult syncSendOrderly(String destination, Message<?> message, String hashKey, long timeout) {
@@ -244,8 +604,8 @@
* Same to {@link #syncSend(String, Object)} with send orderly with hashKey by specified.
*
* @param destination formats: `topicName:tags`
- * @param payload the Object to use as payload
- * @param hashKey use this key to select queue. for example: orderId, productId ...
+ * @param payload the Object to use as payload
+ * @param hashKey use this key to select queue. for example: orderId, productId ...
* @return {@link SendResult}
*/
public SendResult syncSendOrderly(String destination, Object payload, String hashKey) {
@@ -256,9 +616,9 @@
* Same to {@link #syncSendOrderly(String, Object, String)} with send timeout specified in addition.
*
* @param destination formats: `topicName:tags`
- * @param payload the Object to use as payload
- * @param hashKey use this key to select queue. for example: orderId, productId ...
- * @param timeout send timeout with millis
+ * @param payload the Object to use as payload
+ * @param hashKey use this key to select queue. for example: orderId, productId ...
+ * @param timeout send timeout with millis
* @return {@link SendResult}
*/
public SendResult syncSendOrderly(String destination, Object payload, String hashKey, long timeout) {
@@ -270,11 +630,11 @@
* Same to {@link #asyncSend(String, Message, SendCallback)} with send timeout and delay level specified in
* addition.
*
- * @param destination formats: `topicName:tags`
- * @param message {@link org.springframework.messaging.Message}
+ * @param destination formats: `topicName:tags`
+ * @param message {@link org.springframework.messaging.Message}
* @param sendCallback {@link SendCallback}
- * @param timeout send timeout with millis
- * @param delayLevel level for the delay message
+ * @param timeout send timeout with millis
+ * @param delayLevel level for the delay message
*/
public void asyncSend(String destination, Message<?> message, SendCallback sendCallback, long timeout,
int delayLevel) {
@@ -297,10 +657,10 @@
/**
* Same to {@link #asyncSend(String, Message, SendCallback)} with send timeout specified in addition.
*
- * @param destination formats: `topicName:tags`
- * @param message {@link org.springframework.messaging.Message}
+ * @param destination formats: `topicName:tags`
+ * @param message {@link org.springframework.messaging.Message}
* @param sendCallback {@link SendCallback}
- * @param timeout send timeout with millis
+ * @param timeout send timeout with millis
*/
public void asyncSend(String destination, Message<?> message, SendCallback sendCallback, long timeout) {
asyncSend(destination, message, sendCallback, timeout, 0);
@@ -316,8 +676,8 @@
* DefaultMQProducer#getRetryTimesWhenSendAsyncFailed} times before claiming sending failure, which may yield
* message duplication and application developers are the one to resolve this potential issue.
*
- * @param destination formats: `topicName:tags`
- * @param message {@link org.springframework.messaging.Message}
+ * @param destination formats: `topicName:tags`
+ * @param message {@link org.springframework.messaging.Message}
* @param sendCallback {@link SendCallback}
*/
public void asyncSend(String destination, Message<?> message, SendCallback sendCallback) {
@@ -327,10 +687,10 @@
/**
* Same to {@link #asyncSend(String, Object, SendCallback)} with send timeout specified in addition.
*
- * @param destination formats: `topicName:tags`
- * @param payload the Object to use as payload
+ * @param destination formats: `topicName:tags`
+ * @param payload the Object to use as payload
* @param sendCallback {@link SendCallback}
- * @param timeout send timeout with millis
+ * @param timeout send timeout with millis
*/
public void asyncSend(String destination, Object payload, SendCallback sendCallback, long timeout) {
Message<?> message = MessageBuilder.withPayload(payload).build();
@@ -340,8 +700,8 @@
/**
* Same to {@link #asyncSend(String, Message, SendCallback)}.
*
- * @param destination formats: `topicName:tags`
- * @param payload the Object to use as payload
+ * @param destination formats: `topicName:tags`
+ * @param payload the Object to use as payload
* @param sendCallback {@link SendCallback}
*/
public void asyncSend(String destination, Object payload, SendCallback sendCallback) {
@@ -352,11 +712,11 @@
* Same to {@link #asyncSendOrderly(String, Message, String, SendCallback)} with send timeout specified in
* addition.
*
- * @param destination formats: `topicName:tags`
- * @param message {@link org.springframework.messaging.Message}
- * @param hashKey use this key to select queue. for example: orderId, productId ...
+ * @param destination formats: `topicName:tags`
+ * @param message {@link org.springframework.messaging.Message}
+ * @param hashKey use this key to select queue. for example: orderId, productId ...
* @param sendCallback {@link SendCallback}
- * @param timeout send timeout with millis
+ * @param timeout send timeout with millis
*/
public void asyncSendOrderly(String destination, Message<?> message, String hashKey, SendCallback sendCallback,
long timeout) {
@@ -376,9 +736,9 @@
/**
* Same to {@link #asyncSend(String, Message, SendCallback)} with send orderly with hashKey by specified.
*
- * @param destination formats: `topicName:tags`
- * @param message {@link org.springframework.messaging.Message}
- * @param hashKey use this key to select queue. for example: orderId, productId ...
+ * @param destination formats: `topicName:tags`
+ * @param message {@link org.springframework.messaging.Message}
+ * @param hashKey use this key to select queue. for example: orderId, productId ...
* @param sendCallback {@link SendCallback}
*/
public void asyncSendOrderly(String destination, Message<?> message, String hashKey, SendCallback sendCallback) {
@@ -388,9 +748,9 @@
/**
* Same to {@link #asyncSendOrderly(String, Message, String, SendCallback)}.
*
- * @param destination formats: `topicName:tags`
- * @param payload the Object to use as payload
- * @param hashKey use this key to select queue. for example: orderId, productId ...
+ * @param destination formats: `topicName:tags`
+ * @param payload the Object to use as payload
+ * @param hashKey use this key to select queue. for example: orderId, productId ...
* @param sendCallback {@link SendCallback}
*/
public void asyncSendOrderly(String destination, Object payload, String hashKey, SendCallback sendCallback) {
@@ -400,11 +760,11 @@
/**
* Same to {@link #asyncSendOrderly(String, Object, String, SendCallback)} with send timeout specified in addition.
*
- * @param destination formats: `topicName:tags`
- * @param payload the Object to use as payload
- * @param hashKey use this key to select queue. for example: orderId, productId ...
+ * @param destination formats: `topicName:tags`
+ * @param payload the Object to use as payload
+ * @param hashKey use this key to select queue. for example: orderId, productId ...
* @param sendCallback {@link SendCallback}
- * @param timeout send timeout with millis
+ * @param timeout send timeout with millis
*/
public void asyncSendOrderly(String destination, Object payload, String hashKey, SendCallback sendCallback,
long timeout) {
@@ -419,7 +779,7 @@
* One-way transmission is used for cases requiring moderate reliability, such as log collection.
*
* @param destination formats: `topicName:tags`
- * @param message {@link org.springframework.messaging.Message}
+ * @param message {@link org.springframework.messaging.Message}
*/
public void sendOneWay(String destination, Message<?> message) {
if (Objects.isNull(message) || Objects.isNull(message.getPayload())) {
@@ -439,7 +799,7 @@
* Same to {@link #sendOneWay(String, Message)}
*
* @param destination formats: `topicName:tags`
- * @param payload the Object to use as payload
+ * @param payload the Object to use as payload
*/
public void sendOneWay(String destination, Object payload) {
Message<?> message = MessageBuilder.withPayload(payload).build();
@@ -450,8 +810,8 @@
* Same to {@link #sendOneWay(String, Message)} with send orderly with hashKey by specified.
*
* @param destination formats: `topicName:tags`
- * @param message {@link org.springframework.messaging.Message}
- * @param hashKey use this key to select queue. for example: orderId, productId ...
+ * @param message {@link org.springframework.messaging.Message}
+ * @param hashKey use this key to select queue. for example: orderId, productId ...
*/
public void sendOneWayOrderly(String destination, Message<?> message, String hashKey) {
if (Objects.isNull(message) || Objects.isNull(message.getPayload())) {
@@ -471,7 +831,7 @@
* Same to {@link #sendOneWayOrderly(String, Message, String)}
*
* @param destination formats: `topicName:tags`
- * @param payload the Object to use as payload
+ * @param payload the Object to use as payload
*/
public void sendOneWayOrderly(String destination, Object payload, String hashKey) {
Message<?> message = MessageBuilder.withPayload(payload).build();
@@ -511,9 +871,9 @@
/**
* Send Spring Message in Transaction
*
- * @param destination destination formats: `topicName:tags`
- * @param message message {@link org.springframework.messaging.Message}
- * @param arg ext arg
+ * @param destination destination formats: `topicName:tags`
+ * @param message message {@link org.springframework.messaging.Message}
+ * @param arg ext arg
* @return TransactionSendResult
* @throws MessagingException
*/
@@ -537,4 +897,57 @@
destination, msg);
}
+ private Object doConvertMessage(MessageExt messageExt, Type type) {
+ if (Objects.equals(type, MessageExt.class)) {
+ return messageExt;
+ } else if (Objects.equals(type, byte[].class)) {
+ return messageExt.getBody();
+ } else {
+ String str = new String(messageExt.getBody(), Charset.forName(charset));
+ if (Objects.equals(type, String.class)) {
+ return str;
+ } else {
+ // If msgType not string, use objectMapper change it.
+ try {
+ if (type instanceof Class) {
+ //if the messageType has not Generic Parameter
+ return this.getMessageConverter().fromMessage(MessageBuilder.withPayload(str).build(), (Class<?>) type);
+ } else {
+ //if the messageType has Generic Parameter, then use SmartMessageConverter#fromMessage with third parameter "conversionHint".
+ //we have validate the MessageConverter is SmartMessageConverter in this#getMethodParameter.
+ return ((SmartMessageConverter) this.getMessageConverter()).fromMessage(MessageBuilder.withPayload(str).build(), (Class<?>) ((ParameterizedType) type).getRawType(), null);
+ }
+ } catch (Exception e) {
+ log.error("convert failed. str:{}, msgType:{}", str, type);
+ throw new RuntimeException("cannot convert message to " + type, e);
+ }
+ }
+ }
+ }
+
+ private Type getMessageType(RocketMQLocalRequestCallback rocketMQLocalRequestCallback) {
+ Class<?> targetClass = AopProxyUtils.ultimateTargetClass(rocketMQLocalRequestCallback);
+ Type matchedGenericInterface = null;
+ while (Objects.nonNull(targetClass)) {
+ Type[] interfaces = targetClass.getGenericInterfaces();
+ if (Objects.nonNull(interfaces)) {
+ for (Type type : interfaces) {
+ if (type instanceof ParameterizedType && (Objects.equals(((ParameterizedType) type).getRawType(), RocketMQLocalRequestCallback.class))) {
+ matchedGenericInterface = type;
+ break;
+ }
+ }
+ }
+ targetClass = targetClass.getSuperclass();
+ }
+ if (Objects.isNull(matchedGenericInterface)) {
+ return Object.class;
+ }
+
+ Type[] actualTypeArguments = ((ParameterizedType) matchedGenericInterface).getActualTypeArguments();
+ if (Objects.nonNull(actualTypeArguments) && actualTypeArguments.length > 0) {
+ return actualTypeArguments[0];
+ }
+ return Object.class;
+ }
}
diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainer.java b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainer.java
index 25ec320..2642d07 100644
--- a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainer.java
+++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainer.java
@@ -34,14 +34,20 @@
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely;
import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.producer.SendCallback;
+import org.apache.rocketmq.client.producer.SendResult;
+import org.apache.rocketmq.client.producer.SendStatus;
+import org.apache.rocketmq.client.utils.MessageUtil;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.RPCHook;
+import org.apache.rocketmq.remoting.exception.RemotingException;
import org.apache.rocketmq.spring.annotation.ConsumeMode;
import org.apache.rocketmq.spring.annotation.MessageModel;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.annotation.SelectorType;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.apache.rocketmq.spring.core.RocketMQPushConsumerLifecycleListener;
+import org.apache.rocketmq.spring.core.RocketMQReplyListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.aop.framework.AopProxyUtils;
@@ -51,10 +57,14 @@
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.SmartLifecycle;
import org.springframework.core.MethodParameter;
+import org.springframework.messaging.Message;
+import org.springframework.messaging.MessageHeaders;
+import org.springframework.messaging.converter.MessageConversionException;
import org.springframework.messaging.converter.MessageConverter;
import org.springframework.messaging.converter.SmartMessageConverter;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.util.Assert;
+import org.springframework.util.MimeTypeUtils;
@SuppressWarnings("WeakerAccess")
public class DefaultRocketMQListenerContainer implements InitializingBean,
@@ -92,6 +102,8 @@
private RocketMQListener rocketMQListener;
+ private RocketMQReplyListener rocketMQReplyListener;
+
private RocketMQMessageListener rocketMQMessageListener;
private DefaultMQPushConsumer consumer;
@@ -186,6 +198,14 @@
this.rocketMQListener = rocketMQListener;
}
+ public RocketMQReplyListener getRocketMQReplyListener() {
+ return rocketMQReplyListener;
+ }
+
+ public void setRocketMQReplyListener(RocketMQReplyListener rocketMQReplyListener) {
+ this.rocketMQReplyListener = rocketMQReplyListener;
+ }
+
public RocketMQMessageListener getRocketMQMessageListener() {
return rocketMQMessageListener;
}
@@ -209,14 +229,14 @@
return selectorType;
}
- public String getSelectorExpression() {
- return selectorExpression;
- }
-
public void setSelectorExpression(String selectorExpression) {
this.selectorExpression = selectorExpression;
}
+ public String getSelectorExpression() {
+ return selectorExpression;
+ }
+
public MessageModel getMessageModel() {
return messageModel;
}
@@ -230,11 +250,6 @@
}
@Override
- public void setupMessageListener(RocketMQListener rocketMQListener) {
- this.rocketMQListener = rocketMQListener;
- }
-
- @Override
public void destroy() {
this.setRunning(false);
if (Objects.nonNull(consumer)) {
@@ -327,6 +342,119 @@
this.name = name;
}
+ public class DefaultMessageListenerConcurrently implements MessageListenerConcurrently {
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
+ for (MessageExt messageExt : msgs) {
+ log.debug("received msg: {}", messageExt);
+ try {
+ long now = System.currentTimeMillis();
+ handleMessage(messageExt);
+ long costTime = System.currentTimeMillis() - now;
+ log.debug("consume {} cost: {} ms", messageExt.getMsgId(), costTime);
+ } catch (Exception e) {
+ log.warn("consume message failed. messageExt:{}, error:{}", messageExt, e);
+ context.setDelayLevelWhenNextConsume(delayLevelWhenNextConsume);
+ return ConsumeConcurrentlyStatus.RECONSUME_LATER;
+ }
+ }
+
+ return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
+ }
+ }
+
+ public class DefaultMessageListenerOrderly implements MessageListenerOrderly {
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
+ for (MessageExt messageExt : msgs) {
+ log.debug("received msg: {}", messageExt);
+ try {
+ long now = System.currentTimeMillis();
+ handleMessage(messageExt);
+ long costTime = System.currentTimeMillis() - now;
+ log.info("consume {} cost: {} ms", messageExt.getMsgId(), costTime);
+ } catch (Exception e) {
+ log.warn("consume message failed. messageExt:{}", messageExt, e);
+ context.setSuspendCurrentQueueTimeMillis(suspendCurrentQueueTimeMillis);
+ return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
+ }
+ }
+
+ return ConsumeOrderlyStatus.SUCCESS;
+ }
+ }
+
+ private void handleMessage(
+ MessageExt messageExt) throws MQClientException, RemotingException, InterruptedException {
+ if (rocketMQListener != null) {
+ rocketMQListener.onMessage(doConvertMessage(messageExt));
+ } else if (rocketMQReplyListener != null) {
+ Object replyContent = rocketMQReplyListener.onMessage(doConvertMessage(messageExt));
+ Message<?> message = MessageBuilder.withPayload(replyContent).build();
+
+ org.apache.rocketmq.common.message.Message replyMessage = MessageUtil.createReplyMessage(messageExt, convertToBytes(message));
+ consumer.getDefaultMQPushConsumerImpl().getmQClientFactory().getDefaultMQProducer().send(replyMessage, new SendCallback() {
+ @Override public void onSuccess(SendResult sendResult) {
+ if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
+ log.error("Consumer replies message failed. SendStatus: {}", sendResult.getSendStatus());
+ } else {
+ log.info("Consumer replies message success.");
+ }
+ }
+
+ @Override public void onException(Throwable e) {
+ log.error("Consumer replies message failed. error: {}", e.getLocalizedMessage());
+ }
+ });
+ }
+ }
+
+ private byte[] convertToBytes(Message<?> message) {
+ Message<?> messageWithSerializedPayload = doConvert(message.getPayload(), message.getHeaders());
+ Object payloadObj = messageWithSerializedPayload.getPayload();
+ byte[] payloads;
+ try {
+ if (null == payloadObj) {
+ throw new RuntimeException("the message cannot be empty");
+ }
+ if (payloadObj instanceof String) {
+ payloads = ((String) payloadObj).getBytes(Charset.forName(charset));
+ } else if (payloadObj instanceof byte[]) {
+ payloads = (byte[]) messageWithSerializedPayload.getPayload();
+ } else {
+ String jsonObj = (String) this.messageConverter.fromMessage(messageWithSerializedPayload, payloadObj.getClass());
+ if (null == jsonObj) {
+ throw new RuntimeException(String.format(
+ "empty after conversion [messageConverter:%s,payloadClass:%s,payloadObj:%s]",
+ this.messageConverter.getClass(), payloadObj.getClass(), payloadObj));
+ }
+ payloads = jsonObj.getBytes(Charset.forName(charset));
+ }
+ } catch (Exception e) {
+ throw new RuntimeException("convert to bytes failed.", e);
+ }
+ return payloads;
+ }
+
+ private Message<?> doConvert(Object payload, MessageHeaders headers) {
+ Message<?> message = this.messageConverter instanceof SmartMessageConverter ?
+ ((SmartMessageConverter) this.messageConverter).toMessage(payload, headers, null) :
+ this.messageConverter.toMessage(payload, headers);
+ if (message == null) {
+ String payloadType = payload.getClass().getName();
+ Object contentType = headers != null ? headers.get(MessageHeaders.CONTENT_TYPE) : null;
+ throw new MessageConversionException("Unable to convert payload with type='" + payloadType +
+ "', contentType='" + contentType + "', converter=[" + this.messageConverter + "]");
+ }
+ MessageBuilder<?> builder = MessageBuilder.fromMessage(message);
+ builder.setHeaderIfAbsent(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.TEXT_PLAIN);
+ return builder.build();
+ }
+
@SuppressWarnings("unchecked")
private Object doConvertMessage(MessageExt messageExt) {
if (Objects.equals(messageType, MessageExt.class)) {
@@ -355,7 +483,12 @@
}
private MethodParameter getMethodParameter() {
- Class<?> targetClass = AopProxyUtils.ultimateTargetClass(rocketMQListener);
+ Class<?> targetClass;
+ if (rocketMQListener != null) {
+ targetClass = AopProxyUtils.ultimateTargetClass(rocketMQListener);
+ } else {
+ targetClass = AopProxyUtils.ultimateTargetClass(rocketMQReplyListener);
+ }
Type messageType = this.getMessageType();
Class clazz = null;
if (messageType instanceof ParameterizedType && messageConverter instanceof SmartMessageConverter) {
@@ -375,14 +508,19 @@
}
private Type getMessageType() {
- Class<?> targetClass = AopProxyUtils.ultimateTargetClass(rocketMQListener);
+ Class<?> targetClass;
+ if (rocketMQListener != null) {
+ targetClass = AopProxyUtils.ultimateTargetClass(rocketMQListener);
+ } else {
+ targetClass = AopProxyUtils.ultimateTargetClass(rocketMQReplyListener);
+ }
Type matchedGenericInterface = null;
while (Objects.nonNull(targetClass)) {
Type[] interfaces = targetClass.getGenericInterfaces();
if (Objects.nonNull(interfaces)) {
for (Type type : interfaces) {
- if (type instanceof ParameterizedType
- && Objects.equals(((ParameterizedType) type).getRawType(), RocketMQListener.class)) {
+ if (type instanceof ParameterizedType &&
+ (Objects.equals(((ParameterizedType) type).getRawType(), RocketMQListener.class) || Objects.equals(((ParameterizedType) type).getRawType(), RocketMQReplyListener.class))) {
matchedGenericInterface = type;
break;
}
@@ -401,10 +539,10 @@
return Object.class;
}
-
-
private void initRocketMQPushConsumer() throws MQClientException {
- Assert.notNull(rocketMQListener, "Property 'rocketMQListener' is required");
+ if (rocketMQListener == null && rocketMQReplyListener == null) {
+ throw new IllegalArgumentException("Property 'rocketMQListener' or 'rocketMQReplyListener' is required");
+ }
Assert.notNull(consumerGroup, "Property 'consumerGroup' is required");
Assert.notNull(nameServer, "Property 'nameServer' is required");
Assert.notNull(topic, "Property 'topic' is required");
@@ -475,54 +613,10 @@
if (rocketMQListener instanceof RocketMQPushConsumerLifecycleListener) {
((RocketMQPushConsumerLifecycleListener) rocketMQListener).prepareStart(consumer);
+ } else if (rocketMQReplyListener instanceof RocketMQPushConsumerLifecycleListener) {
+ ((RocketMQPushConsumerLifecycleListener) rocketMQReplyListener).prepareStart(consumer);
}
}
- public class DefaultMessageListenerConcurrently implements MessageListenerConcurrently {
-
- @SuppressWarnings("unchecked")
- @Override
- public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
- for (MessageExt messageExt : msgs) {
- log.debug("received msg: {}", messageExt);
- try {
- long now = System.currentTimeMillis();
- rocketMQListener.onMessage(doConvertMessage(messageExt));
- long costTime = System.currentTimeMillis() - now;
- log.debug("consume {} cost: {} ms", messageExt.getMsgId(), costTime);
- } catch (Exception e) {
- log.warn("consume message failed. messageExt:{}", messageExt, e);
- context.setDelayLevelWhenNextConsume(delayLevelWhenNextConsume);
- return ConsumeConcurrentlyStatus.RECONSUME_LATER;
- }
- }
-
- return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
- }
- }
-
- public class DefaultMessageListenerOrderly implements MessageListenerOrderly {
-
- @SuppressWarnings("unchecked")
- @Override
- public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
- for (MessageExt messageExt : msgs) {
- log.debug("received msg: {}", messageExt);
- try {
- long now = System.currentTimeMillis();
- rocketMQListener.onMessage(doConvertMessage(messageExt));
- long costTime = System.currentTimeMillis() - now;
- log.info("consume {} cost: {} ms", messageExt.getMsgId(), costTime);
- } catch (Exception e) {
- log.warn("consume message failed. messageExt:{}", messageExt, e);
- context.setSuspendCurrentQueueTimeMillis(suspendCurrentQueueTimeMillis);
- return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
- }
- }
-
- return ConsumeOrderlyStatus.SUCCESS;
- }
- }
-
}
diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/RocketMQListenerContainer.java b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/RocketMQListenerContainer.java
index ee52de8..d9693bc 100644
--- a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/RocketMQListenerContainer.java
+++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/RocketMQListenerContainer.java
@@ -17,14 +17,8 @@
package org.apache.rocketmq.spring.support;
-import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.beans.factory.DisposableBean;
public interface RocketMQListenerContainer extends DisposableBean {
- /**
- * Setup the message listener to use. Throws an {@link IllegalArgumentException} if that message listener type is
- * not supported.
- */
- void setupMessageListener(RocketMQListener<?> messageListener);
}
diff --git a/rocketmq-spring-boot/src/test/java/org/apache/rocketmq/spring/autoconfigure/RocketMQAutoConfigurationTest.java b/rocketmq-spring-boot/src/test/java/org/apache/rocketmq/spring/autoconfigure/RocketMQAutoConfigurationTest.java
index 553183d..18a32f8 100644
--- a/rocketmq-spring-boot/src/test/java/org/apache/rocketmq/spring/autoconfigure/RocketMQAutoConfigurationTest.java
+++ b/rocketmq-spring-boot/src/test/java/org/apache/rocketmq/spring/autoconfigure/RocketMQAutoConfigurationTest.java
@@ -27,6 +27,7 @@
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState;
+import org.apache.rocketmq.spring.core.RocketMQReplyListener;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.apache.rocketmq.spring.support.RocketMQMessageConverter;
import org.junit.Assert;
@@ -183,6 +184,24 @@
});
}
+ @Test
+ public void testRocketMQListenerContainer_RocketMQReplyListener() {
+ runner.withPropertyValues("rocketmq.name-server=127.0.0.1:9876").
+ withUserConfiguration(TestConfigWithRocketMQReplyListener.class).
+ run((context) -> {
+ assertThat(context).getFailure().hasMessageContaining("connect to [127.0.0.1:9876] failed");
+ });
+ }
+
+ @Test(expected = IllegalStateException.class)
+ public void testRocketMQListenerContainer_WrongRocketMQListenerType() {
+ runner.withPropertyValues("rocketmq.name-server=127.0.0.1:9876").
+ withUserConfiguration(TestConfigWithWrongRocketMQListener.class).
+ run((context) -> {
+ context.getBean(RocketMQMessageConverter.class);
+ });
+ }
+
@Configuration
static class TestConfig {
@@ -199,6 +218,30 @@
}
@Configuration
+ static class TestConfigWithRocketMQReplyListener {
+
+ @Bean
+ public Object consumeListener() {
+ return new TestDefaultNameServerRocketMQReplyListener();
+ }
+
+ @Bean
+ public Object consumeListener1() {
+ return new TestCustomNameServerRocketMQReplyListener();
+ }
+
+ }
+
+ @Configuration
+ static class TestConfigWithWrongRocketMQListener {
+
+ @Bean
+ public Object consumeListener() {
+ return new WrongRocketMQListener();
+ }
+ }
+
+ @Configuration
static class CustomObjectMapperConfig {
@Bean
@@ -236,6 +279,32 @@
}
}
+ @RocketMQMessageListener(consumerGroup = "abcd", topic = "test")
+ static class TestDefaultNameServerRocketMQReplyListener implements RocketMQReplyListener<String, String> {
+
+ @Override
+ public String onMessage(String message) {
+ return "test";
+ }
+ }
+
+ @RocketMQMessageListener(consumerGroup = "abcde", topic = "test")
+ static class WrongRocketMQListener {
+
+ public String onMessage(String message) {
+ return "test";
+ }
+ }
+
+ @RocketMQMessageListener(nameServer = "127.0.1.1:9876", consumerGroup = "abcd1", topic = "test")
+ static class TestCustomNameServerRocketMQReplyListener implements RocketMQReplyListener<String, String> {
+
+ @Override
+ public String onMessage(String message) {
+ return "test";
+ }
+ }
+
@Configuration
static class TestTransactionListenerConfig {
@Bean
diff --git a/rocketmq-spring-boot/src/test/java/org/apache/rocketmq/spring/core/RocketMQTemplateTest.java b/rocketmq-spring-boot/src/test/java/org/apache/rocketmq/spring/core/RocketMQTemplateTest.java
index a6cc91d..da6d777 100644
--- a/rocketmq-spring-boot/src/test/java/org/apache/rocketmq/spring/core/RocketMQTemplateTest.java
+++ b/rocketmq-spring-boot/src/test/java/org/apache/rocketmq/spring/core/RocketMQTemplateTest.java
@@ -21,6 +21,7 @@
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
+import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;
import org.apache.rocketmq.spring.autoconfigure.RocketMQAutoConfiguration;
import org.junit.Test;
@@ -28,10 +29,13 @@
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.messaging.Message;
+import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.MessagingException;
+import org.springframework.messaging.support.MessageBuilder;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.any;
@RunWith(SpringJUnit4ClassRunner.class)
@SpringBootTest(properties = {
@@ -48,6 +52,12 @@
@Value("${test.rocketmq.topic}")
String topic;
+ @Value("stringRequestTopic:tagA")
+ String stringRequestTopic;
+
+ @Value("objectRequestTopic:tagA")
+ String objectRequestTopic;
+
@Test
public void testSendMessage() {
try {
@@ -78,6 +88,104 @@
}
@Test
+ public void testSendAndReceive_NullMessage() {
+ try {
+ String response = rocketMQTemplate.sendAndReceive(stringRequestTopic, new Message<String>() {
+ @Override public String getPayload() {
+ return null;
+ }
+
+ @Override public MessageHeaders getHeaders() {
+ return null;
+ }
+ }, String.class);
+ } catch (IllegalArgumentException e) {
+ assertThat(e).hasMessageContaining("`message` and `message.payload` cannot be null");
+ }
+
+ try {
+ String response = rocketMQTemplate.sendAndReceive(stringRequestTopic, (Object) null, String.class);
+ } catch (IllegalArgumentException e) {
+ assertThat(e).hasMessageContaining("Payload must not be null");
+ }
+ }
+
+ @Test
+ public void testSendAndReceive_Sync() throws InterruptedException {
+ try {
+ String responseMessage = rocketMQTemplate.sendAndReceive(stringRequestTopic, MessageBuilder.withPayload("requestTopicSync").build(), String.class);
+ assertThat(responseMessage).isNotNull();
+ } catch (MessagingException e) {
+ assertThat(e).hasMessageContaining("org.apache.rocketmq.remoting.exception.RemotingConnectException: connect to [127.0.0.1:9876] failed");
+ }
+
+ try {
+ String responseMessage = rocketMQTemplate.sendAndReceive(stringRequestTopic, "requestTopicSync", String.class, "orderId");
+ assertThat(responseMessage).isNotNull();
+ } catch (MessagingException e) {
+ assertThat(e).hasMessageContaining("org.apache.rocketmq.remoting.exception.RemotingConnectException: connect to [127.0.0.1:9876] failed");
+ }
+ }
+
+ @Test
+ public void testSendAndReceive_Async() {
+ try {
+ rocketMQTemplate.sendAndReceive(stringRequestTopic, MessageBuilder.withPayload("requestTopicASync").build(), new RocketMQLocalRequestCallback<String>() {
+ @Override public void onSuccess(String message) {
+ System.out.printf("receive string: %s %n", message);
+ }
+
+ @Override public void onException(Throwable e) {
+ e.printStackTrace();
+ }
+ });
+ } catch (MessagingException e) {
+ assertThat(e).hasMessageContaining("org.apache.rocketmq.remoting.exception.RemotingConnectException: connect to [127.0.0.1:9876] failed");
+ }
+
+ try {
+ rocketMQTemplate.sendAndReceive(stringRequestTopic, "requestTopicAsyncWithHasKey", new RocketMQLocalRequestCallback<String>() {
+ @Override public void onSuccess(String message) {
+ System.out.printf("receive string: %s %n", message);
+ }
+
+ @Override public void onException(Throwable e) {
+ e.printStackTrace();
+ }
+ }, "order-id");
+ } catch (MessagingException e) {
+ assertThat(e).hasMessageContaining("org.apache.rocketmq.remoting.exception.RemotingConnectException: connect to [127.0.0.1:9876] failed");
+ }
+
+ try {
+ rocketMQTemplate.sendAndReceive(stringRequestTopic, "requestTopicAsyncWithTimeout", new RocketMQLocalRequestCallback<String>() {
+ @Override public void onSuccess(String message) {
+ System.out.printf("receive string: %s %n", message);
+ }
+
+ @Override public void onException(Throwable e) {
+ e.printStackTrace();
+ }
+ }, "order-id", 5000);
+ } catch (MessagingException e) {
+ assertThat(e).hasMessageContaining("org.apache.rocketmq.remoting.exception.RemotingConnectException: connect to [127.0.0.1:9876] failed");
+ }
+ try {
+ rocketMQTemplate.sendAndReceive(objectRequestTopic, "requestTopicAsyncWithTimeout", new RocketMQLocalRequestCallback<MessageExt>() {
+ @Override public void onSuccess(MessageExt message) {
+ System.out.printf("receive messageExt: %s %n", message.toString());
+ }
+
+ @Override public void onException(Throwable e) {
+ e.printStackTrace();
+ }
+ }, 5000);
+ } catch (MessagingException e) {
+ assertThat(e).hasMessageContaining("org.apache.rocketmq.remoting.exception.RemotingConnectException: connect to [127.0.0.1:9876] failed");
+ }
+ }
+
+ @Test
public void testProperties() {
assertThat(rocketMQTemplate.getProducer().getNamesrvAddr()).isEqualTo("127.0.0.1:9876");
assertThat(rocketMQTemplate.getProducer().getProducerGroup()).isEqualTo("rocketMQTemplate-test-producer_group");
diff --git a/rocketmq-spring-boot/src/test/java/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainerTest.java b/rocketmq-spring-boot/src/test/java/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainerTest.java
index 60014c9..7133b98 100644
--- a/rocketmq-spring-boot/src/test/java/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainerTest.java
+++ b/rocketmq-spring-boot/src/test/java/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainerTest.java
@@ -16,12 +16,16 @@
*/
package org.apache.rocketmq.spring.support;
+import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.lang.reflect.ParameterizedType;
import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
import org.apache.rocketmq.spring.core.RocketMQListener;
+import org.apache.rocketmq.spring.core.RocketMQReplyListener;
import org.junit.Test;
import org.springframework.core.MethodParameter;
+import org.springframework.messaging.Message;
import org.springframework.messaging.converter.CompositeMessageConverter;
import org.springframework.messaging.converter.MappingJackson2MessageConverter;
import org.springframework.messaging.converter.StringMessageConverter;
@@ -29,6 +33,7 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
+import org.springframework.messaging.support.MessageBuilder;
import static org.assertj.core.api.Assertions.assertThat;
@@ -54,6 +59,64 @@
});
result = (Class) getMessageType.invoke(listenerContainer);
assertThat(result.getName().equals(MessageExt.class.getName()));
+
+ listenerContainer.setRocketMQReplyListener(new RocketMQReplyListener<MessageExt, String>() {
+ @Override
+ public String onMessage(MessageExt message) {
+ return "test";
+ }
+ });
+ result = (Class) getMessageType.invoke(listenerContainer);
+ assertThat(result.getName().equals(MessageExt.class.getName()));
+
+ listenerContainer.setRocketMQReplyListener(new RocketMQReplyListener<String, String>() {
+ @Override
+ public String onMessage(String message) {
+ return "test";
+ }
+ });
+ result = (Class) getMessageType.invoke(listenerContainer);
+ assertThat(result.getName().equals(String.class.getName()));
+ }
+
+ @Test
+ public void testDoConvertMessage() throws Exception {
+ DefaultRocketMQListenerContainer listenerContainer = new DefaultRocketMQListenerContainer();
+ Method doConvertMessage = DefaultRocketMQListenerContainer.class.getDeclaredMethod("doConvertMessage", MessageExt.class);
+ doConvertMessage.setAccessible(true);
+
+ listenerContainer.setRocketMQListener(new RocketMQListener<String>() {
+ @Override
+ public void onMessage(String message) {
+ }
+ });
+
+ Field messageType = DefaultRocketMQListenerContainer.class.getDeclaredField("messageType");
+ messageType.setAccessible(true);
+ messageType.set(listenerContainer, String.class);
+ MessageExt messageExt = new MessageExt(0, System.currentTimeMillis(), null, System.currentTimeMillis(), null, null);
+ messageExt.setBody("hello".getBytes());
+ String result = (String) doConvertMessage.invoke(listenerContainer, messageExt);
+ assertThat(result).isEqualTo("hello");
+
+ listenerContainer.setRocketMQListener(new RocketMQListener<MessageExt>() {
+ @Override
+ public void onMessage(MessageExt message) {
+ }
+ });
+ Field messageType2 = DefaultRocketMQListenerContainer.class.getDeclaredField("messageType");
+ messageType2.setAccessible(true);
+ messageType2.set(listenerContainer, MessageExt.class);
+ messageExt = new MessageExt(0, System.currentTimeMillis(), null, System.currentTimeMillis(), null, null);
+ messageExt.setBody("hello".getBytes());
+ MessageExt result2 = (MessageExt) doConvertMessage.invoke(listenerContainer, messageExt);
+ assertThat(result2).isEqualTo(messageExt);
+
+ listenerContainer.setRocketMQListener(new RocketMQListener<User>() {
+ @Override
+ public void onMessage(User message) {
+ }
+ });
}
@Test
@@ -76,6 +139,41 @@
assertThat(type.getRawType() == ArrayList.class);
MethodParameter methodParameter = ((MethodParameter) getMethodParameter.invoke(listenerContainer));
assertThat(methodParameter.getParameterType() == ArrayList.class);
+
+ listenerContainer.setRocketMQReplyListener(new RocketMQReplyListener<ArrayList<Date>, String>() {
+ @Override
+ public String onMessage(ArrayList<Date> message) {
+ return "test";
+ }
+ });
+
+ type = (ParameterizedType) getMessageType.invoke(listenerContainer);
+ assertThat(type.getRawType() == ArrayList.class);
+ methodParameter = ((MethodParameter) getMethodParameter.invoke(listenerContainer));
+ assertThat(methodParameter.getParameterType() == ArrayList.class);
+ }
+
+ class User {
+ private String userName;
+ private int userAge;
+
+ public String getUserName() {
+ return userName;
+ }
+
+ public User setUserName(String userName) {
+ this.userName = userName;
+ return this;
+ }
+
+ public int getUserAge() {
+ return userAge;
+ }
+
+ public User setUserAge(int userAge) {
+ this.userAge = userAge;
+ return this;
+ }
}
}